kanoi 10 years ago
parent
commit
310595c478
  1. 14
      src/ckpool.c
  2. 203
      src/stratifier.c

14
src/ckpool.c

@ -81,6 +81,13 @@ void logmsg(int loglevel, const char *fmt, ...) {
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
if (loglevel <= LOG_WARNING) {\
if (loglevel <= LOG_ERR && errno != 0)
fprintf(stderr, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno));
else
fprintf(stderr, "%s %s\n", stamp, buf);
fflush(stderr);
}
if (logfd) {
char *msg;
@ -90,13 +97,6 @@ void logmsg(int loglevel, const char *fmt, ...) {
ASPRINTF(&msg, "%s %s\n", stamp, buf);
ckmsgq_add(global_ckp->logger, msg);
}
if (loglevel <= LOG_WARNING) {\
if (loglevel <= LOG_ERR && errno != 0)
fprintf(stderr, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno));
else
fprintf(stderr, "%s %s\n", stamp, buf);
fflush(stderr);
}
free(buf);
}
}

203
src/stratifier.c

@ -364,6 +364,22 @@ struct stratifier_data {
typedef struct stratifier_data sdata_t;
typedef struct json_entry json_entry_t;
struct json_entry {
json_entry_t *next;
json_entry_t *prev;
json_t *val;
};
typedef struct char_entry char_entry_t;
struct char_entry {
char_entry_t *next;
char_entry_t *prev;
char *buf;
};
/* Priority levels for generator messages */
#define GEN_LAX 0
#define GEN_NORMAL 1
@ -697,7 +713,6 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
hex2bin(bin, sdata->lasthash, 32);
swap_256(swap, bin);
__bin2hex(sdata->lastswaphash, swap, 32);
LOGNOTICE("Block hash changed to %s", sdata->lastswaphash);
sdata->blockchange_id = wb->id;
}
if (*new_block && ckp->logshares) {
@ -726,8 +741,10 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
sdata->current_workbase = wb;
ck_wunlock(&sdata->workbase_lock);
if (*new_block)
if (*new_block) {
LOGNOTICE("Block hash changed to %s", sdata->lastswaphash);
purge_share_hashtable(sdata, wb->id);
}
send_workinfo(ckp, wb);
@ -896,7 +913,6 @@ static void update_base(ckpool_t *ckp, int prio)
static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Adding dead instance %ld", client->id);
DL_APPEND(sdata->dead_instances, client);
sdata->stats.dead++;
sdata->dead_generated++;
@ -904,14 +920,12 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
static void __del_dead(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Deleting dead instance %ld", client->id);
DL_DELETE_INIT(sdata->dead_instances, client);
sdata->stats.dead--;
}
static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client)
{
LOGDEBUG("Deleting disconnected instance %ld", client->id);
HASH_DEL(sdata->disconnected_instances, client);
sdata->stats.disconnected--;
__add_dead(sdata, client);
@ -920,21 +934,29 @@ static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client)
static void drop_allclients(ckpool_t *ckp)
{
stratum_instance_t *client, *tmp;
int disconnects = 0, kills = 0;
sdata_t *sdata = ckp->data;
char buf[128];
ck_wlock(&sdata->instance_lock);
HASH_ITER(hh, sdata->stratum_instances, client, tmp) {
HASH_DEL(sdata->stratum_instances, client);
kills++;
__add_dead(sdata, client);
sprintf(buf, "dropclient=%ld", client->id);
send_proc(ckp->connector, buf);
}
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
disconnects++;
__del_disconnected(sdata, client);
}
sdata->stats.users = sdata->stats.workers = 0;
ck_wunlock(&sdata->instance_lock);
if (disconnects)
LOGNOTICE("Disconnected %d instances", disconnects);
if (kills)
LOGNOTICE("Dropped %d instances", kills);
}
static void update_subscribe(ckpool_t *ckp)
@ -1139,10 +1161,11 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id)
return instance;
}
static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance,
int64_t id)
/* Ret = 1 is disconnected, 2 is killed, 3 is workerless killed */
static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance)
{
stratum_instance_t *old_client = NULL;
int ret;
HASH_DEL(sdata->stratum_instances, client);
if (instance)
@ -1150,36 +1173,58 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
LOGNOTICE("Client %ld %s disconnected %s", id, client->workername,
client->dropped ? "lazily" : "");
ret = 1;
HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client);
sdata->stats.disconnected++;
sdata->disconnected_generated++;
client->disconnected_time = time(NULL);
} else {
if (client->workername) {
LOGNOTICE("Client %ld %s dropped %s", id, client->workername,
client->dropped ? "lazily" : "");
} else
LOGINFO("Workerless client %ld dropped %s", id, client->dropped ? "lazily" : "");
if (client->workername)
ret = 2;
else
ret = 3;
__add_dead(sdata, client);
}
return ret;
}
static void client_drop_message(int64_t client_id, int dropped, bool lazily)
{
switch(dropped) {
case 0:
break;
case 1:
LOGNOTICE("Client %ld disconnected %s", client_id, lazily ? "lazily" : "");
break;
case 2:
LOGNOTICE("Client %ld dropped %s", client_id, lazily ? "lazily" : "");
break;
case 3:
LOGNOTICE("Workerless client %ld dropped %s", client_id, lazily ? "lazily" : "");
break;
}
}
/* Decrease the reference count of instance. */
static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance, const char *file,
const char *func, const int line)
{
int64_t client_id = instance->id;
int dropped = 0, ref;
ck_wlock(&sdata->instance_lock);
if (unlikely(--instance->ref < 0)) {
LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line);
instance->ref = 0;
}
ref = --instance->ref;
/* See if there are any instances that were dropped that could not be
* moved due to holding a reference and drop them now. */
if (unlikely(instance->dropped && !instance->ref))
__drop_client(sdata, instance, instance->user_instance, instance->id);
if (unlikely(instance->dropped && !ref))
dropped = __drop_client(sdata, instance, instance->user_instance);
ck_wunlock(&sdata->instance_lock);
client_drop_message(client_id, dropped, true);
/* This should never happen */
if (unlikely(ref < 0))
LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line);
}
#define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__)
@ -1196,7 +1241,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int
instance->diff = instance->old_diff = ckp->startdiff;
instance->ckp = ckp;
tv_time(&instance->ldc);
LOGINFO("Stratifier added instance %ld server %d", id, server);
HASH_ADD_I64(sdata->stratum_instances, id, instance);
return instance;
}
@ -1205,6 +1249,7 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio
{
stratum_instance_t *instance, *tmp;
uint64_t enonce1_64 = 0, ret = 0;
int64_t old_id = 0;
int slen;
if (!sessionid)
@ -1233,12 +1278,15 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio
if (instance) {
/* Delete the entry once we are going to use it since there
* will be a new instance with the enonce1_64 */
old_id = instance->id;
__del_disconnected(sdata, instance);
ret = enonce1_64;
}
out_unlock:
ck_wunlock(&sdata->instance_lock);
out:
if (ret)
LOGNOTICE("Reconnecting old instance %ld to instance %ld", old_id, id);
return ret;
}
@ -1249,6 +1297,7 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val)
{
stratum_instance_t *instance, *tmp;
ckmsg_t *bulk_send = NULL;
ckmsgq_t *ssends;
if (unlikely(!val)) {
LOGERR("Sent null json to stratum_broadcast");
@ -1276,13 +1325,15 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val)
if (!bulk_send)
return;
ssends = sdata->ssends;
mutex_lock(sdata->ssends->lock);
if (sdata->ssends->msgs)
DL_CONCAT(sdata->ssends->msgs, bulk_send);
if (ssends->msgs)
DL_CONCAT(ssends->msgs, bulk_send);
else
sdata->ssends->msgs = bulk_send;
pthread_cond_signal(sdata->ssends->cond);
mutex_unlock(sdata->ssends->lock);
ssends->msgs = bulk_send;
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
}
static void stratum_add_send(sdata_t *sdata, json_t *val, int64_t client_id)
@ -1319,6 +1370,7 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance)
static void drop_client(sdata_t *sdata, int64_t id)
{
int dropped = 0, aged = 0, killed = 0;
stratum_instance_t *client, *tmp;
user_instance_t *instance = NULL;
time_t now_t = time(NULL);
@ -1332,16 +1384,16 @@ static void drop_client(sdata_t *sdata, int64_t id)
if (client) {
instance = client->user_instance;
if (client->authorised) {
client->authorised = false;
dec = true;
ckp = client->ckp;
}
/* If the client is still holding a reference, don't drop them
* now but wait till the reference is dropped */
if (likely(!client->ref))
__drop_client(sdata, client, instance, id);
dropped = __drop_client(sdata, client, instance);
else
client->dropped = true;
client->authorised = false;
}
/* Old disconnected instances will not have any valid shares so remove
@ -1352,7 +1404,7 @@ static void drop_client(sdata_t *sdata, int64_t id)
continue;
if (unlikely(client->ref))
continue;
LOGINFO("Ageing disconnected instance %ld to dead", client->id);
aged++;
__del_disconnected(sdata, client);
}
@ -1360,7 +1412,7 @@ static void drop_client(sdata_t *sdata, int64_t id)
* counts for them. */
DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) {
if (!client->ref) {
LOGINFO("Stratifier discarding dead instance %ld", client->id);
killed++;
__del_dead(sdata, client);
dealloc(client->workername);
dealloc(client->useragent);
@ -1369,6 +1421,12 @@ static void drop_client(sdata_t *sdata, int64_t id)
}
ck_wunlock(&sdata->instance_lock);
client_drop_message(id, dropped, false);
if (aged)
LOGINFO("Aged %d disconnected instances to dead", aged);
if (killed)
LOGINFO("Stratifier discarded %d dead instances", killed);
/* Decrease worker count outside of instance_lock to avoid recursive
* locking */
if (dec)
@ -1986,6 +2044,7 @@ static double dsps_from_key(json_t *val, const char *key)
return ret;
}
/* Enter holding a reference count */
static void read_userstats(ckpool_t *ckp, user_instance_t *instance)
{
char s[512];
@ -2025,6 +2084,7 @@ static void read_userstats(ckpool_t *ckp, user_instance_t *instance)
json_decref(val);
}
/* Enter holding a reference count */
static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker)
{
char s[512];
@ -2070,10 +2130,10 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
const char *workername)
{
char *base_username = strdupa(workername), *username;
bool new_instance = false, new_worker = false;
sdata_t *sdata = ckp->data;
user_instance_t *instance;
stratum_instance_t *tmp;
bool new = false;
int len;
username = strsep(&base_username, "._");
@ -2089,12 +2149,9 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
/* New user instance. Secondary user id will be NULL */
instance = ckzalloc(sizeof(user_instance_t));
strcpy(instance->username, username);
new = true;
new_instance = true;
instance->id = sdata->user_instance_id++;
HASH_ADD_STR(sdata->user_instances, username, instance);
if (CKP_STANDALONE(ckp))
read_userstats(ckp, instance);
}
DL_FOREACH(instance->instances, tmp) {
if (!safecmp(workername, tmp->workername)) {
@ -2110,15 +2167,19 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
worker->workername = strdup(workername);
worker->instance = instance;
DL_APPEND(instance->worker_instances, worker);
if (CKP_STANDALONE(ckp))
read_workerstats(ckp, worker);
new_worker = true;
worker->start_time = time(NULL);
client->worker_instance = worker;
}
DL_APPEND(instance->instances, client);
ck_wunlock(&sdata->instance_lock);
if (new && !ckp->proxy) {
if (CKP_STANDALONE(ckp) && new_instance)
read_userstats(ckp, instance);
if (CKP_STANDALONE(ckp) && new_worker)
read_workerstats(ckp, client->worker_instance);
if (new_instance && !ckp->proxy) {
/* Is this a btc address based username? */
if (len > 26 && len < 35)
instance->btcaddress = test_address(ckp, username);
@ -2140,6 +2201,7 @@ static int send_recv_auth(stratum_instance_t *client)
ckpool_t *ckp = client->ckp;
sdata_t *sdata = ckp->data;
char *buf = NULL, *json_msg;
bool contended = false;
char cdfield[64];
int ret = 1;
json_t *val;
@ -2175,7 +2237,8 @@ static int send_recv_auth(stratum_instance_t *client)
if (likely(!mutex_timedlock(&sdata->ckdb_lock, 3))) {
buf = ckdb_msg_call(ckp, json_msg);
mutex_unlock(&sdata->ckdb_lock);
}
} else
contended = true;
free(json_msg);
if (likely(buf)) {
@ -2219,6 +2282,9 @@ static int send_recv_auth(stratum_instance_t *client)
json_decref(val);
goto out;
}
if (contended)
LOGWARNING("Prolonged lock contention for ckdb while trying to authorise");
else
LOGWARNING("Got no auth response from ckdb :(");
out_fail:
ret = -1;
@ -2958,20 +3024,20 @@ out:
}
/* Must enter with workbase_lock held */
static json_t *__stratum_notify(sdata_t *sdata, bool clean)
static json_t *__stratum_notify(const workbase_t *wb, const bool clean)
{
json_t *val;
JSON_CPACK(val, "{s:[ssssosssb],s:o,s:s}",
"params",
sdata->current_workbase->idstring,
sdata->current_workbase->prevhash,
sdata->current_workbase->coinb1,
sdata->current_workbase->coinb2,
json_deep_copy(sdata->current_workbase->merkle_array),
sdata->current_workbase->bbversion,
sdata->current_workbase->nbit,
sdata->current_workbase->ntime,
wb->idstring,
wb->prevhash,
wb->coinb1,
wb->coinb2,
json_deep_copy(wb->merkle_array),
wb->bbversion,
wb->nbit,
wb->ntime,
clean,
"id", json_null(),
"method", "mining.notify");
@ -2983,7 +3049,7 @@ static void stratum_broadcast_update(sdata_t *sdata, bool clean)
json_t *json_msg;
ck_rlock(&sdata->workbase_lock);
json_msg = __stratum_notify(sdata, clean);
json_msg = __stratum_notify(sdata->current_workbase, clean);
ck_runlock(&sdata->workbase_lock);
stratum_broadcast(sdata, json_msg);
@ -2995,7 +3061,7 @@ static void stratum_send_update(sdata_t *sdata, int64_t client_id, bool clean)
json_t *json_msg;
ck_rlock(&sdata->workbase_lock);
json_msg = __stratum_notify(sdata, clean);
json_msg = __stratum_notify(sdata->current_workbase, clean);
ck_runlock(&sdata->workbase_lock);
stratum_add_send(sdata, json_msg, client_id);
@ -3264,6 +3330,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
{
sdata_t *sdata = ckp->data;
stratum_instance_t *client;
bool added = false;
smsg_t *msg;
json_t *val;
int server;
@ -3310,11 +3377,16 @@ static void srecv_process(ckpool_t *ckp, char *buf)
ck_wlock(&sdata->instance_lock);
client = __instance_by_id(sdata, msg->client_id);
/* If client_id instance doesn't exist yet, create one */
if (unlikely(!client))
if (unlikely(!client)) {
client = __stratum_add_instance(ckp, msg->client_id, server);
added = true;
}
__inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock);
if (added)
LOGINFO("Stratifier added instance %ld server %d", client->id, server);
parse_instance_msg(sdata, msg, client);
dec_instance_ref(sdata, client);
out:
@ -3639,6 +3711,7 @@ out:
* avoid floods of stat data coming at once. */
static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
{
json_entry_t *json_list = NULL, *entry, *tmpentry;
user_instance_t *user, *tmp;
char cdfield[64];
time_t now_t;
@ -3671,8 +3744,8 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
continue;
DL_FOREACH(user->worker_instances, worker) {
double ghs1, ghs5, ghs60, ghs1440;
int elapsed;
json_t *val;
int elapsed;
/* Send one lot of stats once the worker is idle if
* they have submitted no shares in the last 10 minutes
@ -3699,10 +3772,19 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata)
"createcode", __func__,
"createinet", ckp->serverurl[0]);
worker->notified_idle = worker->idle;
ckdbq_add(ckp, ID_WORKERSTATS, val);
entry = ckalloc(sizeof(json_entry_t));
entry->val = val;
DL_APPEND(json_list, entry);
}
}
ck_runlock(&sdata->instance_lock);
/* Add all entries outside of the instance lock */
DL_FOREACH_SAFE(json_list, entry, tmpentry) {
ckdbq_add(ckp, ID_WORKERSTATS, entry->val);
DL_DELETE(json_list, entry);
free(entry);
}
}
static void *statsupdate(void *arg)
@ -3724,6 +3806,7 @@ static void *statsupdate(void *arg)
double tdiff, per_tdiff;
char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64];
char suffix360[16], suffix1440[16], suffix10080[16];
char_entry_t *char_list = NULL, *char_t, *chartmp_t;
user_instance_t *instance, *tmpuser;
stratum_instance_t *client, *tmp;
double sps1, sps5, sps15, sps60;
@ -3852,14 +3935,24 @@ static void *statsupdate(void *arg)
}
s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
fprintf(fp, "%s\n", s);
if (!idle)
LOGNOTICE("User %s:%s", instance->username, s);
if (!idle) {
char_t = ckalloc(sizeof(char_entry_t));
ASPRINTF(&char_t->buf, "User %s:%s", instance->username, s);
DL_APPEND(char_list, char_t);
}
dealloc(s);
json_decref(val);
fclose(fp);
}
ck_runlock(&sdata->instance_lock);
DL_FOREACH_SAFE(char_list, char_t, chartmp_t) {
LOGNOTICE("%s", char_t->buf);
DL_DELETE(char_list, char_t);
free(char_t->buf);
dealloc(char_t);
}
ghs1 = stats->dsps1 * nonces;
suffix_string(ghs1, suffix1, 16, 0);
sps1 = stats->sps1;

Loading…
Cancel
Save