Browse Source

ckdb - standardise messages and replies

master
kanoi 11 years ago
parent
commit
532405b300
  1. 230
      src/ckdb.c

230
src/ckdb.c

@ -43,6 +43,7 @@ static char *db_user;
static char *db_pass;
// size limit on the command string
#define CMD_SIZ 31
#define ID_SIZ 31
#define TXT_BIG 256
@ -403,16 +404,10 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$";
#define JSON_TRANSFER "json="
#define JSON_TRANSFER_LEN (sizeof(JSON_TRANSFER)-1)
// JSON Methods
// Methods for sharelog (common function for all 3)
#define METHOD_WORKINFO "workinfo"
#define METHOD_SHARES "shares"
#define METHOD_SHAREERRORS "shareerror"
#define METHOD_AUTH "authorise"
// LOGFILE codes - should be in libckpool.h ... with the file logging code
#define CODE_WORKINFO "W"
#define CODE_SHARES "S"
#define CODE_SHAREERRORSS "E"
// TRANSFER
#define NAME_SIZE 63
@ -833,7 +828,6 @@ static K_STORE *auths_store;
typedef struct poolstats {
char poolinstance[TXT_BIG+1];
tv_t when;
int32_t users;
int32_t workers;
double hashrate;
@ -851,6 +845,26 @@ static K_TREE *poolstats_root;
static K_LIST *poolstats_list;
static K_STORE *poolstats_store;
// USERSTATS
// Pool sends each user (staggered) once per 10m
typedef struct userstats {
char poolinstance[TXT_BIG+1];
int64_t userid;
double hashrate;
double hashrate5m;
double hashrate1hr;
double hashrate24hr;
SIMPLEDATECONTROLFIELDS;
} USERSTATS;
#define ALLOC_USERSTATS 1000
#define LIMIT_USERSTATS 0
#define DATA_USERSTATS(_item) ((USERSTATS *)(_item->data))
static K_TREE *userstats_root;
static K_LIST *userstats_list;
static K_STORE *userstats_store;
static void setnow(tv_t *now)
{
ts_t spec;
@ -3041,10 +3055,14 @@ static void setup_data()
poolstats_store = k_new_store(poolstats_list);
poolstats_root = new_ktree();
userstats_list = k_new_list("UserStats", sizeof(USERSTATS), ALLOC_USERSTATS, LIMIT_USERSTATS, true);
userstats_store = k_new_store(userstats_list);
userstats_root = new_ktree();
getdata();
}
static char *cmd_adduser(char *id, tv_t *now, char *by, char *code, char *inet)
static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@ -3053,6 +3071,8 @@ static char *cmd_adduser(char *id, tv_t *now, char *by, char *code, char *inet)
PGconn *conn;
bool ok;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_username = require_name("username", 3, (char *)userpatt, reply, siz);
if (!i_username)
return strdup(reply);
@ -3073,17 +3093,15 @@ static char *cmd_adduser(char *id, tv_t *now, char *by, char *code, char *inet)
PQfinish(conn);
if (!ok) {
STRNCPY(reply, "failed.DBE");
return strdup(reply);
LOGDEBUG("%s.failed.DBE", id);
return strdup("failed.DBE");
}
LOGDEBUG("%s.added.%s", id, DATA_TRANSFER(i_username)->data);
snprintf(reply, siz, "added.%s", DATA_TRANSFER(i_username)->data);
LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_username)->data);
snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_username)->data);
return strdup(reply);
}
static char *cmd_chkpass(char *id, __maybe_unused tv_t *now, __maybe_unused char *by,
static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet)
{
K_ITEM *i_username, *i_passwordhash, *u_item;
@ -3091,6 +3109,8 @@ static char *cmd_chkpass(char *id, __maybe_unused tv_t *now, __maybe_unused char
size_t siz = sizeof(reply);
bool ok;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_username = require_name("username", 3, (char *)userpatt, reply, siz);
if (!i_username)
return strdup(reply);
@ -3110,14 +3130,15 @@ static char *cmd_chkpass(char *id, __maybe_unused tv_t *now, __maybe_unused char
ok = false;
}
if (!ok)
return strdup("bad");
LOGDEBUG("%s.login.%s", id, DATA_TRANSFER(i_username)->data);
return strdup("ok");
if (!ok) {
LOGDEBUG("%s.failed.%s", id, DATA_TRANSFER(i_username)->data);
return strdup("failed.");
}
LOGDEBUG("%s.ok.%s", id, DATA_TRANSFER(i_username)->data);
return strdup("ok.");
}
static char *cmd_poolstats(__maybe_unused char *id, tv_t *now, char *by, char *code, char *inet)
static char *cmd_poolstats(char *cmd, __maybe_unused char *id, tv_t *now, char *by, char *code, char *inet)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@ -3133,6 +3154,8 @@ static char *cmd_poolstats(__maybe_unused char *id, tv_t *now, char *by, char *c
POOLSTATS row;
bool ok = false;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz);
if (!i_poolinstance)
return strdup(reply);
@ -3191,16 +3214,15 @@ static char *cmd_poolstats(__maybe_unused char *id, tv_t *now, char *by, char *c
PQfinish(conn);
if (!ok) {
STRNCPY(reply, "bad.DBE");
return strdup(reply);
LOGDEBUG("%s.failed.DBE", id);
return strdup("failed.DBE");
}
LOGDEBUG("%s.added.ok", id);
snprintf(reply, siz, "added.ok");
LOGDEBUG("%s.ok.", id);
snprintf(reply, siz, "ok.");
return strdup(reply);
}
static char *cmd_newid(__maybe_unused char *id, tv_t *now, char *by, char *code, char *inet)
static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
@ -3215,7 +3237,7 @@ static char *cmd_newid(__maybe_unused char *id, tv_t *now, char *by, char *code,
char *ins;
int n;
LOGDEBUG("%s(): add", __func__);
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_idname = require_name("idname", 3, (char *)idpatt, reply, siz);
if (!i_idname)
@ -3265,16 +3287,16 @@ foil:
K_WUNLOCK(idcontrol_list);
if (!ok) {
snprintf(reply, siz, "failed.DBE");
return strdup(reply);
LOGDEBUG("%s.failed.DBE", id);
return strdup("failed.DBE");
}
LOGDEBUG("added.%s", DATA_TRANSFER(i_idname)->data);
snprintf(reply, siz, "added.%s", DATA_TRANSFER(i_idname)->data);
LOGDEBUG("%s.ok.added %s %"PRId64, id, DATA_TRANSFER(i_idname)->data, row->lastid);
snprintf(reply, siz, "ok.added %s %"PRId64,
DATA_TRANSFER(i_idname)->data, row->lastid);
return strdup(reply);
}
static char *cmd_payments(char *id, __maybe_unused tv_t *now, __maybe_unused char *by,
static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by,
__maybe_unused char *code, __maybe_unused char *inet)
{
K_ITEM *i_username, *look, *u_item, *p_item;
@ -3287,6 +3309,8 @@ static char *cmd_payments(char *id, __maybe_unused tv_t *now, __maybe_unused cha
size_t len, off;
int rows;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_username = require_name("username", 3, (char *)userpatt, reply, siz);
if (!i_username)
return strdup(reply);
@ -3333,24 +3357,21 @@ static char *cmd_payments(char *id, __maybe_unused tv_t *now, __maybe_unused cha
k_add_head(payments_list, look);
K_WUNLOCK(payments_list);
LOGDEBUG("%s.payments.%s", id, DATA_TRANSFER(i_username)->data);
LOGDEBUG("%s.ok.%s", id, DATA_TRANSFER(i_username)->data);
return buf;
}
static char *cmd_sharelog(__maybe_unused char *id, tv_t *now, char *by, char *code, char *inet)
static char *cmd_sharelog(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
K_ITEM *i_method;
PGconn *conn;
// log to logfile with processing success/failure code
i_method = require_name("method", 1, NULL, reply, siz);
if (!i_method)
return strdup(reply);
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
if (strcasecmp(DATA_TRANSFER(i_method)->data, METHOD_WORKINFO) == 0) {
if (strcasecmp(cmd, METHOD_WORKINFO) == 0) {
K_ITEM *i_workinfoid, *i_poolinstance, *i_transactiontree, *i_merklehash;
K_ITEM *i_prevhash, *i_coinbase1, *i_coinbase2, *i_version, *i_bits;
K_ITEM *i_ntime, *i_reward;
@ -3416,14 +3437,13 @@ static char *cmd_sharelog(__maybe_unused char *id, tv_t *now, char *by, char *co
PQfinish(conn);
if (workinfoid == -1) {
STRNCPY(reply, "bad.DBE");
return strdup(reply);
LOGDEBUG("%s.failed.DBE", id);
return strdup("failed.DBE");
}
LOGDEBUG("added.%s.%"PRId64, DATA_TRANSFER(i_method)->data, workinfoid);
snprintf(reply, siz, "added.%"PRId64, workinfoid);
LOGDEBUG("%s.ok.added %"PRId64, id, workinfoid);
snprintf(reply, siz, "ok.%"PRId64, workinfoid);
return strdup(reply);
} else if (strcasecmp(DATA_TRANSFER(i_method)->data, METHOD_SHARES) == 0) {
} else if (strcasecmp(cmd, METHOD_SHARES) == 0) {
K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_enonce1;
K_ITEM *i_nonce2, *i_nonce, *i_diff, *i_sdiff, *i_secondaryuserid;
bool ok;
@ -3479,16 +3499,15 @@ static char *cmd_sharelog(__maybe_unused char *id, tv_t *now, char *by, char *co
DATA_TRANSFER(i_sdiff)->data,
DATA_TRANSFER(i_secondaryuserid)->data,
now, by, code, inet);
if (!ok) {
STRNCPY(reply, "bad.DATA");
return strdup(reply);
LOGDEBUG("%s.failed.DATA", id);
return strdup("failed.DATA");
}
LOGDEBUG("added.%s.%s", DATA_TRANSFER(i_method)->data,
DATA_TRANSFER(i_nonce)->data);
snprintf(reply, siz, "added.%s", DATA_TRANSFER(i_nonce)->data);
LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_nonce)->data);
snprintf(reply, siz, "ok,added %s", DATA_TRANSFER(i_nonce)->data);
return strdup(reply);
} else if (strcasecmp(DATA_TRANSFER(i_method)->data, METHOD_SHAREERRORS) == 0) {
} else if (strcasecmp(cmd, METHOD_SHAREERRORS) == 0) {
K_ITEM *i_workinfoid, *i_username, *i_workername, *i_clientid, *i_errn;
K_ITEM *i_error, *i_secondaryuserid;
bool ok;
@ -3530,35 +3549,27 @@ static char *cmd_sharelog(__maybe_unused char *id, tv_t *now, char *by, char *co
DATA_TRANSFER(i_secondaryuserid)->data,
now, by, code, inet);
if (!ok) {
STRNCPY(reply, "bad.DATA");
return strdup(reply);
LOGDEBUG("%s.failed.DATA", id);
return strdup("failed.DATA");
}
LOGDEBUG("added.%s.%s", DATA_TRANSFER(i_method)->data,
DATA_TRANSFER(i_username)->data);
snprintf(reply, siz, "added.%s", DATA_TRANSFER(i_username)->data);
return strdup(reply);
LOGDEBUG("%s.ok.added %s", id, DATA_TRANSFER(i_username)->data);
snprintf(reply, siz, "ok.added %s", DATA_TRANSFER(i_username)->data);
}
STRNCPY(reply, "bad.method");
return strdup(reply);
LOGDEBUG("%s.bad.cmd %s", cmd);
return strdup("bad.cmd");
}
static char *cmd_auth(__maybe_unused char *id, tv_t *now, char *by, char *code, char *inet)
static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
K_ITEM *i_method;
PGconn *conn;
i_method = require_name("method", 1, NULL, reply, siz);
if (!i_method)
return strdup(reply);
if (strcasecmp(DATA_TRANSFER(i_method)->data, METHOD_AUTH) == 0) {
K_ITEM *i_username, *i_workername, *i_clientid, *i_enonce1, *i_useragent;
char *secuserid;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_username = require_name("username", 1, NULL, reply, siz);
if (!i_username)
return strdup(reply);
@ -3589,16 +3600,12 @@ static char *cmd_auth(__maybe_unused char *id, tv_t *now, char *by, char *code,
PQfinish(conn);
if (!secuserid) {
STRNCPY(reply, "bad.DBE");
return strdup(reply);
}
LOGDEBUG("added.%s.%s", DATA_TRANSFER(i_method)->data, secuserid);
snprintf(reply, siz, "added.%s", secuserid);
return strdup(reply);
LOGDEBUG("%s.bad.DBE", id);
return strdup("bad.DBE");
}
STRNCPY(reply, "bad.method");
LOGDEBUG("%s.ok.added, %s", id, secuserid);
snprintf(reply, siz, "ok.%s", secuserid);
return strdup(reply);
}
@ -3607,7 +3614,7 @@ enum cmd_values {
CMD_REPLY, // Means something was wrong - send back reply
CMD_SHUTDOWN,
CMD_PING,
CMD_LOGSHARE,
CMD_SHARELOG,
CMD_AUTH,
CMD_ADDUSER,
CMD_CHKPASS,
@ -3625,44 +3632,49 @@ enum cmd_values {
static struct CMDS {
enum cmd_values cmd_val;
char *cmd_str;
char *(*func)(char *, tv_t *, char *, char *, char *);
char *(*func)(char *, char *, tv_t *, char *, char *, char *);
char *access;
} cmds[] = {
{ CMD_SHUTDOWN, "shutdown", NULL, ACCESS_SYSTEM },
{ CMD_PING, "ping", NULL, ACCESS_SYSTEM ACCESS_WEB },
// Workinfo, Shares and Shareerrors
{ CMD_LOGSHARE, "sharelog", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "workinfo", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shares", cmd_sharelog, ACCESS_POOL },
{ CMD_SHARELOG, "shareerrors", cmd_sharelog, ACCESS_POOL },
{ CMD_AUTH, "authorise", cmd_auth, ACCESS_POOL },
{ CMD_ADDUSER, "adduser", cmd_adduser, ACCESS_WEB },
{ CMD_CHKPASS, "chkpass", cmd_chkpass, ACCESS_WEB },
{ CMD_POOLSTAT, "poolstats", cmd_poolstats, ACCESS_WEB },
{ CMD_POOLSTAT, "poolstats", cmd_poolstats, ACCESS_POOL },
// { CMD_USERSTAT, "userstats", cmd_userstats, ACCESS_POOL },
{ CMD_NEWID, "newid", cmd_newid, ACCESS_SYSTEM },
{ CMD_PAYMENTS, "payments", cmd_payments, ACCESS_WEB },
{ CMD_END, NULL, NULL, NULL }
};
// TODO: size limits?
static enum cmd_values breakdown(char *buf, int *which_cmds, char *id)
static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id)
{
K_TREE_CTX ctx[1];
K_ITEM *item;
char *copy, *cmd, *data, *next, *eq;
char *cmdptr, *idptr, *data, *next, *eq;
*which_cmds = CMD_UNSET;
copy = strdup(buf);
cmd = strchr(copy, '.');
if (!cmd || !*cmd) {
STRNCPYSIZ(id, copy, ID_SIZ);
*cmd = *id = '\0';
cmdptr = strdup(buf);
idptr = strchr(cmdptr, '.');
if (!idptr || !*idptr) {
STRNCPYSIZ(cmd, cmdptr, CMD_SIZ);
STRNCPYSIZ(id, cmdptr, ID_SIZ);
LOGINFO("Listener received invalid message: '%s'", buf);
free(copy);
free(cmdptr);
return CMD_REPLY;
}
*(cmd++) = '\0';
STRNCPYSIZ(id, copy, ID_SIZ);
data = strchr(cmd, '.');
*(idptr++) = '\0';
STRNCPYSIZ(cmd, cmdptr, CMD_SIZ);
data = strchr(idptr, '.');
if (data)
*(data++) = '\0';
STRNCPYSIZ(id, idptr, ID_SIZ);
for (*which_cmds = 0; cmds[*which_cmds].cmd_val != CMD_END; (*which_cmds)++) {
if (strcasecmp(cmd, cmds[*which_cmds].cmd_str) == 0)
@ -3671,7 +3683,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *id)
if (cmds[*which_cmds].cmd_val == CMD_END) {
LOGINFO("Listener received unknown command: '%s'", buf);
free(copy);
free(cmdptr);
return CMD_REPLY;
}
@ -3688,7 +3700,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *id)
json_data = json_loads(next, JSON_DISABLE_EOF_CHECK, &err_val);
if (!json_data) {
LOGINFO("Json decode error from command: '%s'", cmd);
free(copy);
free(cmdptr);
return CMD_REPLY;
}
json_iter = json_object_iter(json_data);
@ -3796,7 +3808,7 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *id)
K_WUNLOCK(transfer_list);
}
free(copy);
free(cmdptr);
return cmds[*which_cmds].cmd_val;
}
@ -3806,8 +3818,8 @@ static void *listener(void *arg)
proc_instance_t *pi = (proc_instance_t *)arg;
unixsock_t *us = &pi->us;
char *end, *ans, *rep, *buf = NULL;
char id[ID_SIZ+1], reply[1024+1];
enum cmd_values cmd;
char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1];
enum cmd_values cmdnum;
int sockd, which_cmds;
K_ITEM *item;
size_t siz;
@ -3825,7 +3837,7 @@ static void *listener(void *arg)
break;
}
cmd = CMD_UNSET;
cmdnum = CMD_UNSET;
buf = recv_unix_msg(sockd);
// Once we've read the message
@ -3843,25 +3855,25 @@ static void *listener(void *arg)
else
LOGWARNING("Empty message in listener");
} else {
cmd = breakdown(buf, &which_cmds, id);
switch (cmd) {
cmdnum = breakdown(buf, &which_cmds, cmd, id);
switch (cmdnum) {
case CMD_REPLY:
snprintf(reply, sizeof(reply), "%s.%ld.?", id, now.tv_sec);
snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec);
send_unix_msg(sockd, reply);
break;
case CMD_SHUTDOWN:
LOGWARNING("Listener received shutdown message, terminating ckdb");
snprintf(reply, sizeof(reply), "%s.%d.exiting", id, (int)now.tv_sec);
snprintf(reply, sizeof(reply), "%s.%d.ok.exiting", id, (int)now.tv_sec);
send_unix_msg(sockd, reply);
break;
case CMD_PING:
LOGDEBUG("Listener received ping request");
snprintf(reply, sizeof(reply), "%s.%ld.pong", id, now.tv_sec);
snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec);
send_unix_msg(sockd, reply);
break;
default:
// TODO: optionally get by/code/inet from transfer here instead?
ans = cmds[which_cmds].func(id, &now, (char *)"code",
ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code",
(char *)__func__,
(char *)"127.0.0.1");
@ -3878,7 +3890,7 @@ static void *listener(void *arg)
}
close(sockd);
if (cmd == CMD_SHUTDOWN)
if (cmdnum == CMD_SHUTDOWN)
break;
K_WLOCK(transfer_list);

Loading…
Cancel
Save