Browse Source

ckdb - add to marks command deleting the markersummaries and reversing a workmarker back to ready

master
kanoi 9 years ago
parent
commit
1ad54736a7
  1. 3
      src/ckdb.h
  2. 96
      src/ckdb_cmd.c
  3. 171
      src/ckdb_dbio.c

3
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.3" #define DB_VERSION "1.0.3"
#define CKDB_VERSION DB_VERSION"-1.323" #define CKDB_VERSION DB_VERSION"-1.330"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -2523,6 +2523,7 @@ extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username,
extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
char *by, char *code, char *inet, char *by, char *code, char *inet,
tv_t *cd, K_TREE *trf_root); tv_t *cd, K_TREE *trf_root);
extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm);
extern char *ooo_status(char *buf, size_t siz); extern char *ooo_status(char *buf, size_t siz);
#define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \ #define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \
_sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \ _sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \

96
src/ckdb_cmd.c

@ -5738,7 +5738,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
action = transfer_data(i_action); action = transfer_data(i_action);
if (strcasecmp(action, "add") == 0) { if (strcasecmp(action, "add") == 0) {
/* Add a mark /* Add a mark, -m will automatically do this
* Require marktype * Require marktype
* Require workinfoid for all but 'b' * Require workinfoid for all but 'b'
* If marktype is 'b' or 'p' then require height/block (number) * If marktype is 'b' or 'p' then require height/block (number)
@ -5944,7 +5944,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
trf_root); trf_root);
} }
} else if (strcasecmp(action, "generate") == 0) { } else if (strcasecmp(action, "generate") == 0) {
/* Generate workmarkers /* Generate workmarkers, -m will automatically do this
* No parameters */ * No parameters */
tmp[0] = '\0'; tmp[0] = '\0';
ok = workmarkers_generate(conn, tmp, sizeof(tmp), ok = workmarkers_generate(conn, tmp, sizeof(tmp),
@ -6005,6 +6005,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
} else if (strcasecmp(action, "sum") == 0) { } else if (strcasecmp(action, "sum") == 0) {
/* For the last available workmarker, /* For the last available workmarker,
* summarise it's sharesummaries into markersummaries * summarise it's sharesummaries into markersummaries
* -m will automatically do this
* No parameters */ * No parameters */
ok = make_markersummaries(true, by, code, inet, cd, trf_root); ok = make_markersummaries(true, by, code, inet, cd, trf_root);
if (!ok) { if (!ok) {
@ -6012,6 +6013,40 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
LOGERR("%s.%s", id, reply); LOGERR("%s.%s", id, reply);
return strdup(reply); return strdup(reply);
} }
} else if (strcasecmp(action, "ready") == 0) {
/* Mark a processed workmarker as ready
* for fixing problems with markersummaries
* Requires markerid */
i_markerid = require_name(trf_root, "markerid", 1, (char *)intpatt, reply, siz);
if (!i_markerid)
return strdup(reply);
TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid);
K_RLOCK(workmarkers_free);
wm_item = find_workmarkerid(markerid, true, '\0');
K_RUNLOCK(workmarkers_free);
if (!wm_item) {
snprintf(reply, siz,
"unknown workmarkers with markerid %"PRId64, markerid);
return strdup(reply);
}
DATA_WORKMARKERS(workmarkers, wm_item);
if (!WMPROCESSED(workmarkers->status)) {
snprintf(reply, siz,
"markerid isn't processed %"PRId64, markerid);
return strdup(reply);
}
ok = workmarkers_process(NULL, false, true, markerid,
workmarkers->poolinstance,
workmarkers->workinfoidend,
workmarkers->workinfoidstart,
workmarkers->description,
MARKER_READY_STR,
by, code, inet, cd, trf_root);
if (!ok) {
snprintf(reply, siz, "%s failed", action);
LOGERR("%s.%s", id, reply);
return strdup(reply);
}
} else if (strcasecmp(action, "processed") == 0) { } else if (strcasecmp(action, "processed") == 0) {
/* Mark a workmarker as processed /* Mark a workmarker as processed
* Requires markerid */ * Requires markerid */
@ -6045,6 +6080,63 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
LOGERR("%s.%s", id, reply); LOGERR("%s.%s", id, reply);
return strdup(reply); return strdup(reply);
} }
} else if (strcasecmp(action, "cancel") == 0) {
/* Cancel(delete) all the markersummaries in a workmarker
* This can only be done if the workmarker isn't processed
* It reports on the console, summary information of the
* markersummaries that were deleted
*
* WARNING ... if you do this after the workmarker has been
* processed, there will no longer be any matching shares,
* sharesummaries or workmarkers in ram, so you'd need to
* restart ckdb to regenerate the markersummaries
* HOWEVER, ckdb wont reload the shares if there is a later
* workmarker that is processed
*
* SS_to_MS will complain if any markersummaries already exist
* when processing a workmarker
* Normally you would use 'processed' if the markersummaries
* are OK, and just the workmarker failed to be updated to
* processed status
* However, if there is actually something wrong with the
* shift data (markersummaries) you can delete them and they
* will be regenerated
* This will usually only work as expected if the last
* workmarker isn't marked as processed, but somehow there
* are markersummaries for it in the DB, thus the reload
* will reload all the shares for the workmarker then it
* will print a warning every 13s on the console saying
* that it can't process the workmarker
* In this case you would cancel the workmarker then ckdb
* will regenerate it from the shares/sharesummaries in ram
*
* Requires markerid */
i_markerid = require_name(trf_root, "markerid", 1, (char *)intpatt, reply, siz);
if (!i_markerid)
return strdup(reply);
TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid);
K_RLOCK(workmarkers_free);
wm_item = find_workmarkerid(markerid, true, '\0');
K_RUNLOCK(workmarkers_free);
if (!wm_item) {
snprintf(reply, siz,
"unknown workmarkers with markerid %"PRId64, markerid);
return strdup(reply);
}
DATA_WORKMARKERS(workmarkers, wm_item);
if (WMPROCESSED(workmarkers->status)) {
snprintf(reply, siz,
"can't cancel a processed markerid %"PRId64,
markerid);
return strdup(reply);
}
ok = delete_markersummaries(NULL, workmarkers);
if (!ok) {
snprintf(reply, siz, "%s failed", action);
LOGERR("%s.%s", id, reply);
return strdup(reply);
}
} else { } else {
snprintf(reply, siz, "unknown action '%s'", action); snprintf(reply, siz, "unknown action '%s'", action);
LOGERR("%s.%s", id, reply); LOGERR("%s.%s", id, reply);

171
src/ckdb_dbio.c

@ -3567,7 +3567,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_TREE *trf_root) K_TREE *trf_root)
{ {
// shorter name for log messages // shorter name for log messages
const char *shortname = "SS_to_MS"; static const char *shortname = "SS_to_MS";
ExecStatusType rescode; ExecStatusType rescode;
PGresult *res; PGresult *res;
K_TREE_CTX ss_ctx[1], ms_ctx[1]; K_TREE_CTX ss_ctx[1], ms_ctx[1];
@ -3613,6 +3613,14 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers,
K_RUNLOCK(markersummary_free); K_RUNLOCK(markersummary_free);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item); DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
if (ms_item && markersummary->markerid == workmarkers->markerid) { if (ms_item && markersummary->markerid == workmarkers->markerid) {
/* The fix here is either to set the workmarker as processed
* with the marks action=processed
* if the markersummaries are OK but the workmarker failed to
* have it's status set to processed
* OR
* delete the markersummaries with the marks action=cancel
* so this will continue and regenerate the markersummaries
*/
reason = "markersummaries already exist"; reason = "markersummaries already exist";
goto flail; goto flail;
} }
@ -3853,6 +3861,162 @@ flail:
return ok; return ok;
} }
bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm)
{
// shorter name for log messages
static const char *shortname = "DEL_MS";
K_STORE *del_markersummary_store = NULL;
ExecStatusType rescode;
PGresult *res;
K_TREE_CTX ms_ctx[1];
MARKERSUMMARY *markersummary = NULL, lookmarkersummary;
K_ITEM *ms_item, ms_look, *p_ms_item = NULL;
bool ok = false, conned = false;
int64_t diffacc, shareacc;
char *reason = "unknown";
int ms_count;
char *params[1];
int par = 0, ms_del = 0;
char *del, *tuples = NULL;
if (WMPROCESSED(wm->status)) {
reason = "status processed";
goto flail;
}
LOGWARNING("%s() Deleting: markersummaries for workmarkers "
"%"PRId64"/%s/End %"PRId64"/Stt %"PRId64"/%s/%s",
shortname, wm->markerid, wm->poolinstance,
wm->workinfoidend, wm->workinfoidstart, wm->description,
wm->status);
del_markersummary_store = k_new_store(markersummary_free);
lookmarkersummary.markerid = wm->markerid;
lookmarkersummary.userid = 0;
lookmarkersummary.workername = EMPTY;
ms_count = diffacc = shareacc = 0;
ms_item = NULL;
INIT_MARKERSUMMARY(&ms_look);
ms_look.data = (void *)(&lookmarkersummary);
K_WLOCK(workmarkers_free);
K_WLOCK(markersummary_free);
ms_item = find_after_in_ktree(markersummary_root, &ms_look, ms_ctx);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
if (!ms_item || markersummary->markerid != wm->markerid) {
reason = "no markersummaries";
goto flail;
}
// Build the delete list of markersummaries
while (ms_item && markersummary->markerid == wm->markerid) {
ms_count++;
diffacc += markersummary->diffacc;
shareacc += markersummary->shareacc;
k_unlink_item(markersummary_store, ms_item);
k_add_tail(del_markersummary_store, ms_item);
ms_item = next_in_ktree(ms_ctx);
DATA_MARKERSUMMARY_NULL(markersummary, ms_item);
}
par = 0;
params[par++] = bigint_to_buf(wm->markerid, NULL, 0);
PARCHK(par, params);
del = "delete from markersummary where markerid=$1";
if (conn == NULL) {
conn = dbconnect();
conned = true;
}
res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE);
rescode = PQresultStatus(res);
if (!PGOK(rescode)) {
PGLOGERR("Delete", rescode, conn);
reason = "db error";
goto unparam;
}
if (PGOK(rescode)) {
tuples = PQcmdTuples(res);
if (tuples && *tuples) {
ms_del = atoi(tuples);
if (ms_del != ms_count) {
LOGERR("%s() deleted markersummaries should be"
" %d but deleted=%d",
shortname, ms_count, ms_del);
reason = "del mismatch";
goto unparam;
}
}
}
ok = true;
unparam:
PQclear(res);
flail:
if (conned)
PQfinish(conn);
if (!ok) {
if (del_markersummary_store && del_markersummary_store->count) {
k_list_transfer_to_head(del_markersummary_store,
markersummary_store);
}
} else {
/* TODO: add a list garbage collection thread so as to not
* invalidate the data immediately (free_*), rather after
* some delay */
ms_item = del_markersummary_store->head;
while (ms_item) {
remove_from_ktree(markersummary_root, ms_item);
remove_from_ktree(markersummary_userid_root, ms_item);
free_markersummary_data(ms_item);
ms_item = ms_item->next;
}
k_list_transfer_to_head(del_markersummary_store,
markersummary_free);
p_ms_item = find_markersummary_p(wm->markerid);
if (p_ms_item) {
remove_from_ktree(markersummary_pool_root, p_ms_item);
free_markersummary_data(p_ms_item);
k_unlink_item(markersummary_pool_store, p_ms_item);
k_add_head(markersummary_free, p_ms_item);
}
}
K_WUNLOCK(markersummary_free);
K_WUNLOCK(workmarkers_free);
if (!ok) {
// already displayed the full workmarkers detail at the top
LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s",
shortname, reason, wm->markerid, wm->description,
wm->status);
} else {
LOGWARNING("%s() Deleted: %d ms %"PRId64" shares "
"%"PRId64" diff for workmarkers %"PRId64"/%s/"
"End %"PRId64"/Stt %"PRId64"/%s/%s",
shortname, ms_count, shareacc, diffacc,
wm->markerid, wm->poolinstance, wm->workinfoidend,
wm->workinfoidstart, wm->description, wm->status);
}
if (del_markersummary_store)
del_markersummary_store = k_free_store(del_markersummary_store);
return ok;
}
static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row,
SHAREERRORS *e_row, bool new, SHAREERRORS *e_row, bool new,
double *tdf, double *tdl) double *tdf, double *tdl)
@ -6876,8 +7040,9 @@ bool workmarkers_fill(PGconn *conn)
!WMPROCESSED(row->status)) { !WMPROCESSED(row->status)) {
LOGWARNING("%s(): WARNING workmarkerid %"PRId64" (%s)" LOGWARNING("%s(): WARNING workmarkerid %"PRId64" (%s)"
" wid end %"PRId64" isn't processed! (%s) " " wid end %"PRId64" isn't processed! (%s) "
"You should abort ckdb and mark it if it " "You need to correct it after the startup "
"actually has already been processed", "completes, with a marks action: processed"
" or cancel",
__func__, row->markerid, row->description, __func__, row->markerid, row->description,
row->workinfoidend, row->status); row->workinfoidend, row->status);
} }

Loading…
Cancel
Save