Browse Source

Make read_socket_line accept a timeout in seconds

master
Con Kolivas 11 years ago
parent
commit
54883b37fd
  1. 33
      src/generator.c
  2. 14
      src/libckpool.c
  3. 2
      src/libckpool.h

33
src/generator.c

@ -41,6 +41,9 @@ typedef struct notify_instance notify_instance_t;
/* Per proxied pool instance data */ /* Per proxied pool instance data */
struct proxy_instance { struct proxy_instance {
ckpool_t *ckp;
connsock_t *cs;
char *auth; char *auth;
char *pass; char *pass;
@ -65,6 +68,10 @@ struct proxy_instance {
pthread_mutex_t notify_lock; pthread_mutex_t notify_lock;
notify_instance_t *notify_instances; notify_instance_t *notify_instances;
int notify_id; int notify_id;
pthread_t pth_precv;
pthread_t pth_psend;
pthread_cond_t psend_cond;
}; };
typedef struct proxy_instance proxy_instance_t; typedef struct proxy_instance proxy_instance_t;
@ -304,7 +311,7 @@ static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi)
bool ret = false; bool ret = false;
int size; int size;
size = read_socket_line(cs); size = read_socket_line(cs, 5);
if (size < 1) { if (size < 1) {
LOGWARNING("Failed to receive line in parse_subscribe"); LOGWARNING("Failed to receive line in parse_subscribe");
goto out; goto out;
@ -604,7 +611,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi, const char *au
do { do {
int size; int size;
size = read_socket_line(cs); size = read_socket_line(cs, 5);
if (size < 1) { if (size < 1) {
LOGWARNING("Failed to receive line in auth_stratum"); LOGWARNING("Failed to receive line in auth_stratum");
ret = false; ret = false;
@ -636,6 +643,20 @@ static int proxy_loop(proc_instance_t *pi, connsock_t *cs)
return 0; return 0;
} }
static void *proxy_recv(void *arg)
{
proxy_instance_t *pi = (proxy_instance_t *)arg;
return NULL;
}
static void *proxy_send(void *arg)
{
proxy_instance_t *pi = (proxy_instance_t *)arg;
return NULL;
}
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
const char *auth, const char *pass) const char *auth, const char *pass)
{ {
@ -643,6 +664,9 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
int ret = 1; int ret = 1;
memset(&proxi, 0, sizeof(proxi)); memset(&proxi, 0, sizeof(proxi));
proxi.ckp = ckp;
proxi.cs = cs;
if (!connect_proxy(cs)) { if (!connect_proxy(cs)) {
LOGWARNING("FATAL: Failed to connect to %s:%s in proxy_mode!", LOGWARNING("FATAL: Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port); cs->url, cs->port);
@ -662,8 +686,13 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
goto out; goto out;
} }
mutex_init(&proxi.notify_lock); mutex_init(&proxi.notify_lock);
create_pthread(&proxi.pth_precv, proxy_recv, &proxi);
cond_init(&proxi.psend_cond);
create_pthread(&proxi.pth_psend, proxy_send, &proxi);
ret = proxy_loop(pi, cs); ret = proxy_loop(pi, cs);
join_pthread(proxi.pth_precv);
join_pthread(proxi.pth_psend);
out: out:
close(cs->fd); close(cs->fd);
free(proxi.enonce1); free(proxi.enonce1);

14
src/libckpool.c

@ -469,11 +469,11 @@ out:
/* Peek in a socket, and then receive only one line at a time, allocing enough /* Peek in a socket, and then receive only one line at a time, allocing enough
* memory in *buf */ * memory in *buf */
int read_socket_line(connsock_t *cs) int read_socket_line(connsock_t *cs, int timeout)
{ {
char readbuf[PAGESIZE], *eom = NULL; char readbuf[PAGESIZE], *eom = NULL;
size_t buflen = 0, bufofs = 0; size_t buflen = 0, bufofs = 0;
tv_t timeout = {5, 0}; tv_t tv_timeout;
int ret, bufsiz; int ret, bufsiz;
fd_set rd; fd_set rd;
@ -485,9 +485,9 @@ int read_socket_line(connsock_t *cs)
FD_ZERO(&rd); FD_ZERO(&rd);
FD_SET(cs->fd, &rd); FD_SET(cs->fd, &rd);
timeout.tv_sec = 5; tv_timeout.tv_sec = timeout;
timeout.tv_usec = 0; tv_timeout.tv_usec = 0;
ret = select(cs->fd + 1, &rd, NULL, NULL, &timeout); ret = select(cs->fd + 1, &rd, NULL, NULL, &tv_timeout);
if (ret < 0 && interrupted()) if (ret < 0 && interrupted())
continue; continue;
if (ret < 1) { if (ret < 1) {
@ -951,7 +951,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
LOGWARNING("Failed to write to socket in json_rpc_call"); LOGWARNING("Failed to write to socket in json_rpc_call");
goto out_empty; goto out_empty;
} }
ret = read_socket_line(cs); ret = read_socket_line(cs, 5);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read socket line in json_rpc_call"); LOGWARNING("Failed to read socket line in json_rpc_call");
goto out_empty; goto out_empty;
@ -961,7 +961,7 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
goto out_empty; goto out_empty;
} }
do { do {
ret = read_socket_line(cs); ret = read_socket_line(cs, 5);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read http socket lines in json_rpc_call"); LOGWARNING("Failed to read http socket lines in json_rpc_call");
goto out_empty; goto out_empty;

2
src/libckpool.h

@ -312,7 +312,7 @@ void block_socket(int fd);
int bind_socket(char *url, char *port); int bind_socket(char *url, char *port);
int connect_socket(char *url, char *port); int connect_socket(char *url, char *port);
int write_socket(int fd, const void *buf, size_t nbyte); int write_socket(int fd, const void *buf, size_t nbyte);
int read_socket_line(connsock_t *cs); int read_socket_line(connsock_t *cs, int timeout);
void empty_socket(int fd); void empty_socket(int fd);
void close_unix_socket(const int sockd, const char *server_path); void close_unix_socket(const int sockd, const char *server_path);
int open_unix_server(const char *server_path); int open_unix_server(const char *server_path);

Loading…
Cancel
Save