{-# LANGUAGE NamedFieldPuns #-}

-- | Subscription worker for client applications connecting through a
-- 'LocalSnocket', which uses either Unix sockets or Windows named pipes.
--
module Ouroboros.Network.Subscription.Client
  ( ClientSubscriptionParams (..)
  , clientSubscriptionWorker
  ) where

import           Control.Monad.Class.MonadTime
import           Control.Tracer

import           Data.Functor.Identity (Identity (..))
import           Data.Void (Void)

import           Ouroboros.Network.ErrorPolicy (ErrorPolicies, ErrorPolicyTrace,
                     WithAddr, completeApplicationTx)
import           Ouroboros.Network.Snocket (LocalAddress, LocalSnocket,
                     LocalSocket)
import           Ouroboros.Network.Socket (NetworkMutableState (..))
import           Ouroboros.Network.Subscription.Ip (mainTx, socketStateChangeTx)
import           Ouroboros.Network.Subscription.Subscriber
import           Ouroboros.Network.Subscription.Worker


-- | Parameters of the client subscription worker.
--
-- The type parameter @a@ is phantom: it does not occur in any field.  In
-- 'clientSubscriptionWorker' it is the same @a@ as the result type of the
-- application callback.
--
data ClientSubscriptionParams a = ClientSubscriptionParams
  { cspAddress                :: !LocalAddress
  -- ^ unix socket or named pipe address
  , cspConnectionAttemptDelay :: !(Maybe DiffTime)
  -- ^ delay between connection attempts
  , cspErrorPolicies          :: !ErrorPolicies
  -- ^ error policies for subscription worker
  }

-- | Client subscription worker keeps subscribing to the 'LocalAddress' using
-- either unix socket or named pipe.
--
-- This is a thin wrapper around 'worker': it unpacks the mutable network
-- state and the subscription parameters, and builds the 'WorkerCallbacks' and
-- 'WorkerParams' records for a single address.  The result type is
-- @'IO' 'Void'@, so the action never returns a value.
--
clientSubscriptionWorker
    :: LocalSnocket
    -- ^ snocket passed to 'worker' for the connections
    -> Tracer IO (SubscriptionTrace LocalAddress)
    -- ^ subscription tracer
    -> Tracer IO (WithAddr LocalAddress ErrorPolicyTrace)
    -- ^ error policy tracer
    -> NetworkMutableState LocalAddress
    -- ^ connection table and peer states passed to 'worker'
    -> ClientSubscriptionParams a
    -- ^ address, connection attempt delay and error policies
    -> (LocalSocket -> IO a)
    -- ^ application callback, handed a 'LocalSocket' by 'worker'
    -> IO Void
clientSubscriptionWorker snocket
                         tracer
                         errorPolicyTracer
                         NetworkMutableState { nmsConnectionTable, nmsPeerStates }
                         ClientSubscriptionParams { cspAddress
                                                  , cspConnectionAttemptDelay
                                                  , cspErrorPolicies
                                                  }
                         k =
    worker tracer
           errorPolicyTracer
           nmsConnectionTable
           nmsPeerStates
           snocket
           WorkerCallbacks
            { wcSocketStateChangeTx   = socketStateChangeTx
            , wcCompleteApplicationTx = completeApplicationTx cspErrorPolicies
            , wcMainTx                = mainTx
            }
           workerParams
           k
  where
    -- Worker configuration for exactly one address: 'cspAddress' is both the
    -- only local address and the only subscription target.
    workerParams :: WorkerParams IO Identity LocalAddress
    workerParams = WorkerParams {
        wpLocalAddresses         = Identity cspAddress,
        -- the first argument is ignored; the single local address is always
        -- selected
        wpSelectAddress          = \_ (Identity addr) -> Just addr,
        -- the same delay is used regardless of the address
        wpConnectionAttemptDelay = const cspConnectionAttemptDelay,
        wpSubscriptionTarget     = pure (listSubscriptionTarget [cspAddress]),
        wpValency                = 1
      }