Browse Source

Update notify when we have a new subscription and gracefully drop connections from the send process to be detected rapidly in the recv thread

master
Con Kolivas 11 years ago
parent
commit
0471340fdd
  1. 11
      src/generator.c

11
src/generator.c

@ -829,7 +829,6 @@ static void clear_notify(notify_instance_t *ni)
static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi) static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi)
{ {
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
ckpool_t *ckp = proxi->ckp;
bool ret = true; bool ret = true;
/* All our notify data is invalid if we reconnect so discard them */ /* All our notify data is invalid if we reconnect so discard them */
@ -852,7 +851,6 @@ static void reconnect_stratum(connsock_t *cs, proxy_instance_t *proxi)
continue; continue;
ret = auth_stratum(cs, proxi); ret = auth_stratum(cs, proxi);
} while (!ret); } while (!ret);
send_proc(ckp->stratifier, "subscribe");
} }
/* FIXME: Return something useful to the stratifier based on this result */ /* FIXME: Return something useful to the stratifier based on this result */
@ -935,12 +933,18 @@ static void *proxy_recv(void *arg)
/* If we don't get an update within 2 minutes the upstream pool /* If we don't get an update within 2 minutes the upstream pool
* has likely stopped responding. */ * has likely stopped responding. */
do { do {
if (cs->fd == -1) {
ret = -1;
break;
}
ret = read_socket_line(cs, 5); ret = read_socket_line(cs, 5);
} while (ret == 0 && ++retries < 24); } while (ret == 0 && ++retries < 24);
if (ret < 1) { if (ret < 1) {
LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect"); LOGWARNING("Failed to read_socket_line in proxy_recv, attempting reconnect");
reconnect_stratum(cs, proxi); reconnect_stratum(cs, proxi);
send_proc(ckp->stratifier, "subscribe");
send_proc(ckp->stratifier, "notify");
continue; continue;
} }
if (parse_method(proxi, cs->buf)) { if (parse_method(proxi, cs->buf)) {
@ -1012,9 +1016,10 @@ static void *proxy_send(void *arg)
LOGWARNING("Failed to find matching jobid in proxysend"); LOGWARNING("Failed to find matching jobid in proxysend");
json_decref(msg->json_msg); json_decref(msg->json_msg);
free(msg); free(msg);
if (!ret) { if (!ret && cs->fd > 0) {
LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect"); LOGWARNING("Failed to send msg in proxy_send, dropping to reconnect");
close(cs->fd); close(cs->fd);
cs->fd = -1;
} }
} }
return NULL; return NULL;

Loading…
Cancel
Save