Skip to content

Commit

Permalink
Merge pull request #1634 from skalenetwork/ticket-883/fix-empty-SNB-c…
Browse files Browse the repository at this point in the history
…ache-rewrite

ticket-883 Fixed problem related to empty SNB cache rewriting data
  • Loading branch information
DmytroNazarenko authored Nov 14, 2023
2 parents feb3bca + c88275c commit 133bafd
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 160 deletions.
81 changes: 47 additions & 34 deletions agent/loop.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,21 @@ const gArrClients = [];
export function notifyCacheChangedSNB( arrSChainsCached ) {
const cntWorkers = gArrWorkers.length;
if( cntWorkers == 0 ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.warning( "Will skip chainsCacheChanged dispatch event with " ) +
cc.warning( "no chains arrived in " ) + threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.warning( "Will skip chainsCacheChanged dispatch event with " ) +
cc.warning( "no chains arrived in " ) + threadInfo.threadDescription() + "\n" );
}
}
return;
}
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will broadcast arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will broadcast arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
}
for( let idxWorker = 0; idxWorker < cntWorkers; ++ idxWorker ) {
const jo = {
Expand All @@ -529,34 +533,44 @@ export function notifyCacheChangedSNB( arrSChainsCached ) {
"arrSChainsCached": arrSChainsCached
}
};
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache will be sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker..." ) +
"\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache will be sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker..." ) +
"\n" );
}
}
gArrClients[idxWorker].send( jo );
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache did sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker" ) +
"\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache did sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker" ) +
"\n" );
}
}
}
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module did finished broadcasting arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module did finished broadcasting arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
}
}

if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Subscribe to chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
log.write( cc.debug( "Subscribe to chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
}
}
skaleObserver.events.on( "chainsCacheChanged", function( eventData ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
}
}
notifyCacheChangedSNB( eventData.detail.arrSChainsCached );
} );
Expand Down Expand Up @@ -755,14 +769,12 @@ export async function ensureHaveWorkers( opts ) {
"nMaxTransactionsM2S": opts.imaState.nMaxTransactionsM2S,
"nMaxTransactionsS2M": opts.imaState.nMaxTransactionsS2M,
"nMaxTransactionsS2S": opts.imaState.nMaxTransactionsS2S,

"nBlockAwaitDepthM2S": opts.imaState.nBlockAwaitDepthM2S,
"nBlockAwaitDepthS2M": opts.imaState.nBlockAwaitDepthS2M,
"nBlockAwaitDepthS2S": opts.imaState.nBlockAwaitDepthS2S,
"nBlockAgeM2S": opts.imaState.nBlockAgeM2S,
"nBlockAgeS2M": opts.imaState.nBlockAgeS2M,
"nBlockAgeS2S": opts.imaState.nBlockAgeS2S,

"nLoopPeriodSeconds": opts.imaState.nLoopPeriodSeconds,
"nNodeNumber": opts.imaState.nNodeNumber,
"nNodesCount": opts.imaState.nNodesCount,
Expand Down Expand Up @@ -834,7 +846,7 @@ export async function ensureHaveWorkers( opts ) {
};
while( ! aClient.logicalInitComplete ) {
if( log.verboseGet() >= log.verboseReversed().info )
log.write( "LOOP server is not inited yet...\n" );
log.write( "LOOP server is not initialized yet...\n" );
await threadInfo.sleep( 1000 );
aClient.send( jo );
}
Expand All @@ -844,12 +856,14 @@ export async function ensureHaveWorkers( opts ) {
cc.info( gArrWorkers.length ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "" ) + "\n" );
}
if( log.verboseGet() >= log.verboseReversed().trace ) {
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded &&
log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Subscribe to inThread-arrSChainsCached event in " ) +
threadInfo.threadDescription() + "\n" );
}
skaleObserver.events.on( "inThread-arrSChainsCached", function( eventData ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded &&
log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived inThread-arrSChainsCached event in " ) +
threadInfo.threadDescription() + "\n" );
}
Expand All @@ -859,9 +873,8 @@ export async function ensureHaveWorkers( opts ) {
// Force broadcast what we have in SNB right now because works above can start later than SNB
// is finished download connected chains quickly
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will do first initial broadcast of arrSChainsCached to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
log.write( cc.debug( "Loop module will do first initial broadcast of arrSChainsCached " +
"to its " ) + cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
notifyCacheChangedSNB( skaleObserver.getLastCachedSChains() );
Expand Down
26 changes: 18 additions & 8 deletions agent/loopWorker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ class ObserverServer extends SocketServer {
const isFlush = true;
socket.send( jo, isFlush );
} );
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
cc.debug( " will save cached S-Chains..." ) + "\n" );
}
}
if( ! self.opts.imaState.optsLoop.enableStepS2S )
threadInfo.joCustomThreadProperties.isSChainsCacheNeeded = false;
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
log.write( cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
cc.debug( " will save cached S-Chains..." ) + "\n" );
}
skaleObserver.setLastCachedSChains( self.opts.imaState.arrSChainsCached );
Expand Down Expand Up @@ -200,11 +208,13 @@ class ObserverServer extends SocketServer {
imaState.joSChainNetworkInfo = joMessage.joSChainNetworkInfo;
};
self.mapApiHandlers.schainsCached = function( joMessage, joAnswer, eventData, socket ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
self.log( cc.debug( "S-Chains cache did arrived to " ) +
cc.notice( workerData.url ) + cc.debug( " loop worker in " ) +
threadInfo.threadDescription() + cc.debug( ": " ) +
cc.j( joMessage.message.arrSChainsCached ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
self.log( cc.debug( "S-Chains cache did arrived to " ) +
cc.notice( workerData.url ) + cc.debug( " loop worker in " ) +
threadInfo.threadDescription() + cc.debug( ": " ) +
cc.j( joMessage.message.arrSChainsCached ) + "\n" );
}
}
skaleObserver.setLastCachedSChains( joMessage.message.arrSChainsCached );
};
Expand Down
5 changes: 5 additions & 0 deletions agent/threadInfo.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import * as cc from "../npms/skale-cc/cc.mjs";
const Worker = worker_threads.Worker;
export { Worker };

const joCustomThreadProperties = {
"isSChainsCacheNeeded": true // by default is set to true
};
export { joCustomThreadProperties };

export const sleep = ( milliseconds ) => {
return new Promise( resolve => setTimeout( resolve, milliseconds ) );
};
Expand Down
Loading

0 comments on commit 133bafd

Please sign in to comment.