diff --git a/cardano-db-sync/src/Cardano/DbSync/DbAction.hs b/cardano-db-sync/src/Cardano/DbSync/DbAction.hs index 096280191..ababaf8ae 100644 --- a/cardano-db-sync/src/Cardano/DbSync/DbAction.hs +++ b/cardano-db-sync/src/Cardano/DbSync/DbAction.hs @@ -78,7 +78,7 @@ newThreadChannels = -- The pipeline queue in the LocalChainSync machinery is 50 elements long -- so we should not exceed that. ThreadChannels - <$> TBQ.newTBQueueIO 47 + <$> TBQ.newTBQueueIO 300 <*> newTVarIO False writeDbActionQueue :: ThreadChannels -> DbAction -> STM () diff --git a/cardano-db-sync/src/Cardano/DbSync/Default.hs b/cardano-db-sync/src/Cardano/DbSync/Default.hs index 1e48d53a6..3bb3ca29b 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Default.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Default.hs @@ -24,6 +24,7 @@ import Cardano.DbSync.Era.Shelley.Adjust (adjustEpochRewards) import qualified Cardano.DbSync.Era.Shelley.Generic as Generic import Cardano.DbSync.Era.Shelley.Insert (insertShelleyBlock, mkAdaPots) import Cardano.DbSync.Era.Shelley.Insert.Epoch (insertPoolDepositRefunds, insertRewards) +import Cardano.DbSync.Era.Shelley.Insert.Grouped import Cardano.DbSync.Era.Shelley.Validate (validateEpochRewards) import Cardano.DbSync.Error import Cardano.DbSync.Fix.EpochStake @@ -57,22 +58,24 @@ insertListBlocks :: [CardanoBlock] -> IO (Either SyncNodeError ()) insertListBlocks synEnv blocks = do - DB.runDbIohkLogging (envBackend synEnv) tracer - . runExceptT - $ traverse_ (applyAndInsertBlockMaybe synEnv) blocks + DB.runDbIohkLogging (envBackend synEnv) tracer $ runExceptT $ do + groups <- foldM (applyAndInsertBlockMaybe synEnv) [] blocks + unless (null groups) $ + void $ insertBlockGroupedData synEnv $ mconcat $ reverse groups where tracer = getTrace synEnv applyAndInsertBlockMaybe :: SyncEnv -> + [BlockGroupedData] -> CardanoBlock -> - ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () -applyAndInsertBlockMaybe syncEnv cblk = do + ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData] +applyAndInsertBlockMaybe syncEnv groups cblk = do bl <- liftIO $ isConsistent syncEnv (!applyRes, !tookSnapshot) <- liftIO (mkApplyResult bl) if bl then -- In the usual case it will be consistent so we don't need to do any queries. Just insert the block - insertBlock syncEnv cblk applyRes False tookSnapshot + insertBlock syncEnv groups cblk applyRes False False tookSnapshot else do eiBlockInDbAlreadyId <- lift (DB.queryBlockId (SBS.fromShort . Consensus.getOneEraHash $ blockHash cblk)) -- If the block is already in db, do nothing. If not, delete all blocks with greater 'BlockNo' or @@ -88,7 +91,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do ] rollbackFromBlockNo syncEnv (blockNo cblk) void $ migrateStakeDistr syncEnv (apOldLedger applyRes) - insertBlock syncEnv cblk applyRes True tookSnapshot + _ <- insertBlock syncEnv groups cblk applyRes True True tookSnapshot liftIO $ setConsistentLevel syncEnv Consistent Right blockId | Just (adaPots, slotNo, epochNo) <- getAdaPots applyRes -> do replaced <- lift $ DB.replaceAdaPots blockId $ mkAdaPots blockId slotNo epochNo adaPots @@ -99,6 +102,7 @@ applyAndInsertBlockMaybe syncEnv cblk = do | Just epochNo <- getNewEpoch applyRes -> liftIO $ logInfo tracer $ "Reached " <> textShow epochNo _ -> pure () + pure [] where tracer = getTrace syncEnv @@ -122,19 +126,23 @@ applyAndInsertBlockMaybe syncEnv cblk = do insertBlock :: SyncEnv -> + [BlockGroupedData] -> CardanoBlock -> ApplyResult -> + -- force inserting all data + Bool -> -- is first Block after rollback Bool -> -- has snapshot been taken Bool -> - ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) () -insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do + ExceptT SyncNodeError (ReaderT SqlBackend (LoggingT IO)) [BlockGroupedData] +insertBlock syncEnv groupsPrev cblk applyRes forceInsert firstAfterRollback tookSnapshot = do !epochEvents <- liftIO $ atomically $ generateNewEpochEvents syncEnv (apSlotDetails applyRes) let !applyResult = applyRes {apEvents = sort $ epochEvents <> apEvents applyRes} let !details = apSlotDetails applyResult let !withinTwoMin = isWithinTwoMin details let !withinHalfHour = isWithinHalfHour details + let !insertAll = forceInsert || withinTwoMin || withinHalfHour || tookSnapshot insertLedgerEvents syncEnv (sdEpochNo details) (apEvents applyResult) let isNewEpochEvent = hasNewEpochEvent (apEvents applyResult) let isStartEventOrRollback = hasEpochStartEvent (apEvents applyResult) || firstAfterRollback @@ -142,6 +150,7 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do let insertShelley blk = insertShelleyBlock syncEnv + groupsPrev isStartEventOrRollback withinTwoMin withinHalfHour @@ -152,10 +161,11 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do -- Here we insert the block and it's txs, but in adition we also cache some values which we later -- use when updating the Epoch, thus saving us having to recalulating them later. - case cblk of - BlockByron blk -> + groups <- case cblk of + BlockByron blk -> do newExceptT $ insertByronBlock syncEnv isStartEventOrRollback blk details + pure [] BlockShelley blk -> newExceptT $ insertShelley $ @@ -186,7 +196,15 @@ insertBlock syncEnv cblk applyRes firstAfterRollback tookSnapshot = do when (unBlockNo blkNo `mod` getPruneInterval syncEnv == 0) $ do lift $ DB.deleteConsumedTxOut tracer (getSafeBlockNoDiff syncEnv) + groups' <- + if insertAll then do + unless (null groups) $ + void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groups + pure [] + else + pure groups commitOrIndexes withinTwoMin withinHalfHour + pure groups' where tracer = getTrace syncEnv iopts = getInsertOptions syncEnv diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs index ffa591f8f..60cf8f375 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert.hs @@ -7,6 +7,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TupleSections #-} {-# LANGUAGE NoImplicitPrelude #-} module Cardano.DbSync.Era.Shelley.Insert ( @@ -100,6 +101,7 @@ type IsPoolMember = PoolKeyHash -> Bool insertShelleyBlock :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> + [BlockGroupedData] -> Bool -> Bool -> Bool -> @@ -107,8 +109,8 @@ insertShelleyBlock :: SlotDetails -> IsPoolMember -> ApplyResult -> - ReaderT SqlBackend m (Either SyncNodeError ()) -insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do + ReaderT SqlBackend m (Either SyncNodeError [BlockGroupedData]) +insertShelleyBlock syncEnv groupsPrev shouldLog withinTwoMins withinHalfHour blk details isMember applyResult = do runExceptT $ do pbid <- case Generic.blkPreviousHash blk of Nothing -> liftLookupFail (renderErrorMessage (Generic.blkEra blk)) DB.queryGenesis -- this is for networks that fork from Byron on epoch 0. @@ -139,9 +141,12 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is } let zippedTx = zip [0 ..] (Generic.blkTxs blk) - let txInserter = insertTx syncEnv isMember blkId (sdEpochNo details) (Generic.blkSlotNo blk) applyResult - blockGroupedData <- foldM (\gp (idx, tx) -> txInserter idx tx gp) mempty zippedTx - minIds <- insertBlockGroupedData syncEnv blockGroupedData + + txsPrepared <- foldAndAccM (prepareTx syncEnv txOutPrev blkId applyResult) zippedTx + txIds <- lift $ DB.insertManyTx (ptrTxDb <$> txsPrepared) + let txInserter = insertTx syncEnv txOutPrev blkId isMember (sdEpochNo details) (Generic.blkSlotNo blk) applyResult + let newZip = zipWith3 (\tx txId ptr -> (txId, tx, ptr)) (Generic.blkTxs blk) txIds txsPrepared + blockGroupedData <- foldM txInserter mempty newZip -- now that we've inserted the Block and all it's txs lets cache what we'll need -- when we later update the epoch values. @@ -153,15 +158,12 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is EpochBlockDiff { ebdBlockId = blkId , ebdTime = sdSlotTime details - , ebdFees = groupedTxFees blockGroupedData + , ebdFees = sum (ptrFees <$> txsPrepared) , ebdEpochNo = unEpochNo (sdEpochNo details) - , ebdOutSum = fromIntegral $ groupedTxOutSum blockGroupedData + , ebdOutSum = sum (fromIntegral . ptrOutSum <$> txsPrepared) , ebdTxCount = fromIntegral $ length (Generic.blkTxs blk) } - when withinHalfHour $ - insertReverseIndex blkId minIds - liftIO $ do let epoch = unEpochNo epochNo slotWithinEpoch = unEpochSlot (sdEpochSlot details) @@ -203,9 +205,20 @@ insertShelleyBlock syncEnv shouldLog withinTwoMins withinHalfHour blk details is when (ioOffChainPoolData iopts) . lift $ insertOffChainPoolResults tracer (envOffChainPoolResultQueue syncEnv) + + if withinHalfHour then do + unless (null groupsPrev) $ + void $ insertBlockGroupedData syncEnv $ mconcat $ reverse groupsPrev + minIds <- insertBlockGroupedData syncEnv blockGroupedData + insertReverseIndex blkId minIds + pure [] + else do + pure $ blockGroupedData : groupsPrev where iopts = getInsertOptions syncEnv + txOutPrev = fmap fst . groupedTxOut <$> groupsPrev + logger :: Trace IO a -> a -> IO () logger | shouldLog = logInfo @@ -258,52 +271,45 @@ insertOnNewEpoch tracer iopts blkId slotNo epochNo newEpoch = do -- ----------------------------------------------------------------------------- -insertTx :: +data PrepareTxRes = PrepareTxRes + { ptrTxDb :: DB.Tx + , ptrFees :: Word64 + , ptrOutSum :: Word64 + , ptrResolvedTxIn :: [(Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId)] + } + +prepareTx :: (MonadBaseControl IO m, MonadIO m) => SyncEnv -> - IsPoolMember -> + [[ExtendedTxOut]] -> DB.BlockId -> - EpochNo -> - SlotNo -> ApplyResult -> - Word64 -> - Generic.Tx -> - BlockGroupedData -> - ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData -insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped = do + [(ByteString, Generic.TxOut)] -> + (Word64, Generic.Tx) -> + ExceptT SyncNodeError (ReaderT SqlBackend m) (PrepareTxRes, [(ByteString, Generic.TxOut)]) +prepareTx syncEnv txOutPrev blkId applyResult blockTxOuts (blockIndex, tx) = do let !txHash = Generic.txHash tx let !mdeposits = if not (Generic.txValidContract tx) then Just (Coin 0) else lookupDepositsMap txHash (apDepositsMap applyResult) let !outSum = fromIntegral $ unCoin $ Generic.txOutSum tx - !withdrawalSum = fromIntegral $ unCoin $ Generic.txWithdrawalSum tx hasConsumed = getHasConsumedOrPruneTxOut syncEnv disInOut <- liftIO $ getDisableInOutState syncEnv -- In some txs and with specific configuration we may be able to find necessary data within the tx body. -- In these cases we can avoid expensive queries. - (resolvedInputs, fees', deposits) <- case (disInOut, mdeposits, unCoin <$> Generic.txFees tx) of - (True, _, _) -> pure ([], 0, unCoin <$> mdeposits) - (_, Just deposits, Just fees) -> do - (resolvedInputs, _) <- splitLast <$> mapM (resolveTxInputs hasConsumed False (fst <$> groupedTxOut grouped)) (Generic.txInputs tx) - pure (resolvedInputs, fees, Just (unCoin deposits)) - (_, Nothing, Just fees) -> do - (resolvedInputs, amounts) <- splitLast <$> mapM (resolveTxInputs hasConsumed False (fst <$> groupedTxOut grouped)) (Generic.txInputs tx) - if any isNothing amounts - then pure (resolvedInputs, fees, Nothing) - else - let !inSum = sum $ map unDbLovelace $ catMaybes amounts - in pure (resolvedInputs, fees, Just $ fromIntegral (inSum + withdrawalSum) - fromIntegral outSum - fromIntegral fees) - (_, _, Nothing) -> do + (resolvedInputs, fees', deposits) <- case (disInOut, unCoin <$> Generic.txFees tx) of + (True, _) -> pure ([], 0, unCoin <$> mdeposits) + (_, Just fees) -> do + resolvedInputsDB <- lift $ mapM (resolveTxInputs hasConsumed) (Generic.txInputs tx) + pure (resolvedInputsDB, fees, unCoin <$> mdeposits) + (_, Nothing) -> do -- Nothing in fees means a phase 2 failure - (resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputs hasConsumed True (fst <$> groupedTxOut grouped)) (Generic.txInputs tx) + (resolvedInsFull, amounts) <- splitLast <$> mapM (resolveTxInputsValue txOutPrev blockTxOuts) (Generic.txInputs tx) let !inSum = sum $ map unDbLovelace $ catMaybes amounts !diffSum = if inSum >= outSum then inSum - outSum else 0 !fees = maybe diffSum (fromIntegral . unCoin) (Generic.txFees tx) pure (resolvedInsFull, fromIntegral fees, Just 0) let fees = fromIntegral fees' -- Insert transaction and get txId from the DB. - !txId <- - lift - . DB.insertTx - $ DB.Tx + let txDb = DB.Tx { DB.txHash = txHash , DB.txBlockId = blkId , DB.txBlockIndex = blockIndex @@ -316,15 +322,31 @@ insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped , DB.txValidContract = Generic.txValidContract tx , DB.txScriptSize = sum $ Generic.txScriptSizes tx } + pure (PrepareTxRes txDb fees outSum resolvedInputs, blockTxOuts <> ((txHash,) <$> Generic.txOutputs tx)) +insertTx :: + (MonadBaseControl IO m, MonadIO m) => + SyncEnv -> + [[ExtendedTxOut]] -> + DB.BlockId -> + IsPoolMember -> + EpochNo -> + SlotNo -> + ApplyResult -> + BlockGroupedData -> + (DB.TxId, Generic.Tx, PrepareTxRes) -> + ExceptT SyncNodeError (ReaderT SqlBackend m) BlockGroupedData +insertTx syncEnv txOutPrev blkId isMember epochNo slotNo applyResult grouped (txId, tx, ptr) = do + let !txHash = Generic.txHash tx + disInOut <- liftIO $ getDisableInOutState syncEnv if not (Generic.txValidContract tx) then do !txOutsGrouped <- mapM (prepareTxOut tracer cache iopts (txId, txHash)) (Generic.txOutputs tx) - let !txIns = map (prepareTxIn txId Map.empty) resolvedInputs + !txIns <- mapM (prepareTxIn txId groups Map.empty) (ptrResolvedTxIn ptr) -- There is a custom semigroup instance for BlockGroupedData which uses addition for the values `fees` and `outSum`. -- Same happens bellow on last line of this function. - pure (grouped <> BlockGroupedData txIns txOutsGrouped [] [] fees outSum) + pure (grouped <> BlockGroupedData txIns txOutsGrouped [] []) else do -- The following operations only happen if the script passes stage 2 validation (or the tx has -- no script). @@ -334,7 +356,7 @@ insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped Map.fromList <$> whenFalseMempty (ioPlutusExtra iopts) - (mapM (insertRedeemer tracer disInOut (fst <$> groupedTxOut grouped) txId) (Generic.txRedeemer tx)) + (mapM (insertRedeemer tracer disInOut groups txId) (Generic.txRedeemer tx)) when (ioPlutusExtra iopts) $ do mapM_ (insertDatum tracer cache txId) (Generic.txData tx) @@ -378,13 +400,15 @@ insertTx syncEnv isMember blkId epochNo slotNo applyResult blockIndex tx grouped mapM_ (insertGovActionProposal cache blkId txId (getGovExpiresAt applyResult epochNo)) $ zip [0 ..] (Generic.txProposalProcedure tx) mapM_ (insertVotingProcedures tracer cache txId) (Generic.txVotingProcedure tx) - let !txIns = map (prepareTxIn txId redeemers) resolvedInputs - pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint fees outSum) + !txIns <- mapM (prepareTxIn txId groups redeemers) (ptrResolvedTxIn ptr) + pure (grouped <> BlockGroupedData txIns txOutsGrouped txMetadata maTxMint) where tracer = getTrace syncEnv cache = envCache syncEnv iopts = getInsertOptions syncEnv + groups = (fst <$> groupedTxOut grouped) : txOutPrev + prepareTxOut :: (MonadBaseControl IO m, MonadIO m) => Trace IO Text -> @@ -467,23 +491,31 @@ insertCollateralTxOut tracer cache iopts (txId, _txHash) (Generic.TxOut index ad hasScript = maybe False Generic.hasCredScript (Generic.getPaymentCred addr) prepareTxIn :: + Monad m => DB.TxId -> + [[ExtendedTxOut]] -> Map Word64 DB.RedeemerId -> - (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId) -> - ExtendedTxIn -prepareTxIn txInId redeemers (txIn, txOutId, mTxOutId) = - ExtendedTxIn - { etiTxIn = txInDB - , etiTxOutId = mTxOutId - } - where - txInDB = - DB.TxIn - { DB.txInTxInId = txInId - , DB.txInTxOutId = txOutId - , DB.txInTxOutIndex = fromIntegral $ Generic.txInIndex txIn - , DB.txInRedeemerId = mlookup (Generic.txInRedeemerIndex txIn) redeemers - } + (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) -> + ExceptT SyncNodeError m ExtendedTxIn +prepareTxIn txInId groupedOutputs redeemers (txIn, mtxOutId, mTxOutId) = do + txOutId <- liftLookupFail "resolveScriptHash" $ + case mtxOutId of + Just txOutId -> pure $ Right txOutId + Nothing -> case resolveInMemoryMany txIn groupedOutputs of + Nothing -> pure $ Left $ DB.DbLookupTxHash (Generic.txInHash txIn) + Just txOut -> pure $ Right $ DB.txOutTxId $ etoTxOut txOut + let txInDB = + DB.TxIn + { DB.txInTxInId = txInId + , DB.txInTxOutId = txOutId + , DB.txInTxOutIndex = fromIntegral $ Generic.txInIndex txIn + , DB.txInRedeemerId = mlookup (Generic.txInRedeemerIndex txIn) redeemers + } + pure + ExtendedTxIn + { etiTxIn = txInDB + , etiTxOutId = mTxOutId + } insertCollateralTxIn :: (MonadBaseControl IO m, MonadIO m) => @@ -1118,7 +1150,7 @@ insertRedeemer :: (MonadBaseControl IO m, MonadIO m) => Trace IO Text -> Bool -> - [ExtendedTxOut] -> + [[ExtendedTxOut]] -> DB.TxId -> (Word64, Generic.TxRedeemer) -> ExceptT SyncNodeError (ReaderT SqlBackend m) (Word64, DB.RedeemerId) diff --git a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs index ec3c81057..f7e39f23a 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Era/Shelley/Insert/Grouped.hs @@ -10,7 +10,10 @@ module Cardano.DbSync.Era.Shelley.Insert.Grouped ( insertBlockGroupedData, insertReverseIndex, resolveTxInputs, + resolveTxInputsValue, resolveScriptHash, + resolveInMemory, + resolveInMemoryMany, mkmaTxOuts, ) where @@ -23,6 +26,7 @@ import qualified Cardano.DbSync.Era.Shelley.Generic as Generic import Cardano.DbSync.Era.Shelley.Query import Cardano.DbSync.Era.Util import Cardano.DbSync.Error +import Cardano.Ledger.Coin (Coin (..)) import Cardano.Prelude import Control.Monad.Trans.Control (MonadBaseControl) import qualified Data.List as List @@ -45,8 +49,6 @@ data BlockGroupedData = BlockGroupedData , groupedTxOut :: ![(ExtendedTxOut, [MissingMaTxOut])] , groupedTxMetadata :: ![DB.TxMetadata] , groupedTxMint :: ![DB.MaTxMint] - , groupedTxFees :: !Word64 - , groupedTxOutSum :: !Word64 } -- | While we collect data, we don't have access yet to the 'TxOutId', since @@ -70,7 +72,7 @@ data ExtendedTxIn = ExtendedTxIn deriving (Show) instance Monoid BlockGroupedData where - mempty = BlockGroupedData [] [] [] [] 0 0 + mempty = BlockGroupedData [] [] [] [] instance Semigroup BlockGroupedData where tgd1 <> tgd2 = @@ -79,8 +81,6 @@ instance Semigroup BlockGroupedData where (groupedTxOut tgd1 <> groupedTxOut tgd2) (groupedTxMetadata tgd1 <> groupedTxMetadata tgd2) (groupedTxMint tgd1 <> groupedTxMint tgd2) - (groupedTxFees tgd1 + groupedTxFees tgd2) - (groupedTxOutSum tgd1 + groupedTxOutSum tgd2) insertBlockGroupedData :: (MonadBaseControl IO m, MonadIO m) => @@ -140,41 +140,56 @@ insertReverseIndex blockId minIds = , DB.reverseIndexMinIds = minIdsToText minIds } --- | If we can't resolve from the db, we fall back to the provided outputs +-- | If we can't resolve from the db, we return nothing. -- This happens the input consumes an output introduced in the same block. resolveTxInputs :: MonadIO m => Bool -> - Bool -> - [ExtendedTxOut] -> Generic.TxIn -> - ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) -resolveTxInputs hasConsumed needsValue groupedOutputs txIn = - liftLookupFail ("resolveTxInputs " <> textShow txIn <> " ") $ do + ReaderT SqlBackend m (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) +resolveTxInputs hasConsumed txIn = do qres <- - case (hasConsumed, needsValue) of - (_, True) -> fmap convertFoundAll <$> resolveInputTxOutIdValue txIn - (False, _) -> fmap convertnotFound <$> resolveInputTxId txIn - (True, False) -> fmap convertFoundTxOutId <$> resolveInputTxOutId txIn + if hasConsumed + then fmap convertFoundTxOutId <$> resolveInputTxOutId txIn + else fmap convertFoundTxId <$> resolveInputTxId txIn case qres of - Right ret -> pure $ Right ret - Left err -> - case (resolveInMemory txIn groupedOutputs, hasConsumed, needsValue) of - (Nothing, _, _) -> pure $ Left err - (Just eutxo, True, True) -> pure $ Right $ convertFoundValue (DB.txOutTxId (etoTxOut eutxo), DB.txOutValue (etoTxOut eutxo)) - (Just eutxo, _, _) -> pure $ Right $ convertnotFound $ DB.txOutTxId (etoTxOut eutxo) + Right ret -> pure ret + Left _ -> pure foundNothing where - convertnotFound :: DB.TxId -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) - convertnotFound txId = (txIn, txId, Left txIn, Nothing) + convertFoundTxId :: DB.TxId -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) + convertFoundTxId txId = (txIn, Just txId, Left txIn) - convertFoundTxOutId :: (DB.TxId, DB.TxOutId) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) - convertFoundTxOutId (txId, txOutId) = (txIn, txId, Right txOutId, Nothing) + convertFoundTxOutId :: (DB.TxId, DB.TxOutId) -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) + convertFoundTxOutId (txId, txOutId) = (txIn, Just txId, Right txOutId) - convertFoundValue :: (DB.TxId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) - convertFoundValue (txId, lovelace) = (txIn, txId, Left txIn, Just lovelace) + foundNothing :: (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId) + foundNothing = (txIn, Nothing, Left txIn) - convertFoundAll :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Generic.TxIn, DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) - convertFoundAll (txId, txOutId, lovelace) = (txIn, txId, Right txOutId, Just lovelace) +-- | If we can't resolve from the db, we fall back to the provided outputs +-- This happens the input consumes an output introduced in the same block. +resolveTxInputsValue :: + MonadIO m => + [[ExtendedTxOut]] -> + [(ByteString, Generic.TxOut)] -> + Generic.TxIn -> + ExceptT SyncNodeError (ReaderT SqlBackend m) (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) +resolveTxInputsValue txOutPrev blockTxOuts txIn = + liftLookupFail ("resolveTxInputsValue " <> textShow txIn <> " ") $ do + qres <- fmap convertFoundAll <$> resolveInputTxOutIdValue txIn + case qres of + Right ret -> pure $ Right ret + Left err -> + case resolveInMemory' txIn blockTxOuts of + Just txOut -> pure $ Right $ convertFoundValue $ DB.DbLovelace $ fromIntegral $ unCoin $ Generic.txOutAdaValue txOut + Nothing -> case resolveInMemoryMany txIn txOutPrev of + Nothing -> pure $ Left err + Just txOut -> pure $ Right $ convertFoundValue $ DB.txOutValue $ etoTxOut txOut + where + convertFoundAll :: (DB.TxId, DB.TxOutId, DbLovelace) -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) + convertFoundAll (txId, txOutId, lovelace) = (txIn, Just txId, Right txOutId, Just lovelace) + + convertFoundValue :: DbLovelace -> (Generic.TxIn, Maybe DB.TxId, Either Generic.TxIn DB.TxOutId, Maybe DbLovelace) + convertFoundValue lovelace = (txIn, Nothing, Left txIn, Just lovelace) resolveRemainingInputs :: MonadIO m => @@ -193,7 +208,7 @@ resolveRemainingInputs etis mp = resolveScriptHash :: (MonadBaseControl IO m, MonadIO m) => - [ExtendedTxOut] -> + [[ExtendedTxOut]] -> Generic.TxIn -> ExceptT SyncNodeError (ReaderT SqlBackend m) (Maybe ByteString) resolveScriptHash groupedOutputs txIn = @@ -202,7 +217,7 @@ resolveScriptHash groupedOutputs txIn = case qres of Right ret -> pure $ Right ret Left err -> - case resolveInMemory txIn groupedOutputs of + case resolveInMemoryMany txIn groupedOutputs of Nothing -> pure $ Left err Just eutxo -> pure $ Right $ DB.txOutPaymentCred $ etoTxOut eutxo @@ -215,6 +230,27 @@ matches txIn eutxo = Generic.txInHash txIn == etoTxHash eutxo && Generic.txInIndex txIn == DB.txOutIndex (etoTxOut eutxo) +resolveInMemory' :: Generic.TxIn -> [(ByteString, Generic.TxOut)] -> Maybe Generic.TxOut +resolveInMemory' txIn txOuts = + snd <$> List.find (matches' txIn) txOuts + +matches' :: Generic.TxIn -> (ByteString, Generic.TxOut) -> Bool +matches' txIn (txHash, txOut) = + Generic.txInHash txIn == txHash + && Generic.txInIndex txIn == Generic.txOutIndex txOut + +resolveInMemoryMany :: Generic.TxIn -> [[ExtendedTxOut]] -> Maybe ExtendedTxOut +resolveInMemoryMany txIn = + findMapMaybe (resolveInMemory txIn) + where + findMapMaybe :: (a -> Maybe b) -> [a] -> Maybe b + findMapMaybe f = go + where + go [] = Nothing + go (a : as) = case f a of + Nothing -> go as + Just b -> Just b + minimumMaybe :: (Ord a, Foldable f) => f a -> Maybe a minimumMaybe xs | null xs = Nothing diff --git a/cardano-db-sync/src/Cardano/DbSync/Util.hs b/cardano-db-sync/src/Cardano/DbSync/Util.hs index 0b8bda526..c5674044e 100644 --- a/cardano-db-sync/src/Cardano/DbSync/Util.hs +++ b/cardano-db-sync/src/Cardano/DbSync/Util.hs @@ -1,5 +1,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE NoImplicitPrelude #-} @@ -23,12 +25,14 @@ module Cardano.DbSync.Util ( textPrettyShow, textShow, third, - thrd3, + first3, forth4, + zipWith3, splitLast, traverseMEither, whenStrictJust, whenMaybe, + foldAndAccM, mlookup, whenRight, whenFalseEmpty, @@ -185,15 +189,30 @@ whenMaybe :: Monad m => Maybe a -> (a -> m b) -> m (Maybe b) whenMaybe (Just a) f = Just <$> f a whenMaybe Nothing _f = pure Nothing +foldAndAccM :: forall a b c t m. (Foldable t, Monad m) => ([c] -> a -> m (b, [c])) -> t a -> m [b] +foldAndAccM f as = reverse . snd <$> foldM g ([], []) as + where + g :: ([c], [b]) -> a -> m ([c], [b]) + g (cs, bs) a = do + (b, cs') <- f cs a + pure (cs <> cs', b : bs) + third :: (a, b, c) -> c third (_, _, c) = c -thrd3 :: (a, b, c, d) -> c -thrd3 (_, _, c, _) = c +first3 :: (a, b, c) -> a +first3 (a, _, _) = a forth4 :: (a, b, c, d) -> d forth4 (_, _, _, d) = d +{-# NOINLINE [1] zipWith3 #-} +zipWith3 :: (a -> b -> c -> d) -> [a]-> [b] -> [c] -> [d] +zipWith3 z = go + where + go (a:as) (b:bs) (c:cs) = z a b c : go as bs cs + go _ _ _ = [] + splitLast :: [(a, b, c, d)] -> ([(a, b, c)], [d]) splitLast = unzip . fmap (\(a, b, c, d) -> ((a, b, c), d)) diff --git a/cardano-db/src/Cardano/Db/Insert.hs b/cardano-db/src/Cardano/Db/Insert.hs index 14645af46..61e02dfbc 100644 --- a/cardano-db/src/Cardano/Db/Insert.hs +++ b/cardano-db/src/Cardano/Db/Insert.hs @@ -40,6 +40,7 @@ module Cardano.Db.Insert ( insertStakeRegistration, insertTreasury, insertTx, + insertManyTx, insertTxIn, insertManyTxMint, insertManyTxMetadata, @@ -268,6 +269,9 @@ insertTreasury = insertUnchecked "Treasury" insertTx :: (MonadBaseControl IO m, MonadIO m) => Tx -> ReaderT SqlBackend m TxId insertTx tx = insertUnchecked ("Tx: " ++ show (BS.length (txHash tx))) tx +insertManyTx :: (MonadBaseControl IO m, MonadIO m) => [Tx] -> ReaderT SqlBackend m [TxId] +insertManyTx = insertMany' "Txs" + insertTxIn :: (MonadBaseControl IO m, MonadIO m) => TxIn -> ReaderT SqlBackend m TxInId insertTxIn = insertUnchecked "TxIn"