{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Mux.Egress
  ( muxer
    -- $egress
    -- $servicingsSemantics
  , EgressQueue
  , TranslocationServiceRequest (..)
  , Wanton (..)
  ) where

import           Control.Monad
import qualified Data.ByteString.Lazy as BL

import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork
import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer hiding (timeout)

import           Network.Mux.Timeout
import           Network.Mux.Types

-- $servicingsSemantics
-- = Desired Servicing Semantics
--
--  == /Constructing Fairness/
--
--   In this context we are defining fairness as:
--    - no starvation
--    - when presented with equal demand (from a selection of mini
--      protocols) deliver "equal" service.
--
--   Equality here might be in terms of equal service rate of
--   requests (or segmented requests) and/or in terms of effective
--   (SDU) data rates.
--
--
--  Notes:
--
--   1) It is assumed that (for a given peer) that bulk delivery of
--      blocks (i.e. in recovery mode) and normal, interactive,
--      operation (e.g. chain following) are mutually exclusive. As
--      such there is no requirement to create a notion of
--      prioritisation between such traffic.
--
--   2) We are assuming that the underlying TCP/IP bearer is managed
--      so that individual Mux-layer PDUs are paced. a) this is necessary
--      to mitigate head-of-line blocking effects (i.e. arbitrary
--      amounts of data accruing in the O/S kernel); b) ensuring that
--      any host egress data rate limits can be respected / enforced.
--
--  == /Current Caveats/
--
--  1) Not considering how mini-protocol associations are constructed
--     (depending on deployment model this might be resolved within
--     the instantiation of the peer relationship)
--
--  2) Not yet considered notion of orderly termination - this not
--     likely to be used in an operational context, but may be needed
--     for test harness use.
--
--  == /Principle of Operation/
--
--
--  Egress direction (mini protocol instance to remote peer)
--
--  The request for service (the demand) from a mini protocol is
--  encapsulated in a `Wanton`, such `Wanton`s are placed in a (finite)
--  queue (e.g TBMQ) of `TranslocationServiceRequest`s.
--

-- $egress
-- = Egress Path
--
-- > ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ Every mode per miniprotocol
-- > │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ │ muxDuplex │ has a dedicated thread which
-- > │ Initiator │ │ Responder │ │ Initiator │ │ Responder │ will send ByteStrings of CBOR
-- > │ ChainSync │ │ ChainSync │ │ BlockFetch│ │ BlockFetch│ encoded data.
-- > └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
-- >       │             │             │             │
-- >       │             │             │             │
-- >       ╰─────────────┴──────┬──────┴─────────────╯
-- >                            │
-- >                     application data
-- >                            │
-- >                         ░░░▼░░
-- >                         ░│  │░ For a given Mux Bearer there is a single egress
-- >                         ░│ci│░ queue shared among all miniprotocols. To ensure
-- >                         ░│cr│░ fairness each miniprotocol can at most have one
-- >                         ░└──┘░ message in the queue, see Desired Servicing
-- >                         ░░░│░░ Semantics.
-- >                           ░│░
-- >                       ░░░░░▼░░░
-- >                       ░┌─────┐░ The egress queue is served by a dedicated thread
-- >                       ░│ mux │░ which chops up the CBOR data into MuxSDUs with at
-- >                       ░└─────┘░ most sduSize bytes of data in them.
-- >                       ░░░░│░░░░
-- >                          ░│░ MuxSDUs
-- >                          ░│░
-- >                  ░░░░░░░░░▼░░░░░░░░░░
-- >                  ░┌────────────────┐░
-- >                  ░│ Bearer.write() │░ Mux Bearer implementation specific write
-- >                  ░└────────────────┘░
-- >                  ░░░░░░░░░│░░░░░░░░░░
-- >                           │ ByteStrings
-- >                           ▼
-- >                           ●

type EgressQueue m = TBQueue m (TranslocationServiceRequest m)

-- | A TranslocationServiceRequest is a demand for the translocation
--  of a single mini-protocol message. This message can be of
--  arbitrary (yet bounded) size. This multiplexing layer is
--  responsible for the segmentation of concrete representation into
--  appropriate SDU's for onward transmission.
data TranslocationServiceRequest m =
     TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)

-- | A Wanton represent the concrete data to be translocated, note that the
--  TVar becoming empty indicates -- that the last fragment of the data has
--  been enqueued on the -- underlying bearer.
newtype Wanton m = Wanton { Wanton m -> StrictTVar m ByteString
want :: StrictTVar m BL.ByteString }


-- | Process the messages from the mini protocols - there is a single
-- shared FIFO that contains the items of work. This is processed so
-- that each active demand gets a `maxSDU`s work of data processed
-- each time it gets to the front of the queue
muxer
    :: ( MonadAsync m
       , MonadFork m
       , MonadMask m
       , MonadThrow (STM m)
       , MonadTimer m
       , MonadTime m
       )
    => EgressQueue m
    -> MuxBearer m
    -> m void
muxer :: EgressQueue m -> MuxBearer m -> m void
muxer EgressQueue m
egressQueue MuxBearer m
bearer =
    (TimeoutFn m -> m void) -> m void
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
 MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m void) -> m void)
-> (TimeoutFn m -> m void) -> m void
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
    m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m void) -> m () -> m void
forall a b. (a -> b) -> a -> b
$ do
      TLSRDemand MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d <- STM m (TranslocationServiceRequest m)
-> m (TranslocationServiceRequest m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TranslocationServiceRequest m)
 -> m (TranslocationServiceRequest m))
-> STM m (TranslocationServiceRequest m)
-> m (TranslocationServiceRequest m)
forall a b. (a -> b) -> a -> b
$ EgressQueue m -> STM m (TranslocationServiceRequest m)
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
readTBQueue EgressQueue m
egressQueue
      EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
forall (m :: * -> *).
MonadSTM m =>
EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
processSingleWanton EgressQueue m
egressQueue MuxBearer m
bearer TimeoutFn m
timeout MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d

-- | Pull a `maxSDU`s worth of data out out the `Wanton` - if there is
-- data remaining requeue the `TranslocationServiceRequest` (this
-- ensures that any other items on the queue will get some service
-- first.
processSingleWanton :: MonadSTM m
                    => EgressQueue m
                    -> MuxBearer m
                    -> TimeoutFn m
                    -> MiniProtocolNum
                    -> MiniProtocolDir
                    -> Wanton m
                    -> m ()
processSingleWanton :: EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
processSingleWanton EgressQueue m
egressQueue MuxBearer { TimeoutFn m -> MuxSDU -> m Time
write :: forall (m :: * -> *).
MuxBearer m -> TimeoutFn m -> MuxSDU -> m Time
write :: TimeoutFn m -> MuxSDU -> m Time
write, SDUSize
sduSize :: forall (m :: * -> *). MuxBearer m -> SDUSize
sduSize :: SDUSize
sduSize }
                    TimeoutFn m
timeout MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
wanton = do
    ByteString
blob <- STM m ByteString -> m ByteString
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
      -- extract next SDU
      ByteString
d <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton)
      let (ByteString
frag, ByteString
rest) = Int64 -> ByteString -> (ByteString, ByteString)
BL.splitAt (Word16 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SDUSize -> Word16
getSDUSize SDUSize
sduSize)) ByteString
d
      -- if more to process then enqueue remaining work
      if ByteString -> Bool
BL.null ByteString
rest
        then StrictTVar m ByteString -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton) ByteString
BL.empty
        else do
          -- Note that to preserve bytestream ordering within a given
          -- miniprotocol the readTVar and writeTVar operations
          -- must be inside the same STM transaction.
          StrictTVar m ByteString -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton) ByteString
rest
          EgressQueue m -> TranslocationServiceRequest m -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue EgressQueue m
egressQueue (MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
forall (m :: * -> *).
MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
TLSRDemand MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
wanton)
      -- return data to send
      ByteString -> STM m ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
frag
    let sdu :: MuxSDU
sdu = MuxSDU :: MuxSDUHeader -> ByteString -> MuxSDU
MuxSDU {
                msHeader :: MuxSDUHeader
msHeader = MuxSDUHeader :: RemoteClockModel
-> MiniProtocolNum -> MiniProtocolDir -> Word16 -> MuxSDUHeader
MuxSDUHeader {
                    mhTimestamp :: RemoteClockModel
mhTimestamp = (Word32 -> RemoteClockModel
RemoteClockModel Word32
0),
                    mhNum :: MiniProtocolNum
mhNum       = MiniProtocolNum
mpc,
                    mhDir :: MiniProtocolDir
mhDir       = MiniProtocolDir
md,
                    mhLength :: Word16
mhLength    = Int64 -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word16) -> Int64 -> Word16
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob
                  },
                msBlob :: ByteString
msBlob = ByteString
blob
              }
    m Time -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Time -> m ()) -> m Time -> m ()
forall a b. (a -> b) -> a -> b
$ TimeoutFn m -> MuxSDU -> m Time
write TimeoutFn m
timeout MuxSDU
sdu
    --paceTransmission tNow