Browse Source

Create stratum receive/send threads that will use their own hashtables and conditionals to do work

master
Con Kolivas 11 years ago
parent
commit
e946665023
  1. 6
      src/libckpool.c
  2. 3
      src/libckpool.h
  3. 56
      src/stratifier.c

6
src/libckpool.c

@ -179,6 +179,12 @@ void rwlock_destroy(pthread_rwlock_t *lock)
pthread_rwlock_destroy(lock); pthread_rwlock_destroy(lock);
} }
void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line)
{
if (unlikely(pthread_cond_init(cond, NULL)))
quitfrom(1, file, func, line, "Failed to pthread_cond_init!");
}
void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line) void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line)
{ {
_mutex_init(&lock->mutex, file, func, line); _mutex_init(&lock->mutex, file, func, line);

3
src/libckpool.h

@ -34,6 +34,8 @@
#define wr_unlock(_lock) _wr_unlock(_lock, __FILE__, __func__, __LINE__) #define wr_unlock(_lock) _wr_unlock(_lock, __FILE__, __func__, __LINE__)
#define mutex_init(_lock) _mutex_init(_lock, __FILE__, __func__, __LINE__) #define mutex_init(_lock) _mutex_init(_lock, __FILE__, __func__, __LINE__)
#define rwlock_init(_lock) _rwlock_init(_lock, __FILE__, __func__, __LINE__) #define rwlock_init(_lock) _rwlock_init(_lock, __FILE__, __func__, __LINE__)
#define cond_init(_cond) _cond_init(_cond, __FILE__, __func__, __LINE__)
#define cklock_init(_lock) _cklock_init(_lock, __FILE__, __func__, __LINE__) #define cklock_init(_lock) _cklock_init(_lock, __FILE__, __func__, __LINE__)
#define ck_rlock(_lock) _ck_rlock(_lock, __FILE__, __func__, __LINE__) #define ck_rlock(_lock) _ck_rlock(_lock, __FILE__, __func__, __LINE__)
#define ck_ilock(_lock) _ck_ilock(_lock, __FILE__, __func__, __LINE__) #define ck_ilock(_lock) _ck_ilock(_lock, __FILE__, __func__, __LINE__)
@ -190,6 +192,7 @@ void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, cons
void mutex_destroy(pthread_mutex_t *lock); void mutex_destroy(pthread_mutex_t *lock);
void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line);
void rwlock_destroy(pthread_rwlock_t *lock); void rwlock_destroy(pthread_rwlock_t *lock);
void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line);
void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line); void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line);
void cklock_destroy(cklock_t *lock); void cklock_destroy(cklock_t *lock);

56
src/stratifier.c

@ -76,12 +76,36 @@ struct workbase {
typedef struct workbase workbase_t; typedef struct workbase workbase_t;
/* For protecting the hashtable data */ /* For protecting the hashtable data */
cklock_t workbase_lock; static cklock_t workbase_lock;
/* For the hashtable of all workbases */ /* For the hashtable of all workbases */
workbase_t *workbases; static workbase_t *workbases;
static int workbase_id; static int workbase_id;
struct stratum_msg {
UT_hash_handle hh;
int id;
char *msg;
};
typedef struct stratum_msg stratum_msg_t;
/* For protecting the stratum msg data */
static pthread_mutex_t stratum_recv_lock;
static pthread_mutex_t stratum_send_lock;
/* For signalling the threads to wake up and do work */
static pthread_cond_t stratum_recv_cond;
static pthread_cond_t stratum_send_cond;
/* For the hashtable of all queued messages */
static stratum_msg_t *stratum_recvs;
static stratum_msg_t *stratum_sends;
static int stratum_recv_id;
static int stratum_send_id;
/* No error checking with these, make sure we know they're valid already! */ /* No error checking with these, make sure we know they're valid already! */
static inline void json_strcpy(char *buf, json_t *val, const char *key) static inline void json_strcpy(char *buf, json_t *val, const char *key)
{ {
@ -338,9 +362,23 @@ static void *blockupdate(void *arg)
return NULL; return NULL;
} }
static void *stratum_receiver(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
return NULL;
}
static void *stratum_sender(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
return NULL;
}
int stratifier(proc_instance_t *pi) int stratifier(proc_instance_t *pi)
{ {
pthread_t pth_blockupdate; pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
int ret = 0; int ret = 0;
@ -348,9 +386,19 @@ int stratifier(proc_instance_t *pi)
hex2bin(scriptsig_header_bin, scriptsig_header, 41); hex2bin(scriptsig_header_bin, scriptsig_header, 41);
address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress); address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress);
__bin2hex(pubkeytxn, pubkeytxnbin, 25); __bin2hex(pubkeytxn, pubkeytxnbin, 25);
cklock_init(&workbase_lock);
mutex_init(&stratum_recv_lock);
cond_init(&stratum_recv_cond);
create_pthread(&pth_stratum_receiver, stratum_receiver, ckp);
mutex_init(&stratum_send_lock);
cond_init(&stratum_send_cond);
create_pthread(&pth_stratum_sender, stratum_sender, ckp);
cklock_init(&workbase_lock);
create_pthread(&pth_blockupdate, blockupdate, ckp); create_pthread(&pth_blockupdate, blockupdate, ckp);
strat_loop(ckp, pi); strat_loop(ckp, pi);
return ret; return ret;
} }

Loading…
Cancel
Save