Browse Source

Move all proxy sends to the same thread

master
Con Kolivas 10 years ago
parent
commit
964b9fb095
  1. 78
      src/generator.c

78
src/generator.c

@ -115,11 +115,6 @@ struct proxy_instance {
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
pthread_t pth_precv; pthread_t pth_precv;
pthread_t pth_psend;
mutex_t psend_lock;
pthread_cond_t psend_cond;
stratum_msg_t *psends;
mutex_t share_lock; mutex_t share_lock;
share_msg_t *shares; share_msg_t *shares;
@ -150,7 +145,12 @@ struct generator_data {
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
pthread_t pth_uprecv; // User proxy receive thread pthread_t pth_uprecv; // User proxy receive thread
pthread_t pth_upsend; // User proxy send thread pthread_t pth_psend; // Combined proxy send thread
mutex_t psend_lock; // Lock associated with conditional below
pthread_cond_t psend_cond;
stratum_msg_t *psends;
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -1429,10 +1429,10 @@ static void submit_share(gdata_t *gdata, json_t *val)
json_object_set_nocheck(val, "id", json_integer(share->id)); json_object_set_nocheck(val, "id", json_integer(share->id));
/* Add the new message to the psend list */ /* Add the new message to the psend list */
mutex_lock(&proxy->psend_lock); mutex_lock(&gdata->psend_lock);
DL_APPEND(proxy->psends, msg); DL_APPEND(gdata->psends, msg);
pthread_cond_signal(&proxy->psend_cond); pthread_cond_signal(&gdata->psend_cond);
mutex_unlock(&proxy->psend_lock); mutex_unlock(&gdata->psend_lock);
out: out:
if (!success) if (!success)
@ -1494,9 +1494,7 @@ out:
/* For processing and sending shares. proxy refers to parent proxy here */ /* For processing and sending shares. proxy refers to parent proxy here */
static void *proxy_send(void *arg) static void *proxy_send(void *arg)
{ {
proxy_instance_t *proxy = (proxy_instance_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
connsock_t *cs = &proxy->cs;
ckpool_t *ckp = cs->ckp;
gdata_t *gdata = ckp->data; gdata_t *gdata = ckp->data;
stratum_msg_t *msg = NULL; stratum_msg_t *msg = NULL;
@ -1505,22 +1503,17 @@ static void *proxy_send(void *arg)
pthread_detach(pthread_self()); pthread_detach(pthread_self());
while (42) { while (42) {
proxy_instance_t *subproxy; proxy_instance_t *proxy, *subproxy;
int proxyid = 0, subid = 0; int proxyid = 0, subid = 0;
int64_t client_id = 0, id; int64_t client_id = 0, id;
notify_instance_t *ni; notify_instance_t *ni;
json_t *jobid = NULL; json_t *jobid = NULL;
bool ret = true; bool ret = true;
connsock_t *cs;
json_t *val; json_t *val;
tv_t now; tv_t now;
ts_t abs; ts_t abs;
if (unlikely(proxy->reconnect)) {
LOGINFO("Shutting down proxy_send thread for proxy %d to reconnect",
proxy->id);
break;
}
tv_time(&now); tv_time(&now);
tv_to_ts(&abs, &now); tv_to_ts(&abs, &now);
abs.tv_sec++; abs.tv_sec++;
@ -1530,13 +1523,13 @@ static void *proxy_send(void *arg)
free(msg); free(msg);
} }
mutex_lock(&proxy->psend_lock); mutex_lock(&gdata->psend_lock);
if (!proxy->psends) if (!gdata->psends)
cond_timedwait(&proxy->psend_cond, &proxy->psend_lock, &abs); cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &abs);
msg = proxy->psends; msg = gdata->psends;
if (likely(msg)) if (likely(msg))
DL_DELETE(proxy->psends, msg); DL_DELETE(gdata->psends, msg);
mutex_unlock(&proxy->psend_lock); mutex_unlock(&gdata->psend_lock);
if (!msg) if (!msg)
continue; continue;
@ -1557,9 +1550,11 @@ static void *proxy_send(void *arg)
LOGWARNING("Failed to find client_id in proxy_send msg"); LOGWARNING("Failed to find client_id in proxy_send msg");
continue; continue;
} }
if (unlikely(proxyid != proxy->id)) { proxy = proxy_by_id(gdata, proxyid);
LOGWARNING("Proxysend for proxy %d got message for proxy %d!", if (unlikely(!proxy)) {
proxy->id, proxyid); LOGWARNING("Proxysend for got message for non-existent proxy %d",
proxyid);
continue;
} }
mutex_lock(&proxy->notify_lock); mutex_lock(&proxy->notify_lock);
@ -1569,9 +1564,13 @@ static void *proxy_send(void *arg)
mutex_unlock(&proxy->notify_lock); mutex_unlock(&proxy->notify_lock);
subproxy = subproxy_by_id(proxy, subid); subproxy = subproxy_by_id(proxy, subid);
if (subproxy) if (unlikely(!subproxy)) {
LOGWARNING("Proxysend for got message for non-existent subproxy %d:%d",
proxyid, subid);
continue;
}
cs = &subproxy->cs; cs = &subproxy->cs;
if (jobid && subproxy) { if (jobid) {
JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid, JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid,
json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "nonce2"),
json_object_dup(msg->json_msg, "ntime"), json_object_dup(msg->json_msg, "ntime"),
@ -1580,16 +1579,12 @@ static void *proxy_send(void *arg)
"method", "mining.submit"); "method", "mining.submit");
ret = send_json_msg(cs, val); ret = send_json_msg(cs, val);
json_decref(val); json_decref(val);
} else if (!jobid) {
stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Proxy %d:%s failed to find matching jobid for %sknown subproxy in proxysend",
proxy->id, proxy->url, subproxy ? "" : "un");
} else { } else {
stratifier_reconnect_client(ckp, client_id); stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Failed to find subproxy %d:%d to send message to", LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend",
proxy->id, subid); proxy->id, proxy->url);
} }
if (!ret && subproxy) { if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect", LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect",
proxy->id, subid, proxy->url); proxy->id, subid, proxy->url);
disable_subproxy(gdata, proxy, subproxy); disable_subproxy(gdata, proxy, subproxy);
@ -2054,9 +2049,6 @@ static void prepare_proxy(proxy_instance_t *proxi)
proxi->parent = proxi; proxi->parent = proxi;
mutex_init(&proxi->proxy_lock); mutex_init(&proxi->proxy_lock);
add_subproxy(proxi, proxi); add_subproxy(proxi, proxi);
mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi);
create_pthread(&proxi->pth_precv, proxy_recv, proxi); create_pthread(&proxi->pth_precv, proxy_recv, proxi);
} }
@ -2217,7 +2209,6 @@ static void delete_proxy(gdata_t *gdata, proxy_instance_t *proxy)
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
HASH_DEL(gdata->proxies, proxy); HASH_DEL(gdata->proxies, proxy);
/* Disable all its threads */ /* Disable all its threads */
pthread_cancel(proxy->pth_psend);
pthread_cancel(proxy->pth_precv); pthread_cancel(proxy->pth_precv);
Close(proxy->cs.fd); Close(proxy->cs.fd);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
@ -2461,6 +2452,9 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
} else { } else {
prepare_proxy(proxy); prepare_proxy(proxy);
create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp); create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp);
mutex_init(&gdata->psend_lock);
cond_init(&gdata->psend_cond);
create_pthread(&gdata->pth_psend, proxy_send, ckp);
} }
} }

Loading…
Cancel
Save