From 931d6b10e67aa5e45a33829ff459b3a0e62b6a66 Mon Sep 17 00:00:00 2001 From: Finley McIlwaine Date: Thu, 11 Jul 2024 09:58:36 -0700 Subject: [PATCH] Fix `outboundTrailersMaker` In current `http2`, `respond` returns when the body is sent and before the trailers are constructed. In the new `http2` architecture, `respond` does not return until *after* trailers are constructed, which deadlocks `grapesy`. --- util/Network/GRPC/Util/Session/Channel.hs | 22 ++++++++++------------ util/Network/GRPC/Util/Session/Server.hs | 7 ++++--- util/Network/GRPC/Util/Thread.hs | 10 ++++++++++ 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/util/Network/GRPC/Util/Session/Channel.hs b/util/Network/GRPC/Util/Session/Channel.hs index 2d4b018a..9b1dd0be 100644 --- a/util/Network/GRPC/Util/Session/Channel.hs +++ b/util/Network/GRPC/Util/Session/Channel.hs @@ -611,20 +611,18 @@ outboundTrailersMaker :: forall sess. IsSession sess => sess -> Channel sess + -> RegularFlowState (Outbound sess) -> HTTP2.TrailersMaker -outboundTrailersMaker sess channel = go +outboundTrailersMaker sess Channel{channelOutbound} regular = go where go :: HTTP2.TrailersMaker go (Just _) = return $ HTTP2.NextTrailersMaker go go Nothing = do - -- Wait for the thread to terminate - -- - -- If the thread was killed, this will throw an exception (which will - -- then result in @http2@ cancelling the corresponding stream). - flowState <- waitForOutbound channel - trailers <- case flowState of - FlowStateRegular regular -> - atomically $ readTMVar $ flowTerminated regular - FlowStateNoMessages _ -> - error "unexpected FlowStateNoMessages" - return $ HTTP2.Trailers $ buildOutboundTrailers sess trailers + mFlowState <- atomically $ do + (Right <$> readTMVar (flowTerminated regular)) + `orElse` (Left <$> waitForAbnormalThreadTermination channelOutbound) + case mFlowState of + Right trailers -> + return $ HTTP2.Trailers $ buildOutboundTrailers sess trailers + Left _exception -> + return $ HTTP2.Trailers [] diff --git a/util/Network/GRPC/Util/Session/Server.hs b/util/Network/GRPC/Util/Session/Server.hs index 9f1e43c9..33570a98 100644 --- a/util/Network/GRPC/Util/Session/Server.hs +++ b/util/Network/GRPC/Util/Session/Server.hs @@ -77,7 +77,7 @@ setupResponseChannel sess regular <- initFlowStateRegular headers markReady $ FlowStateRegular regular let resp :: Server.Response - resp = setResponseTrailers sess channel + resp = setResponseTrailers sess channel regular $ Server.responseStreaming (responseStatus responseInfo) (responseHeaders responseInfo) @@ -103,7 +103,8 @@ setResponseTrailers :: IsSession sess => sess -> Channel sess + -> RegularFlowState (Outbound sess) -> Server.Response -> Server.Response -setResponseTrailers sess channel resp = +setResponseTrailers sess channel regular resp = Server.setResponseTrailersMaker resp $ - outboundTrailersMaker sess channel + outboundTrailersMaker sess channel regular diff --git a/util/Network/GRPC/Util/Thread.hs b/util/Network/GRPC/Util/Thread.hs index d3993b4b..adafaee2 100644 --- a/util/Network/GRPC/Util/Thread.hs +++ b/util/Network/GRPC/Util/Thread.hs @@ -16,6 +16,7 @@ module Network.GRPC.Util.Thread ( , cancelThread , withThreadInterface , waitForNormalThreadTermination + , waitForAbnormalThreadTermination , waitForNormalOrAbnormalThreadTermination , hasThreadTerminated ) where @@ -316,6 +317,15 @@ waitForNormalOrAbnormalThreadTermination :: waitForNormalOrAbnormalThreadTermination state = hasThreadTerminated state >>= maybe retry return +-- | Wait for the thread to terminate abnormally +waitForAbnormalThreadTermination :: + TVar (ThreadState a) + -> STM SomeException +waitForAbnormalThreadTermination state = + hasThreadTerminated state >>= \case + Just (Left e) -> return e + _otherwise -> retry + -- | Has the thread terminated? hasThreadTerminated :: TVar (ThreadState a)