diff --git a/src/ckdb.c b/src/ckdb.c index 77b785f0..3f4f9350 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -117,8 +117,15 @@ static bool blistener_using_data; static bool breakdown_using_data; static bool replier_using_data; +// Define the array size for thread data +#define THREAD_LIMIT 99 +/* Use -Q to set it higher + * Setting it higher can degrade performance if the server can't + * handle the extra locking or is swapping */ +static int queue_threads = 1; // -B to override calculated value static int breakdown_threads = -1; +#define BREAKDOWN_RATIO 3 static int reload_breakdown_count = 0; static int cmd_breakdown_count = 0; /* Lock for access to *breakdown_count @@ -275,6 +282,10 @@ int64_t confirm_last_workinfoid; #define WORKINFO_AGE 660 static tv_t reload_timestamp; +// Shared by threads - accessed under breakqueue_free lock +static uint64_t reload_processed = 0; +// Shared by threads - accessed under workqueue_free lock +static uint64_t workqueue_proc0 = 0, workqueue_proc1 = 0; /* Allow overriding the workinfoid range used in the db load of * workinfo and sharesummary */ @@ -297,7 +308,7 @@ bool prereload = true; // Different input data handling bool reloading = false; // Start marks processing during a larger reload -static bool reloaded_N_files = false; +bool reloaded_N_files = false; // Data load is complete bool startup_complete = false; // Set to true when pool0 completes, pool0 = socket data during reload @@ -389,6 +400,9 @@ K_STORE *logqueue_store; K_LIST *msgline_free; K_STORE *msgline_store; +// This can be set with the -q option +static int reload_queue_limit = RELOAD_QUEUE_LIMIT; + // BREAKQUEUE K_LIST *breakqueue_free; K_STORE *reload_breakqueue_store; @@ -498,6 +512,8 @@ K_TREE *users_root; K_TREE *userid_root; K_LIST *users_free; K_STORE *users_store; +// Emulate a list for lock checking +K_LIST *users_db_free; // USERATTS K_TREE *useratts_root; @@ -556,6 +572,8 @@ tv_t last_bc; // current network diff double current_ndiff; bool txn_tree_store = true; +// avoid trying to run 2 ages at the same time +bool workinfo_age_lock = false; // SHARES shares.id.json={...} K_TREE *shares_root; @@ -3716,7 +3734,7 @@ static void *breaker(void *arg) // Is this a reload thread or a cmd thread? reload = *(bool *)(arg); if (reload) { - queue_limit = RELOAD_QUEUE_LIMIT; + queue_limit = reload_queue_limit; queue_sleep = RELOAD_QUEUE_SLEEP_MS; when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; @@ -4353,7 +4371,7 @@ static void make_a_shift_mark() int32_t prev_height; char wi_bits[TXT_SML+1]; bool was_block = false, ok, oc_look = true; - char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ], cd_buf3[DATE_BUFSIZ]; + char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ]; int used_wid; /* If there are no CURRENT marks, make the first one by @@ -4433,11 +4451,10 @@ static void make_a_shift_mark() if (ss_item) { tv_to_buf(&(sharesummary->lastshareacc), cd_buf, sizeof(cd_buf)); tv_to_buf(&(sharesummary->lastshare), cd_buf2, sizeof(cd_buf2)); - tv_to_buf(&(sharesummary->createdate), cd_buf3, sizeof(cd_buf3)); - LOGDEBUG("%s() last sharesummary %s/%s/%"PRId64"/%s/%s/%s", + LOGDEBUG("%s() last sharesummary %s/%s/%"PRId64"/%s/%s", __func__, sharesummary->complete, sharesummary->workername, - ss_age_wid, cd_buf, cd_buf2, cd_buf3); + ss_age_wid, cd_buf, cd_buf2); } LOGDEBUG("%s() age sharesummary limit wid %"PRId64, __func__, ss_age_wid); @@ -4676,14 +4693,12 @@ static void make_a_shift_mark() cd_buf, sizeof(cd_buf)); tv_to_buf(&(sharesummary->lastshare), cd_buf2, sizeof(cd_buf2)); - tv_to_buf(&(sharesummary->createdate), - cd_buf3, sizeof(cd_buf3)); LOGEMERG("%s() ERR unaged sharesummary " - "%s/%s/%"PRId64"/%s/%s/%s", + "%s/%s/%"PRId64"/%s/%s", __func__, sharesummary->complete, sharesummary->workername, sharesummary->workinfoid, - cd_buf, cd_buf2, cd_buf3); + cd_buf, cd_buf2); return; } } @@ -5367,6 +5382,34 @@ static void *process_socket(void *arg) DATA_BREAKQUEUE(bq, bq_item); DATA_MSGLINE(msgline, bq->ml_item); + + // Limited commands available during key_update + if (key_update) { + switch (bq->cmdnum) { + case CMD_TERMINATE: + case CMD_PING: + case CMD_VERSION: + case CMD_LOGLEVEL: + case CMD_FLUSH: + case CMD_STATS: + case CMD_SHSTA: + case CMD_CHKPASS: + case CMD_GETATTS: + case CMD_HOMEPAGE: + break; + default: + snprintf(reply, sizeof(reply), + "%s.%ld.unavailable.%s", + msgline->id, + bq->now.tv_sec, + msgline->cmd); + setnow(&(msgline->processed)); + ckdb_unix_msg(REPLIER_CMD, bq->sockd, + reply, msgline, true); + goto skippy; + } + } + if (SEQALL_LOG) { K_ITEM *seqall; if (msgline->trf_root) { @@ -5378,6 +5421,7 @@ static void *process_socket(void *arg) } } } + replied = btc = false; switch (bq->cmdnum) { case CMD_AUTH: @@ -5584,7 +5628,7 @@ static void *process_socket(void *arg) case CMD_BLOCKSTATUS: case CMD_MARKS: case CMD_QUERY: - if (!startup_complete) { + if (!startup_complete && !key_update) { snprintf(reply, sizeof(reply), "%s.%ld.loading.%s", msgline->id, @@ -5729,6 +5773,9 @@ static void *process_socket(void *arg) msgline, true); break; } + +skippy: + if (bq->sockd >= 0) dec_sockd = true; else @@ -5905,28 +5952,167 @@ static void *socketer(void *arg) return NULL; } -static void *process_reload(__maybe_unused void *arg) +static void process_reload_item(PGconn *conn, K_ITEM *bq_item) { - PGconn *conn = NULL; MSGLINE *msgline = NULL; - K_ITEM *bq_item = NULL; BREAKQUEUE *bq = NULL; enum cmd_values cmdnum; char *ans, *st = NULL; + + DATA_BREAKQUEUE(bq, bq_item); + DATA_MSGLINE(msgline, bq->ml_item); + if (SEQALL_LOG) { + K_ITEM *seqall; + if (msgline->trf_root) { + seqall = find_transfer(msgline->trf_root, SEQALL); + if (seqall) { + LOGNOTICE("%s() SEQALL %d %s", + __func__, bq->cmdnum, + transfer_data(seqall)); + } + } + } + switch (bq->cmdnum) { + // Ignore + case CMD_REPLY: + case CMD_ALERTEVENT: + case CMD_ALERTOVENT: + break; + // Shouldn't be there + case CMD_TERMINATE: + case CMD_PING: + case CMD_VERSION: + case CMD_LOGLEVEL: + case CMD_FLUSH: + // Non pool commands, shouldn't be there + case CMD_ADDUSER: + case CMD_NEWPASS: + case CMD_CHKPASS: + case CMD_2FA: + case CMD_USERSET: + case CMD_WORKERSET: + case CMD_BLOCKLIST: + case CMD_BLOCKSTATUS: + case CMD_NEWID: + case CMD_PAYMENTS: + case CMD_WORKERS: + case CMD_ALLUSERS: + case CMD_HOMEPAGE: + case CMD_GETATTS: + case CMD_SETATTS: + case CMD_EXPATTS: + case CMD_GETOPTS: + case CMD_SETOPTS: + case CMD_DSP: + case CMD_STATS: + case CMD_PPLNS: + case CMD_PPLNS2: + case CMD_PAYOUTS: + case CMD_MPAYOUTS: + case CMD_SHIFTS: + case CMD_USERSTATUS: + case CMD_MARKS: + case CMD_PSHIFT: + case CMD_SHSTA: + case CMD_USERINFO: + case CMD_BTCSET: + case CMD_QUERY: + case CMD_LOCKS: + case CMD_EVENTS: + case CMD_HIGH: + LOGERR("%s() INVALID message line %"PRIu64 + " ignored '%.42s...", + __func__, bq->count, + st = safe_text(msgline->msg)); + FREENULL(st); + break; + case CMD_HEARTBEAT: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_WORKERSTAT: + case CMD_BLOCK: + if (key_update) + break; + case CMD_AUTH: + case CMD_ADDRAUTH: + if (confirm_sharesummary) + break; + case CMD_SHARELOG: + // This will return the same cmdnum or DUP + cmdnum = process_seq(msgline); + if (cmdnum != CMD_DUPSEQ) { + ans = ckdb_cmds[msgline->which_cmds].func(conn, + msgline->cmd, + msgline->id, + &(msgline->now), + by_default, + (char *)__func__, + inet_default, + &(msgline->cd), + msgline->trf_root, true); + FREENULL(ans); + } + // TODO: time stats from each msgline tv_t + break; + default: + // Force this switch to be updated if new cmds are added + quithere(1, "%s line %"PRIu64" '%s' - not " + "handled by reload", + bq->filename, bq->count, + st = safe_text_nonull(msgline->cmd)); + // Won't get here ... + FREENULL(st); + break; + } + + if (bq->ml_item) { + free_msgline_data(bq->ml_item, true, true); + K_WLOCK(msgline_free); + k_add_head(msgline_free, bq->ml_item); + K_WUNLOCK(msgline_free); + bq->ml_item = NULL; + } + free(bq->buf); +} + +static void *process_reload(__maybe_unused void *arg) +{ + pthread_t *procrel_pt; + PGconn *conn = NULL; + K_ITEM *bq_item = NULL; + char buf[128]; time_t now; - uint64_t processed = 0; + int i, *n, zeros; ts_t when, when_add; int ret; - pthread_detach(pthread_self()); + if (arg) + i = *(int *)(arg); + else { + pthread_detach(pthread_self()); - when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; - when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; + n = malloc(queue_threads * sizeof(int)); + procrel_pt = malloc(queue_threads * sizeof(*procrel_pt)); + for (i = 1; i < queue_threads; i++) { + n[i] = i; + create_pthread(&(procrel_pt[i]), process_reload, &(n[i])); + } + i = 0; - LOCK_INIT("db_procreload"); - rename_proc("db_procreload"); + LOGNOTICE("%s() starting", __func__); + } - LOGNOTICE("%s() starting", __func__); + if (queue_threads < 10) + zeros = 1; + else + zeros = (int)log10(queue_threads) + 1; + + snprintf(buf, sizeof(buf), "db_p%0*drload", zeros, i); + LOCK_INIT(buf); + rename_proc(buf); + + when_add.tv_sec = RELOAD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (RELOAD_QUEUE_SLEEP_MS % 1000) * 1000000; conn = dbconnect(); now = time(NULL); @@ -5934,8 +6120,10 @@ static void *process_reload(__maybe_unused void *arg) while (!everyone_die) { K_WLOCK(breakqueue_free); bq_item = k_unlink_head(reload_done_breakqueue_store); - if (bq_item) + if (bq_item) { reload_processing++; + reload_processed++; + } K_WUNLOCK(breakqueue_free); if (!bq_item) { @@ -5958,8 +6146,6 @@ static void *process_reload(__maybe_unused void *arg) continue; } - processed++; - // Don't keep a connection for more than ~10s ... of processing if ((time(NULL) - now) > 10) { PQfinish(conn); @@ -5967,120 +6153,7 @@ static void *process_reload(__maybe_unused void *arg) now = time(NULL); } - DATA_BREAKQUEUE(bq, bq_item); - DATA_MSGLINE(msgline, bq->ml_item); - if (SEQALL_LOG) { - K_ITEM *seqall; - if (msgline->trf_root) { - seqall = find_transfer(msgline->trf_root, SEQALL); - if (seqall) { - LOGNOTICE("%s() SEQALL %d %s", - __func__, bq->cmdnum, - transfer_data(seqall)); - } - } - } - switch (bq->cmdnum) { - // Ignore - case CMD_REPLY: - case CMD_ALERTEVENT: - case CMD_ALERTOVENT: - break; - // Shouldn't be there - case CMD_TERMINATE: - case CMD_PING: - case CMD_VERSION: - case CMD_LOGLEVEL: - case CMD_FLUSH: - // Non pool commands, shouldn't be there - case CMD_ADDUSER: - case CMD_NEWPASS: - case CMD_CHKPASS: - case CMD_2FA: - case CMD_USERSET: - case CMD_WORKERSET: - case CMD_BLOCKLIST: - case CMD_BLOCKSTATUS: - case CMD_NEWID: - case CMD_PAYMENTS: - case CMD_WORKERS: - case CMD_ALLUSERS: - case CMD_HOMEPAGE: - case CMD_GETATTS: - case CMD_SETATTS: - case CMD_EXPATTS: - case CMD_GETOPTS: - case CMD_SETOPTS: - case CMD_DSP: - case CMD_STATS: - case CMD_PPLNS: - case CMD_PPLNS2: - case CMD_PAYOUTS: - case CMD_MPAYOUTS: - case CMD_SHIFTS: - case CMD_USERSTATUS: - case CMD_MARKS: - case CMD_PSHIFT: - case CMD_SHSTA: - case CMD_USERINFO: - case CMD_BTCSET: - case CMD_QUERY: - case CMD_LOCKS: - case CMD_EVENTS: - case CMD_HIGH: - LOGERR("%s() INVALID message line %"PRIu64 - " ignored '%.42s...", - __func__, bq->count, - st = safe_text(msgline->msg)); - FREENULL(st); - break; - case CMD_HEARTBEAT: - case CMD_POOLSTAT: - case CMD_USERSTAT: - case CMD_WORKERSTAT: - case CMD_BLOCK: - if (key_update) - break; - case CMD_AUTH: - case CMD_ADDRAUTH: - if (confirm_sharesummary) - break; - case CMD_SHARELOG: - // This will return the same cmdnum or DUP - cmdnum = process_seq(msgline); - if (cmdnum != CMD_DUPSEQ) { - ans = ckdb_cmds[msgline->which_cmds].func(conn, - msgline->cmd, - msgline->id, - &(msgline->now), - by_default, - (char *)__func__, - inet_default, - &(msgline->cd), - msgline->trf_root, true); - FREENULL(ans); - } - // TODO: time stats from each msgline tv_t - break; - default: - // Force this switch to be updated if new cmds are added - quithere(1, "%s line %"PRIu64" '%s' - not " - "handled by reload", - bq->filename, bq->count, - st = safe_text_nonull(msgline->cmd)); - // Won't get here ... - FREENULL(st); - break; - } - - if (bq->ml_item) { - free_msgline_data(bq->ml_item, true, true); - K_WLOCK(msgline_free); - k_add_head(msgline_free, bq->ml_item); - K_WUNLOCK(msgline_free); - bq->ml_item = NULL; - } - free(bq->buf); + process_reload_item(conn, bq_item); K_WLOCK(breakqueue_free); reload_processing--; @@ -6092,7 +6165,13 @@ static void *process_reload(__maybe_unused void *arg) PQfinish(conn); - LOGNOTICE("%s() exiting, processed %"PRIu64, __func__, processed); + if (!arg) { + for (i = 1; i < queue_threads; i++) + join_pthread(procrel_pt[i]); + + LOGNOTICE("%s() exiting, processed %"PRIu64, + __func__, reload_processed); + } return NULL; } @@ -6164,7 +6243,7 @@ static void reload_line(char *filename, char *buf, uint64_t count) pthread_cond_signal(&bq_reload_waitcond); mutex_unlock(&bq_reload_waitlock); - while (qcount > RELOAD_QUEUE_LIMIT) { + while (qcount > reload_queue_limit) { cksleep_ms(RELOAD_QUEUE_SLEEP_MS); K_RLOCK(breakqueue_free); qcount = reload_breakqueue_store->count; @@ -6546,129 +6625,57 @@ static void free_lost(SEQDATA *seqdata) } } -// TODO: equivalent of api_allow -static void *listener(void *arg) +static void *pqproc(void *arg) { + /* Process queued work - ensure pool0 is emptied first, + * even if there is pending pool0 data being processed by breaker() */ + static bool pool0 = true; + static tv_t wq_stt, wq_fin; + + pthread_t *queue_pt; PGconn *conn = NULL; - pthread_t log_pt; - pthread_t sock_pt; - pthread_t summ_pt; - pthread_t mark_pt; - pthread_t break_pt; K_ITEM *wq_item; time_t now = 0; - int bq, bqp, bqd, wq0count, wqcount, wqgot; - char ooo_buf[256]; - tv_t wq_stt, wq_fin; - double min, sec; + bool switch_msg = false, complete_msg; + int wqcount, wqgot; + char buf[128]; + double min, sec = 0; SEQSET *seqset = NULL; SEQDATA *seqdata; K_ITEM *ss_item; - int cpus, i; - bool reloader, cmder, pool0, switch_msg = false; - uint64_t proc0 = 0, proc1 = 0; + int i, *n, zeros; ts_t when, when_add; int ret; - pthread_detach(pthread_self()); - - when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; - when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; - - LOCK_INIT("db_plistener"); - rename_proc("db_plistener"); - - logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), - ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); - logqueue_store = k_new_store(logqueue_free); - - breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), - ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); - reload_breakqueue_store = k_new_store(breakqueue_free); - reload_done_breakqueue_store = k_new_store(breakqueue_free); - cmd_breakqueue_store = k_new_store(breakqueue_free); - cmd_done_breakqueue_store = k_new_store(breakqueue_free); - -#if LOCK_CHECK - DLPRIO(logqueue, 94); - DLPRIO(breakqueue, PRIO_TERMINAL); -#endif - if (breakdown_threads <= 0) { - cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; - breakdown_threads = (int)(cpus / 3) ? : 1; - } - LOGWARNING("%s(): creating %d*2 breaker threads ...", - __func__, breakdown_threads); - reloader = true; - for (i = 0; i < breakdown_threads; i++) - create_pthread(&break_pt, breaker, &reloader); - cmder = false; - for (i = 0; i < breakdown_threads; i++) - create_pthread(&break_pt, breaker, &cmder); - - if (no_data_log == false) - create_pthread(&log_pt, logger, NULL); - - if (!confirm_sharesummary) - create_pthread(&sock_pt, socketer, arg); - - create_pthread(&summ_pt, summariser, NULL); - - create_pthread(&mark_pt, marker, NULL); - - plistener_using_data = true; + if (!arg) { + setnow(&wq_stt); - if (!setup_data()) { - if (!everyone_die) { - LOGEMERG("ABORTING"); - everyone_die = true; + n = malloc(queue_threads * sizeof(int)); + queue_pt = malloc(queue_threads * sizeof(*queue_pt)); + for (i = 1; i < queue_threads; i++) { + n[i] = i; + create_pthread(&(queue_pt[i]), pqproc, &(n[i])); } - goto sayonara; - } - - if (!everyone_die) { - K_RLOCK(workqueue_free); - wq0count = pool0_workqueue_store->count; - wqcount = pool_workqueue_store->count; - K_RUNLOCK(workqueue_free); - K_RLOCK(breakqueue_free); - bq = cmd_breakqueue_store->count; - bqp = cmd_processing; - bqd = cmd_done_breakqueue_store->count; - K_RUNLOCK(breakqueue_free); - - LOGWARNING("reload shares OoO %s", - ooo_status(ooo_buf, sizeof(ooo_buf))); - sequence_report(true); + } else { + i = *(int *)(arg); - LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)", - __func__, bq+bqp+bqd+wq0count+wqcount, - bq, bqp, bqd, wq0count, wqcount); + if (queue_threads < 10) + zeros = 1; + else + zeros = (int)log10(queue_threads) + 1; - /* Until startup_complete, the values should be ignored - * Setting them to 'now' means that they won't time out - * until after startup_complete */ - ck_wlock(&last_lock); - setnow(&last_heartbeat); - copy_tv(&last_workinfo, &last_heartbeat); - copy_tv(&last_share, &last_heartbeat); - copy_tv(&last_share_acc, &last_heartbeat); - copy_tv(&last_share_inv, &last_heartbeat); - copy_tv(&last_auth, &last_heartbeat); - ck_wunlock(&last_lock); + snprintf(buf, sizeof(buf), "db_p%0*dqproc", zeros, i); + LOCK_INIT(buf); + rename_proc(buf); + } - startup_complete = true; + when_add.tv_sec = CMD_QUEUE_SLEEP_MS / 1000; + when_add.tv_nsec = (CMD_QUEUE_SLEEP_MS % 1000) * 1000000; - setnow(&wq_stt); - conn = dbconnect(); - now = time(NULL); - wqgot = 0; - } + now = time(NULL); + conn = dbconnect(); + wqgot = 0; - LOGNOTICE("%s() processing pool0", __func__); - /* Process queued work - ensure pool0 is emptied first, - * even if there is pending pool0 data being processed by breaker() */ - pool0 = true; // Override checking until pool0 is complete wqcount = -1; while (!everyone_die) { @@ -6688,17 +6695,24 @@ static void *listener(void *arg) wq_item = k_unlink_head(pool_workqueue_store); wqcount = pool_workqueue_store->count; } + + if (wqcount == 0 && wq_stt.tv_sec != 0L) + setnow(&wq_fin); + + if (wq_item) { + if (pool0) + workqueue_proc0++; + else + workqueue_proc1++; + } K_WUNLOCK(workqueue_free); if (switch_msg) { switch_msg = false; LOGNOTICE("%s() pool0 complete, processed %"PRIu64, - __func__, proc0); + __func__, workqueue_proc0); } - if (wqcount == 0 && wq_stt.tv_sec != 0L) - setnow(&wq_fin); - /* Don't keep a connection for more than ~10s or ~10000 items * but always have a connection open */ if ((time(NULL) - now) > 10 || wqgot > 10000) { @@ -6709,24 +6723,27 @@ static void *listener(void *arg) } if (wq_item) { - if (pool0) - proc0++; - else - proc1++; wqgot++; process_queued(conn, wq_item); tick(); } + complete_msg = false; + K_WLOCK(workqueue_free); if (wqcount == 0 && wq_stt.tv_sec != 0L) { sec = tvdiff(&wq_fin, &wq_stt); - min = floor(sec / 60.0); - sec -= min * 60.0; - LOGWARNING("pool queue completed %.0fm %.3fs", min, sec); + complete_msg = true; // Used as the flag to display the message once wq_stt.tv_sec = 0L; reload_queue_complete = true; } + K_WUNLOCK(workqueue_free); + if (complete_msg) { + min = floor(sec / 60.0); + sec -= min * 60.0; + LOGWARNING("%s() pool queue completed %.0fm %.3fs", + __func__, min, sec); + } /* Checked outside lock but only changed under lock * This avoids taking out the lock repeatedly and the cleanup @@ -6771,15 +6788,127 @@ static void *listener(void *arg) } } -sayonara: + if (conn) + PQfinish(conn); + + if (!arg) { + for (i = 1; i < queue_threads; i++) + join_pthread(queue_pt[i]); + } + + return NULL; +} + +static void *listener(void *arg) +{ + pthread_t log_pt; + pthread_t sock_pt; + pthread_t summ_pt; + pthread_t mark_pt; + pthread_t break_pt; + int bq, bqp, bqd, wq0count, wqcount; + char ooo_buf[256]; + char buf[128]; + int cpus, i; + bool reloader, cmder; + + pthread_detach(pthread_self()); + + snprintf(buf, sizeof(buf), "db_p0qproc"); + LOCK_INIT(buf); + rename_proc(buf); + + logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), + ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); + logqueue_store = k_new_store(logqueue_free); + + breakqueue_free = k_new_list("BreakQueue", sizeof(BREAKQUEUE), + ALLOC_BREAKQUEUE, LIMIT_BREAKQUEUE, true); + reload_breakqueue_store = k_new_store(breakqueue_free); + reload_done_breakqueue_store = k_new_store(breakqueue_free); + cmd_breakqueue_store = k_new_store(breakqueue_free); + cmd_done_breakqueue_store = k_new_store(breakqueue_free); + +#if LOCK_CHECK + DLPRIO(logqueue, 94); + DLPRIO(breakqueue, PRIO_TERMINAL); +#endif + if (breakdown_threads <= 0) { + cpus = sysconf(_SC_NPROCESSORS_ONLN); + if (cpus < 1) + cpus = 1; + breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; + } + LOGWARNING("%s(): creating %d*2 breaker threads ...", + __func__, breakdown_threads); + reloader = true; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &reloader); + cmder = false; + for (i = 0; i < breakdown_threads; i++) + create_pthread(&break_pt, breaker, &cmder); + + if (no_data_log == false) + create_pthread(&log_pt, logger, NULL); + + if (!confirm_sharesummary) + create_pthread(&sock_pt, socketer, arg); + + create_pthread(&summ_pt, summariser, NULL); + + create_pthread(&mark_pt, marker, NULL); + + plistener_using_data = true; + + if (!setup_data()) { + if (!everyone_die) { + LOGEMERG("ABORTING"); + everyone_die = true; + } + } + + if (!everyone_die) { + K_RLOCK(workqueue_free); + wq0count = pool0_workqueue_store->count; + wqcount = pool_workqueue_store->count; + K_RUNLOCK(workqueue_free); + K_RLOCK(breakqueue_free); + bq = cmd_breakqueue_store->count; + bqp = cmd_processing; + bqd = cmd_done_breakqueue_store->count; + K_RUNLOCK(breakqueue_free); + + LOGWARNING("reload shares OoO %s", + ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); + + LOGWARNING("%s(): ckdb ready, pool queue %d (%d/%d/%d/%d/%d)", + __func__, bq+bqp+bqd+wq0count+wqcount, + bq, bqp, bqd, wq0count, wqcount); + + /* Until startup_complete, the values should be ignored + * Setting them to 'now' means that they won't time out + * until after startup_complete */ + ck_wlock(&last_lock); + setnow(&last_heartbeat); + copy_tv(&last_workinfo, &last_heartbeat); + copy_tv(&last_share, &last_heartbeat); + copy_tv(&last_share_acc, &last_heartbeat); + copy_tv(&last_share_inv, &last_heartbeat); + copy_tv(&last_auth, &last_heartbeat); + ck_wunlock(&last_lock); + + startup_complete = true; + + LOGNOTICE("%s() processing pool0", __func__); + pqproc(NULL); + } + LOGNOTICE("%s() exiting, pool0 %"PRIu64" pool %"PRIu64, - __func__, proc0, proc1); + __func__, workqueue_proc0, workqueue_proc1); plistener_using_data = false; - if (conn) - PQfinish(conn); - POOLINSTANCE_RESET_MSG("exiting"); return NULL; @@ -7062,14 +7191,14 @@ static void update_check(int64_t markerid_stt, int64_t markerid_fin) LOGWARNING("update complete %.0fm %.3fs", min, sec); } -static void update_keysummary() +static void update_keysummary(ckpool_t *ckp) { int64_t markerid_stt, markerid_fin; char *tmp, *minus; tv_t db_stt, db_fin; - pthread_t break_pt; + pthread_t break_pt, sock_pt; double min, sec; - bool reloader; + bool reloader, cmder; int cpus, i; // Simple value check to abort early @@ -7132,14 +7261,20 @@ static void update_keysummary() DLPRIO(breakqueue, PRIO_TERMINAL); #endif if (breakdown_threads <= 0) { - cpus = sysconf(_SC_NPROCESSORS_ONLN) ? : 1; - breakdown_threads = (int)(cpus / 3) ? : 1; + cpus = sysconf(_SC_NPROCESSORS_ONLN); + if (cpus < 1) + cpus = 1; + breakdown_threads = (int)(cpus / BREAKDOWN_RATIO) ? : 1; } - LOGWARNING("%s(): creating %d breaker threads ...", + LOGWARNING("%s(): creating %d+1 breaker threads ...", __func__, breakdown_threads); reloader = true; for (i = 0; i < breakdown_threads; i++) create_pthread(&break_pt, breaker, &reloader); + cmder = false; + // Only needs one + for (i = 0; i < 1; i++) + create_pthread(&break_pt, breaker, &cmder); alloc_storage(); @@ -7147,6 +7282,8 @@ static void update_keysummary() setnow(&db_stt); + create_pthread(&sock_pt, socketer, &(ckp->main)); + if (!getdata1() || everyone_die) return; @@ -7773,6 +7910,8 @@ static struct option long_options[] = { { "name", required_argument, 0, 'n' }, { "dbpass", required_argument, 0, 'p' }, { "btc-pass", required_argument, 0, 'P' }, + { "reload-queue-limit", required_argument, 0, 'q' }, + { "queue-threads", required_argument, 0, 'Q' }, { "ckpool-logdir", required_argument, 0, 'r' }, { "logdir", required_argument, 0, 'R' }, { "sockdir", required_argument, 0, 's' }, @@ -7822,7 +7961,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "a:b:B:c:d:D:ghi:IkK:l:L:mM:n:p:P:q:Q:r:R:s:S:t:Tu:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case '?': case ':': @@ -7850,11 +7989,11 @@ int main(int argc, char **argv) case 'B': { int bt = atoi(optarg); - if (bt < 1) { + if (bt < 1 || bt > THREAD_LIMIT) { quit(1, "Invalid breakdown " "thread count %d " - "- must be > 0", - bt); + "- must be >0 and <=%d", + bt, THREAD_LIMIT); } breakdown_threads = bt; } @@ -7985,6 +8124,29 @@ int main(int argc, char **argv) while (*kill) *(kill++) = '\0'; break; + case 'q': + { + int rql = atoi(optarg); + if (rql < 1) { + quit(1, "Invalid reload queue " + "limit %d - must be > 0", + rql); + } + reload_queue_limit = rql; + } + break; + case 'Q': + { + int qt = atoi(optarg); + if (qt < 1 || qt > THREAD_LIMIT) { + quit(1, "Invalid queue " + "thread count %d " + "- must be >0 and <=%d", + qt, THREAD_LIMIT); + } + queue_threads = qt; + } + break; case 'r': restorefrom = strdup(optarg); break; @@ -8156,11 +8318,13 @@ int main(int argc, char **argv) // Emulate a list for lock checking process_pplns_free = k_lock_only_list("ProcessPPLNS"); workers_db_free = k_lock_only_list("WorkersDB"); + users_db_free = k_lock_only_list("UsersDB"); event_limits_free = k_lock_only_list("EventLimits"); #if LOCK_CHECK DLPRIO(process_pplns, 99); DLPRIO(workers_db, 98); + DLPRIO(users_db, 97); DLPRIO(event_limits, 46); // events-2 #endif @@ -8173,7 +8337,12 @@ int main(int argc, char **argv) } if (key_update) { - update_keysummary(); + ckp.main.sockname = strdup("klistener"); + write_namepid(&ckp.main); + create_process_unixsock(&ckp.main); + fcntl(ckp.main.us.sockd, F_SETFD, FD_CLOEXEC); + + update_keysummary(&ckp); everyone_die = true; } else if (confirm_sharesummary) { // TODO: add a system lock to stop running 2 at once? diff --git a/src/ckdb.h b/src/ckdb.h index 8f8b760d..20796597 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.305" +#define CKDB_VERSION DB_VERSION"-2.400" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -338,6 +338,8 @@ extern bool db_load_complete; extern bool prereload; // Different input data handling extern bool reloading; +// Start marks processing during a larger reload +extern bool reloaded_N_files; // Data load is complete extern bool startup_complete; // Tell everyone to die @@ -1624,6 +1626,8 @@ extern K_TREE *users_root; extern K_TREE *userid_root; extern K_LIST *users_free; extern K_STORE *users_store; +// Emulate a list for lock checking +extern K_LIST *users_db_free; // USERATTS typedef struct useratts { @@ -1890,6 +1894,8 @@ extern tv_t last_bc; // current network diff extern double current_ndiff; extern bool txn_tree_store; +// avoid trying to run 2 ages at the same time +extern bool workinfo_age_lock; // Offset in binary coinbase1 of the block number #define BLOCKNUM_OFFSET 42 @@ -2013,7 +2019,6 @@ typedef struct sharesummary { tv_t lastshareacc; double lastdiffacc; char complete[TXT_FLAG+1]; - MODIFYDATECONTROLPOINTERS; } SHARESUMMARY; /* After this many shares added, we need to update the DB record @@ -2697,7 +2702,6 @@ typedef struct keysharesummary { tv_t lastshareacc; double lastdiffacc; char complete[TXT_FLAG+1]; - SIMPLEDATECONTROLPOINTERS; } KEYSHARESUMMARY; #define ALLOC_KEYSHARESUMMARY 1000 @@ -3142,10 +3146,9 @@ extern cmp_t cmp_workinfo_height(K_ITEM *a, K_ITEM *b); #define find_workinfo(_wid, _ctx) _find_workinfo(_wid, false, _ctx); extern K_ITEM *_find_workinfo(int64_t workinfoid, bool gotlock, K_TREE_CTX *ctx); extern K_ITEM *next_workinfo(int64_t workinfoid, K_TREE_CTX *ctx); -extern bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, - char *code, char *inet, tv_t *cd, tv_t *ss_first, - tv_t *ss_last, int64_t *ss_count, int64_t *s_count, - int64_t *s_diff); +extern bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd, + tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, + int64_t *s_count, int64_t *s_diff); extern double coinbase_reward(int32_t height); extern double workinfo_pps(K_ITEM *w_item, int64_t workinfoid); extern cmp_t cmp_shares(K_ITEM *a, K_ITEM *b); @@ -3166,8 +3169,7 @@ extern void zero_sharesummary(SHARESUMMARY *row); extern K_ITEM *_find_sharesummary(int64_t userid, char *workername, int64_t workinfoid, bool pool); extern K_ITEM *find_last_sharesummary(int64_t userid, char *workername); -extern void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, - char *code, char *inet, tv_t *cd); +extern void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd); #define dbhash2btchash(_hash, _buf, _siz) \ _dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE) void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS); @@ -3411,16 +3413,11 @@ extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmar tv_t *cd, K_TREE *trf_root); extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm); extern char *ooo_status(char *buf, size_t siz); -#define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \ - _sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \ - WHERE_FFL_HERE) -extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, - char *code, char *inet, tv_t *cd, +#define sharesummary_update(_s_row, _e_row, _cd) \ + _sharesummary_update(_s_row, _e_row, _cd, WHERE_FFL_HERE) +extern bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd, WHERE_FFL_ARGS); -#define sharesummary_age(_ss_item, _by, _code, _inet, _cd) \ - _sharesummary_age(_ss_item, _by, _code, _inet, _cd, WHERE_FFL_HERE) -extern bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, - tv_t *cd, WHERE_FFL_ARGS); +extern bool sharesummary_age(K_ITEM *ss_item); extern bool keysharesummary_age(K_ITEM *kss_item); extern bool sharesummary_fill(PGconn *conn); extern bool blocks_stats(PGconn *conn, int32_t height, char *blockhash, diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index ee34e613..dfe8b576 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -2865,18 +2865,20 @@ seconf: } ok = workinfo_age(workinfoid, transfer_data(i_poolinstance), - by, code, inet, cd, &ss_first, &ss_last, - &ss_count, &s_count, &s_diff); + cd, &ss_first, &ss_last, &ss_count, &s_count, + &s_diff); if (!ok) { LOGERR("%s(%s) %s.failed.DATA", __func__, cmd, id); return strdup("failed.DATA"); } else { - /* Don't slow down the reload - do them later */ - if (!reloading || key_update) { + /* Don't slow down the reload - do them later, + * unless it's a long reload since: + * Any pool restarts in the reload data will cause + * unaged workinfos and thus would stop marker() */ + if (!reloading || key_update || reloaded_N_files) { // Aging is a queued item thus the reply is ignored auto_age_older(workinfoid, - transfer_data(i_poolinstance), - by, code, inet, cd); + transfer_data(i_poolinstance), cd); } } LOGDEBUG("%s.ok.aged %"PRId64, id, workinfoid); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index c8702d67..20c6d88e 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -89,12 +89,6 @@ void free_sharesummary_data(K_ITEM *item) DATA_SHARESUMMARY(sharesummary, item); LIST_MEM_SUB(sharesummary_free, sharesummary->workername); FREENULL(sharesummary->workername); - SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); - SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); - SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); - SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); - SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); - SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); } void free_optioncontrol_data(K_ITEM *item) @@ -128,9 +122,6 @@ void free_keysharesummary_data(K_ITEM *item) DATA_KEYSHARESUMMARY(keysharesummary, item); LIST_MEM_SUB(keysharesummary_free, keysharesummary->key); FREENULL(keysharesummary->key); - SET_CREATEBY(keysharesummary_free, keysharesummary->createby, EMPTY); - SET_CREATECODE(keysharesummary_free, keysharesummary->createcode, EMPTY); - SET_CREATEINET(keysharesummary_free, keysharesummary->createinet, EMPTY); } void free_keysummary_data(K_ITEM *item) @@ -2144,6 +2135,8 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped, SHARES lookshares, *shares; K_TREE_CTX s_ctx[1]; char error[1024]; + bool multiple = false; + int64_t curr_userid; error[0] = '\0'; INIT_SHARES(&s_look); @@ -2154,6 +2147,7 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped, DATE_ZERO(&(lookshares.createdate)); s_look.data = (void *)(&lookshares); + curr_userid = userid; K_WLOCK(shares_free); s_item = find_after_in_ktree(shares_root, &s_look, s_ctx); while (s_item) { @@ -2167,37 +2161,61 @@ static void discard_shares(int64_t *shares_tot, int64_t *shares_dumped, break; } + // Avoid releasing the lock the first time in + if (curr_userid == DISCARD_ALL) + curr_userid = shares->userid; + + /* The shares being removed here wont be touched by any other + * code, so we don't need to hold the shares_free lock the + * whole time, since that would slow down incoming share + * processing too much - this only affects DISCARD_ALL + * TODO: delete the shares when they are summarised in the + * sharesummary */ + if (shares->userid != curr_userid) { + K_WUNLOCK(shares_free); + curr_userid = shares->userid; + K_WLOCK(shares_free); + } + (*shares_tot)++; if (shares->errn == SE_NONE) (*diff_tot) += shares->diff; + if (reloading && skipupdate) { + (*shares_dumped)++; + if (error[0]) + multiple = true; + else { + snprintf(error, sizeof(error), + "%"PRId64"/%"PRId64"/%s/%s%.0f", + shares->workinfoid, + shares->userid, + shares->workername, + (shares->errn == SE_NONE) ? "" : "*", + shares->diff); + } + } tmp_item = next_in_ktree(s_ctx); remove_from_ktree(shares_root, s_item); k_unlink_item(shares_store, s_item); - if (reloading && skipupdate) - (*shares_dumped)++; - if (reloading && skipupdate && !error[0]) { - snprintf(error, sizeof(error), - "reload found aged share: %"PRId64 - "/%"PRId64"/%s/%s%.0f", - shares->workinfoid, - shares->userid, - shares->workername, - (shares->errn == SE_NONE) ? "" : "*", - shares->diff); - } k_add_head(shares_free, s_item); s_item = tmp_item; } K_WUNLOCK(shares_free); - if (error[0]) - LOGERR("%s(): %s", __func__, error); + if (error[0]) { + LOGERR("%s(): reload found %s aged share%s%s: %s", + __func__, multiple ? "multiple" : "an", + multiple ? "s" : EMPTY, + multiple ? ", the first was" : EMPTY, + error); + } + } // Duplicates during a reload are set to not show messages -bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, - char *inet, tv_t *cd, tv_t *ss_first, tv_t *ss_last, - int64_t *ss_count, int64_t *s_count, int64_t *s_diff) +bool workinfo_age(int64_t workinfoid, char *poolinstance, tv_t *cd, + tv_t *ss_first, tv_t *ss_last, int64_t *ss_count, + int64_t *s_count, int64_t *s_diff) { K_ITEM *wi_item, ss_look, *ss_item; K_ITEM ks_look, *ks_item, *wm_item; @@ -2208,6 +2226,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, int64_t diff_tot; KEYSHARESUMMARY lookkeysharesummary, *keysharesummary; SHARESUMMARY looksharesummary, *sharesummary; + char complete[TXT_FLAG+1]; WORKINFO *workinfo; bool ok = false, ksok = false, skipupdate = false; @@ -2269,8 +2288,10 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, ss_look.data = (void *)(&looksharesummary); K_RLOCK(sharesummary_free); ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &ss_look, ss_ctx); - K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + // complete could change, the id fields wont be changed/removed yet + STRNCPY(complete, sharesummary->complete); + K_RUNLOCK(sharesummary_free); while (ss_item && sharesummary->workinfoid == workinfoid) { ss_tot++; skipupdate = false; @@ -2278,7 +2299,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, * so finding an aged sharesummary here is an error * N.B. this can only happen with (very) old reload files */ if (reloading) { - if (sharesummary->complete[0] == SUMMARY_COMPLETE) { + if (complete[0] == SUMMARY_COMPLETE) { ss_already++; skipupdate = true; if (confirm_sharesummary) { @@ -2292,7 +2313,9 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, } if (!skipupdate) { - if (!sharesummary_age(ss_item, by, code, inet, cd)) { + K_WLOCK(sharesummary_free); + if (!sharesummary_age(ss_item)) { + K_WUNLOCK(sharesummary_free); ss_failed++; LOGERR("%s(): Failed to age sharesummary %"PRId64"/%s/%"PRId64, __func__, sharesummary->userid, @@ -2308,6 +2331,7 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, copy_tv(ss_first, &(sharesummary->firstshare)); if (tv_newer(ss_last, &(sharesummary->lastshare))) copy_tv(ss_last, &(sharesummary->lastshare)); + K_WUNLOCK(sharesummary_free); } } @@ -2318,8 +2342,9 @@ bool workinfo_age(int64_t workinfoid, char *poolinstance, char *by, char *code, K_RLOCK(sharesummary_free); ss_item = next_in_ktree(ss_ctx); - K_RUNLOCK(sharesummary_free); DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + STRNCPY(complete, sharesummary->complete); + K_RUNLOCK(sharesummary_free); } if (ss_already || ss_failed || shares_dumped) { @@ -2350,8 +2375,10 @@ skip_ss: ks_look.data = (void *)(&lookkeysharesummary); K_RLOCK(keysharesummary_free); ks_item = find_after_in_ktree(keysharesummary_root, &ks_look, ks_ctx); - K_RUNLOCK(keysharesummary_free); DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); + // complete could change, the id fields wont be changed/removed yet + STRNCPY(complete, keysharesummary->complete); + K_RUNLOCK(keysharesummary_free); while (ks_item && keysharesummary->workinfoid == workinfoid) { ks_tot++; skipupdate = false; @@ -2359,7 +2386,7 @@ skip_ss: * so finding an aged keysharesummary here is an error * N.B. this can only happen with (very) old reload files */ if (reloading && !key_update) { - if (keysharesummary->complete[0] == SUMMARY_COMPLETE) { + if (complete[0] == SUMMARY_COMPLETE) { ks_already++; skipupdate = true; if (confirm_sharesummary) { @@ -2373,20 +2400,25 @@ skip_ss: } if (!skipupdate) { + K_WLOCK(keysharesummary_free); if (!keysharesummary_age(ks_item)) { ks_failed++; + K_WUNLOCK(keysharesummary_free); LOGERR("%s(): Failed to age keysharesummary %"PRId64"/%s/%s", __func__, keysharesummary->workinfoid, keysharesummary->keytype, keysharesummary->key); ksok = false; + } else { + K_WUNLOCK(keysharesummary_free); } } K_RLOCK(keysharesummary_free); ks_item = next_in_ktree(ks_ctx); - K_RUNLOCK(keysharesummary_free); DATA_KEYSHARESUMMARY_NULL(keysharesummary, ks_item); + STRNCPY(complete, keysharesummary->complete); + K_RUNLOCK(keysharesummary_free); } /* All shares should have been discarded during sharesummary @@ -2527,19 +2559,17 @@ cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b) void dsp_sharesummary(K_ITEM *item, FILE *stream) { - char createdate_buf[DATE_BUFSIZ]; SHARESUMMARY *s; if (!item) fprintf(stream, "%s() called with (null) item\n", __func__); else { DATA_SHARESUMMARY(s, item); - tv_to_buf(&(s->createdate), createdate_buf, sizeof(createdate_buf)); fprintf(stream, " uid=%"PRId64" wn='%s' wid=%"PRId64" " - "da=%f ds=%f ss=%f c='%s' cd=%s\n", + "da=%f ds=%f ss=%f c='%s'\n", s->userid, s->workername, s->workinfoid, s->diffacc, s->diffsta, s->sharesta, - s->complete, createdate_buf); + s->complete); } } @@ -2632,8 +2662,7 @@ K_ITEM *find_last_sharesummary(int64_t userid, char *workername) } // key_update must age keysharesummary directly -static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, - char *code, char *inet, tv_t *cd) +static void key_auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd) { static int64_t last_attempted_id = -1; static int64_t prev_found = 0; @@ -2651,6 +2680,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, int64_t age_id, do_id, to_id; bool ok, found; + K_WLOCK(workinfo_free); + if (workinfo_age_lock) { + K_WUNLOCK(workinfo_free); + return; + } else + workinfo_age_lock = true; + K_WUNLOCK(workinfo_free); + LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); age_id = prev_found; @@ -2663,15 +2700,15 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, INIT_KEYSHARESUMMARY(&look); look.data = (void *)(&lookkeysharesummary); - K_RLOCK(keysharesummary_free); - kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx); - DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item); - DATE_ZERO(&kss_first_min); DATE_ZERO(&kss_last_max); kss_count_tot = s_count_tot = s_diff_tot = 0; - found = false; + + K_RLOCK(keysharesummary_free); + kss_item = find_after_in_ktree(keysharesummary_root, &look, ctx); + DATA_KEYSHARESUMMARY_NULL(keysharesummary, kss_item); + while (kss_item && keysharesummary->workinfoid < workinfoid) { if (keysharesummary->complete[0] == SUMMARY_NEW) { age_id = keysharesummary->workinfoid; @@ -2686,9 +2723,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, LOGDEBUG("%s(): age_id=%"PRId64" found=%d", __func__, age_id, found); // Don't repeat searching old items to avoid accessing their ram - if (!found) + if (!found) { prev_found = workinfoid; - else { + } else { /* Process all the consecutive keysharesummaries that's aren't aged * This way we find each oldest 'batch' of keysharesummaries that have * been missed and can report the range of data that was aged, @@ -2700,9 +2737,9 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, do_id = age_id; to_id = 0; do { - ok = workinfo_age(do_id, poolinstance, by, code, inet, - cd, &kss_first, &kss_last, &kss_count, - &s_count, &s_diff); + ok = workinfo_age(do_id, poolinstance, cd, &kss_first, + &kss_last, &kss_count, &s_count, + &s_diff); kss_count_tot += kss_count; s_count_tot += s_count; @@ -2774,12 +2811,14 @@ static void key_auto_age_older(int64_t workinfoid, char *poolinstance, char *by, idrange, keysharerange); } } + K_WLOCK(workinfo_free); + workinfo_age_lock = false; + K_WUNLOCK(workinfo_free); } /* TODO: markersummary checking? * However, there should be no issues since the sharesummaries are removed */ -void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, - char *code, char *inet, tv_t *cd) +void auto_age_older(int64_t workinfoid, char *poolinstance, tv_t *cd) { static int64_t last_attempted_id = -1; static int64_t prev_found = 0; @@ -2798,10 +2837,21 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, bool ok, found; if (key_update) { - key_auto_age_older(workinfoid, poolinstance, by, code, inet, cd); + key_auto_age_older(workinfoid, poolinstance, cd); return; } + /* Simply lock out more than one from running at the same time + * This locks access to prev_found, repeat and last_attempted_id + * If any are missed they'll be aged by the next age_workinfo in 30s */ + K_WLOCK(workinfo_free); + if (workinfo_age_lock) { + K_WUNLOCK(workinfo_free); + return; + } else + workinfo_age_lock = true; + K_WUNLOCK(workinfo_free); + LOGDEBUG("%s(): workinfoid=%"PRId64" prev=%"PRId64, __func__, workinfoid, prev_found); age_id = prev_found; @@ -2814,15 +2864,15 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, INIT_SHARESUMMARY(&look); look.data = (void *)(&looksharesummary); - K_RLOCK(sharesummary_free); - ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx); - DATA_SHARESUMMARY_NULL(sharesummary, ss_item); - DATE_ZERO(&ss_first_min); DATE_ZERO(&ss_last_max); ss_count_tot = s_count_tot = s_diff_tot = 0; - found = false; + + K_RLOCK(sharesummary_free); + ss_item = find_after_in_ktree(sharesummary_workinfoid_root, &look, ctx); + DATA_SHARESUMMARY_NULL(sharesummary, ss_item); + while (ss_item && sharesummary->workinfoid < workinfoid) { if (sharesummary->complete[0] == SUMMARY_NEW) { age_id = sharesummary->workinfoid; @@ -2852,9 +2902,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, do_id = age_id; to_id = 0; do { - ok = workinfo_age(do_id, poolinstance, by, code, inet, - cd, &ss_first, &ss_last, &ss_count, - &s_count, &s_diff); + ok = workinfo_age(do_id, poolinstance, cd, &ss_first, + &ss_last, &ss_count, &s_count, + &s_diff); ss_count_tot += ss_count; s_count_tot += s_count; @@ -2926,6 +2976,9 @@ void auto_age_older(int64_t workinfoid, char *poolinstance, char *by, idrange, sharerange); } } + K_WLOCK(workinfo_free); + workinfo_age_lock = false; + K_WUNLOCK(workinfo_free); } void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS) diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index a6cec406..ec866d05 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -214,6 +214,17 @@ char *pqerrmsg(PGconn *conn) #undef PQexec #undef PQexecParams +/* Debug level to display write transactions - 0 removes the code + * Also enables checking the isread flag */ +#define CKPQ_SHOW_WRITE 0 + +#define CKPQ_ISREAD1 "select " +#define CKPQ_ISREAD1LEN (sizeof(CKPQ_ISREAD1)-1) +#define CKPQ_ISREAD2 "declare " +#define CKPQ_ISREAD2LEN (sizeof(CKPQ_ISREAD2)-1) +#define CKPQ_ISREAD3 "fetch " +#define CKPQ_ISREAD3LEN (sizeof(CKPQ_ISREAD3)-1) + // Bug check to ensure no unexpected write txns occur PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS) { @@ -221,6 +232,40 @@ PGresult *_CKPQexec(PGconn *conn, const char *qry, bool isread, WHERE_FFL_ARGS) if (!isread && confirm_sharesummary) quitfrom(1, file, func, line, "BUG: write txn during confirm"); +#if CKPQ_SHOW_WRITE + if (isread) { + if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) && + (strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) && + (strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) { + LOGERR("%s() ERR: query flagged as read, but isn't" + WHERE_FFL, __func__, WHERE_FFL_PASS); + isread = false; + } + } else { + if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) || + (strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) || + (strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) { + LOGERR("%s() ERR: query flagged as write, but isn't" + WHERE_FFL, __func__, WHERE_FFL_PASS); + isread = true; + } + } + if (!isread) { + char *buf = NULL, ffl[128]; + size_t len, off; + + APPEND_REALLOC_INIT(buf, off, len); + APPEND_REALLOC(buf, off, len, __func__); + APPEND_REALLOC(buf, off, len, "() W: '"); + APPEND_REALLOC(buf, off, len, qry); + APPEND_REALLOC(buf, off, len, "'"); + snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS); + APPEND_REALLOC(buf, off, len, ffl); + LOGMSGBUF(CKPQ_SHOW_WRITE, buf); + FREENULL(buf); + } +#endif + return PQexec(conn, qry); } @@ -237,6 +282,47 @@ PGresult *_CKPQexecParams(PGconn *conn, const char *qry, if (!isread && confirm_sharesummary) quitfrom(1, file, func, line, "BUG: write txn during confirm"); +#if CKPQ_SHOW_WRITE + if (isread) { + if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) != 0) && + (strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) != 0) && + (strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) != 0)) { + LOGERR("%s() ERR: query flagged as read, but isn't" + WHERE_FFL, __func__, WHERE_FFL_PASS); + isread = false; + } + } else { + if ((strncmp(qry, CKPQ_ISREAD1, CKPQ_ISREAD1LEN) == 0) || + (strncmp(qry, CKPQ_ISREAD2, CKPQ_ISREAD2LEN) == 0) || + (strncmp(qry, CKPQ_ISREAD3, CKPQ_ISREAD3LEN) == 0)) { + LOGERR("%s() ERR: query flagged as write, but isn't" + WHERE_FFL, __func__, WHERE_FFL_PASS); + isread = true; + } + } + if (!isread) { + char *buf = NULL, num[16], ffl[128]; + size_t len, off; + int i; + + APPEND_REALLOC_INIT(buf, off, len); + APPEND_REALLOC(buf, off, len, __func__); + APPEND_REALLOC(buf, off, len, "() W: '"); + APPEND_REALLOC(buf, off, len, qry); + APPEND_REALLOC(buf, off, len, "'"); + for (i = 0; i < nParams; i++) { + snprintf(num, sizeof(num), " $%d='", i+1); + APPEND_REALLOC(buf, off, len, num); + APPEND_REALLOC(buf, off, len, paramValues[i]); + APPEND_REALLOC(buf, off, len, "'"); + } + snprintf(ffl, sizeof(ffl), WHERE_FFL, WHERE_FFL_PASS); + APPEND_REALLOC(buf, off, len, ffl); + LOGMSGBUF(CKPQ_SHOW_WRITE, buf); + FREENULL(buf); + } +#endif + return PQexecParams(conn, qry, nParams, paramTypes, paramValues, paramLengths, paramFormats, resultFormat); } @@ -558,6 +644,17 @@ K_ITEM *users_add(PGconn *conn, char *username, char *emailaddress, LOGDEBUG("%s(): add", __func__); + /* 2 attempts to add the same user at the same time will only do it once + * The 2nd attempt will get back the data provided by the 1st + * and thus throw away any differences in the 2nd */ + K_WLOCK(users_db_free); + + item = find_users(username); + if (item) { + ok = true; + goto already; + } + K_WLOCK(users_free); item = k_unlink_head(users_free); K_WUNLOCK(users_free); @@ -665,6 +762,10 @@ unitem: } K_WUNLOCK(users_free); +already: + + K_WUNLOCK(users_db_free); + if (ok) return item; else @@ -1469,6 +1570,11 @@ bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, LOGDEBUG("%s(): update", __func__); + /* Two attempts to update the same worker at the same time + * will determine the final state based on which gets the lock last, + * i.e. randomly, but without overwriting at the same time */ + K_WLOCK(workers_db_free); + DATA_WORKERS(row, item); if (check) { @@ -1583,6 +1689,9 @@ unparam: for (n = 0; n < par; n++) free(params[n]); early: + + K_WUNLOCK(workers_db_free); + return ok; } @@ -3399,6 +3508,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, { K_ITEM *w_item, *wm_item, *ss_item; SHARESUMMARY *sharesummary; + char complete[TXT_FLAG+1]; WORKINFO *workinfo; char *st = NULL; @@ -3479,13 +3589,14 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, K_RLOCK(sharesummary_free); ss_item = find_sharesummary(shares->userid, shares->workername, shares->workinfoid); - K_RUNLOCK(sharesummary_free); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); if (sharesummary->complete[0] != SUMMARY_NEW) { + STRNCPY(complete, sharesummary->complete); + K_RUNLOCK(sharesummary_free); LOGDEBUG("%s(): '%s' sharesummary exists " "%"PRId64" %"PRId64"/%s/%ld,%ld", - __func__, sharesummary->complete, + __func__, complete, shares->workinfoid, shares->userid, st = safe_text_nonull(shares->workername), shares->createdate.tv_sec, @@ -3495,6 +3606,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, return true; } } + K_RUNLOCK(sharesummary_free); } if (!key_update && !confirm_sharesummary) { @@ -3504,8 +3616,7 @@ static bool shares_process(PGconn *conn, SHARES *shares, K_ITEM *wi_item, K_WUNLOCK(userinfo_free); } - sharesummary_update(shares, NULL, shares->createby, shares->createcode, - shares->createinet, &(shares->createdate)); + sharesummary_update(shares, NULL, &(shares->createdate)); return true; } @@ -3660,6 +3771,8 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername K_RLOCK(users_free); u_item = find_users(username); K_RUNLOCK(users_free); + /* Can't change outside lock since we don't delete users + * or change their *userid */ if (!u_item) { btv_to_buf(cd, cd_buf, sizeof(cd_buf)); /* This should never happen unless there's a bug in ckpool @@ -3672,7 +3785,6 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername goto tisbad; } DATA_USERS(users, u_item); - shares->userid = users->userid; TXT_TO_BIGINT("workinfoid", workinfoid, shares->workinfoid); @@ -4142,6 +4254,7 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, { K_ITEM *w_item, *wm_item, *ss_item; SHARESUMMARY *sharesummary; + char complete[TXT_FLAG+1]; char *st = NULL; LOGDEBUG("%s() add", __func__); @@ -4185,13 +4298,14 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, ss_item = find_sharesummary(shareerrors->userid, shareerrors->workername, shareerrors->workinfoid); - K_RUNLOCK(sharesummary_free); if (ss_item) { DATA_SHARESUMMARY(sharesummary, ss_item); if (sharesummary->complete[0] != SUMMARY_NEW) { + STRNCPY(complete, sharesummary->complete); + K_RUNLOCK(sharesummary_free); LOGDEBUG("%s(): '%s' sharesummary exists " "%"PRId64" %"PRId64"/%s/%ld,%ld", - __func__, sharesummary->complete, + __func__, complete, shareerrors->workinfoid, shareerrors->userid, st = safe_text_nonull(shareerrors->workername), @@ -4201,11 +4315,10 @@ static bool shareerrors_process(PGconn *conn, SHAREERRORS *shareerrors, return false; } } + K_RUNLOCK(sharesummary_free); } - sharesummary_update(NULL, shareerrors, shareerrors->createby, - shareerrors->createcode, shareerrors->createinet, - &(shareerrors->createdate)); + sharesummary_update(NULL, shareerrors, &(shareerrors->createdate)); return true; } @@ -5182,14 +5295,13 @@ flail: return ok; } +// Requires K_WLOCK(sharesummary_free) static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, SHAREERRORS *e_row, bool new, double *tdf, double *tdl) { tv_t *createdate; - K_WLOCK(sharesummary_free); - if (s_row) createdate = &(s_row->createdate); else @@ -5251,15 +5363,11 @@ static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, *tdf = tvdiff(createdate, &(row->firstshare)); *tdl = tvdiff(createdate, &(row->lastshare)); } - - K_WUNLOCK(sharesummary_free); } static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row, bool new) { - K_WLOCK(keysharesummary_free); - if (new) { zero_keysharesummary(row); copy_tv(&(row->firstshare), &(s_row->createdate)); @@ -5307,8 +5415,6 @@ static void set_keysharesummary_stats(KEYSHARESUMMARY *row, SHARES *s_row, row->sharerej++; break; } - - K_WUNLOCK(keysharesummary_free); } /* Keep some simple stats on how often shares are out of order @@ -5330,15 +5436,15 @@ char *ooo_status(char *buf, size_t siz) /* sharesummaries are no longer stored in the DB but fields are updated as b4 * This creates/updates both the sharesummaries and the keysharesummaries */ -bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, - char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) +bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, tv_t *cd, + WHERE_FFL_ARGS) { WORKMARKERS *wm; SHARESUMMARY *row, *p_row; KEYSHARESUMMARY *ki_row = NULL, *ka_row = NULL; K_ITEM *ss_item, *kiss_item = NULL, *kass_item = NULL, *wm_item, *p_item = NULL; bool new = false, p_new = false, ki_new = false, ka_new = false; - int64_t userid, workinfoid; + int64_t userid, workinfoid, markerid; char *workername, *address = NULL, *agent = NULL; char *st = NULL, *db = NULL; char ooo_buf[256]; @@ -5371,33 +5477,30 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, K_RLOCK(workmarkers_free); wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED, NULL); - K_RUNLOCK(workmarkers_free); if (wm_item) { DATA_WORKMARKERS(wm, wm_item); + markerid = wm->markerid; + K_RUNLOCK(workmarkers_free); LOGERR("%s(): attempt to update sharesummary " "with %s %"PRId64"/%"PRId64"/%s "CDDB" %s" " but processed workmarkers %"PRId64" exists", __func__, s_row ? "shares" : "shareerrors", workinfoid, userid, st = safe_text(workername), - db = ctv_to_buf(cd, NULL, 0), - wm->markerid); + db = ctv_to_buf(cd, NULL, 0), markerid); FREENULL(st); FREENULL(db); return false; } + K_RUNLOCK(workmarkers_free); - K_RLOCK(sharesummary_free); + K_WLOCK(sharesummary_free); ss_item = find_sharesummary(userid, workername, workinfoid); - p_item = find_sharesummary_p(workinfoid); - K_RUNLOCK(sharesummary_free); if (ss_item) { DATA_SHARESUMMARY(row, ss_item); } else { new = true; - K_WLOCK(sharesummary_free); ss_item = k_unlink_head(sharesummary_free); - K_WUNLOCK(sharesummary_free); DATA_SHARESUMMARY(row, ss_item); bzero(row, sizeof(*row)); row->userid = userid; @@ -5409,20 +5512,23 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, // N.B. this directly updates the non-key data set_sharesummary_stats(row, s_row, e_row, new, &tdf, &tdl); + if (new) { + add_to_ktree(sharesummary_root, ss_item); + add_to_ktree(sharesummary_workinfoid_root, ss_item); + k_add_head(sharesummary_store, ss_item); + } + K_WUNLOCK(sharesummary_free); + // Ignore shareerrors for keysummaries if (s_row) { - K_RLOCK(keysharesummary_free); + K_WLOCK(keysharesummary_free); kiss_item = find_keysharesummary(workinfoid, KEYTYPE_IP, address); - kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent); - K_RUNLOCK(keysharesummary_free); if (kiss_item) { DATA_KEYSHARESUMMARY(ki_row, kiss_item); } else { ki_new = true; - K_WLOCK(keysharesummary_free); kiss_item = k_unlink_head(keysharesummary_free); - K_WUNLOCK(keysharesummary_free); DATA_KEYSHARESUMMARY(ki_row, kiss_item); bzero(ki_row, sizeof(*ki_row)); ki_row->workinfoid = workinfoid; @@ -5433,14 +5539,20 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, // N.B. this directly updates the non-key data set_keysharesummary_stats(ki_row, s_row, ki_new); + if (ki_new) { + add_to_ktree(keysharesummary_root, kiss_item); + k_add_head(keysharesummary_store, kiss_item); + } + K_WUNLOCK(keysharesummary_free); + + K_WLOCK(keysharesummary_free); + kass_item = find_keysharesummary(workinfoid, KEYTYPE_AGENT, agent); if (kass_item) { DATA_KEYSHARESUMMARY(ka_row, kass_item); } else { ka_new = true; - K_WLOCK(keysharesummary_free); kass_item = k_unlink_head(keysharesummary_free); - K_WUNLOCK(keysharesummary_free); DATA_KEYSHARESUMMARY(ka_row, kass_item); bzero(ka_row, sizeof(*ka_row)); ka_row->workinfoid = workinfoid; @@ -5451,6 +5563,11 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, // N.B. this directly updates the non-key data set_keysharesummary_stats(ka_row, s_row, ka_new); + if (ka_new) { + add_to_ktree(keysharesummary_root, kass_item); + k_add_head(keysharesummary_store, kass_item); + } + K_WUNLOCK(keysharesummary_free); } if (!new) { @@ -5506,13 +5623,14 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, } } + K_WLOCK(sharesummary_free); + p_item = find_sharesummary_p(workinfoid); + if (p_item) { DATA_SHARESUMMARY(p_row, p_item); } else { p_new = true; - K_WLOCK(sharesummary_free); p_item = k_unlink_head(sharesummary_free); - K_WUNLOCK(sharesummary_free); DATA_SHARESUMMARY(p_row, p_item); bzero(p_row, sizeof(*p_row)); POOL_SS(p_row); @@ -5521,42 +5639,17 @@ bool _sharesummary_update(SHARES *s_row, SHAREERRORS *e_row, char *by, set_sharesummary_stats(p_row, s_row, e_row, p_new, &tdf, &tdl); - MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); - - // Store either new item - if (new || p_new) { - K_WLOCK(sharesummary_free); - if (new) { - add_to_ktree(sharesummary_root, ss_item); - add_to_ktree(sharesummary_workinfoid_root, ss_item); - k_add_head(sharesummary_store, ss_item); - } - if (p_new) { - add_to_ktree(sharesummary_pool_root, p_item); - k_add_head(sharesummary_pool_store, p_item); - } - K_WUNLOCK(sharesummary_free); - } - - if (ki_new || ka_new) { - K_WLOCK(keysharesummary_free); - if (ki_new) { - add_to_ktree(keysharesummary_root, kiss_item); - k_add_head(keysharesummary_store, kiss_item); - } - if (ka_new) { - add_to_ktree(keysharesummary_root, kass_item); - k_add_head(keysharesummary_store, kass_item); - } - K_WUNLOCK(keysharesummary_free); + if (p_new) { + add_to_ktree(sharesummary_pool_root, p_item); + k_add_head(sharesummary_pool_store, p_item); } + K_WUNLOCK(sharesummary_free); return true; } // No key fields are modified -bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, - tv_t *cd, WHERE_FFL_ARGS) +bool sharesummary_age(K_ITEM *ss_item) { SHARESUMMARY *row; @@ -5566,8 +5659,6 @@ bool _sharesummary_age(K_ITEM *ss_item, char *by, char *code, char *inet, row->complete[0] = SUMMARY_COMPLETE; row->complete[1] = '\0'; - MODIFYDATEPOINTERS(sharesummary_free, row, cd, by, code, inet); - return true; }