diff --git a/src/generator.c b/src/generator.c index bf2a655a..1aa56e0e 100644 --- a/src/generator.c +++ b/src/generator.c @@ -18,7 +18,9 @@ #include "libckpool.h" #include "generator.h" #include "bitcoin.h" +#include "stratifier.h" #include "uthash.h" +#include "utlist.h" struct notify_instance { /* Hash table data */ @@ -76,7 +78,10 @@ struct proxy_instance { pthread_t pth_precv; pthread_t pth_psend; + pthread_mutex_t psend_lock; pthread_cond_t psend_cond; + + stratum_msg_t *psends; }; typedef struct proxy_instance proxy_instance_t; @@ -718,6 +723,19 @@ static void send_diff(proxy_instance_t *proxi, int sockd) close(sockd); } +static void submit_share(proxy_instance_t *proxi, json_t *val) +{ + stratum_msg_t *msg; + + msg = ckzalloc(sizeof(stratum_msg_t)); + msg->json_msg = val; + + mutex_lock(&proxi->psend_lock); + DL_APPEND(proxi->psends, msg); + pthread_cond_signal(&proxi->psend_cond); + mutex_unlock(&proxi->psend_lock); +} + static int proxy_loop(proc_instance_t *pi, connsock_t *cs, proxy_instance_t *proxi) { unixsock_t *us = &pi->us; @@ -760,6 +778,14 @@ retry: } else if (!strncasecmp(buf, "ping", 4)) { LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); + } else { + /* Anything remaining should be share submissions */ + json_t *val = json_loads(buf, 0, NULL); + + if (!val) + LOGWARNING("Received unrecognised message: %s", buf); + else + submit_share(proxi, val); } close(sockd); goto retry; @@ -842,11 +868,34 @@ static void *proxy_recv(void *arg) return NULL; } +/* For processing and sending shares */ static void *proxy_send(void *arg) { proxy_instance_t *proxi = (proxy_instance_t *)arg; rename_proc("proxysend"); + + while (42) { + stratum_msg_t *msg; + char *buf; + + mutex_lock(&proxi->psend_lock); + if (!proxi->psends) + pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock); + msg = proxi->psends; + if (likely(msg)) + DL_DELETE(proxi->psends, msg); + mutex_unlock(&proxi->psend_lock); + + if (unlikely(!msg)) + continue; + + buf = json_dumps(msg->json_msg, 0); + LOGDEBUG("Proxysend received: %s", buf); + + json_decref(msg->json_msg); + free(msg); + } return NULL; } @@ -883,6 +932,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, mutex_init(&proxi.notify_lock); create_pthread(&proxi.pth_precv, proxy_recv, &proxi); + mutex_init(&proxi.psend_lock); cond_init(&proxi.psend_cond); create_pthread(&proxi.pth_psend, proxy_send, &proxi); diff --git a/src/stratifier.c b/src/stratifier.c index 3b9e6ea7..961c85e7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -20,9 +20,10 @@ #include "ckpool.h" #include "libckpool.h" #include "bitcoin.h" +#include "sha2.h" +#include "stratifier.h" #include "uthash.h" #include "utlist.h" -#include "sha2.h" static const char *workpadding = "000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000"; @@ -155,16 +156,6 @@ static uint64_t workbase_id; static uint64_t blockchange_id; static char lasthash[68]; -struct stratum_msg { - struct stratum_msg *next; - struct stratum_msg *prev; - - json_t *json_msg; - int client_id; -}; - -typedef struct stratum_msg stratum_msg_t; - /* For protecting the stratum msg data */ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; @@ -1188,7 +1179,6 @@ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const if (!wb->proxy) test_blocksolve(wb, swap, ret, coinbase, cblen); - /* FIXME: Log share here */ return ret; } @@ -1214,6 +1204,24 @@ out_unlock: return ret; } +/* Submit a share in proxy mode to the parent pool. workbase_lock is held */ +static void __submit_share(workbase_t *wb, const char *jobid, const char *nonce2, + const char *ntime, const char *nonce) +{ + ckpool_t *ckp = wb->ckp; + json_t *json_msg; + char enonce2[32]; + char *msg; + + sprintf(enonce2, "%s%s", wb->enonce1const, nonce2); + json_msg = json_pack("{ssssssss}", "jobid", jobid, "nonce2", enonce2, + "ntime", ntime, "nonce", nonce); + msg = json_dumps(json_msg, 0); + json_decref(json_msg); + send_proc(ckp->generator, msg); + free(msg); +} + static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, json_t *params_val, json_t **err_val) { @@ -1303,6 +1311,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, goto out_unlock; } invalid = false; + if (wb->proxy && sdiff > wb->diff) + __submit_share(wb, idstring, nonce2, ntime, nonce); out_unlock: ck_runlock(&workbase_lock); diff --git a/src/stratifier.h b/src/stratifier.h index 0f41b91c..e6855878 100644 --- a/src/stratifier.h +++ b/src/stratifier.h @@ -10,6 +10,16 @@ #ifndef STRATIFIER_H #define STRATIFIER_H +struct stratum_msg { + struct stratum_msg *next; + struct stratum_msg *prev; + + json_t *json_msg; + int client_id; +}; + +typedef struct stratum_msg stratum_msg_t; + int stratifier(proc_instance_t *pi); #endif /* STRATIFIER_H */