diff --git a/pool/page_reset.php b/pool/page_reset.php index 3a245258..0c6e878c 100644 --- a/pool/page_reset.php +++ b/pool/page_reset.php @@ -88,9 +88,9 @@ function dbreset() if ($emailinfo['STATUS'] != 'ok') syserror(); - $ans = resetPass($user, $pass); + $ans = resetPass($user, $pass, $twofa); if ($ans['STATUS'] != 'ok') - syserror(); + return resetfail(); unset($_SESSION['reset_user']); unset($_SESSION['reset_hash']); diff --git a/src/ckdb.c b/src/ckdb.c index 299c177e..a19a197d 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -112,7 +112,9 @@ static bool socketer_using_data; static bool summariser_using_data; static bool marker_using_data; static bool logger_using_data; -static bool listener_using_data; +static bool plistener_using_data; +static bool clistener_using_data; +static bool blistener_using_data; char *EMPTY = ""; const char *nullstr = "(null)"; @@ -303,7 +305,9 @@ K_STORE *msgline_store; // WORKQUEUE K_LIST *workqueue_free; -K_STORE *workqueue_store; +K_STORE *pool_workqueue_store; +K_STORE *cmd_workqueue_store; +K_STORE *btc_workqueue_store; mutex_t wq_waitlock; pthread_cond_t wq_waitcond; @@ -1011,7 +1015,9 @@ static void alloc_storage() workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); - workqueue_store = k_new_store(workqueue_free); + pool_workqueue_store = k_new_store(workqueue_free); + cmd_workqueue_store = k_new_store(workqueue_free); + btc_workqueue_store = k_new_store(workqueue_free); heartbeatqueue_free = k_new_list("HeartBeatQueue", sizeof(HEARTBEATQUEUE), @@ -1507,7 +1513,10 @@ static void dealloc_storage() FREE_LIST(transfer); FREE_LISTS(heartbeatqueue); - FREE_LISTS(workqueue); + FREE_STORE(pool_workqueue); + FREE_STORE(cmd_workqueue); + FREE_STORE(btc_workqueue); + FREE_LIST(workqueue); FREE_LISTS(msgline); if (free_mode != FREE_MODE_ALL) @@ -2877,7 +2886,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) { if (transfer->mvalue != transfer->svalue) FREENULL(transfer->mvalue); + K_WLOCK(transfer_free); k_add_head(transfer_free, t_item); + K_WUNLOCK(transfer_free); } else { add_to_ktree_nolock(msgline->trf_root, t_item); k_add_head_nolock(msgline->trf_store, t_item); @@ -3922,33 +3933,138 @@ static void *logger(__maybe_unused void *arg) return NULL; } -#define STORELASTREPLY(_cmd) do { \ - if (last_ ## _cmd) \ - free(last_ ## _cmd); \ - last_ ## _cmd = buf; \ - buf = NULL; \ - if (reply_ ## _cmd) \ - free(reply_ ## _cmd); \ - reply_ ## _cmd = rep; \ - } while (0) +static void process_sockd(PGconn *conn, K_ITEM *wq_item) +{ + WORKQUEUE *workqueue; + MSGLINE *msgline; + K_ITEM *ml_item; + char *ans, *rep; + size_t siz; + + DATA_WORKQUEUE(workqueue, wq_item); + ml_item = workqueue->msgline_item; + DATA_MSGLINE(msgline, ml_item); + + ans = ckdb_cmds[msgline->which_cmds].func(conn, + msgline->cmd, + msgline->id, + &(msgline->now), + workqueue->by, + workqueue->code, + workqueue->inet, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + msgline->now.tv_sec, ans); + send_unix_msg(msgline->sockd, rep); + close(msgline->sockd); + FREENULL(ans); + FREENULL(rep); + + free_msgline_data(ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq_item); + if (workqueue_free->count == workqueue_free->total && + workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE) + k_cull_list(workqueue_free); + K_WUNLOCK(workqueue_free); +} + +static void *clistener(__maybe_unused void *arg) +{ + PGconn *conn = NULL; + K_ITEM *wq_item; + time_t now; + + LOCK_INIT("db_clistener"); + rename_proc("db_clistener"); + + clistener_using_data = true; + + conn = dbconnect(); + now = time(NULL); + + while (!everyone_die) { + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(cmd_workqueue_store); + K_WUNLOCK(workqueue_free); + + // Don't keep a connection for more than ~10s + if ((time(NULL) - now) > 10) { + PQfinish(conn); + conn = dbconnect(); + now = time(NULL); + } + + if (wq_item) { + process_sockd(conn, wq_item); + tick(); + } else + cksleep_ms(42); + } + + clistener_using_data = false; + + if (conn) + PQfinish(conn); + + return NULL; +} + +static void *blistener(__maybe_unused void *arg) +{ + PGconn *conn = NULL; + K_ITEM *wq_item; + time_t now; + + LOCK_INIT("db_blistener"); + rename_proc("db_blistener"); + + blistener_using_data = true; + + conn = dbconnect(); + now = time(NULL); + + while (!everyone_die) { + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(btc_workqueue_store); + K_WUNLOCK(workqueue_free); + + // Don't keep a connection for more than ~10s + if ((time(NULL) - now) > 10) { + PQfinish(conn); + conn = dbconnect(); + now = time(NULL); + } + + if (wq_item) { + process_sockd(conn, wq_item); + tick(); + } else + cksleep_ms(142); + } + + blistener_using_data = false; + + if (conn) + PQfinish(conn); + + return NULL; +} static void *socketer(__maybe_unused void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; + pthread_t clis_pt, blis_pt; unixsock_t *us = &pi->us; - char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; - // No dup check for pool stats, the SEQ code will handle that - char *last_chkpass = NULL, *reply_chkpass = NULL; - char *last_adduser = NULL, *reply_adduser = NULL; - char *last_newpass = NULL, *reply_newpass = NULL; - char *last_userset = NULL, *reply_userset = NULL; - char *last_workerset = NULL, *reply_workerset = NULL; - char *last_newid = NULL, *reply_newid = NULL; - char *last_setatts = NULL, *reply_setatts = NULL; - char *last_setopts = NULL, *reply_setopts = NULL; - char *last_userstatus = NULL, *reply_userstatus = NULL; - char *last_web = NULL, *reply_web = NULL; - char *reply_last, duptype[CMD_SIZ+1]; + char *end, *ans = NULL, *rep = NULL, *buf = NULL; enum cmd_values cmdnum; int sockd; K_ITEM *wq_item = NULL, *ml_item = NULL; @@ -3957,7 +4073,7 @@ static void *socketer(__maybe_unused void *arg) char reply[1024+1]; size_t siz; tv_t now; - bool dup, want_first, show_dup, replied; + bool want_first, replied, btc; int loglevel, oldloglevel; pthread_detach(pthread_self()); @@ -3971,6 +4087,10 @@ static void *socketer(__maybe_unused void *arg) if (!everyone_die) { LOGWARNING("%s() Start processing...", __func__); socketer_using_data = true; + + create_pthread(&clis_pt, clistener, NULL); + + create_pthread(&blis_pt, blistener, NULL); } want_first = true; @@ -3979,7 +4099,7 @@ static void *socketer(__maybe_unused void *arg) dealloc(buf); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { - LOGERR("Failed to accept on socket in listener"); + LOGERR("%s() Failed to accept on socket", __func__); break; } @@ -3997,380 +4117,171 @@ static void *socketer(__maybe_unused void *arg) if (!buf || !*buf) { // An empty message wont get a reply if (!buf) - LOGWARNING("Failed to get message in listener"); + LOGWARNING("%s() Failed to get message", __func__); else - LOGWARNING("Empty message in listener"); + LOGWARNING("%s() Empty message", __func__); } else { - /* For duplicates: - * Queued pool messages are handled by the queue code - * but since they reply ok.queued that message can - * be returned every time here - * System: repeat process them - * Web: current php web sends a timestamp of seconds - * so duplicate code will only trigger if the same - * message is sent within the same second and thus - * will effectively reduce the processing load for - * sequential duplicates - * As per the 'if' list below, - * remember individual last messages and replies and - * repeat the reply without reprocessing the message - * The rest are remembered in the same buffer 'web' - * so a duplicate will not be seen if another 'web' - * command arrived between two duplicate commands - */ - dup = false; - show_dup = true; - // These are ordered approximately most likely first - if (last_chkpass && strcmp(last_chkpass, buf) == 0) { - reply_last = reply_chkpass; - dup = true; - } else if (last_adduser && strcmp(last_adduser, buf) == 0) { - reply_last = reply_adduser; - dup = true; - } else if (last_newpass && strcmp(last_newpass, buf) == 0) { - reply_last = reply_newpass; - dup = true; - } else if (last_newid && strcmp(last_newid, buf) == 0) { - reply_last = reply_newid; - dup = true; - } else if (last_userset && strcmp(last_userset, buf) == 0) { - reply_last = reply_userset; - dup = true; - } else if (last_workerset && strcmp(last_workerset, buf) == 0) { - reply_last = reply_workerset; - dup = true; - } else if (last_setatts && strcmp(last_setatts, buf) == 0) { - reply_last = reply_setatts; - dup = true; - } else if (last_setopts && strcmp(last_setopts, buf) == 0) { - reply_last = reply_setopts; - dup = true; - } else if (last_userstatus && strcmp(last_userstatus, buf) == 0) { - reply_last = reply_userstatus; - dup = true; - } else if (last_web && strcmp(last_web, buf) == 0) { - reply_last = reply_web; - dup = true; - show_dup = false; - } - if (dup) { - send_unix_msg(sockd, reply_last); - STRNCPY(duptype, buf); - dot = strchr(duptype, '.'); - if (dot) - *dot = '\0'; - snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", - LOGDUP, now.tv_sec, now.tv_usec, duptype); - // dup cant be pool - LOGQUE(reply, false); - if (show_dup) - LOGWARNING("Duplicate '%s' message received", duptype); - else - LOGDEBUG("Duplicate '%s' message received", duptype); - } else { - int seqentryflags = SE_SOCKET; - if (!reload_queue_complete) - seqentryflags = SE_EARLYSOCK; - cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); - DATA_MSGLINE(msgline, ml_item); - replied = false; - switch (cmdnum) { - case CMD_REPLY: - snprintf(reply, sizeof(reply), - "%s.%ld.?.", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_TERMINATE: - LOGWARNING("Listener received" - " terminate message," - " terminating ckdb"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.exiting", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - everyone_die = true; - break; - case CMD_PING: - LOGDEBUG("Listener received ping" - " request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.pong", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_VERSION: + int seqentryflags = SE_SOCKET; + if (!reload_queue_complete) + seqentryflags = SE_EARLYSOCK; + cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); + DATA_MSGLINE(msgline, ml_item); + replied = btc = false; + switch (cmdnum) { + case CMD_REPLY: + snprintf(reply, sizeof(reply), + "%s.%ld.?.", + msgline->id, + now.tv_sec); + send_unix_msg(sockd, reply); + break; + case CMD_TERMINATE: + LOGWARNING("Listener received" + " terminate message," + " terminating ckdb"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.exiting", + msgline->id, + now.tv_sec); + send_unix_msg(sockd, reply); + everyone_die = true; + break; + case CMD_PING: + LOGDEBUG("Listener received ping" + " request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.pong", + msgline->id, + now.tv_sec); + send_unix_msg(sockd, reply); + break; + case CMD_VERSION: + LOGDEBUG("Listener received" + " version request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.CKDB V%s", + msgline->id, + now.tv_sec, + CKDB_VERSION); + send_unix_msg(sockd, reply); + break; + case CMD_LOGLEVEL: + if (!*(msgline->id)) { LOGDEBUG("Listener received" - " version request"); + " loglevel, currently %d", + pi->ckp->loglevel); snprintf(reply, sizeof(reply), - "%s.%ld.ok.CKDB V%s", + "%s.%ld.ok.loglevel" + " currently %d", msgline->id, now.tv_sec, - CKDB_VERSION); - send_unix_msg(sockd, reply); - break; - case CMD_LOGLEVEL: - if (!*(msgline->id)) { - LOGDEBUG("Listener received" - " loglevel, currently %d", - pi->ckp->loglevel); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " currently %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel); - } else { - oldloglevel = pi->ckp->loglevel; - loglevel = atoi(msgline->id); - LOGDEBUG("Listener received loglevel" - " %d currently %d A", - loglevel, oldloglevel); - if (loglevel < LOG_EMERG || - loglevel > LOG_DEBUG) { - snprintf(reply, sizeof(reply), - "%s.%ld.ERR.invalid" - " loglevel %d" - " - currently %d", - msgline->id, - now.tv_sec, - loglevel, - oldloglevel); - } else { - pi->ckp->loglevel = loglevel; - snprintf(reply, sizeof(reply), - "%s.%ld.ok.loglevel" - " now %d - was %d", - msgline->id, - now.tv_sec, - pi->ckp->loglevel, - oldloglevel); - } - // Do this twice since the loglevel may have changed - LOGDEBUG("Listener received loglevel" - " %d currently %d B", - loglevel, oldloglevel); - } - send_unix_msg(sockd, reply); - break; - case CMD_FLUSH: - LOGDEBUG("Listener received" - " flush request"); - snprintf(reply, sizeof(reply), - "%s.%ld.ok.splash", - msgline->id, now.tv_sec); - send_unix_msg(sockd, reply); - fflush(stdout); - fflush(stderr); - if (global_ckp && global_ckp->logfd) - fflush(global_ckp->logfp); - break; - case CMD_CHKPASS: - case CMD_2FA: - case CMD_ADDUSER: - case CMD_NEWPASS: - case CMD_USERSET: - case CMD_WORKERSET: - case CMD_GETATTS: - case CMD_SETATTS: - case CMD_EXPATTS: - case CMD_GETOPTS: - case CMD_SETOPTS: - case CMD_BLOCKLIST: - case CMD_NEWID: - case CMD_STATS: - case CMD_USERSTATUS: - case CMD_SHSTA: - case CMD_USERINFO: - case CMD_BTCSET: - case CMD_LOCKS: - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", - msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - switch (cmdnum) { - case CMD_CHKPASS: - STORELASTREPLY(chkpass); - break; - case CMD_ADDUSER: - STORELASTREPLY(adduser); - break; - case CMD_NEWPASS: - STORELASTREPLY(newpass); - break; - case CMD_USERSET: - STORELASTREPLY(userset); - break; - case CMD_WORKERSET: - STORELASTREPLY(workerset); - break; - case CMD_NEWID: - STORELASTREPLY(newid); - break; - case CMD_SETATTS: - STORELASTREPLY(setatts); - break; - case CMD_SETOPTS: - STORELASTREPLY(setopts); - break; - case CMD_USERSTATUS: - STORELASTREPLY(userstatus); - break; - // The rest - default: - free(rep); - } - rep = NULL; - break; - // Process, but reject (loading) until startup_complete - case CMD_HOMEPAGE: - case CMD_ALLUSERS: - case CMD_WORKERS: - case CMD_PAYMENTS: - case CMD_PPLNS: - case CMD_PPLNS2: - case CMD_PAYOUTS: - case CMD_MPAYOUTS: - case CMD_SHIFTS: - case CMD_PSHIFT: - case CMD_DSP: - case CMD_BLOCKSTATUS: - if (!startup_complete) { + pi->ckp->loglevel); + } else { + oldloglevel = pi->ckp->loglevel; + loglevel = atoi(msgline->id); + LOGDEBUG("Listener received loglevel" + " %d currently %d A", + loglevel, oldloglevel); + if (loglevel < LOG_EMERG || + loglevel > LOG_DEBUG) { snprintf(reply, sizeof(reply), - "%s.%ld.loading.%s", + "%s.%ld.ERR.invalid" + " loglevel %d" + " - currently %d", msgline->id, now.tv_sec, - msgline->cmd); - send_unix_msg(sockd, reply); + loglevel, + oldloglevel); } else { - DATA_MSGLINE(msgline, ml_item); - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", - msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - if (cmdnum == CMD_DSP) - free(rep); - else { - if (last_web) - free(last_web); - last_web = buf; - buf = NULL; - if (reply_web) - free(reply_web); - reply_web = rep; - } - rep = NULL; - } - break; - /* Process, but reject (loading) until startup_complete - * and don't test for duplicates */ - case CMD_MARKS: - case CMD_QUERY: - if (!startup_complete) { + pi->ckp->loglevel = loglevel; snprintf(reply, sizeof(reply), - "%s.%ld.loading.%s", + "%s.%ld.ok.loglevel" + " now %d - was %d", msgline->id, now.tv_sec, - msgline->cmd); - send_unix_msg(sockd, reply); - } else { - DATA_MSGLINE(msgline, ml_item); - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", - msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - FREENULL(rep); - } - break; - // Always process immediately: - case CMD_AUTH: - case CMD_ADDRAUTH: - case CMD_HEARTBEAT: - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); + pi->ckp->loglevel, + oldloglevel); } - DATA_MSGLINE(msgline, ml_item); - ans = ckdb_cmds[msgline->which_cmds].func(NULL, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root); - siz = strlen(ans) + strlen(msgline->id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", + // Do this twice since the loglevel may have changed + LOGDEBUG("Listener received loglevel" + " %d currently %d B", + loglevel, oldloglevel); + } + send_unix_msg(sockd, reply); + break; + case CMD_FLUSH: + LOGDEBUG("Listener received" + " flush request"); + snprintf(reply, sizeof(reply), + "%s.%ld.ok.splash", + msgline->id, now.tv_sec); + send_unix_msg(sockd, reply); + fflush(stdout); + fflush(stderr); + if (global_ckp && global_ckp->logfd) + fflush(global_ckp->logfp); + break; + case CMD_USERSET: + case CMD_BTCSET: + btc = true; + case CMD_CHKPASS: + case CMD_2FA: + case CMD_ADDUSER: + case CMD_NEWPASS: + case CMD_WORKERSET: + case CMD_GETATTS: + case CMD_SETATTS: + case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: + case CMD_BLOCKLIST: + case CMD_NEWID: + case CMD_STATS: + case CMD_USERSTATUS: + case CMD_SHSTA: + case CMD_USERINFO: + case CMD_LOCKS: + msgline->sockd = sockd; + sockd = -1; + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); + K_WUNLOCK(workqueue_free); + wq_item = ml_item = NULL; + break; + // Process, but reject (loading) until startup_complete + case CMD_HOMEPAGE: + case CMD_ALLUSERS: + case CMD_WORKERS: + case CMD_PAYMENTS: + case CMD_PPLNS: + case CMD_PPLNS2: + case CMD_PAYOUTS: + case CMD_MPAYOUTS: + case CMD_SHIFTS: + case CMD_PSHIFT: + case CMD_DSP: + case CMD_BLOCKSTATUS: + case CMD_MARKS: + case CMD_QUERY: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", msgline->id, - now.tv_sec, ans); - send_unix_msg(sockd, rep); - FREENULL(ans); - replied = true; - // Always queue (ok.queued) - case CMD_SHARELOG: - case CMD_POOLSTAT: - case CMD_USERSTAT: - case CMD_WORKERSTAT: - case CMD_BLOCK: - if (!replied) { - // First message from the pool - if (want_first) { - want_first = false; - ck_wlock(&fpm_lock); - first_pool_message = strdup(buf); - ck_wunlock(&fpm_lock); - } - snprintf(reply, sizeof(reply), - "%s.%ld.ok.queued", - msgline->id, - now.tv_sec); - send_unix_msg(sockd, reply); - } - + now.tv_sec, + msgline->cmd); + send_unix_msg(sockd, reply); + } else { + msgline->sockd = sockd; + sockd = -1; K_WLOCK(workqueue_free); wq_item = k_unlink_head(workqueue_free); DATA_WORKQUEUE(workqueue, wq_item); @@ -4378,43 +4289,108 @@ static void *socketer(__maybe_unused void *arg) workqueue->by = by_default; workqueue->code = (char *)__func__; workqueue->inet = inet_default; - k_add_tail(workqueue_store, wq_item); - /* Stop the reload queue from growing too big - * Use a size that should be big enough */ - if (reloading && workqueue_store->count > 250000) { - K_ITEM *wq2_item = k_unlink_head(workqueue_store); - K_WUNLOCK(workqueue_free); - WORKQUEUE *wq; - DATA_WORKQUEUE(wq, wq2_item); - K_ITEM *ml_item = wq->msgline_item; - free_msgline_data(ml_item, true, false); - K_WLOCK(msgline_free); - k_add_head(msgline_free, ml_item); - K_WUNLOCK(msgline_free); - K_WLOCK(workqueue_free); - k_add_head(workqueue_free, wq2_item); - } + if (btc) + k_add_tail(btc_workqueue_store, wq_item); + else + k_add_tail(cmd_workqueue_store, wq_item); K_WUNLOCK(workqueue_free); - ml_item = NULL; - mutex_lock(&wq_waitlock); - pthread_cond_signal(&wq_waitcond); - mutex_unlock(&wq_waitlock); - break; - // Code error - default: - LOGEMERG("%s() CODE ERROR unhandled" - " message %d %.32s...", - __func__, cmdnum, buf); + wq_item = ml_item = NULL; + } + break; + // Always process immediately: + case CMD_AUTH: + case CMD_ADDRAUTH: + case CMD_HEARTBEAT: + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(buf); + ck_wunlock(&fpm_lock); + } + DATA_MSGLINE(msgline, ml_item); + ans = ckdb_cmds[msgline->which_cmds].func(NULL, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root); + siz = strlen(ans) + strlen(msgline->id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", + msgline->id, + now.tv_sec, ans); + send_unix_msg(sockd, rep); + FREENULL(ans); + replied = true; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + if (!replied) { + // First message from the pool + if (want_first) { + want_first = false; + ck_wlock(&fpm_lock); + first_pool_message = strdup(buf); + ck_wunlock(&fpm_lock); + } snprintf(reply, sizeof(reply), - "%s.%ld.failed.code", + "%s.%ld.ok.queued", msgline->id, now.tv_sec); send_unix_msg(sockd, reply); - break; - } + } + + K_WLOCK(workqueue_free); + wq_item = k_unlink_head(workqueue_free); + DATA_WORKQUEUE(workqueue, wq_item); + workqueue->msgline_item = ml_item; + workqueue->by = by_default; + workqueue->code = (char *)__func__; + workqueue->inet = inet_default; + k_add_tail(pool_workqueue_store, wq_item); + /* Stop the reload queue from growing too big + * Use a size that should be big enough */ + if (reloading && pool_workqueue_store->count > 250000) { + K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); + K_WUNLOCK(workqueue_free); + WORKQUEUE *wq; + DATA_WORKQUEUE(wq, wq2_item); + K_ITEM *ml_item = wq->msgline_item; + free_msgline_data(ml_item, true, false); + K_WLOCK(msgline_free); + k_add_head(msgline_free, ml_item); + K_WUNLOCK(msgline_free); + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq2_item); + } + K_WUNLOCK(workqueue_free); + wq_item = ml_item = NULL; + mutex_lock(&wq_waitlock); + pthread_cond_signal(&wq_waitcond); + mutex_unlock(&wq_waitlock); + break; + // Code error + default: + LOGEMERG("%s() CODE ERROR unhandled" + " message %d %.32s...", + __func__, cmdnum, buf); + snprintf(reply, sizeof(reply), + "%s.%ld.failed.code", + msgline->id, + now.tv_sec); + send_unix_msg(sockd, reply); + break; } } - close(sockd); + if (sockd >= 0) + close(sockd); if (ml_item) { free_msgline_data(ml_item, true, true); @@ -4431,7 +4407,6 @@ static void *socketer(__maybe_unused void *arg) if (buf) dealloc(buf); - // TODO: if anyone cares, free all the dup buffers :P close_unix_socket(us->sockd, us->path); return NULL; @@ -4910,7 +4885,8 @@ static void *listener(void *arg) K_ITEM *ss_item; int i; - LOCK_INIT("db_listener"); + LOCK_INIT("db_plistener"); + rename_proc("db_plistener"); logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); @@ -4928,9 +4904,7 @@ static void *listener(void *arg) create_pthread(&mark_pt, marker, NULL); - rename_proc("db_listener"); - - listener_using_data = true; + plistener_using_data = true; if (!setup_data()) { if (!everyone_die) { @@ -4942,7 +4916,7 @@ static void *listener(void *arg) if (!everyone_die) { K_RLOCK(workqueue_free); - wqcount = workqueue_store->count; + wqcount = pool_workqueue_store->count; K_RUNLOCK(workqueue_free); LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); @@ -4973,8 +4947,8 @@ static void *listener(void *arg) // Process queued work while (!everyone_die) { K_WLOCK(workqueue_free); - wq_item = k_unlink_head(workqueue_store); - left = workqueue_store->count; + wq_item = k_unlink_head(pool_workqueue_store); + left = pool_workqueue_store->count; K_WUNLOCK(workqueue_free); if (left == 0 && wq_stt.tv_sec != 0L) @@ -5049,7 +5023,7 @@ static void *listener(void *arg) sayonara: - listener_using_data = false; + plistener_using_data = false; if (conn) PQfinish(conn); @@ -5938,7 +5912,8 @@ int main(int argc, char **argv) trigger = start = time(NULL); while (socketer_using_data || summariser_using_data || - logger_using_data || listener_using_data || + logger_using_data || plistener_using_data || + clistener_using_data || blistener_using_data || marker_using_data) { msg = NULL; curr = time(NULL); @@ -5951,12 +5926,14 @@ int main(int argc, char **argv) } if (msg) { trigger = curr; - printf("%s %ds due to%s%s%s%s%s\n", + printf("%s %ds due to%s%s%s%s%s%s%s\n", msg, (int)(curr - start), socketer_using_data ? " socketer" : EMPTY, summariser_using_data ? " summariser" : EMPTY, logger_using_data ? " logger" : EMPTY, - listener_using_data ? " listener" : EMPTY, + plistener_using_data ? " plistener" : EMPTY, + clistener_using_data ? " clistener" : EMPTY, + blistener_using_data ? " blistener" : EMPTY, marker_using_data ? " marker" : EMPTY); fflush(stdout); } diff --git a/src/ckdb.h b/src/ckdb.h index 16e33eb7..163cf8e7 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.4" -#define CKDB_VERSION DB_VERSION"-1.620" +#define CKDB_VERSION DB_VERSION"-1.704" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -765,6 +765,7 @@ typedef struct msgline { char *code; K_TREE *trf_root; K_STORE *trf_store; + int sockd; } MSGLINE; #define ALLOC_MSGLINE 8192 @@ -792,7 +793,9 @@ typedef struct workqueue { #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) extern K_LIST *workqueue_free; -extern K_STORE *workqueue_store; +extern K_STORE *pool_workqueue_store; +extern K_STORE *cmd_workqueue_store; +extern K_STORE *btc_workqueue_store; extern mutex_t wq_waitlock; extern pthread_cond_t wq_waitcond; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index c9d9f660..a7cde8a7 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -3304,7 +3304,13 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, } // Don't bother with locking - it's just an FYI web stat - snprintf(tmp, sizeof(tmp), "sync=%d%c", workqueue_store->count, FLDSEP); + int psync = pool_workqueue_store->count; + int csync = cmd_workqueue_store->count; + int bsync = btc_workqueue_store->count; + snprintf(tmp, sizeof(tmp), "psync=%d%c", psync, FLDSEP); + snprintf(tmp, sizeof(tmp), "csync=%d%c", csync, FLDSEP); + snprintf(tmp, sizeof(tmp), "bsync=%d%c", bsync, FLDSEP); + snprintf(tmp, sizeof(tmp), "sync=%d%c", psync + csync + bsync, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); u_item = NULL; @@ -3953,9 +3959,9 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id, } } if (!oc_item) { - K_RLOCK(optioncontrol_free); + K_WLOCK(optioncontrol_free); oc_item = k_unlink_head(optioncontrol_free); - K_RUNLOCK(optioncontrol_free); + K_WUNLOCK(optioncontrol_free); DATA_OPTIONCONTROL(optioncontrol, oc_item); bzero(optioncontrol, sizeof(*optioncontrol)); STRNCPY(optioncontrol->optionname, optionname); @@ -4558,7 +4564,6 @@ static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id, b_item = find_after_in_ktree(blocks_root, &b_look, b_ctx); K_RUNLOCK(blocks_free); if (!b_item) { - K_RUNLOCK(blocks_free); snprintf(reply, siz, "ERR.no block height >= %"PRId32, height); return strdup(reply); } @@ -5684,7 +5689,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(workerstatus, 1, 1); USEINFO(userinfo, 1, 1); USEINFO(msgline, 1, 0); - USEINFO(workqueue, 1, 0); + USEINFO(workqueue, 3, 0); USEINFO(transfer, 0, 0); USEINFO(heartbeatqueue, 1, 0); USEINFO(logqueue, 1, 0); @@ -6603,7 +6608,7 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id, * You must supply the btcserver to change anything * The format for userpass is username:password * If you don't supply the btcserver it will simply report the current server - * If supply btcserver but not the userpass it will use the current userpass + * If you supply btcserver but not the userpass it will use the current userpass * The reply will ONLY contain the URL, not the user/pass */ static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 0cbde05d..32dfc830 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -3455,7 +3455,7 @@ K_ITEM *find_payoutid(int64_t payoutid) return find_in_ktree(payouts_id_root, &look, ctx); } -// First payouts workinfoidend equal or before workinfoidend +// First payouts workinfoidend equal or after workinfoidend K_ITEM *find_payouts_wid(int64_t workinfoidend, K_TREE_CTX *ctx) { PAYOUTS payouts; @@ -3465,12 +3465,13 @@ K_ITEM *find_payouts_wid(int64_t workinfoidend, K_TREE_CTX *ctx) if (ctx == NULL) ctx = ctx0; - payouts.workinfoidend = workinfoidend+1; - DATE_ZERO(&(payouts.expirydate)); + payouts.workinfoidend = workinfoidend-1; + payouts.expirydate.tv_sec = default_expiry.tv_sec; + payouts.expirydate.tv_usec = default_expiry.tv_usec; INIT_PAYOUTS(&look); look.data = (void *)(&payouts); - return find_before_in_ktree(payouts_wid_root, &look, ctx); + return find_after_in_ktree(payouts_wid_root, &look, ctx); } /* Values from payout stats, returns -1 if statname isn't found @@ -4921,23 +4922,25 @@ bool shift_rewards(K_ITEM *wm_item) DATA_WORKMARKERS(wm, wm_item); K_RLOCK(payouts_free); + K_WLOCK(workmarkers_free); p_item = find_payouts_wid(wm->workinfoidend, ctx); DATA_PAYOUTS_NULL(payouts, p_item); // a workmarker should not cross a payout boundary while (p_item && payouts->workinfoidstart <= wm->workinfoidstart && wm->workinfoidend <= payouts->workinfoidend) { - if (CURRENT(&(payouts->expirydate))) { + if (CURRENT(&(payouts->expirydate)) && + PAYGENERATED(payouts->status)) { rewards++; pps += (double)(payouts->minerreward) / payouts->diffused; } - p_item = prev_in_ktree(ctx); + p_item = next_in_ktree(ctx); DATA_PAYOUTS_NULL(payouts, p_item); } - K_RUNLOCK(payouts_free); - wm->rewards = rewards; wm->rewarded = pps; + K_WUNLOCK(workmarkers_free); + K_RUNLOCK(payouts_free); return (rewards > 0); } diff --git a/src/generator.c b/src/generator.c index a270229c..6537ea9d 100644 --- a/src/generator.c +++ b/src/generator.c @@ -768,16 +768,15 @@ out: static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *req, *val = NULL, *res_val, *err_val; + bool res, ret = false; float timeout = 10; - bool ret = false; JSON_CPACK(req, "{s:s,s:[s]}", "method", "mining.passthrough", "params", PACKAGE"/"VERSION); - - ret = send_json_msg(cs, req); + res = send_json_msg(cs, req); json_decref(req); - if (!ret) { + if (!res) { LOGWARNING("Failed to send message in passthrough_stratum"); goto out; } diff --git a/src/ktree.c b/src/ktree.c index c70131f2..16356452 100644 --- a/src/ktree.c +++ b/src/ktree.c @@ -638,6 +638,7 @@ K_ITEM *_find_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, bool chklock } } +// First item after data K_ITEM *_find_after_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, LOCK_MAYBE bool chklock, KTREE_FFL_ARGS) { K_NODE *knode, *old = NULL; @@ -690,6 +691,7 @@ K_ITEM *_find_after_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, LOCK_M } } +// Last item before data K_ITEM *_find_before_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, KTREE_FFL_ARGS) { K_NODE *knode, *old = NULL;