diff --git a/src/ckpool.c b/src/ckpool.c index a5861899..f062ad93 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -180,11 +180,12 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread(s) to wake up and process it. */ -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) +void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line) { ckmsg_t *msg; if (unlikely(!ckmsgq)) { + LOGWARNING("Sending messages to no queue from %s %s:%d", file, func, line); /* Discard data if we're unlucky enough to be sending it to * msg queues not set up during start up */ free(data); @@ -286,7 +287,7 @@ unix_msg_t *get_unix_msg(proc_instance_t *pi) return umsg; } -void create_unix_receiver(proc_instance_t *pi) +static void create_unix_receiver(proc_instance_t *pi) { pthread_t pth; @@ -909,7 +910,7 @@ out: static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid_t oldpid) { if (!ckp->killold) { - quit(1, "Process %s pid %d still exists, start ckpool with -k if you wish to kill it", + quit(1, "Process %s pid %d still exists, start ckpool with -H to get a handover or -k if you wish to kill it", pi->processname, oldpid); } LOGNOTICE("Terminating old process %s pid %d", pi->processname, oldpid); @@ -1499,6 +1500,7 @@ static void prepare_child(ckpool_t *ckp, proc_instance_t *pi, void *process, cha pi->sockname = pi->processname; create_process_unixsock(pi); create_pthread(&pi->pth_process, process, pi); + create_unix_receiver(pi); } #ifdef USE_CKDB @@ -1559,7 +1561,7 @@ static bool send_recv_path(const char *path, const char *msg) LOGWARNING("Received: %s in response to %s request", response, msg); dealloc(response); } else - LOGWARNING("Received not response to %s request", msg); + LOGWARNING("Received no response to %s request", msg); Close(sockd); return ret; } @@ -1822,6 +1824,7 @@ int main(int argc, char **argv) ckp.main.sockname = strdup("listener"); name_process_sockname(&ckp.main.us, &ckp.main); ckp.oldconnfd = ckzalloc(sizeof(int *) * ckp.serverurls); + manage_old_instance(&ckp, &ckp.main); if (ckp.handover) { const char *path = ckp.main.us.path; @@ -1870,8 +1873,6 @@ int main(int argc, char **argv) } } - if (!ckp.handover) - manage_old_instance(&ckp, &ckp.main); write_namepid(&ckp.main); open_process_sock(&ckp, &ckp.main, &ckp.main.us); launch_logger(&ckp); @@ -1888,7 +1889,7 @@ int main(int argc, char **argv) ckp.maxclients = ret * 9 / 10; } - ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); + // ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); create_pthread(&ckp.pth_listener, listener, &ckp.main); handler.sa_handler = &sighandler; diff --git a/src/ckpool.h b/src/ckpool.h index 08de2914..66297959 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -315,10 +315,10 @@ static const char __maybe_unused *stratum_msgs[] = { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); -void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); +void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line); +#define ckmsgq_add(ckmsgq, data) _ckmsgq_add(ckmsgq, data, __FILE__, __func__, __LINE__) bool ckmsgq_empty(ckmsgq_t *ckmsgq); unix_msg_t *get_unix_msg(proc_instance_t *pi); -void create_unix_receiver(proc_instance_t *pi); ckpool_t *global_ckp; diff --git a/src/connector.c b/src/connector.c index 627977b8..ea5144f9 100644 --- a/src/connector.c +++ b/src/connector.c @@ -305,13 +305,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { - LOGERR("Failed to epoll_ctl add in accept_client"); - return 0; - } - /* We increase the ref count on this client as epoll creates a pointer * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. */ @@ -321,6 +314,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &client->sendbufsize, &optlen); LOGDEBUG("Client sendbufsize detected as %d", client->sendbufsize); + event.data.u64 = client->id; + event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + LOGERR("Failed to epoll_ctl add in accept_client"); + dec_instance_ref(cdata, client); + return 0; + } + return 1; } @@ -332,12 +333,12 @@ static int __drop_client(cdata_t *cdata, client_instance_t *client) goto out; client->invalid = true; ret = client->fd; + /* Closing the fd will automatically remove it from the epoll list */ Close(client->fd); - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, ret, NULL); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the - * epoll list. */ + * epoll list. */ __dec_instance_ref(client); cdata->dead_generated++; out: @@ -572,22 +573,16 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); goto outnoclient; } - if (unlikely(client->invalid)) - goto out; /* We can have both messages and read hang ups so process the - * message first. */ + * message first. */ if (likely(events & EPOLLIN)) { /* Rearm the client for epoll events if we have successfully * parsed a message from it */ - if (parse_client_msg(ckp, cdata, client)) { - event->data.u64 = id; - event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; - epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); - } else + if (unlikely(!parse_client_msg(ckp, cdata, client))) { invalidate_client(ckp, cdata, client); + goto out; + } } - if (unlikely(client->invalid)) - goto out; if (unlikely(events & EPOLLERR)) { socklen_t errlen = sizeof(int); int error = 0; @@ -613,6 +608,12 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event) invalidate_client(cdata->pi->ckp, cdata, client); } out: + if (likely(!client->invalid)) { + /* Rearm the fd in the epoll list if it's still active */ + event->data.u64 = id; + event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT; + epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event); + } dec_instance_ref(cdata, client); outnoclient: free(event); @@ -624,8 +625,10 @@ static void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; struct epoll_event *event = ckzalloc(sizeof(struct epoll_event)); + ckpool_t *ckp = cdata->ckp; uint64_t serverfds, i; int ret, epfd; + char *buf; rename_proc("creceiver"); @@ -634,7 +637,7 @@ static void *receiver(void *arg) LOGEMERG("FATAL: Failed to create epoll in receiver"); goto out; } - serverfds = cdata->ckp->serverurls; + serverfds = ckp->serverurls; /* Add all the serverfds to the epoll */ for (i = 0; i < serverfds; i++) { /* The small values will be less than the first client ids */ @@ -647,8 +650,11 @@ static void *receiver(void *arg) } } - while (!cdata->accept) - cksleep_ms(1); + /* Wait for the stratifier to be ready for us */ + do { + buf = send_recv_proc(ckp->stratifier, "ping"); + } while (!buf); + free(buf); while (42) { uint64_t edu64; @@ -1308,7 +1314,7 @@ static char *connector_stats(cdata_t *cdata, const int runtime) return buf; } -static int connector_loop(proc_instance_t *pi, cdata_t *cdata) +static void connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; @@ -1408,8 +1414,6 @@ retry: send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); - } else if (cmdmatch(buf, "shutdown")) { - goto out; } else if (cmdmatch(buf, "passthrough")) { client_instance_t *client; @@ -1449,16 +1453,14 @@ retry: } else LOGWARNING("Unhandled connector message: %s", buf); goto retry; -out: - return ret; } void *connector(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; cdata_t *cdata = ckzalloc(sizeof(cdata_t)); - int threads, sockd, ret = 0, i, tries = 0; char newurl[INET6_ADDRSTRLEN], newport[8]; + int threads, sockd, i, tries = 0, ret; ckpool_t *ckp = pi->ckp; const int on = 1; @@ -1477,7 +1479,6 @@ void *connector(void *arg) sockd = socket(AF_INET, SOCK_STREAM, 0); if (sockd < 0) { LOGERR("Connector failed to open socket"); - ret = 1; goto out; } setsockopt(sockd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); @@ -1487,6 +1488,7 @@ void *connector(void *arg) serv_addr.sin_port = htons(ckp->proxy ? 3334 : 3333); do { ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); + if (!ret) break; LOGWARNING("Connector failed to bind to socket, retrying in 5s"); @@ -1517,7 +1519,6 @@ void *connector(void *arg) if (!url_from_serverurl(serverurl, newurl, newport)) { LOGWARNING("Failed to extract resolved url from %s", serverurl); - ret = 1; goto out; } sockd = ckp->oldconnfd[i]; @@ -1541,7 +1542,6 @@ void *connector(void *arg) if (sockd < 0) { LOGERR("Connector failed to bind to socket for 2 minutes"); - ret = 1; goto out; } if (listen(sockd, 8192) < 0) { @@ -1558,10 +1558,9 @@ void *connector(void *arg) cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); - if (ckp->remote && !setup_upstream(ckp, cdata)) { - ret = 1; + if (ckp->remote && !setup_upstream(ckp, cdata)) goto out; - } + cklock_init(&cdata->lock); cdata->pi = pi; cdata->nfds = 0; @@ -1576,10 +1575,10 @@ void *connector(void *arg) create_pthread(&cdata->pth_receiver, receiver, cdata); cdata->start_time = time(NULL); - create_unix_receiver(pi); - - ret = connector_loop(pi, cdata); + connector_loop(pi, cdata); out: - dealloc(ckp->cdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Connector failure, shutting down"); + exit(1); return NULL; } diff --git a/src/generator.c b/src/generator.c index 418077d9..e0f14369 100644 --- a/src/generator.c +++ b/src/generator.c @@ -2787,6 +2787,10 @@ static void *server_watchdog(void *arg) ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->gdata; + rename_proc("swatchdog"); + + pthread_detach(pthread_self()); + while (42) { server_instance_t *best = NULL; ts_t timer_t; @@ -2915,7 +2919,6 @@ void *generator(void *arg) gdata = ckzalloc(sizeof(gdata_t)); ckp->gdata = gdata; gdata->ckp = ckp; - create_unix_receiver(pi); if (ckp->proxy) { char *buf = NULL; @@ -2929,6 +2932,8 @@ void *generator(void *arg) proxy_mode(ckp, pi); } else server_mode(ckp, pi); - dealloc(ckp->gdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Generator failure, shutting down"); + exit(1); return NULL; } diff --git a/src/stratifier.c b/src/stratifier.c index e9afeb99..03358643 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4772,7 +4772,53 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, return user; } -static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff); +static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff) +{ + stratum_instance_t *client; + sdata_t *sdata = ckp->sdata; + worker_instance_t *worker; + user_instance_t *user; + + /* Find the user first */ + user = user_by_workername(sdata, workername); + + /* Then find the matching worker user */ + worker = get_worker(sdata, user, workername); + + if (mindiff < 1) { + if (likely(!mindiff)) { + worker->mindiff = 0; + return; + } + LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); + return; + } + if (mindiff < ckp->mindiff) + mindiff = ckp->mindiff; + if (mindiff == worker->mindiff) + return; + worker->mindiff = mindiff; + + /* Iterate over all the workers from this user to find any with the + * matching worker that are currently live and send them a new diff + * if we can. Otherwise it will only act as a clamp on next share + * submission. */ + ck_rlock(&sdata->instance_lock); + DL_FOREACH(user->clients, client) { + if (client->worker_instance != worker) + continue; + /* Per connection suggest diff overrides worker mindiff ugh */ + if (mindiff < client->suggest_diff) + continue; + if (mindiff == client->diff) + continue; + client->diff_change_job_id = sdata->workbase_id + 1; + client->old_diff = client->diff; + client->diff = mindiff; + stratum_send_diff(sdata, client); + } + ck_runlock(&sdata->instance_lock); +} static void parse_worker_diffs(ckpool_t *ckp, json_t *worker_array) { @@ -4845,7 +4891,6 @@ static int send_recv_auth(stratum_instance_t *client) responselen = strlen(buf); if (likely(responselen > 0)) { char *cmd = NULL, *secondaryuserid = NULL, *response; - worker_instance_t *worker = client->worker_instance; json_error_t err_val; json_t *val = NULL; int offset = 0; @@ -4877,7 +4922,6 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); - client->suggest_diff = worker->mindiff; user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || @@ -4918,9 +4962,6 @@ static void queue_delayed_auth(stratum_instance_t *client) json_t *val; ts_t now; - /* Read off any cached mindiff from previous auths */ - client->suggest_diff = client->worker_instance->mindiff; - ts_realtime(&now); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); @@ -5026,9 +5067,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ /* Preauth workers for the first 10 minutes after the user is * first authorised by ckdb to avoid floods of worker auths. * *errnum is implied zero already so ret will be set true */ - if (user->auth_time && time(NULL) - user->auth_time < 600) - client->suggest_diff = client->worker_instance->mindiff; - else + if (!user->auth_time || time(NULL) - user->auth_time > 600) *errnum = send_recv_auth(client); if (!*errnum) ret = true; @@ -5121,7 +5160,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d worker_instance_t *worker = client->worker_instance; double tdiff, bdiff, dsps, drr, network_diff, bias; user_instance_t *user = client->user_instance; - int64_t next_blockid, optimal; + int64_t next_blockid, optimal, mindiff; tv_t now_t; mutex_lock(&ckp_sdata->stats_lock); @@ -5196,8 +5235,13 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d if (drr > 0.15 && drr < 0.4) return; + /* Client suggest diff overrides worker mindiff */ + if (client->suggest_diff) + mindiff = client->suggest_diff; + else + mindiff = worker->mindiff; /* Allow slightly lower diffs when users choose their own mindiff */ - if (worker->mindiff || client->suggest_diff) { + if (mindiff) { if (drr < 0.5) return; optimal = lround(dsps * 2.4); @@ -5205,18 +5249,20 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d optimal = lround(dsps * 3.33); /* Clamp to mindiff ~ network_diff */ - if (optimal < ckp->mindiff) - optimal = ckp->mindiff; - /* Client suggest diff overrides worker mindiff */ - if (client->suggest_diff) { - if (optimal < client->suggest_diff) - optimal = client->suggest_diff; - } else if (optimal < worker->mindiff) - optimal = worker->mindiff; - if (ckp->maxdiff && optimal > ckp->maxdiff) - optimal = ckp->maxdiff; - if (optimal > network_diff) - optimal = network_diff; + + /* Set to higher of pool mindiff and optimal */ + optimal = MAX(optimal, ckp->mindiff); + + /* Set to higher of optimal and user chosen diff */ + optimal = MAX(optimal, mindiff); + + /* Set to lower of optimal and pool maxdiff */ + if (ckp->maxdiff) + optimal = MIN(optimal, ckp->maxdiff); + + /* Set to lower of optimal and network_diff */ + optimal = MIN(optimal, network_diff); + if (client->diff == optimal) return; @@ -5539,8 +5585,8 @@ out_submit: out_unlock: ck_runlock(&sdata->workbase_lock); - /* Accept the lower of new and old diffs until the next update */ - if (id < client->diff_change_job_id && client->old_diff < client->diff) + /* Accept shares of the old diff until the next update */ + if (id < client->diff_change_job_id) diff = client->old_diff; if (!invalid) { char wdiffsuffix[16]; @@ -5748,55 +5794,13 @@ static json_params_t return jp; } -static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff) -{ - stratum_instance_t *client; - sdata_t *sdata = ckp->sdata; - worker_instance_t *worker; - user_instance_t *user; - - /* Find the user first */ - user = user_by_workername(sdata, workername); - - /* Then find the matching worker user */ - worker = get_worker(sdata, user, workername); - - if (mindiff < 0) { - LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); - return; - } - if (mindiff < ckp->mindiff) - mindiff = ckp->mindiff; - if (mindiff == worker->mindiff) - return; - worker->mindiff = mindiff; - - /* Iterate over all the workers from this user to find any with the - * matching worker that are currently live and send them a new diff - * if we can. Otherwise it will only act as a clamp on next share - * submission. */ - ck_rlock(&sdata->instance_lock); - DL_FOREACH(user->clients, client) { - if (client->worker_instance != worker) - continue; - /* Per connection suggest diff overrides worker mindiff ugh */ - if (mindiff < client->suggest_diff) - continue; - if (mindiff == client->diff) - continue; - client->diff = mindiff; - stratum_send_diff(sdata, client); - } - ck_runlock(&sdata->instance_lock); -} - /* Implement support for the diff in the params as well as the originally * documented form of placing diff within the method. Needs to be entered with * client holding a ref count. */ -static void suggest_diff(stratum_instance_t *client, const char *method, const json_t *params_val) +static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char *method, + const json_t *params_val) { json_t *arr_val = json_array_get(params_val, 0); - sdata_t *sdata = client->ckp->sdata; int64_t sdiff; if (unlikely(!client_active(client))) { @@ -5809,16 +5813,18 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id); return; } + /* Clamp suggest diff to global pool mindiff */ + if (sdiff < ckp->mindiff) + sdiff = ckp->mindiff; if (sdiff == client->suggest_diff) return; client->suggest_diff = sdiff; if (client->diff == sdiff) return; - if (sdiff < client->ckp->mindiff) - client->diff = client->ckp->mindiff; - else - client->diff = sdiff; - stratum_send_diff(sdata, client); + client->diff_change_job_id = client->sdata->workbase_id + 1; + client->old_diff = client->diff; + client->diff = sdiff; + stratum_send_diff(ckp->sdata, client); } /* Send diff first when sending the first stratum template after subscribing */ @@ -6032,7 +6038,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie } if (cmdmatch(method, "mining.suggest")) { - suggest_diff(client, method, params_val); + suggest_diff(ckp, client, method, params_val); return; } @@ -6675,10 +6681,10 @@ static stratum_instance_t *preauth_ref_instance_by_id(sdata_t *sdata, const int6 static void sauth_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; - stratum_instance_t *client; sdata_t *sdata = ckp->sdata; - int mindiff, errnum = 0; - int64_t client_id; + stratum_instance_t *client; + int64_t mindiff, client_id; + int errnum = 0; client_id = jp->client_id; @@ -6708,12 +6714,18 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT); - if (!json_is_true(result_val) || !client->suggest_diff) + if (!json_is_true(result_val)) goto out; /* Update the client now if they have set a valid mindiff different - * from the startdiff */ - mindiff = MAX(ckp->mindiff, client->suggest_diff); + * from the startdiff. suggest_diff overrides worker mindiff */ + if (client->suggest_diff) + mindiff = client->suggest_diff; + else + mindiff = client->worker_instance->mindiff; + if (!mindiff) + goto out; + mindiff = MAX(ckp->mindiff, mindiff); if (mindiff != client->diff) { client->diff = mindiff; stratum_send_diff(sdata, client); @@ -6779,7 +6791,7 @@ static bool test_and_clear(bool *val, mutex_t *lock) static void ckdbq_process(ckpool_t *ckp, char *msg) { sdata_t *sdata = ckp->sdata; - size_t responselen = 0; + size_t responselen; char *buf = NULL; while (!buf) { @@ -6799,16 +6811,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) /* Process any requests from ckdb that are heartbeat responses with * specific requests. */ - if (likely(buf)) - responselen = strlen(buf); - if (likely(responselen > 0)) { + responselen = strlen(buf); + if (likely(responselen > 1)) { char *response = alloca(responselen); int offset = 0; memset(response, 0, responselen); - if (sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0) { - strcpy(response+1, buf+offset); - if (safecmp(response, "ok")) { + if (likely(sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0)) { + strcpy(response + 1, buf + offset); + if (likely(safecmp(response, "ok"))) { char *cmd; cmd = response; @@ -6822,8 +6833,8 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) LOGWARNING("Got ckdb failure response: %s", buf); } else LOGWARNING("Got bad ckdb response: %s", buf); - free(buf); } + free(buf); } static int transactions_by_jobid(sdata_t *sdata, const int64_t id) @@ -7587,11 +7598,11 @@ void *stratifier(void *arg) * are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp); @@ -7609,22 +7620,12 @@ void *stratifier(void *arg) mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); - create_unix_receiver(pi); - LOGWARNING("%s stratifier ready", ckp->name); stratum_loop(ckp, pi); out: - if (ckp->proxy) { - proxy_t *proxy, *tmpproxy; - - mutex_lock(&sdata->proxy_lock); - HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) { - HASH_DEL(sdata->proxies, proxy); - dealloc(proxy); - } - mutex_unlock(&sdata->proxy_lock); - } - dealloc(ckp->sdata); + /* We should never get here unless there's a fatal error */ + LOGEMERG("Stratifier failure, shutting down"); + exit(1); return NULL; }