Browse Source

Revert "Use async send proc as a separate thread from workqueues"

This reverts commit 7de43b1c6e.
master
Con Kolivas 10 years ago
parent
commit
6b3cd82525
  1. 66
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 10
      src/connector.c
  4. 28
      src/generator.c
  5. 25
      src/stratifier.c

66
src/ckpool.c

@ -575,32 +575,14 @@ out:
static void childsighandler(const int sig);
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
/* Send all one way messages asynchronously so we can wait till the receiving
* end closes the socket to ensure all messages are received but no deadlocks
* can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path;
bool ret = false;
int sockd;
pthread_detach(pthread_self());
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
@ -611,16 +593,14 @@ void *async_send_proc(void *arg)
}
/* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */
if (unlikely(!pi->pid)) {
if (unlikely(!pi->pid))
pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out_nofail;
}
return;
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out;
}
sockd = open_unix_client(path);
@ -640,25 +620,41 @@ out:
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15);
}
out_nofail:
free(msg);
}
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm)
{
_send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line);
free(pm->msg);
free(pm);
return NULL;
}
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
/* Fore sending asynchronous messages to another process, the sending process
* must have ckwqs of its own, referenced in the ckpool structure */
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm = ckalloc(sizeof(struct proc_message));
pthread_t pth;
struct proc_message *pm;
if (unlikely(!ckp->ckwqs)) {
LOGALERT("Workqueues not set up in async_send_proc!");
_send_proc(pi, msg, file, func, line);
return;
}
pm = ckzalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
create_pthread(&pth, async_send_proc, pm);
ckwq_add(ckp->ckwqs, &asp_send, pm);
}
/* Send a single message to a process instance and retrieve the response, then

4
src/ckpool.h

@ -210,6 +210,8 @@ struct ckpool_instance {
/* Private data for each process */
void *data;
/* Private generic workqueues if this process has them */
ckwq_t *ckwqs;
};
#ifdef USE_CKDB
@ -234,6 +236,8 @@ void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);

10
src/connector.c

@ -97,6 +97,8 @@ struct connector_data {
/* For protecting the pending sends list */
mutex_t sender_lock;
pthread_cond_t sender_cond;
ckwq_t *ckwqs;
};
typedef struct connector_data cdata_t;
@ -242,7 +244,7 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id)
char buf[256];
sprintf(buf, "dropclient=%"PRId64, id);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
}
/* Invalidate this instance. Remove them from the hashtables we look up
@ -360,9 +362,9 @@ reparse:
* filtered by the stratifier. */
if (likely(client->fd != -1)) {
if (ckp->passthrough)
send_proc(ckp->generator, s);
async_send_proc(ckp, ckp->generator, s);
else
send_proc(ckp->stratifier, s);
async_send_proc(ckp, ckp->stratifier, s);
}
free(s);
@ -868,6 +870,8 @@ int connector(proc_instance_t *pi)
LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata;
cdata->ckp = ckp;
/* Connector only requires one work queue */
ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1);
if (!ckp->serverurls)
cdata->serverfd = ckalloc(sizeof(int *));

28
src/generator.c

@ -146,6 +146,7 @@ struct generator_data {
proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue
ckwq_t *ckwqs;
};
typedef struct generator_data gdata_t;
@ -235,7 +236,7 @@ retry:
cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out:
send_proc(ckp->connector, alive ? "accept" : "reject");
async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
return alive;
}
@ -381,7 +382,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
send_proc(ckp->stratifier, blockmsg);
async_send_proc(ckp, ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true");
@ -944,7 +945,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int sub
char buf[256];
sprintf(buf, "deadproxy=%d:%d", id, subid);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
}
/* Remove the subproxy from the proxi list and put it on the dead list.
@ -1091,7 +1092,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "diff=%s", msg);
free(msg);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
free(buf);
}
@ -1119,7 +1120,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_
json_decref(json_msg);
ASPRINTF(&buf, "notify=%s", msg);
free(msg);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
free(buf);
/* Send diff now as stratifier will not accept diff till it has a
@ -1309,7 +1310,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg);
free(msg);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
free(buf);
}
@ -1355,7 +1356,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id)
char buf[256];
sprintf(buf, "reconnclient=%"PRId64, id);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
}
static void submit_share(gdata_t *gdata, json_t *val)
@ -1731,9 +1732,9 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi);
}
static void reconnect_generator(const ckpool_t *ckp)
static void reconnect_generator(ckpool_t *ckp)
{
send_proc(ckp->generator, "reconnect");
async_send_proc(ckp, ckp->generator, "reconnect");
}
/* For receiving messages from an upstream pool to pass downstream. Responsible
@ -1790,7 +1791,7 @@ static void *passthrough_recv(void *arg)
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
send_proc(ckp->connector, cs->buf);
async_send_proc(ckp, ckp->connector, cs->buf);
}
return NULL;
}
@ -1995,10 +1996,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
if (ret)
break;
send_proc(ckp->connector, "reject");
async_send_proc(ckp, ckp->connector, "reject");
sleep(1);
}
send_proc(ckp->connector, ret ? "accept" : "reject");
async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject");
return ret;
}
@ -2027,7 +2028,7 @@ reconnect:
proxi->id, cs->url, cs->port);
dealloc(buf);
ASPRINTF(&buf, "proxy=%d", proxi->id);
send_proc(ckp->stratifier, buf);
async_send_proc(ckp, ckp->stratifier, buf);
}
}
retry:
@ -2217,6 +2218,7 @@ int generator(proc_instance_t *pi)
gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata;
gdata->ckp = ckp;
ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1);
if (ckp->proxy) {
char *buf = NULL;

25
src/stratifier.c

@ -831,7 +831,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio)
return buf;
}
static void send_generator(const ckpool_t *ckp, const char *msg, const int prio)
static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
{
sdata_t *sdata = ckp->data;
bool set;
@ -841,7 +841,7 @@ static void send_generator(const ckpool_t *ckp, const char *msg, const int prio)
set = true;
} else
set = false;
send_proc(ckp->generator, msg);
async_send_proc(ckp, ckp->generator, msg);
if (set)
sdata->gen_priority = 0;
}
@ -966,7 +966,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id)
LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id);
snprintf(buf, 255, "dropclient=%"PRId64, id);
send_proc(ckp->connector, buf);
async_send_proc(ckp, ckp->connector, buf);
}
static void drop_allclients(ckpool_t *ckp)
@ -1012,8 +1012,8 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata)
memcpy(dsdata->donkeytxnbin, sdata->donkeytxnbin, 25);
/* Use the same work queues for all subproxies */
dsdata->ssends = sdata->ssends;
dsdata->ckwqs = sdata->ckwqs;
dsdata->ssends = sdata->ssends;
dsdata->ckdbq = sdata->ckdbq;
dsdata->sauthq = sdata->sauthq;
@ -1139,7 +1139,7 @@ out_unlock:
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client);
static void generator_recruit(const ckpool_t *ckp)
static void generator_recruit(ckpool_t *ckp)
{
LOGINFO("Stratifer requesting more proxies from generator");
send_generator(ckp, "recruit", GEN_PRIORITY);
@ -1778,7 +1778,7 @@ static void connector_test_client(ckpool_t *ckp, const int64_t id)
LOGDEBUG("Stratifier requesting connector test client %"PRId64, id);
snprintf(buf, 255, "testclient=%"PRId64, id);
send_proc(ckp->connector, buf);
async_send_proc(ckp, ckp->connector, buf);
}
/* For creating a list of sends without locking that can then be concatenated
@ -2564,7 +2564,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien
* in proxy mode where we find a subproxy based on the current proxy with room
* for more clients. Signal the generator to recruit more subproxies if we are
* running out of room. */
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata)
{
proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub;
int best_id, best_subid = 0;
@ -3965,6 +3965,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp);
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
{
ckpool_t *ckp = client->ckp;
const char *method;
/* Random broken clients send something not an integer as the id so we
@ -4011,14 +4012,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* to it since it's unauthorised. Set the flag just in case. */
client->authorised = false;
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(client->ckp->connector, buf);
async_send_proc(ckp, ckp->connector, buf);
return;
}
/* We should only accept subscribed requests from here on */
if (!client->subscribed) {
LOGINFO("Dropping unsubscribed client %"PRId64, client_id);
connector_drop_client(client->ckp, client_id);
connector_drop_client(ckp, client_id);
return;
}
@ -4040,7 +4041,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64, client_id);
connector_drop_client(client->ckp, client_id);
connector_drop_client(ckp, client_id);
return;
}
@ -4214,7 +4215,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, 0);
send_proc(ckp->connector, s);
async_send_proc(ckp, ckp->connector, s);
free(s);
free_smsg(msg);
}
@ -5077,7 +5078,7 @@ int stratifier(proc_instance_t *pi)
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN);
sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);

Loading…
Cancel
Save