Browse Source

Implement an unlocked priority mechanism for the stratifier talking to the generator to not poll it when the responses will be in flux

master
Con Kolivas 10 years ago
parent
commit
47084ccead
  1. 71
      src/stratifier.c

71
src/stratifier.c

@ -266,6 +266,13 @@ static share_t *shares;
static cklock_t share_lock; static cklock_t share_lock;
static int gen_priority;
/* Priority levels for generator messages */
#define GEN_LAX 0
#define GEN_NORMAL 1
#define GEN_PRIORITY 2
#define ID_AUTH 0 #define ID_AUTH 0
#define ID_WORKINFO 1 #define ID_WORKINFO 1
#define ID_AGEWORKINFO 2 #define ID_AGEWORKINFO 2
@ -591,17 +598,62 @@ static void add_base(ckpool_t *ckp, workbase_t *wb, bool *new_block)
} }
} }
/* Mandatory send_recv to the generator which sets the message priority if this
* message is higher priority. Races galore on gen_priority mean this might
* read the wrong priority but occasional wrong values are harmless. */
static char *__send_recv_generator(ckpool_t *ckp, const char *msg, int prio)
{
char *buf = NULL;
bool set;
if (prio > gen_priority) {
gen_priority = prio;
set = true;
} else
set = false;
buf = send_recv_proc(ckp->generator, msg);
if (set)
gen_priority = 0;
return buf;
}
/* Conditionally send_recv a message only if it's equal or higher priority than
* any currently being serviced. */
static char *send_recv_generator(ckpool_t *ckp, const char *msg, int prio)
{
char *buf = NULL;
if (prio >= gen_priority)
buf = __send_recv_generator(ckp, msg, prio);
return buf;
}
static void send_generator(ckpool_t *ckp, const char *msg, int prio)
{
bool set;
if (prio > gen_priority) {
gen_priority = prio;
set = true;
} else
set = false;
send_proc(ckp->generator, msg);
if (set)
gen_priority = 0;
}
/* This function assumes it will only receive a valid json gbt base template /* This function assumes it will only receive a valid json gbt base template
* since checking should have been done earlier, and creates the base template * since checking should have been done earlier, and creates the base template
* for generating work templates. */ * for generating work templates. */
static void update_base(ckpool_t *ckp) static void update_base(ckpool_t *ckp, int prio)
{ {
bool new_block = false; bool new_block = false;
workbase_t *wb; workbase_t *wb;
json_t *val; json_t *val;
char *buf; char *buf;
buf = send_recv_proc(ckp->generator, "getbase"); buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGWARNING("Failed to get base from generator in update_base"); LOGWARNING("Failed to get base from generator in update_base");
return; return;
@ -1026,7 +1078,7 @@ static void block_solve(ckpool_t *ckp)
"createinet", ckp->serverurl); "createinet", ckp->serverurl);
ck_runlock(&workbase_lock); ck_runlock(&workbase_lock);
update_base(ckp); update_base(ckp, GEN_PRIORITY);
ck_rlock(&workbase_lock); ck_rlock(&workbase_lock);
json_set_string(val, "blockhash", current_workbase->prevhash); json_set_string(val, "blockhash", current_workbase->prevhash);
@ -1059,7 +1111,7 @@ retry:
copy_tv(&start_tv, &end_tv); copy_tv(&start_tv, &end_tv);
LOGDEBUG("%ds elapsed in strat_loop, updating gbt base", LOGDEBUG("%ds elapsed in strat_loop, updating gbt base",
ckp->update_interval); ckp->update_interval);
update_base(ckp); update_base(ckp, GEN_NORMAL);
continue; continue;
} }
} }
@ -1098,7 +1150,7 @@ retry:
ret = 0; ret = 0;
goto out; goto out;
} else if (cmdmatch(buf, "update")) { } else if (cmdmatch(buf, "update")) {
update_base(ckp); update_base(ckp, GEN_PRIORITY);
} else if (cmdmatch(buf, "subscribe")) { } else if (cmdmatch(buf, "subscribe")) {
/* Proxifier has a new subscription */ /* Proxifier has a new subscription */
if (!update_subscribe(ckp)) if (!update_subscribe(ckp))
@ -1154,7 +1206,7 @@ static void *blockupdate(void *arg)
memset(hash, 0, 68); memset(hash, 0, 68);
while (42) { while (42) {
dealloc(buf); dealloc(buf);
buf = send_recv_proc(ckp->generator, request); buf = send_recv_generator(ckp, request, GEN_LAX);
if (buf && strcmp(buf, hash) && !cmdmatch(buf, "failed")) { if (buf && strcmp(buf, hash) && !cmdmatch(buf, "failed")) {
strcpy(hash, buf); strcpy(hash, buf);
LOGNOTICE("Block hash changed to %s", hash); LOGNOTICE("Block hash changed to %s", hash);
@ -1265,7 +1317,8 @@ static bool test_address(ckpool_t *ckp, const char *address)
char *buf, *msg; char *buf, *msg;
ASPRINTF(&msg, "checkaddr:%s", address); ASPRINTF(&msg, "checkaddr:%s", address);
buf = send_recv_proc(ckp->generator, msg); /* Must wait for a response here */
buf = __send_recv_generator(ckp, msg, GEN_LAX);
dealloc(msg); dealloc(msg);
if (!buf) if (!buf)
return ret; return ret;
@ -1649,7 +1702,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c
if (wb->transactions) if (wb->transactions)
realloc_strcat(&gbt_block, wb->txn_data); realloc_strcat(&gbt_block, wb->txn_data);
ckp = wb->ckp; ckp = wb->ckp;
send_proc(ckp->generator, gbt_block); send_generator(ckp, gbt_block, GEN_PRIORITY);
free(gbt_block); free(gbt_block);
flip_32(swap, hash); flip_32(swap, hash);
@ -1770,7 +1823,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char *
"msg_id", msg_id); "msg_id", msg_id);
msg = json_dumps(json_msg, 0); msg = json_dumps(json_msg, 0);
json_decref(json_msg); json_decref(json_msg);
send_proc(ckp->generator, msg); send_generator(ckp, msg, GEN_LAX);
free(msg); free(msg);
} }

Loading…
Cancel
Save