diff --git a/src/ckpool.c b/src/ckpool.c index 7587ee7c..554580ef 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1103,7 +1103,7 @@ bool json_get_int64(int64_t *store, const json_t *val, const char *res) goto out; } if (!json_is_integer(entry)) { - LOGWARNING("Json entry %s is not an integer", res); + LOGINFO("Json entry %s is not an integer", res); goto out; } *store = json_integer_value(entry); diff --git a/src/connector.c b/src/connector.c index 8be76549..1821832f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -25,6 +25,7 @@ typedef struct client_instance client_instance_t; typedef struct sender_send sender_send_t; +typedef struct share share_t; struct client_instance { /* For clients hashtable */ @@ -56,7 +57,11 @@ struct client_instance { /* Are we currently sending a blocked message from this client */ sender_send_t *sending; + + /* Is this the parent passthrough client */ bool passthrough; + + share_t *shares; }; struct sender_send { @@ -69,6 +74,14 @@ struct sender_send { int ofs; }; +struct share { + share_t *next; + share_t *prev; + + time_t submitted; + int64_t id; +}; + /* Private data for the connector */ struct connector_data { ckpool_t *ckp; @@ -348,6 +361,47 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c static void send_client(cdata_t *cdata, int64_t id, char *buf); +/* Look for shares being submitted via a redirector and add them to a linked + * list for looking up the responses */ +static void parse_redirector_share(client_instance_t *client, const char *msg, const json_t *val) +{ + share_t *share, *tmp; + time_t now; + int64_t id; + + if (!json_get_int64(&id, val, "id")) { + LOGNOTICE("Failed to find redirector share id"); + return; + } + /* If this is not a share, delete any matching ID messages so we + * don't falsely assume the client has had an accepted share based on + * a true result to a different message. */ + if (!strstr(msg, "mining.submit")) { + LOGDEBUG("Redirector client %"PRId64" non share message: %s", client->id, msg); + DL_FOREACH_SAFE(client->shares, share, tmp) { + if (share->id == id) { + DL_DELETE(client->shares, share); + dealloc(share); + } + } + return; + } + share = ckzalloc(sizeof(share_t)); + now = time(NULL); + share->submitted = now; + share->id = id; + DL_APPEND(client->shares, share); + LOGINFO("Redirector adding client %"PRId64" share id: %"PRId64, client->id, id); + + /* Age old shares. */ + DL_FOREACH_SAFE(client->shares, share, tmp) { + if (now > share->submitted + 120) { + DL_DELETE(client->shares, share); + dealloc(share); + } + } +} + /* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { @@ -400,14 +454,17 @@ reparse: invalidate_client(ckp, cdata, client); return; } else { - int64_t passthrough_id; char *s; if (client->passthrough) { + int64_t passthrough_id; + json_getdel_int64(&passthrough_id, val, "client_id"); passthrough_id = (client->id << 32) | passthrough_id; json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); } else { + if (ckp->redirector && strstr(msg, "mining.submit")) + parse_redirector_share(client, msg, val); json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); } @@ -640,6 +697,51 @@ static void *sender(void *arg) return NULL; } +static void test_redirector_shares(ckpool_t *ckp, client_instance_t *client, const char *buf) +{ + json_t *val = json_loads(buf, 0, NULL); + share_t *share, *found = NULL; + int64_t id; + + if (!val) { + LOGNOTICE("Invalid json response to client %"PRId64, client->id); + return; + } + if (!json_get_int64(&id, val, "id")) { + LOGINFO("Failed to find response id"); + goto out; + } + DL_FOREACH(client->shares, share) { + if (share->id == id) { + LOGDEBUG("Found matching share %"PRId64" in trs for client %"PRId64, + id, client->id); + DL_DELETE(client->shares, share); + found = share; + break; + } + } + if (found) { + bool result = false; + + dealloc(found); + if (!json_get_bool(&result, val, "result")) { + LOGINFO("Failed to find result in trs share"); + goto out; + } + if (!json_is_null(json_object_get(val, "error"))) { + LOGINFO("Got error for trs share"); + goto out; + } + if (!result) { + LOGDEBUG("Rejected trs share"); + goto out; + } + LOGWARNING("Found accepted share for client %"PRId64, client->id); + } +out: + json_decref(val); +} + /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ static void send_client(cdata_t *cdata, const int64_t id, char *buf) @@ -690,6 +792,8 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf) free(buf); return; } + if (ckp->redirector) + test_redirector_shares(ckp, client, buf); } sender_send = ckzalloc(sizeof(sender_send_t));