diff --git a/src/ckdb.c b/src/ckdb.c index 92fce348..d80720b0 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -209,22 +209,22 @@ const char Transfer[] = "Transfer"; // older version missing field defaults // see end of alloc_storage() -static TRANSFER auth_2 = { "preauth", FALSE_STR, auth_2.svalue, NULL }; +static TRANSFER auth_2 = { "preauth", FALSE_STR, auth_2.svalue, 0, NULL }; K_ITEM auth_preauth = { Transfer, NULL, NULL, (void *)(&auth_2) }; -static TRANSFER poolstats_1 = { "elapsed", "0", poolstats_1.svalue, NULL }; +static TRANSFER poolstats_1 = { "elapsed", "0", poolstats_1.svalue, 0, NULL }; K_ITEM poolstats_elapsed = { Transfer, NULL, NULL, (void *)(&poolstats_1) }; -static TRANSFER userstats_1 = { "elapsed", "0", userstats_1.svalue, NULL }; +static TRANSFER userstats_1 = { "elapsed", "0", userstats_1.svalue, 0, NULL }; K_ITEM userstats_elapsed = { Transfer, NULL, NULL, (void *)(&userstats_1) }; // see end of alloc_storage() INTRANSIENT *userstats_workername = NULL; -static TRANSFER userstats_3 = { "idle", FALSE_STR, userstats_3.svalue, NULL }; +static TRANSFER userstats_3 = { "idle", FALSE_STR, userstats_3.svalue, 0, NULL }; K_ITEM userstats_idle = { Transfer, NULL, NULL, (void *)(&userstats_3) }; -static TRANSFER userstats_4 = { "eos", TRUE_STR, userstats_4.svalue, NULL }; +static TRANSFER userstats_4 = { "eos", TRUE_STR, userstats_4.svalue, 0, NULL }; K_ITEM userstats_eos = { Transfer, NULL, NULL, (void *)(&userstats_4) }; -static TRANSFER shares_1 = { "secondaryuserid", TRUE_STR, shares_1.svalue, NULL }; +static TRANSFER shares_1 = { "secondaryuserid", TRUE_STR, shares_1.svalue, 0, NULL }; K_ITEM shares_secondaryuserid = { Transfer, NULL, NULL, (void *)(&shares_1) }; -static TRANSFER shareerrors_1 = { "secondaryuserid", TRUE_STR, shareerrors_1.svalue, NULL }; +static TRANSFER shareerrors_1 = { "secondaryuserid", TRUE_STR, shareerrors_1.svalue, 0, NULL }; K_ITEM shareerrors_secondaryuserid = { Transfer, NULL, NULL, (void *)(&shareerrors_1) }; // Time limit that this problem occurred // 24-Aug-2014 05:20+00 (1st one shortly after this) @@ -4007,11 +4007,14 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, char *cmdptr, *idptr, *next, *eq, *end, *was; char *data = NULL, *st = NULL, *st2 = NULL, *ip = NULL; bool noid = false, intrans; + uint64_t ram2 = 0; size_t siz; int i; + siz = strlen(buf)+1; K_WLOCK(msgline_free); *ml_item = k_unlink_head_zero(msgline_free); + msgline_free->ram += siz; K_WUNLOCK(msgline_free); DATA_MSGLINE(msgline, *ml_item); msgline->which_cmds = CMD_UNSET; @@ -4021,6 +4024,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, DATE_ZERO(&(msgline->broken)); DATE_ZERO(&(msgline->processed)); msgline->msg = strdup(buf); + msgline->msgsiz = siz; msgline->seqentryflags = seqentryflags; cmdptr = strdup(buf); @@ -4133,7 +4137,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, // LOGERR of buf could be truncated *(end++) = '\0'; K_WLOCK(transfer_free); - t_item = k_unlink_head(transfer_free); + t_item = k_unlink_head_zero(transfer_free); K_WUNLOCK(transfer_free); DATA_TRANSFER(transfer, t_item); STRNCPY(transfer->name, next); @@ -4246,6 +4250,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, if (!intrans) { transfer->intransient = NULL; if (siz >= sizeof(transfer->svalue)) { + transfer->msiz = siz+1; + ram2 += siz+1; transfer->mvalue = malloc(siz+1); STRNCPYSIZ(transfer->mvalue, next, siz+1); @@ -4291,7 +4297,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, *(eq++) = '\0'; K_WLOCK(transfer_free); - t_item = k_unlink_head(transfer_free); + t_item = k_unlink_head_zero(transfer_free); K_WUNLOCK(transfer_free); DATA_TRANSFER(transfer, t_item); STRNCPY(transfer->name, data); @@ -4311,6 +4317,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, if (!intrans) { transfer->intransient = NULL; if (siz > sizeof(transfer->svalue)) { + ram2 += siz; + transfer->msiz = siz; transfer->mvalue = malloc(siz); STRNCPYSIZ(transfer->mvalue, eq, siz); } else { @@ -4321,8 +4329,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, // Discard duplicates if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) { - if (transfer->mvalue != transfer->svalue) - FREENULL(transfer->mvalue); + if (transfer->msiz) + ram2 -= transfer->msiz; + free_transfer_data(transfer); K_WLOCK(transfer_free); k_add_head(transfer_free, t_item); K_WUNLOCK(transfer_free); @@ -4332,9 +4341,17 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, add_to_ktree_nolock(msgline->trf_root, t_item); k_add_head_nolock(msgline->trf_store, t_item); } + t_item = NULL; } } + if (ram2) { + K_WLOCK(transfer_free); + transfer_free->ram += ram2; + K_WUNLOCK(transfer_free); + ram2 = 0; + } + seqall = find_transfer(msgline->trf_root, SEQALL); if (ckdb_cmds[msgline->which_cmds].createdate) { cd_item = require_name(msgline->trf_root, CDTRF, 10, NULL, @@ -4394,9 +4411,17 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now, return ckdb_cmds[msgline->which_cmds].cmd_val; nogood: - if (t_item) { + if (t_item || ram2) { K_WLOCK(transfer_free); - k_add_head(transfer_free, t_item); + if (t_item) { + DATA_TRANSFER(transfer, t_item); + if (transfer->msiz) + ram2 -= transfer->msiz; + free_transfer_data(transfer); + k_add_head(transfer_free, t_item); + } + if (ram2) + transfer_free->ram += ram2; K_WUNLOCK(transfer_free); } free(cmdptr); @@ -6012,6 +6037,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t free_msgline_data(ml_item, true, true); K_WLOCK(msgline_free); + msgline_free->ram -= msgline->msgsiz; k_add_head(msgline_free, ml_item); K_WUNLOCK(msgline_free); @@ -6585,8 +6611,11 @@ static void *process_socket(__maybe_unused void *arg) WORKQUEUE *wq; DATA_WORKQUEUE(wq, wq2_item); K_ITEM *ml_item = wq->msgline_item; + MSGLINE *ml; + DATA_MSGLINE(ml, ml_item); free_msgline_data(ml_item, true, false); K_WLOCK(msgline_free); + msgline_free->ram -= ml->msgsiz; k_add_head(msgline_free, ml_item); K_WUNLOCK(msgline_free); K_WLOCK(workqueue_free); @@ -6625,8 +6654,11 @@ skippy: dec_sockd = false; if (bq->ml_item) { + MSGLINE *ml; + DATA_MSGLINE(ml, bq->ml_item); free_msgline_data(bq->ml_item, true, true); K_WLOCK(msgline_free); + msgline_free->ram -= ml->msgsiz; k_add_head(msgline_free, bq->ml_item); K_WUNLOCK(msgline_free); bq->ml_item = NULL; @@ -6974,8 +7006,10 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item) } if (bq->ml_item) { + DATA_MSGLINE(msgline, bq->ml_item); free_msgline_data(bq->ml_item, true, true); K_WLOCK(msgline_free); + msgline_free->ram -= msgline->msgsiz; k_add_head(msgline_free, bq->ml_item); K_WUNLOCK(msgline_free); bq->ml_item = NULL; @@ -7609,6 +7643,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item) free_msgline_data(ml_item, true, true); K_WLOCK(msgline_free); + msgline_free->ram -= msgline->msgsiz; k_add_head(msgline_free, ml_item); K_WUNLOCK(msgline_free); diff --git a/src/ckdb.h b/src/ckdb.h index ab0caefa..884ccf81 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -58,7 +58,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.7" -#define CKDB_VERSION DB_VERSION"-2.506" +#define CKDB_VERSION DB_VERSION"-2.507" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1349,6 +1349,7 @@ typedef struct msgline { char id[ID_SIZ+1]; char cmd[CMD_SIZ+1]; char *msg; + size_t msgsiz; bool hasseq; char *seqcmdnam; uint64_t n_seqall; @@ -1549,6 +1550,7 @@ typedef struct transfer { char name[NAME_SIZE+1]; char svalue[VALUE_SIZE+1]; char *mvalue; + size_t msiz; INTRANSIENT *intransient; } TRANSFER; @@ -3215,6 +3217,7 @@ extern void sequence_report(bool lock); // Data free functions (first) #define FREE_ITEM(item) do { } while(0) // TODO: make a macro for all other to use above macro +extern void free_transfer_data(TRANSFER *transfer); extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull); extern void free_users_data(K_ITEM *item); extern void free_workinfo_data(K_ITEM *item); diff --git a/src/ckdb_data.c b/src/ckdb_data.c index aaded271..d4bc4461 100644 --- a/src/ckdb_data.c +++ b/src/ckdb_data.c @@ -12,11 +12,18 @@ // Data free functions (added here as needed) +void free_transfer_data(TRANSFER *transfer) +{ + if (transfer->msiz) + FREENULL(transfer->mvalue); +} + void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull) { K_ITEM *t_item = NULL; TRANSFER *transfer; MSGLINE *msgline; + uint64_t ram2 = 0; DATA_MSGLINE(msgline, item); if (msgline->trf_root) @@ -25,17 +32,13 @@ void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull) t_item = STORE_HEAD_NOLOCK(msgline->trf_store); while (t_item) { DATA_TRANSFER(transfer, t_item); - if (transfer->mvalue != transfer->svalue) { - if (transfer->intransient) { - transfer->intransient = NULL; - transfer->mvalue = NULL; - } else - FREENULL(transfer->mvalue); - } + ram2 += transfer->msiz; + free_transfer_data(transfer); t_item = t_item->next; } if (t_lock) K_WLOCK(transfer_free); + transfer_free->ram -= ram2; k_list_transfer_to_head(msgline->trf_store, transfer_free); if (t_cull) { if (transfer_free->count == transfer_free->total && @@ -853,9 +856,8 @@ void dsp_transfer(K_ITEM *item, FILE *stream) fprintf(stream, "%s() called with (null) item\n", __func__); else { DATA_TRANSFER(t, item); - fprintf(stream, " name='%s' mvalue='%s' malloc=%c\n", - t->name, t->mvalue, - (t->svalue == t->mvalue) ? 'N' : 'Y'); + fprintf(stream, " name='%s' mvalue='%s' malloc=%"PRIu64"\n", + t->name, t->mvalue, t->msiz); } } diff --git a/src/klist.h b/src/klist.h index 8450dd6c..d40b199b 100644 --- a/src/klist.h +++ b/src/klist.h @@ -152,7 +152,7 @@ typedef struct k_list { void **data_memory; // allocated item data memory buffers void (*dsp_func)(K_ITEM *, FILE *); // optional data display to a file int cull_count; - int ram; // ram allocated for data pointers - code must manage it + uint64_t ram; // ram allocated for data pointers - code must manage it int stores; // how many stores it currently has #if LOCK_CHECK // Since each thread has it's own k_lock no locking is required on this