diff --git a/src/ckpool.c b/src/ckpool.c index 044e6cb9..63cedf66 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1163,7 +1163,7 @@ bool json_get_bool(bool *store, const json_t *val, const char *res) goto out; } if (!json_is_boolean(entry)) { - LOGWARNING("Json entry %s is not a boolean", res); + LOGINFO("Json entry %s is not a boolean", res); goto out; } *store = json_is_true(entry); @@ -1262,6 +1262,7 @@ static bool parse_redirecturls(ckpool_t *ckp, const json_t *arr_val) { bool ret = false; int arr_size, i; + char redirecturl[INET6_ADDRSTRLEN], url[INET6_ADDRSTRLEN], port[8]; if (!arr_val) goto out; @@ -1276,11 +1277,15 @@ static bool parse_redirecturls(ckpool_t *ckp, const json_t *arr_val) } ckp->redirecturls = arr_size; ckp->redirecturl = ckalloc(sizeof(char *) * arr_size); + ckp->redirectport = ckalloc(sizeof(char *) * arr_size); for (i = 0; i < arr_size; i++) { json_t *val = json_array_get(arr_val, i); - if (!_json_get_string(&ckp->redirecturl[i], val, "redirecturl")) - LOGWARNING("Invalid redirecturl entry number %d", i); + strncpy(redirecturl, json_string_value(val), INET6_ADDRSTRLEN - 1); + if (!url_from_serverurl(redirecturl, url, port)) + quit(1, "Invalid redirecturl entry %d %s", i, redirecturl); + ckp->redirecturl[i] = strdup(url); + ckp->redirectport[i] = strdup(port); } ret = true; out: @@ -1609,10 +1614,10 @@ int main(int argc, char **argv) } if (!ckp.name) { - if (ckp.proxy) - ckp.name = "ckproxy"; - else if (ckp.redirector) + if (ckp.redirector) ckp.name = "ckredirector"; + else if (ckp.proxy) + ckp.name = "ckproxy"; else if (ckp.passthrough) ckp.name = "ckpassthrough"; else diff --git a/src/ckpool.h b/src/ckpool.h index 010a4db5..cd62bcd6 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -225,6 +225,7 @@ struct ckpool_instance { /* Passthrough redirect options */ int redirecturls; char **redirecturl; + char **redirectport; /* Private data for each process */ void *data; diff --git a/src/connector.c b/src/connector.c index be006391..8be76549 100644 --- a/src/connector.c +++ b/src/connector.c @@ -733,7 +733,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_msg = json_loads(buf, 0, NULL); if (unlikely(!json_msg)) { - LOGWARNING("Invalid json message: %s", buf); + LOGWARNING("Invalid json message in process_client_msg: %s", buf); return; } diff --git a/src/generator.c b/src/generator.c index 4e4ec622..9e9452c4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1375,17 +1375,22 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id) /* Add a share to the gdata share hashlist. Returns the share id */ static int add_share(gdata_t *gdata, const int64_t client_id, const double diff) { - share_msg_t *share = ckzalloc(sizeof(share_msg_t)); + share_msg_t *share = ckzalloc(sizeof(share_msg_t)), *tmpshare; + time_t now; int ret; - share->submit_time = time(NULL); + share->submit_time = now = time(NULL); share->client_id = client_id; share->diff = diff; - /* Add new share entry to the share hashtable */ + /* Add new share entry to the share hashtable. Age old shares */ mutex_lock(&gdata->share_lock); ret = share->id = gdata->share_id++; HASH_ADD_I64(gdata->shares, id, share); + HASH_ITER(hh, gdata->shares, share, tmpshare) { + if (share->submit_time < now - 120) + HASH_DEL(gdata->shares, share); + } mutex_unlock(&gdata->share_lock); return ret; @@ -1479,7 +1484,7 @@ static void account_shares(proxy_instance_t *proxy, const double diff, const boo /* Returns zero if it is not recognised as a share, 1 if it is a valid share * and -1 if it is recognised as a share but invalid. */ -static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) +static int parse_share(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxi, const char *buf) { json_t *val = NULL, *idval; bool result = false; @@ -1500,6 +1505,7 @@ static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) id = json_integer_value(idval); if (unlikely(!json_get_bool(&result, val, "result"))) { LOGINFO("Failed to find result in upstream json msg: %s", buf); + goto out; } mutex_lock(&gdata->share_lock); @@ -1513,12 +1519,14 @@ static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) proxi->id, proxi->subid, buf); /* We don't know what diff these shares are so assume the * current proxy diff. */ - account_shares(proxi, proxi->diff, result); + if (!ckp->redirector) + account_shares(proxi, proxi->diff, result); ret = -1; goto out; } ret = 1; - account_shares(proxi, share->diff, result); + if (!ckp->redirector) + account_shares(proxi, share->diff, result); LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid, buf, share->client_id); free(share); @@ -1692,11 +1700,36 @@ static void *proxy_send(void *arg) return NULL; } -static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) +static void parse_redirector_share(ckpool_t *ckp, const char *msg) +{ + int64_t client_id; + json_t *val; + + val = json_loads(msg, 0, NULL); + if (unlikely(!val)) { + LOGWARNING("Invalid json message in parse_redirector_share: %s", msg); + return; + } + /* Extract the client id from the json message */ + client_id = json_integer_value(json_object_get(val, "client_id")); + /* Make sure this is a passthrough client value! */ + if (unlikely(client_id < 0xffffffffll)) { + LOGERR("redirect_client got invalid client id %"PRId64, client_id); + goto out; + } + /* Diff is irrelevant here as we don't keep track of it so use 0 */ + add_share(ckp->data, client_id, 0); +out: + json_decref(val); +} + +static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm) { int len, sent; LOGDEBUG("Sending upstream json msg: %s", pm->msg); + if (ckp->redirector && strstr(pm->msg, "mining.submit")) + parse_redirector_share(ckp, pm->msg); len = strlen(pm->msg); sent = write_socket(pm->cs->fd, pm->msg, len); if (sent != len) { @@ -1875,6 +1908,34 @@ static void reconnect_proxy(proxy_instance_t *proxi) create_pthread(&pth, proxy_reconnect, proxi); } +static void redirect_client(ckpool_t *ckp, const char *buf) +{ + json_t *json_msg, *val; + int64_t client_id; + char *msg; + + json_msg = json_loads(buf, 0, NULL); + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message in redirect_client: %s", buf); + return; + } + /* Extract the client id from the json message */ + client_id = json_integer_value(json_object_get(json_msg, "client_id")); + /* Make sure this is a passthrough client value! */ + if (unlikely(client_id < 0xffffffffll)) { + LOGERR("redirect_client got invalid client id %"PRId64, client_id); + goto out; + } + JSON_CPACK(val, "{sIsosss[ssi]}", "id", "client_id", client_id, json_null(), + "method", "client.reconnect", "params", ckp->redirecturl[0], ckp->redirectport[0], 0); + msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); + json_decref(val); + send_proc(ckp->connector, msg); + free(msg); +out: + json_decref(json_msg); +} + /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) @@ -1928,6 +1989,12 @@ static void *passthrough_recv(void *arg) * process. Possibly parse parameters sent by upstream pool * here */ send_proc(ckp->connector, cs->buf); + + /* If we're a redirecting passthrough, look for a share + * responses here and redirect on a valid share. */ + if (ckp->redirector && parse_share(ckp, ckp->data, proxi, cs->buf) > 0) + redirect_client(ckp, cs->buf); + } return NULL; } @@ -2046,7 +2113,7 @@ static void *proxy_recv(void *arg) if (parse_method(ckp, subproxy, cs->buf)) continue; /* If it's not a method it should be a share result */ - if (!parse_share(gdata, subproxy, cs->buf)) { + if (!parse_share(ckp, gdata, subproxy, cs->buf)) { LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", subproxy->id, subproxy->subid, cs->buf); } @@ -2133,7 +2200,7 @@ static void *userproxy_recv(void *arg) if (parse_method(ckp, proxy, cs->buf)) continue; /* If it's not a method it should be a share result */ - if (!parse_share(gdata, proxy, cs->buf)) { + if (!parse_share(ckp, gdata, proxy, cs->buf)) { LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", proxy->id, proxy->subid, cs->buf); }