async-2.2.4: Run IO operations asynchronously and wait for their results
Copyright (c) Simon Marlow 2012
License BSD3 (see the file LICENSE)
Maintainer Simon Marlow <marlowsd@gmail.com>
Stability provisional
Portability non-portable (requires concurrency)
Safe Haskell Trustworthy
Language Haskell2010

Control.Concurrent.Async

Description

This module provides a set of operations for running IO operations asynchronously and waiting for their results. It is a thin layer over the basic concurrency operations provided by Control.Concurrent. The main additional functionality it provides is the ability to wait for the return value of a thread, but the interface also provides some additional safety and robustness over using forkIO threads and MVar directly.

High-level API

async's high-level API spawns lexically scoped threads, ensuring the following key properties that make it safer than plain forkIO:

  1. No exception is swallowed (waiting for results propagates exceptions).
  2. No thread is leaked (left running unintentionally).

(This is done using the bracket pattern, so that it works in the presence of both synchronous and asynchronous exceptions.)

Most practical/production code should use only the high-level API.

The basic type is Async a , which represents an asynchronous IO action that will return a value of type a , or die with an exception. An Async is a wrapper around a low-level forkIO thread.

The fundamental function to spawn threads with the high-level API is withAsync .

For example, to fetch two web pages at the same time, we could do this (assuming a suitable getURL function):

withAsync (getURL url1) $ \a1 -> do
  withAsync (getURL url2) $ \a2 -> do
    page1 <- wait a1
    page2 <- wait a2
    ...

where withAsync starts the operation in a separate thread, and wait waits for and returns the result.

  • If the operation throws an exception, then that exception is re-thrown by wait. This ensures property (1): No exception is swallowed.
  • If an exception bubbles up through a withAsync, then the Async it spawned is cancelled. This ensures property (2): No thread is leaked.
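
The pattern above can be tried out with the following minimal sketch. Note that getURL here is a hypothetical stand-in that only simulates a download with threadDelay; it is not part of async, and fetchBoth is just an illustrative name.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (wait, withAsync)

-- Hypothetical stand-in for a real HTTP fetch: sleeps briefly, then
-- returns a fake page body. Not part of the async package.
getURL :: String -> IO String
getURL url = do
  threadDelay 10000 -- 10 ms of simulated network latency
  pure ("<contents of " ++ url ++ ">")

-- Fetch both pages at the same time and return their lengths.
-- If either fetch throws, the exception surfaces from the matching
-- 'wait', and both children are cancelled on the way out.
fetchBoth :: String -> String -> IO (Int, Int)
fetchBoth url1 url2 =
  withAsync (getURL url1) $ \a1 ->
    withAsync (getURL url2) $ \a2 -> do
      page1 <- wait a1
      page2 <- wait a2
      pure (length page1, length page2)
```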

Often we do not care to work manually with Async handles like a1 and a2 . Instead, we want to express high-level objectives like performing two or more tasks concurrently, and waiting for one or all of them to finish.

For example, the pattern of performing two IO actions concurrently and waiting for both their results is packaged up in a combinator concurrently , so we can further shorten the above example to:

(page1, page2) <- concurrently (getURL url1) (getURL url2)
...

The section High-level utilities covers the most common high-level objectives.

Low-level API

Some use cases require parallelism that is not lexically scoped.

For those, the low-level function async can be used as a direct equivalent of forkIO :

-- Do NOT use this code in production, it has a flaw (explained below).
do
  a1 <- async (getURL url1)
  a2 <- async (getURL url2)
  page1 <- wait a1
  page2 <- wait a2
  ...

In contrast to withAsync , this code has a problem.

It still fulfills property (1) in that an exception arising from getURL will be re-thrown by wait, but it does not fulfill property (2). Consider the case when the first wait throws an exception; then the second wait will not happen, and the second async may be left running in the background, possibly indefinitely.

withAsync is like async , except that the Async is automatically killed (using uninterruptibleCancel ) if the enclosing IO operation returns before it has completed. Furthermore, withAsync allows a tree of threads to be built, such that children are automatically killed if their parents die for any reason.

If you need to use the low-level API, ensure that you guarantee property (2) by other means, such as linking asyncs that need to die together, and protecting against asynchronous exceptions using bracket, mask, or other functions from Control.Exception.
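
As one possible illustration of the bracket approach, the sketch below pairs a low-level spawn with a guaranteed cancel. withSpawned and sumOfTwo are hypothetical names, not part of async. The sketch uses asyncWithUnmask rather than async because bracket runs its acquire step with asynchronous exceptions masked, and a forked thread inherits that masking state.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, asyncWithUnmask, uninterruptibleCancel, wait)
import Control.Exception (bracket)

-- Hypothetical helper (not part of async): spawn with the low-level API,
-- but guarantee that the child is cancelled when 'use' exits for any reason.
-- The child explicitly unmasks, since it is forked inside bracket's
-- masked acquire step.
withSpawned :: IO a -> (Async a -> IO b) -> IO b
withSpawned action use =
  bracket (asyncWithUnmask (\unmask -> unmask action)) uninterruptibleCancel use

-- Both workers are cleaned up even if the first 'wait' throws.
sumOfTwo :: IO Int
sumOfTwo =
  withSpawned (threadDelay 1000 >> pure 1) $ \a1 ->
    withSpawned (threadDelay 1000 >> pure 2) $ \a2 ->
      (+) <$> wait a1 <*> wait a2
```

For lexically scoped use like this, withAsync already does the job; a hand-rolled bracket is mainly of interest when the acquire and release steps need to live in different places.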

Miscellaneous

The Functor instance can be used to change the result of an Async . For example:

ghci> withAsync (return 3) (\a -> wait (fmap (+1) a))
4

Resource exhaustion

As with all concurrent programming, keep in mind that while Haskell's lightweight ("green") threads carry low overhead, spawning too many of them at the same time may lead to resource exhaustion (of memory, file descriptors, or other limited resources) if the actions running in the threads consume these resources.

Synopsis

Asynchronous actions

data Async a

An asynchronous action spawned by async or withAsync. Asynchronous actions are executed in a separate thread, and operations are provided for waiting for asynchronous actions to complete and obtaining their results (see e.g. wait).

High-level API

Spawning with automatic cancellation

withAsync :: IO a -> (Async a -> IO b) -> IO b

Spawn an asynchronous action in a separate thread, and pass its Async handle to the supplied function. When the function returns or throws an exception, uninterruptibleCancel is called on the Async .

withAsync action inner = mask $ \restore -> do
  a <- async (restore action)
  restore (inner a) `finally` uninterruptibleCancel a

This is a useful variant of async that ensures an Async is never left running unintentionally.

Note: a reference to the child thread is kept alive until the call to withAsync returns, so nesting many withAsync calls requires linear memory.
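
The cancellation on return can be observed with a small experiment such as the one below. This is only a sketch; childIsCancelledOnReturn is a hypothetical name, and the MVar handshake is there so that the child is known to be inside its finally block before the inner function returns.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (finally)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Returns True if the child's cleanup handler ran before withAsync returned.
childIsCancelledOnReturn :: IO Bool
childIsCancelledOnReturn = do
  started <- newEmptyMVar
  cleanedUp <- newIORef False
  let child =
        (putMVar started () >> threadDelay 60000000) -- would sleep for 60 s
          `finally` writeIORef cleanedUp True
  -- The inner function returns as soon as the child is running, which
  -- triggers uninterruptibleCancel on the child.
  withAsync child $ \_ -> takeMVar started
  -- withAsync has returned, so the child has already terminated.
  readIORef cleanedUp
```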

withAsyncWithUnmask :: ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b

Like withAsync but uses forkIOWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

withAsyncOnWithUnmask :: Int -> ((forall c. IO c -> IO c) -> IO a) -> (Async a -> IO b) -> IO b

Like withAsyncOn but uses forkOnWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

Querying Asyncs

wait :: Async a -> IO a

Wait for an asynchronous action to complete, and return its value. If the asynchronous action threw an exception, then the exception is re-thrown by wait .

wait = atomically . waitSTM

poll :: Async a -> IO (Maybe (Either SomeException a))

Check whether an Async has completed yet. If it has not completed yet, then the result is Nothing , otherwise the result is Just e where e is Left x if the Async raised an exception x , or Right a if it returned a value a .

poll = atomically . pollSTM
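
A short sketch of the three possible outcomes of poll follows. The names describe and pollDemo are hypothetical; the MVar gate keeps the child blocked so that the first poll is guaranteed to see it still running.

```haskell
import Control.Concurrent.Async (poll, wait, withAsync)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Describe the state of an Async as reported by 'poll'.
describe :: Maybe (Either e a) -> String
describe Nothing          = "running"
describe (Just (Left _))  = "failed"
describe (Just (Right _)) = "done"

pollDemo :: IO (String, String)
pollDemo = do
  gate <- newEmptyMVar
  -- The child blocks on 'gate', so it cannot have finished at the first poll.
  withAsync (takeMVar gate >> pure (42 :: Int)) $ \a -> do
    before <- describe <$> poll a
    putMVar gate ()
    _ <- wait a
    after <- describe <$> poll a
    pure (before, after)
```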

waitCatch :: Async a -> IO (Either SomeException a)

Wait for an asynchronous action to complete, and return either Left e if the action raised an exception e , or Right a if it returned a value a .

waitCatch = atomically . waitCatchSTM
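
To contrast wait and waitCatch, here is a minimal sketch with a child that always fails. The names failing, viaWait and viaWaitCatch are hypothetical.

```haskell
import Control.Concurrent.Async (wait, waitCatch, withAsync)
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)

-- A child action that fails immediately.
failing :: IO Int
failing = throwIO (ErrorCall "boom")

-- 'wait' re-throws the child's exception in the caller, where it can be
-- handled like any other exception.
viaWait :: IO (Either ErrorCall Int)
viaWait = withAsync failing $ \a -> try (wait a)

-- 'waitCatch' returns the exception as a value instead of throwing it.
viaWaitCatch :: IO (Either SomeException Int)
viaWaitCatch = withAsync failing waitCatch
```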

asyncThreadId :: Async a -> ThreadId

Returns the ThreadId of the thread running the given Async.

cancel :: Async a -> IO ()

Cancel an asynchronous action by throwing the AsyncCancelled exception to it, and waiting for the Async thread to quit. Has no effect if the Async has already completed.

cancel a = throwTo (asyncThreadId a) AsyncCancelled <* waitCatch a

Note that cancel will not terminate until the thread the Async refers to has terminated. This means that cancel will block for as long as said thread blocks when receiving an asynchronous exception.

For example, it could block if:

  • It's executing a foreign call, and thus cannot receive the asynchronous exception;
  • It's executing some cleanup handler after having received the exception, and the handler is blocking.
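
As a rough illustration of what a cancelled Async looks like afterwards, the following sketch cancels a sleeping worker and inspects its outcome. cancelDemo is a hypothetical name.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (AsyncCancelled (..), cancel, waitCatch, withAsync)
import Control.Exception (fromException)

-- True if the worker ended with AsyncCancelled after being cancelled.
cancelDemo :: IO Bool
cancelDemo =
  withAsync (threadDelay 60000000) $ \a -> do -- worker would sleep for 60 s
    cancel a         -- returns only once the worker thread has terminated
    r <- waitCatch a -- does not block: the worker has already finished
    pure $ case r of
      Left e | Just AsyncCancelled <- fromException e -> True
      _ -> False
```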

uninterruptibleCancel :: Async a -> IO ()

Cancel an asynchronous action.

This is a variant of cancel, but it is not interruptible.

cancelWith :: Exception e => Async a -> e -> IO ()

Cancel an asynchronous action by throwing the supplied exception to it.

cancelWith a x = throwTo (asyncThreadId a) x

The notes about the synchronous nature of cancel also apply to cancelWith .
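
A possible use of cancelWith is to stop a worker with an application-specific exception. In the sketch below, ShuttingDown and cancelWithDemo are hypothetical names introduced only for illustration.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (cancelWith, waitCatch, withAsync)
import Control.Exception (Exception, fromException)

-- Hypothetical application-specific reason for stopping a worker.
data ShuttingDown = ShuttingDown
  deriving (Show, Eq)

instance Exception ShuttingDown

-- Stop a sleeping worker with ShuttingDown and report what it died with.
cancelWithDemo :: IO (Maybe ShuttingDown)
cancelWithDemo =
  withAsync (threadDelay 60000000) $ \a -> do -- worker would sleep for 60 s
    cancelWith a ShuttingDown
    r <- waitCatch a
    -- Left holds the exception that ended the worker, if any.
    pure (either fromException (const Nothing) r)
```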

High-level utilities

race :: IO a -> IO b -> IO (Either a b)

Run two IO actions concurrently, and return the first to finish. The loser of the race is cancelled.

race left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitEither a b
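
A common application of race is a time limit: race the action against a timer. The helper below is a hypothetical sketch (withTimeLimit is not part of async; base offers System.Timeout.timeout for the same purpose).

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)

-- Hypothetical helper (not part of async): run an action with a time limit
-- given in microseconds. Nothing means the timer finished first, in which
-- case the action has been cancelled.
withTimeLimit :: Int -> IO a -> IO (Maybe a)
withTimeLimit micros action =
  either (const Nothing) Just <$> race (threadDelay micros) action
```

For instance, withTimeLimit 10000 (threadDelay 5000000 >> pure 'y') gives up after roughly 10 ms instead of waiting five seconds.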

race_ :: IO a -> IO b -> IO ()

Like race, but the result is ignored.

concurrently :: IO a -> IO b -> IO (a, b)

Run two IO actions concurrently, and return both results. If either action throws an exception at any time, then the other action is cancelled, and the exception is re-thrown by concurrently.

concurrently left right =
  withAsync left $ \a ->
  withAsync right $ \b ->
  waitBoth a b
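
Both behaviours of concurrently, the successful pair and the early failure, can be seen in the sketch below. bothSucceed and oneFails are hypothetical names.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently)
import Control.Exception (ErrorCall (..), throwIO, try)

-- Both actions succeed: the results come back as a pair, in argument order.
bothSucceed :: IO (Int, String)
bothSucceed = concurrently (pure 1) (pure "two")

-- The left action fails at once: the 60 s sleep on the right is cancelled
-- rather than awaited, and the exception surfaces from 'concurrently'.
oneFails :: IO (Either ErrorCall ((), ()))
oneFails =
  try (concurrently (throwIO (ErrorCall "boom")) (threadDelay 60000000))
```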

concurrently_ :: IO a -> IO b -> IO ()

Like concurrently, but ignores the result values.

Since: 2.1.1

mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)

Maps an IO -performing function over any Traversable data type, performing all the IO actions concurrently, and returning the original data structure with the arguments replaced by the results.

If any of the actions throw an exception, then all other actions are cancelled and the exception is re-thrown.

For example, mapConcurrently works with lists:

pages <- mapConcurrently getURL ["url1", "url2", "url3"]

Take into account that async will try to immediately spawn a thread for each element of the Traversable , so running this on large inputs without care may lead to resource exhaustion (of memory, file descriptors, or other limited resources).
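
One way to limit the pressure on scarce resources is to throttle the actions with a semaphore. The helper below is a hypothetical sketch (mapConcurrentlyBounded is not part of async) built on QSem from base.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- Hypothetical helper (not part of async): like mapConcurrently, but at most
-- 'limit' of the actions run at any one time. A thread is still spawned per
-- element; only the work inside each thread is throttled by the semaphore.
mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded limit f xs = do
  sem <- newQSem limit
  -- bracket_ releases the semaphore slot even if 'f' throws or is cancelled.
  mapConcurrently (bracket_ (waitQSem sem) (signalQSem sem) . f) xs
```

Because every element still gets its own (cheap) thread, this bounds things like open connections or file descriptors used by the actions, not the number of threads themselves.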

forConcurrently :: Traversable t => t a -> (a -> IO b) -> IO (t b)

forConcurrently is mapConcurrently with its arguments flipped.

pages <- forConcurrently ["url1", "url2", "url3"] $ \url -> getURL url

Since: 2.1.0

mapConcurrently_ :: Foldable f => (a -> IO b) -> f a -> IO ()

mapConcurrently_ is mapConcurrently with the return value discarded; a concurrent equivalent of mapM_.

forConcurrently_ :: Foldable f => f a -> (a -> IO b) -> IO ()

forConcurrently_ is forConcurrently with the return value discarded; a concurrent equivalent of forM_.

replicateConcurrently :: Int -> IO a -> IO [a]

Perform the action in the given number of threads.

Since: 2.1.1

replicateConcurrently_ :: Int -> IO a -> IO ()

Same as replicateConcurrently, but ignore the results.

Since: 2.1.1

newtype Concurrently a

A value of type Concurrently a is an IO operation that can be composed with other Concurrently values, using the Applicative and Alternative instances.

Calling runConcurrently on a value of type Concurrently a will execute the IO operations it contains concurrently, before delivering the result of type a .

For example:

(page1, page2, page3)
    <- runConcurrently $ (,,)
    <$> Concurrently (getURL "url1")
    <*> Concurrently (getURL "url2")
    <*> Concurrently (getURL "url3")

Instances

Functor Concurrently
Applicative Concurrently
Alternative Concurrently
Semigroup a => Semigroup (Concurrently a)
    Only defined by async for base >= 4.9. Since: 2.1.0
(Semigroup a, Monoid a) => Monoid (Concurrently a)
    Since: 2.1.0

All instances are defined in Control.Concurrent.Async.
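
The Applicative and Alternative instances of Concurrently can be exercised with a sketch like the following. squares and firstOf are hypothetical names chosen for illustration.

```haskell
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))

-- Applicative: run every action at once and collect the results in order.
squares :: [Int] -> IO [Int]
squares = runConcurrently . traverse (\n -> Concurrently (pure (n * n)))

-- Alternative: keep whichever action finishes first; the other is cancelled.
firstOf :: IO a -> IO a -> IO a
firstOf x y = runConcurrently (Concurrently x <|> Concurrently y)
```

For example, firstOf (threadDelay 5000000 >> pure "slow") (pure "fast") returns without waiting for the slow action.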

compareAsyncs :: Async a -> Async b -> Ordering

Compare two Asyncs that may have different types by their ThreadId .

Specialised operations

STM operations

waitSTM :: Async a -> STM a

A version of wait that can be used inside an STM transaction.

pollSTM :: Async a -> STM (Maybe (Either SomeException a))

A version of poll that can be used inside an STM transaction.

waitCatchSTM :: Async a -> STM (Either SomeException a)

A version of waitCatch that can be used inside an STM transaction.
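
The STM variants are useful because they compose with other transactions. As a sketch, the hypothetical helper waitOrStop below waits for either an Async result or a stop flag held in a TVar; it assumes the stm package for Control.Concurrent.STM.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, waitSTM, withAsync)
import Control.Concurrent.STM (TVar, atomically, check, newTVarIO, orElse, readTVar)

-- Hypothetical helper: block until either the Async has a result (Right)
-- or the stop flag becomes True (Left). The result wins if both hold.
waitOrStop :: TVar Bool -> Async a -> IO (Either () a)
waitOrStop stop a =
  atomically $
    (Right <$> waitSTM a) `orElse` (Left <$> (readTVar stop >>= check))

stmDemo :: IO (Either () Int, Either () Int)
stmDemo = do
  stopped <- newTVarIO True  -- flag already set: a slow worker is abandoned
  running <- newTVarIO False -- flag never set: wait for the worker's result
  r1 <- withAsync (threadDelay 60000000 >> pure 1) (waitOrStop stopped)
  r2 <- withAsync (pure 2) (waitOrStop running)
  pure (r1, r2)
```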

Waiting for multiple Asyncs

waitAny :: [Async a] -> IO (Async a, a)

Wait for any of the supplied Asyncs to complete. If the first to complete throws an exception, then that exception is re-thrown by waitAny.

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.
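
A small sketch of waitAny with two workers of different speeds follows. fastest is a hypothetical name; the workers are spawned with withAsync so that the slower one is cancelled once the result is in.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (waitAny, withAsync)

-- Return the label of whichever worker finishes first.
fastest :: IO String
fastest =
  withAsync (threadDelay 5000000 >> pure "slow") $ \slow ->
    withAsync (threadDelay 1000 >> pure "fast") $ \fast -> do
      -- waitAny also returns the winning Async, which is ignored here.
      (_winner, label) <- waitAny [slow, fast]
      pure label
```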

waitAnyCatch :: [Async a] -> IO (Async a, Either SomeException a)

Wait for any of the supplied asynchronous operations to complete. The value returned is a pair of the Async that completed, and the result that would be returned by wait on that Async.

If multiple Asyncs complete or have completed, then the value returned corresponds to the first completed Async in the list.

waitAnyCancel :: [Async a] -> IO (Async a, a)

Like waitAny, but also cancels the other asynchronous operations as soon as one has completed.

waitAnyCatchCancel :: [Async a] -> IO (Async a, Either SomeException a)

Like waitAnyCatch, but also cancels the other asynchronous operations as soon as one has completed.

waitEither :: Async a -> Async b -> IO (Either a b)

Wait for the first of two Asyncs to finish. If the Async that finished first raised an exception, then the exception is re-thrown by waitEither.

waitEitherCancel :: Async a -> Async b -> IO (Either a b)

Like waitEither, but also cancels both Asyncs before returning.

waitEither_ :: Async a -> Async b -> IO ()

Like waitEither, but the result is ignored.

waitBoth :: Async a -> Async b -> IO (a, b)

Waits for both Asyncs to finish, but if either of them throws an exception before they have both finished, then the exception is re-thrown by waitBoth.

Waiting for multiple Asyncs in STM

waitAnySTM :: [Async a] -> STM (Async a, a)

A version of waitAny that can be used inside an STM transaction.

Since: 2.1.0

waitAnyCatchSTM :: [Async a] -> STM (Async a, Either SomeException a)

A version of waitAnyCatch that can be used inside an STM transaction.

Since: 2.1.0

waitEitherSTM :: Async a -> Async b -> STM (Either a b)

A version of waitEither that can be used inside an STM transaction.

Since: 2.1.0

waitEitherCatchSTM :: Async a -> Async b -> STM (Either (Either SomeException a) (Either SomeException b))

A version of waitEitherCatch that can be used inside an STM transaction.

Since: 2.1.0

waitEitherSTM_ :: Async a -> Async b -> STM ()

A version of waitEither_ that can be used inside an STM transaction.

Since: 2.1.0

waitBothSTM :: Async a -> Async b -> STM (a, b)

A version of waitBoth that can be used inside an STM transaction.

Since: 2.1.0

Low-level API

Spawning (low-level API)

async :: IO a -> IO (Async a)

Spawn an asynchronous action in a separate thread.

As with forkIO, the action may be left running unintentionally (see module-level documentation for details).

Use withAsync style functions wherever you can instead!

asyncWithUnmask :: ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)

Like async but using forkIOWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

asyncOnWithUnmask :: Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)

Like asyncOn but using forkOnWithUnmask internally. The child thread is passed a function that can be used to unmask asynchronous exceptions.

Linking

link :: Async a -> IO ()

Link the given Async to the current thread, such that if the Async raises an exception, that exception will be re-thrown in the current thread, wrapped in ExceptionInLinkedThread .

link ignores AsyncCancelled exceptions thrown in the other thread, so that it's safe to cancel a thread you're linked to. If you want different behaviour, use linkOnly .
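
The effect of link can be sketched as follows: a worker fails while the parent is busy, and the parent receives the failure wrapped in ExceptionInLinkedThread. linkDemo is a hypothetical name, and the threadDelay merely stands in for the parent's own work.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (ExceptionInLinkedThread (..), link, withAsync)
import Control.Exception (ErrorCall (..), fromException, throwIO, try)

-- Returns the worker's error message as seen by the linked parent.
linkDemo :: IO (Maybe String)
linkDemo = do
  r <- try $
    withAsync (throwIO (ErrorCall "worker failed")) $ \a -> do
      link a
      threadDelay 5000000 -- stands in for the parent's own work
  pure $ case r of
    -- The wrapper carries the failed Async and the original exception.
    Left (ExceptionInLinkedThread _ e)
      | Just (ErrorCall msg) <- fromException e -> Just msg
    _ -> Nothing
```

Note that the exception arrives in the parent asynchronously, so it interrupts the parent's work rather than waiting for a call to wait.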

linkOnly :: (SomeException -> Bool) -> Async a -> IO ()

The first argument is a predicate that returns True if the exception should be propagated, False otherwise.

Link the given Async to the current thread, such that if the Async raises an exception, that exception will be re-thrown in the current thread, wrapped in ExceptionInLinkedThread .

The supplied predicate determines which exceptions in the target thread should be propagated to the source thread.

link2 :: Async a -> Async b -> IO ()

Link two Asyncs together, such that if either raises an exception, the same exception is re-thrown in the other Async, wrapped in ExceptionInLinkedThread.

link2 ignores AsyncCancelled exceptions, so that it's possible to cancel either thread without cancelling the other. If you want different behaviour, use link2Only.

link2Only :: (SomeException -> Bool) -> Async a -> Async b -> IO ()

Link two Asyncs together, such that if either raises an exception, the same exception is re-thrown in the other Async, wrapped in ExceptionInLinkedThread.

The supplied predicate determines which exceptions in the target thread should be propagated to the source thread.