Browse Source

Use only one share hashlist in the generator

master
Con Kolivas 10 years ago
parent
commit
104ab03f2b
  1. 54
      src/generator.c

54
src/generator.c

@ -47,7 +47,7 @@ typedef struct notify_instance notify_instance_t;
struct share_msg { struct share_msg {
UT_hash_handle hh; UT_hash_handle hh;
int64_t id; // Our own id for submitting upstream int id; // Our own id for submitting upstream
int client_id; int client_id;
time_t submit_time; time_t submit_time;
@ -117,10 +117,6 @@ struct proxy_instance {
pthread_t pth_precv; pthread_t pth_precv;
mutex_t share_lock;
share_msg_t *shares;
int64_t share_id;
ckmsgq_t *passsends; // passthrough sends ckmsgq_t *passsends; // passthrough sends
char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ char_entry_t *recvd_lines; /* Linked list of unprocessed messages */
@ -150,6 +146,10 @@ struct generator_data {
pthread_cond_t psend_cond; pthread_cond_t psend_cond;
stratum_msg_t *psends; stratum_msg_t *psends;
mutex_t share_lock;
share_msg_t *shares;
int64_t share_id;
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -918,7 +918,6 @@ static proxy_instance_t *create_subproxy(ckpool_t *ckp, gdata_t *gdata, proxy_in
subproxy->disabled = false; subproxy->disabled = false;
} else { } else {
subproxy = ckzalloc(sizeof(proxy_instance_t)); subproxy = ckzalloc(sizeof(proxy_instance_t));
mutex_init(&subproxy->share_lock);
} }
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
@ -1419,10 +1418,10 @@ static void submit_share(gdata_t *gdata, json_t *val)
msg->json_msg = val; msg->json_msg = val;
/* Add new share entry to the share hashtable */ /* Add new share entry to the share hashtable */
mutex_lock(&proxi->share_lock); mutex_lock(&gdata->share_lock);
share->id = proxi->share_id++; share->id = gdata->share_id++;
HASH_ADD_I64(proxi->shares, id, share); HASH_ADD_I64(gdata->shares, id, share);
mutex_unlock(&proxi->share_lock); mutex_unlock(&gdata->share_lock);
json_object_set_nocheck(val, "id", json_integer(share->id)); json_object_set_nocheck(val, "id", json_integer(share->id));
@ -1446,8 +1445,8 @@ static void clear_notify(notify_instance_t *ni)
free(ni); free(ni);
} }
/* FIXME: Return something useful to the stratifier based on this result */ /* FIXME: Return something useful to the stratifier based on this result? */
static bool parse_share(proxy_instance_t *proxi, const char *buf) static bool parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf)
{ {
json_t *val = NULL, *idval; json_t *val = NULL, *idval;
share_msg_t *share; share_msg_t *share;
@ -1466,11 +1465,11 @@ static bool parse_share(proxy_instance_t *proxi, const char *buf)
} }
id = json_integer_value(idval); id = json_integer_value(idval);
mutex_lock(&proxi->share_lock); mutex_lock(&gdata->share_lock);
HASH_FIND_I64(proxi->shares, &id, share); HASH_FIND_I64(gdata->shares, &id, share);
if (share) if (share)
HASH_DEL(proxi->shares, share); HASH_DEL(gdata->shares, share);
mutex_unlock(&proxi->share_lock); mutex_unlock(&gdata->share_lock);
/* We set response to true even if we don't find the matching share, /* We set response to true even if we don't find the matching share,
* so long as we recognised it as a share response */ * so long as we recognised it as a share response */
@ -1957,13 +1956,13 @@ static void *proxy_recv(void *arg)
mutex_unlock(&proxi->notify_lock); mutex_unlock(&proxi->notify_lock);
/* Similary with shares older than 2 mins without response */ /* Similary with shares older than 2 mins without response */
mutex_lock(&proxi->share_lock); mutex_lock(&gdata->share_lock);
HASH_ITER(hh, proxi->shares, share, tmpshare) { HASH_ITER(hh, gdata->shares, share, tmpshare) {
if (share->submit_time < now - 120) { if (share->submit_time < now - 120) {
HASH_DEL(proxi->shares, share); HASH_DEL(gdata->shares, share);
} }
} }
mutex_unlock(&proxi->share_lock); mutex_unlock(&gdata->share_lock);
/* If we don't get an update within 10 minutes the upstream pool /* If we don't get an update within 10 minutes the upstream pool
* has likely stopped responding. */ * has likely stopped responding. */
@ -1988,7 +1987,7 @@ static void *proxy_recv(void *arg)
if (parse_method(ckp, subproxy, cs->buf)) if (parse_method(ckp, subproxy, cs->buf))
continue; continue;
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
if (!parse_share(subproxy, cs->buf)) if (!parse_share(gdata, subproxy, cs->buf))
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
subproxy->id, subproxy->subid, cs->buf); subproxy->id, subproxy->subid, cs->buf);
} while ((ret = read_socket_line(cs, 0)) > 0); } while ((ret = read_socket_line(cs, 0)) > 0);
@ -2061,13 +2060,13 @@ static void *userproxy_recv(void *arg)
mutex_unlock(&parent->notify_lock); mutex_unlock(&parent->notify_lock);
/* Similary with shares older than 2 mins without response */ /* Similary with shares older than 2 mins without response */
mutex_lock(&parent->share_lock); mutex_lock(&gdata->share_lock);
HASH_ITER(hh, parent->shares, share, tmpshare) { HASH_ITER(hh, gdata->shares, share, tmpshare) {
if (share->submit_time < now - 120) { if (share->submit_time < now - 120) {
HASH_DEL(parent->shares, share); HASH_DEL(gdata->shares, share);
} }
} }
mutex_unlock(&parent->share_lock); mutex_unlock(&gdata->share_lock);
do { do {
/* proxy may have been recycled here if it is not a /* proxy may have been recycled here if it is not a
@ -2075,7 +2074,7 @@ static void *userproxy_recv(void *arg)
if (parse_method(ckp, proxy, cs->buf)) if (parse_method(ckp, proxy, cs->buf))
continue; continue;
/* If it's not a method it should be a share result */ /* If it's not a method it should be a share result */
if (!parse_share(proxy, cs->buf)) if (!parse_share(gdata, proxy, cs->buf))
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
proxy->id, proxy->subid, cs->buf); proxy->id, proxy->subid, cs->buf);
} while ((ret = read_socket_line(cs, 0)) > 0); } while ((ret = read_socket_line(cs, 0)) > 0);
@ -2212,7 +2211,6 @@ static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const in
proxy->pass = pass; proxy->pass = pass;
proxy->ckp = proxy->cs.ckp = ckp; proxy->ckp = proxy->cs.ckp = ckp;
mutex_init(&proxy->notify_lock); mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock);
HASH_ADD_INT(gdata->proxies, id, proxy); HASH_ADD_INT(gdata->proxies, id, proxy);
return proxy; return proxy;
} }
@ -2498,7 +2496,6 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy->pass = strdup(ckp->proxypass[id]); proxy->pass = strdup(ckp->proxypass[id]);
proxy->ckp = proxy->cs.ckp = ckp; proxy->ckp = proxy->cs.ckp = ckp;
mutex_init(&proxy->notify_lock); mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock);
HASH_ADD_INT(gdata->proxies, id, proxy); HASH_ADD_INT(gdata->proxies, id, proxy);
proxy->global = true; proxy->global = true;
return proxy; return proxy;
@ -2511,6 +2508,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
int i, ret; int i, ret;
mutex_init(&gdata->lock); mutex_init(&gdata->lock);
mutex_init(&gdata->share_lock);
/* Create all our proxy structures and pointers */ /* Create all our proxy structures and pointers */
for (i = 0; i < ckp->proxies; i++) { for (i = 0; i < ckp->proxies; i++) {

Loading…
Cancel
Save