kanoi 11 years ago
parent
commit
d6de5d1427
  1. 100
      src/ckpool.c
  2. 6
      src/ckpool.h
  3. 65
      src/stratifier.c

100
src/ckpool.c

@ -507,110 +507,18 @@ out:
return buf;
}
static const char *invalid_unknown = " (unknown reason)";
static const char *invalid_toodeep = " >9 levels, recursion?";
#define first_invalid(_json_data) _first_invalid(_json_data, 0)
static char *_first_invalid(json_t *json_data, int level)
{
const char *json_key, *json_str;
json_t *json_value;
void *json_iter;
int json_typ;
char buf[512], *inside;
bool found;
if (level > 9)
return strdup(invalid_toodeep);
buf[0] = '\0';
found = false;
json_iter = json_object_iter(json_data);
while (!found && json_iter) {
json_key = json_object_iter_key(json_iter);
json_value = json_object_iter_value(json_iter);
json_typ = json_typeof(json_value);
switch(json_typ) {
case JSON_STRING:
json_str = json_string_value(json_value);
if (json_str == NULL) {
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
}
break;
case JSON_REAL:
case JSON_INTEGER:
case JSON_TRUE:
case JSON_FALSE:
break;
case JSON_ARRAY:
inside = _first_invalid(json_value, level+1);
if (inside != invalid_unknown) {
snprintf(buf, sizeof(buf),
" %s : [%s ]", json_key, inside);
free(inside);
found = true;
}
break;
case JSON_NULL:
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
break;
default:
snprintf(buf, sizeof(buf),
" unknown type %d for %s",
json_typ, json_key);
found = true;
break;
}
if (!found)
json_iter = json_object_iter_next(json_data, json_iter);
}
if (!*buf) {
if (level > 0)
return (char *)invalid_unknown;
else
return strdup(invalid_unknown);
} else
return strdup(buf);
}
/* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json on success */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged,
const char *file, const char *func, const int line)
/* Send a json msg to ckdb and return the response */
char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const int line)
{
char *msg = NULL, *dump, *buf = NULL;
dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) {
char *invalid = first_invalid(val);
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d%s", file, func, line, invalid);
free(invalid);
return buf;
}
ASPRINTF(&msg, "%s.id.json=%s", idmsg, dump);
if (!logged) {
char logname[512];
char *buf = NULL;
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, msg);
}
free(dump);
LOGDEBUG("Sending ckdb: %s", msg);
buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf);
free(msg);
if (likely(buf))
json_decref(val);
return buf;
}
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{
char *http_req = NULL;

6
src/ckpool.h

@ -174,9 +174,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);
#define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__)
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged,
const char *file, const char *func, const int line);
#define json_ckdb_call(ckp, idmsg, val, logged) _json_ckdb_call(ckp, idmsg, val, logged, __FILE__, __func__, __LINE__)
char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const int line);
#define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);

65
src/stratifier.c

@ -170,13 +170,6 @@ struct json_params {
typedef struct json_params json_params_t;
struct ckdb_msg {
json_t *val;
int idtype;
};
typedef struct ckdb_msg ckdb_msg_t;
/* Stratum json messages with their associated client id */
struct smsg {
json_t *json_msg;
@ -415,12 +408,32 @@ static void purge_share_hashtable(int64_t wb_id)
static char *status_chars = "|/-\\";
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
* log and returns the malloced message. */
static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype)
{
char *json_msg = json_dumps(val, JSON_COMPACT);
char logname[512];
char *ret = NULL;
if (unlikely(!json_msg))
goto out;
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg);
free(json_msg);
out:
json_decref(val);
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, ret);
return ret;
}
static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file,
const char *func, const int line)
{
static int counter = 0;
static time_t time_counter;
ckdb_msg_t *msg;
static int counter = 0;
char *json_msg;
time_t now_t;
char ch;
@ -441,10 +454,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
if (ckp->standalone)
return json_decref(val);
msg = ckalloc(sizeof(ckdb_msg_t));
msg->val = val;
msg->idtype = idtype;
ckmsgq_add(ckdbq, msg);
json_msg = ckdb_msg(ckp, val, idtype);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json from %s %s:%d", file, func, line);
return;
}
ckmsgq_add(ckdbq, json_msg);
}
#define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__)
@ -1252,10 +1268,10 @@ static user_instance_t *authorise_user(const char *workername)
static bool send_recv_auth(stratum_instance_t *client)
{
ckpool_t *ckp = client->ckp;
char *buf, *json_msg;
char cdfield[64];
bool ret = false;
json_t *val;
char *buf;
ts_t now;
ts_realtime(&now);
@ -1272,7 +1288,13 @@ static bool send_recv_auth(stratum_instance_t *client)
"createby", "code",
"createcode", __func__,
"createinet", client->address);
buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false);
json_msg = ckdb_msg(ckp, val, ID_AUTH);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json in send_recv_auth");
return ret;
}
buf = ckdb_msg_call(ckp, json_msg);
free(json_msg);
if (likely(buf)) {
char *secondaryuserid, *response = alloca(128);
@ -1286,10 +1308,8 @@ static bool send_recv_auth(stratum_instance_t *client)
client->secondaryuserid = strdup(secondaryuserid);
ret = true;
}
} else {
} else
LOGWARNING("Got no auth response from ckdb :(");
json_decref(val);
}
return ret;
}
@ -2158,14 +2178,13 @@ out:
}
static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
static void ckdbq_process(ckpool_t *ckp, char *msg)
{
static bool failed = false;
bool logged = false;
char *buf = NULL;
while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged);
buf = ckdb_msg_call(ckp, msg);
if (unlikely(!buf)) {
if (!failed) {
failed = true;
@ -2173,13 +2192,13 @@ static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
}
sleep(5);
}
logged = true;
}
free(msg);
if (failed) {
failed = false;
LOGWARNING("Successfully resumed talking to ckdb");
}
LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf);
LOGINFO("Got ckdb response: %s", buf);
free(buf);
}

Loading…
Cancel
Save