{-# LANGUAGE NamedFieldPuns #-}
module Ouroboros.Network.Subscription.Client
( ClientSubscriptionParams (..)
, clientSubscriptionWorker
) where
import Control.Monad.Class.MonadTime
import Control.Tracer
import Data.Functor.Identity (Identity (..))
import Data.Void (Void)
import Ouroboros.Network.ErrorPolicy (ErrorPolicies, ErrorPolicyTrace,
WithAddr, completeApplicationTx)
import Ouroboros.Network.Snocket (LocalAddress, LocalSnocket,
LocalSocket)
import Ouroboros.Network.Socket (NetworkMutableState (..))
import Ouroboros.Network.Subscription.Ip (mainTx, socketStateChangeTx)
import Ouroboros.Network.Subscription.Subscriber
import Ouroboros.Network.Subscription.Worker
data ClientSubscriptionParams a = ClientSubscriptionParams
{ ClientSubscriptionParams a -> LocalAddress
cspAddress :: !LocalAddress
, ClientSubscriptionParams a -> Maybe DiffTime
cspConnectionAttemptDelay :: !(Maybe DiffTime)
, ClientSubscriptionParams a -> ErrorPolicies
cspErrorPolicies :: !ErrorPolicies
}
clientSubscriptionWorker
:: LocalSnocket
-> Tracer IO (SubscriptionTrace LocalAddress)
-> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
-> NetworkMutableState LocalAddress
-> ClientSubscriptionParams a
-> (LocalSocket -> IO a)
-> IO Void
clientSubscriptionWorker :: LocalSnocket
-> Tracer IO (SubscriptionTrace LocalAddress)
-> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
-> NetworkMutableState LocalAddress
-> ClientSubscriptionParams a
-> (LocalSocket -> IO a)
-> IO Void
clientSubscriptionWorker LocalSnocket
snocket
Tracer IO (SubscriptionTrace LocalAddress)
tracer
Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
errorPolicyTracer
NetworkMutableState { ConnectionTable IO LocalAddress
nmsConnectionTable :: forall addr. NetworkMutableState addr -> ConnectionTable IO addr
nmsConnectionTable :: ConnectionTable IO LocalAddress
nmsConnectionTable, StrictTVar IO (PeerStates IO LocalAddress)
nmsPeerStates :: forall addr.
NetworkMutableState addr -> StrictTVar IO (PeerStates IO addr)
nmsPeerStates :: StrictTVar IO (PeerStates IO LocalAddress)
nmsPeerStates }
ClientSubscriptionParams { LocalAddress
cspAddress :: LocalAddress
cspAddress :: forall a. ClientSubscriptionParams a -> LocalAddress
cspAddress
, Maybe DiffTime
cspConnectionAttemptDelay :: Maybe DiffTime
cspConnectionAttemptDelay :: forall a. ClientSubscriptionParams a -> Maybe DiffTime
cspConnectionAttemptDelay
, ErrorPolicies
cspErrorPolicies :: ErrorPolicies
cspErrorPolicies :: forall a. ClientSubscriptionParams a -> ErrorPolicies
cspErrorPolicies
}
LocalSocket -> IO a
k =
Tracer IO (SubscriptionTrace LocalAddress)
-> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
-> ConnectionTable IO LocalAddress
-> StrictTVar IO (PeerStates IO LocalAddress)
-> LocalSnocket
-> WorkerCallbacks
IO (PeerStates IO LocalAddress) LocalAddress a Void
-> WorkerParams IO Identity LocalAddress
-> (LocalSocket -> IO a)
-> IO Void
forall s sock (localAddrs :: * -> *) addr a x.
Ord addr =>
Tracer IO (SubscriptionTrace addr)
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
-> ConnectionTable IO addr
-> StateVar IO s
-> Snocket IO sock addr
-> WorkerCallbacks IO s addr a x
-> WorkerParams IO localAddrs addr
-> (sock -> IO a)
-> IO x
worker Tracer IO (SubscriptionTrace LocalAddress)
tracer
Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
errorPolicyTracer
ConnectionTable IO LocalAddress
nmsConnectionTable
StrictTVar IO (PeerStates IO LocalAddress)
nmsPeerStates
LocalSnocket
snocket
WorkerCallbacks :: forall (m :: * -> *) s addr a t.
SocketStateChange m s addr
-> CompleteApplication m s addr a
-> Main m s t
-> WorkerCallbacks m s addr a t
WorkerCallbacks
{ wcSocketStateChangeTx :: SocketStateChange IO (PeerStates IO LocalAddress) LocalAddress
wcSocketStateChangeTx = SocketStateChange IO (PeerStates IO LocalAddress) LocalAddress
forall addr.
Ord addr =>
SocketStateChange IO (PeerStates IO addr) addr
socketStateChangeTx
, wcCompleteApplicationTx :: CompleteApplication IO (PeerStates IO LocalAddress) LocalAddress a
wcCompleteApplicationTx = ErrorPolicies
-> CompleteApplication
IO (PeerStates IO LocalAddress) LocalAddress a
forall (m :: * -> *) addr a.
(MonadAsync m, Ord addr, Ord (Async m ())) =>
ErrorPolicies -> CompleteApplication m (PeerStates m addr) addr a
completeApplicationTx ErrorPolicies
cspErrorPolicies
, wcMainTx :: Main IO (PeerStates IO LocalAddress) Void
wcMainTx = Main IO (PeerStates IO LocalAddress) Void
forall (m :: * -> *) addr.
(MonadThrow (STM m), MonadSTM m) =>
Main m (PeerStates m addr) Void
mainTx
}
WorkerParams IO Identity LocalAddress
workerParams
LocalSocket -> IO a
k
where
workerParams :: WorkerParams IO Identity LocalAddress
workerParams :: WorkerParams IO Identity LocalAddress
workerParams = WorkerParams :: forall (m :: * -> *) (localAddrs :: * -> *) addr.
localAddrs addr
-> (addr -> localAddrs addr -> Maybe addr)
-> (addr -> Maybe DiffTime)
-> m (SubscriptionTarget m addr)
-> Int
-> WorkerParams m localAddrs addr
WorkerParams {
wpLocalAddresses :: Identity LocalAddress
wpLocalAddresses = LocalAddress -> Identity LocalAddress
forall a. a -> Identity a
Identity LocalAddress
cspAddress,
wpSelectAddress :: LocalAddress -> Identity LocalAddress -> Maybe LocalAddress
wpSelectAddress = \LocalAddress
_ (Identity LocalAddress
addr) -> LocalAddress -> Maybe LocalAddress
forall a. a -> Maybe a
Just LocalAddress
addr,
wpConnectionAttemptDelay :: LocalAddress -> Maybe DiffTime
wpConnectionAttemptDelay = Maybe DiffTime -> LocalAddress -> Maybe DiffTime
forall a b. a -> b -> a
const Maybe DiffTime
cspConnectionAttemptDelay,
wpSubscriptionTarget :: IO (SubscriptionTarget IO LocalAddress)
wpSubscriptionTarget = SubscriptionTarget IO LocalAddress
-> IO (SubscriptionTarget IO LocalAddress)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ([LocalAddress] -> SubscriptionTarget IO LocalAddress
forall (m :: * -> *) target.
Applicative m =>
[target] -> SubscriptionTarget m target
listSubscriptionTarget [LocalAddress
cspAddress]),
wpValency :: Int
wpValency = Int
1
}