Browse Source

Log ckdb messages before adding them to the queue of messages to send to it to ensure messages continue to be logged as generated even if they're queued to be sent to ckdb

master
Con Kolivas 11 years ago
parent
commit
c11b2c11f3
  1. 100
      src/ckpool.c
  2. 6
      src/ckpool.h
  3. 65
      src/stratifier.c

100
src/ckpool.c

@ -507,110 +507,18 @@ out:
return buf;
}
static const char *invalid_unknown = " (unknown reason)";
static const char *invalid_toodeep = " >9 levels, recursion?";
#define first_invalid(_json_data) _first_invalid(_json_data, 0)
static char *_first_invalid(json_t *json_data, int level)
{
const char *json_key, *json_str;
json_t *json_value;
void *json_iter;
int json_typ;
char buf[512], *inside;
bool found;
if (level > 9)
return strdup(invalid_toodeep);
buf[0] = '\0';
found = false;
json_iter = json_object_iter(json_data);
while (!found && json_iter) {
json_key = json_object_iter_key(json_iter);
json_value = json_object_iter_value(json_iter);
json_typ = json_typeof(json_value);
switch(json_typ) {
case JSON_STRING:
json_str = json_string_value(json_value);
if (json_str == NULL) {
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
}
break;
case JSON_REAL:
case JSON_INTEGER:
case JSON_TRUE:
case JSON_FALSE:
break;
case JSON_ARRAY:
inside = _first_invalid(json_value, level+1);
if (inside != invalid_unknown) {
snprintf(buf, sizeof(buf),
" %s : [%s ]", json_key, inside);
free(inside);
found = true;
}
break;
case JSON_NULL:
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
break;
default:
snprintf(buf, sizeof(buf),
" unknown type %d for %s",
json_typ, json_key);
found = true;
break;
}
if (!found)
json_iter = json_object_iter_next(json_data, json_iter);
}
if (!*buf) {
if (level > 0)
return (char *)invalid_unknown;
else
return strdup(invalid_unknown);
} else
return strdup(buf);
}
/* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json on success */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged,
const char *file, const char *func, const int line)
/* Send a json msg to ckdb and return the response */
char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const int line)
{
char *msg = NULL, *dump, *buf = NULL;
dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) {
char *invalid = first_invalid(val);
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d%s", file, func, line, invalid);
free(invalid);
return buf;
}
ASPRINTF(&msg, "%s.id.json=%s", idmsg, dump);
if (!logged) {
char logname[512];
char *buf = NULL;
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, msg);
}
free(dump);
LOGDEBUG("Sending ckdb: %s", msg);
buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf);
free(msg);
if (likely(buf))
json_decref(val);
return buf;
}
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{
char *http_req = NULL;

6
src/ckpool.h

@ -174,9 +174,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);
#define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__)
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged,
const char *file, const char *func, const int line);
#define json_ckdb_call(ckp, idmsg, val, logged) _json_ckdb_call(ckp, idmsg, val, logged, __FILE__, __func__, __LINE__)
char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const int line);
#define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);

65
src/stratifier.c

@ -170,13 +170,6 @@ struct json_params {
typedef struct json_params json_params_t;
struct ckdb_msg {
json_t *val;
int idtype;
};
typedef struct ckdb_msg ckdb_msg_t;
/* Stratum json messages with their associated client id */
struct smsg {
json_t *json_msg;
@ -415,12 +408,32 @@ static void purge_share_hashtable(int64_t wb_id)
static char *status_chars = "|/-\\";
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
* log and returns the malloced message. */
static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype)
{
char *json_msg = json_dumps(val, JSON_COMPACT);
char logname[512];
char *ret = NULL;
if (unlikely(!json_msg))
goto out;
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg);
free(json_msg);
out:
json_decref(val);
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, ret);
return ret;
}
static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file,
const char *func, const int line)
{
static int counter = 0;
static time_t time_counter;
ckdb_msg_t *msg;
static int counter = 0;
char *json_msg;
time_t now_t;
char ch;
@ -441,10 +454,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
if (ckp->standalone)
return json_decref(val);
msg = ckalloc(sizeof(ckdb_msg_t));
msg->val = val;
msg->idtype = idtype;
ckmsgq_add(ckdbq, msg);
json_msg = ckdb_msg(ckp, val, idtype);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json from %s %s:%d", file, func, line);
return;
}
ckmsgq_add(ckdbq, json_msg);
}
#define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__)
@ -1252,10 +1268,10 @@ static user_instance_t *authorise_user(const char *workername)
static bool send_recv_auth(stratum_instance_t *client)
{
ckpool_t *ckp = client->ckp;
char *buf, *json_msg;
char cdfield[64];
bool ret = false;
json_t *val;
char *buf;
ts_t now;
ts_realtime(&now);
@ -1272,7 +1288,13 @@ static bool send_recv_auth(stratum_instance_t *client)
"createby", "code",
"createcode", __func__,
"createinet", client->address);
buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false);
json_msg = ckdb_msg(ckp, val, ID_AUTH);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json in send_recv_auth");
return ret;
}
buf = ckdb_msg_call(ckp, json_msg);
free(json_msg);
if (likely(buf)) {
char *secondaryuserid, *response = alloca(128);
@ -1286,10 +1308,8 @@ static bool send_recv_auth(stratum_instance_t *client)
client->secondaryuserid = strdup(secondaryuserid);
ret = true;
}
} else {
} else
LOGWARNING("Got no auth response from ckdb :(");
json_decref(val);
}
return ret;
}
@ -2158,14 +2178,13 @@ out:
}
static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
static void ckdbq_process(ckpool_t *ckp, char *msg)
{
static bool failed = false;
bool logged = false;
char *buf = NULL;
while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged);
buf = ckdb_msg_call(ckp, msg);
if (unlikely(!buf)) {
if (!failed) {
failed = true;
@ -2173,13 +2192,13 @@ static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
}
sleep(5);
}
logged = true;
}
free(msg);
if (failed) {
failed = false;
LOGWARNING("Successfully resumed talking to ckdb");
}
LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf);
LOGINFO("Got ckdb response: %s", buf);
free(buf);
}

Loading…
Cancel
Save