Commit

PR cleanup
ChrisPenner committed Jan 17, 2025
1 parent b4adcb6 commit 0a7ef4f
Showing 6 changed files with 119 additions and 133 deletions.
1 change: 0 additions & 1 deletion unison-cli/package.yaml
@@ -20,7 +20,6 @@ library:
- condition: "!os(windows)"
dependencies: unix
dependencies:
- attoparsec
- Diff
- IntervalMap
- ListLike
3 changes: 2 additions & 1 deletion unison-cli/src/Unison/Codebase/Editor/Input.hs
@@ -128,7 +128,8 @@ data Input
| PushRemoteBranchI PushRemoteBranchInput
| SyncToFileI FilePath (ProjectAndBranch (Maybe ProjectName) (Maybe ProjectBranchName))
| SyncFromFileI FilePath UnresolvedProjectBranch
| SyncFromCodebaseI FilePath (ProjectAndBranch ProjectName ProjectBranchName) UnresolvedProjectBranch
| -- | Sync from a codebase project branch to this codebase's project branch
SyncFromCodebaseI FilePath (ProjectAndBranch ProjectName ProjectBranchName) UnresolvedProjectBranch
| ResetI (BranchId2 {- namespace to reset it to -}) (Maybe UnresolvedProjectBranch {- ProjectBranch to reset -})
| -- | used in Welcome module to give directions to user
--
6 changes: 4 additions & 2 deletions unison-cli/src/Unison/CommandLine/InputPatterns.hs
@@ -2205,11 +2205,13 @@ syncFromCodebase =
args = [("codebase-location", Required, filePathArg), ("branch-to-sync", Required, projectAndBranchNamesArg suggestionsConfig), ("destination-branch", Optional, projectAndBranchNamesArg suggestionsConfig)],
help =
( P.wrapColumn2
[ (makeExample syncFromCodebase ["./codebase", "/feature", "/main"], "Sets the /feature branch to the contents of the codebase at ./codebase.")
[ ( makeExample syncFromCodebase ["./codebase", "srcProject/main", "destProject/main"],
"Imports srcProject/main from the specified codebase, then sets destProject/main to the imported branch."
)
]
),
parse = \case
[codebaseLocation, branchToSync, destinationBranch] -> Input.SyncFromCodebaseI <$> unsupportedStructuredArgument makeStandalone "a file name" codebaseLocation <*> handleBranchWithProject branchToSync <*> handleBranchWithOptionalProject destinationBranch
[codebaseLocation, srcBranch, destinationBranch] -> Input.SyncFromCodebaseI <$> unsupportedStructuredArgument makeStandalone "a file name" codebaseLocation <*> handleBranchWithProject srcBranch <*> handleBranchWithOptionalProject destinationBranch
args -> wrongArgsLength "three arguments" args
}
where
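
For context, the updated help text corresponds to an invocation along these lines (a hypothetical UCM session; the prompt and exact command name are assumed here, not shown in this diff):

.> sync.from-codebase ./codebase srcProject/main destProject/main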
231 changes: 112 additions & 119 deletions unison-cli/src/Unison/Share/SyncV2.hs
@@ -16,11 +16,8 @@ import Control.Monad.Except
import Control.Monad.Reader (ask)
import Control.Monad.ST (ST, stToIO)
import Control.Monad.State
import Data.Attoparsec.ByteString qualified as A
import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.List qualified as C
import Data.Conduit.Zlib qualified as C
import Data.Graph qualified as Graph
@@ -36,7 +33,6 @@ import U.Codebase.Sqlite.V2.HashHandle (v2HashHandle)
import Unison.Cli.Monad (Cli)
import Unison.Cli.Monad qualified as Cli
import Unison.Codebase qualified as Codebase
import Unison.Debug qualified as Debug
import Unison.Hash32 (Hash32)
import Unison.Prelude
import Unison.Share.ExpectedHashMismatches (expectedCausalHashMismatches, expectedComponentHashMismatches)
@@ -57,27 +53,102 @@ type Stream i o = ConduitT i o StreamM ()

type SyncErr = SyncError SyncV2.PullError

-- The base monad we use within the conduit pipeline.
type StreamM = (ExceptT SyncErr (C.ResourceT IO))

-- | The number of entities to process in a single transaction.
--
-- SQLite transactions have some fixed overhead, so setting this too low can really slow things down,
-- but going too high here means we may be waiting on the network to get a full batch when we could be starting work.
batchSize :: Int
batchSize = 5000

------------------------------------------------------------------------------------------------------------------------
-- Download entities
-- Main methods
------------------------------------------------------------------------------------------------------------------------

-- | Sync a given causal hash and its dependencies to a sync-file.
syncToFile ::
Codebase.Codebase IO v a ->
CausalHash ->
Maybe SyncV2.BranchRef ->
FilePath ->
IO (Either SyncErr ())
syncToFile codebase rootHash mayBranchRef destFilePath = do
liftIO $ Codebase.withConnection codebase \conn -> do
C.runResourceT $
withCodebaseEntityStream conn rootHash mayBranchRef \mayTotal stream -> do
withStreamProgressCallback (Just mayTotal) \countC -> runExceptT do
C.runConduit $ stream C..| countC C..| C.map (BL.toStrict . CBOR.serialise) C..| C.transPipe liftIO C.gzip C..| C.sinkFile destFilePath
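
A usage sketch for the export side (the file path is a placeholder, and printing the error assumes a Show instance; neither is part of this diff):

exportBranch :: Codebase.Codebase IO v a -> CausalHash -> IO ()
exportBranch codebase rootHash = do
  -- Nothing: don't record a branch ref in the sync-file header.
  result <- syncToFile codebase rootHash Nothing "./my-branch.sync"
  case result of
    Left syncErr -> print syncErr
    Right () -> putStrLn "Wrote sync-file."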

syncFromFile ::
Bool ->
-- | Location of the sync-file
FilePath ->
Cli (Either (SyncError SyncV2.PullError) CausalHash)
syncFromFile shouldValidate syncFilePath = do
Cli.Env {codebase} <- ask
runExceptT do
mapExceptT liftIO $ Timing.time "File Sync" $ do
header <- mapExceptT C.runResourceT $ do
let stream = C.sourceFile syncFilePath C..| C.ungzip C..| decodeUnframedEntities
(header, rest) <- initializeStream stream
streamIntoCodebase shouldValidate codebase header rest
pure header
afterSyncChecks codebase (SyncV2.rootCausalHash header)
pure . hash32ToCausalHash $ SyncV2.rootCausalHash header

syncFromCodebase ::
Bool ->
-- | The codebase to sync from.
Sqlite.Connection ->
(Codebase.Codebase IO v a) ->
-- | The hash to sync.
CausalHash ->
IO (Either (SyncError SyncV2.PullError) ())
syncFromCodebase shouldValidate srcConn destCodebase causalHash = do
liftIO . C.runResourceT . runExceptT $ withCodebaseEntityStream srcConn causalHash Nothing \_total entityStream -> do
(header, rest) <- initializeStream entityStream
streamIntoCodebase shouldValidate destCodebase header rest
mapExceptT liftIO (afterSyncChecks destCodebase (causalHashToHash32 causalHash))
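
And the corresponding import side, as a sketch under the same assumptions:

importBranch :: Sqlite.Connection -> Codebase.Codebase IO v a -> CausalHash -> IO ()
importBranch srcConn destCodebase hash = do
  -- True: validate entity hashes while importing.
  result <- syncFromCodebase True srcConn destCodebase hash
  case result of
    Left syncErr -> print syncErr
    Right () -> putStrLn "Synced."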

------------------------------------------------------------------------------------------------------------------------
-- Helpers
------------------------------------------------------------------------------------------------------------------------

-- | Validate that the provided entities match their expected hashes, and if so, save them to the codebase.
validateAndSave :: Bool -> (Codebase.Codebase IO v a) -> [(Hash32, TempEntity)] -> StreamM ()
validateAndSave shouldValidate codebase entities = do
let validateEntities =
runExceptT $ when shouldValidate (batchValidateEntities entities)
-- Validation is slow, run it in parallel with insertion, but don't commit the transaction until we're done
-- validation.
-- Validation is slow, so we run it in parallel with insertion (which can also be slow),
-- but we don't commit the transaction until validation is done, to avoid inserting invalid entities.
ExceptT . liftIO $ IO.withAsync validateEntities \validationTask -> do
Timing.time "Inserting entities" $ Codebase.runTransactionExceptT codebase do
for_ entities \(hash, entity) -> do
void . lift $ Q.saveTempEntityInMain v2HashHandle hash entity
lift (Sqlite.unsafeIO (IO.wait validationTask)) >>= \case
Left err -> throwError err
Right _ -> pure ()
where
batchValidateEntities :: [(Hash32, TempEntity)] -> ExceptT SyncErr IO ()
batchValidateEntities entities = do
mismatches <- fmap catMaybes $ liftIO $ IO.pooledForConcurrently entities \(hash, entity) -> do
IO.evaluate $ EV.validateTempEntity hash entity
for_ mismatches \case
err@(Share.EntityHashMismatch et (Share.HashMismatchForEntity {supplied, computed})) ->
let expectedMismatches = case et of
Share.TermComponentType -> expectedComponentHashMismatches
Share.DeclComponentType -> expectedComponentHashMismatches
Share.CausalType -> expectedCausalHashMismatches
_ -> mempty
in case Map.lookup supplied expectedMismatches of
Just expected
| expected == computed -> pure ()
_ -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
err -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
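
The validate-while-inserting shape used above can be distilled into a minimal sketch (assuming `import UnliftIO qualified as IO`, and simplifying the error type to String):

-- Run validation concurrently with the slow insert work, and only treat
-- the batch as committed once validation has succeeded.
insertValidated :: IO (Either String ()) -> IO () -> IO (Either String ())
insertValidated validate insert =
  IO.withAsync validate $ \validationTask -> do
    insert -- insertion proceeds while validation runs
    IO.wait validationTask -- don't "commit" until validation finishes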

-- | Syncs a stream which could send entities in any order.
syncUnsortedStream ::
@@ -86,7 +157,6 @@
Stream () SyncV2.EntityChunk ->
StreamM ()
syncUnsortedStream shouldValidate codebase stream = do
Debug.debugLogM Debug.Temp $ "Syncing unsorted stream"
allResults <- C.runConduit $ stream C..| C.sinkList
allEntities <- ExceptT $ Timing.time "Unpacking chunks" $ liftIO $ Codebase.runTransactionExceptT codebase $ do unpackChunks allResults
let sortedEntities = sortDependencyFirst allEntities
@@ -99,13 +169,20 @@ syncSortedStream ::
Stream () SyncV2.EntityChunk ->
StreamM ()
syncSortedStream shouldValidate codebase stream = do
Debug.debugLogM Debug.Temp $ "Syncing sorted stream"
let handler :: Stream [SyncV2.EntityChunk] o
handler = C.mapM_C \chunkBatch -> do
entityBatch <- mapExceptT lift . ExceptT $ Codebase.runTransactionExceptT codebase do for chunkBatch unpackChunk
validateAndSave shouldValidate codebase (catMaybes entityBatch)
entityBatch <- mapExceptT lift . ExceptT $ Codebase.runTransactionExceptT codebase do unpackChunks chunkBatch
validateAndSave shouldValidate codebase entityBatch
C.runConduit $ stream C..| C.chunksOf batchSize C..| handler

-- | Topologically sort entities based on their dependencies.
sortDependencyFirst :: [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
sortDependencyFirst entities = do
let adjList = entities <&> \(hash32, entity) -> ((hash32, entity), hash32, Set.toList $ Share.entityDependencies (tempEntityToEntity entity))
(graph, vertexInfo, _vertexForKey) = Graph.graphFromEdges adjList
in Graph.reverseTopSort graph <&> \v -> (view _1 $ vertexInfo v)
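
As a standalone illustration (a minimal sketch, not part of this diff) of why reverseTopSort yields a dependencies-first order, assuming `import Data.Graph qualified as Graph`:

-- Each item names the keys it depends on; edges point from an item to its
-- dependencies, so the reverse topological order lists dependencies first.
sortDepsFirst :: [(String, [String])] -> [String]
sortDepsFirst items =
  let adjList = [(name, name, deps) | (name, deps) <- items]
      (graph, vertexInfo, _vertexForKey) = Graph.graphFromEdges adjList
      nodeOf v = let (n, _, _) = vertexInfo v in n
   in map nodeOf (Graph.reverseTopSort graph)

-- sortDepsFirst [("app", ["lib"]), ("lib", ["base"]), ("base", [])]
--   == ["base", "lib", "app"]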

-- | Unpack a single entity chunk, returning the entity if it's not already in the codebase, Nothing otherwise.
unpackChunk :: SyncV2.EntityChunk -> ExceptT SyncErr Sqlite.Transaction (Maybe (Hash32, TempEntity))
unpackChunk = \case
SyncV2.EntityChunk {hash, entityCBOR = entityBytes} -> do
@@ -126,25 +203,6 @@ unpackChunks xs = do
for xs unpackChunk
<&> catMaybes

batchValidateEntities :: [(Hash32, TempEntity)] -> ExceptT SyncErr IO ()
batchValidateEntities entities = do
mismatches <- fmap catMaybes $ liftIO $ IO.pooledForConcurrently entities \(hash, entity) -> do
IO.evaluate $ EV.validateTempEntity hash entity
for_ mismatches \case
err@(Share.EntityHashMismatch et (Share.HashMismatchForEntity {supplied, computed})) ->
let expectedMismatches = case et of
Share.TermComponentType -> expectedComponentHashMismatches
Share.DeclComponentType -> expectedComponentHashMismatches
Share.CausalType -> expectedCausalHashMismatches
_ -> mempty
in case Map.lookup supplied expectedMismatches of
Just expected
| expected == computed -> pure ()
_ -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err
err -> do
throwError . SyncError . SyncV2.PullError'DownloadEntities . SyncV2.DownloadEntitiesEntityValidationFailure $ err

streamIntoCodebase :: Bool -> Codebase.Codebase IO v a -> SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()
streamIntoCodebase shouldValidate codebase SyncV2.StreamInitInfo {version, entitySorting, numEntities = numEntities} stream = ExceptT do
withStreamProgressCallback (fromIntegral <$> numEntities) \countC -> runExceptT do
@@ -157,6 +215,7 @@ streamIntoCodebase shouldValidate codebase SyncV2.StreamInitInfo {version, entit
SyncV2.DependenciesFirst -> syncSortedStream shouldValidate codebase stream'
SyncV2.Unsorted -> syncUnsortedStream shouldValidate codebase stream'

-- | Verify that the hash we expected to import from the stream was successfully loaded into the codebase.
afterSyncChecks :: Codebase.Codebase IO v a -> Hash32 -> ExceptT (SyncError SyncV2.PullError) IO ()
afterSyncChecks codebase hash = do
lift (didCausalSuccessfullyImport codebase hash) >>= \case
@@ -171,53 +230,15 @@ afterSyncChecks codebase hash = do
let expectedHash = hash32ToCausalHash hash
isJust <$> (Codebase.runTransaction codebase $ Q.loadCausalByCausalHash expectedHash)

-- | Topologically sort entities based on their dependencies.
sortDependencyFirst :: [(Hash32, TempEntity)] -> [(Hash32, TempEntity)]
sortDependencyFirst entities = do
let adjList = entities <&> \(hash32, entity) -> ((hash32, entity), hash32, Set.toList $ Share.entityDependencies (tempEntityToEntity entity))
(graph, vertexInfo, _vertexForKey) = Graph.graphFromEdges adjList
in Graph.reverseTopSort graph <&> \v -> (view _1 $ vertexInfo v)

syncFromFile ::
Bool ->
-- | Location of the sync-file
FilePath ->
Cli (Either (SyncError SyncV2.PullError) CausalHash)
syncFromFile shouldValidate syncFilePath = do
Cli.Env {codebase} <- ask
runExceptT do
Debug.debugLogM Debug.Temp $ "Kicking off sync"
mapExceptT liftIO $ Timing.time "File Sync" $ do
header <- mapExceptT C.runResourceT $ do
let stream = C.sourceFile syncFilePath C..| C.ungzip C..| decodeUnframedEntities
(header, rest) <- initializeStream stream
streamIntoCodebase shouldValidate codebase header rest
pure header
afterSyncChecks codebase (SyncV2.rootCausalHash header)
pure . hash32ToCausalHash $ SyncV2.rootCausalHash header

syncFromCodebase ::
Bool ->
-- | The codebase to sync from.
Sqlite.Connection ->
(Codebase.Codebase IO v a) ->
-- | The hash to sync.
CausalHash ->
IO (Either (SyncError SyncV2.PullError) ())
syncFromCodebase shouldValidate srcConn destCodebase causalHash = do
liftIO . C.runResourceT . runExceptT $ withEntityStream srcConn causalHash Nothing \_total entityStream -> do
(header, rest) <- initializeStream entityStream
streamIntoCodebase shouldValidate destCodebase header rest
mapExceptT liftIO (afterSyncChecks destCodebase (causalHashToHash32 causalHash))

withEntityStream ::
-- | Load and stream entities for a given causal hash from a codebase.
withCodebaseEntityStream ::
(MonadIO m) =>
Sqlite.Connection ->
CausalHash ->
Maybe SyncV2.BranchRef ->
(Int -> Stream () SyncV2.DownloadEntitiesChunk -> m r) ->
m r
withEntityStream conn rootHash mayBranchRef callback = do
withCodebaseEntityStream conn rootHash mayBranchRef callback = do
entities <- liftIO $ withEntityLoadingCallback $ \counter -> do
Sqlite.runTransaction conn (depsForCausal rootHash counter)
liftIO $ Text.hPutStrLn IO.stderr $ "Finished loading entities, writing sync-file."
@@ -244,54 +265,24 @@ withEntityStream conn rootHash mayBranchRef callback = do
& (initialChunk :)
let stream = C.yieldMany contents
callback totalEntities stream

syncToFile ::
Codebase.Codebase IO v a ->
CausalHash ->
Maybe SyncV2.BranchRef ->
FilePath ->
IO (Either SyncErr ())
syncToFile codebase rootHash mayBranchRef destFilePath = do
liftIO $ Codebase.withConnection codebase \conn -> do
C.runResourceT $
withEntityStream conn rootHash mayBranchRef \mayTotal stream -> do
withStreamProgressCallback (Just mayTotal) \countC -> runExceptT do
C.runConduit $ stream C..| countC C..| C.map (BL.toStrict . CBOR.serialise) C..| C.transPipe liftIO C.gzip C..| C.sinkFile destFilePath

-- | Collect all dependencies of a given causal hash.
depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
depsForCausal causalHash counter = do
flip execStateT mempty $ expandEntities (causalHashToHash32 causalHash)
where
expandEntities :: Hash32 -> ((StateT (Map Hash32 (Sync.Entity Text Hash32 Hash32)) Sqlite.Transaction)) ()
expandEntities hash32 = do
gets (Map.member hash32) >>= \case
True -> pure ()
False -> do
entity <- lift $ Sync.expectEntity hash32
modify (Map.insert hash32 entity)
lift . Sqlite.unsafeIO $ counter 1
traverseOf_ Sync.entityHashes_ expandEntities entity

-- | Gets the framed chunks from a NetString framed stream.
_unNetString :: ConduitT ByteString ByteString StreamM ()
_unNetString = do
bs <- C.sinkParser $ do
len <- A8.decimal
_ <- A8.char ':'
bs <- A.take len
_ <- A8.char ','
pure bs
C.yield bs

_decodeFramedEntity :: ByteString -> StreamM SyncV2.DownloadEntitiesChunk
_decodeFramedEntity bs = do
case CBOR.deserialiseOrFail (BL.fromStrict bs) of
Left err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err
Right chunk -> pure chunk
-- Collect all dependencies of a given causal hash.
depsForCausal :: CausalHash -> (Int -> IO ()) -> Sqlite.Transaction (Map Hash32 (Sync.Entity Text Hash32 Hash32))
depsForCausal causalHash counter = do
flip execStateT mempty $ expandEntities (causalHashToHash32 causalHash)
where
expandEntities :: Hash32 -> ((StateT (Map Hash32 (Sync.Entity Text Hash32 Hash32)) Sqlite.Transaction)) ()
expandEntities hash32 = do
gets (Map.member hash32) >>= \case
True -> pure ()
False -> do
entity <- lift $ Sync.expectEntity hash32
modify (Map.insert hash32 entity)
lift . Sqlite.unsafeIO $ counter 1
traverseOf_ Sync.entityHashes_ expandEntities entity

-- Expects a stream of tightly-packed CBOR entities without any framing/separators.
decodeUnframedEntities :: ConduitT ByteString SyncV2.DownloadEntitiesChunk StreamM ()
decodeUnframedEntities :: Stream ByteString SyncV2.DownloadEntitiesChunk
decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do
C.await >>= \case
Nothing -> pure ()
Expand Down Expand Up @@ -338,11 +329,10 @@ decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do
k <- newDecoder
loop rem k

-- | Peel the header off the stream and parse the remaining entity chunks.
-- | Peel the header off the stream and parse the remaining entity chunks into EntityChunks
initializeStream :: Stream () SyncV2.DownloadEntitiesChunk -> StreamM (SyncV2.StreamInitInfo, Stream () SyncV2.EntityChunk)
initializeStream stream = do
(streamRemainder, init) <- stream C.$$+ C.headC
Debug.debugM Debug.Temp "Got initial chunk: " init
case init of
Nothing -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorMissingInitialChunk
Just chunk -> do
@@ -351,7 +341,6 @@ initializeStream stream = do
let entityStream = C.unsealConduitT streamRemainder C..| C.mapM parseEntity
pure $ (info, entityStream)
SyncV2.EntityC _ -> do
Debug.debugLogM Debug.Temp $ "Got unexpected entity chunk"
throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorMissingInitialChunk
SyncV2.ErrorC (SyncV2.ErrorChunk err) -> throwError . SyncError . SyncV2.PullError'DownloadEntities $ err
where
@@ -361,6 +350,10 @@
SyncV2.ErrorC (SyncV2.ErrorChunk err) -> throwError . SyncError $ SyncV2.PullError'DownloadEntities err
SyncV2.InitialC {} -> throwError . SyncError $ SyncV2.PullError'Sync SyncV2.SyncErrorMisplacedInitialChunk

------------------------------------------------------------------------------------------------------------------------
-- Progress Tracking
------------------------------------------------------------------------------------------------------------------------

-- Provide the given action with a callback that displays progress to the terminal.
withStreamProgressCallback :: (MonadIO m, MonadUnliftIO n) => Maybe Int -> (ConduitT i i m () -> n a) -> n a
withStreamProgressCallback total action = do
1 change: 0 additions & 1 deletion unison-cli/unison-cli.cabal
@@ -200,7 +200,6 @@ library
, aeson-pretty
, ansi-terminal
, async
, attoparsec
, base
, bytestring
, cmark