Browse Source

Remove invalidated clients from broadcasts

master
Con Kolivas 11 years ago
parent
commit
3bee9efed6
  1. 24
      src/connector.c
  2. 26
      src/stratifier.c

24
src/connector.c

@ -85,13 +85,17 @@ out:
} }
/* Invalidate this instance */ /* Invalidate this instance */
static void invalidate_client(client_instance_t *client) static void invalidate_client(ckpool_t *ckp, client_instance_t *client)
{ {
char buf[256];
close(client->fd); close(client->fd);
client->fd = -1; client->fd = -1;
sprintf(buf, "dropclient=%d", client->id);
send_proc(&ckp->stratifier, buf);
} }
static void send_client(conn_instance_t *ci, int id, const char *buf); static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client) static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
{ {
@ -113,7 +117,7 @@ retry:
return; return;
LOGINFO("Client fd %d disconnected", client->fd); LOGINFO("Client fd %d disconnected", client->fd);
invalidate_client(client); invalidate_client(ckp, client);
return; return;
} }
client->bufofs += ret; client->bufofs += ret;
@ -126,7 +130,7 @@ reparse:
if (!eol) { if (!eol) {
if (unlikely(client->bufofs > MAX_MSGSIZE)) { if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd); LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd);
invalidate_client(client); invalidate_client(ckp, client);
return; return;
} }
if (moredata) if (moredata)
@ -138,7 +142,7 @@ reparse:
buflen = eol - client->buf + 1; buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) { if (unlikely(buflen > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd); LOGWARNING("Client fd %d message oversize, disconnecting", client->fd);
invalidate_client(client); invalidate_client(ckp, client);
return; return;
} }
memcpy(msg, client->buf, buflen); memcpy(msg, client->buf, buflen);
@ -148,8 +152,8 @@ reparse:
val = json_loads(msg, 0, NULL); val = json_loads(msg, 0, NULL);
if (!val) { if (!val) {
LOGINFO("Client id %d sent invalid json message %s", client->id, msg); LOGINFO("Client id %d sent invalid json message %s", client->id, msg);
send_client(ci, client->id, "Invalid JSON, disconnecting\n"); send_client(ckp, ci, client->id, "Invalid JSON, disconnecting\n");
invalidate_client(client); invalidate_client(ckp, client);
return; return;
} else { } else {
char *s; char *s;
@ -230,7 +234,7 @@ out:
return NULL; return NULL;
} }
static void send_client(conn_instance_t *ci, int id, const char *buf) static void send_client(ckpool_t *ckp, conn_instance_t *ci, int id, const char *buf)
{ {
int fd = -1, ret, len, ofs = 0; int fd = -1, ret, len, ofs = 0;
client_instance_t *client; client_instance_t *client;
@ -265,7 +269,7 @@ static void send_client(conn_instance_t *ci, int id, const char *buf)
if (interrupted()) if (interrupted())
continue; continue;
LOGWARNING("Client id %d disconnected", id); LOGWARNING("Client id %d disconnected", id);
invalidate_client(client); invalidate_client(ckp, client);
return; return;
} }
ofs += ret; ofs += ret;
@ -312,7 +316,7 @@ retry:
dealloc(buf); dealloc(buf);
buf = json_dumps(json_msg, 0); buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n"); realloc_strcat(&buf, "\n");
send_client(ci, client_id, buf); send_client(ckp, ci, client_id, buf);
json_decref(json_msg); json_decref(json_msg);
close(sockd); close(sockd);

26
src/stratifier.c

@ -118,6 +118,7 @@ struct stratum_instance {
char enonce1[20]; char enonce1[20];
double diff; double diff;
bool authorised; bool authorised;
bool disconnected;
char *useragent; char *useragent;
char *workername; char *workername;
int user_id; int user_id;
@ -368,6 +369,8 @@ static void stratum_broadcast(json_t *val)
if (!instance->authorised) if (!instance->authorised)
continue; continue;
if (instance->disconnected)
continue;
msg = ckzalloc(sizeof(stratum_msg_t)); msg = ckzalloc(sizeof(stratum_msg_t));
msg->json_msg = json_deep_copy(val); msg->json_msg = json_deep_copy(val);
msg->client_id = instance->id; msg->client_id = instance->id;
@ -403,6 +406,20 @@ static void stratum_add_send(json_t *val, int client_id)
mutex_unlock(&stratum_send_lock); mutex_unlock(&stratum_send_lock);
} }
static void drop_client(int client_id)
{
stratum_instance_t *client;
ck_rlock(&instance_lock);
client = __instance_by_id(client_id);
ck_runlock(&instance_lock);
/* May never have been a stratum instance */
if (unlikely(!client))
return;
client->disconnected = true;
}
static int strat_loop(ckpool_t *ckp, proc_instance_t *pi) static int strat_loop(ckpool_t *ckp, proc_instance_t *pi)
{ {
int sockd, ret = 0, selret; int sockd, ret = 0, selret;
@ -451,6 +468,15 @@ retry:
else if (!strncasecmp(buf, "update", 6)) { else if (!strncasecmp(buf, "update", 6)) {
update_base(ckp); update_base(ckp);
goto reset; goto reset;
} else if (!strncasecmp(buf, "dropclient", 10)) {
int client_id;
ret = sscanf(buf, "dropclient=%d", &client_id);
if (ret < 0)
LOGDEBUG("Failed to parse dropclient command: %s", buf);
else
drop_client(client_id);
goto retry;
} else { } else {
json_t *val = json_loads(buf, 0, NULL); json_t *val = json_loads(buf, 0, NULL);

Loading…
Cancel
Save