diff --git a/configure.ac b/configure.ac index d99cf945..49d28ee2 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.8.5, kernel@kolivas.org) +AC_INIT(ckpool, 0.8.6, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) diff --git a/src/ckdb.c b/src/ckdb.c index 824aef28..4dca76ad 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -301,7 +301,7 @@ K_STORE *logqueue_store; // WORKQUEUE K_LIST *workqueue_free; K_STORE *workqueue_store; -pthread_mutex_t wq_waitlock; +mutex_t wq_waitlock; pthread_cond_t wq_waitcond; // HEARTBEATQUEUE @@ -3286,7 +3286,7 @@ static void *listener(void *arg) timeraddspec(&abs, &tsdiff); mutex_lock(&wq_waitlock); - pthread_cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); + cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); mutex_unlock(&wq_waitlock); } } diff --git a/src/ckdb.h b/src/ckdb.h index 9a48d42a..7f65e817 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -659,7 +659,7 @@ typedef struct workqueue { extern K_LIST *workqueue_free; extern K_STORE *workqueue_store; -extern pthread_mutex_t wq_waitlock; +extern mutex_t wq_waitlock; extern pthread_cond_t wq_waitcond; // HEARTBEATQUEUE diff --git a/src/ckpool.c b/src/ckpool.c index cdbfbe39..1ffe45f4 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -120,7 +120,7 @@ static void *ckmsg_queue(void *arg) tv_to_ts(&abs, &now); abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); msg = ckmsgq->msgs; if (msg) DL_DELETE(ckmsgq->msgs, msg); @@ -141,7 +141,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) strncpy(ckmsgq->name, name, 15); ckmsgq->func = func; ckmsgq->ckp = ckp; - ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); + ckmsgq->lock = ckalloc(sizeof(mutex_t)); ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); mutex_init(ckmsgq->lock); cond_init(ckmsgq->cond); @@ -153,11 +153,11 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const 
void *func) ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); - pthread_mutex_t *lock; + mutex_t *lock; pthread_cond_t *cond; int i; - lock = ckalloc(sizeof(pthread_mutex_t)); + lock = ckalloc(sizeof(mutex_t)); cond = ckalloc(sizeof(pthread_cond_t)); mutex_init(lock); cond_init(cond); @@ -237,7 +237,25 @@ static int pid_wait(const pid_t pid, const int ms) return ret; } -static int send_procmsg(const proc_instance_t *pi, const char *buf) +static int get_proc_pid(const proc_instance_t *pi) +{ + int ret, pid = 0; + char path[256]; + FILE *fp; + + sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); + fp = fopen(path, "re"); + if (!fp) + goto out; + ret = fscanf(fp, "%d", &pid); + if (ret < 1) + pid = 0; + fclose(fp); +out: + return pid; +} + +static int send_procmsg(proc_instance_t *pi, const char *buf) { char *path = pi->us.path; int ret = -1; @@ -251,6 +269,12 @@ static int send_procmsg(const proc_instance_t *pi, const char *buf) LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) { + pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + + } if (unlikely(kill_pid(pi->pid, 0))) { LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname); goto out; @@ -419,9 +443,9 @@ int read_socket_line(connsock_t *cs, const int timeout) char *newbuf; ret = wait_read_select(fd, eom ? 
0 : timeout); - if (eom && !ret) - break; if (ret < 1) { + if (eom) + break; if (!ret) LOGDEBUG("Select timed out in read_socket_line"); else @@ -431,7 +455,7 @@ int read_socket_line(connsock_t *cs, const int timeout) ret = recv(fd, readbuf, PAGESIZE - 4, 0); if (ret < 1) { /* Closed socket after valid message */ - if (!ret && eom) + if (eom) break; LOGERR("Failed to recv in read_socket_line"); ret = -1; @@ -474,46 +498,48 @@ out: static void childsighandler(const int sig); -static int get_proc_pid(const proc_instance_t *pi) -{ - int ret, pid = 0; - char path[256]; - FILE *fp; - - sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); - fp = fopen(path, "re"); - if (!fp) - goto out; - ret = fscanf(fp, "%d", &pid); - if (ret < 1) - pid = 0; - fclose(fp); -out: - return pid; -} +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? 
msg : ""); goto out; } - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to socket %s in send_proc", path); - goto out; - } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } + } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -525,13 +551,38 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; + if (!wait_close(sockd, 5)) + LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } - return ret; +out_nofail: + free(msg); + free(pm); + return NULL; +} + +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. 
*/ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + struct proc_message *pm; + pthread_t pth; + + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to %s in send_proc", pi->processname); + return; + } + pm = ckalloc(sizeof(struct proc_message)); + pm->pi = pi; + pm->msg = strdup(msg); + pm->file = file; + pm->func = func; + pm->line = line; + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then @@ -549,7 +600,15 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) { + pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + } if (unlikely(kill_pid(pi->pid, 0))) { + /* Reset the pid value in case we are still looking for an old + * process */ + pi->pid = 0; LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); goto out; } @@ -1202,6 +1261,8 @@ static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *nam pi->process = process; create_process_unixsock(pi); manage_old_child(ckp, pi); + /* Remove the old pid file if we've succeeded in coming this far */ + rm_namepid(pi); return pi; } diff --git a/src/ckpool.h b/src/ckpool.h index b2f59333..771ac692 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -34,7 +34,7 @@ struct ckmsgq { ckpool_t *ckp; char name[16]; pthread_t pth; - pthread_mutex_t *lock; + mutex_t *lock; pthread_cond_t *cond; ckmsg_t *msgs; void (*func)(ckpool_t *, void *); @@ -212,7 +212,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, 
const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) diff --git a/src/connector.c b/src/connector.c index 950dd26c..f6d9f1ab 100644 --- a/src/connector.c +++ b/src/connector.c @@ -95,7 +95,7 @@ struct connector_data { int64_t sends_delayed; /* For protecting the pending sends list */ - pthread_mutex_t sender_lock; + mutex_t sender_lock; pthread_cond_t sender_cond; }; @@ -466,22 +466,22 @@ void *sender(void *arg) rename_proc("csender"); while (42) { - sender_send_t *sender_send; + sender_send_t *sender_send, *delayed; client_instance_t *client; - int ret, fd, ofs = 0; + int ret = 0, fd, ofs = 0; mutex_lock(&cdata->sender_lock); - /* Poll every 100ms if there are no new sends. Re-examine + /* Poll every 10ms if there are no new sends. Re-examine * delayed sends immediately after a successful send in case * endless new sends more frequently end up starving the * delayed sends. */ if (!cdata->sender_sends && !sent) { - const ts_t polltime = {0, 100000000}; + const ts_t polltime = {0, 10000000}; ts_t timeout_ts; ts_realtime(&timeout_ts); timeraddspec(&timeout_ts, &polltime); - pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); + cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } sender_send = cdata->sender_sends; if (sender_send) @@ -494,27 +494,33 @@ void *sender(void *arg) * conditional with no new sends appearing or have just * serviced another message successfully. */ if (!sender_send) { - if (!cdata->delayed_sends) + /* Find a delayed client that needs servicing and set + * ret accordingly. We do not need to use FOREACH_SAFE + * as we break out of the loop as soon as we manipulate + * the list. 
*/ + DL_FOREACH(cdata->delayed_sends, delayed) { + if ((ret = wait_write_select(delayed->client->fd, 0))) { + sender_send = cdata->delayed_sends; + DL_DELETE(cdata->delayed_sends, sender_send); + break; + } + } + /* None found ? */ + if (!sender_send) continue; - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); } - client = sender_send->client; - ck_rlock(&cdata->lock); - fd = client->fd; - ck_runlock(&cdata->lock); - - if (fd == -1) { - LOGDEBUG("Discarding message sent to invalidated client"); - goto contfree; - } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ret = wait_write_select(fd, 0); + ck_rlock(&cdata->lock); + fd = client->fd; + if (!ret) + ret = wait_write_select(fd, 0); + ck_runlock(&cdata->lock); + if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -530,7 +536,6 @@ void *sender(void *arg) cdata->sends_delayed++; continue; } - sent = true; while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { @@ -542,6 +547,7 @@ void *sender(void *arg) sender_send->len -= ret; } contfree: + sent = true; free(sender_send->buf); free(sender_send); dec_instance_ref(cdata, client); diff --git a/src/generator.c b/src/generator.c index 9c23e5c3..90f054f4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -106,18 +106,18 @@ struct proxy_instance { bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ - pthread_mutex_t notify_lock; + mutex_t notify_lock; notify_instance_t *notify_instances; notify_instance_t *current_notify; pthread_t pth_precv; pthread_t pth_psend; - pthread_mutex_t psend_lock; + mutex_t psend_lock; pthread_cond_t psend_cond; stratum_msg_t *psends; - pthread_mutex_t share_lock; + mutex_t share_lock; share_msg_t *shares; 
int64_t share_id; @@ -128,7 +128,7 @@ struct proxy_instance { /* Private data for the generator */ struct generator_data { - pthread_mutex_t lock; /* Lock protecting linked lists */ + mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue @@ -1344,7 +1344,7 @@ static void *proxy_send(void *arg) mutex_lock(&proxi->psend_lock); if (!proxi->psends) - pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock); + cond_wait(&proxi->psend_cond, &proxi->psend_lock); msg = proxi->psends; if (likely(msg)) DL_DELETE(proxi->psends, msg); diff --git a/src/libckpool.c b/src/libckpool.c index bed7a816..7fc3017b 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -114,42 +114,96 @@ bool ck_completion_timeout(void *fn, void *fnarg, int timeout) return !ret; } +int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line) +{ + int ret; + + ret = pthread_cond_wait(cond, &lock->mutex); + lock->file = file; + lock->func = func; + lock->line = line; + return ret; +} + +int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line) +{ + int ret; + + ret = pthread_cond_timedwait(cond, &lock->mutex, abstime); + lock->file = file; + lock->func = func; + lock->line = line; + return ret; +} -/* Place holders for when we add lock debugging */ -#define GETLOCK(_lock, _file, _func, _line) -#define GOTLOCK(_lock, _file, _func, _line) -#define TRYLOCK(_lock, _file, _func, _line) -#define DIDLOCK(_ret, _lock, _file, _func, _line) -#define GUNLOCK(_lock, _file, _func, _line) -#define INITLOCK(_typ, _lock, _file, _func, _line) -void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line) +int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line) 
{ - GETLOCK(lock, file, func, line); - if (unlikely(pthread_mutex_lock(lock))) + tv_t now; + ts_t abs; + int ret; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += timeout; + + ret = pthread_mutex_timedlock(&lock->mutex, &abs); + if (!ret) { + lock->file = file; + lock->func = func; + lock->line = line; + } + + return ret; +} + +/* Make every locking attempt warn if we're unable to get the lock for more + * than 10 seconds and fail if we can't get it for longer than a minute. */ +void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line) +{ + int ret, retries = 0; + +retry: + ret = _mutex_timedlock(lock, 10, file, func, line); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB MUTEX!"); + } quitfrom(1, file, func, line, "WTF MUTEX ERROR ON LOCK!"); - GOTLOCK(lock, file, func, line); + } } -void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line) +/* Does not unset lock->file/func/line since they're only relevant when the lock is held */ +void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_mutex_unlock(lock))) + if (unlikely(pthread_mutex_unlock(&lock->mutex))) quitfrom(1, file, func, line, "WTF MUTEX ERROR ON UNLOCK!"); - GUNLOCK(lock, file, func, line); } -int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { int ret; - TRYLOCK(lock, file, func, line); - ret = pthread_mutex_trylock(lock); - DIDLOCK(ret, lock, file, func, line); - + ret = pthread_mutex_trylock(&lock->mutex); + if (!ret) { + 
lock->file = file; + lock->func = func; + lock->line = line; + } return ret; } -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +void mutex_destroy(mutex_t *lock) +{ + pthread_mutex_destroy(&lock->mutex); +} + + +static int wr_timedlock(pthread_rwlock_t *lock, int timeout) { tv_t now; ts_t abs; @@ -159,79 +213,108 @@ int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const ch tv_to_ts(&abs, &now); abs.tv_sec += timeout; - TRYLOCK(lock, file, func, line); - ret = pthread_mutex_timedlock(lock, &abs); - DIDLOCK(ret, lock, file, func, line); + ret = pthread_rwlock_timedwrlock(lock, &abs); return ret; } -void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line) { - GETLOCK(lock, file, func, line); - if (unlikely(pthread_rwlock_wrlock(lock))) - quitfrom(1, file, func, line, "WTF WRLOCK ERROR ON LOCK!"); - GOTLOCK(lock, file, func, line); + int ret, retries = 0; + +retry: + ret = wr_timedlock(&lock->rwlock, 10); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged write lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB WRITE LOCK!"); + } + quitfrom(1, file, func, line, "WTF ERROR ON WRITE LOCK!"); + } + lock->file = file; + lock->func = func; + lock->line = line; } -int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { - TRYLOCK(lock, file, func, line); - int ret = pthread_rwlock_trywrlock(lock); - DIDLOCK(ret, lock, file, func, line); + int ret = 
pthread_rwlock_trywrlock(&lock->rwlock); + + if (!ret) { + lock->file = file; + lock->func = func; + lock->line = line; + } return ret; } -void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +static int rd_timedlock(pthread_rwlock_t *lock, int timeout) { - GETLOCK(lock, file, func, line); - if (unlikely(pthread_rwlock_rdlock(lock))) - quitfrom(1, file, func, line, "WTF RDLOCK ERROR ON LOCK!"); - GOTLOCK(lock, file, func, line); + tv_t now; + ts_t abs; + int ret; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += timeout; + + ret = pthread_rwlock_timedrdlock(lock, &abs); + + return ret; } -void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_rwlock_unlock(lock))) - quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!"); - GUNLOCK(lock, file, func, line); + int ret, retries = 0; + +retry: + ret = rd_timedlock(&lock->rwlock, 10); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged read lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB READ LOCK!"); + } + quitfrom(1, file, func, line, "WTF ERROR ON READ LOCK!"); + } + lock->file = file; + lock->func = func; + lock->line = line; } -void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { - _rw_unlock(lock, file, func, line); + if (unlikely(pthread_rwlock_unlock(&lock->rwlock))) + quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!"); } -void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { 
_rw_unlock(lock, file, func, line); } -void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line) +void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_mutex_init(lock, NULL))) - quitfrom(1, file, func, line, "Failed to pthread_mutex_init"); - INITLOCK(lock, CGLOCK_MUTEX, file, func, line); + _rw_unlock(lock, file, func, line); } -void mutex_destroy(pthread_mutex_t *lock) +void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line) { - /* Ignore return code. This only invalidates the mutex on linux but - * releases resources on windows. */ - pthread_mutex_destroy(lock); + if (unlikely(pthread_mutex_init(&lock->mutex, NULL))) + quitfrom(1, file, func, line, "Failed to pthread_mutex_init"); } -void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_rwlock_init(lock, NULL))) + if (unlikely(pthread_rwlock_init(&lock->rwlock, NULL))) quitfrom(1, file, func, line, "Failed to pthread_rwlock_init"); - INITLOCK(lock, CGLOCK_RW, file, func, line); } -void rwlock_destroy(pthread_rwlock_t *lock) -{ - pthread_rwlock_destroy(lock); -} void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line) { @@ -245,11 +328,6 @@ void _cklock_init(cklock_t *lock, const char *file, const char *func, const int _rwlock_init(&lock->rwlock, file, func, line); } -void cklock_destroy(cklock_t *lock) -{ - rwlock_destroy(&lock->rwlock); - mutex_destroy(&lock->mutex); -} /* Read lock variant of cklock. Cannot be promoted. 
*/ void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line) @@ -317,6 +395,13 @@ void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int l _mutex_unlock(&lock->mutex, file, func, line); } +void cklock_destroy(cklock_t *lock) +{ + pthread_rwlock_destroy(&lock->rwlock.rwlock); + pthread_mutex_destroy(&lock->mutex.mutex); +} + + void _cksem_init(sem_t *sem, const char *file, const char *func, const int line) { int ret; @@ -825,6 +910,24 @@ out: return sockd; } +/* Wait till a socket has been closed at the other end */ +int wait_close(int sockd, int timeout) +{ + struct pollfd sfd; + int ret; + + if (unlikely(sockd < 0)) + return -1; + sfd.fd = sockd; + sfd.events = POLLIN; + sfd.revents = 0; + timeout *= 1000; + ret = poll(&sfd, 1, timeout); + if (ret < 1) + return 0; + return sfd.revents & POLLHUP; +} + /* Emulate a select read wait for high fds that select doesn't support */ int wait_read_select(int sockd, int timeout) { diff --git a/src/libckpool.h b/src/libckpool.h index cb7c1d01..79f0f8fd 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -145,11 +145,13 @@ static inline void flip_80(void *dest_p, const void *src_p) dest[i] = bswap_32(src[i]); } +#define cond_wait(_cond, _lock) _cond_wait(_cond, _lock, __FILE__, __func__, __LINE__) +#define cond_timedwait(_cond, _lock, _abstime) _cond_timedwait(_cond, _lock, _abstime, __FILE__, __func__, __LINE__) +#define mutex_timedlock(_lock, _timeout) _mutex_timedlock(_lock, _timeout, __FILE__, __func__, __LINE__) #define mutex_lock(_lock) _mutex_lock(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock_noyield(_lock) _mutex_unlock_noyield(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock(_lock) _mutex_unlock(_lock, __FILE__, __func__, __LINE__) #define mutex_trylock(_lock) _mutex_trylock(_lock, __FILE__, __func__, __LINE__) -#define mutex_timedlock(_lock, timeout) _mutex_timedlock(_lock, timeout, __FILE__, __func__, __LINE__) #define wr_lock(_lock) 
_wr_lock(_lock, __FILE__, __func__, __LINE__) #define wr_trylock(_lock) _wr_trylock(_lock, __FILE__, __func__, __LINE__) #define rd_lock(_lock) _rd_lock(_lock, __FILE__, __func__, __LINE__) @@ -275,10 +277,31 @@ static const char __maybe_unused *share_errs[] = { #define SHARE_ERR(x) share_errs[((x) + 9)] -/* ck locks, a write biased variant of rwlocks */ -struct cklock { +typedef struct ckmutex mutex_t; + +struct ckmutex { pthread_mutex_t mutex; + const char *file; + const char *func; + int line; +}; + +typedef struct ckrwlock rwlock_t; + +struct ckrwlock { pthread_rwlock_t rwlock; + const char *file; + const char *func; + int line; +}; + +/* ck locks, a write biased variant of rwlocks */ +struct cklock { + mutex_t mutex; + rwlock_t rwlock; + const char *file; + const char *func; + int line; }; typedef struct cklock cklock_t; @@ -386,27 +409,28 @@ void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg void join_pthread(pthread_t thread); bool ck_completion_timeout(void *fn, void *fnarg, int timeout); -void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line); -int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _rw_unlock(pthread_rwlock_t *lock, 
const char *file, const char *func, const int line); -void _rd_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _wr_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void mutex_destroy(pthread_mutex_t *lock); -void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void rwlock_destroy(pthread_rwlock_t *lock); +int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line); +int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line); +int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line); +void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line); +void _mutex_unlock_noyield(mutex_t *lock, const char *file, const char *func, const int line); +void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line); +int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +void mutex_destroy(mutex_t *lock); + +void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line); +int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line); +void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _rd_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line); 
+void _wr_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line); +void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line); +void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line); void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line); void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line); -void cklock_destroy(cklock_t *lock); void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_ilock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_uilock(cklock_t *lock, const char *file, const char *func, const int line); @@ -417,6 +441,7 @@ void _ck_dwilock(cklock_t *lock, const char *file, const char *func, const int l void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line); +void cklock_destroy(cklock_t *lock); void _cksem_init(sem_t *sem, const char *file, const char *func, const int line); void _cksem_post(sem_t *sem, const char *file, const char *func, const int line); @@ -468,6 +493,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) +int wait_close(int sockd, int timeout); int wait_read_select(int sockd, int timeout); int read_length(int sockd, void *buf, int 
len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); diff --git a/src/stratifier.c b/src/stratifier.c index 889a22db..859edc6e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -285,10 +285,10 @@ struct stratifier_data { pool_stats_t stats; /* Protects changes to pool stats */ - pthread_mutex_t stats_lock; + mutex_t stats_lock; /* Serialises sends/receives to ckdb if possible */ - pthread_mutex_t ckdb_lock; + mutex_t ckdb_lock; bool ckdb_offline; @@ -336,14 +336,14 @@ struct stratifier_data { cklock_t instance_lock; share_t *shares; - pthread_mutex_t share_lock; + mutex_t share_lock; int64_t shares_generated; /* Linked list of block solves, added to during submission, removed on * accept/reject. It is likely we only ever have one solve on here but * you never know... */ - pthread_mutex_t block_lock; + mutex_t block_lock; ckmsg_t *block_solves; /* Generator message priority */ @@ -1241,11 +1241,15 @@ static void client_drop_message(const int64_t client_id, const int dropped, cons } } +static void dec_worker(ckpool_t *ckp, user_instance_t *instance); + /* Decrease the reference count of instance. */ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file, const char *func, const int line) { + user_instance_t *user = client->user_instance; int64_t client_id = client->id; + ckpool_t *ckp = client->ckp; int dropped = 0, ref; ck_wlock(&sdata->instance_lock); @@ -1253,7 +1257,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const /* See if there are any instances that were dropped that could not be * moved due to holding a reference and drop them now. 
*/ if (unlikely(client->dropped && !ref)) - dropped = __drop_client(sdata, client, client->user_instance); + dropped = __drop_client(sdata, client, user); ck_wunlock(&sdata->instance_lock); client_drop_message(client_id, dropped, true); @@ -1261,6 +1265,9 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const /* This should never happen */ if (unlikely(ref < 0)) LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); + + if (dropped && user) + dec_worker(ckp, user); } #define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__) @@ -3414,7 +3421,11 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * if (res_val) { const char *result = json_string_value(res_val); - LOGDEBUG("Received spurious response %s", result ? result : ""); + if (!safecmp(result, "pong")) + LOGDEBUG("Received pong from client %"PRId64, client_id); + else + LOGDEBUG("Received spurious response %s from client %"PRId64, + result ? result : "", client_id); goto out; } send_json_err(sdata, client_id, id_val, "-3:method not found"); @@ -3676,7 +3687,7 @@ static void parse_ckdb_cmd(ckpool_t *ckp, const char *cmd) } /* Test a value under lock and set it, returning the original value */ -static bool test_and_set(bool *val, pthread_mutex_t *lock) +static bool test_and_set(bool *val, mutex_t *lock) { bool ret; @@ -3688,7 +3699,7 @@ static bool test_and_set(bool *val, pthread_mutex_t *lock) return ret; } -static bool test_and_clear(bool *val, pthread_mutex_t *lock) +static bool test_and_clear(bool *val, mutex_t *lock) { bool ret; @@ -4338,8 +4349,8 @@ int stratifier(proc_instance_t *pi) ckpool_t *ckp = pi->ckp; int ret = 1, threads; int64_t randomiser; + char *buf = NULL; sdata_t *sdata; - char *buf; LOGWARNING("%s stratifier starting", ckp->name); sdata = ckzalloc(sizeof(sdata_t));