kanoi 11 years ago
parent
commit
d6b62be21f
  1. 6
      src/ckpool.c
  2. 143
      src/stratifier.c

6
src/ckpool.c

@ -324,7 +324,7 @@ out:
}
/* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json */
* the json on success */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
const char *file, const char *func, const int line)
{
@ -333,7 +333,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) {
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line);
goto out;
return buf;
}
ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump);
free(dump);
@ -341,7 +341,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf);
free(msg);
out:
if (likely(buf))
json_decref(val);
return buf;
}

143
src/stratifier.c

@ -164,12 +164,14 @@ static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock;
static pthread_mutex_t ckdbq_lock;
/* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond;
static pthread_cond_t ckdbq_cond;
/* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs;
@ -189,6 +191,18 @@ typedef struct json_params json_params_t;
static json_params_t *sshares;
static json_params_t *sauths;
struct ckdb_msg {
struct ckdb_msg *next;
struct ckdb_msg *prev;
json_t *val;
int idtype;
};
typedef struct ckdb_msg ckdb_msg_t;
static ckdb_msg_t *ckdb_msgs;
static int user_instance_id;
struct user_instance {
@ -263,6 +277,16 @@ static share_t *shares;
static cklock_t share_lock;
#define ID_AUTH 0
#define ID_SHARELOG 1
#define ID_STATS 2
static const char *ckdb_ids[] = {
"authorise",
"sharelog",
"stats"
};
static void generate_coinbase(ckpool_t *ckp, workbase_t *wb)
{
char header[228];
@ -390,12 +414,23 @@ static void purge_share_hashtable(int64_t wb_id)
LOGINFO("Cleared %d shares from share hashtable", purged);
}
/* FIXME This message will be sent to the database once it's hooked in */
static void ckdbq_add(const int idtype, json_t *val)
{
ckdb_msg_t *msg = ckalloc(sizeof(ckdb_msg_t));
msg->val = val;
msg->idtype = idtype;
mutex_lock(&ckdbq_lock);
DL_APPEND(ckdb_msgs, msg);
pthread_cond_signal(&ckdbq_cond);
mutex_unlock(&ckdbq_lock);
}
static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
{
char cdfield[64];
json_t *val;
char *buf;
sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec);
@ -416,12 +451,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
buf = json_ckdb_call(ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got workinfo response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no workinfo response :(");
ckdbq_add(ID_SHARELOG, val);
}
static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
@ -976,6 +1006,7 @@ static void *blockupdate(void *arg)
char *buf = NULL, hash[68];
char request[8];
pthread_detach(pthread_self());
rename_proc("blockupdate");
buf = send_recv_proc(ckp->generator, "getbest");
if (buf && strncasecmp(buf, "Failed", 6))
@ -1086,8 +1117,8 @@ static json_t *parse_subscribe(int client_id, json_t *params_val)
return ret;
}
/* FIXME: Talk to database here instead. This simply strips off the first part
* of the workername and matches it to a user or creates a new one. */
/* This simply strips off the first part of the workername and matches it to a
* user or creates a new one. */
static user_instance_t *authorise_user(const char *workername)
{
char *fullname = strdupa(workername);
@ -1114,8 +1145,10 @@ static user_instance_t *authorise_user(const char *workername)
return instance;
}
/* FIXME: Send this to the database and parse the response to authorise a user
* and get parameters back */
/* Send this to the database and parse the response to authorise a user
* and get SUID parameters back. We don't add these requests to the ckdbqueue
* since we have to wait for the response but this is done from the authoriser
* thread so it won't hold anything up but other authorisations. */
static bool send_recv_auth(stratum_instance_t *client)
{
char cdfield[64];
@ -1138,21 +1171,25 @@ static bool send_recv_auth(stratum_instance_t *client)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
buf = json_ckdb_call(client->ckp, "authorise", val);
buf = json_ckdb_call(client->ckp, ckdb_ids[ID_AUTH], val);
if (likely(buf)) {
char *secondaryuserid, *response = alloca(128);
sscanf(buf, "id.%*d.%s", response);
secondaryuserid = response;
strsep(&secondaryuserid, ".");
LOGWARNING("Got auth response: %s response: %s suid: %s", buf,
LOGINFO("User %s Worker %s got auth response: %s suid: %s",
client->user_instance->username, client->workername,
response, secondaryuserid);
if (!strcmp(response, "added")) {
if (!strcmp(response, "added") && secondaryuserid) {
client->secondaryuserid = strdup(secondaryuserid);
ret = true;
}
} else
LOGWARNING("Got no auth response :(");
} else {
LOGWARNING("Got no auth response from ckdb :(");
json_decref(val);
}
return ret;
}
@ -1470,9 +1507,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
const char *user, *job_id, *nonce2, *ntime, *nonce;
double diff, wdiff = 0, sdiff = -1;
enum share_err err = SE_NONE;
char *fname, *s, *buf;
char idstring[20];
uint32_t ntime32;
char *fname, *s;
workbase_t *wb;
uchar hash[32];
int64_t id;
@ -1660,12 +1697,7 @@ out_unlock:
LOGERR("Failed to fwrite to %s", fname);
} else
LOGERR("Failed to fopen %s", fname);
buf = json_ckdb_call(client->ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got sharelog response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no sharelog response :(");
ckdbq_add(ID_SHARELOG, val);
out:
if (!share) {
val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
@ -1682,13 +1714,7 @@ out:
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
/* FIXME : Send val json to database here */
buf = json_ckdb_call(client->ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got sharelog response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no sharelog response :(");
ckdbq_add(ID_SHARELOG, val);
LOGINFO("Invalid share from client %d: %s", client->id, client->workername);
}
return json_boolean(result);
@ -1873,6 +1899,7 @@ static void *stratum_receiver(void *arg)
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg;
pthread_detach(pthread_self());
rename_proc("sreceiver");
while (42) {
@ -1923,6 +1950,7 @@ static void *stratum_sender(void *arg)
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg = NULL;
pthread_detach(pthread_self());
rename_proc("ssender");
while (42) {
@ -1973,6 +2001,7 @@ static void *share_processor(void *arg)
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("sprocessor");
while (42) {
json_t *result_val, *json_msg, *err_val = NULL;
@ -2019,6 +2048,7 @@ static void *authoriser(void *arg)
ckpool_t *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("authoriser");
while (42) {
@ -2068,10 +2098,47 @@ static void *authoriser(void *arg)
return NULL;
}
static void *ckdbqueue(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("ckdbqueue");
while (42) {
char *buf = NULL;
ckdb_msg_t *msg;
mutex_lock(&ckdbq_lock);
if (!ckdb_msgs)
pthread_cond_wait(&ckdbq_cond, &ckdbq_lock);
msg = ckdb_msgs;
if (likely(msg))
DL_DELETE(ckdb_msgs, msg);
mutex_unlock(&ckdbq_lock);
if (unlikely(!msg))
continue;
while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val);
if (unlikely(!buf)) {
LOGWARNING("Failed to talk to ckdb, queueing messages");
sleep(5);
}
}
LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf);
free(buf);
}
return NULL;
}
static void *statsupdate(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("statsupdate");
tv_time(&stats.start_time);
@ -2086,11 +2153,11 @@ static void *statsupdate(void *arg)
user_instance_t *instance, *tmp;
char fname[512] = {};
tv_t now, diff;
char *s, *buf;
int users, i;
ts_t ts_now;
json_t *val;
FILE *fp;
char *s;
tv_time(&now);
timersub(&now, &stats.start_time, &diff);
@ -2236,12 +2303,7 @@ static void *statsupdate(void *arg)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
buf = json_ckdb_call(ckp, "stats", val);
if (likely(buf)) {
LOGWARNING("Got stats response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no stats response :(");
ckdbq_add(ID_STATS, val);
/* Update stats 4 times per minute for smooth values, displaying
* status every minute. */
@ -2280,6 +2342,7 @@ int stratifier(proc_instance_t *pi)
{
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate, pth_share_processer, pth_authoriser;
pthread_t pth_ckdbqueue;
ckpool_t *ckp = pi->ckp;
char *buf;
int ret;
@ -2321,6 +2384,10 @@ int stratifier(proc_instance_t *pi)
cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp);
mutex_init(&ckdbq_lock);
cond_init(&ckdbq_cond);
create_pthread(&pth_ckdbqueue, ckdbqueue, ckp);
cklock_init(&workbase_lock);
if (!ckp->proxy)
create_pthread(&pth_blockupdate, blockupdate, ckp);

Loading…
Cancel
Save