From c11b2c11f3e8e6ba63c19832592e2043da2e5cb5 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Thu, 14 Aug 2014 22:45:47 +1000 Subject: [PATCH] Log ckdb messages before adding them to the queue of messages to send to it to ensure messages continue to be logged as generated even if they're queued to be sent to ckdb --- src/ckpool.c | 100 ++--------------------------------------------- src/ckpool.h | 6 +-- src/stratifier.c | 65 +++++++++++++++++++----------- 3 files changed, 49 insertions(+), 122 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 77bf20cf..2793f258 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -507,110 +507,18 @@ out: return buf; } -static const char *invalid_unknown = " (unknown reason)"; -static const char *invalid_toodeep = " >9 levels, recursion?"; - -#define first_invalid(_json_data) _first_invalid(_json_data, 0) - -static char *_first_invalid(json_t *json_data, int level) -{ - const char *json_key, *json_str; - json_t *json_value; - void *json_iter; - int json_typ; - char buf[512], *inside; - bool found; - - if (level > 9) - return strdup(invalid_toodeep); - - buf[0] = '\0'; - found = false; - json_iter = json_object_iter(json_data); - while (!found && json_iter) { - json_key = json_object_iter_key(json_iter); - json_value = json_object_iter_value(json_iter); - json_typ = json_typeof(json_value); - switch(json_typ) { - case JSON_STRING: - json_str = json_string_value(json_value); - if (json_str == NULL) { - snprintf(buf, sizeof(buf), - " %s is NULL", json_key); - found = true; - } - break; - case JSON_REAL: - case JSON_INTEGER: - case JSON_TRUE: - case JSON_FALSE: - break; - case JSON_ARRAY: - inside = _first_invalid(json_value, level+1); - if (inside != invalid_unknown) { - snprintf(buf, sizeof(buf), - " %s : [%s ]", json_key, inside); - free(inside); - found = true; - } - break; - case JSON_NULL: - snprintf(buf, sizeof(buf), - " %s is NULL", json_key); - found = true; - break; - default: - snprintf(buf, sizeof(buf), - " unknown type %d for %s", - json_typ, json_key); - found = true; - break; - } - if (!found) - json_iter = json_object_iter_next(json_data, json_iter); - } - - if (!*buf) { - if (level > 0) - return (char *)invalid_unknown; - else - return strdup(invalid_unknown); - } else - return strdup(buf); -} - -/* Send a json msg to ckdb with its idmsg and return the response, consuming - * the json on success */ -char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged, - const char *file, const char *func, const int line) +/* Send a json msg to ckdb and return the response */ +char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func, + const int line) { - char *msg = NULL, *dump, *buf = NULL; - - dump = json_dumps(val, JSON_COMPACT); - if (unlikely(!dump)) { - char *invalid = first_invalid(val); - LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d%s", file, func, line, invalid); - free(invalid); - return buf; - } - ASPRINTF(&msg, "%s.id.json=%s", idmsg, dump); - if (!logged) { - char logname[512]; + char *buf = NULL; - snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name); - rotating_log(logname, msg); - } - free(dump); LOGDEBUG("Sending ckdb: %s", msg); buf = _send_recv_ckdb(ckp, msg, file, func, line); LOGDEBUG("Received from ckdb: %s", buf); - free(msg); - if (likely(buf)) - json_decref(val); return buf; } - json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) { char *http_req = NULL; diff --git a/src/ckpool.h b/src/ckpool.h index 43b14fab..c27c042b 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -174,9 +174,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) -char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged, - const char *file, const char *func, const int line); -#define json_ckdb_call(ckp, idmsg, val, logged) _json_ckdb_call(ckp, idmsg, val, logged, __FILE__, __func__, __LINE__) +char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func, + const int line); +#define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); diff --git a/src/stratifier.c b/src/stratifier.c index ed49e38c..c8383eba 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -170,13 +170,6 @@ struct json_params { typedef struct json_params json_params_t; -struct ckdb_msg { - json_t *val; - int idtype; -}; - -typedef struct ckdb_msg ckdb_msg_t; - /* Stratum json messages with their associated client id */ struct smsg { json_t *json_msg; @@ -415,12 +408,32 @@ static void purge_share_hashtable(int64_t wb_id) static char *status_chars = "|/-\\"; +/* Absorbs the json and generates a ckdb json message, logs it to the ckdb + * log and returns the malloced message. */ +static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype) +{ + char *json_msg = json_dumps(val, JSON_COMPACT); + char logname[512]; + char *ret = NULL; + + if (unlikely(!json_msg)) + goto out; + ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg); + free(json_msg); +out: + json_decref(val); + snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name); + rotating_log(logname, ret); + return ret; +} + static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, const char *func, const int line) { - static int counter = 0; static time_t time_counter; - ckdb_msg_t *msg; + static int counter = 0; + + char *json_msg; time_t now_t; char ch; @@ -441,10 +454,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char if (ckp->standalone) return json_decref(val); - msg = ckalloc(sizeof(ckdb_msg_t)); - msg->val = val; - msg->idtype = idtype; - ckmsgq_add(ckdbq, msg); + json_msg = ckdb_msg(ckp, val, idtype); + if (unlikely(!json_msg)) { + LOGWARNING("Failed to dump json from %s %s:%d", file, func, line); + return; + } + + ckmsgq_add(ckdbq, json_msg); } #define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__) @@ -1252,10 +1268,10 @@ static user_instance_t *authorise_user(const char *workername) static bool send_recv_auth(stratum_instance_t *client) { ckpool_t *ckp = client->ckp; + char *buf, *json_msg; char cdfield[64]; bool ret = false; json_t *val; - char *buf; ts_t now; ts_realtime(&now); @@ -1272,7 +1288,13 @@ static bool send_recv_auth(stratum_instance_t *client) "createby", "code", "createcode", __func__, "createinet", client->address); - buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false); + json_msg = ckdb_msg(ckp, val, ID_AUTH); + if (unlikely(!json_msg)) { + LOGWARNING("Failed to dump json in send_recv_auth"); + return ret; + } + buf = ckdb_msg_call(ckp, json_msg); + free(json_msg); if (likely(buf)) { char *secondaryuserid, *response = alloca(128); @@ -1286,10 +1308,8 @@ static bool send_recv_auth(stratum_instance_t *client) client->secondaryuserid = strdup(secondaryuserid); ret = true; } - } else { + } else LOGWARNING("Got no auth response from ckdb :("); - json_decref(val); - } return ret; } @@ -2158,14 +2178,13 @@ out: } -static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) +static void ckdbq_process(ckpool_t *ckp, char *msg) { static bool failed = false; - bool logged = false; char *buf = NULL; while (!buf) { - buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); + buf = ckdb_msg_call(ckp, msg); if (unlikely(!buf)) { if (!failed) { failed = true; @@ -2173,13 +2192,13 @@ static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) } sleep(5); } - logged = true; } + free(msg); if (failed) { failed = false; LOGWARNING("Successfully resumed talking to ckdb"); } - LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); + LOGINFO("Got ckdb response: %s", buf); free(buf); }