diff --git a/docs/demo-client.md b/docs/demo-client.md index 84e6fb98..15d7b3e6 100644 --- a/docs/demo-client.md +++ b/docs/demo-client.md @@ -169,7 +169,7 @@ $ cabal run demo-client -- --core sayHelloBidiStream \ {message: "John Ack"} Disconnected. Reconnecting after 1701081μs Reconnecting now. -demo-client: GrpcException {grpcError = GrpcUnknown, grpcErrorMessage = Just "Call closed without trailers", grpcErrorMetadata = []} +demo-client: demo-client: ServerDisconnected {serverDisconnectedException = ..., ...} ``` ### Dealing with unterminated streams diff --git a/grapesy.cabal b/grapesy.cabal index e42f773d..268dc349 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -37,6 +37,7 @@ common lang -Wno-unticked-promoted-constructors -Wprepositive-qualified-module -Widentities + -Wmissing-export-lists build-depends: base >= 4.14 && < 4.21 default-language: @@ -309,14 +310,18 @@ test-suite test-grapesy Test.Prop.IncrementalParsing Test.Prop.Serialization Test.Sanity.BrokenDeployments + Test.Sanity.Disconnect Test.Sanity.EndOfStream + Test.Sanity.Exception Test.Sanity.Interop Test.Sanity.StreamingType.CustomFormat Test.Sanity.StreamingType.NonStreaming Test.Util Test.Util.Awkward + Test.Util.Exception Test.Util.Orphans Test.Util.Protobuf + Test.Util.RawTestServer -- Internals we're testing Network.GRPC.Util.Parser @@ -325,6 +330,7 @@ test-suite test-grapesy Proto.API.Interop Proto.API.Ping + Proto.API.Trivial Proto.Empty Proto.Messages Proto.Ping @@ -348,7 +354,6 @@ test-suite test-grapesy , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 - , network-run >= 0.4 && < 0.5 , prettyprinter >= 1.7 && < 1.8 , prettyprinter-ansi-terminal >= 1.1 && < 1.2 , proto-lens >= 0.7 && < 0.8 @@ -360,9 +365,11 @@ test-suite test-grapesy , tasty >= 1.4 && < 1.6 , tasty-hunit >= 0.10 && < 0.11 , tasty-quickcheck >= 0.10 && < 0.12 + , temporary >= 1.3 && < 1.4 , text >= 1.2 && < 2.2 , tls >= 1.7 && < 2.2 , tree-diff >= 0.3 && < 0.4 + , unix >= 2.7 && < 2.9 , utf8-string >= 1.0 && < 1.1 executable demo-client diff --git a/interop/Interop/Client/TestCase/CustomMetadata.hs b/interop/Interop/Client/TestCase/CustomMetadata.hs index a567298f..20daf465 100644 --- a/interop/Interop/Client/TestCase/CustomMetadata.hs +++ b/interop/Interop/Client/TestCase/CustomMetadata.hs @@ -21,7 +21,7 @@ import Proto.API.Interop -- For both UnaryCall and FullDuplexCall, the reference server (at least some) -- does not return any initial metadata until we send the first request. The -- test spec does not specify whether this is expected behaviour or not, so we --- play it save and only ask for the initial metadata after sending the request. +-- play it safe and only ask for the initial metadata after sending the request. runTest :: Cmdline -> IO () runTest cmdline = do withConnection def (testServer cmdline) $ \conn -> do diff --git a/interop/Main.hs b/interop/Main.hs index d1305ec5..d893fd2b 100644 --- a/interop/Main.hs +++ b/interop/Main.hs @@ -1,4 +1,4 @@ -module Main where +module Main (main) where import GHC.Conc (setUncaughtExceptionHandler) import System.IO diff --git a/proto/Proto/API/Trivial.hs b/proto/Proto/API/Trivial.hs new file mode 100644 index 00000000..b09b9f43 --- /dev/null +++ b/proto/Proto/API/Trivial.hs @@ -0,0 +1,21 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +module Proto.API.Trivial + ( -- * Trivial RPC + Trivial + , Trivial' + ) where + +import Network.GRPC.Common +import Network.GRPC.Spec + +{------------------------------------------------------------------------------- + Trivial RPC +-------------------------------------------------------------------------------} + +type Trivial = RawRpc "trivial" "trivial" +type Trivial' s = RawRpc "trivial" s + +type instance RequestMetadata (Trivial' s) = NoMetadata +type instance ResponseInitialMetadata (Trivial' s) = NoMetadata +type instance ResponseTrailingMetadata (Trivial' s) = NoMetadata diff --git a/src/Network/GRPC/Client.hs b/src/Network/GRPC/Client.hs index 271733fc..9738d19b 100644 --- a/src/Network/GRPC/Client.hs +++ b/src/Network/GRPC/Client.hs @@ -9,6 +9,7 @@ module Network.GRPC.Client ( -- ** Reconnection policy , ReconnectPolicy(..) + , ReconnectTo(..) , exponentialBackoff -- ** Connection parameters diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs index 745011c7..070474e5 100644 --- a/src/Network/GRPC/Client/Connection.hs +++ b/src/Network/GRPC/Client/Connection.hs @@ -16,6 +16,7 @@ module Network.GRPC.Client.Connection ( , SslKeyLog(..) , ConnParams(..) , ReconnectPolicy(..) + , ReconnectTo(..) , exponentialBackoff -- * Using the connection , connParams @@ -165,13 +166,30 @@ data ReconnectPolicy = -- connection), do not attempt to connect again. DontReconnect - -- | Reconnect after random delay after the IO action returns + -- | Reconnect to the (potentially different) server after the IO action + -- returns + -- + -- The 'ReconnectTo' can be used to implement a rudimentary redundancy + -- scheme. For example, you could decide to reconnect to a known fallback + -- server after connection to a main server fails a certain number of times. -- -- This is a very general API: typically the IO action will call -- 'threadDelay' after some amount of time (which will typically involve -- some randomness), but it can be used to do things such as display a -- message to the user somewhere that the client is reconnecting. - | ReconnectAfter (IO ReconnectPolicy) + | ReconnectAfter ReconnectTo (IO ReconnectPolicy) + +-- | What server should we attempt to reconnect to? +-- +-- * 'ReconnectToPrevious' will attempt to reconnect to the last server we +-- attempted to connect to, whether or not that attempt was successful. +-- * 'ReconnectToOriginal' will attempt to reconnect to the original server that +-- 'withConnection' was given. +-- * 'ReconnectToNew' will attempt to connect to the newly specified server. +data ReconnectTo = + ReconnectToPrevious + | ReconnectToOriginal + | ReconnectToNew Server -- | The default policy is 'DontReconnect' -- @@ -180,6 +198,9 @@ data ReconnectPolicy = instance Default ReconnectPolicy where def = DontReconnect +instance Default ReconnectTo where + def = ReconnectToPrevious + -- | Exponential backoff -- -- If the exponent is @1@, the delay interval will be the same every step; @@ -207,7 +228,7 @@ exponentialBackoff waitFor e = go where go :: (Double, Double) -> Word -> ReconnectPolicy go _ 0 = DontReconnect - go (lo, hi) n = ReconnectAfter $ do + go (lo, hi) n = ReconnectAfter def $ do delay <- randomRIO (lo, hi) waitFor $ round $ delay * 1_000_000 return $ go (lo * e, hi * e) (pred n) @@ -378,11 +399,11 @@ stayConnected :: -> TVar ConnectionState -> MVar () -> IO () -stayConnected connParams server connStateVar connOutOfScope = - loop (connReconnectPolicy connParams) +stayConnected connParams initialServer connStateVar connOutOfScope = do + loop initialServer (connReconnectPolicy connParams) where - loop :: ReconnectPolicy -> IO () - loop remainingReconnectPolicy = do + loop :: Server -> ReconnectPolicy -> IO () + loop server remainingReconnectPolicy = do -- Start new attempt (this just allocates some internal state) attempt <- newConnectionAttempt connParams connStateVar connOutOfScope @@ -425,9 +446,16 @@ stayConnected connParams server connStateVar connOutOfScope = atomically $ writeTVar connStateVar $ ConnectionAbandoned err (False, DontReconnect) -> do atomically $ writeTVar connStateVar $ ConnectionAbandoned err - (False, ReconnectAfter f) -> do + atomically $ writeTVar connStateVar $ ConnectionAbandoned err + (False, ReconnectAfter to f) -> do + let + nextServer = + case to of + ReconnectToPrevious -> server + ReconnectToOriginal -> initialServer + ReconnectToNew new -> new atomically $ writeTVar connStateVar $ ConnectionNotReady - loop =<< f + loop nextServer =<< f -- | Insecure connection (no TLS) connectInsecure :: ConnParams -> Attempt -> Address -> IO () diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs index bbbdbbfc..4f7f4113 100644 --- a/src/Network/GRPC/Server/Call.hs +++ b/src/Network/GRPC/Server/Call.hs @@ -628,7 +628,7 @@ recvEndOfInput call@Call{} = do -- | Send 'ProperTrailers' -- --- This function is not part of the public API: we use it the top-level +-- This function is not part of the public API: we use it as the top-level -- exception handler in "Network.GRPC.Server" to forward exceptions in server -- handlers to the client. -- diff --git a/src/Network/GRPC/Server/Handler.hs b/src/Network/GRPC/Server/Handler.hs index 8aeaea15..f323bc1c 100644 --- a/src/Network/GRPC/Server/Handler.hs +++ b/src/Network/GRPC/Server/Handler.hs @@ -272,7 +272,7 @@ waitForHandler unmask call handlerThread = loop -- -- The attempt to forward it to the client is a best-effort only: -- --- * The nature of the exception might mean that we we cannot send anything to +-- * The nature of the exception might mean that we cannot send anything to -- the client at all. -- * It is possible the exception was thrown /after/ the handler already send -- the trailers to the client. diff --git a/test-grapesy/Main.hs b/test-grapesy/Main.hs index a3575150..a62daf31 100644 --- a/test-grapesy/Main.hs +++ b/test-grapesy/Main.hs @@ -17,7 +17,9 @@ import Test.Prop.Dialogue qualified as Dialogue import Test.Prop.IncrementalParsing qualified as IncrementalParsing import Test.Prop.Serialization qualified as Serialization import Test.Sanity.BrokenDeployments qualified as BrokenDeployments +import Test.Sanity.Disconnect qualified as Disconnect import Test.Sanity.EndOfStream qualified as EndOfStream +import Test.Sanity.Exception qualified as Exception import Test.Sanity.Interop qualified as Interop import Test.Sanity.StreamingType.CustomFormat qualified as StreamingType.CustomFormat import Test.Sanity.StreamingType.NonStreaming qualified as StreamingType.NonStreaming @@ -28,11 +30,13 @@ main = do defaultMain $ testGroup "grapesy" [ testGroup "Sanity" [ - EndOfStream.tests + Disconnect.tests + , EndOfStream.tests , testGroup "StreamingType" [ StreamingType.NonStreaming.tests , StreamingType.CustomFormat.tests ] + , Exception.tests , Interop.tests , BrokenDeployments.tests ] diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index d9d70bb4..bbde47ca 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -40,6 +40,7 @@ import Network.GRPC.Common import Network.GRPC.Common.Compression qualified as Compr import Network.GRPC.Server qualified as Server import Network.GRPC.Server.Run qualified as Server +import Test.Util.Exception import Paths_grapesy @@ -168,12 +169,6 @@ data TlsFail = we don't see these exceptions server-side. -------------------------------------------------------------------------------} --- | Exception thrown by client or handler to test exception handling -data DeliberateException = forall e. Exception e => DeliberateException e - deriving anyclass (Exception) - -deriving stock instance Show DeliberateException - isExpectedServerException :: ClientServerConfig -> SomeException -> Bool isExpectedServerException cfg e -- @@ -232,7 +227,7 @@ isExpectedClientException cfg e | Just (DeliberateException _) <- fromException e = True - -- Server threw deliberat exception + -- Server threw deliberate exception | Just grpcException <- fromException e , Just msg <- grpcErrorMessage grpcException , "DeliberateException" `Text.isInfixOf` msg @@ -538,7 +533,7 @@ runTestClient cfg firstTestFailure port clientRun = do -- This avoids a race condition between the server starting first -- and the client starting first. , connReconnectPolicy = - Client.ReconnectAfter $ do + Client.ReconnectAfter def $ do threadDelay 100_000 return Client.DontReconnect } diff --git a/test-grapesy/Test/Driver/Dialogue/Definition.hs b/test-grapesy/Test/Driver/Dialogue/Definition.hs index 802de12d..53541d04 100644 --- a/test-grapesy/Test/Driver/Dialogue/Definition.hs +++ b/test-grapesy/Test/Driver/Dialogue/Definition.hs @@ -20,7 +20,6 @@ module Test.Driver.Dialogue.Definition ( , hasEarlyTermination ) where -import Control.Exception import Control.Monad.State (StateT, execStateT, modify) import Data.Bifunctor import Data.ByteString qualified as Strict (ByteString) @@ -28,6 +27,7 @@ import Data.ByteString qualified as Strict (ByteString) import Network.GRPC.Common import Test.Driver.Dialogue.TestClock qualified as TestClock +import Test.Util.Exception import Control.Monad.Catch import GHC.Show (appPrec1, showCommaSpace) @@ -154,25 +154,6 @@ newtype GlobalSteps = GlobalSteps { } deriving stock (Show) -{------------------------------------------------------------------------------- - User exceptions - - When a test calls for the client or the server to throw an exception, we throw - one of these. Their sole purpose is to be "any" kind of exception (not a - specific one). --------------------------------------------------------------------------------} - -data SomeServerException = SomeServerException ExceptionId - deriving stock (Show, Eq) - deriving anyclass (Exception) - -data SomeClientException = SomeClientException ExceptionId - deriving stock (Show, Eq) - deriving anyclass (Exception) - --- | We distinguish exceptions from each other simply by a number -type ExceptionId = Int - {------------------------------------------------------------------------------- Utility -------------------------------------------------------------------------------} diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 25f2ba03..13eea7bb 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -413,6 +413,8 @@ serverLocal clock call = \(LocalSteps steps) -> do Terminate mErr -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call + -- TODO: + -- -- On the server side we cannot distinguish regular client -- termination from an exception when receiving. let expectation = isExpectedElem $ NoMoreElems NoMetadata @@ -426,6 +428,12 @@ serverLocal clock call = \(LocalSteps steps) -> do -- terminate more-or-less immediately, this does not necessarily indicate -- any kind of failure: the client may simply have put the call in -- half-closed mode. + -- + -- TODO: + -- However, when the client terminates early and we are not using one + -- connection per RPC (i.e. we are sharing a connection), the server will + -- /never/ realize that the client has disappeared. See the discussion in + -- the issue above. waitForClientDisconnect :: IO () waitForClientDisconnect = within timeoutFailure () $ loop diff --git a/test-grapesy/Test/Sanity/BrokenDeployments.hs b/test-grapesy/Test/Sanity/BrokenDeployments.hs index a36bb613..ec0ea9a1 100644 --- a/test-grapesy/Test/Sanity/BrokenDeployments.hs +++ b/test-grapesy/Test/Sanity/BrokenDeployments.hs @@ -3,26 +3,18 @@ module Test.Sanity.BrokenDeployments (tests) where -import Control.Concurrent -import Control.Concurrent.Async import Control.Exception -import Data.ByteString qualified as BS.Strict -import Data.ByteString qualified as Strict (ByteString) -import Data.ByteString.Builder qualified as BS.Builder import Data.ByteString.Char8 qualified as BS.Strict.Char8 import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 -import Data.String (fromString) import Data.Text qualified as Text import Network.HTTP.Types qualified as HTTP -import Network.HTTP2.Server qualified as HTTP2 -import Network.Run.TCP qualified as NetworkRun -import Network.Socket import Test.Tasty import Test.Tasty.HUnit import Network.GRPC.Client qualified as Client import Network.GRPC.Common import Network.GRPC.Common.Protobuf +import Test.Util.RawTestServer import Proto.API.Ping @@ -54,6 +46,11 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [ ] ] +connParams :: Client.ConnParams +connParams = def { + Client.connVerifyHeaders = True + } + {------------------------------------------------------------------------------- HTTP Status -------------------------------------------------------------------------------} @@ -322,101 +319,6 @@ test_invalidTrailerMetadata = respondWith response $ \addr -> do someInvalidMetadata :: String someInvalidMetadata = "This is invalid: 你好" -{------------------------------------------------------------------------------- - Test server - - This allows us to simulate broken /servers/. --------------------------------------------------------------------------------} - -data Response = Response { - responseStatus :: HTTP.Status - , responseHeaders :: [HTTP.Header] - , responseBody :: Strict.ByteString - , responseTrailers :: [HTTP.Header] - } - -instance Default Response where - def = Response { - responseStatus = HTTP.ok200 - , responseHeaders = [ asciiHeader "content-type" "application/grpc" ] - , responseBody = BS.Strict.empty - , responseTrailers = [ asciiHeader "grpc-status" "0" ] - } - --- | Server that responds with the given 'Response', independent of the request -respondWith :: Response -> (Client.Address -> IO a) -> IO a -respondWith response = withTestServer $ \_req _aux respond -> - respond http2Response [] - where - http2Response :: HTTP2.Response - http2Response = - flip HTTP2.setResponseTrailersMaker trailersMaker $ - HTTP2.responseBuilder - (responseStatus response) - (responseHeaders response) - (BS.Builder.byteString $ responseBody response) - - trailersMaker :: HTTP2.TrailersMaker - trailersMaker Nothing = return $ HTTP2.Trailers (responseTrailers response) - trailersMaker (Just _) = return $ HTTP2.NextTrailersMaker trailersMaker - --- | Low-level test server --- --- We bypass the entire grapesy machinery for constructing the server, because --- we need to mock a broken deployment. --- --- The grapesy client can auto reconnect when the server is not (yet) up and --- running, but to keep things simple, and since the server anyway runs in the --- same process, we just signal when the server is ready. This also allows us --- to avoid binding to a specific port in the tests (which might already be in --- use on the machine running the tests, leading to spurious test failures). -testServer :: HTTP2.Server -> MVar PortNumber -> IO () -testServer server serverPort = do - addr <- NetworkRun.resolve Stream (Just "127.0.0.1") "0" [AI_PASSIVE] - bracket (NetworkRun.openTCPServerSocket addr) close $ \listenSock -> do - addr' <- getSocketName listenSock - port <- case addr' of - SockAddrInet port _host -> return port - SockAddrInet6 port _ _host _ -> return port - SockAddrUnix{} -> error "respondWith: unexpected unix socket" - putMVar serverPort port - NetworkRun.runTCPServerWithSocket listenSock $ \clientSock -> - bracket (HTTP2.allocSimpleConfig clientSock 4096) - HTTP2.freeSimpleConfig $ \config -> - HTTP2.run HTTP2.defaultServerConfig config server - -withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a -withTestServer server k = do - serverPort <- newEmptyMVar - withAsync (testServer server serverPort) $ \_serverThread -> do - port <- readMVar serverPort - let addr :: Client.Address - addr = Client.Address { - addressHost = "127.0.0.1" - , addressPort = port - , addressAuthority = Nothing - } - k addr - -{------------------------------------------------------------------------------- - Auxiliary --------------------------------------------------------------------------------} - -connParams :: Client.ConnParams -connParams = def { - Client.connVerifyHeaders = True - } - --- | Header with ASCII value --- --- (Header /names/ are always ASCII.) -asciiHeader :: String -> String -> HTTP.Header -asciiHeader name value = (fromString name, BS.Strict.Char8.pack value) - --- | Header with UTF-8 encoded value -utf8Header :: String -> String -> HTTP.Header -utf8Header name value = (fromString name, BS.Strict.UTF8.fromString value) - grpcMessageContains :: GrpcException -> String -> Bool grpcMessageContains GrpcException{grpcErrorMessage} str = case grpcErrorMessage of diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs new file mode 100644 index 00000000..4a265277 --- /dev/null +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -0,0 +1,320 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | Handling of client or server disconnections occurring with ongoing RPCs on +-- a shared connection. +-- +-- When a server disconnects, we expect: +-- +-- 1. All current calls fail with 'Client.ServerDisconnected' +-- 2. Future calls (after reconnection) succeed +-- +-- When a client disconnects, we expect: +-- +-- 1. The handlers dealing with that client (i.e. on that connection) should +-- fail with 'Server.ClientDisonnected' +-- 2. Future calls (after reconnection) succeed +module Test.Sanity.Disconnect (tests) where + +import Control.Concurrent +import Control.Concurrent.Async +import Control.Exception +import Control.Monad +import Data.ByteString.Lazy qualified as Lazy (ByteString) +import Data.Either +import Data.IORef +import Data.Maybe +import Data.Word +import Foreign.C.Types (CInt(..)) +import Network.Socket +import System.Posix +import Test.Tasty +import Test.Tasty.HUnit +import Text.Read hiding (step) + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Client.Binary qualified as Binary +import Network.GRPC.Common +import Network.GRPC.Server qualified as Server +import Network.GRPC.Server.Binary qualified as Binary +import Network.GRPC.Server.Run +import Network.GRPC.Spec +import Proto.API.Trivial +import Test.Util + +tests :: TestTree +tests = testGroup "Test.Sanity.Disconnect" [ + testCase "client" test_clientDisconnect + , testCase "server" test_serverDisconnect + ] + +-- | We want two distinct handlers running at the same time, so we have two +-- trivial RPCs +type RPC1 = Trivial' "rpc1" + +-- | See 'RPC1' +type RPC2 = Trivial' "rpc2" + +-- | Two separate clients make many concurrent calls, one of them disconnects. +test_clientDisconnect :: Assertion +test_clientDisconnect = do + -- Create the server + disconnectCounter1 <- newIORef 0 + disconnectCounter2 <- newIORef 0 + server <- + Server.mkGrpcServer def [ + Server.someRpcHandler $ + Server.mkRpcHandler @RPC1 $ echoHandler (Just disconnectCounter1) + , Server.someRpcHandler $ + Server.mkRpcHandler @RPC2 $ echoHandler (Just disconnectCounter2) + ] + + -- Start server + let serverConfig = ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } + portSignal <- newEmptyMVar + void $ forkIO $ forkServer def serverConfig server $ \runningServer -> do + putMVar portSignal =<< getServerPort runningServer + waitServer runningServer + + -- Wait for the server to signal its port + serverPort <- readMVar portSignal + let serverAddress = + Client.ServerInsecure Client.Address { + addressHost = "127.0.0.1" + , addressPort = serverPort + , addressAuthority = Nothing + } + + + -- Start a client in a separate process + let numCalls = 50 + void $ forkProcess $ + Client.withConnection def serverAddress $ \conn -> do + -- Make 50 concurrent calls. 49 of them sending infinite messages. One + -- of them kills this client process after 100 messages. + mapConcurrently_ + ( Client.withRPC conn def (Proxy @RPC1) + . runSteps + ) + $ replicate (numCalls - 1) stepsInfinite ++ + [ mkClientSteps Nothing [ (100, c_exit 1) ] ] + + -- Start two more clients that make 50 calls to each handler, all calls + -- counting up to 100 + let numSteps = 100 + steps = replicate numCalls $ stepsN numSteps + (result1, result2) <- concurrently + ( Client.withConnection def serverAddress $ \conn -> do + sum <$> mapConcurrently + ( Client.withRPC conn def (Proxy @RPC1) + . runSteps + ) + steps + ) + ( Client.withConnection def serverAddress $ \conn -> do + sum <$> mapConcurrently + ( Client.withRPC conn def (Proxy @RPC2) + . runSteps + ) + steps + ) + + -- All calls by clients in /this/ process (not the ones we killed) should + -- have finished with a result of 'countTo' + assertEqual "" + (2 * sum (replicate numCalls numSteps)) + (fromIntegral $ result1 + result2) + + -- We should also see only 50 client disconnects for the first handler and + -- none for the second + clientDisconnects1 <- readIORef disconnectCounter1 + clientDisconnects2 <- readIORef disconnectCounter2 + assertEqual "" 50 clientDisconnects1 + assertEqual "" 0 clientDisconnects2 + +-- | Client makes many concurrent calls, server disconnects +test_serverDisconnect :: Assertion +test_serverDisconnect = withTemporaryFile $ \ipcFile -> do + -- We use a temporary file as a very rudimentary means of inter-process + -- communication so the server (which runs in a separate process) can make + -- the client aware of the port it is assigned by the OS. + let ipcWrite :: PortNumber -> IO () + ipcWrite port = do + writeFile ipcFile (show port) + + ipcRead :: IO PortNumber + ipcRead = do + fmap (readMaybe @PortNumber) (readFile ipcFile) >>= \case + Nothing -> do + ipcRead + Just p -> do + writeFile ipcFile "" + return p + + -- Create the server + server <- + Server.mkGrpcServer def [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ echoHandler Nothing + ] + + let serverConfig = ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } + + -- Starts the server in a new process. Gives back an action that kills + -- the created server process. + startServer :: IO (IO ()) + startServer = do + serverPid <- + forkProcess $ + forkServer def serverConfig server $ \runningServer -> do + ipcWrite =<< getServerPort runningServer + waitServer runningServer + return $ signalProcess sigKILL serverPid + + -- Start server, get the initial port + killServer <- startServer + port1 <- ipcRead + signalRestart <- newEmptyMVar + let serverAddress port = + Client.ServerInsecure Client.Address { + addressHost = "127.0.0.1" + , addressPort = port + , addressAuthority = Nothing + } + + reconnectPolicy :: Client.ReconnectPolicy + reconnectPolicy = go 0 + where + go :: Int -> Client.ReconnectPolicy + go n + | n == 5 + = Client.ReconnectAfter def $ do + killRestarted <- startServer + port2 <- ipcRead + putMVar signalRestart killRestarted + return $ + Client.ReconnectAfter + (Client.ReconnectToNew $ serverAddress port2) + (pure Client.DontReconnect) + | otherwise + = Client.ReconnectAfter def $ do + threadDelay 10000 + return $ go (n + 1) + + connParams :: Client.ConnParams + connParams = def { Client.connReconnectPolicy = reconnectPolicy } + + Client.withConnection connParams (serverAddress port1) $ \conn -> do + -- Make 50 concurrent calls. 49 of them sending infinite messages. One + -- of them kills the server after 100 messages. + let numCalls = 50 + results <- + mapConcurrently + ( try @Client.ServerDisconnected + . Client.withRPC conn def (Proxy @Trivial) + . runSteps + ) + $ replicate (numCalls - 1) stepsInfinite ++ + [ mkClientSteps Nothing [(100, killServer)] ] + + -- All calls should have failed + assertBool "" (null (rights results)) + assertEqual "" numCalls (length (lefts results)) + + -- New calls should succeed (after reconnection) + killRestarted <- takeMVar signalRestart + result <- + Client.withRPC conn def (Proxy @Trivial) $ + runSteps (stepsN numCalls) + assertEqual "" numCalls (fromIntegral result) + + -- Do not leave the server process hanging around + killRestarted + +{------------------------------------------------------------------------------- + Client and handler functions +-------------------------------------------------------------------------------} + +-- | Execute the client steps +runSteps :: forall rpc. + ( Input rpc ~ Lazy.ByteString + , Output rpc ~ Lazy.ByteString + , ResponseTrailingMetadata rpc ~ NoMetadata + ) => ClientStep -> Client.Call rpc -> IO Word64 +runSteps = + go 0 + where + go :: Word64 -> ClientStep -> Client.Call rpc -> IO Word64 + go n step call = do + case step of + KeepGoing mact next -> do + fromMaybe (return ()) mact + Binary.sendNextInput @Word64 call n + _ <- Binary.recvNextOutput @Word64 call + go (n + 1) next call + Done -> do + Binary.sendFinalInput @Word64 call n + (_, NoMetadata) <- Binary.recvFinalOutput @Word64 call + return n + +-- | Echos any input +echoHandler :: + ( Input rpc ~ Lazy.ByteString + , Output rpc ~ Lazy.ByteString + , ResponseTrailingMetadata rpc ~ NoMetadata + ) => Maybe (IORef Int) -> Server.Call rpc -> IO () +echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do + Binary.recvInput @Word64 call >>= \case + StreamElem n -> do + Binary.sendNextOutput @Word64 call n + echoHandler disconnectCounter call + FinalElem n _ -> do + Binary.sendFinalOutput @Word64 call (n, NoMetadata) + NoMoreElems _ -> do + Server.sendTrailers call NoMetadata + where + trackDisconnects Nothing = + id + trackDisconnects (Just counter) = + handle ( + \(_e :: Server.ClientDisconnected) -> + atomicModifyIORef' counter $ \n -> (n + 1, ()) + ) + +{------------------------------------------------------------------------------- + Auxiliary +-------------------------------------------------------------------------------} + +-- We need to use this to properly simulate the execution environment crashing +-- in an unrecoverable way. In particular, we don't want to give the program a +-- chance to do any of its normal exception handling/cleanup behavior. +foreign import ccall unsafe "exit" c_exit :: CInt -> IO () + +data ClientStep = KeepGoing (Maybe (IO ())) ClientStep | Done + +mkClientSteps :: Maybe Int -> [(Int, IO ())] -> ClientStep +mkClientSteps = go 0 + where + go !i mn acts + | maybe False (i >=) mn + = Done + | otherwise + = KeepGoing (lookup i acts) $ go (i + 1) mn acts + +stepsN :: Int -> ClientStep +stepsN n = mkClientSteps (Just n) [] + +{-# INLINE stepsInfinite #-} +stepsInfinite :: ClientStep +stepsInfinite = mkClientSteps Nothing [] diff --git a/test-grapesy/Test/Sanity/EndOfStream.hs b/test-grapesy/Test/Sanity/EndOfStream.hs index 26e7dea7..97144f2b 100644 --- a/test-grapesy/Test/Sanity/EndOfStream.hs +++ b/test-grapesy/Test/Sanity/EndOfStream.hs @@ -101,7 +101,7 @@ test_recvTrailers = testClientServer $ ClientServerTest { config = def , server = [Server.fromMethod nonStreamingHandler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty resp <- Client.recvNextOutput call @@ -123,7 +123,7 @@ test_recvTrailers = testClientServer $ ClientServerTest { -------------------------------------------------------------------------------} -- | Receive any string, respond with a single 'mempty' -type Trivial = RawRpc "Test" "trivial" +type Poke = RawRpc "Test" "trivial" -- | Service that simply absorbs all messages and then returns with 'mempty' type Absorb = RawRpc "Test" "absorb" @@ -132,7 +132,7 @@ type Absorb = RawRpc "Test" "absorb" -- client with a bunch of 'mempty' messages type Spam = RawRpc "Test" "spam" -nonStreamingHandler :: ServerHandler' NonStreaming IO Trivial +nonStreamingHandler :: ServerHandler' NonStreaming IO Poke nonStreamingHandler = Server.mkNonStreaming $ \_inp -> return BS.Lazy.empty @@ -161,13 +161,13 @@ test_recvInput = testClientServer $ ClientServerTest { config = def , server = [Server.someRpcHandler handler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty _resp <- Client.recvFinalOutput call return () } where - handler :: Server.RpcHandler IO Trivial + handler :: Server.RpcHandler IO Poke handler = Server.mkRpcHandler $ \call -> do x <- Server.recvInput call @@ -189,13 +189,13 @@ test_recvEndOfInput = testClientServer $ ClientServerTest { config = def , server = [Server.someRpcHandler handler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty _resp <- Client.recvFinalOutput call return () } where - handler :: Server.RpcHandler IO Trivial + handler :: Server.RpcHandler IO Poke handler = Server.mkRpcHandler $ \call -> do resp <- Server.recvNextInput call assertEqual "resp" BS.Lazy.empty $ resp diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs new file mode 100644 index 00000000..0e97e6a5 --- /dev/null +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -0,0 +1,189 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | Handling of exceptions occurring in an RPC on a shared connection. +-- +-- These tests check for the behavior described in +-- . In particular, there are +-- two conditions that should hold when an exception occurs in the scope of a +-- call (on either the server or client, e.g. inside either 'mkRpcHandler' or +-- 'withRPC'): +-- +-- 1. Other ongoing calls on that connection are not terminated (client), and +-- handlers dealing with other calls on that connection are not terminated +-- (server), and +-- 2. future calls are still possible (client), and more handlers can be started +-- to deal with future calls (server). +module Test.Sanity.Exception (tests) where + +import Control.Concurrent.Async +import Control.Exception +import Control.Monad +import Data.Either +import Data.IORef +import Data.Text qualified as Text +import Data.Word +import Test.Tasty +import Test.Tasty.HUnit + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Client.Binary qualified as Binary +import Network.GRPC.Common +import Network.GRPC.Server qualified as Server +import Network.GRPC.Server.Binary qualified as Binary +import Proto.API.Trivial +import Test.Driver.ClientServer +import Test.Util.Exception + +tests :: TestTree +tests = testGroup "Test.Sanity.Exception" [ + testCase "client" test_clientException + , testCase "server" test_serverException + + , testCase "earlyTerminationNoWait" test_earlyTerminationNoWait + ] + +-- | Client makes many concurrent calls, throws an exception during one of them. +test_clientException :: IO () +test_clientException = testClientServer $ ClientServerTest { + config = def + , client = simpleTestClient $ \conn -> do + -- Make 100 concurrent calls. 99 of them counting to 50, and one + -- more that throws an exception once it reaches 10. + let + predicate = (> 50) + predicates = + replicate 99 predicate ++ + [ \n -> + (n > 10) + && throw (DeliberateException $ SomeClientException 1) + ] + + results <- + mapConcurrently + ( try @DeliberateException + . Client.withRPC conn def (Proxy @Trivial) + . countUntil + ) + predicates + + -- Only one of the calls failed + assertEqual "" (length $ lefts results) 1 + + -- All others terminated with results satisfying the predicate + assertBool "" (all predicate $ rights results) + + -- New calls still succeed + assertBool "" . predicate + =<< Client.withRPC conn def (Proxy @Trivial) (countUntil predicate) + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial incUntilFinal + ] + } + +-- | Client makes many concurrent calls, the handler throws an exception during +-- one of them. +test_serverException :: IO () +test_serverException = do + handlerCounter <- newIORef @Int 0 + testClientServer $ ClientServerTest { + config = def { expectEarlyServerTermination = True } + , client = simpleTestClient $ \conn -> do + -- Make 100 concurrent calls counting to 50. + let predicate = (> 50) + results <- + replicateConcurrently 100 $ + try @GrpcException + . Client.withRPC conn def (Proxy @Trivial) + $ countUntil predicate + + -- Only one of the calls failed, and we got the appropriate + -- exception + case lefts results of + [GrpcException GrpcUnknown (Just msg) []] -> do + assertBool "" $ "DeliberateException" `Text.isInfixOf` msg + assertBool "" $ "SomeServerException 1" `Text.isInfixOf` msg + _ -> + assertFailure "" + + -- All others terminated with results satisfying the predicate + assertBool "" (all predicate $ rights results) + + -- New calls still succeed + assertBool "" . predicate + =<< Client.withRPC conn def (Proxy @Trivial) (countUntil predicate) + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ \call -> do + handlerCount <- + atomicModifyIORef' handlerCounter (\n -> (n + 1, n)) + when (handlerCount == 25) $ + throwIO $ + DeliberateException $ SomeServerException 1 + incUntilFinal call + ] + } + +-- | This is essentially 'Test.Prop.Dialogue.earlyTermination15', but the server +-- does not wait for client termination. +test_earlyTerminationNoWait :: IO () +test_earlyTerminationNoWait = testClientServer $ ClientServerTest { + config = def + , client = simpleTestClient $ \conn -> do + _mResult <- + try @DeliberateException $ + Client.withRPC conn def (Proxy @Trivial) $ \_call -> + throwIO (DeliberateException $ SomeServerException 0) + + Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Binary.sendFinalInput @Word8 call 0 + _output <- Binary.recvOutput @Word8 call + return () + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ \call -> + Binary.recvInput @Word8 call >>= \case + _ -> Server.sendTrailers call NoMetadata + ] + } + +{------------------------------------------------------------------------------- + Client and handler functions +-------------------------------------------------------------------------------} + +-- | Send numbers to the server until it responds with one that satisfies the +-- given predicate. +countUntil :: (Word64 -> Bool) -> Client.Call Trivial -> IO Word64 +countUntil = go 0 + where + go :: Word64 -> (Word64 -> Bool) -> Client.Call Trivial -> IO Word64 + go next p call + | p next + = do + Binary.sendFinalInput @Word64 call next + (_final, NoMetadata) <- Binary.recvFinalOutput @Word64 call + return next + | otherwise + = do + Binary.sendNextInput @Word64 call next + next' <- Binary.recvNextOutput @Word64 call + go next' p call + +-- | Reads numbers from the client and sends them back incremented by one. +incUntilFinal :: Server.Call Trivial -> IO () +incUntilFinal call = do + Binary.recvInput call >>= \case + StreamElem n -> do + Binary.sendNextOutput @Word64 call $ succ n + incUntilFinal call + FinalElem n _ -> do + Binary.sendFinalOutput @Word64 call (succ n, NoMetadata) + NoMoreElems _ -> do + -- TODO: + -- + -- We shouldn't need to handle this case, since our client never + -- explicitly sends 'NoMoreElems'. However, see discussion in the + -- ticket above. + Server.sendTrailers call NoMetadata + return () diff --git a/test-grapesy/Test/Util.hs b/test-grapesy/Test/Util.hs index f5510ba3..c0356271 100644 --- a/test-grapesy/Test/Util.hs +++ b/test-grapesy/Test/Util.hs @@ -4,6 +4,9 @@ module Test.Util ( -- * Timeouts Timeout(..) , within + + -- * Files + , withTemporaryFile ) where import Control.Concurrent @@ -11,6 +14,8 @@ import Control.Exception import Control.Monad.Catch import Control.Monad.IO.Class import GHC.Stack +import System.IO +import System.IO.Temp {------------------------------------------------------------------------------- Timeouts @@ -45,4 +50,6 @@ within t info io = do fmap fst $ generalBracket startTimer stopTimer $ \_ -> io - +withTemporaryFile :: (FilePath -> IO a) -> IO a +withTemporaryFile k = + withSystemTempFile "grapesy-test-suite.txt" (\fp h -> hClose h >> k fp) diff --git a/test-grapesy/Test/Util/Exception.hs b/test-grapesy/Test/Util/Exception.hs new file mode 100644 index 00000000..5b0af1f8 --- /dev/null +++ b/test-grapesy/Test/Util/Exception.hs @@ -0,0 +1,36 @@ +-- | Utility exception types for the tests +module Test.Util.Exception + ( -- * User exceptions + SomeServerException(..) + , SomeClientException(..) + + -- * Deliberate exceptions + , DeliberateException(..) + , ExceptionId + ) where + +import Control.Exception + +{------------------------------------------------------------------------------- + User exceptions + + When a test calls for the client or the server to throw an exception, we throw + one of these. Their sole purpose is to be "any" kind of exception (not a + specific one). +-------------------------------------------------------------------------------} + +data SomeServerException = SomeServerException ExceptionId + deriving stock (Show, Eq) + deriving anyclass (Exception) + +data SomeClientException = SomeClientException ExceptionId + deriving stock (Show, Eq) + deriving anyclass (Exception) + +-- | Exception thrown by client or handler to test exception handling +data DeliberateException = forall e. Exception e => DeliberateException e + deriving anyclass (Exception) +deriving stock instance Show DeliberateException + +-- | We distinguish exceptions from each other simply by a number +type ExceptionId = Int diff --git a/test-grapesy/Test/Util/RawTestServer.hs b/test-grapesy/Test/Util/RawTestServer.hs new file mode 100644 index 00000000..03362f23 --- /dev/null +++ b/test-grapesy/Test/Util/RawTestServer.hs @@ -0,0 +1,92 @@ +module Test.Util.RawTestServer + ( -- * Raw test server + respondWith + + -- * Abstract response type + , Response(..) + , asciiHeader + , utf8Header + ) where + +import Data.ByteString qualified as BS.Strict +import Data.ByteString qualified as Strict (ByteString) +import Data.ByteString.Builder qualified as BS.Builder +import Data.ByteString.Char8 qualified as BS.Strict.Char8 +import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 +import Data.String (fromString) +import Network.HTTP2.Server qualified as HTTP2 + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Common +import Network.GRPC.Server.Run +import Network.HTTP.Types qualified as HTTP + +{------------------------------------------------------------------------------- + Raw test server + + This allows us to simulate broken /servers/. +-------------------------------------------------------------------------------} + +-- | Run the server and apply the continuation to an 'Client.Address' holding +-- the running server's host and port. +withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a +withTestServer server k = do + let serverConfig = + ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } + forkServer def serverConfig server $ \runningServer -> do + port <- getServerPort runningServer + let addr :: Client.Address + addr = Client.Address { + addressHost = "127.0.0.1" + , addressPort = port + , addressAuthority = Nothing + } + k addr + +-- | Server that responds with the given 'Response', independent of the request +respondWith :: Response -> (Client.Address -> IO a) -> IO a +respondWith response = withTestServer $ \_req _aux respond -> + respond (toHTTP2Response response) [] + +data Response = Response { + responseStatus :: HTTP.Status + , responseHeaders :: [HTTP.Header] + , responseBody :: Strict.ByteString + , responseTrailers :: [HTTP.Header] + } + +instance Default Response where + def = Response { + responseStatus = HTTP.ok200 + , responseHeaders = [ asciiHeader "content-type" "application/grpc" ] + , responseBody = BS.Strict.empty + , responseTrailers = [ asciiHeader "grpc-status" "0" ] + } + +toHTTP2Response :: Response -> HTTP2.Response +toHTTP2Response response = + flip HTTP2.setResponseTrailersMaker trailersMaker $ + HTTP2.responseBuilder + (responseStatus response) + (responseHeaders response) + (BS.Builder.byteString $ responseBody response) + where + trailersMaker :: HTTP2.TrailersMaker + trailersMaker Nothing = return $ HTTP2.Trailers (responseTrailers response) + trailersMaker (Just _) = return $ HTTP2.NextTrailersMaker trailersMaker + +-- | Header with ASCII value +-- +-- (Header /names/ are always ASCII.) +asciiHeader :: String -> String -> HTTP.Header +asciiHeader name value = (fromString name, BS.Strict.Char8.pack value) + +-- | Header with UTF-8 encoded value +utf8Header :: String -> String -> HTTP.Header +utf8Header name value = (fromString name, BS.Strict.UTF8.fromString value) diff --git a/util/Network/GRPC/Util/HTTP2/Stream.hs b/util/Network/GRPC/Util/HTTP2/Stream.hs index f5e155d8..55d74e53 100644 --- a/util/Network/GRPC/Util/HTTP2/Stream.hs +++ b/util/Network/GRPC/Util/HTTP2/Stream.hs @@ -16,6 +16,7 @@ module Network.GRPC.Util.HTTP2.Stream ( -- * Exceptions , ClientDisconnected(..) , ServerDisconnected(..) + , wrapStreamExceptionsWith ) where import Control.Exception @@ -153,18 +154,21 @@ clientInputStream resp = do maybe [] fromHeaderTable <$> Client.getResponseTrailers resp } +-- | Construct a client 'OutputStream' +-- +-- We do not wrap the members of the 'OutputStream' with +-- 'wrapStreamExceptionsWith', since we do this around the entire +-- 'sendMessageLoop'. See the comment for @outboundThread@ in +-- 'Network.GRPC.Util.Session.Client.setupRequestChannel'. clientOutputStream :: OutBodyIface -> IO OutputStream clientOutputStream iface = return OutputStream { _writeChunk = \c -> - wrapStreamExceptionsWith ServerDisconnected $ - outBodyPush iface c + outBodyPush iface c , _writeChunkFinal = \c -> - wrapStreamExceptionsWith ServerDisconnected $ - outBodyPushFinal iface c + outBodyPushFinal iface c , _flush = - wrapStreamExceptionsWith ServerDisconnected $ - outBodyFlush iface + outBodyFlush iface } {------------------------------------------------------------------------------- diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 56bd1eff..ebb87de9 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -429,16 +429,16 @@ close Channel{channelOutbound} reason = do -- We leave the inbound thread running. Although the channel is closed, -- there might still be unprocessed messages in the queue. The inbound -- thread will terminate once it reaches the end of the queue - outbound <- cancelThread channelOutbound channelClosed - case outbound of - AlreadyTerminated _ -> - return $ Nothing - AlreadyAborted _err -> - -- Connection_ to the peer was lost prior to closing - return $ Nothing - Cancelled -> - -- Proper procedure for outbound messages was not followed - return $ Just channelClosed + outbound <- cancelThread channelOutbound channelClosed + case outbound of + AlreadyTerminated _ -> + return $ Nothing + AlreadyAborted _err -> + -- Connection_ to the peer was lost prior to closing + return $ Nothing + Cancelled -> + -- Proper procedure for outbound messages was not followed + return $ Just channelClosed where channelClosed :: SomeException channelClosed = @@ -522,7 +522,7 @@ linkOutboundToInbound allowHalfClosed channel inbound = do threads, using the 'Thread' API from "Network.GRPC.Util.Thread". We are therefore not particularly worried about these loops being interrupted by asynchronous exceptions: this only happens if the threads are explicitly - terminated (when the corrresponding channels are closed), in which case any + terminated (when the corresponding channels are closed), in which case any attempt to interact with them after they have been killed will be handled by 'getThreadInterface' throwing 'ThreadInterfaceUnavailable'. -------------------------------------------------------------------------------} diff --git a/util/Network/GRPC/Util/Session/Client.hs b/util/Network/GRPC/Util/Session/Client.hs index dd26d4a8..5ea1df51 100644 --- a/util/Network/GRPC/Util/Session/Client.hs +++ b/util/Network/GRPC/Util/Session/Client.hs @@ -181,7 +181,18 @@ setupRequestChannel sess threadBody "grapesy:clientOutbound" (channelOutbound channel) $ \markReady _debugId -> do markReady $ FlowStateRegular regular stream <- clientOutputStream iface - Client.outBodyUnmask iface $ sendMessageLoop sess regular stream + -- Unlike the client inbound thread, or the inbound/outbound threads + -- of the server, http2 knows about this particular thread and may + -- raise an exception on it when the server dies. This results in a + -- race condition between that exception and the exception we get from + -- attempting to read the next message. No matter who wins that race, + -- we need to mark that as 'ServerDisconnected'. + -- + -- We don't have this top-level exception handler in other places + -- because we don't want to mark /our own/ exceptions as + -- 'ServerDisconnected' or 'ClientDisconnected'. + wrapStreamExceptionsWith ServerDisconnected $ + Client.outBodyUnmask iface $ sendMessageLoop sess regular stream {------------------------------------------------------------------------------- Auxiliary http2