diff --git a/pool/inc.php b/pool/inc.php index cf29fbef..13f71101 100644 --- a/pool/inc.php +++ b/pool/inc.php @@ -29,7 +29,7 @@ function gfl(c){c['ctx'].fill()} function gst(c){c['ctx'].stroke()} function gfi(c){gle(c);gst(c)} function gbd(c){gbe(c,0,0);gln(c,1,0);gln(c,1,1);gln(c,0,1);gle(c);gfl(c);gst(c)} -function ggr(c,xs,ys,yt,xn,x0,x1,y0,y1,ar,nx,vx,vy,av,w,cols){gtso(c,xs,ys);gss(c,'black');glw(c,1.5);gbe(c,0,1);gln(c,0,0);gln(c,1,0);gst(c);glw(c,0.2);var hi=c['ctx'].measureText('M').width, wi=c['ctx'].measureText('0').width;for(var i=0;i<11;i++){var y=i/10.0;gbe(c,-0.01,y);gln(c,1,y);gst(c);var t=''+(((y1-y0)*i/10+y0).toFixed(2));gfz(c,0,y,-wi,0,t,'black','end')}gfz(c,gx0(c),0.55,wi,0,yt,'#0080ff','left');var m=Math.round(0.5+xn/20.0);for(var i=0;i=x0;i-=hrs){var xo=(i-x0)/(x1-x0);if(c['hrs'][c['hr']]<=48){n=dfmt(c,i)}else{n=dfmtm(c,i)}if(xo<=1 && c['tkey'] && ((l%hlv)==0)){gbe(c,xo,0);gln(c,xo,-0.02);gst(c);gfz(c,xo,0,0,-hi*tpos,n,'brown','center')}if(xo<=1 && c['tlines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}l++}}glw(c,1);if(c['smooth']){for(var j=1;j0 && c['lin'+j]){gss(c,'red');var y=(av[j-1]-y0)/(y1-y0);gbe(c,0,y);gln(c,1,y);gst(c);var t=''+av[j-1].toFixed(2)+'av';gfz(c,1,y,1,0,t,cols[j-1],'left')}}if(c['tkey']){var i,dsp;i=c['hrs'][c['hr']];if(i < 24){dsp=''+i+'h'}else{dsp=''+(i/24)+'d'}gfz(c,1,0,c['xo']-c['pxe'],hi,dsp,'red','end');i=c['hln'][c['hl']];gfz(c,1,0,c['xo']-c['pxe'],hi*3,''+i,'red','end')}} +function ggr(c,xs,ys,yt,xn,x0,x1,y0,y1,ar,nx,vx,vy,av,w,cols,ex){gtso(c,xs,ys);if(ex){ex(0,c,xn,x0,x1,y0,y1,ar)}gss(c,'black');glw(c,1.5);gbe(c,0,1);gln(c,0,0);gln(c,1,0);gst(c);glw(c,0.2);var hi=c['ctx'].measureText('M').width, wi=c['ctx'].measureText('0').width;for(var i=0;i<11;i++){var y=i/10.0;gbe(c,-0.01,y);gln(c,1,y);gst(c);var t=''+(((y1-y0)*i/10+y0).toFixed(2));gfz(c,0,y,-wi,0,t,'black','end')}gfz(c,gx0(c),0.55,wi,0,yt,'#0080ff','left');var m=Math.round(0.5+xn/20.0);for(var i=0;i=x0;i-=hrs){var xo=(i-x0)/(x1-x0);if(c['hrs'][c['hr']]<=48){n=dfmt(c,i)}else{n=dfmtm(c,i)}if(xo<=1 && c['tkey'] && ((l%hlv)==0)){gbe(c,xo,0);gln(c,xo,-0.02);gst(c);gfz(c,xo,0,0,-hi*tpos,n,'brown','center')}if(xo<=1 && c['tlines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}l++}}glw(c,1);if(c['smooth']){for(var j=1;j0 && c['lin'+j]){gss(c,'red');var y=(av[j-1]-y0)/(y1-y0);gbe(c,0,y);gln(c,1,y);gst(c);var t=''+av[j-1].toFixed(2)+'av';gfz(c,1,y,1,0,t,cols[j-1],'left')}}if(c['tkey']){var i,dsp;i=c['hrs'][c['hr']];if(i < 24){dsp=''+i+'h'}else{dsp=''+(i/24)+'d'}gfz(c,1,0,c['xo']-c['pxe'],hi,dsp,'red','end');i=c['hln'][c['hl']];gfz(c,1,0,c['xo']-c['pxe'],hi*3,''+i,'red','end')}if(ex){ex(9,c,xn,x0,x1,y0,y1,ar)}} function sn(i,shi){if(shi.indexOf(' Shift ')<0){return ''+(i%10)}else{return shi.replace(/.* ([a-z])[a-z]*$/,'$1')}} function gc2(c){var div=document.getElementById('can0');while (div.firstChild){div.removeChild(div.firstChild)}c['can']=document.createElement('canvas');c['can'].id='can';c['xo']=0.0;c['yo']=0.0;c['ctx']=c['can'].getContext('2d');c['ctx'].canvas.width=c['xm']+1;c['ctx'].canvas.height=c['ym']+1;div.appendChild(c['can']);c['pxe']=Math.max(Math.round(c['xm']/250),1)} function gc(c){c['wx']=window.innerWidth;c['wy']=window.innerHeight;c['xm']=Math.max(Math.round(c['wx']*0.9+0.5),400);c['ym']=Math.max(Math.round(c['wy']*0.8+0.5),400);if(c['ym']>c['xm']){c['ym']=c['xm']}gc2(c)} diff --git a/pool/page_allwork.php b/pool/page_allwork.php index e289e8ac..bb1fb3a6 100644 --- a/pool/page_allwork.php +++ b/pool/page_allwork.php @@ -6,7 +6,7 @@ function doallwork($data, $user) { $pg = '

All Workers

'; - $pg .= "\n"; + $pg .= worktable(); $totshare = 0; $totdiff = 0; diff --git a/pool/page_luck.php b/pool/page_luck.php index 434a3e4e..19a932fd 100644 --- a/pool/page_luck.php +++ b/pool/page_luck.php @@ -13,7 +13,7 @@ for(var j=1;j500){ymax=500} ghg(c,xmax-xmin); -ggr(c,0.90,0.90,'Luck%',rows,xmin,xmax,ymin,ymax,d,'seq:','vx:','luck:',tlk,w,cols)} +ggr(c,0.90,0.90,'Luck%',rows,xmin,xmax,ymin,ymax,d,'seq:','vx:','luck:',tlk,w,cols,null)} c={}; function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}} function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}"; diff --git a/pool/page_psperf.php b/pool/page_psperf.php index 62672cc3..c9c60bfa 100644 --- a/pool/page_psperf.php +++ b/pool/page_psperf.php @@ -15,7 +15,7 @@ for(var j=1;j=0&&fi>=0){var xs,xf;xs=(st-x0)/(x1-x0);xf=(fi-x0)/(x1-x0);gfs(c,'$fs');gbe(c,xs,0);gln(c,xs,1);gln(c,xf,1);gln(c,xf,0);gle(c);gfl(c)}}} +function gdrw(c,d,cbx){gc(c);ghrs(c);gopt(c,cbx); gfs(c,'white');gss(c,'#0000c0');glw(c,2);gbd(c); var rows=d['rows'],ymin=-1,ymax=0,xmin=-1,xmax=0,tda=[]; var w=d['arp'].split(',');var cols=d['cols'].split(','); @@ -15,7 +16,7 @@ for(var j=1;j $txt) diff --git a/pool/page_workers.php b/pool/page_workers.php index 78173010..0f9c197b 100644 --- a/pool/page_workers.php +++ b/pool/page_workers.php @@ -1,5 +1,16 @@ \n"; + $pg .= "function wkdet(n,i){var t=document.getElementById(n);if(i&&t){var b,cs,j,c,a;b=i.checked;cs=t.getElementsByTagName('td');for(j=0;c=cs[j];j++) +{a=c.getAttribute('data-hid');if(a){if(b){c.className=a}else{c.className='hid'}}}}}"; + $pg .= "\n"; + $pg .= "Show Details for Invalids:
"; + $pg .= "
\n"; + return $pg; +} +# function worktitle($data, $user) { addSort(); @@ -13,8 +24,12 @@ function worktitle($data, $user) $pg .= ""; $pg .= ''; $pg .= ""; - $pg .= ''; - $pg .= ""; + $pg .= ""; + $pg .= ""; + $pg .= ""; + $pg .= ""; + $pg .= ''; + $pg .= ""; $pg .= "\n"; return $pg; } @@ -66,6 +81,14 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff, 'w_shareacc' => $ans['w_shareacc:'.$i], 'w_diffacc' => $ans['w_diffacc:'.$i], 'w_diffinv' => $ans['w_diffinv:'.$i], + 'w_diffsta' => $ans['w_diffsta:'.$i], + 'w_diffdup' => $ans['w_diffdup:'.$i], + 'w_diffhi' => $ans['w_diffhi:'.$i], + 'w_diffrej' => $ans['w_diffrej:'.$i], + 'w_sharesta' => $ans['w_sharesta:'.$i], + 'w_sharedup' => $ans['w_sharedup:'.$i], + 'w_sharehi' => $ans['w_sharehi:'.$i], + 'w_sharerej' => $ans['w_sharerej:'.$i], 'w_lastdiff' => $ans['w_lastdiff:'.$i], 'w_active_diffacc' => $ans['w_active_diffacc:'.$i], 'w_active_start' => $ans['w_active_start:'.$i], @@ -143,6 +166,15 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff, $pg .= ""; + foreach(array('sta','dup','hi','rej') as $fld) + { + $shr = number_format($all[$i]['w_share'.$fld]); + $dif = $all[$i]['w_diff'.$fld]; + $ddif = number_format($dif); + $sdif = number_format($dif,0,'',''); + $pg .= ""; + } + if ($blockacc <= 0) $blkpct = ' '; else @@ -198,6 +230,7 @@ function worktotal($offset, $totshare, $totdiff, $totshrate, $totinvalid, $blkpct = ' '; else $blkpct = number_format(100.0 * $totdiff / $blockacc, 3) . '%'; + $pg .= ""; $pg .= ""; $pg .= "\n"; return $pg; @@ -207,7 +240,7 @@ function doworker($data, $user) { $title = ''; - $pg = "
<$r id=srtshrate data-sf=r5>:Share Rate«Elapsed<$r id=srtinv data-sf=r7>:InvalidBlock %<$r id=srtrate data-sf=r9>:Hash Rate<$r id=srtstale data-sf=r8>:Stale<$r id=srtdup data-sf=r9>:Dup<$r id=srthi data-sf=r10>:Hi<$r id=srtreject data-sf=r11>:RejBlock %<$r id=srtrate data-sf=r13>:Hash Rate
$rej%$ddif/$shr $blkpct$totrate
\n"; + $pg = worktable(); $totshare = 0; $totdiff = 0; diff --git a/src/bitcoin.c b/src/bitcoin.c index 88e37817..48052684 100644 --- a/src/bitcoin.c +++ b/src/bitcoin.c @@ -54,7 +54,7 @@ bool validate_address(connsock_t *cs, const char *address) snprintf(rpc_req, 128, "{\"method\": \"validateaddress\", \"params\": [\"%s\"]}\n", address); val = json_rpc_call(cs, rpc_req); if (!val) { - LOGERR("Failed to get valid json response to validate_address"); + LOGERR("%s:%s Failed to get valid json response to validate_address", cs->url, cs->port); return ret; } res_val = json_object_get(val, "result"); @@ -194,7 +194,7 @@ bool gen_gbtbase(connsock_t *cs, gbtbase_t *gbt) val = json_rpc_call(cs, gbt_req); if (!val) { - LOGWARNING("Failed to get valid json response to getblocktemplate"); + LOGWARNING("%s:%s Failed to get valid json response to getblocktemplate", cs->url, cs->port); return ret; } res_val = json_object_get(val, "result"); @@ -299,7 +299,7 @@ int get_blockcount(connsock_t *cs) val = json_rpc_call(cs, blockcount_req); if (!val) { - LOGWARNING("Failed to get valid json response to getblockcount"); + LOGWARNING("%s:%s Failed to get valid json response to getblockcount", cs->url, cs->port); return ret; } res_val = json_object_get(val, "result"); @@ -325,7 +325,7 @@ bool get_blockhash(connsock_t *cs, int height, char *hash) sprintf(rpc_req, "{\"method\": \"getblockhash\", \"params\": [%d]}\n", height); val = json_rpc_call(cs, rpc_req); if (!val) { - LOGWARNING("Failed to get valid json response to getblockhash"); + LOGWARNING("%s:%s Failed to get valid json response to getblockhash", cs->url, cs->port); return ret; } res_val = json_object_get(val, "result"); @@ -356,7 +356,7 @@ bool get_bestblockhash(connsock_t *cs, char *hash) val = json_rpc_call(cs, bestblockhash_req); if (!val) { - LOGWARNING("Failed to get valid json response to getbestblockhash"); + LOGWARNING("%s:%s Failed to get valid json response to getbestblockhash", cs->url, cs->port); return ret; } res_val = json_object_get(val, "result"); @@ -391,7 +391,7 @@ retry: val = json_rpc_call(cs, rpc_req); dealloc(rpc_req); if (!val) { - LOGWARNING("Failed to get valid json response to submitblock"); + LOGWARNING("%s:%s Failed to get valid json response to submitblock", cs->url, cs->port); if (++retries < 5) goto retry; return ret; diff --git a/src/ckdb.c b/src/ckdb.c index 696c56f3..299c177e 100644 --- a/src/ckdb.c +++ b/src/ckdb.c @@ -483,6 +483,9 @@ K_STORE *markersummary_store; K_TREE *markersummary_pool_root; K_STORE *markersummary_pool_store; +// The markerid load start for markersummary +char *mark_start = NULL; + // WORKMARKERS K_TREE *workmarkers_root; K_TREE *workmarkers_workinfoid_root; @@ -1746,7 +1749,7 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) seqtrans->seq = seq; seqtrans->seqnum = u; memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); - k_add_head(store, st_item); + k_add_head_nolock(store, st_item); } u++; seqentry++; @@ -1772,7 +1775,7 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store) seqtrans->seq = seq; seqtrans->seqnum = u; memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); - k_add_head(store, st_item); + k_add_head_nolock(store, st_item); } u++; seqentry++; @@ -2143,7 +2146,7 @@ gotseqset: sizeof(SEQENTRY)); if (!lost) lost = k_new_store(seqtrans_free); - k_add_tail(lost, st_item); + k_add_tail_nolock(lost, st_item); seqdata->lost++; seqset->lost++; if (ENTRYISTRANS(u_entry)) { @@ -2173,7 +2176,7 @@ gotseqset: seqdata->reload_lost = k_new_store(seqtrans_free); seqdata_reload_lost = true; } - k_add_tail(seqdata->reload_lost, stl_item); + k_add_tail_nolock(seqdata->reload_lost, stl_item); } } else { // (u-size) wasn't missing @@ -2254,8 +2257,7 @@ gotseqset: } if (st_item) { // recovered a lost entry - k_unlink_item(seqtrans_free, st_item); - // N.B. lock inside lock + k_unlink_item_nolock(seqdata->reload_lost, st_item); K_WLOCK(seqtrans_free); k_add_head(seqtrans_free, st_item); K_WUNLOCK(seqtrans_free); @@ -5640,6 +5642,7 @@ static struct option long_options[] = { { "loglevel", required_argument, 0, 'l' }, // marker = enable mark/workmarker/markersummary auto generation { "marker", no_argument, 0, 'm' }, + { "markstart", required_argument, 0, 'M' }, { "name", required_argument, 0, 'n' }, { "dbpass", required_argument, 0, 'p' }, { "btc-pass", required_argument, 0, 'P' }, @@ -5684,7 +5687,7 @@ int main(int argc, char **argv) memset(&ckp, 0, sizeof(ckp)); ckp.loglevel = LOG_NOTICE; - while ((c = getopt_long(argc, argv, "c:d:ghkl:mn:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { + while ((c = getopt_long(argc, argv, "c:d:ghkl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { switch(c) { case 'c': ckp.config = strdup(optarg); @@ -5743,6 +5746,9 @@ int main(int argc, char **argv) case 'm': markersummary_auto = true; break; + case 'M': + mark_start = strdup(optarg); + break; case 'n': ckp.name = strdup(optarg); break; diff --git a/src/ckdb.h b/src/ckdb.h index 2e2f1601..16e33eb7 100644 --- a/src/ckdb.h +++ b/src/ckdb.h @@ -55,7 +55,7 @@ #define DB_VLOCK "1" #define DB_VERSION "1.0.4" -#define CKDB_VERSION DB_VERSION"-1.610" +#define CKDB_VERSION DB_VERSION"-1.620" #define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL_HERE __FILE__, __func__, __LINE__ @@ -1958,6 +1958,9 @@ extern K_STORE *markersummary_store; extern K_TREE *markersummary_pool_root; extern K_STORE *markersummary_pool_store; +// The markerid load start for markersummary +extern char *mark_start; + // WORKMARKERS typedef struct workmarkers { int64_t markerid; diff --git a/src/ckdb_cmd.c b/src/ckdb_cmd.c index bab3f32a..c9d9f660 100644 --- a/src/ckdb_cmd.c +++ b/src/ckdb_cmd.c @@ -3636,9 +3636,9 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id, } } if (!ua_item) { - K_RLOCK(useratts_free); + K_WLOCK(useratts_free); ua_item = k_unlink_head(useratts_free); - K_RUNLOCK(useratts_free); + K_WUNLOCK(useratts_free); DATA_USERATTS(useratts, ua_item); bzero(useratts, sizeof(*useratts)); useratts->userid = users->userid; @@ -5796,7 +5796,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, char description[TXT_BIG+1] = { '\0' }; char extra[TXT_BIG+1] = { '\0' }; char status[TXT_FLAG+1] = { MARK_READY, '\0' }; - bool ok; + bool ok = false, pps; LOGDEBUG("%s(): cmd '%s'", __func__, cmd); @@ -6237,6 +6237,39 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id, old ? "On" : "Off", markersummary_auto ? "On" : "Off"); ok = true; + } else if (strcasecmp(action, "pps") == 0) { + /* Recalculate a shift's rewards/rewarded + * Require markerid */ + i_markerid = require_name(trf_root, "markerid", 1, + (char *)intpatt, reply, siz); + if (!i_markerid) + return strdup(reply); + TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid); + + K_RLOCK(workmarkers_free); + wm_item = find_workmarkerid(markerid, false, MARKER_PROCESSED); + K_RUNLOCK(workmarkers_free); + if (!wm_item) { + snprintf(reply, siz, "no markerid %"PRId64, markerid); + return strdup(reply); + } + DATA_WORKMARKERS(workmarkers, wm_item); + pps = shift_rewards(wm_item); + if (pps) { + snprintf(msg, sizeof(msg), + "shift '%s' markerid %"PRId64" rewards %d " + "rewarded %.3e pps %.3e", + workmarkers->description, + workmarkers->markerid, workmarkers->rewards, + workmarkers->rewarded, workmarkers->pps_value); + } else { + snprintf(msg, sizeof(msg), + "shift '%s' markerid %"PRId64" no rewards yet" + " pps %.3e", + workmarkers->description, + workmarkers->markerid, workmarkers->pps_value); + } + ok = true; } else { snprintf(reply, siz, "unknown action '%s'", action); LOGERR("%s.%s", id, reply); diff --git a/src/ckdb_dbio.c b/src/ckdb_dbio.c index 2aa6d8cf..5c481963 100644 --- a/src/ckdb_dbio.c +++ b/src/ckdb_dbio.c @@ -6766,9 +6766,10 @@ bool markersummary_fill(PGconn *conn) K_ITEM *item, *p_item; int n, t, i, p_n; MARKERSUMMARY *row, *p_row; + char *params[1]; char *field; char *sel; - int fields = 20; + int fields = 20, par = 0; bool ok = false; LOGDEBUG("%s(): select", __func__); @@ -6783,7 +6784,16 @@ bool markersummary_fill(PGconn *conn) "sharecount,errorcount,firstshare,lastshare,firstshareacc," "lastshareacc,lastdiffacc" MODIFYDATECONTROL - " from markersummary"; + " from markersummary where markerid>=$1"; + par = 0; + if (mark_start) + params[par++] = mark_start; + else + params[par++] = "0"; + PARCHK(par, params); + + LOGWARNING("%s(): loading from markerid>=%s", __func__, params[0]); + res = PQexec(conn, "Begin", CKPQ_READ); rescode = PQresultStatus(res); PQclear(res); @@ -6792,7 +6802,7 @@ bool markersummary_fill(PGconn *conn) return false; } - res = PQexec(conn, sel, CKPQ_READ); + res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ); rescode = PQresultStatus(res); PQclear(res); if (!PGOK(rescode)) { diff --git a/src/ckpool.c b/src/ckpool.c index a76db57b..01adfc38 100644 --- a/src/ckpool.c +++ b/src/ckpool.c @@ -515,13 +515,14 @@ void empty_buffer(connsock_t *cs) * of the buffer for use on the next receive. */ int read_socket_line(connsock_t *cs, float *timeout) { - int fd = cs->fd, ret = -1; char *eom = NULL; tv_t start, now; size_t buflen; + int ret = -1; + bool polled; float diff; - if (unlikely(fd < 0)) + if (unlikely(cs->fd < 0)) goto out; if (unlikely(!cs->buf)) @@ -542,7 +543,8 @@ rewait: ret = 0; goto out; } - ret = wait_read_select(fd, eom ? 0 : *timeout); + ret = wait_recv_select(cs->fd, eom ? 0 : *timeout); + polled = true; if (ret < 1) { if (!ret) { if (eom) @@ -558,19 +560,23 @@ rewait: } tv_time(&now); diff = tvdiff(&now, &start); + copy_tv(&start, &now); *timeout -= diff; while (42) { char readbuf[PAGESIZE] = {}; int backoff = 1; char *newbuf; - ret = recv(fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); + ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); if (ret < 1) { /* No more to read or closed socket after valid message */ if (eom) break; - /* Have we used up all the timeout yet? */ - if (*timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) + /* Have we used up all the timeout yet? If polled is + * set that means poll has said there should be + * something to read and if we get nothing it means the + * socket is closed. */ + if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) goto rewait; if (cs->ckp->proxy) LOGINFO("Failed to recv in read_socket_line"); @@ -578,6 +584,7 @@ rewait: LOGERR("Failed to recv in read_socket_line"); goto out; } + polled = false; buflen = cs->bufofs + ret + 1; while (42) { newbuf = realloc(cs->buf, buflen); @@ -766,6 +773,8 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req) double elapsed; int len, ret; + /* Serialise all calls in case we use cs from multiple threads */ + cksem_wait(&cs->sem); if (unlikely(cs->fd < 0)) { LOGWARNING("FD %d invalid in %s", cs->fd, __func__); goto out; @@ -853,13 +862,20 @@ out_empty: if (!val) { /* Assume that a failed request means the socket will be closed * and reopen it */ - LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port); Close(cs->fd); - cs->fd = connect_socket(cs->url, cs->port); } out: + if (cs->fd < 0) { + /* Attempt to reopen a socket that has been closed due to a + * failed request or if the socket was closed while trying to + * read/write to it. */ + cs->fd = connect_socket(cs->url, cs->port); + LOGWARNING("Attempt to reopen socket to %s:%s %ssuccessful", + cs->url, cs->port, cs->fd > 0 ? "" : "un"); + } free(http_req); dealloc(cs->buf); + cksem_post(&cs->sem); return val; } diff --git a/src/ckpool.h b/src/ckpool.h index 2593c969..f0e48c26 100644 --- a/src/ckpool.h +++ b/src/ckpool.h @@ -80,6 +80,8 @@ struct connsock { int bufofs; int buflen; ckpool_t *ckp; + /* Semaphore used to serialise request/responses */ + sem_t sem; }; typedef struct connsock connsock_t; @@ -110,6 +112,7 @@ struct server_instance { char *auth; char *pass; bool notify; + bool alive; connsock_t cs; void *data; // Private data diff --git a/src/generator.c b/src/generator.c index cddd0fe9..560ac7ac 100644 --- a/src/generator.c +++ b/src/generator.c @@ -146,7 +146,7 @@ struct generator_data { int subproxies_generated; int proxy_notify_id; // Globally increasing notify id - ckmsgq_t *srvchk; // Server check message queue + server_instance_t *si; /* Current server instance */ pthread_t pth_uprecv; // User proxy receive thread pthread_t pth_psend; // Combined proxy send thread @@ -177,6 +177,7 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging) /* Has this server already been reconnected? */ if (cs->fd > 0) return true; + si->alive = false; if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { LOGWARNING("Failed to extract address from %s", si->url); return ret; @@ -218,8 +219,11 @@ out: if (!ret) { /* Close and invalidate the file handle */ Close(cs->fd); - } else + } else { + si->alive = true; + LOGNOTICE("Server alive: %s:%s", cs->url, cs->port); keep_sockalive(cs->fd); + } return ret; } @@ -235,19 +239,31 @@ retry: if (!ping_main(ckp)) goto out; + /* First find a server that is already flagged alive if possible + * without blocking on server_alive() */ for (i = 0; i < ckp->btcds; i++) { server_instance_t *si = ckp->servers[i]; + cs = &si->cs; - if (server_alive(ckp, si, false)) { + if (si->alive && cs->fd > 0) { alive = si; - break; + goto living; } } - if (!alive) { - LOGWARNING("CRITICAL: No bitcoinds active!"); - sleep(5); - goto retry; + + /* No servers flagged alive, try to connect to them blocking */ + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + if (server_alive(ckp, si, false)) { + alive = si; + goto living; + } } + LOGWARNING("CRITICAL: No bitcoinds active!"); + sleep(5); + goto retry; +living: cs = &alive->cs; LOGINFO("Connected to live server %s:%s", cs->url, cs->port); out: @@ -284,11 +300,9 @@ static void clear_unix_msg(unix_msg_t **umsg) static int gen_loop(proc_instance_t *pi) { - server_instance_t *si = NULL; - bool reconnecting = false; + server_instance_t *si = NULL, *old_si; unix_msg_t *umsg = NULL; ckpool_t *ckp = pi->ckp; - gdata_t *gdata = ckp->data; bool started = false; char *buf = NULL; connsock_t *cs; @@ -298,24 +312,24 @@ static int gen_loop(proc_instance_t *pi) reconnect: clear_unix_msg(&umsg); - if (si) { - kill_server(si); - reconnecting = true; - } + old_si = si; si = live_server(ckp); if (!si) goto out; + if (unlikely(!started)) { + started = true; + LOGWARNING("%s generator ready", ckp->name); + } gbt = si->data; cs = &si->cs; - if (reconnecting) { + if (!old_si) + LOGWARNING("Connected to bitcoind: %s:%s", cs->url, cs->port); + else if (si != old_si) LOGWARNING("Failed over to bitcoind: %s:%s", cs->url, cs->port); - reconnecting = false; - } retry: clear_unix_msg(&umsg); - ckmsgq_add(gdata->srvchk, si); do { umsg = get_unix_msg(pi); @@ -327,7 +341,7 @@ retry: } while (!umsg); if (unlikely(cs->fd < 0)) { - LOGWARNING("Bitcoind socket invalidated, will attempt failover"); + LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port); goto reconnect; } @@ -358,10 +372,6 @@ retry: cs->url, cs->port); send_unix_msg(umsg->sockd, "failed"); } else { - if (unlikely(!started)) { - started = true; - LOGWARNING("%s generator ready", ckp->name); - } send_unix_msg(umsg->sockd, hash); } } else if (cmdmatch(buf, "getlast")) { @@ -378,11 +388,6 @@ retry: send_unix_msg(umsg->sockd, "failed"); goto reconnect; } else { - if (unlikely(!started)) { - started = true; - LOGWARNING("%s generator ready", ckp->name); - } - send_unix_msg(umsg->sockd, hash); LOGDEBUG("Hash: %s", hash); } @@ -2724,8 +2729,38 @@ out: return ret; } +/* Check which servers are alive, maintaining a connection with them and + * reconnect if a higher priority one is available. */ +static void *server_watchdog(void *arg) +{ + ckpool_t *ckp = (ckpool_t *)arg; + gdata_t *gdata = ckp->data; + + while (42) { + server_instance_t *best = NULL; + ts_t timer_t; + int i; + + cksleep_prepare_r(&timer_t); + for (i = 0; i < ckp->btcds; i++) { + server_instance_t *si = ckp->servers[i]; + + /* Have we reached the current server? */ + if (server_alive(ckp, si, true) && !best) + best = si; + } + if (best && best != gdata->si) { + gdata->si = best; + send_proc(ckp->generator, "reconnect"); + } + cksleep_ms_r(&timer_t, 5000); + } + return NULL; +} + static int server_mode(ckpool_t *ckp, proc_instance_t *pi) { + pthread_t pth_watchdog; server_instance_t *si; int i, ret; @@ -2737,8 +2772,12 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi) si->auth = ckp->btcdauth[i]; si->pass = ckp->btcdpass[i]; si->notify = ckp->btcdnotify[i]; + si->id = i; + cksem_init(&si->cs.sem); + cksem_post(&si->cs.sem); } + create_pthread(&pth_watchdog, server_watchdog, ckp); ret = gen_loop(pi); for (i = 0; i < ckp->btcds; i++) { @@ -2798,41 +2837,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi) return ret; } -/* Tell the watchdog what the current server instance is and decide if we - * should check to see if the higher priority servers are alive and fallback */ -static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi) -{ - static time_t last_t = 0; - bool alive = false; - time_t now_t; - int i; - - /* Rate limit to checking only once every 5 seconds */ - now_t = time(NULL); - if (now_t <= last_t + 5) - return; - - last_t = now_t; - - /* Is this the highest priority server already? */ - if (!cursi->id) - return; - - for (i = 0; i < ckp->btcds; i++) { - server_instance_t *si = ckp->servers[i]; - - /* Have we reached the current server? */ - if (si == cursi) - return; - - alive = server_alive(ckp, si, true); - if (alive) - break; - } - if (alive) - reconnect_generator(ckp); -} - int generator(proc_instance_t *pi) { ckpool_t *ckp = pi->ckp; @@ -2859,10 +2863,8 @@ int generator(proc_instance_t *pi) } while (!buf); dealloc(buf); ret = proxy_mode(ckp, pi); - } else { - gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog); + } else ret = server_mode(ckp, pi); - } out: dealloc(ckp->data); return process_exit(ckp, pi, ret); diff --git a/src/klist.h b/src/klist.h index 9e5888a7..d872f034 100644 --- a/src/klist.h +++ b/src/klist.h @@ -586,7 +586,7 @@ extern void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, LOCK_MAYB //#define k_insert_after_nolock(_list, _item, _after) _k_insert_after(_list, _item, _after, false, KLIST_FFL_HERE) extern void _k_unlink_item(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS); #define k_unlink_item(_list, _item) _k_unlink_item(_list, _item, true, KLIST_FFL_HERE) -//#define k_unlink_item_nolock(_list, _item) _k_unlink_item(_list, _item, false, KLIST_FFL_HERE) +#define k_unlink_item_nolock(_list, _item) _k_unlink_item(_list, _item, false, KLIST_FFL_HERE) extern void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS); #define k_list_transfer_to_head(_from, _to) _k_list_transfer_to_head(_from, _to, true, KLIST_FFL_HERE) //#define k_list_transfer_to_head_nolock(_from, _to) _k_list_transfer_to_head(_from, _to, false, KLIST_FFL_HERE) diff --git a/src/libckpool.c b/src/libckpool.c index a3fb3bcb..4d07fc02 100644 --- a/src/libckpool.c +++ b/src/libckpool.c @@ -620,13 +620,17 @@ void block_socket(int fd) void _close(int *fd, const char *file, const char *func, const int line) { + int sockd; + if (*fd < 0) return; - LOGDEBUG("Closing file handle %d", *fd); - if (unlikely(close(*fd))) - LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d", - *fd, errno, strerror(errno), file, func, line); + sockd = *fd; + LOGDEBUG("Closing file handle %d", sockd); *fd = -1; + if (unlikely(close(sockd))) { + LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d", + sockd, errno, strerror(errno), file, func, line); + } } int bind_socket(char *url, char *port) @@ -754,17 +758,15 @@ out: void empty_socket(int fd) { + char buf[PAGESIZE]; int ret; if (fd < 1) return; do { - char buf[PAGESIZE]; - - ret = wait_read_select(fd, 0); + ret = recv(fd, buf, PAGESIZE - 1, MSG_DONTWAIT); if (ret > 0) { - ret = recv(fd, buf, PAGESIZE - 1, 0); buf[ret] = 0; LOGDEBUG("Discarding: %s", buf); } @@ -903,21 +905,45 @@ int wait_close(int sockd, int timeout) return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); } -/* Emulate a select read wait for high fds that select doesn't support */ -int wait_read_select(int sockd, float timeout) +/* Emulate a select read wait for high fds that select doesn't support. + * wait_read_select is for unix sockets and _wait_recv_select for regular + * sockets. */ +int _wait_read_select(int *sockd, float timeout) { struct pollfd sfd; int ret = -1; - if (unlikely(sockd < 0)) + if (unlikely(*sockd < 0)) goto out; - sfd.fd = sockd; + sfd.fd = *sockd; sfd.events = POLLIN | POLLRDHUP; - sfd.revents = 0; timeout *= 1000; ret = poll(&sfd, 1, timeout); - if (ret && !(sfd.revents & POLLIN)) + if (ret > 0 && sfd.revents & (POLLERR)) { ret = -1; + _Close(sockd); + } +out: + return ret; +} + +int _wait_recv_select(int *sockd, float timeout) +{ + struct pollfd sfd; + int ret = -1; + + if (unlikely(*sockd < 0)) + goto out; + sfd.fd = *sockd; + sfd.events = POLLIN | POLLRDHUP; + timeout *= 1000; + ret = poll(&sfd, 1, timeout); + /* If POLLRDHUP occurs, we may still have data to read so let recv() + * after this determine if the socket can still be used. */ + if (ret > 0 && sfd.revents & (POLLHUP | POLLERR)) { + ret = -1; + _Close(sockd); + } out: return ret; } @@ -1978,6 +2004,8 @@ double diff_from_nbits(char *nbits) pow = nbits[0]; powdiff = (8 * (0x1d - 3)) - (8 * (pow - 3)); + if (powdiff < 8) + powdiff = 8; diff32 = be32toh(*((uint32_t *)nbits)) & 0x00FFFFFF; if (likely(powdiff > 0)) numerator = 0xFFFFULL << powdiff; diff --git a/src/libckpool.h b/src/libckpool.h index 9d8ba341..222ce26f 100644 --- a/src/libckpool.h +++ b/src/libckpool.h @@ -496,7 +496,10 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) int wait_close(int sockd, int timeout); -int wait_read_select(int sockd, float timeout); +int _wait_read_select(int *sockd, float timeout); +#define wait_read_select(SOCKD, TIMEOUT) _wait_read_select(&(SOCKD), TIMEOUT) +int _wait_recv_select(int *sockd, float timeout); +#define wait_recv_select(SOCKD, TIMEOUT) _wait_recv_select(&(SOCKD), TIMEOUT) int read_length(int sockd, void *buf, int len); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); #define RECV_UNIX_TIMEOUT1 30 diff --git a/src/stratifier.c b/src/stratifier.c index 0a9e19b2..163f39f9 100644 --- a/src/stratifier.c +++ b/src/stratifier.c @@ -970,10 +970,10 @@ static void broadcast_ping(sdata_t *sdata); static void *do_update(void *arg) { struct update_req *ur = (struct update_req *)arg; + int prio = ur->prio, retries = 0; ckpool_t *ckp = ur->ckp; sdata_t *sdata = ckp->data; bool new_block = false; - int prio = ur->prio; bool ret = false; workbase_t *wb; time_t now_t; @@ -983,15 +983,20 @@ static void *do_update(void *arg) pthread_detach(pthread_self()); rename_proc("updater"); +retry: buf = send_recv_generator(ckp, "getbase", prio); if (unlikely(!buf)) { LOGNOTICE("Get base in update_base delayed due to higher priority request"); goto out; } if (unlikely(cmdmatch(buf, "failed"))) { - LOGWARNING("Generator returned failure in update_base"); - goto out; + if (retries++ < 5 || prio == GEN_PRIORITY) { + LOGWARNING("Generator returned failure in update_base, retry #%d", retries); + goto retry; + } } + if (unlikely(retries)) + LOGWARNING("Generator succeeded in update_base after retrying"); wb = ckzalloc(sizeof(workbase_t)); wb->ckp = ckp; @@ -4968,7 +4973,7 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j int64_t sdiff; if (unlikely(!client_active(client))) { - LOGWARNING("Attempted to suggest diff on unauthorised client %"PRId64, client->id); + LOGNOTICE("Attempted to suggest diff on unauthorised client %"PRId64, client->id); return; } if (arr_val && json_is_integer(arr_val)) @@ -4989,6 +4994,13 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j stratum_send_diff(sdata, client); } +/* Send diff first when sending the first stratum template after subscribing */ +static void init_client(sdata_t *sdata, const stratum_instance_t *client, const int64_t client_id) +{ + stratum_send_diff(sdata, client); + stratum_send_update(sdata, client_id, true); +} + /* Enter with client holding ref count */ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, const int64_t client_id, json_t *id_val, json_t *method_val, @@ -5027,7 +5039,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie json_object_set_new_nocheck(val, "error", json_null()); stratum_add_send(sdata, val, client_id); if (likely(client->subscribed)) - update_client(client, client_id); + init_client(sdata, client, client_id); return; }