Browse Source

Add mining nodes to a linked list and send them the workinfo for now

master
Con Kolivas 9 years ago
parent
commit
04eaabfaa2
  1. 2
      src/ckpool.h
  2. 6
      src/connector.c
  3. 54
      src/stratifier.c

2
src/ckpool.h

@ -257,6 +257,7 @@ enum stratum_msgtype {
SM_AUTHRESULT, SM_AUTHRESULT,
SM_TXNS, SM_TXNS,
SM_TXNSRESULT, SM_TXNSRESULT,
SM_WORKINFO,
SM_NONE SM_NONE
}; };
@ -274,6 +275,7 @@ static const char __maybe_unused *stratum_msgs[] = {
"auth.result", "auth.result",
"txns", "txns",
"txns.result", "txns.result",
"workinfo",
"" ""
}; };

6
src/connector.c

@ -869,6 +869,12 @@ static void send_client(cdata_t *cdata, const int64_t id, char *buf)
return; return;
} }
if (unlikely(ckp->node && !id)) {
LOGWARNING("Message for node: %s", buf);
free(buf);
return;
}
/* Grab a reference to this client until the sender_send has /* Grab a reference to this client until the sender_send has
* completed processing. Is this a passthrough subclient ? */ * completed processing. Is this a passthrough subclient ? */
if (id > 0xffffffffll) { if (id > 0xffffffffll) {

54
src/stratifier.c

@ -265,6 +265,7 @@ struct stratum_instance {
time_t start_time; time_t start_time;
char address[INET6_ADDRSTRLEN]; char address[INET6_ADDRSTRLEN];
bool node; /* Is this a mining node */
bool subscribed; bool subscribed;
bool authorising; /* In progress, protected by instance_lock */ bool authorising; /* In progress, protected by instance_lock */
bool authorised; bool authorised;
@ -459,6 +460,7 @@ struct stratifier_data {
stratum_instance_t *stratum_instances; stratum_instance_t *stratum_instances;
stratum_instance_t *recycled_instances; stratum_instance_t *recycled_instances;
stratum_instance_t *node_instances;
int stratum_generated; int stratum_generated;
int disconnected_generated; int disconnected_generated;
@ -784,8 +786,13 @@ static void _ckdbq_add(ckpool_t *ckp, const int idtype, json_t *val, const char
#define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__) #define ckdbq_add(ckp, idtype, val) _ckdbq_add(ckp, idtype, val, __FILE__, __func__, __LINE__)
static void send_workinfo(ckpool_t *ckp, const workbase_t *wb) static void stratum_add_send(sdata_t *sdata, json_t *val, const int64_t client_id,
const int msg_type);
static void send_workinfo(ckpool_t *ckp, sdata_t *sdata, const workbase_t *wb)
{ {
stratum_instance_t *client;
ckmsg_t *bulk_send = NULL;
char cdfield[64]; char cdfield[64];
json_t *val; json_t *val;
@ -807,6 +814,33 @@ static void send_workinfo(ckpool_t *ckp, const workbase_t *wb)
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", ckp->serverurl[0]); "createinet", ckp->serverurl[0]);
ck_rlock(&sdata->instance_lock);
DL_FOREACH(sdata->node_instances, client) {
ckmsg_t *client_msg;
smsg_t *msg;
json_t *json_msg = json_deep_copy(val);
json_set_string(json_msg, "node.method", stratum_msgs[SM_WORKINFO]);
client_msg = ckalloc(sizeof(ckmsg_t));
msg = ckzalloc(sizeof(smsg_t));
msg->json_msg = json_msg;
msg->client_id = client->id;
client_msg->data = msg;
DL_APPEND(bulk_send, client_msg);
}
ck_runlock(&sdata->instance_lock);
if (bulk_send) {
ckmsgq_t *ssends = sdata->ssends;
LOGINFO("Sending workinfo to mining nodes");
mutex_lock(ssends->lock);
DL_CONCAT(ssends->msgs, bulk_send);
pthread_cond_signal(ssends->cond);
mutex_unlock(ssends->lock);
}
ckdbq_add(ckp, ID_WORKINFO, val); ckdbq_add(ckp, ID_WORKINFO, val);
} }
@ -894,7 +928,7 @@ static void add_base(ckpool_t *ckp, sdata_t *sdata, workbase_t *wb, bool *new_bl
if (*new_block) if (*new_block)
purge_share_hashtable(sdata, wb->id); purge_share_hashtable(sdata, wb->id);
send_workinfo(ckp, wb); send_workinfo(ckp, sdata, wb);
/* Send the aged work message to ckdb once we have dropped the workbase lock /* Send the aged work message to ckdb once we have dropped the workbase lock
* to prevent taking recursive locks */ * to prevent taking recursive locks */
@ -1994,6 +2028,8 @@ static void __drop_client(sdata_t *sdata, stratum_instance_t *client, bool lazil
{ {
user_instance_t *user = client->user_instance; user_instance_t *user = client->user_instance;
if (unlikely(client->node))
DL_DELETE(sdata->node_instances, client);
if (client->workername) { if (client->workername) {
if (user) { if (user) {
ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s", ASPRINTF(msg, "Dropped client %"PRId64" %s %suser %s worker %s %s",
@ -5014,6 +5050,17 @@ static void init_client(sdata_t *sdata, const stratum_instance_t *client, const
stratum_send_update(sdata, client_id, true); stratum_send_update(sdata, client_id, true);
} }
static void add_mining_node(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client)
{
client->node = true;
ck_wlock(&sdata->instance_lock);
DL_APPEND(sdata->node_instances, client);
ck_wunlock(&sdata->instance_lock);
LOGWARNING("Added client %"PRId64" %s as mining node!", client->id, client->address);
}
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *id_val, json_t *method_val,
@ -5067,10 +5114,9 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
/* Add this client as a passthrough in the connector and /* Add this client as a passthrough in the connector and
* add it to the list of mining nodes in the stratifier */ * add it to the list of mining nodes in the stratifier */
LOGNOTICE("Adding mining client %"PRId64" %s", client_id, client->address); add_mining_node(ckp, sdata, client);
snprintf(buf, 255, "passthrough=%"PRId64, client_id); snprintf(buf, 255, "passthrough=%"PRId64, client_id);
send_proc(ckp->connector, buf); send_proc(ckp->connector, buf);
drop_client(ckp, sdata, client_id);
return; return;
} }

Loading…
Cancel
Save