diff --git a/src/connector.c b/src/connector.c index 4eec5d72..bb3fea45 100644 --- a/src/connector.c +++ b/src/connector.c @@ -1226,11 +1226,17 @@ out: return ret; } -static void client_message_processor(ckpool_t *ckp, json_t *json_msg) +static void client_message_processor(ckpool_t *ckp, char *buf) { + json_t *json_msg = json_loads(buf, 0, NULL); int64_t client_id; char *msg; + if (unlikely(!json_msg)) { + LOGWARNING("Invalid json message in process_client_msg: %s", buf); + goto out; + } + /* Extract the client id from the json message and remove its entry */ client_id = json_integer_value(json_object_get(json_msg, "client_id")); json_object_del(json_msg, "client_id"); @@ -1242,18 +1248,8 @@ static void client_message_processor(ckpool_t *ckp, json_t *json_msg) msg = json_dumps(json_msg, JSON_EOL | JSON_COMPACT); send_client(ckp->data, client_id, msg); json_decref(json_msg); -} - -static void process_client_msg(cdata_t *cdata, const char *buf) -{ - json_t *json_msg = json_loads(buf, 0, NULL); - - if (unlikely(!json_msg)) { - LOGWARNING("Invalid json message in process_client_msg: %s", buf); - return; - } - - ckmsgq_add(cdata->cmpq, json_msg); +out: + free(buf); } /* Send the passthrough the terminate node.method */ @@ -1365,7 +1361,8 @@ retry: /* The bulk of the messages will be json messages to send to clients * so look for them first. */ if (likely(buf[0] == '{')) { - process_client_msg(cdata, buf); + ckmsgq_add(cdata->cmpq, buf); + umsg->buf = NULL; } else if (cmdmatch(buf, "upstream=")) { char *msg = strdup(buf + 9); diff --git a/src/stratifier.c b/src/stratifier.c index 4a63b713..e03ccf7b 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -5775,7 +5775,7 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif /* Then find the matching worker user */ worker = get_worker(sdata, user, workername); - if (mindiff < 1) { + if (mindiff < 0) { LOGINFO("Worker %s requested invalid diff %d", worker->workername, mindiff); return; } @@ -6783,7 +6783,7 @@ static bool test_and_clear(bool *val, mutex_t *lock) static void ckdbq_process(ckpool_t *ckp, char *msg) { sdata_t *sdata = ckp->data; - size_t responselen = 0; + size_t responselen; char *buf = NULL; while (!buf) { @@ -6803,16 +6803,15 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) /* Process any requests from ckdb that are heartbeat responses with * specific requests. */ - if (likely(buf)) - responselen = strlen(buf); - if (likely(responselen > 0)) { + responselen = strlen(buf); + if (likely(responselen > 1)) { char *response = alloca(responselen); int offset = 0; memset(response, 0, responselen); - if (sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0) { - strcpy(response+1, buf+offset); - if (safecmp(response, "ok")) { + if (likely(sscanf(buf, "%*d.%*d.%c%n", response, &offset) > 0)) { + strcpy(response + 1, buf + offset); + if (likely(safecmp(response, "ok"))) { char *cmd; cmd = response; @@ -6826,8 +6825,8 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) LOGWARNING("Got ckdb failure response: %s", buf); } else LOGWARNING("Got bad ckdb response: %s", buf); - free(buf); } + free(buf); } static int transactions_by_jobid(sdata_t *sdata, const int64_t id) @@ -7597,11 +7596,11 @@ int stratifier(proc_instance_t *pi) * are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); - sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + sdata->ssends = create_ckmsgqs(ckp, "ssender", &ssend_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + sdata->ckdbq = create_ckmsgqs(ckp, "ckdbqueue", &ckdbq_process, threads); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); read_poolstats(ckp);