Browse Source

ckdb - separate socket data processing

master
kanoi 9 years ago
parent
commit
de3be722cd
  1. 432
      src/ckdb.c
  2. 7
      src/ckdb.h
  3. 10
      src/ckdb_cmd.c

432
src/ckdb.c

@ -112,7 +112,9 @@ static bool socketer_using_data;
static bool summariser_using_data;
static bool marker_using_data;
static bool logger_using_data;
static bool listener_using_data;
static bool plistener_using_data;
static bool clistener_using_data;
static bool blistener_using_data;
char *EMPTY = "";
const char *nullstr = "(null)";
@ -303,7 +305,9 @@ K_STORE *msgline_store;
// WORKQUEUE
K_LIST *workqueue_free;
K_STORE *workqueue_store;
K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store;
mutex_t wq_waitlock;
pthread_cond_t wq_waitcond;
@ -1011,7 +1015,9 @@ static void alloc_storage()
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
workqueue_store = k_new_store(workqueue_free);
pool_workqueue_store = k_new_store(workqueue_free);
cmd_workqueue_store = k_new_store(workqueue_free);
btc_workqueue_store = k_new_store(workqueue_free);
heartbeatqueue_free = k_new_list("HeartBeatQueue",
sizeof(HEARTBEATQUEUE),
@ -1507,7 +1513,10 @@ static void dealloc_storage()
FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue);
FREE_LISTS(workqueue);
FREE_STORE(pool_workqueue);
FREE_STORE(cmd_workqueue);
FREE_STORE(btc_workqueue);
FREE_LIST(workqueue);
FREE_LISTS(msgline);
if (free_mode != FREE_MODE_ALL)
@ -3924,33 +3933,140 @@ static void *logger(__maybe_unused void *arg)
return NULL;
}
#define STORELASTREPLY(_cmd) do { \
if (last_ ## _cmd) \
free(last_ ## _cmd); \
last_ ## _cmd = buf; \
buf = NULL; \
if (reply_ ## _cmd) \
free(reply_ ## _cmd); \
reply_ ## _cmd = rep; \
} while (0)
static void process_sockd(PGconn *conn, K_ITEM *wq_item)
{
WORKQUEUE *workqueue;
MSGLINE *msgline;
K_ITEM *ml_item;
char *ans, *rep;
size_t siz;
DATA_WORKQUEUE(workqueue, wq_item);
ml_item = workqueue->msgline_item;
DATA_MSGLINE(msgline, ml_item);
ans = ckdb_cmds[msgline->which_cmds].func(conn,
msgline->cmd,
msgline->id,
&(msgline->now),
workqueue->by,
workqueue->code,
workqueue->inet,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
msgline->now.tv_sec, ans);
send_unix_msg(msgline->sockd, rep);
FREENULL(ans);
FREENULL(rep);
close(msgline->sockd);
free_msgline_data(ml_item, true, true);
K_WLOCK(msgline_free);
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq_item);
if (workqueue_free->count == workqueue_free->total &&
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE)
k_cull_list(workqueue_free);
K_WUNLOCK(workqueue_free);
}
static void *clistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
time_t now;
LOCK_INIT("db_clistener");
rename_proc("db_clistener");
clistener_using_data = true;
conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(cmd_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
}
if (wq_item) {
process_sockd(conn, wq_item);
tick();
}
cksleep_ms(42);
}
clistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
static void *blistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
time_t now;
LOCK_INIT("db_blistener");
rename_proc("db_blistener");
blistener_using_data = true;
conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(btc_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
}
if (wq_item) {
process_sockd(conn, wq_item);
tick();
}
cksleep_ms(142);
}
blistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
static void *socketer(__maybe_unused void *arg)
{
proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t clis_pt, blis_pt;
unixsock_t *us = &pi->us;
char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot;
// No dup check for pool stats, the SEQ code will handle that
char *last_chkpass = NULL, *reply_chkpass = NULL;
char *last_adduser = NULL, *reply_adduser = NULL;
char *last_newpass = NULL, *reply_newpass = NULL;
char *last_userset = NULL, *reply_userset = NULL;
char *last_workerset = NULL, *reply_workerset = NULL;
char *last_newid = NULL, *reply_newid = NULL;
char *last_setatts = NULL, *reply_setatts = NULL;
char *last_setopts = NULL, *reply_setopts = NULL;
char *last_userstatus = NULL, *reply_userstatus = NULL;
char *last_web = NULL, *reply_web = NULL;
char *reply_last, duptype[CMD_SIZ+1];
char *end, *ans = NULL, *rep = NULL, *buf = NULL;
enum cmd_values cmdnum;
int sockd;
K_ITEM *wq_item = NULL, *ml_item = NULL;
@ -3959,7 +4075,7 @@ static void *socketer(__maybe_unused void *arg)
char reply[1024+1];
size_t siz;
tv_t now;
bool dup, want_first, show_dup, replied;
bool want_first, replied, btc;
int loglevel, oldloglevel;
pthread_detach(pthread_self());
@ -3973,6 +4089,10 @@ static void *socketer(__maybe_unused void *arg)
if (!everyone_die) {
LOGWARNING("%s() Start processing...", __func__);
socketer_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
create_pthread(&blis_pt, blistener, NULL);
}
want_first = true;
@ -3981,7 +4101,7 @@ static void *socketer(__maybe_unused void *arg)
dealloc(buf);
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGERR("Failed to accept on socket in listener");
LOGERR("%s() Failed to accept on socket", __func__);
break;
}
@ -3999,83 +4119,16 @@ static void *socketer(__maybe_unused void *arg)
if (!buf || !*buf) {
// An empty message wont get a reply
if (!buf)
LOGWARNING("Failed to get message in listener");
else
LOGWARNING("Empty message in listener");
} else {
/* For duplicates:
* Queued pool messages are handled by the queue code
* but since they reply ok.queued that message can
* be returned every time here
* System: repeat process them
* Web: current php web sends a timestamp of seconds
* so duplicate code will only trigger if the same
* message is sent within the same second and thus
* will effectively reduce the processing load for
* sequential duplicates
* As per the 'if' list below,
* remember individual last messages and replies and
* repeat the reply without reprocessing the message
* The rest are remembered in the same buffer 'web'
* so a duplicate will not be seen if another 'web'
* command arrived between two duplicate commands
*/
dup = false;
show_dup = true;
// These are ordered approximately most likely first
if (last_chkpass && strcmp(last_chkpass, buf) == 0) {
reply_last = reply_chkpass;
dup = true;
} else if (last_adduser && strcmp(last_adduser, buf) == 0) {
reply_last = reply_adduser;
dup = true;
} else if (last_newpass && strcmp(last_newpass, buf) == 0) {
reply_last = reply_newpass;
dup = true;
} else if (last_newid && strcmp(last_newid, buf) == 0) {
reply_last = reply_newid;
dup = true;
} else if (last_userset && strcmp(last_userset, buf) == 0) {
reply_last = reply_userset;
dup = true;
} else if (last_workerset && strcmp(last_workerset, buf) == 0) {
reply_last = reply_workerset;
dup = true;
} else if (last_setatts && strcmp(last_setatts, buf) == 0) {
reply_last = reply_setatts;
dup = true;
} else if (last_setopts && strcmp(last_setopts, buf) == 0) {
reply_last = reply_setopts;
dup = true;
} else if (last_userstatus && strcmp(last_userstatus, buf) == 0) {
reply_last = reply_userstatus;
dup = true;
} else if (last_web && strcmp(last_web, buf) == 0) {
reply_last = reply_web;
dup = true;
show_dup = false;
}
if (dup) {
send_unix_msg(sockd, reply_last);
STRNCPY(duptype, buf);
dot = strchr(duptype, '.');
if (dot)
*dot = '\0';
snprintf(reply, sizeof(reply), "%s%ld,%ld.%s",
LOGDUP, now.tv_sec, now.tv_usec, duptype);
// dup cant be pool
LOGQUE(reply, false);
if (show_dup)
LOGWARNING("Duplicate '%s' message received", duptype);
LOGWARNING("%s() Failed to get message", __func__);
else
LOGDEBUG("Duplicate '%s' message received", duptype);
LOGWARNING("%s() Empty message", __func__);
} else {
int seqentryflags = SE_SOCKET;
if (!reload_queue_complete)
seqentryflags = SE_EARLYSOCK;
cmdnum = breakdown(&ml_item, buf, &now, seqentryflags);
DATA_MSGLINE(msgline, ml_item);
replied = false;
replied = btc = false;
switch (cmdnum) {
case CMD_REPLY:
snprintf(reply, sizeof(reply),
@ -4170,11 +4223,13 @@ static void *socketer(__maybe_unused void *arg)
if (global_ckp && global_ckp->logfd)
fflush(global_ckp->logfp);
break;
case CMD_USERSET:
case CMD_BTCSET:
btc = true;
case CMD_CHKPASS:
case CMD_2FA:
case CMD_ADDUSER:
case CMD_NEWPASS:
case CMD_USERSET:
case CMD_WORKERSET:
case CMD_GETATTS:
case CMD_SETATTS:
@ -4187,57 +4242,22 @@ static void *socketer(__maybe_unused void *arg)
case CMD_USERSTATUS:
case CMD_SHSTA:
case CMD_USERINFO:
case CMD_BTCSET:
case CMD_LOCKS:
ans = ckdb_cmds[msgline->which_cmds].func(NULL,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
now.tv_sec, ans);
send_unix_msg(sockd, rep);
FREENULL(ans);
switch (cmdnum) {
case CMD_CHKPASS:
STORELASTREPLY(chkpass);
break;
case CMD_ADDUSER:
STORELASTREPLY(adduser);
break;
case CMD_NEWPASS:
STORELASTREPLY(newpass);
break;
case CMD_USERSET:
STORELASTREPLY(userset);
break;
case CMD_WORKERSET:
STORELASTREPLY(workerset);
break;
case CMD_NEWID:
STORELASTREPLY(newid);
break;
case CMD_SETATTS:
STORELASTREPLY(setatts);
break;
case CMD_SETOPTS:
STORELASTREPLY(setopts);
break;
case CMD_USERSTATUS:
STORELASTREPLY(userstatus);
break;
// The rest
default:
free(rep);
}
rep = NULL;
msgline->sockd = sockd;
sockd = -1;
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_free);
DATA_WORKQUEUE(workqueue, wq_item);
workqueue->msgline_item = ml_item;
workqueue->by = by_default;
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
if (btc)
k_add_tail(btc_workqueue_store, wq_item);
else
k_add_tail(cmd_workqueue_store, wq_item);
K_WUNLOCK(workqueue_free);
wq_item = ml_item = NULL;
break;
// Process, but reject (loading) until startup_complete
case CMD_HOMEPAGE:
@ -4260,39 +4280,25 @@ static void *socketer(__maybe_unused void *arg)
msgline->cmd);
send_unix_msg(sockd, reply);
} else {
DATA_MSGLINE(msgline, ml_item);
ans = ckdb_cmds[msgline->which_cmds].func(NULL,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
now.tv_sec, ans);
send_unix_msg(sockd, rep);
FREENULL(ans);
if (cmdnum == CMD_DSP)
free(rep);
else {
if (last_web)
free(last_web);
last_web = buf;
buf = NULL;
if (reply_web)
free(reply_web);
reply_web = rep;
}
rep = NULL;
msgline->sockd = sockd;
sockd = -1;
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_free);
DATA_WORKQUEUE(workqueue, wq_item);
workqueue->msgline_item = ml_item;
workqueue->by = by_default;
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
if (btc)
k_add_tail(btc_workqueue_store, wq_item);
else
k_add_tail(cmd_workqueue_store, wq_item);
K_WUNLOCK(workqueue_free);
wq_item = ml_item = NULL;
}
break;
/* Process, but reject (loading) until startup_complete
* and don't test for duplicates */
/* Process, but reject (loading) until
* startup_complete */
case CMD_MARKS:
case CMD_QUERY:
if (!startup_complete) {
@ -4303,24 +4309,21 @@ static void *socketer(__maybe_unused void *arg)
msgline->cmd);
send_unix_msg(sockd, reply);
} else {
DATA_MSGLINE(msgline, ml_item);
ans = ckdb_cmds[msgline->which_cmds].func(NULL,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
now.tv_sec, ans);
send_unix_msg(sockd, rep);
FREENULL(ans);
FREENULL(rep);
msgline->sockd = sockd;
sockd = -1;
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_free);
DATA_WORKQUEUE(workqueue, wq_item);
workqueue->msgline_item = ml_item;
workqueue->by = by_default;
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
if (btc)
k_add_tail(btc_workqueue_store, wq_item);
else
k_add_tail(cmd_workqueue_store, wq_item);
K_WUNLOCK(workqueue_free);
wq_item = ml_item = NULL;
}
break;
// Always process immediately:
@ -4380,11 +4383,11 @@ static void *socketer(__maybe_unused void *arg)
workqueue->by = by_default;
workqueue->code = (char *)__func__;
workqueue->inet = inet_default;
k_add_tail(workqueue_store, wq_item);
k_add_tail(pool_workqueue_store, wq_item);
/* Stop the reload queue from growing too big
* Use a size that should be big enough */
if (reloading && workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(workqueue_store);
if (reloading && pool_workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store);
K_WUNLOCK(workqueue_free);
WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item);
@ -4415,7 +4418,7 @@ static void *socketer(__maybe_unused void *arg)
break;
}
}
}
if (sockd >= 0)
close(sockd);
if (ml_item) {
@ -4433,7 +4436,6 @@ static void *socketer(__maybe_unused void *arg)
if (buf)
dealloc(buf);
// TODO: if anyone cares, free all the dup buffers :P
close_unix_socket(us->sockd, us->path);
return NULL;
@ -4912,7 +4914,8 @@ static void *listener(void *arg)
K_ITEM *ss_item;
int i;
LOCK_INIT("db_listener");
LOCK_INIT("db_plistener");
rename_proc("db_plistener");
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
@ -4930,9 +4933,7 @@ static void *listener(void *arg)
create_pthread(&mark_pt, marker, NULL);
rename_proc("db_listener");
listener_using_data = true;
plistener_using_data = true;
if (!setup_data()) {
if (!everyone_die) {
@ -4944,7 +4945,7 @@ static void *listener(void *arg)
if (!everyone_die) {
K_RLOCK(workqueue_free);
wqcount = workqueue_store->count;
wqcount = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free);
LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
@ -4975,8 +4976,8 @@ static void *listener(void *arg)
// Process queued work
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_store);
left = workqueue_store->count;
wq_item = k_unlink_head(pool_workqueue_store);
left = pool_workqueue_store->count;
K_WUNLOCK(workqueue_free);
if (left == 0 && wq_stt.tv_sec != 0L)
@ -5051,7 +5052,7 @@ static void *listener(void *arg)
sayonara:
listener_using_data = false;
plistener_using_data = false;
if (conn)
PQfinish(conn);
@ -5940,7 +5941,8 @@ int main(int argc, char **argv)
trigger = start = time(NULL);
while (socketer_using_data || summariser_using_data ||
logger_using_data || listener_using_data ||
logger_using_data || plistener_using_data ||
clistener_using_data || blistener_using_data ||
marker_using_data) {
msg = NULL;
curr = time(NULL);
@ -5953,12 +5955,14 @@ int main(int argc, char **argv)
}
if (msg) {
trigger = curr;
printf("%s %ds due to%s%s%s%s%s\n",
printf("%s %ds due to%s%s%s%s%s%s%s\n",
msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY,
summariser_using_data ? " summariser" : EMPTY,
logger_using_data ? " logger" : EMPTY,
listener_using_data ? " listener" : EMPTY,
plistener_using_data ? " plistener" : EMPTY,
clistener_using_data ? " clistener" : EMPTY,
blistener_using_data ? " blistener" : EMPTY,
marker_using_data ? " marker" : EMPTY);
fflush(stdout);
}

7
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.4"
#define CKDB_VERSION DB_VERSION"-1.622"
#define CKDB_VERSION DB_VERSION"-1.700"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -765,6 +765,7 @@ typedef struct msgline {
char *code;
K_TREE *trf_root;
K_STORE *trf_store;
int sockd;
} MSGLINE;
#define ALLOC_MSGLINE 8192
@ -792,7 +793,9 @@ typedef struct workqueue {
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
extern K_LIST *workqueue_free;
extern K_STORE *workqueue_store;
extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store;
extern mutex_t wq_waitlock;
extern pthread_cond_t wq_waitcond;

10
src/ckdb_cmd.c

@ -3304,7 +3304,10 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
}
// Don't bother with locking - it's just an FYI web stat
snprintf(tmp, sizeof(tmp), "sync=%d%c", workqueue_store->count, FLDSEP);
int sync = pool_workqueue_store->count;
sync += cmd_workqueue_store->count;
sync += btc_workqueue_store->count;
snprintf(tmp, sizeof(tmp), "sync=%d%c", sync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
u_item = NULL;
@ -4558,7 +4561,6 @@ static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id,
b_item = find_after_in_ktree(blocks_root, &b_look, b_ctx);
K_RUNLOCK(blocks_free);
if (!b_item) {
K_RUNLOCK(blocks_free);
snprintf(reply, siz, "ERR.no block height >= %"PRId32, height);
return strdup(reply);
}
@ -5684,7 +5686,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(workerstatus, 1, 1);
USEINFO(userinfo, 1, 1);
USEINFO(msgline, 1, 0);
USEINFO(workqueue, 1, 0);
USEINFO(workqueue, 3, 0);
USEINFO(transfer, 0, 0);
USEINFO(heartbeatqueue, 1, 0);
USEINFO(logqueue, 1, 0);
@ -6603,7 +6605,7 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id,
* You must supply the btcserver to change anything
* The format for userpass is username:password
* If you don't supply the btcserver it will simply report the current server
* If supply btcserver but not the userpass it will use the current userpass
* If you supply btcserver but not the userpass it will use the current userpass
* The reply will ONLY contain the URL, not the user/pass */
static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by,

Loading…
Cancel
Save