Browse Source

ckdb - cleanup lists/trees on shutdown and notify what is delaying shutdown

master
kanoi 10 years ago
parent
commit
e2b378cb84
  1. 165
      src/ckdb.c
  2. 2
      src/ckdb.h
  3. 4
      src/ckdb_dbio.c

165
src/ckdb.c

@ -150,6 +150,11 @@
* and a warning is displayed if there were any matching shares * and a warning is displayed if there were any matching shares
*/ */
bool socketer_release;
bool summariser_release;
bool logger_release;
bool listener_release;
char *EMPTY = ""; char *EMPTY = "";
static char *db_name; static char *db_name;
@ -995,6 +1000,121 @@ static void alloc_storage()
workerstatus_root = new_ktree(); workerstatus_root = new_ktree();
} }
static void free_workinfo_data(K_ITEM *item)
{
WORKINFO *workinfo;
DATA_WORKINFO(workinfo, item);
if (workinfo->transactiontree)
free(workinfo->transactiontree);
if (workinfo->merklehash)
free(workinfo->merklehash);
}
static void free_sharesummary_data(K_ITEM *item)
{
SHARESUMMARY *sharesummary;
DATA_SHARESUMMARY(sharesummary, item);
SET_CREATEBY(sharesummary_free, sharesummary->createby, EMPTY);
SET_CREATECODE(sharesummary_free, sharesummary->createcode, EMPTY);
SET_CREATEINET(sharesummary_free, sharesummary->createinet, EMPTY);
SET_MODIFYBY(sharesummary_free, sharesummary->modifyby, EMPTY);
SET_MODIFYCODE(sharesummary_free, sharesummary->modifycode, EMPTY);
SET_MODIFYINET(sharesummary_free, sharesummary->modifyinet, EMPTY);
}
#define FREE_TREE(_tree) \
if (_tree ## _root) \
_tree ## _root = free_ktree(_tree ## _root, NULL) \
#define FREE_STORE(_list) \
if (_list ## _store) \
_list ## _store = k_free_store(_list ## _store) \
#define FREE_LIST(_list) \
if (_list ## _free) \
_list ## _free = k_free_list(_list ## _free) \
#define FREE_ALL(_list) FREE_TREE(_list); FREE_STORE(_list); FREE_LIST(_list)
#define FREE_LISTS(_list) FREE_STORE(_list); FREE_LIST(_list)
static void dealloc_storage()
{
K_ITEM *item;
FREE_LISTS(logqueue);
FREE_ALL(workerstatus);
FREE_TREE(userstats_workerstatus);
FREE_TREE(userstats_statsdate);
if (userstats_summ)
userstats_summ = k_free_store(userstats_summ);
FREE_STORE(userstats_eos);
FREE_ALL(userstats);
FREE_ALL(poolstats);
FREE_ALL(auths);
FREE_ALL(miningpayouts);
FREE_ALL(blocks);
FREE_TREE(sharesummary_workinfoid);
FREE_TREE(sharesummary);
if (sharesummary_store) {
item = sharesummary_store->head;
while (item) {
free_sharesummary_data(item);
item = item->next;
}
FREE_STORE(sharesummary);
}
if (sharesummary_free) {
item = sharesummary_free->head;
while (item) {
free_sharesummary_data(item);
item = item->next;
}
FREE_LIST(sharesummary);
}
FREE_ALL(shareerrors);
FREE_ALL(shares);
FREE_TREE(workinfo_height);
FREE_TREE(workinfo);
if (workinfo_store) {
item = workinfo_store->head;
while (item) {
free_workinfo_data(item);
item = item->next;
}
FREE_STORE(workinfo);
}
if (workinfo_free) {
item = workinfo_free->head;
while (item) {
free_workinfo_data(item);
item = item->next;
}
FREE_LIST(workinfo);
}
FREE_LISTS(idcontrol);
FREE_ALL(payments);
FREE_ALL(paymentaddresses);
FREE_ALL(workers);
FREE_ALL(optioncontrol);
FREE_ALL(useratts);
FREE_TREE(userid);
FREE_ALL(users);
FREE_LIST(transfer);
FREE_LISTS(heartbeatqueue);
FREE_LISTS(workqueue);
}
static bool setup_data() static bool setup_data()
{ {
K_TREE_CTX ctx[1]; K_TREE_CTX ctx[1];
@ -1669,6 +1789,8 @@ static void *summariser(__maybe_unused void *arg)
summarise_userstats(); summarise_userstats();
} }
summariser_release = true;
return NULL; return NULL;
} }
@ -1715,12 +1837,19 @@ static void *logger(__maybe_unused void *arg)
logqueue_store->count, logqueue_store->count,
now.tv_sec, now.tv_usec); now.tv_sec, now.tv_usec);
LOGFILE(buf); LOGFILE(buf);
while((lq_item = k_unlink_head(logqueue_store))) { if (logqueue_store->count)
LOGERR("%s", buf);
lq_item = logqueue_store->head;
while (lq_item) {
DATA_LOGQUEUE(lq, lq_item); DATA_LOGQUEUE(lq, lq_item);
LOGFILE(lq->msg); LOGFILE(lq->msg);
free(lq->msg);
lq_item = lq_item->next;
} }
K_WUNLOCK(logqueue_free); K_WUNLOCK(logqueue_free);
logger_release = true;
setnow(&now); setnow(&now);
snprintf(buf, sizeof(buf), "logstop.%ld,%ld", snprintf(buf, sizeof(buf), "logstop.%ld,%ld",
now.tv_sec, now.tv_usec); now.tv_sec, now.tv_usec);
@ -2125,6 +2254,8 @@ static void *socketer(__maybe_unused void *arg)
} }
} }
socketer_release = true;
if (buf) if (buf)
dealloc(buf); dealloc(buf);
// TODO: if anyone cares, free all the dup buffers :P // TODO: if anyone cares, free all the dup buffers :P
@ -2526,6 +2657,8 @@ static void *listener(void *arg)
} }
} }
listener_release = true;
if (conn) if (conn)
PQfinish(conn); PQfinish(conn);
@ -3337,6 +3470,36 @@ int main(int argc, char **argv)
/* Shutdown from here if the listener is sent a shutdown message */ /* Shutdown from here if the listener is sent a shutdown message */
join_pthread(ckp.pth_listener); join_pthread(ckp.pth_listener);
time_t start, trigger, now;
char *msg = NULL;
trigger = start = time(NULL);
while (!socketer_release || !summariser_release ||
!logger_release || !listener_release) {
msg = NULL;
now = time(NULL);
if (now - start > 4) {
if (now - trigger > 4) {
msg = "Shutdown initial delay";
} else if (now - trigger > 2) {
msg = "Shutdown delay";
}
}
if (msg) {
trigger = now;
printf("%s %ds due to%s%s%s%s\n",
msg, (int)(now - start),
socketer_release ? EMPTY : " socketer",
summariser_release ? EMPTY : " summariser",
logger_release ? EMPTY : " logger",
listener_release ? EMPTY : " listener");
fflush(stdout);
}
sleep(1);
}
dealloc_storage();
} }
clean_up(&ckp); clean_up(&ckp);

2
src/ckdb.h

@ -52,7 +52,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "0.9.2" #define DB_VERSION "0.9.2"
#define CKDB_VERSION DB_VERSION"-0.590" #define CKDB_VERSION DB_VERSION"-0.592"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__

4
src/ckdb_dbio.c

@ -2146,7 +2146,9 @@ int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstance,
K_WLOCK(workinfo_free); K_WLOCK(workinfo_free);
if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) {
free(row->transactiontree); free(row->transactiontree);
row->transactiontree = NULL;
free(row->merklehash); free(row->merklehash);
row->merklehash = NULL;
workinfoid = row->workinfoid; workinfoid = row->workinfoid;
k_add_head(workinfo_free, item); k_add_head(workinfo_free, item);
K_WUNLOCK(workinfo_free); K_WUNLOCK(workinfo_free);
@ -2209,7 +2211,9 @@ unparam:
K_WLOCK(workinfo_free); K_WLOCK(workinfo_free);
if (workinfoid == -1) { if (workinfoid == -1) {
free(row->transactiontree); free(row->transactiontree);
row->transactiontree = NULL;
free(row->merklehash); free(row->merklehash);
row->merklehash = NULL;
k_add_head(workinfo_free, item); k_add_head(workinfo_free, item);
} else { } else {
if (row->transactiontree && *(row->transactiontree)) { if (row->transactiontree && *(row->transactiontree)) {

Loading…
Cancel
Save