{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
module Ouroboros.Network.BlockFetch.Client
(
blockFetchClient
, BlockFetchClient
, FetchClientContext
, TraceFetchClientState
, FetchRequest (..)
, FetchClientStateVars
, BlockFetchProtocolFailure
) where
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
import qualified Data.Set as Set
import Control.Tracer (traceWith)
import Ouroboros.Network.Block
import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined
import Ouroboros.Network.Mux (ControlMessageSTM)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..))
import Ouroboros.Network.Protocol.BlockFetch.Type
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch.ClientState
(FetchClientContext (..), FetchClientPolicy (..),
FetchClientStateVars (fetchClientInFlightVar),
FetchRequest (..), FromConsensus (..),
PeerFetchInFlight (..), TraceFetchClientState (..),
acknowledgeFetchRequest, completeBlockDownload,
completeFetchBatch, fetchClientCtxStateVars,
rejectedFetchBatch, startedFetchBatch)
import Ouroboros.Network.BlockFetch.DeltaQ
(PeerFetchInFlightLimits (..), PeerGSV (..))
import Ouroboros.Network.PeerSelection.PeerMetric.Type
(FetchedMetricsTracer)
data BlockFetchProtocolFailure =
BlockFetchProtocolFailureTooFewBlocks
| BlockFetchProtocolFailureTooManyBlocks
| BlockFetchProtocolFailureWrongBlock
| BlockFetchProtocolFailureInvalidBody
deriving (BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
(BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool)
-> (BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool)
-> Eq BlockFetchProtocolFailure
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
$c/= :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
== :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
$c== :: BlockFetchProtocolFailure -> BlockFetchProtocolFailure -> Bool
Eq, Int -> BlockFetchProtocolFailure -> ShowS
[BlockFetchProtocolFailure] -> ShowS
BlockFetchProtocolFailure -> String
(Int -> BlockFetchProtocolFailure -> ShowS)
-> (BlockFetchProtocolFailure -> String)
-> ([BlockFetchProtocolFailure] -> ShowS)
-> Show BlockFetchProtocolFailure
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BlockFetchProtocolFailure] -> ShowS
$cshowList :: [BlockFetchProtocolFailure] -> ShowS
show :: BlockFetchProtocolFailure -> String
$cshow :: BlockFetchProtocolFailure -> String
showsPrec :: Int -> BlockFetchProtocolFailure -> ShowS
$cshowsPrec :: Int -> BlockFetchProtocolFailure -> ShowS
Show)
instance Exception BlockFetchProtocolFailure
type BlockFetchClient header block m a =
FetchClientContext header block m ->
PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m a
blockFetchClient :: forall header block m.
(MonadSTM m, MonadThrow m, MonadTime m,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block)
=> NodeToNodeVersion
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> PeerPipelined (BlockFetch block (Point block)) AsClient BFIdle m ()
blockFetchClient :: NodeToNodeVersion
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
-> PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
blockFetchClient NodeToNodeVersion
_version ControlMessageSTM m
controlMessageSTM FetchedMetricsTracer m
reportFetched
FetchClientContext {
fetchClientCtxTracer :: forall header block (m :: * -> *).
FetchClientContext header block m
-> Tracer m (TraceFetchClientState header)
fetchClientCtxTracer = Tracer m (TraceFetchClientState header)
tracer,
fetchClientCtxPolicy :: forall header block (m :: * -> *).
FetchClientContext header block m
-> FetchClientPolicy header block m
fetchClientCtxPolicy = FetchClientPolicy {
header -> SizeInBytes
blockFetchSize :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> header -> SizeInBytes
blockFetchSize :: header -> SizeInBytes
blockFetchSize,
header -> block -> Bool
blockMatchesHeader :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> header -> block -> Bool
blockMatchesHeader :: header -> block -> Bool
blockMatchesHeader,
Point block -> block -> m ()
addFetchedBlock :: forall header block (m :: * -> *).
FetchClientPolicy header block m -> Point block -> block -> m ()
addFetchedBlock :: Point block -> block -> m ()
addFetchedBlock,
FromConsensus block -> STM m UTCTime
blockForgeUTCTime :: forall header block (m :: * -> *).
FetchClientPolicy header block m
-> FromConsensus block -> STM m UTCTime
blockForgeUTCTime :: FromConsensus block -> STM m UTCTime
blockForgeUTCTime
},
fetchClientCtxStateVars :: forall header block (m :: * -> *).
FetchClientContext header block m -> FetchClientStateVars m header
fetchClientCtxStateVars = FetchClientStateVars m header
stateVars
} =
PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle 'Z () m ()
-> PeerPipelined
(BlockFetch block (Point block)) 'AsClient 'BFIdle m ()
forall ps (pr :: PeerRole) (st :: ps) c (m :: * -> *) a.
PeerSender ps pr st 'Z c m a -> PeerPipelined ps pr st m a
PeerPipelined (Nat 'Z
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle 'Z () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderAwait Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero)
where
senderIdle :: forall n.
Nat n
-> PeerSender (BlockFetch block (Point block)) AsClient
BFIdle n () m ()
senderIdle :: Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderIdle (Succ Nat n
outstanding) =
Maybe
(PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ())
-> (()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
forall ps (pr :: PeerRole) (st :: ps) (n1 :: N) c (m :: * -> *) a.
Maybe (PeerSender ps pr st ('S n1) c m a)
-> (c -> PeerSender ps pr st n1 c m a)
-> PeerSender ps pr st ('S n1) c m a
SenderCollect (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
-> Maybe
(PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ())
forall a. a -> Maybe a
Just (Nat ('S n)
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderAwait (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
outstanding)))
(\()
_ -> Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderIdle Nat n
outstanding)
senderIdle Nat n
Zero = m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (n :: N) c a.
m (PeerSender ps pr st n c m a) -> PeerSender ps pr st n c m a
SenderEffect (m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall a b. (a -> b) -> a -> b
$ do
PeerFetchInFlight {
Word
peerFetchReqsInFlight :: forall header. PeerFetchInFlight header -> Word
peerFetchReqsInFlight :: Word
peerFetchReqsInFlight,
SizeInBytes
peerFetchBytesInFlight :: forall header. PeerFetchInFlight header -> SizeInBytes
peerFetchBytesInFlight :: SizeInBytes
peerFetchBytesInFlight,
Set (Point header)
peerFetchBlocksInFlight :: forall header. PeerFetchInFlight header -> Set (Point header)
peerFetchBlocksInFlight :: Set (Point header)
peerFetchBlocksInFlight
} <- STM m (PeerFetchInFlight header) -> m (PeerFetchInFlight header)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (PeerFetchInFlight header) -> m (PeerFetchInFlight header))
-> STM m (PeerFetchInFlight header) -> m (PeerFetchInFlight header)
forall a b. (a -> b) -> a -> b
$ StrictTVar m (PeerFetchInFlight header)
-> STM m (PeerFetchInFlight header)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar (FetchClientStateVars m header
-> StrictTVar m (PeerFetchInFlight header)
forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar FetchClientStateVars m header
stateVars)
Bool
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall a. HasCallStack => Bool -> a -> a
assert
( Word
peerFetchReqsInFlight Word -> Word -> Bool
forall a. Eq a => a -> a -> Bool
== Word
0 Bool -> Bool -> Bool
&&
SizeInBytes
peerFetchBytesInFlight SizeInBytes -> SizeInBytes -> Bool
forall a. Eq a => a -> a -> Bool
== SizeInBytes
0 Bool -> Bool -> Bool
&&
Set (Point header) -> Bool
forall a. Set a -> Bool
Set.null Set (Point header)
peerFetchBlocksInFlight )
(m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()))
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall a b. (a -> b) -> a -> b
$ PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderAwait Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero)
senderAwait :: forall n.
Nat n
-> PeerSender (BlockFetch block (Point block)) AsClient
BFIdle n () m ()
senderAwait :: Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderAwait Nat n
outstanding =
m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (n :: N) c a.
m (PeerSender ps pr st n c m a) -> PeerSender ps pr st n c m a
SenderEffect (m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall a b. (a -> b) -> a -> b
$ do
Maybe (FetchRequest header, PeerGSV, PeerFetchInFlightLimits)
result <-
Tracer m (TraceFetchClientState header)
-> ControlMessageSTM m
-> FetchClientStateVars m header
-> m (Maybe
(FetchRequest header, PeerGSV, PeerFetchInFlightLimits))
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> ControlMessageSTM m
-> FetchClientStateVars m header
-> m (Maybe
(FetchRequest header, PeerGSV, PeerFetchInFlightLimits))
acknowledgeFetchRequest Tracer m (TraceFetchClientState header)
tracer ControlMessageSTM m
controlMessageSTM FetchClientStateVars m header
stateVars
case Maybe (FetchRequest header, PeerGSV, PeerFetchInFlightLimits)
result of
Maybe (FetchRequest header, PeerGSV, PeerFetchInFlightLimits)
Nothing -> do
Tracer m (TraceFetchClientState header)
-> TraceFetchClientState header -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceFetchClientState header)
tracer (Int -> TraceFetchClientState header
forall header. Int -> TraceFetchClientState header
ClientTerminating (Int -> TraceFetchClientState header)
-> Int -> TraceFetchClientState header
forall a b. (a -> b) -> a -> b
$ Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
outstanding)
PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()))
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall a b. (a -> b) -> a -> b
$ Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderTerminate Nat n
outstanding
Just (FetchRequest header
request, PeerGSV
gsvs, PeerFetchInFlightLimits
inflightlimits) ->
PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()))
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall a b. (a -> b) -> a -> b
$ Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderActive Nat n
outstanding PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits
(FetchRequest header -> [AnchoredFragment header]
forall header. FetchRequest header -> [AnchoredFragment header]
fetchRequestFragments FetchRequest header
request)
senderActive :: forall n.
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender (BlockFetch block (Point block)) AsClient
BFIdle n () m ()
senderActive :: Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderActive Nat n
outstanding PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits (AnchoredFragment header
fragment:[AnchoredFragment header]
fragments) =
m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (n :: N) c a.
m (PeerSender ps pr st n c m a) -> PeerSender ps pr st n c m a
SenderEffect (m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall a b. (a -> b) -> a -> b
$ do
let range :: ChainRange (Point header)
!range :: ChainRange (Point header)
range = Bool -> ChainRange (Point header) -> ChainRange (Point header)
forall a. HasCallStack => Bool -> a -> a
assert (Bool -> Bool
not (AnchoredFragment header -> Bool
forall v a b. AnchoredSeq v a b -> Bool
AF.null AnchoredFragment header
fragment)) (ChainRange (Point header) -> ChainRange (Point header))
-> ChainRange (Point header) -> ChainRange (Point header)
forall a b. (a -> b) -> a -> b
$
Point header -> Point header -> ChainRange (Point header)
forall point. point -> point -> ChainRange point
ChainRange (header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
lower)
(header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
upper)
where
Right header
lower = AnchoredFragment header -> Either (Anchor header) header
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Either a b
AF.last AnchoredFragment header
fragment
Right header
upper = AnchoredFragment header -> Either (Anchor header) header
forall v a b. Anchorable v a b => AnchoredSeq v a b -> Either a b
AF.head AnchoredFragment header
fragment
Tracer m (TraceFetchClientState header)
-> TraceFetchClientState header -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceFetchClientState header)
tracer (AnchoredFragment header -> TraceFetchClientState header
forall header.
AnchoredFragment header -> TraceFetchClientState header
SendFetchRequest AnchoredFragment header
fragment)
PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()))
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
-> m (PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
forall a b. (a -> b) -> a -> b
$
WeHaveAgency 'AsClient 'BFIdle
-> Message (BlockFetch block (Point block)) 'BFIdle 'BFBusy
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (pr :: PeerRole) ps (st :: ps) (st' :: ps) (st'' :: ps)
(m :: * -> *) c (n :: N) a.
WeHaveAgency pr st
-> Message ps st st'
-> PeerReceiver ps pr st' st'' m c
-> PeerSender ps pr st'' ('S n) c m a
-> PeerSender ps pr st n c m a
SenderPipeline
(ClientHasAgency 'BFIdle -> WeHaveAgency 'AsClient 'BFIdle
forall ps (st :: ps).
ClientHasAgency st -> PeerHasAgency 'AsClient st
ClientAgency ClientHasAgency 'BFIdle
forall k k (block :: k) (point :: k). ClientHasAgency 'BFIdle
TokIdle)
(ChainRange (Point block)
-> Message (BlockFetch block (Point block)) 'BFIdle 'BFBusy
forall k point (block :: k).
ChainRange point
-> Message (BlockFetch block point) 'BFIdle 'BFBusy
MsgRequestRange (ChainRange (Point header) -> ChainRange (Point block)
forall a b.
(HeaderHash a ~ HeaderHash b) =>
ChainRange (Point a) -> ChainRange (Point b)
castRange ChainRange (Point header)
range))
(ChainRange (Point header)
-> AnchoredFragment header
-> PeerFetchInFlightLimits
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ()
receiverBusy ChainRange (Point header)
range AnchoredFragment header
fragment PeerFetchInFlightLimits
inflightlimits)
(Nat ('S n)
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
forall (n :: N).
Nat n
-> PeerGSV
-> PeerFetchInFlightLimits
-> [AnchoredFragment header]
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderActive (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
outstanding) PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits [AnchoredFragment header]
fragments)
senderActive Nat n
outstanding PeerGSV
_ PeerFetchInFlightLimits
_ [] = Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderIdle Nat n
outstanding
senderTerminate :: forall n.
Nat n
-> PeerSender (BlockFetch block (Point block)) AsClient
BFIdle n () m ()
senderTerminate :: Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderTerminate Nat n
Zero =
WeHaveAgency 'AsClient 'BFIdle
-> Message (BlockFetch block (Point block)) 'BFIdle 'BFDone
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFDone 'Z () m ()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle 'Z () m ()
forall (pr :: PeerRole) ps (st :: ps) (st' :: ps) c (m :: * -> *)
a.
WeHaveAgency pr st
-> Message ps st st'
-> PeerSender ps pr st' 'Z c m a
-> PeerSender ps pr st 'Z c m a
SenderYield (ClientHasAgency 'BFIdle -> WeHaveAgency 'AsClient 'BFIdle
forall ps (st :: ps).
ClientHasAgency st -> PeerHasAgency 'AsClient st
ClientAgency ClientHasAgency 'BFIdle
forall k k (block :: k) (point :: k). ClientHasAgency 'BFIdle
TokIdle)
Message (BlockFetch block (Point block)) 'BFIdle 'BFDone
forall k k (block :: k) (point :: k).
Message (BlockFetch block point) 'BFIdle 'BFDone
MsgClientDone
(NobodyHasAgency 'BFDone
-> ()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFDone 'Z () m ()
forall ps (st :: ps) a (pr :: PeerRole) c (m :: * -> *).
NobodyHasAgency st -> a -> PeerSender ps pr st 'Z c m a
SenderDone NobodyHasAgency 'BFDone
forall k k (block :: k) (point :: k). NobodyHasAgency 'BFDone
TokDone ())
senderTerminate (Succ Nat n
n) =
Maybe
(PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ())
-> (()
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ())
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ()
forall ps (pr :: PeerRole) (st :: ps) (n1 :: N) c (m :: * -> *) a.
Maybe (PeerSender ps pr st ('S n1) c m a)
-> (c -> PeerSender ps pr st n1 c m a)
-> PeerSender ps pr st ('S n1) c m a
SenderCollect Maybe
(PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle ('S n) () m ())
forall a. Maybe a
Nothing
(\()
_ -> Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
forall (n :: N).
Nat n
-> PeerSender
(BlockFetch block (Point block)) 'AsClient 'BFIdle n () m ()
senderTerminate Nat n
n)
receiverBusy :: ChainRange (Point header)
-> AnchoredFragment header
-> PeerFetchInFlightLimits
-> PeerReceiver (BlockFetch block (Point block)) AsClient
BFBusy BFIdle m ()
receiverBusy :: ChainRange (Point header)
-> AnchoredFragment header
-> PeerFetchInFlightLimits
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ()
receiverBusy ChainRange (Point header)
range AnchoredFragment header
fragment PeerFetchInFlightLimits
inflightlimits =
TheyHaveAgency 'AsClient 'BFBusy
-> (forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFBusy st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ()
forall (pr :: PeerRole) ps (st :: ps) (stdone :: ps) (m :: * -> *)
c.
TheyHaveAgency pr st
-> (forall (st' :: ps).
Message ps st st' -> PeerReceiver ps pr st' stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverAwait
(ServerHasAgency 'BFBusy -> PeerHasAgency 'AsServer 'BFBusy
forall ps (st :: ps).
ServerHasAgency st -> PeerHasAgency 'AsServer st
ServerAgency ServerHasAgency 'BFBusy
forall k k (block :: k) (point :: k). ServerHasAgency 'BFBusy
TokBusy) ((forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFBusy st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ())
-> (forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFBusy st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient 'BFBusy 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$ \Message (BlockFetch block (Point block)) 'BFBusy st'
msg ->
case Message (BlockFetch block (Point block)) 'BFBusy st'
msg of
Message (BlockFetch block (Point block)) 'BFBusy st'
MsgNoBlocks ->
m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
(MonadSTM m, HasHeader header) =>
Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> FetchClientStateVars m header
-> m ()
rejectedFetchBatch Tracer m (TraceFetchClientState header)
tracer header -> SizeInBytes
blockFetchSize PeerFetchInFlightLimits
inflightlimits
ChainRange (Point header)
range [header]
headers FetchClientStateVars m header
stateVars
PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (()
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall c ps (pr :: PeerRole) (st :: ps) (m :: * -> *).
c -> PeerReceiver ps pr st st m c
ReceiverDone ())
where
headers :: [header]
headers = AnchoredFragment header -> [header]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst AnchoredFragment header
fragment
Message (BlockFetch block (Point block)) 'BFBusy st'
MsgStartBatch ->
m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
startedFetchBatch Tracer m (TraceFetchClientState header)
tracer PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range FetchClientStateVars m header
stateVars
PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
-> m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
receiverStreaming PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range [header]
headers)
where
headers :: [header]
headers = AnchoredFragment header -> [header]
forall v a b. AnchoredSeq v a b -> [b]
AF.toOldestFirst AnchoredFragment header
fragment
receiverStreaming :: PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> PeerReceiver (BlockFetch block (Point block)) AsClient
BFStreaming BFIdle m ()
receiverStreaming :: PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
receiverStreaming PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range [header]
headers =
TheyHaveAgency 'AsClient 'BFStreaming
-> (forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFStreaming st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall (pr :: PeerRole) ps (st :: ps) (stdone :: ps) (m :: * -> *)
c.
TheyHaveAgency pr st
-> (forall (st' :: ps).
Message ps st st' -> PeerReceiver ps pr st' stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverAwait
(ServerHasAgency 'BFStreaming
-> PeerHasAgency 'AsServer 'BFStreaming
forall ps (st :: ps).
ServerHasAgency st -> PeerHasAgency 'AsServer st
ServerAgency ServerHasAgency 'BFStreaming
forall k k (block :: k) (point :: k). ServerHasAgency 'BFStreaming
TokStreaming) ((forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFStreaming st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> (forall (st' :: BlockFetch block (Point block)).
Message (BlockFetch block (Point block)) 'BFStreaming st'
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall a b. (a -> b) -> a -> b
$ \Message (BlockFetch block (Point block)) 'BFStreaming st'
msg ->
case (Message (BlockFetch block (Point block)) 'BFStreaming st'
msg, [header]
headers) of
(Message (BlockFetch block (Point block)) 'BFStreaming st'
MsgBatchDone, []) -> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall a b. (a -> b) -> a -> b
$ do
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
MonadSTM m =>
Tracer m (TraceFetchClientState header)
-> PeerFetchInFlightLimits
-> ChainRange (Point header)
-> FetchClientStateVars m header
-> m ()
completeFetchBatch Tracer m (TraceFetchClientState header)
tracer PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range FetchClientStateVars m header
stateVars
PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (()
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' st' m ()
forall c ps (pr :: PeerRole) (st :: ps) (m :: * -> *).
c -> PeerReceiver ps pr st st m c
ReceiverDone ())
(MsgBlock block, header
header:[header]
headers') -> m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
forall a b. (a -> b) -> a -> b
$ do
UTCTime
now <- m UTCTime
forall (m :: * -> *). MonadTime m => m UTCTime
getCurrentTime
Time
nowMono <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
header Point header -> Point header -> Bool
forall a. Eq a => a -> a -> Bool
== Point block -> Point header
forall b b'.
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint (block -> Point block
forall block. HasHeader block => block -> Point block
blockPoint block
block)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
BlockFetchProtocolFailure -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureWrongBlock
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (header -> block -> Bool
blockMatchesHeader header
header block
block
block) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
BlockFetchProtocolFailure -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureInvalidBody
Point block -> block -> m ()
addFetchedBlock (Point header -> Point block
forall b b'.
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint (header -> Point header
forall block. HasHeader block => block -> Point block
blockPoint header
header)) block
block
block
UTCTime
forgeTime <- STM m UTCTime -> m UTCTime
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m UTCTime -> m UTCTime) -> STM m UTCTime -> m UTCTime
forall a b. (a -> b) -> a -> b
$ FromConsensus block -> STM m UTCTime
blockForgeUTCTime (FromConsensus block -> STM m UTCTime)
-> FromConsensus block -> STM m UTCTime
forall a b. (a -> b) -> a -> b
$ block -> FromConsensus block
forall a. a -> FromConsensus a
FromConsensus block
block
let blockDelay :: NominalDiffTime
blockDelay = UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
now UTCTime
forgeTime
let hf :: HeaderFields header
hf = header -> HeaderFields header
forall b. HasHeader b => b -> HeaderFields b
getHeaderFields header
header
slotNo :: SlotNo
slotNo = HeaderFields header -> SlotNo
forall b. HeaderFields b -> SlotNo
headerFieldSlot HeaderFields header
hf
STM m () -> m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m () -> m ()) -> STM m () -> m ()
forall a b. (a -> b) -> a -> b
$ FetchedMetricsTracer m -> (SizeInBytes, SlotNo, Time) -> STM m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith FetchedMetricsTracer m
reportFetched ( header -> SizeInBytes
blockFetchSize header
header
, SlotNo
slotNo
, Time
nowMono
)
Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> header
-> NominalDiffTime
-> FetchClientStateVars m header
-> m ()
forall (m :: * -> *) header.
(MonadSTM m, HasHeader header) =>
Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> PeerFetchInFlightLimits
-> header
-> NominalDiffTime
-> FetchClientStateVars m header
-> m ()
completeBlockDownload Tracer m (TraceFetchClientState header)
tracer header -> SizeInBytes
blockFetchSize PeerFetchInFlightLimits
inflightlimits
header
header NominalDiffTime
blockDelay FetchClientStateVars m header
stateVars
PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
-> m (PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
())
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerFetchInFlightLimits
-> ChainRange (Point header)
-> [header]
-> PeerReceiver
(BlockFetch block (Point block))
'AsClient
'BFStreaming
'BFIdle
m
()
receiverStreaming PeerFetchInFlightLimits
inflightlimits ChainRange (Point header)
range [header]
headers')
(Message (BlockFetch block (Point block)) 'BFStreaming st'
MsgBatchDone, (header
_:[header]
_)) -> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$
BlockFetchProtocolFailure
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureTooFewBlocks
(MsgBlock _, []) -> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ()
forall (m :: * -> *) ps (pr :: PeerRole) (st :: ps) (stdone :: ps)
c.
m (PeerReceiver ps pr st stdone m c)
-> PeerReceiver ps pr st stdone m c
ReceiverEffect (m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
-> PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ()
forall a b. (a -> b) -> a -> b
$
BlockFetchProtocolFailure
-> m (PeerReceiver
(BlockFetch block (Point block)) 'AsClient st' 'BFIdle m ())
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO BlockFetchProtocolFailure
BlockFetchProtocolFailureTooManyBlocks
castRange :: (HeaderHash a ~ HeaderHash b)
=> ChainRange (Point a) -> ChainRange (Point b)
castRange :: ChainRange (Point a) -> ChainRange (Point b)
castRange (ChainRange Point a
l Point a
u) = Point b -> Point b -> ChainRange (Point b)
forall point. point -> point -> ChainRange point
ChainRange (Point a -> Point b
forall b b'.
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint Point a
l) (Point a -> Point b
forall b b'.
Coercible (HeaderHash b) (HeaderHash b') =>
Point b -> Point b'
castPoint Point a
u)