kanoi 9 years ago
parent
commit
999d2174c1
  1. 2
      src/ckpool.h
  2. 107
      src/connector.c
  3. 121
      src/stratifier.c

2
src/ckpool.h

@ -277,6 +277,7 @@ enum stratum_msgtype {
SM_WORKINFO, SM_WORKINFO,
SM_SUGGESTDIFF, SM_SUGGESTDIFF,
SM_BLOCK, SM_BLOCK,
SM_PONG,
SM_NONE SM_NONE
}; };
@ -298,6 +299,7 @@ static const char __maybe_unused *stratum_msgs[] = {
"workinfo", "workinfo",
"suggestdiff", "suggestdiff",
"block", "block",
"pong",
"" ""
}; };

107
src/connector.c

@ -55,7 +55,7 @@ struct client_instance {
/* Which serverurl is this instance connected to */ /* Which serverurl is this instance connected to */
int server; int server;
char buf[PAGESIZE]; char *buf;
unsigned long bufofs; unsigned long bufofs;
/* Are we currently sending a blocked message from this client */ /* Are we currently sending a blocked message from this client */
@ -205,6 +205,10 @@ static client_instance_t *recruit_client(cdata_t *cdata)
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
} else } else
LOGDEBUG("Connector recycled client instance"); LOGDEBUG("Connector recycled client instance");
client->buf = realloc(client->buf, PAGESIZE);
client->buf[0] = '\0';
return client; return client;
} }
@ -457,20 +461,22 @@ static void parse_redirector_share(client_instance_t *client, const json_t *val)
static void parse_client_msg(cdata_t *cdata, client_instance_t *client) static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{ {
ckpool_t *ckp = cdata->ckp; ckpool_t *ckp = cdata->ckp;
char msg[PAGESIZE], *eol;
int buflen, ret; int buflen, ret;
char *msg, *eol;
json_t *val; json_t *val;
retry: retry:
if (unlikely(client->bufofs > MAX_MSGSIZE)) { if (unlikely(client->bufofs > MAX_MSGSIZE)) {
if (!client->remote) {
LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting", LOGNOTICE("Client id %"PRId64" fd %d overloaded buffer without EOL, disconnecting",
client->id, client->fd); client->id, client->fd);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
buflen = PAGESIZE - client->bufofs; client->buf = realloc(client->buf, round_up_page(client->bufofs + MAX_MSGSIZE + 1));
}
/* This read call is non-blocking since the socket is set to O_NOBLOCK */ /* This read call is non-blocking since the socket is set to O_NOBLOCK */
ret = read(client->fd, client->buf + client->bufofs, buflen); ret = read(client->fd, client->buf + client->bufofs, MAX_MSGSIZE);
if (ret < 1) { if (ret < 1) {
if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret)) if (likely(errno == EAGAIN || errno == EWOULDBLOCK || !ret))
return; return;
@ -487,12 +493,13 @@ reparse:
/* Do something useful with this message now */ /* Do something useful with this message now */
buflen = eol - client->buf + 1; buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) { if (unlikely(buflen > MAX_MSGSIZE && !client->remote)) {
LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd); LOGNOTICE("Client id %"PRId64" fd %d message oversize, disconnecting", client->id, client->fd);
invalidate_client(ckp, cdata, client); invalidate_client(ckp, cdata, client);
return; return;
} }
msg = alloca(round_up_page(buflen + 1));
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
msg[buflen] = '\0'; msg[buflen] = '\0';
client->bufofs -= buflen; client->bufofs -= buflen;
@ -1009,6 +1016,7 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
bool res, ret = false; bool res, ret = false;
float timeout = 10; float timeout = 10;
cksem_wait(&cs->sem);
cs->fd = connect_socket(cs->url, cs->port); cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) { if (cs->fd < 0) {
LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port); LOGWARNING("Failed to connect to upstream server %s:%s", cs->url, cs->port);
@ -1047,6 +1055,8 @@ static bool connect_upstream(ckpool_t *ckp, connsock_t *cs)
LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port); LOGWARNING("Connected to upstream server %s:%s as trusted remote", cs->url, cs->port);
ret = true; ret = true;
out: out:
cksem_post(&cs->sem);
return ret; return ret;
} }
@ -1078,10 +1088,92 @@ out:
free(buf); free(buf);
} }
static void parse_remote_submitblock(ckpool_t *ckp, const json_t *val, const char *buf)
{
const char *gbt_block = json_string_value(json_object_get(val, "submitblock"));
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to find submitblock data from upstream submitblock method %s",
buf);
return;
}
LOGWARNING("Submitting possible upstream block!");
send_proc(ckp->generator, gbt_block);
}
static void ping_upstream(cdata_t *cdata)
{
char *buf;
ASPRINTF(&buf, "{\"method\":\"ping\"}\n");
ckmsgq_add(cdata->upstream_sends, buf);
}
static void *urecv_process(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
cdata_t *cdata = ckp->data;
connsock_t *cs = &cdata->upstream_cs;
bool alive = true;
ckp->proxy = true;
rename_proc("ureceiver");
pthread_detach(pthread_self());
while (42) {
const char *method;
float timeout = 5;
json_t *val;
int ret;
cksem_wait(&cs->sem);
ret = read_socket_line(cs, &timeout);
if (ret < 1) {
ping_upstream(cdata);
if (likely(!ret)) {
LOGDEBUG("No message from upstream pool");
} else {
LOGNOTICE("Failed to read from upstream pool");
alive = false;
}
goto nomsg;
}
alive = true;
val = json_loads(cs->buf, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Received non-json msg from upstream pool %s",
cs->buf);
goto nomsg;
}
method = json_string_value(json_object_get(val, "method"));
if (unlikely(!method)) {
LOGWARNING("Failed to find method from upstream pool json %s",
cs->buf);
json_decref(val);
goto nomsg;
}
if (!safecmp(method, "submitblock"))
parse_remote_submitblock(ckp, val, cs->buf);
else if (!safecmp(method, "pong"))
LOGDEBUG("Received upstream pong");
else
LOGWARNING("Unrecognised upstream method %s", method);
nomsg:
cksem_post(&cs->sem);
if (!alive)
sleep(5);
}
return NULL;
}
static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata) static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata)
{ {
connsock_t *cs = &cdata->upstream_cs; connsock_t *cs = &cdata->upstream_cs;
bool ret = false; bool ret = false;
pthread_t pth;
cs->ckp = ckp; cs->ckp = ckp;
if (!ckp->upstream) { if (!ckp->upstream) {
@ -1093,11 +1185,14 @@ static bool setup_upstream(ckpool_t *ckp, cdata_t *cdata)
goto out; goto out;
} }
cksem_init(&cs->sem);
cksem_post(&cs->sem);
/* Must succeed on initial connect to upstream pool */ /* Must succeed on initial connect to upstream pool */
if (!connect_upstream(ckp, cs)) { if (!connect_upstream(ckp, cs)) {
LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port); LOGEMERG("Failed initial connect to upstream server %s:%s", cs->url, cs->port);
goto out; goto out;
} }
create_pthread(&pth, urecv_process, ckp);
cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process); cdata->upstream_sends = create_ckmsgq(ckp, "usender", &usend_process);
ret = true; ret = true;
out: out:
@ -1139,7 +1234,7 @@ static void drop_passthrough_client(cdata_t *cdata, const int64_t id)
client_id = id & 0xffffffffll; client_id = id & 0xffffffffll;
/* We have a direct connection to the passthrough's connector so we /* We have a direct connection to the passthrough's connector so we
* can send it any regular commands. */ * can send it any regular commands. */
ASPRINTF(&msg, "dropclient=%"PRId64, client_id); ASPRINTF(&msg, "dropclient=%"PRId64"\n", client_id);
send_client(cdata, id, msg); send_client(cdata, id, msg);
} }

121
src/stratifier.c

@ -472,6 +472,7 @@ struct stratifier_data {
stratum_instance_t *stratum_instances; stratum_instance_t *stratum_instances;
stratum_instance_t *recycled_instances; stratum_instance_t *recycled_instances;
stratum_instance_t *node_instances; stratum_instance_t *node_instances;
stratum_instance_t *remote_instances;
int stratum_generated; int stratum_generated;
int disconnected_generated; int disconnected_generated;
@ -1378,6 +1379,56 @@ static void send_node_block(sdata_t *sdata, const char *enonce1, const char *non
} }
} }
static void upstream_blocksubmit(ckpool_t *ckp, const char *gbt_block)
{
char *buf;
ASPRINTF(&buf, "upstream={\"method\":\"submitblock\",\"submitblock\":\"%s\"}\n",
gbt_block);
send_proc(ckp->connector, buf);
free(buf);
}
static void downstream_blocksubmits(ckpool_t *ckp, const char *gbt_block, const stratum_instance_t *source)
{
stratum_instance_t *client;
sdata_t *sdata = ckp->data;
ckmsg_t *bulk_send = NULL;
int messages = 0;
ck_rlock(&sdata->instance_lock);
if (sdata->remote_instances) {
json_t *val = json_object();
JSON_CPACK(val, "{ss,ss}",
"method", "submitblock",
"submitblock", gbt_block);
DL_FOREACH(sdata->remote_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg;
if (client == source)
continue;
json_msg = json_copy(val);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
messages++;
}
json_decref(val);
}
ck_runlock(&sdata->instance_lock);
if (bulk_send) {
LOGNOTICE("Sending submitblock to downstream servers");
ssend_bulk_prepend(sdata, bulk_send, messages);
}
}
static void static void
process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen, process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const int cblen,
const uchar *data, const uchar *hash, uchar *swap32, char *blockhash) const uchar *data, const uchar *hash, uchar *swap32, char *blockhash)
@ -1414,9 +1465,11 @@ process_block(ckpool_t *ckp, const workbase_t *wb, const char *coinbase, const i
if (wb->transactions) if (wb->transactions)
realloc_strcat(&gbt_block, wb->txn_data); realloc_strcat(&gbt_block, wb->txn_data);
send_generator(ckp, gbt_block, GEN_PRIORITY); send_generator(ckp, gbt_block, GEN_PRIORITY);
if (ckp->remote)
upstream_blocksubmit(ckp, gbt_block);
else
downstream_blocksubmits(ckp, gbt_block, NULL);
free(gbt_block); free(gbt_block);
} }
static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val) static void submit_node_block(ckpool_t *ckp, sdata_t *sdata, json_t *val)
@ -2444,6 +2497,8 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
if (unlikely(client->node)) if (unlikely(client->node))
DL_DELETE(sdata->node_instances, client); DL_DELETE(sdata->node_instances, client);
if (unlikely(client->remote))
DL_DELETE(sdata->remote_instances, client);
if (client->workername) { if (client->workername) {
if (user) { if (user) {
ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s",
@ -2602,11 +2657,6 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val, const int msg_type)
return; return;
} }
if (ckp->node) {
json_decref(val);
return;
}
/* Use this locking as an opportunity to test other clients. */ /* Use this locking as an opportunity to test other clients. */
ck_rlock(&ckp_sdata->instance_lock); ck_rlock(&ckp_sdata->instance_lock);
HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) { HASH_ITER(hh, ckp_sdata->stratum_instances, client, tmp) {
@ -2892,8 +2942,9 @@ static void block_solve(ckpool_t *ckp, const char *blockhash)
} }
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
if (unlikely(!found)) { if (!found) {
LOGERR("Failed to find blockhash %s in block_solve!", blockhash); LOGINFO("Failed to find blockhash %s in block_solve, possibly from downstream",
blockhash);
return; return;
} }
@ -2971,8 +3022,9 @@ static void block_reject(sdata_t *sdata, const char *blockhash)
} }
mutex_unlock(&sdata->block_lock); mutex_unlock(&sdata->block_lock);
if (unlikely(!found)) { if (!found) {
LOGERR("Failed to find blockhash %s in block_reject!", blockhash); LOGINFO("Failed to find blockhash %s in block_reject, possibly from downstream",
blockhash);
return; return;
} }
val = found->data; val = found->data;
@ -5539,9 +5591,8 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
{ {
pthread_t pth; pthread_t pth;
client->node = true;
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
client->node = true;
DL_APPEND(sdata->node_instances, client); DL_APPEND(sdata->node_instances, client);
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
@ -5552,6 +5603,14 @@ static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *c
create_pthread(&pth, set_node_latency, client); create_pthread(&pth, set_node_latency, client);
} }
static void add_remote_server(sdata_t *sdata, stratum_instance_t *client)
{
ck_wlock(&sdata->instance_lock);
client->remote = true;
DL_APPEND(sdata->remote_instances, client);
ck_wunlock(&sdata->instance_lock);
}
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *id_val, json_t *method_val,
@ -5610,10 +5669,10 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
client_id, client->address, client->server); client_id, client->address, client->server);
connector_drop_client(ckp, client_id); connector_drop_client(ckp, client_id);
} else { } else {
add_remote_server(sdata, client);
snprintf(buf, 255, "remote=%"PRId64, client_id); snprintf(buf, 255, "remote=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
} }
client->remote = true;
return; return;
} }
@ -5893,6 +5952,23 @@ static void parse_remote_workers(sdata_t *sdata, json_t *val, const char *buf)
LOGDEBUG("Adding %d remote workers to user %s", workers, username); LOGDEBUG("Adding %d remote workers to user %s", workers, username);
} }
static void parse_remote_blocksubmit(ckpool_t *ckp, json_t *val, const char *buf,
const stratum_instance_t *client)
{
json_t *submitblock_val;
const char *gbt_block;
submitblock_val = json_object_get(val, "submitblock");
gbt_block = json_string_value(submitblock_val);
if (unlikely(!gbt_block)) {
LOGWARNING("Failed to get submitblock data from remote message %s", buf);
return;
}
LOGWARNING("Submitting possible downstream block!");
send_generator(ckp, gbt_block, GEN_PRIORITY);
downstream_blocksubmits(ckp, gbt_block, client);
}
static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf) static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)
{ {
json_t *workername_val = json_object_get(val, "workername"), json_t *workername_val = json_object_get(val, "workername"),
@ -5923,7 +5999,16 @@ static void parse_remote_block(sdata_t *sdata, json_t *val, const char *buf)
reset_bestshares(sdata); reset_bestshares(sdata);
} }
static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf) static void send_remote_pong(sdata_t *sdata, stratum_instance_t *client)
{
json_t *json_msg;
JSON_CPACK(json_msg, "{ss}", "method", "pong");
stratum_add_send(sdata, json_msg, client->id, SM_PONG);
}
static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const char *buf,
stratum_instance_t *client)
{ {
json_t *method_val = json_object_get(val, "method"); json_t *method_val = json_object_get(val, "method");
const char *method; const char *method;
@ -5938,8 +6023,12 @@ static void parse_trusted_msg(ckpool_t *ckp, sdata_t *sdata, json_t *val, const
parse_remote_shares(ckp, sdata, val, buf); parse_remote_shares(ckp, sdata, val, buf);
else if (!safecmp(method, "workers")) else if (!safecmp(method, "workers"))
parse_remote_workers(sdata, val, buf); parse_remote_workers(sdata, val, buf);
else if (!safecmp(method, "submitblock"))
parse_remote_blocksubmit(ckp, val, buf, client);
else if (!safecmp(method, "block")) else if (!safecmp(method, "block"))
parse_remote_block(sdata, val, buf); parse_remote_block(sdata, val, buf);
else if (!safecmp(method, "ping"))
send_remote_pong(sdata, client);
else else
LOGWARNING("unrecognised trusted message %s", buf); LOGWARNING("unrecognised trusted message %s", buf);
} }
@ -6135,7 +6224,7 @@ static void srecv_process(ckpool_t *ckp, char *buf)
LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server); LOGINFO("Stratifier added instance %"PRId64" server %d", client->id, server);
if (client->remote) if (client->remote)
parse_trusted_msg(ckp, sdata, msg->json_msg, buf); parse_trusted_msg(ckp, sdata, msg->json_msg, buf, client);
else if (ckp->node) else if (ckp->node)
node_client_msg(ckp, msg->json_msg, buf, client); node_client_msg(ckp, msg->json_msg, buf, client);
else else

Loading…
Cancel
Save