From 88e20dd1110d828962480848ff3553e864f7f8db Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Sat, 12 Dec 2015 14:45:51 +1100 Subject: [PATCH] Add a userproxy mode that connects to the same upstream pool according to the username supplied by clients connecting to the proxy --- src/ckpool.c | 21 ++++++++---- src/ckpool.h | 3 ++ src/generator.c | 86 ++++++++++++++++++++++++++++++++++++++++++++---- src/stratifier.c | 23 ++++++++++++- 4 files changed, 119 insertions(+), 14 deletions(-) diff --git a/src/ckpool.c b/src/ckpool.c index 5ab30979..a76db57b 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -1517,6 +1517,7 @@ static struct option long_options[] = { {"redirector", no_argument, 0, 'R'}, {"ckdb-sockdir",required_argument, 0, 'S'}, {"sockdir", required_argument, 0, 's'}, + {"userproxy", no_argument, 0, 'u'}, {0, 0, 0, 0} }; #else @@ -1534,6 +1535,7 @@ static struct option long_options[] = { {"proxy", no_argument, 0, 'p'}, {"redirector", no_argument, 0, 'R'}, {"sockdir", required_argument, 0, 's'}, + {"userproxy", no_argument, 0, 'u'}, {0, 0, 0, 0} }; #endif @@ -1577,7 +1579,7 @@ int main(int argc, char **argv) ckp.initial_args[ckp.args] = strdup(argv[ckp.args]); ckp.initial_args[ckp.args] = NULL; - while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpRS:s:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "Ac:Dd:g:HhkLl:n:PpRS:s:u", long_options, &i)) != -1) { switch (c) { case 'A': ckp.standalone = true; @@ -1632,18 +1634,18 @@ int main(int argc, char **argv) ckp.name = optarg; break; case 'P': - if (ckp.proxy || ckp.redirector) - quit(1, "Cannot set both proxy or redirector and passthrough mode"); + if (ckp.proxy || ckp.redirector || ckp.userproxy) + quit(1, "Cannot set another proxy type or redirector and passthrough mode"); ckp.standalone = ckp.proxy = ckp.passthrough = true; break; case 'p': - if (ckp.passthrough || ckp.redirector) - quit(1, "Cannot set both passthrough or redirector and proxy mode"); + if (ckp.passthrough || ckp.redirector || ckp.userproxy) + quit(1, "Cannot set another proxy type or redirector and proxy mode"); ckp.proxy = true; break; case 'R': - if (ckp.proxy || ckp.passthrough) - quit(1, "Cannot set both proxy or passthrough and redirector modes"); + if (ckp.proxy || ckp.passthrough || ckp.userproxy) + quit(1, "Cannot set a proxy type or passthrough and redirector modes"); ckp.standalone = ckp.proxy = ckp.passthrough = ckp.redirector = true; break; case 'S': @@ -1652,6 +1654,11 @@ int main(int argc, char **argv) case 's': ckp.socket_dir = strdup(optarg); break; + case 'u': + if (ckp.proxy || ckp.redirector || ckp.passthrough) + quit(1, "Cannot set both userproxy and another proxy type or redirector"); + ckp.userproxy = ckp.proxy = true; + break; } } diff --git a/src/ckpool.h b/src/ckpool.h index 39d24c01..2593c969 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -189,6 +189,9 @@ struct ckpool_instance { /* Are we running without ckdb */ bool standalone; + /* Are we running in userproxy mode */ + bool userproxy; + /* Should we daemonise the ckpool process */ bool daemon; diff --git a/src/generator.c b/src/generator.c index 24d5493d..f0053df0 100644 --- a/src/generator.c +++ b/src/generator.c @@ -1012,11 +1012,11 @@ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_inst epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, subproxy->cs.fd, NULL); Close(subproxy->cs.fd); } + subproxy->disabled = true; if (parent_proxy(subproxy)) return; mutex_lock(&proxi->proxy_lock); - subproxy->disabled = true; /* Make sure subproxy is still in the list */ subproxy = __subproxy_by_id(proxi, subproxy->subid); if (likely(subproxy)) @@ -1167,7 +1167,7 @@ static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg memset(&err, 0, sizeof(err)); val = json_loads(msg, 0, &err); if (!val) { - LOGWARNING("JSON decode failed(%d): %s", err.line, err.text); + LOGWARNING("JSON decode of msg %s failed(%d): %s", msg, err.line, err.text); goto out; } @@ -1278,8 +1278,13 @@ static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi) val = json_msg_result(buf, &res_val, &err_val); if (!val) { - LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", - proxi->id, proxi->subid, proxi->url, buf); + if (proxi->global) { + LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", + proxi->id, proxi->subid, proxi->url, buf); + } else { + LOGNOTICE("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", + proxi->id, proxi->subid, proxi->url, buf); + } goto out; } @@ -1314,6 +1319,12 @@ out: parse_method(ckp, proxi, buf); }; } + if (!proxi->global) { + LOGNOTICE("Disabling userproxy %d:%d %s that failed authorisation as %s", + proxi->id, proxi->subid, proxi->url, proxi->auth); + proxi->disabled = true; + disable_subproxy(ckp->data, proxi->parent, proxi); + } return ret; } @@ -2133,7 +2144,8 @@ static void *userproxy_recv(void *arg) } mutex_unlock(&gdata->share_lock); - do { + timeout = 0; + while ((ret = read_socket_line(cs, &timeout)) > 0) { /* proxy may have been recycled here if it is not a * parent and reconnect was issued */ if (parse_method(ckp, proxy, cs->buf)) @@ -2144,7 +2156,7 @@ static void *userproxy_recv(void *arg) proxy->id, proxy->subid, cs->buf); } timeout = 0; - } while ((ret = read_socket_line(cs, &timeout)) > 0); + } } return NULL; } @@ -2282,6 +2294,24 @@ static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const in return proxy; } +static void add_userproxy(ckpool_t *ckp, gdata_t *gdata, const int userid, + const char *url, const char *auth, const char *pass) +{ + proxy_instance_t *proxy; + char *newurl = strdup(url); + char *newauth = strdup(auth); + char *newpass = strdup(pass); + int id; + + mutex_lock(&gdata->lock); + id = ckp->proxies++; + proxy = __add_userproxy(ckp, gdata, id, userid, newurl, newauth, newpass); + mutex_unlock(&gdata->lock); + + LOGWARNING("Adding non global user %s, %d proxy %d:%s", auth, userid, id, url); + prepare_proxy(proxy); +} + static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf) { char *url = NULL, *auth = NULL, *pass = NULL; @@ -2555,6 +2585,48 @@ out: send_api_response(val, sockd); } +static void parse_globaluser(ckpool_t *ckp, gdata_t *gdata, const char *buf) +{ + char *url, *username, *pass = strdupa(buf); + int userid = -1, proxyid = -1; + proxy_instance_t *proxy, *tmp; + int64_t clientid = -1; + bool found = false; + + sscanf(buf, "%d:%d:%"PRId64":%s", &proxyid, &userid, &clientid, pass); + if (unlikely(clientid < 0 || userid < 0 || proxyid < 0)) { + LOGWARNING("Failed to parse_globaluser ids from command %s", buf); + return; + } + username = strsep(&pass, ","); + if (unlikely(!username)) { + LOGWARNING("Failed to parse_globaluser username from command %s", buf); + return; + } + + LOGDEBUG("Checking userproxy proxy %d user %d:%"PRId64" worker %s pass %s", + proxyid, userid, clientid, username, pass); + + if (unlikely(proxyid >= ckp->proxies)) { + LOGWARNING("Trying to find non-existent proxy id %d in parse_globaluser", proxyid); + return; + } + + mutex_lock(&gdata->lock); + url = ckp->proxyurl[proxyid]; + HASH_ITER(hh, gdata->proxies, proxy, tmp) { + if (!strcmp(proxy->auth, username)) { + found = true; + break; + } + } + mutex_unlock(&gdata->lock); + + if (found) + return; + add_userproxy(ckp, gdata, userid, url, username, pass); +} + static int proxy_loop(proc_instance_t *pi) { proxy_instance_t *proxi = NULL, *cproxy; @@ -2617,6 +2689,8 @@ retry: parse_ableproxy(gdata, umsg->sockd, buf + 13, true); } else if (cmdmatch(buf, "proxystats")) { parse_proxystats(gdata, umsg->sockd, buf + 11); + } else if (cmdmatch(buf, "globaluser")) { + parse_globaluser(ckp, gdata, buf + 11); } else if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; diff --git a/src/stratifier.c b/src/stratifier.c index f6b8de85..6979d3b8 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -282,6 +282,7 @@ struct stratum_instance { char *useragent; char *workername; + char *password; int user_id; int server; /* Which server is this instance bound to */ @@ -1078,6 +1079,7 @@ static void __kill_instance(sdata_t *sdata, stratum_instance_t *client) client->proxy->parent->combined_clients--; } free(client->workername); + free(client->password); free(client->useragent); memset(client, 0, sizeof(stratum_instance_t)); DL_APPEND(sdata->recycled_instances, client); @@ -4071,14 +4073,26 @@ static void queue_delayed_auth(stratum_instance_t *client) ckdbq_add(ckp, ID_AUTH, val); } +static void check_global_user(ckpool_t *ckp, user_instance_t *user, stratum_instance_t *client) +{ + sdata_t *sdata = ckp->data; + proxy_t *proxy = best_proxy(sdata); + int proxyid = proxy->id; + char buf[256]; + + sprintf(buf, "globaluser=%d:%d:%"PRId64":%s,%s", proxyid, user->id, client->id, + user->username, client->password); + send_generator(ckp, buf, GEN_LAX); +} + /* Needs to be entered with client holding a ref count. */ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_val, json_t **err_val, int *errnum) { user_instance_t *user; ckpool_t *ckp = client->ckp; + const char *buf, *pass; bool ret = false; - const char *buf; int arr_size; ts_t now; @@ -4112,6 +4126,7 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ *err_val = json_string("Invalid character in username"); goto out; } + pass = json_string_value(json_array_get(params_val, 1)); user = generate_user(ckp, client, buf); client->user_id = user->id; ts_realtime(&now); @@ -4119,6 +4134,10 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ /* NOTE workername is NULL prior to this so should not be used in code * till after this point */ client->workername = strdup(buf); + if (pass) + client->password = strndup(pass, 64); + else + client->password = strdup(""); if (user->failed_authtime) { time_t now_t = time(NULL); @@ -4161,6 +4180,8 @@ static json_t *parse_authorise(stratum_instance_t *client, const json_t *params_ if (ckp->proxy) { LOGNOTICE("Authorised client %"PRId64" to proxy %d:%d, worker %s as user %s", client->id, client->proxyid, client->subproxyid, buf, user->username); + if (ckp->userproxy) + check_global_user(ckp, user, client); } else { LOGNOTICE("Authorised client %"PRId64" worker %s as user %s", client->id, buf, user->username);