Browse Source

Check for valid shares in redirector and redirect for first url for now

master
Con Kolivas 10 years ago
parent
commit
03edcc03cd
  1. 17
      src/ckpool.c
  2. 1
      src/ckpool.h
  3. 2
      src/connector.c
  4. 81
      src/generator.c

17
src/ckpool.c

@ -1163,7 +1163,7 @@ bool json_get_bool(bool *store, const json_t *val, const char *res)
goto out; goto out;
} }
if (!json_is_boolean(entry)) { if (!json_is_boolean(entry)) {
LOGWARNING("Json entry %s is not a boolean", res); LOGINFO("Json entry %s is not a boolean", res);
goto out; goto out;
} }
*store = json_is_true(entry); *store = json_is_true(entry);
@ -1262,6 +1262,7 @@ static bool parse_redirecturls(ckpool_t *ckp, const json_t *arr_val)
{ {
bool ret = false; bool ret = false;
int arr_size, i; int arr_size, i;
char redirecturl[INET6_ADDRSTRLEN], url[INET6_ADDRSTRLEN], port[8];
if (!arr_val) if (!arr_val)
goto out; goto out;
@ -1276,11 +1277,15 @@ static bool parse_redirecturls(ckpool_t *ckp, const json_t *arr_val)
} }
ckp->redirecturls = arr_size; ckp->redirecturls = arr_size;
ckp->redirecturl = ckalloc(sizeof(char *) * arr_size); ckp->redirecturl = ckalloc(sizeof(char *) * arr_size);
ckp->redirectport = ckalloc(sizeof(char *) * arr_size);
for (i = 0; i < arr_size; i++) { for (i = 0; i < arr_size; i++) {
json_t *val = json_array_get(arr_val, i); json_t *val = json_array_get(arr_val, i);
if (!_json_get_string(&ckp->redirecturl[i], val, "redirecturl")) strncpy(redirecturl, json_string_value(val), INET6_ADDRSTRLEN - 1);
LOGWARNING("Invalid redirecturl entry number %d", i); if (!url_from_serverurl(redirecturl, url, port))
quit(1, "Invalid redirecturl entry %d %s", i, redirecturl);
ckp->redirecturl[i] = strdup(url);
ckp->redirectport[i] = strdup(port);
} }
ret = true; ret = true;
out: out:
@ -1609,10 +1614,10 @@ int main(int argc, char **argv)
} }
if (!ckp.name) { if (!ckp.name) {
if (ckp.proxy) if (ckp.redirector)
ckp.name = "ckproxy";
else if (ckp.redirector)
ckp.name = "ckredirector"; ckp.name = "ckredirector";
else if (ckp.proxy)
ckp.name = "ckproxy";
else if (ckp.passthrough) else if (ckp.passthrough)
ckp.name = "ckpassthrough"; ckp.name = "ckpassthrough";
else else

1
src/ckpool.h

@ -225,6 +225,7 @@ struct ckpool_instance {
/* Passthrough redirect options */ /* Passthrough redirect options */
int redirecturls; int redirecturls;
char **redirecturl; char **redirecturl;
char **redirectport;
/* Private data for each process */ /* Private data for each process */
void *data; void *data;

2
src/connector.c

@ -733,7 +733,7 @@ static void process_client_msg(cdata_t *cdata, const char *buf)
json_msg = json_loads(buf, 0, NULL); json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) { if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf); LOGWARNING("Invalid json message in process_client_msg: %s", buf);
return; return;
} }

81
src/generator.c

@ -1375,17 +1375,22 @@ static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id)
/* Add a share to the gdata share hashlist. Returns the share id */ /* Add a share to the gdata share hashlist. Returns the share id */
static int add_share(gdata_t *gdata, const int64_t client_id, const double diff) static int add_share(gdata_t *gdata, const int64_t client_id, const double diff)
{ {
share_msg_t *share = ckzalloc(sizeof(share_msg_t)); share_msg_t *share = ckzalloc(sizeof(share_msg_t)), *tmpshare;
time_t now;
int ret; int ret;
share->submit_time = time(NULL); share->submit_time = now = time(NULL);
share->client_id = client_id; share->client_id = client_id;
share->diff = diff; share->diff = diff;
/* Add new share entry to the share hashtable */ /* Add new share entry to the share hashtable. Age old shares */
mutex_lock(&gdata->share_lock); mutex_lock(&gdata->share_lock);
ret = share->id = gdata->share_id++; ret = share->id = gdata->share_id++;
HASH_ADD_I64(gdata->shares, id, share); HASH_ADD_I64(gdata->shares, id, share);
HASH_ITER(hh, gdata->shares, share, tmpshare) {
if (share->submit_time < now - 120)
HASH_DEL(gdata->shares, share);
}
mutex_unlock(&gdata->share_lock); mutex_unlock(&gdata->share_lock);
return ret; return ret;
@ -1479,7 +1484,7 @@ static void account_shares(proxy_instance_t *proxy, const double diff, const boo
/* Returns zero if it is not recognised as a share, 1 if it is a valid share /* Returns zero if it is not recognised as a share, 1 if it is a valid share
* and -1 if it is recognised as a share but invalid. */ * and -1 if it is recognised as a share but invalid. */
static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) static int parse_share(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxi, const char *buf)
{ {
json_t *val = NULL, *idval; json_t *val = NULL, *idval;
bool result = false; bool result = false;
@ -1500,6 +1505,7 @@ static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf)
id = json_integer_value(idval); id = json_integer_value(idval);
if (unlikely(!json_get_bool(&result, val, "result"))) { if (unlikely(!json_get_bool(&result, val, "result"))) {
LOGINFO("Failed to find result in upstream json msg: %s", buf); LOGINFO("Failed to find result in upstream json msg: %s", buf);
goto out;
} }
mutex_lock(&gdata->share_lock); mutex_lock(&gdata->share_lock);
@ -1513,11 +1519,13 @@ static int parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf)
proxi->id, proxi->subid, buf); proxi->id, proxi->subid, buf);
/* We don't know what diff these shares are so assume the /* We don't know what diff these shares are so assume the
* current proxy diff. */ * current proxy diff. */
if (!ckp->redirector)
account_shares(proxi, proxi->diff, result); account_shares(proxi, proxi->diff, result);
ret = -1; ret = -1;
goto out; goto out;
} }
ret = 1; ret = 1;
if (!ckp->redirector)
account_shares(proxi, share->diff, result); account_shares(proxi, share->diff, result);
LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid, LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid,
buf, share->client_id); buf, share->client_id);
@ -1692,11 +1700,36 @@ static void *proxy_send(void *arg)
return NULL; return NULL;
} }
static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) static void parse_redirector_share(ckpool_t *ckp, const char *msg)
{
int64_t client_id;
json_t *val;
val = json_loads(msg, 0, NULL);
if (unlikely(!val)) {
LOGWARNING("Invalid json message in parse_redirector_share: %s", msg);
return;
}
/* Extract the client id from the json message */
client_id = json_integer_value(json_object_get(val, "client_id"));
/* Make sure this is a passthrough client value! */
if (unlikely(client_id < 0xffffffffll)) {
LOGERR("redirect_client got invalid client id %"PRId64, client_id);
goto out;
}
/* Diff is irrelevant here as we don't keep track of it so use 0 */
add_share(ckp->data, client_id, 0);
out:
json_decref(val);
}
static void passthrough_send(ckpool_t *ckp, pass_msg_t *pm)
{ {
int len, sent; int len, sent;
LOGDEBUG("Sending upstream json msg: %s", pm->msg); LOGDEBUG("Sending upstream json msg: %s", pm->msg);
if (ckp->redirector && strstr(pm->msg, "mining.submit"))
parse_redirector_share(ckp, pm->msg);
len = strlen(pm->msg); len = strlen(pm->msg);
sent = write_socket(pm->cs->fd, pm->msg, len); sent = write_socket(pm->cs->fd, pm->msg, len);
if (sent != len) { if (sent != len) {
@ -1875,6 +1908,34 @@ static void reconnect_proxy(proxy_instance_t *proxi)
create_pthread(&pth, proxy_reconnect, proxi); create_pthread(&pth, proxy_reconnect, proxi);
} }
static void redirect_client(ckpool_t *ckp, const char *buf)
{
json_t *json_msg, *val;
int64_t client_id;
char *msg;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message in redirect_client: %s", buf);
return;
}
/* Extract the client id from the json message */
client_id = json_integer_value(json_object_get(json_msg, "client_id"));
/* Make sure this is a passthrough client value! */
if (unlikely(client_id < 0xffffffffll)) {
LOGERR("redirect_client got invalid client id %"PRId64, client_id);
goto out;
}
JSON_CPACK(val, "{sIsosss[ssi]}", "id", "client_id", client_id, json_null(),
"method", "client.reconnect", "params", ckp->redirecturl[0], ckp->redirectport[0], 0);
msg = json_dumps(val, JSON_NO_UTF8 | JSON_PRESERVE_ORDER);
json_decref(val);
send_proc(ckp->connector, msg);
free(msg);
out:
json_decref(json_msg);
}
/* For receiving messages from an upstream pool to pass downstream. Responsible /* For receiving messages from an upstream pool to pass downstream. Responsible
* for setting up the connection and testing pool is live. */ * for setting up the connection and testing pool is live. */
static void *passthrough_recv(void *arg) static void *passthrough_recv(void *arg)
@ -1928,6 +1989,12 @@ static void *passthrough_recv(void *arg)
* process. Possibly parse parameters sent by upstream pool * process. Possibly parse parameters sent by upstream pool
* here */ * here */
send_proc(ckp->connector, cs->buf); send_proc(ckp->connector, cs->buf);
/* If we're a redirecting passthrough, look for a share
* responses here and redirect on a valid share. */
if (ckp->redirector && parse_share(ckp, ckp->data, proxi, cs->buf) > 0)
redirect_client(ckp, cs->buf);
} }
return NULL; return NULL;
} }
@ -2046,7 +2113,7 @@ static void *proxy_recv(void *arg)
if (parse_method(ckp, subproxy, cs->buf)) if (parse_method(ckp, subproxy, cs->buf))
continue; continue;
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
if (!parse_share(gdata, subproxy, cs->buf)) { if (!parse_share(ckp, gdata, subproxy, cs->buf)) {
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
subproxy->id, subproxy->subid, cs->buf); subproxy->id, subproxy->subid, cs->buf);
} }
@ -2133,7 +2200,7 @@ static void *userproxy_recv(void *arg)
if (parse_method(ckp, proxy, cs->buf)) if (parse_method(ckp, proxy, cs->buf))
continue; continue;
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
if (!parse_share(gdata, proxy, cs->buf)) { if (!parse_share(ckp, gdata, proxy, cs->buf)) {
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
proxy->id, proxy->subid, cs->buf); proxy->id, proxy->subid, cs->buf);
} }

Loading…
Cancel
Save