From 9d099060d3415c7b3e29d74ec80533e4fc373fe4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 11:38:59 +1000 Subject: [PATCH 1/3] Clean up auth response handling and drop regular ckdb responses to verbose logging --- src/stratifier.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 6afad0b2..1b8bd338 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -418,10 +418,10 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) "createinet", "127.0.0.1"); buf = json_ckdb_call(ckp, "sharelog", val); if (likely(buf)) { - LOGWARNING("Got workinfo response: %s", buf); + LOGINFO("Got workinfo response: %s", buf); dealloc(buf); } else - LOGWARNING("Got no workinfo response :("); + LOGWARNING("Got no workinfo response from ckdb :("); } static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) @@ -1086,8 +1086,8 @@ static json_t *parse_subscribe(int client_id, json_t *params_val) return ret; } -/* FIXME: Talk to database here instead. This simply strips off the first part - * of the workername and matches it to a user or creates a new one. */ +/* This simply strips off the first part of the workername and matches it to a + * user or creates a new one. */ static user_instance_t *authorise_user(const char *workername) { char *fullname = strdupa(workername); @@ -1114,8 +1114,8 @@ static user_instance_t *authorise_user(const char *workername) return instance; } -/* FIXME: Send this to the database and parse the response to authorise a user - * and get parameters back */ +/* Send this to the database and parse the response to authorise a user + * and get SUID parameters back */ static bool send_recv_auth(stratum_instance_t *client) { char cdfield[64]; @@ -1145,14 +1145,15 @@ static bool send_recv_auth(stratum_instance_t *client) sscanf(buf, "id.%*d.%s", response); secondaryuserid = response; strsep(&secondaryuserid, "."); - LOGWARNING("Got auth response: %s response: %s suid: %s", buf, - response, secondaryuserid); - if (!strcmp(response, "added")) { + LOGINFO("User %s Worker %s got auth response: %s suid: %s", + client->user_instance->username, client->workername, + response, secondaryuserid); + if (!strcmp(response, "added") && secondaryuserid) { client->secondaryuserid = strdup(secondaryuserid); ret = true; } } else - LOGWARNING("Got no auth response :("); + LOGWARNING("Got no auth response from ckdb :("); return ret; } @@ -1662,10 +1663,10 @@ out_unlock: LOGERR("Failed to fopen %s", fname); buf = json_ckdb_call(client->ckp, "sharelog", val); if (likely(buf)) { - LOGWARNING("Got sharelog response: %s", buf); + LOGINFO("Got shares response: %s", buf); dealloc(buf); } else - LOGWARNING("Got no sharelog response :("); + LOGWARNING("Got no shares response from ckdb :("); out: if (!share) { val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", @@ -1685,10 +1686,10 @@ out: /* FIXME : Send val json to database here */ buf = json_ckdb_call(client->ckp, "sharelog", val); if (likely(buf)) { - LOGWARNING("Got sharelog response: %s", buf); + LOGINFO("Got shareerror response: %s", buf); dealloc(buf); } else - LOGWARNING("Got no sharelog response :("); + LOGWARNING("Got no shareerror response from ckdb :("); LOGINFO("Invalid share from client %d: %s", client->id, client->workername); } return json_boolean(result); From 8798f0862293b8a84317037a9b07de288328632e Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 11:56:45 +1000 Subject: [PATCH 2/3] Make an array for the different ids we send to ckdb --- src/stratifier.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 1b8bd338..43e03aca 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -263,6 +263,16 @@ static share_t *shares; static cklock_t share_lock; +#define ID_AUTH 0 +#define ID_SHARELOG 1 +#define ID_STATS 2 + +static const char *ckdb_ids[] = { + "authorise", + "sharelog", + "stats" +}; + static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) { char header[228]; @@ -416,7 +426,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, "sharelog", val); + buf = json_ckdb_call(ckp, ckdb_ids[ID_SHARELOG], val); if (likely(buf)) { LOGINFO("Got workinfo response: %s", buf); dealloc(buf); @@ -1138,7 +1148,7 @@ static bool send_recv_auth(stratum_instance_t *client) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(client->ckp, "authorise", val); + buf = json_ckdb_call(client->ckp, ckdb_ids[ID_AUTH], val); if (likely(buf)) { char *secondaryuserid, *response = alloca(128); @@ -1661,7 +1671,7 @@ out_unlock: LOGERR("Failed to fwrite to %s", fname); } else LOGERR("Failed to fopen %s", fname); - buf = json_ckdb_call(client->ckp, "sharelog", val); + buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); if (likely(buf)) { LOGINFO("Got shares response: %s", buf); dealloc(buf); @@ -1684,7 +1694,7 @@ out: "createcode", __func__, "createinet", "127.0.0.1"); /* FIXME : Send val json to database here */ - buf = json_ckdb_call(client->ckp, "sharelog", val); + buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); if (likely(buf)) { LOGINFO("Got shareerror response: %s", buf); dealloc(buf); @@ -2237,12 +2247,12 @@ static void *statsupdate(void *arg) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, "stats", val); + buf = json_ckdb_call(ckp, ckdb_ids[ID_STATS], val); if (likely(buf)) { - LOGWARNING("Got stats response: %s", buf); + LOGINFO("Got stats response: %s", buf); dealloc(buf); } else - LOGWARNING("Got no stats response :("); + LOGWARNING("Got no stats response from ckdb :("); /* Update stats 4 times per minute for smooth values, displaying * status every minute. */ From 09f0586243d1de5c9552d12288354d0cfc3f0c8b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 12:28:10 +1000 Subject: [PATCH 3/3] Add all messages that we don't need to wait for the ckdb response to a separate thread message queue and cache them if ckdb is unresponsive --- src/ckpool.c | 8 ++-- src/stratifier.c | 118 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 47142152..7532d12c 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -324,7 +324,7 @@ out: } /* Send a json msg to ckdb with its idmsg and return the response, consuming - * the json */ + * the json on success */ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, const char *file, const char *func, const int line) { @@ -333,7 +333,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, dump = json_dumps(val, JSON_COMPACT); if (unlikely(!dump)) { LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line); - goto out; + return buf; } ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump); free(dump); @@ -341,8 +341,8 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, buf = _send_recv_ckdb(ckp, msg, file, func, line); LOGDEBUG("Received from ckdb: %s", buf); free(msg); -out: - json_decref(val); + if (likely(buf)) + json_decref(val); return buf; } diff --git a/src/stratifier.c b/src/stratifier.c index 43e03aca..a4d15722 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -164,12 +164,14 @@ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; static pthread_mutex_t sshare_lock; static pthread_mutex_t sauth_lock; +static pthread_mutex_t ckdbq_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; static pthread_cond_t sshare_cond; static pthread_cond_t sauth_cond; +static pthread_cond_t ckdbq_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; @@ -189,6 +191,18 @@ typedef struct json_params json_params_t; static json_params_t *sshares; static json_params_t *sauths; +struct ckdb_msg { + struct ckdb_msg *next; + struct ckdb_msg *prev; + + json_t *val; + int idtype; +}; + +typedef struct ckdb_msg ckdb_msg_t; + +static ckdb_msg_t *ckdb_msgs; + static int user_instance_id; struct user_instance { @@ -400,12 +414,23 @@ static void purge_share_hashtable(int64_t wb_id) LOGINFO("Cleared %d shares from share hashtable", purged); } -/* FIXME This message will be sent to the database once it's hooked in */ +static void ckdbq_add(const int idtype, json_t *val) +{ + ckdb_msg_t *msg = ckalloc(sizeof(ckdb_msg_t)); + + msg->val = val; + msg->idtype = idtype; + + mutex_lock(&ckdbq_lock); + DL_APPEND(ckdb_msgs, msg); + pthread_cond_signal(&ckdbq_cond); + mutex_unlock(&ckdbq_lock); +} + static void send_workinfo(ckpool_t *ckp, workbase_t *wb) { char cdfield[64]; json_t *val; - char *buf; sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec); @@ -426,12 +451,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got workinfo response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no workinfo response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); } static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) @@ -986,6 +1006,7 @@ static void *blockupdate(void *arg) char *buf = NULL, hash[68]; char request[8]; + pthread_detach(pthread_self()); rename_proc("blockupdate"); buf = send_recv_proc(ckp->generator, "getbest"); if (buf && strncasecmp(buf, "Failed", 6)) @@ -1125,7 +1146,9 @@ static user_instance_t *authorise_user(const char *workername) } /* Send this to the database and parse the response to authorise a user - * and get SUID parameters back */ + * and get SUID parameters back. We don't add these requests to the ckdbqueue + * since we have to wait for the response but this is done from the authoriser + * thread so it won't hold anything up but other authorisations. */ static bool send_recv_auth(stratum_instance_t *client) { char cdfield[64]; @@ -1162,8 +1185,11 @@ static bool send_recv_auth(stratum_instance_t *client) client->secondaryuserid = strdup(secondaryuserid); ret = true; } - } else + } else { LOGWARNING("Got no auth response from ckdb :("); + json_decref(val); + } + return ret; } @@ -1481,9 +1507,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, const char *user, *job_id, *nonce2, *ntime, *nonce; double diff, wdiff = 0, sdiff = -1; enum share_err err = SE_NONE; - char *fname, *s, *buf; char idstring[20]; uint32_t ntime32; + char *fname, *s; workbase_t *wb; uchar hash[32]; int64_t id; @@ -1671,12 +1697,7 @@ out_unlock: LOGERR("Failed to fwrite to %s", fname); } else LOGERR("Failed to fopen %s", fname); - buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got shares response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no shares response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); out: if (!share) { val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", @@ -1693,13 +1714,7 @@ out: "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - /* FIXME : Send val json to database here */ - buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got shareerror response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no shareerror response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); LOGINFO("Invalid share from client %d: %s", client->id, client->workername); } return json_boolean(result); @@ -1884,6 +1899,7 @@ static void *stratum_receiver(void *arg) ckpool_t *ckp = (ckpool_t *)arg; stratum_msg_t *msg; + pthread_detach(pthread_self()); rename_proc("sreceiver"); while (42) { @@ -1934,6 +1950,7 @@ static void *stratum_sender(void *arg) ckpool_t *ckp = (ckpool_t *)arg; stratum_msg_t *msg = NULL; + pthread_detach(pthread_self()); rename_proc("ssender"); while (42) { @@ -1984,6 +2001,7 @@ static void *share_processor(void *arg) ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; json_params_t *jp = NULL; + pthread_detach(pthread_self()); rename_proc("sprocessor"); while (42) { json_t *result_val, *json_msg, *err_val = NULL; @@ -2030,6 +2048,7 @@ static void *authoriser(void *arg) ckpool_t *ckp = (ckpool_t *)arg; json_params_t *jp = NULL; + pthread_detach(pthread_self()); rename_proc("authoriser"); while (42) { @@ -2079,10 +2098,47 @@ static void *authoriser(void *arg) return NULL; } +static void *ckdbqueue(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + + pthread_detach(pthread_self()); + rename_proc("ckdbqueue"); + + while (42) { + char *buf = NULL; + ckdb_msg_t *msg; + + mutex_lock(&ckdbq_lock); + if (!ckdb_msgs) + pthread_cond_wait(&ckdbq_cond, &ckdbq_lock); + msg = ckdb_msgs; + if (likely(msg)) + DL_DELETE(ckdb_msgs, msg); + mutex_unlock(&ckdbq_lock); + + if (unlikely(!msg)) + continue; + + while (!buf) { + buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val); + if (unlikely(!buf)) { + LOGWARNING("Failed to talk to ckdb, queueing messages"); + sleep(5); + } + } + LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); + free(buf); + } + + return NULL; +} + static void *statsupdate(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + pthread_detach(pthread_self()); rename_proc("statsupdate"); tv_time(&stats.start_time); @@ -2097,11 +2153,11 @@ static void *statsupdate(void *arg) user_instance_t *instance, *tmp; char fname[512] = {}; tv_t now, diff; - char *s, *buf; int users, i; ts_t ts_now; json_t *val; FILE *fp; + char *s; tv_time(&now); timersub(&now, &stats.start_time, &diff); @@ -2247,12 +2303,7 @@ static void *statsupdate(void *arg) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, ckdb_ids[ID_STATS], val); - if (likely(buf)) { - LOGINFO("Got stats response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no stats response from ckdb :("); + ckdbq_add(ID_STATS, val); /* Update stats 4 times per minute for smooth values, displaying * status every minute. */ @@ -2291,6 +2342,7 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; + pthread_t pth_ckdbqueue; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2332,6 +2384,10 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); + mutex_init(&ckdbq_lock); + cond_init(&ckdbq_cond); + create_pthread(&pth_ckdbqueue, ckdbqueue, ckp); + cklock_init(&workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp);