Skip to content

Commit

Permalink
Detect final element
Browse files Browse the repository at this point in the history
Closes #114.
  • Loading branch information
edsko committed May 29, 2024
1 parent 187fb46 commit 10eb799
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 84 deletions.
28 changes: 6 additions & 22 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#
# For more information, see https://github.com/haskell-CI/haskell-ci
#
# version: 0.19.20240501
# version: 0.19.20240514
#
# REGENDATA ("0.19.20240501",["github","cabal.project.ci"])
# REGENDATA ("0.19.20240514",["github","cabal.project.ci"])
#
name: Haskell-CI
on:
Expand All @@ -28,9 +28,9 @@ jobs:
strategy:
matrix:
include:
- compiler: ghc-9.10.0.20240426
- compiler: ghc-9.10.1
compilerKind: ghc
compilerVersion: 9.10.0.20240426
compilerVersion: 9.10.1
setup-method: ghcup
allow-failure: false
- compiler: ghc-9.8.2
Expand Down Expand Up @@ -67,7 +67,6 @@ jobs:
mkdir -p "$HOME/.ghcup/bin"
curl -sL https://downloads.haskell.org/ghcup/0.1.20.0/x86_64-linux-ghcup-0.1.20.0 > "$HOME/.ghcup/bin/ghcup"
chmod a+x "$HOME/.ghcup/bin/ghcup"
"$HOME/.ghcup/bin/ghcup" config add-release-channel https://raw.githubusercontent.com/haskell/ghcup-metadata/master/ghcup-prereleases-0.0.8.yaml;
"$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" || (cat "$HOME"/.ghcup/logs/*.* && false)
"$HOME/.ghcup/bin/ghcup" install cabal 3.10.2.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
apt-get update
Expand All @@ -94,7 +93,7 @@ jobs:
echo "HCNUMVER=$HCNUMVER" >> "$GITHUB_ENV"
echo "ARG_TESTS=--enable-tests" >> "$GITHUB_ENV"
echo "ARG_BENCH=--enable-benchmarks" >> "$GITHUB_ENV"
if [ $((HCNUMVER >= 91000)) -ne 0 ] ; then echo "HEADHACKAGE=true" >> "$GITHUB_ENV" ; else echo "HEADHACKAGE=false" >> "$GITHUB_ENV" ; fi
echo "HEADHACKAGE=false" >> "$GITHUB_ENV"
echo "ARG_COMPILER=--$HCKIND --with-compiler=$HC" >> "$GITHUB_ENV"
echo "GHCJSARITH=0" >> "$GITHUB_ENV"
env:
Expand Down Expand Up @@ -123,18 +122,6 @@ jobs:
repository hackage.haskell.org
url: http://hackage.haskell.org/
EOF
if $HEADHACKAGE; then
cat >> $CABAL_CONFIG <<EOF
repository head.hackage.ghc.haskell.org
url: https://ghc.gitlab.haskell.org/head.hackage/
secure: True
root-keys: 7541f32a4ccca4f97aea3b22f5e593ba2c0267546016b992dfadcd2fe944e55d
26021a13b401500c8eb2761ca95c61f2d625bfef951b939a8124ed12ecf07329
f76d08be13e9a61a377a85e2fb63f4c5435d40f8feb3e12eb05905edb8cdea89
key-threshold: 3
active-repositories: hackage.haskell.org, head.hackage.ghc.haskell.org:override
EOF
fi
cat >> $CABAL_CONFIG <<EOF
program-default-options
ghc-options: $GHCJOBS +RTS -M3G -RTS
Expand Down Expand Up @@ -191,10 +178,7 @@ jobs:
flags: +build-demo +build-stress-test +snappy
ghc-options: -Werror
EOF
if $HEADHACKAGE; then
echo "allow-newer: $($HCPKG list --simple-output | sed -E 's/([a-zA-Z-]+)-[0-9.]+/*:\1,/g')" >> cabal.project
fi
$HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local
$HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: any.$_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local
cat cabal.project
cat cabal.project.local
- name: dump install plan
Expand Down
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, http2-tls >= 0.2.11 && < 0.3
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -305,7 +305,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 0 additions & 4 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS
-- The specialized functions from "Network.GRPC.Client.StreamType" take care of
-- this; if these functions are not applicable, users may wish to use
-- 'recvFinalOutput'.
--
-- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently
-- make this distinction. For detailed discussion, see
-- <https://github.com/well-typed/grapesy/issues/114>.
11 changes: 6 additions & 5 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) ->
expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $
receivedMetadata
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
-- On the client side, when the server sends the final message, we
-- will receive that final message in one HTTP data frame, and then
-- the trailers in another. This means that when we get the message,
-- we do not yet know if this is in fact the last.
-- (This is different on the server side, because gRPC does not
-- support trailers on the client side.)
reactToServer tick $ Send (StreamElem a)
reactToServer tick $ Send (NoMoreElems b)
Send expectedElem -> do
Expand Down Expand Up @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
case action of
Initiate _ ->
error "serverLocal: unexpected ClientInitiateRequest"
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
reactToClient tick $ Send (StreamElem a)
reactToClient tick $ Send (NoMoreElems b)
Send expectedElem -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
Expand Down
20 changes: 15 additions & 5 deletions test-grapesy/Test/Prop/IncrementalParsing.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ runPure chunks =
. flip runStateT chunks
. unwrapPure

getChunk :: Pure Strict.ByteString
getChunk :: Pure (Strict.ByteString, Bool)
getChunk =
state $ \case
[] -> (BS.Strict.empty, [])
c:cs -> (c, cs)
[] -> ((BS.Strict.empty, True), [])
[c] -> ((c, True), [])
c:cs -> ((c, False), cs)

processAll ::
[Strict.ByteString]
Expand All @@ -85,8 +86,8 @@ processAll chunks processOne p =
-- throw errors), so we can reuse that also for any parse failures.
aux :: Pure Lazy.ByteString
aux =
Parser.processAll getChunk processOne p
>>= either throwError return
Parser.processAll getChunk processOne processOne p
>>= throwParseErrors

-- 'processAll' should run until all chunks are used
verifyAllChunksConsumed ::
Expand All @@ -99,6 +100,15 @@ processAll chunks processOne p =
| otherwise
= Left "not all chunks consumed"

-- TODO: Verify 'ProcessedFinal'/'ProcessedWithoutFinal'
throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString
throwParseErrors (Parser.ProcessError err) =
throwError err
throwParseErrors (Parser.ProcessedWithFinal () bs) =
return bs
throwParseErrors (Parser.ProcessedWithoutFinal bs) =
return bs

{-------------------------------------------------------------------------------
Test input
Expand Down
34 changes: 33 additions & 1 deletion test-grapesy/Test/Sanity/EndOfStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [
, testCase "recvTrailers" test_recvTrailers
]
, testGroup "server" [
testCase "recvEndOfInput" test_recvEndOfInput
testCase "recvInput" test_recvInput
, testCase "recvEndOfInput" test_recvEndOfInput
]
]

Expand Down Expand Up @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $
Server tests
-------------------------------------------------------------------------------}

-- | Test that the final element is marked as 'FinalElem'
--
-- Verifies that <https://github.com/well-typed/grapesy/issues/114> is solved.
--
-- NOTE: There is no client equivalent for this test. On the client side, the
-- server will send trailers, and so /cannot/ make the final data frame as
-- end-of-stream.
test_recvInput :: Assertion
test_recvInput = testClientServer $ ClientServerTest {
config = def
, server = [Server.someRpcHandler handler]
, client = simpleTestClient $ \conn ->
Client.withRPC conn def (Proxy @Trivial) $ \call -> do
Client.sendFinalInput call BS.Lazy.empty
_resp <- Client.recvFinalOutput call
return ()
}
where
handler :: Server.RpcHandler IO Trivial
handler = Server.mkRpcHandler $ \call -> do
x <- Server.recvInput call

-- The purpose of this test:
case x of
FinalElem{} ->
return ()
_otherwise ->
assertFailure "Expected FinalElem"

Server.sendFinalOutput call (mempty, NoMetadata)

-- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the
-- previous 'recvNextInput' /happened/ to give us the final input.
--
Expand Down
3 changes: 2 additions & 1 deletion test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ test_calculator_cbor = do
biDiStreamingSumHandler = Server.streamingRpcHandler $
Server.mkBiDiStreaming $ \recv send ->
let
go :: Int -> IO ()
go acc =
recv >>= \case
NoMoreElems _ -> return ()
FinalElem n _ -> send (acc + n) >> go (acc + n)
FinalElem n _ -> send (acc + n)
StreamElem n -> send (acc + n) >> go (acc + n)
in
go 0
8 changes: 4 additions & 4 deletions util/Network/GRPC/Util/HTTP2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ data OutputStream = OutputStream {
}

data InputStream = InputStream {
_getChunk :: HasCallStack => IO Strict.ByteString
_getChunk :: HasCallStack => IO (Strict.ByteString, Bool)
, _getTrailers :: HasCallStack => IO [HTTP.Header]
}

Expand All @@ -54,7 +54,7 @@ writeChunk = _writeChunk
flush :: HasCallStack => OutputStream -> IO ()
flush = _flush

getChunk :: HasCallStack => InputStream -> IO Strict.ByteString
getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool)
getChunk = _getChunk

getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header]
Expand All @@ -68,7 +68,7 @@ serverInputStream :: Server.Request -> IO InputStream
serverInputStream req = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ClientDisconnected $
Server.getRequestBodyChunk req
Server.getRequestBodyChunk' req
, _getTrailers = wrapStreamExceptionsWith ClientDisconnected $
maybe [] fromHeaderTable <$>
Server.getRequestTrailers req
Expand Down Expand Up @@ -132,7 +132,7 @@ clientInputStream :: Client.Response -> IO InputStream
clientInputStream resp = do
return InputStream {
_getChunk = wrapStreamExceptionsWith ServerDisconnected $
Client.getResponseBodyChunk resp
Client.getResponseBodyChunk' resp
, _getTrailers = wrapStreamExceptionsWith ServerDisconnected $
maybe [] fromHeaderTable <$>
Client.getResponseTrailers resp
Expand Down
94 changes: 68 additions & 26 deletions util/Network/GRPC/Util/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ module Network.GRPC.Util.Parser (
, consumeExactly
, getExactly
-- * Execution
, IsFinal
, Leftover
, ProcessResult(..)
, processAll
, processAllIO
) where

import Control.Exception
import Control.Monad
import Data.Bifunctor
import Data.Binary (Get)
Expand Down Expand Up @@ -158,33 +159,74 @@ getExactly len get =
Execution
-------------------------------------------------------------------------------}

type IsFinal = Bool
type Leftover = Lazy.ByteString

data ProcessResult e b =
-- | Parse error during processing
ProcessError e

-- | Parsing succeeded (compare to 'ProcessedWithoutFinal')
| ProcessedWithFinal b Leftover

-- | Parsing succeeded, but we did not recognize the final message on time
--
-- There are two ways that parsing can terminate: the final few chunks may
-- look like this:
--
-- > chunk1 -- not marked final
-- > chunk2 -- not marked final
-- > chunk3 -- marked final
--
-- or like this:
--
-- > chunk1 -- not marked final
-- > chunk2 -- not marked final
-- > chunk3 -- not marked final
-- > empty chunk -- marked final
--
-- In the former case, we know that we are processing the final message /as/
-- we are processing it ('ProcessedFinal'); in the latter case, we realize
-- this only after we receive the final empty chunk.
| ProcessedWithoutFinal Leftover

-- | Process all incoming data
--
-- Returns any unprocessed data.
processAll :: forall m e a.
-- Also returns if we knew that the final result
-- was in fact the final result when we received it (this may or may not be the
-- case, depending on
processAll :: forall m e a b.
Monad m
=> m Strict.ByteString -- ^ Get next chunk (empty indicates end of input)
-> (a -> m ()) -- ^ Process single value
-> Parser e a -- ^ Parser
-> m (Either e Lazy.ByteString)
processAll getChunk processOne parser =
=> m (Strict.ByteString, IsFinal) -- ^ Get next chunk
-> (a -> m ()) -- ^ Process value
-> (a -> m b) -- ^ Process final value
-> Parser e a -- ^ Parser
-> m (ProcessResult e b)
processAll getChunk processOne processFinal parser =
go $ runParser parser nil
where
go :: Result e a -> m (Either e Lazy.ByteString)
go (Failed err) = return $ Left err
go (Done result bs') = processOne result >> go (runParser parser bs')
go (NeedData parser' acc) = do
bs <- getChunk
if not (BS.Strict.null bs)
then go $ runParser parser' (snoc acc bs)
else return $ Right (toLazy acc)

-- | Wrapper around 'processAll' that throws errors as exceptions
processAllIO :: forall e a.
Exception e
=> IO Strict.ByteString
-> (a -> IO ())
-> Parser e a
-> IO Lazy.ByteString
processAllIO getChunk processOne parser =
processAll getChunk processOne parser >>= either throwIO return
go :: Result e a -> m (ProcessResult e b)
go (Failed err) = return $ ProcessError err
go (Done a left) = processOne a >> go (runParser parser left)
go (NeedData parser' left) = do
(bs, isFinal) <- getChunk
if not isFinal
then go $ runParser parser' (left `snoc` bs)
else goFinal [] $ runParser parser' (left `snoc` bs)

-- We have received the final chunk; extract all messages until we are done
goFinal :: [a] -> Result e a -> m (ProcessResult e b)
goFinal _ (Failed err) = return $ ProcessError err
goFinal acc (Done a left) = goFinal (a:acc) $ runParser parser left
goFinal acc (NeedData _ left) = do
mb <- processLastFew (reverse acc)
return $ case mb of
Just b -> ProcessedWithFinal b $ toLazy left
Nothing -> ProcessedWithoutFinal $ toLazy left

processLastFew :: [a] -> m (Maybe b)
processLastFew [] = return Nothing
processLastFew [a] = Just <$> processFinal a
processLastFew (a:as) = processOne a >> processLastFew as

Loading

0 comments on commit 10eb799

Please sign in to comment.