diff --git a/ckpool.conf b/ckpool.conf new file mode 100644 index 00000000..fcd1d7bb --- /dev/null +++ b/ckpool.conf @@ -0,0 +1,22 @@ +{ +"btcd" : [ + { + "url" : "localhost:8332", + "auth" : "user", + "pass" : "pass" + }, + { + "url" : "backup:8332", + "auth" : "user", + "pass" : "pass" + } +], +"btcaddress" : "15qSxP1SQcUX3o4nhkfdbgyoWEFMomJ4rZ", +"btcsig" : "/mined by ck/", +"blockpoll" : 500, +"update_interval" : 30, +"serverurl" : "ckpool.org:3333", +"mindiff" : 1, +"startdiff" : 1, +"logdir" : "logs" +} diff --git a/ckproxy.conf b/ckproxy.conf new file mode 100644 index 00000000..af528c1d --- /dev/null +++ b/ckproxy.conf @@ -0,0 +1,19 @@ +{ +"proxy" : [ + { + "url" : "ckpool.org:3333", + "auth" : "user", + "pass" : "pass" + }, + { + "url" : "backup.ckpool.org:3333", + "auth" : "user", + "pass" : "pass" + } +], +"update_interval" : 30, +"serverurl" : "192.168.1.100:3334", +"mindiff" : 1, +"startdiff" : 1, +"logdir" : "logs" +} diff --git a/pool/base.php b/pool/base.php index 8903293d..97211a63 100644 --- a/pool/base.php +++ b/pool/base.php @@ -229,7 +229,7 @@ function validate() $whoid = ''; if (!isset($_SESSION['ckpkey'])) - return false; + return array(false, NULL); $key = $_SESSION['ckpkey']; if (!isset($_SESSION[$key])) diff --git a/pool/db.php b/pool/db.php index 6ec73703..14a939c3 100644 --- a/pool/db.php +++ b/pool/db.php @@ -50,7 +50,8 @@ function msgEncode($cmd, $id, $fields) { global $send_sep, $fld_sep, $val_sep; - $msg = $cmd . $send_sep . $id; + $t = time() % 10000; + $msg = $cmd . $send_sep . $id.$t; $first = true; foreach ($fields as $name => $value) { diff --git a/pool/page_reg.php b/pool/page_reg.php index 5a97f3a4..fab55747 100644 --- a/pool/page_reg.php +++ b/pool/page_reg.php @@ -75,33 +75,47 @@ function safepass($pass) function show_reg($menu, $name, $u) { $user = getparam('user', false); - $mail = getparam('mail', false); + $mail = trim(getparam('mail', false)); $pass = getparam('pass', false); $pass2 = getparam('pass2', false); $data = array(); + + if (nuem($user)) + $data['user'] = ''; + else + $data['user'] = $user; + + if (nuem($mail)) + $data['mail'] = ''; + else + $data['mail'] = $mail; + $ok = true; - if ($user === NULL && $mail === NULL && $pass === NULL && $pass2 === NULL) - $ok = false; + if (nuem($user) || nuem($mail) || nuem($pass) || nuem($pass2)) + $ok = false; else { - if ($user !== NULL) - $data['user'] = $user; - else - $ok = false; - if ($mail !== NULL) - $data['mail'] = $mail; - else - $ok = false; - if ($pass === NULL || safepass($pass) !== true) + if (safepass($pass) !== true) { $ok = false; - $data['error'] = "Password is unsafe"; - } elseif ($pass2 === NULL || $pass2 != $pass) + $data['error'] = "Password is unsafe - requires 6 or more characters, including
" . + "at least one of each uppercase, lowercase and digits"; + } + elseif ($pass2 != $pass) { $ok = false; $data['error'] = "Passwords don't match"; } + + $orig = $user; + $user = preg_replace('/[_\\.]/', '', $orig); + if ($user != $orig) + { + $ok = false; + $data['error'] = "Username cannot include '.' or '_'"; + $data['user'] = $user; + } } if ($ok === true) diff --git a/pool/page_workers.php b/pool/page_workers.php index 86e10e6c..83c5da01 100644 --- a/pool/page_workers.php +++ b/pool/page_workers.php @@ -82,10 +82,15 @@ function doworker($data, $user) else { $uhr /= 10000000; - if ($uhr < 100000) - $uhr = (round($uhr)/100).'GHs'; + if ($uhr < 0.01) + $uhr = '0GHs'; else - $uhr = (round($uhr/1000)/100).'THs'; + { + if ($uhr < 100000) + $uhr = number_format(round($uhr)/100,2).'GHs'; + else + $uhr = number_format(round($uhr/1000)/100,2).'THs'; + } } $pg .= "$uhr"; $pg .= "\n"; diff --git a/sql/v0.1-v0.2.sql b/sql/v0.1-v0.2.sql deleted file mode 100644 index 0d8d8e23..00000000 --- a/sql/v0.1-v0.2.sql +++ /dev/null @@ -1,40 +0,0 @@ -SET SESSION AUTHORIZATION 'postgres'; - -BEGIN transaction; - -DO $$ -DECLARE ver TEXT; -BEGIN - - UPDATE version set version='0.2' where vlock=1 and version='0.1'; - - IF found THEN - RETURN; - END IF; - - SELECT version into ver from version - WHERE vlock=1; - - RAISE EXCEPTION 'Wrong DB version - expect "0.1" - found "%"', ver; - -END $$; - -ALTER TABLE ONLY poolstats - ADD COLUMN elapsed bigint DEFAULT 0 NOT NULL; - -CREATE TABLE userstats ( - poolinstance character varying(256) NOT NULL, - userid bigint NOT NULL, - elapsed bigint DEFAULT 0 NOT NULL, - hashrate float NOT NULL, - hashrate5m float NOT NULL, - hashrate1hr float NOT NULL, - hashrate24hr float NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) DEFAULT ''::character varying NOT NULL, - createcode character varying(128) DEFAULT ''::character varying NOT NULL, - createinet character varying(128) DEFAULT ''::character varying NOT NULL, - PRIMARY KEY (poolinstance, userid, createdate) -); - -END transaction; diff --git a/sql/v0.2-v0.3.sql b/sql/v0.2-v0.3.sql deleted file mode 100644 index f22cff78..00000000 --- a/sql/v0.2-v0.3.sql +++ /dev/null @@ -1,115 +0,0 @@ -SET SESSION AUTHORIZATION 'postgres'; - -BEGIN transaction; - -DO $$ -DECLARE ver TEXT; -BEGIN - - UPDATE version set version='0.3' where vlock=1 and version='0.2'; - - IF found THEN - RETURN; - END IF; - - SELECT version into ver from version - WHERE vlock=1; - - RAISE EXCEPTION 'Wrong DB version - expect "0.2" - found "%"', ver; - -END $$; - -DROP TABLE sharesummary; - -CREATE TABLE sharesummary ( - userid bigint NOT NULL, - workername character varying(256) NOT NULL, - workinfoid bigint NOT NULL, - diffacc float NOT NULL, - diffsta float NOT NULL, - diffdup float NOT NULL, - diffhi float NOT NULL, - diffrej float NOT NULL, - shareacc float NOT NULL, - sharesta float NOT NULL, - sharedup float NOT NULL, - sharehi float NOT NULL, - sharerej float NOT NULL, - sharecount bigint NOT NULL, - errorcount bigint NOT NULL, - firstshare timestamp with time zone NOT NULL, - lastshare timestamp with time zone NOT NULL, - complete char NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) NOT NULL, - createcode character varying(128) NOT NULL, - createinet character varying(128) NOT NULL, - modifydate timestamp with time zone NOT NULL, - modifyby character varying(64) NOT NULL, - modifycode character varying(128) NOT NULL, - modifyinet character varying(128) NOT NULL, - PRIMARY KEY (userid, workername, workinfoid) -); - -DROP TABLE blocksummary; - -CREATE TABLE workmarkers ( - markerid bigint NOT NULL, - workinfoidend bigint NOT NULL, - workinfoidstart bigint NOT NULL, - description character varying(256) DEFAULT ''::character varying NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) DEFAULT ''::character varying NOT NULL, - createcode character varying(128) DEFAULT ''::character varying NOT NULL, - createinet character varying(128) DEFAULT ''::character varying NOT NULL, - expirydate timestamp with time zone DEFAULT '6666-06-06 06:06:06+00', - PRIMARY KEY (workinfoidstart) -); -CREATE UNIQUE INDEX workmarkersid ON workmarkers USING btree (markerid); - -CREATE TABLE markersummary ( - markerid bigint NOT NULL, - userid bigint NOT NULL, - workername character varying(256) NOT NULL, - diffacc float NOT NULL, - diffsta float NOT NULL, - diffdup float NOT NULL, - diffhi float NOT NULL, - diffrej float NOT NULL, - shareacc float NOT NULL, - sharesta float NOT NULL, - sharedup float NOT NULL, - sharehi float NOT NULL, - sharerej float NOT NULL, - sharecount bigint NOT NULL, - errorcount bigint NOT NULL, - firstshare timestamp with time zone NOT NULL, - lastshare timestamp with time zone NOT NULL, - complete char NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) NOT NULL, - createcode character varying(128) NOT NULL, - createinet character varying(128) NOT NULL, - modifydate timestamp with time zone NOT NULL, - modifyby character varying(64) NOT NULL, - modifycode character varying(128) NOT NULL, - modifyinet character varying(128) NOT NULL, - PRIMARY KEY (markerid, userid, workername) -); - -ALTER TABLE ONLY eventlog - ADD COLUMN poolinstance character varying(256) NOT NULL; - -ALTER TABLE ONLY auths - ADD COLUMN poolinstance character varying(256) DEFAULT ''::character varying NOT NULL; - -ALTER TABLE ONLY auths - ALTER COLUMN poolinstance DROP DEFAULT; - -ALTER TABLE ONLY userstats - ADD COLUMN workername character varying(256) NOT NULL; - -ALTER TABLE ONLY poolstats - ALTER COLUMN elapsed DROP DEFAULT; - -END transaction; diff --git a/sql/v0.3-v0.4.sql b/sql/v0.3-v0.4.sql deleted file mode 100644 index a6964ee4..00000000 --- a/sql/v0.3-v0.4.sql +++ /dev/null @@ -1,41 +0,0 @@ -SET SESSION AUTHORIZATION 'postgres'; - -BEGIN transaction; - -DO $$ -DECLARE ver TEXT; -BEGIN - - UPDATE version set version='0.4' where vlock=1 and version='0.3'; - - IF found THEN - RETURN; - END IF; - - SELECT version into ver from version - WHERE vlock=1; - - RAISE EXCEPTION 'Wrong DB version - expect "0.3" - found "%"', ver; - -END $$; - -DROP TABLE userstats; - -CREATE TABLE userstats ( - userid bigint NOT NULL, - workername character varying(256) NOT NULL, - elapsed bigint NOT NULL, - hashrate float NOT NULL, - hashrate5m float NOT NULL, - hashrate1hr float NOT NULL, - hashrate24hr float NOT NULL, - summarylevel char NOT NULL, - statsdate timestamp with time zone NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) DEFAULT ''::character varying NOT NULL, - createcode character varying(128) DEFAULT ''::character varying NOT NULL, - createinet character varying(128) DEFAULT ''::character varying NOT NULL, - PRIMARY KEY (userid, workername, summarylevel, statsdate) -); - -END transaction; diff --git a/sql/v0.4-v0.5.sql b/sql/v0.4-v0.5.sql deleted file mode 100644 index 57e74b69..00000000 --- a/sql/v0.4-v0.5.sql +++ /dev/null @@ -1,42 +0,0 @@ -SET SESSION AUTHORIZATION 'postgres'; - -BEGIN transaction; - -DO $$ -DECLARE ver TEXT; -BEGIN - - UPDATE version set version='0.5' where vlock=1 and version='0.4'; - - IF found THEN - RETURN; - END IF; - - SELECT version into ver from version - WHERE vlock=1; - - RAISE EXCEPTION 'Wrong DB version - expect "0.4" - found "%"', ver; - -END $$; - -DROP TABLE userstats; - -CREATE TABLE userstats ( - userid bigint NOT NULL, - workername character varying(256) NOT NULL, - elapsed bigint NOT NULL, - hashrate float NOT NULL, - hashrate5m float NOT NULL, - hashrate1hr float NOT NULL, - hashrate24hr float NOT NULL, - summarylevel char NOT NULL, - summarycount integer NOT NULL, - statsdate timestamp with time zone NOT NULL, - createdate timestamp with time zone NOT NULL, - createby character varying(64) DEFAULT ''::character varying NOT NULL, - createcode character varying(128) DEFAULT ''::character varying NOT NULL, - createinet character varying(128) DEFAULT ''::character varying NOT NULL, - PRIMARY KEY (userid, workername, summarylevel, statsdate) -); - -END transaction; diff --git a/sql/v0.6.txt b/sql/v0.6.txt deleted file mode 100644 index e3f6b032..00000000 --- a/sql/v0.6.txt +++ /dev/null @@ -1,17 +0,0 @@ -To update to 0.6: -1) stop ckdb and ckpool -2) connect to postgres and delete the contents of all the tables except: - users, idcontrol, version - e.g. in psql to see all the table row counts: \i tables.sql - and to delete the rows from a table: delete from auths; -3) in pgsql: update version set version='0.6'; -4) rename all your ckdb20140*.log files using the src/relog.sh script -5) start ckdb -6) wait for ckdb to say it is ready - i.e. wait for it to reload all the - ckdb20140*.log files -7) start ckpool - -You can speed up step 6) if you don't care about losing the contents of the -DB forever: just go to the log directory and rename all the ckdb20140*.log -files something like: rename ".log" ".ignore" ckdb20140*.log -(or delete them) diff --git a/src/ckdb.c b/src/ckdb.c index 586b729a..0ba6f981 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -41,7 +41,8 @@ * to ensure all code using those trees/lists use locks * This code's lock implementation is equivalent to table level locking * Consider adding row level locking (a per kitem usage count) if needed - * */ + * TODO: verify all tables with multuthread access are locked + */ #define DB_VLOCK "1" #define DB_VERSION "0.6" @@ -102,11 +103,11 @@ static char *restorefrom; * in the CCLs and thus where to stop processing the CCLs to stay in * sync with ckpool * If ckpool isn't running, then the reload will complete at the end of - * the last CCL file, however if a message arrives from ckpool while + * the last CCL file, however if the 1st message arrives from ckpool while * processing the CCLs, that will mark the point where to stop processing * but can also produce a fatal error at the end of processing, reporting - * the full ckpool message, if the message was not found in the CCL - * processing after the message was received + * the ckpool message, if the message was not found in the CCL processing + * after the message was received * This can be caused by two circumstances: * 1) the disk had not yet written it to the CCL when ckdb read EOF and * ckpool was started at about the same time as the reload completed. @@ -416,15 +417,15 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; } while (0) // Override _row defaults if transfer fields are present -#define HISTORYDATETRANSFER(_row) do { \ +#define HISTORYDATETRANSFER(_root, _row) do { \ K_ITEM *item; \ - item = optional_name("createby", 1, NULL); \ + item = optional_name(_root, "createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ - item = optional_name("createcode", 1, NULL); \ + item = optional_name(_root, "createcode", 1, NULL); \ if (item) \ STRNCPY(_row->createcode, DATA_TRANSFER(item)->data); \ - item = optional_name("createinet", 1, NULL); \ + item = optional_name(_root, "createinet", 1, NULL); \ if (item) \ STRNCPY(_row->createinet, DATA_TRANSFER(item)->data); \ } while (0) @@ -569,21 +570,21 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; #define SIMPLEDATEDEFAULT(_row, _cd) do { \ _row->createdate.tv_sec = (_cd)->tv_sec; \ _row->createdate.tv_usec = (_cd)->tv_usec; \ - STRNCPY(_row->createby, (char *)"code"); \ + STRNCPY(_row->createby, by_default); \ STRNCPY(_row->createcode, (char *)__func__); \ - STRNCPY(_row->createinet, (char *)"127.0.0.1"); \ + STRNCPY(_row->createinet, inet_default); \ } while (0) // Override _row defaults if transfer fields are present -#define SIMPLEDATETRANSFER(_row) do { \ +#define SIMPLEDATETRANSFER(_root, _row) do { \ K_ITEM *item; \ - item = optional_name("createby", 1, NULL); \ + item = optional_name(_root, "createby", 1, NULL); \ if (item) \ STRNCPY(_row->createby, DATA_TRANSFER(item)->data); \ - item = optional_name("createcode", 1, NULL); \ + item = optional_name(_root, "createcode", 1, NULL); \ if (item) \ STRNCPY(_row->createcode, DATA_TRANSFER(item)->data); \ - item = optional_name("createinet", 1, NULL); \ + item = optional_name(_root, "createinet", 1, NULL); \ if (item) \ STRNCPY(_row->createinet, DATA_TRANSFER(item)->data); \ } while (0) @@ -626,17 +627,23 @@ static const tv_t date_begin = { DATE_BEGIN, 0L }; } \ } while (0) -// Different input data handling -static bool reloading = false; +// DB users,workers,auth load is complete +static bool db_auths_complete = false; // DB load is complete static bool db_load_complete = false; +// Different input data handling +static bool reloading = false; // Data load is complete static bool startup_complete = false; -// Tell the summarizer to die -static bool summarizer_die = false; +// Tell everyone to die +static bool everyone_die = false; + +static cklock_t fpm_lock; +static char *first_pool_message; +static sem_t socketer_sem; static const char *userpatt = "^[!-~]*$"; // no spaces -static const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.]*[A-Za-z0-9]$"; +static const char *mailpatt = "^[A-Za-z0-9_-][A-Za-z0-9_\\.-]*@[A-Za-z0-9][A-Za-z0-9\\.-]*[A-Za-z0-9]$"; static const char *idpatt = "^[_A-Za-z][_A-Za-z0-9]*$"; static const char *intpatt = "^[0-9][0-9]*$"; static const char *hashpatt = "^[A-Fa-f0-9]*$"; @@ -650,6 +657,55 @@ static const char *hashpatt = "^[A-Fa-f0-9]*$"; #define STR_SHAREERRORS "shareerror" #define STR_AGEWORKINFO "ageworkinfo" +static char *by_default = "code"; +static char *inet_default = "127.0.0.1"; + +enum cmd_values { + CMD_UNSET, + CMD_REPLY, // Means something was wrong - send back reply + CMD_SHUTDOWN, + CMD_PING, + CMD_SHARELOG, + CMD_AUTH, + CMD_ADDUSER, + CMD_CHKPASS, + CMD_POOLSTAT, + CMD_USERSTAT, + CMD_BLOCK, + CMD_NEWID, + CMD_PAYMENTS, + CMD_WORKERS, + CMD_ALLUSERS, + CMD_HOMEPAGE, + CMD_DSP, + CMD_STATS, + CMD_END +}; + +// WORKQUEUE +typedef struct workqueue { + char *buf; + int which_cmds; + enum cmd_values cmdnum; + char cmd[CMD_SIZ+1]; + char id[ID_SIZ+1]; + tv_t now; + char by[TXT_SML+1]; + char code[TXT_MED+1]; + char inet[TXT_MED+1]; + tv_t cd; + K_TREE *trf_root; + K_STORE *trf_store; +} WORKQUEUE; + +#define ALLOC_WORKQUEUE 1024 +#define LIMIT_WORKQUEUE 0 +#define DATA_WORKQUEUE(_item) ((WORKQUEUE *)(_item->data)) + +static K_LIST *workqueue_free; +static K_STORE *workqueue_store; +static sem_t workqueue_sem; + // TRANSFER #define NAME_SIZE 63 #define VALUE_SIZE 1023 @@ -663,9 +719,7 @@ typedef struct transfer { #define LIMIT_TRANSFER 0 #define DATA_TRANSFER(_item) ((TRANSFER *)(_item->data)) -static K_TREE *transfer_root; static K_LIST *transfer_free; -static K_STORE *transfer_store; // older version missing field defaults static TRANSFER auth_1 = { "poolinstance", "", auth_1.value }; @@ -1331,7 +1385,7 @@ static cmp_t cmp_transfer(K_ITEM *a, K_ITEM *b) DATA_TRANSFER(b)->name); } -static K_ITEM *find_transfer(char *name) +static K_ITEM *find_transfer(K_TREE *trf_root, char *name) { TRANSFER transfer; K_TREE_CTX ctx[1]; @@ -1339,17 +1393,17 @@ static K_ITEM *find_transfer(char *name) STRNCPY(transfer.name, name); look.data = (void *)(&transfer); - return find_in_ktree(transfer_root, &look, cmp_transfer, ctx); + return find_in_ktree(trf_root, &look, cmp_transfer, ctx); } -static K_ITEM *optional_name(char *name, int len, char *patt) +static K_ITEM *optional_name(K_TREE *trf_root, char *name, int len, char *patt) { K_ITEM *item; char *value; regex_t re; int ret; - item = find_transfer(name); + item = find_transfer(trf_root, name); if (!item) return NULL; @@ -1371,12 +1425,12 @@ static K_ITEM *optional_name(char *name, int len, char *patt) return item; } -#define require_name(_name, _len, _patt, _reply, _siz) \ - _require_name(_name, _len, _patt, _reply, _siz, \ - WHERE_FFL_HERE) +#define require_name(_root, _name, _len, _patt, _reply, _siz) \ + _require_name(_root, _name, _len, _patt, _reply, \ + _siz, WHERE_FFL_HERE) -static K_ITEM *_require_name(char *name, int len, char *patt, char *reply, - size_t siz, WHERE_FFL_ARGS) +static K_ITEM *_require_name(K_TREE *trf_root, char *name, int len, char *patt, + char *reply, size_t siz, WHERE_FFL_ARGS) { K_ITEM *item; char *value; @@ -1384,7 +1438,7 @@ static K_ITEM *_require_name(char *name, int len, char *patt, char *reply, size_t dlen; int ret; - item = find_transfer(name); + item = find_transfer(trf_root, name); if (!item) { LOGERR("%s(): failed, field '%s' missing from %s():%d", __func__, name, func, line); @@ -1742,6 +1796,7 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, tv_t *cd, char *by, char *code, char *inet) { ExecStatusType rescode; + bool conned = false; PGresult *res; char qry[1024]; char *params[5]; @@ -1757,6 +1812,11 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, "where idname='%s' for update", idname); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + res = PQexec(conn, qry); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -1811,6 +1871,8 @@ static int64_t nextid(PGconn *conn, char *idname, int64_t increment, free(params[n]); cleanup: PQclear(res); + if (conned) + PQfinish(conn); return lastid; } @@ -1830,13 +1892,16 @@ static K_ITEM *get_workerstatus(int64_t userid, char *workername) { WORKERSTATUS workerstatus; K_TREE_CTX ctx[1]; - K_ITEM look; + K_ITEM look, *find; workerstatus.userid = userid; STRNCPY(workerstatus.workername, workername); look.data = (void *)(&workerstatus); - return find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx); + K_RLOCK(workerstatus_free); + find = find_in_ktree(workerstatus_root, &look, cmp_workerstatus, ctx); + K_RUNLOCK(workerstatus_free); + return find; } static K_ITEM *_find_create_workerstatus(int64_t userid, char *workername, bool create) @@ -1984,6 +2049,7 @@ static cmp_t cmp_userid(K_ITEM *a, K_ITEM *b) return c; } +// Must be R or W locked before call static K_ITEM *find_users(char *username) { USERS users; @@ -1998,6 +2064,7 @@ static K_ITEM *find_users(char *username) return find_in_ktree(users_root, &look, cmp_users, ctx); } +// Must be R or W locked before call static K_ITEM *find_userid(int64_t userid) { USERS users; @@ -2016,6 +2083,7 @@ static bool users_add(PGconn *conn, char *username, char *emailaddress, char *pa tv_t *now, char *by, char *code, char *inet) { ExecStatusType rescode; + bool conned = false; PGresult *res; K_ITEM *item; int n; @@ -2072,6 +2140,11 @@ static bool users_add(PGconn *conn, char *username, char *emailaddress, char *pa "secondaryuserid" HISTORYDATECONTROL ") values (" PQPARAM11 ")"; + if (!conn) { + conn = dbconnect(); + conned = true; + } + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -2082,6 +2155,8 @@ static bool users_add(PGconn *conn, char *username, char *emailaddress, char *pa ok = true; unparam: PQclear(res); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); unitem: @@ -2245,6 +2320,7 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, char *code, char *inet, tv_t *cd) { ExecStatusType rescode; + bool conned = false; PGresult *res; K_ITEM *item, *ret = NULL; int n; @@ -2263,6 +2339,11 @@ static K_ITEM *workers_add(PGconn *conn, int64_t userid, char *workername, row = DATA_WORKERS(item); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + row->workerid = nextid(conn, "workerid", (int64_t)1, cd, by, code, inet); if (row->workerid == 0) goto unitem; @@ -2329,6 +2410,8 @@ unparam: for (n = 0; n < par; n++) free(params[n]); unitem: + if (conned) + PQfinish(conn); K_WLOCK(workers_free); if (!ret) k_add_head(workers_free, item); @@ -2346,6 +2429,7 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, char *by, char *code, char *inet, tv_t *cd) { ExecStatusType rescode; + bool conned = false; PGresult *res; int n; WORKERS *row; @@ -2402,6 +2486,11 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + res = PQexec(conn, "Begin"); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -2457,6 +2546,8 @@ static bool workers_update(PGconn *conn, K_ITEM *item, char *difficultydefault, ok = true; unparam: PQclear(res); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); early: @@ -2515,7 +2606,9 @@ static K_ITEM *new_worker_find_user(PGconn *conn, bool update, char *username, { K_ITEM *item; + K_RLOCK(users_free); item = find_users(username); + K_RUNLOCK(users_free); if (!item) return NULL; @@ -2850,12 +2943,15 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc char *transactiontree, char *merklehash, char *prevhash, char *coinbase1, char *coinbase2, char *version, char *bits, char *ntime, char *reward, char *by, - char *code, char *inet, tv_t *cd, bool igndup) + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { ExecStatusType rescode; + bool conned = false; K_TREE_CTX ctx[1]; PGresult *res; K_ITEM *item; + char cd_buf[DATE_BUFSIZ]; int n; int64_t workinfoid = -1; WORKINFO *row; @@ -2884,15 +2980,23 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc TXT_TO_BIGINT("reward", reward, row->reward); HISTORYDATEINIT(row, cd, by, code, inet); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); - if (igndup && find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { + K_WLOCK(workinfo_free); + if (find_in_ktree(workinfo_root, item, cmp_workinfo, ctx)) { workinfoid = row->workinfoid; - K_WLOCK(workinfo_free); k_add_head(workinfo_free, item); K_WUNLOCK(workinfo_free); + + if (!igndup) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s(): Duplicate workinfo ignored %s/%s/%s", + __func__, workinfoidstr, poolinstance, cd_buf); + } + return workinfoid; } + K_WUNLOCK(workinfo_free); par = 0; params[par++] = bigint_to_buf(row->workinfoid, NULL, 0); @@ -2914,6 +3018,11 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc "prevhash,coinbase1,coinbase2,version,bits,ntime,reward" HISTORYDATECONTROL ") values (" PQPARAM16 ")"; + if (!conn) { + conn = dbconnect(); + conned = true; + } + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -2925,6 +3034,8 @@ static int64_t workinfo_add(PGconn *conn, char *workinfoidstr, char *poolinstanc unparam: PQclear(res); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); @@ -3271,9 +3382,10 @@ static void zero_sharesummary(SHARESUMMARY *row, tv_t *cd) static K_ITEM *find_sharesummary(int64_t userid, char *workername, int64_t workinfoid); // Memory (and log file) only -static bool shares_add(char *workinfoid, char *username, char *workername, char *clientid, - char *enonce1, char *nonce2, char *nonce, char *diff, char *sdiff, - char *secondaryuserid, char *by, char *code, char *inet, tv_t *cd) +static bool shares_add(PGconn *conn, char *workinfoid, char *username, char *workername, + char *clientid, char *enonce1, char *nonce2, char *nonce, + char *diff, char *sdiff, char *secondaryuserid, char *by, + char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHARES *shares; @@ -3288,7 +3400,9 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char shares = DATA_SHARES(s_item); // TODO: allow BTC address later? + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -3305,13 +3419,13 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char STRNCPY(shares->secondaryuserid, secondaryuserid); HISTORYDATEINIT(shares, cd, by, code, inet); - HISTORYDATETRANSFER(shares); + HISTORYDATETRANSFER(trf_root, shares); wi_item = find_workinfo(shares->workinfoid); if (!wi_item) goto unitem; - w_item = new_default_worker(NULL, false, shares->userid, shares->workername, + w_item = new_default_worker(conn, false, shares->userid, shares->workername, by, code, inet, cd); if (!w_item) goto unitem; @@ -3335,7 +3449,7 @@ static bool shares_add(char *workinfoid, char *username, char *workername, char workerstatus_update(NULL, shares, NULL, NULL); - sharesummary_update(NULL, shares, NULL, NULL, by, code, inet, cd); + sharesummary_update(conn, shares, NULL, NULL, by, code, inet, cd); ok = true; unitem: @@ -3381,9 +3495,10 @@ static cmp_t cmp_shareerrors(K_ITEM *a, K_ITEM *b) // Memory (and log file) only // TODO: handle shareerrors that appear after a workinfoid is aged or doesn't exist? -static bool shareerrors_add(char *workinfoid, char *username, char *workername, - char *clientid, char *errn, char *error, char *secondaryuserid, - char *by, char *code, char *inet, tv_t *cd) +static bool shareerrors_add(PGconn *conn, char *workinfoid, char *username, + char *workername, char *clientid, char *errn, + char *error, char *secondaryuserid, char *by, + char *code, char *inet, tv_t *cd, K_TREE *trf_root) { K_ITEM *s_item, *u_item, *wi_item, *w_item, *ss_item; SHAREERRORS *shareerrors; @@ -3398,7 +3513,9 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername, shareerrors = DATA_SHAREERRORS(s_item); // TODO: allow BTC address later? + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -3412,7 +3529,7 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername, STRNCPY(shareerrors->secondaryuserid, secondaryuserid); HISTORYDATEINIT(shareerrors, cd, by, code, inet); - HISTORYDATETRANSFER(shareerrors); + HISTORYDATETRANSFER(trf_root, shareerrors); wi_item = find_workinfo(shareerrors->workinfoid); if (!wi_item) @@ -3440,7 +3557,7 @@ static bool shareerrors_add(char *workinfoid, char *username, char *workername, } } - sharesummary_update(NULL, NULL, shareerrors, NULL, by, code, inet, cd); + sharesummary_update(conn, NULL, shareerrors, NULL, by, code, inet, cd); ok = true; unitem: @@ -4039,9 +4156,10 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, char *workername, char *clientid, char *enonce1, char *nonce2, char *nonce, char *reward, char *by, char *code, char *inet, tv_t *cd, - bool igndup, char *id) + bool igndup, char *id, K_TREE *trf_root) { ExecStatusType rescode; + bool conned = false; PGresult *res = NULL; K_TREE_CTX ctx[1]; K_ITEM *item, *u_item; @@ -4068,7 +4186,9 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, switch (confirmed[0]) { case BLOCKS_NEW: + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) row->userid = KANO; else @@ -4082,7 +4202,7 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, STRNCPY(row->nonce, nonce); TXT_TO_BIGINT("reward", reward, row->reward); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); if (igndup && find_in_ktree(blocks_root, item, cmp_blocks, ctx)) { K_WLOCK(blocks_free); @@ -4111,6 +4231,11 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, "clientid,enonce1,nonce2,nonce,reward,confirmed" HISTORYDATECONTROL ") values (" PQPARAM16 ")"; + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -4127,6 +4252,11 @@ static bool blocks_add(PGconn *conn, char *height, char *blockhash, params[par++] = tv_to_buf((tv_t *)&default_expiry, NULL, 0); PARCHKVAL(par, 3, params); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + res = PQexec(conn, "Begin"); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -4186,6 +4316,9 @@ unparam: for (n = 0; n < par; n++) free(params[n]); flail: + if (conned) + PQfinish(conn); + K_WLOCK(blocks_free); if (!ok) k_add_head(blocks_free, item); @@ -4362,12 +4495,14 @@ static cmp_t cmp_auths(K_ITEM *a, K_ITEM *b) static char *auths_add(PGconn *conn, char *poolinstance, char *username, char *workername, char *clientid, char *enonce1, char *useragent, char *by, char *code, char *inet, - tv_t *cd, bool igndup) + tv_t *cd, bool igndup, K_TREE *trf_root) { ExecStatusType rescode; + bool conned = false; PGresult *res; K_TREE_CTX ctx[1]; K_ITEM *a_item, *u_item; + char cd_buf[DATE_BUFSIZ]; int n; AUTHS *row; char *ins; @@ -4383,7 +4518,9 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, row = DATA_AUTHS(a_item); + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) goto unitem; @@ -4399,18 +4536,31 @@ static char *auths_add(PGconn *conn, char *poolinstance, char *username, STRNCPY(row->useragent, useragent); HISTORYDATEINIT(row, cd, by, code, inet); - HISTORYDATETRANSFER(row); + HISTORYDATETRANSFER(trf_root, row); - if (igndup && find_in_ktree(auths_root, a_item, cmp_auths, ctx)) { - K_WLOCK(auths_free); + K_WLOCK(auths_free); + if (find_in_ktree(auths_root, a_item, cmp_auths, ctx)) { k_add_head(auths_free, a_item); K_WUNLOCK(auths_free); + + if (!igndup) { + tv_to_buf(cd, cd_buf, sizeof(cd_buf)); + LOGERR("%s(): Duplicate auths ignored %s/%s/%s", + __func__, poolinstance, workername, cd_buf); + } + return DATA_USERS(u_item)->secondaryuserid; } + K_WUNLOCK(auths_free); // Update even if DB fails workerstatus_update(row, NULL, NULL, NULL); + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + row->authid = nextid(conn, "authid", (int64_t)1, cd, by, code, inet); if (row->authid == 0) goto unitem; @@ -4444,6 +4594,8 @@ unparam: for (n = 0; n < par; n++) free(params[n]); unitem: + if (conned) + PQfinish(conn); K_WLOCK(auths_free); if (!secuserid) k_add_head(auths_free, a_item); @@ -4592,9 +4744,10 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, char *by, char *code, char *inet, tv_t *cd, - bool igndup) + bool igndup, K_TREE *trf_root) { ExecStatusType rescode; + bool conned = false; PGresult *res; K_TREE_CTX ctx[1]; K_ITEM *p_item; @@ -4625,7 +4778,7 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, TXT_TO_DOUBLE("hashrate24hr", hashrate24hr, row->hashrate24hr); SIMPLEDATEINIT(row, cd, by, code, inet); - SIMPLEDATETRANSFER(row); + SIMPLEDATETRANSFER(trf_root, row); if (igndup && find_in_ktree(poolstats_root, p_item, cmp_poolstats, ctx)) { K_WLOCK(poolstats_free); @@ -4652,6 +4805,11 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, "hashrate5m,hashrate1hr,hashrate24hr" SIMPLEDATECONTROL ") values (" PQPARAM12 ")"; + if (!conn) { + conn = dbconnect(); + conned = true; + } + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -4666,6 +4824,8 @@ static bool poolstats_add(PGconn *conn, bool store, char *poolinstance, unparam: if (store) { PQclear(res); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); } @@ -4909,6 +5069,7 @@ static cmp_t cmp_userstats_workerstatus(K_ITEM *a, K_ITEM *b) static bool userstats_add_db(PGconn *conn, USERSTATS *row) { ExecStatusType rescode; + bool conned = false; PGresult *res; char *ins; bool ok = false; @@ -4937,6 +5098,11 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row) "hashrate24hr,summarylevel,summarycount,statsdate" SIMPLEDATECONTROL ") values (" PQPARAM14 ")"; + if (conn == NULL) { + conn = dbconnect(); + conned = true; + } + res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); if (!PGOK(rescode)) { @@ -4947,6 +5113,8 @@ static bool userstats_add_db(PGconn *conn, USERSTATS *row) ok = true; unparam: PQclear(res); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); @@ -4956,7 +5124,8 @@ unparam: static bool userstats_add(char *poolinstance, char *elapsed, char *username, char *workername, char *hashrate, char *hashrate5m, char *hashrate1hr, char *hashrate24hr, bool idle, - bool eos, char *by, char *code, char *inet, tv_t *cd) + bool eos, char *by, char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { K_ITEM *us_item, *u_item, *us_match, *us_next, look; tv_t eosdate; @@ -4973,7 +5142,9 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, STRNCPY(row->poolinstance, poolinstance); TXT_TO_BIGINT("elapsed", elapsed, row->elapsed); + K_RLOCK(users_free); u_item = find_users(username); + K_RUNLOCK(users_free); if (!u_item) return false; row->userid = DATA_USERS(u_item)->userid; @@ -4987,7 +5158,7 @@ static bool userstats_add(char *poolinstance, char *elapsed, char *username, row->summarylevel[1] = '\0'; row->summarycount = 1; SIMPLEDATEINIT(row, cd, by, code, inet); - SIMPLEDATETRANSFER(row); + SIMPLEDATETRANSFER(trf_root, row); copy_tv(&(row->statsdate), &(row->createdate)); if (eos) { @@ -5339,7 +5510,10 @@ static bool check_db_version(PGconn *conn) return true; } -static bool getdata() +/* Load tables required to support auths,adduser,chkpass and newid + * N.B. idcontrol is DB internal so is always ready + */ +static bool getdata1() { PGconn *conn = dbconnect(); bool ok = true; @@ -5350,33 +5524,44 @@ static bool getdata() goto matane; if (!(ok = workers_fill(conn))) goto matane; + ok = auths_fill(conn); + +matane: + + PQfinish(conn); + return ok; +} + +static bool getdata2() +{ + PGconn *conn = dbconnect(); + bool ok = true; + + if (!(ok = blocks_fill(conn))) + goto sukamudai; if (!(ok = payments_fill(conn))) - goto matane; + goto sukamudai; if (!(ok = workinfo_fill(conn))) - goto matane; + goto sukamudai; if (!(ok = shares_fill())) - goto matane; + goto sukamudai; if (!(ok = shareerrors_fill())) - goto matane; + goto sukamudai; if (!(ok = sharesummary_fill(conn))) - goto matane; - if (!(ok = blocks_fill(conn))) - goto matane; - if (!(ok = auths_fill(conn))) - goto matane; + goto sukamudai; if (!(ok = poolstats_fill(conn))) - goto matane; + goto sukamudai; ok = userstats_fill(conn); -matane: +sukamudai: PQfinish(conn); return ok; } -static void reload_from(tv_t *start); +static bool reload_from(tv_t *start); -static void reload() +static bool reload() { char buf[DATE_BUFSIZ+1]; char *filename; @@ -5445,7 +5630,7 @@ static void reload() } free(filename); } - reload_from(&start); + return reload_from(&start); } /* TODO: @@ -5544,10 +5729,16 @@ static bool setup_data() K_ITEM look, *found; WORKINFO wi; + cklock_init(&fpm_lock); + cksem_init(&workqueue_sem); + cksem_init(&socketer_sem); + + workqueue_free = k_new_list("WorkQueue", sizeof(WORKQUEUE), + ALLOC_WORKQUEUE, LIMIT_WORKQUEUE, true); + workqueue_store = k_new_store(workqueue_free); + transfer_free = k_new_list("Transfer", sizeof(TRANSFER), ALLOC_TRANSFER, LIMIT_TRANSFER, true); - transfer_store = k_new_store(transfer_free); - transfer_root = new_ktree(); transfer_free->dsp_func = dsp_transfer; users_free = k_new_list("Users", sizeof(USERS), @@ -5623,12 +5814,19 @@ static bool setup_data() workerstatus_store = k_new_store(workerstatus_free); workerstatus_root = new_ktree(); - if (!getdata()) + if (!getdata1()) + return false; + + db_auths_complete = true; + cksem_post(&socketer_sem); + + if (!getdata2()) return false; db_load_complete = true; - reload(); + if (!reload()) + return false; workerstatus_ready(); @@ -5651,36 +5849,34 @@ static bool setup_data() return true; } -static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_adduser(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, + char *code, char *inet, __maybe_unused tv_t *notcd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); K_ITEM *i_username, *i_emailaddress, *i_passwordhash; - PGconn *conn; bool ok; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); - i_emailaddress = require_name("emailaddress", 7, (char *)mailpatt, reply, siz); + i_emailaddress = require_name(trf_root, "emailaddress", 7, (char *)mailpatt, reply, siz); if (!i_emailaddress) return strdup(reply); - i_passwordhash = require_name("passwordhash", 64, (char *)hashpatt, reply, siz); + i_passwordhash = require_name(trf_root, "passwordhash", 64, (char *)hashpatt, reply, siz); if (!i_passwordhash) return strdup(reply); - conn = dbconnect(); ok = users_add(conn, DATA_TRANSFER(i_username)->data, - DATA_TRANSFER(i_emailaddress)->data, - DATA_TRANSFER(i_passwordhash)->data, - now, by, code, inet); - PQfinish(conn); + DATA_TRANSFER(i_emailaddress)->data, + DATA_TRANSFER(i_passwordhash)->data, + now, by, code, inet); if (!ok) { LOGERR("%s.failed.DBE", id); @@ -5691,9 +5887,10 @@ static char *cmd_adduser(char *cmd, char *id, tv_t *now, char *by, char *code, c return strdup(reply); } -static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_chkpass(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *i_passwordhash, *u_item; char reply[1024] = ""; @@ -5702,15 +5899,17 @@ static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); - i_passwordhash = require_name("passwordhash", 64, (char *)hashpatt, reply, siz); + i_passwordhash = require_name(trf_root, "passwordhash", 64, (char *)hashpatt, reply, siz); if (!i_passwordhash) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) ok = false; @@ -5729,13 +5928,13 @@ static char *cmd_chkpass(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ return strdup("ok."); } -static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, - char *inet, tv_t *cd, bool igndup) +static char *cmd_poolstats_do(PGconn *conn, char *cmd, char *id, char *by, + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); K_TREE_CTX ctx[1]; - PGconn *conn; bool store; // log to logfile @@ -5748,35 +5947,35 @@ static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_elapsed = optional_name("elapsed", 1, NULL); + i_elapsed = optional_name(trf_root, "elapsed", 1, NULL); if (!i_elapsed) i_elapsed = &poolstats_elapsed; - i_users = require_name("users", 1, NULL, reply, siz); + i_users = require_name(trf_root, "users", 1, NULL, reply, siz); if (!i_users) return strdup(reply); - i_workers = require_name("workers", 1, NULL, reply, siz); + i_workers = require_name(trf_root, "workers", 1, NULL, reply, siz); if (!i_workers) return strdup(reply); - i_hashrate = require_name("hashrate", 1, NULL, reply, siz); + i_hashrate = require_name(trf_root, "hashrate", 1, NULL, reply, siz); if (!i_hashrate) return strdup(reply); - i_hashrate5m = require_name("hashrate5m", 1, NULL, reply, siz); + i_hashrate5m = require_name(trf_root, "hashrate5m", 1, NULL, reply, siz); if (!i_hashrate5m) return strdup(reply); - i_hashrate1hr = require_name("hashrate1hr", 1, NULL, reply, siz); + i_hashrate1hr = require_name(trf_root, "hashrate1hr", 1, NULL, reply, siz); if (!i_hashrate1hr) return strdup(reply); - i_hashrate24hr = require_name("hashrate24hr", 1, NULL, reply, siz); + i_hashrate24hr = require_name(trf_root, "hashrate24hr", 1, NULL, reply, siz); if (!i_hashrate24hr) return strdup(reply); @@ -5803,7 +6002,6 @@ static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, store = false; } - conn = dbconnect(); ok = poolstats_add(conn, store, DATA_TRANSFER(i_poolinstance)->data, DATA_TRANSFER(i_elapsed)->data, DATA_TRANSFER(i_users)->data, @@ -5812,8 +6010,7 @@ static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - by, code, inet, cd, igndup); - PQfinish(conn); + by, code, inet, cd, igndup, trf_root); if (!ok) { LOGERR("%s.failed.DBE", id); @@ -5824,8 +6021,10 @@ static char *cmd_poolstats_do(char *cmd, char *id, char *by, char *code, return strdup(reply); } -static char *cmd_poolstats(char *cmd, char *id, __maybe_unused tv_t *notnow, - char *by, char *code, char *inet, tv_t *cd) +static char *cmd_poolstats(PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *notnow, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -5836,11 +6035,12 @@ static char *cmd_poolstats(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - return cmd_poolstats_do(cmd, id, by, code, inet, cd, igndup); + return cmd_poolstats_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } -static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, - char *by, char *code, char *inet, tv_t *cd) +static char *cmd_userstats(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *notnow, char *by, char *code, + char *inet, tv_t *cd, K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5854,45 +6054,45 @@ static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_elapsed = optional_name("elapsed", 1, NULL); + i_elapsed = optional_name(trf_root, "elapsed", 1, NULL); if (!i_elapsed) i_elapsed = &userstats_elapsed; - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = optional_name("workername", 1, NULL); + i_workername = optional_name(trf_root, "workername", 1, NULL); if (!i_workername) i_workername = &userstats_workername; - i_hashrate = require_name("hashrate", 1, NULL, reply, siz); + i_hashrate = require_name(trf_root, "hashrate", 1, NULL, reply, siz); if (!i_hashrate) return strdup(reply); - i_hashrate5m = require_name("hashrate5m", 1, NULL, reply, siz); + i_hashrate5m = require_name(trf_root, "hashrate5m", 1, NULL, reply, siz); if (!i_hashrate5m) return strdup(reply); - i_hashrate1hr = require_name("hashrate1hr", 1, NULL, reply, siz); + i_hashrate1hr = require_name(trf_root, "hashrate1hr", 1, NULL, reply, siz); if (!i_hashrate1hr) return strdup(reply); - i_hashrate24hr = require_name("hashrate24hr", 1, NULL, reply, siz); + i_hashrate24hr = require_name(trf_root, "hashrate24hr", 1, NULL, reply, siz); if (!i_hashrate24hr) return strdup(reply); - i_idle = optional_name("idle", 1, NULL); + i_idle = optional_name(trf_root, "idle", 1, NULL); if (!i_idle) i_idle = &userstats_idle; idle = (strcasecmp(DATA_TRANSFER(i_idle)->data, TRUE_STR) == 0); - i_eos = optional_name("eos", 1, NULL); + i_eos = optional_name(trf_root, "eos", 1, NULL); if (!i_eos) i_eos = &userstats_eos; @@ -5906,7 +6106,7 @@ static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, DATA_TRANSFER(i_hashrate5m)->data, DATA_TRANSFER(i_hashrate1hr)->data, DATA_TRANSFER(i_hashrate24hr)->data, - idle, eos, by, code, inet, cd); + idle, eos, by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -5917,8 +6117,9 @@ static char *cmd_userstats(char *cmd, char *id, __maybe_unused tv_t *notnow, return strdup(reply); } -static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, char *inet, - __maybe_unused tv_t *cd) +static char *cmd_newid(PGconn *conn, char *cmd, char *id, tv_t *now, char *by, + char *code, char *inet, __maybe_unused tv_t *cd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); @@ -5928,18 +6129,18 @@ static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, cha int par; bool ok = false; ExecStatusType rescode; + bool conned = false; PGresult *res; - PGconn *conn; char *ins; int n; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_idname = require_name("idname", 3, (char *)idpatt, reply, siz); + i_idname = require_name(trf_root, "idname", 3, (char *)idpatt, reply, siz); if (!i_idname) return strdup(reply); - i_idvalue = require_name("idvalue", 1, (char *)intpatt, reply, siz); + i_idvalue = require_name(trf_root, "idvalue", 1, (char *)intpatt, reply, siz); if (!i_idvalue) return strdup(reply); @@ -5962,7 +6163,10 @@ static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, cha ins = "insert into idcontrol " "(idname,lastid" MODIFYDATECONTROL ") values (" PQPARAM10 ")"; - conn = dbconnect(); + if (!conn) { + conn = dbconnect(); + conned = true; + } res = PQexecParams(conn, ins, par, NULL, (const char **)params, NULL, NULL, 0); rescode = PQresultStatus(res); @@ -5974,7 +6178,8 @@ static char *cmd_newid(char *cmd, char *id, tv_t *now, char *by, char *code, cha ok = true; foil: PQclear(res); - PQfinish(conn); + if (conned) + PQfinish(conn); for (n = 0; n < par; n++) free(params[n]); @@ -5992,9 +6197,11 @@ foil: return strdup(reply); } -static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_payments(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { K_ITEM *i_username, look, *u_item, *p_item; K_TREE_CTX ctx[1]; @@ -6008,11 +6215,13 @@ static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) return strdup("bad"); @@ -6047,9 +6256,10 @@ static char *cmd_payments(char *cmd, char *id, __maybe_unused tv_t *now, __maybe return buf; } -static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_workers(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *i_stats, wlook, *u_item, *w_item, uslook, *us_item, *ws_item; K_TREE_CTX w_ctx[1], us_ctx[1]; @@ -6065,15 +6275,17 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = require_name("username", 3, (char *)userpatt, reply, siz); + i_username = require_name(trf_root, "username", 3, (char *)userpatt, reply, siz); if (!i_username) return strdup(reply); + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); if (!u_item) return strdup("bad"); - i_stats = optional_name("stats", 1, NULL); + i_stats = optional_name(trf_root, "stats", 1, NULL); if (!i_stats) stats = false; else @@ -6184,9 +6396,11 @@ static char *cmd_workers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_ return buf; } -static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_allusers(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { K_TREE *userstats_workername_root = new_ktree(); K_ITEM *us_item, *usw_item, *tmp_item, *u_item; @@ -6227,7 +6441,9 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe while (usw_item) { if (DATA_USERSTATS(usw_item)->userid != userid) { if (userid != -1) { + K_RLOCK(users_free); u_item = find_userid(userid); + K_RUNLOCK(users_free); if (!u_item) { LOGERR("%s() userid %"PRId64" ignored - userstats but not users", __func__, userid); @@ -6258,7 +6474,9 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe k_add_head(userstats_free, tmp_item); } if (userid != -1) { + K_RLOCK(users_free); u_item = find_userid(userid); + K_RUNLOCK(users_free); if (!u_item) { LOGERR("%s() userid %"PRId64" ignored - userstats but not users", __func__, userid); @@ -6289,12 +6507,13 @@ static char *cmd_allusers(char *cmd, char *id, __maybe_unused tv_t *now, __maybe return buf; } -static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, - char *by, char *code, char *inet, tv_t *cd) +static char *cmd_sharelog(PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *notnow, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); - PGconn *conn; // log to logfile with processing success/failure code @@ -6314,51 +6533,50 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - i_transactiontree = require_name("transactiontree", 0, NULL, reply, siz); + i_transactiontree = require_name(trf_root, "transactiontree", 0, NULL, reply, siz); if (!i_transactiontree) return strdup(reply); - i_merklehash = require_name("merklehash", 0, NULL, reply, siz); + i_merklehash = require_name(trf_root, "merklehash", 0, NULL, reply, siz); if (!i_merklehash) return strdup(reply); - i_prevhash = require_name("prevhash", 1, NULL, reply, siz); + i_prevhash = require_name(trf_root, "prevhash", 1, NULL, reply, siz); if (!i_prevhash) return strdup(reply); - i_coinbase1 = require_name("coinbase1", 1, NULL, reply, siz); + i_coinbase1 = require_name(trf_root, "coinbase1", 1, NULL, reply, siz); if (!i_coinbase1) return strdup(reply); - i_coinbase2 = require_name("coinbase2", 1, NULL, reply, siz); + i_coinbase2 = require_name(trf_root, "coinbase2", 1, NULL, reply, siz); if (!i_coinbase2) return strdup(reply); - i_version = require_name("version", 1, NULL, reply, siz); + i_version = require_name(trf_root, "version", 1, NULL, reply, siz); if (!i_version) return strdup(reply); - i_bits = require_name("bits", 1, NULL, reply, siz); + i_bits = require_name(trf_root, "bits", 1, NULL, reply, siz); if (!i_bits) return strdup(reply); - i_ntime = require_name("ntime", 1, NULL, reply, siz); + i_ntime = require_name(trf_root, "ntime", 1, NULL, reply, siz); if (!i_ntime) return strdup(reply); - i_reward = require_name("reward", 1, NULL, reply, siz); + i_reward = require_name(trf_root, "reward", 1, NULL, reply, siz); if (!i_reward) return strdup(reply); - conn = dbconnect(); workinfoid = workinfo_add(conn, DATA_TRANSFER(i_workinfoid)->data, DATA_TRANSFER(i_poolinstance)->data, DATA_TRANSFER(i_transactiontree)->data, @@ -6370,8 +6588,7 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, DATA_TRANSFER(i_bits)->data, DATA_TRANSFER(i_ntime)->data, DATA_TRANSFER(i_reward)->data, - by, code, inet, cd, igndup); - PQfinish(conn); + by, code, inet, cd, igndup, trf_root); if (workinfoid == -1) { LOGERR("%s.failed.DBE", id); @@ -6391,57 +6608,57 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_nonce2 = require_name("nonce2", 1, NULL, reply, siz); + i_nonce2 = require_name(trf_root, "nonce2", 1, NULL, reply, siz); if (!i_nonce2) return strdup(reply); - i_nonce = require_name("nonce", 1, NULL, reply, siz); + i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); if (!i_nonce) return strdup(reply); - i_diff = require_name("diff", 1, NULL, reply, siz); + i_diff = require_name(trf_root, "diff", 1, NULL, reply, siz); if (!i_diff) return strdup(reply); - i_sdiff = require_name("sdiff", 1, NULL, reply, siz); + i_sdiff = require_name(trf_root, "sdiff", 1, NULL, reply, siz); if (!i_sdiff) return strdup(reply); - i_secondaryuserid = require_name("secondaryuserid", 1, NULL, reply, siz); + i_secondaryuserid = require_name(trf_root, "secondaryuserid", 1, NULL, reply, siz); if (!i_secondaryuserid) return strdup(reply); - ok = shares_add(DATA_TRANSFER(i_workinfoid)->data, - DATA_TRANSFER(i_username)->data, - DATA_TRANSFER(i_workername)->data, - DATA_TRANSFER(i_clientid)->data, - DATA_TRANSFER(i_enonce1)->data, - DATA_TRANSFER(i_nonce2)->data, - DATA_TRANSFER(i_nonce)->data, - DATA_TRANSFER(i_diff)->data, - DATA_TRANSFER(i_sdiff)->data, - DATA_TRANSFER(i_secondaryuserid)->data, - by, code, inet, cd); + ok = shares_add(conn, DATA_TRANSFER(i_workinfoid)->data, + DATA_TRANSFER(i_username)->data, + DATA_TRANSFER(i_workername)->data, + DATA_TRANSFER(i_clientid)->data, + DATA_TRANSFER(i_enonce1)->data, + DATA_TRANSFER(i_nonce2)->data, + DATA_TRANSFER(i_nonce)->data, + DATA_TRANSFER(i_diff)->data, + DATA_TRANSFER(i_sdiff)->data, + DATA_TRANSFER(i_secondaryuserid)->data, + by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); @@ -6461,42 +6678,42 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_errn = require_name("errn", 1, NULL, reply, siz); + i_errn = require_name(trf_root, "errn", 1, NULL, reply, siz); if (!i_errn) return strdup(reply); - i_error = require_name("error", 1, NULL, reply, siz); + i_error = require_name(trf_root, "error", 1, NULL, reply, siz); if (!i_error) return strdup(reply); - i_secondaryuserid = require_name("secondaryuserid", 1, NULL, reply, siz); + i_secondaryuserid = require_name(trf_root, "secondaryuserid", 1, NULL, reply, siz); if (!i_secondaryuserid) return strdup(reply); - ok = shareerrors_add(DATA_TRANSFER(i_workinfoid)->data, - DATA_TRANSFER(i_username)->data, - DATA_TRANSFER(i_workername)->data, - DATA_TRANSFER(i_clientid)->data, - DATA_TRANSFER(i_errn)->data, - DATA_TRANSFER(i_error)->data, - DATA_TRANSFER(i_secondaryuserid)->data, - by, code, inet, cd); + ok = shareerrors_add(conn, DATA_TRANSFER(i_workinfoid)->data, + DATA_TRANSFER(i_username)->data, + DATA_TRANSFER(i_workername)->data, + DATA_TRANSFER(i_clientid)->data, + DATA_TRANSFER(i_errn)->data, + DATA_TRANSFER(i_error)->data, + DATA_TRANSFER(i_secondaryuserid)->data, + by, code, inet, cd, trf_root); if (!ok) { LOGERR("%s.failed.DATA", id); return strdup("failed.DATA"); @@ -6513,15 +6730,15 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_poolinstance = require_name("poolinstance", 1, NULL, reply, siz); + i_poolinstance = require_name(trf_root, "poolinstance", 1, NULL, reply, siz); if (!i_poolinstance) return strdup(reply); - ok = workinfo_age(NULL, DATA_TRANSFER(i_workinfoid)->data, + ok = workinfo_age(conn, DATA_TRANSFER(i_workinfoid)->data, DATA_TRANSFER(i_poolinstance)->data, by, code, inet, cd); @@ -6543,12 +6760,12 @@ static char *cmd_sharelog(char *cmd, char *id, __maybe_unused tv_t *notnow, } // TODO: the confirm update: identify block changes from workinfo height? -static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet, - tv_t *cd, bool igndup) +static char *cmd_blocks_do(PGconn *conn, char *cmd, char *id, char *by, + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); - PGconn *conn; K_ITEM *i_height, *i_blockhash, *i_confirmed, *i_workinfoid, *i_username; K_ITEM *i_workername, *i_clientid, *i_enonce1, *i_nonce2, *i_nonce, *i_reward; char *msg; @@ -6556,55 +6773,54 @@ static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_height = require_name("height", 1, NULL, reply, siz); + i_height = require_name(trf_root, "height", 1, NULL, reply, siz); if (!i_height) return strdup(reply); - i_blockhash = require_name("blockhash", 1, NULL, reply, siz); + i_blockhash = require_name(trf_root, "blockhash", 1, NULL, reply, siz); if (!i_blockhash) return strdup(reply); - i_confirmed = require_name("confirmed", 1, NULL, reply, siz); + i_confirmed = require_name(trf_root, "confirmed", 1, NULL, reply, siz); if (!i_confirmed) return strdup(reply); DATA_TRANSFER(i_confirmed)->data[0] = tolower(DATA_TRANSFER(i_confirmed)->data[0]); switch(DATA_TRANSFER(i_confirmed)->data[0]) { case BLOCKS_NEW: - i_workinfoid = require_name("workinfoid", 1, NULL, reply, siz); + i_workinfoid = require_name(trf_root, "workinfoid", 1, NULL, reply, siz); if (!i_workinfoid) return strdup(reply); - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_nonce2 = require_name("nonce2", 1, NULL, reply, siz); + i_nonce2 = require_name(trf_root, "nonce2", 1, NULL, reply, siz); if (!i_nonce2) return strdup(reply); - i_nonce = require_name("nonce", 1, NULL, reply, siz); + i_nonce = require_name(trf_root, "nonce", 1, NULL, reply, siz); if (!i_nonce) return strdup(reply); - i_reward = require_name("reward", 1, NULL, reply, siz); + i_reward = require_name(trf_root, "reward", 1, NULL, reply, siz); if (!i_reward) return strdup(reply); msg = "added"; - conn = dbconnect(); ok = blocks_add(conn, DATA_TRANSFER(i_height)->data, DATA_TRANSFER(i_blockhash)->data, DATA_TRANSFER(i_confirmed)->data, @@ -6616,17 +6832,18 @@ static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet DATA_TRANSFER(i_nonce2)->data, DATA_TRANSFER(i_nonce)->data, DATA_TRANSFER(i_reward)->data, - by, code, inet, cd, igndup, id); + by, code, inet, cd, igndup, id, + trf_root); break; case BLOCKS_CONFIRM: msg = "confirmed"; - conn = dbconnect(); ok = blocks_add(conn, DATA_TRANSFER(i_height)->data, DATA_TRANSFER(i_blockhash)->data, DATA_TRANSFER(i_confirmed)->data, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, EMPTY, - by, code, inet, cd, igndup, id); + by, code, inet, cd, igndup, id, + trf_root); break; default: LOGERR("%s(): %s.failed.invalid confirm='%s'", @@ -6634,8 +6851,6 @@ static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet return strdup("failed.DATA"); } - PQfinish(conn); - if (!ok) { LOGERR("%s.failed.DBE", id); return strdup("failed.DBE"); @@ -6646,8 +6861,10 @@ static char *cmd_blocks_do(char *cmd, char *id, char *by, char *code, char *inet return strdup(reply); } -static char *cmd_blocks(char *cmd, char *id, __maybe_unused tv_t *notnow, - char *by, char *code, char *inet, tv_t *cd) +static char *cmd_blocks(PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *notnow, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -6658,54 +6875,52 @@ static char *cmd_blocks(char *cmd, char *id, __maybe_unused tv_t *notnow, return NULL; } - return cmd_blocks_do(cmd, id, by, code, inet, cd, igndup); + return cmd_blocks_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } -static char *cmd_auth_do(char *cmd, char *id, __maybe_unused tv_t *now, char *by, - char *code, char *inet, tv_t *cd, bool igndup) +static char *cmd_auth_do(PGconn *conn, char *cmd, char *id, char *by, + char *code, char *inet, tv_t *cd, bool igndup, + K_TREE *trf_root) { char reply[1024] = ""; size_t siz = sizeof(reply); - PGconn *conn; K_ITEM *i_poolinstance, *i_username, *i_workername, *i_clientid; K_ITEM *i_enonce1, *i_useragent; char *secuserid; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_poolinstance = optional_name("poolinstance", 1, NULL); + i_poolinstance = optional_name(trf_root, "poolinstance", 1, NULL); if (!i_poolinstance) i_poolinstance = &auth_poolinstance; - i_username = require_name("username", 1, NULL, reply, siz); + i_username = require_name(trf_root, "username", 1, NULL, reply, siz); if (!i_username) return strdup(reply); - i_workername = require_name("workername", 1, NULL, reply, siz); + i_workername = require_name(trf_root, "workername", 1, NULL, reply, siz); if (!i_workername) return strdup(reply); - i_clientid = require_name("clientid", 1, NULL, reply, siz); + i_clientid = require_name(trf_root, "clientid", 1, NULL, reply, siz); if (!i_clientid) return strdup(reply); - i_enonce1 = require_name("enonce1", 1, NULL, reply, siz); + i_enonce1 = require_name(trf_root, "enonce1", 1, NULL, reply, siz); if (!i_enonce1) return strdup(reply); - i_useragent = require_name("useragent", 0, NULL, reply, siz); + i_useragent = require_name(trf_root, "useragent", 0, NULL, reply, siz); if (!i_useragent) return strdup(reply); - conn = dbconnect(); secuserid = auths_add(conn, DATA_TRANSFER(i_poolinstance)->data, DATA_TRANSFER(i_username)->data, DATA_TRANSFER(i_workername)->data, DATA_TRANSFER(i_clientid)->data, DATA_TRANSFER(i_enonce1)->data, DATA_TRANSFER(i_useragent)->data, - by, code, inet, cd, igndup); - PQfinish(conn); + by, code, inet, cd, igndup, trf_root); if (!secuserid) { LOGDEBUG("%s.failed.DBE", id); @@ -6717,8 +6932,10 @@ static char *cmd_auth_do(char *cmd, char *id, __maybe_unused tv_t *now, char *by return strdup(reply); } -static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, - char *code, char *inet, tv_t *cd) +static char *cmd_auth(PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, char *by, + char *code, char *inet, tv_t *cd, + K_TREE *trf_root) { bool igndup = false; @@ -6729,12 +6946,13 @@ static char *cmd_auth(char *cmd, char *id, tv_t *now, char *by, return NULL; } - return cmd_auth_do(cmd, id, now, by, code, inet, cd, igndup); + return cmd_auth_do(conn, cmd, id, by, code, inet, cd, igndup, trf_root); } -static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe_unused char *by, - __maybe_unused char *code, __maybe_unused char *inet, - __maybe_unused tv_t *notcd) +static char *cmd_homepage(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, K_TREE *trf_root) { K_ITEM *i_username, *u_item, *b_item, *p_item, *us_item, look; double u_hashrate5m, u_hashrate1hr; @@ -6747,7 +6965,7 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe LOGDEBUG("%s(): cmd '%s'", __func__, cmd); - i_username = optional_name("username", 1, NULL); + i_username = optional_name(trf_root, "username", 1, NULL); APPEND_REALLOC_INIT(buf, off, len); APPEND_REALLOC(buf, off, len, "ok."); @@ -6804,8 +7022,11 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe } u_item = NULL; - if (i_username) + if (i_username) { + K_RLOCK(users_free); u_item = find_users(DATA_TRANSFER(i_username)->data); + K_RUNLOCK(users_free); + } has_uhr = false; if (p_item && u_item) { @@ -6868,9 +7089,11 @@ static char *cmd_homepage(char *cmd, char *id, __maybe_unused tv_t *now, __maybe return buf; } -static char *cmd_dsp(char *cmd, char *id, __maybe_unused tv_t *now, - __maybe_unused char *by, __maybe_unused char *code, - __maybe_unused char *inet, __maybe_unused tv_t *notcd) +static char *cmd_dsp(__maybe_unused PGconn *conn, __maybe_unused char *cmd, + char *id, __maybe_unused tv_t *now, + __maybe_unused char *by, __maybe_unused char *code, + __maybe_unused char *inet, __maybe_unused tv_t *notcd, + __maybe_unused K_TREE *trf_root) { __maybe_unused K_ITEM *i_file; __maybe_unused char reply[1024] = ""; @@ -6882,11 +7105,11 @@ static char *cmd_dsp(char *cmd, char *id, __maybe_unused tv_t *now, LOGDEBUG("%s.disabled.dsp", id); return strdup("disabled.dsp"); /* - i_file = require_name("file", 1, NULL, reply, siz); + i_file = require_name(trf_root, "file", 1, NULL, reply, siz); if (!i_file) return strdup(reply); - dsp_ktree(transfer_free, transfer_root, DATA_TRANSFER(i_file)->data, NULL); + dsp_ktree(transfer_free, trf_root, DATA_TRANSFER(i_file)->data, NULL); dsp_ktree(sharesummary_free, sharesummary_root, DATA_TRANSFER(i_file)->data, NULL); @@ -6897,9 +7120,10 @@ static char *cmd_dsp(char *cmd, char *id, __maybe_unused tv_t *now, */ } -static char *cmd_stats(char *cmd, char *id, __maybe_unused tv_t *now, - __maybe_unused char *by, __maybe_unused char *code, - __maybe_unused char *inet, __maybe_unused tv_t *notcd) +static char *cmd_stats(__maybe_unused PGconn *conn, char *cmd, char *id, + __maybe_unused tv_t *now, __maybe_unused char *by, + __maybe_unused char *code, __maybe_unused char *inet, + __maybe_unused tv_t *notcd, __maybe_unused K_TREE *trf_root) { char tmp[1024], *buf; size_t len, off; @@ -6945,6 +7169,8 @@ static char *cmd_stats(char *cmd, char *id, __maybe_unused tv_t *now, USEINFO(poolstats, 1, 1); USEINFO(userstats, 4, 2); USEINFO(workerstatus, 1, 1); + USEINFO(workqueue, 1, 0); + USEINFO(transfer, 0, 0); snprintf(tmp, sizeof(tmp), "%ctotalram=%"PRIu64, FLDSEP, tot); APPEND_REALLOC(buf, off, len, tmp); @@ -6953,28 +7179,6 @@ static char *cmd_stats(char *cmd, char *id, __maybe_unused tv_t *now, return buf; } -enum cmd_values { - CMD_UNSET, - CMD_REPLY, // Means something was wrong - send back reply - CMD_SHUTDOWN, - CMD_PING, - CMD_SHARELOG, - CMD_AUTH, - CMD_ADDUSER, - CMD_CHKPASS, - CMD_POOLSTAT, - CMD_USERSTAT, - CMD_BLOCK, - CMD_NEWID, - CMD_PAYMENTS, - CMD_WORKERS, - CMD_ALLUSERS, - CMD_HOMEPAGE, - CMD_DSP, - CMD_STATS, - CMD_END -}; - // TODO: limit access #define ACCESS_POOL "p" #define ACCESS_SYSTEM "s" @@ -6987,7 +7191,8 @@ static struct CMDS { char *cmd_str; bool noid; // doesn't require an id bool createdate; // requires a createdate - char *(*func)(char *, char *, tv_t *, char *, char *, char *, tv_t *); + char *(*func)(PGconn *, char *, char *, tv_t *, char *, char *, + char *, tv_t *, K_TREE *); char *access; } cmds[] = { { CMD_SHUTDOWN, "shutdown", true, false, NULL, ACCESS_SYSTEM }, @@ -7012,7 +7217,9 @@ static struct CMDS { { CMD_END, NULL, false, false, NULL, NULL } }; -static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id, tv_t *cd) +static enum cmd_values breakdown(K_TREE **trf_root, K_STORE **trf_store, + char *buf, int *which_cmds, char *cmd, + char *id, tv_t *cd) { char reply[1024] = ""; K_TREE_CTX ctx[1]; @@ -7020,6 +7227,8 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id char *cmdptr, *idptr, *data, *next, *eq; bool noid = false; + *trf_root = NULL; + *trf_store = NULL; *which_cmds = CMD_UNSET; *cmd = *id = '\0'; cd->tv_sec = 0; @@ -7062,6 +7271,8 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id return CMD_REPLY; } + *trf_root = new_ktree(); + *trf_store = k_new_store(transfer_free); next = data; if (next && strncmp(next, JSON_TRANSFER, JSON_TRANSFER_LEN) == 0) { json_t *json_data; @@ -7158,13 +7369,13 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id if (ok) STRNCPY(DATA_TRANSFER(item)->name, json_key); - if (!ok || find_in_ktree(transfer_root, item, cmp_transfer, ctx)) { + if (!ok || find_in_ktree(*trf_root, item, cmp_transfer, ctx)) { if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) free(DATA_TRANSFER(item)->data); k_add_head(transfer_free, item); } else { - transfer_root = add_to_ktree(transfer_root, item, cmp_transfer); - k_add_head(transfer_store, item); + *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); + k_add_head(*trf_store, item); } json_iter = json_object_iter_next(json_data, json_iter); } @@ -7189,19 +7400,19 @@ static enum cmd_values breakdown(char *buf, int *which_cmds, char *cmd, char *id STRNCPY(DATA_TRANSFER(item)->value, eq); DATA_TRANSFER(item)->data = DATA_TRANSFER(item)->value; - if (find_in_ktree(transfer_root, item, cmp_transfer, ctx)) { + if (find_in_ktree(*trf_root, item, cmp_transfer, ctx)) { if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) free(DATA_TRANSFER(item)->data); k_add_head(transfer_free, item); } else { - transfer_root = add_to_ktree(transfer_root, item, cmp_transfer); - k_add_head(transfer_store, item); + *trf_root = add_to_ktree(*trf_root, item, cmp_transfer); + k_add_head(*trf_store, item); } } K_WUNLOCK(transfer_free); } if (cmds[*which_cmds].createdate) { - item = require_name("createdate", 10, NULL, reply, sizeof(reply)); + item = require_name(*trf_root, "createdate", 10, NULL, reply, sizeof(reply)); if (!item) return CMD_REPLY; @@ -7409,30 +7620,331 @@ static void *summariser(__maybe_unused void *arg) { pthread_detach(pthread_self()); - while (!summarizer_die && !db_load_complete) + while (!everyone_die && !db_load_complete) cksleep_ms(42); - while (!summarizer_die) { - sleep(19); + while (!everyone_die) { + sleep(13); - if (!summarizer_die) + if (!everyone_die) summarise_poolstats(); - if (!summarizer_die) + if (!everyone_die) summarise_userstats(); } return NULL; } -static void reload_line(char *filename, uint64_t count, char *buf) +static void *socketer(__maybe_unused void *arg) +{ + proc_instance_t *pi = (proc_instance_t *)arg; + unixsock_t *us = &pi->us; + char *end, *ans = NULL, *rep = NULL, *buf = NULL, *dot; + char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; + char *last_auth = NULL, *reply_auth = NULL; + char *last_adduser = NULL, *reply_adduser = NULL; + char *last_chkpass = NULL, *reply_chkpass = NULL; + char *last_newid = NULL, *reply_newid = NULL; + char *last_web = NULL, *reply_web = NULL; + char *reply_last, duptype[CMD_SIZ+1]; + enum cmd_values cmdnum; + int sockd, which_cmds; + WORKQUEUE *workqueue; + K_STORE *trf_store; + K_TREE *trf_root; + K_ITEM *item; + size_t siz; + tv_t now, cd; + bool dup, want_first; + + pthread_detach(pthread_self()); + + while (!everyone_die && !db_auths_complete) + cksem_mswait(&socketer_sem, 420); + + want_first = true; + while (!everyone_die) { + if (buf) + dealloc(buf); + sockd = accept(us->sockd, NULL, NULL); + if (sockd < 0) { + LOGERR("Failed to accept on socket in listener"); + break; + } + + cmdnum = CMD_UNSET; + trf_root = NULL; + trf_store = NULL; + + buf = recv_unix_msg(sockd); + // Once we've read the message + setnow(&now); + if (buf) { + end = buf + strlen(buf) - 1; + // strip trailing \n and \r + while (end >= buf && (*end == '\n' || *end == '\r')) + *(end--) = '\0'; + } + if (!buf || !*buf) { + // An empty message wont get a reply + if (!buf) + LOGWARNING("Failed to get message in listener"); + else + LOGWARNING("Empty message in listener"); + } else { + if (want_first) { + ck_wlock(&fpm_lock); + first_pool_message = strdup(buf); + ck_wunlock(&fpm_lock); + want_first = false; + } + + /* For duplicates: + * Queued pool messages are handled by the queue code + * but since they reply ok.queued that message can + * be returned every time here + * System: repeat process them + * Web: current php web sends a timestamp of seconds + * so duplicate code will only trigger if the same + * message is sent within the same second and thus + * will effectively reduce the processing load for + * sequential duplicates + * adduser duplicates are handled by the DB code + * auth, chkpass, adduser, newid - remember individual + * last message and reply and repeat the reply without + * reprocessing the message + */ + dup = false; + if (last_auth && strcmp(last_auth, buf) == 0) { + reply_last = reply_auth; + dup = true; + } else if (last_chkpass && strcmp(last_chkpass, buf) == 0) { + reply_last = reply_chkpass; + dup = true; + } else if (last_adduser && strcmp(last_adduser, buf) == 0) { + reply_last = reply_adduser; + dup = true; + } else if (last_newid && strcmp(last_newid, buf) == 0) { + reply_last = reply_newid; + dup = true; + } else if (last_web && strcmp(last_web, buf) == 0) { + reply_last = reply_web; + dup = true; + } + if (dup) { + send_unix_msg(sockd, reply_last); + STRNCPY(duptype, buf); + dot = strchr(duptype, '.'); + if (dot) + *dot = '\0'; + snprintf(reply, sizeof(reply), "%s%ld,%ld.%s", + LOGDUP, now.tv_sec, now.tv_usec, duptype); + LOGFILE(reply); + LOGWARNING("Duplicate '%s' message received", duptype); + } else { + LOGFILE(buf); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); + switch (cmdnum) { + case CMD_REPLY: + snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + case CMD_SHUTDOWN: + LOGWARNING("Listener received shutdown message, terminating ckdb"); + snprintf(reply, sizeof(reply), "%s.%ld.ok.exiting", id, now.tv_sec); + send_unix_msg(sockd, reply); + everyone_die = true; + break; + case CMD_PING: + LOGDEBUG("Listener received ping request"); + snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + // Always process immediately: + case CMD_AUTH: + case CMD_CHKPASS: + case CMD_ADDUSER: + case CMD_NEWID: + case CMD_STATS: + ans = cmds[which_cmds].func(NULL, cmd, id, &now, + by_default, + (char *)__func__, + inet_default, + &cd, trf_root); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + send_unix_msg(sockd, rep); + free(ans); + ans = NULL; + switch (cmdnum) { + case CMD_AUTH: + if (last_auth) + free(last_auth); + last_auth = buf; + buf = NULL; + if (reply_auth) + free(reply_auth); + reply_auth = rep; + break; + case CMD_CHKPASS: + if (last_chkpass) + free(last_chkpass); + last_chkpass = buf; + buf = NULL; + if (reply_chkpass) + free(reply_chkpass); + reply_chkpass = rep; + break; + case CMD_ADDUSER: + if (last_adduser) + free(last_adduser); + last_adduser = buf; + buf = NULL; + if (reply_adduser) + free(reply_adduser); + reply_adduser = rep; + break; + case CMD_NEWID: + if (last_newid) + free(last_newid); + last_newid = buf; + buf = NULL; + if (reply_newid) + free(reply_newid); + reply_newid = rep; + break; + default: + free(rep); + } + rep = NULL; + break; + // Process, but reject (loading) until startup_complete + case CMD_HOMEPAGE: + case CMD_ALLUSERS: + case CMD_WORKERS: + case CMD_PAYMENTS: + case CMD_DSP: + if (!startup_complete) { + snprintf(reply, sizeof(reply), + "%s.%ld.loading.%s", + id, now.tv_sec, cmd); + send_unix_msg(sockd, reply); + } else { + ans = cmds[which_cmds].func(NULL, cmd, id, &now, + by_default, + (char *)__func__, + inet_default, + &cd, trf_root); + siz = strlen(ans) + strlen(id) + 32; + rep = malloc(siz); + snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); + send_unix_msg(sockd, rep); + free(ans); + ans = NULL; + if (cmdnum == CMD_DSP) + free(rep); + else { + if (last_web) + free(last_web); + last_web = buf; + buf = NULL; + if (reply_web) + free(reply_web); + reply_web = rep; + } + rep = NULL; + } + break; + // Always queue (ok.queued) + case CMD_SHARELOG: + case CMD_POOLSTAT: + case CMD_USERSTAT: + case CMD_BLOCK: + snprintf(reply, sizeof(reply), + "%s.%ld.ok.queued", + id, now.tv_sec); + send_unix_msg(sockd, reply); + + K_WLOCK(workqueue_free); + item = k_unlink_head(workqueue_free); + K_WUNLOCK(workqueue_free); + + workqueue = DATA_WORKQUEUE(item); + workqueue->buf = buf; + buf = NULL; + workqueue->which_cmds = which_cmds; + workqueue->cmdnum = cmdnum; + STRNCPY(workqueue->cmd, cmd); + STRNCPY(workqueue->id, id); + copy_tv(&(workqueue->now), &now); + STRNCPY(workqueue->by, by_default); + STRNCPY(workqueue->code, __func__); + STRNCPY(workqueue->inet, inet_default); + copy_tv(&(workqueue->cd), &cd); + workqueue->trf_root = trf_root; + trf_root = NULL; + workqueue->trf_store = trf_store; + trf_store = NULL; + + K_WLOCK(workqueue_free); + k_add_tail(workqueue_store, item); + K_WUNLOCK(workqueue_free); + + cksem_post(&workqueue_sem); + break; + // Code error + default: + LOGEMERG("%s() CODE ERROR unhandled message %d %.32s...", + __func__, cmdnum, buf); + snprintf(reply, sizeof(reply), + "%s.%ld.failed.code", + id, now.tv_sec); + send_unix_msg(sockd, reply); + break; + } + } + } + close(sockd); + + tick(); + + if (trf_root) + trf_root = free_ktree(trf_root, NULL); + if (trf_store) { + item = trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(trf_store, transfer_free); + K_WUNLOCK(transfer_free); + trf_store = k_free_store(trf_store); + } + } + + if (buf) + dealloc(buf); + // TODO: if anyone cares, free all the dup buffers :P + close_unix_socket(us->sockd, us->path); + + return NULL; +} + +static bool reload_line(PGconn *conn, char *filename, uint64_t count, char *buf) { char cmd[CMD_SIZ+1], id[ID_SIZ+1]; enum cmd_values cmdnum; char *end, *ans; int which_cmds; + K_STORE *trf_store = NULL; + K_TREE *trf_root = NULL; K_ITEM *item; tv_t now, cd; + bool finished; // Once we've read the message setnow(&now); @@ -7448,8 +7960,18 @@ static void reload_line(char *filename, uint64_t count, char *buf) else LOGERR("%s() Empty message line %"PRIu64, __func__, count); } else { + finished = false; + ck_wlock(&fpm_lock); + if (first_pool_message && strcmp(first_pool_message, buf) == 0) + finished = true; + ck_wunlock(&fpm_lock); + if (finished) { + LOGERR("%s() reload completed, ckpool queue match at line %"PRIu64, __func__, count); + return true; + } + LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); + cmdnum = breakdown(&trf_root, &trf_store, buf, &which_cmds, cmd, id, &cd); switch (cmdnum) { // Ignore case CMD_REPLY: @@ -7475,9 +7997,11 @@ static void reload_line(char *filename, uint64_t count, char *buf) case CMD_POOLSTAT: case CMD_USERSTAT: case CMD_BLOCK: - ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", + ans = cmds[which_cmds].func(conn, cmd, id, &now, + by_default, (char *)__func__, - (char *)"127.0.0.1", &cd); + inet_default, + &cd, trf_root); if (ans) free(ans); break; @@ -7488,19 +8012,25 @@ static void reload_line(char *filename, uint64_t count, char *buf) break; } - K_WLOCK(transfer_free); - transfer_root = free_ktree(transfer_root, NULL); - item = transfer_store->head; - while (item) { - if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) - free(DATA_TRANSFER(item)->data); - item = item->next; + if (trf_root) + trf_root = free_ktree(trf_root, NULL); + if (trf_store) { + item = trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(trf_store, transfer_free); + K_WUNLOCK(transfer_free); + trf_store = k_free_store(trf_store); } - k_list_transfer_to_head(transfer_store, transfer_free); - K_WUNLOCK(transfer_free); } tick(); + + return false; } // Log files are every ... @@ -7511,14 +8041,15 @@ static void reload_line(char *filename, uint64_t count, char *buf) /* If the reload start file is missing and -r was specified correctly: * touch the filename reported in "Failed to open 'filename'" * when ckdb aborts at the beginning of the reload */ -static void reload_from(tv_t *start) +static bool reload_from(tv_t *start) { + PGconn *conn = NULL; char buf[DATE_BUFSIZ+1], run[DATE_BUFSIZ+1]; size_t rflen = strlen(restorefrom); char *missingfirst = NULL, *missinglast = NULL; int missing_count; int processing; - bool finished = false; + bool finished = false, matched = false, ret = true; char *filename = NULL; char data[MAX_READ]; uint64_t count, total; @@ -7540,6 +8071,8 @@ static void reload_from(tv_t *start) snprintf(data, sizeof(data), "reload.%s.s0", run); LOGFILE(data); + conn = dbconnect(); + total = 0; processing = 0; while (!finished) { @@ -7547,8 +8080,8 @@ static void reload_from(tv_t *start) processing++; count = 0; - while (fgets_unlocked(data, MAX_READ, fp)) - reload_line(filename, ++count, data); + while (!matched && fgets_unlocked(data, MAX_READ, fp)) + matched = reload_line(conn, filename, ++count, data); if (ferror(fp)) { int err = errno; @@ -7563,6 +8096,8 @@ static void reload_from(tv_t *start) total += count; fclose(fp); free(filename); + if (matched) + break; start->tv_sec += ROLL_S; filename = rotating_filename(restorefrom, start->tv_sec); fp = fopen(filename, "r"); @@ -7609,6 +8144,8 @@ static void reload_from(tv_t *start) } } + PQfinish(conn); + snprintf(data, sizeof(data), "reload.%s.%"PRIu64, run, total); LOGFILE(data); LOGWARNING("%s(): read %d file%s, total %"PRIu64" line%s", @@ -7616,166 +8153,102 @@ static void reload_from(tv_t *start) processing, processing == 1 ? "" : "s", total, total == 1 ? "" : "s"); + if (!matched) { + ck_wlock(&fpm_lock); + if (first_pool_message) { + LOGERR("%s() reload completed without finding ckpool queue match '%.32s'...", + __func__, first_pool_message); + LOGERR("%s() restart ckdb to resolve this", __func__); + ret = false; + } + ck_wunlock(&fpm_lock); + } + reloading = false; + + return ret; +} + +static void process_queued(K_ITEM *wq_item) +{ + static char *last_buf = NULL; + WORKQUEUE *workqueue; + K_ITEM *item; + char *ans; + + workqueue = DATA_WORKQUEUE(wq_item); + + // Simply ignore the (very rare) duplicates + if (!last_buf || strcmp(workqueue->buf, last_buf)) { + ans = cmds[workqueue->which_cmds].func(NULL, workqueue->cmd, workqueue->id, + &(workqueue->now), workqueue->by, + workqueue->code, workqueue->inet, + &(workqueue->cd), workqueue->trf_root); + free(ans); + } + + if (last_buf) + free(last_buf); + last_buf = workqueue->buf; + + workqueue->trf_root = free_ktree(workqueue->trf_root, NULL); + item = workqueue->trf_store->head; + while (item) { + if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) + free(DATA_TRANSFER(item)->data); + item = item->next; + } + K_WLOCK(transfer_free); + k_list_transfer_to_head(workqueue->trf_store, transfer_free); + K_WUNLOCK(transfer_free); + workqueue->trf_store = k_free_store(workqueue->trf_store); + + K_WLOCK(workqueue_free); + k_add_head(workqueue_free, wq_item); + K_WUNLOCK(workqueue_free); } // TODO: equivalent of api_allow static void *listener(void *arg) { proc_instance_t *pi = (proc_instance_t *)arg; - unixsock_t *us = &pi->us; - char *end, *ans, *rep, *buf = NULL, *dot; - char *last_msg = NULL, *last_reply = NULL; - char cmd[CMD_SIZ+1], id[ID_SIZ+1], reply[1024+1]; - // Minimise the size in case of garbage - char duptype[CMD_SIZ+1]; - enum cmd_values cmdnum, last_cmd = 9001; - int sockd, which_cmds; - pthread_t summzer; - uint64_t counter = 0; - K_ITEM *item; - size_t siz; - tv_t now, cd; - bool dup; + pthread_t sock_pt; + pthread_t summ_pt; + K_ITEM *wq_item; + int qc; + + create_pthread(&sock_pt, socketer, arg); - create_pthread(&summzer, summariser, NULL); + create_pthread(&summ_pt, summariser, NULL); rename_proc(pi->sockname); if (!setup_data()) { + everyone_die = true; LOGEMERG("ABORTING"); return NULL; } - LOGWARNING("%s(): ckdb ready", __func__); - - startup_complete = true; - - while (true) { - dealloc(buf); - sockd = accept(us->sockd, NULL, NULL); - if (sockd < 0) { - LOGERR("Failed to accept on socket in listener"); - break; - } + K_RLOCK(workqueue_store); + qc = workqueue_store->count; + K_RUNLOCK(workqueue_store); - cmdnum = CMD_UNSET; + LOGWARNING("%s(): ckdb ready, queue %d", __func__, qc); - buf = recv_unix_msg(sockd); - // Once we've read the message - setnow(&now); - if (buf) { - end = buf + strlen(buf) - 1; - // strip trailing \n and \r - while (end >= buf && (*end == '\n' || *end == '\r')) - *(end--) = '\0'; - } - if (!buf || !*buf) { - // An empty message wont get a reply - if (!buf) - LOGWARNING("Failed to get message in listener"); - else - LOGWARNING("Empty message in listener"); - } else { - /* For duplicates: - * System: shutdown and ping are always processed, - * so for any others, send a ping between them - * if you need to send the same message twice - * Web: if the pool didn't do anything since the original - * then the reply could be wrong for any reply that - * changes over time ... however if the pool is busy - * and we get the same request repeatedly, this will - * reduce the load - thus always send the same reply - * Pool: must not process it, must send back the same reply - * TODO: remember last message+reply per source - */ - if (last_msg && strcmp(last_msg, buf) == 0) { - dup = true; - // This means an exact duplicate of the last non-dup - snprintf(reply, sizeof(reply), "%s%ld,%ld", LOGDUP, now.tv_sec, now.tv_usec); - LOGFILE(reply); - cmdnum = last_cmd; - - STRNCPY(duptype, buf); - dot = strchr(duptype, '.'); - if (dot) - *dot = '\0'; - LOGWARNING("Duplicate '%s' message received", duptype); - } else { - dup = false; - LOGFILE(buf); - cmdnum = breakdown(buf, &which_cmds, cmd, id, &cd); - last_cmd = cmdnum; - } - switch (cmdnum) { - case CMD_REPLY: - if (dup) - send_unix_msg(sockd, last_reply); - else { - snprintf(reply, sizeof(reply), "%s.%ld.?.", id, now.tv_sec); - if (last_reply) - free(last_reply); - last_reply = strdup(reply); - send_unix_msg(sockd, reply); - } - break; - case CMD_SHUTDOWN: - LOGWARNING("Listener received shutdown message, terminating ckdb"); - snprintf(reply, sizeof(reply), "%s.%ld.ok.exiting", id, now.tv_sec); - send_unix_msg(sockd, reply); - break; - case CMD_PING: - LOGDEBUG("Listener received ping request"); - // Generate a new reply each time even on dup - snprintf(reply, sizeof(reply), "%s.%ld.ok.pong", id, now.tv_sec); - send_unix_msg(sockd, reply); - break; - default: - if (dup) - send_unix_msg(sockd, last_reply); - else { - ans = cmds[which_cmds].func(cmd, id, &now, (char *)"code", - (char *)__func__, - (char *)"127.0.0.1", &cd); - siz = strlen(ans) + strlen(id) + 32; - rep = malloc(siz); - snprintf(rep, siz, "%s.%ld.%s", id, now.tv_sec, ans); - free(ans); - ans = NULL; - if (last_reply) - free(last_reply); - last_reply = strdup(rep); - send_unix_msg(sockd, rep); - free(rep); - rep = NULL; - } - break; - } - } - close(sockd); - - counter++; - tick(); - - if (cmdnum == CMD_SHUTDOWN) - break; + startup_complete = true; - K_WLOCK(transfer_free); - transfer_root = free_ktree(transfer_root, NULL); - item = transfer_store->head; - while (item) { - if (DATA_TRANSFER(item)->data != DATA_TRANSFER(item)->value) - free(DATA_TRANSFER(item)->data); - item = item->next; - } - k_list_transfer_to_head(transfer_store, transfer_free); - K_WUNLOCK(transfer_free); + // Process queued work + while (!everyone_die) { + K_WLOCK(workqueue_store); + wq_item = k_unlink_head(workqueue_store); + K_WUNLOCK(workqueue_store); + if (wq_item) { + process_queued(wq_item); + tick(); + } else + cksem_mswait(&workqueue_sem, 420); } - dealloc(buf); - if (last_reply) - free(last_reply); - close_unix_socket(us->sockd, us->path); return NULL; } @@ -7878,7 +8351,6 @@ int main(int argc, char **argv) } if (!ckp.socket_dir) { -// ckp.socket_dir = strdup("/tmp/"); ckp.socket_dir = strdup("/opt/"); realloc_strcat(&ckp.socket_dir, ckp.name); } diff --git a/src/ckpool.c b/src/ckpool.c index dd46f162..c66e727e 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -275,8 +275,10 @@ retry: LOGWARNING("Failed to send_procmsg to connector"); } else if (cmdmatch(buf, "restart")) { if (!fork()) { - ckp->initial_args[ckp->args++] = strdup("-H"); - ckp->initial_args[ckp->args] = NULL; + if (!ckp->handover) { + ckp->initial_args[ckp->args++] = strdup("-H"); + ckp->initial_args[ckp->args] = NULL; + } execv(ckp->initial_args[0], (char *const *)ckp->initial_args); } } else { @@ -505,110 +507,18 @@ out: return buf; } -static const char *invalid_unknown = " (unknown reason)"; -static const char *invalid_toodeep = " >9 levels, recursion?"; - -#define first_invalid(_json_data) _first_invalid(_json_data, 0) - -static char *_first_invalid(json_t *json_data, int level) +/* Send a json msg to ckdb and return the response */ +char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func, + const int line) { - const char *json_key, *json_str; - json_t *json_value; - void *json_iter; - int json_typ; - char buf[512], *inside; - bool found; - - if (level > 9) - return strdup(invalid_toodeep); - - buf[0] = '\0'; - found = false; - json_iter = json_object_iter(json_data); - while (!found && json_iter) { - json_key = json_object_iter_key(json_iter); - json_value = json_object_iter_value(json_iter); - json_typ = json_typeof(json_value); - switch(json_typ) { - case JSON_STRING: - json_str = json_string_value(json_value); - if (json_str == NULL) { - snprintf(buf, sizeof(buf), - " %s is NULL", json_key); - found = true; - } - break; - case JSON_REAL: - case JSON_INTEGER: - case JSON_TRUE: - case JSON_FALSE: - break; - case JSON_ARRAY: - inside = _first_invalid(json_value, level+1); - if (inside != invalid_unknown) { - snprintf(buf, sizeof(buf), - " %s : [%s ]", json_key, inside); - free(inside); - found = true; - } - break; - case JSON_NULL: - snprintf(buf, sizeof(buf), - " %s is NULL", json_key); - found = true; - break; - default: - snprintf(buf, sizeof(buf), - " unknown type %d for %s", - json_typ, json_key); - found = true; - break; - } - if (!found) - json_iter = json_object_iter_next(json_data, json_iter); - } - - if (!*buf) { - if (level > 0) - return (char *)invalid_unknown; - else - return strdup(invalid_unknown); - } else - return strdup(buf); -} - -/* Send a json msg to ckdb with its idmsg and return the response, consuming - * the json on success */ -char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged, - const char *file, const char *func, const int line) -{ - char *msg = NULL, *dump, *buf = NULL; - - dump = json_dumps(val, JSON_COMPACT); - if (unlikely(!dump)) { - char *invalid = first_invalid(val); - LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d%s", file, func, line, invalid); - free(invalid); - return buf; - } - ASPRINTF(&msg, "%s.id.json=%s", idmsg, dump); - if (!logged) { - char logname[512]; + char *buf = NULL; - snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name); - rotating_log(logname, msg); - } - free(dump); LOGDEBUG("Sending ckdb: %s", msg); buf = _send_recv_ckdb(ckp, msg, file, func, line); LOGDEBUG("Received from ckdb: %s", buf); - free(msg); - if (likely(buf)) - json_decref(val); return buf; } - json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) { char *http_req = NULL; diff --git a/src/ckpool.h b/src/ckpool.h index d4260aad..4bbd0159 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -177,9 +177,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) -char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged, - const char *file, const char *func, const int line); -#define json_ckdb_call(ckp, idmsg, val, logged) _json_ckdb_call(ckp, idmsg, val, logged, __FILE__, __func__, __LINE__) +char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func, + const int line); +#define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); diff --git a/src/connector.c b/src/connector.c index 4cbe5b6f..bcd15ef4 100644 --- a/src/connector.c +++ b/src/connector.c @@ -71,14 +71,13 @@ struct sender_send { client_instance_t *client; char *buf; int len; - bool polling; - ts_t polltime; }; typedef struct sender_send sender_send_t; /* For the linked list of pending sends */ static sender_send_t *sender_sends; +static sender_send_t *delayed_sends; /* For protecting the pending sends list */ static pthread_mutex_t sender_lock; @@ -335,34 +334,30 @@ void *sender(void *arg) sender_send_t *sender_send; client_instance_t *client; int ret, fd, ofs = 0; - bool polling = false; mutex_lock(&sender_lock); - if (sender_sends && sender_sends->polling) - polling = true; - if (!sender_sends || polling) { + /* Poll every 100ms if there are no new sends */ + if (!sender_sends) { + const ts_t polltime = {0, 100000000}; ts_t timeout_ts; - if (!polling) { - /* Wait 1 second in pure event driven mode */ - ts_realtime(&timeout_ts); - timeout_ts.tv_sec += 1; - } else { - /* Poll every 100ms if the head of the list is - * a delayed writer. */ - timeout_ts.tv_sec = 0; - timeout_ts.tv_nsec = 100000000; - timeraddspec(&timeout_ts, &sender_sends->polltime); - } + ts_realtime(&timeout_ts); + timeraddspec(&timeout_ts, &polltime); pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); } sender_send = sender_sends; - if (likely(sender_send)) + if (sender_send) DL_DELETE(sender_sends, sender_send); mutex_unlock(&sender_lock); - if (!sender_send) - continue; + /* Service delayed sends only if we have timed out on the + * conditional with no new sends appearing. */ + if (!sender_send) { + if (!delayed_sends) + continue; + sender_send = delayed_sends; + DL_DELETE(delayed_sends, sender_send); + } client = sender_send->client; @@ -390,13 +385,10 @@ void *sender(void *arg) } LOGDEBUG("Client %d not ready for writes", client->id); - /* Append it to the tail of the list */ - mutex_lock(&sender_lock); - DL_APPEND(sender_sends, sender_send); - mutex_unlock(&sender_lock); - - sender_send->polling = true; - ts_realtime(&sender_send->polltime); + /* Append it to the tail of the delayed sends list. + * This is the only function that alters it so no + * locking is required. */ + DL_APPEND(delayed_sends, sender_send); continue; } while (sender_send->len) { diff --git a/src/libckpool.c b/src/libckpool.c index dd0bd1c4..572c860c 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -258,6 +258,67 @@ void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int l _mutex_unlock(&lock->mutex, file, func, line); } +void _cksem_init(sem_t *sem, const char *file, const char *func, const int line) +{ + int ret; + if ((ret = sem_init(sem, 0, 0))) + quitfrom(1, file, func, line, "Failed to sem_init ret=%d errno=%d", ret, errno); +} + +void _cksem_post(sem_t *sem, const char *file, const char *func, const int line) +{ + if (unlikely(sem_post(sem))) + quitfrom(1, file, func, line, "Failed to sem_post errno=%d sem=0x%p", errno, sem); +} + +void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line) +{ +retry: + if (unlikely(sem_wait(sem))) { + if (errno == EINTR) + goto retry; + quitfrom(1, file, func, line, "Failed to sem_wait errno=%d sem=0x%p", errno, sem); + } +} + +int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const int line) +{ + ts_t abs_timeout, ts_now; + tv_t tv_now; + int ret; + + tv_time(&tv_now); + tv_to_ts(&ts_now, &tv_now); + ms_to_ts(&abs_timeout, ms); +retry: + timeraddspec(&abs_timeout, &ts_now); + ret = sem_timedwait(sem, &abs_timeout); + + if (ret) { + if (likely(errno == ETIMEDOUT)) + return ETIMEDOUT; + if (errno == EINTR) + goto retry; + quitfrom(1, file, func, line, "Failed to sem_timedwait errno=%d sem=0x%p", errno, sem); + } + return 0; +} + +void cksem_reset(sem_t *sem) +{ + int ret; + + do { + ret = sem_trywait(sem); + if (unlikely(ret < 0 && (errno == EINTR))) + ret = 0; + } while (!ret); +} + +void cksem_destroy(sem_t *sem) +{ + sem_destroy(sem); +} bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port) { diff --git a/src/libckpool.h b/src/libckpool.h index c59b24f7..b3251370 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -21,6 +21,7 @@ #include #include #include +#include #if HAVE_BYTESWAP_H # include @@ -356,6 +357,18 @@ void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int lin void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line); +void _cksem_init(sem_t *sem, const char *file, const char *func, const int line); +void _cksem_post(sem_t *sem, const char *file, const char *func, const int line); +void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line); +int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const int line); +void cksem_reset(sem_t *sem); +void cksem_destroy(sem_t *sem); + +#define cksem_init(_sem) _cksem_init(_sem, __FILE__, __func__, __LINE__) +#define cksem_post(_sem) _cksem_post(_sem, __FILE__, __func__, __LINE__) +#define cksem_wait(_sem) _cksem_wait(_sem, __FILE__, __func__, __LINE__) +#define cksem_mswait(_sem, _timeout) _cksem_mswait(_sem, _timeout, __FILE__, __func__, __LINE__) + static inline bool sock_connecting(void) { return errno == EINPROGRESS; diff --git a/src/stratifier.c b/src/stratifier.c index ed49e38c..6fd372d7 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -170,13 +170,6 @@ struct json_params { typedef struct json_params json_params_t; -struct ckdb_msg { - json_t *val; - int idtype; -}; - -typedef struct ckdb_msg ckdb_msg_t; - /* Stratum json messages with their associated client id */ struct smsg { json_t *json_msg; @@ -415,12 +408,32 @@ static void purge_share_hashtable(int64_t wb_id) static char *status_chars = "|/-\\"; +/* Absorbs the json and generates a ckdb json message, logs it to the ckdb + * log and returns the malloced message. */ +static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype) +{ + char *json_msg = json_dumps(val, JSON_COMPACT); + char logname[512]; + char *ret = NULL; + + if (unlikely(!json_msg)) + goto out; + ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg); + free(json_msg); +out: + json_decref(val); + snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name); + rotating_log(logname, ret); + return ret; +} + static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, const char *func, const int line) { - static int counter = 0; static time_t time_counter; - ckdb_msg_t *msg; + static int counter = 0; + + char *json_msg; time_t now_t; char ch; @@ -441,10 +454,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char if (ckp->standalone) return json_decref(val); - msg = ckalloc(sizeof(ckdb_msg_t)); - msg->val = val; - msg->idtype = idtype; - ckmsgq_add(ckdbq, msg); + json_msg = ckdb_msg(ckp, val, idtype); + if (unlikely(!json_msg)) { + LOGWARNING("Failed to dump json from %s %s:%d", file, func, line); + return; + } + + ckmsgq_add(ckdbq, json_msg); } #define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__) @@ -1249,13 +1265,13 @@ static user_instance_t *authorise_user(const char *workername) * and get SUID parameters back. We don't add these requests to the ckdbqueue * since we have to wait for the response but this is done from the authoriser * thread so it won't hold anything up but other authorisations. */ -static bool send_recv_auth(stratum_instance_t *client) +static int send_recv_auth(stratum_instance_t *client) { ckpool_t *ckp = client->ckp; + char *buf, *json_msg; char cdfield[64]; - bool ret = false; + int ret = 1; json_t *val; - char *buf; ts_t now; ts_realtime(&now); @@ -1272,7 +1288,13 @@ static bool send_recv_auth(stratum_instance_t *client) "createby", "code", "createcode", __func__, "createinet", client->address); - buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false); + json_msg = ckdb_msg(ckp, val, ID_AUTH); + if (unlikely(!json_msg)) { + LOGWARNING("Failed to dump json in send_recv_auth"); + return ret; + } + buf = ckdb_msg_call(ckp, json_msg); + free(json_msg); if (likely(buf)) { char *secondaryuserid, *response = alloca(128); @@ -1284,17 +1306,18 @@ static bool send_recv_auth(stratum_instance_t *client) response, secondaryuserid); if (!safecmp(response, "ok") && secondaryuserid) { client->secondaryuserid = strdup(secondaryuserid); - ret = true; + ret = 0; } } else { + ret = -1; LOGWARNING("Got no auth response from ckdb :("); - json_decref(val); } return ret; } -static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val, const char *address) +static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val, + const char *address, int *errnum) { bool ret = false; const char *buf; @@ -1334,8 +1357,11 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j client->workername = strdup(buf); if (client->ckp->standalone) ret = true; - else - ret = send_recv_auth(client); + else { + *errnum = send_recv_auth(client); + if (!*errnum) + ret = true; + } client->authorised = ret; if (client->authorised) inc_worker(client->user_instance); @@ -2127,7 +2153,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) { json_t *result_val, *json_msg, *err_val = NULL; stratum_instance_t *client; - int client_id; + int client_id, errnum = 0; client_id = jp->client_id; @@ -2139,15 +2165,19 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp) LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); goto out; } - result_val = parse_authorise(client, jp->params, &err_val, jp->address); + result_val = parse_authorise(client, jp->params, &err_val, jp->address, &errnum); if (json_is_true(result_val)) { char *buf; ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, client->user_instance->username); stratum_send_message(client, buf); - } else - stratum_send_message(client, "Failed authorisation :("); + } else { + if (errnum < 0) + stratum_send_message(client, "Authorisations temporarily offline :("); + else + stratum_send_message(client, "Failed authorisation :("); + } json_msg = json_object(); json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); @@ -2158,14 +2188,13 @@ out: } -static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) +static void ckdbq_process(ckpool_t *ckp, char *msg) { static bool failed = false; - bool logged = false; char *buf = NULL; while (!buf) { - buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); + buf = ckdb_msg_call(ckp, msg); if (unlikely(!buf)) { if (!failed) { failed = true; @@ -2173,13 +2202,13 @@ static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) } sleep(5); } - logged = true; } + free(msg); if (failed) { failed = false; LOGWARNING("Successfully resumed talking to ckdb"); } - LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); + LOGINFO("Got ckdb response: %s", buf); free(buf); }