diff --git a/src/generator.c b/src/generator.c index 79a02dce..bad61581 100644 --- a/src/generator.c +++ b/src/generator.c @@ -47,7 +47,7 @@ typedef struct notify_instance notify_instance_t; struct share_msg { UT_hash_handle hh; - int64_t id; // Our own id for submitting upstream + int id; // Our own id for submitting upstream int client_id; time_t submit_time; @@ -117,10 +117,6 @@ struct proxy_instance { pthread_t pth_precv; - mutex_t share_lock; - share_msg_t *shares; - int64_t share_id; - ckmsgq_t *passsends; // passthrough sends char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ @@ -150,6 +146,10 @@ struct generator_data { pthread_cond_t psend_cond; stratum_msg_t *psends; + + mutex_t share_lock; + share_msg_t *shares; + int64_t share_id; }; typedef struct generator_data gdata_t; @@ -918,7 +918,6 @@ static proxy_instance_t *create_subproxy(ckpool_t *ckp, gdata_t *gdata, proxy_in subproxy->disabled = false; } else { subproxy = ckzalloc(sizeof(proxy_instance_t)); - mutex_init(&subproxy->share_lock); } mutex_unlock(&gdata->lock); @@ -1419,10 +1418,10 @@ static void submit_share(gdata_t *gdata, json_t *val) msg->json_msg = val; /* Add new share entry to the share hashtable */ - mutex_lock(&proxi->share_lock); - share->id = proxi->share_id++; - HASH_ADD_I64(proxi->shares, id, share); - mutex_unlock(&proxi->share_lock); + mutex_lock(&gdata->share_lock); + share->id = gdata->share_id++; + HASH_ADD_I64(gdata->shares, id, share); + mutex_unlock(&gdata->share_lock); json_object_set_nocheck(val, "id", json_integer(share->id)); @@ -1446,8 +1445,8 @@ static void clear_notify(notify_instance_t *ni) free(ni); } -/* FIXME: Return something useful to the stratifier based on this result */ -static bool parse_share(proxy_instance_t *proxi, const char *buf) +/* FIXME: Return something useful to the stratifier based on this result? */ +static bool parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) { json_t *val = NULL, *idval; share_msg_t *share; @@ -1466,11 +1465,11 @@ static bool parse_share(proxy_instance_t *proxi, const char *buf) } id = json_integer_value(idval); - mutex_lock(&proxi->share_lock); - HASH_FIND_I64(proxi->shares, &id, share); + mutex_lock(&gdata->share_lock); + HASH_FIND_I64(gdata->shares, &id, share); if (share) - HASH_DEL(proxi->shares, share); - mutex_unlock(&proxi->share_lock); + HASH_DEL(gdata->shares, share); + mutex_unlock(&gdata->share_lock); /* We set response to true even if we don't find the matching share, * so long as we recognised it as a share response */ @@ -1957,13 +1956,13 @@ static void *proxy_recv(void *arg) mutex_unlock(&proxi->notify_lock); /* Similary with shares older than 2 mins without response */ - mutex_lock(&proxi->share_lock); - HASH_ITER(hh, proxi->shares, share, tmpshare) { + mutex_lock(&gdata->share_lock); + HASH_ITER(hh, gdata->shares, share, tmpshare) { if (share->submit_time < now - 120) { - HASH_DEL(proxi->shares, share); + HASH_DEL(gdata->shares, share); } } - mutex_unlock(&proxi->share_lock); + mutex_unlock(&gdata->share_lock); /* If we don't get an update within 10 minutes the upstream pool * has likely stopped responding. */ @@ -1988,7 +1987,7 @@ static void *proxy_recv(void *arg) if (parse_method(ckp, subproxy, cs->buf)) continue; /* If it's not a method it should be a share result */ - if (!parse_share(subproxy, cs->buf)) + if (!parse_share(gdata, subproxy, cs->buf)) LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", subproxy->id, subproxy->subid, cs->buf); } while ((ret = read_socket_line(cs, 0)) > 0); @@ -2061,13 +2060,13 @@ static void *userproxy_recv(void *arg) mutex_unlock(&parent->notify_lock); /* Similary with shares older than 2 mins without response */ - mutex_lock(&parent->share_lock); - HASH_ITER(hh, parent->shares, share, tmpshare) { + mutex_lock(&gdata->share_lock); + HASH_ITER(hh, gdata->shares, share, tmpshare) { if (share->submit_time < now - 120) { - HASH_DEL(parent->shares, share); + HASH_DEL(gdata->shares, share); } } - mutex_unlock(&parent->share_lock); + mutex_unlock(&gdata->share_lock); do { /* proxy may have been recycled here if it is not a @@ -2075,7 +2074,7 @@ static void *userproxy_recv(void *arg) if (parse_method(ckp, proxy, cs->buf)) continue; /* If it's not a method it should be a share result */ - if (!parse_share(proxy, cs->buf)) + if (!parse_share(gdata, proxy, cs->buf)) LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", proxy->id, proxy->subid, cs->buf); } while ((ret = read_socket_line(cs, 0)) > 0); @@ -2212,7 +2211,6 @@ static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const in proxy->pass = pass; proxy->ckp = proxy->cs.ckp = ckp; mutex_init(&proxy->notify_lock); - mutex_init(&proxy->share_lock); HASH_ADD_INT(gdata->proxies, id, proxy); return proxy; } @@ -2498,7 +2496,6 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id proxy->pass = strdup(ckp->proxypass[id]); proxy->ckp = proxy->cs.ckp = ckp; mutex_init(&proxy->notify_lock); - mutex_init(&proxy->share_lock); HASH_ADD_INT(gdata->proxies, id, proxy); proxy->global = true; return proxy; @@ -2511,6 +2508,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) int i, ret; mutex_init(&gdata->lock); + mutex_init(&gdata->share_lock); /* Create all our proxy structures and pointers */ for (i = 0; i < ckp->proxies; i++) {