diff --git a/src/generator.c b/src/generator.c index 6b41abc2..2dcd0c94 100644 --- a/src/generator.c +++ b/src/generator.c @@ -41,6 +41,9 @@ typedef struct notify_instance notify_instance_t; /* Per proxied pool instance data */ struct proxy_instance { + ckpool_t *ckp; + connsock_t *cs; + char *auth; char *pass; @@ -65,6 +68,10 @@ struct proxy_instance { pthread_mutex_t notify_lock; notify_instance_t *notify_instances; int notify_id; + + pthread_t pth_precv; + pthread_t pth_psend; + pthread_cond_t psend_cond; }; typedef struct proxy_instance proxy_instance_t; @@ -304,7 +311,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) bool ret = false; int size; - size = read_socket_line(cs); + size = read_socket_line(cs, 5); if (size < 1) { LOGWARNING("Failed to receive line in parse_subscribe"); goto out; @@ -604,7 +611,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *au do { int size; - size = read_socket_line(cs); + size = read_socket_line(cs, 5); if (size < 1) { LOGWARNING("Failed to receive line in auth_stratum"); ret = false; @@ -636,6 +643,20 @@ static int proxy_loop(proc_instance_t *pi, connsock_t *cs) return 0; } +static void *proxy_recv(void *arg) +{ + proxy_instance_t *pi = (proxy_instance_t *)arg; + + return NULL; +} + +static void *proxy_send(void *arg) +{ + proxy_instance_t *pi = (proxy_instance_t *)arg; + + return NULL; +} + static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, const char *auth, const char *pass) { @@ -643,6 +664,9 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, int ret = 1; memset(&proxi, 0, sizeof(proxi)); + proxi.ckp = ckp; + proxi.cs = cs; + if (!connect_proxy(cs)) { LOGWARNING("FATAL: Failed to connect to %s:%s in proxy_mode!", cs->url, cs->port); @@ -662,8 +686,13 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, goto out; } mutex_init(&proxi.notify_lock); + create_pthread(&proxi.pth_precv, proxy_recv, &proxi); + cond_init(&proxi.psend_cond); + create_pthread(&proxi.pth_psend, proxy_send, &proxi); ret = proxy_loop(pi, cs); + join_pthread(proxi.pth_precv); + join_pthread(proxi.pth_psend); out: close(cs->fd); free(proxi.enonce1); diff --git a/src/libckpool.c b/src/libckpool.c index fd6e0333..a77c8658 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -469,11 +469,11 @@ out: /* Peek in a socket, and then receive only one line at a time, allocing enough * memory in *buf */ -int read_socket_line(connsock_t *cs) +int read_socket_line(connsock_t *cs, int timeout) { char readbuf[PAGESIZE], *eom = NULL; size_t buflen = 0, bufofs = 0; - tv_t timeout = {5, 0}; + tv_t tv_timeout; int ret, bufsiz; fd_set rd; @@ -485,9 +485,9 @@ int read_socket_line(connsock_t *cs) FD_ZERO(&rd); FD_SET(cs->fd, &rd); - timeout.tv_sec = 5; - timeout.tv_usec = 0; - ret = select(cs->fd + 1, &rd, NULL, NULL, &timeout); + tv_timeout.tv_sec = timeout; + tv_timeout.tv_usec = 0; + ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout); if (ret < 0 && interrupted()) continue; if (ret < 1) { @@ -951,7 +951,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) LOGWARNING("Failed to write to socket in json_rpc_call"); goto out_empty; } - ret = read_socket_line(cs); + ret = read_socket_line(cs, 5); if (ret < 1) { LOGWARNING("Failed to read socket line in json_rpc_call"); goto out_empty; @@ -961,7 +961,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) goto out_empty; } do { - ret = read_socket_line(cs); + ret = read_socket_line(cs, 5); if (ret < 1) { LOGWARNING("Failed to read http socket lines in json_rpc_call"); goto out_empty; diff --git a/src/libckpool.h b/src/libckpool.h index 17da9548..8fc637b0 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -312,7 +312,7 @@ void block_socket(int fd); int bind_socket(char *url, char *port); int connect_socket(char *url, char *port); int write_socket(int fd, const void *buf, size_t nbyte); -int read_socket_line(connsock_t *cs); +int read_socket_line(connsock_t *cs, int timeout); void empty_socket(int fd); void close_unix_socket(const int sockd, const char *server_path); int open_unix_server(const char *server_path);