Browse Source

ckdb - reply to heartbeat with web diff change info

master
kanoi 10 years ago
parent
commit
64ff56cb63
  1. 9
      src/ckdb.c
  2. 15
      src/ckdb.h
  3. 79
      src/ckdb_cmd.c

9
src/ckdb.c

@ -285,6 +285,10 @@ K_STORE *workqueue_store;
pthread_mutex_t wq_waitlock; pthread_mutex_t wq_waitlock;
pthread_cond_t wq_waitcond; pthread_cond_t wq_waitcond;
// HEARTBEATQUEUE
K_LIST *heartbeatqueue_free;
K_STORE *heartbeatqueue_store;
// TRANSFER // TRANSFER
K_LIST *transfer_free; K_LIST *transfer_free;
@ -861,6 +865,11 @@ static void alloc_storage()
ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true);
workqueue_store = k_new_store(workqueue_free); workqueue_store = k_new_store(workqueue_free);
heartbeatqueue_free = k_new_list("HeartBeatQueue", sizeof(HEARTBEATQUEUE),
ALLOC_HEARTBEATQUEUE,
LIMIT_HEARTBEATQUEUE, true);
heartbeatqueue_store = k_new_store(heartbeatqueue_free);
transfer_free = k_new_list(Transfer, sizeof(TRANSFER), transfer_free = k_new_list(Transfer, sizeof(TRANSFER),
ALLOC_TRANSFER, LIMIT_TRANSFER, true); ALLOC_TRANSFER, LIMIT_TRANSFER, true);
transfer_free->dsp_func = dsp_transfer; transfer_free->dsp_func = dsp_transfer;

15
src/ckdb.h

@ -488,6 +488,21 @@ extern K_STORE *workqueue_store;
extern pthread_mutex_t wq_waitlock; extern pthread_mutex_t wq_waitlock;
extern pthread_cond_t wq_waitcond; extern pthread_cond_t wq_waitcond;
// HEARTBEATQUEUE
typedef struct heartbeatqueue {
char workername[TXT_BIG+1];
int32_t difficultydefault;
tv_t createdate;
} HEARTBEATQUEUE;
#define ALLOC_HEARTBEATQUEUE 128
#define LIMIT_HEARTBEATQUEUE 0
#define INIT_HEARTBEATQUEUE(_item) INIT_GENERIC(_item, heartbeatqueue)
#define DATA_HEARTBEATQUEUE(_var, _item) DATA_GENERIC(_var, _item, heartbeatqueue, true)
extern K_LIST *heartbeatqueue_free;
extern K_STORE *heartbeatqueue_store;
// TRANSFER // TRANSFER
#define NAME_SIZE 63 #define NAME_SIZE 63
#define VALUE_SIZE 1023 #define VALUE_SIZE 1023

79
src/ckdb_cmd.c

@ -284,6 +284,8 @@ static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now,
__maybe_unused tv_t *notcd, K_TREE *trf_root) __maybe_unused tv_t *notcd, K_TREE *trf_root)
{ {
K_ITEM *i_username, *i_workername, *i_diffdef, *u_item, *w_item; K_ITEM *i_username, *i_workername, *i_diffdef, *u_item, *w_item;
HEARTBEATQUEUE *heartbeatqueue;
K_ITEM *hq_item;
char workername_buf[32]; // 'workername:' + digits char workername_buf[32]; // 'workername:' + digits
char diffdef_buf[32]; // 'difficultydefault:' + digits char diffdef_buf[32]; // 'difficultydefault:' + digits
char reply[1024] = ""; char reply[1024] = "";
@ -375,6 +377,21 @@ static char *cmd_workerset(PGconn *conn, char *cmd, char *id, tv_t *now,
reason = "DB error"; reason = "DB error";
break; break;
} }
/* workerset is not from a log file,
so always queue it */
K_WLOCK(heartbeatqueue_free);
hq_item = k_unlink_head(heartbeatqueue_free);
K_WUNLOCK(heartbeatqueue_free);
DATA_HEARTBEATQUEUE(heartbeatqueue, hq_item);
STRNCPY(heartbeatqueue->workername, workers->workername);
heartbeatqueue->difficultydefault = workers->difficultydefault;
copy_tv(&(heartbeatqueue->createdate), now);
K_WLOCK(heartbeatqueue_free);
k_add_tail(heartbeatqueue_store, hq_item);
K_WUNLOCK(heartbeatqueue_free);
} }
} }
} }
@ -1895,20 +1912,64 @@ static char *cmd_addrauth(PGconn *conn, char *cmd, char *id,
return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); return cmd_addrauth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root);
} }
/* TODO: This must decide the reply based on the reloading/startup status
* Reload data will be a simple pulse reply
* If workers have been loaded (db_auths_complete) then any worker diff
* changes from the web, after that, will mean a diff change reply
*/
static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id, static char *cmd_heartbeat(__maybe_unused PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, __maybe_unused char *by, __maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet, __maybe_unused char *code, __maybe_unused char *inet,
__maybe_unused tv_t *cd, __maybe_unused tv_t *cd,
__maybe_unused K_TREE *trf_root) __maybe_unused K_TREE *trf_root)
{ {
char reply[1024] = ""; HEARTBEATQUEUE *heartbeatqueue;
K_STORE *hq_store;
K_ITEM *hq_item;
char reply[1024], tmp[1024], *buf;
size_t siz = sizeof(reply); size_t siz = sizeof(reply);
size_t len, off;
bool first;
// Wait until startup is complete, we get a heartbeat every second
if (!startup_complete)
goto pulse;
K_WLOCK(heartbeatqueue_free);
if (heartbeatqueue_store->count == 0) {
K_WUNLOCK(heartbeatqueue_free);
goto pulse;
}
hq_store = k_new_store(heartbeatqueue_free);
k_list_transfer_to_head(heartbeatqueue_store, hq_store);
K_WUNLOCK(heartbeatqueue_free);
APPEND_REALLOC_INIT(buf, off, len);
APPEND_REALLOC(buf, off, len, "ok.heartbeat={\"diffchange\":[");
hq_item = hq_store->tail;
first = true;
while (hq_item) {
DATA_HEARTBEATQUEUE(heartbeatqueue, hq_item);
tvs_to_buf(&last_bc, reply, siz);
snprintf(tmp, sizeof(tmp),
"%s{\"workername\":\"%s\","
"\"difficultydefault\":%d,"
"\"createdate\":\"%ld,%ld\"}",
first ? "" : ",",
heartbeatqueue->workername,
heartbeatqueue->difficultydefault,
heartbeatqueue->createdate.tv_sec,
heartbeatqueue->createdate.tv_usec);
APPEND_REALLOC(buf, off, len, tmp);
hq_item = hq_item->prev;
first = false;
}
APPEND_REALLOC(buf, off, len, "]}");
K_WLOCK(heartbeatqueue_free);
k_list_transfer_to_head(hq_store, heartbeatqueue_free);
K_WUNLOCK(heartbeatqueue_free);
hq_store = k_free_store(hq_store);
LOGDEBUG("%s.%s.%s", cmd, id, buf);
return buf;
pulse:
snprintf(reply, siz, "ok.pulse"); snprintf(reply, siz, "ok.pulse");
LOGDEBUG("%s.%s.%s", cmd, id, reply); LOGDEBUG("%s.%s.%s", cmd, id, reply);
return strdup(reply); return strdup(reply);
@ -3246,6 +3307,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(workerstatus, 1, 1); USEINFO(workerstatus, 1, 1);
USEINFO(workqueue, 1, 0); USEINFO(workqueue, 1, 0);
USEINFO(transfer, 0, 0); USEINFO(transfer, 0, 0);
USEINFO(heartbeatqueue, 1, 0);
USEINFO(logqueue, 1, 0); USEINFO(logqueue, 1, 0);
snprintf(tmp, sizeof(tmp), "totalram=%"PRIu64"%c", tot, FLDSEP); snprintf(tmp, sizeof(tmp), "totalram=%"PRIu64"%c", tot, FLDSEP);
@ -3323,6 +3385,11 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
* *
* createdate = true * createdate = true
* means that the data sent must contain a fld or json fld called createdate * means that the data sent must contain a fld or json fld called createdate
*
* The reply format for authorise, addrauth and heartbeat includes json:
* ID.STAMP.ok.cmd={json}
* where cmd is auth, addrauth, or heartbeat
* For the heartbeat pulse reply it has no '={}'
*/ */
// cmd_val cmd_str noid createdate func access // cmd_val cmd_str noid createdate func access

Loading…
Cancel
Save