kanoi 10 years ago
parent
commit
221d0df3b4
  1. 2
      configure.ac
  2. 4
      src/ckdb.c
  3. 2
      src/ckdb.h
  4. 131
      src/ckpool.c
  5. 4
      src/ckpool.h
  6. 46
      src/connector.c
  7. 10
      src/generator.c
  8. 231
      src/libckpool.c
  9. 68
      src/libckpool.h
  10. 29
      src/stratifier.c

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.8.5, kernel@kolivas.org) AC_INIT(ckpool, 0.8.6, kernel@kolivas.org)
AC_CANONICAL_SYSTEM AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_MACRO_DIR([m4])

4
src/ckdb.c

@ -301,7 +301,7 @@ K_STORE *logqueue_store;
// WORKQUEUE // WORKQUEUE
K_LIST *workqueue_free; K_LIST *workqueue_free;
K_STORE *workqueue_store; K_STORE *workqueue_store;
pthread_mutex_t wq_waitlock; mutex_t wq_waitlock;
pthread_cond_t wq_waitcond; pthread_cond_t wq_waitcond;
// HEARTBEATQUEUE // HEARTBEATQUEUE
@ -3286,7 +3286,7 @@ static void *listener(void *arg)
timeraddspec(&abs, &tsdiff); timeraddspec(&abs, &tsdiff);
mutex_lock(&wq_waitlock); mutex_lock(&wq_waitlock);
pthread_cond_timedwait(&wq_waitcond, &wq_waitlock, &abs); cond_timedwait(&wq_waitcond, &wq_waitlock, &abs);
mutex_unlock(&wq_waitlock); mutex_unlock(&wq_waitlock);
} }
} }

2
src/ckdb.h

@ -659,7 +659,7 @@ typedef struct workqueue {
extern K_LIST *workqueue_free; extern K_LIST *workqueue_free;
extern K_STORE *workqueue_store; extern K_STORE *workqueue_store;
extern pthread_mutex_t wq_waitlock; extern mutex_t wq_waitlock;
extern pthread_cond_t wq_waitcond; extern pthread_cond_t wq_waitcond;
// HEARTBEATQUEUE // HEARTBEATQUEUE

131
src/ckpool.c

@ -120,7 +120,7 @@ static void *ckmsg_queue(void *arg)
tv_to_ts(&abs, &now); tv_to_ts(&abs, &now);
abs.tv_sec++; abs.tv_sec++;
if (!ckmsgq->msgs) if (!ckmsgq->msgs)
pthread_cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs); cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
msg = ckmsgq->msgs; msg = ckmsgq->msgs;
if (msg) if (msg)
DL_DELETE(ckmsgq->msgs, msg); DL_DELETE(ckmsgq->msgs, msg);
@ -141,7 +141,7 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
strncpy(ckmsgq->name, name, 15); strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func; ckmsgq->func = func;
ckmsgq->ckp = ckp; ckmsgq->ckp = ckp;
ckmsgq->lock = ckalloc(sizeof(pthread_mutex_t)); ckmsgq->lock = ckalloc(sizeof(mutex_t));
ckmsgq->cond = ckalloc(sizeof(pthread_cond_t)); ckmsgq->cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(ckmsgq->lock); mutex_init(ckmsgq->lock);
cond_init(ckmsgq->cond); cond_init(ckmsgq->cond);
@ -153,11 +153,11 @@ ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count) ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, const int count)
{ {
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count); ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t) * count);
pthread_mutex_t *lock; mutex_t *lock;
pthread_cond_t *cond; pthread_cond_t *cond;
int i; int i;
lock = ckalloc(sizeof(pthread_mutex_t)); lock = ckalloc(sizeof(mutex_t));
cond = ckalloc(sizeof(pthread_cond_t)); cond = ckalloc(sizeof(pthread_cond_t));
mutex_init(lock); mutex_init(lock);
cond_init(cond); cond_init(cond);
@ -237,7 +237,25 @@ static int pid_wait(const pid_t pid, const int ms)
return ret; return ret;
} }
static int send_procmsg(const proc_instance_t *pi, const char *buf) static int get_proc_pid(const proc_instance_t *pi)
{
int ret, pid = 0;
char path[256];
FILE *fp;
sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname);
fp = fopen(path, "re");
if (!fp)
goto out;
ret = fscanf(fp, "%d", &pid);
if (ret < 1)
pid = 0;
fclose(fp);
out:
return pid;
}
static int send_procmsg(proc_instance_t *pi, const char *buf)
{ {
char *path = pi->us.path; char *path = pi->us.path;
int ret = -1; int ret = -1;
@ -251,6 +269,12 @@ static int send_procmsg(const proc_instance_t *pi, const char *buf)
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out; goto out;
} }
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) { if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname); LOGALERT("Attempting to send message %s to dead process %s", buf, pi->processname);
goto out; goto out;
@ -419,9 +443,9 @@ int read_socket_line(connsock_t *cs, const int timeout)
char *newbuf; char *newbuf;
ret = wait_read_select(fd, eom ? 0 : timeout); ret = wait_read_select(fd, eom ? 0 : timeout);
if (eom && !ret)
break;
if (ret < 1) { if (ret < 1) {
if (eom)
break;
if (!ret) if (!ret)
LOGDEBUG("Select timed out in read_socket_line"); LOGDEBUG("Select timed out in read_socket_line");
else else
@ -431,7 +455,7 @@ int read_socket_line(connsock_t *cs, const int timeout)
ret = recv(fd, readbuf, PAGESIZE - 4, 0); ret = recv(fd, readbuf, PAGESIZE - 4, 0);
if (ret < 1) { if (ret < 1) {
/* Closed socket after valid message */ /* Closed socket after valid message */
if (!ret && eom) if (eom)
break; break;
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
ret = -1; ret = -1;
@ -474,46 +498,48 @@ out:
static void childsighandler(const int sig); static void childsighandler(const int sig);
static int get_proc_pid(const proc_instance_t *pi) struct proc_message {
{ proc_instance_t *pi;
int ret, pid = 0; char *msg;
char path[256]; const char *file;
FILE *fp; const char *func;
int line;
sprintf(path, "%s%s.pid", pi->ckp->socket_dir, pi->processname); };
fp = fopen(path, "re");
if (!fp)
goto out;
ret = fscanf(fp, "%d", &pid);
if (ret < 1)
pid = 0;
fclose(fp);
out:
return pid;
}
/* Send a single message to a process instance when there will be no response, /* Send all one way messages asynchronously so we can wait till the receiving
* closing the socket immediately. */ * end closes the socket to ensure all messages are received but no deadlocks
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) * can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
{ {
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path; char *path = pi->us.path;
bool ret = false; bool ret = false;
int sockd; int sockd;
pthread_detach(pthread_self());
if (unlikely(!path || !strlen(path))) { if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out; goto out;
} }
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out;
}
/* At startup the pid fields are not set up before some processes are /* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */ * forked so they never inherit them. */
if (unlikely(!pi->pid)) if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi); pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out_nofail;
}
}
if (unlikely(kill_pid(pi->pid, 0))) { if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname); LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
goto out; goto out;
} }
sockd = open_unix_client(path); sockd = open_unix_client(path);
@ -525,13 +551,38 @@ bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
LOGWARNING("Failed to send %s to socket %s", msg, path); LOGWARNING("Failed to send %s to socket %s", msg, path);
else else
ret = true; ret = true;
if (!wait_close(sockd, 5))
LOGWARNING("send_proc %s did not detect close from %s %s:%d", msg, file, func, line);
Close(sockd); Close(sockd);
out: out:
if (unlikely(!ret)) { if (unlikely(!ret)) {
LOGERR("Failure in send_proc from %s %s:%d", file, func, line); LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15); childsighandler(15);
} }
return ret; out_nofail:
free(msg);
free(pm);
return NULL;
}
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm;
pthread_t pth;
if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname);
return;
}
pm = ckalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
create_pthread(&pth, async_send_proc, pm);
} }
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then
@ -549,7 +600,15 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
LOGERR("Attempted to send null message to socket %s in send_proc", path); LOGERR("Attempted to send null message to socket %s in send_proc", path);
goto out; goto out;
} }
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid)
goto out;
}
if (unlikely(kill_pid(pi->pid, 0))) { if (unlikely(kill_pid(pi->pid, 0))) {
/* Reset the pid value in case we are still looking for an old
* process */
pi->pid = 0;
LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname); LOGALERT("Attempting to send message %s to dead process %s", msg, pi->processname);
goto out; goto out;
} }
@ -1202,6 +1261,8 @@ static proc_instance_t *prepare_child(ckpool_t *ckp, int (*process)(), char *nam
pi->process = process; pi->process = process;
create_process_unixsock(pi); create_process_unixsock(pi);
manage_old_child(ckp, pi); manage_old_child(ckp, pi);
/* Remove the old pid file if we've succeeded in coming this far */
rm_namepid(pi);
return pi; return pi;
} }

4
src/ckpool.h

@ -34,7 +34,7 @@ struct ckmsgq {
ckpool_t *ckp; ckpool_t *ckp;
char name[16]; char name[16];
pthread_t pth; pthread_t pth;
pthread_mutex_t *lock; mutex_t *lock;
pthread_cond_t *cond; pthread_cond_t *cond;
ckmsg_t *msgs; ckmsg_t *msgs;
void (*func)(ckpool_t *, void *); void (*func)(ckpool_t *, void *);
@ -212,7 +212,7 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout); int read_socket_line(connsock_t *cs, const int timeout);
bool _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)

46
src/connector.c

@ -95,7 +95,7 @@ struct connector_data {
int64_t sends_delayed; int64_t sends_delayed;
/* For protecting the pending sends list */ /* For protecting the pending sends list */
pthread_mutex_t sender_lock; mutex_t sender_lock;
pthread_cond_t sender_cond; pthread_cond_t sender_cond;
}; };
@ -466,22 +466,22 @@ void *sender(void *arg)
rename_proc("csender"); rename_proc("csender");
while (42) { while (42) {
sender_send_t *sender_send; sender_send_t *sender_send, *delayed;
client_instance_t *client; client_instance_t *client;
int ret, fd, ofs = 0; int ret = 0, fd, ofs = 0;
mutex_lock(&cdata->sender_lock); mutex_lock(&cdata->sender_lock);
/* Poll every 100ms if there are no new sends. Re-examine /* Poll every 10ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case * delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the * endless new sends more frequently end up starving the
* delayed sends. */ * delayed sends. */
if (!cdata->sender_sends && !sent) { if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 100000000}; const ts_t polltime = {0, 10000000};
ts_t timeout_ts; ts_t timeout_ts;
ts_realtime(&timeout_ts); ts_realtime(&timeout_ts);
timeraddspec(&timeout_ts, &polltime); timeraddspec(&timeout_ts, &polltime);
pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts);
} }
sender_send = cdata->sender_sends; sender_send = cdata->sender_sends;
if (sender_send) if (sender_send)
@ -494,27 +494,33 @@ void *sender(void *arg)
* conditional with no new sends appearing or have just * conditional with no new sends appearing or have just
* serviced another message successfully. */ * serviced another message successfully. */
if (!sender_send) { if (!sender_send) {
if (!cdata->delayed_sends) /* Find a delayed client that needs servicing and set
* ret accordingly. We do not need to use FOREACH_SAFE
* as we break out of the loop as soon as we manipuate
* the list. */
DL_FOREACH(cdata->delayed_sends, delayed) {
if ((ret = wait_write_select(delayed->client->fd, 0))) {
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
break;
}
}
/* None found ? */
if (!sender_send)
continue; continue;
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
} }
client = sender_send->client; client = sender_send->client;
ck_rlock(&cdata->lock);
fd = client->fd;
ck_runlock(&cdata->lock);
if (fd == -1) {
LOGDEBUG("Discarding message sent to invalidated client");
goto contfree;
}
/* If this socket is not ready to receive data from us, put the /* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout * send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll * to poll to either look for a client that is ready or poll
* select on this one */ * select on this one */
ret = wait_write_select(fd, 0); ck_rlock(&cdata->lock);
fd = client->fd;
if (!ret)
ret = wait_write_select(fd, 0);
ck_runlock(&cdata->lock);
if (ret < 1) { if (ret < 1) {
if (ret < 0) { if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
@ -530,7 +536,6 @@ void *sender(void *arg)
cdata->sends_delayed++; cdata->sends_delayed++;
continue; continue;
} }
sent = true;
while (sender_send->len) { while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) { if (unlikely(ret < 0)) {
@ -542,6 +547,7 @@ void *sender(void *arg)
sender_send->len -= ret; sender_send->len -= ret;
} }
contfree: contfree:
sent = true;
free(sender_send->buf); free(sender_send->buf);
free(sender_send); free(sender_send);
dec_instance_ref(cdata, client); dec_instance_ref(cdata, client);

10
src/generator.c

@ -106,18 +106,18 @@ struct proxy_instance {
bool diffed; /* Received new diff */ bool diffed; /* Received new diff */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
pthread_mutex_t notify_lock; mutex_t notify_lock;
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
notify_instance_t *current_notify; notify_instance_t *current_notify;
pthread_t pth_precv; pthread_t pth_precv;
pthread_t pth_psend; pthread_t pth_psend;
pthread_mutex_t psend_lock; mutex_t psend_lock;
pthread_cond_t psend_cond; pthread_cond_t psend_cond;
stratum_msg_t *psends; stratum_msg_t *psends;
pthread_mutex_t share_lock; mutex_t share_lock;
share_msg_t *shares; share_msg_t *shares;
int64_t share_id; int64_t share_id;
@ -128,7 +128,7 @@ struct proxy_instance {
/* Private data for the generator */ /* Private data for the generator */
struct generator_data { struct generator_data {
pthread_mutex_t lock; /* Lock protecting linked lists */ mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxy_list; /* Linked list of all active proxies */ proxy_instance_t *proxy_list; /* Linked list of all active proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
@ -1344,7 +1344,7 @@ static void *proxy_send(void *arg)
mutex_lock(&proxi->psend_lock); mutex_lock(&proxi->psend_lock);
if (!proxi->psends) if (!proxi->psends)
pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock); cond_wait(&proxi->psend_cond, &proxi->psend_lock);
msg = proxi->psends; msg = proxi->psends;
if (likely(msg)) if (likely(msg))
DL_DELETE(proxi->psends, msg); DL_DELETE(proxi->psends, msg);

231
src/libckpool.c

@ -114,42 +114,96 @@ bool ck_completion_timeout(void *fn, void *fnarg, int timeout)
return !ret; return !ret;
} }
int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line)
{
int ret;
ret = pthread_cond_wait(cond, &lock->mutex);
lock->file = file;
lock->func = func;
lock->line = line;
return ret;
}
int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line)
{
int ret;
ret = pthread_cond_timedwait(cond, &lock->mutex, abstime);
lock->file = file;
lock->func = func;
lock->line = line;
return ret;
}
/* Place holders for when we add lock debugging */
#define GETLOCK(_lock, _file, _func, _line)
#define GOTLOCK(_lock, _file, _func, _line)
#define TRYLOCK(_lock, _file, _func, _line)
#define DIDLOCK(_ret, _lock, _file, _func, _line)
#define GUNLOCK(_lock, _file, _func, _line)
#define INITLOCK(_typ, _lock, _file, _func, _line)
void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line) int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line)
{ {
GETLOCK(lock, file, func, line); tv_t now;
if (unlikely(pthread_mutex_lock(lock))) ts_t abs;
int ret;
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec += timeout;
ret = pthread_mutex_timedlock(&lock->mutex, &abs);
if (!ret) {
lock->file = file;
lock->func = func;
lock->line = line;
}
return ret;
}
/* Make every locking attempt warn if we're unable to get the lock for more
* than 10 seconds and fail if we can't get it for longer than a minute. */
void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line)
{
int ret, retries = 0;
retry:
ret = _mutex_timedlock(lock, 10, file, func, line);
if (unlikely(ret)) {
if (likely(ret == ETIMEDOUT)) {
LOGERR("WARNING: Prolonged mutex lock contention from %s %s:%d, held by %s %s:%d",
file, func, line, lock->file, lock->func, lock->line);
if (++retries < 6)
goto retry;
quitfrom(1, file, func, line, "FAILED TO GRAB MUTEX!");
}
quitfrom(1, file, func, line, "WTF MUTEX ERROR ON LOCK!"); quitfrom(1, file, func, line, "WTF MUTEX ERROR ON LOCK!");
GOTLOCK(lock, file, func, line); }
} }
void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line) /* Does not unset lock->file/func/line since they're only relevant when the lock is held */
void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line)
{ {
if (unlikely(pthread_mutex_unlock(lock))) if (unlikely(pthread_mutex_unlock(&lock->mutex)))
quitfrom(1, file, func, line, "WTF MUTEX ERROR ON UNLOCK!"); quitfrom(1, file, func, line, "WTF MUTEX ERROR ON UNLOCK!");
GUNLOCK(lock, file, func, line);
} }
int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line)
{ {
int ret; int ret;
TRYLOCK(lock, file, func, line); ret = pthread_mutex_trylock(&lock->mutex);
ret = pthread_mutex_trylock(lock); if (!ret) {
DIDLOCK(ret, lock, file, func, line); lock->file = file;
lock->func = func;
lock->line = line;
}
return ret; return ret;
} }
int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) void mutex_destroy(mutex_t *lock)
{
pthread_mutex_destroy(&lock->mutex);
}
static int wr_timedlock(pthread_rwlock_t *lock, int timeout)
{ {
tv_t now; tv_t now;
ts_t abs; ts_t abs;
@ -159,79 +213,108 @@ int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const ch
tv_to_ts(&abs, &now); tv_to_ts(&abs, &now);
abs.tv_sec += timeout; abs.tv_sec += timeout;
TRYLOCK(lock, file, func, line); ret = pthread_rwlock_timedwrlock(lock, &abs);
ret = pthread_mutex_timedlock(lock, &abs);
DIDLOCK(ret, lock, file, func, line);
return ret; return ret;
} }
void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
GETLOCK(lock, file, func, line); int ret, retries = 0;
if (unlikely(pthread_rwlock_wrlock(lock)))
quitfrom(1, file, func, line, "WTF WRLOCK ERROR ON LOCK!"); retry:
GOTLOCK(lock, file, func, line); ret = wr_timedlock(&lock->rwlock, 10);
if (unlikely(ret)) {
if (likely(ret == ETIMEDOUT)) {
LOGERR("WARNING: Prolonged write lock contention from %s %s:%d, held by %s %s:%d",
file, func, line, lock->file, lock->func, lock->line);
if (++retries < 6)
goto retry;
quitfrom(1, file, func, line, "FAILED TO GRAB WRITE LOCK!");
}
quitfrom(1, file, func, line, "WTF ERROR ON WRITE LOCK!");
}
lock->file = file;
lock->func = func;
lock->line = line;
} }
int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line) int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line)
{ {
TRYLOCK(lock, file, func, line); int ret = pthread_rwlock_trywrlock(&lock->rwlock);
int ret = pthread_rwlock_trywrlock(lock);
DIDLOCK(ret, lock, file, func, line); if (!ret) {
lock->file = file;
lock->func = func;
lock->line = line;
}
return ret; return ret;
} }
void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) static int rd_timedlock(pthread_rwlock_t *lock, int timeout)
{ {
GETLOCK(lock, file, func, line); tv_t now;
if (unlikely(pthread_rwlock_rdlock(lock))) ts_t abs;
quitfrom(1, file, func, line, "WTF RDLOCK ERROR ON LOCK!"); int ret;
GOTLOCK(lock, file, func, line);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec += timeout;
ret = pthread_rwlock_timedrdlock(lock, &abs);
return ret;
} }
void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
if (unlikely(pthread_rwlock_unlock(lock))) int ret, retries = 0;
quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!");
GUNLOCK(lock, file, func, line); retry:
ret = rd_timedlock(&lock->rwlock, 10);
if (unlikely(ret)) {
if (likely(ret == ETIMEDOUT)) {
LOGERR("WARNING: Prolonged read lock contention from %s %s:%d, held by %s %s:%d",
file, func, line, lock->file, lock->func, lock->line);
if (++retries < 6)
goto retry;
quitfrom(1, file, func, line, "FAILED TO GRAB READ LOCK!");
}
quitfrom(1, file, func, line, "WTF ERROR ON READ LOCK!");
}
lock->file = file;
lock->func = func;
lock->line = line;
} }
void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
_rw_unlock(lock, file, func, line); if (unlikely(pthread_rwlock_unlock(&lock->rwlock)))
quitfrom(1, file, func, line, "WTF RWLOCK ERROR ON UNLOCK!");
} }
void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line) void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
_rw_unlock(lock, file, func, line); _rw_unlock(lock, file, func, line);
} }
void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line) void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
if (unlikely(pthread_mutex_init(lock, NULL))) _rw_unlock(lock, file, func, line);
quitfrom(1, file, func, line, "Failed to pthread_mutex_init");
INITLOCK(lock, CGLOCK_MUTEX, file, func, line);
} }
void mutex_destroy(pthread_mutex_t *lock) void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line)
{ {
/* Ignore return code. This only invalidates the mutex on linux but if (unlikely(pthread_mutex_init(&lock->mutex, NULL)))
* releases resources on windows. */ quitfrom(1, file, func, line, "Failed to pthread_mutex_init");
pthread_mutex_destroy(lock);
} }
void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line) void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line)
{ {
if (unlikely(pthread_rwlock_init(lock, NULL))) if (unlikely(pthread_rwlock_init(&lock->rwlock, NULL)))
quitfrom(1, file, func, line, "Failed to pthread_rwlock_init"); quitfrom(1, file, func, line, "Failed to pthread_rwlock_init");
INITLOCK(lock, CGLOCK_RW, file, func, line);
} }
void rwlock_destroy(pthread_rwlock_t *lock)
{
pthread_rwlock_destroy(lock);
}
void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line) void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line)
{ {
@ -245,11 +328,6 @@ void _cklock_init(cklock_t *lock, const char *file, const char *func, const int
_rwlock_init(&lock->rwlock, file, func, line); _rwlock_init(&lock->rwlock, file, func, line);
} }
void cklock_destroy(cklock_t *lock)
{
rwlock_destroy(&lock->rwlock);
mutex_destroy(&lock->mutex);
}
/* Read lock variant of cklock. Cannot be promoted. */ /* Read lock variant of cklock. Cannot be promoted. */
void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line) void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line)
@ -317,6 +395,13 @@ void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int l
_mutex_unlock(&lock->mutex, file, func, line); _mutex_unlock(&lock->mutex, file, func, line);
} }
void cklock_destroy(cklock_t *lock)
{
pthread_rwlock_destroy(&lock->rwlock.rwlock);
pthread_mutex_destroy(&lock->mutex.mutex);
}
void _cksem_init(sem_t *sem, const char *file, const char *func, const int line) void _cksem_init(sem_t *sem, const char *file, const char *func, const int line)
{ {
int ret; int ret;
@ -825,6 +910,24 @@ out:
return sockd; return sockd;
} }
/* Wait till a socket has been closed at the other end */
int wait_close(int sockd, int timeout)
{
struct pollfd sfd;
int ret;
if (unlikely(sockd < 0))
return -1;
sfd.fd = sockd;
sfd.events = POLLIN;
sfd.revents = 0;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
if (ret < 1)
return 0;
return sfd.revents & POLLHUP;
}
/* Emulate a select read wait for high fds that select doesn't support */ /* Emulate a select read wait for high fds that select doesn't support */
int wait_read_select(int sockd, int timeout) int wait_read_select(int sockd, int timeout)
{ {

68
src/libckpool.h

@ -145,11 +145,13 @@ static inline void flip_80(void *dest_p, const void *src_p)
dest[i] = bswap_32(src[i]); dest[i] = bswap_32(src[i]);
} }
#define cond_wait(_cond, _lock) _cond_wait(_cond, _lock, __FILE__, __func__, __LINE__)
#define cond_timedwait(_cond, _lock, _abstime) _cond_timedwait(_cond, _lock, _abstime, __FILE__, __func__, __LINE__)
#define mutex_timedlock(_lock, _timeout) _mutex_timedlock(_lock, _timeout, __FILE__, __func__, __LINE__)
#define mutex_lock(_lock) _mutex_lock(_lock, __FILE__, __func__, __LINE__) #define mutex_lock(_lock) _mutex_lock(_lock, __FILE__, __func__, __LINE__)
#define mutex_unlock_noyield(_lock) _mutex_unlock_noyield(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock_noyield(_lock) _mutex_unlock_noyield(_lock, __FILE__, __func__, __LINE__)
#define mutex_unlock(_lock) _mutex_unlock(_lock, __FILE__, __func__, __LINE__) #define mutex_unlock(_lock) _mutex_unlock(_lock, __FILE__, __func__, __LINE__)
#define mutex_trylock(_lock) _mutex_trylock(_lock, __FILE__, __func__, __LINE__) #define mutex_trylock(_lock) _mutex_trylock(_lock, __FILE__, __func__, __LINE__)
#define mutex_timedlock(_lock, timeout) _mutex_timedlock(_lock, timeout, __FILE__, __func__, __LINE__)
#define wr_lock(_lock) _wr_lock(_lock, __FILE__, __func__, __LINE__) #define wr_lock(_lock) _wr_lock(_lock, __FILE__, __func__, __LINE__)
#define wr_trylock(_lock) _wr_trylock(_lock, __FILE__, __func__, __LINE__) #define wr_trylock(_lock) _wr_trylock(_lock, __FILE__, __func__, __LINE__)
#define rd_lock(_lock) _rd_lock(_lock, __FILE__, __func__, __LINE__) #define rd_lock(_lock) _rd_lock(_lock, __FILE__, __func__, __LINE__)
@ -275,10 +277,31 @@ static const char __maybe_unused *share_errs[] = {
#define SHARE_ERR(x) share_errs[((x) + 9)] #define SHARE_ERR(x) share_errs[((x) + 9)]
/* ck locks, a write biased variant of rwlocks */ typedef struct ckmutex mutex_t;
struct cklock {
struct ckmutex {
pthread_mutex_t mutex; pthread_mutex_t mutex;
const char *file;
const char *func;
int line;
};
typedef struct ckrwlock rwlock_t;
struct ckrwlock {
pthread_rwlock_t rwlock; pthread_rwlock_t rwlock;
const char *file;
const char *func;
int line;
};
/* ck locks, a write biased variant of rwlocks */
struct cklock {
mutex_t mutex;
rwlock_t rwlock;
const char *file;
const char *func;
int line;
}; };
typedef struct cklock cklock_t; typedef struct cklock cklock_t;
@ -386,27 +409,28 @@ void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg
void join_pthread(pthread_t thread); void join_pthread(pthread_t thread);
bool ck_completion_timeout(void *fn, void *fnarg, int timeout); bool ck_completion_timeout(void *fn, void *fnarg, int timeout);
void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line); int _cond_wait(pthread_cond_t *cond, mutex_t *lock, const char *file, const char *func, const int line);
void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line); int _cond_timedwait(pthread_cond_t *cond, mutex_t *lock, const struct timespec *abstime, const char *file, const char *func, const int line);
void _mutex_unlock(pthread_mutex_t *lock, const char *file, const char *func, const int line); int _mutex_timedlock(mutex_t *lock, int timeout, const char *file, const char *func, const int line);
int _mutex_trylock(pthread_mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); void _mutex_lock(mutex_t *lock, const char *file, const char *func, const int line);
int _mutex_timedlock(pthread_mutex_t *lock, int timeout, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); void _mutex_unlock_noyield(mutex_t *lock, const char *file, const char *func, const int line);
void _wr_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _mutex_unlock(mutex_t *lock, const char *file, const char *func, const int line);
int _wr_trylock(pthread_rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line); int _mutex_trylock(mutex_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line);
void _rd_lock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void mutex_destroy(mutex_t *lock);
void _rw_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line);
void _rd_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _wr_lock(rwlock_t *lock, const char *file, const char *func, const int line);
void _wr_unlock_noyield(pthread_rwlock_t *lock, const char *file, const char *func, const int line); int _wr_trylock(rwlock_t *lock, __maybe_unused const char *file, __maybe_unused const char *func, __maybe_unused const int line);
void _rd_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rd_lock(rwlock_t *lock, const char *file, const char *func, const int line);
void _wr_unlock(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rw_unlock(rwlock_t *lock, const char *file, const char *func, const int line);
void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, const int line); void _rd_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line);
void mutex_destroy(pthread_mutex_t *lock); void _wr_unlock_noyield(rwlock_t *lock, const char *file, const char *func, const int line);
void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rd_unlock(rwlock_t *lock, const char *file, const char *func, const int line);
void rwlock_destroy(pthread_rwlock_t *lock); void _wr_unlock(rwlock_t *lock, const char *file, const char *func, const int line);
void _mutex_init(mutex_t *lock, const char *file, const char *func, const int line);
void _rwlock_init(rwlock_t *lock, const char *file, const char *func, const int line);
void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line); void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line);
void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line); void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line);
void cklock_destroy(cklock_t *lock);
void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_rlock(cklock_t *lock, const char *file, const char *func, const int line);
void _ck_ilock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_ilock(cklock_t *lock, const char *file, const char *func, const int line);
void _ck_uilock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_uilock(cklock_t *lock, const char *file, const char *func, const int line);
@ -417,6 +441,7 @@ void _ck_dwilock(cklock_t *lock, const char *file, const char *func, const int l
void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int line);
void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line);
void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line);
void cklock_destroy(cklock_t *lock);
void _cksem_init(sem_t *sem, const char *file, const char *func, const int line); void _cksem_init(sem_t *sem, const char *file, const char *func, const int line);
void _cksem_post(sem_t *sem, const char *file, const char *func, const int line); void _cksem_post(sem_t *sem, const char *file, const char *func, const int line);
@ -468,6 +493,7 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
#define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__) #define open_unix_server(server_path) _open_unix_server(server_path, __FILE__, __func__, __LINE__)
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_close(int sockd, int timeout);
int wait_read_select(int sockd, int timeout); int wait_read_select(int sockd, int timeout);
int read_length(int sockd, void *buf, int len); int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);

29
src/stratifier.c

@ -285,10 +285,10 @@ struct stratifier_data {
pool_stats_t stats; pool_stats_t stats;
/* Protects changes to pool stats */ /* Protects changes to pool stats */
pthread_mutex_t stats_lock; mutex_t stats_lock;
/* Serialises sends/receives to ckdb if possible */ /* Serialises sends/receives to ckdb if possible */
pthread_mutex_t ckdb_lock; mutex_t ckdb_lock;
bool ckdb_offline; bool ckdb_offline;
@ -336,14 +336,14 @@ struct stratifier_data {
cklock_t instance_lock; cklock_t instance_lock;
share_t *shares; share_t *shares;
pthread_mutex_t share_lock; mutex_t share_lock;
int64_t shares_generated; int64_t shares_generated;
/* Linked list of block solves, added to during submission, removed on /* Linked list of block solves, added to during submission, removed on
* accept/reject. It is likely we only ever have one solve on here but * accept/reject. It is likely we only ever have one solve on here but
* you never know... */ * you never know... */
pthread_mutex_t block_lock; mutex_t block_lock;
ckmsg_t *block_solves; ckmsg_t *block_solves;
/* Generator message priority */ /* Generator message priority */
@ -1241,11 +1241,15 @@ static void client_drop_message(const int64_t client_id, const int dropped, cons
} }
} }
static void dec_worker(ckpool_t *ckp, user_instance_t *instance);
/* Decrease the reference count of instance. */ /* Decrease the reference count of instance. */
static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file, static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const char *file,
const char *func, const int line) const char *func, const int line)
{ {
user_instance_t *user = client->user_instance;
int64_t client_id = client->id; int64_t client_id = client->id;
ckpool_t *ckp = client->ckp;
int dropped = 0, ref; int dropped = 0, ref;
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
@ -1253,7 +1257,7 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const
/* See if there are any instances that were dropped that could not be /* See if there are any instances that were dropped that could not be
* moved due to holding a reference and drop them now. */ * moved due to holding a reference and drop them now. */
if (unlikely(client->dropped && !ref)) if (unlikely(client->dropped && !ref))
dropped = __drop_client(sdata, client, client->user_instance); dropped = __drop_client(sdata, client, user);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
client_drop_message(client_id, dropped, true); client_drop_message(client_id, dropped, true);
@ -1261,6 +1265,9 @@ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *client, const
/* This should never happen */ /* This should never happen */
if (unlikely(ref < 0)) if (unlikely(ref < 0))
LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line);
if (dropped && user)
dec_worker(ckp, user);
} }
#define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__) #define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__)
@ -3414,7 +3421,11 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *
if (res_val) { if (res_val) {
const char *result = json_string_value(res_val); const char *result = json_string_value(res_val);
LOGDEBUG("Received spurious response %s", result ? result : ""); if (!safecmp(result, "pong"))
LOGDEBUG("Received pong from client %"PRId64, client_id);
else
LOGDEBUG("Received spurious response %s from client %"PRId64,
result ? result : "", client_id);
goto out; goto out;
} }
send_json_err(sdata, client_id, id_val, "-3:method not found"); send_json_err(sdata, client_id, id_val, "-3:method not found");
@ -3676,7 +3687,7 @@ static void parse_ckdb_cmd(ckpool_t *ckp, const char *cmd)
} }
/* Test a value under lock and set it, returning the original value */ /* Test a value under lock and set it, returning the original value */
static bool test_and_set(bool *val, pthread_mutex_t *lock) static bool test_and_set(bool *val, mutex_t *lock)
{ {
bool ret; bool ret;
@ -3688,7 +3699,7 @@ static bool test_and_set(bool *val, pthread_mutex_t *lock)
return ret; return ret;
} }
static bool test_and_clear(bool *val, pthread_mutex_t *lock) static bool test_and_clear(bool *val, mutex_t *lock)
{ {
bool ret; bool ret;
@ -4338,8 +4349,8 @@ int stratifier(proc_instance_t *pi)
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 1, threads; int ret = 1, threads;
int64_t randomiser; int64_t randomiser;
char *buf = NULL;
sdata_t *sdata; sdata_t *sdata;
char *buf;
LOGWARNING("%s stratifier starting", ckp->name); LOGWARNING("%s stratifier starting", ckp->name);
sdata = ckzalloc(sizeof(sdata_t)); sdata = ckzalloc(sizeof(sdata_t));

Loading…
Cancel
Save