Browse Source

Merge branch 'master' into multiproxy

Conflicts:
	src/connector.c
master
Con Kolivas 10 years ago
parent
commit
5cb9408705
  1. 64
      html/can.js
  2. 55
      pool/db.php
  3. 1
      pool/page.php
  4. 18
      pool/page_blocks.php
  5. 42
      pool/page_usperf.php
  6. 1
      pool/prime.php
  7. 58
      src/ckdb.c
  8. 3
      src/ckdb.h
  9. 11
      src/ckdb_cmd.c
  10. 99
      src/connector.c
  11. 16
      src/stratifier.c

64
html/can.js

@ -0,0 +1,64 @@
function hasCan(){var c0=document.getElementById('can0');c=document.getElementById('can');return !!(c0&&c&&c.getContext&&c.getContext('2d'));}
function sep(d){ans={};var ar=d.split("\t");var l=ar.length;for(var i=0;i<l;i++){var e=ar[i].indexOf('=');ans[ar[i].substr(0,e)]=ar[i].substr(e+1)};return ans}
function dfmt(e){var d=new Date(e*1000);var DD=d.getUTCDate();var HH=d.getUTCHours();var MM=d.getUTCMinutes();var ans=''+DD+'/';if(HH<10){ans+='0'}ans+=''+HH+':';if(MM<10){ans+='0'}ans+=''+MM;return ans}
function gch(z,zm){if(z<0.5){return 0.5}if(z>(zm-0.5)){return(zm-0.5)}return z}
function gchx(c,x){return gch(x*c['xm']+c['xo'],c['ctx'].canvas.width)}
function gchy(c,y){return gch((1-y)*c['ym']+c['yo'],c['ctx'].canvas.height)}
function gx0(c){return -c['xo']/c['xm']};
function gy0(c){return -c['yo']/c['ym']};
function gto(c,xo,yo){c['xo']+=xo;c['yo']+=yo}
function gts(c,xs,ys){c['xm']*=xs;c['ym']*=ys}
function gtso(c,xs,ys){gto(c,c['xm']*(1.0-xs)/2.0,c['ym']*(1.0-ys)/2.0);gts(c,xs,ys)}
function gfs(c,bg){c['ctx'].fillStyle=bg}
function gss(c,fg){c['ctx'].strokeStyle=fg}
function glw(c,pct){c['ctx'].lineWidth=pct*c['ym']/100.0}
function glwr(c,rat){c['ctx'].lineWidth*=rat}
function gfz(c,x,y,ox,oy,t,co,a){gfs(c,co);c['ctx'].textAlign=a;c['ctx'].fillText(t,gchx(c,x)+ox,gchy(c,y)-oy)}
function gbe(c,x,y){c['ctx'].beginPath();c['ctx'].moveTo(gchx(c,x),gchy(c,y))}
function gln(c,x,y){c['ctx'].lineTo(gchx(c,x),gchy(c,y))}
function gct(c,x1,y1,x2,y2,x3,y3){c['ctx'].bezierCurveTo(gchx(c,x1),gchy(c,y1),gchx(c,x2),gchy(c,y2),gchx(c,x3),gchy(c,y3))}
function glm(c,x,y){c['ctx'].moveTo(gchx(c,x),gchy(c,y))}
function gle(c){c['ctx'].closePath()}
function gfl(c){c['ctx'].fill()}
function gst(c){c['ctx'].stroke()}
function gfi(c){gle(c);gst(c)}
function gbd(c){gbe(c,0,0);gln(c,1,0);gln(c,1,1);gln(c,0,1);gle(c);gfl(c);gst(c)}
function ggr(c,xs,ys,yt,xn,x0,x1,y0,y1,ar,nx,vx,vy,av){
gtso(c,xs,ys);
gss(c,'black');glw(c,0.2);
gbe(c,0,1);gln(c,0,0);gln(c,1,0);gst(c);
glw(c,0.01);
var hi=c['ctx'].measureText('M').width, wi=c['ctx'].measureText('1').width;
for(var i=0;i<11;i++){var y=i/10.0;gbe(c,-0.01,y);gln(c,1,y);gst(c);var t=''+(((y1-y0)*i/10+y0).toFixed(2));gfz(c,0,y,-wi,0,t,'black','end')}
gfz(c,gx0(c),0.55,wi,0,yt,'#0080ff','left');
var m=Math.round(0.5+xn/20.0);
var f=1;
for(var i=0;i<xn;i++){var n=ar[nx+i];var x=ar[vx+i];var xo=(x-x0)/(x1-x0);if(c['skey']&&(i<(xn-1))&&(i%m)==0){gbe(c,xo,0);gln(c,xo,-0.01);gst(c);gfz(c,xo,0,0,-hi*1.5,n,'#00a050','center')}if(c['slines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}}
var xhr=x1-(x1%3600);
gss(c,'brown');
var tpos=2.7;if(c['over']){tpos=1.5}
for(var i=xhr;i>=x0;i-=(6*3600)){var n=dfmt(i);var xo=(i-x0)/(x1-x0);if(c['tkey']){gbe(c,xo,0);gln(c,xo,-0.02);gst(c);gfz(c,xo,0,0,-hi*tpos,n,'brown','center')}if(c['tlines']){gbe(c,xo,0);gln(c,xo,1);gst(c)}}
glw(c,0.1);
gss(c,'black');
if(c['smooth']){var xa=0,ya=0,xb=0,yb=0;
for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0;xb=xo;yb=yo}else{gct(c,(xa+xb)/2,(ya+yb)/2,xb,yb,(xb+xo)/2,(yb+yo)/2)}xa=xb;ya=yb;xb=xo;yb=yo}gct(c,(xa+xb)/2,(ya+yb)/2,xo,yo,xo,yo);gst(c);}
else{for(var i=0;i<xn;i++){var x=ar[vx+i];var y=ar[vy+i];var xo=(x-x0)/(x1-x0);var yo=(y-y0)/(y1-y0);if(f==1){gbe(c,xo,yo);f=0}else{gln(c,xo,yo)}}gst(c);}
glw(c,0.2);
gss(c,'red');
var y=(av-y0)/(y1-y0);
gbe(c,0,y);gln(c,1,y);gst(c);
var t=''+av.toFixed(2);gfz(c,1,y,1,0,t,'red','left')
}
function sn(i,shi){if(shi.indexOf(' Shift ')<0){return ''+(i%10)}else{return shi.replace(/.* ([a-z])[a-z]*$/,'$1')}}
function gc(c){var div=document.getElementById('can0');while(div.firstChild){div.removeChild(div.firstChild)}c['can']=document.createElement('canvas');c['can'].id='can';c['wx']=window.innerWidth;c['wy']=window.innerHeight;c['xm']=Math.round(c['wx']*0.9+0.5);c['ym']=Math.round(c['wy']*0.8+0.5);if(c['ym']>c['xm']){c['ym']=c['xm']}c['xo']=0.0;c['yo']=0.0;c['ctx']=c['can'].getContext('2d');c['ctx'].canvas.width=c['xm']+1;c['ctx'].canvas.height=c['ym']+1;div.appendChild(c['can']);div=document.getElementById('smooth');c['smooth']=(div&&div.checked);div=document.getElementById('over');c['over']=(div&&div.checked);div=document.getElementById('skey');c['skey']=(div&&div.checked);div=document.getElementById('slines');c['slines']=(div&&div.checked);div=document.getElementById('tkey');c['tkey']=(div&&div.checked);div=document.getElementById('tlines');c['tlines']=(div&&div.checked);div=document.getElementById('zerob');c['zerob']=(div&&div.checked)}
function gdrw(d){var c={};gc(c);
gfs(c,'white');gss(c,'#0000c0');glw(c,0.5);gbd(c);
var rows=d['rows'],ymin=-1,ymax=0,xmin=-1,xmax=0;
var tda=0;
for(var i=0;i<rows;i++){var s=parseFloat(d['start:'+i]);var e=parseFloat(d['end:'+i]);var da=parseFloat(d['diffacc:'+i]);tda+=da;var ths=(da/(e-s))*Math.pow(2,32)/Math.pow(10,12);d['ths:'+i]=ths;if(ymin==-1||ymin>ths){ymin=ths}if(ths>ymax)ymax=ths;d['nx:'+i]=sn(i,d['shift:'+i]);if(xmin==-1||xmin>s){xmin=s}if(xmax<e){xmax=e}d['vx:'+i]=(s+e)/2.0};
var tav=(tda/(xmax-xmin))*Math.pow(2,32)/Math.pow(10,12);
var p5=(ymax-ymin)*0.05;ymax+=p5;ymin-=p5;if(ymin<0){ymin=0}
if(c['zerob']){ymin=0}
ggr(c,0.9,0.9,'THs',rows,xmin,xmax,ymin,ymax,d,'nx:','vx:','ths:',tav);
}
function dodrw(data){if(hasCan()){gdrw(sep(data))}}

55
pool/db.php

@ -5,13 +5,20 @@ include_once('base.php');
# #
# List of db functions to call and get the results back from ckdb # List of db functions to call and get the results back from ckdb
# From homeInfo() and the rest after that # From homeInfo() and the rest after that
# The result is an array of all ckdb result field names and their values # The repDecode() result is an array of all ckdb result field names and
# their values
# Also included: # Also included:
# ['ID'] the id sent # ['ID'] the id sent
# ['STAMP'] the ckdb reply timestamp # ['STAMP'] the ckdb reply timestamp
# ['STATUS'] the ckdb reply status (!'ok' = error) # ['STATUS'] the ckdb reply status (!'ok' = error)
# ['ERROR'] if status not 'ok' the error message reply # ['ERROR'] if status != 'ok', the error message reply
# The reply is false if the ckdb return data was corrupt # The reply is false if the ckdb return data was corrupt
# The repData() result is:
# ['ID'] the id sent
# ['STAMP'] the ckdb reply timestamp
# ['STATUS'] the ckdb reply status (!'ok' = error)
# ['DATA'] the rest of the ckdb reply, or '' on error
# ['ERROR'] if status not 'ok', the error message reply
# #
global $send_sep, $fld_sep, $val_sep; global $send_sep, $fld_sep, $val_sep;
$send_sep = '.'; $send_sep = '.';
@ -65,6 +72,38 @@ function repDecode($rep)
return $ans; return $ans;
} }
# #
function repData($rep)
{
global $send_sep;
$fix = preg_replace("/[\n\r]*$/",'',$rep);
$major = explode($send_sep, $fix, 4);
if (count($major) < 3)
return false;
$ans = array();
$ans['ID'] = $major[0];
$ans['STAMP'] = $major[1];
$ans['STATUS'] = $major[2];
$ans['DATA'] = '';
if ($major[2] == 'ok')
{
$ans['ERROR'] = null;
if (isset($major[3]))
$ans['DATA'] = $major[3];
}
else
{
if (isset($major[3]))
$ans['ERROR'] = $major[3];
else
$ans['ERROR'] = 'system error';
}
return $ans;
}
#
# Convenience function # Convenience function
function zeip() function zeip()
{ {
@ -282,6 +321,18 @@ function getShifts($user)
return repDecode($rep); return repDecode($rep);
} }
# #
function getShiftData($user)
{
if ($user == false)
showIndex();
$flds = array('username' => $user);
$msg = msgEncode('shifts', 'shift', $flds, $user);
$rep = sendsockreply('getShifts', $msg);
if (!$rep)
dbdown();
return repData($rep);
}
#
function getBlocks($user) function getBlocks($user)
{ {
if ($user == false) if ($user == false)

1
pool/page.php

@ -98,6 +98,7 @@ function pghead($script_marker, $name)
$head .= "window.onpaint=jst();\n</script>\n"; $head .= "window.onpaint=jst();\n</script>\n";
$head .= "<style type='text/css'> $head .= "<style type='text/css'>
input {vertical-align: -2px;}
form {display: inline-block;} form {display: inline-block;}
html, body {height: 100%; font-family:Arial, Verdana, sans-serif; font-size:12pt; background-color:#eeffff; text-align: center; background-repeat: no-repeat; background-position: center;} html, body {height: 100%; font-family:Arial, Verdana, sans-serif; font-size:12pt; background-color:#eeffff; text-align: center; background-repeat: no-repeat; background-position: center;}
.page {min-height: 100%; height: auto !important; height: 100%; margin: 0 auto -50px; position: relative;} .page {min-height: 100%; height: auto !important; height: 100%; margin: 0 auto -50px; position: relative;}

18
pool/page_blocks.php

@ -182,6 +182,16 @@ function doblocks($data, $user)
if ($stat == 'Orphan') if ($stat == 'Orphan')
$stara = '<span class=st1>*</span>'; $stara = '<span class=st1>*</span>';
if (isset($ans['statsconf:'.$i]))
{
if ($ans['statsconf:'.$i] == 'Y')
$approx = '';
else
$approx = '~';
}
else
$approx = '';
$diffacc = $ans['diffacc:'.$i]; $diffacc = $ans['diffacc:'.$i];
$acc = number_format($diffacc, 0); $acc = number_format($diffacc, 0);
@ -200,7 +210,7 @@ function doblocks($data, $user)
if ($stat != 'Orphan') if ($stat != 'Orphan')
$nettot += $netdiff; $nettot += $netdiff;
$cdfdsp = number_format($cdf, 2); $cdfdsp = number_format($cdf, 3);
} }
else else
{ {
@ -218,9 +228,9 @@ function doblocks($data, $user)
$pg .= "<td class=dl$ex>".htmlspecialchars($ans['workername:'.$i]).'</td>'; $pg .= "<td class=dl$ex>".htmlspecialchars($ans['workername:'.$i]).'</td>';
$pg .= "<td class=dr$ex>".btcfmt($ans['reward:'.$i]).'</td>'; $pg .= "<td class=dr$ex>".btcfmt($ans['reward:'.$i]).'</td>';
$pg .= "<td class=dl$ex>".utcd($ans['firstcreatedate:'.$i]).'</td>'; $pg .= "<td class=dl$ex>".utcd($ans['firstcreatedate:'.$i]).'</td>';
$pg .= "<td class=dr$ex>".$stat.'</td>'; $pg .= "<td class=dr$ex>$stat</td>";
$pg .= "<td class=dr>$stara$acc</td>"; $pg .= "<td class=dr>$stara$approx$acc</td>";
$pg .= "<td class=dr$bg>$bpct</td>"; $pg .= "<td class=dr$bg>$approx$bpct</td>";
$pg .= "<td class=dr>$cdfdsp</td>"; $pg .= "<td class=dr>$cdfdsp</td>";
$pg .= "</tr>\n"; $pg .= "</tr>\n";
} }

42
pool/page_usperf.php

@ -0,0 +1,42 @@
<?php
#
function dousperf($data, $user)
{
$ans = getShiftData($user);
$iCrap = strpos($_SERVER['HTTP_USER_AGENT'],'iP');
if ($iCrap)
$vlines = false;
else
$vlines = true;
$pg = '<h1>User Shift Performance</h1><br>';
if ($ans['STATUS'] == 'ok' and $ans['DATA'] != '')
{
$pg .= "<div><input type=checkbox id=skey onclick='godrw()' checked>shift key&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=slines onclick='godrw()'";
if ($vlines)
$pg .= ' checked';
$pg .= ">shift lines&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=tkey onclick='godrw()'>time key&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=tlines onclick='godrw()'>time lines&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=over onclick='godrw()'>key overlap&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=smooth onclick='godrw()'>smooth&nbsp;";
$pg .= "&nbsp;<input type=checkbox id=zerob onclick='godrw()'>zero based</div>";
$pg .= "<div id=can0><canvas id=can width=1 height=1>";
$pg .= "A graph will show here if your browser supports html5/canvas";
$pg .= "</canvas></div>\n";
$data = str_replace(array("\\","'"), array("\\\\","\\'"), $ans['DATA']);
$pg .= "<script src='/can.js'></script>\n";
$pg .= "<script type='text/javascript'>function godrw(){dodrw('$data')};godrw();</script>\n";
}
return $pg;
}
#
function show_usperf($info, $page, $menu, $name, $user)
{
gopage($info, NULL, 'dousperf', $page, $menu, $name, $user);
}
#
?>

1
pool/prime.php

@ -77,6 +77,7 @@ function check()
), ),
'Workers' => array( 'Workers' => array(
'Shifts' => 'shifts', 'Shifts' => 'shifts',
'Shift Graph' => 'usperf',
'Workers' => 'workers', 'Workers' => 'workers',
'Management' => 'workmgt', 'Management' => 'workmgt',
), ),

58
src/ckdb.c

@ -1885,6 +1885,64 @@ static void make_a_shift_mark()
char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ]; char cd_buf[DATE_BUFSIZ], cd_buf2[DATE_BUFSIZ];
int used_wid; int used_wid;
/* If there are no CURRENT marks, make the first one by
* finding the first CURRENT workinfo and use that
* to create a MARKTYPE_OTHER_BEGIN for the pool
* This will keep being checked when the pool first starts
* until the first workinfo is created, but once the first
* marks has been created it will skip over the if code
* forever after that */
K_RLOCK(marks_free);
m_item = last_in_ktree(marks_root, m_ctx);
K_RUNLOCK(marks_free);
DATA_MARKS_NULL(marks, m_item);
// Mark sorting means all CURRENT will be on the end
if (!m_item || !CURRENT(&(marks->expirydate))) {
K_RLOCK(workinfo_free);
wi_item = first_in_ktree(workinfo_root, wi_ctx);
DATA_WORKINFO_NULL(workinfo, wi_item);
if (!wi_item) {
K_RUNLOCK(workinfo_free);
LOGWARNING("%s() ckdb workinfo:'%s' marks:'%s' ..."
" start ckpool!", __func__,
"none", m_item ? "expired" : "none");
return;
}
while (wi_item && !CURRENT(&(workinfo->expirydate))) {
wi_item = next_in_ktree(wi_ctx);
DATA_WORKINFO_NULL(workinfo, wi_item);
}
if (!wi_item) {
K_RUNLOCK(workinfo_free);
LOGWARNING("%s() ckdb workinfo:'%s' marks:'%s' ..."
" start ckpool!", __func__,
"expired", m_item ? "expired" : "none");
return;
}
K_RUNLOCK(workinfo_free);
char description[TXT_BIG+1];
tv_t now;
ok = marks_description(description, sizeof(description),
MARKTYPE_OTHER_BEGIN_STR, 0, NULL,
"Pool Start");
if (!ok)
return;
setnow(&now);
ok = marks_process(NULL, true, EMPTY, workinfo->workinfoid,
description, EMPTY, MARKTYPE_OTHER_BEGIN_STR,
MARK_USED_STR, (char *)by_default,
(char *)__func__, (char *)inet_default,
&now, NULL);
if (ok) {
LOGWARNING("%s() FIRST mark %"PRId64"/%s/%s/%s/",
__func__, workinfo->workinfoid,
MARKTYPE_OTHER_BEGIN_STR, MARK_USED_STR,
description);
}
return;
}
/* Find the last !new sharesummary workinfoid /* Find the last !new sharesummary workinfoid
* If the shift needs to go beyond this, then it's not ready yet */ * If the shift needs to go beyond this, then it's not ready yet */
ss_age_wid = 0; ss_age_wid = 0;

3
src/ckdb.h

@ -55,7 +55,7 @@
#define DB_VLOCK "1" #define DB_VLOCK "1"
#define DB_VERSION "1.0.0" #define DB_VERSION "1.0.0"
#define CKDB_VERSION DB_VERSION"-1.031" #define CKDB_VERSION DB_VERSION"-1.033"
#define WHERE_FFL " - from %s %s() line %d" #define WHERE_FFL " - from %s %s() line %d"
#define WHERE_FFL_HERE __FILE__, __func__, __LINE__ #define WHERE_FFL_HERE __FILE__, __func__, __LINE__
@ -1596,6 +1596,7 @@ extern K_STORE *marks_store;
#define MARKTYPE_SHIFT_END 'e' #define MARKTYPE_SHIFT_END 'e'
// 'o' used for manual marks // 'o' used for manual marks
#define MARKTYPE_OTHER_BEGIN 'o' #define MARKTYPE_OTHER_BEGIN 'o'
#define MARKTYPE_OTHER_BEGIN_STR "o"
// 'f' used for manual marks // 'f' used for manual marks
#define MARKTYPE_OTHER_FINISH 'f' #define MARKTYPE_OTHER_FINISH 'f'

11
src/ckdb_cmd.c

@ -902,6 +902,11 @@ redo:
blocks_confirmed(blocks->confirmed), FLDSEP); blocks_confirmed(blocks->confirmed), FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp),
"statsconf:%d=%s%c", rows,
blocks->statsconfirmed, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp);
double_to_buf(blocks->diffacc, reply, sizeof(reply)); double_to_buf(blocks->diffacc, reply, sizeof(reply));
snprintf(tmp, sizeof(tmp), "diffacc:%d=%s%c", rows, reply, FLDSEP); snprintf(tmp, sizeof(tmp), "diffacc:%d=%s%c", rows, reply, FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
@ -1016,8 +1021,8 @@ redo:
"rows=%d%cflds=%s%c", "rows=%d%cflds=%s%c",
rows, FLDSEP, rows, FLDSEP,
"seq,height,blockhash,nonce,reward,workername,firstcreatedate," "seq,height,blockhash,nonce,reward,workername,firstcreatedate,"
"createdate,status,diffacc,diffinv,shareacc,shareinv,elapsed," "createdate,status,statsconf,diffacc,diffinv,shareacc,"
"netdiff,diffratio,cdf,luck", FLDSEP); "shareinv,elapsed,netdiff,diffratio,cdf,luck", FLDSEP);
APPEND_REALLOC(buf, off, len, tmp); APPEND_REALLOC(buf, off, len, tmp);
snprintf(tmp, sizeof(tmp), "arn=%s%carp=%s", "Blocks,BlockStats", FLDSEP, ",s"); snprintf(tmp, sizeof(tmp), "arn=%s%carp=%s", "Blocks,BlockStats", FLDSEP, ",s");
@ -5446,7 +5451,7 @@ struct CMDS ckdb_cmds[] = {
{ CMD_WORKERSTAT,"workerstats", false, true, cmd_workerstats,ACCESS_POOL }, { CMD_WORKERSTAT,"workerstats", false, true, cmd_workerstats,ACCESS_POOL },
{ CMD_BLOCK, "block", false, true, cmd_blocks, ACCESS_POOL }, { CMD_BLOCK, "block", false, true, cmd_blocks, ACCESS_POOL },
{ CMD_BLOCKLIST,"blocklist", false, false, cmd_blocklist, ACCESS_WEB }, { CMD_BLOCKLIST,"blocklist", false, false, cmd_blocklist, ACCESS_WEB },
{ CMD_BLOCKSTATUS,"blockstatus",false, false, cmd_blockstatus,ACCESS_WEB }, { CMD_BLOCKSTATUS,"blockstatus",false, false, cmd_blockstatus,ACCESS_SYSTEM },
{ CMD_NEWID, "newid", false, false, cmd_newid, ACCESS_SYSTEM }, { CMD_NEWID, "newid", false, false, cmd_newid, ACCESS_SYSTEM },
{ CMD_PAYMENTS, "payments", false, false, cmd_payments, ACCESS_WEB }, { CMD_PAYMENTS, "payments", false, false, cmd_payments, ACCESS_WEB },
{ CMD_WORKERS, "workers", false, false, cmd_workers, ACCESS_WEB }, { CMD_WORKERS, "workers", false, false, cmd_workers, ACCESS_WEB },

99
src/connector.c

@ -23,6 +23,8 @@
#define MAX_MSGSIZE 1024 #define MAX_MSGSIZE 1024
typedef struct client_instance client_instance_t;
struct client_instance { struct client_instance {
/* For clients hashtable */ /* For clients hashtable */
UT_hash_handle hh; UT_hash_handle hh;
@ -34,8 +36,8 @@ struct client_instance {
int ref; int ref;
/* For dead_clients list */ /* For dead_clients list */
struct client_instance *next; client_instance_t *next;
struct client_instance *prev; client_instance_t *prev;
struct sockaddr address; struct sockaddr address;
char address_name[INET6_ADDRSTRLEN]; char address_name[INET6_ADDRSTRLEN];
@ -49,8 +51,6 @@ struct client_instance {
bool passthrough; bool passthrough;
}; };
typedef struct client_instance client_instance_t;
struct sender_send { struct sender_send {
struct sender_send *next; struct sender_send *next;
struct sender_send *prev; struct sender_send *prev;
@ -72,6 +72,8 @@ struct connector_data {
int *serverfd; int *serverfd;
/* All time count of clients connected */ /* All time count of clients connected */
int nfds; int nfds;
/* The epoll fd */
int epfd;
bool accept; bool accept;
pthread_t pth_sender; pthread_t pth_sender;
@ -81,6 +83,8 @@ struct connector_data {
client_instance_t *clients; client_instance_t *clients;
/* Linked list of dead clients no longer in use but may still have references */ /* Linked list of dead clients no longer in use but may still have references */
client_instance_t *dead_clients; client_instance_t *dead_clients;
/* Linked list of client structures we can reuse */
client_instance_t *recycled_clients;
int clients_generated; int clients_generated;
int dead_generated; int dead_generated;
@ -120,6 +124,42 @@ static void dec_instance_ref(cdata_t *cdata, client_instance_t *client)
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
} }
/* Recruit a client structure from a recycled one if available, creating a
* new structure only if we have none to reuse. */
static client_instance_t *recruit_client(cdata_t *cdata)
{
client_instance_t *client = NULL;
ck_wlock(&cdata->lock);
if (cdata->recycled_clients) {
client = cdata->recycled_clients;
DL_DELETE(cdata->recycled_clients, client);
} else
cdata->clients_generated++;
ck_wunlock(&cdata->lock);
if (!client) {
LOGDEBUG("Connector created new client instance");
client = ckzalloc(sizeof(client_instance_t));
} else
LOGDEBUG("Connector recycled client instance");
return client;
}
static void __recycle_client(cdata_t *cdata, client_instance_t *client)
{
memset(client, 0, sizeof(client_instance_t));
client->id = -1;
DL_APPEND(cdata->recycled_clients, client);
}
static void recycle_client(cdata_t *cdata, client_instance_t *client)
{
ck_wlock(&cdata->lock);
__recycle_client(cdata, client);
ck_wunlock(&cdata->lock);
}
/* Accepts incoming connections on the server socket and generates client /* Accepts incoming connections on the server socket and generates client
* instances */ * instances */
static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server) static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
@ -140,7 +180,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
} }
sockd = cdata->serverfd[server]; sockd = cdata->serverfd[server];
client = ckzalloc(sizeof(client_instance_t)); client = recruit_client(cdata);
client->server = server; client->server = server;
address_len = sizeof(client->address); address_len = sizeof(client->address);
fd = accept(sockd, &client->address, &address_len); fd = accept(sockd, &client->address, &address_len);
@ -152,7 +192,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
return 0; return 0;
} }
LOGERR("Failed to accept on socket %d in acceptor", sockd); LOGERR("Failed to accept on socket %d in acceptor", sockd);
dealloc(client); recycle_client(cdata, client);
return -1; return -1;
} }
@ -174,7 +214,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
LOGWARNING("Unknown INET type for client %d on socket %d", LOGWARNING("Unknown INET type for client %d on socket %d",
cdata->nfds, fd); cdata->nfds, fd);
Close(fd); Close(fd);
free(client); recycle_client(cdata, client);
return 0; return 0;
} }
@ -189,7 +229,7 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
event.events = EPOLLIN; event.events = EPOLLIN;
if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) { if (unlikely(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) < 0)) {
LOGERR("Failed to epoll_ctl add in accept_client"); LOGERR("Failed to epoll_ctl add in accept_client");
free(client); recycle_client(cdata, client);
return 0; return 0;
} }
@ -199,7 +239,6 @@ static int accept_client(cdata_t *cdata, const int epfd, const uint64_t server)
__inc_instance_ref(client); __inc_instance_ref(client);
ck_wlock(&cdata->lock); ck_wlock(&cdata->lock);
cdata->clients_generated++;
client->id = cdata->client_id++; client->id = cdata->client_id++;
HASH_ADD_I64(cdata->clients, id, client); HASH_ADD_I64(cdata->clients, id, client);
cdata->nfds++; cdata->nfds++;
@ -219,6 +258,7 @@ static int drop_client(cdata_t *cdata, client_instance_t *client)
if (fd != -1) { if (fd != -1) {
client_id = client->id; client_id = client->id;
epoll_ctl(cdata->epfd, EPOLL_CTL_DEL, client->fd, NULL);
Close(client->fd); Close(client->fd);
HASH_DEL(cdata->clients, client); HASH_DEL(cdata->clients, client);
DL_APPEND(cdata->dead_clients, client); DL_APPEND(cdata->dead_clients, client);
@ -263,8 +303,8 @@ static int invalidate_client(ckpool_t *ckp, cdata_t *cdata, client_instance_t *c
DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) { DL_FOREACH_SAFE(cdata->dead_clients, client, tmp) {
if (!client->ref) { if (!client->ref) {
DL_DELETE(cdata->dead_clients, client); DL_DELETE(cdata->dead_clients, client);
LOGINFO("Connector discarding client %"PRId64, client->id); LOGINFO("Connector recycling client %"PRId64, client->id);
dealloc(client); __recycle_client(cdata, client);
} }
} }
ck_wunlock(&cdata->lock); ck_wunlock(&cdata->lock);
@ -372,6 +412,19 @@ reparse:
goto retry; goto retry;
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
/* Waits on fds ready to read on from the list stored in conn_instance and /* Waits on fds ready to read on from the list stored in conn_instance and
* handles the incoming messages */ * handles the incoming messages */
void *receiver(void *arg) void *receiver(void *arg)
@ -383,7 +436,7 @@ void *receiver(void *arg)
rename_proc("creceiver"); rename_proc("creceiver");
epfd = epoll_create1(EPOLL_CLOEXEC); epfd = cdata->epfd = epoll_create1(EPOLL_CLOEXEC);
if (epfd < 0) { if (epfd < 0) {
LOGEMERG("FATAL: Failed to create epoll in receiver"); LOGEMERG("FATAL: Failed to create epoll in receiver");
return NULL; return NULL;
@ -428,13 +481,18 @@ void *receiver(void *arg)
continue; continue;
} }
client = event.data.ptr; client = event.data.ptr;
/* Recheck this client still exists in the same form when it
* was queued. */
client = ref_client_by_id(cdata, client->id);
if (unlikely(!client))
continue;
if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) { if ((event.events & EPOLLERR) || (event.events & EPOLLHUP)) {
/* Client disconnected */ /* Client disconnected */
LOGDEBUG("Client fd %d HUP in epoll", client->fd); LOGDEBUG("Client fd %d HUP in epoll", client->fd);
invalidate_client(cdata->pi->ckp, cdata, client); invalidate_client(cdata->pi->ckp, cdata, client);
continue; } else
}
parse_client_msg(cdata, client); parse_client_msg(cdata, client);
dec_instance_ref(cdata, client);
} }
return NULL; return NULL;
} }
@ -606,19 +664,6 @@ static bool client_exists(cdata_t *cdata, const int64_t id)
return !!client; return !!client;
} }
static client_instance_t *ref_client_by_id(cdata_t *cdata, int64_t id)
{
client_instance_t *client;
ck_wlock(&cdata->lock);
HASH_FIND_I64(cdata->clients, &id, client);
if (client)
__inc_instance_ref(client);
ck_wunlock(&cdata->lock);
return client;
}
static void passthrough_client(cdata_t *cdata, client_instance_t *client) static void passthrough_client(cdata_t *cdata, client_instance_t *client)
{ {
char *buf; char *buf;

16
src/stratifier.c

@ -2998,6 +2998,7 @@ static int send_recv_auth(stratum_instance_t *client)
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
char *buf = NULL, *json_msg; char *buf = NULL, *json_msg;
bool contended = false; bool contended = false;
size_t responselen = 0;
char cdfield[64]; char cdfield[64];
int ret = 1; int ret = 1;
json_t *val; json_t *val;
@ -3037,15 +3038,16 @@ static int send_recv_auth(stratum_instance_t *client)
contended = true; contended = true;
free(json_msg); free(json_msg);
if (likely(buf)) { /* Leave ample room for response based on buf length */
if (likely(buf))
responselen = strlen(buf);
if (likely(responselen > 0)) {
char *cmd = NULL, *secondaryuserid = NULL, *response; char *cmd = NULL, *secondaryuserid = NULL, *response;
worker_instance_t *worker = client->worker_instance; worker_instance_t *worker = client->worker_instance;
json_error_t err_val; json_error_t err_val;
size_t responselen;
json_t *val = NULL; json_t *val = NULL;
LOGINFO("Got ckdb response: %s", buf); LOGINFO("Got ckdb response: %s", buf);
responselen = strlen(buf); /* Leave ample room for response based on buf length */
response = alloca(responselen); response = alloca(responselen);
memset(response, 0, responselen); memset(response, 0, responselen);
if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) { if (unlikely(sscanf(buf, "id.%*d.%s", response) < 1 || strlen(response) < 1 || !strchr(response, '='))) {
@ -4446,6 +4448,7 @@ static bool test_and_clear(bool *val, mutex_t *lock)
static void ckdbq_process(ckpool_t *ckp, char *msg) static void ckdbq_process(ckpool_t *ckp, char *msg)
{ {
sdata_t *sdata = ckp->data; sdata_t *sdata = ckp->data;
size_t responselen = 0;
char *buf = NULL; char *buf = NULL;
while (!buf) { while (!buf) {
@ -4465,9 +4468,12 @@ static void ckdbq_process(ckpool_t *ckp, char *msg)
/* Process any requests from ckdb that are heartbeat responses with /* Process any requests from ckdb that are heartbeat responses with
* specific requests. */ * specific requests. */
if (likely(buf)) { if (likely(buf))
char response[PAGESIZE] = {}; responselen = strlen(buf);
if (likely(responselen > 0)) {
char *response = alloca(responselen);
memset(response, 0, responselen);
sscanf(buf, "id.%*d.%s", response); sscanf(buf, "id.%*d.%s", response);
if (safecmp(response, "ok")) { if (safecmp(response, "ok")) {
char *cmd; char *cmd;

Loading…
Cancel
Save