Browse Source

Add all messages that we don't need to wait for the ckdb response to a separate thread message queue and cache them if ckdb is unresponsive

master
Con Kolivas 11 years ago
parent
commit
09f0586243
  1. 6
      src/ckpool.c
  2. 118
      src/stratifier.c

6
src/ckpool.c

@ -324,7 +324,7 @@ out:
} }
/* Send a json msg to ckdb with its idmsg and return the response, consuming /* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json */ * the json on success */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
const char *file, const char *func, const int line) const char *file, const char *func, const int line)
{ {
@ -333,7 +333,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
dump = json_dumps(val, JSON_COMPACT); dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) { if (unlikely(!dump)) {
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line); LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line);
goto out; return buf;
} }
ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump); ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump);
free(dump); free(dump);
@ -341,7 +341,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
buf = _send_recv_ckdb(ckp, msg, file, func, line); buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf); LOGDEBUG("Received from ckdb: %s", buf);
free(msg); free(msg);
out: if (likely(buf))
json_decref(val); json_decref(val);
return buf; return buf;
} }

118
src/stratifier.c

@ -164,12 +164,14 @@ static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock; static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock; static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock; static pthread_mutex_t sauth_lock;
static pthread_mutex_t ckdbq_lock;
/* For signalling the threads to wake up and do work */ /* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond; static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond; static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond; static pthread_cond_t sauth_cond;
static pthread_cond_t ckdbq_cond;
/* For the linked list of all queued messages */ /* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_recvs;
@ -189,6 +191,18 @@ typedef struct json_params json_params_t;
static json_params_t *sshares; static json_params_t *sshares;
static json_params_t *sauths; static json_params_t *sauths;
struct ckdb_msg {
struct ckdb_msg *next;
struct ckdb_msg *prev;
json_t *val;
int idtype;
};
typedef struct ckdb_msg ckdb_msg_t;
static ckdb_msg_t *ckdb_msgs;
static int user_instance_id; static int user_instance_id;
struct user_instance { struct user_instance {
@ -400,12 +414,23 @@ static void purge_share_hashtable(int64_t wb_id)
LOGINFO("Cleared %d shares from share hashtable", purged); LOGINFO("Cleared %d shares from share hashtable", purged);
} }
/* FIXME This message will be sent to the database once it's hooked in */ static void ckdbq_add(const int idtype, json_t *val)
{
ckdb_msg_t *msg = ckalloc(sizeof(ckdb_msg_t));
msg->val = val;
msg->idtype = idtype;
mutex_lock(&ckdbq_lock);
DL_APPEND(ckdb_msgs, msg);
pthread_cond_signal(&ckdbq_cond);
mutex_unlock(&ckdbq_lock);
}
static void send_workinfo(ckpool_t *ckp, workbase_t *wb) static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
{ {
char cdfield[64]; char cdfield[64];
json_t *val; json_t *val;
char *buf;
sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec); sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec);
@ -426,12 +451,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", "127.0.0.1"); "createinet", "127.0.0.1");
buf = json_ckdb_call(ckp, ckdb_ids[ID_SHARELOG], val); ckdbq_add(ID_SHARELOG, val);
if (likely(buf)) {
LOGINFO("Got workinfo response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no workinfo response from ckdb :(");
} }
static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
@ -986,6 +1006,7 @@ static void *blockupdate(void *arg)
char *buf = NULL, hash[68]; char *buf = NULL, hash[68];
char request[8]; char request[8];
pthread_detach(pthread_self());
rename_proc("blockupdate"); rename_proc("blockupdate");
buf = send_recv_proc(ckp->generator, "getbest"); buf = send_recv_proc(ckp->generator, "getbest");
if (buf && strncasecmp(buf, "Failed", 6)) if (buf && strncasecmp(buf, "Failed", 6))
@ -1125,7 +1146,9 @@ static user_instance_t *authorise_user(const char *workername)
} }
/* Send this to the database and parse the response to authorise a user /* Send this to the database and parse the response to authorise a user
* and get SUID parameters back */ * and get SUID parameters back. We don't add these requests to the ckdbqueue
* since we have to wait for the response but this is done from the authoriser
* thread so it won't hold anything up but other authorisations. */
static bool send_recv_auth(stratum_instance_t *client) static bool send_recv_auth(stratum_instance_t *client)
{ {
char cdfield[64]; char cdfield[64];
@ -1162,8 +1185,11 @@ static bool send_recv_auth(stratum_instance_t *client)
client->secondaryuserid = strdup(secondaryuserid); client->secondaryuserid = strdup(secondaryuserid);
ret = true; ret = true;
} }
} else } else {
LOGWARNING("Got no auth response from ckdb :("); LOGWARNING("Got no auth response from ckdb :(");
json_decref(val);
}
return ret; return ret;
} }
@ -1481,9 +1507,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
const char *user, *job_id, *nonce2, *ntime, *nonce; const char *user, *job_id, *nonce2, *ntime, *nonce;
double diff, wdiff = 0, sdiff = -1; double diff, wdiff = 0, sdiff = -1;
enum share_err err = SE_NONE; enum share_err err = SE_NONE;
char *fname, *s, *buf;
char idstring[20]; char idstring[20];
uint32_t ntime32; uint32_t ntime32;
char *fname, *s;
workbase_t *wb; workbase_t *wb;
uchar hash[32]; uchar hash[32];
int64_t id; int64_t id;
@ -1671,12 +1697,7 @@ out_unlock:
LOGERR("Failed to fwrite to %s", fname); LOGERR("Failed to fwrite to %s", fname);
} else } else
LOGERR("Failed to fopen %s", fname); LOGERR("Failed to fopen %s", fname);
buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val); ckdbq_add(ID_SHARELOG, val);
if (likely(buf)) {
LOGINFO("Got shares response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no shares response from ckdb :(");
out: out:
if (!share) { if (!share) {
val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
@ -1693,13 +1714,7 @@ out:
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", "127.0.0.1"); "createinet", "127.0.0.1");
/* FIXME : Send val json to database here */ ckdbq_add(ID_SHARELOG, val);
buf = json_ckdb_call(client->ckp, ckdb_ids[ID_SHARELOG], val);
if (likely(buf)) {
LOGINFO("Got shareerror response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no shareerror response from ckdb :(");
LOGINFO("Invalid share from client %d: %s", client->id, client->workername); LOGINFO("Invalid share from client %d: %s", client->id, client->workername);
} }
return json_boolean(result); return json_boolean(result);
@ -1884,6 +1899,7 @@ static void *stratum_receiver(void *arg)
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg; stratum_msg_t *msg;
pthread_detach(pthread_self());
rename_proc("sreceiver"); rename_proc("sreceiver");
while (42) { while (42) {
@ -1934,6 +1950,7 @@ static void *stratum_sender(void *arg)
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg = NULL; stratum_msg_t *msg = NULL;
pthread_detach(pthread_self());
rename_proc("ssender"); rename_proc("ssender");
while (42) { while (42) {
@ -1984,6 +2001,7 @@ static void *share_processor(void *arg)
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg; ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL; json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("sprocessor"); rename_proc("sprocessor");
while (42) { while (42) {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
@ -2030,6 +2048,7 @@ static void *authoriser(void *arg)
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL; json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("authoriser"); rename_proc("authoriser");
while (42) { while (42) {
@ -2079,10 +2098,47 @@ static void *authoriser(void *arg)
return NULL; return NULL;
} }
static void *ckdbqueue(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("ckdbqueue");
while (42) {
char *buf = NULL;
ckdb_msg_t *msg;
mutex_lock(&ckdbq_lock);
if (!ckdb_msgs)
pthread_cond_wait(&ckdbq_cond, &ckdbq_lock);
msg = ckdb_msgs;
if (likely(msg))
DL_DELETE(ckdb_msgs, msg);
mutex_unlock(&ckdbq_lock);
if (unlikely(!msg))
continue;
while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val);
if (unlikely(!buf)) {
LOGWARNING("Failed to talk to ckdb, queueing messages");
sleep(5);
}
}
LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf);
free(buf);
}
return NULL;
}
static void *statsupdate(void *arg) static void *statsupdate(void *arg)
{ {
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("statsupdate"); rename_proc("statsupdate");
tv_time(&stats.start_time); tv_time(&stats.start_time);
@ -2097,11 +2153,11 @@ static void *statsupdate(void *arg)
user_instance_t *instance, *tmp; user_instance_t *instance, *tmp;
char fname[512] = {}; char fname[512] = {};
tv_t now, diff; tv_t now, diff;
char *s, *buf;
int users, i; int users, i;
ts_t ts_now; ts_t ts_now;
json_t *val; json_t *val;
FILE *fp; FILE *fp;
char *s;
tv_time(&now); tv_time(&now);
timersub(&now, &stats.start_time, &diff); timersub(&now, &stats.start_time, &diff);
@ -2247,12 +2303,7 @@ static void *statsupdate(void *arg)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", "127.0.0.1"); "createinet", "127.0.0.1");
buf = json_ckdb_call(ckp, ckdb_ids[ID_STATS], val); ckdbq_add(ID_STATS, val);
if (likely(buf)) {
LOGINFO("Got stats response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no stats response from ckdb :(");
/* Update stats 4 times per minute for smooth values, displaying /* Update stats 4 times per minute for smooth values, displaying
* status every minute. */ * status every minute. */
@ -2291,6 +2342,7 @@ int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser;
pthread_t pth_ckdbqueue;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2332,6 +2384,10 @@ int stratifier(proc_instance_t *pi)
cond_init(&sauth_cond); cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp); create_pthread(&pth_authoriser, authoriser, ckp);
mutex_init(&ckdbq_lock);
cond_init(&ckdbq_cond);
create_pthread(&pth_ckdbqueue, ckdbqueue, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)
create_pthread(&pth_blockupdate, blockupdate, ckp); create_pthread(&pth_blockupdate, blockupdate, ckp);

Loading…
Cancel
Save