kanoi 11 years ago
parent
commit
cf2f27fabc
  1. 23
      src/ckpool.c
  2. 3
      src/ckpool.h
  3. 223
      src/stratifier.c

23
src/ckpool.c

@ -323,6 +323,29 @@ out:
return buf;
}
/* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
const char *file, const char *func, const int line)
{
char *msg = NULL, *dump, *buf = NULL;
dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) {
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d", file, func, line);
goto out;
}
ASPRINTF(&msg, "id.%s.json=%s", idmsg, dump);
free(dump);
LOGDEBUG("Sending ckdb: %s", msg);
buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf);
free(msg);
out:
json_decref(val);
return buf;
}
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{

3
src/ckpool.h

@ -136,6 +136,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);
#define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__)
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val,
const char *file, const char *func, const int line);
#define json_ckdb_call(ckp, idmsg, val) _json_ckdb_call(ckp, idmsg, val, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);

223
src/stratifier.c

@ -61,8 +61,6 @@ struct pool_stats {
int64_t accounted_diff_shares;
int64_t unaccounted_rejects;
int64_t accounted_rejects;
int64_t pplns_shares;
int64_t round_shares;
/* Diff shares per second for 1/5/15... minute rolling averages */
double dsps1;
@ -198,10 +196,6 @@ struct user_instance {
char username[128];
int id;
int64_t diff_accepted;
int64_t diff_rejected;
uint64_t pplns_shares;
tv_t last_share;
double dsps1;
@ -234,8 +228,6 @@ struct stratum_instance {
int ssdc; /* Shares since diff change */
tv_t first_share;
tv_t last_share;
int64_t absolute_shares;
int64_t diff_shares;
bool authorised;
@ -401,13 +393,13 @@ static void purge_share_hashtable(int64_t wb_id)
/* FIXME This message will be sent to the database once it's hooked in */
static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
{
char *msg, *dump, *buf;
char cdfield[64];
json_t *val;
char *buf;
sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec);
val = json_pack("{ss,si,ss,ss,ss,ss,ss,ss,ss,ss,si,so,ss,ss,ss,ss}",
val = json_pack("{ss,sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}",
"method", "workinfo",
"workinfoid", wb->id,
"poolinstance", ckp->name,
@ -424,17 +416,12 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
dump = json_dumps(val, 0);
json_decref(val);
ASPRINTF(&msg, "id.sharelog.json=%s", dump);
dealloc(dump);
buf = send_recv_ckdb(ckp, msg);
buf = json_ckdb_call(ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got workinfo response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no workinfo response :(");
dealloc(msg);
}
static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
@ -529,7 +516,8 @@ static void update_base(ckpool_t *ckp)
if (wb->transactions) {
json_strdup(&wb->txn_data, val, "txn_data");
json_strdup(&wb->txn_hashes, val, "txn_hashes");
}
} else
wb->txn_hashes = ckzalloc(1);
json_intcpy(&wb->merkles, val, "merkles");
wb->merkle_array = json_array();
if (wb->merkles) {
@ -866,25 +854,6 @@ static void drop_client(int id)
ck_uilock(&instance_lock);
}
/* FIXME: Talk to database instead of doing this */
static void log_pplns(const char *logdir, user_instance_t *instance)
{
char fnametmp[512] = {}, fname[512] = {};
FILE *fp;
snprintf(fnametmp, 511, "%s/%stmp.pplns", logdir, instance->username);
fp = fopen(fnametmp, "w");
if (unlikely(!fp)) {
LOGERR("Failed to fopen %s", fnametmp);
return;
}
fprintf(fp, "%lu,%ld,%ld", instance->pplns_shares, instance->diff_accepted, instance->diff_rejected);
fclose(fp);
snprintf(fname, 511, "%s/%s.pplns", logdir, instance->username);
if (rename(fnametmp, fname))
LOGERR("Failed to rename %s to %s", fnametmp, fname);
}
static void stratum_broadcast_message(const char *msg)
{
json_t *json_msg;
@ -894,57 +863,14 @@ static void stratum_broadcast_message(const char *msg)
stratum_broadcast(json_msg);
}
/* FIXME: This is all a simple workaround till we use a proper database. */
/* FIXME: Speak to database here. */
static void block_solve(ckpool_t *ckp)
{
double round, total = 0, retain = 0, window;
user_instance_t *instance, *tmp;
char *msg;
ck_rlock(&workbase_lock);
window = current_workbase->network_diff;
ck_runlock(&workbase_lock);
LOGWARNING("Block solve user summary");
mutex_lock(&stats_lock);
total = stats.pplns_shares;
round = stats.round_shares;
mutex_unlock(&stats_lock);
if (unlikely(total == 0.0))
total = 1;
ck_rlock(&instance_lock);
/* What proportion of shares should each user retain */
if (total > window)
retain = (total - window) / total;
HASH_ITER(hh, user_instances, instance, tmp) {
double residual, shares, percentage;
shares = instance->pplns_shares;
if (!shares)
continue;
residual = shares * retain;
percentage = shares / total * 100;
LOGWARNING("User %s: Reward: %f %% Credited: %.0f Remaining: %.0f",
instance->username, percentage, shares, residual);
instance->pplns_shares = residual;
instance->diff_accepted = instance->diff_rejected = 0;
log_pplns(ckp->logdir, instance);
}
ck_runlock(&instance_lock);
LOGWARNING("Round shares: %.0f Total pplns: %.0f pplns window %.0f", round, total, window);
ASPRINTF(&msg, "Block solved by %s after %.0f shares!", ckp->name, round);
ASPRINTF(&msg, "Block solved by %s!", ckp->name);
stratum_broadcast_message(msg);
free(msg);
mutex_lock(&stats_lock);
stats.round_shares = 0;
stats.pplns_shares *= retain;
mutex_unlock(&stats_lock);
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
@ -1192,10 +1118,10 @@ static user_instance_t *authorise_user(const char *workername)
* and get parameters back */
static bool send_recv_auth(stratum_instance_t *client)
{
char *msg, *dump, *buf;
char cdfield[64];
bool ret = false;
json_t *val;
char *buf;
ts_t now;
ts_realtime(&now);
@ -1212,25 +1138,21 @@ static bool send_recv_auth(stratum_instance_t *client)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
dump = json_dumps(val, 0);
json_decref(val);
ASPRINTF(&msg, "id.authorise.json=%s", dump);
dealloc(dump);
buf = send_recv_ckdb(client->ckp, msg);
buf = json_ckdb_call(client->ckp, "authorise", val);
if (likely(buf)) {
char *secondaryuserid, *response = alloca(128);
LOGWARNING("Got auth response: %s", buf);
sscanf(buf, "id.%*d.%s", response);
secondaryuserid = response;
strsep(&secondaryuserid, ".");
LOGWARNING("Got auth response: %s response: %s suid: %s", buf,
response, secondaryuserid);
if (!strcmp(response, "added")) {
client->secondaryuserid = strdup(secondaryuserid);
ret = true;
}
} else
LOGWARNING("Got no auth response :(");
dealloc(msg);
return ret;
}
@ -1274,7 +1196,7 @@ static void stratum_send_diff(stratum_instance_t *client)
{
json_t *json_msg;
json_msg = json_pack("{s[i]soss}", "params", client->diff, "id", json_null(),
json_msg = json_pack("{s[I]soss}", "params", client->diff, "id", json_null(),
"method", "mining.set_difficulty");
stratum_add_send(json_msg, client->id);
}
@ -1307,7 +1229,6 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in
bool valid)
{
double tdiff, bdiff, dsps, drr, network_diff, bias;
ckpool_t *ckp = client->ckp;
int64_t next_blockid, optimal;
tv_t now_t;
@ -1318,12 +1239,9 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in
network_diff = current_workbase->network_diff;
ck_runlock(&workbase_lock);
if (valid) {
tdiff = sane_tdiff(&now_t, &instance->last_share);
if (unlikely(!client->absolute_shares++))
tv_time(&client->first_share);
client->diff_shares += diff;
instance->diff_accepted += diff;
if (unlikely(!client->first_share.tv_sec))
copy_tv(&client->first_share, &now_t);
decay_time(&instance->dsps1, diff, tdiff, 60);
decay_time(&instance->dsps5, diff, tdiff, 300);
decay_time(&instance->dsps15, diff, tdiff, 900);
@ -1332,14 +1250,6 @@ static void add_submit(stratum_instance_t *client, user_instance_t *instance, in
decay_time(&instance->dsps1440, diff, tdiff, 86400);
copy_tv(&instance->last_share, &now_t);
/* Write the share summary to a tmp file first and then move
* it to the user file to prevent leaving us without a file
* if we abort at just the wrong time. */
instance->pplns_shares += diff;
log_pplns(ckp->logdir, instance);
} else
instance->diff_rejected += diff;
tdiff = sane_tdiff(&now_t, &client->last_share);
copy_tv(&client->last_share, &now_t);
client->ssdc++;
@ -1560,10 +1470,9 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
const char *user, *job_id, *nonce2, *ntime, *nonce;
double diff, wdiff = 0, sdiff = -1;
enum share_err err = SE_NONE;
char *msg, *dump, *buf;
char *fname, *s, *buf;
char idstring[20];
uint32_t ntime32;
char *fname, *s;
workbase_t *wb;
uchar hash[32];
int64_t id;
@ -1751,21 +1660,15 @@ out_unlock:
LOGERR("Failed to fwrite to %s", fname);
} else
LOGERR("Failed to fopen %s", fname);
/* FIXME : Send val json to database here */
dump = json_dumps(val, 0);
json_decref(val);
ASPRINTF(&msg, "id.sharelog.json=%s", dump);
dealloc(dump);
buf = send_recv_ckdb(client->ckp, msg);
buf = json_ckdb_call(client->ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got sharelog response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no sharelog response :(");
dealloc(msg);
out:
if (!share) {
val = json_pack("{ss,si,ss,ss,si,ss,ss,so,si,ss,ss,ss,ss}",
val = json_pack("{ss,si,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
"method", "shareerror",
"clientid", client->id,
"secondaryuserid", client->secondaryuserid,
@ -1780,17 +1683,12 @@ out:
"createcode", __func__,
"createinet", "127.0.0.1");
/* FIXME : Send val json to database here */
dump = json_dumps(val, 0);
json_decref(val);
ASPRINTF(&msg, "id.sharelog.json=%s", dump);
dealloc(dump);
buf = send_recv_ckdb(client->ckp, msg);
buf = json_ckdb_call(client->ckp, "sharelog", val);
if (likely(buf)) {
LOGWARNING("Got sharelog response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no sharelog response :(");
dealloc(msg);
LOGINFO("Invalid share from client %d: %s", client->id, client->workername);
}
return json_boolean(result);
@ -2186,15 +2084,13 @@ static void *statsupdate(void *arg)
const double nonces = 4294967296;
double sps1, sps5, sps15, sps60;
user_instance_t *instance, *tmp;
char *msg, *dump, *buf;
int64_t pplns_shares;
char fname[512] = {};
tv_t now, diff;
char *s, *buf;
int users, i;
ts_t ts_now;
json_t *val;
FILE *fp;
char *s;
tv_time(&now);
timersub(&now, &stats.start_time, &diff);
@ -2233,8 +2129,6 @@ static void *statsupdate(void *arg)
if (unlikely(!fp))
LOGERR("Failed to fopen %s", fname);
pplns_shares = stats.pplns_shares + 1;
val = json_pack("{si,si,si,si,si}",
"runtime", diff.tv_sec,
"Live clients", stats.live_clients,
@ -2260,8 +2154,7 @@ static void *statsupdate(void *arg)
fprintf(fp, "%s\n", s);
dealloc(s);
val = json_pack("{si,sf,sf,sf,sf}",
"Round shares", stats.round_shares,
val = json_pack("{sf,sf,sf,sf}",
"SPS1m", sps1,
"SPS5m", sps5,
"SPS15m", sps15,
@ -2276,8 +2169,8 @@ static void *statsupdate(void *arg)
ck_rlock(&instance_lock);
users = HASH_COUNT(user_instances);
HASH_ITER(hh, user_instances, instance, tmp) {
double reward, ghs;
bool idle = false;
double ghs;
if (now.tv_sec - instance->last_share.tv_sec > 60) {
idle = true;
@ -2302,12 +2195,7 @@ static void *statsupdate(void *arg)
ghs = instance->dsps1440 * nonces;
suffix_string(ghs, suffix1440, 16, 0);
reward = 25 * instance->pplns_shares;
reward /= pplns_shares;
val = json_pack("{si,si,sf,ss,ss,ss,ss,ss,ss}",
"Accepted", instance->diff_accepted,
"Rejected", instance->diff_rejected,
"Est reward", reward,
val = json_pack("{ss,ss,ss,ss,ss,ss}",
"hashrate1m", suffix1,
"hashrate5m", suffix5,
"hashrate15m", suffix15,
@ -2348,17 +2236,12 @@ static void *statsupdate(void *arg)
"createby", "code",
"createcode", __func__,
"createinet", "127.0.0.1");
dump = json_dumps(val, 0);
json_decref(val);
ASPRINTF(&msg, "id.stats.json=%s", dump);
dealloc(dump);
buf = send_recv_ckdb(ckp, msg);
buf = json_ckdb_call(ckp, "stats", val);
if (likely(buf)) {
LOGWARNING("Got stats response: %s", buf);
dealloc(buf);
} else
LOGWARNING("Got no stats response :(");
dealloc(msg);
/* Update stats 4 times per minute for smooth values, displaying
* status every minute. */
@ -2369,8 +2252,6 @@ static void *statsupdate(void *arg)
mutex_lock(&stats_lock);
stats.accounted_shares += stats.unaccounted_shares;
stats.accounted_diff_shares += stats.unaccounted_diff_shares;
stats.round_shares += stats.unaccounted_diff_shares;
stats.pplns_shares += stats.unaccounted_diff_shares;
stats.accounted_rejects += stats.unaccounted_rejects;
decay_time(&stats.sps1, stats.unaccounted_shares, 15, 60);
@ -2395,62 +2276,6 @@ static void *statsupdate(void *arg)
return NULL;
}
static void load_users(ckpool_t *ckp)
{
struct dirent *ep;
DIR *dp;
dp = opendir(ckp->logdir);
if (!dp)
quit(1, "Failed to open logdir %s!", ckp->logdir);
while ((ep = readdir(dp))) {
int64_t diff_accepted, diff_rejected = 0;
user_instance_t *instance;
uint64_t pplns_shares;
char fname[512] = {};
char *period;
int results;
FILE *fp;
if (strlen(ep->d_name) < 7)
continue;
if (!strstr(ep->d_name, ".pplns"))
continue;
snprintf(fname, 511, "%s%s", ckp->logdir, ep->d_name);
fp = fopen(fname, "r");
if (!fp) {
LOGERR("Failed to open pplns logfile %s!", fname);
continue;
}
results = fscanf(fp, "%lu,%ld,%ld", &pplns_shares, &diff_accepted, &diff_rejected);
if (results < 1)
continue;
if (results == 1)
diff_accepted = pplns_shares;
if (!pplns_shares)
continue;
/* Create a new user instance */
instance = ckzalloc(sizeof(user_instance_t));
strncpy(instance->username, ep->d_name, 127);
period = strstr(instance->username, ".");
*period = '\0';
instance->pplns_shares = pplns_shares;
stats.pplns_shares += pplns_shares;
instance->diff_accepted = diff_accepted;
instance->diff_rejected = diff_rejected;
stats.round_shares += diff_accepted;
ck_wlock(&instance_lock);
HASH_ADD_STR(user_instances, username, instance);
ck_wunlock(&instance_lock);
LOGDEBUG("Added user %s with %lu shares", instance->username, pplns_shares);
}
closedir(dp);
}
int stratifier(proc_instance_t *pi)
{
pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
@ -2505,8 +2330,6 @@ int stratifier(proc_instance_t *pi)
cklock_init(&share_lock);
load_users(ckp);
ret = stratum_loop(ckp, pi);
out:
return process_exit(ckp, pi, ret);

Loading…
Cancel
Save