{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
  ( -- * Registry of block fetch clients
    FetchClientRegistry
  , newFetchClientRegistry
  , bracketFetchClient
  , bracketKeepAliveClient
  , bracketSyncWithFetchClient
  , setFetchClientContext
  , FetchClientPolicy (..)
  , readFetchClientsStatus
  , readFetchClientsStateVars
  , readPeerGSVs
  ) where

import           Data.Functor.Contravariant (contramap)
import           Data.Map (Map)
import qualified Data.Map as Map

import           Control.Exception (assert)
import           Control.Monad (unless)
import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork (MonadFork (throwTo),
                     MonadThread (ThreadId, myThreadId))
import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadThrow
import           Control.Tracer (Tracer)

import           Ouroboros.Network.BlockFetch.ClientState
import           Ouroboros.Network.DeltaQ
import           Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..),
                     isPipeliningEnabled)



-- | A registry for the threads that are executing the client side of the
-- 'BlockFetch' protocol to communicate with our peers.
--
-- The registry contains the shared variables we use to communicate with these
-- threads, both to track their status and to provide instructions.
--
-- The threads add\/remove themselves to\/from this registry when they start up
-- and shut down.
--
-- The constructor fields are positional; the comments on each field below
-- describe how the functions in this module use them.
--
data FetchClientRegistry peer header block m =
     FetchClientRegistry
       -- Shared tracer and per-version policy constructor. Created empty by
       -- 'newFetchClientRegistry'; 'bracketFetchClient' blocks reading it
       -- during registration until it has been filled.
       (StrictTMVar m (Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
                 WhetherReceivingTentativeBlocks -> STM m (FetchClientPolicy header block m)))
       -- Fetch registry: the state variables of every registered fetch
       -- client. 'bracketSyncWithFetchClient' waits for a peer to appear
       -- here before registering the sync client.
       (StrictTVar  m (Map peer (FetchClientStateVars m header)))
       -- Sync registry: thread id of each registered chain sync client and
       -- the variable that client fills when it unregisters.
       (StrictTVar  m (Map peer (ThreadId m, StrictTMVar m ())))
       -- 'PeerGSV' registry: an entry is inserted by 'bracketKeepAliveClient'
       -- and 'bracketFetchClient' waits for it before registering.
       (StrictTVar  m (Map peer PeerGSV))
       -- Keep-alive registry: thread id of each registered fetch client and
       -- the variable that client fills when it unregisters; consulted by
       -- the keep-alive client when it shuts down.
       (StrictTVar  m (Map peer (ThreadId m, StrictTMVar m ())))

-- | Create a fresh, empty 'FetchClientRegistry'.
--
-- The context variable (tracer and policy constructor) starts out empty, and
-- all four per-peer maps start out as 'Map.empty'. No peers are registered
-- until the bracket functions of this module are used.
--
newFetchClientRegistry :: MonadSTM m
                       => m (FetchClientRegistry peer header block m)
newFetchClientRegistry =
    FetchClientRegistry
      <$> newEmptyTMVarIO      -- tracer and policy constructor, not yet set
      <*> newTVarIO Map.empty  -- fetch client state vars
      <*> newTVarIO Map.empty  -- registered sync clients
      <*> newTVarIO Map.empty  -- per-peer GSV
      <*> newTVarIO Map.empty  -- fetch clients known to keep-alive

-- | This is needed to start a block fetch client. It provides the required
-- 'FetchClientContext'. It registers and unregisters the fetch client on
-- start and end.
--
-- It also manages synchronisation with the corresponding chain sync client.
--
-- Arguments, in order: the shared registry, the negotiated protocol version
-- (used to choose between 'ReceivingTentativeBlocks' and
-- 'NotReceivingTentativeBlocks' when building the policy), the peer this
-- client talks to, and the action to run with the freshly built context.
--
-- Registration happens in two steps. First, in a single transaction, we wait
-- for the registry context and for the keep-alive client's 'PeerGSV' entry,
-- then add ourselves to the keep-alive and fetch registries. Second, we wait
-- for the chain sync client of the same peer to register. If that second
-- wait is aborted by an exception, the first step is rolled back so that no
-- stale entries are left behind.
--
bracketFetchClient :: forall m a peer header block.
                      (MonadThrow m, MonadSTM m, MonadFork m, MonadMask m,
                       Ord peer)
                   => FetchClientRegistry peer header block m
                   -> NodeToNodeVersion
                   -> peer
                   -> (FetchClientContext header block m -> m a)
                   -> m a
bracketFetchClient (FetchClientRegistry ctxVar
                      fetchRegistry syncRegistry dqRegistry keepRegistry)
                   version peer action = do
    ksVar <- newEmptyTMVarIO
    bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
  where
    register :: StrictTMVar m ()
             -> m ( FetchClientContext header block m
                  , (ThreadId m, StrictTMVar m ()) )
    register ksVar = do
      tid <- myThreadId
      ctx <- atomically $ do
        -- blocks until setFetchClientContext is called
        (tracer, mkPolicy) <- readTMVar ctxVar

        -- wait for and register with keepAlive
        dqPeers <- readTVar dqRegistry
        check (peer `Map.member` dqPeers)
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, ksVar) m

        -- allocate the policy specific for this peer's negotiated version
        policy <- do
          let pipeliningEnabled :: WhetherReceivingTentativeBlocks
              pipeliningEnabled
                | isPipeliningEnabled version = ReceivingTentativeBlocks
                | otherwise                   = NotReceivingTentativeBlocks
          mkPolicy pipeliningEnabled

        stateVars <- newFetchClientStateVars
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer stateVars m
        return FetchClientContext {
          fetchClientCtxTracer    = contramap (TraceLabelPeer peer) tracer,
          fetchClientCtxPolicy    = policy,
          fetchClientCtxStateVars = stateVars
        }

      -- Now wait for the sync client to start up.
      --
      -- This wait blocks, so it can be interrupted by an asynchronous
      -- exception. At that point we are already in both registries but
      -- 'bracket' will not run 'unregister' (the acquire step has not
      -- completed), so we have to undo the registration ourselves.
      syncclient <- waitForSyncClient `onException` abortRegistration ksVar ctx
      return (ctx, syncclient)

    -- Block until the chain sync client for this peer is in the sync
    -- registry, and return its thread id and completion variable.
    waitForSyncClient :: m (ThreadId m, StrictTMVar m ())
    waitForSyncClient = atomically $ do
      syncclients <- readTVar syncRegistry
      case Map.lookup peer syncclients of
        Nothing         -> retry
        Just syncclient -> return syncclient

    -- Roll back the first registration step: mark the client as shut down,
    -- signal keep-alive that we are gone and remove ourselves from both
    -- registries. The transaction never retries: 'ksVar' is only ever filled
    -- by this client, and it has not been filled yet.
    abortRegistration :: StrictTMVar m ()
                      -> FetchClientContext header block m
                      -> m ()
    abortRegistration ksVar
                      FetchClientContext { fetchClientCtxStateVars = stateVars } =
      atomically $ do
        writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown
        putTMVar ksVar ()
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

    unregister :: StrictTMVar m ()
               -> FetchClientContext header block m
               -> (ThreadId m, StrictTMVar m ())
               -> m ()
    unregister ksVar FetchClientContext { fetchClientCtxStateVars = stateVars }
               (tid, doneVar)  = uninterruptibleMask_ $ do
      -- Signal we are shutting down
      atomically $
        writeTVar (fetchClientStatusVar stateVars) PeerFetchStatusShutdown
      -- Kill the sync client if it is still running
      throwTo tid AsyncCancelled
      -- Wait for the sync client to terminate and finally unregister ourselves
      atomically $ do
        -- Signal to keepAlive that we're going away
        putTMVar ksVar ()
        modifyTVar keepRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

        -- Blocks (retrying the whole transaction) until the sync client has
        -- filled its completion variable.
        readTMVar doneVar
        modifyTVar fetchRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m


-- | The block fetch and chain sync clients for each peer need to synchronise
-- their startup and shutdown. This bracket operation provides that
-- synchronisation for the chain sync client.
--
-- This must be used for the chain sync client /outside/ of its own state
-- registration and deregistration.
--
-- Arguments, in order: the shared registry, the peer the chain sync client
-- talks to, and the chain sync client action itself. The action only starts
-- once a fetch client for the same peer is present in the fetch registry.
--
bracketSyncWithFetchClient :: forall m a peer header block.
                              (MonadThrow m, MonadSTM m, MonadFork m, Ord peer)
                           => FetchClientRegistry peer header block m
                           -> peer
                           -> m a
                           -> m a
bracketSyncWithFetchClient (FetchClientRegistry _ctxVar
                              fetchRegistry syncRegistry _dqRegistry _keepRegistry)
                           peer action = do
    doneVar <- newEmptyTMVarIO
    bracket_ (register doneVar) (unregister doneVar) action
  where
    -- The goal here is that the block fetch client should be registered
    -- before the sync client starts running.
    --
    -- On the shutdown side, the sync client should stop before the block fetch
    -- is unregistered. This has to happen even if either client is terminated
    -- abnormally or being cancelled (which of course can happen in any order).

    register :: StrictTMVar m () -> m ()
    register doneVar = do
      tid <- myThreadId
      -- We wait for the fetch client to be registered, and register ourselves
      atomically $ do
        fetchclients <- readTVar fetchRegistry
        check (peer `Map.member` fetchclients)
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer (tid, doneVar) m

    -- Fill the completion variable (the fetch client waits on it before it
    -- removes itself from the fetch registry) and leave the sync registry.
    unregister :: StrictTMVar m () -> m ()
    unregister doneVar =
      atomically $ do
        putTMVar doneVar ()
        modifyTVar syncRegistry $ \m ->
          assert (peer `Map.member` m) $
          Map.delete peer m

bracketKeepAliveClient :: forall m a peer header block.
                              (MonadThrow m, MonadSTM m, MonadFork m,
                               MonadMask m, Ord peer)
                       => FetchClientRegistry peer header block m
                       -> peer
                       -> ((StrictTVar  m (Map peer PeerGSV)) -> m a)
                       -> m a
-- Run @action@ with the registry's 'PeerGSV' map while @peer@ is registered
-- in it.  The peer is inserted (with 'defaultGSV') before the action runs and
-- removed again afterwards via 'bracket_', so the removal also happens when
-- the action exits with an exception.
bracketKeepAliveClient (FetchClientRegistry _ctxVar
                              _fetchRegistry _syncRegistry dqRegistry keepRegistry) peer action =
    bracket_ register unregister (action dqRegistry)
  where
    -- the keepAliveClient will register a PeerGSV and the block fetch client will wait on it.
    register :: m ()
    register =
      atomically $
        modifyTVar dqRegistry $ \m ->
          assert (peer `Map.notMember` m) $
          Map.insert peer defaultGSV m

    -- Remove this peer's 'PeerGSV'; the peer is expected to be present.
    dropPeerGSV :: STM m ()
    dropPeerGSV =
      modifyTVar dqRegistry $ \m ->
        assert (peer `Map.member` m) $
        Map.delete peer m

    -- It is possible for the keepAlive client to keep running even without a fetch client, but
    -- a fetch client shouldn't run without a keepAlive client.
    --
    -- The whole clean-up runs under 'uninterruptibleMask_'.
    unregister :: m ()
    unregister = uninterruptibleMask_ $ do
      fetchclient_m <- atomically $ do
        fetchclients <- readTVar keepRegistry
        case Map.lookup peer fetchclients of
             Nothing -> do
               -- If the fetch client is already dead we remove PeerGSV ourself directly.
               dropPeerGSV
               return Nothing
             Just rc -> return (Just rc)
      case fetchclient_m of
           Nothing -> return ()
           Just (tid, doneVar) -> do
             -- Cancel the fetch client.
             throwTo tid AsyncCancelled
             atomically $ do
               -- wait for fetch client to exit.
               readTMVar doneVar
               dropPeerGSV


-- | Store the tracer and the policy-constructing action in the registry's
-- context variable.
--
-- The pair is written with 'tryPutTMVar'; if the variable is already full
-- this calls 'error', so it must be called at most once per registry.
--
setFetchClientContext :: MonadSTM m
                      => FetchClientRegistry peer header block m
                      -> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
                      -> (   WhetherReceivingTentativeBlocks
                          -> STM m (FetchClientPolicy header block m)
                         )
                      -> m ()
setFetchClientContext (FetchClientRegistry ctxVar _ _ _ _) tracer mkPolicy =
    atomically $ do
      ok <- tryPutTMVar ctxVar (tracer, mkPolicy)
      unless ok $ error "setFetchClientContext: called more than once"

-- | A read-only 'STM' action to get the current 'PeerFetchStatus' for all
-- fetch clients in the 'FetchClientRegistry'.
--
-- Reads the map of 'FetchClientStateVars' and then each entry's
-- 'fetchClientStatusVar', all within the same 'STM' action.
--
readFetchClientsStatus :: MonadSTM m
                       => FetchClientRegistry peer header block m
                       -> STM m (Map peer (PeerFetchStatus header))
readFetchClientsStatus (FetchClientRegistry _ registry _ _ _) =
  readTVar registry >>= traverse (readTVar . fetchClientStatusVar)

-- | A read-only 'STM' action to get the 'FetchClientStateVars' for all fetch
-- clients in the 'FetchClientRegistry'.
--
-- Returns the current contents of the registry's fetch-client map as is.
--
readFetchClientsStateVars :: MonadSTM m
                          => FetchClientRegistry peer header block m
                          -> STM m (Map peer (FetchClientStateVars m header))
readFetchClientsStateVars (FetchClientRegistry _ registry _ _ _) = readTVar registry

-- | A read-only 'STM' action to get the 'PeerGSV's for all peers currently
-- present in the 'PeerGSV' map of the 'FetchClientRegistry' (the map that
-- 'bracketKeepAliveClient' registers peers in).
--
readPeerGSVs :: MonadSTM m
             => FetchClientRegistry peer header block m
             -> STM m (Map peer PeerGSV)
readPeerGSVs (FetchClientRegistry _ _ _ registry _) = readTVar registry