Skip to content

Commit

Permalink
Explicitly cancel streams on closeRPC
Browse files Browse the repository at this point in the history
Using the new `outBodyCancel`, the client can now clearly indicate that it is
closing the stream as the result of an exception, and the server can handle it
properly.
  • Loading branch information
FinleyMcIlwaine committed Aug 30, 2024
1 parent dec0e10 commit 9196096
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 102 deletions.
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.1 && < 5.4
, http2 >= 5.3.4 && < 5.4
, http2-tls >= 0.4.1 && < 0.5
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -350,7 +350,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.1 && < 5.4
, http2 >= 5.3.4 && < 5.4
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
Expand Down
14 changes: 7 additions & 7 deletions interop/Interop/Client/TestCase/CancelAfterBegin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import Network.GRPC.Common

import Interop.Client.Connect
import Interop.Cmdline
import Interop.Util.Exceptions

import Proto.API.Interop

-- | <https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md#cancel_after_begin>
--
-- This is not really testing anything about the server, but rather about how
-- cancellation gets reported by the grapesy client library itself.
-- cancellation is handled by the grapesy client itself. In particular,
-- immediate cancellation does not result in any client-side exceptions or
-- errors.
runTest :: Cmdline -> IO ()
runTest cmdline =
withConnection def (testServer cmdline) $ \conn -> do
assertThrows (assertEqual GrpcCancelled . grpcError) $
withRPC conn def (Proxy @StreamingInputCall) $ \_call ->
-- Immediately cancel request
return ()
withConnection def (testServer cmdline) $ \conn ->
withRPC conn def (Proxy @StreamingInputCall) $ \_call ->
-- Immediately cancel request
return ()

2 changes: 1 addition & 1 deletion interop/Interop/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ withInteropServer cmdline k = do
= ServerConfig {
serverSecure = Nothing
, serverInsecure = Just InsecureConfig {
insecureHost = Nothing
insecureHost = Just "127.0.0.1"
, insecurePort = cmdPort cmdline
}
}
Expand Down
35 changes: 29 additions & 6 deletions src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,33 @@ withRPC conn callParams proxy k = fmap fst $
generalBracket
(liftIO $ startRPC conn proxy callParams)
closeRPC
k
(k . fst)
where
closeRPC :: Call rpc -> ExitCase a -> m ()
closeRPC call exitCase = liftIO $ do
closeRPC :: (Call rpc, Session.CancelRequest) -> ExitCase a -> m ()
closeRPC (call, cancelRequest) exitCase = liftIO $ do
-- When we call 'Session.close', we will terminate the
-- 'sendMessageLoop', @http2@ will interpret this as a clean termination
-- of the stream. We must therefore cancel this stream before calling
-- 'Session.close'. /If/ the final message has already been sent,
-- @http2@ guarantees (as a postcondition of @outBodyPushFinal@) that
-- cancellation will be a no-op.
cancelRequest $
case exitCase of
ExitCaseSuccess _ ->
-- Error code will be CANCEL
Nothing
ExitCaseAbort ->
-- Error code will be INTERNAL_ERROR. The client aborted with an
-- error that we don't have access to. We want to tell the server
-- that something has gone wrong (i.e. INTERNAL_ERROR), so we must
-- pass an exception, however the exact nature of the exception is
-- not particularly important as it is only recorded locally.
Just . toException $ Session.ChannelAborted callStack
ExitCaseException e ->
-- Error code will be INTERNAL_ERROR
Just e

-- Close /after/ cancelling
mException <- liftIO $ Session.close (callChannel call) exitCase
case mException of
Nothing -> return ()
Expand Down Expand Up @@ -186,7 +209,7 @@ startRPC :: forall rpc.
=> Connection
-> Proxy rpc
-> CallParams rpc
-> IO (Call rpc)
-> IO (Call rpc, Session.CancelRequest)
startRPC conn _ callParams = do
(connClosed, connToServer) <- Connection.getConnectionToServer conn
cOut <- Connection.getOutboundCompression conn
Expand All @@ -205,7 +228,7 @@ startRPC conn _ callParams = do
. grpcClassifyTermination
. either trailersOnlyToProperTrailers' id

channel <-
(channel, cancelRequest) <-
Session.setupRequestChannel
session
connToServer
Expand Down Expand Up @@ -235,7 +258,7 @@ startRPC conn _ callParams = do
_mAlreadyClosed <- Session.close channel exitReason
return ()

return $ Call channel
return (Call channel, cancelRequest)
where
connParams :: ConnParams
connParams = Connection.connParams conn
Expand Down
54 changes: 25 additions & 29 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
{-# OPTIONS_GHC -Wno-orphans #-}

module Test.Driver.Dialogue.Execution (
execGlobalSteps
ConnUsage(..)
, execGlobalSteps
) where

import Control.Concurrent
Expand All @@ -18,6 +19,7 @@ import GHC.Stack
import GHC.TypeLits

import Network.GRPC.Client qualified as Client
import Network.HTTP2.Client qualified as HTTP2.Client
import Network.GRPC.Client.Binary qualified as Client.Binary
import Network.GRPC.Common
import Network.GRPC.Common.Binary
Expand Down Expand Up @@ -271,21 +273,17 @@ clientLocal clock call = \(LocalSteps steps) ->

clientGlobal ::
TestClock
-> Bool
-> ConnUsage
-- ^ Use new connection for each RPC call?
--
-- Multiple RPC calls on a single connection /ought/ to be independent of
-- each other, with something going wrong on one should not affect another.
-- This is currently however not the case, I /think/ due to limitations of
-- @http2@.
--
-- See <https://github.com/well-typed/grapesy/issues/102>.
-- each other. Something going wrong on one should not affect another.
-> GlobalSteps
-> TestClient
clientGlobal clock connPerRPC global connParams testServer delimitTestScope =
if connPerRPC
then go Nothing [] (getGlobalSteps global)
else withConn $ \c -> go (Just c) [] (getGlobalSteps global)
clientGlobal clock connUsage global connParams testServer delimitTestScope =
case connUsage of
ConnPerRPC -> go Nothing [] (getGlobalSteps global)
SharedConn -> withConn $ \c -> go (Just c) [] (getGlobalSteps global)
where
withConn :: (Client.Connection -> IO ()) -> IO ()
withConn = Client.withConnection connParams testServer
Expand Down Expand Up @@ -413,12 +411,7 @@ serverLocal clock call = \(LocalSteps steps) -> do
Terminate mErr -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
-- TODO: <https://github.com/well-typed/grapesy/issues/209>
--
-- On the server side we cannot distinguish regular client
-- termination from an exception when receiving.
let expectation = isExpectedElem $ NoMoreElems NoMetadata
expect (tick, action) expectation mInp
expect (tick, action) isClientDisconnected mInp
modify $ ifPeerAlive $ PeerTerminated $ DeliberateException <$> mErr

-- Wait for the client disconnect to become visible
Expand All @@ -428,12 +421,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
-- terminate more-or-less immediately, this does not necessarily indicate
-- any kind of failure: the client may simply have put the call in
-- half-closed mode.
--
-- TODO: <https://github.com/well-typed/grapesy/issues/209>
-- However, when the client terminates early and we are not using one
-- connection per RPC (i.e. we are sharing a connection), the server will
-- /never/ realize that the client has disappeared. See the discussion in
-- the issue above.
waitForClientDisconnect :: IO ()
waitForClientDisconnect =
within timeoutFailure () $ loop
Expand All @@ -457,6 +444,16 @@ serverLocal clock call = \(LocalSteps steps) -> do
isExpectedElem _ (Left _) = False
isExpectedElem expectedElem (Right streamElem) = expectedElem == streamElem

isClientDisconnected ::
Either Server.ClientDisconnected (StreamElem NoMetadata Int)
-> Bool
isClientDisconnected (Left (Server.ClientDisconnected e _))
| Just HTTP2.Client.ConnectionIsClosed <- fromException e
= True
| otherwise
= False
isClientDisconnected _ = False

serverGlobal ::
HasCallStack
=> TestClock
Expand Down Expand Up @@ -495,8 +492,10 @@ serverGlobal clock globalStepsVar call = do
Top-level
-------------------------------------------------------------------------------}

execGlobalSteps :: GlobalSteps -> IO ClientServerTest
execGlobalSteps steps = do
data ConnUsage = SharedConn | ConnPerRPC

execGlobalSteps :: ConnUsage -> GlobalSteps -> IO ClientServerTest
execGlobalSteps connUsage steps = do
globalStepsVar <- newMVar (order steps)
clock <- TestClock.new

Expand All @@ -513,7 +512,7 @@ execGlobalSteps steps = do
expectEarlyClientTermination = clientTerminatesEarly
, expectEarlyServerTermination = serverTerminatesEarly
}
, client = clientGlobal clock connPerRPC steps
, client = clientGlobal clock connUsage steps
, server = [
handler (Proxy @TestRpc1)
, handler (Proxy @TestRpc2)
Expand All @@ -524,9 +523,6 @@ execGlobalSteps steps = do
clientTerminatesEarly, serverTerminatesEarly :: Bool
(clientTerminatesEarly, serverTerminatesEarly) = hasEarlyTermination steps

connPerRPC :: Bool
connPerRPC = serverTerminatesEarly || clientTerminatesEarly

-- For 'clientGlobal' the order doesn't matter, because it spawns a thread
-- for each 'LocalSteps'. The server however doesn't get this option; the
-- threads /get/ spawnwed for each incoming connection, and must feel off
Expand Down
118 changes: 78 additions & 40 deletions test-grapesy/Test/Prop/Dialogue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,48 @@ import Test.Driver.Dialogue
tests :: TestTree
tests = testGroup "Test.Prop.Dialogue" [
testGroup "Regression" [
testCase "trivial1" $ regression trivial1
, testCase "trivial2" $ regression trivial2
, testCase "trivial3" $ regression trivial3
, testCase "concurrent1" $ regression concurrent1
, testCase "concurrent2" $ regression concurrent2
, testCase "concurrent3" $ regression concurrent3
, testCase "concurrent4" $ regression concurrent4
, testCase "exception1" $ regression exception1
, testCase "exception2" $ regression exception2
, testCase "earlyTermination01" $ regression earlyTermination01
, testCase "earlyTermination02" $ regression earlyTermination02
, testCase "earlyTermination03" $ regression earlyTermination03
, testCase "earlyTermination04" $ regression earlyTermination04
, testCase "earlyTermination05" $ regression earlyTermination05
, testCase "earlyTermination06" $ regression earlyTermination06
, testCase "earlyTermination07" $ regression earlyTermination07
, testCase "earlyTermination08" $ regression earlyTermination08
, testCase "earlyTermination09" $ regression earlyTermination09
, testCase "earlyTermination10" $ regression earlyTermination10
, testCase "earlyTermination11" $ regression earlyTermination11
, testCase "earlyTermination12" $ regression earlyTermination12
, testCase "earlyTermination13" $ regression earlyTermination13
, testCase "earlyTermination14" $ regression earlyTermination14
, testCase "allowHalfClosed1" $ regression allowHalfClosed1
, testCase "allowHalfClosed2" $ regression allowHalfClosed2
, testCase "allowHalfClosed3" $ regression allowHalfClosed3
testCase "trivial1" $ regression SharedConn trivial1
, testCase "trivial2" $ regression SharedConn trivial2
, testCase "trivial3" $ regression SharedConn trivial3
, testCase "concurrent1" $ regression SharedConn concurrent1
, testCase "concurrent2" $ regression SharedConn concurrent2
, testCase "concurrent3" $ regression SharedConn concurrent3
, testCase "concurrent4" $ regression SharedConn concurrent4
, testCase "exception1" $ regression ConnPerRPC exception1
, testCase "exception2" $ regression ConnPerRPC exception2
, testCase "earlyTermination01" $ regression ConnPerRPC earlyTermination01
, testCase "earlyTermination02" $ regression ConnPerRPC earlyTermination02
, testCase "earlyTermination03" $ regression ConnPerRPC earlyTermination03
, testCase "earlyTermination04" $ regression ConnPerRPC earlyTermination04
, testCase "earlyTermination05" $ regression ConnPerRPC earlyTermination05
, testCase "earlyTermination06" $ regression ConnPerRPC earlyTermination06
, testCase "earlyTermination07" $ regression ConnPerRPC earlyTermination07
, testCase "earlyTermination08" $ regression ConnPerRPC earlyTermination08
, testCase "earlyTermination09" $ regression ConnPerRPC earlyTermination09
, testCase "earlyTermination10" $ regression ConnPerRPC earlyTermination10
, testCase "earlyTermination11" $ regression ConnPerRPC earlyTermination11
, testCase "earlyTermination12" $ regression ConnPerRPC earlyTermination12
, testCase "earlyTermination13" $ regression ConnPerRPC earlyTermination13
, testCase "earlyTermination14" $ regression ConnPerRPC earlyTermination14
, testCase "unilateralTermination1" $ regression SharedConn unilateralTermination1
, testCase "unilateralTermination2" $ regression SharedConn unilateralTermination2
, testCase "unilateralTermination3" $ regression SharedConn unilateralTermination3
, testCase "allowHalfClosed1" $ regression SharedConn allowHalfClosed1
, testCase "allowHalfClosed2" $ regression SharedConn allowHalfClosed2
, testCase "allowHalfClosed3" $ regression ConnPerRPC allowHalfClosed3
]
, testGroup "Setup" [
testProperty "shrinkingWellFounded" prop_shrinkingWellFounded
]
, testGroup "Arbitrary" [
testProperty "withoutExceptions" arbitraryWithoutExceptions
, testProperty "withExceptions" arbitraryWithExceptions
testGroup "WithoutExceptions" [
testProperty "connPerRPC" $ arbitraryWithoutExceptions ConnPerRPC
, testProperty "sharedConn" $ arbitraryWithoutExceptions SharedConn
]
, testGroup "WithExceptions" [
testProperty "connPerRPC" $ arbitraryWithExceptions ConnPerRPC
, testProperty "sharedConn" $ arbitraryWithExceptions SharedConn
]
]
]

Expand All @@ -66,26 +75,26 @@ prop_shrinkingWellFounded =
Running the tests
-------------------------------------------------------------------------------}

arbitraryWithoutExceptions :: DialogueWithoutExceptions -> Property
arbitraryWithoutExceptions (DialogueWithoutExceptions dialogue) =
propDialogue dialogue
arbitraryWithoutExceptions :: ConnUsage -> DialogueWithoutExceptions -> Property
arbitraryWithoutExceptions connUsage (DialogueWithoutExceptions dialogue) =
propDialogue connUsage dialogue

arbitraryWithExceptions :: DialogueWithExceptions -> Property
arbitraryWithExceptions (DialogueWithExceptions dialogue) =
propDialogue dialogue
arbitraryWithExceptions :: ConnUsage -> DialogueWithExceptions -> Property
arbitraryWithExceptions connUsage (DialogueWithExceptions dialogue) =
propDialogue connUsage dialogue

propDialogue :: Dialogue -> Property
propDialogue dialogue =
propDialogue :: ConnUsage -> Dialogue -> Property
propDialogue connUsage dialogue =
counterexample (show globalSteps) $
propClientServer $ execGlobalSteps globalSteps
propClientServer $ execGlobalSteps connUsage globalSteps
where
globalSteps :: GlobalSteps
globalSteps = dialogueGlobalSteps dialogue

regression :: Dialogue -> IO ()
regression dialogue =
regression :: ConnUsage -> Dialogue -> IO ()
regression connUsage dialogue =
handle (throwIO . RegressionTestFailed globalSteps) $
testClientServer =<< execGlobalSteps globalSteps
testClientServer =<< execGlobalSteps connUsage globalSteps
where
globalSteps :: GlobalSteps
globalSteps = dialogueGlobalSteps dialogue
Expand Down Expand Up @@ -359,6 +368,35 @@ earlyTermination14 = NormalizedDialogue [
, (0, ServerAction $ Terminate (Just (SomeServerException 0)))
]

unilateralTermination1 :: Dialogue
unilateralTermination1 = NormalizedDialogue [
(1,ClientAction (Initiate (def,RPC1)))
, (1,ServerAction (Send (FinalElem 0 def)))
, (0,ClientAction (Initiate (def,RPC1)))
, (0,ClientAction (Send (NoMoreElems NoMetadata)))
, (0,ServerAction (Send (NoMoreElems def)))
]

unilateralTermination2 :: Dialogue
unilateralTermination2 = NormalizedDialogue [
(1,ClientAction (Initiate (def,RPC1)))
, (1,ServerAction (Send (FinalElem 0 def)))
, (0,ClientAction (Initiate (def,RPC1)))
, (0,ClientAction (Send (NoMoreElems NoMetadata)))
, (0,ServerAction (Send (NoMoreElems def)))
, (2,ClientAction (Initiate (def,RPC1)))
, (2,ServerAction (Send (FinalElem 0 def)))
]

unilateralTermination3 :: Dialogue
unilateralTermination3 = NormalizedDialogue [
(0, ClientAction (Initiate (def,RPC1)))
, (0, ServerAction (Send (NoMoreElems def)))
, (1, ClientAction (Initiate (def,RPC1)))
, (1, ClientAction (Send (FinalElem 0 NoMetadata)))
, (1, ServerAction (Send (NoMoreElems def)))
]

{-------------------------------------------------------------------------------
Dealing correctly with 'AllowHalfClosed'
Expand Down
Loading

0 comments on commit 9196096

Please sign in to comment.