Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert in Bulk #1595

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync/DbAction.hs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ newThreadChannels =
-- The pipeline queue in the LocalChainSync machinery is 50 elements long
-- so we should not exceed that.
ThreadChannels
<$> TBQ.newTBQueueIO 47
<$> TBQ.newTBQueueIO 300
<*> newTVarIO False

writeDbActionQueue :: ThreadChannels -> DbAction -> STM ()
Expand Down
40 changes: 29 additions & 11 deletions cardano-db-sync/src/Cardano/DbSync/Default.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Cardano.DbSync.Era.Shelley.Adjust (adjustEpochRewards)
import qualified Cardano.DbSync.Era.Shelley.Generic as Generic
import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, mkAdaPots)
import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefunds, insertRewards)
import Cardano.DbSync.Era.Shelley.Insert.Grouped
import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards)
import Cardano.DbSync.Error
import Cardano.DbSync.Fix.EpochStake
Expand Down Expand Up @@ -57,22 +58,24 @@ insertListBlocks ::
[CardanoBlock] ->
IO (Either SyncNodeError ())
insertListBlocks synEnv blocks = do
DB.runDbIohkLogging (envBackend synEnv) tracer
. runExceptT
$ traverse_ (applyAndInsertBlockMaybe synEnv) blocks
DB.runDbIohkLogging (envBackend synEnv) tracer $ runExceptT $ do
groups <- foldM (applyAndInsertBlockMaybe synEnv) [] blocks
unless (null groups) $
void $ insertBlockGroupedData synEnv $ mconcat $ reverse groups
where
tracer = getTrace synEnv

applyAndInsertBlockMaybe ::
SyncEnv ->
[BlockGroupedData] ->
CardanoBlock ->
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
applyAndInsertBlockMaybe syncEnv cblk = do
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
applyAndInsertBlockMaybe syncEnv groups cblk = do
bl <- liftIO $ isConsistent syncEnv
(!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl)
if bl
then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block
insertBlock syncEnv cblk applyRes False tookSnapshot
insertBlock syncEnv groups cblk applyRes False False tookSnapshot
else do
eiBlockInDbAlreadyId <- lift (DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk))
-- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or
Expand All @@ -88,7 +91,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
]
rollbackFromBlockNo syncEnv (blockNo cblk)
void $ migrateStakeDistr syncEnv (apOldLedger applyRes)
insertBlock syncEnv cblk applyRes True tookSnapshot
_ <- insertBlock syncEnv groups cblk applyRes True True tookSnapshot
liftIO $ setConsistentLevel syncEnv Consistent
Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do
replaced <- lift $ DB.replaceAdaPots blockId $ mkAdaPots blockId slotNo epochNo adaPots
Expand All @@ -99,6 +102,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do
| Just epochNo <- getNewEpoch applyRes ->
liftIO $ logInfo tracer $ "Reached " <> textShow epochNo
_ -> pure ()
pure []
where
tracer = getTrace syncEnv

Expand All @@ -122,26 +126,31 @@ applyAndInsertBlockMaybe syncEnv cblk = do

insertBlock ::
SyncEnv ->
[BlockGroupedData] ->
CardanoBlock ->
ApplyResult ->
-- force inserting all data
Bool ->
-- is first Block after rollback
Bool ->
-- has snapshot been taken
Bool ->
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) ()
insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData]
insertBlock syncEnv groupsPrev cblk applyRes forceInsert firstAfterRollback tookSnapshot = do
!epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv (apSlotDetails applyRes)
let !applyResult = applyRes {apEvents = sort $ epochEvents <> apEvents applyRes}
let !details = apSlotDetails applyResult
let !withinTwoMin = isWithinTwoMin details
let !withinHalfHour = isWithinHalfHour details
let !insertAll = forceInsert || withinTwoMin || withinHalfHour || tookSnapshot
insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult)
let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult)
let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback
let isMember poolId = Set.member poolId (apPoolsRegistered applyResult)
let insertShelley blk =
insertShelleyBlock
syncEnv
groupsPrev
isStartEventOrRollback
withinTwoMin
withinHalfHour
Expand All @@ -152,10 +161,11 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do

-- Here we insert the block and it's txs, but in adition we also cache some values which we later
-- use when updating the Epoch, thus saving us having to recalulating them later.
case cblk of
BlockByron blk ->
groups <- case cblk of
BlockByron blk -> do
newExceptT $
insertByronBlock syncEnv isStartEventOrRollback blk details
pure []
BlockShelley blk ->
newExceptT $
insertShelley $
Expand Down Expand Up @@ -186,7 +196,15 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do
when (unBlockNo blkNo `mod` getPruneInterval syncEnv == 0) $
do
lift $ DB.deleteConsumedTxOut tracer (getSafeBlockNoDiff syncEnv)
groups' <-
if insertAll then do
unless (null groups) $
void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groups
pure []
else
pure groups
commitOrIndexes withinTwoMin withinHalfHour
pure groups'
where
tracer = getTrace syncEnv
iopts = getInsertOptions syncEnv
Expand Down
Loading
Loading