Safe Haskell | None |
---|---|
Language | Haskell2010 |
Let's start with the big picture...
Key:

    ┏━━━━━━━━━━━━┓     ╔═════════════╗     ┏━━━━━━━━━━━━━━┓   ╔════════════╗
    ┃ STM-based  ┃     ║active thread║     ┃state instance┃┓  ║ one thread ║╗
    ┃shared state┃     ║             ║     ┃   per peer   ┃┃  ║  per peer  ║║
    ┗━━━━━━━━━━━━┛     ╚═════════════╝     ┗━━━━━━━━━━━━━━┛┃  ╚════════════╝║
                                            ┗━━━━━━━━━━━━━━┛   ╚════════════╝

Diagram:

      ╔═════════════╗     ┏━━━━━━━━━━━━━┓
      ║ Chain sync  ║╗    ┃   Ledger    ┃
      ║  protocol   ║║◀───┨   state     ┃◀───────────╮
      ║(client side)║║    ┃             ┃            │
      ╚══════╤══════╝║    ┗━━━━━━━━━━━━━┛            │
       ╚═════╪═══════╝                               │
             ▼                                       │
      ┏━━━━━━━━━━━━━┓     ┏━━━━━━━━━━━━━┓     ╔══════╧══════╗
      ┃  Candidate  ┃     ┃   Set of    ┃     ║  Chain and  ║
      ┃  chains     ┃     ┃  downloaded ┠────▶║   ledger    ║
      ┃  (headers)  ┃     ┃   blocks    ┃     ║  validation ║
      ┗━━━━━┯━━━━━━━┛     ┗━━━━━┯━━━━━━━┛     ╚══════╤══════╝
            │                   │ ▲                  │
            │ ╭─────────────────╯ │                  │
    ░░░░░░░░▼░▼░░░░░░░░           │                  ▼
    ░░╔═════════════╗░░           │           ┏━━━━━━━━━━━━━┓     ╔═════════════╗
    ░░║   Block     ║░░           │           ┃   Current   ┃     ║ Block fetch ║╗
    ░░╢   fetch     ║◀────────────┼───────────┨    chain    ┠────▶║  protocol   ║║
    ░░║   logic     ║░░           │           ┃  (blocks)   ┃     ║(server side)║║
    ░░╚═════════════╝░░           │           ┠─────────────┨     ╚═════════════╝║
    ░░░░░░░░░▲░░░░░░░░░           │           ┃  Tentative  ┃      ╚═════════════╝
    ░░░░░░░░░▼░░░░░░░░░░░░░░░░░░░░│░░░░░░░░   ┃    chain    ┠──╮
    ░░┏━━━━━━━━━━━━━┓░░░░░╔═══════╧═════╗░░   ┃  (headers)  ┃  │  ╔═════════════╗
    ░░┃ Block fetch ┃┓░░░░║ block fetch ║╗░   ┗━━━━━━━━━━━━━┛  │  ║ Chain sync  ║╗
    ░░┃  state and  ┃┃◀──▶║  protocol   ║║░                    ╰─▶║  protocol   ║║
    ░░┃  requests   ┃┃░░░░║(client side)║║░                       ║(server side)║║
    ░░┗━━━━━━━━━━━━━┛┃░░░░╚═════════════╝║░                       ╚═════════════╝║
    ░░░┗━━━━━━━━━━━━━┛░░░░░╚═════════════╝░                        ╚═════════════╝
    ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░

Notes:
- Thread communication is via STM-based state.
- Outbound: threads update STM state.
- Inbound: threads wait on STM state changing (using retry).
- There are no queues: there is only the current state, not all change events.
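
The notes above can be sketched with plain `IO` and the STM primitives that `base` exposes through `GHC.Conc`. This is only an illustration of the pattern, not the library's code: the real implementation is written against the `MonadSTM` class, and the names `waitForChange`, `publish` and `demo` are made up for this example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import GHC.Conc (STM, TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- | Inbound side: block until the shared state differs from the last
-- value we saw, then return the new value. There is no queue: if the
-- writer updates the variable several times before we wake up, we only
-- observe the latest state.
waitForChange :: Eq a => TVar a -> a -> STM a
waitForChange var lastSeen = do
  current <- readTVar var
  if current == lastSeen then retry else pure current

-- | Outbound side: overwrite the current state.
publish :: TVar a -> a -> IO ()
publish var = atomically . writeTVar var

-- | Hypothetical demo: one reader thread waits for the state to move
-- away from 0 while the main thread publishes two updates.
demo :: IO Int
demo = do
  var  <- newTVarIO (0 :: Int)
  done <- newEmptyMVar
  _ <- forkIO $ do
    new <- atomically (waitForChange var 0)
    putMVar done new
  publish var 1
  publish var 2
  takeMVar done
```

Depending on scheduling, `demo` yields either `1` or `2`: the reader sees whatever the state is when it wakes up, which is exactly the "current state, not change events" behaviour described above.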
We consider the block fetch logic and the policy for the block fetch protocol client together as one unit of functionality. This is the shaded area in the diagram.
Looking at the diagram, we see that these two threads interact with each other and with other threads via the following shared state:
State | Interactions | Internal/External |
---|---|---|
Candidate chains (headers) | Read | External |
Current chain (blocks) | Read | External |
Set of downloaded blocks | Read & Write | External |
Block fetch requests | Read & Write | Internal |
The block fetch requests state is private between the block fetch logic and the block fetch protocol client, so it is implemented here.
The other state is managed by the consensus layer and is considered external here. So here we define interfaces for interacting with the external state. These have to be provided when instantiating the block fetch logic.
Synopsis
- blockFetchLogic :: forall peer header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadMonotonicTime m, MonadSTM m, Ord peer, Hashable peer) => Tracer m [TraceLabelPeer peer (FetchDecision [Point header])] -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) -> BlockFetchConsensusInterface peer header block m -> FetchClientRegistry peer header block m -> BlockFetchConfiguration -> m Void
- data BlockFetchConfiguration = BlockFetchConfiguration {
  - bfcMaxConcurrencyBulkSync :: !Word
  - bfcMaxConcurrencyDeadline :: !Word
  - bfcMaxRequestsInflight :: !Word
  - bfcDecisionLoopInterval :: !DiffTime
  - bfcSalt :: !Int
  }
- data BlockFetchConsensusInterface peer header block m = BlockFetchConsensusInterface {
  - readCandidateChains :: STM m (Map peer (AnchoredFragment header))
  - readCurrentChain :: STM m (AnchoredFragment header)
  - readFetchMode :: STM m FetchMode
  - readFetchedBlocks :: STM m (Point block -> Bool)
  - mkAddFetchedBlock :: WhetherReceivingTentativeBlocks -> STM m (Point block -> block -> m ())
  - readFetchedMaxSlotNo :: STM m MaxSlotNo
  - plausibleCandidateChain :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Bool
  - compareCandidateChains :: HasCallStack => AnchoredFragment header -> AnchoredFragment header -> Ordering
  - blockFetchSize :: header -> SizeInBytes
  - blockMatchesHeader :: header -> block -> Bool
  - headerForgeUTCTime :: FromConsensus header -> STM m UTCTime
  - blockForgeUTCTime :: FromConsensus block -> STM m UTCTime
  }
- type FetchDecision result = Either FetchDecline result
- data TraceFetchClientState header
  - = AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
  - | AcknowledgedFetchRequest (FetchRequest header)
  - | SendFetchRequest (AnchoredFragment header)
  - | StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
  - | CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes
  - | CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
  - | RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header)
  - | ClientTerminating Int
- data TraceLabelPeer peerid a = TraceLabelPeer peerid a
- data FetchClientRegistry peer header block m
- newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m)
- bracketFetchClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> NodeToNodeVersion -> peer -> (FetchClientContext header block m -> m a) -> m a
- bracketSyncWithFetchClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a
- bracketKeepAliveClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
- data FetchMode
- newtype FromConsensus a = FromConsensus {
  - unFromConsensus :: a
  }
- type SizeInBytes = Word32
- data WhetherReceivingTentativeBlocks
Documentation
blockFetchLogic :: forall peer header block m. (HasHeader header, HasHeader block, HeaderHash header ~ HeaderHash block, MonadDelay m, MonadMonotonicTime m, MonadSTM m, Ord peer, Hashable peer) => Tracer m [TraceLabelPeer peer (FetchDecision [Point header])] -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header)) -> BlockFetchConsensusInterface peer header block m -> FetchClientRegistry peer header block m -> BlockFetchConfiguration -> m Void
Execute the block fetch logic. It monitors the current chain and candidate chains. It decides which block bodies to fetch and manages the process of fetching them, including making alternative decisions based on timeouts and failures.
This runs forever and should be shut down using mechanisms such as async.
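
As a rough sketch of what "runs forever and is shut down from outside" means, the following uses only `base` (`forkIO`, `killThread`, `finally`) rather than the async package, so that it stays dependency-free. `decisionLoop`, `withBackground` and `demo` are hypothetical names; `decisionLoop` merely stands in for a call to `blockFetchLogic`.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Exception (finally)
import Control.Monad (forever)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)
import Data.Void (Void)

-- | Hypothetical stand-in for a loop such as 'blockFetchLogic': it never
-- returns normally, which is what the 'Void' result type expresses.
decisionLoop :: IORef Int -> IO Void
decisionLoop ticks = forever $ do
  atomicModifyIORef' ticks (\n -> (n + 1, ()))
  threadDelay 1000

-- | Run a never-ending action in the background while 'body' runs, and
-- kill it when 'body' finishes or throws. An asynchronous exception
-- arriving between the fork and 'finally' could still leak the thread;
-- a production version would close that window.
withBackground :: IO Void -> IO a -> IO a
withBackground loop body = do
  tid <- forkIO (() <$ loop)
  body `finally` killThread tid

-- | Let the loop run for about 50ms, then report whether it made progress.
demo :: IO Bool
demo = do
  ticks <- newIORef 0
  withBackground (decisionLoop ticks) (threadDelay 50000)
  n <- readIORef ticks
  pure (n > 0)
```

With the async package, `withAsync` plays the role of `withBackground` here and also takes care of the exception-safety window mentioned in the comment.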
data BlockFetchConfiguration
Configuration for FetchDecisionPolicy. Should be determined by external local node config.
BlockFetchConfiguration | |
|
data BlockFetchConsensusInterface peer header block m
The consensus layer functionality that the block fetch logic requires.
These are provided as input to the block fetch by the consensus layer.
BlockFetchConsensusInterface | |
|
Tracer types
type FetchDecision result = Either FetchDecline result
Throughout the decision making process we accumulate reasons to decline to fetch any blocks. This type is used to wrap intermediate and final results.
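
To illustrate how an `Either`-shaped decision type carries a decline reason through a pipeline, here is a minimal, self-contained sketch. `Decline` and its constructors are hypothetical stand-ins for the library's `FetchDecline`, blocks are represented by plain `Int`s, and the two checks are invented for the example.

```haskell
-- | Hypothetical stand-in for the library's 'FetchDecline' type; the
-- constructor names are illustrative only.
data Decline = AlreadyFetched | PeerBusy
  deriving (Eq, Show)

-- | Same shape as 'FetchDecision': either a reason to decline or a result.
type Decision result = Either Decline result

-- | Drop blocks we already have; decline if nothing is left to fetch.
filterFetched :: (Int -> Bool) -> [Int] -> Decision [Int]
filterFetched alreadyHave candidates =
  case filter (not . alreadyHave) candidates of
    []   -> Left AlreadyFetched
    rest -> Right rest

-- | Decline if the peer has no capacity left for further requests.
checkCapacity :: Int -> [Int] -> Decision [Int]
checkCapacity free blocks
  | free <= 0 = Left PeerBusy
  | otherwise = Right (take free blocks)

-- | Chain the steps; the first 'Left' short-circuits the remaining ones.
decide :: (Int -> Bool) -> Int -> [Int] -> Decision [Int]
decide alreadyHave free candidates =
  filterFetched alreadyHave candidates >>= checkCapacity free
```

For instance, `decide (< 3) 2 [1 .. 5]` evaluates to `Right [3,4]`, while `decide (const True) 2 [1, 2]` evaluates to `Left AlreadyFetched` without ever consulting the capacity check.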
data TraceFetchClientState header
Tracing types for the various events that change the state (i.e. FetchClientStateVars) for a block fetch client.
Note that while these are all state changes, the AddedFetchRequest occurs in the decision thread while the other state changes occur in the block fetch client threads.
AddedFetchRequest (FetchRequest header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) |
The block fetch decision thread has added a new fetch instruction consisting of one or more individual request ranges. |
AcknowledgedFetchRequest (FetchRequest header) |
Mark the point when the fetch client picks up the request added by the block fetch decision thread. Note that this event can happen fewer times than the |
SendFetchRequest (AnchoredFragment header) |
Mark the point when a fetch request for a fragment is actually sent over the wire. |
StartedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) |
Mark the start of receiving a streaming batch of blocks. This will be followed by one or more |
CompletedBlockFetch (Point header) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) NominalDiffTime SizeInBytes |
Mark the completion of receiving a single block within a streaming batch of blocks. |
CompletedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) |
Mark the successful end of receiving a streaming batch of blocks. |
RejectedFetchBatch (ChainRange (Point header)) (PeerFetchInFlight header) PeerFetchInFlightLimits (PeerFetchStatus header) |
If the other peer rejects our request then we have this event instead of |
ClientTerminating Int |
The client is terminating. Log the number of outstanding requests. |
Instances
(StandardHash header, Show header) => Show (TraceFetchClientState header) | |
Defined in Ouroboros.Network.BlockFetch.ClientState |
data TraceLabelPeer peerid a
A peer label for use in Tracers. This annotates tracer output as being associated with a given peer identifier.
TraceLabelPeer peerid a |
Instances
Functor (TraceLabelPeer peerid) | |
Defined in Network.Mux.Trace. Methods: fmap :: (a -> b) -> TraceLabelPeer peerid a -> TraceLabelPeer peerid b; (<$) :: a -> TraceLabelPeer peerid b -> TraceLabelPeer peerid a |
(Eq peerid, Eq a) => Eq (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace. Methods: (==) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool; (/=) :: TraceLabelPeer peerid a -> TraceLabelPeer peerid a -> Bool |
(Show peerid, Show a) => Show (TraceLabelPeer peerid a) | |
Defined in Network.Mux.Trace |
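
A small sketch of how the `Functor` instance is typically useful: the payload of a labelled trace event can be transformed while the peer label is left untouched. The type is redefined locally so that the example is self-contained; real code would import `TraceLabelPeer` from the library. `renderEvent` and `example` are hypothetical names.

```haskell
-- | Local copy of the type's shape, for illustration only.
data TraceLabelPeer peerid a = TraceLabelPeer peerid a
  deriving (Eq, Show)

instance Functor (TraceLabelPeer peerid) where
  fmap f (TraceLabelPeer p a) = TraceLabelPeer p (f a)

-- | Render a labelled event as a log line (hypothetical helper).
renderEvent :: (Show peerid, Show a) => TraceLabelPeer peerid a -> String
renderEvent (TraceLabelPeer p a) = "[" ++ show p ++ "] " ++ show a

-- | 'fmap' changes the payload and keeps the peer label.
example :: String
example = renderEvent (fmap length (TraceLabelPeer (7 :: Int) "block"))
```

Here `example` is the string `[7] 5`: the label `7` is preserved and the payload `"block"` has been replaced by its length.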
The FetchClientRegistry
data FetchClientRegistry peer header block m
A registry for the threads that are executing the client side of the BlockFetch protocol to communicate with our peers.
The registry contains the shared variables we use to communicate with these threads, both to track their status and to provide instructions.
The threads add/remove themselves to/from this registry when they start up and shut down.
newFetchClientRegistry :: MonadSTM m => m (FetchClientRegistry peer header block m)
bracketFetchClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> NodeToNodeVersion -> peer -> (FetchClientContext header block m -> m a) -> m a
This is needed to start a block fetch client. It provides the required FetchClientContext. It registers and unregisters the fetch client on start and end.
It also manages synchronisation with the corresponding chain sync client.
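
The register-on-start, unregister-on-end behaviour can be pictured with a much simplified registry built from `base` alone. This is not the library's implementation: the real bracket also hands a `FetchClientContext` to the client and synchronises with the chain sync client, both of which are omitted here. `Registry`, `newRegistry`, `withRegisteredClient` and `registeredPeers` are hypothetical names.

```haskell
import Control.Exception (bracket_)
import Data.List (delete)
import GHC.Conc (TVar, atomically, newTVarIO, readTVar, readTVarIO, writeTVar)

-- | Hypothetical, much simplified registry: just the list of peers that
-- currently have a running client.
newtype Registry peer = Registry (TVar [peer])

newRegistry :: IO (Registry peer)
newRegistry = Registry <$> newTVarIO []

-- | Register the peer, run the client, and always unregister afterwards,
-- even if the client throws an exception.
withRegisteredClient :: Eq peer => Registry peer -> peer -> IO a -> IO a
withRegisteredClient (Registry var) peer =
  bracket_ register unregister
  where
    register   = atomically $ readTVar var >>= writeTVar var . (peer :)
    unregister = atomically $ readTVar var >>= writeTVar var . delete peer

-- | Snapshot of the peers that are registered right now.
registeredPeers :: Registry peer -> IO [peer]
registeredPeers (Registry var) = readTVarIO var
```

Calling `registeredPeers` inside the bracket shows the peer, and calling it again after the bracket has exited shows an empty list.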
bracketSyncWithFetchClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, Ord peer) => FetchClientRegistry peer header block m -> peer -> m a -> m a
The block fetch and chain sync clients for each peer need to synchronise their startup and shutdown. This bracket operation provides that synchronisation for the chain sync client.
This must be used for the chain sync client outside of its own state registration and deregistration.
bracketKeepAliveClient :: forall m a peer header block. (MonadThrow m, MonadSTM m, MonadFork m, MonadMask m, Ord peer) => FetchClientRegistry peer header block m -> peer -> (StrictTVar m (Map peer PeerGSV) -> m a) -> m a
Re-export types used by BlockFetchConsensusInterface
data FetchMode
FetchModeBulkSync |
Use this mode when we are catching up on the chain but are still well behind. In this mode the fetch logic will optimise for throughput rather than latency. |
FetchModeDeadline |
Use this mode for block-producing nodes that have a known deadline to produce a block and need to get the best chain before that. In this mode the fetch logic will optimise for picking the best chain within the given deadline. |
newtype FromConsensus a
A new type used to emphasize the precondition of headerForgeUTCTime and blockForgeUTCTime at each call site.
At time of writing, the a is either a header or a block. The headers are literally from Consensus (i.e. provided by ChainSync). Blocks, on the other hand, are indirectly from Consensus: they were fetched only because we favored the corresponding header that Consensus provided.
NOTE: We define it here so that it can be used consistently throughout the implementation; defining it only in BlockFetchConsensusInterface would be too late.
Instances
Functor FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ClientState. Methods: fmap :: (a -> b) -> FromConsensus a -> FromConsensus b; (<$) :: a -> FromConsensus b -> FromConsensus a |
Applicative FromConsensus | |
Defined in Ouroboros.Network.BlockFetch.ClientState. Methods: pure :: a -> FromConsensus a; (<*>) :: FromConsensus (a -> b) -> FromConsensus a -> FromConsensus b; liftA2 :: (a -> b -> c) -> FromConsensus a -> FromConsensus b -> FromConsensus c; (*>) :: FromConsensus a -> FromConsensus b -> FromConsensus b; (<*) :: FromConsensus a -> FromConsensus b -> FromConsensus a |
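
The following sketch shows the general idea of a wrapper newtype that makes a precondition visible at each call site. The newtype is redefined locally for self-containment; `Header`, `forgeTimeSeconds` and the 20-seconds-per-slot arithmetic are invented for illustration and say nothing about how the real `headerForgeUTCTime` computes its result.

```haskell
-- | Local copy of the newtype's shape, for illustration only.
newtype FromConsensus a = FromConsensus { unFromConsensus :: a }

instance Functor FromConsensus where
  fmap f (FromConsensus a) = FromConsensus (f a)

-- | Hypothetical header type carrying just a slot number.
newtype Header = Header { headerSlot :: Int }

-- | Hypothetical stand-in for 'headerForgeUTCTime'. Because the argument
-- must be wrapped in 'FromConsensus', every caller has to state
-- explicitly that the header came from Consensus.
forgeTimeSeconds :: FromConsensus Header -> Int
forgeTimeSeconds (FromConsensus h) = 20 * headerSlot h

-- | The wrapping happens at the call site, where the precondition is
-- easiest to review.
example :: Int
example = forgeTimeSeconds (FromConsensus (Header 3))
```

A caller holding a bare `Header` cannot pass it to `forgeTimeSeconds` by accident; the type checker requires the explicit `FromConsensus` wrapper first.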
type SizeInBytes = Word32
data WhetherReceivingTentativeBlocks
Whether the block fetch peer is sending tentative blocks, which are understood to possibly be invalid.