Browse Source

Increase send buffer size when possible to nodes to accommodate large messages

master
Con Kolivas 9 years ago
parent
commit
39f542fac9
  1. 39
      src/connector.c

39
src/connector.c

@ -75,6 +75,9 @@ struct client_instance {
/* Time this client started blocking, 0 when not blocked */ /* Time this client started blocking, 0 when not blocked */
time_t blocked_time; time_t blocked_time;
/* The size of the socket send buffer */
int sendbufsize;
}; };
struct sender_send { struct sender_send {
@ -153,6 +156,9 @@ struct connector_data {
/* Pending sends to the upstream server */ /* Pending sends to the upstream server */
ckmsgq_t *upstream_sends; ckmsgq_t *upstream_sends;
connsock_t upstream_cs; connsock_t upstream_cs;
/* Have we given the warning about inability to raise sendbuf size */
bool wmem_warn;
}; };
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
@ -228,6 +234,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
client_instance_t *client; client_instance_t *client;
struct epoll_event event; struct epoll_event event;
socklen_t address_len; socklen_t address_len;
socklen_t optlen;
ck_rlock(&cdata->lock); ck_rlock(&cdata->lock);
no_clients = HASH_COUNT(cdata->clients); no_clients = HASH_COUNT(cdata->clients);
@ -302,6 +309,9 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
* removes it automatically from the epoll list. */ * removes it automatically from the epoll list. */
__inc_instance_ref(client); __inc_instance_ref(client);
client->fd = fd; client->fd = fd;
optlen = sizeof(client->sendbufsize);
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &client->sendbufsize, &optlen);
LOGDEBUG("Client sendbufsize detected as %d", client->sendbufsize);
return 1; return 1;
} }
@ -669,6 +679,35 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
client->sending = sender_send; client->sending = sender_send;
now_t = time(NULL); now_t = time(NULL);
/* Increase sendbufsize to match large messages sent to clients - this
* usually only applies to clients as mining nodes. */
if (unlikely(sender_send->len > client->sendbufsize && !cdata->wmem_warn)) {
int opt, len = sender_send->len;
socklen_t optlen;
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, optlen);
getsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
if (opt < len) {
LOGDEBUG("Failed to set desired sendbufsize of %d unprivileged, only got %d",
len, opt);
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(client->fd, SOL_SOCKET, SO_SNDBUFFORCE, &opt, optlen);
getsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
}
client->sendbufsize = opt;
if (opt < len) {
LOGWARNING("Failed to increase sendbufsize to %d, increase wmem_max or start %s privileged",
len, ckp->name);
cdata->wmem_warn = true;
} else
LOGDEBUG("Increased sendbufsize to %d of desired %d", opt, len);
}
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);

Loading…
Cancel
Save