Browse Source

ckdb - add functions for talking to bitcoind - and an orphan check to flag orphans automatically

master
kanoi 10 years ago
parent
commit
4749642121
  1. 4
      src/Makefile.am
  2. 72
      src/ckdb.c
  3. 33
      src/ckdb.h
  4. 316
      src/ckdb_btc.c
  5. 41
      src/ckdb_data.c

4
src/Makefile.am

@ -21,7 +21,7 @@ notifier_LDADD = libckpool.la @JANSSON_LIBS@
if WANT_CKDB
bin_PROGRAMS += ckdb
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb.h \
klist.c ktree.c klist.h ktree.h
ckdb_SOURCES = ckdb.c ckdb_cmd.c ckdb_data.c ckdb_dbio.c ckdb_btc.c \
ckdb.h klist.c ktree.c klist.h ktree.h
ckdb_LDADD = libckpool.la @JANSSON_LIBS@ @PQ_LIBS@
endif

72
src/ckdb.c

@ -272,8 +272,13 @@ static cklock_t fpm_lock;
static char *first_pool_message;
static sem_t socketer_sem;
char *btc_server = "http://127.0.0.1:8330";
char *btc_auth;
int btc_timeout = 5;
char *by_default = "code";
char *inet_default = "127.0.0.1";
char *id_default = "42";
// LOGQUEUE
K_LIST *logqueue_free;
@ -373,7 +378,7 @@ K_STORE *sharesummary_store;
// BLOCKS block.id.json={...}
const char *blocks_new = "New";
const char *blocks_confirm = "1-Confirm";
const char *blocks_42 = "42-Confirm";
const char *blocks_42 = "Matured";
const char *blocks_orphan = "Orphan";
const char *blocks_unknown = "?Unknown?";
@ -542,7 +547,7 @@ void logmsg(int loglevel, const char *fmt, ...)
free(buf);
}
static void setnow(tv_t *now)
void setnow(tv_t *now)
{
ts_t spec;
spec.tv_sec = 0;
@ -1257,6 +1262,33 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
return ckdb_cmds[*which_cmds].cmd_val;
}
static void check_blocks()
{
K_TREE_CTX ctx[1];
K_ITEM *b_item;
BLOCKS *blocks;
K_RLOCK(blocks_free);
// Find the oldest block BLOCKS_NEW or BLOCKS_CONFIRM
b_item = first_in_ktree(blocks_root, ctx);
while (b_item) {
DATA_BLOCKS(blocks, b_item);
if (!blocks->ignore &&
CURRENT(&(blocks->expirydate)) &&
(blocks->confirmed[0] == BLOCKS_NEW ||
blocks->confirmed[0] == BLOCKS_CONFIRM))
break;
b_item = next_in_ktree(ctx);
}
K_RUNLOCK(blocks_free);
// None
if (!b_item)
return;
btc_blockstatus(blocks);
}
static void summarise_blocks()
{
K_ITEM *b_item, *b_prev, *wi_item, ss_look, *ss_item;
@ -1597,8 +1629,10 @@ static void *summariser(__maybe_unused void *arg)
while (!everyone_die) {
sleep(5);
if (!everyone_die)
if (!everyone_die) {
check_blocks();
summarise_blocks();
}
sleep(4);
if (!everyone_die)
@ -3027,10 +3061,14 @@ static struct option long_options[] = {
{ "loglevel", required_argument, 0, 'l' },
{ "name", required_argument, 0, 'n' },
{ "dbpass", required_argument, 0, 'p' },
{ "btc-pass", required_argument, 0, 'P' },
{ "ckpool-logdir", required_argument, 0, 'r' },
{ "logdir", required_argument, 0, 'R' },
{ "sockdir", required_argument, 0, 's' },
{ "btc-server", required_argument, 0, 'S' },
{ "btc-timeout", required_argument, 0, 't' },
{ "dbuser", required_argument, 0, 'u' },
{ "btc-user", required_argument, 0, 'U' },
{ "version", no_argument, 0, 'v' },
{ "confirm", no_argument, 0, 'y' },
{ "confirmrange", required_argument, 0, 'Y' },
@ -3048,6 +3086,8 @@ static void sighandler(int sig)
int main(int argc, char **argv)
{
struct sigaction handler;
char *btc_user = "user";
char *btc_pass = "p";
char buf[512];
ckpool_t ckp;
int c, ret, i = 0, j;
@ -3062,7 +3102,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:r:R:s:u:vyY:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "c:d:hkl:n:O:p:P:r:R:s:S:t:T:u:U:vyY:", long_options, &i)) != -1) {
switch(c) {
case 'c':
ckp.config = strdup(optarg);
@ -3111,6 +3151,14 @@ int main(int argc, char **argv)
while (*kill)
*(kill++) = '\0';
break;
case 'P':
btc_pass = strdup(optarg);
kill = optarg;
if (*kill)
*(kill++) = ' ';
while (*kill)
*(kill++) = '\0';
break;
case 'r':
restorefrom = strdup(optarg);
break;
@ -3120,12 +3168,24 @@ int main(int argc, char **argv)
case 's':
ckp.socket_dir = strdup(optarg);
break;
case 'S':
btc_server = strdup(optarg);
break;
case 't':
btc_timeout = atoi(optarg);
break;
case 'u':
db_user = strdup(optarg);
kill = optarg;
while (*kill)
*(kill++) = ' ';
break;
case 'U':
btc_user = strdup(optarg);
kill = optarg;
while (*kill)
*(kill++) = ' ';
break;
case 'v':
exit(0);
case 'y':
@ -3139,6 +3199,10 @@ int main(int argc, char **argv)
}
}
snprintf(buf, sizeof(buf), "%s:%s", btc_user, btc_pass);
btc_auth = http_base64(buf);
bzero(buf, sizeof(buf));
if (confirm_sharesummary)
dbcode = "y";
else

33
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "0.9.2"
#define CKDB_VERSION DB_VERSION"-0.504"
#define CKDB_VERSION DB_VERSION"-0.510"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -245,8 +245,13 @@ extern bool everyone_die;
#define STR_SHAREERRORS "shareerror"
#define STR_AGEWORKINFO "ageworkinfo"
extern char *btc_server;
extern char *btc_auth;
extern int btc_timeout;
extern char *by_default;
extern char *inet_default;
extern char *id_default;
enum cmd_values {
CMD_UNSET,
@ -344,6 +349,7 @@ enum cmd_values {
/* Override _row defaults if transfer fields are present
* We don't care about the reply so it can be small */
#define HISTORYDATETRANSFER(_root, _row) do { \
if (_root) { \
char __reply[16]; \
size_t __siz = sizeof(__reply); \
K_ITEM *__item; \
@ -363,6 +369,7 @@ enum cmd_values {
DATA_TRANSFER(__transfer, __item); \
STRNCPY(_row->createinet, __transfer->mvalue); \
} \
} \
} while (0)
#define MODIFYDATECONTROL ",createdate,createby,createcode,createinet" \
@ -899,6 +906,7 @@ typedef struct blocks {
int64_t elapsed;
char statsconfirmed[TXT_FLAG+1];
HISTORYDATECONTROLFIELDS;
bool ignore; // Non DB field
} BLOCKS;
#define ALLOC_BLOCKS 100
@ -911,10 +919,16 @@ typedef struct blocks {
#define BLOCKS_NEW_STR "n"
#define BLOCKS_CONFIRM '1'
#define BLOCKS_CONFIRM_STR "1"
// 42 doesn't actually mean '42' it means matured
#define BLOCKS_42 'F'
#define BLOCKS_42_STR "F"
// Current block maturity is ... 100
#define BLOCKS_42_VALUE 100
#define BLOCKS_ORPHAN 'O'
#define BLOCKS_ORPHAN_STR "O"
/* Block height difference required before checking if it's orphaned
* TODO: add a cmd_blockstatus option to un-orphan a block */
#define BLOCKS_ORPHAN_CHECK 1
#define BLOCKS_STATSPENDING FALSE_CHR
#define BLOCKS_STATSPENDING_STR FALSE_STR
@ -1167,6 +1181,7 @@ extern K_LIST *workerstatus_free;
extern K_STORE *workerstatus_store;
extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now);
extern void tick();
extern PGconn *dbconnect();
@ -1290,10 +1305,16 @@ extern void dsp_sharesummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_sharesummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_sharesummary_workinfoid(K_ITEM *a, K_ITEM *b);
extern void zero_sharesummary(SHARESUMMARY *row, tv_t *cd, double diff);
extern K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid);
extern K_ITEM *find_sharesummary(int64_t userid, char *workername,
int64_t workinfoid);
extern void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
char *by, char *code, char *inet, tv_t *cd);
extern void dsp_hash(char *hash, char *buf, size_t siz);
#define dbhash2btchash(_hash, _buf, _siz) \
_dbhash2btchash(_hash, _buf, _siz, WHERE_FFL_HERE)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
#define dsp_hash(_hash, _buf, _siz) \
_dsp_hash(_hash, _buf, _siz, WHERE_FFL_HERE)
extern void _dsp_hash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS);
extern void dsp_blocks(K_ITEM *item, FILE *stream);
extern cmp_t cmp_blocks(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_blocks(int32_t height, char *blockhash);
@ -1469,4 +1490,10 @@ struct CMDS {
extern struct CMDS ckdb_cmds[];
// ***
// *** ckdb_btc.c
// ***
extern void btc_blockstatus(BLOCKS *blocks);
#endif

316
src/ckdb_btc.c

@ -0,0 +1,316 @@
/*
* Copyright 2014 Andrew Smith
* Copyright 2014 Con Kolivas
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 3 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
#include "ckdb.h"
//#include <curl/curl.h>
#define GETBLOCKHASHCMD "getblockhash"
#define GETBLOCKHASH "{\"method\":\"" GETBLOCKHASHCMD "\",\"params\":[%d],\"id\":1}"
#define GETBLOCKHASHKEY ((const char *)"result")
#define GETBLOCKCMD "getblock"
#define GETBLOCK "{\"method\":\"" GETBLOCKCMD "\",\"params\":[\"%s\"],\"id\":1}"
#define GETBLOCKCONFKEY ((const char *)"confirmations")
static char *btc_data(char *json, size_t *len)
{
size_t off;
char tmp[1024];
char *buf;
APPEND_REALLOC_INIT(buf, off, *len);
APPEND_REALLOC(buf, off, *len, "POST / HTTP/1.1\n");
snprintf(tmp, sizeof(tmp), "Authorization: Basic %s\n", btc_auth);
APPEND_REALLOC(buf, off, *len, tmp);
snprintf(tmp, sizeof(tmp), "Host: %s/\n", btc_server);
APPEND_REALLOC(buf, off, *len, tmp);
APPEND_REALLOC(buf, off, *len, "Content-Type: application/json\n");
snprintf(tmp, sizeof(tmp), "Content-Length: %d\n\n", (int)strlen(json));
APPEND_REALLOC(buf, off, *len, tmp);
APPEND_REALLOC(buf, off, *len, json);
return buf;
}
#define SOCK_READ 8192
static int read_socket(int fd, char **buf, int timeout)
{
char tmp[SOCK_READ+1];
int ret, off, len;
tv_t tv_timeout;
fd_set readfs;
len = SOCK_READ;
*buf = malloc(len+1);
if (!(*buf))
quithere(1, "malloc (%d) OOM", len+1);
off = 0;
while (42) {
tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0;
FD_ZERO(&readfs);
FD_SET(fd, &readfs);
ret = select(fd + 1, &readfs, NULL, NULL, &tv_timeout);
if (ret == 0)
break;
if (ret < 0) {
LOGERR("%s() btc socket select error %d:%s",
__func__, errno, strerror(errno));
break;
}
ret = recv(fd, tmp, SOCK_READ, 0);
if (ret == 0)
break;
if (ret < 0) {
LOGERR("%s() btc socket recv error %d:%s",
__func__, errno, strerror(errno));
break;
}
if ((off + ret) > len) {
len += SOCK_READ;
*buf = realloc(*buf, len + 1);
if (!(*buf))
quithere(1, "realloc (%d) OOM", len);
}
memcpy(*buf + off, tmp, ret);
off += ret;
}
if (close(fd)) {
LOGERR("%s() btc socket close error %d:%s",
__func__, errno, strerror(errno));
}
return off;
}
#define btc_io(_cmd, _json) _btc_io(_cmd, _json, WHERE_FFL_HERE)
static char *_btc_io(__maybe_unused const char *cmd, char *json, WHERE_FFL_ARGS)
{
char *ip, *port;
char *data, *ans, *res, *ptr;
int fd, ret, red;
size_t len;
data = btc_data(json, &len);
if (!extract_sockaddr(btc_server, &ip, &port)) {
LOGERR("%s() invalid btc server '%s'",
__func__, btc_server);
return NULL;
}
fd = connect_socket(ip, port);
if (fd < 0) {
LOGERR("%s() failed to connect to btc server %s",
__func__, btc_server);
return NULL;
}
ret = write_socket(fd, data, len);
if (ret != (int)len) {
LOGERR("%s() failed to write to btc server %s",
__func__, btc_server);
return NULL;
}
red = read_socket(fd, &ans, btc_timeout);
ans[red] = '\0';
if (strncasecmp(ans, "HTTP/1.1 200 OK", 15)) {
char *text = safe_text(ans);
LOGERR("%s() btc server response not ok: %s",
__func__, text);
free(text);
free(ans);
res = strdup(EMPTY);
} else {
ptr = strstr(ans, "\n{");
if (ptr)
res = strdup(ptr+1);
else
res = strdup(EMPTY);
free(ans);
}
return res;
}
static char *single_decode_str(char *ans, const char *cmd, const char *key)
{
json_t *json_data, *json_ob;
json_error_t err_val;
const char *json_str;
if (ans) {
json_data = json_loads(ans, JSON_DISABLE_EOF_CHECK, &err_val);
if (!json_data) {
char *text = safe_text(ans);
LOGERR("%s() Json %s decode error "
"json_err=(%d:%d:%d)%s:%s ans='%s'",
__func__, cmd,
err_val.line, err_val.column,
err_val.position, err_val.source,
err_val.text, text);
free(text);
} else {
json_ob = json_object_get(json_data, key);
if (!json_ob) {
char *text = safe_text(ans);
LOGERR("%s() Json %s reply missing key %s "
"ans='%s'",
__func__, cmd, key, text);
free(text);
} else {
if (!json_is_string(json_ob)) {
char *text = safe_text(ans);
LOGERR("%s() Json %s key %s "
"not a string ans='%s'",
__func__, cmd, key, text);
free(text);
} else {
json_str = json_string_value(json_ob);
if (json_str)
return strdup(json_str);
else
return strdup(EMPTY);
}
}
}
}
return NULL;
}
static int64_t single_decode_int(char *ans, const char *cmd, const char *key)
{
json_t *json_data, *json_ob;
json_error_t err_val;
if (ans) {
json_data = json_loads(ans, JSON_DISABLE_EOF_CHECK, &err_val);
if (!json_data) {
char *text = safe_text(ans);
LOGERR("%s() Json %s decode error "
"json_err=(%d:%d:%d)%s:%s ans='%s'",
__func__, cmd,
err_val.line, err_val.column,
err_val.position, err_val.source,
err_val.text, text);
free(text);
} else {
json_ob = json_object_get(json_data, key);
if (!json_ob) {
char *text = safe_text(ans);
LOGERR("%s() Json %s reply missing key %s "
"ans='%s'",
__func__, cmd, key, text);
free(text);
} else
return (int64_t)json_integer_value(json_ob);
}
}
return 0;
}
static char *btc_blockhash(int32_t height)
{
char buf[1024];
char *ans;
char *hash;
snprintf(buf, sizeof(buf), GETBLOCKHASH, height);
ans = btc_io(GETBLOCKHASHCMD, buf);
hash = single_decode_str(ans, GETBLOCKHASHCMD, GETBLOCKHASHKEY);
free(ans);
return hash;
}
static __maybe_unused int32_t btc_confirms(int32_t height)
{
char buf[1024];
char *ans;
int32_t conf;
snprintf(buf, sizeof(buf), GETBLOCKHASH, height);
ans = btc_io(GETBLOCKHASHCMD, buf);
conf = (int32_t)single_decode_int(ans, GETBLOCKCMD, GETBLOCKCONFKEY);
free(ans);
return conf;
}
// Check for orphan or update confirm count
void btc_blockstatus(BLOCKS *blocks)
{
char hash[TXT_BIG+1];
char *blockhash;
size_t len;
tv_t now;
bool ok;
setnow(&now);
LOGDEBUG("%s() checking %d %s",
__func__,
blocks->height, blocks->blockhash);
// Caller must check this to avoid resending it every time
if (blocks->ignore) {
LOGERR("%s() ignored block %d passed",
__func__, blocks->height);
return;
}
len = strlen(blocks->blockhash);
if (len != SHA256SIZHEX) {
LOGERR("%s() invalid blockhash size %d (%d) for block %d",
__func__, len, SHA256SIZHEX, blocks->height);
/* So we don't keep repeating the message
* This should never happen */
blocks->ignore = true;
return;
}
dbhash2btchash(blocks->blockhash, hash, sizeof(hash));
blockhash = btc_blockhash(blocks->height);
// Something's amiss - let it try again later
if (!blockhash)
return;
if (strcmp(blockhash, hash) != 0) {
char height_tmp[32];
snprintf(height_tmp, sizeof(height_tmp), "%d", blocks->height);
LOGERR("%s() flagging block %d(%s) as %s pool=%s btc=%s",
__func__,
blocks->height, height_tmp,
blocks_confirmed(BLOCKS_ORPHAN_STR),
hash, blockhash);
ok = blocks_add(NULL, height_tmp,
blocks->blockhash,
BLOCKS_ORPHAN_STR,
EMPTY, EMPTY, EMPTY, EMPTY,
EMPTY, EMPTY, EMPTY, EMPTY,
by_default, (char *)__func__, inet_default,
&now, false, id_default, NULL);
if (!ok)
blocks->ignore = true;
return;
}
// confirms = btc_confirms(hash);
}

41
src/ckdb_data.c

@ -1645,14 +1645,45 @@ void auto_age_older(PGconn *conn, int64_t workinfoid, char *poolinstance,
}
}
// TODO: do this better ... :)
void dsp_hash(char *hash, char *buf, size_t siz)
void _dbhash2btchash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS)
{
size_t len;
int i, j;
// code bug
if (siz < (SHA256SIZHEX + 1)) {
quitfrom(1, file, func, line,
"%s() passed buf too small %d (%d)",
__func__, (int)siz, SHA256SIZHEX+1);
}
len = strlen(hash);
// code bug - check this before calling
if (len != SHA256SIZHEX) {
quitfrom(1, file, func, line,
"%s() invalid hash passed - size %d (%d)",
__func__, (int)len, SHA256SIZHEX);
}
for (i = 0; i < SHA256SIZHEX; i++) {
j = SHA256SIZHEX - 8 - (i & 0xfff8) + (i % 8);
buf[i] = hash[j];
}
buf[SHA256SIZHEX] = '\0';
}
void _dsp_hash(char *hash, char *buf, size_t siz, WHERE_FFL_ARGS)
{
char tmp[SHA256SIZHEX+1];
char *ptr;
ptr = hash + strlen(hash) - (siz - 1) - 8;
if (ptr < hash)
ptr = hash;
_dbhash2btchash(hash, tmp, sizeof(tmp), file, func, line);
ptr = tmp;
while (*ptr && *ptr == '0')
ptr++;
ptr -= 4;
if (ptr < tmp)
ptr = tmp;
STRNCPYSIZ(buf, ptr, siz);
}

Loading…
Cancel
Save