Browse Source

Add a userproxy mode that connects to the same upstream pool according to the username supplied by clients connecting to the proxy

master
Con Kolivas 10 years ago
parent
commit
88e20dd111
  1. 21
      src/ckpool.c
  2. 3
      src/ckpool.h
  3. 86
      src/generator.c
  4. 23
      src/stratifier.c

21
src/ckpool.c

@ -1517,6 +1517,7 @@ static struct option long_options[] = {
{"redirector", no_argument, 0, 'R'},
{"ckdb-sockdir",required_argument, 0, 'S'},
{"sockdir", required_argument, 0, 's'},
{"userproxy", no_argument, 0, 'u'},
{0, 0, 0, 0}
};
#else
@ -1534,6 +1535,7 @@ static struct option long_options[] = {
{"proxy", no_argument, 0, 'p'},
{"redirector", no_argument, 0, 'R'},
{"sockdir", required_argument, 0, 's'},
{"userproxy", no_argument, 0, 'u'},
{0, 0, 0, 0}
};
#endif
@ -1577,7 +1579,7 @@ int main(int argc, char **argv)
ckp.initial_args[ckp.args] = strdup(argv[ckp.args]);
ckp.initial_args[ckp.args] = NULL;
while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpRS:s:", long_options, &i)) != -1) {
while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpRS:s:u", long_options, &i)) != -1) {
switch (c) {
case 'A':
ckp.standalone = true;
@ -1632,18 +1634,18 @@ int main(int argc, char **argv)
ckp.name = optarg;
break;
case 'P':
if (ckp.proxy || ckp.redirector)
quit(1, "Cannot set both proxy or redirector and passthrough mode");
if (ckp.proxy || ckp.redirector || ckp.userproxy)
quit(1, "Cannot set another proxy type or redirector and passthrough mode");
ckp.standalone = ckp.proxy = ckp.passthrough = true;
break;
case 'p':
if (ckp.passthrough || ckp.redirector)
quit(1, "Cannot set both passthrough or redirector and proxy mode");
if (ckp.passthrough || ckp.redirector || ckp.userproxy)
quit(1, "Cannot set another proxy type or redirector and proxy mode");
ckp.proxy = true;
break;
case 'R':
if (ckp.proxy || ckp.passthrough)
quit(1, "Cannot set both proxy or passthrough and redirector modes");
if (ckp.proxy || ckp.passthrough || ckp.userproxy)
quit(1, "Cannot set a proxy type or passthrough and redirector modes");
ckp.standalone = ckp.proxy = ckp.passthrough = ckp.redirector = true;
break;
case 'S':
@ -1652,6 +1654,11 @@ int main(int argc, char **argv)
case 's':
ckp.socket_dir = strdup(optarg);
break;
case 'u':
if (ckp.proxy || ckp.redirector || ckp.passthrough)
quit(1, "Cannot set both userproxy and another proxy type or redirector");
ckp.userproxy = ckp.proxy = true;
break;
}
}

3
src/ckpool.h

@ -189,6 +189,9 @@ struct ckpool_instance {
/* Are we running without ckdb */
bool standalone;
/* Are we running in userproxy mode */
bool userproxy;
/* Should we daemonise the ckpool process */
bool daemon;

86
src/generator.c

@ -1012,11 +1012,11 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst
epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, subproxy->cs.fd, NULL);
Close(subproxy->cs.fd);
}
subproxy->disabled = true;
if (parent_proxy(subproxy))
return;
mutex_lock(&proxi->proxy_lock);
subproxy->disabled = true;
/* Make sure subproxy is still in the list */
subproxy = __subproxy_by_id(proxi, subproxy->subid);
if (likely(subproxy))
@ -1167,7 +1167,7 @@ static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg
memset(&err, 0, sizeof(err));
val = json_loads(msg, 0, &err);
if (!val) {
LOGWARNING("JSON decode failed(%d): %s", err.line, err.text);
LOGWARNING("JSON decode of msg %s failed(%d): %s", msg, err.line, err.text);
goto out;
}
@ -1278,8 +1278,13 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi)
val = json_msg_result(buf, &res_val, &err_val);
if (!val) {
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->url, buf);
if (proxi->global) {
LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->url, buf);
} else {
LOGNOTICE("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s",
proxi->id, proxi->subid, proxi->url, buf);
}
goto out;
}
@ -1314,6 +1319,12 @@ out:
parse_method(ckp, proxi, buf);
};
}
if (!proxi->global) {
LOGNOTICE("Disabling userproxy %d:%d %s that failed authorisation as %s",
proxi->id, proxi->subid, proxi->url, proxi->auth);
proxi->disabled = true;
disable_subproxy(ckp->data, proxi->parent, proxi);
}
return ret;
}
@ -2133,7 +2144,8 @@ static void *userproxy_recv(void *arg)
}
mutex_unlock(&gdata->share_lock);
do {
timeout = 0;
while ((ret = read_socket_line(cs, &timeout)) > 0) {
/* proxy may have been recycled here if it is not a
* parent and reconnect was issued */
if (parse_method(ckp, proxy, cs->buf))
@ -2144,7 +2156,7 @@ static void *userproxy_recv(void *arg)
proxy->id, proxy->subid, cs->buf);
}
timeout = 0;
} while ((ret = read_socket_line(cs, &timeout)) > 0);
}
}
return NULL;
}
@ -2282,6 +2294,24 @@ static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const in
return proxy;
}
static void add_userproxy(ckpool_t *ckp, gdata_t *gdata, const int userid,
const char *url, const char *auth, const char *pass)
{
proxy_instance_t *proxy;
char *newurl = strdup(url);
char *newauth = strdup(auth);
char *newpass = strdup(pass);
int id;
mutex_lock(&gdata->lock);
id = ckp->proxies++;
proxy = __add_userproxy(ckp, gdata, id, userid, newurl, newauth, newpass);
mutex_unlock(&gdata->lock);
LOGWARNING("Adding non global user %s, %d proxy %d:%s", auth, userid, id, url);
prepare_proxy(proxy);
}
static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf)
{
char *url = NULL, *auth = NULL, *pass = NULL;
@ -2555,6 +2585,48 @@ out:
send_api_response(val, sockd);
}
static void parse_globaluser(ckpool_t *ckp, gdata_t *gdata, const char *buf)
{
char *url, *username, *pass = strdupa(buf);
int userid = -1, proxyid = -1;
proxy_instance_t *proxy, *tmp;
int64_t clientid = -1;
bool found = false;
sscanf(buf, "%d:%d:%"PRId64":%s", &proxyid, &userid, &clientid, pass);
if (unlikely(clientid < 0 || userid < 0 || proxyid < 0)) {
LOGWARNING("Failed to parse_globaluser ids from command %s", buf);
return;
}
username = strsep(&pass, ",");
if (unlikely(!username)) {
LOGWARNING("Failed to parse_globaluser username from command %s", buf);
return;
}
LOGDEBUG("Checking userproxy proxy %d user %d:%"PRId64" worker %s pass %s",
proxyid, userid, clientid, username, pass);
if (unlikely(proxyid >= ckp->proxies)) {
LOGWARNING("Trying to find non-existent proxy id %d in parse_globaluser", proxyid);
return;
}
mutex_lock(&gdata->lock);
url = ckp->proxyurl[proxyid];
HASH_ITER(hh, gdata->proxies, proxy, tmp) {
if (!strcmp(proxy->auth, username)) {
found = true;
break;
}
}
mutex_unlock(&gdata->lock);
if (found)
return;
add_userproxy(ckp, gdata, userid, url, username, pass);
}
static int proxy_loop(proc_instance_t *pi)
{
proxy_instance_t *proxi = NULL, *cproxy;
@ -2617,6 +2689,8 @@ retry:
parse_ableproxy(gdata, umsg->sockd, buf + 13, true);
} else if (cmdmatch(buf, "proxystats")) {
parse_proxystats(gdata, umsg->sockd, buf + 11);
} else if (cmdmatch(buf, "globaluser")) {
parse_globaluser(ckp, gdata, buf + 11);
} else if (cmdmatch(buf, "shutdown")) {
ret = 0;
goto out;

23
src/stratifier.c

@ -282,6 +282,7 @@ struct stratum_instance {
char *useragent;
char *workername;
char *password;
int user_id;
int server; /* Which server is this instance bound to */
@ -1078,6 +1079,7 @@ static void __kill_instance(sdata_t *sdata, stratum_instance_t *client)
client->proxy->parent->combined_clients--;
}
free(client->workername);
free(client->password);
free(client->useragent);
memset(client, 0, sizeof(stratum_instance_t));
DL_APPEND(sdata->recycled_instances, client);
@ -4071,14 +4073,26 @@ static void queue_delayed_auth(stratum_instance_t *client)
ckdbq_add(ckp, ID_AUTH, val);
}
static void check_global_user(ckpool_t *ckp, user_instance_t *user, stratum_instance_t *client)
{
sdata_t *sdata = ckp->data;
proxy_t *proxy = best_proxy(sdata);
int proxyid = proxy->id;
char buf[256];
sprintf(buf, "globaluser=%d:%d:%"PRId64":%s,%s", proxyid, user->id, client->id,
user->username, client->password);
send_generator(ckp, buf, GEN_LAX);
}
/* Needs to be entered with client holding a ref count. */
static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_val,
json_t **err_val, int *errnum)
{
user_instance_t *user;
ckpool_t *ckp = client->ckp;
const char *buf, *pass;
bool ret = false;
const char *buf;
int arr_size;
ts_t now;
@ -4112,6 +4126,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
*err_val = json_string("Invalid character in username");
goto out;
}
pass = json_string_value(json_array_get(params_val, 1));
user = generate_user(ckp, client, buf);
client->user_id = user->id;
ts_realtime(&now);
@ -4119,6 +4134,10 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
/* NOTE workername is NULL prior to this so should not be used in code
* till after this point */
client->workername = strdup(buf);
if (pass)
client->password = strndup(pass, 64);
else
client->password = strdup("");
if (user->failed_authtime) {
time_t now_t = time(NULL);
@ -4161,6 +4180,8 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_
if (ckp->proxy) {
LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s",
client->id, client->proxyid, client->subproxyid, buf, user->username);
if (ckp->userproxy)
check_global_user(ckp, user, client);
} else {
LOGNOTICE("Authorised client %"PRId64" worker %s as user %s",
client->id, buf, user->username);

Loading…
Cancel
Save