Browse Source

Act as a passthrough in node mode, passing what type of stratum message we're passing through

master
Con Kolivas 9 years ago
parent
commit
b09bb253fc
  1. 2
      src/ckpool.c
  2. 32
      src/ckpool.h
  3. 16
      src/connector.c
  4. 55
      src/stratifier.c

2
src/ckpool.c

@ -1641,7 +1641,7 @@ int main(int argc, char **argv)
case 'N': case 'N':
if (ckp.proxy || ckp.redirector || ckp.userproxy || ckp.passthrough) if (ckp.proxy || ckp.redirector || ckp.userproxy || ckp.passthrough)
quit(1, "Cannot set another proxy type or redirector and node mode"); quit(1, "Cannot set another proxy type or redirector and node mode");
ckp.standalone = ckp.proxy = ckp.node = true; ckp.standalone = ckp.proxy = ckp.passthrough = ckp.node = true;
break; break;
case 'n': case 'n':
ckp.name = optarg; ckp.name = optarg;

32
src/ckpool.h

@ -243,6 +243,38 @@ struct ckpool_instance {
void *data; void *data;
}; };
enum stratum_msgtype {
SM_RECONNECT = 0,
SM_DIFF,
SM_MSG,
SM_UPDATE,
SM_ERROR,
SM_SUBSCRIBE,
SM_SUBSCRIBERESULT,
SM_SHARE,
SM_SHARERESULT,
SM_AUTH,
SM_AUTHRESULT,
SM_TXNS,
SM_TXNSRESULT
};
static const char __maybe_unused *stratum_msgs[] = {
"reconnect",
"diff",
"message",
"update",
"error",
"subscribe",
"subscribe.result",
"share",
"share.result",
"auth",
"auth.result",
"txns",
"txns.result"
};
#ifdef USE_CKDB #ifdef USE_CKDB
#define CKP_STANDALONE(CKP) ((CKP)->standalone == true) #define CKP_STANDALONE(CKP) ((CKP)->standalone == true)
#else #else

16
src/connector.c

@ -504,10 +504,10 @@ reparse:
* do this unlocked as the occasional false negative can be * do this unlocked as the occasional false negative can be
* filtered by the stratifier. */ * filtered by the stratifier. */
if (likely(!client->invalid)) { if (likely(!client->invalid)) {
if (ckp->node || !ckp->passthrough)
send_proc(ckp->stratifier, s);
if (ckp->passthrough) if (ckp->passthrough)
send_proc(ckp->generator, s); send_proc(ckp->generator, s);
else
send_proc(ckp->stratifier, s);
} }
free(s); free(s);
@ -936,11 +936,11 @@ static void passthrough_client(cdata_t *cdata, client_instance_t *client)
send_client(cdata, client->id, buf); send_client(cdata, client->id, buf);
} }
static void process_client_msg(cdata_t *cdata, const char *buf) static void process_client_msg(ckpool_t *ckp, cdata_t *cdata, const char *buf)
{ {
char *msg, *node_msg;
int64_t client_id; int64_t client_id;
json_t *json_msg; json_t *json_msg;
char *msg;
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
@ -953,8 +953,12 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_object_del(json_msg, "client_id"); json_object_del(json_msg, "client_id");
/* Put client_id back in for a passthrough subclient, passing its /* Put client_id back in for a passthrough subclient, passing its
* upstream client_id instead of the passthrough's. */ * upstream client_id instead of the passthrough's. */
if (client_id > 0xffffffffll) if (ckp->node || client_id > 0xffffffffll) {
json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll)); json_object_set_new_nocheck(json_msg, "client_id", json_integer(client_id & 0xffffffffll));
if (json_get_string(&node_msg, json_msg, "node.method"))
LOGDEBUG("Got node method %s", node_msg);
}
msg = json_dumps(json_msg, JSON_EOL); msg = json_dumps(json_msg, JSON_EOL);
send_client(cdata, client_id, msg); send_client(cdata, client_id, msg);
json_decref(json_msg); json_decref(json_msg);
@ -1055,7 +1059,7 @@ retry:
/* The bulk of the messages will be json messages to send to clients /* The bulk of the messages will be json messages to send to clients
* so look for them first. */ * so look for them first. */
if (likely(buf[0] == '{')) { if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf); process_client_msg(ckp, cdata, buf);
} else if (cmdmatch(buf, "dropclient")) { } else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client; client_instance_t *client;

55
src/stratifier.c

@ -2192,11 +2192,21 @@ static void stratum_broadcast(sdata_t *sdata, json_t *val)
mutex_unlock(ssends->lock); mutex_unlock(ssends->lock);
} }
static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id) /* passthrough subclients have client_ids in the high bits */
static inline bool passthrough_subclient(const int64_t client_id)
{ {
smsg_t *msg; return (client_id > 0xffffffffll);
}
msg = ckzalloc(sizeof(smsg_t)); static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id,
const int msg_type)
{
smsg_t *msg = ckzalloc(sizeof(smsg_t));
ckpool_t *ckp = sdata->ckp;
if (ckp->node || passthrough_subclient(client_id))
json_set_string(val, "node.method", stratum_msgs[msg_type]);
LOGDEBUG("Sending stratum message %s", stratum_msgs[msg_type]);
msg->json_msg = val; msg->json_msg = val;
msg->client_id = client_id; msg->client_id = client_id;
ckmsgq_add(sdata->ssends, msg); ckmsgq_add(sdata->ssends, msg);
@ -2595,7 +2605,7 @@ static void reconnect_client(sdata_t *sdata, stratum_instance_t *client)
client->reconnect_request = time(NULL); client->reconnect_request = time(NULL);
JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect", JSON_CPACK(json_msg, "{sosss[]}", "id", json_null(), "method", "client.reconnect",
"params"); "params");
stratum_add_send(sdata, json_msg, client->id); stratum_add_send(sdata, json_msg, client->id, SM_RECONNECT);
} }
static void dead_proxy(sdata_t *sdata, const char *buf) static void dead_proxy(sdata_t *sdata, const char *buf)
@ -3520,12 +3530,6 @@ out_unlock:
return ret; return ret;
} }
/* passthrough subclients have client_ids in the high bits */
static inline bool passthrough_subclient(const int64_t client_id)
{
return (client_id > 0xffffffffll);
}
/* Extranonce1 must be set here. Needs to be entered with client holding a ref /* Extranonce1 must be set here. Needs to be entered with client holding a ref
* count. */ * count. */
static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_id, const json_t *params_val) static json_t *parse_subscribe(stratum_instance_t *client, const int64_t client_id, const json_t *params_val)
@ -4251,7 +4255,7 @@ static void stratum_send_diff(sdata_t *sdata, const stratum_instance_t *client)
JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(), JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(),
"method", "mining.set_difficulty"); "method", "mining.set_difficulty");
stratum_add_send(sdata, json_msg, client->id); stratum_add_send(sdata, json_msg, client->id, SM_DIFF);
} }
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with client holding a ref count. */
@ -4261,7 +4265,7 @@ static void stratum_send_message(sdata_t *sdata, const stratum_instance_t *clien
JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message", JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message",
"params", msg); "params", msg);
stratum_add_send(sdata, json_msg, client->id); stratum_add_send(sdata, json_msg, client->id, SM_MSG);
} }
static double time_bias(const double tdiff, const double period) static double time_bias(const double tdiff, const double period)
@ -4897,7 +4901,7 @@ static void stratum_send_update(sdata_t *sdata, const int64_t client_id, const b
json_msg = __stratum_notify(sdata->current_workbase, clean); json_msg = __stratum_notify(sdata->current_workbase, clean);
ck_runlock(&sdata->workbase_lock); ck_runlock(&sdata->workbase_lock);
stratum_add_send(sdata, json_msg, client_id); stratum_add_send(sdata, json_msg, client_id, SM_UPDATE);
} }
static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_val, const char *err_msg) static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_val, const char *err_msg)
@ -4905,7 +4909,7 @@ static void send_json_err(sdata_t *sdata, const int64_t client_id, json_t *id_va
json_t *val; json_t *val;
JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg); JSON_CPACK(val, "{soss}", "id", json_deep_copy(id_val), "error", err_msg);
stratum_add_send(sdata, val, client_id); stratum_add_send(sdata, val, client_id, SM_ERROR);
} }
/* Needs to be entered with client holding a ref count. */ /* Needs to be entered with client holding a ref count. */
@ -5028,6 +5032,15 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
return; return;
} }
if (cmdmatch(method, "mining.term")) {
LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address);
drop_client(ckp, sdata, client_id);
return;
}
if (ckp->node)
return;
if (cmdmatch(method, "mining.subscribe")) { if (cmdmatch(method, "mining.subscribe")) {
json_t *val, *result_val; json_t *val, *result_val;
@ -5046,7 +5059,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
json_object_set_new_nocheck(val, "result", result_val); json_object_set_new_nocheck(val, "result", result_val);
json_object_set_nocheck(val, "id", id_val); json_object_set_nocheck(val, "id", id_val);
json_object_set_new_nocheck(val, "error", json_null()); json_object_set_new_nocheck(val, "error", json_null());
stratum_add_send(sdata, val, client_id); stratum_add_send(sdata, val, client_id, SM_SUBSCRIBERESULT);
if (likely(client->subscribed)) if (likely(client->subscribed))
init_client(sdata, client, client_id); init_client(sdata, client, client_id);
return; return;
@ -5118,12 +5131,6 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
return; return;
} }
if (cmdmatch(method, "mining.term")) {
LOGDEBUG("Mining terminate requested from %"PRId64" %s", client_id, client->address);
drop_client(ckp, sdata, client_id);
return;
}
/* Unhandled message here */ /* Unhandled message here */
LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method); LOGINFO("Unhandled client %"PRId64" %s method %s", client_id, client->address, method);
return; return;
@ -5319,7 +5326,7 @@ static void sshare_process(ckpool_t *ckp, json_params_t *jp)
json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
steal_json_id(json_msg, jp); steal_json_id(json_msg, jp);
stratum_add_send(sdata, json_msg, client_id); stratum_add_send(sdata, json_msg, client_id, SM_SHARERESULT);
out_decref: out_decref:
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);
out: out:
@ -5381,7 +5388,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
steal_json_id(json_msg, jp); steal_json_id(json_msg, jp);
stratum_add_send(sdata, json_msg, client_id); stratum_add_send(sdata, json_msg, client_id, SM_AUTHRESULT);
if (!json_is_true(result_val) || !client->suggest_diff) if (!json_is_true(result_val) || !client->suggest_diff)
goto out; goto out;
@ -5596,7 +5603,7 @@ static void send_transactions(ckpool_t *ckp, json_params_t *jp)
} else } else
json_set_string(val, "error", "Invalid job_id"); json_set_string(val, "error", "Invalid job_id");
out_send: out_send:
stratum_add_send(sdata, val, jp->client_id); stratum_add_send(sdata, val, jp->client_id, SM_TXNSRESULT);
out: out:
if (client) if (client)
dec_instance_ref(sdata, client); dec_instance_ref(sdata, client);

Loading…
Cancel
Save