kanoi 11 years ago
parent
commit
fd9033073e
  1. 56
      src/ckpool.c
  2. 22
      src/ckpool.h
  3. 11
      src/generator.c
  4. 2
      src/libckpool.c
  5. 2
      src/libckpool.h
  6. 278
      src/stratifier.c
  7. 10
      src/stratifier.h

56
src/ckpool.c

@ -74,6 +74,62 @@ void logmsg(int loglevel, const char *fmt, ...) {
} }
} }
/* Generic function for creating a message queue receiving and parsing thread */
static void *ckmsg_queue(void *arg)
{
ckmsgq_t *ckmsgq = (ckmsgq_t *)arg;
ckpool_t *ckp = ckmsgq->ckp;
pthread_detach(pthread_self());
rename_proc(ckmsgq->name);
while (42) {
ckmsg_t *msg;
mutex_lock(&ckmsgq->lock);
if (!ckmsgq->msgs)
pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock);
msg = ckmsgq->msgs;
if (likely(msg))
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
if (unlikely(!msg))
continue;
ckmsgq->func(ckp, msg->data);
free(msg);
}
return NULL;
}
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func)
{
ckmsgq_t *ckmsgq = ckzalloc(sizeof(ckmsgq_t));
strncpy(ckmsgq->name, name, 15);
ckmsgq->func = func;
ckmsgq->ckp = ckp;
mutex_init(&ckmsgq->lock);
cond_init(&ckmsgq->cond);
create_pthread(&ckmsgq->pth, ckmsg_queue, ckmsgq);
return ckmsgq;
}
/* Generic function for adding messages to a ckmsgq linked list and signal the ckmsgq
* parsing thread to wake up and process it. */
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
{
ckmsg_t *msg = ckalloc(sizeof(ckmsg_t));
msg->data = data;
mutex_lock(&ckmsgq->lock);
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(&ckmsgq->cond);
mutex_unlock(&ckmsgq->lock);
}
/* Listen for incoming global requests. Always returns a response if possible */ /* Listen for incoming global requests. Always returns a response if possible */
static void *listener(void *arg) static void *listener(void *arg)
{ {

22
src/ckpool.h

@ -126,6 +126,28 @@ struct ckpool_instance {
char **proxypass; char **proxypass;
}; };
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
};
typedef struct ckmsgq ckmsgq_t;
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);
ckpool_t *global_ckp; ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);

11
src/generator.c

@ -18,7 +18,6 @@
#include "libckpool.h" #include "libckpool.h"
#include "generator.h" #include "generator.h"
#include "bitcoin.h" #include "bitcoin.h"
#include "stratifier.h"
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
@ -54,6 +53,16 @@ struct share_msg {
typedef struct share_msg share_msg_t; typedef struct share_msg share_msg_t;
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;

2
src/libckpool.c

@ -32,6 +32,7 @@
#include "ckpool.h" #include "ckpool.h"
#include "libckpool.h" #include "libckpool.h"
#include "sha2.h" #include "sha2.h"
#include "utlist.h"
#ifndef UNIX_PATH_MAX #ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108 #define UNIX_PATH_MAX 108
@ -75,7 +76,6 @@ void join_pthread(pthread_t thread)
pthread_join(thread, NULL); pthread_join(thread, NULL);
} }
/* Place holders for when we add lock debugging */ /* Place holders for when we add lock debugging */
#define GETLOCK(_lock, _file, _func, _line) #define GETLOCK(_lock, _file, _func, _line)
#define GOTLOCK(_lock, _file, _func, _line) #define GOTLOCK(_lock, _file, _func, _line)

2
src/libckpool.h

@ -32,6 +32,8 @@
# include <sys/endian.h> # include <sys/endian.h>
#endif #endif
#include "utlist.h"
#ifndef bswap_16 #ifndef bswap_16
#define bswap_16 __builtin_bswap16 #define bswap_16 __builtin_bswap16
#define bswap_32 __builtin_bswap32 #define bswap_32 __builtin_bswap32

278
src/stratifier.c

@ -157,28 +157,7 @@ static int64_t workbase_id;
static int64_t blockchange_id; static int64_t blockchange_id;
static char lasthash[68]; static char lasthash[68];
/* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock;
static pthread_mutex_t sshare_lock;
static pthread_mutex_t sauth_lock;
static pthread_mutex_t ckdbq_lock;
/* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond;
static pthread_cond_t sshare_cond;
static pthread_cond_t sauth_cond;
static pthread_cond_t ckdbq_cond;
/* For the linked list of all queued messages */
static stratum_msg_t *stratum_recvs;
static stratum_msg_t *stratum_sends;
struct json_params { struct json_params {
struct json_params *next;
struct json_params *prev;
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
int client_id; int client_id;
@ -186,20 +165,26 @@ struct json_params {
typedef struct json_params json_params_t; typedef struct json_params json_params_t;
static json_params_t *sshares;
static json_params_t *sauths;
struct ckdb_msg { struct ckdb_msg {
struct ckdb_msg *next;
struct ckdb_msg *prev;
json_t *val; json_t *val;
int idtype; int idtype;
}; };
typedef struct ckdb_msg ckdb_msg_t; typedef struct ckdb_msg ckdb_msg_t;
static ckdb_msg_t *ckdb_msgs; /* Stratum json messages with their associated client id */
struct smsg {
json_t *json_msg;
int client_id;
};
typedef struct smsg smsg_t;
static ckmsgq_t *ssends; // Stratum sends
static ckmsgq_t *srecvs; // Stratum receives
static ckmsgq_t *ckdbq; // ckdb
static ckmsgq_t *sshareq; // Stratum share sends
static ckmsgq_t *sauthq; // Stratum authorisations
static int user_instance_id; static int user_instance_id;
@ -427,11 +412,7 @@ static void ckdbq_add(const int idtype, json_t *val)
msg->val = val; msg->val = val;
msg->idtype = idtype; msg->idtype = idtype;
ckmsgq_add(ckdbq, msg);
mutex_lock(&ckdbq_lock);
DL_APPEND(ckdb_msgs, msg);
pthread_cond_signal(&ckdbq_cond);
mutex_unlock(&ckdbq_lock);
} }
static void send_workinfo(ckpool_t *ckp, workbase_t *wb) static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
@ -796,15 +777,11 @@ out:
static void stratum_add_recvd(json_t *val) static void stratum_add_recvd(json_t *val)
{ {
stratum_msg_t *msg; smsg_t *msg;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val; msg->json_msg = val;
ckmsgq_add(srecvs, msg);
mutex_lock(&stratum_recv_lock);
DL_APPEND(stratum_recvs, msg);
pthread_cond_signal(&stratum_recv_cond);
mutex_unlock(&stratum_recv_lock);
} }
/* For creating a list of sends without locking that can then be concatenated /* For creating a list of sends without locking that can then be concatenated
@ -813,7 +790,7 @@ static void stratum_add_recvd(json_t *val)
static void stratum_broadcast(json_t *val) static void stratum_broadcast(json_t *val)
{ {
stratum_instance_t *instance, *tmp; stratum_instance_t *instance, *tmp;
stratum_msg_t *bulk_send = NULL; ckmsg_t *bulk_send = NULL;
if (unlikely(!val)) { if (unlikely(!val)) {
LOGERR("Sent null json to stratum_broadcast"); LOGERR("Sent null json to stratum_broadcast");
@ -822,14 +799,17 @@ static void stratum_broadcast(json_t *val)
ck_rlock(&instance_lock); ck_rlock(&instance_lock);
HASH_ITER(hh, stratum_instances, instance, tmp) { HASH_ITER(hh, stratum_instances, instance, tmp) {
stratum_msg_t *msg; ckmsg_t *client_msg;
smsg_t *msg;
if (!instance->authorised) if (!instance->authorised)
continue; continue;
msg = ckzalloc(sizeof(stratum_msg_t)); client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_deep_copy(val); msg->json_msg = json_deep_copy(val);
msg->client_id = instance->id; msg->client_id = instance->id;
DL_APPEND(bulk_send, msg); client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
} }
ck_runlock(&instance_lock); ck_runlock(&instance_lock);
@ -838,27 +818,23 @@ static void stratum_broadcast(json_t *val)
if (!bulk_send) if (!bulk_send)
return; return;
mutex_lock(&stratum_send_lock); mutex_lock(&ssends->lock);
if (stratum_sends) if (ssends->msgs)
DL_CONCAT(stratum_sends, bulk_send); DL_CONCAT(ssends->msgs, bulk_send);
else else
stratum_sends = bulk_send; ssends->msgs = bulk_send;
pthread_cond_signal(&stratum_send_cond); pthread_cond_signal(&ssends->cond);
mutex_unlock(&stratum_send_lock); mutex_unlock(&ssends->lock);
} }
static void stratum_add_send(json_t *val, int client_id) static void stratum_add_send(json_t *val, int client_id)
{ {
stratum_msg_t *msg; smsg_t *msg;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = val; msg->json_msg = val;
msg->client_id = client_id; msg->client_id = client_id;
ckmsgq_add(ssends, msg);
mutex_lock(&stratum_send_lock);
DL_APPEND(stratum_sends, msg);
pthread_cond_signal(&stratum_send_cond);
mutex_unlock(&stratum_send_lock);
} }
static void drop_client(int id) static void drop_client(int id)
@ -1835,11 +1811,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
if (!strncasecmp(method, "mining.auth", 11)) { if (!strncasecmp(method, "mining.auth", 11)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sauth_lock); ckmsgq_add(sauthq, jp);
DL_APPEND(sauths, jp);
pthread_cond_signal(&sauth_cond);
mutex_unlock(&sauth_lock);
return; return;
} }
@ -1859,18 +1831,14 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
if (!strncasecmp(method, "mining.submit", 13)) { if (!strncasecmp(method, "mining.submit", 13)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val);
mutex_lock(&sshare_lock); ckmsgq_add(sshareq, jp);
DL_APPEND(sshares, jp);
pthread_cond_signal(&sshare_cond);
mutex_unlock(&sshare_lock);
return; return;
} }
/* Unhandled message here */ /* Unhandled message here */
} }
static void parse_instance_msg(stratum_msg_t *msg) static void parse_instance_msg(smsg_t *msg)
{ {
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int client_id = msg->client_id; int client_id = msg->client_id;
@ -1898,30 +1866,10 @@ out:
free(msg); free(msg);
} }
static void *stratum_receiver(void *arg) static void srecv_process(ckpool_t *ckp, smsg_t *msg)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg;
pthread_detach(pthread_self());
rename_proc("sreceiver");
while (42) {
stratum_instance_t *instance; stratum_instance_t *instance;
/* Pop the head off the list if it exists or wait for a conditional
* signal telling us there is work */
mutex_lock(&stratum_recv_lock);
if (!stratum_recvs)
pthread_cond_wait(&stratum_recv_cond, &stratum_recv_lock);
msg = stratum_recvs;
if (likely(msg))
DL_DELETE(stratum_recvs, msg);
mutex_unlock(&stratum_recv_lock);
if (unlikely(!msg))
continue;
msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id"));
json_object_del(msg->json_msg, "client_id"); json_object_del(msg->json_msg, "client_id");
@ -1937,46 +1885,24 @@ static void *stratum_receiver(void *arg)
ck_uilock(&instance_lock); ck_uilock(&instance_lock);
parse_instance_msg(msg); parse_instance_msg(msg);
}
return NULL;
} }
static void discard_stratum_msg(stratum_msg_t **msg) static void discard_stratum_msg(smsg_t **msg)
{ {
json_decref((*msg)->json_msg); json_decref((*msg)->json_msg);
free(*msg); free(*msg);
*msg = NULL; *msg = NULL;
} }
static void *stratum_sender(void *arg) static void ssend_process(ckpool_t *ckp, smsg_t *msg)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
stratum_msg_t *msg = NULL;
pthread_detach(pthread_self());
rename_proc("ssender");
while (42) {
char *s; char *s;
if (msg)
discard_stratum_msg(&msg);
mutex_lock(&stratum_send_lock);
if (!stratum_sends)
pthread_cond_wait(&stratum_send_cond, &stratum_send_lock);
msg = stratum_sends;
if (likely(msg))
DL_DELETE(stratum_sends, msg);
mutex_unlock(&stratum_send_lock);
if (unlikely(!msg))
continue;
if (unlikely(!msg->json_msg)) { if (unlikely(!msg->json_msg)) {
LOGERR("Sent null json msg to stratum_sender"); LOGERR("Sent null json msg to stratum_sender");
continue; free(msg);
return;
} }
/* Add client_id to the json message and send it to the /* Add client_id to the json message and send it to the
@ -1985,13 +1911,9 @@ static void *stratum_sender(void *arg)
s = json_dumps(msg->json_msg, 0); s = json_dumps(msg->json_msg, 0);
send_proc(ckp->connector, s); send_proc(ckp->connector, s);
free(s); free(s);
discard_stratum_msg(&msg); discard_stratum_msg(&msg);
} }
return NULL;
}
static void discard_json_params(json_params_t **jp) static void discard_json_params(json_params_t **jp)
{ {
json_decref((*jp)->params); json_decref((*jp)->params);
@ -2000,32 +1922,12 @@ static void discard_json_params(json_params_t **jp)
*jp = NULL; *jp = NULL;
} }
static void *share_processor(void *arg) static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp)
{ {
ckpool_t __maybe_unused *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("sprocessor");
while (42) {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int client_id;
if (jp)
discard_json_params(&jp);
mutex_lock(&sshare_lock);
if (!sshares)
pthread_cond_wait(&sshare_cond, &sshare_lock);
jp = sshares;
if (likely(jp))
DL_DELETE(sshares, jp);
mutex_unlock(&sshare_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id; client_id = jp->client_id;
ck_rlock(&instance_lock); ck_rlock(&instance_lock);
@ -2034,7 +1936,7 @@ static void *share_processor(void *arg)
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Share processor failed to find client id %d in hashtable!", client_id); LOGINFO("Share processor failed to find client id %d in hashtable!", client_id);
continue; goto out;
} }
json_msg = json_object(); json_msg = json_object();
result_val = parse_submit(client, json_msg, jp->params, &err_val); result_val = parse_submit(client, json_msg, jp->params, &err_val);
@ -2042,38 +1944,15 @@ static void *share_processor(void *arg)
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val); json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
out:
discard_json_params(&jp);
} }
return NULL; static void sauth_process(ckpool_t *ckp, json_params_t *jp)
}
static void *authoriser(void *arg)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
json_params_t *jp = NULL;
pthread_detach(pthread_self());
rename_proc("authoriser");
while (42) {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int client_id;
char buf[256];
if (jp)
discard_json_params(&jp);
mutex_lock(&sauth_lock);
if (!sauths)
pthread_cond_wait(&sauth_cond, &sauth_lock);
jp = sauths;
if (likely(jp))
DL_DELETE(sauths, jp);
mutex_unlock(&sauth_lock);
if (unlikely(!jp))
continue;
client_id = jp->client_id; client_id = jp->client_id;
@ -2083,11 +1962,13 @@ static void *authoriser(void *arg)
if (unlikely(!client)) { if (unlikely(!client)) {
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id);
continue; goto out;
} }
result_val = parse_authorise(client, jp->params, &err_val); result_val = parse_authorise(client, jp->params, &err_val);
if (json_is_true(result_val)) { if (json_is_true(result_val)) {
snprintf(buf, 255, "Authorised, welcome to %s %s!", ckp->name, char *buf;
ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name,
client->user_instance->username); client->user_instance->username);
stratum_send_message(client, buf); stratum_send_message(client, buf);
} else } else
@ -2097,49 +1978,28 @@ static void *authoriser(void *arg)
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
json_object_set_nocheck(json_msg, "id", jp->id_val); json_object_set_nocheck(json_msg, "id", jp->id_val);
stratum_add_send(json_msg, client_id); stratum_add_send(json_msg, client_id);
} out:
discard_json_params(&jp);
return NULL;
} }
static void *ckdbqueue(void *arg) static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
{ {
ckpool_t *ckp = (ckpool_t *)arg;
pthread_detach(pthread_self());
rename_proc("ckdbqueue");
while (42) {
bool logged = false; bool logged = false;
char *buf = NULL; char *buf = NULL;
ckdb_msg_t *msg;
mutex_lock(&ckdbq_lock);
if (!ckdb_msgs)
pthread_cond_wait(&ckdbq_cond, &ckdbq_lock);
msg = ckdb_msgs;
if (likely(msg))
DL_DELETE(ckdb_msgs, msg);
mutex_unlock(&ckdbq_lock);
if (unlikely(!msg))
continue;
while (!buf) { while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[msg->idtype], msg->val, logged); buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Failed to talk to ckdb, queueing messages"); LOGWARNING("Failed to talk to ckdb, queueing messages");
sleep(5); sleep(5);
} }
logged = true; logged = true;
} }
LOGINFO("Got %s ckdb response: %s", ckdb_ids[msg->idtype], buf); LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf);
free(buf); free(buf);
} }
return NULL;
}
static const double nonces = 4294967296; static const double nonces = 4294967296;
/* Called every 15 seconds, we send the updated stats to ckdb of those users /* Called every 15 seconds, we send the updated stats to ckdb of those users
@ -2386,9 +2246,7 @@ static void *statsupdate(void *arg)
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; pthread_t pth_blockupdate, pth_statsupdate;
pthread_t pth_statsupdate, pth_share_processer, pth_authoriser;
pthread_t pth_ckdbqueue;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
char *buf; char *buf;
int ret; int ret;
@ -2414,25 +2272,11 @@ int stratifier(proc_instance_t *pi)
cklock_init(&instance_lock); cklock_init(&instance_lock);
mutex_init(&stratum_send_lock); ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
cond_init(&stratum_send_cond); srecvs = create_ckmsgq(ckp, "sreceiver", &srecv_process);
create_pthread(&pth_stratum_sender, stratum_sender, ckp); sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process);
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
mutex_init(&stratum_recv_lock); ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
mutex_init(&sshare_lock);
cond_init(&sshare_cond);
create_pthread(&pth_share_processer, share_processor, ckp);
mutex_init(&sauth_lock);
cond_init(&sauth_cond);
create_pthread(&pth_authoriser, authoriser, ckp);
mutex_init(&ckdbq_lock);
cond_init(&ckdbq_cond);
create_pthread(&pth_ckdbqueue, ckdbqueue, ckp);
cklock_init(&workbase_lock); cklock_init(&workbase_lock);
if (!ckp->proxy) if (!ckp->proxy)

10
src/stratifier.h

@ -10,16 +10,6 @@
#ifndef STRATIFIER_H #ifndef STRATIFIER_H
#define STRATIFIER_H #define STRATIFIER_H
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
int stratifier(proc_instance_t *pi); int stratifier(proc_instance_t *pi);
#endif /* STRATIFIER_H */ #endif /* STRATIFIER_H */

Loading…
Cancel
Save