Browse Source

ckdb - track ram usage for transfer and msgline

master
kanoi 8 years ago
parent
commit
98929f5ce8
  1. 59
      src/ckdb.c
  2. 5
      src/ckdb.h
  3. 22
      src/ckdb_data.c
  4. 2
      src/klist.h

59
src/ckdb.c

@ -209,22 +209,22 @@ const char Transfer[] = "Transfer";
// older version missing field defaults
// see end of alloc_storage()
static TRANSFER auth_2 = { "preauth", FALSE_STR, auth_2.svalue, NULL };
static TRANSFER auth_2 = { "preauth", FALSE_STR, auth_2.svalue, 0, NULL };
K_ITEM auth_preauth = { Transfer, NULL, NULL, (void *)(&auth_2) };
static TRANSFER poolstats_1 = { "elapsed", "0", poolstats_1.svalue, NULL };
static TRANSFER poolstats_1 = { "elapsed", "0", poolstats_1.svalue, 0, NULL };
K_ITEM poolstats_elapsed = { Transfer, NULL, NULL, (void *)(&poolstats_1) };
static TRANSFER userstats_1 = { "elapsed", "0", userstats_1.svalue, NULL };
static TRANSFER userstats_1 = { "elapsed", "0", userstats_1.svalue, 0, NULL };
K_ITEM userstats_elapsed = { Transfer, NULL, NULL, (void *)(&userstats_1) };
// see end of alloc_storage()
INTRANSIENT *userstats_workername = NULL;
static TRANSFER userstats_3 = { "idle", FALSE_STR, userstats_3.svalue, NULL };
static TRANSFER userstats_3 = { "idle", FALSE_STR, userstats_3.svalue, 0, NULL };
K_ITEM userstats_idle = { Transfer, NULL, NULL, (void *)(&userstats_3) };
static TRANSFER userstats_4 = { "eos", TRUE_STR, userstats_4.svalue, NULL };
static TRANSFER userstats_4 = { "eos", TRUE_STR, userstats_4.svalue, 0, NULL };
K_ITEM userstats_eos = { Transfer, NULL, NULL, (void *)(&userstats_4) };
static TRANSFER shares_1 = { "secondaryuserid", TRUE_STR, shares_1.svalue, NULL };
static TRANSFER shares_1 = { "secondaryuserid", TRUE_STR, shares_1.svalue, 0, NULL };
K_ITEM shares_secondaryuserid = { Transfer, NULL, NULL, (void *)(&shares_1) };
static TRANSFER shareerrors_1 = { "secondaryuserid", TRUE_STR, shareerrors_1.svalue, NULL };
static TRANSFER shareerrors_1 = { "secondaryuserid", TRUE_STR, shareerrors_1.svalue, 0, NULL };
K_ITEM shareerrors_secondaryuserid = { Transfer, NULL, NULL, (void *)(&shareerrors_1) };
// Time limit that this problem occurred
// 24-Aug-2014 05:20+00 (1st one shortly after this)
@ -4007,11 +4007,14 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
char *cmdptr, *idptr, *next, *eq, *end, *was;
char *data = NULL, *st = NULL, *st2 = NULL, *ip = NULL;
bool noid = false, intrans;
uint64_t ram2 = 0;
size_t siz;
int i;
siz = strlen(buf)+1;
K_WLOCK(msgline_free);
*ml_item = k_unlink_head_zero(msgline_free);
msgline_free->ram += siz;
K_WUNLOCK(msgline_free);
DATA_MSGLINE(msgline, *ml_item);
msgline->which_cmds = CMD_UNSET;
@ -4021,6 +4024,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
DATE_ZERO(&(msgline->broken));
DATE_ZERO(&(msgline->processed));
msgline->msg = strdup(buf);
msgline->msgsiz = siz;
msgline->seqentryflags = seqentryflags;
cmdptr = strdup(buf);
@ -4133,7 +4137,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
// LOGERR of buf could be truncated
*(end++) = '\0';
K_WLOCK(transfer_free);
t_item = k_unlink_head(transfer_free);
t_item = k_unlink_head_zero(transfer_free);
K_WUNLOCK(transfer_free);
DATA_TRANSFER(transfer, t_item);
STRNCPY(transfer->name, next);
@ -4246,6 +4250,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
if (!intrans) {
transfer->intransient = NULL;
if (siz >= sizeof(transfer->svalue)) {
transfer->msiz = siz+1;
ram2 += siz+1;
transfer->mvalue = malloc(siz+1);
STRNCPYSIZ(transfer->mvalue, next,
siz+1);
@ -4291,7 +4297,7 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
*(eq++) = '\0';
K_WLOCK(transfer_free);
t_item = k_unlink_head(transfer_free);
t_item = k_unlink_head_zero(transfer_free);
K_WUNLOCK(transfer_free);
DATA_TRANSFER(transfer, t_item);
STRNCPY(transfer->name, data);
@ -4311,6 +4317,8 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
if (!intrans) {
transfer->intransient = NULL;
if (siz > sizeof(transfer->svalue)) {
ram2 += siz;
transfer->msiz = siz;
transfer->mvalue = malloc(siz);
STRNCPYSIZ(transfer->mvalue, eq, siz);
} else {
@ -4321,8 +4329,9 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
// Discard duplicates
if (find_in_ktree_nolock(msgline->trf_root, t_item, ctx)) {
if (transfer->mvalue != transfer->svalue)
FREENULL(transfer->mvalue);
if (transfer->msiz)
ram2 -= transfer->msiz;
free_transfer_data(transfer);
K_WLOCK(transfer_free);
k_add_head(transfer_free, t_item);
K_WUNLOCK(transfer_free);
@ -4332,7 +4341,15 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
add_to_ktree_nolock(msgline->trf_root, t_item);
k_add_head_nolock(msgline->trf_store, t_item);
}
t_item = NULL;
}
}
if (ram2) {
K_WLOCK(transfer_free);
transfer_free->ram += ram2;
K_WUNLOCK(transfer_free);
ram2 = 0;
}
seqall = find_transfer(msgline->trf_root, SEQALL);
@ -4394,9 +4411,17 @@ static enum cmd_values breakdown(K_ITEM **ml_item, char *buf, tv_t *now,
return ckdb_cmds[msgline->which_cmds].cmd_val;
nogood:
if (t_item) {
if (t_item || ram2) {
K_WLOCK(transfer_free);
if (t_item) {
DATA_TRANSFER(transfer, t_item);
if (transfer->msiz)
ram2 -= transfer->msiz;
free_transfer_data(transfer);
k_add_head(transfer_free, t_item);
}
if (ram2)
transfer_free->ram += ram2;
K_WUNLOCK(transfer_free);
}
free(cmdptr);
@ -6012,6 +6037,7 @@ static void process_sockd(PGconn *conn, K_ITEM *wq_item, enum reply_type reply_t
free_msgline_data(ml_item, true, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);
@ -6585,8 +6611,11 @@ static void *process_socket(__maybe_unused void *arg)
WORKQUEUE *wq;
DATA_WORKQUEUE(wq, wq2_item);
K_ITEM *ml_item = wq->msgline_item;
MSGLINE *ml;
DATA_MSGLINE(ml, ml_item);
free_msgline_data(ml_item, true, false);
K_WLOCK(msgline_free);
msgline_free->ram -= ml->msgsiz;
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);
K_WLOCK(workqueue_free);
@ -6625,8 +6654,11 @@ skippy:
dec_sockd = false;
if (bq->ml_item) {
MSGLINE *ml;
DATA_MSGLINE(ml, bq->ml_item);
free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
msgline_free->ram -= ml->msgsiz;
k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
bq->ml_item = NULL;
@ -6974,8 +7006,10 @@ static void process_reload_item(PGconn *conn, K_ITEM *bq_item)
}
if (bq->ml_item) {
DATA_MSGLINE(msgline, bq->ml_item);
free_msgline_data(bq->ml_item, true, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, bq->ml_item);
K_WUNLOCK(msgline_free);
bq->ml_item = NULL;
@ -7609,6 +7643,7 @@ static void process_queued(PGconn *conn, K_ITEM *wq_item)
free_msgline_data(ml_item, true, true);
K_WLOCK(msgline_free);
msgline_free->ram -= msgline->msgsiz;
k_add_head(msgline_free, ml_item);
K_WUNLOCK(msgline_free);

5
src/ckdb.h

@ -58,7 +58,7 @@
#define DB_VLOCK "1"
#define DB_VERSION "1.0.7"
#define CKDB_VERSION DB_VERSION"-2.506"
#define CKDB_VERSION DB_VERSION"-2.507"
#define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1349,6 +1349,7 @@ typedef struct msgline {
char id[ID_SIZ+1];
char cmd[CMD_SIZ+1];
char *msg;
size_t msgsiz;
bool hasseq;
char *seqcmdnam;
uint64_t n_seqall;
@ -1549,6 +1550,7 @@ typedef struct transfer {
char name[NAME_SIZE+1];
char svalue[VALUE_SIZE+1];
char *mvalue;
size_t msiz;
INTRANSIENT *intransient;
} TRANSFER;
@ -3215,6 +3217,7 @@ extern void sequence_report(bool lock);
// Data free functions (first)
#define FREE_ITEM(item) do { } while(0)
// TODO: make a macro for all other to use above macro
extern void free_transfer_data(TRANSFER *transfer);
extern void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull);
extern void free_users_data(K_ITEM *item);
extern void free_workinfo_data(K_ITEM *item);

22
src/ckdb_data.c

@ -12,11 +12,18 @@
// Data free functions (added here as needed)
void free_transfer_data(TRANSFER *transfer)
{
if (transfer->msiz)
FREENULL(transfer->mvalue);
}
void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull)
{
K_ITEM *t_item = NULL;
TRANSFER *transfer;
MSGLINE *msgline;
uint64_t ram2 = 0;
DATA_MSGLINE(msgline, item);
if (msgline->trf_root)
@ -25,17 +32,13 @@ void free_msgline_data(K_ITEM *item, bool t_lock, bool t_cull)
t_item = STORE_HEAD_NOLOCK(msgline->trf_store);
while (t_item) {
DATA_TRANSFER(transfer, t_item);
if (transfer->mvalue != transfer->svalue) {
if (transfer->intransient) {
transfer->intransient = NULL;
transfer->mvalue = NULL;
} else
FREENULL(transfer->mvalue);
}
ram2 += transfer->msiz;
free_transfer_data(transfer);
t_item = t_item->next;
}
if (t_lock)
K_WLOCK(transfer_free);
transfer_free->ram -= ram2;
k_list_transfer_to_head(msgline->trf_store, transfer_free);
if (t_cull) {
if (transfer_free->count == transfer_free->total &&
@ -853,9 +856,8 @@ void dsp_transfer(K_ITEM *item, FILE *stream)
fprintf(stream, "%s() called with (null) item\n", __func__);
else {
DATA_TRANSFER(t, item);
fprintf(stream, " name='%s' mvalue='%s' malloc=%c\n",
t->name, t->mvalue,
(t->svalue == t->mvalue) ? 'N' : 'Y');
fprintf(stream, " name='%s' mvalue='%s' malloc=%"PRIu64"\n",
t->name, t->mvalue, t->msiz);
}
}

2
src/klist.h

@ -152,7 +152,7 @@ typedef struct k_list {
void **data_memory; // allocated item data memory buffers
void (*dsp_func)(K_ITEM *, FILE *); // optional data display to a file
int cull_count;
int ram; // ram allocated for data pointers - code must manage it
uint64_t ram; // ram allocated for data pointers - code must manage it
int stores; // how many stores it currently has
#if LOCK_CHECK
// Since each thread has it's own k_lock no locking is required on this

Loading…
Cancel
Save