diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 35eee7f7..62079c87 100644 --- a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -8,9 +8,9 @@ # # For more information, see https://github.com/haskell-CI/haskell-ci # -# version: 0.19.20240501 +# version: 0.19.20240514 # -# REGENDATA ("0.19.20240501",["github","cabal.project.ci"]) +# REGENDATA ("0.19.20240514",["github","cabal.project.ci"]) # name: Haskell-CI on: @@ -28,9 +28,9 @@ jobs: strategy: matrix: include: - - compiler: ghc-9.10.0.20240426 + - compiler: ghc-9.10.1 compilerKind: ghc - compilerVersion: 9.10.0.20240426 + compilerVersion: 9.10.1 setup-method: ghcup allow-failure: false - compiler: ghc-9.8.2 @@ -67,7 +67,6 @@ jobs: mkdir -p "$HOME/.ghcup/bin" curl -sL https://downloads.haskell.org/ghcup/0.1.20.0/x86_64-linux-ghcup-0.1.20.0 > "$HOME/.ghcup/bin/ghcup" chmod a+x "$HOME/.ghcup/bin/ghcup" - "$HOME/.ghcup/bin/ghcup" config add-release-channel https://raw.githubusercontent.com/haskell/ghcup-metadata/master/ghcup-prereleases-0.0.8.yaml; "$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" || (cat "$HOME"/.ghcup/logs/*.* && false) "$HOME/.ghcup/bin/ghcup" install cabal 3.10.2.0 || (cat "$HOME"/.ghcup/logs/*.* && false) apt-get update @@ -94,7 +93,7 @@ jobs: echo "HCNUMVER=$HCNUMVER" >> "$GITHUB_ENV" echo "ARG_TESTS=--enable-tests" >> "$GITHUB_ENV" echo "ARG_BENCH=--enable-benchmarks" >> "$GITHUB_ENV" - if [ $((HCNUMVER >= 91000)) -ne 0 ] ; then echo "HEADHACKAGE=true" >> "$GITHUB_ENV" ; else echo "HEADHACKAGE=false" >> "$GITHUB_ENV" ; fi + echo "HEADHACKAGE=false" >> "$GITHUB_ENV" echo "ARG_COMPILER=--$HCKIND --with-compiler=$HC" >> "$GITHUB_ENV" echo "GHCJSARITH=0" >> "$GITHUB_ENV" env: @@ -123,18 +122,6 @@ jobs: repository hackage.haskell.org url: http://hackage.haskell.org/ EOF - if $HEADHACKAGE; then - cat >> $CABAL_CONFIG <> $CABAL_CONFIG <> cabal.project - fi - $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local + $HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: any.$_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local cat cabal.project cat cabal.project.local - name: dump install plan diff --git a/grapesy.cabal b/grapesy.cabal index e9730210..eddd5b30 100644 --- a/grapesy.cabal +++ b/grapesy.cabal @@ -194,7 +194,7 @@ library , exceptions >= 0.10 && < 0.11 , hashable >= 1.3 && < 1.5 , http-types >= 0.12 && < 0.13 - , http2 >= 5.2.1 && < 5.3 + , http2 >= 5.2.2 && < 5.3 , http2-tls >= 0.2.11 && < 0.3 , lens >= 5.0 && < 5.4 , mtl >= 2.2 && < 2.4 @@ -305,7 +305,7 @@ test-suite test-grapesy , containers >= 0.6 && < 0.8 , exceptions >= 0.10 && < 0.11 , http-types >= 0.12 && < 0.13 - , http2 >= 5.2.1 && < 5.3 + , http2 >= 5.2.2 && < 5.3 , mtl >= 2.2 && < 2.4 , network >= 3.1 && < 3.3 , proto-lens-runtime >= 0.7 && < 0.8 diff --git a/src/Network/GRPC/Client.hs b/src/Network/GRPC/Client.hs index 8352a4ab..05bd9a3d 100644 --- a/src/Network/GRPC/Client.hs +++ b/src/Network/GRPC/Client.hs @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS -- The specialized functions from "Network.GRPC.Client.StreamType" take care of -- this; if these functions are not applicable, users may wish to use -- 'recvFinalOutput'. --- --- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently --- make this distinction. For detailed discussion, see --- . \ No newline at end of file diff --git a/test-grapesy/Test/Driver/Dialogue/Execution.hs b/test-grapesy/Test/Driver/Dialogue/Execution.hs index 988e3b4f..cbd27b70 100644 --- a/test-grapesy/Test/Driver/Dialogue/Execution.hs +++ b/test-grapesy/Test/Driver/Dialogue/Execution.hs @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) -> expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $ receivedMetadata Send (FinalElem a b) -> do - -- + -- On the client side, when the server sends the final message, we + -- will receive that final message in one HTTP data frame, and then + -- the trailers in another. This means that when we get the message, + -- we do not yet know if this is in fact the last. + -- (This is different on the server side, because gRPC does not + -- support trailers on the client side.) reactToServer tick $ Send (StreamElem a) reactToServer tick $ Send (NoMoreElems b) Send expectedElem -> do @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do case action of Initiate _ -> error "serverLocal: unexpected ClientInitiateRequest" - Send (FinalElem a b) -> do - -- - reactToClient tick $ Send (StreamElem a) - reactToClient tick $ Send (NoMoreElems b) Send expectedElem -> do mInp <- liftIO $ try $ within timeoutReceive action $ Server.Binary.recvInput call diff --git a/test-grapesy/Test/Prop/IncrementalParsing.hs b/test-grapesy/Test/Prop/IncrementalParsing.hs index 56cf7850..219ccd5a 100644 --- a/test-grapesy/Test/Prop/IncrementalParsing.hs +++ b/test-grapesy/Test/Prop/IncrementalParsing.hs @@ -64,11 +64,12 @@ runPure chunks = . flip runStateT chunks . unwrapPure -getChunk :: Pure Strict.ByteString +getChunk :: Pure (Strict.ByteString, Bool) getChunk = state $ \case - [] -> (BS.Strict.empty, []) - c:cs -> (c, cs) + [] -> ((BS.Strict.empty, True), []) + [c] -> ((c, True), []) + c:cs -> ((c, False), cs) processAll :: [Strict.ByteString] @@ -85,8 +86,8 @@ processAll chunks processOne p = -- throw errors), so we can reuse that also for any parse failures. aux :: Pure Lazy.ByteString aux = - Parser.processAll getChunk processOne p - >>= either throwError return + Parser.processAll getChunk processOne processOne p + >>= throwParseErrors -- 'processAll' should run until all chunks are used verifyAllChunksConsumed :: @@ -99,6 +100,15 @@ processAll chunks processOne p = | otherwise = Left "not all chunks consumed" + -- TODO: Verify 'ProcessedFinal'/'ProcessedWithoutFinal' + throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString + throwParseErrors (Parser.ProcessError err) = + throwError err + throwParseErrors (Parser.ProcessedWithFinal () bs) = + return bs + throwParseErrors (Parser.ProcessedWithoutFinal bs) = + return bs + {------------------------------------------------------------------------------- Test input diff --git a/test-grapesy/Test/Sanity/EndOfStream.hs b/test-grapesy/Test/Sanity/EndOfStream.hs index 47760253..dd956729 100644 --- a/test-grapesy/Test/Sanity/EndOfStream.hs +++ b/test-grapesy/Test/Sanity/EndOfStream.hs @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [ , testCase "recvTrailers" test_recvTrailers ] , testGroup "server" [ - testCase "recvEndOfInput" test_recvEndOfInput + testCase "recvInput" test_recvInput + , testCase "recvEndOfInput" test_recvEndOfInput ] ] @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $ Server tests -------------------------------------------------------------------------------} +-- | Test that the final element is marked as 'FinalElem' +-- +-- Verifies that is solved. +-- +-- NOTE: There is no client equivalent for this test. On the client side, the +-- server will send trailers, and so /cannot/ make the final data frame as +-- end-of-stream. +test_recvInput :: Assertion +test_recvInput = testClientServer $ ClientServerTest { + config = def + , server = [Server.someRpcHandler handler] + , client = simpleTestClient $ \conn -> + Client.withRPC conn def (Proxy @Trivial) $ \call -> do + Client.sendFinalInput call BS.Lazy.empty + _resp <- Client.recvFinalOutput call + return () + } + where + handler :: Server.RpcHandler IO Trivial + handler = Server.mkRpcHandler $ \call -> do + x <- Server.recvInput call + + -- The purpose of this test: + case x of + FinalElem{} -> + return () + _otherwise -> + assertFailure "Expected FinalElem" + + Server.sendFinalOutput call (mempty, NoMetadata) + -- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the -- previous 'recvNextInput' /happened/ to give us the final input. -- diff --git a/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs b/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs index 931c9d4e..180e95d8 100644 --- a/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs +++ b/test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs @@ -232,10 +232,11 @@ test_calculator_cbor = do biDiStreamingSumHandler = Server.streamingRpcHandler $ Server.mkBiDiStreaming $ \recv send -> let + go :: Int -> IO () go acc = recv >>= \case NoMoreElems _ -> return () - FinalElem n _ -> send (acc + n) >> go (acc + n) + FinalElem n _ -> send (acc + n) StreamElem n -> send (acc + n) >> go (acc + n) in go 0 diff --git a/util/Network/GRPC/Util/HTTP2/Stream.hs b/util/Network/GRPC/Util/HTTP2/Stream.hs index 5d021442..259180ca 100644 --- a/util/Network/GRPC/Util/HTTP2/Stream.hs +++ b/util/Network/GRPC/Util/HTTP2/Stream.hs @@ -40,7 +40,7 @@ data OutputStream = OutputStream { } data InputStream = InputStream { - _getChunk :: HasCallStack => IO Strict.ByteString + _getChunk :: HasCallStack => IO (Strict.ByteString, Bool) , _getTrailers :: HasCallStack => IO [HTTP.Header] } @@ -54,7 +54,7 @@ writeChunk = _writeChunk flush :: HasCallStack => OutputStream -> IO () flush = _flush -getChunk :: HasCallStack => InputStream -> IO Strict.ByteString +getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool) getChunk = _getChunk getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header] @@ -68,7 +68,7 @@ serverInputStream :: Server.Request -> IO InputStream serverInputStream req = do return InputStream { _getChunk = wrapStreamExceptionsWith ClientDisconnected $ - Server.getRequestBodyChunk req + Server.getRequestBodyChunk' req , _getTrailers = wrapStreamExceptionsWith ClientDisconnected $ maybe [] fromHeaderTable <$> Server.getRequestTrailers req @@ -132,7 +132,7 @@ clientInputStream :: Client.Response -> IO InputStream clientInputStream resp = do return InputStream { _getChunk = wrapStreamExceptionsWith ServerDisconnected $ - Client.getResponseBodyChunk resp + Client.getResponseBodyChunk' resp , _getTrailers = wrapStreamExceptionsWith ServerDisconnected $ maybe [] fromHeaderTable <$> Client.getResponseTrailers resp diff --git a/util/Network/GRPC/Util/Parser.hs b/util/Network/GRPC/Util/Parser.hs index 89c426bf..60ac1746 100644 --- a/util/Network/GRPC/Util/Parser.hs +++ b/util/Network/GRPC/Util/Parser.hs @@ -10,11 +10,12 @@ module Network.GRPC.Util.Parser ( , consumeExactly , getExactly -- * Execution + , IsFinal + , Leftover + , ProcessResult(..) , processAll - , processAllIO ) where -import Control.Exception import Control.Monad import Data.Bifunctor import Data.Binary (Get) @@ -158,33 +159,74 @@ getExactly len get = Execution -------------------------------------------------------------------------------} +type IsFinal = Bool +type Leftover = Lazy.ByteString + +data ProcessResult e b = + -- | Parse error during processing + ProcessError e + + -- | Parsing succeeded (compare to 'ProcessedWithoutFinal') + | ProcessedWithFinal b Leftover + + -- | Parsing succeeded, but we did not recognize the final message on time + -- + -- There are two ways that parsing can terminate: the final few chunks may + -- look like this: + -- + -- > chunk1 -- not marked final + -- > chunk2 -- not marked final + -- > chunk3 -- marked final + -- + -- or like this: + -- + -- > chunk1 -- not marked final + -- > chunk2 -- not marked final + -- > chunk3 -- not marked final + -- > empty chunk -- marked final + -- + -- In the former case, we know that we are processing the final message /as/ + -- we are processing it ('ProcessedFinal'); in the latter case, we realize + -- this only after we receive the final empty chunk. + | ProcessedWithoutFinal Leftover + -- | Process all incoming data -- -- Returns any unprocessed data. -processAll :: forall m e a. +-- Also returns if we knew that the final result +-- was in fact the final result when we received it (this may or may not be the +-- case, depending on +processAll :: forall m e a b. Monad m - => m Strict.ByteString -- ^ Get next chunk (empty indicates end of input) - -> (a -> m ()) -- ^ Process single value - -> Parser e a -- ^ Parser - -> m (Either e Lazy.ByteString) -processAll getChunk processOne parser = + => m (Strict.ByteString, IsFinal) -- ^ Get next chunk + -> (a -> m ()) -- ^ Process value + -> (a -> m b) -- ^ Process final value + -> Parser e a -- ^ Parser + -> m (ProcessResult e b) +processAll getChunk processOne processFinal parser = go $ runParser parser nil where - go :: Result e a -> m (Either e Lazy.ByteString) - go (Failed err) = return $ Left err - go (Done result bs') = processOne result >> go (runParser parser bs') - go (NeedData parser' acc) = do - bs <- getChunk - if not (BS.Strict.null bs) - then go $ runParser parser' (snoc acc bs) - else return $ Right (toLazy acc) - --- | Wrapper around 'processAll' that throws errors as exceptions -processAllIO :: forall e a. - Exception e - => IO Strict.ByteString - -> (a -> IO ()) - -> Parser e a - -> IO Lazy.ByteString -processAllIO getChunk processOne parser = - processAll getChunk processOne parser >>= either throwIO return + go :: Result e a -> m (ProcessResult e b) + go (Failed err) = return $ ProcessError err + go (Done a left) = processOne a >> go (runParser parser left) + go (NeedData parser' left) = do + (bs, isFinal) <- getChunk + if not isFinal + then go $ runParser parser' (left `snoc` bs) + else goFinal [] $ runParser parser' (left `snoc` bs) + + -- We have received the final chunk; extract all messages until we are done + goFinal :: [a] -> Result e a -> m (ProcessResult e b) + goFinal _ (Failed err) = return $ ProcessError err + goFinal acc (Done a left) = goFinal (a:acc) $ runParser parser left + goFinal acc (NeedData _ left) = do + mb <- processLastFew (reverse acc) + return $ case mb of + Just b -> ProcessedWithFinal b $ toLazy left + Nothing -> ProcessedWithoutFinal $ toLazy left + + processLastFew :: [a] -> m (Maybe b) + processLastFew [] = return Nothing + processLastFew [a] = Just <$> processFinal a + processLastFew (a:as) = processOne a >> processLastFew as + diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 30d3c1f0..741900de 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -272,10 +272,6 @@ send Channel{channelOutbound, channelSentFinal} msg = -- sending the HTTP trailers in the same frame, then we will return the message -- and the trailers together. It is a bug to call 'recvBoth' again after this; -- doing so will result in a 'RecvAfterFinal' exception. --- --- TODO: --- Although we provide this API, even /if/ the sender marks the message as --- final when they send it, we currently cannot propagate this. recvBoth :: forall sess. HasCallStack => Channel sess @@ -564,9 +560,6 @@ sendMessageLoop sess st stream = do return trailers -- | Receive all messages sent by the node's peer --- --- TODO: . --- We are never marking the final element as final. recvMessageLoop :: forall sess. IsSession sess => sess @@ -578,18 +571,45 @@ recvMessageLoop sess st stream = where go :: Parser String (Message (Inbound sess)) -> IO (Trailers (Inbound sess)) go parser = do - leftover <- Parser.processAllIO + mProcessedFinal <- throwParseErrors =<< Parser.processAll (getChunk stream) - (atomically . putTMVar (flowMsg st) . StreamElem) - (first PeerSentMalformedMessage parser) - unless (BS.Lazy.null leftover) $ - throwIO PeerSentIncompleteMessage + processOne + processFinal + parser + case mProcessedFinal of + Just trailers -> + return trailers + Nothing -> do + trailers <- processTrailers + atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers + return trailers - trailers <- parseInboundTrailers sess =<< getTrailers stream + processOne :: Message (Inbound sess) -> IO () + processOne msg = do + atomically $ putTMVar (flowMsg st) $ StreamElem msg + + processFinal :: Message (Inbound sess) -> IO (Trailers (Inbound sess)) + processFinal msg = do + trailers <- processTrailers + atomically $ putTMVar (flowMsg st) $ FinalElem msg trailers + return trailers + + processTrailers :: IO (Trailers (Inbound sess)) + processTrailers = do + trailers <- parseInboundTrailers sess =<< getTrailers stream atomically $ putTMVar (flowTerminated st) $ trailers - atomically $ putTMVar (flowMsg st) $ NoMoreElems trailers return trailers + throwParseErrors :: Parser.ProcessResult String b -> IO (Maybe b) + throwParseErrors (Parser.ProcessError err) = + throwIO $ PeerSentMalformedMessage err + throwParseErrors (Parser.ProcessedWithFinal b leftover) = do + unless (BS.Lazy.null leftover) $ throwIO PeerSentIncompleteMessage + return $ Just b + throwParseErrors (Parser.ProcessedWithoutFinal leftover) = do + unless (BS.Lazy.null leftover) $ throwIO PeerSentIncompleteMessage + return $ Nothing + outboundTrailersMaker :: forall sess. IsSession sess => sess