Browse Source

Provide helper functions for setting buffer sizes on any socket fds

master
Con Kolivas 9 years ago
parent
commit
a6cc0bd90c
  1. 96
      src/ckpool.c
  2. 6
      src/ckpool.h
  3. 31
      src/connector.c

96
src/ckpool.c

@ -512,6 +512,62 @@ void empty_buffer(connsock_t *cs)
cs->buflen = cs->bufofs = 0; cs->buflen = cs->bufofs = 0;
} }
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len)
{
socklen_t optlen;
int opt;
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, optlen);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
if (opt < len) {
LOGDEBUG("Failed to set desired sendbufsize of %d unprivileged, only got %d",
len, opt);
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(fd, SOL_SOCKET, SO_SNDBUFFORCE, &opt, optlen);
getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
}
if (opt < len) {
LOGWARNING("Failed to increase sendbufsize to %d, increase wmem_max or start %s privileged",
len, ckp->name);
ckp->wmem_warn = true;
} else
LOGDEBUG("Increased sendbufsize to %d of desired %d", opt, len);
return opt;
}
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len)
{
socklen_t optlen;
int opt;
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, optlen);
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen);
opt /= 2;
if (opt < len) {
LOGDEBUG("Failed to set desired rcvbufsiz of %d unprivileged, only got %d",
len, opt);
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(fd, SOL_SOCKET, SO_RCVBUFFORCE, &opt, optlen);
getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen);
opt /= 2;
}
if (opt < len) {
LOGWARNING("Failed to increase rcvbufsiz to %d, increase rmem_max or start %s privileged",
len, ckp->name);
ckp->rmem_warn = true;
} else
LOGDEBUG("Increased rcvbufsiz to %d of desired %d", opt, len);
return opt;
}
/* If there is any cs->buflen it implies a full line was received on the last /* If there is any cs->buflen it implies a full line was received on the last
* pass through read_socket_line and subsequently processed, leaving * pass through read_socket_line and subsequently processed, leaving
* unprocessed data beyond cs->bufofs. Otherwise a zero buflen means there is * unprocessed data beyond cs->bufofs. Otherwise a zero buflen means there is
@ -535,7 +591,7 @@ static void clear_bufline(connsock_t *cs)
} }
} }
static void add_buflen(connsock_t *cs, const char *readbuf, const int len) static void add_buflen(ckpool_t *ckp, connsock_t *cs, const char *readbuf, const int len)
{ {
int backoff = 1; int backoff = 1;
int buflen; int buflen;
@ -556,32 +612,9 @@ static void add_buflen(connsock_t *cs, const char *readbuf, const int len)
} }
/* Increase receive buffer if possible to larger than the largest /* Increase receive buffer if possible to larger than the largest
* message we're likely to buffer */ * message we're likely to buffer */
if (buflen > cs->rcvbufsiz && !cs->rcvbufsiz_setfail) { if (unlikely(!ckp->rmem_warn && buflen > cs->rcvbufsiz))
socklen_t optlen; cs->rcvbufsiz = set_recvbufsize(ckp, cs->fd, buflen);
int opt;
optlen = sizeof(opt);
opt = buflen * 4 / 3;
setsockopt(cs->fd, SOL_SOCKET, SO_RCVBUF, &opt, optlen);
getsockopt(cs->fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen);
opt /= 2;
if (opt < buflen) {
LOGDEBUG("Failed to set desired rcvbufsiz of %d unprivileged, only got %d",
buflen, opt);
optlen = sizeof(opt);
opt = buflen * 4 / 3;
setsockopt(cs->fd, SOL_SOCKET, SO_RCVBUFFORCE, &opt, optlen);
getsockopt(cs->fd, SOL_SOCKET, SO_RCVBUF, &opt, &optlen);
opt /= 2;
}
cs->rcvbufsiz = opt;
if (opt < buflen) {
LOGWARNING("Failed to increase rcvbufsiz to %d, increase rmem_max or start %s privileged",
buflen, cs->ckp->name);
cs->rcvbufsiz_setfail = true;
} else
LOGDEBUG("Increased rcvbufsiz to %d of desired %d", opt, buflen);
}
memcpy(cs->buf + cs->bufofs, readbuf, len); memcpy(cs->buf + cs->bufofs, readbuf, len);
cs->bufofs += len; cs->bufofs += len;
cs->buf[cs->bufofs] = '\0'; cs->buf[cs->bufofs] = '\0';
@ -589,7 +622,7 @@ static void add_buflen(connsock_t *cs, const char *readbuf, const int len)
/* Receive as much data is currently available without blocking into a connsock /* Receive as much data is currently available without blocking into a connsock
* buffer. Returns total length of data read. */ * buffer. Returns total length of data read. */
static int recv_available(connsock_t *cs) static int recv_available(ckpool_t *ckp, connsock_t *cs)
{ {
char readbuf[PAGESIZE]; char readbuf[PAGESIZE];
int len = 0, ret; int len = 0, ret;
@ -597,7 +630,7 @@ static int recv_available(connsock_t *cs)
do { do {
ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret > 0) { if (ret > 0) {
add_buflen(cs, readbuf, ret); add_buflen(ckp, cs, readbuf, ret);
len += ret; len += ret;
} }
} while (ret > 0); } while (ret > 0);
@ -612,14 +645,15 @@ static int recv_available(connsock_t *cs)
* and -1 on error. */ * and -1 on error. */
int read_socket_line(connsock_t *cs, float *timeout) int read_socket_line(connsock_t *cs, float *timeout)
{ {
bool proxy = cs->ckp->proxy; ckpool_t *ckp = cs->ckp;
bool proxy = ckp->proxy;
char *eom = NULL; char *eom = NULL;
tv_t start, now; tv_t start, now;
float diff; float diff;
int ret; int ret;
clear_bufline(cs); clear_bufline(cs);
recv_available(cs); // Intentionally ignore return value recv_available(ckp, cs); // Intentionally ignore return value
eom = strchr(cs->buf, '\n'); eom = strchr(cs->buf, '\n');
tv_time(&start); tv_time(&start);
@ -646,7 +680,7 @@ int read_socket_line(connsock_t *cs, float *timeout)
LOGERR("Select %s in read_socket_line", !ret ? "timed out" : "failed"); LOGERR("Select %s in read_socket_line", !ret ? "timed out" : "failed");
goto out; goto out;
} }
ret = recv_available(cs); ret = recv_available(ckp, cs);
if (ret < 1) { if (ret < 1) {
/* If we have done wait_read_select there should be /* If we have done wait_read_select there should be
* something to read and if we get nothing it means the * something to read and if we get nothing it means the

6
src/ckpool.h

@ -209,6 +209,10 @@ struct ckpool_instance {
/* Should we daemonise the ckpool process */ /* Should we daemonise the ckpool process */
bool daemon; bool daemon;
/* Have we given warnings about the inability to raise buf sizes */
bool wmem_warn;
bool rmem_warn;
/* Bitcoind data */ /* Bitcoind data */
int btcds; int btcds;
char **btcdurl; char **btcdurl;
@ -316,6 +320,8 @@ ckpool_t *global_ckp;
bool ping_main(ckpool_t *ckp); bool ping_main(ckpool_t *ckp);
void empty_buffer(connsock_t *cs); void empty_buffer(connsock_t *cs);
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len);
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _send_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _send_proc(pi, msg, __FILE__, __func__, __LINE__)

31
src/connector.c

@ -156,9 +156,6 @@ struct connector_data {
/* Pending sends to the upstream server */ /* Pending sends to the upstream server */
ckmsgq_t *upstream_sends; ckmsgq_t *upstream_sends;
connsock_t upstream_cs; connsock_t upstream_cs;
/* Have we given the warning about inability to raise sendbuf size */
bool wmem_warn;
}; };
typedef struct connector_data cdata_t; typedef struct connector_data cdata_t;
@ -681,32 +678,8 @@ static bool send_sender_send(ckpool_t *ckp, cdata_t *cdata, sender_send_t *sende
/* Increase sendbufsize to match large messages sent to clients - this /* Increase sendbufsize to match large messages sent to clients - this
* usually only applies to clients as mining nodes. */ * usually only applies to clients as mining nodes. */
if (unlikely(sender_send->len > client->sendbufsize && !cdata->wmem_warn)) { if (unlikely(!ckp->wmem_warn && sender_send->len > client->sendbufsize))
int opt, len = sender_send->len; client->sendbufsize = set_sendbufsize(ckp, client->fd, sender_send->len);
socklen_t optlen;
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, optlen);
getsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
if (opt < len) {
LOGDEBUG("Failed to set desired sendbufsize of %d unprivileged, only got %d",
len, opt);
optlen = sizeof(opt);
opt = len * 4 / 3;
setsockopt(client->fd, SOL_SOCKET, SO_SNDBUFFORCE, &opt, optlen);
getsockopt(client->fd, SOL_SOCKET, SO_SNDBUF, &opt, &optlen);
opt /= 2;
}
client->sendbufsize = opt;
if (opt < len) {
LOGWARNING("Failed to increase sendbufsize to %d, increase wmem_max or start %s privileged",
len, ckp->name);
cdata->wmem_warn = true;
} else
LOGDEBUG("Increased sendbufsize to %d of desired %d", opt, len);
}
while (sender_send->len) { while (sender_send->len) {
int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len); int ret = write(client->fd, sender_send->buf + sender_send->ofs, sender_send->len);

Loading…
Cancel
Save