diff --git a/src/ckpool.c b/src/ckpool.c index 68cf93fd..b2b6eb00 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -137,27 +137,27 @@ static void *ckmsg_queue(void *arg) /* Generic workqueue function and message receiving and parsing thread */ static void *ckwq_queue(void *arg) { - ckwq_t *ckwq = (ckwq_t *)arg; - ckpool_t *ckp = ckwq->ckp; + ckwq_t *ckmsgq = (ckwq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; pthread_detach(pthread_self()); - rename_proc(ckwq->name); + rename_proc(ckmsgq->name); while (42) { ckwqmsg_t *wqmsg; tv_t now; ts_t abs; - mutex_lock(ckwq->lock); + mutex_lock(ckmsgq->lock); tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; - if (!ckwq->wqmsgs) - cond_timedwait(ckwq->cond, ckwq->lock, &abs); - wqmsg = ckwq->wqmsgs; + if (!ckmsgq->wqmsgs) + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + wqmsg = ckmsgq->wqmsgs; if (wqmsg) - DL_DELETE(ckwq->wqmsgs, wqmsg); - mutex_unlock(ckwq->lock); + DL_DELETE(ckmsgq->wqmsgs, wqmsg); + mutex_unlock(ckmsgq->lock); if (!wqmsg) continue; @@ -209,7 +209,7 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) { - ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count); + ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); mutex_t *lock; pthread_cond_t *cond; int i; @@ -575,14 +575,32 @@ out: static void childsighandler(const int sig); -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -593,14 +611,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - return; + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -620,41 +640,25 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -} - -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) -{ - _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); - free(pm->msg); +out_nofail: + free(msg); free(pm); + return NULL; } -/* Fore sending asynchronous messages to another process, the sending process - * must have ckwqs of its own, referenced in the ckpool structure */ -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm; + struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + pthread_t pth; - if (unlikely(!ckp->ckwqs)) { - LOGALERT("Workqueues not set up in async_send_proc!"); - _send_proc(pi, msg, file, func, line); - return; - } - pm = ckzalloc(sizeof(struct proc_message)); pm->pi = pi; pm->msg = strdup(msg); pm->file = file; pm->func = func; pm->line = line; - ckwq_add(ckp->ckwqs, &asp_send, pm); + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 3193db89..2c3bcb0b 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -213,8 +213,6 @@ struct ckpool_instance { /* Private data for each process */ void *data; - /* Private generic workqueues if this process has them */ - ckwq_t *ckwqs; }; #ifdef USE_CKDB @@ -239,8 +237,6 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); diff --git a/src/generator.c b/src/generator.c index a47dfe74..1eb7e8fb 100644 --- a/src/generator.c +++ b/src/generator.c @@ -221,7 +221,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -367,7 +367,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - async_send_proc(ckp, ckp->stratifier, blockmsg); + send_proc(ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -972,7 +972,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); } @@ -1007,7 +1007,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); } @@ -1195,7 +1195,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); } @@ -1428,7 +1428,7 @@ static void *passthrough_recv(void *arg) if (proxy_alive(ckp, si, proxi, cs, false)) { proxi->alive = true; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1439,13 +1439,13 @@ static void *passthrough_recv(void *arg) while (!proxy_alive(ckp, si, proxi, cs, true)) { if (proxi->alive) { proxi->alive = false; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } sleep(5); } if (!proxi->alive) { proxi->alive = true; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } do { @@ -1456,13 +1456,13 @@ static void *passthrough_recv(void *arg) LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", proxi->id, proxi->si->url); proxi->alive = false; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); continue; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - async_send_proc(ckp, ckp->connector, cs->buf); + send_proc(ckp->connector, cs->buf); } return NULL; } @@ -1496,7 +1496,7 @@ static void *proxy_recv(void *arg) if (proxy_alive(ckp, si, proxi, cs, false)) { proxi->alive = true; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->si->url); } @@ -1510,7 +1510,7 @@ static void *proxy_recv(void *arg) while (!proxy_alive(ckp, si, proxi, cs, true)) { if (proxi->alive) { proxi->alive = false; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } sleep(5); proxi->reconnect_time = time(NULL); @@ -1522,7 +1522,7 @@ static void *proxy_recv(void *arg) LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url); proxi->alive = true; proxi->reconnect_time = 0; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } now = time(NULL); @@ -1572,7 +1572,7 @@ static void *proxy_recv(void *arg) * pool is up */ proxi->reconnect = false; proxi->alive = false; - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection", proxi->id, proxi->si->url); Close(cs->fd); @@ -1644,7 +1644,7 @@ static proxy_instance_t *best_proxy(ckpool_t *ckp, gdata_t *gdata) break; sleep(1); } - async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject"); + send_proc(ckp->connector, ret ? "accept" : "reject"); return ret; } @@ -1672,7 +1672,7 @@ reconnect: proxi->id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%d", proxi->id); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); /* Send a notify for the new chosen proxy or the * stratifier won't be able to switch. */ send_notify(ckp, proxi); @@ -1857,7 +1857,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } int generator(proc_instance_t *pi) diff --git a/src/stratifier.c b/src/stratifier.c index 3e8760d8..792972fb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -336,10 +336,12 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; - ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends + ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb + ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations + ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -808,26 +810,38 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) set = true; } else set = false; - async_send_proc(ckp, ckp->generator, msg); + send_proc(ckp->generator, msg); if (set) sdata->gen_priority = 0; } +struct update_req { + pthread_t *pth; + ckpool_t *ckp; + int prio; +}; + static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void do_update(ckpool_t *ckp, int *prio) +static void *do_update(void *arg) { + struct update_req *ur = (struct update_req *)arg; + ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; + int prio = ur->prio; bool ret = false; workbase_t *wb; json_t *val; char *buf; - buf = send_recv_generator(ckp, "getbase", *prio); + pthread_detach(pthread_self()); + rename_proc("updater"); + + buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; @@ -887,17 +901,21 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } - free(buf); - free(prio); + dealloc(buf); + free(ur->pth); + free(ur); + return NULL; } static void update_base(ckpool_t *ckp, const int prio) { - int *pprio = ckalloc(sizeof(int)); - sdata_t *sdata = ckp->data; + struct update_req *ur = ckalloc(sizeof(struct update_req)); + pthread_t *pth = ckalloc(sizeof(pthread_t)); - *pprio = prio; - ckwq_add(sdata->ckwqs, &do_update, pprio); + ur->pth = pth; + ur->ckp = ckp; + ur->prio = prio; + create_pthread(pth, do_update, ur); } static void __kill_instance(stratum_instance_t *client) @@ -929,7 +947,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id) LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id); snprintf(buf, 255, "dropclient=%"PRId64, id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(ckp->connector, buf); } static void drop_allclients(ckpool_t *ckp) @@ -1739,21 +1757,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } -static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) -{ - int objects, generated; - int64_t memsize; - ckwqmsg_t *wqmsg; - - mutex_lock(ckwq->lock); - DL_COUNT(ckwq->wqmsgs, wqmsg, objects); - generated = ckwq->messages; - mutex_unlock(ckwq->lock); - - memsize = (sizeof(ckwqmsg_t) + size) * objects; - JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); -} - static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -1800,14 +1803,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); - /* Don't know exactly how big the string is so just count the pointer for now */ - ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); - json_set_object(val, "ckwqs", subval); + ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); + json_set_object(val, "srecvs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } + ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); + json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -1890,7 +1894,7 @@ retry: /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckwq_add(sdata->ckwqs, &srecv_process, buf); + ckmsgq_add(sdata->srecvs, buf); Close(sockd); buf = NULL; goto retry; @@ -3414,14 +3418,10 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } -static void sshare_process(ckpool_t *ckp, json_params_t *jp); -static void send_transactions(ckpool_t *ckp, json_params_t *jp); - /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { - ckpool_t *ckp = client->ckp; const char *method; /* Random broken clients send something not an integer as the id so we @@ -3431,7 +3431,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &sshare_process, jp); + ckmsgq_add(sdata->sshareq, jp); return; } @@ -3468,14 +3468,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * to it since it's unauthorised. Set the flag just in case. */ client->authorised = false; snprintf(buf, 255, "passthrough=%"PRId64, client_id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(client->ckp->connector, buf); return; } /* We should only accept subscribed requests from here on */ if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); - connector_drop_client(ckp, client_id); + connector_drop_client(client->ckp, client_id); return; } @@ -3497,7 +3497,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); - connector_drop_client(ckp, client_id); + connector_drop_client(client->ckp, client_id); return; } @@ -3510,7 +3510,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &send_transactions, jp); + ckmsgq_add(sdata->stxnq, jp); return; } /* Unhandled message here */ @@ -3685,7 +3685,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, 0); - async_send_proc(ckp, ckp->connector, s); + send_proc(ckp->connector, s); free(s); free_smsg(msg); } @@ -4543,10 +4543,14 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create as many generic workqueue threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN); - ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + /* Create half as many share processing threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); + /* Create 1/4 as many stratum processing threads as there are CPUs */ + threads = threads / 2 ? : 1; + sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);