Browse Source

Create a thread for all userproxy receives

master
Con Kolivas 10 years ago
parent
commit
73268c15c5
  1. 98
      src/generator.c

98
src/generator.c

@ -103,6 +103,7 @@ struct proxy_instance {
bool no_params; /* Doesn't want any parameters on subscribe */ bool no_params; /* Doesn't want any parameters on subscribe */
bool global; /* Part of the global list of proxies */
bool notified; /* Has this proxy received any notifies yet */ bool notified; /* Has this proxy received any notifies yet */
bool disabled; /* Subproxy no longer to be used */ bool disabled; /* Subproxy no longer to be used */
bool reconnect; /* We need to drop and reconnect */ bool reconnect; /* We need to drop and reconnect */
@ -148,6 +149,8 @@ struct generator_data {
proxy_instance_t *dead_proxies; /* Disabled proxies */ proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
pthread_t pth_uprecv; // User proxy receive thread
pthread_t pth_upsend; // User proxy send thread
}; };
typedef struct generator_data gdata_t; typedef struct generator_data gdata_t;
@ -1816,7 +1819,7 @@ static void *passthrough_recv(void *arg)
if (likely(ret > 0)) if (likely(ret > 0))
ret = read_socket_line(cs, 60); ret = read_socket_line(cs, 60);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Proxy %d:%s failed to read_socket_line in proxy_recv, attempting reconnect", LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect",
proxi->id, proxi->url); proxi->id, proxi->url);
alive = proxi->alive = false; alive = proxi->alive = false;
reconnect_generator(ckp); reconnect_generator(ckp);
@ -1872,6 +1875,7 @@ static void *proxy_recv(void *arg)
int epfd; int epfd;
rename_proc("proxyrecv"); rename_proc("proxyrecv");
pthread_detach(pthread_self());
proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0){ if (epfd < 0){
@ -1968,6 +1972,83 @@ static void *proxy_recv(void *arg)
return NULL; return NULL;
} }
/* Thread that handles all received messages from user proxies */
static void *userproxy_recv(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
gdata_t *gdata = ckp->data;
struct epoll_event event;
int epfd;
rename_proc("uproxyrecv");
pthread_detach(pthread_self());
epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0){
LOGEMERG("FATAL: Failed to create epoll in userproxy_recv");
return NULL;
}
while (42) {
share_msg_t *share, *tmpshare;
notify_instance_t *ni, *tmp;
proxy_instance_t *proxy;
connsock_t *cs;
time_t now;
int ret;
ret = epoll_wait(epfd, &event, 1, 1000);
if (ret < 1) {
if (likely(!ret))
continue;
LOGEMERG("Failed to epoll_wait in userproxy_recv");
break;
}
proxy = event.data.ptr;
cs = &proxy->cs;
if (event.events & EPOLLHUP) {
LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id,
proxy->subid, proxy->url);
disable_subproxy(gdata, proxy->parent, proxy);
continue;
}
now = time(NULL);
/* Age old notifications older than 10 mins old */
mutex_lock(&proxy->notify_lock);
HASH_ITER(hh, proxy->notify_instances, ni, tmp) {
if (HASH_COUNT(proxy->notify_instances) < 3)
break;
if (ni->notify_time < now - 600) {
HASH_DEL(proxy->notify_instances, ni);
clear_notify(ni);
}
}
mutex_unlock(&proxy->notify_lock);
/* Similary with shares older than 2 mins without response */
mutex_lock(&proxy->share_lock);
HASH_ITER(hh, proxy->shares, share, tmpshare) {
if (share->submit_time < now - 120) {
HASH_DEL(proxy->shares, share);
}
}
mutex_unlock(&proxy->share_lock);
do {
/* proxy may have been recycled here if it is not a
* parent and reconnect was issued */
if (parse_method(ckp, proxy, cs->buf))
continue;
/* If it's not a method it should be a share result */
if (!parse_share(proxy, cs->buf))
LOGNOTICE("Proxy %d:%d unhandled stratum message: %s",
proxy->id, proxy->subid, cs->buf);
} while ((ret = read_socket_line(cs, 0)) > 0);
}
return NULL;
}
static void prepare_proxy(proxy_instance_t *proxi) static void prepare_proxy(proxy_instance_t *proxi)
{ {
proxi->parent = proxi; proxi->parent = proxi;
@ -2379,6 +2460,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
proxy->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); proxy->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send);
} else { } else {
prepare_proxy(proxy); prepare_proxy(proxy);
create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp);
} }
} }
@ -2386,20 +2468,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
ret = proxy_loop(pi); ret = proxy_loop(pi);
mutex_lock(&gdata->lock);
for (i = 0; i < ckp->proxies; i++) {
continue; // FIXME: Find proxies
Close(proxy->cs.fd);
free(proxy->enonce1);
free(proxy->enonce1bin);
pthread_cancel(proxy->pth_psend);
pthread_cancel(proxy->pth_precv);
dealloc(proxy->url);
dealloc(proxy->auth);
dealloc(proxy->pass);
}
mutex_unlock(&gdata->lock);
return ret; return ret;
} }

Loading…
Cancel
Save