Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap outbound client thread stream exceptions #210

Merged
merged 7 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/demo-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ $ cabal run demo-client -- --core sayHelloBidiStream \
{message: "John Ack"}
Disconnected. Reconnecting after 1701081μs
Reconnecting now.
demo-client: GrpcException {grpcError = GrpcUnknown, grpcErrorMessage = Just "Call closed without trailers", grpcErrorMetadata = []}
demo-client: demo-client: ServerDisconnected {serverDisconnectedException = ..., ...}
```

### Dealing with unterminated streams
Expand Down
9 changes: 8 additions & 1 deletion grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common lang
-Wno-unticked-promoted-constructors
-Wprepositive-qualified-module
-Widentities
-Wmissing-export-lists
build-depends:
base >= 4.14 && < 4.21
default-language:
Expand Down Expand Up @@ -309,14 +310,18 @@ test-suite test-grapesy
Test.Prop.IncrementalParsing
Test.Prop.Serialization
Test.Sanity.BrokenDeployments
Test.Sanity.Disconnect
Test.Sanity.EndOfStream
Test.Sanity.Exception
Test.Sanity.Interop
Test.Sanity.StreamingType.CustomFormat
Test.Sanity.StreamingType.NonStreaming
Test.Util
Test.Util.Awkward
Test.Util.Exception
Test.Util.Orphans
Test.Util.Protobuf
Test.Util.RawTestServer

-- Internals we're testing
Network.GRPC.Util.Parser
Expand All @@ -325,6 +330,7 @@ test-suite test-grapesy

Proto.API.Interop
Proto.API.Ping
Proto.API.Trivial
Proto.Empty
Proto.Messages
Proto.Ping
Expand All @@ -348,7 +354,6 @@ test-suite test-grapesy
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, network-run >= 0.4 && < 0.5
, prettyprinter >= 1.7 && < 1.8
, prettyprinter-ansi-terminal >= 1.1 && < 1.2
, proto-lens >= 0.7 && < 0.8
Expand All @@ -360,9 +365,11 @@ test-suite test-grapesy
, tasty >= 1.4 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.12
, temporary >= 1.3 && < 1.4
, text >= 1.2 && < 2.2
, tls >= 1.7 && < 2.2
, tree-diff >= 0.3 && < 0.4
, unix >= 2.7 && < 2.9
, utf8-string >= 1.0 && < 1.1

executable demo-client
Expand Down
2 changes: 1 addition & 1 deletion interop/Interop/Client/TestCase/CustomMetadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import Proto.API.Interop
-- For both UnaryCall and FullDuplexCall, the reference server (at least some)
-- does not return any initial metadata until we send the first request. The
-- test spec does not specify whether this is expected behaviour or not, so we
-- play it save and only ask for the initial metadata after sending the request.
-- play it safe and only ask for the initial metadata after sending the request.
runTest :: Cmdline -> IO ()
runTest cmdline = do
withConnection def (testServer cmdline) $ \conn -> do
Expand Down
2 changes: 1 addition & 1 deletion interop/Main.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Main where
module Main (main) where

import GHC.Conc (setUncaughtExceptionHandler)
import System.IO
Expand Down
21 changes: 21 additions & 0 deletions proto/Proto/API/Trivial.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{-# OPTIONS_GHC -Wno-orphans #-}

module Proto.API.Trivial
( -- * Trivial RPC
Trivial
, Trivial'
) where

import Network.GRPC.Common
import Network.GRPC.Spec

{-------------------------------------------------------------------------------
Trivial RPC
-------------------------------------------------------------------------------}

type Trivial = RawRpc "trivial" "trivial"
type Trivial' s = RawRpc "trivial" s

type instance RequestMetadata (Trivial' s) = NoMetadata
type instance ResponseInitialMetadata (Trivial' s) = NoMetadata
type instance ResponseTrailingMetadata (Trivial' s) = NoMetadata
1 change: 1 addition & 0 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Client (

-- ** Reconnection policy
, ReconnectPolicy(..)
, ReconnectTo(..)
, exponentialBackoff

-- ** Connection parameters
Expand Down
46 changes: 37 additions & 9 deletions src/Network/GRPC/Client/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module Network.GRPC.Client.Connection (
, SslKeyLog(..)
, ConnParams(..)
, ReconnectPolicy(..)
, ReconnectTo(..)
, exponentialBackoff
-- * Using the connection
, connParams
Expand Down Expand Up @@ -165,13 +166,30 @@ data ReconnectPolicy =
-- connection), do not attempt to connect again.
DontReconnect

-- | Reconnect after random delay after the IO action returns
-- | Reconnect to the (potentially different) server after the IO action
-- returns
FinleyMcIlwaine marked this conversation as resolved.
Show resolved Hide resolved
--
-- The 'ReconnectTo' can be used to implement a rudimentary redundancy
-- scheme. For example, you could decide to reconnect to a known fallback
-- server after connection to a main server fails a certain number of times.
--
-- This is a very general API: typically the IO action will call
-- 'threadDelay' after some amount of time (which will typically involve
-- some randomness), but it can be used to do things such as display a
-- message to the user somewhere that the client is reconnecting.
| ReconnectAfter (IO ReconnectPolicy)
| ReconnectAfter ReconnectTo (IO ReconnectPolicy)

-- | What server should we attempt to reconnect to?
--
-- * 'ReconnectToPrevious' will attempt to reconnect to the last server we
-- attempted to connect to, whether or not that attempt was successful.
-- * 'ReconnectToOriginal' will attempt to reconnect to the original server that
-- 'withConnection' was given.
-- * 'ReconnectToNew' will attempt to connect to the newly specified server.
data ReconnectTo =
ReconnectToPrevious
| ReconnectToOriginal
| ReconnectToNew Server

-- | The default policy is 'DontReconnect'
--
Expand All @@ -180,6 +198,9 @@ data ReconnectPolicy =
instance Default ReconnectPolicy where
def = DontReconnect

instance Default ReconnectTo where
def = ReconnectToPrevious

-- | Exponential backoff
--
-- If the exponent is @1@, the delay interval will be the same every step;
Expand Down Expand Up @@ -207,7 +228,7 @@ exponentialBackoff waitFor e = go
where
go :: (Double, Double) -> Word -> ReconnectPolicy
go _ 0 = DontReconnect
go (lo, hi) n = ReconnectAfter $ do
go (lo, hi) n = ReconnectAfter def $ do
delay <- randomRIO (lo, hi)
waitFor $ round $ delay * 1_000_000
return $ go (lo * e, hi * e) (pred n)
Expand Down Expand Up @@ -378,11 +399,11 @@ stayConnected ::
-> TVar ConnectionState
-> MVar ()
-> IO ()
stayConnected connParams server connStateVar connOutOfScope =
loop (connReconnectPolicy connParams)
stayConnected connParams initialServer connStateVar connOutOfScope = do
loop initialServer (connReconnectPolicy connParams)
where
loop :: ReconnectPolicy -> IO ()
loop remainingReconnectPolicy = do
loop :: Server -> ReconnectPolicy -> IO ()
loop server remainingReconnectPolicy = do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad to see that the actual implementation was as clean as this :)

-- Start new attempt (this just allocates some internal state)
attempt <- newConnectionAttempt connParams connStateVar connOutOfScope

Expand Down Expand Up @@ -425,9 +446,16 @@ stayConnected connParams server connStateVar connOutOfScope =
atomically $ writeTVar connStateVar $ ConnectionAbandoned err
(False, DontReconnect) -> do
atomically $ writeTVar connStateVar $ ConnectionAbandoned err
(False, ReconnectAfter f) -> do
atomically $ writeTVar connStateVar $ ConnectionAbandoned err
(False, ReconnectAfter to f) -> do
let
nextServer =
case to of
ReconnectToPrevious -> server
ReconnectToOriginal -> initialServer
ReconnectToNew new -> new
atomically $ writeTVar connStateVar $ ConnectionNotReady
loop =<< f
loop nextServer =<< f

-- | Insecure connection (no TLS)
connectInsecure :: ConnParams -> Attempt -> Address -> IO ()
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Server/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ recvEndOfInput call@Call{} = do

-- | Send 'ProperTrailers'
--
-- This function is not part of the public API: we use it the top-level
-- This function is not part of the public API: we use it as the top-level
-- exception handler in "Network.GRPC.Server" to forward exceptions in server
-- handlers to the client.
--
Expand Down
2 changes: 1 addition & 1 deletion src/Network/GRPC/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ waitForHandler unmask call handlerThread = loop
--
-- The attempt to forward it to the client is a best-effort only:
--
-- * The nature of the exception might mean that we we cannot send anything to
-- * The nature of the exception might mean that we cannot send anything to
-- the client at all.
-- * It is possible the exception was thrown /after/ the handler already send
-- the trailers to the client.
Expand Down
6 changes: 5 additions & 1 deletion test-grapesy/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import Test.Prop.Dialogue qualified as Dialogue
import Test.Prop.IncrementalParsing qualified as IncrementalParsing
import Test.Prop.Serialization qualified as Serialization
import Test.Sanity.BrokenDeployments qualified as BrokenDeployments
import Test.Sanity.Disconnect qualified as Disconnect
import Test.Sanity.EndOfStream qualified as EndOfStream
import Test.Sanity.Exception qualified as Exception
import Test.Sanity.Interop qualified as Interop
import Test.Sanity.StreamingType.CustomFormat qualified as StreamingType.CustomFormat
import Test.Sanity.StreamingType.NonStreaming qualified as StreamingType.NonStreaming
Expand All @@ -28,11 +30,13 @@ main = do

defaultMain $ testGroup "grapesy" [
testGroup "Sanity" [
EndOfStream.tests
Disconnect.tests
, EndOfStream.tests
, testGroup "StreamingType" [
StreamingType.NonStreaming.tests
, StreamingType.CustomFormat.tests
]
, Exception.tests
, Interop.tests
, BrokenDeployments.tests
]
Expand Down
11 changes: 3 additions & 8 deletions test-grapesy/Test/Driver/ClientServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import Network.GRPC.Common
import Network.GRPC.Common.Compression qualified as Compr
import Network.GRPC.Server qualified as Server
import Network.GRPC.Server.Run qualified as Server
import Test.Util.Exception

import Paths_grapesy

Expand Down Expand Up @@ -168,12 +169,6 @@ data TlsFail =
we don't see these exceptions server-side.
-------------------------------------------------------------------------------}

-- | Exception thrown by client or handler to test exception handling
data DeliberateException = forall e. Exception e => DeliberateException e
deriving anyclass (Exception)

deriving stock instance Show DeliberateException

isExpectedServerException :: ClientServerConfig -> SomeException -> Bool
isExpectedServerException cfg e
--
Expand Down Expand Up @@ -232,7 +227,7 @@ isExpectedClientException cfg e
| Just (DeliberateException _) <- fromException e
= True

-- Server threw deliberat exception
-- Server threw deliberate exception
| Just grpcException <- fromException e
, Just msg <- grpcErrorMessage grpcException
, "DeliberateException" `Text.isInfixOf` msg
Expand Down Expand Up @@ -538,7 +533,7 @@ runTestClient cfg firstTestFailure port clientRun = do
-- This avoids a race condition between the server starting first
-- and the client starting first.
, connReconnectPolicy =
Client.ReconnectAfter $ do
Client.ReconnectAfter def $ do
threadDelay 100_000
return Client.DontReconnect
}
Expand Down
21 changes: 1 addition & 20 deletions test-grapesy/Test/Driver/Dialogue/Definition.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ module Test.Driver.Dialogue.Definition (
, hasEarlyTermination
) where

import Control.Exception
import Control.Monad.State (StateT, execStateT, modify)
import Data.Bifunctor
import Data.ByteString qualified as Strict (ByteString)

import Network.GRPC.Common

import Test.Driver.Dialogue.TestClock qualified as TestClock
import Test.Util.Exception
import Control.Monad.Catch
import GHC.Show (appPrec1, showCommaSpace)

Expand Down Expand Up @@ -154,25 +154,6 @@ newtype GlobalSteps = GlobalSteps {
}
deriving stock (Show)

{-------------------------------------------------------------------------------
User exceptions

When a test calls for the client or the server to throw an exception, we throw
one of these. Their sole purpose is to be "any" kind of exception (not a
specific one).
-------------------------------------------------------------------------------}

data SomeServerException = SomeServerException ExceptionId
deriving stock (Show, Eq)
deriving anyclass (Exception)

data SomeClientException = SomeClientException ExceptionId
deriving stock (Show, Eq)
deriving anyclass (Exception)

-- | We distinguish exceptions from each other simply by a number
type ExceptionId = Int

{-------------------------------------------------------------------------------
Utility
-------------------------------------------------------------------------------}
Expand Down
8 changes: 8 additions & 0 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ serverLocal clock call = \(LocalSteps steps) -> do
Terminate mErr -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
-- TODO: <https://github.com/well-typed/grapesy/issues/209>
--
-- On the server side we cannot distinguish regular client
-- termination from an exception when receiving.
let expectation = isExpectedElem $ NoMoreElems NoMetadata
Expand All @@ -426,6 +428,12 @@ serverLocal clock call = \(LocalSteps steps) -> do
-- terminate more-or-less immediately, this does not necessarily indicate
-- any kind of failure: the client may simply have put the call in
-- half-closed mode.
--
-- TODO: <https://github.com/well-typed/grapesy/issues/209>
-- However, when the client terminates early and we are not using one
-- connection per RPC (i.e. we are sharing a connection), the server will
-- /never/ realize that the client has disappeared. See the discussion in
-- the issue above.
waitForClientDisconnect :: IO ()
waitForClientDisconnect =
within timeoutFailure () $ loop
Expand Down
Loading