From 1df99747c0c9997603755b07803b8ec81cea4ce1 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 14 Jan 2025 15:43:02 -0800 Subject: [PATCH] SyncV2 with Share server --- .../U/Codebase/Sqlite/Queries.hs | 92 +++++++++++------- .../sql/001-temp-entity-tables.sql | 3 +- lib/unison-sqlite/package.yaml | 1 + .../src/Unison/Sqlite/Connection.hs | 62 +++++++++--- .../src/Unison/Sqlite/Connection/Internal.hs | 8 +- lib/unison-sqlite/unison-sqlite.cabal | 3 +- unison-cli/src/Unison/Cli/DownloadUtils.hs | 46 ++++++--- .../Codebase/Editor/HandleInput/InstallLib.hs | 2 +- .../Editor/HandleInput/ProjectClone.hs | 4 +- .../Editor/HandleInput/ProjectCreate.hs | 4 +- .../Codebase/Editor/HandleInput/Pull.hs | 1 + .../Codebase/Editor/HandleInput/SyncV2.hs | 30 ++++++ unison-cli/src/Unison/Share/Sync/Util.hs | 42 ++++++++ unison-cli/src/Unison/Share/SyncV2.hs | 95 +++++++++++++++++++ unison-cli/unison-cli.cabal | 1 + unison-share-api/src/Unison/SyncV2/API.hs | 29 ++++++ unison-share-api/unison-share-api.cabal | 1 + 17 files changed, 356 insertions(+), 68 deletions(-) create mode 100644 unison-cli/src/Unison/Share/Sync/Util.hs create mode 100644 unison-share-api/src/Unison/SyncV2/API.hs diff --git a/codebase2/codebase-sqlite/U/Codebase/Sqlite/Queries.hs b/codebase2/codebase-sqlite/U/Codebase/Sqlite/Queries.hs index 033efb8655..936dd91cdf 100644 --- a/codebase2/codebase-sqlite/U/Codebase/Sqlite/Queries.hs +++ b/codebase2/codebase-sqlite/U/Codebase/Sqlite/Queries.hs @@ -228,6 +228,7 @@ module U.Codebase.Sqlite.Queries expectEntity, syncToTempEntity, insertTempEntity, + insertTempEntityV2, saveTempEntityInMain, expectTempEntity, deleteTempEntity, @@ -315,6 +316,7 @@ import Data.Map.NonEmpty qualified as NEMap import Data.Maybe qualified as Maybe import Data.Sequence qualified as Seq import Data.Set qualified as Set +import Data.Set.NonEmpty (NESet) import Data.Text qualified as Text import Data.Text.Encoding qualified as Text import Data.Text.Lazy qualified as Text.Lazy @@ -532,23 +534,18 @@ countWatches = queryOneCol [sql| SELECT COUNT(*) FROM watch |] saveHash :: Hash32 -> Transaction HashId saveHash hash = do - execute - [sql| - INSERT INTO hash (base32) VALUES (:hash) - ON CONFLICT DO NOTHING - |] - expectHashId hash + loadHashId hash >>= \case + Just h -> pure h + Nothing -> do + queryOneCol + [sql| + INSERT INTO hash (base32) VALUES (:hash) + RETURNING id + |] saveHashes :: Traversable f => f Hash32 -> Transaction (f HashId) saveHashes hashes = do - for_ hashes \hash -> - execute - [sql| - INSERT INTO hash (base32) - VALUES (:hash) - ON CONFLICT DO NOTHING - |] - traverse expectHashId hashes + for hashes saveHash saveHashHash :: Hash -> Transaction HashId saveHashHash = saveHash . Hash32.fromHash @@ -623,13 +620,15 @@ expectBranchHashForCausalHash ch = do saveText :: Text -> Transaction TextId saveText t = do - execute - [sql| - INSERT INTO text (text) - VALUES (:t) - ON CONFLICT DO NOTHING - |] - expectTextId t + loadTextId t >>= \case + Just h -> pure h + Nothing -> do + queryOneCol + [sql| + INSERT INTO text (text) + VALUES (:t) + RETURNING id + |] saveTexts :: Traversable f => f Text -> Transaction (f TextId) saveTexts = @@ -686,7 +685,7 @@ saveObject :: ObjectType -> ByteString -> Transaction ObjectId -saveObject hh h t blob = do +saveObject _hh h t blob = do execute [sql| INSERT INTO object (primary_hash_id, type_id, bytes) @@ -697,9 +696,9 @@ saveObject hh h t blob = do saveHashObject h oId 2 -- todo: remove this from here, and add it to other relevant places once there are v1 and v2 hashes rowsModified >>= \case 0 -> pure () - _ -> do - hash <- expectHash32 h - tryMoveTempEntityDependents hh hash + _ -> pure () + -- hash <- expectHash32 h + -- tryMoveTempEntityDependents hh hash pure oId expectObject :: SqliteExceptionReason e => ObjectId -> (ByteString -> Either e a) -> Transaction a @@ -957,7 +956,7 @@ saveCausal :: BranchHashId -> [CausalHashId] -> Transaction () -saveCausal hh self value parents = do +saveCausal _hh self value parents = do execute [sql| INSERT INTO causal (self_hash_id, value_hash_id) @@ -973,15 +972,15 @@ saveCausal hh self value parents = do INSERT INTO causal_parent (causal_id, parent_id) VALUES (:self, :parent) |] - flushCausalDependents hh self + -- flushCausalDependents hh self -flushCausalDependents :: +_flushCausalDependents :: HashHandle -> CausalHashId -> Transaction () -flushCausalDependents hh chId = do +_flushCausalDependents hh chId = do hash <- expectHash32 (unCausalHashId chId) - tryMoveTempEntityDependents hh hash + _tryMoveTempEntityDependents hh hash -- | `tryMoveTempEntityDependents #foo` does this: -- 0. Precondition: We just inserted object #foo. @@ -989,11 +988,11 @@ flushCausalDependents hh chId = do -- 2. Delete #foo as dependency from temp_entity_missing_dependency. e.g. (#bar, #foo), (#baz, #foo) -- 3. For each like #bar and #baz with no more rows in temp_entity_missing_dependency, -- insert_entity them. -tryMoveTempEntityDependents :: +_tryMoveTempEntityDependents :: HashHandle -> Hash32 -> Transaction () -tryMoveTempEntityDependents hh dependency = do +_tryMoveTempEntityDependents hh dependency = do dependents <- queryListCol [sql| @@ -2993,6 +2992,35 @@ insertTempEntity entityHash entity missingDependencies = do entityType = Entity.entityType entity +-- | Insert a new `temp_entity` row, and its associated 1+ `temp_entity_missing_dependency` rows. +-- +-- Preconditions: +-- 1. The entity does not already exist in "main" storage (`object` / `causal`) +-- 2. The entity does not already exist in `temp_entity`. +insertTempEntityV2 :: Hash32 -> TempEntity -> NESet Hash32 -> Transaction () +insertTempEntityV2 entityHash entity missingDependencies = do + execute + [sql| + INSERT INTO temp_entity (hash, blob, type_id) + VALUES (:entityHash, :entityBlob, :entityType) + ON CONFLICT DO NOTHING + |] + + for_ missingDependencies \depHash -> + execute + [sql| + INSERT INTO temp_entity_missing_dependency (dependent, dependency) + VALUES (:entityHash, :depHash) + |] + where + entityBlob :: ByteString + entityBlob = + runPutS (Serialization.putTempEntity entity) + + entityType :: TempEntityType + entityType = + Entity.entityType entity + -- | Delete a row from the `temp_entity` table, if it exists. deleteTempEntity :: Hash32 -> Transaction () deleteTempEntity hash = diff --git a/codebase2/codebase-sqlite/sql/001-temp-entity-tables.sql b/codebase2/codebase-sqlite/sql/001-temp-entity-tables.sql index 0ae13812b1..6651d4a6fe 100644 --- a/codebase2/codebase-sqlite/sql/001-temp-entity-tables.sql +++ b/codebase2/codebase-sqlite/sql/001-temp-entity-tables.sql @@ -56,7 +56,8 @@ create table if not exists temp_entity ( create table if not exists temp_entity_missing_dependency ( dependent text not null references temp_entity(hash), dependency text not null, - dependencyJwt text not null, + -- TODO: this is just for testing + dependencyJwt text null, unique (dependent, dependency) ); create index if not exists temp_entity_missing_dependency_ix_dependent on temp_entity_missing_dependency (dependent); diff --git a/lib/unison-sqlite/package.yaml b/lib/unison-sqlite/package.yaml index 84d0201eab..b90bd2aa57 100644 --- a/lib/unison-sqlite/package.yaml +++ b/lib/unison-sqlite/package.yaml @@ -9,6 +9,7 @@ library: dependencies: - base + - containers - direct-sqlite - megaparsec - pretty-simple diff --git a/lib/unison-sqlite/src/Unison/Sqlite/Connection.hs b/lib/unison-sqlite/src/Unison/Sqlite/Connection.hs index 48167980db..726cac860e 100644 --- a/lib/unison-sqlite/src/Unison/Sqlite/Connection.hs +++ b/lib/unison-sqlite/src/Unison/Sqlite/Connection.hs @@ -58,6 +58,7 @@ module Unison.Sqlite.Connection ) where +import Data.Map qualified as Map import Database.SQLite.Simple qualified as Sqlite import Database.SQLite.Simple.FromField qualified as Sqlite import Database.SQLite3 qualified as Direct.Sqlite @@ -71,7 +72,10 @@ import Unison.Sqlite.Connection.Internal (Connection (..)) import Unison.Sqlite.Exception import Unison.Sqlite.Sql (Sql (..)) import Unison.Sqlite.Sql qualified as Sql +import UnliftIO (atomically) import UnliftIO.Exception +import UnliftIO.STM (readTVar) +import UnliftIO.STM qualified as STM -- | Perform an action with a connection to a SQLite database. -- @@ -103,19 +107,47 @@ openConnection name file = do Just "" -> file _ -> "file:" <> file <> "?mode=ro" conn0 <- Sqlite.open sqliteURI `catch` rethrowAsSqliteConnectException name file - let conn = Connection {conn = conn0, file, name} + statementCache <- STM.newTVarIO Map.empty + let conn = Connection {conn = conn0, file, name, statementCache} execute conn [Sql.sql| PRAGMA foreign_keys = ON |] execute conn [Sql.sql| PRAGMA busy_timeout = 60000 |] + execute conn [Sql.sql| PRAGMA synchronous = normal |] + execute conn [Sql.sql| PRAGMA journal_size_limit = 6144000 |] + execute conn [Sql.sql| PRAGMA cache_size = -64000 |] + execute conn [Sql.sql| PRAGMA temp_store = 2 |] + pure conn -- Close a connection opened with 'openConnection'. closeConnection :: Connection -> IO () -closeConnection (Connection _ _ conn) = +closeConnection conn@(Connection {conn = conn0}) = do -- FIXME if this throws an exception, it won't be under `SomeSqliteException` -- Possible fixes: -- 1. Add close exception to the hierarchy, e.g. `SqliteCloseException` -- 2. Always ignore exceptions thrown by `close` (Mitchell prefers this one) - Sqlite.close conn + closeAllStatements conn + Sqlite.close conn0 + +withStatement :: Connection -> Text -> (Sqlite.Statement -> IO a) -> IO a +withStatement conn sql action = do + bracket (prepareStatement conn sql) Sqlite.reset action + where + prepareStatement :: Connection -> Text -> IO Sqlite.Statement + prepareStatement Connection {conn, statementCache} sql = do + cached <- atomically $ do + cache <- STM.readTVar statementCache + pure $ Map.lookup sql cache + case cached of + Just stmt -> pure stmt + Nothing -> do + stmt <- Sqlite.openStatement conn (coerce @Text @Sqlite.Query sql) + atomically $ STM.modifyTVar statementCache (Map.insert sql stmt) + pure stmt + +closeAllStatements :: Connection -> IO () +closeAllStatements Connection {statementCache} = do + cache <- atomically $ readTVar statementCache + for_ cache Sqlite.closeStatement -- An internal type, for making prettier debug logs @@ -152,7 +184,7 @@ logQuery (Sql sql params) result = -- Without results execute :: (HasCallStack) => Connection -> Sql -> IO () -execute conn@(Connection _ _ conn0) sql@(Sql s params) = do +execute conn sql@(Sql s params) = do logQuery sql Nothing doExecute `catch` \(exception :: Sqlite.SQLError) -> throwSqliteQueryException @@ -163,16 +195,16 @@ execute conn@(Connection _ _ conn0) sql@(Sql s params) = do } where doExecute :: IO () - doExecute = - Sqlite.withStatement conn0 (coerce s) \(Sqlite.Statement statement) -> do - bindParameters statement params - void (Direct.Sqlite.step statement) + doExecute = do + withStatement conn s \statement -> do + bindParameters (coerce statement) params + void (Direct.Sqlite.step $ coerce statement) -- | Execute one or more semicolon-delimited statements. -- -- This function does not support parameters, and is mostly useful for executing DDL and migrations. executeStatements :: (HasCallStack) => Connection -> Text -> IO () -executeStatements conn@(Connection _ _ (Sqlite.Connection database _tempNameCounter)) sql = do +executeStatements conn@(Connection {conn = Sqlite.Connection database _tempNameCounter}) sql = do logQuery (Sql sql []) Nothing Direct.Sqlite.exec database sql `catch` \(exception :: Sqlite.SQLError) -> throwSqliteQueryException @@ -185,7 +217,7 @@ executeStatements conn@(Connection _ _ (Sqlite.Connection database _tempNameCoun -- With results, without checks queryStreamRow :: (HasCallStack, Sqlite.FromRow a) => Connection -> Sql -> (IO (Maybe a) -> IO r) -> IO r -queryStreamRow conn@(Connection _ _ conn0) sql@(Sql s params) callback = +queryStreamRow conn sql@(Sql s params) callback = run `catch` \(exception :: Sqlite.SQLError) -> throwSqliteQueryException SqliteQueryExceptionInfo @@ -194,8 +226,8 @@ queryStreamRow conn@(Connection _ _ conn0) sql@(Sql s params) callback = sql } where - run = - bracket (Sqlite.openStatement conn0 (coerce s)) Sqlite.closeStatement \statement -> do + run = do + withStatement conn s \statement -> do Sqlite.bind statement params callback (Sqlite.nextRow statement) @@ -213,7 +245,7 @@ queryStreamCol = queryStreamRow queryListRow :: forall a. (Sqlite.FromRow a, HasCallStack) => Connection -> Sql -> IO [a] -queryListRow conn@(Connection _ _ conn0) sql@(Sql s params) = do +queryListRow conn sql@(Sql s params) = do result <- doQuery `catch` \(exception :: Sqlite.SQLError) -> @@ -228,7 +260,7 @@ queryListRow conn@(Connection _ _ conn0) sql@(Sql s params) = do where doQuery :: IO [a] doQuery = - Sqlite.withStatement conn0 (coerce s) \statement -> do + withStatement conn (coerce s) \statement -> do bindParameters (coerce statement) params let loop :: [a] -> IO [a] loop rows = @@ -347,7 +379,7 @@ queryOneColCheck conn s check = -- Rows modified rowsModified :: Connection -> IO Int -rowsModified (Connection _ _ conn) = +rowsModified (Connection {conn}) = Sqlite.changes conn -- Vacuum diff --git a/lib/unison-sqlite/src/Unison/Sqlite/Connection/Internal.hs b/lib/unison-sqlite/src/Unison/Sqlite/Connection/Internal.hs index 5f80151f94..579c37cfb9 100644 --- a/lib/unison-sqlite/src/Unison/Sqlite/Connection/Internal.hs +++ b/lib/unison-sqlite/src/Unison/Sqlite/Connection/Internal.hs @@ -3,15 +3,19 @@ module Unison.Sqlite.Connection.Internal ) where +import Data.Map (Map) +import Data.Text (Text) import Database.SQLite.Simple qualified as Sqlite +import UnliftIO.STM (TVar) -- | A /non-thread safe/ connection to a SQLite database. data Connection = Connection { name :: String, file :: FilePath, - conn :: Sqlite.Connection + conn :: Sqlite.Connection, + statementCache :: TVar (Map Text Sqlite.Statement) } instance Show Connection where - show (Connection name file _conn) = + show (Connection name file _conn _statementCache) = "Connection { name = " ++ show name ++ ", file = " ++ show file ++ " }" diff --git a/lib/unison-sqlite/unison-sqlite.cabal b/lib/unison-sqlite/unison-sqlite.cabal index 28ea0f7c4f..329a05c5d8 100644 --- a/lib/unison-sqlite/unison-sqlite.cabal +++ b/lib/unison-sqlite/unison-sqlite.cabal @@ -1,6 +1,6 @@ cabal-version: 1.12 --- This file has been generated from package.yaml by hpack version 0.36.0. +-- This file has been generated from package.yaml by hpack version 0.37.0. -- -- see: https://github.com/sol/hpack @@ -64,6 +64,7 @@ library ghc-options: -Wall build-depends: base + , containers , direct-sqlite , megaparsec , pretty-simple diff --git a/unison-cli/src/Unison/Cli/DownloadUtils.hs b/unison-cli/src/Unison/Cli/DownloadUtils.hs index 343ebfeeb5..fb53a84176 100644 --- a/unison-cli/src/Unison/Cli/DownloadUtils.hs +++ b/unison-cli/src/Unison/Cli/DownloadUtils.hs @@ -4,12 +4,14 @@ module Unison.Cli.DownloadUtils ( downloadProjectBranchFromShare, downloadLooseCodeFromShare, + SyncVersion (..), ) where import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar, readTVarIO) import Data.List.NonEmpty (pattern (:|)) +import Data.Set qualified as Set import System.Console.Regions qualified as Console.Regions import U.Codebase.HashTags (CausalHash) import U.Codebase.Sqlite.Queries qualified as Queries @@ -28,20 +30,24 @@ import Unison.Share.API.Hash qualified as Share import Unison.Share.Codeserver qualified as Codeserver import Unison.Share.Sync qualified as Share import Unison.Share.Sync.Types qualified as Share +import Unison.Share.SyncV2 qualified as SyncV2 import Unison.Share.Types (codeserverBaseURL) import Unison.Sync.Common qualified as Sync.Common import Unison.Sync.Types qualified as Share +import Unison.SyncV2.Types qualified as SyncV2 + +data SyncVersion = SyncV1 | SyncV2 -- | Download a project/branch from Share. downloadProjectBranchFromShare :: (HasCallStack) => + SyncVersion -> Share.IncludeSquashedHead -> Share.RemoteProjectBranch -> Cli (Either Output.ShareError CausalHash) -downloadProjectBranchFromShare useSquashed branch = +downloadProjectBranchFromShare syncVersion useSquashed branch = Cli.labelE \done -> do let remoteProjectBranchName = branch.branchName - let repoInfo = Share.RepoInfo (into @Text (ProjectAndBranch branch.projectName remoteProjectBranchName)) causalHashJwt <- case (useSquashed, branch.squashedBranchHead) of (Share.IncludeSquashedHead, Nothing) -> done Output.ShareExpectedSquashedHead @@ -49,16 +55,32 @@ downloadProjectBranchFromShare useSquashed branch = (Share.NoSquashedHead, _) -> pure branch.branchHead exists <- Cli.runTransaction (Queries.causalExistsByHash32 (Share.hashJWTHash causalHashJwt)) when (not exists) do - (result, numDownloaded) <- - Cli.with withEntitiesDownloadedProgressCallback \(downloadedCallback, getNumDownloaded) -> do - result <- Share.downloadEntities Share.hardCodedBaseUrl repoInfo causalHashJwt downloadedCallback - numDownloaded <- liftIO getNumDownloaded - pure (result, numDownloaded) - result & onLeft \err0 -> do - done case err0 of - Share.SyncError err -> Output.ShareErrorDownloadEntities err - Share.TransportError err -> Output.ShareErrorTransport err - Cli.respond (Output.DownloadedEntities numDownloaded) + case syncVersion of + SyncV1 -> do + let repoInfo = Share.RepoInfo (into @Text (ProjectAndBranch branch.projectName remoteProjectBranchName)) + Cli.with withEntitiesDownloadedProgressCallback \(downloadedCallback, getNumDownloaded) -> do + result <- Share.downloadEntities Share.hardCodedBaseUrl repoInfo causalHashJwt downloadedCallback + numDownloaded <- liftIO getNumDownloaded + result & onLeft \err0 -> do + done case err0 of + Share.SyncError err -> Output.ShareErrorDownloadEntities err + Share.TransportError err -> Output.ShareErrorTransport err + Cli.respond (Output.DownloadedEntities numDownloaded) + SyncV2 -> do + -- Cli.with withEntitiesDownloadedProgressCallback \(downloadedCallback, getNumDownloaded) -> do + let branchRef = SyncV2.BranchRef (into @Text (ProjectAndBranch branch.projectName remoteProjectBranchName)) + -- TODO: Fill this in. + let knownHashes = Set.empty + let downloadedCallback = \_ -> pure () + let shouldValidate = not $ Codeserver.isCustomCodeserver Codeserver.defaultCodeserver + result <- SyncV2.syncFromCodeserver shouldValidate Share.hardCodedBaseUrl branchRef causalHashJwt knownHashes downloadedCallback + result & onLeft \err0 -> do + done case err0 of + Share.SyncError err -> + -- TODO: Fix this + error (show err) + -- Output.ShareErrorDownloadEntities err + Share.TransportError err -> Output.ShareErrorTransport err pure (Sync.Common.hash32ToCausalHash (Share.hashJWTHash causalHashJwt)) -- | Download loose code from Share. diff --git a/unison-cli/src/Unison/Codebase/Editor/HandleInput/InstallLib.hs b/unison-cli/src/Unison/Codebase/Editor/HandleInput/InstallLib.hs index 52e70188c8..299f30ba47 100644 --- a/unison-cli/src/Unison/Codebase/Editor/HandleInput/InstallLib.hs +++ b/unison-cli/src/Unison/Codebase/Editor/HandleInput/InstallLib.hs @@ -60,7 +60,7 @@ handleInstallLib remind (ProjectAndBranch libdepProjectName unresolvedLibdepBran Cli.Env {codebase} <- ask causalHash <- - downloadProjectBranchFromShare Share.IncludeSquashedHead libdepProjectBranch + downloadProjectBranchFromShare SyncV1 Share.IncludeSquashedHead libdepProjectBranch & onLeftM (Cli.returnEarly . Output.ShareError) remoteBranchObject <- liftIO (Codebase.expectBranchForHash codebase causalHash) diff --git a/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectClone.hs b/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectClone.hs index 8a872d18b8..670a730b5e 100644 --- a/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectClone.hs +++ b/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectClone.hs @@ -13,7 +13,7 @@ import U.Codebase.Sqlite.Project qualified as Sqlite (Project (..)) import U.Codebase.Sqlite.ProjectBranch qualified as Sqlite import U.Codebase.Sqlite.Queries qualified as Q import U.Codebase.Sqlite.Queries qualified as Queries -import Unison.Cli.DownloadUtils (downloadProjectBranchFromShare) +import Unison.Cli.DownloadUtils (SyncVersion (..), downloadProjectBranchFromShare) import Unison.Cli.Monad (Cli) import Unison.Cli.Monad qualified as Cli import Unison.Cli.MonadUtils qualified as Cli (getCurrentProjectAndBranch) @@ -225,7 +225,7 @@ cloneInto localProjectBranch remoteProjectBranch = do let remoteProjectBranchNames = ProjectAndBranch remoteProjectName remoteBranchName branchHead <- - downloadProjectBranchFromShare Share.NoSquashedHead remoteProjectBranch + downloadProjectBranchFromShare SyncV1 Share.NoSquashedHead remoteProjectBranch & onLeftM (Cli.returnEarly . Output.ShareError) localProjectAndBranch <- diff --git a/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectCreate.hs b/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectCreate.hs index e9f6e99e95..0096a91d8d 100644 --- a/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectCreate.hs +++ b/unison-cli/src/Unison/Codebase/Editor/HandleInput/ProjectCreate.hs @@ -13,7 +13,7 @@ import U.Codebase.Sqlite.Project (Project (..)) import U.Codebase.Sqlite.ProjectBranch (ProjectBranch (..)) import U.Codebase.Sqlite.Queries (expectCausalHashIdByCausalHash) import U.Codebase.Sqlite.Queries qualified as Queries -import Unison.Cli.DownloadUtils (downloadProjectBranchFromShare) +import Unison.Cli.DownloadUtils (SyncVersion (..), downloadProjectBranchFromShare) import Unison.Cli.Monad (Cli) import Unison.Cli.Monad qualified as Cli import Unison.Cli.Share.Projects qualified as Share @@ -108,7 +108,7 @@ projectCreate tryDownloadingBase maybeProjectName = do Share.GetProjectBranchResponseBranchNotFound -> done Nothing Share.GetProjectBranchResponseProjectNotFound -> done Nothing Share.GetProjectBranchResponseSuccess branch -> pure branch - downloadProjectBranchFromShare Share.NoSquashedHead baseLatestReleaseBranch + downloadProjectBranchFromShare SyncV1 Share.NoSquashedHead baseLatestReleaseBranch & onLeftM (Cli.returnEarly . Output.ShareError) Cli.Env {codebase} <- ask baseLatestReleaseBranchObject <- diff --git a/unison-cli/src/Unison/Codebase/Editor/HandleInput/Pull.hs b/unison-cli/src/Unison/Codebase/Editor/HandleInput/Pull.hs index 3ff7012220..42aebf0299 100644 --- a/unison-cli/src/Unison/Codebase/Editor/HandleInput/Pull.hs +++ b/unison-cli/src/Unison/Codebase/Editor/HandleInput/Pull.hs @@ -59,6 +59,7 @@ handlePull unresolvedSourceAndTarget pullMode = do ReadShare'LooseCode repo -> downloadLooseCodeFromShare repo & onLeftM (Cli.returnEarly . Output.ShareError) ReadShare'ProjectBranch remoteBranch -> downloadProjectBranchFromShare + SyncV1 ( case pullMode of Input.PullWithHistory -> Share.NoSquashedHead Input.PullWithoutHistory -> Share.IncludeSquashedHead diff --git a/unison-cli/src/Unison/Codebase/Editor/HandleInput/SyncV2.hs b/unison-cli/src/Unison/Codebase/Editor/HandleInput/SyncV2.hs index f34a64302a..3e3c7ba5ec 100644 --- a/unison-cli/src/Unison/Codebase/Editor/HandleInput/SyncV2.hs +++ b/unison-cli/src/Unison/Codebase/Editor/HandleInput/SyncV2.hs @@ -2,6 +2,7 @@ module Unison.Codebase.Editor.HandleInput.SyncV2 ( handleSyncToFile, handleSyncFromFile, handleSyncFromCodebase, + handleSyncFromCodeserver, ) where @@ -21,6 +22,7 @@ import Unison.Prelude import Unison.Project (ProjectAndBranch (..), ProjectBranchName, ProjectName) import Unison.Share.SyncV2 qualified as SyncV2 import Unison.SyncV2.Types (BranchRef) +import Unison.Cli.DownloadUtils (SyncVersion, downloadProjectBranchFromShare) handleSyncToFile :: FilePath -> ProjectAndBranch (Maybe ProjectName) (Maybe ProjectBranchName) -> Cli () handleSyncToFile destSyncFile branchToSync = do @@ -69,3 +71,31 @@ handleSyncFromCodebase description srcCodebasePath srcBranch destBranch = do Cli.setProjectBranchRootToCausalHash (projectBranch ^. #branch) description causalHash Right (Right (Left syncErr)) -> do Cli.respond (Output.SyncPullError syncErr) + +handleSyncFromCodebase :: Text -> CodebasePath -> ProjectAndBranch ProjectName ProjectBranchName -> ProjectAndBranch (Maybe ProjectName) ProjectBranchName -> Cli () +handleSyncFromCodebase description srcCodebasePath srcBranch destBranch = do + Cli.Env {codebase} <- ask + pp <- Cli.getCurrentProjectPath + projectBranch <- Project.resolveProjectBranchInProject (pp ^. #project) (over #branch Just destBranch) + r <- liftIO $ Init.withOpenCodebase SqliteCodebase.init "sync-src" srcCodebasePath Init.DontLock (Init.MigrateAfterPrompt Init.Backup Init.Vacuum) \srcCodebase -> do + Codebase.withConnection srcCodebase \srcConn -> do + maySrcCausalHash <- Codebase.runTransaction srcCodebase $ do + let ProjectAndBranch srcProjName srcBranchName = srcBranch + runMaybeT do + project <- MaybeT (Q.loadProjectByName srcProjName) + branch <- MaybeT (Q.loadProjectBranchByName (project ^. #projectId) srcBranchName) + lift $ Project.getProjectBranchCausalHash branch + case maySrcCausalHash of + Nothing -> pure $ Left (error "Todo proper error") + Just srcCausalHash -> do + let shouldValidate = True + fmap (const srcCausalHash) <$> liftIO (SyncV2.syncFromCodebase shouldValidate srcConn codebase srcCausalHash) + + case r of + Left _err -> pure $ error "Todo proper error" + Right (Left syncErr) -> Cli.respond (Output.SyncPullError syncErr) + Right (Right causalHash) -> do + Cli.setProjectBranchRootToCausalHash (projectBranch ^. #branch) description causalHash + +handleSyncFromCodeserver :: SyncVersion -> Projects.IncludeSquashedHead -> Projects.RemoteProjectBranch -> Cli (Either Output.ShareError CausalHash) +handleSyncFromCodeserver = downloadProjectBranchFromShare diff --git a/unison-cli/src/Unison/Share/Sync/Util.hs b/unison-cli/src/Unison/Share/Sync/Util.hs new file mode 100644 index 0000000000..39eeb2cede --- /dev/null +++ b/unison-cli/src/Unison/Share/Sync/Util.hs @@ -0,0 +1,42 @@ +module Unison.Share.Sync.Util + ( BailT (..), + MonadBail (..), + runBailT, + mapBailT, + withError, + ) +where + +import Control.Monad.Reader (MonadReader (..), ReaderT (..), mapReaderT, withReaderT) +import Data.Data (Typeable) +import UnliftIO qualified as IO + +newtype Handler e = Handler {runHandler :: forall x. e -> IO x} + +newtype BailT e m a = BailT {unErrGroupT :: ReaderT (Handler e) m a} + deriving newtype (Functor, Applicative, Monad, IO.MonadUnliftIO, IO.MonadIO) + +newtype ExceptionWrapper e = ExceptionWrapper {unException :: e} + +instance Show (ExceptionWrapper e) where + show (ExceptionWrapper _) = "ExceptionWrapper<>" + +instance (Typeable e) => IO.Exception (ExceptionWrapper e) + +class MonadBail e m where + bail :: e -> m a + +mapBailT :: (Monad n) => (m a -> n b) -> BailT e m a -> BailT e n b +mapBailT f (BailT m) = BailT $ mapReaderT f $ m + +withError :: (Monad m) => (e' -> e) -> BailT e' m a -> BailT e m a +withError f (BailT m) = BailT $ withReaderT (\h -> Handler $ runHandler h . f) m + +instance (IO.MonadUnliftIO m, Typeable e) => MonadBail e (BailT e m) where + bail e = do + handler <- BailT ask + BailT $ IO.liftIO $ runHandler handler e + +runBailT :: (IO.MonadUnliftIO m, Typeable e) => BailT e m a -> (e -> m a) -> m a +runBailT (BailT m) handler = do + IO.handle (handler . unException) $ runReaderT m (Handler (IO.throwIO . ExceptionWrapper)) diff --git a/unison-cli/src/Unison/Share/SyncV2.hs b/unison-cli/src/Unison/Share/SyncV2.hs index bcfccd85c3..dd90bdac75 100644 --- a/unison-cli/src/Unison/Share/SyncV2.hs +++ b/unison-cli/src/Unison/Share/SyncV2.hs @@ -48,6 +48,12 @@ import Unison.SyncV2.Types qualified as SyncV2 import Unison.Util.Servant.CBOR qualified as CBOR import Unison.Util.Timing qualified as Timing import UnliftIO qualified as IO +import Unison.SyncV2.API (Routes (downloadEntitiesStream)) +import Unison.SyncV2.API qualified as SyncV2 +import Data.Attoparsec.ByteString qualified as A +import Data.Attoparsec.ByteString.Char8 qualified as A8 +import Data.Conduit.Attoparsec qualified as C + type Stream i o = ConduitT i o StreamM () @@ -281,6 +287,23 @@ withCodebaseEntityStream conn rootHash mayBranchRef callback = do lift . Sqlite.unsafeIO $ counter 1 traverseOf_ Sync.entityHashes_ expandEntities entity +-- | Gets the framed chunks from a NetString framed stream. +_unNetString :: ConduitT ByteString ByteString StreamM () +_unNetString = do + bs <- C.sinkParser $ do + len <- A8.decimal + _ <- A8.char ':' + bs <- A.take len + _ <- A8.char ',' + pure bs + C.yield bs + +_decodeFramedEntity :: ByteString -> StreamM SyncV2.DownloadEntitiesChunk +_decodeFramedEntity bs = do + case CBOR.deserialiseOrFail (BL.fromStrict bs) of + Left err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err + Right chunk -> pure chunk + -- Expects a stream of tightly-packed CBOR entities without any framing/separators. decodeUnframedEntities :: Stream ByteString SyncV2.DownloadEntitiesChunk decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do @@ -329,6 +352,78 @@ decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do k <- newDecoder loop rem k +------------------------------------------------------------------------------------------------------------------------ +-- Servant stuff + +type SyncAPI = ("ucm" Servant.:> "v2" Servant.:> "sync" Servant.:> SyncV2.API) + +syncAPI :: Proxy SyncAPI +syncAPI = Proxy @SyncAPI + +downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO SyncV2.DownloadEntitiesChunk) +SyncV2.Routes + { downloadEntitiesStream = downloadEntitiesStreamClientM + } = Servant.client syncAPI + +-- -- | Helper for running clientM that returns a stream of entities. +-- -- You MUST consume the stream within the callback, it will be closed when the callback returns. +-- handleStream :: forall m o. (MonadUnliftIO m) => Servant.ClientEnv -> (o -> m ()) -> Servant.ClientM (Servant.SourceIO o) -> m (Either CodeserverTransportError ()) +-- handleStream clientEnv callback clientM = do +-- handleSourceT clientEnv (SourceT.foreach (throwError . StreamingError . Text.pack) callback) clientM + +-- | Helper for running clientM that returns a stream of entities. +-- You MUST consume the stream within the callback, it will be closed when the callback returns. +withConduit :: forall r. Servant.ClientEnv -> (Stream () SyncV2.DownloadEntitiesChunk -> StreamM r) -> Servant.ClientM (Servant.SourceIO SyncV2.DownloadEntitiesChunk) -> StreamM r +withConduit clientEnv callback clientM = do + Debug.debugLogM Debug.Temp $ "Running clientM" + ExceptT $ withRunInIO \runInIO -> do + Servant.withClientM clientM clientEnv $ \case + Left err -> pure . Left . TransportError $ (handleClientError clientEnv err) + Right sourceT -> do + Debug.debugLogM Debug.Temp $ "Converting sourceIO to conduit" + conduit <- liftIO $ Servant.fromSourceIO sourceT + (runInIO . runExceptT $ callback conduit) + +handleClientError :: Servant.ClientEnv -> Servant.ClientError -> CodeserverTransportError +handleClientError clientEnv err = + case err of + Servant.FailureResponse _req resp -> + case HTTP.statusCode $ Servant.responseStatusCode resp of + 401 -> Unauthenticated (Servant.baseUrl clientEnv) + -- The server should provide semantically relevant permission-denied messages + -- when possible, but this should catch any we miss. + 403 -> PermissionDenied (Text.Lazy.toStrict . Text.Lazy.decodeUtf8 $ Servant.responseBody resp) + 408 -> Timeout + 429 -> RateLimitExceeded + 504 -> Timeout + _ -> UnexpectedResponse resp + Servant.DecodeFailure msg resp -> DecodeFailure msg resp + Servant.UnsupportedContentType _ct resp -> UnexpectedResponse resp + Servant.InvalidContentTypeHeader resp -> UnexpectedResponse resp + Servant.ConnectionError _ -> UnreachableCodeserver (Servant.baseUrl clientEnv) + +httpStreamEntities :: + forall. + Auth.AuthenticatedHttpClient -> + Servant.BaseUrl -> + SyncV2.DownloadEntitiesRequest -> + (SyncV2.StreamInitInfo -> Stream () SyncV2.EntityChunk -> StreamM ()) -> + StreamM () +httpStreamEntities (Auth.AuthenticatedHttpClient httpClient) unisonShareUrl req callback = do + let clientEnv = + (Servant.mkClientEnv httpClient unisonShareUrl) + { Servant.makeClientRequest = \url request -> + -- Disable client-side timeouts + (Servant.defaultMakeClientRequest url request) + <&> \r -> + r + { Http.Client.responseTimeout = Http.Client.responseTimeoutNone + } + } + (downloadEntitiesStreamClientM req) & withConduit clientEnv \stream -> do + (init, entityStream) <- initializeStream stream + callback init entityStream + -- | Peel the header off the stream and parse the remaining entity chunks into EntityChunks initializeStream :: Stream () SyncV2.DownloadEntitiesChunk -> StreamM (SyncV2.StreamInitInfo, Stream () SyncV2.EntityChunk) initializeStream stream = do diff --git a/unison-cli/unison-cli.cabal b/unison-cli/unison-cli.cabal index e1f51a7633..795180729d 100644 --- a/unison-cli/unison-cli.cabal +++ b/unison-cli/unison-cli.cabal @@ -152,6 +152,7 @@ library Unison.Share.ExpectedHashMismatches Unison.Share.Sync Unison.Share.Sync.Types + Unison.Share.Sync.Util Unison.Share.SyncV2 Unison.Util.HTTP Unison.Version diff --git a/unison-share-api/src/Unison/SyncV2/API.hs b/unison-share-api/src/Unison/SyncV2/API.hs new file mode 100644 index 0000000000..71ea8693d3 --- /dev/null +++ b/unison-share-api/src/Unison/SyncV2/API.hs @@ -0,0 +1,29 @@ +{-# LANGUAGE DataKinds #-} + +module Unison.SyncV2.API + ( API, + api, + Routes (..), + ) +where + +import Data.Proxy +import GHC.Generics (Generic) +import Servant.API +import Unison.SyncV2.Types +import Unison.Util.Servant.CBOR (CBOR) + +api :: Proxy API +api = Proxy + +type API = NamedRoutes Routes + +type DownloadEntitiesStream = + -- | The causal hash the client needs. The server should provide it and all of its dependencies + ReqBody '[CBOR, JSON] DownloadEntitiesRequest + :> StreamPost NetstringFraming CBOR (SourceIO DownloadEntitiesChunk) + +data Routes mode = Routes + { downloadEntitiesStream :: mode :- "entities" :> "download" :> DownloadEntitiesStream + } + deriving stock (Generic) diff --git a/unison-share-api/unison-share-api.cabal b/unison-share-api/unison-share-api.cabal index aaacbda4fd..a9447279da 100644 --- a/unison-share-api/unison-share-api.cabal +++ b/unison-share-api/unison-share-api.cabal @@ -48,6 +48,7 @@ library Unison.Sync.Common Unison.Sync.EntityValidation Unison.Sync.Types + Unison.SyncV2.API Unison.SyncV2.Types Unison.Util.Find Unison.Util.Servant.CBOR