Browse Source

Include client id in the key for remote workbases to avoid collisions from multiple remote servers.

master
Con Kolivas 8 years ago
parent
commit
5ca1781355
  1. 55
      src/stratifier.c

55
src/stratifier.c

@ -83,7 +83,13 @@ typedef struct pool_stats pool_stats_t;
struct workbase { struct workbase {
/* Hash table data */ /* Hash table data */
UT_hash_handle hh; UT_hash_handle hh;
/* The next two fields need to be consecutive as both of them are
* used as the key for their hashtable entry in remote_workbases */
int64_t id; int64_t id;
/* The client id this workinfo came from if remote */
int64_t client_id;
char idstring[20]; char idstring[20];
/* How many readers we currently have of this workbase, set /* How many readers we currently have of this workbase, set
@ -92,8 +98,6 @@ struct workbase {
/* The id a remote workinfo is mapped to locally */ /* The id a remote workinfo is mapped to locally */
int64_t mapped_id; int64_t mapped_id;
/* The client id this remote workinfo came from */
int64_t client_id;
ts_t gentime; ts_t gentime;
tv_t retired; tv_t retired;
@ -1748,6 +1752,14 @@ out:
return ret; return ret;
} }
/* Remote workbases are keyed by the combined values of wb->id and
* wb->client_id to prevent collisions in the unlikely event two remote
* servers are generating the same workbase ids. */
static void __add_to_remote_workbases(sdata_t *sdata, workbase_t *wb)
{
HASH_ADD(hh, sdata->remote_workbases, id, sizeof(int64_t) * 2, wb);
}
static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata) static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
{ {
workbase_t *wb, *tmp; workbase_t *wb, *tmp;
@ -1769,7 +1781,7 @@ static void check_incomplete_wbs(ckpool_t *ckp, sdata_t *sdata)
/* Readd it to the hashlist */ /* Readd it to the hashlist */
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_ADD_I64(sdata->remote_workbases, id, wb); __add_to_remote_workbases(sdata, wb);
} }
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
@ -1810,7 +1822,7 @@ static void add_remote_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb)
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
} }
} }
HASH_ADD_I64(sdata->remote_workbases, id, wb); __add_to_remote_workbases(sdata, wb);
ck_wunlock(&sdata->workbase_lock); ck_wunlock(&sdata->workbase_lock);
val = generate_workinfo(ckp, wb, __func__); val = generate_workinfo(ckp, wb, __func__);
@ -2117,12 +2129,21 @@ static workbase_t *get_workbase(sdata_t *sdata, const int64_t id)
return wb; return wb;
} }
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id) static workbase_t *__find_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id)
{
int64_t lookup[2] = {id, client_id};
workbase_t *wb;
HASH_FIND(hh, sdata->remote_workbases, lookup, sizeof(int64_t) * 2, wb);
return wb;
}
static workbase_t *get_remote_workbase(sdata_t *sdata, const int64_t id, const int64_t client_id)
{ {
workbase_t *wb; workbase_t *wb;
ck_wlock(&sdata->workbase_lock); ck_wlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb); wb = __find_remote_workbase(sdata, id, client_id);
if (wb) { if (wb) {
if (wb->incomplete) if (wb->incomplete)
wb = NULL; wb = NULL;
@ -3681,7 +3702,7 @@ static json_t *user_stats(const user_instance_t *user)
} }
/* Adjust workinfo id to virtual value for remote trusted workinfos */ /* Adjust workinfo id to virtual value for remote trusted workinfos */
static void remap_workinfo_id(sdata_t *sdata, json_t *val) static void remap_workinfo_id(sdata_t *sdata, json_t *val, const int64_t client_id)
{ {
int64_t mapped_id, id; int64_t mapped_id, id;
workbase_t *wb; workbase_t *wb;
@ -3689,7 +3710,7 @@ static void remap_workinfo_id(sdata_t *sdata, json_t *val)
json_get_int64(&id, val, "workinfoid"); json_get_int64(&id, val, "workinfoid");
ck_rlock(&sdata->workbase_lock); ck_rlock(&sdata->workbase_lock);
HASH_FIND_I64(sdata->remote_workbases, &id, wb); wb = __find_remote_workbase(sdata, id, client_id);
if (likely(wb)) if (likely(wb))
mapped_id = wb->mapped_id; mapped_id = wb->mapped_id;
else else
@ -6736,7 +6757,8 @@ static user_instance_t *generate_remote_user(ckpool_t *ckp, const char *workerna
return user; return user;
} }
static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
const int64_t client_id)
{ {
json_t *workername_val = json_object_get(val, "workername"); json_t *workername_val = json_object_get(val, "workername");
worker_instance_t *worker; worker_instance_t *worker;
@ -6784,12 +6806,13 @@ static void parse_remote_share(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
val = json_deep_copy(val); val = json_deep_copy(val);
if (likely(user->secondaryuserid)) if (likely(user->secondaryuserid))
json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "secondaryuserid", user->secondaryuserid);
remap_workinfo_id(sdata, val); remap_workinfo_id(sdata, val, client_id);
ckdbq_add(ckp, ID_SHARES, val); ckdbq_add(ckp, ID_SHARES, val);
} }
static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
const int64_t client_id)
{ {
user_instance_t *user = NULL; user_instance_t *user = NULL;
const char *workername; const char *workername;
@ -6807,7 +6830,7 @@ static void parse_remote_shareerr(ckpool_t *ckp, sdata_t *sdata, json_t *val, co
val = json_deep_copy(val); val = json_deep_copy(val);
if (likely(user->secondaryuserid)) if (likely(user->secondaryuserid))
json_set_string(val, "secondaryuserid", user->secondaryuserid); json_set_string(val, "secondaryuserid", user->secondaryuserid);
remap_workinfo_id(sdata, val); remap_workinfo_id(sdata, val, client_id);
ckdbq_add(ckp, ID_SHAREERR, val); ckdbq_add(ckp, ID_SHAREERR, val);
} }
@ -6981,7 +7004,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
json_get_double(&diff, val, "diff"); json_get_double(&diff, val, "diff");
if (likely(id && coinbasehex && swaphex && cblen)) if (likely(id && coinbasehex && swaphex && cblen))
wb = get_remote_workbase(sdata, id); wb = get_remote_workbase(sdata, id, client_id);
if (unlikely(!wb)) if (unlikely(!wb))
LOGWARNING("Inadequate data locally to attempt submit of remote block"); LOGWARNING("Inadequate data locally to attempt submit of remote block");
@ -7016,7 +7039,7 @@ static void parse_remote_block(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
out_add: out_add:
/* Make a duplicate for use by ckdbq_add */ /* Make a duplicate for use by ckdbq_add */
val = json_deep_copy(val); val = json_deep_copy(val);
remap_workinfo_id(sdata, val); remap_workinfo_id(sdata, val, client_id);
if (!ckp->remote) if (!ckp->remote)
downstream_json(sdata, val, client_id, SSEND_PREPEND); downstream_json(sdata, val, client_id, SSEND_PREPEND);
@ -7170,7 +7193,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
} }
if (likely(!safecmp(method, stratum_msgs[SM_SHARE]))) if (likely(!safecmp(method, stratum_msgs[SM_SHARE])))
parse_remote_share(ckp, sdata, val, buf); parse_remote_share(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS])) else if (!safecmp(method, stratum_msgs[SM_TRANSACTIONS]))
add_node_txns(ckp, sdata, val); add_node_txns(ckp, sdata, val);
else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS])) else if (!safecmp(method, stratum_msgs[SM_WORKERSTATS]))
@ -7180,7 +7203,7 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, stratu
else if (!safecmp(method, stratum_msgs[SM_AUTH])) else if (!safecmp(method, stratum_msgs[SM_AUTH]))
parse_remote_auth(ckp, sdata, val, client, client->id); parse_remote_auth(ckp, sdata, val, client, client->id);
else if (!safecmp(method, stratum_msgs[SM_SHAREERR])) else if (!safecmp(method, stratum_msgs[SM_SHAREERR]))
parse_remote_shareerr(ckp, sdata, val, buf); parse_remote_shareerr(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_BLOCK])) else if (!safecmp(method, stratum_msgs[SM_BLOCK]))
parse_remote_block(ckp, sdata, val, buf, client->id); parse_remote_block(ckp, sdata, val, buf, client->id);
else if (!safecmp(method, stratum_msgs[SM_REQTXNS])) else if (!safecmp(method, stratum_msgs[SM_REQTXNS]))

Loading…
Cancel
Save