From b8e125a1f7c4186383d0553e0bb3c57b5965d715 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 21 Nov 2014 20:27:52 +1100 Subject: [PATCH] Store which serverurl each client is bound to in the connector and pass the information to the stratifier --- src/connector.c | 12 +++++++++--- src/stratifier.c | 19 ++++++++++++++++--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/connector.c b/src/connector.c index dcf5e48c..4759ec07 100644 --- a/src/connector.c +++ b/src/connector.c @@ -40,6 +40,9 @@ struct client_instance { struct sockaddr address; char address_name[INET6_ADDRSTRLEN]; + /* Which serverurl is this instance connected to */ + int server; + char buf[PAGESIZE]; int bufofs; @@ -115,12 +118,12 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client) /* Accepts incoming connections on the server socket and generates client * instances */ -static int accept_client(cdata_t *cdata, const int epfd, const int sockd) +static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) { + int fd, port, no_clients, sockd; ckpool_t *ckp = cdata->ckp; client_instance_t *client; struct epoll_event event; - int fd, port, no_clients; socklen_t address_len; ck_rlock(&cdata->lock); @@ -132,7 +135,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const int sockd) return 0; } + sockd = cdata->serverfd[server]; client = ckzalloc(sizeof(client_instance_t)); + client->server = server; address_len = sizeof(client->address); fd = accept(sockd, &client->address, &address_len); if (unlikely(fd < 0)) { @@ -333,6 +338,7 @@ reparse: } else json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); if (ckp->passthrough) send_proc(ckp->generator, s); @@ -400,7 +406,7 @@ void *receiver(void *arg) continue; } if (event.data.u64 < (uint64_t)cdata->serverfds) { - ret = accept_client(cdata, epfd, (int)cdata->serverfd[event.data.u64]); + ret = accept_client(cdata, epfd, event.data.u64); if (unlikely(ret < 0)) { LOGEMERG("FATAL: Failed to accept_client in receiver"); break; diff --git a/src/stratifier.c b/src/stratifier.c index f08e185d..ee6af47c 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -247,6 +247,7 @@ struct stratum_instance { char *useragent; char *workername; int64_t user_id; + int server; /* Which server is this instance bound to */ ckpool_t *ckp; @@ -1082,16 +1083,17 @@ static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance) } /* Enter with write instance_lock held */ -static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id) +static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int server) { stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t)); sdata_t *sdata = ckp->data; instance->id = id; + instance->server = server; instance->diff = instance->old_diff = ckp->startdiff; instance->ckp = ckp; tv_time(&instance->ldc); - LOGINFO("Added instance %ld", id); + LOGINFO("Stratifier added instance %ld server %d", id, server); HASH_ADD_I64(sdata->stratum_instances, id, instance); return instance; } @@ -2966,6 +2968,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) sdata_t *sdata = ckp->data; smsg_t *msg; json_t *val; + int server; val = json_loads(buf, 0, NULL); if (unlikely(!val)) { @@ -2995,11 +2998,21 @@ static void srecv_process(ckpool_t *ckp, char *buf) strcpy(msg->address, json_string_value(val)); json_object_clear(val); + val = json_object_get(msg->json_msg, "server"); + if (unlikely(!val)) { + LOGWARNING("Failed to extract server from connector json smsg %s", buf); + json_decref(msg->json_msg); + free(msg); + goto out; + } + server = json_integer_value(val); + json_object_clear(val); + /* Parse the message here */ ck_wlock(&sdata->instance_lock); /* client_id instance doesn't exist yet, create one */ if (!__instance_by_id(sdata, msg->client_id)) - __stratum_add_instance(ckp, msg->client_id); + __stratum_add_instance(ckp, msg->client_id, server); ck_wunlock(&sdata->instance_lock); parse_instance_msg(sdata, msg);