Browse Source

Move the stratifier stratum send and receive queues to generic ckmsg queues

master
Con Kolivas 11 years ago
parent
commit
72df09d650
  1. 11
      src/generator.c
  2. 142
      src/stratifier.c
  3. 10
      src/stratifier.h

11
src/generator.c

@ -18,7 +18,6 @@
#include "libckpool.h" #include "libckpool.h"
#include "generator.h" #include "generator.h"
#include "bitcoin.h" #include "bitcoin.h"
#include "stratifier.h"
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
@ -54,6 +53,16 @@ struct share_msg {
typedef struct share_msg share_msg_t; typedef struct share_msg share_msg_t;
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;

142
src/stratifier.c

@ -157,18 +157,6 @@ static int64_t workbase_id;
static int64_t blockchange_id; static int64_t blockchange_id;
static char lasthash[68]; static char lasthash[68];
/* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock;
/* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond;
/* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs;
static stratum_msg_t *stratum_sends;
struct json_params { struct json_params {
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
@ -184,9 +172,19 @@ struct ckdb_msg {
typedef struct ckdb_msg ckdb_msg_t; typedef struct ckdb_msg ckdb_msg_t;
static ckmsgq_t *ckdbq; /* Stratum json messages with their associated client id */
static ckmsgq_t *sshareq; struct smsg {
static ckmsgq_t *sauthq; json_t *json_msg;
int client_id;
};
typedef struct smsg smsg_t;
static ckmsgq_t *ssends; // Stratum sends
static ckmsgq_t *srecvs; // Stratum receives
static ckmsgq_t *ckdbq; // ckdb
static ckmsgq_t *sshareq; // Stratum share sends
static ckmsgq_t *sauthq; // Stratum authorisations
static int user_instance_id; static int user_instance_id;
@ -779,15 +777,11 @@ out:
static void stratum_add_recvd(json_t *val) static void stratum_add_recvd(json_t *val)
{ {
stratum_msg_t *msg; smsg_t *msg;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val; msg->json_msg = val;
ckmsgq_add(srecvs, msg);
mutex_lock(&stratum_recv_lock);
DL_APPEND(stratum_recvs, msg);
pthread_cond_signal(&stratum_recv_cond);
mutex_unlock(&stratum_recv_lock);
} }
/* For creating a list of sends without locking that can then be concatenated /* For creating a list of sends without locking that can then be concatenated
@ -796,7 +790,7 @@ static void stratum_add_recvd(json_t *val)
static void stratum_broadcast(json_t *val) static void stratum_broadcast(json_t *val)
{ {
stratum_instance_t *instance, *tmp; stratum_instance_t *instance, *tmp;
stratum_msg_t *bulk_send = NULL; ckmsg_t *bulk_send = NULL;
if (unlikely(!val)) { if (unlikely(!val)) {
LOGERR("Sent null json to stratum_broadcast"); LOGERR("Sent null json to stratum_broadcast");
@ -805,14 +799,17 @@ static void stratum_broadcast(json_t *val)
ck_rlock(&instance_lock); ck_rlock(&instance_lock);
HASH_ITER(hh, stratum_instances, instance, tmp) { HASH_ITER(hh, stratum_instances, instance, tmp) {
stratum_msg_t *msg; ckmsg_t *client_msg;
smsg_t *msg;
if (!instance->authorised) if (!instance->authorised)
continue; continue;
msg = ckzalloc(sizeof(stratum_msg_t)); client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_deep_copy(val); msg->json_msg = json_deep_copy(val);
msg->client_id = instance->id; msg->client_id = instance->id;
DL_APPEND(bulk_send, msg); client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
} }
ck_runlock(&instance_lock); ck_runlock(&instance_lock);
@ -821,27 +818,23 @@ static void stratum_broadcast(json_t *val)
if (!bulk_send) if (!bulk_send)
return; return;
mutex_lock(&stratum_send_lock); mutex_lock(&ssends->lock);
if (stratum_sends) if (ssends->msgs)
DL_CONCAT(stratum_sends, bulk_send); DL_CONCAT(ssends->msgs, bulk_send);
else else
stratum_sends = bulk_send; ssends->msgs = bulk_send;
pthread_cond_signal(&stratum_send_cond); pthread_cond_signal(&ssends->cond);
mutex_unlock(&stratum_send_lock); mutex_unlock(&ssends->lock);
} }
static void stratum_add_send(json_t *val, int client_id) static void stratum_add_send(json_t *val, int client_id)
{ {
stratum_msg_t *msg; smsg_t *msg;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val; msg->json_msg = val;
msg->client_id = client_id; msg->client_id = client_id;
ckmsgq_add(ssends, msg);
mutex_lock(&stratum_send_lock);
DL_APPEND(stratum_sends, msg);
pthread_cond_signal(&stratum_send_cond);
mutex_unlock(&stratum_send_lock);
} }
static void drop_client(int id) static void drop_client(int id)
@ -1845,7 +1838,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
/* Unhandled message here */ /* Unhandled message here */
} }
static void parse_instance_msg(stratum_msg_t *msg) static void parse_instance_msg(smsg_t *msg)
{ {
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int client_id = msg->client_id; int client_id = msg->client_id;
@ -1873,30 +1866,10 @@ out:
free(msg); free(msg);
} }
static void *stratum_receiver(void *arg) static void srecv_process(ckpool_t *ckp, smsg_t *msg)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg;
pthread_detach(pthread_self());
rename_proc("sreceiver");
while (42) {
stratum_instance_t *instance; stratum_instance_t *instance;
/* Pop the head off the list if it exists or wait for a conditional
* signal telling us there is work */
mutex_lock(&stratum_recv_lock);
if (!stratum_recvs)
pthread_cond_wait(&stratum_recv_cond, &stratum_recv_lock);
msg = stratum_recvs;
if (likely(msg))
DL_DELETE(stratum_recvs, msg);
mutex_unlock(&stratum_recv_lock);
if (unlikely(!msg))
continue;
msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id"));
json_object_del(msg->json_msg, "client_id"); json_object_del(msg->json_msg, "client_id");
@ -1912,46 +1885,24 @@ static void *stratum_receiver(void *arg)
ck_uilock(&instance_lock); ck_uilock(&instance_lock);
parse_instance_msg(msg); parse_instance_msg(msg);
}
return NULL;
} }
static void discard_stratum_msg(stratum_msg_t **msg) static void discard_stratum_msg(smsg_t **msg)
{ {
json_decref((*msg)->json_msg); json_decref((*msg)->json_msg);
free(*msg); free(*msg);
*msg = NULL; *msg = NULL;
} }
static void *stratum_sender(void *arg) static void ssend_process(ckpool_t *ckp, smsg_t *msg)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg = NULL;
pthread_detach(pthread_self());
rename_proc("ssender");
while (42) {
char *s; char *s;
if (msg)
discard_stratum_msg(&msg);
mutex_lock(&stratum_send_lock);
if (!stratum_sends)
pthread_cond_wait(&stratum_send_cond, &stratum_send_lock);
msg = stratum_sends;
if (likely(msg))
DL_DELETE(stratum_sends, msg);
mutex_unlock(&stratum_send_lock);
if (unlikely(!msg))
continue;
if (unlikely(!msg->json_msg)) { if (unlikely(!msg->json_msg)) {
LOGERR("Sent null json msg to stratum_sender"); LOGERR("Sent null json msg to stratum_sender");
continue; free(msg);
return;
} }
/* Add client_id to the json message and send it to the /* Add client_id to the json message and send it to the
@ -1960,11 +1911,7 @@ static void *stratum_sender(void *arg)
s = json_dumps(msg->json_msg, 0); s = json_dumps(msg->json_msg, 0);
send_proc(ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
discard_stratum_msg(&msg); discard_stratum_msg(&msg);
}
return NULL;
} }
static void discard_json_params(json_params_t **jp) static void discard_json_params(json_params_t **jp)
@ -2299,8 +2246,7 @@ static void *statsupdate(void *arg)
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_statsupdate;
pthread_t pth_statsupdate;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2326,14 +2272,8 @@ int stratifier(proc_instance_t *pi)
cklock_init(&instance_lock); cklock_init(&instance_lock);
mutex_init(&stratum_send_lock); ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
cond_init(&stratum_send_cond); srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process);
create_pthread(&pth_stratum_sender, stratum_sender, ckp);
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process); sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process);
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);

10
src/stratifier.h

@ -10,16 +10,6 @@
#ifndef STRATIFIER_H #ifndef STRATIFIER_H
#define STRATIFIER_H #define STRATIFIER_H
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
int stratifier(proc_instance_t *pi); int stratifier(proc_instance_t *pi);
#endif /* STRATIFIER_H */ #endif /* STRATIFIER_H */

Loading…
Cancel
Save