Skip to content

Commit

Permalink
Detect final element
Browse files Browse the repository at this point in the history
Closes #114.
  • Loading branch information
edsko committed Jun 14, 2024
1 parent 1b17362 commit b91e1a8
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 121 deletions.
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.3 && < 5.3
, http2-tls >= 0.2.11 && < 0.3
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -339,7 +339,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.3 && < 5.3
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 0 additions & 4 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS
-- The specialized functions from "Network.GRPC.Client.StreamType" take care of
-- this; if these functions are not applicable, users may wish to use
-- 'recvFinalOutput'.
--
-- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently
-- make this distinction. For detailed discussion, see
-- <https://github.com/well-typed/grapesy/issues/114>.
11 changes: 6 additions & 5 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) ->
expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $
receivedMetadata
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
-- On the client side, when the server sends the final message, we
-- will receive that final message in one HTTP data frame, and then
-- the trailers in another. This means that when we get the message,
-- we do not yet know if this is in fact the last.
-- (This is different on the server side, because gRPC does not
-- support trailers on the client side.)
reactToServer tick $ Send (StreamElem a)
reactToServer tick $ Send (NoMoreElems b)
Send expectedElem -> do
Expand Down Expand Up @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
case action of
Initiate _ ->
error "serverLocal: unexpected ClientInitiateRequest"
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
reactToClient tick $ Send (StreamElem a)
reactToClient tick $ Send (NoMoreElems b)
Send expectedElem -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
Expand Down
63 changes: 48 additions & 15 deletions test-grapesy/Test/Prop/IncrementalParsing.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Test.Prop.IncrementalParsing (tests) where

import Control.Monad.Except
import Control.Monad.State
import Control.Monad
import Control.Monad.Except (MonadError, Except, runExcept, throwError)
import Control.Monad.State (MonadState, StateT, runStateT, state)
import Data.ByteString qualified as BS.Strict
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Lazy qualified as BS.Lazy
Expand All @@ -24,10 +25,10 @@ tests = testGroup "Test.Prop.IncrementalParsing" [
testProperty "parser" test_parser
]

test_parser :: Input -> [ChunkSize] -> PhraseSize -> Property
test_parser input splits phraseSize =
test_parser :: MarkLast -> Input -> [ChunkSize] -> PhraseSize -> Property
test_parser markLast input splits phraseSize =
counterexample ("chunks: " ++ show chunks)
$ case processAll chunks processPhrase (parsePhrase phraseSize) of
$ case processAll markLast chunks phraseSize of
Left err ->
counterexample ("Unexpected failure " ++ show err) $ False
Right unconsumed ->
Expand Down Expand Up @@ -64,29 +65,33 @@ runPure chunks =
. flip runStateT chunks
. unwrapPure

getChunk :: Pure Strict.ByteString
getChunk =
getChunk :: MarkLast -> Pure (Strict.ByteString, Bool)
getChunk (MarkLast markLast) =
state $ \case
[] -> (BS.Strict.empty, [])
c:cs -> (c, cs)
[] -> ((BS.Strict.empty, True), [])
[c] -> ((c, markLast), [])
c:cs -> ((c, False), cs)

processAll ::
[Strict.ByteString]
-> (a -> Pure ())
-> Parser String a
MarkLast
-> [Strict.ByteString]
-> PhraseSize
-> Either String Lazy.ByteString
processAll chunks processOne p =
processAll markLast chunks phraseSize =
runPure chunks aux >>= verifyAllChunksConsumed
where
p :: Parser String [Word8]
p = parsePhrase phraseSize

-- 'processAll' does not assume that the monad @m@ in which it is executed
-- has any way of reporting errors: if there is a parse failure during
-- execution, this failure is returned as a value. For the specific case of
-- 'Pure', however, we /can/ throw errors in @m@ (to allow 'processOne' to
-- throw errors), so we can reuse that also for any parse failures.
aux :: Pure Lazy.ByteString
aux =
Parser.processAll getChunk processOne p
>>= either throwError return
Parser.processAll (getChunk markLast) processPhrase processPhrase p
>>= throwParseErrors

-- 'processAll' should run until all chunks are used
verifyAllChunksConsumed ::
Expand All @@ -99,6 +104,28 @@ processAll chunks processOne p =
| otherwise
= Left "not all chunks consumed"

throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString
throwParseErrors (Parser.ProcessError err) =
throwError err
throwParseErrors (Parser.ProcessedWithFinal () bs) = do
unless canMarkFinal $ throwError "Unexpected ProcessedWithFinal"
return bs
throwParseErrors (Parser.ProcessedWithoutFinal bs) = do
when canMarkFinal $ throwError "Unexpected ProcessedWithoutFinal"
return bs

-- We can mark the final phrase as final if the final chunk is marked as
-- final, and when we get that chunk, it contains at least one phrase.
canMarkFinal :: Bool
canMarkFinal = and [
getMarkLast markLast
, case reverse chunks of
[] -> False
c:cs -> let left = sum (map BS.Strict.length cs)
`mod` getPhraseSize phraseSize
in (left + BS.Strict.length c) >= getPhraseSize phraseSize
]

{-------------------------------------------------------------------------------
Test input
Expand All @@ -111,6 +138,8 @@ processAll chunks processOne p =
```
* We split this input into non-empty chunks of varying sizes @[ChunkSize]@.
We sometimes mark the last chunk as being the last, and sometimes don't
(see <https://github.com/well-typed/grapesy/issues/114>).
* We then choose a non-zero 'PhraseSize' @n@. The idea is that the parser
splits the input into phrases of @n@ bytes
Expand All @@ -134,6 +163,7 @@ processAll chunks processOne p =
(in 'processAll') that all input chunks are fed to the parser.
-------------------------------------------------------------------------------}

newtype MarkLast = MarkLast { getMarkLast :: Bool } deriving (Show)
newtype Input = Input { getInputBytes :: [Word8] } deriving (Show)
newtype ChunkSize = ChunkSize { getChunkSize :: Int } deriving (Show)
newtype PhraseSize = PhraseSize { getPhraseSize :: Int } deriving (Show)
Expand Down Expand Up @@ -179,6 +209,8 @@ processPhrase phrase =
Arbitrary instances
-------------------------------------------------------------------------------}

deriving newtype instance Arbitrary MarkLast

instance Arbitrary Input where
arbitrary = sized $ \n -> do
len <- choose (0, n * 100)
Expand All @@ -187,3 +219,4 @@ instance Arbitrary Input where

deriving via Positive Int instance Arbitrary ChunkSize
deriving via Positive Int instance Arbitrary PhraseSize

34 changes: 33 additions & 1 deletion test-grapesy/Test/Sanity/EndOfStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [
, testCase "recvTrailers" test_recvTrailers
]
, testGroup "server" [
testCase "recvEndOfInput" test_recvEndOfInput
testCase "recvInput" test_recvInput
, testCase "recvEndOfInput" test_recvEndOfInput
]
]

Expand Down Expand Up @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $
Server tests
-------------------------------------------------------------------------------}

-- | Test that the final element is marked as 'FinalElem'
--
-- Verifies that <https://github.com/well-typed/grapesy/issues/114> is solved.
--
-- NOTE: There is no client equivalent for this test. On the client side, the
-- server will send trailers, and so /cannot/ make the final data frame as
-- end-of-stream.
test_recvInput :: Assertion
test_recvInput = testClientServer $ ClientServerTest {
config = def
, server = [Server.someRpcHandler handler]
, client = simpleTestClient $ \conn ->
Client.withRPC conn def (Proxy @Trivial) $ \call -> do
Client.sendFinalInput call BS.Lazy.empty
_resp <- Client.recvFinalOutput call
return ()
}
where
handler :: Server.RpcHandler IO Trivial
handler = Server.mkRpcHandler $ \call -> do
x <- Server.recvInput call

-- The purpose of this test:
case x of
FinalElem{} ->
return ()
_otherwise ->
assertFailure "Expected FinalElem"

Server.sendFinalOutput call (mempty, NoMetadata)

-- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the
-- previous 'recvNextInput' /happened/ to give us the final input.
--
Expand Down
3 changes: 2 additions & 1 deletion test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ test_calculator_cbor = do
biDiStreamingSumHandler = Server.streamingRpcHandler $
Server.mkBiDiStreaming $ \recv send ->
let
go :: Int -> IO ()
go acc =
recv >>= \case
NoMoreElems _ -> return ()
FinalElem n _ -> send (acc + n) >> go (acc + n)
FinalElem n _ -> send (acc + n)
StreamElem n -> send (acc + n) >> go (acc + n)
in
go 0
70 changes: 34 additions & 36 deletions util/Network/GRPC/Util/HTTP2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,30 @@ import Network.GRPC.Util.HTTP2 (fromHeaderTable)

data OutputStream = OutputStream {
-- | Write a chunk to the stream
_writeChunk :: HasCallStack => Builder -> IO ()
--
-- The 'Bool' argument indicates if this is the final chunk.
_writeChunk :: HasCallStack => Bool -> Builder -> IO ()

-- | Flush the stream (send frames to the peer)
, _flush :: HasCallStack => IO ()
}

data InputStream = InputStream {
_getChunk :: HasCallStack => IO Strict.ByteString
_getChunk :: HasCallStack => IO (Strict.ByteString, Bool)
, _getTrailers :: HasCallStack => IO [HTTP.Header]
}

{-------------------------------------------------------------------------------
Wrappers to get the proper CallStack
-------------------------------------------------------------------------------}

writeChunk :: HasCallStack => OutputStream -> Builder -> IO ()
writeChunk :: HasCallStack => OutputStream -> Bool -> Builder -> IO ()
writeChunk = _writeChunk

flush :: HasCallStack => OutputStream -> IO ()
flush = _flush

getChunk :: HasCallStack => InputStream -> IO Strict.ByteString
getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool)
getChunk = _getChunk

getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header]
Expand All @@ -67,11 +69,12 @@ getTrailers = _getTrailers
serverInputStream :: Server.Request -> IO InputStream
serverInputStream req = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ClientDisconnected $
Server.getRequestBodyChunk req
, _getTrailers = wrapStreamExceptionsWith ClientDisconnected $
maybe [] fromHeaderTable <$>
Server.getRequestTrailers req
_getChunk =
wrapStreamExceptionsWith ClientDisconnected $
Server.getRequestBodyChunk' req
, _getTrailers =
wrapStreamExceptionsWith ClientDisconnected $
maybe [] fromHeaderTable <$> Server.getRequestTrailers req
}

-- | Create output stream
Expand Down Expand Up @@ -115,10 +118,12 @@ serverOutputStream writeChunk' flush' = do
-- case (see discussion above).

let outputStream = OutputStream {
_writeChunk = \c -> wrapStreamExceptionsWith ClientDisconnected $
writeChunk' c
, _flush = wrapStreamExceptionsWith ClientDisconnected $
flush'
_writeChunk = \_isFinal c ->
wrapStreamExceptionsWith ClientDisconnected $
writeChunk' c
, _flush =
wrapStreamExceptionsWith ClientDisconnected $
flush'
}

flush outputStream
Expand All @@ -131,31 +136,24 @@ serverOutputStream writeChunk' flush' = do
clientInputStream :: Client.Response -> IO InputStream
clientInputStream resp = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ServerDisconnected $
Client.getResponseBodyChunk resp
, _getTrailers = wrapStreamExceptionsWith ServerDisconnected $
maybe [] fromHeaderTable <$>
Client.getResponseTrailers resp
_getChunk =
wrapStreamExceptionsWith ServerDisconnected $
Client.getResponseBodyChunk' resp
, _getTrailers =
wrapStreamExceptionsWith ServerDisconnected $
maybe [] fromHeaderTable <$> Client.getResponseTrailers resp
}

clientOutputStream :: (Builder -> IO ()) -> IO () -> IO OutputStream
clientOutputStream writeChunk' flush' = do
-- The http2 client implementation has an explicit check that means the
-- request is not initiated until /something/ has been written
-- <https://github.com/kazu-yamamoto/http2/commit/a76cdf3>; to workaround
-- this limitation we write an empty chunk. This results in an empty data
-- frame in the stream, but this does not matter: gRPC does not support
-- request headers, which means that, unlike in the server, we do not need
-- to give the empty case special treatment.
let outputStream = OutputStream {
_writeChunk = \c -> wrapStreamExceptionsWith ServerDisconnected $
writeChunk' c
, _flush = wrapStreamExceptionsWith ServerDisconnected $
flush'
}

writeChunk outputStream mempty
return outputStream
clientOutputStream :: (Bool -> Builder -> IO ()) -> IO () -> IO OutputStream
clientOutputStream writeChunk' flush' =
return OutputStream {
_writeChunk = \isFinal c ->
wrapStreamExceptionsWith ServerDisconnected $
writeChunk' isFinal c
, _flush =
wrapStreamExceptionsWith ServerDisconnected $
flush'
}

{-------------------------------------------------------------------------------
Exceptions
Expand Down
Loading

0 comments on commit b91e1a8

Please sign in to comment.