kanoi 10 years ago
parent
commit
3d89862c18
  1. 12
      README
  2. 8
      ckpool.conf
  3. 5
      ckproxy.conf
  4. 232
      src/ckpool.c
  5. 19
      src/ckpool.h
  6. 541
      src/connector.c
  7. 42
      src/generator.c
  8. 159
      src/libckpool.c
  9. 19
      src/libckpool.h
  10. 1209
      src/stratifier.c

12
README

@ -240,13 +240,21 @@ new network blocks and is 100 by default. It is intended to be a backup only
for when the notifier is not set up and only polls if the "notify" field is
not set on a btcd.
"nonce1length" : This is optional allowing the extranonce1 length to be chosen
from 2 to 8. Default 8
"nonce2length" : This is optional allowing the extranonce2 length to be chosen
from 2 to 8. Default 8
"update_interval" : This is the frequency that stratum updates are sent out to
miners and is set to 30 seconds by default to help perpetuate transactions for
the health of the bitcoin network.
"serverurl" : This is the IP to try to bind ckpool uniquely to, otherwise it
"serverurl" : This is the IP(s) to try to bind ckpool uniquely to, otherwise it
will attempt to bind to all interfaces in port 3333 by default in pool mode
and 3334 in proxy mode.
and 3334 in proxy mode. Multiple entries can be specified as an array by
either IP or resolvable domain name but the executable must be able to bind to
all of them and ports up to 1024 usually require privileged access.
"mindiff" : Minimum diff that vardiff will allow miners to drop to. Default 1

8
ckpool.conf

@ -16,8 +16,14 @@
"btcaddress" : "14BMjogz69qe8hk9thyzbmR5pg34mVKB1e",
"btcsig" : "/mined by ck/",
"blockpoll" : 100,
"nonce1length" : 8,
"nonce2length" : 8,
"update_interval" : 30,
"serverurl" : "ckpool.org:3333",
"serverurl" : [
"ckpool.org:3333",
"node.ckpool.org:3333",
"node.ckpool.org:80"
],
"mindiff" : 1,
"startdiff" : 42,
"maxdiff" : 0,

5
ckproxy.conf

@ -12,7 +12,10 @@
}
],
"update_interval" : 30,
"serverurl" : "192.168.1.100:3334",
"serverurl" : [
"192.168.1.100:3334",
"127.0.0.1:3334"
],
"mindiff" : 1,
"startdiff" : 42,
"maxdiff" : 0,

232
src/ckpool.c

@ -221,6 +221,21 @@ static int kill_pid(int pid, int sig)
return kill(pid, sig);
}
static int pid_wait(pid_t pid, int ms)
{
tv_t start, now;
int ret;
tv_time(&start);
do {
ret = kill_pid(pid, 0);
if (ret)
break;
tv_time(&now);
} while (ms_tvdiff(&now, &start) < ms);
return ret;
}
static int send_procmsg(proc_instance_t *pi, const char *buf)
{
char *path = pi->us.path;
@ -297,8 +312,8 @@ retry:
broadcast_proc(ckp, buf);
send_unix_msg(sockd, "success");
}
} else if (cmdmatch(buf, "getfd")) {
int connfd = send_procmsg(ckp->connector, "getfd");
} else if (cmdmatch(buf, "getxfd")) {
int connfd = send_procmsg(ckp->connector, buf);
if (connfd > 0) {
int newfd = get_fd(connfd);
@ -350,8 +365,10 @@ bool ping_main(ckpool_t *ckp)
{
char *buf;
if (unlikely(kill_pid(ckp->main.pid, 0)))
return false;
buf = send_recv_proc(&ckp->main, "ping");
if (!buf)
if (unlikely(!buf))
return false;
free(buf);
return true;
@ -674,18 +691,35 @@ static bool write_pid(ckpool_t *ckp, const char *path, pid_t pid)
ret = fscanf(fp, "%d", &oldpid);
fclose(fp);
if (ret == 1 && !(kill_pid(oldpid, 0))) {
if (ckp->handover) {
if (pid_wait(oldpid, 500))
goto out;
LOGWARNING("Old process pid %d failed to shutdown cleanly, terminating", oldpid);
}
if (!ckp->killold) {
LOGEMERG("Process %s pid %d still exists, start ckpool with -k if you wish to kill it",
path, oldpid);
return false;
}
if (kill_pid(oldpid, 15)) {
LOGEMERG("Unable to kill old process %s pid %d", path, oldpid);
return false;
}
LOGWARNING("Terminating old process %s pid %d", path, oldpid);
if (pid_wait(oldpid, 500))
goto out;
if (kill_pid(oldpid, 9)) {
LOGEMERG("Unable to kill old process %s pid %d", path, oldpid);
return false;
}
LOGWARNING("Killing off old process %s pid %d", path, oldpid);
LOGWARNING("Unable to terminate old process %s pid %d, killing", path, oldpid);
if (!pid_wait(oldpid, 500)) {
LOGEMERG("Unable to kill old process %s pid %d", path, oldpid);
return false;
}
}
}
out:
fp = fopen(path, "we");
if (!fp) {
LOGERR("Failed to open file %s", path);
@ -747,11 +781,9 @@ static void childsighandler(int sig)
signal(sig, SIG_IGN);
signal(SIGTERM, SIG_IGN);
if (sig != SIGUSR1) {
pid_t ppid = getppid();
LOGWARNING("Child process received signal %d, forwarding signal to %s main process",
sig, global_ckp->name);
kill_pid(ppid, sig);
kill_pid(global_ckp->main.pid, sig);
}
exit(0);
}
@ -834,7 +866,7 @@ static void clean_up(ckpool_t *ckp)
static void cancel_join_pthread(pthread_t *pth)
{
if (!*pth)
if (!pth || !*pth)
return;
pthread_cancel(*pth);
join_pthread(*pth);
@ -843,13 +875,22 @@ static void cancel_join_pthread(pthread_t *pth)
static void cancel_pthread(pthread_t *pth)
{
if (!*pth)
if (!pth || !*pth)
return;
pthread_cancel(*pth);
pth = NULL;
}
static void __shutdown_children(ckpool_t *ckp, int sig)
static void wait_child(pid_t *pid)
{
int ret;
do {
ret = waitpid(*pid, NULL, 0);
} while (ret != *pid);
}
static void __shutdown_children(ckpool_t *ckp)
{
int i;
@ -859,18 +900,22 @@ static void __shutdown_children(ckpool_t *ckp, int sig)
if (!ckp->children)
return;
/* Send the children a SIGUSR1 for them to shutdown gracefully, then
* wait for them to exit and kill them if they don't for 500ms. */
for (i = 0; i < ckp->proc_instances; i++) {
pid_t pid = ckp->children[i]->pid;
if (!kill_pid(pid, 0))
kill_pid(pid, sig);
kill_pid(pid, SIGUSR1);
if (!ck_completion_timeout(&wait_child, (void *)&pid, 500))
kill_pid(pid, SIGKILL);
}
}
static void shutdown_children(ckpool_t *ckp, int sig)
static void shutdown_children(ckpool_t *ckp)
{
cancel_join_pthread(&ckp->pth_watchdog);
__shutdown_children(ckp, sig);
__shutdown_children(ckp);
}
static void sighandler(int sig)
@ -883,31 +928,36 @@ static void sighandler(int sig)
ckp->name, sig);
cancel_join_pthread(&ckp->pth_watchdog);
__shutdown_children(ckp, SIGUSR1);
/* Wait, then send SIGKILL */
cksleep_ms(100);
__shutdown_children(ckp, SIGKILL);
__shutdown_children(ckp);
cancel_pthread(&ckp->pth_listener);
exit(0);
}
void json_get_string(char **store, json_t *val, const char *res)
static bool _json_get_string(char **store, json_t *entry, const char *res)
{
json_t *entry = json_object_get(val, res);
bool ret = false;
const char *buf;
*store = NULL;
if (!entry || json_is_null(entry)) {
LOGDEBUG("Json did not find entry %s", res);
return;
goto out;
}
if (!json_is_string(entry)) {
LOGWARNING("Json entry %s is not a string", res);
return;
goto out;
}
buf = json_string_value(entry);
LOGDEBUG("Json found entry %s: %s", res, buf);
*store = strdup(buf);
ret = true;
out:
return ret;
}
bool json_get_string(char **store, json_t *val, const char *res)
{
return _json_get_string(store, json_object_get(val, res), res);
}
static void json_get_int64(int64_t *store, json_t *val, const char *res)
@ -926,36 +976,44 @@ static void json_get_int64(int64_t *store, json_t *val, const char *res)
LOGDEBUG("Json found entry %s: %ld", res, *store);
}
void json_get_int(int *store, json_t *val, const char *res)
bool json_get_int(int *store, json_t *val, const char *res)
{
json_t *entry = json_object_get(val, res);
bool ret = false;
if (!entry) {
LOGDEBUG("Json did not find entry %s", res);
return;
goto out;
}
if (!json_is_integer(entry)) {
LOGWARNING("Json entry %s is not an integer", res);
return;
goto out;
}
*store = json_integer_value(entry);
LOGDEBUG("Json found entry %s: %d", res, *store);
ret = true;
out:
return ret;
}
static void json_get_bool(bool *store, json_t *val, const char *res)
static bool json_get_bool(bool *store, json_t *val, const char *res)
{
json_t *entry = json_object_get(val, res);
bool ret = false;
if (!entry) {
LOGDEBUG("Json did not find entry %s", res);
return;
goto out;
}
if (!json_is_boolean(entry)) {
LOGWARNING("Json entry %s is not a boolean", res);
return;
goto out;
}
*store = json_is_true(entry);
LOGDEBUG("Json found entry %s: %s", res, *store ? "true" : "false");
ret = true;
out:
return ret;
}
static void parse_btcds(ckpool_t *ckp, json_t *arr_val, int arr_size)
@ -994,10 +1052,41 @@ static void parse_proxies(ckpool_t *ckp, json_t *arr_val, int arr_size)
}
}
static bool parse_serverurls(ckpool_t *ckp, json_t *arr_val)
{
bool ret = false;
int arr_size, i;
if (!arr_val)
goto out;
if (!json_is_array(arr_val)) {
LOGWARNING("Unable to parse serverurl entries as an array");
goto out;
}
arr_size = json_array_size(arr_val);
if (!arr_size) {
LOGWARNING("Serverurl array empty");
goto out;
}
ckp->serverurls = arr_size;
ckp->serverurl = ckalloc(sizeof(char *) * arr_size);
for (i = 0; i < arr_size; i++) {
json_t *val = json_array_get(arr_val, i);
if (!_json_get_string(&ckp->serverurl[i], val, "serverurl"))
LOGWARNING("Invalid serverurl entry number %d", i);
}
ret = true;
out:
return ret;
}
static void parse_config(ckpool_t *ckp)
{
json_t *json_conf, *arr_val;
json_error_t err_val;
int arr_size;
char *url;
json_conf = json_load_file(ckp->config, JSON_DISABLE_EOF_CHECK, &err_val);
if (!json_conf) {
@ -1007,8 +1096,7 @@ static void parse_config(ckpool_t *ckp)
}
arr_val = json_object_get(json_conf, "btcd");
if (arr_val && json_is_array(arr_val)) {
int arr_size = json_array_size(arr_val);
arr_size = json_array_size(arr_val);
if (arr_size)
parse_btcds(ckp, arr_val, arr_size);
}
@ -1019,8 +1107,18 @@ static void parse_config(ckpool_t *ckp)
ckp->btcsig[38] = '\0';
}
json_get_int(&ckp->blockpoll, json_conf, "blockpoll");
json_get_int(&ckp->nonce1length, json_conf, "nonce1length");
json_get_int(&ckp->nonce2length, json_conf, "nonce2length");
json_get_int(&ckp->update_interval, json_conf, "update_interval");
json_get_string(&ckp->serverurl, json_conf, "serverurl");
/* Look for an array first and then a single entry */
arr_val = json_object_get(json_conf, "serverurl");
if (!parse_serverurls(ckp, arr_val)) {
if (json_get_string(&url, json_conf, "serverurl")) {
ckp->serverurl = ckalloc(sizeof(char *));
ckp->serverurl[0] = url;
ckp->serverurls = 1;
}
}
json_get_int64(&ckp->mindiff, json_conf, "mindiff");
json_get_int64(&ckp->startdiff, json_conf, "startdiff");
json_get_int64(&ckp->maxdiff, json_conf, "maxdiff");
@ -1028,8 +1126,7 @@ static void parse_config(ckpool_t *ckp)
json_get_int(&ckp->maxclients, json_conf, "maxclients");
arr_val = json_object_get(json_conf, "proxy");
if (arr_val && json_is_array(arr_val)) {
int arr_size = json_array_size(arr_val);
arr_size = json_array_size(arr_val);
if (arr_size)
parse_proxies(ckp, arr_val, arr_size);
}
@ -1104,6 +1201,7 @@ static void *watchdog(void *arg)
static struct option long_options[] = {
{"standalone", no_argument, 0, 'A'},
{"config", required_argument, 0, 'c'},
{"daemonise", no_argument, 0, 'D'},
{"ckdb-name", required_argument, 0, 'd'},
{"group", required_argument, 0, 'g'},
{"handover", no_argument, 0, 'H'},
@ -1121,6 +1219,7 @@ static struct option long_options[] = {
#else
static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"daemonise", no_argument, 0, 'D'},
{"group", required_argument, 0, 'g'},
{"handover", no_argument, 0, 'H'},
{"help", no_argument, 0, 'h'},
@ -1135,16 +1234,21 @@ static struct option long_options[] = {
};
#endif
static void send_recv_path(const char *path, const char *msg)
static bool send_recv_path(const char *path, const char *msg)
{
int sockd = open_unix_client(path);
bool ret = false;
char *response;
send_unix_msg(sockd, msg);
response = recv_unix_msg(sockd);
if (response) {
ret = true;
LOGWARNING("Received: %s in response to %s request", response, msg);
dealloc(response);
}
Close(sockd);
return ret;
}
int main(int argc, char **argv)
@ -1165,7 +1269,7 @@ int main(int argc, char **argv)
ckp.initial_args[ckp.args] = strdup(argv[ckp.args]);
ckp.initial_args[ckp.args] = NULL;
while ((c = getopt_long(argc, argv, "Ac:d:g:HhkLl:n:PpS:s:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpS:s:", long_options, &i)) != -1) {
switch (c) {
case 'A':
ckp.standalone = true;
@ -1173,6 +1277,9 @@ int main(int argc, char **argv)
case 'c':
ckp.config = optarg;
break;
case 'D':
ckp.daemon = true;
break;
case 'd':
ckp.ckdb_name = optarg;
break;
@ -1313,6 +1420,14 @@ int main(int argc, char **argv)
ckp.btcaddress = ckp.donaddress;
if (!ckp.blockpoll)
ckp.blockpoll = 100;
if (!ckp.nonce1length)
ckp.nonce1length = 8;
else if (ckp.nonce1length < 2 || ckp.nonce1length > 8)
quit(0, "Invalid nonce1length %d specified, must be 2~8", ckp.nonce1length);
if (!ckp.nonce2length)
ckp.nonce2length = 8;
else if (ckp.nonce2length < 2 || ckp.nonce2length > 8)
quit(0, "Invalid nonce2length %d specified, must be 2~8", ckp.nonce2length);
if (!ckp.update_interval)
ckp.update_interval = 30;
if (!ckp.mindiff)
@ -1321,6 +1436,8 @@ int main(int argc, char **argv)
ckp.startdiff = 42;
if (!ckp.logdir)
ckp.logdir = strdup("logs");
if (!ckp.serverurls)
ckp.serverurl = ckzalloc(sizeof(char *));
if (ckp.proxy && !ckp.proxies)
quit(0, "No proxy entries found in config file %s", ckp.config);
@ -1360,20 +1477,45 @@ int main(int argc, char **argv)
ckp.main.processname = strdup("main");
ckp.main.sockname = strdup("listener");
name_process_sockname(&ckp.main.us, &ckp.main);
ckp.oldconnfd = ckzalloc(sizeof(int *) * ckp.serverurls);
if (ckp.handover) {
int sockd = open_unix_client(ckp.main.us.path);
const char *path = ckp.main.us.path;
if (send_recv_path(path, "ping")) {
for (i = 0; i < ckp.serverurls; i++) {
char getfd[16];
int sockd;
if (sockd > 0 && send_unix_msg(sockd, "getfd")) {
ckp.oldconnfd = get_fd(sockd);
snprintf(getfd, 15, "getxfd%d", i);
sockd = open_unix_client(path);
if (sockd < 1)
break;
if (!send_unix_msg(sockd, getfd))
break;
ckp.oldconnfd[i] = get_fd(sockd);
Close(sockd);
if (!ckp.oldconnfd[i])
break;
LOGWARNING("Inherited old server socket %d with new file descriptor %d!",
i, ckp.oldconnfd[i]);
}
send_recv_path(path, "reject");
send_recv_path(path, "reconnect");
send_recv_path(path, "shutdown");
}
}
send_recv_path(ckp.main.us.path, "reject");
send_recv_path(ckp.main.us.path, "reconnect");
send_recv_path(ckp.main.us.path, "shutdown");
cksleep_ms(500);
if (ckp.daemon) {
int fd;
if (ckp.oldconnfd > 0)
LOGWARNING("Inherited old socket with new file descriptor %d!", ckp.oldconnfd);
if (fork())
exit(0);
setsid();
fd = open("/dev/null",O_RDWR, 0);
if (fd != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
}
}
@ -1403,7 +1545,7 @@ int main(int argc, char **argv)
if (ckp.pth_listener)
join_pthread(ckp.pth_listener);
shutdown_children(&ckp, SIGTERM);
shutdown_children(&ckp);
clean_up(&ckp);
return 0;

19
src/ckpool.h

@ -109,8 +109,8 @@ struct ckpool_instance {
/* Logfile */
FILE *logfp;
int logfd;
/* Connector fd if we inherited it from a running process */
int oldconnfd;
/* Connector fds if we inherit them from a running process */
int *oldconnfd;
/* Should we inherit a running instance's socket and shut it down */
bool handover;
/* How many clients maximum to accept before rejecting further */
@ -141,6 +141,9 @@ struct ckpool_instance {
/* Are we running without ckdb */
bool standalone;
/* Should we daemonise the ckpool process */
bool daemon;
/* Bitcoind data */
int btcds;
char **btcdurl;
@ -148,6 +151,8 @@ struct ckpool_instance {
char **btcdpass;
bool *btcdnotify;
int blockpoll; // How frequently in ms to poll bitcoind for block updates
int nonce1length; // Extranonce1 length
int nonce2length; // Extranonce2 length
/* Difficulty settings */
int64_t mindiff; // Default 1
@ -162,7 +167,8 @@ struct ckpool_instance {
/* Stratum options */
server_instance_t **servers;
char *serverurl; // URL to bind our server/proxy to
char **serverurl; // Array of URLs to bind our server/proxy to
int serverurls; // Number of server bindings
int update_interval; // Seconds between stratum updates
int chosen_server; // Chosen server for next connection
@ -172,6 +178,9 @@ struct ckpool_instance {
char **proxyauth;
char **proxypass;
server_instance_t *btcdbackup;
/* Private data for each process */
void *data;
};
#ifdef USE_CKDB
@ -203,7 +212,7 @@ char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const ch
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);
int process_exit(ckpool_t *ckp, proc_instance_t *pi, int ret);
void json_get_string(char **store, json_t *val, const char *res);
void json_get_int(int *store, json_t *val, const char *res);
bool json_get_string(char **store, json_t *val, const char *res);
bool json_get_int(int *store, json_t *val, const char *res);
#endif /* CKPOOL_H */

541
src/connector.c

@ -24,30 +24,25 @@
#define MAX_MSGSIZE 1024
#define SOI (sizeof(int))
struct connector_instance {
cklock_t lock;
proc_instance_t *pi;
int serverfd;
int nfds;
bool accept;
pthread_t pth_sender;
pthread_t pth_receiver;
};
typedef struct connector_instance conn_instance_t;
struct client_instance {
/* For clients hashtable */
UT_hash_handle hh;
int64_t id;
int fd;
/* Reference count for when this instance is used outside of the
* connector_data lock */
int ref;
/* For dead_clients list */
struct client_instance *next;
struct sockaddr address;
char address_name[INET6_ADDRSTRLEN];
/* Which serverurl is this instance connected to */
int server;
char buf[PAGESIZE];
int bufofs;
@ -56,13 +51,6 @@ struct client_instance {
typedef struct client_instance client_instance_t;
/* For the hashtable of all clients */
static client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */
static client_instance_t *dead_clients;
static int64_t client_id = 1;
struct sender_send {
struct sender_send *next;
struct sender_send *prev;
@ -74,44 +62,90 @@ struct sender_send {
typedef struct sender_send sender_send_t;
/* For the linked list of pending sends */
static sender_send_t *sender_sends;
static sender_send_t *delayed_sends;
/* Private data for the connector */
struct connector_data {
ckpool_t *ckp;
cklock_t lock;
proc_instance_t *pi;
/* Array of server fds */
int *serverfd;
/* All time count of clients connected */
int nfds;
bool accept;
pthread_t pth_sender;
pthread_t pth_receiver;
/* For the hashtable of all clients */
client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */
client_instance_t *dead_clients;
int64_t client_id;
/* For the linked list of pending sends */
sender_send_t *sender_sends;
sender_send_t *delayed_sends;
/* For protecting the pending sends list */
pthread_mutex_t sender_lock;
pthread_cond_t sender_cond;
};
typedef struct connector_data cdata_t;
/* For protecting the pending sends list */
static pthread_mutex_t sender_lock;
static pthread_cond_t sender_cond;
/* Increase the reference count of instance */
static void __inc_instance_ref(client_instance_t *client)
{
client->ref++;
}
/* Increase the reference count of instance */
static void __dec_instance_ref(client_instance_t *client)
{
client->ref--;
}
static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
{
ck_wlock(&cdata->lock);
__dec_instance_ref(client);
ck_wunlock(&cdata->lock);
}
/* Accepts incoming connections on the server socket and generates client
* instances */
static int accept_client(conn_instance_t *ci, int epfd)
static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
{
ckpool_t *ckp = ci->pi->ckp;
int fd, port, no_clients, sockd;
ckpool_t *ckp = cdata->ckp;
client_instance_t *client;
struct epoll_event event;
int fd, port, no_clients;
socklen_t address_len;
ck_rlock(&ci->lock);
no_clients = HASH_COUNT(clients);
ck_runlock(&ci->lock);
ck_rlock(&cdata->lock);
no_clients = HASH_COUNT(cdata->clients);
ck_runlock(&cdata->lock);
if (unlikely(ckp->maxclients && no_clients >= ckp->maxclients)) {
LOGWARNING("Server full with %d clients", no_clients);
return 0;
}
sockd = cdata->serverfd[server];
client = ckzalloc(sizeof(client_instance_t));
client->server = server;
address_len = sizeof(client->address);
fd = accept(ci->serverfd, &client->address, &address_len);
fd = accept(sockd, &client->address, &address_len);
if (unlikely(fd < 0)) {
/* Handle these errors gracefully should we ever share this
* socket */
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED || errno == EINTR) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED) {
LOGERR("Recoverable error on accept in accept_client");
return 0;
}
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
LOGERR("Failed to accept on socket %d in acceptor", sockd);
dealloc(client);
return -1;
}
@ -132,7 +166,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
break;
default:
LOGWARNING("Unknown INET type for client %d on socket %d",
ci->nfds, fd);
cdata->nfds, fd);
Close(fd);
free(client);
return 0;
@ -142,7 +176,7 @@ static int accept_client(conn_instance_t *ci, int epfd)
nolinger_socket(fd);
LOGINFO("Connected new client %d on socket %d to %d active clients from %s:%d",
ci->nfds, fd, no_clients, client->address_name, port);
cdata->nfds, fd, no_clients, client->address_name, port);
client->fd = fd;
event.data.ptr = client;
@ -153,27 +187,36 @@ static int accept_client(conn_instance_t *ci, int epfd)
return 0;
}
ck_wlock(&ci->lock);
client->id = client_id++;
HASH_ADD_I64(clients, id, client);
ci->nfds++;
ck_wunlock(&ci->lock);
/* We increase the ref count on this client as epoll creates a pointer
* to it. We drop that reference when the socket is closed which
* removes it automatically from the epoll list. */
__inc_instance_ref(client);
ck_wlock(&cdata->lock);
client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++;
ck_wunlock(&cdata->lock);
return 1;
}
static int drop_client(conn_instance_t *ci, client_instance_t *client)
/* Client must hold a reference count */
static int drop_client(cdata_t *cdata, client_instance_t *client)
{
int fd;
ck_wlock(&ci->lock);
ck_wlock(&cdata->lock);
fd = client->fd;
if (fd != -1) {
Close(client->fd);
HASH_DEL(clients, client);
LL_PREPEND(dead_clients, client);
HASH_DEL(cdata->clients, client);
LL_PREPEND(cdata->dead_clients, client);
/* This is the reference to this client's presence in the
* epoll list. */
__dec_instance_ref(client);
}
ck_wunlock(&ci->lock);
ck_wunlock(&cdata->lock);
if (fd > -1)
LOGINFO("Connector dropped client %"PRId64" fd %d", client->id, fd);
@ -190,28 +233,44 @@ static void stratifier_drop_client(ckpool_t *ckp, int64_t id)
}
/* Invalidate this instance. Remove them from the hashtables we look up
* regularly but keep the instances in a linked list indefinitely in case we
* still reference any of its members. */
static void invalidate_client(ckpool_t *ckp, conn_instance_t *ci, client_instance_t *client)
* regularly but keep the instances in a linked list until their ref count
* drops to zero when we can remove them lazily. Client must hold a reference
* count. */
static void invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *client)
{
drop_client(ci, client);
client_instance_t *tmp;
drop_client(cdata, client);
if (ckp->passthrough)
return;
stratifier_drop_client(ckp, client->id);
/* Cull old unused clients lazily when there are no more reference
* counts for them. */
ck_wlock(&cdata->lock);
LL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
if (!client->ref) {
LL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %ld", client->id);
free(client);
}
}
ck_wunlock(&cdata->lock);
}
static void send_client(conn_instance_t *ci, int64_t id, char *buf);
static void send_client(cdata_t *cdata, int64_t id, char *buf);
static void parse_client_msg(conn_instance_t *ci, client_instance_t *client)
/* Client is holding a reference count from being on the epoll list */
static void parse_client_msg(cdata_t *cdata, client_instance_t *client)
{
int buflen, ret, selfail = 0;
ckpool_t *ckp = ci->pi->ckp;
ckpool_t *ckp = cdata->ckp;
char msg[PAGESIZE], *eol;
json_t *val;
retry:
/* Select should always return positive after poll unless we have
* been disconnected. On retries, decide whether we should do further
* been disconnected. On retries, decdatade whether we should do further
* reads based on select readiness and only fail if we get an error. */
ret = wait_read_select(client->fd, 0);
if (ret < 1) {
@ -219,7 +278,7 @@ retry:
return;
LOGINFO("Client fd %d disconnected - select fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
selfail = -1;
@ -231,7 +290,7 @@ retry:
* client has disconnected. */
LOGINFO("Client fd %d disconnected - recv fail with bufofs %d ret %d errno %d %s",
client->fd, client->bufofs, ret, errno, ret && errno ? strerror(errno) : "");
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
client->bufofs += ret;
@ -240,7 +299,7 @@ reparse:
if (!eol) {
if (unlikely(client->bufofs > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d overloaded buffer without EOL, disconnecting", client->fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
goto retry;
@ -250,7 +309,7 @@ reparse:
buflen = eol - client->buf + 1;
if (unlikely(buflen > MAX_MSGSIZE)) {
LOGWARNING("Client fd %d message oversize, disconnecting", client->fd);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
return;
}
memcpy(msg, client->buf, buflen);
@ -261,9 +320,9 @@ reparse:
if (!(val = json_loads(msg, 0, NULL))) {
char *buf = strdup("Invalid JSON, disconnecting\n");
LOGINFO("Client id %"PRId64" sent invalid json message %s", client->id, msg);
send_client(ci, client->id, buf);
invalidate_client(ckp, ci, client);
LOGINFO("Client id %ld sent invalid json message %s", client->id, msg);
send_client(cdata, client->id, buf);
invalidate_client(ckp, cdata, client);
return;
} else {
int64_t passthrough_id;
@ -277,6 +336,7 @@ reparse:
} else
json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
json_object_set_new_nocheck(val, "server", json_integer(client->server));
s = json_dumps(val, 0);
if (ckp->passthrough)
send_proc(ckp->generator, s);
@ -295,10 +355,9 @@ reparse:
* handles the incoming messages */
void *receiver(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
cdata_t *cdata = (cdata_t *)arg;
int ret, epfd, i, serverfds;
struct epoll_event event;
bool maxconn = true;
int ret, epfd;
rename_proc("creceiver");
@ -307,38 +366,40 @@ void *receiver(void *arg)
LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL;
}
event.data.fd = ci->serverfd;
serverfds = cdata->ckp->serverurls;
/* Add all the serverfds to the epoll */
for (i = 0; i < serverfds; i++) {
/* The small values will be easily identifiable compared to
* pointers */
event.data.u64 = i;
event.events = EPOLLIN;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, ci->serverfd, &event);
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, cdata->serverfd[i], &event);
if (ret < 0) {
LOGEMERG("FATAL: Failed to add epfd %d to epoll_ctl", epfd);
return NULL;
}
/* When we first start we listen to as many connections as
* possible. Once we start polling we drop the listen to the
* minimum to effectively ratelimit how fast we can receive
* connections. */
LOGDEBUG("Dropping listen backlog to 0");
listen(cdata->serverfd[i], 0);
}
while (42) {
client_instance_t *client;
while (unlikely(!ci->accept))
while (unlikely(!cdata->accept))
cksleep_ms(100);
ret = epoll_wait(epfd, &event, 1, 1000);
if (unlikely(ret == -1)) {
LOGEMERG("FATAL: Failed to epoll_wait in receiver");
break;
}
if (unlikely(!ret)) {
if (unlikely(maxconn)) {
/* When we first start we listen to as many connections as
* possible. Once we stop receiving connections we drop the
* listen to the minimum to effectively ratelimit how fast we
* can receive connections. */
LOGDEBUG("Dropping listen backlog to 0");
maxconn = false;
listen(ci->serverfd, 0);
}
if (unlikely(!ret))
continue;
}
if (event.data.fd == ci->serverfd) {
ret = accept_client(ci, epfd);
if (event.data.u64 < (uint64_t)serverfds) {
ret = accept_client(cdata, epfd, event.data.u64);
if (unlikely(ret < 0)) {
LOGEMERG("FATAL: Failed to accept_client in receiver");
break;
@ -349,10 +410,10 @@ void *receiver(void *arg)
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
/* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(ci->pi->ckp, ci, client);
invalidate_client(cdata->pi->ckp, cdata, client);
continue;
}
parse_client_msg(ci, client);
parse_client_msg(cdata, client);
}
return NULL;
}
@ -361,8 +422,8 @@ void *receiver(void *arg)
* ready for writing immediately to not delay other messages. */
void *sender(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
ckpool_t *ckp = ci->pi->ckp;
cdata_t *cdata = (cdata_t *)arg;
ckpool_t *ckp = cdata->ckp;
bool sent = false;
rename_proc("csender");
@ -372,23 +433,23 @@ void *sender(void *arg)
client_instance_t *client;
int ret, fd, ofs = 0;
mutex_lock(&sender_lock);
mutex_lock(&cdata->sender_lock);
/* Poll every 100ms if there are no new sends. Re-examine
* delayed sends immediately after a successful send in case
* endless new sends more frequently end up starving the
* delayed sends. */
if (!sender_sends && !sent) {
if (!cdata->sender_sends && !sent) {
const ts_t polltime = {0, 100000000};
ts_t timeout_ts;
ts_realtime(&timeout_ts);
timeraddspec(&timeout_ts, &polltime);
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
pthread_cond_timedwait(&cdata->sender_cond, &cdata->sender_lock, &timeout_ts);
}
sender_send = sender_sends;
sender_send = cdata->sender_sends;
if (sender_send)
DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock);
DL_DELETE(cdata->sender_sends, sender_send);
mutex_unlock(&cdata->sender_lock);
sent = false;
@ -396,23 +457,21 @@ void *sender(void *arg)
* conditional with no new sends appearing or have just
* serviced another message successfully. */
if (!sender_send) {
if (!delayed_sends)
if (!cdata->delayed_sends)
continue;
sender_send = delayed_sends;
DL_DELETE(delayed_sends, sender_send);
sender_send = cdata->delayed_sends;
DL_DELETE(cdata->delayed_sends, sender_send);
}
client = sender_send->client;
ck_rlock(&ci->lock);
ck_rlock(&cdata->lock);
fd = client->fd;
ck_runlock(&ci->lock);
ck_runlock(&cdata->lock);
if (fd == -1) {
LOGDEBUG("Discarding message sent to invalidated client");
free(sender_send->buf);
free(sender_send);
continue;
goto contfree;
}
/* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout
@ -421,33 +480,33 @@ void *sender(void *arg)
ret = wait_write_select(fd, 0);
if (ret < 1) {
if (ret < 0) {
LOGINFO("Client id %"PRId64" fd %d interrupted", client->id, fd);
invalidate_client(ckp, ci, client);
free(sender_send->buf);
free(sender_send);
continue;
LOGINFO("Client id %ld fd %d interrupted", client->id, fd);
invalidate_client(ckp, cdata, client);
goto contfree;
}
LOGDEBUG("Client %"PRId64" not ready for writes", client->id);
/* Append it to the tail of the delayed sends list.
* This is the only function that alters it so no
* locking is required. */
DL_APPEND(delayed_sends, sender_send);
* locking is required. Keep the client ref. */
DL_APPEND(cdata->delayed_sends, sender_send);
continue;
}
sent = true;
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {
LOGINFO("Client id %"PRId64" fd %d disconnected", client->id, fd);
invalidate_client(ckp, ci, client);
LOGINFO("Client id %ld fd %d disconnected", client->id, fd);
invalidate_client(ckp, cdata, client);
break;
}
ofs += ret;
sender_send->len -= ret;
}
contfree:
free(sender_send->buf);
free(sender_send);
dec_instance_ref(cdata, client);
}
return NULL;
@ -455,7 +514,7 @@ void *sender(void *arg)
/* Send a client by id a heap allocated buffer, allowing this function to
* free the ram. */
static void send_client(conn_instance_t *ci, int64_t id, char *buf)
static void send_client(cdata_t *cdata, int64_t id, char *buf)
{
sender_send_t *sender_send;
client_instance_t *client;
@ -472,19 +531,25 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf)
return;
}
ck_rlock(&ci->lock);
HASH_FIND_I64(clients, &id, client);
if (likely(client))
ck_ilock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (likely(client)) {
ck_ulock(&cdata->lock);
fd = client->fd;
ck_runlock(&ci->lock);
/* Grab a reference to this client until the sender_send has
* completed processing. */
__inc_instance_ref(client);
ck_dwilock(&cdata->lock);
}
ck_uilock(&cdata->lock);
if (unlikely(fd == -1)) {
ckpool_t *ckp = ci->pi->ckp;
ckpool_t *ckp = cdata->ckp;
if (client) {
/* This shouldn't happen */
LOGWARNING("Client id %ld disconnected but fd already invalidated!", id);
invalidate_client(ckp, ci, client);
invalidate_client(ckp, cdata, client);
} else {
LOGINFO("Connector failed to find client id %ld to send to", id);
stratifier_drop_client(ckp, id);
@ -498,41 +563,74 @@ static void send_client(conn_instance_t *ci, int64_t id, char *buf)
sender_send->buf = buf;
sender_send->len = len;
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
pthread_cond_signal(&sender_cond);
mutex_unlock(&sender_lock);
mutex_lock(&cdata->sender_lock);
DL_APPEND(cdata->sender_sends, sender_send);
pthread_cond_signal(&cdata->sender_cond);
mutex_unlock(&cdata->sender_lock);
}
static client_instance_t *client_by_id(conn_instance_t *ci, int64_t id)
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_rlock(&ci->lock);
HASH_FIND_I64(clients, &id, client);
ck_runlock(&ci->lock);
ck_ilock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client) {
ck_ulock(&cdata->lock);
__inc_instance_ref(client);
ck_dwilock(&cdata->lock);
}
ck_uilock(&cdata->lock);
return client;
}
static void passthrough_client(conn_instance_t *ci, client_instance_t *client)
static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{
char *buf;
LOGINFO("Connector adding passthrough client %"PRId64, client->id);
client->passthrough = true;
ASPRINTF(&buf, "{\"result\": true}\n");
send_client(ci, client->id, buf);
send_client(cdata, client->id, buf);
}
static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
static void process_client_msg(cdata_t *cdata, const char *buf)
{
int64_t client_id64, client_id;
json_t *json_msg;
char *msg;
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf);
return;
}
/* Extract the client id from the json message and remove its entry */
client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
} else
client_id = client_id64;
msg = json_dumps(json_msg, 0);
realloc_strcat(&msg, "\n");
send_client(cdata, client_id, msg);
json_decref(json_msg);
}
static int connector_loop(proc_instance_t *pi, cdata_t *cdata)
{
int sockd = -1, ret = 0, selret;
int64_t client_id64, client_id;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
char *buf = NULL;
json_t *json_msg;
do {
selret = wait_read_select(us->sockd, 5);
@ -546,12 +644,12 @@ static int connector_loop(proc_instance_t *pi, conn_instance_t *ci)
LOGWARNING("%s connector ready", ckp->name);
retry:
if (unlikely(!pthread_tryjoin_np(ci->pth_sender, NULL))) {
if (unlikely(!pthread_tryjoin_np(cdata->pth_sender, NULL))) {
LOGEMERG("Connector sender thread shutdown, exiting");
ret = 1;
goto out;
}
if (unlikely(!pthread_tryjoin_np(ci->pth_receiver, NULL))) {
if (unlikely(!pthread_tryjoin_np(cdata->pth_receiver, NULL))) {
LOGEMERG("Connector receiver thread shutdown, exiting");
ret = 1;
goto out;
@ -571,30 +669,26 @@ retry:
LOGWARNING("Failed to get message in connector_loop");
goto retry;
}
LOGDEBUG("Connector received message: %s", buf);
if (cmdmatch(buf, "ping")) {
/* The bulk of the messages will be json messages to send to clients
* so look for them first. */
if (likely(buf[0] == '{')) {
process_client_msg(cdata, buf);
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Connector received ping request");
send_unix_msg(sockd, "pong");
goto retry;
}
if (cmdmatch(buf, "accept")) {
} else if (cmdmatch(buf, "accept")) {
LOGDEBUG("Connector received accept signal");
ci->accept = true;
goto retry;
}
if (cmdmatch(buf, "reject")) {
cdata->accept = true;
} else if (cmdmatch(buf, "reject")) {
LOGDEBUG("Connector received reject signal");
ci->accept = false;
goto retry;
}
if (cmdmatch(buf, "loglevel")) {
cdata->accept = false;
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
goto retry;
}
if (cmdmatch(buf, "shutdown"))
} else if (cmdmatch(buf, "shutdown")) {
goto out;
if (cmdmatch(buf, "dropclient")) {
} else if (cmdmatch(buf, "dropclient")) {
client_instance_t *client;
ret = sscanf(buf, "dropclient=%ld", &client_id64);
@ -603,17 +697,16 @@ retry:
goto retry;
}
client_id = client_id64 & 0xffffffffll;
client = client_by_id(ci, client_id);
client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to drop", client_id);
goto retry;
}
ret = drop_client(ci, client);
ret = drop_client(cdata, client);
dec_instance_ref(cdata, client);
if (ret >= 0)
LOGINFO("Connector dropped client id: %ld", client_id);
goto retry;
}
if (cmdmatch(buf, "passthrough")) {
} else if (cmdmatch(buf, "passthrough")) {
client_instance_t *client;
ret = sscanf(buf, "passthrough=%ld", &client_id);
@ -621,44 +714,21 @@ retry:
LOGDEBUG("Connector failed to parse passthrough command: %s", buf);
goto retry;
}
client = client_by_id(ci, client_id);
client = ref_client_by_id(cdata, client_id);
if (unlikely(!client)) {
LOGINFO("Connector failed to find client id %ld to pass through", client_id);
goto retry;
}
passthrough_client(ci, client);
goto retry;
}
if (cmdmatch(buf, "getfd")) {
send_fd(ci->serverfd, sockd);
goto retry;
}
/* Anything else should be a json message to send to a client */
json_msg = json_loads(buf, 0, NULL);
if (unlikely(!json_msg)) {
LOGWARNING("Invalid json message: %s", buf);
goto retry;
}
passthrough_client(cdata, client);
dec_instance_ref(cdata, client);
} else if (cmdmatch(buf, "getxfd")) {
int fdno = -1;
/* Extract the client id from the json message and remove its entry */
client_id64 = json_integer_value(json_object_get(json_msg, "client_id"));
json_object_del(json_msg, "client_id");
if (client_id64 > 0xffffffffll) {
int64_t passthrough_id;
passthrough_id = client_id64 & 0xffffffffll;
client_id = client_id64 >> 32;
json_object_set_new_nocheck(json_msg, "client_id", json_integer(passthrough_id));
sscanf(buf, "getxfd%d", &fdno);
if (fdno > -1 && fdno < ckp->serverurls)
send_fd(cdata->serverfd[fdno], sockd);
} else
client_id = client_id64;
dealloc(buf);
buf = json_dumps(json_msg, 0);
realloc_strcat(&buf, "\n");
send_client(ci, client_id, buf);
json_decref(json_msg);
buf = NULL;
LOGWARNING("Unhandled connector message: %s", buf);
goto retry;
out:
Close(sockd);
@ -668,39 +738,24 @@ out:
int connector(proc_instance_t *pi)
{
char *url = NULL, *port = NULL;
cdata_t *cdata = ckzalloc(sizeof(cdata_t));
ckpool_t *ckp = pi->ckp;
int sockd, ret = 0;
conn_instance_t ci;
int sockd, ret = 0, i;
const int on = 1;
int tries = 0;
LOGWARNING("%s connector starting", ckp->name);
ckp->data = cdata;
cdata->ckp = ckp;
if (ckp->oldconnfd > 0) {
sockd = ckp->oldconnfd;
} else if (ckp->serverurl) {
if (!extract_sockaddr(ckp->serverurl, &url, &port)) {
LOGWARNING("Failed to extract server address from %s", ckp->serverurl);
ret = 1;
goto out;
}
do {
sockd = bind_socket(url, port);
if (sockd > 0)
break;
LOGWARNING("Connector failed to bind to socket, retrying in 5s");
sleep(5);
} while (++tries < 25);
if (!ckp->serverurls)
cdata->serverfd = ckalloc(sizeof(int *));
else
cdata->serverfd = ckalloc(sizeof(int *) * ckp->serverurls);
dealloc(url);
dealloc(port);
if (sockd < 0) {
LOGERR("Connector failed to bind to socket for 2 minutes");
ret = 1;
goto out;
}
} else {
if (!ckp->serverurls) {
/* No serverurls have been specified. Bind to all interfaces
* on default sockets. */
struct sockaddr_in serv_addr;
sockd = socket(AF_INET, SOCK_STREAM, 0);
@ -726,28 +781,70 @@ int connector(proc_instance_t *pi)
Close(sockd);
goto out;
}
if (listen(sockd, SOMAXCONN) < 0) {
LOGERR("Connector failed to listen on socket");
Close(sockd);
goto out;
}
if (tries)
LOGWARNING("Connector successfully bound to socket");
cdata->serverfd[0] = sockd;
} else {
for (i = 0; i < ckp->serverurls; i++) {
char oldurl[INET6_ADDRSTRLEN], oldport[8];
char newurl[INET6_ADDRSTRLEN], newport[8];
char *serverurl = ckp->serverurl[i];
ret = listen(sockd, SOMAXCONN);
if (ret < 0) {
if (!url_from_serverurl(serverurl, newurl, newport)) {
LOGWARNING("Failed to extract resolved url from %s", serverurl);
ret = 1;
goto out;
}
sockd = ckp->oldconnfd[i];
if (url_from_socket(sockd, oldurl, oldport)) {
if (strcmp(newurl, oldurl) || strcmp(newport, oldport)) {
LOGWARNING("Handed over socket url %s:%s does not match config %s:%s, creating new socket",
oldurl, oldport, newurl, newport);
Close(sockd);
}
}
do {
if (sockd > 0)
break;
sockd = bind_socket(newurl, newport);
if (sockd > 0)
break;
LOGWARNING("Connector failed to bind to socket, retrying in 5s");
sleep(5);
} while (++tries < 25);
if (sockd < 0) {
LOGERR("Connector failed to bind to socket for 2 minutes");
ret = 1;
goto out;
}
if (listen(sockd, SOMAXCONN) < 0) {
LOGERR("Connector failed to listen on socket");
Close(sockd);
goto out;
}
cdata->serverfd[i] = sockd;
}
}
if (tries)
LOGWARNING("Connector successfully bound to socket");
cklock_init(&ci.lock);
memset(&ci, 0, sizeof(ci));
ci.pi = pi;
ci.serverfd = sockd;
ci.nfds = 0;
mutex_init(&sender_lock);
cond_init(&sender_cond);
create_pthread(&ci.pth_sender, sender, &ci);
create_pthread(&ci.pth_receiver, receiver, &ci);
cklock_init(&cdata->lock);
cdata->pi = pi;
cdata->nfds = 0;
cdata->client_id = 1;
mutex_init(&cdata->sender_lock);
cond_init(&cdata->sender_cond);
create_pthread(&cdata->pth_sender, sender, cdata);
create_pthread(&cdata->pth_receiver, receiver, cdata);
ret = connector_loop(pi, &ci);
ret = connector_loop(pi, cdata);
out:
dealloc(ckp->data);
return process_exit(ckp, pi, ret);
}

42
src/generator.c

@ -122,9 +122,13 @@ struct proxy_instance {
typedef struct proxy_instance proxy_instance_t;
static int proxy_notify_id; // Globally increasing notify id
/* Private data for the generator */
struct generator_data {
int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue
};
static ckmsgq_t *srvchk; // Server check message queue
typedef struct generator_data gdata_t;
static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
{
@ -239,6 +243,7 @@ static int gen_loop(proc_instance_t *pi)
bool reconnecting = false;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data;
bool started = false;
char *buf = NULL;
connsock_t *cs;
@ -264,7 +269,7 @@ reconnect:
retry:
Close(sockd);
ckmsgq_add(srvchk, si);
ckmsgq_add(gdata->srvchk, si);
do {
selret = wait_read_select(us->sockd, 5);
@ -682,6 +687,7 @@ static bool parse_notify(proxy_instance_t *proxi, json_t *val)
{
const char *prev_hash, *bbversion, *nbit, *ntime;
char *job_id, *coinbase1, *coinbase2;
gdata_t *gdata = proxi->ckp->data;
bool clean, ret = false;
notify_instance_t *ni;
int merkles, i;
@ -741,7 +747,7 @@ static bool parse_notify(proxy_instance_t *proxi, json_t *val)
ni->notify_time = time(NULL);
mutex_lock(&proxi->notify_lock);
ni->id = proxy_notify_id++;
ni->id = gdata->proxy_notify_id++;
HASH_ADD_INT(proxi->notify_instances, id, ni);
proxi->current_notify = ni;
mutex_unlock(&proxi->notify_lock);
@ -787,6 +793,19 @@ static bool show_message(json_t *val)
return true;
}
static bool send_pong(proxy_instance_t *proxi, json_t *val)
{
json_t *json_msg, *id_val = json_object_dup(val, "id");
connsock_t *cs = proxi->cs;
bool ret;
JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", "pong",
"error", json_null());
ret = send_json_msg(cs, json_msg);
json_decref(json_msg);
return ret;
}
static bool parse_reconnect(proxy_instance_t *proxi, json_t *val)
{
server_instance_t *newsi, *si = proxi->si;
@ -919,6 +938,11 @@ static bool parse_method(proxy_instance_t *proxi, const char *msg)
ret = show_message(params);
goto out;
}
if (cmdmatch(buf, "mining.ping")) {
ret = send_pong(proxi, val);
goto out;
}
out:
if (val)
json_decref(val);
@ -1450,6 +1474,7 @@ static int proxy_loop(proc_instance_t *pi)
bool reconnecting = false;
unixsock_t *us = &pi->us;
ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data;
char *buf = NULL;
reconnect:
@ -1476,7 +1501,7 @@ reconnect:
}
retry:
ckmsgq_add(srvchk, proxi->si);
ckmsgq_add(gdata->srvchk, proxi->si);
do {
selret = wait_read_select(us->sockd, 5);
@ -1727,16 +1752,19 @@ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
int generator(proc_instance_t *pi)
{
ckpool_t *ckp = pi->ckp;
gdata_t *gdata;
int ret;
LOGWARNING("%s generator starting", ckp->name);
srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);
gdata = ckzalloc(sizeof(gdata_t));
ckp->data = gdata;
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);
if (ckp->proxy)
ret = proxy_mode(ckp, pi);
else
ret = server_mode(ckp, pi);
dealloc(ckp->data);
return process_exit(ckp, pi, ret);
}

159
src/libckpool.c

@ -30,6 +30,7 @@
#include <time.h>
#include <math.h>
#include <poll.h>
#include <arpa/inet.h>
#include "libckpool.h"
#include "sha2.h"
@ -77,6 +78,43 @@ void join_pthread(pthread_t thread)
pthread_join(thread, NULL);
}
struct ck_completion {
sem_t sem;
void (*fn)(void *fnarg);
void *fnarg;
};
static void *completion_thread(void *arg)
{
struct ck_completion *ckc = (struct ck_completion *)arg;
ckc->fn(ckc->fnarg);
cksem_post(&ckc->sem);
return NULL;
}
bool ck_completion_timeout(void *fn, void *fnarg, int timeout)
{
struct ck_completion ckc;
pthread_t pthread;
bool ret = false;
cksem_init(&ckc.sem);
ckc.fn = fn;
ckc.fnarg = fnarg;
pthread_create(&pthread, NULL, completion_thread, (void *)&ckc);
ret = cksem_mswait(&ckc.sem, timeout);
if (!ret)
pthread_join(pthread, NULL);
else
pthread_cancel(pthread);
return !ret;
}
/* Place holders for when we add lock debugging */
#define GETLOCK(_lock, _file, _func, _line)
#define GOTLOCK(_lock, _file, _func, _line)
@ -294,10 +332,9 @@ void _cksem_post(sem_t *sem, const char *file, const char *func, const int line)
void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line)
{
retry:
if (unlikely(sem_wait(sem))) {
if (errno == EINTR)
goto retry;
return;
quitfrom(1, file, func, line, "Failed to sem_wait errno=%d sem=0x%p", errno, sem);
}
}
@ -311,7 +348,6 @@ int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const
tv_time(&tv_now);
tv_to_ts(&ts_now, &tv_now);
ms_to_ts(&abs_timeout, ms);
retry:
timeraddspec(&abs_timeout, &ts_now);
ret = sem_timedwait(sem, &abs_timeout);
@ -319,28 +355,21 @@ retry:
if (likely(errno == ETIMEDOUT))
return ETIMEDOUT;
if (errno == EINTR)
goto retry;
return EINTR;
quitfrom(1, file, func, line, "Failed to sem_timedwait errno=%d sem=0x%p", errno, sem);
}
return 0;
}
void cksem_reset(sem_t *sem)
void _cksem_destroy(sem_t *sem, const char *file, const char *func, const int line)
{
int ret;
do {
ret = sem_trywait(sem);
if (unlikely(ret < 0 && (errno == EINTR)))
ret = 0;
} while (!ret);
}
void cksem_destroy(sem_t *sem)
{
sem_destroy(sem);
if (unlikely(sem_destroy(sem)))
quitfrom(1, file, func, line, "Failed to sem_destroy errno=%d sem=0x%p", errno, sem);
}
/* Extract just the url and port information from a url string, allocating
* heap memory for sockaddr_url and sockaddr_port. */
bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port)
{
char *url_begin, *url_end, *ipv6_begin, *ipv6_end, *port_start = NULL;
@ -406,6 +435,95 @@ bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port)
return true;
}
/* Convert a sockaddr structure into a url and port. URL should be a string of
* INET6_ADDRSTRLEN size, port at least a string of 6 bytes */
bool url_from_sockaddr(const struct sockaddr *addr, char *url, char *port)
{
int port_no = 0;
switch(addr->sa_family) {
const struct sockaddr_in *inet4_in;
const struct sockaddr_in6 *inet6_in;
case AF_INET:
inet4_in = (struct sockaddr_in *)addr;
inet_ntop(AF_INET, &inet4_in->sin_addr, url, INET6_ADDRSTRLEN);
port_no = htons(inet4_in->sin_port);
break;
case AF_INET6:
inet6_in = (struct sockaddr_in6 *)addr;
inet_ntop(AF_INET6, &inet6_in->sin6_addr, url, INET6_ADDRSTRLEN);
port_no = htons(inet6_in->sin6_port);
break;
default:
return false;
}
sprintf(port, "%d", port_no);
return true;
}
bool addrinfo_from_url(const char *url, const char *port, struct addrinfo *addrinfo)
{
struct addrinfo *servinfo, hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
servinfo = addrinfo;
if (getaddrinfo(url, port, &hints, &servinfo) != 0)
return false;
if (!servinfo)
return false;
memcpy(addrinfo, servinfo->ai_addr, servinfo->ai_addrlen);
freeaddrinfo(servinfo);
return true;
}
/* Extract a resolved url and port from a serverurl string. newurl must be
* a string of at least INET6_ADDRSTRLEN and newport at least 6 bytes. */
bool url_from_serverurl(char *serverurl, char *newurl, char *newport)
{
char *url = NULL, *port = NULL;
struct addrinfo addrinfo;
bool ret = false;
if (!extract_sockaddr(serverurl, &url, &port)) {
LOGWARNING("Failed to extract server address from %s", serverurl);
goto out;
}
if (!addrinfo_from_url(url, port, &addrinfo)) {
LOGWARNING("Failed to extract addrinfo from url %s:%s", url, port);
goto out;
}
if (!url_from_sockaddr((const struct sockaddr *)&addrinfo, newurl, newport)) {
LOGWARNING("Failed to extract url from sockaddr for original url: %s:%s",
url, port);
goto out;
}
ret = true;
out:
dealloc(url);
dealloc(port);
return ret;
}
/* Convert a socket into a url and port. URL should be a string of
* INET6_ADDRSTRLEN size, port at least a string of 6 bytes */
bool url_from_socket(const int sockd, char *url, char *port)
{
socklen_t addrlen = sizeof(struct sockaddr);
struct sockaddr addr;
if (sockd < 1)
return false;
if (getsockname(sockd, &addr, &addrlen))
return false;
if (!url_from_sockaddr(&addr, url, port))
return false;
return true;
}
void keep_sockalive(int fd)
{
const int tcp_one = 1;
@ -464,7 +582,7 @@ int bind_socket(char *url, char *port)
if (getaddrinfo(url, port, &hints, &servinfo) != 0) {
LOGWARNING("Failed to resolve (?wrong URL) %s:%s", url, port);
goto out;
return sockd;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
sockd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
@ -484,6 +602,7 @@ int bind_socket(char *url, char *port)
}
out:
freeaddrinfo(servinfo);
return sockd;
}
@ -1494,11 +1613,7 @@ void cksleep_prepare_r(ts_t *ts)
void nanosleep_abstime(ts_t *ts_end)
{
int ret;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, ts_end, NULL);
} while (ret == EINTR);
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, ts_end, NULL);
}
void timeraddspec(ts_t *a, const ts_t *b)

19
src/libckpool.h

@ -14,6 +14,7 @@
#include <errno.h>
#include <jansson.h>
#include <netdb.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
@ -372,6 +373,7 @@ static inline void _json_set_bool(json_t *val, const char *key, bool boolean,
void rename_proc(const char *name);
void create_pthread(pthread_t *thread, void *(*start_routine)(void *), void *arg);
void join_pthread(pthread_t thread);
bool ck_completion_timeout(void *fn, void *fnarg, int timeout);
void _mutex_lock(pthread_mutex_t *lock, const char *file, const char *func, const int line);
void _mutex_unlock_noyield(pthread_mutex_t *lock, const char *file, const char *func, const int line);
@ -409,13 +411,13 @@ void _cksem_init(sem_t *sem, const char *file, const char *func, const int line)
void _cksem_post(sem_t *sem, const char *file, const char *func, const int line);
void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line);
int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const int line);
void cksem_reset(sem_t *sem);
void cksem_destroy(sem_t *sem);
void _cksem_destroy(sem_t *sem, const char *file, const char *func, const int line);
#define cksem_init(_sem) _cksem_init(_sem, __FILE__, __func__, __LINE__)
#define cksem_post(_sem) _cksem_post(_sem, __FILE__, __func__, __LINE__)
#define cksem_wait(_sem) _cksem_wait(_sem, __FILE__, __func__, __LINE__)
#define cksem_mswait(_sem, _timeout) _cksem_mswait(_sem, _timeout, __FILE__, __func__, __LINE__)
#define cksem_init(SEM) _cksem_init(SEM, __FILE__, __func__, __LINE__)
#define cksem_post(SEM) _cksem_post(SEM, __FILE__, __func__, __LINE__)
#define cksem_wait(SEM) _cksem_wait(SEM, __FILE__, __func__, __LINE__)
#define cksem_mswait(SEM, _timeout) _cksem_mswait(SEM, _timeout, __FILE__, __func__, __LINE__)
#define cksem_destroy(SEM) _cksem_destroy(SEM, __FILE__, __func__, __LINE__)
static inline bool sock_connecting(void)
{
@ -433,6 +435,11 @@ static inline bool sock_timeout(void)
}
bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port);
bool url_from_sockaddr(const struct sockaddr *addr, char *url, char *port);
bool addrinfo_from_url(const char *url, const char *port, struct addrinfo *addrinfo);
bool url_from_serverurl(char *serverurl, char *newurl, char *newport);
bool url_from_socket(const int sockd, char *url, char *port);
void keep_sockalive(int fd);
void nolinger_socket(int fd);
void noblock_socket(int fd);

1209
src/stratifier.c

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save