Browse Source

ckdb - implement marks with cmd_marks to create and use marks and workmarkers

master
kanoi 10 years ago
parent
commit
93b8bf0071
  1. 3
      sql/ckdb.sql
  2. 39
      sql/v0.9.5-v0.9.6.sql
  3. 69
      src/ckdb.c
  4. 116
      src/ckdb.h
  5. 323
      src/ckdb_cmd.c
  6. 398
      src/ckdb_data.c
  7. 442
      src/ckdb_dbio.c

3
sql/ckdb.sql

@ -255,6 +255,7 @@ CREATE TABLE marks ( -- workinfoids to make workmarkers
poolinstance character varying(256) NOT NULL, poolinstance character varying(256) NOT NULL,
workinfoid bigint NOT NULL, workinfoid bigint NOT NULL,
description character varying(256) DEFAULT ''::character varying NOT NULL, description character varying(256) DEFAULT ''::character varying NOT NULL,
extra character varying(256) DEFAULT ''::character varying NOT NULL,
marktype char NOT NULL, -- 'b'lock 'p'plns-begin 's'hift-begin 'e'=shift-end marktype char NOT NULL, -- 'b'lock 'p'plns-begin 's'hift-begin 'e'=shift-end
status char NOT NULL, status char NOT NULL,
createdate timestamp with time zone NOT NULL, createdate timestamp with time zone NOT NULL,
@ -432,4 +433,4 @@ CREATE TABLE version (
PRIMARY KEY (vlock) PRIMARY KEY (vlock)
); );
insert into version (vlock,version) values (1,'0.9.5'); insert into version (vlock,version) values (1,'0.9.6');

39
sql/v0.9.5-v0.9.6.sql

@ -0,0 +1,39 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.9.6' where vlock=1 and version='0.9.5';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.9.5" - found "%"', ver;
END $$;
DROP TABLE marks;
CREATE TABLE marks ( -- workinfoids to make workmarkers
poolinstance character varying(256) NOT NULL,
workinfoid bigint NOT NULL,
description character varying(256) DEFAULT ''::character varying NOT NULL,
extra character varying(256) DEFAULT ''::character varying NOT NULL,
marktype char NOT NULL, -- 'b'lock(end) 'p'plns-begin 's'hift-begin 'e'=shift-end
status char NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
expirydate timestamp with time zone DEFAULT '6666-06-06 06:06:06+00',
PRIMARY KEY (poolinstance, workinfoid, expirydate)
);
END transaction;

69
src/ckdb.c

@ -460,6 +460,20 @@ K_TREE *marks_root;
K_LIST *marks_free; K_LIST *marks_free;
K_STORE *marks_store; K_STORE *marks_store;
const char *marktype_block = "Block End";
const char *marktype_pplns = "Payout Begin";
const char *marktype_shift_begin = "Shift Begin";
const char *marktype_shift_end = "Shift End";
const char *marktype_other_begin = "Other Begin";
const char *marktype_other_finish = "Other Finish";
const char *marktype_block_fmt = "Block %"PRId32" fin";
const char *marktype_pplns_fmt = "Payout %"PRId32" stt";
const char *marktype_shift_begin_fmt = "Shift %s stt";
const char *marktype_shift_end_fmt = "Shift %s fin";
const char *marktype_other_begin_fmt = "stt: %s";
const char *marktype_other_finish_fmt = "fin: %s";
static char logname[512]; static char logname[512];
static char *dbcode; static char *dbcode;
@ -1046,7 +1060,7 @@ static void alloc_storage()
marks_free = k_new_list("Marks", sizeof(MARKS), marks_free = k_new_list("Marks", sizeof(MARKS),
ALLOC_MARKS, LIMIT_MARKS, true); ALLOC_MARKS, LIMIT_MARKS, true);
marks_store = k_new_store(workmarkers_free); marks_store = k_new_store(marks_free);
marks_root = new_ktree(); marks_root = new_ktree();
} }
@ -1114,6 +1128,19 @@ static void free_workmarkers_data(K_ITEM *item)
free(workmarkers->description); free(workmarkers->description);
} }
static void free_marks_data(K_ITEM *item)
{
MARKS *marks;
DATA_MARKS(marks, item);
if (marks->poolinstance && marks->poolinstance != EMPTY)
free(marks->poolinstance);
if (marks->description && marks->description != EMPTY)
free(marks->description);
if (marks->extra && marks->extra != EMPTY)
free(marks->extra);
}
#define FREE_TREE(_tree) \ #define FREE_TREE(_tree) \
if (_tree ## _root) \ if (_tree ## _root) \
_tree ## _root = free_ktree(_tree ## _root, NULL) \ _tree ## _root = free_ktree(_tree ## _root, NULL) \
@ -1154,7 +1181,9 @@ static void dealloc_storage()
{ {
FREE_LISTS(logqueue); FREE_LISTS(logqueue);
FREE_ALL(marks); FREE_TREE(marks);
FREE_STORE_DATA(marks);
FREE_LIST_DATA(marks);
FREE_TREE(workmarkers_workinfoid); FREE_TREE(workmarkers_workinfoid);
FREE_TREE(workmarkers); FREE_TREE(workmarkers);
@ -1281,7 +1310,7 @@ static bool setup_data()
static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
char *buf, int *which_cmds, char *cmd, char *buf, int *which_cmds, char *cmd,
char *id, tv_t *cd) char *id, tv_t *now, tv_t *cd)
{ {
char reply[1024] = ""; char reply[1024] = "";
TRANSFER *transfer; TRANSFER *transfer;
@ -1295,8 +1324,7 @@ static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store,
*trf_store = NULL; *trf_store = NULL;
*which_cmds = CMD_UNSET; *which_cmds = CMD_UNSET;
*cmd = *id = '\0'; *cmd = *id = '\0';
cd->tv_sec = 0; copy_tv(cd, now); // default cd to 'now'
cd->tv_usec = 0;
cmdptr = strdup(buf); cmdptr = strdup(buf);
idptr = strchr(cmdptr, '.'); idptr = strchr(cmdptr, '.');
@ -1676,7 +1704,7 @@ static void summarise_blocks()
workmarkers->description, workmarkers->description,
workmarkers->status, hi, wi_finish); workmarkers->status, hi, wi_finish);
} }
if (WMREADY(workmarkers->status)) { if (WMPROCESSED(workmarkers->status)) {
lookmarkersummary.markerid = workmarkers->markerid; lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = MAXID; lookmarkersummary.userid = MAXID;
lookmarkersummary.workername = EMPTY; lookmarkersummary.workername = EMPTY;
@ -2202,7 +2230,7 @@ static void *socketer(__maybe_unused void *arg)
LOGDEBUG("Duplicate '%s' message received", duptype); LOGDEBUG("Duplicate '%s' message received", duptype);
} else { } else {
LOGQUE(buf); LOGQUE(buf);
cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd);
switch (cmdnum) { switch (cmdnum) {
case CMD_REPLY: case CMD_REPLY:
snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec);
@ -2373,6 +2401,30 @@ static void *socketer(__maybe_unused void *arg)
rep = NULL; rep = NULL;
} }
break; break;
/* Process, but reject (loading) until startup_complete
* and don't test for duplicates */
case CMD_MARKS:
if (!startup_complete) {
snprintf(reply, sizeof(reply),
"%s.%ld.loading.%s",
id, now.tv_sec, cmd);
send_unix_msg(sockd, reply);
} else {
ans = ckdb_cmds[which_cmds].func(NULL, cmd, id, &now,
by_default,
(char *)__func__,
inet_default,
&cd, trf_root);
siz = strlen(ans) + strlen(id) + 32;
rep = malloc(siz);
snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans);
send_unix_msg(sockd, rep);
free(ans);
ans = NULL;
free(rep);
rep = NULL;
}
break;
// Always queue (ok.queued) // Always queue (ok.queued)
case CMD_SHARELOG: case CMD_SHARELOG:
case CMD_POOLSTAT: case CMD_POOLSTAT:
@ -2503,7 +2555,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
} }
LOGQUE(buf); LOGQUE(buf);
cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &now, &cd);
switch (cmdnum) { switch (cmdnum) {
// Ignore // Ignore
case CMD_REPLY: case CMD_REPLY:
@ -2535,6 +2587,7 @@ static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf)
case CMD_STATS: case CMD_STATS:
case CMD_PPLNS: case CMD_PPLNS:
case CMD_USERSTATUS: case CMD_USERSTATUS:
case CMD_MARKS:
LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored", LOGERR("%s() Message line %"PRIu64" '%s' - invalid - ignored",
__func__, count, cmd); __func__, count, cmd);
break; break;

116
src/ckdb.h

@ -51,8 +51,8 @@
*/ */
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.5" #define DB_VERSION "0.9.6"
#define CKDB_VERSION DB_VERSION"-0.652" #define CKDB_VERSION DB_VERSION"-0.664"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -306,6 +306,7 @@ enum cmd_values {
CMD_STATS, CMD_STATS,
CMD_PPLNS, CMD_PPLNS,
CMD_USERSTATUS, CMD_USERSTATUS,
CMD_MARKS,
CMD_END CMD_END
}; };
@ -882,6 +883,7 @@ typedef struct workinfo {
#define LIMIT_WORKINFO 0 #define LIMIT_WORKINFO 0
#define INIT_WORKINFO(_item) INIT_GENERIC(_item, workinfo) #define INIT_WORKINFO(_item) INIT_GENERIC(_item, workinfo)
#define DATA_WORKINFO(_var, _item) DATA_GENERIC(_var, _item, workinfo, true) #define DATA_WORKINFO(_var, _item) DATA_GENERIC(_var, _item, workinfo, true)
#define DATA_WORKINFO_NULL(_var, _item) DATA_GENERIC(_var, _item, workinfo, false)
extern K_TREE *workinfo_root; extern K_TREE *workinfo_root;
// created during data load then destroyed since not needed later // created during data load then destroyed since not needed later
@ -1339,46 +1341,59 @@ extern K_TREE *workmarkers_workinfoid_root;
extern K_LIST *workmarkers_free; extern K_LIST *workmarkers_free;
extern K_STORE *workmarkers_store; extern K_STORE *workmarkers_store;
#define MARKER_COMPLETE 'x' #define MARKER_READY 'x'
#define WMREADY(_status) (tolower(_status[0]) == MARKER_COMPLETE) #define MARKER_READY_STR "x"
#define WMREADY(_status) (tolower((_status)[0]) == MARKER_READY)
#define MARKER_PROCESSED 'p'
#define MARKER_PROCESSED_STR "p"
#define WMPROCESSED(_status) (tolower((_status)[0]) == MARKER_PROCESSED)
// MARKS // MARKS
// TODO: implement
typedef struct marks { typedef struct marks {
char *poolinstance; char *poolinstance;
int64_t workinfoid; int64_t workinfoid;
char *description; char *description;
char *extra;
char marktype[TXT_FLAG+1]; char marktype[TXT_FLAG+1];
char status[TXT_FLAG+1]; char status[TXT_FLAG+1];
HISTORYDATECONTROLFIELDS; HISTORYDATECONTROLFIELDS;
} MARKS; } MARKS;
/* Marks: /* marks:
* marktype is one of: * marktype is one of:
* b - block end * b - block end
* p - pplns begin * p - pplns begin
* s - shift begin (not yet used) * s - shift begin (not yet used)
* e - shift end (not yet used) * e - shift end (not yet used)
* description should one one of * o - other begin
* b - Block NNN stt * f - other finish/end
* p - Payout NNN fin (where NNN is the block number of the payout) * description generated will be:
* s/e - to be decided * b - Block NNN fin
* p - Payout NNN stt (where NNN is the block number of the payout)
* s - Shift AAA stt
* e - Shift AAA fin
* o - The string passed to the marks command
* f - The string passed to the marks command
* *
* WorkMarkers are from a begin workinfoid to an end workinfoid * workmarkers are from a begin workinfoid to an end workinfoid
* the "-1" and "+1" below mean adding to or subtracting from * the "-1" and "+1" below mean adding to or subtracting from
* the workinfoid number * the workinfoid number - but should move forward/back to the
* next/prev valid workinfoid, not simply +1 or -1
* i.e. all workinfoid in marks and workmarkers must exist in workinfo
* *
* Until we start using shifts: * Until we start using shifts:
* WorkMarkers can be created up to ending in the largest 'p' "-1" * workmarkers can be created up to ending in the largest 'p' "-1"
* WorkMarkers will always be the smallest of: * workmarkers will always be the smallest of:
* Block NNN-1 "+1" to Block NNN * Block NNN-1 "+1" to Block NNN
* Block NNN "+1" to Payout MMM "-1" * Block NNN "+1" to Payout MMM "-1"
* Payout MMM to Block NNN * Payout MMM to Block NNN
* Payout MMM-1 to Payout MMM "-1" * Payout MMM-1 to Payout MMM "-1"
* Thus to generate the WorkMarkers from the Marks: * Thus to generate the workmarkers from the marks:
* Find the last 'p' with no matching workinfoidbegin * Find the last USED mark then create a workmarker from each pair of
* Then determine each previous WorkMarker based on each previous * marks going forward for each READY mark
* mark, using the above rules and stop when we find one that already exists * Set each mark as USED when we use it
* Stop when either we run out of marks or find a non-READY mark
* Finding a workmarker that already exists is an error
*/ */
#define ALLOC_MARKS 1000 #define ALLOC_MARKS 1000
@ -1391,8 +1406,33 @@ extern K_TREE *marks_root;
extern K_LIST *marks_free; extern K_LIST *marks_free;
extern K_STORE *marks_store; extern K_STORE *marks_store;
#define MARKTYPE_BLOCK 'b'
#define MARKTYPE_PPLNS 'p'
#define MARKTYPE_SHIFT_BEGIN 's'
#define MARKTYPE_SHIFT_END 'e'
#define MARKTYPE_OTHER_BEGIN 'o'
#define MARKTYPE_OTHER_FINISH 'f'
extern const char *marktype_block;
extern const char *marktype_pplns;
extern const char *marktype_shift_begin;
extern const char *marktype_shift_end;
extern const char *marktype_other_begin;
extern const char *marktype_other_finish;
extern const char *marktype_block_fmt;
extern const char *marktype_pplns_fmt;
extern const char *marktype_shift_begin_fmt;
extern const char *marktype_shift_end_fmt;
extern const char *marktype_other_begin_fmt;
extern const char *marktype_other_finish_fmt;
#define MARK_READY 'x' #define MARK_READY 'x'
#define MREADY(_status) (tolower(_status[0]) == MARK_READY) #define MARK_READY_STR "x"
#define MREADY(_status) (tolower((_status)[0]) == MARK_READY)
#define MARK_USED 'u'
#define MARK_USED_STR "u"
#define MUSED(_status) (tolower((_status)[0]) == MARK_USED)
extern void logmsg(int loglevel, const char *fmt, ...); extern void logmsg(int loglevel, const char *fmt, ...);
extern void setnow(tv_t *now); extern void setnow(tv_t *now);
@ -1557,9 +1597,19 @@ extern K_ITEM *find_markersummary_userid(int64_t userid, char *workername,
extern void dsp_workmarkers(K_ITEM *item, FILE *stream); extern void dsp_workmarkers(K_ITEM *item, FILE *stream);
extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers(K_ITEM *a, K_ITEM *b);
extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_workmarkers(int64_t workinfoid); extern K_ITEM *find_workmarkers(int64_t workinfoid, bool anystatus, char status);
extern K_ITEM *find_workmarkerid(int64_t markerid); extern K_ITEM *find_workmarkerid(int64_t markerid, bool anystatus, char status);
extern bool workmarkers_generate(PGconn *conn, char *err, size_t siz,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root);
extern cmp_t cmp_marks(K_ITEM *a, K_ITEM *b); extern cmp_t cmp_marks(K_ITEM *a, K_ITEM *b);
extern K_ITEM *find_marks(int64_t workinfoid);
extern const char *marks_marktype(char *marktype);
#define marks_description(_description, _siz, _marktype, _height, _shift, _other) \
_marks_description(_description, _siz, _marktype, _height, _shift, _other, WHERE_FFL_HERE)
extern bool _marks_description(char *description, size_t siz, char *marktype,
int32_t height, char *shift, char *other,
WHERE_FFL_ARGS);
// *** // ***
// *** PostgreSQL functions ckdb_dbio.c // *** PostgreSQL functions ckdb_dbio.c
@ -1702,7 +1752,31 @@ extern bool userstats_add(char *poolinstance, char *elapsed, char *username,
K_TREE *trf_root); K_TREE *trf_root);
extern bool userstats_fill(PGconn *conn); extern bool userstats_fill(PGconn *conn);
extern bool markersummary_fill(PGconn *conn); extern bool markersummary_fill(PGconn *conn);
#define workmarkers_process(_conn, _add, _markerid, _poolinstance, \
_workinfoidend, _workinfoidstart, _description, \
_status, _by, _code, _inet, _cd, _trf_root) \
_workmarkers_process(_conn, _add, _markerid, _poolinstance, \
_workinfoidend, _workinfoidstart, _description, \
_status, _by, _code, _inet, _cd, _trf_root, \
WHERE_FFL_HERE)
extern bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid,
char *poolinstance, int64_t workinfoidend,
int64_t workinfoidstart, char *description,
char *status, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root,
WHERE_FFL_ARGS);
extern bool workmarkers_fill(PGconn *conn); extern bool workmarkers_fill(PGconn *conn);
#define marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \
_extra, _marktype, _status, _by, _code, _inet, _cd, \
_trf_root) \
_marks_process(_conn, _add, _poolinstance, _workinfoid, _description, \
_extra, _marktype, _status, _by, _code, _inet, _cd, \
_trf_root, WHERE_FFL_HERE)
extern bool _marks_process(PGconn *conn, bool add, char *poolinstance,
int64_t workinfoid, char *description,
char *extra, char *marktype, char *status,
char *by, char *code, char *inet, tv_t *cd,
K_TREE *trf_root, WHERE_FFL_ARGS);
extern bool marks_fill(PGconn *conn); extern bool marks_fill(PGconn *conn);
extern bool check_db_version(PGconn *conn); extern bool check_db_version(PGconn *conn);

323
src/ckdb_cmd.c

@ -3157,7 +3157,7 @@ static char *cmd_pplns(__maybe_unused PGconn *conn, char *cmd, char *id,
cmp_workmarkers_workinfoid, wm_ctx); cmp_workmarkers_workinfoid, wm_ctx);
DATA_WORKMARKERS_NULL(workmarkers, wm_item); DATA_WORKMARKERS_NULL(workmarkers, wm_item);
while (total_diff < diff_want && wm_item && CURRENT(&(workmarkers->expirydate))) { while (total_diff < diff_want && wm_item && CURRENT(&(workmarkers->expirydate))) {
if (WMREADY(workmarkers->status)) { if (WMPROCESSED(workmarkers->status)) {
wm_count++; wm_count++;
lookmarkersummary.markerid = workmarkers->markerid; lookmarkersummary.markerid = workmarkers->markerid;
lookmarkersummary.userid = MAXID; lookmarkersummary.userid = MAXID;
@ -3454,6 +3454,7 @@ static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id,
USEINFO(sharesummary, 1, 2); USEINFO(sharesummary, 1, 2);
USEINFO(workmarkers, 1, 2); USEINFO(workmarkers, 1, 2);
USEINFO(markersummary, 1, 2); USEINFO(markersummary, 1, 2);
USEINFO(marks, 1, 1);
USEINFO(blocks, 1, 1); USEINFO(blocks, 1, 1);
USEINFO(miningpayouts, 1, 1); USEINFO(miningpayouts, 1, 1);
USEINFO(auths, 1, 1); USEINFO(auths, 1, 1);
@ -3544,6 +3545,325 @@ static char *cmd_userstatus(PGconn *conn, char *cmd, char *id, tv_t *now, char *
return strdup(reply); return strdup(reply);
} }
/* Socket interface to the functions that will be used later to automatically
* create marks, workmarkers and process the workmarkers
* to generate markersummaries */
static char *cmd_marks(PGconn *conn, char *cmd, char *id,
__maybe_unused tv_t *now, char *by,
char *code, char *inet, tv_t *cd,
K_TREE *trf_root)
{
char reply[1024] = "";
size_t siz = sizeof(reply);
char tmp[1024] = "";
char msg[1024] = "";
K_ITEM *i_action, *i_workinfoid, *i_marktype, *i_description;
K_ITEM *i_height, *i_status, *i_extra, *m_item, *b_item, *w_item;
K_ITEM *wm_item, *wm_item_prev;
WORKMARKERS *workmarkers;
K_TREE_CTX ctx[1];
BLOCKS *blocks;
MARKS *marks;
char *action;
int64_t workinfoid = -1;
char *marktype;
int32_t height = 0;
char description[TXT_BIG+1] = { '\0' };
char extra[TXT_BIG+1] = { '\0' };
char status[TXT_FLAG+1] = { MARK_READY, '\0' };
bool ok;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
i_action = require_name(trf_root, "action", 1, NULL, reply, siz);
if (!i_action)
return strdup(reply);
action = transfer_data(i_action);
if (strcasecmp(action, "add") == 0) {
/* Add a mark
* Require marktype
* Require workinfoid for all but 'b'
* If marktype is 'b' or 'p' then require height/block (number)
* If marktype is 'o' or 'f' then require description
* Status optional - default READY */
i_marktype = require_name(trf_root, "marktype",
1, NULL,
reply, siz);
if (!i_marktype)
return strdup(reply);
marktype = transfer_data(i_marktype);
if (marktype[0] != MARKTYPE_BLOCK) {
i_workinfoid = require_name(trf_root, "workinfoid",
1, (char *)intpatt,
reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid",
transfer_data(i_workinfoid),
workinfoid);
}
switch (marktype[0]) {
case MARKTYPE_BLOCK:
case MARKTYPE_PPLNS:
i_height = require_name(trf_root,
"height",
1, (char *)intpatt,
reply, siz);
if (!i_height)
return strdup(reply);
TXT_TO_INT("height", transfer_data(i_height),
height);
K_RLOCK(blocks_free);
b_item = find_prev_blocks(height+1);
K_RUNLOCK(blocks_free);
if (b_item) {
DATA_BLOCKS(blocks, b_item);
if (blocks->height != height)
b_item = NULL;
}
if (!b_item) {
snprintf(reply, siz,
"no blocks with height %"PRId32, height);
return strdup(reply);
}
if (marktype[0] == MARKTYPE_BLOCK)
workinfoid = blocks->workinfoid;
if (!marks_description(description, sizeof(description),
marktype, height, NULL, NULL))
goto dame;
break;
case MARKTYPE_SHIFT_BEGIN:
case MARKTYPE_SHIFT_END:
snprintf(reply, siz,
"marktype %s not yet handled",
marks_marktype(marktype));
return strdup(reply);
case MARKTYPE_OTHER_BEGIN:
case MARKTYPE_OTHER_FINISH:
i_description = require_name(trf_root,
"description",
1, NULL,
reply, siz);
if (!i_description)
return strdup(reply);
if (!marks_description(description, sizeof(description),
marktype, height, NULL,
transfer_data(i_description)))
goto dame;
break;
default:
snprintf(reply, siz,
"unknown marktype '%s'", marktype);
return strdup(reply);
}
i_status = optional_name(trf_root, "status", 1, NULL, reply, siz);
if (i_status) {
STRNCPY(status, transfer_data(i_status));
switch(status[0]) {
case MARK_READY:
case MARK_USED:
case '\0':
break;
default:
snprintf(reply, siz,
"unknown mark status '%s'", status);
return strdup(reply);
}
}
if (workinfoid == -1) {
snprintf(reply, siz, "workinfoid not found");
return strdup(reply);
}
w_item = find_workinfo(workinfoid);
if (!w_item) {
snprintf(reply, siz, "invalid workinfoid %"PRId64,
workinfoid);
return strdup(reply);
}
ok = marks_process(conn, true, EMPTY, workinfoid, description,
extra, marktype, status, by, code, inet, cd,
trf_root);
} else if (strcasecmp(action, "expire") == 0) {
/* Expire the mark - effectively deletes it
* Require workinfoid */
i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
K_RLOCK(marks_free);
m_item = find_marks(workinfoid);
K_RUNLOCK(marks_free);
if (!m_item) {
snprintf(reply, siz,
"unknown current mark with workinfoid %"PRId64, workinfoid);
return strdup(reply);
}
ok = marks_process(conn, false, EMPTY, workinfoid, NULL,
NULL, NULL, NULL, by, code, inet, cd,
trf_root);
} else if (strcasecmp(action, "status") == 0) {
/* Change the status on a mark
* Require workinfoid and status
* N.B. you can cause generate errors if you change the status of a USED marks */
i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
K_RLOCK(marks_free);
m_item = find_marks(workinfoid);
K_RUNLOCK(marks_free);
if (!m_item) {
snprintf(reply, siz,
"unknown current mark with workinfoid %"PRId64, workinfoid);
return strdup(reply);
}
DATA_MARKS(marks, m_item);
i_status = require_name(trf_root, "status", 0, NULL, reply, siz);
if (!i_status)
return strdup(reply);
STRNCPY(status, transfer_data(i_status));
switch(status[0]) {
case MARK_READY:
case MARK_USED:
case '\0':
break;
default:
snprintf(reply, siz,
"unknown mark status '%s'", status);
return strdup(reply);
}
// Unchanged
if (strcmp(status, marks->status) == 0) {
action = "status-unchanged";
ok = true;
} else {
ok = marks_process(conn, true, marks->poolinstance,
workinfoid, marks->description,
marks->extra, marks->marktype,
status, by, code, inet, cd,
trf_root);
}
} else if (strcasecmp(action, "extra") == 0) {
/* Change the 'extra' description
* Require workinfoid and extra
* If a mark is actually multiple marks with the same
* workinfoid, then we can record the extra info here
* This would be true of each block, once shifts are
* implemented, since the current shift ends when a
* block is found
* This could also be true, very rarely, if the beginning
* of a pplns payout range matched any other mark,
* since the beginning can be any workinfoid */
i_workinfoid = require_name(trf_root, "workinfoid", 1, (char *)intpatt, reply, siz);
if (!i_workinfoid)
return strdup(reply);
TXT_TO_BIGINT("workinfoid", transfer_data(i_workinfoid), workinfoid);
K_RLOCK(marks_free);
m_item = find_marks(workinfoid);
K_RUNLOCK(marks_free);
if (!m_item) {
snprintf(reply, siz,
"unknown current mark with workinfoid %"PRId64, workinfoid);
return strdup(reply);
}
DATA_MARKS(marks, m_item);
i_extra = require_name(trf_root, "extra", 0, NULL, reply, siz);
if (!i_extra)
return strdup(reply);
STRNCPY(extra, transfer_data(i_extra));
// Unchanged
if (strcmp(extra, marks->extra) == 0) {
action = "extra-unchanged";
ok = true;
} else {
ok = marks_process(conn, true, marks->poolinstance,
workinfoid, marks->description,
extra, marks->marktype,
status, by, code, inet, cd,
trf_root);
}
} else if (strcasecmp(action, "generate") == 0) {
/* Generate workmarkers
* No parameters */
tmp[0] = '\0';
ok = workmarkers_generate(conn, tmp, sizeof(tmp),
by, code, inet, cd, trf_root);
if (!ok) {
snprintf(reply, siz, "%s error: %s", action, tmp);
LOGERR("%s.%s", id, reply);
return strdup(reply);
}
if (*tmp) {
snprintf(reply, siz, "%s: %s", action, tmp);
LOGWARNING("%s.%s", id, reply);
}
} else if (strcasecmp(action, "expunge") == 0) {
/* Expire all generated workmarkers that aren't PROCESSED
* No parameters
* This exists so we can fix all workmarkers that haven't
* been PROCESSED yet,
* if there was a problem with the marks
* Simply expunge all the workmarkers, correct the marks,
* then generate the workmarkers again
* WARNING - using psql to do the worksummary generation
* will not update the workmarkers status inside ckdb
* so this will expunge those worksummary records also
* You'll need to restart ckdb after using psql */
int count = 0;
ok = true;
wm_item_prev = NULL;
K_RLOCK(workmarkers_free);
wm_item = last_in_ktree(workmarkers_root, ctx);
K_RUNLOCK(workmarkers_free);
while (wm_item) {
K_RLOCK(workmarkers_free);
wm_item_prev = prev_in_ktree(ctx);
K_RUNLOCK(workmarkers_free);
DATA_WORKMARKERS(workmarkers, wm_item);
if (CURRENT(&(workmarkers->expirydate)) &&
!WMPROCESSED(workmarkers->status)) {
ok = workmarkers_process(conn, false,
workmarkers->markerid,
NULL, 0, 0, NULL, NULL, by,
code, inet, cd, trf_root);
if (!ok)
break;
count++;
}
wm_item = wm_item_prev;
}
if (ok) {
if (count == 0) {
snprintf(msg, sizeof(msg),
"no unprocessed current workmarkers");
} else {
snprintf(msg, sizeof(msg),
"%d workmarkers expunged", count);
}
}
} else {
snprintf(reply, siz, "unknown action '%s'", action);
LOGERR("%s.%s", id, reply);
return strdup(reply);
}
if (!ok) {
dame:
LOGERR("%s() %s.failed.DBE", __func__, id);
return strdup("failed.DBE");
}
if (msg[0])
snprintf(reply, siz, "ok.%s %s", action, msg);
else
snprintf(reply, siz, "ok.%s", action);
LOGWARNING("%s.%s", id, reply);
return strdup(reply);
}
// TODO: limit access by having seperate sockets for each // TODO: limit access by having seperate sockets for each
#define ACCESS_POOL "p" #define ACCESS_POOL "p"
#define ACCESS_SYSTEM "s" #define ACCESS_SYSTEM "s"
@ -3647,5 +3967,6 @@ struct CMDS ckdb_cmds[] = {
{ CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM ACCESS_WEB }, { CMD_STATS, "stats", true, false, cmd_stats, ACCESS_SYSTEM ACCESS_WEB },
{ CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM ACCESS_WEB }, { CMD_PPLNS, "pplns", false, false, cmd_pplns, ACCESS_SYSTEM ACCESS_WEB },
{ CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, ACCESS_SYSTEM ACCESS_WEB }, { CMD_USERSTATUS,"userstatus", false, false, cmd_userstatus, ACCESS_SYSTEM ACCESS_WEB },
{ CMD_MARKS, "marks", false, false, cmd_marks, ACCESS_SYSTEM },
{ CMD_END, NULL, false, false, NULL, NULL } { CMD_END, NULL, false, false, NULL, NULL }
}; };

398
src/ckdb_data.c

@ -697,7 +697,8 @@ void workerstatus_ready()
if (ms_item) { if (ms_item) {
DATA_MARKERSUMMARY(markersummary, ms_item); DATA_MARKERSUMMARY(markersummary, ms_item);
K_RLOCK(workmarkers_free); K_RLOCK(workmarkers_free);
wm_item = find_workmarkerid(markersummary->markerid); wm_item = find_workmarkerid(markersummary->markerid,
false, MARKER_PROCESSED);
K_RUNLOCK(workmarkers_free); K_RUNLOCK(workmarkers_free);
if (wm_item && if (wm_item &&
tv_newer(&(workerstatus->last_share), &(markersummary->lastshare))) { tv_newer(&(workerstatus->last_share), &(markersummary->lastshare))) {
@ -1357,9 +1358,9 @@ bool workinfo_age(PGconn *conn, int64_t workinfoid, char *poolinstance,
goto bye; goto bye;
} }
K_RLOCK(markersummary_free); K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(workinfoid); wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED);
K_RUNLOCK(markersummary_free); K_RUNLOCK(workmarkers_free);
// Should never happen? // Should never happen?
if (wm_item && !reloading) { if (wm_item && !reloading) {
tv_to_buf(cd, cd_buf, sizeof(cd_buf)); tv_to_buf(cd, cd_buf, sizeof(cd_buf));
@ -2020,7 +2021,7 @@ void set_block_share_counters()
CURRENT(&(workmarkers->expirydate)) && CURRENT(&(workmarkers->expirydate)) &&
workmarkers->workinfoidend > pool.workinfoid) { workmarkers->workinfoidend > pool.workinfoid) {
if (WMREADY(workmarkers->status)) if (WMPROCESSED(workmarkers->status))
{ {
// Should never be true // Should never be true
if (workmarkers->workinfoidstart <= pool.workinfoid) { if (workmarkers->workinfoidstart <= pool.workinfoid) {
@ -2332,7 +2333,7 @@ K_ITEM *find_markersummary(int64_t workinfoid, int64_t userid, char *workername)
WORKMARKERS *wm; WORKMARKERS *wm;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
wm_item = find_workmarkers(workinfoid); wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED);
if (wm_item) { if (wm_item) {
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
markersummary.markerid = wm->markerid; markersummary.markerid = wm->markerid;
@ -2388,7 +2389,7 @@ cmp_t cmp_workmarkers_workinfoid(K_ITEM *a, K_ITEM *b)
return c; return c;
} }
K_ITEM *find_workmarkers(int64_t workinfoid) K_ITEM *find_workmarkers(int64_t workinfoid, bool anystatus, char status)
{ {
WORKMARKERS workmarkers, *wm; WORKMARKERS workmarkers, *wm;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -2404,7 +2405,7 @@ K_ITEM *find_workmarkers(int64_t workinfoid)
if (wm_item) { if (wm_item) {
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
if (!CURRENT(&(wm->expirydate)) || if (!CURRENT(&(wm->expirydate)) ||
!WMREADY(wm->status) || (!anystatus && wm->status[0] != status) ||
workinfoid < wm->workinfoidstart || workinfoid < wm->workinfoidstart ||
workinfoid > wm->workinfoidend) workinfoid > wm->workinfoidend)
wm_item = NULL; wm_item = NULL;
@ -2412,7 +2413,7 @@ K_ITEM *find_workmarkers(int64_t workinfoid)
return wm_item; return wm_item;
} }
K_ITEM *find_workmarkerid(int64_t markerid) K_ITEM *find_workmarkerid(int64_t markerid, bool anystatus, char status)
{ {
WORKMARKERS workmarkers, *wm; WORKMARKERS workmarkers, *wm;
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -2428,12 +2429,269 @@ K_ITEM *find_workmarkerid(int64_t markerid)
if (wm_item) { if (wm_item) {
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
if (!CURRENT(&(wm->expirydate)) || if (!CURRENT(&(wm->expirydate)) ||
!WMREADY(wm->status)) (!anystatus && wm->status[0] != status))
wm_item = NULL; wm_item = NULL;
} }
return wm_item; return wm_item;
} }
// Create one
static bool gen_workmarkers(PGconn *conn, MARKS *stt, bool after, MARKS *fin,
bool before, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root)
{
K_ITEM look, *wi_stt_item, *wi_fin_item, *old_wm_item;
WORKMARKERS *old_wm;
WORKINFO workinfo, *wi_stt, *wi_fin;
K_TREE_CTX ctx[1];
char description[TXT_BIG+1];
bool ok;
workinfo.workinfoid = stt->workinfoid;
workinfo.expirydate.tv_sec = default_expiry.tv_sec;
workinfo.expirydate.tv_usec = default_expiry.tv_usec;
INIT_WORKINFO(&look);
look.data = (void *)(&workinfo);
K_RLOCK(workinfo_free);
if (after) {
wi_stt_item = find_after_in_ktree(workinfo_root, &look,
cmp_workinfo, ctx);
while (wi_stt_item) {
DATA_WORKINFO(wi_stt, wi_stt_item);
if (CURRENT(&(wi_stt->expirydate)))
break;
wi_stt_item = next_in_ktree(ctx);
}
} else {
wi_stt_item = find_in_ktree(workinfo_root, &look,
cmp_workinfo, ctx);
DATA_WORKINFO_NULL(wi_stt, wi_stt_item);
}
K_RUNLOCK(workinfo_free);
if (!wi_stt_item)
return false;
if (!CURRENT(&(wi_stt->expirydate)))
return false;
workinfo.workinfoid = fin->workinfoid;
INIT_WORKINFO(&look);
look.data = (void *)(&workinfo);
K_RLOCK(workinfo_free);
if (before) {
workinfo.expirydate.tv_sec = 0;
workinfo.expirydate.tv_usec = 0;
wi_fin_item = find_before_in_ktree(workinfo_root, &look,
cmp_workinfo, ctx);
while (wi_fin_item) {
DATA_WORKINFO(wi_fin, wi_fin_item);
if (CURRENT(&(wi_fin->expirydate)))
break;
wi_fin_item = prev_in_ktree(ctx);
}
} else {
workinfo.expirydate.tv_sec = default_expiry.tv_sec;
workinfo.expirydate.tv_usec = default_expiry.tv_usec;
wi_fin_item = find_in_ktree(workinfo_root, &look,
cmp_workinfo, ctx);
DATA_WORKINFO_NULL(wi_fin, wi_fin_item);
}
K_RUNLOCK(workinfo_free);
if (!wi_fin_item)
return false;
if (!CURRENT(&(wi_fin->expirydate)))
return false;
/* If two marks in a row are fin(+after) then stt(-before),
* it may be that there should be no workmarkers range between them
* This may show up as the calculated finish id being before
* the start id - so no need to create it since it will be empty
* Also note that empty workmarkers are not a problem,
* but simply unnecessary and in this specific case,
* we don't create it since a negative range would cause tree
* sort order and matching errors */
if (wi_fin->workinfoid >= wi_stt->workinfoid) {
K_RLOCK(workmarkers_free);
old_wm_item = find_workmarkers(wi_fin->workinfoid, true, '\0');
K_RUNLOCK(workmarkers_free);
DATA_WORKMARKERS_NULL(old_wm, old_wm_item);
if (old_wm_item && (WMREADY(old_wm->status) ||
WMPROCESSED(old_wm->status))) {
/* This actually means a code bug or a DB marks has
* been set incorrectly via cmd_marks (or pgsql) */
LOGEMERG("%s(): marks workinfoid %"PRId64" matches or"
" is part of the existing markerid %"PRId64,
__func__, wi_fin->workinfoid,
old_wm->markerid);
return false;
}
snprintf(description, sizeof(description), "%s%s to %s%s",
stt->description, after ? "++" : "",
fin->description, before ? "--" : "");
ok = workmarkers_process(conn, true, 0, EMPTY,
wi_fin->workinfoid, wi_stt->workinfoid,
description, MARKER_READY_STR,
by, code, inet, cd, trf_root);
if (!ok)
return false;
}
ok = marks_process(conn, true, EMPTY, fin->workinfoid,
fin->description, fin->extra, fin->marktype,
MARK_USED_STR, by, code, inet, cd, trf_root);
return ok;
}
/* Generate workmarkers from the last USED mark
* Will only use the last USED mark and the contiguous READY
* marks after the last USED mark
* If a mark is found not READY it will stop at that one and
* report success with a message regarding the not READY one
* No checks are done for the validity of the mark status
* information, however, until the next step of generating
* the markersummaries is completely automated, rather than
* simply running the SQL script manually, the existence of
a workmarker wont actually do anything automatically */
bool workmarkers_generate(PGconn *conn, char *err, size_t siz, char *by,
char *code, char *inet, tv_t *cd, K_TREE *trf_root)
{
K_ITEM *m_item, *m_next_item;
MARKS *mused, *mnext;
MARKS marks;
K_TREE_CTX ctx[1];
K_ITEM look;
bool any = false, ok;
marks.expirydate.tv_sec = default_expiry.tv_sec;
marks.expirydate.tv_usec = default_expiry.tv_usec;
marks.workinfoid = MAXID;
INIT_MARKS(&look);
look.data = (void *)(&marks);
K_RLOCK(marks_free);
m_item = find_before_in_ktree(marks_root, &look, cmp_marks, ctx);
while (m_item) {
DATA_MARKS(mused, m_item);
if (CURRENT(&(mused->expirydate)) && MUSED(mused->status))
break;
m_item = prev_in_ktree(ctx);
}
K_RUNLOCK(marks_free);
if (!m_item || !CURRENT(&(mused->expirydate)) || !MUSED(mused->status)) {
snprintf(err, siz, "%s", "No trailing used mark found");
return false;
}
K_RLOCK(marks_free);
m_item = next_in_ktree(ctx);
K_RUNLOCK(marks_free);
while (m_item) {
DATA_MARKS(mnext, m_item);
if (!CURRENT(&(mnext->expirydate)) || !!MREADY(mused->status))
break;
/* We need to get the next marks in advance since
* gen_workmarker will create a new m_item flagged USED
* and the tree position ctx for m_item will no longer give
* us the correct 'next'
* However, we can still use mnext as mused in the subsequent
* loop since the data that we need hasn't been changed
*/
K_RLOCK(marks_free);
m_next_item = next_in_ktree(ctx);
K_RUNLOCK(marks_free);
// save code space ...
#define GENWM(m1, b1, m2, b2) \
gen_workmarkers(conn, m1, b1, m2, b2, by, code, inet, cd, trf_root)
ok = true;
switch(mused->marktype[0]) {
case MARKTYPE_BLOCK:
case MARKTYPE_SHIFT_END:
case MARKTYPE_OTHER_FINISH:
switch(mnext->marktype[0]) {
case MARKTYPE_BLOCK:
case MARKTYPE_SHIFT_END:
case MARKTYPE_OTHER_FINISH:
ok = GENWM(mused, true, mnext, false);
if (ok)
any = true;
break;
case MARKTYPE_PPLNS:
case MARKTYPE_SHIFT_BEGIN:
case MARKTYPE_OTHER_BEGIN:
ok = GENWM(mused, true, mnext, true);
if (ok)
any = true;
break;
default:
snprintf(err, siz,
"Mark %"PRId64" has"
" an unknown marktype"
" '%s' - aborting",
mnext->workinfoid,
mnext->marktype);
return false;
}
break;
case MARKTYPE_PPLNS:
case MARKTYPE_SHIFT_BEGIN:
case MARKTYPE_OTHER_BEGIN:
switch(mnext->marktype[0]) {
case MARKTYPE_BLOCK:
case MARKTYPE_SHIFT_END:
case MARKTYPE_OTHER_FINISH:
ok = GENWM(mused, false, mnext, false);
if (ok)
any = true;
break;
case MARKTYPE_PPLNS:
case MARKTYPE_SHIFT_BEGIN:
case MARKTYPE_OTHER_BEGIN:
ok = GENWM(mused, false, mnext, true);
if (ok)
any = true;
break;
default:
snprintf(err, siz,
"Mark %"PRId64" has"
" an unknown marktype"
" '%s' - aborting",
mnext->workinfoid,
mnext->marktype);
return false;
}
break;
default:
snprintf(err, siz,
"Mark %"PRId64" has an unknown "
"marktype '%s' - aborting",
mused->workinfoid,
mused->marktype);
return false;
}
if (!ok) {
snprintf(err, siz,
"Processing marks %"PRId64" to "
"%"PRId64" failed - aborting",
mused->workinfoid,
mnext->workinfoid);
return false;
}
mused = mnext;
m_item = m_next_item;
}
if (!any) {
snprintf(err, siz, "%s", "No ready marks found");
return false;
}
return true;
}
// order by expirydate asc,workinfoid asc // order by expirydate asc,workinfoid asc
// TODO: add poolinstance // TODO: add poolinstance
cmp_t cmp_marks(K_ITEM *a, K_ITEM *b) cmp_t cmp_marks(K_ITEM *a, K_ITEM *b)
@ -2447,3 +2705,123 @@ cmp_t cmp_marks(K_ITEM *a, K_ITEM *b)
return c; return c;
} }
K_ITEM *find_marks(int64_t workinfoid)
{
MARKS marks;
K_TREE_CTX ctx[1];
K_ITEM look;
marks.expirydate.tv_sec = default_expiry.tv_sec;
marks.expirydate.tv_usec = default_expiry.tv_usec;
marks.workinfoid = workinfoid;
INIT_MARKS(&look);
look.data = (void *)(&marks);
return find_in_ktree(marks_root, &look, cmp_marks, ctx);
}
const char *marks_marktype(char *marktype)
{
switch (marktype[0]) {
case MARKTYPE_BLOCK:
return marktype_block;
case MARKTYPE_PPLNS:
return marktype_pplns;
case MARKTYPE_SHIFT_BEGIN:
return marktype_shift_begin;
case MARKTYPE_SHIFT_END:
return marktype_shift_end;
case MARKTYPE_OTHER_BEGIN:
return marktype_other_begin;
case MARKTYPE_OTHER_FINISH:
return marktype_other_finish;
}
return NULL;
}
bool _marks_description(char *description, size_t siz, char *marktype,
int32_t height, char *shift, char *other,
WHERE_FFL_ARGS)
{
switch (marktype[0]) {
case MARKTYPE_BLOCK:
if (height < START_POOL_HEIGHT) {
LOGERR("%s() invalid pool height %"PRId32
"for mark %s " WHERE_FFL,
__func__, height,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_block_fmt, height);
break;
case MARKTYPE_PPLNS:
if (height < START_POOL_HEIGHT) {
LOGERR("%s() invalid pool height %"PRId32
"for mark %s " WHERE_FFL,
__func__, height,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_pplns_fmt, height);
break;
case MARKTYPE_SHIFT_BEGIN:
if (shift == NULL || !*shift) {
LOGERR("%s() invalid mark shift NULL/empty "
"for mark %s " WHERE_FFL,
__func__,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_shift_begin_fmt, shift);
break;
case MARKTYPE_SHIFT_END:
if (shift == NULL || !*shift) {
LOGERR("%s() invalid mark shift NULL/empty "
"for mark %s " WHERE_FFL,
__func__,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_shift_end_fmt, shift);
break;
case MARKTYPE_OTHER_BEGIN:
if (other == NULL) {
LOGERR("%s() invalid mark other NULL/empty "
"for mark %s " WHERE_FFL,
__func__,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_other_begin_fmt, other);
break;
case MARKTYPE_OTHER_FINISH:
if (other == NULL) {
LOGERR("%s() invalid mark other NULL/empty "
"for mark %s " WHERE_FFL,
__func__,
marks_marktype(marktype),
WHERE_FFL_PASS);
return false;
}
snprintf(description, siz,
marktype_other_finish_fmt, other);
break;
default:
LOGERR("%s() invalid marktype '%s'" WHERE_FFL,
__func__, marktype,
WHERE_FFL_PASS);
return false;
}
return true;
}

442
src/ckdb_dbio.c

@ -2473,8 +2473,9 @@ bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername
goto unitem; goto unitem;
if (reloading && !confirm_sharesummary) { if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is ready // We only need to know if the workmarker is processed
wm_item = find_workmarkers(shares->workinfoid); wm_item = find_workmarkers(shares->workinfoid, false,
MARKER_PROCESSED);
if (wm_item) { if (wm_item) {
K_WLOCK(shares_free); K_WLOCK(shares_free);
k_add_head(shares_free, s_item); k_add_head(shares_free, s_item);
@ -2590,8 +2591,9 @@ bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
goto unitem; goto unitem;
if (reloading && !confirm_sharesummary) { if (reloading && !confirm_sharesummary) {
// We only need to know if the workmarker is ready // We only need to know if the workmarker is processed
wm_item = find_workmarkers(shareerrors->workinfoid); wm_item = find_workmarkers(shareerrors->workinfoid, false,
MARKER_PROCESSED);
if (wm_item) { if (wm_item) {
K_WLOCK(shareerrors_free); K_WLOCK(shareerrors_free);
k_add_head(shareerrors_free, s_item); k_add_head(shareerrors_free, s_item);
@ -2693,14 +2695,14 @@ bool _sharesummary_update(PGconn *conn, SHARES *s_row, SHAREERRORS *e_row, K_ITE
} }
K_RLOCK(workmarkers_free); K_RLOCK(workmarkers_free);
wm_item = find_workmarkers(workinfoid); wm_item = find_workmarkers(workinfoid, false, MARKER_PROCESSED);
K_RUNLOCK(workmarkers_free); K_RUNLOCK(workmarkers_free);
if (wm_item) { if (wm_item) {
char *tmp; char *tmp;
DATA_WORKMARKERS(wm, wm_item); DATA_WORKMARKERS(wm, wm_item);
LOGERR("%s(): attempt to update sharesummary " LOGERR("%s(): attempt to update sharesummary "
"with %s %"PRId64"/%"PRId64"/%s createdate %s" "with %s %"PRId64"/%"PRId64"/%s createdate %s"
" but ready workmarkers %"PRId64" exists", " but processed workmarkers %"PRId64" exists",
__func__, s_row ? "shares" : "shareerrors", __func__, s_row ? "shares" : "shareerrors",
workinfoid, userid, workername, workinfoid, userid, workername,
(tmp = ctv_to_buf(sharecreatedate, NULL, 0)), (tmp = ctv_to_buf(sharecreatedate, NULL, 0)),
@ -5009,6 +5011,231 @@ bool markersummary_fill(PGconn *conn)
return ok; return ok;
} }
/* Add means create a new one and expire the old one if it exists,
* otherwise we only expire the old one if it exists
* Add requires all db fields except markerid, however if markerid
* is non-zero, it will be used instead of getting a new one
* i.e. this effectively means updating a workmarker
* !Add requires markerid or workinfoidend, only
* workinfoidend is used if markerid is zero
* N.B. if you expire a workmarker without creating a new one,
* it's markerid is effectively cancelled, since creating a
* new matching workmarker later, will get a new markerid,
* since we only check for a CURRENT workmarkers
* N.B. also, this returns success if !add and there is no matching
* old workmarkers */
bool _workmarkers_process(PGconn *conn, bool add, int64_t markerid,
char *poolinstance, int64_t workinfoidend,
int64_t workinfoidstart, char *description,
char *status, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root,
WHERE_FFL_ARGS)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res = NULL;
K_ITEM *wm_item = NULL, *old_wm_item = NULL, *w_item;
WORKMARKERS *row, *oldworkmarkers;
char *upd, *ins;
char *params[6 + HISTORYDATECOUNT];
bool ok = false, begun = false;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
if (markerid == 0) {
K_RLOCK(workmarkers_free);
old_wm_item = find_workmarkers(workinfoidend, true, '\0');
K_RUNLOCK(workmarkers_free);
} else {
K_RLOCK(workmarkers_free);
old_wm_item = find_workmarkerid(markerid, true, '\0');
K_RUNLOCK(workmarkers_free);
}
if (old_wm_item) {
DATA_WORKMARKERS(oldworkmarkers, old_wm_item);
if (!conn) {
conn = dbconnect();
conned = true;
}
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
begun = true;
upd = "update workmarkers set expirydate=$1 where markerid=$2"
" and expirydate=$3";
par = 0;
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = bigint_to_buf(oldworkmarkers->markerid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto rollback;
}
for (n = 0; n < par; n++)
free(params[n]);
par = 0;
}
if (add) {
if (poolinstance == NULL || description == NULL ||
status == NULL) {
LOGEMERG("%s(): NULL field(s) passed:%s%s%s"
WHERE_FFL, __func__,
poolinstance ? "" : " poolinstance",
description ? "" : " description",
status ? "" : " status",
WHERE_FFL_PASS);
goto rollback;
}
w_item = find_workinfo(workinfoidend);
if (!w_item)
goto rollback;
w_item = find_workinfo(workinfoidstart);
if (!w_item)
goto rollback;
K_WLOCK(workmarkers_free);
wm_item = k_unlink_head(workmarkers_free);
K_WUNLOCK(workmarkers_free);
DATA_WORKMARKERS(row, wm_item);
bzero(row, sizeof(*row));
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
if (!begun) {
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
begun = true;
}
if (old_wm_item)
row->markerid = oldworkmarkers->markerid;
else {
if (markerid != 0)
row->markerid = markerid;
else {
row->markerid = nextid(conn, "markerid", 1,
cd, by, code, inet);
if (row->markerid == 0)
goto rollback;
}
}
row->poolinstance = strdup(poolinstance);
LIST_MEM_ADD(workmarkers_free, poolinstance);
row->workinfoidend = workinfoidend;
row->workinfoidstart = workinfoidstart;
row->description = strdup(description);
LIST_MEM_ADD(workmarkers_free, description);
STRNCPY(row->status, status);
HISTORYDATEINIT(row, cd, by, code, inet);
HISTORYDATETRANSFER(trf_root, row);
ins = "insert into workmarkers "
"(markerid,poolinstance,workinfoidend,workinfoidstart,"
"description,status"
HISTORYDATECONTROL ") values (" PQPARAM11 ")";
par = 0;
params[par++] = bigint_to_buf(row->markerid, NULL, 0);
params[par++] = str_to_buf(row->poolinstance, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoidend, NULL, 0);
params[par++] = bigint_to_buf(row->workinfoidstart, NULL, 0);
params[par++] = str_to_buf(row->description, NULL, 0);
params[par++] = str_to_buf(row->status, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto rollback;
}
}
ok = true;
rollback:
if (begun) {
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
}
unparam:
for (n = 0; n < par; n++)
free(params[n]);
if (conned)
PQfinish(conn);
K_WLOCK(workmarkers_free);
if (!ok) {
if (wm_item) {
DATA_WORKMARKERS(row, wm_item);
if (row->poolinstance) {
if (row->poolinstance != EMPTY) {
LIST_MEM_SUB(workmarkers_free,
row->poolinstance);
free(row->poolinstance);
}
row->poolinstance = NULL;
}
if (row->description) {
if (row->description != EMPTY) {
LIST_MEM_SUB(workmarkers_free,
row->description);
free(row->description);
}
row->description = NULL;
}
k_add_head(workmarkers_free, wm_item);
}
}
else {
if (old_wm_item) {
workmarkers_root = remove_from_ktree(workmarkers_root,
old_wm_item,
cmp_workmarkers);
copy_tv(&(oldworkmarkers->expirydate), cd);
workmarkers_root = add_to_ktree(workmarkers_root,
old_wm_item,
cmp_workmarkers);
}
if (wm_item) {
workmarkers_root = add_to_ktree(workmarkers_root,
wm_item,
cmp_workmarkers);
k_add_head(workmarkers_store, wm_item);
}
}
K_WUNLOCK(workmarkers_free);
return ok;
}
bool workmarkers_fill(PGconn *conn) bool workmarkers_fill(PGconn *conn)
{ {
ExecStatusType rescode; ExecStatusType rescode;
@ -5111,6 +5338,200 @@ bool workmarkers_fill(PGconn *conn)
return ok; return ok;
} }
/* Add means create a new one and expire the old one if it exists,
* otherwise we only expire the old one if it exists
* Add requires all db fields
* !Add only requires the (poolinstance and) workinfoid db fields */
bool _marks_process(PGconn *conn, bool add, char *poolinstance,
int64_t workinfoid, char *description, char *extra,
char *marktype, char *status, char *by, char *code,
char *inet, tv_t *cd, K_TREE *trf_root, WHERE_FFL_ARGS)
{
ExecStatusType rescode;
bool conned = false;
PGresult *res = NULL;
K_ITEM *m_item = NULL, *old_m_item = NULL, *w_item;
MARKS *row, *oldmarks;
char *upd, *ins;
char *params[6 + HISTORYDATECOUNT];
bool ok = false, begun = false;
int n, par = 0;
LOGDEBUG("%s(): add", __func__);
K_RLOCK(marks_free);
old_m_item = find_marks(workinfoid);
K_RUNLOCK(marks_free);
if (old_m_item) {
DATA_MARKS(oldmarks, old_m_item);
if (!conn) {
conn = dbconnect();
conned = true;
}
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
begun = true;
upd = "update marks set expirydate=$1 where workinfoid=$2"
" and expirydate=$3";
par = 0;
params[par++] = tv_to_buf(cd, NULL, 0);
params[par++] = bigint_to_buf(workinfoid, NULL, 0);
params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0);
PARCHKVAL(par, 3, params);
res = PQexecParams(conn, upd, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Update", rescode, conn);
goto rollback;
}
for (n = 0; n < par; n++)
free(params[n]);
par = 0;
}
if (add) {
if (poolinstance == NULL || description == NULL ||
extra == NULL || marktype == NULL || status == NULL) {
LOGEMERG("%s(): NULL field(s) passed:%s%s%s%s%s"
WHERE_FFL, __func__,
poolinstance ? "" : " poolinstance",
description ? "" : " description",
extra ? "" : " extra",
marktype ? "" : " marktype",
status ? "" : " status",
WHERE_FFL_PASS);
goto rollback;
}
w_item = find_workinfo(workinfoid);
if (!w_item)
goto rollback;
K_WLOCK(marks_free);
m_item = k_unlink_head(marks_free);
K_WUNLOCK(marks_free);
DATA_MARKS(row, m_item);
bzero(row, sizeof(*row));
row->poolinstance = strdup(poolinstance);
LIST_MEM_ADD(marks_free, poolinstance);
row->workinfoid = workinfoid;
row->description = strdup(description);
LIST_MEM_ADD(marks_free, description);
row->extra = strdup(extra);
LIST_MEM_ADD(marks_free, extra);
STRNCPY(row->marktype, marktype);
STRNCPY(row->status, status);
HISTORYDATEINIT(row, cd, by, code, inet);
HISTORYDATETRANSFER(trf_root, row);
ins = "insert into marks "
"(poolinstance,workinfoid,description,extra,marktype,"
"status"
HISTORYDATECONTROL ") values (" PQPARAM11 ")";
par = 0;
params[par++] = str_to_buf(row->poolinstance, NULL, 0);
params[par++] = bigint_to_buf(workinfoid, NULL, 0);
params[par++] = str_to_buf(row->description, NULL, 0);
params[par++] = str_to_buf(row->extra, NULL, 0);
params[par++] = str_to_buf(row->marktype, NULL, 0);
params[par++] = str_to_buf(row->status, NULL, 0);
HISTORYDATEPARAMS(params, par, row);
PARCHK(par, params);
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
if (!begun) {
res = PQexec(conn, "Begin", CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Begin", rescode, conn);
goto unparam;
}
begun = true;
}
res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
PQclear(res);
if (!PGOK(rescode)) {
PGLOGERR("Insert", rescode, conn);
goto rollback;
}
}
ok = true;
rollback:
if (begun) {
if (ok)
res = PQexec(conn, "Commit", CKPQ_WRITE);
else
res = PQexec(conn, "Rollback", CKPQ_WRITE);
PQclear(res);
}
unparam:
for (n = 0; n < par; n++)
free(params[n]);
if (conned)
PQfinish(conn);
K_WLOCK(marks_free);
if (!ok) {
if (m_item) {
DATA_MARKS(row, m_item);
if (row->poolinstance) {
if (row->poolinstance != EMPTY) {
LIST_MEM_SUB(marks_free, row->poolinstance);
free(row->poolinstance);
}
row->poolinstance = NULL;
}
if (row->description) {
if (row->description != EMPTY) {
LIST_MEM_SUB(marks_free, row->description);
free(row->description);
}
row->description = NULL;
}
if (row->extra) {
if (row->extra != EMPTY) {
LIST_MEM_SUB(marks_free, row->extra);
free(row->extra);
}
row->extra = NULL;
}
k_add_head(marks_free, m_item);
}
}
else {
if (old_m_item) {
marks_root = remove_from_ktree(marks_root, old_m_item, cmp_marks);
copy_tv(&(oldmarks->expirydate), cd);
marks_root = add_to_ktree(marks_root, old_m_item, cmp_marks);
}
if (m_item) {
marks_root = add_to_ktree(marks_root, m_item, cmp_marks);
k_add_head(marks_store, m_item);
}
}
K_WUNLOCK(marks_free);
return ok;
}
bool marks_fill(PGconn *conn) bool marks_fill(PGconn *conn)
{ {
ExecStatusType rescode; ExecStatusType rescode;
@ -5120,14 +5541,14 @@ bool marks_fill(PGconn *conn)
MARKS *row; MARKS *row;
char *field; char *field;
char *sel; char *sel;
int fields = 5; int fields = 6;
bool ok; bool ok;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
// TODO: limit how far back // TODO: limit how far back
sel = "select " sel = "select "
"poolinstance,workinfoid,description,marktype,status" "poolinstance,workinfoid,description,extra,marktype,status"
HISTORYDATECONTROL HISTORYDATECONTROL
" from marks"; " from marks";
res = PQexec(conn, sel, CKPQ_READ); res = PQexec(conn, sel, CKPQ_READ);
@ -5173,6 +5594,11 @@ bool marks_fill(PGconn *conn)
break; break;
TXT_TO_PTR("description", field, row->description); TXT_TO_PTR("description", field, row->description);
PQ_GET_FLD(res, i, "extra", field, ok);
if (!ok)
break;
TXT_TO_PTR("extra", field, row->extra);
PQ_GET_FLD(res, i, "marktype", field, ok); PQ_GET_FLD(res, i, "marktype", field, ok);
if (!ok) if (!ok)
break; break;

Loading…
Cancel
Save