Browse Source

Create a hashtable of shares we submit in proxy mode to compare responses for future logging

master
Con Kolivas 11 years ago
parent
commit
f2361b921b
  1. 93
      src/generator.c
  2. 13
      src/stratifier.c

93
src/generator.c

@ -43,6 +43,17 @@ struct notify_instance {
typedef struct notify_instance notify_instance_t; typedef struct notify_instance notify_instance_t;
struct share_msg {
UT_hash_handle hh;
int id; // Our own id for submitting upstream
int client_id;
int msg_id; // Stratum message id from client
time_t submit_time;
};
typedef struct share_msg share_msg_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;
@ -83,6 +94,10 @@ struct proxy_instance {
pthread_cond_t psend_cond; pthread_cond_t psend_cond;
stratum_msg_t *psends; stratum_msg_t *psends;
pthread_mutex_t share_lock;
share_msg_t *shares;
int share_id;
}; };
typedef struct proxy_instance proxy_instance_t; typedef struct proxy_instance proxy_instance_t;
@ -710,10 +725,26 @@ static void send_diff(proxy_instance_t *proxi, int sockd)
static void submit_share(proxy_instance_t *proxi, json_t *val) static void submit_share(proxy_instance_t *proxi, json_t *val)
{ {
stratum_msg_t *msg; stratum_msg_t *msg;
share_msg_t *share;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(stratum_msg_t));
share = ckzalloc(sizeof(share_msg_t));
share->submit_time = time(NULL);
share->client_id = json_integer_value(json_object_get(val, "client_id"));
share->msg_id = json_integer_value(json_object_get(val, "msg_id"));
json_object_del(val, "client_id");
json_object_del(val, "msg_id");
msg->json_msg = val; msg->json_msg = val;
/* Add new share entry to the share hashtable */
mutex_lock(&proxi->share_lock);
share->id = proxi->share_id++;
HASH_ADD_INT(proxi->shares, id, share);
mutex_unlock(&proxi->share_lock);
json_object_set_nocheck(val, "id", json_integer(share->id));
/* Add the new message to the psend list */
mutex_lock(&proxi->psend_lock); mutex_lock(&proxi->psend_lock);
DL_APPEND(proxi->psends, msg); DL_APPEND(proxi->psends, msg);
pthread_cond_signal(&proxi->psend_cond); pthread_cond_signal(&proxi->psend_cond);
@ -803,6 +834,46 @@ static void clear_notify(notify_instance_t *ni)
free(ni->coinbase2); free(ni->coinbase2);
} }
/* FIXME: Return something useful to the stratifier based on this result */
static bool parse_share(ckpool_t *ckp, proxy_instance_t *proxi, const char *buf)
{
json_t *val = NULL, *idval;
share_msg_t *share;
bool ret = false;
int id;
val = json_loads(buf, 0, NULL);
if (!val) {
LOGINFO("Failed to parse json msg: %s", buf);
goto out;
}
idval = json_object_get(val, "id");
if (!idval) {
LOGINFO("Failed to find id in json msg: %s", buf);
goto out;
}
id = json_integer_value(idval);
mutex_lock(&proxi->share_lock);
HASH_FIND_INT(proxi->shares, &id, share);
if (share)
HASH_DEL(proxi->shares, share);
mutex_unlock(&proxi->share_lock);
if (!share) {
LOGINFO("Failed to find matching share to result: %s", buf);
goto out;
}
ret = true;
LOGDEBUG("Found share from client %d with msg_id %d", share->client_id,
share->msg_id);
free(share);
out:
if (val)
json_decref(val);
return ret;
}
static void *proxy_recv(void *arg) static void *proxy_recv(void *arg)
{ {
proxy_instance_t *proxi = (proxy_instance_t *)arg; proxy_instance_t *proxi = (proxy_instance_t *)arg;
@ -813,16 +884,17 @@ static void *proxy_recv(void *arg)
while (42) { while (42) {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
share_msg_t *share, *tmpshare;
time_t now; time_t now;
int ret; int ret;
now = time(NULL); now = time(NULL);
/* Age old notifications older than 10 mins old */
mutex_lock(&proxi->notify_lock); mutex_lock(&proxi->notify_lock);
HASH_ITER(hh, proxi->notify_instances, ni, tmp) { HASH_ITER(hh, proxi->notify_instances, ni, tmp) {
if (HASH_COUNT(proxi->notify_instances) < 3) if (HASH_COUNT(proxi->notify_instances) < 3)
break; break;
/* Age old notifications older than 10 mins old */
if (ni->notify_time < now - 600) { if (ni->notify_time < now - 600) {
HASH_DEL(proxi->notify_instances, ni); HASH_DEL(proxi->notify_instances, ni);
clear_notify(ni); clear_notify(ni);
@ -830,6 +902,15 @@ static void *proxy_recv(void *arg)
} }
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxi->notify_lock);
/* Similary with shares older than 2 mins without response */
mutex_lock(&proxi->share_lock);
HASH_ITER(hh, proxi->shares, share, tmpshare) {
if (share->submit_time < now - 120) {
HASH_DEL(proxi->shares, share);
}
}
mutex_unlock(&proxi->share_lock);
ret = read_socket_line(cs, 120); ret = read_socket_line(cs, 120);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
@ -847,6 +928,10 @@ static void *proxy_recv(void *arg)
} }
continue; continue;
} }
if (parse_share(ckp, proxi, cs->buf)) {
continue;
}
/* If it's not a method it should be a share result */
LOGWARNING("Unhandled stratum message: %s", cs->buf); LOGWARNING("Unhandled stratum message: %s", cs->buf);
} }
return NULL; return NULL;
@ -890,12 +975,12 @@ static void *proxy_send(void *arg)
LOGWARNING("Failed to find matching jobid in proxysend"); LOGWARNING("Failed to find matching jobid in proxysend");
continue; continue;
} }
/* FIXME Use unique IDs and parse responses */ val = json_pack("{s[ssooo]soss}", "params", proxi->auth, jobid,
val = json_pack("{s[ssooo]siss}", "params", proxi->auth, jobid,
json_object_get(msg->json_msg, "nonce2"), json_object_get(msg->json_msg, "nonce2"),
json_object_get(msg->json_msg, "ntime"), json_object_get(msg->json_msg, "ntime"),
json_object_get(msg->json_msg, "nonce"), json_object_get(msg->json_msg, "nonce"),
"id", 0, "method", "mining.submit"); "id", json_object_get(msg->json_msg, "id"),
"method", "mining.submit");
free(jobid); free(jobid);
send_json_msg(cs, val); send_json_msg(cs, val);
json_decref(val); json_decref(val);

13
src/stratifier.c

@ -1224,7 +1224,7 @@ out_unlock:
/* Submit a share in proxy mode to the parent pool. workbase_lock is held */ /* Submit a share in proxy mode to the parent pool. workbase_lock is held */
static void submit_share(stratum_instance_t *client, uint64_t jobid, const char *nonce2, static void submit_share(stratum_instance_t *client, uint64_t jobid, const char *nonce2,
const char *ntime, const char *nonce) const char *ntime, const char *nonce, int msg_id)
{ {
ckpool_t *ckp = client->ckp; ckpool_t *ckp = client->ckp;
json_t *json_msg; json_t *json_msg;
@ -1232,8 +1232,9 @@ static void submit_share(stratum_instance_t *client, uint64_t jobid, const char
char *msg; char *msg;
sprintf(enonce2, "%s%s", client->enonce1var, nonce2); sprintf(enonce2, "%s%s", client->enonce1var, nonce2);
json_msg = json_pack("{sissssss}", "jobid", jobid, "nonce2", enonce2, json_msg = json_pack("{sisssssssisi}", "jobid", jobid, "nonce2", enonce2,
"ntime", ntime, "nonce", nonce); "ntime", ntime, "nonce", nonce, "client_id", client->id,
"msg_id", msg_id);
msg = json_dumps(json_msg, 0); msg = json_dumps(json_msg, 0);
json_decref(json_msg); json_decref(json_msg);
send_proc(ckp->generator, msg); send_proc(ckp->generator, msg);
@ -1336,7 +1337,7 @@ out_unlock:
ck_runlock(&workbase_lock); ck_runlock(&workbase_lock);
if (submit) if (submit)
submit_share(client, id, nonce2, ntime, nonce); submit_share(client, id, nonce2, ntime, nonce, json_integer_value(json_object_get(json_msg, "id")));
/* Accept the lower of new and old diffs until the next update */ /* Accept the lower of new and old diffs until the next update */
if (id < client->diff_change_job_id && client->old_diff < client->diff) if (id < client->diff_change_job_id && client->old_diff < client->diff)
@ -1529,6 +1530,7 @@ static void parse_instance_msg(int client_id, json_t *msg)
err_val = json_string("-1:id not found"); err_val = json_string("-1:id not found");
goto out; goto out;
} }
json_object_set_nocheck(json_msg, "id", id_val);
#if 0 #if 0
/* Random broken clients send something not an integer so just use the /* Random broken clients send something not an integer so just use the
* json object, whatever it is. */ * json object, whatever it is. */
@ -1553,14 +1555,13 @@ static void parse_instance_msg(int client_id, json_t *msg)
} }
result_val = gen_json_result(client_id, json_msg, method, params, result_val = gen_json_result(client_id, json_msg, method, params,
&err_val, &update); &err_val, &update);
if (unlikely(!result_val)) { if (!result_val) {
json_decref(json_msg); json_decref(json_msg);
return; return;
} }
if (!err_val) if (!err_val)
err_val = json_null(); err_val = json_null();
out: out:
json_object_set_nocheck(json_msg, "id", id_val);
json_object_set_nocheck(json_msg, "error", err_val); json_object_set_nocheck(json_msg, "error", err_val);
json_object_set_nocheck(json_msg, "result", result_val); json_object_set_nocheck(json_msg, "result", result_val);

Loading…
Cancel
Save