{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
module Ouroboros.Network.BlockFetch.ClientState
( FetchClientContext (..)
, FetchClientPolicy (..)
, FetchClientStateVars (..)
, newFetchClientStateVars
, readFetchClientState
, PeerFetchStatus (..)
, IsIdle (..)
, PeerFetchInFlight (..)
, initialPeerFetchInFlight
, FetchRequest (..)
, addNewFetchRequest
, acknowledgeFetchRequest
, startedFetchBatch
, completeBlockDownload
, completeFetchBatch
, rejectedFetchBatch
, TraceFetchClientState (..)
, TraceLabelPeer (..)
, ChainRange (..)
, FromConsensus (..)
, WhetherReceivingTentativeBlocks (..)
) where
import Data.List (foldl')
import Data.Maybe (mapMaybe)
import Data.Semigroup (Last (..))
import Data.Set (Set)
import qualified Data.Set as Set

import Control.Exception (assert)
import Control.Monad (when)
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime
import Control.Tracer (Tracer, traceWith)

import Network.Mux.Trace (TraceLabelPeer (..))

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (HasHeader, MaxSlotNo (..), Point,
    blockPoint)
import Ouroboros.Network.BlockFetch.DeltaQ
    (PeerFetchInFlightLimits (..), PeerGSV, SizeInBytes,
    calculatePeerFetchInFlightLimits)
import Ouroboros.Network.Mux (ControlMessageSTM,
    timeoutWithControlMessage)
import Ouroboros.Network.Point (withOriginToMaybe)
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..))

-- | Bundle of the three things a fetch client works with: a tracer for
-- 'TraceFetchClientState' events, the 'FetchClientPolicy', and the shared
-- 'FetchClientStateVars'.
--
data FetchClientContext header block m =
     FetchClientContext {
       -- | Tracer receiving 'TraceFetchClientState' events.
       fetchClientCtxTracer    :: Tracer m (TraceFetchClientState header),

       -- | The policy callbacks (see 'FetchClientPolicy').
       fetchClientCtxPolicy    :: FetchClientPolicy header block m,

       -- | The shared state variables (see 'FetchClientStateVars').
       fetchClientCtxStateVars :: FetchClientStateVars m header
     }

-- | Record of callbacks used by the fetch client.
--
-- NOTE(review): the name of the second field was missing from the source
-- this block was recovered from; it is restored here as
-- 'blockMatchesHeader'. Confirm against the call sites before merging.
--
data FetchClientPolicy header block m =
     FetchClientPolicy {
       -- | Size in bytes attributed to the block of the given header.
       blockFetchSize     :: header -> SizeInBytes,

       -- | Predicate relating a header and a block.
       -- Presumably checks that a received block is the one announced by
       -- the header -- TODO confirm.
       blockMatchesHeader :: header -> block -> Bool,

       -- | Action run with a block and its point.
       -- Presumably hands a downloaded block over for storage -- TODO confirm.
       addFetchedBlock    :: Point block -> block -> m (),

       -- | STM computation yielding a 'UTCTime' for a block.
       -- Presumably the time at which the block was forged -- TODO confirm.
       blockForgeUTCTime  :: FromConsensus block -> STM m UTCTime
     }

-- | Two-valued flag distinguishing whether tentative blocks are being
-- received.
--
-- NOTE(review): the precise meaning of "tentative" is not established in
-- this part of the file; see the users of this type.
--
data WhetherReceivingTentativeBlocks
  = ReceivingTentativeBlocks
  | NotReceivingTentativeBlocks

-- | The set of shared variables describing one fetch client: its status,
-- what it currently has in flight, and its pending fetch request.
--
-- A fresh set is created by 'newFetchClientStateVars'; a snapshot of the
-- first two can be taken with 'readFetchClientState'.
--
data FetchClientStateVars m header =
     FetchClientStateVars {

       -- | The current 'PeerFetchStatus'. Starts out as
       -- @'PeerFetchStatusReady' 'Set.empty' 'IsIdle'@.
       --
       fetchClientStatusVar   :: StrictTVar m (PeerFetchStatus header),

       -- | The current in-flight accounting (requests, bytes, blocks, max
       -- slot). Starts out as 'initialPeerFetchInFlight' and is updated by
       -- 'addNewFetchRequest' when a request is added.
       --
       fetchClientInFlightVar :: StrictTVar m (PeerFetchInFlight header),

       -- | The variable through which fetch requests are posted.
       -- 'addNewFetchRequest' peeks at the current content and writes the
       -- new request into it, getting back the merged request.
       --
       fetchClientRequestVar  :: TFetchRequestVar m header
     }

-- | Allocate a fresh 'FetchClientStateVars'.
--
-- The in-flight variable starts as 'initialPeerFetchInFlight', the status
-- variable as @'PeerFetchStatusReady' 'Set.empty' 'IsIdle'@, and the request
-- variable is whatever 'newTFetchRequestVar' produces.
--
newFetchClientStateVars :: MonadSTM m => STM m (FetchClientStateVars m header)
newFetchClientStateVars = do
    fetchClientInFlightVar <- newTVar initialPeerFetchInFlight
    fetchClientStatusVar   <- newTVar (PeerFetchStatusReady Set.empty IsIdle)
    fetchClientRequestVar  <- newTFetchRequestVar
    -- Field puns: each field is filled from the like-named local above.
    return FetchClientStateVars
      { fetchClientStatusVar
      , fetchClientInFlightVar
      , fetchClientRequestVar
      }

-- | Read the status and in-flight variables in a single STM transaction.
--
-- Returns the current 'PeerFetchStatus', the current 'PeerFetchInFlight',
-- and the 'FetchClientStateVars' that were passed in (unchanged), as a
-- triple.
--
readFetchClientState :: MonadSTM m
                     => FetchClientStateVars m header
                     -> STM m (PeerFetchStatus header,
                               PeerFetchInFlight header,
                               FetchClientStateVars m header)
readFetchClientState
  vars@FetchClientStateVars{fetchClientStatusVar, fetchClientInFlightVar} = do
    status   <- readTVar fetchClientStatusVar
    inflight <- readTVar fetchClientInFlightVar
    return (status, inflight, vars)

-- | The status of a fetch client, as stored in 'fetchClientStatusVar'.
--
-- NOTE(review): the descriptions of the first three constructors are
-- inferred from their names only; confirm against the code that sets them.
--
data PeerFetchStatus header =
       -- | Presumably: the client is shutting down or has shut down.
       PeerFetchStatusShutdown

       -- | Presumably: the peer is misbehaving (e.g. not responding in time).
     | PeerFetchStatusAberrant

       -- | Presumably: the peer has too much in flight to take new requests.
     | PeerFetchStatusBusy

       -- | Ready for requests. Carries a set of block points and an 'IsIdle'
       -- flag. The initial status created by 'newFetchClientStateVars' is
       -- @PeerFetchStatusReady Set.empty IsIdle@.
     | PeerFetchStatusReady (Set (Point header)) IsIdle
  deriving (Eq, Show)

-- | Two-valued flag carried by 'PeerFetchStatusReady'; a more descriptive
-- alternative to a bare 'Bool'. See 'idleIf' for the conversion from 'Bool'.
--
data IsIdle = IsIdle | IsNotIdle
  deriving (Eq, Show)

-- | Convert a 'Bool' into an 'IsIdle': 'True' becomes 'IsIdle', 'False'
-- becomes 'IsNotIdle'.
--
idleIf :: Bool -> IsIdle
idleIf True  = IsIdle
idleIf False = IsNotIdle

-- | Accounting of what is currently in flight with a peer.
--
-- The fields are adjusted together by 'addHeadersInFlight' (when requests
-- are added) and 'deleteHeaderInFlight' (when a single header is removed).
--
data PeerFetchInFlight header = PeerFetchInFlight {
       -- | Number of fetch requests in flight. 'addHeadersInFlight' counts
       -- one request per fragment of a 'FetchRequest'.
       --
       peerFetchReqsInFlight   :: !Word,

       -- | Total size in bytes of the blocks in flight, as given by the
       -- @header -> SizeInBytes@ function passed to the update functions.
       --
       peerFetchBytesInFlight  :: !SizeInBytes,

       -- | The points of the blocks in flight. Unlike the other fields this
       -- one is not strict.
       --
       peerFetchBlocksInFlight :: Set (Point header),

       -- | The maximum of the 'fetchRequestMaxSlotNo' of every request added
       -- so far; 'NoMaxSlotNo' initially. It is not lowered by
       -- 'deleteHeaderInFlight'.
       --
       peerFetchMaxSlotNo      :: !MaxSlotNo
     }
  deriving (Eq, Show)

-- | The empty in-flight state: no requests, no bytes, no blocks and no
-- maximum slot number.
--
initialPeerFetchInFlight :: PeerFetchInFlight header
initialPeerFetchInFlight =
    PeerFetchInFlight {
      peerFetchReqsInFlight   = 0,
      peerFetchBytesInFlight  = 0,
      peerFetchBlocksInFlight = Set.empty,
      peerFetchMaxSlotNo      = NoMaxSlotNo
    }

-- | Update the in-flight accounting for a newly added fetch request.
--
-- Arguments, in order:
--
-- * the function giving the size in bytes for a header;
-- * the request that was pending before (if any): its fragment count is
--   subtracted from the request counter;
-- * the request being added: its headers contribute to the bytes, the block
--   points and the maximum slot number;
-- * the merged request: its fragment count is added to the request counter;
-- * the in-flight state to update.
--
-- Pre-condition (checked with 'assert'): none of the headers of the added
-- request is already in 'peerFetchBlocksInFlight'.
--
addHeadersInFlight :: HasHeader header
                   => (header -> SizeInBytes)
                   -> Maybe (FetchRequest header)
                   -> FetchRequest header
                   -> FetchRequest header
                   -> PeerFetchInFlight header
                   -> PeerFetchInFlight header
addHeadersInFlight blockFetchSize oldReq addedReq mergedReq inflight =
    assert (all isNotInFlight addedHeaders) $
    PeerFetchInFlight {
      -- Add the merged count first and subtract the old count afterwards:
      -- the counter is a 'Word', so the order of operations is kept as is.
      peerFetchReqsInFlight   = peerFetchReqsInFlight inflight
                              + numFetchReqs mergedReq
                              - maybe 0 numFetchReqs oldReq,

      peerFetchBytesInFlight  = peerFetchBytesInFlight inflight
                              + sum (map blockFetchSize addedHeaders),

      peerFetchBlocksInFlight = peerFetchBlocksInFlight inflight
                    `Set.union` Set.fromList (map blockPoint addedHeaders),

      peerFetchMaxSlotNo      = peerFetchMaxSlotNo inflight
                          `max` fetchRequestMaxSlotNo addedReq
    }
  where
    -- All headers of the added request, fragment by fragment, oldest first.
    addedHeaders =
      concatMap AF.toOldestFirst (fetchRequestFragments addedReq)

    isNotInFlight header =
      blockPoint header `Set.notMember` peerFetchBlocksInFlight inflight

    -- One request per fragment.
    numFetchReqs :: FetchRequest header -> Word
    numFetchReqs = fromIntegral . length . fetchRequestFragments

-- | Remove a single header from the in-flight accounting: its size is
-- subtracted from 'peerFetchBytesInFlight' and its point is deleted from
-- 'peerFetchBlocksInFlight'. The request counter and the maximum slot number
-- are left untouched.
--
-- Pre-conditions (checked with 'assert'): the bytes in flight are at least
-- the size of this header's block, and the header's point is currently in
-- 'peerFetchBlocksInFlight'.
--
deleteHeaderInFlight :: HasHeader header
                     => (header -> SizeInBytes)
                     -> header
                     -> PeerFetchInFlight header
                     -> PeerFetchInFlight header
deleteHeaderInFlight blockFetchSize header inflight =
    assert (peerFetchBytesInFlight inflight >= blockFetchSize header) $
    assert (blockPoint header `Set.member` peerFetchBlocksInFlight inflight) $
    inflight {
      peerFetchBytesInFlight  = peerFetchBytesInFlight inflight
                              - blockFetchSize header,

      peerFetchBlocksInFlight = Set.delete (blockPoint header)
                                           (peerFetchBlocksInFlight inflight)
    }

-- | Remove several headers from the in-flight accounting by applying
-- 'deleteHeaderInFlight' to each of them in list order, using a strict left
-- fold.
--
deleteHeadersInFlight :: HasHeader header
                      => (header -> SizeInBytes)
                      -> [header]
                      -> PeerFetchInFlight header
                      -> PeerFetchInFlight header
deleteHeadersInFlight blockFetchSize headers inflight =
    foldl' (flip (deleteHeaderInFlight blockFetchSize)) inflight headers

-- | A fetch request: a list of header fragments. 'addHeadersInFlight' counts
-- each fragment as one request.
--
newtype FetchRequest header =
        FetchRequest { fetchRequestFragments :: [AnchoredFragment header] }
  deriving Show

-- | Combining two fetch requests concatenates their fragment lists, except
-- that when both are non-empty and the last fragment of the left request can
-- be joined ('AF.join') with the first fragment of the right request, those
-- two are replaced by the single joined fragment.
--
instance HasHeader header => Semigroup (FetchRequest header) where
  -- 'last' and 'init' are safe here: the pattern guarantees @afs@ is
  -- non-empty.
  FetchRequest afs@(_:_) <> FetchRequest (bf:bfs)
    | Just joined <- AF.join (last afs) bf
    = FetchRequest (init afs ++ joined : bfs)

  FetchRequest afs <> FetchRequest bfs
    = FetchRequest (afs ++ bfs)

-- | The largest head slot number among the fragments of a request.
--
-- Fragments for which 'withOriginToMaybe' of the head slot yields 'Nothing'
-- are ignored; if no fragment contributes a slot the result is
-- 'NoMaxSlotNo'.
--
fetchRequestMaxSlotNo :: HasHeader header => FetchRequest header -> MaxSlotNo
fetchRequestMaxSlotNo (FetchRequest afs) =
    foldl' max NoMaxSlotNo headSlots
  where
    headSlots = map MaxSlotNo (mapMaybe (withOriginToMaybe . AF.headSlot) afs)

-- | Events traced by the fetch client state machinery.
--
-- Most events carry the 'PeerFetchInFlight', the 'PeerFetchInFlightLimits'
-- and the 'PeerFetchStatus' that apply at the time of the event.
--
-- NOTE(review): the per-constructor descriptions below are inferred from
-- the constructor names and field types; confirm against the code that
-- emits them.
--
data TraceFetchClientState header =
       -- | A fetch request was added (see 'addNewFetchRequest').
       AddedFetchRequest
         (FetchRequest header)
         (PeerFetchInFlight header)
          PeerFetchInFlightLimits
         (PeerFetchStatus header)

       -- | Presumably: the fetch client picked up a posted request.
     | AcknowledgedFetchRequest
         (FetchRequest header)

       -- | Presumably: the request for one fragment was sent to the peer.
     | SendFetchRequest
         (AnchoredFragment header)

       -- | Presumably: the peer started streaming the blocks of a range.
     | StartedFetchBatch
         (ChainRange (Point header))
         (PeerFetchInFlight header)
          PeerFetchInFlightLimits
         (PeerFetchStatus header)

       -- | Presumably: a single block of a batch was received. The
       -- 'NominalDiffTime' and 'SizeInBytes' are presumably a delay and the
       -- block size -- TODO confirm.
     | CompletedBlockFetch
         (Point header)
         (PeerFetchInFlight header)
          PeerFetchInFlightLimits
         (PeerFetchStatus header)
          NominalDiffTime
          SizeInBytes

       -- | Presumably: the batch for a range finished successfully.
     | CompletedFetchBatch
         (ChainRange (Point header))
         (PeerFetchInFlight header)
          PeerFetchInFlightLimits
         (PeerFetchStatus header)

       -- | Presumably: the peer rejected the request for a range.
     | RejectedFetchBatch
         (ChainRange (Point header))
         (PeerFetchInFlight header)
          PeerFetchInFlightLimits
         (PeerFetchStatus header)

       -- | The client is terminating. The meaning of the 'Int' is not
       -- visible here (presumably a count of outstanding requests).
     | ClientTerminating Int
  deriving Show

addNewFetchRequest :: (MonadSTM m, HasHeader header)
=> Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> FetchRequest header
-> PeerGSV
-> FetchClientStateVars m header
-> m (PeerFetchStatus header)
addNewFetchRequest :: Tracer m (TraceFetchClientState header)
-> (header -> SizeInBytes)
-> FetchRequest header
-> PeerGSV
-> FetchClientStateVars m header
-> m (PeerFetchStatus header)
addNewFetchRequest Tracer m (TraceFetchClientState header)
tracer header -> SizeInBytes
blockFetchSize FetchRequest header
addedReq PeerGSV
gsvs
FetchClientStateVars{
TFetchRequestVar m header
fetchClientRequestVar :: TFetchRequestVar m header
fetchClientRequestVar :: forall (m :: * -> *) header.
FetchClientStateVars m header -> TFetchRequestVar m header
fetchClientRequestVar,
StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar :: StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar :: forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar,
StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar :: StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar :: forall (m :: * -> *) header.
FetchClientStateVars m header
-> StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar
} = do
(PeerFetchInFlight header
inflight', PeerFetchStatus header
currentStatus') <- STM m (PeerFetchInFlight header, PeerFetchStatus header)
-> m (PeerFetchInFlight header, PeerFetchStatus header)
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically (STM m (PeerFetchInFlight header, PeerFetchStatus header)
-> m (PeerFetchInFlight header, PeerFetchStatus header))
-> STM m (PeerFetchInFlight header, PeerFetchStatus header)
-> m (PeerFetchInFlight header, PeerFetchStatus header)
forall a b. (a -> b) -> a -> b
$ do
Maybe (FetchRequest header)
oldReq <- TFetchRequestVar m header -> STM m (Maybe (FetchRequest header))
forall (m :: * -> *) header.
MonadSTM m =>
TFetchRequestVar m header -> STM m (Maybe (FetchRequest header))
peekTFetchRequestVar TFetchRequestVar m header
fetchClientRequestVar
FetchRequest header
mergedReq <- TFetchRequestVar m header
-> FetchRequest header
-> PeerGSV
-> PeerFetchInFlightLimits
-> STM m (FetchRequest header)
forall (m :: * -> *) header.
(MonadSTM m, HasHeader header) =>
TFetchRequestVar m header
-> FetchRequest header
-> PeerGSV
-> PeerFetchInFlightLimits
-> STM m (FetchRequest header)
writeTFetchRequestVar TFetchRequestVar m header
fetchClientRequestVar
FetchRequest header
addedReq PeerGSV
gsvs PeerFetchInFlightLimits
inflightlimits
PeerFetchInFlight header
inflight <- StrictTVar m (PeerFetchInFlight header)
-> STM m (PeerFetchInFlight header)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar
let !inflight' :: PeerFetchInFlight header
inflight' = (header -> SizeInBytes)
-> Maybe (FetchRequest header)
-> FetchRequest header
-> FetchRequest header
-> PeerFetchInFlight header
-> PeerFetchInFlight header
forall header.
HasHeader header =>
(header -> SizeInBytes)
-> Maybe (FetchRequest header)
-> FetchRequest header
-> FetchRequest header
-> PeerFetchInFlight header
-> PeerFetchInFlight header
addHeadersInFlight header -> SizeInBytes
blockFetchSize
Maybe (FetchRequest header)
oldReq FetchRequest header
addedReq FetchRequest header
mergedReq
PeerFetchInFlight header
inflight
StrictTVar m (PeerFetchInFlight header)
-> PeerFetchInFlight header -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (PeerFetchInFlight header)
fetchClientInFlightVar PeerFetchInFlight header
inflight'
PeerFetchStatus header
currentStatus' <- (PeerFetchInFlight header -> PeerFetchStatus header)
-> StrictTVar m (PeerFetchStatus header)
-> PeerFetchInFlight header
-> STM m (PeerFetchStatus header)
forall (m :: * -> *) header.
(MonadSTM m, HasHeader header) =>
(PeerFetchInFlight header -> PeerFetchStatus header)
-> StrictTVar m (PeerFetchStatus header)
-> PeerFetchInFlight header
-> STM m (PeerFetchStatus header)
updateCurrentStatus
(PeerFetchInFlightLimits
-> PeerFetchInFlight header -> PeerFetchStatus header
forall header.
PeerFetchInFlightLimits
-> PeerFetchInFlight header -> PeerFetchStatus header
busyIfOverHighWatermark PeerFetchInFlightLimits
inflightlimits)
StrictTVar m (PeerFetchStatus header)
fetchClientStatusVar
PeerFetchInFlight header
inflight'
(PeerFetchInFlight header, PeerFetchStatus header)
-> STM m (PeerFetchInFlight header, PeerFetchStatus header)
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerFetchInFlight header
inflight', PeerFetchStatus header
currentStatus')
Tracer m (TraceFetchClientState header)
-> TraceFetchClientState header -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceFetchClientState header)
tracer (TraceFetchClientState header -> m ())
-> TraceFetchClientState header -> m ()
forall a b. (a -> b) -> a -> b
$
FetchRequest header
-> PeerFetchInFlight header
-> PeerFetchInFlightLimits
-> PeerFetchStatus header
-> TraceFetchClientState header
forall header.
FetchRequest header
-> PeerFetchInFlight header
-> PeerFetchInFlightLimits
-> PeerFetchStatus header
-> TraceFetchClientState header
AddedFetchRequest
FetchRequest header
addedReq
PeerFetchInFlight header
inflight' PeerFetchInFlightLimits
inflightlimits
PeerFetchStatus header
currentStatus'
PeerFetchStatus header -> m (PeerFetchStatus header)
forall (m :: * -> *) a. Monad m => a -> m a
return PeerFetchStatus header
currentStatus'
where
inflightlimits :: PeerFetchInFlightLimits
inflightlimits = PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV
gsvs
-- | Take the pending fetch request (together with its 'PeerGSV' and
-- 'PeerFetchInFlightLimits') out of the client's request variable.
--
-- The take is run through 'timeoutWithControlMessage' using the supplied
-- control-message action. When that yields 'Nothing' the 'Nothing' is passed
-- straight back and nothing is traced (presumably this is the case where the
-- control message asks the client to stop -- confirm against
-- 'timeoutWithControlMessage'). When a request is obtained, an
-- 'AcknowledgedFetchRequest' event is traced before it is returned.
acknowledgeFetchRequest :: MonadSTM m
                        => Tracer m (TraceFetchClientState header)
                        -> ControlMessageSTM m
                        -> FetchClientStateVars m header
                        -> m (Maybe
                               ( FetchRequest header
                               , PeerGSV
                               , PeerFetchInFlightLimits ))
acknowledgeFetchRequest tracer controlMessageSTM
                        FetchClientStateVars {fetchClientRequestVar} = do
    result <-
      timeoutWithControlMessage
        controlMessageSTM
        (takeTFetchRequestVar fetchClientRequestVar)
    case result of
      Nothing -> return result
      Just (request, _, _) -> do
        -- Only the request itself is traced; the GSV and limits are handed
        -- back to the caller untouched as part of 'result'.
        traceWith tracer (AcknowledgedFetchRequest request)
        return result
-- | Trace that the peer has started sending a batch of blocks for the given
-- range.
--
-- This does not modify any state: it takes a consistent snapshot of the
-- in-flight variable and the status variable in a single STM transaction and
-- reports them, along with the supplied limits, in a 'StartedFetchBatch'
-- event.
startedFetchBatch :: MonadSTM m
                  => Tracer m (TraceFetchClientState header)
                  -> PeerFetchInFlightLimits
                  -> ChainRange (Point header)
                  -> FetchClientStateVars m header
                  -> m ()
startedFetchBatch tracer inflightlimits range
                  FetchClientStateVars {
                    fetchClientInFlightVar,
                    fetchClientStatusVar
                  } = do
    -- Both reads happen in the same transaction so the traced pair is
    -- mutually consistent.
    (inflight, currentStatus) <-
      atomically $ (,) <$> readTVar fetchClientInFlightVar
                       <*> readTVar fetchClientStatusVar
    traceWith tracer $
      StartedFetchBatch
        range
        inflight inflightlimits
        currentStatus
-- | Record that a single block (identified by its header) has finished
-- downloading.
--
-- In one STM transaction this:
--
-- * removes the header from the in-flight state via 'deleteHeaderInFlight'
--   (using the supplied size function), and writes the result back;
-- * recomputes the peer status with 'readyIfUnderLowWatermark' and stores it
--   through 'updateCurrentStatus'.
--
-- Afterwards a 'CompletedBlockFetch' event is traced carrying the block's
-- point, the updated in-flight state, the limits, the new status, the
-- supplied delay and the block's size as given by the size function.
completeBlockDownload :: (MonadSTM m, HasHeader header)
                      => Tracer m (TraceFetchClientState header)
                      -> (header -> SizeInBytes)
                      -> PeerFetchInFlightLimits
                      -> header
                      -> NominalDiffTime
                      -> FetchClientStateVars m header
                      -> m ()
completeBlockDownload tracer blockFetchSize inflightlimits header blockDelay
                      FetchClientStateVars {
                        fetchClientInFlightVar,
                        fetchClientStatusVar
                      } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      -- Forced before the write so the variable never holds a thunk.
      let !inflight' = deleteHeaderInFlight blockFetchSize header inflight
      writeTVar fetchClientInFlightVar inflight'

      currentStatus' <- updateCurrentStatus
                          (readyIfUnderLowWatermark inflightlimits)
                          fetchClientStatusVar
                          inflight'
      return (inflight', currentStatus')

    traceWith tracer $
      CompletedBlockFetch
        (blockPoint header)
        inflight' inflightlimits
        currentStatus'
        blockDelay
        (blockFetchSize header)
-- | Record that the peer has finished sending a whole batch for the given
-- range.
--
-- In one STM transaction this decrements 'peerFetchReqsInFlight' and writes
-- the in-flight state back. With assertions enabled it also checks that, if
-- this was the only outstanding request, there are no bytes and no blocks
-- left in flight.
--
-- The status variable is only touched in one situation: it currently reads
-- @'PeerFetchStatusReady' bs 'IsNotIdle'@ with @bs@ empty, and the request
-- count has just reached zero. In that case it is replaced with
-- @'PeerFetchStatusReady' 'Set.empty' 'IsIdle'@. Any other status is left
-- as it is.
--
-- Afterwards a 'CompletedFetchBatch' event is traced with the updated
-- in-flight state and the resulting status.
--
-- NOTE(review): 'peerFetchReqsInFlight' is a 'Word', so the decrement relies
-- on there being at least one outstanding request when this is called --
-- confirm that callers guarantee this.
completeFetchBatch :: MonadSTM m
                   => Tracer m (TraceFetchClientState header)
                   -> PeerFetchInFlightLimits
                   -> ChainRange (Point header)
                   -> FetchClientStateVars m header
                   -> m ()
completeFetchBatch tracer inflightlimits range
                   FetchClientStateVars {
                     fetchClientInFlightVar,
                     fetchClientStatusVar
                   } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      let -- "If exactly one request was outstanding, then nothing else may
          -- still be in flight", written as an implication.
          lastRequestIsDrained =
               peerFetchReqsInFlight inflight /= 1
            || (  peerFetchBytesInFlight inflight == 0
               && Set.null (peerFetchBlocksInFlight inflight)
               )
          !inflight' =
            assert lastRequestIsDrained
            inflight {
              peerFetchReqsInFlight = peerFetchReqsInFlight inflight - 1
            }
      writeTVar fetchClientInFlightVar inflight'

      currentStatus' <- readTVar fetchClientStatusVar >>= \case
        PeerFetchStatusReady bs IsNotIdle
          | Set.null bs
            && 0 == peerFetchReqsInFlight inflight'
          -> let status = PeerFetchStatusReady Set.empty IsIdle
             in status <$ writeTVar fetchClientStatusVar status
        unchanged -> pure unchanged

      return (inflight', currentStatus')

    traceWith tracer $
      CompletedFetchBatch
        range
        inflight' inflightlimits
        currentStatus'
-- | Record that the peer rejected a whole batch request for the given range.
--
-- In one STM transaction this:
--
-- * removes all the given headers from the in-flight state via
--   'deleteHeadersInFlight' (using the supplied size function) and
--   decrements 'peerFetchReqsInFlight', then writes the result back;
-- * recomputes the peer status with 'readyIfUnderLowWatermark' and stores it
--   through 'updateCurrentStatus'.
--
-- Afterwards a 'RejectedFetchBatch' event is traced with the updated
-- in-flight state, the limits and the new status.
--
-- NOTE(review): 'peerFetchReqsInFlight' is a 'Word', so the decrement relies
-- on there being at least one outstanding request when this is called --
-- confirm that callers guarantee this.
rejectedFetchBatch :: (MonadSTM m, HasHeader header)
                   => Tracer m (TraceFetchClientState header)
                   -> (header -> SizeInBytes)
                   -> PeerFetchInFlightLimits
                   -> ChainRange (Point header)
                   -> [header]
                   -> FetchClientStateVars m header
                   -> m ()
rejectedFetchBatch tracer blockFetchSize inflightlimits range headers
                   FetchClientStateVars {
                     fetchClientInFlightVar,
                     fetchClientStatusVar
                   } = do
    (inflight', currentStatus') <- atomically $ do
      inflight <- readTVar fetchClientInFlightVar
      -- The request counter is taken from the state as read, and applied on
      -- top of the state with the rejected headers removed.
      let !inflight' =
            (deleteHeadersInFlight blockFetchSize headers inflight) {
              peerFetchReqsInFlight = peerFetchReqsInFlight inflight - 1
            }
      writeTVar fetchClientInFlightVar inflight'

      currentStatus' <- updateCurrentStatus
                          (readyIfUnderLowWatermark inflightlimits)
                          fetchClientStatusVar
                          inflight'
      return (inflight', currentStatus')

    traceWith tracer $
      RejectedFetchBatch
        range
        inflight' inflightlimits
        currentStatus'
-- | Compute the peer's status from the given in-flight state using the
-- supplied decision function, store it in the status variable, and return
-- it.
--
-- The variable is only written when the newly computed status differs from
-- the one it currently holds.
updateCurrentStatus :: (MonadSTM m, HasHeader header)
                    => (PeerFetchInFlight header -> PeerFetchStatus header)
                    -> StrictTVar m (PeerFetchStatus header)
                    -> PeerFetchInFlight header
                    -> STM m (PeerFetchStatus header)
updateCurrentStatus decideCurrentStatus fetchClientStatusVar inflight = do
    let currentStatus' = decideCurrentStatus inflight
    currentStatus <- readTVar fetchClientStatusVar
    -- Skip the write when nothing changed.
    when (currentStatus' /= currentStatus) $
      writeTVar fetchClientStatusVar currentStatus'
    return currentStatus'
-- | Decide the peer status against the high watermark.
--
-- Yields 'PeerFetchStatusBusy' when the bytes in flight are at or above
-- 'inFlightBytesHighWatermark'. Otherwise yields 'PeerFetchStatusReady'
-- carrying the set of blocks in flight, marked idle exactly when there are
-- zero requests in flight.
busyIfOverHighWatermark :: PeerFetchInFlightLimits
                        -> PeerFetchInFlight header
                        -> PeerFetchStatus header
busyIfOverHighWatermark inflightlimits inflight
  | peerFetchBytesInFlight inflight >= inFlightBytesHighWatermark inflightlimits
  = PeerFetchStatusBusy
  | otherwise
  = PeerFetchStatusReady
      (peerFetchBlocksInFlight inflight)
      (idleIf (0 == peerFetchReqsInFlight inflight))
-- | Decide the peer status against the low watermark.
--
-- Yields 'PeerFetchStatusReady' (carrying the set of blocks in flight, and
-- marked idle exactly when there are zero requests in flight) when the bytes
-- in flight are at or below 'inFlightBytesLowWatermark'. Otherwise yields
-- 'PeerFetchStatusBusy'.
--
-- Note that this tests a different threshold from 'busyIfOverHighWatermark':
-- any byte count above the low watermark is reported as busy here.
readyIfUnderLowWatermark :: PeerFetchInFlightLimits
                         -> PeerFetchInFlight header
                         -> PeerFetchStatus header
readyIfUnderLowWatermark inflightlimits inflight
  | peerFetchBytesInFlight inflight <= inFlightBytesLowWatermark inflightlimits
  = PeerFetchStatusReady
      (peerFetchBlocksInFlight inflight)
      (idleIf (0 == peerFetchReqsInFlight inflight))
  | otherwise
  = PeerFetchStatusBusy
-- | A 'TMergeVar' whose payload is a triple: the fetch request itself plus
-- the 'PeerGSV' and 'PeerFetchInFlightLimits' that accompany it.
--
-- The latter two are wrapped in 'Last', so when two payloads are combined
-- with '<>' the most recently written value of each is the one retained.
type TFetchRequestVar m header =
       TMergeVar m ( FetchRequest header
                   , Last PeerGSV
                   , Last PeerFetchInFlightLimits
                   )
-- | Allocate a fresh 'TFetchRequestVar'. This is simply 'newTMergeVar' at
-- the fetch-request payload type.
newTFetchRequestVar :: MonadSTM m => STM m (TFetchRequestVar m header)
newTFetchRequestVar = newTMergeVar
-- | Write a fetch request, together with the current 'PeerGSV' and in-flight
-- limits, into the request variable.
--
-- The write goes through 'writeTMergeVar', so if a value is already present
-- the new triple is combined with it via '<>' rather than replacing it.
--
-- Returns the request component of whatever 'writeTMergeVar' reports as the
-- stored value; after a merge this may differ from the request passed in.
writeTFetchRequestVar :: (MonadSTM m, HasHeader header)
                      => TFetchRequestVar m header
                      -> FetchRequest header
                      -> PeerGSV
                      -> PeerFetchInFlightLimits
                      -> STM m (FetchRequest header)
writeTFetchRequestVar v r g l = do
    -- Only the request part of the stored triple is of interest to callers.
    (storedRequest, _, _) <- writeTMergeVar v (r, Last g, Last l)
    return storedRequest
-- | Look at the pending fetch request, if any, without removing it.
--
-- Built on 'tryReadTMergeVar': yields 'Nothing' when that does, and
-- otherwise the request component of the stored triple.
peekTFetchRequestVar :: MonadSTM m
                     => TFetchRequestVar m header
                     -> STM m (Maybe (FetchRequest header))
peekTFetchRequestVar v = fmap requestOf <$> tryReadTMergeVar v
  where
    -- Project the request out of the stored triple.
    requestOf (request, _, _) = request
-- | Take the pending fetch request out of the variable via 'takeTMergeVar',
-- stripping the 'Last' wrappers from the accompanying 'PeerGSV' and
-- in-flight limits.
takeTFetchRequestVar :: MonadSTM m
                     => TFetchRequestVar m header
                     -> STM m (FetchRequest header,
                               PeerGSV,
                               PeerFetchInFlightLimits)
takeTFetchRequestVar v = unwrap <$> takeTMergeVar v
  where
    -- Drop the 'Last' newtypes that exist only to drive the merge.
    unwrap (request, gsv, limits) = (request, getLast gsv, getLast limits)
-- | A thin wrapper around a 'StrictTMVar'. The operations defined on it in
-- this module ('newTMergeVar', 'writeTMergeVar', 'takeTMergeVar',
-- 'tryReadTMergeVar') differ from plain 'StrictTMVar' use in one respect:
-- writing to a full variable combines the old and new values with '<>'.
newtype TMergeVar m a = TMergeVar (StrictTMVar m a)
-- | Create a new 'TMergeVar' that starts out empty.
newTMergeVar :: MonadSTM m => STM m (TMergeVar m a)
newTMergeVar = TMergeVar <$> newEmptyTMVar
-- | Store a value in the 'TMergeVar', merging with any value already there.
--
-- * If the variable is empty, the given value is stored as-is.
-- * If it holds @x0@, the stored value becomes @x0 '<>' x@ (existing value
--   on the left, new value on the right).
--
-- Returns the value that ends up stored.
writeTMergeVar :: (MonadSTM m, Semigroup a) => TMergeVar m a -> a -> STM m a
writeTMergeVar (TMergeVar v) x = do
    -- Empty the variable first, so the 'putTMVar' below always finds it empty.
    mx0 <- tryTakeTMVar v
    case mx0 of
      Nothing -> x  <$ putTMVar v x
      Just x0 -> x' <$ putTMVar v x'
        where
          -- Strict binding: the merge is evaluated when this branch runs.
          !x' = x0 <> x
-- | Take the current value out of the 'TMergeVar'; this is 'takeTMVar' on
-- the underlying variable.
takeTMergeVar :: MonadSTM m => TMergeVar m a -> STM m a
takeTMergeVar (TMergeVar v) = takeTMVar v
-- | Read the current value of the 'TMergeVar', if any; this is
-- 'tryReadTMVar' on the underlying variable.
tryReadTMergeVar :: MonadSTM m
                 => TMergeVar m a
                 -> STM m (Maybe a)
tryReadTMergeVar (TMergeVar v) = tryReadTMVar v
-- | A single-field wrapper around a value of type @a@, unwrapped with
-- 'unFromConsensus'. The 'Functor' instance is derived, so 'fmap' applies a
-- function to the wrapped value.
--
-- NOTE(review): the name suggests it tags values that originate from the
-- consensus layer -- confirm against the call sites.
newtype FromConsensus a = FromConsensus { unFromConsensus :: a }
  deriving (Functor)
-- | 'pure' wraps a value; '<*>' applies a wrapped function to a wrapped
-- argument and wraps the result.
instance Applicative FromConsensus where
  pure = FromConsensus
  FromConsensus f <*> FromConsensus a = FromConsensus (f a)