From fa38324af4a216407eccf81918f578374b97b740 Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 21 Apr 2015 11:37:08 +1000 Subject: [PATCH 01/62] ckdb - make the terminate sequence report external --- src/ckdb.c | 95 +++++++++++++++++++++++++++++++++++++++--------------- src/ckdb.h | 3 +- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 2c901d5b..1b652518 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1130,16 +1130,16 @@ static void alloc_storage() marks_root = new_ktree(); } -#define SEQSETWARN(_seqset, _msgtxt, _endtxt) do { \ +#define SEQSETWARN(_set, _seqset, _msgtxt, _endtxt) do { \ char _t_buf[DATE_BUFSIZ]; \ btu64_to_buf(&((_seqset)->seqstt), _t_buf, sizeof(_t_buf)); \ - LOGWARNING("SEQ %s: "SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ + LOGWARNING("SEQ %s: %d/"SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64" M%"PRIu64 \ "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64 \ " %s v%"PRIu64"/^%"PRIu64"/M%"PRIu64"/T%"PRIu64"/L%"PRIu64 \ "/S%"PRIu64"/H%"PRIu64"/OK%"PRIu64" %s v%"PRIu64"/^%"PRIu64 \ "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 \ "/OK%"PRIu64"%s", \ - _msgtxt, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ + _msgtxt, _set, (_seqset)->seqstt, _t_buf, (_seqset)->seqpid, \ (_seqset)->missing, (_seqset)->trans, (_seqset)->lost, \ (_seqset)->stale, (_seqset)->high, (_seqset)->ok, \ seqnam[SEQ_ALL], \ @@ -1198,13 +1198,61 @@ static void alloc_storage() #define FREE_ALL(_list) FREE_TREE(_list); FREE_LISTS(_list) +/* Write a share missing/lost report to the console - always report set 0 + * It's possible for the set numbers to be wrong and the output to report one + * seqset twice if a new seqset arrives between the unlock/lock for writing + * the console message - since a new seqset would shuffle the sets down and + * if the list was full, move the last one back to the top of the setset_store + * list, but this would normally only be when ckpool restarts and also wont + * cause a problem since only 
the last set can be moved and the code checks + * if it is the end and also duplicates the set before releasing the lock */ +void sequence_report(bool lock) +{ + SEQSET *seqset, seqset_copy; + K_ITEM *ss_item; + bool last, miss; + int set; + + last = false; + set = 0; + if (lock) + ck_wlock(&seq_lock); + ss_item = seqset_store->head; + while (!last && ss_item) { + if (!ss_item->next) + last = true; + DATA_SEQSET(seqset, ss_item); + /* Missing/Trans/Lost should all be 0 for shares */ + if (seqset->seqstt && (set == 0 || + seqset->seqdata[SEQ_SHARES].missing || + seqset->seqdata[SEQ_SHARES].trans || + seqset->seqdata[SEQ_SHARES].lost)) { + miss = (seqset->seqdata[SEQ_SHARES].missing || + seqset->seqdata[SEQ_SHARES].trans || + seqset->seqdata[SEQ_SHARES].lost); + if (lock) { + memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + ck_wunlock(&seq_lock); + seqset = &seqset_copy; + } + SEQSETWARN(set, seqset, + miss ? "SHARES MISSING" : "status" , EMPTY); + if (lock) + ck_wlock(&seq_lock); + } + ss_item = ss_item->next; + set++; + } + if (lock) + ck_wunlock(&seq_lock); +} + static void dealloc_storage() { SHAREERRORS *shareerrors; - K_ITEM *s_item, *ss_item; + K_ITEM *s_item; char *st = NULL; SHARES *shares; - SEQSET *seqset; LOGWARNING("%s() logqueue ...", __func__); @@ -1331,18 +1379,8 @@ static void dealloc_storage() FREE_LISTS(workqueue); LOGWARNING("%s() seqset ...", __func__); + sequence_report(false); - ss_item = seqset_store->head; - while (ss_item) { - DATA_SEQSET(seqset, ss_item); - /* Missing/Trans/Lost should all be 0 for shares */ - if (seqset->seqstt && (seqset->seqdata[SEQ_SHARES].missing || - seqset->seqdata[SEQ_SHARES].trans || - seqset->seqdata[SEQ_SHARES].lost)) { - SEQSETWARN(seqset, "SHARES MISSING", EMPTY); - } - ss_item = ss_item->next; - } FREE_LIST(seqtrans); FREE_STORE_DATA(seqset); @@ -1930,10 +1968,14 @@ setitemdata: SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); } else { - if (newseq) - 
SEQSETWARN(&seqset_pre, "previous", EMPTY); - if (expseq) - SEQSETWARN(&seqset_exp, "discarded old", " for:"); + if (newseq) { + // previous set is set 1 + SEQSETWARN(1, &seqset_pre, "previous", EMPTY); + } + if (expseq) { + // set -1 means it was the discarded/removed last set + SEQSETWARN(-1, &seqset_exp, "discarded old", " for:"); + } if (newseq || expseq) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); LOGWARNING("Seq created new: %s %"PRIu64" " @@ -2142,12 +2184,13 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, buf, now, cd, code, warndup, msg); if (dupall != dupcmd) { - LOGERR("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " - "cmd=%.32s...", - seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", - seqnam[ckdb_cmds[which].seq], n_seqcmd, - dupcmd ? "DUP" : "notdup", - st = safe_text_nonull(msg)); + // Bad/corrupt data or a code bug + LOGEMERG("SEQ INIMICAL %s/%"PRIu64"=%s %s/%"PRIu64"=%s " + "cmd=%.32s...", + seqnam[SEQ_ALL], n_seqall, dupall ? "DUP" : "notdup", + seqnam[ckdb_cmds[which].seq], n_seqcmd, + dupcmd ? 
"DUP" : "notdup", + st = safe_text_nonull(msg)); FREENULL(st); } diff --git a/src/ckdb.h b/src/ckdb.h index 636e7922..30be9f8f 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.073" +#define CKDB_VERSION DB_VERSION"-1.074" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1864,6 +1864,7 @@ extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); extern void tick(); extern PGconn *dbconnect(); +extern void sequence_report(bool lock); // *** // *** ckdb_data.c *** From 19bce29d1c88ea428bc3004ef526438142026b26 Mon Sep 17 00:00:00 2001 From: kanoi Date: Tue, 21 Apr 2015 15:31:24 +1000 Subject: [PATCH 02/62] ckdb - add a share status console report command shsta --- src/ckdb.c | 9 ++++++--- src/ckdb.h | 3 ++- src/ckdb_cmd.c | 19 +++++++++++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 1b652518..197eda34 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -3674,10 +3674,10 @@ static void *socketer(__maybe_unused void *arg) case CMD_HEARTBEAT: // First message from the pool if (want_first) { + want_first = false; ck_wlock(&fpm_lock); first_pool_message = strdup(buf); ck_wunlock(&fpm_lock); - want_first = false; } case CMD_CHKPASS: case CMD_ADDUSER: @@ -3693,6 +3693,7 @@ static void *socketer(__maybe_unused void *arg) case CMD_NEWID: case CMD_STATS: case CMD_USERSTATUS: + case CMD_SHSTA: ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, by_default, (char *)__func__, @@ -3816,10 +3817,10 @@ static void *socketer(__maybe_unused void *arg) case CMD_BLOCK: // First message from the pool if (want_first) { + want_first = false; ck_wlock(&fpm_lock); first_pool_message = strdup(buf); ck_wunlock(&fpm_lock); - want_first = false; } snprintf(reply, sizeof(reply), @@ -3982,7 +3983,8 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char 
*buf) case CMD_USERSTATUS: case CMD_MARKS: case CMD_PSHIFT: - LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", + case CMD_SHSTA: + LOGERR("%s() INVALID message line %"PRIu64" '%s' - ignored", __func__, count, cmd); break; case CMD_AUTH: @@ -4348,6 +4350,7 @@ static void *listener(void *arg) K_RUNLOCK(workqueue_store); LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); LOGWARNING("%s(): ckdb ready, queue %d", __func__, wqcount); diff --git a/src/ckdb.h b/src/ckdb.h index 30be9f8f..f7b8f43a 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.074" +#define CKDB_VERSION DB_VERSION"-1.075" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -402,6 +402,7 @@ enum cmd_values { CMD_USERSTATUS, CMD_MARKS, CMD_PSHIFT, + CMD_SHSTA, CMD_END }; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 6187d467..5e145ebd 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -5844,6 +5844,24 @@ static char *cmd_pshift(__maybe_unused PGconn *conn, char *cmd, char *id, return(buf); } +/* Show a share status report on the console + * Currently: sequence status and OoO info */ +static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) +{ + char ooo_buf[256]; + char buf[256]; + + LOGWARNING("OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); + sequence_report(true); + + snprintf(buf, sizeof(buf), "ok.%s", cmd); + LOGDEBUG("%s.%s", id, buf); + return strdup(buf); +} + // TODO: limit access by having seperate sockets for each #define ACCESS_POOL "p" #define ACCESS_SYSTEM "s" @@ -5955,5 +5973,6 @@ struct CMDS ckdb_cmds[] = { { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB 
}, { CMD_MARKS, "marks", false, false, cmd_marks, SEQ_NONE, ACCESS_SYSTEM }, { CMD_PSHIFT, "pshift", false, false, cmd_pshift, SEQ_NONE, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_SHSTA, "shsta", true, false, cmd_shsta, SEQ_NONE, ACCESS_SYSTEM }, { CMD_END, NULL, false, false, NULL, SEQ_NONE, NULL } }; From da20ac7f3e9d2063dc9387c2955885974ed3b684 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 22 Apr 2015 12:10:45 +1000 Subject: [PATCH 03/62] Disable user of nolinger on client sockets --- src/connector.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 43f268e5..5fc9fcec 100644 --- a/src/connector.c +++ b/src/connector.c @@ -219,7 +219,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } keep_sockalive(fd); - nolinger_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); From caeee9b32234df64f057975b94281beed4d675a0 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 22 Apr 2015 13:32:07 +1000 Subject: [PATCH 04/62] ckdb - add klist insert_after and general null checking to report calling line --- src/ckdb.h | 2 +- src/klist.c | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/klist.h | 9 +++-- 3 files changed, 110 insertions(+), 5 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index f7b8f43a..4b0dfc16 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.075" +#define CKDB_VERSION DB_VERSION"-1.076" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/klist.c b/src/klist.c index 65e30448..1254628d 100644 --- a/src/klist.c +++ b/src/klist.c @@ -9,11 +9,36 @@ #include "klist.h" +#define _CHKLIST(_list, _name) do {\ + if (!_list) { \ + quithere(1, "%s() can't process a NULL " _name \ + KLIST_FFL, \ + __func__, KLIST_FFL_PASS); \ + } \ + } while 
(0); + +#define CHKLIST(__list) _CHKLIST(__list, "list") + +#define CHKLS(__list) _CHKLIST(__list, "list/store") + +#define _CHKITEM(_item, _list, _name) do {\ + if (!_item) { \ + quithere(1, "%s() can't process a NULL %s " _name \ + KLIST_FFL, \ + __func__, _list->name, \ + KLIST_FFL_PASS); \ + } \ + } while (0); + +#define CHKITEM(__item, __list) _CHKITEM(__item, __list, "item") + static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; int allocate, i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s store can't %s()" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -76,10 +101,12 @@ static void k_alloc_items(K_LIST *list, KLIST_FFL_ARGS) } } -K_STORE *k_new_store(K_LIST *list) +K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS) { K_STORE *store; + CHKLIST(list); + store = calloc(1, sizeof(*store)); if (!store) quithere(1, "Failed to calloc store for %s", list->name); @@ -136,6 +163,8 @@ K_ITEM *_k_unlink_head(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + if (!(list->head) && !(list->is_store)) k_alloc_items(list, KLIST_FFL_PASS); @@ -163,6 +192,8 @@ K_ITEM *_k_unlink_head_zero(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + item = _k_unlink_head(list, KLIST_FFL_PASS); if (item) @@ -176,6 +207,8 @@ K_ITEM *_k_unlink_tail(K_LIST *list, KLIST_FFL_ARGS) { K_ITEM *item; + CHKLS(list); + if (!(list->do_tail)) { quithere(1, "List %s can't %s() - do_tail is false" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -200,6 +233,10 @@ K_ITEM *_k_unlink_tail(K_LIST *list, KLIST_FFL_ARGS) void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -229,6 +266,10 @@ void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) /* slows it down (of course) - only for debugging void _k_free_head(K_LIST *list, K_ITEM *item, 
KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + memset(item->data, 0xff, list->siz); _k_add_head(list, item, KLIST_FFL_PASS); } @@ -236,6 +277,10 @@ void _k_free_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -265,8 +310,51 @@ void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) list->count_up++; } +// Insert item into the list next after 'after' +void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, KLIST_FFL_ARGS) +{ + CHKLS(list); + + CHKITEM(item, list); + + _CHKITEM(item, after, "after"); + + if (item->name != list->name) { + quithere(1, "List %s can't %s() a %s item" KLIST_FFL, + list->name, __func__, item->name, KLIST_FFL_PASS); + } + + if (after->name != list->name) { + quithere(1, "List %s can't %s() a %s after" KLIST_FFL, + list->name, __func__, item->name, KLIST_FFL_PASS); + } + + if (item->prev || item->next) { + quithere(1, "%s() added item %s still linked" KLIST_FFL, + __func__, item->name, KLIST_FFL_PASS); + } + + item->prev = after; + item->next = after->next; + if (item->next) + item->next->prev = item; + after->next = item; + + if (list->do_tail) { + if (list->tail == after) + list->tail = item; + } + + list->count++; + list->count_up++; +} + void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) { + CHKLS(list); + + CHKITEM(item, list); + if (item->name != list->name) { quithere(1, "List %s can't %s() a %s item" KLIST_FFL, list->name, __func__, item->name, KLIST_FFL_PASS); @@ -293,6 +381,10 @@ void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS) void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) { + _CHKLIST(from, "from list/store"); + + _CHKLIST(to, "to list/store"); + if (from->name != to->name) { quithere(1, "List %s can't %s() to 
a %s list" KLIST_FFL, from->name, __func__, to->name, KLIST_FFL_PASS); @@ -323,6 +415,10 @@ void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS) { + _CHKLIST(from, "from list/store"); + + _CHKLIST(to, "to list/store"); + if (from->name != to->name) { quithere(1, "List %s can't %s() to a %s list" KLIST_FFL, from->name, __func__, to->name, KLIST_FFL_PASS); @@ -355,6 +451,8 @@ K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS) { int i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s can't %s() a store" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); @@ -379,6 +477,8 @@ K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS) K_STORE *_k_free_store(K_STORE *store, KLIST_FFL_ARGS) { + _CHKLIST(store, "store"); + if (!(store->is_store)) { quithere(1, "Store %s can't %s() the list" KLIST_FFL, store->name, __func__, KLIST_FFL_PASS); @@ -394,6 +494,8 @@ void _k_cull_list(K_LIST *list, KLIST_FFL_ARGS) { int i; + CHKLIST(list); + if (list->is_store) { quithere(1, "List %s can't %s() a store" KLIST_FFL, list->name, __func__, KLIST_FFL_PASS); diff --git a/src/klist.h b/src/klist.h index 26aaf303..602bdac1 100644 --- a/src/klist.h +++ b/src/klist.h @@ -71,7 +71,8 @@ typedef struct k_list { // Upgrade I to W #define K_ULOCK(_list) ck_ulock(_list->lock) -extern K_STORE *k_new_store(K_LIST *list); +extern K_STORE *_k_new_store(K_LIST *list, KLIST_FFL_ARGS); +#define k_new_store(_list) _k_new_store(_list, KLIST_FFL_HERE) extern K_LIST *_k_new_list(const char *name, size_t siz, int allocate, int limit, bool do_tail, KLIST_FFL_ARGS); #define k_new_list(_name, _siz, _allocate, _limit, _do_tail) _k_new_list(_name, _siz, _allocate, _limit, _do_tail, KLIST_FFL_HERE) extern K_ITEM *_k_unlink_head(K_LIST *list, KLIST_FFL_ARGS); @@ -86,11 +87,13 @@ extern void _k_add_head(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_free_head(__list, __item) _k_add_head(__list, __item, 
KLIST_FFL_HERE) extern void _k_add_tail(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_add_tail(_list, _item) _k_add_tail(_list, _item, KLIST_FFL_HERE) +extern void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, KLIST_FFL_ARGS); +#define k_insert_after(_list, _item, _after) _k_insert_after(_list, _item, _after, KLIST_FFL_HERE) extern void _k_unlink_item(K_LIST *list, K_ITEM *item, KLIST_FFL_ARGS); #define k_unlink_item(_list, _item) _k_unlink_item(_list, _item, KLIST_FFL_HERE) -void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); +extern void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); #define k_list_transfer_to_head(_from, _to) _k_list_transfer_to_head(_from, _to, KLIST_FFL_HERE) -void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); +extern void _k_list_transfer_to_tail(K_LIST *from, K_LIST *to, KLIST_FFL_ARGS); #define k_list_transfer_to_tail(_from, _to) _k_list_transfer_to_tail(_from, _to, KLIST_FFL_HERE) extern K_LIST *_k_free_list(K_LIST *list, KLIST_FFL_ARGS); #define k_free_list(_list) _k_free_list(_list, KLIST_FFL_HERE) From 49ce58794930d4a4b58b9fed686c007a62076b7c Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 22 Apr 2015 14:17:52 +1000 Subject: [PATCH 05/62] nolinger client sockets when we're about to close them --- src/connector.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 5fc9fcec..ea1a0a45 100644 --- a/src/connector.c +++ b/src/connector.c @@ -257,7 +257,8 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL); + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); + nolinger_socket(fd); Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); From 8313628629ad452c4eb62961026aad1614e94645 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 22 Apr 2015 14:54:00 
+1000 Subject: [PATCH 06/62] Differentiate ready for writes from hangups in wait_write_select --- src/libckpool.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index f20a8803..fee42439 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -983,14 +983,19 @@ out: int wait_write_select(int sockd, int timeout) { struct pollfd sfd; + int ret = -1; if (unlikely(sockd < 0)) - return -1; + goto out; sfd.fd = sockd; sfd.events = POLLOUT; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret && !(sfd.revents & POLLOUT)) + ret = -1; +out: + return ret; } int write_length(int sockd, const void *buf, int len) From 57e3aa83c6fff2fc5ddc89239ac48091c925fd71 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 22 Apr 2015 14:55:49 +1000 Subject: [PATCH 07/62] Differentiate pollin from pollhup in wait_read_select --- src/libckpool.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index fee42439..84e8ce8c 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -906,14 +906,19 @@ int wait_close(int sockd, int timeout) int wait_read_select(int sockd, int timeout) { struct pollfd sfd; + int ret = -1; if (unlikely(sockd < 0)) - return -1; + goto out; sfd.fd = sockd; sfd.events = POLLIN; sfd.revents = 0; timeout *= 1000; - return poll(&sfd, 1, timeout); + ret = poll(&sfd, 1, timeout); + if (ret && !(sfd.revents & POLLIN)) + ret = -1; +out: + return ret; } int read_length(int sockd, void *buf, int len) From 91d2aca51f3934cc8bf8ee133d652be2b99dec96 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Wed, 22 Apr 2015 15:07:04 +1000 Subject: [PATCH 08/62] Detect pollhup in wait_read_select and not through recv fail conditions --- src/connector.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/connector.c b/src/connector.c index ea1a0a45..baad4c30 100644 --- a/src/connector.c +++ 
b/src/connector.c @@ -318,31 +318,26 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf); /* Client is holding a reference count from being on the epoll list */ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) { - int buflen, ret, selfail = 0; ckpool_t *ckp = cdata->ckp; char msg[PAGESIZE], *eol; + int buflen, ret; json_t *val; retry: - /* Select should always return positive after poll unless we have - * been disconnected. On retries, decdatade whether we should do further - * reads based on select readiness and only fail if we get an error. */ ret = wait_read_select(client->fd, 0); if (ret < 1) { - if (ret > selfail) + if (!ret) return; LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } - selfail = -1; buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); if (ret < 1) { - /* We should have something to read if called since poll set - * this fd's revents status so if there's nothing it means the - * client has disconnected. */ + if (!ret) + return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? 
strerror(errno) : ""); invalidate_client(ckp, cdata, client); From 1d25cb88b0c7475a5466cb9ff00f38264e549901 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 22 Apr 2015 17:49:05 +1000 Subject: [PATCH 09/62] ckdb - ensure expected duplicates are debug only and don't stop the reload early --- src/ckdb.c | 179 ++++++++++++++++++++++++++++++++--------------------- src/ckdb.h | 18 +++++- 2 files changed, 126 insertions(+), 71 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 197eda34..66c9a525 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -1689,7 +1689,7 @@ static void trans_seq(tv_t *now) static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, uint64_t n_seqstt, uint64_t n_seqpid, char *nam, tv_t *now, tv_t *cd, char *code, - bool warndup, char *msg) + int seqitemflags, char *msg) { char t_buf[DATE_BUFSIZ], t_buf2[DATE_BUFSIZ], *st = NULL; bool firstseq, newseq, expseq, gothigh, gotstale, gotstalestart; @@ -1697,13 +1697,13 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, SEQSET seqset_exp = { 0 }, seqset_copy = { 0 }; bool dup, wastrans, doitem, dotime; SEQDATA *seqdata; - SEQITEM *seqitem; + SEQITEM *seqitem, seqitem_copy; K_ITEM *seqset_item = NULL, *st_item = NULL; SEQTRANS *seqtrans = NULL; size_t siz, end; void *off0, *offn; uint64_t u; - int set = -1, highlimit, i; + int set = -1, expset = -1, highlimit, i; K_STORE *lost; // We store the lost items in here @@ -1732,10 +1732,10 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, } } - // Need to get a new seqset + // Need to setup a new seqset newseq = true; if (!firstseq) { - /* The current seqset (about to become the previous) + /* The current seqset (may become the previous) * If !seqset_store->head (i.e. 
a bug) this will quit() */ DATA_SEQSET(seqset0, seqset_store->head); memcpy(&seqset_pre, seqset0, sizeof(seqset_pre)); @@ -1796,16 +1796,59 @@ static bool update_seq(enum seq_num seq, uint64_t n_seqcmd, LIST_MEM_ADD_SIZ(seqset_free, end); } } else { - // Expire the last set and overwrite it - seqset_item = k_unlink_tail(seqset_store); - // If !item (i.e. a bug) this will quit() + // Expire the oldest set and overwrite it + K_ITEM *ss_item; + SEQSET *ss = NULL; + int s = 0; + seqset = NULL; + seqset_item = NULL; + ss_item = seqset_store->head; + while (ss_item) { + DATA_SEQSET(ss, ss_item); + if (!seqset) { + seqset = ss; + seqset_item = ss_item; + expset = s; + } else { + // choose the last match + if (ss->seqstt >= seqset->seqstt) { + seqset = ss; + seqset_item = ss_item; + expset = s; + } + } + ss_item = ss_item->next; + s++; + } + // If !seqset_item (i.e. a bug) k_unlink_item() will quit() + k_unlink_item(seqset_store, seqset_item); DATA_SEQSET(seqset, seqset_item); memcpy(&seqset_exp, seqset, sizeof(seqset_exp)); expseq = true; RESETSET(seqset, n_seqstt, n_seqpid); } - k_add_head(seqset_store, seqset_item); - set = 0; + /* Since the pool queue is active during the reload, sets can be out + * of order, so each new one should be added depending upon the value + * of seqstt so the current pool is first, to minimise searching + * seqset_store, but the order of the rest isn't as important + * N.B. 
a new set is only created once per pool start */ + if (firstseq) { + k_add_head(seqset_store, seqset_item); + set = 0; + } else { + // seqset0 already is the head + if (n_seqstt >= seqset0->seqstt) { + // if new set is >= head then make it the head + k_add_head(seqset_store, seqset_item); + set = 0; + } else { + // put it next after the head + k_insert_after(seqset_store, seqset_item, + seqset_store->head); + set = 1; + } + + } gotseqset: doitem = dotime = false; @@ -1895,6 +1938,7 @@ gotseqset: if (!ITEMISMIS(seqitem)) { dup = true; memcpy(&seqset_copy, seqset, sizeof(seqset_copy)); + memcpy(&seqitem_copy, seqitem, sizeof(seqitem_copy)); } else { // Found a missing one seqdata->missing--; @@ -1942,6 +1986,7 @@ gotseqset: setitemdata: // Store the new seq if flagged to do so if (doitem) { + seqitem->flags = seqitemflags; copy_tv(&(seqitem->time), now); copy_tv(&(seqitem->cd), cd); STRNCPY(seqitem->code, code); @@ -1969,13 +2014,13 @@ setitemdata: nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); } else { if (newseq) { - // previous set is set 1 - SEQSETWARN(1, &seqset_pre, "previous", EMPTY); - } - if (expseq) { - // set -1 means it was the discarded/removed last set - SEQSETWARN(-1, &seqset_exp, "discarded old", " for:"); + if (set == 0) + SEQSETWARN(0, &seqset_pre, "previous", EMPTY); + else + SEQSETWARN(0, &seqset_pre, "current", EMPTY); } + if (expseq) + SEQSETWARN(expset, &seqset_exp, "discarded old", " for:"); if (newseq || expseq) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); LOGWARNING("Seq created new: %s %"PRIu64" " @@ -1985,15 +2030,20 @@ setitemdata: } if (dup) { - int level = LOG_DEBUG; - if (warndup) - level = LOG_WARNING; + int level = LOG_WARNING; + /* If one is SI_RELOAD and the other is SI_EARLYSOCK then it's + * not unexpected so only LOG_DEBUG */ + if (((seqitem_copy.flags | seqitemflags) & SI_RELOAD) && + ((seqitem_copy.flags | seqitemflags) & SI_EARLYSOCK)) + level = LOG_DEBUG; btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); 
bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGMSG(level, "SEQ dup %s %"PRIu64" set %d/%"PRIu64"=%s/%"PRIu64 - " %s/%s v%"PRIu64"/^%"PRIu64"/M%"PRIu64 - "/T%"PRIu64"/L%"PRIu64"/S%"PRIu64"/H%"PRIu64 - "/OK%"PRIu64" cmd=%.42s...", + LOGMSG(level, "SEQ dup%s %c:%c %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s v%"PRIu64"/^%"PRIu64 + "/M%"PRIu64"/T%"PRIu64"/L%"PRIu64"/S%"PRIu64 + "/H%"PRIu64"/OK%"PRIu64" cmd=%.42s...", + (level == LOG_DEBUG) ? "*" : EMPTY, + SICHR(seqitemflags), SICHR(seqitem_copy.flags), nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, t_buf2, code, seqset_copy.seqdata[seq].minseq, @@ -2009,15 +2059,12 @@ setitemdata: } if (wastrans) { - int level = LOG_DEBUG; - if (warndup) - level = LOG_WARNING; btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); bt_to_buf(&(cd->tv_sec), t_buf2, sizeof(t_buf2)); - LOGMSG(level, "SEQ found trans %s %"PRIu64" set %d/%"PRIu64 - "=%s/%"PRIu64" %s/%s", - nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, - t_buf2, code); + LOGWARNING("SEQ found trans %s %"PRIu64" set %d/%"PRIu64 + "=%s/%"PRIu64" %s/%s", + nam, n_seqcmd, set, n_seqstt, t_buf, n_seqpid, + t_buf2, code); } if (gotstale || gotstalestart || gothigh) { @@ -2080,13 +2127,13 @@ setitemdata: static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, tv_t *now, char *msg, K_TREE *trf_root, - bool wantauth) + bool wantauth, int seqitemflags) { uint64_t n_seqall, n_seqstt, n_seqpid, n_seqcmd; K_ITEM *seqstt, *seqpid, *seqcmd, *i_code; char *err = NULL, *st = NULL; size_t len, off; - bool dupall, dupcmd, warndup; + bool dupall, dupcmd; char *code = NULL; char buf[64]; @@ -2169,19 +2216,10 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, code = EMPTY; } - if (!startup_complete) - warndup = true; - else { - if (reload_queue_complete) - warndup = true; - else - warndup = false; - } - dupall = update_seq(SEQ_ALL, n_seqall, n_seqstt, n_seqpid, SEQALL, - now, cd, code, warndup, msg); + now, cd, code, seqitemflags, msg); dupcmd = 
update_seq(ckdb_cmds[which].seq, n_seqcmd, n_seqstt, n_seqpid, - buf, now, cd, code, warndup, msg); + buf, now, cd, code, seqitemflags, msg); if (dupall != dupcmd) { // Bad/corrupt data or a code bug @@ -2212,7 +2250,8 @@ static enum cmd_values process_seq(K_ITEM *seqall, int which, tv_t *cd, static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, char *buf, int *which_cmds, char *cmd, - char *id, tv_t *now, tv_t *cd, bool wantauth) + char *id, tv_t *now, tv_t *cd, bool wantauth, + int seqitemflags) { char reply[1024] = ""; TRANSFER *transfer; @@ -2530,7 +2569,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, if (seqall) { enum cmd_values ret; ret = process_seq(seqall, *which_cmds, cd, now, buf, - *trf_root, wantauth); + *trf_root, wantauth, seqitemflags); free(cmdptr); return ret; } else { @@ -3594,10 +3633,13 @@ static void *socketer(__maybe_unused void *arg) else LOGDEBUG("Duplicate '%s' message received", duptype); } else { + int seqitemflags = SI_SOCKET; + if (!reload_queue_complete) + seqitemflags = SI_EARLYSOCK; LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, - &cd, true); + &cd, true, seqitemflags); switch (cmdnum) { case CMD_DUPSEQ: snprintf(reply, sizeof(reply), "%s.%ld.dup.", id, now.tv_sec); @@ -3902,7 +3944,7 @@ static void *socketer(__maybe_unused void *arg) return NULL; } -static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) +static void reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) { char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; @@ -3913,7 +3955,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) TRANSFER *transfer; K_ITEM *item; tv_t now, cd; - bool finished; + bool matched; // Once we've read the message setnow(&now); @@ -3929,19 +3971,19 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) else LOGERR("%s() Empty message line %"PRIu64, 
__func__, count); } else { - finished = false; + matched = false; ck_wlock(&fpm_lock); - if (first_pool_message && strcmp(first_pool_message, buf) == 0) - finished = true; + if (first_pool_message && strcmp(first_pool_message, buf) == 0) { + matched = true; + FREENULL(first_pool_message); + } ck_wunlock(&fpm_lock); - if (finished) { + if (matched) LOGERR("%s() reload ckpool queue match at line %"PRIu64, __func__, count); - return true; - } LOGQUE(buf); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, - cmd, id, &now, &cd, false); + cmd, id, &now, &cd, false, SI_RELOAD); switch (cmdnum) { // Don't ever attempt to double process reload data case CMD_DUPSEQ: @@ -4029,8 +4071,6 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) } tick(); - - return false; } // 10Mb for now - transactiontree can be large @@ -4112,10 +4152,10 @@ static bool reload_from(tv_t *start) PGconn *conn = NULL; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; size_t rflen = strlen(restorefrom); - char *missingfirst = NULL, *missinglast = NULL; + char *missingfirst = NULL, *missinglast = NULL, *st = NULL; int missing_count; int processing; - bool finished = false, matched = false, ret = true, ok, apipe = false; + bool finished = false, ret = true, ok, apipe = false; char *filename = NULL; uint64_t count, total; tv_t now, begin; @@ -4160,8 +4200,9 @@ static bool reload_from(tv_t *start) * aborting early and not get the few slightly later out of * order messages in the log file */ while (!everyone_die && - logline(reload_buf, MAX_READ, fp, filename)) - matched = reload_line(conn, filename, ++count, reload_buf); + logline(reload_buf, MAX_READ, fp, filename)) { + reload_line(conn, filename, ++count, reload_buf); + } LOGWARNING("%s(): %sread %"PRIu64" line%s from %s", __func__, @@ -4244,16 +4285,14 @@ static bool reload_from(tv_t *start) if (everyone_die) return true; - if (!matched) { - ck_wlock(&fpm_lock); - if (first_pool_message) { - LOGERR("%s() reload 
completed without finding ckpool queue match '%.32s'...", - __func__, first_pool_message); - LOGERR("%s() restart ckdb to resolve this", __func__); - ret = false; - } - ck_wunlock(&fpm_lock); + ck_wlock(&fpm_lock); + if (first_pool_message) { + LOGERR("%s() reload didn't finding first ckpool queue '%.32s...", + __func__, st = safe_text(first_pool_message)); + FREENULL(st); + FREENULL(first_pool_message); } + ck_wunlock(&fpm_lock); reloading = false; FREENULL(reload_buf); diff --git a/src/ckdb.h b/src/ckdb.h index 4b0dfc16..3720de41 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.076" +#define CKDB_VERSION DB_VERSION"-1.077" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -860,7 +860,23 @@ enum seq_num { // Ensure size is a (multiple of 8)-1 #define SEQ_CODE 15 +#define SICHR(_sif) (((_sif) == SI_EARLYSOCK) ? 'E' : \ + (((_sif) == SI_RELOAD) ? 'R' : \ + (((_sif) == SI_SOCKET) ? 
'S' : '?'))) + +// Msg from the socket before startup completed - ignore if it was a DUP +#define SI_EARLYSOCK 1 +// Msg was from reload +#define SI_RELOAD 2 +// Msg from the socket after startup completed +#define SI_SOCKET 4 + +/* An SI_EARLYSOCK item vs an SI_RELOAD item is not considered a DUP + * since the reload reads to the end of the reload file after + * the match between the queue and the reload has been found */ + typedef struct seqitem { + int flags; tv_t cd; // sec:0=missing, usec:0=miss !0=trans tv_t time; char code[SEQ_CODE+1]; From c43375e1ac4941b0041fdcdb6aad50400dc19c71 Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 22 Apr 2015 17:56:16 +1000 Subject: [PATCH 10/62] ckdb - text typo --- src/ckdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckdb.c b/src/ckdb.c index 66c9a525..72a81ed8 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -4287,7 +4287,7 @@ static bool reload_from(tv_t *start) ck_wlock(&fpm_lock); if (first_pool_message) { - LOGERR("%s() reload didn't finding first ckpool queue '%.32s...", + LOGERR("%s() reload didn't find the first ckpool queue '%.32s...", __func__, st = safe_text(first_pool_message)); FREENULL(st); FREENULL(first_pool_message); From a83c283b1f2d4505b145db503ebc64792a62851d Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 22 Apr 2015 18:25:07 +1000 Subject: [PATCH 11/62] ckdb - report the set number of new seqsets --- src/ckdb.c | 5 +++-- src/ckdb.h | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 72a81ed8..5c77f970 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -2023,9 +2023,10 @@ setitemdata: SEQSETWARN(expset, &seqset_exp, "discarded old", " for:"); if (newseq || expseq) { btu64_to_buf(&n_seqstt, t_buf, sizeof(t_buf)); - LOGWARNING("Seq created new: %s %"PRIu64" " + LOGWARNING("Seq created new: set %d %s %"PRIu64" " SEQSTT" %"PRIu64"=%s "SEQPID" %"PRIu64, - nam, n_seqcmd, n_seqstt, t_buf, n_seqpid); + set, nam, n_seqcmd, n_seqstt, t_buf, + n_seqpid); 
} } diff --git a/src/ckdb.h b/src/ckdb.h index 3720de41..71b985b5 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.077" +#define CKDB_VERSION DB_VERSION"-1.078" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ From e012108280faac4aa05160f440db277c74189370 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 18:58:59 +1000 Subject: [PATCH 12/62] Explicitly check for EPOLLRDHUP as well --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index baad4c30..d3f68ac8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -225,7 +225,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) client->fd = fd; event.data.ptr = client; - event.events = EPOLLIN; + event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); recycle_client(cdata, client); @@ -495,7 +495,7 @@ void *receiver(void *arg) client = ref_client_by_id(cdata, client->id); if (unlikely(!client)) continue; - if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { + if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { /* Client disconnected */ LOGDEBUG("Client fd %d HUP in epoll", client->fd); invalidate_client(cdata->pi->ckp, cdata, client); From b50f2ba86c0d4173b7f8d179ae5177eb8379451a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:00:57 +1000 Subject: [PATCH 13/62] Check clients match in epoll loop --- src/connector.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index d3f68ac8..60354661 100644 --- a/src/connector.c +++ b/src/connector.c @@ -492,8 +492,7 @@ void *receiver(void *arg) client = event.data.ptr; /* Recheck this client still exists in the same form when it * was queued. 
*/ - client = ref_client_by_id(cdata, client->id); - if (unlikely(!client)) + if (unlikely(client != ref_client_by_id(cdata, client->id))) continue; if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { /* Client disconnected */ From 88fcb55faa302c8adcd6bdae3846f3374b72b470 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:10:58 +1000 Subject: [PATCH 14/62] Revert "Check clients match in epoll loop" This reverts commit b50f2ba86c0d4173b7f8d179ae5177eb8379451a. Wrong --- src/connector.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 60354661..d3f68ac8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -492,7 +492,8 @@ void *receiver(void *arg) client = event.data.ptr; /* Recheck this client still exists in the same form when it * was queued. */ - if (unlikely(client != ref_client_by_id(cdata, client->id))) + client = ref_client_by_id(cdata, client->id); + if (unlikely(!client)) continue; if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { /* Client disconnected */ From 75d24d1d07f29e3f5e6bb720684ad3b76366eb17 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:34:49 +1000 Subject: [PATCH 15/62] Reference clients in the epoll list by their client id to avoid double lookup and possible wrong client selection --- src/connector.c | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/connector.c b/src/connector.c index d3f68ac8..221b808f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -223,12 +223,17 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); + ck_wlock(&cdata->lock); + client->id = cdata->client_id++; + HASH_ADD_I64(cdata->clients, id, client); + cdata->nfds++; + ck_wunlock(&cdata->lock); + client->fd = fd; - event.data.ptr = client; 
+ event.data.u64 = client->id; event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); - recycle_client(cdata, client); return 0; } @@ -237,12 +242,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * removes it automatically from the epoll list. */ __inc_instance_ref(client); - ck_wlock(&cdata->lock); - client->id = cdata->client_id++; - HASH_ADD_I64(cdata->clients, id, client); - cdata->nfds++; - ck_wunlock(&cdata->lock); - return 1; } @@ -442,8 +441,7 @@ void *receiver(void *arg) serverfds = cdata->ckp->serverurls; /* Add all the serverfds to the epoll */ for (i = 0; i < serverfds; i++) { - /* The small values will be easily identifiable compared to - * pointers */ + /* The small values will be less than the first client ids */ event.data.u64 = i; event.events = EPOLLIN; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); @@ -489,12 +487,11 @@ void *receiver(void *arg) } continue; } - client = event.data.ptr; - /* Recheck this client still exists in the same form when it - * was queued. */ - client = ref_client_by_id(cdata, client->id); - if (unlikely(!client)) + client = ref_client_by_id(cdata, event.data.u64); + if (unlikely(!client)) { + LOGWARNING("Failed to find client by id in receiver!"); continue; + } if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { /* Client disconnected */ LOGDEBUG("Client fd %d HUP in epoll", client->fd); @@ -970,7 +967,9 @@ int connector(proc_instance_t *pi) cklock_init(&cdata->lock); cdata->pi = pi; cdata->nfds = 0; - cdata->client_id = 1; + /* Set the client id to the highest serverurl count to distinguish + * them from the server fds in epoll. 
*/ + cdata->client_id = ckp->serverurls; mutex_init(&cdata->sender_lock); cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); From 1c32d7829508b529b893fa397888b2fe9c6912aa Mon Sep 17 00:00:00 2001 From: kanoi Date: Wed, 22 Apr 2015 19:42:28 +1000 Subject: [PATCH 16/62] ckdb - remove 'maybe'-not warnings by NULLing variables --- src/ckdb.h | 2 +- src/ckdb_data.c | 2 +- src/ckdb_dbio.c | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 71b985b5..e5e1d6f7 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.0" -#define CKDB_VERSION DB_VERSION"-1.078" +#define CKDB_VERSION DB_VERSION"-1.079" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_data.c b/src/ckdb_data.c index deee3f5d..17e3d431 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -2383,7 +2383,7 @@ void set_block_share_counters() { K_TREE_CTX ctx[1], ctx_ms[1]; K_ITEM *ss_item, ss_look, *ws_item, *wm_item, *ms_item, ms_look; - WORKERSTATUS *workerstatus; + WORKERSTATUS *workerstatus = NULL; SHARESUMMARY *sharesummary, looksharesummary; WORKMARKERS *workmarkers; MARKERSUMMARY *markersummary, lookmarkersummary; diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 2c04c4af..be709f60 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3436,7 +3436,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, PGresult *res; K_TREE_CTX ss_ctx[1], ms_ctx[1]; SHARESUMMARY *sharesummary, looksharesummary; - MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary; + MARKERSUMMARY *markersummary, lookmarkersummary, *p_markersummary = NULL; K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look; K_ITEM *p_ss_item, *p_ms_item; bool ok = false, conned = false; @@ -3865,7 +3865,7 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE PGresult *res = NULL; 
WORKMARKERS *wm; SHARESUMMARY *row, *p_row; - K_ITEM *item, *wm_item, *p_item; + K_ITEM *item, *wm_item, *p_item = NULL; char *ins, *upd; bool ok = false, new = false, p_new = false; char *params[19 + MODIFYDATECOUNT]; From 70379428d6f528ae51d3cd1fbd02a75a4e13f845 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:51:20 +1000 Subject: [PATCH 17/62] Steal copied id_vals in the stratifier to avoid needing to copy them again, thus allowing opaque objects to be used as id values --- src/stratifier.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 500cb835..e5f704a3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3265,7 +3265,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va { json_t *val; - JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); + JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); stratum_add_send(sdata, val, client_id); } @@ -3625,10 +3625,18 @@ static void discard_json_params(json_params_t *jp) { json_decref(jp->method); json_decref(jp->params); - json_decref(jp->id_val); + if (jp->id_val) + json_decref(jp->id_val); free(jp); } +static void steal_json_id(json_t *val, json_params_t *jp) +{ + /* Steal the id_val as is to avoid a copy */ + json_object_set_new_nocheck(val, "id", jp->id_val); + jp->id_val = NULL; +} + static void sshare_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; @@ -3651,7 +3659,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp) result_val = parse_submit(client, json_msg, jp->params, &err_val); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? 
err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); out_decref: dec_instance_ref(sdata, client); @@ -3713,7 +3721,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); - json_object_set_nocheck(json_msg, "id", jp->id_val); + steal_json_id(json_msg, jp); stratum_add_send(sdata, json_msg, client_id); if (!json_is_true(result_val) || !client->suggest_diff) @@ -3877,7 +3885,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out; } val = json_object(); - json_object_set_nocheck(val, "id", jp->id_val); + steal_json_id(val, jp); if (cmdmatch(msg, "mining.get_transactions")) { int txns; From bf72ebbee6f0fe9767bcce50d50cda2ab8853046 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Wed, 22 Apr 2015 19:56:24 +1000 Subject: [PATCH 18/62] Reinstate checking for zero as a return from recv for cleanly disconnected clients --- src/connector.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 221b808f..e2a66f36 100644 --- a/src/connector.c +++ b/src/connector.c @@ -335,8 +335,9 @@ retry: buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); if (ret < 1) { - if (!ret) - return; + /* We should have something to read if called since poll set + * this fd's revents status so if there's nothing it means the + * client has disconnected. */ LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? 
strerror(errno) : ""); invalidate_client(ckp, cdata, client); From ae2b00842b57924da1f7aa1971572afb5e6bd1d4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 11:07:33 +1000 Subject: [PATCH 19/62] Allow unauthorised clients to send other messages till they've authorised but silently ignore them --- src/stratifier.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index e5f704a3..25f9cdd5 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3443,12 +3443,8 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 /* We should only accept authorised requests from here on */ if (!client->authorised) { - /* Dropping unauthorised clients here also allows the - * stratifier process to restart since it will have lost all - * the stratum instance data. Clients will just reconnect. */ - LOGINFO("Dropping unauthorised client %"PRId64" %s", client_id, client->address); - snprintf(buf, 255, "dropclient=%"PRId64, client_id); - send_proc(client->ckp->connector, buf); + LOGINFO("Dropping %s from unauthorised client %"PRId64" %s", method, + client_id, client->address); return; } From da2f62aea96c0a472070c2c39376121e4ddca5c2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 11:44:17 +1000 Subject: [PATCH 20/62] Pass through the downstream clients' address and server --- src/connector.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index e2a66f36..7b930ea2 100644 --- a/src/connector.c +++ b/src/connector.c @@ -383,10 +383,11 @@ reparse: json_object_del(val, "client_id"); passthrough_id = (client->id << 32) | passthrough_id; json_object_set_new_nocheck(val, "client_id", json_integer(passthrough_id)); - } else + } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); - json_object_set_new_nocheck(val, "address", json_string(client->address_name)); - 
json_object_set_new_nocheck(val, "server", json_integer(client->server)); + json_object_set_new_nocheck(val, "address", json_string(client->address_name)); + json_object_set_new_nocheck(val, "server", json_integer(client->server)); + } s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. We From 24d89a3e72fc4bca6a72d5e55b3d7f7862c02d35 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:04:49 +1000 Subject: [PATCH 21/62] Don't use MSG_WAITALL on unix sockets --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 84e8ce8c..f3d218d1 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -932,7 +932,7 @@ int read_length(int sockd, void *buf, int len) if (unlikely(sockd < 0)) return -1; while (len) { - ret = recv(sockd, buf + ofs, len, MSG_WAITALL); + ret = recv(sockd, buf + ofs, len, 0); if (unlikely(ret < 1)) return -1; ofs += ret; From 2cc42376bdce1177326fc28a50f34d0efe653eff Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:06:27 +1000 Subject: [PATCH 22/62] Check also for POLLRDHUP in wait/read write select helpers --- src/libckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index f3d218d1..24c66f10 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -911,7 +911,7 @@ int wait_read_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLIN | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); @@ -993,7 +993,7 @@ int wait_write_select(int sockd, int timeout) if (unlikely(sockd < 0)) goto out; sfd.fd = sockd; - sfd.events = POLLOUT; + sfd.events = POLLOUT | POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); From 6e7f39321df599b3b05ca721f0096c3837d39556 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:08:58 +1000 Subject: [PATCH 23/62] We 
are not interested in POLLIN in wait_close but any mode of a closed socket --- src/libckpool.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libckpool.c b/src/libckpool.c index 24c66f10..011194fc 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -893,13 +893,13 @@ int wait_close(int sockd, int timeout) if (unlikely(sockd < 0)) return -1; sfd.fd = sockd; - sfd.events = POLLIN; + sfd.events = POLLRDHUP; sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); if (ret < 1) return 0; - return sfd.revents & POLLHUP; + return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } /* Emulate a select read wait for high fds that select doesn't support */ From 0b96be01443ff79db8b6e2984f591ffd871599e7 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 14:12:29 +1000 Subject: [PATCH 24/62] Use non blocking receive in parse_client_msg as we check for read readiness with wait_read_select first, and there is the unlikely event the state changes --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 7b930ea2..b141f205 100644 --- a/src/connector.c +++ b/src/connector.c @@ -333,7 +333,7 @@ retry: return; } buflen = PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, 0); + ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); if (ret < 1) { /* We should have something to read if called since poll set * this fd's revents status so if there's nothing it means the From 8bb50fa05861aa2dfac08001aac0839e057f4543 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 15:04:41 +1000 Subject: [PATCH 25/62] Process epoll read messages before hangups and errors and add more info about the type of hang up, increasing verbosity when it's an unexpected error --- src/connector.c | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c 
index b141f205..44dbc76d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -494,12 +494,34 @@ void *receiver(void *arg) LOGWARNING("Failed to find client by id in receiver!"); continue; } - if (event.events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { - /* Client disconnected */ - LOGDEBUG("Client fd %d HUP in epoll", client->fd); - invalidate_client(cdata->pi->ckp, cdata, client); - } else + /* We can have both messages and read hang ups so process the + * message first. */ + if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(event.events & EPOLLERR)) { + socklen_t errlen = sizeof(int); + int error = 0; + + /* See what type of error this is and raise the log + * level of the message if it's unexpected. */ + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + client->id, client->fd, error, strerror(error)); + } else { + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + client->id, client->fd, error, strerror(error)); + } + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLHUP)) { + /* Client connection reset by peer */ + LOGINFO("Client id %"PRId64" fd %d HUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } else if (unlikely(event.events & EPOLLRDHUP)) { + /* Client disconnected by peer */ + LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); + invalidate_client(cdata->pi->ckp, cdata, client); + } dec_instance_ref(cdata, client); } return NULL; From e9441f2e1af300b3afef10678031c514b7d2f462 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 19:28:11 +1000 Subject: [PATCH 26/62] Drop passthrough proxy connection from stratifier --- src/connector.c | 12 ++++++------ src/stratifier.c | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git 
a/src/connector.c b/src/connector.c index 44dbc76d..ed68155d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -274,11 +274,14 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } -static void stratifier_drop_client(ckpool_t *ckp, int64_t id) +static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) { char buf[256]; - sprintf(buf, "dropclient=%"PRId64, id); + /* The stratifier is not in use in passthrough mode */ + if (ckp->passthrough || client->passthrough) + return; + sprintf(buf, "dropclient=%"PRId64, client->id); send_proc(ckp->stratifier, buf); } @@ -292,9 +295,7 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - if (ckp->passthrough) - goto out; - stratifier_drop_client(ckp, client->id); + stratifier_drop_client(ckp, client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -308,7 +309,6 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c } ck_wunlock(&cdata->lock); -out: return ret; } diff --git a/src/stratifier.c b/src/stratifier.c index 25f9cdd5..5d483fcb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3409,14 +3409,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 } if (unlikely(cmdmatch(method, "mining.passthrough"))) { - LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); /* We need to inform the connector process that this client - * is a passthrough and to manage its messages accordingly. - * The client_id stays on the list but we won't send anything - * to it since it's unauthorised. Set the flag just in case. */ - client->authorised = false; + * is a passthrough and to manage its messages accordingly. No + * data from this client id should ever come back to this + * stratifier after this so drop the client in the stratifier. 
*/ + LOGNOTICE("Adding passthrough client %"PRId64" %s", client_id, client->address); snprintf(buf, 255, "passthrough=%"PRId64, client_id); send_proc(client->ckp->connector, buf); + drop_client(sdata, client_id); return; } From 3f3dc7f4f30591ec2ba57f4860e30e817cf8dc44 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 19:58:40 +1000 Subject: [PATCH 27/62] Implement a stratum mining.term call which notifies the stratifier this client is terminating and use it to signal upstream pools when passthrough subclients have disconnected --- src/connector.c | 31 ++++++++++++++++++++++++------- src/stratifier.c | 7 +++++++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index ed68155d..35dd0bd2 100644 --- a/src/connector.c +++ b/src/connector.c @@ -274,13 +274,25 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) return fd; } +/* For sending the drop command to the upstream pool in passthrough mode */ +static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + json_t *val; + char *s; + + JSON_CPACK(val, "{si,sI:ss:si:ss:s[]}", "id", 42, "client_id", client->id, "address", + client->address_name, "server", client->server, "method", "mining.term", + "params"); + s = json_dumps(val, 0); + json_decref(val); + send_proc(ckp->generator, s); + free(s); +} + static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) { char buf[256]; - /* The stratifier is not in use in passthrough mode */ - if (ckp->passthrough || client->passthrough) - return; sprintf(buf, "dropclient=%"PRId64, client->id); send_proc(ckp->stratifier, buf); } @@ -295,7 +307,10 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c int ret; ret = drop_client(cdata, client); - stratifier_drop_client(ckp, client); + if (!ckp->passthrough && !client->passthrough) + stratifier_drop_client(ckp, client); + else if (ckp->passthrough) + generator_drop_client(ckp, 
client); /* Cull old unused clients lazily when there are no more reference * counts for them. */ @@ -784,10 +799,10 @@ static char *connector_stats(cdata_t *cdata) static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { - int64_t client_id64, client_id; unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; uint8_t test_cycle = 0; + int64_t client_id; char *buf; int ret = 0; @@ -827,12 +842,14 @@ retry: } else if (cmdmatch(buf, "dropclient")) { client_instance_t *client; - ret = sscanf(buf, "dropclient=%"PRId64, &client_id64); + ret = sscanf(buf, "dropclient=%"PRId64, &client_id); if (ret < 0) { LOGDEBUG("Connector failed to parse dropclient command: %s", buf); goto retry; } - client_id = client_id64 & 0xffffffffll; + /* A passthrough client, we can't drop this yet */ + if (client_id > 0xffffffffll) + goto retry; client = ref_client_by_id(cdata, client_id); if (unlikely(!client)) { LOGINFO("Connector failed to find client id %"PRId64" to drop", client_id); diff --git a/src/stratifier.c b/src/stratifier.c index 5d483fcb..1b50a2cb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -3460,6 +3460,13 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 ckmsgq_add(sdata->stxnq, jp); return; } + + if (cmdmatch(method, "mining.term")) { + LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address); + drop_client(sdata, client_id); + return; + } + /* Unhandled message here */ LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); return; From e095ba4b49c49d502749f74ce8dfc5f2fda955fa Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:03:30 +1000 Subject: [PATCH 28/62] Mask out error 0 in socket error messages --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 35dd0bd2..b57bf048 100644 --- a/src/connector.c +++ b/src/connector.c @@ -520,7 +520,7 @@ void *receiver(void *arg) /* See what 
type of error this is and raise the log * level of the message if it's unexpected. */ getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error != 104) { + if (error && error != 104) { LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", client->id, client->fd, error, strerror(error)); } else { From 93d5760b2d52b99da4608d4f2fcf31f81091029f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:13:51 +1000 Subject: [PATCH 29/62] Set server to the passthrough server client in passthrough mode --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index b57bf048..650e4921 100644 --- a/src/connector.c +++ b/src/connector.c @@ -401,8 +401,8 @@ reparse: } else { json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "address", json_string(client->address_name)); - json_object_set_new_nocheck(val, "server", json_integer(client->server)); } + json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); /* Do not send messages of clients we've already dropped. 
We From 5633f8365df0082920d99ff669d0ca2a2c91812a Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 24 Apr 2015 21:17:24 +1000 Subject: [PATCH 30/62] Prevent theoretical read out of bounds --- src/stratifier.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1b50a2cb..00c9b000 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1348,7 +1348,7 @@ static stratum_instance_t *__recruit_stratum_instance(sdata_t *sdata) /* Enter with write instance_lock held */ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t id, - const char *address, const int server) + const char *address, int server) { stratum_instance_t *client; sdata_t *sdata = ckp->data; @@ -1357,6 +1357,9 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, const int64_t i client->id = id; client->session_id = ++sdata->session_id; strcpy(client->address, address); + /* Sanity check to not overflow lookup in ckp->serverurl[] */ + if (server >= ckp->serverurls) + server = 0; client->server = server; client->diff = client->old_diff = ckp->startdiff; client->ckp = ckp; From 84ca31fcca398b432a151702c0c71f3d13d4a0bc Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 09:13:20 +1000 Subject: [PATCH 31/62] Drop stratifier id when client id is not found in connector --- src/connector.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index 650e4921..b74e53fe 100644 --- a/src/connector.c +++ b/src/connector.c @@ -289,14 +289,19 @@ static void generator_drop_client(ckpool_t *ckp, const client_instance_t *client free(s); } -static void stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +static void stratifier_drop_id(ckpool_t *ckp, const int64_t id) { char buf[256]; - sprintf(buf, "dropclient=%"PRId64, client->id); + sprintf(buf, "dropclient=%"PRId64, id); send_proc(ckp->stratifier, buf); } +static void 
stratifier_drop_client(ckpool_t *ckp, const client_instance_t *client) +{ + stratifier_drop_id(ckp, client->id); +} + /* Invalidate this instance. Remove them from the hashtables we look up * regularly but keep the instances in a linked list until their ref count * drops to zero when we can remove them lazily. Client must hold a reference @@ -681,6 +686,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) invalidate_client(ckp, cdata, client); } else { LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); } free(buf); return; From 6c0c7dd7c43df51f50591845c42d8e0cd8c544b6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 09:51:12 +1000 Subject: [PATCH 32/62] Rework parse_client_msg to use the non blocking recv return values avoiding the need for an extra wait_read_select --- src/connector.c | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/connector.c b/src/connector.c index b74e53fe..65c77bd6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -343,21 +343,13 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) json_t *val; retry: - ret = wait_read_select(client->fd, 0); - if (ret < 1) { - if (!ret) - return; - LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); - invalidate_client(ckp, cdata, client); - return; - } buflen = PAGESIZE - client->bufofs; ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); if (ret < 1) { - /* We should have something to read if called since poll set - * this fd's revents status so if there's nothing it means the - * client has disconnected. */ + if (!ret) + return; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? 
strerror(errno) : ""); invalidate_client(ckp, cdata, client); From 22ece7b96cc08081d8c332ed73ae3851d6c21ffb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 10:13:48 +1000 Subject: [PATCH 33/62] Revert "Don't use MSG_WAITALL on unix sockets" This reverts commit 24d89a3e72fc4bca6a72d5e55b3d7f7862c02d35. Do use it... --- src/libckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libckpool.c b/src/libckpool.c index 011194fc..07a1727a 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -932,7 +932,7 @@ int read_length(int sockd, void *buf, int len) if (unlikely(sockd < 0)) return -1; while (len) { - ret = recv(sockd, buf + ofs, len, 0); + ret = recv(sockd, buf + ofs, len, MSG_WAITALL); if (unlikely(ret < 1)) return -1; ofs += ret; From 94dbd802a262e7d41b127b73fe6bb92530187c83 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 10:20:54 +1000 Subject: [PATCH 34/62] Wait longer after kill message --- src/ckpool.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ckpool.c b/src/ckpool.c index 218139de..4bd99f22 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -808,7 +808,7 @@ static void terminate_oldpid(const ckpool_t *ckp, proc_instance_t *pi, const pid return; LOGWARNING("Old process %s pid %d failed to respond to terminate request, killing", pi->processname, oldpid); - if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 500)) + if (kill_pid(oldpid, 9) || !pid_wait(oldpid, 3000)) quit(1, "Unable to kill old process %s pid %d", pi->processname, oldpid); } From fd73ebe3aeea3cd454bba9c2ece9eb9e28aed0ab Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 15:30:44 +1000 Subject: [PATCH 35/62] Send port number as a string on reconnect as most clients will be expecting it --- src/stratifier.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 00c9b000..f1cdf034 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1512,11 
+1512,8 @@ static void reconnect_clients(sdata_t *sdata, const char *cmd) if (port) url = strsep(&port, ","); if (url && port) { - int port_no; - - port_no = strtol(port, NULL, 10); - JSON_CPACK(json_msg, "{sosss[sii]}", "id", json_null(), "method", "client.reconnect", - "params", url, port_no, 0); + JSON_CPACK(json_msg, "{sosss[ssi]}", "id", json_null(), "method", "client.reconnect", + "params", url, port, 0); } else JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); From 4f6776a4c3501977e4af0ef48c9edde5557379f5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 15:40:55 +1000 Subject: [PATCH 36/62] Allow specifying of the socket name to use in ckpmsg --- src/ckpmsg.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/ckpmsg.c b/src/ckpmsg.c index 092a0121..52235247 100644 --- a/src/ckpmsg.c +++ b/src/ckpmsg.c @@ -1,5 +1,5 @@ /* - * Copyright 2014 Con Kolivas + * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -54,15 +54,20 @@ void mkstamp(char *stamp, size_t siz) int main(int argc, char **argv) { - char *name = NULL, *socket_dir = NULL, *buf = NULL; + char *name = NULL, *socket_dir = NULL, *buf = NULL, *sockname = "listener"; int tmo1 = RECV_UNIX_TIMEOUT1; int tmo2 = RECV_UNIX_TIMEOUT2; bool proxy = false; char stamp[128]; int c; - while ((c = getopt(argc, argv, "n:s:pt:T:")) != -1) { + while ((c = getopt(argc, argv, "N:n:s:pt:T:")) != -1) { switch(c) { + /* Allows us to specify which process or socket to + * talk to. 
*/ + case 'N': + sockname = strdup(optarg); + break; case 'n': name = strdup(optarg); break; @@ -92,7 +97,7 @@ int main(int argc, char **argv) realloc_strcat(&socket_dir, name); dealloc(name); trail_slash(&socket_dir); - realloc_strcat(&socket_dir, "listener"); + realloc_strcat(&socket_dir, sockname); while (42) { int sockd, len; From d0f7ec7c35f26b04dae5cb32867f770df1552115 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 25 Apr 2015 16:27:20 +1000 Subject: [PATCH 37/62] Decrease verbosity of missing client warning in receiver, adding id number to the output --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 65c77bd6..b1a3742d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -503,7 +503,7 @@ void *receiver(void *arg) } client = ref_client_by_id(cdata, event.data.u64); if (unlikely(!client)) { - LOGWARNING("Failed to find client by id in receiver!"); + LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } /* We can have both messages and read hang ups so process the From 2561eff1cbc5f2bb2298d55e90f3f6a6d708e725 Mon Sep 17 00:00:00 2001 From: ckolivas Date: Sun, 26 Apr 2015 08:52:29 +1000 Subject: [PATCH 38/62] Update user auth time on each successful auth --- src/stratifier.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index f1cdf034..6b4ee37b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -2428,8 +2428,7 @@ static int send_recv_auth(stratum_instance_t *client) json_get_string(&secondaryuserid, val, "secondaryuserid"); parse_worker_diffs(ckp, worker_array); client->suggest_diff = worker->mindiff; - if (!user->auth_time) - user->auth_time = time(NULL); + user->auth_time = time(NULL); } if (secondaryuserid && (!safecmp(response, "ok.authorise") || !safecmp(response, "ok.addrauth"))) { From 2bde0bbe3d51a86e857bbc8e3f11393a0723d028 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: 
Sun, 26 Apr 2015 17:38:36 +1000 Subject: [PATCH 39/62] Avoid trying to parse messages or test the client's error if their fd has already been invalidated --- src/connector.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/connector.c b/src/connector.c index b1a3742d..38052a23 100644 --- a/src/connector.c +++ b/src/connector.c @@ -506,10 +506,14 @@ void *receiver(void *arg) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } + if (unlikely(client->fd == -1)) + goto noparse; /* We can have both messages and read hang ups so process the * message first. */ if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); + if (unlikely(client->fd == -1)) + goto noparse; if (unlikely(event.events & EPOLLERR)) { socklen_t errlen = sizeof(int); int error = 0; @@ -534,6 +538,7 @@ void *receiver(void *arg) LOGINFO("Client id %"PRId64" fd %d RDHUP in epoll", client->id, client->fd); invalidate_client(cdata->pi->ckp, cdata, client); } +noparse: dec_instance_ref(cdata, client); } return NULL; From 62d1ec3f8a351d906842f772d92305ab506649fd Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 17:47:23 +1000 Subject: [PATCH 40/62] Use signal handlers from sender and receiver threads in the connector avoid needing to pthread tryjoin on every message --- src/ckpool.c | 4 +--- src/ckpool.h | 1 + src/connector.c | 25 +++++++------------------ 3 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 4bd99f22..9b9396e6 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -202,8 +202,6 @@ bool ckmsgq_empty(ckmsgq_t *ckmsgq) return ret; } -static void childsighandler(const int sig); - /* Create a standalone thread that queues received unix messages for a proc * instance and adds them to linked list of received messages with their * associated receive socket, then signal the associated rmsg_cond for the @@ -879,7 +877,7 @@ static void rm_namepid(const proc_instance_t *pi) /* Disable 
signal handlers for child processes, but simply pass them onto the * parent process to shut down cleanly. */ -static void childsighandler(const int sig) +void childsighandler(const int sig) { signal(sig, SIG_IGN); signal(SIGTERM, SIG_IGN); diff --git a/src/ckpool.h b/src/ckpool.h index 2388aedd..656fc5fa 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -255,6 +255,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, const char *msg, const char *file, co json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); +void childsighandler(const int sig); int process_exit(ckpool_t *ckp, const proc_instance_t *pi, int ret); bool json_get_string(char **store, const json_t *val, const char *res); bool json_get_int64(int64_t *store, const json_t *val, const char *res); diff --git a/src/connector.c b/src/connector.c index 38052a23..6e469e29 100644 --- a/src/connector.c +++ b/src/connector.c @@ -450,7 +450,7 @@ void *receiver(void *arg) epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0) { LOGEMERG("FATAL: Failed to create epoll in receiver"); - return NULL; + goto out; } serverfds = cdata->ckp->serverurls; /* Add all the serverfds to the epoll */ @@ -461,7 +461,7 @@ void *receiver(void *arg) ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event); if (ret < 0) { LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd); - return NULL; + goto out; } } @@ -541,6 +541,9 @@ void *receiver(void *arg) noparse: dec_instance_ref(cdata, client); } +out: + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } @@ -641,7 +644,8 @@ contfree: free(sender_send); dec_instance_ref(cdata, client); } - + /* We shouldn't get here unless there's an error */ + childsighandler(15); return NULL; } @@ -804,7 +808,6 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; - uint8_t test_cycle = 0; int64_t client_id; char *buf; int ret = 0; @@ -818,20 +821,6 @@ retry: dealloc(umsg); } - if 
(!++test_cycle) { - /* Test for pthread join every 256 messages */ - if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) { - LOGEMERG("Connector sender thread shutdown, exiting"); - ret = 1; - goto out; - } - if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) { - LOGEMERG("Connector receiver thread shutdown, exiting"); - ret = 1; - goto out; - } - } - do { umsg = get_unix_msg(pi); } while (!umsg); From 6a159bf5dc0ebc0f06f18ca170ac7378fe3bc037 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 17:50:36 +1000 Subject: [PATCH 41/62] We won't get no error if we are not testing fd -1 sockets any more, and use errno for consistent parsing of socket close errors --- src/connector.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 6e469e29..2c1fb215 100644 --- a/src/connector.c +++ b/src/connector.c @@ -521,11 +521,11 @@ void *receiver(void *arg) /* See what type of error this is and raise the log * level of the message if it's unexpected. 
*/ getsockopt(client->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errlen); - if (error && error != 104) { - LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + if (error != 104) { + LOGNOTICE("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", client->id, client->fd, error, strerror(error)); } else { - LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with error %d: %s", + LOGINFO("Client id %"PRId64" fd %d epollerr HUP in epoll with errno %d: %s", client->id, client->fd, error, strerror(error)); } invalidate_client(cdata->pi->ckp, cdata, client); From 8f1336986f91358786d4c4c9cfa88ec0058ab7ac Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 18:06:20 +1000 Subject: [PATCH 42/62] Use a unique event structure for each client added to the epoll list --- src/connector.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2c1fb215..2f0840f6 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,6 +30,7 @@ struct client_instance { UT_hash_handle hh; int64_t id; int fd; + struct epoll_event event; /* Reference count for when this instance is used outside of the * connector_data lock */ @@ -167,7 +168,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) int fd, port, no_clients, sockd; ckpool_t *ckp = cdata->ckp; client_instance_t *client; - struct epoll_event event; socklen_t address_len; ck_rlock(&cdata->lock); @@ -230,9 +230,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); client->fd = fd; - event.data.u64 = client->id; - event.events = EPOLLIN | EPOLLRDHUP; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { + client->event.data.u64 = client->id; + client->event.events = EPOLLIN | EPOLLRDHUP; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &client->event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; } 
@@ -256,7 +256,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); + epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, &client->event); nolinger_socket(fd); Close(client->fd); HASH_DEL(cdata->clients, client); From 93753620440e8456e7ebaaffce22516bd2f36b97 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 18:53:30 +1000 Subject: [PATCH 43/62] Revert "Use a unique event structure for each client added to the epoll list" This reverts commit 8f1336986f91358786d4c4c9cfa88ec0058ab7ac. Unnecessary. --- src/connector.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2f0840f6..2c1fb215 100644 --- a/src/connector.c +++ b/src/connector.c @@ -30,7 +30,6 @@ struct client_instance { UT_hash_handle hh; int64_t id; int fd; - struct epoll_event event; /* Reference count for when this instance is used outside of the * connector_data lock */ @@ -168,6 +167,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) int fd, port, no_clients, sockd; ckpool_t *ckp = cdata->ckp; client_instance_t *client; + struct epoll_event event; socklen_t address_len; ck_rlock(&cdata->lock); @@ -230,9 +230,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) ck_wunlock(&cdata->lock); client->fd = fd; - client->event.data.u64 = client->id; - client->event.events = EPOLLIN | EPOLLRDHUP; - if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &client->event) < 0)) { + event.data.u64 = client->id; + event.events = EPOLLIN | EPOLLRDHUP; + if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { LOGERR("Failed to epoll_ctl add in accept_client"); return 0; } @@ -256,7 +256,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client) if (fd != -1) { client_id = client->id; - epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, &client->event); + epoll_ctl(cdata->epfd, 
EPOLL_CTL_DEL, fd, NULL); nolinger_socket(fd); Close(client->fd); HASH_DEL(cdata->clients, client); From 0395dd052f3ed7ac1680f67c7a2e8b1341079254 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 20:03:17 +1000 Subject: [PATCH 44/62] Use the client reference count in the connector to protect the client fd, closing it only once there are no more references to it --- src/connector.c | 59 +++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2c1fb215..a94ee304 100644 --- a/src/connector.c +++ b/src/connector.c @@ -29,12 +29,17 @@ struct client_instance { /* For clients hashtable */ UT_hash_handle hh; int64_t id; + + /* fd cannot be changed while a ref is held */ int fd; /* Reference count for when this instance is used outside of the * connector_data lock */ int ref; + /* Have we disabled this client to be removed when there are no refs? */ + bool invalid; + /* For dead_clients list */ client_instance_t *next; client_instance_t *prev; @@ -229,7 +234,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) cdata->nfds++; ck_wunlock(&cdata->lock); - client->fd = fd; event.data.u64 = client->id; event.events = EPOLLIN | EPOLLRDHUP; if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { @@ -241,6 +245,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) * to it. We drop that reference when the socket is closed which * removes it automatically from the epoll list. 
*/ __inc_instance_ref(client); + client->fd = fd; return 1; } @@ -249,16 +254,14 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int drop_client(cdata_t *cdata, client_instance_t *client) { int64_t client_id = 0; - int fd; + int fd = -1; ck_wlock(&cdata->lock); - fd = client->fd; - if (fd != -1) { + if (!client->invalid) { + client->invalid = true; client_id = client->id; - + fd = client->fd; epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, fd, NULL); - nolinger_socket(fd); - Close(client->fd); HASH_DEL(cdata->clients, client); DL_APPEND(cdata->dead_clients, client); /* This is the reference to this client's presence in the @@ -324,6 +327,11 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c if (!client->ref) { DL_DELETE(cdata->dead_clients, client); LOGINFO("Connector recycling client %"PRId64, client->id); + /* We only close the client fd once we're sure there + * are no references to it left to prevent fds being + * reused on new and old clients. */ + nolinger_socket(client->fd); + Close(client->fd); __recycle_client(cdata, client); } } @@ -405,7 +413,7 @@ reparse: /* Do not send messages of clients we've already dropped. We * do this unlocked as the occasional false negative can be * filtered by the stratifier. */ - if (likely(client->fd != -1)) { + if (likely(!client->invalid)) { if (ckp->passthrough) send_proc(ckp->generator, s); else @@ -427,7 +435,7 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client) + if (client && !client->invalid) __inc_instance_ref(client); ck_wunlock(&cdata->lock); @@ -506,13 +514,13 @@ void *receiver(void *arg) LOGNOTICE("Failed to find client by id %"PRId64" in receiver!", event.data.u64); continue; } - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; /* We can have both messages and read hang ups so process the * message first. 
*/ if (likely(event.events & EPOLLIN)) parse_client_msg(cdata, client); - if (unlikely(client->fd == -1)) + if (unlikely(client->invalid)) goto noparse; if (unlikely(event.events & EPOLLERR)) { socklen_t errlen = sizeof(int); @@ -602,17 +610,18 @@ void *sender(void *arg) continue; } client = sender_send->client; + if (unlikely(client->invalid)) { + LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); + goto contfree; + } /* If this socket is not ready to receive data from us, put the * send back on the tail of the list and decrease the timeout * to poll to either look for a client that is ready or poll * select on this one */ - ck_rlock(&cdata->lock); fd = client->fd; if (!ret) ret = wait_write_select(fd, 0); - ck_runlock(&cdata->lock); - if (ret < 1) { if (ret < 0) { LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); @@ -655,7 +664,7 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) { sender_send_t *sender_send; client_instance_t *client; - int fd = -1, len; + int len; if (unlikely(!buf)) { LOGWARNING("Connector send_client sent a null buffer"); @@ -670,25 +679,17 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (likely(client)) { - fd = client->fd; - /* Grab a reference to this client until the sender_send has - * completed processing. */ + /* Grab a reference to this client until the sender_send has + * completed processing. 
*/ + if (likely(client)) __inc_instance_ref(client); - } ck_wunlock(&cdata->lock); - if (unlikely(fd == -1)) { + if (unlikely(!client)) { ckpool_t *ckp = cdata->ckp; - if (client) { - /* This shouldn't happen */ - LOGWARNING("Client id %"PRId64" disconnected but fd already invalidated!", id); - invalidate_client(ckp, cdata, client); - } else { - LOGINFO("Connector failed to find client id %"PRId64" to send to", id); - stratifier_drop_id(ckp, id); - } + LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); free(buf); return; } From ec39117a1c7d7bf2467c91a53053ade6b9b5ddcb Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 20:17:55 +1000 Subject: [PATCH 45/62] Don't return client if it is invalid in ref_client_by_id in the connector --- src/connector.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index a94ee304..e748808e 100644 --- a/src/connector.c +++ b/src/connector.c @@ -435,8 +435,12 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) ck_wlock(&cdata->lock); HASH_FIND_I64(cdata->clients, &id, client); - if (client && !client->invalid) - __inc_instance_ref(client); + if (client) { + if (!client->invalid) + __inc_instance_ref(client); + else + client = NULL; + } ck_wunlock(&cdata->lock); return client; From 1efe7120c49b20be51cc0cbc657125f5e38a1133 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sun, 26 Apr 2015 21:39:53 +1000 Subject: [PATCH 46/62] Use ref_client_by_id in send_client instead of open coding it --- src/connector.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/connector.c b/src/connector.c index e748808e..3e020871 100644 --- a/src/connector.c +++ b/src/connector.c @@ -681,14 +681,9 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) return; } - ck_wlock(&cdata->lock); - HASH_FIND_I64(cdata->clients, &id, client); /* Grab a reference to this client until the 
sender_send has * completed processing. */ - if (likely(client)) - __inc_instance_ref(client); - ck_wunlock(&cdata->lock); - + client = ref_client_by_id(cdata, id); if (unlikely(!client)) { ckpool_t *ckp = cdata->ckp; From 3919d182607ad39ec8256ceaeef5d8e84730e2c0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 07:17:51 +1000 Subject: [PATCH 47/62] Rework write path to have no potentially blocking calls and be able to send partial messages --- src/connector.c | 135 +++++++++++++++++++----------------------------- 1 file changed, 52 insertions(+), 83 deletions(-) diff --git a/src/connector.c b/src/connector.c index 3e020871..2ac6bb40 100644 --- a/src/connector.c +++ b/src/connector.c @@ -63,6 +63,7 @@ struct sender_send { client_instance_t *client; char *buf; int len; + int ofs; }; typedef struct sender_send sender_send_t; @@ -98,7 +99,6 @@ struct connector_data { /* For the linked list of pending sends */ sender_send_t *sender_sends; - sender_send_t *delayed_sends; int64_t sends_generated; int64_t sends_delayed; @@ -559,27 +559,65 @@ out: return NULL; } +/* Send a sender_send message and return true if we've finished sending it or + * are unable to send any more. 
*/ +static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sender_send) +{ + client_instance_t *client = sender_send->client; + + if (unlikely(client->invalid)) + return true; + + while (sender_send->len) { + int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT); + + if (unlikely(ret < 1)) { + if (!ret) + return false; + if (errno == EAGAIN || errno == EWOULDBLOCK) + return false; + LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); + invalidate_client(ckp, cdata, client); + return true; + } + sender_send->ofs += ret; + sender_send->len -= ret; + } + return true; +} + +static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) +{ + dec_instance_ref(cdata, sender_send->client); + free(sender_send->buf); + free(sender_send); +} + /* Use a thread to send queued messages, using select() to only send to sockets * ready for writing immediately to not delay other messages. */ -void *sender(void *arg) +static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; + sender_send_t *sends = NULL; ckpool_t *ckp = cdata->ckp; - bool sent = false; rename_proc("csender"); while (42) { - sender_send_t *sender_send, *delayed; - client_instance_t *client; - int ret = 0, fd, ofs = 0; + sender_send_t *sender_send, *sending, *tmp; + + /* Check all sends to see if they can be written out */ + DL_FOREACH_SAFE(sends, sending, tmp) { + if (send_sender_send(ckp, cdata, sending)) { + DL_DELETE(sends, sending); + clear_sender_send(sending, cdata); + } else + cdata->sends_delayed++; + } mutex_lock(&cdata->sender_lock); - /* Poll every 10ms if there are no new sends. Re-examine - * delayed sends immediately after a successful send in case - * endless new sends more frequently end up starving the - * delayed sends. */ - if (!cdata->sender_sends && !sent) { + /* Poll every 10ms if there are no new sends. 
*/ + if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; ts_t timeout_ts; @@ -592,70 +630,8 @@ void *sender(void *arg) DL_DELETE(cdata->sender_sends, sender_send); mutex_unlock(&cdata->sender_lock); - sent = false; - - /* Service delayed sends only if we have timed out on the - * conditional with no new sends appearing or have just - * serviced another message successfully. */ - if (!sender_send) { - /* Find a delayed client that needs servicing and set - * ret accordingly. We do not need to use FOREACH_SAFE - * as we break out of the loop as soon as we manipuate - * the list. */ - DL_FOREACH(cdata->delayed_sends, delayed) { - if ((ret = wait_write_select(delayed->client->fd, 0))) { - sender_send = cdata->delayed_sends; - DL_DELETE(cdata->delayed_sends, sender_send); - break; - } - } - /* None found ? */ - if (!sender_send) - continue; - } - client = sender_send->client; - if (unlikely(client->invalid)) { - LOGDEBUG("Discarding message sent to invalid client id %"PRId64, client->id); - goto contfree; - } - - /* If this socket is not ready to receive data from us, put the - * send back on the tail of the list and decrease the timeout - * to poll to either look for a client that is ready or poll - * select on this one */ - fd = client->fd; - if (!ret) - ret = wait_write_select(fd, 0); - if (ret < 1) { - if (ret < 0) { - LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd); - invalidate_client(ckp, cdata, client); - goto contfree; - } - LOGDEBUG("Client %"PRId64" not ready for writes", client->id); - - /* Append it to the tail of the delayed sends list. - * This is the only function that alters it so no - * locking is required. Keep the client ref. 
*/ - DL_APPEND(cdata->delayed_sends, sender_send); - cdata->sends_delayed++; - continue; - } - while (sender_send->len) { - ret = send(fd, sender_send->buf + ofs, sender_send->len , 0); - if (unlikely(ret < 0)) { - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd); - invalidate_client(ckp, cdata, client); - break; - } - ofs += ret; - sender_send->len -= ret; - } -contfree: - sent = true; - free(sender_send->buf); - free(sender_send); - dec_instance_ref(cdata, client); + if (sender_send) + DL_APPEND(sends, sender_send); } /* We shouldn't get here unless there's an error */ childsighandler(15); @@ -787,15 +763,8 @@ static char *connector_stats(cdata_t *cdata) objects = 0; memsize = 0; - mutex_lock(&cdata->sender_lock); generated = cdata->sends_delayed; - DL_FOREACH(cdata->delayed_sends, send) { - objects++; - memsize += sizeof(sender_send_t) + send->len + 1; - } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si}", "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); From 883d870060217449fc8b9f2dcb19a11f23f32195 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 17:34:38 +1000 Subject: [PATCH 48/62] Use the simpler read/write calls and make all client sockets non-blocking --- src/connector.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2ac6bb40..fc4637b3 100644 --- a/src/connector.c +++ b/src/connector.c @@ -224,6 +224,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) } keep_sockalive(fd); + noblock_socket(fd); LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d", cdata->nfds, fd, no_clients, client->address_name, port); @@ -352,7 +353,8 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) retry: buflen = 
PAGESIZE - client->bufofs; - ret = recv(client->fd, client->buf + client->bufofs, buflen, MSG_DONTWAIT); + /* This read call is non-blocking since the socket is set to O_NOBLOCK */ + ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { if (!ret) return; @@ -569,7 +571,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende return true; while (sender_send->len) { - int ret = send(client->fd, sender_send->buf + sender_send->ofs, sender_send->len , MSG_DONTWAIT); + int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); if (unlikely(ret < 1)) { if (!ret) @@ -593,8 +595,9 @@ static void clear_sender_send(sender_send_t *sender_send, cdata_t *cdata) free(sender_send); } -/* Use a thread to send queued messages, using select() to only send to sockets - * ready for writing immediately to not delay other messages. */ +/* Use a thread to send queued messages, appending them to the sends list and + * iterating over all of them, attempting to send them all non-blocking to + * only send to those clients ready to receive data. */ static void *sender(void *arg) { cdata_t *cdata = (cdata_t *)arg; From f580a82a06e6e3c9d9ec40b06fdb0b54cc5d66a6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 21:46:00 +1000 Subject: [PATCH 49/62] Drop the subclients of passthroughs that no longer exist --- src/connector.c | 57 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/src/connector.c b/src/connector.c index fc4637b3..e26626d0 100644 --- a/src/connector.c +++ b/src/connector.c @@ -643,8 +643,9 @@ static void *sender(void *arg) /* Send a client by id a heap allocated buffer, allowing this function to * free the ram. 
*/ -static void send_client(cdata_t *cdata, int64_t id, char *buf) +static void send_client(cdata_t *cdata, const int64_t id, char *buf) { + ckpool_t *ckp = cdata->ckp; sender_send_t *sender_send; client_instance_t *client; int len; @@ -661,15 +662,35 @@ static void send_client(cdata_t *cdata, int64_t id, char *buf) } /* Grab a reference to this client until the sender_send has - * completed processing. */ - client = ref_client_by_id(cdata, id); - if (unlikely(!client)) { - ckpool_t *ckp = cdata->ckp; - - LOGINFO("Connector failed to find client id %"PRId64" to send to", id); - stratifier_drop_id(ckp, id); - free(buf); - return; + * completed processing. Is this a passthrough subclient ? */ + if (id > 0xffffffffll) { + int64_t client_id, pass_id; + + client_id = id & 0xffffffffll; + pass_id = id >> 32; + /* Make sure the passthrough exists for passthrough subclients */ + client = ref_client_by_id(cdata, pass_id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find passthrough id %"PRId64" of client id %"PRId64" to send to", + pass_id, client_id); + /* Now see if the subclient exists */ + client = ref_client_by_id(cdata, client_id); + if (client) { + invalidate_client(ckp, cdata, client); + dec_instance_ref(cdata, client); + } else + stratifier_drop_id(ckp, id); + free(buf); + return; + } + } else { + client = ref_client_by_id(cdata, id); + if (unlikely(!client)) { + LOGINFO("Connector failed to find client id %"PRId64" to send to", id); + stratifier_drop_id(ckp, id); + free(buf); + return; + } } sender_send = ckzalloc(sizeof(sender_send_t)); @@ -696,7 +717,7 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void process_client_msg(cdata_t *cdata, const char *buf) { - int64_t client_id64, client_id; + int64_t client_id; json_t *json_msg; char *msg; @@ -707,16 +728,12 @@ static void process_client_msg(cdata_t *cdata, const char *buf) } /* Extract the client id from the json message and remove its entry */ - client_id64 = 
json_integer_value(json_object_get(json_msg, "client_id")); + client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); - if (client_id64 > 0xffffffffll) { - int64_t passthrough_id; - - passthrough_id = client_id64 & 0xffffffffll; - client_id = client_id64 >> 32; - json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id)); - } else - client_id = client_id64; + /* Put client_id back in for a passthrough subclient, passing its + * upstream client_id instead of the passthrough's. */ + if (client_id > 0xffffffffll) + json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); msg = json_dumps(json_msg, JSON_EOL); send_client(cdata, client_id, msg); json_decref(json_msg); From 83b23d864fda7c0ad6f9aab29c230d1a24fd3e04 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 22:08:41 +1000 Subject: [PATCH 50/62] Remove decrease of listen backlog which is of questionable utility --- src/connector.c | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/connector.c b/src/connector.c index e26626d0..dac54f34 100644 --- a/src/connector.c +++ b/src/connector.c @@ -453,10 +453,8 @@ static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id) void *receiver(void *arg) { cdata_t *cdata = (cdata_t *)arg; - bool dropped_backlog = false; struct epoll_event event; uint64_t serverfds, i; - time_t start_t; int ret, epfd; rename_proc("creceiver"); @@ -481,21 +479,10 @@ void *receiver(void *arg) while (!cdata->accept) cksleep_ms(1); - start_t = time(NULL); while (42) { client_instance_t *client; - if (unlikely(!dropped_backlog && time(NULL) - start_t > 90)) { - /* When we first start we listen to as many connections - * as possible. After the first minute we drop the - * listen to the minimum to effectively ratelimit how - * fast we can receive new connections. 
*/ - dropped_backlog = true; - LOGNOTICE("Dropping server listen backlog to 0"); - for (i = 0; i < serverfds; i++) - listen(cdata->serverfd[i], 0); - } while (unlikely(!cdata->accept)) cksleep_ms(10); ret = epoll_wait(epfd, &event, 1, 1000); From 7c624502979d94942fc10737466726ad303fee17 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Mon, 27 Apr 2015 22:20:52 +1000 Subject: [PATCH 51/62] Increase listen backlog to speed up reconnects up to the system config limits --- src/connector.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index dac54f34..17b4274d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -919,7 +919,9 @@ int connector(proc_instance_t *pi) Close(sockd); goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + /* Set listen backlog to larger than SOMAXCONN in case the + * system configuration supports it */ + if (listen(sockd, 1024) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -961,7 +963,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, SOMAXCONN) < 0) { + if (listen(sockd, 1024) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; From f8356971658120168d77cebe3729e6369833fed0 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 02:06:24 +1000 Subject: [PATCH 52/62] Microoptimisation --- src/connector.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/connector.c b/src/connector.c index 17b4274d..217946eb 100644 --- a/src/connector.c +++ b/src/connector.c @@ -356,9 +356,7 @@ retry: /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - if (!ret) - return; - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, 
client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); @@ -561,9 +559,7 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); if (unlikely(ret < 1)) { - if (!ret) - return false; - if (errno == EAGAIN || errno == EWOULDBLOCK) + if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return false; LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); invalidate_client(ckp, cdata, client); From ed44843e3745b6418bb789d7e4a87d274f1b90c3 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:26:54 +1000 Subject: [PATCH 53/62] Append all new sender_sends in csender instead of only the first --- src/connector.c | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index 217946eb..b64b578d 100644 --- a/src/connector.c +++ b/src/connector.c @@ -590,7 +590,7 @@ static void *sender(void *arg) rename_proc("csender"); while (42) { - sender_send_t *sender_send, *sending, *tmp; + sender_send_t *sending, *tmp; /* Check all sends to see if they can be written out */ DL_FOREACH_SAFE(sends, sending, tmp) { @@ -611,13 +611,11 @@ static void *sender(void *arg) timeraddspec(&timeout_ts, &polltime); cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts); } - sender_send = cdata->sender_sends; - if (sender_send) - DL_DELETE(cdata->sender_sends, sender_send); + if (cdata->sender_sends) { + DL_CONCAT(sends, cdata->sender_sends); + cdata->sender_sends = NULL; + } mutex_unlock(&cdata->sender_lock); - - if (sender_send) - DL_APPEND(sends, sender_send); } /* We shouldn't get here unless there's an error */ childsighandler(15); From 27d68200eca362cbdc18a12689f32eebb518893f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:42:12 +1000 Subject: [PATCH 54/62] Add better stats about queued sends consistent with the list changes --- src/connector.c | 23
+++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/connector.c b/src/connector.c index b64b578d..89017590 100644 --- a/src/connector.c +++ b/src/connector.c @@ -102,6 +102,8 @@ struct connector_data { int64_t sends_generated; int64_t sends_delayed; + int64_t sends_queued; + int64_t sends_size; /* For protecting the pending sends list */ mutex_t sender_lock; @@ -590,6 +592,7 @@ static void *sender(void *arg) rename_proc("csender"); while (42) { + int64_t sends_queued = 0, sends_size = 0; sender_send_t *sending, *tmp; /* Check all sends to see if they can be written out */ @@ -597,11 +600,16 @@ static void *sender(void *arg) if (send_sender_send(ckp, cdata, sending)) { DL_DELETE(sends, sending); clear_sender_send(sending, cdata); - } else - cdata->sends_delayed++; + } else { + sends_queued++; + sends_size += sizeof(sender_send_t) + sending->len + 1; + } } mutex_lock(&cdata->sender_lock); + cdata->sends_delayed += sends_queued; + cdata->sends_queued = sends_queued; + cdata->sends_size = sends_size; /* Poll every 10ms if there are no new sends. 
*/ if (!cdata->sender_sends) { const ts_t polltime = {0, 10000000}; @@ -751,21 +759,16 @@ static char *connector_stats(cdata_t *cdata) memsize = 0; mutex_lock(&cdata->sender_lock); - generated = cdata->sends_generated; DL_FOREACH(cdata->sender_sends, send) { objects++; memsize += sizeof(sender_send_t) + send->len + 1; } - mutex_unlock(&cdata->sender_lock); - - JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); + JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", cdata->sends_generated); json_set_object(val, "sends", subval); - objects = 0; - memsize = 0; + JSON_CPACK(subval, "{si,si,si}", "count", cdata->sends_queued, "memory", cdata->sends_size, "generated", cdata->sends_delayed); + mutex_unlock(&cdata->sender_lock); - generated = cdata->sends_delayed; - JSON_CPACK(subval, "{si}", "generated", generated); json_set_object(val, "delays", subval); buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); From 8447e3ff554675c3e24c5816e60dd0ee88a9ccb2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:44:21 +1000 Subject: [PATCH 55/62] Microoptimise --- src/connector.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index 89017590..ae768f8b 100644 --- a/src/connector.c +++ b/src/connector.c @@ -358,7 +358,7 @@ retry: /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); if (ret < 1) { - if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) + if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", client->fd, client->bufofs, ret, errno, ret && errno ? 
strerror(errno) : ""); From 68e44be3ce3d8777760c2e828e8c094fcf13dce6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 08:52:16 +1000 Subject: [PATCH 56/62] Check for oversized client message before doing any reads to avoid possibility of exactly the wrong size buffer to ever invalidate the client, adding more info to a downgraded message --- src/connector.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/connector.c b/src/connector.c index ae768f8b..224fc5b8 100644 --- a/src/connector.c +++ b/src/connector.c @@ -354,6 +354,12 @@ static void parse_client_msg(cdata_t *cdata, client_instance_t *client) json_t *val; retry: + if (unlikely(client->bufofs > MAX_MSGSIZE)) { + LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", + client->id, client->fd); + invalidate_client(ckp, cdata, client); + return; + } buflen = PAGESIZE - client->bufofs; /* This read call is non-blocking since the socket is set to O_NOBLOCK */ ret = read(client->fd, client->buf + client->bufofs, buflen); @@ -368,14 +374,8 @@ retry: client->bufofs += ret; reparse: eol = memchr(client->buf, '\n', client->bufofs); - if (!eol) { - if (unlikely(client->bufofs > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(ckp, cdata, client); - return; - } + if (!eol) goto retry; - } /* Do something useful with this message now */ buflen = eol - client->buf + 1; From 806d1c76a2d057f2d9e0f105e95b4ae29b69a799 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:02:52 +1000 Subject: [PATCH 57/62] Do not create a stratifier statsupdate thread in passthrough mode since the stats always read zero --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 6b4ee37b..356f032d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -4531,7 +4531,8 @@ int stratifier(proc_instance_t *pi) 
create_pthread(&pth_blockupdate, blockupdate, ckp); mutex_init(&sdata->stats_lock); - create_pthread(&pth_statsupdate, statsupdate, ckp); + if (!ckp->passthrough) + create_pthread(&pth_statsupdate, statsupdate, ckp); mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); From 146d164053a164b150ef83b425df2b14fb045d6b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:03:21 +1000 Subject: [PATCH 58/62] Put dropped at the start of the stratifier messages for easier parsing --- src/stratifier.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 356f032d..106344dc 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1289,17 +1289,17 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil if (client->workername) { if (user) { - ASPRINTF(msg, "Client %"PRId64" %s %suser %s worker %s dropped %s", - client->id, client->address, user->throttled ? "throttled " : "", - user->username, client->workername, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", + client->id, client->address, user->throttled ? "throttled " : "", + user->username, client->workername, lazily ? "lazily" : ""); } else { - ASPRINTF(msg, "Client %"PRId64" %s no user worker %s dropped %s", - client->id, client->address, client->workername, - lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped client %"PRId64" %s no user worker %s %s", + client->id, client->address, client->workername, + lazily ? "lazily" : ""); } } else { - ASPRINTF(msg, "Workerless client %"PRId64" %s dropped %s", - client->id, client->address, lazily ? "lazily" : ""); + ASPRINTF(msg, "Dropped workerless client %"PRId64" %s %s", + client->id, client->address, lazily ? 
"lazily" : ""); } __del_client(sdata, client); __kill_instance(sdata, client); From 882bd693ddefe17d49324494e9d7ba598a3688ba Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:26:34 +1000 Subject: [PATCH 59/62] Display connector stats every ~60s in passthrough mode --- src/connector.c | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/connector.c b/src/connector.c index 224fc5b8..fd03991f 100644 --- a/src/connector.c +++ b/src/connector.c @@ -74,6 +74,8 @@ struct connector_data { cklock_t lock; proc_instance_t *pi; + time_t start_time; + /* Array of server fds */ int *serverfd; /* All time count of clients connected */ @@ -728,7 +730,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf) json_decref(json_msg); } -static char *connector_stats(cdata_t *cdata) +static char *connector_stats(cdata_t *cdata, const int runtime) { json_t *val = json_object(), *subval; client_instance_t *client; @@ -737,6 +739,10 @@ static char *connector_stats(cdata_t *cdata) int64_t memsize; char *buf; + /* If called in passthrough mode we log stats instead of the stratifier */ + if (runtime) + json_set_int(val, "runtime", runtime); + ck_rlock(&cdata->lock); objects = HASH_COUNT(cdata->clients); memsize = SAFE_HASH_OVERHEAD(cdata->clients) + sizeof(client_instance_t) * objects; @@ -773,7 +779,10 @@ static char *connector_stats(cdata_t *cdata) buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); - LOGNOTICE("Connector stats: %s", buf); + if (runtime) + LOGNOTICE("Passthrough:%s", buf); + else + LOGNOTICE("Connector stats: %s", buf); return buf; } @@ -781,13 +790,26 @@ static int connector_loop(proc_instance_t *pi, cdata_t *cdata) { unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; + time_t last_stats; int64_t client_id; - char *buf; int ret = 0; + char *buf; LOGWARNING("%s connector ready", ckp->name); + last_stats = cdata->start_time; retry: + if (ckp->passthrough) { + time_t diff = 
time(NULL); + + if (diff - last_stats >= 60) { + last_stats = diff; + diff -= cdata->start_time; + buf = connector_stats(cdata, diff); + dealloc(buf); + } + } + if (umsg) { Close(umsg->sockd); free(umsg->buf); @@ -837,7 +859,7 @@ retry: char *msg; LOGDEBUG("Connector received stats request"); - msg = connector_stats(cdata); + msg = connector_stats(cdata, 0); send_unix_msg(umsg->sockd, msg); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); @@ -982,6 +1004,7 @@ int connector(proc_instance_t *pi) cond_init(&cdata->sender_cond); create_pthread(&cdata->pth_sender, sender, cdata); create_pthread(&cdata->pth_receiver, receiver, cdata); + cdata->start_time = time(NULL); create_unix_receiver(pi); From 5365f1a420f498f741933964b214c5620272809c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:29:33 +1000 Subject: [PATCH 60/62] Increase listen backlog to 8k --- src/connector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connector.c b/src/connector.c index fd03991f..2f4972a4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -940,7 +940,7 @@ int connector(proc_instance_t *pi) } /* Set listen backlog to larger than SOMAXCONN in case the * system configuration supports it */ - if (listen(sockd, 1024) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; @@ -982,7 +982,7 @@ int connector(proc_instance_t *pi) ret = 1; goto out; } - if (listen(sockd, 1024) < 0) { + if (listen(sockd, 8192) < 0) { LOGERR("Connector failed to listen on socket"); Close(sockd); goto out; From b73f44f00cf9453ee55b22e98c3a61234b4511d6 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:50:09 +1000 Subject: [PATCH 61/62] Tidy up and demote oversize message, adding client id --- src/connector.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/connector.c b/src/connector.c index 2f4972a4..f01b0c2c 100644 --- 
a/src/connector.c +++ b/src/connector.c @@ -368,8 +368,8 @@ retry: if (ret < 1) { if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) return; - LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", - client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); + LOGINFO("Client id %"PRId64" fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s", + client->id, client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : ""); invalidate_client(ckp, cdata, client); return; } @@ -382,7 +382,7 @@ reparse: /* Do something useful with this message now */ buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { - LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); + LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); invalidate_client(ckp, cdata, client); return; } From dcfcae17799fa4ca6606728bcf90cf3c413e67c2 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 28 Apr 2015 09:53:17 +1000 Subject: [PATCH 62/62] Add errno details to write based close --- src/connector.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/connector.c b/src/connector.c index f01b0c2c..0bd6fac7 100644 --- a/src/connector.c +++ b/src/connector.c @@ -565,7 +565,8 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende if (unlikely(ret < 1)) { if (errno == EAGAIN || errno == EWOULDBLOCK || !ret) return false; - LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, client->fd); + LOGINFO("Client id %"PRId64" fd %d disconnected with write errno %d:%s", + client->id, client->fd, errno, strerror(errno)); invalidate_client(ckp, cdata, client); return true; }