kanoi 10 years ago
parent
commit
33e4af9a2d
  1. 2
      configure.ac
  2. 7
      src/ckpool.c
  3. 1
      src/ckpool.h
  4. 4
      src/jansson-2.6/src/jansson_private.h
  5. 7
      src/jansson-2.6/src/memory.c
  6. 8
      src/libckpool.h
  7. 151
      src/stratifier.c

2
configure.ac

@ -1,4 +1,4 @@
AC_INIT(ckpool, 0.8.2, kernel@kolivas.org)
AC_INIT(ckpool, 0.8.3, kernel@kolivas.org)
AC_CANONICAL_SYSTEM
AC_CONFIG_MACRO_DIR([m4])

7
src/ckpool.c

@ -183,6 +183,7 @@ void ckmsgq_add(ckmsgq_t *ckmsgq, void *data)
msg->data = data;
mutex_lock(ckmsgq->lock);
ckmsgq->messages++;
DL_APPEND(ckmsgq->msgs, msg);
pthread_cond_signal(ckmsgq->cond);
mutex_unlock(ckmsgq->lock);
@ -349,6 +350,12 @@ retry:
}
execv(ckp->initial_args[0], (char *const *)ckp->initial_args);
}
} else if (cmdmatch(buf, "stratifierstats")) {
char *msg;
LOGDEBUG("Listener received stratifierstats request");
msg = send_recv_proc(ckp->stratifier, "stats");
send_unix_msg(sockd, msg);
} else {
LOGINFO("Listener received unhandled message: %s", buf);
send_unix_msg(sockd, "unknown");

1
src/ckpool.h

@ -38,6 +38,7 @@ struct ckmsgq {
pthread_cond_t *cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
int64_t messages;
};
typedef struct ckmsgq ckmsgq_t;

4
src/jansson-2.6/src/jansson_private.h

@ -81,7 +81,9 @@ int jsonp_dtostr(char *buffer, size_t size, double value);
/* Wrappers for custom memory functions */
void* jsonp_malloc(size_t size);
void jsonp_free(void *ptr);
void _jsonp_free(void **ptr);
#define jsonp_free(ptr) _jsonp_free((void *)&(ptr))
char *jsonp_strndup(const char *str, size_t length);
char *jsonp_strdup(const char *str);
char *jsonp_eolstrdup(const char *str);

7
src/jansson-2.6/src/memory.c

@ -25,12 +25,13 @@ void *jsonp_malloc(size_t size)
return (*do_malloc)(size);
}
void jsonp_free(void *ptr)
void _jsonp_free(void **ptr)
{
if(!ptr)
if(!*ptr)
return;
(*do_free)(ptr);
(*do_free)(*ptr);
*ptr = NULL;
}
char *jsonp_strdup(const char *str)

8
src/libckpool.h

@ -370,6 +370,14 @@ static inline void _json_set_bool(json_t *val, const char *key, bool boolean,
}
#define json_set_bool(val, key, boolean) _json_set_bool(val, key, boolean, __FILE__, __func__, __LINE__)
static inline void _json_set_object(json_t *val, const char *key, json_t *object,
const char *file, const char *func, const int line)
{
if (unlikely(json_object_set_new_nocheck(val, key, object)))
LOGERR("Failed to set json object from %s %s:%d", file, func, line);
}
#define json_set_object(val, key, object) _json_set_object(val, key, object, __FILE__, __func__, __LINE__)
void rename_proc(const char *name);
void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg);
void join_pthread(pthread_t thread);

151
src/stratifier.c

@ -302,6 +302,7 @@ struct stratifier_data {
/* For the hashtable of all workbases */
workbase_t *workbases;
workbase_t *current_workbase;
int workbases_generated;
int64_t workbase_id;
int64_t blockchange_id;
@ -323,6 +324,10 @@ struct stratifier_data {
stratum_instance_t *disconnected_instances;
stratum_instance_t *dead_instances;
int stratum_generated;
int disconnected_generated;
int dead_generated;
user_instance_t *user_instances;
/* Protects both stratum and user instances */
@ -331,6 +336,8 @@ struct stratifier_data {
share_t *shares;
cklock_t share_lock;
int64_t shares_generated;
/* Linked list of block solves, added to during submission, removed on
* accept/reject. It is likely we only ever have one solve on here but
* you never know... */
@ -656,6 +663,7 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
* we set workbase_id from it. In server mode the stratifier is
* setting the workbase_id */
ck_wlock(&sdata->workbase_lock);
sdata->workbases_generated++;
if (!ckp->proxy)
wb->id = sdata->workbase_id++;
else
@ -867,6 +875,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
LOGDEBUG("Adding dead instance %ld", client->id);
LL_PREPEND(sdata->dead_instances, client);
sdata->stats.dead++;
sdata->dead_generated++;
}
static void __del_dead(sdata_t *sdata, stratum_instance_t *client)
@ -1117,17 +1126,18 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
LOGNOTICE("Disconnecting client %ld %s %s", id, client->workername,
LOGNOTICE("Client %ld %s disconnected %s", id, client->workername,
client->dropped ? "lazily" : "");
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
sdata->disconnected_generated++;
client->disconnected_time = time(NULL);
} else {
if (client->workername)
LOGNOTICE("Dropping client %ld %s %s", id, client->workername,
if (client->workername) {
LOGNOTICE("Client %ld %s dropped %s", id, client->workername,
client->dropped ? "lazily" : "");
else
LOGINFO("Dropping workerless client %ld %s", id, client->dropped ? "lazily" : "");
} else
LOGINFO("Workerless client %ld dropped %s", id, client->dropped ? "lazily" : "");
__add_dead(sdata, client);
}
}
@ -1156,6 +1166,7 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int
stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t));
sdata_t *sdata = ckp->data;
sdata->stratum_generated++;
instance->id = id;
instance->server = server;
instance->diff = instance->old_diff = ckp->startdiff;
@ -1501,6 +1512,90 @@ static void broadcast_ping(sdata_t *sdata)
stratum_broadcast(sdata, json_msg);
}
#define SAFE_HASH_OVERHEAD(HASHLIST) (HASHLIST ? HASH_OVERHEAD(hh, HASHLIST) : 0)
static void ckmsgq_stats(ckmsgq_t *ckmsgq, int size, json_t **val)
{
int objects, generated;
int64_t memsize;
ckmsg_t *msg;
mutex_lock(ckmsgq->lock);
DL_COUNT(ckmsgq->msgs, msg, objects);
generated = ckmsgq->messages;
mutex_unlock(ckmsgq->lock);
memsize = (sizeof(ckmsg_t) + size) * objects;
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(sdata_t *sdata)
{
json_t *val = json_object(), *subval;
int objects, generated;
int64_t memsize;
char *buf;
ck_rlock(&sdata->workbase_lock);
objects = HASH_COUNT(sdata->workbases);
memsize = SAFE_HASH_OVERHEAD(sdata->workbases) + sizeof(workbase_t) * objects;
generated = sdata->workbases_generated;
ck_runlock(&sdata->workbase_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "workbases", subval);
ck_rlock(&sdata->instance_lock);
objects = HASH_COUNT(sdata->user_instances);
memsize = SAFE_HASH_OVERHEAD(sdata->user_instances) + sizeof(stratum_instance_t) * objects;
JSON_CPACK(subval, "{si,si}", "count", objects, "memory", memsize);
json_set_object(val, "users", subval);
objects = HASH_COUNT(sdata->stratum_instances);
memsize = SAFE_HASH_OVERHEAD(sdata->stratum_instances);
generated = sdata->stratum_generated;
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "clients", subval);
objects = sdata->stats.disconnected;
generated = sdata->disconnected_generated;
memsize = sizeof(stratum_instance_t) * sdata->stats.disconnected;
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "disconnected", subval);
objects = sdata->stats.dead;
generated = sdata->dead_generated;
memsize = sizeof(stratum_instance_t) * sdata->stats.dead;
ck_runlock(&sdata->instance_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "dead", subval);
ck_rlock(&sdata->share_lock);
generated = sdata->shares_generated;
objects = HASH_COUNT(sdata->shares);
memsize = SAFE_HASH_OVERHEAD(sdata->shares) + sizeof(share_t) * objects;
ck_runlock(&sdata->share_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "shares", subval);
ckmsgq_stats(sdata->ssends, sizeof(smsg_t), &subval);
json_set_object(val, "ssends", subval);
/* Don't know exactly how big the string is so just count the pointer for now */
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "srecvs", subval);
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
buf = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
LOGNOTICE("Stratifier stats: %s", buf);
return buf;
}
static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{
int sockd, ret = 0, selret = 0;
@ -1566,6 +1661,15 @@ retry:
Close(sockd);
goto retry;
}
if (cmdmatch(buf, "stats")) {
char *msg;
LOGDEBUG("Stratifier received stats request");
msg = stratifier_stats(sdata);
send_unix_msg(sockd, msg);
Close(sockd);
goto retry;
}
Close(sockd);
LOGDEBUG("Stratifier received request: %s", buf);
@ -2532,6 +2636,7 @@ static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id)
share = ckzalloc(sizeof(share_t));
memcpy(share->hash, hash, 32);
share->workbase_id = wb_id;
sdata->shares_generated++;
HASH_ADD(hh, sdata->shares, hash, 32, share);
ret = true;
out_unlock:
@ -3497,8 +3602,8 @@ out:
dec_instance_ref(sdata, client);
}
/* Called every 20 seconds, we send the updated stats to ckdb of those users
* who have gone 10 minutes between updates. This ends up staggering stats to
/* Called 32 times per min, we send the updated stats to ckdb of those users
* who have gone 1 minute between updates. This ends up staggering stats to
* avoid floods of stat data coming at once. */
static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
{
@ -3815,10 +3920,10 @@ static void *statsupdate(void *arg)
"createinet", ckp->serverurl[0]);
ckdbq_add(ckp, ID_POOLSTATS, val);
/* Update stats 3 times per minute for smooth values, displaying
* status every minute. */
for (i = 0; i < 3; i++) {
cksleep_ms_r(&stats->last_update, 20000);
/* Update stats 32 times per minute to divide up userstats for
* ckdb, displaying status every minute. */
for (i = 0; i < 32; i++) {
cksleep_ms_r(&stats->last_update, 1875);
cksleep_prepare_r(&stats->last_update);
update_workerstats(ckp, sdata);
@ -3827,18 +3932,18 @@ static void *statsupdate(void *arg)
stats->accounted_diff_shares += stats->unaccounted_diff_shares;
stats->accounted_rejects += stats->unaccounted_rejects;
decay_time(&stats->sps1, stats->unaccounted_shares, 20, 60);
decay_time(&stats->sps5, stats->unaccounted_shares, 20, 300);
decay_time(&stats->sps15, stats->unaccounted_shares, 20, 900);
decay_time(&stats->sps60, stats->unaccounted_shares, 20, 3600);
decay_time(&stats->dsps1, stats->unaccounted_diff_shares, 20, 60);
decay_time(&stats->dsps5, stats->unaccounted_diff_shares, 20, 300);
decay_time(&stats->dsps15, stats->unaccounted_diff_shares, 20, 900);
decay_time(&stats->dsps60, stats->unaccounted_diff_shares, 20, 3600);
decay_time(&stats->dsps360, stats->unaccounted_diff_shares, 20, 21600);
decay_time(&stats->dsps1440, stats->unaccounted_diff_shares, 20, 86400);
decay_time(&stats->dsps10080, stats->unaccounted_diff_shares, 20, 604800);
decay_time(&stats->sps1, stats->unaccounted_shares, 1.875, 60);
decay_time(&stats->sps5, stats->unaccounted_shares, 1.875, 300);
decay_time(&stats->sps15, stats->unaccounted_shares, 1.875, 900);
decay_time(&stats->sps60, stats->unaccounted_shares, 1.875, 3600);
decay_time(&stats->dsps1, stats->unaccounted_diff_shares, 1.875, 60);
decay_time(&stats->dsps5, stats->unaccounted_diff_shares, 1.875, 300);
decay_time(&stats->dsps15, stats->unaccounted_diff_shares, 1.875, 900);
decay_time(&stats->dsps60, stats->unaccounted_diff_shares, 1.875, 3600);
decay_time(&stats->dsps360, stats->unaccounted_diff_shares, 1.875, 21600);
decay_time(&stats->dsps1440, stats->unaccounted_diff_shares, 1.875, 86400);
decay_time(&stats->dsps10080, stats->unaccounted_diff_shares, 1.875, 604800);
stats->unaccounted_shares =
stats->unaccounted_diff_shares =

Loading…
Cancel
Save