Browse Source

Merge branch 'master' into passthrough

master
Con Kolivas 10 years ago
parent
commit
1b8d0019be
  1. 22
      ckpool.conf
  2. 19
      ckproxy.conf
  3. 2
      pool/base.php
  4. 3
      pool/db.php
  5. 38
      pool/page_reg.php
  6. 9
      pool/page_workers.php
  7. 40
      sql/v0.1-v0.2.sql
  8. 115
      sql/v0.2-v0.3.sql
  9. 41
      sql/v0.3-v0.4.sql
  10. 42
      sql/v0.4-v0.5.sql
  11. 17
      sql/v0.6.txt
  12. 1290
      src/ckdb.c
  13. 102
      src/ckpool.c
  14. 6
      src/ckpool.h
  15. 42
      src/connector.c
  16. 61
      src/libckpool.c
  17. 13
      src/libckpool.h
  18. 89
      src/stratifier.c

22
ckpool.conf

@ -0,0 +1,22 @@
{
"btcd" : [
{
"url" : "localhost:8332",
"auth" : "user",
"pass" : "pass"
},
{
"url" : "backup:8332",
"auth" : "user",
"pass" : "pass"
}
],
"btcaddress" : "15qSxP1SQcUX3o4nhkfdbgyoWEFMomJ4rZ",
"btcsig" : "/mined by ck/",
"blockpoll" : 500,
"update_interval" : 30,
"serverurl" : "ckpool.org:3333",
"mindiff" : 1,
"startdiff" : 1,
"logdir" : "logs"
}

19
ckproxy.conf

@ -0,0 +1,19 @@
{
"proxy" : [
{
"url" : "ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
},
{
"url" : "backup.ckpool.org:3333",
"auth" : "user",
"pass" : "pass"
}
],
"update_interval" : 30,
"serverurl" : "192.168.1.100:3334",
"mindiff" : 1,
"startdiff" : 1,
"logdir" : "logs"
}

2
pool/base.php

@ -229,7 +229,7 @@ function validate()
$whoid = ''; $whoid = '';
if (!isset($_SESSION['ckpkey'])) if (!isset($_SESSION['ckpkey']))
return false; return array(false, NULL);
$key = $_SESSION['ckpkey']; $key = $_SESSION['ckpkey'];
if (!isset($_SESSION[$key])) if (!isset($_SESSION[$key]))

3
pool/db.php

@ -50,7 +50,8 @@ function msgEncode($cmd, $id, $fields)
{ {
global $send_sep, $fld_sep, $val_sep; global $send_sep, $fld_sep, $val_sep;
$msg = $cmd . $send_sep . $id; $t = time() % 10000;
$msg = $cmd . $send_sep . $id.$t;
$first = true; $first = true;
foreach ($fields as $name => $value) foreach ($fields as $name => $value)
{ {

38
pool/page_reg.php

@ -75,33 +75,47 @@ function safepass($pass)
function show_reg($menu, $name, $u) function show_reg($menu, $name, $u)
{ {
$user = getparam('user', false); $user = getparam('user', false);
$mail = getparam('mail', false); $mail = trim(getparam('mail', false));
$pass = getparam('pass', false); $pass = getparam('pass', false);
$pass2 = getparam('pass2', false); $pass2 = getparam('pass2', false);
$data = array(); $data = array();
$ok = true;
if ($user === NULL && $mail === NULL && $pass === NULL && $pass2 === NULL) if (nuem($user))
$ok = false; $data['user'] = '';
else else
{
if ($user !== NULL)
$data['user'] = $user; $data['user'] = $user;
if (nuem($mail))
$data['mail'] = '';
else else
$ok = false;
if ($mail !== NULL)
$data['mail'] = $mail; $data['mail'] = $mail;
else
$ok = true;
if (nuem($user) || nuem($mail) || nuem($pass) || nuem($pass2))
$ok = false; $ok = false;
if ($pass === NULL || safepass($pass) !== true) else
{
if (safepass($pass) !== true)
{ {
$ok = false; $ok = false;
$data['error'] = "Password is unsafe"; $data['error'] = "Password is unsafe - requires 6 or more characters, including<br>" .
} elseif ($pass2 === NULL || $pass2 != $pass) "at least one of each uppercase, lowercase and digits";
}
elseif ($pass2 != $pass)
{ {
$ok = false; $ok = false;
$data['error'] = "Passwords don't match"; $data['error'] = "Passwords don't match";
} }
$orig = $user;
$user = preg_replace('/[_\\.]/', '', $orig);
if ($user != $orig)
{
$ok = false;
$data['error'] = "Username cannot include '.' or '_'";
$data['user'] = $user;
}
} }
if ($ok === true) if ($ok === true)

9
pool/page_workers.php

@ -82,10 +82,15 @@ function doworker($data, $user)
else else
{ {
$uhr /= 10000000; $uhr /= 10000000;
if ($uhr < 0.01)
$uhr = '0GHs';
else
{
if ($uhr < 100000) if ($uhr < 100000)
$uhr = (round($uhr)/100).'GHs'; $uhr = number_format(round($uhr)/100,2).'GHs';
else else
$uhr = (round($uhr/1000)/100).'THs'; $uhr = number_format(round($uhr/1000)/100,2).'THs';
}
} }
$pg .= "<td class=dr>$uhr</td>"; $pg .= "<td class=dr>$uhr</td>";
$pg .= "</tr>\n"; $pg .= "</tr>\n";

40
sql/v0.1-v0.2.sql

@ -1,40 +0,0 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.2' where vlock=1 and version='0.1';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.1" - found "%"', ver;
END $$;
ALTER TABLE ONLY poolstats
ADD COLUMN elapsed bigint DEFAULT 0 NOT NULL;
CREATE TABLE userstats (
poolinstance character varying(256) NOT NULL,
userid bigint NOT NULL,
elapsed bigint DEFAULT 0 NOT NULL,
hashrate float NOT NULL,
hashrate5m float NOT NULL,
hashrate1hr float NOT NULL,
hashrate24hr float NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
PRIMARY KEY (poolinstance, userid, createdate)
);
END transaction;

115
sql/v0.2-v0.3.sql

@ -1,115 +0,0 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.3' where vlock=1 and version='0.2';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.2" - found "%"', ver;
END $$;
DROP TABLE sharesummary;
CREATE TABLE sharesummary (
userid bigint NOT NULL,
workername character varying(256) NOT NULL,
workinfoid bigint NOT NULL,
diffacc float NOT NULL,
diffsta float NOT NULL,
diffdup float NOT NULL,
diffhi float NOT NULL,
diffrej float NOT NULL,
shareacc float NOT NULL,
sharesta float NOT NULL,
sharedup float NOT NULL,
sharehi float NOT NULL,
sharerej float NOT NULL,
sharecount bigint NOT NULL,
errorcount bigint NOT NULL,
firstshare timestamp with time zone NOT NULL,
lastshare timestamp with time zone NOT NULL,
complete char NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) NOT NULL,
createcode character varying(128) NOT NULL,
createinet character varying(128) NOT NULL,
modifydate timestamp with time zone NOT NULL,
modifyby character varying(64) NOT NULL,
modifycode character varying(128) NOT NULL,
modifyinet character varying(128) NOT NULL,
PRIMARY KEY (userid, workername, workinfoid)
);
DROP TABLE blocksummary;
CREATE TABLE workmarkers (
markerid bigint NOT NULL,
workinfoidend bigint NOT NULL,
workinfoidstart bigint NOT NULL,
description character varying(256) DEFAULT ''::character varying NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
expirydate timestamp with time zone DEFAULT '6666-06-06 06:06:06+00',
PRIMARY KEY (workinfoidstart)
);
CREATE UNIQUE INDEX workmarkersid ON workmarkers USING btree (markerid);
CREATE TABLE markersummary (
markerid bigint NOT NULL,
userid bigint NOT NULL,
workername character varying(256) NOT NULL,
diffacc float NOT NULL,
diffsta float NOT NULL,
diffdup float NOT NULL,
diffhi float NOT NULL,
diffrej float NOT NULL,
shareacc float NOT NULL,
sharesta float NOT NULL,
sharedup float NOT NULL,
sharehi float NOT NULL,
sharerej float NOT NULL,
sharecount bigint NOT NULL,
errorcount bigint NOT NULL,
firstshare timestamp with time zone NOT NULL,
lastshare timestamp with time zone NOT NULL,
complete char NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) NOT NULL,
createcode character varying(128) NOT NULL,
createinet character varying(128) NOT NULL,
modifydate timestamp with time zone NOT NULL,
modifyby character varying(64) NOT NULL,
modifycode character varying(128) NOT NULL,
modifyinet character varying(128) NOT NULL,
PRIMARY KEY (markerid, userid, workername)
);
ALTER TABLE ONLY eventlog
ADD COLUMN poolinstance character varying(256) NOT NULL;
ALTER TABLE ONLY auths
ADD COLUMN poolinstance character varying(256) DEFAULT ''::character varying NOT NULL;
ALTER TABLE ONLY auths
ALTER COLUMN poolinstance DROP DEFAULT;
ALTER TABLE ONLY userstats
ADD COLUMN workername character varying(256) NOT NULL;
ALTER TABLE ONLY poolstats
ALTER COLUMN elapsed DROP DEFAULT;
END transaction;

41
sql/v0.3-v0.4.sql

@ -1,41 +0,0 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.4' where vlock=1 and version='0.3';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.3" - found "%"', ver;
END $$;
DROP TABLE userstats;
CREATE TABLE userstats (
userid bigint NOT NULL,
workername character varying(256) NOT NULL,
elapsed bigint NOT NULL,
hashrate float NOT NULL,
hashrate5m float NOT NULL,
hashrate1hr float NOT NULL,
hashrate24hr float NOT NULL,
summarylevel char NOT NULL,
statsdate timestamp with time zone NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
PRIMARY KEY (userid, workername, summarylevel, statsdate)
);
END transaction;

42
sql/v0.4-v0.5.sql

@ -1,42 +0,0 @@
SET SESSION AUTHORIZATION 'postgres';
BEGIN transaction;
DO $$
DECLARE ver TEXT;
BEGIN
UPDATE version set version='0.5' where vlock=1 and version='0.4';
IF found THEN
RETURN;
END IF;
SELECT version into ver from version
WHERE vlock=1;
RAISE EXCEPTION 'Wrong DB version - expect "0.4" - found "%"', ver;
END $$;
DROP TABLE userstats;
CREATE TABLE userstats (
userid bigint NOT NULL,
workername character varying(256) NOT NULL,
elapsed bigint NOT NULL,
hashrate float NOT NULL,
hashrate5m float NOT NULL,
hashrate1hr float NOT NULL,
hashrate24hr float NOT NULL,
summarylevel char NOT NULL,
summarycount integer NOT NULL,
statsdate timestamp with time zone NOT NULL,
createdate timestamp with time zone NOT NULL,
createby character varying(64) DEFAULT ''::character varying NOT NULL,
createcode character varying(128) DEFAULT ''::character varying NOT NULL,
createinet character varying(128) DEFAULT ''::character varying NOT NULL,
PRIMARY KEY (userid, workername, summarylevel, statsdate)
);
END transaction;

17
sql/v0.6.txt

@ -1,17 +0,0 @@
To update to 0.6:
1) stop ckdb and ckpool
2) connect to postgres and delete the contents of all the tables except:
users, idcontrol, version
e.g. in psql to see all the table row counts: \i tables.sql
and to delete the rows from a table: delete from auths;
3) in pgsql: update version set version='0.6';
4) rename all your ckdb20140*.log files using the src/relog.sh script
5) start ckdb
6) wait for ckdb to say it is ready - i.e. wait for it to reload all the
ckdb20140*.log files
7) start ckpool
You can speed up step 6) if you don't care about losing the contents of the
DB forever: just go to the log directory and rename all the ckdb20140*.log
files something like: rename ".log" ".ignore" ckdb20140*.log
(or delete them)

1290
src/ckdb.c

File diff suppressed because it is too large Load Diff

102
src/ckpool.c

@ -275,8 +275,10 @@ retry:
LOGWARNING("Failed to send_procmsg to connector"); LOGWARNING("Failed to send_procmsg to connector");
} else if (cmdmatch(buf, "restart")) { } else if (cmdmatch(buf, "restart")) {
if (!fork()) { if (!fork()) {
if (!ckp->handover) {
ckp->initial_args[ckp->args++] = strdup("-H"); ckp->initial_args[ckp->args++] = strdup("-H");
ckp->initial_args[ckp->args] = NULL; ckp->initial_args[ckp->args] = NULL;
}
execv(ckp->initial_args[0], (char *const *)ckp->initial_args); execv(ckp->initial_args[0], (char *const *)ckp->initial_args);
} }
} else { } else {
@ -505,110 +507,18 @@ out:
return buf; return buf;
} }
static const char *invalid_unknown = " (unknown reason)"; /* Send a json msg to ckdb and return the response */
static const char *invalid_toodeep = " >9 levels, recursion?"; char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const int line)
#define first_invalid(_json_data) _first_invalid(_json_data, 0)
static char *_first_invalid(json_t *json_data, int level)
{ {
const char *json_key, *json_str; char *buf = NULL;
json_t *json_value;
void *json_iter;
int json_typ;
char buf[512], *inside;
bool found;
if (level > 9)
return strdup(invalid_toodeep);
buf[0] = '\0';
found = false;
json_iter = json_object_iter(json_data);
while (!found && json_iter) {
json_key = json_object_iter_key(json_iter);
json_value = json_object_iter_value(json_iter);
json_typ = json_typeof(json_value);
switch(json_typ) {
case JSON_STRING:
json_str = json_string_value(json_value);
if (json_str == NULL) {
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
}
break;
case JSON_REAL:
case JSON_INTEGER:
case JSON_TRUE:
case JSON_FALSE:
break;
case JSON_ARRAY:
inside = _first_invalid(json_value, level+1);
if (inside != invalid_unknown) {
snprintf(buf, sizeof(buf),
" %s : [%s ]", json_key, inside);
free(inside);
found = true;
}
break;
case JSON_NULL:
snprintf(buf, sizeof(buf),
" %s is NULL", json_key);
found = true;
break;
default:
snprintf(buf, sizeof(buf),
" unknown type %d for %s",
json_typ, json_key);
found = true;
break;
}
if (!found)
json_iter = json_object_iter_next(json_data, json_iter);
}
if (!*buf) {
if (level > 0)
return (char *)invalid_unknown;
else
return strdup(invalid_unknown);
} else
return strdup(buf);
}
/* Send a json msg to ckdb with its idmsg and return the response, consuming
* the json on success */
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged,
const char *file, const char *func, const int line)
{
char *msg = NULL, *dump, *buf = NULL;
dump = json_dumps(val, JSON_COMPACT);
if (unlikely(!dump)) {
char *invalid = first_invalid(val);
LOGWARNING("Json dump failed in json_ckdb_call from %s %s:%d%s", file, func, line, invalid);
free(invalid);
return buf;
}
ASPRINTF(&msg, "%s.id.json=%s", idmsg, dump);
if (!logged) {
char logname[512];
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, msg);
}
free(dump);
LOGDEBUG("Sending ckdb: %s", msg); LOGDEBUG("Sending ckdb: %s", msg);
buf = _send_recv_ckdb(ckp, msg, file, func, line); buf = _send_recv_ckdb(ckp, msg, file, func, line);
LOGDEBUG("Received from ckdb: %s", buf); LOGDEBUG("Received from ckdb: %s", buf);
free(msg);
if (likely(buf))
json_decref(val);
return buf; return buf;
} }
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
{ {
char *http_req = NULL; char *http_req = NULL;

6
src/ckpool.h

@ -177,9 +177,9 @@ char *_send_recv_proc(proc_instance_t *pi, const char *msg, const char *file, co
#define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__) #define send_recv_proc(pi, msg) _send_recv_proc(pi, msg, __FILE__, __func__, __LINE__)
char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line); char *_send_recv_ckdb(const ckpool_t *ckp, const char *msg, const char *file, const char *func, const int line);
#define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__) #define send_recv_ckdb(ckp, msg) _send_recv_ckdb(ckp, msg, __FILE__, __func__, __LINE__)
char *_json_ckdb_call(const ckpool_t *ckp, const char *idmsg, json_t *val, bool logged, char *_ckdb_msg_call(const ckpool_t *ckp, char *msg, const char *file, const char *func,
const char *file, const char *func, const int line); const int line);
#define json_ckdb_call(ckp, idmsg, val, logged) _json_ckdb_call(ckp, idmsg, val, logged, __FILE__, __func__, __LINE__) #define ckdb_msg_call(ckp, msg) _ckdb_msg_call(ckp, msg, __FILE__, __func__, __LINE__)
json_t *json_rpc_call(connsock_t *cs, const char *rpc_req); json_t *json_rpc_call(connsock_t *cs, const char *rpc_req);

42
src/connector.c

@ -71,14 +71,13 @@ struct sender_send {
client_instance_t *client; client_instance_t *client;
char *buf; char *buf;
int len; int len;
bool polling;
ts_t polltime;
}; };
typedef struct sender_send sender_send_t; typedef struct sender_send sender_send_t;
/* For the linked list of pending sends */ /* For the linked list of pending sends */
static sender_send_t *sender_sends; static sender_send_t *sender_sends;
static sender_send_t *delayed_sends;
/* For protecting the pending sends list */ /* For protecting the pending sends list */
static pthread_mutex_t sender_lock; static pthread_mutex_t sender_lock;
@ -335,34 +334,30 @@ void *sender(void *arg)
sender_send_t *sender_send; sender_send_t *sender_send;
client_instance_t *client; client_instance_t *client;
int ret, fd, ofs = 0; int ret, fd, ofs = 0;
bool polling = false;
mutex_lock(&sender_lock); mutex_lock(&sender_lock);
if (sender_sends && sender_sends->polling) /* Poll every 100ms if there are no new sends */
polling = true; if (!sender_sends) {
if (!sender_sends || polling) { const ts_t polltime = {0, 100000000};
ts_t timeout_ts; ts_t timeout_ts;
if (!polling) {
/* Wait 1 second in pure event driven mode */
ts_realtime(&timeout_ts); ts_realtime(&timeout_ts);
timeout_ts.tv_sec += 1; timeraddspec(&timeout_ts, &polltime);
} else {
/* Poll every 100ms if the head of the list is
* a delayed writer. */
timeout_ts.tv_sec = 0;
timeout_ts.tv_nsec = 100000000;
timeraddspec(&timeout_ts, &sender_sends->polltime);
}
pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts); pthread_cond_timedwait(&sender_cond, &sender_lock, &timeout_ts);
} }
sender_send = sender_sends; sender_send = sender_sends;
if (likely(sender_send)) if (sender_send)
DL_DELETE(sender_sends, sender_send); DL_DELETE(sender_sends, sender_send);
mutex_unlock(&sender_lock); mutex_unlock(&sender_lock);
if (!sender_send) /* Service delayed sends only if we have timed out on the
* conditional with no new sends appearing. */
if (!sender_send) {
if (!delayed_sends)
continue; continue;
sender_send = delayed_sends;
DL_DELETE(delayed_sends, sender_send);
}
client = sender_send->client; client = sender_send->client;
@ -390,13 +385,10 @@ void *sender(void *arg)
} }
LOGDEBUG("Client %d not ready for writes", client->id); LOGDEBUG("Client %d not ready for writes", client->id);
/* Append it to the tail of the list */ /* Append it to the tail of the delayed sends list.
mutex_lock(&sender_lock); * This is the only function that alters it so no
DL_APPEND(sender_sends, sender_send); * locking is required. */
mutex_unlock(&sender_lock); DL_APPEND(delayed_sends, sender_send);
sender_send->polling = true;
ts_realtime(&sender_send->polltime);
continue; continue;
} }
while (sender_send->len) { while (sender_send->len) {

61
src/libckpool.c

@ -258,6 +258,67 @@ void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int l
_mutex_unlock(&lock->mutex, file, func, line); _mutex_unlock(&lock->mutex, file, func, line);
} }
void _cksem_init(sem_t *sem, const char *file, const char *func, const int line)
{
int ret;
if ((ret = sem_init(sem, 0, 0)))
quitfrom(1, file, func, line, "Failed to sem_init ret=%d errno=%d", ret, errno);
}
void _cksem_post(sem_t *sem, const char *file, const char *func, const int line)
{
if (unlikely(sem_post(sem)))
quitfrom(1, file, func, line, "Failed to sem_post errno=%d sem=0x%p", errno, sem);
}
void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line)
{
retry:
if (unlikely(sem_wait(sem))) {
if (errno == EINTR)
goto retry;
quitfrom(1, file, func, line, "Failed to sem_wait errno=%d sem=0x%p", errno, sem);
}
}
int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const int line)
{
ts_t abs_timeout, ts_now;
tv_t tv_now;
int ret;
tv_time(&tv_now);
tv_to_ts(&ts_now, &tv_now);
ms_to_ts(&abs_timeout, ms);
retry:
timeraddspec(&abs_timeout, &ts_now);
ret = sem_timedwait(sem, &abs_timeout);
if (ret) {
if (likely(errno == ETIMEDOUT))
return ETIMEDOUT;
if (errno == EINTR)
goto retry;
quitfrom(1, file, func, line, "Failed to sem_timedwait errno=%d sem=0x%p", errno, sem);
}
return 0;
}
void cksem_reset(sem_t *sem)
{
int ret;
do {
ret = sem_trywait(sem);
if (unlikely(ret < 0 && (errno == EINTR)))
ret = 0;
} while (!ret);
}
void cksem_destroy(sem_t *sem)
{
sem_destroy(sem);
}
bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port) bool extract_sockaddr(char *url, char **sockaddr_url, char **sockaddr_port)
{ {

13
src/libckpool.h

@ -21,6 +21,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <syslog.h> #include <syslog.h>
#include <semaphore.h>
#if HAVE_BYTESWAP_H #if HAVE_BYTESWAP_H
# include <byteswap.h> # include <byteswap.h>
@ -356,6 +357,18 @@ void _ck_dlock(cklock_t *lock, const char *file, const char *func, const int lin
void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_runlock(cklock_t *lock, const char *file, const char *func, const int line);
void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line); void _ck_wunlock(cklock_t *lock, const char *file, const char *func, const int line);
void _cksem_init(sem_t *sem, const char *file, const char *func, const int line);
void _cksem_post(sem_t *sem, const char *file, const char *func, const int line);
void _cksem_wait(sem_t *sem, const char *file, const char *func, const int line);
int _cksem_mswait(sem_t *sem, int ms, const char *file, const char *func, const int line);
void cksem_reset(sem_t *sem);
void cksem_destroy(sem_t *sem);
#define cksem_init(_sem) _cksem_init(_sem, __FILE__, __func__, __LINE__)
#define cksem_post(_sem) _cksem_post(_sem, __FILE__, __func__, __LINE__)
#define cksem_wait(_sem) _cksem_wait(_sem, __FILE__, __func__, __LINE__)
#define cksem_mswait(_sem, _timeout) _cksem_mswait(_sem, _timeout, __FILE__, __func__, __LINE__)
static inline bool sock_connecting(void) static inline bool sock_connecting(void)
{ {
return errno == EINPROGRESS; return errno == EINPROGRESS;

89
src/stratifier.c

@ -170,13 +170,6 @@ struct json_params {
typedef struct json_params json_params_t; typedef struct json_params json_params_t;
struct ckdb_msg {
json_t *val;
int idtype;
};
typedef struct ckdb_msg ckdb_msg_t;
/* Stratum json messages with their associated client id */ /* Stratum json messages with their associated client id */
struct smsg { struct smsg {
json_t *json_msg; json_t *json_msg;
@ -415,12 +408,32 @@ static void purge_share_hashtable(int64_t wb_id)
static char *status_chars = "|/-\\"; static char *status_chars = "|/-\\";
/* Absorbs the json and generates a ckdb json message, logs it to the ckdb
* log and returns the malloced message. */
static char *ckdb_msg(ckpool_t *ckp, json_t *val, const int idtype)
{
char *json_msg = json_dumps(val, JSON_COMPACT);
char logname[512];
char *ret = NULL;
if (unlikely(!json_msg))
goto out;
ASPRINTF(&ret, "%s.id.json=%s", ckdb_ids[idtype], json_msg);
free(json_msg);
out:
json_decref(val);
snprintf(logname, 511, "%s%s", ckp->logdir, ckp->ckdb_name);
rotating_log(logname, ret);
return ret;
}
static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file, static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char *file,
const char *func, const int line) const char *func, const int line)
{ {
static int counter = 0;
static time_t time_counter; static time_t time_counter;
ckdb_msg_t *msg; static int counter = 0;
char *json_msg;
time_t now_t; time_t now_t;
char ch; char ch;
@ -441,10 +454,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
if (ckp->standalone) if (ckp->standalone)
return json_decref(val); return json_decref(val);
msg = ckalloc(sizeof(ckdb_msg_t)); json_msg = ckdb_msg(ckp, val, idtype);
msg->val = val; if (unlikely(!json_msg)) {
msg->idtype = idtype; LOGWARNING("Failed to dump json from %s %s:%d", file, func, line);
ckmsgq_add(ckdbq, msg); return;
}
ckmsgq_add(ckdbq, json_msg);
} }
#define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__) #define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__)
@ -1249,13 +1265,13 @@ static user_instance_t *authorise_user(const char *workername)
* and get SUID parameters back. We don't add these requests to the ckdbqueue * and get SUID parameters back. We don't add these requests to the ckdbqueue
* since we have to wait for the response but this is done from the authoriser * since we have to wait for the response but this is done from the authoriser
* thread so it won't hold anything up but other authorisations. */ * thread so it won't hold anything up but other authorisations. */
static bool send_recv_auth(stratum_instance_t *client) static int send_recv_auth(stratum_instance_t *client)
{ {
ckpool_t *ckp = client->ckp; ckpool_t *ckp = client->ckp;
char *buf, *json_msg;
char cdfield[64]; char cdfield[64];
bool ret = false; int ret = 1;
json_t *val; json_t *val;
char *buf;
ts_t now; ts_t now;
ts_realtime(&now); ts_realtime(&now);
@ -1272,7 +1288,13 @@ static bool send_recv_auth(stratum_instance_t *client)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", client->address); "createinet", client->address);
buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false); json_msg = ckdb_msg(ckp, val, ID_AUTH);
if (unlikely(!json_msg)) {
LOGWARNING("Failed to dump json in send_recv_auth");
return ret;
}
buf = ckdb_msg_call(ckp, json_msg);
free(json_msg);
if (likely(buf)) { if (likely(buf)) {
char *secondaryuserid, *response = alloca(128); char *secondaryuserid, *response = alloca(128);
@ -1284,17 +1306,18 @@ static bool send_recv_auth(stratum_instance_t *client)
response, secondaryuserid); response, secondaryuserid);
if (!safecmp(response, "ok") && secondaryuserid) { if (!safecmp(response, "ok") && secondaryuserid) {
client->secondaryuserid = strdup(secondaryuserid); client->secondaryuserid = strdup(secondaryuserid);
ret = true; ret = 0;
} }
} else { } else {
ret = -1;
LOGWARNING("Got no auth response from ckdb :("); LOGWARNING("Got no auth response from ckdb :(");
json_decref(val);
} }
return ret; return ret;
} }
static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val, const char *address) static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val,
const char *address, int *errnum)
{ {
bool ret = false; bool ret = false;
const char *buf; const char *buf;
@ -1334,8 +1357,11 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
client->workername = strdup(buf); client->workername = strdup(buf);
if (client->ckp->standalone) if (client->ckp->standalone)
ret = true; ret = true;
else else {
ret = send_recv_auth(client); *errnum = send_recv_auth(client);
if (!*errnum)
ret = true;
}
client->authorised = ret; client->authorised = ret;
if (client->authorised) if (client->authorised)
inc_worker(client->user_instance); inc_worker(client->user_instance);
@ -2127,7 +2153,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
{ {
json_t *result_val, *json_msg, *err_val = NULL; json_t *result_val, *json_msg, *err_val = NULL;
stratum_instance_t *client; stratum_instance_t *client;
int client_id; int client_id, errnum = 0;
client_id = jp->client_id; client_id = jp->client_id;
@ -2139,15 +2165,19 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id);
goto out; goto out;
} }
result_val = parse_authorise(client, jp->params, &err_val, jp->address); result_val = parse_authorise(client, jp->params, &err_val, jp->address, &errnum);
if (json_is_true(result_val)) { if (json_is_true(result_val)) {
char *buf; char *buf;
ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name, ASPRINTF(&buf, "Authorised, welcome to %s %s!", ckp->name,
client->user_instance->username); client->user_instance->username);
stratum_send_message(client, buf); stratum_send_message(client, buf);
} else } else {
if (errnum < 0)
stratum_send_message(client, "Authorisations temporarily offline :(");
else
stratum_send_message(client, "Failed authorisation :("); stratum_send_message(client, "Failed authorisation :(");
}
json_msg = json_object(); json_msg = json_object();
json_object_set_new_nocheck(json_msg, "result", result_val); json_object_set_new_nocheck(json_msg, "result", result_val);
json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null()); json_object_set_new_nocheck(json_msg, "error", err_val ? err_val : json_null());
@ -2158,14 +2188,13 @@ out:
} }
static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data) static void ckdbq_process(ckpool_t *ckp, char *msg)
{ {
static bool failed = false; static bool failed = false;
bool logged = false;
char *buf = NULL; char *buf = NULL;
while (!buf) { while (!buf) {
buf = json_ckdb_call(ckp, ckdb_ids[data->idtype], data->val, logged); buf = ckdb_msg_call(ckp, msg);
if (unlikely(!buf)) { if (unlikely(!buf)) {
if (!failed) { if (!failed) {
failed = true; failed = true;
@ -2173,13 +2202,13 @@ static void ckdbq_process(ckpool_t *ckp, ckdb_msg_t *data)
} }
sleep(5); sleep(5);
} }
logged = true;
} }
free(msg);
if (failed) { if (failed) {
failed = false; failed = false;
LOGWARNING("Successfully resumed talking to ckdb"); LOGWARNING("Successfully resumed talking to ckdb");
} }
LOGINFO("Got %s ckdb response: %s", ckdb_ids[data->idtype], buf); LOGINFO("Got ckdb response: %s", buf);
free(buf); free(buf);
} }

Loading…
Cancel
Save