From 8b1b6d45d645449fdf8263629e7eee4baa8bfc8f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 13:02:50 +1100 Subject: [PATCH 01/35] fd being invalidated is checked for in wait_write_select so we don't need to handle it twice --- src/connector.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 950dd26c..9d053dba 100644 --- a/src/connector.c +++ b/src/connector.c @@ -506,10 +506,6 @@ void *sender(void *arg) fd = client->fd; ck_runlock(&cdata->lock); - if (fd == -1) { - LOGDEBUG("Discarding message sent to invalidated client"); - goto contfree; - } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll From 163fc40afbbe53201e40cd8a8855f09a7bf8acd9 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 13:27:33 +1100 Subject: [PATCH 02/35] Check all delayed clients for a serviceable one in the connector when we can and consider dropping a client servicing one to not potentially create delayed sends faster than we service them --- src/connector.c | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/connector.c b/src/connector.c index 9d053dba..135da0bb 100644 --- a/src/connector.c +++ b/src/connector.c @@ -466,17 +466,17 @@ void *sender(void *arg) rename_proc("csender"); while (42) { - sender_send_t *sender_send; + sender_send_t *sender_send, *delayed; client_instance_t *client; - int ret, fd, ofs = 0; + int ret = 0, fd, ofs = 0; mutex_lock(&cdata->sender_lock); - /* Poll every 100ms if there are no new sends. Re-examine + /* Poll every 10ms if there are no new sends. Re-examine * delayed sends immediately after a successful send in case * endless new sends more frequently end up starving the * delayed sends. */ if (!cdata->sender_sends && !sent) { - const ts_t polltime = {0, 100000000}; + const ts_t polltime = {0, 10000000}; ts_t timeout_ts; ts_realtime(&timeout_ts); @@ -494,23 +494,33 @@ void *sender(void *arg) * conditional with no new sends appearing or have just * serviced another message successfully. */ if (!sender_send) { - if (!cdata->delayed_sends) + /* Find a delayed client that needs servicing and set + * ret accordingly. We do not need to use FOREACH_SAFE + * as we break out of the loop as soon as we manipuate + * the list. */ + DL_FOREACH(cdata->delayed_sends, delayed) { + if ((ret = wait_write_select(delayed->client->fd, 0))) { + sender_send = cdata->delayed_sends; + DL_DELETE(cdata->delayed_sends, sender_send); + break; + } + } + /* None found ? */ + if (!sender_send) continue; - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); } - client = sender_send->client; - ck_rlock(&cdata->lock); - fd = client->fd; - ck_runlock(&cdata->lock); - /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ret = wait_write_select(fd, 0); + ck_rlock(&cdata->lock); + fd = client->fd; + if (!ret) + ret = wait_write_select(fd, 0); + ck_runlock(&cdata->lock); + if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -526,7 +536,6 @@ void *sender(void *arg) cdata->sends_delayed++; continue; } - sent = true; while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) { @@ -538,6 +547,7 @@ void *sender(void *arg) sender_send->len -= ret; } contfree: + sent = true; free(sender_send->buf); free(sender_send); dec_instance_ref(cdata, client); From 3ed9f9757b0b8b66c1e2bf32dbdbc993fb87991e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 19:02:23 +1100 Subject: [PATCH 03/35] Push version to 0.8.6 --- configure.ac | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configure.ac b/configure.ac index d99cf945..49d28ee2 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,4 @@ -AC_INIT(ckpool, 0.8.5, kernel@kolivas.org) +AC_INIT(ckpool, 0.8.6, kernel@kolivas.org) AC_CANONICAL_SYSTEM AC_CONFIG_MACRO_DIR([m4]) From 0f08e9518f633c1dfda789b86b572b7f73e86572 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Feb 2015 11:42:35 +1100 Subject: [PATCH 04/35] Add helpers for rw timed locks --- src/libckpool.c | 38 ++++++++++++++++++++++++++++++++++++-- src/libckpool.h | 4 +++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index bed7a816..655e79c2 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -123,6 +123,23 @@ bool ck_completion_timeout(void *fn, void *fnarg, int timeout) #define GUNLOCK(_lock, _file, _func, _line) #define INITLOCK(_typ, _lock, _file, _func, _line) +int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +{ + tv_t now; + ts_t abs; + int ret; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += timeout; + + TRYLOCK(lock, file, func, line); + ret = pthread_mutex_timedlock(lock, &abs); + DIDLOCK(ret, lock, file, func, line); + + return ret; +} + void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line) { GETLOCK(lock, file, func, line); @@ -149,7 +166,7 @@ int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __may return ret; } -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { tv_t now; ts_t abs; @@ -160,7 +177,7 @@ int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const ch abs.tv_sec += timeout; TRYLOCK(lock, file, func, line); - ret = pthread_mutex_timedlock(lock, &abs); + ret = pthread_rwlock_timedwrlock(lock, &abs); DIDLOCK(ret, lock, file, func, line); return ret; @@ -182,6 +199,23 @@ int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe return ret; } +int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +{ + tv_t now; + ts_t abs; + int ret; + + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec += timeout; + + TRYLOCK(lock, file, func, line); + ret = pthread_rwlock_timedrdlock(lock, &abs); + DIDLOCK(ret, lock, file, func, line); + + return ret; +} + void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) { GETLOCK(lock, file, func, line); diff --git a/src/libckpool.h b/src/libckpool.h index cb7c1d01..870e1f0b 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -386,13 +386,15 @@ void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg void join_pthread(pthread_t thread); bool ck_completion_timeout(void *fn, void *fnarg, int timeout); +int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line); void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line); void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line); int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rd_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); From c572d580eb6e2b70f9389a7307d2e49b38e74c51 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Feb 2015 12:02:07 +1100 Subject: [PATCH 05/35] Add lock contention testing warning at 10 seconds to all lock grabbing and fail if a lock is unable to be grabbed for more than a minute --- src/libckpool.c | 45 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 655e79c2..350ae09e 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -140,11 +140,24 @@ int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const ch return ret; } +/* Make every locking attempt warn if we're unable to get the lock for more + * than 10 seconds and fail if we can't get it for longer than a minute. */ void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line) { + int ret, retries = 0; + GETLOCK(lock, file, func, line); - if (unlikely(pthread_mutex_lock(lock))) +retry: + ret = _mutex_timedlock(lock, 10, file, func, line); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d", file, func, line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB MUTEX!"); + } quitfrom(1, file, func, line, "WTF MUTEX ERROR ON LOCK!"); + } GOTLOCK(lock, file, func, line); } @@ -185,9 +198,20 @@ int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) { + int ret, retries = 0; + GETLOCK(lock, file, func, line); - if (unlikely(pthread_rwlock_wrlock(lock))) - quitfrom(1, file, func, line, "WTF WRLOCK ERROR ON LOCK!"); +retry: + ret = _wr_timedlock(lock, 10, file, func, line); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged write lock contention from %s %s:%d", file, func, line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB WRITE LOCK!"); + } + quitfrom(1, file, func, line, "WTF ERROR ON WRITE LOCK!"); + } GOTLOCK(lock, file, func, line); } @@ -218,9 +242,20 @@ int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) { + int ret, retries = 0; + GETLOCK(lock, file, func, line); - if (unlikely(pthread_rwlock_rdlock(lock))) - quitfrom(1, file, func, line, "WTF RDLOCK ERROR ON LOCK!"); +retry: + ret = _rd_timedlock(lock, 10, file, func, line); + if (unlikely(ret)) { + if (likely(ret == ETIMEDOUT)) { + LOGERR("WARNING: Prolonged read lock contention from %s %s:%d", file, func, line); + if (++retries < 6) + goto retry; + quitfrom(1, file, func, line, "FAILED TO GRAB READ LOCK!"); + } + quitfrom(1, file, func, line, "WTF ERROR ON READ LOCK!"); + } GOTLOCK(lock, file, func, line); } From 7b452b361f12a6c3aa9e6e8220254d640b3e37a0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Feb 2015 13:33:23 +1100 Subject: [PATCH 06/35] Remove macro placeholders for locks in preparation for implementing lock tracking --- src/libckpool.c | 41 +++++++---------------------------------- src/libckpool.h | 7 +++---- 2 files changed, 10 insertions(+), 38 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 350ae09e..531f8fe8 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -115,15 +115,7 @@ bool ck_completion_timeout(void *fn, void *fnarg, int timeout) } -/* Place holders for when we add lock debugging */ -#define GETLOCK(_lock, _file, _func, _line) -#define GOTLOCK(_lock, _file, _func, _line) -#define TRYLOCK(_lock, _file, _func, _line) -#define DIDLOCK(_ret, _lock, _file, _func, _line) -#define GUNLOCK(_lock, _file, _func, _line) -#define INITLOCK(_typ, _lock, _file, _func, _line) - -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int mutex_timedlock(pthread_mutex_t *lock, int timeout) { tv_t now; ts_t abs; @@ -133,9 +125,7 @@ int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const ch tv_to_ts(&abs, &now); abs.tv_sec += timeout; - TRYLOCK(lock, file, func, line); ret = pthread_mutex_timedlock(lock, &abs); - DIDLOCK(ret, lock, file, func, line); return ret; } @@ -146,9 +136,8 @@ void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, cons { int ret, retries = 0; - GETLOCK(lock, file, func, line); retry: - ret = _mutex_timedlock(lock, 10, file, func, line); + ret = mutex_timedlock(lock, 10); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d", file, func, line); @@ -158,28 +147,24 @@ retry: } quitfrom(1, file, func, line, "WTF MUTEX ERROR ON LOCK!"); } - GOTLOCK(lock, file, func, line); } void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line) { if (unlikely(pthread_mutex_unlock(lock))) quitfrom(1, file, func, line, "WTF MUTEX ERROR ON UNLOCK!"); - GUNLOCK(lock, file, func, line); } int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { int ret; - TRYLOCK(lock, file, func, line); ret = pthread_mutex_trylock(lock); - DIDLOCK(ret, lock, file, func, line); return ret; } -int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int wr_timedlock(pthread_rwlock_t *lock, int timeout) { tv_t now; ts_t abs; @@ -189,9 +174,7 @@ int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char tv_to_ts(&abs, &now); abs.tv_sec += timeout; - TRYLOCK(lock, file, func, line); ret = pthread_rwlock_timedwrlock(lock, &abs); - DIDLOCK(ret, lock, file, func, line); return ret; } @@ -200,9 +183,8 @@ void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const { int ret, retries = 0; - GETLOCK(lock, file, func, line); retry: - ret = _wr_timedlock(lock, 10, file, func, line); + ret = wr_timedlock(lock, 10); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { LOGERR("WARNING: Prolonged write lock contention from %s %s:%d", file, func, line); @@ -212,18 +194,16 @@ retry: } quitfrom(1, file, func, line, "WTF ERROR ON WRITE LOCK!"); } - GOTLOCK(lock, file, func, line); } int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { - TRYLOCK(lock, file, func, line); int ret = pthread_rwlock_trywrlock(lock); - DIDLOCK(ret, lock, file, func, line); + return ret; } -int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int rd_timedlock(pthread_rwlock_t *lock, int timeout) { tv_t now; ts_t abs; @@ -233,9 +213,7 @@ int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char tv_to_ts(&abs, &now); abs.tv_sec += timeout; - TRYLOCK(lock, file, func, line); ret = pthread_rwlock_timedrdlock(lock, &abs); - DIDLOCK(ret, lock, file, func, line); return ret; } @@ -244,9 +222,8 @@ void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const { int ret, retries = 0; - GETLOCK(lock, file, func, line); retry: - ret = _rd_timedlock(lock, 10, file, func, line); + ret = rd_timedlock(lock, 10); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { LOGERR("WARNING: Prolonged read lock contention from %s %s:%d", file, func, line); @@ -256,14 +233,12 @@ retry: } quitfrom(1, file, func, line, "WTF ERROR ON READ LOCK!"); } - GOTLOCK(lock, file, func, line); } void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) { if (unlikely(pthread_rwlock_unlock(lock))) quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!"); - GUNLOCK(lock, file, func, line); } void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) @@ -280,7 +255,6 @@ void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, cons { if (unlikely(pthread_mutex_init(lock, NULL))) quitfrom(1, file, func, line, "Failed to pthread_mutex_init"); - INITLOCK(lock, CGLOCK_MUTEX, file, func, line); } void mutex_destroy(pthread_mutex_t *lock) @@ -294,7 +268,6 @@ void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, co { if (unlikely(pthread_rwlock_init(lock, NULL))) quitfrom(1, file, func, line, "Failed to pthread_rwlock_init"); - INITLOCK(lock, CGLOCK_RW, file, func, line); } void rwlock_destroy(pthread_rwlock_t *lock) diff --git a/src/libckpool.h b/src/libckpool.h index 870e1f0b..9237707d 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -149,7 +149,6 @@ static inline void flip_80(void *dest_p, const void *src_p) #define mutex_unlock_noyield(_lock) _mutex_unlock_noyield(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock(_lock) _mutex_unlock(_lock, __FILE__, __func__, __LINE__) #define mutex_trylock(_lock) _mutex_trylock(_lock, __FILE__, __func__, __LINE__) -#define mutex_timedlock(_lock, timeout) _mutex_timedlock(_lock, timeout, __FILE__, __func__, __LINE__) #define wr_lock(_lock) _wr_lock(_lock, __FILE__, __func__, __LINE__) #define wr_trylock(_lock) _wr_trylock(_lock, __FILE__, __func__, __LINE__) #define rd_lock(_lock) _rd_lock(_lock, __FILE__, __func__, __LINE__) @@ -386,15 +385,15 @@ void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg void join_pthread(pthread_t thread); bool ck_completion_timeout(void *fn, void *fnarg, int timeout); -int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +int mutex_timedlock(pthread_mutex_t *lock, int timeout); void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line); void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line); void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line); int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int _wr_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +int wr_timedlock(pthread_rwlock_t *lock, int timeout); void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int _rd_timedlock(pthread_rwlock_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +int rd_timedlock(pthread_rwlock_t *lock, int timeout); void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rd_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); From 42f14a95362f639c2d8bb420adf6a3f7d8e3ea35 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 19 Feb 2015 14:26:57 +1100 Subject: [PATCH 07/35] Add full lock tracking for mutexes and rwlocks --- src/ckdb.c | 4 +- src/ckdb.h | 2 +- src/ckpool.c | 8 +-- src/ckpool.h | 2 +- src/connector.c | 4 +- src/generator.c | 10 ++-- src/libckpool.c | 131 +++++++++++++++++++++++++++++++---------------- src/libckpool.h | 68 ++++++++++++++++-------- src/stratifier.c | 12 ++--- 9 files changed, 154 insertions(+), 87 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index ca6f1e33..24f05d4b 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -301,7 +301,7 @@ K_STORE *logqueue_store; // WORKQUEUE K_LIST *workqueue_free; K_STORE *workqueue_store; -pthread_mutex_t wq_waitlock; +mutex_t wq_waitlock; pthread_cond_t wq_waitcond; // HEARTBEATQUEUE @@ -3247,7 +3247,7 @@ static void *listener(void *arg) timeraddspec(&abs, &tsdiff); mutex_lock(&wq_waitlock); - pthread_cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); + cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); mutex_unlock(&wq_waitlock); } } diff --git a/src/ckdb.h b/src/ckdb.h index e8ecd940..a78ec107 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -654,7 +654,7 @@ typedef struct workqueue { extern K_LIST *workqueue_free; extern K_STORE *workqueue_store; -extern pthread_mutex_t wq_waitlock; +extern mutex_t wq_waitlock; extern pthread_cond_t wq_waitcond; // HEARTBEATQUEUE diff --git a/src/ckpool.c b/src/ckpool.c index cdbfbe39..e90d0bb5 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -120,7 +120,7 @@ static void *ckmsg_queue(void *arg) tv_to_ts(&abs, &now); abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); msg = ckmsgq->msgs; if (msg) DL_DELETE(ckmsgq->msgs, msg); @@ -141,7 +141,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) strncpy(ckmsgq->name, name, 15); ckmsgq->func = func; ckmsgq->ckp = ckp; - ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); + ckmsgq->lock = ckalloc(sizeof(mutex_t)); ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); mutex_init(ckmsgq->lock); cond_init(ckmsgq->cond); @@ -153,11 +153,11 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); - pthread_mutex_t *lock; + mutex_t *lock; pthread_cond_t *cond; int i; - lock = ckalloc(sizeof(pthread_mutex_t)); + lock = ckalloc(sizeof(mutex_t)); cond = ckalloc(sizeof(pthread_cond_t)); mutex_init(lock); cond_init(cond); diff --git a/src/ckpool.h b/src/ckpool.h index b2f59333..797beb24 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -34,7 +34,7 @@ struct ckmsgq { ckpool_t *ckp; char name[16]; pthread_t pth; - pthread_mutex_t *lock; + mutex_t *lock; pthread_cond_t *cond; ckmsg_t *msgs; void (*func)(ckpool_t *, void *); diff --git a/src/connector.c b/src/connector.c index 135da0bb..f6d9f1ab 100644 --- a/src/connector.c +++ b/src/connector.c @@ -95,7 +95,7 @@ struct connector_data { int64_t sends_delayed; /* For protecting the pending sends list */ - pthread_mutex_t sender_lock; + mutex_t sender_lock; pthread_cond_t sender_cond; }; @@ -481,7 +481,7 @@ void *sender(void *arg) ts_realtime(&timeout_ts); timeraddspec(&timeout_ts, &polltime); - pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); + cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } sender_send = cdata->sender_sends; if (sender_send) diff --git a/src/generator.c b/src/generator.c index 9c23e5c3..90f054f4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -106,18 +106,18 @@ struct proxy_instance { bool diffed; /* Received new diff */ bool reconnect; /* We need to drop and reconnect */ - pthread_mutex_t notify_lock; + mutex_t notify_lock; notify_instance_t *notify_instances; notify_instance_t *current_notify; pthread_t pth_precv; pthread_t pth_psend; - pthread_mutex_t psend_lock; + mutex_t psend_lock; pthread_cond_t psend_cond; stratum_msg_t *psends; - pthread_mutex_t share_lock; + mutex_t share_lock; share_msg_t *shares; int64_t share_id; @@ -128,7 +128,7 @@ struct proxy_instance { /* Private data for the generator */ struct generator_data { - pthread_mutex_t lock; /* Lock protecting linked lists */ + mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue @@ -1344,7 +1344,7 @@ static void *proxy_send(void *arg) mutex_lock(&proxi->psend_lock); if (!proxi->psends) - pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock); + cond_wait(&proxi->psend_cond, &proxi->psend_lock); msg = proxi->psends; if (likely(msg)) DL_DELETE(proxi->psends, msg); diff --git a/src/libckpool.c b/src/libckpool.c index 531f8fe8..19fe19ed 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -114,8 +114,30 @@ bool ck_completion_timeout(void *fn, void *fnarg, int timeout) return !ret; } +int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line) +{ + int ret; + + ret = pthread_cond_wait(cond, &lock->mutex); + lock->file = file; + lock->func = func; + lock->line = line; + return ret; +} -int mutex_timedlock(pthread_mutex_t *lock, int timeout) +int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line) +{ + int ret; + + ret = pthread_cond_timedwait(cond, &lock->mutex, abstime); + lock->file = file; + lock->func = func; + lock->line = line; + return ret; +} + + +int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line) { tv_t now; ts_t abs; @@ -125,22 +147,28 @@ int mutex_timedlock(pthread_mutex_t *lock, int timeout) tv_to_ts(&abs, &now); abs.tv_sec += timeout; - ret = pthread_mutex_timedlock(lock, &abs); + ret = pthread_mutex_timedlock(&lock->mutex, &abs); + if (!ret) { + lock->file = file; + lock->func = func; + lock->line = line; + } return ret; } /* Make every locking attempt warn if we're unable to get the lock for more * than 10 seconds and fail if we can't get it for longer than a minute. */ -void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line) +void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line) { int ret, retries = 0; retry: - ret = mutex_timedlock(lock, 10); + ret = _mutex_timedlock(lock, 10, file, func, line); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { - LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d", file, func, line); + LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); if (++retries < 6) goto retry; quitfrom(1, file, func, line, "FAILED TO GRAB MUTEX!"); @@ -149,22 +177,33 @@ retry: } } -void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line) +/* Does not unset lock->file/func/line since they're only relevant when the lock is held */ +void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_mutex_unlock(lock))) + if (unlikely(pthread_mutex_unlock(&lock->mutex))) quitfrom(1, file, func, line, "WTF MUTEX ERROR ON UNLOCK!"); } -int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { int ret; - ret = pthread_mutex_trylock(lock); - + ret = pthread_mutex_trylock(&lock->mutex); + if (!ret) { + lock->file = file; + lock->func = func; + lock->line = line; + } return ret; } -int wr_timedlock(pthread_rwlock_t *lock, int timeout) +void mutex_destroy(mutex_t *lock) +{ + pthread_mutex_destroy(&lock->mutex); +} + + +static int wr_timedlock(pthread_rwlock_t *lock, int timeout) { tv_t now; ts_t abs; @@ -179,31 +218,40 @@ int wr_timedlock(pthread_rwlock_t *lock, int timeout) return ret; } -void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line) { int ret, retries = 0; retry: - ret = wr_timedlock(lock, 10); + ret = wr_timedlock(&lock->rwlock, 10); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { - LOGERR("WARNING: Prolonged write lock contention from %s %s:%d", file, func, line); + LOGERR("WARNING: Prolonged write lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); if (++retries < 6) goto retry; quitfrom(1, file, func, line, "FAILED TO GRAB WRITE LOCK!"); } quitfrom(1, file, func, line, "WTF ERROR ON WRITE LOCK!"); } + lock->file = file; + lock->func = func; + lock->line = line; } -int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) +int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) { - int ret = pthread_rwlock_trywrlock(lock); + int ret = pthread_rwlock_trywrlock(&lock->rwlock); + if (!ret) { + lock->file = file; + lock->func = func; + lock->line = line; + } return ret; } -int rd_timedlock(pthread_rwlock_t *lock, int timeout) +static int rd_timedlock(pthread_rwlock_t *lock, int timeout) { tv_t now; ts_t abs; @@ -218,62 +266,55 @@ int rd_timedlock(pthread_rwlock_t *lock, int timeout) return ret; } -void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line) { int ret, retries = 0; retry: - ret = rd_timedlock(lock, 10); + ret = rd_timedlock(&lock->rwlock, 10); if (unlikely(ret)) { if (likely(ret == ETIMEDOUT)) { - LOGERR("WARNING: Prolonged read lock contention from %s %s:%d", file, func, line); + LOGERR("WARNING: Prolonged read lock contention from %s %s:%d, held by %s %s:%d", + file, func, line, lock->file, lock->func, lock->line); if (++retries < 6) goto retry; quitfrom(1, file, func, line, "FAILED TO GRAB READ LOCK!"); } quitfrom(1, file, func, line, "WTF ERROR ON READ LOCK!"); } + lock->file = file; + lock->func = func; + lock->line = line; } -void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_rwlock_unlock(lock))) + if (unlikely(pthread_rwlock_unlock(&lock->rwlock))) quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!"); } -void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { _rw_unlock(lock, file, func, line); } -void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line) { _rw_unlock(lock, file, func, line); } -void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line) +void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_mutex_init(lock, NULL))) + if (unlikely(pthread_mutex_init(&lock->mutex, NULL))) quitfrom(1, file, func, line, "Failed to pthread_mutex_init"); } -void mutex_destroy(pthread_mutex_t *lock) -{ - /* Ignore return code. This only invalidates the mutex on linux but - * releases resources on windows. */ - pthread_mutex_destroy(lock); -} - -void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line) +void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line) { - if (unlikely(pthread_rwlock_init(lock, NULL))) + if (unlikely(pthread_rwlock_init(&lock->rwlock, NULL))) quitfrom(1, file, func, line, "Failed to pthread_rwlock_init"); } -void rwlock_destroy(pthread_rwlock_t *lock) -{ - pthread_rwlock_destroy(lock); -} void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line) { @@ -287,11 +328,6 @@ void _cklock_init(cklock_t *lock, const char *file, const char *func, const int _rwlock_init(&lock->rwlock, file, func, line); } -void cklock_destroy(cklock_t *lock) -{ - rwlock_destroy(&lock->rwlock); - mutex_destroy(&lock->mutex); -} /* Read lock variant of cklock. Cannot be promoted. */ void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line) @@ -359,6 +395,13 @@ void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int l _mutex_unlock(&lock->mutex, file, func, line); } +void cklock_destroy(cklock_t *lock) +{ + pthread_rwlock_destroy(&lock->rwlock.rwlock); + pthread_mutex_destroy(&lock->mutex.mutex); +} + + void _cksem_init(sem_t *sem, const char *file, const char *func, const int line) { int ret; diff --git a/src/libckpool.h b/src/libckpool.h index 9237707d..e21acc96 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -145,6 +145,9 @@ static inline void flip_80(void *dest_p, const void *src_p) dest[i] = bswap_32(src[i]); } +#define cond_wait(_cond, _lock) _cond_wait(_cond, _lock, __FILE__, __func__, __LINE__) +#define cond_timedwait(_cond, _lock, _abstime) _cond_timedwait(_cond, _lock, _abstime, __FILE__, __func__, __LINE__) +#define mutex_timedlock(_lock, _timeout) _mutex_timedlock(_lock, _timeout, __FILE__, __func__, __LINE__) #define mutex_lock(_lock) _mutex_lock(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock_noyield(_lock) _mutex_unlock_noyield(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock(_lock) _mutex_unlock(_lock, __FILE__, __func__, __LINE__) @@ -274,10 +277,31 @@ static const char __maybe_unused *share_errs[] = { #define SHARE_ERR(x) share_errs[((x) + 9)] -/* ck locks, a write biased variant of rwlocks */ -struct cklock { +typedef struct ckmutex mutex_t; + +struct ckmutex { pthread_mutex_t mutex; + const char *file; + const char *func; + int line; +}; + +typedef struct ckrwlock rwlock_t; + +struct ckrwlock { pthread_rwlock_t rwlock; + const char *file; + const char *func; + int line; +}; + +/* ck locks, a write biased variant of rwlocks */ +struct cklock { + mutex_t mutex; + rwlock_t rwlock; + const char *file; + const char *func; + int line; }; typedef struct cklock cklock_t; @@ -385,29 +409,28 @@ void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg void join_pthread(pthread_t thread); bool ck_completion_timeout(void *fn, void *fnarg, int timeout); -int mutex_timedlock(pthread_mutex_t *lock, int timeout); -void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line); -int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int wr_timedlock(pthread_rwlock_t *lock, int timeout); -void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); -int rd_timedlock(pthread_rwlock_t *lock, int timeout); -void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _rd_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _wr_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line); -void mutex_destroy(pthread_mutex_t *lock); -void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line); -void rwlock_destroy(pthread_rwlock_t *lock); +int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line); +int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line); +int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line); +void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line); +void _mutex_unlock_noyield(mutex_t *lock, const char *file, const char *func, const int line); +void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line); +int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +void mutex_destroy(mutex_t *lock); + +void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line); +int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); +void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line); +void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _rd_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line); +void _wr_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line); +void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line); +void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line); +void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line); void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line); void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line); -void cklock_destroy(cklock_t *lock); void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_ilock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_uilock(cklock_t *lock, const char *file, const char *func, const int line); @@ -418,6 +441,7 @@ void _ck_dwilock(cklock_t *lock, const char *file, const char *func, const int l void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line); +void cklock_destroy(cklock_t *lock); void _cksem_init(sem_t *sem, const char *file, const char *func, const int line); void _cksem_post(sem_t *sem, const char *file, const char *func, const int line); diff --git a/src/stratifier.c b/src/stratifier.c index 889a22db..a62530e7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -285,10 +285,10 @@ struct stratifier_data { pool_stats_t stats; /* Protects changes to pool stats */ - pthread_mutex_t stats_lock; + mutex_t stats_lock; /* Serialises sends/receives to ckdb if possible */ - pthread_mutex_t ckdb_lock; + mutex_t ckdb_lock; bool ckdb_offline; @@ -336,14 +336,14 @@ struct stratifier_data { cklock_t instance_lock; share_t *shares; - pthread_mutex_t share_lock; + mutex_t share_lock; int64_t shares_generated; /* Linked list of block solves, added to during submission, removed on * accept/reject. It is likely we only ever have one solve on here but * you never know... */ - pthread_mutex_t block_lock; + mutex_t block_lock; ckmsg_t *block_solves; /* Generator message priority */ @@ -3676,7 +3676,7 @@ static void parse_ckdb_cmd(ckpool_t *ckp, const char *cmd) } /* Test a value under lock and set it, returning the original value */ -static bool test_and_set(bool *val, pthread_mutex_t *lock) +static bool test_and_set(bool *val, mutex_t *lock) { bool ret; @@ -3688,7 +3688,7 @@ static bool test_and_set(bool *val, pthread_mutex_t *lock) return ret; } -static bool test_and_clear(bool *val, pthread_mutex_t *lock) +static bool test_and_clear(bool *val, mutex_t *lock) { bool ret; From a0753a3965e03de3f662e12e10bb9ad05947fa35 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 8 Feb 2015 10:12:04 +1100 Subject: [PATCH 08/35] Handle other forms of read_socket_line ending after message complete as not a failure --- src/ckpool.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index e90d0bb5..55f9185d 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -419,9 +419,9 @@ int read_socket_line(connsock_t *cs, const int timeout) char *newbuf; ret = wait_read_select(fd, eom ? 0 : timeout); - if (eom && !ret) - break; if (ret < 1) { + if (eom) + break; if (!ret) LOGDEBUG("Select timed out in read_socket_line"); else @@ -431,7 +431,7 @@ int read_socket_line(connsock_t *cs, const int timeout) ret = recv(fd, readbuf, PAGESIZE - 4, 0); if (ret < 1) { /* Closed socket after valid message */ - if (!ret && eom) + if (eom) break; LOGERR("Failed to recv in read_socket_line"); ret = -1; From 08ef8ef3debe889d6d5423ddf0b7f97d0229c786 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 14 Feb 2015 11:25:33 +1100 Subject: [PATCH 09/35] Differentiate pong from other spurious messages from clients --- src/stratifier.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index a62530e7..57d73a0a 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3414,7 +3414,11 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * if (res_val) { const char *result = json_string_value(res_val); - LOGDEBUG("Received spurious response %s", result ? result : ""); + if (!safecmp(result, "pong")) + LOGDEBUG("Received pong from client %"PRId64, client_id); + else + LOGDEBUG("Received spurious response %s from client %"PRId64, + result ? result : "", client_id); goto out; } send_json_err(sdata, client_id, id_val, "-3:method not found"); From 2b97f1833faabdf56b6a4f79cc1aad68f4e16751 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 15:57:55 +1100 Subject: [PATCH 10/35] Shutdown instead of closing a socket after sending a unix message allowing the receiving end to close the socket after receiving the data --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 55f9185d..0d176bac 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -525,7 +525,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - Close(sockd); + shutdown(sockd, SHUT_WR); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); From 316ceba75bbc17df77cfdb471de23dc5aecbdd04 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 17:11:38 +1100 Subject: [PATCH 11/35] Check for pid in send_recv_proc as well --- src/ckpool.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 0d176bac..d6091ff8 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -549,6 +549,8 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) + pi->pid = get_proc_pid(pi); if (unlikely(kill_pid(pi->pid, 0))) { LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); goto out; From 90c682177f67e86e19c477c1831c9d03f5edbfd8 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 17:49:56 +1100 Subject: [PATCH 12/35] Wait for the other end to close a unix socket to ensure the message has gone through --- src/ckpool.c | 2 ++ src/libckpool.c | 14 ++++++++++++++ src/libckpool.h | 1 + 3 files changed, 17 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index d6091ff8..004a09a3 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -526,6 +526,8 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch else ret = true; shutdown(sockd, SHUT_WR); + if (!wait_close(sockd, 5)) + LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); diff --git a/src/libckpool.c b/src/libckpool.c index 19fe19ed..cae50bda 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -910,6 +910,20 @@ out: return sockd; } +/* Wait till a socket has been closed at the other end */ +int wait_close(int sockd, int timeout) +{ + struct pollfd sfd; + + if (unlikely(sockd < 0)) + return -1; + sfd.fd = sockd; + sfd.events = POLLHUP; + sfd.revents = 0; + timeout *= 1000; + return poll(&sfd, 1, timeout); +} + /* Emulate a select read wait for high fds that select doesn't support */ int wait_read_select(int sockd, int timeout) { diff --git a/src/libckpool.h b/src/libckpool.h index e21acc96..79f0f8fd 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -493,6 +493,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) +int wait_close(int sockd, int timeout); int wait_read_select(int sockd, int timeout); int read_length(int sockd, void *buf, int len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); From 236634239e9b680b01f3be88b87c55131396c510 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 20:04:03 +1100 Subject: [PATCH 13/35] Close our end of the socket in send_proc --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 004a09a3..3b433d44 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -525,9 +525,9 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch LOGWARNING("Failed to send %s to socket %s", msg, path); else ret = true; - shutdown(sockd, SHUT_WR); if (!wait_close(sockd, 5)) LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); + Close(sockd); out: if (unlikely(!ret)) { LOGERR("Failure in send_proc from %s %s:%d", file, func, line); From 07874e9f3027cf6723baa594d70892fdf91a607c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Feb 2015 23:59:42 +1100 Subject: [PATCH 14/35] Show message associated with no close fd detection --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 3b433d44..70d33adb 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -526,7 +526,7 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch else ret = true; if (!wait_close(sockd, 5)) - LOGWARNING("send_proc did not close from %s %s:%d", file, func, line); + LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line); Close(sockd); out: if (unlikely(!ret)) { From 85b17f1b78563baa3705b5e0b152e30bd6c6ff7f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 00:34:48 +1100 Subject: [PATCH 15/35] Check for correct condition in wait_close --- src/libckpool.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index cae50bda..7fc3017b 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -914,14 +914,18 @@ out: int wait_close(int sockd, int timeout) { struct pollfd sfd; + int ret; if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLHUP; + sfd.events = POLLIN; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret < 1) + return 0; + return sfd.revents & POLLHUP; } /* Emulate a select read wait for high fds that select doesn't support */ From a23060d786600ee337aad92b1ca9089a064096a2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:08:26 +1100 Subject: [PATCH 16/35] Fix buf dereference error --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 57d73a0a..9bf2ab98 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4342,8 +4342,8 @@ int stratifier(proc_instance_t *pi) ckpool_t *ckp = pi->ckp; int ret = 1, threads; int64_t randomiser; + char *buf = NULL; sdata_t *sdata; - char *buf; LOGWARNING("%s stratifier starting", ckp->name); sdata = ckzalloc(sizeof(sdata_t)); From 487e918ff7eaafaeade6b57d7a9e6d26e882946c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:16:05 +1100 Subject: [PATCH 17/35] Return value of send_proc is never used --- src/ckpool.c | 3 +-- src/ckpool.h | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 70d33adb..4b753074 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -494,7 +494,7 @@ out: /* Send a single message to a process instance when there will be no response, * closing the socket immediately. */ -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { char *path = pi->us.path; bool ret = false; @@ -533,7 +533,6 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } - return ret; } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 797beb24..771ac692 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -212,7 +212,7 @@ ckpool_t *global_ckp; bool ping_main(ckpool_t *ckp); void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); -bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) From bd68f928b710816c984ec3a8dbc852f10ed3f6b4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 01:49:24 +1100 Subject: [PATCH 18/35] Make all one way send_procs asynchronous to avoid message response deadlocks --- src/ckpool.c | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 4b753074..989b1b65 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -492,14 +492,32 @@ out: return pid; } -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -533,6 +551,24 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } + free(msg); + free(pm); + return NULL; +} + +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + pthread_t pth; + + pm->pi = pi; + pm->msg = strdup(msg); + pm->file = file; + pm->func = func; + pm->line = line; + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then From 33508b2243818bea6c96037800ced4a59da3d0c5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 09:47:37 +1100 Subject: [PATCH 19/35] Reset the pi pid after a failure to find the process alive so we can look it up again in case it has changed --- src/ckpool.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 989b1b65..b4b280f4 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -589,6 +589,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co if (unlikely(!pi->pid)) pi->pid = get_proc_pid(pi); if (unlikely(kill_pid(pi->pid, 0))) { + /* Reset the pid value in case we are still looking for an old + * process */ + pi->pid = 0; LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); goto out; } From 646d4a95602ead1e6cad397e3ef18c62f11d8e29 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 17:50:50 +1100 Subject: [PATCH 20/35] Cope with unknown pids in various send msg commands without terminal failure --- src/ckpool.c | 60 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index b4b280f4..56d1c385 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -237,7 +237,25 @@ static int pid_wait(const pid_t pid, const int ms) return ret; } -static int send_procmsg(const proc_instance_t *pi, const char *buf) +static int get_proc_pid(const proc_instance_t *pi) +{ + int ret, pid = 0; + char path[256]; + FILE *fp; + + sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); + fp = fopen(path, "re"); + if (!fp) + goto out; + ret = fscanf(fp, "%d", &pid); + if (ret < 1) + pid = 0; + fclose(fp); +out: + return pid; +} + +static int send_procmsg(proc_instance_t *pi, const char *buf) { char *path = pi->us.path; int ret = -1; @@ -251,6 +269,12 @@ static int send_procmsg(const proc_instance_t *pi, const char *buf) LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } + if (unlikely(!pi->pid)) { + pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + + } if (unlikely(kill_pid(pi->pid, 0))) { LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname); goto out; @@ -474,24 +498,6 @@ out: static void childsighandler(const int sig); -static int get_proc_pid(const proc_instance_t *pi) -{ - int ret, pid = 0; - char path[256]; - FILE *fp; - - sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); - fp = fopen(path, "re"); - if (!fp) - goto out; - ret = fscanf(fp, "%d", &pid); - if (ret < 1) - pid = 0; - fclose(fp); -out: - return pid; -} - struct proc_message { proc_instance_t *pi; char *msg; @@ -528,10 +534,16 @@ void *async_send_proc(void *arg) } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } + } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -551,6 +563,7 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } +out_nofail: free(msg); free(pm); return NULL; @@ -586,8 +599,11 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co LOGERR("Attempted to send null message to socket %s in send_proc", path); goto out; } - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); + if (!pi->pid) + goto out; + } if (unlikely(kill_pid(pi->pid, 0))) { /* Reset the pid value in case we are still looking for an old * process */ From d15ccdf54d713da45e734833b448e7c60b6ebeea Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 21 Feb 2015 17:59:59 +1100 Subject: [PATCH 21/35] Remove the old pid file per process when preparing the new child processes --- src/ckpool.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 56d1c385..a1c4a514 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1260,6 +1260,8 @@ static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *nam pi->process = process; create_process_unixsock(pi); manage_old_child(ckp, pi); + /* Remove the old pid file if we've succeeded in coming this far */ + rm_namepid(pi); return pi; } From d594f86520731384278ba32f3190bef6ba64ea9f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 09:54:39 +1100 Subject: [PATCH 22/35] Create generic workqueue function and message receiving and parsing helpers --- src/ckpool.c | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/ckpool.h | 29 ++++++++++++++++++--- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index a1c4a514..6b8fa480 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -134,6 +134,39 @@ static void *ckmsg_queue(void *arg) return NULL; } +/* Generic workqueue function and message receiving and parsing thread */ +static void *ckwq_queue(void *arg) +{ + ckwq_t *ckmsgq = (ckwq_t *)arg; + ckpool_t *ckp = ckmsgq->ckp; + + pthread_detach(pthread_self()); + rename_proc(ckmsgq->name); + + while (42) { + ckwqmsg_t *wqmsg; + tv_t now; + ts_t abs; + + mutex_lock(ckmsgq->lock); + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec++; + if (!ckmsgq->wqmsgs) + cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); + wqmsg = ckmsgq->wqmsgs; + if (wqmsg) + DL_DELETE(ckmsgq->wqmsgs, wqmsg); + mutex_unlock(ckmsgq->lock); + + if (!wqmsg) + continue; + wqmsg->func(ckp, wqmsg->data); + free(wqmsg); + } + return NULL; +} + ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -174,6 +207,29 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons return ckmsgq; } +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) +{ + ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); + mutex_t *lock; + pthread_cond_t *cond; + int i; + + lock = ckalloc(sizeof(mutex_t)); + cond = ckalloc(sizeof(pthread_cond_t)); + mutex_init(lock); + cond_init(cond); + + for (i = 0; i < count; i++) { + snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); + ckwq[i].ckp = ckp; + ckwq[i].lock = lock; + ckwq[i].cond = cond; + create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); + } + + return ckwq; +} + /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -185,10 +241,24 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_signal(ckmsgq->cond); + pthread_cond_broadcast(ckmsgq->cond); mutex_unlock(ckmsgq->lock); } +void ckwq_add(ckwq_t *ckwq, const void *func, void *data) +{ + ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); + + wqmsg->func = func; + wqmsg->data = data; + + mutex_lock(ckwq->lock); + ckwq->messages++; + DL_APPEND(ckwq->wqmsgs, wqmsg); + pthread_cond_broadcast(ckwq->cond); + mutex_unlock(ckwq->lock); +} + /* Return whether there are any messages queued in the ckmsgq linked list. */ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { diff --git a/src/ckpool.h b/src/ckpool.h index 771ac692..eaf166fd 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -22,13 +22,22 @@ struct ckpool_instance; typedef struct ckpool_instance ckpool_t; +typedef struct ckmsg ckmsg_t; + struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; + ckmsg_t *next; + ckmsg_t *prev; void *data; }; -typedef struct ckmsg ckmsg_t; +typedef struct ckwqmsg ckwqmsg_t; + +struct ckwqmsg { + ckwqmsg_t *next; + ckwqmsg_t *prev; + void *data; + void (*func)(ckpool_t *, void *); +}; struct ckmsgq { ckpool_t *ckp; @@ -43,6 +52,18 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; +struct ckwq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + mutex_t *lock; + pthread_cond_t *cond; + ckwqmsg_t *wqmsgs; + int64_t messages; +}; + +typedef struct ckwq ckwq_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -204,7 +225,9 @@ struct ckpool_instance { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); +ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); +void ckwq_add(ckwq_t *ckwq, const void *func, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); ckpool_t *global_ckp; From 2d94b18b99b2cef3bc5dc5c987eff267ff50f123 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 10:17:17 +1100 Subject: [PATCH 23/35] Create a pool of workqueue threads for use by the stratifier using them for share processing, stratum receiving and transaction processing --- src/stratifier.c | 47 ++++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9bf2ab98..442175d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -313,12 +313,10 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; + ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends - ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb - ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations - ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -1644,6 +1642,21 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } +static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) +{ + int objects, generated; + int64_t memsize; + ckwqmsg_t *wqmsg; + + mutex_lock(ckwq->lock); + DL_COUNT(ckwq->wqmsgs, wqmsg, objects); + generated = ckwq->messages; + mutex_unlock(ckwq->lock); + + memsize = (sizeof(ckwqmsg_t) + size) * objects; + JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); +} + static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -1690,15 +1703,14 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); + /* Don't know exactly how big the string is so just count the pointer for now */ - ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); - json_set_object(val, "srecvs", subval); + ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); + json_set_object(val, "ckwqs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } - ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); - json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -1706,6 +1718,8 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) return buf; } +static void srecv_process(ckpool_t *ckp, char *buf); + static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret = 0; @@ -1760,7 +1774,7 @@ retry: /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckmsgq_add(sdata->srecvs, buf); + ckwq_add(sdata->ckwqs, &srecv_process, buf); Close(sockd); buf = NULL; goto retry; @@ -3282,6 +3296,9 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } +static void sshare_process(ckpool_t *ckp, json_params_t *jp); +static void send_transactions(ckpool_t *ckp, json_params_t *jp); + /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) @@ -3296,7 +3313,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->sshareq, jp); + ckwq_add(sdata->ckwqs, &sshare_process, jp); return; } @@ -3375,7 +3392,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sdata->stxnq, jp); + ckwq_add(sdata->ckwqs, &send_transactions, jp); return; } /* Unhandled message here */ @@ -4391,14 +4408,10 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create half as many share processing threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; - sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - /* Create 1/4 as many stratum processing threads as there are CPUs */ - threads = threads / 2 ? : 1; - sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); + /* Create as many generic workqueue threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN); + sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); From 4c4b48795bdd4fea53762078ad39a57344c3b84f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 10:27:22 +1100 Subject: [PATCH 24/35] Use the generic workqueues for do_update --- src/stratifier.c | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 442175d2..2956ad36 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -800,33 +800,21 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) sdata->gen_priority = 0; } -struct update_req { - pthread_t *pth; - ckpool_t *ckp; - int prio; -}; - static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void *do_update(void *arg) +static void do_update(ckpool_t *ckp, int *prio) { - struct update_req *ur = (struct update_req *)arg; - ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; - int prio = ur->prio; bool ret = false; workbase_t *wb; json_t *val; char *buf; - pthread_detach(pthread_self()); - rename_proc("updater"); - - buf = send_recv_generator(ckp, "getbase", prio); + buf = send_recv_generator(ckp, "getbase", *prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; @@ -886,21 +874,17 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } - dealloc(buf); - free(ur->pth); - free(ur); - return NULL; + free(buf); + free(prio); } static void update_base(ckpool_t *ckp, const int prio) { - struct update_req *ur = ckalloc(sizeof(struct update_req)); - pthread_t *pth = ckalloc(sizeof(pthread_t)); + int *pprio = ckalloc(sizeof(int)); + sdata_t *sdata = ckp->data; - ur->pth = pth; - ur->ckp = ckp; - ur->prio = prio; - create_pthread(pth, do_update, ur); + *pprio = prio; + ckwq_add(sdata->ckwqs, &do_update, pprio); } static void __kill_instance(stratum_instance_t *client) From 5f9f01e89442c85cd927f5fed82af7b08394ec17 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:11:00 +1100 Subject: [PATCH 25/35] Revert to synchronous proc messages in anticipation of new async functions --- src/ckpool.c | 55 ++++++++-------------------------------------------- 1 file changed, 8 insertions(+), 47 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 6b8fa480..2108e515 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -568,32 +568,14 @@ out: static void childsighandler(const int sig); -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -/* Send all one way messages asynchronously so we can wait till the receiving - * end closes the socket to ensure all messages are received but no deadlocks - * can occur with 2 processes waiting for each other's socket closure. */ -void *async_send_proc(void *arg) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = (struct proc_message *)arg; - proc_instance_t *pi = pm->pi; - char *msg = pm->msg; - const char *file = pm->file; - const char *func = pm->func; - int line = pm->line; - char *path = pi->us.path; bool ret = false; int sockd; - pthread_detach(pthread_self()); - if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -604,16 +586,14 @@ void *async_send_proc(void *arg) } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) { + if (unlikely(!pi->pid)) pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - goto out_nofail; - } + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + return; } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s pid %d", - msg, pi->processname, pi->pid); + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); goto out; } sockd = open_unix_client(path); @@ -633,25 +613,6 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -out_nofail: - free(msg); - free(pm); - return NULL; -} - -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) -{ - struct proc_message *pm = ckalloc(sizeof(struct proc_message)); - pthread_t pth; - - pm->pi = pi; - pm->msg = strdup(msg); - pm->file = file; - pm->func = func; - pm->line = line; - create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then From 2865a0378fe7e60b5ac9dcc43f6f99421c4bb068 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:15:44 +1100 Subject: [PATCH 26/35] Keep track of per process ckwqs in the ckpool structure --- src/ckpool.h | 2 ++ src/stratifier.c | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ckpool.h b/src/ckpool.h index eaf166fd..a734258a 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -213,6 +213,8 @@ struct ckpool_instance { /* Private data for each process */ void *data; + /* Private generic workqueues if this process has them */ + ckwq_t *ckwqs; }; #ifdef USE_CKDB diff --git a/src/stratifier.c b/src/stratifier.c index 2956ad36..94c70c24 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4394,7 +4394,7 @@ int stratifier(proc_instance_t *pi) sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); /* Create as many generic workqueue threads as there are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN); - sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); From 428cabdfc4c8fe0cf3be17aef5033295eeffb50f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:26:10 +1100 Subject: [PATCH 27/35] Add an asynchronous send proc function which uses each process' generic workqueues if they exist --- src/ckpool.c | 35 +++++++++++++++++++++++++++++++++++ src/ckpool.h | 2 ++ 2 files changed, 37 insertions(+) diff --git a/src/ckpool.c b/src/ckpool.c index 2108e515..1d676aa7 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -615,6 +615,41 @@ out: } } +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) +{ + _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); + free(pm->msg); + free(pm); +} + +/* Fore sending asynchronous messages to another process, the sending process + * must have ckwqs of its own, referenced in the ckpool structure */ +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +{ + struct proc_message *pm; + + if (unlikely(!ckp->ckwqs)) { + LOGALERT("Workqueues not set up in async_send_proc!"); + _send_proc(pi, msg, file, func, line); + return; + } + pm = ckzalloc(sizeof(struct proc_message)); + pm->pi = pi; + pm->msg = strdup(msg); + pm->file = file; + pm->func = func; + pm->line = line; + ckwq_add(ckp->ckwqs, &asp_send, pm); +} + /* Send a single message to a process instance and retrieve the response, then * close the socket. */ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) diff --git a/src/ckpool.h b/src/ckpool.h index a734258a..74dc5e3e 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -239,6 +239,8 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) +void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); +#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); From cabc01d7cda3e674ec5a6e8e1a7e9d83b870098e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:36:46 +1100 Subject: [PATCH 28/35] Use asynchronous send_proc in the stratifier --- src/stratifier.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 94c70c24..c844c1dd 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -795,7 +795,7 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) set = true; } else set = false; - send_proc(ckp->generator, msg); + async_send_proc(ckp, ckp->generator, msg); if (set) sdata->gen_priority = 0; } @@ -928,7 +928,7 @@ static void drop_allclients(ckpool_t *ckp) client->dropped = true; kills++; sprintf(buf, "dropclient=%"PRId64, client_id); - send_proc(ckp->connector, buf); + async_send_proc(ckp, ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { disconnects++; @@ -3287,6 +3287,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp); static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { + ckpool_t *ckp = client->ckp; const char *method; char buf[256]; @@ -3332,7 +3333,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * to it since it's unauthorised. Set the flag just in case. */ client->authorised = false; snprintf(buf, 255, "passthrough=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + async_send_proc(ckp, client->ckp->connector, buf); return; } @@ -3340,7 +3341,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + async_send_proc(ckp, client->ckp->connector, buf); return; } @@ -3363,7 +3364,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + async_send_proc(ckp, client->ckp->connector, buf); return; } @@ -3395,12 +3396,13 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * { json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; + ckpool_t *ckp = client->ckp; char buf[256]; if (unlikely(client->reject == 2)) { LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + async_send_proc(ckp, ckp->connector, buf); goto out; } @@ -3534,7 +3536,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, 0); - send_proc(ckp->connector, s); + async_send_proc(ckp, ckp->connector, s); free(s); free_smsg(msg); } From a465e47f857058e2c6d7b2993d0b9869fed5e695 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:40:02 +1100 Subject: [PATCH 29/35] Use async send proc in the connector --- src/connector.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index f6d9f1ab..ece13099 100644 --- a/src/connector.c +++ b/src/connector.c @@ -97,6 +97,8 @@ struct connector_data { /* For protecting the pending sends list */ mutex_t sender_lock; pthread_cond_t sender_cond; + + ckwq_t *ckwqs; }; typedef struct connector_data cdata_t; @@ -242,7 +244,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) char buf[256]; sprintf(buf, "dropclient=%"PRId64, id); - send_proc(ckp->stratifier, buf); + async_send_proc(ckp, ckp->stratifier, buf); } /* Invalidate this instance. Remove them from the hashtables we look up @@ -361,9 +363,9 @@ reparse: * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) - send_proc(ckp->generator, s); + async_send_proc(ckp, ckp->generator, s); else - send_proc(ckp->stratifier, s); + async_send_proc(ckp, ckp->stratifier, s); } free(s); @@ -861,6 +863,8 @@ int connector(proc_instance_t *pi) LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; cdata->ckp = ckp; + /* Connector only requires one work queue */ + ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); if (!ckp->serverurls) cdata->serverfd = ckalloc(sizeof(int *)); From cada930aa8b612535d47616a07bc8e9288796520 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 11:42:39 +1100 Subject: [PATCH 30/35] Use async senc proc in the generator --- src/generator.c | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/generator.c b/src/generator.c index 90f054f4..233f25e6 100644 --- a/src/generator.c +++ b/src/generator.c @@ -132,6 +132,7 @@ struct generator_data { proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue + ckwq_t *ckwqs; }; typedef struct generator_data gdata_t; @@ -221,7 +222,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - send_proc(ckp->connector, alive ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -367,7 +368,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - send_proc(ckp->stratifier, blockmsg); + async_send_proc(ckp, ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -1297,22 +1298,22 @@ static void *proxy_recv(void *arg) if (ret < 1) { /* Send ourselves a reconnect message */ LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); break; } if (parse_method(proxi, cs->buf)) { if (proxi->notified) { - send_proc(ckp->stratifier, "notify"); + async_send_proc(ckp, ckp->stratifier, "notify"); proxi->notified = false; } if (proxi->diffed) { - send_proc(ckp->stratifier, "diff"); + async_send_proc(ckp, ckp->stratifier, "diff"); proxi->diffed = false; } if (proxi->reconnect) { proxi->reconnect = false; LOGWARNING("Reconnect issue, dropping existing connection"); - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); break; } continue; @@ -1402,13 +1403,13 @@ static void *passthrough_recv(void *arg) if (ret < 1) { /* Send ourselves a reconnect message */ LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); break; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - send_proc(ckp->connector, cs->buf); + async_send_proc(ckp, ckp->connector, cs->buf); } return NULL; } @@ -1523,8 +1524,8 @@ retry: } } if (!alive) { - send_proc(ckp->connector, "reject"); - send_proc(ckp->stratifier, "dropall"); + async_send_proc(ckp, ckp->connector, "reject"); + async_send_proc(ckp, ckp->stratifier, "dropall"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); sleep(5); goto retry; @@ -1543,7 +1544,7 @@ retry: create_pthread(&alive->pth_psend, proxy_send, alive); } out: - send_proc(ckp->connector, alive ? "accept" : "reject"); + async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -1552,8 +1553,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) notify_instance_t *ni, *tmp; connsock_t *cs; - send_proc(ckp->stratifier, "reconnect"); - send_proc(ckp->connector, "reject"); + async_send_proc(ckp, ckp->stratifier, "reconnect"); + async_send_proc(ckp, ckp->connector, "reject"); if (!proxi) // This shouldn't happen return; @@ -1603,8 +1604,8 @@ reconnect: /* We've just subscribed and authorised so tell the stratifier to * retrieve the first subscription. */ if (!ckp->passthrough) { - send_proc(ckp->stratifier, "subscribe"); - send_proc(ckp->stratifier, "notify"); + async_send_proc(ckp, ckp->stratifier, "subscribe"); + async_send_proc(ckp, ckp->stratifier, "notify"); proxi->notified = false; } @@ -1792,7 +1793,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) @@ -1838,7 +1839,7 @@ static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - send_proc(ckp->generator, "reconnect"); + async_send_proc(ckp, ckp->generator, "reconnect"); } @@ -1851,6 +1852,8 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; + /* Generator only requires one work queue */ + ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1); if (ckp->proxy) { gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); From 862e3df8215c68c9c1420decdd3e45d14cd84f5e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 17:07:34 +1100 Subject: [PATCH 31/35] Fix use of ckwq variables --- src/ckpool.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 1d676aa7..cf79794f 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -137,27 +137,27 @@ static void *ckmsg_queue(void *arg) /* Generic workqueue function and message receiving and parsing thread */ static void *ckwq_queue(void *arg) { - ckwq_t *ckmsgq = (ckwq_t *)arg; - ckpool_t *ckp = ckmsgq->ckp; + ckwq_t *ckwq = (ckwq_t *)arg; + ckpool_t *ckp = ckwq->ckp; pthread_detach(pthread_self()); - rename_proc(ckmsgq->name); + rename_proc(ckwq->name); while (42) { ckwqmsg_t *wqmsg; tv_t now; ts_t abs; - mutex_lock(ckmsgq->lock); + mutex_lock(ckwq->lock); tv_time(&now); tv_to_ts(&abs, &now); abs.tv_sec++; - if (!ckmsgq->wqmsgs) - cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); - wqmsg = ckmsgq->wqmsgs; + if (!ckwq->wqmsgs) + cond_timedwait(ckwq->cond, ckwq->lock, &abs); + wqmsg = ckwq->wqmsgs; if (wqmsg) - DL_DELETE(ckmsgq->wqmsgs, wqmsg); - mutex_unlock(ckmsgq->lock); + DL_DELETE(ckwq->wqmsgs, wqmsg); + mutex_unlock(ckwq->lock); if (!wqmsg) continue; @@ -209,7 +209,7 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) { - ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count); + ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count); mutex_t *lock; pthread_cond_t *cond; int i; From 21194cfbb427b6d6b75e43fefc6ea7e4af4f0d6c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 23 Feb 2015 20:21:05 +1100 Subject: [PATCH 32/35] Count dropped workers in _dec_instance_ref Conflicts: src/stratifier.c --- src/stratifier.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index c844c1dd..5752ef2f 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1223,11 +1223,15 @@ static void client_drop_message(const int64_t client_id, const int dropped, cons } } +static void dec_worker(ckpool_t *ckp, user_instance_t *instance); + /* Decrease the reference count of instance. */ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file, const char *func, const int line) { + user_instance_t *user = client->user_instance; int64_t client_id = client->id; + ckpool_t *ckp = client->ckp; int dropped = 0, ref; ck_wlock(&sdata->instance_lock); @@ -1235,7 +1239,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const /* See if there are any instances that were dropped that could not be * moved due to holding a reference and drop them now. */ if (unlikely(client->dropped && !ref)) - dropped = __drop_client(sdata, client, client->user_instance); + dropped = __drop_client(sdata, client, user); ck_wunlock(&sdata->instance_lock); client_drop_message(client_id, dropped, true); @@ -1243,6 +1247,9 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const /* This should never happen */ if (unlikely(ref < 0)) LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); + + if (dropped) + dec_worker(ckp, user); } #define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__) From 7fa3dc25298d648b113b7db001d5fcf7c32abf9d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 24 Feb 2015 10:32:51 +1100 Subject: [PATCH 33/35] Only dec worker if user exists --- src/stratifier.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 5752ef2f..b46bafc0 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1248,7 +1248,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const if (unlikely(ref < 0)) LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); - if (dropped) + if (dropped && user) dec_worker(ckp, user); } From d4304de798ea8d86d77c07f3b0fac0f03c7d3086 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 24 Feb 2015 23:46:34 +1100 Subject: [PATCH 34/35] These weren't the droids we were looking for. --- src/ckpool.c | 142 +++++++++++++---------------------------------- src/ckpool.h | 33 +---------- src/connector.c | 10 +--- src/generator.c | 37 ++++++------ src/stratifier.c | 95 +++++++++++++++---------------- 5 files changed, 109 insertions(+), 208 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index cf79794f..a1c4a514 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -134,39 +134,6 @@ static void *ckmsg_queue(void *arg) return NULL; } -/* Generic workqueue function and message receiving and parsing thread */ -static void *ckwq_queue(void *arg) -{ - ckwq_t *ckwq = (ckwq_t *)arg; - ckpool_t *ckp = ckwq->ckp; - - pthread_detach(pthread_self()); - rename_proc(ckwq->name); - - while (42) { - ckwqmsg_t *wqmsg; - tv_t now; - ts_t abs; - - mutex_lock(ckwq->lock); - tv_time(&now); - tv_to_ts(&abs, &now); - abs.tv_sec++; - if (!ckwq->wqmsgs) - cond_timedwait(ckwq->cond, ckwq->lock, &abs); - wqmsg = ckwq->wqmsgs; - if (wqmsg) - DL_DELETE(ckwq->wqmsgs, wqmsg); - mutex_unlock(ckwq->lock); - - if (!wqmsg) - continue; - wqmsg->func(ckp, wqmsg->data); - free(wqmsg); - } - return NULL; -} - ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func) { ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t)); @@ -207,29 +174,6 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons return ckmsgq; } -ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count) -{ - ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count); - mutex_t *lock; - pthread_cond_t *cond; - int i; - - lock = ckalloc(sizeof(mutex_t)); - cond = ckalloc(sizeof(pthread_cond_t)); - mutex_init(lock); - cond_init(cond); - - for (i = 0; i < count; i++) { - snprintf(ckwq[i].name, 15, "%.6swq%d", name, i); - ckwq[i].ckp = ckp; - ckwq[i].lock = lock; - ckwq[i].cond = cond; - create_pthread(&ckwq[i].pth, ckwq_queue, &ckwq[i]); - } - - return ckwq; -} - /* Generic function for adding messages to a ckmsgq linked list and signal the * ckmsgq parsing thread to wake up and process it. */ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) @@ -241,24 +185,10 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data) mutex_lock(ckmsgq->lock); ckmsgq->messages++; DL_APPEND(ckmsgq->msgs, msg); - pthread_cond_broadcast(ckmsgq->cond); + pthread_cond_signal(ckmsgq->cond); mutex_unlock(ckmsgq->lock); } -void ckwq_add(ckwq_t *ckwq, const void *func, void *data) -{ - ckwqmsg_t *wqmsg = ckalloc(sizeof(ckwqmsg_t)); - - wqmsg->func = func; - wqmsg->data = data; - - mutex_lock(ckwq->lock); - ckwq->messages++; - DL_APPEND(ckwq->wqmsgs, wqmsg); - pthread_cond_broadcast(ckwq->cond); - mutex_unlock(ckwq->lock); -} - /* Return whether there are any messages queued in the ckmsgq linked list. */ bool ckmsgq_empty(ckmsgq_t *ckmsgq) { @@ -568,14 +498,32 @@ out: static void childsighandler(const int sig); -/* Send a single message to a process instance when there will be no response, - * closing the socket immediately. */ -void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +struct proc_message { + proc_instance_t *pi; + char *msg; + const char *file; + const char *func; + int line; +}; + +/* Send all one way messages asynchronously so we can wait till the receiving + * end closes the socket to ensure all messages are received but no deadlocks + * can occur with 2 processes waiting for each other's socket closure. */ +void *async_send_proc(void *arg) { + struct proc_message *pm = (struct proc_message *)arg; + proc_instance_t *pi = pm->pi; + char *msg = pm->msg; + const char *file = pm->file; + const char *func = pm->func; + int line = pm->line; + char *path = pi->us.path; bool ret = false; int sockd; + pthread_detach(pthread_self()); + if (unlikely(!path || !strlen(path))) { LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; @@ -586,14 +534,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ - if (unlikely(!pi->pid)) + if (unlikely(!pi->pid)) { pi->pid = get_proc_pid(pi); - if (!pi->pid) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); - return; + if (!pi->pid) { + LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + goto out_nofail; + } } if (unlikely(kill_pid(pi->pid, 0))) { - LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); + LOGALERT("Attempting to send message %s to non existent process %s pid %d", + msg, pi->processname, pi->pid); goto out; } sockd = open_unix_client(path); @@ -613,41 +563,25 @@ out: LOGERR("Failure in send_proc from %s %s:%d", file, func, line); childsighandler(15); } -} - -struct proc_message { - proc_instance_t *pi; - char *msg; - const char *file; - const char *func; - int line; -}; - -static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm) -{ - _send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line); - free(pm->msg); +out_nofail: + free(msg); free(pm); + return NULL; } -/* Fore sending asynchronous messages to another process, the sending process - * must have ckwqs of its own, referenced in the ckpool structure */ -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) +/* Send a single message to a process instance when there will be no response, + * closing the socket immediately. */ +void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm; + struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + pthread_t pth; - if (unlikely(!ckp->ckwqs)) { - LOGALERT("Workqueues not set up in async_send_proc!"); - _send_proc(pi, msg, file, func, line); - return; - } - pm = ckzalloc(sizeof(struct proc_message)); pm->pi = pi; pm->msg = strdup(msg); pm->file = file; pm->func = func; pm->line = line; - ckwq_add(ckp->ckwqs, &asp_send, pm); + create_pthread(&pth, async_send_proc, pm); } /* Send a single message to a process instance and retrieve the response, then diff --git a/src/ckpool.h b/src/ckpool.h index 74dc5e3e..771ac692 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -22,22 +22,13 @@ struct ckpool_instance; typedef struct ckpool_instance ckpool_t; -typedef struct ckmsg ckmsg_t; - struct ckmsg { - ckmsg_t *next; - ckmsg_t *prev; + struct ckmsg *next; + struct ckmsg *prev; void *data; }; -typedef struct ckwqmsg ckwqmsg_t; - -struct ckwqmsg { - ckwqmsg_t *next; - ckwqmsg_t *prev; - void *data; - void (*func)(ckpool_t *, void *); -}; +typedef struct ckmsg ckmsg_t; struct ckmsgq { ckpool_t *ckp; @@ -52,18 +43,6 @@ struct ckmsgq { typedef struct ckmsgq ckmsgq_t; -struct ckwq { - ckpool_t *ckp; - char name[16]; - pthread_t pth; - mutex_t *lock; - pthread_cond_t *cond; - ckwqmsg_t *wqmsgs; - int64_t messages; -}; - -typedef struct ckwq ckwq_t; - struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -213,8 +192,6 @@ struct ckpool_instance { /* Private data for each process */ void *data; - /* Private generic workqueues if this process has them */ - ckwq_t *ckwqs; }; #ifdef USE_CKDB @@ -227,9 +204,7 @@ struct ckpool_instance { ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count); -ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); -void ckwq_add(ckwq_t *ckwq, const void *func, void *data); bool ckmsgq_empty(ckmsgq_t *ckmsgq); ckpool_t *global_ckp; @@ -239,8 +214,6 @@ void empty_buffer(connsock_t *cs); int read_socket_line(connsock_t *cs, const int timeout); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) -void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); -#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); diff --git a/src/connector.c b/src/connector.c index ece13099..f6d9f1ab 100644 --- a/src/connector.c +++ b/src/connector.c @@ -97,8 +97,6 @@ struct connector_data { /* For protecting the pending sends list */ mutex_t sender_lock; pthread_cond_t sender_cond; - - ckwq_t *ckwqs; }; typedef struct connector_data cdata_t; @@ -244,7 +242,7 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id) char buf[256]; sprintf(buf, "dropclient=%"PRId64, id); - async_send_proc(ckp, ckp->stratifier, buf); + send_proc(ckp->stratifier, buf); } /* Invalidate this instance. Remove them from the hashtables we look up @@ -363,9 +361,9 @@ reparse: * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) - async_send_proc(ckp, ckp->generator, s); + send_proc(ckp->generator, s); else - async_send_proc(ckp, ckp->stratifier, s); + send_proc(ckp->stratifier, s); } free(s); @@ -863,8 +861,6 @@ int connector(proc_instance_t *pi) LOGWARNING("%s connector starting", ckp->name); ckp->data = cdata; cdata->ckp = ckp; - /* Connector only requires one work queue */ - ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1); if (!ckp->serverurls) cdata->serverfd = ckalloc(sizeof(int *)); diff --git a/src/generator.c b/src/generator.c index 233f25e6..90f054f4 100644 --- a/src/generator.c +++ b/src/generator.c @@ -132,7 +132,6 @@ struct generator_data { proxy_instance_t *proxy_list; /* Linked list of all active proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue - ckwq_t *ckwqs; }; typedef struct generator_data gdata_t; @@ -222,7 +221,7 @@ retry: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: - async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -368,7 +367,7 @@ retry: ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); - async_send_proc(ckp, ckp->stratifier, blockmsg); + send_proc(ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); @@ -1298,22 +1297,22 @@ static void *proxy_recv(void *arg) if (ret < 1) { /* Send ourselves a reconnect message */ LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); break; } if (parse_method(proxi, cs->buf)) { if (proxi->notified) { - async_send_proc(ckp, ckp->stratifier, "notify"); + send_proc(ckp->stratifier, "notify"); proxi->notified = false; } if (proxi->diffed) { - async_send_proc(ckp, ckp->stratifier, "diff"); + send_proc(ckp->stratifier, "diff"); proxi->diffed = false; } if (proxi->reconnect) { proxi->reconnect = false; LOGWARNING("Reconnect issue, dropping existing connection"); - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); break; } continue; @@ -1403,13 +1402,13 @@ static void *passthrough_recv(void *arg) if (ret < 1) { /* Send ourselves a reconnect message */ LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); break; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ - async_send_proc(ckp, ckp->connector, cs->buf); + send_proc(ckp->connector, cs->buf); } return NULL; } @@ -1524,8 +1523,8 @@ retry: } } if (!alive) { - async_send_proc(ckp, ckp->connector, "reject"); - async_send_proc(ckp, ckp->stratifier, "dropall"); + send_proc(ckp->connector, "reject"); + send_proc(ckp->stratifier, "dropall"); LOGWARNING("Failed to connect to any servers as proxy, retrying in 5s!"); sleep(5); goto retry; @@ -1544,7 +1543,7 @@ retry: create_pthread(&alive->pth_psend, proxy_send, alive); } out: - async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject"); + send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } @@ -1553,8 +1552,8 @@ static void kill_proxy(ckpool_t *ckp, proxy_instance_t *proxi) notify_instance_t *ni, *tmp; connsock_t *cs; - async_send_proc(ckp, ckp->stratifier, "reconnect"); - async_send_proc(ckp, ckp->connector, "reject"); + send_proc(ckp->stratifier, "reconnect"); + send_proc(ckp->connector, "reject"); if (!proxi) // This shouldn't happen return; @@ -1604,8 +1603,8 @@ reconnect: /* We've just subscribed and authorised so tell the stratifier to * retrieve the first subscription. */ if (!ckp->passthrough) { - async_send_proc(ckp, ckp->stratifier, "subscribe"); - async_send_proc(ckp, ckp->stratifier, "notify"); + send_proc(ckp->stratifier, "subscribe"); + send_proc(ckp->stratifier, "notify"); proxi->notified = false; } @@ -1793,7 +1792,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) @@ -1839,7 +1838,7 @@ static void proxy_watchdog(ckpool_t *ckp, server_instance_t *cursi) break; } if (alive) - async_send_proc(ckp, ckp->generator, "reconnect"); + send_proc(ckp->generator, "reconnect"); } @@ -1852,8 +1851,6 @@ int generator(proc_instance_t *pi) LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; - /* Generator only requires one work queue */ - ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1); if (ckp->proxy) { gdata->srvchk = create_ckmsgq(ckp, "prxchk", &proxy_watchdog); ret = proxy_mode(ckp, pi); diff --git a/src/stratifier.c b/src/stratifier.c index b46bafc0..859edc6e 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -313,10 +313,12 @@ struct stratifier_data { char lasthash[68]; char lastswaphash[68]; - ckwq_t *ckwqs; // Generic workqueues ckmsgq_t *ssends; // Stratum sends + ckmsgq_t *srecvs; // Stratum receives ckmsgq_t *ckdbq; // ckdb + ckmsgq_t *sshareq; // Stratum share sends ckmsgq_t *sauthq; // Stratum authorisations + ckmsgq_t *stxnq; // Transaction requests int64_t user_instance_id; @@ -795,26 +797,38 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio) set = true; } else set = false; - async_send_proc(ckp, ckp->generator, msg); + send_proc(ckp->generator, msg); if (set) sdata->gen_priority = 0; } +struct update_req { + pthread_t *pth; + ckpool_t *ckp; + int prio; +}; + static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template * for generating work templates. */ -static void do_update(ckpool_t *ckp, int *prio) +static void *do_update(void *arg) { + struct update_req *ur = (struct update_req *)arg; + ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; + int prio = ur->prio; bool ret = false; workbase_t *wb; json_t *val; char *buf; - buf = send_recv_generator(ckp, "getbase", *prio); + pthread_detach(pthread_self()); + rename_proc("updater"); + + buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; @@ -874,17 +888,21 @@ out: LOGINFO("Broadcast ping due to failed stratum base update"); broadcast_ping(sdata); } - free(buf); - free(prio); + dealloc(buf); + free(ur->pth); + free(ur); + return NULL; } static void update_base(ckpool_t *ckp, const int prio) { - int *pprio = ckalloc(sizeof(int)); - sdata_t *sdata = ckp->data; + struct update_req *ur = ckalloc(sizeof(struct update_req)); + pthread_t *pth = ckalloc(sizeof(pthread_t)); - *pprio = prio; - ckwq_add(sdata->ckwqs, &do_update, pprio); + ur->pth = pth; + ur->ckp = ckp; + ur->prio = prio; + create_pthread(pth, do_update, ur); } static void __kill_instance(stratum_instance_t *client) @@ -928,7 +946,7 @@ static void drop_allclients(ckpool_t *ckp) client->dropped = true; kills++; sprintf(buf, "dropclient=%"PRId64, client_id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { disconnects++; @@ -1633,21 +1651,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } -static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val) -{ - int objects, generated; - int64_t memsize; - ckwqmsg_t *wqmsg; - - mutex_lock(ckwq->lock); - DL_COUNT(ckwq->wqmsgs, wqmsg, objects); - generated = ckwq->messages; - mutex_unlock(ckwq->lock); - - memsize = (sizeof(ckwqmsg_t) + size) * objects; - JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); -} - static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; @@ -1694,14 +1697,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval); json_set_object(val, "ssends", subval); - /* Don't know exactly how big the string is so just count the pointer for now */ - ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval); - json_set_object(val, "ckwqs", subval); + ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); + json_set_object(val, "srecvs", subval); if (!CKP_STANDALONE(ckp)) { ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); json_set_object(val, "ckdbq", subval); } + ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); + json_set_object(val, "stxnq", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); @@ -1709,8 +1713,6 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) return buf; } -static void srecv_process(ckpool_t *ckp, char *buf); - static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret = 0; @@ -1765,7 +1767,7 @@ retry: /* The bulk of the messages will be received json from the * connector so look for this first. The srecv_process frees * the buf heap ram */ - ckwq_add(sdata->ckwqs, &srecv_process, buf); + ckmsgq_add(sdata->srecvs, buf); Close(sockd); buf = NULL; goto retry; @@ -3287,14 +3289,10 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } -static void sshare_process(ckpool_t *ckp, json_params_t *jp); -static void send_transactions(ckpool_t *ckp, json_params_t *jp); - /* Enter with client holding ref count */ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, json_t *params_val, const char *address) { - ckpool_t *ckp = client->ckp; const char *method; char buf[256]; @@ -3305,7 +3303,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (likely(cmdmatch(method, "mining.submit") && client->authorised)) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &sshare_process, jp); + ckmsgq_add(sdata->sshareq, jp); return; } @@ -3340,7 +3338,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * to it since it's unauthorised. Set the flag just in case. */ client->authorised = false; snprintf(buf, 255, "passthrough=%"PRId64, client_id); - async_send_proc(ckp, client->ckp->connector, buf); + send_proc(client->ckp->connector, buf); return; } @@ -3348,7 +3346,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (!client->subscribed) { LOGINFO("Dropping unsubscribed client %"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - async_send_proc(ckp, client->ckp->connector, buf); + send_proc(client->ckp->connector, buf); return; } @@ -3371,7 +3369,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 * the stratum instance data. Clients will just reconnect. */ LOGINFO("Dropping unauthorised client %"PRId64, client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - async_send_proc(ckp, client->ckp->connector, buf); + send_proc(client->ckp->connector, buf); return; } @@ -3384,7 +3382,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckwq_add(sdata->ckwqs, &send_transactions, jp); + ckmsgq_add(sdata->stxnq, jp); return; } /* Unhandled message here */ @@ -3403,13 +3401,12 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * { json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; - ckpool_t *ckp = client->ckp; char buf[256]; if (unlikely(client->reject == 2)) { LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); snprintf(buf, 255, "dropclient=%"PRId64, client_id); - async_send_proc(ckp, ckp->connector, buf); + send_proc(client->ckp->connector, buf); goto out; } @@ -3543,7 +3540,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg) * connector process to be delivered */ json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id)); s = json_dumps(msg->json_msg, 0); - async_send_proc(ckp, ckp->connector, s); + send_proc(ckp->connector, s); free(s); free_smsg(msg); } @@ -4401,10 +4398,14 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->ckdb_lock); sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); - /* Create as many generic workqueue threads as there are CPUs */ - threads = sysconf(_SC_NPROCESSORS_ONLN); - ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads); + /* Create half as many share processing threads as there are CPUs */ + threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; + sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); + /* Create 1/4 as many stratum processing threads as there are CPUs */ + threads = threads / 2 ? : 1; + sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) { sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); From aa7c479c0302267b29059013cd2aea0c7242b032 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 25 Feb 2015 15:08:41 +1100 Subject: [PATCH 35/35] Cope with null message in send_proc --- src/ckpool.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index a1c4a514..1ffe45f4 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -528,10 +528,6 @@ void *async_send_proc(void *arg) LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); goto out; } - if (unlikely(!msg || !strlen(msg))) { - LOGERR("Attempted to send null message to socket %s in send_proc", path); - goto out; - } /* At startup the pid fields are not set up before some processes are * forked so they never inherit them. */ if (unlikely(!pi->pid)) { @@ -573,9 +569,14 @@ out_nofail: * closing the socket immediately. */ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) { - struct proc_message *pm = ckalloc(sizeof(struct proc_message)); + struct proc_message *pm; pthread_t pth; + if (unlikely(!msg || !strlen(msg))) { + LOGERR("Attempted to send null message to %s in send_proc", pi->processname); + return; + } + pm = ckalloc(sizeof(struct proc_message)); pm->pi = pi; pm->msg = strdup(msg); pm->file = file;