Safe Haskell None
Language Haskell2010




type SocketStateChange m s addr = SocketState m addr -> s -> STM m s

Callback which fires when we create or close a socket.

data SocketState m addr


CreatedSocket !addr !(Async m ())
ClosedSocket !addr !(Async m ())
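As an illustration of how a SocketStateChange callback might maintain state, the sketch below tracks the set of addresses that currently have an open socket. It is not taken from the library: SocketState is replaced by a simplified stand-in that carries a ThreadId instead of an Async m (), the callback is specialised to GHC's STM, and trackSockets and the addresses are hypothetical.

```haskell
import GHC.Conc (STM, ThreadId, atomically, myThreadId, newTVarIO,
                 readTVar, readTVarIO, writeTVar)
import qualified Data.Set as Set

-- Simplified stand-in for SocketState (assumption): the real type is
-- parameterised over the monad and carries an 'Async m ()'.
data SocketState addr
  = CreatedSocket !addr !ThreadId
  | ClosedSocket  !addr !ThreadId

-- Stand-in for SocketStateChange, specialised to GHC's STM.
type SocketStateChange s addr = SocketState addr -> s -> STM s

-- Hypothetical callback: the state is the set of addresses with an open socket.
trackSockets :: Ord addr => SocketStateChange (Set.Set addr) addr
trackSockets (CreatedSocket addr _) s = pure (Set.insert addr s)
trackSockets (ClosedSocket  addr _) s = pure (Set.delete addr s)

main :: IO ()
main = do
  tid <- myThreadId
  var <- newTVarIO Set.empty
  -- Apply one socket event to the state variable in a single transaction.
  let apply ev = atomically (readTVar var >>= trackSockets ev >>= writeTVar var)
  apply (CreatedSocket "10.0.0.1:3001" tid)
  apply (CreatedSocket "10.0.0.2:3001" tid)
  apply (ClosedSocket  "10.0.0.1:3001" tid)
  readTVarIO var >>= print
```

Because the callback returns the new state inside STM, the update is atomic with respect to the other callbacks that share the same state variable.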

type CompleteApplication m s addr r = Result addr r -> s -> STM m (CompleteApplicationResult m addr s)

Completes a connection; it receives the application result (or an exception).

data Result addr r where

Result of the connection thread. It is either the result of an application or an exception thrown by it.


ApplicationResult :: !Time -> !addr -> !r -> Result addr r
Connected :: !Time -> !addr -> Result addr r
ConnectionError :: Exception e => !Time -> !addr -> !e -> Result addr r
ApplicationError :: Exception e => !Time -> !addr -> !e -> Result addr r
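The constructors above can be consumed by ordinary pattern matching; matching on ConnectionError or ApplicationError brings the Exception e constraint into scope, so the payload can be rendered without knowing its concrete type. The following is a minimal sketch rather than library code: Result is redeclared locally with Double standing in for Time, and describe is a hypothetical helper.

```haskell
{-# LANGUAGE GADTs #-}
import Control.Exception (ErrorCall (..), Exception, displayException)

-- Local stand-in for Result (assumption): 'Double' replaces the real 'Time'.
data Result addr r where
  ApplicationResult :: !Double -> !addr -> !r -> Result addr r
  Connected         :: !Double -> !addr -> Result addr r
  ConnectionError   :: Exception e => !Double -> !addr -> !e -> Result addr r
  ApplicationError  :: Exception e => !Double -> !addr -> !e -> Result addr r

-- Hypothetical helper: render each outcome as a log line.
describe :: (Show addr, Show r) => Result addr r -> String
describe (ApplicationResult _ addr r) =
  show addr ++ ": finished with " ++ show r
describe (Connected _ addr) =
  show addr ++ ": connected"
describe (ConnectionError _ addr e) =
  show addr ++ ": connect failed: " ++ displayException e
describe (ApplicationError _ addr e) =
  show addr ++ ": application failed: " ++ displayException e

main :: IO ()
main = mapM_ (putStrLn . describe)
  [ Connected 0 "peer-a"
  , ApplicationResult 1.5 "peer-a" (42 :: Int)
  , ConnectionError 2 "peer-b" (ErrorCall "refused")
  ]
```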

type Main m s t = s -> STM m t

Given the current state, retry to keep the subscription worker going. When this transaction returns, all the threads spawned by the worker will be killed.
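The retry-until-done behaviour can be sketched with plain GHC STM. This is an assumption-laden illustration, not the library's code: Main is specialised to GHC's STM, the state is a hypothetical counter of remaining subscription targets, and mainTx is an invented name.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (replicateM_)
import GHC.Conc (STM, atomically, newTVarIO, readTVar, retry, writeTVar)

-- Stand-in for Main, specialised to GHC's STM.
type Main s t = s -> STM t

-- Hypothetical state: the number of targets still worth subscribing to.
-- While any remain, 'retry' blocks until the state variable changes.
mainTx :: Main Int String
mainTx n
  | n > 0     = retry
  | otherwise = pure "no targets left"

main :: IO ()
main = do
  var <- newTVarIO (2 :: Int)
  -- A helper thread that decrements the counter twice.
  _ <- forkIO $ replicateM_ 2 $ do
    threadDelay 10000
    atomically (readTVar var >>= writeTVar var . subtract 1)
  -- Blocks until the counter reaches zero, then returns.
  msg <- atomically (readTVar var >>= mainTx)
  putStrLn msg
```

In the worker, the point at which such a transaction returns is also the point at which the spawned threads are killed, so returning should be reserved for a deliberate shutdown condition.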

type StateVar m s = StrictTVar m s

Mutable state kept by the worker. All the workers in this module are polymorphic over the state type. The state is updated with two callbacks:

  • CompleteConnect - STM transaction which runs when the connect call returns; if the call threw an exception, it is passed to the callback.
  • CompleteApplication - STM transaction which runs when the application returns. It receives the result of the application or an exception raised by it.

Subscription worker

data WorkerParams m localAddrs addr

Worker parameters




worker


:: forall s sock localAddrs addr a x. Ord addr
=> Tracer IO (SubscriptionTrace addr)
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ConnectionTable IO addr
-> StateVar IO s
-> Snocket IO sock addr
-> WorkerCallbacks IO s addr a x
-> WorkerParams IO localAddrs addr
-> (sock -> IO a)


-> IO x

This is the most abstract worker, which puts all the pieces together. It will execute until main :: Main m s t returns. It runs subscriptionLoop in a new thread and will exit when that thread dies. Spawned threads are cancelled in a finally callback by throwing SubscriberError.

Note: This function runs in IO only because MonadSTM does not yet support orElse (PR #432).

Socket API

safeConnect


:: (MonadThrow m, MonadMask m)
=> Snocket m sock addr
-> addr

remote addr

-> addr

local addr

-> m ()

allocate extra action; executed with async exceptions masked in the allocation action of bracket

-> m ()

release extra action; executed with async exceptions masked in the closing action of bracket

-> ((forall x. m x -> m x) -> sock -> Either SomeException () -> m t)

continuation executed with async exceptions masked; it receives the unmask function, the allocated socket and the outcome of the connection attempt (a connection error, if any).

-> m t

Allocate a socket and connect to a peer, then execute the continuation with async exceptions masked. The continuation receives the unmask callback.
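The shape described above (bracket for allocation and release, mask around the continuation, unmask passed in) can be sketched in plain IO. This is only an approximation under stated assumptions: FakeSnocket is a hypothetical stand-in for the Snocket operations, safeConnectSketch is an invented name, and binding to the local address before connecting is assumed rather than confirmed by the text.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Exception (SomeException, bracket, mask, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical, minimal stand-in for the Snocket operations used here.
data FakeSnocket sock addr = FakeSnocket
  { snOpen    :: IO sock
  , snBind    :: sock -> addr -> IO ()
  , snConnect :: sock -> addr -> IO ()
  , snClose   :: sock -> IO ()
  }

safeConnectSketch
  :: FakeSnocket sock addr
  -> addr   -- remote addr
  -> addr   -- local addr
  -> IO ()  -- allocate extra action
  -> IO ()  -- release extra action
  -> ((forall x. IO x -> IO x) -> sock -> Either SomeException () -> IO t)
  -> IO t
safeConnectSketch sn remote local allocate release k =
  bracket
    -- bracket runs both of these with async exceptions masked
    (do sock <- snOpen sn; allocate; pure sock)
    (\sock -> snClose sn sock >> release)
    (\sock -> mask (\unmask -> do
        -- connecting is interruptible; any failure is captured as a Left
        res <- try (unmask (snBind sn sock local >> snConnect sn sock remote))
        -- the continuation runs masked and decides what to unmask
        k unmask sock res))

main :: IO ()
main = do
  logRef <- newIORef []
  let say msg = modifyIORef' logRef (msg :)
      sn = FakeSnocket
        { snOpen    = say "open" >> pure (1 :: Int)
        , snBind    = \_ _ -> say "bind"
        , snConnect = \_ addr -> say ("connect " ++ addr)
        , snClose   = \_ -> say "close"
        }
  outcome <- safeConnectSketch sn "remote:3001" "local:0"
    (say "allocate") (say "release")
    (\_unmask _sock res -> pure (either (const "failed") (const "connected") res))
  readIORef logRef >>= mapM_ putStrLn . reverse
  putStrLn outcome
```

Passing the connection outcome to the continuation as an Either, instead of rethrowing, lets the caller record the failure while still inside the masked region.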


defaultConnectionAttemptDelay :: DiffTime

Time to wait between connection attempts when we don't have any DeltaQ info.

minConnectionAttemptDelay :: DiffTime

Minimum time to wait between connection attempts.

maxConnectionAttemptDelay :: DiffTime

Maximum time to wait between connection attempts.
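One plausible use of a minimum and a maximum delay is to clamp a computed delay into that range. The sketch below shows only that arithmetic; clampDelay, exampleMin and exampleMax are hypothetical, and the numeric bounds are placeholders, not the values exported by this module.

```haskell
import Data.Time.Clock (DiffTime)

-- Placeholder bounds for illustration only (not the library's values).
exampleMin, exampleMax :: DiffTime
exampleMin = 0.01
exampleMax = 2

-- Clamp a candidate delay into the closed interval [lo, hi].
clampDelay :: DiffTime -> DiffTime -> DiffTime -> DiffTime
clampDelay lo hi = max lo . min hi

main :: IO ()
main = mapM_ (print . clampDelay exampleMin exampleMax) [0.001, 0.5, 10]
```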

ipRetryDelay :: DiffTime

Minimum time to wait between IP reconnects.

