kanoi 10 years ago
parent
commit
eb8d0919da
  1. 63
      src/ckpool.c
  2. 41
      src/ckpool.h
  3. 48
      src/connector.c
  4. 40
      src/generator.c
  5. 21
      src/libckpool.c
  6. 6
      src/libckpool.h
  7. 30
      src/stratifier.c

63
src/ckpool.c

@ -34,6 +34,30 @@
ckpool_t *global_ckp;
static void proclog(ckpool_t *ckp, char *msg)
{
FILE *LOGFP;
int logfd;
if (unlikely(!msg)) {
fprintf(stderr, "Proclog received null message");
return;
}
if (unlikely(!strlen(msg))) {
fprintf(stderr, "Proclog received zero length message");
free(msg);
return;
}
LOGFP = ckp->logfp;
logfd = ckp->logfd;
flock(logfd, LOCK_EX);
fprintf(LOGFP, "%s", msg);
flock(logfd, LOCK_UN);
free(msg);
}
/* Log everything to the logfile, but display warnings on the console as well */
void logmsg(int loglevel, const char *fmt, ...) {
if (global_ckp->loglevel >= loglevel && fmt) {
@ -58,14 +82,13 @@ void logmsg(int loglevel, const char *fmt, ...) {
tm->tm_min,
tm->tm_sec);
if (logfd) {
FILE *LOGFP = global_ckp->logfp;
char *msg;
flock(logfd, LOCK_EX);
fprintf(LOGFP, "%s %s", stamp, buf);
if (loglevel <= LOG_ERR && errno != 0)
fprintf(LOGFP, " with errno %d: %s", errno, strerror(errno));
fprintf(LOGFP, "\n");
flock(logfd, LOCK_UN);
ASPRINTF(&msg, "%s %s with errno %d: %s\n", stamp, buf, errno, strerror(errno));
else
ASPRINTF(&msg, "%s %s\n", stamp, buf);
ckmsgq_add(global_ckp->logger, msg);
}
if (loglevel <= LOG_WARNING) {\
if (loglevel <= LOG_ERR && errno != 0)
@ -89,16 +112,21 @@ static void *ckmsg_queue(void *arg)
while (42) {
ckmsg_t *msg;
tv_t now;
ts_t abs;
mutex_lock(&ckmsgq->lock);
tv_time(&now);
tv_to_ts(&abs, &now);
abs.tv_sec++;
if (!ckmsgq->msgs)
pthread_cond_wait(&ckmsgq->cond, &ckmsgq->lock);
pthread_cond_timedwait(&ckmsgq->cond, &ckmsgq->lock, &abs);
msg = ckmsgq->msgs;
if (likely(msg))
if (msg)
DL_DELETE(ckmsgq->msgs, msg);
mutex_unlock(&ckmsgq->lock);
if (unlikely(!msg))
if (!msg)
continue;
ckmsgq->func(ckp, msg->data);
free(msg);
@ -677,6 +705,17 @@ static void childsighandler(int sig)
kill(ppid, sig);
}
static void launch_logger(proc_instance_t *pi)
{
ckpool_t *ckp = pi->ckp;
char loggername[16];
/* Note that the logger is unique per process so it is the only value
* in ckp that differs between processes */
snprintf(loggername, 15, "%clogger", pi->processname[0]);
ckp->logger = create_ckmsgq(ckp, loggername, &proclog);
}
static void launch_process(proc_instance_t *pi)
{
pid_t pid;
@ -688,6 +727,7 @@ static void launch_process(proc_instance_t *pi)
struct sigaction handler;
int ret;
launch_logger(pi);
handler.sa_handler = &childsighandler;
handler.sa_flags = 0;
sigemptyset(&handler.sa_mask);
@ -1131,13 +1171,16 @@ int main(int argc, char **argv)
ckp.logfp = fopen(buf, "a");
if (!ckp.logfp)
quit(1, "Failed to make open log file %s", buf);
ckp.logfd = fileno(ckp.logfp);
/* Make logging line buffered */
setvbuf(ckp.logfp, NULL, _IOLBF, 0);
ckp.main.ckp = &ckp;
ckp.main.processname = strdup("main");
ckp.main.sockname = strdup("listener");
write_namepid(&ckp.main);
create_process_unixsock(&ckp.main);
launch_logger(&ckp.main);
ckp.logfd = fileno(ckp.logfp);
create_pthread(&ckp.pth_listener, listener, &ckp.main);

41
src/ckpool.h

@ -20,6 +20,26 @@
struct ckpool_instance;
typedef struct ckpool_instance ckpool_t;
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
};
typedef struct ckmsgq ckmsgq_t;
struct proc_instance {
ckpool_t *ckp;
unixsock_t us;
@ -81,6 +101,8 @@ struct ckpool_instance {
FILE *logfp;
int logfd;
/* Logger message queue NOTE: Unique per process */
ckmsgq_t *logger;
/* Process instance data of parent/child processes */
proc_instance_t main;
@ -129,25 +151,6 @@ struct ckpool_instance {
char **proxypass;
};
struct ckmsg {
struct ckmsg *next;
struct ckmsg *prev;
void *data;
};
typedef struct ckmsg ckmsg_t;
struct ckmsgq {
ckpool_t *ckp;
char name[16];
pthread_t pth;
pthread_mutex_t lock;
pthread_cond_t cond;
ckmsg_t *msgs;
void (*func)(ckpool_t *, void *);
};
typedef struct ckmsgq ckmsgq_t;
ckmsgq_t *create_ckmsgq(ckpool_t *ckp, const char *name, const void *func);
void ckmsgq_add(ckmsgq_t *ckmsgq, void *data);

48
src/connector.c

@ -284,10 +284,8 @@ retry:
cksleep_ms(100);
goto retry;
}
do {
ret = poll(fds, nfds, 1000);
} while (unlikely(ret < 0 && interrupted()));
if (ret < 0) {
ret = poll(fds, nfds, 1000);
if (unlikely(ret < 0)) {
LOGERR("Failed to poll in receiver");
goto out;
}
@ -328,13 +326,13 @@ void *sender(void *arg)
{
conn_instance_t *ci = (conn_instance_t *)arg;
ckpool_t *ckp = ci->pi->ckp;
bool polling = false;
rename_proc("csender");
while (42) {
sender_send_t *sender_send;
client_instance_t *client;
bool only_send = false;
int ret, fd, ofs = 0;
mutex_lock(&sender_lock);
@ -342,14 +340,17 @@ void *sender(void *arg)
ts_t timeout_ts;
ts_realtime(&timeout_ts);
timeout_ts.tv_sec += 1;
if (!polling)
timeout_ts.tv_sec += 1;
else {
ts_t wait_ts = {0, 1000000};
timeraddspec(&timeout_ts, &wait_ts);
}
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
}
sender_send = sender_sends;
if (likely(sender_send))
DL_DELETE(sender_sends, sender_send);
if (!sender_send)
only_send = true;
mutex_unlock(&sender_lock);
if (!sender_send)
@ -367,22 +368,23 @@ void *sender(void *arg)
free(sender_send);
continue;
}
/* If there are other sends pending and this socket is not
* ready to receive data from us, put the send back on the
* list. */
if (!only_send) {
ret = wait_write_select(fd, 0);
if (ret < 1) {
LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the list */
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
mutex_unlock(&sender_lock);
continue;
}
/* If this socket is not ready to receive data from us, put the
* send back on the tail of the list and decrease the timeout
* to poll to either look for a client that is ready or poll
* select on this one */
ret = wait_write_select(fd, 0);
if (ret < 1) {
LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the list */
mutex_lock(&sender_lock);
DL_APPEND(sender_sends, sender_send);
mutex_unlock(&sender_lock);
polling = true;
continue;
}
polling = false;
while (sender_send->len) {
ret = send(fd, sender_send->buf + ofs, sender_send->len , 0);
if (unlikely(ret < 0)) {

40
src/generator.c

@ -233,11 +233,11 @@ retry:
goto retry;
}
LOGDEBUG("Generator received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
}
if (!strncasecmp(buf, "getbase", 7)) {
if (cmdmatch(buf, "getbase")) {
if (!gen_gbtbase(cs, gbt)) {
LOGWARNING("Failed to get block template from %s:%s",
cs->url, cs->port);
@ -250,14 +250,14 @@ retry:
free(s);
clear_gbtbase(gbt);
}
} else if (!strncasecmp(buf, "getbest", 7)) {
} else if (cmdmatch(buf, "getbest")) {
if (!get_bestblockhash(cs, hash)) {
LOGINFO("No best block hash support from %s:%s",
cs->url, cs->port);
send_unix_msg(sockd, "Failed");
} else
send_unix_msg(sockd, hash);
} else if (!strncasecmp(buf, "getlast", 7)) {
} else if (cmdmatch(buf, "getlast")) {
int height = get_blockcount(cs);
if (height == -1) {
@ -273,13 +273,13 @@ retry:
LOGDEBUG("Hash: %s", hash);
}
}
} else if (!strncasecmp(buf, "submitblock:", 12)) {
} else if (cmdmatch(buf, "submitblock:")) {
LOGNOTICE("Submitting block data!");
if (submit_block(cs, buf + 12))
send_proc(ckp->stratifier, "block");
} else if (!strncasecmp(buf, "loglevel", 8)) {
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (!strncasecmp(buf, "ping", 4)) {
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Generator received ping request");
send_unix_msg(sockd, "pong");
}
@ -383,7 +383,7 @@ static json_t *find_notify(json_t *val)
return NULL;
arr_size = json_array_size(val);
entry = json_string_value(json_array_get(val, 0));
if (entry && !strncasecmp(entry, "mining.notify", 13))
if (cmdmatch(entry, "mining.notify"))
return val;
for (i = 0; i < arr_size; i++) {
json_t *arr_val;
@ -759,7 +759,7 @@ static bool parse_method(proxy_instance_t *proxi, const char *msg)
goto out;
}
if (!strncasecmp(buf, "mining.notify", 13)) {
if (cmdmatch(buf, "mining.notify")) {
if (parse_notify(proxi, params))
proxi->notified = ret = true;
else
@ -767,22 +767,22 @@ static bool parse_method(proxy_instance_t *proxi, const char *msg)
goto out;
}
if (!strncasecmp(buf, "mining.set_difficulty", 21)) {
if (cmdmatch(buf, "mining.set_difficulty")) {
ret = parse_diff(proxi, params);
goto out;
}
if (!strncasecmp(buf, "client.reconnect", 16)) {
if (cmdmatch(buf, "client.reconnect")) {
ret = parse_reconnect(proxi, params);
goto out;
}
if (!strncasecmp(buf, "client.get_version", 18)) {
if (cmdmatch(buf, "client.get_version")) {
ret = send_version(proxi, val);
goto out;
}
if (!strncasecmp(buf, "client.show_message", 19)) {
if (cmdmatch(buf, "client.show_message")) {
ret = show_message(params);
goto out;
}
@ -1237,23 +1237,23 @@ retry:
goto retry;
}
LOGDEBUG("Proxy received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
} else if (!strncasecmp(buf, "getsubscribe", 12)) {
} else if (cmdmatch(buf, "getsubscribe")) {
send_subscribe(proxi, sockd);
} else if (!strncasecmp(buf, "getnotify", 9)) {
} else if (cmdmatch(buf, "getnotify")) {
send_notify(proxi, sockd);
} else if (!strncasecmp(buf, "getdiff", 7)) {
} else if (cmdmatch(buf, "getdiff")) {
send_diff(proxi, sockd);
} else if (!strncasecmp(buf, "reconnect", 9)) {
} else if (cmdmatch(buf, "reconnect")) {
kill_proxy(proxi);
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
goto reconnect;
} else if (!strncasecmp(buf, "loglevel", 8)) {
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else if (!strncasecmp(buf, "ping", 4)) {
} else if (cmdmatch(buf, "ping")) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
} else {

21
src/libckpool.c

@ -615,7 +615,7 @@ int wait_read_select(int sockd, int timeout)
FD_SET(sockd, &readfs);
do {
ret = select(sockd + 1, &readfs, NULL, NULL, &tv_timeout);
} while (unlikely(ret < 0 && interrupted()));
} while (unlikely(ret < 0));
return ret;
}
@ -692,7 +692,7 @@ int wait_write_select(int sockd, int timeout)
FD_SET(sockd, &writefds);
do {
ret = select(sockd + 1, NULL, &writefds, NULL, &tv_timeout);
} while (unlikely(ret < 0 && interrupted()));
} while (unlikely(ret < 0));
return ret;
}
@ -1081,6 +1081,23 @@ int safecmp(const char *a, const char *b)
return (strcmp(a, b));
}
/* Returns whether there is a case insensitive match of buf to cmd, safely
* handling NULL or zero length strings. */
bool cmdmatch(const char *buf, const char *cmd)
{
int cmdlen, buflen;
if (!buf)
return false;
buflen = strlen(buf);
if (!buflen)
return false;
cmdlen = strlen(cmd);
if (buflen < cmdlen)
return false;
return !strncasecmp(buf, cmd, cmdlen);
}
static const char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

6
src/libckpool.h

@ -362,14 +362,11 @@ static inline bool sock_blocks(void)
{
return (errno == EAGAIN || errno == EWOULDBLOCK);
}
static inline bool sock_timeout(void)
{
return (errno == ETIMEDOUT);
}
static inline bool interrupted(void)
{
return (errno == EINTR);
}
bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port);
void keep_sockalive(int fd);
@ -415,6 +412,7 @@ bool _hex2bin(void *p, const void *vhexstr, size_t len, const char *file, const
char *http_base64(const char *src);
void b58tobin(char *b58bin, const char *b58);
int safecmp(const char *a, const char *b);
bool cmdmatch(const char *buf, const char *cmd);
void address_to_pubkeytxn(char *pkh, const char *addr);
int ser_number(uchar *s, int32_t val);

30
src/stratifier.c

@ -542,7 +542,7 @@ static void update_base(ckpool_t *ckp)
LOGWARNING("Failed to get base from generator in update_base");
return;
}
if (unlikely(!strncasecmp(buf, "failed", 6))) {
if (unlikely(cmdmatch(buf, "failed"))) {
LOGWARNING("Generator returned failure in update_base");
return;
}
@ -1020,7 +1020,7 @@ retry:
LOGWARNING("Failed to get message in stratum_loop");
goto retry;
}
if (!strncasecmp(buf, "ping", 4)) {
if (cmdmatch(buf, "ping")) {
LOGDEBUG("Stratifier received ping request");
send_unix_msg(sockd, "pong");
close(sockd);
@ -1029,21 +1029,21 @@ retry:
close(sockd);
LOGDEBUG("Stratifier received request: %s", buf);
if (!strncasecmp(buf, "shutdown", 8)) {
if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;
} else if (!strncasecmp(buf, "update", 6)) {
} else if (cmdmatch(buf, "update")) {
update_base(ckp);
} else if (!strncasecmp(buf, "subscribe", 9)) {
} else if (cmdmatch(buf, "subscribe")) {
/* Proxifier has a new subscription */
if (!update_subscribe(ckp))
goto out;
} else if (!strncasecmp(buf, "notify", 6)) {
} else if (cmdmatch(buf, "notify")) {
/* Proxifier has a new notify ready */
update_notify(ckp);
} else if (!strncasecmp(buf, "diff", 4)) {
} else if (cmdmatch(buf, "diff")) {
update_diff(ckp);
} else if (!strncasecmp(buf, "dropclient", 10)) {
} else if (cmdmatch(buf, "dropclient")) {
int client_id;
ret = sscanf(buf, "dropclient=%d", &client_id);
@ -1051,9 +1051,9 @@ retry:
LOGDEBUG("Stratifier failed to parse dropclient command: %s", buf);
else
drop_client(client_id);
} else if (!strncasecmp(buf, "block", 5)) {
} else if (cmdmatch(buf, "block")) {
block_solve(ckp);
} else if (!strncasecmp(buf, "loglevel", 8)) {
} else if (cmdmatch(buf, "loglevel")) {
sscanf(buf, "loglevel=%d", &ckp->loglevel);
} else {
json_t *val = json_loads(buf, 0, NULL);
@ -1079,7 +1079,7 @@ static void *blockupdate(void *arg)
pthread_detach(pthread_self());
rename_proc("blockupdate");
buf = send_recv_proc(ckp->generator, "getbest");
if (buf && strncasecmp(buf, "Failed", 6))
if (!cmdmatch(buf, "failed"))
sprintf(request, "getbest");
else
sprintf(request, "getlast");
@ -1088,7 +1088,7 @@ static void *blockupdate(void *arg)
while (42) {
dealloc(buf);
buf = send_recv_proc(ckp->generator, request);
if (safecmp(buf, hash) && strncasecmp(buf, "Failed", 6)) {
if (safecmp(buf, hash) && !cmdmatch(buf, "failed")) {
strcpy(hash, buf);
LOGNOTICE("Block hash changed to %s", hash);
send_proc(ckp->stratifier, "update");
@ -1899,7 +1899,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
/* Random broken clients send something not an integer as the id so we copy
* the json item for id_val as is for the response. */
method = json_string_value(method_val);
if (!strncasecmp(method, "mining.subscribe", 16)) {
if (cmdmatch(method, "mining.subscribe")) {
json_t *val, *result_val = parse_subscribe(client_id, params_val);
if (!result_val)
@ -1922,7 +1922,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
return;
}
if (!strncasecmp(method, "mining.auth", 11)) {
if (cmdmatch(method, "mining.auth")) {
json_params_t *jp = create_json_params(client_id, params_val, id_val, address);
ckmsgq_add(sauthq, jp);
@ -1942,7 +1942,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
return;
}
if (!strncasecmp(method, "mining.submit", 13)) {
if (cmdmatch(method, "mining.submit")) {
json_params_t *jp = create_json_params(client_id, params_val, id_val, address);
ckmsgq_add(sshareq, jp);

Loading…
Cancel
Save