From 333d05fb3a29ebcaa548afaa16bfac48268a678f Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 08:36:15 +1000 Subject: [PATCH 1/5] Create a json sending wrapper for ckdb and fix int64 usage in packed json --- src/ckpool.c | 23 ++++++++++++++++++ src/ckpool.h | 3 +++ src/stratifier.c | 61 ++++++++++++++---------------------------------- 3 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 5015dbc8..df6b8f91 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -323,6 +323,29 @@ out: return buf; } +/* Send a json msg to ckdb with its idmsg and return the response, consuming + * the json */ +char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, + const char *file, const char *func, const int line) +{ + char *msg = NULL, *dump, *buf = NULL; + + dump = json_dumps(val, JSON_COMPACT); + if (unlikely(!dump)) { + LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line); + goto out; + } + ASPRINTF(&msg, "is.%s.json=%s", idmsg, dump); + free(dump); + LOGDEBUG("Sending ckdb: %s", msg); + buf = _send_recv_ckdb(ckp, msg, file, func, line); + LOGDEBUG("Received from ckdb: %s", buf); + free(msg); +out: + json_decref(val); + return buf; +} + json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) { diff --git a/src/ckpool.h b/src/ckpool.h index 3a857f37..58f75553 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -136,6 +136,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) +char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, + const char *file, const char *func, const int line); +#define json_ckdb_call(ckp, idmsg, val) _json_ckdb_call(ckp, idmsg, val, __FILE__, __func__, __LINE__) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); diff --git a/src/stratifier.c b/src/stratifier.c index 0e40c149..0190d0ab 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -401,13 +401,13 @@ static void purge_share_hashtable(int64_t wb_id) /* FIXME This message will be sent to the database once it's hooked in */ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) { - char *msg, *dump, *buf; char cdfield[64]; json_t *val; + char *buf; sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec); - val = json_pack("{ss,si,ss,ss,ss,ss,ss,ss,ss,ss,si,so,ss,ss,ss,ss}", + val = json_pack("{ss,sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}", "method", "workinfo", "workinfoid", wb->id, "poolinstance", ckp->name, @@ -424,17 +424,12 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - dump = json_dumps(val, 0); - json_decref(val); - ASPRINTF(&msg, "id.sharelog.json=%s", dump); - dealloc(dump); - buf = send_recv_ckdb(ckp, msg); + buf = json_ckdb_call(ckp, "sharelog", val); if (likely(buf)) { LOGWARNING("Got workinfo response: %s", buf); dealloc(buf); } else LOGWARNING("Got no workinfo response :("); - dealloc(msg); } static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) @@ -1192,10 +1187,10 @@ static user_instance_t *authorise_user(const char *workername) * and get parameters back */ static bool send_recv_auth(stratum_instance_t *client) { - char *msg, *dump, *buf; char cdfield[64]; bool ret = false; json_t *val; + char *buf; ts_t now; ts_realtime(&now); @@ -1212,25 +1207,21 @@ static bool send_recv_auth(stratum_instance_t *client) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - dump = json_dumps(val, 0); - json_decref(val); - ASPRINTF(&msg, "id.authorise.json=%s", dump); - dealloc(dump); - buf = send_recv_ckdb(client->ckp, msg); + buf = json_ckdb_call(client->ckp, "authorise", val); if (likely(buf)) { char *secondaryuserid, *response = alloca(128); - LOGWARNING("Got auth response: %s", buf); - sscanf(buf, "id.%*d.%s", response); + sscanf(buf, "is.%*d.%s", response); secondaryuserid = response; strsep(&secondaryuserid, "."); + LOGWARNING("Got auth response: %s response: %s suid: %s", buf, + response, secondaryuserid); if (!strcmp(response, "added")) { client->secondaryuserid = strdup(secondaryuserid); ret = true; } } else LOGWARNING("Got no auth response :("); - dealloc(msg); return ret; } @@ -1274,7 +1265,7 @@ static void stratum_send_diff(stratum_instance_t *client) { json_t *json_msg; - json_msg = json_pack("{s[i]soss}", "params", client->diff, "id", json_null(), + json_msg = json_pack("{s[I]soss}", "params", client->diff, "id", json_null(), "method", "mining.set_difficulty"); stratum_add_send(json_msg, client->id); } @@ -1560,10 +1551,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, const char *user, *job_id, *nonce2, *ntime, *nonce; double diff, wdiff = 0, sdiff = -1; enum share_err err = SE_NONE; - char *msg, *dump, *buf; + char *fname, *s, *buf; char idstring[20]; uint32_t ntime32; - char *fname, *s; workbase_t *wb; uchar hash[32]; int64_t id; @@ -1751,21 +1741,15 @@ out_unlock: LOGERR("Failed to fwrite to %s", fname); } else LOGERR("Failed to fopen %s", fname); - /* FIXME : Send val json to database here */ - dump = json_dumps(val, 0); - json_decref(val); - ASPRINTF(&msg, "id.sharelog.json=%s", dump); - dealloc(dump); - buf = send_recv_ckdb(client->ckp, msg); + buf = json_ckdb_call(client->ckp, "sharelog", val); if (likely(buf)) { LOGWARNING("Got sharelog response: %s", buf); dealloc(buf); } else LOGWARNING("Got no sharelog response :("); - dealloc(msg); out: if (!share) { - val = json_pack("{ss,si,ss,ss,si,ss,ss,so,si,ss,ss,ss,ss}", + val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}", "method", "shareerror", "clientid", client->id, "secondaryuserid", client->secondaryuserid, @@ -1780,17 +1764,12 @@ out: "createcode", __func__, "createinet", "127.0.0.1"); /* FIXME : Send val json to database here */ - dump = json_dumps(val, 0); - json_decref(val); - ASPRINTF(&msg, "id.sharelog.json=%s", dump); - dealloc(dump); - buf = send_recv_ckdb(client->ckp, msg); + buf = json_ckdb_call(client->ckp, "sharelog", val); if (likely(buf)) { LOGWARNING("Got sharelog response: %s", buf); dealloc(buf); } else LOGWARNING("Got no sharelog response :("); - dealloc(msg); LOGINFO("Invalid share from client %d: %s", client->id, client->workername); } return json_boolean(result); @@ -2186,15 +2165,14 @@ static void *statsupdate(void *arg) const double nonces = 4294967296; double sps1, sps5, sps15, sps60; user_instance_t *instance, *tmp; - char *msg, *dump, *buf; int64_t pplns_shares; char fname[512] = {}; tv_t now, diff; + char *s, *buf; int users, i; ts_t ts_now; json_t *val; FILE *fp; - char *s; tv_time(&now); timersub(&now, &stats.start_time, &diff); @@ -2260,7 +2238,7 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - val = json_pack("{si,sf,sf,sf,sf}", + val = json_pack("{sI,sf,sf,sf,sf}", "Round shares", stats.round_shares, "SPS1m", sps1, "SPS5m", sps5, @@ -2304,7 +2282,7 @@ static void *statsupdate(void *arg) reward = 25 * instance->pplns_shares; reward /= pplns_shares; - val = json_pack("{si,si,sf,ss,ss,ss,ss,ss,ss}", + val = json_pack("{sI,sI,sf,ss,ss,ss,ss,ss,ss}", "Accepted", instance->diff_accepted, "Rejected", instance->diff_rejected, "Est reward", reward, @@ -2348,17 +2326,12 @@ static void *statsupdate(void *arg) "createby", "code", "createcode", __func__, "createinet", "127.0.0.1"); - dump = json_dumps(val, 0); - json_decref(val); - ASPRINTF(&msg, "id.stats.json=%s", dump); - dealloc(dump); - buf = send_recv_ckdb(ckp, msg); + buf = json_ckdb_call(ckp, "stats", val); if (likely(buf)) { LOGWARNING("Got stats response: %s", buf); dealloc(buf); } else LOGWARNING("Got no stats response :("); - dealloc(msg); /* Update stats 4 times per minute for smooth values, displaying * status every minute. */ From 0ce603c9e57b9c486158993f11bfa545abfa97c4 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 09:45:35 +1000 Subject: [PATCH 2/5] Change message to ckdb to id. --- src/ckpool.c | 2 +- src/stratifier.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index df6b8f91..47142152 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -335,7 +335,7 @@ char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line); goto out; } - ASPRINTF(&msg, "is.%s.json=%s", idmsg, dump); + ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump); free(dump); LOGDEBUG("Sending ckdb: %s", msg); buf = _send_recv_ckdb(ckp, msg, file, func, line); diff --git a/src/stratifier.c b/src/stratifier.c index 0190d0ab..9dcd7cc4 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -1211,7 +1211,7 @@ static bool send_recv_auth(stratum_instance_t *client) if (likely(buf)) { char *secondaryuserid, *response = alloca(128); - sscanf(buf, "is.%*d.%s", response); + sscanf(buf, "id.%*d.%s", response); secondaryuserid = response; strsep(&secondaryuserid, "."); LOGWARNING("Got auth response: %s response: %s suid: %s", buf, From 7d16041dd1623fee0deab6dabd8ad34c65cc976c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 10:07:01 +1000 Subject: [PATCH 3/5] Remove any concept of round shares or pplns from stratifier --- src/stratifier.c | 149 ++--------------------------------------------- 1 file changed, 5 insertions(+), 144 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 9dcd7cc4..0406c94d 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -61,8 +61,6 @@ struct pool_stats { int64_t accounted_diff_shares; int64_t unaccounted_rejects; int64_t accounted_rejects; - int64_t pplns_shares; - int64_t round_shares; /* Diff shares per second for 1/5/15... minute rolling averages */ double dsps1; @@ -200,7 +198,6 @@ struct user_instance { int64_t diff_accepted; int64_t diff_rejected; - uint64_t pplns_shares; tv_t last_share; @@ -861,25 +858,6 @@ static void drop_client(int id) ck_uilock(&instance_lock); } -/* FIXME: Talk to database instead of doing this */ -static void log_pplns(const char *logdir, user_instance_t *instance) -{ - char fnametmp[512] = {}, fname[512] = {}; - FILE *fp; - - snprintf(fnametmp, 511, "%s/%stmp.pplns", logdir, instance->username); - fp = fopen(fnametmp, "w"); - if (unlikely(!fp)) { - LOGERR("Failed to fopen %s", fnametmp); - return; - } - fprintf(fp, "%lu,%ld,%ld", instance->pplns_shares, instance->diff_accepted, instance->diff_rejected); - fclose(fp); - snprintf(fname, 511, "%s/%s.pplns", logdir, instance->username); - if (rename(fnametmp, fname)) - LOGERR("Failed to rename %s to %s", fnametmp, fname); -} - static void stratum_broadcast_message(const char *msg) { json_t *json_msg; @@ -889,57 +867,14 @@ static void stratum_broadcast_message(const char *msg) stratum_broadcast(json_msg); } -/* FIXME: This is all a simple workaround till we use a proper database. */ +/* FIXME: Speak to database here. */ static void block_solve(ckpool_t *ckp) { - double round, total = 0, retain = 0, window; - user_instance_t *instance, *tmp; char *msg; - ck_rlock(&workbase_lock); - window = current_workbase->network_diff; - ck_runlock(&workbase_lock); - - LOGWARNING("Block solve user summary"); - - mutex_lock(&stats_lock); - total = stats.pplns_shares; - round = stats.round_shares; - mutex_unlock(&stats_lock); - - if (unlikely(total == 0.0)) - total = 1; - - ck_rlock(&instance_lock); - /* What proportion of shares should each user retain */ - if (total > window) - retain = (total - window) / total; - HASH_ITER(hh, user_instances, instance, tmp) { - double residual, shares, percentage; - - shares = instance->pplns_shares; - if (!shares) - continue; - residual = shares * retain; - percentage = shares / total * 100; - LOGWARNING("User %s: Reward: %f %% Credited: %.0f Remaining: %.0f", - instance->username, percentage, shares, residual); - instance->pplns_shares = residual; - instance->diff_accepted = instance->diff_rejected = 0; - log_pplns(ckp->logdir, instance); - } - ck_runlock(&instance_lock); - - LOGWARNING("Round shares: %.0f Total pplns: %.0f pplns window %.0f", round, total, window); - - ASPRINTF(&msg, "Block solved by %s after %.0f shares!", ckp->name, round); + ASPRINTF(&msg, "Block solved by %s!", ckp->name); stratum_broadcast_message(msg); free(msg); - - mutex_lock(&stats_lock); - stats.round_shares = 0; - stats.pplns_shares *= retain; - mutex_unlock(&stats_lock); } static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) @@ -1298,7 +1233,6 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in bool valid) { double tdiff, bdiff, dsps, drr, network_diff, bias; - ckpool_t *ckp = client->ckp; int64_t next_blockid, optimal; tv_t now_t; @@ -1322,12 +1256,6 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in decay_time(&instance->dsps360, diff, tdiff, 21600); decay_time(&instance->dsps1440, diff, tdiff, 86400); copy_tv(&instance->last_share, &now_t); - - /* Write the share summary to a tmp file first and then move - * it to the user file to prevent leaving us without a file - * if we abort at just the wrong time. */ - instance->pplns_shares += diff; - log_pplns(ckp->logdir, instance); } else instance->diff_rejected += diff; @@ -2165,7 +2093,6 @@ static void *statsupdate(void *arg) const double nonces = 4294967296; double sps1, sps5, sps15, sps60; user_instance_t *instance, *tmp; - int64_t pplns_shares; char fname[512] = {}; tv_t now, diff; char *s, *buf; @@ -2211,8 +2138,6 @@ static void *statsupdate(void *arg) if (unlikely(!fp)) LOGERR("Failed to fopen %s", fname); - pplns_shares = stats.pplns_shares + 1; - val = json_pack("{si,si,si,si,si}", "runtime", diff.tv_sec, "Live clients", stats.live_clients, @@ -2238,8 +2163,7 @@ static void *statsupdate(void *arg) fprintf(fp, "%s\n", s); dealloc(s); - val = json_pack("{sI,sf,sf,sf,sf}", - "Round shares", stats.round_shares, + val = json_pack("{sf,sf,sf,sf}", "SPS1m", sps1, "SPS5m", sps5, "SPS15m", sps15, @@ -2254,8 +2178,8 @@ static void *statsupdate(void *arg) ck_rlock(&instance_lock); users = HASH_COUNT(user_instances); HASH_ITER(hh, user_instances, instance, tmp) { - double reward, ghs; bool idle = false; + double ghs; if (now.tv_sec - instance->last_share.tv_sec > 60) { idle = true; @@ -2280,12 +2204,9 @@ static void *statsupdate(void *arg) ghs = instance->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); - reward = 25 * instance->pplns_shares; - reward /= pplns_shares; - val = json_pack("{sI,sI,sf,ss,ss,ss,ss,ss,ss}", + val = json_pack("{sI,sI,ss,ss,ss,ss,ss,ss}", "Accepted", instance->diff_accepted, "Rejected", instance->diff_rejected, - "Est reward", reward, "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate15m", suffix15, @@ -2342,8 +2263,6 @@ static void *statsupdate(void *arg) mutex_lock(&stats_lock); stats.accounted_shares += stats.unaccounted_shares; stats.accounted_diff_shares += stats.unaccounted_diff_shares; - stats.round_shares += stats.unaccounted_diff_shares; - stats.pplns_shares += stats.unaccounted_diff_shares; stats.accounted_rejects += stats.unaccounted_rejects; decay_time(&stats.sps1, stats.unaccounted_shares, 15, 60); @@ -2368,62 +2287,6 @@ static void *statsupdate(void *arg) return NULL; } -static void load_users(ckpool_t *ckp) -{ - struct dirent *ep; - DIR *dp; - - dp = opendir(ckp->logdir); - if (!dp) - quit(1, "Failed to open logdir %s!", ckp->logdir); - while ((ep = readdir(dp))) { - int64_t diff_accepted, diff_rejected = 0; - user_instance_t *instance; - uint64_t pplns_shares; - char fname[512] = {}; - char *period; - int results; - FILE *fp; - - if (strlen(ep->d_name) < 7) - continue; - if (!strstr(ep->d_name, ".pplns")) - continue; - - snprintf(fname, 511, "%s%s", ckp->logdir, ep->d_name); - fp = fopen(fname, "r"); - if (!fp) { - LOGERR("Failed to open pplns logfile %s!", fname); - continue; - } - results = fscanf(fp, "%lu,%ld,%ld", &pplns_shares, &diff_accepted, &diff_rejected); - if (results < 1) - continue; - if (results == 1) - diff_accepted = pplns_shares; - if (!pplns_shares) - continue; - - /* Create a new user instance */ - instance = ckzalloc(sizeof(user_instance_t)); - strncpy(instance->username, ep->d_name, 127); - period = strstr(instance->username, "."); - *period = '\0'; - instance->pplns_shares = pplns_shares; - stats.pplns_shares += pplns_shares; - instance->diff_accepted = diff_accepted; - instance->diff_rejected = diff_rejected; - stats.round_shares += diff_accepted; - - ck_wlock(&instance_lock); - HASH_ADD_STR(user_instances, username, instance); - ck_wunlock(&instance_lock); - - LOGDEBUG("Added user %s with %lu shares", instance->username, pplns_shares); - } - closedir(dp); -} - int stratifier(proc_instance_t *pi) { pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; @@ -2478,8 +2341,6 @@ int stratifier(proc_instance_t *pi) cklock_init(&share_lock); - load_users(ckp); - ret = stratum_loop(ckp, pi); out: return process_exit(ckp, pi, ret); From 1af6eda2b11a9d7352852e74f6ea7f1ebde7b84d Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 10:16:32 +1000 Subject: [PATCH 4/5] Remove all concept of accepted/rejected shares per user from stratifier --- src/stratifier.c | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/src/stratifier.c b/src/stratifier.c index 0406c94d..3f6a47c3 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -196,9 +196,6 @@ struct user_instance { char username[128]; int id; - int64_t diff_accepted; - int64_t diff_rejected; - tv_t last_share; double dsps1; @@ -231,8 +228,6 @@ struct stratum_instance { int ssdc; /* Shares since diff change */ tv_t first_share; tv_t last_share; - int64_t absolute_shares; - int64_t diff_shares; bool authorised; @@ -1243,21 +1238,16 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in network_diff = current_workbase->network_diff; ck_runlock(&workbase_lock); - if (valid) { - tdiff = sane_tdiff(&now_t, &instance->last_share); - if (unlikely(!client->absolute_shares++)) - tv_time(&client->first_share); - client->diff_shares += diff; - instance->diff_accepted += diff; - decay_time(&instance->dsps1, diff, tdiff, 60); - decay_time(&instance->dsps5, diff, tdiff, 300); - decay_time(&instance->dsps15, diff, tdiff, 900); - decay_time(&instance->dsps60, diff, tdiff, 3600); - decay_time(&instance->dsps360, diff, tdiff, 21600); - decay_time(&instance->dsps1440, diff, tdiff, 86400); - copy_tv(&instance->last_share, &now_t); - } else - instance->diff_rejected += diff; + tdiff = sane_tdiff(&now_t, &instance->last_share); + if (unlikely(!client->first_share.tv_sec)) + copy_tv(&client->first_share, &now_t); + decay_time(&instance->dsps1, diff, tdiff, 60); + decay_time(&instance->dsps5, diff, tdiff, 300); + decay_time(&instance->dsps15, diff, tdiff, 900); + decay_time(&instance->dsps60, diff, tdiff, 3600); + decay_time(&instance->dsps360, diff, tdiff, 21600); + decay_time(&instance->dsps1440, diff, tdiff, 86400); + copy_tv(&instance->last_share, &now_t); tdiff = sane_tdiff(&now_t, &client->last_share); copy_tv(&client->last_share, &now_t); @@ -2204,9 +2194,7 @@ static void *statsupdate(void *arg) ghs = instance->dsps1440 * nonces; suffix_string(ghs, suffix1440, 16, 0); - val = json_pack("{sI,sI,ss,ss,ss,ss,ss,ss}", - "Accepted", instance->diff_accepted, - "Rejected", instance->diff_rejected, + val = json_pack("{ss,ss,ss,ss,ss,ss}", "hashrate1m", suffix1, "hashrate5m", suffix5, "hashrate15m", suffix15, From 1682d8e0f32bbba9c5144de5ad18d98c7c2d8d2c Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Jun 2014 10:45:28 +1000 Subject: [PATCH 5/5] Cope with empty transactions in json encoding --- src/stratifier.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stratifier.c b/src/stratifier.c index 3f6a47c3..6afad0b2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -516,7 +516,8 @@ static void update_base(ckpool_t *ckp) if (wb->transactions) { json_strdup(&wb->txn_data, val, "txn_data"); json_strdup(&wb->txn_hashes, val, "txn_hashes"); - } + } else + wb->txn_hashes = ckzalloc(1); json_intcpy(&wb->merkles, val, "merkles"); wb->merkle_array = json_array(); if (wb->merkles) {