From ed6febe6dbaf59f1223f3de083da785d5f4a0b6b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Mar 2015 16:06:19 +1100 Subject: [PATCH] Handle proxy sends according to which socket is ready to receive data --- src/generator.c | 84 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 63 insertions(+), 21 deletions(-) diff --git a/src/generator.c b/src/generator.c index a2eacfe5..cd6bf604 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1491,12 +1491,59 @@ out: return ret; } +typedef struct cs_msg cs_msg_t; + +struct cs_msg { + cs_msg_t *next; + cs_msg_t *prev; + proxy_instance_t *proxy; + json_t *val; +}; + +/* Sends all messages in the queue ready to be dispatched, leaving those that + * fail write select to be handled next pass */ +static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) +{ + cs_msg_t *csmsg, *tmp; + int ret; + + DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { + connsock_t *cs = &(csmsg->proxy->cs); + + if ((ret = wait_write_select(cs->fd, 0))) { + DL_DELETE(*csmsgq, csmsg); + if (ret > 0) + ret = send_json_msg(cs, csmsg->val); + json_decref(csmsg->val); + if (ret < 1) { + proxy_instance_t *proxy = csmsg->proxy; + + LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping", + proxy->id, proxy->subid, proxy->url); + disable_subproxy(gdata, proxy->parent, proxy); + } + free(csmsg); + } + } +} + +static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val) +{ + cs_msg_t *csmsg = ckalloc(sizeof(cs_msg_t)); + + csmsg->val = *val; + *val = NULL; + csmsg->proxy = proxy; + DL_APPEND(*csmsgq, csmsg); +} + /* For processing and sending shares. proxy refers to parent proxy here */ static void *proxy_send(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->data; stratum_msg_t *msg = NULL; + cs_msg_t *csmsgq = NULL; rename_proc("proxysend"); @@ -1508,15 +1555,7 @@ static void *proxy_send(void *arg) int64_t client_id = 0, id; notify_instance_t *ni; json_t *jobid = NULL; - bool ret = true; - connsock_t *cs; json_t *val; - tv_t now; - ts_t abs; - - tv_time(&now); - tv_to_ts(&abs, &now); - abs.tv_sec++; if (unlikely(msg)) { json_decref(msg->json_msg); @@ -1524,15 +1563,24 @@ static void *proxy_send(void *arg) } mutex_lock(&gdata->psend_lock); - if (!gdata->psends) - cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &abs); + if (!gdata->psends) { + /* Poll every 10ms */ + const ts_t polltime = {0, 10000000}; + ts_t timeout_ts; + + ts_realtime(&timeout_ts); + timeraddspec(&timeout_ts, &polltime); + cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &timeout_ts); + } msg = gdata->psends; if (likely(msg)) DL_DELETE(gdata->psends, msg); mutex_unlock(&gdata->psend_lock); - if (!msg) + if (!msg) { + send_json_msgq(gdata, &csmsgq); continue; + } if (unlikely(!json_get_int(&subid, msg->json_msg, "subproxy"))) { LOGWARNING("Failed to find subproxy in proxy_send msg"); @@ -1572,24 +1620,18 @@ static void *proxy_send(void *arg) if (unlikely(!jobid)) { stratifier_reconnect_client(ckp, client_id); LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", - proxy->id, proxy->url); + subproxy->id, subproxy->url); continue; } - cs = &subproxy->cs; - JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid, + JSON_CPACK(val, "{s[soooo]soss}", "params", subproxy->auth, jobid, json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "ntime"), json_object_dup(msg->json_msg, "nonce"), "id", json_object_dup(msg->json_msg, "id"), "method", "mining.submit"); - ret = send_json_msg(cs, val); - json_decref(val); - if (!ret) { - LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect", - proxy->id, subid, proxy->url); - disable_subproxy(gdata, proxy, subproxy); - } + add_json_msgq(&csmsgq, subproxy, &val); + send_json_msgq(gdata, &csmsgq); } return NULL; }