Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- newMux :: MonadSTM m => MiniProtocolBundle mode -> m ( Mux mode m)
- data Mux (mode :: MuxMode ) m
- data MuxMode where
- type family HasInitiator (mode :: MuxMode ) :: Bool where ...
- type family HasResponder (mode :: MuxMode ) :: Bool where ...
- newtype MiniProtocolBundle (mode :: MuxMode ) = MiniProtocolBundle [ MiniProtocolInfo mode]
-
data
MiniProtocolInfo
(mode ::
MuxMode
) =
MiniProtocolInfo
{
- miniProtocolNum :: ! MiniProtocolNum
- miniProtocolDir :: !( MiniProtocolDirection mode)
- miniProtocolLimits :: ! MiniProtocolLimits
- newtype MiniProtocolNum = MiniProtocolNum Word16
- data MiniProtocolDirection (mode :: MuxMode ) where
-
data
MiniProtocolLimits
=
MiniProtocolLimits
{
- maximumIngressQueue :: ! Int
- runMux :: forall m mode. ( MonadAsync m, MonadCatch m, MonadFork m, MonadLabelledSTM m, MonadThrow ( STM m), MonadTime m, MonadTimer m, MonadMask m) => Tracer m MuxTrace -> Mux mode m -> MuxBearer m -> m ()
- data MuxBearer m
- runMiniProtocol :: forall mode m a. ( MonadSTM m, MonadThrow m, MonadThrow ( STM m)) => Mux mode m -> MiniProtocolNum -> MiniProtocolDirection mode -> StartOnDemandOrEagerly -> ( Channel m -> m (a, Maybe ByteString )) -> m ( STM m ( Either SomeException a))
- data StartOnDemandOrEagerly
- stopMux :: MonadSTM m => Mux mode m -> m ()
- miniProtocolStateMap :: MonadSTM m => Mux mode m -> Map ( MiniProtocolNum , MiniProtocolDir ) ( STM m MiniProtocolStatus )
- muxStopped :: MonadSTM m => Mux mode m -> STM m ( Maybe SomeException )
-
data
MuxError
=
MuxError
{
- errorType :: ! MuxErrorType
- errorMsg :: ! String
- data MuxErrorType
- traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m ()
- data MuxBearerState
-
data
MuxTrace
- = MuxTraceRecvHeaderStart
- | MuxTraceRecvHeaderEnd MuxSDUHeader
- | MuxTraceRecvDeltaQObservation MuxSDUHeader Time
- | MuxTraceRecvDeltaQSample Double Int Int Double Double Double Double String
- | MuxTraceRecvStart Int
- | MuxTraceRecvEnd Int
- | MuxTraceSendStart MuxSDUHeader
- | MuxTraceSendEnd
- | MuxTraceState MuxBearerState
- | MuxTraceCleanExit MiniProtocolNum MiniProtocolDir
- | MuxTraceExceptionExit MiniProtocolNum MiniProtocolDir SomeException
- | MuxTraceChannelRecvStart MiniProtocolNum
- | MuxTraceChannelRecvEnd MiniProtocolNum Int
- | MuxTraceChannelSendStart MiniProtocolNum Int
- | MuxTraceChannelSendEnd MiniProtocolNum
- | MuxTraceHandshakeStart
- | MuxTraceHandshakeClientEnd DiffTime
- | MuxTraceHandshakeServerEnd
- | forall e. Exception e => MuxTraceHandshakeClientError e DiffTime
- | forall e. Exception e => MuxTraceHandshakeServerError e
- | MuxTraceSDUReadTimeoutException
- | MuxTraceSDUWriteTimeoutException
- | MuxTraceStartEagerly MiniProtocolNum MiniProtocolDir
- | MuxTraceStartOnDemand MiniProtocolNum MiniProtocolDir
- | MuxTraceStartedOnDemand MiniProtocolNum MiniProtocolDir
- | MuxTraceTerminating MiniProtocolNum MiniProtocolDir
- | MuxTraceShutdown
- | MuxTraceTCPInfo StructTCPInfo Word16
- data WithMuxBearer peerid a = WithMuxBearer { }
Defining
Mux
protocol bundles
type family HasInitiator (mode :: MuxMode ) :: Bool where ... Source #
type family HasResponder (mode :: MuxMode ) :: Bool where ... Source #
newtype MiniProtocolBundle (mode :: MuxMode ) Source #
Application run by mux layer.
- enumeration of client application, e.g. a wallet application communicating with a node using ChainSync and TxSubmission protocols; this only requires to run client side of each protocol.
- enumeration of server applications: this application type is mostly useful tests.
- enumeration of both client and server applications, e.g. a full node serving downstream peers using server side of each protocol and getting updates from upstream peers using client side of each of the protocols.
MiniProtocolBundle [ MiniProtocolInfo mode] |
data MiniProtocolInfo (mode :: MuxMode ) Source #
newtype MiniProtocolNum Source #
The wire format includes the protocol numbers, and it's vital that these
are stable. They are not necessarily dense however, as new ones are added
and some old ones retired. So we use a dedicated class for this rather than
reusing
Enum
. This also covers unrecognised protocol numbers on the
decoding side.
Instances
data MiniProtocolDirection (mode :: MuxMode ) where Source #
Instances
Eq ( MiniProtocolDirection mode) Source # | |
Defined in Network.Mux.Types (==) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # (/=) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # |
|
Ord ( MiniProtocolDirection mode) Source # | |
Defined in Network.Mux.Types compare :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Ordering Source # (<) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # (<=) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # (>) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # (>=) :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> Bool Source # max :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> MiniProtocolDirection mode Source # min :: MiniProtocolDirection mode -> MiniProtocolDirection mode -> MiniProtocolDirection mode Source # |
data MiniProtocolLimits Source #
Per Miniprotocol limits
MiniProtocolLimits | |
|
Running the Mux
runMux :: forall m mode. ( MonadAsync m, MonadCatch m, MonadFork m, MonadLabelledSTM m, MonadThrow ( STM m), MonadTime m, MonadTimer m, MonadMask m) => Tracer m MuxTrace -> Mux mode m -> MuxBearer m -> m () Source #
runMux starts a mux bearer for the specified protocols corresponding to one of the provided Versions.
Isometric flow control: analysis of head-of-line blocking of the ingress side of the multiplexer
For each mini-protocol (enumerated by
ptcl
), mux will create two
channels. One for initiator and one for the responder. Each channel will use
a single
Wanton
. When it is filled, it is put in a common queue
tsrQueue
. This means that the queue is bound by
2 * |ptcl|
. Every side
of a mini-protocol is served by a single
Wanton
: when an application sends
data, the channel will try to put it into the
Wanton
(which might block).
Wanton
s are taken from the
tsrQueue
queue by one of mux threads. This
eliminates head of line blocking: each mini-protocol thread can block on
putting more bytes into its
Wanton
, but it cannot block the other
mini-protocols or the thread that is reading the
tsrQueue
queue. This is
ensured since the
muxChannel
will put only a non-empty
Wanton
to the
tsrQueue
queue, and on such wantons the queue is never blocked. This means
that the only way the queue can block is when its empty, which means that
none of the mini-protocols wanted to send. The egress part will read
a
Wanton
, take a fixed amount of bytes encode them in as an
MuxSDU
; if
there are leftovers it will put them back in the
Wanton
and place it at the
end of the queue (reading and writing to it will happen in a single STM
transaction which assures that the order of requests from a mini-protocol is
preserved.
Properties:
-
at any given time the
tsrQueue
contains at most oneTranslocationServiceRequest
from a given mini-protocol of the givenMiniProtocolDir
, thus the queue contains at most2 * |ptcl|
translocation requests. -
at any given time each
TranslocationServiceRequest
contains a non-emptyWanton
Low level access to underlying socket or pipe. There are three smart constructors:
-
socketAsMuxBearer
-
pipeAsMuxBearer
-
Test.Mux.queuesAsMuxBearer
runMiniProtocol :: forall mode m a. ( MonadSTM m, MonadThrow m, MonadThrow ( STM m)) => Mux mode m -> MiniProtocolNum -> MiniProtocolDirection mode -> StartOnDemandOrEagerly -> ( Channel m -> m (a, Maybe ByteString )) -> m ( STM m ( Either SomeException a)) Source #
Arrange to run a protocol thread (for a particular
MiniProtocolNum
and
MiniProtocolDirection
) to interact on this protocol's
Channel
.
The protocol thread can either be started eagerly or on-demand:
-
With
StartEagerly
, the thread is started promptly. This is appropriate for mini-protocols where the opening message may be sent by this thread. -
With
StartOnDemand
, the thread is not started until the first data is received for this mini-protocol. This is appropriate for mini-protocols where the opening message is sent by the remote peer.
The result is a STM action to block and wait on the protocol completion. It is safe to call this completion action multiple times: it will always return the same result once the protocol thread completes. In case the Mux has stopped, either due to an exception or because of a call to muxStop a `Left MuxError` will be returned from the STM action.
It is an error to start a new protocol thread while one is still running,
for the same
MiniProtocolNum
and
MiniProtocolDirection
. This can easily be
avoided by using the STM completion action to wait for the previous one to
finish.
It is safe to ask to start a protocol thread before
runMux
. In this case
the protocol thread will not actually start until
runMux
is called,
irrespective of the
StartOnDemandOrEagerly
value.
data StartOnDemandOrEagerly Source #
Instances
Eq StartOnDemandOrEagerly Source # | |
Defined in Network.Mux |
stopMux :: MonadSTM m => Mux mode m -> m () Source #
Shut down the mux. This will cause
runMux
to return. It does not
wait for any protocol threads to finish, so you should do that first if
necessary.
Monitoring
miniProtocolStateMap :: MonadSTM m => Mux mode m -> Map ( MiniProtocolNum , MiniProtocolDir ) ( STM m MiniProtocolStatus ) Source #
muxStopped :: MonadSTM m => Mux mode m -> STM m ( Maybe SomeException ) Source #
Await until mux stopped.
Errors
Error type used in across the mux layer.
MuxError | |
|
Instances
Show MuxError Source # | |
Generic MuxError Source # | |
Exception MuxError Source # | |
Defined in Network.Mux.Trace toException :: MuxError -> SomeException Source # fromException :: SomeException -> Maybe MuxError Source # displayException :: MuxError -> String Source # |
|
type Rep MuxError Source # | |
Defined in Network.Mux.Trace
type
Rep
MuxError
=
D1
('
MetaData
"MuxError" "Network.Mux.Trace" "network-mux-0.1.0.1-7ZKx91o48G8EWFMuCmTsQc" '
False
) (
C1
('
MetaCons
"MuxError" '
PrefixI
'
True
) (
S1
('
MetaSel
('
Just
"errorType") '
NoSourceUnpackedness
'
SourceStrict
'
DecidedStrict
) (
Rec0
MuxErrorType
)
:*:
S1
('
MetaSel
('
Just
"errorMsg") '
NoSourceUnpackedness
'
SourceStrict
'
DecidedStrict
) (
Rec0
String
)))
|
data MuxErrorType Source #
Enumeration of error conditions.
MuxUnknownMiniProtocol |
returned by
|
MuxDecodeError |
return by
|
MuxBearerClosed |
thrown by
|
MuxIngressQueueOverRun |
thrown by
|
MuxInitiatorOnly |
thrown when data arrives on a responder channel when the
mux was set up as an
|
MuxIOException IOException |
|
MuxSDUReadTimeout |
thrown when reading of a single SDU takes too long |
MuxSDUWriteTimeout |
thrown when writing a single SDU takes too long |
MuxShutdown !( Maybe MuxErrorType ) |
Result of runMiniProtocol's completionAction in case of an error or mux being closed while a mini-protocol was still running, this is not a clean exit. |
MuxCleanShutdown |
Mux stopped by
|
Instances
Eq MuxErrorType Source # | |
Defined in Network.Mux.Trace (==) :: MuxErrorType -> MuxErrorType -> Bool Source # (/=) :: MuxErrorType -> MuxErrorType -> Bool Source # |
|
Show MuxErrorType Source # | |
Defined in Network.Mux.Trace |
Tracing
traceMuxBearerState :: Tracer m MuxTrace -> MuxBearerState -> m () Source #
data MuxBearerState Source #
Mature |
MuxBearer has successfully completed the handshake. |
Dead |
MuxBearer is dead and the underlying bearer has been closed. |
Instances
Eq MuxBearerState Source # | |
Defined in Network.Mux.Trace (==) :: MuxBearerState -> MuxBearerState -> Bool Source # (/=) :: MuxBearerState -> MuxBearerState -> Bool Source # |
|
Show MuxBearerState Source # | |
Defined in Network.Mux.Trace |
Enumeration of Mux events that can be traced.
data WithMuxBearer peerid a Source #
Type used for tracing mux events.
Instances
( Show peerid, Show a) => Show ( WithMuxBearer peerid a) Source # | |
Defined in Network.Mux.Trace |
|
Generic ( WithMuxBearer peerid a) Source # | |
Defined in Network.Mux.Trace from :: WithMuxBearer peerid a -> Rep ( WithMuxBearer peerid a) x Source # to :: Rep ( WithMuxBearer peerid a) x -> WithMuxBearer peerid a Source # |
|
type Rep ( WithMuxBearer peerid a) Source # | |
Defined in Network.Mux.Trace
type
Rep
(
WithMuxBearer
peerid a) =
D1
('
MetaData
"WithMuxBearer" "Network.Mux.Trace" "network-mux-0.1.0.1-7ZKx91o48G8EWFMuCmTsQc" '
False
) (
C1
('
MetaCons
"WithMuxBearer" '
PrefixI
'
True
) (
S1
('
MetaSel
('
Just
"wmbPeerId") '
NoSourceUnpackedness
'
SourceStrict
'
DecidedStrict
) (
Rec0
peerid)
:*:
S1
('
MetaSel
('
Just
"wmbEvent") '
NoSourceUnpackedness
'
SourceStrict
'
DecidedStrict
) (
Rec0
a)))
|