Browse Source

Create a send processing queue in the proxy generator and receive shares that meet the parent pool target

master
Con Kolivas 11 years ago
parent
commit
d265e51664
  1. 50
      src/generator.c
  2. 34
      src/stratifier.c
  3. 10
      src/stratifier.h

50
src/generator.c

@ -18,7 +18,9 @@
#include "libckpool.h"
#include "generator.h"
#include "bitcoin.h"
#include "stratifier.h"
#include "uthash.h"
#include "utlist.h"
struct notify_instance {
/* Hash table data */
@ -76,7 +78,10 @@ struct proxy_instance {
pthread_t pth_precv;
pthread_t pth_psend;
pthread_mutex_t psend_lock;
pthread_cond_t psend_cond;
stratum_msg_t *psends;
};
typedef struct proxy_instance proxy_instance_t;
@ -718,6 +723,19 @@ static void send_diff(proxy_instance_t *proxi, int sockd)
close(sockd);
}
static void submit_share(proxy_instance_t *proxi, json_t *val)
{
stratum_msg_t *msg;
msg = ckzalloc(sizeof(stratum_msg_t));
msg->json_msg = val;
mutex_lock(&proxi->psend_lock);
DL_APPEND(proxi->psends, msg);
pthread_cond_signal(&proxi->psend_cond);
mutex_unlock(&proxi->psend_lock);
}
static int proxy_loop(proc_instance_t *pi, connsock_t *cs, proxy_instance_t *proxi)
{
unixsock_t *us = &pi->us;
@ -760,6 +778,14 @@ retry:
} else if (!strncasecmp(buf, "ping", 4)) {
LOGDEBUG("Proxy received ping request");
send_unix_msg(sockd, "pong");
} else {
/* Anything remaining should be share submissions */
json_t *val = json_loads(buf, 0, NULL);
if (!val)
LOGWARNING("Received unrecognised message: %s", buf);
else
submit_share(proxi, val);
}
close(sockd);
goto retry;
@ -842,11 +868,34 @@ static void *proxy_recv(void *arg)
return NULL;
}
/* For processing and sending shares */
static void *proxy_send(void *arg)
{
proxy_instance_t *proxi = (proxy_instance_t *)arg;
rename_proc("proxysend");
while (42) {
stratum_msg_t *msg;
char *buf;
mutex_lock(&proxi->psend_lock);
if (!proxi->psends)
pthread_cond_wait(&proxi->psend_cond, &proxi->psend_lock);
msg = proxi->psends;
if (likely(msg))
DL_DELETE(proxi->psends, msg);
mutex_unlock(&proxi->psend_lock);
if (unlikely(!msg))
continue;
buf = json_dumps(msg->json_msg, 0);
LOGDEBUG("Proxysend received: %s", buf);
json_decref(msg->json_msg);
free(msg);
}
return NULL;
}
@ -883,6 +932,7 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi, connsock_t *cs,
mutex_init(&proxi.notify_lock);
create_pthread(&proxi.pth_precv, proxy_recv, &proxi);
mutex_init(&proxi.psend_lock);
cond_init(&proxi.psend_cond);
create_pthread(&proxi.pth_psend, proxy_send, &proxi);

34
src/stratifier.c

@ -20,9 +20,10 @@
#include "ckpool.h"
#include "libckpool.h"
#include "bitcoin.h"
#include "sha2.h"
#include "stratifier.h"
#include "uthash.h"
#include "utlist.h"
#include "sha2.h"
static const char *workpadding = "000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000";
@ -155,16 +156,6 @@ static uint64_t workbase_id;
static uint64_t blockchange_id;
static char lasthash[68];
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
/* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock;
@ -1188,7 +1179,6 @@ static double submission_diff(stratum_instance_t *client, workbase_t *wb, const
if (!wb->proxy)
test_blocksolve(wb, swap, ret, coinbase, cblen);
/* FIXME: Log share here */
return ret;
}
@ -1214,6 +1204,24 @@ out_unlock:
return ret;
}
/* Submit a share in proxy mode to the parent pool. workbase_lock is held */
static void __submit_share(workbase_t *wb, const char *jobid, const char *nonce2,
const char *ntime, const char *nonce)
{
ckpool_t *ckp = wb->ckp;
json_t *json_msg;
char enonce2[32];
char *msg;
sprintf(enonce2, "%s%s", wb->enonce1const, nonce2);
json_msg = json_pack("{ssssssss}", "jobid", jobid, "nonce2", enonce2,
"ntime", ntime, "nonce", nonce);
msg = json_dumps(json_msg, 0);
json_decref(json_msg);
send_proc(ckp->generator, msg);
free(msg);
}
static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
json_t *params_val, json_t **err_val)
{
@ -1303,6 +1311,8 @@ static json_t *parse_submit(stratum_instance_t *client, json_t *json_msg,
goto out_unlock;
}
invalid = false;
if (wb->proxy && sdiff > wb->diff)
__submit_share(wb, idstring, nonce2, ntime, nonce);
out_unlock:
ck_runlock(&workbase_lock);

10
src/stratifier.h

@ -10,6 +10,16 @@
#ifndef STRATIFIER_H
#define STRATIFIER_H
struct stratum_msg {
struct stratum_msg *next;
struct stratum_msg *prev;
json_t *json_msg;
int client_id;
};
typedef struct stratum_msg stratum_msg_t;
int stratifier(proc_instance_t *pi);
#endif /* STRATIFIER_H */

Loading…
Cancel
Save