From c2bcd43be4f1f5a44518740a1173ade58aad26e7 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Tue, 30 Jul 2024 13:00:05 -0700 Subject: [PATCH 1/7] Wrap outbound client thread stream exceptions Unlike the server inbound/outbound and client outbound threads, `http2` is aware of the client outbound thread. If the server disconnects, there is a race between the `http2`-thrown exception and the exception that will come from `grapesy` attempting to read. No matter who wins that race, we want to mark the exception with `ServerDisconnected`. --- docs/demo-client.md | 2 +- util/Network/GRPC/Util/HTTP2/Stream.hs | 16 ++++++++++------ util/Network/GRPC/Util/Session/Client.hs | 13 ++++++++++++- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/docs/demo-client.md b/docs/demo-client.md index 84e6fb98..15d7b3e6 100644 --- a/docs/demo-client.md +++ b/docs/demo-client.md @@ -169,7 +169,7 @@ $ cabal run demo-client -- --core sayHelloBidiStream \ {message: "John Ack"} Disconnected. Reconnecting after 1701081μs Reconnecting now. -demo-client: GrpcException {grpcError = GrpcUnknown, grpcErrorMessage = Just "Call closed without trailers", grpcErrorMetadata = []} +demo-client: demo-client: ServerDisconnected {serverDisconnectedException = ..., ...} ``` ### Dealing with unterminated streams diff --git a/util/Network/GRPC/Util/HTTP2/Stream.hs b/util/Network/GRPC/Util/HTTP2/Stream.hs index f5e155d8..55d74e53 100644 --- a/util/Network/GRPC/Util/HTTP2/Stream.hs +++ b/util/Network/GRPC/Util/HTTP2/Stream.hs @@ -16,6 +16,7 @@ module Network.GRPC.Util.HTTP2.Stream ( -- * Exceptions , ClientDisconnected(..) , ServerDisconnected(..) + , wrapStreamExceptionsWith ) where import Control.Exception @@ -153,18 +154,21 @@ clientInputStream resp = do maybe [] fromHeaderTable <$> Client.getResponseTrailers resp } +-- | Construct a client 'OutputStream' +-- +-- We do not wrap the members of the 'OutputStream' with +-- 'wrapStreamExceptionsWith', since we do this around the entire +-- 'sendMessageLoop'. See the comment for @outboundThread@ in +-- 'Network.GRPC.Util.Session.Client.setupRequestChannel'. clientOutputStream :: OutBodyIface -> IO OutputStream clientOutputStream iface = return OutputStream { _writeChunk = \c -> - wrapStreamExceptionsWith ServerDisconnected $ - outBodyPush iface c + outBodyPush iface c , _writeChunkFinal = \c -> - wrapStreamExceptionsWith ServerDisconnected $ - outBodyPushFinal iface c + outBodyPushFinal iface c , _flush = - wrapStreamExceptionsWith ServerDisconnected $ - outBodyFlush iface + outBodyFlush iface } {------------------------------------------------------------------------------- diff --git a/util/Network/GRPC/Util/Session/Client.hs b/util/Network/GRPC/Util/Session/Client.hs index dd26d4a8..5ea1df51 100644 --- a/util/Network/GRPC/Util/Session/Client.hs +++ b/util/Network/GRPC/Util/Session/Client.hs @@ -181,7 +181,18 @@ setupRequestChannel sess threadBody "grapesy:clientOutbound" (channelOutbound channel) $ \markReady _debugId -> do markReady $ FlowStateRegular regular stream <- clientOutputStream iface - Client.outBodyUnmask iface $ sendMessageLoop sess regular stream + -- Unlike the client inbound thread, or the inbound/outbound threads + -- of the server, http2 knows about this particular thread and may + -- raise an exception on it when the server dies. This results in a + -- race condition between that exception and the exception we get from + -- attempting to read the next message. No matter who wins that race, + -- we need to mark that as 'ServerDisconnected'. + -- + -- We don't have this top-level exception handler in other places + -- because we don't want to mark /our own/ exceptions as + -- 'ServerDisconnected' or 'ClientDisconnected'. + wrapStreamExceptionsWith ServerDisconnected $ + Client.outBodyUnmask iface $ sendMessageLoop sess regular stream {------------------------------------------------------------------------------- Auxiliary http2 From 5834d3da609ada873784798f7d9afdbcb417b776 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Tue, 30 Jul 2024 13:00:05 -0700 Subject: [PATCH 2/7] Sanity tests for server/client exception/disconnect See module headers of `Test.Sanity.Disconnect` and `Test.Sanity.Exception` --- grapesy.cabal | 6 + .../Interop/Client/TestCase/CustomMetadata.hs | 2 +- src/Network/GRPC/Server/Call.hs | 2 +- src/Network/GRPC/Server/Handler.hs | 2 +- test-grapesy/Main.hs | 6 +- test-grapesy/Test/Driver/ClientServer.hs | 9 +- .../Test/Driver/Dialogue/Definition.hs | 21 +- .../Test/Driver/Dialogue/Execution.hs | 8 + test-grapesy/Test/Sanity/BrokenDeployments.hs | 110 +------ test-grapesy/Test/Sanity/Disconnect.hs | 288 ++++++++++++++++++ test-grapesy/Test/Sanity/Exception.hs | 196 ++++++++++++ test-grapesy/Test/Util.hs | 16 +- test-grapesy/Test/Util/Exception.hs | 36 +++ test-grapesy/Test/Util/RawTestServer.hs | 110 +++++++ util/Network/GRPC/Util/Session/Channel.hs | 22 +- 15 files changed, 687 insertions(+), 147 deletions(-) create mode 100644 test-grapesy/Test/Sanity/Disconnect.hs create mode 100644 test-grapesy/Test/Sanity/Exception.hs create mode 100644 test-grapesy/Test/Util/Exception.hs create mode 100644 test-grapesy/Test/Util/RawTestServer.hs diff --git a/grapesy.cabal b/grapesy.cabal index e42f773d..7796bfb7 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -309,14 +309,18 @@ test-suite test-grapesy Test.Prop.IncrementalParsing Test.Prop.Serialization Test.Sanity.BrokenDeployments + Test.Sanity.Disconnect Test.Sanity.EndOfStream + Test.Sanity.Exception Test.Sanity.Interop Test.Sanity.StreamingType.CustomFormat Test.Sanity.StreamingType.NonStreaming Test.Util Test.Util.Awkward + Test.Util.Exception Test.Util.Orphans Test.Util.Protobuf + Test.Util.RawTestServer -- Internals we're testing Network.GRPC.Util.Parser @@ -342,6 +346,7 @@ test-suite test-grapesy , bytestring >= 0.10 && < 0.13 , case-insensitive >= 1.2 && < 1.3 , containers >= 0.6 && < 0.8 + , directory >= 1.3 && < 1.4 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 , http2 >= 5.3.1 && < 5.4 @@ -363,6 +368,7 @@ test-suite test-grapesy , text >= 1.2 && < 2.2 , tls >= 1.7 && < 2.2 , tree-diff >= 0.3 && < 0.4 + , unix >= 2.7 && < 2.9 , utf8-string >= 1.0 && < 1.1 executable demo-client diff --git a/interop/Interop/Client/TestCase/CustomMetadata.hs b/interop/Interop/Client/TestCase/CustomMetadata.hs index a567298f..20daf465 100644 --- a/interop/Interop/Client/TestCase/CustomMetadata.hs +++ b/interop/Interop/Client/TestCase/CustomMetadata.hs @@ -21,7 +21,7 @@ import Proto.API.Interop -- For both UnaryCall and FullDuplexCall, the reference server (at least some) -- does not return any initial metadata until we send the first request. The -- test spec does not specify whether this is expected behaviour or not, so we --- play it save and only ask for the initial metadata after sending the request. +-- play it safe and only ask for the initial metadata after sending the request. runTest :: Cmdline -> IO () runTest cmdline = do withConnection def (testServer cmdline) $ \conn -> do diff --git a/src/Network/GRPC/Server/Call.hs b/src/Network/GRPC/Server/Call.hs index bbbdbbfc..4f7f4113 100644 --- a/src/Network/GRPC/Server/Call.hs +++ b/src/Network/GRPC/Server/Call.hs @@ -628,7 +628,7 @@ recvEndOfInput call@Call{} = do -- | Send 'ProperTrailers' -- --- This function is not part of the public API: we use it the top-level +-- This function is not part of the public API: we use it as the top-level -- exception handler in "Network.GRPC.Server" to forward exceptions in server -- handlers to the client. -- diff --git a/src/Network/GRPC/Server/Handler.hs b/src/Network/GRPC/Server/Handler.hs index 8aeaea15..f323bc1c 100644 --- a/src/Network/GRPC/Server/Handler.hs +++ b/src/Network/GRPC/Server/Handler.hs @@ -272,7 +272,7 @@ waitForHandler unmask call handlerThread = loop -- -- The attempt to forward it to the client is a best-effort only: -- --- * The nature of the exception might mean that we we cannot send anything to +-- * The nature of the exception might mean that we cannot send anything to -- the client at all. -- * It is possible the exception was thrown /after/ the handler already send -- the trailers to the client. diff --git a/test-grapesy/Main.hs b/test-grapesy/Main.hs index a3575150..a62daf31 100644 --- a/test-grapesy/Main.hs +++ b/test-grapesy/Main.hs @@ -17,7 +17,9 @@ import Test.Prop.Dialogue qualified as Dialogue import Test.Prop.IncrementalParsing qualified as IncrementalParsing import Test.Prop.Serialization qualified as Serialization import Test.Sanity.BrokenDeployments qualified as BrokenDeployments +import Test.Sanity.Disconnect qualified as Disconnect import Test.Sanity.EndOfStream qualified as EndOfStream +import Test.Sanity.Exception qualified as Exception import Test.Sanity.Interop qualified as Interop import Test.Sanity.StreamingType.CustomFormat qualified as StreamingType.CustomFormat import Test.Sanity.StreamingType.NonStreaming qualified as StreamingType.NonStreaming @@ -28,11 +30,13 @@ main = do defaultMain $ testGroup "grapesy" [ testGroup "Sanity" [ - EndOfStream.tests + Disconnect.tests + , EndOfStream.tests , testGroup "StreamingType" [ StreamingType.NonStreaming.tests , StreamingType.CustomFormat.tests ] + , Exception.tests , Interop.tests , BrokenDeployments.tests ] diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index d9d70bb4..2678a31a 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -40,6 +40,7 @@ import Network.GRPC.Common import Network.GRPC.Common.Compression qualified as Compr import Network.GRPC.Server qualified as Server import Network.GRPC.Server.Run qualified as Server +import Test.Util.Exception import Paths_grapesy @@ -168,12 +169,6 @@ data TlsFail = we don't see these exceptions server-side. -------------------------------------------------------------------------------} --- | Exception thrown by client or handler to test exception handling -data DeliberateException = forall e. Exception e => DeliberateException e - deriving anyclass (Exception) - -deriving stock instance Show DeliberateException - isExpectedServerException :: ClientServerConfig -> SomeException -> Bool isExpectedServerException cfg e -- @@ -232,7 +227,7 @@ isExpectedClientException cfg e | Just (DeliberateException _) <- fromException e = True - -- Server threw deliberat exception + -- Server threw deliberate exception | Just grpcException <- fromException e , Just msg <- grpcErrorMessage grpcException , "DeliberateException" `Text.isInfixOf` msg diff --git a/test-grapesy/Test/Driver/Dialogue/Definition.hs b/test-grapesy/Test/Driver/Dialogue/Definition.hs index 802de12d..53541d04 100644 --- a/test-grapesy/Test/Driver/Dialogue/Definition.hs +++ b/test-grapesy/Test/Driver/Dialogue/Definition.hs @@ -20,7 +20,6 @@ module Test.Driver.Dialogue.Definition ( , hasEarlyTermination ) where -import Control.Exception import Control.Monad.State (StateT, execStateT, modify) import Data.Bifunctor import Data.ByteString qualified as Strict (ByteString) @@ -28,6 +27,7 @@ import Data.ByteString qualified as Strict (ByteString) import Network.GRPC.Common import Test.Driver.Dialogue.TestClock qualified as TestClock +import Test.Util.Exception import Control.Monad.Catch import GHC.Show (appPrec1, showCommaSpace) @@ -154,25 +154,6 @@ newtype GlobalSteps = GlobalSteps { } deriving stock (Show) -{------------------------------------------------------------------------------- - User exceptions - - When a test calls for the client or the server to throw an exception, we throw - one of these. Their sole purpose is to be "any" kind of exception (not a - specific one). --------------------------------------------------------------------------------} - -data SomeServerException = SomeServerException ExceptionId - deriving stock (Show, Eq) - deriving anyclass (Exception) - -data SomeClientException = SomeClientException ExceptionId - deriving stock (Show, Eq) - deriving anyclass (Exception) - --- | We distinguish exceptions from each other simply by a number -type ExceptionId = Int - {------------------------------------------------------------------------------- Utility -------------------------------------------------------------------------------} diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 25f2ba03..13eea7bb 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -413,6 +413,8 @@ serverLocal clock call = \(LocalSteps steps) -> do Terminate mErr -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call + -- TODO: + -- -- On the server side we cannot distinguish regular client -- termination from an exception when receiving. let expectation = isExpectedElem $ NoMoreElems NoMetadata @@ -426,6 +428,12 @@ serverLocal clock call = \(LocalSteps steps) -> do -- terminate more-or-less immediately, this does not necessarily indicate -- any kind of failure: the client may simply have put the call in -- half-closed mode. + -- + -- TODO: + -- However, when the client terminates early and we are not using one + -- connection per RPC (i.e. we are sharing a connection), the server will + -- /never/ realize that the client has disappeared. See the discussion in + -- the issue above. waitForClientDisconnect :: IO () waitForClientDisconnect = within timeoutFailure () $ loop diff --git a/test-grapesy/Test/Sanity/BrokenDeployments.hs b/test-grapesy/Test/Sanity/BrokenDeployments.hs index a36bb613..ec0ea9a1 100644 --- a/test-grapesy/Test/Sanity/BrokenDeployments.hs +++ b/test-grapesy/Test/Sanity/BrokenDeployments.hs @@ -3,26 +3,18 @@ module Test.Sanity.BrokenDeployments (tests) where -import Control.Concurrent -import Control.Concurrent.Async import Control.Exception -import Data.ByteString qualified as BS.Strict -import Data.ByteString qualified as Strict (ByteString) -import Data.ByteString.Builder qualified as BS.Builder import Data.ByteString.Char8 qualified as BS.Strict.Char8 import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 -import Data.String (fromString) import Data.Text qualified as Text import Network.HTTP.Types qualified as HTTP -import Network.HTTP2.Server qualified as HTTP2 -import Network.Run.TCP qualified as NetworkRun -import Network.Socket import Test.Tasty import Test.Tasty.HUnit import Network.GRPC.Client qualified as Client import Network.GRPC.Common import Network.GRPC.Common.Protobuf +import Test.Util.RawTestServer import Proto.API.Ping @@ -54,6 +46,11 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [ ] ] +connParams :: Client.ConnParams +connParams = def { + Client.connVerifyHeaders = True + } + {------------------------------------------------------------------------------- HTTP Status -------------------------------------------------------------------------------} @@ -322,101 +319,6 @@ test_invalidTrailerMetadata = respondWith response $ \addr -> do someInvalidMetadata :: String someInvalidMetadata = "This is invalid: 你好" -{------------------------------------------------------------------------------- - Test server - - This allows us to simulate broken /servers/. --------------------------------------------------------------------------------} - -data Response = Response { - responseStatus :: HTTP.Status - , responseHeaders :: [HTTP.Header] - , responseBody :: Strict.ByteString - , responseTrailers :: [HTTP.Header] - } - -instance Default Response where - def = Response { - responseStatus = HTTP.ok200 - , responseHeaders = [ asciiHeader "content-type" "application/grpc" ] - , responseBody = BS.Strict.empty - , responseTrailers = [ asciiHeader "grpc-status" "0" ] - } - --- | Server that responds with the given 'Response', independent of the request -respondWith :: Response -> (Client.Address -> IO a) -> IO a -respondWith response = withTestServer $ \_req _aux respond -> - respond http2Response [] - where - http2Response :: HTTP2.Response - http2Response = - flip HTTP2.setResponseTrailersMaker trailersMaker $ - HTTP2.responseBuilder - (responseStatus response) - (responseHeaders response) - (BS.Builder.byteString $ responseBody response) - - trailersMaker :: HTTP2.TrailersMaker - trailersMaker Nothing = return $ HTTP2.Trailers (responseTrailers response) - trailersMaker (Just _) = return $ HTTP2.NextTrailersMaker trailersMaker - --- | Low-level test server --- --- We bypass the entire grapesy machinery for constructing the server, because --- we need to mock a broken deployment. --- --- The grapesy client can auto reconnect when the server is not (yet) up and --- running, but to keep things simple, and since the server anyway runs in the --- same process, we just signal when the server is ready. This also allows us --- to avoid binding to a specific port in the tests (which might already be in --- use on the machine running the tests, leading to spurious test failures). -testServer :: HTTP2.Server -> MVar PortNumber -> IO () -testServer server serverPort = do - addr <- NetworkRun.resolve Stream (Just "127.0.0.1") "0" [AI_PASSIVE] - bracket (NetworkRun.openTCPServerSocket addr) close $ \listenSock -> do - addr' <- getSocketName listenSock - port <- case addr' of - SockAddrInet port _host -> return port - SockAddrInet6 port _ _host _ -> return port - SockAddrUnix{} -> error "respondWith: unexpected unix socket" - putMVar serverPort port - NetworkRun.runTCPServerWithSocket listenSock $ \clientSock -> - bracket (HTTP2.allocSimpleConfig clientSock 4096) - HTTP2.freeSimpleConfig $ \config -> - HTTP2.run HTTP2.defaultServerConfig config server - -withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a -withTestServer server k = do - serverPort <- newEmptyMVar - withAsync (testServer server serverPort) $ \_serverThread -> do - port <- readMVar serverPort - let addr :: Client.Address - addr = Client.Address { - addressHost = "127.0.0.1" - , addressPort = port - , addressAuthority = Nothing - } - k addr - -{------------------------------------------------------------------------------- - Auxiliary --------------------------------------------------------------------------------} - -connParams :: Client.ConnParams -connParams = def { - Client.connVerifyHeaders = True - } - --- | Header with ASCII value --- --- (Header /names/ are always ASCII.) -asciiHeader :: String -> String -> HTTP.Header -asciiHeader name value = (fromString name, BS.Strict.Char8.pack value) - --- | Header with UTF-8 encoded value -utf8Header :: String -> String -> HTTP.Header -utf8Header name value = (fromString name, BS.Strict.UTF8.fromString value) - grpcMessageContains :: GrpcException -> String -> Bool grpcMessageContains GrpcException{grpcErrorMessage} str = case grpcErrorMessage of diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs new file mode 100644 index 00000000..2d164a0d --- /dev/null +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -0,0 +1,288 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | Handling of client or server disconnections occurring with ongoing RPCs on +-- a shared connection. +-- +-- When a server disconnects, we expect: +-- +-- 1. All current calls fail with 'Client.ServerDisconnected' +-- 2. Future calls (after reconnection) succeed +-- +-- When a client disconnects, we expect: +-- +-- 1. The handlers dealing with that client (i.e. on that connection) should +-- fail with 'Server.ClientDisonnected' +-- 2. Future calls (after reconnection) succeed +module Test.Sanity.Disconnect where + +import Control.Concurrent +import Control.Concurrent.Async +import Control.Exception +import Control.Monad +import Data.ByteString.Lazy qualified as Lazy (ByteString) +import Data.Either +import Data.IORef +import Data.Word +import Foreign.C.Types (CInt(..)) +import System.Posix +import Test.Tasty +import Test.Tasty.HUnit +import Text.Read + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Client.Binary qualified as Binary +import Network.GRPC.Common +import Network.GRPC.Server qualified as Server +import Network.GRPC.Server.Binary qualified as Binary +import Network.GRPC.Spec +import Test.Util +import Test.Util.RawTestServer + +tests :: TestTree +tests = testGroup "Test.Sanity.Disconnect" [ + testCase "client" test_clientDisconnect + , testCase "server" test_serverDisconnect + ] + +-- | Two separate clients make many concurrent calls, one of them disconnects. +test_clientDisconnect :: Assertion +test_clientDisconnect = do + -- Create the server + disconnectCounter1 <- newIORef 0 + disconnectCounter2 <- newIORef 0 + server <- + Server.mkGrpcServer def [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ echoHandler (Just disconnectCounter1) + , Server.someRpcHandler $ + Server.mkRpcHandler @Trivial' $ echoHandler (Just disconnectCounter2) + ] + + portSignal <- newEmptyMVar + void $ forkIO $ rawTestServer (pure Nothing) (putMVar portSignal) server + + -- Start server + serverPort <- readMVar portSignal + let serverAddress = + Client.ServerInsecure Client.Address { + addressHost = "127.0.0.1" + , addressPort = serverPort + , addressAuthority = Nothing + } + + -- Start a client in a separate process + void $ forkProcess $ + Client.withConnection def serverAddress $ \conn -> do + -- Make 50 concurrent calls. 49 of them sending infinite messages. One + -- of them kills this client process after 100 messages. + let numCalls = 50 + predicate = pure . const False + predicates = + replicate (numCalls - 1) predicate ++ + [ \n -> do + when (n == 100) $ c_exit 1 + return False + ] + mapConcurrently_ + ( Client.withRPC conn def (Proxy @Trivial) + . countUntil + ) + predicates + + -- Start two more clients that make 50 calls to each handler, all calls + -- counting up to 1000 + let numCalls = 50 + countTo = 100 + predicate = pure . (>= countTo) + predicates = replicate numCalls predicate + (result1, result2) <- concurrently + ( Client.withConnection def serverAddress $ \conn -> do + sum <$> mapConcurrently + ( Client.withRPC conn def (Proxy @Trivial) + . countUntil + ) + predicates + ) + ( Client.withConnection def serverAddress $ \conn -> do + sum <$> mapConcurrently + ( Client.withRPC conn def (Proxy @Trivial') + . countUntil + ) + predicates + ) + + -- All calls should have finished with a results of 'countTo', for both + -- clients + assertBool "" (result1 + result2 == 2 * sum (replicate numCalls countTo)) + + -- We should also see only 50 client disconnects for the first handler and + -- none for the second + clientDisconnects1 <- readIORef disconnectCounter1 + clientDisconnects2 <- readIORef disconnectCounter2 + assertBool "" (clientDisconnects1 == 50 && clientDisconnects2 == 0) + +-- | Client makes many concurrent calls, server disconnects +test_serverDisconnect :: Assertion +test_serverDisconnect = withTemporaryFile $ \ipcFile -> do + -- We use a temporary file as a very rudimentary means of inter-process + -- communication so the server (which runs in a separate process) can make + -- the client aware of the port it is assigned by the OS. This also helps us + -- make sure the server binds to the same port when it comes back up for + -- reconnect purposes. + let ipcWrite :: String -> IO () + ipcWrite msg = do + writeFile ipcFile "" + writeFile ipcFile msg + + ipcRead :: IO String + ipcRead = readFile ipcFile + + ipcWaitRead :: IO String + ipcWaitRead = do + ipcRead >>= \case + "" -> do + threadDelay 10000 >> ipcWaitRead + msg -> do + return msg + + -- Create the server + server <- + Server.mkGrpcServer def [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ echoHandler Nothing + ] + + let -- Starts the server in a new process. Gives back an action that kills + -- the server process. + startServer :: IO (IO ()) + startServer = do + serverPid <- + forkProcess $ + rawTestServer (readMaybe <$> ipcRead) (ipcWrite . show) server + return $ c_kill (fromIntegral serverPid) sigKILL + + -- Start server, get the port + killServer <- startServer + serverPort <- read <$> ipcWaitRead + signalRestart <- newEmptyMVar + let serverAddress = + Client.ServerInsecure Client.Address { + addressHost = "127.0.0.1" + , addressPort = serverPort + , addressAuthority = Nothing + } + + reconnectPolicy :: Client.ReconnectPolicy + reconnectPolicy = go 0 + where + go :: Int -> Client.ReconnectPolicy + go n + | n == 5 + = Client.ReconnectAfter $ do + killRestarted <- startServer + putMVar signalRestart killRestarted + return $ Client.exponentialBackoff threadDelay 1 (1, 1) 100 + | otherwise + = Client.ReconnectAfter $ do + threadDelay 10000 + return $ go (n + 1) + + connParams :: Client.ConnParams + connParams = def { Client.connReconnectPolicy = reconnectPolicy } + + Client.withConnection connParams serverAddress $ \conn -> do + -- Make 50 concurrent calls. 49 of them sending infinite messages. One + -- of them kills the server after 100 messages. + let numCalls = 50 + predicate = pure . const False + predicates = + replicate (numCalls - 1) predicate ++ + [ \n -> do + when (n == 100) killServer + return False + ] + results <- + mapConcurrently + ( try @Client.ServerDisconnected + . Client.withRPC conn def (Proxy @Trivial) + . countUntil + ) + predicates + + -- All calls should have failed + assertBool "" (null (rights results) && length (lefts results) == numCalls) + + -- New calls should succeed (after reconnection) + killRestarted <- takeMVar signalRestart + result <- Client.withRPC conn def (Proxy @Trivial) $ + countUntil (pure . (>= 100)) + assertBool "" (result == 100) + + -- Do not leave the server process hanging around + killRestarted + +{------------------------------------------------------------------------------- + Client and handler functions +-------------------------------------------------------------------------------} + +-- | Send increasing numbers to the server until it responds with one that +-- satisfies the given predicate. +countUntil :: forall rpc. + ( Input rpc ~ Lazy.ByteString + , Output rpc ~ Lazy.ByteString + , ResponseTrailingMetadata rpc ~ NoMetadata + ) => (Word64 -> IO Bool) -> Client.Call rpc -> IO Word64 +countUntil = go 0 + where + go :: Word64 -> (Word64 -> IO Bool) -> Client.Call rpc -> IO Word64 + go next p call = do + sat <- p next + if sat then do + Binary.sendFinalInput @Word64 call next + (final, NoMetadata) <- Binary.recvFinalOutput @Word64 call + return final + else do + Binary.sendNextInput @Word64 call next + next' <- Binary.recvNextOutput @Word64 call + go (succ next') p call + +-- | Echos any input +echoHandler :: + ( Input rpc ~ Lazy.ByteString + , Output rpc ~ Lazy.ByteString + , ResponseTrailingMetadata rpc ~ NoMetadata + ) => Maybe (IORef Int) -> Server.Call rpc -> IO () +echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do + Binary.recvInput @Word64 call >>= \case + StreamElem n -> do + Binary.sendNextOutput @Word64 call n + echoHandler disconnectCounter call + FinalElem n _ -> do + Binary.sendFinalOutput @Word64 call (n, NoMetadata) + NoMoreElems _ -> do + Server.sendTrailers call NoMetadata + where + trackDisconnects Nothing = + id + trackDisconnects (Just counter) = + handle ( + \(_e :: Server.ClientDisconnected) -> + atomicModifyIORef' counter $ \n -> (n + 1, ()) + ) + +{------------------------------------------------------------------------------- + Auxiliary +-------------------------------------------------------------------------------} + +foreign import ccall unsafe "kill" c_kill :: CInt -> CInt -> IO () +foreign import ccall unsafe "exit" c_exit :: CInt -> IO () + +type Trivial = RawRpc "trivial" "trivial" +type Trivial' = RawRpc "trivial" "trivial'" + +type instance RequestMetadata Trivial = NoMetadata +type instance ResponseInitialMetadata Trivial = NoMetadata +type instance ResponseTrailingMetadata Trivial = NoMetadata +type instance RequestMetadata Trivial' = NoMetadata +type instance ResponseInitialMetadata Trivial' = NoMetadata +type instance ResponseTrailingMetadata Trivial' = NoMetadata diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs new file mode 100644 index 00000000..e3312e74 --- /dev/null +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -0,0 +1,196 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# OPTIONS_GHC -Wno-orphans #-} + +-- | Handling of exceptions occurring in an RPC on a shared connection. +-- +-- These tests check for the behavior described in +-- . In particular, there are +-- two conditions that should hold when an exception occurs in the scope of a +-- call (on either the server or client, e.g. inside either 'mkRpcHandler' or +-- 'withRPC'): +-- +-- 1. Other ongoing calls on that connection are not terminated, and +-- 2. future calls are still possible. +module Test.Sanity.Exception where + +import Control.Concurrent.Async +import Control.Exception +import Control.Monad +import Data.Either +import Data.IORef +import Data.Text qualified as Text +import Data.Word +import Test.Tasty +import Test.Tasty.HUnit + +import Network.GRPC.Client qualified as Client +import Network.GRPC.Client.Binary qualified as Binary +import Network.GRPC.Common +import Network.GRPC.Server qualified as Server +import Network.GRPC.Server.Binary qualified as Binary +import Network.GRPC.Spec +import Test.Driver.ClientServer +import Test.Util.Exception + +tests :: TestTree +tests = testGroup "Test.Sanity.Exception" [ + testCase "client" test_clientException + , testCase "server" test_serverException + + , testCase "earlyTerminationNoWait" test_earlyTerminationNoWait + ] + +-- | Client makes many concurrent calls, throws an exception during one of them. +test_clientException :: IO () +test_clientException = testClientServer $ ClientServerTest { + config = def + , client = simpleTestClient $ \conn -> do + -- Make 100 concurrent calls. 99 of them counting to 50, and one + -- more that throws an exception once it reaches 10. + let + predicate = (> 50) + predicates = + replicate 99 predicate ++ + [ \n -> + (n > 10) + && throw (DeliberateException $ SomeClientException 1) + ] + + results <- + mapConcurrently + ( try @DeliberateException + . Client.withRPC conn def (Proxy @Trivial) + . countUntil + ) + predicates + + -- Only one of the calls failed + assertEqual "" (length $ lefts results) 1 + + -- All others terminated with results satisfying the predicate + assertBool "" (all predicate $ rights results) + + -- New calls still succeed + assertBool "" . predicate + =<< Client.withRPC conn def (Proxy @Trivial) (countUntil predicate) + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial incUntilFinal + ] + } + +-- | Client makes many concurrent calls, the handler throws an exception during +-- one of them. +test_serverException :: IO () +test_serverException = do + handlerCounter <- newIORef @Int 0 + testClientServer $ ClientServerTest { + config = def { expectEarlyServerTermination = True } + , client = simpleTestClient $ \conn -> do + -- Make 100 concurrent calls counting to 50. + let predicate = (> 50) + results <- + replicateConcurrently 100 $ + try @GrpcException + . Client.withRPC conn def (Proxy @Trivial) + $ countUntil predicate + + -- Only one of the calls failed, and we got the appropriate + -- exception + case lefts results of + [GrpcException GrpcUnknown (Just msg) []] -> do + assertBool "" $ "DeliberateException" `Text.isInfixOf` msg + assertBool "" $ "SomeServerException 1" `Text.isInfixOf` msg + _ -> + assertFailure "" + + -- All others terminated with results satisfying the predicate + assertBool "" (all predicate $ rights results) + + -- New calls still succeed + assertBool "" . predicate + =<< Client.withRPC conn def (Proxy @Trivial) (countUntil predicate) + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ \call -> do + handlerCount <- + atomicModifyIORef' handlerCounter (\n -> (n + 1, n)) + when (handlerCount == 25) $ + throwIO $ + DeliberateException $ SomeServerException 1 + incUntilFinal call + ] + } + +-- | This is essentially 'Test.Prop.Dialogue.earlyTermination15', but the server +-- does not wait for client termination. +test_earlyTerminationNoWait :: IO () +test_earlyTerminationNoWait = testClientServer $ ClientServerTest { + config = def + , client = simpleTestClient $ \conn -> do + _mResult <- + try @DeliberateException $ + Client.withRPC conn def (Proxy @Trivial) $ \_call -> + throwIO (DeliberateException $ SomeServerException 0) + + Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Binary.sendFinalInput @Word8 call 0 + _output <- Binary.recvOutput @Word8 call + return () + , server = [ + Server.someRpcHandler $ + Server.mkRpcHandler @Trivial $ \call -> + Binary.recvInput @Word8 call >>= \case + _ -> Server.sendTrailers call NoMetadata + ] + } + +{------------------------------------------------------------------------------- + Client and handler functions +-------------------------------------------------------------------------------} + +-- | Send numbers to the server until it responds with one that satisfies the +-- given predicate. +countUntil :: (Word64 -> Bool) -> Client.Call Trivial -> IO Word64 +countUntil = go 0 + where + go :: Word64 -> (Word64 -> Bool) -> Client.Call Trivial -> IO Word64 + go next p call + | p next + = do + Binary.sendFinalInput @Word64 call next + (_final, NoMetadata) <- Binary.recvFinalOutput @Word64 call + return next + | otherwise + = do + Binary.sendNextInput @Word64 call next + next' <- Binary.recvNextOutput @Word64 call + go next' p call + +-- | Reads numbers from the client and sends them back incremented by one. +incUntilFinal :: Server.Call Trivial -> IO () +incUntilFinal call = do + Binary.recvInput call >>= \case + StreamElem n -> do + Binary.sendNextOutput @Word64 call $ succ n + incUntilFinal call + FinalElem n _ -> do + Binary.sendFinalOutput @Word64 call (succ n, NoMetadata) + NoMoreElems _ -> do + -- TODO: + -- + -- We shouldn't need to handle this case, since our client never + -- explicitly sends 'NoMoreElems'. However, see discussion in the + -- ticket above. + Server.sendTrailers call NoMetadata + return () + +{------------------------------------------------------------------------------- + Auxiliary +-------------------------------------------------------------------------------} + +type Trivial = RawRpc "trivial" "trivial" + +type instance RequestMetadata Trivial = NoMetadata +type instance ResponseInitialMetadata Trivial = NoMetadata +type instance ResponseTrailingMetadata Trivial = NoMetadata diff --git a/test-grapesy/Test/Util.hs b/test-grapesy/Test/Util.hs index f5510ba3..ef8e0b3e 100644 --- a/test-grapesy/Test/Util.hs +++ b/test-grapesy/Test/Util.hs @@ -4,6 +4,9 @@ module Test.Util ( -- * Timeouts Timeout(..) , within + + -- * Files + , withTemporaryFile ) where import Control.Concurrent @@ -11,6 +14,8 @@ import Control.Exception import Control.Monad.Catch import Control.Monad.IO.Class import GHC.Stack +import System.Directory +import System.IO {------------------------------------------------------------------------------- Timeouts @@ -45,4 +50,13 @@ within t info io = do fmap fst $ generalBracket startTimer stopTimer $ \_ -> io - +withTemporaryFile :: (FilePath -> IO a) -> IO a +withTemporaryFile k = do + tmpDir <- getTemporaryDirectory + Control.Exception.bracket + (openTempFile tmpDir "grapesy-test-suite.txt") + (removeFile . fst) + ( \(fp, h) -> do + hClose h + k fp + ) diff --git a/test-grapesy/Test/Util/Exception.hs b/test-grapesy/Test/Util/Exception.hs new file mode 100644 index 00000000..5b0af1f8 --- /dev/null +++ b/test-grapesy/Test/Util/Exception.hs @@ -0,0 +1,36 @@ +-- | Utility exception types for the tests +module Test.Util.Exception + ( -- * User exceptions + SomeServerException(..) + , SomeClientException(..) + + -- * Deliberate exceptions + , DeliberateException(..) + , ExceptionId + ) where + +import Control.Exception + +{------------------------------------------------------------------------------- + User exceptions + + When a test calls for the client or the server to throw an exception, we throw + one of these. Their sole purpose is to be "any" kind of exception (not a + specific one). +-------------------------------------------------------------------------------} + +data SomeServerException = SomeServerException ExceptionId + deriving stock (Show, Eq) + deriving anyclass (Exception) + +data SomeClientException = SomeClientException ExceptionId + deriving stock (Show, Eq) + deriving anyclass (Exception) + +-- | Exception thrown by client or handler to test exception handling +data DeliberateException = forall e. Exception e => DeliberateException e + deriving anyclass (Exception) +deriving stock instance Show DeliberateException + +-- | We distinguish exceptions from each other simply by a number +type ExceptionId = Int diff --git a/test-grapesy/Test/Util/RawTestServer.hs b/test-grapesy/Test/Util/RawTestServer.hs new file mode 100644 index 00000000..10d5708e --- /dev/null +++ b/test-grapesy/Test/Util/RawTestServer.hs @@ -0,0 +1,110 @@ +module Test.Util.RawTestServer where + +import Control.Concurrent +import Control.Concurrent.Async +import Control.Exception +import Data.ByteString qualified as BS.Strict +import Data.ByteString qualified as Strict (ByteString) +import Data.ByteString.Builder qualified as BS.Builder +import Data.ByteString.Char8 qualified as BS.Strict.Char8 +import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 +import Data.String (fromString) +import Network.HTTP2.Server qualified as HTTP2 +import Network.Run.TCP qualified as NetworkRun +import Network.Socket + +import Network.GRPC.Client qualified as Client +import Network.HTTP.Types qualified as HTTP +import Network.GRPC.Common +import Data.Maybe + +{------------------------------------------------------------------------------- + Raw test server + + This allows us to simulate broken /servers/. +-------------------------------------------------------------------------------} + +-- | Low-level test server +-- +-- We bypass the entire grapesy machinery for constructing the server, for added +-- flexibility. This allows us to mock broken deployments or run the server in +-- another thread that we throw asynchronous exceptions to, for example. +-- +-- The grapesy client can auto reconnect when the server is not (yet) up and +-- running, but to keep things simple, we just signal when the server is ready. +-- This also allows us to avoid binding to a specific port in the tests (which +-- might already be in use on the machine running the tests, leading to spurious +-- test failures). +rawTestServer :: IO (Maybe PortNumber) -> (PortNumber -> IO ()) -> HTTP2.Server -> IO () +rawTestServer getPort signalPort server = do + mPortIn <- fromMaybe 0 <$> getPort + addr <- NetworkRun.resolve Stream (Just "127.0.0.1") (show mPortIn) [AI_PASSIVE] + bracket (NetworkRun.openTCPServerSocket addr) close $ \listenSock -> do + addr' <- getSocketName listenSock + portOut <- case addr' of + SockAddrInet port _host -> return port + SockAddrInet6 port _ _host _ -> return port + SockAddrUnix{} -> error "rawTestServer: unexpected unix socket" + signalPort portOut + NetworkRun.runTCPServerWithSocket listenSock $ \clientSock -> + bracket (HTTP2.allocSimpleConfig clientSock 4096) + HTTP2.freeSimpleConfig $ \config -> + HTTP2.run HTTP2.defaultServerConfig config server + +-- | Run the server and apply the continuation to an 'Client.Address' holding +-- the running server's host and port. +withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a +withTestServer server k = do + serverPort <- newEmptyMVar + withAsync (rawTestServer (pure Nothing) (putMVar serverPort) server) $ + \_serverThread -> do + port <- readMVar serverPort + let addr :: Client.Address + addr = Client.Address { + addressHost = "127.0.0.1" + , addressPort = port + , addressAuthority = Nothing + } + k addr + +-- | Server that responds with the given 'Response', independent of the request +respondWith :: Response -> (Client.Address -> IO a) -> IO a +respondWith response = withTestServer $ \_req _aux respond -> + respond (toHTTP2Response response) [] + +data Response = Response { + responseStatus :: HTTP.Status + , responseHeaders :: [HTTP.Header] + , responseBody :: Strict.ByteString + , responseTrailers :: [HTTP.Header] + } + +instance Default Response where + def = Response { + responseStatus = HTTP.ok200 + , responseHeaders = [ asciiHeader "content-type" "application/grpc" ] + , responseBody = BS.Strict.empty + , responseTrailers = [ asciiHeader "grpc-status" "0" ] + } + +toHTTP2Response :: Response -> HTTP2.Response +toHTTP2Response response = + flip HTTP2.setResponseTrailersMaker trailersMaker $ + HTTP2.responseBuilder + (responseStatus response) + (responseHeaders response) + (BS.Builder.byteString $ responseBody response) + where + trailersMaker :: HTTP2.TrailersMaker + trailersMaker Nothing = return $ HTTP2.Trailers (responseTrailers response) + trailersMaker (Just _) = return $ HTTP2.NextTrailersMaker trailersMaker + +-- | Header with ASCII value +-- +-- (Header /names/ are always ASCII.) +asciiHeader :: String -> String -> HTTP.Header +asciiHeader name value = (fromString name, BS.Strict.Char8.pack value) + +-- | Header with UTF-8 encoded value +utf8Header :: String -> String -> HTTP.Header +utf8Header name value = (fromString name, BS.Strict.UTF8.fromString value) diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 56bd1eff..ebb87de9 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -429,16 +429,16 @@ close Channel{channelOutbound} reason = do -- We leave the inbound thread running. Although the channel is closed, -- there might still be unprocessed messages in the queue. The inbound -- thread will terminate once it reaches the end of the queue - outbound <- cancelThread channelOutbound channelClosed - case outbound of - AlreadyTerminated _ -> - return $ Nothing - AlreadyAborted _err -> - -- Connection_ to the peer was lost prior to closing - return $ Nothing - Cancelled -> - -- Proper procedure for outbound messages was not followed - return $ Just channelClosed + outbound <- cancelThread channelOutbound channelClosed + case outbound of + AlreadyTerminated _ -> + return $ Nothing + AlreadyAborted _err -> + -- Connection_ to the peer was lost prior to closing + return $ Nothing + Cancelled -> + -- Proper procedure for outbound messages was not followed + return $ Just channelClosed where channelClosed :: SomeException channelClosed = @@ -522,7 +522,7 @@ linkOutboundToInbound allowHalfClosed channel inbound = do threads, using the 'Thread' API from "Network.GRPC.Util.Thread". We are therefore not particularly worried about these loops being interrupted by asynchronous exceptions: this only happens if the threads are explicitly - terminated (when the corrresponding channels are closed), in which case any + terminated (when the corresponding channels are closed), in which case any attempt to interact with them after they have been killed will be handled by 'getThreadInterface' throwing 'ThreadInterfaceUnavailable'. -------------------------------------------------------------------------------} From 9710050c3469b6c1e7dd8aca73752a74f0d60091 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Tue, 27 Aug 2024 20:02:18 -0700 Subject: [PATCH 3/7] Let 'ReconnectPolicy' specify new server address The 'ReconnectAfter' constructor of reconnect policy now holds an optional 'Server' argument, allowing reconnect policies to specify new server addresses to attempt reconnection to. This makes it possible to fall back to redundant servers, without needing to completely throw away a connection on the client. --- grapesy.cabal | 2 +- src/Network/GRPC/Client/Connection.hs | 24 ++++++---- test-grapesy/Test/Driver/ClientServer.hs | 2 +- test-grapesy/Test/Sanity/Disconnect.hs | 59 ++++++++++++------------ test-grapesy/Test/Util.hs | 13 ++---- test-grapesy/Test/Util/RawTestServer.hs | 10 ++-- 6 files changed, 53 insertions(+), 57 deletions(-) diff --git a/grapesy.cabal b/grapesy.cabal index 7796bfb7..187e4c0a 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -346,7 +346,6 @@ test-suite test-grapesy , bytestring >= 0.10 && < 0.13 , case-insensitive >= 1.2 && < 1.3 , containers >= 0.6 && < 0.8 - , directory >= 1.3 && < 1.4 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 , http2 >= 5.3.1 && < 5.4 @@ -365,6 +364,7 @@ test-suite test-grapesy , tasty >= 1.4 && < 1.6 , tasty-hunit >= 0.10 && < 0.11 , tasty-quickcheck >= 0.10 && < 0.12 + , temporary >= 1.3 && < 1.4 , text >= 1.2 && < 2.2 , tls >= 1.7 && < 2.2 , tree-diff >= 0.3 && < 0.4 diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs index 745011c7..a9f4ffa5 100644 --- a/src/Network/GRPC/Client/Connection.hs +++ b/src/Network/GRPC/Client/Connection.hs @@ -29,6 +29,7 @@ import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch import Data.Default +import Data.Maybe import GHC.Stack import Network.HPACK qualified as HPACK import Network.HTTP2.Client qualified as HTTP2.Client @@ -165,13 +166,18 @@ data ReconnectPolicy = -- connection), do not attempt to connect again. DontReconnect - -- | Reconnect after random delay after the IO action returns + -- | Reconnect to the (potentially different) server after the IO action + -- returns + -- + -- If the 'Maybe' is 'Just', we'll attempt to reconnect to a server at the + -- new address. If 'Nothing', we'll attempt to connect to the original + -- server that 'withConnection' was given. -- -- This is a very general API: typically the IO action will call -- 'threadDelay' after some amount of time (which will typically involve -- some randomness), but it can be used to do things such as display a -- message to the user somewhere that the client is reconnecting. - | ReconnectAfter (IO ReconnectPolicy) + | ReconnectAfter (Maybe Server) (IO ReconnectPolicy) -- | The default policy is 'DontReconnect' -- @@ -207,7 +213,7 @@ exponentialBackoff waitFor e = go where go :: (Double, Double) -> Word -> ReconnectPolicy go _ 0 = DontReconnect - go (lo, hi) n = ReconnectAfter $ do + go (lo, hi) n = ReconnectAfter Nothing $ do delay <- randomRIO (lo, hi) waitFor $ round $ delay * 1_000_000 return $ go (lo * e, hi * e) (pred n) @@ -378,11 +384,11 @@ stayConnected :: -> TVar ConnectionState -> MVar () -> IO () -stayConnected connParams server connStateVar connOutOfScope = - loop (connReconnectPolicy connParams) +stayConnected connParams initialServer connStateVar connOutOfScope = do + loop initialServer (connReconnectPolicy connParams) where - loop :: ReconnectPolicy -> IO () - loop remainingReconnectPolicy = do + loop :: Server -> ReconnectPolicy -> IO () + loop server remainingReconnectPolicy = do -- Start new attempt (this just allocates some internal state) attempt <- newConnectionAttempt connParams connStateVar connOutOfScope @@ -425,9 +431,9 @@ stayConnected connParams server connStateVar connOutOfScope = atomically $ writeTVar connStateVar $ ConnectionAbandoned err (False, DontReconnect) -> do atomically $ writeTVar connStateVar $ ConnectionAbandoned err - (False, ReconnectAfter f) -> do + (False, ReconnectAfter mNewServer f) -> do atomically $ writeTVar connStateVar $ ConnectionNotReady - loop =<< f + loop (fromMaybe initialServer mNewServer) =<< f -- | Insecure connection (no TLS) connectInsecure :: ConnParams -> Attempt -> Address -> IO () diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index 2678a31a..11388cad 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -533,7 +533,7 @@ runTestClient cfg firstTestFailure port clientRun = do -- This avoids a race condition between the server starting first -- and the client starting first. , connReconnectPolicy = - Client.ReconnectAfter $ do + Client.ReconnectAfter Nothing $ do threadDelay 100_000 return Client.DontReconnect } diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs index 2d164a0d..d2ba6c3e 100644 --- a/test-grapesy/Test/Sanity/Disconnect.hs +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -24,6 +24,7 @@ import Data.Either import Data.IORef import Data.Word import Foreign.C.Types (CInt(..)) +import Network.Socket import System.Posix import Test.Tasty import Test.Tasty.HUnit @@ -59,7 +60,7 @@ test_clientDisconnect = do ] portSignal <- newEmptyMVar - void $ forkIO $ rawTestServer (pure Nothing) (putMVar portSignal) server + void $ forkIO $ rawTestServer (putMVar portSignal) server -- Start server serverPort <- readMVar portSignal @@ -126,24 +127,19 @@ test_serverDisconnect :: Assertion test_serverDisconnect = withTemporaryFile $ \ipcFile -> do -- We use a temporary file as a very rudimentary means of inter-process -- communication so the server (which runs in a separate process) can make - -- the client aware of the port it is assigned by the OS. This also helps us - -- make sure the server binds to the same port when it comes back up for - -- reconnect purposes. - let ipcWrite :: String -> IO () - ipcWrite msg = do - writeFile ipcFile "" - writeFile ipcFile msg + -- the client aware of the port it is assigned by the OS. + let ipcWrite :: PortNumber -> IO () + ipcWrite port = do + writeFile ipcFile (show port) - ipcRead :: IO String - ipcRead = readFile ipcFile - - ipcWaitRead :: IO String - ipcWaitRead = do - ipcRead >>= \case - "" -> do - threadDelay 10000 >> ipcWaitRead - msg -> do - return msg + ipcRead :: IO PortNumber + ipcRead = do + fmap (readMaybe @PortNumber) (readFile ipcFile) >>= \case + Nothing -> do + ipcRead + Just p -> do + writeFile ipcFile "" + return p -- Create the server server <- @@ -153,22 +149,22 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do ] let -- Starts the server in a new process. Gives back an action that kills - -- the server process. + -- the created server process. startServer :: IO (IO ()) startServer = do serverPid <- forkProcess $ - rawTestServer (readMaybe <$> ipcRead) (ipcWrite . show) server - return $ c_kill (fromIntegral serverPid) sigKILL + rawTestServer ipcWrite server + return $ signalProcess sigKILL serverPid -- Start server, get the port killServer <- startServer - serverPort <- read <$> ipcWaitRead + port1 <- ipcRead signalRestart <- newEmptyMVar - let serverAddress = + let serverAddress port = Client.ServerInsecure Client.Address { addressHost = "127.0.0.1" - , addressPort = serverPort + , addressPort = port , addressAuthority = Nothing } @@ -178,19 +174,23 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do go :: Int -> Client.ReconnectPolicy go n | n == 5 - = Client.ReconnectAfter $ do + = Client.ReconnectAfter Nothing $ do killRestarted <- startServer + port2 <- ipcRead putMVar signalRestart killRestarted - return $ Client.exponentialBackoff threadDelay 1 (1, 1) 100 + return $ + Client.ReconnectAfter + (Just $ serverAddress port2) + (pure Client.DontReconnect) | otherwise - = Client.ReconnectAfter $ do + = Client.ReconnectAfter Nothing $ do threadDelay 10000 return $ go (n + 1) connParams :: Client.ConnParams connParams = def { Client.connReconnectPolicy = reconnectPolicy } - Client.withConnection connParams serverAddress $ \conn -> do + Client.withConnection connParams (serverAddress port1) $ \conn -> do -- Make 50 concurrent calls. 49 of them sending infinite messages. One -- of them kills the server after 100 messages. let numCalls = 50 @@ -216,7 +216,7 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do killRestarted <- takeMVar signalRestart result <- Client.withRPC conn def (Proxy @Trivial) $ countUntil (pure . (>= 100)) - assertBool "" (result == 100) + assertEqual "" 100 result -- Do not leave the server process hanging around killRestarted @@ -274,7 +274,6 @@ echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do Auxiliary -------------------------------------------------------------------------------} -foreign import ccall unsafe "kill" c_kill :: CInt -> CInt -> IO () foreign import ccall unsafe "exit" c_exit :: CInt -> IO () type Trivial = RawRpc "trivial" "trivial" diff --git a/test-grapesy/Test/Util.hs b/test-grapesy/Test/Util.hs index ef8e0b3e..c0356271 100644 --- a/test-grapesy/Test/Util.hs +++ b/test-grapesy/Test/Util.hs @@ -14,8 +14,8 @@ import Control.Exception import Control.Monad.Catch import Control.Monad.IO.Class import GHC.Stack -import System.Directory import System.IO +import System.IO.Temp {------------------------------------------------------------------------------- Timeouts @@ -51,12 +51,5 @@ within t info io = do generalBracket startTimer stopTimer $ \_ -> io withTemporaryFile :: (FilePath -> IO a) -> IO a -withTemporaryFile k = do - tmpDir <- getTemporaryDirectory - Control.Exception.bracket - (openTempFile tmpDir "grapesy-test-suite.txt") - (removeFile . fst) - ( \(fp, h) -> do - hClose h - k fp - ) +withTemporaryFile k = + withSystemTempFile "grapesy-test-suite.txt" (\fp h -> hClose h >> k fp) diff --git a/test-grapesy/Test/Util/RawTestServer.hs b/test-grapesy/Test/Util/RawTestServer.hs index 10d5708e..77410c8a 100644 --- a/test-grapesy/Test/Util/RawTestServer.hs +++ b/test-grapesy/Test/Util/RawTestServer.hs @@ -16,7 +16,6 @@ import Network.Socket import Network.GRPC.Client qualified as Client import Network.HTTP.Types qualified as HTTP import Network.GRPC.Common -import Data.Maybe {------------------------------------------------------------------------------- Raw test server @@ -35,10 +34,9 @@ import Data.Maybe -- This also allows us to avoid binding to a specific port in the tests (which -- might already be in use on the machine running the tests, leading to spurious -- test failures). -rawTestServer :: IO (Maybe PortNumber) -> (PortNumber -> IO ()) -> HTTP2.Server -> IO () -rawTestServer getPort signalPort server = do - mPortIn <- fromMaybe 0 <$> getPort - addr <- NetworkRun.resolve Stream (Just "127.0.0.1") (show mPortIn) [AI_PASSIVE] +rawTestServer :: (PortNumber -> IO ()) -> HTTP2.Server -> IO () +rawTestServer signalPort server = do + addr <- NetworkRun.resolve Stream (Just "127.0.0.1") "0" [AI_PASSIVE] bracket (NetworkRun.openTCPServerSocket addr) close $ \listenSock -> do addr' <- getSocketName listenSock portOut <- case addr' of @@ -56,7 +54,7 @@ rawTestServer getPort signalPort server = do withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a withTestServer server k = do serverPort <- newEmptyMVar - withAsync (rawTestServer (pure Nothing) (putMVar serverPort) server) $ + withAsync (rawTestServer (putMVar serverPort) server) $ \_serverThread -> do port <- readMVar serverPort let addr :: Client.Address From 346ac83fb6e1b38daedd40167e10ce75ebccb782 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Tue, 27 Aug 2024 20:27:48 -0700 Subject: [PATCH 4/7] Add `Proto.API.Trivial` module We had a few spots where we were defining a `RawRPC "trivial" "trivial"` RPC with `NoMetadata`, so just abstracted to deduplicate. --- grapesy.cabal | 1 + proto/Proto/API/Trivial.hs | 21 +++++++++++++++++ test-grapesy/Test/Sanity/Disconnect.hs | 30 ++++++++++++------------- test-grapesy/Test/Sanity/EndOfStream.hs | 14 ++++++------ test-grapesy/Test/Sanity/Exception.hs | 12 +--------- 5 files changed, 44 insertions(+), 34 deletions(-) create mode 100644 proto/Proto/API/Trivial.hs diff --git a/grapesy.cabal b/grapesy.cabal index 187e4c0a..345e9bc1 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -329,6 +329,7 @@ test-suite test-grapesy Proto.API.Interop Proto.API.Ping + Proto.API.Trivial Proto.Empty Proto.Messages Proto.Ping diff --git a/proto/Proto/API/Trivial.hs b/proto/Proto/API/Trivial.hs new file mode 100644 index 00000000..b09b9f43 --- /dev/null +++ b/proto/Proto/API/Trivial.hs @@ -0,0 +1,21 @@ +{-# OPTIONS_GHC -Wno-orphans #-} + +module Proto.API.Trivial + ( -- * Trivial RPC + Trivial + , Trivial' + ) where + +import Network.GRPC.Common +import Network.GRPC.Spec + +{------------------------------------------------------------------------------- + Trivial RPC +-------------------------------------------------------------------------------} + +type Trivial = RawRpc "trivial" "trivial" +type Trivial' s = RawRpc "trivial" s + +type instance RequestMetadata (Trivial' s) = NoMetadata +type instance ResponseInitialMetadata (Trivial' s) = NoMetadata +type instance ResponseTrailingMetadata (Trivial' s) = NoMetadata diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs index d2ba6c3e..8a8d17e8 100644 --- a/test-grapesy/Test/Sanity/Disconnect.hs +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -13,7 +13,7 @@ -- 1. The handlers dealing with that client (i.e. on that connection) should -- fail with 'Server.ClientDisonnected' -- 2. Future calls (after reconnection) succeed -module Test.Sanity.Disconnect where +module Test.Sanity.Disconnect (tests) where import Control.Concurrent import Control.Concurrent.Async @@ -36,6 +36,7 @@ import Network.GRPC.Common import Network.GRPC.Server qualified as Server import Network.GRPC.Server.Binary qualified as Binary import Network.GRPC.Spec +import Proto.API.Trivial import Test.Util import Test.Util.RawTestServer @@ -45,6 +46,13 @@ tests = testGroup "Test.Sanity.Disconnect" [ , testCase "server" test_serverDisconnect ] +-- | We want two distinct handlers running at the same time, so we have two +-- trivial RPCs +type RPC1 = Trivial' "rpc1" + +-- | See 'RPC1' +type RPC2 = Trivial' "rpc2" + -- | Two separate clients make many concurrent calls, one of them disconnects. test_clientDisconnect :: Assertion test_clientDisconnect = do @@ -54,9 +62,9 @@ test_clientDisconnect = do server <- Server.mkGrpcServer def [ Server.someRpcHandler $ - Server.mkRpcHandler @Trivial $ echoHandler (Just disconnectCounter1) + Server.mkRpcHandler @RPC1 $ echoHandler (Just disconnectCounter1) , Server.someRpcHandler $ - Server.mkRpcHandler @Trivial' $ echoHandler (Just disconnectCounter2) + Server.mkRpcHandler @RPC2 $ echoHandler (Just disconnectCounter2) ] portSignal <- newEmptyMVar @@ -85,7 +93,7 @@ test_clientDisconnect = do return False ] mapConcurrently_ - ( Client.withRPC conn def (Proxy @Trivial) + ( Client.withRPC conn def (Proxy @RPC1) . countUntil ) predicates @@ -99,14 +107,14 @@ test_clientDisconnect = do (result1, result2) <- concurrently ( Client.withConnection def serverAddress $ \conn -> do sum <$> mapConcurrently - ( Client.withRPC conn def (Proxy @Trivial) + ( Client.withRPC conn def (Proxy @RPC1) . countUntil ) predicates ) ( Client.withConnection def serverAddress $ \conn -> do sum <$> mapConcurrently - ( Client.withRPC conn def (Proxy @Trivial') + ( Client.withRPC conn def (Proxy @RPC2) . countUntil ) predicates @@ -275,13 +283,3 @@ echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do -------------------------------------------------------------------------------} foreign import ccall unsafe "exit" c_exit :: CInt -> IO () - -type Trivial = RawRpc "trivial" "trivial" -type Trivial' = RawRpc "trivial" "trivial'" - -type instance RequestMetadata Trivial = NoMetadata -type instance ResponseInitialMetadata Trivial = NoMetadata -type instance ResponseTrailingMetadata Trivial = NoMetadata -type instance RequestMetadata Trivial' = NoMetadata -type instance ResponseInitialMetadata Trivial' = NoMetadata -type instance ResponseTrailingMetadata Trivial' = NoMetadata diff --git a/test-grapesy/Test/Sanity/EndOfStream.hs b/test-grapesy/Test/Sanity/EndOfStream.hs index 26e7dea7..97144f2b 100644 --- a/test-grapesy/Test/Sanity/EndOfStream.hs +++ b/test-grapesy/Test/Sanity/EndOfStream.hs @@ -101,7 +101,7 @@ test_recvTrailers = testClientServer $ ClientServerTest { config = def , server = [Server.fromMethod nonStreamingHandler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty resp <- Client.recvNextOutput call @@ -123,7 +123,7 @@ test_recvTrailers = testClientServer $ ClientServerTest { -------------------------------------------------------------------------------} -- | Receive any string, respond with a single 'mempty' -type Trivial = RawRpc "Test" "trivial" +type Poke = RawRpc "Test" "trivial" -- | Service that simply absorbs all messages and then returns with 'mempty' type Absorb = RawRpc "Test" "absorb" @@ -132,7 +132,7 @@ type Absorb = RawRpc "Test" "absorb" -- client with a bunch of 'mempty' messages type Spam = RawRpc "Test" "spam" -nonStreamingHandler :: ServerHandler' NonStreaming IO Trivial +nonStreamingHandler :: ServerHandler' NonStreaming IO Poke nonStreamingHandler = Server.mkNonStreaming $ \_inp -> return BS.Lazy.empty @@ -161,13 +161,13 @@ test_recvInput = testClientServer $ ClientServerTest { config = def , server = [Server.someRpcHandler handler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty _resp <- Client.recvFinalOutput call return () } where - handler :: Server.RpcHandler IO Trivial + handler :: Server.RpcHandler IO Poke handler = Server.mkRpcHandler $ \call -> do x <- Server.recvInput call @@ -189,13 +189,13 @@ test_recvEndOfInput = testClientServer $ ClientServerTest { config = def , server = [Server.someRpcHandler handler] , client = simpleTestClient $ \conn -> - Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.withRPC conn def (Proxy @Poke) $ \call -> do Client.sendFinalInput call BS.Lazy.empty _resp <- Client.recvFinalOutput call return () } where - handler :: Server.RpcHandler IO Trivial + handler :: Server.RpcHandler IO Poke handler = Server.mkRpcHandler $ \call -> do resp <- Server.recvNextInput call assertEqual "resp" BS.Lazy.empty $ resp diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs index e3312e74..46265527 100644 --- a/test-grapesy/Test/Sanity/Exception.hs +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -28,7 +28,7 @@ import Network.GRPC.Client.Binary qualified as Binary import Network.GRPC.Common import Network.GRPC.Server qualified as Server import Network.GRPC.Server.Binary qualified as Binary -import Network.GRPC.Spec +import Proto.API.Trivial import Test.Driver.ClientServer import Test.Util.Exception @@ -184,13 +184,3 @@ incUntilFinal call = do -- ticket above. Server.sendTrailers call NoMetadata return () - -{------------------------------------------------------------------------------- - Auxiliary --------------------------------------------------------------------------------} - -type Trivial = RawRpc "trivial" "trivial" - -type instance RequestMetadata Trivial = NoMetadata -type instance ResponseInitialMetadata Trivial = NoMetadata -type instance ResponseTrailingMetadata Trivial = NoMetadata From 153d200a13f61667d13b482d437732e910edbd65 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Wed, 28 Aug 2024 07:09:13 -0700 Subject: [PATCH 5/7] Remove `rawTestServer`, use `forkServer` instead `rawTestServer` was doing nothing special anymore, and `forkServer` allows us to query the server port as well. We keep `respondWith` around because it is a useful abstraction for modeling broken servers. --- grapesy.cabal | 1 - test-grapesy/Test/Sanity/Disconnect.hs | 36 ++++++++++--- test-grapesy/Test/Util/RawTestServer.hs | 72 ++++++++++--------------- 3 files changed, 56 insertions(+), 53 deletions(-) diff --git a/grapesy.cabal b/grapesy.cabal index 345e9bc1..0720c6c2 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -353,7 +353,6 @@ test-suite test-grapesy , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 - , network-run >= 0.4 && < 0.5 , prettyprinter >= 1.7 && < 1.8 , prettyprinter-ansi-terminal >= 1.1 && < 1.2 , proto-lens >= 0.7 && < 0.8 diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs index 8a8d17e8..280c5022 100644 --- a/test-grapesy/Test/Sanity/Disconnect.hs +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -35,10 +35,10 @@ import Network.GRPC.Client.Binary qualified as Binary import Network.GRPC.Common import Network.GRPC.Server qualified as Server import Network.GRPC.Server.Binary qualified as Binary +import Network.GRPC.Server.Run import Network.GRPC.Spec import Proto.API.Trivial import Test.Util -import Test.Util.RawTestServer tests :: TestTree tests = testGroup "Test.Sanity.Disconnect" [ @@ -67,19 +67,29 @@ test_clientDisconnect = do Server.mkRpcHandler @RPC2 $ echoHandler (Just disconnectCounter2) ] + -- Start server + let serverConfig = ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } portSignal <- newEmptyMVar - void $ forkIO $ rawTestServer (putMVar portSignal) server + void $ forkIO $ forkServer def serverConfig server $ \runningServer -> do + putMVar portSignal =<< getServerPort runningServer + waitServer runningServer - -- Start server + -- Wait for the server to signal its port serverPort <- readMVar portSignal + + -- Start a client in a separate process let serverAddress = Client.ServerInsecure Client.Address { addressHost = "127.0.0.1" , addressPort = serverPort , addressAuthority = Nothing } - - -- Start a client in a separate process void $ forkProcess $ Client.withConnection def serverAddress $ \conn -> do -- Make 50 concurrent calls. 49 of them sending infinite messages. One @@ -156,16 +166,26 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do Server.mkRpcHandler @Trivial $ echoHandler Nothing ] - let -- Starts the server in a new process. Gives back an action that kills + let serverConfig = ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } + + -- Starts the server in a new process. Gives back an action that kills -- the created server process. startServer :: IO (IO ()) startServer = do serverPid <- forkProcess $ - rawTestServer ipcWrite server + forkServer def serverConfig server $ \runningServer -> do + ipcWrite =<< getServerPort runningServer + waitServer runningServer return $ signalProcess sigKILL serverPid - -- Start server, get the port + -- Start server, get the initial port killServer <- startServer port1 <- ipcRead signalRestart <- newEmptyMVar diff --git a/test-grapesy/Test/Util/RawTestServer.hs b/test-grapesy/Test/Util/RawTestServer.hs index 77410c8a..03362f23 100644 --- a/test-grapesy/Test/Util/RawTestServer.hs +++ b/test-grapesy/Test/Util/RawTestServer.hs @@ -1,8 +1,13 @@ -module Test.Util.RawTestServer where +module Test.Util.RawTestServer + ( -- * Raw test server + respondWith + + -- * Abstract response type + , Response(..) + , asciiHeader + , utf8Header + ) where -import Control.Concurrent -import Control.Concurrent.Async -import Control.Exception import Data.ByteString qualified as BS.Strict import Data.ByteString qualified as Strict (ByteString) import Data.ByteString.Builder qualified as BS.Builder @@ -10,12 +15,11 @@ import Data.ByteString.Char8 qualified as BS.Strict.Char8 import Data.ByteString.UTF8 qualified as BS.Strict.UTF8 import Data.String (fromString) import Network.HTTP2.Server qualified as HTTP2 -import Network.Run.TCP qualified as NetworkRun -import Network.Socket import Network.GRPC.Client qualified as Client -import Network.HTTP.Types qualified as HTTP import Network.GRPC.Common +import Network.GRPC.Server.Run +import Network.HTTP.Types qualified as HTTP {------------------------------------------------------------------------------- Raw test server @@ -23,47 +27,27 @@ import Network.GRPC.Common This allows us to simulate broken /servers/. -------------------------------------------------------------------------------} --- | Low-level test server --- --- We bypass the entire grapesy machinery for constructing the server, for added --- flexibility. This allows us to mock broken deployments or run the server in --- another thread that we throw asynchronous exceptions to, for example. --- --- The grapesy client can auto reconnect when the server is not (yet) up and --- running, but to keep things simple, we just signal when the server is ready. --- This also allows us to avoid binding to a specific port in the tests (which --- might already be in use on the machine running the tests, leading to spurious --- test failures). -rawTestServer :: (PortNumber -> IO ()) -> HTTP2.Server -> IO () -rawTestServer signalPort server = do - addr <- NetworkRun.resolve Stream (Just "127.0.0.1") "0" [AI_PASSIVE] - bracket (NetworkRun.openTCPServerSocket addr) close $ \listenSock -> do - addr' <- getSocketName listenSock - portOut <- case addr' of - SockAddrInet port _host -> return port - SockAddrInet6 port _ _host _ -> return port - SockAddrUnix{} -> error "rawTestServer: unexpected unix socket" - signalPort portOut - NetworkRun.runTCPServerWithSocket listenSock $ \clientSock -> - bracket (HTTP2.allocSimpleConfig clientSock 4096) - HTTP2.freeSimpleConfig $ \config -> - HTTP2.run HTTP2.defaultServerConfig config server - -- | Run the server and apply the continuation to an 'Client.Address' holding -- the running server's host and port. withTestServer :: HTTP2.Server -> (Client.Address -> IO a) -> IO a withTestServer server k = do - serverPort <- newEmptyMVar - withAsync (rawTestServer (putMVar serverPort) server) $ - \_serverThread -> do - port <- readMVar serverPort - let addr :: Client.Address - addr = Client.Address { - addressHost = "127.0.0.1" - , addressPort = port - , addressAuthority = Nothing - } - k addr + let serverConfig = + ServerConfig { + serverInsecure = Just $ InsecureConfig { + insecureHost = Just "127.0.0.1" + , insecurePort = 0 + } + , serverSecure = Nothing + } + forkServer def serverConfig server $ \runningServer -> do + port <- getServerPort runningServer + let addr :: Client.Address + addr = Client.Address { + addressHost = "127.0.0.1" + , addressPort = port + , addressAuthority = Nothing + } + k addr -- | Server that responds with the given 'Response', independent of the request respondWith :: Response -> (Client.Address -> IO a) -> IO a From 592ec92f46e0fe6857ff5e3507e5add2ff20efc8 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Wed, 28 Aug 2024 07:53:59 -0700 Subject: [PATCH 6/7] Abstract test steps in disconnect tests No more of that "predicate" nonsense. Client steps are now described by a little DSL and interpreted by the client. Also enable `-Wmissing-export-lists` --- grapesy.cabal | 1 + interop/Main.hs | 2 +- test-grapesy/Test/Sanity/Disconnect.hs | 134 ++++++++++++++----------- test-grapesy/Test/Sanity/Exception.hs | 9 +- 4 files changed, 81 insertions(+), 65 deletions(-) diff --git a/grapesy.cabal b/grapesy.cabal index 0720c6c2..268dc349 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -37,6 +37,7 @@ common lang -Wno-unticked-promoted-constructors -Wprepositive-qualified-module -Widentities + -Wmissing-export-lists build-depends: base >= 4.14 && < 4.21 default-language: diff --git a/interop/Main.hs b/interop/Main.hs index d1305ec5..d893fd2b 100644 --- a/interop/Main.hs +++ b/interop/Main.hs @@ -1,4 +1,4 @@ -module Main where +module Main (main) where import GHC.Conc (setUncaughtExceptionHandler) import System.IO diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs index 280c5022..ea7c44a5 100644 --- a/test-grapesy/Test/Sanity/Disconnect.hs +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -22,13 +22,14 @@ import Control.Monad import Data.ByteString.Lazy qualified as Lazy (ByteString) import Data.Either import Data.IORef +import Data.Maybe import Data.Word import Foreign.C.Types (CInt(..)) import Network.Socket import System.Posix import Test.Tasty import Test.Tasty.HUnit -import Text.Read +import Text.Read hiding (step) import Network.GRPC.Client qualified as Client import Network.GRPC.Client.Binary qualified as Binary @@ -82,63 +83,59 @@ test_clientDisconnect = do -- Wait for the server to signal its port serverPort <- readMVar portSignal - - -- Start a client in a separate process let serverAddress = Client.ServerInsecure Client.Address { addressHost = "127.0.0.1" , addressPort = serverPort , addressAuthority = Nothing } + + + -- Start a client in a separate process + let numCalls = 50 void $ forkProcess $ Client.withConnection def serverAddress $ \conn -> do -- Make 50 concurrent calls. 49 of them sending infinite messages. One -- of them kills this client process after 100 messages. - let numCalls = 50 - predicate = pure . const False - predicates = - replicate (numCalls - 1) predicate ++ - [ \n -> do - when (n == 100) $ c_exit 1 - return False - ] mapConcurrently_ - ( Client.withRPC conn def (Proxy @RPC1) - . countUntil - ) - predicates + ( Client.withRPC conn def (Proxy @RPC1) + . runSteps + ) + $ replicate (numCalls - 1) stepsInfinite ++ + [ mkClientSteps Nothing [ (100, c_exit 1) ] ] -- Start two more clients that make 50 calls to each handler, all calls - -- counting up to 1000 - let numCalls = 50 - countTo = 100 - predicate = pure . (>= countTo) - predicates = replicate numCalls predicate + -- counting up to 100 + let numSteps = 100 + steps = replicate numCalls $ stepsN numSteps (result1, result2) <- concurrently ( Client.withConnection def serverAddress $ \conn -> do sum <$> mapConcurrently ( Client.withRPC conn def (Proxy @RPC1) - . countUntil + . runSteps ) - predicates + steps ) ( Client.withConnection def serverAddress $ \conn -> do sum <$> mapConcurrently ( Client.withRPC conn def (Proxy @RPC2) - . countUntil + . runSteps ) - predicates + steps ) - -- All calls should have finished with a results of 'countTo', for both - -- clients - assertBool "" (result1 + result2 == 2 * sum (replicate numCalls countTo)) + -- All calls by clients in /this/ process (not the ones we killed) should + -- have finished with a result of 'countTo' + assertEqual "" + (2 * sum (replicate numCalls numSteps)) + (fromIntegral $ result1 + result2) -- We should also see only 50 client disconnects for the first handler and -- none for the second clientDisconnects1 <- readIORef disconnectCounter1 clientDisconnects2 <- readIORef disconnectCounter2 - assertBool "" (clientDisconnects1 == 50 && clientDisconnects2 == 0) + assertEqual "" 50 clientDisconnects1 + assertEqual "" 0 clientDisconnects2 -- | Client makes many concurrent calls, server disconnects test_serverDisconnect :: Assertion @@ -221,30 +218,26 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do Client.withConnection connParams (serverAddress port1) $ \conn -> do -- Make 50 concurrent calls. 49 of them sending infinite messages. One -- of them kills the server after 100 messages. - let numCalls = 50 - predicate = pure . const False - predicates = - replicate (numCalls - 1) predicate ++ - [ \n -> do - when (n == 100) killServer - return False - ] + let numCalls = 50 results <- mapConcurrently - ( try @Client.ServerDisconnected - . Client.withRPC conn def (Proxy @Trivial) - . countUntil - ) - predicates + ( try @Client.ServerDisconnected + . Client.withRPC conn def (Proxy @Trivial) + . runSteps + ) + $ replicate (numCalls - 1) stepsInfinite ++ + [ mkClientSteps Nothing [(100, killServer)] ] -- All calls should have failed - assertBool "" (null (rights results) && length (lefts results) == numCalls) + assertBool "" (null (rights results)) + assertEqual "" numCalls (length (lefts results)) -- New calls should succeed (after reconnection) killRestarted <- takeMVar signalRestart - result <- Client.withRPC conn def (Proxy @Trivial) $ - countUntil (pure . (>= 100)) - assertEqual "" 100 result + result <- + Client.withRPC conn def (Proxy @Trivial) $ + runSteps (stepsN numCalls) + assertEqual "" numCalls (fromIntegral result) -- Do not leave the server process hanging around killRestarted @@ -253,26 +246,27 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do Client and handler functions -------------------------------------------------------------------------------} --- | Send increasing numbers to the server until it responds with one that --- satisfies the given predicate. -countUntil :: forall rpc. +-- | Execute the client steps +runSteps :: forall rpc. ( Input rpc ~ Lazy.ByteString , Output rpc ~ Lazy.ByteString , ResponseTrailingMetadata rpc ~ NoMetadata - ) => (Word64 -> IO Bool) -> Client.Call rpc -> IO Word64 -countUntil = go 0 + ) => ClientStep -> Client.Call rpc -> IO Word64 +runSteps = + go 0 where - go :: Word64 -> (Word64 -> IO Bool) -> Client.Call rpc -> IO Word64 - go next p call = do - sat <- p next - if sat then do - Binary.sendFinalInput @Word64 call next - (final, NoMetadata) <- Binary.recvFinalOutput @Word64 call - return final - else do - Binary.sendNextInput @Word64 call next - next' <- Binary.recvNextOutput @Word64 call - go (succ next') p call + go :: Word64 -> ClientStep -> Client.Call rpc -> IO Word64 + go n step call = do + case step of + KeepGoing mact next -> do + fromMaybe (return ()) mact + Binary.sendNextInput @Word64 call n + _ <- Binary.recvNextOutput @Word64 call + go (n + 1) next call + Done -> do + Binary.sendFinalInput @Word64 call n + (_, NoMetadata) <- Binary.recvFinalOutput @Word64 call + return n -- | Echos any input echoHandler :: @@ -303,3 +297,21 @@ echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do -------------------------------------------------------------------------------} foreign import ccall unsafe "exit" c_exit :: CInt -> IO () + +data ClientStep = KeepGoing (Maybe (IO ())) ClientStep | Done + +mkClientSteps :: Maybe Int -> [(Int, IO ())] -> ClientStep +mkClientSteps = go 0 + where + go !i mn acts + | maybe False (i >=) mn + = Done + | otherwise + = KeepGoing (lookup i acts) $ go (i + 1) mn acts + +stepsN :: Int -> ClientStep +stepsN n = mkClientSteps (Just n) [] + +{-# INLINE stepsInfinite #-} +stepsInfinite :: ClientStep +stepsInfinite = mkClientSteps Nothing [] diff --git a/test-grapesy/Test/Sanity/Exception.hs b/test-grapesy/Test/Sanity/Exception.hs index 46265527..0e97e6a5 100644 --- a/test-grapesy/Test/Sanity/Exception.hs +++ b/test-grapesy/Test/Sanity/Exception.hs @@ -9,9 +9,12 @@ -- call (on either the server or client, e.g. inside either 'mkRpcHandler' or -- 'withRPC'): -- --- 1. Other ongoing calls on that connection are not terminated, and --- 2. future calls are still possible. -module Test.Sanity.Exception where +-- 1. Other ongoing calls on that connection are not terminated (client), and +-- handlers dealing with other calls on that connection are not terminated +-- (server), and +-- 2. future calls are still possible (client), and more handlers can be started +-- to deal with future calls (server). +module Test.Sanity.Exception (tests) where import Control.Concurrent.Async import Control.Exception From bab1fddd9557115b12227c9f1c659db6d5f52d81 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Fri, 30 Aug 2024 11:08:49 -0700 Subject: [PATCH 7/7] Introduce `ReconnectTo` Reconnect policies can now specify whether they want to attempt reconnection with the original server given to `withConnection`, the last server we attempted connection with, or a new server specified by the policy itself. --- src/Network/GRPC/Client.hs | 1 + src/Network/GRPC/Client/Connection.hs | 38 +++++++++++++++++++----- test-grapesy/Test/Driver/ClientServer.hs | 2 +- test-grapesy/Test/Sanity/Disconnect.hs | 9 ++++-- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/src/Network/GRPC/Client.hs b/src/Network/GRPC/Client.hs index 271733fc..9738d19b 100644 --- a/src/Network/GRPC/Client.hs +++ b/src/Network/GRPC/Client.hs @@ -9,6 +9,7 @@ module Network.GRPC.Client ( -- ** Reconnection policy , ReconnectPolicy(..) + , ReconnectTo(..) , exponentialBackoff -- ** Connection parameters diff --git a/src/Network/GRPC/Client/Connection.hs b/src/Network/GRPC/Client/Connection.hs index a9f4ffa5..070474e5 100644 --- a/src/Network/GRPC/Client/Connection.hs +++ b/src/Network/GRPC/Client/Connection.hs @@ -16,6 +16,7 @@ module Network.GRPC.Client.Connection ( , SslKeyLog(..) , ConnParams(..) , ReconnectPolicy(..) + , ReconnectTo(..) , exponentialBackoff -- * Using the connection , connParams @@ -29,7 +30,6 @@ import Control.Concurrent.STM import Control.Monad import Control.Monad.Catch import Data.Default -import Data.Maybe import GHC.Stack import Network.HPACK qualified as HPACK import Network.HTTP2.Client qualified as HTTP2.Client @@ -169,15 +169,27 @@ data ReconnectPolicy = -- | Reconnect to the (potentially different) server after the IO action -- returns -- - -- If the 'Maybe' is 'Just', we'll attempt to reconnect to a server at the - -- new address. If 'Nothing', we'll attempt to connect to the original - -- server that 'withConnection' was given. + -- The 'ReconnectTo' can be used to implement a rudimentary redundancy + -- scheme. For example, you could decide to reconnect to a known fallback + -- server after connection to a main server fails a certain number of times. -- -- This is a very general API: typically the IO action will call -- 'threadDelay' after some amount of time (which will typically involve -- some randomness), but it can be used to do things such as display a -- message to the user somewhere that the client is reconnecting. - | ReconnectAfter (Maybe Server) (IO ReconnectPolicy) + | ReconnectAfter ReconnectTo (IO ReconnectPolicy) + +-- | What server should we attempt to reconnect to? +-- +-- * 'ReconnectToPrevious' will attempt to reconnect to the last server we +-- attempted to connect to, whether or not that attempt was successful. +-- * 'ReconnectToOriginal' will attempt to reconnect to the original server that +-- 'withConnection' was given. +-- * 'ReconnectToNew' will attempt to connect to the newly specified server. +data ReconnectTo = + ReconnectToPrevious + | ReconnectToOriginal + | ReconnectToNew Server -- | The default policy is 'DontReconnect' -- @@ -186,6 +198,9 @@ data ReconnectPolicy = instance Default ReconnectPolicy where def = DontReconnect +instance Default ReconnectTo where + def = ReconnectToPrevious + -- | Exponential backoff -- -- If the exponent is @1@, the delay interval will be the same every step; @@ -213,7 +228,7 @@ exponentialBackoff waitFor e = go where go :: (Double, Double) -> Word -> ReconnectPolicy go _ 0 = DontReconnect - go (lo, hi) n = ReconnectAfter Nothing $ do + go (lo, hi) n = ReconnectAfter def $ do delay <- randomRIO (lo, hi) waitFor $ round $ delay * 1_000_000 return $ go (lo * e, hi * e) (pred n) @@ -431,9 +446,16 @@ stayConnected connParams initialServer connStateVar connOutOfScope = do atomically $ writeTVar connStateVar $ ConnectionAbandoned err (False, DontReconnect) -> do atomically $ writeTVar connStateVar $ ConnectionAbandoned err - (False, ReconnectAfter mNewServer f) -> do + atomically $ writeTVar connStateVar $ ConnectionAbandoned err + (False, ReconnectAfter to f) -> do + let + nextServer = + case to of + ReconnectToPrevious -> server + ReconnectToOriginal -> initialServer + ReconnectToNew new -> new atomically $ writeTVar connStateVar $ ConnectionNotReady - loop (fromMaybe initialServer mNewServer) =<< f + loop nextServer =<< f -- | Insecure connection (no TLS) connectInsecure :: ConnParams -> Attempt -> Address -> IO () diff --git a/test-grapesy/Test/Driver/ClientServer.hs b/test-grapesy/Test/Driver/ClientServer.hs index 11388cad..bbde47ca 100644 --- a/test-grapesy/Test/Driver/ClientServer.hs +++ b/test-grapesy/Test/Driver/ClientServer.hs @@ -533,7 +533,7 @@ runTestClient cfg firstTestFailure port clientRun = do -- This avoids a race condition between the server starting first -- and the client starting first. , connReconnectPolicy = - Client.ReconnectAfter Nothing $ do + Client.ReconnectAfter def $ do threadDelay 100_000 return Client.DontReconnect } diff --git a/test-grapesy/Test/Sanity/Disconnect.hs b/test-grapesy/Test/Sanity/Disconnect.hs index ea7c44a5..4a265277 100644 --- a/test-grapesy/Test/Sanity/Disconnect.hs +++ b/test-grapesy/Test/Sanity/Disconnect.hs @@ -199,16 +199,16 @@ test_serverDisconnect = withTemporaryFile $ \ipcFile -> do go :: Int -> Client.ReconnectPolicy go n | n == 5 - = Client.ReconnectAfter Nothing $ do + = Client.ReconnectAfter def $ do killRestarted <- startServer port2 <- ipcRead putMVar signalRestart killRestarted return $ Client.ReconnectAfter - (Just $ serverAddress port2) + (Client.ReconnectToNew $ serverAddress port2) (pure Client.DontReconnect) | otherwise - = Client.ReconnectAfter Nothing $ do + = Client.ReconnectAfter def $ do threadDelay 10000 return $ go (n + 1) @@ -296,6 +296,9 @@ echoHandler disconnectCounter call = trackDisconnects disconnectCounter $ do Auxiliary -------------------------------------------------------------------------------} +-- We need to use this to properly simulate the execution environment crashing +-- in an unrecoverable way. In particular, we don't want to give the program a +-- chance to do any of its normal exception handling/cleanup behavior. foreign import ccall unsafe "exit" c_exit :: CInt -> IO () data ClientStep = KeepGoing (Maybe (IO ())) ClientStep | Done