From 00f08eed05817ab174a5b11b6486cc45d202bdb6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 17 Aug 2014 21:08:13 +1000 Subject: [PATCH] Make client id use the passthrough id as high bits of a 64bit version of the client id to determine the intrinsic stratum client versus the connected client --- src/connector.c | 50 ++++++++++++++++++----------- src/generator.c | 3 +- src/stratifier.c | 83 ++++++++++++++++++++++++------------------------ 3 files changed, 75 insertions(+), 61 deletions(-) diff --git a/src/connector.c b/src/connector.c index 90d87c41..7657e6d6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -37,7 +37,7 @@ typedef struct connector_instance conn_instance_t; struct client_instance { /* For clients hashtable */ UT_hash_handle hh; - int id; + int64_t id; /* For fdclients hashtable */ UT_hash_handle fdhh; @@ -53,7 +53,6 @@ struct client_instance { int bufofs; bool passthrough; - int passthrough_id; }; typedef struct client_instance client_instance_t; @@ -65,7 +64,7 @@ static client_instance_t *fdclients; /* Linked list of dead clients no longer in use but may still have references */ static client_instance_t *dead_clients; -static int client_id; +static int64_t client_id; struct sender_send { struct sender_send *next; @@ -136,7 +135,7 @@ retry: client->fd = fd; ck_wlock(&ci->lock); - client->id = client_id++; + client->id = ++client_id; HASH_ADD_INT(clients, id, client); HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); ci->nfds++; @@ -178,11 +177,11 @@ static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instanc return; if (ckp->passthrough) return; - sprintf(buf, "dropclient=%d", client->id); + sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->stratifier, buf); } -static void send_client(conn_instance_t *ci, int id, char *buf); +static void send_client(conn_instance_t *ci, int64_t id, char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { @@ -246,9 +245,16 @@ reparse: invalidate_client(ckp, ci, client); return; } else { + int64_t passthrough_id; char *s; - json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); + if (client->passthrough) { + passthrough_id = json_integer_value(json_object_get(val, "client_id")); + json_object_del(val, "client_id"); + passthrough_id = (client->id << 32) | passthrough_id; + json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); + } else + json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); s = json_dumps(val, 0); if (ckp->passthrough) @@ -417,7 +423,7 @@ void *sender(void *arg) /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. */ -static void send_client(conn_instance_t *ci, int id, char *buf) +static void send_client(conn_instance_t *ci, int64_t id, char *buf) { sender_send_t *sender_send; client_instance_t *client; @@ -460,7 +466,7 @@ static void send_client(conn_instance_t *ci, int id, char *buf) mutex_unlock(&sender_lock); } -static client_instance_t *client_by_id(conn_instance_t *ci, int id) +static client_instance_t *client_by_id(conn_instance_t *ci, int64_t id) { client_instance_t *client; @@ -483,7 +489,8 @@ static void passthrough_client(conn_instance_t *ci, client_instance_t *client) static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) { - int sockd = -1, client_id, ret = 0, selret; + int sockd = -1, ret = 0, selret; + int64_t client_id64, client_id; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; char *buf = NULL; @@ -540,35 +547,34 @@ retry: goto out; if (cmdmatch(buf, "dropclient")) { client_instance_t *client; - int client_id; - ret = sscanf(buf, "dropclient=%d", &client_id); + ret = sscanf(buf, "dropclient=%ld", &client_id64); if (ret < 0) { LOGDEBUG("Connector failed to parse dropclient command: %s", buf); goto retry; } + client_id = client_id64 & 0xffffffffll; client = client_by_id(ci, client_id); if (unlikely(!client)) { - LOGINFO("Connector failed to find client id %d to drop", client_id); + LOGINFO("Connector failed to find client id %ld to drop", client_id); goto retry; } ret = drop_client(ci, client); if (ret >= 0) - LOGINFO("Connector dropped client id: %d", client_id); + LOGINFO("Connector dropped client id: %ld", client_id); goto retry; } if (cmdmatch(buf, "passthrough")) { client_instance_t *client; - int client_id; - ret = sscanf(buf, "passthrough=%d", &client_id); + ret = sscanf(buf, "passthrough=%ld", &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse passthrough command: %s", buf); goto retry; } client = client_by_id(ci, client_id); if (unlikely(!client)) { - LOGINFO("Connector failed to find client id %d to pass through", client_id); + LOGINFO("Connector failed to find client id %ld to pass through", client_id); goto retry; } passthrough_client(ci, client); @@ -587,8 +593,16 @@ retry: } /* Extract the client id from the json message and remove its entry */ - client_id = json_integer_value(json_object_get(json_msg, "client_id")); + client_id64 = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); + if (client_id64 > 0xffffffffll) { + int64_t passthrough_id; + + passthrough_id = client_id64 & 0xffffffffll; + client_id = client_id64 >> 32; + json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); + } else + client_id = client_id64; dealloc(buf); buf = json_dumps(json_msg, 0); realloc_strcat(&buf, "\n"); diff --git a/src/generator.c b/src/generator.c index 2fc4639c..9efadd1f 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1216,6 +1216,7 @@ static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) len = strlen(pm->msg); sent = write_socket(pm->cs->fd, pm->msg, len); if (sent != len) { + /* FIXME: Do something about this? */ LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg); } free(pm->msg); @@ -1227,7 +1228,7 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); pm->cs = proxi->cs; - pm->msg = strdup(msg); + ASPRINTF(&pm->msg, "%s\n", msg); ckmsgq_add(proxi->passsends, pm); } diff --git a/src/stratifier.c b/src/stratifier.c index 307c7e86..2de0b1f3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -164,7 +164,7 @@ static char lasthash[68]; struct json_params { json_t *params; json_t *id_val; - int client_id; + int64_t client_id; char address[INET6_ADDRSTRLEN]; }; @@ -173,7 +173,7 @@ typedef struct json_params json_params_t; /* Stratum json messages with their associated client id */ struct smsg { json_t *json_msg; - int client_id; + int64_t client_id; char address[INET6_ADDRSTRLEN]; }; @@ -185,12 +185,12 @@ static ckmsgq_t *ckdbq; // ckdb static ckmsgq_t *sshareq; // Stratum share sends static ckmsgq_t *sauthq; // Stratum authorisations -static int user_instance_id; +static int64_t user_instance_id; struct user_instance { UT_hash_handle hh; char username[128]; - int id; + int64_t id; int workers; }; @@ -202,7 +202,7 @@ static user_instance_t *user_instances; /* Per client stratum instance == workers */ struct stratum_instance { UT_hash_handle hh; - int id; + int64_t id; char enonce1[32]; uchar enonce1bin[16]; @@ -231,10 +231,9 @@ struct stratum_instance { char *useragent; char *workername; char *secondaryuserid; - int user_id; + int64_t user_id; ckpool_t *ckp; - bool passthrough; }; typedef struct stratum_instance stratum_instance_t; @@ -429,7 +428,7 @@ out: } static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, - const char *func, const int line) + const char *func, const int line) { static time_t time_counter; static int counter = 0; @@ -639,7 +638,7 @@ static void drop_allclients(ckpool_t *ckp) ck_wlock(&instance_lock); HASH_ITER(hh, stratum_instances, client, tmp) { HASH_DEL(stratum_instances, client); - sprintf(buf, "dropclient=%d", client->id); + sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } HASH_ITER(hh, disconnected_instances, client, tmp) @@ -794,7 +793,7 @@ static void update_diff(ckpool_t *ckp) } /* Enter with instance_lock held */ -static stratum_instance_t *__instance_by_id(int id) +static stratum_instance_t *__instance_by_id(int64_t id) { stratum_instance_t *instance; @@ -803,7 +802,7 @@ static stratum_instance_t *__instance_by_id(int id) } /* Enter with write instance_lock held */ -static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int id) +static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id) { stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t)); @@ -816,7 +815,7 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int id) return instance; } -static bool disconnected_sessionid_exists(const char *sessionid, int id) +static bool disconnected_sessionid_exists(const char *sessionid, int64_t id) { bool connected_exists = false, ret = false; stratum_instance_t *instance, *tmp; @@ -903,7 +902,7 @@ static void stratum_broadcast(json_t *val) mutex_unlock(&ssends->lock); } -static void stratum_add_send(json_t *val, int client_id) +static void stratum_add_send(json_t *val, int64_t client_id) { smsg_t *msg; @@ -931,7 +930,7 @@ static void dec_worker(user_instance_t *instance) mutex_unlock(&stats_lock); } -static void drop_client(int id) +static void drop_client(int64_t id) { stratum_instance_t *client = NULL; bool dec = false; @@ -1085,9 +1084,9 @@ retry: } else if (cmdmatch(buf, "diff")) { update_diff(ckp); } else if (cmdmatch(buf, "dropclient")) { - int client_id; + int64_t client_id; - ret = sscanf(buf, "dropclient=%d", &client_id); + ret = sscanf(buf, "dropclient=%ld", &client_id); if (ret < 0) LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); else @@ -1166,7 +1165,7 @@ static void new_enonce1(stratum_instance_t *client) } /* Extranonce1 must be set here */ -static json_t *parse_subscribe(int client_id, json_t *params_val) +static json_t *parse_subscribe(int64_t client_id, json_t *params_val) { stratum_instance_t *client = NULL; bool old_match = false; @@ -1278,7 +1277,7 @@ static int send_recv_auth(stratum_instance_t *client) ts_realtime(&now); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); - val = json_pack("{ss,ss,ss,ss,si,ss,ss,ss,ss,ss}", + val = json_pack("{ss,ss,ss,ss,sI,ss,ss,ss,ss,ss}", "username", client->user_instance->username, "workername", client->workername, "poolinstance", ckp->name, @@ -1550,7 +1549,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c flip_32(swap, hash); __bin2hex(blockhash, swap, 32); - val = json_pack("{si,ss,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}", + val = json_pack("{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}", "height", wb->height, "blockhash", blockhash, "confirmed", "n", @@ -1661,7 +1660,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char * char *msg; sprintf(enonce2, "%s%s", client->enonce1var, nonce2); - json_msg = json_pack("{sisssssssisi}", "jobid", jobid, "nonce2", enonce2, + json_msg = json_pack("{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2, "ntime", ntime, "nonce", nonce, "client_id", client->id, "msg_id", msg_id); msg = json_dumps(json_msg, 0); @@ -1851,7 +1850,7 @@ out_unlock: ckdbq_add(ckp, ID_SHARES, val); out: if (!share) { - val = json_pack("{si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", + val = json_pack("{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", "clientid", client->id, "secondaryuserid", client->secondaryuserid, "enonce1", client->enonce1, @@ -1903,7 +1902,7 @@ static void stratum_broadcast_update(bool clean) } /* For sending a single stratum template update */ -static void stratum_send_update(int client_id, bool clean) +static void stratum_send_update(int64_t client_id, bool clean) { json_t *json_msg; @@ -1914,7 +1913,7 @@ static void stratum_send_update(int client_id, bool clean) stratum_add_send(json_msg, client_id); } -static void send_json_err(int client_id, json_t *id_val, const char *err_msg) +static void send_json_err(int64_t client_id, json_t *id_val, const char *err_msg) { json_t *val; @@ -1922,7 +1921,7 @@ static void send_json_err(int client_id, json_t *id_val, const char *err_msg) stratum_add_send(val, client_id); } -static void update_client(const int client_id) +static void update_client(const int64_t client_id) { stratum_instance_t *client; @@ -1936,7 +1935,7 @@ static void update_client(const int client_id) stratum_send_diff(client); } -static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val, const char *address) +static json_params_t *create_json_params(const int64_t client_id, const json_t *params, const json_t *id_val, const char *address) { json_params_t *jp = ckalloc(sizeof(json_params_t)); @@ -1947,7 +1946,7 @@ static json_params_t *create_json_params(const int client_id, const json_t *para return jp; } -static void parse_method(const int client_id, json_t *id_val, json_t *method_val, +static void parse_method(const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, char *address) { stratum_instance_t *client; @@ -1982,18 +1981,17 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val if (unlikely(cmdmatch(method, "mining.passthrough"))) { /* We need to inform the connector process that this client - * is a passthrough and to manage its messages accordingly */ - client->passthrough = true; + * is a passthrough and to manage its messages accordingly. + * Remove this instance since the client id may well be + * reused */ + ck_wlock(&instance_lock); + HASH_DEL(stratum_instances, client); + ck_wunlock(&instance_lock); + LOGINFO("Adding passthrough client %d", client->id); - snprintf(buf, 255, "passthrough=%d", client->id); + snprintf(buf, 255, "passthrough=%ld", client->id); send_proc(client->ckp->connector, buf); - return; - - } - - /* No passthrough messages should get through from here on */ - if (unlikely(client->passthrough)) { - LOGWARNING("Passthrough client messages falling through to stratifier"); + free(client); return; } @@ -2010,7 +2008,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val * stratifier process to restart since it will have lost all * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %d", client->id); - snprintf(buf, 255, "dropclient=%d", client->id); + snprintf(buf, 255, "dropclient=%ld", client->id); send_proc(client->ckp->connector, buf); return; } @@ -2028,7 +2026,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val static void parse_instance_msg(smsg_t *msg) { json_t *val = msg->json_msg, *id_val, *method, *params; - int client_id = msg->client_id; + int64_t client_id = msg->client_id; /* Return back the same id_val even if it's null or not existent. */ id_val = json_object_get(val, "id"); @@ -2140,7 +2138,7 @@ static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; stratum_instance_t *client; - int client_id; + int64_t client_id; client_id = jp->client_id; @@ -2170,7 +2168,8 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; stratum_instance_t *client; - int client_id, errnum = 0; + int64_t client_id; + int errnum = 0; client_id = jp->client_id; @@ -2179,7 +2178,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) ck_runlock(&instance_lock); if (unlikely(!client)) { - LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); + LOGINFO("Authoriser failed to find client id %ld in hashtable!", client_id); goto out; } result_val = parse_authorise(client, jp->params, &err_val, jp->address, &errnum); @@ -2278,7 +2277,7 @@ static void update_userstats(ckpool_t *ckp) ghs5 = client->dsps5 * nonces; ghs60 = client->dsps60 * nonces; ghs1440 = client->dsps1440 * nonces; - val = json_pack("{ss,si,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}", + val = json_pack("{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}", "poolinstance", ckp->name, "instanceid", client->id, "elapsed", elapsed,