Browse Source

Add preliminary structures required to add user proxies to the generator

master
Con Kolivas 10 years ago
parent
commit
9c60f55b61
  1. 69
      src/generator.c

69
src/generator.c

@ -86,6 +86,7 @@ struct proxy_instance {
bool passthrough; bool passthrough;
int id; /* Proxy server id*/ int id; /* Proxy server id*/
int subid; /* Subproxy id */ int subid; /* Subproxy id */
int userid; /* User id if this proxy is bound to a user */
char *url; char *url;
char *auth; char *auth;
@ -137,8 +138,10 @@ struct proxy_instance {
struct generator_data { struct generator_data {
ckpool_t *ckp; ckpool_t *ckp;
mutex_t lock; /* Lock protecting linked lists */ mutex_t lock; /* Lock protecting linked lists */
proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *proxies; /* Hash list of all global proxies */
proxy_instance_t *user_proxies; /* Hash list of all user proxies */
proxy_instance_t *dead_proxies; /* Disabled proxies */ proxy_instance_t *dead_proxies; /* Disabled proxies */
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue ckmsgq_t *srvchk; // Server check message queue
pthread_t pth_uprecv; // User proxy receive thread pthread_t pth_uprecv; // User proxy receive thread
@ -919,13 +922,15 @@ static proxy_instance_t *create_subproxy(gdata_t *gdata, proxy_instance_t *proxi
} }
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
subproxy->cs.ckp = subproxy->ckp = proxi->ckp; subproxy->ckp = proxi->ckp;
mutex_lock(&proxi->proxy_lock); mutex_lock(&proxi->proxy_lock);
subproxy->subid = ++proxi->subproxy_count; subproxy->subid = ++proxi->subproxy_count;
mutex_unlock(&proxi->proxy_lock); mutex_unlock(&proxi->proxy_lock);
subproxy->id = proxi->id; subproxy->id = proxi->id;
subproxy->userid = proxi->userid;
subproxy->global = proxi->global;
subproxy->url = strdup(url); subproxy->url = strdup(url);
subproxy->auth = strdup(proxi->auth); subproxy->auth = strdup(proxi->auth);
subproxy->pass = strdup(proxi->pass); subproxy->pass = strdup(proxi->pass);
@ -1303,10 +1308,11 @@ static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi)
json_t *json_msg; json_t *json_msg;
char *msg, *buf; char *msg, *buf;
JSON_CPACK(json_msg, "{ss,ss,ss,sI,si,ss,si}", JSON_CPACK(json_msg, "{ss,ss,ss,sI,si,ss,si,sb,si}",
"url", proxi->url, "auth", proxi->auth, "pass", proxi->pass, "url", proxi->url, "auth", proxi->auth, "pass", proxi->pass,
"proxy", proxi->id, "subproxy", proxi->subid, "proxy", proxi->id, "subproxy", proxi->subid,
"enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len); "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len,
"global", proxi->global, "userid", proxi->userid);
msg = json_dumps(json_msg, JSON_NO_UTF8); msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg); json_decref(json_msg);
ASPRINTF(&buf, "subscribe=%s", msg); ASPRINTF(&buf, "subscribe=%s", msg);
@ -2002,13 +2008,22 @@ static void *userproxy_recv(void *arg)
} }
while (42) { while (42) {
proxy_instance_t *proxy, *parent; proxy_instance_t *proxy, *tmpproxy, *parent;
share_msg_t *share, *tmpshare; share_msg_t *share, *tmpshare;
notify_instance_t *ni, *tmp; notify_instance_t *ni, *tmp;
connsock_t *cs; connsock_t *cs;
time_t now; time_t now;
int ret; int ret;
mutex_lock(&gdata->lock);
HASH_ITER(hh, gdata->user_proxies, proxy, tmpproxy) {
if (!proxy->alive) {
proxy->epfd = epfd;
reconnect_proxy(proxy);
}
}
mutex_unlock(&gdata->lock);
ret = epoll_wait(epfd, &event, 1, 1000); ret = epoll_wait(epfd, &event, 1, 1000);
if (ret < 1) { if (ret < 1) {
if (likely(!ret)) if (likely(!ret))
@ -2066,6 +2081,7 @@ static void prepare_proxy(proxy_instance_t *proxi)
proxi->parent = proxi; proxi->parent = proxi;
mutex_init(&proxi->proxy_lock); mutex_init(&proxi->proxy_lock);
add_subproxy(proxi, proxi); add_subproxy(proxi, proxi);
if (proxi->global)
create_pthread(&proxi->pth_precv, proxy_recv, proxi); create_pthread(&proxi->pth_precv, proxy_recv, proxi);
} }
@ -2176,6 +2192,24 @@ out:
static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int num); static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int num);
static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const int id,
const int userid, char *url, char *auth, char *pass)
{
proxy_instance_t *proxy;
proxy = ckzalloc(sizeof(proxy_instance_t));
proxy->id = id;
proxy->userid = userid;
proxy->url = url;
proxy->auth = auth;
proxy->pass = pass;
proxy->ckp = ckp;
mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock);
HASH_ADD_INT(gdata->proxies, id, proxy);
return proxy;
}
static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf) static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf)
{ {
char *url = NULL, *auth = NULL, *pass = NULL; char *url = NULL, *auth = NULL, *pass = NULL;
@ -2193,25 +2227,28 @@ static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const
json_get_string(&url, val, "url"); json_get_string(&url, val, "url");
json_get_string(&auth, val, "auth"); json_get_string(&auth, val, "auth");
json_get_string(&pass, val, "pass"); json_get_string(&pass, val, "pass");
if (json_get_int(&userid, val, "userid"))
global = false;
else
global = true;
json_decref(val); json_decref(val);
if (unlikely(!url || !auth || !pass)) { if (unlikely(!url || !auth || !pass)) {
val = json_errormsg("Failed to decode url/auth/pass in addproxy %s", buf); val = json_errormsg("Failed to decode url/auth/pass in addproxy %s", buf);
goto out; goto out;
} }
if (json_get_int(&userid, val, "userid"))
global = false;
else
global = true;
mutex_lock(&gdata->lock); mutex_lock(&gdata->lock);
id = ckp->proxies++; id = ckp->proxies++;
if (global) {
ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies); ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies);
ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies); ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies);
ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies); ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies);
ckp->proxyurl[id] = strdup(url); ckp->proxyurl[id] = url;
ckp->proxyauth[id] = strdup(auth); ckp->proxyauth[id] = auth;
ckp->proxypass[id] = strdup(pass); ckp->proxypass[id] = pass;
proxy = __add_proxy(ckp, gdata, id); proxy = __add_proxy(ckp, gdata, id);
} else
proxy = __add_userproxy(ckp, gdata, id, userid, url, auth, pass);
mutex_unlock(&gdata->lock); mutex_unlock(&gdata->lock);
if (global) if (global)
@ -2219,8 +2256,14 @@ static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const
else else
LOGNOTICE("Adding user %d proxy %d:%s", userid, id, proxy->url); LOGNOTICE("Adding user %d proxy %d:%s", userid, id, proxy->url);
prepare_proxy(proxy); prepare_proxy(proxy);
if (global) {
JSON_CPACK(val, "{si,ss,ss,ss}", JSON_CPACK(val, "{si,ss,ss,ss}",
"id", proxy->id, "url", url, "auth", auth, "pass", pass); "id", proxy->id, "url", url, "auth", auth, "pass", pass);
} else {
JSON_CPACK(val, "{si,ss,ss,ss,si}",
"id", proxy->id, "url", url, "auth", auth, "pass", pass,
"userid", proxy->userid);
}
out: out:
send_api_response(val, sockd); send_api_response(val, sockd);
} }
@ -2447,10 +2490,10 @@ static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id
proxy->auth = strdup(ckp->proxyauth[id]); proxy->auth = strdup(ckp->proxyauth[id]);
proxy->pass = strdup(ckp->proxypass[id]); proxy->pass = strdup(ckp->proxypass[id]);
proxy->ckp = ckp; proxy->ckp = ckp;
proxy->cs.ckp = ckp;
mutex_init(&proxy->notify_lock); mutex_init(&proxy->notify_lock);
mutex_init(&proxy->share_lock); mutex_init(&proxy->share_lock);
HASH_ADD_INT(gdata->proxies, id, proxy); HASH_ADD_INT(gdata->proxies, id, proxy);
proxy->global = true;
return proxy; return proxy;
} }

Loading…
Cancel
Save