{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE NumericUnderscores  #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies        #-}

-- | This is the starting point for a module that will bring together the
-- overall node to node protocol, as a collection of mini-protocols.
--
module Ouroboros.Network.NodeToNode
  ( nodeToNodeProtocols
  , NodeToNodeProtocols (..)
  , MiniProtocolParameters (..)
  , chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits
  , defaultMiniProtocolParameters
  , NodeToNodeVersion (..)
  , NodeToNodeVersionData (..)
  , NetworkConnectTracers (..)
  , nullNetworkConnectTracers
  , connectTo
  , NetworkServerTracers (..)
  , nullNetworkServerTracers
  , NetworkMutableState (..)
  , AcceptedConnectionsLimit (..)
  , newNetworkMutableState
  , newNetworkMutableStateSTM
  , cleanNetworkMutableState
  , withServer
    -- * P2P Governor
  , DomainAccessPoint (..)
  , PeerAdvertise (..)
  , PeerSelectionTargets (..)
    -- * Subscription Workers
    -- ** IP subscription worker
  , IPSubscriptionTarget (..)
  , NetworkIPSubscriptionTracers
  , NetworkSubscriptionTracers (..)
  , nullNetworkSubscriptionTracers
  , SubscriptionParams (..)
  , IPSubscriptionParams
  , ipSubscriptionWorker
    -- ** DNS subscription worker
  , DnsSubscriptionTarget (..)
  , DnsSubscriptionParams
  , NetworkDNSSubscriptionTracers (..)
  , nullNetworkDNSSubscriptionTracers
  , dnsSubscriptionWorker
    -- ** Versions
  , Versions (..)
  , DiffusionMode (..)
  , simpleSingletonVersions
  , foldMapVersions
  , combineVersions
    -- *** Codecs
  , nodeToNodeHandshakeCodec
  , nodeToNodeVersionCodec
  , nodeToNodeCodecCBORTerm
    -- * Re-exports
  , ConnectionId (..)
  , RemoteAddress
  , RemoteConnectionId
  , ProtocolLimitFailure
  , Handshake
  , LocalAddresses (..)
  , Socket
  , isPipeliningEnabled
    -- ** Error Policies and Peer state
  , ErrorPolicies (..)
  , remoteNetworkErrorPolicy
  , localNetworkErrorPolicy
  , nullErrorPolicies
  , ErrorPolicy (..)
  , SuspendDecision (..)
    -- ** Traces
  , AcceptConnectionsPolicyTrace (..)
  , TraceSendRecv (..)
  , SubscriptionTrace (..)
  , DnsTrace (..)
  , ErrorPolicyTrace (..)
  , WithIPList (..)
  , WithDomainName (..)
  , WithAddr (..)
  , HandshakeTr
    -- * For Consensus ThreadNet Tests
  , chainSyncMiniProtocolNum
  , blockFetchMiniProtocolNum
  , txSubmissionMiniProtocolNum
  , keepAliveMiniProtocolNum
  ) where

import qualified Control.Concurrent.Async as Async
import           Control.Exception (IOException)
import           Control.Monad.Class.MonadST
import           Control.Monad.Class.MonadSTM
import           Control.Monad.Class.MonadTime (DiffTime)

import qualified Codec.CBOR.Read as CBOR
import qualified Codec.CBOR.Term as CBOR
import qualified Data.ByteString.Lazy as BL
import           Data.Void (Void)
import           Data.Word
import           Network.Mux (WithMuxBearer (..))
import           Network.Mux.Types (MuxRuntimeError (..))
import           Network.Socket (Socket)
import qualified Network.Socket as Socket

import           Network.TypedProtocol.Codec.CBOR

import           Ouroboros.Network.BlockFetch.Client (BlockFetchProtocolFailure)
import           Ouroboros.Network.Driver (TraceSendRecv (..))
import           Ouroboros.Network.Driver.Limits (ProtocolLimitFailure (..))
import           Ouroboros.Network.Driver.Simple (DecoderFailure)
import           Ouroboros.Network.ErrorPolicy
import           Ouroboros.Network.IOManager
import           Ouroboros.Network.Mux
import           Ouroboros.Network.NodeToNode.Version
import           Ouroboros.Network.PeerSelection.Governor.Types
                     (PeerSelectionTargets (..))
import           Ouroboros.Network.PeerSelection.RootPeersDNS
                     (DomainAccessPoint (..))
import           Ouroboros.Network.PeerSelection.Types (PeerAdvertise (..))
import           Ouroboros.Network.Protocol.Handshake.Codec
import           Ouroboros.Network.Protocol.Handshake.Type
import           Ouroboros.Network.Protocol.Handshake.Version hiding (Accept)
import           Ouroboros.Network.Snocket
import           Ouroboros.Network.Socket
import           Ouroboros.Network.Subscription.Dns (DnsSubscriptionParams,
                     DnsSubscriptionTarget (..), DnsTrace (..),
                     WithDomainName (..))
import qualified Ouroboros.Network.Subscription.Dns as Subscription
import           Ouroboros.Network.Subscription.Ip (IPSubscriptionParams,
                     IPSubscriptionTarget (..), SubscriptionParams (..),
                     SubscriptionTrace (..), WithIPList (..))
import qualified Ouroboros.Network.Subscription.Ip as Subscription
import           Ouroboros.Network.Subscription.Worker (LocalAddresses (..),
                     SubscriberError)
import           Ouroboros.Network.Tracers
import qualified Ouroboros.Network.TxSubmission.Inbound as TxInbound
import qualified Ouroboros.Network.TxSubmission.Outbound as TxOutbound


-- The Handshake tracer types are simply terrible.
type HandshakeTr ntnAddr ntnVersion =
    WithMuxBearer (ConnectionId ntnAddr)
                  (TraceSendRecv (Handshake ntnVersion CBOR.Term))

-- | 'Handshake' codec for the @node-to-node@ protocol suite.
--
nodeToNodeHandshakeCodec :: MonadST m
                         => Codec (Handshake NodeToNodeVersion CBOR.Term)
                                  CBOR.DeserialiseFailure m BL.ByteString
nodeToNodeHandshakeCodec :: Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
nodeToNodeHandshakeCodec = CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
forall vNumber (m :: * -> *) failure.
(MonadST m, Ord vNumber, Show failure) =>
CodecCBORTerm (failure, Maybe Int) vNumber
-> Codec (Handshake vNumber Term) DeserialiseFailure m ByteString
codecHandshake CodecCBORTerm (Text, Maybe Int) NodeToNodeVersion
nodeToNodeVersionCodec


data NodeToNodeProtocols appType bytes m a b = NodeToNodeProtocols {
    -- | chain-sync mini-protocol
    --
    NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
chainSyncProtocol    :: RunMiniProtocol appType bytes m a b,

    -- | block-fetch mini-protocol
    --
    NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
blockFetchProtocol   :: RunMiniProtocol appType bytes m a b,

    -- | tx-submission mini-protocol
    --
    NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
txSubmissionProtocol :: RunMiniProtocol appType bytes m a b,

    -- | keep-alive mini-protocol
    --
    NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
keepAliveProtocol    :: RunMiniProtocol appType bytes m a b

  }


data MiniProtocolParameters = MiniProtocolParameters {
      MiniProtocolParameters -> Word32
chainSyncPipeliningHighMark :: !Word32,
      -- ^ high threshold for pipelining (we will never exceed that many
      -- messages pipelined).

      MiniProtocolParameters -> Word32
chainSyncPipeliningLowMark  :: !Word32,
      -- ^ low threshold: if we hit the 'chainSyncPipeliningHighMark' we will
      -- listen for responses until there are at most
      -- 'chainSyncPipeliningLowMark' pipelined message
      --
      -- Must be smaller than 'chainSyncPipeliningHighMark'.
      --
      -- Note: 'chainSyncPipeliningLowMark' and 'chainSyncPipeliningLowMark'
      -- are passed to 'pipelineDecisionLowHighMark'.

      MiniProtocolParameters -> Word
blockFetchPipeliningMax     :: !Word,
      -- ^ maximal number of pipelined messages in 'block-fetch' mini-protocol.

      MiniProtocolParameters -> Word16
txSubmissionMaxUnacked      :: !Word16
      -- ^ maximal number of unacked tx (pipelining is bounded by twice this
      -- number)
    }

defaultMiniProtocolParameters :: MiniProtocolParameters
defaultMiniProtocolParameters :: MiniProtocolParameters
defaultMiniProtocolParameters = MiniProtocolParameters :: Word32 -> Word32 -> Word -> Word16 -> MiniProtocolParameters
MiniProtocolParameters {
      chainSyncPipeliningLowMark :: Word32
chainSyncPipeliningLowMark  = Word32
200
    , chainSyncPipeliningHighMark :: Word32
chainSyncPipeliningHighMark = Word32
300
    , blockFetchPipeliningMax :: Word
blockFetchPipeliningMax     = Word
100
    , txSubmissionMaxUnacked :: Word16
txSubmissionMaxUnacked       = Word16
10
  }

-- | Make an 'OuroborosApplication' for the bundle of mini-protocols that
-- make up the overall node-to-node protocol.
--
-- This function specifies the wire format protocol numbers.
--
-- The application specific protocol numbers start from 2.  The
-- @'MiniProtocolNum' 0@ is reserved for the 'Handshake' protocol, while
-- @'MiniProtocolNum' 1@ is reserved for DeltaQ messages.
-- 'Handshake' protocol is not included in 'NodeToNodeProtocols' as it runs
-- before mux is started but it reusing 'MuxBearer' to send and receive
-- messages.  Only when the handshake protocol succeeds, we will know which
-- protocols to run / multiplex.
--
-- These are chosen to not overlap with the node to client protocol numbers (and
-- the handshake protocol number).  This is not essential for correctness, but
-- is helpful to allow a single shared implementation of tools that can analyse
-- both protocols, e.g.  wireshark plugins.
--
nodeToNodeProtocols
  :: MiniProtocolParameters
  -> (ConnectionId addr -> STM m ControlMessage -> NodeToNodeProtocols muxMode bytes m a b)
  -> NodeToNodeVersion
  -> OuroborosBundle muxMode addr bytes m a b
nodeToNodeProtocols :: MiniProtocolParameters
-> (ConnectionId addr
    -> STM m ControlMessage -> NodeToNodeProtocols muxMode bytes m a b)
-> NodeToNodeVersion
-> OuroborosBundle muxMode addr bytes m a b
nodeToNodeProtocols MiniProtocolParameters
miniProtocolParameters ConnectionId addr
-> STM m ControlMessage -> NodeToNodeProtocols muxMode bytes m a b
protocols NodeToNodeVersion
_version =
    WithProtocolTemperature
  'Hot
  (ConnectionId addr
   -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Warm
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Established
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> OuroborosBundle muxMode addr bytes m a b
forall a.
WithProtocolTemperature 'Hot a
-> WithProtocolTemperature 'Warm a
-> WithProtocolTemperature 'Established a
-> Bundle a
Bundle
      -- Hot protocols: 'chain-sync', 'block-fetch' and 'tx-submission'.
      ((ConnectionId addr
 -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Hot
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a. a -> WithProtocolTemperature 'Hot a
WithHot ((ConnectionId addr
  -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
 -> WithProtocolTemperature
      'Hot
      (ConnectionId addr
       -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b]))
-> (ConnectionId addr
    -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Hot
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a b. (a -> b) -> a -> b
$ \ConnectionId addr
connectionId STM m ControlMessage
controlMessageSTM ->
        case ConnectionId addr
-> STM m ControlMessage -> NodeToNodeProtocols muxMode bytes m a b
protocols ConnectionId addr
connectionId STM m ControlMessage
controlMessageSTM of
          NodeToNodeProtocols { RunMiniProtocol muxMode bytes m a b
chainSyncProtocol :: RunMiniProtocol muxMode bytes m a b
chainSyncProtocol :: forall (appType :: MuxMode) bytes (m :: * -> *) a b.
NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
chainSyncProtocol,
                                RunMiniProtocol muxMode bytes m a b
blockFetchProtocol :: RunMiniProtocol muxMode bytes m a b
blockFetchProtocol :: forall (appType :: MuxMode) bytes (m :: * -> *) a b.
NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
blockFetchProtocol,
                                RunMiniProtocol muxMode bytes m a b
txSubmissionProtocol :: RunMiniProtocol muxMode bytes m a b
txSubmissionProtocol :: forall (appType :: MuxMode) bytes (m :: * -> *) a b.
NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
txSubmissionProtocol
                              } ->
            [ MiniProtocol :: forall (mode :: MuxMode) bytes (m :: * -> *) a b.
MiniProtocolNum
-> MiniProtocolLimits
-> RunMiniProtocol mode bytes m a b
-> MiniProtocol mode bytes m a b
MiniProtocol {
                miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
2,
                miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolParameters -> MiniProtocolLimits
chainSyncProtocolLimits MiniProtocolParameters
miniProtocolParameters,
                miniProtocolRun :: RunMiniProtocol muxMode bytes m a b
miniProtocolRun    = RunMiniProtocol muxMode bytes m a b
chainSyncProtocol
              }
            , MiniProtocol :: forall (mode :: MuxMode) bytes (m :: * -> *) a b.
MiniProtocolNum
-> MiniProtocolLimits
-> RunMiniProtocol mode bytes m a b
-> MiniProtocol mode bytes m a b
MiniProtocol {
                miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
3,
                miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolParameters -> MiniProtocolLimits
blockFetchProtocolLimits MiniProtocolParameters
miniProtocolParameters,
                miniProtocolRun :: RunMiniProtocol muxMode bytes m a b
miniProtocolRun    = RunMiniProtocol muxMode bytes m a b
blockFetchProtocol
              }
            , MiniProtocol :: forall (mode :: MuxMode) bytes (m :: * -> *) a b.
MiniProtocolNum
-> MiniProtocolLimits
-> RunMiniProtocol mode bytes m a b
-> MiniProtocol mode bytes m a b
MiniProtocol {
                miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
4,
                miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolParameters -> MiniProtocolLimits
txSubmissionProtocolLimits MiniProtocolParameters
miniProtocolParameters,
                miniProtocolRun :: RunMiniProtocol muxMode bytes m a b
miniProtocolRun    = RunMiniProtocol muxMode bytes m a b
txSubmissionProtocol
              }
            ])

      -- Warm protocols: reserved for 'tip-sample'.
      ((ConnectionId addr
 -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Warm
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a. a -> WithProtocolTemperature 'Warm a
WithWarm ((ConnectionId addr
  -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
 -> WithProtocolTemperature
      'Warm
      (ConnectionId addr
       -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b]))
-> (ConnectionId addr
    -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Warm
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a b. (a -> b) -> a -> b
$ \ConnectionId addr
_connectionId STM m ControlMessage
_controlMessageSTM -> [])

      -- Established protocols: 'keep-alive'.
      ((ConnectionId addr
 -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Established
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a. a -> WithProtocolTemperature 'Established a
WithEstablished ((ConnectionId addr
  -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
 -> WithProtocolTemperature
      'Established
      (ConnectionId addr
       -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b]))
-> (ConnectionId addr
    -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
-> WithProtocolTemperature
     'Established
     (ConnectionId addr
      -> STM m ControlMessage -> [MiniProtocol muxMode bytes m a b])
forall a b. (a -> b) -> a -> b
$ \ConnectionId addr
connectionId STM m ControlMessage
controlMessageSTM ->
        case ConnectionId addr
-> STM m ControlMessage -> NodeToNodeProtocols muxMode bytes m a b
protocols ConnectionId addr
connectionId STM m ControlMessage
controlMessageSTM of
          NodeToNodeProtocols { RunMiniProtocol muxMode bytes m a b
keepAliveProtocol :: RunMiniProtocol muxMode bytes m a b
keepAliveProtocol :: forall (appType :: MuxMode) bytes (m :: * -> *) a b.
NodeToNodeProtocols appType bytes m a b
-> RunMiniProtocol appType bytes m a b
keepAliveProtocol } ->
            [ MiniProtocol :: forall (mode :: MuxMode) bytes (m :: * -> *) a b.
MiniProtocolNum
-> MiniProtocolLimits
-> RunMiniProtocol mode bytes m a b
-> MiniProtocol mode bytes m a b
MiniProtocol {
                miniProtocolNum :: MiniProtocolNum
miniProtocolNum    = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
8,
                miniProtocolLimits :: MiniProtocolLimits
miniProtocolLimits = MiniProtocolParameters -> MiniProtocolLimits
keepAliveProtocolLimits MiniProtocolParameters
miniProtocolParameters,
                miniProtocolRun :: RunMiniProtocol muxMode bytes m a b
miniProtocolRun    = RunMiniProtocol muxMode bytes m a b
keepAliveProtocol
              }
            ])

addSafetyMargin :: Int -> Int
addSafetyMargin :: Int -> Int
addSafetyMargin Int
x = Int
x Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
x Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
10

chainSyncProtocolLimits
  , blockFetchProtocolLimits
  , txSubmissionProtocolLimits
  , keepAliveProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits

chainSyncProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits
chainSyncProtocolLimits MiniProtocolParameters { Word32
chainSyncPipeliningHighMark :: Word32
chainSyncPipeliningHighMark :: MiniProtocolParameters -> Word32
chainSyncPipeliningHighMark } =
  MiniProtocolLimits :: Int -> MiniProtocolLimits
MiniProtocolLimits {
      -- The largest message over ChainSync is @MsgRollForward@ which mainly
      -- consists of a BlockHeader.
      -- TODO: 1400 comes from maxBlockHeaderSize in genesis, but should come
      -- from consensus rather than being hard coded.
      maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$
        Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word32
chainSyncPipeliningHighMark Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1400
    }

blockFetchProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits
blockFetchProtocolLimits MiniProtocolParameters { Word
blockFetchPipeliningMax :: Word
blockFetchPipeliningMax :: MiniProtocolParameters -> Word
blockFetchPipeliningMax } = MiniProtocolLimits :: Int -> MiniProtocolLimits
MiniProtocolLimits {
    -- block-fetch client can pipeline at most 'blockFetchPipeliningMax'
    -- blocks (currently '10').  This is currently hard coded in
    -- 'Ouroboros.Network.BlockFetch.blockFetchLogic' (where
    -- @maxInFlightReqsPerPeer = 100@ is specified).  In the future the
    -- block fetch client will count bytes rather than blocks.  By far
    -- the largest (and the only pipelined message) in 'block-fetch'
    -- protocol is 'MsgBlock'.  Current block size limit is 64KB and
    -- `blockFetchPipeliningMax` below is set to `100`.  This means that
    -- overall queue limit must be:
    --
    --   ```
        -- 100 * 64KB = 6.4MB
    --   ```
    --
    -- In the byron era this limit was set to `10 * 2MB`, we keep the more
    -- relaxed limit here.
    --
    maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word -> Int) -> Word -> Int
forall a b. (a -> b) -> a -> b
$
      Word -> Word -> Word
forall a. Ord a => a -> a -> a
max (Word
10 Word -> Word -> Word
forall a. Num a => a -> a -> a
* Word
2_097_154) (Word
blockFetchPipeliningMax Word -> Word -> Word
forall a. Num a => a -> a -> a
* Word
65535)
  }

txSubmissionProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits
txSubmissionProtocolLimits MiniProtocolParameters { Word16
txSubmissionMaxUnacked :: Word16
txSubmissionMaxUnacked :: MiniProtocolParameters -> Word16
txSubmissionMaxUnacked } = MiniProtocolLimits :: Int -> MiniProtocolLimits
MiniProtocolLimits {
      -- tx-submission server can pipeline both 'MsgRequestTxIds' and
      -- 'MsgRequestTx'. This means that there can be many
      -- 'MsgReplyTxIds', 'MsgReplyTxs' messages in an inbound queue (their
      -- sizes are strictly greater than the corresponding request
      -- messages).
      --
      -- Each 'MsgRequestTx' can contain at max @maxTxIdsToRequest = 3@
      -- (defined in -- 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- Each 'MsgRequestTx' can request at max @maxTxToRequest = 2@
      -- (defined in -- 'Ouroboros.Network.TxSubmission.Inbound.txSubmissionInbound')
      --
      -- The 'txSubmissionInBound' server can at most put `100`
      -- unacknowledged transactions.  It also pipelines both 'MsgRequestTx`
      -- and `MsgRequestTx` in turn. This means that the inbound queue can
      -- have at most `100` `MsgRequestTxIds` and `MsgRequestTx` which will
      -- contain a single `TxId` / `Tx`.
      --
      -- TODO: the unacknowledged transactions are configured in `NodeArgs`,
      -- and we should take this parameter as an input for this computation.
      --
      -- The upper bound of size of a single transaction is 64k, while the
      -- size of `TxId` is `34` bytes (`type TxId = Hash Tx`).
      --
      -- Ingress side of `txSubmissinInbound`
      --
      -- - 'MsgReplyTxs' carrying a single `TxId`:
      -- ```
      --    1  -- encodeListLen 2
      --  + 1  -- encodeWord 1
      --  + 1  -- encodeListLenIndef
      --  + 1  -- encodeListLen 2
      --  + 34 -- encode 'TxId'
      --  + 5  -- encodeWord32 (size of tx)
      --  + 1  -- encodeBreak
      --  = 44
      -- ```
      -- - 'MsgReplyTx' carrying a single 'Tx':
      -- ```
      --    1      -- encodeListLen 2
      --  + 1      -- encodeWord 3
      --  + 1      -- encodeListLenIndef
      --  + 65_536 -- 64kb transaction
      --  + 1      -- encodeBreak
      --  = 65_540
      -- ```
      --
      -- On the ingress side of 'txSubmissionOutbound' we can have at most
      -- `MaxUnacked' 'MsgRequestTxsIds' and the same amount of
      -- 'MsgRequsetTx' containing a single 'TxId'.  The size of
      -- 'MsgRequestTxsIds' is much smaller that 'MsgReplyTx', and the size
      -- of `MsgReqeustTx` with a single 'TxId' is smaller than
      -- 'MsgReplyTxIds' which contains a single 'TxId' (it just contains
      -- the 'TxId' without the size of 'Tx' in bytes).  So the ingress
      -- queue of 'txSubmissionOutbound' is bounded by the ingress side of
      -- the 'txSubmissionInbound'
      --
      -- Currently the value of 'txSubmissionMaxUnacked' is '100', for
      -- which the upper bound is `100 * (44 + 65_540) = 6_558_400`, we add
      -- 10% as a safety margin.
      --
      maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$
          Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
txSubmissionMaxUnacked Int -> Int -> Int
forall a. Num a => a -> a -> a
* (Int
44 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
65_540)
    }

keepAliveProtocolLimits :: MiniProtocolParameters -> MiniProtocolLimits
keepAliveProtocolLimits MiniProtocolParameters
_ =
  MiniProtocolLimits :: Int -> MiniProtocolLimits
MiniProtocolLimits {
      -- One small outstanding message.
      maximumIngressQueue :: Int
maximumIngressQueue = Int -> Int
addSafetyMargin Int
1280
    }

chainSyncMiniProtocolNum :: MiniProtocolNum
chainSyncMiniProtocolNum :: MiniProtocolNum
chainSyncMiniProtocolNum = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
2

blockFetchMiniProtocolNum :: MiniProtocolNum
blockFetchMiniProtocolNum :: MiniProtocolNum
blockFetchMiniProtocolNum = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
3

txSubmissionMiniProtocolNum :: MiniProtocolNum
txSubmissionMiniProtocolNum :: MiniProtocolNum
txSubmissionMiniProtocolNum = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
4

keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum :: MiniProtocolNum
keepAliveMiniProtocolNum = Word16 -> MiniProtocolNum
MiniProtocolNum Word16
8

-- | A specialised version of @'Ouroboros.Network.Socket.connectToNode'@.
--
connectTo
  :: Snocket IO Socket.Socket Socket.SockAddr
  -> NetworkConnectTracers Socket.SockAddr NodeToNodeVersion
  -> Versions NodeToNodeVersion
              NodeToNodeVersionData
              (OuroborosApplication InitiatorMode Socket.SockAddr BL.ByteString IO a b)
  -> Maybe Socket.SockAddr
  -> Socket.SockAddr
  -> IO ()
connectTo :: Snocket IO Socket SockAddr
-> NetworkConnectTracers SockAddr NodeToNodeVersion
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication 'InitiatorMode SockAddr ByteString IO a b)
-> Maybe SockAddr
-> SockAddr
-> IO ()
connectTo Snocket IO Socket SockAddr
sn NetworkConnectTracers SockAddr NodeToNodeVersion
tr =
    Snocket IO Socket SockAddr
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
-> NetworkConnectTracers SockAddr NodeToNodeVersion
-> (NodeToNodeVersionData
    -> NodeToNodeVersionData -> Accept NodeToNodeVersionData)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication 'InitiatorMode SockAddr ByteString IO a b)
-> Maybe SockAddr
-> SockAddr
-> IO ()
forall (appType :: MuxMode) vNumber vData fd addr a b.
(Ord vNumber, Typeable vNumber, Show vNumber,
 HasInitiator appType ~ 'True) =>
Snocket IO fd addr
-> Codec (Handshake vNumber Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake vNumber Term)
-> VersionDataCodec Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
-> Versions
     vNumber vData (OuroborosApplication appType addr ByteString IO a b)
-> Maybe addr
-> addr
-> IO ()
connectToNode Snocket IO Socket SockAddr
sn Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
nodeToNodeHandshakeCodec ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall k (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
                  ((NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm)
                  NetworkConnectTracers SockAddr NodeToNodeVersion
tr NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion


-- | A specialised version of @'Ouroboros.Network.Socket.withServerNode'@.
-- It forks a thread which runs an accept loop (server thread):
--
-- * when the server thread throws an exception the main thread rethrows
--   it (by 'Async.wait')
-- * when an async exception is thrown to kill the main thread the server thread
--   will be cancelled as well (by 'withAsync')
--
withServer
  :: SocketSnocket
  -> NetworkServerTracers Socket.SockAddr NodeToNodeVersion
  -> NetworkMutableState Socket.SockAddr
  -> AcceptedConnectionsLimit
  -> Socket.Socket
  -> Versions NodeToNodeVersion
              NodeToNodeVersionData
              (OuroborosApplication ResponderMode Socket.SockAddr BL.ByteString IO a b)
  -> ErrorPolicies
  -> IO Void
withServer :: Snocket IO Socket SockAddr
-> NetworkServerTracers SockAddr NodeToNodeVersion
-> NetworkMutableState SockAddr
-> AcceptedConnectionsLimit
-> Socket
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b)
-> ErrorPolicies
-> IO Void
withServer Snocket IO Socket SockAddr
sn NetworkServerTracers SockAddr NodeToNodeVersion
tracers NetworkMutableState SockAddr
networkState AcceptedConnectionsLimit
acceptedConnectionsLimit Socket
sd Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b)
versions ErrorPolicies
errPolicies =
  Snocket IO Socket SockAddr
-> NetworkServerTracers SockAddr NodeToNodeVersion
-> NetworkMutableState SockAddr
-> AcceptedConnectionsLimit
-> Socket
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
-> (NodeToNodeVersionData
    -> NodeToNodeVersionData -> Accept NodeToNodeVersionData)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (SomeResponderApplication SockAddr ByteString IO b)
-> ErrorPolicies
-> (SockAddr -> Async Void -> IO Void)
-> IO Void
forall vNumber vData t fd addr b.
(Ord vNumber, Typeable vNumber, Show vNumber, Ord addr) =>
Snocket IO fd addr
-> NetworkServerTracers addr vNumber
-> NetworkMutableState addr
-> AcceptedConnectionsLimit
-> fd
-> Codec (Handshake vNumber Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake vNumber Term)
-> VersionDataCodec Term vNumber vData
-> (vData -> vData -> Accept vData)
-> Versions
     vNumber vData (SomeResponderApplication addr ByteString IO b)
-> ErrorPolicies
-> (addr -> Async Void -> IO t)
-> IO t
withServerNode'
    Snocket IO Socket SockAddr
sn
    NetworkServerTracers SockAddr NodeToNodeVersion
tracers
    NetworkMutableState SockAddr
networkState
    AcceptedConnectionsLimit
acceptedConnectionsLimit
    Socket
sd
    Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
nodeToNodeHandshakeCodec
    ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall k (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
    ((NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm)
    NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
    (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b
-> SomeResponderApplication SockAddr ByteString IO b
forall (appType :: MuxMode) addr bytes (m :: * -> *) a b.
(HasResponder appType ~ 'True) =>
OuroborosApplication appType addr bytes m a b
-> SomeResponderApplication addr bytes m b
SomeResponderApplication (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b
 -> SomeResponderApplication SockAddr ByteString IO b)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (SomeResponderApplication SockAddr ByteString IO b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication 'ResponderMode SockAddr ByteString IO a b)
versions)
    ErrorPolicies
errPolicies
    (\SockAddr
_ Async Void
async -> Async Void -> IO Void
forall a. Async a -> IO a
Async.wait Async Void
async)


-- | 'ipSubscriptionWorker' which starts given application versions on each
-- established connection.
--
ipSubscriptionWorker
    :: forall mode x y.
       ( HasInitiator mode ~ True )
    => SocketSnocket
    -> NetworkIPSubscriptionTracers Socket.SockAddr NodeToNodeVersion
    -> NetworkMutableState Socket.SockAddr
    -> IPSubscriptionParams ()
    -> Versions
        NodeToNodeVersion
        NodeToNodeVersionData
        (OuroborosApplication mode Socket.SockAddr BL.ByteString IO x y)
    -> IO Void
ipSubscriptionWorker :: Snocket IO Socket SockAddr
-> NetworkIPSubscriptionTracers SockAddr NodeToNodeVersion
-> NetworkMutableState SockAddr
-> IPSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication mode SockAddr ByteString IO x y)
-> IO Void
ipSubscriptionWorker
  Snocket IO Socket SockAddr
sn
  NetworkSubscriptionTracers
    { Tracer IO (WithIPList (SubscriptionTrace SockAddr))
nsSubscriptionTracer :: forall (withIPList :: * -> *) addr vNumber.
NetworkSubscriptionTracers withIPList addr vNumber
-> Tracer IO (withIPList (SubscriptionTrace addr))
nsSubscriptionTracer :: Tracer IO (WithIPList (SubscriptionTrace SockAddr))
nsSubscriptionTracer
    , Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
nsMuxTracer :: forall (withIPList :: * -> *) addr vNumber.
NetworkSubscriptionTracers withIPList addr vNumber
-> Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
nsMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
nsMuxTracer
    , Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
nsHandshakeTracer :: forall (withIPList :: * -> *) addr vNumber.
NetworkSubscriptionTracers withIPList addr vNumber
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
nsHandshakeTracer :: Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
nsHandshakeTracer
    , Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
nsErrorPolicyTracer :: forall (withIPList :: * -> *) addr vNumber.
NetworkSubscriptionTracers withIPList addr vNumber
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
nsErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
nsErrorPolicyTracer
    }
  NetworkMutableState SockAddr
networkState
  IPSubscriptionParams ()
subscriptionParams
  Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication mode SockAddr ByteString IO x y)
versions
    = Snocket IO Socket SockAddr
-> Tracer IO (WithIPList (SubscriptionTrace SockAddr))
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> NetworkMutableState SockAddr
-> IPSubscriptionParams ()
-> (Socket -> IO ())
-> IO Void
forall a.
Snocket IO Socket SockAddr
-> Tracer IO (WithIPList (SubscriptionTrace SockAddr))
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> NetworkMutableState SockAddr
-> IPSubscriptionParams a
-> (Socket -> IO a)
-> IO Void
Subscription.ipSubscriptionWorker
        Snocket IO Socket SockAddr
sn
        Tracer IO (WithIPList (SubscriptionTrace SockAddr))
nsSubscriptionTracer
        Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
nsErrorPolicyTracer
        NetworkMutableState SockAddr
networkState
        IPSubscriptionParams ()
subscriptionParams
        (Snocket IO Socket SockAddr
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
-> NetworkConnectTracers SockAddr NodeToNodeVersion
-> (NodeToNodeVersionData
    -> NodeToNodeVersionData -> Accept NodeToNodeVersionData)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication mode SockAddr ByteString IO x y)
-> Socket
-> IO ()
forall (appType :: MuxMode) vNumber vData fd addr a b.
(Ord vNumber, Typeable vNumber, Show vNumber,
 HasInitiator appType ~ 'True) =>
Snocket IO fd addr
-> Codec (Handshake vNumber Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake vNumber Term)
-> VersionDataCodec Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
-> Versions
     vNumber vData (OuroborosApplication appType addr ByteString IO a b)
-> fd
-> IO ()
connectToNode'
          Snocket IO Socket SockAddr
sn
          Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
nodeToNodeHandshakeCodec
          ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall k (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
          ((NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm)
          (Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId SockAddr)
        (TraceSendRecv (Handshake NodeToNodeVersion Term)))
-> NetworkConnectTracers SockAddr NodeToNodeVersion
forall addr vNumber.
Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
-> NetworkConnectTracers addr vNumber
NetworkConnectTracers Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
nsMuxTracer Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
nsHandshakeTracer)
          NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
          Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication mode SockAddr ByteString IO x y)
versions)


-- | 'dnsSubscriptionWorker' which starts given application versions on each
-- established connection.
--
dnsSubscriptionWorker
    :: forall mode x y.
       ( HasInitiator mode ~ True )
    => SocketSnocket
    -> NetworkDNSSubscriptionTracers NodeToNodeVersion Socket.SockAddr
    -> NetworkMutableState Socket.SockAddr
    -> DnsSubscriptionParams ()
    -> Versions
        NodeToNodeVersion
        NodeToNodeVersionData
        (OuroborosApplication mode Socket.SockAddr BL.ByteString IO x y)
    -> IO Void
dnsSubscriptionWorker :: Snocket IO Socket SockAddr
-> NetworkDNSSubscriptionTracers NodeToNodeVersion SockAddr
-> NetworkMutableState SockAddr
-> DnsSubscriptionParams ()
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication mode SockAddr ByteString IO x y)
-> IO Void
dnsSubscriptionWorker
  Snocket IO Socket SockAddr
sn
  NetworkDNSSubscriptionTracers
    { Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
ndstSubscriptionTracer :: forall vNumber addr.
NetworkDNSSubscriptionTracers vNumber addr
-> Tracer IO (WithDomainName (SubscriptionTrace addr))
ndstSubscriptionTracer :: Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
ndstSubscriptionTracer
    , Tracer IO (WithDomainName DnsTrace)
ndstDnsTracer :: forall vNumber addr.
NetworkDNSSubscriptionTracers vNumber addr
-> Tracer IO (WithDomainName DnsTrace)
ndstDnsTracer :: Tracer IO (WithDomainName DnsTrace)
ndstDnsTracer
    , Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
ndstMuxTracer :: forall vNumber addr.
NetworkDNSSubscriptionTracers vNumber addr
-> Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
ndstMuxTracer :: Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
ndstMuxTracer
    , Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
ndstHandshakeTracer :: forall vNumber addr.
NetworkDNSSubscriptionTracers vNumber addr
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
ndstHandshakeTracer :: Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
ndstHandshakeTracer
    , Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
ndstErrorPolicyTracer :: forall vNumber addr.
NetworkDNSSubscriptionTracers vNumber addr
-> Tracer IO (WithAddr addr ErrorPolicyTrace)
ndstErrorPolicyTracer :: Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
ndstErrorPolicyTracer
    }
  NetworkMutableState SockAddr
networkState
  DnsSubscriptionParams ()
subscriptionParams
  Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication mode SockAddr ByteString IO x y)
versions =
    Snocket IO Socket SockAddr
-> Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
-> Tracer IO (WithDomainName DnsTrace)
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> NetworkMutableState SockAddr
-> DnsSubscriptionParams ()
-> (Socket -> IO ())
-> IO Void
forall a.
Snocket IO Socket SockAddr
-> Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
-> Tracer IO (WithDomainName DnsTrace)
-> Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
-> NetworkMutableState SockAddr
-> DnsSubscriptionParams a
-> (Socket -> IO a)
-> IO Void
Subscription.dnsSubscriptionWorker
      Snocket IO Socket SockAddr
sn
      Tracer IO (WithDomainName (SubscriptionTrace SockAddr))
ndstSubscriptionTracer
      Tracer IO (WithDomainName DnsTrace)
ndstDnsTracer
      Tracer IO (WithAddr SockAddr ErrorPolicyTrace)
ndstErrorPolicyTracer
      NetworkMutableState SockAddr
networkState
      DnsSubscriptionParams ()
subscriptionParams
      (Snocket IO Socket SockAddr
-> Codec
     (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
-> NetworkConnectTracers SockAddr NodeToNodeVersion
-> (NodeToNodeVersionData
    -> NodeToNodeVersionData -> Accept NodeToNodeVersionData)
-> Versions
     NodeToNodeVersion
     NodeToNodeVersionData
     (OuroborosApplication mode SockAddr ByteString IO x y)
-> Socket
-> IO ()
forall (appType :: MuxMode) vNumber vData fd addr a b.
(Ord vNumber, Typeable vNumber, Show vNumber,
 HasInitiator appType ~ 'True) =>
Snocket IO fd addr
-> Codec (Handshake vNumber Term) DeserialiseFailure IO ByteString
-> ProtocolTimeLimits (Handshake vNumber Term)
-> VersionDataCodec Term vNumber vData
-> NetworkConnectTracers addr vNumber
-> (vData -> vData -> Accept vData)
-> Versions
     vNumber vData (OuroborosApplication appType addr ByteString IO a b)
-> fd
-> IO ()
connectToNode'
        Snocket IO Socket SockAddr
sn
        Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure IO ByteString
forall (m :: * -> *).
MonadST m =>
Codec
  (Handshake NodeToNodeVersion Term) DeserialiseFailure m ByteString
nodeToNodeHandshakeCodec
        ProtocolTimeLimits (Handshake NodeToNodeVersion Term)
forall k (vNumber :: k).
ProtocolTimeLimits (Handshake vNumber Term)
timeLimitsHandshake
        ((NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData)
-> VersionDataCodec Term NodeToNodeVersion NodeToNodeVersionData
forall vNumber vData.
(vNumber -> CodecCBORTerm Text vData)
-> VersionDataCodec Term vNumber vData
cborTermVersionDataCodec NodeToNodeVersion -> CodecCBORTerm Text NodeToNodeVersionData
nodeToNodeCodecCBORTerm)
        (Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId SockAddr)
        (TraceSendRecv (Handshake NodeToNodeVersion Term)))
-> NetworkConnectTracers SockAddr NodeToNodeVersion
forall addr vNumber.
Tracer IO (WithMuxBearer (ConnectionId addr) MuxTrace)
-> Tracer
     IO
     (WithMuxBearer
        (ConnectionId addr) (TraceSendRecv (Handshake vNumber Term)))
-> NetworkConnectTracers addr vNumber
NetworkConnectTracers Tracer IO (WithMuxBearer (ConnectionId SockAddr) MuxTrace)
ndstMuxTracer Tracer
  IO
  (WithMuxBearer
     (ConnectionId SockAddr)
     (TraceSendRecv (Handshake NodeToNodeVersion Term)))
ndstHandshakeTracer)
        NodeToNodeVersionData
-> NodeToNodeVersionData -> Accept NodeToNodeVersionData
forall v. Acceptable v => v -> v -> Accept v
acceptableVersion
        Versions
  NodeToNodeVersion
  NodeToNodeVersionData
  (OuroborosApplication mode SockAddr ByteString IO x y)
versions)


-- | A minimal error policy for remote peers, which only handles exceptions
-- raised by `ouroboros-network`.
--
remoteNetworkErrorPolicy :: ErrorPolicies
remoteNetworkErrorPolicy :: ErrorPolicies
remoteNetworkErrorPolicy = ErrorPolicies :: [ErrorPolicy] -> [ErrorPolicy] -> ErrorPolicies
ErrorPolicies {
      epAppErrorPolicies :: [ErrorPolicy]
epAppErrorPolicies = [
          -- Handshake client protocol error: we either did not recognise received
          -- version or we refused it.  This is only for outbound connections,
          -- thus we suspend the consumer.
          (HandshakeProtocolError NodeToNodeVersion
 -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((HandshakeProtocolError NodeToNodeVersion
  -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (HandshakeProtocolError NodeToNodeVersion
    -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(HandshakeProtocolError NodeToNodeVersion
_ :: HandshakeProtocolError NodeToNodeVersion)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
misconfiguredPeer

          -- deserialisation failure; this means that the remote peer is either
          -- buggy, adversarial, or the connection return garbage.  In the last
          -- case it's also good to shutdown both the consumer and the
          -- producer, as it's likely that the other side of the connection
          -- will return garbage as well.
        , (DecoderFailure -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
           ((DecoderFailure -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (DecoderFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(DecoderFailure
_ :: DecoderFailure)
                 -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil

        , (ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
          ((ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(ProtocolLimitFailure
msg :: ProtocolLimitFailure)
                  -> case ProtocolLimitFailure
msg of
                      ExceededSizeLimit{} -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil
                      ExceededTimeLimit{} -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> SuspendDecision DiffTime
forall t. t -> SuspendDecision t
SuspendConsumer DiffTime
shortDelay)

          -- the connection was unexpectedly closed, we suspend the peer for
          -- a 'shortDelay'
        , (MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy)
-> (MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(MuxError
e :: MuxError)
                  -> case MuxError -> MuxErrorType
errorType MuxError
e of
                        MuxErrorType
MuxUnknownMiniProtocol  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil
                        MuxErrorType
MuxDecodeError          -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil
                        MuxErrorType
MuxIngressQueueOverRun  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil
                        MuxErrorType
MuxInitiatorOnly        -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil

                        -- in case of bearer closed / or IOException we suspend
                        -- the peer for a short time
                        --
                        -- TODO: an exponential backoff would be nicer than a fixed 20s
                        -- TODO: right now we cannot suspend just the
                        -- 'responder'.  If a 'responder' throws 'MuxError' we
                        -- might not want to shutdown the consumer (which is
                        -- using different connection), as we do below:
                        MuxErrorType
MuxBearerClosed              -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                        MuxIOException{}             -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                        MuxErrorType
MuxSDUReadTimeout            -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                        MuxErrorType
MuxSDUWriteTimeout           -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                        MuxShutdown {}               -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                        MuxErrorType
MuxCleanShutdown             -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)

        , (MuxRuntimeError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((MuxRuntimeError -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (MuxRuntimeError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(MuxRuntimeError
e :: MuxRuntimeError)
                  -> case MuxRuntimeError
e of
                       ProtocolAlreadyRunning       {} -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)
                       UnknownProtocolInternalError {} -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
forall t. SuspendDecision t
Throw
                       MuxBlockedOnCompletionVar    {} -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
shortDelay DiffTime
shortDelay)

          -- Error policy for TxSubmission protocol: outbound side (client role)
        , (TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(TxSubmissionProtocolError
_ :: TxOutbound.TxSubmissionProtocolError)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil

          -- Error policy for TxSubmission protocol: inbound side (server role)
        , (TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (TxSubmissionProtocolError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(TxSubmissionProtocolError
_ :: TxInbound.TxSubmissionProtocolError)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil

          -- Error policy for BlockFetch protocol: consumer side (client role)
        , (BlockFetchProtocolFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((BlockFetchProtocolFailure -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (BlockFetchProtocolFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(BlockFetchProtocolFailure
_ :: BlockFetchProtocolFailure)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
theyBuggyOrEvil

          -- Error thrown by 'IOManager', this is fatal on Windows, and it will
          -- never fire on other platforms.
        , (Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy)
-> (Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(Void
_ :: IOManagerError)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
forall t. SuspendDecision t
Throw
        ],

      -- Exception raised during connect; suspend connecting to that peer for
      -- a 'shortDelay'
      epConErrorPolicies :: [ErrorPolicy]
epConErrorPolicies = [
          (IOException -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy ((IOException -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy)
-> (IOException -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(IOException
_ :: IOException) -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime))
-> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a b. (a -> b) -> a -> b
$
            DiffTime -> SuspendDecision DiffTime
forall t. t -> SuspendDecision t
SuspendConsumer DiffTime
shortDelay

        , (Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy)
-> (Void -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(Void
_ :: IOManagerError)
                  -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just SuspendDecision DiffTime
forall t. SuspendDecision t
Throw
        , (SubscriberError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
          -- Multiple connection attempts are run in parallel and the last to
          -- finish are cancelled. There may be nothing wrong with the peer,
          -- it could just be slow to respond.
          ((SubscriberError -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (SubscriberError -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(SubscriberError
_ :: SubscriberError)
                -> SuspendDecision DiffTime -> Maybe (SuspendDecision DiffTime)
forall a. a -> Maybe a
Just (DiffTime -> SuspendDecision DiffTime
forall t. t -> SuspendDecision t
SuspendConsumer DiffTime
veryShortDelay)
        ]
    }
  where
    theyBuggyOrEvil :: SuspendDecision DiffTime
    theyBuggyOrEvil :: SuspendDecision DiffTime
theyBuggyOrEvil = DiffTime -> DiffTime -> SuspendDecision DiffTime
forall t. t -> t -> SuspendDecision t
SuspendPeer DiffTime
defaultDelay DiffTime
defaultDelay

    misconfiguredPeer :: SuspendDecision DiffTime
    misconfiguredPeer :: SuspendDecision DiffTime
misconfiguredPeer = DiffTime -> SuspendDecision DiffTime
forall t. t -> SuspendDecision t
SuspendConsumer DiffTime
defaultDelay

    defaultDelay :: DiffTime
    defaultDelay :: DiffTime
defaultDelay = DiffTime
200 -- seconds

    shortDelay :: DiffTime
    shortDelay :: DiffTime
shortDelay = DiffTime
20 -- seconds

    veryShortDelay :: DiffTime
    veryShortDelay :: DiffTime
veryShortDelay = DiffTime
1 -- seconds

-- | Error policy for local clients.  This is equivalent to
-- 'nullErrorPolicies', but explicit in the errors which can be caught.
--
-- We are very permissive here, and very strict in the
-- `NodeToClient.networkErrorPolicy`.  After any failure the client will be
-- killed and not penalised by this policy.  This allows to restart the local
-- client without a delay.
--
localNetworkErrorPolicy :: ErrorPolicies
localNetworkErrorPolicy :: ErrorPolicies
localNetworkErrorPolicy = ErrorPolicies :: [ErrorPolicy] -> [ErrorPolicy] -> ErrorPolicies
ErrorPolicies {
      epAppErrorPolicies :: [ErrorPolicy]
epAppErrorPolicies = [
          -- exception thrown by `runPeerWithLimits`
          (ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (ProtocolLimitFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(ProtocolLimitFailure
_ :: ProtocolLimitFailure)
                  -> Maybe (SuspendDecision DiffTime)
forall a. Maybe a
Nothing

          -- deserialisation failure
        , (DeserialiseFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
            ((DeserialiseFailure -> Maybe (SuspendDecision DiffTime))
 -> ErrorPolicy)
-> (DeserialiseFailure -> Maybe (SuspendDecision DiffTime))
-> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(DeserialiseFailure
_ :: CBOR.DeserialiseFailure) -> Maybe (SuspendDecision DiffTime)
forall a. Maybe a
Nothing

          -- the connection was unexpectedly closed, we suspend the peer for
          -- a 'shortDelay'
        , (MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall e.
Exception e =>
(e -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
ErrorPolicy
          ((MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy)
-> (MuxError -> Maybe (SuspendDecision DiffTime)) -> ErrorPolicy
forall a b. (a -> b) -> a -> b
$ \(MuxError
_ :: MuxError) -> Maybe (SuspendDecision DiffTime)
forall a. Maybe a
Nothing
        ],

      -- The node never connects to a local client
      epConErrorPolicies :: [ErrorPolicy]
epConErrorPolicies = []
    }

type RemoteAddress      = Socket.SockAddr
type RemoteConnectionId = ConnectionId RemoteAddress