diff --git a/src/libckpool.c b/src/libckpool.c index a14f44b8..444a5b8d 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -179,6 +179,12 @@ void rwlock_destroy(pthread_rwlock_t *lock) pthread_rwlock_destroy(lock); } +void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line) +{ + if (unlikely(pthread_cond_init(cond, NULL))) + quitfrom(1, file, func, line, "Failed to pthread_cond_init!"); +} + void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line) { _mutex_init(&lock->mutex, file, func, line); diff --git a/src/libckpool.h b/src/libckpool.h index 9b647c4f..67d7125f 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -34,6 +34,8 @@ #define wr_unlock(_lock) _wr_unlock(_lock, __FILE__, __func__, __LINE__) #define mutex_init(_lock) _mutex_init(_lock, __FILE__, __func__, __LINE__) #define rwlock_init(_lock) _rwlock_init(_lock, __FILE__, __func__, __LINE__) +#define cond_init(_cond) _cond_init(_cond, __FILE__, __func__, __LINE__) + #define cklock_init(_lock) _cklock_init(_lock, __FILE__, __func__, __LINE__) #define ck_rlock(_lock) _ck_rlock(_lock, __FILE__, __func__, __LINE__) #define ck_ilock(_lock) _ck_ilock(_lock, __FILE__, __func__, __LINE__) @@ -190,6 +192,7 @@ void _mutex_init(pthread_mutex_t *lock, const char *file, const char *func, cons void mutex_destroy(pthread_mutex_t *lock); void _rwlock_init(pthread_rwlock_t *lock, const char *file, const char *func, const int line); void rwlock_destroy(pthread_rwlock_t *lock); +void _cond_init(pthread_cond_t *cond, const char *file, const char *func, const int line); void _cklock_init(cklock_t *lock, const char *file, const char *func, const int line); void cklock_destroy(cklock_t *lock); diff --git a/src/stratifier.c b/src/stratifier.c index 1664961c..d3fe377a 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -76,12 +76,36 @@ struct workbase { typedef struct workbase workbase_t; /* For protecting the hashtable data */ -cklock_t workbase_lock; +static cklock_t workbase_lock; /* For the hashtable of all workbases */ -workbase_t *workbases; +static workbase_t *workbases; static int workbase_id; +struct stratum_msg { + UT_hash_handle hh; + int id; + + char *msg; +}; + +typedef struct stratum_msg stratum_msg_t; + +/* For protecting the stratum msg data */ +static pthread_mutex_t stratum_recv_lock; +static pthread_mutex_t stratum_send_lock; + +/* For signalling the threads to wake up and do work */ +static pthread_cond_t stratum_recv_cond; +static pthread_cond_t stratum_send_cond; + +/* For the hashtable of all queued messages */ +static stratum_msg_t *stratum_recvs; +static stratum_msg_t *stratum_sends; + +static int stratum_recv_id; +static int stratum_send_id; + /* No error checking with these, make sure we know they're valid already! */ static inline void json_strcpy(char *buf, json_t *val, const char *key) { @@ -338,9 +362,23 @@ static void *blockupdate(void *arg) return NULL; } +static void *stratum_receiver(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + + return NULL; +} + +static void *stratum_sender(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + + return NULL; +} + int stratifier(proc_instance_t *pi) { - pthread_t pth_blockupdate; + pthread_t pth_blockupdate, pth_stratum_receiver, pth_stratum_sender; ckpool_t *ckp = pi->ckp; int ret = 0; @@ -348,9 +386,19 @@ int stratifier(proc_instance_t *pi) hex2bin(scriptsig_header_bin, scriptsig_header, 41); address_to_pubkeytxn(pubkeytxnbin, ckp->btcaddress); __bin2hex(pubkeytxn, pubkeytxnbin, 25); - cklock_init(&workbase_lock); + mutex_init(&stratum_recv_lock); + cond_init(&stratum_recv_cond); + create_pthread(&pth_stratum_receiver, stratum_receiver, ckp); + + mutex_init(&stratum_send_lock); + cond_init(&stratum_send_cond); + create_pthread(&pth_stratum_sender, stratum_sender, ckp); + + cklock_init(&workbase_lock); create_pthread(&pth_blockupdate, blockupdate, ckp); + strat_loop(ckp, pi); + return ret; }