Browse Source

Merge branch 'master' into multiproxy

Conflicts:
	src/ckpool.h
	src/generator.c
	src/stratifier.c
master
Con Kolivas 9 years ago
parent
commit
7280596f7e
  1. 2
      pool/inc.php
  2. 2
      pool/page_allwork.php
  3. 2
      pool/page_luck.php
  4. 2
      pool/page_psperf.php
  5. 9
      pool/page_usperf.php
  6. 39
      pool/page_workers.php
  7. 12
      src/bitcoin.c
  8. 20
      src/ckdb.c
  9. 5
      src/ckdb.h
  10. 39
      src/ckdb_cmd.c
  11. 16
      src/ckdb_dbio.c
  12. 32
      src/ckpool.c
  13. 3
      src/ckpool.h
  14. 130
      src/generator.c
  15. 2
      src/klist.h
  16. 56
      src/libckpool.c
  17. 5
      src/libckpool.h
  18. 22
      src/stratifier.c

2
pool/inc.php

@ -29,7 +29,7 @@ function gfl(c){c['ctx'].fill()}
function gst(c){c['ctx'].stroke()} function gst(c){c['ctx'].stroke()}
function gfi(c){gle(c);gst(c)} function gfi(c){gle(c);gst(c)}
function gbd(c){gbe(c,0,0);gln(c,1,0);gln(c,1,1);gln(c,0,1);gle(c);gfl(c);gst(c)} function gbd(c){gbe(c,0,0);gln(c,1,0);gln(c,1,1);gln(c,0,1);gle(c);gfl(c);gst(c)}
function ggr(c,xs,ys,yt,xn,x0,x1,y0,y1,ar,nx,vx,vy,av,w,cols){gtso(c,xs,ys);gss(c,'black');glw(c,1.5);gbe(c,0,1);gln(c,0,0);gln(c,1,0);gst(c);glw(c,0.2);var hi=c['ctx'].measureText('M').width, wi=c['ctx'].measureText('0').width;for(var i=0;i<11;i++){var y=i/10.0;gbe(c,-0.01,y);gln(c,1,y);gst(c);var t=''+(((y1-y0)*i/10+y0).toFixed(2));gfz(c,0,y,-wi,0,t,'black','end')}gfz(c,gx0(c),0.55,wi,0,yt,'#0080ff','left');var m=Math.round(0.5+xn/20.0);for(var i=0;i<xn;i++){var n=ar[nx+i];var x=ar[vx+i];var xo=(x-x0)/(x1-x0);if(c['skey'] && (i<(xn-1)) && (i%m) == 0){gbe(c,xo,0);gln(c,xo,-0.01);gst(c);gfz(c,xo,0,0,-hi*1.5,n,'#00a050','center')}if(c['slines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}}var xhr=3600+x1-(x1%3600);gss(c,'brown');if(c['tkey'] || c['tlines']){var hlv=c['hln'][c['hl']];hrs=c['hrs'][c['hr']]*3600/hlv;var n,l=0;tpos=2.7;if(c['over']){tpos=1.5}for(var i=xhr;i>=x0;i-=hrs){var xo=(i-x0)/(x1-x0);if(c['hrs'][c['hr']]<=48){n=dfmt(c,i)}else{n=dfmtm(c,i)}if(xo<=1 && c['tkey'] && ((l%hlv)==0)){gbe(c,xo,0);gln(c,xo,-0.02);gst(c);gfz(c,xo,0,0,-hi*tpos,n,'brown','center')}if(xo<=1 && c['tlines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}l++}}glw(c,1);if(c['smooth']){for(var j=1;j<w.length;j++){if(c['lin'+j]){var f=1;gss(c,cols[j-1]);var xa=0,ya=0,xb=0,yb=0;for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[w[j]+vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0;xb=xo;yb=yo}else{gct(c,(xa+xb)/2,(ya+yb)/2,xb,yb,(xb+xo)/2,(yb+yo)/2)}xa=xb;ya=yb;xb=xo;yb=yo}gct(c,(xa+xb)/2,(ya+yb)/2,xo,yo,xo,yo);gst(c)}}}else{for(var j=1;j<w.length;j++){if(c['lin'+j]){var f=1;gss(c,cols[j-1]);for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[w[j]+vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0}else{gln(c,xo,yo)}}gst(c)}}}glw(c,1);for(var j=1;j<w.length;j++){if(av[j-1]>0 && c['lin'+j]){gss(c,'red');var y=(av[j-1]-y0)/(y1-y0);gbe(c,0,y);gln(c,1,y);gst(c);var t=''+av[j-1].toFixed(2)+'av';gfz(c,1,y,1,0,t,cols[j-1],'left')}}if(c['tkey']){var i,dsp;i=c['hrs'][c['hr']];if(i < 24){dsp=''+i+'h'}else{dsp=''+(i/24)+'d'}gfz(c,1,0,c['xo']-c['pxe'],hi,dsp,'red','end');i=c['hln'][c['hl']];gfz(c,1,0,c['xo']-c['pxe'],hi*3,''+i,'red','end')}} function ggr(c,xs,ys,yt,xn,x0,x1,y0,y1,ar,nx,vx,vy,av,w,cols,ex){gtso(c,xs,ys);if(ex){ex(0,c,xn,x0,x1,y0,y1,ar)}gss(c,'black');glw(c,1.5);gbe(c,0,1);gln(c,0,0);gln(c,1,0);gst(c);glw(c,0.2);var hi=c['ctx'].measureText('M').width, wi=c['ctx'].measureText('0').width;for(var i=0;i<11;i++){var y=i/10.0;gbe(c,-0.01,y);gln(c,1,y);gst(c);var t=''+(((y1-y0)*i/10+y0).toFixed(2));gfz(c,0,y,-wi,0,t,'black','end')}gfz(c,gx0(c),0.55,wi,0,yt,'#0080ff','left');var m=Math.round(0.5+xn/20.0);for(var i=0;i<xn;i++){var n=ar[nx+i];var x=ar[vx+i];var xo=(x-x0)/(x1-x0);if(c['skey'] && (i<(xn-1)) && (i%m) == 0){gbe(c,xo,0);gln(c,xo,-0.01);gst(c);gfz(c,xo,0,0,-hi*1.5,n,'#00a050','center')}if(c['slines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}}var xhr=3600+x1-(x1%3600);gss(c,'brown');if(c['tkey'] || c['tlines']){var hlv=c['hln'][c['hl']];hrs=c['hrs'][c['hr']]*3600/hlv;var n,l=0;tpos=2.7;if(c['over']){tpos=1.5}for(var i=xhr;i>=x0;i-=hrs){var xo=(i-x0)/(x1-x0);if(c['hrs'][c['hr']]<=48){n=dfmt(c,i)}else{n=dfmtm(c,i)}if(xo<=1 && c['tkey'] && ((l%hlv)==0)){gbe(c,xo,0);gln(c,xo,-0.02);gst(c);gfz(c,xo,0,0,-hi*tpos,n,'brown','center')}if(xo<=1 && c['tlines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}l++}}glw(c,1);if(c['smooth']){for(var j=1;j<w.length;j++){if(c['lin'+j]){var f=1;gss(c,cols[j-1]);var xa=0,ya=0,xb=0,yb=0;for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[w[j]+vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0;xb=xo;yb=yo}else{gct(c,(xa+xb)/2,(ya+yb)/2,xb,yb,(xb+xo)/2,(yb+yo)/2)}xa=xb;ya=yb;xb=xo;yb=yo}gct(c,(xa+xb)/2,(ya+yb)/2,xo,yo,xo,yo);gst(c)}}}else{for(var j=1;j<w.length;j++){if(c['lin'+j]){var f=1;gss(c,cols[j-1]);for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[w[j]+vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0}else{gln(c,xo,yo)}}gst(c)}}}glw(c,1);for(var j=1;j<w.length;j++){if(av[j-1]>0 && c['lin'+j]){gss(c,'red');var y=(av[j-1]-y0)/(y1-y0);gbe(c,0,y);gln(c,1,y);gst(c);var t=''+av[j-1].toFixed(2)+'av';gfz(c,1,y,1,0,t,cols[j-1],'left')}}if(c['tkey']){var i,dsp;i=c['hrs'][c['hr']];if(i < 24){dsp=''+i+'h'}else{dsp=''+(i/24)+'d'}gfz(c,1,0,c['xo']-c['pxe'],hi,dsp,'red','end');i=c['hln'][c['hl']];gfz(c,1,0,c['xo']-c['pxe'],hi*3,''+i,'red','end')}if(ex){ex(9,c,xn,x0,x1,y0,y1,ar)}}
function sn(i,shi){if(shi.indexOf(' Shift ')<0){return ''+(i%10)}else{return shi.replace(/.* ([a-z])[a-z]*$/,'$1')}} function sn(i,shi){if(shi.indexOf(' Shift ')<0){return ''+(i%10)}else{return shi.replace(/.* ([a-z])[a-z]*$/,'$1')}}
function gc2(c){var div=document.getElementById('can0');while (div.firstChild){div.removeChild(div.firstChild)}c['can']=document.createElement('canvas');c['can'].id='can';c['xo']=0.0;c['yo']=0.0;c['ctx']=c['can'].getContext('2d');c['ctx'].canvas.width=c['xm']+1;c['ctx'].canvas.height=c['ym']+1;div.appendChild(c['can']);c['pxe']=Math.max(Math.round(c['xm']/250),1)} function gc2(c){var div=document.getElementById('can0');while (div.firstChild){div.removeChild(div.firstChild)}c['can']=document.createElement('canvas');c['can'].id='can';c['xo']=0.0;c['yo']=0.0;c['ctx']=c['can'].getContext('2d');c['ctx'].canvas.width=c['xm']+1;c['ctx'].canvas.height=c['ym']+1;div.appendChild(c['can']);c['pxe']=Math.max(Math.round(c['xm']/250),1)}
function gc(c){c['wx']=window.innerWidth;c['wy']=window.innerHeight;c['xm']=Math.max(Math.round(c['wx']*0.9+0.5),400);c['ym']=Math.max(Math.round(c['wy']*0.8+0.5),400);if(c['ym']>c['xm']){c['ym']=c['xm']}gc2(c)} function gc(c){c['wx']=window.innerWidth;c['wy']=window.innerHeight;c['xm']=Math.max(Math.round(c['wx']*0.9+0.5),400);c['ym']=Math.max(Math.round(c['wy']*0.8+0.5),400);if(c['ym']>c['xm']){c['ym']=c['xm']}gc2(c)}

2
pool/page_allwork.php

@ -6,7 +6,7 @@ function doallwork($data, $user)
{ {
$pg = '<h1>All Workers</h1>'; $pg = '<h1>All Workers</h1>';
$pg .= "<table callpadding=0 cellspacing=0 border=0>\n"; $pg .= worktable();
$totshare = 0; $totshare = 0;
$totdiff = 0; $totdiff = 0;

2
pool/page_luck.php

@ -13,7 +13,7 @@ for(var j=1;j<w.length;j++){var pre=w[j];var lk=0,nam=pre+'luck:'+i;if(d[nam]){l
} }
if(ymax>500){ymax=500} if(ymax>500){ymax=500}
ghg(c,xmax-xmin); ghg(c,xmax-xmin);
ggr(c,0.90,0.90,'Luck%',rows,xmin,xmax,ymin,ymax,d,'seq:','vx:','luck:',tlk,w,cols)} ggr(c,0.90,0.90,'Luck%',rows,xmin,xmax,ymin,ymax,d,'seq:','vx:','luck:',tlk,w,cols,null)}
c={}; c={};
function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}} function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}}
function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}"; function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}";

2
pool/page_psperf.php

@ -15,7 +15,7 @@ for(var j=1;j<w.length;j++){tda[j-1]*=(Math.pow(2,32)/Math.pow(10,12)/(xmax-xmin
var p5=(ymax-ymin)*0.05;ymax+=p5;ymin-=p5;if(ymin<0){ymin=0} var p5=(ymax-ymin)*0.05;ymax+=p5;ymin-=p5;if(ymin<0){ymin=0}
if(c['zerob']){ymin=0} if(c['zerob']){ymin=0}
ghg(c,xmax-xmin); ghg(c,xmax-xmin);
ggr(c,0.9,0.9,'TH/s',rows,xmin,xmax,ymin,ymax,d,'nx:','vx:','ths:',tda,w,cols)} ggr(c,0.9,0.9,'TH/s',rows,xmin,xmax,ymin,ymax,d,'nx:','vx:','ths:',tda,w,cols,null)}
c={}; c={};
function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}} function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}}
function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}"; function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}";

9
pool/page_usperf.php

@ -1,8 +1,9 @@
<?php <?php
# #
function uspg($nc) function uspg($nc,$fs)
{ {
$g = "function gdrw(c,d,cbx){gc(c);ghrs(c);gopt(c,cbx); $g = "function exf(n,c,xn,x0,x1,y0,y1,ar){if(n==0){var i,st=-1,fi=-1;for(i=0;i<xn;i++){if(st==-1&&ar['lastpayoutstart:'+i]!=''){st=ar['start:'+i]}if(fi==-1&&ar['endmarkextra:'+i].indexOf('Block')==0){fi=ar['end:'+i]}}if(st>=0&&fi>=0){var xs,xf;xs=(st-x0)/(x1-x0);xf=(fi-x0)/(x1-x0);gfs(c,'$fs');gbe(c,xs,0);gln(c,xs,1);gln(c,xf,1);gln(c,xf,0);gle(c);gfl(c)}}}
function gdrw(c,d,cbx){gc(c);ghrs(c);gopt(c,cbx);
gfs(c,'white');gss(c,'#0000c0');glw(c,2);gbd(c); gfs(c,'white');gss(c,'#0000c0');glw(c,2);gbd(c);
var rows=d['rows'],ymin=-1,ymax=0,xmin=-1,xmax=0,tda=[]; var rows=d['rows'],ymin=-1,ymax=0,xmin=-1,xmax=0,tda=[];
var w=d['arp'].split(',');var cols=d['cols'].split(','); var w=d['arp'].split(',');var cols=d['cols'].split(',');
@ -15,7 +16,7 @@ for(var j=1;j<w.length;j++){tda[j-1]*=(Math.pow(2,32)/Math.pow(10,12)/(xmax-xmin
var p5=(ymax-ymin)*0.05;ymax+=p5;ymin-=p5;if(ymin<0){ymin=0} var p5=(ymax-ymin)*0.05;ymax+=p5;ymin-=p5;if(ymin<0){ymin=0}
if(c['zerob']){ymin=0} if(c['zerob']){ymin=0}
ghg(c,xmax-xmin); ghg(c,xmax-xmin);
ggr(c,0.9,0.9,'TH/s',rows,xmin,xmax,ymin,ymax,d,'nx:','vx:','ths:',tda,w,cols)} ggr(c,0.9,0.9,'TH/s',rows,xmin,xmax,ymin,ymax,d,'nx:','vx:','ths:',tda,w,cols,exf)}
c={}; c={};
function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}} function dodrw(data,cbx){if(hasCan()){gdrw(c,sep(data),cbx)}}
function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)} function gact(t){if(t.checked){scnv(t.id,1)}else{scnv(t.id,0)}godrw(0)}
@ -109,7 +110,7 @@ function dousperf($data, $user)
$data = str_replace(array("\\","'"), array("\\\\","\\'"), $ans['DATA']); $data = str_replace(array("\\","'"), array("\\\\","\\'"), $ans['DATA']);
$data .= $fld_sep . 'cols' . $val_sep . $datacols; $data .= $fld_sep . 'cols' . $val_sep . $datacols;
$pg .= "<script type='text/javascript'>\n"; $pg .= "<script type='text/javascript'>\n";
$pg .= uspg($nc); $pg .= uspg($nc,'#fff0f0');
$pg .= "\nfunction godrw(f){var cbx=["; $pg .= "\nfunction godrw(f){var cbx=[";
$comma = ''; $comma = '';
foreach ($cbx as $nam => $txt) foreach ($cbx as $nam => $txt)

39
pool/page_workers.php

@ -1,5 +1,16 @@
<?php <?php
# #
function worktable()
{
$pg = "<script type='text/javascript'>\n";
$pg .= "function wkdet(n,i){var t=document.getElementById(n);if(i&&t){var b,cs,j,c,a;b=i.checked;cs=t.getElementsByTagName('td');for(j=0;c=cs[j];j++)
{a=c.getAttribute('data-hid');if(a){if(b){c.className=a}else{c.className='hid'}}}}}";
$pg .= "</script>\n";
$pg .= "Show Details for Invalids: <input type=checkbox onclick='wkdet(\"wkt\",this)'><br>";
$pg .= "<table id=wkt callpadding=0 cellspacing=0 border=0>\n";
return $pg;
}
#
function worktitle($data, $user) function worktitle($data, $user)
{ {
addSort(); addSort();
@ -13,8 +24,12 @@ function worktitle($data, $user)
$pg .= "<td class=dr><span class=nb><$r id=srtshrate data-sf=r5>:Share Rate</span></td>"; $pg .= "<td class=dr><span class=nb><$r id=srtshrate data-sf=r5>:Share Rate</span></td>";
$pg .= '<td class=dr>&laquo;Elapsed</td>'; $pg .= '<td class=dr>&laquo;Elapsed</td>';
$pg .= "<td class=dr><span class=nb><$r id=srtinv data-sf=r7>:Invalid</span></td>"; $pg .= "<td class=dr><span class=nb><$r id=srtinv data-sf=r7>:Invalid</span></td>";
$pg .= '<td class=dr>Block %</td>'; $pg .= "<td class=hid data-hid=dr><span class=nb><$r id=srtstale data-sf=r8>:Stale</span></td>";
$pg .= "<td class=dr><span class=nb><$r id=srtrate data-sf=r9>:Hash</span> Rate</td>"; $pg .= "<td class=hid data-hid=dr><span class=nb><$r id=srtdup data-sf=r9>:Dup</span></td>";
$pg .= "<td class=hid data-hid=dr><span class=nb><$r id=srthi data-sf=r10>:Hi</span></td>";
$pg .= "<td class=hid data-hid=dr><span class=nb><$r id=srtreject data-sf=r11>:Rej</span></td>";
$pg .= '<td class=dr>Block&nbsp;%</td>';
$pg .= "<td class=dr><span class=nb><$r id=srtrate data-sf=r13>:Hash</span> Rate</td>";
$pg .= "</tr>\n"; $pg .= "</tr>\n";
return $pg; return $pg;
} }
@ -66,6 +81,14 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff,
'w_shareacc' => $ans['w_shareacc:'.$i], 'w_shareacc' => $ans['w_shareacc:'.$i],
'w_diffacc' => $ans['w_diffacc:'.$i], 'w_diffacc' => $ans['w_diffacc:'.$i],
'w_diffinv' => $ans['w_diffinv:'.$i], 'w_diffinv' => $ans['w_diffinv:'.$i],
'w_diffsta' => $ans['w_diffsta:'.$i],
'w_diffdup' => $ans['w_diffdup:'.$i],
'w_diffhi' => $ans['w_diffhi:'.$i],
'w_diffrej' => $ans['w_diffrej:'.$i],
'w_sharesta' => $ans['w_sharesta:'.$i],
'w_sharedup' => $ans['w_sharedup:'.$i],
'w_sharehi' => $ans['w_sharehi:'.$i],
'w_sharerej' => $ans['w_sharerej:'.$i],
'w_lastdiff' => $ans['w_lastdiff:'.$i], 'w_lastdiff' => $ans['w_lastdiff:'.$i],
'w_active_diffacc' => $ans['w_active_diffacc:'.$i], 'w_active_diffacc' => $ans['w_active_diffacc:'.$i],
'w_active_start' => $ans['w_active_start:'.$i], 'w_active_start' => $ans['w_active_start:'.$i],
@ -143,6 +166,15 @@ function workuser($data, $user, &$offset, &$totshare, &$totdiff,
$pg .= "<td class=dr data-srt=$rejf>$rej%</td>"; $pg .= "<td class=dr data-srt=$rejf>$rej%</td>";
foreach(array('sta','dup','hi','rej') as $fld)
{
$shr = number_format($all[$i]['w_share'.$fld]);
$dif = $all[$i]['w_diff'.$fld];
$ddif = number_format($dif);
$sdif = number_format($dif,0,'','');
$pg .= "<td class=hid data-srt=$sdif data-hid=dr>$ddif/$shr</td>";
}
if ($blockacc <= 0) if ($blockacc <= 0)
$blkpct = '&nbsp;'; $blkpct = '&nbsp;';
else else
@ -198,6 +230,7 @@ function worktotal($offset, $totshare, $totdiff, $totshrate, $totinvalid,
$blkpct = '&nbsp;'; $blkpct = '&nbsp;';
else else
$blkpct = number_format(100.0 * $totdiff / $blockacc, 3) . '%'; $blkpct = number_format(100.0 * $totdiff / $blockacc, 3) . '%';
$pg .= "<td class=hid colspan=4 data-hid=dr>&nbsp;</td>";
$pg .= "<td class=dr>$blkpct</td>"; $pg .= "<td class=dr>$blkpct</td>";
$pg .= "<td class=dr>$totrate</td></tr>\n"; $pg .= "<td class=dr>$totrate</td></tr>\n";
return $pg; return $pg;
@ -207,7 +240,7 @@ function doworker($data, $user)
{ {
$title = ''; $title = '';
$pg = "<table callpadding=0 cellspacing=0 border=0>\n"; $pg = worktable();
$totshare = 0; $totshare = 0;
$totdiff = 0; $totdiff = 0;

12
src/bitcoin.c

@ -54,7 +54,7 @@ bool validate_address(connsock_t *cs, const char *address)
snprintf(rpc_req, 128, "{\"method\": \"validateaddress\", \"params\": [\"%s\"]}\n", address); snprintf(rpc_req, 128, "{\"method\": \"validateaddress\", \"params\": [\"%s\"]}\n", address);
val = json_rpc_call(cs, rpc_req); val = json_rpc_call(cs, rpc_req);
if (!val) { if (!val) {
LOGERR("Failed to get valid json response to validate_address"); LOGERR("%s:%s Failed to get valid json response to validate_address", cs->url, cs->port);
return ret; return ret;
} }
res_val = json_object_get(val, "result"); res_val = json_object_get(val, "result");
@ -194,7 +194,7 @@ bool gen_gbtbase(connsock_t *cs, gbtbase_t *gbt)
val = json_rpc_call(cs, gbt_req); val = json_rpc_call(cs, gbt_req);
if (!val) { if (!val) {
LOGWARNING("Failed to get valid json response to getblocktemplate"); LOGWARNING("%s:%s Failed to get valid json response to getblocktemplate", cs->url, cs->port);
return ret; return ret;
} }
res_val = json_object_get(val, "result"); res_val = json_object_get(val, "result");
@ -299,7 +299,7 @@ int get_blockcount(connsock_t *cs)
val = json_rpc_call(cs, blockcount_req); val = json_rpc_call(cs, blockcount_req);
if (!val) { if (!val) {
LOGWARNING("Failed to get valid json response to getblockcount"); LOGWARNING("%s:%s Failed to get valid json response to getblockcount", cs->url, cs->port);
return ret; return ret;
} }
res_val = json_object_get(val, "result"); res_val = json_object_get(val, "result");
@ -325,7 +325,7 @@ bool get_blockhash(connsock_t *cs, int height, char *hash)
sprintf(rpc_req, "{\"method\": \"getblockhash\", \"params\": [%d]}\n", height); sprintf(rpc_req, "{\"method\": \"getblockhash\", \"params\": [%d]}\n", height);
val = json_rpc_call(cs, rpc_req); val = json_rpc_call(cs, rpc_req);
if (!val) { if (!val) {
LOGWARNING("Failed to get valid json response to getblockhash"); LOGWARNING("%s:%s Failed to get valid json response to getblockhash", cs->url, cs->port);
return ret; return ret;
} }
res_val = json_object_get(val, "result"); res_val = json_object_get(val, "result");
@ -356,7 +356,7 @@ bool get_bestblockhash(connsock_t *cs, char *hash)
val = json_rpc_call(cs, bestblockhash_req); val = json_rpc_call(cs, bestblockhash_req);
if (!val) { if (!val) {
LOGWARNING("Failed to get valid json response to getbestblockhash"); LOGWARNING("%s:%s Failed to get valid json response to getbestblockhash", cs->url, cs->port);
return ret; return ret;
} }
res_val = json_object_get(val, "result"); res_val = json_object_get(val, "result");
@ -391,7 +391,7 @@ retry:
val = json_rpc_call(cs, rpc_req); val = json_rpc_call(cs, rpc_req);
dealloc(rpc_req); dealloc(rpc_req);
if (!val) { if (!val) {
LOGWARNING("Failed to get valid json response to submitblock"); LOGWARNING("%s:%s Failed to get valid json response to submitblock", cs->url, cs->port);
if (++retries < 5) if (++retries < 5)
goto retry; goto retry;
return ret; return ret;

20
src/ckdb.c

@ -483,6 +483,9 @@ K_STORE *markersummary_store;
K_TREE *markersummary_pool_root; K_TREE *markersummary_pool_root;
K_STORE *markersummary_pool_store; K_STORE *markersummary_pool_store;
// The markerid load start for markersummary
char *mark_start = NULL;
// WORKMARKERS // WORKMARKERS
K_TREE *workmarkers_root; K_TREE *workmarkers_root;
K_TREE *workmarkers_workinfoid_root; K_TREE *workmarkers_workinfoid_root;
@ -1746,7 +1749,7 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store)
seqtrans->seq = seq; seqtrans->seq = seq;
seqtrans->seqnum = u; seqtrans->seqnum = u;
memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY));
k_add_head(store, st_item); k_add_head_nolock(store, st_item);
} }
u++; u++;
seqentry++; seqentry++;
@ -1772,7 +1775,7 @@ static void trans_process(SEQSET *seqset, tv_t *now, K_STORE *store)
seqtrans->seq = seq; seqtrans->seq = seq;
seqtrans->seqnum = u; seqtrans->seqnum = u;
memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY)); memcpy(&(seqtrans->entry), seqentry, sizeof(SEQENTRY));
k_add_head(store, st_item); k_add_head_nolock(store, st_item);
} }
u++; u++;
seqentry++; seqentry++;
@ -2143,7 +2146,7 @@ gotseqset:
sizeof(SEQENTRY)); sizeof(SEQENTRY));
if (!lost) if (!lost)
lost = k_new_store(seqtrans_free); lost = k_new_store(seqtrans_free);
k_add_tail(lost, st_item); k_add_tail_nolock(lost, st_item);
seqdata->lost++; seqdata->lost++;
seqset->lost++; seqset->lost++;
if (ENTRYISTRANS(u_entry)) { if (ENTRYISTRANS(u_entry)) {
@ -2173,7 +2176,7 @@ gotseqset:
seqdata->reload_lost = k_new_store(seqtrans_free); seqdata->reload_lost = k_new_store(seqtrans_free);
seqdata_reload_lost = true; seqdata_reload_lost = true;
} }
k_add_tail(seqdata->reload_lost, stl_item); k_add_tail_nolock(seqdata->reload_lost, stl_item);
} }
} else { } else {
// (u-size) wasn't missing // (u-size) wasn't missing
@ -2254,8 +2257,7 @@ gotseqset:
} }
if (st_item) { if (st_item) {
// recovered a lost entry // recovered a lost entry
k_unlink_item(seqtrans_free, st_item); k_unlink_item_nolock(seqdata->reload_lost, st_item);
// N.B. lock inside lock
K_WLOCK(seqtrans_free); K_WLOCK(seqtrans_free);
k_add_head(seqtrans_free, st_item); k_add_head(seqtrans_free, st_item);
K_WUNLOCK(seqtrans_free); K_WUNLOCK(seqtrans_free);
@ -5640,6 +5642,7 @@ static struct option long_options[] = {
{ "loglevel", required_argument, 0, 'l' }, { "loglevel", required_argument, 0, 'l' },
// marker = enable mark/workmarker/markersummary auto generation // marker = enable mark/workmarker/markersummary auto generation
{ "marker", no_argument, 0, 'm' }, { "marker", no_argument, 0, 'm' },
{ "markstart", required_argument, 0, 'M' },
{ "name", required_argument, 0, 'n' }, { "name", required_argument, 0, 'n' },
{ "dbpass", required_argument, 0, 'p' }, { "dbpass", required_argument, 0, 'p' },
{ "btc-pass", required_argument, 0, 'P' }, { "btc-pass", required_argument, 0, 'P' },
@ -5684,7 +5687,7 @@ int main(int argc, char **argv)
memset(&ckp, 0, sizeof(ckp)); memset(&ckp, 0, sizeof(ckp));
ckp.loglevel = LOG_NOTICE; ckp.loglevel = LOG_NOTICE;
while ((c = getopt_long(argc, argv, "c:d:ghkl:mn:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) { while ((c = getopt_long(argc, argv, "c:d:ghkl:mM:n:p:P:r:R:s:S:t:u:U:vw:yY:", long_options, &i)) != -1) {
switch(c) { switch(c) {
case 'c': case 'c':
ckp.config = strdup(optarg); ckp.config = strdup(optarg);
@ -5743,6 +5746,9 @@ int main(int argc, char **argv)
case 'm': case 'm':
markersummary_auto = true; markersummary_auto = true;
break; break;
case 'M':
mark_start = strdup(optarg);
break;
case 'n': case 'n':
ckp.name = strdup(optarg); ckp.name = strdup(optarg);
break; break;

5
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.4" #define DB_VERSION "1.0.4"
#define CKDB_VERSION DB_VERSION"-1.610" #define CKDB_VERSION DB_VERSION"-1.620"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1958,6 +1958,9 @@ extern K_STORE *markersummary_store;
extern K_TREE *markersummary_pool_root; extern K_TREE *markersummary_pool_root;
extern K_STORE *markersummary_pool_store; extern K_STORE *markersummary_pool_store;
// The markerid load start for markersummary
extern char *mark_start;
// WORKMARKERS // WORKMARKERS
typedef struct workmarkers { typedef struct workmarkers {
int64_t markerid; int64_t markerid;

39
src/ckdb_cmd.c

@ -3636,9 +3636,9 @@ static char *cmd_setatts(PGconn *conn, char *cmd, char *id,
} }
} }
if (!ua_item) { if (!ua_item) {
K_RLOCK(useratts_free); K_WLOCK(useratts_free);
ua_item = k_unlink_head(useratts_free); ua_item = k_unlink_head(useratts_free);
K_RUNLOCK(useratts_free); K_WUNLOCK(useratts_free);
DATA_USERATTS(useratts, ua_item); DATA_USERATTS(useratts, ua_item);
bzero(useratts, sizeof(*useratts)); bzero(useratts, sizeof(*useratts));
useratts->userid = users->userid; useratts->userid = users->userid;
@ -5796,7 +5796,7 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
char description[TXT_BIG+1] = { '\0' }; char description[TXT_BIG+1] = { '\0' };
char extra[TXT_BIG+1] = { '\0' }; char extra[TXT_BIG+1] = { '\0' };
char status[TXT_FLAG+1] = { MARK_READY, '\0' }; char status[TXT_FLAG+1] = { MARK_READY, '\0' };
bool ok; bool ok = false, pps;
LOGDEBUG("%s(): cmd '%s'", __func__, cmd); LOGDEBUG("%s(): cmd '%s'", __func__, cmd);
@ -6237,6 +6237,39 @@ static char *cmd_marks(PGconn *conn, char *cmd, char *id,
old ? "On" : "Off", old ? "On" : "Off",
markersummary_auto ? "On" : "Off"); markersummary_auto ? "On" : "Off");
ok = true; ok = true;
} else if (strcasecmp(action, "pps") == 0) {
/* Recalculate a shift's rewards/rewarded
* Require markerid */
i_markerid = require_name(trf_root, "markerid", 1,
(char *)intpatt, reply, siz);
if (!i_markerid)
return strdup(reply);
TXT_TO_BIGINT("markerid", transfer_data(i_markerid), markerid);
K_RLOCK(workmarkers_free);
wm_item = find_workmarkerid(markerid, false, MARKER_PROCESSED);
K_RUNLOCK(workmarkers_free);
if (!wm_item) {
snprintf(reply, siz, "no markerid %"PRId64, markerid);
return strdup(reply);
}
DATA_WORKMARKERS(workmarkers, wm_item);
pps = shift_rewards(wm_item);
if (pps) {
snprintf(msg, sizeof(msg),
"shift '%s' markerid %"PRId64" rewards %d "
"rewarded %.3e pps %.3e",
workmarkers->description,
workmarkers->markerid, workmarkers->rewards,
workmarkers->rewarded, workmarkers->pps_value);
} else {
snprintf(msg, sizeof(msg),
"shift '%s' markerid %"PRId64" no rewards yet"
" pps %.3e",
workmarkers->description,
workmarkers->markerid, workmarkers->pps_value);
}
ok = true;
} else { } else {
snprintf(reply, siz, "unknown action '%s'", action); snprintf(reply, siz, "unknown action '%s'", action);
LOGERR("%s.%s", id, reply); LOGERR("%s.%s", id, reply);

16
src/ckdb_dbio.c

@ -6766,9 +6766,10 @@ bool markersummary_fill(PGconn *conn)
K_ITEM *item, *p_item; K_ITEM *item, *p_item;
int n, t, i, p_n; int n, t, i, p_n;
MARKERSUMMARY *row, *p_row; MARKERSUMMARY *row, *p_row;
char *params[1];
char *field; char *field;
char *sel; char *sel;
int fields = 20; int fields = 20, par = 0;
bool ok = false; bool ok = false;
LOGDEBUG("%s(): select", __func__); LOGDEBUG("%s(): select", __func__);
@ -6783,7 +6784,16 @@ bool markersummary_fill(PGconn *conn)
"sharecount,errorcount,firstshare,lastshare,firstshareacc," "sharecount,errorcount,firstshare,lastshare,firstshareacc,"
"lastshareacc,lastdiffacc" "lastshareacc,lastdiffacc"
MODIFYDATECONTROL MODIFYDATECONTROL
" from markersummary"; " from markersummary where markerid>=$1";
par = 0;
if (mark_start)
params[par++] = mark_start;
else
params[par++] = "0";
PARCHK(par, params);
LOGWARNING("%s(): loading from markerid>=%s", __func__, params[0]);
res = PQexec(conn, "Begin", CKPQ_READ); res = PQexec(conn, "Begin", CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res); PQclear(res);
@ -6792,7 +6802,7 @@ bool markersummary_fill(PGconn *conn)
return false; return false;
} }
res = PQexec(conn, sel, CKPQ_READ); res = PQexecParams(conn, sel, par, NULL, (const char **)params, NULL, NULL, 0, CKPQ_READ);
rescode = PQresultStatus(res); rescode = PQresultStatus(res);
PQclear(res); PQclear(res);
if (!PGOK(rescode)) { if (!PGOK(rescode)) {

32
src/ckpool.c

@ -515,13 +515,14 @@ void empty_buffer(connsock_t *cs)
* of the buffer for use on the next receive. */ * of the buffer for use on the next receive. */
int read_socket_line(connsock_t *cs, float *timeout) int read_socket_line(connsock_t *cs, float *timeout)
{ {
int fd = cs->fd, ret = -1;
char *eom = NULL; char *eom = NULL;
tv_t start, now; tv_t start, now;
size_t buflen; size_t buflen;
int ret = -1;
bool polled;
float diff; float diff;
if (unlikely(fd < 0)) if (unlikely(cs->fd < 0))
goto out; goto out;
if (unlikely(!cs->buf)) if (unlikely(!cs->buf))
@ -542,7 +543,8 @@ rewait:
ret = 0; ret = 0;
goto out; goto out;
} }
ret = wait_read_select(fd, eom ? 0 : *timeout); ret = wait_recv_select(cs->fd, eom ? 0 : *timeout);
polled = true;
if (ret < 1) { if (ret < 1) {
if (!ret) { if (!ret) {
if (eom) if (eom)
@ -558,19 +560,23 @@ rewait:
} }
tv_time(&now); tv_time(&now);
diff = tvdiff(&now, &start); diff = tvdiff(&now, &start);
copy_tv(&start, &now);
*timeout -= diff; *timeout -= diff;
while (42) { while (42) {
char readbuf[PAGESIZE] = {}; char readbuf[PAGESIZE] = {};
int backoff = 1; int backoff = 1;
char *newbuf; char *newbuf;
ret = recv(fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT); ret = recv(cs->fd, readbuf, PAGESIZE - 4, MSG_DONTWAIT);
if (ret < 1) { if (ret < 1) {
/* No more to read or closed socket after valid message */ /* No more to read or closed socket after valid message */
if (eom) if (eom)
break; break;
/* Have we used up all the timeout yet? */ /* Have we used up all the timeout yet? If polled is
if (*timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret)) * set that means poll has said there should be
* something to read and if we get nothing it means the
* socket is closed. */
if (!polled && *timeout >= 0 && (errno == EAGAIN || errno == EWOULDBLOCK || !ret))
goto rewait; goto rewait;
if (cs->ckp->proxy) if (cs->ckp->proxy)
LOGINFO("Failed to recv in read_socket_line"); LOGINFO("Failed to recv in read_socket_line");
@ -578,6 +584,7 @@ rewait:
LOGERR("Failed to recv in read_socket_line"); LOGERR("Failed to recv in read_socket_line");
goto out; goto out;
} }
polled = false;
buflen = cs->bufofs + ret + 1; buflen = cs->bufofs + ret + 1;
while (42) { while (42) {
newbuf = realloc(cs->buf, buflen); newbuf = realloc(cs->buf, buflen);
@ -766,6 +773,8 @@ json_t *json_rpc_call(connsock_t *cs, const char *rpc_req)
double elapsed; double elapsed;
int len, ret; int len, ret;
/* Serialise all calls in case we use cs from multiple threads */
cksem_wait(&cs->sem);
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
LOGWARNING("FD %d invalid in %s", cs->fd, __func__); LOGWARNING("FD %d invalid in %s", cs->fd, __func__);
goto out; goto out;
@ -853,13 +862,20 @@ out_empty:
if (!val) { if (!val) {
/* Assume that a failed request means the socket will be closed /* Assume that a failed request means the socket will be closed
* and reopen it */ * and reopen it */
LOGWARNING("Reopening socket to %s:%s", cs->url, cs->port);
Close(cs->fd); Close(cs->fd);
cs->fd = connect_socket(cs->url, cs->port);
} }
out: out:
if (cs->fd < 0) {
/* Attempt to reopen a socket that has been closed due to a
* failed request or if the socket was closed while trying to
* read/write to it. */
cs->fd = connect_socket(cs->url, cs->port);
LOGWARNING("Attempt to reopen socket to %s:%s %ssuccessful",
cs->url, cs->port, cs->fd > 0 ? "" : "un");
}
free(http_req); free(http_req);
dealloc(cs->buf); dealloc(cs->buf);
cksem_post(&cs->sem);
return val; return val;
} }

3
src/ckpool.h

@ -80,6 +80,8 @@ struct connsock {
int bufofs; int bufofs;
int buflen; int buflen;
ckpool_t *ckp; ckpool_t *ckp;
/* Semaphore used to serialise request/responses */
sem_t sem;
}; };
typedef struct connsock connsock_t; typedef struct connsock connsock_t;
@ -110,6 +112,7 @@ struct server_instance {
char *auth; char *auth;
char *pass; char *pass;
bool notify; bool notify;
bool alive;
connsock_t cs; connsock_t cs;
void *data; // Private data void *data; // Private data

130
src/generator.c

@ -146,7 +146,7 @@ struct generator_data {
int subproxies_generated; int subproxies_generated;
int proxy_notify_id; // Globally increasing notify id int proxy_notify_id; // Globally increasing notify id
ckmsgq_t *srvchk; // Server check message queue server_instance_t *si; /* Current server instance */
pthread_t pth_uprecv; // User proxy receive thread pthread_t pth_uprecv; // User proxy receive thread
pthread_t pth_psend; // Combined proxy send thread pthread_t pth_psend; // Combined proxy send thread
@ -177,6 +177,7 @@ static bool server_alive(ckpool_t *ckp, server_instance_t *si, bool pinging)
/* Has this server already been reconnected? */ /* Has this server already been reconnected? */
if (cs->fd > 0) if (cs->fd > 0)
return true; return true;
si->alive = false;
if (!extract_sockaddr(si->url, &cs->url, &cs->port)) { if (!extract_sockaddr(si->url, &cs->url, &cs->port)) {
LOGWARNING("Failed to extract address from %s", si->url); LOGWARNING("Failed to extract address from %s", si->url);
return ret; return ret;
@ -218,8 +219,11 @@ out:
if (!ret) { if (!ret) {
/* Close and invalidate the file handle */ /* Close and invalidate the file handle */
Close(cs->fd); Close(cs->fd);
} else } else {
si->alive = true;
LOGNOTICE("Server alive: %s:%s", cs->url, cs->port);
keep_sockalive(cs->fd); keep_sockalive(cs->fd);
}
return ret; return ret;
} }
@ -235,19 +239,31 @@ retry:
if (!ping_main(ckp)) if (!ping_main(ckp))
goto out; goto out;
/* First find a server that is already flagged alive if possible
* without blocking on server_alive() */
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
cs = &si->cs;
if (si->alive && cs->fd > 0) {
alive = si;
goto living;
}
}
/* No servers flagged alive, try to connect to them blocking */
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i]; server_instance_t *si = ckp->servers[i];
if (server_alive(ckp, si, false)) { if (server_alive(ckp, si, false)) {
alive = si; alive = si;
break; goto living;
} }
} }
if (!alive) {
LOGWARNING("CRITICAL: No bitcoinds active!"); LOGWARNING("CRITICAL: No bitcoinds active!");
sleep(5); sleep(5);
goto retry; goto retry;
} living:
cs = &alive->cs; cs = &alive->cs;
LOGINFO("Connected to live server %s:%s", cs->url, cs->port); LOGINFO("Connected to live server %s:%s", cs->url, cs->port);
out: out:
@ -284,11 +300,9 @@ static void clear_unix_msg(unix_msg_t **umsg)
static int gen_loop(proc_instance_t *pi) static int gen_loop(proc_instance_t *pi)
{ {
server_instance_t *si = NULL; server_instance_t *si = NULL, *old_si;
bool reconnecting = false;
unix_msg_t *umsg = NULL; unix_msg_t *umsg = NULL;
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
gdata_t *gdata = ckp->data;
bool started = false; bool started = false;
char *buf = NULL; char *buf = NULL;
connsock_t *cs; connsock_t *cs;
@ -298,24 +312,24 @@ static int gen_loop(proc_instance_t *pi)
reconnect: reconnect:
clear_unix_msg(&umsg); clear_unix_msg(&umsg);
if (si) { old_si = si;
kill_server(si);
reconnecting = true;
}
si = live_server(ckp); si = live_server(ckp);
if (!si) if (!si)
goto out; goto out;
if (unlikely(!started)) {
started = true;
LOGWARNING("%s generator ready", ckp->name);
}
gbt = si->data; gbt = si->data;
cs = &si->cs; cs = &si->cs;
if (reconnecting) { if (!old_si)
LOGWARNING("Connected to bitcoind: %s:%s", cs->url, cs->port);
else if (si != old_si)
LOGWARNING("Failed over to bitcoind: %s:%s", cs->url, cs->port); LOGWARNING("Failed over to bitcoind: %s:%s", cs->url, cs->port);
reconnecting = false;
}
retry: retry:
clear_unix_msg(&umsg); clear_unix_msg(&umsg);
ckmsgq_add(gdata->srvchk, si);
do { do {
umsg = get_unix_msg(pi); umsg = get_unix_msg(pi);
@ -327,7 +341,7 @@ retry:
} while (!umsg); } while (!umsg);
if (unlikely(cs->fd < 0)) { if (unlikely(cs->fd < 0)) {
LOGWARNING("Bitcoind socket invalidated, will attempt failover"); LOGWARNING("%s:%s Bitcoind socket invalidated, will attempt failover", cs->url, cs->port);
goto reconnect; goto reconnect;
} }
@ -358,10 +372,6 @@ retry:
cs->url, cs->port); cs->url, cs->port);
send_unix_msg(umsg->sockd, "failed"); send_unix_msg(umsg->sockd, "failed");
} else { } else {
if (unlikely(!started)) {
started = true;
LOGWARNING("%s generator ready", ckp->name);
}
send_unix_msg(umsg->sockd, hash); send_unix_msg(umsg->sockd, hash);
} }
} else if (cmdmatch(buf, "getlast")) { } else if (cmdmatch(buf, "getlast")) {
@ -378,11 +388,6 @@ retry:
send_unix_msg(umsg->sockd, "failed"); send_unix_msg(umsg->sockd, "failed");
goto reconnect; goto reconnect;
} else { } else {
if (unlikely(!started)) {
started = true;
LOGWARNING("%s generator ready", ckp->name);
}
send_unix_msg(umsg->sockd, hash); send_unix_msg(umsg->sockd, hash);
LOGDEBUG("Hash: %s", hash); LOGDEBUG("Hash: %s", hash);
} }
@ -2724,8 +2729,38 @@ out:
return ret; return ret;
} }
/* Check which servers are alive, maintaining a connection with them and
* reconnect if a higher priority one is available. */
static void *server_watchdog(void *arg)
{
ckpool_t *ckp = (ckpool_t *)arg;
gdata_t *gdata = ckp->data;
while (42) {
server_instance_t *best = NULL;
ts_t timer_t;
int i;
cksleep_prepare_r(&timer_t);
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
/* Have we reached the current server? */
if (server_alive(ckp, si, true) && !best)
best = si;
}
if (best && best != gdata->si) {
gdata->si = best;
send_proc(ckp->generator, "reconnect");
}
cksleep_ms_r(&timer_t, 5000);
}
return NULL;
}
static int server_mode(ckpool_t *ckp, proc_instance_t *pi) static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
{ {
pthread_t pth_watchdog;
server_instance_t *si; server_instance_t *si;
int i, ret; int i, ret;
@ -2737,8 +2772,12 @@ static int server_mode(ckpool_t *ckp, proc_instance_t *pi)
si->auth = ckp->btcdauth[i]; si->auth = ckp->btcdauth[i];
si->pass = ckp->btcdpass[i]; si->pass = ckp->btcdpass[i];
si->notify = ckp->btcdnotify[i]; si->notify = ckp->btcdnotify[i];
si->id = i;
cksem_init(&si->cs.sem);
cksem_post(&si->cs.sem);
} }
create_pthread(&pth_watchdog, server_watchdog, ckp);
ret = gen_loop(pi); ret = gen_loop(pi);
for (i = 0; i < ckp->btcds; i++) { for (i = 0; i < ckp->btcds; i++) {
@ -2798,41 +2837,6 @@ static int proxy_mode(ckpool_t *ckp, proc_instance_t *pi)
return ret; return ret;
} }
/* Tell the watchdog what the current server instance is and decide if we
* should check to see if the higher priority servers are alive and fallback */
static void server_watchdog(ckpool_t *ckp, server_instance_t *cursi)
{
static time_t last_t = 0;
bool alive = false;
time_t now_t;
int i;
/* Rate limit to checking only once every 5 seconds */
now_t = time(NULL);
if (now_t <= last_t + 5)
return;
last_t = now_t;
/* Is this the highest priority server already? */
if (!cursi->id)
return;
for (i = 0; i < ckp->btcds; i++) {
server_instance_t *si = ckp->servers[i];
/* Have we reached the current server? */
if (si == cursi)
return;
alive = server_alive(ckp, si, true);
if (alive)
break;
}
if (alive)
reconnect_generator(ckp);
}
int generator(proc_instance_t *pi) int generator(proc_instance_t *pi)
{ {
ckpool_t *ckp = pi->ckp; ckpool_t *ckp = pi->ckp;
@ -2859,10 +2863,8 @@ int generator(proc_instance_t *pi)
} while (!buf); } while (!buf);
dealloc(buf); dealloc(buf);
ret = proxy_mode(ckp, pi); ret = proxy_mode(ckp, pi);
} else { } else
gdata->srvchk = create_ckmsgq(ckp, "srvchk", &server_watchdog);
ret = server_mode(ckp, pi); ret = server_mode(ckp, pi);
}
out: out:
dealloc(ckp->data); dealloc(ckp->data);
return process_exit(ckp, pi, ret); return process_exit(ckp, pi, ret);

2
src/klist.h

@ -586,7 +586,7 @@ extern void _k_insert_after(K_LIST *list, K_ITEM *item, K_ITEM *after, LOCK_MAYB
//#define k_insert_after_nolock(_list, _item, _after) _k_insert_after(_list, _item, _after, false, KLIST_FFL_HERE) //#define k_insert_after_nolock(_list, _item, _after) _k_insert_after(_list, _item, _after, false, KLIST_FFL_HERE)
extern void _k_unlink_item(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS); extern void _k_unlink_item(K_LIST *list, K_ITEM *item, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS);
#define k_unlink_item(_list, _item) _k_unlink_item(_list, _item, true, KLIST_FFL_HERE) #define k_unlink_item(_list, _item) _k_unlink_item(_list, _item, true, KLIST_FFL_HERE)
//#define k_unlink_item_nolock(_list, _item) _k_unlink_item(_list, _item, false, KLIST_FFL_HERE) #define k_unlink_item_nolock(_list, _item) _k_unlink_item(_list, _item, false, KLIST_FFL_HERE)
extern void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS); extern void _k_list_transfer_to_head(K_LIST *from, K_LIST *to, LOCK_MAYBE bool chklock, KLIST_FFL_ARGS);
#define k_list_transfer_to_head(_from, _to) _k_list_transfer_to_head(_from, _to, true, KLIST_FFL_HERE) #define k_list_transfer_to_head(_from, _to) _k_list_transfer_to_head(_from, _to, true, KLIST_FFL_HERE)
//#define k_list_transfer_to_head_nolock(_from, _to) _k_list_transfer_to_head(_from, _to, false, KLIST_FFL_HERE) //#define k_list_transfer_to_head_nolock(_from, _to) _k_list_transfer_to_head(_from, _to, false, KLIST_FFL_HERE)

56
src/libckpool.c

@ -620,13 +620,17 @@ void block_socket(int fd)
void _close(int *fd, const char *file, const char *func, const int line) void _close(int *fd, const char *file, const char *func, const int line)
{ {
int sockd;
if (*fd < 0) if (*fd < 0)
return; return;
LOGDEBUG("Closing file handle %d", *fd); sockd = *fd;
if (unlikely(close(*fd))) LOGDEBUG("Closing file handle %d", sockd);
LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d",
*fd, errno, strerror(errno), file, func, line);
*fd = -1; *fd = -1;
if (unlikely(close(sockd))) {
LOGWARNING("Close of fd %d failed with errno %d:%s from %s %s:%d",
sockd, errno, strerror(errno), file, func, line);
}
} }
int bind_socket(char *url, char *port) int bind_socket(char *url, char *port)
@ -754,17 +758,15 @@ out:
void empty_socket(int fd) void empty_socket(int fd)
{ {
char buf[PAGESIZE];
int ret; int ret;
if (fd < 1) if (fd < 1)
return; return;
do { do {
char buf[PAGESIZE]; ret = recv(fd, buf, PAGESIZE - 1, MSG_DONTWAIT);
ret = wait_read_select(fd, 0);
if (ret > 0) { if (ret > 0) {
ret = recv(fd, buf, PAGESIZE - 1, 0);
buf[ret] = 0; buf[ret] = 0;
LOGDEBUG("Discarding: %s", buf); LOGDEBUG("Discarding: %s", buf);
} }
@ -903,21 +905,45 @@ int wait_close(int sockd, int timeout)
return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR); return sfd.revents & (POLLHUP | POLLRDHUP | POLLERR);
} }
/* Emulate a select read wait for high fds that select doesn't support */ /* Emulate a select read wait for high fds that select doesn't support.
int wait_read_select(int sockd, float timeout) * wait_read_select is for unix sockets and _wait_recv_select for regular
* sockets. */
int _wait_read_select(int *sockd, float timeout)
{ {
struct pollfd sfd; struct pollfd sfd;
int ret = -1; int ret = -1;
if (unlikely(sockd < 0)) if (unlikely(*sockd < 0))
goto out; goto out;
sfd.fd = sockd; sfd.fd = *sockd;
sfd.events = POLLIN | POLLRDHUP; sfd.events = POLLIN | POLLRDHUP;
sfd.revents = 0;
timeout *= 1000; timeout *= 1000;
ret = poll(&sfd, 1, timeout); ret = poll(&sfd, 1, timeout);
if (ret && !(sfd.revents & POLLIN)) if (ret > 0 && sfd.revents & (POLLERR)) {
ret = -1; ret = -1;
_Close(sockd);
}
out:
return ret;
}
int _wait_recv_select(int *sockd, float timeout)
{
struct pollfd sfd;
int ret = -1;
if (unlikely(*sockd < 0))
goto out;
sfd.fd = *sockd;
sfd.events = POLLIN | POLLRDHUP;
timeout *= 1000;
ret = poll(&sfd, 1, timeout);
/* If POLLRDHUP occurs, we may still have data to read so let recv()
* after this determine if the socket can still be used. */
if (ret > 0 && sfd.revents & (POLLHUP | POLLERR)) {
ret = -1;
_Close(sockd);
}
out: out:
return ret; return ret;
} }
@ -1978,6 +2004,8 @@ double diff_from_nbits(char *nbits)
pow = nbits[0]; pow = nbits[0];
powdiff = (8 * (0x1d - 3)) - (8 * (pow - 3)); powdiff = (8 * (0x1d - 3)) - (8 * (pow - 3));
if (powdiff < 8)
powdiff = 8;
diff32 = be32toh(*((uint32_t *)nbits)) & 0x00FFFFFF; diff32 = be32toh(*((uint32_t *)nbits)) & 0x00FFFFFF;
if (likely(powdiff > 0)) if (likely(powdiff > 0))
numerator = 0xFFFFULL << powdiff; numerator = 0xFFFFULL << powdiff;

5
src/libckpool.h

@ -496,7 +496,10 @@ int _open_unix_server(const char *server_path, const char *file, const char *fun
int _open_unix_client(const char *server_path, const char *file, const char *func, const int line); int _open_unix_client(const char *server_path, const char *file, const char *func, const int line);
#define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__) #define open_unix_client(server_path) _open_unix_client(server_path, __FILE__, __func__, __LINE__)
int wait_close(int sockd, int timeout); int wait_close(int sockd, int timeout);
int wait_read_select(int sockd, float timeout); int _wait_read_select(int *sockd, float timeout);
#define wait_read_select(SOCKD, TIMEOUT) _wait_read_select(&(SOCKD), TIMEOUT)
int _wait_recv_select(int *sockd, float timeout);
#define wait_recv_select(SOCKD, TIMEOUT) _wait_recv_select(&(SOCKD), TIMEOUT)
int read_length(int sockd, void *buf, int len); int read_length(int sockd, void *buf, int len);
char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line); char *_recv_unix_msg(int sockd, int timeout1, int timeout2, const char *file, const char *func, const int line);
#define RECV_UNIX_TIMEOUT1 30 #define RECV_UNIX_TIMEOUT1 30

22
src/stratifier.c

@ -970,10 +970,10 @@ static void broadcast_ping(sdata_t *sdata);
static void *do_update(void *arg) static void *do_update(void *arg)
{ {
struct update_req *ur = (struct update_req *)arg; struct update_req *ur = (struct update_req *)arg;
int prio = ur->prio, retries = 0;
ckpool_t *ckp = ur->ckp; ckpool_t *ckp = ur->ckp;
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
bool new_block = false; bool new_block = false;
int prio = ur->prio;
bool ret = false; bool ret = false;
workbase_t *wb; workbase_t *wb;
time_t now_t; time_t now_t;
@ -983,15 +983,20 @@ static void *do_update(void *arg)
pthread_detach(pthread_self()); pthread_detach(pthread_self());
rename_proc("updater"); rename_proc("updater");
retry:
buf = send_recv_generator(ckp, "getbase", prio); buf = send_recv_generator(ckp, "getbase", prio);
if (unlikely(!buf)) { if (unlikely(!buf)) {
LOGNOTICE("Get base in update_base delayed due to higher priority request"); LOGNOTICE("Get base in update_base delayed due to higher priority request");
goto out; goto out;
} }
if (unlikely(cmdmatch(buf, "failed"))) { if (unlikely(cmdmatch(buf, "failed"))) {
LOGWARNING("Generator returned failure in update_base"); if (retries++ < 5 || prio == GEN_PRIORITY) {
goto out; LOGWARNING("Generator returned failure in update_base, retry #%d", retries);
goto retry;
} }
}
if (unlikely(retries))
LOGWARNING("Generator succeeded in update_base after retrying");
wb = ckzalloc(sizeof(workbase_t)); wb = ckzalloc(sizeof(workbase_t));
wb->ckp = ckp; wb->ckp = ckp;
@ -4968,7 +4973,7 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
int64_t sdiff; int64_t sdiff;
if (unlikely(!client_active(client))) { if (unlikely(!client_active(client))) {
LOGWARNING("Attempted to suggest diff on unauthorised client %"PRId64, client->id); LOGNOTICE("Attempted to suggest diff on unauthorised client %"PRId64, client->id);
return; return;
} }
if (arr_val && json_is_integer(arr_val)) if (arr_val && json_is_integer(arr_val))
@ -4989,6 +4994,13 @@ static void suggest_diff(stratum_instance_t *client, const char *method, const j
stratum_send_diff(sdata, client); stratum_send_diff(sdata, client);
} }
/* Send diff first when sending the first stratum template after subscribing */
static void init_client(sdata_t *sdata, const stratum_instance_t *client, const int64_t client_id)
{
stratum_send_diff(sdata, client);
stratum_send_update(sdata, client_id, true);
}
/* Enter with client holding ref count */ /* Enter with client holding ref count */
static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client, static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *client,
const int64_t client_id, json_t *id_val, json_t *method_val, const int64_t client_id, json_t *id_val, json_t *method_val,
@ -5027,7 +5039,7 @@ static void parse_method(ckpool_t *ckp, sdata_t *sdata, stratum_instance_t *clie
json_object_set_new_nocheck(val, "error", json_null()); json_object_set_new_nocheck(val, "error", json_null());
stratum_add_send(sdata, val, client_id); stratum_add_send(sdata, val, client_id);
if (likely(client->subscribed)) if (likely(client->subscribed))
update_client(client, client_id); init_client(sdata, client, client_id);
return; return;
} }

Loading…
Cancel
Save