Skip to content

Commit

Permalink
Detect final element
Browse files Browse the repository at this point in the history
Closes #114.
  • Loading branch information
edsko committed May 29, 2024
1 parent c39f943 commit 9303873
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 72 deletions.
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, http2-tls >= 0.2.11 && < 0.3
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -339,7 +339,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 0 additions & 4 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS
-- The specialized functions from "Network.GRPC.Client.StreamType" take care of
-- this; if these functions are not applicable, users may wish to use
-- 'recvFinalOutput'.
--
-- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently
-- make this distinction. For detailed discussion, see
-- <https://github.com/well-typed/grapesy/issues/114>.
11 changes: 6 additions & 5 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) ->
expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $
receivedMetadata
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
-- On the client side, when the server sends the final message, we
-- will receive that final message in one HTTP data frame, and then
-- the trailers in another. This means that when we get the message,
-- we do not yet know if this is in fact the last.
-- (This is different on the server side, because gRPC does not
-- support trailers on the client side.)
reactToServer tick $ Send (StreamElem a)
reactToServer tick $ Send (NoMoreElems b)
Send expectedElem -> do
Expand Down Expand Up @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
case action of
Initiate _ ->
error "serverLocal: unexpected ClientInitiateRequest"
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
reactToClient tick $ Send (StreamElem a)
reactToClient tick $ Send (NoMoreElems b)
Send expectedElem -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
Expand Down
63 changes: 48 additions & 15 deletions test-grapesy/Test/Prop/IncrementalParsing.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Test.Prop.IncrementalParsing (tests) where

import Control.Monad.Except
import Control.Monad.State
import Control.Monad
import Control.Monad.Except (MonadError, Except, runExcept, throwError)
import Control.Monad.State (MonadState, StateT, runStateT, state)
import Data.ByteString qualified as BS.Strict
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Lazy qualified as BS.Lazy
Expand All @@ -24,10 +25,10 @@ tests = testGroup "Test.Prop.IncrementalParsing" [
testProperty "parser" test_parser
]

test_parser :: Input -> [ChunkSize] -> PhraseSize -> Property
test_parser input splits phraseSize =
test_parser :: MarkLast -> Input -> [ChunkSize] -> PhraseSize -> Property
test_parser markLast input splits phraseSize =
counterexample ("chunks: " ++ show chunks)
$ case processAll chunks processPhrase (parsePhrase phraseSize) of
$ case processAll markLast chunks phraseSize of
Left err ->
counterexample ("Unexpected failure " ++ show err) $ False
Right unconsumed ->
Expand Down Expand Up @@ -64,29 +65,33 @@ runPure chunks =
. flip runStateT chunks
. unwrapPure

getChunk :: Pure Strict.ByteString
getChunk =
getChunk :: MarkLast -> Pure (Strict.ByteString, Bool)
getChunk (MarkLast markLast) =
state $ \case
[] -> (BS.Strict.empty, [])
c:cs -> (c, cs)
[] -> ((BS.Strict.empty, True), [])
[c] -> ((c, markLast), [])
c:cs -> ((c, False), cs)

processAll ::
[Strict.ByteString]
-> (a -> Pure ())
-> Parser String a
MarkLast
-> [Strict.ByteString]
-> PhraseSize
-> Either String Lazy.ByteString
processAll chunks processOne p =
processAll markLast chunks phraseSize =
runPure chunks aux >>= verifyAllChunksConsumed
where
p :: Parser String [Word8]
p = parsePhrase phraseSize

-- 'processAll' does not assume that the monad @m@ in which it is executed
-- has any way of reporting errors: if there is a parse failure during
-- execution, this failure is returned as a value. For the specific case of
-- 'Pure', however, we /can/ throw errors in @m@ (to allow 'processOne' to
-- throw errors), so we can reuse that also for any parse failures.
aux :: Pure Lazy.ByteString
aux =
Parser.processAll getChunk processOne p
>>= either throwError return
Parser.processAll (getChunk markLast) processPhrase processPhrase p
>>= throwParseErrors

-- 'processAll' should run until all chunks are used
verifyAllChunksConsumed ::
Expand All @@ -99,6 +104,28 @@ processAll chunks processOne p =
| otherwise
= Left "not all chunks consumed"

throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString
throwParseErrors (Parser.ProcessError err) =
throwError err
throwParseErrors (Parser.ProcessedWithFinal () bs) = do
unless canMarkFinal $ throwError "Unexpected ProcessedWithFinal"
return bs
throwParseErrors (Parser.ProcessedWithoutFinal bs) = do
when canMarkFinal $ throwError "Unexpected ProcessedWithoutFinal"
return bs

-- We can mark the final phrase as final if the final chunk is marked as
-- final, and when we get that chunk, it contains at least one phrase.
canMarkFinal :: Bool
canMarkFinal = and [
getMarkLast markLast
, case reverse chunks of
[] -> False
c:cs -> let left = sum (map BS.Strict.length cs)
`mod` getPhraseSize phraseSize
in (left + BS.Strict.length c) >= getPhraseSize phraseSize
]

{-------------------------------------------------------------------------------
Test input
Expand All @@ -111,6 +138,8 @@ processAll chunks processOne p =
```
* We split this input into non-empty chunks of varying sizes @[ChunkSize]@.
We sometimes mark the last chunk as being the last, and sometimes don't
(see <https://github.com/well-typed/grapesy/issues/114>).
* We then choose a non-zero 'PhraseSize' @n@. The idea is that the parser
splits the input into phrases of @n@ bytes
Expand All @@ -134,6 +163,7 @@ processAll chunks processOne p =
(in 'processAll') that all input chunks are fed to the parser.
-------------------------------------------------------------------------------}

newtype MarkLast = MarkLast { getMarkLast :: Bool } deriving (Show)
newtype Input = Input { getInputBytes :: [Word8] } deriving (Show)
newtype ChunkSize = ChunkSize { getChunkSize :: Int } deriving (Show)
newtype PhraseSize = PhraseSize { getPhraseSize :: Int } deriving (Show)
Expand Down Expand Up @@ -179,6 +209,8 @@ processPhrase phrase =
Arbitrary instances
-------------------------------------------------------------------------------}

deriving newtype instance Arbitrary MarkLast

instance Arbitrary Input where
arbitrary = sized $ \n -> do
len <- choose (0, n * 100)
Expand All @@ -187,3 +219,4 @@ instance Arbitrary Input where

deriving via Positive Int instance Arbitrary ChunkSize
deriving via Positive Int instance Arbitrary PhraseSize

34 changes: 33 additions & 1 deletion test-grapesy/Test/Sanity/EndOfStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [
, testCase "recvTrailers" test_recvTrailers
]
, testGroup "server" [
testCase "recvEndOfInput" test_recvEndOfInput
testCase "recvInput" test_recvInput
, testCase "recvEndOfInput" test_recvEndOfInput
]
]

Expand Down Expand Up @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $
Server tests
-------------------------------------------------------------------------------}

-- | Test that the final element is marked as 'FinalElem'
--
-- Verifies that <https://github.com/well-typed/grapesy/issues/114> is solved.
--
-- NOTE: There is no client equivalent for this test. On the client side, the
-- server will send trailers, and so /cannot/ make the final data frame as
-- end-of-stream.
test_recvInput :: Assertion
test_recvInput = testClientServer $ ClientServerTest {
config = def
, server = [Server.someRpcHandler handler]
, client = simpleTestClient $ \conn ->
Client.withRPC conn def (Proxy @Trivial) $ \call -> do
Client.sendFinalInput call BS.Lazy.empty
_resp <- Client.recvFinalOutput call
return ()
}
where
handler :: Server.RpcHandler IO Trivial
handler = Server.mkRpcHandler $ \call -> do
x <- Server.recvInput call

-- The purpose of this test:
case x of
FinalElem{} ->
return ()
_otherwise ->
assertFailure "Expected FinalElem"

Server.sendFinalOutput call (mempty, NoMetadata)

-- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the
-- previous 'recvNextInput' /happened/ to give us the final input.
--
Expand Down
3 changes: 2 additions & 1 deletion test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ test_calculator_cbor = do
biDiStreamingSumHandler = Server.streamingRpcHandler $
Server.mkBiDiStreaming $ \recv send ->
let
go :: Int -> IO ()
go acc =
recv >>= \case
NoMoreElems _ -> return ()
FinalElem n _ -> send (acc + n) >> go (acc + n)
FinalElem n _ -> send (acc + n)
StreamElem n -> send (acc + n) >> go (acc + n)
in
go 0
8 changes: 4 additions & 4 deletions util/Network/GRPC/Util/HTTP2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ data OutputStream = OutputStream {
}

data InputStream = InputStream {
_getChunk :: HasCallStack => IO Strict.ByteString
_getChunk :: HasCallStack => IO (Strict.ByteString, Bool)
, _getTrailers :: HasCallStack => IO [HTTP.Header]
}

Expand All @@ -54,7 +54,7 @@ writeChunk = _writeChunk
flush :: HasCallStack => OutputStream -> IO ()
flush = _flush

getChunk :: HasCallStack => InputStream -> IO Strict.ByteString
getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool)
getChunk = _getChunk

getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header]
Expand All @@ -68,7 +68,7 @@ serverInputStream :: Server.Request -> IO InputStream
serverInputStream req = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ClientDisconnected $
Server.getRequestBodyChunk req
Server.getRequestBodyChunk' req
, _getTrailers = wrapStreamExceptionsWith ClientDisconnected $
maybe [] fromHeaderTable <$>
Server.getRequestTrailers req
Expand Down Expand Up @@ -132,7 +132,7 @@ clientInputStream :: Client.Response -> IO InputStream
clientInputStream resp = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ServerDisconnected $
Client.getResponseBodyChunk resp
Client.getResponseBodyChunk' resp
, _getTrailers = wrapStreamExceptionsWith ServerDisconnected $
maybe [] fromHeaderTable <$>
Client.getResponseTrailers resp
Expand Down
94 changes: 68 additions & 26 deletions util/Network/GRPC/Util/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ module Network.GRPC.Util.Parser (
, consumeExactly
, getExactly
-- * Execution
, IsFinal
, Leftover
, ProcessResult(..)
, processAll
, processAllIO
) where

import Control.Exception
import Control.Monad
import Data.Bifunctor
import Data.Binary (Get)
Expand Down Expand Up @@ -158,33 +159,74 @@ getExactly len get =
Execution
-------------------------------------------------------------------------------}

type IsFinal = Bool
type Leftover = Lazy.ByteString

data ProcessResult e b =
-- | Parse error during processing
ProcessError e

-- | Parsing succeeded (compare to 'ProcessedWithoutFinal')
| ProcessedWithFinal b Leftover

-- | Parsing succeeded, but we did not recognize the final message on time
--
-- There are two ways that parsing can terminate: the final few chunks may
-- look like this:
--
-- > chunk1 -- not marked final
-- > chunk2 -- not marked final
-- > chunk3 -- marked final
--
-- or like this:
--
-- > chunk1 -- not marked final
-- > chunk2 -- not marked final
-- > chunk3 -- not marked final
-- > empty chunk -- marked final
--
-- In the former case, we know that we are processing the final message /as/
-- we are processing it ('ProcessedFinal'); in the latter case, we realize
-- this only after we receive the final empty chunk.
| ProcessedWithoutFinal Leftover

-- | Process all incoming data
--
-- Returns any unprocessed data.
processAll :: forall m e a.
-- Also returns if we knew that the final result
-- was in fact the final result when we received it (this may or may not be the
-- case, depending on
processAll :: forall m e a b.
Monad m
=> m Strict.ByteString -- ^ Get next chunk (empty indicates end of input)
-> (a -> m ()) -- ^ Process single value
-> Parser e a -- ^ Parser
-> m (Either e Lazy.ByteString)
processAll getChunk processOne parser =
=> m (Strict.ByteString, IsFinal) -- ^ Get next chunk
-> (a -> m ()) -- ^ Process value
-> (a -> m b) -- ^ Process final value
-> Parser e a -- ^ Parser
-> m (ProcessResult e b)
processAll getChunk processOne processFinal parser =
go $ runParser parser nil
where
go :: Result e a -> m (Either e Lazy.ByteString)
go (Failed err) = return $ Left err
go (Done result bs') = processOne result >> go (runParser parser bs')
go (NeedData parser' acc) = do
bs <- getChunk
if not (BS.Strict.null bs)
then go $ runParser parser' (snoc acc bs)
else return $ Right (toLazy acc)

-- | Wrapper around 'processAll' that throws errors as exceptions
processAllIO :: forall e a.
Exception e
=> IO Strict.ByteString
-> (a -> IO ())
-> Parser e a
-> IO Lazy.ByteString
processAllIO getChunk processOne parser =
processAll getChunk processOne parser >>= either throwIO return
go :: Result e a -> m (ProcessResult e b)
go (Failed err) = return $ ProcessError err
go (Done a left) = processOne a >> go (runParser parser left)
go (NeedData parser' left) = do
(bs, isFinal) <- getChunk
if not isFinal
then go $ runParser parser' (left `snoc` bs)
else goFinal [] $ runParser parser' (left `snoc` bs)

-- We have received the final chunk; extract all messages until we are done
goFinal :: [a] -> Result e a -> m (ProcessResult e b)
goFinal _ (Failed err) = return $ ProcessError err
goFinal acc (Done a left) = goFinal (a:acc) $ runParser parser left
goFinal acc (NeedData _ left) = do
mb <- processLastFew (reverse acc)
return $ case mb of
Just b -> ProcessedWithFinal b $ toLazy left
Nothing -> ProcessedWithoutFinal $ toLazy left

processLastFew :: [a] -> m (Maybe b)
processLastFew [] = return Nothing
processLastFew [a] = Just <$> processFinal a
processLastFew (a:as) = processOne a >> processLastFew as

Loading

0 comments on commit 9303873

Please sign in to comment.