Browse Source

ckdb - enable, optionally automatic but off by default, workmarkers processing

master
kanoi 10 years ago committed by Con Kolivas
parent
commit
f1f0290074
  1. 101
      src/ckdb.c
  2. 65
      src/ckdb.h
  3. 14
      src/ckdb_cmd.c
  4. 131
      src/ckdb_data.c
  5. 425
      src/ckdb_dbio.c

101
src/ckdb.c

@ -169,6 +169,9 @@ static char *status_chars = "|/-\\";
static char *restorefrom; static char *restorefrom;
// Only accessed in here
static bool markersummary_auto;
// disallow: '/' '.' '_' and FLDSEP // disallow: '/' '.' '_' and FLDSEP
const char *userpatt = "^[^/\\._"FLDSEPSTR"]*$"; const char *userpatt = "^[^/\\._"FLDSEPSTR"]*$";
const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.-]*[A-Za-z0-9]$"; const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.-]*[A-Za-z0-9]$";
@ -1076,82 +1079,6 @@ static void alloc_storage()
marks_root = new_ktree(); marks_root = new_ktree();
} }
static void free_workinfo_data(K_ITEM *item)
{
WORKINFO *workinfo;
DATA_WORKINFO(workinfo, item);
if (workinfo->transactiontree)
FREENULL(workinfo->transactiontree);
if (workinfo->merklehash)
FREENULL(workinfo->merklehash);
}
static void free_sharesummary_data(K_ITEM *item)
{
SHARESUMMARY *sharesummary;
DATA_SHARESUMMARY(sharesummary, item);
if (sharesummary->workername) {
LIST_MEM_SUB(sharesummary_free, sharesummary->workername);
FREENULL(sharesummary->workername);
}
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
}
static void free_optioncontrol_data(K_ITEM *item)
{
OPTIONCONTROL *optioncontrol;
DATA_OPTIONCONTROL(optioncontrol, item);
if (optioncontrol->optionvalue)
FREENULL(optioncontrol->optionvalue);
}
static void free_markersummary_data(K_ITEM *item)
{
MARKERSUMMARY *markersummary;
DATA_MARKERSUMMARY(markersummary, item);
if (markersummary->workername)
FREENULL(markersummary->workername);
SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY);
SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY);
SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY);
SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY);
SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY);
SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY);
}
static void free_workmarkers_data(K_ITEM *item)
{
WORKMARKERS *workmarkers;
DATA_WORKMARKERS(workmarkers, item);
if (workmarkers->poolinstance)
FREENULL(workmarkers->poolinstance);
if (workmarkers->description)
FREENULL(workmarkers->description);
}
static void free_marks_data(K_ITEM *item)
{
MARKS *marks;
DATA_MARKS(marks, item);
if (marks->poolinstance && marks->poolinstance != EMPTY)
FREENULL(marks->poolinstance);
if (marks->description && marks->description != EMPTY)
FREENULL(marks->description);
if (marks->extra && marks->extra != EMPTY)
FREENULL(marks->extra);
}
#define FREE_TREE(_tree) \ #define FREE_TREE(_tree) \
if (_tree ## _root) \ if (_tree ## _root) \
_tree ## _root = free_ktree(_tree ## _root, NULL) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \
@ -2081,8 +2008,8 @@ static void make_a_shift_mark()
K_ITEM wi_look, ss_look; K_ITEM wi_look, ss_look;
SHARESUMMARY *sharesummary, looksharesummary; SHARESUMMARY *sharesummary, looksharesummary;
WORKINFO *workinfo, lookworkinfo; WORKINFO *workinfo, lookworkinfo;
BLOCKS *blocks; BLOCKS *blocks = NULL;
MARKS *marks, *sh_marks; MARKS *marks = NULL, *sh_marks = NULL;
int64_t ss_age_wid, last_marks_wid, marks_wid, prev_wid; int64_t ss_age_wid, last_marks_wid, marks_wid, prev_wid;
bool was_block = false, ok; bool was_block = false, ok;
char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ]; char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ];
@ -2197,7 +2124,8 @@ static void make_a_shift_mark()
if (m_item) { if (m_item) {
/* First block after the last mark /* First block after the last mark
* Shift must stop at or before this */ * Shift must stop at or before this
* N.B. any block, even 'New' */
K_RLOCK(blocks_free); K_RLOCK(blocks_free);
b_item = first_in_ktree(blocks_root, b_ctx); b_item = first_in_ktree(blocks_root, b_ctx);
while (b_item) { while (b_item) {
@ -2474,16 +2402,16 @@ static void *marker(__maybe_unused void *arg)
else else
make_a_workmarker(); make_a_workmarker();
#if 0
for (i = 0; i < 4; i++) { for (i = 0; i < 4; i++) {
if (!everyone_die) if (!everyone_die)
sleep(1); sleep(1);
} }
if (everyone_die) if (everyone_die)
break; break;
else else {
make_markersummaries(); if (markersummary_auto)
#endif make_markersummaries(false, NULL, NULL, NULL, NULL, NULL);
}
} }
marker_using_data = false; marker_using_data = false;
@ -4044,6 +3972,8 @@ static struct option long_options[] = {
{ "help", no_argument, 0, 'h' }, { "help", no_argument, 0, 'h' },
{ "killold", no_argument, 0, 'k' }, { "killold", no_argument, 0, 'k' },
{ "loglevel", required_argument, 0, 'l' }, { "loglevel", required_argument, 0, 'l' },
// markersummary = enable markersummary auto generation
{ "markersummary", no_argument, 0, 'm' },
{ "name", required_argument, 0, 'n' }, { "name", required_argument, 0, 'n' },
{ "dbpass", required_argument, 0, 'p' }, { "dbpass", required_argument, 0, 'p' },
{ "btc-pass", required_argument, 0, 'P' }, { "btc-pass", required_argument, 0, 'P' },
@ -4088,7 +4018,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "c:d:hkl:mn:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
switch(c) { switch(c) {
case 'c': case 'c':
ckp.config = strdup(optarg); ckp.config = strdup(optarg);
@ -4126,6 +4056,9 @@ int main(int argc, char **argv)
LOG_EMERG, LOG_DEBUG, ckp.loglevel); LOG_EMERG, LOG_DEBUG, ckp.loglevel);
} }
break; break;
case 'm':
markersummary_auto = true;
break;
case 'n': case 'n':
ckp.name = strdup(optarg); ckp.name = strdup(optarg);
break; break;

65
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.6" #define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.822" #define CKDB_VERSION DB_VERSION"-0.830"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -523,6 +523,34 @@ enum cmd_values {
_row->pointers = _row->pointers; \ _row->pointers = _row->pointers; \
} while (0) } while (0)
/* Override _row defaults if transfer fields are present
* We don't care about the reply so it can be small
* This is the pointer version - only one required so far */
#define MODIFYDATETRANSFER(_list, _root, _row) do { \
if (_root) { \
char __reply[16]; \
size_t __siz = sizeof(__reply); \
K_ITEM *__item; \
TRANSFER *__transfer; \
__item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \
} \
__item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \
} \
__item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \
if (__item) { \
DATA_TRANSFER(__transfer, __item); \
SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \
} \
_row->pointers = _row->pointers; \
} \
} while (0)
#define SIMPLEDATECONTROL ",createdate,createby,createcode,createinet" #define SIMPLEDATECONTROL ",createdate,createby,createcode,createinet"
#define SIMPLEDATECOUNT 4 #define SIMPLEDATECOUNT 4
#define SIMPLEDATECONTROLFIELDS \ #define SIMPLEDATECONTROLFIELDS \
@ -1480,6 +1508,14 @@ extern PGconn *dbconnect();
// *** ckdb_data.c *** // *** ckdb_data.c ***
// *** // ***
// Data free functions (first)
extern void free_workinfo_data(K_ITEM *item);
extern void free_sharesummary_data(K_ITEM *item);
extern void free_optioncontrol_data(K_ITEM *item);
extern void free_markersummary_data(K_ITEM *item);
extern void free_workmarkers_data(K_ITEM *item);
extern void free_marks_data(K_ITEM *item);
extern char *safe_text(char *txt); extern char *safe_text(char *txt);
extern void username_trim(USERS *users); extern void username_trim(USERS *users);
extern bool like_address(char *username); extern bool like_address(char *username);
@ -1629,10 +1665,12 @@ extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate);
extern void dsp_markersummary(K_ITEM *item, FILE *stream); extern void dsp_markersummary(K_ITEM *item, FILE *stream);
extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid,
char *workername);
extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername, extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername,
K_TREE_CTX *ctx); K_TREE_CTX *ctx);
extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid,
char *workername);
extern bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern void dsp_workmarkers(K_ITEM *item, FILE *stream);
extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b);
@ -1750,6 +1788,9 @@ extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
char *workername, char *clientid, char *errn, char *workername, char *clientid, char *errn,
char *error, char *secondaryuserid, char *by, char *error, char *secondaryuserid, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root); char *code, char *inet, tv_t *cd, K_TREE *trf_root);
extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root);
#define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ #define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \
_sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ _sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \
WHERE_FFL_HERE) WHERE_FFL_HERE)
@ -1796,20 +1837,22 @@ extern bool workerstats_add(char *poolinstance, char *elapsed, char *username,
char *by, char *code, char *inet, tv_t *cd, char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root); K_TREE *trf_root);
extern bool userstats_fill(PGconn *conn); extern bool userstats_fill(PGconn *conn);
extern bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root);
extern bool markersummary_fill(PGconn *conn); extern bool markersummary_fill(PGconn *conn);
#define workmarkers_process(_conn, _add, _markerid, _poolinstance, \ #define workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \
_workinfoidend, _workinfoidstart, _description, \ _workinfoidend, _workinfoidstart, _description, \
_status, _by, _code, _inet, _cd, _trf_root) \ _status, _by, _code, _inet, _cd, _trf_root) \
_workmarkers_process(_conn, _add, _markerid, _poolinstance, \ _workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \
_workinfoidend, _workinfoidstart, _description, \ _workinfoidend, _workinfoidstart, _description, \
_status, _by, _code, _inet, _cd, _trf_root, \ _status, _by, _code, _inet, _cd, _trf_root, \
WHERE_FFL_HERE) WHERE_FFL_HERE)
extern bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, extern bool _workmarkers_process(PGconn *conn, bool already, bool add,
char *poolinstance, int64_t workinfoidend, int64_t markerid, char *poolinstance,
int64_t workinfoidstart, char *description, int64_t workinfoidend, int64_t workinfoidstart,
char *status, char *by, char *code, char *description, char *status, char *by,
char *inet, tv_t *cd, K_TREE *trf_root, char *code, char *inet, tv_t *cd,
WHERE_FFL_ARGS); K_TREE *trf_root, WHERE_FFL_ARGS);
extern bool workmarkers_fill(PGconn *conn); extern bool workmarkers_fill(PGconn *conn);
#define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \ #define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \
_extra, _marktype, _status, _by, _code, _inet, _cd, \ _extra, _marktype, _status, _by, _code, _inet, _cd, \

14
src/ckdb_cmd.c

@ -4092,7 +4092,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *
} }
/* Socket interface to the functions that will be used later to automatically /* Socket interface to the functions that will be used later to automatically
* create marks, workmarkers and process the workmarkers * create marks, workmarkers and process the workmarkers and sharesummaries
* to generate markersummaries */ * to generate markersummaries */
static char *cmd_marks(PGconn *conn, char *cmd, char *id, static char *cmd_marks(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, char *by, __maybe_unused tv_t *now, char *by,
@ -4372,7 +4372,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
DATA_WORKMARKERS(workmarkers, wm_item); DATA_WORKMARKERS(workmarkers, wm_item);
if (CURRENT(&(workmarkers->expirydate)) && if (CURRENT(&(workmarkers->expirydate)) &&
!WMPROCESSED(workmarkers->status)) { !WMPROCESSED(workmarkers->status)) {
ok = workmarkers_process(conn, false, ok = workmarkers_process(conn, false, false,
workmarkers->markerid, workmarkers->markerid,
NULL, 0, 0, NULL, NULL, by, NULL, 0, 0, NULL, NULL, by,
code, inet, cd, trf_root); code, inet, cd, trf_root);
@ -4391,6 +4391,16 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
"%d workmarkers expunged", count); "%d workmarkers expunged", count);
} }
} }
} else if (strcasecmp(action, "sum") == 0) {
/* For the last available workmarker,
* summarise it's sharesummaries into markersummaries
* No parameters */
ok = make_markersummaries(true, by, code, inet, cd, trf_root);
if (!ok) {
snprintf(reply, siz, "%s failed", action);
LOGERR("%s.%s", id, reply);
return strdup(reply);
}
} else { } else {
snprintf(reply, siz, "unknown action '%s'", action); snprintf(reply, siz, "unknown action '%s'", action);
LOGERR("%s.%s", id, reply); LOGERR("%s.%s", id, reply);

131
src/ckdb_data.c

@ -9,6 +9,83 @@
#include "ckdb.h" #include "ckdb.h"
// Data free functions (added here as needed)
void free_workinfo_data(K_ITEM *item)
{
WORKINFO *workinfo;
DATA_WORKINFO(workinfo, item);
if (workinfo->transactiontree)
FREENULL(workinfo->transactiontree);
if (workinfo->merklehash)
FREENULL(workinfo->merklehash);
}
void free_sharesummary_data(K_ITEM *item)
{
SHARESUMMARY *sharesummary;
DATA_SHARESUMMARY(sharesummary, item);
if (sharesummary->workername) {
LIST_MEM_SUB(sharesummary_free, sharesummary->workername);
FREENULL(sharesummary->workername);
}
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
}
void free_optioncontrol_data(K_ITEM *item)
{
OPTIONCONTROL *optioncontrol;
DATA_OPTIONCONTROL(optioncontrol, item);
if (optioncontrol->optionvalue)
FREENULL(optioncontrol->optionvalue);
}
void free_markersummary_data(K_ITEM *item)
{
MARKERSUMMARY *markersummary;
DATA_MARKERSUMMARY(markersummary, item);
if (markersummary->workername)
FREENULL(markersummary->workername);
SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY);
SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY);
SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY);
SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY);
SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY);
SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY);
}
void free_workmarkers_data(K_ITEM *item)
{
WORKMARKERS *workmarkers;
DATA_WORKMARKERS(workmarkers, item);
if (workmarkers->poolinstance)
FREENULL(workmarkers->poolinstance);
if (workmarkers->description)
FREENULL(workmarkers->description);
}
void free_marks_data(K_ITEM *item)
{
MARKS *marks;
DATA_MARKS(marks, item);
if (marks->poolinstance && marks->poolinstance != EMPTY)
FREENULL(marks->poolinstance);
if (marks->description && marks->description != EMPTY)
FREENULL(marks->description);
if (marks->extra && marks->extra != EMPTY)
FREENULL(marks->extra);
}
// Clear text printable version of txt up to first '\0' // Clear text printable version of txt up to first '\0'
char *safe_text(char *txt) char *safe_text(char *txt)
{ {
@ -2390,6 +2467,58 @@ K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername)
return ms_item; return ms_item;
} }
bool make_markersummaries(bool msg, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root)
{
K_TREE_CTX ctx[1];
WORKMARKERS *workmarkers;
K_ITEM *wm_item, *wm_last = NULL;
tv_t now;
K_RLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx);
while (wm_item) {
DATA_WORKMARKERS(workmarkers, wm_item);
if (!CURRENT(&(workmarkers->expirydate)))
break;
// find the oldest READY workinfoid
if (WMREADY(workmarkers->status))
wm_last = wm_item;
wm_item = prev_in_ktree(ctx);
}
K_RUNLOCK(workmarkers_free);
if (!wm_last) {
if (!msg)
LOGDEBUG("%s() no READY workmarkers", __func__);
else
LOGWARNING("%s() no READY workmarkers", __func__);
return false;
}
DATA_WORKMARKERS(workmarkers, wm_last);
LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/"
"Stt %"PRId64"/%s/%s",
__func__, workmarkers->markerid, workmarkers->poolinstance,
workmarkers->workinfoidend, workmarkers->workinfoidstart,
workmarkers->description, workmarkers->status);
if (by == NULL)
by = (char *)by_default;
if (code == NULL)
code = (char *)__func__;
if (inet == NULL)
inet = (char *)inet_default;
if (cd)
copy_tv(&now, cd);
else
setnow(&now);
return sharesummaries_to_markersummaries(NULL, workmarkers, by, code,
inet, &now, trf_root);
}
void dsp_workmarkers(K_ITEM *item, FILE *stream) void dsp_workmarkers(K_ITEM *item, FILE *stream)
{ {
WORKMARKERS *wm; WORKMARKERS *wm;
@ -2573,7 +2702,7 @@ static bool gen_workmarkers(PGconn *conn, MARKS *stt, bool after, MARKS *fin,
stt->description, after ? "++" : "", stt->description, after ? "++" : "",
fin->description, before ? "--" : ""); fin->description, before ? "--" : "");
ok = workmarkers_process(conn, true, 0, EMPTY, ok = workmarkers_process(conn, false, true, 0, EMPTY,
wi_fin->workinfoid, wi_stt->workinfoid, wi_fin->workinfoid, wi_stt->workinfoid,
description, MARKER_READY_STR, description, MARKER_READY_STR,
by, code, inet, cd, trf_root); by, code, inet, cd, trf_root);

425
src/ckdb_dbio.c

@ -1,5 +1,5 @@
/* /*
* Copyright 1995-2014 Andrew Smith * Copyright 1995-2015 Andrew Smith
* *
* This program is free software; you can redistribute it and/or modify it * This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free * under the terms of the GNU General Public License as published by the Free
@ -168,7 +168,8 @@ char *pqerrmsg(PGconn *conn)
#define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15" #define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15"
#define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16" #define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16"
#define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22" #define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22"
#define PQPARAM27 PQPARAM22 ",$23,$24,$25,$26,$27" #define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26"
#define PQPARAM27 PQPARAM26 ",$27"
#define PARCHK(_par, _params) do { \ #define PARCHK(_par, _params) do { \
if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \ if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \
@ -2751,6 +2752,318 @@ bool shareerrors_fill()
return true; return true;
} }
/* TODO: what to do about a failure?
* since it will repeat every ~13s
* Of course manual intervention is possible via cmd_marks,
* so that is probably the best solution since
* we should be watching the pool all the time :)
* The cause would most likely be either a code bug or a DB problem
* so there many be no obvious automated fix
* and flagging the workmarkers to be skipped may or may not be the solution,
* thus manual intervention will be the rule for now */
bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
// shorter name for log messages
const char *shortname = "SS_to_MS";
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ss_ctx[1], ms_ctx[1];
SHARESUMMARY *sharesummary, looksharesummary;
MARKERSUMMARY *markersummary, lookmarkersummary;
K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look;
bool ok = false, conned = false;
int64_t diffacc, shareacc;
char *reason = NULL, *tuples = NULL;
char *params[2];
int n, par = 0, deleted = -7;
int ss_count, ms_count;
char *del;
LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s",
shortname, workmarkers->markerid, workmarkers->poolinstance,
workmarkers->workinfoidend, workmarkers->workinfoidstart,
workmarkers->description, workmarkers->status);
K_STORE *old_sharesummary_store = k_new_store(sharesummary_free);
K_STORE *new_markersummary_store = k_new_store(markersummary_free);
K_TREE *ms_root = new_ktree();
if (!CURRENT(&(workmarkers->expirydate))) {
reason = "unexpired";
goto flail;
}
if (!WMREADY(workmarkers->status)) {
reason = "not ready";
goto flail;
}
// Check there aren't already any matching markersummaries
lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = 0;
lookmarkersummary.workername = EMPTY;
INIT_MARKERSUMMARY(&ms_look);
ms_look.data = (void *)(&lookmarkersummary);
K_RLOCK(markersummary_free);
ms_item = find_after_in_ktree(markersummary_root, &ms_look,
cmp_markersummary, ms_ctx);
K_RUNLOCK(markersummary_free);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
if (ms_item && markersummary->markerid == workmarkers->markerid) {
reason = "markersummaries already exist";
goto flail;
}
diffacc = shareacc = 0;
ms_item = NULL;
looksharesummary.workinfoid = workmarkers->workinfoidend;
looksharesummary.userid = MAXID;
looksharesummary.workername = EMPTY;
INIT_SHARESUMMARY(&ss_look);
ss_look.data = (void *)(&looksharesummary);
/* Since shares come in from ckpool at a high rate,
* we don't want to lock sharesummary for long
* Those incoming shares will not be touching the sharesummaries
* we are processing here */
K_RLOCK(sharesummary_free);
ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look,
cmp_sharesummary_workinfoid, ss_ctx);
K_RUNLOCK(sharesummary_free);
while (ss_item) {
DATA_SHARESUMMARY(sharesummary, ss_item);
if (sharesummary->workinfoid < workmarkers->workinfoidstart)
break;
K_RLOCK(sharesummary_free);
ss_prev = prev_in_ktree(ss_ctx);
K_RUNLOCK(sharesummary_free);
// Find/create the markersummary only once per worker change
if (!ms_item || markersummary->userid != sharesummary->userid ||
strcmp(markersummary->workername, sharesummary->workername) != 0) {
lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = sharesummary->userid;
lookmarkersummary.workername = sharesummary->workername;
ms_look.data = (void *)(&lookmarkersummary);
ms_item = find_in_ktree(ms_root, &ms_look,
cmp_markersummary, ms_ctx);
if (!ms_item) {
K_WLOCK(markersummary_free);
ms_item = k_unlink_head(markersummary_free);
K_WUNLOCK(markersummary_free);
k_add_head(new_markersummary_store, ms_item);
DATA_MARKERSUMMARY(markersummary, ms_item);
bzero(markersummary, sizeof(*markersummary));
markersummary->markerid = workmarkers->markerid;
markersummary->userid = sharesummary->userid;
markersummary->workername = strdup(sharesummary->workername);
LIST_MEM_ADD(markersummary_free, markersummary->workername);
ms_root = add_to_ktree(ms_root, ms_item, cmp_markersummary);
LOGDEBUG("%s() new ms %"PRId64"/%"PRId64"/%s",
shortname, markersummary->markerid,
markersummary->userid,
markersummary->workername);
} else {
DATA_MARKERSUMMARY(markersummary, ms_item);
}
}
markersummary->diffacc += sharesummary->diffacc;
markersummary->diffsta += sharesummary->diffsta;
markersummary->diffdup += sharesummary->diffdup;
markersummary->diffhi += sharesummary->diffhi;
markersummary->diffrej += sharesummary->diffrej;
markersummary->shareacc += sharesummary->shareacc;
markersummary->sharesta += sharesummary->sharesta;
markersummary->sharedup += sharesummary->sharedup;
markersummary->sharehi += sharesummary->sharehi;
markersummary->sharerej += sharesummary->sharerej;
markersummary->sharecount += sharesummary->sharecount;
markersummary->errorcount += sharesummary->errorcount;
if (!markersummary->firstshare.tv_sec ||
!tv_newer(&(markersummary->firstshare), &(sharesummary->firstshare))) {
copy_tv(&(markersummary->firstshare), &(sharesummary->firstshare));
}
if (tv_newer(&(markersummary->lastshare), &(sharesummary->lastshare))) {
copy_tv(&(markersummary->lastshare), &(sharesummary->lastshare));
markersummary->lastdiffacc = sharesummary->lastdiffacc;
}
diffacc += sharesummary->diffacc;
shareacc += sharesummary->shareacc;
k_unlink_item(sharesummary_store, ss_item);
k_add_head(old_sharesummary_store, ss_item);
ss_item = ss_prev;
}
if (old_sharesummary_store->count == 0)
reason = "no sharesummaries";
else {
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto flail;
}
ms_item = new_markersummary_store->head;
while (ms_item) {
if (!(markersummary_add(conn, ms_item, by, code, inet,
cd, trf_root))) {
reason = "db error";
goto rollback;
}
ms_item = ms_item->next;
}
par = 0;
params[par++] = bigint_to_buf(workmarkers->workinfoidstart, NULL, 0);
params[par++] = bigint_to_buf(workmarkers->workinfoidend, NULL, 0);
PARCHK(par, params);
del = "delete from sharesummary "
"where workinfoid >= $1 and workinfoid <= $2";
res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples)
deleted = atoi(tuples);
}
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Delete", rescode, conn);
reason = "delete failure";
goto rollback;
}
if (deleted != old_sharesummary_store->count) {
LOGERR("%s() processed sharesummaries=%d but deleted=%d",
shortname, old_sharesummary_store->count, deleted);
reason = "delete mismatch";
goto rollback;
}
ok = workmarkers_process(conn, true, true,
workmarkers->markerid,
workmarkers->poolinstance,
workmarkers->workinfoidend,
workmarkers->workinfoidstart,
workmarkers->description,
MARKER_PROCESSED_STR,
by, code, inet, cd, trf_root);
rollback:
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
}
flail:
for (n = 0; n < par; n++)
free(params[n]);
if (conned)
PQfinish(conn);
if (reason) {
// already displayed the full workmarkers detail at the top
LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s",
shortname, reason, workmarkers->markerid,
workmarkers->description, workmarkers->status);
ok = false;
}
if (!ok) {
if (new_markersummary_store->count > 0) {
// Throw them away (they don't exist anywhere else)
ms_item = new_markersummary_store->head;
while (ms_item) {
free_markersummary_data(ms_item);
ms_item = ms_item->next;
}
K_WLOCK(markersummary_free);
k_list_transfer_to_head(new_markersummary_store, markersummary_free);
K_WUNLOCK(markersummary_free);
}
if (old_sharesummary_store->count > 0) {
// Put them back in the store where they came from
K_WLOCK(sharesummary_free);
k_list_transfer_to_head(old_sharesummary_store, sharesummary_store);
K_WUNLOCK(sharesummary_free);
}
} else {
ms_count = new_markersummary_store->count;
ss_count = old_sharesummary_store->count;
// Deadlock alert for other newer code ...
K_WLOCK(sharesummary_free);
K_WLOCK(markersummary_free);
ms_item = new_markersummary_store->head;
while (ms_item) {
// Move the new markersummaries into the trees/stores
markersummary_root = add_to_ktree(markersummary_root,
ms_item,
cmp_markersummary);
markersummary_userid_root = add_to_ktree(markersummary_userid_root,
ms_item,
cmp_markersummary_userid);
ms_item = ms_item->next;
}
k_list_transfer_to_head(new_markersummary_store, markersummary_store);
/* For normal shift processing this wont be very quick
* so it will be a 'long' LOCK */
ss_item = old_sharesummary_store->head;
while (ss_item) {
// remove the old sharesummaries from the trees
sharesummary_root = remove_from_ktree(sharesummary_root,
ss_item,
cmp_sharesummary);
sharesummary_workinfoid_root = remove_from_ktree(sharesummary_workinfoid_root,
ss_item,
cmp_sharesummary_workinfoid);
free_sharesummary_data(ss_item);
ss_item = ss_item->next;
}
k_list_transfer_to_head(old_sharesummary_store, sharesummary_free);
K_WUNLOCK(markersummary_free);
K_WUNLOCK(sharesummary_free);
LOGWARNING("%s() Processed: %d ms %d ss %"PRId64" shares "
"%"PRId64" diff for workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s",
shortname, ms_count, ss_count, shareacc, diffacc,
workmarkers->markerid, workmarkers->poolinstance,
workmarkers->workinfoidend,
workmarkers->workinfoidstart,
workmarkers->description,
workmarkers->status);
}
ms_root = free_ktree(ms_root, NULL);
new_markersummary_store = k_free_store(new_markersummary_store);
old_sharesummary_store = k_free_store(old_sharesummary_store);
return ok;
}
bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item,
char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS)
{ {
@ -5204,6 +5517,82 @@ clean:
return ok; return ok;
} }
bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res;
MARKERSUMMARY *row;
char *params[18 + MODIFYDATECOUNT];
int n, par = 0;
char *ins;
bool ok = false;
LOGDEBUG("%s(): add", __func__);
DATA_MARKERSUMMARY(row, ms_item);
MODIFYDATEPOINTERS(markersummary_free, row, cd, by, code, inet);
MODIFYDATETRANSFER(markersummary_free, trf_root, row);
par = 0;
params[par++] = bigint_to_buf(row->markerid, NULL, 0);
params[par++] = bigint_to_buf(row->userid, NULL, 0);
params[par++] = str_to_buf(row->workername, NULL, 0);
params[par++] = double_to_buf(row->diffacc, NULL, 0);
params[par++] = double_to_buf(row->diffsta, NULL, 0);
params[par++] = double_to_buf(row->diffdup, NULL, 0);
params[par++] = double_to_buf(row->diffhi, NULL, 0);
params[par++] = double_to_buf(row->diffrej, NULL, 0);
params[par++] = double_to_buf(row->shareacc, NULL, 0);
params[par++] = double_to_buf(row->sharesta, NULL, 0);
params[par++] = double_to_buf(row->sharedup, NULL, 0);
params[par++] = double_to_buf(row->sharehi, NULL, 0);
params[par++] = double_to_buf(row->sharerej, NULL, 0);
params[par++] = bigint_to_buf(row->sharecount, NULL, 0);
params[par++] = bigint_to_buf(row->errorcount, NULL, 0);
params[par++] = tv_to_buf(&(row->firstshare), NULL, 0);
params[par++] = tv_to_buf(&(row->lastshare), NULL, 0);
params[par++] = double_to_buf(row->lastdiffacc, NULL, 0);
MODIFYDATEPARAMS(params, par, row);
PARCHK(par, params);
ins = "insert into markersummary "
"(markerid,userid,workername,diffacc,diffsta,diffdup,diffhi,"
"diffrej,shareacc,sharesta,sharedup,sharehi,sharerej,"
"sharecount,errorcount,firstshare,lastshare,lastdiffacc"
MODIFYDATECONTROL ") values (" PQPARAM26 ")";
LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f",
__func__, row->markerid, row->userid, row->workername,
row->diffacc);
if (!conn) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto unparam;
}
ok = true;
unparam:
PQclear(res);
if (conned)
PQfinish(conn);
for (n = 0; n < par; n++)
free(params[n]);
// caller must do tree/list/store changes
return ok;
}
bool markersummary_fill(PGconn *conn) bool markersummary_fill(PGconn *conn)
{ {
ExecStatusType rescode; ExecStatusType rescode;
@ -5380,10 +5769,10 @@ bool markersummary_fill(PGconn *conn)
* since we only check for a CURRENT workmarkers * since we only check for a CURRENT workmarkers
* N.B. also, this returns success if !add and there is no matching * N.B. also, this returns success if !add and there is no matching
* old workmarkers */ * old workmarkers */
bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, bool _workmarkers_process(PGconn *conn, bool already, bool add,
char *poolinstance, int64_t workinfoidend, int64_t markerid, char *poolinstance,
int64_t workinfoidstart, char *description, int64_t workinfoidend, int64_t workinfoidstart,
char *status, char *by, char *code, char *description, char *status, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root, char *inet, tv_t *cd, K_TREE *trf_root,
WHERE_FFL_ARGS) WHERE_FFL_ARGS)
{ {
@ -5391,7 +5780,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid,
bool conned = false; bool conned = false;
PGresult *res = NULL; PGresult *res = NULL;
K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item; K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item;
WORKMARKERS *row, *oldworkmarkers; WORKMARKERS *row, *oldworkmarkers = NULL;
char *upd, *ins; char *upd, *ins;
char *params[6 + HISTORYDATECOUNT]; char *params[6 + HISTORYDATECOUNT];
bool ok = false, begun = false; bool ok = false, begun = false;
@ -5414,15 +5803,17 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid,
conn = dbconnect(); conn = dbconnect();
conned = true; conned = true;
} }
res = PQexec(conn, "Begin", CKPQ_WRITE); if (!already) {
rescode = PQresultStatus(res); res = PQexec(conn, "Begin", CKPQ_WRITE);
PQclear(res); rescode = PQresultStatus(res);
if (!PGOK(rescode)) { PQclear(res);
PGLOGERR("Begin", rescode, conn); if (!PGOK(rescode)) {
goto unparam; PGLOGERR("Begin", rescode, conn);
} goto unparam;
}
begun = true; begun = true;
}
upd = "update workmarkers set expirydate=$1 where markerid=$2" upd = "update workmarkers set expirydate=$1 where markerid=$2"
" and expirydate=$3"; " and expirydate=$3";
@ -5473,7 +5864,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid,
conned = true; conned = true;
} }
if (!begun) { if (!already && !begun) {
res = PQexec(conn, "Begin", CKPQ_WRITE); res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res); PQclear(res);
@ -5707,7 +6098,7 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance,
bool conned = false; bool conned = false;
PGresult *res = NULL; PGresult *res = NULL;
K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item; K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item;
MARKS *row, *oldmarks; MARKS *row, *oldmarks = NULL;
char *upd, *ins; char *upd, *ins;
char *params[6 + HISTORYDATECOUNT]; char *params[6 + HISTORYDATECOUNT];
bool ok = false, begun = false; bool ok = false, begun = false;

Loading…
Cancel
Save