kanoi 11 years ago
parent
commit
678ac47b94
  1. 32
      src/connector.c
  2. 53
      src/stratifier.c

32
src/connector.c

@ -47,7 +47,8 @@ struct client_instance {
struct client_instance *next; struct client_instance *next;
struct sockaddr address; struct sockaddr address;
socklen_t address_len; char address_name[INET6_ADDRSTRLEN];
char buf[PAGESIZE]; char buf[PAGESIZE];
int bufofs; int bufofs;
}; };
@ -87,24 +88,46 @@ void *acceptor(void *arg)
{ {
conn_instance_t *ci = (conn_instance_t *)arg; conn_instance_t *ci = (conn_instance_t *)arg;
client_instance_t *client, *old_client; client_instance_t *client, *old_client;
socklen_t address_len;
int fd; int fd;
rename_proc("acceptor"); rename_proc("acceptor");
retry: retry:
client = ckzalloc(sizeof(client_instance_t)); client = ckzalloc(sizeof(client_instance_t));
client->address_len = sizeof(client->address); address_len = sizeof(client->address);
while (!ci->accept) while (!ci->accept)
sleep(1); sleep(1);
fd = accept(ci->serverfd, &client->address, &client->address_len); fd = accept(ci->serverfd, &client->address, &address_len);
if (unlikely(fd < 0)) { if (unlikely(fd < 0)) {
LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd); LOGERR("Failed to accept on socket %d in acceptor", ci->serverfd);
dealloc(client); dealloc(client);
goto out; goto out;
} }
switch (client->address.sa_family) {
const struct sockaddr_in *inet4_in;
const struct sockaddr_in6 *inet6_in;
case AF_INET:
inet4_in = (struct sockaddr_in *)&client->address;
inet_ntop(AF_INET, &inet4_in->sin_addr, client->address_name, INET6_ADDRSTRLEN);
break;
case AF_INET6:
inet6_in = (struct sockaddr_in6 *)&client->address;
inet_ntop(AF_INET6, &inet6_in->sin6_addr, client->address_name, INET6_ADDRSTRLEN);
break;
default:
LOGWARNING("Unknown INET type for client %d on socket %d",
ci->nfds, fd);
close(fd);
free(client);
goto retry;
}
keep_sockalive(fd); keep_sockalive(fd);
LOGINFO("Connected new client %d on socket %d", ci->nfds, fd); LOGINFO("Connected new client %d on socket %d from %s", ci->nfds, fd, client->address_name);
client->fd = fd; client->fd = fd;
@ -221,6 +244,7 @@ reparse:
char *s; char *s;
json_object_set_new_nocheck(val, "client_id", json_integer(client->id)); json_object_set_new_nocheck(val, "client_id", json_integer(client->id));
json_object_set_new_nocheck(val, "address", json_string(client->address_name));
s = json_dumps(val, 0); s = json_dumps(val, 0);
send_proc(ckp->stratifier, s); send_proc(ckp->stratifier, s);
free(s); free(s);

53
src/stratifier.c

@ -9,6 +9,7 @@
#include "config.h" #include "config.h"
#include <arpa/inet.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h> #include <sys/time.h>
@ -164,6 +165,7 @@ struct json_params {
json_t *params; json_t *params;
json_t *id_val; json_t *id_val;
int client_id; int client_id;
char address[INET6_ADDRSTRLEN];
}; };
typedef struct json_params json_params_t; typedef struct json_params json_params_t;
@ -179,6 +181,7 @@ typedef struct ckdb_msg ckdb_msg_t;
struct smsg { struct smsg {
json_t *json_msg; json_t *json_msg;
int client_id; int client_id;
char address[INET6_ADDRSTRLEN];
}; };
typedef struct smsg smsg_t; typedef struct smsg smsg_t;
@ -226,6 +229,7 @@ struct stratum_instance {
tv_t last_share; tv_t last_share;
time_t start_time; time_t start_time;
char address[INET6_ADDRSTRLEN];
bool authorised; bool authorised;
bool idle; bool idle;
bool notified_idle; bool notified_idle;
@ -1214,7 +1218,7 @@ static bool send_recv_auth(stratum_instance_t *client)
"createdate", cdfield, "createdate", cdfield,
"createby", "code", "createby", "code",
"createcode", __func__, "createcode", __func__,
"createinet", "127.0.0.1"); "createinet", client->address);
buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false); buf = json_ckdb_call(ckp, ckdb_ids[ID_AUTH], val, false);
if (likely(buf)) { if (likely(buf)) {
char *secondaryuserid, *response = alloca(128); char *secondaryuserid, *response = alloca(128);
@ -1237,7 +1241,7 @@ static bool send_recv_auth(stratum_instance_t *client)
return ret; return ret;
} }
static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val) static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, json_t **err_val, const char *address)
{ {
bool ret = false; bool ret = false;
const char *buf; const char *buf;
@ -1270,6 +1274,7 @@ static json_t *parse_authorise(stratum_instance_t *client, json_t *params_val, j
client->user_id = client->user_instance->id; client->user_id = client->user_instance->id;
ts_realtime(&now); ts_realtime(&now);
client->start_time = now.tv_sec; client->start_time = now.tv_sec;
strcpy(client->address, address);
LOGNOTICE("Authorised client %d worker %s as user %s", client->id, buf, LOGNOTICE("Authorised client %d worker %s as user %s", client->id, buf,
client->user_instance->username); client->user_instance->username);
@ -1865,18 +1870,19 @@ static void update_client(const int client_id)
stratum_send_diff(client); stratum_send_diff(client);
} }
static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val) static json_params_t *create_json_params(const int client_id, const json_t *params, const json_t *id_val, const char *address)
{ {
json_params_t *jp = ckalloc(sizeof(json_params_t)); json_params_t *jp = ckalloc(sizeof(json_params_t));
jp->params = json_deep_copy(params); jp->params = json_deep_copy(params);
jp->id_val = json_deep_copy(id_val); jp->id_val = json_deep_copy(id_val);
jp->client_id = client_id; jp->client_id = client_id;
strcpy(jp->address, address);
return jp; return jp;
} }
static void parse_method(const int client_id, json_t *id_val, json_t *method_val, static void parse_method(const int client_id, json_t *id_val, json_t *method_val,
json_t *params_val) json_t *params_val, char *address)
{ {
stratum_instance_t *client; stratum_instance_t *client;
const char *method; const char *method;
@ -1908,7 +1914,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
} }
if (!strncasecmp(method, "mining.auth", 11)) { if (!strncasecmp(method, "mining.auth", 11)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val, address);
ckmsgq_add(sauthq, jp); ckmsgq_add(sauthq, jp);
return; return;
@ -1928,7 +1934,7 @@ static void parse_method(const int client_id, json_t *id_val, json_t *method_val
} }
if (!strncasecmp(method, "mining.submit", 13)) { if (!strncasecmp(method, "mining.submit", 13)) {
json_params_t *jp = create_json_params(client_id, params_val, id_val); json_params_t *jp = create_json_params(client_id, params_val, id_val, address);
ckmsgq_add(sshareq, jp); ckmsgq_add(sshareq, jp);
return; return;
@ -1959,7 +1965,7 @@ static void parse_instance_msg(smsg_t *msg)
send_json_err(client_id, id_val, "-1:params not found"); send_json_err(client_id, id_val, "-1:params not found");
goto out; goto out;
} }
parse_method(client_id, id_val, method, params); parse_method(client_id, id_val, method, params, msg->address);
out: out:
json_decref(val); json_decref(val);
free(msg); free(msg);
@ -1968,9 +1974,36 @@ out:
static void srecv_process(ckpool_t *ckp, smsg_t *msg) static void srecv_process(ckpool_t *ckp, smsg_t *msg)
{ {
stratum_instance_t *instance; stratum_instance_t *instance;
json_t *val;
val = json_object_get(msg->json_msg, "client_id");
if (unlikely(!val)) {
char *s;
s = json_dumps(msg->json_msg, 0);
LOGWARNING("Failed to extract client_id from connector json smsg %s", s);
free(s);
json_decref(msg->json_msg);
free(msg);
return;
}
msg->client_id = json_integer_value(val);
json_object_clear(val);
val = json_object_get(msg->json_msg, "address");
if (unlikely(!val)) {
char *s;
msg->client_id = json_integer_value(json_object_get(msg->json_msg, "client_id")); s = json_dumps(msg->json_msg, 0);
json_object_del(msg->json_msg, "client_id"); LOGWARNING("Failed to extract address from connector json smsg %s", s);
free(s);
json_decref(msg->json_msg);
free(msg);
return;
}
strcpy(msg->address, json_string_value(val));
json_object_clear(val);
/* Parse the message here */ /* Parse the message here */
ck_ilock(&instance_lock); ck_ilock(&instance_lock);
@ -2063,7 +2096,7 @@ static void sauth_process(ckpool_t *ckp, json_params_t *jp)
LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id); LOGINFO("Authoriser failed to find client id %d in hashtable!", client_id);
goto out; goto out;
} }
result_val = parse_authorise(client, jp->params, &err_val); result_val = parse_authorise(client, jp->params, &err_val, jp->address);
if (json_is_true(result_val)) { if (json_is_true(result_val)) {
char *buf; char *buf;

Loading…
Cancel
Save