From 8dbc90fbafc0b7fd40a70cffc0295a1d2d22cef0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 11:21:28 +1000 Subject: [PATCH 1/7] Add a helper function for safely checking if a buffer matches a command --- src/libckpool.c | 16 ++++++++++++++++ src/libckpool.h | 1 + 2 files changed, 17 insertions(+) diff --git a/src/libckpool.c b/src/libckpool.c index 0a478649..af3633a1 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1081,6 +1081,22 @@ int safecmp(const char *a, const char *b) return (strcmp(a, b)); } +/* Returns whether there is a case insensitive match of buf to cmd, safely + * handling NULL or zero length strings. */ +bool cmdmatch(const char *buf, const char *cmd) +{ + int cmdlen, buflen; + + if (!buf) + return false; + buflen = strlen(buf); + if (!buflen) + return false; + cmdlen = strlen(cmd); + if (buflen < cmdlen) + return false; + return !strncasecmp(buf, cmd, cmdlen); +} static const char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; diff --git a/src/libckpool.h b/src/libckpool.h index 08d30242..e2af60c9 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -415,6 +415,7 @@ bool _hex2bin(void *p, const void *vhexstr, size_t len, const char *file, const char *http_base64(const char *src); void b58tobin(char *b58bin, const char *b58); int safecmp(const char *a, const char *b); +bool cmdmatch(const char *buf, const char *cmd); void address_to_pubkeytxn(char *pkh, const char *addr); int ser_number(uchar *s, int32_t val); From 772315e41753b27c42c9e6a73deb511f26ea1fa6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 12:18:00 +1000 Subject: [PATCH 2/7] Use the cmdmatch function for commands in the stratifier --- src/libckpool.c | 1 + src/stratifier.c | 30 +++++++++++++++--------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index af3633a1..ab3f6056 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1098,6 +1098,7 @@ bool cmdmatch(const char *buf, const char *cmd) return !strncasecmp(buf, cmd, cmdlen); } + static const char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; /* Return a malloced string of *src encoded into mime base 64 */ diff --git a/src/stratifier.c b/src/stratifier.c index 8ad15f5c..5c5b0432 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -542,7 +542,7 @@ static void update_base(ckpool_t *ckp) LOGWARNING("Failed to get base from generator in update_base"); return; } - if (unlikely(!strncasecmp(buf, "failed", 6))) { + if (unlikely(cmdmatch(buf, "failed"))) { LOGWARNING("Generator returned failure in update_base"); return; } @@ -1020,7 +1020,7 @@ retry: LOGWARNING("Failed to get message in stratum_loop"); goto retry; } - if (!strncasecmp(buf, "ping", 4)) { + if (cmdmatch(buf, "ping")) { LOGDEBUG("Stratifier received ping request"); send_unix_msg(sockd, "pong"); close(sockd); @@ -1029,21 +1029,21 @@ retry: close(sockd); LOGDEBUG("Stratifier received request: %s", buf); - if (!strncasecmp(buf, "shutdown", 8)) { + if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; - } else if (!strncasecmp(buf, "update", 6)) { + } else if (cmdmatch(buf, "update")) { update_base(ckp); - } else if (!strncasecmp(buf, "subscribe", 9)) { + } else if (cmdmatch(buf, "subscribe")) { /* Proxifier has a new subscription */ if (!update_subscribe(ckp)) goto out; - } else if (!strncasecmp(buf, "notify", 6)) { + } else if (cmdmatch(buf, "notify")) { /* Proxifier has a new notify ready */ update_notify(ckp); - } else if (!strncasecmp(buf, "diff", 4)) { + } else if (cmdmatch(buf, "diff")) { update_diff(ckp); - } else if (!strncasecmp(buf, "dropclient", 10)) { + } else if (cmdmatch(buf, "dropclient")) { int client_id; ret = sscanf(buf, "dropclient=%d", &client_id); @@ -1051,9 +1051,9 @@ retry: LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); else drop_client(client_id); - } else if (!strncasecmp(buf, "block", 5)) { + } else if (cmdmatch(buf, "block")) { block_solve(ckp); - } else if (!strncasecmp(buf, "loglevel", 8)) { + } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else { json_t *val = json_loads(buf, 0, NULL); @@ -1079,7 +1079,7 @@ static void *blockupdate(void *arg) pthread_detach(pthread_self()); rename_proc("blockupdate"); buf = send_recv_proc(ckp->generator, "getbest"); - if (buf && strncasecmp(buf, "Failed", 6)) + if (!cmdmatch(buf, "failed")) sprintf(request, "getbest"); else sprintf(request, "getlast"); @@ -1088,7 +1088,7 @@ static void *blockupdate(void *arg) while (42) { dealloc(buf); buf = send_recv_proc(ckp->generator, request); - if (safecmp(buf, hash) && strncasecmp(buf, "Failed", 6)) { + if (safecmp(buf, hash) && !cmdmatch(buf, "failed")) { strcpy(hash, buf); LOGNOTICE("Block hash changed to %s", hash); send_proc(ckp->stratifier, "update"); @@ -1899,7 +1899,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val /* Random broken clients send something not an integer as the id so we copy * the json item for id_val as is for the response. */ method = json_string_value(method_val); - if (!strncasecmp(method, "mining.subscribe", 16)) { + if (cmdmatch(method, "mining.subscribe")) { json_t *val, *result_val = parse_subscribe(client_id, params_val); if (!result_val) @@ -1922,7 +1922,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val return; } - if (!strncasecmp(method, "mining.auth", 11)) { + if (cmdmatch(method, "mining.auth")) { json_params_t *jp = create_json_params(client_id, params_val, id_val, address); ckmsgq_add(sauthq, jp); @@ -1942,7 +1942,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val return; } - if (!strncasecmp(method, "mining.submit", 13)) { + if (cmdmatch(method, "mining.submit")) { json_params_t *jp = create_json_params(client_id, params_val, id_val, address); ckmsgq_add(sshareq, jp); From baacb6ed54d93aa5d24db54e65b8ddd1e02ccca4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 12:23:57 +1000 Subject: [PATCH 3/7] Convert the generator to use cmdmatch --- src/generator.c | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/generator.c b/src/generator.c index 20a6cab1..85f04c42 100644 --- a/src/generator.c +++ b/src/generator.c @@ -233,11 +233,11 @@ retry: goto retry; } LOGDEBUG("Generator received request: %s", buf); - if (!strncasecmp(buf, "shutdown", 8)) { + if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; } - if (!strncasecmp(buf, "getbase", 7)) { + if (cmdmatch(buf, "getbase")) { if (!gen_gbtbase(cs, gbt)) { LOGWARNING("Failed to get block template from %s:%s", cs->url, cs->port); @@ -250,14 +250,14 @@ retry: free(s); clear_gbtbase(gbt); } - } else if (!strncasecmp(buf, "getbest", 7)) { + } else if (cmdmatch(buf, "getbest")) { if (!get_bestblockhash(cs, hash)) { LOGINFO("No best block hash support from %s:%s", cs->url, cs->port); send_unix_msg(sockd, "Failed"); } else send_unix_msg(sockd, hash); - } else if (!strncasecmp(buf, "getlast", 7)) { + } else if (cmdmatch(buf, "getlast")) { int height = get_blockcount(cs); if (height == -1) { @@ -273,13 +273,13 @@ retry: LOGDEBUG("Hash: %s", hash); } } - } else if (!strncasecmp(buf, "submitblock:", 12)) { + } else if (cmdmatch(buf, "submitblock:")) { LOGNOTICE("Submitting block data!"); if (submit_block(cs, buf + 12)) send_proc(ckp->stratifier, "block"); - } else if (!strncasecmp(buf, "loglevel", 8)) { + } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); - } else if (!strncasecmp(buf, "ping", 4)) { + } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Generator received ping request"); send_unix_msg(sockd, "pong"); } @@ -383,7 +383,7 @@ static json_t *find_notify(json_t *val) return NULL; arr_size = json_array_size(val); entry = json_string_value(json_array_get(val, 0)); - if (entry && !strncasecmp(entry, "mining.notify", 13)) + if (cmdmatch(entry, "mining.notify")) return val; for (i = 0; i < arr_size; i++) { json_t *arr_val; @@ -759,7 +759,7 @@ static bool parse_method(proxy_instance_t *proxi, const char *msg) goto out; } - if (!strncasecmp(buf, "mining.notify", 13)) { + if (cmdmatch(buf, "mining.notify")) { if (parse_notify(proxi, params)) proxi->notified = ret = true; else @@ -767,22 +767,22 @@ static bool parse_method(proxy_instance_t *proxi, const char *msg) goto out; } - if (!strncasecmp(buf, "mining.set_difficulty", 21)) { + if (cmdmatch(buf, "mining.set_difficulty")) { ret = parse_diff(proxi, params); goto out; } - if (!strncasecmp(buf, "client.reconnect", 16)) { + if (cmdmatch(buf, "client.reconnect")) { ret = parse_reconnect(proxi, params); goto out; } - if (!strncasecmp(buf, "client.get_version", 18)) { + if (cmdmatch(buf, "client.get_version")) { ret = send_version(proxi, val); goto out; } - if (!strncasecmp(buf, "client.show_message", 19)) { + if (cmdmatch(buf, "client.show_message")) { ret = show_message(params); goto out; } @@ -1237,23 +1237,23 @@ retry: goto retry; } LOGDEBUG("Proxy received request: %s", buf); - if (!strncasecmp(buf, "shutdown", 8)) { + if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; - } else if (!strncasecmp(buf, "getsubscribe", 12)) { + } else if (cmdmatch(buf, "getsubscribe")) { send_subscribe(proxi, sockd); - } else if (!strncasecmp(buf, "getnotify", 9)) { + } else if (cmdmatch(buf, "getnotify")) { send_notify(proxi, sockd); - } else if (!strncasecmp(buf, "getdiff", 7)) { + } else if (cmdmatch(buf, "getdiff")) { send_diff(proxi, sockd); - } else if (!strncasecmp(buf, "reconnect", 9)) { + } else if (cmdmatch(buf, "reconnect")) { kill_proxy(proxi); pthread_cancel(proxi->pth_precv); pthread_cancel(proxi->pth_psend); goto reconnect; - } else if (!strncasecmp(buf, "loglevel", 8)) { + } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); - } else if (!strncasecmp(buf, "ping", 4)) { + } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); } else { From fc1d956f37d3ab0cbd7602d5bf1b30edb77af11b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 13:14:03 +1000 Subject: [PATCH 4/7] Make a per-process logger allowing logging to be line buffered yet asynchronous --- src/ckpool.c | 52 +++++++++++++++++++++++++++++++++++++++++++++------- src/ckpool.h | 41 ++++++++++++++++++++++------------------- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index b4fbd941..4490ab84 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -34,6 +34,30 @@ ckpool_t *global_ckp; +static void proclog(ckpool_t *ckp, char *msg) +{ + FILE *LOGFP; + int logfd; + + if (unlikely(!msg)) { + fprintf(stderr, "Proclog received null message"); + return; + } + if (unlikely(!strlen(msg))) { + fprintf(stderr, "Proclog received zero length message"); + free(msg); + return; + } + LOGFP = ckp->logfp; + logfd = ckp->logfd; + + flock(logfd, LOCK_EX); + fprintf(LOGFP, "%s", msg); + flock(logfd, LOCK_UN); + + free(msg); +} + /* Log everything to the logfile, but display warnings on the console as well */ void logmsg(int loglevel, const char *fmt, ...) { if (global_ckp->loglevel >= loglevel && fmt) { @@ -58,14 +82,13 @@ void logmsg(int loglevel, const char *fmt, ...) { tm->tm_min, tm->tm_sec); if (logfd) { - FILE *LOGFP = global_ckp->logfp; + char *msg; - flock(logfd, LOCK_EX); - fprintf(LOGFP, "%s %s", stamp, buf); if (loglevel <= LOG_ERR && errno != 0) - fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno)); - fprintf(LOGFP, "\n"); - flock(logfd, LOCK_UN); + ASPRINTF(&msg, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno)); + else + ASPRINTF(&msg, "%s %s\n", stamp, buf); + ckmsgq_add(global_ckp->logger, msg); } if (loglevel <= LOG_WARNING) {\ if (loglevel <= LOG_ERR && errno != 0) @@ -677,6 +700,17 @@ static void childsighandler(int sig) kill(ppid, sig); } +static void launch_logger(proc_instance_t *pi) +{ + ckpool_t *ckp = pi->ckp; + char loggername[16]; + + /* Note that the logger is unique per process so it is the only value + * in ckp that differs between processes */ + snprintf(loggername, 15, "%clogger", pi->processname[0]); + ckp->logger = create_ckmsgq(ckp, loggername, &proclog); +} + static void launch_process(proc_instance_t *pi) { pid_t pid; @@ -688,6 +722,7 @@ static void launch_process(proc_instance_t *pi) struct sigaction handler; int ret; + launch_logger(pi); handler.sa_handler = &childsighandler; handler.sa_flags = 0; sigemptyset(&handler.sa_mask); @@ -1131,13 +1166,16 @@ int main(int argc, char **argv) ckp.logfp = fopen(buf, "a"); if (!ckp.logfp) quit(1, "Failed to make open log file %s", buf); - ckp.logfd = fileno(ckp.logfp); + /* Make logging line buffered */ + setvbuf(ckp.logfp, NULL, _IOLBF, 0); ckp.main.ckp = &ckp; ckp.main.processname = strdup("main"); ckp.main.sockname = strdup("listener"); write_namepid(&ckp.main); create_process_unixsock(&ckp.main); + launch_logger(&ckp.main); + ckp.logfd = fileno(ckp.logfp); create_pthread(&ckp.pth_listener, listener, &ckp.main); diff --git a/src/ckpool.h b/src/ckpool.h index a785b1bc..94d64e13 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -20,6 +20,26 @@ struct ckpool_instance; typedef struct ckpool_instance ckpool_t; +struct ckmsg { + struct ckmsg *next; + struct ckmsg *prev; + void *data; +}; + +typedef struct ckmsg ckmsg_t; + +struct ckmsgq { + ckpool_t *ckp; + char name[16]; + pthread_t pth; + pthread_mutex_t lock; + pthread_cond_t cond; + ckmsg_t *msgs; + void (*func)(ckpool_t *, void *); +}; + +typedef struct ckmsgq ckmsgq_t; + struct proc_instance { ckpool_t *ckp; unixsock_t us; @@ -81,6 +101,8 @@ struct ckpool_instance { FILE *logfp; int logfd; + /* Logger message queue NOTE: Unique per process */ + ckmsgq_t *logger; /* Process instance data of parent/child processes */ proc_instance_t main; @@ -129,25 +151,6 @@ struct ckpool_instance { char **proxypass; }; -struct ckmsg { - struct ckmsg *next; - struct ckmsg *prev; - void *data; -}; - -typedef struct ckmsg ckmsg_t; - -struct ckmsgq { - ckpool_t *ckp; - char name[16]; - pthread_t pth; - pthread_mutex_t lock; - pthread_cond_t cond; - ckmsg_t *msgs; - void (*func)(ckpool_t *, void *); -}; - -typedef struct ckmsgq ckmsgq_t; ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func); void ckmsgq_add(ckmsgq_t *ckmsgq, void *data); From dc3dd18e924720e8d7aac295f8353e6cfb447366 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 13:28:55 +1000 Subject: [PATCH 5/7] Add a slow 1s timedwait to ckmsg_queue parsing to not miss wakeups --- src/ckpool.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 4490ab84..5b212564 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -112,16 +112,21 @@ static void *ckmsg_queue(void *arg) while (42) { ckmsg_t *msg; + tv_t now; + ts_t abs; mutex_lock(&ckmsgq->lock); + tv_time(&now); + tv_to_ts(&abs, &now); + abs.tv_sec++; if (!ckmsgq->msgs) - pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock); + pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs); msg = ckmsgq->msgs; - if (likely(msg)) + if (msg) DL_DELETE(ckmsgq->msgs, msg); mutex_unlock(&ckmsgq->lock); - if (unlikely(!msg)) + if (!msg) continue; ckmsgq->func(ckp, msg->data); free(msg); From 5b2b4c73ef4f36fe1458edc96416919571d4433a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 13:37:56 +1000 Subject: [PATCH 6/7] Remove all handling of interrupted calls which may have been blocking appropriate failure modes --- src/connector.c | 6 ++---- src/libckpool.c | 4 ++-- src/libckpool.h | 5 +---- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/connector.c b/src/connector.c index 4b6b5fbf..df21489a 100644 --- a/src/connector.c +++ b/src/connector.c @@ -284,10 +284,8 @@ retry: cksleep_ms(100); goto retry; } - do { - ret = poll(fds, nfds, 1000); - } while (unlikely(ret < 0 && interrupted())); - if (ret < 0) { + ret = poll(fds, nfds, 1000); + if (unlikely(ret < 0)) { LOGERR("Failed to poll in receiver"); goto out; } diff --git a/src/libckpool.c b/src/libckpool.c index ab3f6056..c20d6ce2 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -615,7 +615,7 @@ int wait_read_select(int sockd, int timeout) FD_SET(sockd, &readfs); do { ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout); - } while (unlikely(ret < 0 && interrupted())); + } while (unlikely(ret < 0)); return ret; } @@ -692,7 +692,7 @@ int wait_write_select(int sockd, int timeout) FD_SET(sockd, &writefds); do { ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout); - } while (unlikely(ret < 0 && interrupted())); + } while (unlikely(ret < 0)); return ret; } diff --git a/src/libckpool.h b/src/libckpool.h index e2af60c9..1433bdcc 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -362,14 +362,11 @@ static inline bool sock_blocks(void) { return (errno == EAGAIN || errno == EWOULDBLOCK); } + static inline bool sock_timeout(void) { return (errno == ETIMEDOUT); } -static inline bool interrupted(void) -{ - return (errno == EINTR); -} bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port); void keep_sockalive(int fd); From f564714345952b916c3aa401f78bcd6062154290 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 1 Aug 2014 13:54:12 +1000 Subject: [PATCH 7/7] Poll more frequently when a client is not ready to be written to instead of waiting for it --- src/connector.c | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/connector.c b/src/connector.c index df21489a..7ea47e6f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -326,13 +326,13 @@ void *sender(void *arg) { conn_instance_t *ci = (conn_instance_t *)arg; ckpool_t *ckp = ci->pi->ckp; + bool polling = false; rename_proc("csender"); while (42) { sender_send_t *sender_send; client_instance_t *client; - bool only_send = false; int ret, fd, ofs = 0; mutex_lock(&sender_lock); @@ -340,14 +340,17 @@ void *sender(void *arg) ts_t timeout_ts; ts_realtime(&timeout_ts); - timeout_ts.tv_sec += 1; + if (!polling) + timeout_ts.tv_sec += 1; + else { + ts_t wait_ts = {0, 1000000}; + timeraddspec(&timeout_ts, &wait_ts); + } pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); } sender_send = sender_sends; if (likely(sender_send)) DL_DELETE(sender_sends, sender_send); - if (!sender_send) - only_send = true; mutex_unlock(&sender_lock); if (!sender_send) @@ -365,22 +368,23 @@ void *sender(void *arg) free(sender_send); continue; } - /* If there are other sends pending and this socket is not - * ready to receive data from us, put the send back on the - * list. */ - if (!only_send) { - ret = wait_write_select(fd, 0); - if (ret < 1) { - LOGDEBUG("Client %d not ready for writes", client->id); - - /* Append it to the tail of the list */ - mutex_lock(&sender_lock); - DL_APPEND(sender_sends, sender_send); - mutex_unlock(&sender_lock); - - continue; - } + /* If this socket is not ready to receive data from us, put the + * send back on the tail of the list and decrease the timeout + * to poll to either look for a client that is ready or poll + * select on this one */ + ret = wait_write_select(fd, 0); + if (ret < 1) { + LOGDEBUG("Client %d not ready for writes", client->id); + + /* Append it to the tail of the list */ + mutex_lock(&sender_lock); + DL_APPEND(sender_sends, sender_send); + mutex_unlock(&sender_lock); + + polling = true; + continue; } + polling = false; while (sender_send->len) { ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); if (unlikely(ret < 0)) {