Browse Source

Try async messages again

master
Con Kolivas 10 years ago
parent
commit
d75dd5543c
  1. 70
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 10
      src/connector.c
  4. 28
      src/generator.c
  5. 25
      src/stratifier.c

70
src/ckpool.c

@ -575,14 +575,32 @@ out:
static void childsighandler(const int sig);
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
/* Send all one way messages asynchronously so we can wait till the receiving
* end closes the socket to ensure all messages are received but no deadlocks
* can occur with 2 processes waiting for each other's socket closure. */
void *async_send_proc(void *arg)
{
struct proc_message *pm = (struct proc_message *)arg;
proc_instance_t *pi = pm->pi;
char *msg = pm->msg;
const char *file = pm->file;
const char *func = pm->func;
int line = pm->line;
char *path = pi->us.path;
bool ret = false;
int sockd;
pthread_detach(pthread_self());
if (unlikely(!path || !strlen(path))) {
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : "");
goto out;
@ -593,14 +611,16 @@ void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const ch
}
/* At startup the pid fields are not set up before some processes are
* forked so they never inherit them. */
if (unlikely(!pi->pid))
if (unlikely(!pi->pid)) {
pi->pid = get_proc_pid(pi);
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
return;
if (!pi->pid) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
goto out_nofail;
}
}
if (unlikely(kill_pid(pi->pid, 0))) {
LOGALERT("Attempting to send message %s to non existent process %s", msg, pi->processname);
LOGALERT("Attempting to send message %s to non existent process %s pid %d",
msg, pi->processname, pi->pid);
goto out;
}
sockd = open_unix_client(path);
@ -620,41 +640,25 @@ out:
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
childsighandler(15);
}
}
struct proc_message {
proc_instance_t *pi;
char *msg;
const char *file;
const char *func;
int line;
};
static void asp_send(ckpool_t __maybe_unused *ckp, struct proc_message *pm)
{
_send_proc(pm->pi, pm->msg, pm->file, pm->func, pm->line);
free(pm->msg);
out_nofail:
free(msg);
free(pm);
return NULL;
}
/* Fore sending asynchronous messages to another process, the sending process
* must have ckwqs of its own, referenced in the ckpool structure */
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
/* Send a single message to a process instance when there will be no response,
* closing the socket immediately. */
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{
struct proc_message *pm;
struct proc_message *pm = ckalloc(sizeof(struct proc_message));
pthread_t pth;
if (unlikely(!ckp->ckwqs)) {
LOGALERT("Workqueues not set up in async_send_proc!");
_send_proc(pi, msg, file, func, line);
return;
}
pm = ckzalloc(sizeof(struct proc_message));
pm->pi = pi;
pm->msg = strdup(msg);
pm->file = file;
pm->func = func;
pm->line = line;
ckwq_add(ckp->ckwqs, &asp_send, pm);
create_pthread(&pth, async_send_proc, pm);
}
/* Send a single message to a process instance and retrieve the response, then

4
src/ckpool.h

@ -210,8 +210,6 @@ struct ckpool_instance {
/* Private data for each process */
void *data;
/* Private generic workqueues if this process has them */
ckwq_t *ckwqs;
};
#ifdef USE_CKDB
@ -236,8 +234,6 @@ void empty_buffer(connsock_t *cs);
int read_socket_line(connsock_t *cs, const int timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)
void _async_send_proc(ckpool_t *ckp, proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define async_send_proc(ckp, pi, msg) _async_send_proc(ckp, pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);

10
src/connector.c

@ -97,8 +97,6 @@ struct connector_data {
/* For protecting the pending sends list */
mutex_t sender_lock;
pthread_cond_t sender_cond;
ckwq_t *ckwqs;
};
typedef struct connector_data cdata_t;
@ -244,7 +242,7 @@ static void stratifier_drop_client(ckpool_t *ckp, const int64_t id)
char buf[256];
sprintf(buf, "dropclient=%"PRId64, id);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
}
/* Invalidate this instance. Remove them from the hashtables we look up
@ -362,9 +360,9 @@ reparse:
* filtered by the stratifier. */
if (likely(client->fd != -1)) {
if (ckp->passthrough)
async_send_proc(ckp, ckp->generator, s);
send_proc(ckp->generator, s);
else
async_send_proc(ckp, ckp->stratifier, s);
send_proc(ckp->stratifier, s);
}
free(s);
@ -870,8 +868,6 @@ int connector(proc_instance_t *pi)
LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata;
cdata->ckp = ckp;
/* Connector only requires one work queue */
ckp->ckwqs = cdata->ckwqs = create_ckwqs(ckp, "conn", 1);
if (!ckp->serverurls)
cdata->serverfd = ckalloc(sizeof(int *));

28
src/generator.c

@ -148,7 +148,6 @@ struct generator_data {
proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue
ckwq_t *ckwqs;
};
typedef struct generator_data gdata_t;
@ -238,7 +237,7 @@ retry:
cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out:
async_send_proc(ckp, ckp->connector, alive ? "accept" : "reject");
send_proc(ckp->connector, alive ? "accept" : "reject");
return alive;
}
@ -384,7 +383,7 @@ retry:
ret = submit_block(cs, buf + 12 + 64 + 1);
memset(buf + 12 + 64, 0, 1);
sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12);
async_send_proc(ckp, ckp->stratifier, blockmsg);
send_proc(ckp->stratifier, blockmsg);
} else if (cmdmatch(buf, "checkaddr:")) {
if (validate_address(cs, buf + 10))
send_unix_msg(sockd, "true");
@ -949,7 +948,7 @@ static void send_stratifier_deadproxy(ckpool_t *ckp, const int64_t id, const int
char buf[256];
sprintf(buf, "deadproxy=%ld:%d", id, subid);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
}
/* Remove the subproxy from the proxi list and put it on the dead list.
@ -1088,7 +1087,7 @@ static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "diff=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
}
@ -1116,7 +1115,7 @@ static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_
json_decref(json_msg);
ASPRINTF(&buf, "notify=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
/* Send diff now as stratifier will not accept diff till it has a
@ -1309,7 +1308,7 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg);
free(msg);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
free(buf);
}
@ -1356,7 +1355,7 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id)
char buf[256];
sprintf(buf, "reconnclient=%"PRId64, id);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
}
static void submit_share(gdata_t *gdata, json_t *val)
@ -1779,9 +1778,9 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi);
}
static void reconnect_generator(ckpool_t *ckp)
static void reconnect_generator(const ckpool_t *ckp)
{
async_send_proc(ckp, ckp->generator, "reconnect");
send_proc(ckp->generator, "reconnect");
}
/* For receiving messages from an upstream pool to pass downstream. Responsible
@ -1838,7 +1837,7 @@ static void *passthrough_recv(void *arg)
/* Simply forward the message on, as is, to the connector to
* process. Possibly parse parameters sent by upstream pool
* here */
async_send_proc(ckp, ckp->connector, cs->buf);
send_proc(ckp->connector, cs->buf);
}
return NULL;
}
@ -2046,10 +2045,10 @@ static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata)
if (ret)
break;
async_send_proc(ckp, ckp->connector, "reject");
send_proc(ckp->connector, "reject");
sleep(1);
}
async_send_proc(ckp, ckp->connector, ret ? "accept" : "reject");
send_proc(ckp->connector, ret ? "accept" : "reject");
return ret;
}
@ -2078,7 +2077,7 @@ reconnect:
proxi->low_id, cs->url, cs->port);
dealloc(buf);
ASPRINTF(&buf, "proxy=%ld", proxi->id);
async_send_proc(ckp, ckp->stratifier, buf);
send_proc(ckp->stratifier, buf);
}
}
retry:
@ -2267,7 +2266,6 @@ int generator(proc_instance_t *pi)
gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata;
gdata->ckp = ckp;
ckp->ckwqs = gdata->ckwqs = create_ckwqs(ckp, "gen", 1);
if (ckp->proxy) {
char *buf = NULL;

25
src/stratifier.c

@ -828,7 +828,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio)
return buf;
}
static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
static void send_generator(const ckpool_t *ckp, const char *msg, const int prio)
{
sdata_t *sdata = ckp->data;
bool set;
@ -838,7 +838,7 @@ static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
set = true;
} else
set = false;
async_send_proc(ckp, ckp->generator, msg);
send_proc(ckp->generator, msg);
if (set)
sdata->gen_priority = 0;
}
@ -963,7 +963,7 @@ static void connector_drop_client(ckpool_t *ckp, const int64_t id)
LOGDEBUG("Stratifier requesting connector drop client %"PRId64, id);
snprintf(buf, 255, "dropclient=%"PRId64, id);
async_send_proc(ckp, ckp->connector, buf);
send_proc(ckp->connector, buf);
}
static void drop_allclients(ckpool_t *ckp)
@ -1009,8 +1009,8 @@ static sdata_t *duplicate_sdata(const sdata_t *sdata)
memcpy(dsdata->donkeytxnbin, sdata->donkeytxnbin, 25);
/* Use the same work queues for all subproxies */
dsdata->ckwqs = sdata->ckwqs;
dsdata->ssends = sdata->ssends;
dsdata->ckwqs = sdata->ckwqs;
dsdata->ckdbq = sdata->ckdbq;
dsdata->sauthq = sdata->sauthq;
@ -1136,7 +1136,7 @@ out_unlock:
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client);
static void generator_recruit(ckpool_t *ckp, const int recruits)
static void generator_recruit(const ckpool_t *ckp, const int recruits)
{
char buf[256];
@ -1774,7 +1774,7 @@ static void connector_test_client(ckpool_t *ckp, const int64_t id)
LOGDEBUG("Stratifier requesting connector test client %"PRId64, id);
snprintf(buf, 255, "testclient=%"PRId64, id);
async_send_proc(ckp, ckp->connector, buf);
send_proc(ckp->connector, buf);
}
/* For creating a list of sends without locking that can then be concatenated
@ -2531,7 +2531,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien
* in proxy mode where we find a subproxy based on the current proxy with room
* for more clients. Signal the generator to recruit more subproxies if we are
* running out of room. */
static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata)
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata)
{
proxy_t *current, *proxy, *subproxy, *best = NULL, *tmp, *tmpsub;
int best_subid = 0, best_lowid;
@ -3938,7 +3938,6 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp);
static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, const char *address)
{
ckpool_t *ckp = client->ckp;
const char *method;
/* Random broken clients send something not an integer as the id so we
@ -3985,14 +3984,14 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* to it since it's unauthorised. Set the flag just in case. */
client->authorised = false;
snprintf(buf, 255, "passthrough=%"PRId64, client_id);
async_send_proc(ckp, ckp->connector, buf);
send_proc(client->ckp->connector, buf);
return;
}
/* We should only accept subscribed requests from here on */
if (!client->subscribed) {
LOGINFO("Dropping unsubscribed client %"PRId64, client_id);
connector_drop_client(ckp, client_id);
connector_drop_client(client->ckp, client_id);
return;
}
@ -4014,7 +4013,7 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
* stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %"PRId64, client_id);
connector_drop_client(ckp, client_id);
connector_drop_client(client->ckp, client_id);
return;
}
@ -4188,7 +4187,7 @@ static void ssend_process(ckpool_t *ckp, smsg_t *msg)
* connector process to be delivered */
json_object_set_new_nocheck(msg->json_msg, "client_id", json_integer(msg->client_id));
s = json_dumps(msg->json_msg, 0);
async_send_proc(ckp, ckp->connector, s);
send_proc(ckp->connector, s);
free(s);
free_smsg(msg);
}
@ -5051,7 +5050,7 @@ int stratifier(proc_instance_t *pi)
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process);
/* Create as many generic workqueue threads as there are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN);
ckp->ckwqs = sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
sdata->ckwqs = create_ckwqs(ckp, "strat", threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);

Loading…
Cancel
Save