Browse Source

Make client id use the passthrough id as high bits of a 64bit version of the client id to determine the intrinsic stratum client versus the connected client

master
Con Kolivas 10 years ago
parent
commit
00f08eed05
  1. 50
      src/connector.c
  2. 3
      src/generator.c
  3. 83
      src/stratifier.c

50
src/connector.c

@ -37,7 +37,7 @@ typedef struct connector_instance conn_instance_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
int id; int64_t id;
/* For fdclients hashtable */ /* For fdclients hashtable */
UT_hash_handle fdhh; UT_hash_handle fdhh;
@ -53,7 +53,6 @@ struct client_instance {
int bufofs; int bufofs;
bool passthrough; bool passthrough;
int passthrough_id;
}; };
typedef struct client_instance client_instance_t; typedef struct client_instance client_instance_t;
@ -65,7 +64,7 @@ static client_instance_t *fdclients;
/* Linked list of dead clients no longer in use but may still have references */ /* Linked list of dead clients no longer in use but may still have references */
static client_instance_t *dead_clients; static client_instance_t *dead_clients;
static int client_id; static int64_t client_id;
struct sender_send { struct sender_send {
struct sender_send *next; struct sender_send *next;
@ -136,7 +135,7 @@ retry:
client->fd = fd; client->fd = fd;
ck_wlock(&ci->lock); ck_wlock(&ci->lock);
client->id = client_id++; client->id = ++client_id;
HASH_ADD_INT(clients, id, client); HASH_ADD_INT(clients, id, client);
HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client); HASH_REPLACE(fdhh, fdclients, fd, SOI, client, old_client);
ci->nfds++; ci->nfds++;
@ -178,11 +177,11 @@ static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instanc
return; return;
if (ckp->passthrough) if (ckp->passthrough)
return; return;
sprintf(buf, "dropclient=%d", client->id); sprintf(buf, "dropclient=%ld", client->id);
send_proc(ckp->stratifier, buf); send_proc(ckp->stratifier, buf);
} }
static void send_client(conn_instance_t *ci, int id, char *buf); static void send_client(conn_instance_t *ci, int64_t id, char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
{ {
@ -246,9 +245,16 @@ reparse:
invalidate_client(ckp, ci, client); invalidate_client(ckp, ci, client);
return; return;
} else { } else {
int64_t passthrough_id;
char *s; char *s;
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); if (client->passthrough) {
passthrough_id = json_integer_value(json_object_get(val, "client_id"));
json_object_del(val, "client_id");
passthrough_id = (client->id << 32) | passthrough_id;
json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id));
} else
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "address", json_string(client->address_name));
s = json_dumps(val, 0); s = json_dumps(val, 0);
if (ckp->passthrough) if (ckp->passthrough)
@ -417,7 +423,7 @@ void *sender(void *arg)
/* Send a client by id a heap allocated buffer, allowing this function to /* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */ * free the ram. */
static void send_client(conn_instance_t *ci, int id, char *buf) static void send_client(conn_instance_t *ci, int64_t id, char *buf)
{ {
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
@ -460,7 +466,7 @@ static void send_client(conn_instance_t *ci, int id, char *buf)
mutex_unlock(&sender_lock); mutex_unlock(&sender_lock);
} }
static client_instance_t *client_by_id(conn_instance_t *ci, int id) static client_instance_t *client_by_id(conn_instance_t *ci, int64_t id)
{ {
client_instance_t *client; client_instance_t *client;
@ -483,7 +489,8 @@ static void passthrough_client(conn_instance_t *ci, client_instance_t *client)
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci) static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
{ {
int sockd = -1, client_id, ret = 0, selret; int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf = NULL; char *buf = NULL;
@ -540,35 +547,34 @@ retry:
goto out; goto out;
if (cmdmatch(buf, "dropclient")) { if (cmdmatch(buf, "dropclient")) {
client_instance_t *client; client_instance_t *client;
int client_id;
ret = sscanf(buf, "dropclient=%d", &client_id); ret = sscanf(buf, "dropclient=%ld", &client_id64);
if (ret < 0) { if (ret < 0) {
LOGDEBUG("Connector failed to parse dropclient command: %s", buf); LOGDEBUG("Connector failed to parse dropclient command: %s", buf);
goto retry; goto retry;
} }
client_id = client_id64 & 0xffffffffll;
client = client_by_id(ci, client_id); client = client_by_id(ci, client_id);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %d to drop", client_id); LOGINFO("Connector failed to find client id %ld to drop", client_id);
goto retry; goto retry;
} }
ret = drop_client(ci, client); ret = drop_client(ci, client);
if (ret >= 0) if (ret >= 0)
LOGINFO("Connector dropped client id: %d", client_id); LOGINFO("Connector dropped client id: %ld", client_id);
goto retry; goto retry;
} }
if (cmdmatch(buf, "passthrough")) { if (cmdmatch(buf, "passthrough")) {
client_instance_t *client; client_instance_t *client;
int client_id;
ret = sscanf(buf, "passthrough=%d", &client_id); ret = sscanf(buf, "passthrough=%ld", &client_id);
if (ret < 0) { if (ret < 0) {
LOGDEBUG("Connector failed to parse passthrough command: %s", buf); LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry; goto retry;
} }
client = client_by_id(ci, client_id); client = client_by_id(ci, client_id);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %d to pass through", client_id); LOGINFO("Connector failed to find client id %ld to pass through", client_id);
goto retry; goto retry;
} }
passthrough_client(ci, client); passthrough_client(ci, client);
@ -587,8 +593,16 @@ retry:
} }
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
} else
client_id = client_id64;
dealloc(buf); dealloc(buf);
buf = json_dumps(json_msg, 0); buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n"); realloc_strcat(&buf, "\n");

3
src/generator.c

@ -1216,6 +1216,7 @@ static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm)
len = strlen(pm->msg); len = strlen(pm->msg);
sent = write_socket(pm->cs->fd, pm->msg, len); sent = write_socket(pm->cs->fd, pm->msg, len);
if (sent != len) { if (sent != len) {
/* FIXME: Do something about this? */
LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg); LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg);
} }
free(pm->msg); free(pm->msg);
@ -1227,7 +1228,7 @@ static void passthrough_add_send(proxy_instance_t *proxi, const char *msg)
pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t));
pm->cs = proxi->cs; pm->cs = proxi->cs;
pm->msg = strdup(msg); ASPRINTF(&pm->msg, "%s\n", msg);
ckmsgq_add(proxi->passsends, pm); ckmsgq_add(proxi->passsends, pm);
} }

83
src/stratifier.c

@ -164,7 +164,7 @@ static char lasthash[68];
struct json_params { struct json_params {
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
int client_id; int64_t client_id;
char address[INET6_ADDRSTRLEN]; char address[INET6_ADDRSTRLEN];
}; };
@ -173,7 +173,7 @@ typedef struct json_params json_params_t;
/* Stratum json messages with their associated client id */ /* Stratum json messages with their associated client id */
struct smsg { struct smsg {
json_t *json_msg; json_t *json_msg;
int client_id; int64_t client_id;
char address[INET6_ADDRSTRLEN]; char address[INET6_ADDRSTRLEN];
}; };
@ -185,12 +185,12 @@ static ckmsgq_t *ckdbq; // ckdb
static ckmsgq_t *sshareq; // Stratum share sends static ckmsgq_t *sshareq; // Stratum share sends
static ckmsgq_t *sauthq; // Stratum authorisations static ckmsgq_t *sauthq; // Stratum authorisations
static int user_instance_id; static int64_t user_instance_id;
struct user_instance { struct user_instance {
UT_hash_handle hh; UT_hash_handle hh;
char username[128]; char username[128];
int id; int64_t id;
int workers; int workers;
}; };
@ -202,7 +202,7 @@ static user_instance_t *user_instances;
/* Per client stratum instance == workers */ /* Per client stratum instance == workers */
struct stratum_instance { struct stratum_instance {
UT_hash_handle hh; UT_hash_handle hh;
int id; int64_t id;
char enonce1[32]; char enonce1[32];
uchar enonce1bin[16]; uchar enonce1bin[16];
@ -231,10 +231,9 @@ struct stratum_instance {
char *useragent; char *useragent;
char *workername; char *workername;
char *secondaryuserid; char *secondaryuserid;
int user_id; int64_t user_id;
ckpool_t *ckp; ckpool_t *ckp;
bool passthrough;
}; };
typedef struct stratum_instance stratum_instance_t; typedef struct stratum_instance stratum_instance_t;
@ -429,7 +428,7 @@ out:
} }
static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file,
const char *func, const int line) const char *func, const int line)
{ {
static time_t time_counter; static time_t time_counter;
static int counter = 0; static int counter = 0;
@ -639,7 +638,7 @@ static void drop_allclients(ckpool_t *ckp)
ck_wlock(&instance_lock); ck_wlock(&instance_lock);
HASH_ITER(hh, stratum_instances, client, tmp) { HASH_ITER(hh, stratum_instances, client, tmp) {
HASH_DEL(stratum_instances, client); HASH_DEL(stratum_instances, client);
sprintf(buf, "dropclient=%d", client->id); sprintf(buf, "dropclient=%ld", client->id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
} }
HASH_ITER(hh, disconnected_instances, client, tmp) HASH_ITER(hh, disconnected_instances, client, tmp)
@ -794,7 +793,7 @@ static void update_diff(ckpool_t *ckp)
} }
/* Enter with instance_lock held */ /* Enter with instance_lock held */
static stratum_instance_t *__instance_by_id(int id) static stratum_instance_t *__instance_by_id(int64_t id)
{ {
stratum_instance_t *instance; stratum_instance_t *instance;
@ -803,7 +802,7 @@ static stratum_instance_t *__instance_by_id(int id)
} }
/* Enter with write instance_lock held */ /* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int id) static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id)
{ {
stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t)); stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t));
@ -816,7 +815,7 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int id)
return instance; return instance;
} }
static bool disconnected_sessionid_exists(const char *sessionid, int id) static bool disconnected_sessionid_exists(const char *sessionid, int64_t id)
{ {
bool connected_exists = false, ret = false; bool connected_exists = false, ret = false;
stratum_instance_t *instance, *tmp; stratum_instance_t *instance, *tmp;
@ -903,7 +902,7 @@ static void stratum_broadcast(json_t *val)
mutex_unlock(&ssends->lock); mutex_unlock(&ssends->lock);
} }
static void stratum_add_send(json_t *val, int client_id) static void stratum_add_send(json_t *val, int64_t client_id)
{ {
smsg_t *msg; smsg_t *msg;
@ -931,7 +930,7 @@ static void dec_worker(user_instance_t *instance)
mutex_unlock(&stats_lock); mutex_unlock(&stats_lock);
} }
static void drop_client(int id) static void drop_client(int64_t id)
{ {
stratum_instance_t *client = NULL; stratum_instance_t *client = NULL;
bool dec = false; bool dec = false;
@ -1085,9 +1084,9 @@ retry:
} else if (cmdmatch(buf, "diff")) { } else if (cmdmatch(buf, "diff")) {
update_diff(ckp); update_diff(ckp);
} else if (cmdmatch(buf, "dropclient")) { } else if (cmdmatch(buf, "dropclient")) {
int client_id; int64_t client_id;
ret = sscanf(buf, "dropclient=%d", &client_id); ret = sscanf(buf, "dropclient=%ld", &client_id);
if (ret < 0) if (ret < 0)
LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf);
else else
@ -1166,7 +1165,7 @@ static void new_enonce1(stratum_instance_t *client)
} }
/* Extranonce1 must be set here */ /* Extranonce1 must be set here */
static json_t *parse_subscribe(int client_id, json_t *params_val) static json_t *parse_subscribe(int64_t client_id, json_t *params_val)
{ {
stratum_instance_t *client = NULL; stratum_instance_t *client = NULL;
bool old_match = false; bool old_match = false;
@ -1278,7 +1277,7 @@ static int send_recv_auth(stratum_instance_t *client)
ts_realtime(&now); ts_realtime(&now);
sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec);
val = json_pack("{ss,ss,ss,ss,si,ss,ss,ss,ss,ss}", val = json_pack("{ss,ss,ss,ss,sI,ss,ss,ss,ss,ss}",
"username", client->user_instance->username, "username", client->user_instance->username,
"workername", client->workername, "workername", client->workername,
"poolinstance", ckp->name, "poolinstance", ckp->name,
@ -1550,7 +1549,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c
flip_32(swap, hash); flip_32(swap, hash);
__bin2hex(blockhash, swap, 32); __bin2hex(blockhash, swap, 32);
val = json_pack("{si,ss,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}", val = json_pack("{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}",
"height", wb->height, "height", wb->height,
"blockhash", blockhash, "blockhash", blockhash,
"confirmed", "n", "confirmed", "n",
@ -1661,7 +1660,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char *
char *msg; char *msg;
sprintf(enonce2, "%s%s", client->enonce1var, nonce2); sprintf(enonce2, "%s%s", client->enonce1var, nonce2);
json_msg = json_pack("{sisssssssisi}", "jobid", jobid, "nonce2", enonce2, json_msg = json_pack("{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2,
"ntime", ntime, "nonce", nonce, "client_id", client->id, "ntime", ntime, "nonce", nonce, "client_id", client->id,
"msg_id", msg_id); "msg_id", msg_id);
msg = json_dumps(json_msg, 0); msg = json_dumps(json_msg, 0);
@ -1851,7 +1850,7 @@ out_unlock:
ckdbq_add(ckp, ID_SHARES, val); ckdbq_add(ckp, ID_SHARES, val);
out: out:
if (!share) { if (!share) {
val = json_pack("{si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", val = json_pack("{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
"clientid", client->id, "clientid", client->id,
"secondaryuserid", client->secondaryuserid, "secondaryuserid", client->secondaryuserid,
"enonce1", client->enonce1, "enonce1", client->enonce1,
@ -1903,7 +1902,7 @@ static void stratum_broadcast_update(bool clean)
} }
/* For sending a single stratum template update */ /* For sending a single stratum template update */
static void stratum_send_update(int client_id, bool clean) static void stratum_send_update(int64_t client_id, bool clean)
{ {
json_t *json_msg; json_t *json_msg;
@ -1914,7 +1913,7 @@ static void stratum_send_update(int client_id, bool clean)
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
} }
static void send_json_err(int client_id, json_t *id_val, const char *err_msg) static void send_json_err(int64_t client_id, json_t *id_val, const char *err_msg)
{ {
json_t *val; json_t *val;
@ -1922,7 +1921,7 @@ static void send_json_err(int client_id, json_t *id_val, const char *err_msg)
stratum_add_send(val, client_id); stratum_add_send(val, client_id);
} }
static void update_client(const int client_id) static void update_client(const int64_t client_id)
{ {
stratum_instance_t *client; stratum_instance_t *client;
@ -1936,7 +1935,7 @@ static void update_client(const int client_id)
stratum_send_diff(client); stratum_send_diff(client);
} }
static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val, const char *address) static json_params_t *create_json_params(const int64_t client_id, const json_t *params, const json_t *id_val, const char *address)
{ {
json_params_t *jp = ckalloc(sizeof(json_params_t)); json_params_t *jp = ckalloc(sizeof(json_params_t));
@ -1947,7 +1946,7 @@ static json_params_t *create_json_params(const int client_id, const json_t *para
return jp; return jp;
} }
static void parse_method(const int client_id, json_t *id_val, json_t *method_val, static void parse_method(const int64_t client_id, json_t *id_val, json_t *method_val,
json_t *params_val, char *address) json_t *params_val, char *address)
{ {
stratum_instance_t *client; stratum_instance_t *client;
@ -1982,18 +1981,17 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
if (unlikely(cmdmatch(method, "mining.passthrough"))) { if (unlikely(cmdmatch(method, "mining.passthrough"))) {
/* We need to inform the connector process that this client /* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly */ * is a passthrough and to manage its messages accordingly.
client->passthrough = true; * Remove this instance since the client id may well be
* reused */
ck_wlock(&instance_lock);
HASH_DEL(stratum_instances, client);
ck_wunlock(&instance_lock);
LOGINFO("Adding passthrough client %d", client->id); LOGINFO("Adding passthrough client %d", client->id);
snprintf(buf, 255, "passthrough=%d", client->id); snprintf(buf, 255, "passthrough=%ld", client->id);
send_proc(client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
return; free(client);
}
/* No passthrough messages should get through from here on */
if (unlikely(client->passthrough)) {
LOGWARNING("Passthrough client messages falling through to stratifier");
return; return;
} }
@ -2010,7 +2008,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
* stratifier process to restart since it will have lost all * stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */ * the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %d", client->id); LOGINFO("Dropping unauthorised client %d", client->id);
snprintf(buf, 255, "dropclient=%d", client->id); snprintf(buf, 255, "dropclient=%ld", client->id);
send_proc(client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
return; return;
} }
@ -2028,7 +2026,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
static void parse_instance_msg(smsg_t *msg) static void parse_instance_msg(smsg_t *msg)
{ {
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int client_id = msg->client_id; int64_t client_id = msg->client_id;
/* Return back the same id_val even if it's null or not existent. */ /* Return back the same id_val even if it's null or not existent. */
id_val = json_object_get(val, "id"); id_val = json_object_get(val, "id");
@ -2140,7 +2138,7 @@ static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp)
{ {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int64_t client_id;
client_id = jp->client_id; client_id = jp->client_id;
@ -2170,7 +2168,8 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
{ {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id, errnum = 0; int64_t client_id;
int errnum = 0;
client_id = jp->client_id; client_id = jp->client_id;
@ -2179,7 +2178,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
ck_runlock(&instance_lock); ck_runlock(&instance_lock);
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); LOGINFO("Authoriser failed to find client id %ld in hashtable!", client_id);
goto out; goto out;
} }
result_val = parse_authorise(client, jp->params, &err_val, jp->address, &errnum); result_val = parse_authorise(client, jp->params, &err_val, jp->address, &errnum);
@ -2278,7 +2277,7 @@ static void update_userstats(ckpool_t *ckp)
ghs5 = client->dsps5 * nonces; ghs5 = client->dsps5 * nonces;
ghs60 = client->dsps60 * nonces; ghs60 = client->dsps60 * nonces;
ghs1440 = client->dsps1440 * nonces; ghs1440 = client->dsps1440 * nonces;
val = json_pack("{ss,si,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}", val = json_pack("{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}",
"poolinstance", ckp->name, "poolinstance", ckp->name,
"instanceid", client->id, "instanceid", client->id,
"elapsed", elapsed, "elapsed", elapsed,

Loading…
Cancel
Save