diff --git a/src/generator.c b/src/generator.c index c3b1b746..02ff825b 100644 --- a/src/generator.c +++ b/src/generator.c @@ -115,11 +115,6 @@ struct proxy_instance { notify_instance_t *notify_instances; pthread_t pth_precv; - pthread_t pth_psend; - mutex_t psend_lock; - pthread_cond_t psend_cond; - - stratum_msg_t *psends; mutex_t share_lock; share_msg_t *shares; @@ -150,7 +145,12 @@ struct generator_data { int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue pthread_t pth_uprecv; // User proxy receive thread - pthread_t pth_upsend; // User proxy send thread + pthread_t pth_psend; // Combined proxy send thread + + mutex_t psend_lock; // Lock associated with conditional below + pthread_cond_t psend_cond; + + stratum_msg_t *psends; }; typedef struct generator_data gdata_t; @@ -1429,10 +1429,10 @@ static void submit_share(gdata_t *gdata, json_t *val) json_object_set_nocheck(val, "id", json_integer(share->id)); /* Add the new message to the psend list */ - mutex_lock(&proxy->psend_lock); - DL_APPEND(proxy->psends, msg); - pthread_cond_signal(&proxy->psend_cond); - mutex_unlock(&proxy->psend_lock); + mutex_lock(&gdata->psend_lock); + DL_APPEND(gdata->psends, msg); + pthread_cond_signal(&gdata->psend_cond); + mutex_unlock(&gdata->psend_lock); out: if (!success) @@ -1494,9 +1494,7 @@ out: /* For processing and sending shares. proxy refers to parent proxy here */ static void *proxy_send(void *arg) { - proxy_instance_t *proxy = (proxy_instance_t *)arg; - connsock_t *cs = &proxy->cs; - ckpool_t *ckp = cs->ckp; + ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->data; stratum_msg_t *msg = NULL; @@ -1505,22 +1503,17 @@ static void *proxy_send(void *arg) pthread_detach(pthread_self()); while (42) { - proxy_instance_t *subproxy; + proxy_instance_t *proxy, *subproxy; int proxyid = 0, subid = 0; int64_t client_id = 0, id; notify_instance_t *ni; json_t *jobid = NULL; bool ret = true; + connsock_t *cs; json_t *val; tv_t now; ts_t abs; - if (unlikely(proxy->reconnect)) { - LOGINFO("Shutting down proxy_send thread for proxy %d to reconnect", - proxy->id); - break; - } - tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; @@ -1530,13 +1523,13 @@ static void *proxy_send(void *arg) free(msg); } - mutex_lock(&proxy->psend_lock); - if (!proxy->psends) - cond_timedwait(&proxy->psend_cond, &proxy->psend_lock, &abs); - msg = proxy->psends; + mutex_lock(&gdata->psend_lock); + if (!gdata->psends) + cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &abs); + msg = gdata->psends; if (likely(msg)) - DL_DELETE(proxy->psends, msg); - mutex_unlock(&proxy->psend_lock); + DL_DELETE(gdata->psends, msg); + mutex_unlock(&gdata->psend_lock); if (!msg) continue; @@ -1557,9 +1550,11 @@ static void *proxy_send(void *arg) LOGWARNING("Failed to find client_id in proxy_send msg"); continue; } - if (unlikely(proxyid != proxy->id)) { - LOGWARNING("Proxysend for proxy %d got message for proxy %d!", - proxy->id, proxyid); + proxy = proxy_by_id(gdata, proxyid); + if (unlikely(!proxy)) { + LOGWARNING("Proxysend for got message for non-existent proxy %d", + proxyid); + continue; } mutex_lock(&proxy->notify_lock); @@ -1569,9 +1564,13 @@ static void *proxy_send(void *arg) mutex_unlock(&proxy->notify_lock); subproxy = subproxy_by_id(proxy, subid); - if (subproxy) - cs = &subproxy->cs; - if (jobid && subproxy) { + if (unlikely(!subproxy)) { + LOGWARNING("Proxysend for got message for non-existent subproxy %d:%d", + proxyid, subid); + continue; + } + cs = &subproxy->cs; + if (jobid) { JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid, json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "ntime"), @@ -1580,16 +1579,12 @@ static void *proxy_send(void *arg) "method", "mining.submit"); ret = send_json_msg(cs, val); json_decref(val); - } else if (!jobid) { - stratifier_reconnect_client(ckp, client_id); - LOGNOTICE("Proxy %d:%s failed to find matching jobid for %sknown subproxy in proxysend", - proxy->id, proxy->url, subproxy ? "" : "un"); } else { stratifier_reconnect_client(ckp, client_id); - LOGNOTICE("Failed to find subproxy %d:%d to send message to", - proxy->id, subid); + LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", + proxy->id, proxy->url); } - if (!ret && subproxy) { + if (!ret) { LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect", proxy->id, subid, proxy->url); disable_subproxy(gdata, proxy, subproxy); @@ -2054,9 +2049,6 @@ static void prepare_proxy(proxy_instance_t *proxi) proxi->parent = proxi; mutex_init(&proxi->proxy_lock); add_subproxy(proxi, proxi); - mutex_init(&proxi->psend_lock); - cond_init(&proxi->psend_cond); - create_pthread(&proxi->pth_psend, proxy_send, proxi); create_pthread(&proxi->pth_precv, proxy_recv, proxi); } @@ -2217,7 +2209,6 @@ static void delete_proxy(gdata_t *gdata, proxy_instance_t *proxy) mutex_lock(&gdata->lock); HASH_DEL(gdata->proxies, proxy); /* Disable all its threads */ - pthread_cancel(proxy->pth_psend); pthread_cancel(proxy->pth_precv); Close(proxy->cs.fd); mutex_unlock(&gdata->lock); @@ -2461,6 +2452,9 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) } else { prepare_proxy(proxy); create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp); + mutex_init(&gdata->psend_lock); + cond_init(&gdata->psend_cond); + create_pthread(&gdata->pth_psend, proxy_send, ckp); } }