kanoi 9 years ago
parent
commit
1713f32042
  1. 15
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 71
      src/connector.c
  4. 9
      src/generator.c
  5. 205
      src/stratifier.c

15
src/ckpool.c

@ -180,11 +180,12 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
/* Generic function for adding messages to a ckmsgq linked list and signal the /* Generic function for adding messages to a ckmsgq linked list and signal the
* ckmsgq parsing thread(s) to wake up and process it. */ * ckmsgq parsing thread(s) to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line)
{ {
ckmsg_t *msg; ckmsg_t *msg;
if (unlikely(!ckmsgq)) { if (unlikely(!ckmsgq)) {
LOGWARNING("Sending messages to no queue from %s %s:%d", file, func, line);
/* Discard data if we're unlucky enough to be sending it to /* Discard data if we're unlucky enough to be sending it to
* msg queues not set up during start up */ * msg queues not set up during start up */
free(data); free(data);
@ -286,7 +287,7 @@ unix_msg_t *get_unix_msg(proc_instance_t *pi)
return umsg; return umsg;
} }
void create_unix_receiver(proc_instance_t *pi) static void create_unix_receiver(proc_instance_t *pi)
{ {
pthread_t pth; pthread_t pth;
@ -909,7 +910,7 @@ out:
static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid_t oldpid) static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid_t oldpid)
{ {
if (!ckp->killold) { if (!ckp->killold) {
quit(1, "Process %s pid %d still exists, start ckpool with -k if you wish to kill it", quit(1, "Process %s pid %d still exists, start ckpool with -H to get a handover or -k if you wish to kill it",
pi->processname, oldpid); pi->processname, oldpid);
} }
LOGNOTICE("Terminating old process %s pid %d", pi->processname, oldpid); LOGNOTICE("Terminating old process %s pid %d", pi->processname, oldpid);
@ -1499,6 +1500,7 @@ static void prepare_child(ckpool_t *ckp, proc_instance_t *pi, void *process, cha
pi->sockname = pi->processname; pi->sockname = pi->processname;
create_process_unixsock(pi); create_process_unixsock(pi);
create_pthread(&pi->pth_process, process, pi); create_pthread(&pi->pth_process, process, pi);
create_unix_receiver(pi);
} }
#ifdef USE_CKDB #ifdef USE_CKDB
@ -1559,7 +1561,7 @@ static bool send_recv_path(const char *path, const char *msg)
LOGWARNING("Received: %s in response to %s request", response, msg); LOGWARNING("Received: %s in response to %s request", response, msg);
dealloc(response); dealloc(response);
} else } else
LOGWARNING("Received not response to %s request", msg); LOGWARNING("Received no response to %s request", msg);
Close(sockd); Close(sockd);
return ret; return ret;
} }
@ -1822,6 +1824,7 @@ int main(int argc, char **argv)
ckp.main.sockname = strdup("listener"); ckp.main.sockname = strdup("listener");
name_process_sockname(&ckp.main.us, &ckp.main); name_process_sockname(&ckp.main.us, &ckp.main);
ckp.oldconnfd = ckzalloc(sizeof(int *) * ckp.serverurls); ckp.oldconnfd = ckzalloc(sizeof(int *) * ckp.serverurls);
manage_old_instance(&ckp, &ckp.main);
if (ckp.handover) { if (ckp.handover) {
const char *path = ckp.main.us.path; const char *path = ckp.main.us.path;
@ -1870,8 +1873,6 @@ int main(int argc, char **argv)
} }
} }
if (!ckp.handover)
manage_old_instance(&ckp, &ckp.main);
write_namepid(&ckp.main); write_namepid(&ckp.main);
open_process_sock(&ckp, &ckp.main, &ckp.main.us); open_process_sock(&ckp, &ckp.main, &ckp.main.us);
launch_logger(&ckp); launch_logger(&ckp);
@ -1888,7 +1889,7 @@ int main(int argc, char **argv)
ckp.maxclients = ret * 9 / 10; ckp.maxclients = ret * 9 / 10;
} }
ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api); // ckp.ckpapi = create_ckmsgq(&ckp, "api", &ckpool_api);
create_pthread(&ckp.pth_listener, listener, &ckp.main); create_pthread(&ckp.pth_listener, listener, &ckp.main);
handler.sa_handler = &sighandler; handler.sa_handler = &sighandler;

4
src/ckpool.h

@ -315,10 +315,10 @@ static const char __maybe_unused *stratum_msgs[] = {
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); void _ckmsgq_add(ckmsgq_t *ckmsgq, void *data, const char *file, const char *func, const int line);
#define ckmsgq_add(ckmsgq, data) _ckmsgq_add(ckmsgq, data, __FILE__, __func__, __LINE__)
bool ckmsgq_empty(ckmsgq_t *ckmsgq); bool ckmsgq_empty(ckmsgq_t *ckmsgq);
unix_msg_t *get_unix_msg(proc_instance_t *pi); unix_msg_t *get_unix_msg(proc_instance_t *pi);
void create_unix_receiver(proc_instance_t *pi);
ckpool_t *global_ckp; ckpool_t *global_ckp;

71
src/connector.c

@ -305,13 +305,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
cdata->nfds++; cdata->nfds++;
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client");
return 0;
}
/* We increase the ref count on this client as epoll creates a pointer /* We increase the ref count on this client as epoll creates a pointer
* to it. We drop that reference when the socket is closed which * to it. We drop that reference when the socket is closed which
* removes it automatically from the epoll list. */ * removes it automatically from the epoll list. */
@ -321,6 +314,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &client->sendbufsize, &optlen); getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &client->sendbufsize, &optlen);
LOGDEBUG("Client sendbufsize detected as %d", client->sendbufsize); LOGDEBUG("Client sendbufsize detected as %d", client->sendbufsize);
event.data.u64 = client->id;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client");
dec_instance_ref(cdata, client);
return 0;
}
return 1; return 1;
} }
@ -332,8 +333,8 @@ static int __drop_client(cdata_t *cdata, client_instance_t *client)
goto out; goto out;
client->invalid = true; client->invalid = true;
ret = client->fd; ret = client->fd;
/* Closing the fd will automatically remove it from the epoll list */
Close(client->fd); Close(client->fd);
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, ret, NULL);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the /* This is the reference to this client's presence in the
@ -572,22 +573,16 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event)
LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id); LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", id);
goto outnoclient; goto outnoclient;
} }
if (unlikely(client->invalid))
goto out;
/* We can have both messages and read hang ups so process the /* We can have both messages and read hang ups so process the
* message first. */ * message first. */
if (likely(events & EPOLLIN)) { if (likely(events & EPOLLIN)) {
/* Rearm the client for epoll events if we have successfully /* Rearm the client for epoll events if we have successfully
* parsed a message from it */ * parsed a message from it */
if (parse_client_msg(ckp, cdata, client)) { if (unlikely(!parse_client_msg(ckp, cdata, client))) {
event->data.u64 = id;
event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event);
} else
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
}
if (unlikely(client->invalid))
goto out; goto out;
}
}
if (unlikely(events & EPOLLERR)) { if (unlikely(events & EPOLLERR)) {
socklen_t errlen = sizeof(int); socklen_t errlen = sizeof(int);
int error = 0; int error = 0;
@ -613,6 +608,12 @@ static void client_event_processor(ckpool_t *ckp, struct epoll_event *event)
invalidate_client(cdata->pi->ckp, cdata, client); invalidate_client(cdata->pi->ckp, cdata, client);
} }
out: out:
if (likely(!client->invalid)) {
/* Rearm the fd in the epoll list if it's still active */
event->data.u64 = id;
event->events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
epoll_ctl(cdata->epfd, EPOLL_CTL_MOD, client->fd, event);
}
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);
outnoclient: outnoclient:
free(event); free(event);
@ -624,8 +625,10 @@ static void *receiver(void *arg)
{ {
cdata_t *cdata = (cdata_t *)arg; cdata_t *cdata = (cdata_t *)arg;
struct epoll_event *event = ckzalloc(sizeof(struct epoll_event)); struct epoll_event *event = ckzalloc(sizeof(struct epoll_event));
ckpool_t *ckp = cdata->ckp;
uint64_t serverfds, i; uint64_t serverfds, i;
int ret, epfd; int ret, epfd;
char *buf;
rename_proc("creceiver"); rename_proc("creceiver");
@ -634,7 +637,7 @@ static void *receiver(void *arg)
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
goto out; goto out;
} }
serverfds = cdata->ckp->serverurls; serverfds = ckp->serverurls;
/* Add all the serverfds to the epoll */ /* Add all the serverfds to the epoll */
for (i = 0; i < serverfds; i++) { for (i = 0; i < serverfds; i++) {
/* The small values will be less than the first client ids */ /* The small values will be less than the first client ids */
@ -647,8 +650,11 @@ static void *receiver(void *arg)
} }
} }
while (!cdata->accept) /* Wait for the stratifier to be ready for us */
cksleep_ms(1); do {
buf = send_recv_proc(ckp->stratifier, "ping");
} while (!buf);
free(buf);
while (42) { while (42) {
uint64_t edu64; uint64_t edu64;
@ -1308,7 +1314,7 @@ static char *connector_stats(cdata_t *cdata, const int runtime)
return buf; return buf;
} }
static int connector_loop(proc_instance_t *pi, cdata_t *cdata) static void connector_loop(proc_instance_t *pi, cdata_t *cdata)
{ {
unix_msg_t *umsg = NULL; unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
@ -1408,8 +1414,6 @@ retry:
send_unix_msg(umsg->sockd, msg); send_unix_msg(umsg->sockd, msg);
} else if (cmdmatch(buf, "loglevel")) { } else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel); sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (cmdmatch(buf, "shutdown")) {
goto out;
} else if (cmdmatch(buf, "passthrough")) { } else if (cmdmatch(buf, "passthrough")) {
client_instance_t *client; client_instance_t *client;
@ -1449,16 +1453,14 @@ retry:
} else } else
LOGWARNING("Unhandled connector message: %s", buf); LOGWARNING("Unhandled connector message: %s", buf);
goto retry; goto retry;
out:
return ret;
} }
void *connector(void *arg) void *connector(void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
cdata_t *cdata = ckzalloc(sizeof(cdata_t)); cdata_t *cdata = ckzalloc(sizeof(cdata_t));
int threads, sockd, ret = 0, i, tries = 0;
char newurl[INET6_ADDRSTRLEN], newport[8]; char newurl[INET6_ADDRSTRLEN], newport[8];
int threads, sockd, i, tries = 0, ret;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
const int on = 1; const int on = 1;
@ -1477,7 +1479,6 @@ void *connector(void *arg)
sockd = socket(AF_INET, SOCK_STREAM, 0); sockd = socket(AF_INET, SOCK_STREAM, 0);
if (sockd < 0) { if (sockd < 0) {
LOGERR("Connector failed to open socket"); LOGERR("Connector failed to open socket");
ret = 1;
goto out; goto out;
} }
setsockopt(sockd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); setsockopt(sockd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
@ -1487,6 +1488,7 @@ void *connector(void *arg)
serv_addr.sin_port = htons(ckp->proxy ? 3334 : 3333); serv_addr.sin_port = htons(ckp->proxy ? 3334 : 3333);
do { do {
ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); ret = bind(sockd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if (!ret) if (!ret)
break; break;
LOGWARNING("Connector failed to bind to socket, retrying in 5s"); LOGWARNING("Connector failed to bind to socket, retrying in 5s");
@ -1517,7 +1519,6 @@ void *connector(void *arg)
if (!url_from_serverurl(serverurl, newurl, newport)) { if (!url_from_serverurl(serverurl, newurl, newport)) {
LOGWARNING("Failed to extract resolved url from %s", serverurl); LOGWARNING("Failed to extract resolved url from %s", serverurl);
ret = 1;
goto out; goto out;
} }
sockd = ckp->oldconnfd[i]; sockd = ckp->oldconnfd[i];
@ -1541,7 +1542,6 @@ void *connector(void *arg)
if (sockd < 0) { if (sockd < 0) {
LOGERR("Connector failed to bind to socket for 2 minutes"); LOGERR("Connector failed to bind to socket for 2 minutes");
ret = 1;
goto out; goto out;
} }
if (listen(sockd, 8192) < 0) { if (listen(sockd, 8192) < 0) {
@ -1558,10 +1558,9 @@ void *connector(void *arg)
cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor); cdata->cmpq = create_ckmsgq(ckp, "cmpq", &client_message_processor);
if (ckp->remote && !setup_upstream(ckp, cdata)) { if (ckp->remote && !setup_upstream(ckp, cdata))
ret = 1;
goto out; goto out;
}
cklock_init(&cdata->lock); cklock_init(&cdata->lock);
cdata->pi = pi; cdata->pi = pi;
cdata->nfds = 0; cdata->nfds = 0;
@ -1576,10 +1575,10 @@ void *connector(void *arg)
create_pthread(&cdata->pth_receiver, receiver, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata);
cdata->start_time = time(NULL); cdata->start_time = time(NULL);
create_unix_receiver(pi); connector_loop(pi, cdata);
ret = connector_loop(pi, cdata);
out: out:
dealloc(ckp->cdata); /* We should never get here unless there's a fatal error */
LOGEMERG("Connector failure, shutting down");
exit(1);
return NULL; return NULL;
} }

9
src/generator.c

@ -2787,6 +2787,10 @@ static void *server_watchdog(void *arg)
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
gdata_t *gdata = ckp->gdata; gdata_t *gdata = ckp->gdata;
rename_proc("swatchdog");
pthread_detach(pthread_self());
while (42) { while (42) {
server_instance_t *best = NULL; server_instance_t *best = NULL;
ts_t timer_t; ts_t timer_t;
@ -2915,7 +2919,6 @@ void *generator(void *arg)
gdata = ckzalloc(sizeof(gdata_t)); gdata = ckzalloc(sizeof(gdata_t));
ckp->gdata = gdata; ckp->gdata = gdata;
gdata->ckp = ckp; gdata->ckp = ckp;
create_unix_receiver(pi);
if (ckp->proxy) { if (ckp->proxy) {
char *buf = NULL; char *buf = NULL;
@ -2929,6 +2932,8 @@ void *generator(void *arg)
proxy_mode(ckp, pi); proxy_mode(ckp, pi);
} else } else
server_mode(ckp, pi); server_mode(ckp, pi);
dealloc(ckp->gdata); /* We should never get here unless there's a fatal error */
LOGEMERG("Generator failure, shutting down");
exit(1);
return NULL; return NULL;
} }

205
src/stratifier.c

@ -4772,7 +4772,53 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
return user; return user;
} }
static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff); static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->sdata;
worker_instance_t *worker;
user_instance_t *user;
/* Find the user first */
user = user_by_workername(sdata, workername);
/* Then find the matching worker user */
worker = get_worker(sdata, user, workername);
if (mindiff < 1) {
if (likely(!mindiff)) {
worker->mindiff = 0;
return;
}
LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff);
return;
}
if (mindiff < ckp->mindiff)
mindiff = ckp->mindiff;
if (mindiff == worker->mindiff)
return;
worker->mindiff = mindiff;
/* Iterate over all the workers from this user to find any with the
* matching worker that are currently live and send them a new diff
* if we can. Otherwise it will only act as a clamp on next share
* submission. */
ck_rlock(&sdata->instance_lock);
DL_FOREACH(user->clients, client) {
if (client->worker_instance != worker)
continue;
/* Per connection suggest diff overrides worker mindiff ugh */
if (mindiff < client->suggest_diff)
continue;
if (mindiff == client->diff)
continue;
client->diff_change_job_id = sdata->workbase_id + 1;
client->old_diff = client->diff;
client->diff = mindiff;
stratum_send_diff(sdata, client);
}
ck_runlock(&sdata->instance_lock);
}
static void parse_worker_diffs(ckpool_t *ckp, json_t *worker_array) static void parse_worker_diffs(ckpool_t *ckp, json_t *worker_array)
{ {
@ -4845,7 +4891,6 @@ static int send_recv_auth(stratum_instance_t *client)
responselen = strlen(buf); responselen = strlen(buf);
if (likely(responselen > 0)) { if (likely(responselen > 0)) {
char *cmd = NULL, *secondaryuserid = NULL, *response; char *cmd = NULL, *secondaryuserid = NULL, *response;
worker_instance_t *worker = client->worker_instance;
json_error_t err_val; json_error_t err_val;
json_t *val = NULL; json_t *val = NULL;
int offset = 0; int offset = 0;
@ -4877,7 +4922,6 @@ static int send_recv_auth(stratum_instance_t *client)
json_get_string(&secondaryuserid, val, "secondaryuserid"); json_get_string(&secondaryuserid, val, "secondaryuserid");
parse_worker_diffs(ckp, worker_array); parse_worker_diffs(ckp, worker_array);
client->suggest_diff = worker->mindiff;
user->auth_time = time(NULL); user->auth_time = time(NULL);
} }
if (secondaryuserid && (!safecmp(response, "ok.authorise") || if (secondaryuserid && (!safecmp(response, "ok.authorise") ||
@ -4918,9 +4962,6 @@ static void queue_delayed_auth(stratum_instance_t *client)
json_t *val; json_t *val;
ts_t now; ts_t now;
/* Read off any cached mindiff from previous auths */
client->suggest_diff = client->worker_instance->mindiff;
ts_realtime(&now); ts_realtime(&now);
sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec); sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec);
@ -5026,9 +5067,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
/* Preauth workers for the first 10 minutes after the user is /* Preauth workers for the first 10 minutes after the user is
* first authorised by ckdb to avoid floods of worker auths. * first authorised by ckdb to avoid floods of worker auths.
* *errnum is implied zero already so ret will be set true */ * *errnum is implied zero already so ret will be set true */
if (user->auth_time && time(NULL) - user->auth_time < 600) if (!user->auth_time || time(NULL) - user->auth_time > 600)
client->suggest_diff = client->worker_instance->mindiff;
else
*errnum = send_recv_auth(client); *errnum = send_recv_auth(client);
if (!*errnum) if (!*errnum)
ret = true; ret = true;
@ -5121,7 +5160,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d
worker_instance_t *worker = client->worker_instance; worker_instance_t *worker = client->worker_instance;
double tdiff, bdiff, dsps, drr, network_diff, bias; double tdiff, bdiff, dsps, drr, network_diff, bias;
user_instance_t *user = client->user_instance; user_instance_t *user = client->user_instance;
int64_t next_blockid, optimal; int64_t next_blockid, optimal, mindiff;
tv_t now_t; tv_t now_t;
mutex_lock(&ckp_sdata->stats_lock); mutex_lock(&ckp_sdata->stats_lock);
@ -5196,8 +5235,13 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d
if (drr > 0.15 && drr < 0.4) if (drr > 0.15 && drr < 0.4)
return; return;
/* Client suggest diff overrides worker mindiff */
if (client->suggest_diff)
mindiff = client->suggest_diff;
else
mindiff = worker->mindiff;
/* Allow slightly lower diffs when users choose their own mindiff */ /* Allow slightly lower diffs when users choose their own mindiff */
if (worker->mindiff || client->suggest_diff) { if (mindiff) {
if (drr < 0.5) if (drr < 0.5)
return; return;
optimal = lround(dsps * 2.4); optimal = lround(dsps * 2.4);
@ -5205,18 +5249,20 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, const double d
optimal = lround(dsps * 3.33); optimal = lround(dsps * 3.33);
/* Clamp to mindiff ~ network_diff */ /* Clamp to mindiff ~ network_diff */
if (optimal < ckp->mindiff)
optimal = ckp->mindiff; /* Set to higher of pool mindiff and optimal */
/* Client suggest diff overrides worker mindiff */ optimal = MAX(optimal, ckp->mindiff);
if (client->suggest_diff) {
if (optimal < client->suggest_diff) /* Set to higher of optimal and user chosen diff */
optimal = client->suggest_diff; optimal = MAX(optimal, mindiff);
} else if (optimal < worker->mindiff)
optimal = worker->mindiff; /* Set to lower of optimal and pool maxdiff */
if (ckp->maxdiff && optimal > ckp->maxdiff) if (ckp->maxdiff)
optimal = ckp->maxdiff; optimal = MIN(optimal, ckp->maxdiff);
if (optimal > network_diff)
optimal = network_diff; /* Set to lower of optimal and network_diff */
optimal = MIN(optimal, network_diff);
if (client->diff == optimal) if (client->diff == optimal)
return; return;
@ -5539,8 +5585,8 @@ out_submit:
out_unlock: out_unlock:
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
/* Accept the lower of new and old diffs until the next update */ /* Accept shares of the old diff until the next update */
if (id < client->diff_change_job_id && client->old_diff < client->diff) if (id < client->diff_change_job_id)
diff = client->old_diff; diff = client->old_diff;
if (!invalid) { if (!invalid) {
char wdiffsuffix[16]; char wdiffsuffix[16];
@ -5748,55 +5794,13 @@ static json_params_t
return jp; return jp;
} }
static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindiff)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->sdata;
worker_instance_t *worker;
user_instance_t *user;
/* Find the user first */
user = user_by_workername(sdata, workername);
/* Then find the matching worker user */
worker = get_worker(sdata, user, workername);
if (mindiff < 0) {
LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff);
return;
}
if (mindiff < ckp->mindiff)
mindiff = ckp->mindiff;
if (mindiff == worker->mindiff)
return;
worker->mindiff = mindiff;
/* Iterate over all the workers from this user to find any with the
* matching worker that are currently live and send them a new diff
* if we can. Otherwise it will only act as a clamp on next share
* submission. */
ck_rlock(&sdata->instance_lock);
DL_FOREACH(user->clients, client) {
if (client->worker_instance != worker)
continue;
/* Per connection suggest diff overrides worker mindiff ugh */
if (mindiff < client->suggest_diff)
continue;
if (mindiff == client->diff)
continue;
client->diff = mindiff;
stratum_send_diff(sdata, client);
}
ck_runlock(&sdata->instance_lock);
}
/* Implement support for the diff in the params as well as the originally /* Implement support for the diff in the params as well as the originally
* documented form of placing diff within the method. Needs to be entered with * documented form of placing diff within the method. Needs to be entered with
* client holding a ref count. */ * client holding a ref count. */
static void suggest_diff(stratum_instance_t *client, const char *method, const json_t *params_val) static void suggest_diff(ckpool_t *ckp, stratum_instance_t *client, const char *method,
const json_t *params_val)
{ {
json_t *arr_val = json_array_get(params_val, 0); json_t *arr_val = json_array_get(params_val, 0);
sdata_t *sdata = client->ckp->sdata;
int64_t sdiff; int64_t sdiff;
if (unlikely(!client_active(client))) { if (unlikely(!client_active(client))) {
@ -5809,16 +5813,18 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id); LOGINFO("Failed to parse suggest_difficulty for client %"PRId64, client->id);
return; return;
} }
/* Clamp suggest diff to global pool mindiff */
if (sdiff < ckp->mindiff)
sdiff = ckp->mindiff;
if (sdiff == client->suggest_diff) if (sdiff == client->suggest_diff)
return; return;
client->suggest_diff = sdiff; client->suggest_diff = sdiff;
if (client->diff == sdiff) if (client->diff == sdiff)
return; return;
if (sdiff < client->ckp->mindiff) client->diff_change_job_id = client->sdata->workbase_id + 1;
client->diff = client->ckp->mindiff; client->old_diff = client->diff;
else
client->diff = sdiff; client->diff = sdiff;
stratum_send_diff(sdata, client); stratum_send_diff(ckp->sdata, client);
} }
/* Send diff first when sending the first stratum template after subscribing */ /* Send diff first when sending the first stratum template after subscribing */
@ -6032,7 +6038,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
} }
if (cmdmatch(method, "mining.suggest")) { if (cmdmatch(method, "mining.suggest")) {
suggest_diff(client, method, params_val); suggest_diff(ckp, client, method, params_val);
return; return;
} }
@ -6675,10 +6681,10 @@ static stratum_instance_t *preauth_ref_instance_by_id(sdata_t *sdata, const int6
static void sauth_process(ckpool_t *ckp, json_params_t *jp) static void sauth_process(ckpool_t *ckp, json_params_t *jp)
{ {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client;
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
int mindiff, errnum = 0; stratum_instance_t *client;
int64_t client_id; int64_t mindiff, client_id;
int errnum = 0;
client_id = jp->client_id; client_id = jp->client_id;
@ -6708,12 +6714,18 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
steal_json_id(json_msg, jp); steal_json_id(json_msg, jp);
stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT); stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT);
if (!json_is_true(result_val) || !client->suggest_diff) if (!json_is_true(result_val))
goto out; goto out;
/* Update the client now if they have set a valid mindiff different /* Update the client now if they have set a valid mindiff different
* from the startdiff */ * from the startdiff. suggest_diff overrides worker mindiff */
mindiff = MAX(ckp->mindiff, client->suggest_diff); if (client->suggest_diff)
mindiff = client->suggest_diff;
else
mindiff = client->worker_instance->mindiff;
if (!mindiff)
goto out;
mindiff = MAX(ckp->mindiff, mindiff);
if (mindiff != client->diff) { if (mindiff != client->diff) {
client->diff = mindiff; client->diff = mindiff;
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
@ -6779,7 +6791,7 @@ static bool test_and_clear(bool *val, mutex_t *lock)
static void ckdbq_process(ckpool_t *ckp, char *msg) static void ckdbq_process(ckpool_t *ckp, char *msg)
{ {
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
size_t responselen = 0; size_t responselen;
char *buf = NULL; char *buf = NULL;
while (!buf) { while (!buf) {
@ -6799,16 +6811,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
/* Process any requests from ckdb that are heartbeat responses with /* Process any requests from ckdb that are heartbeat responses with
* specific requests. */ * specific requests. */
if (likely(buf))
responselen = strlen(buf); responselen = strlen(buf);
if (likely(responselen > 0)) { if (likely(responselen > 1)) {
char *response = alloca(responselen); char *response = alloca(responselen);
int offset = 0; int offset = 0;
memset(response, 0, responselen); memset(response, 0, responselen);
if (sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0) { if (likely(sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0)) {
strcpy(response+1, buf+offset); strcpy(response + 1, buf + offset);
if (safecmp(response, "ok")) { if (likely(safecmp(response, "ok"))) {
char *cmd; char *cmd;
cmd = response; cmd = response;
@ -6822,8 +6833,8 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
LOGWARNING("Got ckdb failure response: %s", buf); LOGWARNING("Got ckdb failure response: %s", buf);
} else } else
LOGWARNING("Got bad ckdb response: %s", buf); LOGWARNING("Got bad ckdb response: %s", buf);
free(buf);
} }
free(buf);
} }
static int transactions_by_jobid(sdata_t *sdata, const int64_t id) static int transactions_by_jobid(sdata_t *sdata, const int64_t id)
@ -7587,11 +7598,11 @@ void *stratifier(void *arg)
* are CPUs */ * are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
read_poolstats(ckp); read_poolstats(ckp);
@ -7609,22 +7620,12 @@ void *stratifier(void *arg)
mutex_init(&sdata->share_lock); mutex_init(&sdata->share_lock);
mutex_init(&sdata->block_lock); mutex_init(&sdata->block_lock);
create_unix_receiver(pi);
LOGWARNING("%s stratifier ready", ckp->name); LOGWARNING("%s stratifier ready", ckp->name);
stratum_loop(ckp, pi); stratum_loop(ckp, pi);
out: out:
if (ckp->proxy) { /* We should never get here unless there's a fatal error */
proxy_t *proxy, *tmpproxy; LOGEMERG("Stratifier failure, shutting down");
exit(1);
mutex_lock(&sdata->proxy_lock);
HASH_ITER(hh, sdata->proxies, proxy, tmpproxy) {
HASH_DEL(sdata->proxies, proxy);
dealloc(proxy);
}
mutex_unlock(&sdata->proxy_lock);
}
dealloc(ckp->sdata);
return NULL; return NULL;
} }

Loading…
Cancel
Save