From 09f0586243d1de5c9552d12288354d0cfc3f0c8b Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 12:28:10 +1000 Subject: [PATCH] Add all messages that we don't need to wait for the ckdb response to a separate thread message queue and cache them if ckdb is unresponsive --- src/ckpool.c | 8 ++-- src/stratifier.c | 118 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 91 insertions(+), 35 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 47142152..7532d12c 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -324,7 +324,7 @@ out: } /* Send a json msg to ckdb with its idmsg and return the response, consuming - * the json */ + * the json on success */ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, const char *file, const char *func, const int line) { @@ -333,7 +333,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, dump = json_dumps(val, JSON_COMPACT); if (unlikely(!dump)) { LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line); - goto out; + return buf; } ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump); free(dump); @@ -341,8 +341,8 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, buf = _send_recv_ckdb(ckp, msg, file, func, line); LOGDEBUG("Received from ckdb: %s", buf); free(msg); -out: - json_decref(val); + if (likely(buf)) + json_decref(val); return buf; } diff --git a/src/stratifier.c b/src/stratifier.c index 43e03aca..a4d15722 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -164,12 +164,14 @@ static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_send_lock; static pthread_mutex_t sshare_lock; static pthread_mutex_t sauth_lock; +static pthread_mutex_t ckdbq_lock; /* For signalling the threads to wake up and do work */ static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_send_cond; static pthread_cond_t sshare_cond; static pthread_cond_t sauth_cond; +static pthread_cond_t ckdbq_cond; /* For the linked list of all queued messages */ static stratum_msg_t *stratum_recvs; @@ -189,6 +191,18 @@ typedef struct json_params json_params_t; static json_params_t *sshares; static json_params_t *sauths; +struct ckdb_msg { + struct ckdb_msg *next; + struct ckdb_msg *prev; + + json_t *val; + int idtype; +}; + +typedef struct ckdb_msg ckdb_msg_t; + +static ckdb_msg_t *ckdb_msgs; + static int user_instance_id; struct user_instance { @@ -400,12 +414,23 @@ static void purge_share_hashtable(int64_t wb_id) LOGINFO("Cleared %d shares from share hashtable", purged); } -/* FIXME This message will be sent to the database once it's hooked in */ +static void ckdbq_add(const int idtype, json_t *val) +{ + ckdb_msg_t *msg = ckalloc(sizeof(ckdb_msg_t)); + + msg->val = val; + msg->idtype = idtype; + + mutex_lock(&ckdbq_lock); + DL_APPEND(ckdb_msgs, msg); + pthread_cond_signal(&ckdbq_cond); + mutex_unlock(&ckdbq_lock); +} + static void send_workinfo(ckpool_t *ckp, workbase_t *wb) { char cdfield[64]; json_t *val; - char *buf; sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec); @@ -426,12 +451,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got workinfo response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no workinfo response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); } static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) @@ -986,6 +1006,7 @@ static void *blockupdate(void *arg) char *buf = NULL, hash[68]; char request[8]; + pthread_detach(pthread_self()); rename_proc("blockupdate"); buf = send_recv_proc(ckp->generator, "getbest"); if (buf && strncasecmp(buf, "Failed", 6)) @@ -1125,7 +1146,9 @@ static user_instance_t *authorise_user(const char *workername) } /* Send this to the database and parse the response to authorise a user - * and get SUID parameters back */ + * and get SUID parameters back. We don't add these requests to the ckdbqueue + * since we have to wait for the response but this is done from the authoriser + * thread so it won't hold anything up but other authorisations. */ static bool send_recv_auth(stratum_instance_t *client) { char cdfield[64]; @@ -1162,8 +1185,11 @@ static bool send_recv_auth(stratum_instance_t *client) client->secondaryuserid = strdup(secondaryuserid); ret = true; } - } else + } else { LOGWARNING("Got no auth response from ckdb :("); + json_decref(val); + } + return ret; } @@ -1481,9 +1507,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, const char *user, *job_id, *nonce2, *ntime, *nonce; double diff, wdiff = 0, sdiff = -1; enum share_err err = SE_NONE; - char *fname, *s, *buf; char idstring[20]; uint32_t ntime32; + char *fname, *s; workbase_t *wb; uchar hash[32]; int64_t id; @@ -1671,12 +1697,7 @@ out_unlock: LOGERR("Failed to fwrite to %s", fname); } else LOGERR("Failed to fopen %s", fname); - buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got shares response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no shares response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); out: if (!share) { val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", @@ -1693,13 +1714,7 @@ out: "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - /* FIXME : Send val json to database here */ - buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); - if (likely(buf)) { - LOGINFO("Got shareerror response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no shareerror response from ckdb :("); + ckdbq_add(ID_SHARELOG, val); LOGINFO("Invalid share from client %d: %s", client->id, client->workername); } return json_boolean(result); @@ -1884,6 +1899,7 @@ static void *stratum_receiver(void *arg) ckpool_t *ckp = (ckpool_t *)arg; stratum_msg_t *msg; + pthread_detach(pthread_self()); rename_proc("sreceiver"); while (42) { @@ -1934,6 +1950,7 @@ static void *stratum_sender(void *arg) ckpool_t *ckp = (ckpool_t *)arg; stratum_msg_t *msg = NULL; + pthread_detach(pthread_self()); rename_proc("ssender"); while (42) { @@ -1984,6 +2001,7 @@ static void *share_processor(void *arg) ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; json_params_t *jp = NULL; + pthread_detach(pthread_self()); rename_proc("sprocessor"); while (42) { json_t *result_val, *json_msg, *err_val = NULL; @@ -2030,6 +2048,7 @@ static void *authoriser(void *arg) ckpool_t *ckp = (ckpool_t *)arg; json_params_t *jp = NULL; + pthread_detach(pthread_self()); rename_proc("authoriser"); while (42) { @@ -2079,10 +2098,47 @@ static void *authoriser(void *arg) return NULL; } +static void *ckdbqueue(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + + pthread_detach(pthread_self()); + rename_proc("ckdbqueue"); + + while (42) { + char *buf = NULL; + ckdb_msg_t *msg; + + mutex_lock(&ckdbq_lock); + if (!ckdb_msgs) + pthread_cond_wait(&ckdbq_cond, &ckdbq_lock); + msg = ckdb_msgs; + if (likely(msg)) + DL_DELETE(ckdb_msgs, msg); + mutex_unlock(&ckdbq_lock); + + if (unlikely(!msg)) + continue; + + while (!buf) { + buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val); + if (unlikely(!buf)) { + LOGWARNING("Failed to talk to ckdb, queueing messages"); + sleep(5); + } + } + LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); + free(buf); + } + + return NULL; +} + static void *statsupdate(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + pthread_detach(pthread_self()); rename_proc("statsupdate"); tv_time(&stats.start_time); @@ -2097,11 +2153,11 @@ static void *statsupdate(void *arg) user_instance_t *instance, *tmp; char fname[512] = {}; tv_t now, diff; - char *s, *buf; int users, i; ts_t ts_now; json_t *val; FILE *fp; + char *s; tv_time(&now); timersub(&now, &stats.start_time, &diff); @@ -2247,12 +2303,7 @@ static void *statsupdate(void *arg) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - buf = json_ckdb_call(ckp, ckdb_ids[ID_STATS], val); - if (likely(buf)) { - LOGINFO("Got stats response: %s", buf); - dealloc(buf); - } else - LOGWARNING("Got no stats response from ckdb :("); + ckdbq_add(ID_STATS, val); /* Update stats 4 times per minute for smooth values, displaying * status every minute. */ @@ -2291,6 +2342,7 @@ int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; + pthread_t pth_ckdbqueue; ckpool_t *ckp = pi->ckp; char *buf; int ret; @@ -2332,6 +2384,10 @@ int stratifier(proc_instance_t *pi) cond_init(&sauth_cond); create_pthread(&pth_authoriser, authoriser, ckp); + mutex_init(&ckdbq_lock); + cond_init(&ckdbq_cond); + create_pthread(&pth_ckdbqueue, ckdbqueue, ckp); + cklock_init(&workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp);