Copyright | (c) Simon Marlow 2012 |
---|---|
License | BSD3 (see the file LICENSE) |
Maintainer | Simon Marlow <marlowsd@gmail.com> |
Stability | provisional |
Portability | non-portable (requires concurrency) |
Safe Haskell | Trustworthy |
Language | Haskell2010 |
This module provides a set of operations for running IO operations
asynchronously and waiting for their results. It is a thin layer
over the basic concurrency operations provided by
Control.Concurrent
. The main additional functionality it
provides is the ability to wait for the return value of a thread,
but the interface also provides some additional safety and
robustness over using
forkIO
threads and
MVar
directly.
High-level API
async
's high-level API spawns
lexically scoped
threads,
ensuring the following key poperties that make it safer to use
than using plain
forkIO
:
- No exception is swallowed (waiting for results propagates exceptions).
- No thread is leaked (left running unintentionally).
(This is done using the
bracket
pattern to work in presence
of synchornous and asynchronous exceptions.)
Most practical/production code should only use the high-level API .
The basic type is
, which represents an asynchronous
Async
a
IO
action that will return a value of type
a
, or die with an
exception. An
Async
is a wrapper around a low-level
forkIO
thread.
The fundamental function to spawn threads with the high-level API is
withAsync
.
For example, to fetch two web pages at the same time, we could do
this (assuming a suitable
getURL
function):
withAsync (getURL url1) $ \a1 -> do withAsync (getURL url2) $ \a2 -> do page1 <- wait a1 page2 <- wait a2 ...
where
withAsync
starts the operation in a separate thread, and
wait
waits for and returns the result.
-
If the operation throws an exception, then that exception is re-thrown
by
wait
. This ensures property (1): No exception is swallowed. -
If an exception bubbles up through a
withAsync
, then theAsync
it spawned iscancel
ed. This ensures property (2): No thread is leaked.
Often we do not care to work manually with
Async
handles like
a1
and
a2
. Instead, we want to express high-level objectives like
performing two or more tasks concurrently, and waiting for one or all
of them to finish.
For example, the pattern of performing two IO actions concurrently and
waiting for both their results is packaged up in a combinator
concurrently
,
so we can further shorten the above example to:
(page1, page2) <- concurrently (getURL url1) (getURL url2) ...
The section High-level utilities covers the most common high-level objectives, including:
-
Waiting for 2 results (
concurrently
). -
Waiting for many results (
mapConcurrently
/forConcurrently
). -
Waiting for the first of 2 results (
race
). -
Waiting for arbitrary nestings of "all of
N
" and "the first of
N
"
results with the
Concurrently
newtype and itsApplicative
andAlternative
instances.
Click here to scroll to that section: Control.Concurrent.Async .
Low-level API
Some use cases require parallelism that is not lexically scoped.
For those, the low-level function
async
can be used as a direct
equivalent of
forkIO
:
-- Do NOT use this code in production, it has a flaw (explained below). do a1 <- async (getURL url1) a2 <- async (getURL url2) page1 <- wait a1 page2 <- wait a2 ...
In contrast to
withAsync
, this code has a problem.
It still fulfills property (1) in that an exception arising from
getUrl
will be re-thrown by
wait
, but it does not fulfill
property (2).
Consider the case when the first
wait
throws an exception; then the
second
wait
will not happen, and the second
async
may be left
running in the background, possibly indefinitely.
withAsync
is like
async
, except that the
Async
is
automatically killed (using
uninterruptibleCancel
) if the
enclosing IO operation returns before it has completed.
Furthermore,
withAsync
allows a tree of threads to be built, such
that children are automatically killed if their parents die for any
reason.
If you need to use the low-level API, ensure that you gurantee
property (2) by other means, such as
link
ing asyncs that need
to die together, and protecting against asynchronous exceptions
using
bracket
,
mask
,
or other functions from
Control.Exception
.
Miscellaneous
The
Functor
instance can be used to change the result of an
Async
. For example:
ghci> withAsync (return 3) (\a -> wait (fmap (+1) a)) 4
Resource exhaustion
As with all concurrent programming, keep in mind that while Haskell's cooperative ("green") multithreading carries low overhead, spawning too many of them at the same time may lead to resource exhaustion (of memory, file descriptors, or other limited resources), given that the actions running in the threads consume these resources.
Synopsis
- data Async a
- withAsync :: IO a -> ( Async a -> IO b) -> IO b
- withAsyncBound :: IO a -> ( Async a -> IO b) -> IO b
- withAsyncOn :: Int -> IO a -> ( Async a -> IO b) -> IO b
- withAsyncWithUnmask :: (( forall c. IO c -> IO c) -> IO a) -> ( Async a -> IO b) -> IO b
- withAsyncOnWithUnmask :: Int -> (( forall c. IO c -> IO c) -> IO a) -> ( Async a -> IO b) -> IO b
- wait :: Async a -> IO a
- poll :: Async a -> IO ( Maybe ( Either SomeException a))
- waitCatch :: Async a -> IO ( Either SomeException a)
- asyncThreadId :: Async a -> ThreadId
- cancel :: Async a -> IO ()
- uninterruptibleCancel :: Async a -> IO ()
- cancelWith :: Exception e => Async a -> e -> IO ()
- data AsyncCancelled = AsyncCancelled
- race :: IO a -> IO b -> IO ( Either a b)
- race_ :: IO a -> IO b -> IO ()
- concurrently :: IO a -> IO b -> IO (a, b)
- concurrently_ :: IO a -> IO b -> IO ()
- mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)
- forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)
- mapConcurrently_ :: Foldable f => (a -> IO b) -> f a -> IO ()
- forConcurrently_ :: Foldable f => f a -> (a -> IO b) -> IO ()
- replicateConcurrently :: Int -> IO a -> IO [a]
- replicateConcurrently_ :: Int -> IO a -> IO ()
-
newtype
Concurrently
a =
Concurrently
{
- runConcurrently :: IO a
- compareAsyncs :: Async a -> Async b -> Ordering
- waitSTM :: Async a -> STM a
- pollSTM :: Async a -> STM ( Maybe ( Either SomeException a))
- waitCatchSTM :: Async a -> STM ( Either SomeException a)
- waitAny :: [ Async a] -> IO ( Async a, a)
- waitAnyCatch :: [ Async a] -> IO ( Async a, Either SomeException a)
- waitAnyCancel :: [ Async a] -> IO ( Async a, a)
- waitAnyCatchCancel :: [ Async a] -> IO ( Async a, Either SomeException a)
- waitEither :: Async a -> Async b -> IO ( Either a b)
- waitEitherCatch :: Async a -> Async b -> IO ( Either ( Either SomeException a) ( Either SomeException b))
- waitEitherCancel :: Async a -> Async b -> IO ( Either a b)
- waitEitherCatchCancel :: Async a -> Async b -> IO ( Either ( Either SomeException a) ( Either SomeException b))
- waitEither_ :: Async a -> Async b -> IO ()
- waitBoth :: Async a -> Async b -> IO (a, b)
- waitAnySTM :: [ Async a] -> STM ( Async a, a)
- waitAnyCatchSTM :: [ Async a] -> STM ( Async a, Either SomeException a)
- waitEitherSTM :: Async a -> Async b -> STM ( Either a b)
- waitEitherCatchSTM :: Async a -> Async b -> STM ( Either ( Either SomeException a) ( Either SomeException b))
- waitEitherSTM_ :: Async a -> Async b -> STM ()
- waitBothSTM :: Async a -> Async b -> STM (a, b)
- async :: IO a -> IO ( Async a)
- asyncBound :: IO a -> IO ( Async a)
- asyncOn :: Int -> IO a -> IO ( Async a)
- asyncWithUnmask :: (( forall b. IO b -> IO b) -> IO a) -> IO ( Async a)
- asyncOnWithUnmask :: Int -> (( forall b. IO b -> IO b) -> IO a) -> IO ( Async a)
- link :: Async a -> IO ()
- linkOnly :: ( SomeException -> Bool ) -> Async a -> IO ()
- link2 :: Async a -> Async b -> IO ()
- link2Only :: ( SomeException -> Bool ) -> Async a -> Async b -> IO ()
- data ExceptionInLinkedThread = forall a. ExceptionInLinkedThread ( Async a) SomeException
Asynchronous actions
An asynchronous action spawned by
async
or
withAsync
.
Asynchronous actions are executed in a separate thread, and
operations are provided for waiting for asynchronous actions to
complete and obtaining their results (see e.g.
wait
).
Instances
Functor Async Source # | |
Eq ( Async a) Source # | |
Ord ( Async a) Source # | |
Defined in Control.Concurrent.Async |
|
Hashable ( Async a) Source # | |
High-level API
Spawning with automatic
cancel
ation
withAsync :: IO a -> ( Async a -> IO b) -> IO b Source #
Spawn an asynchronous action in a separate thread, and pass its
Async
handle to the supplied function. When the function returns
or throws an exception,
uninterruptibleCancel
is called on the
Async
.
withAsync action inner = mask $ \restore -> do a <- async (restore action) restore (inner a) `finally` uninterruptibleCancel a
This is a useful variant of
async
that ensures an
Async
is
never left running unintentionally.
Note: a reference to the child thread is kept alive until the call
to
withAsync
returns, so nesting many
withAsync
calls requires
linear memory.
withAsyncWithUnmask :: (( forall c. IO c -> IO c) -> IO a) -> ( Async a -> IO b) -> IO b Source #
Like
withAsync
but uses
forkIOWithUnmask
internally. The
child thread is passed a function that can be used to unmask
asynchronous exceptions.
withAsyncOnWithUnmask :: Int -> (( forall c. IO c -> IO c) -> IO a) -> ( Async a -> IO b) -> IO b Source #
Like
withAsyncOn
but uses
forkOnWithUnmask
internally. The
child thread is passed a function that can be used to unmask
asynchronous exceptions
Querying
Async
s
wait :: Async a -> IO a Source #
Wait for an asynchronous action to complete, and return its
value. If the asynchronous action threw an exception, then the
exception is re-thrown by
wait
.
wait = atomically . waitSTM
poll :: Async a -> IO ( Maybe ( Either SomeException a)) Source #
Check whether an
Async
has completed yet. If it has not
completed yet, then the result is
Nothing
, otherwise the result
is
Just e
where
e
is
Left x
if the
Async
raised an
exception
x
, or
Right a
if it returned a value
a
.
poll = atomically . pollSTM
waitCatch :: Async a -> IO ( Either SomeException a) Source #
Wait for an asynchronous action to complete, and return either
Left e
if the action raised an exception
e
, or
Right a
if it
returned a value
a
.
waitCatch = atomically . waitCatchSTM
asyncThreadId :: Async a -> ThreadId Source #
cancel :: Async a -> IO () Source #
Cancel an asynchronous action by throwing the
AsyncCancelled
exception to it, and waiting for the
Async
thread to quit.
Has no effect if the
Async
has already completed.
cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a
Note that
cancel
will not terminate until the thread the
Async
refers to has terminated. This means that
cancel
will block for
as long said thread blocks when receiving an asynchronous exception.
For example, it could block if:
- It's executing a foreign call, and thus cannot receive the asynchronous exception;
- It's executing some cleanup handler after having received the exception, and the handler is blocking.
uninterruptibleCancel :: Async a -> IO () Source #
Cancel an asynchronous action
This is a variant of
cancel
, but it is not interruptible.
cancelWith :: Exception e => Async a -> e -> IO () Source #
Cancel an asynchronous action by throwing the supplied exception to it.
cancelWith a x = throwTo (asyncThreadId a) x
The notes about the synchronous nature of
cancel
also apply to
cancelWith
.
data AsyncCancelled Source #
The exception thrown by
cancel
to terminate a thread.
Instances
Eq AsyncCancelled Source # | |
Defined in Control.Concurrent.Async (==) :: AsyncCancelled -> AsyncCancelled -> Bool Source # (/=) :: AsyncCancelled -> AsyncCancelled -> Bool Source # |
|
Show AsyncCancelled Source # | |
Defined in Control.Concurrent.Async |
|
Exception AsyncCancelled Source # | |
Defined in Control.Concurrent.Async |
High-level utilities
race :: IO a -> IO b -> IO ( Either a b) Source #
Run two
IO
actions concurrently, and return the first to
finish. The loser of the race is
cancel
led.
race left right = withAsync left $ \a -> withAsync right $ \b -> waitEither a b
concurrently :: IO a -> IO b -> IO (a, b) Source #
Run two
IO
actions concurrently, and return both results. If
either action throws an exception at any time, then the other
action is
cancel
led, and the exception is re-thrown by
concurrently
.
concurrently left right = withAsync left $ \a -> withAsync right $ \b -> waitBoth a b
concurrently_ :: IO a -> IO b -> IO () Source #
concurrently
, but ignore the result values
Since: 2.1.1
mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b) Source #
Maps an
IO
-performing function over any
Traversable
data
type, performing all the
IO
actions concurrently, and returning
the original data structure with the arguments replaced by the
results.
If any of the actions throw an exception, then all other actions are cancelled and the exception is re-thrown.
For example,
mapConcurrently
works with lists:
pages <- mapConcurrently getURL ["url1", "url2", "url3"]
Take into account that
async
will try to immediately spawn a thread
for each element of the
Traversable
, so running this on large
inputs without care may lead to resource exhaustion (of memory,
file descriptors, or other limited resources).
forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b) Source #
forConcurrently
is
mapConcurrently
with its arguments flipped
pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url
Since: 2.1.0
mapConcurrently_ :: Foldable f => (a -> IO b) -> f a -> IO () Source #
mapConcurrently_
is
mapConcurrently
with the return value discarded;
a concurrent equivalent of
mapM_
.
forConcurrently_ :: Foldable f => f a -> (a -> IO b) -> IO () Source #
forConcurrently_
is
forConcurrently
with the return value discarded;
a concurrent equivalent of
forM_
.
replicateConcurrently :: Int -> IO a -> IO [a] Source #
Perform the action in the given number of threads.
Since: 2.1.1
replicateConcurrently_ :: Int -> IO a -> IO () Source #
Same as
replicateConcurrently
, but ignore the results.
Since: 2.1.1
newtype Concurrently a Source #
A value of type
Concurrently a
is an
IO
operation that can be
composed with other
Concurrently
values, using the
Applicative
and
Alternative
instances.
Calling
runConcurrently
on a value of type
Concurrently a
will
execute the
IO
operations it contains concurrently, before
delivering the result of type
a
.
For example
(page1, page2, page3) <- runConcurrently $ (,,) <$> Concurrently (getURL "url1") <*> Concurrently (getURL "url2") <*> Concurrently (getURL "url3")
Instances
compareAsyncs :: Async a -> Async b -> Ordering Source #
Compare two Asyncs that may have different types by their
ThreadId
.
Specialised operations
STM operations
pollSTM :: Async a -> STM ( Maybe ( Either SomeException a)) Source #
A version of
poll
that can be used inside an STM transaction.
waitCatchSTM :: Async a -> STM ( Either SomeException a) Source #
A version of
waitCatch
that can be used inside an STM transaction.
Waiting for multiple
Async
s
waitAnyCatch :: [ Async a] -> IO ( Async a, Either SomeException a) Source #
Wait for any of the supplied asynchronous operations to complete.
The value returned is a pair of the
Async
that completed, and the
result that would be returned by
wait
on that
Async
.
If multiple
Async
s complete or have completed, then the value
returned corresponds to the first completed
Async
in the list.
waitAnyCancel :: [ Async a] -> IO ( Async a, a) Source #
Like
waitAny
, but also cancels the other asynchronous
operations as soon as one has completed.
waitAnyCatchCancel :: [ Async a] -> IO ( Async a, Either SomeException a) Source #
Like
waitAnyCatch
, but also cancels the other asynchronous
operations as soon as one has completed.
waitEither :: Async a -> Async b -> IO ( Either a b) Source #
Wait for the first of two
Async
s to finish. If the
Async
that finished first raised an exception, then the exception is
re-thrown by
waitEither
.
waitEitherCatch :: Async a -> Async b -> IO ( Either ( Either SomeException a) ( Either SomeException b)) Source #
Wait for the first of two
Async
s to finish.
waitEitherCancel :: Async a -> Async b -> IO ( Either a b) Source #
Like
waitEither
, but also
cancel
s both
Async
s before
returning.
waitEitherCatchCancel :: Async a -> Async b -> IO ( Either ( Either SomeException a) ( Either SomeException b)) Source #
Like
waitEitherCatch
, but also
cancel
s both
Async
s before
returning.
waitEither_ :: Async a -> Async b -> IO () Source #
Like
waitEither
, but the result is ignored.
waitBoth :: Async a -> Async b -> IO (a, b) Source #
Waits for both
Async
s to finish, but if either of them throws
an exception before they have both finished, then the exception is
re-thrown by
waitBoth
.
Waiting for multiple
Async
s in STM
waitAnySTM :: [ Async a] -> STM ( Async a, a) Source #
A version of
waitAny
that can be used inside an STM transaction.
Since: 2.1.0
waitAnyCatchSTM :: [ Async a] -> STM ( Async a, Either SomeException a) Source #
A version of
waitAnyCatch
that can be used inside an STM transaction.
Since: 2.1.0
waitEitherSTM :: Async a -> Async b -> STM ( Either a b) Source #
A version of
waitEither
that can be used inside an STM transaction.
Since: 2.1.0
waitEitherCatchSTM :: Async a -> Async b -> STM ( Either ( Either SomeException a) ( Either SomeException b)) Source #
A version of
waitEitherCatch
that can be used inside an STM transaction.
Since: 2.1.0
waitEitherSTM_ :: Async a -> Async b -> STM () Source #
A version of
waitEither_
that can be used inside an STM transaction.
Since: 2.1.0
waitBothSTM :: Async a -> Async b -> STM (a, b) Source #
A version of
waitBoth
that can be used inside an STM transaction.
Since: 2.1.0
Low-level API
Spawning (low-level API)
asyncWithUnmask :: (( forall b. IO b -> IO b) -> IO a) -> IO ( Async a) Source #
Like
async
but using
forkIOWithUnmask
internally. The child
thread is passed a function that can be used to unmask asynchronous
exceptions.
asyncOnWithUnmask :: Int -> (( forall b. IO b -> IO b) -> IO a) -> IO ( Async a) Source #
Like
asyncOn
but using
forkOnWithUnmask
internally. The
child thread is passed a function that can be used to unmask
asynchronous exceptions.
Linking
link :: Async a -> IO () Source #
Link the given
Async
to the current thread, such that if the
Async
raises an exception, that exception will be re-thrown in
the current thread, wrapped in
ExceptionInLinkedThread
.
link
ignores
AsyncCancelled
exceptions thrown in the other thread,
so that it's safe to
cancel
a thread you're linked to. If you want
different behaviour, use
linkOnly
.
:: ( SomeException -> Bool ) |
return
|
-> Async a | |
-> IO () |
Link the given
Async
to the current thread, such that if the
Async
raises an exception, that exception will be re-thrown in
the current thread, wrapped in
ExceptionInLinkedThread
.
The supplied predicate determines which exceptions in the target thread should be propagated to the source thread.
link2 :: Async a -> Async b -> IO () Source #
Link two
Async
s together, such that if either raises an
exception, the same exception is re-thrown in the other
Async
,
wrapped in
ExceptionInLinkedThread
.
link2
ignores
AsyncCancelled
exceptions, so that it's possible
to
cancel
either thread without cancelling the other. If you
want different behaviour, use
link2Only
.
link2Only :: ( SomeException -> Bool ) -> Async a -> Async b -> IO () Source #
Link two
Async
s together, such that if either raises an
exception, the same exception is re-thrown in the other
Async
,
wrapped in
ExceptionInLinkedThread
.
The supplied predicate determines which exceptions in the target thread should be propagated to the source thread.
data ExceptionInLinkedThread Source #
forall a. ExceptionInLinkedThread ( Async a) SomeException |