Browse Source

Store structures for all possible upstream servers to allow us to choose an alive one for now with a view to enable switching

master
Con Kolivas 11 years ago
parent
commit
7c98ab5c1d
  1. 24
      src/ckpool.h
  2. 226
      src/generator.c

24
src/ckpool.h

@ -13,6 +13,7 @@
#include "config.h" #include "config.h"
#include "libckpool.h" #include "libckpool.h"
#include "uthash.h"
struct ckpool_instance; struct ckpool_instance;
typedef struct ckpool_instance ckpool_t; typedef struct ckpool_instance ckpool_t;
@ -26,6 +27,22 @@ struct proc_instance {
int (*process)(proc_instance_t *); int (*process)(proc_instance_t *);
}; };
struct server_instance {
/* Hash table data */
UT_hash_handle hh;
int id;
char *url;
char *auth;
char *pass;
connsock_t cs;
bool alive;
void *data; // Private data
};
typedef struct server_instance server_instance_t;
struct ckpool_instance { struct ckpool_instance {
/* Filename of config file */ /* Filename of config file */
char *config; char *config;
@ -54,6 +71,9 @@ struct ckpool_instance {
pthread_t pth_listener; pthread_t pth_listener;
pthread_t pth_watchdog; pthread_t pth_watchdog;
/* Are we running as a proxy */
bool proxy;
/* Bitcoind data */ /* Bitcoind data */
int btcds; int btcds;
char **btcdurl; char **btcdurl;
@ -70,11 +90,11 @@ struct ckpool_instance {
char *btcsig; // Optional signature to add to coinbase char *btcsig; // Optional signature to add to coinbase
/* Stratum options */ /* Stratum options */
server_instance_t **servers;
char *serverurl; // URL to bind our server/proxy to
int update_interval; // Seconds between stratum updates int update_interval; // Seconds between stratum updates
char *serverurl;
/* Proxy options */ /* Proxy options */
bool proxy;
int proxies; int proxies;
char **proxyurl; char **proxyurl;
char **proxyauth; char **proxyauth;

226
src/generator.c

@ -47,6 +47,7 @@ typedef struct notify_instance notify_instance_t;
struct proxy_instance { struct proxy_instance {
ckpool_t *ckp; ckpool_t *ckp;
connsock_t *cs; connsock_t *cs;
server_instance_t *si;
const char *auth; const char *auth;
const char *pass; const char *pass;
@ -170,48 +171,6 @@ out:
return ret; return ret;
} }
static int server_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
const char *auth, const char *pass)
{
char *userpass = NULL;
gbtbase_t gbt;
int ret = 1;
memset(&gbt, 0, sizeof(gbt));
userpass = strdup(auth);
realloc_strcat(&userpass, ":");
realloc_strcat(&userpass, pass);
cs->auth = http_base64(userpass);
if (!cs->auth) {
LOGWARNING("Failed to create base64 auth from %s", userpass);
goto out;
}
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) {
LOGWARNING("FATAL: Failed to connect socket to %s:%s !", cs->url, cs->port);
goto out;
}
keep_sockalive(cs->fd);
/* Test we can connect, authorise and get a block template */
if (!gen_gbtbase(cs, &gbt)) {
LOGWARNING("FATAL: Failed to get test block template from %s:%s auth %s !",
cs->url, cs->port, userpass);
goto out;
}
clear_gbtbase(&gbt);
if (!validate_address(cs, ckp->btcaddress)) {
LOGWARNING("FATAL: Invalid btcaddress: %s !", ckp->btcaddress);
goto out;
}
ret = gen_loop(pi, cs);
out:
close(cs->fd);
dealloc(userpass);
return ret;
}
static bool send_json_msg(connsock_t *cs, json_t *json_msg) static bool send_json_msg(connsock_t *cs, json_t *json_msg)
{ {
int len, sent; int len, sent;
@ -946,6 +905,7 @@ static void *proxy_send(void *arg)
return NULL; return NULL;
} }
#if 0
static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs, static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
const char *auth, const char *pass) const char *auth, const char *pass)
{ {
@ -999,41 +959,171 @@ out:
return ret; return ret;
} }
#endif
/* FIXME: Make these use multiple BTCDs instead of just first alive. */
/* FIXME: Hard wired to just use config 0 for now */ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
int generator(proc_instance_t *pi)
{ {
char *url, *auth, *pass, *userpass = NULL; int i, ret = 1, alive = 0;
ckpool_t *ckp = pi->ckp; server_instance_t *si;
connsock_t cs; connsock_t *cs;
int ret = 1; gbtbase_t gbt;
memset(&cs, 0, sizeof(cs)); memset(&gbt, 0, sizeof(gbt));
if (!ckp->proxy) { ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds);
url = ckp->btcdurl[0]; for (i = 0; i < ckp->btcds; i++) {
auth = ckp->btcdauth[0]; char *userpass = NULL;
pass = ckp->btcdpass[0];
} else { dealloc(userpass);
url = ckp->proxyurl[0]; ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
auth = ckp->proxyauth[0]; si = ckp->servers[i];
pass = ckp->proxypass[0]; cs = &si->cs;
si->url = ckp->btcdurl[i];
si->auth = ckp->btcdauth[i];
si->pass = ckp->btcdpass[i];
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
continue;
}
userpass = strdup(si->auth);
realloc_strcat(&userpass, ":");
realloc_strcat(&userpass, si->pass);
cs->auth = http_base64(userpass);
if (!cs->auth) {
LOGWARNING("Failed to create base64 auth from %s", userpass);
continue;
}
cs->fd = connect_socket(cs->url, cs->port);
if (cs->fd < 0) {
LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port);
continue;
}
keep_sockalive(cs->fd);
/* Test we can connect, authorise and get a block template */
if (!gen_gbtbase(cs, &gbt)) {
LOGWARNING("Failed to get test block template from %s:%s auth %s !",
cs->url, cs->port, userpass);
continue;
}
clear_gbtbase(&gbt);
if (!validate_address(cs, ckp->btcaddress)) {
LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress);
continue;
}
dealloc(userpass);
si->alive = true;
alive++;
} }
if (!extract_sockaddr(url, &cs.url, &cs.port)) { if (!alive) {
LOGWARNING("Failed to extract address from %s", url); LOGEMERG("FATAL: No bitcoinds active!");
goto out; goto out;
} }
for (i = 0; i < ckp->btcds; si = ckp->servers[i], i++) {
if (si->alive)
break;
}
ret = gen_loop(pi, &si->cs);
out:
for (i = 0; i < ckp->btcds; si = ckp->servers[i], i++)
dealloc(si);
dealloc(ckp->servers);
return ret;
}
if (!ckp->proxy) static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
ret = server_mode(ckp, pi, &cs, auth, pass); {
else int i, ret = 1, alive = 0;
ret = proxy_mode(ckp, pi, &cs, auth, pass); proxy_instance_t *proxi;
server_instance_t *si;
connsock_t *cs;
ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->proxies);
for (i = 0; i < ckp->proxies; i++) {
ckp->servers[i] = ckzalloc(sizeof(server_instance_t));
si = ckp->servers[i];
cs = &si->cs;
si->url = ckp->proxyurl[i];
si->auth = ckp->proxyauth[i];
si->pass = ckp->proxypass[i];
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url);
continue;
}
if (!connect_proxy(cs)) {
LOGWARNING("Failed to connect to %s:%s in proxy_mode!",
cs->url, cs->port);
continue;
}
proxi = ckzalloc(sizeof(proxy_instance_t));
si->data = proxi;
/* Test we can connect, authorise and get stratum information */
if (!subscribe_stratum(cs, proxi)) {
LOGWARNING("Failed initial subscribe to %s:%s !",
cs->url, cs->port);
continue;
}
proxi->auth = si->auth;
proxi->pass = si->pass;
if (!auth_stratum(cs, proxi)) {
LOGWARNING("Failed initial authorise to %s:%s with %s:%s !",
cs->url, cs->port, si->auth, si->pass);
continue;
}
proxi->si = si;
proxi->ckp = ckp;
proxi->cs = cs;
si->alive = true;
alive++;
}
if (!alive) {
LOGEMERG("FATAL: No proxied servers active!");
goto out;
}
for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) {
if (si->alive)
break;
}
proxi = si->data;
mutex_init(&proxi->notify_lock);
create_pthread(&proxi->pth_precv, proxy_recv, proxi);
mutex_init(&proxi->psend_lock);
cond_init(&proxi->psend_cond);
create_pthread(&proxi->pth_psend, proxy_send, proxi);
ret = proxy_loop(pi, proxi);
/* Return from the proxy loop means we have received a shutdown
* request */
pthread_cancel(proxi->pth_precv);
pthread_cancel(proxi->pth_psend);
join_pthread(proxi->pth_precv);
join_pthread(proxi->pth_psend);
out: out:
/* Clean up here */ for (i = 0; i < ckp->proxies; si = ckp->servers[i], i++) {
dealloc(cs.url); close(si->cs.fd);
dealloc(cs.port); proxi = si->data;
dealloc(userpass); free(proxi->enonce1);
free(proxi->enonce1bin);
free(proxi->sessionid);
dealloc(si->data);
dealloc(si);
}
dealloc(ckp->servers);
return ret;
}
int generator(proc_instance_t *pi)
{
ckpool_t *ckp = pi->ckp;
int ret;
if (ckp->proxy)
ret = proxy_mode(ckp, pi);
else
ret = server_mode(ckp, pi);
LOGINFO("%s generator exiting with return code %d", ckp->name, ret); LOGINFO("%s generator exiting with return code %d", ckp->name, ret);
if (ret) { if (ret) {

Loading…
Cancel
Save