From 297ae72e064d0e81e53c17535c540ca442736da4 Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 15 Jan 2015 19:08:31 +1100 Subject: [PATCH] ckdb - enable, optionally automatic but off by default, workmarkers processing --- src/ckdb.c | 101 ++---------- src/ckdb.h | 65 ++++++-- src/ckdb_cmd.c | 14 +- src/ckdb_data.c | 131 ++++++++++++++- src/ckdb_dbio.c | 425 ++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 621 insertions(+), 115 deletions(-) diff --git a/src/ckdb.c b/src/ckdb.c index 8fa5580d..91bb3fdc 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -169,6 +169,9 @@ static char *status_chars = "|/-\\"; static char *restorefrom; +// Only accessed in here +static bool markersummary_auto; + // disallow: '/' '.' '_' and FLDSEP const char *userpatt = "^[^/\\._"FLDSEPSTR"]*$"; const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.-]*[A-Za-z0-9]$"; @@ -1076,82 +1079,6 @@ static void alloc_storage() marks_root = new_ktree(); } -static void free_workinfo_data(K_ITEM *item) -{ - WORKINFO *workinfo; - - DATA_WORKINFO(workinfo, item); - if (workinfo->transactiontree) - FREENULL(workinfo->transactiontree); - if (workinfo->merklehash) - FREENULL(workinfo->merklehash); -} - -static void free_sharesummary_data(K_ITEM *item) -{ - SHARESUMMARY *sharesummary; - - DATA_SHARESUMMARY(sharesummary, item); - if (sharesummary->workername) { - LIST_MEM_SUB(sharesummary_free, sharesummary->workername); - FREENULL(sharesummary->workername); - } - SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); - SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); - SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); - SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); - SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); - SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); -} - -static void free_optioncontrol_data(K_ITEM *item) -{ - OPTIONCONTROL *optioncontrol; - - DATA_OPTIONCONTROL(optioncontrol, item); - if (optioncontrol->optionvalue) - FREENULL(optioncontrol->optionvalue); -} - -static void free_markersummary_data(K_ITEM *item) -{ - MARKERSUMMARY *markersummary; - - DATA_MARKERSUMMARY(markersummary, item); - if (markersummary->workername) - FREENULL(markersummary->workername); - SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY); - SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY); - SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY); - SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY); - SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY); - SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY); -} - -static void free_workmarkers_data(K_ITEM *item) -{ - WORKMARKERS *workmarkers; - - DATA_WORKMARKERS(workmarkers, item); - if (workmarkers->poolinstance) - FREENULL(workmarkers->poolinstance); - if (workmarkers->description) - FREENULL(workmarkers->description); -} - -static void free_marks_data(K_ITEM *item) -{ - MARKS *marks; - - DATA_MARKS(marks, item); - if (marks->poolinstance && marks->poolinstance != EMPTY) - FREENULL(marks->poolinstance); - if (marks->description && marks->description != EMPTY) - FREENULL(marks->description); - if (marks->extra && marks->extra != EMPTY) - FREENULL(marks->extra); -} - #define FREE_TREE(_tree) \ if (_tree ## _root) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \ @@ -2081,8 +2008,8 @@ static void make_a_shift_mark() K_ITEM wi_look, ss_look; SHARESUMMARY *sharesummary, looksharesummary; WORKINFO *workinfo, lookworkinfo; - BLOCKS *blocks; - MARKS *marks, *sh_marks; + BLOCKS *blocks = NULL; + MARKS *marks = NULL, *sh_marks = NULL; int64_t ss_age_wid, last_marks_wid, marks_wid, prev_wid; bool was_block = false, ok; char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ]; @@ -2197,7 +2124,8 @@ static void make_a_shift_mark() if (m_item) { /* First block after the last mark - * Shift must stop at or before this */ + * Shift must stop at or before this + * N.B. any block, even 'New' */ K_RLOCK(blocks_free); b_item = first_in_ktree(blocks_root, b_ctx); while (b_item) { @@ -2474,16 +2402,16 @@ static void *marker(__maybe_unused void *arg) else make_a_workmarker(); -#if 0 for (i = 0; i < 4; i++) { if (!everyone_die) sleep(1); } if (everyone_die) break; - else - make_markersummaries(); -#endif + else { + if (markersummary_auto) + make_markersummaries(false, NULL, NULL, NULL, NULL, NULL); + } } marker_using_data = false; @@ -4044,6 +3972,8 @@ static struct option long_options[] = { { "help", no_argument, 0, 'h' }, { "killold", no_argument, 0, 'k' }, { "loglevel", required_argument, 0, 'l' }, + // markersummary = enable markersummary auto generation + { "markersummary", no_argument, 0, 'm' }, { "name", required_argument, 0, 'n' }, { "dbpass", required_argument, 0, 'p' }, { "btc-pass", required_argument, 0, 'P' }, @@ -4088,7 +4018,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "c:d:hkl:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "c:d:hkl:mn:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'c': ckp.config = strdup(optarg); @@ -4126,6 +4056,9 @@ int main(int argc, char **argv) LOG_EMERG, LOG_DEBUG, ckp.loglevel); } break; + case 'm': + markersummary_auto = true; + break; case 'n': ckp.name = strdup(optarg); break; diff --git a/src/ckdb.h b/src/ckdb.h index 6206887d..341ced0c 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.822" +#define CKDB_VERSION DB_VERSION"-0.830" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -523,6 +523,34 @@ enum cmd_values { _row->pointers = _row->pointers; \ } while (0) +/* Override _row defaults if transfer fields are present + * We don't care about the reply so it can be small + * This is the pointer version - only one required so far */ +#define MODIFYDATETRANSFER(_list, _root, _row) do { \ + if (_root) { \ + char __reply[16]; \ + size_t __siz = sizeof(__reply); \ + K_ITEM *__item; \ + TRANSFER *__transfer; \ + __item = optional_name(_root, "createby", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATEBY(_list, _row->createby, __transfer->mvalue); \ + } \ + __item = optional_name(_root, "createcode", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATECODE(_list, _row->createcode, __transfer->mvalue); \ + } \ + __item = optional_name(_root, "createinet", 1, NULL, __reply, __siz); \ + if (__item) { \ + DATA_TRANSFER(__transfer, __item); \ + SET_CREATEINET(_list, _row->createinet, __transfer->mvalue); \ + } \ + _row->pointers = _row->pointers; \ + } \ + } while (0) + #define SIMPLEDATECONTROL ",createdate,createby,createcode,createinet" #define SIMPLEDATECOUNT 4 #define SIMPLEDATECONTROLFIELDS \ @@ -1480,6 +1508,14 @@ extern PGconn *dbconnect(); // *** ckdb_data.c *** // *** +// Data free functions (first) +extern void free_workinfo_data(K_ITEM *item); +extern void free_sharesummary_data(K_ITEM *item); +extern void free_optioncontrol_data(K_ITEM *item); +extern void free_markersummary_data(K_ITEM *item); +extern void free_workmarkers_data(K_ITEM *item); +extern void free_marks_data(K_ITEM *item); + extern char *safe_text(char *txt); extern void username_trim(USERS *users); extern bool like_address(char *username); @@ -1629,10 +1665,12 @@ extern bool userstats_starttimeband(USERSTATS *row, tv_t *statsdate); extern void dsp_markersummary(K_ITEM *item, FILE *stream); extern cmp_t cmp_markersummary(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_markersummary_userid(K_ITEM *a, K_ITEM *b); -extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, - char *workername); extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername, K_TREE_CTX *ctx); +extern K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, + char *workername); +extern bool make_markersummaries(bool msg, char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root); extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); @@ -1750,6 +1788,9 @@ extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, char *workername, char *clientid, char *errn, char *error, char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); +extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, + char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root); #define sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd) \ _sharesummary_update(_conn, _s_row, _e_row, _ss_item, _by, _code, _inet, _cd, \ WHERE_FFL_HERE) @@ -1796,20 +1837,22 @@ extern bool workerstats_add(char *poolinstance, char *elapsed, char *username, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); extern bool userstats_fill(PGconn *conn); +extern bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root); extern bool markersummary_fill(PGconn *conn); -#define workmarkers_process(_conn, _add, _markerid, _poolinstance, \ +#define workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \ _workinfoidend, _workinfoidstart, _description, \ _status, _by, _code, _inet, _cd, _trf_root) \ - _workmarkers_process(_conn, _add, _markerid, _poolinstance, \ + _workmarkers_process(_conn, _already, _add, _markerid, _poolinstance, \ _workinfoidend, _workinfoidstart, _description, \ _status, _by, _code, _inet, _cd, _trf_root, \ WHERE_FFL_HERE) -extern bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, - char *poolinstance, int64_t workinfoidend, - int64_t workinfoidstart, char *description, - char *status, char *by, char *code, - char *inet, tv_t *cd, K_TREE *trf_root, - WHERE_FFL_ARGS); +extern bool _workmarkers_process(PGconn *conn, bool already, bool add, + int64_t markerid, char *poolinstance, + int64_t workinfoidend, int64_t workinfoidstart, + char *description, char *status, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root, WHERE_FFL_ARGS); extern bool workmarkers_fill(PGconn *conn); #define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \ _extra, _marktype, _status, _by, _code, _inet, _cd, \ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 2ec947a6..233b36bf 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -4092,7 +4092,7 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char * } /* Socket interface to the functions that will be used later to automatically - * create marks, workmarkers and process the workmarkers + * create marks, workmarkers and process the workmarkers and sharesummaries * to generate markersummaries */ static char *cmd_marks(PGconn *conn, char *cmd, char *id, __maybe_unused tv_t *now, char *by, @@ -4372,7 +4372,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, DATA_WORKMARKERS(workmarkers, wm_item); if (CURRENT(&(workmarkers->expirydate)) && !WMPROCESSED(workmarkers->status)) { - ok = workmarkers_process(conn, false, + ok = workmarkers_process(conn, false, false, workmarkers->markerid, NULL, 0, 0, NULL, NULL, by, code, inet, cd, trf_root); @@ -4391,6 +4391,16 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, "%d workmarkers expunged", count); } } + } else if (strcasecmp(action, "sum") == 0) { + /* For the last available workmarker, + * summarise it's sharesummaries into markersummaries + * No parameters */ + ok = make_markersummaries(true, by, code, inet, cd, trf_root); + if (!ok) { + snprintf(reply, siz, "%s failed", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s.%s", id, reply); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index be92fbd0..abb1b40f 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -9,6 +9,83 @@ #include "ckdb.h" +// Data free functions (added here as needed) +void free_workinfo_data(K_ITEM *item) +{ + WORKINFO *workinfo; + + DATA_WORKINFO(workinfo, item); + if (workinfo->transactiontree) + FREENULL(workinfo->transactiontree); + if (workinfo->merklehash) + FREENULL(workinfo->merklehash); +} + +void free_sharesummary_data(K_ITEM *item) +{ + SHARESUMMARY *sharesummary; + + DATA_SHARESUMMARY(sharesummary, item); + if (sharesummary->workername) { + LIST_MEM_SUB(sharesummary_free, sharesummary->workername); + FREENULL(sharesummary->workername); + } + SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY); + SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY); + SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY); + SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY); + SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY); + SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY); +} + +void free_optioncontrol_data(K_ITEM *item) +{ + OPTIONCONTROL *optioncontrol; + + DATA_OPTIONCONTROL(optioncontrol, item); + if (optioncontrol->optionvalue) + FREENULL(optioncontrol->optionvalue); +} + +void free_markersummary_data(K_ITEM *item) +{ + MARKERSUMMARY *markersummary; + + DATA_MARKERSUMMARY(markersummary, item); + if (markersummary->workername) + FREENULL(markersummary->workername); + SET_CREATEBY(markersummary_free, markersummary->createby, EMPTY); + SET_CREATECODE(markersummary_free, markersummary->createcode, EMPTY); + SET_CREATEINET(markersummary_free, markersummary->createinet, EMPTY); + SET_MODIFYBY(markersummary_free, markersummary->modifyby, EMPTY); + SET_MODIFYCODE(markersummary_free, markersummary->modifycode, EMPTY); + SET_MODIFYINET(markersummary_free, markersummary->modifyinet, EMPTY); +} + +void free_workmarkers_data(K_ITEM *item) +{ + WORKMARKERS *workmarkers; + + DATA_WORKMARKERS(workmarkers, item); + if (workmarkers->poolinstance) + FREENULL(workmarkers->poolinstance); + if (workmarkers->description) + FREENULL(workmarkers->description); +} + +void free_marks_data(K_ITEM *item) +{ + MARKS *marks; + + DATA_MARKS(marks, item); + if (marks->poolinstance && marks->poolinstance != EMPTY) + FREENULL(marks->poolinstance); + if (marks->description && marks->description != EMPTY) + FREENULL(marks->description); + if (marks->extra && marks->extra != EMPTY) + FREENULL(marks->extra); +} + // Clear text printable version of txt up to first '\0' char *safe_text(char *txt) { @@ -2390,6 +2467,58 @@ K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername) return ms_item; } +bool make_markersummaries(bool msg, char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root) +{ + K_TREE_CTX ctx[1]; + WORKMARKERS *workmarkers; + K_ITEM *wm_item, *wm_last = NULL; + tv_t now; + + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_workinfoid_root, ctx); + while (wm_item) { + DATA_WORKMARKERS(workmarkers, wm_item); + if (!CURRENT(&(workmarkers->expirydate))) + break; + // find the oldest READY workinfoid + if (WMREADY(workmarkers->status)) + wm_last = wm_item; + wm_item = prev_in_ktree(ctx); + } + K_RUNLOCK(workmarkers_free); + + if (!wm_last) { + if (!msg) + LOGDEBUG("%s() no READY workmarkers", __func__); + else + LOGWARNING("%s() no READY workmarkers", __func__); + return false; + } + + DATA_WORKMARKERS(workmarkers, wm_last); + + LOGDEBUG("%s() processing workmarkers %"PRId64"/%s/End %"PRId64"/" + "Stt %"PRId64"/%s/%s", + __func__, workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, workmarkers->workinfoidstart, + workmarkers->description, workmarkers->status); + + if (by == NULL) + by = (char *)by_default; + if (code == NULL) + code = (char *)__func__; + if (inet == NULL) + inet = (char *)inet_default; + if (cd) + copy_tv(&now, cd); + else + setnow(&now); + + return sharesummaries_to_markersummaries(NULL, workmarkers, by, code, + inet, &now, trf_root); +} + void dsp_workmarkers(K_ITEM *item, FILE *stream) { WORKMARKERS *wm; @@ -2573,7 +2702,7 @@ static bool gen_workmarkers(PGconn *conn, MARKS *stt, bool after, MARKS *fin, stt->description, after ? "++" : "", fin->description, before ? "--" : ""); - ok = workmarkers_process(conn, true, 0, EMPTY, + ok = workmarkers_process(conn, false, true, 0, EMPTY, wi_fin->workinfoid, wi_stt->workinfoid, description, MARKER_READY_STR, by, code, inet, cd, trf_root); diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 9245f968..c4385982 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -1,5 +1,5 @@ /* - * Copyright 1995-2014 Andrew Smith + * Copyright 1995-2015 Andrew Smith * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free @@ -168,7 +168,8 @@ char *pqerrmsg(PGconn *conn) #define PQPARAM15 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15" #define PQPARAM16 PQPARAM8 ",$9,$10,$11,$12,$13,$14,$15,$16" #define PQPARAM22 PQPARAM16 ",$17,$18,$19,$20,$21,$22" -#define PQPARAM27 PQPARAM22 ",$23,$24,$25,$26,$27" +#define PQPARAM26 PQPARAM22 ",$23,$24,$25,$26" +#define PQPARAM27 PQPARAM26 ",$27" #define PARCHK(_par, _params) do { \ if (_par != (int)(sizeof(_params)/sizeof(_params[0]))) { \ @@ -2751,6 +2752,318 @@ bool shareerrors_fill() return true; } +/* TODO: what to do about a failure? + * since it will repeat every ~13s + * Of course manual intervention is possible via cmd_marks, + * so that is probably the best solution since + * we should be watching the pool all the time :) + * The cause would most likely be either a code bug or a DB problem + * so there many be no obvious automated fix + * and flagging the workmarkers to be skipped may or may not be the solution, + * thus manual intervention will be the rule for now */ +bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, + char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root) +{ + // shorter name for log messages + const char *shortname = "SS_to_MS"; + ExecStatusType rescode; + PGresult *res; + K_TREE_CTX ss_ctx[1], ms_ctx[1]; + SHARESUMMARY *sharesummary, looksharesummary; + MARKERSUMMARY *markersummary, lookmarkersummary; + K_ITEM *ss_item, *ss_prev, ss_look, *ms_item, ms_look; + bool ok = false, conned = false; + int64_t diffacc, shareacc; + char *reason = NULL, *tuples = NULL; + char *params[2]; + int n, par = 0, deleted = -7; + int ss_count, ms_count; + char *del; + + LOGWARNING("%s() Processing: workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, workmarkers->workinfoidstart, + workmarkers->description, workmarkers->status); + + K_STORE *old_sharesummary_store = k_new_store(sharesummary_free); + K_STORE *new_markersummary_store = k_new_store(markersummary_free); + K_TREE *ms_root = new_ktree(); + + if (!CURRENT(&(workmarkers->expirydate))) { + reason = "unexpired"; + goto flail; + } + + if (!WMREADY(workmarkers->status)) { + reason = "not ready"; + goto flail; + } + + // Check there aren't already any matching markersummaries + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = 0; + lookmarkersummary.workername = EMPTY; + + INIT_MARKERSUMMARY(&ms_look); + ms_look.data = (void *)(&lookmarkersummary); + K_RLOCK(markersummary_free); + ms_item = find_after_in_ktree(markersummary_root, &ms_look, + cmp_markersummary, ms_ctx); + K_RUNLOCK(markersummary_free); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + if (ms_item && markersummary->markerid == workmarkers->markerid) { + reason = "markersummaries already exist"; + goto flail; + } + + diffacc = shareacc = 0; + ms_item = NULL; + + looksharesummary.workinfoid = workmarkers->workinfoidend; + looksharesummary.userid = MAXID; + looksharesummary.workername = EMPTY; + + INIT_SHARESUMMARY(&ss_look); + ss_look.data = (void *)(&looksharesummary); + /* Since shares come in from ckpool at a high rate, + * we don't want to lock sharesummary for long + * Those incoming shares will not be touching the sharesummaries + * we are processing here */ + K_RLOCK(sharesummary_free); + ss_item = find_before_in_ktree(sharesummary_workinfoid_root, &ss_look, + cmp_sharesummary_workinfoid, ss_ctx); + K_RUNLOCK(sharesummary_free); + while (ss_item) { + DATA_SHARESUMMARY(sharesummary, ss_item); + if (sharesummary->workinfoid < workmarkers->workinfoidstart) + break; + K_RLOCK(sharesummary_free); + ss_prev = prev_in_ktree(ss_ctx); + K_RUNLOCK(sharesummary_free); + + // Find/create the markersummary only once per worker change + if (!ms_item || markersummary->userid != sharesummary->userid || + strcmp(markersummary->workername, sharesummary->workername) != 0) { + lookmarkersummary.markerid = workmarkers->markerid; + lookmarkersummary.userid = sharesummary->userid; + lookmarkersummary.workername = sharesummary->workername; + + ms_look.data = (void *)(&lookmarkersummary); + ms_item = find_in_ktree(ms_root, &ms_look, + cmp_markersummary, ms_ctx); + if (!ms_item) { + K_WLOCK(markersummary_free); + ms_item = k_unlink_head(markersummary_free); + K_WUNLOCK(markersummary_free); + k_add_head(new_markersummary_store, ms_item); + DATA_MARKERSUMMARY(markersummary, ms_item); + bzero(markersummary, sizeof(*markersummary)); + markersummary->markerid = workmarkers->markerid; + markersummary->userid = sharesummary->userid; + markersummary->workername = strdup(sharesummary->workername); + LIST_MEM_ADD(markersummary_free, markersummary->workername); + ms_root = add_to_ktree(ms_root, ms_item, cmp_markersummary); + + LOGDEBUG("%s() new ms %"PRId64"/%"PRId64"/%s", + shortname, markersummary->markerid, + markersummary->userid, + markersummary->workername); + } else { + DATA_MARKERSUMMARY(markersummary, ms_item); + } + } + markersummary->diffacc += sharesummary->diffacc; + markersummary->diffsta += sharesummary->diffsta; + markersummary->diffdup += sharesummary->diffdup; + markersummary->diffhi += sharesummary->diffhi; + markersummary->diffrej += sharesummary->diffrej; + markersummary->shareacc += sharesummary->shareacc; + markersummary->sharesta += sharesummary->sharesta; + markersummary->sharedup += sharesummary->sharedup; + markersummary->sharehi += sharesummary->sharehi; + markersummary->sharerej += sharesummary->sharerej; + markersummary->sharecount += sharesummary->sharecount; + markersummary->errorcount += sharesummary->errorcount; + if (!markersummary->firstshare.tv_sec || + !tv_newer(&(markersummary->firstshare), &(sharesummary->firstshare))) { + copy_tv(&(markersummary->firstshare), &(sharesummary->firstshare)); + } + if (tv_newer(&(markersummary->lastshare), &(sharesummary->lastshare))) { + copy_tv(&(markersummary->lastshare), &(sharesummary->lastshare)); + markersummary->lastdiffacc = sharesummary->lastdiffacc; + } + + diffacc += sharesummary->diffacc; + shareacc += sharesummary->shareacc; + + k_unlink_item(sharesummary_store, ss_item); + k_add_head(old_sharesummary_store, ss_item); + + ss_item = ss_prev; + } + + if (old_sharesummary_store->count == 0) + reason = "no sharesummaries"; + else { + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto flail; + } + + ms_item = new_markersummary_store->head; + while (ms_item) { + if (!(markersummary_add(conn, ms_item, by, code, inet, + cd, trf_root))) { + reason = "db error"; + goto rollback; + } + ms_item = ms_item->next; + } + + par = 0; + params[par++] = bigint_to_buf(workmarkers->workinfoidstart, NULL, 0); + params[par++] = bigint_to_buf(workmarkers->workinfoidend, NULL, 0); + PARCHK(par, params); + + del = "delete from sharesummary " + "where workinfoid >= $1 and workinfoid <= $2"; + + res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (PGOK(rescode)) { + tuples = PQcmdTuples(res); + if (tuples && *tuples) + deleted = atoi(tuples); + } + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Delete", rescode, conn); + reason = "delete failure"; + goto rollback; + } + + if (deleted != old_sharesummary_store->count) { + LOGERR("%s() processed sharesummaries=%d but deleted=%d", + shortname, old_sharesummary_store->count, deleted); + reason = "delete mismatch"; + goto rollback; + } + + ok = workmarkers_process(conn, true, true, + workmarkers->markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_PROCESSED_STR, + by, code, inet, cd, trf_root); +rollback: + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); + } +flail: + for (n = 0; n < par; n++) + free(params[n]); + + if (conned) + PQfinish(conn); + + if (reason) { + // already displayed the full workmarkers detail at the top + LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s", + shortname, reason, workmarkers->markerid, + workmarkers->description, workmarkers->status); + + ok = false; + } + + if (!ok) { + if (new_markersummary_store->count > 0) { + // Throw them away (they don't exist anywhere else) + ms_item = new_markersummary_store->head; + while (ms_item) { + free_markersummary_data(ms_item); + ms_item = ms_item->next; + } + K_WLOCK(markersummary_free); + k_list_transfer_to_head(new_markersummary_store, markersummary_free); + K_WUNLOCK(markersummary_free); + } + if (old_sharesummary_store->count > 0) { + // Put them back in the store where they came from + K_WLOCK(sharesummary_free); + k_list_transfer_to_head(old_sharesummary_store, sharesummary_store); + K_WUNLOCK(sharesummary_free); + } + } else { + ms_count = new_markersummary_store->count; + ss_count = old_sharesummary_store->count; + // Deadlock alert for other newer code ... + K_WLOCK(sharesummary_free); + K_WLOCK(markersummary_free); + ms_item = new_markersummary_store->head; + while (ms_item) { + // Move the new markersummaries into the trees/stores + markersummary_root = add_to_ktree(markersummary_root, + ms_item, + cmp_markersummary); + markersummary_userid_root = add_to_ktree(markersummary_userid_root, + ms_item, + cmp_markersummary_userid); + ms_item = ms_item->next; + } + k_list_transfer_to_head(new_markersummary_store, markersummary_store); + + /* For normal shift processing this wont be very quick + * so it will be a 'long' LOCK */ + ss_item = old_sharesummary_store->head; + while (ss_item) { + // remove the old sharesummaries from the trees + sharesummary_root = remove_from_ktree(sharesummary_root, + ss_item, + cmp_sharesummary); + sharesummary_workinfoid_root = remove_from_ktree(sharesummary_workinfoid_root, + ss_item, + cmp_sharesummary_workinfoid); + free_sharesummary_data(ss_item); + + ss_item = ss_item->next; + } + k_list_transfer_to_head(old_sharesummary_store, sharesummary_free); + K_WUNLOCK(markersummary_free); + K_WUNLOCK(sharesummary_free); + + LOGWARNING("%s() Processed: %d ms %d ss %"PRId64" shares " + "%"PRId64" diff for workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, ms_count, ss_count, shareacc, diffacc, + workmarkers->markerid, workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + workmarkers->status); + } + ms_root = free_ktree(ms_root, NULL); + new_markersummary_store = k_free_store(new_markersummary_store); + old_sharesummary_store = k_free_store(old_sharesummary_store); + + return ok; +} + bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITEM *ss_item, char *by, char *code, char *inet, tv_t *cd, WHERE_FFL_ARGS) { @@ -5204,6 +5517,82 @@ clean: return ok; } +bool markersummary_add(PGconn *conn, K_ITEM *ms_item, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res; + MARKERSUMMARY *row; + char *params[18 + MODIFYDATECOUNT]; + int n, par = 0; + char *ins; + bool ok = false; + + LOGDEBUG("%s(): add", __func__); + + DATA_MARKERSUMMARY(row, ms_item); + + MODIFYDATEPOINTERS(markersummary_free, row, cd, by, code, inet); + MODIFYDATETRANSFER(markersummary_free, trf_root, row); + + par = 0; + params[par++] = bigint_to_buf(row->markerid, NULL, 0); + params[par++] = bigint_to_buf(row->userid, NULL, 0); + params[par++] = str_to_buf(row->workername, NULL, 0); + params[par++] = double_to_buf(row->diffacc, NULL, 0); + params[par++] = double_to_buf(row->diffsta, NULL, 0); + params[par++] = double_to_buf(row->diffdup, NULL, 0); + params[par++] = double_to_buf(row->diffhi, NULL, 0); + params[par++] = double_to_buf(row->diffrej, NULL, 0); + params[par++] = double_to_buf(row->shareacc, NULL, 0); + params[par++] = double_to_buf(row->sharesta, NULL, 0); + params[par++] = double_to_buf(row->sharedup, NULL, 0); + params[par++] = double_to_buf(row->sharehi, NULL, 0); + params[par++] = double_to_buf(row->sharerej, NULL, 0); + params[par++] = bigint_to_buf(row->sharecount, NULL, 0); + params[par++] = bigint_to_buf(row->errorcount, NULL, 0); + params[par++] = tv_to_buf(&(row->firstshare), NULL, 0); + params[par++] = tv_to_buf(&(row->lastshare), NULL, 0); + params[par++] = double_to_buf(row->lastdiffacc, NULL, 0); + MODIFYDATEPARAMS(params, par, row); + PARCHK(par, params); + + ins = "insert into markersummary " + "(markerid,userid,workername,diffacc,diffsta,diffdup,diffhi," + "diffrej,shareacc,sharesta,sharedup,sharehi,sharerej," + "sharecount,errorcount,firstshare,lastshare,lastdiffacc" + MODIFYDATECONTROL ") values (" PQPARAM26 ")"; + + LOGDEBUG("%s() adding ms %"PRId64"/%"PRId64"/%s/%.0f", + __func__, row->markerid, row->userid, row->workername, + row->diffacc); + + if (!conn) { + conn = dbconnect(); + conned = true; + } + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto unparam; + } + + ok = true; +unparam: + PQclear(res); + if (conned) + PQfinish(conn); + for (n = 0; n < par; n++) + free(params[n]); + + // caller must do tree/list/store changes + + return ok; +} + bool markersummary_fill(PGconn *conn) { ExecStatusType rescode; @@ -5380,10 +5769,10 @@ bool markersummary_fill(PGconn *conn) * since we only check for a CURRENT workmarkers * N.B. also, this returns success if !add and there is no matching * old workmarkers */ -bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, - char *poolinstance, int64_t workinfoidend, - int64_t workinfoidstart, char *description, - char *status, char *by, char *code, +bool _workmarkers_process(PGconn *conn, bool already, bool add, + int64_t markerid, char *poolinstance, + int64_t workinfoidend, int64_t workinfoidstart, + char *description, char *status, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root, WHERE_FFL_ARGS) { @@ -5391,7 +5780,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, bool conned = false; PGresult *res = NULL; K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item; - WORKMARKERS *row, *oldworkmarkers; + WORKMARKERS *row, *oldworkmarkers = NULL; char *upd, *ins; char *params[6 + HISTORYDATECOUNT]; bool ok = false, begun = false; @@ -5414,15 +5803,17 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, conn = dbconnect(); conned = true; } - res = PQexec(conn, "Begin", CKPQ_WRITE); - rescode = PQresultStatus(res); - PQclear(res); - if (!PGOK(rescode)) { - PGLOGERR("Begin", rescode, conn); - goto unparam; - } + if (!already) { + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } - begun = true; + begun = true; + } upd = "update workmarkers set expirydate=$1 where markerid=$2" " and expirydate=$3"; @@ -5473,7 +5864,7 @@ bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, conned = true; } - if (!begun) { + if (!already && !begun) { res = PQexec(conn, "Begin", CKPQ_WRITE); rescode = PQresultStatus(res); PQclear(res); @@ -5707,7 +6098,7 @@ bool _marks_process(PGconn *conn, bool add, char *poolinstance, bool conned = false; PGresult *res = NULL; K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item; - MARKS *row, *oldmarks; + MARKS *row, *oldmarks = NULL; char *upd, *ins; char *params[6 + HISTORYDATECOUNT]; bool ok = false, begun = false;