diff --git a/src/stratifier.c b/src/stratifier.c index d6279b58..4cfcfe29 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -28,14 +28,11 @@ #include "uthash.h" #include "utlist.h" +/* Consistent across all pool instances */ static const char *workpadding = "000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000"; - static const char *scriptsig_header = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff"; static uchar scriptsig_header_bin[41]; -static char pubkeytxnbin[25]; -static char donkeytxnbin[25]; - /* Add unaccounted shares when they arrive, remove them with each update of * rolling stats. */ struct pool_stats { @@ -76,21 +73,6 @@ struct pool_stats { typedef struct pool_stats pool_stats_t; -static pool_stats_t stats; - -/* Protects changes to pool stats */ -static pthread_mutex_t stats_lock; - -/* Serialises sends/receives to ckdb if possible */ -static pthread_mutex_t ckdb_lock; - -static union { - uint64_t u64; - uint32_t u32; - uint16_t u16; - uint8_t u8; -} enonce1u; - struct workbase { /* Hash table data */ UT_hash_handle hh; @@ -148,31 +130,6 @@ struct workbase { typedef struct workbase workbase_t; -/* For protecting the hashtable data */ -static cklock_t workbase_lock; - -/* For the hashtable of all workbases */ -static workbase_t *workbases; -static workbase_t *current_workbase; - -static struct { - double diff; - - char enonce1[32]; - uchar enonce1bin[16]; - int enonce1constlen; - int enonce1varlen; - - int nonce2len; - int enonce2varlen; - - bool subscribed; -} proxy_base; - -static int64_t workbase_id; -static int64_t blockchange_id; -static char lasthash[68], lastswaphash[68]; - struct json_params { json_t *method; json_t *params; @@ -192,15 +149,6 @@ struct smsg { typedef struct smsg smsg_t; -static ckmsgq_t *ssends; // Stratum sends -static ckmsgq_t *srecvs; // Stratum receives -static ckmsgq_t *ckdbq; // ckdb -static ckmsgq_t *sshareq; // Stratum share sends -static ckmsgq_t *sauthq; // Stratum authorisations -static ckmsgq_t *stxnq; // Transaction requests - -static int64_t user_instance_id; - struct user_instance; struct worker_instance; struct stratum_instance; @@ -232,8 +180,6 @@ struct user_instance { tv_t last_share; }; -static user_instance_t *user_instances; - /* Combined data from workers with the same workername */ struct worker_instance { user_instance_t *instance; @@ -305,14 +251,6 @@ struct stratum_instance { int64_t suggest_diff; /* Stratum client suggested diff */ }; -/* Stratum_instances hashlist is stored by id, whereas disconnected_instances - * is sorted by enonce1_64. */ -static stratum_instance_t *stratum_instances; -static stratum_instance_t *disconnected_instances; - -/* Protects both stratum and user instances */ -static cklock_t instance_lock; - struct share { UT_hash_handle hh; uchar hash[32]; @@ -321,17 +259,84 @@ struct share { typedef struct share share_t; -static share_t *shares; +struct stratifier_data { + char pubkeytxnbin[25]; + char donkeytxnbin[25]; + + pool_stats_t stats; + /* Protects changes to pool stats */ + pthread_mutex_t stats_lock; -static cklock_t share_lock; + /* Serialises sends/receives to ckdb if possible */ + pthread_mutex_t ckdb_lock; -/* Linked list of block solves, added to during submission, removed on - * accept/reject. It is likely we only ever have one solve on here but you - * never know... */ -static pthread_mutex_t block_lock; -static ckmsg_t *block_solves; + /* Variable length enonce1 always refers back to a u64 */ + union { + uint64_t u64; + uint32_t u32; + uint16_t u16; + uint8_t u8; + } enonce1u; -static int gen_priority; + /* For protecting the hashtable data */ + cklock_t workbase_lock; + + /* For the hashtable of all workbases */ + workbase_t *workbases; + workbase_t *current_workbase; + + int64_t workbase_id; + int64_t blockchange_id; + char lasthash[68]; + char lastswaphash[68]; + + ckmsgq_t *ssends; // Stratum sends + ckmsgq_t *srecvs; // Stratum receives + ckmsgq_t *ckdbq; // ckdb + ckmsgq_t *sshareq; // Stratum share sends + ckmsgq_t *sauthq; // Stratum authorisations + ckmsgq_t *stxnq; // Transaction requests + + int64_t user_instance_id; + + /* Stratum_instances hashlist is stored by id, whereas disconnected_instances + * is sorted by enonce1_64. */ + stratum_instance_t *stratum_instances; + stratum_instance_t *disconnected_instances; + + user_instance_t *user_instances; + + /* Protects both stratum and user instances */ + cklock_t instance_lock; + + share_t *shares; + cklock_t share_lock; + + /* Linked list of block solves, added to during submission, removed on + * accept/reject. It is likely we only ever have one solve on here but + * you never know... */ + pthread_mutex_t block_lock; + ckmsg_t *block_solves; + + /* Generator message priority */ + int gen_priority; + + struct { + double diff; + + char enonce1[32]; + uchar enonce1bin[16]; + int enonce1constlen; + int enonce1varlen; + + int nonce2len; + int enonce2varlen; + + bool subscribed; + } proxy_base; +}; + +typedef struct stratifier_data sdata_t; /* Priority levels for generator messages */ #define GEN_LAX 0 @@ -365,6 +370,7 @@ static const char *ckdb_ids[] = { static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) { uint64_t *u64, g64, d64 = 0; + sdata_t *sdata = ckp->data; char header[228]; int len, ofs = 0; ts_t now; @@ -448,7 +454,7 @@ static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) wb->coinb2len += 8; wb->coinb2bin[wb->coinb2len++] = 25; - memcpy(wb->coinb2bin + wb->coinb2len, pubkeytxnbin, 25); + memcpy(wb->coinb2bin + wb->coinb2len, sdata->pubkeytxnbin, 25); wb->coinb2len += 25; if (ckp->donvalid) { @@ -457,7 +463,7 @@ static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) wb->coinb2len += 8; wb->coinb2bin[wb->coinb2len++] = 25; - memcpy(wb->coinb2bin + wb->coinb2len, donkeytxnbin, 25); + memcpy(wb->coinb2bin + wb->coinb2len, sdata->donkeytxnbin, 25); wb->coinb2len += 25; } @@ -477,7 +483,7 @@ static void generate_coinbase(ckpool_t *ckp, workbase_t *wb) hex2bin(wb->headerbin, header, 112); } -static void stratum_broadcast_update(bool clean); +static void stratum_broadcast_update(sdata_t *sdata, bool clean); static void clear_workbase(workbase_t *wb) { @@ -493,20 +499,20 @@ static void clear_workbase(workbase_t *wb) free(wb); } -static void purge_share_hashtable(int64_t wb_id) +static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id) { share_t *share, *tmp; int purged = 0; - ck_wlock(&share_lock); - HASH_ITER(hh, shares, share, tmp) { + ck_wlock(&sdata->share_lock); + HASH_ITER(hh, sdata->shares, share, tmp) { if (share->workbase_id < wb_id) { - HASH_DEL(shares, share); + HASH_DEL(sdata->shares, share); free(share); purged++; } } - ck_wunlock(&share_lock); + ck_wunlock(&sdata->share_lock); if (purged) LOGINFO("Cleared %d shares from share hashtable", purged); @@ -537,6 +543,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char const char *func, const int line) { static time_t time_counter; + sdata_t *sdata = ckp->data; static int counter = 0; char *json_msg; time_t now_t; @@ -565,7 +572,7 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char return; } - ckmsgq_add(ckdbq, json_msg); + ckmsgq_add(sdata->ckdbq, json_msg); } #define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__) @@ -618,6 +625,7 @@ static void send_ageworkinfo(ckpool_t *ckp, int64_t id) static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) { workbase_t *tmp, *tmpa, *aged = NULL; + sdata_t *sdata = ckp->data; int len, ret; ts_realtime(&wb->gentime); @@ -629,21 +637,21 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) /* In proxy mode, the wb->id is received in the notify update and * we set workbase_id from it. In server mode the stratifier is * setting the workbase_id */ - ck_wlock(&workbase_lock); + ck_wlock(&sdata->workbase_lock); if (!ckp->proxy) - wb->id = workbase_id++; + wb->id = sdata->workbase_id++; else - workbase_id = wb->id; - if (strncmp(wb->prevhash, lasthash, 64)) { + sdata->workbase_id = wb->id; + if (strncmp(wb->prevhash, sdata->lasthash, 64)) { char bin[32], swap[32]; *new_block = true; - memcpy(lasthash, wb->prevhash, 65); - hex2bin(bin, lasthash, 32); + memcpy(sdata->lasthash, wb->prevhash, 65); + hex2bin(bin, sdata->lasthash, 32); swap_256(swap, bin); - __bin2hex(lastswaphash, swap, 32); - LOGNOTICE("Block hash changed to %s", lastswaphash); - blockchange_id = wb->id; + __bin2hex(sdata->lastswaphash, swap, 32); + LOGNOTICE("Block hash changed to %s", sdata->lastswaphash); + sdata->blockchange_id = wb->id; } if (*new_block && ckp->logshares) { sprintf(wb->logdir, "%s%08x/", ckp->logdir, wb->height); @@ -655,22 +663,22 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) if (ckp->logshares) sprintf(wb->logdir, "%s%08x/%s", ckp->logdir, wb->height, wb->idstring); - HASH_ITER(hh, workbases, tmp, tmpa) { - if (HASH_COUNT(workbases) < 3) + HASH_ITER(hh, sdata->workbases, tmp, tmpa) { + if (HASH_COUNT(sdata->workbases) < 3) break; /* Age old workbases older than 10 minutes old */ if (tmp->gentime.tv_sec < wb->gentime.tv_sec - 600) { - HASH_DEL(workbases, tmp); + HASH_DEL(sdata->workbases, tmp); aged = tmp; break; } } - HASH_ADD_I64(workbases, id, wb); - current_workbase = wb; - ck_wunlock(&workbase_lock); + HASH_ADD_I64(sdata->workbases, id, wb); + sdata->current_workbase = wb; + ck_wunlock(&sdata->workbase_lock); if (*new_block) - purge_share_hashtable(wb->id); + purge_share_hashtable(sdata, wb->id); send_workinfo(ckp, wb); @@ -687,17 +695,18 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block) * read the wrong priority but occasional wrong values are harmless. */ static char *__send_recv_generator(ckpool_t *ckp, const char *msg, int prio) { + sdata_t *sdata = ckp->data; char *buf = NULL; bool set; - if (prio > gen_priority) { - gen_priority = prio; + if (prio > sdata->gen_priority) { + sdata->gen_priority = prio; set = true; } else set = false; buf = send_recv_proc(ckp->generator, msg); if (set) - gen_priority = 0; + sdata->gen_priority = 0; return buf; } @@ -706,25 +715,27 @@ static char *__send_recv_generator(ckpool_t *ckp, const char *msg, int prio) * any currently being serviced. */ static char *send_recv_generator(ckpool_t *ckp, const char *msg, int prio) { + sdata_t *sdata = ckp->data; char *buf = NULL; - if (prio >= gen_priority) + if (prio >= sdata->gen_priority) buf = __send_recv_generator(ckp, msg, prio); return buf; } static void send_generator(ckpool_t *ckp, const char *msg, int prio) { + sdata_t *sdata = ckp->data; bool set; - if (prio > gen_priority) { - gen_priority = prio; + if (prio > sdata->gen_priority) { + sdata->gen_priority = prio; set = true; } else set = false; send_proc(ckp->generator, msg); if (set) - gen_priority = 0; + sdata->gen_priority = 0; } struct update_req { @@ -733,7 +744,7 @@ struct update_req { int prio; }; -static void broadcast_ping(void); +static void broadcast_ping(sdata_t *sdata); /* This function assumes it will only receive a valid json gbt base template * since checking should have been done earlier, and creates the base template @@ -742,6 +753,7 @@ static void *do_update(void *arg) { struct update_req *ur = (struct update_req *)arg; ckpool_t *ckp = ur->ckp; + sdata_t *sdata = ckp->data; bool new_block = false; int prio = ur->prio; bool ret = false; @@ -803,7 +815,7 @@ static void *do_update(void *arg) add_base(ckp, wb, &new_block); - stratum_broadcast_update(new_block); + stratum_broadcast_update(sdata, new_block); ret = true; LOGINFO("Broadcast updated stratum base"); out: @@ -811,7 +823,7 @@ out: * connected while bitcoind recovers(?) */ if (!ret) { LOGWARNING("Broadcast ping due to failed stratum base update"); - broadcast_ping(); + broadcast_ping(sdata); } free(ur->pth); free(ur); @@ -832,22 +844,24 @@ static void update_base(ckpool_t *ckp, int prio) static void drop_allclients(ckpool_t *ckp) { stratum_instance_t *client, *tmp; + sdata_t *sdata = ckp->data; char buf[128]; - ck_wlock(&instance_lock); - HASH_ITER(hh, stratum_instances, client, tmp) { - HASH_DEL(stratum_instances, client); + ck_wlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { + HASH_DEL(sdata->stratum_instances, client); sprintf(buf, "dropclient=%ld", client->id); send_proc(ckp->connector, buf); } - HASH_ITER(hh, disconnected_instances, client, tmp) - HASH_DEL(disconnected_instances, client); - stats.users = stats.workers = 0; - ck_wunlock(&instance_lock); + HASH_ITER(hh, sdata->disconnected_instances, client, tmp) + HASH_DEL(sdata->disconnected_instances, client); + sdata->stats.users = sdata->stats.workers = 0; + ck_wunlock(&sdata->instance_lock); } static void update_subscribe(ckpool_t *ckp) { + sdata_t *sdata = ckp->data; json_t *val; char *buf; @@ -861,22 +875,22 @@ static void update_subscribe(ckpool_t *ckp) val = json_loads(buf, 0, NULL); free(buf); - ck_wlock(&workbase_lock); - proxy_base.subscribed = true; - proxy_base.diff = ckp->startdiff; + ck_wlock(&sdata->workbase_lock); + sdata->proxy_base.subscribed = true; + sdata->proxy_base.diff = ckp->startdiff; /* Length is checked by generator */ - strcpy(proxy_base.enonce1, json_string_value(json_object_get(val, "enonce1"))); - proxy_base.enonce1constlen = strlen(proxy_base.enonce1) / 2; - hex2bin(proxy_base.enonce1bin, proxy_base.enonce1, proxy_base.enonce1constlen); - proxy_base.nonce2len = json_integer_value(json_object_get(val, "nonce2len")); - if (proxy_base.nonce2len > 7) - proxy_base.enonce1varlen = 4; - else if (proxy_base.nonce2len > 5) - proxy_base.enonce1varlen = 2; + strcpy(sdata->proxy_base.enonce1, json_string_value(json_object_get(val, "enonce1"))); + sdata->proxy_base.enonce1constlen = strlen(sdata->proxy_base.enonce1) / 2; + hex2bin(sdata->proxy_base.enonce1bin, sdata->proxy_base.enonce1, sdata->proxy_base.enonce1constlen); + sdata->proxy_base.nonce2len = json_integer_value(json_object_get(val, "nonce2len")); + if (sdata->proxy_base.nonce2len > 7) + sdata->proxy_base.enonce1varlen = 4; + else if (sdata->proxy_base.nonce2len > 5) + sdata->proxy_base.enonce1varlen = 2; else - proxy_base.enonce1varlen = 1; - proxy_base.enonce2varlen = proxy_base.nonce2len - proxy_base.enonce1varlen; - ck_wunlock(&workbase_lock); + sdata->proxy_base.enonce1varlen = 1; + sdata->proxy_base.enonce2varlen = sdata->proxy_base.nonce2len - sdata->proxy_base.enonce1varlen; + ck_wunlock(&sdata->workbase_lock); json_decref(val); drop_allclients(ckp); @@ -887,6 +901,7 @@ static void update_diff(ckpool_t *ckp); static void update_notify(ckpool_t *ckp) { bool new_block = false, clean; + sdata_t *sdata = ckp->data; char header[228]; workbase_t *wb; json_t *val; @@ -899,7 +914,7 @@ static void update_notify(ckpool_t *ckp) return; } - if (unlikely(!proxy_base.subscribed)) { + if (unlikely(!sdata->proxy_base.subscribed)) { LOGINFO("No valid proxy subscription to update notify yet"); return; } @@ -949,30 +964,31 @@ static void update_notify(ckpool_t *ckp) /* Check diff on each notify */ update_diff(ckp); - ck_rlock(&workbase_lock); - strcpy(wb->enonce1const, proxy_base.enonce1); - wb->enonce1constlen = proxy_base.enonce1constlen; - memcpy(wb->enonce1constbin, proxy_base.enonce1bin, wb->enonce1constlen); - wb->enonce1varlen = proxy_base.enonce1varlen; - wb->enonce2varlen = proxy_base.enonce2varlen; - wb->diff = proxy_base.diff; - ck_runlock(&workbase_lock); + ck_rlock(&sdata->workbase_lock); + strcpy(wb->enonce1const, sdata->proxy_base.enonce1); + wb->enonce1constlen = sdata->proxy_base.enonce1constlen; + memcpy(wb->enonce1constbin, sdata->proxy_base.enonce1bin, wb->enonce1constlen); + wb->enonce1varlen = sdata->proxy_base.enonce1varlen; + wb->enonce2varlen = sdata->proxy_base.enonce2varlen; + wb->diff = sdata->proxy_base.diff; + ck_runlock(&sdata->workbase_lock); add_base(ckp, wb, &new_block); - stratum_broadcast_update(new_block | clean); + stratum_broadcast_update(sdata, new_block | clean); } -static void stratum_send_diff(stratum_instance_t *client); +static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client); static void update_diff(ckpool_t *ckp) { stratum_instance_t *client; + sdata_t *sdata = ckp->data; double old_diff, diff; json_t *val; char *buf; - if (unlikely(!current_workbase)) { + if (unlikely(!sdata->current_workbase)) { LOGINFO("No current workbase to update diff yet"); return; } @@ -994,32 +1010,32 @@ static void update_diff(ckpool_t *ckp) if (unlikely(diff < 1)) diff = 1; - ck_wlock(&workbase_lock); - old_diff = proxy_base.diff; - current_workbase->diff = proxy_base.diff = diff; - ck_wunlock(&workbase_lock); + ck_wlock(&sdata->workbase_lock); + old_diff = sdata->proxy_base.diff; + sdata->current_workbase->diff = sdata->proxy_base.diff = diff; + ck_wunlock(&sdata->workbase_lock); if (old_diff < diff) return; /* If the diff has dropped, iterate over all the clients and check * they're at or below the new diff, and update it if not. */ - ck_rlock(&instance_lock); - for (client = stratum_instances; client != NULL; client = client->hh.next) { + ck_rlock(&sdata->instance_lock); + for (client = sdata->stratum_instances; client != NULL; client = client->hh.next) { if (client->diff > diff) { client->diff = diff; - stratum_send_diff(client); + stratum_send_diff(sdata, client); } } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); } /* Enter with instance_lock held */ -static stratum_instance_t *__instance_by_id(int64_t id) +static stratum_instance_t *__instance_by_id(sdata_t *sdata, int64_t id) { stratum_instance_t *instance; - HASH_FIND_I64(stratum_instances, &id, instance); + HASH_FIND_I64(sdata->stratum_instances, &id, instance); return instance; } @@ -1027,18 +1043,19 @@ static stratum_instance_t *__instance_by_id(int64_t id) static stratum_instance_t *__stratum_add_instance(ckpool_t *ckp, int64_t id) { stratum_instance_t *instance = ckzalloc(sizeof(stratum_instance_t)); + sdata_t *sdata = ckp->data; instance->id = id; instance->diff = instance->old_diff = ckp->startdiff; instance->ckp = ckp; tv_time(&instance->ldc); LOGINFO("Added instance %ld", id); - HASH_ADD_I64(stratum_instances, id, instance); + HASH_ADD_I64(sdata->stratum_instances, id, instance); return instance; } /* Only supports a full ckpool instance sessionid with an 8 byte sessionid */ -static bool disconnected_sessionid_exists(const char *sessionid, int64_t id) +static bool disconnected_sessionid_exists(sdata_t *sdata, const char *sessionid, int64_t id) { stratum_instance_t *instance, *tmp; uint64_t session64; @@ -1051,8 +1068,8 @@ static bool disconnected_sessionid_exists(const char *sessionid, int64_t id) /* Number is in BE but we don't swap either of them */ hex2bin(&session64, sessionid, 8); - ck_rlock(&instance_lock); - HASH_ITER(hh, stratum_instances, instance, tmp) { + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, instance, tmp) { if (instance->id == id) continue; if (instance->enonce1_64 == session64) { @@ -1061,11 +1078,11 @@ static bool disconnected_sessionid_exists(const char *sessionid, int64_t id) } } instance = NULL; - HASH_FIND(hh, disconnected_instances, &session64, sizeof(uint64_t), instance); + HASH_FIND(hh, sdata->disconnected_instances, &session64, sizeof(uint64_t), instance); if (instance) ret = true; out_unlock: - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); out: return ret; } @@ -1073,7 +1090,7 @@ out: /* For creating a list of sends without locking that can then be concatenated * to the stratum_sends list. Minimises locking and avoids taking recursive * locks. */ -static void stratum_broadcast(json_t *val) +static void stratum_broadcast(sdata_t *sdata, json_t *val) { stratum_instance_t *instance, *tmp; ckmsg_t *bulk_send = NULL; @@ -1083,8 +1100,8 @@ static void stratum_broadcast(json_t *val) return; } - ck_rlock(&instance_lock); - HASH_ITER(hh, stratum_instances, instance, tmp) { + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, instance, tmp) { ckmsg_t *client_msg; smsg_t *msg; @@ -1097,59 +1114,63 @@ static void stratum_broadcast(json_t *val) client_msg->data = msg; DL_APPEND(bulk_send, client_msg); } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); json_decref(val); if (!bulk_send) return; - mutex_lock(ssends->lock); - if (ssends->msgs) - DL_CONCAT(ssends->msgs, bulk_send); + mutex_lock(sdata->ssends->lock); + if (sdata->ssends->msgs) + DL_CONCAT(sdata->ssends->msgs, bulk_send); else - ssends->msgs = bulk_send; - pthread_cond_signal(ssends->cond); - mutex_unlock(ssends->lock); + sdata->ssends->msgs = bulk_send; + pthread_cond_signal(sdata->ssends->cond); + mutex_unlock(sdata->ssends->lock); } -static void stratum_add_send(json_t *val, int64_t client_id) +static void stratum_add_send(sdata_t *sdata, json_t *val, int64_t client_id) { smsg_t *msg; msg = ckzalloc(sizeof(smsg_t)); msg->json_msg = val; msg->client_id = client_id; - ckmsgq_add(ssends, msg); + ckmsgq_add(sdata->ssends, msg); } -static void inc_worker(user_instance_t *instance) +static void inc_worker(ckpool_t *ckp, user_instance_t *instance) { - mutex_lock(&stats_lock); - stats.workers++; + sdata_t *sdata = ckp->data; + + mutex_lock(&sdata->stats_lock); + sdata->stats.workers++; if (!instance->workers++) - stats.users++; - mutex_unlock(&stats_lock); + sdata->stats.users++; + mutex_unlock(&sdata->stats_lock); } -static void dec_worker(user_instance_t *instance) +static void dec_worker(ckpool_t *ckp, user_instance_t *instance) { - mutex_lock(&stats_lock); - stats.workers--; + sdata_t *sdata = ckp->data; + + mutex_lock(&sdata->stats_lock); + sdata->stats.workers--; if (!--instance->workers) - stats.users--; - mutex_unlock(&stats_lock); + sdata->stats.users--; + mutex_unlock(&sdata->stats_lock); } -static void drop_client(int64_t id) +static void drop_client(sdata_t *sdata, int64_t id) { stratum_instance_t *client = NULL; bool dec = false; LOGINFO("Stratifier dropping client %ld", id); - ck_wlock(&instance_lock); - client = __instance_by_id(id); + ck_wlock(&sdata->instance_lock); + client = __instance_by_id(sdata, id); if (client) { stratum_instance_t *old_client = NULL; @@ -1158,30 +1179,30 @@ static void drop_client(int64_t id) client->authorised = false; } - HASH_DEL(stratum_instances, client); - HASH_FIND(hh, disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); + HASH_DEL(sdata->stratum_instances, client); + HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64) - HASH_ADD(hh, disconnected_instances, enonce1_64, sizeof(uint64_t), client); + HASH_ADD(hh, sdata->disconnected_instances, enonce1_64, sizeof(uint64_t), client); } - ck_wunlock(&instance_lock); + ck_wunlock(&sdata->instance_lock); if (dec) - dec_worker(client->user_instance); + dec_worker(client->ckp, client->user_instance); } -static void stratum_broadcast_message(const char *msg) +static void stratum_broadcast_message(sdata_t *sdata, const char *msg) { json_t *json_msg; JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", "params", msg); - stratum_broadcast(json_msg); + stratum_broadcast(sdata, json_msg); } /* Send a generic reconnect to all clients without parameters to make them * reconnect to the same server. */ -static void reconnect_clients(const char *cmd) +static void reconnect_clients(sdata_t *sdata, const char *cmd) { char *port = strdupa(cmd), *url = NULL; json_t *json_msg; @@ -1198,12 +1219,13 @@ static void reconnect_clients(const char *cmd) } else JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", "params"); - stratum_broadcast(json_msg); + stratum_broadcast(sdata, json_msg); } static void block_solve(ckpool_t *ckp, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; + sdata_t *sdata = ckp->data; char cdfield[64]; int height = 0; ts_t ts_now; @@ -1215,8 +1237,8 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); - mutex_lock(&block_lock); - DL_FOREACH_SAFE(block_solves, block, tmp) { + mutex_lock(&sdata->block_lock); + DL_FOREACH_SAFE(sdata->block_solves, block, tmp) { val = block->data; char *solvehash; @@ -1228,12 +1250,12 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE(block_solves, block); + DL_DELETE(sdata->block_solves, block); break; } dealloc(solvehash); } - mutex_unlock(&block_lock); + mutex_unlock(&sdata->block_lock); if (unlikely(!found)) { LOGERR("Failed to find blockhash %s in block_solve!", blockhash); @@ -1249,20 +1271,20 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) free(found); ASPRINTF(&msg, "Block %d solved by %s!", height, ckp->name); - stratum_broadcast_message(msg); + stratum_broadcast_message(sdata, msg); free(msg); LOGWARNING("Solved and confirmed block %d", height); } -static void block_reject(const char *blockhash) +static void block_reject(sdata_t *sdata, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; int height = 0; json_t *val; - mutex_lock(&block_lock); - DL_FOREACH_SAFE(block_solves, block, tmp) { + mutex_lock(&sdata->block_lock); + DL_FOREACH_SAFE(sdata->block_solves, block, tmp) { val = block->data; char *solvehash; @@ -1274,12 +1296,12 @@ static void block_reject(const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE(block_solves, block); + DL_DELETE(sdata->block_solves, block); break; } dealloc(solvehash); } - mutex_unlock(&block_lock); + mutex_unlock(&sdata->block_lock); if (unlikely(!found)) { LOGERR("Failed to find blockhash %s in block_reject!", blockhash); @@ -1296,7 +1318,7 @@ static void block_reject(const char *blockhash) /* Some upstream pools (like p2pool) don't update stratum often enough and * miners disconnect if they don't receive regular communication so send them * a ping at regular intervals */ -static void broadcast_ping(void) +static void broadcast_ping(sdata_t *sdata) { json_t *json_msg; @@ -1305,12 +1327,13 @@ static void broadcast_ping(void) "id", 42, "method", "mining.ping"); - stratum_broadcast(json_msg); + stratum_broadcast(sdata, json_msg); } static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret = 0; + sdata_t *sdata = ckp->data; unixsock_t *us = &pi->us; tv_t start_tv = {0, 0}; char *buf = NULL; @@ -1331,7 +1354,7 @@ retry: } else { LOGDEBUG("%ds elapsed in strat_loop, pinging miners", ckp->update_interval); - broadcast_ping(); + broadcast_ping(sdata); } continue; } @@ -1386,20 +1409,20 @@ retry: if (ret < 0) LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf); else - drop_client(client_id); + drop_client(sdata, client_id); } else if (cmdmatch(buf, "dropall")) { drop_allclients(ckp); } else if (cmdmatch(buf, "block")) { block_solve(ckp, buf + 6); } else if (cmdmatch(buf, "noblock")) { - block_reject(buf + 8); + block_reject(sdata, buf + 8); } else if (cmdmatch(buf, "reconnect")) { - reconnect_clients(buf); + reconnect_clients(sdata, buf); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else { /* The srecv_process frees the buf heap ram */ - ckmsgq_add(srecvs, buf); + ckmsgq_add(sdata->srecvs, buf); buf = NULL; } goto retry; @@ -1412,6 +1435,7 @@ out: static void *blockupdate(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + sdata_t *sdata = ckp->data; char *buf = NULL; char request[8]; @@ -1428,7 +1452,7 @@ static void *blockupdate(void *arg) buf = send_recv_generator(ckp, request, GEN_LAX); if (buf && cmdmatch(buf, "notify")) cksleep_ms(5000); - else if (buf && strcmp(buf, lastswaphash) && !cmdmatch(buf, "failed")) + else if (buf && strcmp(buf, sdata->lastswaphash) && !cmdmatch(buf, "failed")) update_base(ckp, GEN_PRIORITY); else cksleep_ms(ckp->blockpoll); @@ -1436,7 +1460,7 @@ static void *blockupdate(void *arg) return NULL; } -static inline bool enonce1_free(uint64_t enonce1) +static inline bool enonce1_free(sdata_t *sdata, uint64_t enonce1) { stratum_instance_t *client, *tmp; bool ret = true; @@ -1445,7 +1469,7 @@ static inline bool enonce1_free(uint64_t enonce1) ret = false; goto out; } - HASH_ITER(hh, stratum_instances, client, tmp) { + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (client->enonce1_64 == enonce1) { ret = false; break; @@ -1461,46 +1485,47 @@ out: * unused enonce1 value and reject clients instead if there is no space left */ static bool new_enonce1(stratum_instance_t *client) { + sdata_t *sdata = client->ckp->data; bool ret = false; workbase_t *wb; int i; - ck_wlock(&workbase_lock); - wb = current_workbase; + ck_wlock(&sdata->workbase_lock); + wb = sdata->current_workbase; switch(wb->enonce1varlen) { case 8: - enonce1u.u64++; + sdata->enonce1u.u64++; ret = true; break; case 4: - enonce1u.u32++; + sdata->enonce1u.u32++; ret = true; break; case 2: for (i = 0; i < 65536; i++) { - enonce1u.u16++; - ret = enonce1_free(enonce1u.u64); + sdata->enonce1u.u16++; + ret = enonce1_free(sdata, sdata->enonce1u.u64); if (ret) break; } break; case 1: for (i = 0; i < 256; i++) { - enonce1u.u8++; - ret = enonce1_free(enonce1u.u64); + sdata->enonce1u.u8++; + ret = enonce1_free(sdata, sdata->enonce1u.u64); if (ret) break; } break; } if (ret) - client->enonce1_64 = enonce1u.u64; + client->enonce1_64 = sdata->enonce1u.u64; if (wb->enonce1constlen) memcpy(client->enonce1bin, wb->enonce1constbin, wb->enonce1constlen); memcpy(client->enonce1bin + wb->enonce1constlen, &client->enonce1_64, wb->enonce1varlen); __bin2hex(client->enonce1var, &client->enonce1_64, wb->enonce1varlen); __bin2hex(client->enonce1, client->enonce1bin, wb->enonce1constlen + wb->enonce1varlen); - ck_wunlock(&workbase_lock); + ck_wunlock(&sdata->workbase_lock); if (unlikely(!ret)) LOGWARNING("Enonce1 space exhausted! Proxy rejecting clients"); @@ -1508,23 +1533,24 @@ static bool new_enonce1(stratum_instance_t *client) return ret; } -static void stratum_send_message(stratum_instance_t *client, const char *msg); +static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg); /* Extranonce1 must be set here */ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, json_t *params_val) { + sdata_t *sdata = client->ckp->data; bool old_match = false; int arr_size; json_t *ret; int n2len; if (unlikely(!json_is_array(params_val))) { - stratum_send_message(client, "Invalid json: params not an array"); + stratum_send_message(sdata, client, "Invalid json: params not an array"); return json_string("params not an array"); } - if (unlikely(!current_workbase)) { - stratum_send_message(client, "Pool Initialising"); + if (unlikely(!sdata->current_workbase)) { + stratum_send_message(sdata, client, "Pool Initialising"); return json_string("Initialising"); } @@ -1543,7 +1569,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js buf = json_string_value(json_array_get(params_val, 1)); LOGDEBUG("Found old session id %s", buf); /* Add matching here */ - if (disconnected_sessionid_exists(buf, client_id)) { + if (disconnected_sessionid_exists(sdata, buf, client_id)) { sprintf(client->enonce1, "%016lx", client->enonce1_64); old_match = true; } @@ -1553,7 +1579,7 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js if (!old_match) { /* Create a new extranonce1 based on a uint64_t pointer */ if (!new_enonce1(client)) { - stratum_send_message(client, "Pool full of clients"); + stratum_send_message(sdata, client, "Pool full of clients"); client->reject = 2; return json_string("proxy full"); } @@ -1564,14 +1590,14 @@ static json_t *parse_subscribe(stratum_instance_t *client, int64_t client_id, js client->enonce1); } - ck_rlock(&workbase_lock); - if (likely(workbases)) - n2len = workbases->enonce2varlen; + ck_rlock(&sdata->workbase_lock); + if (likely(sdata->workbases)) + n2len = sdata->workbases->enonce2varlen; else n2len = 8; JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1, n2len); - ck_runlock(&workbase_lock); + ck_runlock(&sdata->workbase_lock); client->subscribed = true; @@ -1708,6 +1734,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, const char *workername) { char *base_username = strdupa(workername), *username; + sdata_t *sdata = ckp->data; user_instance_t *instance; stratum_instance_t *tmp; bool new = false; @@ -1720,16 +1747,16 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, if (unlikely(len > 127)) username[127] = '\0'; - ck_wlock(&instance_lock); - HASH_FIND_STR(user_instances, username, instance); + ck_wlock(&sdata->instance_lock); + HASH_FIND_STR(sdata->user_instances, username, instance); if (!instance) { /* New user instance. Secondary user id will be NULL */ instance = ckzalloc(sizeof(user_instance_t)); strcpy(instance->username, username); new = true; - instance->id = user_instance_id++; - HASH_ADD_STR(user_instances, username, instance); + instance->id = sdata->user_instance_id++; + HASH_ADD_STR(sdata->user_instances, username, instance); read_userstats(ckp, instance); } DL_FOREACH(instance->instances, tmp) { @@ -1751,7 +1778,7 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, client->worker_instance = worker; } DL_APPEND(instance->instances, client); - ck_wunlock(&instance_lock); + ck_wunlock(&sdata->instance_lock); if (new && !ckp->proxy) { /* Is this a btc address based username? */ @@ -1765,13 +1792,14 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, } /* Send this to the database and parse the response to authorise a user - * and get SUID parameters back. We don't add these requests to the ckdbqueue + * and get SUID parameters back. We don't add these requests to the sdata->ckdbqueue * since we have to wait for the response but this is done from the authoriser * thread so it won't hold anything up but other authorisations. */ static int send_recv_auth(stratum_instance_t *client) { user_instance_t *user_instance = client->user_instance; ckpool_t *ckp = client->ckp; + sdata_t *sdata = ckp->data; char *buf = NULL, *json_msg; char cdfield[64]; int ret = 1; @@ -1805,9 +1833,9 @@ static int send_recv_auth(stratum_instance_t *client) /* We want responses from ckdb serialised and not interleaved with * other requests. Wait up to 3 seconds for exclusive access to ckdb * and if we don't receive it treat it as a delayed auth if possible */ - if (likely(!mutex_timedlock(&ckdb_lock, 3))) { + if (likely(!mutex_timedlock(&sdata->ckdb_lock, 3))) { buf = ckdb_msg_call(ckp, json_msg); - mutex_unlock(&ckdb_lock); + mutex_unlock(&sdata->ckdb_lock); } free(json_msg); @@ -1883,6 +1911,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j const char *address, int *errnum) { user_instance_t *user_instance; + ckpool_t *ckp = client->ckp; bool ret = false; const char *buf; int arr_size; @@ -1918,7 +1947,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j *err_val = json_string("Invalid character in username"); goto out; } - user_instance = client->user_instance = generate_user(client->ckp, client, buf); + user_instance = client->user_instance = generate_user(ckp, client, buf); client->user_id = user_instance->id; ts_realtime(&now); client->start_time = now.tv_sec; @@ -1927,7 +1956,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, user_instance->username); client->workername = strdup(buf); - if (CKP_STANDALONE(client->ckp)) + if (CKP_STANDALONE(ckp)) ret = true; else { *errnum = send_recv_auth(client); @@ -1943,27 +1972,27 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j } client->authorised = ret; if (client->authorised) - inc_worker(user_instance); + inc_worker(ckp, user_instance); out: return json_boolean(ret); } -static void stratum_send_diff(stratum_instance_t *client) +static void stratum_send_diff(sdata_t *sdata, stratum_instance_t *client) { json_t *json_msg; JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(), "method", "mining.set_difficulty"); - stratum_add_send(json_msg, client->id); + stratum_add_send(sdata, json_msg, client->id); } -static void stratum_send_message(stratum_instance_t *client, const char *msg) +static void stratum_send_message(sdata_t *sdata, stratum_instance_t *client, const char *msg) { json_t *json_msg; JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", "params", msg); - stratum_add_send(json_msg, client->id); + stratum_add_send(sdata, json_msg, client->id); } static double time_bias(double tdiff, double period) @@ -1993,15 +2022,16 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool double tdiff, bdiff, dsps, drr, network_diff, bias; user_instance_t *instance = client->user_instance; int64_t next_blockid, optimal; + sdata_t *sdata = ckp->data; tv_t now_t; - mutex_lock(&stats_lock); + mutex_lock(&sdata->stats_lock); if (valid) { - stats.unaccounted_shares++; - stats.unaccounted_diff_shares += diff; + sdata->stats.unaccounted_shares++; + sdata->stats.unaccounted_diff_shares += diff; } else - stats.unaccounted_rejects += diff; - mutex_unlock(&stats_lock); + sdata->stats.unaccounted_rejects += diff; + mutex_unlock(&sdata->stats_lock); /* Count only accepted and stale rejects in diff calculation. */ if (!valid && !submit) @@ -2009,13 +2039,13 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool tv_time(&now_t); - ck_rlock(&workbase_lock); - next_blockid = workbase_id + 1; + ck_rlock(&sdata->workbase_lock); + next_blockid = sdata->workbase_id + 1; if (ckp->proxy) - network_diff = current_workbase->diff; + network_diff = sdata->current_workbase->diff; else - network_diff = current_workbase->network_diff; - ck_runlock(&workbase_lock); + network_diff = sdata->current_workbase->network_diff; + ck_runlock(&sdata->workbase_lock); if (unlikely(!client->first_share.tv_sec)) { copy_tv(&client->first_share, &now_t); @@ -2111,7 +2141,7 @@ static void add_submit(ckpool_t *ckp, stratum_instance_t *client, int diff, bool client->diff_change_job_id = next_blockid; client->old_diff = client->diff; client->diff = optimal; - stratum_send_diff(client); + stratum_send_diff(sdata, client); } /* We should already be holding the workbase_lock */ @@ -2123,14 +2153,15 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c char hexcoinbase[1024], blockhash[68]; json_t *val = NULL, *val_copy; char *gbt_block, varint[12]; + ckpool_t *ckp = wb->ckp; + sdata_t *sdata = ckp->data; ckmsg_t *block_ckmsg; char cdfield[64]; uchar swap[32]; - ckpool_t *ckp; ts_t ts_now; /* Submit anything over 99% of the diff in case of rounding errors */ - if (diff < current_workbase->network_diff * 0.99) + if (diff < sdata->current_workbase->network_diff * 0.99) return; LOGWARNING("Possible block solve diff %f !", diff); @@ -2168,7 +2199,6 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c strcat(gbt_block, hexcoinbase); if (wb->transactions) realloc_strcat(&gbt_block, wb->txn_data); - ckp = wb->ckp; send_generator(ckp, gbt_block, GEN_PRIORITY); free(gbt_block); @@ -2192,9 +2222,9 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c block_ckmsg = ckalloc(sizeof(ckmsg_t)); block_ckmsg->data = val_copy; - mutex_lock(&block_lock); - DL_APPEND(block_solves, block_ckmsg); - mutex_unlock(&block_lock); + mutex_lock(&sdata->block_lock); + DL_APPEND(sdata->block_solves, block_ckmsg); + mutex_unlock(&sdata->block_lock); ckdbq_add(ckp, ID_BLOCK, val); } @@ -2259,22 +2289,22 @@ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const return ret; } -static bool new_share(const uchar *hash, int64_t wb_id) +static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id) { share_t *share, *match = NULL; bool ret = false; - ck_wlock(&share_lock); - HASH_FIND(hh, shares, hash, 32, match); + ck_wlock(&sdata->share_lock); + HASH_FIND(hh, sdata->shares, hash, 32, match); if (match) goto out_unlock; share = ckzalloc(sizeof(share_t)); memcpy(share->hash, hash, 32); share->workbase_id = wb_id; - HASH_ADD(hh, shares, hash, 32, share); + HASH_ADD(hh, sdata->shares, hash, 32, share); ret = true; out_unlock: - ck_wunlock(&share_lock); + ck_wunlock(&sdata->share_lock); return ret; } @@ -2311,6 +2341,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, char *fname = NULL, *s, *nonce2; enum share_err err = SE_NONE; ckpool_t *ckp = client->ckp; + sdata_t *sdata = ckp->data; char idstring[20] = {}; workbase_t *wb = NULL; uint32_t ntime32; @@ -2376,14 +2407,14 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, share = true; - ck_rlock(&workbase_lock); - HASH_FIND_I64(workbases, &id, wb); + ck_rlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); if (unlikely(!wb)) { - id = current_workbase->id; + id = sdata->current_workbase->id; err = SE_INVALID_JOBID; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); strncpy(idstring, job_id, 19); - ASPRINTF(&fname, "%s.sharelog", current_workbase->logdir); + ASPRINTF(&fname, "%s.sharelog", sdata->current_workbase->logdir); goto out_unlock; } wdiff = wb->diff; @@ -2407,7 +2438,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg, bswap_256(sharehash, hash); __bin2hex(hexhash, sharehash, 32); - if (id < blockchange_id) { + if (id < sdata->blockchange_id) { err = SE_STALE; json_set_string(json_msg, "reject-reason", SHARE_ERR(err)); goto out_submit; @@ -2423,7 +2454,7 @@ out_submit: if (sdiff >= wdiff) submit = true; out_unlock: - ck_runlock(&workbase_lock); + ck_runlock(&sdata->workbase_lock); /* Accept the lower of new and old diffs until the next update */ if (id < client->diff_change_job_id && client->old_diff < client->diff) @@ -2433,7 +2464,7 @@ out_unlock: suffix_string(wdiff, wdiffsuffix, 16, 0); if (sdiff >= diff) { - if (new_share(hash, id)) { + if (new_share(sdata, hash, id)) { LOGINFO("Accepted client %ld share diff %.1f/%.0f/%s: %s", client->id, sdiff, diff, wdiffsuffix, hexhash); result = true; @@ -2508,12 +2539,12 @@ out: client->first_invalid = now_t; else if (client->first_invalid && client->first_invalid < now_t - 120) { LOGNOTICE("Client %d rejecting for 120s, disconnecting", client->id); - stratum_send_message(client, "Disconnecting for continuous invalid shares"); + stratum_send_message(sdata, client, "Disconnecting for continuous invalid shares"); client->reject = 2; } else if (client->first_invalid && client->first_invalid < now_t - 60) { if (!client->reject) { LOGINFO("Client %d rejecting for 60s, sending diff", client->id); - stratum_send_diff(client); + stratum_send_diff(sdata, client); client->reject = 1; } } @@ -2527,7 +2558,7 @@ out: "clientid", client->id, "secondaryuserid", user_instance->secondaryuserid, "enonce1", client->enonce1, - "workinfoid", current_workbase->id, + "workinfoid", sdata->current_workbase->id, "workername", client->workername, "username", user_instance->username, "error", json_copy(*err_val), @@ -2544,61 +2575,61 @@ out: } /* Must enter with workbase_lock held */ -static json_t *__stratum_notify(bool clean) +static json_t *__stratum_notify(sdata_t *sdata, bool clean) { json_t *val; JSON_CPACK(val, "{s:[ssssosssb],s:o,s:s}", "params", - current_workbase->idstring, - current_workbase->prevhash, - current_workbase->coinb1, - current_workbase->coinb2, - json_deep_copy(current_workbase->merkle_array), - current_workbase->bbversion, - current_workbase->nbit, - current_workbase->ntime, + sdata->current_workbase->idstring, + sdata->current_workbase->prevhash, + sdata->current_workbase->coinb1, + sdata->current_workbase->coinb2, + json_deep_copy(sdata->current_workbase->merkle_array), + sdata->current_workbase->bbversion, + sdata->current_workbase->nbit, + sdata->current_workbase->ntime, clean, "id", json_null(), "method", "mining.notify"); return val; } -static void stratum_broadcast_update(bool clean) +static void stratum_broadcast_update(sdata_t *sdata, bool clean) { json_t *json_msg; - ck_rlock(&workbase_lock); - json_msg = __stratum_notify(clean); - ck_runlock(&workbase_lock); + ck_rlock(&sdata->workbase_lock); + json_msg = __stratum_notify(sdata, clean); + ck_runlock(&sdata->workbase_lock); - stratum_broadcast(json_msg); + stratum_broadcast(sdata, json_msg); } /* For sending a single stratum template update */ -static void stratum_send_update(int64_t client_id, bool clean) +static void stratum_send_update(sdata_t *sdata, int64_t client_id, bool clean) { json_t *json_msg; - ck_rlock(&workbase_lock); - json_msg = __stratum_notify(clean); - ck_runlock(&workbase_lock); + ck_rlock(&sdata->workbase_lock); + json_msg = __stratum_notify(sdata, clean); + ck_runlock(&sdata->workbase_lock); - stratum_add_send(json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id); } -static void send_json_err(int64_t client_id, json_t *id_val, const char *err_msg) +static void send_json_err(sdata_t *sdata, int64_t client_id, json_t *id_val, const char *err_msg) { json_t *val; JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg); - stratum_add_send(val, client_id); + stratum_add_send(sdata, val, client_id); } -static void update_client(stratum_instance_t *client, const int64_t client_id) +static void update_client(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id) { - stratum_send_update(client_id, true); - stratum_send_diff(client); + stratum_send_update(sdata, client_id, true); + stratum_send_diff(sdata, client); } static json_params_t @@ -2621,28 +2652,29 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif char *username = strdupa(workername), *ignore; user_instance_t *instance = NULL; stratum_instance_t *client; + sdata_t *sdata = ckp->data; ignore = username; strsep(&ignore, "._"); /* Find the user first */ - ck_rlock(&instance_lock); - HASH_FIND_STR(user_instances, username, instance); - ck_runlock(&instance_lock); + ck_rlock(&sdata->instance_lock); + HASH_FIND_STR(sdata->user_instances, username, instance); + ck_runlock(&sdata->instance_lock); /* They may just have not connected yet */ if (!instance) return LOGINFO("Failed to find user %s in set_worker_mindiff", username); /* Then find the matching worker instance */ - ck_rlock(&instance_lock); + ck_rlock(&sdata->instance_lock); DL_FOREACH(instance->worker_instances, tmp) { if (!safecmp(workername, tmp->workername)) { worker = tmp; break; } } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); /* They may just not be connected at the moment */ if (!worker) @@ -2660,7 +2692,7 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif * matching worker that are currently live and send them a new diff * if we can. Otherwise it will only act as a clamp on next share * submission. */ - ck_rlock(&instance_lock); + ck_rlock(&sdata->instance_lock); DL_FOREACH(instance->instances, client) { if (client->worker_instance != worker) continue; @@ -2670,9 +2702,9 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif if (mindiff == client->diff) continue; client->diff = mindiff; - stratum_send_diff(client); + stratum_send_diff(sdata, client); } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); } /* Implement support for the diff in the params as well as the originally @@ -2680,6 +2712,7 @@ static void set_worker_mindiff(ckpool_t *ckp, const char *workername, int mindif static void suggest_diff(stratum_instance_t *client, const char *method, json_t *params_val) { json_t *arr_val = json_array_get(params_val, 0); + sdata_t *sdata = client->ckp->data; int64_t sdiff; if (unlikely(!client->authorised)) @@ -2697,19 +2730,19 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t client->diff = client->ckp->mindiff; else client->diff = sdiff; - stratum_send_diff(client); + stratum_send_diff(sdata, client); } -static void parse_method(const int64_t client_id, json_t *id_val, json_t *method_val, - json_t *params_val, char *address) +static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val, + json_t *method_val, json_t *params_val, char *address) { stratum_instance_t *client; const char *method; char buf[256]; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&sdata->instance_lock); + client = __instance_by_id(sdata, client_id); + ck_runlock(&sdata->instance_lock); if (unlikely(!client)) { LOGINFO("Failed to find client id %ld in hashtable!", client_id); @@ -2738,9 +2771,9 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method json_object_set_new_nocheck(val, "result", result_val); json_object_set_nocheck(val, "id", id_val); json_object_set_new_nocheck(val, "error", json_null()); - stratum_add_send(val, client_id); + stratum_add_send(sdata, val, client_id); if (likely(client->subscribed)) - update_client(client, client_id); + update_client(sdata, client, client_id); return; } @@ -2749,9 +2782,9 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method * is a passthrough and to manage its messages accordingly. * Remove this instance since the client id may well be * reused */ - ck_wlock(&instance_lock); - HASH_DEL(stratum_instances, client); - ck_wunlock(&instance_lock); + ck_wlock(&sdata->instance_lock); + HASH_DEL(sdata->stratum_instances, client); + ck_wunlock(&sdata->instance_lock); LOGINFO("Adding passthrough client %ld", client->id); snprintf(buf, 255, "passthrough=%ld", client->id); @@ -2763,7 +2796,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method if (cmdmatch(method, "mining.auth") && client->subscribed) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sauthq, jp); + ckmsgq_add(sdata->sauthq, jp); return; } @@ -2781,7 +2814,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method if (cmdmatch(method, "mining.submit")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(sshareq, jp); + ckmsgq_add(sdata->sshareq, jp); return; } @@ -2794,13 +2827,13 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method if (cmdmatch(method, "mining.get")) { json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); - ckmsgq_add(stxnq, jp); + ckmsgq_add(sdata->stxnq, jp); return; } /* Unhandled message here */ } -static void parse_instance_msg(smsg_t *msg) +static void parse_instance_msg(sdata_t *sdata, smsg_t *msg) { json_t *val = msg->json_msg, *id_val, *method, *params; int64_t client_id = msg->client_id; @@ -2819,19 +2852,19 @@ static void parse_instance_msg(smsg_t *msg) LOGDEBUG("Received spurious response %s", result ? result : ""); goto out; } - send_json_err(client_id, id_val, "-3:method not found"); + send_json_err(sdata, client_id, id_val, "-3:method not found"); goto out; } if (unlikely(!json_is_string(method))) { - send_json_err(client_id, id_val, "-1:method is not string"); + send_json_err(sdata, client_id, id_val, "-1:method is not string"); goto out; } params = json_object_get(val, "params"); if (unlikely(!params)) { - send_json_err(client_id, id_val, "-1:params not found"); + send_json_err(sdata, client_id, id_val, "-1:params not found"); goto out; } - parse_method(client_id, id_val, method, params, msg->address); + parse_method(sdata, client_id, id_val, method, params, msg->address); out: json_decref(val); free(msg); @@ -2840,6 +2873,7 @@ out: static void srecv_process(ckpool_t *ckp, char *buf) { stratum_instance_t *instance; + sdata_t *sdata = ckp->data; smsg_t *msg; json_t *val; @@ -2872,15 +2906,15 @@ static void srecv_process(ckpool_t *ckp, char *buf) json_object_clear(val); /* Parse the message here */ - ck_wlock(&instance_lock); - instance = __instance_by_id(msg->client_id); + ck_wlock(&sdata->instance_lock); + instance = __instance_by_id(sdata, msg->client_id); if (!instance) { /* client_id instance doesn't exist yet, create one */ instance = __stratum_add_instance(ckp, msg->client_id); } - ck_wunlock(&instance_lock); + ck_wunlock(&sdata->instance_lock); - parse_instance_msg(msg); + parse_instance_msg(sdata, msg); out: free(buf); } @@ -2920,17 +2954,18 @@ static void discard_json_params(json_params_t **jp) *jp = NULL; } -static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) +static void sshare_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; stratum_instance_t *client; + sdata_t *sdata = ckp->data; int64_t client_id; client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&sdata->instance_lock); + client = __instance_by_id(sdata, client_id); + ck_runlock(&sdata->instance_lock); if (unlikely(!client)) { LOGINFO("Share processor failed to find client id %ld in hashtable!", client_id); @@ -2945,7 +2980,7 @@ static void sshare_process(ckpool_t __maybe_unused *ckp, json_params_t *jp) json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id); out: discard_json_params(&jp); } @@ -2954,14 +2989,15 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; stratum_instance_t *client; + sdata_t *sdata = ckp->data; int mindiff, errnum = 0; int64_t client_id; client_id = jp->client_id; - ck_rlock(&instance_lock); - client = __instance_by_id(client_id); - ck_runlock(&instance_lock); + ck_rlock(&sdata->instance_lock); + client = __instance_by_id(sdata, client_id); + ck_runlock(&sdata->instance_lock); if (unlikely(!client)) { LOGINFO("Authoriser failed to find client id %ld in hashtable!", client_id); @@ -2973,18 +3009,18 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, client->user_instance->username); - stratum_send_message(client, buf); + stratum_send_message(sdata, client, buf); } else { if (errnum < 0) - stratum_send_message(client, "Authorisations temporarily offline :("); + stratum_send_message(sdata, client, "Authorisations temporarily offline :("); else - stratum_send_message(client, "Failed authorisation :("); + stratum_send_message(sdata, client, "Failed authorisation :("); } json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_nocheck(json_msg, "id", jp->id_val); - stratum_add_send(json_msg, client_id); + stratum_add_send(sdata, json_msg, client_id); if (!json_is_true(result_val) || !client->suggest_diff) goto out; @@ -2994,7 +3030,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) mindiff = MAX(ckp->mindiff, client->suggest_diff); if (mindiff != client->diff) { client->diff = mindiff; - stratum_send_diff(client); + stratum_send_diff(sdata, client); } out: discard_json_params(&jp); @@ -3030,12 +3066,13 @@ static void parse_ckdb_cmd(ckpool_t __maybe_unused *ckp, const char *cmd) static void ckdbq_process(ckpool_t *ckp, char *msg) { static bool failed = false; + sdata_t *sdata = ckp->data; char *buf = NULL; while (!buf) { - mutex_lock(&ckdb_lock); + mutex_lock(&sdata->ckdb_lock); buf = ckdb_msg_call(ckp, msg); - mutex_unlock(&ckdb_lock); + mutex_unlock(&sdata->ckdb_lock); if (unlikely(!buf)) { if (!failed) { @@ -3072,30 +3109,30 @@ static void ckdbq_process(ckpool_t *ckp, char *msg) } } -static int transactions_by_jobid(int64_t id) +static int transactions_by_jobid(sdata_t *sdata, int64_t id) { workbase_t *wb; int ret = -1; - ck_rlock(&workbase_lock); - HASH_FIND_I64(workbases, &id, wb); + ck_rlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); if (wb) ret = wb->transactions; - ck_runlock(&workbase_lock); + ck_runlock(&sdata->workbase_lock); return ret; } -static json_t *txnhashes_by_jobid(int64_t id) +static json_t *txnhashes_by_jobid(sdata_t *sdata, int64_t id) { json_t *ret = NULL; workbase_t *wb; - ck_rlock(&workbase_lock); - HASH_FIND_I64(workbases, &id, wb); + ck_rlock(&sdata->workbase_lock); + HASH_FIND_I64(sdata->workbases, &id, wb); if (wb) ret = json_string(wb->txn_hashes); - ck_runlock(&workbase_lock); + ck_runlock(&sdata->workbase_lock); return ret; } @@ -3105,6 +3142,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) const char *msg = json_string_value(jp->method), *params = json_string_value(json_array_get(jp->params, 0)); stratum_instance_t *client; + sdata_t *sdata = ckp->data; json_t *val, *hashes; int64_t job_id = 0; time_t now_t; @@ -3126,7 +3164,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) sscanf(params, "%lx", &job_id); else sscanf(msg, "mining.get_transactions(%lx", &job_id); - txns = transactions_by_jobid(job_id); + txns = transactions_by_jobid(sdata, job_id); if (txns != -1) { json_set_int(val, "result", txns); json_object_set_new_nocheck(val, "error", json_null()); @@ -3140,9 +3178,9 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out_send; } - ck_rlock(&instance_lock); - client = __instance_by_id(jp->client_id); - ck_runlock(&instance_lock); + ck_rlock(&sdata->instance_lock); + client = __instance_by_id(sdata, jp->client_id); + ck_runlock(&sdata->instance_lock); if (unlikely(!client)) { LOGINFO("send_transactions failed to find client id %ld in hashtable!", @@ -3162,14 +3200,14 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp) goto out_send; } sscanf(params, "%lx", &job_id); - hashes = txnhashes_by_jobid(job_id); + hashes = txnhashes_by_jobid(sdata, job_id); if (hashes) { json_object_set_new_nocheck(val, "result", hashes); json_object_set_new_nocheck(val, "error", json_null()); } else json_set_string(val, "error", "Invalid job_id"); out_send: - stratum_add_send(val, jp->client_id); + stratum_add_send(sdata, val, jp->client_id); out: discard_json_params(&jp); } @@ -3177,29 +3215,29 @@ out: /* Called every 20 seconds, we send the updated stats to ckdb of those users * who have gone 10 minutes between updates. This ends up staggering stats to * avoid floods of stat data coming at once. */ -static void update_workerstats(ckpool_t *ckp) +static void update_workerstats(ckpool_t *ckp, sdata_t *sdata) { user_instance_t *user, *tmp; char cdfield[64]; time_t now_t; ts_t ts_now; - if (++stats.userstats_cycle > 0x1f) - stats.userstats_cycle = 0; + if (++sdata->stats.userstats_cycle > 0x1f) + sdata->stats.userstats_cycle = 0; ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); now_t = ts_now.tv_sec; - ck_rlock(&instance_lock); - HASH_ITER(hh, user_instances, user, tmp) { + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->user_instances, user, tmp) { worker_instance_t *worker; uint8_t cycle_mask; /* Select users using a mask to return each user's stats once * every ~10 minutes */ cycle_mask = user->id & 0x1f; - if (cycle_mask != stats.userstats_cycle) + if (cycle_mask != sdata->stats.userstats_cycle) continue; DL_FOREACH(user->worker_instances, worker) { double ghs1, ghs5, ghs60, ghs1440; @@ -3234,18 +3272,20 @@ static void update_workerstats(ckpool_t *ckp) ckdbq_add(ckp, ID_WORKERSTATS, val); } } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); } static void *statsupdate(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + sdata_t *sdata = ckp->data; + pool_stats_t *stats = &sdata->stats; pthread_detach(pthread_self()); rename_proc("statsupdate"); - tv_time(&stats.start_time); - cksleep_prepare_r(&stats.last_update); + tv_time(&stats->start_time); + cksleep_prepare_r(&stats->last_update); sleep(1); while (42) { @@ -3266,38 +3306,38 @@ static void *statsupdate(void *arg) int i; tv_time(&now); - timersub(&now, &stats.start_time, &diff); + timersub(&now, &stats->start_time, &diff); tdiff = diff.tv_sec + (double)diff.tv_usec / 1000000; - ghs1 = stats.dsps1 * nonces; + ghs1 = stats->dsps1 * nonces; suffix_string(ghs1, suffix1, 16, 0); - sps1 = stats.sps1; + sps1 = stats->sps1; bias5 = time_bias(tdiff, 300); - ghs5 = stats.dsps5 * nonces / bias5; + ghs5 = stats->dsps5 * nonces / bias5; suffix_string(ghs5, suffix5, 16, 0); - sps5 = stats.sps5 / bias5; + sps5 = stats->sps5 / bias5; bias = time_bias(tdiff, 900); - ghs15 = stats.dsps15 * nonces / bias; + ghs15 = stats->dsps15 * nonces / bias; suffix_string(ghs15, suffix15, 16, 0); - sps15 = stats.sps15 / bias; + sps15 = stats->sps15 / bias; bias60 = time_bias(tdiff, 3600); - ghs60 = stats.dsps60 * nonces / bias60; + ghs60 = stats->dsps60 * nonces / bias60; suffix_string(ghs60, suffix60, 16, 0); - sps60 = stats.sps60 / bias60; + sps60 = stats->sps60 / bias60; bias = time_bias(tdiff, 21600); - ghs360 = stats.dsps360 * nonces / bias; + ghs360 = stats->dsps360 * nonces / bias; suffix_string(ghs360, suffix360, 16, 0); bias1440 = time_bias(tdiff, 86400); - ghs1440 = stats.dsps1440 * nonces / bias1440; + ghs1440 = stats->dsps1440 * nonces / bias1440; suffix_string(ghs1440, suffix1440, 16, 0); bias = time_bias(tdiff, 604800); - ghs10080 = stats.dsps10080 * nonces / bias; + ghs10080 = stats->dsps10080 * nonces / bias; suffix_string(ghs10080, suffix10080, 16, 0); snprintf(fname, 511, "%s/pool/pool.status", ckp->logdir); @@ -3307,8 +3347,8 @@ static void *statsupdate(void *arg) JSON_CPACK(val, "{si,si,si}", "runtime", diff.tv_sec, - "Users", stats.users, - "Workers", stats.workers); + "Users", stats->users, + "Workers", stats->workers); s = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER); json_decref(val); LOGNOTICE("Pool:%s", s); @@ -3341,8 +3381,8 @@ static void *statsupdate(void *arg) dealloc(s); fclose(fp); - ck_rlock(&instance_lock); - HASH_ITER(hh, stratum_instances, client, tmp) { + ck_rlock(&sdata->instance_lock); + HASH_ITER(hh, sdata->stratum_instances, client, tmp) { if (!client->authorised) continue; @@ -3361,7 +3401,7 @@ static void *statsupdate(void *arg) } } - HASH_ITER(hh, user_instances, instance, tmpuser) { + HASH_ITER(hh, sdata->user_instances, instance, tmpuser) { worker_instance_t *worker; bool idle = false; @@ -3453,15 +3493,15 @@ static void *statsupdate(void *arg) json_decref(val); fclose(fp); } - ck_runlock(&instance_lock); + ck_runlock(&sdata->instance_lock); ts_realtime(&ts_now); sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec); JSON_CPACK(val, "{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}", "poolinstance", ckp->name, "elapsed", diff.tv_sec, - "users", stats.users, - "workers", stats.workers, + "users", stats->users, + "workers", stats->workers, "hashrate", ghs1, "hashrate5m", ghs5, "hashrate1hr", ghs60, @@ -3475,32 +3515,32 @@ static void *statsupdate(void *arg) /* Update stats 3 times per minute for smooth values, displaying * status every minute. */ for (i = 0; i < 3; i++) { - cksleep_ms_r(&stats.last_update, 20000); - cksleep_prepare_r(&stats.last_update); - update_workerstats(ckp); - - mutex_lock(&stats_lock); - stats.accounted_shares += stats.unaccounted_shares; - stats.accounted_diff_shares += stats.unaccounted_diff_shares; - stats.accounted_rejects += stats.unaccounted_rejects; - - decay_time(&stats.sps1, stats.unaccounted_shares, 20, 60); - decay_time(&stats.sps5, stats.unaccounted_shares, 20, 300); - decay_time(&stats.sps15, stats.unaccounted_shares, 20, 900); - decay_time(&stats.sps60, stats.unaccounted_shares, 20, 3600); - - decay_time(&stats.dsps1, stats.unaccounted_diff_shares, 20, 60); - decay_time(&stats.dsps5, stats.unaccounted_diff_shares, 20, 300); - decay_time(&stats.dsps15, stats.unaccounted_diff_shares, 20, 900); - decay_time(&stats.dsps60, stats.unaccounted_diff_shares, 20, 3600); - decay_time(&stats.dsps360, stats.unaccounted_diff_shares, 20, 21600); - decay_time(&stats.dsps1440, stats.unaccounted_diff_shares, 20, 86400); - decay_time(&stats.dsps10080, stats.unaccounted_diff_shares, 20, 604800); - - stats.unaccounted_shares = - stats.unaccounted_diff_shares = - stats.unaccounted_rejects = 0; - mutex_unlock(&stats_lock); + cksleep_ms_r(&stats->last_update, 20000); + cksleep_prepare_r(&stats->last_update); + update_workerstats(ckp, sdata); + + mutex_lock(&sdata->stats_lock); + stats->accounted_shares += stats->unaccounted_shares; + stats->accounted_diff_shares += stats->unaccounted_diff_shares; + stats->accounted_rejects += stats->unaccounted_rejects; + + decay_time(&stats->sps1, stats->unaccounted_shares, 20, 60); + decay_time(&stats->sps5, stats->unaccounted_shares, 20, 300); + decay_time(&stats->sps15, stats->unaccounted_shares, 20, 900); + decay_time(&stats->sps60, stats->unaccounted_shares, 20, 3600); + + decay_time(&stats->dsps1, stats->unaccounted_diff_shares, 20, 60); + decay_time(&stats->dsps5, stats->unaccounted_diff_shares, 20, 300); + decay_time(&stats->dsps15, stats->unaccounted_diff_shares, 20, 900); + decay_time(&stats->dsps60, stats->unaccounted_diff_shares, 20, 3600); + decay_time(&stats->dsps360, stats->unaccounted_diff_shares, 20, 21600); + decay_time(&stats->dsps1440, stats->unaccounted_diff_shares, 20, 86400); + decay_time(&stats->dsps10080, stats->unaccounted_diff_shares, 20, 604800); + + stats->unaccounted_shares = + stats->unaccounted_diff_shares = + stats->unaccounted_rejects = 0; + mutex_unlock(&sdata->stats_lock); } } @@ -3513,6 +3553,7 @@ static void *statsupdate(void *arg) static void *ckdb_heartbeat(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; + sdata_t *sdata = ckp->data; pthread_detach(pthread_self()); rename_proc("heartbeat"); @@ -3523,7 +3564,7 @@ static void *ckdb_heartbeat(void *arg) json_t *val; cksleep_ms(1000); - if (unlikely(!ckmsgq_empty(ckdbq))) { + if (unlikely(!ckmsgq_empty(sdata->ckdbq))) { LOGDEBUG("Witholding heartbeat due to ckdb messages being queued"); continue; } @@ -3544,9 +3585,12 @@ int stratifier(proc_instance_t *pi) pthread_t pth_blockupdate, pth_statsupdate, pth_heartbeat; ckpool_t *ckp = pi->ckp; int ret = 1, threads; + sdata_t *sdata; char *buf; LOGWARNING("%s stratifier starting", ckp->name); + sdata = ckzalloc(sizeof(sdata_t)); + ckp->data = sdata; /* Wait for the generator to have something for us */ do { @@ -3565,52 +3609,53 @@ int stratifier(proc_instance_t *pi) /* Store this for use elsewhere */ hex2bin(scriptsig_header_bin, scriptsig_header, 41); - address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress); + address_to_pubkeytxn(sdata->pubkeytxnbin, ckp->btcaddress); if (test_address(ckp, ckp->donaddress)) { ckp->donvalid = true; - address_to_pubkeytxn(donkeytxnbin, ckp->donaddress); + address_to_pubkeytxn(sdata->donkeytxnbin, ckp->donaddress); } } /* Set the initial id to time as high bits so as to not send the same * id on restarts */ if (!ckp->proxy) - blockchange_id = workbase_id = ((int64_t)time(NULL)) << 32; + sdata->blockchange_id = sdata->workbase_id = ((int64_t)time(NULL)) << 32; dealloc(buf); if (!ckp->serverurl) ckp->serverurl = "127.0.0.1"; - cklock_init(&instance_lock); + cklock_init(&sdata->instance_lock); - mutex_init(&ckdb_lock); - ssends = create_ckmsgq(ckp, "ssender", &ssend_process); + mutex_init(&sdata->ckdb_lock); + sdata->ssends = create_ckmsgq(ckp, "ssender", &ssend_process); /* Create half as many share processing threads as there are CPUs */ threads = sysconf(_SC_NPROCESSORS_ONLN) / 2 ? : 1; - sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); + sdata->sshareq = create_ckmsgqs(ckp, "sprocessor", &sshare_process, threads); /* Create 1/4 as many stratum processing threads as there are CPUs */ threads = threads / 2 ? : 1; - srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); - sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); - stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); + sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); + sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); + sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); + sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); if (!CKP_STANDALONE(ckp)) create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); - cklock_init(&workbase_lock); + cklock_init(&sdata->workbase_lock); if (!ckp->proxy) create_pthread(&pth_blockupdate, blockupdate, ckp); - mutex_init(&stats_lock); + mutex_init(&sdata->stats_lock); create_pthread(&pth_statsupdate, statsupdate, ckp); - cklock_init(&share_lock); - mutex_init(&block_lock); + cklock_init(&sdata->share_lock); + mutex_init(&sdata->block_lock); LOGWARNING("%s stratifier ready", ckp->name); ret = stratum_loop(ckp, pi); out: + dealloc(ckp->data); return process_exit(ckp, pi, ret); }