From 9c60f55b6163c1147de255dd4fcc3c9819343d40 Mon Sep 17 00:00:00 2001 From: Con Kolivas Date: Fri, 20 Mar 2015 13:54:33 +1100 Subject: [PATCH] Add preliminary structures required to add user proxies to the generator --- src/generator.c | 83 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 20 deletions(-) diff --git a/src/generator.c b/src/generator.c index fbb67e4d..8613f8c1 100644 --- a/src/generator.c +++ b/src/generator.c @@ -86,6 +86,7 @@ struct proxy_instance { bool passthrough; int id; /* Proxy server id*/ int subid; /* Subproxy id */ + int userid; /* User id if this proxy is bound to a user */ char *url; char *auth; @@ -137,8 +138,10 @@ struct proxy_instance { struct generator_data { ckpool_t *ckp; mutex_t lock; /* Lock protecting linked lists */ - proxy_instance_t *proxies; /* Hash list of all proxies */ + proxy_instance_t *proxies; /* Hash list of all global proxies */ + proxy_instance_t *user_proxies; /* Hash list of all user proxies */ proxy_instance_t *dead_proxies; /* Disabled proxies */ + int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue pthread_t pth_uprecv; // User proxy receive thread @@ -919,13 +922,15 @@ static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi } mutex_unlock(&gdata->lock); - subproxy->cs.ckp = subproxy->ckp = proxi->ckp; + subproxy->ckp = proxi->ckp; mutex_lock(&proxi->proxy_lock); subproxy->subid = ++proxi->subproxy_count; mutex_unlock(&proxi->proxy_lock); subproxy->id = proxi->id; + subproxy->userid = proxi->userid; + subproxy->global = proxi->global; subproxy->url = strdup(url); subproxy->auth = strdup(proxi->auth); subproxy->pass = strdup(proxi->pass); @@ -1303,10 +1308,11 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) json_t *json_msg; char *msg, *buf; - JSON_CPACK(json_msg, "{ss,ss,ss,sI,si,ss,si}", + JSON_CPACK(json_msg, "{ss,ss,ss,sI,si,ss,si,sb,si}", "url", proxi->url, "auth", proxi->auth, "pass", proxi->pass, "proxy", proxi->id, "subproxy", proxi->subid, - "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len); + "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len, + "global", proxi->global, "userid", proxi->userid); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); @@ -2002,13 +2008,22 @@ static void *userproxy_recv(void *arg) } while (42) { - proxy_instance_t *proxy, *parent; + proxy_instance_t *proxy, *tmpproxy, *parent; share_msg_t *share, *tmpshare; notify_instance_t *ni, *tmp; connsock_t *cs; time_t now; int ret; + mutex_lock(&gdata->lock); + HASH_ITER(hh, gdata->user_proxies, proxy, tmpproxy) { + if (!proxy->alive) { + proxy->epfd = epfd; + reconnect_proxy(proxy); + } + } + mutex_unlock(&gdata->lock); + ret = epoll_wait(epfd, &event, 1, 1000); if (ret < 1) { if (likely(!ret)) @@ -2066,7 +2081,8 @@ static void prepare_proxy(proxy_instance_t *proxi) proxi->parent = proxi; mutex_init(&proxi->proxy_lock); add_subproxy(proxi, proxi); - create_pthread(&proxi->pth_precv, proxy_recv, proxi); + if (proxi->global) + create_pthread(&proxi->pth_precv, proxy_recv, proxi); } static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) @@ -2176,6 +2192,24 @@ out: static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int num); +static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const int id, + const int userid, char *url, char *auth, char *pass) +{ + proxy_instance_t *proxy; + + proxy = ckzalloc(sizeof(proxy_instance_t)); + proxy->id = id; + proxy->userid = userid; + proxy->url = url; + proxy->auth = auth; + proxy->pass = pass; + proxy->ckp = ckp; + mutex_init(&proxy->notify_lock); + mutex_init(&proxy->share_lock); + HASH_ADD_INT(gdata->proxies, id, proxy); + return proxy; +} + static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf) { char *url = NULL, *auth = NULL, *pass = NULL; @@ -2193,25 +2227,28 @@ static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const json_get_string(&url, val, "url"); json_get_string(&auth, val, "auth"); json_get_string(&pass, val, "pass"); + if (json_get_int(&userid, val, "userid")) + global = false; + else + global = true; json_decref(val); if (unlikely(!url || !auth || !pass)) { val = json_errormsg("Failed to decode url/auth/pass in addproxy %s", buf); goto out; } - if (json_get_int(&userid, val, "userid")) - global = false; - else - global = true; mutex_lock(&gdata->lock); id = ckp->proxies++; - ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies); - ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies); - ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies); - ckp->proxyurl[id] = strdup(url); - ckp->proxyauth[id] = strdup(auth); - ckp->proxypass[id] = strdup(pass); - proxy = __add_proxy(ckp, gdata, id); + if (global) { + ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies); + ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies); + ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies); + ckp->proxyurl[id] = url; + ckp->proxyauth[id] = auth; + ckp->proxypass[id] = pass; + proxy = __add_proxy(ckp, gdata, id); + } else + proxy = __add_userproxy(ckp, gdata, id, userid, url, auth, pass); mutex_unlock(&gdata->lock); if (global) @@ -2219,8 +2256,14 @@ static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const else LOGNOTICE("Adding user %d proxy %d:%s", userid, id, proxy->url); prepare_proxy(proxy); - JSON_CPACK(val, "{si,ss,ss,ss}", - "id", proxy->id, "url", url, "auth", auth, "pass", pass); + if (global) { + JSON_CPACK(val, "{si,ss,ss,ss}", + "id", proxy->id, "url", url, "auth", auth, "pass", pass); + } else { + JSON_CPACK(val, "{si,ss,ss,ss,si}", + "id", proxy->id, "url", url, "auth", auth, "pass", pass, + "userid", proxy->userid); + } out: send_api_response(val, sockd); } @@ -2447,10 +2490,10 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id proxy->auth = strdup(ckp->proxyauth[id]); proxy->pass = strdup(ckp->proxypass[id]); proxy->ckp = ckp; - proxy->cs.ckp = ckp; mutex_init(&proxy->notify_lock); mutex_init(&proxy->share_lock); HASH_ADD_INT(gdata->proxies, id, proxy); + proxy->global = true; return proxy; }