{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE DataKinds                 #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE GADTSyntax                #-}
{-# LANGUAGE KindSignatures            #-}
{-# LANGUAGE NamedFieldPuns            #-}
{-# LANGUAGE RankNTypes                #-}
{-# LANGUAGE ScopedTypeVariables       #-}
{-# LANGUAGE TypeFamilies              #-}

module Network.Mux
  ( -- * Defining 'Mux' protocol bundles
    newMux
  , Mux
  , MuxMode (..)
  , HasInitiator
  , HasResponder
  , MiniProtocolBundle (..)
  , MiniProtocolInfo (..)
  , MiniProtocolNum (..)
  , MiniProtocolDirection (..)
  , MiniProtocolLimits (..)
    -- * Running the Mux
  , runMux
  , MuxBearer
  , runMiniProtocol
  , StartOnDemandOrEagerly (..)
  , stopMux
    -- * Monitoring
  , miniProtocolStateMap
  , muxStopped
    -- * Errors
  , MuxError (..)
  , MuxErrorType (..)
    -- * Tracing
  , traceMuxBearerState
  , MuxBearerState (..)
  , MuxTrace (..)
  , WithMuxBearer (..)
  ) where

import qualified Data.ByteString.Lazy as BL
import           Data.Int (Int64)
import           Data.Map (Map)
import qualified Data.Map.Strict as Map
import           Data.Maybe (isNothing)
import           Data.Monoid.Synchronisation (FirstToFinish (..))

import           Control.Applicative
import qualified Control.Concurrent.JobPool as JobPool
import           Control.Exception (SomeAsyncException (..))
import           Control.Monad
import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork
import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer hiding (timeout)
import           Control.Tracer

import           Network.Mux.Channel
import           Network.Mux.Egress as Egress
import           Network.Mux.Ingress as Ingress
import           Network.Mux.Timeout
import           Network.Mux.Trace
import           Network.Mux.Types


data Mux (mode :: MuxMode) m =
     Mux {
       Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols   :: !(Map (MiniProtocolNum, MiniProtocolDir)
                                   (MiniProtocolState mode m)),
       Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue :: !(TQueue m (ControlCmd mode m)),
       Mux mode m -> StrictTVar m MuxStatus
muxStatus          :: StrictTVar m MuxStatus
     }


miniProtocolStateMap :: MonadSTM m
                     => Mux mode m
                     -> Map (MiniProtocolNum, MiniProtocolDir)
                            (STM m MiniProtocolStatus)
miniProtocolStateMap :: Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
miniProtocolStateMap = (MiniProtocolState mode m -> STM m MiniProtocolStatus)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus)
-> (MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus)
-> MiniProtocolState mode m
-> STM m MiniProtocolStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar)
                     (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
 -> Map
      (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus))
-> (Mux mode m
    -> Map
         (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (STM m MiniProtocolStatus)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols

-- | Await until mux stopped.
--
muxStopped :: MonadSTM m => Mux mode m -> STM m (Maybe SomeException)
muxStopped :: Mux mode m -> STM m (Maybe SomeException)
muxStopped Mux { StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus } =
    StrictTVar m MuxStatus -> STM m MuxStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus STM m MuxStatus
-> (MuxStatus -> STM m (Maybe SomeException))
-> STM m (Maybe SomeException)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \MuxStatus
status -> case MuxStatus
status of
      MuxStatus
MuxReady      -> STM m (Maybe SomeException)
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
      MuxFailed SomeException
err -> Maybe SomeException -> STM m (Maybe SomeException)
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
err)
      MuxStatus
MuxStopping   -> STM m (Maybe SomeException)
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
      MuxStatus
MuxStopped    -> Maybe SomeException -> STM m (Maybe SomeException)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe SomeException
forall a. Maybe a
Nothing


data MuxStatus
    -- | Initial mux state, mux is ready to accept requests.  It does not
    -- indicate weather mux thread was started or not.
    = MuxReady

    -- | Mux failed with 'SomeException'
    | MuxFailed SomeException

    -- | Mux is being stopped; mux will not accept any new mini-protocols to
    -- start.
    | MuxStopping

     -- | Mux stopped.
    | MuxStopped


newMux :: MonadSTM m  => MiniProtocolBundle mode -> m (Mux mode m)
newMux :: MiniProtocolBundle mode -> m (Mux mode m)
newMux (MiniProtocolBundle [MiniProtocolInfo mode]
ptcls) = do
    Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols   <- [MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
[MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls
    TQueue m (ControlCmd mode m)
muxControlCmdQueue <- STM m (TQueue m (ControlCmd mode m))
-> m (TQueue m (ControlCmd mode m))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (TQueue m (ControlCmd mode m))
forall (m :: * -> *) a. MonadSTM m => STM m (TQueue m a)
newTQueue
    StrictTVar m MuxStatus
muxStatus <- MuxStatus -> m (StrictTVar m MuxStatus)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MuxStatus
MuxReady
    Mux mode m -> m (Mux mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return Mux :: forall (mode :: MuxMode) (m :: * -> *).
Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> Mux mode m
Mux {
      Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols,
      TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue,
      StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus
    }

mkMiniProtocolStateMap :: MonadSTM m
                       => [MiniProtocolInfo mode]
                       -> m (Map (MiniProtocolNum, MiniProtocolDir)
                                 (MiniProtocolState mode m))
mkMiniProtocolStateMap :: [MiniProtocolInfo mode]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
mkMiniProtocolStateMap [MiniProtocolInfo mode]
ptcls =
    [((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
 -> Map
      (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
-> m [((MiniProtocolNum, MiniProtocolDir),
       MiniProtocolState mode m)]
-> m (Map
        (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$>
    [m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)]
-> m [((MiniProtocolNum, MiniProtocolDir),
       MiniProtocolState mode m)]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence
      [ do MiniProtocolState mode m
state <- MiniProtocolInfo mode -> m (MiniProtocolState mode m)
forall (m :: * -> *) (mode :: MuxMode).
MonadSTM m =>
MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
ptcl
           ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)
-> m ((MiniProtocolNum, MiniProtocolDir), MiniProtocolState mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return ((MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir), MiniProtocolState mode m
state)
      | ptcl :: MiniProtocolInfo mode
ptcl@MiniProtocolInfo {MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir} <- [MiniProtocolInfo mode]
ptcls ]

mkMiniProtocolState :: MonadSTM m
                    => MiniProtocolInfo mode
                    -> m (MiniProtocolState mode m)
mkMiniProtocolState :: MiniProtocolInfo mode -> m (MiniProtocolState mode m)
mkMiniProtocolState MiniProtocolInfo mode
miniProtocolInfo = do
    StrictTVar m ByteString
miniProtocolIngressQueue <- ByteString -> m (StrictTVar m ByteString)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
    StrictTVar m MiniProtocolStatus
miniProtocolStatusVar    <- MiniProtocolStatus -> m (StrictTVar m MiniProtocolStatus)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO MiniProtocolStatus
StatusIdle
    MiniProtocolState mode m -> m (MiniProtocolState mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return MiniProtocolState :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolInfo mode
-> IngressQueue m
-> StrictTVar m MiniProtocolStatus
-> MiniProtocolState mode m
MiniProtocolState {
       MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo :: MiniProtocolInfo mode
miniProtocolInfo,
       StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue :: StrictTVar m ByteString
miniProtocolIngressQueue,
       StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
     }

-- | Shut down the mux. This will cause 'runMux' to return. It does not
-- wait for any protocol threads to finish, so you should do that first if
-- necessary.
--
stopMux :: MonadSTM m  => Mux mode m -> m ()
stopMux :: Mux mode m -> m ()
stopMux Mux{TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue} =
    STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ TQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (ControlCmd mode m)
muxControlCmdQueue ControlCmd mode m
forall (mode :: MuxMode) (m :: * -> *). ControlCmd mode m
CmdShutdown


-- | Mux classification of 'Job's
--
data MuxGroup = MuxJob
              | MiniProtocolJob
  deriving (MuxGroup -> MuxGroup -> Bool
(MuxGroup -> MuxGroup -> Bool)
-> (MuxGroup -> MuxGroup -> Bool) -> Eq MuxGroup
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: MuxGroup -> MuxGroup -> Bool
$c/= :: MuxGroup -> MuxGroup -> Bool
== :: MuxGroup -> MuxGroup -> Bool
$c== :: MuxGroup -> MuxGroup -> Bool
Eq, Eq MuxGroup
Eq MuxGroup
-> (MuxGroup -> MuxGroup -> Ordering)
-> (MuxGroup -> MuxGroup -> Bool)
-> (MuxGroup -> MuxGroup -> Bool)
-> (MuxGroup -> MuxGroup -> Bool)
-> (MuxGroup -> MuxGroup -> Bool)
-> (MuxGroup -> MuxGroup -> MuxGroup)
-> (MuxGroup -> MuxGroup -> MuxGroup)
-> Ord MuxGroup
MuxGroup -> MuxGroup -> Bool
MuxGroup -> MuxGroup -> Ordering
MuxGroup -> MuxGroup -> MuxGroup
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: MuxGroup -> MuxGroup -> MuxGroup
$cmin :: MuxGroup -> MuxGroup -> MuxGroup
max :: MuxGroup -> MuxGroup -> MuxGroup
$cmax :: MuxGroup -> MuxGroup -> MuxGroup
>= :: MuxGroup -> MuxGroup -> Bool
$c>= :: MuxGroup -> MuxGroup -> Bool
> :: MuxGroup -> MuxGroup -> Bool
$c> :: MuxGroup -> MuxGroup -> Bool
<= :: MuxGroup -> MuxGroup -> Bool
$c<= :: MuxGroup -> MuxGroup -> Bool
< :: MuxGroup -> MuxGroup -> Bool
$c< :: MuxGroup -> MuxGroup -> Bool
compare :: MuxGroup -> MuxGroup -> Ordering
$ccompare :: MuxGroup -> MuxGroup -> Ordering
$cp1Ord :: Eq MuxGroup
Ord)


-- | runMux starts a mux bearer for the specified protocols corresponding to
-- one of the provided Versions.
--
-- __Isometric flow control: analysis of head-of-line blocking of the ingress side of the multiplexer__
--
-- For each mini-protocol (enumerated by @ptcl@), mux will create two
-- channels. One for initiator and one for the responder.  Each channel will use
-- a single 'Wanton'.  When it is filled, it is put in a common queue
-- 'tsrQueue'.  This means that the queue is bound by @2 * |ptcl|@.  Every side
-- of a mini-protocol is served by a single 'Wanton': when an application sends
-- data, the channel will try to put it into the 'Wanton' (which might block).
-- 'Wanton's are taken from the 'tsrQueue' queue by one of mux threads.  This
-- eliminates head of line blocking: each mini-protocol thread can block on
-- putting more bytes into its 'Wanton', but it cannot block the other
-- mini-protocols or the thread that is reading the 'tsrQueue' queue.  This is
-- ensured since the 'muxChannel' will put only a non-empty 'Wanton' to the
-- 'tsrQueue' queue, and on such wantons the queue is never blocked.  This means
-- that  the only way the queue can block is when its empty, which means that
-- none of the mini-protocols wanted to send.  The egress part will read
-- a 'Wanton', take a fixed amount of bytes encode them in as an 'MuxSDU'; if
-- there are leftovers it will put them back in the 'Wanton' and place it at the
-- end of the queue (reading and writing to it will happen in a single STM
-- transaction which assures that the order of requests from a mini-protocol is
-- preserved.
--
-- Properties:
--
-- * at any given time the 'tsrQueue' contains at most one
--   'TranslocationServiceRequest' from a given mini-protocol of the given
--   'MiniProtocolDir', thus the queue contains at most @2 * |ptcl|@
--   translocation requests.
-- * at any given time each @TranslocationServiceRequest@ contains a non-empty
-- 'Wanton'
--
runMux :: forall m mode.
          ( MonadAsync m
          , MonadCatch m
          , MonadFork m
          , MonadLabelledSTM m
          , MonadThrow (STM m)
          , MonadTime  m
          , MonadTimer m
          , MonadMask m
          )
       => Tracer m MuxTrace
       -> Mux mode m
       -> MuxBearer m
       -> m ()
runMux :: Tracer m MuxTrace -> Mux mode m -> MuxBearer m -> m ()
runMux Tracer m MuxTrace
tracer Mux {Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue, StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus} MuxBearer m
bearer = do
    TBQueue m (TranslocationServiceRequest m)
egressQueue <- STM m (TBQueue m (TranslocationServiceRequest m))
-> m (TBQueue m (TranslocationServiceRequest m))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TBQueue m (TranslocationServiceRequest m))
 -> m (TBQueue m (TranslocationServiceRequest m)))
-> STM m (TBQueue m (TranslocationServiceRequest m))
-> m (TBQueue m (TranslocationServiceRequest m))
forall a b. (a -> b) -> a -> b
$ Natural -> STM m (TBQueue m (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
Natural -> STM m (TBQueue m a)
newTBQueue Natural
100
    TBQueue m (TranslocationServiceRequest m) -> String -> m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
TBQueue m a -> String -> m ()
labelTBQueueIO TBQueue m (TranslocationServiceRequest m)
egressQueue String
"mux-eq"

    (JobPool MuxGroup m MuxJobResult -> m ()) -> m ()
forall group (m :: * -> *) a b.
(MonadAsync m, MonadThrow m, MonadLabelledSTM m) =>
(JobPool group m a -> m b) -> m b
JobPool.withJobPool
      (\JobPool MuxGroup m MuxJobResult
jobpool -> do
        JobPool MuxGroup m MuxJobResult
-> Job MuxGroup m MuxJobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool (TBQueue m (TranslocationServiceRequest m)
-> Job MuxGroup m MuxJobResult
muxerJob TBQueue m (TranslocationServiceRequest m)
egressQueue)
        JobPool MuxGroup m MuxJobResult
-> Job MuxGroup m MuxJobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool Job MuxGroup m MuxJobResult
demuxerJob
        Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Mature)

        -- Wait for someone to shut us down by calling muxStop or an error.
        -- Outstanding jobs are shut down Upon completion of withJobPool.
        (TimeoutFn m -> m ()) -> m ()
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
 MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m ()) -> m ()) -> (TimeoutFn m -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
          Tracer m MuxTrace
-> TimeoutFn m
-> JobPool MuxGroup m MuxJobResult
-> TBQueue m (TranslocationServiceRequest m)
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadAsync m, MonadMask m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> TimeoutFn m
-> JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer
                  TimeoutFn m
timeout
                  JobPool MuxGroup m MuxJobResult
jobpool
                  TBQueue m (TranslocationServiceRequest m)
egressQueue
                  TQueue m (ControlCmd mode m)
muxControlCmdQueue
                  StrictTVar m MuxStatus
muxStatus
      )
    -- Only handle async exceptions, 'monitor' sets 'muxStatus' before throwing
    -- an exception.  Setting 'muxStatus' is necessary to resolve a possible
    -- deadlock of mini-protocol completion action.
    m () -> (SomeAsyncException -> m ()) -> m ()
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` \(SomeAsyncException e
e) -> do
      STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (SomeException -> MuxStatus
MuxFailed (SomeException -> MuxStatus) -> SomeException -> MuxStatus
forall a b. (a -> b) -> a -> b
$ e -> SomeException
forall e. Exception e => e -> SomeException
toException e
e)
      e -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO e
e
  where
    muxerJob :: TBQueue m (TranslocationServiceRequest m)
-> Job MuxGroup m MuxJobResult
muxerJob TBQueue m (TranslocationServiceRequest m)
egressQueue =
      m MuxJobResult
-> (SomeException -> m MuxJobResult)
-> MuxGroup
-> String
-> Job MuxGroup m MuxJobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job (TBQueue m (TranslocationServiceRequest m)
-> MuxBearer m -> m MuxJobResult
forall (m :: * -> *) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, MonadTime m) =>
EgressQueue m -> MuxBearer m -> m void
muxer TBQueue m (TranslocationServiceRequest m)
egressQueue MuxBearer m
bearer)
                  (MuxJobResult -> m MuxJobResult
forall (m :: * -> *) a. Monad m => a -> m a
return (MuxJobResult -> m MuxJobResult)
-> (SomeException -> MuxJobResult)
-> SomeException
-> m MuxJobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> MuxJobResult
MuxerException)
                  MuxGroup
MuxJob
                  String
"muxer"

    demuxerJob :: Job MuxGroup m MuxJobResult
demuxerJob =
      m MuxJobResult
-> (SomeException -> m MuxJobResult)
-> MuxGroup
-> String
-> Job MuxGroup m MuxJobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job ([MiniProtocolState mode m] -> MuxBearer m -> m MuxJobResult
forall (m :: * -> *) (mode :: MuxMode) void.
(MonadAsync m, MonadFork m, MonadMask m, MonadThrow (STM m),
 MonadTimer m, MonadTime m) =>
[MiniProtocolState mode m] -> MuxBearer m -> m void
demuxer (Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> [MiniProtocolState mode m]
forall k a. Map k a -> [a]
Map.elems Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols) MuxBearer m
bearer)
                  (MuxJobResult -> m MuxJobResult
forall (m :: * -> *) a. Monad m => a -> m a
return (MuxJobResult -> m MuxJobResult)
-> (SomeException -> MuxJobResult)
-> SomeException
-> m MuxJobResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> MuxJobResult
DemuxerException)
                  MuxGroup
MuxJob
                  String
"demuxer"

miniProtocolJob
  :: forall mode m.
     ( MonadSTM m
     , MonadThread m
     , MonadThrow (STM m)
     )
  => Tracer m MuxTrace
  -> EgressQueue m
  -> MiniProtocolState mode m
  -> MiniProtocolAction m
  -> JobPool.Job MuxGroup m MuxJobResult
miniProtocolJob :: Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob Tracer m MuxTrace
tracer EgressQueue m
egressQueue
                MiniProtocolState {
                  miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo =
                    MiniProtocolInfo {
                      MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
                      MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
                    },
                  IngressQueue m
miniProtocolIngressQueue :: IngressQueue m
miniProtocolIngressQueue :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue,
                  StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
                }
                (MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar) =
    m MuxJobResult
-> (SomeException -> m MuxJobResult)
-> MuxGroup
-> String
-> Job MuxGroup m MuxJobResult
forall group (m :: * -> *) a.
m a -> (SomeException -> m a) -> group -> String -> Job group m a
JobPool.Job m MuxJobResult
jobAction
                SomeException -> m MuxJobResult
jobHandler
                MuxGroup
MiniProtocolJob
                (MiniProtocolNum -> String
forall a. Show a => a -> String
show MiniProtocolNum
miniProtocolNum String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
"." String -> String -> String
forall a. [a] -> [a] -> [a]
++ MiniProtocolDir -> String
forall a. Show a => a -> String
show MiniProtocolDir
miniProtocolDirEnum)
  where
    jobAction :: m MuxJobResult
jobAction = do
      String -> m ()
forall (m :: * -> *). MonadThread m => String -> m ()
labelThisThread (case MiniProtocolNum
miniProtocolNum of
                        MiniProtocolNum Word16
a -> String
"prtcl-" String -> String -> String
forall a. [a] -> [a] -> [a]
++ Word16 -> String
forall a. Show a => a -> String
show Word16
a)
      IngressQueue m
w <- ByteString -> m (IngressQueue m)
forall (m :: * -> *) a. MonadSTM m => a -> m (StrictTVar m a)
newTVarIO ByteString
BL.empty
      let chan :: Channel m
chan = Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
forall (m :: * -> *).
MonadSTM m =>
Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue (IngressQueue m -> Wanton m
forall (m :: * -> *). StrictTVar m ByteString -> Wanton m
Wanton IngressQueue m
w)
                           MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum
                           IngressQueue m
miniProtocolIngressQueue
      (a
result, Maybe ByteString
remainder)  <- Channel m -> m (a, Maybe ByteString)
protocolAction Channel m
chan
      Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceTerminating MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)
      STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        -- The Wanton w is the SDUs that are queued but not yet sent for this job.
        -- Job threads will be prevented from exiting until all their SDUs have been
        -- transmitted unless an exception/error is encountered. In that case all
        -- jobs will be cancelled directly.
        IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
w STM m ByteString -> (ByteString -> STM m ()) -> STM m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ())
-> (ByteString -> Bool) -> ByteString -> STM m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
BL.null
        StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusIdle
        StrictTMVar m (Either SomeException a)
-> Either SomeException a -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (a -> Either SomeException a
forall a b. b -> Either a b
Right a
result)
          STM m () -> STM m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse` MuxRuntimeError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> MuxRuntimeError
MuxBlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
        case Maybe ByteString
remainder of
          Just ByteString
trailing ->
            IngressQueue m -> (ByteString -> ByteString) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar IngressQueue m
miniProtocolIngressQueue (ByteString -> ByteString -> ByteString
BL.append ByteString
trailing)
          Maybe ByteString
Nothing ->
            () -> STM m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()

      MuxJobResult -> m MuxJobResult
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> MuxJobResult
MiniProtocolShutdown MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum)

    jobHandler :: SomeException -> m MuxJobResult
    jobHandler :: SomeException -> m MuxJobResult
jobHandler SomeException
e = do
      STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
        StrictTMVar m (Either SomeException a)
-> Either SomeException a -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTMVar m a -> a -> STM m ()
putTMVar StrictTMVar m (Either SomeException a)
completionVar (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
        STM m () -> STM m () -> STM m ()
forall (m :: * -> *) a. MonadSTM m => STM m a -> STM m a -> STM m a
`orElse`
        MuxRuntimeError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum -> MuxRuntimeError
MuxBlockedOnCompletionVar MiniProtocolNum
miniProtocolNum)
      MuxJobResult -> m MuxJobResult
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxJobResult
MiniProtocolException MiniProtocolNum
miniProtocolNum MiniProtocolDir
miniProtocolDirEnum SomeException
e)

    miniProtocolDirEnum :: MiniProtocolDir
    miniProtocolDirEnum :: MiniProtocolDir
miniProtocolDirEnum = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir

data ControlCmd mode m =
     CmdStartProtocolThread
       !StartOnDemandOrEagerly
       !(MiniProtocolState mode m)
       !(MiniProtocolAction m)
   | CmdShutdown

data StartOnDemandOrEagerly = StartOnDemand | StartEagerly
  deriving StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
(StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> (StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool)
-> Eq StartOnDemandOrEagerly
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c/= :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
$c== :: StartOnDemandOrEagerly -> StartOnDemandOrEagerly -> Bool
Eq

data MiniProtocolAction m where
     MiniProtocolAction :: (Channel m -> m (a, Maybe BL.ByteString)) -- ^ Action
                        -> StrictTMVar m (Either SomeException a)    -- ^ Completion var
                        -> MiniProtocolAction m

type MiniProtocolKey = (MiniProtocolNum, MiniProtocolDir)

newtype MonitorCtx m mode = MonitorCtx {
    -- | Mini-Protocols started on demand and waiting to be scheduled.
    --
    MonitorCtx m mode
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: (Map MiniProtocolKey
                                (MiniProtocolState mode m, MiniProtocolAction m))

  }

-- | The monitoring loop does two jobs:
--
--  1. It waits for mini-protocol threads to terminate.
--  2. It starts responder protocol threads on demand when the first
--     incoming message arrives.
--
monitor :: forall mode m.
           ( MonadSTM m
           , MonadAsync m
           , MonadMask m
           , MonadThrow (STM m)
           )
        => Tracer m MuxTrace
        -> TimeoutFn m
        -> JobPool.JobPool MuxGroup m MuxJobResult
        -> EgressQueue m
        -> TQueue m (ControlCmd mode m)
        -> StrictTVar m MuxStatus
        -> m ()
monitor :: Tracer m MuxTrace
-> TimeoutFn m
-> JobPool MuxGroup m MuxJobResult
-> EgressQueue m
-> TQueue m (ControlCmd mode m)
-> StrictTVar m MuxStatus
-> m ()
monitor Tracer m MuxTrace
tracer TimeoutFn m
timeout JobPool MuxGroup m MuxJobResult
jobpool EgressQueue m
egressQueue TQueue m (ControlCmd mode m)
cmdQueue StrictTVar m MuxStatus
muxStatus =
    MonitorCtx m mode -> m ()
go (Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
forall (m :: * -> *) (mode :: MuxMode).
Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
-> MonitorCtx m mode
MonitorCtx Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Map k a
Map.empty)
  where
    go :: MonitorCtx m mode -> m ()
    go :: MonitorCtx m mode -> m ()
go !monitorCtx :: MonitorCtx m mode
monitorCtx@MonitorCtx { Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols :: forall (m :: * -> *) (mode :: MuxMode).
MonitorCtx m mode
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols } = do
      MonitorEvent mode m
result <- STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MonitorEvent mode m) -> m (MonitorEvent mode m))
-> STM m (MonitorEvent mode m) -> m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall (m :: * -> *) a. FirstToFinish m a -> m a
runFirstToFinish (FirstToFinish (STM m) (MonitorEvent mode m)
 -> STM m (MonitorEvent mode m))
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> STM m (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$
            -- wait for a mini-protocol thread to terminate
           (STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (STM m (MonitorEvent mode m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ MuxJobResult -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
MuxJobResult -> MonitorEvent mode m
EventJobResult (MuxJobResult -> MonitorEvent mode m)
-> STM m MuxJobResult -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> JobPool MuxGroup m MuxJobResult -> STM m MuxJobResult
forall (m :: * -> *) group a.
MonadSTM m =>
JobPool group m a -> STM m a
JobPool.collect JobPool MuxGroup m MuxJobResult
jobpool)

            -- wait for a new control command
        FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> (STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (STM m (MonitorEvent mode m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ ControlCmd mode m -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
ControlCmd mode m -> MonitorEvent mode m
EventControlCmd (ControlCmd mode m -> MonitorEvent mode m)
-> STM m (ControlCmd mode m) -> STM m (MonitorEvent mode m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TQueue m (ControlCmd mode m) -> STM m (ControlCmd mode m)
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue TQueue m (ControlCmd mode m)
cmdQueue)

            -- or wait for data to arrive on the channels that do not yet have
            -- responder threads running
        FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a. Semigroup a => a -> a -> a
<> ((MiniProtocolState mode m, MiniProtocolAction m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap
             (\(MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction) ->
               STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall (m :: * -> *) a. m a -> FirstToFinish m a
FirstToFinish (STM m (MonitorEvent mode m)
 -> FirstToFinish (STM m) (MonitorEvent mode m))
-> STM m (MonitorEvent mode m)
-> FirstToFinish (STM m) (MonitorEvent mode m)
forall a b. (a -> b) -> a -> b
$ do
                 IngressQueue m -> STM m ()
checkNonEmptyQueue (MiniProtocolState mode m -> IngressQueue m
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> IngressQueue m
miniProtocolIngressQueue MiniProtocolState mode m
ptclState)
                 MonitorEvent mode m -> STM m (MonitorEvent mode m)
forall (m :: * -> *) a. Monad m => a -> m a
return (MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m
-> MiniProtocolAction m -> MonitorEvent mode m
EventStartOnDemand MiniProtocolState mode m
ptclState MiniProtocolAction m
ptclAction)
             )
             Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols

      case MonitorEvent mode m
result of
        -- Protocols that runs to completion are not automatically restarted.
        EventJobResult (MiniProtocolShutdown MiniProtocolNum
pnum MiniProtocolDir
pmode) -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceCleanExit MiniProtocolNum
pnum MiniProtocolDir
pmode)
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx

        EventJobResult (MiniProtocolException MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e) -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> SomeException -> MuxTrace
MuxTraceExceptionExit MiniProtocolNum
pnum MiniProtocolDir
pmode SomeException
e)
          STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (MuxStatus -> STM m ()) -> MuxStatus -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
          SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e

        -- These two cover internal and protocol errors.  The muxer exception is
        -- always fatal.  The demuxer exception 'MuxError BearerClosed' when all
        -- mini-protocols stopped indicates a normal shutdown and thus it is not
        -- propagated.
        --
        -- TODO: decide if we should have exception wrappers here to identify
        -- the source of the failure, e.g. specific mini-protocol. If we're
        -- propagating exceptions, we don't need to log them.
        EventJobResult (MuxerException SomeException
e) -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
          STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (MuxStatus -> STM m ()) -> MuxStatus -> STM m ()
forall a b. (a -> b) -> a -> b
$ SomeException -> MuxStatus
MuxFailed SomeException
e
          SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e
        EventJobResult (DemuxerException SomeException
e) -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
Dead)
          Bool
r <- STM m Bool -> m Bool
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
            Int
size <- JobPool MuxGroup m MuxJobResult -> MuxGroup -> STM m Int
forall (m :: * -> *) group a.
(MonadSTM m, Eq group) =>
JobPool group m a -> group -> STM m Int
JobPool.readGroupSize JobPool MuxGroup m MuxJobResult
jobpool MuxGroup
MiniProtocolJob
            case Int
size of
              Int
0  | Just (MuxError MuxErrorType
MuxBearerClosed String
_) <- SomeException -> Maybe MuxError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e
                -> StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopped
                STM m () -> STM m Bool -> STM m Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
              Int
_ -> StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus (SomeException -> MuxStatus
MuxFailed SomeException
e)
                STM m () -> STM m Bool -> STM m Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> STM m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
          if Bool
r
            then () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            else SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO SomeException
e

        EventControlCmd (CmdStartProtocolThread
                           StartOnDemandOrEagerly
StartEagerly
                           ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
                             }
                           }
                           MiniProtocolAction m
ptclAction) -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartEagerly MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          JobPool MuxGroup m MuxJobResult
-> Job MuxGroup m MuxJobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool (Job MuxGroup m MuxJobResult -> m ())
-> Job MuxGroup m MuxJobResult -> m ()
forall a b. (a -> b) -> a -> b
$
            Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob
              Tracer m MuxTrace
tracer
              EgressQueue m
egressQueue
              MiniProtocolState mode m
ptclState
              MiniProtocolAction m
ptclAction
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx

        EventControlCmd (CmdStartProtocolThread
                           StartOnDemandOrEagerly
StartOnDemand
                           ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
                             }
                           }
                           MiniProtocolAction m
ptclAction) -> do
          let monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols :: Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols =
                                           (MiniProtocolNum, MiniProtocolDir)
-> (MiniProtocolState mode m, MiniProtocolAction m)
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState)
                                                      (MiniProtocolState mode m
ptclState, MiniProtocolAction m
ptclAction)
                                                      Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
                                       }
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartedOnDemand MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'

        EventControlCmd ControlCmd mode m
CmdShutdown -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer MuxTrace
MuxTraceShutdown
          STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopping
          JobPool MuxGroup m MuxJobResult -> MuxGroup -> m ()
forall (m :: * -> *) group a.
(MonadAsync m, Eq group) =>
JobPool group m a -> group -> m ()
JobPool.cancelGroup JobPool MuxGroup m MuxJobResult
jobpool MuxGroup
MiniProtocolJob
          -- wait for 2 seconds before the egress queue is drained
          Maybe ()
_ <- DiffTime -> m () -> m (Maybe ())
TimeoutFn m
timeout DiffTime
2 (m () -> m (Maybe ())) -> m () -> m (Maybe ())
forall a b. (a -> b) -> a -> b
$
            STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$
                  EgressQueue m -> STM m (Maybe (TranslocationServiceRequest m))
forall (m :: * -> *) a.
MonadSTM m =>
TBQueue m a -> STM m (Maybe a)
tryPeekTBQueue EgressQueue m
egressQueue
              STM m (Maybe (TranslocationServiceRequest m))
-> (Maybe (TranslocationServiceRequest m) -> STM m ()) -> STM m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> STM m ())
-> (Maybe (TranslocationServiceRequest m) -> Bool)
-> Maybe (TranslocationServiceRequest m)
-> STM m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (TranslocationServiceRequest m) -> Bool
forall a. Maybe a -> Bool
isNothing
          STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MuxStatus -> MuxStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MuxStatus
muxStatus MuxStatus
MuxStopped
          -- by exiting the 'monitor' loop we let the job pool kill demuxer and
          -- muxer threads

        -- Data has arrived on a channel for a mini-protocol for which we have
        -- an on-demand-start protocol thread. So we start it now.
        EventStartOnDemand ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState {
                             miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                               MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
                               MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
                             },
                             StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
                           }
                           MiniProtocolAction m
ptclAction -> do
          Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MiniProtocolNum -> MiniProtocolDir -> MuxTrace
MuxTraceStartOnDemand MiniProtocolNum
miniProtocolNum
                             (MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir))
          STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
StatusRunning
          JobPool MuxGroup m MuxJobResult
-> Job MuxGroup m MuxJobResult -> m ()
forall group (m :: * -> *) a.
(MonadAsync m, MonadMask m, Ord group) =>
JobPool group m a -> Job group m a -> m ()
JobPool.forkJob JobPool MuxGroup m MuxJobResult
jobpool (Job MuxGroup m MuxJobResult -> m ())
-> Job MuxGroup m MuxJobResult -> m ()
forall a b. (a -> b) -> a -> b
$
            Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
forall (mode :: MuxMode) (m :: * -> *).
(MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
Tracer m MuxTrace
-> EgressQueue m
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> Job MuxGroup m MuxJobResult
miniProtocolJob
              Tracer m MuxTrace
tracer
              EgressQueue m
egressQueue
              MiniProtocolState mode m
ptclState
              MiniProtocolAction m
ptclAction
          let ptclKey :: (MiniProtocolNum, MiniProtocolDir)
ptclKey = MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState mode m
ptclState
              monitorCtx' :: MonitorCtx m mode
monitorCtx' = MonitorCtx m mode
monitorCtx { mcOnDemandProtocols :: Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols =
                                           (MiniProtocolNum, MiniProtocolDir)
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
-> Map
     (MiniProtocolNum, MiniProtocolDir)
     (MiniProtocolState mode m, MiniProtocolAction m)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete (MiniProtocolNum, MiniProtocolDir)
ptclKey
                                                      Map
  (MiniProtocolNum, MiniProtocolDir)
  (MiniProtocolState mode m, MiniProtocolAction m)
mcOnDemandProtocols
                                       }
          MonitorCtx m mode -> m ()
go MonitorCtx m mode
monitorCtx'

    checkNonEmptyQueue :: IngressQueue m -> STM m ()
    checkNonEmptyQueue :: IngressQueue m -> STM m ()
checkNonEmptyQueue IngressQueue m
q = do
      ByteString
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
      Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not (ByteString -> Bool
BL.null ByteString
buf))

    protocolKey :: MiniProtocolState mode m -> MiniProtocolKey
    protocolKey :: MiniProtocolState mode m -> (MiniProtocolNum, MiniProtocolDir)
protocolKey MiniProtocolState {
                  miniProtocolInfo :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> MiniProtocolInfo mode
miniProtocolInfo = MiniProtocolInfo {
                    MiniProtocolNum
miniProtocolNum :: MiniProtocolNum
miniProtocolNum :: forall (mode :: MuxMode). MiniProtocolInfo mode -> MiniProtocolNum
miniProtocolNum,
                    MiniProtocolDirection mode
miniProtocolDir :: MiniProtocolDirection mode
miniProtocolDir :: forall (mode :: MuxMode).
MiniProtocolInfo mode -> MiniProtocolDirection mode
miniProtocolDir
                  }
                } =
      (MiniProtocolNum
miniProtocolNum, MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
miniProtocolDir)

data MonitorEvent mode m =
     EventJobResult  MuxJobResult
   | EventControlCmd (ControlCmd mode m)
   | EventStartOnDemand (MiniProtocolState mode m)
                        (MiniProtocolAction m)

-- | The mux forks off a number of threads and its main thread waits and
-- monitors them all. This type covers the different thread and their possible
-- termination behaviour.
--
data MuxJobResult =

       -- | A mini-protocol thread terminated with a result.
       --
       MiniProtocolShutdown MiniProtocolNum MiniProtocolDir

       -- | A mini-protocol thread terminated with an exception. We always
       -- respond by terminating the whole mux.
     | MiniProtocolException MiniProtocolNum MiniProtocolDir SomeException

       -- | Exception in the 'mux' thread. Always fatal.
     | MuxerException   SomeException

       -- | Exception in the 'demux' thread. Always fatal.
     | DemuxerException SomeException


-- | muxChannel creates a duplex channel for a specific 'MiniProtocolId' and
-- 'MiniProtocolDir'.
--
muxChannel
    :: forall m.
       ( MonadSTM m
       )
    => Tracer m MuxTrace
    -> EgressQueue m
    -> Wanton m
    -> MiniProtocolNum
    -> MiniProtocolDir
    -> IngressQueue m
    -> Channel m
muxChannel :: Tracer m MuxTrace
-> EgressQueue m
-> Wanton m
-> MiniProtocolNum
-> MiniProtocolDir
-> IngressQueue m
-> Channel m
muxChannel Tracer m MuxTrace
tracer EgressQueue m
egressQueue want :: Wanton m
want@(Wanton IngressQueue m
w) MiniProtocolNum
mc MiniProtocolDir
md IngressQueue m
q =
    Channel :: forall (m :: * -> *).
(ByteString -> m ()) -> m (Maybe ByteString) -> Channel m
Channel { ByteString -> m ()
send :: ByteString -> m ()
send :: ByteString -> m ()
send, m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv :: m (Maybe ByteString)
recv}
  where
    -- Limit for the message buffer between send and mux thread.
    perMiniProtocolBufferSize :: Int64
    perMiniProtocolBufferSize :: Int64
perMiniProtocolBufferSize = Int64
0x3ffff

    send :: BL.ByteString -> m ()
    send :: ByteString -> m ()
send ByteString
encoding = do
        -- We send CBOR encoded messages by encoding them into by ByteString
        -- forwarding them to the 'mux' thread, see 'Desired servicing semantics'.

        Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelSendStart MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
encoding)

        STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
            ByteString
buf <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
w
            if ByteString -> Int64
BL.length ByteString
buf Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
< Int64
perMiniProtocolBufferSize
               then do
                   let wasEmpty :: Bool
wasEmpty = ByteString -> Bool
BL.null ByteString
buf
                   IngressQueue m -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar IngressQueue m
w (ByteString -> ByteString -> ByteString
BL.append ByteString
buf ByteString
encoding)
                   Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
wasEmpty (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
                     EgressQueue m -> TranslocationServiceRequest m -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue EgressQueue m
egressQueue (MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
forall (m :: * -> *).
MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
TLSRDemand MiniProtocolNum
mc MiniProtocolDir
md Wanton m
want)
               else STM m ()
forall (m :: * -> *) a. MonadSTM m => STM m a
retry

        Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelSendEnd MiniProtocolNum
mc

    recv :: m (Maybe BL.ByteString)
    recv :: m (Maybe ByteString)
recv = do
        -- We receive CBOR encoded messages as ByteStrings (possibly partial) from the
        -- matching ingress queue. This is the same queue the 'demux' thread writes to.
        Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> MuxTrace
MuxTraceChannelRecvStart MiniProtocolNum
mc
        ByteString
blob <- STM m ByteString -> m ByteString
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
            ByteString
blob <- IngressQueue m -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar IngressQueue m
q
            if ByteString
blob ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
BL.empty
                then STM m ByteString
forall (m :: * -> *) a. MonadSTM m => STM m a
retry
                else IngressQueue m -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar IngressQueue m
q ByteString
BL.empty STM m () -> STM m ByteString -> STM m ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ByteString -> STM m ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
blob
        -- say $ printf "recv mid %s mode %s blob len %d" (show mid) (show md) (BL.length blob)
        Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MiniProtocolNum -> Int -> MuxTrace
MuxTraceChannelRecvEnd MiniProtocolNum
mc (Int64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Int) -> Int64 -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob)
        Maybe ByteString -> m (Maybe ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ByteString -> m (Maybe ByteString))
-> Maybe ByteString -> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
blob

traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
traceMuxBearerState Tracer m MuxTrace
tracer MuxBearerState
state =
    Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
tracer (MuxBearerState -> MuxTrace
MuxTraceState MuxBearerState
state)


--
-- Starting mini-protocol threads
--

-- | Arrange to run a protocol thread (for a particular 'MiniProtocolNum' and
-- 'MiniProtocolDirection') to interact on this protocol's 'Channel'.
--
-- The protocol thread can either be started eagerly or on-demand:
--
-- * With 'StartEagerly', the thread is started promptly. This is appropriate
--   for mini-protocols where the opening message may be sent by this thread.
--
-- * With 'StartOnDemand', the thread is not started until the first data is
--   received for this mini-protocol. This is appropriate for mini-protocols
--   where the opening message is sent by the remote peer.
--
-- The result is a STM action to block and wait on the protocol completion.
-- It is safe to call this completion action multiple times: it will always
-- return the same result once the protocol thread completes.
-- In case the Mux has stopped, either due to an exception or because of a call
-- to muxStop a `Left MuxError` will be returned from the STM action.
--
-- It is an error to start a new protocol thread while one is still running,
-- for the same 'MiniProtocolNum' and 'MiniProtocolDirection'. This can easily be
-- avoided by using the STM completion action to wait for the previous one to
-- finish.
--
-- It is safe to ask to start a protocol thread before 'runMux'. In this case
-- the protocol thread will not actually start until 'runMux' is called,
-- irrespective of the 'StartOnDemandOrEagerly' value.
--
runMiniProtocol :: forall mode m a.
                   ( MonadSTM   m
                   , MonadThrow m
                   , MonadThrow (STM m)
                   )
                => Mux mode m
                -> MiniProtocolNum
                -> MiniProtocolDirection mode
                -> StartOnDemandOrEagerly
                -> (Channel m -> m (a, Maybe BL.ByteString))
                -> m (STM m (Either SomeException a))
runMiniProtocol :: Mux mode m
-> MiniProtocolNum
-> MiniProtocolDirection mode
-> StartOnDemandOrEagerly
-> (Channel m -> m (a, Maybe ByteString))
-> m (STM m (Either SomeException a))
runMiniProtocol Mux { Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols, TQueue m (ControlCmd mode m)
muxControlCmdQueue :: TQueue m (ControlCmd mode m)
muxControlCmdQueue :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> TQueue m (ControlCmd mode m)
muxControlCmdQueue , StrictTVar m MuxStatus
muxStatus :: StrictTVar m MuxStatus
muxStatus :: forall (mode :: MuxMode) (m :: * -> *).
Mux mode m -> StrictTVar m MuxStatus
muxStatus}
                MiniProtocolNum
ptclNum MiniProtocolDirection mode
ptclDir StartOnDemandOrEagerly
startMode Channel m -> m (a, Maybe ByteString)
protocolAction

    -- Ensure the mini-protocol is known and get the status var
  | Just ptclState :: MiniProtocolState mode m
ptclState@MiniProtocolState{StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: StrictTVar m MiniProtocolStatus
miniProtocolStatusVar :: forall (mode :: MuxMode) (m :: * -> *).
MiniProtocolState mode m -> StrictTVar m MiniProtocolStatus
miniProtocolStatusVar}
      <- (MiniProtocolNum, MiniProtocolDir)
-> Map
     (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
-> Maybe (MiniProtocolState mode m)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (MiniProtocolNum
ptclNum, MiniProtocolDir
ptclDir') Map (MiniProtocolNum, MiniProtocolDir) (MiniProtocolState mode m)
muxMiniProtocols

  = STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (STM m (Either SomeException a))
 -> m (STM m (Either SomeException a)))
-> STM m (STM m (Either SomeException a))
-> m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ do
      MuxStatus
st <- StrictTVar m MuxStatus -> STM m MuxStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus
      case MuxStatus
st of
        MuxStatus
MuxStopping -> MuxError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MuxErrorType -> String -> MuxError
MuxError  (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) String
"mux stopping")
        MuxStatus
MuxStopped  -> MuxError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MuxErrorType -> String -> MuxError
MuxError  (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) String
"mux stopped")
        MuxStatus
_           -> () -> STM m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

      -- Make sure no thread is currently running, and update the status to
      -- indicate a thread is running (or ready to start on demand)
      MiniProtocolStatus
status <- StrictTVar m MiniProtocolStatus -> STM m MiniProtocolStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar
      Bool -> STM m () -> STM m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (MiniProtocolStatus
status MiniProtocolStatus -> MiniProtocolStatus -> Bool
forall a. Eq a => a -> a -> Bool
== MiniProtocolStatus
StatusIdle) (STM m () -> STM m ()) -> STM m () -> STM m ()
forall a b. (a -> b) -> a -> b
$
        MuxRuntimeError -> STM m ()
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (MiniProtocolNum
-> MiniProtocolDir -> MiniProtocolStatus -> MuxRuntimeError
ProtocolAlreadyRunning MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir' MiniProtocolStatus
status)
      let !status' :: MiniProtocolStatus
status' = case StartOnDemandOrEagerly
startMode of
                       StartOnDemandOrEagerly
StartOnDemand -> MiniProtocolStatus
StatusStartOnDemand
                       StartOnDemandOrEagerly
StartEagerly  -> MiniProtocolStatus
StatusRunning
      StrictTVar m MiniProtocolStatus -> MiniProtocolStatus -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m MiniProtocolStatus
miniProtocolStatusVar MiniProtocolStatus
status'

      -- Tell the mux control to start the thread
      StrictTMVar m (Either SomeException a)
completionVar <- STM m (StrictTMVar m (Either SomeException a))
forall (m :: * -> *) a. MonadSTM m => STM m (StrictTMVar m a)
newEmptyTMVar
      TQueue m (ControlCmd mode m) -> ControlCmd mode m -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m (ControlCmd mode m)
muxControlCmdQueue (ControlCmd mode m -> STM m ()) -> ControlCmd mode m -> STM m ()
forall a b. (a -> b) -> a -> b
$
        StartOnDemandOrEagerly
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> ControlCmd mode m
forall (mode :: MuxMode) (m :: * -> *).
StartOnDemandOrEagerly
-> MiniProtocolState mode m
-> MiniProtocolAction m
-> ControlCmd mode m
CmdStartProtocolThread
          StartOnDemandOrEagerly
startMode
          MiniProtocolState mode m
ptclState
          ((Channel m -> m (a, Maybe ByteString))
-> StrictTMVar m (Either SomeException a) -> MiniProtocolAction m
forall (m :: * -> *) a.
(Channel m -> m (a, Maybe ByteString))
-> StrictTMVar m (Either SomeException a) -> MiniProtocolAction m
MiniProtocolAction Channel m -> m (a, Maybe ByteString)
protocolAction StrictTMVar m (Either SomeException a)
completionVar)

      STM m (Either SomeException a)
-> STM m (STM m (Either SomeException a))
forall (m :: * -> *) a. Monad m => a -> m a
return (STM m (Either SomeException a)
 -> STM m (STM m (Either SomeException a)))
-> STM m (Either SomeException a)
-> STM m (STM m (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar

    -- It is a programmer error to get the wrong protocol, but this is also
    -- very easy to avoid.
  | Bool
otherwise
  = MuxRuntimeError -> m (STM m (Either SomeException a))
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MiniProtocolNum -> MiniProtocolDir -> MuxRuntimeError
UnknownProtocolInternalError MiniProtocolNum
ptclNum MiniProtocolDir
ptclDir')
  where
    ptclDir' :: MiniProtocolDir
ptclDir' = MiniProtocolDirection mode -> MiniProtocolDir
forall (mode :: MuxMode).
MiniProtocolDirection mode -> MiniProtocolDir
protocolDirEnum MiniProtocolDirection mode
ptclDir

    -- Wait for the miniprotocol to complete.
    -- If the mux was stopped through a call to 'stopMux' (MuxStopped)
    -- or in case of an error (MuxFailed) we return the result of
    -- the miniprotocol, or a `MuxError` if it was still running.
    completionAction :: StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
completionAction StrictTMVar m (Either SomeException a)
completionVar = do
      MuxStatus
st <- StrictTVar m MuxStatus -> STM m MuxStatus
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m MuxStatus
muxStatus
      case MuxStatus
st of
           MuxStatus
MuxReady    -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
           MuxStatus
MuxStopping -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                      STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ MuxError -> SomeException
forall e. Exception e => e -> SomeException
toException (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) String
"Mux stoping"))
           MuxStatus
MuxStopped  -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                      STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ MuxError -> SomeException
forall e. Exception e => e -> SomeException
toException (MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) String
"Mux stopped"))
           MuxFailed SomeException
e -> StrictTMVar m (Either SomeException a)
-> STM m (Either SomeException a)
forall (m :: * -> *) a. MonadSTM m => StrictTMVar m a -> STM m a
readTMVar StrictTMVar m (Either SomeException a)
completionVar
                      STM m (Either SomeException a)
-> STM m (Either SomeException a) -> STM m (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Either SomeException a -> STM m (Either SomeException a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$ MuxError -> SomeException
forall e. Exception e => e -> SomeException
toException (MuxError -> SomeException) -> MuxError -> SomeException
forall a b. (a -> b) -> a -> b
$
                            case SomeException -> Maybe MuxError
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e of
                              Just e' :: MuxError
e'@MuxError { MuxErrorType
errorType :: MuxError -> MuxErrorType
errorType :: MuxErrorType
errorType } ->
                                MuxError
e' { errorType :: MuxErrorType
errorType = Maybe MuxErrorType -> MuxErrorType
MuxShutdown (MuxErrorType -> Maybe MuxErrorType
forall a. a -> Maybe a
Just MuxErrorType
errorType) }
                              Maybe MuxError
Nothing ->
                                MuxErrorType -> String -> MuxError
MuxError (Maybe MuxErrorType -> MuxErrorType
MuxShutdown Maybe MuxErrorType
forall a. Maybe a
Nothing) (SomeException -> String
forall a. Show a => a -> String
show SomeException
e))