kanoi 10 years ago
parent
commit
27ef322f63
  1. 109
      src/stratifier.c

109
src/stratifier.c

@ -184,6 +184,7 @@ static ckmsgq_t *srecvs; // Stratum receives
static ckmsgq_t *ckdbq; // ckdb
static ckmsgq_t *sshareq; // Stratum share sends
static ckmsgq_t *sauthq; // Stratum authorisations
static ckmsgq_t *stxnq; // Transaction requests
static int64_t user_instance_id;
@ -242,6 +243,8 @@ struct stratum_instance {
int64_t user_id;
ckpool_t *ckp;
time_t last_txns; /* Last time this worker requested txn hashes */
};
typedef struct stratum_instance stratum_instance_t;
@ -1100,10 +1103,9 @@ static int stratum_loop(ckpool_t *ckp, proc_instance_t *pi)
{
int sockd, ret = 0, selret = 0;
unixsock_t *us = &pi->us;
tv_t start_tv = {0, 0};
char *buf = NULL;
tv_t start_tv;
tv_time(&start_tv);
retry:
do {
if (!ckp->proxy) {
@ -2124,7 +2126,7 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
ck_runlock(&instance_lock);
if (unlikely(!client)) {
LOGINFO("Failed to find client id %d in hashtable!", client_id);
LOGINFO("Failed to find client id %ld in hashtable!", client_id);
return;
}
@ -2190,6 +2192,13 @@ static void parse_method(const int64_t client_id, json_t *id_val, json_t *method
return;
}
/* Covers both get_transactions and get_txnhashes */
if (cmdmatch(method, "mining.get")) {
json_params_t *jp = create_json_params(client_id, method_val, id_val, address);
ckmsgq_add(stxnq, jp);
return;
}
/* Unhandled message here */
}
@ -2398,6 +2407,99 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
free(buf);
}
static int transactions_by_jobid(int64_t id)
{
workbase_t *wb;
int ret = -1;
ck_rlock(&workbase_lock);
HASH_FIND_I64(workbases, &id, wb);
if (wb)
ret = wb->transactions;
ck_runlock(&workbase_lock);
return ret;
}
static json_t *txnhashes_by_jobid(int64_t id)
{
json_t *ret = NULL;
workbase_t *wb;
ck_rlock(&workbase_lock);
HASH_FIND_I64(workbases, &id, wb);
if (wb)
ret = json_string(wb->txn_hashes);
ck_runlock(&workbase_lock);
return ret;
}
static void send_transactions(ckpool_t *ckp, json_params_t *jp)
{
const char *msg = json_string_value(jp->params);
stratum_instance_t *client;
json_t *val, *hashes;
int64_t job_id = 0;
time_t now_t;
if (unlikely(!msg || !strlen(msg))) {
LOGWARNING("send_transactions received null method");
goto out;
}
val = json_object();
json_object_set_nocheck(val, "id", jp->id_val);
if (cmdmatch(msg, "mining.get_transactions")) {
int txns;
/* We don't actually send the transactions as that would use
* up huge bandwidth, so we just return the number of
* transactions :) */
sscanf(msg, "mining.get_transactions(%lx", &job_id);
txns = transactions_by_jobid(job_id);
if (txns != -1) {
json_set_int(val, "result", txns);
json_object_set_new_nocheck(val, "error", json_null());
} else
json_set_string(val, "error", "Invalid job_id");
goto out_send;
}
if (!cmdmatch(msg, "mining.get_txnhashes")) {
LOGDEBUG("Unhandled mining get request: %s", msg);
json_set_string(val, "error", "Unhandled");
goto out_send;
}
ck_rlock(&instance_lock);
client = __instance_by_id(jp->client_id);
ck_runlock(&instance_lock);
if (unlikely(!client)) {
LOGINFO("send_transactions failed to find client id %ld in hashtable!",
jp->client_id);
goto out;
}
now_t = time(NULL);
if (now_t - client->last_txns < ckp->update_interval) {
LOGNOTICE("Rate limiting get_txnhashes on client %ld!", jp->client_id);
json_set_string(val, "error", "Ratelimit");
goto out_send;
}
client->last_txns = now_t;
sscanf(msg, "mining.get_txnhashes(%lx", &job_id);
hashes = txnhashes_by_jobid(job_id);
if (hashes) {
json_object_set_new_nocheck(val, "result", hashes);
json_object_set_new_nocheck(val, "error", json_null());
} else
json_set_string(val, "error", "Invalid job_id");
out_send:
stratum_add_send(val, jp->client_id);
out:
discard_json_params(&jp);
}
static const double nonces = 4294967296;
/* Called every 20 seconds, we send the updated stats to ckdb of those users
@ -2731,6 +2833,7 @@ int stratifier(proc_instance_t *pi)
sshareq = create_ckmsgq(ckp, "sprocessor", &sshare_process);
sauthq = create_ckmsgq(ckp, "authoriser", &sauth_process);
ckdbq = create_ckmsgq(ckp, "ckdbqueue", &ckdbq_process);
stxnq = create_ckmsgq(ckp, "stxnq", &send_transactions);
cklock_init(&workbase_lock);
if (!ckp->proxy)

Loading…
Cancel
Save