Browse Source

Send which proxy we wish to get a subscribe or notify to/from the connector to stratifier and whether to reconnect

master
Con Kolivas 10 years ago
parent
commit
520b36cc70
  1. 2
      src/ckpool.c
  2. 1
      src/ckpool.h
  3. 46
      src/generator.c
  4. 58
      src/stratifier.c

2
src/ckpool.c

@ -1024,7 +1024,7 @@ out:
return ret;
}
static bool json_get_bool(bool *store, const json_t *val, const char *res)
bool json_get_bool(bool *store, const json_t *val, const char *res)
{
json_t *entry = json_object_get(val, res);
bool ret = false;

1
src/ckpool.h

@ -229,5 +229,6 @@ bool json_get_string(char **store, const json_t *val, const char *res);
bool json_get_int64(int64_t *store, const json_t *val, const char *res);
bool json_get_int(int *store, const json_t *val, const char *res);
bool json_get_double(double *store, const json_t *val, const char *res);
bool json_get_bool(bool *store, const json_t *val, const char *res);
#endif /* CKPOOL_H */

46
src/generator.c

@ -1116,29 +1116,57 @@ out:
return ret;
}
static void send_subscribe(proxy_instance_t *proxi, int *sockd)
static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id)
{
proxy_instance_t *proxi;
mutex_lock(&gdata->lock);
HASH_FIND_INT(gdata->proxies, &id, proxi);
mutex_unlock(&gdata->lock);
return proxi;
}
static void send_subscribe(gdata_t *gdata, proxy_instance_t *cproxy, int *sockd, const char *buf)
{
proxy_instance_t *proxi;
json_t *json_msg;
int id = 0;
char *msg;
JSON_CPACK(json_msg, "{sisssi}",
sscanf(buf, "getsubscribe=%d", &id);
proxi = proxy_by_id(gdata, id);
if (unlikely(!proxi || !proxi->nonce2len)) {
ASPRINTF(&msg, "notready");
goto out_send;
}
JSON_CPACK(json_msg, "{sisssisb}",
"proxy", proxi->id,
"enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len);
"nonce2len", proxi->nonce2len,
"reconnect", proxi == cproxy ? true : false);
msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg);
out_send:
send_unix_msg(*sockd, msg);
free(msg);
_Close(sockd);
}
static void send_notify(proxy_instance_t *proxi, int *sockd)
static void send_notify(gdata_t *gdata, int *sockd, const char *buf)
{
json_t *json_msg, *merkle_arr;
proxy_instance_t *proxi;
notify_instance_t *ni;
int i, id = 0;
char *msg;
int i;
sscanf(buf, "getnotify=%d", &id);
proxi = proxy_by_id(gdata, id);
if (unlikely(!proxi)) {
ASPRINTF(&msg, "notready");
goto out_send;
}
merkle_arr = json_array();
mutex_lock(&proxi->notify_lock);
@ -1618,7 +1646,9 @@ reconnect:
LOGWARNING("Successfully connected to %s:%s as proxy",
cs->url, cs->port);
/* Sending subscribe implies stratifier will also do a notify */
send_proc(ckp->stratifier, "subscribe");
dealloc(buf);
ASPRINTF(&buf, "subscribe=%d", proxi->id);
send_proc(ckp->stratifier, buf);
proxi->notified = false;
}
}
@ -1656,9 +1686,9 @@ retry:
ret = 0;
goto out;
} else if (cmdmatch(buf, "getsubscribe")) {
send_subscribe(proxi, &sockd);
send_subscribe(gdata, proxi, &sockd, buf);
} else if (cmdmatch(buf, "getnotify")) {
send_notify(proxi, &sockd);
send_notify(gdata, &sockd, buf);
} else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, &sockd);
} else if (cmdmatch(buf, "reconnect")) {

58
src/stratifier.c

@ -1006,24 +1006,32 @@ static proxy_t *proxy_by_id(sdata_t *sdata, const int id)
return proxy;
}
static void update_notify(ckpool_t *ckp);
static void reconnect_clients(sdata_t *sdata, const char *cmd);
static void _update_notify(ckpool_t *ckp, const int id);
static void update_subscribe(ckpool_t *ckp)
static void update_subscribe(ckpool_t *ckp, const char *cmd)
{
sdata_t *sdata = ckp->data;
bool reconnect = true;
char *buf, *msg;
proxy_t *proxy;
json_t *val;
int id = 0;
char *buf;
buf = send_recv_proc(ckp->generator, "getsubscribe");
sscanf(cmd, "subscribe=%d", &id);
ASPRINTF(&msg, "getsubscribe=%d", id);
buf = send_recv_proc(ckp->generator, msg);
dealloc(msg);
if (unlikely(!buf)) {
LOGWARNING("Failed to get subscribe from generator in update_subscribe");
drop_allclients(ckp);
return;
}
LOGDEBUG("Update subscribe: %s", buf);
if (unlikely(!safecmp(buf, "notready"))) {
LOGNOTICE("Generator not ready to send subscribe for proxy %d", id);
return;
}
val = json_loads(buf, 0, NULL);
free(buf);
if (unlikely(!val)) {
@ -1031,8 +1039,8 @@ static void update_subscribe(ckpool_t *ckp)
return;
}
json_get_int(&id, val, "proxy");
proxy = proxy_by_id(sdata, id);
json_get_bool(&reconnect, val, "reconnect");
mutex_lock(&sdata->proxy_lock);
if (sdata->proxy != proxy)
@ -1070,12 +1078,13 @@ static void update_subscribe(ckpool_t *ckp)
json_decref(val);
/* Notify implied required now too */
update_notify(ckp);
reconnect_clients(sdata, "");
_update_notify(ckp, id);
if (reconnect)
reconnect_clients(sdata, "");
}
static void update_diff(ckpool_t *ckp);
#if 0
static proxy_t *current_proxy(sdata_t *sdata)
{
proxy_t *proxy;
@ -1086,19 +1095,28 @@ static proxy_t *current_proxy(sdata_t *sdata)
return proxy;
}
#endif
static void update_notify(ckpool_t *ckp)
static void _update_notify(ckpool_t *ckp, const int id)
{
bool new_block = false, clean;
sdata_t *sdata = ckp->data;
char header[228];
char *buf, *msg;
proxy_t *proxy;
workbase_t *wb;
json_t *val;
char *buf;
int i;
buf = send_recv_proc(ckp->generator, "getnotify");
proxy = proxy_by_id(sdata, id);
if (unlikely(!proxy->subscribed)) {
LOGINFO("No valid proxy subscription to update notify yet");
return;
}
ASPRINTF(&msg, "getnotify=%d", id);
buf = send_recv_proc(ckp->generator, msg);
dealloc(msg);
if (unlikely(!buf)) {
LOGWARNING("Failed to get notify from generator in update_notify");
return;
@ -1108,12 +1126,6 @@ static void update_notify(ckpool_t *ckp)
return;
}
proxy = current_proxy(sdata);
if (unlikely(!proxy || !proxy->subscribed)) {
LOGINFO("No valid proxy subscription to update notify yet");
return;
}
LOGDEBUG("Update notify: %s", buf);
wb = ckzalloc(sizeof(workbase_t));
val = json_loads(buf, 0, NULL);
@ -1173,6 +1185,14 @@ static void update_notify(ckpool_t *ckp)
stratum_broadcast_update(sdata, new_block | clean);
}
static void update_notify(ckpool_t *ckp, const char *cmd)
{
int id = 0;
sscanf(cmd, "notify=%d", &id);
_update_notify(ckp, id);
}
static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client);
static void update_diff(ckpool_t *ckp)
@ -1866,10 +1886,10 @@ retry:
update_base(ckp, GEN_PRIORITY);
} else if (cmdmatch(buf, "subscribe")) {
/* Proxifier has a new subscription */
update_subscribe(ckp);
update_subscribe(ckp, buf);
} else if (cmdmatch(buf, "notify")) {
/* Proxifier has a new notify ready */
update_notify(ckp);
update_notify(ckp, buf);
} else if (cmdmatch(buf, "diff")) {
update_diff(ckp);
} else if (cmdmatch(buf, "dropclient")) {

Loading…
Cancel
Save