Browse Source

ckdb - increase the threaded processing stages and signal all threads rather than polling

master
kanoi 9 years ago
parent
commit
be8566c4ce
  1. 728
      src/ckdb.c
  2. 111
      src/ckdb.h
  3. 60
      src/ckdb_cmd.c
  4. 14
      src/ckdb_data.c

728
src/ckdb.c

File diff suppressed because it is too large Load Diff

111
src/ckdb.h

@ -19,6 +19,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <sys/epoll.h>
#include <fenv.h> #include <fenv.h>
#include <getopt.h> #include <getopt.h>
#include <jansson.h> #include <jansson.h>
@ -51,7 +52,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.5" #define DB_VERSION "1.0.5"
#define CKDB_VERSION DB_VERSION"-2.015" #define CKDB_VERSION DB_VERSION"-2.100"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -344,6 +345,9 @@ extern tv_t last_auth;
extern cklock_t last_lock; extern cklock_t last_lock;
// Running stats // Running stats
// replier()
extern double reply_full_us;
extern uint64_t reply_sent, reply_cant, reply_discarded, reply_fails;
// socketer() // socketer()
extern tv_t sock_stt; extern tv_t sock_stt;
extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us; extern double sock_us, sock_recv_us, sock_lock_wq_us, sock_lock_br_us;
@ -1036,6 +1040,10 @@ typedef struct msgline {
int which_cmds; int which_cmds;
tv_t now; tv_t now;
tv_t cd; tv_t cd;
tv_t accepted; // copied from breakqueue
tv_t broken; // breakdown done
tv_t processed; // not all are processed
tv_t replied;
char id[ID_SIZ+1]; char id[ID_SIZ+1];
char cmd[CMD_SIZ+1]; char cmd[CMD_SIZ+1];
char *msg; char *msg;
@ -1065,7 +1073,8 @@ extern K_STORE *msgline_store;
// BREAKQUEUE // BREAKQUEUE
typedef struct breakqueue { typedef struct breakqueue {
char *buf; char *buf;
tv_t now; tv_t accepted; // socket accepted or line read
tv_t now; // msg read or line read
int seqentryflags; int seqentryflags;
int sockd; int sockd;
enum cmd_values cmdnum; enum cmd_values cmdnum;
@ -1082,13 +1091,8 @@ typedef struct breakqueue {
/* If a breaker() thread's done break queue count hits the LIMIT, or is empty, /* If a breaker() thread's done break queue count hits the LIMIT, or is empty,
* it will sleep for SLEEP ms * it will sleep for SLEEP ms
* So this means that with a single breaker() thread, * Also note that LIMIT defines how much RAM can be used by the break queues,
* it can process at most LIMIT records per SLEEP ms * so a limit is required
* or: 1000 * LIMIT / SLEEP records per second
* For N breaker() threads, that would mean between 1 and N times that value
* dependent upon the random time spacing of the N thread sleeps
* However, also note that LIMIT defines how much RAM can be used by
* the break queues, so a limit is required
* A breakqueue item can get quite large since it includes both buf * A breakqueue item can get quite large since it includes both buf
* and ml_item (which has the transfer data) in the 'done' queue * and ml_item (which has the transfer data) in the 'done' queue
* Of course the processing speed of the ml_items will also decide how big the * Of course the processing speed of the ml_items will also decide how big the
@ -1101,10 +1105,10 @@ typedef struct breakqueue {
* thus limiting the line processing of reload files * thus limiting the line processing of reload files
*/ */
#define RELOAD_QUEUE_LIMIT 16300 #define RELOAD_QUEUE_LIMIT 16300
#define RELOAD_QUEUE_SLEEP 42 #define RELOAD_QUEUE_SLEEP_MS 42
// Don't really limit the cmd queue // Don't really limit the cmd queue
#define CMD_QUEUE_LIMIT 1048500 #define CMD_QUEUE_LIMIT 1048500
#define CMD_QUEUE_SLEEP 1 #define CMD_QUEUE_SLEEP_MS 42
extern K_LIST *breakqueue_free; extern K_LIST *breakqueue_free;
extern K_STORE *reload_breakqueue_store; extern K_STORE *reload_breakqueue_store;
@ -1118,6 +1122,26 @@ extern int cmd_processing;
extern int sockd_count; extern int sockd_count;
extern int max_sockd_count; extern int max_sockd_count;
// Trigger breaker() processing
extern mutex_t bq_reload_waitlock;
extern mutex_t bq_cmd_waitlock;
extern pthread_cond_t bq_reload_waitcond;
extern pthread_cond_t bq_cmd_waitcond;
extern uint64_t bq_reload_signals, bq_cmd_signals;
extern uint64_t bq_reload_wakes, bq_cmd_wakes;
extern uint64_t bq_reload_timeouts, bq_cmd_timeouts;
// Trigger reload/socket *_done_* processing
extern mutex_t process_reload_waitlock;
extern mutex_t process_socket_waitlock;
extern pthread_cond_t process_reload_waitcond;
extern pthread_cond_t process_socket_waitcond;
extern uint64_t process_reload_signals, process_socket_signals;
extern uint64_t process_reload_wakes, process_socket_wakes;
extern uint64_t process_reload_timeouts, process_socket_timeouts;
// WORKQUEUE // WORKQUEUE
typedef struct workqueue { typedef struct workqueue {
K_ITEM *msgline_item; K_ITEM *msgline_item;
@ -1143,6 +1167,62 @@ extern int64_t earlysock_left;
extern int64_t pool0_tot; extern int64_t pool0_tot;
extern int64_t pool0_discarded; extern int64_t pool0_discarded;
// Trigger workqueue threads
extern mutex_t wq_pool_waitlock;
extern mutex_t wq_cmd_waitlock;
extern mutex_t wq_btc_waitlock;
extern pthread_cond_t wq_pool_waitcond;
extern pthread_cond_t wq_cmd_waitcond;
extern pthread_cond_t wq_btc_waitcond;
extern uint64_t wq_pool_signals, wq_cmd_signals, wq_btc_signals;
extern uint64_t wq_pool_wakes, wq_cmd_wakes, wq_btc_wakes;
extern uint64_t wq_pool_timeouts, wq_cmd_timeouts, wq_btc_timeouts;
// REPLIES
typedef struct replies {
tv_t now;
tv_t createdate;
tv_t accepted;
tv_t broken;
tv_t processed;
int sockd;
char *reply;
struct epoll_event event;
const char *file;
const char *func;
int line;
} REPLIES;
#define ALLOC_REPLIES 65536
#define LIMIT_REPLIES 0
#define INIT_REPLIES(_item) INIT_GENERIC(_item, replies)
#define DATA_REPLIES(_var, _item) DATA_GENERIC(_var, _item, replies, true)
extern K_LIST *replies_free;
extern K_STORE *replies_store;
extern K_TREE *replies_pool_root;
extern K_TREE *replies_cmd_root;
extern K_TREE *replies_btc_root;
// Close the socket and discard the reply, X ms after it gets in a list
#define REPLIES_LIMIT_MS 10000
extern int epollfd_pool;
extern int epollfd_cmd;
extern int epollfd_btc;
extern int rep_tot_sockd;
extern int rep_failed_sockd;
extern int rep_max_sockd;
// maximum counts and fd values
extern int rep_max_pool_sockd;
extern int rep_max_cmd_sockd;
extern int rep_max_btc_sockd;
extern int rep_max_pool_sockd_fd;
extern int rep_max_cmd_sockd_fd;
extern int rep_max_btc_sockd_fd;
// HEARTBEATQUEUE // HEARTBEATQUEUE
typedef struct heartbeatqueue { typedef struct heartbeatqueue {
char workername[TXT_BIG+1]; char workername[TXT_BIG+1];
@ -2676,10 +2756,14 @@ extern K_TREE *userinfo_root;
extern K_LIST *userinfo_free; extern K_LIST *userinfo_free;
extern K_STORE *userinfo_store; extern K_STORE *userinfo_store;
enum reply_type {
REPLIER_POOL,
REPLIER_CMD,
REPLIER_BTC
};
extern void logmsg(int loglevel, const char *fmt, ...); extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now); extern void setnow(tv_t *now);
extern void _ckdb_unix_msg(int sockd, const char *msg, WHERE_FFL_ARGS);
#define ckdb_unix_msg(_sockd, _msg) _ckdb_unix_msg(_sockd, _msg, WHERE_FFL_HERE)
extern void tick(); extern void tick();
extern PGconn *dbconnect(); extern PGconn *dbconnect();
extern void sequence_report(bool lock); extern void sequence_report(bool lock);
@ -2823,6 +2907,7 @@ extern void workerstatus_ready();
_workerstatus_update(_auths, _shares, _userstats, WHERE_FFL_HERE) _workerstatus_update(_auths, _shares, _userstats, WHERE_FFL_HERE)
extern void _workerstatus_update(AUTHS *auths, SHARES *shares, extern void _workerstatus_update(AUTHS *auths, SHARES *shares,
USERSTATS *userstats, WHERE_FFL_ARGS); USERSTATS *userstats, WHERE_FFL_ARGS);
extern cmp_t cmp_replies(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_users(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_users(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_userid(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_userid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_users(char *username); extern K_ITEM *find_users(char *username);

60
src/ckdb_cmd.c

@ -1289,8 +1289,8 @@ static char *cmd_blocklist(__maybe_unused PGconn *conn, char *cmd, char *id,
ovent = ovents_add(OVENTID_BLOCKS, trf_root); ovent = ovents_add(OVENTID_BLOCKS, trf_root);
if (ovent != OVENT_OK) { if (ovent != OVENT_OK) {
snprintf(tmp, sizeof(tmp), "ERR"); snprintf(reply, sizeof(reply), "ERR");
return reply_ovent(ovent, tmp); return reply_ovent(ovent, reply);
} }
maxrows = sys_setting(BLOCKS_SETTING_NAME, BLOCKS_DEFAULT, now); maxrows = sys_setting(BLOCKS_SETTING_NAME, BLOCKS_DEFAULT, now);
@ -6788,7 +6788,7 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
char buf[256]; char buf[256];
int relq_count, _reload_processing, relqd_count; int relq_count, _reload_processing, relqd_count;
int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count; int cmdq_count, _cmd_processing, cmdqd_count, _max_sockd_count;
int pool0_count, poolq_count; int pool0_count, poolq_count, rep_max_fd;
int64_t _earlysock_left, _pool0_discarded, _pool0_tot; int64_t _earlysock_left, _pool0_discarded, _pool0_tot;
uint64_t count1, count2, count3, count4; uint64_t count1, count2, count3, count4;
double tot1, tot2; double tot1, tot2;
@ -6814,8 +6814,8 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
poolq_count = pool_workqueue_store->count; poolq_count = pool_workqueue_store->count;
K_RUNLOCK(workqueue_free); K_RUNLOCK(workqueue_free);
LOGWARNING(" reload=%d/%d/%d cmd=%d/%d/%d es=%"PRId64 LOGWARNING(" reload=rq%d/rp%d/rd%d cmd=cq%d/cp%d/cd%d es=%"PRId64
" pool0=%d/%"PRId64"/%"PRId64" poolq=%d max_sockd=%d", " pool0=c%d/d%"PRId64"/t%"PRId64" poolq=c%d max_sockd=%d",
relq_count, _reload_processing, relqd_count, relq_count, _reload_processing, relqd_count,
cmdq_count, _cmd_processing, cmdqd_count, cmdq_count, _cmd_processing, cmdqd_count,
_earlysock_left, _earlysock_left,
@ -6826,8 +6826,10 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
count2 = sock_recv ? : 1; count2 = sock_recv ? : 1;
count3 = sock_proc_early ? : 1; count3 = sock_proc_early ? : 1;
count4 = sock_processed ? : 1; count4 = sock_processed ? : 1;
LOGWARNING(" sock: tot %f sock %f/%"PRIu64"/%f recv %f/%"PRIu64"/%f " LOGWARNING(" sock: t%fs sock t%fs/t%"PRIu64"/av%fs"
"lckw %f/%"PRIu64"/%f lckb %f/%"PRIu64"/%f", " recv t%fs/t%"PRIu64"/av%fs"
" lckw t%fs/t%"PRIu64"/av%fs"
" lckb t%fs/t%"PRIu64"/av%fs",
us_tvdiff(now, &sock_stt)/1000000, us_tvdiff(now, &sock_stt)/1000000,
sock_us/1000000, sock_acc, (sock_us/count1)/1000000, sock_us/1000000, sock_acc, (sock_us/count1)/1000000,
sock_recv_us/1000000, sock_recv, sock_recv_us/1000000, sock_recv,
@ -6851,16 +6853,54 @@ static char *cmd_shsta(__maybe_unused PGconn *conn, char *cmd, char *id,
tot2 = us_tvdiff(now, &break_cmd_stt); tot2 = us_tvdiff(now, &break_cmd_stt);
count1 = break_reload_processed ? : 1; count1 = break_reload_processed ? : 1;
count2 = break_cmd_processed ? : 1; count2 = break_cmd_processed ? : 1;
LOGWARNING(" break reload: %f/%"PRIu64"/%f cmd: %f/%"PRIu64"/%f", LOGWARNING(" break reload: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: t%fs/t%"PRIu64"/av%fs "
"%"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
tot1/1000000, break_reload_processed, (tot1/count1)/1000000, tot1/1000000, break_reload_processed, (tot1/count1)/1000000,
tot2/1000000, break_cmd_processed, (tot2/count2)/1000000); bq_reload_signals, bq_reload_wakes, bq_reload_timeouts,
tot2/1000000, break_cmd_processed, (tot2/count2)/1000000,
bq_cmd_signals, bq_cmd_wakes, bq_cmd_timeouts);
LOGWARNING(" queue reload: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
process_reload_signals, process_reload_wakes,
process_reload_timeouts,
process_socket_signals, process_socket_wakes,
process_socket_timeouts);
LOGWARNING(" process pool: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" cmd: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t"
" btc: %"PRIu64"s/%"PRIu64"w/%"PRIu64"t",
wq_pool_signals, wq_pool_wakes, wq_pool_timeouts,
wq_cmd_signals, wq_cmd_wakes, wq_cmd_timeouts,
wq_btc_signals, wq_btc_wakes, wq_btc_timeouts);
count1 = clis_processed ? : 1; count1 = clis_processed ? : 1;
count2 = blis_processed ? : 1; count2 = blis_processed ? : 1;
LOGWARNING(" clistener %f/%"PRIu64"/%f blistener: %f/%"PRIu64"/%f", LOGWARNING(" clistener: t%fs/t%"PRIu64"/av%fs"
" blistener: t%fs/t%"PRIu64"/av%fs",
clis_us/1000000, clis_processed, (clis_us/count1)/1000000, clis_us/1000000, clis_processed, (clis_us/count1)/1000000,
blis_us/1000000, blis_processed, (blis_us/count2)/1000000); blis_us/1000000, blis_processed, (blis_us/count2)/1000000);
rep_max_fd = rep_max_pool_sockd_fd;
if (rep_max_fd < rep_max_cmd_sockd_fd)
rep_max_fd = rep_max_cmd_sockd_fd;
if (rep_max_fd < rep_max_btc_sockd_fd)
rep_max_fd = rep_max_btc_sockd_fd;
LOGWARNING(" replies t%d/^%d/^%dfd/f%d pool ^%d/^%dfd cmd ^%d/^%dfd"
" btc ^%d/^%dfd",
rep_tot_sockd, rep_max_sockd, rep_max_fd, rep_failed_sockd,
rep_max_pool_sockd, rep_max_pool_sockd_fd,
rep_max_cmd_sockd, rep_max_cmd_sockd_fd,
rep_max_btc_sockd, rep_max_btc_sockd_fd);
count1 = reply_sent ? : 1;
LOGWARNING(" sent t%"PRIu64"/x%"PRIu64"/d%"PRIu64"/f%"PRIu64
"/t%fs/av%fs",
reply_sent, reply_cant, reply_discarded, reply_fails,
reply_full_us/1000000, (reply_full_us/count1)/1000000);
snprintf(buf, sizeof(buf), "ok.%s", cmd); snprintf(buf, sizeof(buf), "ok.%s", cmd);
LOGDEBUG("%s.%s", id, buf); LOGDEBUG("%s.%s", id, buf);
return strdup(buf); return strdup(buf);

14
src/ckdb_data.c

@ -1146,6 +1146,16 @@ void _workerstatus_update(AUTHS *auths, SHARES *shares,
} }
} }
/* default tree order by now asc
* now is guaranteed unique since it's acquired under exclusive lock */
cmp_t cmp_replies(K_ITEM *a, K_ITEM *b)
{
REPLIES *ra, *rb;
DATA_REPLIES(ra, a);
DATA_REPLIES(rb, b);
return CMP_TV(ra->now, rb->now);
}
// default tree order by username asc,expirydate desc // default tree order by username asc,expirydate desc
cmp_t cmp_users(K_ITEM *a, K_ITEM *b) cmp_t cmp_users(K_ITEM *a, K_ITEM *b)
{ {
@ -4991,6 +5001,10 @@ static size_t tmfsiz = sizeof(tmf); // includes null
static char tma[] = "Too many accesses, come back later"; static char tma[] = "Too many accesses, come back later";
static size_t tmasiz = sizeof(tma); // includes null static size_t tmasiz = sizeof(tma); // includes null
/* This always returns a reply that needs to be freed
* fre says if buf was malloced
* i.e. fre means buf needs to be freed if it is not returned
* and !fre means we need to strdup buf, if we need to return it */
char *_reply_event(bool is_event, int event, char *buf, bool fre) char *_reply_event(bool is_event, int event, char *buf, bool fre)
{ {
size_t len; size_t len;

Loading…
Cancel
Save