Browse Source

These weren't the droids we were looking for.

master
Con Kolivas 10 years ago
parent
commit
f027885ff6
  1. 90
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 34
      src/generator.c
  4. 90
      src/stratifier.c

90
src/ckpool.c

@ -137,27 +137,27 @@ static void *ckmsg_queue(void *arg)
/* Generic workqueue function and message receiving and parsing thread */
static void *ckwq_queue(void *arg)
{
ckwq_t *ckwq = (ckwq_t *)arg;
ckpool_t *ckp = ckwq->ckp;
ckwq_t *ckmsgq = (ckwq_t *)arg;
ckpool_t *ckp = ckmsgq->ckp;
pthread_detach(pthread_self());
rename_proc(ckwq->name);
rename_proc(ckmsgq->name);
while (42) {
ckwqmsg_t *wqmsg;
tv_t now;
ts_t abs;
mutex_lock(ckwq->lock);
mutex_lock(ckmsgq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckwq->wqmsgs)
cond_timedwait(ckwq->cond, ckwq->lock, &abs);
wqmsg = ckwq->wqmsgs;
if (!ckmsgq->wqmsgs)
cond_timedwait(ckmsgq->cond, ckmsgq->lock, &abs);
wqmsg = ckmsgq->wqmsgs;
if (wqmsg)
DL_DELETE(ckwq->wqmsgs, wqmsg);
mutex_unlock(ckwq->lock);
DL_DELETE(ckmsgq->wqmsgs, wqmsg);
mutex_unlock(ckmsgq->lock);
if (!wqmsg)
continue;
@ -209,7 +209,7 @@ ckmsgq_t *create_ckmsgqs(ckpool_t *ckp, const char *name, const void *func, cons
ckwq_t *create_ckwqs(ckpool_t *ckp, const char *name, const int count)
{
ckwq_t *ckwq = ckzalloc(sizeof(ckwq_t) * count);
ckwq_t *ckwq = ckzalloc(sizeof(ckmsgq_t) * count);
mutex_t *lock;
pthread_cond_t *cond;
int i;
@ -575,14 +575,32 @@ out:
static void childsighandler(const int sig);
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
/* Send all one way messages asynchronously so we can wait till the receiving
* end closes the socket to ensure all messages are received but no deadlocks
* can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
{
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path;
bool ret = false;
int sockd;
pthread_detach(pthread_self());
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
@ -593,14 +611,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
}
/* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */
if (unlikely(!pi->pid))
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
return;
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out_nofail;
}
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
goto out;
}
sockd = open_unix_client(path);
@ -620,41 +640,25 @@ out:
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15);
}
}
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm)
{
_send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line);
free(pm->msg);
out_nofail:
free(msg);
free(pm);
return NULL;
}
/* Fore sending asynchronous messages to another process, the sending process
* must have ckwqs of its own, referenced in the ckpool structure */
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm;
struct proc_message *pm = ckalloc(sizeof(struct proc_message));
pthread_t pth;
if (unlikely(!ckp->ckwqs)) {
LOGALERT("Workqueues not set up in async_send_proc!");
_send_proc(pi, msg, file, func, line);
return;
}
pm = ckzalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
ckwq_add(ckp->ckwqs, &asp_send, pm);
create_pthread(&pth, async_send_proc, pm);
}
/* Send a single message to a process instance and retrieve the response, then

4
src/ckpool.h

@ -213,8 +213,6 @@ struct ckpool_instance {
/* Private data for each process */
void *data;
/* Private generic workqueues if this process has them */
ckwq_t *ckwqs;
};
#ifdef USE_CKDB
@ -239,8 +237,6 @@ void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);

34
src/generator.c

@ -221,7 +221,7 @@ retry:
cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out:
async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
send_proc(ckp->connector, alive ? "accept" : "reject");
return alive;
}
@ -367,7 +367,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
async_send_proc(ckp, ckp->stratifier, blockmsg);
send_proc(ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true");
@ -972,7 +972,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "diff=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
}
@ -1007,7 +1007,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "notify=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
}
@ -1195,7 +1195,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
}
@ -1428,7 +1428,7 @@ static void *passthrough_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false)) {
proxi->alive = true;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url);
}
@ -1439,13 +1439,13 @@ static void *passthrough_recv(void *arg)
while (!proxy_alive(ckp, si, proxi, cs, true)) {
if (proxi->alive) {
proxi->alive = false;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
sleep(5);
}
if (!proxi->alive) {
proxi->alive = true;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
do {
@ -1456,13 +1456,13 @@ static void *passthrough_recv(void *arg)
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect",
proxi->id, proxi->si->url);
proxi->alive = false;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
continue;
}
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
async_send_proc(ckp, ckp->connector, cs->buf);
send_proc(ckp->connector, cs->buf);
}
return NULL;
}
@ -1496,7 +1496,7 @@ static void *proxy_recv(void *arg)
if (proxy_alive(ckp, si, proxi, cs, false)) {
proxi->alive = true;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s connection established",
proxi->id, proxi->si->url);
}
@ -1510,7 +1510,7 @@ static void *proxy_recv(void *arg)
while (!proxy_alive(ckp, si, proxi, cs, true)) {
if (proxi->alive) {
proxi->alive = false;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
sleep(5);
proxi->reconnect_time = time(NULL);
@ -1522,7 +1522,7 @@ static void *proxy_recv(void *arg)
LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->si->url);
proxi->alive = true;
proxi->reconnect_time = 0;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
now = time(NULL);
@ -1572,7 +1572,7 @@ static void *proxy_recv(void *arg)
* pool is up */
proxi->reconnect = false;
proxi->alive = false;
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
LOGWARNING("Proxy %d:%s reconnect issue, dropping existing connection",
proxi->id, proxi->si->url);
Close(cs->fd);
@ -1644,7 +1644,7 @@ static proxy_instance_t *best_proxy(ckpool_t *ckp, gdata_t *gdata)
break;
sleep(1);
}
async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject");
send_proc(ckp->connector, ret ? "accept" : "reject");
return ret;
}
@ -1672,7 +1672,7 @@ reconnect:
proxi->id, cs->url, cs->port);
dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
/* Send a notify for the new chosen proxy or the
* stratifier won't be able to switch. */
send_notify(ckp, proxi);
@ -1857,7 +1857,7 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
break;
}
if (alive)
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
int generator(proc_instance_t *pi)

90
src/stratifier.c

@ -336,10 +336,12 @@ struct stratifier_data {
char lasthash[68];
char lastswaphash[68];
ckwq_t *ckwqs; // Generic workqueues
ckmsgq_t *ssends; // Stratum sends
ckmsgq_t *srecvs; // Stratum receives
ckmsgq_t *ckdbq; // ckdb
ckmsgq_t *sshareq; // Stratum share sends
ckmsgq_t *sauthq; // Stratum authorisations
ckmsgq_t *stxnq; // Transaction requests
int64_t user_instance_id;
@ -808,26 +810,38 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
set = true;
} else
set = false;
async_send_proc(ckp, ckp->generator, msg);
send_proc(ckp->generator, msg);
if (set)
sdata->gen_priority = 0;
}
struct update_req {
pthread_t *pth;
ckpool_t *ckp;
int prio;
};
static void broadcast_ping(sdata_t *sdata);
/* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template
* for generating work templates. */
static void do_update(ckpool_t *ckp, int *prio)
static void *do_update(void *arg)
{
struct update_req *ur = (struct update_req *)arg;
ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->data;
bool new_block = false;
int prio = ur->prio;
bool ret = false;
workbase_t *wb;
json_t *val;
char *buf;
buf = send_recv_generator(ckp, "getbase", *prio);
pthread_detach(pthread_self());
rename_proc("updater");
buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) {
LOGNOTICE("Get base in update_base delayed due to higher priority request");
goto out;
@ -887,17 +901,21 @@ out:
LOGINFO("Broadcast ping due to failed stratum base update");
broadcast_ping(sdata);
}
free(buf);
free(prio);
dealloc(buf);
free(ur->pth);
free(ur);
return NULL;
}
static void update_base(ckpool_t *ckp, const int prio)
{
int *pprio = ckalloc(sizeof(int));
sdata_t *sdata = ckp->data;
struct update_req *ur = ckalloc(sizeof(struct update_req));
pthread_t *pth = ckalloc(sizeof(pthread_t));
*pprio = prio;
ckwq_add(sdata->ckwqs, &do_update, pprio);
ur->pth = pth;
ur->ckp = ckp;
ur->prio = prio;
create_pthread(pth, do_update, ur);
}
static void __kill_instance(stratum_instance_t *client)
@ -929,7 +947,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id)
LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id);
snprintf(buf, 255, "dropclient=%"PRId64, id);
async_send_proc(ckp, ckp->connector, buf);
send_proc(ckp->connector, buf);
}
static void drop_allclients(ckpool_t *ckp)
@ -1739,21 +1757,6 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, const int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static void ckwq_stats(ckwq_t *ckwq, const int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckwqmsg_t *wqmsg;
mutex_lock(ckwq->lock);
DL_COUNT(ckwq->wqmsgs, wqmsg, objects);
generated = ckwq->messages;
mutex_unlock(ckwq->lock);
memsize = (sizeof(ckwqmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{
json_t *val = json_object(), *subval;
@ -1800,14 +1803,15 @@ static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */
ckwq_stats(sdata->ckwqs, sizeof(char *) + sizeof(void *), &subval);
json_set_object(val, "ckwqs", subval);
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "srecvs", subval);
if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
}
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
@ -1890,7 +1894,7 @@ retry:
/* The bulk of the messages will be received json from the
* connector so look for this first. The srecv_process frees
* the buf heap ram */
ckwq_add(sdata->ckwqs, &srecv_process, buf);
ckmsgq_add(sdata->srecvs, buf);
Close(sockd);
buf = NULL;
goto retry;
@ -3414,14 +3418,10 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client);
}
static void sshare_process(ckpool_t *ckp, json_params_t *jp);
static void send_transactions(ckpool_t *ckp, json_params_t *jp);
/* Enter with client holding ref count */
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
{
ckpool_t *ckp = client->ckp;
const char *method;
/* Random broken clients send something not an integer as the id so we
@ -3431,7 +3431,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (likely(cmdmatch(method, "mining.submit") && client->authorised)) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &sshare_process, jp);
ckmsgq_add(sdata->sshareq, jp);
return;
}
@ -3468,14 +3468,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* to it since it's unauthorised. Set the flag just in case. */
client->authorised = false;
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
async_send_proc(ckp, ckp->connector, buf);
send_proc(client->ckp->connector, buf);
return;
}
/* We should only accept subscribed requests from here on */
if (!client->subscribed) {
LOGINFO("Dropping unsubscribed client %"PRId64, client_id);
connector_drop_client(ckp, client_id);
connector_drop_client(client->ckp, client_id);
return;
}
@ -3497,7 +3497,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64, client_id);
connector_drop_client(ckp, client_id);
connector_drop_client(client->ckp, client_id);
return;
}
@ -3510,7 +3510,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckwq_add(sdata->ckwqs, &send_transactions, jp);
ckmsgq_add(sdata->stxnq, jp);
return;
}
/* Unhandled message here */
@ -3685,7 +3685,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, 0);
async_send_proc(ckp, ckp->connector, s);
send_proc(ckp->connector, s);
free(s);
free_smsg(msg);
}
@ -4543,10 +4543,14 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->ckdb_lock);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN);
ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
/* Create half as many share processing threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
/* Create 1/4 as many stratum processing threads as there are CPUs */
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);

Loading…
Cancel
Save