From 1ad54736a736069b5cfdcd82ce8e8b413c60633a Mon Sep 17 00:00:00 2001 From: kanoi Date: Sat, 26 Sep 2015 21:21:38 +1000 Subject: [PATCH] ckdb - add to marks command deleting the markersummaries and reversing a workmarker back to ready --- src/ckdb.h | 3 +- src/ckdb_cmd.c | 96 ++++++++++++++++++++++++++- src/ckdb_dbio.c | 171 +++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 264 insertions(+), 6 deletions(-) diff --git a/src/ckdb.h b/src/ckdb.h index 88976a4d..4186db66 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.3" -#define CKDB_VERSION DB_VERSION"-1.323" +#define CKDB_VERSION DB_VERSION"-1.330" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -2523,6 +2523,7 @@ extern bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, extern bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, char *by, char *code, char *inet, tv_t *cd, K_TREE *trf_root); +extern bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm); extern char *ooo_status(char *buf, size_t siz); #define sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd) \ _sharesummary_update(_s_row, _e_row, _by, _code, _inet, _cd, \ diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index ff472945..7fab93af 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -5738,7 +5738,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, action = transfer_data(i_action); if (strcasecmp(action, "add") == 0) { - /* Add a mark + /* Add a mark, -m will automatically do this * Require marktype * Require workinfoid for all but 'b' * If marktype is 'b' or 'p' then require height/block (number) @@ -5944,7 +5944,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, trf_root); } } else if (strcasecmp(action, "generate") == 0) { - /* Generate workmarkers + /* Generate workmarkers, -m will automatically do this * No parameters */ tmp[0] = '\0'; ok = workmarkers_generate(conn, tmp, sizeof(tmp), @@ -6005,6 +6005,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, } else if (strcasecmp(action, "sum") == 0) { /* For the last available workmarker, * summarise it's sharesummaries into markersummaries + * -m will automatically do this * No parameters */ ok = make_markersummaries(true, by, code, inet, cd, trf_root); if (!ok) { @@ -6012,6 +6013,40 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, LOGERR("%s.%s", id, reply); return strdup(reply); } + } else if (strcasecmp(action, "ready") == 0) { + /* Mark a processed workmarker as ready + * for fixing problems with markersummaries + * Requires markerid */ + i_markerid = require_name(trf_root, "markerid", 1, (char *)intpatt, reply, siz); + if (!i_markerid) + return strdup(reply); + TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid); + K_RLOCK(workmarkers_free); + wm_item = find_workmarkerid(markerid, true, '\0'); + K_RUNLOCK(workmarkers_free); + if (!wm_item) { + snprintf(reply, siz, + "unknown workmarkers with markerid %"PRId64, markerid); + return strdup(reply); + } + DATA_WORKMARKERS(workmarkers, wm_item); + if (!WMPROCESSED(workmarkers->status)) { + snprintf(reply, siz, + "markerid isn't processed %"PRId64, markerid); + return strdup(reply); + } + ok = workmarkers_process(NULL, false, true, markerid, + workmarkers->poolinstance, + workmarkers->workinfoidend, + workmarkers->workinfoidstart, + workmarkers->description, + MARKER_READY_STR, + by, code, inet, cd, trf_root); + if (!ok) { + snprintf(reply, siz, "%s failed", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } } else if (strcasecmp(action, "processed") == 0) { /* Mark a workmarker as processed * Requires markerid */ @@ -6045,6 +6080,63 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, LOGERR("%s.%s", id, reply); return strdup(reply); } + } else if (strcasecmp(action, "cancel") == 0) { + /* Cancel(delete) all the markersummaries in a workmarker + * This can only be done if the workmarker isn't processed + * It reports on the console, summary information of the + * markersummaries that were deleted + * + * WARNING ... if you do this after the workmarker has been + * processed, there will no longer be any matching shares, + * sharesummaries or workmarkers in ram, so you'd need to + * restart ckdb to regenerate the markersummaries + * HOWEVER, ckdb wont reload the shares if there is a later + * workmarker that is processed + * + * SS_to_MS will complain if any markersummaries already exist + * when processing a workmarker + * Normally you would use 'processed' if the markersummaries + * are OK, and just the workmarker failed to be updated to + * processed status + * However, if there is actually something wrong with the + * shift data (markersummaries) you can delete them and they + * will be regenerated + * This will usually only work as expected if the last + * workmarker isn't marked as processed, but somehow there + * are markersummaries for it in the DB, thus the reload + * will reload all the shares for the workmarker then it + * will print a warning every 13s on the console saying + * that it can't process the workmarker + * In this case you would cancel the workmarker then ckdb + * will regenerate it from the shares/sharesummaries in ram + * + * Requires markerid */ + i_markerid = require_name(trf_root, "markerid", 1, (char *)intpatt, reply, siz); + if (!i_markerid) + return strdup(reply); + TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid); + K_RLOCK(workmarkers_free); + wm_item = find_workmarkerid(markerid, true, '\0'); + K_RUNLOCK(workmarkers_free); + if (!wm_item) { + snprintf(reply, siz, + "unknown workmarkers with markerid %"PRId64, markerid); + return strdup(reply); + } + DATA_WORKMARKERS(workmarkers, wm_item); + if (WMPROCESSED(workmarkers->status)) { + snprintf(reply, siz, + "can't cancel a processed markerid %"PRId64, + markerid); + return strdup(reply); + } + + ok = delete_markersummaries(NULL, workmarkers); + if (!ok) { + snprintf(reply, siz, "%s failed", action); + LOGERR("%s.%s", id, reply); + return strdup(reply); + } } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s.%s", id, reply); diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index ffd858e9..fd790a13 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -3567,7 +3567,7 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, K_TREE *trf_root) { // shorter name for log messages - const char *shortname = "SS_to_MS"; + static const char *shortname = "SS_to_MS"; ExecStatusType rescode; PGresult *res; K_TREE_CTX ss_ctx[1], ms_ctx[1]; @@ -3613,6 +3613,14 @@ bool sharesummaries_to_markersummaries(PGconn *conn, WORKMARKERS *workmarkers, K_RUNLOCK(markersummary_free); DATA_MARKERSUMMARY_NULL(markersummary, ms_item); if (ms_item && markersummary->markerid == workmarkers->markerid) { + /* The fix here is either to set the workmarker as processed + * with the marks action=processed + * if the markersummaries are OK but the workmarker failed to + * have it's status set to processed + * OR + * delete the markersummaries with the marks action=cancel + * so this will continue and regenerate the markersummaries + */ reason = "markersummaries already exist"; goto flail; } @@ -3853,6 +3861,162 @@ flail: return ok; } +bool delete_markersummaries(PGconn *conn, WORKMARKERS *wm) +{ + // shorter name for log messages + static const char *shortname = "DEL_MS"; + K_STORE *del_markersummary_store = NULL; + ExecStatusType rescode; + PGresult *res; + K_TREE_CTX ms_ctx[1]; + MARKERSUMMARY *markersummary = NULL, lookmarkersummary; + K_ITEM *ms_item, ms_look, *p_ms_item = NULL; + bool ok = false, conned = false; + int64_t diffacc, shareacc; + char *reason = "unknown"; + int ms_count; + char *params[1]; + int par = 0, ms_del = 0; + char *del, *tuples = NULL; + + if (WMPROCESSED(wm->status)) { + reason = "status processed"; + goto flail; + } + + LOGWARNING("%s() Deleting: markersummaries for workmarkers " + "%"PRId64"/%s/End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, wm->markerid, wm->poolinstance, + wm->workinfoidend, wm->workinfoidstart, wm->description, + wm->status); + + del_markersummary_store = k_new_store(markersummary_free); + + lookmarkersummary.markerid = wm->markerid; + lookmarkersummary.userid = 0; + lookmarkersummary.workername = EMPTY; + + ms_count = diffacc = shareacc = 0; + ms_item = NULL; + + INIT_MARKERSUMMARY(&ms_look); + ms_look.data = (void *)(&lookmarkersummary); + + K_WLOCK(workmarkers_free); + K_WLOCK(markersummary_free); + + ms_item = find_after_in_ktree(markersummary_root, &ms_look, ms_ctx); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + if (!ms_item || markersummary->markerid != wm->markerid) { + reason = "no markersummaries"; + goto flail; + } + + // Build the delete list of markersummaries + while (ms_item && markersummary->markerid == wm->markerid) { + ms_count++; + diffacc += markersummary->diffacc; + shareacc += markersummary->shareacc; + + k_unlink_item(markersummary_store, ms_item); + k_add_tail(del_markersummary_store, ms_item); + + ms_item = next_in_ktree(ms_ctx); + DATA_MARKERSUMMARY_NULL(markersummary, ms_item); + } + + par = 0; + params[par++] = bigint_to_buf(wm->markerid, NULL, 0); + PARCHK(par, params); + + del = "delete from markersummary where markerid=$1"; + + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + + res = PQexecParams(conn, del, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_WRITE); + rescode = PQresultStatus(res); + if (!PGOK(rescode)) { + PGLOGERR("Delete", rescode, conn); + reason = "db error"; + goto unparam; + } + + if (PGOK(rescode)) { + tuples = PQcmdTuples(res); + if (tuples && *tuples) { + ms_del = atoi(tuples); + if (ms_del != ms_count) { + LOGERR("%s() deleted markersummaries should be" + " %d but deleted=%d", + shortname, ms_count, ms_del); + reason = "del mismatch"; + goto unparam; + } + } + } + + ok = true; +unparam: + PQclear(res); +flail: + if (conned) + PQfinish(conn); + + if (!ok) { + if (del_markersummary_store && del_markersummary_store->count) { + k_list_transfer_to_head(del_markersummary_store, + markersummary_store); + } + } else { + /* TODO: add a list garbage collection thread so as to not + * invalidate the data immediately (free_*), rather after + * some delay */ + ms_item = del_markersummary_store->head; + while (ms_item) { + remove_from_ktree(markersummary_root, ms_item); + remove_from_ktree(markersummary_userid_root, ms_item); + free_markersummary_data(ms_item); + ms_item = ms_item->next; + } + + k_list_transfer_to_head(del_markersummary_store, + markersummary_free); + + p_ms_item = find_markersummary_p(wm->markerid); + if (p_ms_item) { + remove_from_ktree(markersummary_pool_root, p_ms_item); + free_markersummary_data(p_ms_item); + k_unlink_item(markersummary_pool_store, p_ms_item); + k_add_head(markersummary_free, p_ms_item); + } + } + + K_WUNLOCK(markersummary_free); + K_WUNLOCK(workmarkers_free); + + if (!ok) { + // already displayed the full workmarkers detail at the top + LOGERR("%s() %s: workmarkers %"PRId64"/%s/%s", + shortname, reason, wm->markerid, wm->description, + wm->status); + } else { + LOGWARNING("%s() Deleted: %d ms %"PRId64" shares " + "%"PRId64" diff for workmarkers %"PRId64"/%s/" + "End %"PRId64"/Stt %"PRId64"/%s/%s", + shortname, ms_count, shareacc, diffacc, + wm->markerid, wm->poolinstance, wm->workinfoidend, + wm->workinfoidstart, wm->description, wm->status); + } + + if (del_markersummary_store) + del_markersummary_store = k_free_store(del_markersummary_store); + + return ok; +} + static void set_sharesummary_stats(SHARESUMMARY *row, SHARES *s_row, SHAREERRORS *e_row, bool new, double *tdf, double *tdl) @@ -6876,8 +7040,9 @@ bool workmarkers_fill(PGconn *conn) !WMPROCESSED(row->status)) { LOGWARNING("%s(): WARNING workmarkerid %"PRId64" (%s)" " wid end %"PRId64" isn't processed! (%s) " - "You should abort ckdb and mark it if it " - "actually has already been processed", + "You need to correct it after the startup " + "completes, with a marks action: processed" + " or cancel", __func__, row->markerid, row->description, row->workinfoidend, row->status); }