Browse Source

Store which serverurl each client is bound to in the connector and pass the information to the stratifier

master
Con Kolivas 10 years ago
parent
commit
b8e125a1f7
  1. 12
      src/connector.c
  2. 19
      src/stratifier.c

12
src/connector.c

@ -40,6 +40,9 @@ struct client_instance {
struct sockaddr address; struct sockaddr address;
char address_name[INET6_ADDRSTRLEN]; char address_name[INET6_ADDRSTRLEN];
/* Which serverurl is this instance connected to */
int server;
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
@ -115,12 +118,12 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, const int epfd, const int sockd) static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
{ {
int fd, port, no_clients, sockd;
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
client_instance_t *client; client_instance_t *client;
struct epoll_event event; struct epoll_event event;
int fd, port, no_clients;
socklen_t address_len; socklen_t address_len;
ck_rlock(&cdata->lock); ck_rlock(&cdata->lock);
@ -132,7 +135,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const int sockd)
return 0; return 0;
} }
sockd = cdata->serverfd[server];
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
client->server = server;
address_len = sizeof(client->address); address_len = sizeof(client->address);
fd = accept(sockd, &client->address, &address_len); fd = accept(sockd, &client->address, &address_len);
if (unlikely(fd < 0)) { if (unlikely(fd < 0)) {
@ -333,6 +338,7 @@ reparse:
} else } else
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name)); json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, 0); s = json_dumps(val, 0);
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
@ -400,7 +406,7 @@ void *receiver(void *arg)
continue; continue;
} }
if (event.data.u64 < (uint64_t)cdata->serverfds) { if (event.data.u64 < (uint64_t)cdata->serverfds) {
ret = accept_client(cdata, epfd, (int)cdata->serverfd[event.data.u64]); ret = accept_client(cdata, epfd, event.data.u64);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
LOGEMERG("FATAL: Failed to accept_client in receiver"); LOGEMERG("FATAL: Failed to accept_client in receiver");
break; break;

19
src/stratifier.c

@ -247,6 +247,7 @@ struct stratum_instance {
char *useragent; char *useragent;
char *workername; char *workername;
int64_t user_id; int64_t user_id;
int server; /* Which server is this instance bound to */
ckpool_t *ckp; ckpool_t *ckp;
@ -1082,16 +1083,17 @@ static void dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance)
} }
/* Enter with write instance_lock held */ /* Enter with write instance_lock held */
static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id) static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int server)
{ {
stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t)); stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t));
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
instance->id = id; instance->id = id;
instance->server = server;
instance->diff = instance->old_diff = ckp->startdiff; instance->diff = instance->old_diff = ckp->startdiff;
instance->ckp = ckp; instance->ckp = ckp;
tv_time(&instance->ldc); tv_time(&instance->ldc);
LOGINFO("Added instance %ld", id); LOGINFO("Stratifier added instance %ld server %d", id, server);
HASH_ADD_I64(sdata->stratum_instances, id, instance); HASH_ADD_I64(sdata->stratum_instances, id, instance);
return instance; return instance;
} }
@ -2966,6 +2968,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
smsg_t *msg; smsg_t *msg;
json_t *val; json_t *val;
int server;
val = json_loads(buf, 0, NULL); val = json_loads(buf, 0, NULL);
if (unlikely(!val)) { if (unlikely(!val)) {
@ -2995,11 +2998,21 @@ static void srecv_process(ckpool_t *ckp, char *buf)
strcpy(msg->address, json_string_value(val)); strcpy(msg->address, json_string_value(val));
json_object_clear(val); json_object_clear(val);
val = json_object_get(msg->json_msg, "server");
if (unlikely(!val)) {
LOGWARNING("Failed to extract server from connector json smsg %s", buf);
json_decref(msg->json_msg);
free(msg);
goto out;
}
server = json_integer_value(val);
json_object_clear(val);
/* Parse the message here */ /* Parse the message here */
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
/* client_id instance doesn't exist yet, create one */ /* client_id instance doesn't exist yet, create one */
if (!__instance_by_id(sdata, msg->client_id)) if (!__instance_by_id(sdata, msg->client_id))
__stratum_add_instance(ckp, msg->client_id); __stratum_add_instance(ckp, msg->client_id, server);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
parse_instance_msg(sdata, msg); parse_instance_msg(sdata, msg);

Loading…
Cancel
Save