/* * Copyright 2014-2015 Con Kolivas * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free * Software Foundation; either version 3 of the License, or (at your option) * any later version. See COPYING for more details. */ #include "config.h" #include #include #include #include #include #include "ckpool.h" #include "libckpool.h" #include "generator.h" #include "bitcoin.h" #include "uthash.h" #include "utlist.h" #include "api.h" struct notify_instance { /* Hash table data */ UT_hash_handle hh; int id; char prevhash[68]; json_t *jobid; char *coinbase1; char *coinbase2; int coinb1len; int merkles; char merklehash[16][68]; char nbit[12]; char ntime[12]; char bbversion[12]; bool clean; time_t notify_time; }; typedef struct notify_instance notify_instance_t; typedef struct proxy_instance proxy_instance_t; struct share_msg { UT_hash_handle hh; int id; // Our own id for submitting upstream int client_id; time_t submit_time; double diff; }; typedef struct share_msg share_msg_t; struct stratum_msg { struct stratum_msg *next; struct stratum_msg *prev; json_t *json_msg; int client_id; }; typedef struct stratum_msg stratum_msg_t; struct pass_msg { connsock_t *cs; char *msg; }; typedef struct pass_msg pass_msg_t; /* Per proxied pool instance data */ struct proxy_instance { UT_hash_handle hh; /* Proxy list */ UT_hash_handle sh; /* Subproxy list */ proxy_instance_t *next; /* For dead proxy list */ proxy_instance_t *prev; /* For dead proxy list */ ckpool_t *ckp; connsock_t cs; bool passthrough; int id; /* Proxy server id*/ int subid; /* Subproxy id */ int userid; /* User id if this proxy is bound to a user */ char *url; char *auth; char *pass; char *enonce1; char *enonce1bin; int nonce1len; int nonce2len; tv_t last_message; double diff; double diff_accepted; double diff_rejected; double total_accepted; /* Used only by parent proxy structures */ double total_rejected; /* "" */ tv_t last_share; bool no_params; /* Doesn't want any parameters on subscribe */ bool global; /* Part of the global list of proxies */ bool notified; /* Has this proxy received any notifies yet */ bool disabled; /* Subproxy no longer to be used */ bool reconnect; /* We need to drop and reconnect */ bool reconnecting; /* Testing of parent in progress */ int64_t recruit; /* No of recruiting requests in progress */ bool alive; pthread_t pth_precv; ckmsgq_t *passsends; // passthrough sends char_entry_t *recvd_lines; /* Linked list of unprocessed messages */ int epfd; /* Epoll fd used by the parent proxy */ mutex_t proxy_lock; /* Lock protecting hashlist of proxies */ proxy_instance_t *parent; /* Parent proxy of subproxies */ proxy_instance_t *subproxies; /* Hashlist of subproxies of this proxy */ int64_t clients_per_proxy; /* Max number of clients of this proxy */ int subproxy_count; /* Number of subproxies */ }; /* Private data for the generator */ struct generator_data { ckpool_t *ckp; mutex_t lock; /* Lock protecting linked lists */ proxy_instance_t *proxies; /* Hash list of all proxies */ proxy_instance_t *dead_proxies; /* Disabled proxies */ int proxies_generated; int subproxies_generated; int proxy_notify_id; // Globally increasing notify id ckmsgq_t *srvchk; // Server check message queue pthread_t pth_uprecv; // User proxy receive thread pthread_t pth_psend; // Combined proxy send thread mutex_t psend_lock; // Lock associated with conditional below pthread_cond_t psend_cond; stratum_msg_t *psends; int psends_generated; mutex_t notify_lock; notify_instance_t *notify_instances; mutex_t share_lock; share_msg_t *shares; int64_t share_id; }; typedef struct generator_data gdata_t; static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) { char *userpass = NULL; bool ret = false; connsock_t *cs; gbtbase_t *gbt; cs = &si->cs; /* Has this server already been reconnected? */ if (cs->fd > 0) return true; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); return ret; } userpass = strdup(si->auth); realloc_strcat(&userpass, ":"); realloc_strcat(&userpass, si->pass); cs->auth = http_base64(userpass); dealloc(userpass); if (!cs->auth) { LOGWARNING("Failed to create base64 auth from %s", userpass); return ret; } cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { if (!pinging) LOGWARNING("Failed to connect socket to %s:%s !", cs->url, cs->port); return ret; } /* Test we can connect, authorise and get a block template */ gbt = ckzalloc(sizeof(gbtbase_t)); si->data = gbt; if (!gen_gbtbase(cs, gbt)) { if (!pinging) { LOGINFO("Failed to get test block template from %s:%s!", cs->url, cs->port); } goto out; } clear_gbtbase(gbt); if (!validate_address(cs, ckp->btcaddress)) { LOGWARNING("Invalid btcaddress: %s !", ckp->btcaddress); goto out; } ret = true; out: if (!ret) { /* Close and invalidate the file handle */ Close(cs->fd); } else keep_sockalive(cs->fd); return ret; } /* Find the highest priority server alive and return it */ static server_instance_t *live_server(ckpool_t *ckp) { server_instance_t *alive = NULL; connsock_t *cs; int i; LOGDEBUG("Attempting to connect to bitcoind"); retry: if (!ping_main(ckp)) goto out; for (i = 0; i < ckp->btcds; i++) { server_instance_t *si = ckp->servers[i]; if (server_alive(ckp, si, false)) { alive = si; break; } } if (!alive) { LOGWARNING("CRITICAL: No bitcoinds active!"); sleep(5); goto retry; } cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: send_proc(ckp->connector, alive ? "accept" : "reject"); return alive; } static void kill_server(server_instance_t *si) { connsock_t *cs; if (!si) // This shouldn't happen return; LOGNOTICE("Killing server"); cs = &si->cs; Close(cs->fd); empty_buffer(cs); dealloc(cs->url); dealloc(cs->port); dealloc(cs->auth); dealloc(si->data); } static int gen_loop(proc_instance_t *pi) { int sockd = -1, ret = 0, selret; server_instance_t *si = NULL; bool reconnecting = false; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; gdata_t *gdata = ckp->data; bool started = false; char *buf = NULL; connsock_t *cs; gbtbase_t *gbt; char hash[68]; reconnect: Close(sockd); if (si) { kill_server(si); reconnecting = true; } si = live_server(ckp); if (!si) goto out; gbt = si->data; cs = &si->cs; if (reconnecting) { LOGWARNING("Failed over to bitcoind: %s:%s", cs->url, cs->port); reconnecting = false; } retry: Close(sockd); ckmsgq_add(gdata->srvchk, si); do { selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; goto out; } } while (selret < 1); if (unlikely(cs->fd < 0)) { LOGWARNING("Bitcoind socket invalidated, will attempt failover"); goto reconnect; } sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on generator socket"); ret = 1; goto out; } dealloc(buf); buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in gen_loop"); goto retry; } LOGDEBUG("Generator received request: %s", buf); if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; } if (cmdmatch(buf, "getbase")) { if (!gen_gbtbase(cs, gbt)) { LOGWARNING("Failed to get block template from %s:%s", cs->url, cs->port); send_unix_msg(sockd, "Failed"); goto reconnect; } else { char *s = json_dumps(gbt->json, JSON_NO_UTF8); send_unix_msg(sockd, s); free(s); clear_gbtbase(gbt); } } else if (cmdmatch(buf, "getbest")) { if (si->notify) send_unix_msg(sockd, "notify"); else if (!get_bestblockhash(cs, hash)) { LOGINFO("No best block hash support from %s:%s", cs->url, cs->port); send_unix_msg(sockd, "failed"); } else { if (unlikely(!started)) { started = true; LOGWARNING("%s generator ready", ckp->name); } send_unix_msg(sockd, hash); } } else if (cmdmatch(buf, "getlast")) { int height; if (si->notify) send_unix_msg(sockd, "notify"); else if ((height = get_blockcount(cs)) == -1) { send_unix_msg(sockd, "failed"); goto reconnect; } else { LOGDEBUG("Height: %d", height); if (!get_blockhash(cs, height, hash)) { send_unix_msg(sockd, "failed"); goto reconnect; } else { if (unlikely(!started)) { started = true; LOGWARNING("%s generator ready", ckp->name); } send_unix_msg(sockd, hash); LOGDEBUG("Hash: %s", hash); } } } else if (cmdmatch(buf, "submitblock:")) { char blockmsg[80]; bool ret; LOGNOTICE("Submitting block data!"); ret = submit_block(cs, buf + 12 + 64 + 1); memset(buf + 12 + 64, 0, 1); sprintf(blockmsg, "%sblock:%s", ret ? "" : "no", buf + 12); send_proc(ckp->stratifier, blockmsg); } else if (cmdmatch(buf, "checkaddr:")) { if (validate_address(cs, buf + 10)) send_unix_msg(sockd, "true"); else send_unix_msg(sockd, "false"); } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Generator received ping request"); send_unix_msg(sockd, "pong"); } goto retry; out: kill_server(si); dealloc(buf); return ret; } static bool send_json_msg(connsock_t *cs, const json_t *json_msg) { int len, sent; char *s; s = json_dumps(json_msg, JSON_ESCAPE_SLASH | JSON_EOL); LOGDEBUG("Sending json msg: %s", s); len = strlen(s); sent = write_socket(cs->fd, s, len); dealloc(s); if (sent != len) { LOGNOTICE("Failed to send %d bytes sent %d in send_json_msg", len, sent); return false; } return true; } static bool connect_proxy(connsock_t *cs) { cs->fd = connect_socket(cs->url, cs->port); if (cs->fd < 0) { LOGINFO("Failed to connect socket to %s:%s in connect_proxy", cs->url, cs->port); return false; } keep_sockalive(cs->fd); return true; } /* Decode a string that should have a json message and return just the contents * of the result key or NULL. */ static json_t *json_result(json_t *val) { json_t *res_val = NULL, *err_val; res_val = json_object_get(val, "result"); /* (null) is a valid result while no value is an error, so mask out * (null) and only handle lack of result */ if (json_is_null(res_val)) res_val = NULL; else if (!res_val) { char *ss; err_val = json_object_get(val, "error"); if (err_val) ss = json_dumps(err_val, 0); else ss = strdup("(unknown reason)"); LOGNOTICE("JSON-RPC decode of json_result failed: %s", ss); free(ss); } return res_val; } /* Return the error value if one exists */ static json_t *json_errval(json_t *val) { json_t *err_val = json_object_get(val, "error"); return err_val; } /* Parse a string and return the json value it contains, if any, and the * result in res_val. Return NULL if no result key is found. */ static json_t *json_msg_result(char *msg, json_t **res_val, json_t **err_val) { json_error_t err; json_t *val; *res_val = NULL; val = json_loads(msg, 0, &err); if (!val) { LOGWARNING("Json decode failed(%d): %s", err.line, err.text); goto out; } *res_val = json_result(val); *err_val = json_errval(val); out: return val; } /* For some reason notify is buried at various different array depths so use * a reentrant function to try and find it. */ static json_t *find_notify(json_t *val) { int arr_size, i; json_t *ret = NULL; const char *entry; if (!json_is_array(val)) return NULL; arr_size = json_array_size(val); entry = json_string_value(json_array_get(val, 0)); if (cmdmatch(entry, "mining.notify")) return val; for (i = 0; i < arr_size; i++) { json_t *arr_val; arr_val = json_array_get(val, i); ret = find_notify(arr_val); if (ret) break; } return ret; } /* Get stored line in the proxy linked list of messages if any exist or NULL */ static char *cached_proxy_line(proxy_instance_t *proxi) { char *buf = NULL; if (proxi->recvd_lines) { char_entry_t *char_t = proxi->recvd_lines; DL_DELETE(proxi->recvd_lines, char_t); buf = char_t->buf; free(char_t); } return buf; } /* Get next line in the proxy linked list of messages or a new line from the * connsock if there are none. */ static char *next_proxy_line(connsock_t *cs, proxy_instance_t *proxi) { char *buf = cached_proxy_line(proxi); if (!buf && read_socket_line(cs, 5) > 0) buf = strdup(cs->buf); return buf; } /* For appending a line to the proxy recv list */ static void append_proxy_line(proxy_instance_t *proxi, const char *buf) { char_entry_t *char_t = ckalloc(sizeof(char_entry_t)); char_t->buf = strdup(buf); DL_APPEND(proxi->recvd_lines, char_t); } /* Get a new line from the connsock and return a copy of it */ static char *new_proxy_line(connsock_t *cs) { char *buf = NULL; if (read_socket_line(cs, 5) < 1) goto out; buf = strdup(cs->buf); out: return buf; } static inline bool parent_proxy(proxy_instance_t *proxy) { return (proxy->parent == proxy); } static void recruit_subproxies(proxy_instance_t *proxi, const int recruits); static bool parse_subscribe(connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *notify_val, *tmp; bool parsed, ret = false; proxy_instance_t *parent; int retries = 0, size; const char *string; char *buf, *old; retry: parsed = true; if (!(buf = new_proxy_line(cs))) { LOGNOTICE("Proxy %d:%d %s failed to receive line in parse_subscribe", proxi->id, proxi->subid, proxi->url); goto out; } LOGDEBUG("parse_subscribe received %s", buf); /* Ignore err_val here stored in &tmp */ val = json_msg_result(buf, &res_val, &tmp); if (!val || !res_val) { LOGINFO("Failed to get a json result in parse_subscribe, got: %s", buf); parsed = false; } if (!json_is_array(res_val)) { LOGINFO("Result in parse_subscribe not an array"); parsed = false; } size = json_array_size(res_val); if (size < 3) { LOGINFO("Result in parse_subscribe array too small"); parsed = false; } notify_val = find_notify(res_val); if (!notify_val) { LOGINFO("Failed to find notify in parse_subscribe"); parsed = false; } if (!parsed) { if (++retries < 3) { /* We don't want this response so put it on the proxy * recvd list to be parsed later */ append_proxy_line(proxi, buf); buf = NULL; goto retry; } LOGNOTICE("Proxy %d:%d %s failed to parse subscribe response in parse_subscribe", proxi->id, proxi->subid, proxi->url); goto out; } tmp = json_array_get(res_val, 1); if (!tmp || !json_is_string(tmp)) { LOGWARNING("Failed to parse enonce1 in parse_subscribe"); goto out; } string = json_string_value(tmp); old = proxi->enonce1; proxi->enonce1 = strdup(string); free(old); proxi->nonce1len = strlen(proxi->enonce1) / 2; if (proxi->nonce1len > 15) { LOGWARNING("Nonce1 too long at %d", proxi->nonce1len); goto out; } old = proxi->enonce1bin; proxi->enonce1bin = ckalloc(proxi->nonce1len); free(old); hex2bin(proxi->enonce1bin, proxi->enonce1, proxi->nonce1len); tmp = json_array_get(res_val, 2); if (!tmp || !json_is_integer(tmp)) { LOGWARNING("Failed to parse nonce2len in parse_subscribe"); goto out; } size = json_integer_value(tmp); if (size < 1 || size > 8) { LOGWARNING("Invalid nonce2len %d in parse_subscribe", size); goto out; } if (size < 3) { if (!proxi->subid) { LOGWARNING("Proxy %d %s Nonce2 length %d too small for fast miners", proxi->id, proxi->url, size); } else { LOGNOTICE("Proxy %d:%d Nonce2 length %d too small for fast miners", proxi->id, proxi->subid, size); } } proxi->nonce2len = size; proxi->clients_per_proxy = 1ll << ((size - 3) * 8); parent = proxi->parent; mutex_lock(&parent->proxy_lock); parent->recruit -= proxi->clients_per_proxy; if (parent->recruit < 0) parent->recruit = 0; mutex_unlock(&parent->proxy_lock); LOGNOTICE("Found notify for new proxy %d:%d with enonce %s nonce2len %d", proxi->id, proxi->subid, proxi->enonce1, proxi->nonce2len); ret = true; out: if (val) json_decref(val); free(buf); return ret; } static bool subscribe_stratum(connsock_t *cs, proxy_instance_t *proxi) { bool ret = false; json_t *req; retry: /* Attempt to connect with the client description */ if (!proxi->no_params) { JSON_CPACK(req, "{s:i,s:s,s:[s]}", "id", 0, "method", "mining.subscribe", "params", PACKAGE"/"VERSION); /* Then try without any parameters */ } else { JSON_CPACK(req, "{s:i,s:s,s:[]}", "id", 0, "method", "mining.subscribe", "params"); } ret = send_json_msg(cs, req); json_decref(req); if (!ret) { LOGNOTICE("Proxy %d:%d %s failed to send message in subscribe_stratum", proxi->id, proxi->subid, proxi->url); goto out; } ret = parse_subscribe(cs, proxi); if (ret) goto out; if (proxi->no_params) { LOGNOTICE("Proxy %d:%d %s failed all subscription options in subscribe_stratum", proxi->id, proxi->subid, proxi->url); goto out; } LOGINFO("Proxy %d:%d %s failed connecting with parameters in subscribe_stratum, retrying without", proxi->id, proxi->subid, proxi->url); proxi->no_params = true; ret = connect_proxy(cs); if (!ret) { LOGNOTICE("Proxy %d:%d %s failed to reconnect in subscribe_stratum", proxi->id, proxi->subid, proxi->url); goto out; } goto retry; out: if (!ret && cs->fd > 0) { epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); Close(cs->fd); } return ret; } static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi) { json_t *req, *val = NULL, *res_val, *err_val; bool ret = false; JSON_CPACK(req, "{s:s,s:[s]}", "method", "mining.passthrough", "params", PACKAGE"/"VERSION); ret = send_json_msg(cs, req); json_decref(req); if (!ret) { LOGWARNING("Failed to send message in passthrough_stratum"); goto out; } if (read_socket_line(cs, 5) < 1) { LOGWARNING("Failed to receive line in passthrough_stratum"); goto out; } /* Ignore err_val here since we should always get a result from an * upstream passthrough server */ val = json_msg_result(cs->buf, &res_val, &err_val); if (!val || !res_val) { LOGWARNING("Failed to get a json result in passthrough_stratum, got: %s", cs->buf); goto out; } ret = json_is_true(res_val); if (!ret) { LOGWARNING("Denied passthrough for stratum"); goto out; } proxi->passthrough = true; out: if (val) json_decref(val); if (!ret) Close(cs->fd); return ret; } static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni); static void reconnect_generator(const ckpool_t *ckp) { send_proc(ckp->generator, "reconnect"); } static bool parse_notify(ckpool_t *ckp, proxy_instance_t *proxi, json_t *val) { const char *prev_hash, *bbversion, *nbit, *ntime; gdata_t *gdata = proxi->ckp->data; char *coinbase1, *coinbase2; const char *jobidbuf; bool clean, ret = false; notify_instance_t *ni; json_t *arr, *job_id; int merkles, i; arr = json_array_get(val, 4); if (!arr || !json_is_array(arr)) goto out; merkles = json_array_size(arr); job_id = json_copy(json_array_get(val, 0)); prev_hash = __json_array_string(val, 1); coinbase1 = json_array_string(val, 2); coinbase2 = json_array_string(val, 3); bbversion = __json_array_string(val, 5); nbit = __json_array_string(val, 6); ntime = __json_array_string(val, 7); clean = json_is_true(json_array_get(val, 8)); if (!job_id || !prev_hash || !coinbase1 || !coinbase2 || !bbversion || !nbit || !ntime) { if (job_id) json_decref(job_id); if (coinbase1) free(coinbase1); if (coinbase2) free(coinbase2); goto out; } LOGDEBUG("Received new notify from proxy %d:%d", proxi->id, proxi->subid); ni = ckzalloc(sizeof(notify_instance_t)); ni->jobid = job_id; jobidbuf = json_string_value(job_id); LOGDEBUG("JobID %s", jobidbuf); ni->coinbase1 = coinbase1; LOGDEBUG("Coinbase1 %s", coinbase1); ni->coinb1len = strlen(coinbase1) / 2; ni->coinbase2 = coinbase2; LOGDEBUG("Coinbase2 %s", coinbase2); memcpy(ni->prevhash, prev_hash, 65); LOGDEBUG("Prevhash %s", prev_hash); memcpy(ni->bbversion, bbversion, 9); LOGDEBUG("BBVersion %s", bbversion); memcpy(ni->nbit, nbit, 9); LOGDEBUG("Nbit %s", nbit); memcpy(ni->ntime, ntime, 9); LOGDEBUG("Ntime %s", ntime); ni->clean = clean; LOGDEBUG("Clean %s", clean ? "true" : "false"); LOGDEBUG("Merkles %d", merkles); for (i = 0; i < merkles; i++) { const char *merkle = __json_array_string(arr, i); LOGDEBUG("Merkle %d %s", i, merkle); memcpy(&ni->merklehash[i][0], merkle, 65); } ni->merkles = merkles; ret = true; ni->notify_time = time(NULL); /* Add the notify instance to the parent proxy list, not the subproxy */ mutex_lock(&gdata->notify_lock); ni->id = gdata->proxy_notify_id++; HASH_ADD_INT(gdata->notify_instances, id, ni); mutex_unlock(&gdata->notify_lock); send_notify(ckp, proxi, ni); out: return ret; } static bool parse_diff(proxy_instance_t *proxi, json_t *val) { double diff = json_number_value(json_array_get(val, 0)); if (diff == 0 || diff == proxi->diff) return true; proxi->diff = diff; return true; } static bool send_version(proxy_instance_t *proxi, json_t *val) { json_t *json_msg, *id_val = json_object_dup(val, "id"); bool ret; JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", PACKAGE"/"VERSION, "error", json_null()); ret = send_json_msg(&proxi->cs, json_msg); json_decref(json_msg); return ret; } static bool show_message(json_t *val) { const char *msg; if (!json_is_array(val)) return false; msg = json_string_value(json_array_get(val, 0)); if (!msg) return false; LOGNOTICE("Pool message: %s", msg); return true; } static bool send_pong(proxy_instance_t *proxi, json_t *val) { json_t *json_msg, *id_val = json_object_dup(val, "id"); bool ret; JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", "pong", "error", json_null()); ret = send_json_msg(&proxi->cs, json_msg); json_decref(json_msg); return ret; } static void prepare_proxy(proxy_instance_t *proxi); /* Creates a duplicate instance or proxi to be used as a subproxy, ignoring * fields we don't use in the subproxy. */ static proxy_instance_t *create_subproxy(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxi, const char *url) { proxy_instance_t *subproxy; mutex_lock(&gdata->lock); if (gdata->dead_proxies) { /* Recycle an old proxy instance if one exists */ subproxy = gdata->dead_proxies; DL_DELETE(gdata->dead_proxies, subproxy); subproxy->disabled = false; } else { gdata->subproxies_generated++; subproxy = ckzalloc(sizeof(proxy_instance_t)); } mutex_unlock(&gdata->lock); subproxy->cs.ckp = subproxy->ckp = ckp; mutex_lock(&proxi->proxy_lock); subproxy->subid = ++proxi->subproxy_count; mutex_unlock(&proxi->proxy_lock); subproxy->id = proxi->id; subproxy->userid = proxi->userid; subproxy->global = proxi->global; subproxy->url = strdup(url); subproxy->auth = strdup(proxi->auth); subproxy->pass = strdup(proxi->pass); subproxy->parent = proxi; subproxy->epfd = proxi->epfd; return subproxy; } static void add_subproxy(proxy_instance_t *proxi, proxy_instance_t *subproxy) { mutex_lock(&proxi->proxy_lock); HASH_ADD(sh, proxi->subproxies, subid, sizeof(int), subproxy); mutex_unlock(&proxi->proxy_lock); } static proxy_instance_t *__subproxy_by_id(proxy_instance_t *proxy, const int subid) { proxy_instance_t *subproxy; HASH_FIND(sh, proxy->subproxies, &subid, sizeof(int), subproxy); return subproxy; } /* Add to the dead list to be recycled if possible */ static void store_proxy(gdata_t *gdata, proxy_instance_t *proxy) { LOGINFO("Recycling data from proxy %d:%d", proxy->id, proxy->subid); mutex_lock(&gdata->lock); dealloc(proxy->enonce1); dealloc(proxy->url); dealloc(proxy->auth); dealloc(proxy->pass); DL_APPEND(gdata->dead_proxies, proxy); mutex_unlock(&gdata->lock); } static void send_stratifier_deadproxy(ckpool_t *ckp, const int id, const int subid) { char buf[256]; sprintf(buf, "deadproxy=%d:%d", id, subid); send_proc(ckp->stratifier, buf); } /* Remove the subproxy from the proxi list and put it on the dead list. * Further use of the subproxy pointer may point to a new proxy but will not * dereference */ static void disable_subproxy(gdata_t *gdata, proxy_instance_t *proxi, proxy_instance_t *subproxy) { subproxy->alive = false; send_stratifier_deadproxy(gdata->ckp, subproxy->id, subproxy->subid); if (subproxy->cs.fd > 0) { epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, subproxy->cs.fd, NULL); Close(subproxy->cs.fd); } if (parent_proxy(subproxy)) return; mutex_lock(&proxi->proxy_lock); subproxy->disabled = true; /* Make sure subproxy is still in the list */ subproxy = __subproxy_by_id(proxi, subproxy->subid); if (likely(subproxy)) HASH_DELETE(sh, proxi->subproxies, subproxy); mutex_unlock(&proxi->proxy_lock); if (subproxy) { send_stratifier_deadproxy(gdata->ckp, subproxy->id, subproxy->subid); store_proxy(gdata, subproxy); } } static bool parse_reconnect(proxy_instance_t *proxy, json_t *val) { bool sameurl = false, ret = false; ckpool_t *ckp = proxy->ckp; gdata_t *gdata = ckp->data; proxy_instance_t *parent; const char *new_url; int new_port; char *url; new_url = json_string_value(json_array_get(val, 0)); new_port = json_integer_value(json_array_get(val, 1)); /* See if we have an invalid entry listing port as a string instead of * integer and handle that. */ if (!new_port) { const char *newport_string = json_string_value(json_array_get(val, 1)); if (newport_string) sscanf(newport_string, "%d", &new_port); } if (new_url && strlen(new_url) && new_port) { char *dot_pool, *dot_reconnect; int len; dot_pool = strchr(proxy->url, '.'); if (!dot_pool) { LOGWARNING("Denied stratum reconnect request from server without domain %s", proxy->url); goto out; } dot_reconnect = strchr(new_url, '.'); if (!dot_reconnect) { LOGWARNING("Denied stratum reconnect request to url without domain %s", new_url); goto out; } len = strlen(dot_reconnect); if (strncmp(dot_pool, dot_reconnect, len)) { LOGWARNING("Denied stratum reconnect request from %s to non-matching domain %s", proxy->url, new_url); goto out; } ASPRINTF(&url, "%s:%d", new_url, new_port); } else { url = strdup(proxy->url); sameurl = true; } LOGINFO("Processing reconnect request to %s", url); ret = true; parent = proxy->parent; disable_subproxy(gdata, parent, proxy); if (parent != proxy) { /* If this is a subproxy we only need to create a new one if * the url has changed. Otherwise automated recruiting will * take care of creating one if needed. */ if (!sameurl) create_subproxy(ckp, gdata, parent, url); goto out; } proxy->reconnect = true; LOGWARNING("Proxy %d:%s reconnect issue to %s, dropping existing connection", proxy->id, proxy->url, url); if (!sameurl) { char *oldurl = proxy->url; proxy->url = url; free(oldurl); } out: return ret; } static void send_diff(ckpool_t *ckp, proxy_instance_t *proxi) { proxy_instance_t *proxy = proxi->parent; json_t *json_msg; char *msg, *buf; /* Not set yet */ if (!proxi->diff) return; JSON_CPACK(json_msg, "{sIsisf}", "proxy", proxy->id, "subproxy", proxi->subid, "diff", proxi->diff); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); ASPRINTF(&buf, "diff=%s", msg); free(msg); send_proc(ckp->stratifier, buf); free(buf); } static void send_notify(ckpool_t *ckp, proxy_instance_t *proxi, notify_instance_t *ni) { proxy_instance_t *proxy = proxi->parent; json_t *json_msg, *merkle_arr; char *msg, *buf; int i; merkle_arr = json_array(); for (i = 0; i < ni->merkles; i++) json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0])); /* Use our own jobid instead of the server's one for easy lookup */ JSON_CPACK(json_msg, "{sIsisisssisssssosssssssb}", "proxy", proxy->id, "subproxy", proxi->subid, "jobid", ni->id, "prevhash", ni->prevhash, "coinb1len", ni->coinb1len, "coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2, "merklehash", merkle_arr, "bbversion", ni->bbversion, "nbit", ni->nbit, "ntime", ni->ntime, "clean", ni->clean); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); ASPRINTF(&buf, "notify=%s", msg); free(msg); send_proc(ckp->stratifier, buf); free(buf); /* Send diff now as stratifier will not accept diff till it has a * valid workbase */ send_diff(ckp, proxi); } static bool parse_method(ckpool_t *ckp, proxy_instance_t *proxi, const char *msg) { json_t *val = NULL, *method, *err_val, *params; json_error_t err; bool ret = false; const char *buf; memset(&err, 0, sizeof(err)); val = json_loads(msg, 0, &err); if (!val) { LOGWARNING("JSON decode failed(%d): %s", err.line, err.text); goto out; } method = json_object_get(val, "method"); if (!method) { LOGDEBUG("Failed to find method in json for parse_method"); goto out; } err_val = json_object_get(val, "error"); params = json_object_get(val, "params"); if (err_val && !json_is_null(err_val)) { char *ss; if (err_val) ss = json_dumps(err_val, 0); else ss = strdup("(unknown reason)"); LOGINFO("JSON-RPC method decode failed: %s", ss); free(ss); goto out; } if (!json_is_string(method)) { LOGINFO("Method is not string in parse_method"); goto out; } buf = json_string_value(method); if (!buf || strlen(buf) < 1) { LOGINFO("Invalid string for method in parse_method"); goto out; } LOGDEBUG("Proxy %d:%d received method %s", proxi->id, proxi->subid, buf); if (cmdmatch(buf, "mining.notify")) { ret = parse_notify(ckp, proxi, params); goto out; } if (cmdmatch(buf, "mining.set_difficulty")) { ret = parse_diff(proxi, params); if (likely(ret)) send_diff(ckp, proxi); goto out; } if (cmdmatch(buf, "client.reconnect")) { ret = parse_reconnect(proxi, params); goto out; } if (cmdmatch(buf, "client.get_version")) { ret = send_version(proxi, val); goto out; } if (cmdmatch(buf, "client.show_message")) { ret = show_message(params); goto out; } if (cmdmatch(buf, "mining.ping")) { ret = send_pong(proxi, val); goto out; } out: if (val) json_decref(val); return ret; } static bool auth_stratum(ckpool_t *ckp, connsock_t *cs, proxy_instance_t *proxi) { json_t *val = NULL, *res_val, *req, *err_val; char *buf = NULL; bool ret; JSON_CPACK(req, "{s:i,s:s,s:[s,s]}", "id", 42, "method", "mining.authorize", "params", proxi->auth, proxi->pass); ret = send_json_msg(cs, req); json_decref(req); if (!ret) { LOGNOTICE("Proxy %d:%d %s failed to send message in auth_stratum", proxi->id, proxi->subid, proxi->url); if (cs->fd > 0) { epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); Close(cs->fd); } goto out; } /* Read and parse any extra methods sent. Anything left in the buffer * should be the response to our auth request. */ do { free(buf); buf = next_proxy_line(cs, proxi); if (!buf) { LOGNOTICE("Proxy %d:%d %s failed to receive line in auth_stratum", proxi->id, proxi->subid, proxi->url); ret = false; goto out; } ret = parse_method(ckp, proxi, buf); } while (ret); val = json_msg_result(buf, &res_val, &err_val); if (!val) { LOGWARNING("Proxy %d:%d %s failed to get a json result in auth_stratum, got: %s", proxi->id, proxi->subid, proxi->url, buf); goto out; } if (err_val && !json_is_null(err_val)) { LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum due to err_val, got: %s", proxi->id, proxi->subid, proxi->url, buf); goto out; } if (res_val) { ret = json_is_true(res_val); if (!ret) { LOGWARNING("Proxy %d:%d %s failed to authorise in auth_stratum, got: %s", proxi->id, proxi->subid, proxi->url, buf); goto out; } } else { /* No result and no error but successful val means auth success */ ret = true; } LOGINFO("Proxy %d:%d %s auth success in auth_stratum", proxi->id, proxi->subid, proxi->url); out: if (val) json_decref(val); if (ret) { /* Now parse any cached responses so there are none in the * queue and they can be managed one at a time from now on. */ while(42) { dealloc(buf); buf = cached_proxy_line(proxi); if (!buf) break; parse_method(ckp, proxi, buf); }; } return ret; } static proxy_instance_t *proxy_by_id(gdata_t *gdata, const int id) { proxy_instance_t *proxi; mutex_lock(&gdata->lock); HASH_FIND_INT(gdata->proxies, &id, proxi); mutex_unlock(&gdata->lock); return proxi; } static void send_subscribe(ckpool_t *ckp, proxy_instance_t *proxi) { json_t *json_msg; char *msg, *buf; JSON_CPACK(json_msg, "{ss,ss,ss,sI,si,ss,si,sb,si}", "url", proxi->url, "auth", proxi->auth, "pass", proxi->pass, "proxy", proxi->id, "subproxy", proxi->subid, "enonce1", proxi->enonce1, "nonce2len", proxi->nonce2len, "global", proxi->global, "userid", proxi->userid); msg = json_dumps(json_msg, JSON_NO_UTF8); json_decref(json_msg); ASPRINTF(&buf, "subscribe=%s", msg); free(msg); send_proc(ckp->stratifier, buf); free(buf); } static proxy_instance_t *subproxy_by_id(proxy_instance_t *proxy, const int subid) { proxy_instance_t *subproxy; mutex_lock(&proxy->proxy_lock); subproxy = __subproxy_by_id(proxy, subid); mutex_unlock(&proxy->proxy_lock); return subproxy; } static void drop_proxy(gdata_t *gdata, const char *buf) { proxy_instance_t *proxy, *subproxy; int id = -1, subid = -1; sscanf(buf, "dropproxy=%d:%d", &id, &subid); if (unlikely(!subid)) { LOGWARNING("Generator asked to drop parent proxy %d", id); return; } proxy = proxy_by_id(gdata, id); if (unlikely(!proxy)) { LOGINFO("Generator asked to drop subproxy from non-existent parent %d", id); return; } subproxy = subproxy_by_id(proxy, subid); if (!subproxy) { LOGINFO("Generator asked to drop non-existent subproxy %d:%d", id, subid); return; } LOGNOTICE("Generator asked to drop proxy %d:%d", id, subid); disable_subproxy(gdata, proxy, subproxy); } static void stratifier_reconnect_client(ckpool_t *ckp, const int64_t id) { char buf[256]; sprintf(buf, "reconnclient=%"PRId64, id); send_proc(ckp->stratifier, buf); } static void submit_share(gdata_t *gdata, json_t *val) { proxy_instance_t *proxy, *proxi; ckpool_t *ckp = gdata->ckp; bool success = false; stratum_msg_t *msg; share_msg_t *share; int64_t client_id; int id, subid; /* Get the client id so we can tell the stratifier to drop it if the * proxy it's bound to is not functional */ if (unlikely(!json_get_int64(&client_id, val, "client_id"))) { LOGWARNING("Got no client_id in share"); goto out; } if (unlikely(!json_get_int(&id, val, "proxy"))) { LOGWARNING("Got no proxy in share"); goto out; } if (unlikely(!json_get_int(&subid, val, "subproxy"))) { LOGWARNING("Got no subproxy in share"); goto out; } proxy = proxy_by_id(gdata, id); if (unlikely(!proxy)) { LOGNOTICE("Client %"PRId64" sending shares to non existent proxy %d, dropping", client_id, id); stratifier_reconnect_client(ckp, client_id); goto out; } proxi = subproxy_by_id(proxy, subid); if (unlikely(!proxi)) { LOGNOTICE("Client %"PRId64" sending shares to non existent subproxy %d:%d, dropping", client_id, id, subid); stratifier_reconnect_client(ckp, client_id); goto out; } if (!proxi->alive) { LOGNOTICE("Client %"PRId64" sending shares to dead subproxy %d:%d, dropping", client_id, id, subid); stratifier_reconnect_client(ckp, client_id); goto out; } success = true; msg = ckzalloc(sizeof(stratum_msg_t)); share = ckzalloc(sizeof(share_msg_t)); share->submit_time = time(NULL); share->client_id = client_id; share->diff = proxi->diff; msg->json_msg = val; /* Add new share entry to the share hashtable */ mutex_lock(&gdata->share_lock); share->id = gdata->share_id++; HASH_ADD_I64(gdata->shares, id, share); mutex_unlock(&gdata->share_lock); json_object_set_nocheck(val, "id", json_integer(share->id)); /* Add the new message to the psend list */ mutex_lock(&gdata->psend_lock); gdata->psends_generated++; DL_APPEND(gdata->psends, msg); pthread_cond_signal(&gdata->psend_cond); mutex_unlock(&gdata->psend_lock); out: if (!success) json_decref(val); } static void clear_notify(notify_instance_t *ni) { if (ni->jobid) json_decref(ni->jobid); free(ni->coinbase1); free(ni->coinbase2); free(ni); } static void account_shares(proxy_instance_t *proxy, const double diff, const bool result) { proxy_instance_t *parent = proxy->parent; mutex_lock(&parent->proxy_lock); if (result) { proxy->diff_accepted += diff; parent->total_accepted += diff; } else { proxy->diff_rejected += diff; parent->total_rejected += diff; } mutex_unlock(&parent->proxy_lock); } static bool parse_share(gdata_t *gdata, proxy_instance_t *proxi, const char *buf) { bool ret = false, result = false; json_t *val = NULL, *idval; share_msg_t *share; int64_t id; val = json_loads(buf, 0, NULL); if (unlikely(!val)) { LOGINFO("Failed to parse upstream json msg: %s", buf); goto out; } idval = json_object_get(val, "id"); if (unlikely(!idval)) { LOGINFO("Failed to find id in upstream json msg: %s", buf); goto out; } id = json_integer_value(idval); if (unlikely(!json_get_bool(&result, val, "result"))) { LOGINFO("Failed to find result in upstream json msg: %s", buf); } mutex_lock(&gdata->share_lock); HASH_FIND_I64(gdata->shares, &id, share); if (share) HASH_DEL(gdata->shares, share); mutex_unlock(&gdata->share_lock); /* We set response to true even if we don't find the matching share, * so long as we recognised it as a share response */ ret = true; if (!share) { LOGINFO("Proxy %d:%d failed to find matching share to result: %s", proxi->id, proxi->subid, buf); /* We don't know what diff these shares are so assume the * current proxy diff. */ account_shares(proxi, proxi->diff, result); goto out; } account_shares(proxi, share->diff, result); LOGINFO("Proxy %d:%d share result %s from client %d", proxi->id, proxi->subid, buf, share->client_id); free(share); out: if (val) json_decref(val); return ret; } typedef struct cs_msg cs_msg_t; struct cs_msg { cs_msg_t *next; cs_msg_t *prev; proxy_instance_t *proxy; json_t *val; }; /* Sends all messages in the queue ready to be dispatched, leaving those that * fail write select to be handled next pass */ static void send_json_msgq(gdata_t *gdata, cs_msg_t **csmsgq) { cs_msg_t *csmsg, *tmp; int ret; DL_FOREACH_SAFE(*csmsgq, csmsg, tmp) { connsock_t *cs = &(csmsg->proxy->cs); if ((ret = wait_write_select(cs->fd, 0))) { DL_DELETE(*csmsgq, csmsg); if (ret > 0) ret = send_json_msg(cs, csmsg->val); json_decref(csmsg->val); if (ret < 1) { proxy_instance_t *proxy = csmsg->proxy; LOGNOTICE("Proxy %d:%d %s failed to send msg in send_json_msgq, dropping", proxy->id, proxy->subid, proxy->url); disable_subproxy(gdata, proxy->parent, proxy); } free(csmsg); } } } static void add_json_msgq(cs_msg_t **csmsgq, proxy_instance_t *proxy, json_t **val) { cs_msg_t *csmsg = ckalloc(sizeof(cs_msg_t)); csmsg->val = *val; *val = NULL; csmsg->proxy = proxy; DL_APPEND(*csmsgq, csmsg); } /* For processing and sending shares. proxy refers to parent proxy here */ static void *proxy_send(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->data; stratum_msg_t *msg = NULL; cs_msg_t *csmsgq = NULL; rename_proc("proxysend"); pthread_detach(pthread_self()); while (42) { proxy_instance_t *proxy, *subproxy; int proxyid = 0, subid = 0; int64_t client_id = 0, id; notify_instance_t *ni; json_t *jobid = NULL; json_t *val; if (unlikely(msg)) { json_decref(msg->json_msg); free(msg); } mutex_lock(&gdata->psend_lock); if (!gdata->psends) { /* Poll every 10ms */ const ts_t polltime = {0, 10000000}; ts_t timeout_ts; ts_realtime(&timeout_ts); timeraddspec(&timeout_ts, &polltime); cond_timedwait(&gdata->psend_cond, &gdata->psend_lock, &timeout_ts); } msg = gdata->psends; if (likely(msg)) DL_DELETE(gdata->psends, msg); mutex_unlock(&gdata->psend_lock); if (!msg) { send_json_msgq(gdata, &csmsgq); continue; } if (unlikely(!json_get_int(&subid, msg->json_msg, "subproxy"))) { LOGWARNING("Failed to find subproxy in proxy_send msg"); continue; } if (unlikely(!json_get_int64(&id, msg->json_msg, "jobid"))) { LOGWARNING("Failed to find jobid in proxy_send msg"); continue; } if (unlikely(!json_get_int(&proxyid, msg->json_msg, "proxy"))) { LOGWARNING("Failed to find proxy in proxy_send msg"); continue; } if (unlikely(!json_get_int64(&client_id, msg->json_msg, "client_id"))) { LOGWARNING("Failed to find client_id in proxy_send msg"); continue; } proxy = proxy_by_id(gdata, proxyid); if (unlikely(!proxy)) { LOGWARNING("Proxysend for got message for non-existent proxy %d", proxyid); continue; } subproxy = subproxy_by_id(proxy, subid); if (unlikely(!subproxy)) { LOGWARNING("Proxysend for got message for non-existent subproxy %d:%d", proxyid, subid); continue; } mutex_lock(&gdata->notify_lock); HASH_FIND_INT(gdata->notify_instances, &id, ni); if (ni) jobid = json_copy(ni->jobid); mutex_unlock(&gdata->notify_lock); if (unlikely(!jobid)) { stratifier_reconnect_client(ckp, client_id); LOGNOTICE("Proxy %d:%s failed to find matching jobid in proxysend", subproxy->id, subproxy->url); continue; } JSON_CPACK(val, "{s[soooo]soss}", "params", subproxy->auth, jobid, json_object_dup(msg->json_msg, "nonce2"), json_object_dup(msg->json_msg, "ntime"), json_object_dup(msg->json_msg, "nonce"), "id", json_object_dup(msg->json_msg, "id"), "method", "mining.submit"); add_json_msgq(&csmsgq, subproxy, &val); send_json_msgq(gdata, &csmsgq); } return NULL; } static void passthrough_send(ckpool_t __maybe_unused *ckp, pass_msg_t *pm) { int len, sent; LOGDEBUG("Sending upstream json msg: %s", pm->msg); len = strlen(pm->msg); sent = write_socket(pm->cs->fd, pm->msg, len); if (sent != len) { /* FIXME: Do something about this? */ LOGWARNING("Failed to passthrough %d bytes of message %s", len, pm->msg); } free(pm->msg); free(pm); } static void passthrough_add_send(proxy_instance_t *proxi, const char *msg) { pass_msg_t *pm = ckzalloc(sizeof(pass_msg_t)); pm->cs = &proxi->cs; ASPRINTF(&pm->msg, "%s\n", msg); ckmsgq_add(proxi->passsends, pm); } static bool proxy_alive(ckpool_t *ckp, proxy_instance_t *proxi, connsock_t *cs, bool pinging, int epfd) { struct epoll_event event; bool ret = false; /* Has this proxy already been reconnected? */ if (cs->fd > 0) return true; if (proxi->disabled) return false; if (!extract_sockaddr(proxi->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", proxi->url); goto out; } if (!connect_proxy(cs)) { if (!pinging) { LOGINFO("Failed to connect to %s:%s in proxy_mode!", cs->url, cs->port); } goto out; } if (ckp->passthrough) { if (!passthrough_stratum(cs, proxi)) { LOGWARNING("Failed initial passthrough to %s:%s !", cs->url, cs->port); goto out; } ret = true; goto out; } /* Test we can connect, authorise and get stratum information */ if (!subscribe_stratum(cs, proxi)) { if (!pinging) { LOGWARNING("Failed initial subscribe to %s:%s !", cs->url, cs->port); } goto out; } if (!ckp->passthrough) send_subscribe(ckp, proxi); if (!auth_stratum(ckp, cs, proxi)) { if (!pinging) { LOGWARNING("Failed initial authorise to %s:%s with %s:%s !", cs->url, cs->port, proxi->auth, proxi->pass); } goto out; } ret = true; out: if (!ret) { send_stratifier_deadproxy(ckp, proxi->id, proxi->subid); /* Close and invalidate the file handle */ if (cs->fd > 0) { epoll_ctl(proxi->epfd, EPOLL_CTL_DEL, cs->fd, NULL); Close(cs->fd); } } else { keep_sockalive(cs->fd); event.events = EPOLLIN; event.data.ptr = proxi; /* Add this connsock_t to the epoll list */ if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, cs->fd, &event) == -1)) { LOGERR("Failed to add fd %d to epfd %d to epoll_ctl in proxy_alive", cs->fd, epfd); return false; } } proxi->alive = ret; return ret; } static void *proxy_recruit(void *arg) { proxy_instance_t *proxy, *parent = (proxy_instance_t *)arg; ckpool_t *ckp = parent->ckp; gdata_t *gdata = ckp->data; bool recruit, alive; pthread_detach(pthread_self()); retry: recruit = false; proxy = create_subproxy(ckp, gdata, parent, parent->url); alive = proxy_alive(ckp, proxy, &proxy->cs, false, parent->epfd); if (!alive) { LOGNOTICE("Subproxy failed proxy_alive testing"); store_proxy(gdata, proxy); } else add_subproxy(parent, proxy); mutex_lock(&parent->proxy_lock); if (alive && parent->recruit > 0) recruit = true; else /* Reset so the next request will try again */ parent->recruit = 0; mutex_unlock(&parent->proxy_lock); if (recruit) goto retry; return NULL; } static void recruit_subproxies(proxy_instance_t *proxi, const int recruits) { bool recruit = false; pthread_t pth; mutex_lock(&proxi->proxy_lock); if (!proxi->recruit) recruit = true; if (proxi->recruit < recruits) proxi->recruit = recruits; mutex_unlock(&proxi->proxy_lock); if (recruit) create_pthread(&pth, proxy_recruit, proxi); } /* Queue up to the requested amount */ static void recruit_subproxy(gdata_t *gdata, const char *buf) { int recruits = 1, id = 0; proxy_instance_t *proxy; sscanf(buf, "recruit=%d:%d", &id, &recruits); proxy = proxy_by_id(gdata, id); if (unlikely(!proxy)) { LOGNOTICE("Generator failed to find proxy id %d to recruit subproxies", id); return; } recruit_subproxies(proxy, recruits); } static void *proxy_reconnect(void *arg) { proxy_instance_t *proxy = (proxy_instance_t *)arg; connsock_t *cs = &proxy->cs; ckpool_t *ckp = proxy->ckp; pthread_detach(pthread_self()); proxy_alive(ckp, proxy, cs, true, proxy->epfd); proxy->reconnecting = false; return NULL; } /* For reconnecting the parent proxy instance async */ static void reconnect_proxy(proxy_instance_t *proxi) { pthread_t pth; if (proxi->reconnecting) return; proxi->reconnecting = true; create_pthread(&pth, proxy_reconnect, proxi); } /* For receiving messages from an upstream pool to pass downstream. Responsible * for setting up the connection and testing pool is live. */ static void *passthrough_recv(void *arg) { proxy_instance_t *proxi = (proxy_instance_t *)arg; connsock_t *cs = &proxi->cs; ckpool_t *ckp = proxi->ckp; struct epoll_event event; bool alive; int epfd; rename_proc("passrecv"); proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0){ LOGEMERG("FATAL: Failed to create epoll in passrecv"); return NULL; } if (proxy_alive(ckp, proxi, cs, false, epfd)) { reconnect_generator(ckp); LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url); } alive = proxi->alive; while (42) { int ret; while (!proxy_alive(ckp, proxi, cs, true, epfd)) { if (alive) { alive = false; reconnect_generator(ckp); } sleep(5); } if (!alive) reconnect_generator(ckp); /* Make sure we receive a line within 90 seconds */ ret = epoll_wait(epfd, &event, 1, 90000); if (likely(ret > 0)) ret = read_socket_line(cs, 60); if (ret < 1) { LOGWARNING("Proxy %d:%s failed to read_socket_line in passthrough_recv, attempting reconnect", proxi->id, proxi->url); alive = proxi->alive = false; reconnect_generator(ckp); continue; } /* Simply forward the message on, as is, to the connector to * process. Possibly parse parameters sent by upstream pool * here */ send_proc(ckp->connector, cs->buf); } return NULL; } static bool subproxies_alive(proxy_instance_t *proxy) { proxy_instance_t *subproxy, *tmp; bool ret = false; mutex_lock(&proxy->proxy_lock); HASH_ITER(sh, proxy->subproxies, subproxy, tmp) { if (subproxy->alive) { ret = true; break; } } mutex_unlock(&proxy->proxy_lock); return ret; } /* For receiving messages from the upstream proxy, also responsible for setting * up the connection and testing it's alive. */ static void *proxy_recv(void *arg) { proxy_instance_t *proxi = (proxy_instance_t *)arg; connsock_t *cs = &proxi->cs; proxy_instance_t *subproxy; ckpool_t *ckp = proxi->ckp; gdata_t *gdata = ckp->data; struct epoll_event event; bool alive; int epfd; rename_proc("proxyrecv"); pthread_detach(pthread_self()); proxi->epfd = epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0){ LOGEMERG("FATAL: Failed to create epoll in proxyrecv"); return NULL; } if (proxy_alive(ckp, proxi, cs, false, epfd)) { LOGWARNING("Proxy %d:%s connection established", proxi->id, proxi->url); } alive = proxi->alive; while (42) { share_msg_t *share, *tmpshare; notify_instance_t *ni, *tmp; time_t now; int ret; subproxy = proxi; if (!proxi->alive) { reconnect_proxy(proxi); while (!subproxies_alive(proxi)) { reconnect_proxy(proxi); if (alive) { LOGWARNING("Proxy %d:%s failed, attempting reconnect", proxi->id, proxi->url); alive = false; } sleep(5); } } if (!alive) { LOGWARNING("Proxy %d:%s recovered", proxi->id, proxi->url); alive = true; } now = time(NULL); /* Age old notifications older than 10 mins old */ mutex_lock(&gdata->notify_lock); HASH_ITER(hh, gdata->notify_instances, ni, tmp) { if (HASH_COUNT(gdata->notify_instances) < 3) break; if (ni->notify_time < now - 600) { HASH_DEL(gdata->notify_instances, ni); clear_notify(ni); } } mutex_unlock(&gdata->notify_lock); /* Similary with shares older than 2 mins without response */ mutex_lock(&gdata->share_lock); HASH_ITER(hh, gdata->shares, share, tmpshare) { if (share->submit_time < now - 120) { HASH_DEL(gdata->shares, share); } } mutex_unlock(&gdata->share_lock); /* If we don't get an update within 10 minutes the upstream pool * has likely stopped responding. */ ret = epoll_wait(epfd, &event, 1, 600000); if (likely(ret > 0)) { subproxy = event.data.ptr; cs = &subproxy->cs; if (event.events & EPOLLHUP) ret = -1; else ret = read_socket_line(cs, 5); } if (ret < 1) { LOGNOTICE("Proxy %d:%d %s failed to epoll/read_socket_line in proxy_recv", proxi->id, subproxy->subid, subproxy->url); disable_subproxy(gdata, proxi, subproxy); continue; } do { /* subproxy may have been recycled here if it is not a * parent and reconnect was issued */ if (parse_method(ckp, subproxy, cs->buf)) continue; /* If it's not a method it should be a share result */ if (!parse_share(gdata, subproxy, cs->buf)) LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", subproxy->id, subproxy->subid, cs->buf); } while ((ret = read_socket_line(cs, 0)) > 0); } return NULL; } /* Thread that handles all received messages from user proxies */ static void *userproxy_recv(void *arg) { ckpool_t *ckp = (ckpool_t *)arg; gdata_t *gdata = ckp->data; struct epoll_event event; int epfd; rename_proc("uproxyrecv"); pthread_detach(pthread_self()); epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd < 0){ LOGEMERG("FATAL: Failed to create epoll in userproxy_recv"); return NULL; } while (42) { proxy_instance_t *proxy, *tmpproxy; share_msg_t *share, *tmpshare; notify_instance_t *ni, *tmp; connsock_t *cs; time_t now; int ret; mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxy, tmpproxy) { if (!proxy->global && !proxy->alive) { proxy->epfd = epfd; reconnect_proxy(proxy); } } mutex_unlock(&gdata->lock); ret = epoll_wait(epfd, &event, 1, 1000); if (ret < 1) { if (likely(!ret)) continue; LOGEMERG("Failed to epoll_wait in userproxy_recv"); break; } proxy = event.data.ptr; cs = &proxy->cs; if (event.events & EPOLLHUP) { LOGNOTICE("Proxy %d:%d %s hung up in epoll_wait", proxy->id, proxy->subid, proxy->url); disable_subproxy(gdata, proxy->parent, proxy); continue; } now = time(NULL); mutex_lock(&gdata->notify_lock); HASH_ITER(hh, gdata->notify_instances, ni, tmp) { if (HASH_COUNT(gdata->notify_instances) < 3) break; if (ni->notify_time < now - 600) { HASH_DEL(gdata->notify_instances, ni); clear_notify(ni); } } mutex_unlock(&gdata->notify_lock); /* Similary with shares older than 2 mins without response */ mutex_lock(&gdata->share_lock); HASH_ITER(hh, gdata->shares, share, tmpshare) { if (share->submit_time < now - 120) { HASH_DEL(gdata->shares, share); } } mutex_unlock(&gdata->share_lock); do { /* proxy may have been recycled here if it is not a * parent and reconnect was issued */ if (parse_method(ckp, proxy, cs->buf)) continue; /* If it's not a method it should be a share result */ if (!parse_share(gdata, proxy, cs->buf)) LOGNOTICE("Proxy %d:%d unhandled stratum message: %s", proxy->id, proxy->subid, cs->buf); } while ((ret = read_socket_line(cs, 0)) > 0); } return NULL; } static void prepare_proxy(proxy_instance_t *proxi) { proxi->parent = proxi; mutex_init(&proxi->proxy_lock); add_subproxy(proxi, proxi); if (proxi->global) create_pthread(&proxi->pth_precv, proxy_recv, proxi); } static proxy_instance_t *wait_best_proxy(ckpool_t *ckp, gdata_t *gdata) { proxy_instance_t *ret = NULL, *proxi, *tmp; int retries = 0; while (42) { if (!ping_main(ckp)) break; mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxi, tmp) { if (proxi->disabled || !proxi->global) continue; if (proxi->alive || subproxies_alive(proxi)) { if (!ret || proxi->id < ret->id) ret = proxi; } } mutex_unlock(&gdata->lock); if (ret) break; /* Send reject message if we are unable to find an active * proxy for more than 5 seconds */ if (!((++retries) % 5)) send_proc(ckp->connector, "reject"); sleep(1); } send_proc(ckp->connector, ret ? "accept" : "reject"); return ret; } static void send_list(gdata_t *gdata, const int sockd) { proxy_instance_t *proxy, *tmp; json_t *val, *array_val; array_val = json_array(); mutex_lock(&gdata->lock); HASH_ITER(hh, gdata->proxies, proxy, tmp) { JSON_CPACK(val, "{si,sb,si,ss,ss,sf,sb,sb,sb,si}", "id", proxy->id, "global", proxy->global, "userid", proxy->userid, "auth", proxy->auth, "pass", proxy->pass, "diff", proxy->diff, "notified", proxy->notified, "disabled", proxy->disabled, "alive", proxy->alive, "subproxies", proxy->subproxy_count); if (proxy->enonce1) { json_set_string(val, "enonce1", proxy->enonce1); json_set_int(val, "nonce1len", proxy->nonce1len); json_set_int(val, "nonce2len", proxy->nonce2len); } json_array_append_new(array_val, val); } mutex_unlock(&gdata->lock); JSON_CPACK(val, "{so}", "proxies", array_val); send_api_response(val, sockd); } static void send_sublist(gdata_t *gdata, const int sockd, const char *buf) { proxy_instance_t *proxy, *subproxy, *tmp; json_t *val = NULL, *array_val; json_error_t err_val; int64_t id; array_val = json_array(); val = json_loads(buf, 0, &err_val); if (unlikely(!val)) { val = json_encode_errormsg(&err_val); goto out; } if (unlikely(!json_get_int64(&id, val, "id"))) { val = json_errormsg("Failed to get ID in send_sublist JSON: %s", buf); goto out; } proxy = proxy_by_id(gdata, id); if (unlikely(!proxy)) { val = json_errormsg("Failed to find proxy %"PRId64" in send_sublist", id); goto out; } mutex_lock(&gdata->lock); HASH_ITER(sh, proxy->subproxies, subproxy, tmp) { JSON_CPACK(val, "{si,ss,ss,sf,sb,sb,sb}", "subid", subproxy->id, "auth", subproxy->auth, "pass", subproxy->pass, "diff", subproxy->diff, "notified", subproxy->notified, "disabled", subproxy->disabled, "alive", subproxy->alive); if (subproxy->enonce1) { json_set_string(val, "enonce1", subproxy->enonce1); json_set_int(val, "nonce1len", subproxy->nonce1len); json_set_int(val, "nonce2len", subproxy->nonce2len); } json_array_append_new(array_val, val); } mutex_unlock(&gdata->lock); JSON_CPACK(val, "{so}", "subproxies", array_val); out: send_api_response(val, sockd); } static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int num); static proxy_instance_t *__add_userproxy(ckpool_t *ckp, gdata_t *gdata, const int id, const int userid, char *url, char *auth, char *pass) { proxy_instance_t *proxy; gdata->proxies_generated++; proxy = ckzalloc(sizeof(proxy_instance_t)); proxy->id = id; proxy->userid = userid; proxy->url = url; proxy->auth = auth; proxy->pass = pass; proxy->ckp = proxy->cs.ckp = ckp; HASH_ADD_INT(gdata->proxies, id, proxy); return proxy; } static void parse_addproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf) { char *url = NULL, *auth = NULL, *pass = NULL; proxy_instance_t *proxy; json_error_t err_val; json_t *val = NULL; int id, userid; bool global; val = json_loads(buf, 0, &err_val); if (unlikely(!val)) { val = json_encode_errormsg(&err_val); goto out; } json_get_string(&url, val, "url"); json_get_string(&auth, val, "auth"); json_get_string(&pass, val, "pass"); if (json_get_int(&userid, val, "userid")) global = false; else global = true; json_decref(val); if (unlikely(!url || !auth || !pass)) { val = json_errormsg("Failed to decode url/auth/pass in addproxy %s", buf); goto out; } mutex_lock(&gdata->lock); id = ckp->proxies++; if (global) { ckp->proxyurl = realloc(ckp->proxyurl, sizeof(char **) * ckp->proxies); ckp->proxyauth = realloc(ckp->proxyauth, sizeof(char **) * ckp->proxies); ckp->proxypass = realloc(ckp->proxypass, sizeof(char **) * ckp->proxies); ckp->proxyurl[id] = url; ckp->proxyauth[id] = auth; ckp->proxypass[id] = pass; proxy = __add_proxy(ckp, gdata, id); } else proxy = __add_userproxy(ckp, gdata, id, userid, url, auth, pass); mutex_unlock(&gdata->lock); if (global) LOGNOTICE("Adding global proxy %d:%s", id, proxy->url); else LOGNOTICE("Adding user %d proxy %d:%s", userid, id, proxy->url); prepare_proxy(proxy); if (global) { JSON_CPACK(val, "{si,ss,ss,ss}", "id", proxy->id, "url", url, "auth", auth, "pass", pass); } else { JSON_CPACK(val, "{si,ss,ss,ss,si}", "id", proxy->id, "url", url, "auth", auth, "pass", pass, "userid", proxy->userid); } out: send_api_response(val, sockd); } static void delete_proxy(ckpool_t *ckp, gdata_t *gdata, proxy_instance_t *proxy) { proxy_instance_t *subproxy; /* Remove the proxy from the master list first */ mutex_lock(&gdata->lock); HASH_DEL(gdata->proxies, proxy); /* Disable all its threads */ pthread_cancel(proxy->pth_precv); Close(proxy->cs.fd); mutex_unlock(&gdata->lock); /* Recycle all its subproxies */ do { mutex_lock(&proxy->proxy_lock); subproxy = proxy->subproxies; if (subproxy) HASH_DELETE(sh, proxy->subproxies, subproxy); mutex_unlock(&proxy->proxy_lock); send_stratifier_deadproxy(ckp, subproxy->id, subproxy->subid); if (subproxy && proxy != subproxy) store_proxy(gdata, subproxy); } while (subproxy); /* Recycle the proxy itself */ store_proxy(gdata, proxy); } static void parse_delproxy(ckpool_t *ckp, gdata_t *gdata, const int sockd, const char *buf) { proxy_instance_t *proxy; json_error_t err_val; json_t *val = NULL; int id = -1; val = json_loads(buf, 0, &err_val); if (unlikely(!val)) { val = json_encode_errormsg(&err_val); goto out; } json_get_int(&id, val, "id"); proxy = proxy_by_id(gdata, id); if (!proxy) { val = json_errormsg("Proxy id %d not found", id); goto out; } JSON_CPACK(val, "{si,ss,ss,ss}", "id", proxy->id, "url", proxy->url, "auth", proxy->auth, "pass", proxy->pass); LOGNOTICE("Deleting proxy %d:%s", proxy->id, proxy->url); delete_proxy(ckp, gdata, proxy); out: send_api_response(val, sockd); } static void parse_ableproxy(gdata_t *gdata, const int sockd, const char *buf, bool disable) { proxy_instance_t *proxy; json_error_t err_val; json_t *val = NULL; int id = -1; val = json_loads(buf, 0, &err_val); if (unlikely(!val)) { val = json_encode_errormsg(&err_val); goto out; } json_get_int(&id, val, "id"); proxy = proxy_by_id(gdata, id); if (!proxy) { val = json_errormsg("Proxy id %d not found", id); goto out; } JSON_CPACK(val, "{si,ss,ss,ss}", "id", proxy->id, "url", proxy->url, "auth", proxy->auth, "pass", proxy->pass); if (proxy->disabled != disable) { proxy->disabled = disable; LOGNOTICE("%sabling proxy %d:%s", disable ? "Dis" : "En", id, proxy->url); } if (disable) disable_subproxy(gdata, proxy, proxy); else reconnect_proxy(proxy); out: send_api_response(val, sockd); } static void send_stats(gdata_t *gdata, const int sockd) { json_t *val = json_object(), *subval; int total_objects, objects, generated; proxy_instance_t *proxy; stratum_msg_t *msg; int64_t memsize; mutex_lock(&gdata->lock); objects = HASH_COUNT(gdata->proxies); memsize = SAFE_HASH_OVERHEAD(gdata->proxies) + sizeof(proxy_instance_t) * objects; generated = gdata->proxies_generated; JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "proxies", subval); DL_COUNT(gdata->dead_proxies, proxy, objects); memsize = sizeof(proxy_instance_t) * objects; JSON_CPACK(subval, "{si,si}", "count", objects, "memory", memsize); json_set_object(val, "dead_proxies", subval); total_objects = memsize = 0; for (proxy = gdata->proxies; proxy; proxy=proxy->hh.next) { mutex_lock(&proxy->proxy_lock); total_objects += objects = HASH_COUNT(proxy->subproxies); memsize += SAFE_HASH_OVERHEAD(proxy->subproxies) + sizeof(proxy_instance_t) * objects; mutex_unlock(&proxy->proxy_lock); } generated = gdata->subproxies_generated; mutex_unlock(&gdata->lock); JSON_CPACK(subval, "{si,si,si}", "count", total_objects, "memory", memsize, "generated", generated); json_set_object(val, "subproxies", subval); mutex_lock(&gdata->notify_lock); objects = HASH_COUNT(gdata->notify_instances); memsize = SAFE_HASH_OVERHEAD(gdata->notify_instances) + sizeof(notify_instance_t) * objects; generated = gdata->proxy_notify_id; mutex_unlock(&gdata->notify_lock); JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "notifies", subval); mutex_lock(&gdata->share_lock); objects = HASH_COUNT(gdata->shares); memsize = SAFE_HASH_OVERHEAD(gdata->shares) + sizeof(share_msg_t) * objects; generated = gdata->share_id; mutex_unlock(&gdata->share_lock); JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "shares", subval); mutex_lock(&gdata->psend_lock); DL_COUNT(gdata->psends, msg, objects); generated = gdata->psends_generated; mutex_unlock(&gdata->psend_lock); memsize = sizeof(stratum_msg_t) * objects; JSON_CPACK(subval, "{si,si,si}", "count", objects, "memory", memsize, "generated", generated); json_set_object(val, "psends", subval); send_api_response(val, sockd); } static void parse_proxystats(gdata_t *gdata, const int sockd, const char *buf) { proxy_instance_t *proxy; json_error_t err_val; bool totals = false; json_t *val = NULL; int id, subid = 0; val = json_loads(buf, 0, &err_val); if (unlikely(!val)) { val = json_encode_errormsg(&err_val); goto out; } if (!json_get_int(&id, val, "id")) { val = json_errormsg("Failed to find id key"); goto out; } if (!json_get_int(&subid, val, "subid")) totals = true; proxy = proxy_by_id(gdata, id); if (!proxy) { val = json_errormsg("Proxy id %d not found", id); goto out; } if (!totals) proxy = subproxy_by_id(proxy, subid); if (!proxy) { val = json_errormsg("Proxy id %d:%d not found", id, subid); goto out; } if (totals) { JSON_CPACK(val, "{si,si,ss,ss,ss,ss,si,si,sf,sf,sf,si,sb,sb,sb,sb,sI,si}", "id", proxy->id, "userid", proxy->userid, "url", proxy->url, "auth", proxy->auth, "pass", proxy->pass, "enonce1", proxy->enonce1 ? proxy->enonce1 : "", "nonce1len", proxy->nonce1len, "nonce2len", proxy->nonce2len, "diff", proxy->diff, "accepted", proxy->total_accepted, "rejected", proxy->total_rejected, "lastshare", proxy->last_share.tv_sec, "global", proxy->global, "notified", proxy->notified, "disabled", proxy->disabled, "alive", proxy->alive, "maxclients", proxy->clients_per_proxy, "subproxies", proxy->subproxy_count); } else { JSON_CPACK(val, "{si,si,si,ss,ss,ss,ss,si,si,sf,sf,sf,si,sb,sb,sb,sb,sI,si}", "id", proxy->id, "subid", proxy->subid, "userid", proxy->userid, "url", proxy->url, "auth", proxy->auth, "pass", proxy->pass, "enonce1", proxy->enonce1 ? proxy->enonce1 : "", "nonce1len", proxy->nonce1len, "nonce2len", proxy->nonce2len, "diff", proxy->diff, "accepted", proxy->diff_accepted, "rejected", proxy->diff_rejected, "lastshare", proxy->last_share.tv_sec, "global", proxy->global, "notified", proxy->notified, "disabled", proxy->disabled, "alive", proxy->alive, "maxclients", proxy->clients_per_proxy, "subproxies", proxy->subproxy_count); } out: send_api_response(val, sockd); } static int proxy_loop(proc_instance_t *pi) { proxy_instance_t *proxi = NULL, *cproxy; int sockd = -1, ret = 0, selret; unixsock_t *us = &pi->us; ckpool_t *ckp = pi->ckp; gdata_t *gdata = ckp->data; char *buf = NULL; reconnect: Close(sockd); /* This does not necessarily mean we reconnect, but a change has * occurred and we need to reexamine the proxies. */ cproxy = wait_best_proxy(ckp, gdata); if (!cproxy) goto out; if (proxi != cproxy) { proxi = cproxy; if (!ckp->passthrough) { LOGWARNING("Successfully connected to proxy %d %s as proxy", proxi->id, proxi->url); } } retry: Close(sockd); do { selret = wait_read_select(us->sockd, 5); if (!selret && !ping_main(ckp)) { LOGEMERG("Generator failed to ping main process, exiting"); ret = 1; goto out; } } while (selret < 1); sockd = accept(us->sockd, NULL, NULL); if (sockd < 0) { LOGEMERG("Failed to accept on proxy socket"); ret = 1; goto out; } dealloc(buf); buf = recv_unix_msg(sockd); if (!buf) { LOGWARNING("Failed to get message in proxy_loop"); goto retry; } LOGDEBUG("Proxy received request: %s", buf); if (likely(buf[0] == '{')) { if (ckp->passthrough) passthrough_add_send(proxi, buf); else { /* Anything remaining should be share submissions */ json_t *val = json_loads(buf, 0, NULL); if (unlikely(!val)) LOGWARNING("Generator received invalid json message: %s", buf); else submit_share(gdata, val); } } else if (cmdmatch(buf, "stats")) { send_stats(gdata, sockd); } else if (cmdmatch(buf, "list")) { send_list(gdata, sockd); } else if (cmdmatch(buf, "sublist")) { send_sublist(gdata, sockd, buf + 8); } else if (cmdmatch(buf, "addproxy")) { parse_addproxy(ckp, gdata, sockd, buf + 9); } else if (cmdmatch(buf, "delproxy")) { parse_delproxy(ckp, gdata, sockd, buf + 9); } else if (cmdmatch(buf, "enableproxy")) { parse_ableproxy(gdata, sockd, buf + 12, false); } else if (cmdmatch(buf, "disableproxy")) { parse_ableproxy(gdata, sockd, buf + 13, true); } else if (cmdmatch(buf, "proxystats")) { parse_proxystats(gdata, sockd, buf + 11); } else if (cmdmatch(buf, "shutdown")) { ret = 0; goto out; } else if (cmdmatch(buf, "reconnect")) { goto reconnect; } else if (cmdmatch(buf, "submitblock:")) { LOGNOTICE("Submitting likely block solve share to upstream pool"); } else if (cmdmatch(buf, "loglevel")) { sscanf(buf, "loglevel=%d", &ckp->loglevel); } else if (cmdmatch(buf, "ping")) { LOGDEBUG("Proxy received ping request"); send_unix_msg(sockd, "pong"); } else if (cmdmatch(buf, "recruit")) { recruit_subproxy(gdata, buf); } else if (cmdmatch(buf, "dropproxy")) { drop_proxy(gdata, buf); } else { LOGWARNING("Generator received unrecognised message: %s", buf); } goto retry; out: Close(sockd); return ret; } static int server_mode(ckpool_t *ckp, proc_instance_t *pi) { server_instance_t *si; int i, ret; ckp->servers = ckalloc(sizeof(server_instance_t *) * ckp->btcds); for (i = 0; i < ckp->btcds; i++) { ckp->servers[i] = ckzalloc(sizeof(server_instance_t)); si = ckp->servers[i]; si->url = ckp->btcdurl[i]; si->auth = ckp->btcdauth[i]; si->pass = ckp->btcdpass[i]; si->notify = ckp->btcdnotify[i]; } ret = gen_loop(pi); for (i = 0; i < ckp->btcds; i++) { si = ckp->servers[i]; kill_server(si); dealloc(si); } dealloc(ckp->servers); return ret; } static proxy_instance_t *__add_proxy(ckpool_t *ckp, gdata_t *gdata, const int id) { proxy_instance_t *proxy; gdata->proxies_generated++; proxy = ckzalloc(sizeof(proxy_instance_t)); proxy->id = id; proxy->url = strdup(ckp->proxyurl[id]); proxy->auth = strdup(ckp->proxyauth[id]); proxy->pass = strdup(ckp->proxypass[id]); proxy->ckp = proxy->cs.ckp = ckp; HASH_ADD_INT(gdata->proxies, id, proxy); proxy->global = true; return proxy; } static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) { gdata_t *gdata = ckp->data; proxy_instance_t *proxy; int i, ret; mutex_init(&gdata->lock); mutex_init(&gdata->notify_lock); mutex_init(&gdata->share_lock); /* Create all our proxy structures and pointers */ for (i = 0; i < ckp->proxies; i++) { proxy = __add_proxy(ckp, gdata, i); if (ckp->passthrough) { create_pthread(&proxy->pth_precv, passthrough_recv, proxy); proxy->passsends = create_ckmsgq(ckp, "passsend", &passthrough_send); } else { prepare_proxy(proxy); create_pthread(&gdata->pth_uprecv, userproxy_recv, ckp); mutex_init(&gdata->psend_lock); cond_init(&gdata->psend_cond); create_pthread(&gdata->pth_psend, proxy_send, ckp); } } LOGWARNING("%s generator ready", ckp->name); ret = proxy_loop(pi); return ret; } /* Tell the watchdog what the current server instance is and decide if we * should check to see if the higher priority servers are alive and fallback */ static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) { static time_t last_t = 0; bool alive = false; time_t now_t; int i; /* Rate limit to checking only once every 5 seconds */ now_t = time(NULL); if (now_t <= last_t + 5) return; last_t = now_t; /* Is this the highest priority server already? */ if (!cursi->id) return; for (i = 0; i < ckp->btcds; i++) { server_instance_t *si = ckp->servers[i]; /* Have we reached the current server? */ if (si == cursi) return; alive = server_alive(ckp, si, true); if (alive) break; } if (alive) reconnect_generator(ckp); } int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; gdata_t *gdata; int ret; LOGWARNING("%s generator starting", ckp->name); gdata = ckzalloc(sizeof(gdata_t)); ckp->data = gdata; gdata->ckp = ckp; if (ckp->proxy) { char *buf = NULL; /* Wait for the stratifier to be ready for us */ do { if (!ping_main(ckp)) { ret = 1; goto out; } cksleep_ms(10); buf = send_recv_proc(ckp->stratifier, "ping"); } while (!buf); dealloc(buf); ret = proxy_mode(ckp, pi); } else { gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); ret = server_mode(ckp, pi); } out: dealloc(ckp->data); return process_exit(ckp, pi, ret); }