diff --git a/src/ckdb.c b/src/ckdb.c index bd38d44a..005da811 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -41,7 +41,8 @@ * to ensure all code using those trees/lists use locks * This code's lock implementation is equivalent to table level locking * Consider adding row level locking (a per kitem usage count) if needed - * */ + * TODO: verify all tables with multuthread access are locked + */ #define DB_VLOCK "1" #define DB_VERSION "0.6" @@ -102,11 +103,11 @@ static char *restorefrom; * in the CCLs and thus where to stop processing the CCLs to stay in * sync with ckpool * If ckpool isn't running, then the reload will complete at the end of - * the last CCL file, however if a message arrives from ckpool while + * the last CCL file, however if the 1st message arrives from ckpool while * processing the CCLs, that will mark the point where to stop processing * but can also produce a fatal error at the end of processing, reporting - * the full ckpool message, if the message was not found in the CCL - * processing after the message was received + * the ckpool message, if the message was not found in the CCL processing + * after the message was received * This can be caused by two circumstances: * 1) the disk had not yet written it to the CCL when ckdb read EOF and * ckpool was started at about the same time as the reload completed. @@ -416,15 +417,15 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; } while (0) // Override _row defaults if transfer fields are present -#define HISTORYDATETRANSFER(_row) do { \ +#define HISTORYDATETRANSFER(_root, _row) do { \ K_ITEM *item; \ - item = optional_name("createby", 1, NULL); \ + item = optional_name(_root, "createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ - item = optional_name("createcode", 1, NULL); \ + item = optional_name(_root, "createcode", 1, NULL); \ if (item) \ STRNCPY(_row->createcode, DATA_TRANSFER(item)->data); \ - item = optional_name("createinet", 1, NULL); \ + item = optional_name(_root, "createinet", 1, NULL); \ if (item) \ STRNCPY(_row->createinet, DATA_TRANSFER(item)->data); \ } while (0) @@ -569,21 +570,21 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; #define SIMPLEDATEDEFAULT(_row, _cd) do { \ _row->createdate.tv_sec = (_cd)->tv_sec; \ _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, (char *)"code"); \ + STRNCPY(_row->createby, by_default); \ STRNCPY(_row->createcode, (char *)__func__); \ - STRNCPY(_row->createinet, (char *)"127.0.0.1"); \ + STRNCPY(_row->createinet, inet_default); \ } while (0) // Override _row defaults if transfer fields are present -#define SIMPLEDATETRANSFER(_row) do { \ +#define SIMPLEDATETRANSFER(_root, _row) do { \ K_ITEM *item; \ - item = optional_name("createby", 1, NULL); \ + item = optional_name(_root, "createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ - item = optional_name("createcode", 1, NULL); \ + item = optional_name(_root, "createcode", 1, NULL); \ if (item) \ STRNCPY(_row->createcode, DATA_TRANSFER(item)->data); \ - item = optional_name("createinet", 1, NULL); \ + item = optional_name(_root, "createinet", 1, NULL); \ if (item) \ STRNCPY(_row->createinet, DATA_TRANSFER(item)->data); \ } while (0) @@ -626,14 +627,20 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; } \ } while (0) -// Different input data handling -static bool reloading = false; +// DB users,workers,auth load is complete +static bool db_auths_complete = false; // DB load is complete static bool db_load_complete = false; +// Different input data handling +static bool reloading = false; // Data load is complete static bool startup_complete = false; -// Tell the summarizer to die -static bool summarizer_die = false; +// Tell everyone to die +static bool everyone_die = false; + +static cklock_t fpm_lock; +static char *first_pool_message; +static sem_t socketer_sem; static const char *userpatt = "^[!-~]*$"; // no spaces static const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.]*[A-Za-z0-9]$"; @@ -650,6 +657,55 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$"; #define STR_SHAREERRORS "shareerror" #define STR_AGEWORKINFO "ageworkinfo" +static char *by_default = "code"; +static char *inet_default = "127.0.0.1"; + +enum cmd_values { + CMD_UNSET, + CMD_REPLY, // Means something was wrong - send back reply + CMD_SHUTDOWN, + CMD_PING, + CMD_SHARELOG, + CMD_AUTH, + CMD_ADDUSER, + CMD_CHKPASS, + CMD_POOLSTAT, + CMD_USERSTAT, + CMD_BLOCK, + CMD_NEWID, + CMD_PAYMENTS, + CMD_WORKERS, + CMD_ALLUSERS, + CMD_HOMEPAGE, + CMD_DSP, + CMD_STATS, + CMD_END +}; + +// WORKQUEUE +typedef struct workqueue { + char *buf; + int which_cmds; + enum cmd_values cmdnum; + char cmd[CMD_SIZ+1]; + char id[ID_SIZ+1]; + tv_t now; + char by[TXT_SML+1]; + char code[TXT_MED+1]; + char inet[TXT_MED+1]; + tv_t cd; + K_TREE *trf_root; + K_STORE *trf_store; +} WORKQUEUE; + +#define ALLOC_WORKQUEUE 1024 +#define LIMIT_WORKQUEUE 0 +#define DATA_WORKQUEUE(_item) ((WORKQUEUE *)(_item->data)) + +static K_LIST *workqueue_free; +static K_STORE *workqueue_store; +static sem_t workqueue_sem; + // TRANSFER #define NAME_SIZE 63 #define VALUE_SIZE 1023 @@ -663,9 +719,7 @@ typedef struct transfer { #define LIMIT_TRANSFER 0 #define DATA_TRANSFER(_item) ((TRANSFER *)(_item->data)) -static K_TREE *transfer_root; static K_LIST *transfer_free; -static K_STORE *transfer_store; // older version missing field defaults static TRANSFER auth_1 = { "poolinstance", "", auth_1.value }; @@ -1331,7 +1385,7 @@ static cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b) DATA_TRANSFER(b)->name); } -static K_ITEM *find_transfer(char *name) +static K_ITEM *find_transfer(K_TREE *trf_root, char *name) { TRANSFER transfer; K_TREE_CTX ctx[1]; @@ -1339,17 +1393,17 @@ static K_ITEM *find_transfer(char *name) STRNCPY(transfer.name, name); look.data = (void *)(&transfer); - return find_in_ktree(transfer_root, &look, cmp_transfer, ctx); + return find_in_ktree(trf_root, &look, cmp_transfer, ctx); } -static K_ITEM *optional_name(char *name, int len, char *patt) +static K_ITEM *optional_name(K_TREE *trf_root, char *name, int len, char *patt) { K_ITEM *item; char *value; regex_t re; int ret; - item = find_transfer(name); + item = find_transfer(trf_root, name); if (!item) return NULL; @@ -1371,12 +1425,12 @@ static K_ITEM *optional_name(char *name, int len, char *patt) return item; } -#define require_name(_name, _len, _patt, _reply, _siz) \ - _require_name(_name, _len, _patt, _reply, _siz, \ - WHERE_FFL_HERE) +#define require_name(_root, _name, _len, _patt, _reply, _siz) \ + _require_name(_root, _name, _len, _patt, _reply, \ + _siz, WHERE_FFL_HERE) -static K_ITEM *_require_name(char *name, int len, char *patt, char *reply, - size_t siz, WHERE_FFL_ARGS) +static K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt, + char *reply, size_t siz, WHERE_FFL_ARGS) { K_ITEM *item; char *value; @@ -1384,7 +1438,7 @@ static K_ITEM *_require_name(char *name, int len, char *patt, char *reply, size_t dlen; int ret; - item = find_transfer(name); + item = find_transfer(trf_root, name); if (!item) { LOGERR("%s(): failed, field '%s' missing from %s():%d", __func__, name, func, line); @@ -1838,13 +1892,16 @@ static K_ITEM *get_workerstatus(int64_t userid, char *workername) { WORKERSTATUS workerstatus; K_TREE_CTX ctx[1]; - K_ITEM look; + K_ITEM look, *find; workerstatus.userid = userid; STRNCPY(workerstatus.workername, workername); look.data = (void *)(&workerstatus); - return find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx); + K_RLOCK(workerstatus_free); + find = find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx); + K_RUNLOCK(workerstatus_free); + return find; } static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool create) @@ -1992,6 +2049,7 @@ static cmp_t cmp_userid(K_ITEM *a, K_ITEM *b) return c; } +// Must be R or W locked before call static K_ITEM *find_users(char *username) { USERS users; @@ -2006,6 +2064,7 @@ static K_ITEM *find_users(char *username) return find_in_ktree(users_root, &look, cmp_users, ctx); } +// Must be R or W locked before call static K_ITEM *find_userid(int64_t userid) { USERS users; @@ -2547,7 +2606,9 @@ static K_ITEM *new_worker_find_user(PGconn *conn, bool update, char *username, { K_ITEM *item; + K_RLOCK(users_free); item = find_users(username); + K_RUNLOCK(users_free); if (!item) return NULL; @@ -2882,13 +2943,15 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc char *transactiontree, char *merklehash, char *prevhash, char *coinbase1, char *coinbase2, char *version, char *bits, char *ntime, char *reward, char *by, - char *code, char *inet, tv_t *cd, bool igndup) + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { ExecStatusType rescode; bool conned = false; K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; + char cd_buf[DATE_BUFSIZ]; int n; int64_t workinfoid = -1; WORKINFO *row; @@ -2917,15 +2980,23 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc TXT_TO_BIGINT("reward", reward, row->reward); HISTORYDATEINIT(row, cd, by, code, inet); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); - if (igndup && find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { + K_WLOCK(workinfo_free); + if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { workinfoid = row->workinfoid; - K_WLOCK(workinfo_free); k_add_head(workinfo_free, item); K_WUNLOCK(workinfo_free); + + if (!igndup) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s(): Duplicate workinfo ignored %s/%s/%s", + __func__, workinfoidstr, poolinstance, cd_buf); + } + return workinfoid; } + K_WUNLOCK(workinfo_free); par = 0; params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -3314,7 +3385,7 @@ static K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t worki static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff, char *secondaryuserid, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHARES *shares; @@ -3329,7 +3400,9 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor shares = DATA_SHARES(s_item); // TODO: allow BTC address later? + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -3346,7 +3419,7 @@ static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *wor STRNCPY(shares->secondaryuserid, secondaryuserid); HISTORYDATEINIT(shares, cd, by, code, inet); - HISTORYDATETRANSFER(shares); + HISTORYDATETRANSFER(trf_root, shares); wi_item = find_workinfo(shares->workinfoid); if (!wi_item) @@ -3425,7 +3498,7 @@ static cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b) static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *error, char *secondaryuserid, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHAREERRORS *shareerrors; @@ -3440,7 +3513,9 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, shareerrors = DATA_SHAREERRORS(s_item); // TODO: allow BTC address later? + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -3454,7 +3529,7 @@ static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, STRNCPY(shareerrors->secondaryuserid, secondaryuserid); HISTORYDATEINIT(shareerrors, cd, by, code, inet); - HISTORYDATETRANSFER(shareerrors); + HISTORYDATETRANSFER(trf_root, shareerrors); wi_item = find_workinfo(shareerrors->workinfoid); if (!wi_item) @@ -4081,7 +4156,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, char *workername, char *clientid, char *enonce1, char *nonce2, char *nonce, char *reward, char *by, char *code, char *inet, tv_t *cd, - bool igndup, char *id) + bool igndup, char *id, K_TREE *trf_root) { ExecStatusType rescode; bool conned = false; @@ -4111,7 +4186,9 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, switch (confirmed[0]) { case BLOCKS_NEW: + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) row->userid = KANO; else @@ -4125,7 +4202,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, STRNCPY(row->nonce, nonce); TXT_TO_BIGINT("reward", reward, row->reward); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); if (igndup && find_in_ktree(blocks_root, item, cmp_blocks, ctx)) { K_WLOCK(blocks_free); @@ -4418,13 +4495,14 @@ static cmp_t cmp_auths(K_ITEM *a, K_ITEM *b) static char *auths_add(PGconn *conn, char *poolinstance, char *username, char *workername, char *clientid, char *enonce1, char *useragent, char *by, char *code, char *inet, - tv_t *cd, bool igndup) + tv_t *cd, bool igndup, K_TREE *trf_root) { ExecStatusType rescode; bool conned = false; PGresult *res; K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item; + char cd_buf[DATE_BUFSIZ]; int n; AUTHS *row; char *ins; @@ -4440,7 +4518,9 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, row = DATA_AUTHS(a_item); + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -4456,14 +4536,22 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, STRNCPY(row->useragent, useragent); HISTORYDATEINIT(row, cd, by, code, inet); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); - if (igndup && find_in_ktree(auths_root, a_item, cmp_auths, ctx)) { - K_WLOCK(auths_free); + K_WLOCK(auths_free); + if (find_in_ktree(auths_root, a_item, cmp_auths, ctx)) { k_add_head(auths_free, a_item); K_WUNLOCK(auths_free); + + if (!igndup) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s(): Duplicate auths ignored %s/%s/%s", + __func__, poolinstance, workername, cd_buf); + } + return DATA_USERS(u_item)->secondaryuserid; } + K_WUNLOCK(auths_free); // Update even if DB fails workerstatus_update(row, NULL, NULL, NULL); @@ -4656,7 +4744,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, char *by, char *code, char *inet, tv_t *cd, - bool igndup) + bool igndup, K_TREE *trf_root) { ExecStatusType rescode; bool conned = false; @@ -4690,7 +4778,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr); SIMPLEDATEINIT(row, cd, by, code, inet); - SIMPLEDATETRANSFER(row); + SIMPLEDATETRANSFER(trf_root, row); if (igndup && find_in_ktree(poolstats_root, p_item, cmp_poolstats, ctx)) { K_WLOCK(poolstats_free); @@ -5036,7 +5124,8 @@ unparam: static bool userstats_add(char *poolinstance, char *elapsed, char *username, char *workername, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, bool idle, - bool eos, char *by, char *code, char *inet, tv_t *cd) + bool eos, char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { K_ITEM *us_item, *u_item, *us_match, *us_next, look; tv_t eosdate; @@ -5053,7 +5142,9 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, STRNCPY(row->poolinstance, poolinstance); TXT_TO_BIGINT("elapsed", elapsed, row->elapsed); + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) return false; row->userid = DATA_USERS(u_item)->userid; @@ -5067,7 +5158,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, row->summarylevel[1] = '\0'; row->summarycount = 1; SIMPLEDATEINIT(row, cd, by, code, inet); - SIMPLEDATETRANSFER(row); + SIMPLEDATETRANSFER(trf_root, row); copy_tv(&(row->statsdate), &(row->createdate)); if (eos) { @@ -5419,6 +5510,9 @@ static bool check_db_version(PGconn *conn) return true; } +/* Load tables required to support auths,adduser,chkpass and newid + * N.B. idcontrol is DB internal so is always ready + */ static bool getdata1() { PGconn *conn = dbconnect(); @@ -5465,9 +5559,9 @@ sukamudai: return ok; } -static void reload_from(tv_t *start); +static bool reload_from(tv_t *start); -static void reload() +static bool reload() { char buf[DATE_BUFSIZ+1]; char *filename; @@ -5536,7 +5630,7 @@ static void reload() } free(filename); } - reload_from(&start); + return reload_from(&start); } /* TODO: @@ -5635,10 +5729,16 @@ static bool setup_data() K_ITEM look, *found; WORKINFO wi; + cklock_init(&fpm_lock); + cksem_init(&workqueue_sem); + cksem_init(&socketer_sem); + + workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), + ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); + workqueue_store = k_new_store(workqueue_free); + transfer_free = k_new_list("Transfer", sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); - transfer_store = k_new_store(transfer_free); - transfer_root = new_ktree(); transfer_free->dsp_func = dsp_transfer; users_free = k_new_list("Users", sizeof(USERS), @@ -5717,12 +5817,16 @@ static bool setup_data() if (!getdata1()) return false; + db_auths_complete = true; + cksem_post(&socketer_sem); + if (!getdata2()) return false; db_load_complete = true; - reload(); + if (!reload()) + return false; workerstatus_ready(); @@ -5746,7 +5850,8 @@ static bool setup_data() } static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, - char *code, char *inet, __maybe_unused tv_t *notcd) + char *code, char *inet, __maybe_unused tv_t *notcd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5756,15 +5861,15 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); - i_emailaddress = require_name("emailaddress", 7, (char *)mailpatt, reply, siz); + i_emailaddress = require_name(trf_root, "emailaddress", 7, (char *)mailpatt, reply, siz); if (!i_emailaddress) return strdup(reply); - i_passwordhash = require_name("passwordhash", 64, (char *)hashpatt, reply, siz); + i_passwordhash = require_name(trf_root, "passwordhash", 64, (char *)hashpatt, reply, siz); if (!i_passwordhash) return strdup(reply); @@ -5785,7 +5890,7 @@ static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *i_passwordhash, *u_item; char reply[1024] = ""; @@ -5794,15 +5899,17 @@ static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); - i_passwordhash = require_name("passwordhash", 64, (char *)hashpatt, reply, siz); + i_passwordhash = require_name(trf_root, "passwordhash", 64, (char *)hashpatt, reply, siz); if (!i_passwordhash) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) ok = false; @@ -5822,7 +5929,8 @@ static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, } static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, - char *code, char *inet, tv_t *cd, bool igndup) + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5839,35 +5947,35 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_elapsed = optional_name("elapsed", 1, NULL); + i_elapsed = optional_name(trf_root, "elapsed", 1, NULL); if (!i_elapsed) i_elapsed = &poolstats_elapsed; - i_users = require_name("users", 1, NULL, reply, siz); + i_users = require_name(trf_root, "users", 1, NULL, reply, siz); if (!i_users) return strdup(reply); - i_workers = require_name("workers", 1, NULL, reply, siz); + i_workers = require_name(trf_root, "workers", 1, NULL, reply, siz); if (!i_workers) return strdup(reply); - i_hashrate = require_name("hashrate", 1, NULL, reply, siz); + i_hashrate = require_name(trf_root, "hashrate", 1, NULL, reply, siz); if (!i_hashrate) return strdup(reply); - i_hashrate5m = require_name("hashrate5m", 1, NULL, reply, siz); + i_hashrate5m = require_name(trf_root, "hashrate5m", 1, NULL, reply, siz); if (!i_hashrate5m) return strdup(reply); - i_hashrate1hr = require_name("hashrate1hr", 1, NULL, reply, siz); + i_hashrate1hr = require_name(trf_root, "hashrate1hr", 1, NULL, reply, siz); if (!i_hashrate1hr) return strdup(reply); - i_hashrate24hr = require_name("hashrate24hr", 1, NULL, reply, siz); + i_hashrate24hr = require_name(trf_root, "hashrate24hr", 1, NULL, reply, siz); if (!i_hashrate24hr) return strdup(reply); @@ -5902,7 +6010,7 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - by, code, inet, cd, igndup); + by, code, inet, cd, igndup, trf_root); if (!ok) { LOGERR("%s.failed.DBE", id); @@ -5915,7 +6023,8 @@ static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -5926,12 +6035,12 @@ static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, return NULL; } - return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup); + return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, char *code, - char *inet, tv_t *cd) + char *inet, tv_t *cd, K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5945,45 +6054,45 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_elapsed = optional_name("elapsed", 1, NULL); + i_elapsed = optional_name(trf_root, "elapsed", 1, NULL); if (!i_elapsed) i_elapsed = &userstats_elapsed; - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = optional_name("workername", 1, NULL); + i_workername = optional_name(trf_root, "workername", 1, NULL); if (!i_workername) i_workername = &userstats_workername; - i_hashrate = require_name("hashrate", 1, NULL, reply, siz); + i_hashrate = require_name(trf_root, "hashrate", 1, NULL, reply, siz); if (!i_hashrate) return strdup(reply); - i_hashrate5m = require_name("hashrate5m", 1, NULL, reply, siz); + i_hashrate5m = require_name(trf_root, "hashrate5m", 1, NULL, reply, siz); if (!i_hashrate5m) return strdup(reply); - i_hashrate1hr = require_name("hashrate1hr", 1, NULL, reply, siz); + i_hashrate1hr = require_name(trf_root, "hashrate1hr", 1, NULL, reply, siz); if (!i_hashrate1hr) return strdup(reply); - i_hashrate24hr = require_name("hashrate24hr", 1, NULL, reply, siz); + i_hashrate24hr = require_name(trf_root, "hashrate24hr", 1, NULL, reply, siz); if (!i_hashrate24hr) return strdup(reply); - i_idle = optional_name("idle", 1, NULL); + i_idle = optional_name(trf_root, "idle", 1, NULL); if (!i_idle) i_idle = &userstats_idle; idle = (strcasecmp(DATA_TRANSFER(i_idle)->data, TRUE_STR) == 0); - i_eos = optional_name("eos", 1, NULL); + i_eos = optional_name(trf_root, "eos", 1, NULL); if (!i_eos) i_eos = &userstats_eos; @@ -5997,7 +6106,7 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - idle, eos, by, code, inet, cd); + idle, eos, by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -6009,7 +6118,8 @@ static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, } static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, - char *code, char *inet, __maybe_unused tv_t *cd) + char *code, char *inet, __maybe_unused tv_t *cd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6026,11 +6136,11 @@ static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_idname = require_name("idname", 3, (char *)idpatt, reply, siz); + i_idname = require_name(trf_root, "idname", 3, (char *)idpatt, reply, siz); if (!i_idname) return strdup(reply); - i_idvalue = require_name("idvalue", 1, (char *)intpatt, reply, siz); + i_idvalue = require_name(trf_root, "idvalue", 1, (char *)intpatt, reply, siz); if (!i_idvalue) return strdup(reply); @@ -6090,7 +6200,8 @@ foil: static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { K_ITEM *i_username, look, *u_item, *p_item; K_TREE_CTX ctx[1]; @@ -6104,11 +6215,13 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) return strdup("bad"); @@ -6146,7 +6259,7 @@ static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item, *ws_item; K_TREE_CTX w_ctx[1], us_ctx[1]; @@ -6162,15 +6275,17 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) return strdup("bad"); - i_stats = optional_name("stats", 1, NULL); + i_stats = optional_name(trf_root, "stats", 1, NULL); if (!i_stats) stats = false; else @@ -6284,7 +6399,8 @@ static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { K_TREE *userstats_workername_root = new_ktree(); K_ITEM *us_item, *usw_item, *tmp_item, *u_item; @@ -6325,7 +6441,9 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, while (usw_item) { if (DATA_USERSTATS(usw_item)->userid != userid) { if (userid != -1) { + K_RLOCK(users_free); u_item = find_userid(userid); + K_RUNLOCK(users_free); if (!u_item) { LOGERR("%s() userid %"PRId64" ignored - userstats but not users", __func__, userid); @@ -6356,7 +6474,9 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, k_add_head(userstats_free, tmp_item); } if (userid != -1) { + K_RLOCK(users_free); u_item = find_userid(userid); + K_RUNLOCK(users_free); if (!u_item) { LOGERR("%s() userid %"PRId64" ignored - userstats but not users", __func__, userid); @@ -6389,7 +6509,8 @@ static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6412,47 +6533,47 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_transactiontree = require_name("transactiontree", 0, NULL, reply, siz); + i_transactiontree = require_name(trf_root, "transactiontree", 0, NULL, reply, siz); if (!i_transactiontree) return strdup(reply); - i_merklehash = require_name("merklehash", 0, NULL, reply, siz); + i_merklehash = require_name(trf_root, "merklehash", 0, NULL, reply, siz); if (!i_merklehash) return strdup(reply); - i_prevhash = require_name("prevhash", 1, NULL, reply, siz); + i_prevhash = require_name(trf_root, "prevhash", 1, NULL, reply, siz); if (!i_prevhash) return strdup(reply); - i_coinbase1 = require_name("coinbase1", 1, NULL, reply, siz); + i_coinbase1 = require_name(trf_root, "coinbase1", 1, NULL, reply, siz); if (!i_coinbase1) return strdup(reply); - i_coinbase2 = require_name("coinbase2", 1, NULL, reply, siz); + i_coinbase2 = require_name(trf_root, "coinbase2", 1, NULL, reply, siz); if (!i_coinbase2) return strdup(reply); - i_version = require_name("version", 1, NULL, reply, siz); + i_version = require_name(trf_root, "version", 1, NULL, reply, siz); if (!i_version) return strdup(reply); - i_bits = require_name("bits", 1, NULL, reply, siz); + i_bits = require_name(trf_root, "bits", 1, NULL, reply, siz); if (!i_bits) return strdup(reply); - i_ntime = require_name("ntime", 1, NULL, reply, siz); + i_ntime = require_name(trf_root, "ntime", 1, NULL, reply, siz); if (!i_ntime) return strdup(reply); - i_reward = require_name("reward", 1, NULL, reply, siz); + i_reward = require_name(trf_root, "reward", 1, NULL, reply, siz); if (!i_reward) return strdup(reply); @@ -6467,7 +6588,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, DATA_TRANSFER(i_bits)->data, DATA_TRANSFER(i_ntime)->data, DATA_TRANSFER(i_reward)->data, - by, code, inet, cd, igndup); + by, code, inet, cd, igndup, trf_root); if (workinfoid == -1) { LOGERR("%s.failed.DBE", id); @@ -6487,43 +6608,43 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_nonce2 = require_name("nonce2", 1, NULL, reply, siz); + i_nonce2 = require_name(trf_root, "nonce2", 1, NULL, reply, siz); if (!i_nonce2) return strdup(reply); - i_nonce = require_name("nonce", 1, NULL, reply, siz); + i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); if (!i_nonce) return strdup(reply); - i_diff = require_name("diff", 1, NULL, reply, siz); + i_diff = require_name(trf_root, "diff", 1, NULL, reply, siz); if (!i_diff) return strdup(reply); - i_sdiff = require_name("sdiff", 1, NULL, reply, siz); + i_sdiff = require_name(trf_root, "sdiff", 1, NULL, reply, siz); if (!i_sdiff) return strdup(reply); - i_secondaryuserid = require_name("secondaryuserid", 1, NULL, reply, siz); + i_secondaryuserid = require_name(trf_root, "secondaryuserid", 1, NULL, reply, siz); if (!i_secondaryuserid) return strdup(reply); @@ -6537,7 +6658,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, DATA_TRANSFER(i_diff)->data, DATA_TRANSFER(i_sdiff)->data, DATA_TRANSFER(i_secondaryuserid)->data, - by, code, inet, cd); + by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -6557,31 +6678,31 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_errn = require_name("errn", 1, NULL, reply, siz); + i_errn = require_name(trf_root, "errn", 1, NULL, reply, siz); if (!i_errn) return strdup(reply); - i_error = require_name("error", 1, NULL, reply, siz); + i_error = require_name(trf_root, "error", 1, NULL, reply, siz); if (!i_error) return strdup(reply); - i_secondaryuserid = require_name("secondaryuserid", 1, NULL, reply, siz); + i_secondaryuserid = require_name(trf_root, "secondaryuserid", 1, NULL, reply, siz); if (!i_secondaryuserid) return strdup(reply); @@ -6592,7 +6713,7 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, DATA_TRANSFER(i_errn)->data, DATA_TRANSFER(i_error)->data, DATA_TRANSFER(i_secondaryuserid)->data, - by, code, inet, cd); + by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); return strdup("failed.DATA"); @@ -6609,11 +6730,11 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); @@ -6640,7 +6761,8 @@ static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, // TODO: the confirm update: identify block changes from workinfo height? static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, - char *code, char *inet, tv_t *cd, bool igndup) + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6651,50 +6773,50 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_height = require_name("height", 1, NULL, reply, siz); + i_height = require_name(trf_root, "height", 1, NULL, reply, siz); if (!i_height) return strdup(reply); - i_blockhash = require_name("blockhash", 1, NULL, reply, siz); + i_blockhash = require_name(trf_root, "blockhash", 1, NULL, reply, siz); if (!i_blockhash) return strdup(reply); - i_confirmed = require_name("confirmed", 1, NULL, reply, siz); + i_confirmed = require_name(trf_root, "confirmed", 1, NULL, reply, siz); if (!i_confirmed) return strdup(reply); DATA_TRANSFER(i_confirmed)->data[0] = tolower(DATA_TRANSFER(i_confirmed)->data[0]); switch(DATA_TRANSFER(i_confirmed)->data[0]) { case BLOCKS_NEW: - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_nonce2 = require_name("nonce2", 1, NULL, reply, siz); + i_nonce2 = require_name(trf_root, "nonce2", 1, NULL, reply, siz); if (!i_nonce2) return strdup(reply); - i_nonce = require_name("nonce", 1, NULL, reply, siz); + i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); if (!i_nonce) return strdup(reply); - i_reward = require_name("reward", 1, NULL, reply, siz); + i_reward = require_name(trf_root, "reward", 1, NULL, reply, siz); if (!i_reward) return strdup(reply); @@ -6710,7 +6832,8 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, DATA_TRANSFER(i_nonce2)->data, DATA_TRANSFER(i_nonce)->data, DATA_TRANSFER(i_reward)->data, - by, code, inet, cd, igndup, id); + by, code, inet, cd, igndup, id, + trf_root); break; case BLOCKS_CONFIRM: msg = "confirmed"; @@ -6719,7 +6842,8 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, DATA_TRANSFER(i_confirmed)->data, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, - by, code, inet, cd, igndup, id); + by, code, inet, cd, igndup, id, + trf_root); break; default: LOGERR("%s(): %s.failed.invalid confirm='%s'", @@ -6739,7 +6863,8 @@ static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_blocks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *notnow, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -6750,11 +6875,12 @@ static char *cmd_blocks(PGconn *conn, char *cmd, char *id, return NULL; } - return cmd_blocks_do(conn, cmd, id, by, code, inet, cd, igndup); + return cmd_blocks_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, - char *code, char *inet, tv_t *cd, bool igndup) + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -6764,27 +6890,27 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = optional_name("poolinstance", 1, NULL); + i_poolinstance = optional_name(trf_root, "poolinstance", 1, NULL); if (!i_poolinstance) i_poolinstance = &auth_poolinstance; - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_useragent = require_name("useragent", 0, NULL, reply, siz); + i_useragent = require_name(trf_root, "useragent", 0, NULL, reply, siz); if (!i_useragent) return strdup(reply); @@ -6794,7 +6920,7 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, DATA_TRANSFER(i_clientid)->data, DATA_TRANSFER(i_enonce1)->data, DATA_TRANSFER(i_useragent)->data, - by, code, inet, cd, igndup); + by, code, inet, cd, igndup, trf_root); if (!secuserid) { LOGDEBUG("%s.failed.DBE", id); @@ -6808,7 +6934,8 @@ static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, static char *cmd_auth(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, - char *code, char *inet, tv_t *cd) + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -6819,13 +6946,13 @@ static char *cmd_auth(PGconn *conn, char *cmd, char *id, return NULL; } - return cmd_auth_do(conn, cmd, id, by, code, inet, cd, igndup); + return cmd_auth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look; double u_hashrate5m, u_hashrate1hr; @@ -6838,7 +6965,7 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = optional_name("username", 1, NULL); + i_username = optional_name(trf_root, "username", 1, NULL); APPEND_REALLOC_INIT(buf, off, len); APPEND_REALLOC(buf, off, len, "ok."); @@ -6895,8 +7022,11 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, } u_item = NULL; - if (i_username) + if (i_username) { + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); + } has_uhr = false; if (p_item && u_item) { @@ -6962,7 +7092,8 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, - __maybe_unused char *inet, __maybe_unused tv_t *notcd) + __maybe_unused char *inet, __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { __maybe_unused K_ITEM *i_file; __maybe_unused char reply[1024] = ""; @@ -6974,11 +7105,11 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, LOGDEBUG("%s.disabled.dsp", id); return strdup("disabled.dsp"); /* - i_file = require_name("file", 1, NULL, reply, siz); + i_file = require_name(trf_root, "file", 1, NULL, reply, siz); if (!i_file) return strdup(reply); - dsp_ktree(transfer_free, transfer_root, DATA_TRANSFER(i_file)->data, NULL); + dsp_ktree(transfer_free, trf_root, DATA_TRANSFER(i_file)->data, NULL); dsp_ktree(sharesummary_free, sharesummary_root, DATA_TRANSFER(i_file)->data, NULL); @@ -6992,7 +7123,7 @@ static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) + __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) { char tmp[1024], *buf; size_t len, off; @@ -7038,6 +7169,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(poolstats, 1, 1); USEINFO(userstats, 4, 2); USEINFO(workerstatus, 1, 1); + USEINFO(workqueue, 1, 0); + USEINFO(transfer, 0, 0); snprintf(tmp, sizeof(tmp), "%ctotalram=%"PRIu64, FLDSEP, tot); APPEND_REALLOC(buf, off, len, tmp); @@ -7046,28 +7179,6 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, return buf; } -enum cmd_values { - CMD_UNSET, - CMD_REPLY, // Means something was wrong - send back reply - CMD_SHUTDOWN, - CMD_PING, - CMD_SHARELOG, - CMD_AUTH, - CMD_ADDUSER, - CMD_CHKPASS, - CMD_POOLSTAT, - CMD_USERSTAT, - CMD_BLOCK, - CMD_NEWID, - CMD_PAYMENTS, - CMD_WORKERS, - CMD_ALLUSERS, - CMD_HOMEPAGE, - CMD_DSP, - CMD_STATS, - CMD_END -}; - // TODO: limit access #define ACCESS_POOL "p" #define ACCESS_SYSTEM "s" @@ -7080,7 +7191,8 @@ static struct CMDS { char *cmd_str; bool noid; // doesn't require an id bool createdate; // requires a createdate - char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, char *, tv_t *); + char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, + char *, tv_t *, K_TREE *); char *access; } cmds[] = { { CMD_SHUTDOWN, "shutdown", true, false, NULL, ACCESS_SYSTEM }, @@ -7105,7 +7217,9 @@ static struct CMDS { { CMD_END, NULL, false, false, NULL, NULL } }; -static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id, tv_t *cd) +static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, + char *buf, int *which_cmds, char *cmd, + char *id, tv_t *cd) { char reply[1024] = ""; K_TREE_CTX ctx[1]; @@ -7113,6 +7227,8 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id char *cmdptr, *idptr, *data, *next, *eq; bool noid = false; + *trf_root = NULL; + *trf_store = NULL; *which_cmds = CMD_UNSET; *cmd = *id = '\0'; cd->tv_sec = 0; @@ -7155,6 +7271,8 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id return CMD_REPLY; } + *trf_root = new_ktree(); + *trf_store = k_new_store(transfer_free); next = data; if (next && strncmp(next, JSON_TRANSFER, JSON_TRANSFER_LEN) == 0) { json_t *json_data; @@ -7251,13 +7369,13 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id if (ok) STRNCPY(DATA_TRANSFER(item)->name, json_key); - if (!ok || find_in_ktree(transfer_root, item, cmp_transfer, ctx)) { + if (!ok || find_in_ktree(*trf_root, item, cmp_transfer, ctx)) { if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) free(DATA_TRANSFER(item)->data); k_add_head(transfer_free, item); } else { - transfer_root = add_to_ktree(transfer_root, item, cmp_transfer); - k_add_head(transfer_store, item); + *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); + k_add_head(*trf_store, item); } json_iter = json_object_iter_next(json_data, json_iter); } @@ -7282,19 +7400,19 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id STRNCPY(DATA_TRANSFER(item)->value, eq); DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value; - if (find_in_ktree(transfer_root, item, cmp_transfer, ctx)) { + if (find_in_ktree(*trf_root, item, cmp_transfer, ctx)) { if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) free(DATA_TRANSFER(item)->data); k_add_head(transfer_free, item); } else { - transfer_root = add_to_ktree(transfer_root, item, cmp_transfer); - k_add_head(transfer_store, item); + *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); + k_add_head(*trf_store, item); } } K_WUNLOCK(transfer_free); } if (cmds[*which_cmds].createdate) { - item = require_name("createdate", 10, NULL, reply, sizeof(reply)); + item = require_name(*trf_root, "createdate", 10, NULL, reply, sizeof(reply)); if (!item) return CMD_REPLY; @@ -7502,30 +7620,333 @@ static void *summariser(__maybe_unused void *arg) { pthread_detach(pthread_self()); - while (!summarizer_die && !db_load_complete) + while (!everyone_die && !db_load_complete) cksleep_ms(42); - while (!summarizer_die) { - sleep(19); + while (!everyone_die) { + sleep(13); - if (!summarizer_die) + if (!everyone_die) summarise_poolstats(); - if (!summarizer_die) + if (!everyone_die) summarise_userstats(); } return NULL; } -static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) +static void *socketer(__maybe_unused void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + unixsock_t *us = &pi->us; + char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; + char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; + char *last_auth = NULL, *reply_auth = NULL; + char *last_adduser = NULL, *reply_adduser = NULL; + char *last_chkpass = NULL, *reply_chkpass = NULL; + char *last_newid = NULL, *reply_newid = NULL; + char *last_web = NULL, *reply_web = NULL; + char *reply_last, duptype[CMD_SIZ+1]; + enum cmd_values cmdnum; + int sockd, which_cmds; + WORKQUEUE *workqueue; + K_STORE *trf_store; + K_TREE *trf_root; + K_ITEM *item; + size_t siz; + tv_t now, cd; + bool dup, want_first; + + pthread_detach(pthread_self()); + + while (!everyone_die && !db_auths_complete) + cksem_mswait(&socketer_sem, 420); + + want_first = true; + while (!everyone_die) { + if (buf) + dealloc(buf); + sockd = accept(us->sockd, NULL, NULL); + if (sockd < 0) { + LOGERR("Failed to accept on socket in listener"); + break; + } + + cmdnum = CMD_UNSET; + trf_root = NULL; + trf_store = NULL; + + buf = recv_unix_msg(sockd); + // Once we've read the message + setnow(&now); + if (buf) { + end = buf + strlen(buf) - 1; + // strip trailing \n and \r + while (end >= buf && (*end == '\n' || *end == '\r')) + *(end--) = '\0'; + } + if (!buf || !*buf) { + // An empty message wont get a reply + if (!buf) + LOGWARNING("Failed to get message in listener"); + else + LOGWARNING("Empty message in listener"); + } else { + if (want_first) { + ck_wlock(&fpm_lock); + first_pool_message = strdup(buf); + ck_wunlock(&fpm_lock); + want_first = false; + } + + /* For duplicates: + * Queued pool messages are handled by the queue code + * but since they reply ok.queued that message can + * be returned every time here + * System: repeat process them + * Web: current php web sends a timestamp of seconds + * so duplicate code will only trigger if the same + * message is sent within the same second and thus + * will effectively reduce the processing load for + * sequential duplicates + * adduser duplicates are handled by the DB code + * auth, chkpass, adduser, newid - remember individual + * last message and reply and repeat the reply without + * reprocessing the message + */ + dup = false; + if (last_auth && strcmp(last_auth, buf) == 0) { + reply_last = reply_auth; + dup = true; + } else if (last_chkpass && strcmp(last_chkpass, buf) == 0) { + reply_last = reply_chkpass; + dup = true; + } else if (last_adduser && strcmp(last_adduser, buf) == 0) { + reply_last = reply_adduser; + dup = true; + } else if (last_newid && strcmp(last_newid, buf) == 0) { + reply_last = reply_newid; + dup = true; + } else if (last_web && strcmp(last_web, buf) == 0) { + reply_last = reply_web; + dup = true; + } + if (dup) { + send_unix_msg(sockd, reply_last); + STRNCPY(duptype, buf); + dot = strchr(duptype, '.'); + if (dot) + *dot = '\0'; + snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", + LOGDUP, now.tv_sec, now.tv_usec, duptype); + LOGFILE(reply); + LOGWARNING("Duplicate '%s' message received", duptype); + } else { + LOGFILE(buf); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); + switch (cmdnum) { + case CMD_REPLY: + snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + case CMD_SHUTDOWN: + LOGWARNING("Listener received shutdown message, terminating ckdb"); + snprintf(reply, sizeof(reply), "%s.%ld.ok.exiting", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + case CMD_PING: + LOGDEBUG("Listener received ping request"); + snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + // Always process immediately: + case CMD_AUTH: + case CMD_CHKPASS: + case CMD_ADDUSER: + case CMD_NEWID: + case CMD_STATS: + ans = cmds[which_cmds].func(NULL, cmd, id, &now, + by_default, + (char *)__func__, + inet_default, + &cd, trf_root); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + send_unix_msg(sockd, rep); + free(ans); + ans = NULL; + switch (cmdnum) { + case CMD_AUTH: + if (last_auth) + free(last_auth); + last_auth = buf; + buf = NULL; + if (reply_auth) + free(reply_auth); + reply_auth = rep; + break; + case CMD_CHKPASS: + if (last_chkpass) + free(last_chkpass); + last_chkpass = buf; + buf = NULL; + if (reply_chkpass) + free(reply_chkpass); + reply_chkpass = rep; + break; + case CMD_ADDUSER: + if (last_adduser) + free(last_adduser); + last_adduser = buf; + buf = NULL; + if (reply_adduser) + free(reply_adduser); + reply_adduser = rep; + break; + case CMD_NEWID: + if (last_newid) + free(last_newid); + last_newid = buf; + buf = NULL; + if (reply_newid) + free(reply_newid); + reply_newid = rep; + break; + default: + free(rep); + } + rep = NULL; + break; + // Process, but reject (loading) until startup_complete + case CMD_HOMEPAGE: + case CMD_ALLUSERS: + case CMD_WORKERS: + case CMD_PAYMENTS: + case CMD_DSP: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", + id, now.tv_sec, cmd); + send_unix_msg(sockd, reply); + } else { + ans = cmds[which_cmds].func(NULL, cmd, id, &now, + by_default, + (char *)__func__, + inet_default, + &cd, trf_root); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + send_unix_msg(sockd, rep); + free(ans); + ans = NULL; + if (cmdnum == CMD_DSP) + free(rep); + else { + if (last_web) + free(last_web); + last_web = buf; + buf = NULL; + if (reply_web) + free(reply_web); + reply_web = rep; + } + rep = NULL; + } + break; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_BLOCK: + snprintf(reply, sizeof(reply), + "%s.%ld.ok.queued", + id, now.tv_sec); + send_unix_msg(sockd, reply); + + K_WLOCK(workqueue_free); + item = k_unlink_head(workqueue_free); + K_WUNLOCK(workqueue_free); + + workqueue = DATA_WORKQUEUE(item); + workqueue->buf = buf; + buf = NULL; + workqueue->which_cmds = which_cmds; + workqueue->cmdnum = cmdnum; + STRNCPY(workqueue->cmd, cmd); + STRNCPY(workqueue->id, id); + copy_tv(&(workqueue->now), &now); + STRNCPY(workqueue->by, by_default); + STRNCPY(workqueue->code, __func__); + STRNCPY(workqueue->inet, inet_default); + copy_tv(&(workqueue->cd), &cd); + workqueue->trf_root = trf_root; + trf_root = NULL; + workqueue->trf_store = trf_store; + trf_store = NULL; + + K_WLOCK(workqueue_free); + k_add_tail(workqueue_store, item); + K_WUNLOCK(workqueue_free); + + cksem_post(&workqueue_sem); + break; + // Code error + default: + LOGEMERG("%s() CODE ERROR unhandled message %d %.32s...", + __func__, cmdnum, buf); + snprintf(reply, sizeof(reply), + "%s.%ld.failed.code", + id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + } + } + } + close(sockd); + + tick(); + + if (cmdnum == CMD_SHUTDOWN) + break; + + if (trf_root) + trf_root = free_ktree(trf_root, NULL); + if (trf_store) { + item = trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(trf_store, transfer_free); + K_WUNLOCK(transfer_free); + trf_store = k_free_store(trf_store); + } + } + + if (buf) + dealloc(buf); + // TODO: if anyone cares, free all the dup buffers :P + close_unix_socket(us->sockd, us->path); + + return NULL; +} + +static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) { char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; char *end, *ans; int which_cmds; + K_STORE *trf_store = NULL; + K_TREE *trf_root = NULL; K_ITEM *item; tv_t now, cd; + bool finished; // Once we've read the message setnow(&now); @@ -7541,8 +7962,18 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) else LOGERR("%s() Empty message line %"PRIu64, __func__, count); } else { + finished = false; + ck_wlock(&fpm_lock); + if (first_pool_message && strcmp(first_pool_message, buf) == 0) + finished = true; + ck_wunlock(&fpm_lock); + if (finished) { + LOGERR("%s() reload completed, ckpool queue match at line %"PRIu64, __func__, count); + return true; + } + LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); switch (cmdnum) { // Ignore case CMD_REPLY: @@ -7569,9 +8000,10 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_USERSTAT: case CMD_BLOCK: ans = cmds[which_cmds].func(conn, cmd, id, &now, - (char *)"code", + by_default, (char *)__func__, - (char *)"127.0.0.1", &cd); + inet_default, + &cd, trf_root); if (ans) free(ans); break; @@ -7582,19 +8014,25 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) break; } - K_WLOCK(transfer_free); - transfer_root = free_ktree(transfer_root, NULL); - item = transfer_store->head; - while (item) { - if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) - free(DATA_TRANSFER(item)->data); - item = item->next; + if (trf_root) + trf_root = free_ktree(trf_root, NULL); + if (trf_store) { + item = trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(trf_store, transfer_free); + K_WUNLOCK(transfer_free); + trf_store = k_free_store(trf_store); } - k_list_transfer_to_head(transfer_store, transfer_free); - K_WUNLOCK(transfer_free); } tick(); + + return false; } // Log files are every ... @@ -7605,7 +8043,7 @@ static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) /* If the reload start file is missing and -r was specified correctly: * touch the filename reported in "Failed to open 'filename'" * when ckdb aborts at the beginning of the reload */ -static void reload_from(tv_t *start) +static bool reload_from(tv_t *start) { PGconn *conn = NULL; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; @@ -7613,7 +8051,7 @@ static void reload_from(tv_t *start) char *missingfirst = NULL, *missinglast = NULL; int missing_count; int processing; - bool finished = false; + bool finished = false, matched = false, ret = true; char *filename = NULL; char data[MAX_READ]; uint64_t count, total; @@ -7644,8 +8082,8 @@ static void reload_from(tv_t *start) processing++; count = 0; - while (fgets_unlocked(data, MAX_READ, fp)) - reload_line(conn, filename, ++count, data); + while (!matched && fgets_unlocked(data, MAX_READ, fp)) + matched = reload_line(conn, filename, ++count, data); if (ferror(fp)) { int err = errno; @@ -7660,6 +8098,8 @@ static void reload_from(tv_t *start) total += count; fclose(fp); free(filename); + if (matched) + break; start->tv_sec += ROLL_S; filename = rotating_filename(restorefrom, start->tv_sec); fp = fopen(filename, "r"); @@ -7715,167 +8155,102 @@ static void reload_from(tv_t *start) processing, processing == 1 ? "" : "s", total, total == 1 ? "" : "s"); + if (!matched) { + ck_wlock(&fpm_lock); + if (first_pool_message) { + LOGERR("%s() reload completed without finding ckpool queue match '%.32s'...", + __func__, first_pool_message); + LOGERR("%s() restart ckdb to resolve this", __func__); + ret = false; + } + ck_wunlock(&fpm_lock); + } + reloading = false; + + return ret; +} + +static void process_queued(K_ITEM *wq_item) +{ + static char *last_buf = NULL; + WORKQUEUE *workqueue; + K_ITEM *item; + char *ans; + + workqueue = DATA_WORKQUEUE(wq_item); + + // Simply ignore the (very rare) duplicates + if (!last_buf || strcmp(workqueue->buf, last_buf)) { + ans = cmds[workqueue->which_cmds].func(NULL, workqueue->cmd, workqueue->id, + &(workqueue->now), workqueue->by, + workqueue->code, workqueue->inet, + &(workqueue->cd), workqueue->trf_root); + free(ans); + } + + if (last_buf) + free(last_buf); + last_buf = workqueue->buf; + + workqueue->trf_root = free_ktree(workqueue->trf_root, NULL); + item = workqueue->trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(workqueue->trf_store, transfer_free); + K_WUNLOCK(transfer_free); + workqueue->trf_store = k_free_store(workqueue->trf_store); + + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq_item); + K_WUNLOCK(workqueue_free); } // TODO: equivalent of api_allow static void *listener(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - unixsock_t *us = &pi->us; - char *end, *ans, *rep, *buf = NULL, *dot; - char *last_msg = NULL, *last_reply = NULL; - char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; - // Minimise the size in case of garbage - char duptype[CMD_SIZ+1]; - enum cmd_values cmdnum, last_cmd = 9001; - int sockd, which_cmds; - pthread_t summzer; - uint64_t counter = 0; - K_ITEM *item; - size_t siz; - tv_t now, cd; - bool dup; + pthread_t sock_pt; + pthread_t summ_pt; + K_ITEM *wq_item; + int qc; - create_pthread(&summzer, summariser, NULL); + create_pthread(&sock_pt, socketer, arg); + + create_pthread(&summ_pt, summariser, NULL); rename_proc(pi->sockname); if (!setup_data()) { + everyone_die = true; LOGEMERG("ABORTING"); return NULL; } - LOGWARNING("%s(): ckdb ready", __func__); - - startup_complete = true; - - while (true) { - dealloc(buf); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGERR("Failed to accept on socket in listener"); - break; - } - - cmdnum = CMD_UNSET; - - buf = recv_unix_msg(sockd); - // Once we've read the message - setnow(&now); - if (buf) { - end = buf + strlen(buf) - 1; - // strip trailing \n and \r - while (end >= buf && (*end == '\n' || *end == '\r')) - *(end--) = '\0'; - } - if (!buf || !*buf) { - // An empty message wont get a reply - if (!buf) - LOGWARNING("Failed to get message in listener"); - else - LOGWARNING("Empty message in listener"); - } else { - /* For duplicates: - * System: shutdown and ping are always processed, - * so for any others, send a ping between them - * if you need to send the same message twice - * Web: if the pool didn't do anything since the original - * then the reply could be wrong for any reply that - * changes over time ... however if the pool is busy - * and we get the same request repeatedly, this will - * reduce the load - thus always send the same reply - * Pool: must not process it, must send back the same reply - * TODO: remember last message+reply per source - */ - if (last_msg && strcmp(last_msg, buf) == 0) { - dup = true; - // This means an exact duplicate of the last non-dup - snprintf(reply, sizeof(reply), "%s%ld,%ld", LOGDUP, now.tv_sec, now.tv_usec); - LOGFILE(reply); - cmdnum = last_cmd; - - STRNCPY(duptype, buf); - dot = strchr(duptype, '.'); - if (dot) - *dot = '\0'; - LOGWARNING("Duplicate '%s' message received", duptype); - } else { - dup = false; - LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); - last_cmd = cmdnum; - } - switch (cmdnum) { - case CMD_REPLY: - if (dup) - send_unix_msg(sockd, last_reply); - else { - snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); - if (last_reply) - free(last_reply); - last_reply = strdup(reply); - send_unix_msg(sockd, reply); - } - break; - case CMD_SHUTDOWN: - LOGWARNING("Listener received shutdown message, terminating ckdb"); - snprintf(reply, sizeof(reply), "%s.%ld.ok.exiting", id, now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_PING: - LOGDEBUG("Listener received ping request"); - // Generate a new reply each time even on dup - snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); - send_unix_msg(sockd, reply); - break; - default: - if (dup) - send_unix_msg(sockd, last_reply); - else { - ans = cmds[which_cmds].func(NULL, cmd, id, &now, - (char *)"code", - (char *)__func__, - (char *)"127.0.0.1", &cd); - siz = strlen(ans) + strlen(id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); - free(ans); - ans = NULL; - if (last_reply) - free(last_reply); - last_reply = strdup(rep); - send_unix_msg(sockd, rep); - free(rep); - rep = NULL; - } - break; - } - } - close(sockd); + K_RLOCK(workqueue_store); + qc = workqueue_store->count; + K_RUNLOCK(workqueue_store); - counter++; - tick(); + LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc); - if (cmdnum == CMD_SHUTDOWN) - break; + startup_complete = true; - K_WLOCK(transfer_free); - transfer_root = free_ktree(transfer_root, NULL); - item = transfer_store->head; - while (item) { - if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) - free(DATA_TRANSFER(item)->data); - item = item->next; - } - k_list_transfer_to_head(transfer_store, transfer_free); - K_WUNLOCK(transfer_free); + // Process queued work + while (!everyone_die) { + K_WLOCK(workqueue_store); + wq_item = k_unlink_head(workqueue_store); + K_WUNLOCK(workqueue_store); + if (wq_item) { + process_queued(wq_item); + tick(); + } else + cksem_mswait(&workqueue_sem, 420); } - dealloc(buf); - if (last_reply) - free(last_reply); - close_unix_socket(us->sockd, us->path); return NULL; } @@ -7978,7 +8353,6 @@ int main(int argc, char **argv) } if (!ckp.socket_dir) { -// ckp.socket_dir = strdup("/tmp/"); ckp.socket_dir = strdup("/opt/"); realloc_strcat(&ckp.socket_dir, ckp.name); }