Skip to content

Commit

Permalink
Detect final element
Browse files Browse the repository at this point in the history
Closes #114.
  • Loading branch information
edsko committed Jun 7, 2024
1 parent c39f943 commit 083124d
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 123 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/haskell-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ jobs:
allow-newer: proto-lens:base
allow-newer: serialise:base
source-repository-package
type: git
location: https://github.com/edsko/http2
tag: b90f71fa599b90cd8b71cedbaf5ebe852fbb2888
source-repository-package
type: git
location: https://github.com/edsko/http-semantics
tag: b96d1ae71aa8d56d2dbfcfb89ede82f6ef994e8b
package grapesy
tests: True
benchmarks: True
Expand Down
12 changes: 11 additions & 1 deletion cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ package grapesy
benchmarks: True
flags: +build-demo +build-stress-test +snappy

source-repository-package
type: git
location: https://github.com/edsko/http2
tag: b90f71fa599b90cd8b71cedbaf5ebe852fbb2888

source-repository-package
type: git
location: https://github.com/edsko/http-semantics
tag: b96d1ae71aa8d56d2dbfcfb89ede82f6ef994e8b

--
-- ghc 9.10
--
Expand All @@ -14,4 +24,4 @@ allow-newer: http2:containers
allow-newer: pipes-safe:base
allow-newer: proto-lens-runtime:base
allow-newer: proto-lens:base
allow-newer: serialise:base
allow-newer: serialise:base
12 changes: 11 additions & 1 deletion cabal.project.ci
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ package grapesy
flags: +build-demo +build-stress-test +snappy
ghc-options: -Werror

source-repository-package
type: git
location: https://github.com/edsko/http2
tag: b90f71fa599b90cd8b71cedbaf5ebe852fbb2888

source-repository-package
type: git
location: https://github.com/edsko/http-semantics
tag: b96d1ae71aa8d56d2dbfcfb89ede82f6ef994e8b

--
-- ghc 9.10
--
Expand All @@ -15,4 +25,4 @@ allow-newer: http2:containers
allow-newer: pipes-safe:base
allow-newer: proto-lens-runtime:base
allow-newer: proto-lens:base
allow-newer: serialise:base
allow-newer: serialise:base
4 changes: 2 additions & 2 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ library
, exceptions >= 0.10 && < 0.11
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, http2-tls >= 0.2.11 && < 0.3
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
Expand Down Expand Up @@ -339,7 +339,7 @@ test-suite test-grapesy
, containers >= 0.6 && < 0.8
, exceptions >= 0.10 && < 0.11
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.1 && < 5.3
, http2 >= 5.2.2 && < 5.3
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, proto-lens-runtime >= 0.7 && < 0.8
Expand Down
4 changes: 0 additions & 4 deletions src/Network/GRPC/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS
-- The specialized functions from "Network.GRPC.Client.StreamType" take care of
-- this; if these functions are not applicable, users may wish to use
-- 'recvFinalOutput'.
--
-- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently
-- make this distinction. For detailed discussion, see
-- <https://github.com/well-typed/grapesy/issues/114>.
11 changes: 6 additions & 5 deletions test-grapesy/Test/Driver/Dialogue/Execution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) ->
expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $
receivedMetadata
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
-- On the client side, when the server sends the final message, we
-- will receive that final message in one HTTP data frame, and then
-- the trailers in another. This means that when we get the message,
-- we do not yet know if this is in fact the last.
-- (This is different on the server side, because gRPC does not
-- support trailers on the client side.)
reactToServer tick $ Send (StreamElem a)
reactToServer tick $ Send (NoMoreElems b)
Send expectedElem -> do
Expand Down Expand Up @@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
case action of
Initiate _ ->
error "serverLocal: unexpected ClientInitiateRequest"
Send (FinalElem a b) -> do
-- <https://github.com/well-typed/grapesy/issues/114>
reactToClient tick $ Send (StreamElem a)
reactToClient tick $ Send (NoMoreElems b)
Send expectedElem -> do
mInp <- liftIO $ try $ within timeoutReceive action $
Server.Binary.recvInput call
Expand Down
63 changes: 48 additions & 15 deletions test-grapesy/Test/Prop/IncrementalParsing.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Test.Prop.IncrementalParsing (tests) where

import Control.Monad.Except
import Control.Monad.State
import Control.Monad
import Control.Monad.Except (MonadError, Except, runExcept, throwError)
import Control.Monad.State (MonadState, StateT, runStateT, state)
import Data.ByteString qualified as BS.Strict
import Data.ByteString qualified as Strict (ByteString)
import Data.ByteString.Lazy qualified as BS.Lazy
Expand All @@ -24,10 +25,10 @@ tests = testGroup "Test.Prop.IncrementalParsing" [
testProperty "parser" test_parser
]

test_parser :: Input -> [ChunkSize] -> PhraseSize -> Property
test_parser input splits phraseSize =
test_parser :: MarkLast -> Input -> [ChunkSize] -> PhraseSize -> Property
test_parser markLast input splits phraseSize =
counterexample ("chunks: " ++ show chunks)
$ case processAll chunks processPhrase (parsePhrase phraseSize) of
$ case processAll markLast chunks phraseSize of
Left err ->
counterexample ("Unexpected failure " ++ show err) $ False
Right unconsumed ->
Expand Down Expand Up @@ -64,29 +65,33 @@ runPure chunks =
. flip runStateT chunks
. unwrapPure

getChunk :: Pure Strict.ByteString
getChunk =
getChunk :: MarkLast -> Pure (Strict.ByteString, Bool)
getChunk (MarkLast markLast) =
state $ \case
[] -> (BS.Strict.empty, [])
c:cs -> (c, cs)
[] -> ((BS.Strict.empty, True), [])
[c] -> ((c, markLast), [])
c:cs -> ((c, False), cs)

processAll ::
[Strict.ByteString]
-> (a -> Pure ())
-> Parser String a
MarkLast
-> [Strict.ByteString]
-> PhraseSize
-> Either String Lazy.ByteString
processAll chunks processOne p =
processAll markLast chunks phraseSize =
runPure chunks aux >>= verifyAllChunksConsumed
where
p :: Parser String [Word8]
p = parsePhrase phraseSize

-- 'processAll' does not assume that the monad @m@ in which it is executed
-- has any way of reporting errors: if there is a parse failure during
-- execution, this failure is returned as a value. For the specific case of
-- 'Pure', however, we /can/ throw errors in @m@ (to allow 'processOne' to
-- throw errors), so we can reuse that also for any parse failures.
aux :: Pure Lazy.ByteString
aux =
Parser.processAll getChunk processOne p
>>= either throwError return
Parser.processAll (getChunk markLast) processPhrase processPhrase p
>>= throwParseErrors

-- 'processAll' should run until all chunks are used
verifyAllChunksConsumed ::
Expand All @@ -99,6 +104,28 @@ processAll chunks processOne p =
| otherwise
= Left "not all chunks consumed"

throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString
throwParseErrors (Parser.ProcessError err) =
throwError err
throwParseErrors (Parser.ProcessedWithFinal () bs) = do
unless canMarkFinal $ throwError "Unexpected ProcessedWithFinal"
return bs
throwParseErrors (Parser.ProcessedWithoutFinal bs) = do
when canMarkFinal $ throwError "Unexpected ProcessedWithoutFinal"
return bs

-- We can mark the final phrase as final if the final chunk is marked as
-- final, and when we get that chunk, it contains at least one phrase.
canMarkFinal :: Bool
canMarkFinal = and [
getMarkLast markLast
, case reverse chunks of
[] -> False
c:cs -> let left = sum (map BS.Strict.length cs)
`mod` getPhraseSize phraseSize
in (left + BS.Strict.length c) >= getPhraseSize phraseSize
]

{-------------------------------------------------------------------------------
Test input
Expand All @@ -111,6 +138,8 @@ processAll chunks processOne p =
```
* We split this input into non-empty chunks of varying sizes @[ChunkSize]@.
We sometimes mark the last chunk as being the last, and sometimes don't
(see <https://github.com/well-typed/grapesy/issues/114>).
* We then choose a non-zero 'PhraseSize' @n@. The idea is that the parser
splits the input into phrases of @n@ bytes
Expand All @@ -134,6 +163,7 @@ processAll chunks processOne p =
(in 'processAll') that all input chunks are fed to the parser.
-------------------------------------------------------------------------------}

newtype MarkLast = MarkLast { getMarkLast :: Bool } deriving (Show)
newtype Input = Input { getInputBytes :: [Word8] } deriving (Show)
newtype ChunkSize = ChunkSize { getChunkSize :: Int } deriving (Show)
newtype PhraseSize = PhraseSize { getPhraseSize :: Int } deriving (Show)
Expand Down Expand Up @@ -179,6 +209,8 @@ processPhrase phrase =
Arbitrary instances
-------------------------------------------------------------------------------}

deriving newtype instance Arbitrary MarkLast

instance Arbitrary Input where
arbitrary = sized $ \n -> do
len <- choose (0, n * 100)
Expand All @@ -187,3 +219,4 @@ instance Arbitrary Input where

deriving via Positive Int instance Arbitrary ChunkSize
deriving via Positive Int instance Arbitrary PhraseSize

34 changes: 33 additions & 1 deletion test-grapesy/Test/Sanity/EndOfStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [
, testCase "recvTrailers" test_recvTrailers
]
, testGroup "server" [
testCase "recvEndOfInput" test_recvEndOfInput
testCase "recvInput" test_recvInput
, testCase "recvEndOfInput" test_recvEndOfInput
]
]

Expand Down Expand Up @@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $
Server tests
-------------------------------------------------------------------------------}

-- | Test that the final element is marked as 'FinalElem'
--
-- Verifies that <https://github.com/well-typed/grapesy/issues/114> is solved.
--
-- NOTE: There is no client equivalent for this test. On the client side, the
-- server will send trailers, and so /cannot/ make the final data frame as
-- end-of-stream.
test_recvInput :: Assertion
test_recvInput = testClientServer $ ClientServerTest {
config = def
, server = [Server.someRpcHandler handler]
, client = simpleTestClient $ \conn ->
Client.withRPC conn def (Proxy @Trivial) $ \call -> do
Client.sendFinalInput call BS.Lazy.empty
_resp <- Client.recvFinalOutput call
return ()
}
where
handler :: Server.RpcHandler IO Trivial
handler = Server.mkRpcHandler $ \call -> do
x <- Server.recvInput call

-- The purpose of this test:
case x of
FinalElem{} ->
return ()
_otherwise ->
assertFailure "Expected FinalElem"

Server.sendFinalOutput call (mempty, NoMetadata)

-- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the
-- previous 'recvNextInput' /happened/ to give us the final input.
--
Expand Down
3 changes: 2 additions & 1 deletion test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ test_calculator_cbor = do
biDiStreamingSumHandler = Server.streamingRpcHandler $
Server.mkBiDiStreaming $ \recv send ->
let
go :: Int -> IO ()
go acc =
recv >>= \case
NoMoreElems _ -> return ()
FinalElem n _ -> send (acc + n) >> go (acc + n)
FinalElem n _ -> send (acc + n)
StreamElem n -> send (acc + n) >> go (acc + n)
in
go 0
Loading

0 comments on commit 083124d

Please sign in to comment.