Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
ckolivas 10 years ago
parent
commit
e277fa2731
  1. 88
      src/ckdb.c
  2. 14
      src/connector.c
  3. 22
      src/generator.c

88
src/ckdb.c

@ -47,7 +47,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.7" #define DB_VERSION "0.7"
#define CKDB_VERSION DB_VERSION"-0.66" #define CKDB_VERSION DB_VERSION"-0.68"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -8100,12 +8100,15 @@ static K_TREE *upd_add_mu(K_TREE *mu_root, K_STORE *mu_store, int64_t userid, in
/* Find the block_workinfoid of the block requested /* Find the block_workinfoid of the block requested
then add all it's diffacc shares then add all it's diffacc shares
then keep stepping back shares until diffacc_total matches or exceeds then keep stepping back shares until diffacc_total matches or exceeds
the blocks network difficulty (block_ndiff) - this is begin_workinfoid the number required (diff_want) - this is begin_workinfoid
(also summarising diffacc per user) (also summarising diffacc per user)
then keep stepping back until we complete the current begin_workinfoid then keep stepping back until we complete the current begin_workinfoid
(also summarising diffacc per user) (also summarising diffacc per user)
This will give us the total number of diff1 shares (diffacc_total) This will give us the total number of diff1 shares (diffacc_total)
to use for the payment calculations to use for the payment calculations
The value of diff_want defaults to the block's network difficulty
(block_ndiff) but can be changed with diff_times and diff_add to:
block_ndiff * diff_times + diff_add
The pplns_elapsed time of the shares is from the createdate of the The pplns_elapsed time of the shares is from the createdate of the
begin_workinfoid that has shares accounted to the total, begin_workinfoid that has shares accounted to the total,
up to the createdate of the block up to the createdate of the block
@ -8122,7 +8125,8 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
{ {
char reply[1024], tmp[1024], *buf; char reply[1024], tmp[1024], *buf;
size_t siz = sizeof(reply); size_t siz = sizeof(reply);
K_ITEM look, *i_height, *b_item, *w_item, *ss_item; K_ITEM *i_height, *i_difftimes, *i_diffadd, *i_allowaged;
K_ITEM look, *b_item, *w_item, *ss_item;
K_ITEM *mu_item, *wb_item, *u_item; K_ITEM *mu_item, *wb_item, *u_item;
SHARESUMMARY sharesummary; SHARESUMMARY sharesummary;
BLOCKS blocks; BLOCKS blocks;
@ -8131,9 +8135,14 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
int32_t height; int32_t height;
int64_t workinfoid, end_workinfoid; int64_t workinfoid, end_workinfoid;
int64_t begin_workinfoid; int64_t begin_workinfoid;
int64_t share_count;
tv_t cd, begin_tv, end_tv; tv_t cd, begin_tv, end_tv;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
double ndiff, total, elapsed; double ndiff, total, elapsed;
double diff_times = 1.0;
double diff_add = 0.0;
double diff_want;
bool allow_aged = false;
char ndiffbin[TXT_SML+1]; char ndiffbin[TXT_SML+1];
size_t len, off; size_t len, off;
int rows; int rows;
@ -8143,9 +8152,22 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
i_height = require_name(trf_root, "height", 1, NULL, reply, siz); i_height = require_name(trf_root, "height", 1, NULL, reply, siz);
if (!i_height) if (!i_height)
return strdup(reply); return strdup(reply);
TXT_TO_INT("height", DATA_TRANSFER(i_height)->data, height); TXT_TO_INT("height", DATA_TRANSFER(i_height)->data, height);
i_difftimes = optional_name(trf_root, "diff_times", 1, NULL);
if (i_difftimes)
TXT_TO_DOUBLE("diff_times", DATA_TRANSFER(i_difftimes)->data, diff_times);
i_diffadd = optional_name(trf_root, "diff_add", 1, NULL);
if (i_diffadd)
TXT_TO_DOUBLE("diff_add", DATA_TRANSFER(i_diffadd)->data, diff_add);
i_allowaged = optional_name(trf_root, "allow_aged", 1, NULL);
if (i_allowaged) {
if (toupper(DATA_TRANSFER(i_allowaged)->data[0]) == TRUE_STR[0])
allow_aged = true;
}
cd.tv_sec = cd.tv_usec = 0L; cd.tv_sec = cd.tv_usec = 0L;
blocks.height = height + 1; blocks.height = height + 1;
blocks.blockhash[0] = '\0'; blocks.blockhash[0] = '\0';
@ -8175,7 +8197,9 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
hex2bin(ndiffbin, DATA_WORKINFO(w_item)->bits, 4); hex2bin(ndiffbin, DATA_WORKINFO(w_item)->bits, 4);
ndiff = diff_from_nbits(ndiffbin); ndiff = diff_from_nbits(ndiffbin);
diff_want = ndiff * diff_times + diff_add;
begin_workinfoid = 0; begin_workinfoid = 0;
share_count = 0;
total = 0; total = 0;
sharesummary.workinfoid = workinfoid; sharesummary.workinfoid = workinfoid;
@ -8196,8 +8220,21 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
mu_store = k_new_store(miningpayouts_free); mu_store = k_new_store(miningpayouts_free);
mu_root = new_ktree(); mu_root = new_ktree();
end_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid; end_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid;
// add up all sharesummaries until >= ndiff // add up all sharesummaries until >= diff_want
while (ss_item && total < ndiff) { while (ss_item && total < diff_want) {
switch (DATA_SHARESUMMARY(ss_item)->complete[0]) {
case SUMMARY_CONFIRM:
break;
case SUMMARY_COMPLETE:
if (allow_aged)
break;
default:
snprintf(reply, siz,
"ERR.sharesummary not ready in workinfo %"PRId64,
DATA_SHARESUMMARY(ss_item)->workinfoid);
goto shazbot;
}
share_count++;
total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc); total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc);
begin_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid; begin_workinfoid = DATA_SHARESUMMARY(ss_item)->workinfoid;
mu_root = upd_add_mu(mu_root, mu_store, mu_root = upd_add_mu(mu_root, mu_store,
@ -8208,6 +8245,19 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
// include all the rest of the sharesummaries with begin_workinfoid // include all the rest of the sharesummaries with begin_workinfoid
while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == begin_workinfoid) { while (ss_item && DATA_SHARESUMMARY(ss_item)->workinfoid == begin_workinfoid) {
switch (DATA_SHARESUMMARY(ss_item)->complete[0]) {
case SUMMARY_CONFIRM:
break;
case SUMMARY_COMPLETE:
if (allow_aged)
break;
default:
snprintf(reply, siz,
"ERR.sharesummary not ready in workinfo %"PRId64,
DATA_SHARESUMMARY(ss_item)->workinfoid);
goto shazbot;
}
share_count++;
total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc); total += (int64_t)(DATA_SHARESUMMARY(ss_item)->diffacc);
mu_root = upd_add_mu(mu_root, mu_store, mu_root = upd_add_mu(mu_root, mu_store,
DATA_SHARESUMMARY(ss_item)->userid, DATA_SHARESUMMARY(ss_item)->userid,
@ -8234,7 +8284,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
* that were accepted as part of the block's workinfoid anyway * that were accepted as part of the block's workinfoid anyway
* All shares accepted in a workinfoid after the blocks workinfoid * All shares accepted in a workinfoid after the blocks workinfoid
* will not be creditied in this block no matter what the height * will not be creditied in this block no matter what the height
* of the workinfo - but will be candidates for the next block */ * of the workinfoid - but will be candidates for the next block */
elapsed = tvdiff(&end_tv, &begin_tv); elapsed = tvdiff(&end_tv, &begin_tv);
APPEND_REALLOC_INIT(buf, off, len); APPEND_REALLOC_INIT(buf, off, len);
@ -8249,8 +8299,6 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diffacc_total=%.0f%c", total, FLDSEP); snprintf(tmp, sizeof(tmp), "diffacc_total=%.0f%c", total, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "block_ndiff=%f%c", ndiff, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "pplns_elapsed=%f%c", elapsed, FLDSEP); snprintf(tmp, sizeof(tmp), "pplns_elapsed=%f%c", elapsed, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
@ -8286,7 +8334,18 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
mu_item = next_in_ktree(ctx); mu_item = next_in_ktree(ctx);
} }
snprintf(tmp, sizeof(tmp), "rows=%d", rows); snprintf(tmp, sizeof(tmp), "rows=%d%c", rows, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "block_ndiff=%f%c", ndiff, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diff_times=%f%c", diff_times, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diff_add=%f%c", diff_add, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "diff_want=%f%c", diff_want, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "share_count=%"PRId64, share_count);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
mu_root = free_ktree(mu_root, NULL); mu_root = free_ktree(mu_root, NULL);
@ -10204,6 +10263,14 @@ static struct option long_options[] = {
{ 0, 0, 0, 0 } { 0, 0, 0, 0 }
}; };
static void sighandler(int sig)
{
LOGWARNING("Received signal %d, shutting down", sig);
everyone_die = true;
cksleep_ms(420);
exit(0);
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
struct sigaction handler; struct sigaction handler;
@ -10367,6 +10434,7 @@ int main(int argc, char **argv)
create_pthread(&ckp.pth_listener, listener, &ckp.main); create_pthread(&ckp.pth_listener, listener, &ckp.main);
handler.sa_handler = sighandler;
handler.sa_flags = 0; handler.sa_flags = 0;
sigemptyset(&handler.sa_mask); sigemptyset(&handler.sa_mask);
sigaction(SIGTERM, &handler, NULL); sigaction(SIGTERM, &handler, NULL);

14
src/connector.c

@ -340,6 +340,7 @@ void *sender(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
ckpool_t *ckp = ci->pi->ckp; ckpool_t *ckp = ci->pi->ckp;
bool sent = false;
rename_proc("csender"); rename_proc("csender");
@ -349,8 +350,11 @@ void *sender(void *arg)
int ret, fd, ofs = 0; int ret, fd, ofs = 0;
mutex_lock(&sender_lock); mutex_lock(&sender_lock);
/* Poll every 100ms if there are no new sends */ /* Poll every 100ms if there are no new sends. Re-examine
if (!sender_sends) { * delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the
* delayed sends. */
if (!sender_sends && !sent) {
const ts_t polltime = {0, 100000000}; const ts_t polltime = {0, 100000000};
ts_t timeout_ts; ts_t timeout_ts;
@ -363,8 +367,11 @@ void *sender(void *arg)
DL_DELETE(sender_sends, sender_send); DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock); mutex_unlock(&sender_lock);
sent = false;
/* Service delayed sends only if we have timed out on the /* Service delayed sends only if we have timed out on the
* conditional with no new sends appearing. */ * conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) { if (!sender_send) {
if (!delayed_sends) if (!delayed_sends)
continue; continue;
@ -404,6 +411,7 @@ void *sender(void *arg)
DL_APPEND(delayed_sends, sender_send); DL_APPEND(delayed_sends, sender_send);
continue; continue;
} }
sent = true;
while (sender_send->len) { while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {

22
src/generator.c

@ -209,6 +209,7 @@ static int gen_loop(proc_instance_t *pi)
{ {
int sockd = -1, ret = 0, selret; int sockd = -1, ret = 0, selret;
server_instance_t *si = NULL; server_instance_t *si = NULL;
bool reconnecting = false;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
bool started = false; bool started = false;
@ -218,14 +219,20 @@ static int gen_loop(proc_instance_t *pi)
char hash[68]; char hash[68];
reconnect: reconnect:
if (si) if (si) {
kill_server(si); kill_server(si);
reconnecting = true;
}
si = live_server(ckp); si = live_server(ckp);
if (!si) if (!si)
goto out; goto out;
gbt = si->data; gbt = si->data;
cs = &si->cs; cs = &si->cs;
if (reconnecting) {
LOGWARNING("Failed over to bitcoind: %s:%s", cs->url, cs->port);
reconnecting = false;
}
retry: retry:
do { do {
@ -238,7 +245,7 @@ retry:
} while (selret < 1); } while (selret < 1);
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
LOGWARNING("Bitcoind socket invalidated, will atempt failover"); LOGWARNING("Bitcoind socket invalidated, will attempt failover");
goto reconnect; goto reconnect;
} }
@ -1346,15 +1353,24 @@ static void kill_proxy(proxy_instance_t *proxi)
static int proxy_loop(proc_instance_t *pi) static int proxy_loop(proc_instance_t *pi)
{ {
int sockd = -1, ret = 0, selret; int sockd = -1, ret = 0, selret;
proxy_instance_t *proxi = NULL;
bool reconnecting = false;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
proxy_instance_t *proxi;
char *buf = NULL; char *buf = NULL;
reconnect: reconnect:
if (proxi)
reconnecting = true;
proxi = live_proxy(ckp); proxi = live_proxy(ckp);
if (!proxi) if (!proxi)
goto out; goto out;
if (reconnecting) {
connsock_t *cs = proxi->cs;
LOGWARNING("Successfully reconnected to %s:%s as proxy",
cs->url, cs->port);
reconnecting = false;
}
/* We've just subscribed and authorised so tell the stratifier to /* We've just subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */ * retrieve the first subscription. */

Loading…
Cancel
Save