Browse Source

Create separate threads for processing share and authorisation submissions since they will be waiting for dtabase responses

master
Con Kolivas 11 years ago
parent
commit
7055f3b5d5
  1. 294
      src/stratifier.c

294
src/stratifier.c

@ -164,15 +164,33 @@ static char lasthash[68];
/* For protecting the stratum msg data */ /* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock; static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock; static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock;
/* For signalling the threads to wake up and do work */ /* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond; static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond; static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond;
/* For the linked list of all queued messages */ /* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs; static stratum_msg_t *stratum_recvs;
static stratum_msg_t *stratum_sends; static stratum_msg_t *stratum_sends;
struct json_params {
struct json_params *next;
struct json_params *prev;
json_t *params;
json_t *id_val;
int client_id;
};
typedef struct json_params json_params_t;
static json_params_t *sshares;
static json_params_t *sauths;
static int user_instance_id; static int user_instance_id;
struct user_instance { struct user_instance {
@ -1677,6 +1695,7 @@ out:
return json_boolean(result); return json_boolean(result);
} }
#if 0
/* We should have already determined all the values passed to this are valid /* We should have already determined all the values passed to this are valid
* by now. Set update if we should also send the latest stratum parameters */ * by now. Set update if we should also send the latest stratum parameters */
static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_val, static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_val,
@ -1731,6 +1750,7 @@ static json_t *gen_json_result(int client_id, json_t *json_msg, json_t *method_v
out: out:
return ret; return ret;
} }
#endif
/* Must enter with workbase_lock held */ /* Must enter with workbase_lock held */
static json_t *__stratum_notify(bool clean) static json_t *__stratum_notify(bool clean)
@ -1776,6 +1796,7 @@ static void stratum_send_update(int client_id, bool clean)
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
} }
#if 0
static void parse_instance_msg(int client_id, json_t *msg) static void parse_instance_msg(int client_id, json_t *msg)
{ {
json_t *result_val = NULL, *err_val = NULL, *id_val = NULL; json_t *result_val = NULL, *err_val = NULL, *id_val = NULL;
@ -1838,6 +1859,133 @@ out:
stratum_send_diff(client); stratum_send_diff(client);
} }
} }
#endif
static void send_json_err(int client_id, json_t *id_val, const char *err_msg)
{
json_t *val;
val = json_pack("{soss}", "id", json_copy(id_val), "error", err_msg);
stratum_add_send(val, client_id);
}
static void update_client(const int client_id)
{
stratum_instance_t *client;
stratum_send_update(client_id, true);
ck_rlock(&instance_lock);
client = __instance_by_id(client_id);
ck_runlock(&instance_lock);
if (likely(client))
stratum_send_diff(client);
}
static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val)
{
json_params_t *jp = ckalloc(sizeof(json_params_t));
jp->params = json_deep_copy(params);
jp->id_val = json_deep_copy(id_val);
jp->client_id = client_id;
return jp;
}
static void parse_method(const int client_id, json_t *id_val, json_t *method_val,
json_t *params_val)
{
stratum_instance_t *client;
const char *method;
json_t *val;
method = json_string_value(method_val);
if (!strncasecmp(method, "mining.subscribe", 16)) {
val = parse_subscribe(client_id, params_val);
if (!val)
return;
json_object_set(val, "id", id_val);
json_object_set(val, "error", json_null());
stratum_add_send(val, client_id);
update_client(client_id);
return;
}
ck_rlock(&instance_lock);
client = __instance_by_id(client_id);
ck_runlock(&instance_lock);
if (unlikely(!client)) {
LOGINFO("Failed to find client id %d in hashtable!", client_id);
return;
}
if (!strncasecmp(method, "mining.auth", 11)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sauth_lock);
DL_APPEND(sauths, jp);
pthread_cond_signal(&sauth_cond);
mutex_unlock(&sauth_lock);
return;
}
/* We should only accept authorised requests from here on */
if (!client->authorised) {
char buf[256];
/* Dropping unauthorised clients here also allows the
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %d", client->id);
snprintf(buf, 255, "dropclient=%d", client->id);
send_proc(client->ckp->connector, buf);
return;
}
if (!strncasecmp(method, "mining.submit", 13)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sshare_lock);
DL_APPEND(sshares, jp);
pthread_cond_signal(&sshare_cond);
mutex_unlock(&sshare_lock);
return;
}
/* Unhandled message here */
}
static void parse_instance_msg(stratum_msg_t *msg)
{
json_t *val = msg->json_msg, *id_val, *method, *params;
int client_id = msg->client_id;
/* Return back the same id_val even if it's null or not existent. */
id_val = json_object_get(val, "id");
method = json_object_get(val, "method");
if (unlikely(!method)) {
send_json_err(client_id, id_val, "-3:method not found");
goto out;
}
if (unlikely(!json_is_string(method))) {
send_json_err(client_id, id_val, "-1:method is not string");
goto out;
}
params = json_object_get(val, "params");
if (unlikely(!params)) {
send_json_err(client_id, id_val, "-1:params not found");
goto out;
}
parse_method(client_id, id_val, method, params);
out:
json_decref(val);
free(msg);
}
static void *stratum_receiver(void *arg) static void *stratum_receiver(void *arg)
{ {
@ -1846,6 +1994,7 @@ static void *stratum_receiver(void *arg)
rename_proc("sreceiver"); rename_proc("sreceiver");
sleep(1);
while (42) { while (42) {
stratum_instance_t *instance; stratum_instance_t *instance;
@ -1876,25 +2025,32 @@ static void *stratum_receiver(void *arg)
} }
ck_uilock(&instance_lock); ck_uilock(&instance_lock);
parse_instance_msg(msg->client_id, msg->json_msg); parse_instance_msg(msg);
json_decref(msg->json_msg);
free(msg);
} }
return NULL; return NULL;
} }
static void discard_stratum_msg(stratum_msg_t **msg)
{
json_decref((*msg)->json_msg);
free(*msg);
*msg = NULL;
}
static void *stratum_sender(void *arg) static void *stratum_sender(void *arg)
{ {
ckpool_t *ckp = (ckpool_t *)arg; ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg = NULL;
rename_proc("ssender"); rename_proc("ssender");
while (42) { while (42) {
stratum_msg_t *msg;
char *s; char *s;
if (msg)
discard_stratum_msg(&msg);
mutex_lock(&stratum_send_lock); mutex_lock(&stratum_send_lock);
if (!stratum_sends) if (!stratum_sends)
pthread_cond_wait(&stratum_send_cond, &stratum_send_lock); pthread_cond_wait(&stratum_send_cond, &stratum_send_lock);
@ -1908,7 +2064,6 @@ static void *stratum_sender(void *arg)
if (unlikely(!msg->json_msg)) { if (unlikely(!msg->json_msg)) {
LOGERR("Sent null json msg to stratum_sender"); LOGERR("Sent null json msg to stratum_sender");
free(msg);
continue; continue;
} }
@ -1919,8 +2074,111 @@ static void *stratum_sender(void *arg)
send_proc(ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
json_decref(msg->json_msg); discard_stratum_msg(&msg);
free(msg); }
return NULL;
}
static void discard_json_params(json_params_t **jp)
{
json_decref((*jp)->params);
json_decref((*jp)->id_val);
free(*jp);
*jp = NULL;
}
static void *share_processor(void *arg)
{
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
rename_proc("sprocessor");
while (42) {
json_t *result_val, *json_msg, *err_val;
stratum_instance_t *client;
int client_id;
if (jp)
discard_json_params(&jp);
mutex_lock(&sauth_lock);
if (!sshares)
pthread_cond_wait(&sshare_cond, &sshare_lock);
jp = sshares;
mutex_unlock(&sshare_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id;
ck_rlock(&instance_lock);
client = __instance_by_id(client_id);
ck_runlock(&instance_lock);
if (unlikely(!client)) {
LOGINFO("Share processor failed to find client id %d in hashtable!", client_id);
continue;
}
json_msg = json_object();
result_val = parse_submit(client, json_msg, jp->params, &err_val);
json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id);
}
return NULL;
}
static void *authoriser(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
rename_proc("authoriser");
while (42) {
json_t *result_val, *json_msg, *err_val;
stratum_instance_t *client;
int client_id;
char buf[256];
if (jp)
discard_json_params(&jp);
mutex_lock(&sauth_lock);
if (!sauths)
pthread_cond_wait(&sauth_cond, &sauth_lock);
jp = sauths;
mutex_unlock(&sauth_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id;
ck_rlock(&instance_lock);
client = __instance_by_id(client_id);
ck_runlock(&instance_lock);
if (unlikely(!client)) {
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id);
continue;
}
result_val = parse_authorise(client, jp->params, &err_val);
if (json_is_true(result_val)) {
snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name,
client->user_instance->username);
stratum_send_message(client, buf);
} else
stratum_send_message(client, "Failed authorisation :(");
json_msg = json_object();
json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id);
} }
return NULL; return NULL;
@ -2148,7 +2406,7 @@ static void load_users(ckpool_t *ckp)
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
pthread_t pth_statsupdate; pthread_t pth_statsupdate, pth_share_processer, pth_authoriser;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2174,14 +2432,18 @@ int stratifier(proc_instance_t *pi)
cklock_init(&instance_lock); cklock_init(&instance_lock);
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
mutex_init(&stratum_send_lock); mutex_init(&stratum_send_lock);
cond_init(&stratum_send_cond); cond_init(&stratum_send_cond);
create_pthread(&pth_stratum_sender, stratum_sender, ckp); create_pthread(&pth_stratum_sender, stratum_sender, ckp);
mutex_init(&sshare_lock);
cond_init(&sshare_cond);
create_pthread(&pth_share_processer, share_processor, ckp);
mutex_init(&sauth_lock);
cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)
create_pthread(&pth_blockupdate, blockupdate, ckp); create_pthread(&pth_blockupdate, blockupdate, ckp);
@ -2193,6 +2455,12 @@ int stratifier(proc_instance_t *pi)
load_users(ckp); load_users(ckp);
/* Start the receiver last to only process requests once all structures
* are set up. */
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
ret = stratum_loop(ckp, pi); ret = stratum_loop(ckp, pi);
out: out:
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);

Loading…
Cancel
Save