{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Mux.Egress
( muxer
, EgressQueue
, TranslocationServiceRequest (..)
, Wanton (..)
) where
import Control.Monad
import qualified Data.ByteString.Lazy as BL
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer hiding (timeout)
import Network.Mux.Timeout
import Network.Mux.Types
type EgressQueue m = TBQueue m (TranslocationServiceRequest m)
data TranslocationServiceRequest m =
TLSRDemand !MiniProtocolNum !MiniProtocolDir !(Wanton m)
newtype Wanton m = Wanton { Wanton m -> StrictTVar m ByteString
want :: StrictTVar m BL.ByteString }
muxer
:: ( MonadAsync m
, MonadFork m
, MonadMask m
, MonadThrow (STM m)
, MonadTimer m
, MonadTime m
)
=> EgressQueue m
-> MuxBearer m
-> m void
muxer :: EgressQueue m -> MuxBearer m -> m void
muxer EgressQueue m
egressQueue MuxBearer m
bearer =
(TimeoutFn m -> m void) -> m void
forall (m :: * -> *) b.
(MonadAsync m, MonadFork m, MonadMonotonicTime m, MonadTimer m,
MonadMask m, MonadThrow (STM m)) =>
(TimeoutFn m -> m b) -> m b
withTimeoutSerial ((TimeoutFn m -> m void) -> m void)
-> (TimeoutFn m -> m void) -> m void
forall a b. (a -> b) -> a -> b
$ \TimeoutFn m
timeout ->
m () -> m void
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m void) -> m () -> m void
forall a b. (a -> b) -> a -> b
$ do
TLSRDemand MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d <- STM m (TranslocationServiceRequest m)
-> m (TranslocationServiceRequest m)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (TranslocationServiceRequest m)
-> m (TranslocationServiceRequest m))
-> STM m (TranslocationServiceRequest m)
-> m (TranslocationServiceRequest m)
forall a b. (a -> b) -> a -> b
$ EgressQueue m -> STM m (TranslocationServiceRequest m)
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> STM m a
readTBQueue EgressQueue m
egressQueue
EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
forall (m :: * -> *).
MonadSTM m =>
EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
processSingleWanton EgressQueue m
egressQueue MuxBearer m
bearer TimeoutFn m
timeout MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
d
processSingleWanton :: MonadSTM m
=> EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
processSingleWanton :: EgressQueue m
-> MuxBearer m
-> TimeoutFn m
-> MiniProtocolNum
-> MiniProtocolDir
-> Wanton m
-> m ()
processSingleWanton EgressQueue m
egressQueue MuxBearer { TimeoutFn m -> MuxSDU -> m Time
write :: forall (m :: * -> *).
MuxBearer m -> TimeoutFn m -> MuxSDU -> m Time
write :: TimeoutFn m -> MuxSDU -> m Time
write, SDUSize
sduSize :: forall (m :: * -> *). MuxBearer m -> SDUSize
sduSize :: SDUSize
sduSize }
TimeoutFn m
timeout MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
wanton = do
ByteString
blob <- STM m ByteString -> m ByteString
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m ByteString -> m ByteString)
-> STM m ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ do
ByteString
d <- StrictTVar m ByteString -> STM m ByteString
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton)
let (ByteString
frag, ByteString
rest) = Int64 -> ByteString -> (ByteString, ByteString)
BL.splitAt (Word16 -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral (SDUSize -> Word16
getSDUSize SDUSize
sduSize)) ByteString
d
if ByteString -> Bool
BL.null ByteString
rest
then StrictTVar m ByteString -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton) ByteString
BL.empty
else do
StrictTVar m ByteString -> ByteString -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar (Wanton m -> StrictTVar m ByteString
forall (m :: * -> *). Wanton m -> StrictTVar m ByteString
want Wanton m
wanton) ByteString
rest
EgressQueue m -> TranslocationServiceRequest m -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TBQueue m a -> a -> STM m ()
writeTBQueue EgressQueue m
egressQueue (MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
forall (m :: * -> *).
MiniProtocolNum
-> MiniProtocolDir -> Wanton m -> TranslocationServiceRequest m
TLSRDemand MiniProtocolNum
mpc MiniProtocolDir
md Wanton m
wanton)
ByteString -> STM m ByteString
forall (f :: * -> *) a. Applicative f => a -> f a
pure ByteString
frag
let sdu :: MuxSDU
sdu = MuxSDU :: MuxSDUHeader -> ByteString -> MuxSDU
MuxSDU {
msHeader :: MuxSDUHeader
msHeader = MuxSDUHeader :: RemoteClockModel
-> MiniProtocolNum -> MiniProtocolDir -> Word16 -> MuxSDUHeader
MuxSDUHeader {
mhTimestamp :: RemoteClockModel
mhTimestamp = (Word32 -> RemoteClockModel
RemoteClockModel Word32
0),
mhNum :: MiniProtocolNum
mhNum = MiniProtocolNum
mpc,
mhDir :: MiniProtocolDir
mhDir = MiniProtocolDir
md,
mhLength :: Word16
mhLength = Int64 -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> Word16) -> Int64 -> Word16
forall a b. (a -> b) -> a -> b
$ ByteString -> Int64
BL.length ByteString
blob
},
msBlob :: ByteString
msBlob = ByteString
blob
}
m Time -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Time -> m ()) -> m Time -> m ()
forall a b. (a -> b) -> a -> b
$ TimeoutFn m -> MuxSDU -> m Time
write TimeoutFn m
timeout MuxSDU
sdu