diff --git a/src/connector.c b/src/connector.c index f47e30b0..f1679362 100644 --- a/src/connector.c +++ b/src/connector.c @@ -352,15 +352,15 @@ reparse: json_object_set_new_nocheck(val, "server", json_integer(client->server)); s = json_dumps(val, 0); - ck_rlock(&cdata->lock); - /* Do not send messages of clients we've already dropped */ + /* Do not send messages of clients we've already dropped. We + * do this unlocked as the occasional false negative can be + * filtered by the stratifier. */ if (likely(client->fd != -1)) { if (ckp->passthrough) send_proc(ckp->generator, s); else send_proc(ckp->stratifier, s); } - ck_runlock(&cdata->lock); free(s); json_decref(val); diff --git a/src/libckpool.c b/src/libckpool.c index f2937a05..bed7a816 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -1295,12 +1295,6 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line) return ptr; } -void _dealloc(void **ptr) -{ - free(*ptr); - *ptr = NULL; -} - /* Adequate size s==len*2 + 1 must be alloced to use this variant */ void __bin2hex(void *vs, const void *vp, size_t len) { diff --git a/src/libckpool.h b/src/libckpool.h index 1b24d9c2..cb7c1d01 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -177,7 +177,10 @@ static inline void flip_80(void *dest_p, const void *src_p) #define ckalloc(len) _ckalloc(len, __FILE__, __func__, __LINE__) #define ckzalloc(len) _ckzalloc(len, __FILE__, __func__, __LINE__) -#define dealloc(ptr) _dealloc((void *)&(ptr)) +#define dealloc(ptr) do { \ + free(ptr); \ + ptr = NULL; \ +} while (0) #define VASPRINTF(strp, fmt, ...) do { \ if (unlikely(vasprintf(strp, fmt, ##__VA_ARGS__) < 0)) \ @@ -499,7 +502,6 @@ void trail_slash(char **buf); void *_ckalloc(size_t len, const char *file, const char *func, const int line); void *json_ckalloc(size_t size); void *_ckzalloc(size_t len, const char *file, const char *func, const int line); -void _dealloc(void **ptr); extern const int hex2bin_tbl[]; void __bin2hex(void *vs, const void *vp, size_t len); void *bin2hex(const void *vp, size_t len); diff --git a/src/stratifier.c b/src/stratifier.c index 5d1b2cfb..4e0c35d2 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -185,6 +185,8 @@ struct user_instance { bool authorised; /* Has this username ever been authorised? */ time_t auth_time; + time_t failed_authtime; /* Last time this username failed to authorise */ + int auth_backoff; /* How long to reject any auth attempts since last failure */ }; /* Combined data from workers with the same workername */ @@ -334,7 +336,7 @@ struct stratifier_data { cklock_t instance_lock; share_t *shares; - cklock_t share_lock; + pthread_mutex_t share_lock; int64_t shares_generated; @@ -546,7 +548,7 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id) share_t *share, *tmp; int purged = 0; - ck_wlock(&sdata->share_lock); + mutex_lock(&sdata->share_lock); HASH_ITER(hh, sdata->shares, share, tmp) { if (share->workbase_id < wb_id) { HASH_DEL(sdata->shares, share); @@ -554,7 +556,7 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id) purged++; } } - ck_wunlock(&sdata->share_lock); + mutex_unlock(&sdata->share_lock); if (purged) LOGINFO("Cleared %d shares from share hashtable", purged); @@ -566,7 +568,7 @@ static void age_share_hashtable(sdata_t *sdata, int64_t wb_id) share_t *share, *tmp; int aged = 0; - ck_wlock(&sdata->share_lock); + mutex_lock(&sdata->share_lock); HASH_ITER(hh, sdata->shares, share, tmp) { if (share->workbase_id == wb_id) { HASH_DEL(sdata->shares, share); @@ -574,7 +576,7 @@ static void age_share_hashtable(sdata_t *sdata, int64_t wb_id) aged++; } } - ck_wunlock(&sdata->share_lock); + mutex_unlock(&sdata->share_lock); if (aged) LOGINFO("Aged %d shares from share hashtable", aged); @@ -920,7 +922,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client) static void __del_dead(sdata_t *sdata, stratum_instance_t *client) { - DL_DELETE_INIT(sdata->dead_instances, client); + DL_DELETE(sdata->dead_instances, client); sdata->stats.dead--; } @@ -1161,6 +1163,25 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id) return instance; } +/* Has this client_id already been used and is now in one of the dropped lists */ +static bool __dropped_instance(sdata_t *sdata, int64_t id) +{ + stratum_instance_t *client, *tmp; + bool ret = true; + + HASH_ITER(hh, sdata->disconnected_instances, client, tmp) { + if (unlikely(client->id == id)) + goto out; + } + DL_FOREACH(sdata->dead_instances, client) { + if (unlikely(client->id == id)) + goto out; + } + ret = false; +out: + return ret; +} + /* Ret = 1 is disconnected, 2 is killed, 3 is workerless killed */ static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance) { @@ -1169,7 +1190,7 @@ static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instan HASH_DEL(sdata->stratum_instances, client); if (instance) - DL_DELETE_INIT(instance->instances, client); + DL_DELETE(instance->instances, client); HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client); /* Only keep around one copy of the old client in server mode */ if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) { @@ -1275,7 +1296,7 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio } instance = NULL; HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance); - if (instance) { + if (instance && !instance->ref) { /* Delete the entry once we are going to use it since there * will be a new instance with the enonce1_64 */ old_id = instance->id; @@ -1484,8 +1505,6 @@ static void reset_bestshares(sdata_t *sdata) ck_runlock(&sdata->instance_lock); } -/* Ram from blocks is NOT freed at all for now, only their entry is removed - * from the linked list, leaving a very small leak here and reject. */ static void block_solve(ckpool_t *ckp, const char *blockhash) { ckmsg_t *block, *tmp, *found = NULL; @@ -1514,7 +1533,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE_INIT(sdata->block_solves, block); + DL_DELETE(sdata->block_solves, block); break; } dealloc(solvehash); @@ -1561,7 +1580,7 @@ static void block_reject(sdata_t *sdata, const char *blockhash) if (!strcmp(solvehash, blockhash)) { dealloc(solvehash); found = block; - DL_DELETE_INIT(sdata->block_solves, block); + DL_DELETE(sdata->block_solves, block); break; } dealloc(solvehash); @@ -1610,7 +1629,7 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, int size, json_t **val) JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); } -static char *stratifier_stats(sdata_t *sdata) +static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata) { json_t *val = json_object(), *subval; int objects, generated; @@ -1652,11 +1671,11 @@ static char *stratifier_stats(sdata_t *sdata) JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "dead", subval); - ck_rlock(&sdata->share_lock); + mutex_lock(&sdata->share_lock); generated = sdata->shares_generated; objects = HASH_COUNT(sdata->shares); memsize = SAFE_HASH_OVERHEAD(sdata->shares) + sizeof(share_t) * objects; - ck_runlock(&sdata->share_lock); + mutex_unlock(&sdata->share_lock); JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "shares", subval); @@ -1666,8 +1685,10 @@ static char *stratifier_stats(sdata_t *sdata) /* Don't know exactly how big the string is so just count the pointer for now */ ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval); json_set_object(val, "srecvs", subval); - ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); - json_set_object(val, "ckdbq", subval); + if (!CKP_STANDALONE(ckp)) { + ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval); + json_set_object(val, "ckdbq", subval); + } ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval); json_set_object(val, "stxnq", subval); @@ -1746,7 +1767,7 @@ retry: char *msg; LOGDEBUG("Stratifier received stats request"); - msg = stratifier_stats(sdata); + msg = stratifier_stats(ckp, sdata); send_unix_msg(sockd, msg); Close(sockd); goto retry; @@ -2132,8 +2153,8 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, char *base_username = strdupa(workername), *username; bool new_instance = false, new_worker = false; sdata_t *sdata = ckp->data; - user_instance_t *instance; - stratum_instance_t *tmp; + worker_instance_t *tmp; + user_instance_t *user; int len; username = strsep(&base_username, "._"); @@ -2144,18 +2165,19 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, username[127] = '\0'; ck_wlock(&sdata->instance_lock); - HASH_FIND_STR(sdata->user_instances, username, instance); - if (!instance) { + HASH_FIND_STR(sdata->user_instances, username, user); + if (!user) { /* New user instance. Secondary user id will be NULL */ - instance = ckzalloc(sizeof(user_instance_t)); - strcpy(instance->username, username); + user = ckzalloc(sizeof(user_instance_t)); + user->auth_backoff = 3; /* Set initial backoff to 3 seconds */ + strcpy(user->username, username); new_instance = true; - instance->id = sdata->user_instance_id++; - HASH_ADD_STR(sdata->user_instances, username, instance); + user->id = sdata->user_instance_id++; + HASH_ADD_STR(sdata->user_instances, username, user); } - DL_FOREACH(instance->instances, tmp) { + DL_FOREACH(user->worker_instances, tmp) { if (!safecmp(workername, tmp->workername)) { - client->worker_instance = tmp->worker_instance; + client->worker_instance = tmp; break; } } @@ -2165,29 +2187,29 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client, worker_instance_t *worker = ckzalloc(sizeof(worker_instance_t)); worker->workername = strdup(workername); - worker->instance = instance; - DL_APPEND(instance->worker_instances, worker); + worker->instance = user; + DL_APPEND(user->worker_instances, worker); new_worker = true; worker->start_time = time(NULL); client->worker_instance = worker; } - DL_APPEND(instance->instances, client); + DL_APPEND(user->instances, client); ck_wunlock(&sdata->instance_lock); if (CKP_STANDALONE(ckp) && new_instance) - read_userstats(ckp, instance); + read_userstats(ckp, user); if (CKP_STANDALONE(ckp) && new_worker) read_workerstats(ckp, client->worker_instance); if (new_instance && !ckp->proxy) { /* Is this a btc address based username? */ if (len > 26 && len < 35) - instance->btcaddress = test_address(ckp, username); - LOGNOTICE("Added new user %s%s", username, instance->btcaddress ? + user->btcaddress = test_address(ckp, username); + LOGNOTICE("Added new user %s%s", username, user->btcaddress ? " as address based registration" : ""); } - return instance; + return user; } /* Send this to the database and parse the response to authorise a user @@ -2368,8 +2390,17 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j ts_realtime(&now); client->start_time = now.tv_sec; strcpy(client->address, address); - client->workername = strdup(buf); + if (user_instance->failed_authtime) { + time_t now_t = time(NULL); + + if (now_t < user_instance->failed_authtime + user_instance->auth_backoff) { + LOGNOTICE("Client %ld worker %s rate limited due to failed auth attempts", + client->id, buf); + client->dropped = true; + goto out; + } + } if (CKP_STANDALONE(ckp)) ret = true; else { @@ -2396,9 +2427,15 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j inc_worker(ckp, user_instance); LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf, user_instance->username); + user_instance->auth_backoff = 3; /* Reset auth backoff time */ } else { LOGNOTICE("Client %ld worker %s failed to authorise as user %s", client->id, buf, user_instance->username); + user_instance->failed_authtime = time(NULL); + user_instance->auth_backoff <<= 1; + /* Cap backoff time to 10 mins */ + if (user_instance->auth_backoff > 600) + user_instance->auth_backoff = 600; } out: return json_boolean(ret); @@ -2730,12 +2767,12 @@ static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id) memcpy(share->hash, hash, 32); share->workbase_id = wb_id; - ck_wlock(&sdata->share_lock); + mutex_lock(&sdata->share_lock); sdata->shares_generated++; HASH_FIND(hh, sdata->shares, hash, 32, match); if (likely(!match)) HASH_ADD(hh, sdata->shares, hash, 32, share); - ck_wunlock(&sdata->share_lock); + mutex_unlock(&sdata->share_lock); if (unlikely(match)) { dealloc(share); @@ -3288,6 +3325,12 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64 return; } +static void free_smsg(smsg_t *msg) +{ + json_decref(msg->json_msg); + free(msg); +} + /* Entered with client holding ref count */ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client) { @@ -3322,15 +3365,14 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t * } parse_method(sdata, client, client_id, id_val, method, params, msg->address); out: - json_decref(val); - free(msg); + free_smsg(msg); } static void srecv_process(ckpool_t *ckp, char *buf) { + bool noid = false, dropped = false; sdata_t *sdata = ckp->data; stratum_instance_t *client; - bool added = false; smsg_t *msg; json_t *val; int server; @@ -3378,13 +3420,25 @@ static void srecv_process(ckpool_t *ckp, char *buf) client = __instance_by_id(sdata, msg->client_id); /* If client_id instance doesn't exist yet, create one */ if (unlikely(!client)) { - client = __stratum_add_instance(ckp, msg->client_id, server); - added = true; - } - __inc_instance_ref(client); + if (likely(!__dropped_instance(sdata, msg->client_id))) { + noid = true; + client = __stratum_add_instance(ckp, msg->client_id, server); + } else + dropped = true; + } else if (unlikely(client->dropped)) + dropped = true; + if (likely(!dropped)) + __inc_instance_ref(client); ck_wunlock(&sdata->instance_lock); - if (added) + if (unlikely(dropped)) { + /* Client may be NULL here */ + LOGNOTICE("Stratifier skipped dropped instance %ld message server %d", + msg->client_id, server); + free_smsg(msg); + goto out; + } + if (unlikely(noid)) LOGINFO("Stratifier added instance %ld server %d", client->id, server); parse_instance_msg(sdata, msg, client); @@ -4175,10 +4229,11 @@ int stratifier(proc_instance_t *pi) threads = threads / 2 ? : 1; sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads); sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process); - sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions); - if (!CKP_STANDALONE(ckp)) + if (!CKP_STANDALONE(ckp)) { + sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process); create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp); + } cklock_init(&sdata->workbase_lock); if (!ckp->proxy) @@ -4187,7 +4242,7 @@ int stratifier(proc_instance_t *pi) mutex_init(&sdata->stats_lock); create_pthread(&pth_statsupdate, statsupdate, ckp); - cklock_init(&sdata->share_lock); + mutex_init(&sdata->share_lock); mutex_init(&sdata->block_lock); LOGWARNING("%s stratifier ready", ckp->name); diff --git a/src/utlist.h b/src/utlist.h index b4558aee..48a8c7d0 100644 --- a/src/utlist.h +++ b/src/utlist.h @@ -572,28 +572,6 @@ do { } \ } while (0) -#define DL_DELETE_INIT(head,del) \ - DL_DELETE3(head,del,prev,next) - -#define DL_DELETE3(head,del,prev,next) \ -do { \ - assert((del)->prev != NULL); \ - if ((del)->prev == (del)) { \ - (head)=NULL; \ - } else if ((del)==(head)) { \ - (del)->next->prev = (del)->prev; \ - (head) = (del)->next; \ - } else { \ - (del)->prev->next = (del)->next; \ - if ((del)->next) { \ - (del)->next->prev = (del)->prev; \ - } else { \ - (head)->prev = (del)->prev; \ - } \ - } \ - (del)->prev = (del)->next = NULL; \ -} while (0) - #define DL_COUNT(head,el,counter) \ DL_COUNT2(head,el,counter,next) \