Browse Source

Inform the stratifier process of the data in subscribe and notify from the generator proxy

master
Con Kolivas 11 years ago
parent
commit
fae9983fd7
  1. 149
      src/generator.c
  2. 43
      src/stratifier.c

149
src/generator.c

@ -44,8 +44,8 @@ struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
char *auth; const char *auth;
char *pass; const char *pass;
char *enonce1; char *enonce1;
char *enonce1bin; char *enonce1bin;
@ -588,8 +588,7 @@ out:
return ret; return ret;
} }
static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *auth, static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
const char *pass)
{ {
json_t *val = NULL, *res_val, *req; json_t *val = NULL, *res_val, *req;
bool ret; bool ret;
@ -597,7 +596,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *au
req = json_pack("{s:i,s:s,s:[s,s]}", req = json_pack("{s:i,s:s,s:[s,s]}",
"id", proxi->id++, "id", proxi->id++,
"method", "mining.authorize", "method", "mining.authorize",
"params", auth, pass); "params", proxi->auth, proxi->pass);
ret = send_json_msg(cs, req); ret = send_json_msg(cs, req);
json_decref(req); json_decref(req);
if (!ret) { if (!ret) {
@ -638,22 +637,142 @@ out:
return ret; return ret;
} }
static int proxy_loop(proc_instance_t *pi, connsock_t *cs) static void send_subscribe(proxy_instance_t *proxi, int sockd)
{ {
return 0; json_t *json_msg;
char *msg;
json_msg = json_pack("{sssi}", "enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len);
msg = json_dumps(json_msg, 0);
json_decref(json_msg);
send_unix_msg(sockd, msg);
free(msg);
close(sockd);
}
static void send_notify(proxy_instance_t *proxi, int sockd)
{
json_t *json_msg, *merkle_arr;
notify_instance_t *ni;
char *msg;
int i;
merkle_arr = json_array();
mutex_lock(&proxi->notify_lock);
ni = proxi->notify_instances;
for (i = 0; i < ni->merkles; i++)
json_array_append(merkle_arr, json_string(&ni->merklehash[i][0]));
/* Use our own jobid instead of the server's one for easy lookup */
json_msg = json_pack("{sisssssssosssssssb}",
"jobid", ni->id, "prevhash", ni->prevhash,
"coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2,
"merklehash", merkle_arr, "bbversion", ni->bbversion,
"nbit", ni->nbit, "ntime", ni->ntime,
"clean", ni->clean);
mutex_unlock(&proxi->notify_lock);
msg = json_dumps(json_msg, 0);
json_decref(json_msg);
send_unix_msg(sockd, msg);
free(msg);
close(sockd);
}
static int proxy_loop(proc_instance_t *pi, connsock_t *cs, proxy_instance_t *proxi)
{
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
char *buf = NULL;
/* We're not subscribed and authorised so tell the stratifier to
* retrieve the first subscription. */
send_proc(ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify");
retry:
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
if (interrupted())
goto retry;
LOGERR("Failed to accept on proxy socket");
ret = 1;
goto out;
}
dealloc(buf);
buf = recv_unix_msg(sockd);
if (!buf) {
LOGWARNING("Failed to get message in proxy_loop");
close(sockd);
goto retry;
}
LOGDEBUG("Proxy received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
ret = 0;
goto out;
} else if (!strncasecmp(buf, "getsubscribe", 12)) {
send_subscribe(proxi, sockd);
} else if (!strncasecmp(buf, "getnotify", 9)) {
send_notify(proxi, sockd);
} else if (!strncasecmp(buf, "ping", 4)) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
}
close(sockd);
goto retry;
out:
close(sockd);
return ret;
}
static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi)
{
bool ret = true;
do {
if (!ret)
sleep(5);
close(cs->fd);
ret = connect_proxy(cs);
if (!ret)
continue;
ret = subscribe_stratum(cs, proxi);
if (!ret)
continue;
ret = auth_stratum(cs, proxi);
} while (!ret);
} }
static void *proxy_recv(void *arg) static void *proxy_recv(void *arg)
{ {
proxy_instance_t *pi = (proxy_instance_t *)arg; proxy_instance_t *proxi = (proxy_instance_t *)arg;
connsock_t *cs = proxi->cs;
rename_proc("proxyrecv");
while (42) {
int ret;
ret = read_socket_line(cs, 120);
if (ret < 1) {
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
reconnect_stratum(cs, proxi);
continue;
}
if (parse_method(proxi, cs->buf))
continue;
LOGWARNING("Unhandled stratum message: %s", cs->buf);
}
return NULL; return NULL;
} }
static void *proxy_send(void *arg) static void *proxy_send(void *arg)
{ {
proxy_instance_t *pi = (proxy_instance_t *)arg; proxy_instance_t *proxi = (proxy_instance_t *)arg;
rename_proc("proxysend");
return NULL; return NULL;
} }
@ -680,17 +799,25 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
goto out; goto out;
} }
if (!auth_stratum(cs, &proxi, auth, pass)) { proxi.auth = auth;
proxi.pass = pass;
if (!auth_stratum(cs, &proxi)) {
LOGWARNING("FATAL: Failed initial authorise to %s:%s with %s:%s !", LOGWARNING("FATAL: Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, auth, pass); cs->url, cs->port, auth, pass);
goto out; goto out;
} }
mutex_init(&proxi.notify_lock); mutex_init(&proxi.notify_lock);
create_pthread(&proxi.pth_precv, proxy_recv, &proxi); create_pthread(&proxi.pth_precv, proxy_recv, &proxi);
cond_init(&proxi.psend_cond); cond_init(&proxi.psend_cond);
create_pthread(&proxi.pth_psend, proxy_send, &proxi); create_pthread(&proxi.pth_psend, proxy_send, &proxi);
ret = proxy_loop(pi, cs); ret = proxy_loop(pi, cs, &proxi);
/* Return from the proxy loop means we have received a shutdown
* request */
pthread_cancel(proxi.pth_precv);
pthread_cancel(proxi.pth_psend);
join_pthread(proxi.pth_precv); join_pthread(proxi.pth_precv);
join_pthread(proxi.pth_psend); join_pthread(proxi.pth_psend);
out: out:

43
src/stratifier.c

@ -429,6 +429,32 @@ static void update_base(ckpool_t *ckp)
stratum_broadcast_update(new_block); stratum_broadcast_update(new_block);
} }
static void update_subscribe(ckpool_t *ckp)
{
char *buf;
buf = send_recv_proc(ckp->generator, "getsubscribe");
if (unlikely(!buf)) {
LOGWARNING("Failed to get subscribe from generator in update_notify");
return;
}
LOGWARNING("Subscribe was %s", buf);
free(buf);
}
static void update_notify(ckpool_t *ckp)
{
char *buf;
buf = send_recv_proc(ckp->generator, "getnotify");
if (unlikely(!buf)) {
LOGWARNING("Failed to get notify from generator in update_notify");
return;
}
LOGWARNING("Notify was %s", buf);
free(buf);
}
/* Enter with instance_lock held */ /* Enter with instance_lock held */
static stratum_instance_t *__instance_by_id(int id) static stratum_instance_t *__instance_by_id(int id)
{ {
@ -585,16 +611,21 @@ static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret; int sockd, ret = 0, selret;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
tv_t timeout, *to;
char *buf = NULL; char *buf = NULL;
fd_set readfds; fd_set readfds;
tv_t timeout;
reset: reset:
if (ckp->proxy)
to = NULL;
else {
timeout.tv_sec = ckp->update_interval; timeout.tv_sec = ckp->update_interval;
to = &timeout;
}
retry: retry:
FD_ZERO(&readfds); FD_ZERO(&readfds);
FD_SET(us->sockd, &readfds); FD_SET(us->sockd, &readfds);
selret = select(us->sockd + 1, &readfds, NULL, NULL, &timeout); selret = select(us->sockd + 1, &readfds, NULL, NULL, to);
if (selret < 0) { if (selret < 0) {
if (interrupted()) if (interrupted())
goto retry; goto retry;
@ -639,6 +670,14 @@ retry:
} else if (!strncasecmp(buf, "update", 6)) { } else if (!strncasecmp(buf, "update", 6)) {
update_base(ckp); update_base(ckp);
goto reset; goto reset;
} else if (!strncasecmp(buf, "subscribe", 9)) {
/* Proxifier has a new subscription */
update_subscribe(ckp);
goto reset;
} else if (!strncasecmp(buf, "notify", 6)) {
/* Proxifier has a new notify ready */
update_notify(ckp);
goto reset;
} else if (!strncasecmp(buf, "dropclient", 10)) { } else if (!strncasecmp(buf, "dropclient", 10)) {
int client_id; int client_id;

Loading…
Cancel
Save