From d75dd5543c24ae6b1aeb593beb78e67846019a8b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 24 Feb 2015 22:26:37 +1100 Subject: [PATCH] Try async messages again --- src/ckpool.c | 70 +++++++++++++++++++++++++----------------------- src/ckpool.h | 4 --- src/connector.c | 10 +++---- src/generator.c | 28 +++++++++---------- src/stratifier.c | 25 +++++++++-------- 5 files changed, 65 insertions(+), 72 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index ff303c50..63593b21 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -575,14 +575,32 @@ out: static void childsighandler(const int sig); -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -593,14 +611,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - return; + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -620,41 +640,25 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -} - -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) -{ - _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); - free(pm->msg); +out_nofail: + free(msg); free(pm); + return NULL; } -/* Fore sending asynchronous messages to another process, the sending process - * must have ckwqs of its own, referenced in the ckpool structure */ -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm; + struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + pthread_t pth; - if (unlikely(!ckp->ckwqs)) { - LOGALERT("Workqueues not set up in async_send_proc!"); - _send_proc(pi, msg, file, func, line); - return; - } - pm = ckzalloc(sizeof(struct proc_message)); pm->pi = pi; pm->msg = strdup(msg); pm->file = file; pm->func = func; pm->line = line; - ckwq_add(ckp->ckwqs, &asp_send, pm); + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index e88aa71e..d840528e 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -210,8 +210,6 @@ struct ckpool_instance { /* Private data for each process */ void *data; - /* Private generic workqueues if this process has them */ - ckwq_t *ckwqs; }; #ifdef USE_CKDB @@ -236,8 +234,6 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); diff --git a/src/connector.c b/src/connector.c index 3e568d36..ee6cf809 100644 --- a/src/connector.c +++ b/src/connector.c @@ -97,8 +97,6 @@ struct connector_data { /* For protecting the pending sends list */ mutex_t sender_lock; pthread_cond_t sender_cond; - - ckwq_t *ckwqs; }; typedef struct connector_data cdata_t; @@ -244,7 +242,7 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id) char buf[256]; sprintf(buf, "dropclient=%"PRId64, id); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); } /* Invalidate this instance. Remove them from the hashtables we look up @@ -362,9 +360,9 @@ reparse: * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) - async_send_proc(ckp, ckp->generator, s); + send_proc(ckp->generator, s); else - async_send_proc(ckp, ckp->stratifier, s); + send_proc(ckp->stratifier, s); } free(s); @@ -870,8 +868,6 @@ int connector(proc_instance_t *pi) LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; cdata->ckp = ckp; - /* Connector only requires one work queue */ - ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); if (!ckp->serverurls) cdata->serverfd = ckalloc(sizeof(int *)); diff --git a/src/generator.c b/src/generator.c index 7b4e5141..98e9a00d 100644 --- a/src/generator.c +++ b/src/generator.c @@ -148,7 +148,6 @@ struct generator_data { proxy_instance_t *dead_proxies; /* Disabled proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue - ckwq_t *ckwqs; }; typedef struct generator_data gdata_t; @@ -238,7 +237,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -384,7 +383,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - async_send_proc(ckp, ckp->stratifier, blockmsg); + send_proc(ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -949,7 +948,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int64_t id, const int char buf[256]; sprintf(buf, "deadproxy=%ld:%d", id, subid); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); } /* Remove the subproxy from the proxi list and put it on the dead list. @@ -1088,7 +1087,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); } @@ -1116,7 +1115,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_ json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); /* Send diff now as stratifier will not accept diff till it has a @@ -1309,7 +1308,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); free(buf); } @@ -1356,7 +1355,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id) char buf[256]; sprintf(buf, "reconnclient=%"PRId64, id); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); } static void submit_share(gdata_t *gdata, json_t *val) @@ -1779,9 +1778,9 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } -static void reconnect_generator(ckpool_t *ckp) +static void reconnect_generator(const ckpool_t *ckp) { - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } /* For receiving messages from an upstream pool to pass downstream. Responsible @@ -1838,7 +1837,7 @@ static void *passthrough_recv(void *arg) /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - async_send_proc(ckp, ckp->connector, cs->buf); + send_proc(ckp->connector, cs->buf); } return NULL; } @@ -2046,10 +2045,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) if (ret) break; - async_send_proc(ckp, ckp->connector, "reject"); + send_proc(ckp->connector, "reject"); sleep(1); } - async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject"); + send_proc(ckp->connector, ret ? "accept" : "reject"); return ret; } @@ -2078,7 +2077,7 @@ reconnect: proxi->low_id, cs->url, cs->port); dealloc(buf); ASPRINTF(&buf, "proxy=%ld", proxi->id); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); } } retry: @@ -2267,7 +2266,6 @@ int generator(proc_instance_t *pi) gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; gdata->ckp = ckp; - ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1); if (ckp->proxy) { char *buf = NULL; diff --git a/src/stratifier.c b/src/stratifier.c index 87464e6c..f09494ad 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -828,7 +828,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio) return buf; } -static void send_generator(ckpool_t *ckp, const char *msg, const int prio) +static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) { sdata_t *sdata = ckp->data; bool set; @@ -838,7 +838,7 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) set = true; } else set = false; - async_send_proc(ckp, ckp->generator, msg); + send_proc(ckp->generator, msg); if (set) sdata->gen_priority = 0; } @@ -963,7 +963,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id) LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id); snprintf(buf, 255, "dropclient=%"PRId64, id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(ckp->connector, buf); } static void drop_allclients(ckpool_t *ckp) @@ -1009,8 +1009,8 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata) memcpy(dsdata->donkeytxnbin, sdata->donkeytxnbin, 25); /* Use the same work queues for all subproxies */ - dsdata->ckwqs = sdata->ckwqs; dsdata->ssends = sdata->ssends; + dsdata->ckwqs = sdata->ckwqs; dsdata->ckdbq = sdata->ckdbq; dsdata->sauthq = sdata->sauthq; @@ -1136,7 +1136,7 @@ out_unlock: static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); -static void generator_recruit(ckpool_t *ckp, const int recruits) +static void generator_recruit(const ckpool_t *ckp, const int recruits) { char buf[256]; @@ -1774,7 +1774,7 @@ static void connector_test_client(ckpool_t *ckp, const int64_t id) LOGDEBUG("Stratifier requesting connector test client %"PRId64, id); snprintf(buf, 255, "testclient=%"PRId64, id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(ckp->connector, buf); } /* For creating a list of sends without locking that can then be concatenated @@ -2531,7 +2531,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien * in proxy mode where we find a subproxy based on the current proxy with room * for more clients. Signal the generator to recruit more subproxies if we are * running out of room. */ -static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata) +static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata) { proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub; int best_subid = 0, best_lowid; @@ -3938,7 +3938,6 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp); static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { - ckpool_t *ckp = client->ckp; const char *method; /* Random broken clients send something not an integer as the id so we @@ -3985,14 +3984,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * to it since it's unauthorised. Set the flag just in case. */ client->authorised = false; snprintf(buf, 255, "passthrough=%"PRId64, client_id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(client->ckp->connector, buf); return; } /* We should only accept subscribed requests from here on */ if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); - connector_drop_client(ckp, client_id); + connector_drop_client(client->ckp, client_id); return; } @@ -4014,7 +4013,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); - connector_drop_client(ckp, client_id); + connector_drop_client(client->ckp, client_id); return; } @@ -4188,7 +4187,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, 0); - async_send_proc(ckp, ckp->connector, s); + send_proc(ckp->connector, s); free(s); free_smsg(msg); } @@ -5051,7 +5050,7 @@ int stratifier(proc_instance_t *pi) sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); /* Create as many generic workqueue threads as there are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN); - ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);