Browse Source

Move to pushing all notify data from the generator to avoid an extra message and reconnect clients once the current proxy has its first notification data

master
Con Kolivas 10 years ago
parent
commit
64a3703f09
  1. 42
      src/generator.c
  2. 81
      src/stratifier.c

42
src/generator.c

@ -1120,6 +1120,7 @@ out:
return ret; return ret;
} }
#if 0
static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id)
{ {
proxy_instance_t *proxi; proxy_instance_t *proxi;
@ -1130,20 +1131,17 @@ static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id)
return proxi; return proxi;
} }
#endif
static proxy_instance_t *current_proxy(gdata_t *gdata);
static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
{ {
gdata_t *gdata = ckp->data;
json_t *json_msg; json_t *json_msg;
char *msg, *buf; char *msg, *buf;
JSON_CPACK(json_msg, "{sisssisb}", JSON_CPACK(json_msg, "{sisssi}",
"proxy", proxi->id, "proxy", proxi->id,
"enonce1", proxi->enonce1, "enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len, "nonce2len", proxi->nonce2len);
"reconnect", proxi == current_proxy(gdata) ? true : false);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg); ASPRINTF(&buf, "subscribe=%s", msg);
@ -1152,28 +1150,21 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
free(buf); free(buf);
} }
static void send_notify(gdata_t *gdata, int *sockd, const char *buf) static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
{ {
json_t *json_msg, *merkle_arr; json_t *json_msg, *merkle_arr;
proxy_instance_t *proxi;
notify_instance_t *ni; notify_instance_t *ni;
int i, id = 0; char *msg, *buf;
char *msg; int i;
sscanf(buf, "getnotify=%d", &id);
proxi = proxy_by_id(gdata, id);
if (unlikely(!proxi)) {
ASPRINTF(&msg, "notready");
goto out_send;
}
merkle_arr = json_array(); merkle_arr = json_array();
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
ni = proxi->current_notify; ni = proxi->current_notify;
if (unlikely(!ni)) { if (unlikely(!ni)) {
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxi->notify_lock);
ASPRINTF(&msg, "notready"); LOGNOTICE("Proxi %d not ready to send notify", proxi->id);
goto out_send; return;
} }
for (i = 0; i < ni->merkles; i++) for (i = 0; i < ni->merkles; i++)
json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0]));
@ -1188,10 +1179,10 @@ static void send_notify(gdata_t *gdata, int *sockd, const char *buf)
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
out_send: ASPRINTF(&buf, "notify=%s", msg);
send_unix_msg(*sockd, msg);
free(msg); free(msg);
_Close(sockd); send_proc(ckp->stratifier, buf);
free(buf);
} }
static void send_diff(proxy_instance_t *proxi, int *sockd) static void send_diff(proxy_instance_t *proxi, int *sockd)
@ -1511,7 +1502,6 @@ static void *proxy_recv(void *arg)
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
int retries = 0, ret; int retries = 0, ret;
char buf[128];
time_t now; time_t now;
while (!proxy_alive(ckp, si, proxi, cs, true)) { while (!proxy_alive(ckp, si, proxi, cs, true)) {
@ -1574,8 +1564,7 @@ static void *proxy_recv(void *arg)
} }
if (parse_method(proxi, cs->buf)) { if (parse_method(proxi, cs->buf)) {
if (proxi->notified && proxi == current_proxy(gdata)) { if (proxi->notified && proxi == current_proxy(gdata)) {
snprintf(buf, 127, "notify=%d", proxi->id); send_notify(ckp, proxi);
send_proc(ckp->stratifier, buf);
proxi->notified = false; proxi->notified = false;
} }
if (proxi->diffed) { if (proxi->diffed) {
@ -1689,6 +1678,9 @@ reconnect:
dealloc(buf); dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id); ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
/* Send a notify for the new chosen proxy or the
* stratifier won't be able to switch. */
send_notify(ckp, proxi);
} }
} }
retry: retry:
@ -1726,8 +1718,6 @@ retry:
if (cmdmatch(buf, "shutdown")) { if (cmdmatch(buf, "shutdown")) {
ret = 0; ret = 0;
goto out; goto out;
} else if (cmdmatch(buf, "getnotify")) {
send_notify(gdata, &sockd, buf);
} else if (cmdmatch(buf, "getdiff")) { } else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, &sockd); send_diff(proxi, &sockd);
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {

81
src/stratifier.c

@ -297,6 +297,7 @@ struct proxy_base {
int enonce2varlen; int enonce2varlen;
bool subscribed; bool subscribed;
bool notified;
}; };
typedef struct proxy_base proxy_t; typedef struct proxy_base proxy_t;
@ -1027,8 +1028,6 @@ static void reconnect_clients(sdata_t *sdata)
ck_runlock(&sdata->instance_lock); ck_runlock(&sdata->instance_lock);
} }
static void _update_notify(ckpool_t *ckp, const int id);
static proxy_t *current_proxy(sdata_t *sdata) static proxy_t *current_proxy(sdata_t *sdata)
{ {
proxy_t *proxy; proxy_t *proxy;
@ -1043,7 +1042,6 @@ static proxy_t *current_proxy(sdata_t *sdata)
static void update_subscribe(ckpool_t *ckp, const char *cmd) static void update_subscribe(ckpool_t *ckp, const char *cmd)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
bool reconnect = true;
const char *buf; const char *buf;
proxy_t *proxy; proxy_t *proxy;
json_t *val; json_t *val;
@ -1055,10 +1053,6 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
} }
buf = cmd + 10; buf = cmd + 10;
LOGDEBUG("Update subscribe: %s", buf); LOGDEBUG("Update subscribe: %s", buf);
if (unlikely(!safecmp(buf, "notready"))) {
LOGNOTICE("Generator not ready to send subscribe for proxy %d", id);
return;
}
val = json_loads(buf, 0, NULL); val = json_loads(buf, 0, NULL);
if (unlikely(!val)) { if (unlikely(!val)) {
LOGWARNING("Failed to json decode subscribe response in update_subscribe"); LOGWARNING("Failed to json decode subscribe response in update_subscribe");
@ -1069,7 +1063,7 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
LOGNOTICE("Got updated subscribe for proxy %d", id); LOGNOTICE("Got updated subscribe for proxy %d", id);
proxy = proxy_by_id(sdata, id); proxy = proxy_by_id(sdata, id);
json_get_bool(&reconnect, val, "reconnect"); proxy->notified = false; /* Reset this */
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
proxy->subscribed = true; proxy->subscribed = true;
@ -1101,52 +1095,54 @@ static void update_subscribe(ckpool_t *ckp, const char *cmd)
proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8)); proxy->nonce2len, 1ll << (proxy->enonce1varlen * 8));
json_decref(val); json_decref(val);
/* Notify implied required now too */
_update_notify(ckp, id);
if (reconnect)
reconnect_clients(sdata);
} }
static void update_diff(ckpool_t *ckp); static void update_diff(ckpool_t *ckp);
static void _update_notify(ckpool_t *ckp, const int id) static void update_notify(ckpool_t *ckp, const char *cmd)
{ {
bool new_block = false, clean; bool new_block = false, clean;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
char header[228]; char header[228];
char *buf, *msg; const char *buf;
proxy_t *proxy; proxy_t *proxy;
workbase_t *wb; workbase_t *wb;
int i, id = 0;
json_t *val; json_t *val;
int i;
if (unlikely(strlen(cmd) < 8)) {
LOGWARNING("Zero length string passed to update_notify");
return;
}
buf = cmd + 7;
LOGDEBUG("Update notify: %s", buf);
val = json_loads(buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Failed to json decode in update_notify");
return;
}
json_get_int(&id, val, "proxy");
proxy = proxy_by_id(sdata, id); proxy = proxy_by_id(sdata, id);
if (unlikely(!proxy->subscribed)) { if (unlikely(!proxy->subscribed)) {
LOGINFO("No valid proxy subscription to update notify yet"); LOGNOTICE("No valid proxy %d subscription to update notify yet", id);
return; goto out;
} }
LOGNOTICE("Got updated notify for proxy %d", id);
if (proxy != current_proxy(sdata)) { if (proxy != current_proxy(sdata)) {
LOGINFO("Notify from backup proxy"); LOGINFO("Notify from backup proxy");
return; goto out;
} }
ASPRINTF(&msg, "getnotify=%d", id); if (!proxy->notified) {
buf = send_recv_proc(ckp->generator, msg); /* This is the first notification from the current proxy, tell
dealloc(msg); * clients now to reconnect since we have enough information to
if (unlikely(!buf)) { * switch. */
LOGWARNING("Failed to get notify from generator in update_notify"); proxy->notified = true;
return; reconnect_clients(sdata);
}
LOGDEBUG("Update notify: %s", buf);
if (unlikely(!safecmp(buf, "notready"))) {
LOGNOTICE("Generator not ready to send notify to stratifier");
return;
} }
LOGINFO("Got updated notify for proxy %d", id);
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
val = json_loads(buf, 0, NULL);
dealloc(buf);
wb->ckp = ckp; wb->ckp = ckp;
wb->proxy = true; wb->proxy = true;
@ -1173,7 +1169,6 @@ static void _update_notify(ckpool_t *ckp, const int id)
json_strcpy(wb->ntime, val, "ntime"); json_strcpy(wb->ntime, val, "ntime");
sscanf(wb->ntime, "%x", &wb->ntime32); sscanf(wb->ntime, "%x", &wb->ntime32);
clean = json_is_true(json_object_get(val, "clean")); clean = json_is_true(json_object_get(val, "clean"));
json_decref(val);
ts_realtime(&wb->gentime); ts_realtime(&wb->gentime);
snprintf(header, 225, "%s%s%s%s%s%s%s", snprintf(header, 225, "%s%s%s%s%s%s%s",
wb->bbversion, wb->prevhash, wb->bbversion, wb->prevhash,
@ -1200,14 +1195,8 @@ static void _update_notify(ckpool_t *ckp, const int id)
add_base(ckp, wb, &new_block); add_base(ckp, wb, &new_block);
stratum_broadcast_update(sdata, new_block | clean); stratum_broadcast_update(sdata, new_block | clean);
} out:
json_decref(val);
static void update_notify(ckpool_t *ckp, const char *cmd)
{
int id = 0;
sscanf(cmd, "notify=%d", &id);
_update_notify(ckp, id);
} }
static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client); static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client);
@ -1822,8 +1811,9 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
return buf; return buf;
} }
/* Sets the currently active proxy */ /* Sets the currently active proxy. Clients will be told to reconnect once the
static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf) * first notify data comes from this proxy. */
static void set_proxy(sdata_t *sdata, const char *buf)
{ {
proxy_t *proxy; proxy_t *proxy;
int id = 0; int id = 0;
@ -1834,9 +1824,6 @@ static void set_proxy(ckpool_t *ckp, sdata_t *sdata, const char *buf)
proxy = __proxy_by_id(sdata, id); proxy = __proxy_by_id(sdata, id);
sdata->proxy = proxy; sdata->proxy = proxy;
mutex_unlock(&sdata->proxy_lock); mutex_unlock(&sdata->proxy_lock);
_update_notify(ckp, id);
reconnect_clients(sdata);
} }
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
@ -1946,7 +1933,7 @@ retry:
} else if (cmdmatch(buf, "reconnect")) { } else if (cmdmatch(buf, "reconnect")) {
request_reconnect(sdata, buf); request_reconnect(sdata, buf);
} else if (cmdmatch(buf, "proxy")) { } else if (cmdmatch(buf, "proxy")) {
set_proxy(ckp, sdata, buf); set_proxy(sdata, buf);
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else } else

Loading…
Cancel
Save