{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Mux.Bearer.AttenuatedChannel
( AttenuatedChannel (..)
, Size
, SuccessOrFailure (..)
, Attenuation (..)
, newConnectedAttenuatedChannelPair
, attenuationChannelAsMuxBearer
, AttenuatedChannelTrace (..)
, resourceVanishedIOError
) where
import Prelude hiding (read)
import Control.Monad (when)
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Control.Tracer (Tracer, traceWith)
import GHC.IO.Exception
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Network.Mux.Codec
import Network.Mux.Time
import Network.Mux.Timeout
import Network.Mux.Trace
import Network.Mux.Types
data Message =
MsgClose
| MsgBytes BL.ByteString
deriving Message -> Message -> Bool
(Message -> Message -> Bool)
-> (Message -> Message -> Bool) -> Eq Message
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Message -> Message -> Bool
$c/= :: Message -> Message -> Bool
== :: Message -> Message -> Bool
$c== :: Message -> Message -> Bool
Eq
data QueueChannel m = QueueChannel {
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcRead :: StrictTVar m (Maybe (TQueue m Message)),
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcWrite :: StrictTVar m (Maybe (TQueue m Message))
}
readQueueChannel :: ( MonadSTM m
, MonadThrow (STM m)
)
=> QueueChannel m -> m Message
readQueueChannel :: QueueChannel m -> m Message
readQueueChannel QueueChannel { StrictTVar m (Maybe (TQueue m Message))
qcRead :: StrictTVar m (Maybe (TQueue m Message))
qcRead :: forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcRead } =
STM m Message -> m Message
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Message -> m Message) -> STM m Message -> m Message
forall a b. (a -> b) -> a -> b
$ do
Maybe Message
a <- StrictTVar m (Maybe (TQueue m Message))
-> STM m (Maybe (TQueue m Message))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Maybe (TQueue m Message))
qcRead STM m (Maybe (TQueue m Message))
-> (Maybe (TQueue m Message) -> STM m (Maybe Message))
-> STM m (Maybe Message)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (TQueue m Message -> STM m Message)
-> Maybe (TQueue m Message) -> STM m (Maybe Message)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse TQueue m Message -> STM m Message
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> STM m a
readTQueue
case Maybe Message
a of
Maybe Message
Nothing -> IOError -> STM m Message
forall (m :: * -> *) e a.
(MonadSTM m, MonadThrow (STM m), Exception e) =>
e -> STM m a
throwSTM (String -> String -> IOError
resourceVanishedIOError
String
"AttenuatedChannel.readQueueChannel"
String
"channel vanished")
Just msg :: Message
msg@Message
MsgClose -> StrictTVar m (Maybe (TQueue m Message))
-> Maybe (TQueue m Message) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (Maybe (TQueue m Message))
qcRead Maybe (TQueue m Message)
forall a. Maybe a
Nothing
STM m () -> STM m Message -> STM m Message
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Message -> STM m Message
forall (m :: * -> *) a. Monad m => a -> m a
return Message
msg
Just Message
msg -> Message -> STM m Message
forall (m :: * -> *) a. Monad m => a -> m a
return Message
msg
writeQueueChannel :: MonadSTM m
=> QueueChannel m -> Message -> m Bool
writeQueueChannel :: QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel { StrictTVar m (Maybe (TQueue m Message))
qcWrite :: StrictTVar m (Maybe (TQueue m Message))
qcWrite :: forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcWrite, StrictTVar m (Maybe (TQueue m Message))
qcRead :: StrictTVar m (Maybe (TQueue m Message))
qcRead :: forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcRead } Message
msg =
STM m Bool -> m Bool
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
Maybe (TQueue m Message)
mq <- StrictTVar m (Maybe (TQueue m Message))
-> STM m (Maybe (TQueue m Message))
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (Maybe (TQueue m Message))
qcWrite
case Maybe (TQueue m Message)
mq of
Maybe (TQueue m Message)
Nothing -> do
StrictTVar m (Maybe (TQueue m Message))
-> Maybe (TQueue m Message) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (Maybe (TQueue m Message))
qcRead Maybe (TQueue m Message)
forall a. Maybe a
Nothing
Bool -> STM m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Just TQueue m Message
q -> do
case Message
msg of
Message
MsgClose -> StrictTVar m (Maybe (TQueue m Message))
-> Maybe (TQueue m Message) -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (Maybe (TQueue m Message))
qcWrite Maybe (TQueue m Message)
forall a. Maybe a
Nothing
Message
_ -> TQueue m Message -> Message -> STM m ()
forall (m :: * -> *) a. MonadSTM m => TQueue m a -> a -> STM m ()
writeTQueue TQueue m Message
q Message
msg
Bool -> STM m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
newConnectedQueueChannelPair :: ( MonadSTM m
, MonadLabelledSTM m
)
=> STM m ( QueueChannel m
, QueueChannel m )
newConnectedQueueChannelPair :: STM m (QueueChannel m, QueueChannel m)
newConnectedQueueChannelPair = do
TQueue m Message
read <- STM m (TQueue m Message)
forall (m :: * -> *) a. MonadSTM m => STM m (TQueue m a)
newTQueue
TQueue m Message
write <- STM m (TQueue m Message)
forall (m :: * -> *) a. MonadSTM m => STM m (TQueue m a)
newTQueue
TQueue m Message -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
TQueue m a -> String -> STM m ()
labelTQueue TQueue m Message
read String
"qc-queue-read"
TQueue m Message -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
TQueue m a -> String -> STM m ()
labelTQueue TQueue m Message
write String
"qc-queue-write"
QueueChannel m
q <- StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m
forall (m :: * -> *).
StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m
QueueChannel (StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
-> STM
m (StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (TQueue m Message)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar (TQueue m Message -> Maybe (TQueue m Message)
forall a. a -> Maybe a
Just TQueue m Message
read)
STM m (StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
-> STM m (QueueChannel m)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe (TQueue m Message)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar (TQueue m Message -> Maybe (TQueue m Message)
forall a. a -> Maybe a
Just TQueue m Message
write)
StrictTVar m (Maybe (TQueue m Message)) -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
StrictTVar m a -> String -> STM m ()
labelTVar (QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcRead QueueChannel m
q) String
"qc-read"
StrictTVar m (Maybe (TQueue m Message)) -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
StrictTVar m a -> String -> STM m ()
labelTVar (QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcWrite QueueChannel m
q) String
"qc-write"
QueueChannel m
q' <- StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m
forall (m :: * -> *).
StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m
QueueChannel (StrictTVar m (Maybe (TQueue m Message))
-> StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
-> STM
m (StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (TQueue m Message)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar (TQueue m Message -> Maybe (TQueue m Message)
forall a. a -> Maybe a
Just TQueue m Message
write)
STM m (StrictTVar m (Maybe (TQueue m Message)) -> QueueChannel m)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
-> STM m (QueueChannel m)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe (TQueue m Message)
-> STM m (StrictTVar m (Maybe (TQueue m Message)))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar (TQueue m Message -> Maybe (TQueue m Message)
forall a. a -> Maybe a
Just TQueue m Message
read)
StrictTVar m (Maybe (TQueue m Message)) -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
StrictTVar m a -> String -> STM m ()
labelTVar (QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcRead QueueChannel m
q') String
"qc-read'"
StrictTVar m (Maybe (TQueue m Message)) -> String -> STM m ()
forall (m :: * -> *) a.
MonadLabelledSTM m =>
StrictTVar m a -> String -> STM m ()
labelTVar (QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
forall (m :: * -> *).
QueueChannel m -> StrictTVar m (Maybe (TQueue m Message))
qcWrite QueueChannel m
q') String
"qc-write'"
(QueueChannel m, QueueChannel m)
-> STM m (QueueChannel m, QueueChannel m)
forall (m :: * -> *) a. Monad m => a -> m a
return (QueueChannel m
q, QueueChannel m
q')
data AttenuatedChannel m = AttenuatedChannel {
AttenuatedChannel m -> m ByteString
acRead :: m BL.ByteString,
AttenuatedChannel m -> ByteString -> m ()
acWrite :: BL.ByteString -> m (),
AttenuatedChannel m -> m ()
acClose :: m ()
}
data SuccessOrFailure = Success | Failure
type Size = Int64
data Attenuation = Attenuation {
Attenuation -> Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation :: Time -> Size -> ( DiffTime,
SuccessOrFailure ),
Attenuation -> Maybe Int
aWriteAttenuation :: Maybe Int
}
newAttenuatedChannel :: forall m.
( MonadSTM m
, MonadTime m
, MonadTimer m
, MonadThrow m
, MonadThrow (STM m)
)
=> Tracer m AttenuatedChannelTrace
-> Attenuation
-> QueueChannel m
-> STM m (AttenuatedChannel m)
newAttenuatedChannel :: Tracer m AttenuatedChannelTrace
-> Attenuation -> QueueChannel m -> STM m (AttenuatedChannel m)
newAttenuatedChannel Tracer m AttenuatedChannelTrace
tr Attenuation { Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation :: Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation :: Attenuation -> Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation,
Maybe Int
aWriteAttenuation :: Maybe Int
aWriteAttenuation :: Attenuation -> Maybe Int
aWriteAttenuation } QueueChannel m
qc = do
StrictTVar m Int
writeCounterVar <- Int -> STM m (StrictTVar m Int)
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar Int
0
AttenuatedChannel m -> STM m (AttenuatedChannel m)
forall (m :: * -> *) a. Monad m => a -> m a
return AttenuatedChannel :: forall (m :: * -> *).
m ByteString -> (ByteString -> m ()) -> m () -> AttenuatedChannel m
AttenuatedChannel { m ByteString
acRead :: m ByteString
acRead :: m ByteString
acRead
, acWrite :: ByteString -> m ()
acWrite = StrictTVar m Int -> ByteString -> m ()
acWrite StrictTVar m Int
writeCounterVar
, m ()
acClose :: m ()
acClose :: m ()
acClose
}
where
acRead :: m BL.ByteString
acRead :: m ByteString
acRead = do
Message
msg <- QueueChannel m -> m Message
forall (m :: * -> *).
(MonadSTM m, MonadThrow (STM m)) =>
QueueChannel m -> m Message
readQueueChannel QueueChannel m
qc
Time
t <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
case Message
msg of
Message
MsgClose -> do
case Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation Time
t Size
1 of
( DiffTime
d, SuccessOrFailure
_ ) -> Tracer m AttenuatedChannelTrace -> AttenuatedChannelTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m AttenuatedChannelTrace
tr AttenuatedChannelTrace
AttChannRemoteClose
m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
d
m () -> m ByteString -> m ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MuxError -> m ByteString
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MuxErrorType -> String -> MuxError
MuxError MuxErrorType
MuxBearerClosed
String
"closed when reading data")
MsgBytes ByteString
bs ->
case Time -> Size -> (DiffTime, SuccessOrFailure)
aReadAttenuation Time
t (ByteString -> Size
BL.length ByteString
bs) of
( DiffTime
d, SuccessOrFailure
Success ) -> DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
d
m () -> m ByteString -> m ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> ByteString -> m ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
( DiffTime
d, SuccessOrFailure
Failure ) -> DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
d
m () -> m ByteString -> m ByteString
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IOError -> m ByteString
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> String -> IOError
resourceVanishedIOError
String
"AttenuatedChannel.read"
String
"read attenuation")
acWrite :: StrictTVar m Int
-> BL.ByteString
-> m ()
acWrite :: StrictTVar m Int -> ByteString -> m ()
acWrite StrictTVar m Int
writeCounterVar ByteString
bs = do
Int
wCount <- STM m Int -> m Int
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Int -> m Int) -> STM m Int -> m Int
forall a b. (a -> b) -> a -> b
$ do
StrictTVar m Int -> (Int -> Int) -> STM m ()
forall (m :: * -> *) a.
MonadSTM m =>
StrictTVar m a -> (a -> a) -> STM m ()
modifyTVar StrictTVar m Int
writeCounterVar Int -> Int
forall a. Enum a => a -> a
succ
StrictTVar m Int -> STM m Int
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m Int
writeCounterVar
case Maybe Int
aWriteAttenuation of
Just Int
limit | Int
wCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
limit
-> IOError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (IOError -> m ()) -> IOError -> m ()
forall a b. (a -> b) -> a -> b
$
String -> String -> IOError
resourceVanishedIOError
String
"AttenuatedChannel.write"
String
"write limit reached (write attenuation)"
Maybe Int
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Bool
sent <- QueueChannel m -> Message -> m Bool
forall (m :: * -> *).
MonadSTM m =>
QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel m
qc (ByteString -> Message
MsgBytes ByteString
bs)
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
sent) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IOError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (String -> String -> IOError
resourceVanishedIOError String
"AttenuatedChannel.write" String
"")
acClose :: m ()
acClose :: m ()
acClose = do
Bool
sent <- QueueChannel m -> Message -> m Bool
forall (m :: * -> *).
MonadSTM m =>
QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel m
qc Message
MsgClose
Tracer m AttenuatedChannelTrace -> AttenuatedChannelTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m AttenuatedChannelTrace
tr (Bool -> AttenuatedChannelTrace
AttChannLocalClose Bool
sent)
newConnectedAttenuatedChannelPair
:: forall m.
( MonadLabelledSTM m
, MonadTime m
, MonadTimer m
, MonadThrow m
, MonadThrow (STM m)
)
=> Tracer m AttenuatedChannelTrace
-> Tracer m AttenuatedChannelTrace
-> Attenuation
-> Attenuation
-> STM m (AttenuatedChannel m, AttenuatedChannel m)
newConnectedAttenuatedChannelPair :: Tracer m AttenuatedChannelTrace
-> Tracer m AttenuatedChannelTrace
-> Attenuation
-> Attenuation
-> STM m (AttenuatedChannel m, AttenuatedChannel m)
newConnectedAttenuatedChannelPair Tracer m AttenuatedChannelTrace
tr Tracer m AttenuatedChannelTrace
tr' Attenuation
attenuation Attenuation
attenuation' = do
(QueueChannel m
c, QueueChannel m
c') <- STM m (QueueChannel m, QueueChannel m)
forall (m :: * -> *).
(MonadSTM m, MonadLabelledSTM m) =>
STM m (QueueChannel m, QueueChannel m)
newConnectedQueueChannelPair
AttenuatedChannel m
b <- Tracer m AttenuatedChannelTrace
-> Attenuation -> QueueChannel m -> STM m (AttenuatedChannel m)
forall (m :: * -> *).
(MonadSTM m, MonadTime m, MonadTimer m, MonadThrow m,
MonadThrow (STM m)) =>
Tracer m AttenuatedChannelTrace
-> Attenuation -> QueueChannel m -> STM m (AttenuatedChannel m)
newAttenuatedChannel Tracer m AttenuatedChannelTrace
tr Attenuation
attenuation QueueChannel m
c
AttenuatedChannel m
b' <- Tracer m AttenuatedChannelTrace
-> Attenuation -> QueueChannel m -> STM m (AttenuatedChannel m)
forall (m :: * -> *).
(MonadSTM m, MonadTime m, MonadTimer m, MonadThrow m,
MonadThrow (STM m)) =>
Tracer m AttenuatedChannelTrace
-> Attenuation -> QueueChannel m -> STM m (AttenuatedChannel m)
newAttenuatedChannel Tracer m AttenuatedChannelTrace
tr' Attenuation
attenuation' QueueChannel m
c'
(AttenuatedChannel m, AttenuatedChannel m)
-> STM m (AttenuatedChannel m, AttenuatedChannel m)
forall (m :: * -> *) a. Monad m => a -> m a
return (AttenuatedChannel m
b, AttenuatedChannel m
b')
attenuationChannelAsMuxBearer :: forall m.
( MonadThrow m
, MonadMonotonicTime m
)
=> SDUSize
-> DiffTime
-> Tracer m MuxTrace
-> AttenuatedChannel m
-> MuxBearer m
attenuationChannelAsMuxBearer :: SDUSize
-> DiffTime
-> Tracer m MuxTrace
-> AttenuatedChannel m
-> MuxBearer m
attenuationChannelAsMuxBearer SDUSize
sduSize DiffTime
sduTimeout Tracer m MuxTrace
muxTracer AttenuatedChannel m
chan =
MuxBearer :: forall (m :: * -> *).
(TimeoutFn m -> MuxSDU -> m Time)
-> (TimeoutFn m -> m (MuxSDU, Time)) -> SDUSize -> MuxBearer m
MuxBearer {
read :: TimeoutFn m -> m (MuxSDU, Time)
read = TimeoutFn m -> m (MuxSDU, Time)
readMux,
write :: TimeoutFn m -> MuxSDU -> m Time
write = TimeoutFn m -> MuxSDU -> m Time
writeMux,
SDUSize
sduSize :: SDUSize
sduSize :: SDUSize
sduSize
}
where
readMux :: TimeoutFn m -> m (MuxSDU, Time)
readMux :: TimeoutFn m -> m (MuxSDU, Time)
readMux TimeoutFn m
timeoutFn = do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MuxTrace
MuxTraceRecvHeaderStart
Maybe ByteString
mbuf <- DiffTime -> m ByteString -> m (Maybe ByteString)
TimeoutFn m
timeoutFn DiffTime
sduTimeout (m ByteString -> m (Maybe ByteString))
-> m ByteString -> m (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ AttenuatedChannel m -> m ByteString
forall (m :: * -> *). AttenuatedChannel m -> m ByteString
acRead AttenuatedChannel m
chan
case Maybe ByteString
mbuf of
Maybe ByteString
Nothing -> do
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer MuxTrace
MuxTraceSDUReadTimeoutException
MuxError -> m (MuxSDU, Time)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (MuxErrorType -> String -> MuxError
MuxError MuxErrorType
MuxSDUReadTimeout String
"Mux SDU Timeout")
Just ByteString
buf -> do
let (ByteString
hbuf, ByteString
payload) = Size -> ByteString -> (ByteString, ByteString)
BL.splitAt Size
8 ByteString
buf
case ByteString -> Either MuxError MuxSDU
decodeMuxSDU ByteString
hbuf of
Left MuxError
e -> MuxError -> m (MuxSDU, Time)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO MuxError
e
Right MuxSDU
muxsdu -> do
let header :: MuxSDUHeader
header = MuxSDU -> MuxSDUHeader
msHeader MuxSDU
muxsdu
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MuxSDUHeader -> MuxTrace
MuxTraceRecvHeaderEnd MuxSDUHeader
header
Time
ts <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MuxSDUHeader -> Time -> MuxTrace
MuxTraceRecvDeltaQObservation MuxSDUHeader
header Time
ts
(MuxSDU, Time) -> m (MuxSDU, Time)
forall (m :: * -> *) a. Monad m => a -> m a
return (MuxSDU
muxsdu {msBlob :: ByteString
msBlob = ByteString
payload}, Time
ts)
writeMux :: TimeoutFn m -> MuxSDU -> m Time
writeMux :: TimeoutFn m -> MuxSDU -> m Time
writeMux TimeoutFn m
_ MuxSDU
sdu = do
Time
ts <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
let ts32 :: Word32
ts32 = Time -> Word32
timestampMicrosecondsLow32Bits Time
ts
sdu' :: MuxSDU
sdu' = MuxSDU -> RemoteClockModel -> MuxSDU
setTimestamp MuxSDU
sdu (Word32 -> RemoteClockModel
RemoteClockModel Word32
ts32)
buf :: ByteString
buf = MuxSDU -> ByteString
encodeMuxSDU MuxSDU
sdu'
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MuxSDUHeader -> MuxTrace
MuxTraceSendStart (MuxSDU -> MuxSDUHeader
msHeader MuxSDU
sdu')
AttenuatedChannel m -> ByteString -> m ()
forall (m :: * -> *). AttenuatedChannel m -> ByteString -> m ()
acWrite AttenuatedChannel m
chan ByteString
buf
Tracer m MuxTrace -> MuxTrace -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m MuxTrace
muxTracer (MuxTrace -> m ()) -> MuxTrace -> m ()
forall a b. (a -> b) -> a -> b
$ MuxTrace
MuxTraceSendEnd
Time -> m Time
forall (m :: * -> *) a. Monad m => a -> m a
return Time
ts
data AttenuatedChannelTrace =
AttChannLocalClose Bool
| AttChannRemoteClose
deriving Int -> AttenuatedChannelTrace -> ShowS
[AttenuatedChannelTrace] -> ShowS
AttenuatedChannelTrace -> String
(Int -> AttenuatedChannelTrace -> ShowS)
-> (AttenuatedChannelTrace -> String)
-> ([AttenuatedChannelTrace] -> ShowS)
-> Show AttenuatedChannelTrace
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [AttenuatedChannelTrace] -> ShowS
$cshowList :: [AttenuatedChannelTrace] -> ShowS
show :: AttenuatedChannelTrace -> String
$cshow :: AttenuatedChannelTrace -> String
showsPrec :: Int -> AttenuatedChannelTrace -> ShowS
$cshowsPrec :: Int -> AttenuatedChannelTrace -> ShowS
Show
resourceVanishedIOError :: String -> String -> IOError
resourceVanishedIOError :: String -> String -> IOError
resourceVanishedIOError String
ioe_location String
ioe_description = IOError :: Maybe Handle
-> IOErrorType
-> String
-> String
-> Maybe CInt
-> Maybe String
-> IOError
IOError
{ ioe_handle :: Maybe Handle
ioe_handle = Maybe Handle
forall a. Maybe a
Nothing
, ioe_type :: IOErrorType
ioe_type = IOErrorType
ResourceVanished
, String
ioe_location :: String
ioe_location :: String
ioe_location
, String
ioe_description :: String
ioe_description :: String
ioe_description
, ioe_errno :: Maybe CInt
ioe_errno = Maybe CInt
forall a. Maybe a
Nothing
, ioe_filename :: Maybe String
ioe_filename = Maybe String
forall a. Maybe a
Nothing
}