Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix treatment of async exceptions #138

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Network/HTTP2/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ module Network.HTTP2.Client (
emptyFrameRateLimit,
rstRateLimit,


-- * Common configuration
Config (..),
allocSimpleConfig,
Expand Down
16 changes: 6 additions & 10 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,8 @@ setup ClientConfig{..} conf@Config{..} = do

runH2 :: Config -> Context -> IO a -> IO a
runH2 conf ctx runClient = do
stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) $
either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right (Left ()) ->
undefined -- never reach
Right (Right x) ->
return x
stopAfter mgr (clientResult <$> race runBackgroundThreads runClient) $ \res ->
closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) res
where
mgr = threadManager ctx
runReceiver = frameReceiver ctx conf
Expand All @@ -158,6 +150,10 @@ runH2 conf ctx runClient = do
labelMe "H2 runBackgroundThreads"
concurrently_ runReceiver runSender

clientResult :: Either () a -> a
clientResult (Left ()) = undefined -- unreachable
clientResult (Right a) = a

sendRequest
:: Config
-> Context
Expand Down
47 changes: 28 additions & 19 deletions Network/HTTP2/H2/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ import Imports

----------------------------------------------------------------

data Command =
Stop (MVar ()) (Maybe SomeException)
| Add ThreadId
| RegisterTimeout ThreadId T.Handle
| Delete ThreadId
data Command
= Stop (MVar ()) (Maybe SomeException)
| Add ThreadId
| RegisterTimeout ThreadId T.Handle
| Delete ThreadId

-- | Manager to manage the thread and the timer.
data Manager = Manager (TQueue Command) (TVar Int) T.Manager

data TimeoutHandle =
ThreadWithTimeout T.Handle
| ThreadWithoutTimeout
data TimeoutHandle
= ThreadWithTimeout T.Handle
| ThreadWithoutTimeout

cancelTimeout :: TimeoutHandle -> IO ()
cancelTimeout (ThreadWithTimeout h) = T.cancel h
cancelTimeout ThreadWithoutTimeout = return ()
cancelTimeout ThreadWithoutTimeout = return ()

type ManagedThreads = Map ThreadId TimeoutHandle

Expand Down Expand Up @@ -79,16 +79,25 @@ start timmgr = do
go q threadMap

-- | Stopping the manager.
stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
--
-- The action is run in the scope of an exception handler that catches all
-- exceptions (including asynchronous ones); this allows the cleanup handler
-- to cleanup in all circumstances. If an exception is caught, it is rethrown
-- after the cleanup is complete.
stopAfter :: Manager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
stopAfter (Manager q _ _) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
ma <- trySyncOrAsync $ unmask action
signalTimeoutsDisabled <- newEmptyMVar
atomically $ writeTQueue q $ Stop signalTimeoutsDisabled (either Just (const Nothing) ma)
atomically $
writeTQueue q $
Stop signalTimeoutsDisabled (either Just (const Nothing) ma)
-- This call to takeMVar /will/ eventually succeed, because the Manager
-- thread cannot be killed (see comment on 'go' in 'start').
takeMVar signalTimeoutsDisabled
cleanup ma
case ma of
Left err -> cleanup (Just err) >> throwIO err
Right a -> cleanup Nothing >> return a

----------------------------------------------------------------

Expand All @@ -111,7 +120,7 @@ forkManagedUnmask mgr label io =
incCounter mgr
-- We catch the exception and do not rethrow it: we don't want the
-- exception printed to stderr.
io unmask `catch` \(_e :: SomeException) -> return ()
io unmask `catchSyncOrAsync` \(_e :: SomeException) -> return ()
deleteMyId mgr
decCounter mgr
where
Expand Down Expand Up @@ -159,17 +168,17 @@ kill signalTimeoutsDisabled threadMap err = do
forM_ (Map.elems threadMap) cancelTimeout
putMVar signalTimeoutsDisabled ()
forM_ (Map.keys threadMap) $ \tid ->
E.throwTo tid $ KilledByHttp2ThreadManager err
E.throwTo tid $ KilledByHttp2ThreadManager err

-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO a) -> IO a
timeoutKillThread (Manager q _ tmgr) action = E.bracket register T.cancel action
where
register = do
h <- T.registerKillThread tmgr (return ())
tid <- myThreadId
atomically $ writeTQueue q (RegisterTimeout tid h)
return h
h <- T.registerKillThread tmgr (return ())
tid <- myThreadId
atomically $ writeTQueue q (RegisterTimeout tid h)
return h

-- | Registering closer for a resource and
-- returning a timer refresher.
Expand Down
11 changes: 2 additions & 9 deletions Network/HTTP2/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.Socket (SockAddr)
import UnliftIO.Async (concurrently_)
import UnliftIO.Exception

import Network.HTTP2.Frame
import Network.HTTP2.H2
Expand Down Expand Up @@ -129,14 +128,8 @@ runH2 conf ctx = do
runReceiver = frameReceiver ctx conf
runSender = frameSender ctx conf
runBackgroundThreads = concurrently_ runReceiver runSender
stopAfter mgr runBackgroundThreads $ \res -> do
closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) $
either Just (const Nothing) res
case res of
Left err ->
throwIO err
Right x ->
return x
stopAfter mgr runBackgroundThreads $ \res ->
closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) res

-- connClose must not be called here since Run:fork calls it
goaway :: Config -> ErrorCode -> ByteString -> IO ()
Expand Down
Loading