{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecursiveDo #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
module Ouroboros.Network.Subscription.Worker
( SocketStateChange
, SocketState (..)
, CompleteApplication
, ConnectResult (..)
, Result (..)
, Main
, StateVar
, LocalAddresses (..)
, WorkerCallbacks (..)
, WorkerParams (..)
, worker
, safeConnect
, defaultConnectionAttemptDelay
, minConnectionAttemptDelay
, maxConnectionAttemptDelay
, ipRetryDelay
, SubscriberError (..)
, SubscriptionTrace (..)
) where
import Control.Applicative ((<|>))
import qualified Control.Concurrent.STM as STM
import Control.Exception (SomeException (..))
import Control.Monad (forever, join, unless, when)
import Control.Monad.Fix (MonadFix)
import Data.Foldable (traverse_)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Void (Void)
import GHC.Stack
import Network.Socket (Family (AF_UNIX))
import Text.Printf
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer
import Ouroboros.Network.ErrorPolicy (CompleteApplication,
CompleteApplicationResult (..), ErrorPolicyTrace,
Result (..), WithAddr)
import Ouroboros.Network.Server.ConnectionTable
import Ouroboros.Network.Snocket (Snocket (..))
import qualified Ouroboros.Network.Snocket as Snocket
import Ouroboros.Network.Subscription.Subscriber
defaultConnectionAttemptDelay :: DiffTime
defaultConnectionAttemptDelay :: DiffTime
defaultConnectionAttemptDelay = DiffTime
0.025
minConnectionAttemptDelay :: DiffTime
minConnectionAttemptDelay :: DiffTime
minConnectionAttemptDelay = DiffTime
0.010
maxConnectionAttemptDelay :: DiffTime
maxConnectionAttemptDelay :: DiffTime
maxConnectionAttemptDelay = DiffTime
2
ipRetryDelay :: DiffTime
ipRetryDelay :: DiffTime
ipRetryDelay = DiffTime
10
data ResOrAct m addr tr r =
Res !(Result addr r)
| Act (Set (Async m ()))
(Maybe tr)
type ResultQ m addr tr r = TQueue m (ResOrAct m addr tr r)
newResultQ :: forall m addr tr r. MonadSTM m => m (ResultQ m addr tr r)
newResultQ :: m (ResultQ m addr tr r)
newResultQ = STM m (ResultQ m addr tr r) -> m (ResultQ m addr tr r)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (ResultQ m addr tr r) -> m (ResultQ m addr tr r))
-> STM m (ResultQ m addr tr r) -> m (ResultQ m addr tr r)
forall a b. (a -> b) -> a -> b
$ STM m (ResultQ m addr tr r)
forall (m :: * -> *) a. MonadSTM m => STM m (TQueue m a)
newTQueue
type StateVar m s = StrictTVar m s
type ThreadsVar m = StrictTVar m (Set (Async m ()))
data SocketState m addr
= CreatedSocket !addr !(Async m ())
| ClosedSocket !addr !(Async m ())
type SocketStateChange m s addr = SocketState m addr -> s -> STM m s
type Main m s t = s -> STM m t
data LocalAddresses addr = LocalAddresses {
LocalAddresses addr -> Maybe addr
laIpv4 :: Maybe addr
, LocalAddresses addr -> Maybe addr
laIpv6 :: Maybe addr
, LocalAddresses addr -> Maybe addr
laUnix :: Maybe addr
} deriving (LocalAddresses addr -> LocalAddresses addr -> Bool
(LocalAddresses addr -> LocalAddresses addr -> Bool)
-> (LocalAddresses addr -> LocalAddresses addr -> Bool)
-> Eq (LocalAddresses addr)
forall addr.
Eq addr =>
LocalAddresses addr -> LocalAddresses addr -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: LocalAddresses addr -> LocalAddresses addr -> Bool
$c/= :: forall addr.
Eq addr =>
LocalAddresses addr -> LocalAddresses addr -> Bool
== :: LocalAddresses addr -> LocalAddresses addr -> Bool
$c== :: forall addr.
Eq addr =>
LocalAddresses addr -> LocalAddresses addr -> Bool
Eq, Int -> LocalAddresses addr -> ShowS
[LocalAddresses addr] -> ShowS
LocalAddresses addr -> String
(Int -> LocalAddresses addr -> ShowS)
-> (LocalAddresses addr -> String)
-> ([LocalAddresses addr] -> ShowS)
-> Show (LocalAddresses addr)
forall addr. Show addr => Int -> LocalAddresses addr -> ShowS
forall addr. Show addr => [LocalAddresses addr] -> ShowS
forall addr. Show addr => LocalAddresses addr -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [LocalAddresses addr] -> ShowS
$cshowList :: forall addr. Show addr => [LocalAddresses addr] -> ShowS
show :: LocalAddresses addr -> String
$cshow :: forall addr. Show addr => LocalAddresses addr -> String
showsPrec :: Int -> LocalAddresses addr -> ShowS
$cshowsPrec :: forall addr. Show addr => Int -> LocalAddresses addr -> ShowS
Show)
instance Semigroup (LocalAddresses addr) where
LocalAddresses addr
a <> :: LocalAddresses addr -> LocalAddresses addr -> LocalAddresses addr
<> LocalAddresses addr
b = LocalAddresses :: forall addr.
Maybe addr -> Maybe addr -> Maybe addr -> LocalAddresses addr
LocalAddresses {
laIpv4 :: Maybe addr
laIpv4 = LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laIpv4 LocalAddresses addr
a Maybe addr -> Maybe addr -> Maybe addr
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laIpv4 LocalAddresses addr
b,
laIpv6 :: Maybe addr
laIpv6 = LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laIpv6 LocalAddresses addr
a Maybe addr -> Maybe addr -> Maybe addr
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laIpv6 LocalAddresses addr
b,
laUnix :: Maybe addr
laUnix = LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laUnix LocalAddresses addr
a Maybe addr -> Maybe addr -> Maybe addr
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> LocalAddresses addr -> Maybe addr
forall addr. LocalAddresses addr -> Maybe addr
laUnix LocalAddresses addr
b
}
safeConnect :: ( MonadThrow m
, MonadMask m
)
=> Snocket m sock addr
-> addr
-> addr
-> m ()
-> m ()
-> ((forall x. m x -> m x) -> sock -> Either SomeException () -> m t)
-> m t
safeConnect :: Snocket m sock addr
-> addr
-> addr
-> m ()
-> m ()
-> ((forall x. m x -> m x)
-> sock -> Either SomeException () -> m t)
-> m t
safeConnect Snocket m sock addr
sn addr
remoteAddr addr
localAddr m ()
malloc m ()
mclean (forall x. m x -> m x) -> sock -> Either SomeException () -> m t
k =
m sock -> (sock -> m ()) -> (sock -> m t) -> m t
forall (m :: * -> *) a b c.
MonadThrow m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket
(do sock
sock <- Snocket m sock addr -> AddressFamily addr -> m sock
forall (m :: * -> *) fd addr.
Snocket m fd addr -> AddressFamily addr -> m fd
Snocket.open Snocket m sock addr
sn (Snocket m sock addr -> addr -> AddressFamily addr
forall (m :: * -> *) fd addr.
Snocket m fd addr -> addr -> AddressFamily addr
Snocket.addrFamily Snocket m sock addr
sn addr
remoteAddr)
m ()
malloc
sock -> m sock
forall (f :: * -> *) a. Applicative f => a -> f a
pure sock
sock
)
(\sock
sock -> Snocket m sock addr -> sock -> m ()
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m ()
Snocket.close Snocket m sock addr
sn sock
sock m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m ()
mclean)
(\sock
sock -> ((forall x. m x -> m x) -> m t) -> m t
forall (m :: * -> *) b.
MonadMask m =>
((forall a. m a -> m a) -> m b) -> m b
mask (((forall x. m x -> m x) -> m t) -> m t)
-> ((forall x. m x -> m x) -> m t) -> m t
forall a b. (a -> b) -> a -> b
$ \forall x. m x -> m x
unmask -> do
let doBind :: Bool
doBind = case Snocket m sock addr -> addr -> AddressFamily addr
forall (m :: * -> *) fd addr.
Snocket m fd addr -> addr -> AddressFamily addr
Snocket.addrFamily Snocket m sock addr
sn addr
localAddr of
Snocket.SocketFamily Family
fam -> Family
fam Family -> Family -> Bool
forall a. Eq a => a -> a -> Bool
/= Family
AF_UNIX
AddressFamily addr
_ -> Bool
False
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
doBind (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Snocket m sock addr -> sock -> addr -> m ()
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> addr -> m ()
Snocket.bind Snocket m sock addr
sn sock
sock addr
localAddr
Either SomeException ()
res :: Either SomeException ()
<- m () -> m (Either SomeException ())
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m () -> m ()
forall x. m x -> m x
unmask (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Snocket m sock addr -> sock -> addr -> m ()
forall (m :: * -> *) fd addr.
Snocket m fd addr -> fd -> addr -> m ()
Snocket.connect Snocket m sock addr
sn sock
sock addr
remoteAddr)
(forall x. m x -> m x) -> sock -> Either SomeException () -> m t
k forall x. m x -> m x
unmask sock
sock Either SomeException ()
res)
data ConnectResult =
ConnectSuccess
| ConnectSuccessLast
| ConnectValencyExceeded
deriving (ConnectResult -> ConnectResult -> Bool
(ConnectResult -> ConnectResult -> Bool)
-> (ConnectResult -> ConnectResult -> Bool) -> Eq ConnectResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConnectResult -> ConnectResult -> Bool
$c/= :: ConnectResult -> ConnectResult -> Bool
== :: ConnectResult -> ConnectResult -> Bool
$c== :: ConnectResult -> ConnectResult -> Bool
Eq, Eq ConnectResult
Eq ConnectResult
-> (ConnectResult -> ConnectResult -> Ordering)
-> (ConnectResult -> ConnectResult -> Bool)
-> (ConnectResult -> ConnectResult -> Bool)
-> (ConnectResult -> ConnectResult -> Bool)
-> (ConnectResult -> ConnectResult -> Bool)
-> (ConnectResult -> ConnectResult -> ConnectResult)
-> (ConnectResult -> ConnectResult -> ConnectResult)
-> Ord ConnectResult
ConnectResult -> ConnectResult -> Bool
ConnectResult -> ConnectResult -> Ordering
ConnectResult -> ConnectResult -> ConnectResult
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: ConnectResult -> ConnectResult -> ConnectResult
$cmin :: ConnectResult -> ConnectResult -> ConnectResult
max :: ConnectResult -> ConnectResult -> ConnectResult
$cmax :: ConnectResult -> ConnectResult -> ConnectResult
>= :: ConnectResult -> ConnectResult -> Bool
$c>= :: ConnectResult -> ConnectResult -> Bool
> :: ConnectResult -> ConnectResult -> Bool
$c> :: ConnectResult -> ConnectResult -> Bool
<= :: ConnectResult -> ConnectResult -> Bool
$c<= :: ConnectResult -> ConnectResult -> Bool
< :: ConnectResult -> ConnectResult -> Bool
$c< :: ConnectResult -> ConnectResult -> Bool
compare :: ConnectResult -> ConnectResult -> Ordering
$ccompare :: ConnectResult -> ConnectResult -> Ordering
$cp1Ord :: Eq ConnectResult
Ord, Int -> ConnectResult -> ShowS
[ConnectResult] -> ShowS
ConnectResult -> String
(Int -> ConnectResult -> ShowS)
-> (ConnectResult -> String)
-> ([ConnectResult] -> ShowS)
-> Show ConnectResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConnectResult] -> ShowS
$cshowList :: [ConnectResult] -> ShowS
show :: ConnectResult -> String
$cshow :: ConnectResult -> String
showsPrec :: Int -> ConnectResult -> ShowS
$cshowsPrec :: Int -> ConnectResult -> ShowS
Show)
subscriptionLoop
:: forall m s sock localAddrs addr a x.
( MonadAsync m
, MonadMask m
, MonadTime m
, MonadTimer m
, MonadFix m
, Ord (Async m ())
, Ord addr
)
=> Tracer m (SubscriptionTrace addr)
-> ConnectionTable m addr
-> ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> StateVar m s
-> ThreadsVar m
-> Snocket m sock addr
-> WorkerCallbacks m s addr a x
-> WorkerParams m localAddrs addr
-> (sock -> m a)
-> m Void
subscriptionLoop :: Tracer m (SubscriptionTrace addr)
-> ConnectionTable m addr
-> ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> StateVar m s
-> ThreadsVar m
-> Snocket m sock addr
-> WorkerCallbacks m s addr a x
-> WorkerParams m localAddrs addr
-> (sock -> m a)
-> m Void
subscriptionLoop
Tracer m (SubscriptionTrace addr)
tr ConnectionTable m addr
tbl ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
resQ StateVar m s
sVar ThreadsVar m
threadsVar Snocket m sock addr
snocket
WorkerCallbacks { wcSocketStateChangeTx :: forall (m :: * -> *) s addr a t.
WorkerCallbacks m s addr a t -> SocketStateChange m s addr
wcSocketStateChangeTx = SocketStateChange m s addr
socketStateChangeTx
, wcCompleteApplicationTx :: forall (m :: * -> *) s addr a t.
WorkerCallbacks m s addr a t -> CompleteApplication m s addr a
wcCompleteApplicationTx = CompleteApplication m s addr a
completeApplicationTx
}
WorkerParams { wpLocalAddresses :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
WorkerParams m localAddrs addr -> localAddrs addr
wpLocalAddresses = localAddrs addr
localAddresses
, wpConnectionAttemptDelay :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
WorkerParams m localAddrs addr -> addr -> Maybe DiffTime
wpConnectionAttemptDelay = addr -> Maybe DiffTime
connectionAttemptDelay
, wpSubscriptionTarget :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
WorkerParams m localAddrs addr -> m (SubscriptionTarget m addr)
wpSubscriptionTarget = m (SubscriptionTarget m addr)
subscriptionTargets
, wpValency :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
WorkerParams m localAddrs addr -> Int
wpValency = Int
valency
, addr -> localAddrs addr -> Maybe addr
wpSelectAddress :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
WorkerParams m localAddrs addr
-> addr -> localAddrs addr -> Maybe addr
wpSelectAddress :: addr -> localAddrs addr -> Maybe addr
wpSelectAddress
}
sock -> m a
k = do
ValencyCounter m
valencyVar <- STM m (ValencyCounter m) -> m (ValencyCounter m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (ValencyCounter m) -> m (ValencyCounter m))
-> STM m (ValencyCounter m) -> m (ValencyCounter m)
forall a b. (a -> b) -> a -> b
$ ConnectionTable m addr -> Int -> STM m (ValencyCounter m)
forall (m :: * -> *) addr.
MonadSTM m =>
ConnectionTable m addr -> Int -> STM m (ValencyCounter m)
newValencyCounter ConnectionTable m addr
tbl Int
valency
m () -> m Void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m Void) -> m () -> m Void
forall a b. (a -> b) -> a -> b
$ do
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (Int -> SubscriptionTrace addr
forall addr. Int -> SubscriptionTrace addr
SubscriptionTraceStart Int
valency)
Time
start <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
ThreadsVar m
conThreads <- STM m (ThreadsVar m) -> m (ThreadsVar m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (ThreadsVar m) -> m (ThreadsVar m))
-> STM m (ThreadsVar m) -> m (ThreadsVar m)
forall a b. (a -> b) -> a -> b
$ Set (Async m ()) -> STM m (ThreadsVar m)
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar Set (Async m ())
forall a. Set a
Set.empty
SubscriptionTarget m addr
sTarget <- m (SubscriptionTarget m addr)
subscriptionTargets
ThreadsVar m
-> ValencyCounter m -> SubscriptionTarget m addr -> m ()
innerLoop ThreadsVar m
conThreads ValencyCounter m
valencyVar SubscriptionTarget m addr
sTarget
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ ValencyCounter m -> STM m ()
forall (m :: * -> *). MonadSTM m => ValencyCounter m -> STM m ()
waitValencyCounter ValencyCounter m
valencyVar
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
1
Time
end <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
let duration :: DiffTime
duration = Time -> Time -> DiffTime
diffTime Time
end Time
start
Int
currentValency <- STM m Int -> m Int
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Int -> m Int) -> STM m Int -> m Int
forall a b. (a -> b) -> a -> b
$ ValencyCounter m -> STM m Int
forall (m :: * -> *). MonadSTM m => ValencyCounter m -> STM m Int
readValencyCounter ValencyCounter m
valencyVar
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime -> Int -> Int -> SubscriptionTrace addr
forall addr. DiffTime -> Int -> Int -> SubscriptionTrace addr
SubscriptionTraceRestart DiffTime
duration Int
valency
(Int
valency Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
currentValency)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (DiffTime
duration DiffTime -> DiffTime -> Bool
forall a. Ord a => a -> a -> Bool
< DiffTime
ipRetryDelay) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay (DiffTime -> m ()) -> DiffTime -> m ()
forall a b. (a -> b) -> a -> b
$ DiffTime
ipRetryDelay DiffTime -> DiffTime -> DiffTime
forall a. Num a => a -> a -> a
- DiffTime
duration
where
innerLoop :: StrictTVar m (Set (Async m ()))
-> ValencyCounter m
-> SubscriptionTarget m addr
-> m ()
innerLoop :: ThreadsVar m
-> ValencyCounter m -> SubscriptionTarget m addr -> m ()
innerLoop ThreadsVar m
conThreads ValencyCounter m
valencyVar SubscriptionTarget m addr
sTarget = do
Maybe (addr, SubscriptionTarget m addr)
mt <- SubscriptionTarget m addr
-> m (Maybe (addr, SubscriptionTarget m addr))
forall (m :: * -> *) target.
SubscriptionTarget m target
-> m (Maybe (target, SubscriptionTarget m target))
getSubscriptionTarget SubscriptionTarget m addr
sTarget
case Maybe (addr, SubscriptionTarget m addr)
mt of
Maybe (addr, SubscriptionTarget m addr)
Nothing -> do
Int
len <- (Set (Async m ()) -> Int) -> m (Set (Async m ())) -> m Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Set (Async m ()) -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length (m (Set (Async m ())) -> m Int) -> m (Set (Async m ())) -> m Int
forall a b. (a -> b) -> a -> b
$ STM m (Set (Async m ())) -> m (Set (Async m ()))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Set (Async m ())) -> m (Set (Async m ())))
-> STM m (Set (Async m ())) -> m (Set (Async m ()))
forall a b. (a -> b) -> a -> b
$ ThreadsVar m -> STM m (Set (Async m ()))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar ThreadsVar m
conThreads
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> SubscriptionTrace addr
forall addr. Int -> SubscriptionTrace addr
SubscriptionTraceSubscriptionWaiting Int
len
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Set (Async m ())
activeCons <- ThreadsVar m -> STM m (Set (Async m ()))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar ThreadsVar m
conThreads
Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set (Async m ()) -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null Set (Async m ())
activeCons) STM m ()
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
Int
valencyLeft <- STM m Int -> m Int
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Int -> m Int) -> STM m Int -> m Int
forall a b. (a -> b) -> a -> b
$ ValencyCounter m -> STM m Int
forall (m :: * -> *). MonadSTM m => ValencyCounter m -> STM m Int
readValencyCounter ValencyCounter m
valencyVar
if Int
valencyLeft Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr SubscriptionTrace addr
forall addr. SubscriptionTrace addr
SubscriptionTraceSubscriptionRunning
else Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr SubscriptionTrace addr
forall addr. SubscriptionTrace addr
SubscriptionTraceSubscriptionFailed
Just (addr
remoteAddr, SubscriptionTarget m addr
sTargetNext) -> do
Int
valencyLeft <- STM m Int -> m Int
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Int -> m Int) -> STM m Int -> m Int
forall a b. (a -> b) -> a -> b
$ ValencyCounter m -> STM m Int
forall (m :: * -> *). MonadSTM m => ValencyCounter m -> STM m Int
readValencyCounter ValencyCounter m
valencyVar
if Int
valencyLeft Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr SubscriptionTrace addr
forall addr. SubscriptionTrace addr
SubscriptionTraceSubscriptionRunning
else ThreadsVar m
-> ValencyCounter m -> addr -> SubscriptionTarget m addr -> m ()
innerStep ThreadsVar m
conThreads ValencyCounter m
valencyVar addr
remoteAddr SubscriptionTarget m addr
sTargetNext
innerStep :: StrictTVar m (Set (Async m ()))
-> ValencyCounter m
-> addr
-> SubscriptionTarget m addr
-> m ()
innerStep :: ThreadsVar m
-> ValencyCounter m -> addr -> SubscriptionTarget m addr -> m ()
innerStep ThreadsVar m
conThreads ValencyCounter m
valencyVar !addr
remoteAddr SubscriptionTarget m addr
sTargetNext = do
ConnectionTableRef
r <- ConnectionTable m addr
-> addr -> ValencyCounter m -> m ConnectionTableRef
forall (m :: * -> *) addr.
(MonadSTM m, Ord addr) =>
ConnectionTable m addr
-> addr -> ValencyCounter m -> m ConnectionTableRef
refConnection ConnectionTable m addr
tbl addr
remoteAddr ValencyCounter m
valencyVar
case ConnectionTableRef
r of
ConnectionTableRef
ConnectionTableCreate ->
case addr -> localAddrs addr -> Maybe addr
wpSelectAddress addr
remoteAddr localAddrs addr
localAddresses of
Maybe addr
Nothing ->
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (addr -> SubscriptionTrace addr
forall addr. addr -> SubscriptionTrace addr
SubscriptionTraceUnsupportedRemoteAddr addr
remoteAddr)
Just addr
localAddr ->
do rec
Async m ()
thread <- m () -> m (Async m ())
forall (m :: * -> *) a. MonadAsync m => m a -> m (Async m a)
async (m () -> m (Async m ())) -> m () -> m (Async m ())
forall a b. (a -> b) -> a -> b
$ do
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> SubscriptionTrace addr
forall addr. addr -> SubscriptionTrace addr
SubscriptionTraceConnectStart addr
remoteAddr
Snocket m sock addr
-> addr
-> addr
-> m ()
-> m ()
-> ((forall x. m x -> m x)
-> sock -> Either SomeException () -> m ())
-> m ()
forall (m :: * -> *) sock addr t.
(MonadThrow m, MonadMask m) =>
Snocket m sock addr
-> addr
-> addr
-> m ()
-> m ()
-> ((forall x. m x -> m x)
-> sock -> Either SomeException () -> m t)
-> m t
safeConnect
Snocket m sock addr
snocket
addr
remoteAddr
addr
localAddr
(do
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> SubscriptionTrace addr
forall addr. addr -> SubscriptionTrace addr
SubscriptionTraceAllocateSocket addr
remoteAddr
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ThreadsVar m -> (Set (Async m ()) -> Set (Async m ())) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar ThreadsVar m
conThreads (Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
Set.insert Async m ()
thread)
ThreadsVar m -> (Set (Async m ()) -> Set (Async m ())) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar ThreadsVar m
threadsVar (Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
Set.insert Async m ()
thread)
StateVar m s -> STM m s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar m s
sVar
STM m s -> (s -> STM m s) -> STM m s
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SocketStateChange m s addr
socketStateChangeTx (addr -> Async m () -> SocketState m addr
forall (m :: * -> *) addr. addr -> Async m () -> SocketState m addr
CreatedSocket addr
remoteAddr Async m ()
thread)
STM m s -> (s -> STM m ()) -> STM m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (StateVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StateVar m s
sVar (s -> STM m ()) -> s -> STM m ()
forall a b. (a -> b) -> a -> b
$!))
(do
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ThreadsVar m -> (Set (Async m ()) -> Set (Async m ())) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar ThreadsVar m
threadsVar (Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
Set.delete Async m ()
thread)
StateVar m s -> STM m s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar m s
sVar
STM m s -> (s -> STM m s) -> STM m s
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SocketStateChange m s addr
socketStateChangeTx (addr -> Async m () -> SocketState m addr
forall (m :: * -> *) addr. addr -> Async m () -> SocketState m addr
ClosedSocket addr
remoteAddr Async m ()
thread)
STM m s -> (s -> STM m ()) -> STM m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (StateVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StateVar m s
sVar (s -> STM m ()) -> s -> STM m ()
forall a b. (a -> b) -> a -> b
$!)
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> SubscriptionTrace addr
forall addr. addr -> SubscriptionTrace addr
SubscriptionTraceCloseSocket addr
remoteAddr)
(Async m ()
-> ThreadsVar m
-> ValencyCounter m
-> addr
-> (forall x. m x -> m x)
-> sock
-> Either SomeException ()
-> m ()
connAction
Async m ()
thread ThreadsVar m
conThreads ValencyCounter m
valencyVar
addr
remoteAddr)
let delay :: DiffTime
delay = case addr -> Maybe DiffTime
connectionAttemptDelay addr
remoteAddr of
Just DiffTime
d -> DiffTime
d DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
`max` DiffTime
minConnectionAttemptDelay
DiffTime -> DiffTime -> DiffTime
forall a. Ord a => a -> a -> a
`min` DiffTime
maxConnectionAttemptDelay
Maybe DiffTime
Nothing -> DiffTime
defaultConnectionAttemptDelay
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr
(DiffTime -> SubscriptionTrace addr
forall addr. DiffTime -> SubscriptionTrace addr
SubscriptionTraceSubscriptionWaitingNewConnection DiffTime
delay)
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
delay
ConnectionTableRef
ConnectionTableExist ->
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> SubscriptionTrace addr
forall addr. addr -> SubscriptionTrace addr
SubscriptionTraceConnectionExist addr
remoteAddr
ConnectionTableRef
ConnectionTableDuplicate -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ThreadsVar m
-> ValencyCounter m -> SubscriptionTarget m addr -> m ()
innerLoop ThreadsVar m
conThreads ValencyCounter m
valencyVar SubscriptionTarget m addr
sTargetNext
connAction :: Async m ()
-> StrictTVar m (Set (Async m ()))
-> ValencyCounter m
-> addr
-> (forall y. m y -> m y)
-> sock
-> Either SomeException ()
-> m ()
connAction :: Async m ()
-> ThreadsVar m
-> ValencyCounter m
-> addr
-> (forall x. m x -> m x)
-> sock
-> Either SomeException ()
-> m ()
connAction Async m ()
thread ThreadsVar m
conThreads ValencyCounter m
valencyVar addr
remoteAddr forall x. m x -> m x
unmask sock
sock Either SomeException ()
connectionRes = do
addr
localAddr <- Snocket m sock addr -> sock -> m addr
forall (m :: * -> *) fd addr. Snocket m fd addr -> fd -> m addr
Snocket.getLocalAddr Snocket m sock addr
snocket sock
sock
Time
t <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
case Either SomeException ()
connectionRes of
Left (SomeException e
e) -> do
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> e -> SubscriptionTrace addr
forall addr e. Exception e => addr -> e -> SubscriptionTrace addr
SubscriptionTraceConnectException addr
remoteAddr e
e
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
ThreadsVar m -> (Set (Async m ()) -> Set (Async m ())) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar ThreadsVar m
conThreads (Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
Set.delete Async m ()
thread)
CompleteApplicationResult
{ s
carState :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> s
carState :: s
carState
, Set (Async m ())
carThreads :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> Set (Async m ())
carThreads :: Set (Async m ())
carThreads
, Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s
-> Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: Maybe (WithAddr addr ErrorPolicyTrace)
carTrace
} <- StateVar m s -> STM m s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar m s
sVar STM m s
-> (s -> STM m (CompleteApplicationResult m addr s))
-> STM m (CompleteApplicationResult m addr s)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= CompleteApplication m s addr a
completeApplicationTx (Time -> addr -> e -> Result addr a
forall e addr r. Exception e => Time -> addr -> e -> Result addr r
ConnectionError Time
t addr
remoteAddr e
e)
StateVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StateVar m s
sVar s
carState
ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
resQ (Set (Async m ())
-> Maybe (WithAddr addr ErrorPolicyTrace)
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a
forall (m :: * -> *) addr tr r.
Set (Async m ()) -> Maybe tr -> ResOrAct m addr tr r
Act Set (Async m ())
carThreads Maybe (WithAddr addr ErrorPolicyTrace)
carTrace)
Right ()
_ -> do
ConnectResult
connRes <- STM m ConnectResult -> m ConnectResult
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ConnectResult -> m ConnectResult)
-> STM m ConnectResult -> m ConnectResult
forall a b. (a -> b) -> a -> b
$ do
ThreadsVar m -> (Set (Async m ()) -> Set (Async m ())) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar ThreadsVar m
conThreads (Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
Set.delete Async m ()
thread)
Int
v <- ValencyCounter m -> STM m Int
forall (m :: * -> *). MonadSTM m => ValencyCounter m -> STM m Int
readValencyCounter ValencyCounter m
valencyVar
if Int
v Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then do
ConnectionTable m addr
-> addr -> addr -> Maybe (ValencyCounter m) -> STM m ()
forall (m :: * -> *) addr.
(MonadSTM m, Ord addr) =>
ConnectionTable m addr
-> addr -> addr -> Maybe (ValencyCounter m) -> STM m ()
addConnection ConnectionTable m addr
tbl addr
remoteAddr addr
localAddr (ValencyCounter m -> Maybe (ValencyCounter m)
forall a. a -> Maybe a
Just ValencyCounter m
valencyVar)
CompleteApplicationResult
{ s
carState :: s
carState :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> s
carState
, Set (Async m ())
carThreads :: Set (Async m ())
carThreads :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> Set (Async m ())
carThreads
, Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s
-> Maybe (WithAddr addr ErrorPolicyTrace)
carTrace
} <- StateVar m s -> STM m s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar m s
sVar STM m s
-> (s -> STM m (CompleteApplicationResult m addr s))
-> STM m (CompleteApplicationResult m addr s)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= CompleteApplication m s addr a
completeApplicationTx (Time -> addr -> Result addr a
forall addr r. Time -> addr -> Result addr r
Connected Time
t addr
remoteAddr)
StateVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StateVar m s
sVar s
carState
ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
resQ (Set (Async m ())
-> Maybe (WithAddr addr ErrorPolicyTrace)
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a
forall (m :: * -> *) addr tr r.
Set (Async m ()) -> Maybe tr -> ResOrAct m addr tr r
Act Set (Async m ())
carThreads Maybe (WithAddr addr ErrorPolicyTrace)
carTrace)
ConnectResult -> STM m ConnectResult
forall (m :: * -> *) a. Monad m => a -> m a
return (ConnectResult -> STM m ConnectResult)
-> ConnectResult -> STM m ConnectResult
forall a b. (a -> b) -> a -> b
$ if Int
v Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then ConnectResult
ConnectSuccessLast
else ConnectResult
ConnectSuccess
else
ConnectResult -> STM m ConnectResult
forall (m :: * -> *) a. Monad m => a -> m a
return ConnectResult
ConnectValencyExceeded
Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> ConnectResult -> SubscriptionTrace addr
forall addr. addr -> ConnectResult -> SubscriptionTrace addr
SubscriptionTraceConnectEnd addr
remoteAddr ConnectResult
connRes
case ConnectResult
connRes of
ConnectResult
ConnectValencyExceeded -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
ConnectResult
_ -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectResult
connRes ConnectResult -> ConnectResult -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectResult
ConnectSuccessLast) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Set (Async m ())
threads <- STM m (Set (Async m ())) -> m (Set (Async m ()))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Set (Async m ())) -> m (Set (Async m ())))
-> STM m (Set (Async m ())) -> m (Set (Async m ()))
forall a b. (a -> b) -> a -> b
$ ThreadsVar m -> STM m (Set (Async m ()))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar ThreadsVar m
conThreads
(Async m () -> m ()) -> Set (Async m ()) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Async m ()
tid ->
Async m () -> SubscriberError -> m ()
forall (m :: * -> *) e a.
(MonadAsync m, Exception e) =>
Async m a -> e -> m ()
cancelWith Async m ()
tid
(SubscriberErrorType -> String -> CallStack -> SubscriberError
SubscriberError
SubscriberErrorType
SubscriberParallelConnectionCancelled
String
"Parallel connection cancelled"
CallStack
HasCallStack => CallStack
callStack)
)Set (Async m ())
threads
Either SomeException a
appRes :: Either SomeException a
<- m a -> m (Either SomeException a)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (m a -> m (Either SomeException a))
-> m a -> m (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ m a -> m a
forall x. m x -> m x
unmask (sock -> m a
k sock
sock)
case Either SomeException a
appRes of
Right a
_ -> () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left SomeException
e -> Tracer m (SubscriptionTrace addr) -> SubscriptionTrace addr -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (SubscriptionTrace addr)
tr (SubscriptionTrace addr -> m ()) -> SubscriptionTrace addr -> m ()
forall a b. (a -> b) -> a -> b
$ addr -> SomeException -> SubscriptionTrace addr
forall addr e. Exception e => addr -> e -> SubscriptionTrace addr
SubscriptionTraceApplicationException addr
remoteAddr SomeException
e
Time
t' <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
case Either SomeException a
appRes of
Right a
a ->
ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
resQ (Result addr a -> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a
forall (m :: * -> *) addr tr r.
Result addr r -> ResOrAct m addr tr r
Res (Time -> addr -> a -> Result addr a
forall addr r. Time -> addr -> r -> Result addr r
ApplicationResult Time
t' addr
remoteAddr a
a))
Left (SomeException e
e) ->
ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
resQ (Result addr a -> ResOrAct m addr (WithAddr addr ErrorPolicyTrace) a
forall (m :: * -> *) addr tr r.
Result addr r -> ResOrAct m addr tr r
Res (Time -> addr -> e -> Result addr a
forall e addr r. Exception e => Time -> addr -> e -> Result addr r
ApplicationError Time
t' addr
remoteAddr e
e))
ConnectionTable m addr -> addr -> addr -> STM m ()
forall (m :: * -> *) addr.
(MonadSTM m, Ord addr) =>
ConnectionTable m addr -> addr -> addr -> STM m ()
removeConnectionSTM ConnectionTable m addr
tbl addr
remoteAddr addr
localAddr
mainLoop
:: forall s r addr t.
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
mainLoop :: Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
mainLoop Tracer IO (WithAddr addr ErrorPolicyTrace)
errorPolicyTracer ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
resQ ThreadsVar IO
threadsVar StateVar IO s
statusVar CompleteApplication IO s addr r
completeApplicationTx Main IO s t
main = do
IO (IO t) -> IO t
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (STM IO (IO t) -> IO (IO t)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO (IO t) -> IO (IO t)) -> STM IO (IO t) -> IO (IO t)
forall a b. (a -> b) -> a -> b
$ STM (IO t)
STM IO (IO t)
mainTx STM (IO t) -> STM (IO t) -> STM (IO t)
forall a. STM a -> STM a -> STM a
`STM.orElse` STM (IO t)
STM IO (IO t)
connectionTx)
where
mainTx :: STM IO (IO t)
mainTx :: STM IO (IO t)
mainTx = do
t
t <- StateVar IO s -> STM IO s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar IO s
statusVar STM s -> (s -> STM t) -> STM t
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= s -> STM t
Main IO s t
main
IO t -> STM (IO t)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO t -> STM (IO t)) -> IO t -> STM (IO t)
forall a b. (a -> b) -> a -> b
$ t -> IO t
forall (f :: * -> *) a. Applicative f => a -> f a
pure t
t
connectionTx :: STM IO (IO t)
connectionTx :: STM IO (IO t)
connectionTx = do
ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) r
result <- TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) r)
-> STM (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) r)
forall a. TQueue a -> STM a
STM.readTQueue ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) r)
resQ
case ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) r
result of
Act Set (Async IO ())
threads Maybe (WithAddr addr ErrorPolicyTrace)
tr -> IO t -> STM (IO t)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO t -> STM (IO t)) -> IO t -> STM (IO t)
forall a b. (a -> b) -> a -> b
$ do
(Async () -> IO ()) -> Set (Async ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Async () -> IO ()
forall (m :: * -> *) a. MonadAsync m => Async m a -> m ()
cancel Set (Async ())
Set (Async IO ())
threads
(WithAddr addr ErrorPolicyTrace -> IO ())
-> Maybe (WithAddr addr ErrorPolicyTrace) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Tracer IO (WithAddr addr ErrorPolicyTrace)
-> WithAddr addr ErrorPolicyTrace -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (WithAddr addr ErrorPolicyTrace)
errorPolicyTracer) Maybe (WithAddr addr ErrorPolicyTrace)
tr
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
forall s r addr t.
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
mainLoop Tracer IO (WithAddr addr ErrorPolicyTrace)
errorPolicyTracer ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
resQ ThreadsVar IO
threadsVar StateVar IO s
statusVar CompleteApplication IO s addr r
completeApplicationTx Main IO s t
main
Res Result addr r
r -> do
s
s <- StateVar IO s -> STM IO s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StateVar IO s
statusVar
CompleteApplicationResult
{ s
carState :: s
carState :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> s
carState
, Set (Async IO ())
carThreads :: Set (Async IO ())
carThreads :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s -> Set (Async m ())
carThreads
, Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: Maybe (WithAddr addr ErrorPolicyTrace)
carTrace :: forall (m :: * -> *) addr s.
CompleteApplicationResult m addr s
-> Maybe (WithAddr addr ErrorPolicyTrace)
carTrace
} <- CompleteApplication IO s addr r
completeApplicationTx Result addr r
r s
s
StateVar IO s -> s -> STM IO ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StateVar IO s
statusVar s
carState
IO t -> STM (IO t)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (IO t -> STM (IO t)) -> IO t -> STM (IO t)
forall a b. (a -> b) -> a -> b
$ do
(Async () -> IO ()) -> Set (Async ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Async () -> IO ()
forall (m :: * -> *) a. MonadAsync m => Async m a -> m ()
cancel Set (Async ())
Set (Async IO ())
carThreads
(WithAddr addr ErrorPolicyTrace -> IO ())
-> Maybe (WithAddr addr ErrorPolicyTrace) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Tracer IO (WithAddr addr ErrorPolicyTrace)
-> WithAddr addr ErrorPolicyTrace -> IO ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer IO (WithAddr addr ErrorPolicyTrace)
errorPolicyTracer) Maybe (WithAddr addr ErrorPolicyTrace)
carTrace
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
forall s r addr t.
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
mainLoop Tracer IO (WithAddr addr ErrorPolicyTrace)
errorPolicyTracer ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
resQ ThreadsVar IO
threadsVar StateVar IO s
statusVar CompleteApplication IO s addr r
completeApplicationTx Main IO s t
main
data WorkerCallbacks m s addr a t = WorkerCallbacks {
WorkerCallbacks m s addr a t -> SocketStateChange m s addr
wcSocketStateChangeTx :: SocketStateChange m s addr,
WorkerCallbacks m s addr a t -> CompleteApplication m s addr a
wcCompleteApplicationTx :: CompleteApplication m s addr a,
WorkerCallbacks m s addr a t -> Main m s t
wcMainTx :: Main m s t
}
data WorkerParams m localAddrs addr = WorkerParams {
WorkerParams m localAddrs addr -> localAddrs addr
wpLocalAddresses :: localAddrs addr,
WorkerParams m localAddrs addr
-> addr -> localAddrs addr -> Maybe addr
wpSelectAddress :: addr -> localAddrs addr -> Maybe addr,
WorkerParams m localAddrs addr -> addr -> Maybe DiffTime
wpConnectionAttemptDelay :: addr -> Maybe DiffTime,
WorkerParams m localAddrs addr -> m (SubscriptionTarget m addr)
wpSubscriptionTarget :: m (SubscriptionTarget m addr),
WorkerParams m localAddrs addr -> Int
wpValency :: Int
}
worker
:: forall s sock localAddrs addr a x.
Ord addr
=> Tracer IO (SubscriptionTrace addr)
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ConnectionTable IO addr
-> StateVar IO s
-> Snocket IO sock addr
-> WorkerCallbacks IO s addr a x
-> WorkerParams IO localAddrs addr
-> (sock -> IO a)
-> IO x
worker :: Tracer IO (SubscriptionTrace addr)
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ConnectionTable IO addr
-> StateVar IO s
-> Snocket IO sock addr
-> WorkerCallbacks IO s addr a x
-> WorkerParams IO localAddrs addr
-> (sock -> IO a)
-> IO x
worker Tracer IO (SubscriptionTrace addr)
tr Tracer IO (WithAddr addr ErrorPolicyTrace)
errTrace ConnectionTable IO addr
tbl StateVar IO s
sVar Snocket IO sock addr
snocket workerCallbacks :: WorkerCallbacks IO s addr a x
workerCallbacks@WorkerCallbacks {CompleteApplication IO s addr a
wcCompleteApplicationTx :: CompleteApplication IO s addr a
wcCompleteApplicationTx :: forall (m :: * -> *) s addr a t.
WorkerCallbacks m s addr a t -> CompleteApplication m s addr a
wcCompleteApplicationTx, Main IO s x
wcMainTx :: Main IO s x
wcMainTx :: forall (m :: * -> *) s addr a t.
WorkerCallbacks m s addr a t -> Main m s t
wcMainTx } WorkerParams IO localAddrs addr
workerParams sock -> IO a
k = do
TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) a)
resQ <- IO (TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) a))
forall (m :: * -> *) addr tr r.
MonadSTM m =>
m (ResultQ m addr tr r)
newResultQ
StrictTVar IO (Set (Async ()))
threadsVar <- STM IO (StrictTVar IO (Set (Async ())))
-> IO (StrictTVar IO (Set (Async ())))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM IO (StrictTVar IO (Set (Async ())))
-> IO (StrictTVar IO (Set (Async ()))))
-> STM IO (StrictTVar IO (Set (Async ())))
-> IO (StrictTVar IO (Set (Async ())))
forall a b. (a -> b) -> a -> b
$ Set (Async ()) -> STM IO (StrictTVar IO (Set (Async ())))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar Set (Async ())
forall a. Set a
Set.empty
IO Void -> (Async IO Void -> IO x) -> IO x
forall (m :: * -> *) a b.
MonadAsync m =>
m a -> (Async m a -> m b) -> m b
withAsync
(Tracer IO (SubscriptionTrace addr)
-> ConnectionTable IO addr
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) a
-> StateVar IO s
-> ThreadsVar IO
-> Snocket IO sock addr
-> WorkerCallbacks IO s addr a x
-> WorkerParams IO localAddrs addr
-> (sock -> IO a)
-> IO Void
forall (m :: * -> *) s sock (localAddrs :: * -> *) addr a x.
(MonadAsync m, MonadMask m, MonadTime m, MonadTimer m, MonadFix m,
Ord (Async m ()), Ord addr) =>
Tracer m (SubscriptionTrace addr)
-> ConnectionTable m addr
-> ResultQ m addr (WithAddr addr ErrorPolicyTrace) a
-> StateVar m s
-> ThreadsVar m
-> Snocket m sock addr
-> WorkerCallbacks m s addr a x
-> WorkerParams m localAddrs addr
-> (sock -> m a)
-> m Void
subscriptionLoop Tracer IO (SubscriptionTrace addr)
tr ConnectionTable IO addr
tbl ResultQ IO addr (WithAddr addr ErrorPolicyTrace) a
TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) a)
resQ StateVar IO s
sVar StrictTVar IO (Set (Async ()))
ThreadsVar IO
threadsVar Snocket IO sock addr
snocket
WorkerCallbacks IO s addr a x
workerCallbacks WorkerParams IO localAddrs addr
workerParams sock -> IO a
k) ((Async IO Void -> IO x) -> IO x)
-> (Async IO Void -> IO x) -> IO x
forall a b. (a -> b) -> a -> b
$ \Async IO Void
_ ->
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) a
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr a
-> Main IO s x
-> IO x
forall s r addr t.
Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ResultQ IO addr (WithAddr addr ErrorPolicyTrace) r
-> ThreadsVar IO
-> StateVar IO s
-> CompleteApplication IO s addr r
-> Main IO s t
-> IO t
mainLoop Tracer IO (WithAddr addr ErrorPolicyTrace)
errTrace ResultQ IO addr (WithAddr addr ErrorPolicyTrace) a
TQueue (ResOrAct IO addr (WithAddr addr ErrorPolicyTrace) a)
resQ StrictTVar IO (Set (Async ()))
ThreadsVar IO
threadsVar StateVar IO s
sVar CompleteApplication IO s addr a
wcCompleteApplicationTx Main IO s x
wcMainTx
IO x -> IO () -> IO x
forall (m :: * -> *) a b. MonadThrow m => m a -> m b -> m a
`finally` ThreadsVar IO -> IO ()
forall (m :: * -> *) (t :: * -> *) a.
(Foldable t, MonadAsync m) =>
StrictTVar m (t (Async m a)) -> m ()
killThreads StrictTVar IO (Set (Async ()))
ThreadsVar IO
threadsVar
where
killThreads :: StrictTVar m (t (Async m a)) -> m ()
killThreads StrictTVar m (t (Async m a))
threadsVar = do
let e :: SubscriberError
e = SubscriberErrorType -> String -> CallStack -> SubscriberError
SubscriberError
SubscriberErrorType
SubscriberWorkerCancelled
String
"SubscriptionWorker exiting"
CallStack
HasCallStack => CallStack
callStack
t (Async m a)
children <- STM m (t (Async m a)) -> m (t (Async m a))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (t (Async m a)) -> m (t (Async m a)))
-> STM m (t (Async m a)) -> m (t (Async m a))
forall a b. (a -> b) -> a -> b
$ StrictTVar m (t (Async m a)) -> STM m (t (Async m a))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (t (Async m a))
threadsVar
(Async m a -> m ()) -> t (Async m a) -> m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Async m a
a -> Async m a -> SubscriberError -> m ()
forall (m :: * -> *) e a.
(MonadAsync m, Exception e) =>
Async m a -> e -> m ()
cancelWith Async m a
a SubscriberError
e) t (Async m a)
children
data SubscriberError = SubscriberError {
SubscriberError -> SubscriberErrorType
seType :: !SubscriberErrorType
, SubscriberError -> String
seMessage :: !String
, SubscriberError -> CallStack
seStack :: !CallStack
} deriving Int -> SubscriberError -> ShowS
[SubscriberError] -> ShowS
SubscriberError -> String
(Int -> SubscriberError -> ShowS)
-> (SubscriberError -> String)
-> ([SubscriberError] -> ShowS)
-> Show SubscriberError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SubscriberError] -> ShowS
$cshowList :: [SubscriberError] -> ShowS
show :: SubscriberError -> String
$cshow :: SubscriberError -> String
showsPrec :: Int -> SubscriberError -> ShowS
$cshowsPrec :: Int -> SubscriberError -> ShowS
Show
data SubscriberErrorType = SubscriberParallelConnectionCancelled
| SubscriberWorkerCancelled
deriving (SubscriberErrorType -> SubscriberErrorType -> Bool
(SubscriberErrorType -> SubscriberErrorType -> Bool)
-> (SubscriberErrorType -> SubscriberErrorType -> Bool)
-> Eq SubscriberErrorType
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SubscriberErrorType -> SubscriberErrorType -> Bool
$c/= :: SubscriberErrorType -> SubscriberErrorType -> Bool
== :: SubscriberErrorType -> SubscriberErrorType -> Bool
$c== :: SubscriberErrorType -> SubscriberErrorType -> Bool
Eq, Int -> SubscriberErrorType -> ShowS
[SubscriberErrorType] -> ShowS
SubscriberErrorType -> String
(Int -> SubscriberErrorType -> ShowS)
-> (SubscriberErrorType -> String)
-> ([SubscriberErrorType] -> ShowS)
-> Show SubscriberErrorType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SubscriberErrorType] -> ShowS
$cshowList :: [SubscriberErrorType] -> ShowS
show :: SubscriberErrorType -> String
$cshow :: SubscriberErrorType -> String
showsPrec :: Int -> SubscriberErrorType -> ShowS
$cshowsPrec :: Int -> SubscriberErrorType -> ShowS
Show)
instance Exception SubscriberError where
displayException :: SubscriberError -> String
displayException SubscriberError{SubscriberErrorType
seType :: SubscriberErrorType
seType :: SubscriberError -> SubscriberErrorType
seType, String
seMessage :: String
seMessage :: SubscriberError -> String
seMessage, CallStack
seStack :: CallStack
seStack :: SubscriberError -> CallStack
seStack}
= String -> String -> String -> ShowS
forall r. PrintfType r => String -> r
printf String
"%s %s at %s"
(SubscriberErrorType -> String
forall a. Show a => a -> String
show SubscriberErrorType
seType)
(ShowS
forall a. Show a => a -> String
show String
seMessage)
(CallStack -> String
prettyCallStack CallStack
seStack)
data SubscriptionTrace addr =
SubscriptionTraceConnectStart addr
| SubscriptionTraceConnectEnd addr ConnectResult
| forall e. Exception e => SubscriptionTraceSocketAllocationException addr e
| forall e. Exception e => SubscriptionTraceConnectException addr e
| forall e. Exception e => SubscriptionTraceApplicationException addr e
| SubscriptionTraceTryConnectToPeer addr
| SubscriptionTraceSkippingPeer addr
| SubscriptionTraceSubscriptionRunning
| SubscriptionTraceSubscriptionWaiting Int
| SubscriptionTraceSubscriptionFailed
| SubscriptionTraceSubscriptionWaitingNewConnection DiffTime
| SubscriptionTraceStart Int
| SubscriptionTraceRestart DiffTime Int Int
| SubscriptionTraceConnectionExist addr
| SubscriptionTraceUnsupportedRemoteAddr addr
| SubscriptionTraceMissingLocalAddress
| SubscriptionTraceAllocateSocket addr
| SubscriptionTraceCloseSocket addr
instance Show addr => Show (SubscriptionTrace addr) where
show :: SubscriptionTrace addr -> String
show (SubscriptionTraceConnectStart addr
dst) =
String
"Connection Attempt Start, destination " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst
show (SubscriptionTraceConnectEnd addr
dst ConnectResult
res) =
String
"Connection Attempt End, destination " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" outcome: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ ConnectResult -> String
forall a. Show a => a -> String
show ConnectResult
res
show (SubscriptionTraceSocketAllocationException addr
dst e
e) =
String
"Socket Allocation Exception, destination " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" exception: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ e -> String
forall a. Show a => a -> String
show e
e
show (SubscriptionTraceConnectException addr
dst e
e) =
String
"Connection Attempt Exception, destination " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" exception: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ e -> String
forall a. Show a => a -> String
show e
e
show (SubscriptionTraceTryConnectToPeer addr
addr) =
String
"Trying to connect to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
addr
show (SubscriptionTraceSkippingPeer addr
addr) =
String
"Skipping peer " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
addr
show SubscriptionTrace addr
SubscriptionTraceSubscriptionRunning =
String
"Required subscriptions started"
show (SubscriptionTraceSubscriptionWaiting Int
d) =
String
"Waiting on " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
d String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" active connections"
show SubscriptionTrace addr
SubscriptionTraceSubscriptionFailed =
String
"Failed to start all required subscriptions"
show (SubscriptionTraceSubscriptionWaitingNewConnection DiffTime
delay) =
String
"Waiting " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DiffTime -> String
forall a. Show a => a -> String
show DiffTime
delay String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" before attempting a new connection"
show (SubscriptionTraceStart Int
val) = String
"Starting Subscription Worker, valency " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
val
show (SubscriptionTraceRestart DiffTime
duration Int
desiredVal Int
currentVal) =
String
"Restarting Subscription after " String -> ShowS
forall a. [a] -> [a] -> [a]
++ DiffTime -> String
forall a. Show a => a -> String
show DiffTime
duration String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" desired valency " String -> ShowS
forall a. [a] -> [a] -> [a]
++
Int -> String
forall a. Show a => a -> String
show Int
desiredVal String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" current valency " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
currentVal
show (SubscriptionTraceConnectionExist addr
dst) =
String
"Connection Existed to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst
show (SubscriptionTraceUnsupportedRemoteAddr addr
dst) =
String
"Unsupported remote target address " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
dst
show SubscriptionTrace addr
SubscriptionTraceMissingLocalAddress =
String
"Missing local address"
show (SubscriptionTraceApplicationException addr
addr e
e) =
String
"Application Exception: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
addr String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" " String -> ShowS
forall a. [a] -> [a] -> [a]
++ e -> String
forall a. Show a => a -> String
show e
e
show (SubscriptionTraceAllocateSocket addr
addr) =
String
"Allocate socket to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
addr
show (SubscriptionTraceCloseSocket addr
addr) =
String
"Closed socket to " String -> ShowS
forall a. [a] -> [a] -> [a]
++ addr -> String
forall a. Show a => a -> String
show addr
addr