From 93b8bf007155369845d9d391656b44b62495d7ac Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 27 Nov 2014 16:24:05 +1100 Subject: [PATCH 1/3] ckdb - implement marks with cmd_marks to create and use marks and workmarkers --- sql/ckdb.sql | 3 +- sql/v0.9.5-v0.9.6.sql | 39 ++++ src/ckdb.c | 69 ++++++- src/ckdb.h | 116 +++++++++-- src/ckdb_cmd.c | 327 ++++++++++++++++++++++++++++++- src/ckdb_data.c | 398 ++++++++++++++++++++++++++++++++++++- src/ckdb_dbio.c | 442 +++++++++++++++++++++++++++++++++++++++++- 7 files changed, 1343 insertions(+), 51 deletions(-) create mode 100644 sql/v0.9.5-v0.9.6.sql diff --git a/sql/ckdb.sql b/sql/ckdb.sql index aaa6eca7..441fbc90 100644 --- a/sql/ckdb.sql +++ b/sql/ckdb.sql @@ -255,6 +255,7 @@ CREATE TABLE marks ( -- workinfoids to make workmarkers poolinstance character varying(256) NOT NULL, workinfoid bigint NOT NULL, description character varying(256) DEFAULT ''::character varying NOT NULL, + extra character varying(256) DEFAULT ''::character varying NOT NULL, marktype char NOT NULL, -- 'b'lock 'p'plns-begin 's'hift-begin 'e'=shift-end status char NOT NULL, createdate timestamp with time zone NOT NULL, @@ -432,4 +433,4 @@ CREATE TABLE version ( PRIMARY KEY (vlock) ); -insert into version (vlock,version) values (1,'0.9.5'); +insert into version (vlock,version) values (1,'0.9.6'); diff --git a/sql/v0.9.5-v0.9.6.sql b/sql/v0.9.5-v0.9.6.sql new file mode 100644 index 00000000..5b683680 --- /dev/null +++ b/sql/v0.9.5-v0.9.6.sql @@ -0,0 +1,39 @@ +SET SESSION AUTHORIZATION 'postgres'; + +BEGIN transaction; + +DO $$ +DECLARE ver TEXT; +BEGIN + + UPDATE version set version='0.9.6' where vlock=1 and version='0.9.5'; + + IF found THEN + RETURN; + END IF; + + SELECT version into ver from version + WHERE vlock=1; + + RAISE EXCEPTION 'Wrong DB version - expect "0.9.5" - found "%"', ver; + +END $$; + +DROP TABLE marks; + +CREATE TABLE marks ( -- workinfoids to make workmarkers + poolinstance character varying(256) NOT NULL, + workinfoid bigint NOT NULL, + description character varying(256) DEFAULT ''::character varying NOT NULL, + extra character varying(256) DEFAULT ''::character varying NOT NULL, + marktype char NOT NULL, -- 'b'lock(end) 'p'plns-begin 's'hift-begin 'e'=shift-end + status char NOT NULL, + createdate timestamp with time zone NOT NULL, + createby character varying(64) DEFAULT ''::character varying NOT NULL, + createcode character varying(128) DEFAULT ''::character varying NOT NULL, + createinet character varying(128) DEFAULT ''::character varying NOT NULL, + expirydate timestamp with time zone DEFAULT '6666-06-06 06:06:06+00', + PRIMARY KEY (poolinstance, workinfoid, expirydate) +); + +END transaction; diff --git a/src/ckdb.c b/src/ckdb.c index c0305a38..71e5eed3 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -460,6 +460,20 @@ K_TREE *marks_root; K_LIST *marks_free; K_STORE *marks_store; +const char *marktype_block = "Block End"; +const char *marktype_pplns = "Payout Begin"; +const char *marktype_shift_begin = "Shift Begin"; +const char *marktype_shift_end = "Shift End"; +const char *marktype_other_begin = "Other Begin"; +const char *marktype_other_finish = "Other Finish"; + +const char *marktype_block_fmt = "Block %"PRId32" fin"; +const char *marktype_pplns_fmt = "Payout %"PRId32" stt"; +const char *marktype_shift_begin_fmt = "Shift %s stt"; +const char *marktype_shift_end_fmt = "Shift %s fin"; +const char *marktype_other_begin_fmt = "stt: %s"; +const char *marktype_other_finish_fmt = "fin: %s"; + static char logname[512]; static char *dbcode; @@ -1046,7 +1060,7 @@ static void alloc_storage() marks_free = k_new_list("Marks", sizeof(MARKS), ALLOC_MARKS, LIMIT_MARKS, true); - marks_store = k_new_store(workmarkers_free); + marks_store = k_new_store(marks_free); marks_root = new_ktree(); } @@ -1114,6 +1128,19 @@ static void free_workmarkers_data(K_ITEM *item) free(workmarkers->description); } +static void free_marks_data(K_ITEM *item) +{ + MARKS *marks; + + DATA_MARKS(marks, item); + if (marks->poolinstance && marks->poolinstance != EMPTY) + free(marks->poolinstance); + if (marks->description && marks->description != EMPTY) + free(marks->description); + if (marks->extra && marks->extra != EMPTY) + free(marks->extra); +} + #define FREE_TREE(_tree) \ if (_tree ## _root) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \ @@ -1154,7 +1181,9 @@ static void dealloc_storage() { FREE_LISTS(logqueue); - FREE_ALL(marks); + FREE_TREE(marks); + FREE_STORE_DATA(marks); + FREE_LIST_DATA(marks); FREE_TREE(workmarkers_workinfoid); FREE_TREE(workmarkers); @@ -1281,7 +1310,7 @@ static bool setup_data() static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, char *buf, int *which_cmds, char *cmd, - char *id, tv_t *cd) + char *id, tv_t *now, tv_t *cd) { char reply[1024] = ""; TRANSFER *transfer; @@ -1295,8 +1324,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, *trf_store = NULL; *which_cmds = CMD_UNSET; *cmd = *id = '\0'; - cd->tv_sec = 0; - cd->tv_usec = 0; + copy_tv(cd, now); // default cd to 'now' cmdptr = strdup(buf); idptr = strchr(cmdptr, '.'); @@ -1676,7 +1704,7 @@ static void summarise_blocks() workmarkers->description, workmarkers->status, hi, wi_finish); } - if (WMREADY(workmarkers->status)) { + if (WMPROCESSED(workmarkers->status)) { lookmarkersummary.markerid = workmarkers->markerid; lookmarkersummary.userid = MAXID; lookmarkersummary.workername = EMPTY; @@ -2202,7 +2230,7 @@ static void *socketer(__maybe_unused void *arg) LOGDEBUG("Duplicate '%s' message received", duptype); } else { LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd); switch (cmdnum) { case CMD_REPLY: snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); @@ -2373,6 +2401,30 @@ static void *socketer(__maybe_unused void *arg) rep = NULL; } break; + /* Process, but reject (loading) until startup_complete + * and don't test for duplicates */ + case CMD_MARKS: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", + id, now.tv_sec, cmd); + send_unix_msg(sockd, reply); + } else { + ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now, + by_default, + (char *)__func__, + inet_default, + &cd, trf_root); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + send_unix_msg(sockd, rep); + free(ans); + ans = NULL; + free(rep); + rep = NULL; + } + break; // Always queue (ok.queued) case CMD_SHARELOG: case CMD_POOLSTAT: @@ -2503,7 +2555,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) } LOGQUE(buf); - cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd); switch (cmdnum) { // Ignore case CMD_REPLY: @@ -2535,6 +2587,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) case CMD_STATS: case CMD_PPLNS: case CMD_USERSTATUS: + case CMD_MARKS: LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", __func__, count, cmd); break; diff --git a/src/ckdb.h b/src/ckdb.h index 47c1b9cc..afade726 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -51,8 +51,8 @@ */ #define DB_VLOCK "1" -#define DB_VERSION "0.9.5" -#define CKDB_VERSION DB_VERSION"-0.652" +#define DB_VERSION "0.9.6" +#define CKDB_VERSION DB_VERSION"-0.664" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -306,6 +306,7 @@ enum cmd_values { CMD_STATS, CMD_PPLNS, CMD_USERSTATUS, + CMD_MARKS, CMD_END }; @@ -882,6 +883,7 @@ typedef struct workinfo { #define LIMIT_WORKINFO 0 #define INIT_WORKINFO(_item) INIT_GENERIC(_item, workinfo) #define DATA_WORKINFO(_var, _item) DATA_GENERIC(_var, _item, workinfo, true) +#define DATA_WORKINFO_NULL(_var, _item) DATA_GENERIC(_var, _item, workinfo, false) extern K_TREE *workinfo_root; // created during data load then destroyed since not needed later @@ -1339,46 +1341,59 @@ extern K_TREE *workmarkers_workinfoid_root; extern K_LIST *workmarkers_free; extern K_STORE *workmarkers_store; -#define MARKER_COMPLETE 'x' -#define WMREADY(_status) (tolower(_status[0]) == MARKER_COMPLETE) +#define MARKER_READY 'x' +#define MARKER_READY_STR "x" +#define WMREADY(_status) (tolower((_status)[0]) == MARKER_READY) +#define MARKER_PROCESSED 'p' +#define MARKER_PROCESSED_STR "p" +#define WMPROCESSED(_status) (tolower((_status)[0]) == MARKER_PROCESSED) // MARKS -// TODO: implement typedef struct marks { char *poolinstance; int64_t workinfoid; char *description; + char *extra; char marktype[TXT_FLAG+1]; char status[TXT_FLAG+1]; HISTORYDATECONTROLFIELDS; } MARKS; -/* Marks: +/* marks: * marktype is one of: * b - block end * p - pplns begin * s - shift begin (not yet used) * e - shift end (not yet used) - * description should one one of - * b - Block NNN stt - * p - Payout NNN fin (where NNN is the block number of the payout) - * s/e - to be decided + * o - other begin + * f - other finish/end + * description generated will be: + * b - Block NNN fin + * p - Payout NNN stt (where NNN is the block number of the payout) + * s - Shift AAA stt + * e - Shift AAA fin + * o - The string passed to the marks command + * f - The string passed to the marks command * - * WorkMarkers are from a begin workinfoid to an end workinfoid + * workmarkers are from a begin workinfoid to an end workinfoid * the "-1" and "+1" below mean adding to or subtracting from - * the workinfoid number + * the workinfoid number - but should move forward/back to the + * next/prev valid workinfoid, not simply +1 or -1 + * i.e. all workinfoid in marks and workmarkers must exist in workinfo * * Until we start using shifts: - * WorkMarkers can be created up to ending in the largest 'p' "-1" - * WorkMarkers will always be the smallest of: + * workmarkers can be created up to ending in the largest 'p' "-1" + * workmarkers will always be the smallest of: * Block NNN-1 "+1" to Block NNN * Block NNN "+1" to Payout MMM "-1" * Payout MMM to Block NNN * Payout MMM-1 to Payout MMM "-1" - * Thus to generate the WorkMarkers from the Marks: - * Find the last 'p' with no matching workinfoidbegin - * Then determine each previous WorkMarker based on each previous - * mark, using the above rules and stop when we find one that already exists + * Thus to generate the workmarkers from the marks: + * Find the last USED mark then create a workmarker from each pair of + * marks going forward for each READY mark + * Set each mark as USED when we use it + * Stop when either we run out of marks or find a non-READY mark + * Finding a workmarker that already exists is an error */ #define ALLOC_MARKS 1000 @@ -1391,8 +1406,33 @@ extern K_TREE *marks_root; extern K_LIST *marks_free; extern K_STORE *marks_store; +#define MARKTYPE_BLOCK 'b' +#define MARKTYPE_PPLNS 'p' +#define MARKTYPE_SHIFT_BEGIN 's' +#define MARKTYPE_SHIFT_END 'e' +#define MARKTYPE_OTHER_BEGIN 'o' +#define MARKTYPE_OTHER_FINISH 'f' + +extern const char *marktype_block; +extern const char *marktype_pplns; +extern const char *marktype_shift_begin; +extern const char *marktype_shift_end; +extern const char *marktype_other_begin; +extern const char *marktype_other_finish; + +extern const char *marktype_block_fmt; +extern const char *marktype_pplns_fmt; +extern const char *marktype_shift_begin_fmt; +extern const char *marktype_shift_end_fmt; +extern const char *marktype_other_begin_fmt; +extern const char *marktype_other_finish_fmt; + #define MARK_READY 'x' -#define MREADY(_status) (tolower(_status[0]) == MARK_READY) +#define MARK_READY_STR "x" +#define MREADY(_status) (tolower((_status)[0]) == MARK_READY) +#define MARK_USED 'u' +#define MARK_USED_STR "u" +#define MUSED(_status) (tolower((_status)[0]) == MARK_USED) extern void logmsg(int loglevel, const char *fmt, ...); extern void setnow(tv_t *now); @@ -1557,9 +1597,19 @@ extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername, extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); -extern K_ITEM *find_workmarkers(int64_t workinfoid); -extern K_ITEM *find_workmarkerid(int64_t markerid); +extern K_ITEM *find_workmarkers(int64_t workinfoid, bool anystatus, char status); +extern K_ITEM *find_workmarkerid(int64_t markerid, bool anystatus, char status); +extern bool workmarkers_generate(PGconn *conn, char *err, size_t siz, + char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root); extern cmp_t cmp_marks(K_ITEM *a, K_ITEM *b); +extern K_ITEM *find_marks(int64_t workinfoid); +extern const char *marks_marktype(char *marktype); +#define marks_description(_description, _siz, _marktype, _height, _shift, _other) \ + _marks_description(_description, _siz, _marktype, _height, _shift, _other, WHERE_FFL_HERE) +extern bool _marks_description(char *description, size_t siz, char *marktype, + int32_t height, char *shift, char *other, + WHERE_FFL_ARGS); // *** // *** PostgreSQL functions ckdb_dbio.c @@ -1702,7 +1752,31 @@ extern bool userstats_add(char *poolinstance, char *elapsed, char *username, K_TREE *trf_root); extern bool userstats_fill(PGconn *conn); extern bool markersummary_fill(PGconn *conn); +#define workmarkers_process(_conn, _add, _markerid, _poolinstance, \ + _workinfoidend, _workinfoidstart, _description, \ + _status, _by, _code, _inet, _cd, _trf_root) \ + _workmarkers_process(_conn, _add, _markerid, _poolinstance, \ + _workinfoidend, _workinfoidstart, _description, \ + _status, _by, _code, _inet, _cd, _trf_root, \ + WHERE_FFL_HERE) +extern bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, + char *poolinstance, int64_t workinfoidend, + int64_t workinfoidstart, char *description, + char *status, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root, + WHERE_FFL_ARGS); extern bool workmarkers_fill(PGconn *conn); +#define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \ + _extra, _marktype, _status, _by, _code, _inet, _cd, \ + _trf_root) \ + _marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \ + _extra, _marktype, _status, _by, _code, _inet, _cd, \ + _trf_root, WHERE_FFL_HERE) +extern bool _marks_process(PGconn *conn, bool add, char *poolinstance, + int64_t workinfoid, char *description, + char *extra, char *marktype, char *status, + char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root, WHERE_FFL_ARGS); extern bool marks_fill(PGconn *conn); extern bool check_db_version(PGconn *conn); diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 1dcd7b6b..45fe8dc3 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -3157,7 +3157,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, cmp_workmarkers_workinfoid, wm_ctx); DATA_WORKMARKERS_NULL(workmarkers, wm_item); while (total_diff < diff_want && wm_item && CURRENT(&(workmarkers->expirydate))) { - if (WMREADY(workmarkers->status)) { + if (WMPROCESSED(workmarkers->status)) { wm_count++; lookmarkersummary.markerid = workmarkers->markerid; lookmarkersummary.userid = MAXID; @@ -3454,6 +3454,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, USEINFO(sharesummary, 1, 2); USEINFO(workmarkers, 1, 2); USEINFO(markersummary, 1, 2); + USEINFO(marks, 1, 1); USEINFO(blocks, 1, 1); USEINFO(miningpayouts, 1, 1); USEINFO(auths, 1, 1); @@ -3483,8 +3484,8 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, // TODO: add to heartbeat to disable the miner if active and status != "" static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, - char *code, char *inet, __maybe_unused tv_t *cd, - K_TREE *trf_root) + char *code, char *inet, __maybe_unused tv_t *cd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -3544,6 +3545,325 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char * return strdup(reply); } +/* Socket interface to the functions that will be used later to automatically + * create marks, workmarkers and process the workmarkers + * to generate markersummaries */ +static char *cmd_marks(PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) +{ + char reply[1024] = ""; + size_t siz = sizeof(reply); + char tmp[1024] = ""; + char msg[1024] = ""; + K_ITEM *i_action, *i_workinfoid, *i_marktype, *i_description; + K_ITEM *i_height, *i_status, *i_extra, *m_item, *b_item, *w_item; + K_ITEM *wm_item, *wm_item_prev; + WORKMARKERS *workmarkers; + K_TREE_CTX ctx[1]; + BLOCKS *blocks; + MARKS *marks; + char *action; + int64_t workinfoid = -1; + char *marktype; + int32_t height = 0; + char description[TXT_BIG+1] = { '\0' }; + char extra[TXT_BIG+1] = { '\0' }; + char status[TXT_FLAG+1] = { MARK_READY, '\0' }; + bool ok; + + LOGDEBUG("%s(): cmd '%s'", __func__, cmd); + + i_action = require_name(trf_root, "action", 1, NULL, reply, siz); + if (!i_action) + return strdup(reply); + action = transfer_data(i_action); + + if (strcasecmp(action, "add") == 0) { + /* Add a mark + * Require marktype + * Require workinfoid for all but 'b' + * If marktype is 'b' or 'p' then require height/block (number) + * If marktype is 'o' or 'f' then require description + * Status optional - default READY */ + i_marktype = require_name(trf_root, "marktype", + 1, NULL, + reply, siz); + if (!i_marktype) + return strdup(reply); + marktype = transfer_data(i_marktype); + + if (marktype[0] != MARKTYPE_BLOCK) { + i_workinfoid = require_name(trf_root, "workinfoid", + 1, (char *)intpatt, + reply, siz); + if (!i_workinfoid) + return strdup(reply); + TXT_TO_BIGINT("workinfoid", + transfer_data(i_workinfoid), + workinfoid); + } + + switch (marktype[0]) { + case MARKTYPE_BLOCK: + case MARKTYPE_PPLNS: + i_height = require_name(trf_root, + "height", + 1, (char *)intpatt, + reply, siz); + if (!i_height) + return strdup(reply); + TXT_TO_INT("height", transfer_data(i_height), + height); + K_RLOCK(blocks_free); + b_item = find_prev_blocks(height+1); + K_RUNLOCK(blocks_free); + if (b_item) { + DATA_BLOCKS(blocks, b_item); + if (blocks->height != height) + b_item = NULL; + } + if (!b_item) { + snprintf(reply, siz, + "no blocks with height %"PRId32, height); + return strdup(reply); + } + if (marktype[0] == MARKTYPE_BLOCK) + workinfoid = blocks->workinfoid; + + if (!marks_description(description, sizeof(description), + marktype, height, NULL, NULL)) + goto dame; + break; + case MARKTYPE_SHIFT_BEGIN: + case MARKTYPE_SHIFT_END: + snprintf(reply, siz, + "marktype %s not yet handled", + marks_marktype(marktype)); + return strdup(reply); + case MARKTYPE_OTHER_BEGIN: + case MARKTYPE_OTHER_FINISH: + i_description = require_name(trf_root, + "description", + 1, NULL, + reply, siz); + if (!i_description) + return strdup(reply); + if (!marks_description(description, sizeof(description), + marktype, height, NULL, + transfer_data(i_description))) + goto dame; + break; + default: + snprintf(reply, siz, + "unknown marktype '%s'", marktype); + return strdup(reply); + } + i_status = optional_name(trf_root, "status", 1, NULL, reply, siz); + if (i_status) { + STRNCPY(status, transfer_data(i_status)); + switch(status[0]) { + case MARK_READY: + case MARK_USED: + case '\0': + break; + default: + snprintf(reply, siz, + "unknown mark status '%s'", status); + return strdup(reply); + } + } + if (workinfoid == -1) { + snprintf(reply, siz, "workinfoid not found"); + return strdup(reply); + } + w_item = find_workinfo(workinfoid); + if (!w_item) { + snprintf(reply, siz, "invalid workinfoid %"PRId64, + workinfoid); + return strdup(reply); + } + ok = marks_process(conn, true, EMPTY, workinfoid, description, + extra, marktype, status, by, code, inet, cd, + trf_root); + } else if (strcasecmp(action, "expire") == 0) { + /* Expire the mark - effectively deletes it + * Require workinfoid */ + i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz); + if (!i_workinfoid) + return strdup(reply); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + K_RLOCK(marks_free); + m_item = find_marks(workinfoid); + K_RUNLOCK(marks_free); + if (!m_item) { + snprintf(reply, siz, + "unknown current mark with workinfoid %"PRId64, workinfoid); + return strdup(reply); + } + ok = marks_process(conn, false, EMPTY, workinfoid, NULL, + NULL, NULL, NULL, by, code, inet, cd, + trf_root); + } else if (strcasecmp(action, "status") == 0) { + /* Change the status on a mark + * Require workinfoid and status + * N.B. you can cause generate errors if you change the status of a USED marks */ + i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz); + if (!i_workinfoid) + return strdup(reply); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + K_RLOCK(marks_free); + m_item = find_marks(workinfoid); + K_RUNLOCK(marks_free); + if (!m_item) { + snprintf(reply, siz, + "unknown current mark with workinfoid %"PRId64, workinfoid); + return strdup(reply); + } + DATA_MARKS(marks, m_item); + i_status = require_name(trf_root, "status", 0, NULL, reply, siz); + if (!i_status) + return strdup(reply); + STRNCPY(status, transfer_data(i_status)); + switch(status[0]) { + case MARK_READY: + case MARK_USED: + case '\0': + break; + default: + snprintf(reply, siz, + "unknown mark status '%s'", status); + return strdup(reply); + } + // Unchanged + if (strcmp(status, marks->status) == 0) { + action = "status-unchanged"; + ok = true; + } else { + ok = marks_process(conn, true, marks->poolinstance, + workinfoid, marks->description, + marks->extra, marks->marktype, + status, by, code, inet, cd, + trf_root); + } + } else if (strcasecmp(action, "extra") == 0) { + /* Change the 'extra' description + * Require workinfoid and extra + * If a mark is actually multiple marks with the same + * workinfoid, then we can record the extra info here + * This would be true of each block, once shifts are + * implemented, since the current shift ends when a + * block is found + * This could also be true, very rarely, if the beginning + * of a pplns payout range matched any other mark, + * since the beginning can be any workinfoid */ + i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz); + if (!i_workinfoid) + return strdup(reply); + TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid); + K_RLOCK(marks_free); + m_item = find_marks(workinfoid); + K_RUNLOCK(marks_free); + if (!m_item) { + snprintf(reply, siz, + "unknown current mark with workinfoid %"PRId64, workinfoid); + return strdup(reply); + } + DATA_MARKS(marks, m_item); + i_extra = require_name(trf_root, "extra", 0, NULL, reply, siz); + if (!i_extra) + return strdup(reply); + STRNCPY(extra, transfer_data(i_extra)); + // Unchanged + if (strcmp(extra, marks->extra) == 0) { + action = "extra-unchanged"; + ok = true; + } else { + ok = marks_process(conn, true, marks->poolinstance, + workinfoid, marks->description, + extra, marks->marktype, + status, by, code, inet, cd, + trf_root); + } + } else if (strcasecmp(action, "generate") == 0) { + /* Generate workmarkers + * No parameters */ + tmp[0] = '\0'; + ok = workmarkers_generate(conn, tmp, sizeof(tmp), + by, code, inet, cd, trf_root); + if (!ok) { + snprintf(reply, siz, "%s error: %s", action, tmp); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } + if (*tmp) { + snprintf(reply, siz, "%s: %s", action, tmp); + LOGWARNING("%s.%s", id, reply); + } + } else if (strcasecmp(action, "expunge") == 0) { + /* Expire all generated workmarkers that aren't PROCESSED + * No parameters + * This exists so we can fix all workmarkers that haven't + * been PROCESSED yet, + * if there was a problem with the marks + * Simply expunge all the workmarkers, correct the marks, + * then generate the workmarkers again + * WARNING - using psql to do the worksummary generation + * will not update the workmarkers status inside ckdb + * so this will expunge those worksummary records also + * You'll need to restart ckdb after using psql */ + int count = 0; + ok = true; + wm_item_prev = NULL; + K_RLOCK(workmarkers_free); + wm_item = last_in_ktree(workmarkers_root, ctx); + K_RUNLOCK(workmarkers_free); + while (wm_item) { + K_RLOCK(workmarkers_free); + wm_item_prev = prev_in_ktree(ctx); + K_RUNLOCK(workmarkers_free); + DATA_WORKMARKERS(workmarkers, wm_item); + if (CURRENT(&(workmarkers->expirydate)) && + !WMPROCESSED(workmarkers->status)) { + ok = workmarkers_process(conn, false, + workmarkers->markerid, + NULL, 0, 0, NULL, NULL, by, + code, inet, cd, trf_root); + if (!ok) + break; + count++; + } + wm_item = wm_item_prev; + } + if (ok) { + if (count == 0) { + snprintf(msg, sizeof(msg), + "no unprocessed current workmarkers"); + } else { + snprintf(msg, sizeof(msg), + "%d workmarkers expunged", count); + } + } + } else { + snprintf(reply, siz, "unknown action '%s'", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } + + if (!ok) { +dame: + LOGERR("%s() %s.failed.DBE", __func__, id); + return strdup("failed.DBE"); + } + if (msg[0]) + snprintf(reply, siz, "ok.%s %s", action, msg); + else + snprintf(reply, siz, "ok.%s", action); + LOGWARNING("%s.%s", id, reply); + return strdup(reply); +} + // TODO: limit access by having seperate sockets for each #define ACCESS_POOL "p" #define ACCESS_SYSTEM "s" @@ -3647,5 +3967,6 @@ struct CMDS ckdb_cmds[] = { { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM ACCESS_WEB }, { CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM ACCESS_WEB }, { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, ACCESS_SYSTEM ACCESS_WEB }, + { CMD_MARKS, "marks", false, false, cmd_marks, ACCESS_SYSTEM }, { CMD_END, NULL, false, false, NULL, NULL } }; diff --git a/src/ckdb_data.c b/src/ckdb_data.c index 241fdac3..e7005975 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -697,7 +697,8 @@ void workerstatus_ready() if (ms_item) { DATA_MARKERSUMMARY(markersummary, ms_item); K_RLOCK(workmarkers_free); - wm_item = find_workmarkerid(markersummary->markerid); + wm_item = find_workmarkerid(markersummary->markerid, + false, MARKER_PROCESSED); K_RUNLOCK(workmarkers_free); if (wm_item && tv_newer(&(workerstatus->last_share), &(markersummary->lastshare))) { @@ -1357,9 +1358,9 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance, goto bye; } - K_RLOCK(markersummary_free); - wm_item = find_workmarkers(workinfoid); - K_RUNLOCK(markersummary_free); + K_RLOCK(workmarkers_free); + wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED); + K_RUNLOCK(workmarkers_free); // Should never happen? if (wm_item && !reloading) { tv_to_buf(cd, cd_buf, sizeof(cd_buf)); @@ -2020,7 +2021,7 @@ void set_block_share_counters() CURRENT(&(workmarkers->expirydate)) && workmarkers->workinfoidend > pool.workinfoid) { - if (WMREADY(workmarkers->status)) + if (WMPROCESSED(workmarkers->status)) { // Should never be true if (workmarkers->workinfoidstart <= pool.workinfoid) { @@ -2332,7 +2333,7 @@ K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername) WORKMARKERS *wm; K_TREE_CTX ctx[1]; - wm_item = find_workmarkers(workinfoid); + wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED); if (wm_item) { DATA_WORKMARKERS(wm, wm_item); markersummary.markerid = wm->markerid; @@ -2388,7 +2389,7 @@ cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b) return c; } -K_ITEM *find_workmarkers(int64_t workinfoid) +K_ITEM *find_workmarkers(int64_t workinfoid, bool anystatus, char status) { WORKMARKERS workmarkers, *wm; K_TREE_CTX ctx[1]; @@ -2404,7 +2405,7 @@ K_ITEM *find_workmarkers(int64_t workinfoid) if (wm_item) { DATA_WORKMARKERS(wm, wm_item); if (!CURRENT(&(wm->expirydate)) || - !WMREADY(wm->status) || + (!anystatus && wm->status[0] != status) || workinfoid < wm->workinfoidstart || workinfoid > wm->workinfoidend) wm_item = NULL; @@ -2412,7 +2413,7 @@ K_ITEM *find_workmarkers(int64_t workinfoid) return wm_item; } -K_ITEM *find_workmarkerid(int64_t markerid) +K_ITEM *find_workmarkerid(int64_t markerid, bool anystatus, char status) { WORKMARKERS workmarkers, *wm; K_TREE_CTX ctx[1]; @@ -2428,12 +2429,269 @@ K_ITEM *find_workmarkerid(int64_t markerid) if (wm_item) { DATA_WORKMARKERS(wm, wm_item); if (!CURRENT(&(wm->expirydate)) || - !WMREADY(wm->status)) + (!anystatus && wm->status[0] != status)) wm_item = NULL; } return wm_item; } +// Create one +static bool gen_workmarkers(PGconn *conn, MARKS *stt, bool after, MARKS *fin, + bool before, char *by, char *code, char *inet, + tv_t *cd, K_TREE *trf_root) +{ + K_ITEM look, *wi_stt_item, *wi_fin_item, *old_wm_item; + WORKMARKERS *old_wm; + WORKINFO workinfo, *wi_stt, *wi_fin; + K_TREE_CTX ctx[1]; + char description[TXT_BIG+1]; + bool ok; + + workinfo.workinfoid = stt->workinfoid; + workinfo.expirydate.tv_sec = default_expiry.tv_sec; + workinfo.expirydate.tv_usec = default_expiry.tv_usec; + + INIT_WORKINFO(&look); + look.data = (void *)(&workinfo); + K_RLOCK(workinfo_free); + if (after) { + wi_stt_item = find_after_in_ktree(workinfo_root, &look, + cmp_workinfo, ctx); + while (wi_stt_item) { + DATA_WORKINFO(wi_stt, wi_stt_item); + if (CURRENT(&(wi_stt->expirydate))) + break; + wi_stt_item = next_in_ktree(ctx); + } + } else { + wi_stt_item = find_in_ktree(workinfo_root, &look, + cmp_workinfo, ctx); + DATA_WORKINFO_NULL(wi_stt, wi_stt_item); + } + K_RUNLOCK(workinfo_free); + if (!wi_stt_item) + return false; + if (!CURRENT(&(wi_stt->expirydate))) + return false; + + workinfo.workinfoid = fin->workinfoid; + + INIT_WORKINFO(&look); + look.data = (void *)(&workinfo); + K_RLOCK(workinfo_free); + if (before) { + workinfo.expirydate.tv_sec = 0; + workinfo.expirydate.tv_usec = 0; + wi_fin_item = find_before_in_ktree(workinfo_root, &look, + cmp_workinfo, ctx); + while (wi_fin_item) { + DATA_WORKINFO(wi_fin, wi_fin_item); + if (CURRENT(&(wi_fin->expirydate))) + break; + wi_fin_item = prev_in_ktree(ctx); + } + } else { + workinfo.expirydate.tv_sec = default_expiry.tv_sec; + workinfo.expirydate.tv_usec = default_expiry.tv_usec; + wi_fin_item = find_in_ktree(workinfo_root, &look, + cmp_workinfo, ctx); + DATA_WORKINFO_NULL(wi_fin, wi_fin_item); + } + K_RUNLOCK(workinfo_free); + if (!wi_fin_item) + return false; + if (!CURRENT(&(wi_fin->expirydate))) + return false; + + /* If two marks in a row are fin(+after) then stt(-before), + * it may be that there should be no workmarkers range between them + * This may show up as the calculated finish id being before + * the start id - so no need to create it since it will be empty + * Also note that empty workmarkers are not a problem, + * but simply unnecessary and in this specific case, + * we don't create it since a negative range would cause tree + * sort order and matching errors */ + if (wi_fin->workinfoid >= wi_stt->workinfoid) { + K_RLOCK(workmarkers_free); + old_wm_item = find_workmarkers(wi_fin->workinfoid, true, '\0'); + K_RUNLOCK(workmarkers_free); + DATA_WORKMARKERS_NULL(old_wm, old_wm_item); + if (old_wm_item && (WMREADY(old_wm->status) || + WMPROCESSED(old_wm->status))) { + /* This actually means a code bug or a DB marks has + * been set incorrectly via cmd_marks (or pgsql) */ + LOGEMERG("%s(): marks workinfoid %"PRId64" matches or" + " is part of the existing markerid %"PRId64, + __func__, wi_fin->workinfoid, + old_wm->markerid); + return false; + } + + snprintf(description, sizeof(description), "%s%s to %s%s", + stt->description, after ? "++" : "", + fin->description, before ? "--" : ""); + + ok = workmarkers_process(conn, true, 0, EMPTY, + wi_fin->workinfoid, wi_stt->workinfoid, + description, MARKER_READY_STR, + by, code, inet, cd, trf_root); + + if (!ok) + return false; + } + + ok = marks_process(conn, true, EMPTY, fin->workinfoid, + fin->description, fin->extra, fin->marktype, + MARK_USED_STR, by, code, inet, cd, trf_root); + + return ok; +} + +/* Generate workmarkers from the last USED mark + * Will only use the last USED mark and the contiguous READY + * marks after the last USED mark + * If a mark is found not READY it will stop at that one and + * report success with a message regarding the not READY one + * No checks are done for the validity of the mark status + * information, however, until the next step of generating + * the markersummaries is completely automated, rather than + * simply running the SQL script manually, the existence of + a workmarker wont actually do anything automatically */ +bool workmarkers_generate(PGconn *conn, char *err, size_t siz, char *by, + char *code, char *inet, tv_t *cd, K_TREE *trf_root) +{ + K_ITEM *m_item, *m_next_item; + MARKS *mused, *mnext; + MARKS marks; + K_TREE_CTX ctx[1]; + K_ITEM look; + bool any = false, ok; + + marks.expirydate.tv_sec = default_expiry.tv_sec; + marks.expirydate.tv_usec = default_expiry.tv_usec; + marks.workinfoid = MAXID; + + INIT_MARKS(&look); + look.data = (void *)(&marks); + K_RLOCK(marks_free); + m_item = find_before_in_ktree(marks_root, &look, cmp_marks, ctx); + while (m_item) { + DATA_MARKS(mused, m_item); + if (CURRENT(&(mused->expirydate)) && MUSED(mused->status)) + break; + m_item = prev_in_ktree(ctx); + } + K_RUNLOCK(marks_free); + if (!m_item || !CURRENT(&(mused->expirydate)) || !MUSED(mused->status)) { + snprintf(err, siz, "%s", "No trailing used mark found"); + return false; + } + K_RLOCK(marks_free); + m_item = next_in_ktree(ctx); + K_RUNLOCK(marks_free); + while (m_item) { + DATA_MARKS(mnext, m_item); + if (!CURRENT(&(mnext->expirydate)) || !!MREADY(mused->status)) + break; + /* We need to get the next marks in advance since + * gen_workmarker will create a new m_item flagged USED + * and the tree position ctx for m_item will no longer give + * us the correct 'next' + * However, we can still use mnext as mused in the subsequent + * loop since the data that we need hasn't been changed + */ + K_RLOCK(marks_free); + m_next_item = next_in_ktree(ctx); + K_RUNLOCK(marks_free); + +// save code space ... +#define GENWM(m1, b1, m2, b2) \ + gen_workmarkers(conn, m1, b1, m2, b2, by, code, inet, cd, trf_root) + + ok = true; + switch(mused->marktype[0]) { + case MARKTYPE_BLOCK: + case MARKTYPE_SHIFT_END: + case MARKTYPE_OTHER_FINISH: + switch(mnext->marktype[0]) { + case MARKTYPE_BLOCK: + case MARKTYPE_SHIFT_END: + case MARKTYPE_OTHER_FINISH: + ok = GENWM(mused, true, mnext, false); + if (ok) + any = true; + break; + case MARKTYPE_PPLNS: + case MARKTYPE_SHIFT_BEGIN: + case MARKTYPE_OTHER_BEGIN: + ok = GENWM(mused, true, mnext, true); + if (ok) + any = true; + break; + default: + snprintf(err, siz, + "Mark %"PRId64" has" + " an unknown marktype" + " '%s' - aborting", + mnext->workinfoid, + mnext->marktype); + return false; + } + break; + case MARKTYPE_PPLNS: + case MARKTYPE_SHIFT_BEGIN: + case MARKTYPE_OTHER_BEGIN: + switch(mnext->marktype[0]) { + case MARKTYPE_BLOCK: + case MARKTYPE_SHIFT_END: + case MARKTYPE_OTHER_FINISH: + ok = GENWM(mused, false, mnext, false); + if (ok) + any = true; + break; + case MARKTYPE_PPLNS: + case MARKTYPE_SHIFT_BEGIN: + case MARKTYPE_OTHER_BEGIN: + ok = GENWM(mused, false, mnext, true); + if (ok) + any = true; + break; + default: + snprintf(err, siz, + "Mark %"PRId64" has" + " an unknown marktype" + " '%s' - aborting", + mnext->workinfoid, + mnext->marktype); + return false; + } + break; + default: + snprintf(err, siz, + "Mark %"PRId64" has an unknown " + "marktype '%s' - aborting", + mused->workinfoid, + mused->marktype); + return false; + } + if (!ok) { + snprintf(err, siz, + "Processing marks %"PRId64" to " + "%"PRId64" failed - aborting", + mused->workinfoid, + mnext->workinfoid); + return false; + } + mused = mnext; + m_item = m_next_item; + } + if (!any) { + snprintf(err, siz, "%s", "No ready marks found"); + return false; + } + return true; +} + // order by expirydate asc,workinfoid asc // TODO: add poolinstance cmp_t cmp_marks(K_ITEM *a, K_ITEM *b) @@ -2447,3 +2705,123 @@ cmp_t cmp_marks(K_ITEM *a, K_ITEM *b) return c; } +K_ITEM *find_marks(int64_t workinfoid) +{ + MARKS marks; + K_TREE_CTX ctx[1]; + K_ITEM look; + + marks.expirydate.tv_sec = default_expiry.tv_sec; + marks.expirydate.tv_usec = default_expiry.tv_usec; + marks.workinfoid = workinfoid; + + INIT_MARKS(&look); + look.data = (void *)(&marks); + return find_in_ktree(marks_root, &look, cmp_marks, ctx); +} + +const char *marks_marktype(char *marktype) +{ + switch (marktype[0]) { + case MARKTYPE_BLOCK: + return marktype_block; + case MARKTYPE_PPLNS: + return marktype_pplns; + case MARKTYPE_SHIFT_BEGIN: + return marktype_shift_begin; + case MARKTYPE_SHIFT_END: + return marktype_shift_end; + case MARKTYPE_OTHER_BEGIN: + return marktype_other_begin; + case MARKTYPE_OTHER_FINISH: + return marktype_other_finish; + } + return NULL; +} + +bool _marks_description(char *description, size_t siz, char *marktype, + int32_t height, char *shift, char *other, + WHERE_FFL_ARGS) +{ + switch (marktype[0]) { + case MARKTYPE_BLOCK: + if (height < START_POOL_HEIGHT) { + LOGERR("%s() invalid pool height %"PRId32 + "for mark %s " WHERE_FFL, + __func__, height, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_block_fmt, height); + break; + case MARKTYPE_PPLNS: + if (height < START_POOL_HEIGHT) { + LOGERR("%s() invalid pool height %"PRId32 + "for mark %s " WHERE_FFL, + __func__, height, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_pplns_fmt, height); + break; + case MARKTYPE_SHIFT_BEGIN: + if (shift == NULL || !*shift) { + LOGERR("%s() invalid mark shift NULL/empty " + "for mark %s " WHERE_FFL, + __func__, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_shift_begin_fmt, shift); + break; + case MARKTYPE_SHIFT_END: + if (shift == NULL || !*shift) { + LOGERR("%s() invalid mark shift NULL/empty " + "for mark %s " WHERE_FFL, + __func__, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_shift_end_fmt, shift); + break; + case MARKTYPE_OTHER_BEGIN: + if (other == NULL) { + LOGERR("%s() invalid mark other NULL/empty " + "for mark %s " WHERE_FFL, + __func__, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_other_begin_fmt, other); + break; + case MARKTYPE_OTHER_FINISH: + if (other == NULL) { + LOGERR("%s() invalid mark other NULL/empty " + "for mark %s " WHERE_FFL, + __func__, + marks_marktype(marktype), + WHERE_FFL_PASS); + return false; + } + snprintf(description, siz, + marktype_other_finish_fmt, other); + break; + default: + LOGERR("%s() invalid marktype '%s'" WHERE_FFL, + __func__, marktype, + WHERE_FFL_PASS); + return false; + } + return true; +} + diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 7d5bddd7..79ddfb85 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -2473,8 +2473,9 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername goto unitem; if (reloading && !confirm_sharesummary) { - // We only need to know if the workmarker is ready - wm_item = find_workmarkers(shares->workinfoid); + // We only need to know if the workmarker is processed + wm_item = find_workmarkers(shares->workinfoid, false, + MARKER_PROCESSED); if (wm_item) { K_WLOCK(shares_free); k_add_head(shares_free, s_item); @@ -2590,8 +2591,9 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, goto unitem; if (reloading && !confirm_sharesummary) { - // We only need to know if the workmarker is ready - wm_item = find_workmarkers(shareerrors->workinfoid); + // We only need to know if the workmarker is processed + wm_item = find_workmarkers(shareerrors->workinfoid, false, + MARKER_PROCESSED); if (wm_item) { K_WLOCK(shareerrors_free); k_add_head(shareerrors_free, s_item); @@ -2693,14 +2695,14 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE } K_RLOCK(workmarkers_free); - wm_item = find_workmarkers(workinfoid); + wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED); K_RUNLOCK(workmarkers_free); if (wm_item) { char *tmp; DATA_WORKMARKERS(wm, wm_item); LOGERR("%s(): attempt to update sharesummary " "with %s %"PRId64"/%"PRId64"/%s createdate %s" - " but ready workmarkers %"PRId64" exists", + " but processed workmarkers %"PRId64" exists", __func__, s_row ? "shares" : "shareerrors", workinfoid, userid, workername, (tmp = ctv_to_buf(sharecreatedate, NULL, 0)), @@ -5009,6 +5011,231 @@ bool markersummary_fill(PGconn *conn) return ok; } +/* Add means create a new one and expire the old one if it exists, + * otherwise we only expire the old one if it exists + * Add requires all db fields except markerid, however if markerid + * is non-zero, it will be used instead of getting a new one + * i.e. this effectively means updating a workmarker + * !Add requires markerid or workinfoidend, only + * workinfoidend is used if markerid is zero + * N.B. if you expire a workmarker without creating a new one, + * it's markerid is effectively cancelled, since creating a + * new matching workmarker later, will get a new markerid, + * since we only check for a CURRENT workmarkers + * N.B. also, this returns success if !add and there is no matching + * old workmarkers */ +bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid, + char *poolinstance, int64_t workinfoidend, + int64_t workinfoidstart, char *description, + char *status, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root, + WHERE_FFL_ARGS) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res = NULL; + K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item; + WORKMARKERS *row, *oldworkmarkers; + char *upd, *ins; + char *params[6 + HISTORYDATECOUNT]; + bool ok = false, begun = false; + int n, par = 0; + + LOGDEBUG("%s(): add", __func__); + + if (markerid == 0) { + K_RLOCK(workmarkers_free); + old_wm_item = find_workmarkers(workinfoidend, true, '\0'); + K_RUNLOCK(workmarkers_free); + } else { + K_RLOCK(workmarkers_free); + old_wm_item = find_workmarkerid(markerid, true, '\0'); + K_RUNLOCK(workmarkers_free); + } + if (old_wm_item) { + DATA_WORKMARKERS(oldworkmarkers, old_wm_item); + if (!conn) { + conn = dbconnect(); + conned = true; + } + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } + + begun = true; + + upd = "update workmarkers set expirydate=$1 where markerid=$2" + " and expirydate=$3"; + par = 0; + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = bigint_to_buf(oldworkmarkers->markerid, NULL, 0); + params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); + PARCHKVAL(par, 3, params); + + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Update", rescode, conn); + goto rollback; + } + + for (n = 0; n < par; n++) + free(params[n]); + par = 0; + } + + if (add) { + if (poolinstance == NULL || description == NULL || + status == NULL) { + LOGEMERG("%s(): NULL field(s) passed:%s%s%s" + WHERE_FFL, __func__, + poolinstance ? "" : " poolinstance", + description ? "" : " description", + status ? "" : " status", + WHERE_FFL_PASS); + goto rollback; + } + w_item = find_workinfo(workinfoidend); + if (!w_item) + goto rollback; + w_item = find_workinfo(workinfoidstart); + if (!w_item) + goto rollback; + K_WLOCK(workmarkers_free); + wm_item = k_unlink_head(workmarkers_free); + K_WUNLOCK(workmarkers_free); + DATA_WORKMARKERS(row, wm_item); + bzero(row, sizeof(*row)); + + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + if (!begun) { + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } + begun = true; + } + + if (old_wm_item) + row->markerid = oldworkmarkers->markerid; + else { + if (markerid != 0) + row->markerid = markerid; + else { + row->markerid = nextid(conn, "markerid", 1, + cd, by, code, inet); + if (row->markerid == 0) + goto rollback; + } + } + + row->poolinstance = strdup(poolinstance); + LIST_MEM_ADD(workmarkers_free, poolinstance); + row->workinfoidend = workinfoidend; + row->workinfoidstart = workinfoidstart; + row->description = strdup(description); + LIST_MEM_ADD(workmarkers_free, description); + STRNCPY(row->status, status); + HISTORYDATEINIT(row, cd, by, code, inet); + HISTORYDATETRANSFER(trf_root, row); + + ins = "insert into workmarkers " + "(markerid,poolinstance,workinfoidend,workinfoidstart," + "description,status" + HISTORYDATECONTROL ") values (" PQPARAM11 ")"; + par = 0; + params[par++] = bigint_to_buf(row->markerid, NULL, 0); + params[par++] = str_to_buf(row->poolinstance, NULL, 0); + params[par++] = bigint_to_buf(row->workinfoidend, NULL, 0); + params[par++] = bigint_to_buf(row->workinfoidstart, NULL, 0); + params[par++] = str_to_buf(row->description, NULL, 0); + params[par++] = str_to_buf(row->status, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHK(par, params); + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto rollback; + } + } + + ok = true; +rollback: + if (begun) { + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); + } +unparam: + for (n = 0; n < par; n++) + free(params[n]); + + if (conned) + PQfinish(conn); + + K_WLOCK(workmarkers_free); + if (!ok) { + if (wm_item) { + DATA_WORKMARKERS(row, wm_item); + if (row->poolinstance) { + if (row->poolinstance != EMPTY) { + LIST_MEM_SUB(workmarkers_free, + row->poolinstance); + free(row->poolinstance); + } + row->poolinstance = NULL; + } + if (row->description) { + if (row->description != EMPTY) { + LIST_MEM_SUB(workmarkers_free, + row->description); + free(row->description); + } + row->description = NULL; + } + k_add_head(workmarkers_free, wm_item); + } + } + else { + if (old_wm_item) { + workmarkers_root = remove_from_ktree(workmarkers_root, + old_wm_item, + cmp_workmarkers); + copy_tv(&(oldworkmarkers->expirydate), cd); + workmarkers_root = add_to_ktree(workmarkers_root, + old_wm_item, + cmp_workmarkers); + } + if (wm_item) { + workmarkers_root = add_to_ktree(workmarkers_root, + wm_item, + cmp_workmarkers); + k_add_head(workmarkers_store, wm_item); + } + } + K_WUNLOCK(workmarkers_free); + + return ok; +} + bool workmarkers_fill(PGconn *conn) { ExecStatusType rescode; @@ -5111,6 +5338,200 @@ bool workmarkers_fill(PGconn *conn) return ok; } +/* Add means create a new one and expire the old one if it exists, + * otherwise we only expire the old one if it exists + * Add requires all db fields + * !Add only requires the (poolinstance and) workinfoid db fields */ +bool _marks_process(PGconn *conn, bool add, char *poolinstance, + int64_t workinfoid, char *description, char *extra, + char *marktype, char *status, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root, WHERE_FFL_ARGS) +{ + ExecStatusType rescode; + bool conned = false; + PGresult *res = NULL; + K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item; + MARKS *row, *oldmarks; + char *upd, *ins; + char *params[6 + HISTORYDATECOUNT]; + bool ok = false, begun = false; + int n, par = 0; + + LOGDEBUG("%s(): add", __func__); + + K_RLOCK(marks_free); + old_m_item = find_marks(workinfoid); + K_RUNLOCK(marks_free); + if (old_m_item) { + DATA_MARKS(oldmarks, old_m_item); + if (!conn) { + conn = dbconnect(); + conned = true; + } + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } + + begun = true; + + upd = "update marks set expirydate=$1 where workinfoid=$2" + " and expirydate=$3"; + par = 0; + params[par++] = tv_to_buf(cd, NULL, 0); + params[par++] = bigint_to_buf(workinfoid, NULL, 0); + params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); + PARCHKVAL(par, 3, params); + + res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Update", rescode, conn); + goto rollback; + } + + for (n = 0; n < par; n++) + free(params[n]); + par = 0; + } + + if (add) { + if (poolinstance == NULL || description == NULL || + extra == NULL || marktype == NULL || status == NULL) { + LOGEMERG("%s(): NULL field(s) passed:%s%s%s%s%s" + WHERE_FFL, __func__, + poolinstance ? "" : " poolinstance", + description ? "" : " description", + extra ? "" : " extra", + marktype ? "" : " marktype", + status ? "" : " status", + WHERE_FFL_PASS); + goto rollback; + } + w_item = find_workinfo(workinfoid); + if (!w_item) + goto rollback; + K_WLOCK(marks_free); + m_item = k_unlink_head(marks_free); + K_WUNLOCK(marks_free); + DATA_MARKS(row, m_item); + bzero(row, sizeof(*row)); + row->poolinstance = strdup(poolinstance); + LIST_MEM_ADD(marks_free, poolinstance); + row->workinfoid = workinfoid; + row->description = strdup(description); + LIST_MEM_ADD(marks_free, description); + row->extra = strdup(extra); + LIST_MEM_ADD(marks_free, extra); + STRNCPY(row->marktype, marktype); + STRNCPY(row->status, status); + HISTORYDATEINIT(row, cd, by, code, inet); + HISTORYDATETRANSFER(trf_root, row); + + ins = "insert into marks " + "(poolinstance,workinfoid,description,extra,marktype," + "status" + HISTORYDATECONTROL ") values (" PQPARAM11 ")"; + par = 0; + params[par++] = str_to_buf(row->poolinstance, NULL, 0); + params[par++] = bigint_to_buf(workinfoid, NULL, 0); + params[par++] = str_to_buf(row->description, NULL, 0); + params[par++] = str_to_buf(row->extra, NULL, 0); + params[par++] = str_to_buf(row->marktype, NULL, 0); + params[par++] = str_to_buf(row->status, NULL, 0); + HISTORYDATEPARAMS(params, par, row); + PARCHK(par, params); + + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + if (!begun) { + res = PQexec(conn, "Begin", CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Begin", rescode, conn); + goto unparam; + } + begun = true; + } + + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + PQclear(res); + if (!PGOK(rescode)) { + PGLOGERR("Insert", rescode, conn); + goto rollback; + } + } + + ok = true; +rollback: + if (begun) { + if (ok) + res = PQexec(conn, "Commit", CKPQ_WRITE); + else + res = PQexec(conn, "Rollback", CKPQ_WRITE); + + PQclear(res); + } +unparam: + for (n = 0; n < par; n++) + free(params[n]); + + if (conned) + PQfinish(conn); + + K_WLOCK(marks_free); + if (!ok) { + if (m_item) { + DATA_MARKS(row, m_item); + if (row->poolinstance) { + if (row->poolinstance != EMPTY) { + LIST_MEM_SUB(marks_free, row->poolinstance); + free(row->poolinstance); + } + row->poolinstance = NULL; + } + if (row->description) { + if (row->description != EMPTY) { + LIST_MEM_SUB(marks_free, row->description); + free(row->description); + } + row->description = NULL; + } + if (row->extra) { + if (row->extra != EMPTY) { + LIST_MEM_SUB(marks_free, row->extra); + free(row->extra); + } + row->extra = NULL; + } + k_add_head(marks_free, m_item); + } + } + else { + if (old_m_item) { + marks_root = remove_from_ktree(marks_root, old_m_item, cmp_marks); + copy_tv(&(oldmarks->expirydate), cd); + marks_root = add_to_ktree(marks_root, old_m_item, cmp_marks); + } + if (m_item) { + marks_root = add_to_ktree(marks_root, m_item, cmp_marks); + k_add_head(marks_store, m_item); + } + } + K_WUNLOCK(marks_free); + + return ok; +} + bool marks_fill(PGconn *conn) { ExecStatusType rescode; @@ -5120,14 +5541,14 @@ bool marks_fill(PGconn *conn) MARKS *row; char *field; char *sel; - int fields = 5; + int fields = 6; bool ok; LOGDEBUG("%s(): select", __func__); // TODO: limit how far back sel = "select " - "poolinstance,workinfoid,description,marktype,status" + "poolinstance,workinfoid,description,extra,marktype,status" HISTORYDATECONTROL " from marks"; res = PQexec(conn, sel, CKPQ_READ); @@ -5173,6 +5594,11 @@ bool marks_fill(PGconn *conn) break; TXT_TO_PTR("description", field, row->description); + PQ_GET_FLD(res, i, "extra", field, ok); + if (!ok) + break; + TXT_TO_PTR("extra", field, row->extra); + PQ_GET_FLD(res, i, "marktype", field, ok); if (!ok) break; From 58be2ed29f7bce2548d7721de2e118ff648d048a Mon Sep 17 00:00:00 2001 From: kanoi Date: Thu, 27 Nov 2014 21:07:23 +1100 Subject: [PATCH 2/3] ckdb - fix for pplns when payout is only in workmarkers --- src/ckdb.h | 2 +- src/ckdb_cmd.c | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index afade726..9ed96f68 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -52,7 +52,7 @@ #define DB_VLOCK "1" #define DB_VERSION "0.9.6" -#define CKDB_VERSION DB_VERSION"-0.664" +#define CKDB_VERSION DB_VERSION"-0.665" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index 45fe8dc3..1e9ac011 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -2969,7 +2969,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, K_STORE *mu_store; USERS *users; int32_t height; - int64_t workinfoid, end_workinfoid = 0; + int64_t block_workinfoid, end_workinfoid; int64_t begin_workinfoid; int64_t total_share_count, acc_share_count; int64_t ss_count, wm_count, ms_count; @@ -3054,10 +3054,10 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, block_extra = EMPTY; break; } - workinfoid = blocks->workinfoid; - w_item = find_workinfo(workinfoid); + block_workinfoid = blocks->workinfoid; + w_item = find_workinfo(block_workinfoid); if (!w_item) { - snprintf(reply, siz, "ERR.missing workinfo %"PRId64, workinfoid); + snprintf(reply, siz, "ERR.missing workinfo %"PRId64, block_workinfoid); return strdup(reply); } DATA_WORKINFO(workinfo, w_item); @@ -3072,7 +3072,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, return strdup(reply); } - begin_workinfoid = 0; + begin_workinfoid = end_workinfoid = 0; total_share_count = acc_share_count = 0; total_diff = 0; ss_count = wm_count = ms_count = 0; @@ -3080,7 +3080,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, mu_store = k_new_store(miningpayouts_free); mu_root = new_ktree(); - looksharesummary.workinfoid = workinfoid; + looksharesummary.workinfoid = block_workinfoid; looksharesummary.userid = MAXID; looksharesummary.workername = EMPTY; INIT_SHARESUMMARY(&ss_look); @@ -3150,7 +3150,10 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, if (total_diff < diff_want) { lookworkmarkers.expirydate.tv_sec = default_expiry.tv_sec; lookworkmarkers.expirydate.tv_usec = default_expiry.tv_usec; - lookworkmarkers.workinfoidend = begin_workinfoid; + if (begin_workinfoid != 0) + lookworkmarkers.workinfoidend = begin_workinfoid; + else + lookworkmarkers.workinfoidend = block_workinfoid + 1; INIT_WORKMARKERS(&wm_look); wm_look.data = (void *)(&lookworkmarkers); wm_item = find_before_in_ktree(workmarkers_workinfoid_root, &wm_look, @@ -3196,13 +3199,13 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, if (total_diff == 0.0) { snprintf(reply, siz, "ERR.total share diff 0 before workinfo %"PRId64, - workinfoid); + block_workinfoid); goto shazbot; } wb_item = find_workinfo(begin_workinfoid); if (!wb_item) { - snprintf(reply, siz, "ERR.missing begin workinfo record! %"PRId64, workinfoid); + snprintf(reply, siz, "ERR.missing begin workinfo record! %"PRId64, block_workinfoid); goto shazbot; } DATA_WORKINFO(workinfo, wb_item); @@ -3238,7 +3241,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id, APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "begin_workinfoid=%"PRId64"%c", begin_workinfoid, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); - snprintf(tmp, sizeof(tmp), "block_workinfoid=%"PRId64"%c", workinfoid, FLDSEP); + snprintf(tmp, sizeof(tmp), "block_workinfoid=%"PRId64"%c", block_workinfoid, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); snprintf(tmp, sizeof(tmp), "end_workinfoid=%"PRId64"%c", end_workinfoid, FLDSEP); APPEND_REALLOC(buf, off, len, tmp); From ac05feeebcad07796106f38a0bf546768032b496 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 28 Nov 2014 10:24:32 +1100 Subject: [PATCH 3/3] Fix potential socket leak in gen_loop --- src/generator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/generator.c b/src/generator.c index 0fd73d55..07fc5d40 100644 --- a/src/generator.c +++ b/src/generator.c @@ -246,6 +246,7 @@ static int gen_loop(proc_instance_t *pi) char hash[68]; reconnect: + Close(sockd); if (si) { kill_server(si); reconnecting = true; @@ -262,6 +263,7 @@ reconnect: } retry: + Close(sockd); ckmsgq_add(srvchk, si); do { @@ -289,7 +291,6 @@ retry: buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in gen_loop"); - Close(sockd); goto retry; } LOGDEBUG("Generator received request: %s", buf); @@ -369,7 +370,6 @@ retry: LOGDEBUG("Generator received ping request"); send_unix_msg(sockd, "pong"); } - Close(sockd); goto retry; out: