diff --git a/src/ckdb.c b/src/ckdb.c index 4242eaa1..e6ac8f66 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -285,6 +285,10 @@ K_STORE *workqueue_store; pthread_mutex_t wq_waitlock; pthread_cond_t wq_waitcond; +// HEARTBEATQUEUE +K_LIST *heartbeatqueue_free; +K_STORE *heartbeatqueue_store; + // TRANSFER K_LIST *transfer_free; @@ -861,6 +865,11 @@ static void alloc_storage() ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); workqueue_store = k_new_store(workqueue_free); + heartbeatqueue_free = k_new_list("HeartBeatQueue", sizeof(HEARTBEATQUEUE), + ALLOC_HEARTBEATQUEUE, + LIMIT_HEARTBEATQUEUE, true); + heartbeatqueue_store = k_new_store(heartbeatqueue_free); + transfer_free = k_new_list(Transfer, sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); transfer_free->dsp_func = dsp_transfer; diff --git a/src/ckdb.h b/src/ckdb.h index b2b39f53..6bfeb572 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -488,6 +488,21 @@ extern K_STORE *workqueue_store; extern pthread_mutex_t wq_waitlock; extern pthread_cond_t wq_waitcond; +// HEARTBEATQUEUE +typedef struct heartbeatqueue { + char workername[TXT_BIG+1]; + int32_t difficultydefault; + tv_t createdate; +} HEARTBEATQUEUE; + +#define ALLOC_HEARTBEATQUEUE 128 +#define LIMIT_HEARTBEATQUEUE 0 +#define INIT_HEARTBEATQUEUE(_item) INIT_GENERIC(_item, heartbeatqueue) +#define DATA_HEARTBEATQUEUE(_var, _item) DATA_GENERIC(_var, _item, heartbeatqueue, true) + +extern K_LIST *heartbeatqueue_free; +extern K_STORE *heartbeatqueue_store; + // TRANSFER #define NAME_SIZE 63 #define VALUE_SIZE 1023 diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index ebe2b3a2..b04893ed 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -284,6 +284,8 @@ static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now, __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *i_workername, *i_diffdef, *u_item, *w_item; + HEARTBEATQUEUE *heartbeatqueue; + K_ITEM *hq_item; char workername_buf[32]; // 'workername:' + digits char diffdef_buf[32]; // 'difficultydefault:' + digits char reply[1024] = ""; @@ -375,6 +377,21 @@ static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now, reason = "DB error"; break; } + + /* workerset is not from a log file, + so always queue it */ + K_WLOCK(heartbeatqueue_free); + hq_item = k_unlink_head(heartbeatqueue_free); + K_WUNLOCK(heartbeatqueue_free); + + DATA_HEARTBEATQUEUE(heartbeatqueue, hq_item); + STRNCPY(heartbeatqueue->workername, workers->workername); + heartbeatqueue->difficultydefault = workers->difficultydefault; + copy_tv(&(heartbeatqueue->createdate), now); + + K_WLOCK(heartbeatqueue_free); + k_add_tail(heartbeatqueue_store, hq_item); + K_WUNLOCK(heartbeatqueue_free); } } } @@ -1895,20 +1912,64 @@ static char *cmd_addrauth(PGconn *conn, char *cmd, char *id, return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } -/* TODO: This must decide the reply based on the reloading/startup status - * Reload data will be a simple pulse reply - * If workers have been loaded (db_auths_complete) then any worker diff - * changes from the web, after that, will mean a diff change reply - */ static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused char *code, __maybe_unused char *inet, __maybe_unused tv_t *cd, __maybe_unused K_TREE *trf_root) { - char reply[1024] = ""; + HEARTBEATQUEUE *heartbeatqueue; + K_STORE *hq_store; + K_ITEM *hq_item; + char reply[1024], tmp[1024], *buf; size_t siz = sizeof(reply); + size_t len, off; + bool first; + + // Wait until startup is complete, we get a heartbeat every second + if (!startup_complete) + goto pulse; + + K_WLOCK(heartbeatqueue_free); + if (heartbeatqueue_store->count == 0) { + K_WUNLOCK(heartbeatqueue_free); + goto pulse; + } + hq_store = k_new_store(heartbeatqueue_free); + k_list_transfer_to_head(heartbeatqueue_store, hq_store); + K_WUNLOCK(heartbeatqueue_free); + + APPEND_REALLOC_INIT(buf, off, len); + APPEND_REALLOC(buf, off, len, "ok.heartbeat={\"diffchange\":["); + hq_item = hq_store->tail; + first = true; + while (hq_item) { + DATA_HEARTBEATQUEUE(heartbeatqueue, hq_item); + tvs_to_buf(&last_bc, reply, siz); + snprintf(tmp, sizeof(tmp), + "%s{\"workername\":\"%s\"," + "\"difficultydefault\":%d," + "\"createdate\":\"%ld,%ld\"}", + first ? "" : ",", + heartbeatqueue->workername, + heartbeatqueue->difficultydefault, + heartbeatqueue->createdate.tv_sec, + heartbeatqueue->createdate.tv_usec); + APPEND_REALLOC(buf, off, len, tmp); + hq_item = hq_item->prev; + first = false; + } + APPEND_REALLOC(buf, off, len, "]}"); + + K_WLOCK(heartbeatqueue_free); + k_list_transfer_to_head(hq_store, heartbeatqueue_free); + K_WUNLOCK(heartbeatqueue_free); + hq_store = k_free_store(hq_store); + + LOGDEBUG("%s.%s.%s", cmd, id, buf); + return buf; +pulse: snprintf(reply, siz, "ok.pulse"); LOGDEBUG("%s.%s.%s", cmd, id, reply); return strdup(reply); @@ -3246,6 +3307,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(workerstatus, 1, 1); USEINFO(workqueue, 1, 0); USEINFO(transfer, 0, 0); + USEINFO(heartbeatqueue, 1, 0); USEINFO(logqueue, 1, 0); snprintf(tmp, sizeof(tmp), "totalram=%"PRIu64"%c", tot, FLDSEP); @@ -3323,6 +3385,11 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, * * createdate = true * means that the data sent must contain a fld or json fld called createdate + * + * The reply format for authorise, addrauth and heartbeat includes json: + * ID.STAMP.ok.cmd={json} + * where cmd is auth, addrauth, or heartbeat + * For the heartbeat pulse reply it has no '={}' */ // cmd_val cmd_str noid createdate func access