Skip to content

Commit

Permalink
fix regression: client should enqueue atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Jul 9, 2024
1 parent 80de8db commit 21ae2ce
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
6 changes: 4 additions & 2 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
q <- sendStreaming ctx newstrm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
(vs, out) <-
prepareSync newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
forkManaged threadManager "H2 worker" $
syncWithSender ctx newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
forkManaged threadManager "H2 worker" $ syncWithSender ctx newstrm vs
where
nextForStreaming
:: TBQueue StreamingChunk
Expand Down
4 changes: 4 additions & 0 deletions Network/HTTP2/H2/Queue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import Network.HTTP2.H2.Types
enqueueOutput :: TQueue Output -> Output -> IO ()
enqueueOutput outQ out = atomically $ writeTQueue outQ out

{-# INLINE enqueueOutputSTM #-}
enqueueOutputSTM :: TQueue Output -> Output -> STM ()
enqueueOutputSTM outQ out = writeTQueue outQ out

{-# INLINE enqueueControl #-}
enqueueControl :: TQueue Control -> Control -> IO ()
enqueueControl ctlQ ctl = atomically $ writeTQueue ctlQ ctl
Expand Down
26 changes: 16 additions & 10 deletions Network/HTTP2/H2/Sync.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Sync (makeSync, syncWithSender) where
module Network.HTTP2.H2.Sync (prepareSync, syncWithSender) where

import Control.Concurrent
import Network.HTTP.Semantics.IO
Expand All @@ -11,26 +11,32 @@ import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window

syncWithSender
:: Context
-> Stream
prepareSync
:: Stream
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
syncWithSender Context{..} strm otyp mtbq = do
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync strm otyp mtbq = do
var <- newEmptyMVar
let sync = makeSync strm mtbq (putMVar var)
enqueueOutput outputQ $ Output strm otyp sync
loop var sync
out = Output strm otyp sync
return ((var, sync), out)

syncWithSender
:: Context
-> Stream
-> (MVar Sync, Maybe OutputType -> IO Bool)
-> IO ()
syncWithSender Context{..} strm (var, sync) = loop
where
loop var sync = do
loop = do
s <- takeMVar var
case s of
Done -> return ()
Cont wait newotyp -> do
wait
enqueueOutput outputQ $ Output strm newotyp sync
loop var sync
loop

makeSync
:: Stream
Expand Down
9 changes: 7 additions & 2 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ pushStream conf ctx@Context{..} pstrm reqvt pps0
]
ot = OPush promiseRequest pid
Response rsp = promiseResponse pp
syncWithSender ctx newstrm ot Nothing
(vc, out) <- prepareSync newstrm ot Nothing
enqueueOutput outputQ out
syncWithSender ctx newstrm vc
increment tvar
sendHeaderBody conf ctx th newstrm rsp
push tvar pps (n + 1)
Expand Down Expand Up @@ -124,7 +126,10 @@ sendHeaderBody Config{..} ctx@Context{..} th strm OutObj{..} = do
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingUnmask _ -> error "OutBodyStreamingUnmask is not supported in server"
syncWithSender ctx strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
(vc, out) <-
prepareSync strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
enqueueOutput outputQ out
syncWithSender ctx strm vc
where
nextForStreaming
:: TBQueue StreamingChunk
Expand Down

0 comments on commit 21ae2ce

Please sign in to comment.