diff --git a/src/ckdb.c b/src/ckdb.c index 2c901d5b..5c77f970 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1130,16 +1130,16 @@ static void alloc_storage() marks_root = new_ktree(); } -#define SEQSETWARN(_seqset, _msgtxt, _endtxt) do { \ +#define SEQSETWARN(_set, _seqset, _msgtxt, _endtxt) do { \ char _t_buf[DATE_BUFSIZ]; \ btu64_to_buf(&((_seqset)->seqstt), _t_buf, sizeof(_t_buf)); \ - LOGWARNING("SEQ %s: "SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ + LOGWARNING("SEQ %s: %d/"SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 \ " %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64 \ "/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64 \ "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \ "/OK%"PRIu64"%s", \ - _msgtxt, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ + _msgtxt, _set, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ (_seqset)->missing, (_seqset)->trans, (_seqset)->lost, \ (_seqset)->stale, (_seqset)->high, (_seqset)->ok, \ seqnam[SEQ_ALL], \ @@ -1198,13 +1198,61 @@ static void alloc_storage() #define FREE_ALL(_list) FREE_TREE(_list); FREE_LISTS(_list) +/* Write a share missing/lost report to the console - always report set 0 + * It's possible for the set numbers to be wrong and the output to report one + * seqset twice if a new seqset arrives between the unlock/lock for writing + * the console message - since a new seqset would shuffle the sets down and + * if the list was full, move the last one back to the top of the setset_store + * list, but this would normally only be when ckpool restarts and also wont + * cause a problem since only the last set can be moved and the code checks + * if it is the end and also duplicates the set before releasing the lock */ +void sequence_report(bool lock) +{ + SEQSET *seqset, seqset_copy; + K_ITEM *ss_item; + bool last, miss; + int set; + + last = false; + set = 0; + if (lock) + ck_wlock(&seq_lock); + ss_item = seqset_store->head; + while (!last && ss_item) { + if (!ss_item->next) + last = true; + DATA_SEQSET(seqset, ss_item); + /* Missing/Trans/Lost should all be 0 for shares */ + if (seqset->seqstt && (set == 0 || + seqset->seqdata[SEQ_SHARES].missing || + seqset->seqdata[SEQ_SHARES].trans || + seqset->seqdata[SEQ_SHARES].lost)) { + miss = (seqset->seqdata[SEQ_SHARES].missing || + seqset->seqdata[SEQ_SHARES].trans || + seqset->seqdata[SEQ_SHARES].lost); + if (lock) { + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + ck_wunlock(&seq_lock); + seqset = &seqset_copy; + } + SEQSETWARN(set, seqset, + miss ? "SHARES MISSING" : "status" , EMPTY); + if (lock) + ck_wlock(&seq_lock); + } + ss_item = ss_item->next; + set++; + } + if (lock) + ck_wunlock(&seq_lock); +} + static void dealloc_storage() { SHAREERRORS *shareerrors; - K_ITEM *s_item, *ss_item; + K_ITEM *s_item; char *st = NULL; SHARES *shares; - SEQSET *seqset; LOGWARNING("%s() logqueue ...", __func__); @@ -1331,18 +1379,8 @@ static void dealloc_storage() FREE_LISTS(workqueue); LOGWARNING("%s() seqset ...", __func__); + sequence_report(false); - ss_item = seqset_store->head; - while (ss_item) { - DATA_SEQSET(seqset, ss_item); - /* Missing/Trans/Lost should all be 0 for shares */ - if (seqset->seqstt && (seqset->seqdata[SEQ_SHARES].missing || - seqset->seqdata[SEQ_SHARES].trans || - seqset->seqdata[SEQ_SHARES].lost)) { - SEQSETWARN(seqset, "SHARES MISSING", EMPTY); - } - ss_item = ss_item->next; - } FREE_LIST(seqtrans); FREE_STORE_DATA(seqset); @@ -1651,7 +1689,7 @@ static void trans_seq(tv_t *now) static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, uint64_t n_seqstt, uint64_t n_seqpid, char *nam, tv_t *now, tv_t *cd, char *code, - bool warndup, char *msg) + int seqitemflags, char *msg) { char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; bool firstseq, newseq, expseq, gothigh, gotstale, gotstalestart; @@ -1659,13 +1697,13 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; bool dup, wastrans, doitem, dotime; SEQDATA *seqdata; - SEQITEM *seqitem; + SEQITEM *seqitem, seqitem_copy; K_ITEM *seqset_item = NULL, *st_item = NULL; SEQTRANS *seqtrans = NULL; size_t siz, end; void *off0, *offn; uint64_t u; - int set = -1, highlimit, i; + int set = -1, expset = -1, highlimit, i; K_STORE *lost; // We store the lost items in here @@ -1694,10 +1732,10 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, } } - // Need to get a new seqset + // Need to setup a new seqset newseq = true; if (!firstseq) { - /* The current seqset (about to become the previous) + /* The current seqset (may become the previous) * If !seqset_store->head (i.e. a bug) this will quit() */ DATA_SEQSET(seqset0, seqset_store->head); memcpy(&seqset_pre, seqset0, sizeof(seqset_pre)); @@ -1758,16 +1796,59 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, LIST_MEM_ADD_SIZ(seqset_free, end); } } else { - // Expire the last set and overwrite it - seqset_item = k_unlink_tail(seqset_store); - // If !item (i.e. a bug) this will quit() + // Expire the oldest set and overwrite it + K_ITEM *ss_item; + SEQSET *ss = NULL; + int s = 0; + seqset = NULL; + seqset_item = NULL; + ss_item = seqset_store->head; + while (ss_item) { + DATA_SEQSET(ss, ss_item); + if (!seqset) { + seqset = ss; + seqset_item = ss_item; + expset = s; + } else { + // choose the last match + if (ss->seqstt >= seqset->seqstt) { + seqset = ss; + seqset_item = ss_item; + expset = s; + } + } + ss_item = ss_item->next; + s++; + } + // If !seqset_item (i.e. a bug) k_unlink_item() will quit() + k_unlink_item(seqset_store, seqset_item); DATA_SEQSET(seqset, seqset_item); memcpy(&seqset_exp, seqset, sizeof(seqset_exp)); expseq = true; RESETSET(seqset, n_seqstt, n_seqpid); } - k_add_head(seqset_store, seqset_item); - set = 0; + /* Since the pool queue is active during the reload, sets can be out + * of order, so each new one should be added depending upon the value + * of seqstt so the current pool is first, to minimise searching + * seqset_store, but the order of the rest isn't as important + * N.B. a new set is only created once per pool start */ + if (firstseq) { + k_add_head(seqset_store, seqset_item); + set = 0; + } else { + // seqset0 already is the head + if (n_seqstt >= seqset0->seqstt) { + // if new set is >= head then make it the head + k_add_head(seqset_store, seqset_item); + set = 0; + } else { + // put it next after the head + k_insert_after(seqset_store, seqset_item, + seqset_store->head); + set = 1; + } + + } gotseqset: doitem = dotime = false; @@ -1857,6 +1938,7 @@ gotseqset: if (!ITEMISMIS(seqitem)) { dup = true; memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + memcpy(&seqitem_copy, seqitem, sizeof(seqitem_copy)); } else { // Found a missing one seqdata->missing--; @@ -1904,6 +1986,7 @@ gotseqset: setitemdata: // Store the new seq if flagged to do so if (doitem) { + seqitem->flags = seqitemflags; copy_tv(&(seqitem->time), now); copy_tv(&(seqitem->cd), cd); STRNCPY(seqitem->code, code); @@ -1930,28 +2013,38 @@ setitemdata: SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); } else { - if (newseq) - SEQSETWARN(&seqset_pre, "previous", EMPTY); + if (newseq) { + if (set == 0) + SEQSETWARN(0, &seqset_pre, "previous", EMPTY); + else + SEQSETWARN(0, &seqset_pre, "current", EMPTY); + } if (expseq) - SEQSETWARN(&seqset_exp, "discarded old", " for:"); + SEQSETWARN(expset, &seqset_exp, "discarded old", " for:"); if (newseq || expseq) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); - LOGWARNING("Seq created new: %s %"PRIu64" " + LOGWARNING("Seq created new: set %d %s %"PRIu64" " SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, - nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); + set, nam, n_seqcmd, n_seqstt, t_buf, + n_seqpid); } } if (dup) { - int level = LOG_DEBUG; - if (warndup) - level = LOG_WARNING; + int level = LOG_WARNING; + /* If one is SI_RELOAD and the other is SI_EARLYSOCK then it's + * not unexpected so only LOG_DEBUG */ + if (((seqitem_copy.flags | seqitemflags) & SI_RELOAD) && + ((seqitem_copy.flags | seqitemflags) & SI_EARLYSOCK)) + level = LOG_DEBUG; btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGMSG(level, "SEQ dup %s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 - " %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 - "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 - "/OK%"PRIu64" cmd=%.42s...", + LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s v%"PRIu64"/^%"PRIu64 + "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64 + "/H%"PRIu64"/OK%"PRIu64" cmd=%.42s...", + (level == LOG_DEBUG) ? "*" : EMPTY, + SICHR(seqitemflags), SICHR(seqitem_copy.flags), nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code, seqset_copy.seqdata[seq].minseq, @@ -1967,15 +2060,12 @@ setitemdata: } if (wastrans) { - int level = LOG_DEBUG; - if (warndup) - level = LOG_WARNING; btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGMSG(level, "SEQ found trans %s %"PRIu64" set %d/%"PRIu64 - "=%s/%"PRIu64" %s/%s", - nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, - t_buf2, code); + LOGWARNING("SEQ found trans %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", + nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, + t_buf2, code); } if (gotstale || gotstalestart || gothigh) { @@ -2038,13 +2128,13 @@ setitemdata: static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, tv_t *now, char *msg, K_TREE *trf_root, - bool wantauth) + bool wantauth, int seqitemflags) { uint64_t n_seqall, n_seqstt, n_seqpid, n_seqcmd; K_ITEM *seqstt, *seqpid, *seqcmd, *i_code; char *err = NULL, *st = NULL; size_t len, off; - bool dupall, dupcmd, warndup; + bool dupall, dupcmd; char *code = NULL; char buf[64]; @@ -2127,27 +2217,19 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, code = EMPTY; } - if (!startup_complete) - warndup = true; - else { - if (reload_queue_complete) - warndup = true; - else - warndup = false; - } - dupall = update_seq(SEQ_ALL, n_seqall, n_seqstt, n_seqpid, SEQALL, - now, cd, code, warndup, msg); + now, cd, code, seqitemflags, msg); dupcmd = update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, - buf, now, cd, code, warndup, msg); + buf, now, cd, code, seqitemflags, msg); if (dupall != dupcmd) { - LOGERR("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " - "cmd=%.32s...", - seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", - seqnam[ckdb_cmds[which].seq], n_seqcmd, - dupcmd ? "DUP" : "notdup", - st = safe_text_nonull(msg)); + // Bad/corrupt data or a code bug + LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " + "cmd=%.32s...", + seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", + seqnam[ckdb_cmds[which].seq], n_seqcmd, + dupcmd ? "DUP" : "notdup", + st = safe_text_nonull(msg)); FREENULL(st); } @@ -2169,7 +2251,8 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, char *buf, int *which_cmds, char *cmd, - char *id, tv_t *now, tv_t *cd, bool wantauth) + char *id, tv_t *now, tv_t *cd, bool wantauth, + int seqitemflags) { char reply[1024] = ""; TRANSFER *transfer; @@ -2487,7 +2570,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, if (seqall) { enum cmd_values ret; ret = process_seq(seqall, *which_cmds, cd, now, buf, - *trf_root, wantauth); + *trf_root, wantauth, seqitemflags); free(cmdptr); return ret; } else { @@ -3551,10 +3634,13 @@ static void *socketer(__maybe_unused void *arg) else LOGDEBUG("Duplicate '%s' message received", duptype); } else { + int seqitemflags = SI_SOCKET; + if (!reload_queue_complete) + seqitemflags = SI_EARLYSOCK; LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, - &cd, true); + &cd, true, seqitemflags); switch (cmdnum) { case CMD_DUPSEQ: snprintf(reply, sizeof(reply), "%s.%ld.dup.", id, now.tv_sec); @@ -3631,10 +3717,10 @@ static void *socketer(__maybe_unused void *arg) case CMD_HEARTBEAT: // First message from the pool if (want_first) { + want_first = false; ck_wlock(&fpm_lock); first_pool_message = strdup(buf); ck_wunlock(&fpm_lock); - want_first = false; } case CMD_CHKPASS: case CMD_ADDUSER: @@ -3650,6 +3736,7 @@ static void *socketer(__maybe_unused void *arg) case CMD_NEWID: case CMD_STATS: case CMD_USERSTATUS: + case CMD_SHSTA: ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, by_default, (char *)__func__, @@ -3773,10 +3860,10 @@ static void *socketer(__maybe_unused void *arg) case CMD_BLOCK: // First message from the pool if (want_first) { + want_first = false; ck_wlock(&fpm_lock); first_pool_message = strdup(buf); ck_wunlock(&fpm_lock); - want_first = false; } snprintf(reply, sizeof(reply), @@ -3858,7 +3945,7 @@ static void *socketer(__maybe_unused void *arg) return NULL; } -static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) +static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) { char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; @@ -3869,7 +3956,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) TRANSFER *transfer; K_ITEM *item; tv_t now, cd; - bool finished; + bool matched; // Once we've read the message setnow(&now); @@ -3885,19 +3972,19 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) else LOGERR("%s() Empty message line %"PRIu64, __func__, count); } else { - finished = false; + matched = false; ck_wlock(&fpm_lock); - if (first_pool_message && strcmp(first_pool_message, buf) == 0) - finished = true; + if (first_pool_message && strcmp(first_pool_message, buf) == 0) { + matched = true; + FREENULL(first_pool_message); + } ck_wunlock(&fpm_lock); - if (finished) { + if (matched) LOGERR("%s() reload ckpool queue match at line %"PRIu64, __func__, count); - return true; - } LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, - cmd, id, &now, &cd, false); + cmd, id, &now, &cd, false, SI_RELOAD); switch (cmdnum) { // Don't ever attempt to double process reload data case CMD_DUPSEQ: @@ -3939,7 +4026,8 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_USERSTATUS: case CMD_MARKS: case CMD_PSHIFT: - LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", + case CMD_SHSTA: + LOGERR("%s() INVALID message line %"PRIu64" '%s' - ignored", __func__, count, cmd); break; case CMD_AUTH: @@ -3984,8 +4072,6 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) } tick(); - - return false; } // 10Mb for now - transactiontree can be large @@ -4067,10 +4153,10 @@ static bool reload_from(tv_t *start) PGconn *conn = NULL; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; size_t rflen = strlen(restorefrom); - char *missingfirst = NULL, *missinglast = NULL; + char *missingfirst = NULL, *missinglast = NULL, *st = NULL; int missing_count; int processing; - bool finished = false, matched = false, ret = true, ok, apipe = false; + bool finished = false, ret = true, ok, apipe = false; char *filename = NULL; uint64_t count, total; tv_t now, begin; @@ -4115,8 +4201,9 @@ static bool reload_from(tv_t *start) * aborting early and not get the few slightly later out of * order messages in the log file */ while (!everyone_die && - logline(reload_buf, MAX_READ, fp, filename)) - matched = reload_line(conn, filename, ++count, reload_buf); + logline(reload_buf, MAX_READ, fp, filename)) { + reload_line(conn, filename, ++count, reload_buf); + } LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", __func__, @@ -4199,16 +4286,14 @@ static bool reload_from(tv_t *start) if (everyone_die) return true; - if (!matched) { - ck_wlock(&fpm_lock); - if (first_pool_message) { - LOGERR("%s() reload completed without finding ckpool queue match '%.32s'...", - __func__, first_pool_message); - LOGERR("%s() restart ckdb to resolve this", __func__); - ret = false; - } - ck_wunlock(&fpm_lock); + ck_wlock(&fpm_lock); + if (first_pool_message) { + LOGERR("%s() reload didn't find the first ckpool queue '%.32s...", + __func__, st = safe_text(first_pool_message)); + FREENULL(st); + FREENULL(first_pool_message); } + ck_wunlock(&fpm_lock); reloading = false; FREENULL(reload_buf); @@ -4305,6 +4390,7 @@ static void *listener(void *arg) K_RUNLOCK(workqueue_store); LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); diff --git a/src/ckdb.h b/src/ckdb.h index 636e7922..e5e1d6f7 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.073" +#define CKDB_VERSION DB_VERSION"-1.079" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -402,6 +402,7 @@ enum cmd_values { CMD_USERSTATUS, CMD_MARKS, CMD_PSHIFT, + CMD_SHSTA, CMD_END }; @@ -859,7 +860,23 @@ enum seq_num { // Ensure size is a (multiple of 8)-1 #define SEQ_CODE 15 +#define SICHR(_sif) (((_sif) == SI_EARLYSOCK) ? 'E' : \ + (((_sif) == SI_RELOAD) ? 'R' : \ + (((_sif) == SI_SOCKET) ? 'S' : '?'))) + +// Msg from the socket before startup completed - ignore if it was a DUP +#define SI_EARLYSOCK 1 +// Msg was from reload +#define SI_RELOAD 2 +// Msg from the socket after startup completed +#define SI_SOCKET 4 + +/* An SI_EARLYSOCK item vs an SI_RELOAD item is not considered a DUP + * since the reload reads to the end of the reload file after + * the match between the queue and the reload has been found */ + typedef struct seqitem { + int flags; tv_t cd; // sec:0=missing, usec:0=miss !0=trans tv_t time; char code[SEQ_CODE+1]; @@ -1864,6 +1881,7 @@ extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); extern void tick(); extern PGconn *dbconnect(); +extern void sequence_report(bool lock); // *** // *** ckdb_data.c *** diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 6187d467..5e145ebd 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -5844,6 +5844,24 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, return(buf); } +/* Show a share status report on the console + * Currently: sequence status and OoO info */ +static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) +{ + char ooo_buf[256]; + char buf[256]; + + LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); + + snprintf(buf, sizeof(buf), "ok.%s", cmd); + LOGDEBUG("%s.%s", id, buf); + return strdup(buf); +} + // TODO: limit access by having seperate sockets for each #define ACCESS_POOL "p" #define ACCESS_SYSTEM "s" @@ -5955,5 +5973,6 @@ struct CMDS ckdb_cmds[] = { { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, { CMD_MARKS, "marks", false, false, cmd_marks, SEQ_NONE, ACCESS_SYSTEM }, { CMD_PSHIFT, "pshift", false, false, cmd_pshift, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_SHSTA, "shsta", true, false, cmd_shsta, SEQ_NONE, ACCESS_SYSTEM }, { CMD_END, NULL, false, false, NULL, SEQ_NONE, NULL } }; diff --git a/src/ckdb_data.c b/src/ckdb_data.c index deee3f5d..17e3d431 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -2383,7 +2383,7 @@ void set_block_share_counters() { K_TREE_CTX ctx[1], ctx_ms[1]; K_ITEM *ss_item, ss_look, *ws_item, *wm_item, *ms_item, ms_look; - WORKERSTATUS *workerstatus; + WORKERSTATUS *workerstatus = NULL; SHARESUMMARY *sharesummary, looksharesummary; WORKMARKERS *workmarkers; MARKERSUMMARY *markersummary, lookmarkersummary; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 2c04c4af..be709f60 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3436,7 +3436,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, PGresult *res; K_TREE_CTX ss_ctx[1], ms_ctx[1]; SHARESUMMARY *sharesummary, looksharesummary; - MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary; + MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary = NULL; K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look; K_ITEM *p_ss_item, *p_ms_item; bool ok = false, conned = false; @@ -3865,7 +3865,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE PGresult *res = NULL; WORKMARKERS *wm; SHARESUMMARY *row, *p_row; - K_ITEM *item, *wm_item, *p_item; + K_ITEM *item, *wm_item, *p_item = NULL; char *ins, *upd; bool ok = false, new = false, p_new = false; char *params[19 + MODIFYDATECOUNT]; diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 092a0121..52235247 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -54,15 +54,20 @@ void mkstamp(char *stamp, size_t siz) int main(int argc, char **argv) { - char *name = NULL, *socket_dir = NULL, *buf = NULL; + char *name = NULL, *socket_dir = NULL, *buf = NULL, *sockname = "listener"; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; bool proxy = false; char stamp[128]; int c; - while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) { + while ((c = getopt(argc, argv, "N:n:s:pt:T:")) != -1) { switch(c) { + /* Allows us to specify which process or socket to + * talk to. */ + case 'N': + sockname = strdup(optarg); + break; case 'n': name = strdup(optarg); break; @@ -92,7 +97,7 @@ int main(int argc, char **argv) realloc_strcat(&socket_dir, name); dealloc(name); trail_slash(&socket_dir); - realloc_strcat(&socket_dir, "listener"); + realloc_strcat(&socket_dir, sockname); while (42) { int sockd, len; diff --git a/src/ckpool.c b/src/ckpool.c index fd243e4e..4688c59d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -203,8 +203,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } -static void childsighandler(const int sig); - /* Create a standalone thread that queues received unix messages for a proc * instance and adds them to linked list of received messages with their * associated receive socket, then signal the associated rmsg_cond for the @@ -830,7 +828,7 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid return; LOGWARNING("Old process %s pid %d failed to respond to terminate request, killing", pi->processname, oldpid); - if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 500)) + if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 3000)) quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); } @@ -901,7 +899,7 @@ static void rm_namepid(const proc_instance_t *pi) /* Disable signal handlers for child processes, but simply pass them onto the * parent process to shut down cleanly. */ -static void childsighandler(const int sig) +void childsighandler(const int sig) { signal(sig, SIG_IGN); signal(SIGTERM, SIG_IGN); diff --git a/src/ckpool.h b/src/ckpool.h index 370bce71..9b09625e 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); +void childsighandler(const int sig); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); diff --git a/src/connector.c b/src/connector.c index ff0a75a8..ffaac679 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,12 +29,17 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; + + /* fd cannot be changed while a ref is held */ int fd; /* Reference count for when this instance is used outside of the * connector_data lock */ int ref; + /* Have we disabled this client to be removed when there are no refs? */ + bool invalid; + /* For dead_clients list */ client_instance_t *next; client_instance_t *prev; @@ -58,6 +63,7 @@ struct sender_send { client_instance_t *client; char *buf; int len; + int ofs; }; typedef struct sender_send sender_send_t; @@ -68,6 +74,8 @@ struct connector_data { cklock_t lock; proc_instance_t *pi; + time_t start_time; + /* Array of server fds */ int *serverfd; /* All time count of clients connected */ @@ -93,10 +101,11 @@ struct connector_data { /* For the linked list of pending sends */ sender_send_t *sender_sends; - sender_send_t *delayed_sends; int64_t sends_generated; int64_t sends_delayed; + int64_t sends_queued; + int64_t sends_size; /* For protecting the pending sends list */ mutex_t sender_lock; @@ -219,17 +228,21 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } keep_sockalive(fd); - nolinger_socket(fd); + noblock_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); - client->fd = fd; - event.data.ptr = client; - event.events = EPOLLIN; + ck_wlock(&cdata->lock); + client->id = cdata->client_id++; + HASH_ADD_I64(cdata->clients, id, client); + cdata->nfds++; + ck_wunlock(&cdata->lock); + + event.data.u64 = client->id; + event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); - recycle_client(cdata, client); return 0; } @@ -237,12 +250,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ __inc_instance_ref(client); - - ck_wlock(&cdata->lock); - client->id = cdata->client_id++; - HASH_ADD_I64(cdata->clients, id, client); - cdata->nfds++; - ck_wunlock(&cdata->lock); + client->fd = fd; return 1; } @@ -251,15 +259,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int drop_client(cdata_t *cdata, client_instance_t *client) { int64_t client_id = 0; - int fd; + int fd = -1; ck_wlock(&cdata->lock); - fd = client->fd; - if (fd != -1) { + if (!client->invalid) { + client->invalid = true; client_id = client->id; - - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL); - Close(client->fd); + fd = client->fd; + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the @@ -275,7 +282,22 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } -static void stratifier_drop_client(ckpool_t *ckp, const int64_t id) +/* For sending the drop command to the upstream pool in passthrough mode */ +static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + json_t *val; + char *s; + + JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", + client->address_name, "server", client->server, "method", "mining.term", + "params"); + s = json_dumps(val, 0); + json_decref(val); + send_proc(ckp->generator, s); + free(s); +} + +static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) { char buf[256]; @@ -283,6 +305,11 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id) send_proc(ckp->stratifier, buf); } +static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + stratifier_drop_id(ckp, client->id); +} + /* Invalidate this instance. Remove them from the hashtables we look up * regularly but keep the instances in a linked list until their ref count * drops to zero when we can remove them lazily. Client must hold a reference @@ -293,9 +320,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - if (ckp->passthrough) - goto out; - stratifier_drop_client(ckp, client->id); + if (!ckp->passthrough && !client->passthrough) + stratifier_drop_client(ckp, client); + else if (ckp->passthrough) + generator_drop_client(ckp, client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -304,12 +332,16 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c if (!client->ref) { DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector recycling client %"PRId64, client->id); + /* We only close the client fd once we're sure there + * are no references to it left to prevent fds being + * reused on new and old clients. */ + nolinger_socket(client->fd); + Close(client->fd); __recycle_client(cdata, client); } } ck_wunlock(&cdata->lock); -out: return ret; } @@ -318,52 +350,39 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf); /* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { - int buflen, ret, selfail = 0; ckpool_t *ckp = cdata->ckp; char msg[PAGESIZE], *eol; + int buflen, ret; json_t *val; retry: - /* Select should always return positive after poll unless we have - * been disconnected. On retries, decdatade whether we should do further - * reads based on select readiness and only fail if we get an error. */ - ret = wait_read_select(client->fd, 0); - if (ret < 1) { - if (ret > selfail) - return; - LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + if (unlikely(client->bufofs > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", + client->id, client->fd); invalidate_client(ckp, cdata, client); return; } - selfail = -1; buflen = PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); + /* This read call is non-blocking since the socket is set to O_NOBLOCK */ + ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - /* We should have something to read if called since poll set - * this fd's revents status so if there's nothing it means the - * client has disconnected. */ - LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) + return; + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } client->bufofs += ret; reparse: eol = memchr(client->buf, '\n', client->bufofs); - if (!eol) { - if (unlikely(client->bufofs > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, cdata, client); - return; - } + if (!eol) goto retry; - } /* Do something useful with this message now */ buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); + LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); return; } @@ -387,16 +406,17 @@ reparse: json_getdel_int64(&passthrough_id, val, "client_id"); passthrough_id = (client->id << 32) | passthrough_id; json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); - } else + } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); - json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + } json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ - if (likely(client->fd != -1)) { + if (likely(!client->invalid)) { if (ckp->passthrough) send_proc(ckp->generator, s); else @@ -418,8 +438,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client) - __inc_instance_ref(client); + if (client) { + if (!client->invalid) + __inc_instance_ref(client); + else + client = NULL; + } ck_wunlock(&cdata->lock); return client; @@ -439,19 +463,18 @@ void *receiver(void *arg) epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); - return NULL; + goto out; } serverfds = cdata->ckp->serverurls; /* Add all the serverfds to the epoll */ for (i = 0; i < serverfds; i++) { - /* The small values will be easily identifiable compared to - * pointers */ + /* The small values will be less than the first client ids */ event.data.u64 = i; event.events = EPOLLIN; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); - return NULL; + goto out; } } @@ -480,44 +503,117 @@ void *receiver(void *arg) } continue; } - client = event.data.ptr; - /* Recheck this client still exists in the same form when it - * was queued. */ - client = ref_client_by_id(cdata, client->id); - if (unlikely(!client)) + client = ref_client_by_id(cdata, event.data.u64); + if (unlikely(!client)) { + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; - if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { - /* Client disconnected */ - LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else + } + if (unlikely(client->invalid)) + goto noparse; + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(client->invalid)) + goto noparse; + if (unlikely(event.events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } +noparse: dec_instance_ref(cdata, client); } +out: + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } -/* Use a thread to send queued messages, using select() to only send to sockets - * ready for writing immediately to not delay other messages. */ -void *sender(void *arg) +/* Send a sender_send message and return true if we've finished sending it or + * are unable to send any more. */ +static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) +{ + client_instance_t *client = sender_send->client; + + if (unlikely(client->invalid)) + return true; + + while (sender_send->len) { + int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); + + if (unlikely(ret < 1)) { + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) + return false; + LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", + client->id, client->fd, errno, strerror(errno)); + invalidate_client(ckp, cdata, client); + return true; + } + sender_send->ofs += ret; + sender_send->len -= ret; + } + return true; +} + +static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) +{ + dec_instance_ref(cdata, sender_send->client); + free(sender_send->buf); + free(sender_send); +} + +/* Use a thread to send queued messages, appending them to the sends list and + * iterating over all of them, attempting to send them all non-blocking to + * only send to those clients ready to receive data. */ +static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; + sender_send_t *sends = NULL; ckpool_t *ckp = cdata->ckp; - bool sent = false; rename_proc("csender"); while (42) { - sender_send_t *sender_send, *delayed; - client_instance_t *client; - int ret = 0, fd, ofs = 0; + int64_t sends_queued = 0, sends_size = 0; + sender_send_t *sending, *tmp; + + /* Check all sends to see if they can be written out */ + DL_FOREACH_SAFE(sends, sending, tmp) { + if (send_sender_send(ckp, cdata, sending)) { + DL_DELETE(sends, sending); + clear_sender_send(sending, cdata); + } else { + sends_queued++; + sends_size += sizeof(sender_send_t) + sending->len + 1; + } + } mutex_lock(&cdata->sender_lock); - /* Poll every 10ms if there are no new sends. Re-examine - * delayed sends immediately after a successful send in case - * endless new sends more frequently end up starving the - * delayed sends. */ - if (!cdata->sender_sends && !sent) { + cdata->sends_delayed += sends_queued; + cdata->sends_queued = sends_queued; + cdata->sends_size = sends_size; + /* Poll every 10ms if there are no new sends. */ + if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; ts_t timeout_ts; @@ -525,86 +621,25 @@ void *sender(void *arg) timeraddspec(&timeout_ts, &polltime); cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } - sender_send = cdata->sender_sends; - if (sender_send) - DL_DELETE(cdata->sender_sends, sender_send); - mutex_unlock(&cdata->sender_lock); - - sent = false; - - /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing or have just - * serviced another message successfully. */ - if (!sender_send) { - /* Find a delayed client that needs servicing and set - * ret accordingly. We do not need to use FOREACH_SAFE - * as we break out of the loop as soon as we manipuate - * the list. */ - DL_FOREACH(cdata->delayed_sends, delayed) { - if ((ret = wait_write_select(delayed->client->fd, 0))) { - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); - break; - } - } - /* None found ? */ - if (!sender_send) - continue; - } - client = sender_send->client; - - /* If this socket is not ready to receive data from us, put the - * send back on the tail of the list and decrease the timeout - * to poll to either look for a client that is ready or poll - * select on this one */ - ck_rlock(&cdata->lock); - fd = client->fd; - if (!ret) - ret = wait_write_select(fd, 0); - ck_runlock(&cdata->lock); - - if (ret < 1) { - if (ret < 0) { - LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); - invalidate_client(ckp, cdata, client); - goto contfree; - } - LOGDEBUG("Client %"PRId64" not ready for writes", client->id); - - /* Append it to the tail of the delayed sends list. - * This is the only function that alters it so no - * locking is required. Keep the client ref. */ - DL_APPEND(cdata->delayed_sends, sender_send); - cdata->sends_delayed++; - continue; + if (cdata->sender_sends) { + DL_CONCAT(sends, cdata->sender_sends); + cdata->sender_sends = NULL; } - while (sender_send->len) { - ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); - if (unlikely(ret < 0)) { - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); - invalidate_client(ckp, cdata, client); - break; - } - ofs += ret; - sender_send->len -= ret; - } -contfree: - sent = true; - free(sender_send->buf); - free(sender_send); - dec_instance_ref(cdata, client); + mutex_unlock(&cdata->sender_lock); } - + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(cdata_t *cdata, int64_t id, char *buf) +static void send_client(cdata_t *cdata, const int64_t id, char *buf) { + ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; - int fd = -1, len; + int len; if (unlikely(!buf)) { LOGWARNING("Connector send_client sent a null buffer"); @@ -617,28 +652,36 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) return; } - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) { - fd = client->fd; - /* Grab a reference to this client until the sender_send has - * completed processing. */ - __inc_instance_ref(client); - } - ck_wunlock(&cdata->lock); + /* Grab a reference to this client until the sender_send has + * completed processing. Is this a passthrough subclient ? */ + if (id > 0xffffffffll) { + int64_t client_id, pass_id; - if (unlikely(fd == -1)) { - ckpool_t *ckp = cdata->ckp; - - if (client) { - /* This shouldn't happen */ - LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); - invalidate_client(ckp, cdata, client); - } else { + client_id = id & 0xffffffffll; + pass_id = id >> 32; + /* Make sure the passthrough exists for passthrough subclients */ + client = ref_client_by_id(cdata, pass_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", + pass_id, client_id); + /* Now see if the subclient exists */ + client = ref_client_by_id(cdata, client_id); + if (client) { + invalidate_client(ckp, cdata, client); + dec_instance_ref(cdata, client); + } else + stratifier_drop_id(ckp, id); + free(buf); + return; + } + } else { + client = ref_client_by_id(cdata, id); + if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); + free(buf); + return; } - free(buf); - return; } sender_send = ckzalloc(sizeof(sender_send_t)); @@ -676,7 +719,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void process_client_msg(cdata_t *cdata, const char *buf) { - int64_t client_id64, client_id; + int64_t client_id; json_t *json_msg; char *msg; @@ -687,21 +730,18 @@ static void process_client_msg(cdata_t *cdata, const char *buf) } /* Extract the client id from the json message and remove its entry */ - json_getdel_int64(&client_id64, json_msg, "client_id"); - if (client_id64 > 0xffffffffll) { - int64_t passthrough_id; - - passthrough_id = client_id64 & 0xffffffffll; - client_id = client_id64 >> 32; - json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); - } else - client_id = client_id64; + client_id = json_integer_value(json_object_get(json_msg, "client_id")); + json_object_del(json_msg, "client_id"); + /* Put client_id back in for a passthrough subclient, passing its + * upstream client_id instead of the passthrough's. */ + if (client_id > 0xffffffffll) + json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); json_decref(json_msg); } -static char *connector_stats(cdata_t *cdata) +static char *connector_stats(cdata_t *cdata, const int runtime) { json_t *val = json_object(), *subval; client_instance_t *client; @@ -710,6 +750,10 @@ static char *connector_stats(cdata_t *cdata) int64_t memsize; char *buf; + /* If called in passthrough mode we log stats instead of the stratifier */ + if (runtime) + json_set_int(val, "runtime", runtime); + ck_rlock(&cdata->lock); objects = HASH_COUNT(cdata->clients); memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; @@ -732,68 +776,57 @@ static char *connector_stats(cdata_t *cdata) memsize = 0; mutex_lock(&cdata->sender_lock); - generated = cdata->sends_generated; DL_FOREACH(cdata->sender_sends, send) { objects++; memsize += sizeof(sender_send_t) + send->len + 1; } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); json_set_object(val, "sends", subval); - objects = 0; - memsize = 0; - - mutex_lock(&cdata->sender_lock); - generated = cdata->sends_delayed; - DL_FOREACH(cdata->delayed_sends, send) { - objects++; - memsize += sizeof(sender_send_t) + send->len + 1; - } + JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); mutex_unlock(&cdata->sender_lock); - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); - LOGNOTICE("Connector stats: %s", buf); + if (runtime) + LOGNOTICE("Passthrough:%s", buf); + else + LOGNOTICE("Connector stats: %s", buf); return buf; } static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int64_t client_id64, client_id; unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; - uint8_t test_cycle = 0; - char *buf; + time_t last_stats; + int64_t client_id; int ret = 0; + char *buf; LOGWARNING("%s connector ready", ckp->name); + last_stats = cdata->start_time; retry: + if (ckp->passthrough) { + time_t diff = time(NULL); + + if (diff - last_stats >= 60) { + last_stats = diff; + diff -= cdata->start_time; + buf = connector_stats(cdata, diff); + dealloc(buf); + } + } + if (umsg) { Close(umsg->sockd); free(umsg->buf); dealloc(umsg); } - if (!++test_cycle) { - /* Test for pthread join every 256 messages */ - if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { - LOGEMERG("Connector sender thread shutdown, exiting"); - ret = 1; - goto out; - } - if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { - LOGEMERG("Connector receiver thread shutdown, exiting"); - ret = 1; - goto out; - } - } - do { umsg = get_unix_msg(pi); } while (!umsg); @@ -807,12 +840,14 @@ retry: } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; - ret = sscanf(buf, "dropclient=%"PRId64, &client_id64); - if (unlikely(ret < 0)) { + ret = sscanf(buf, "dropclient=%"PRId64, &client_id); + if (ret < 0) { LOGDEBUG("Connector failed to parse dropclient command: %s", buf); goto retry; } - client_id = client_id64 & 0xffffffffll; + /* A passthrough client, we can't drop this yet */ + if (client_id > 0xffffffffll) + goto retry; client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id); @@ -823,16 +858,16 @@ retry: if (ret >= 0) LOGINFO("Connector dropped client id: %"PRId64, client_id); } else if (cmdmatch(buf, "testclient")) { - ret = sscanf(buf, "testclient=%"PRId64, &client_id64); + ret = sscanf(buf, "testclient=%"PRId64, &client_id); if (unlikely(ret < 0)) { LOGDEBUG("Connector failed to parse testclient command: %s", buf); goto retry; } - client_id = client_id64 & 0xffffffffll; + client_id &= 0xffffffffll; if (client_exists(cdata, client_id)) goto retry; LOGINFO("Connector detected non-existent client id: %"PRId64, client_id); - stratifier_drop_client(ckp, client_id); + stratifier_drop_id(ckp, client_id); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Connector received ping request"); send_unix_msg(umsg->sockd, "pong"); @@ -846,7 +881,7 @@ retry: char *msg; LOGDEBUG("Connector received stats request"); - msg = connector_stats(cdata); + msg = connector_stats(cdata, 0); send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -925,7 +960,9 @@ int connector(proc_instance_t *pi) Close(sockd); goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + /* Set listen backlog to larger than SOMAXCONN in case the + * system configuration supports it */ + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -967,7 +1004,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -982,11 +1019,14 @@ int connector(proc_instance_t *pi) cklock_init(&cdata->lock); cdata->pi = pi; cdata->nfds = 0; - cdata->client_id = 1; + /* Set the client id to the highest serverurl count to distinguish + * them from the server fds in epoll. */ + cdata->client_id = ckp->serverurls; mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + cdata->start_time = time(NULL); create_unix_receiver(pi); diff --git a/src/klist.c b/src/klist.c index 65e30448..1254628d 100644 --- a/src/klist.c +++ b/src/klist.c @@ -9,11 +9,36 @@ #include "klist.h" +#define _CHKLIST(_list, _name) do {\ + if (!_list) { \ + quithere(1, "%s() can't process a NULL " _name \ + KLIST_FFL, \ + __func__, KLIST_FFL_PASS); \ + } \ + } while (0); + +#define CHKLIST(__list) _CHKLIST(__list, "list") + +#define CHKLS(__list) _CHKLIST(__list, "list/store") + +#define _CHKITEM(_item, _list, _name) do {\ + if (!_item) { \ + quithere(1, "%s() can't process a NULL %s " _name \ + KLIST_FFL, \ + __func__, _list->name, \ + KLIST_FFL_PASS); \ + } \ + } while (0); + +#define CHKITEM(__item, __list) _CHKITEM(__item, __list, "item") + static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; int allocate, i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s store can't %s()" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -76,10 +101,12 @@ static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS) } } -K_STORE *k_new_store(K_LIST *list) +K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS) { K_STORE *store; + CHKLIST(list); + store = calloc(1, sizeof(*store)); if (!store) quithere(1, "Failed to calloc store for %s", list->name); @@ -136,6 +163,8 @@ K_ITEM *_k_unlink_head(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + if (!(list->head) && !(list->is_store)) k_alloc_items(list, KLIST_FFL_PASS); @@ -163,6 +192,8 @@ K_ITEM *_k_unlink_head_zero(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + item = _k_unlink_head(list, KLIST_FFL_PASS); if (item) @@ -176,6 +207,8 @@ K_ITEM *_k_unlink_tail(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + if (!(list->do_tail)) { quithere(1, "List %s can't %s() - do_tail is false" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -200,6 +233,10 @@ K_ITEM *_k_unlink_tail(K_LIST *list, KLIST_FFL_ARGS) void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -229,6 +266,10 @@ void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) /* slows it down (of course) - only for debugging void _k_free_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + memset(item->data, 0xff, list->siz); _k_add_head(list, item, KLIST_FFL_PASS); } @@ -236,6 +277,10 @@ void _k_free_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -265,8 +310,51 @@ void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) list->count_up++; } +// Insert item into the list next after 'after' +void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, KLIST_FFL_ARGS) +{ + CHKLS(list); + + CHKITEM(item, list); + + _CHKITEM(item, after, "after"); + + if (item->name != list->name) { + quithere(1, "List %s can't %s() a %s item" KLIST_FFL, + list->name, __func__, item->name, KLIST_FFL_PASS); + } + + if (after->name != list->name) { + quithere(1, "List %s can't %s() a %s after" KLIST_FFL, + list->name, __func__, item->name, KLIST_FFL_PASS); + } + + if (item->prev || item->next) { + quithere(1, "%s() added item %s still linked" KLIST_FFL, + __func__, item->name, KLIST_FFL_PASS); + } + + item->prev = after; + item->next = after->next; + if (item->next) + item->next->prev = item; + after->next = item; + + if (list->do_tail) { + if (list->tail == after) + list->tail = item; + } + + list->count++; + list->count_up++; +} + void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -293,6 +381,10 @@ void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) { + _CHKLIST(from, "from list/store"); + + _CHKLIST(to, "to list/store"); + if (from->name != to->name) { quithere(1, "List %s can't %s() to a %s list" KLIST_FFL, from->name, __func__, to->name, KLIST_FFL_PASS); @@ -323,6 +415,10 @@ void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) { + _CHKLIST(from, "from list/store"); + + _CHKLIST(to, "to list/store"); + if (from->name != to->name) { quithere(1, "List %s can't %s() to a %s list" KLIST_FFL, from->name, __func__, to->name, KLIST_FFL_PASS); @@ -355,6 +451,8 @@ K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS) { int i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s can't %s() a store" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -379,6 +477,8 @@ K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS) K_STORE *_k_free_store(K_STORE *store, KLIST_FFL_ARGS) { + _CHKLIST(store, "store"); + if (!(store->is_store)) { quithere(1, "Store %s can't %s() the list" KLIST_FFL, store->name, __func__, KLIST_FFL_PASS); @@ -394,6 +494,8 @@ void _k_cull_list(K_LIST *list, KLIST_FFL_ARGS) { int i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s can't %s() a store" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); diff --git a/src/klist.h b/src/klist.h index 26aaf303..602bdac1 100644 --- a/src/klist.h +++ b/src/klist.h @@ -71,7 +71,8 @@ typedef struct k_list { // Upgrade I to W #define K_ULOCK(_list) ck_ulock(_list->lock) -extern K_STORE *k_new_store(K_LIST *list); +extern K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS); +#define k_new_store(_list) _k_new_store(_list, KLIST_FFL_HERE) extern K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit, bool do_tail, KLIST_FFL_ARGS); #define k_new_list(_name, _siz, _allocate, _limit, _do_tail) _k_new_list(_name, _siz, _allocate, _limit, _do_tail, KLIST_FFL_HERE) extern K_ITEM *_k_unlink_head(K_LIST *list, KLIST_FFL_ARGS); @@ -86,11 +87,13 @@ extern void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_free_head(__list, __item) _k_add_head(__list, __item, KLIST_FFL_HERE) extern void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_add_tail(_list, _item) _k_add_tail(_list, _item, KLIST_FFL_HERE) +extern void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, KLIST_FFL_ARGS); +#define k_insert_after(_list, _item, _after) _k_insert_after(_list, _item, _after, KLIST_FFL_HERE) extern void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_unlink_item(_list, _item) _k_unlink_item(_list, _item, KLIST_FFL_HERE) -void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); +extern void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); #define k_list_transfer_to_head(_from, _to) _k_list_transfer_to_head(_from, _to, KLIST_FFL_HERE) -void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); +extern void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); #define k_list_transfer_to_tail(_from, _to) _k_list_transfer_to_tail(_from, _to, KLIST_FFL_HERE) extern K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS); #define k_free_list(_list) _k_free_list(_list, KLIST_FFL_HERE) diff --git a/src/libckpool.c b/src/libckpool.c index 5bbb14ee..813ca7c5 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -893,27 +893,32 @@ int wait_close(int sockd, int timeout) if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); if (ret < 1) return 0; - return sfd.revents & POLLHUP; + return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } /* Emulate a select read wait for high fds that select doesn't support */ int wait_read_select(int sockd, int timeout) { struct pollfd sfd; + int ret = -1; if (unlikely(sockd < 0)) - return -1; + goto out; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLIN | POLLRDHUP; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret && !(sfd.revents & POLLIN)) + ret = -1; +out: + return ret; } int read_length(int sockd, void *buf, int len) @@ -983,14 +988,19 @@ out: int wait_write_select(int sockd, int timeout) { struct pollfd sfd; + int ret = -1; if (unlikely(sockd < 0)) - return -1; + goto out; sfd.fd = sockd; - sfd.events = POLLOUT; + sfd.events = POLLOUT | POLLRDHUP; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret && !(sfd.revents & POLLOUT)) + ret = -1; +out: + return ret; } int write_length(int sockd, const void *buf, int len) diff --git a/src/stratifier.c b/src/stratifier.c index e0623908..d9182035 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1934,17 +1934,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (client->workername) { if (user) { - ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s", - client->id, client->address, user->throttled ? "throttled " : "", - user->username, client->workername, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", + client->id, client->address, user->throttled ? "throttled " : "", + user->username, client->workername, lazily ? "lazily" : ""); } else { - ASPRINTF(msg, "Client %"PRId64" %s no user worker %s dropped %s", - client->id, client->address, client->workername, - lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s", + client->id, client->address, client->workername, + lazily ? "lazily" : ""); } } else { - ASPRINTF(msg, "Workerless client %"PRId64" %s dropped %s", - client->id, client->address, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s", + client->id, client->address, lazily ? "lazily" : ""); } __del_client(sdata, client); __kill_instance(sdata, client); @@ -1998,7 +1998,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id, - const char *address, const int server) + const char *address, int server) { stratum_instance_t *client; sdata_t *sdata = ckp->data; @@ -2008,6 +2008,9 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i client->id = id; client->session_id = ++sdata->session_id; strcpy(client->address, address); + /* Sanity check to not overflow lookup in ckp->serverurl[] */ + if (server >= ckp->serverurls) + server = 0; client->server = server; client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; @@ -2192,11 +2195,8 @@ static void request_reconnect(sdata_t *sdata, const char *cmd) if (port) url = strsep(&port, ","); if (url && port) { - int port_no; - - port_no = strtol(port, NULL, 10); - JSON_CPACK(json_msg, "{sosss[sii]}", "id", json_null(), "method", "client.reconnect", - "params", url, port_no, 0); + JSON_CPACK(json_msg, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect", + "params", url, port, 0); } else JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); @@ -3836,8 +3836,7 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); client->suggest_diff = worker->mindiff; - if (!user->auth_time) - user->auth_time = time(NULL); + user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || !safecmp(response, "ok.addrauth"))) { @@ -4691,7 +4690,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va { json_t *val; - JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); + JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); stratum_add_send(sdata, val, client_id); } @@ -4795,8 +4794,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j } /* Enter with client holding ref count */ -static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, - json_t *id_val, json_t *method_val, json_t *params_val) +static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, + const int64_t client_id, json_t *id_val, json_t *method_val, + json_t *params_val) { const char *method; @@ -4838,21 +4838,21 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (unlikely(cmdmatch(method, "mining.passthrough"))) { char buf[256]; - LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); /* We need to inform the connector process that this client - * is a passthrough and to manage its messages accordingly. - * The client_id stays on the list but we won't send anything - * to it since it's unauthorised. Set the flag just in case. */ - client->authorised = false; + * is a passthrough and to manage its messages accordingly. No + * data from this client id should ever come back to this + * stratifier after this so drop the client in the stratifier. */ + LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); snprintf(buf, 255, "passthrough=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + send_proc(ckp->connector, buf); + drop_client(ckp, sdata, client_id); return; } /* We should only accept subscribed requests from here on */ if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64" %s", client_id, client->address); - connector_drop_client(client->ckp, client_id); + connector_drop_client(ckp, client_id); return; } @@ -4871,11 +4871,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 /* We should only accept authorised requests from here on */ if (!client->authorised) { - /* Dropping unauthorised clients here also allows the - * stratifier process to restart since it will have lost all - * the stratum instance data. Clients will just reconnect. */ - LOGINFO("Dropping unauthorised client %"PRId64" %s", client_id, client->address); - connector_drop_client(client->ckp, client_id); + LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method, + client_id, client->address); return; } @@ -4891,6 +4888,13 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 ckmsgq_add(sdata->stxnq, jp); return; } + + if (cmdmatch(method, "mining.term")) { + LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + drop_client(ckp, sdata, client_id); + return; + } + /* Unhandled message here */ LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); return; @@ -4945,7 +4949,7 @@ static void parse_instance_msg(ckpool_t *ckp, sdata_t *sdata, smsg_t *msg, strat send_json_err(sdata, client_id, id_val, "-1:params not found"); goto out; } - parse_method(sdata, client, client_id, id_val, method, params); + parse_method(ckp, sdata, client, client_id, id_val, method, params); out: free_smsg(msg); } @@ -5055,10 +5059,18 @@ static void discard_json_params(json_params_t *jp) { json_decref(jp->method); json_decref(jp->params); - json_decref(jp->id_val); + if (jp->id_val) + json_decref(jp->id_val); free(jp); } +static void steal_json_id(json_t *val, json_params_t *jp) +{ + /* Steal the id_val as is to avoid a copy */ + json_object_set_new_nocheck(val, "id", jp->id_val); + jp->id_val = NULL; +} + static void sshare_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; @@ -5081,7 +5093,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) result_val = parse_submit(client, json_msg, jp->params, &err_val); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); out_decref: dec_instance_ref(sdata, client); @@ -5143,7 +5155,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); if (!json_is_true(result_val) || !client->suggest_diff) @@ -5307,7 +5319,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out; } val = json_object(); - json_object_set_nocheck(val, "id", jp->id_val); + steal_json_id(val, jp); if (cmdmatch(msg, "mining.get_transactions")) { int txns; @@ -5998,7 +6010,8 @@ int stratifier(proc_instance_t *pi) } mutex_init(&sdata->stats_lock); - create_pthread(&pth_statsupdate, statsupdate, ckp); + if (!ckp->passthrough) + create_pthread(&pth_statsupdate, statsupdate, ckp); mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock);