kanoi 10 years ago
parent
commit
98aa9b66fe
  1. 6
      src/connector.c
  2. 6
      src/libckpool.c
  3. 6
      src/libckpool.h
  4. 153
      src/stratifier.c
  5. 22
      src/utlist.h

6
src/connector.c

@ -352,15 +352,15 @@ reparse:
json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, 0);
ck_rlock(&cdata->lock);
/* Do not send messages of clients we've already dropped */
/* Do not send messages of clients we've already dropped. We
* do this unlocked as the occasional false negative can be
* filtered by the stratifier. */
if (likely(client->fd != -1)) {
if (ckp->passthrough)
send_proc(ckp->generator, s);
else
send_proc(ckp->stratifier, s);
}
ck_runlock(&cdata->lock);
free(s);
json_decref(val);

6
src/libckpool.c

@ -1295,12 +1295,6 @@ void *_ckzalloc(size_t len, const char *file, const char *func, const int line)
return ptr;
}
void _dealloc(void **ptr)
{
free(*ptr);
*ptr = NULL;
}
/* Adequate size s==len*2 + 1 must be alloced to use this variant */
void __bin2hex(void *vs, const void *vp, size_t len)
{

6
src/libckpool.h

@ -177,7 +177,10 @@ static inline void flip_80(void *dest_p, const void *src_p)
#define ckalloc(len) _ckalloc(len, __FILE__, __func__, __LINE__)
#define ckzalloc(len) _ckzalloc(len, __FILE__, __func__, __LINE__)
#define dealloc(ptr) _dealloc((void *)&(ptr))
#define dealloc(ptr) do { \
free(ptr); \
ptr = NULL; \
} while (0)
#define VASPRINTF(strp, fmt, ...) do { \
if (unlikely(vasprintf(strp, fmt, ##__VA_ARGS__) < 0)) \
@ -499,7 +502,6 @@ void trail_slash(char **buf);
void *_ckalloc(size_t len, const char *file, const char *func, const int line);
void *json_ckalloc(size_t size);
void *_ckzalloc(size_t len, const char *file, const char *func, const int line);
void _dealloc(void **ptr);
extern const int hex2bin_tbl[];
void __bin2hex(void *vs, const void *vp, size_t len);
void *bin2hex(const void *vp, size_t len);

153
src/stratifier.c

@ -185,6 +185,8 @@ struct user_instance {
bool authorised; /* Has this username ever been authorised? */
time_t auth_time;
time_t failed_authtime; /* Last time this username failed to authorise */
int auth_backoff; /* How long to reject any auth attempts since last failure */
};
/* Combined data from workers with the same workername */
@ -334,7 +336,7 @@ struct stratifier_data {
cklock_t instance_lock;
share_t *shares;
cklock_t share_lock;
pthread_mutex_t share_lock;
int64_t shares_generated;
@ -546,7 +548,7 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id)
share_t *share, *tmp;
int purged = 0;
ck_wlock(&sdata->share_lock);
mutex_lock(&sdata->share_lock);
HASH_ITER(hh, sdata->shares, share, tmp) {
if (share->workbase_id < wb_id) {
HASH_DEL(sdata->shares, share);
@ -554,7 +556,7 @@ static void purge_share_hashtable(sdata_t *sdata, int64_t wb_id)
purged++;
}
}
ck_wunlock(&sdata->share_lock);
mutex_unlock(&sdata->share_lock);
if (purged)
LOGINFO("Cleared %d shares from share hashtable", purged);
@ -566,7 +568,7 @@ static void age_share_hashtable(sdata_t *sdata, int64_t wb_id)
share_t *share, *tmp;
int aged = 0;
ck_wlock(&sdata->share_lock);
mutex_lock(&sdata->share_lock);
HASH_ITER(hh, sdata->shares, share, tmp) {
if (share->workbase_id == wb_id) {
HASH_DEL(sdata->shares, share);
@ -574,7 +576,7 @@ static void age_share_hashtable(sdata_t *sdata, int64_t wb_id)
aged++;
}
}
ck_wunlock(&sdata->share_lock);
mutex_unlock(&sdata->share_lock);
if (aged)
LOGINFO("Aged %d shares from share hashtable", aged);
@ -920,7 +922,7 @@ static void __add_dead(sdata_t *sdata, stratum_instance_t *client)
static void __del_dead(sdata_t *sdata, stratum_instance_t *client)
{
DL_DELETE_INIT(sdata->dead_instances, client);
DL_DELETE(sdata->dead_instances, client);
sdata->stats.dead--;
}
@ -1161,6 +1163,25 @@ static stratum_instance_t *ref_instance_by_id(sdata_t *sdata, int64_t id)
return instance;
}
/* Has this client_id already been used and is now in one of the dropped lists */
static bool __dropped_instance(sdata_t *sdata, int64_t id)
{
stratum_instance_t *client, *tmp;
bool ret = true;
HASH_ITER(hh, sdata->disconnected_instances, client, tmp) {
if (unlikely(client->id == id))
goto out;
}
DL_FOREACH(sdata->dead_instances, client) {
if (unlikely(client->id == id))
goto out;
}
ret = false;
out:
return ret;
}
/* Ret = 1 is disconnected, 2 is killed, 3 is workerless killed */
static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instance_t *instance)
{
@ -1169,7 +1190,7 @@ static int __drop_client(sdata_t *sdata, stratum_instance_t *client, user_instan
HASH_DEL(sdata->stratum_instances, client);
if (instance)
DL_DELETE_INIT(instance->instances, client);
DL_DELETE(instance->instances, client);
HASH_FIND(hh, sdata->disconnected_instances, &client->enonce1_64, sizeof(uint64_t), old_client);
/* Only keep around one copy of the old client in server mode */
if (!client->ckp->proxy && !old_client && client->enonce1_64 && client->authorised) {
@ -1275,7 +1296,7 @@ static uint64_t disconnected_sessionid_exists(sdata_t *sdata, const char *sessio
}
instance = NULL;
HASH_FIND(hh, sdata->disconnected_instances, &enonce1_64, sizeof(uint64_t), instance);
if (instance) {
if (instance && !instance->ref) {
/* Delete the entry once we are going to use it since there
* will be a new instance with the enonce1_64 */
old_id = instance->id;
@ -1484,8 +1505,6 @@ static void reset_bestshares(sdata_t *sdata)
ck_runlock(&sdata->instance_lock);
}
/* Ram from blocks is NOT freed at all for now, only their entry is removed
* from the linked list, leaving a very small leak here and reject. */
static void block_solve(ckpool_t *ckp, const char *blockhash)
{
ckmsg_t *block, *tmp, *found = NULL;
@ -1514,7 +1533,7 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
if (!strcmp(solvehash, blockhash)) {
dealloc(solvehash);
found = block;
DL_DELETE_INIT(sdata->block_solves, block);
DL_DELETE(sdata->block_solves, block);
break;
}
dealloc(solvehash);
@ -1561,7 +1580,7 @@ static void block_reject(sdata_t *sdata, const char *blockhash)
if (!strcmp(solvehash, blockhash)) {
dealloc(solvehash);
found = block;
DL_DELETE_INIT(sdata->block_solves, block);
DL_DELETE(sdata->block_solves, block);
break;
}
dealloc(solvehash);
@ -1610,7 +1629,7 @@ static void ckmsgq_stats(ckmsgq_t *ckmsgq, int size, json_t **val)
JSON_CPACK(*val, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
}
static char *stratifier_stats(sdata_t *sdata)
static char *stratifier_stats(ckpool_t *ckp, sdata_t *sdata)
{
json_t *val = json_object(), *subval;
int objects, generated;
@ -1652,11 +1671,11 @@ static char *stratifier_stats(sdata_t *sdata)
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "dead", subval);
ck_rlock(&sdata->share_lock);
mutex_lock(&sdata->share_lock);
generated = sdata->shares_generated;
objects = HASH_COUNT(sdata->shares);
memsize = SAFE_HASH_OVERHEAD(sdata->shares) + sizeof(share_t) * objects;
ck_runlock(&sdata->share_lock);
mutex_unlock(&sdata->share_lock);
JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated);
json_set_object(val, "shares", subval);
@ -1666,8 +1685,10 @@ static char *stratifier_stats(sdata_t *sdata)
/* Don't know exactly how big the string is so just count the pointer for now */
ckmsgq_stats(sdata->srecvs, sizeof(char *), &subval);
json_set_object(val, "srecvs", subval);
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
if (!CKP_STANDALONE(ckp)) {
ckmsgq_stats(sdata->ckdbq, sizeof(char *), &subval);
json_set_object(val, "ckdbq", subval);
}
ckmsgq_stats(sdata->stxnq, sizeof(json_params_t), &subval);
json_set_object(val, "stxnq", subval);
@ -1746,7 +1767,7 @@ retry:
char *msg;
LOGDEBUG("Stratifier received stats request");
msg = stratifier_stats(sdata);
msg = stratifier_stats(ckp, sdata);
send_unix_msg(sockd, msg);
Close(sockd);
goto retry;
@ -2132,8 +2153,8 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
char *base_username = strdupa(workername), *username;
bool new_instance = false, new_worker = false;
sdata_t *sdata = ckp->data;
user_instance_t *instance;
stratum_instance_t *tmp;
worker_instance_t *tmp;
user_instance_t *user;
int len;
username = strsep(&base_username, "._");
@ -2144,18 +2165,19 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
username[127] = '\0';
ck_wlock(&sdata->instance_lock);
HASH_FIND_STR(sdata->user_instances, username, instance);
if (!instance) {
HASH_FIND_STR(sdata->user_instances, username, user);
if (!user) {
/* New user instance. Secondary user id will be NULL */
instance = ckzalloc(sizeof(user_instance_t));
strcpy(instance->username, username);
user = ckzalloc(sizeof(user_instance_t));
user->auth_backoff = 3; /* Set initial backoff to 3 seconds */
strcpy(user->username, username);
new_instance = true;
instance->id = sdata->user_instance_id++;
HASH_ADD_STR(sdata->user_instances, username, instance);
user->id = sdata->user_instance_id++;
HASH_ADD_STR(sdata->user_instances, username, user);
}
DL_FOREACH(instance->instances, tmp) {
DL_FOREACH(user->worker_instances, tmp) {
if (!safecmp(workername, tmp->workername)) {
client->worker_instance = tmp->worker_instance;
client->worker_instance = tmp;
break;
}
}
@ -2165,29 +2187,29 @@ static user_instance_t *generate_user(ckpool_t *ckp, stratum_instance_t *client,
worker_instance_t *worker = ckzalloc(sizeof(worker_instance_t));
worker->workername = strdup(workername);
worker->instance = instance;
DL_APPEND(instance->worker_instances, worker);
worker->instance = user;
DL_APPEND(user->worker_instances, worker);
new_worker = true;
worker->start_time = time(NULL);
client->worker_instance = worker;
}
DL_APPEND(instance->instances, client);
DL_APPEND(user->instances, client);
ck_wunlock(&sdata->instance_lock);
if (CKP_STANDALONE(ckp) && new_instance)
read_userstats(ckp, instance);
read_userstats(ckp, user);
if (CKP_STANDALONE(ckp) && new_worker)
read_workerstats(ckp, client->worker_instance);
if (new_instance && !ckp->proxy) {
/* Is this a btc address based username? */
if (len > 26 && len < 35)
instance->btcaddress = test_address(ckp, username);
LOGNOTICE("Added new user %s%s", username, instance->btcaddress ?
user->btcaddress = test_address(ckp, username);
LOGNOTICE("Added new user %s%s", username, user->btcaddress ?
" as address based registration" : "");
}
return instance;
return user;
}
/* Send this to the database and parse the response to authorise a user
@ -2368,8 +2390,17 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
ts_realtime(&now);
client->start_time = now.tv_sec;
strcpy(client->address, address);
client->workername = strdup(buf);
if (user_instance->failed_authtime) {
time_t now_t = time(NULL);
if (now_t < user_instance->failed_authtime + user_instance->auth_backoff) {
LOGNOTICE("Client %ld worker %s rate limited due to failed auth attempts",
client->id, buf);
client->dropped = true;
goto out;
}
}
if (CKP_STANDALONE(ckp))
ret = true;
else {
@ -2396,9 +2427,15 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
inc_worker(ckp, user_instance);
LOGNOTICE("Authorised client %ld worker %s as user %s", client->id, buf,
user_instance->username);
user_instance->auth_backoff = 3; /* Reset auth backoff time */
} else {
LOGNOTICE("Client %ld worker %s failed to authorise as user %s", client->id, buf,
user_instance->username);
user_instance->failed_authtime = time(NULL);
user_instance->auth_backoff <<= 1;
/* Cap backoff time to 10 mins */
if (user_instance->auth_backoff > 600)
user_instance->auth_backoff = 600;
}
out:
return json_boolean(ret);
@ -2730,12 +2767,12 @@ static bool new_share(sdata_t *sdata, const uchar *hash, int64_t wb_id)
memcpy(share->hash, hash, 32);
share->workbase_id = wb_id;
ck_wlock(&sdata->share_lock);
mutex_lock(&sdata->share_lock);
sdata->shares_generated++;
HASH_FIND(hh, sdata->shares, hash, 32, match);
if (likely(!match))
HASH_ADD(hh, sdata->shares, hash, 32, share);
ck_wunlock(&sdata->share_lock);
mutex_unlock(&sdata->share_lock);
if (unlikely(match)) {
dealloc(share);
@ -3288,6 +3325,12 @@ static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64
return;
}
static void free_smsg(smsg_t *msg)
{
json_decref(msg->json_msg);
free(msg);
}
/* Entered with client holding ref count */
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client)
{
@ -3322,15 +3365,14 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *
}
parse_method(sdata, client, client_id, id_val, method, params, msg->address);
out:
json_decref(val);
free(msg);
free_smsg(msg);
}
static void srecv_process(ckpool_t *ckp, char *buf)
{
bool noid = false, dropped = false;
sdata_t *sdata = ckp->data;
stratum_instance_t *client;
bool added = false;
smsg_t *msg;
json_t *val;
int server;
@ -3378,13 +3420,25 @@ static void srecv_process(ckpool_t *ckp, char *buf)
client = __instance_by_id(sdata, msg->client_id);
/* If client_id instance doesn't exist yet, create one */
if (unlikely(!client)) {
client = __stratum_add_instance(ckp, msg->client_id, server);
added = true;
}
__inc_instance_ref(client);
if (likely(!__dropped_instance(sdata, msg->client_id))) {
noid = true;
client = __stratum_add_instance(ckp, msg->client_id, server);
} else
dropped = true;
} else if (unlikely(client->dropped))
dropped = true;
if (likely(!dropped))
__inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock);
if (added)
if (unlikely(dropped)) {
/* Client may be NULL here */
LOGNOTICE("Stratifier skipped dropped instance %ld message server %d",
msg->client_id, server);
free_smsg(msg);
goto out;
}
if (unlikely(noid))
LOGINFO("Stratifier added instance %ld server %d", client->id, server);
parse_instance_msg(sdata, msg, client);
@ -4175,10 +4229,11 @@ int stratifier(proc_instance_t *pi)
threads = threads / 2 ? : 1;
sdata->srecvs = create_ckmsgqs(ckp, "sreceiver", &srecv_process, threads);
sdata->sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
sdata->stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
if (!CKP_STANDALONE(ckp))
if (!CKP_STANDALONE(ckp)) {
sdata->ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
create_pthread(&pth_heartbeat, ckdb_heartbeat, ckp);
}
cklock_init(&sdata->workbase_lock);
if (!ckp->proxy)
@ -4187,7 +4242,7 @@ int stratifier(proc_instance_t *pi)
mutex_init(&sdata->stats_lock);
create_pthread(&pth_statsupdate, statsupdate, ckp);
cklock_init(&sdata->share_lock);
mutex_init(&sdata->share_lock);
mutex_init(&sdata->block_lock);
LOGWARNING("%s stratifier ready", ckp->name);

22
src/utlist.h

@ -572,28 +572,6 @@ do {
} \
} while (0)
#define DL_DELETE_INIT(head,del) \
DL_DELETE3(head,del,prev,next)
#define DL_DELETE3(head,del,prev,next) \
do { \
assert((del)->prev != NULL); \
if ((del)->prev == (del)) { \
(head)=NULL; \
} else if ((del)==(head)) { \
(del)->next->prev = (del)->prev; \
(head) = (del)->next; \
} else { \
(del)->prev->next = (del)->next; \
if ((del)->next) { \
(del)->next->prev = (del)->prev; \
} else { \
(head)->prev = (del)->prev; \
} \
} \
(del)->prev = (del)->next = NULL; \
} while (0)
#define DL_COUNT(head,el,counter) \
DL_COUNT2(head,el,counter,next) \

Loading…
Cancel
Save