Browse Source

Store all notifies in the generator in the parent proxy list and extract the subproxy details from share submission to submit upstream to the right connection

master
Con Kolivas 10 years ago
parent
commit
575ac70de8
  1. 85
      src/generator.c

85
src/generator.c

@ -770,6 +770,7 @@ out:
static bool parse_notify(proxy_instance_t *proxi, json_t *val) static bool parse_notify(proxy_instance_t *proxi, json_t *val)
{ {
const char *prev_hash, *bbversion, *nbit, *ntime; const char *prev_hash, *bbversion, *nbit, *ntime;
proxy_instance_t *proxy = proxi->proxy;
char *job_id, *coinbase1, *coinbase2; char *job_id, *coinbase1, *coinbase2;
gdata_t *gdata = proxi->ckp->data; gdata_t *gdata = proxi->ckp->data;
bool clean, ret = false; bool clean, ret = false;
@ -830,11 +831,13 @@ static bool parse_notify(proxy_instance_t *proxi, json_t *val)
ret = true; ret = true;
ni->notify_time = time(NULL); ni->notify_time = time(NULL);
mutex_lock(&proxi->notify_lock); /* Add the notify instance to the parent proxy list, not the subproxy */
mutex_lock(&proxy->notify_lock);
ni->id = gdata->proxy_notify_id++; ni->id = gdata->proxy_notify_id++;
HASH_ADD_INT(proxi->notify_instances, id, ni); HASH_ADD_INT(proxy->notify_instances, id, ni);
/* Now set the subproxy's current notify to this */
proxi->current_notify = ni; proxi->current_notify = ni;
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxy->notify_lock);
out: out:
return ret; return ret;
@ -973,15 +976,16 @@ out:
static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
{ {
proxy_instance_t *proxy = proxi->proxy;
json_t *json_msg; json_t *json_msg;
char *msg, *buf; char *msg, *buf;
/* Master proxy, we don't use this for work */ /* Master proxy, we don't use this for work */
if (proxi == proxi->proxy) if (proxi == proxy)
return; return;
JSON_CPACK(json_msg, "{sisisf}", JSON_CPACK(json_msg, "{sisisf}",
"proxy", proxi->proxy->id, "proxy", proxy->id,
"subproxy", proxi->id, "subproxy", proxi->id,
"diff", proxi->diff); "diff", proxi->diff);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
@ -994,21 +998,22 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi) static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
{ {
proxy_instance_t *proxy = proxi->proxy;
json_t *json_msg, *merkle_arr; json_t *json_msg, *merkle_arr;
notify_instance_t *ni; notify_instance_t *ni;
char *msg, *buf; char *msg, *buf;
int i; int i;
/* Master proxy, we don't use this for work */ /* Master proxy, we don't use this for work */
if (proxi == proxi->proxy) if (proxi == proxy)
return; return;
merkle_arr = json_array(); merkle_arr = json_array();
mutex_lock(&proxi->notify_lock); mutex_lock(&proxy->notify_lock);
ni = proxi->current_notify; ni = proxi->current_notify;
if (unlikely(!ni)) { if (unlikely(!ni)) {
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxy->notify_lock);
LOGNOTICE("Proxi %d not ready to send notify", proxi->id); LOGNOTICE("Proxi %d not ready to send notify", proxi->id);
return; return;
} }
@ -1016,13 +1021,13 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0]));
/* Use our own jobid instead of the server's one for easy lookup */ /* Use our own jobid instead of the server's one for easy lookup */
JSON_CPACK(json_msg, "{sisisisssisssssosssssssb}", JSON_CPACK(json_msg, "{sisisisssisssssosssssssb}",
"proxy", proxi->proxy->id, "subproxy", proxi->id, "proxy", proxy->id, "subproxy", proxi->id,
"jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len,
"coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2,
"merklehash", merkle_arr, "bbversion", ni->bbversion, "merklehash", merkle_arr, "bbversion", ni->bbversion,
"nbit", ni->nbit, "ntime", ni->ntime, "nbit", ni->nbit, "ntime", ni->ntime,
"clean", ni->clean); "clean", ni->clean);
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxy->notify_lock);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
@ -1030,6 +1035,10 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
free(msg); free(msg);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
free(buf); free(buf);
/* Send diff now as stratifier will not accept diff till it has a
* valid workbase */
send_diff(ckp, proxi);
} }
static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg) static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg)
@ -1302,59 +1311,77 @@ out:
return ret; return ret;
} }
/* For processing and sending shares */ static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int id)
{
proxy_instance_t *subproxy;
mutex_lock(&proxy->proxy_lock);
HASH_FIND_INT(proxy->subproxies, &id, subproxy);
mutex_unlock(&proxy->proxy_lock);
return subproxy;
}
/* For processing and sending shares. proxy refers to parent proxy here */
static void *proxy_send(void *arg) static void *proxy_send(void *arg)
{ {
proxy_instance_t *proxi = (proxy_instance_t *)arg; proxy_instance_t *proxy = (proxy_instance_t *)arg;
connsock_t *cs = proxi->cs; connsock_t *cs = proxy->cs;
rename_proc("proxysend"); rename_proc("proxysend");
while (42) { while (42) {
proxy_instance_t *subproxy;
notify_instance_t *ni; notify_instance_t *ni;
stratum_msg_t *msg; stratum_msg_t *msg;
char *jobid = NULL; char *jobid = NULL;
bool ret = true; bool ret = true;
int subid = 0;
json_t *val; json_t *val;
uint32_t id; uint32_t id;
mutex_lock(&proxi->psend_lock); mutex_lock(&proxy->psend_lock);
if (!proxi->psends) if (!proxy->psends)
pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock); pthread_cond_wait(&proxy->psend_cond, &proxy->psend_lock);
msg = proxi->psends; msg = proxy->psends;
if (likely(msg)) if (likely(msg))
DL_DELETE(proxi->psends, msg); DL_DELETE(proxy->psends, msg);
mutex_unlock(&proxi->psend_lock); mutex_unlock(&proxy->psend_lock);
if (unlikely(!msg)) if (unlikely(!msg))
continue; continue;
json_get_int(&subid, msg->json_msg, "subproxy");
json_uintcpy(&id, msg->json_msg, "jobid"); json_uintcpy(&id, msg->json_msg, "jobid");
mutex_lock(&proxi->notify_lock); mutex_lock(&proxy->notify_lock);
HASH_FIND_INT(proxi->notify_instances, &id, ni); HASH_FIND_INT(proxy->notify_instances, &id, ni);
if (ni) if (ni)
jobid = strdup(ni->jobid); jobid = strdup(ni->jobid);
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxy->notify_lock);
if (jobid) { subproxy = subproxy_by_id(proxy, subid);
JSON_CPACK(val, "{s[ssooo]soss}", "params", proxi->auth, jobid,
if (jobid && subproxy) {
cs = subproxy->cs;
JSON_CPACK(val, "{s[ssooo]soss}", "params", proxy->auth, jobid,
json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "nonce2"),
json_object_dup(msg->json_msg, "ntime"), json_object_dup(msg->json_msg, "ntime"),
json_object_dup(msg->json_msg, "nonce"), json_object_dup(msg->json_msg, "nonce"),
"id", json_object_dup(msg->json_msg, "id"), "id", json_object_dup(msg->json_msg, "id"),
"method", "mining.submit"); "method", "mining.submit");
free(jobid);
ret = send_json_msg(cs, val); ret = send_json_msg(cs, val);
json_decref(val); json_decref(val);
} else } else {
LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend",
proxi->id, proxi->si->url); proxy->id, proxy->si->url);
}
free(jobid);
json_decref(msg->json_msg); json_decref(msg->json_msg);
free(msg); free(msg);
if (!ret && cs->fd > 0) { if (!ret && subproxy && cs->fd > 0) {
LOGWARNING("Proxy %d:%s failed to send msg in proxy_send, dropping to reconnect", LOGWARNING("Proxy %d:%s failed to send msg in proxy_send, dropping to reconnect",
proxi->id, proxi->si->url); proxy->id, proxy->si->url);
Close(cs->fd); Close(cs->fd);
} }
} }

Loading…
Cancel
Save