Browse Source

Convert the authoriser and share processor to using the generic ckmsg queues and fix the data passed to the queue function

master
Con Kolivas 11 years ago
parent
commit
93ecc3a5c8
  1. 2
      src/ckpool.c
  2. 99
      src/stratifier.c

2
src/ckpool.c

@ -96,7 +96,7 @@ static void *ckmsg_queue(void *arg)
if (unlikely(!msg)) if (unlikely(!msg))
continue; continue;
ckmsgq->func(ckp, msg); ckmsgq->func(ckp, msg->data);
free(msg); free(msg);
} }
return NULL; return NULL;

99
src/stratifier.c

@ -160,23 +160,16 @@ static char lasthash[68];
/* For protecting the stratum msg data */ /* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock; static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock;
/* For signalling the threads to wake up and do work */ /* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond; static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond;
/* For the linked list of all queued messages */ /* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_recvs;
static stratum_msg_t *stratum_sends; static stratum_msg_t *stratum_sends;
struct json_params { struct json_params {
struct json_params *next;
struct json_params *prev;
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
int client_id; int client_id;
@ -184,9 +177,6 @@ struct json_params {
typedef struct json_params json_params_t; typedef struct json_params json_params_t;
static json_params_t *sshares;
static json_params_t *sauths;
struct ckdb_msg { struct ckdb_msg {
json_t *val; json_t *val;
int idtype; int idtype;
@ -195,6 +185,8 @@ struct ckdb_msg {
typedef struct ckdb_msg ckdb_msg_t; typedef struct ckdb_msg ckdb_msg_t;
static ckmsgq_t *ckdbq; static ckmsgq_t *ckdbq;
static ckmsgq_t *sshareq;
static ckmsgq_t *sauthq;
static int user_instance_id; static int user_instance_id;
@ -1826,11 +1818,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
if (!strncasecmp(method, "mining.auth", 11)) { if (!strncasecmp(method, "mining.auth", 11)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sauth_lock); ckmsgq_add(sauthq, jp);
DL_APPEND(sauths, jp);
pthread_cond_signal(&sauth_cond);
mutex_unlock(&sauth_lock);
return; return;
} }
@ -1850,11 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
if (!strncasecmp(method, "mining.submit", 13)) { if (!strncasecmp(method, "mining.submit", 13)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sshare_lock); ckmsgq_add(sshareq, jp);
DL_APPEND(sshares, jp);
pthread_cond_signal(&sshare_cond);
mutex_unlock(&sshare_lock);
return; return;
} }
@ -1991,32 +1975,12 @@ static void discard_json_params(json_params_t **jp)
*jp = NULL; *jp = NULL;
} }
static void *share_processor(void *arg) static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp)
{ {
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("sprocessor");
while (42) {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int client_id;
if (jp)
discard_json_params(&jp);
mutex_lock(&sshare_lock);
if (!sshares)
pthread_cond_wait(&sshare_cond, &sshare_lock);
jp = sshares;
if (likely(jp))
DL_DELETE(sshares, jp);
mutex_unlock(&sshare_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id; client_id = jp->client_id;
ck_rlock(&instance_lock); ck_rlock(&instance_lock);
@ -2025,7 +1989,7 @@ static void *share_processor(void *arg)
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); LOGINFO("Share processor failed to find client id %d in hashtable!", client_id);
continue; goto out;
} }
json_msg = json_object(); json_msg = json_object();
result_val = parse_submit(client, json_msg, jp->params, &err_val); result_val = parse_submit(client, json_msg, jp->params, &err_val);
@ -2033,38 +1997,15 @@ static void *share_processor(void *arg)
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val); json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
} out:
discard_json_params(&jp);
return NULL;
} }
static void *authoriser(void *arg) static void sauth_process(ckpool_t *ckp, json_params_t *jp)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("authoriser");
while (42) {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int client_id;
char buf[256];
if (jp)
discard_json_params(&jp);
mutex_lock(&sauth_lock);
if (!sauths)
pthread_cond_wait(&sauth_cond, &sauth_lock);
jp = sauths;
if (likely(jp))
DL_DELETE(sauths, jp);
mutex_unlock(&sauth_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id; client_id = jp->client_id;
@ -2074,11 +2015,13 @@ static void *authoriser(void *arg)
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id);
continue; goto out;
} }
result_val = parse_authorise(client, jp->params, &err_val); result_val = parse_authorise(client, jp->params, &err_val);
if (json_is_true(result_val)) { if (json_is_true(result_val)) {
snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, char *buf;
ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name,
client->user_instance->username); client->user_instance->username);
stratum_send_message(client, buf); stratum_send_message(client, buf);
} else } else
@ -2088,9 +2031,9 @@ static void *authoriser(void *arg)
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val); json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
} out:
discard_json_params(&jp);
return NULL;
} }
static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
@ -2357,7 +2300,7 @@ static void *statsupdate(void *arg)
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate, pth_share_processer, pth_authoriser; pthread_t pth_statsupdate;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2391,14 +2334,8 @@ int stratifier(proc_instance_t *pi)
cond_init(&stratum_recv_cond); cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
mutex_init(&sshare_lock); sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process);
cond_init(&sshare_cond); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
create_pthread(&pth_share_processer, share_processor, ckp);
mutex_init(&sauth_lock);
cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);

Loading…
Cancel
Save