Skip to content

Commit

Permalink
Check that messages are well-formed before enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
edsko committed Oct 20, 2024
1 parent 7a9648c commit 3f58f07
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 16 deletions.
2 changes: 2 additions & 0 deletions grapesy/grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ test-suite test-grapesy
, bytestring >= 0.10 && < 0.13
, case-insensitive >= 1.2 && < 1.3
, containers >= 0.6 && < 0.8
, deepseq >= 1.4 && < 1.6
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.3.4 && < 5.4
Expand Down Expand Up @@ -636,6 +637,7 @@ benchmark grapesy-kvstore
, base64-bytestring >= 1.2 && < 1.3
, bytestring >= 0.10 && < 0.13
, containers >= 0.6 && < 0.8
, deepseq >= 1.4 && < 1.6
, hashable >= 1.3 && < 1.5
, optparse-applicative >= 0.16 && < 0.19
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 3 additions & 1 deletion grapesy/kvstore/KVStore/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module KVStore.API (
, KVStore(..)
) where

import Control.DeepSeq (NFData)
import Control.Monad
import Data.Aeson.Types qualified as Aeson
import Data.ByteString (ByteString)
Expand All @@ -25,13 +26,14 @@ newtype Key = Key {
getKey :: ByteString
}
deriving stock (Show, Eq, Ord)
deriving newtype (Hashable)
deriving newtype (Hashable, NFData)
deriving (ToJSON, FromJSON) via Base64

newtype Value = Value {
getValue :: ByteString
}
deriving stock (Show, Eq, Ord)
deriving newtype (NFData)
deriving (ToJSON, FromJSON) via Base64

{-------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion grapesy/src/Network/GRPC/Common/NextElem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import Network.GRPC.Common.StreamElem (StreamElem(..))
-- | Is there a next element in a stream?
--
-- Does not record metadata, unlike 'Network.GRPC.Common.StreamElem.StreamElem'.
data NextElem a = NoNextElem | NextElem a
data NextElem a = NoNextElem | NextElem !a
deriving stock (Show, Eq, Functor, Foldable, Traversable)

{-------------------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions grapesy/src/Network/GRPC/Common/StreamElem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ data StreamElem b a =
--
-- In this case, this element is /not/ final (and the final element, when
-- we receive it, will be tagged as 'Final').
StreamElem a
StreamElem !a

-- | We received the final element
--
-- The final element is annotated with some additional information.
| FinalElem a b
| FinalElem !a !b

-- | There are no more elements
--
Expand All @@ -59,7 +59,7 @@ data StreamElem b a =
-- * The stream didn't contain any elements at all.
-- * The final element was not marked as final.
-- See 'StreamElem' for detailed additional discussion.
| NoMoreElems b
| NoMoreElems !b
deriving stock (Show, Eq, Functor, Foldable, Traversable)

instance Bifunctor StreamElem where
Expand Down
8 changes: 8 additions & 0 deletions grapesy/src/Network/GRPC/Server/StreamType.hs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ data Services m (servs :: [[k]]) where
--
-- > Server.fromMethod @EmptyCall $ ServerHandler $ \(_ ::Empty) ->
-- > return (defMessage :: Empty)
--
-- If the streaming type cannot be deduced, you might need to specify that also:
--
-- > Server.fromMethod @Ping @NonStreaming $ ServerHandler $ ..
--
-- Alternatively, use one of the handler construction functions, such as
--
-- > Server.fromMethod @Ping $ Server.mkNonStreaming $ ..
fromMethod :: forall rpc styp m.
( SupportsServerRpc rpc
, ValidStreamingType styp
Expand Down
5 changes: 4 additions & 1 deletion grapesy/src/Network/GRPC/Spec/MessageMeta.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ module Network.GRPC.Spec.MessageMeta (
, InboundMeta(..)
) where

import Control.DeepSeq (NFData)
import Data.Default
import Data.Word
import GHC.Generics (Generic)

{-------------------------------------------------------------------------------
Outbound messages
Expand All @@ -18,7 +20,8 @@ data OutboundMeta = OutboundMeta {
-- smaller message.
outboundEnableCompression :: Bool
}
deriving stock (Show)
deriving stock (Show, Generic)
deriving anyclass (NFData)

instance Default OutboundMeta where
def = OutboundMeta {
Expand Down
14 changes: 12 additions & 2 deletions grapesy/src/Network/GRPC/Spec/RPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC (
, defaultRpcContentType
) where

import Control.DeepSeq (NFData)
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Lazy qualified as Lazy
import Data.Kind
Expand Down Expand Up @@ -36,13 +37,22 @@ type family Output (rpc :: k) :: Type
-- We therefore punt on the encoding issue here, and use bytestrings. /If/
-- applications want to use non-ASCII characters, they can choose their own
-- encoding.
class ( -- Debug constraints
class ( -- Serialization
--
-- We force messages to NF before enqueueing them. This ensures that
-- if those messages contain any pure exceptions (due to a bug in a
-- client or a server), we detect the problem when the message is
-- enqueued, and can throw an appropriate exception.
NFData (Input rpc)
, NFData (Output rpc)

-- Debug constraints
--
-- For debugging it is useful when we have 'Show' instances in scope.
-- This is not that strong a requirement; after all, we must be able
-- to serialize inputs and deserialize outputs, so they must also be
-- 'Show'able.
Show (Input rpc)
, Show (Input rpc)
, Show (Output rpc)
, Show (RequestMetadata rpc)
, Show (ResponseInitialMetadata rpc)
Expand Down
18 changes: 16 additions & 2 deletions grapesy/src/Network/GRPC/Spec/RPC/JSON.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC.JSON (
, Optional(..)
) where

import Control.DeepSeq (NFData(..))
import Data.Aeson (ToJSON(..), FromJSON(..), (.=), (.:), (.:?))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types qualified as Aeson
Expand Down Expand Up @@ -59,6 +60,10 @@ data JsonRpc (serv :: Symbol) (meth :: Symbol)
instance ( KnownSymbol serv
, KnownSymbol meth

-- Serialization
, NFData (Input (JsonRpc serv meth))
, NFData (Output (JsonRpc serv meth))

-- Debugging constraints
, Show (Input (JsonRpc serv meth))
, Show (Output (JsonRpc serv meth))
Expand Down Expand Up @@ -129,19 +134,28 @@ instance (Show x, Show (JsonObject fs))
. showString " :* "
. showsPrec 6 xs

instance NFData (JsonObject '[]) where
rnf JsonObject = ()

instance (NFData x, NFData (JsonObject fs))
=> NFData (JsonObject ('(f, x) : fs)) where
rnf (x :* xs) = rnf (x, xs)

-- | Required field
newtype Required a = Required {
getRequired :: a
}
deriving (Show)
deriving stock (Show)
deriving newtype (NFData)

-- | Optional field
--
-- 'Maybe' will be represented by the /absence/ of the field in the object.
newtype Optional a = Optional {
getOptional :: Maybe a
}
deriving (Show)
deriving stock (Show)
deriving newtype (NFData)

infixr 5 :*

Expand Down
8 changes: 8 additions & 0 deletions grapesy/src/Network/GRPC/Spec/RPC/Protobuf.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Network.GRPC.Spec.RPC.Protobuf (
, getProto
) where

import Control.DeepSeq (NFData)
import Control.Lens hiding (lens)
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Char8 qualified as BS.Char8
Expand Down Expand Up @@ -51,9 +52,15 @@ type instance Input (Protobuf serv meth) = Proto (MethodInput serv meth)
type instance Output (Protobuf serv meth) = Proto (MethodOutput serv meth)

instance ( HasMethodImpl serv meth

-- Debugging
, Show (MethodInput serv meth)
, Show (MethodOutput serv meth)

-- Serialization
, NFData (MethodInput serv meth)
, NFData (MethodOutput serv meth)

-- Metadata constraints
, Show (RequestMetadata (Protobuf serv meth))
, Show (ResponseInitialMetadata (Protobuf serv meth))
Expand Down Expand Up @@ -144,6 +151,7 @@ newtype Proto msg = Proto msg
, Enum
, FieldDefault
, MessageEnum
, NFData
)

-- | Field accessor for 'Proto'
Expand Down
55 changes: 55 additions & 0 deletions grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
-- Intentionally /NOT/ enabling OverloadedStrings.
-- This forces us to be precise about encoding issues.

{-# LANGUAGE OverloadedLabels #-}

module Test.Sanity.BrokenDeployments (tests) where

import Control.Exception
import Data.ByteString.Char8 qualified as BS.Strict.Char8
import Data.ByteString.UTF8 qualified as BS.Strict.UTF8
import Data.IORef
import Data.Text qualified as Text
import Network.HTTP.Types qualified as HTTP
import Test.Tasty
import Test.Tasty.HUnit

import Network.GRPC.Client qualified as Client
import Network.GRPC.Client.StreamType.IO qualified as Client
import Network.GRPC.Common
import Network.GRPC.Common.Protobuf
import Network.GRPC.Server.StreamType qualified as Server

import Test.Driver.ClientServer
import Test.Util.RawTestServer

import Proto.API.Ping
Expand Down Expand Up @@ -44,6 +51,9 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [
, testCase "requestMetadata" test_invalidRequestMetadata
, testCase "trailerMetadata" test_invalidTrailerMetadata
]
, testGroup "Undefined" [
testCase "output" test_undefinedOutput
]
]

connParams :: Client.ConnParams
Expand Down Expand Up @@ -324,3 +334,48 @@ grpcMessageContains GrpcException{grpcErrorMessage} str =
case grpcErrorMessage of
Just msg -> Text.pack str `Text.isInfixOf` msg
Nothing -> False

{-------------------------------------------------------------------------------
Undefined values
-------------------------------------------------------------------------------}

test_undefinedOutput :: Assertion
test_undefinedOutput = do
st <- newIORef 0
testClientServer $ ClientServerTest {
config = def
, server = [Server.fromMethod @Ping $ Server.mkNonStreaming (handler st)]
, client = simpleTestClient $ \conn -> do

-- The first time the handler is invoked, it attempts to enqueue a
-- an undefined message (one containing a pure exception). Prior to
-- #235 this would result in undefined behaviour, probably the server
-- disconnecting. What should happen instead is that this exception
-- is thrown in the handler, caught, sent to the client as a
-- 'GrpcException', and re-raised in the client.
mResp1 :: Either GrpcException (Proto PongMessage) <- try $
Client.nonStreaming conn (Client.rpc @Ping) (defMessage & #id .~ 1)
case mResp1 of
Left err | Just msg <- grpcErrorMessage err ->
assertBool "" $ Text.pack "uhoh" `Text.isInfixOf` msg
_otherwise ->
assertFailure "Unexpected response"

-- Meanwhile, the server should just continue running; the /second/
-- invocation of the handler should succeed normally.
mResp2 :: Either GrpcException (Proto PongMessage) <- try $
Client.nonStreaming conn (Client.rpc @Ping) (defMessage & #id .~ 2)
case mResp2 of
Right resp ->
assertEqual "" 2 $ resp ^. #id
_otherwise ->
assertFailure "Unexpected response"
}
where
-- Server handler attempts to enqueue an undefined message
handler :: IORef Int -> Proto PingMessage -> IO (Proto PongMessage)
handler st req = do
isFirst <- atomicModifyIORef st $ \i -> (succ i, i == 0)
if isFirst
then return $ throw $ DeliberateException (userError "uhoh")
else return $ defMessage & #id .~ req ^. #id
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Test.Sanity.StreamingType.CustomFormat (tests) where

import Codec.Serialise qualified as Cbor
import Control.Concurrent.Async (concurrently)
import Control.DeepSeq (NFData)
import Data.Bifunctor
import Data.ByteString qualified as Strict (ByteString)
import Data.Kind
Expand Down Expand Up @@ -59,6 +60,8 @@ data Function =
class ( Typeable fun
, Show (CalcInput fun)
, Show (CalcOutput fun)
, NFData (CalcInput fun)
, NFData (CalcOutput fun)
, Cbor.Serialise (CalcInput fun)
, Cbor.Serialise (CalcOutput fun)
) => CalculatorFunction (fun :: Function) where
Expand Down
17 changes: 11 additions & 6 deletions grapesy/util/Network/GRPC/Util/Session/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ module Network.GRPC.Util.Session.Channel (
) where

import Control.Concurrent.STM
import Control.DeepSeq (NFData, force)
import Control.Exception
import Control.Monad
import Control.Monad.Catch (ExitCase(..))
import Data.Bifunctor
import Data.ByteString.Builder (Builder)
import Data.ByteString.Lazy qualified as BS.Lazy
import GHC.Stack

-- Doesn't really matter if we import from .Client or .Server
Expand All @@ -55,7 +57,6 @@ import Network.GRPC.Util.RedundantConstraint
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Thread
import Network.GRPC.Util.Parser qualified as Parser
import Data.ByteString.Lazy qualified as BS.Lazy

{-------------------------------------------------------------------------------
Definitions
Expand Down Expand Up @@ -237,15 +238,19 @@ getInboundHeaders Channel{channelInbound} =
-- which 'StreamElem.whenDefinitelyFinal' considers to be final). Doing so will
-- result in a 'SendAfterFinal' exception.
send :: forall sess.
HasCallStack
(HasCallStack, NFData (Message (Outbound sess)))
=> Channel sess
-> StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> IO ()
send Channel{channelOutbound, channelSentFinal} msg =
withThreadInterface channelOutbound aux
send Channel{channelOutbound, channelSentFinal} = \msg -> do
msg' <- evaluate $ force <$> msg
withThreadInterface channelOutbound $ aux msg'
where
aux :: FlowState (Outbound sess) -> STM ()
aux st = do
aux ::
StreamElem (Trailers (Outbound sess)) (Message (Outbound sess))
-> FlowState (Outbound sess)
-> STM ()
aux msg st = do
-- By checking that we haven't sent the final message yet, we know that
-- this call to 'putMVar' will not block indefinitely: the thread that
-- sends messages to the peer will get to it eventually (unless it dies,
Expand Down

0 comments on commit 3f58f07

Please sign in to comment.