Browse Source

Deprecate send_proc by adding messages to other proc instances directly to their message queues for now.

master
Con Kolivas 8 years ago
parent
commit
3dfe179605
  1. 39
      src/ckpool.c
  2. 4
      src/ckpool.h
  3. 2
      src/generator.c
  4. 6
      src/stratifier.c

39
src/ckpool.c

@ -709,37 +709,26 @@ out:
return ret; return ret;
} }
/* Send a single message to a process instance when there will be no response, /* We used to send messages between each proc_instance via unix sockets when
* closing the socket immediately. */ * ckpool was a multi-process model but that is no longer required so we can
void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line) * place the messages directly on the other proc_instance's queue until we
* deprecate this mechanism. */
void _queue_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line)
{ {
char *path = pi->us.path; unix_msg_t *umsg;
bool ret = false;
int sockd;
if (unlikely(!msg || !strlen(msg))) { if (unlikely(!msg || !strlen(msg))) {
LOGERR("Attempted to send null message to %s in send_proc", pi->processname); LOGWARNING("Null msg passed to queue_proc from %s %s:%d", file, func, line);
return; return;
} }
umsg = ckalloc(sizeof(unix_msg_t));
umsg->sockd = -1;
umsg->buf = strdup(msg);
if (unlikely(!path || !strlen(path))) { mutex_lock(&pi->rmsg_lock);
LOGERR("Attempted to send message %s to null path in send_proc", msg ? msg : ""); DL_APPEND(pi->unix_msgs, umsg);
goto out; pthread_cond_signal(&pi->rmsg_cond);
} mutex_unlock(&pi->rmsg_lock);
sockd = open_unix_client(path);
if (unlikely(sockd < 0)) {
LOGWARNING("Failed to open socket %s", path);
goto out;
}
if (unlikely(!send_unix_msg(sockd, msg)))
LOGWARNING("Failed to send %s to socket %s", msg, path);
else
ret = true;
Close(sockd);
out:
if (unlikely(!ret))
LOGERR("Failure in send_proc from %s %s:%d", file, func, line);
} }
/* Send a single message to a process instance and retrieve the response, then /* Send a single message to a process instance and retrieve the response, then

4
src/ckpool.h

@ -342,8 +342,8 @@ void empty_buffer(connsock_t *cs);
int set_sendbufsize(ckpool_t *ckp, const int fd, const int len); int set_sendbufsize(ckpool_t *ckp, const int fd, const int len);
int set_recvbufsize(ckpool_t *ckp, const int fd, const int len); int set_recvbufsize(ckpool_t *ckp, const int fd, const int len);
int read_socket_line(connsock_t *cs, float *timeout); int read_socket_line(connsock_t *cs, float *timeout);
void _send_proc(const proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line); void _queue_proc(proc_instance_t *pi, const char *msg, const char *file, const char *func, const int line);
#define send_proc(pi, msg) _send_proc(&(pi), msg, __FILE__, __func__, __LINE__) #define send_proc(pi, msg) _queue_proc(&(pi), msg, __FILE__, __func__, __LINE__)
char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout, char *_send_recv_proc(const proc_instance_t *pi, const char *msg, int writetimeout, int readtimedout,
const char *file, const char *func, const int line); const char *file, const char *func, const int line);
#define send_recv_proc(pi, msg) _send_recv_proc(&(pi), msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__) #define send_recv_proc(pi, msg) _send_recv_proc(&(pi), msg, UNIX_WRITE_TIMEOUT, UNIX_READ_TIMEOUT, __FILE__, __func__, __LINE__)

2
src/generator.c

@ -787,7 +787,7 @@ out:
static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni); static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni);
static void reconnect_generator(const ckpool_t *ckp) static void reconnect_generator(ckpool_t *ckp)
{ {
send_proc(ckp->generator, "reconnect"); send_proc(ckp->generator, "reconnect");
} }

6
src/stratifier.c

@ -1155,7 +1155,7 @@ static char *send_recv_generator(ckpool_t *ckp, const char *msg, const int prio)
return buf; return buf;
} }
static void send_generator(const ckpool_t *ckp, const char *msg, const int prio) static void send_generator(ckpool_t *ckp, const char *msg, const int prio)
{ {
sdata_t *sdata = ckp->sdata; sdata_t *sdata = ckp->sdata;
bool set; bool set;
@ -2342,7 +2342,7 @@ static int64_t best_userproxy_headroom(sdata_t *sdata, const int userid)
static void reconnect_client(sdata_t *sdata, stratum_instance_t *client); static void reconnect_client(sdata_t *sdata, stratum_instance_t *client);
static void generator_recruit(const ckpool_t *ckp, const int proxyid, const int recruits) static void generator_recruit(ckpool_t *ckp, const int proxyid, const int recruits)
{ {
char buf[256]; char buf[256];
@ -4529,7 +4529,7 @@ static proxy_t *__best_subproxy(proxy_t *proxy)
* in proxy mode where we find a subproxy based on the current proxy with room * in proxy mode where we find a subproxy based on the current proxy with room
* for more clients. Signal the generator to recruit more subproxies if we are * for more clients. Signal the generator to recruit more subproxies if we are
* running out of room. */ * running out of room. */
static sdata_t *select_sdata(const ckpool_t *ckp, sdata_t *ckp_sdata, const int userid) static sdata_t *select_sdata(ckpool_t *ckp, sdata_t *ckp_sdata, const int userid)
{ {
proxy_t *global, *proxy, *tmp, *best = NULL; proxy_t *global, *proxy, *tmp, *best = NULL;

Loading…
Cancel
Save