Skip to content

Commit

Permalink
synching client and sender with STM
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Jul 8, 2024
1 parent c768049 commit 80de8db
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
26 changes: 16 additions & 10 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,19 @@ run cconf@ClientConfig{..} conf client = do
x <- processResponse rsp
adjustRxWindow ctx strm
return x
runClient ctx = do
x <- client (clientCore ctx) $ aux ctx
waitCounter0 $ threadManager ctx
let frame = goawayFrame 0 NoError "graceful closing"
mvar <- newMVar ()
enqueueControl (controlQ ctx) $ CGoaway frame mvar
takeMVar mvar
return x
runClient ctx = wrapClinet ctx $ client (clientCore ctx) $ aux ctx

wrapClinet :: Context -> IO a -> IO a
wrapClinet ctx client = do
x <- client
waitCounter0 $ threadManager ctx
let frame = goawayFrame 0 NoError "graceful closing"
enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
enqueueControl (controlQ ctx) $ CFinish GoAwayIsSent
atomically $ do
done <- readTVar $ senderDone ctx
check done
return x

-- | Launching a receiver and a sender.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
Expand All @@ -107,8 +112,9 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
runClient <-
action $ ClientIO confMySockAddr confPeerSockAddr putR get putB create
runClient <- do
act <- action $ ClientIO confMySockAddr confPeerSockAddr putR get putB create
return $ wrapClinet ctx act
runH2 conf ctx runClient

getResponse :: Stream -> IO Response
Expand Down
2 changes: 2 additions & 0 deletions Network/HTTP2/H2/Context.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ data Context = Context
, mySockAddr :: SockAddr
, peerSockAddr :: SockAddr
, threadManager :: Manager
, senderDone :: TVar Bool
}

----------------------------------------------------------------
Expand Down Expand Up @@ -128,6 +129,7 @@ newContext rinfo Config{..} cacheSiz connRxWS settings timmgr =
<*> return confMySockAddr
<*> return confPeerSockAddr
<*> start timmgr
<*> newTVarIO False
where
rl = case rinfo of
RIC{} -> Client
Expand Down
6 changes: 3 additions & 3 deletions Network/HTTP2/H2/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ frameReceiver ctx@Context{..} conf@Config{..} = do
-- to destroy the thread trees.
hd <- confReadN frameHeaderLength
if BS.null hd
then enqueueControl controlQ CFinish
then enqueueControl controlQ $ CFinish ConnectionIsTimeout
else do
processFrame ctx conf $ decodeFrameHeader hd
loop

sendGoaway se
| Just e@ConnectionIsClosed <- E.fromException se = do
| Just ConnectionIsClosed <- E.fromException se = do
waitCounter0 threadManager
enqueueControl controlQ $ CFinish e
enqueueControl controlQ $ CFinish ConnectionIsClosed
| Just e@(ConnectionErrorIsReceived _ _ _) <- E.fromException se =
enqueueControl controlQ $ CFinish e
| Just e@(ConnectionErrorIsSent err sid msg) <- E.fromException se = do
Expand Down
15 changes: 6 additions & 9 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module Network.HTTP2.H2.Sender (
frameSender,
) where

import Control.Concurrent.MVar (putMVar)
import Data.IORef (modifyIORef', readIORef, writeIORef)
import Data.IntMap.Strict (IntMap)
import Foreign.Ptr (minusPtr, plusPtr)
Expand Down Expand Up @@ -39,6 +38,8 @@ data Switch

wrapException :: E.SomeException -> IO ()
wrapException se
| Just GoAwayIsSent <- E.fromException se = return ()
| Just ConnectionIsClosed <- E.fromException se = return ()
| Just (e :: HTTP2Error) <- E.fromException se = E.throwIO e
| otherwise = E.throwIO $ BadThingHappen se

Expand All @@ -65,10 +66,10 @@ updatePeerSettings Context{peerSettings, oddStreamTable, evenStreamTable} peerAl

frameSender :: Context -> Config -> IO ()
frameSender
ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit}
ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit, senderDone}
Config{..} = do
labelMe "H2 sender"
loop 0 `E.catch` wrapException
(loop 0 `E.finally` setSenderDone) `E.catch` wrapException
where
----------------------------------------------------------------
loop :: Offset -> IO ()
Expand Down Expand Up @@ -114,12 +115,6 @@ frameSender
-- called with off == 0
control :: Control -> IO ()
control (CFinish e) = E.throwIO e
control (CGoaway bs mvar) = do
buf <- copyAll [bs] confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
flushN off
putMVar mvar ()
E.throwIO GoAwayIsSent
control (CFrames ms xs) = do
buf <- copyAll xs confWriteBuffer
let off = buf `minusPtr` confWriteBuffer
Expand Down Expand Up @@ -373,3 +368,5 @@ frameSender
, flags = flag
, streamId = sid
}

setSenderDone = atomically $ writeTVar senderDone True
1 change: 0 additions & 1 deletion Network/HTTP2/H2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ data Sync = Done | Cont (IO ()) OutputType
data Control
= CFinish HTTP2Error
| CFrames (Maybe SettingsList) [ByteString]
| CGoaway ByteString (MVar ())

----------------------------------------------------------------

Expand Down

0 comments on commit 80de8db

Please sign in to comment.