{-# LANGUAGE CPP, MagicHash, UnboxedTuples, RankNTypes,
ExistentialQuantification #-}
#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif
#if __GLASGOW_HASKELL__ < 710
{-# LANGUAGE DeriveDataTypeable #-}
#endif
{-# OPTIONS -Wall #-}
module Control.Concurrent.Async (
Async,
withAsync, withAsyncBound, withAsyncOn, withAsyncWithUnmask,
withAsyncOnWithUnmask,
wait, poll, waitCatch, asyncThreadId,
cancel, uninterruptibleCancel, cancelWith, AsyncCancelled(..),
race, race_,
concurrently, concurrently_,
mapConcurrently, forConcurrently,
mapConcurrently_, forConcurrently_,
replicateConcurrently, replicateConcurrently_,
Concurrently(..),
compareAsyncs,
waitSTM, pollSTM, waitCatchSTM,
waitAny, waitAnyCatch, waitAnyCancel, waitAnyCatchCancel,
waitEither, waitEitherCatch, waitEitherCancel, waitEitherCatchCancel,
waitEither_,
waitBoth,
waitAnySTM, waitAnyCatchSTM,
waitEitherSTM, waitEitherCatchSTM,
waitEitherSTM_,
waitBothSTM,
async, asyncBound, asyncOn, asyncWithUnmask, asyncOnWithUnmask,
link, linkOnly, link2, link2Only, ExceptionInLinkedThread(..),
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Concurrent
import qualified Data.Foldable as F
#if !MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif
import Control.Monad
import Control.Applicative
#if !MIN_VERSION_base(4,8,0)
import Data.Monoid (Monoid(mempty,mappend))
import Data.Traversable
#endif
#if __GLASGOW_HASKELL__ < 710
import Data.Typeable
#endif
#if MIN_VERSION_base(4,9,0)
import Data.Semigroup (Semigroup((<>)))
#endif
import Data.Hashable (Hashable(hashWithSalt))
import Data.IORef
import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc
-- | Handle to an action running in another thread.
--
-- The handle pairs the 'ThreadId' of the worker thread with an 'STM'
-- action that yields the worker's outcome: @Left e@ if it ended with
-- exception @e@, @Right a@ if it returned @a@.
data Async a = Async
  { asyncThreadId :: {-# UNPACK #-} !ThreadId
    -- ^ The 'ThreadId' of the thread running the action.
  , _asyncWait    :: STM (Either SomeException a)
    -- ^ Transaction that produces the outcome of the action.
  }
-- | Two handles are equal exactly when their 'ThreadId's are equal; the
-- wait action is ignored.
instance Eq (Async a) where
  Async a _ == Async b _  =  a == b
-- | Handles are ordered by their 'ThreadId's; the wait action is ignored.
instance Ord (Async a) where
  Async a _ `compare` Async b _  =  a `compare` b
-- | The hash of a handle is the hash of its 'ThreadId', consistent with
-- the 'Eq' instance.
instance Hashable (Async a) where
  hashWithSalt salt (Async a _) = hashWithSalt salt a
-- | Mapping a function over a handle keeps the same 'ThreadId' and
-- applies the function to the successful ('Right') result of the wait
-- action; a 'Left' exception is passed through unchanged.
instance Functor Async where
  fmap f (Async a w) = Async a (fmap (fmap f) w)
-- | Compare two handles by 'ThreadId'.  Unlike 'compare' from the 'Ord'
-- instance, the two handles may have different result types.
compareAsyncs :: Async a -> Async b -> Ordering
compareAsyncs (Async t1 _) (Async t2 _) = compare t1 t2
-- | Start an action in a new thread and return a handle to it.
--
-- Implemented as 'asyncUsing' with 'rawForkIO' as the forking primitive.
-- 'inline' asks GHC to inline 'asyncUsing' at this call site.
async :: IO a -> IO (Async a)
async = inline asyncUsing rawForkIO
-- | Like 'async', but the worker thread is created with 'forkOS'.
asyncBound :: IO a -> IO (Async a)
asyncBound = asyncUsing forkOS
-- | Like 'async', but the worker thread is created with 'rawForkOn',
-- which receives the given 'Int' as its first argument
-- (presumably a capability number, as for 'forkOn' -- confirm against
-- the definition of 'rawForkOn').
asyncOn :: Int -> IO a -> IO (Async a)
asyncOn = asyncUsing . rawForkOn
-- | Like 'async', but the action is handed an unmasking function.
--
-- The supplied function is applied to 'unsafeUnmask' and the resulting
-- action is forked with 'rawForkIO' via 'asyncUsing'.
asyncWithUnmask :: ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncWithUnmask actionWith = asyncUsing rawForkIO (actionWith unsafeUnmask)
-- | Like 'asyncOn', but the action is handed an unmasking function.
--
-- The supplied function is applied to 'unsafeUnmask' and the resulting
-- action is forked with @'rawForkOn' cpu@ via 'asyncUsing'.
asyncOnWithUnmask :: Int -> ((forall b . IO b -> IO b) -> IO a) -> IO (Async a)
asyncOnWithUnmask cpu actionWith =
  asyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
-- | Shared implementation of the @async*@ family.
--
-- Given a forking primitive, run the action in a new thread and return a
-- handle whose wait action reads the outcome from a 'TMVar'.
--
-- The worker runs the action under 'try', so both a normal result and any
-- exception end up in the 'TMVar' as an 'Either'.
asyncUsing :: (IO () -> IO ThreadId)
           -> IO a -> IO (Async a)
asyncUsing doFork = \action -> do
   var <- newEmptyTMVarIO
   -- Fork inside 'mask': for fork primitives that inherit the masking
   -- state (as 'forkIO' does) the worker starts masked, so the 'try'
   -- handler is in place before the action runs.  'restore' gives the
   -- action the caller's masking state back.
   t <- mask $ \restore ->
          doFork $ try (restore action) >>= atomically . putTMVar var
   return (Async t (readTMVar var))
-- | Start an action in a new thread, pass its handle to the given
-- function, and cancel the thread once that function finishes.
--
-- Implemented as 'withAsyncUsing' with 'rawForkIO' as the forking
-- primitive.  'inline' asks GHC to inline 'withAsyncUsing' here.
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync = inline withAsyncUsing rawForkIO
-- | Like 'withAsync', but the worker thread is created with 'forkOS'.
withAsyncBound :: IO a -> (Async a -> IO b) -> IO b
withAsyncBound = withAsyncUsing forkOS
-- | Like 'withAsync', but the worker thread is created with 'rawForkOn',
-- which receives the given 'Int' as its first argument.
withAsyncOn :: Int -> IO a -> (Async a -> IO b) -> IO b
withAsyncOn = withAsyncUsing . rawForkOn
-- | Like 'withAsync', but the action is handed an unmasking function.
--
-- The supplied function is applied to 'unsafeUnmask' and the resulting
-- action is run through 'withAsyncUsing' with 'rawForkIO'.
withAsyncWithUnmask
  :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncWithUnmask actionWith =
  withAsyncUsing rawForkIO (actionWith unsafeUnmask)
-- | Like 'withAsyncOn', but the action is handed an unmasking function.
--
-- The supplied function is applied to 'unsafeUnmask' and the resulting
-- action is run through 'withAsyncUsing' with @'rawForkOn' cpu@.
withAsyncOnWithUnmask
  :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b
withAsyncOnWithUnmask cpu actionWith =
  withAsyncUsing (rawForkOn cpu) (actionWith unsafeUnmask)
-- | Shared implementation of the @withAsync*@ family.
--
-- Forks the action with the given primitive, runs the inner function on
-- the resulting handle, and always cancels the worker afterwards:
--
-- * if the inner function throws, the worker is cancelled with
--   'uninterruptibleCancel' and the exception is re-thrown;
--
-- * if it returns normally, the worker is cancelled with
--   'uninterruptibleCancel' and the result is returned.
withAsyncUsing :: (IO () -> IO ThreadId)
               -> IO a -> (Async a -> IO b) -> IO b
withAsyncUsing doFork = \action inner -> do
  var <- newEmptyTMVarIO
  -- Everything from the fork to the final cancel runs masked; only the
  -- worker's action and the inner function get the caller's masking
  -- state back via 'restore'.
  mask $ \restore -> do
    t <- doFork $ try (restore action) >>= atomically . putTMVar var
    let a = Async t (readTMVar var)
    r <- restore (inner a) `catchAll` \e -> do
      uninterruptibleCancel a
      throwIO e
    uninterruptibleCancel a
    return r
{-# INLINE wait #-}
-- | Block until the action finishes and return its result.  If the action
-- ended with an exception, that exception is re-thrown here (via
-- 'waitSTM').
--
-- > wait = atomically . waitSTM
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
  where
    -- If the transaction is interrupted by 'BlockedIndefinitelyOnSTM',
    -- run it one more time; the second attempt is not guarded.
    -- NOTE(review): presumably works around a spurious
    -- BlockedIndefinitelyOnSTM from the runtime -- confirm the rationale.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
{-# INLINE waitCatch #-}
-- | Block until the action finishes and return its outcome: @Left e@ if
-- it ended with exception @e@, @Right a@ if it returned @a@.
--
-- > waitCatch = atomically . waitCatchSTM
waitCatch :: Async a -> IO (Either SomeException a)
waitCatch = tryAgain . atomically . waitCatchSTM
  where
    -- If the transaction is interrupted by 'BlockedIndefinitelyOnSTM',
    -- run it one more time; the second attempt is not guarded.
    -- NOTE(review): presumably works around a spurious
    -- BlockedIndefinitelyOnSTM from the runtime -- confirm the rationale.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
{-# INLINE poll #-}
-- | Check, without blocking, whether the action has finished.  Returns
-- 'Nothing' if it is still running, otherwise 'Just' its outcome.
--
-- > poll = atomically . pollSTM
poll :: Async a -> IO (Maybe (Either SomeException a))
poll = atomically . pollSTM
-- | 'STM' version of 'wait': obtain the outcome with 'waitCatchSTM' and
-- either re-throw the exception with 'throwSTM' or return the result.
waitSTM :: Async a -> STM a
waitSTM a = do
   r <- waitCatchSTM a
   either throwSTM return r
{-# INLINE waitCatchSTM #-}
-- | 'STM' version of 'waitCatch': simply the wait action stored in the
-- handle.
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w
{-# INLINE pollSTM #-}
-- | 'STM' version of 'poll': try the handle's wait action; if it would
-- retry (no outcome yet), fall back to returning 'Nothing'.
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async _ w) = (Just <$> w) `orElse` return Nothing
{-# INLINE cancel #-}
-- | Cancel an action by throwing 'AsyncCancelled' to its thread, then
-- block until the action's outcome is available ('waitCatch').
--
-- The outcome itself is discarded: '<*' keeps the @()@ from 'throwTo'.
cancel :: Async a -> IO ()
cancel a@(Async t _) = throwTo t AsyncCancelled <* waitCatch a
-- | The exception that 'cancel' throws to the thread of an 'Async'.
data AsyncCancelled = AsyncCancelled
  deriving (Show, Eq
#if __GLASGOW_HASKELL__ < 710
    ,Typeable
#endif
    )
-- | On GHC 7.8 and later, 'AsyncCancelled' is converted to and from
-- 'SomeException' with 'asyncExceptionToException' and
-- 'asyncExceptionFromException'; on older compilers the class defaults
-- are used.
instance Exception AsyncCancelled where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif
{-# INLINE uninterruptibleCancel #-}
-- | Run 'cancel' inside 'uninterruptibleMask_', so the cancellation (and
-- the wait for the action's outcome) cannot be interrupted by an
-- asynchronous exception.
uninterruptibleCancel :: Async a -> IO ()
uninterruptibleCancel = uninterruptibleMask_ . cancel
-- | Like 'cancel', but throw the given exception to the action's thread
-- instead of 'AsyncCancelled'.  Blocks until the action's outcome is
-- available ('waitCatch'); the outcome itself is discarded.
cancelWith :: Exception e => Async a -> e -> IO ()
cancelWith a@(Async t _) e = throwTo t e <* waitCatch a
{-# INLINE waitAnyCatch #-}
-- | Block until any of the given actions finishes, and return its handle
-- together with its outcome (exception or result).
--
-- > waitAnyCatch = atomically . waitAnyCatchSTM
waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatch = atomically . waitAnyCatchSTM
-- | 'STM' version of 'waitAnyCatch'.
--
-- The handles are tried in list order with 'orElse'; the first one whose
-- outcome is available wins.  If none is ready the transaction retries;
-- for an empty list it always retries.
waitAnyCatchSTM :: [Async a] -> STM (Async a, Either SomeException a)
waitAnyCatchSTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitCatchSTM a; return (a, r)) asyncs
-- | Like 'waitAnyCatch', but afterwards 'cancel' every handle in the
-- list (including the one that finished).  The cancellation runs in a
-- 'finally', so it also happens if the wait itself is interrupted by an
-- exception.
waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)
waitAnyCatchCancel asyncs =
  waitAnyCatch asyncs `finally` mapM_ cancel asyncs
{-# INLINE waitAny #-}
-- | Block until any of the given actions finishes, and return its handle
-- together with its result.  If that action ended with an exception, the
-- exception is re-thrown (via 'waitSTM').
--
-- > waitAny = atomically . waitAnySTM
waitAny :: [Async a] -> IO (Async a, a)
waitAny = atomically . waitAnySTM
-- | 'STM' version of 'waitAny'.
--
-- The handles are tried in list order with 'orElse'; the first one that
-- has finished wins (its exception, if any, is thrown by 'waitSTM').  If
-- none is ready the transaction retries; for an empty list it always
-- retries.
waitAnySTM :: [Async a] -> STM (Async a, a)
waitAnySTM asyncs =
    foldr orElse retry $
      map (\a -> do r <- waitSTM a; return (a, r)) asyncs
-- | Like 'waitAny', but afterwards 'cancel' every handle in the list
-- (including the one that finished).  The cancellation runs in a
-- 'finally', so it also happens if 'waitAny' throws.
waitAnyCancel :: [Async a] -> IO (Async a, a)
waitAnyCancel asyncs =
  waitAny asyncs `finally` mapM_ cancel asyncs
{-# INLINE waitEitherCatch #-}
-- | Block until either of two actions finishes and return the outcome of
-- the one that did: 'Left' for the first handle, 'Right' for the second.
-- The inner 'Either' holds that action's exception or result.
waitEitherCatch :: Async a -> Async b
                -> IO (Either (Either SomeException a)
                              (Either SomeException b))
waitEitherCatch left right =
  tryAgain $ atomically (waitEitherCatchSTM left right)
  where
    -- If the transaction is interrupted by 'BlockedIndefinitelyOnSTM',
    -- run it one more time; the second attempt is not guarded.
    -- NOTE(review): presumably works around a spurious
    -- BlockedIndefinitelyOnSTM from the runtime -- confirm the rationale.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
-- | 'STM' version of 'waitEitherCatch'.  The first handle is tried
-- before the second, so if both have finished the first one's outcome is
-- returned.
waitEitherCatchSTM :: Async a -> Async b
                   -> STM (Either (Either SomeException a)
                                  (Either SomeException b))
waitEitherCatchSTM left right =
    (Left  <$> waitCatchSTM left)
      `orElse`
    (Right <$> waitCatchSTM right)
-- | Like 'waitEitherCatch', but afterwards 'cancel' both handles (first
-- the left one, then the right one).  The cancellation runs in a
-- 'finally', so it also happens if the wait is interrupted by an
-- exception.
waitEitherCatchCancel :: Async a -> Async b
                      -> IO (Either (Either SomeException a)
                                    (Either SomeException b))
waitEitherCatchCancel left right =
  waitEitherCatch left right `finally` (cancel left >> cancel right)
{-# INLINE waitEither #-}
-- | Block until either of two actions finishes and return its result:
-- 'Left' for the first handle, 'Right' for the second.  If the action
-- that finished ended with an exception, that exception is re-thrown (via
-- 'waitEitherSTM').
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither left right = atomically (waitEitherSTM left right)
-- | 'STM' version of 'waitEither'.  The first handle is tried before the
-- second; whichever has finished first in that order determines the
-- result, or the exception thrown by 'waitSTM'.
waitEitherSTM :: Async a -> Async b -> STM (Either a b)
waitEitherSTM left right =
    (Left  <$> waitSTM left)
      `orElse`
    (Right <$> waitSTM right)
{-# INLINE waitEither_ #-}
-- | Like 'waitEither', but the result of the finished action is
-- discarded.  An exception from the finished action is still re-thrown
-- (via 'waitEitherSTM_').
waitEither_ :: Async a -> Async b -> IO ()
waitEither_ left right = atomically (waitEitherSTM_ left right)
-- | 'STM' version of 'waitEither_': wait for the first handle, falling
-- back to the second, and discard the result with 'void'.
waitEitherSTM_ :: Async a -> Async b -> STM ()
waitEitherSTM_ left right =
    (void $ waitSTM left)
      `orElse`
    (void $ waitSTM right)
-- | Like 'waitEither', but afterwards 'cancel' both handles (first the
-- left one, then the right one).  The cancellation runs in a 'finally',
-- so it also happens if 'waitEither' throws.
waitEitherCancel :: Async a -> Async b -> IO (Either a b)
waitEitherCancel left right =
  waitEither left right `finally` (cancel left >> cancel right)
{-# INLINE waitBoth #-}
-- | Block until both actions have finished and return both results.  If
-- either action ends with an exception, that exception is re-thrown (see
-- 'waitBothSTM').
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth left right = tryAgain $ atomically (waitBothSTM left right)
  where
    -- If the transaction is interrupted by 'BlockedIndefinitelyOnSTM',
    -- run it one more time; the second attempt is not guarded.
    -- NOTE(review): presumably works around a spurious
    -- BlockedIndefinitelyOnSTM from the runtime -- confirm the rationale.
    tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
-- | 'STM' version of 'waitBoth'.
waitBothSTM :: Async a -> Async b -> STM (a,b)
waitBothSTM left right = do
    -- If the left action is not finished yet, look at the right one: if
    -- it has failed, 'waitSTM' throws its exception immediately;
    -- otherwise 'retry' so that we keep waiting for the left action.
    a <- waitSTM left
           `orElse`
         (waitSTM right >> retry)
    b <- waitSTM right
    return (a,b)
-- | Exception delivered by 'linkOnly' (and therefore 'link') to the
-- linking thread.  It carries the handle of the action that failed, with
-- its result type hidden existentially, and the exception it failed with.
data ExceptionInLinkedThread =
  forall a . ExceptionInLinkedThread (Async a) SomeException
#if __GLASGOW_HASKELL__ < 710
  deriving Typeable
#endif
-- | Rendered as @ExceptionInLinkedThread \<thread id\> \<exception\>@,
-- wrapped in parentheses when the context precedence is 11 or higher.
-- Only the 'ThreadId' of the handle is shown.
instance Show ExceptionInLinkedThread where
  showsPrec p (ExceptionInLinkedThread (Async t _) e) =
    showParen (p >= 11) $
      showString "ExceptionInLinkedThread " .
      showsPrec 11 t .
      showString " " .
      showsPrec 11 e
-- | On GHC 7.8 and later, 'ExceptionInLinkedThread' is converted to and
-- from 'SomeException' with 'asyncExceptionToException' and
-- 'asyncExceptionFromException'; on older compilers the class defaults
-- are used.
instance Exception ExceptionInLinkedThread where
#if __GLASGOW_HASKELL__ >= 708
  fromException = asyncExceptionFromException
  toException = asyncExceptionToException
#endif
-- | Link the given 'Async' to the calling thread.
--
-- This is 'linkOnly' with a predicate that accepts every exception except
-- those recognised by 'isCancel'.
link :: Async a -> IO ()
link a = linkOnly (\e -> not (isCancel e)) a
-- | Link the given 'Async' to the calling thread, forwarding only selected
-- exceptions.
--
-- A helper thread is started with 'forkRepeat'.  It waits for the 'Async'
-- with 'waitCatch'; if the outcome is an exception for which the predicate
-- returns 'True', that exception is wrapped in 'ExceptionInLinkedThread' and
-- thrown (via 'throwTo') to the thread that called 'linkOnly'.  Any other
-- outcome is ignored.
linkOnly
  :: (SomeException -> Bool)  -- ^ return 'True' if the exception
                              -- should be propagated, 'False'
                              -- otherwise.
  -> Async a
  -> IO ()
linkOnly shouldThrow a = do
  -- Capture the caller's thread id before forking the watcher.
  caller <- myThreadId
  let forward (Left e)
        | shouldThrow e = throwTo caller (ExceptionInLinkedThread a e)
      forward _ = return ()
  void (forkRepeat (waitCatch a >>= forward))
-- | Link two 'Async's to each other.
--
-- This is 'link2Only' with a predicate that accepts every exception except
-- those recognised by 'isCancel'.
link2 :: Async a -> Async b -> IO ()
link2 left right = link2Only (\e -> not (isCancel e)) left right
-- | Link two 'Async's to each other, forwarding only selected exceptions.
--
-- A helper thread (started with 'forkRepeat') waits with 'waitEitherCatch'
-- for whichever 'Async' completes first.  If that one finished with an
-- exception accepted by the predicate, the exception is wrapped in
-- 'ExceptionInLinkedThread' and thrown to the thread of the /other/ 'Async'.
-- Any other outcome is ignored.
link2Only :: (SomeException -> Bool) -> Async a -> Async b -> IO ()
link2Only shouldThrow left@(Async tl _) right@(Async tr _) =
  void (forkRepeat (waitEitherCatch left right >>= notify))
  where
    -- The left async failed: tell the right async's thread.
    notify (Left (Left e))
      | shouldThrow e = throwTo tr (ExceptionInLinkedThread left e)
    -- The right async failed: tell the left async's thread.
    notify (Right (Left e))
      | shouldThrow e = throwTo tl (ExceptionInLinkedThread right e)
    -- Successful completion, or an exception the predicate rejected.
    notify _ = return ()
-- | 'True' exactly when the exception can be recovered as 'AsyncCancelled'
-- through 'fromException'.
isCancel :: SomeException -> Bool
isCancel e =
  case fromException e of
    Just AsyncCancelled -> True
    _                   -> False
-- | Run two @IO@ actions concurrently and return the first result that
-- becomes available, tagged 'Left' or 'Right' according to which action
-- produced it.  If the first outcome reported is an exception, it is
-- re-thrown with 'throwIO'.
race :: IO a -> IO b -> IO (Either a b)

-- | Like 'race', but the result is discarded.
race_ :: IO a -> IO b -> IO ()

-- | Run two @IO@ actions concurrently and return both results.  If either
-- action reports an exception before both results have been collected, the
-- exception is re-thrown with 'throwIO'.
concurrently :: IO a -> IO b -> IO (a,b)

-- | Like 'concurrently', but both results are discarded.
concurrently_ :: IO a -> IO b -> IO ()

-- Set to 1 to build the variants below that are written in terms of
-- 'withAsync'; with 0 the MVar-based variants in the #else branch are used.
#define USE_ASYNC_VERSIONS 0

#if USE_ASYNC_VERSIONS

race left right =
  withAsync left $ \a ->
    withAsync right $ \b ->
      waitEither a b

race_ left right = void $ race left right

concurrently left right =
  withAsync left $ \a ->
    withAsync right $ \b ->
      waitBoth a b

concurrently_ left right = void $ concurrently left right

#else

-- MVar-based implementations, all built on concurrently'.

race left right = concurrently' left right collect
  where
    -- A single outcome is enough: rethrow a failure, return a result.
    collect next = next >>= either throwIO return

race_ left right = void (race left right)

concurrently left right = concurrently' left right (collect [])
  where
    -- Results are accumulated newest-first, so either arrival order of the
    -- two children ends in one of the first two equations.
    collect [Left a, Right b] _ = return (a, b)
    collect [Right b, Left a] _ = return (a, b)
    collect seen next =
      next >>= either throwIO (\r -> collect (r : seen) next)

-- | Shared engine for 'race', 'concurrently' and 'concurrently_'.
--
-- Forks one thread per action.  Each child writes either its tagged result
-- or the exception it terminated with into a shared 'MVar'.  The supplied
-- collector is handed an action that takes the next outcome from that
-- 'MVar'; it decides how many outcomes to consume and what to return.
-- Afterwards (on normal return and on exception alike) @stop@ cancels the
-- children that have not been accounted for and drains their outcomes.
concurrently' :: IO a -> IO b
              -> (IO (Either SomeException (Either a b)) -> IO r)
              -> IO r
concurrently' left right collect = do
  outcomes <- newEmptyMVar
  mask $ \restore -> do
    -- A child runs its action with the caller's masking state restored.
    -- The surrounding uninterruptibleMask_ means the putMVar performed by
    -- the exception handler cannot be interrupted by an async exception.
    let spawn tagged =
          forkIO $ uninterruptibleMask_ $
            restore (tagged >>= putMVar outcomes . Right)
              `catchAll` (putMVar outcomes . Left)
    lid <- spawn (fmap Left left)
    rid <- spawn (fmap Right right)

    -- Number of child outcomes that have not yet been taken from the MVar.
    pending <- newIORef (2 :: Int)

    let -- Take one outcome; the counter is decremented only after the
        -- takeMVar has actually returned.
        takeDone = do
          r <- takeMVar outcomes
          modifyIORef pending (subtract 1)
          return r

        -- Run the action; if it throws BlockedIndefinitelyOnMVar, run it
        -- once more (the second attempt is not guarded).
        tryAgain io = io `catch` \BlockedIndefinitelyOnMVar -> io

        -- Cancel whatever is still outstanding and wait for it to report.
        stop =
          uninterruptibleMask_ $ do
            remaining <- readIORef pending
            -- The throwTo calls are issued from a fresh thread; the right
            -- child is signalled before the left one.
            when (remaining > 0) $
              void $ forkIO $ do
                throwTo rid AsyncCancelled
                throwTo lid AsyncCancelled
            -- Consume exactly the outcomes that were still outstanding.
            replicateM_ remaining (tryAgain (takeMVar outcomes))

    result <- collect (tryAgain takeDone) `onException` stop
    stop
    return result

concurrently_ left right = concurrently' left right (collect 0)
  where
    -- Count successful outcomes; stop once both children have reported.
    collect finished next
      | finished == (2 :: Int) = return ()
      | otherwise =
          next >>= either throwIO (\_ -> collect (finished + 1) next)

#endif
-- | Apply an @IO@ function to every element of a 'Traversable' container,
-- combining the resulting actions through the 'Concurrently' applicative,
-- and return the container of results.
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
mapConcurrently f xs = runConcurrently (traverse (\x -> Concurrently (f x)) xs)
-- | 'mapConcurrently' with its two arguments swapped.
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
forConcurrently xs f = mapConcurrently f xs
-- | Like 'mapConcurrently', but works on any 'F.Foldable' and discards the
-- results: each action is wrapped with 'void' and the actions are combined
-- through the 'Monoid' instance of 'Concurrently'.
mapConcurrently_ :: F.Foldable f => (a -> IO b) -> f a -> IO ()
mapConcurrently_ f xs =
  runConcurrently (F.foldMap (\x -> Concurrently (void (f x))) xs)
-- | 'mapConcurrently_' with its two arguments swapped.
forConcurrently_ :: F.Foldable f => f a -> (a -> IO b) -> IO ()
forConcurrently_ xs f = mapConcurrently_ f xs
-- | Build @cnt@ copies of the action, combine them through the
-- 'Concurrently' applicative, and return the list of results.
replicateConcurrently :: Int -> IO a -> IO [a]
replicateConcurrently cnt action =
  runConcurrently (sequenceA (replicate cnt (Concurrently action)))
-- | Like 'replicateConcurrently', but the results are discarded: each copy
-- is wrapped with 'void' and the copies are combined through the 'Monoid'
-- instance of 'Concurrently'.
replicateConcurrently_ :: Int -> IO a -> IO ()
replicateConcurrently_ cnt action =
  runConcurrently (F.fold (replicate cnt (Concurrently (void action))))
-- | A wrapper around 'IO' whose class instances in this module combine
-- actions with 'concurrently' ('Applicative') and 'race' ('Alternative').
-- Unwrap with 'runConcurrently' to obtain the underlying 'IO' action.
newtype Concurrently a = Concurrently
  { runConcurrently :: IO a
  }
-- | Maps over the result of the wrapped 'IO' action.
instance Functor Concurrently where
  fmap f (Concurrently io) = Concurrently (fmap f io)
-- | 'pure' wraps a value with 'return'; '<*>' runs the function action and
-- the argument action with 'concurrently' and applies one result to the
-- other.
instance Applicative Concurrently where
  pure x = Concurrently (return x)
  Concurrently fs <*> Concurrently as =
    Concurrently (fmap apply (concurrently fs as))
    where
      apply (f, a) = f a
-- | 'empty' loops forever on @threadDelay maxBound@, so it never produces a
-- result on its own; '<|>' runs both actions with 'race' and returns
-- whichever result 'race' yields.
instance Alternative Concurrently where
  empty = Concurrently (forever (threadDelay maxBound))
  Concurrently as <|> Concurrently bs =
    Concurrently (fmap (either id id) (race as bs))
#if MIN_VERSION_base(4,9,0)
-- | Combines the two results with the underlying '<>', lifting through the
-- 'Applicative' instance of 'Concurrently'.
instance Semigroup a => Semigroup (Concurrently a) where
  x <> y = liftA2 (<>) x y

-- | 'mempty' is a pure 'mempty'; 'mappend' is the 'Semigroup' operation.
instance (Semigroup a, Monoid a) => Monoid (Concurrently a) where
  mempty = pure mempty
  mappend x y = x <> y
#else
-- | 'mempty' is a pure 'mempty'; 'mappend' lifts the underlying 'mappend'
-- through the 'Applicative' instance of 'Concurrently'.
instance Monoid a => Monoid (Concurrently a) where
  mempty = pure mempty
  mappend x y = liftA2 mappend x y
#endif
-- | Fork a thread that runs the action repeatedly until it completes
-- without throwing.
--
-- The fork happens inside 'mask'; each run of the action has the caller's
-- masking state restored.  Every run goes through 'tryAll': an exception of
-- any kind causes another run, and the first normal return ends the thread.
-- The action's result is discarded.
forkRepeat :: IO a -> IO ThreadId
forkRepeat action =
  mask $ \restore ->
    let loop = tryAll (restore action) >>= either (\_ -> loop) (\_ -> return ())
    in forkIO loop
-- | 'catch' specialised to 'SomeException', i.e. the handler receives
-- every exception regardless of its concrete type.
catchAll :: IO a -> (SomeException -> IO a) -> IO a
catchAll action handler = action `catch` handler
-- | 'try' specialised to 'SomeException': any exception thrown by the
-- action is returned as a 'Left'.
tryAll :: IO a -> IO (Either SomeException a)
tryAll action = try action
-- | Fork a thread by calling the 'fork#' primitive directly on the unwrapped
-- 'IO' action, then box the resulting thread id as a 'ThreadId'.
{-# INLINE rawForkIO #-}
rawForkIO :: IO () -> IO ThreadId
rawForkIO (IO action) =
  IO $ \s0 ->
    case fork# action s0 of
      (# s1, tid# #) -> (# s1, ThreadId tid# #)
-- | Like 'rawForkIO', but calls the 'forkOn#' primitive, passing the unboxed
-- value of the given 'Int' as its first argument.
{-# INLINE rawForkOn #-}
rawForkOn :: Int -> IO () -> IO ThreadId
rawForkOn (I# cpu) (IO action) =
  IO $ \s0 ->
    case forkOn# cpu action s0 of
      (# s1, tid# #) -> (# s1, ThreadId tid# #)