|
|
|
@ -112,7 +112,9 @@ static bool socketer_using_data;
|
|
|
|
|
static bool summariser_using_data; |
|
|
|
|
static bool marker_using_data; |
|
|
|
|
static bool logger_using_data; |
|
|
|
|
static bool listener_using_data; |
|
|
|
|
static bool plistener_using_data; |
|
|
|
|
static bool clistener_using_data; |
|
|
|
|
static bool blistener_using_data; |
|
|
|
|
|
|
|
|
|
char *EMPTY = ""; |
|
|
|
|
const char *nullstr = "(null)"; |
|
|
|
@ -303,7 +305,9 @@ K_STORE *msgline_store;
|
|
|
|
|
|
|
|
|
|
// WORKQUEUE
|
|
|
|
|
K_LIST *workqueue_free; |
|
|
|
|
K_STORE *workqueue_store; |
|
|
|
|
K_STORE *pool_workqueue_store; |
|
|
|
|
K_STORE *cmd_workqueue_store; |
|
|
|
|
K_STORE *btc_workqueue_store; |
|
|
|
|
mutex_t wq_waitlock; |
|
|
|
|
pthread_cond_t wq_waitcond; |
|
|
|
|
|
|
|
|
@ -1011,7 +1015,9 @@ static void alloc_storage()
|
|
|
|
|
|
|
|
|
|
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), |
|
|
|
|
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); |
|
|
|
|
workqueue_store = k_new_store(workqueue_free); |
|
|
|
|
pool_workqueue_store = k_new_store(workqueue_free); |
|
|
|
|
cmd_workqueue_store = k_new_store(workqueue_free); |
|
|
|
|
btc_workqueue_store = k_new_store(workqueue_free); |
|
|
|
|
|
|
|
|
|
heartbeatqueue_free = k_new_list("HeartBeatQueue", |
|
|
|
|
sizeof(HEARTBEATQUEUE), |
|
|
|
@ -1507,7 +1513,10 @@ static void dealloc_storage()
|
|
|
|
|
|
|
|
|
|
FREE_LIST(transfer); |
|
|
|
|
FREE_LISTS(heartbeatqueue); |
|
|
|
|
FREE_LISTS(workqueue); |
|
|
|
|
FREE_STORE(pool_workqueue); |
|
|
|
|
FREE_STORE(cmd_workqueue); |
|
|
|
|
FREE_STORE(btc_workqueue); |
|
|
|
|
FREE_LIST(workqueue); |
|
|
|
|
FREE_LISTS(msgline); |
|
|
|
|
|
|
|
|
|
if (free_mode != FREE_MODE_ALL) |
|
|
|
@ -2877,7 +2886,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
|
|
|
|
|
if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) { |
|
|
|
|
if (transfer->mvalue != transfer->svalue) |
|
|
|
|
FREENULL(transfer->mvalue); |
|
|
|
|
K_WLOCK(transfer_free); |
|
|
|
|
k_add_head(transfer_free, t_item); |
|
|
|
|
K_WUNLOCK(transfer_free); |
|
|
|
|
} else { |
|
|
|
|
add_to_ktree_nolock(msgline->trf_root, t_item); |
|
|
|
|
k_add_head_nolock(msgline->trf_store, t_item); |
|
|
|
@ -3922,33 +3933,138 @@ static void *logger(__maybe_unused void *arg)
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#define STORELASTREPLY(_cmd) do { \ |
|
|
|
|
if (last_ ## _cmd) \
|
|
|
|
|
free(last_ ## _cmd); \
|
|
|
|
|
last_ ## _cmd = buf; \
|
|
|
|
|
buf = NULL; \
|
|
|
|
|
if (reply_ ## _cmd) \
|
|
|
|
|
free(reply_ ## _cmd); \
|
|
|
|
|
reply_ ## _cmd = rep; \
|
|
|
|
|
} while (0) |
|
|
|
|
static void process_sockd(PGconn *conn, K_ITEM *wq_item) |
|
|
|
|
{ |
|
|
|
|
WORKQUEUE *workqueue; |
|
|
|
|
MSGLINE *msgline; |
|
|
|
|
K_ITEM *ml_item; |
|
|
|
|
char *ans, *rep; |
|
|
|
|
size_t siz; |
|
|
|
|
|
|
|
|
|
DATA_WORKQUEUE(workqueue, wq_item); |
|
|
|
|
ml_item = workqueue->msgline_item; |
|
|
|
|
DATA_MSGLINE(msgline, ml_item); |
|
|
|
|
|
|
|
|
|
ans = ckdb_cmds[msgline->which_cmds].func(conn, |
|
|
|
|
msgline->cmd, |
|
|
|
|
msgline->id, |
|
|
|
|
&(msgline->now), |
|
|
|
|
workqueue->by, |
|
|
|
|
workqueue->code, |
|
|
|
|
workqueue->inet, |
|
|
|
|
&(msgline->cd), |
|
|
|
|
msgline->trf_root); |
|
|
|
|
siz = strlen(ans) + strlen(msgline->id) + 32; |
|
|
|
|
rep = malloc(siz); |
|
|
|
|
snprintf(rep, siz, "%s.%ld.%s", |
|
|
|
|
msgline->id, |
|
|
|
|
msgline->now.tv_sec, ans); |
|
|
|
|
send_unix_msg(msgline->sockd, rep); |
|
|
|
|
close(msgline->sockd); |
|
|
|
|
FREENULL(ans); |
|
|
|
|
FREENULL(rep); |
|
|
|
|
|
|
|
|
|
free_msgline_data(ml_item, true, true); |
|
|
|
|
K_WLOCK(msgline_free); |
|
|
|
|
k_add_head(msgline_free, ml_item); |
|
|
|
|
K_WUNLOCK(msgline_free); |
|
|
|
|
|
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
k_add_head(workqueue_free, wq_item); |
|
|
|
|
if (workqueue_free->count == workqueue_free->total && |
|
|
|
|
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE) |
|
|
|
|
k_cull_list(workqueue_free); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void *clistener(__maybe_unused void *arg) |
|
|
|
|
{ |
|
|
|
|
PGconn *conn = NULL; |
|
|
|
|
K_ITEM *wq_item; |
|
|
|
|
time_t now; |
|
|
|
|
|
|
|
|
|
LOCK_INIT("db_clistener"); |
|
|
|
|
rename_proc("db_clistener"); |
|
|
|
|
|
|
|
|
|
clistener_using_data = true; |
|
|
|
|
|
|
|
|
|
conn = dbconnect(); |
|
|
|
|
now = time(NULL); |
|
|
|
|
|
|
|
|
|
while (!everyone_die) { |
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
wq_item = k_unlink_head(cmd_workqueue_store); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
|
|
|
|
|
// Don't keep a connection for more than ~10s
|
|
|
|
|
if ((time(NULL) - now) > 10) { |
|
|
|
|
PQfinish(conn); |
|
|
|
|
conn = dbconnect(); |
|
|
|
|
now = time(NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (wq_item) { |
|
|
|
|
process_sockd(conn, wq_item); |
|
|
|
|
tick(); |
|
|
|
|
} else |
|
|
|
|
cksleep_ms(42); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
clistener_using_data = false; |
|
|
|
|
|
|
|
|
|
if (conn) |
|
|
|
|
PQfinish(conn); |
|
|
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void *blistener(__maybe_unused void *arg) |
|
|
|
|
{ |
|
|
|
|
PGconn *conn = NULL; |
|
|
|
|
K_ITEM *wq_item; |
|
|
|
|
time_t now; |
|
|
|
|
|
|
|
|
|
LOCK_INIT("db_blistener"); |
|
|
|
|
rename_proc("db_blistener"); |
|
|
|
|
|
|
|
|
|
blistener_using_data = true; |
|
|
|
|
|
|
|
|
|
conn = dbconnect(); |
|
|
|
|
now = time(NULL); |
|
|
|
|
|
|
|
|
|
while (!everyone_die) { |
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
wq_item = k_unlink_head(btc_workqueue_store); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
|
|
|
|
|
// Don't keep a connection for more than ~10s
|
|
|
|
|
if ((time(NULL) - now) > 10) { |
|
|
|
|
PQfinish(conn); |
|
|
|
|
conn = dbconnect(); |
|
|
|
|
now = time(NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (wq_item) { |
|
|
|
|
process_sockd(conn, wq_item); |
|
|
|
|
tick(); |
|
|
|
|
} else
|
|
|
|
|
cksleep_ms(142); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
blistener_using_data = false; |
|
|
|
|
|
|
|
|
|
if (conn) |
|
|
|
|
PQfinish(conn); |
|
|
|
|
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void *socketer(__maybe_unused void *arg) |
|
|
|
|
{ |
|
|
|
|
proc_instance_t *pi = (proc_instance_t *)arg; |
|
|
|
|
pthread_t clis_pt, blis_pt; |
|
|
|
|
unixsock_t *us = &pi->us; |
|
|
|
|
char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; |
|
|
|
|
// No dup check for pool stats, the SEQ code will handle that
|
|
|
|
|
char *last_chkpass = NULL, *reply_chkpass = NULL; |
|
|
|
|
char *last_adduser = NULL, *reply_adduser = NULL; |
|
|
|
|
char *last_newpass = NULL, *reply_newpass = NULL; |
|
|
|
|
char *last_userset = NULL, *reply_userset = NULL; |
|
|
|
|
char *last_workerset = NULL, *reply_workerset = NULL; |
|
|
|
|
char *last_newid = NULL, *reply_newid = NULL; |
|
|
|
|
char *last_setatts = NULL, *reply_setatts = NULL; |
|
|
|
|
char *last_setopts = NULL, *reply_setopts = NULL; |
|
|
|
|
char *last_userstatus = NULL, *reply_userstatus = NULL; |
|
|
|
|
char *last_web = NULL, *reply_web = NULL; |
|
|
|
|
char *reply_last, duptype[CMD_SIZ+1]; |
|
|
|
|
char *end, *ans = NULL, *rep = NULL, *buf = NULL; |
|
|
|
|
enum cmd_values cmdnum; |
|
|
|
|
int sockd; |
|
|
|
|
K_ITEM *wq_item = NULL, *ml_item = NULL; |
|
|
|
@ -3957,7 +4073,7 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
char reply[1024+1]; |
|
|
|
|
size_t siz; |
|
|
|
|
tv_t now; |
|
|
|
|
bool dup, want_first, show_dup, replied; |
|
|
|
|
bool want_first, replied, btc; |
|
|
|
|
int loglevel, oldloglevel; |
|
|
|
|
|
|
|
|
|
pthread_detach(pthread_self()); |
|
|
|
@ -3971,6 +4087,10 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
if (!everyone_die) { |
|
|
|
|
LOGWARNING("%s() Start processing...", __func__); |
|
|
|
|
socketer_using_data = true; |
|
|
|
|
|
|
|
|
|
create_pthread(&clis_pt, clistener, NULL); |
|
|
|
|
|
|
|
|
|
create_pthread(&blis_pt, blistener, NULL); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
want_first = true; |
|
|
|
@ -3979,7 +4099,7 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
dealloc(buf); |
|
|
|
|
sockd = accept(us->sockd, NULL, NULL); |
|
|
|
|
if (sockd < 0) { |
|
|
|
|
LOGERR("Failed to accept on socket in listener"); |
|
|
|
|
LOGERR("%s() Failed to accept on socket", __func__); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3997,83 +4117,16 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
if (!buf || !*buf) { |
|
|
|
|
// An empty message wont get a reply
|
|
|
|
|
if (!buf) |
|
|
|
|
LOGWARNING("Failed to get message in listener"); |
|
|
|
|
LOGWARNING("%s() Failed to get message", __func__); |
|
|
|
|
else |
|
|
|
|
LOGWARNING("Empty message in listener"); |
|
|
|
|
} else { |
|
|
|
|
/* For duplicates:
|
|
|
|
|
* Queued pool messages are handled by the queue code |
|
|
|
|
* but since they reply ok.queued that message can |
|
|
|
|
* be returned every time here |
|
|
|
|
* System: repeat process them |
|
|
|
|
* Web: current php web sends a timestamp of seconds |
|
|
|
|
* so duplicate code will only trigger if the same |
|
|
|
|
* message is sent within the same second and thus |
|
|
|
|
* will effectively reduce the processing load for |
|
|
|
|
* sequential duplicates |
|
|
|
|
* As per the 'if' list below, |
|
|
|
|
* remember individual last messages and replies and |
|
|
|
|
* repeat the reply without reprocessing the message |
|
|
|
|
* The rest are remembered in the same buffer 'web' |
|
|
|
|
* so a duplicate will not be seen if another 'web' |
|
|
|
|
* command arrived between two duplicate commands |
|
|
|
|
*/ |
|
|
|
|
dup = false; |
|
|
|
|
show_dup = true; |
|
|
|
|
// These are ordered approximately most likely first
|
|
|
|
|
if (last_chkpass && strcmp(last_chkpass, buf) == 0) { |
|
|
|
|
reply_last = reply_chkpass; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_adduser && strcmp(last_adduser, buf) == 0) { |
|
|
|
|
reply_last = reply_adduser; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_newpass && strcmp(last_newpass, buf) == 0) { |
|
|
|
|
reply_last = reply_newpass; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_newid && strcmp(last_newid, buf) == 0) { |
|
|
|
|
reply_last = reply_newid; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_userset && strcmp(last_userset, buf) == 0) { |
|
|
|
|
reply_last = reply_userset; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_workerset && strcmp(last_workerset, buf) == 0) { |
|
|
|
|
reply_last = reply_workerset; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_setatts && strcmp(last_setatts, buf) == 0) { |
|
|
|
|
reply_last = reply_setatts; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_setopts && strcmp(last_setopts, buf) == 0) { |
|
|
|
|
reply_last = reply_setopts; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_userstatus && strcmp(last_userstatus, buf) == 0) { |
|
|
|
|
reply_last = reply_userstatus; |
|
|
|
|
dup = true; |
|
|
|
|
} else if (last_web && strcmp(last_web, buf) == 0) { |
|
|
|
|
reply_last = reply_web; |
|
|
|
|
dup = true; |
|
|
|
|
show_dup = false; |
|
|
|
|
} |
|
|
|
|
if (dup) { |
|
|
|
|
send_unix_msg(sockd, reply_last); |
|
|
|
|
STRNCPY(duptype, buf); |
|
|
|
|
dot = strchr(duptype, '.'); |
|
|
|
|
if (dot) |
|
|
|
|
*dot = '\0'; |
|
|
|
|
snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", |
|
|
|
|
LOGDUP, now.tv_sec, now.tv_usec, duptype); |
|
|
|
|
// dup cant be pool
|
|
|
|
|
LOGQUE(reply, false); |
|
|
|
|
if (show_dup) |
|
|
|
|
LOGWARNING("Duplicate '%s' message received", duptype); |
|
|
|
|
else |
|
|
|
|
LOGDEBUG("Duplicate '%s' message received", duptype); |
|
|
|
|
LOGWARNING("%s() Empty message", __func__); |
|
|
|
|
} else { |
|
|
|
|
int seqentryflags = SE_SOCKET; |
|
|
|
|
if (!reload_queue_complete) |
|
|
|
|
seqentryflags = SE_EARLYSOCK; |
|
|
|
|
cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); |
|
|
|
|
DATA_MSGLINE(msgline, ml_item); |
|
|
|
|
replied = false; |
|
|
|
|
replied = btc = false; |
|
|
|
|
switch (cmdnum) { |
|
|
|
|
case CMD_REPLY: |
|
|
|
|
snprintf(reply, sizeof(reply), |
|
|
|
@ -4168,11 +4221,13 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
if (global_ckp && global_ckp->logfd) |
|
|
|
|
fflush(global_ckp->logfp); |
|
|
|
|
break; |
|
|
|
|
case CMD_USERSET: |
|
|
|
|
case CMD_BTCSET: |
|
|
|
|
btc = true; |
|
|
|
|
case CMD_CHKPASS: |
|
|
|
|
case CMD_2FA: |
|
|
|
|
case CMD_ADDUSER: |
|
|
|
|
case CMD_NEWPASS: |
|
|
|
|
case CMD_USERSET: |
|
|
|
|
case CMD_WORKERSET: |
|
|
|
|
case CMD_GETATTS: |
|
|
|
|
case CMD_SETATTS: |
|
|
|
@ -4185,57 +4240,22 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
case CMD_USERSTATUS: |
|
|
|
|
case CMD_SHSTA: |
|
|
|
|
case CMD_USERINFO: |
|
|
|
|
case CMD_BTCSET: |
|
|
|
|
case CMD_LOCKS: |
|
|
|
|
ans = ckdb_cmds[msgline->which_cmds].func(NULL, |
|
|
|
|
msgline->cmd, |
|
|
|
|
msgline->id, |
|
|
|
|
&(msgline->now), |
|
|
|
|
by_default, |
|
|
|
|
(char *)__func__, |
|
|
|
|
inet_default, |
|
|
|
|
&(msgline->cd), |
|
|
|
|
msgline->trf_root); |
|
|
|
|
siz = strlen(ans) + strlen(msgline->id) + 32; |
|
|
|
|
rep = malloc(siz); |
|
|
|
|
snprintf(rep, siz, "%s.%ld.%s", |
|
|
|
|
msgline->id, |
|
|
|
|
now.tv_sec, ans); |
|
|
|
|
send_unix_msg(sockd, rep); |
|
|
|
|
FREENULL(ans); |
|
|
|
|
switch (cmdnum) { |
|
|
|
|
case CMD_CHKPASS: |
|
|
|
|
STORELASTREPLY(chkpass); |
|
|
|
|
break; |
|
|
|
|
case CMD_ADDUSER: |
|
|
|
|
STORELASTREPLY(adduser); |
|
|
|
|
break; |
|
|
|
|
case CMD_NEWPASS: |
|
|
|
|
STORELASTREPLY(newpass); |
|
|
|
|
break; |
|
|
|
|
case CMD_USERSET: |
|
|
|
|
STORELASTREPLY(userset); |
|
|
|
|
break; |
|
|
|
|
case CMD_WORKERSET: |
|
|
|
|
STORELASTREPLY(workerset); |
|
|
|
|
break; |
|
|
|
|
case CMD_NEWID: |
|
|
|
|
STORELASTREPLY(newid); |
|
|
|
|
break; |
|
|
|
|
case CMD_SETATTS: |
|
|
|
|
STORELASTREPLY(setatts); |
|
|
|
|
break; |
|
|
|
|
case CMD_SETOPTS: |
|
|
|
|
STORELASTREPLY(setopts); |
|
|
|
|
break; |
|
|
|
|
case CMD_USERSTATUS: |
|
|
|
|
STORELASTREPLY(userstatus); |
|
|
|
|
break; |
|
|
|
|
// The rest
|
|
|
|
|
default: |
|
|
|
|
free(rep); |
|
|
|
|
} |
|
|
|
|
rep = NULL; |
|
|
|
|
msgline->sockd = sockd; |
|
|
|
|
sockd = -1; |
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
wq_item = k_unlink_head(workqueue_free); |
|
|
|
|
DATA_WORKQUEUE(workqueue, wq_item); |
|
|
|
|
workqueue->msgline_item = ml_item; |
|
|
|
|
workqueue->by = by_default; |
|
|
|
|
workqueue->code = (char *)__func__; |
|
|
|
|
workqueue->inet = inet_default; |
|
|
|
|
if (btc) |
|
|
|
|
k_add_tail(btc_workqueue_store, wq_item); |
|
|
|
|
else |
|
|
|
|
k_add_tail(cmd_workqueue_store, wq_item); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
wq_item = ml_item = NULL; |
|
|
|
|
break; |
|
|
|
|
// Process, but reject (loading) until startup_complete
|
|
|
|
|
case CMD_HOMEPAGE: |
|
|
|
@ -4250,47 +4270,6 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
case CMD_PSHIFT: |
|
|
|
|
case CMD_DSP: |
|
|
|
|
case CMD_BLOCKSTATUS: |
|
|
|
|
if (!startup_complete) { |
|
|
|
|
snprintf(reply, sizeof(reply), |
|
|
|
|
"%s.%ld.loading.%s", |
|
|
|
|
msgline->id, |
|
|
|
|
now.tv_sec, |
|
|
|
|
msgline->cmd); |
|
|
|
|
send_unix_msg(sockd, reply); |
|
|
|
|
} else { |
|
|
|
|
DATA_MSGLINE(msgline, ml_item); |
|
|
|
|
ans = ckdb_cmds[msgline->which_cmds].func(NULL, |
|
|
|
|
msgline->cmd, |
|
|
|
|
msgline->id, |
|
|
|
|
&(msgline->now), |
|
|
|
|
by_default, |
|
|
|
|
(char *)__func__, |
|
|
|
|
inet_default, |
|
|
|
|
&(msgline->cd), |
|
|
|
|
msgline->trf_root); |
|
|
|
|
siz = strlen(ans) + strlen(msgline->id) + 32; |
|
|
|
|
rep = malloc(siz); |
|
|
|
|
snprintf(rep, siz, "%s.%ld.%s", |
|
|
|
|
msgline->id, |
|
|
|
|
now.tv_sec, ans); |
|
|
|
|
send_unix_msg(sockd, rep); |
|
|
|
|
FREENULL(ans); |
|
|
|
|
if (cmdnum == CMD_DSP) |
|
|
|
|
free(rep); |
|
|
|
|
else { |
|
|
|
|
if (last_web) |
|
|
|
|
free(last_web); |
|
|
|
|
last_web = buf; |
|
|
|
|
buf = NULL; |
|
|
|
|
if (reply_web) |
|
|
|
|
free(reply_web); |
|
|
|
|
reply_web = rep; |
|
|
|
|
} |
|
|
|
|
rep = NULL; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
/* Process, but reject (loading) until startup_complete
|
|
|
|
|
* and don't test for duplicates */ |
|
|
|
|
case CMD_MARKS: |
|
|
|
|
case CMD_QUERY: |
|
|
|
|
if (!startup_complete) { |
|
|
|
@ -4301,24 +4280,21 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
msgline->cmd); |
|
|
|
|
send_unix_msg(sockd, reply); |
|
|
|
|
} else { |
|
|
|
|
DATA_MSGLINE(msgline, ml_item); |
|
|
|
|
ans = ckdb_cmds[msgline->which_cmds].func(NULL, |
|
|
|
|
msgline->cmd, |
|
|
|
|
msgline->id, |
|
|
|
|
&(msgline->now), |
|
|
|
|
by_default, |
|
|
|
|
(char *)__func__, |
|
|
|
|
inet_default, |
|
|
|
|
&(msgline->cd), |
|
|
|
|
msgline->trf_root); |
|
|
|
|
siz = strlen(ans) + strlen(msgline->id) + 32; |
|
|
|
|
rep = malloc(siz); |
|
|
|
|
snprintf(rep, siz, "%s.%ld.%s", |
|
|
|
|
msgline->id, |
|
|
|
|
now.tv_sec, ans); |
|
|
|
|
send_unix_msg(sockd, rep); |
|
|
|
|
FREENULL(ans); |
|
|
|
|
FREENULL(rep); |
|
|
|
|
msgline->sockd = sockd; |
|
|
|
|
sockd = -1; |
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
wq_item = k_unlink_head(workqueue_free); |
|
|
|
|
DATA_WORKQUEUE(workqueue, wq_item); |
|
|
|
|
workqueue->msgline_item = ml_item; |
|
|
|
|
workqueue->by = by_default; |
|
|
|
|
workqueue->code = (char *)__func__; |
|
|
|
|
workqueue->inet = inet_default; |
|
|
|
|
if (btc) |
|
|
|
|
k_add_tail(btc_workqueue_store, wq_item); |
|
|
|
|
else |
|
|
|
|
k_add_tail(cmd_workqueue_store, wq_item); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
wq_item = ml_item = NULL; |
|
|
|
|
} |
|
|
|
|
break; |
|
|
|
|
// Always process immediately:
|
|
|
|
@ -4378,11 +4354,11 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
workqueue->by = by_default; |
|
|
|
|
workqueue->code = (char *)__func__; |
|
|
|
|
workqueue->inet = inet_default; |
|
|
|
|
k_add_tail(workqueue_store, wq_item); |
|
|
|
|
k_add_tail(pool_workqueue_store, wq_item); |
|
|
|
|
/* Stop the reload queue from growing too big
|
|
|
|
|
* Use a size that should be big enough */ |
|
|
|
|
if (reloading && workqueue_store->count > 250000) { |
|
|
|
|
K_ITEM *wq2_item = k_unlink_head(workqueue_store); |
|
|
|
|
if (reloading && pool_workqueue_store->count > 250000) { |
|
|
|
|
K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store); |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
WORKQUEUE *wq; |
|
|
|
|
DATA_WORKQUEUE(wq, wq2_item); |
|
|
|
@ -4395,7 +4371,7 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
k_add_head(workqueue_free, wq2_item); |
|
|
|
|
} |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
ml_item = NULL; |
|
|
|
|
wq_item = ml_item = NULL; |
|
|
|
|
mutex_lock(&wq_waitlock); |
|
|
|
|
pthread_cond_signal(&wq_waitcond); |
|
|
|
|
mutex_unlock(&wq_waitlock); |
|
|
|
@ -4413,7 +4389,7 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (sockd >= 0) |
|
|
|
|
close(sockd); |
|
|
|
|
|
|
|
|
|
if (ml_item) { |
|
|
|
@ -4431,7 +4407,6 @@ static void *socketer(__maybe_unused void *arg)
|
|
|
|
|
|
|
|
|
|
if (buf) |
|
|
|
|
dealloc(buf); |
|
|
|
|
// TODO: if anyone cares, free all the dup buffers :P
|
|
|
|
|
close_unix_socket(us->sockd, us->path); |
|
|
|
|
|
|
|
|
|
return NULL; |
|
|
|
@ -4910,7 +4885,8 @@ static void *listener(void *arg)
|
|
|
|
|
K_ITEM *ss_item; |
|
|
|
|
int i; |
|
|
|
|
|
|
|
|
|
LOCK_INIT("db_listener"); |
|
|
|
|
LOCK_INIT("db_plistener"); |
|
|
|
|
rename_proc("db_plistener"); |
|
|
|
|
|
|
|
|
|
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), |
|
|
|
|
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); |
|
|
|
@ -4928,9 +4904,7 @@ static void *listener(void *arg)
|
|
|
|
|
|
|
|
|
|
create_pthread(&mark_pt, marker, NULL); |
|
|
|
|
|
|
|
|
|
rename_proc("db_listener"); |
|
|
|
|
|
|
|
|
|
listener_using_data = true; |
|
|
|
|
plistener_using_data = true; |
|
|
|
|
|
|
|
|
|
if (!setup_data()) { |
|
|
|
|
if (!everyone_die) { |
|
|
|
@ -4942,7 +4916,7 @@ static void *listener(void *arg)
|
|
|
|
|
|
|
|
|
|
if (!everyone_die) { |
|
|
|
|
K_RLOCK(workqueue_free); |
|
|
|
|
wqcount = workqueue_store->count; |
|
|
|
|
wqcount = pool_workqueue_store->count; |
|
|
|
|
K_RUNLOCK(workqueue_free); |
|
|
|
|
|
|
|
|
|
LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); |
|
|
|
@ -4973,8 +4947,8 @@ static void *listener(void *arg)
|
|
|
|
|
// Process queued work
|
|
|
|
|
while (!everyone_die) { |
|
|
|
|
K_WLOCK(workqueue_free); |
|
|
|
|
wq_item = k_unlink_head(workqueue_store); |
|
|
|
|
left = workqueue_store->count; |
|
|
|
|
wq_item = k_unlink_head(pool_workqueue_store); |
|
|
|
|
left = pool_workqueue_store->count; |
|
|
|
|
K_WUNLOCK(workqueue_free); |
|
|
|
|
|
|
|
|
|
if (left == 0 && wq_stt.tv_sec != 0L) |
|
|
|
@ -5049,7 +5023,7 @@ static void *listener(void *arg)
|
|
|
|
|
|
|
|
|
|
sayonara: |
|
|
|
|
|
|
|
|
|
listener_using_data = false; |
|
|
|
|
plistener_using_data = false; |
|
|
|
|
|
|
|
|
|
if (conn) |
|
|
|
|
PQfinish(conn); |
|
|
|
@ -5938,7 +5912,8 @@ int main(int argc, char **argv)
|
|
|
|
|
|
|
|
|
|
trigger = start = time(NULL); |
|
|
|
|
while (socketer_using_data || summariser_using_data || |
|
|
|
|
logger_using_data || listener_using_data || |
|
|
|
|
logger_using_data || plistener_using_data || |
|
|
|
|
clistener_using_data || blistener_using_data || |
|
|
|
|
marker_using_data) { |
|
|
|
|
msg = NULL; |
|
|
|
|
curr = time(NULL); |
|
|
|
@ -5951,12 +5926,14 @@ int main(int argc, char **argv)
|
|
|
|
|
} |
|
|
|
|
if (msg) { |
|
|
|
|
trigger = curr; |
|
|
|
|
printf("%s %ds due to%s%s%s%s%s\n", |
|
|
|
|
printf("%s %ds due to%s%s%s%s%s%s%s\n", |
|
|
|
|
msg, (int)(curr - start), |
|
|
|
|
socketer_using_data ? " socketer" : EMPTY, |
|
|
|
|
summariser_using_data ? " summariser" : EMPTY, |
|
|
|
|
logger_using_data ? " logger" : EMPTY, |
|
|
|
|
listener_using_data ? " listener" : EMPTY, |
|
|
|
|
plistener_using_data ? " plistener" : EMPTY, |
|
|
|
|
clistener_using_data ? " clistener" : EMPTY, |
|
|
|
|
blistener_using_data ? " blistener" : EMPTY, |
|
|
|
|
marker_using_data ? " marker" : EMPTY); |
|
|
|
|
fflush(stdout); |
|
|
|
|
} |
|
|
|
|