Browse Source

Merge branch 'master' of bitbucket.org:ckolivas/ckpool

master
Con Kolivas 9 years ago
parent
commit
ac296e63d8
  1. 4
      pool/page_reset.php
  2. 427
      src/ckdb.c
  3. 7
      src/ckdb.h
  4. 17
      src/ckdb_cmd.c
  5. 19
      src/ckdb_data.c
  6. 2
      src/ktree.c

4
pool/page_reset.php

@ -88,9 +88,9 @@ function dbreset()
if ($emailinfo['STATUS'] != 'ok') if ($emailinfo['STATUS'] != 'ok')
syserror(); syserror();
$ans = resetPass($user, $pass); $ans = resetPass($user, $pass, $twofa);
if ($ans['STATUS'] != 'ok') if ($ans['STATUS'] != 'ok')
syserror(); return resetfail();
unset($_SESSION['reset_user']); unset($_SESSION['reset_user']);
unset($_SESSION['reset_hash']); unset($_SESSION['reset_hash']);

427
src/ckdb.c

@ -112,7 +112,9 @@ static bool socketer_using_data;
static bool summariser_using_data; static bool summariser_using_data;
static bool marker_using_data; static bool marker_using_data;
static bool logger_using_data; static bool logger_using_data;
static bool listener_using_data; static bool plistener_using_data;
static bool clistener_using_data;
static bool blistener_using_data;
char *EMPTY = ""; char *EMPTY = "";
const char *nullstr = "(null)"; const char *nullstr = "(null)";
@ -303,7 +305,9 @@ K_STORE *msgline_store;
// WORKQUEUE // WORKQUEUE
K_LIST *workqueue_free; K_LIST *workqueue_free;
K_STORE *workqueue_store; K_STORE *pool_workqueue_store;
K_STORE *cmd_workqueue_store;
K_STORE *btc_workqueue_store;
mutex_t wq_waitlock; mutex_t wq_waitlock;
pthread_cond_t wq_waitcond; pthread_cond_t wq_waitcond;
@ -1011,7 +1015,9 @@ static void alloc_storage()
workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE),
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
workqueue_store = k_new_store(workqueue_free); pool_workqueue_store = k_new_store(workqueue_free);
cmd_workqueue_store = k_new_store(workqueue_free);
btc_workqueue_store = k_new_store(workqueue_free);
heartbeatqueue_free = k_new_list("HeartBeatQueue", heartbeatqueue_free = k_new_list("HeartBeatQueue",
sizeof(HEARTBEATQUEUE), sizeof(HEARTBEATQUEUE),
@ -1507,7 +1513,10 @@ static void dealloc_storage()
FREE_LIST(transfer); FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue); FREE_LISTS(heartbeatqueue);
FREE_LISTS(workqueue); FREE_STORE(pool_workqueue);
FREE_STORE(cmd_workqueue);
FREE_STORE(btc_workqueue);
FREE_LIST(workqueue);
FREE_LISTS(msgline); FREE_LISTS(msgline);
if (free_mode != FREE_MODE_ALL) if (free_mode != FREE_MODE_ALL)
@ -2877,7 +2886,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) { if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) {
if (transfer->mvalue != transfer->svalue) if (transfer->mvalue != transfer->svalue)
FREENULL(transfer->mvalue); FREENULL(transfer->mvalue);
K_WLOCK(transfer_free);
k_add_head(transfer_free, t_item); k_add_head(transfer_free, t_item);
K_WUNLOCK(transfer_free);
} else { } else {
add_to_ktree_nolock(msgline->trf_root, t_item); add_to_ktree_nolock(msgline->trf_root, t_item);
k_add_head_nolock(msgline->trf_store, t_item); k_add_head_nolock(msgline->trf_store, t_item);
@ -3922,33 +3933,138 @@ static void *logger(__maybe_unused void *arg)
return NULL; return NULL;
} }
#define STORELASTREPLY(_cmd) do { \ static void process_sockd(PGconn *conn, K_ITEM *wq_item)
if (last_ ## _cmd) \ {
free(last_ ## _cmd); \ WORKQUEUE *workqueue;
last_ ## _cmd = buf; \ MSGLINE *msgline;
buf = NULL; \ K_ITEM *ml_item;
if (reply_ ## _cmd) \ char *ans, *rep;
free(reply_ ## _cmd); \ size_t siz;
reply_ ## _cmd = rep; \
} while (0) DATA_WORKQUEUE(workqueue, wq_item);
ml_item = workqueue->msgline_item;
DATA_MSGLINE(msgline, ml_item);
ans = ckdb_cmds[msgline->which_cmds].func(conn,
msgline->cmd,
msgline->id,
&(msgline->now),
workqueue->by,
workqueue->code,
workqueue->inet,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
msgline->now.tv_sec, ans);
send_unix_msg(msgline->sockd, rep);
close(msgline->sockd);
FREENULL(ans);
FREENULL(rep);
free_msgline_data(ml_item, true, true);
K_WLOCK(msgline_free);
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);
K_WLOCK(workqueue_free);
k_add_head(workqueue_free, wq_item);
if (workqueue_free->count == workqueue_free->total &&
workqueue_free->total >= ALLOC_WORKQUEUE * CULL_WORKQUEUE)
k_cull_list(workqueue_free);
K_WUNLOCK(workqueue_free);
}
static void *clistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
time_t now;
LOCK_INIT("db_clistener");
rename_proc("db_clistener");
clistener_using_data = true;
conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(cmd_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
}
if (wq_item) {
process_sockd(conn, wq_item);
tick();
} else
cksleep_ms(42);
}
clistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
static void *blistener(__maybe_unused void *arg)
{
PGconn *conn = NULL;
K_ITEM *wq_item;
time_t now;
LOCK_INIT("db_blistener");
rename_proc("db_blistener");
blistener_using_data = true;
conn = dbconnect();
now = time(NULL);
while (!everyone_die) {
K_WLOCK(workqueue_free);
wq_item = k_unlink_head(btc_workqueue_store);
K_WUNLOCK(workqueue_free);
// Don't keep a connection for more than ~10s
if ((time(NULL) - now) > 10) {
PQfinish(conn);
conn = dbconnect();
now = time(NULL);
}
if (wq_item) {
process_sockd(conn, wq_item);
tick();
} else
cksleep_ms(142);
}
blistener_using_data = false;
if (conn)
PQfinish(conn);
return NULL;
}
static void *socketer(__maybe_unused void *arg) static void *socketer(__maybe_unused void *arg)
{ {
proc_instance_t *pi = (proc_instance_t *)arg; proc_instance_t *pi = (proc_instance_t *)arg;
pthread_t clis_pt, blis_pt;
unixsock_t *us = &pi->us; unixsock_t *us = &pi->us;
char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; char *end, *ans = NULL, *rep = NULL, *buf = NULL;
// No dup check for pool stats, the SEQ code will handle that
char *last_chkpass = NULL, *reply_chkpass = NULL;
char *last_adduser = NULL, *reply_adduser = NULL;
char *last_newpass = NULL, *reply_newpass = NULL;
char *last_userset = NULL, *reply_userset = NULL;
char *last_workerset = NULL, *reply_workerset = NULL;
char *last_newid = NULL, *reply_newid = NULL;
char *last_setatts = NULL, *reply_setatts = NULL;
char *last_setopts = NULL, *reply_setopts = NULL;
char *last_userstatus = NULL, *reply_userstatus = NULL;
char *last_web = NULL, *reply_web = NULL;
char *reply_last, duptype[CMD_SIZ+1];
enum cmd_values cmdnum; enum cmd_values cmdnum;
int sockd; int sockd;
K_ITEM *wq_item = NULL, *ml_item = NULL; K_ITEM *wq_item = NULL, *ml_item = NULL;
@ -3957,7 +4073,7 @@ static void *socketer(__maybe_unused void *arg)
char reply[1024+1]; char reply[1024+1];
size_t siz; size_t siz;
tv_t now; tv_t now;
bool dup, want_first, show_dup, replied; bool want_first, replied, btc;
int loglevel, oldloglevel; int loglevel, oldloglevel;
pthread_detach(pthread_self()); pthread_detach(pthread_self());
@ -3971,6 +4087,10 @@ static void *socketer(__maybe_unused void *arg)
if (!everyone_die) { if (!everyone_die) {
LOGWARNING("%s() Start processing...", __func__); LOGWARNING("%s() Start processing...", __func__);
socketer_using_data = true; socketer_using_data = true;
create_pthread(&clis_pt, clistener, NULL);
create_pthread(&blis_pt, blistener, NULL);
} }
want_first = true; want_first = true;
@ -3979,7 +4099,7 @@ static void *socketer(__maybe_unused void *arg)
dealloc(buf); dealloc(buf);
sockd = accept(us->sockd, NULL, NULL); sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) { if (sockd < 0) {
LOGERR("Failed to accept on socket in listener"); LOGERR("%s() Failed to accept on socket", __func__);
break; break;
} }
@ -3997,83 +4117,16 @@ static void *socketer(__maybe_unused void *arg)
if (!buf || !*buf) { if (!buf || !*buf) {
// An empty message wont get a reply // An empty message wont get a reply
if (!buf) if (!buf)
LOGWARNING("Failed to get message in listener"); LOGWARNING("%s() Failed to get message", __func__);
else
LOGWARNING("Empty message in listener");
} else {
/* For duplicates:
* Queued pool messages are handled by the queue code
* but since they reply ok.queued that message can
* be returned every time here
* System: repeat process them
* Web: current php web sends a timestamp of seconds
* so duplicate code will only trigger if the same
* message is sent within the same second and thus
* will effectively reduce the processing load for
* sequential duplicates
* As per the 'if' list below,
* remember individual last messages and replies and
* repeat the reply without reprocessing the message
* The rest are remembered in the same buffer 'web'
* so a duplicate will not be seen if another 'web'
* command arrived between two duplicate commands
*/
dup = false;
show_dup = true;
// These are ordered approximately most likely first
if (last_chkpass && strcmp(last_chkpass, buf) == 0) {
reply_last = reply_chkpass;
dup = true;
} else if (last_adduser && strcmp(last_adduser, buf) == 0) {
reply_last = reply_adduser;
dup = true;
} else if (last_newpass && strcmp(last_newpass, buf) == 0) {
reply_last = reply_newpass;
dup = true;
} else if (last_newid && strcmp(last_newid, buf) == 0) {
reply_last = reply_newid;
dup = true;
} else if (last_userset && strcmp(last_userset, buf) == 0) {
reply_last = reply_userset;
dup = true;
} else if (last_workerset && strcmp(last_workerset, buf) == 0) {
reply_last = reply_workerset;
dup = true;
} else if (last_setatts && strcmp(last_setatts, buf) == 0) {
reply_last = reply_setatts;
dup = true;
} else if (last_setopts && strcmp(last_setopts, buf) == 0) {
reply_last = reply_setopts;
dup = true;
} else if (last_userstatus && strcmp(last_userstatus, buf) == 0) {
reply_last = reply_userstatus;
dup = true;
} else if (last_web && strcmp(last_web, buf) == 0) {
reply_last = reply_web;
dup = true;
show_dup = false;
}
if (dup) {
send_unix_msg(sockd, reply_last);
STRNCPY(duptype, buf);
dot = strchr(duptype, '.');
if (dot)
*dot = '\0';
snprintf(reply, sizeof(reply), "%s%ld,%ld.%s",
LOGDUP, now.tv_sec, now.tv_usec, duptype);
// dup cant be pool
LOGQUE(reply, false);
if (show_dup)
LOGWARNING("Duplicate '%s' message received", duptype);
else else
LOGDEBUG("Duplicate '%s' message received", duptype); LOGWARNING("%s() Empty message", __func__);
} else { } else {
int seqentryflags = SE_SOCKET; int seqentryflags = SE_SOCKET;
if (!reload_queue_complete) if (!reload_queue_complete)
seqentryflags = SE_EARLYSOCK; seqentryflags = SE_EARLYSOCK;
cmdnum = breakdown(&ml_item, buf, &now, seqentryflags); cmdnum = breakdown(&ml_item, buf, &now, seqentryflags);
DATA_MSGLINE(msgline, ml_item); DATA_MSGLINE(msgline, ml_item);
replied = false; replied = btc = false;
switch (cmdnum) { switch (cmdnum) {
case CMD_REPLY: case CMD_REPLY:
snprintf(reply, sizeof(reply), snprintf(reply, sizeof(reply),
@ -4168,11 +4221,13 @@ static void *socketer(__maybe_unused void *arg)
if (global_ckp && global_ckp->logfd) if (global_ckp && global_ckp->logfd)
fflush(global_ckp->logfp); fflush(global_ckp->logfp);
break; break;
case CMD_USERSET:
case CMD_BTCSET:
btc = true;
case CMD_CHKPASS: case CMD_CHKPASS:
case CMD_2FA: case CMD_2FA:
case CMD_ADDUSER: case CMD_ADDUSER:
case CMD_NEWPASS: case CMD_NEWPASS:
case CMD_USERSET:
case CMD_WORKERSET: case CMD_WORKERSET:
case CMD_GETATTS: case CMD_GETATTS:
case CMD_SETATTS: case CMD_SETATTS:
@ -4185,57 +4240,22 @@ static void *socketer(__maybe_unused void *arg)
case CMD_USERSTATUS: case CMD_USERSTATUS:
case CMD_SHSTA: case CMD_SHSTA:
case CMD_USERINFO: case CMD_USERINFO:
case CMD_BTCSET:
case CMD_LOCKS: case CMD_LOCKS:
ans = ckdb_cmds[msgline->which_cmds].func(NULL, msgline->sockd = sockd;
msgline->cmd, sockd = -1;
msgline->id, K_WLOCK(workqueue_free);
&(msgline->now), wq_item = k_unlink_head(workqueue_free);
by_default, DATA_WORKQUEUE(workqueue, wq_item);
(char *)__func__, workqueue->msgline_item = ml_item;
inet_default, workqueue->by = by_default;
&(msgline->cd), workqueue->code = (char *)__func__;
msgline->trf_root); workqueue->inet = inet_default;
siz = strlen(ans) + strlen(msgline->id) + 32; if (btc)
rep = malloc(siz); k_add_tail(btc_workqueue_store, wq_item);
snprintf(rep, siz, "%s.%ld.%s", else
msgline->id, k_add_tail(cmd_workqueue_store, wq_item);
now.tv_sec, ans); K_WUNLOCK(workqueue_free);
send_unix_msg(sockd, rep); wq_item = ml_item = NULL;
FREENULL(ans);
switch (cmdnum) {
case CMD_CHKPASS:
STORELASTREPLY(chkpass);
break;
case CMD_ADDUSER:
STORELASTREPLY(adduser);
break;
case CMD_NEWPASS:
STORELASTREPLY(newpass);
break;
case CMD_USERSET:
STORELASTREPLY(userset);
break;
case CMD_WORKERSET:
STORELASTREPLY(workerset);
break;
case CMD_NEWID:
STORELASTREPLY(newid);
break;
case CMD_SETATTS:
STORELASTREPLY(setatts);
break;
case CMD_SETOPTS:
STORELASTREPLY(setopts);
break;
case CMD_USERSTATUS:
STORELASTREPLY(userstatus);
break;
// The rest
default:
free(rep);
}
rep = NULL;
break; break;
// Process, but reject (loading) until startup_complete // Process, but reject (loading) until startup_complete
case CMD_HOMEPAGE: case CMD_HOMEPAGE:
@ -4250,47 +4270,6 @@ static void *socketer(__maybe_unused void *arg)
case CMD_PSHIFT: case CMD_PSHIFT:
case CMD_DSP: case CMD_DSP:
case CMD_BLOCKSTATUS: case CMD_BLOCKSTATUS:
if (!startup_complete) {
snprintf(reply, sizeof(reply),
"%s.%ld.loading.%s",
msgline->id,
now.tv_sec,
msgline->cmd);
send_unix_msg(sockd, reply);
} else {
DATA_MSGLINE(msgline, ml_item);
ans = ckdb_cmds[msgline->which_cmds].func(NULL,
msgline->cmd,
msgline->id,
&(msgline->now),
by_default,
(char *)__func__,
inet_default,
&(msgline->cd),
msgline->trf_root);
siz = strlen(ans) + strlen(msgline->id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s",
msgline->id,
now.tv_sec, ans);
send_unix_msg(sockd, rep);
FREENULL(ans);
if (cmdnum == CMD_DSP)
free(rep);
else {
if (last_web)
free(last_web);
last_web = buf;
buf = NULL;
if (reply_web)
free(reply_web);
reply_web = rep;
}
rep = NULL;
}
break;
/* Process, but reject (loading) until startup_complete
* and don't test for duplicates */
case CMD_MARKS: case CMD_MARKS:
case CMD_QUERY: case CMD_QUERY:
if (!startup_complete) { if (!startup_complete) {
@ -4301,24 +4280,21 @@ static void *socketer(__maybe_unused void *arg)
msgline->cmd); msgline->cmd);
send_unix_msg(sockd, reply); send_unix_msg(sockd, reply);
} else { } else {
DATA_MSGLINE(msgline, ml_item); msgline->sockd = sockd;
ans = ckdb_cmds[msgline->which_cmds].func(NULL, sockd = -1;
msgline->cmd, K_WLOCK(workqueue_free);
msgline->id, wq_item = k_unlink_head(workqueue_free);
&(msgline->now), DATA_WORKQUEUE(workqueue, wq_item);
by_default, workqueue->msgline_item = ml_item;
(char *)__func__, workqueue->by = by_default;
inet_default, workqueue->code = (char *)__func__;
&(msgline->cd), workqueue->inet = inet_default;
msgline->trf_root); if (btc)
siz = strlen(ans) + strlen(msgline->id) + 32; k_add_tail(btc_workqueue_store, wq_item);
rep = malloc(siz); else
snprintf(rep, siz, "%s.%ld.%s", k_add_tail(cmd_workqueue_store, wq_item);
msgline->id, K_WUNLOCK(workqueue_free);
now.tv_sec, ans); wq_item = ml_item = NULL;
send_unix_msg(sockd, rep);
FREENULL(ans);
FREENULL(rep);
} }
break; break;
// Always process immediately: // Always process immediately:
@ -4378,11 +4354,11 @@ static void *socketer(__maybe_unused void *arg)
workqueue->by = by_default; workqueue->by = by_default;
workqueue->code = (char *)__func__; workqueue->code = (char *)__func__;
workqueue->inet = inet_default; workqueue->inet = inet_default;
k_add_tail(workqueue_store, wq_item); k_add_tail(pool_workqueue_store, wq_item);
/* Stop the reload queue from growing too big /* Stop the reload queue from growing too big
* Use a size that should be big enough */ * Use a size that should be big enough */
if (reloading && workqueue_store->count > 250000) { if (reloading && pool_workqueue_store->count > 250000) {
K_ITEM *wq2_item = k_unlink_head(workqueue_store); K_ITEM *wq2_item = k_unlink_head(pool_workqueue_store);
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
WORKQUEUE *wq; WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item); DATA_WORKQUEUE(wq, wq2_item);
@ -4395,7 +4371,7 @@ static void *socketer(__maybe_unused void *arg)
k_add_head(workqueue_free, wq2_item); k_add_head(workqueue_free, wq2_item);
} }
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
ml_item = NULL; wq_item = ml_item = NULL;
mutex_lock(&wq_waitlock); mutex_lock(&wq_waitlock);
pthread_cond_signal(&wq_waitcond); pthread_cond_signal(&wq_waitcond);
mutex_unlock(&wq_waitlock); mutex_unlock(&wq_waitlock);
@ -4413,7 +4389,7 @@ static void *socketer(__maybe_unused void *arg)
break; break;
} }
} }
} if (sockd >= 0)
close(sockd); close(sockd);
if (ml_item) { if (ml_item) {
@ -4431,7 +4407,6 @@ static void *socketer(__maybe_unused void *arg)
if (buf) if (buf)
dealloc(buf); dealloc(buf);
// TODO: if anyone cares, free all the dup buffers :P
close_unix_socket(us->sockd, us->path); close_unix_socket(us->sockd, us->path);
return NULL; return NULL;
@ -4910,7 +4885,8 @@ static void *listener(void *arg)
K_ITEM *ss_item; K_ITEM *ss_item;
int i; int i;
LOCK_INIT("db_listener"); LOCK_INIT("db_plistener");
rename_proc("db_plistener");
logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE), logqueue_free = k_new_list("LogQueue", sizeof(LOGQUEUE),
ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true); ALLOC_LOGQUEUE, LIMIT_LOGQUEUE, true);
@ -4928,9 +4904,7 @@ static void *listener(void *arg)
create_pthread(&mark_pt, marker, NULL); create_pthread(&mark_pt, marker, NULL);
rename_proc("db_listener"); plistener_using_data = true;
listener_using_data = true;
if (!setup_data()) { if (!setup_data()) {
if (!everyone_die) { if (!everyone_die) {
@ -4942,7 +4916,7 @@ static void *listener(void *arg)
if (!everyone_die) { if (!everyone_die) {
K_RLOCK(workqueue_free); K_RLOCK(workqueue_free);
wqcount = workqueue_store->count; wqcount = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf))); LOGWARNING("reload shares OoO %s", ooo_status(ooo_buf, sizeof(ooo_buf)));
@ -4973,8 +4947,8 @@ static void *listener(void *arg)
// Process queued work // Process queued work
while (!everyone_die) { while (!everyone_die) {
K_WLOCK(workqueue_free); K_WLOCK(workqueue_free);
wq_item = k_unlink_head(workqueue_store); wq_item = k_unlink_head(pool_workqueue_store);
left = workqueue_store->count; left = pool_workqueue_store->count;
K_WUNLOCK(workqueue_free); K_WUNLOCK(workqueue_free);
if (left == 0 && wq_stt.tv_sec != 0L) if (left == 0 && wq_stt.tv_sec != 0L)
@ -5049,7 +5023,7 @@ static void *listener(void *arg)
sayonara: sayonara:
listener_using_data = false; plistener_using_data = false;
if (conn) if (conn)
PQfinish(conn); PQfinish(conn);
@ -5938,7 +5912,8 @@ int main(int argc, char **argv)
trigger = start = time(NULL); trigger = start = time(NULL);
while (socketer_using_data || summariser_using_data || while (socketer_using_data || summariser_using_data ||
logger_using_data || listener_using_data || logger_using_data || plistener_using_data ||
clistener_using_data || blistener_using_data ||
marker_using_data) { marker_using_data) {
msg = NULL; msg = NULL;
curr = time(NULL); curr = time(NULL);
@ -5951,12 +5926,14 @@ int main(int argc, char **argv)
} }
if (msg) { if (msg) {
trigger = curr; trigger = curr;
printf("%s %ds due to%s%s%s%s%s\n", printf("%s %ds due to%s%s%s%s%s%s%s\n",
msg, (int)(curr - start), msg, (int)(curr - start),
socketer_using_data ? " socketer" : EMPTY, socketer_using_data ? " socketer" : EMPTY,
summariser_using_data ? " summariser" : EMPTY, summariser_using_data ? " summariser" : EMPTY,
logger_using_data ? " logger" : EMPTY, logger_using_data ? " logger" : EMPTY,
listener_using_data ? " listener" : EMPTY, plistener_using_data ? " plistener" : EMPTY,
clistener_using_data ? " clistener" : EMPTY,
blistener_using_data ? " blistener" : EMPTY,
marker_using_data ? " marker" : EMPTY); marker_using_data ? " marker" : EMPTY);
fflush(stdout); fflush(stdout);
} }

7
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.4" #define DB_VERSION "1.0.4"
#define CKDB_VERSION DB_VERSION"-1.620" #define CKDB_VERSION DB_VERSION"-1.704"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -765,6 +765,7 @@ typedef struct msgline {
char *code; char *code;
K_TREE *trf_root; K_TREE *trf_root;
K_STORE *trf_store; K_STORE *trf_store;
int sockd;
} MSGLINE; } MSGLINE;
#define ALLOC_MSGLINE 8192 #define ALLOC_MSGLINE 8192
@ -792,7 +793,9 @@ typedef struct workqueue {
#define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true) #define DATA_WORKQUEUE(_var, _item) DATA_GENERIC(_var, _item, workqueue, true)
extern K_LIST *workqueue_free; extern K_LIST *workqueue_free;
extern K_STORE *workqueue_store; extern K_STORE *pool_workqueue_store;
extern K_STORE *cmd_workqueue_store;
extern K_STORE *btc_workqueue_store;
extern mutex_t wq_waitlock; extern mutex_t wq_waitlock;
extern pthread_cond_t wq_waitcond; extern pthread_cond_t wq_waitcond;

17
src/ckdb_cmd.c

@ -3304,7 +3304,13 @@ static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id,
} }
// Don't bother with locking - it's just an FYI web stat // Don't bother with locking - it's just an FYI web stat
snprintf(tmp, sizeof(tmp), "sync=%d%c", workqueue_store->count, FLDSEP); int psync = pool_workqueue_store->count;
int csync = cmd_workqueue_store->count;
int bsync = btc_workqueue_store->count;
snprintf(tmp, sizeof(tmp), "psync=%d%c", psync, FLDSEP);
snprintf(tmp, sizeof(tmp), "csync=%d%c", csync, FLDSEP);
snprintf(tmp, sizeof(tmp), "bsync=%d%c", bsync, FLDSEP);
snprintf(tmp, sizeof(tmp), "sync=%d%c", psync + csync + bsync, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
u_item = NULL; u_item = NULL;
@ -3953,9 +3959,9 @@ static char *cmd_setopts(PGconn *conn, char *cmd, char *id,
} }
} }
if (!oc_item) { if (!oc_item) {
K_RLOCK(optioncontrol_free); K_WLOCK(optioncontrol_free);
oc_item = k_unlink_head(optioncontrol_free); oc_item = k_unlink_head(optioncontrol_free);
K_RUNLOCK(optioncontrol_free); K_WUNLOCK(optioncontrol_free);
DATA_OPTIONCONTROL(optioncontrol, oc_item); DATA_OPTIONCONTROL(optioncontrol, oc_item);
bzero(optioncontrol, sizeof(*optioncontrol)); bzero(optioncontrol, sizeof(*optioncontrol));
STRNCPY(optioncontrol->optionname, optionname); STRNCPY(optioncontrol->optionname, optionname);
@ -4558,7 +4564,6 @@ static char *cmd_pplns2(__maybe_unused PGconn *conn, char *cmd, char *id,
b_item = find_after_in_ktree(blocks_root, &b_look, b_ctx); b_item = find_after_in_ktree(blocks_root, &b_look, b_ctx);
K_RUNLOCK(blocks_free); K_RUNLOCK(blocks_free);
if (!b_item) { if (!b_item) {
K_RUNLOCK(blocks_free);
snprintf(reply, siz, "ERR.no block height >= %"PRId32, height); snprintf(reply, siz, "ERR.no block height >= %"PRId32, height);
return strdup(reply); return strdup(reply);
} }
@ -5684,7 +5689,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(workerstatus, 1, 1); USEINFO(workerstatus, 1, 1);
USEINFO(userinfo, 1, 1); USEINFO(userinfo, 1, 1);
USEINFO(msgline, 1, 0); USEINFO(msgline, 1, 0);
USEINFO(workqueue, 1, 0); USEINFO(workqueue, 3, 0);
USEINFO(transfer, 0, 0); USEINFO(transfer, 0, 0);
USEINFO(heartbeatqueue, 1, 0); USEINFO(heartbeatqueue, 1, 0);
USEINFO(logqueue, 1, 0); USEINFO(logqueue, 1, 0);
@ -6603,7 +6608,7 @@ static char *cmd_userinfo(__maybe_unused PGconn *conn, char *cmd, char *id,
* You must supply the btcserver to change anything * You must supply the btcserver to change anything
* The format for userpass is username:password * The format for userpass is username:password
* If you don't supply the btcserver it will simply report the current server * If you don't supply the btcserver it will simply report the current server
* If supply btcserver but not the userpass it will use the current userpass * If you supply btcserver but not the userpass it will use the current userpass
* The reply will ONLY contain the URL, not the user/pass */ * The reply will ONLY contain the URL, not the user/pass */
static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_btcset(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused tv_t *now, __maybe_unused char *by,

19
src/ckdb_data.c

@ -3455,7 +3455,7 @@ K_ITEM *find_payoutid(int64_t payoutid)
return find_in_ktree(payouts_id_root, &look, ctx); return find_in_ktree(payouts_id_root, &look, ctx);
} }
// First payouts workinfoidend equal or before workinfoidend // First payouts workinfoidend equal or after workinfoidend
K_ITEM *find_payouts_wid(int64_t workinfoidend, K_TREE_CTX *ctx) K_ITEM *find_payouts_wid(int64_t workinfoidend, K_TREE_CTX *ctx)
{ {
PAYOUTS payouts; PAYOUTS payouts;
@ -3465,12 +3465,13 @@ K_ITEM *find_payouts_wid(int64_t workinfoidend, K_TREE_CTX *ctx)
if (ctx == NULL) if (ctx == NULL)
ctx = ctx0; ctx = ctx0;
payouts.workinfoidend = workinfoidend+1; payouts.workinfoidend = workinfoidend-1;
DATE_ZERO(&(payouts.expirydate)); payouts.expirydate.tv_sec = default_expiry.tv_sec;
payouts.expirydate.tv_usec = default_expiry.tv_usec;
INIT_PAYOUTS(&look); INIT_PAYOUTS(&look);
look.data = (void *)(&payouts); look.data = (void *)(&payouts);
return find_before_in_ktree(payouts_wid_root, &look, ctx); return find_after_in_ktree(payouts_wid_root, &look, ctx);
} }
/* Values from payout stats, returns -1 if statname isn't found /* Values from payout stats, returns -1 if statname isn't found
@ -4921,23 +4922,25 @@ bool shift_rewards(K_ITEM *wm_item)
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
K_RLOCK(payouts_free); K_RLOCK(payouts_free);
K_WLOCK(workmarkers_free);
p_item = find_payouts_wid(wm->workinfoidend, ctx); p_item = find_payouts_wid(wm->workinfoidend, ctx);
DATA_PAYOUTS_NULL(payouts, p_item); DATA_PAYOUTS_NULL(payouts, p_item);
// a workmarker should not cross a payout boundary // a workmarker should not cross a payout boundary
while (p_item && payouts->workinfoidstart <= wm->workinfoidstart && while (p_item && payouts->workinfoidstart <= wm->workinfoidstart &&
wm->workinfoidend <= payouts->workinfoidend) { wm->workinfoidend <= payouts->workinfoidend) {
if (CURRENT(&(payouts->expirydate))) { if (CURRENT(&(payouts->expirydate)) &&
PAYGENERATED(payouts->status)) {
rewards++; rewards++;
pps += (double)(payouts->minerreward) / pps += (double)(payouts->minerreward) /
payouts->diffused; payouts->diffused;
} }
p_item = prev_in_ktree(ctx); p_item = next_in_ktree(ctx);
DATA_PAYOUTS_NULL(payouts, p_item); DATA_PAYOUTS_NULL(payouts, p_item);
} }
K_RUNLOCK(payouts_free);
wm->rewards = rewards; wm->rewards = rewards;
wm->rewarded = pps; wm->rewarded = pps;
K_WUNLOCK(workmarkers_free);
K_RUNLOCK(payouts_free);
return (rewards > 0); return (rewards > 0);
} }

2
src/ktree.c

@ -638,6 +638,7 @@ K_ITEM *_find_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, bool chklock
} }
} }
// First item after data
K_ITEM *_find_after_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, LOCK_MAYBE bool chklock, KTREE_FFL_ARGS) K_ITEM *_find_after_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, LOCK_MAYBE bool chklock, KTREE_FFL_ARGS)
{ {
K_NODE *knode, *old = NULL; K_NODE *knode, *old = NULL;
@ -690,6 +691,7 @@ K_ITEM *_find_after_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, LOCK_M
} }
} }
// Last item before data
K_ITEM *_find_before_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, KTREE_FFL_ARGS) K_ITEM *_find_before_in_ktree(K_TREE *tree, K_ITEM *data, K_TREE_CTX *ctx, KTREE_FFL_ARGS)
{ {
K_NODE *knode, *old = NULL; K_NODE *knode, *old = NULL;

Loading…
Cancel
Save