diff --git a/src/generator.c b/src/generator.c index a58ab901..ae88a773 100644 --- a/src/generator.c +++ b/src/generator.c @@ -43,6 +43,17 @@ struct notify_instance { typedef struct notify_instance notify_instance_t; +struct share_msg { + UT_hash_handle hh; + int id; // Our own id for submitting upstream + + int client_id; + int msg_id; // Stratum message id from client + time_t submit_time; +}; + +typedef struct share_msg share_msg_t; + /* Per proxied pool instance data */ struct proxy_instance { ckpool_t *ckp; @@ -83,6 +94,10 @@ struct proxy_instance { pthread_cond_t psend_cond; stratum_msg_t *psends; + + pthread_mutex_t share_lock; + share_msg_t *shares; + int share_id; }; typedef struct proxy_instance proxy_instance_t; @@ -710,10 +725,26 @@ static void send_diff(proxy_instance_t *proxi, int sockd) static void submit_share(proxy_instance_t *proxi, json_t *val) { stratum_msg_t *msg; + share_msg_t *share; msg = ckzalloc(sizeof(stratum_msg_t)); + share = ckzalloc(sizeof(share_msg_t)); + share->submit_time = time(NULL); + share->client_id = json_integer_value(json_object_get(val, "client_id")); + share->msg_id = json_integer_value(json_object_get(val, "msg_id")); + json_object_del(val, "client_id"); + json_object_del(val, "msg_id"); msg->json_msg = val; + /* Add new share entry to the share hashtable */ + mutex_lock(&proxi->share_lock); + share->id = proxi->share_id++; + HASH_ADD_INT(proxi->shares, id, share); + mutex_unlock(&proxi->share_lock); + + json_object_set_nocheck(val, "id", json_integer(share->id)); + + /* Add the new message to the psend list */ mutex_lock(&proxi->psend_lock); DL_APPEND(proxi->psends, msg); pthread_cond_signal(&proxi->psend_cond); @@ -803,6 +834,46 @@ static void clear_notify(notify_instance_t *ni) free(ni->coinbase2); } +/* FIXME: Return something useful to the stratifier based on this result */ +static bool parse_share(ckpool_t *ckp, proxy_instance_t *proxi, const char *buf) +{ + json_t *val = NULL, *idval; + share_msg_t *share; + bool ret = false; + int id; + + val = json_loads(buf, 0, NULL); + if (!val) { + LOGINFO("Failed to parse json msg: %s", buf); + goto out; + } + idval = json_object_get(val, "id"); + if (!idval) { + LOGINFO("Failed to find id in json msg: %s", buf); + goto out; + } + id = json_integer_value(idval); + + mutex_lock(&proxi->share_lock); + HASH_FIND_INT(proxi->shares, &id, share); + if (share) + HASH_DEL(proxi->shares, share); + mutex_unlock(&proxi->share_lock); + + if (!share) { + LOGINFO("Failed to find matching share to result: %s", buf); + goto out; + } + ret = true; + LOGDEBUG("Found share from client %d with msg_id %d", share->client_id, + share->msg_id); + free(share); +out: + if (val) + json_decref(val); + return ret; +} + static void *proxy_recv(void *arg) { proxy_instance_t *proxi = (proxy_instance_t *)arg; @@ -813,16 +884,17 @@ static void *proxy_recv(void *arg) while (42) { notify_instance_t *ni, *tmp; + share_msg_t *share, *tmpshare; time_t now; int ret; now = time(NULL); + /* Age old notifications older than 10 mins old */ mutex_lock(&proxi->notify_lock); HASH_ITER(hh, proxi->notify_instances, ni, tmp) { if (HASH_COUNT(proxi->notify_instances) < 3) break; - /* Age old notifications older than 10 mins old */ if (ni->notify_time < now - 600) { HASH_DEL(proxi->notify_instances, ni); clear_notify(ni); @@ -830,6 +902,15 @@ static void *proxy_recv(void *arg) } mutex_unlock(&proxi->notify_lock); + /* Similary with shares older than 2 mins without response */ + mutex_lock(&proxi->share_lock); + HASH_ITER(hh, proxi->shares, share, tmpshare) { + if (share->submit_time < now - 120) { + HASH_DEL(proxi->shares, share); + } + } + mutex_unlock(&proxi->share_lock); + ret = read_socket_line(cs, 120); if (ret < 1) { LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); @@ -847,6 +928,10 @@ static void *proxy_recv(void *arg) } continue; } + if (parse_share(ckp, proxi, cs->buf)) { + continue; + } + /* If it's not a method it should be a share result */ LOGWARNING("Unhandled stratum message: %s", cs->buf); } return NULL; @@ -890,12 +975,12 @@ static void *proxy_send(void *arg) LOGWARNING("Failed to find matching jobid in proxysend"); continue; } - /* FIXME Use unique IDs and parse responses */ - val = json_pack("{s[ssooo]siss}", "params", proxi->auth, jobid, + val = json_pack("{s[ssooo]soss}", "params", proxi->auth, jobid, json_object_get(msg->json_msg, "nonce2"), json_object_get(msg->json_msg, "ntime"), json_object_get(msg->json_msg, "nonce"), - "id", 0, "method", "mining.submit"); + "id", json_object_get(msg->json_msg, "id"), + "method", "mining.submit"); free(jobid); send_json_msg(cs, val); json_decref(val); diff --git a/src/stratifier.c b/src/stratifier.c index 9bc2d22b..a2010891 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1224,7 +1224,7 @@ out_unlock: /* Submit a share in proxy mode to the parent pool. workbase_lock is held */ static void submit_share(stratum_instance_t *client, uint64_t jobid, const char *nonce2, - const char *ntime, const char *nonce) + const char *ntime, const char *nonce, int msg_id) { ckpool_t *ckp = client->ckp; json_t *json_msg; @@ -1232,8 +1232,9 @@ static void submit_share(stratum_instance_t *client, uint64_t jobid, const char char *msg; sprintf(enonce2, "%s%s", client->enonce1var, nonce2); - json_msg = json_pack("{sissssss}", "jobid", jobid, "nonce2", enonce2, - "ntime", ntime, "nonce", nonce); + json_msg = json_pack("{sisssssssisi}", "jobid", jobid, "nonce2", enonce2, + "ntime", ntime, "nonce", nonce, "client_id", client->id, + "msg_id", msg_id); msg = json_dumps(json_msg, 0); json_decref(json_msg); send_proc(ckp->generator, msg); @@ -1336,7 +1337,7 @@ out_unlock: ck_runlock(&workbase_lock); if (submit) - submit_share(client, id, nonce2, ntime, nonce); + submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id"))); /* Accept the lower of new and old diffs until the next update */ if (id < client->diff_change_job_id && client->old_diff < client->diff) @@ -1529,6 +1530,7 @@ static void parse_instance_msg(int client_id, json_t *msg) err_val = json_string("-1:id not found"); goto out; } + json_object_set_nocheck(json_msg, "id", id_val); #if 0 /* Random broken clients send something not an integer so just use the * json object, whatever it is. */ @@ -1553,14 +1555,13 @@ static void parse_instance_msg(int client_id, json_t *msg) } result_val = gen_json_result(client_id, json_msg, method, params, &err_val, &update); - if (unlikely(!result_val)) { + if (!result_val) { json_decref(json_msg); return; } if (!err_val) err_val = json_null(); out: - json_object_set_nocheck(json_msg, "id", id_val); json_object_set_nocheck(json_msg, "error", err_val); json_object_set_nocheck(json_msg, "result", result_val);