diff --git a/src/ckpool.c b/src/ckpool.c index c2ca2348..ca07b3c2 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -81,6 +81,13 @@ void logmsg(int loglevel, const char *fmt, ...) { tm.tm_hour, tm.tm_min, tm.tm_sec); + if (loglevel <= LOG_WARNING) {\ + if (loglevel <= LOG_ERR && errno != 0) + fprintf(stderr, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno)); + else + fprintf(stderr, "%s %s\n", stamp, buf); + fflush(stderr); + } if (logfd) { char *msg; @@ -90,13 +97,6 @@ void logmsg(int loglevel, const char *fmt, ...) { ASPRINTF(&msg, "%s %s\n", stamp, buf); ckmsgq_add(global_ckp->logger, msg); } - if (loglevel <= LOG_WARNING) {\ - if (loglevel <= LOG_ERR && errno != 0) - fprintf(stderr, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno)); - else - fprintf(stderr, "%s %s\n", stamp, buf); - fflush(stderr); - } free(buf); } } diff --git a/src/stratifier.c b/src/stratifier.c index 28b1cb53..5d1b2cfb 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -364,6 +364,22 @@ struct stratifier_data { typedef struct stratifier_data sdata_t; +typedef struct json_entry json_entry_t; + +struct json_entry { + json_entry_t *next; + json_entry_t *prev; + json_t *val; +}; + +typedef struct char_entry char_entry_t; + +struct char_entry { + char_entry_t *next; + char_entry_t *prev; + char *buf; +}; + /* Priority levels for generator messages */ #define GEN_LAX 0 #define GEN_NORMAL 1 @@ -697,7 +713,6 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) hex2bin(bin, sdata->lasthash, 32); swap_256(swap, bin); __bin2hex(sdata->lastswaphash, swap, 32); - LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); sdata->blockchange_id = wb->id; } if (*new_block && ckp->logshares) { @@ -726,8 +741,10 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) sdata->current_workbase = wb; ck_wunlock(&sdata->workbase_lock); - if (*new_block) + if (*new_block) { + LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); purge_share_hashtable(sdata, wb->id); + } send_workinfo(ckp, wb); @@ -896,7 +913,6 @@ static void update_base(ckpool_t *ckp, int prio) static void __add_dead(sdata_t *sdata, stratum_instance_t *client) { - LOGDEBUG("Adding dead instance %ld", client->id); DL_APPEND(sdata->dead_instances, client); sdata->stats.dead++; sdata->dead_generated++; @@ -904,14 +920,12 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client) static void __del_dead(sdata_t *sdata, stratum_instance_t *client) { - LOGDEBUG("Deleting dead instance %ld", client->id); DL_DELETE_INIT(sdata->dead_instances, client); sdata->stats.dead--; } static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) { - LOGDEBUG("Deleting disconnected instance %ld", client->id); HASH_DEL(sdata->disconnected_instances, client); sdata->stats.disconnected--; __add_dead(sdata, client); @@ -920,21 +934,29 @@ static void __del_disconnected(sdata_t *sdata, stratum_instance_t *client) static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; + int disconnects = 0, kills = 0; sdata_t *sdata = ckp->data; char buf[128]; ck_wlock(&sdata->instance_lock); HASH_ITER(hh, sdata->stratum_instances, client, tmp) { HASH_DEL(sdata->stratum_instances, client); + kills++; __add_dead(sdata, client); sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { + disconnects++; __del_disconnected(sdata, client); } sdata->stats.users = sdata->stats.workers = 0; ck_wunlock(&sdata->instance_lock); + + if (disconnects) + LOGNOTICE("Disconnected %d instances", disconnects); + if (kills) + LOGNOTICE("Dropped %d instances", kills); } static void update_subscribe(ckpool_t *ckp) @@ -1139,10 +1161,11 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id) return instance; } -static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance, - int64_t id) +/* Ret = 1 is disconnected, 2 is killed, 3 is workerless killed */ +static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance) { stratum_instance_t *old_client = NULL; + int ret; HASH_DEL(sdata->stratum_instances, client); if (instance) @@ -1150,36 +1173,58 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, user_insta HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { - LOGNOTICE("Client %ld %s disconnected %s", id, client->workername, - client->dropped ? "lazily" : ""); + ret = 1; HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); sdata->stats.disconnected++; sdata->disconnected_generated++; client->disconnected_time = time(NULL); } else { - if (client->workername) { - LOGNOTICE("Client %ld %s dropped %s", id, client->workername, - client->dropped ? "lazily" : ""); - } else - LOGINFO("Workerless client %ld dropped %s", id, client->dropped ? "lazily" : ""); + if (client->workername) + ret = 2; + else + ret = 3; __add_dead(sdata, client); } + return ret; +} + +static void client_drop_message(int64_t client_id, int dropped, bool lazily) +{ + switch(dropped) { + case 0: + break; + case 1: + LOGNOTICE("Client %ld disconnected %s", client_id, lazily ? "lazily" : ""); + break; + case 2: + LOGNOTICE("Client %ld dropped %s", client_id, lazily ? "lazily" : ""); + break; + case 3: + LOGNOTICE("Workerless client %ld dropped %s", client_id, lazily ? "lazily" : ""); + break; + } } /* Decrease the reference count of instance. */ static void _dec_instance_ref(sdata_t *sdata, stratum_instance_t *instance, const char *file, const char *func, const int line) { + int64_t client_id = instance->id; + int dropped = 0, ref; + ck_wlock(&sdata->instance_lock); - if (unlikely(--instance->ref < 0)) { - LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); - instance->ref = 0; - } + ref = --instance->ref; /* See if there are any instances that were dropped that could not be * moved due to holding a reference and drop them now. */ - if (unlikely(instance->dropped && !instance->ref)) - __drop_client(sdata, instance, instance->user_instance, instance->id); + if (unlikely(instance->dropped && !ref)) + dropped = __drop_client(sdata, instance, instance->user_instance); ck_wunlock(&sdata->instance_lock); + + client_drop_message(client_id, dropped, true); + + /* This should never happen */ + if (unlikely(ref < 0)) + LOGERR("Instance ref count dropped below zero from %s %s:%d", file, func, line); } #define dec_instance_ref(sdata, instance) _dec_instance_ref(sdata, instance, __FILE__, __func__, __LINE__) @@ -1196,7 +1241,6 @@ static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id, int instance->diff = instance->old_diff = ckp->startdiff; instance->ckp = ckp; tv_time(&instance->ldc); - LOGINFO("Stratifier added instance %ld server %d", id, server); HASH_ADD_I64(sdata->stratum_instances, id, instance); return instance; } @@ -1205,6 +1249,7 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio { stratum_instance_t *instance, *tmp; uint64_t enonce1_64 = 0, ret = 0; + int64_t old_id = 0; int slen; if (!sessionid) @@ -1233,12 +1278,15 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio if (instance) { /* Delete the entry once we are going to use it since there * will be a new instance with the enonce1_64 */ + old_id = instance->id; __del_disconnected(sdata, instance); ret = enonce1_64; } out_unlock: ck_wunlock(&sdata->instance_lock); out: + if (ret) + LOGNOTICE("Reconnecting old instance %ld to instance %ld", old_id, id); return ret; } @@ -1249,6 +1297,7 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val) { stratum_instance_t *instance, *tmp; ckmsg_t *bulk_send = NULL; + ckmsgq_t *ssends; if (unlikely(!val)) { LOGERR("Sent null json to stratum_broadcast"); @@ -1276,13 +1325,15 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val) if (!bulk_send) return; + ssends = sdata->ssends; + mutex_lock(sdata->ssends->lock); - if (sdata->ssends->msgs) - DL_CONCAT(sdata->ssends->msgs, bulk_send); + if (ssends->msgs) + DL_CONCAT(ssends->msgs, bulk_send); else - sdata->ssends->msgs = bulk_send; - pthread_cond_signal(sdata->ssends->cond); - mutex_unlock(sdata->ssends->lock); + ssends->msgs = bulk_send; + pthread_cond_signal(ssends->cond); + mutex_unlock(ssends->lock); } static void stratum_add_send(sdata_t *sdata, json_t *val, int64_t client_id) @@ -1319,6 +1370,7 @@ static void dec_worker(ckpool_t *ckp, user_instance_t *instance) static void drop_client(sdata_t *sdata, int64_t id) { + int dropped = 0, aged = 0, killed = 0; stratum_instance_t *client, *tmp; user_instance_t *instance = NULL; time_t now_t = time(NULL); @@ -1332,16 +1384,16 @@ static void drop_client(sdata_t *sdata, int64_t id) if (client) { instance = client->user_instance; if (client->authorised) { - client->authorised = false; dec = true; ckp = client->ckp; } /* If the client is still holding a reference, don't drop them * now but wait till the reference is dropped */ if (likely(!client->ref)) - __drop_client(sdata, client, instance, id); + dropped = __drop_client(sdata, client, instance); else client->dropped = true; + client->authorised = false; } /* Old disconnected instances will not have any valid shares so remove @@ -1352,7 +1404,7 @@ static void drop_client(sdata_t *sdata, int64_t id) continue; if (unlikely(client->ref)) continue; - LOGINFO("Ageing disconnected instance %ld to dead", client->id); + aged++; __del_disconnected(sdata, client); } @@ -1360,7 +1412,7 @@ static void drop_client(sdata_t *sdata, int64_t id) * counts for them. */ DL_FOREACH_SAFE(sdata->dead_instances, client, tmp) { if (!client->ref) { - LOGINFO("Stratifier discarding dead instance %ld", client->id); + killed++; __del_dead(sdata, client); dealloc(client->workername); dealloc(client->useragent); @@ -1369,6 +1421,12 @@ static void drop_client(sdata_t *sdata, int64_t id) } ck_wunlock(&sdata->instance_lock); + client_drop_message(id, dropped, false); + if (aged) + LOGINFO("Aged %d disconnected instances to dead", aged); + if (killed) + LOGINFO("Stratifier discarded %d dead instances", killed); + /* Decrease worker count outside of instance_lock to avoid recursive * locking */ if (dec) @@ -1986,6 +2044,7 @@ static double dsps_from_key(json_t *val, const char *key) return ret; } +/* Enter holding a reference count */ static void read_userstats(ckpool_t *ckp, user_instance_t *instance) { char s[512]; @@ -2025,6 +2084,7 @@ static void read_userstats(ckpool_t *ckp, user_instance_t *instance) json_decref(val); } +/* Enter holding a reference count */ static void read_workerstats(ckpool_t *ckp, worker_instance_t *worker) { char s[512]; @@ -2070,10 +2130,10 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, const char *workername) { char *base_username = strdupa(workername), *username; + bool new_instance = false, new_worker = false; sdata_t *sdata = ckp->data; user_instance_t *instance; stratum_instance_t *tmp; - bool new = false; int len; username = strsep(&base_username, "._"); @@ -2089,12 +2149,9 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, /* New user instance. Secondary user id will be NULL */ instance = ckzalloc(sizeof(user_instance_t)); strcpy(instance->username, username); - new = true; - + new_instance = true; instance->id = sdata->user_instance_id++; HASH_ADD_STR(sdata->user_instances, username, instance); - if (CKP_STANDALONE(ckp)) - read_userstats(ckp, instance); } DL_FOREACH(instance->instances, tmp) { if (!safecmp(workername, tmp->workername)) { @@ -2110,15 +2167,19 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, worker->workername = strdup(workername); worker->instance = instance; DL_APPEND(instance->worker_instances, worker); - if (CKP_STANDALONE(ckp)) - read_workerstats(ckp, worker); + new_worker = true; worker->start_time = time(NULL); client->worker_instance = worker; } DL_APPEND(instance->instances, client); ck_wunlock(&sdata->instance_lock); - if (new && !ckp->proxy) { + if (CKP_STANDALONE(ckp) && new_instance) + read_userstats(ckp, instance); + if (CKP_STANDALONE(ckp) && new_worker) + read_workerstats(ckp, client->worker_instance); + + if (new_instance && !ckp->proxy) { /* Is this a btc address based username? */ if (len > 26 && len < 35) instance->btcaddress = test_address(ckp, username); @@ -2140,6 +2201,7 @@ static int send_recv_auth(stratum_instance_t *client) ckpool_t *ckp = client->ckp; sdata_t *sdata = ckp->data; char *buf = NULL, *json_msg; + bool contended = false; char cdfield[64]; int ret = 1; json_t *val; @@ -2175,7 +2237,8 @@ static int send_recv_auth(stratum_instance_t *client) if (likely(!mutex_timedlock(&sdata->ckdb_lock, 3))) { buf = ckdb_msg_call(ckp, json_msg); mutex_unlock(&sdata->ckdb_lock); - } + } else + contended = true; free(json_msg); if (likely(buf)) { @@ -2219,7 +2282,10 @@ static int send_recv_auth(stratum_instance_t *client) json_decref(val); goto out; } - LOGWARNING("Got no auth response from ckdb :("); + if (contended) + LOGWARNING("Prolonged lock contention for ckdb while trying to authorise"); + else + LOGWARNING("Got no auth response from ckdb :("); out_fail: ret = -1; out: @@ -2958,20 +3024,20 @@ out: } /* Must enter with workbase_lock held */ -static json_t *__stratum_notify(sdata_t *sdata, bool clean) +static json_t *__stratum_notify(const workbase_t *wb, const bool clean) { json_t *val; JSON_CPACK(val, "{s:[ssssosssb],s:o,s:s}", "params", - sdata->current_workbase->idstring, - sdata->current_workbase->prevhash, - sdata->current_workbase->coinb1, - sdata->current_workbase->coinb2, - json_deep_copy(sdata->current_workbase->merkle_array), - sdata->current_workbase->bbversion, - sdata->current_workbase->nbit, - sdata->current_workbase->ntime, + wb->idstring, + wb->prevhash, + wb->coinb1, + wb->coinb2, + json_deep_copy(wb->merkle_array), + wb->bbversion, + wb->nbit, + wb->ntime, clean, "id", json_null(), "method", "mining.notify"); @@ -2983,7 +3049,7 @@ static void stratum_broadcast_update(sdata_t *sdata, bool clean) json_t *json_msg; ck_rlock(&sdata->workbase_lock); - json_msg = __stratum_notify(sdata, clean); + json_msg = __stratum_notify(sdata->current_workbase, clean); ck_runlock(&sdata->workbase_lock); stratum_broadcast(sdata, json_msg); @@ -2995,7 +3061,7 @@ static void stratum_send_update(sdata_t *sdata, int64_t client_id, bool clean) json_t *json_msg; ck_rlock(&sdata->workbase_lock); - json_msg = __stratum_notify(sdata, clean); + json_msg = __stratum_notify(sdata->current_workbase, clean); ck_runlock(&sdata->workbase_lock); stratum_add_send(sdata, json_msg, client_id); @@ -3264,6 +3330,7 @@ static void srecv_process(ckpool_t *ckp, char *buf) { sdata_t *sdata = ckp->data; stratum_instance_t *client; + bool added = false; smsg_t *msg; json_t *val; int server; @@ -3310,11 +3377,16 @@ static void srecv_process(ckpool_t *ckp, char *buf) ck_wlock(&sdata->instance_lock); client = __instance_by_id(sdata, msg->client_id); /* If client_id instance doesn't exist yet, create one */ - if (unlikely(!client)) + if (unlikely(!client)) { client = __stratum_add_instance(ckp, msg->client_id, server); + added = true; + } __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); + if (added) + LOGINFO("Stratifier added instance %ld server %d", client->id, server); + parse_instance_msg(sdata, msg, client); dec_instance_ref(sdata, client); out: @@ -3639,6 +3711,7 @@ out: * avoid floods of stat data coming at once. */ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) { + json_entry_t *json_list = NULL, *entry, *tmpentry; user_instance_t *user, *tmp; char cdfield[64]; time_t now_t; @@ -3671,8 +3744,8 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) continue; DL_FOREACH(user->worker_instances, worker) { double ghs1, ghs5, ghs60, ghs1440; - int elapsed; json_t *val; + int elapsed; /* Send one lot of stats once the worker is idle if * they have submitted no shares in the last 10 minutes @@ -3699,10 +3772,19 @@ static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) "createcode", __func__, "createinet", ckp->serverurl[0]); worker->notified_idle = worker->idle; - ckdbq_add(ckp, ID_WORKERSTATS, val); + entry = ckalloc(sizeof(json_entry_t)); + entry->val = val; + DL_APPEND(json_list, entry); } } ck_runlock(&sdata->instance_lock); + + /* Add all entries outside of the instance lock */ + DL_FOREACH_SAFE(json_list, entry, tmpentry) { + ckdbq_add(ckp, ID_WORKERSTATS, entry->val); + DL_DELETE(json_list, entry); + free(entry); + } } static void *statsupdate(void *arg) @@ -3724,6 +3806,7 @@ static void *statsupdate(void *arg) double tdiff, per_tdiff; char suffix1[16], suffix5[16], suffix15[16], suffix60[16], cdfield[64]; char suffix360[16], suffix1440[16], suffix10080[16]; + char_entry_t *char_list = NULL, *char_t, *chartmp_t; user_instance_t *instance, *tmpuser; stratum_instance_t *client, *tmp; double sps1, sps5, sps15, sps60; @@ -3852,14 +3935,24 @@ static void *statsupdate(void *arg) } s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); fprintf(fp, "%s\n", s); - if (!idle) - LOGNOTICE("User %s:%s", instance->username, s); + if (!idle) { + char_t = ckalloc(sizeof(char_entry_t)); + ASPRINTF(&char_t->buf, "User %s:%s", instance->username, s); + DL_APPEND(char_list, char_t); + } dealloc(s); json_decref(val); fclose(fp); } ck_runlock(&sdata->instance_lock); + DL_FOREACH_SAFE(char_list, char_t, chartmp_t) { + LOGNOTICE("%s", char_t->buf); + DL_DELETE(char_list, char_t); + free(char_t->buf); + dealloc(char_t); + } + ghs1 = stats->dsps1 * nonces; suffix_string(ghs1, suffix1, 16, 0); sps1 = stats->sps1;