kanoi 10 years ago
parent
commit
28cc00ae4b
  1. 22
      src/ckdb.c
  2. 25
      src/generator.c
  3. 10
      src/libckpool.c
  4. 11
      src/libckpool.h
  5. 80
      src/stratifier.c

22
src/ckdb.c

@ -776,6 +776,8 @@ typedef struct logqueue {
static K_LIST *logqueue_free;
static K_STORE *logqueue_store;
static pthread_mutex_t wq_waitlock;
static pthread_cond_t wq_waitcond;
// WORKQUEUE
typedef struct workqueue {
@ -6353,6 +6355,8 @@ static bool setup_data()
cklock_init(&fpm_lock);
cksem_init(&socketer_sem);
mutex_init(&wq_waitlock);
cond_init(&wq_waitcond);
alloc_storage();
@ -8668,6 +8672,9 @@ static void *socketer(__maybe_unused void *arg)
K_WLOCK(workqueue_free);
k_add_tail(workqueue_store, item);
K_WUNLOCK(workqueue_free);
mutex_lock(&wq_waitlock);
pthread_cond_signal(&wq_waitcond);
mutex_unlock(&wq_waitlock);
break;
// Code error
default:
@ -9046,8 +9053,19 @@ static void *listener(void *arg)
if (wq_item) {
process_queued(conn, wq_item);
tick();
} else
cksleep_ms(4);
} else {
const ts_t tsdiff = {0, 420000000};
tv_t now;
ts_t abs;
tv_time(&now);
tv_to_ts(&abs, &now);
timeraddspec(&abs, &tsdiff);
mutex_lock(&wq_waitlock);
pthread_cond_timedwait(&wq_waitcond, &wq_waitlock, &abs);
mutex_unlock(&wq_waitlock);
}
}
PQfinish(conn);

25
src/generator.c

@ -237,6 +237,11 @@ retry:
}
} while (selret < 1);
if (unlikely(cs->fd < 0)) {
LOGWARNING("Bitcoind socket invalidated, will atempt failover");
goto reconnect;
}
sockd = accept(us->sockd, NULL, NULL);
if (sockd < 0) {
LOGEMERG("Failed to accept on generator socket");
@ -523,19 +528,19 @@ static bool subscribe_stratum(connsock_t *cs, proxy_instance_t *proxi)
retry:
/* Attempt to reconnect if the pool supports resuming */
if (proxi->sessionid) {
req = json_pack("{s:i,s:s,s:[s,s]}",
JSON_CPACK(req, "{s:i,s:s,s:[s,s]}",
"id", proxi->id++,
"method", "mining.subscribe",
"params", PACKAGE"/"VERSION, proxi->sessionid);
/* Then attempt to connect with just the client description */
} else if (!proxi->no_params) {
req = json_pack("{s:i,s:s,s:[s]}",
JSON_CPACK(req, "{s:i,s:s,s:[s]}",
"id", proxi->id++,
"method", "mining.subscribe",
"params", PACKAGE"/"VERSION);
/* Then try without any parameters */
} else {
req = json_pack("{s:i,s:s,s:[]}",
JSON_CPACK(req, "{s:i,s:s,s:[]}",
"id", proxi->id++,
"method", "mining.subscribe",
"params");
@ -580,7 +585,7 @@ static bool passthrough_stratum(connsock_t *cs, proxy_instance_t *proxi)
json_t *req, *val = NULL, *res_val;
bool ret = false;
req = json_pack("{s:s,s:[s]}",
JSON_CPACK(req, "{s:s,s:[s]}",
"method", "mining.passthrough",
"params", PACKAGE"/"VERSION);
ret = send_json_msg(cs, req);
@ -701,7 +706,7 @@ static bool send_version(proxy_instance_t *proxi, json_t *val)
connsock_t *cs = proxi->cs;
bool ret;
json_msg = json_pack("{sossso}", "id", id_val, "result", PACKAGE"/"VERSION,
JSON_CPACK(json_msg, "{sossso}", "id", id_val, "result", PACKAGE"/"VERSION,
"error", json_null());
ret = send_json_msg(cs, json_msg);
json_decref(json_msg);
@ -864,7 +869,7 @@ static bool auth_stratum(connsock_t *cs, proxy_instance_t *proxi)
json_t *val = NULL, *res_val, *req;
bool ret;
req = json_pack("{s:i,s:s,s:[s,s]}",
JSON_CPACK(req, "{s:i,s:s,s:[s,s]}",
"id", proxi->id++,
"method", "mining.authorize",
"params", proxi->auth, proxi->pass);
@ -913,7 +918,7 @@ static void send_subscribe(proxy_instance_t *proxi, int sockd)
json_t *json_msg;
char *msg;
json_msg = json_pack("{sssi}", "enonce1", proxi->enonce1,
JSON_CPACK(json_msg, "{sssi}", "enonce1", proxi->enonce1,
"nonce2len", proxi->nonce2len);
msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg);
@ -936,7 +941,7 @@ static void send_notify(proxy_instance_t *proxi, int sockd)
for (i = 0; i < ni->merkles; i++)
json_array_append_new(merkle_arr, json_string(&ni->merklehash[i][0]));
/* Use our own jobid instead of the server's one for easy lookup */
json_msg = json_pack("{sisssssssosssssssb}",
JSON_CPACK(json_msg, "{sisssssssosssssssb}",
"jobid", ni->id, "prevhash", ni->prevhash,
"coinbase1", ni->coinbase1, "coinbase2", ni->coinbase2,
"merklehash", merkle_arr, "bbversion", ni->bbversion,
@ -956,7 +961,7 @@ static void send_diff(proxy_instance_t *proxi, int sockd)
json_t *json_msg;
char *msg;
json_msg = json_pack("{sf}", "diff", proxi->diff);
JSON_CPACK(json_msg, "{sf}", "diff", proxi->diff);
msg = json_dumps(json_msg, JSON_NO_UTF8);
json_decref(json_msg);
send_unix_msg(sockd, msg);
@ -1156,7 +1161,7 @@ static void *proxy_send(void *arg)
mutex_unlock(&proxi->notify_lock);
if (jobid) {
val = json_pack("{s[ssooo]soss}", "params", proxi->auth, jobid,
JSON_CPACK(val, "{s[ssooo]soss}", "params", proxi->auth, jobid,
json_object_dup(msg->json_msg, "nonce2"),
json_object_dup(msg->json_msg, "ntime"),
json_object_dup(msg->json_msg, "nonce"),

10
src/libckpool.c

@ -933,6 +933,16 @@ out:
}
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line)
{
if (likely(val))
return;
LOGERR("Invalid json line:%d col:%d pos:%d text: %s from %s %s:%d",
err->line, err->column, err->position, err->text,
file, func, line);
}
/* Extracts a string value from a json array with error checking. To be used
* when the value of the string returned is only examined and not to be stored.
* See json_array_string below */

11
src/libckpool.h

@ -266,6 +266,17 @@ typedef struct unixsock unixsock_t;
typedef struct proc_instance proc_instance_t;
void _json_check(json_t *val, json_error_t *err, const char *file, const char *func, const int line);
#define json_check(val, err) _json_check(val, err, __FILE__, __func__, __LINE__)
/* Check and pack json */
#define JSON_CPACK(val, ...) do { \
json_error_t err; \
val = json_pack_ex(&err, 0, ##__VA_ARGS__); \
json_check(val, &err); \
} while (0)
/* No error checking with these, make sure we know they're valid already! */
static inline void json_strcpy(char *buf, json_t *val, const char *key)
{

80
src/stratifier.c

@ -191,7 +191,7 @@ struct user_instance {
UT_hash_handle hh;
char username[128];
int64_t id;
bool new_user;
char *secondaryuserid;
int workers;
};
@ -231,7 +231,6 @@ struct stratum_instance {
user_instance_t *user_instance;
char *useragent;
char *workername;
char *secondaryuserid;
int64_t user_id;
ckpool_t *ckp;
@ -433,11 +432,15 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
{
static time_t time_counter;
static int counter = 0;
char *json_msg;
time_t now_t;
char ch;
if (unlikely(!val)) {
LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line);
return;
}
now_t = time(NULL);
if (now_t != time_counter) {
/* Rate limit to 1 update per second */
@ -447,11 +450,6 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
fflush(stdout);
}
if (!val) {
LOGWARNING("Invalid json sent to ckdbq_add from %s %s:%d", file, func, line);
return;
}
if (ckp->standalone)
return json_decref(val);
@ -473,7 +471,7 @@ static void send_workinfo(ckpool_t *ckp, workbase_t *wb)
sprintf(cdfield, "%lu,%lu", wb->gentime.tv_sec, wb->gentime.tv_nsec);
val = json_pack("{sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}",
JSON_CPACK(val, "{sI,ss,ss,ss,ss,ss,ss,ss,ss,sI,so,ss,ss,ss,ss}",
"workinfoid", wb->id,
"poolinstance", ckp->name,
"transactiontree", wb->txn_hashes,
@ -501,7 +499,7 @@ static void send_ageworkinfo(ckpool_t *ckp, int64_t id)
ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
val = json_pack("{sI,ss,ss,ss,ss,ss}",
JSON_CPACK(val, "{sI,ss,ss,ss,ss,ss}",
"workinfoid", id,
"poolinstance", ckp->name,
"createdate", cdfield,
@ -966,7 +964,7 @@ static void stratum_broadcast_message(const char *msg)
{
json_t *json_msg;
json_msg = json_pack("{sosss[s]}", "id", json_null(), "method", "client.show_message",
JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message",
"params", msg);
stratum_broadcast(json_msg);
}
@ -985,7 +983,7 @@ static void block_solve(ckpool_t *ckp)
ASPRINTF(&msg, "Block %d solved by %s!", current_workbase->height, ckp->name);
/* We send blank settings to ckdb with only the matching data from what we submitted
* to say the block has been confirmed. */
val = json_pack("{si,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}",
JSON_CPACK(val, "{si,ss,sI,ss,ss,si,ss,ss,ss,sI,ss,ss,ss,ss}",
"height", current_workbase->height,
"confirmed", "1",
"workinfoid", current_workbase->id,
@ -1226,7 +1224,7 @@ static json_t *parse_subscribe(int64_t client_id, json_t *params_val)
n2len = workbases->enonce2varlen;
else
n2len = 8;
ret = json_pack("[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1,
JSON_CPACK(ret, "[[[s,s]],s,i]", "mining.notify", client->enonce1, client->enonce1,
n2len);
ck_runlock(&workbase_lock);
@ -1248,10 +1246,9 @@ static user_instance_t *authorise_user(const char *workername)
ck_ilock(&instance_lock);
HASH_FIND_STR(user_instances, username, instance);
if (!instance) {
/* New user instance */
/* New user instance. Secondary user id will be NULL */
instance = ckzalloc(sizeof(user_instance_t));
strcpy(instance->username, username);
instance->new_user = true;
ck_ulock(&instance_lock);
instance->id = user_instance_id++;
@ -1269,6 +1266,7 @@ static user_instance_t *authorise_user(const char *workername)
* thread so it won't hold anything up but other authorisations. */
static int send_recv_auth(stratum_instance_t *client)
{
user_instance_t *user_instance = client->user_instance;
ckpool_t *ckp = client->ckp;
char *buf, *json_msg;
char cdfield[64];
@ -1279,8 +1277,8 @@ static int send_recv_auth(stratum_instance_t *client)
ts_realtime(&now);
sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec);
val = json_pack("{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}",
"username", client->user_instance->username,
JSON_CPACK(val, "{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}",
"username", user_instance->username,
"workername", client->workername,
"poolinstance", ckp->name,
"useragent", client->useragent,
@ -1305,10 +1303,11 @@ static int send_recv_auth(stratum_instance_t *client)
secondaryuserid = response;
strsep(&secondaryuserid, ".");
LOGINFO("User %s Worker %s got auth response: %s suid: %s",
client->user_instance->username, client->workername,
user_instance->username, client->workername,
response, secondaryuserid);
if (!safecmp(response, "ok") && secondaryuserid) {
client->secondaryuserid = strdup(secondaryuserid);
if (!user_instance->secondaryuserid)
user_instance->secondaryuserid = strdup(secondaryuserid);
ret = 0;
}
} else {
@ -1332,7 +1331,7 @@ static void queue_delayed_auth(stratum_instance_t *client)
ts_realtime(&now);
sprintf(cdfield, "%lu,%lu", now.tv_sec, now.tv_nsec);
val = json_pack("{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}",
JSON_CPACK(val, "{ss,ss,ss,ss,sI,ss,sb,ss,ss,ss,ss}",
"username", client->user_instance->username,
"workername", client->workername,
"poolinstance", ckp->name,
@ -1393,7 +1392,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
*errnum = send_recv_auth(client);
if (!*errnum)
ret = true;
else if (*errnum < 0 && !user_instance->new_user) {
else if (*errnum < 0 && user_instance->secondaryuserid) {
/* This user has already been authorised but ckdb is
* offline so we assume they already exist but add the
* auth request to the queued messages. */
@ -1402,10 +1401,8 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
}
}
client->authorised = ret;
if (client->authorised) {
if (client->authorised)
inc_worker(user_instance);
user_instance->new_user = false;
}
out:
return json_boolean(ret);
}
@ -1414,7 +1411,7 @@ static void stratum_send_diff(stratum_instance_t *client)
{
json_t *json_msg;
json_msg = json_pack("{s[I]soss}", "params", client->diff, "id", json_null(),
JSON_CPACK(json_msg, "{s[I]soss}", "params", client->diff, "id", json_null(),
"method", "mining.set_difficulty");
stratum_add_send(json_msg, client->id);
}
@ -1423,7 +1420,7 @@ static void stratum_send_message(stratum_instance_t *client, const char *msg)
{
json_t *json_msg;
json_msg = json_pack("{sosss[s]}", "id", json_null(), "method", "client.show_message",
JSON_CPACK(json_msg, "{sosss[s]}", "id", json_null(), "method", "client.show_message",
"params", msg);
stratum_add_send(json_msg, client->id);
}
@ -1587,7 +1584,7 @@ test_blocksolve(stratum_instance_t *client, workbase_t *wb, const uchar *data, c
flip_32(swap, hash);
__bin2hex(blockhash, swap, 32);
val = json_pack("{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}",
JSON_CPACK(val, "{si,ss,ss,sI,ss,ss,sI,ss,ss,ss,sI,ss,ss,ss,ss}",
"height", wb->height,
"blockhash", blockhash,
"confirmed", "n",
@ -1698,7 +1695,7 @@ static void submit_share(stratum_instance_t *client, int64_t jobid, const char *
char *msg;
sprintf(enonce2, "%s%s", client->enonce1var, nonce2);
json_msg = json_pack("{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2,
JSON_CPACK(json_msg, "{sisssssssIsi}", "jobid", jobid, "nonce2", enonce2,
"ntime", ntime, "nonce", nonce, "client_id", client->id,
"msg_id", msg_id);
msg = json_dumps(json_msg, 0);
@ -1713,6 +1710,7 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
json_t *params_val, json_t **err_val)
{
bool share = false, result = false, invalid = true, submit = false;
user_instance_t *user_instance = client->user_instance;
const char *user, *job_id, *nonce2, *ntime, *nonce;
double diff = client->diff, wdiff = 0, sdiff = -1;
char hexhash[68] = {}, sharehash[32], cdfield[64];
@ -1856,7 +1854,7 @@ out_unlock:
json_set_int(val, "workinfoid", id);
json_set_int(val, "clientid", client->id);
json_set_string(val, "enonce1", client->enonce1);
json_set_string(val, "secondaryuserid", client->secondaryuserid);
json_set_string(val, "secondaryuserid", user_instance->secondaryuserid);
json_set_string(val, "nonce2", nonce2);
json_set_string(val, "nonce", nonce);
json_set_string(val, "ntime", ntime);
@ -1872,7 +1870,7 @@ out_unlock:
json_set_string(val, "createcode", __func__);
json_set_string(val, "createinet", ckp->serverurl);
json_set_string(val, "workername", client->workername);
json_set_string(val, "username", client->user_instance->username);
json_set_string(val, "username", user_instance->username);
if (ckp->logshares) {
fp = fopen(fname, "a");
@ -1890,13 +1888,13 @@ out_unlock:
ckdbq_add(ckp, ID_SHARES, val);
out:
if (!share) {
val = json_pack("{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
JSON_CPACK(val, "{sI,ss,ss,sI,ss,ss,so,si,ss,ss,ss,ss}",
"clientid", client->id,
"secondaryuserid", client->secondaryuserid,
"secondaryuserid", user_instance->secondaryuserid,
"enonce1", client->enonce1,
"workinfoid", current_workbase->id,
"workername", client->workername,
"username", client->user_instance->username,
"username", user_instance->username,
"error", json_copy(*err_val),
"errn", err,
"createdate", cdfield,
@ -1915,7 +1913,7 @@ static json_t *__stratum_notify(bool clean)
{
json_t *val;
val = json_pack("{s:[ssssosssb],s:o,s:s}",
JSON_CPACK(val, "{s:[ssssosssb],s:o,s:s}",
"params",
current_workbase->idstring,
current_workbase->prevhash,
@ -1958,7 +1956,7 @@ static void send_json_err(int64_t client_id, json_t *id_val, const char *err_msg
{
json_t *val;
val = json_pack("{soss}", "id", json_copy(id_val), "error", err_msg);
JSON_CPACK(val, "{soss}", "id", json_copy(id_val), "error", err_msg);
stratum_add_send(val, client_id);
}
@ -2318,7 +2316,7 @@ static void update_userstats(ckpool_t *ckp)
ghs5 = client->dsps5 * nonces;
ghs60 = client->dsps60 * nonces;
ghs1440 = client->dsps1440 * nonces;
val = json_pack("{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}",
JSON_CPACK(val, "{ss,sI,si,ss,ss,sf,sf,sf,sf,sb,ss,ss,ss,ss}",
"poolinstance", ckp->name,
"instanceid", client->id,
"elapsed", elapsed,
@ -2406,7 +2404,7 @@ static void *statsupdate(void *arg)
if (unlikely(!fp))
LOGERR("Failed to fopen %s", fname);
val = json_pack("{si,si,si}",
JSON_CPACK(val, "{si,si,si}",
"runtime", diff.tv_sec,
"Users", stats.users,
"Workers", stats.workers);
@ -2416,7 +2414,7 @@ static void *statsupdate(void *arg)
fprintf(fp, "%s\n", s);
dealloc(s);
val = json_pack("{ss,ss,ss,ss,ss,ss}",
JSON_CPACK(val, "{ss,ss,ss,ss,ss,ss}",
"hashrate1m", suffix1,
"hashrate5m", suffix5,
"hashrate15m", suffix15,
@ -2429,7 +2427,7 @@ static void *statsupdate(void *arg)
fprintf(fp, "%s\n", s);
dealloc(s);
val = json_pack("{sf,sf,sf,sf}",
JSON_CPACK(val, "{sf,sf,sf,sf}",
"SPS1m", sps1,
"SPS5m", sps5,
"SPS15m", sps15,
@ -2469,7 +2467,7 @@ static void *statsupdate(void *arg)
ghs = client->dsps1440 * nonces;
suffix_string(ghs, suffix1440, 16, 0);
val = json_pack("{ss,ss,ss,ss}",
JSON_CPACK(val, "{ss,ss,ss,ss}",
"hashrate1m", suffix1,
"hashrate5m", suffix5,
"hashrate1hr", suffix60,
@ -2495,7 +2493,7 @@ static void *statsupdate(void *arg)
ts_realtime(&ts_now);
sprintf(cdfield, "%lu,%lu", ts_now.tv_sec, ts_now.tv_nsec);
val = json_pack("{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}",
JSON_CPACK(val, "{ss,si,si,si,sf,sf,sf,sf,ss,ss,ss,ss}",
"poolinstance", ckp->name,
"elapsed", diff.tv_sec,
"users", stats.users,

Loading…
Cancel
Save