Browse Source

Add client reference from moment we receive a message

master
Con Kolivas 10 years ago
parent
commit
7b450f16b6
  1. 61
      src/stratifier.c

61
src/stratifier.c

@ -3126,24 +3126,18 @@ static void suggest_diff(stratum_instance_t *client, const char *method, json_t
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
} }
static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val, /* Enter with client holding ref count */
json_t *method_val, json_t *params_val, char *address) static void parse_method(sdata_t *sdata, stratum_instance_t *client, const int64_t client_id,
json_t *id_val, json_t *method_val, json_t *params_val, char *address)
{ {
stratum_instance_t *client;
const char *method; const char *method;
char buf[256]; char buf[256];
client = ref_instance_by_id(sdata, client_id);
if (unlikely(!client)) {
LOGINFO("Failed to find client id %ld in hashtable!", client_id);
return;
}
if (unlikely(client->reject == 2)) { if (unlikely(client->reject == 2)) {
LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id); LOGINFO("Dropping client %"PRId64" tagged for lazy invalidation", client_id);
snprintf(buf, 255, "dropclient=%ld", client->id); snprintf(buf, 255, "dropclient=%ld", client_id);
send_proc(client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
goto out; return;
} }
/* Random broken clients send something not an integer as the id so we copy /* Random broken clients send something not an integer as the id so we copy
@ -3155,7 +3149,7 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
/* Shouldn't happen, sanity check */ /* Shouldn't happen, sanity check */
if (unlikely(!result_val)) { if (unlikely(!result_val)) {
LOGWARNING("parse_subscribe returned NULL result_val"); LOGWARNING("parse_subscribe returned NULL result_val");
goto out; return;
} }
val = json_object(); val = json_object();
json_object_set_new_nocheck(val, "result", result_val); json_object_set_new_nocheck(val, "result", result_val);
@ -3164,10 +3158,11 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
stratum_add_send(sdata, val, client_id); stratum_add_send(sdata, val, client_id);
if (likely(client->subscribed)) if (likely(client->subscribed))
update_client(sdata, client, client_id); update_client(sdata, client, client_id);
goto out; return;
} }
if (unlikely(cmdmatch(method, "mining.passthrough"))) { if (unlikely(cmdmatch(method, "mining.passthrough"))) {
LOGNOTICE("Adding passthrough client %ld", client_id);
/* We need to inform the connector process that this client /* We need to inform the connector process that this client
* is a passthrough and to manage its messages accordingly. * is a passthrough and to manage its messages accordingly.
* Remove this instance since the client id may well be * Remove this instance since the client id may well be
@ -3178,17 +3173,16 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
__add_dead(sdata, client); __add_dead(sdata, client);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
LOGNOTICE("Adding passthrough client %ld", client->id); snprintf(buf, 255, "passthrough=%ld", client_id);
snprintf(buf, 255, "passthrough=%ld", client->id);
send_proc(client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
goto out; return;
} }
if (cmdmatch(method, "mining.auth") && client->subscribed) { if (cmdmatch(method, "mining.auth") && client->subscribed) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sauthq, jp); ckmsgq_add(sdata->sauthq, jp);
goto out; return;
} }
/* We should only accept authorised requests from here on */ /* We should only accept authorised requests from here on */
@ -3196,22 +3190,22 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
/* Dropping unauthorised clients here also allows the /* Dropping unauthorised clients here also allows the
* stratifier process to restart since it will have lost all * stratifier process to restart since it will have lost all
* the stratum instance data. Clients will just reconnect. */ * the stratum instance data. Clients will just reconnect. */
LOGINFO("Dropping unauthorised client %ld", client->id); LOGINFO("Dropping unauthorised client %ld", client_id);
snprintf(buf, 255, "dropclient=%ld", client->id); snprintf(buf, 255, "dropclient=%ld", client_id);
send_proc(client->ckp->connector, buf); send_proc(client->ckp->connector, buf);
goto out; return;
} }
if (cmdmatch(method, "mining.submit")) { if (cmdmatch(method, "mining.submit")) {
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->sshareq, jp); ckmsgq_add(sdata->sshareq, jp);
goto out; return;
} }
if (cmdmatch(method, "mining.suggest")) { if (cmdmatch(method, "mining.suggest")) {
suggest_diff(client, method, params_val); suggest_diff(client, method, params_val);
goto out; return;
} }
/* Covers both get_transactions and get_txnhashes */ /* Covers both get_transactions and get_txnhashes */
@ -3219,14 +3213,15 @@ static void parse_method(sdata_t *sdata, const int64_t client_id, json_t *id_val
json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address); json_params_t *jp = create_json_params(client_id, method_val, params_val, id_val, address);
ckmsgq_add(sdata->stxnq, jp); ckmsgq_add(sdata->stxnq, jp);
goto out; return;
} }
/* Unhandled message here */ /* Unhandled message here */
out: LOGINFO("Unhandled client %ld method %s", client_id, method);
dec_instance_ref(sdata, client); return;
} }
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg) /* Entered with client holding ref count */
static void parse_instance_msg(sdata_t *sdata, smsg_t *msg, stratum_instance_t *client)
{ {
json_t *val = msg->json_msg, *id_val, *method, *params; json_t *val = msg->json_msg, *id_val, *method, *params;
int64_t client_id = msg->client_id; int64_t client_id = msg->client_id;
@ -3257,7 +3252,7 @@ static void parse_instance_msg(sdata_t *sdata, smsg_t *msg)
send_json_err(sdata, client_id, id_val, "-1:params not found"); send_json_err(sdata, client_id, id_val, "-1:params not found");
goto out; goto out;
} }
parse_method(sdata, client_id, id_val, method, params, msg->address); parse_method(sdata, client, client_id, id_val, method, params, msg->address);
out: out:
json_decref(val); json_decref(val);
free(msg); free(msg);
@ -3266,6 +3261,7 @@ out:
static void srecv_process(ckpool_t *ckp, char *buf) static void srecv_process(ckpool_t *ckp, char *buf)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
stratum_instance_t *client;
smsg_t *msg; smsg_t *msg;
json_t *val; json_t *val;
int server; int server;
@ -3310,12 +3306,15 @@ static void srecv_process(ckpool_t *ckp, char *buf)
/* Parse the message here */ /* Parse the message here */
ck_wlock(&sdata->instance_lock); ck_wlock(&sdata->instance_lock);
/* client_id instance doesn't exist yet, create one */ client = __instance_by_id(sdata, msg->client_id);
if (!__instance_by_id(sdata, msg->client_id)) /* If client_id instance doesn't exist yet, create one */
__stratum_add_instance(ckp, msg->client_id, server); if (unlikely(!client))
client = __stratum_add_instance(ckp, msg->client_id, server);
__inc_instance_ref(client);
ck_wunlock(&sdata->instance_lock); ck_wunlock(&sdata->instance_lock);
parse_instance_msg(sdata, msg); parse_instance_msg(sdata, msg, client);
dec_instance_ref(sdata, client);
out: out:
free(buf); free(buf);
} }

Loading…
Cancel
Save