Skip to content

Commit

Permalink
Move startRPC to .Call
Browse files Browse the repository at this point in the history
This separates all connection stuff from all RPC specific stuff.
  • Loading branch information
edsko committed Jul 24, 2024
1 parent 72670a0 commit 968ae3d
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 131 deletions.
121 changes: 118 additions & 3 deletions src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,30 @@ import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Data.Bitraversable
import Data.ByteString.Char8 qualified as BS.Strict.C8
import Data.Default
import Data.Maybe (isJust)
import Data.Foldable (asum)
import Data.List (intersperse)
import Data.Maybe (fromMaybe, isJust)
import Data.Proxy
import Data.Text qualified as Text
import Data.Version
import GHC.Stack

import Network.GRPC.Client.Connection (Connection, Call(..))
import Network.GRPC.Client.Connection (Connection, ConnParams, Call(..))
import Network.GRPC.Client.Connection qualified as Connection
import Network.GRPC.Client.Session
import Network.GRPC.Common
import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Common.StreamElem qualified as StreamElem
import Network.GRPC.Spec
import Network.GRPC.Util.GHC
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread qualified as Thread

import Paths_grapesy qualified as Grapesy

{-------------------------------------------------------------------------------
Open a call
-------------------------------------------------------------------------------}
Expand Down Expand Up @@ -90,7 +99,7 @@ withRPC :: forall m rpc a.
=> Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
withRPC conn callParams proxy k = fmap fst $
generalBracket
(liftIO $ Connection.startRPC conn proxy callParams)
(liftIO $ startRPC conn proxy callParams)
closeRPC
k
where
Expand Down Expand Up @@ -155,6 +164,112 @@ withRPC conn callParams proxy k = fmap fst $
, grpcErrorMetadata = []
}

-- | Open new channel to the server
--
-- This is a non-blocking call; the connection will be set up in a
-- background thread; if this takes time, then the first call to
-- 'sendInput' or 'recvOutput' will block, but the call to 'startRPC'
-- itself will not block. This non-blocking nature makes this safe to use
-- in 'bracket' patterns.
startRPC :: forall rpc.
(SupportsClientRpc rpc, HasCallStack)
=> Connection
-> Proxy rpc
-> CallParams rpc
-> IO (Call rpc)
startRPC conn _ callParams = do
(connClosed, connToServer) <- Connection.getConnectionToServer conn
cOut <- Connection.getOutboundCompression conn
metadata <- buildMetadataIO $ callRequestMetadata callParams
let flowStart :: Session.FlowStart (ClientOutbound rpc)
flowStart = Session.FlowStartRegular $ OutboundHeaders {
outHeaders = requestHeaders cOut metadata
, outCompression = fromMaybe noCompression cOut
}

let serverClosedConnection ::
Either TrailersOnly' ProperTrailers'
-> SomeException
serverClosedConnection =
either toException toException
. grpcClassifyTermination
. either (fst . trailersOnlyToProperTrailers) id

channel <-
Session.setupRequestChannel
callSession
connToServer
serverClosedConnection
flowStart

-- Spawn a thread to monitor the connection, and close the new channel when
-- the connection is closed. To prevent a memory leak by hanging on to the
-- channel for the lifetime of the connection, the thread also terminates in
-- the (normal) case that the channel is closed before the connection is.
_ <- forkLabelled "grapesy:monitorConnection" $ do
status <- atomically $ do
(Left <$> Thread.waitForNormalOrAbnormalThreadTermination
(Session.channelOutbound channel))
`orElse`
(Right <$> readTMVar connClosed)
case status of
Left _ -> return () -- Channel closed before the connection
Right mErr -> do
let exitReason :: ExitCase ()
exitReason =
case mErr of
Nothing -> ExitCaseSuccess ()
Just exitWithException ->
ExitCaseException . toException $
ServerDisconnected exitWithException callStack
_mAlreadyClosed <- Session.close channel exitReason
return ()

return $ Call callSession channel
where
connParams :: ConnParams
connParams = Connection.connParams conn

requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders cOut metadata = RequestHeaders{
requestTimeout =
asum [
callTimeout callParams
, Connection.connDefaultTimeout connParams
]
, requestMetadata =
customMetadataMapFromList metadata
, requestCompression =
compressionId <$> cOut
, requestAcceptCompression = Just $
Compression.offer $ Connection.connCompression connParams
, requestContentType =
Connection.connContentType connParams
, requestMessageType =
Just MessageTypeDefault
, requestUserAgent = Just $
mconcat [
"grpc-haskell-grapesy/"
, mconcat . intersperse "." $
map (BS.Strict.C8.pack . show) $
versionBranch Grapesy.version
]
, requestIncludeTE =
True
, requestTraceContext =
Nothing
, requestPreviousRpcAttempts =
Nothing
, requestUnrecognized =
()
}

callSession :: ClientSession rpc
callSession = ClientSession {
clientCompression = Connection.connCompression connParams
, clientUpdateMeta = Connection.updateConnectionMeta conn
}

{-------------------------------------------------------------------------------
Open (ongoing) call
-------------------------------------------------------------------------------}
Expand Down
167 changes: 39 additions & 128 deletions src/Network/GRPC/Client/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ module Network.GRPC.Client.Connection (
-- * Definition
Connection -- opaque
, Call(..)
, connParams
, startRPC
, withConnection
-- * Configuration
, Server(..)
Expand All @@ -20,19 +18,18 @@ module Network.GRPC.Client.Connection (
, ConnParams(..)
, ReconnectPolicy(..)
, exponentialBackoff
-- * Using the connection
, connParams
, getConnectionToServer
, getOutboundCompression
, updateConnectionMeta
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch
import Data.ByteString.Char8 qualified as BS.Strict.C8
import Data.Default
import Data.Foldable (asum)
import Data.List (intersperse)
import Data.Maybe (fromMaybe)
import Data.Proxy
import Data.Version
import GHC.Stack
import Network.HPACK qualified as HPACK
import Network.HTTP2.Client qualified as HTTP2.Client
Expand All @@ -46,18 +43,13 @@ import Network.GRPC.Client.Meta (Meta)
import Network.GRPC.Client.Meta qualified as Meta
import Network.GRPC.Client.Session
import Network.GRPC.Common.Compression qualified as Compr
import Network.GRPC.Common.Compression qualified as Compression
import Network.GRPC.Common.HTTP2Settings
import Network.GRPC.Spec
import Network.GRPC.Util.GHC
import Network.GRPC.Util.HTTP2.Stream (ServerDisconnected(..))
import Network.GRPC.Util.Session qualified as Session
import Network.GRPC.Util.Thread
import Network.GRPC.Util.TLS (ServerValidation(..), SslKeyLog(..))
import Network.GRPC.Util.TLS qualified as Util.TLS

import Paths_grapesy qualified as Grapesy

{---------------------------------------------------2----------------------------
Connection API
Expand Down Expand Up @@ -292,123 +284,42 @@ withConnection connParams server k = do
k Connection {connParams, connMetaVar, connStateVar}
`finally` putMVar connOutOfScope ()

-- | Open new channel to the server
{-------------------------------------------------------------------------------
Making use of the connection
-------------------------------------------------------------------------------}

-- | Get connection to the server
--
-- This is a non-blocking call; the connection will be set up in a
-- background thread; if this takes time, then the first call to
-- 'sendInput' or 'recvOutput' will block, but the call to 'startRPC'
-- itself will not block. This non-blocking nature makes this safe to use
-- in 'bracket' patterns.
startRPC :: forall rpc.
(SupportsClientRpc rpc, HasCallStack)
-- Returns two things: the connection to the server, as well as a @TMVar@ that
-- should be monitored to see if that connection is still live.
getConnectionToServer :: forall.
HasCallStack
=> Connection
-> Proxy rpc
-> CallParams rpc
-> IO (Call rpc)
startRPC Connection{connMetaVar, connParams, connStateVar} _ callParams = do
(connClosed, conn) <-
atomically $ do
connState <- readTVar connStateVar
case connState of
ConnectionNotReady -> retry
ConnectionReady connClosed conn -> return (connClosed, conn)
ConnectionAbandoned err -> throwSTM err
ConnectionOutOfScope -> error "impossible"

cOut <- Meta.outboundCompression <$> currentMeta
metadata <- buildMetadataIO $ callRequestMetadata callParams
let flowStart :: Session.FlowStart (ClientOutbound rpc)
flowStart = Session.FlowStartRegular $ OutboundHeaders {
outHeaders = requestHeaders cOut metadata
, outCompression = fromMaybe noCompression cOut
}

let serverClosedConnection ::
Either TrailersOnly' ProperTrailers'
-> SomeException
serverClosedConnection =
either toException toException
. grpcClassifyTermination
. either (fst . trailersOnlyToProperTrailers) id

channel <-
Session.setupRequestChannel
callSession
conn
serverClosedConnection
flowStart

-- Spawn a thread to monitor the connection, and close the new channel when
-- the connection is closed. To prevent a memory leak by hanging on to the
-- channel for the lifetime of the connection, the thread also terminates in
-- the (normal) case that the channel is closed before the connection is.
_ <- forkLabelled "grapesy:monitorConnection" $ do
status <- atomically $ do
(Left <$> waitForNormalOrAbnormalThreadTermination
(Session.channelOutbound channel))
`orElse`
(Right <$> readTMVar connClosed)
case status of
Left _ -> return () -- Channel closed before the connection
Right mErr -> do
let exitReason :: ExitCase ()
exitReason =
case mErr of
Nothing -> ExitCaseSuccess ()
Just exitWithException ->
ExitCaseException . toException $
ServerDisconnected exitWithException callStack
_mAlreadyClosed <- Session.close channel exitReason
return ()

return $ Call callSession channel
where
currentMeta :: IO Meta
currentMeta = readMVar connMetaVar

updateMeta :: ResponseHeaders' -> IO ()
updateMeta hdrs =
modifyMVar_ connMetaVar $ Meta.update (connCompression connParams) hdrs

requestHeaders :: Maybe Compression -> [CustomMetadata] -> RequestHeaders
requestHeaders cOut metadata = RequestHeaders{
requestTimeout =
asum [
callTimeout callParams
, connDefaultTimeout connParams
]
, requestMetadata =
customMetadataMapFromList metadata
, requestCompression =
compressionId <$> cOut
, requestAcceptCompression = Just $
Compression.offer $ connCompression connParams
, requestContentType =
connContentType connParams
, requestMessageType =
Just MessageTypeDefault
, requestUserAgent = Just $
mconcat [
"grpc-haskell-grapesy/"
, mconcat . intersperse "." $
map (BS.Strict.C8.pack . show) $
versionBranch Grapesy.version
]
, requestIncludeTE =
True
, requestTraceContext =
Nothing
, requestPreviousRpcAttempts =
Nothing
, requestUnrecognized =
()
}

callSession :: ClientSession rpc
callSession = ClientSession {
clientCompression = connCompression connParams
, clientUpdateMeta = updateMeta
}
-> IO (TMVar (Maybe SomeException), Session.ConnectionToServer)
getConnectionToServer Connection{connStateVar} = atomically $ do
connState <- readTVar connStateVar
case connState of
ConnectionNotReady -> retry
ConnectionReady connClosed conn -> return (connClosed, conn)
ConnectionAbandoned err -> throwSTM err
ConnectionOutOfScope -> error "impossible"

-- | Get outbound compression algorithm
--
-- This is stateful, because it depends on whether or not compression negotation
-- has happened yet: before the remote peer has told us which compression
-- algorithms it can support, we must use no compression.
getOutboundCompression :: Connection -> IO (Maybe Compression)
getOutboundCompression Connection{connMetaVar} =
Meta.outboundCompression <$> readMVar connMetaVar

-- | Update connection metadata
--
-- Amongst other things, this updates the compression algorithm to be used
-- (see also 'getOutboundCompression').
updateConnectionMeta :: Connection -> ResponseHeaders' -> IO ()
updateConnectionMeta Connection{connMetaVar, connParams} hdrs =
modifyMVar_ connMetaVar $ Meta.update (connCompression connParams) hdrs

{-------------------------------------------------------------------------------
Internal auxiliary
Expand Down

0 comments on commit 968ae3d

Please sign in to comment.