diff --git a/src/connector.c b/src/connector.c index 085723c5..7dc17000 100644 --- a/src/connector.c +++ b/src/connector.c @@ -85,13 +85,17 @@ out: } /* Invalidate this instance */ -static void invalidate_client(client_instance_t *client) +static void invalidate_client(ckpool_t *ckp, client_instance_t *client) { + char buf[256]; + close(client->fd); client->fd = -1; + sprintf(buf, "dropclient=%d", client->id); + send_proc(&ckp->stratifier, buf); } -static void send_client(conn_instance_t *ci, int id, const char *buf); +static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf); static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) { @@ -113,7 +117,7 @@ retry: return; LOGINFO("Client fd %d disconnected", client->fd); - invalidate_client(client); + invalidate_client(ckp, client); return; } client->bufofs += ret; @@ -126,7 +130,7 @@ reparse: if (!eol) { if (unlikely(client->bufofs > MAX_MSGSIZE)) { LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); - invalidate_client(client); + invalidate_client(ckp, client); return; } if (moredata) @@ -138,7 +142,7 @@ reparse: buflen = eol - client->buf + 1; if (unlikely(buflen > MAX_MSGSIZE)) { LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); - invalidate_client(client); + invalidate_client(ckp, client); return; } memcpy(msg, client->buf, buflen); @@ -148,8 +152,8 @@ reparse: val = json_loads(msg, 0, NULL); if (!val) { LOGINFO("Client id %d sent invalid json message %s", client->id, msg); - send_client(ci, client->id, "Invalid JSON, disconnecting\n"); - invalidate_client(client); + send_client(ckp, ci, client->id, "Invalid JSON, disconnecting\n"); + invalidate_client(ckp, client); return; } else { char *s; @@ -230,7 +234,7 @@ out: return NULL; } -static void send_client(conn_instance_t *ci, int id, const char *buf) +static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf) { int fd = -1, ret, len, ofs = 0; client_instance_t *client; @@ -265,7 +269,7 @@ static void send_client(conn_instance_t *ci, int id, const char *buf) if (interrupted()) continue; LOGWARNING("Client id %d disconnected", id); - invalidate_client(client); + invalidate_client(ckp, client); return; } ofs += ret; @@ -312,7 +316,7 @@ retry: dealloc(buf); buf = json_dumps(json_msg, 0); realloc_strcat(&buf, "\n"); - send_client(ci, client_id, buf); + send_client(ckp, ci, client_id, buf); json_decref(json_msg); close(sockd); diff --git a/src/stratifier.c b/src/stratifier.c index 6f12d121..a28f0e30 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -118,6 +118,7 @@ struct stratum_instance { char enonce1[20]; double diff; bool authorised; + bool disconnected; char *useragent; char *workername; int user_id; @@ -368,6 +369,8 @@ static void stratum_broadcast(json_t *val) if (!instance->authorised) continue; + if (instance->disconnected) + continue; msg = ckzalloc(sizeof(stratum_msg_t)); msg->json_msg = json_deep_copy(val); msg->client_id = instance->id; @@ -403,6 +406,20 @@ static void stratum_add_send(json_t *val, int client_id) mutex_unlock(&stratum_send_lock); } +static void drop_client(int client_id) +{ + stratum_instance_t *client; + + ck_rlock(&instance_lock); + client = __instance_by_id(client_id); + ck_runlock(&instance_lock); + + /* May never have been a stratum instance */ + if (unlikely(!client)) + return; + client->disconnected = true; +} + static int strat_loop(ckpool_t *ckp, proc_instance_t *pi) { int sockd, ret = 0, selret; @@ -451,6 +468,15 @@ retry: else if (!strncasecmp(buf, "update", 6)) { update_base(ckp); goto reset; + } else if (!strncasecmp(buf, "dropclient", 10)) { + int client_id; + + ret = sscanf(buf, "dropclient=%d", &client_id); + if (ret < 0) + LOGDEBUG("Failed to parse dropclient command: %s", buf); + else + drop_client(client_id); + goto retry; } else { json_t *val = json_loads(buf, 0, NULL);