kanoi 9 years ago
parent
commit
2137f733b6
  1. 25
      src/connector.c
  2. 19
      src/stratifier.c

25
src/connector.c

@ -1226,11 +1226,17 @@ out:
return ret; return ret;
} }
static void client_message_processor(ckpool_t *ckp, json_t *json_msg) static void client_message_processor(ckpool_t *ckp, char *buf)
{ {
json_t *json_msg = json_loads(buf, 0, NULL);
int64_t client_id; int64_t client_id;
char *msg; char *msg;
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
goto out;
}
/* Extract the client id from the json message and remove its entry */ /* Extract the client id from the json message and remove its entry */
client_id = json_integer_value(json_object_get(json_msg, "client_id")); client_id = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
@ -1242,18 +1248,8 @@ static void client_message_processor(ckpool_t *ckp, json_t *json_msg)
msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT);
send_client(ckp->data, client_id, msg); send_client(ckp->data, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
} out:
free(buf);
static void process_client_msg(cdata_t *cdata, const char *buf)
{
json_t *json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return;
}
ckmsgq_add(cdata->cmpq, json_msg);
} }
/* Send the passthrough the terminate node.method */ /* Send the passthrough the terminate node.method */
@ -1365,7 +1361,8 @@ retry:
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf); ckmsgq_add(cdata->cmpq, buf);
umsg->buf = NULL;
} else if (cmdmatch(buf, "upstream=")) { } else if (cmdmatch(buf, "upstream=")) {
char *msg = strdup(buf + 9); char *msg = strdup(buf + 9);

19
src/stratifier.c

@ -5775,7 +5775,7 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif
/* Then find the matching worker user */ /* Then find the matching worker user */
worker = get_worker(sdata, user, workername); worker = get_worker(sdata, user, workername);
if (mindiff < 1) { if (mindiff < 0) {
LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff);
return; return;
} }
@ -6783,7 +6783,7 @@ static bool test_and_clear(bool *val, mutex_t *lock)
static void ckdbq_process(ckpool_t *ckp, char *msg) static void ckdbq_process(ckpool_t *ckp, char *msg)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
size_t responselen = 0; size_t responselen;
char *buf = NULL; char *buf = NULL;
while (!buf) { while (!buf) {
@ -6803,16 +6803,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
/* Process any requests from ckdb that are heartbeat responses with /* Process any requests from ckdb that are heartbeat responses with
* specific requests. */ * specific requests. */
if (likely(buf))
responselen = strlen(buf); responselen = strlen(buf);
if (likely(responselen > 0)) { if (likely(responselen > 1)) {
char *response = alloca(responselen); char *response = alloca(responselen);
int offset = 0; int offset = 0;
memset(response, 0, responselen); memset(response, 0, responselen);
if (sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0) { if (likely(sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0)) {
strcpy(response+1, buf+offset); strcpy(response + 1, buf + offset);
if (safecmp(response, "ok")) { if (likely(safecmp(response, "ok"))) {
char *cmd; char *cmd;
cmd = response; cmd = response;
@ -6826,8 +6825,8 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
LOGWARNING("Got ckdb failure response: %s", buf); LOGWARNING("Got ckdb failure response: %s", buf);
} else } else
LOGWARNING("Got bad ckdb response: %s", buf); LOGWARNING("Got bad ckdb response: %s", buf);
free(buf);
} }
free(buf);
} }
static int transactions_by_jobid(sdata_t *sdata, const int64_t id) static int transactions_by_jobid(sdata_t *sdata, const int64_t id)
@ -7597,11 +7596,11 @@ int stratifier(proc_instance_t *pi)
* are CPUs */ * are CPUs */
threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1;
sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads);
sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
read_poolstats(ckp); read_poolstats(ckp);

Loading…
Cancel
Save