Browse Source

ckdb - redesigned userstats with eos marker - and more severe data errors logged

master
kanoi 11 years ago
parent
commit
069f724c22
  1. 292
      src/ckdb.c

292
src/ckdb.c

@ -46,6 +46,10 @@
__maybe_unused const char *func, \
__maybe_unused const int line
// So they can fit into a 1 byte flag field
#define TRUE_STR "Y"
#define FALSE_STR "N"
#define coinbase1height(_cb1) _coinbase1height(_cb1, WHERE_FFL_HERE)
#define cmp_height(_cb1a, _cb1b) _cmp_height(_cb1a, _cb1b, WHERE_FFL_HERE)
@ -56,6 +60,12 @@ static char *db_pass;
#define CMD_SIZ 31
#define ID_SIZ 31
// size to allocate for pgsql text and display (bigger than needed)
#define DATE_BUFSIZ (63+1)
#define BIGINT_BUFSIZ (63+1)
#define INT_BUFSIZ (63+1)
#define DOUBLE_BUFSIZ (63+1)
#define TXT_BIG 256
#define TXT_MED 128
#define TXT_SML 64
@ -874,6 +884,7 @@ typedef struct userstats {
char poolinstance[TXT_BIG+1];
int64_t elapsed;
int64_t userid;
char workername[TXT_BIG+1];
double hashrate;
double hashrate5m;
double hashrate1hr;
@ -881,6 +892,11 @@ typedef struct userstats {
SIMPLEDATECONTROLFIELDS;
} USERSTATS;
/* USERSATS protocol includes a boolean 'eos' that when true,
* we have received the full set of data for the given
* createdate batch, and thus can move all (complete) records
* matching the createdate in userstats_eos_store into the tree */
#define ALLOC_USERSTATS 1000
#define LIMIT_USERSTATS 0
#define DATA_USERSTATS(_item) ((USERSTATS *)(_item->data))
@ -888,6 +904,14 @@ typedef struct userstats {
static K_TREE *userstats_root;
static K_LIST *userstats_list;
static K_STORE *userstats_store;
// Awaiting EOS
static K_STORE *userstats_eos_store;
/* 1.5 x how often we expect to get user's stats from ckpool
* This is used when grouping the sub-worker stats into a single user
* We add each worker's latest stats to the total - except we ignore
* any worker with newest stats being older than USERSTATS_PER_S */
#define USERSTATS_PER_S (int)(600 * 1.5)
static char logname[512];
#define LOGFILE(_msg) rotating_log(logname, _msg)
@ -1022,27 +1046,45 @@ static K_ITEM *optional_name(char *name, int len, char *patt)
return item;
}
static K_ITEM *require_name(char *name, int len, char *patt, char *reply, size_t siz)
#define require_name(_name, _len, _patt, _reply, _siz) \
_require_name(_name, _len, _patt, _reply, _siz, \
WHERE_FFL_HERE)
static K_ITEM *_require_name(char *name, int len, char *patt, char *reply,
size_t siz, WHERE_FFL_ARGS)
{
K_ITEM *item;
char *value;
regex_t re;
size_t dlen;
int ret;
item = find_transfer(name);
if (!item) {
LOGERR("%s(): failed, field '%s' missing from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.missing %s", name);
return NULL;
}
value = DATA_TRANSFER(item)->data;
if (!value || (int)strlen(value) < len) {
if (value)
dlen = strlen(value);
else
dlen = 0;
if (!value || (int)dlen < len) {
LOGERR("%s(): failed, field '%s' short (%s%d<%d) from %s():%d",
__func__, name, value ? "" : "null",
(int)dlen, len, func, line);
snprintf(reply, siz, "failed.short %s", name);
return NULL;
}
if (patt) {
if (regcomp(&re, patt, REG_NOSUB) != 0) {
LOGERR("%s(): failed, field '%s' failed to"
" compile patt from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.REC %s", name);
return NULL;
}
@ -1051,6 +1093,8 @@ static K_ITEM *require_name(char *name, int len, char *patt, char *reply, size_t
regfree(&re);
if (ret != 0) {
LOGERR("%s(): failed, field '%s' invalid from %s():%d",
__func__, name, func, line);
snprintf(reply, siz, "failed.invalid %s", name);
return NULL;
}
@ -1212,12 +1256,18 @@ static char *data_to_buf(enum data_type typ, void *data, char *buf, size_t siz)
siz = strlen((char *)data) + 1;
break;
case TYPE_BIGINT:
siz = BIGINT_BUFSIZ;
break;
case TYPE_INT:
siz = INT_BUFSIZ;
break;
case TYPE_TV:
case TYPE_TVS:
case TYPE_CTV:
siz = DATE_BUFSIZ;
break;
case TYPE_DOUBLE:
siz = 64; // More than big enough
siz = DOUBLE_BUFSIZ;
break;
default:
quithere(1, "Unknown field (%d) to convert", (int)typ);
@ -3122,43 +3172,64 @@ static void dsp_userstats(K_ITEM *item, FILE *stream)
u = DATA_USERSTATS(item);
createdate_buf = tv_to_buf(&(u->createdate), NULL, 0);
fprintf(stream, " pi='%s' uid=%"PRId64" Hs=%f Hs5m=%f Hs1hr=%f Hs24hr=%f cd=%s\n",
u->poolinstance, u->userid, u->hashrate, u->hashrate5m,
fprintf(stream, " pi='%s' e=%"PRId64" uid=%"PRId64" w='%s' Hs=%f "
"Hs5m=%f Hs1hr=%f Hs24hr=%f cd=%s\n",
u->poolinstance, u->elapsed, u->userid,
u->workername, u->hashrate, u->hashrate5m,
u->hashrate1hr, u->hashrate24hr, createdate_buf);
free(createdate_buf);
}
// order by poolinstance asc,userid asc,createdate asc
/* order by userid asc,createdate asc,poolinstance asc,workername asc
as per required for userstats homepage summarisation */
static double cmp_userstats(K_ITEM *a, K_ITEM *b)
{
double c = (double)strcmp(DATA_USERSTATS(a)->poolinstance,
DATA_USERSTATS(b)->poolinstance);
if (c == 0) {
c = (double)(DATA_USERSTATS(a)->userid) -
double c = (double)(DATA_USERSTATS(a)->userid) -
(double)(DATA_USERSTATS(b)->userid);
if (c == 0) {
c = tvdiff(&(DATA_USERSTATS(a)->createdate),
&(DATA_USERSTATS(b)->createdate));
if (c == 0) {
c = (double)strcmp(DATA_USERSTATS(a)->poolinstance,
DATA_USERSTATS(b)->poolinstance);
if (c == 0) {
c = (double)strcmp(DATA_USERSTATS(a)->workername,
DATA_USERSTATS(b)->workername);
}
}
}
return c;
}
/* order by userid asc,workername asc
temporary tree for summing userstats when sending user homepage info */
static double cmp_userstats_workername(K_ITEM *a, K_ITEM *b)
{
double c = (double)(DATA_USERSTATS(a)->userid) -
(double)(DATA_USERSTATS(b)->userid);
if (c == 0) {
c = (double)strcmp(DATA_USERSTATS(a)->workername,
DATA_USERSTATS(b)->workername);
}
return c;
}
static bool userstats_add(char *poolinstance, char *elapsed, char *username,
char *hashrate, char *hashrate5m, char *hashrate1hr,
char *hashrate24hr, tv_t *now, char *by, char *code,
char *inet)
char *workername, char *hashrate, char *hashrate5m,
char *hashrate1hr, char *hashrate24hr, bool eos,
tv_t *now, char *by, char *code, char *inet)
{
K_ITEM *s_item, *u_item;
K_ITEM *us_item, *u_item, *us_match, *us_next;
USERSTATS *row;
tv_t createdate;
LOGDEBUG("%s(): add", __func__);
K_WLOCK(userstats_list);
s_item = k_unlink_head(userstats_list);
us_item = k_unlink_head(userstats_list);
K_WUNLOCK(userstats_list);
row = DATA_USERSTATS(s_item);
row = DATA_USERSTATS(us_item);
STRNCPY(row->poolinstance, poolinstance);
TXT_TO_BIGINT("elapsed", elapsed, row->elapsed);
@ -3166,18 +3237,71 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username,
if (!u_item)
return false;
row->userid = DATA_USERS(u_item)->userid;
TXT_TO_STR("workername", workername, row->workername);
TXT_TO_DOUBLE("hashrate", hashrate, row->hashrate);
TXT_TO_DOUBLE("hashrate5m", hashrate5m, row->hashrate5m);
TXT_TO_DOUBLE("hashrate1hr", hashrate1hr, row->hashrate1hr);
TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr);
SIMPLEDATEINIT(row, now, by, code, inet);
SIMPLEDATETRANSFER(row);
if (eos) {
// Save it for end processing
createdate.tv_sec = row->createdate.tv_sec;
createdate.tv_usec = row->createdate.tv_usec;
}
/* group at full key: userid,createdate,poolinstance,workername
i.e. ignore instance and group together down at workername */
us_match = userstats_eos_store->head;
while (us_match && cmp_userstats(us_item, us_match) != 0.0)
us_match = us_match->next;
if (us_match) {
DATA_USERSTATS(us_match)->hashrate += row->hashrate;
DATA_USERSTATS(us_match)->hashrate5m += row->hashrate5m;
DATA_USERSTATS(us_match)->hashrate1hr += row->hashrate1hr;
DATA_USERSTATS(us_match)->hashrate24hr += row->hashrate24hr;
// Minimum elapsed of the data set
if (DATA_USERSTATS(us_match)->elapsed > row->elapsed)
DATA_USERSTATS(us_match)->elapsed = row->elapsed;
// Unused
K_WLOCK(userstats_list);
userstats_root = add_to_ktree(userstats_root, s_item, cmp_userstats);
k_add_head(userstats_store, s_item);
k_add_head(userstats_list, us_item);
K_WUNLOCK(userstats_list);
} else {
// New worker
K_WLOCK(userstats_list);
k_add_head(userstats_eos_store, us_item);
K_WUNLOCK(userstats_list);
}
if (eos) {
K_WLOCK(userstats_list);
us_next = userstats_eos_store->head;
while (us_next) {
if (tvdiff(&DATA_USERSTATS(us_next)->createdate, &createdate) != 0.0) {
char date_buf[DATE_BUFSIZ];
LOGERR("userstats != eos '%s' discarded: %s/%"PRId64"/%s",
tv_to_buf(&createdate, date_buf, DATE_BUFSIZ),
DATA_USERSTATS(us_next)->poolinstance,
DATA_USERSTATS(us_next)->userid,
DATA_USERSTATS(us_next)->workername);
us_next = us_next->next;
} else {
us_match = us_next;
us_next = us_match->next;
k_unlink_item(userstats_eos_store, us_match);
userstats_root = add_to_ktree(userstats_root, us_match,
cmp_userstats);
k_add_head(userstats_store, us_match);
}
}
// Discard them
if (userstats_eos_store->count > 0)
k_list_transfer_to_head(userstats_eos_store, userstats_list);
K_WUNLOCK(userstats_list);
}
return true;
}
@ -3338,6 +3462,7 @@ static void setup_data()
userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_list);
userstats_eos_store = k_new_store(userstats_list);
userstats_root = new_ktree();
userstats_list->dsp_func = dsp_userstats;
@ -3531,9 +3656,10 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
// log to logfile
K_ITEM *i_poolinstance, *i_elapsed, *i_username, *i_hashrate;
K_ITEM *i_hashrate5m, *i_hashrate1hr, *i_hashrate24hr;
bool ok = false;
K_ITEM *i_poolinstance, *i_elapsed, *i_username, *i_workername;
K_ITEM *i_hashrate, *i_hashrate5m, *i_hashrate1hr, *i_hashrate24hr;
K_ITEM *i_eos;
bool ok = false, eos;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -3549,6 +3675,10 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
if (!i_username)
return strdup(reply);
i_workername = require_name("workername", 1, NULL, reply, siz);
if (!i_workername)
return strdup(reply);
i_hashrate = require_name("hashrate", 1, NULL, reply, siz);
if (!i_hashrate)
return strdup(reply);
@ -3565,14 +3695,21 @@ static char *cmd_userstats(char *cmd, __maybe_unused char *id, tv_t *now, char *
if (!i_hashrate24hr)
return strdup(reply);
i_eos = require_name("eos", 1, NULL, reply, siz);
if (!i_eos)
return strdup(reply);
eos = (strcasecmp(DATA_TRANSFER(i_eos)->data, TRUE_STR) == 0);
ok = userstats_add(DATA_TRANSFER(i_poolinstance)->data,
DATA_TRANSFER(i_elapsed)->data,
DATA_TRANSFER(i_username)->data,
DATA_TRANSFER(i_workername)->data,
DATA_TRANSFER(i_hashrate)->data,
DATA_TRANSFER(i_hashrate5m)->data,
DATA_TRANSFER(i_hashrate1hr)->data,
DATA_TRANSFER(i_hashrate24hr)->data,
now, by, code, inet);
eos, now, by, code, inet);
if (!ok) {
LOGDEBUG("%s.failed.DATA", id);
@ -3974,10 +4111,13 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
__maybe_unused char *code, __maybe_unused char *inet)
{
K_ITEM *i_username, *u_item, *p_item, *us_item, look;
K_TREE_CTX ctx[1];
USERSTATS userstats;
double u_hashrate5m, u_hashrate1hr;
char reply[1024], tmp[1024], *buf;
USERSTATS userstats;
int64_t u_elapsed;
K_TREE_CTX ctx[1], wctx[1];
size_t len, off;
bool has_uhr;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -4047,27 +4187,49 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe
if (i_username)
u_item = find_users(DATA_TRANSFER(i_username)->data);
us_item = NULL;
has_uhr = false;
if (p_item && u_item) {
STRNCPY(userstats.poolinstance,
DATA_POOLSTATS(p_item)->poolinstance);
K_TREE *userstats_workername_root = new_ktree();
u_hashrate5m = u_hashrate1hr = 0.0;
u_elapsed = -1;
// find last stored userid record
userstats.userid = DATA_USERS(u_item)->userid;
userstats.createdate.tv_sec = date_eot.tv_sec;
userstats.createdate.tv_usec = date_eot.tv_usec;
// find/cmp doesn't get to here
STRNCPY(userstats.poolinstance, "");
STRNCPY(userstats.workername, "");
look.data = (void *)(&userstats);
us_item = find_before_in_ktree(userstats_root, &look, cmp_userstats, ctx);
}
if (us_item) {
double_to_buf(DATA_USERSTATS(us_item)->hashrate5m, reply, sizeof(reply));
while (us_item &&
DATA_USERSTATS(us_item)->userid == userstats.userid &&
tvdiff(now, &(DATA_USERSTATS(us_item)->createdate)) < USERSTATS_PER_S) {
if (!find_in_ktree(userstats_workername_root, us_item, cmp_userstats_workername, wctx)) {
u_hashrate5m += DATA_USERSTATS(us_item)->hashrate5m;
u_hashrate1hr += DATA_USERSTATS(us_item)->hashrate1hr;
if (u_elapsed == -1 ||
u_elapsed > DATA_USERSTATS(us_item)->elapsed)
u_elapsed = DATA_USERSTATS(us_item)->elapsed;
has_uhr = true;
userstats_workername_root = add_to_ktree(userstats_workername_root,
us_item,
cmp_userstats_workername);
}
us_item = prev_in_ktree(ctx);
}
userstats_workername_root = free_ktree(userstats_workername_root, NULL);
}
if (has_uhr) {
double_to_buf(u_hashrate5m, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate5m=%s%c", reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(DATA_USERSTATS(us_item)->hashrate1hr, reply, sizeof(reply));
double_to_buf(u_hashrate1hr, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_hashrate1hr=%s%c", reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
bigint_to_buf(DATA_USERSTATS(us_item)->elapsed, reply, sizeof(reply));
bigint_to_buf(u_elapsed, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "u_elapsed=%s", reply);
APPEND_REALLOC(buf, off, len, tmp);
} else {
@ -4119,6 +4281,7 @@ enum cmd_values {
CMD_CHKPASS,
CMD_POOLSTAT,
CMD_USERSTAT,
// CMD_BLOCK,
CMD_NEWID,
CMD_PAYMENTS,
CMD_HOMEPAGE,
@ -4126,6 +4289,7 @@ enum cmd_values {
CMD_END
};
// TODO: limit access
#define ACCESS_POOL "p"
#define ACCESS_SYSTEM "s"
#define ACCESS_WEB "w"
@ -4138,7 +4302,7 @@ static struct CMDS {
char *access;
} cmds[] = {
{ CMD_SHUTDOWN, "shutdown", NULL, ACCESS_SYSTEM },
{ CMD_PING, "ping", NULL, ACCESS_SYSTEM ACCESS_WEB },
{ CMD_PING, "ping", NULL, ACCESS_SYSTEM ACCESS_POOL ACCESS_WEB },
{ CMD_SHARELOG, "workinfo", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shares", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shareerrors", cmd_sharelog, ACCESS_POOL },
@ -4147,6 +4311,7 @@ static struct CMDS {
{ CMD_CHKPASS, "chkpass", cmd_chkpass, ACCESS_WEB },
{ CMD_POOLSTAT, "poolstats", cmd_poolstats, ACCESS_POOL },
{ CMD_USERSTAT, "userstats", cmd_userstats, ACCESS_POOL },
// TODO { CMD_BLOCK, "block", cmd_block, ACCESS_POOL },
{ CMD_NEWID, "newid", cmd_newid, ACCESS_SYSTEM },
{ CMD_PAYMENTS, "payments", cmd_payments, ACCESS_WEB },
{ CMD_HOMEPAGE, "homepage", cmd_homepage, ACCESS_WEB },
@ -4168,7 +4333,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
if (!idptr || !*idptr) {
STRNCPYSIZ(cmd, cmdptr, CMD_SIZ);
STRNCPYSIZ(id, cmdptr, ID_SIZ);
LOGINFO("Listener received invalid message: '%s'", buf);
LOGERR("Listener received invalid message: '%s'", buf);
free(cmdptr);
return CMD_REPLY;
}
@ -4186,7 +4351,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
}
if (cmds[*which_cmds].cmd_val == CMD_END) {
LOGINFO("Listener received unknown command: '%s'", buf);
LOGERR("Listener received unknown command: '%s'", buf);
free(cmdptr);
return CMD_REPLY;
}
@ -4198,12 +4363,14 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
void *json_iter;
const char *json_key, *json_str;
json_t *json_value;
int json_typ;
size_t siz;
bool ok;
next += JSON_TRANSFER_LEN;
json_data = json_loads(next, JSON_DISABLE_EOF_CHECK, &err_val);
if (!json_data) {
LOGINFO("Json decode error from command: '%s'", cmd);
LOGERR("Json decode error from command: '%s'", cmd);
free(cmdptr);
return CMD_REPLY;
}
@ -4212,14 +4379,11 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
while (json_iter) {
json_key = json_object_iter_key(json_iter);
json_value = json_object_iter_value(json_iter);
if (json_is_string(json_value) ||
json_is_integer(json_value) ||
json_is_real(json_value) ||
json_is_array(json_value)) {
item = k_unlink_head(transfer_list);
STRNCPY(DATA_TRANSFER(item)->name, json_key);
if (json_is_string(json_value)) {
ok = true;
json_typ = json_typeof(json_value);
switch (json_typ) {
case JSON_STRING:
json_str = json_string_value(json_value);
siz = strlen(json_str);
if (siz >= sizeof(DATA_TRANSFER(item)->value))
@ -4228,19 +4392,31 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
STRNCPY(DATA_TRANSFER(item)->value, json_str);
DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value;
}
} else if (json_is_integer(json_value)) {
break;
case JSON_REAL:
snprintf(DATA_TRANSFER(item)->value,
sizeof(DATA_TRANSFER(item)->value),
"%f", json_real_value(json_value));
DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value;
break;
case JSON_INTEGER:
snprintf(DATA_TRANSFER(item)->value,
sizeof(DATA_TRANSFER(item)->value),
"%"PRId64,
(int64_t)json_integer_value(json_value));
DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value;
} else if (json_is_real(json_value)) {
break;
case JSON_TRUE:
case JSON_FALSE:
snprintf(DATA_TRANSFER(item)->value,
sizeof(DATA_TRANSFER(item)->value),
"%f", json_real_value(json_value));
"%s", (json_typ == JSON_TRUE) ?
TRUE_STR : FALSE_STR);
DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value;
} else {
/* Array - only one level array of strings for now (merkletree)
break;
case JSON_ARRAY:
{
/* only one level array of strings for now (merkletree)
* ignore other data */
size_t i, len, off, count = json_array_size(json_value);
json_t *json_element;
@ -4264,11 +4440,23 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
}
APPEND_REALLOC(DATA_TRANSFER(item)->data,
off, len, json_str);
} else
LOGERR("%s() unhandled json type %d in array %s"
" in cmd %s", __func__,
json_typ, json_key, cmd);
}
}
break;
default:
LOGERR("%s() unhandled json type %d in cmd %s",
__func__, json_typ, cmd);
ok = false;
break;
}
if (find_in_ktree(transfer_root, item, cmp_transfer, ctx)) {
if (ok)
STRNCPY(DATA_TRANSFER(item)->name, json_key);
if (!ok || find_in_ktree(transfer_root, item, cmp_transfer, ctx)) {
if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value)
free(DATA_TRANSFER(item)->data);
k_add_head(transfer_list, item);
@ -4276,7 +4464,6 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id
transfer_root = add_to_ktree(transfer_root, item, cmp_transfer);
k_add_head(transfer_store, item);
}
}
json_iter = json_object_iter_next(json_data, json_iter);
}
K_WUNLOCK(transfer_list);
@ -4381,7 +4568,6 @@ static void *listener(void *arg)
ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code",
(char *)__func__,
(char *)"127.0.0.1");
siz = strlen(ans) + strlen(id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans);

Loading…
Cancel
Save