diff --git a/src/ckpool.c b/src/ckpool.c index 63593b21..ff303c50 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -575,32 +575,14 @@ out: static void childsighandler(const int sig); -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -/* Send all one way messages asynchronously so we can wait till the receiving - * end closes the socket to ensure all messages are received but no deadlocks - * can occur with 2 processes waiting for each other's socket closure. */ -void *async_send_proc(void *arg) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = (struct proc_message *)arg; - proc_instance_t *pi = pm->pi; - char *msg = pm->msg; - const char *file = pm->file; - const char *func = pm->func; - int line = pm->line; - char *path = pi->us.path; bool ret = false; int sockd; - pthread_detach(pthread_self()); - if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -611,16 +593,14 @@ void *async_send_proc(void *arg) } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) { + if (unlikely(!pi->pid)) pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - goto out_nofail; - } + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + return; } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s pid %d", - msg, pi->processname, pi->pid); + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); goto out; } sockd = open_unix_client(path); @@ -640,25 +620,41 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -out_nofail: - free(msg); +} + +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) +{ + _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); + free(pm->msg); free(pm); - return NULL; } -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* Fore sending asynchronous messages to another process, the sending process + * must have ckwqs of its own, referenced in the ckpool structure */ +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = ckalloc(sizeof(struct proc_message)); - pthread_t pth; + struct proc_message *pm; + if (unlikely(!ckp->ckwqs)) { + LOGALERT("Workqueues not set up in async_send_proc!"); + _send_proc(pi, msg, file, func, line); + return; + } + pm = ckzalloc(sizeof(struct proc_message)); pm->pi = pi; pm->msg = strdup(msg); pm->file = file; pm->func = func; pm->line = line; - create_pthread(&pth, async_send_proc, pm); + ckwq_add(ckp->ckwqs, &asp_send, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index d840528e..e88aa71e 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -210,6 +210,8 @@ struct ckpool_instance { /* Private data for each process */ void *data; + /* Private generic workqueues if this process has them */ + ckwq_t *ckwqs; }; #ifdef USE_CKDB @@ -234,6 +236,8 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); diff --git a/src/connector.c b/src/connector.c index ee6cf809..3e568d36 100644 --- a/src/connector.c +++ b/src/connector.c @@ -97,6 +97,8 @@ struct connector_data { /* For protecting the pending sends list */ mutex_t sender_lock; pthread_cond_t sender_cond; + + ckwq_t *ckwqs; }; typedef struct connector_data cdata_t; @@ -242,7 +244,7 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id) char buf[256]; sprintf(buf, "dropclient=%"PRId64, id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } /* Invalidate this instance. Remove them from the hashtables we look up @@ -360,9 +362,9 @@ reparse: * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) - send_proc(ckp->generator, s); + async_send_proc(ckp, ckp->generator, s); else - send_proc(ckp->stratifier, s); + async_send_proc(ckp, ckp->stratifier, s); } free(s); @@ -868,6 +870,8 @@ int connector(proc_instance_t *pi) LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; cdata->ckp = ckp; + /* Connector only requires one work queue */ + ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); if (!ckp->serverurls) cdata->serverfd = ckalloc(sizeof(int *)); diff --git a/src/generator.c b/src/generator.c index 8e270d1f..d76f31f3 100644 --- a/src/generator.c +++ b/src/generator.c @@ -146,6 +146,7 @@ struct generator_data { proxy_instance_t *dead_proxies; /* Disabled proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue + ckwq_t *ckwqs; }; typedef struct generator_data gdata_t; @@ -235,7 +236,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - send_proc(ckp->connector, alive ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -381,7 +382,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - send_proc(ckp->stratifier, blockmsg); + async_send_proc(ckp, ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -944,7 +945,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int sub char buf[256]; sprintf(buf, "deadproxy=%d:%d", id, subid); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } /* Remove the subproxy from the proxi list and put it on the dead list. @@ -1091,7 +1092,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1119,7 +1120,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_ json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); /* Send diff now as stratifier will not accept diff till it has a @@ -1309,7 +1310,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); free(buf); } @@ -1355,7 +1356,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id) char buf[256]; sprintf(buf, "reconnclient=%"PRId64, id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } static void submit_share(gdata_t *gdata, json_t *val) @@ -1731,9 +1732,9 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } -static void reconnect_generator(const ckpool_t *ckp) +static void reconnect_generator(ckpool_t *ckp) { - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } /* For receiving messages from an upstream pool to pass downstream. Responsible @@ -1790,7 +1791,7 @@ static void *passthrough_recv(void *arg) /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - send_proc(ckp->connector, cs->buf); + async_send_proc(ckp, ckp->connector, cs->buf); } return NULL; } @@ -1995,10 +1996,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) if (ret) break; - send_proc(ckp->connector, "reject"); + async_send_proc(ckp, ckp->connector, "reject"); sleep(1); } - send_proc(ckp->connector, ret ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject"); return ret; } @@ -2027,7 +2028,7 @@ reconnect: proxi->id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%d", proxi->id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } } retry: @@ -2217,6 +2218,7 @@ int generator(proc_instance_t *pi) gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; gdata->ckp = ckp; + ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1); if (ckp->proxy) { char *buf = NULL; diff --git a/src/stratifier.c b/src/stratifier.c index dd129bf7..a96b1526 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -831,7 +831,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio) return buf; } -static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) +static void send_generator(ckpool_t *ckp, const char *msg, const int prio) { sdata_t *sdata = ckp->data; bool set; @@ -841,7 +841,7 @@ static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) set = true; } else set = false; - send_proc(ckp->generator, msg); + async_send_proc(ckp, ckp->generator, msg); if (set) sdata->gen_priority = 0; } @@ -966,7 +966,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id) LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id); snprintf(buf, 255, "dropclient=%"PRId64, id); - send_proc(ckp->connector, buf); + async_send_proc(ckp, ckp->connector, buf); } static void drop_allclients(ckpool_t *ckp) @@ -1012,8 +1012,8 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata) memcpy(dsdata->donkeytxnbin, sdata->donkeytxnbin, 25); /* Use the same work queues for all subproxies */ - dsdata->ssends = sdata->ssends; dsdata->ckwqs = sdata->ckwqs; + dsdata->ssends = sdata->ssends; dsdata->ckdbq = sdata->ckdbq; dsdata->sauthq = sdata->sauthq; @@ -1139,7 +1139,7 @@ out_unlock: static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); -static void generator_recruit(const ckpool_t *ckp) +static void generator_recruit(ckpool_t *ckp) { LOGINFO("Stratifer requesting more proxies from generator"); send_generator(ckp, "recruit", GEN_PRIORITY); @@ -1778,7 +1778,7 @@ static void connector_test_client(ckpool_t *ckp, const int64_t id) LOGDEBUG("Stratifier requesting connector test client %"PRId64, id); snprintf(buf, 255, "testclient=%"PRId64, id); - send_proc(ckp->connector, buf); + async_send_proc(ckp, ckp->connector, buf); } /* For creating a list of sends without locking that can then be concatenated @@ -2564,7 +2564,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien * in proxy mode where we find a subproxy based on the current proxy with room * for more clients. Signal the generator to recruit more subproxies if we are * running out of room. */ -static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata) +static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata) { proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub; int best_id, best_subid = 0; @@ -3965,6 +3965,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp); static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { + ckpool_t *ckp = client->ckp; const char *method; /* Random broken clients send something not an integer as the id so we @@ -4011,14 +4012,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * to it since it's unauthorised. Set the flag just in case. */ client->authorised = false; snprintf(buf, 255, "passthrough=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + async_send_proc(ckp, ckp->connector, buf); return; } /* We should only accept subscribed requests from here on */ if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); - connector_drop_client(client->ckp, client_id); + connector_drop_client(ckp, client_id); return; } @@ -4040,7 +4041,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); - connector_drop_client(client->ckp, client_id); + connector_drop_client(ckp, client_id); return; } @@ -4214,7 +4215,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, 0); - send_proc(ckp->connector, s); + async_send_proc(ckp, ckp->connector, s); free(s); free_smsg(msg); } @@ -5077,7 +5078,7 @@ int stratifier(proc_instance_t *pi) sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); /* Create as many generic workqueue threads as there are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN); - sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);