diff --git a/src/generator.c b/src/generator.c index 2dcd0c94..f31b5da3 100644 --- a/src/generator.c +++ b/src/generator.c @@ -44,8 +44,8 @@ struct proxy_instance { ckpool_t *ckp; connsock_t *cs; - char *auth; - char *pass; + const char *auth; + const char *pass; char *enonce1; char *enonce1bin; @@ -588,8 +588,7 @@ out: return ret; } -static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *auth, - const char *pass) +static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *req; bool ret; @@ -597,7 +596,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *au req = json_pack("{s:i,s:s,s:[s,s]}", "id", proxi->id++, "method", "mining.authorize", - "params", auth, pass); + "params", proxi->auth, proxi->pass); ret = send_json_msg(cs, req); json_decref(req); if (!ret) { @@ -638,22 +637,142 @@ out: return ret; } -static int proxy_loop(proc_instance_t *pi, connsock_t *cs) +static void send_subscribe(proxy_instance_t *proxi, int sockd) { - return 0; + json_t *json_msg; + char *msg; + + json_msg = json_pack("{sssi}", "enonce1", proxi->enonce1, + "nonce2len", proxi->nonce2len); + msg = json_dumps(json_msg, 0); + json_decref(json_msg); + send_unix_msg(sockd, msg); + free(msg); + close(sockd); +} + +static void send_notify(proxy_instance_t *proxi, int sockd) +{ + json_t *json_msg, *merkle_arr; + notify_instance_t *ni; + char *msg; + int i; + + merkle_arr = json_array(); + + mutex_lock(&proxi->notify_lock); + ni = proxi->notify_instances; + for (i = 0; i < ni->merkles; i++) + json_array_append(merkle_arr, json_string(&ni->merklehash[i][0])); + /* Use our own jobid instead of the server's one for easy lookup */ + json_msg = json_pack("{sisssssssosssssssb}", + "jobid", ni->id, "prevhash", ni->prevhash, + "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, + "merklehash", merkle_arr, "bbversion", ni->bbversion, + "nbit", ni->nbit, "ntime", ni->ntime, + "clean", ni->clean); + mutex_unlock(&proxi->notify_lock); + + msg = json_dumps(json_msg, 0); + json_decref(json_msg); + send_unix_msg(sockd, msg); + free(msg); + close(sockd); +} + +static int proxy_loop(proc_instance_t *pi, connsock_t *cs, proxy_instance_t *proxi) +{ + unixsock_t *us = &pi->us; + ckpool_t *ckp = pi->ckp; + int sockd, ret = 0; + char *buf = NULL; + + /* We're not subscribed and authorised so tell the stratifier to + * retrieve the first subscription. */ + send_proc(ckp->stratifier, "subscribe"); + send_proc(ckp->stratifier, "notify"); + +retry: + sockd = accept(us->sockd, NULL, NULL); + if (sockd < 0) { + if (interrupted()) + goto retry; + LOGERR("Failed to accept on proxy socket"); + ret = 1; + goto out; + } + dealloc(buf); + buf = recv_unix_msg(sockd); + if (!buf) { + LOGWARNING("Failed to get message in proxy_loop"); + close(sockd); + goto retry; + } + LOGDEBUG("Proxy received request: %s", buf); + if (!strncasecmp(buf, "shutdown", 8)) { + ret = 0; + goto out; + } else if (!strncasecmp(buf, "getsubscribe", 12)) { + send_subscribe(proxi, sockd); + } else if (!strncasecmp(buf, "getnotify", 9)) { + send_notify(proxi, sockd); + } else if (!strncasecmp(buf, "ping", 4)) { + LOGDEBUG("Proxy received ping request"); + send_unix_msg(sockd, "pong"); + } + close(sockd); + goto retry; +out: + close(sockd); + return ret; +} + +static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi) +{ + bool ret = true; + + do { + if (!ret) + sleep(5); + close(cs->fd); + ret = connect_proxy(cs); + if (!ret) + continue; + ret = subscribe_stratum(cs, proxi); + if (!ret) + continue; + ret = auth_stratum(cs, proxi); + } while (!ret); } static void *proxy_recv(void *arg) { - proxy_instance_t *pi = (proxy_instance_t *)arg; + proxy_instance_t *proxi = (proxy_instance_t *)arg; + connsock_t *cs = proxi->cs; + + rename_proc("proxyrecv"); + while (42) { + int ret; + + ret = read_socket_line(cs, 120); + if (ret < 1) { + LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); + reconnect_stratum(cs, proxi); + continue; + } + if (parse_method(proxi, cs->buf)) + continue; + LOGWARNING("Unhandled stratum message: %s", cs->buf); + } return NULL; } static void *proxy_send(void *arg) { - proxy_instance_t *pi = (proxy_instance_t *)arg; + proxy_instance_t *proxi = (proxy_instance_t *)arg; + rename_proc("proxysend"); return NULL; } @@ -680,17 +799,25 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, goto out; } - if (!auth_stratum(cs, &proxi, auth, pass)) { + proxi.auth = auth; + proxi.pass = pass; + if (!auth_stratum(cs, &proxi)) { LOGWARNING("FATAL: Failed initial authorise to %s:%s with %s:%s !", cs->url, cs->port, auth, pass); goto out; } + mutex_init(&proxi.notify_lock); create_pthread(&proxi.pth_precv, proxy_recv, &proxi); cond_init(&proxi.psend_cond); create_pthread(&proxi.pth_psend, proxy_send, &proxi); - ret = proxy_loop(pi, cs); + ret = proxy_loop(pi, cs, &proxi); + + /* Return from the proxy loop means we have received a shutdown + * request */ + pthread_cancel(proxi.pth_precv); + pthread_cancel(proxi.pth_psend); join_pthread(proxi.pth_precv); join_pthread(proxi.pth_psend); out: diff --git a/src/stratifier.c b/src/stratifier.c index 937b4064..9882aa6b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -429,6 +429,32 @@ static void update_base(ckpool_t *ckp) stratum_broadcast_update(new_block); } +static void update_subscribe(ckpool_t *ckp) +{ + char *buf; + + buf = send_recv_proc(ckp->generator, "getsubscribe"); + if (unlikely(!buf)) { + LOGWARNING("Failed to get subscribe from generator in update_notify"); + return; + } + LOGWARNING("Subscribe was %s", buf); + free(buf); +} + +static void update_notify(ckpool_t *ckp) +{ + char *buf; + + buf = send_recv_proc(ckp->generator, "getnotify"); + if (unlikely(!buf)) { + LOGWARNING("Failed to get notify from generator in update_notify"); + return; + } + LOGWARNING("Notify was %s", buf); + free(buf); +} + /* Enter with instance_lock held */ static stratum_instance_t *__instance_by_id(int id) { @@ -585,16 +611,21 @@ static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret; unixsock_t *us = &pi->us; + tv_t timeout, *to; char *buf = NULL; fd_set readfds; - tv_t timeout; reset: - timeout.tv_sec = ckp->update_interval; + if (ckp->proxy) + to = NULL; + else { + timeout.tv_sec = ckp->update_interval; + to = &timeout; + } retry: FD_ZERO(&readfds); FD_SET(us->sockd, &readfds); - selret = select(us->sockd + 1, &readfds, NULL, NULL, &timeout); + selret = select(us->sockd + 1, &readfds, NULL, NULL, to); if (selret < 0) { if (interrupted()) goto retry; @@ -639,6 +670,14 @@ retry: } else if (!strncasecmp(buf, "update", 6)) { update_base(ckp); goto reset; + } else if (!strncasecmp(buf, "subscribe", 9)) { + /* Proxifier has a new subscription */ + update_subscribe(ckp); + goto reset; + } else if (!strncasecmp(buf, "notify", 6)) { + /* Proxifier has a new notify ready */ + update_notify(ckp); + goto reset; } else if (!strncasecmp(buf, "dropclient", 10)) { int client_id;