Browse Source

Handle proxy sends according to which socket is ready to receive data

master
Con Kolivas 10 years ago
parent
commit
ed6febe6db
  1. 84
      src/generator.c

84
src/generator.c

@ -1491,12 +1491,59 @@ out:
return ret;
}
typedef struct cs_msg cs_msg_t;
struct cs_msg {
cs_msg_t *next;
cs_msg_t *prev;
proxy_instance_t *proxy;
json_t *val;
};
/* Sends all messages in the queue ready to be dispatched, leaving those that
* fail write select to be handled next pass */
static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq)
{
cs_msg_t *csmsg, *tmp;
int ret;
DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) {
connsock_t *cs = &(csmsg->proxy->cs);
if ((ret = wait_write_select(cs->fd, 0))) {
DL_DELETE(*csmsgq, csmsg);
if (ret > 0)
ret = send_json_msg(cs, csmsg->val);
json_decref(csmsg->val);
if (ret < 1) {
proxy_instance_t *proxy = csmsg->proxy;
LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping",
proxy->id, proxy->subid, proxy->url);
disable_subproxy(gdata, proxy->parent, proxy);
}
free(csmsg);
}
}
}
static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val)
{
cs_msg_t *csmsg = ckalloc(sizeof(cs_msg_t));
csmsg->val = *val;
*val = NULL;
csmsg->proxy = proxy;
DL_APPEND(*csmsgq, csmsg);
}
/* For processing and sending shares. proxy refers to parent proxy here */
static void *proxy_send(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
gdata_t *gdata = ckp->data;
stratum_msg_t *msg = NULL;
cs_msg_t *csmsgq = NULL;
rename_proc("proxysend");
@ -1508,15 +1555,7 @@ static void *proxy_send(void *arg)
int64_t client_id = 0, id;
notify_instance_t *ni;
json_t *jobid = NULL;
bool ret = true;
connsock_t *cs;
json_t *val;
tv_t now;
ts_t abs;
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (unlikely(msg)) {
json_decref(msg->json_msg);
@ -1524,15 +1563,24 @@ static void *proxy_send(void *arg)
}
mutex_lock(&gdata->psend_lock);
if (!gdata->psends)
cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &abs);
if (!gdata->psends) {
/* Poll every 10ms */
const ts_t polltime = {0, 10000000};
ts_t timeout_ts;
ts_realtime(&timeout_ts);
timeraddspec(&timeout_ts, &polltime);
cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &timeout_ts);
}
msg = gdata->psends;
if (likely(msg))
DL_DELETE(gdata->psends, msg);
mutex_unlock(&gdata->psend_lock);
if (!msg)
if (!msg) {
send_json_msgq(gdata, &csmsgq);
continue;
}
if (unlikely(!json_get_int(&subid, msg->json_msg, "subproxy"))) {
LOGWARNING("Failed to find subproxy in proxy_send msg");
@ -1572,24 +1620,18 @@ static void *proxy_send(void *arg)
if (unlikely(!jobid)) {
stratifier_reconnect_client(ckp, client_id);
LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend",
proxy->id, proxy->url);
subproxy->id, subproxy->url);
continue;
}
cs = &subproxy->cs;
JSON_CPACK(val, "{s[soooo]soss}", "params", proxy->auth, jobid,
JSON_CPACK(val, "{s[soooo]soss}", "params", subproxy->auth, jobid,
json_object_dup(msg->json_msg, "nonce2"),
json_object_dup(msg->json_msg, "ntime"),
json_object_dup(msg->json_msg, "nonce"),
"id", json_object_dup(msg->json_msg, "id"),
"method", "mining.submit");
ret = send_json_msg(cs, val);
json_decref(val);
if (!ret) {
LOGNOTICE("Proxy %d:%d %s failed to send msg in proxy_send, dropping to reconnect",
proxy->id, subid, proxy->url);
disable_subproxy(gdata, proxy, subproxy);
}
add_json_msgq(&csmsgq, subproxy, &val);
send_json_msgq(gdata, &csmsgq);
}
return NULL;
}

Loading…
Cancel
Save