diff --git a/src/ckdb.c b/src/ckdb.c index 7de99bf6..19a2883e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -776,6 +776,8 @@ typedef struct logqueue { static K_LIST *logqueue_free; static K_STORE *logqueue_store; +static pthread_mutex_t wq_waitlock; +static pthread_cond_t wq_waitcond; // WORKQUEUE typedef struct workqueue { @@ -6353,6 +6355,8 @@ static bool setup_data() cklock_init(&fpm_lock); cksem_init(&socketer_sem); + mutex_init(&wq_waitlock); + cond_init(&wq_waitcond); alloc_storage(); @@ -8668,6 +8672,9 @@ static void *socketer(__maybe_unused void *arg) K_WLOCK(workqueue_free); k_add_tail(workqueue_store, item); K_WUNLOCK(workqueue_free); + mutex_lock(&wq_waitlock); + pthread_cond_signal(&wq_waitcond); + mutex_unlock(&wq_waitlock); break; // Code error default: @@ -9046,8 +9053,19 @@ static void *listener(void *arg) if (wq_item) { process_queued(conn, wq_item); tick(); - } else - cksleep_ms(4); + } else { + const ts_t tsdiff = {0, 420000000}; + tv_t now; + ts_t abs; + + tv_time(&now); + tv_to_ts(&abs, &now); + timeraddspec(&abs, &tsdiff); + + mutex_lock(&wq_waitlock); + pthread_cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); + mutex_unlock(&wq_waitlock); + } } PQfinish(conn); diff --git a/src/generator.c b/src/generator.c index d45bdcd8..03bc9fd9 100644 --- a/src/generator.c +++ b/src/generator.c @@ -237,6 +237,11 @@ retry: } } while (selret < 1); + if (unlikely(cs->fd < 0)) { + LOGWARNING("Bitcoind socket invalidated, will atempt failover"); + goto reconnect; + } + sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on generator socket"); @@ -523,19 +528,19 @@ static bool subscribe_stratum(connsock_t *cs, proxy_instance_t *proxi) retry: /* Attempt to reconnect if the pool supports resuming */ if (proxi->sessionid) { - req = json_pack("{s:i,s:s,s:[s,s]}", + JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", "id", proxi->id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION, proxi->sessionid); /* Then attempt to connect with just the client description */ } else if (!proxi->no_params) { - req = json_pack("{s:i,s:s,s:[s]}", + JSON_CPACK(req, "{s:i,s:s,s:[s]}", "id", proxi->id++, "method", "mining.subscribe", "params", PACKAGE"/"VERSION); /* Then try without any parameters */ } else { - req = json_pack("{s:i,s:s,s:[]}", + JSON_CPACK(req, "{s:i,s:s,s:[]}", "id", proxi->id++, "method", "mining.subscribe", "params"); @@ -580,7 +585,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) json_t *req, *val = NULL, *res_val; bool ret = false; - req = json_pack("{s:s,s:[s]}", + JSON_CPACK(req, "{s:s,s:[s]}", "method", "mining.passthrough", "params", PACKAGE"/"VERSION); ret = send_json_msg(cs, req); @@ -701,7 +706,7 @@ static bool send_version(proxy_instance_t *proxi, json_t *val) connsock_t *cs = proxi->cs; bool ret; - json_msg = json_pack("{sossso}", "id", id_val, "result", PACKAGE"/"VERSION, + JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", PACKAGE"/"VERSION, "error", json_null()); ret = send_json_msg(cs, json_msg); json_decref(json_msg); @@ -864,7 +869,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) json_t *val = NULL, *res_val, *req; bool ret; - req = json_pack("{s:i,s:s,s:[s,s]}", + JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", "id", proxi->id++, "method", "mining.authorize", "params", proxi->auth, proxi->pass); @@ -913,7 +918,7 @@ static void send_subscribe(proxy_instance_t *proxi, int sockd) json_t *json_msg; char *msg; - json_msg = json_pack("{sssi}", "enonce1", proxi->enonce1, + JSON_CPACK(json_msg, "{sssi}", "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); @@ -936,7 +941,7 @@ static void send_notify(proxy_instance_t *proxi, int sockd) for (i = 0; i < ni->merkles; i++) json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); /* Use our own jobid instead of the server's one for easy lookup */ - json_msg = json_pack("{sisssssssosssssssb}", + JSON_CPACK(json_msg, "{sisssssssosssssssb}", "jobid", ni->id, "prevhash", ni->prevhash, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "merklehash", merkle_arr, "bbversion", ni->bbversion, @@ -956,7 +961,7 @@ static void send_diff(proxy_instance_t *proxi, int sockd) json_t *json_msg; char *msg; - json_msg = json_pack("{sf}", "diff", proxi->diff); + JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); send_unix_msg(sockd, msg); @@ -1156,7 +1161,7 @@ static void *proxy_send(void *arg) mutex_unlock(&proxi->notify_lock); if (jobid) { - val = json_pack("{s[ssooo]soss}", "params", proxi->auth, jobid, + JSON_CPACK(val, "{s[ssooo]soss}", "params", proxi->auth, jobid, json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "ntime"), json_object_dup(msg->json_msg, "nonce"), diff --git a/src/libckpool.c b/src/libckpool.c index 572c860c..bc51fa40 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -933,6 +933,16 @@ out: } +void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line) +{ + if (likely(val)) + return; + + LOGERR("Invalid json line:%d col:%d pos:%d text: %s from %s %s:%d", + err->line, err->column, err->position, err->text, + file, func, line); +} + /* Extracts a string value from a json array with error checking. To be used * when the value of the string returned is only examined and not to be stored. * See json_array_string below */ diff --git a/src/libckpool.h b/src/libckpool.h index b3251370..98d3e54c 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -266,6 +266,17 @@ typedef struct unixsock unixsock_t; typedef struct proc_instance proc_instance_t; + +void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line); +#define json_check(val, err) _json_check(val, err, __FILE__, __func__, __LINE__) + +/* Check and pack json */ +#define JSON_CPACK(val, ...) do { \ + json_error_t err; \ + val = json_pack_ex(&err, 0, ##__VA_ARGS__); \ + json_check(val, &err); \ +} while (0) + /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) { diff --git a/src/stratifier.c b/src/stratifier.c index 64cd0b09..474410db 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -191,7 +191,7 @@ struct user_instance { UT_hash_handle hh; char username[128]; int64_t id; - bool new_user; + char *secondaryuserid; int workers; }; @@ -231,7 +231,6 @@ struct stratum_instance { user_instance_t *user_instance; char *useragent; char *workername; - char *secondaryuserid; int64_t user_id; ckpool_t *ckp; @@ -433,11 +432,15 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char { static time_t time_counter; static int counter = 0; - char *json_msg; time_t now_t; char ch; + if (unlikely(!val)) { + LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line); + return; + } + now_t = time(NULL); if (now_t != time_counter) { /* Rate limit to 1 update per second */ @@ -447,11 +450,6 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char fflush(stdout); } - if (!val) { - LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line); - return; - } - if (ckp->standalone) return json_decref(val); @@ -473,7 +471,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec); - val = json_pack("{sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}", + JSON_CPACK(val, "{sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}", "workinfoid", wb->id, "poolinstance", ckp->name, "transactiontree", wb->txn_hashes, @@ -501,7 +499,7 @@ static void send_ageworkinfo(ckpool_t *ckp, int64_t id) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - val = json_pack("{sI,ss,ss,ss,ss,ss}", + JSON_CPACK(val, "{sI,ss,ss,ss,ss,ss}", "workinfoid", id, "poolinstance", ckp->name, "createdate", cdfield, @@ -966,7 +964,7 @@ static void stratum_broadcast_message(const char *msg) { json_t *json_msg; - json_msg = json_pack("{sosss[s]}", "id", json_null(), "method", "client.show_message", + JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", "params", msg); stratum_broadcast(json_msg); } @@ -985,7 +983,7 @@ static void block_solve(ckpool_t *ckp) ASPRINTF(&msg, "Block %d solved by %s!", current_workbase->height, ckp->name); /* We send blank settings to ckdb with only the matching data from what we submitted * to say the block has been confirmed. */ - val = json_pack("{si,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}", + JSON_CPACK(val, "{si,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}", "height", current_workbase->height, "confirmed", "1", "workinfoid", current_workbase->id, @@ -1226,7 +1224,7 @@ static json_t *parse_subscribe(int64_t client_id, json_t *params_val) n2len = workbases->enonce2varlen; else n2len = 8; - ret = json_pack("[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1, + JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1, n2len); ck_runlock(&workbase_lock); @@ -1248,10 +1246,9 @@ static user_instance_t *authorise_user(const char *workername) ck_ilock(&instance_lock); HASH_FIND_STR(user_instances, username, instance); if (!instance) { - /* New user instance */ + /* New user instance. Secondary user id will be NULL */ instance = ckzalloc(sizeof(user_instance_t)); strcpy(instance->username, username); - instance->new_user = true; ck_ulock(&instance_lock); instance->id = user_instance_id++; @@ -1269,6 +1266,7 @@ static user_instance_t *authorise_user(const char *workername) * thread so it won't hold anything up but other authorisations. */ static int send_recv_auth(stratum_instance_t *client) { + user_instance_t *user_instance = client->user_instance; ckpool_t *ckp = client->ckp; char *buf, *json_msg; char cdfield[64]; @@ -1279,8 +1277,8 @@ static int send_recv_auth(stratum_instance_t *client) ts_realtime(&now); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); - val = json_pack("{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}", - "username", client->user_instance->username, + JSON_CPACK(val, "{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}", + "username", user_instance->username, "workername", client->workername, "poolinstance", ckp->name, "useragent", client->useragent, @@ -1305,10 +1303,11 @@ static int send_recv_auth(stratum_instance_t *client) secondaryuserid = response; strsep(&secondaryuserid, "."); LOGINFO("User %s Worker %s got auth response: %s suid: %s", - client->user_instance->username, client->workername, + user_instance->username, client->workername, response, secondaryuserid); if (!safecmp(response, "ok") && secondaryuserid) { - client->secondaryuserid = strdup(secondaryuserid); + if (!user_instance->secondaryuserid) + user_instance->secondaryuserid = strdup(secondaryuserid); ret = 0; } } else { @@ -1332,7 +1331,7 @@ static void queue_delayed_auth(stratum_instance_t *client) ts_realtime(&now); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); - val = json_pack("{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}", "username", client->user_instance->username, "workername", client->workername, "poolinstance", ckp->name, @@ -1393,7 +1392,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j *errnum = send_recv_auth(client); if (!*errnum) ret = true; - else if (*errnum < 0 && !user_instance->new_user) { + else if (*errnum < 0 && user_instance->secondaryuserid) { /* This user has already been authorised but ckdb is * offline so we assume they already exist but add the * auth request to the queued messages. */ @@ -1402,10 +1401,8 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j } } client->authorised = ret; - if (client->authorised) { + if (client->authorised) inc_worker(user_instance); - user_instance->new_user = false; - } out: return json_boolean(ret); } @@ -1414,7 +1411,7 @@ static void stratum_send_diff(stratum_instance_t *client) { json_t *json_msg; - json_msg = json_pack("{s[I]soss}", "params", client->diff, "id", json_null(), + JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(), "method", "mining.set_difficulty"); stratum_add_send(json_msg, client->id); } @@ -1423,7 +1420,7 @@ static void stratum_send_message(stratum_instance_t *client, const char *msg) { json_t *json_msg; - json_msg = json_pack("{sosss[s]}", "id", json_null(), "method", "client.show_message", + JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", "params", msg); stratum_add_send(json_msg, client->id); } @@ -1587,7 +1584,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c flip_32(swap, hash); __bin2hex(blockhash, swap, 32); - val = json_pack("{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}", + JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}", "height", wb->height, "blockhash", blockhash, "confirmed", "n", @@ -1698,7 +1695,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char * char *msg; sprintf(enonce2, "%s%s", client->enonce1var, nonce2); - json_msg = json_pack("{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2, + JSON_CPACK(json_msg, "{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2, "ntime", ntime, "nonce", nonce, "client_id", client->id, "msg_id", msg_id); msg = json_dumps(json_msg, 0); @@ -1713,6 +1710,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, json_t *params_val, json_t **err_val) { bool share = false, result = false, invalid = true, submit = false; + user_instance_t *user_instance = client->user_instance; const char *user, *job_id, *nonce2, *ntime, *nonce; double diff = client->diff, wdiff = 0, sdiff = -1; char hexhash[68] = {}, sharehash[32], cdfield[64]; @@ -1856,7 +1854,7 @@ out_unlock: json_set_int(val, "workinfoid", id); json_set_int(val, "clientid", client->id); json_set_string(val, "enonce1", client->enonce1); - json_set_string(val, "secondaryuserid", client->secondaryuserid); + json_set_string(val, "secondaryuserid", user_instance->secondaryuserid); json_set_string(val, "nonce2", nonce2); json_set_string(val, "nonce", nonce); json_set_string(val, "ntime", ntime); @@ -1872,7 +1870,7 @@ out_unlock: json_set_string(val, "createcode", __func__); json_set_string(val, "createinet", ckp->serverurl); json_set_string(val, "workername", client->workername); - json_set_string(val, "username", client->user_instance->username); + json_set_string(val, "username", user_instance->username); if (ckp->logshares) { fp = fopen(fname, "a"); @@ -1890,13 +1888,13 @@ out_unlock: ckdbq_add(ckp, ID_SHARES, val); out: if (!share) { - val = json_pack("{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", + JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", "clientid", client->id, - "secondaryuserid", client->secondaryuserid, + "secondaryuserid", user_instance->secondaryuserid, "enonce1", client->enonce1, "workinfoid", current_workbase->id, "workername", client->workername, - "username", client->user_instance->username, + "username", user_instance->username, "error", json_copy(*err_val), "errn", err, "createdate", cdfield, @@ -1915,7 +1913,7 @@ static json_t *__stratum_notify(bool clean) { json_t *val; - val = json_pack("{s:[ssssosssb],s:o,s:s}", + JSON_CPACK(val, "{s:[ssssosssb],s:o,s:s}", "params", current_workbase->idstring, current_workbase->prevhash, @@ -1958,7 +1956,7 @@ static void send_json_err(int64_t client_id, json_t *id_val, const char *err_msg { json_t *val; - val = json_pack("{soss}", "id", json_copy(id_val), "error", err_msg); + JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); stratum_add_send(val, client_id); } @@ -2318,7 +2316,7 @@ static void update_userstats(ckpool_t *ckp) ghs5 = client->dsps5 * nonces; ghs60 = client->dsps60 * nonces; ghs1440 = client->dsps1440 * nonces; - val = json_pack("{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}", "poolinstance", ckp->name, "instanceid", client->id, "elapsed", elapsed, @@ -2406,7 +2404,7 @@ static void *statsupdate(void *arg) if (unlikely(!fp)) LOGERR("Failed to fopen %s", fname); - val = json_pack("{si,si,si}", + JSON_CPACK(val, "{si,si,si}", "runtime", diff.tv_sec, "Users", stats.users, "Workers", stats.workers); @@ -2416,7 +2414,7 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - val = json_pack("{ss,ss,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate15m", suffix15, @@ -2429,7 +2427,7 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - val = json_pack("{sf,sf,sf,sf}", + JSON_CPACK(val, "{sf,sf,sf,sf}", "SPS1m", sps1, "SPS5m", sps5, "SPS15m", sps15, @@ -2469,7 +2467,7 @@ static void *statsupdate(void *arg) ghs = client->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); - val = json_pack("{ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,ss,ss,ss}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate1hr", suffix60, @@ -2495,7 +2493,7 @@ static void *statsupdate(void *arg) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - val = json_pack("{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}", + JSON_CPACK(val, "{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}", "poolinstance", ckp->name, "elapsed", diff.tv_sec, "users", stats.users,