From 73268c15c5ceb4242abf11882a1e36b2b8631295 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Tue, 17 Mar 2015 13:18:04 +1100 Subject: [PATCH] Create a thread for all userproxy receives --- src/generator.c | 98 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 83 insertions(+), 15 deletions(-) diff --git a/src/generator.c b/src/generator.c index b2acc16a..5feb1f35 100644 --- a/src/generator.c +++ b/src/generator.c @@ -103,6 +103,7 @@ struct proxy_instance { bool no_params; /* Doesn't want any parameters on subscribe */ + bool global; /* Part of the global list of proxies */ bool notified; /* Has this proxy received any notifies yet */ bool disabled; /* Subproxy no longer to be used */ bool reconnect; /* We need to drop and reconnect */ @@ -148,6 +149,8 @@ struct generator_data { proxy_instance_t *dead_proxies; /* Disabled proxies */ int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue + pthread_t pth_uprecv; // User proxy receive thread + pthread_t pth_upsend; // User proxy send thread }; typedef struct generator_data gdata_t; @@ -1816,7 +1819,7 @@ static void *passthrough_recv(void *arg) if (likely(ret > 0)) ret = read_socket_line(cs, 60); if (ret < 1) { - LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", + LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", proxi->id, proxi->url); alive = proxi->alive = false; reconnect_generator(ckp); @@ -1872,6 +1875,7 @@ static void *proxy_recv(void *arg) int epfd; rename_proc("proxyrecv"); + pthread_detach(pthread_self()); proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0){ @@ -1968,6 +1972,83 @@ static void *proxy_recv(void *arg) return NULL; } +/* Thread that handles all received messages from user proxies */ +static void *userproxy_recv(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + gdata_t *gdata = ckp->data; + struct epoll_event event; + int epfd; + + rename_proc("uproxyrecv"); + pthread_detach(pthread_self()); + + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd < 0){ + LOGEMERG("FATAL: Failed to create epoll in userproxy_recv"); + return NULL; + } + + while (42) { + share_msg_t *share, *tmpshare; + notify_instance_t *ni, *tmp; + proxy_instance_t *proxy; + connsock_t *cs; + time_t now; + int ret; + + ret = epoll_wait(epfd, &event, 1, 1000); + if (ret < 1) { + if (likely(!ret)) + continue; + LOGEMERG("Failed to epoll_wait in userproxy_recv"); + break; + } + proxy = event.data.ptr; + cs = &proxy->cs; + if (event.events & EPOLLHUP) { + LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id, + proxy->subid, proxy->url); + disable_subproxy(gdata, proxy->parent, proxy); + continue; + } + now = time(NULL); + + /* Age old notifications older than 10 mins old */ + mutex_lock(&proxy->notify_lock); + HASH_ITER(hh, proxy->notify_instances, ni, tmp) { + if (HASH_COUNT(proxy->notify_instances) < 3) + break; + if (ni->notify_time < now - 600) { + HASH_DEL(proxy->notify_instances, ni); + clear_notify(ni); + } + } + mutex_unlock(&proxy->notify_lock); + + /* Similary with shares older than 2 mins without response */ + mutex_lock(&proxy->share_lock); + HASH_ITER(hh, proxy->shares, share, tmpshare) { + if (share->submit_time < now - 120) { + HASH_DEL(proxy->shares, share); + } + } + mutex_unlock(&proxy->share_lock); + + do { + /* proxy may have been recycled here if it is not a + * parent and reconnect was issued */ + if (parse_method(ckp, proxy, cs->buf)) + continue; + /* If it's not a method it should be a share result */ + if (!parse_share(proxy, cs->buf)) + LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", + proxy->id, proxy->subid, cs->buf); + } while ((ret = read_socket_line(cs, 0)) > 0); + } + return NULL; +} + static void prepare_proxy(proxy_instance_t *proxi) { proxi->parent = proxi; @@ -2379,6 +2460,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) proxy->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); } else { prepare_proxy(proxy); + create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp); } } @@ -2386,20 +2468,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) ret = proxy_loop(pi); - mutex_lock(&gdata->lock); - for (i = 0; i < ckp->proxies; i++) { - continue; // FIXME: Find proxies - Close(proxy->cs.fd); - free(proxy->enonce1); - free(proxy->enonce1bin); - pthread_cancel(proxy->pth_psend); - pthread_cancel(proxy->pth_precv); - dealloc(proxy->url); - dealloc(proxy->auth); - dealloc(proxy->pass); - } - mutex_unlock(&gdata->lock); - return ret; }