diff --git a/grapesy.cabal b/grapesy.cabal index 5648795a..0369cf23 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -194,7 +194,7 @@ library , exceptions >= 0.10 && < 0.11 , hashable >= 1.3 && < 1.5 , http-types >= 0.12 && < 0.13 - , http2 >= 5.2.1 && < 5.3 + , http2 >= 5.2.2 && < 5.3 , http2-tls >= 0.2.11 && < 0.3 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 @@ -339,7 +339,7 @@ test-suite test-grapesy , containers >= 0.6 && < 0.8 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 - , http2 >= 5.2.1 && < 5.3 + , http2 >= 5.2.2 && < 5.3 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 , proto-lens-runtime >= 0.7 && < 0.8 diff --git a/src/Network/GRPC/Client.hs b/src/Network/GRPC/Client.hs index 8352a4ab..05bd9a3d 100644 --- a/src/Network/GRPC/Client.hs +++ b/src/Network/GRPC/Client.hs @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS -- The specialized functions from "Network.GRPC.Client.StreamType" take care of -- this; if these functions are not applicable, users may wish to use -- 'recvFinalOutput'. --- --- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently --- make this distinction. For detailed discussion, see --- . \ No newline at end of file diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 988e3b4f..cbd27b70 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) -> expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $ receivedMetadata Send (FinalElem a b) -> do - -- + -- On the client side, when the server sends the final message, we + -- will receive that final message in one HTTP data frame, and then + -- the trailers in another. This means that when we get the message, + -- we do not yet know if this is in fact the last. + -- (This is different on the server side, because gRPC does not + -- support trailers on the client side.) reactToServer tick $ Send (StreamElem a) reactToServer tick $ Send (NoMoreElems b) Send expectedElem -> do @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do case action of Initiate _ -> error "serverLocal: unexpected ClientInitiateRequest" - Send (FinalElem a b) -> do - -- - reactToClient tick $ Send (StreamElem a) - reactToClient tick $ Send (NoMoreElems b) Send expectedElem -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call diff --git a/test-grapesy/Test/Prop/IncrementalParsing.hs b/test-grapesy/Test/Prop/IncrementalParsing.hs index 56cf7850..21eaddfb 100644 --- a/test-grapesy/Test/Prop/IncrementalParsing.hs +++ b/test-grapesy/Test/Prop/IncrementalParsing.hs @@ -1,7 +1,8 @@ module Test.Prop.IncrementalParsing (tests) where -import Control.Monad.Except -import Control.Monad.State +import Control.Monad +import Control.Monad.Except (MonadError, Except, runExcept, throwError) +import Control.Monad.State (MonadState, StateT, runStateT, state) import Data.ByteString qualified as BS.Strict import Data.ByteString qualified as Strict (ByteString) import Data.ByteString.Lazy qualified as BS.Lazy @@ -24,10 +25,10 @@ tests = testGroup "Test.Prop.IncrementalParsing" [ testProperty "parser" test_parser ] -test_parser :: Input -> [ChunkSize] -> PhraseSize -> Property -test_parser input splits phraseSize = +test_parser :: MarkLast -> Input -> [ChunkSize] -> PhraseSize -> Property +test_parser markLast input splits phraseSize = counterexample ("chunks: " ++ show chunks) - $ case processAll chunks processPhrase (parsePhrase phraseSize) of + $ case processAll markLast chunks phraseSize of Left err -> counterexample ("Unexpected failure " ++ show err) $ False Right unconsumed -> @@ -64,20 +65,24 @@ runPure chunks = . flip runStateT chunks . unwrapPure -getChunk :: Pure Strict.ByteString -getChunk = +getChunk :: MarkLast -> Pure (Strict.ByteString, Bool) +getChunk (MarkLast markLast) = state $ \case - [] -> (BS.Strict.empty, []) - c:cs -> (c, cs) + [] -> ((BS.Strict.empty, True), []) + [c] -> ((c, markLast), []) + c:cs -> ((c, False), cs) processAll :: - [Strict.ByteString] - -> (a -> Pure ()) - -> Parser String a + MarkLast + -> [Strict.ByteString] + -> PhraseSize -> Either String Lazy.ByteString -processAll chunks processOne p = +processAll markLast chunks phraseSize = runPure chunks aux >>= verifyAllChunksConsumed where + p :: Parser String [Word8] + p = parsePhrase phraseSize + -- 'processAll' does not assume that the monad @m@ in which it is executed -- has any way of reporting errors: if there is a parse failure during -- execution, this failure is returned as a value. For the specific case of @@ -85,8 +90,8 @@ processAll chunks processOne p = -- throw errors), so we can reuse that also for any parse failures. aux :: Pure Lazy.ByteString aux = - Parser.processAll getChunk processOne p - >>= either throwError return + Parser.processAll (getChunk markLast) processPhrase processPhrase p + >>= throwParseErrors -- 'processAll' should run until all chunks are used verifyAllChunksConsumed :: @@ -99,6 +104,28 @@ processAll chunks processOne p = | otherwise = Left "not all chunks consumed" + throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString + throwParseErrors (Parser.ProcessError err) = + throwError err + throwParseErrors (Parser.ProcessedWithFinal () bs) = do + unless canMarkFinal $ throwError "Unexpected ProcessedWithFinal" + return bs + throwParseErrors (Parser.ProcessedWithoutFinal bs) = do + when canMarkFinal $ throwError "Unexpected ProcessedWithoutFinal" + return bs + + -- We can mark the final phrase as final if the final chunk is marked as + -- final, and when we get that chunk, it contains at least one phrase. + canMarkFinal :: Bool + canMarkFinal = and [ + getMarkLast markLast + , case reverse chunks of + [] -> False + c:cs -> let left = sum (map BS.Strict.length cs) + `mod` getPhraseSize phraseSize + in (left + BS.Strict.length c) >= getPhraseSize phraseSize + ] + {------------------------------------------------------------------------------- Test input @@ -111,6 +138,8 @@ processAll chunks processOne p = ``` * We split this input into non-empty chunks of varying sizes @[ChunkSize]@. + We sometimes mark the last chunk as being the last, and sometimes don't + (see ). * We then choose a non-zero 'PhraseSize' @n@. The idea is that the parser splits the input into phrases of @n@ bytes @@ -134,6 +163,7 @@ processAll chunks processOne p = (in 'processAll') that all input chunks are fed to the parser. -------------------------------------------------------------------------------} +newtype MarkLast = MarkLast { getMarkLast :: Bool } deriving (Show) newtype Input = Input { getInputBytes :: [Word8] } deriving (Show) newtype ChunkSize = ChunkSize { getChunkSize :: Int } deriving (Show) newtype PhraseSize = PhraseSize { getPhraseSize :: Int } deriving (Show) @@ -179,6 +209,8 @@ processPhrase phrase = Arbitrary instances -------------------------------------------------------------------------------} +deriving newtype instance Arbitrary MarkLast + instance Arbitrary Input where arbitrary = sized $ \n -> do len <- choose (0, n * 100) @@ -187,3 +219,4 @@ instance Arbitrary Input where deriving via Positive Int instance Arbitrary ChunkSize deriving via Positive Int instance Arbitrary PhraseSize + diff --git a/test-grapesy/Test/Sanity/EndOfStream.hs b/test-grapesy/Test/Sanity/EndOfStream.hs index 47760253..dd956729 100644 --- a/test-grapesy/Test/Sanity/EndOfStream.hs +++ b/test-grapesy/Test/Sanity/EndOfStream.hs @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [ , testCase "recvTrailers" test_recvTrailers ] , testGroup "server" [ - testCase "recvEndOfInput" test_recvEndOfInput + testCase "recvInput" test_recvInput + , testCase "recvEndOfInput" test_recvEndOfInput ] ] @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $ Server tests -------------------------------------------------------------------------------} +-- | Test that the final element is marked as 'FinalElem' +-- +-- Verifies that is solved. +-- +-- NOTE: There is no client equivalent for this test. On the client side, the +-- server will send trailers, and so /cannot/ make the final data frame as +-- end-of-stream. +test_recvInput :: Assertion +test_recvInput = testClientServer $ ClientServerTest { + config = def + , server = [Server.someRpcHandler handler] + , client = simpleTestClient $ \conn -> + Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.sendFinalInput call BS.Lazy.empty + _resp <- Client.recvFinalOutput call + return () + } + where + handler :: Server.RpcHandler IO Trivial + handler = Server.mkRpcHandler $ \call -> do + x <- Server.recvInput call + + -- The purpose of this test: + case x of + FinalElem{} -> + return () + _otherwise -> + assertFailure "Expected FinalElem" + + Server.sendFinalOutput call (mempty, NoMetadata) + -- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the -- previous 'recvNextInput' /happened/ to give us the final input. -- diff --git a/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs b/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs index 931c9d4e..180e95d8 100644 --- a/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs +++ b/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs @@ -232,10 +232,11 @@ test_calculator_cbor = do biDiStreamingSumHandler = Server.streamingRpcHandler $ Server.mkBiDiStreaming $ \recv send -> let + go :: Int -> IO () go acc = recv >>= \case NoMoreElems _ -> return () - FinalElem n _ -> send (acc + n) >> go (acc + n) + FinalElem n _ -> send (acc + n) StreamElem n -> send (acc + n) >> go (acc + n) in go 0 diff --git a/util/Network/GRPC/Util/HTTP2/Stream.hs b/util/Network/GRPC/Util/HTTP2/Stream.hs index 5d021442..259180ca 100644 --- a/util/Network/GRPC/Util/HTTP2/Stream.hs +++ b/util/Network/GRPC/Util/HTTP2/Stream.hs @@ -40,7 +40,7 @@ data OutputStream = OutputStream { } data InputStream = InputStream { - _getChunk :: HasCallStack => IO Strict.ByteString + _getChunk :: HasCallStack => IO (Strict.ByteString, Bool) , _getTrailers :: HasCallStack => IO [HTTP.Header] } @@ -54,7 +54,7 @@ writeChunk = _writeChunk flush :: HasCallStack => OutputStream -> IO () flush = _flush -getChunk :: HasCallStack => InputStream -> IO Strict.ByteString +getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool) getChunk = _getChunk getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header] @@ -68,7 +68,7 @@ serverInputStream :: Server.Request -> IO InputStream serverInputStream req = do return InputStream { _getChunk = wrapStreamExceptionsWith ClientDisconnected $ - Server.getRequestBodyChunk req + Server.getRequestBodyChunk' req , _getTrailers = wrapStreamExceptionsWith ClientDisconnected $ maybe [] fromHeaderTable <$> Server.getRequestTrailers req @@ -132,7 +132,7 @@ clientInputStream :: Client.Response -> IO InputStream clientInputStream resp = do return InputStream { _getChunk = wrapStreamExceptionsWith ServerDisconnected $ - Client.getResponseBodyChunk resp + Client.getResponseBodyChunk' resp , _getTrailers = wrapStreamExceptionsWith ServerDisconnected $ maybe [] fromHeaderTable <$> Client.getResponseTrailers resp diff --git a/util/Network/GRPC/Util/Parser.hs b/util/Network/GRPC/Util/Parser.hs index 89c426bf..60ac1746 100644 --- a/util/Network/GRPC/Util/Parser.hs +++ b/util/Network/GRPC/Util/Parser.hs @@ -10,11 +10,12 @@ module Network.GRPC.Util.Parser ( , consumeExactly , getExactly -- * Execution + , IsFinal + , Leftover + , ProcessResult(..) , processAll - , processAllIO ) where -import Control.Exception import Control.Monad import Data.Bifunctor import Data.Binary (Get) @@ -158,33 +159,74 @@ getExactly len get = Execution -------------------------------------------------------------------------------} +type IsFinal = Bool +type Leftover = Lazy.ByteString + +data ProcessResult e b = + -- | Parse error during processing + ProcessError e + + -- | Parsing succeeded (compare to 'ProcessedWithoutFinal') + | ProcessedWithFinal b Leftover + + -- | Parsing succeeded, but we did not recognize the final message on time + -- + -- There are two ways that parsing can terminate: the final few chunks may + -- look like this: + -- + -- > chunk1 -- not marked final + -- > chunk2 -- not marked final + -- > chunk3 -- marked final + -- + -- or like this: + -- + -- > chunk1 -- not marked final + -- > chunk2 -- not marked final + -- > chunk3 -- not marked final + -- > empty chunk -- marked final + -- + -- In the former case, we know that we are processing the final message /as/ + -- we are processing it ('ProcessedFinal'); in the latter case, we realize + -- this only after we receive the final empty chunk. + | ProcessedWithoutFinal Leftover + -- | Process all incoming data -- -- Returns any unprocessed data. -processAll :: forall m e a. +-- Also returns if we knew that the final result +-- was in fact the final result when we received it (this may or may not be the +-- case, depending on +processAll :: forall m e a b. Monad m - => m Strict.ByteString -- ^ Get next chunk (empty indicates end of input) - -> (a -> m ()) -- ^ Process single value - -> Parser e a -- ^ Parser - -> m (Either e Lazy.ByteString) -processAll getChunk processOne parser = + => m (Strict.ByteString, IsFinal) -- ^ Get next chunk + -> (a -> m ()) -- ^ Process value + -> (a -> m b) -- ^ Process final value + -> Parser e a -- ^ Parser + -> m (ProcessResult e b) +processAll getChunk processOne processFinal parser = go $ runParser parser nil where - go :: Result e a -> m (Either e Lazy.ByteString) - go (Failed err) = return $ Left err - go (Done result bs') = processOne result >> go (runParser parser bs') - go (NeedData parser' acc) = do - bs <- getChunk - if not (BS.Strict.null bs) - then go $ runParser parser' (snoc acc bs) - else return $ Right (toLazy acc) - --- | Wrapper around 'processAll' that throws errors as exceptions -processAllIO :: forall e a. - Exception e - => IO Strict.ByteString - -> (a -> IO ()) - -> Parser e a - -> IO Lazy.ByteString -processAllIO getChunk processOne parser = - processAll getChunk processOne parser >>= either throwIO return + go :: Result e a -> m (ProcessResult e b) + go (Failed err) = return $ ProcessError err + go (Done a left) = processOne a >> go (runParser parser left) + go (NeedData parser' left) = do + (bs, isFinal) <- getChunk + if not isFinal + then go $ runParser parser' (left `snoc` bs) + else goFinal [] $ runParser parser' (left `snoc` bs) + + -- We have received the final chunk; extract all messages until we are done + goFinal :: [a] -> Result e a -> m (ProcessResult e b) + goFinal _ (Failed err) = return $ ProcessError err + goFinal acc (Done a left) = goFinal (a:acc) $ runParser parser left + goFinal acc (NeedData _ left) = do + mb <- processLastFew (reverse acc) + return $ case mb of + Just b -> ProcessedWithFinal b $ toLazy left + Nothing -> ProcessedWithoutFinal $ toLazy left + + processLastFew :: [a] -> m (Maybe b) + processLastFew [] = return Nothing + processLastFew [a] = Just <$> processFinal a + processLastFew (a:as) = processOne a >> processLastFew as + diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 30d3c1f0..741900de 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -272,10 +272,6 @@ send Channel{channelOutbound, channelSentFinal} msg = -- sending the HTTP trailers in the same frame, then we will return the message -- and the trailers together. It is a bug to call 'recvBoth' again after this; -- doing so will result in a 'RecvAfterFinal' exception. --- --- TODO: --- Although we provide this API, even /if/ the sender marks the message as --- final when they send it, we currently cannot propagate this. recvBoth :: forall sess. HasCallStack => Channel sess @@ -564,9 +560,6 @@ sendMessageLoop sess st stream = do return trailers -- | Receive all messages sent by the node's peer --- --- TODO: . --- We are never marking the final element as final. recvMessageLoop :: forall sess. IsSession sess => sess @@ -578,18 +571,45 @@ recvMessageLoop sess st stream = where go :: Parser String (Message (Inbound sess)) -> IO (Trailers (Inbound sess)) go parser = do - leftover <- Parser.processAllIO + mProcessedFinal <- throwParseErrors =<< Parser.processAll (getChunk stream) - (atomically . putTMVar (flowMsg st) . StreamElem) - (first PeerSentMalformedMessage parser) - unless (BS.Lazy.null leftover) $ - throwIO PeerSentIncompleteMessage + processOne + processFinal + parser + case mProcessedFinal of + Just trailers -> + return trailers + Nothing -> do + trailers <- processTrailers + atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers + return trailers - trailers <- parseInboundTrailers sess =<< getTrailers stream + processOne :: Message (Inbound sess) -> IO () + processOne msg = do + atomically $ putTMVar (flowMsg st) $ StreamElem msg + + processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess)) + processFinal msg = do + trailers <- processTrailers + atomically $ putTMVar (flowMsg st) $ FinalElem msg trailers + return trailers + + processTrailers :: IO (Trailers (Inbound sess)) + processTrailers = do + trailers <- parseInboundTrailers sess =<< getTrailers stream atomically $ putTMVar (flowTerminated st) $ trailers - atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers return trailers + throwParseErrors :: Parser.ProcessResult String b -> IO (Maybe b) + throwParseErrors (Parser.ProcessError err) = + throwIO $ PeerSentMalformedMessage err + throwParseErrors (Parser.ProcessedWithFinal b leftover) = do + unless (BS.Lazy.null leftover) $ throwIO PeerSentIncompleteMessage + return $ Just b + throwParseErrors (Parser.ProcessedWithoutFinal leftover) = do + unless (BS.Lazy.null leftover) $ throwIO PeerSentIncompleteMessage + return $ Nothing + outboundTrailersMaker :: forall sess. IsSession sess => sess