{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A 'MuxBearer' backed by a pair of bounded STM queues instead of a real
-- transport: one queue is written to, the other is read from.
module Network.Mux.Bearer.Queues
  ( queuesAsMuxBearer
  ) where

import qualified Data.ByteString.Lazy as BL

import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Monad.Class.MonadTime
import           Control.Tracer

import qualified Network.Mux as Mx
import qualified Network.Mux.Codec as Mx
import           Network.Mux.Time as Mx
import qualified Network.Mux.Timeout as Mx
import           Network.Mux.Types (MuxBearer, SDUSize)
import qualified Network.Mux.Types as Mx

-- | Build a 'MuxBearer' on top of two 'TBQueue's.
--
-- Every queue element is handled as one complete SDU: 'Mx.write' pushes the
-- result of a single 'Mx.encodeMuxSDU' call, and 'Mx.read' pops a single
-- element and splits it into an 8-byte header and the remaining payload.
--
-- The 'Mx.TimeoutFn' handed to 'Mx.read' and 'Mx.write' is ignored; both
-- operations simply block on the respective queue inside 'atomically'.
queuesAsMuxBearer
  :: forall m.
     ( MonadSTM   m
     , MonadTime  m
     , MonadThrow m
     )
  => Tracer m Mx.MuxTrace
  -- ^ tracer that receives the send / receive events
  -> TBQueue m BL.ByteString
  -- ^ queue that encoded outgoing SDUs are written to
  -> TBQueue m BL.ByteString
  -- ^ queue that incoming encoded SDUs are read from
  -> SDUSize
  -- ^ stored unchanged in the bearer's 'Mx.sduSize' field
  -> MuxBearer m
queuesAsMuxBearer tracer writeQueue readQueue sduSize =
    Mx.MuxBearer
      { Mx.read    = readMux
      , Mx.write   = writeMux
      , Mx.sduSize = sduSize
      }
  where
    -- | Pop one element from @readQueue@ and decode it.
    --
    -- Returns the decoded SDU (with its blob set to everything after the
    -- header bytes) paired with the monotonic time taken right after the
    -- header was decoded.  A decoding failure is rethrown with 'throwIO'.
    readMux :: Mx.TimeoutFn m -> m (Mx.MuxSDU, Time)
    readMux _ = do
      traceWith tracer Mx.MuxTraceRecvHeaderStart
      buf <- atomically $ readTBQueue readQueue
      -- Only the first 8 bytes are fed to the header decoder; the rest is
      -- taken verbatim as the payload.
      -- NOTE(review): the payload length is not compared against anything in
      -- the decoded header here - confirm that callers do not rely on it.
      let (hbuf, payload) = BL.splitAt 8 buf
      case Mx.decodeMuxSDU hbuf of
        Left e -> throwIO e
        Right header -> do
          let hdr = Mx.msHeader header
          traceWith tracer $ Mx.MuxTraceRecvHeaderEnd hdr
          ts <- getMonotonicTime
          traceWith tracer $ Mx.MuxTraceRecvDeltaQObservation hdr ts
          return (header { Mx.msBlob = payload }, ts)

    -- | Timestamp, encode and enqueue one SDU on @writeQueue@.
    --
    -- Returns the monotonic time that was sampled before encoding (the same
    -- value the SDU's timestamp is derived from).
    writeMux :: Mx.TimeoutFn m -> Mx.MuxSDU -> m Time
    writeMux _ sdu = do
      ts <- getMonotonicTime
      let ts32 = Mx.timestampMicrosecondsLow32Bits ts
          sdu' = Mx.setTimestamp sdu (Mx.RemoteClockModel ts32)
          buf  = Mx.encodeMuxSDU sdu'
      traceWith tracer $ Mx.MuxTraceSendStart (Mx.msHeader sdu')
      atomically $ writeTBQueue writeQueue buf
      traceWith tracer Mx.MuxTraceSendEnd
      return ts