{-# LANGUAGE DeriveAnyClass      #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE PatternSynonyms     #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ViewPatterns        #-}

-- | Iterators
module Ouroboros.Consensus.Storage.ChainDB.Impl.Iterator (
    closeAllIterators
  , stream
    -- * Exported for testing purposes
  , IteratorEnv (..)
  , newIterator
  ) where

import           Control.Monad (unless, when)
import           Control.Monad.Except (ExceptT (..), catchError, lift,
                     runExceptT, throwError, withExceptT)
import           Control.Tracer
import           Data.Functor (($>))
import           Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as NE
import           Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import           Data.Maybe (isJust)
import           Data.Typeable (Typeable)
import           GHC.Generics (Generic)
import           GHC.Stack (HasCallStack)

import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Consensus.Util.ResourceRegistry (ResourceRegistry)

import           Ouroboros.Consensus.Storage.ChainDB.API (BlockComponent (..),
                     ChainDbError (..), Iterator (..), IteratorResult (..),
                     StreamFrom (..), StreamTo (..), UnknownRange (..),
                     getPoint, validBounds)

import           Ouroboros.Consensus.Storage.ChainDB.Impl.Paths (Path (..),
                     computePath)
import           Ouroboros.Consensus.Storage.ChainDB.Impl.Types
import           Ouroboros.Consensus.Storage.ImmutableDB (ImmutableDB)
import qualified Ouroboros.Consensus.Storage.ImmutableDB as ImmutableDB
import           Ouroboros.Consensus.Storage.VolatileDB (VolatileDB)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB

-- | Stream blocks
--
-- = Start & end point
--
-- The start point can either be in the ImmutableDB (on our chain) or in the
-- VolatileDB (on our chain or on a recent fork). We first check whether it is
-- in the VolatileDB, if not, we check if it is in the ImmutableDB (see
-- \"Garbage collection\" for why this order is important). Similarly for the
-- end point.
--
-- If a bound can't be found in the ChainDB, an 'UnknownRange' error is
-- returned.
--
-- When the bounds are nonsensical, e.g.,
--
-- > StreamFromExclusive (Point     (SlotNo 3) _)
-- > StreamToInclusive   (RealPoint (SlotNo 3) _)
--
-- an 'InvalidIteratorRange' exception is thrown.
--
-- = Paths of blocks
--
-- To stream blocks from the ImmutableDB we can simply use the iterators
-- offered by the ImmutableDB.
--
-- To stream blocks from the VolatileDB we have to construct a path of points
-- backwards through the VolatileDB, starting from the end point using
-- 'getPredecessor' until we get to the start point, genesis, or we get to a
-- block that is not in the VolatileDB. Then, for each point in the path, we
-- can ask the VolatileDB for the corresponding block.
--
-- If the path through the VolatileDB is incomplete, we will first have to
-- stream blocks from the ImmutableDB and then switch to the path through the
-- VolatileDB. We only allow the tip of the ImmutableDB to be the switchover
-- point between the two DBs. In other words, the incomplete path through the
-- VolatileDB must fit onto the tip of the ImmutableDB. This must be true at
-- the time of initialising the iterator, but does not have to hold during the
-- whole lifetime of the iterator. If it doesn't fit on it, it means the path
-- forked off more than @k@ blocks in the past and blocks belonging to it are
-- more likely to go missing because of garbage-collection (see the next
-- paragraph). In that case, we return 'ForkTooOld'.
--
-- = Garbage collection
--
-- We have to be careful about the following: as our chain grows, blocks from
-- our chain will be copied to the ImmutableDB in the background. After a
-- while, old blocks will be garbage-collected from the VolatileDB. Blocks
-- that were part of the current chain will be in the ImmutableDB, but blocks
-- that only lived on forks will be gone forever.
--
-- This means that blocks that were part of the VolatileDB when the iterator
-- was initialised might no longer be part of the VolatileDB when we come to
-- the point that the iterator will try to read them. When this is noticed, we
-- will try to open an iterator from the ImmutableDB to obtain the blocks that
-- have moved over. However, this will only work if they were and are part of
-- the current chain, otherwise they will have been deleted from the
-- VolatileDB without being copied to the ImmutableDB.
--
-- This iterator is opened with an open upper bound and will be used to stream
-- blocks until the path has been fully streamed, the iterator is exhausted,
-- or a block doesn't match the expected point. In the latter two cases, we
-- switch back to the VolatileDB. If the block is missing from the VolatileDB,
-- we will switch back to streaming from the ImmutableDB. If that fails, we
-- switch back to the VolatileDB. To avoid eternally switching between the two
-- DBs, we only switch back to the VolatileDB if the stream from the
-- ImmutableDB has made progress, i.e. streamed at least one block with the
-- expected point. If no block was streamed from the ImmutableDB, not even the
-- first one, we know for sure that that block isn't part of the VolatileDB
-- (the reason we switch to the ImmutableDB) and isn't part of the ImmutableDB
-- (no block was streamed). In that case, we return 'IteratorBlockGCed' and
-- stop the stream.
--
-- Note that the open upper bound doesn't allow us to include blocks in the
-- stream that are copied to the ImmutableDB after opening this iterator, as
-- the bound of the iterator is fixed upon initialisation. These newly added
-- blocks will nevertheless still be included in the stream, because we will
-- repeatedly open new ImmutableDB iterators (as long as we make progress).
--
-- = Bounds checking
--
-- The VolatileDB is hash-based instead of point-based. While the bounds of a
-- stream are /point/s, we can simply check whether the hashes of the bounds
-- match the hashes stored in the points.
--
-- The ImmutableDB is slot-based instead of point-based, which means that
-- before we know whether a block in the ImmutableDB matches a given point, we
-- must first read the block's hash corresponding to the point's slot from the
-- (cached) on-disk indices, after which we can then verify whether it matches
-- the hash of the point. This is important for the start and end bounds (both
-- points) of a stream in case they are in the ImmutableDB (i.e., their slots
-- are <= the tip of the ImmutableDB): we must first read the hashes
-- corresponding to the bounds from the (cached) on-disk indices to be sure
-- the range is valid. Note that these reads happen before the first call to
-- 'iteratorNext'.
--
-- Note that when streaming from an /exclusive/ bound, the block corresponding
-- to that bound ('Point') must exist in the ChainDB.
--
-- The ImmutableDB will keep the on-disk indices of a chunk of blocks in
-- memory after the first read so that the next lookup doesn't have to read
-- from disk. When both bounds are in the same chunk, which will typically be
-- the case, only checking the first bound will require disk reads, the second
-- will be cached.
--
-- = Costs
--
-- Opening an iterator has some costs:
--
-- * When blocks have to be streamed from the ImmutableDB: as discussed in
--   \"Bounds checking\", the hashes corresponding to the bounds have to be
--   read from the (cached) on-disk indices.
--
-- * When blocks have to be streamed both from the ImmutableDB and the
--   VolatileDB, only the hash of the block corresponding to the lower bound
--   will have to be read from the ImmutableDB upfront, as described in the
--   previous bullet point. Note that the hash of the block corresponding to
--   the upper bound does not have to be read from disk, since it will be in
--   the VolatileDB, which means that we know its hash already from the
--   in-memory index.
--
-- In summary:
--
-- * Only streaming from the VolatileDB: 0 (cached) reads from disk upfront.
-- * Only streaming from the ImmutableDB: 2 (cached) reads from disk upfront.
-- * Streaming from both the ImmutableDB and the VolatileDB: 1 (cached) read
--   from disk upfront.
--
-- Additionally, when we notice during streaming that a block is no longer in
-- the VolatileDB, we try to see whether it can be streamed from the ImmutableDB
-- instead. Opening such an iterator costs 2 (cached) reads from disk upfront.
-- This can happen multiple times.
stream
  :: forall m blk b.
     ( IOLike m
     , HasHeader blk
     , HasCallStack
     )
  => ChainDbHandle m blk
  -> ResourceRegistry m
  -> BlockComponent blk b
  -> StreamFrom blk
  -> StreamTo blk
  -> m (Either (UnknownRange blk) (Iterator m blk b))
stream h registry blockComponent from to = getEnv h $ \cdb ->
    -- The iterator is created against the environment obtained right now
    -- (@cdb@), but it is additionally handed 'getItEnv' so that its later
    -- operations can re-obtain an environment through the handle.
    newIterator (fromChainDbEnv cdb) getItEnv registry blockComponent from to
  where
    -- Run a continuation with an 'IteratorEnv' obtained via 'getEnv' on the
    -- handle @h@. This deliberately goes through 'getEnv' on every call
    -- instead of capturing @cdb@: 'newIterator' documents that this function
    -- is expected to check that the ChainDB is still open.
    getItEnv :: forall r. (IteratorEnv m blk -> m r) -> m r
    getItEnv f = getEnv h (f . fromChainDbEnv)

{-------------------------------------------------------------------------------
  Iterator environment
-------------------------------------------------------------------------------}

-- | Environment containing everything needed to implement iterators.
--
-- The main purpose of bundling these things in a separate record is to make
-- it easier to test this code: no need to set up a whole ChainDB, just
-- provide this record.
data IteratorEnv m blk = IteratorEnv {
      itImmutableDB     :: ImmutableDB m blk
      -- ^ Handle to the ImmutableDB.
    , itVolatileDB      :: VolatileDB m blk
      -- ^ Handle to the VolatileDB.
    , itIterators       :: StrictTVar m (Map IteratorKey (m ()))
      -- ^ One @m ()@ action per 'IteratorKey'. Presumably the action that
      -- closes the corresponding open iterator (cf. 'closeAllIterators');
      -- the code maintaining this map is not in this part of the file.
    , itNextIteratorKey :: StrictTVar m IteratorKey
      -- ^ Presumably the key to assign to the next iterator that is
      -- created; confirm against the iterator-creation code.
    , itTracer          :: Tracer m (TraceIteratorEvent blk)
      -- ^ Tracer for iterator events.
    }

-- | Obtain an 'IteratorEnv' from a 'ChainDbEnv'.
fromChainDbEnv :: ChainDbEnv m blk -> IteratorEnv m blk
fromChainDbEnv CDB{ cdbImmutableDB
                  , cdbVolatileDB
                  , cdbIterators
                  , cdbNextIteratorKey
                  , cdbTracer
                  } = IteratorEnv {
      -- Only the five fields matched above are needed; they are passed on
      -- unchanged, so the 'IteratorEnv' shares the same DB handles and
      -- 'StrictTVar's as the 'ChainDbEnv' it was derived from.
      itImmutableDB     = cdbImmutableDB
    , itVolatileDB      = cdbVolatileDB
    , itIterators       = cdbIterators
    , itNextIteratorKey = cdbNextIteratorKey
      -- Iterator events are wrapped in the 'TraceIteratorEvent' constructor
      -- of 'TraceEvent' so they can be emitted on the ChainDB-wide tracer.
    , itTracer          = contramap TraceIteratorEvent cdbTracer
    }

-- | See 'stream'.
newIterator ::
     forall m blk b. (IOLike m, HasHeader blk, HasCallStack)
  => IteratorEnv m blk
  -> (forall r. (IteratorEnv m blk -> m r) -> m r)
     -- ^ Function with which the operations on the returned iterator should
     -- obtain their 'IteratorEnv'. This function should check whether the
     -- ChainDB is still open or throw an exception otherwise. This makes sure
     -- that when we call 'iteratorNext', we first check whether the ChainDB
     -- is still open.
  -> ResourceRegistry m
  -> BlockComponent blk b
  -> StreamFrom blk
  -> StreamTo blk
  -> m (Either (UnknownRange blk) (Iterator m blk b))
newIterator :: IteratorEnv m blk
-> (forall r. (IteratorEnv m blk -> m r) -> m r)
-> ResourceRegistry m
-> BlockComponent blk b
-> StreamFrom blk
-> StreamTo blk
-> m (Either (UnknownRange blk) (Iterator m blk b))
newIterator itEnv :: IteratorEnv m blk
itEnv@IteratorEnv{Tracer m (TraceIteratorEvent blk)
StrictTVar m (Map IteratorKey (m ()))
StrictTVar m IteratorKey
VolatileDB m blk
ImmutableDB m blk
itTracer :: Tracer m (TraceIteratorEvent blk)
itNextIteratorKey :: StrictTVar m IteratorKey
itIterators :: StrictTVar m (Map IteratorKey (m ()))
itVolatileDB :: VolatileDB m blk
itImmutableDB :: ImmutableDB m blk
itTracer :: forall (m :: * -> *) blk.
IteratorEnv m blk -> Tracer m (TraceIteratorEvent blk)
itNextIteratorKey :: forall (m :: * -> *) blk.
IteratorEnv m blk -> StrictTVar m IteratorKey
itIterators :: forall (m :: * -> *) blk.
IteratorEnv m blk -> StrictTVar m (Map IteratorKey (m ()))
itVolatileDB :: forall (m :: * -> *) blk. IteratorEnv m blk -> VolatileDB m blk
itImmutableDB :: forall (m :: * -> *) blk. IteratorEnv m blk -> ImmutableDB m blk
..} forall r. (IteratorEnv m blk -> m r) -> m r
getItEnv ResourceRegistry m
registry BlockComponent blk b
blockComponent StreamFrom blk
from StreamTo blk
to = do
    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StreamFrom blk -> StreamTo blk -> Bool
forall blk.
StandardHash blk =>
StreamFrom blk -> StreamTo blk -> Bool
validBounds StreamFrom blk
from StreamTo blk
to) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      ChainDbError blk -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (ChainDbError blk -> m ()) -> ChainDbError blk -> m ()
forall a b. (a -> b) -> a -> b
$ StreamFrom blk -> StreamTo blk -> ChainDbError blk
forall blk. StreamFrom blk -> StreamTo blk -> ChainDbError blk
InvalidIteratorRange StreamFrom blk
from StreamTo blk
to
    Either (UnknownRange blk) (Iterator m blk b)
res <- ExceptT (UnknownRange blk) m (Iterator m blk b)
-> m (Either (UnknownRange blk) (Iterator m blk b))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT ExceptT (UnknownRange blk) m (Iterator m blk b)
HasCallStack => ExceptT (UnknownRange blk) m (Iterator m blk b)
start
    case Either (UnknownRange blk) (Iterator m blk b)
res of
      Left UnknownRange blk
e -> TraceIteratorEvent blk -> m ()
trace (TraceIteratorEvent blk -> m ()) -> TraceIteratorEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ UnknownRange blk -> TraceIteratorEvent blk
forall blk. UnknownRange blk -> TraceIteratorEvent blk
UnknownRangeRequested UnknownRange blk
e
      Either (UnknownRange blk) (Iterator m blk b)
_      -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    Either (UnknownRange blk) (Iterator m blk b)
-> m (Either (UnknownRange blk) (Iterator m blk b))
forall (m :: * -> *) a. Monad m => a -> m a
return Either (UnknownRange blk) (Iterator m blk b)
res
  where
    trace :: TraceIteratorEvent blk -> m ()
trace = Tracer m (TraceIteratorEvent blk) -> TraceIteratorEvent blk -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceIteratorEvent blk)
itTracer

    endPoint :: RealPoint blk
    endPoint :: RealPoint blk
endPoint = case StreamTo blk
to of
      StreamToInclusive RealPoint blk
pt -> RealPoint blk
pt

    -- | Use the tip of the ImmutableDB to determine whether to look directly
    -- in the ImmutableDB (the range is <= the tip) or first try the
    -- VolatileDB (in the other cases).
    start :: HasCallStack
          => ExceptT (UnknownRange blk) m (Iterator m blk b)
    start :: ExceptT (UnknownRange blk) m (Iterator m blk b)
start = m (WithOrigin (Tip blk))
-> ExceptT (UnknownRange blk) m (WithOrigin (Tip blk))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (STM m (WithOrigin (Tip blk)) -> m (WithOrigin (Tip blk))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (ImmutableDB m blk -> STM m (WithOrigin (Tip blk))
forall (m :: * -> *) blk.
HasCallStack =>
ImmutableDB m blk -> STM m (WithOrigin (Tip blk))
ImmutableDB.getTip ImmutableDB m blk
itImmutableDB)) ExceptT (UnknownRange blk) m (WithOrigin (Tip blk))
-> (WithOrigin (Tip blk)
    -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      WithOrigin (Tip blk)
Origin -> ExceptT (UnknownRange blk) m (Iterator m blk b)
HasCallStack => ExceptT (UnknownRange blk) m (Iterator m blk b)
findPathInVolatileDB
      NotOrigin ImmutableDB.Tip { SlotNo
tipSlotNo :: forall blk. Tip blk -> SlotNo
tipSlotNo :: SlotNo
tipSlotNo, HeaderHash blk
tipHash :: forall blk. Tip blk -> HeaderHash blk
tipHash :: HeaderHash blk
tipHash, IsEBB
tipIsEBB :: forall blk. Tip blk -> IsEBB
tipIsEBB :: IsEBB
tipIsEBB } ->
        case RealPoint blk -> SlotNo
forall blk. RealPoint blk -> SlotNo
realPointSlot RealPoint blk
endPoint SlotNo -> SlotNo -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` SlotNo
tipSlotNo of
          -- The end point is < the tip of the ImmutableDB
          Ordering
LT -> ExceptT (UnknownRange blk) m (Iterator m blk b)
streamFromImmutableDB

          Ordering
EQ | RealPoint blk -> HeaderHash blk
forall blk. RealPoint blk -> HeaderHash blk
realPointHash RealPoint blk
endPoint HeaderHash blk -> HeaderHash blk -> Bool
forall a. Eq a => a -> a -> Bool
== HeaderHash blk
tipHash
                -- The end point == the tip of the ImmutableDB
             -> ExceptT (UnknownRange blk) m (Iterator m blk b)
streamFromImmutableDB

             -- The end point /= the tip of the ImmutableDB.
             --
             -- The end point can be a regular block or EBB. So can the tip of
             -- the ImmutableDB. We distinguish the following for cases where
             -- each block and EBB has the same slot number, and a block or
             -- EBB /not/ on the current chain is indicated with a '.
             --
             -- 1. ImmutableDB: .. :> EBB :> B
             --    end point: B'
             --    desired outcome: ForkTooOld
             --
             -- 2. ImmutableDB: .. :> EBB :> B
             --    end point: EBB'
             --    desired outcome: ForkTooOld
             --
             -- 3. ImmutableDB: .. :> EBB :> B
             --    end point: EBB
             --    desired outcome: stream from ImmutableDB
             --
             -- 4. ImmutableDB: .. :> EBB
             --    end point: B
             --    desired outcome: find path in the VolatileDB
             --
             -- 5. ImmutableDB: .. :> EBB
             --    end point: B'
             --    desired outcome: ForkTooOld
             --
             -- 6. ImmutableDB: .. :> EBB
             --    end point: EBB'
             --    desired outcome: ForkTooOld
             --
             -- We don't know upfront whether the given end point refers to a
             -- block or EBB nor whether it is part of the current chain or
             -- not. This means we don't know yet with which case we are
             -- dealing. The only thing we know for sure, is whether the
             -- ImmutableDB tip ends with a regular block (1-3) or an EBB
             -- (4-6).

             | IsEBB
IsNotEBB <- IsEBB
tipIsEBB  -- Cases 1-3
             -> ExceptT (UnknownRange blk) m (Iterator m blk b)
streamFromImmutableDB ExceptT (UnknownRange blk) m (Iterator m blk b)
-> (UnknownRange blk
    -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall e (m :: * -> *) a.
MonadError e m =>
m a -> (e -> m a) -> m a
`catchError`
                -- We also use 'streamFromImmutableDB' to check whether the
                -- block or EBB is in the ImmutableDB. If that's not the case,
                -- 'streamFromImmutableDB' will return 'MissingBlock'. Instead
                -- of returning that, we should return 'ForkTooOld', which is
                -- more correct.
                ExceptT (UnknownRange blk) m (Iterator m blk b)
-> UnknownRange blk
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall a b. a -> b -> a
const (UnknownRange blk -> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (UnknownRange blk
 -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> UnknownRange blk
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall a b. (a -> b) -> a -> b
$ StreamFrom blk -> UnknownRange blk
forall blk. StreamFrom blk -> UnknownRange blk
ForkTooOld StreamFrom blk
from)
             | Bool
otherwise  -- Cases 4-6
             -> ExceptT (UnknownRange blk) m (Iterator m blk b)
HasCallStack => ExceptT (UnknownRange blk) m (Iterator m blk b)
findPathInVolatileDB

          -- The end point is > the tip of the ImmutableDB
          Ordering
GT -> ExceptT (UnknownRange blk) m (Iterator m blk b)
HasCallStack => ExceptT (UnknownRange blk) m (Iterator m blk b)
findPathInVolatileDB

    -- | PRECONDITION: the upper bound >= the tip of the ImmutableDB.
    -- Greater or /equal/, because of EBBs :(
    findPathInVolatileDB ::
         HasCallStack => ExceptT (UnknownRange blk) m (Iterator m blk b)
    findPathInVolatileDB :: ExceptT (UnknownRange blk) m (Iterator m blk b)
findPathInVolatileDB = do
      Path blk
path <- m (Path blk) -> ExceptT (UnknownRange blk) m (Path blk)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Path blk) -> ExceptT (UnknownRange blk) m (Path blk))
-> m (Path blk) -> ExceptT (UnknownRange blk) m (Path blk)
forall a b. (a -> b) -> a -> b
$ VolatileDB m blk -> StreamFrom blk -> StreamTo blk -> m (Path blk)
forall (m :: * -> *) blk.
(IOLike m, HasHeader blk) =>
VolatileDB m blk -> StreamFrom blk -> StreamTo blk -> m (Path blk)
computePathVolatileDB VolatileDB m blk
itVolatileDB StreamFrom blk
from StreamTo blk
to
      case Path blk
path of
        NotInVolatileDB        RealPoint blk
_hash        -> UnknownRange blk -> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (UnknownRange blk
 -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> UnknownRange blk
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall a b. (a -> b) -> a -> b
$ StreamFrom blk -> UnknownRange blk
forall blk. StreamFrom blk -> UnknownRange blk
ForkTooOld StreamFrom blk
from
        PartiallyInVolatileDB  HeaderHash blk
predHash [RealPoint blk]
pts -> HasCallStack =>
HeaderHash blk
-> [RealPoint blk]
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
HeaderHash blk
-> [RealPoint blk]
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
streamFromBoth HeaderHash blk
predHash [RealPoint blk]
pts
        CompletelyInVolatileDB [RealPoint blk]
pts          -> case [RealPoint blk] -> Maybe (NonEmpty (RealPoint blk))
forall a. [a] -> Maybe (NonEmpty a)
NE.nonEmpty [RealPoint blk]
pts of
          Just NonEmpty (RealPoint blk)
pts' -> m (Iterator m blk b)
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Iterator m blk b)
 -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> m (Iterator m blk b)
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall a b. (a -> b) -> a -> b
$ NonEmpty (RealPoint blk) -> m (Iterator m blk b)
streamFromVolatileDB NonEmpty (RealPoint blk)
pts'
          Maybe (NonEmpty (RealPoint blk))
Nothing   -> m (Iterator m blk b)
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m (Iterator m blk b)
 -> ExceptT (UnknownRange blk) m (Iterator m blk b))
-> m (Iterator m blk b)
-> ExceptT (UnknownRange blk) m (Iterator m blk b)
forall a b. (a -> b) -> a -> b
$ m (Iterator m blk b)
emptyIterator

    streamFromVolatileDB :: NonEmpty (RealPoint blk) -> m (Iterator m blk b)
    streamFromVolatileDB :: NonEmpty (RealPoint blk) -> m (Iterator m blk b)
streamFromVolatileDB NonEmpty (RealPoint blk)
pts = do
      TraceIteratorEvent blk -> m ()
trace (TraceIteratorEvent blk -> m ()) -> TraceIteratorEvent blk -> m ()
forall a b. (a -> b) -> a -> b
$ StreamFrom blk
-> StreamTo blk -> [RealPoint blk] -> TraceIteratorEvent blk
forall blk.
StreamFrom blk
-> StreamTo blk -> [RealPoint blk] -> TraceIteratorEvent blk
StreamFromVolatileDB StreamFrom blk
from StreamTo blk
to (NonEmpty (RealPoint blk) -> [RealPoint blk]
forall a. NonEmpty a -> [a]
NE.toList NonEmpty (RealPoint blk)
pts)
      IteratorState m blk b -> m (Iterator m blk b)
createIterator (IteratorState m blk b -> m (Iterator m blk b))
-> IteratorState m blk b -> m (Iterator m blk b)
forall a b. (a -> b) -> a -> b
$ StreamFrom blk -> NonEmpty (RealPoint blk) -> IteratorState m blk b
forall (m :: * -> *) blk b.
StreamFrom blk -> NonEmpty (RealPoint blk) -> IteratorState m blk b
InVolatileDB StreamFrom blk
from NonEmpty (RealPoint blk)
pts

    -- | Start an iterator that streams solely from the ImmutableDB, using the
    -- requested upper bound @to@ unchanged.
    --
    -- Emits a 'StreamFromImmutableDB' trace event and delegates the actual
    -- work to 'streamFromImmutableDBHelper'.
    streamFromImmutableDB :: ExceptT (UnknownRange blk) m (Iterator m blk b)
    streamFromImmutableDB = do
      lift $ trace $ StreamFromImmutableDB from to
      streamFromImmutableDBHelper to

    -- | Open an ImmutableDB iterator from @from@ up to the given upper bound
    -- and wrap it in a ChainDB iterator in the 'InImmutableDB' state.
    --
    -- The upper bound is a parameter (rather than always @to@) so that
    -- callers can substitute a different bound, e.g. the current tip of the
    -- ImmutableDB.
    --
    -- A 'MissingBlock' error reported by 'ImmutableDB.stream' is translated
    -- to an 'UnknownRange' using 'missingBlockToUnknownRange'.
    streamFromImmutableDBHelper ::
         StreamTo blk
      -> ExceptT (UnknownRange blk) m (Iterator m blk b)
    streamFromImmutableDBHelper to' = do
        -- 'ImmutableDB.stream' will check the hash of the block at the
        -- start and end bounds.
        immIt <-
          withExceptT missingBlockToUnknownRange $ ExceptT $
            ImmutableDB.stream
              itImmutableDB
              registry
              -- Pair each requested component with the block's point.
              ((,) <$> getPoint <*> blockComponent)
              from
              to'
        lift $ createIterator $ InImmutableDB from immIt (StreamTo to')

    -- | If we have to stream from both the ImmutableDB and the VolatileDB, we
    -- only allow the (current) tip of the ImmutableDB to be the switchover
    -- point between the two DBs. If not, this would mean we have to stream a
    -- fork that forks off more than @k@ blocks in the past, in which case the
    -- risk of blocks going missing due to GC increases. So we refuse such a
    -- stream.
    --
    -- The first argument is the hash of the predecessor of the (incomplete)
    -- path through the VolatileDB, the second argument is that path.
    --
    -- Fails with 'ForkTooOld' when the ImmutableDB is empty or when the path
    -- does not connect to the current tip of the ImmutableDB.
    streamFromBoth ::
         HasCallStack
      => HeaderHash blk
      -> [RealPoint blk]
      -> ExceptT (UnknownRange blk) m (Iterator m blk b)
    streamFromBoth predHash pts = do
        lift $ trace $ StreamFromBoth from to pts
        lift (fmap ImmutableDB.tipToRealPoint <$>
               atomically (ImmutableDB.getTip itImmutableDB)) >>= \case
          -- The ImmutableDB is empty
          Origin -> throwError $ ForkTooOld from
          -- The incomplete path fits onto the tip of the ImmutableDB.
          NotOrigin pt@(RealPoint _ tipHash)
            | tipHash == predHash
            -> case NE.nonEmpty pts of
                 Just pts' -> startStream pt pts'
                 -- The lower bound was in the ImmutableDB and the upper was
                 -- in the VolatileDB, but the path of points in the
                 -- VolatileDB is actually empty. It must be that the
                 -- exclusive bound was in the VolatileDB and its
                 -- predecessor is the tip of the ImmutableDB.
                 Nothing   -> streamFromImmutableDBHelper (StreamToInclusive pt)
            -- The incomplete path doesn't fit onto the tip of the
            -- ImmutableDB. Note that since we have constructed the
            -- incomplete path through the VolatileDB, blocks might have
            -- moved from the VolatileDB to the ImmutableDB so that the tip
            -- of the ImmutableDB has changed. Either the path used to fit
            -- onto the tip but the tip has changed, or the path simply
            -- never fitted onto the tip.
            | otherwise  -> case dropWhile (/= pt) pts of
              -- The current tip is not in the path, this means that the path
              -- never fitted onto the tip of the ImmutableDB. We refuse this
              -- stream.
              []              -> throwError $ ForkTooOld from
              -- The current tip is in the path, with some points after it,
              -- this means that some blocks in our path have moved from the
              -- VolatileDB to the ImmutableDB. We can shift the switchover
              -- point to the current tip.
              _tipPt:pt':pts' -> startStream pt (pt' NE.:| pts')
              -- The current tip is the end of the path, this means we can
              -- actually stream everything from just the ImmutableDB. It
              -- could be that the exclusive end bound was not part of the
              -- ImmutableDB, so stream to the current tip of the ImmutableDB
              -- (inclusive) to avoid trying to stream (exclusive) to a block
              -- that's not in the ImmutableDB.
              [_tipPt]        -> streamFromImmutableDBHelper (StreamToInclusive pt)
      where
        -- Stream from @from@ up to and including the tip of the ImmutableDB,
        -- with a planned switchover to the given path in the VolatileDB.
        startStream ::
             RealPoint blk -- ^ Tip of the ImmutableDB
          -> NonEmpty (RealPoint blk)
          -> ExceptT (UnknownRange blk) m (Iterator m blk b)
        startStream immTip pts' = do
          let immEnd = SwitchToVolatileDBFrom (StreamToInclusive immTip) pts'
          immIt <- withExceptT missingBlockToUnknownRange $ ExceptT $
            ImmutableDB.stream
              itImmutableDB
              registry
              -- Pair each requested component with the block's point.
              ((,) <$> getPoint <*> blockComponent)
              from
              (StreamToInclusive immTip)
          lift $ createIterator $ InImmutableDB from immIt immEnd

    -- | Build an 'Iterator' around the given initial 'IteratorState'.
    --
    -- A fresh 'IteratorKey' is allocated and the state is stored in a new
    -- 'StrictTVar'. When the first argument is 'True', a close action for the
    -- iterator is additionally inserted into 'itIterators' under that key.
    makeIterator ::
         Bool  -- ^ Register the iterator in 'cdbIterators'?
      -> IteratorState m blk b
      -> m (Iterator m blk b)
    makeIterator register itState = do
      iteratorKey <- makeNewIteratorKey
      varItState  <- newTVarIO itState
      when register $ atomically $ modifyTVar itIterators $
        -- Note that the registered close action is given 'itEnv' directly
        -- instead of going through 'getItEnv' (as 'iteratorNext' and
        -- 'iteratorClose' below do). NOTE(review): the previous wording of
        -- this comment said we "don't use 'itEnv'", which contradicts the
        -- code; presumably 'getItEnv' was meant, because that would mean that
        -- invoking the function only works when the database is open, which
        -- probably won't be the case.
        Map.insert iteratorKey (implIteratorClose varItState iteratorKey itEnv)
      return Iterator {
          iteratorNext  = getItEnv $
            implIteratorNext  registry varItState blockComponent
        , iteratorClose = getItEnv $
            implIteratorClose          varItState iteratorKey
        }

    -- | An iterator that starts out in the 'Closed' state. It is not
    -- registered in 'itIterators' ('makeIterator' is called with 'False').
    emptyIterator :: m (Iterator m blk b)
    emptyIterator = makeIterator False Closed

    -- | This is 'makeIterator' + registering it in 'cdbIterators'.
    createIterator :: IteratorState m blk b -> m (Iterator m blk b)
    createIterator = makeIterator True

    -- | Return the current value of 'itNextIteratorKey' and replace it by its
    -- successor, in a single STM transaction.
    makeNewIteratorKey :: m IteratorKey
    makeNewIteratorKey = atomically $ do
      newIteratorKey <- readTVar itNextIteratorKey
      modifyTVar itNextIteratorKey succ
      return newIteratorKey

-- | Variant of 'computePath' that computes a path through the VolatileDB.
-- Throws an 'InvalidIteratorRange' exception when the range is invalid (i.e.,
-- 'computePath' returned 'Nothing').
--
-- The block info lookup function is obtained from the VolatileDB in a single
-- STM transaction; the path is then computed purely from that snapshot.
computePathVolatileDB ::
     (IOLike m, HasHeader blk)
  => VolatileDB m blk
  -> StreamFrom blk
  -> StreamTo   blk
  -> m (Path blk)
computePathVolatileDB volatileDB from to = do
    lookupBlockInfo <- atomically $ VolatileDB.getBlockInfo volatileDB
    case computePath lookupBlockInfo from to of
      Just path -> return path
      Nothing   -> throwIO $ InvalidIteratorRange from to

-- | Close the iterator and remove it from the map of iterators ('itIterators'
-- and thus 'cdbIterators').
--
-- In one STM transaction the iterator's key is deleted from 'itIterators',
-- the ImmutableDB iterator held by the current state (if any) is extracted,
-- and the state is set to 'Closed'. The extracted ImmutableDB iterator is
-- closed afterwards, outside of the transaction.
implIteratorClose ::
     IOLike m
  => StrictTVar m (IteratorState m blk b)
  -> IteratorKey
  -> IteratorEnv m blk
  -> m ()
implIteratorClose varItState itrKey IteratorEnv{..} = do
    mbImmIt <- atomically $ do
      modifyTVar itIterators (Map.delete itrKey)
      mbImmIt <- iteratorStateImmutableIt <$> readTVar varItState
      writeTVar varItState Closed
      return mbImmIt
    -- No-op when the state did not hold an ImmutableDB iterator.
    mapM_ ImmutableDB.iteratorClose mbImmIt

-- | Possible states of an iterator.
--
-- When streaming solely from the ImmutableDB ('InImmutableDB' where
-- 'InImmutableDBEnd' is /not/ 'SwitchToVolatileDBFrom'): we will remain in this
-- state until we are done, and end up in 'Closed'.
--
-- When streaming solely from the VolatileDB ('InVolatileDB'): when
-- 'VolatileDB.getBlock' returns 'Nothing', i.e. the block is missing from the
-- VolatileDB and might have moved to the ImmutableDB: we switch to the
-- 'InImmutableDBRetry' state, unless we just come from that state, in that
-- case, return 'IteratorBlockGCed' and close the iterator.
--
-- When streaming from the ImmutableDB with a planned switchover to the
-- VolatileDB ('InImmutableDB' where 'InImmutableDBEnd' is
-- 'SwitchToVolatileDBFrom') and we have reached the end of the ImmutableDB
-- iterator (exhausted or upper bound is reached): we switch to the
-- 'InVolatileDB' state.
--
-- In the 'InImmutableDBRetry' state, we distinguish two cases:
--
-- 1. We have just switched to it because a block was missing from the
--    VolatileDB. We have an iterator that could stream this block from the
--    ImmutableDB (if it was indeed moved to the ImmutableDB). If the streamed
--    block matches the expected point, we continue. If not, or if the
--    iterator is immediately exhausted, then the block is missing and we
--    return 'IteratorBlockGCed' and close the iterator.
--
-- 2. We have successfully streamed one or more blocks from the ImmutableDB
--    that were previously part of the VolatileDB. When we now encounter a
--    block of which the point does not match the expected point or when the
--    iterator is exhausted, we switch back to the 'InVolatileDB' state.
--
data IteratorState m blk b
  = InImmutableDB
      !(StreamFrom blk)
      !(ImmutableDB.Iterator m blk (Point blk, b))
      !(InImmutableDBEnd blk)
    -- ^ Streaming from the ImmutableDB.
    --
    -- Invariant: an ImmutableDB iterator opened using the 'StreamFrom'
    -- parameter as lower bound will yield the same next block as the iterator
    -- stored as parameter. There is one difference, which is exactly the
    -- reason for keeping track of this 'StreamFrom': if the latter iterator
    -- (the parameter) is exhausted and blocks have been appended to the end
    -- of the ImmutableDB since it was originally opened, the new iterator can
    -- include them in its stream.
    --
    -- Invariant: the iterator is not exhausted.
  | InVolatileDB
      !(StreamFrom blk)
      !(NonEmpty (RealPoint blk))
    -- ^ Streaming from the VolatileDB.
    --
    -- The (non-empty) list of points is the path to follow through the
    -- VolatileDB.
    --
    -- Invariant: if the blocks corresponding to the points have been moved to
    -- the ImmutableDB, it should be possible to stream these blocks from the
    -- ImmutableDB by starting an iterator using the 'StreamFrom' parameter.
    -- Note that the points of these blocks still have to be checked against
    -- the points in the path, because the blocks might not have been part of
    -- the current chain, in which case they will not be in the ImmutableDB.
  | InImmutableDBRetry
      !(StreamFrom blk)
      !(ImmutableDB.Iterator m blk (Point blk, b))
      !(NonEmpty (RealPoint blk))
    -- ^ When streaming blocks (a list of points) from the VolatileDB, we
    -- noticed a block was missing from the VolatileDB. It may have moved to
    -- the ImmutableDB since we initialised the iterator (and built the path),
    -- so we'll try if we can stream it from the ImmutableDB.
    --
    -- Invariants: invariants of 'InImmutableDB' + invariant of 'InVolatileDB'.

  | Closed
  deriving (Generic)

-- | 'NoThunks' instance for 'IteratorState'. The instance body is empty, so
-- the class's default method implementations are used.
instance (Typeable blk, StandardHash blk)
      => NoThunks (IteratorState m blk b)
  -- use generic instance

-- | Extract the ImmutableDB Iterator from the 'IteratorState'.
--
-- Returns 'Just' the iterator for the 'InImmutableDB' and
-- 'InImmutableDBRetry' states, and 'Nothing' for 'InVolatileDB' and 'Closed',
-- which do not hold one.
iteratorStateImmutableIt ::
     IteratorState m blk b
  -> Maybe (ImmutableDB.Iterator m blk (Point blk, b))
iteratorStateImmutableIt = \case
    Closed                       -> Nothing
    InImmutableDB      _ immIt _ -> Just immIt
    InImmutableDBRetry _ immIt _ -> Just immIt
    InVolatileDB {}              -> Nothing

-- | Determines if/when to stop streaming from the ImmutableDB and what to do
-- afterwards.
data InImmutableDBEnd blk =
    StreamAll
    -- ^ Don't stop streaming until the iterator is exhausted.
  | StreamTo               !(StreamTo blk)
    -- ^ Stream to the upper bound.
  | SwitchToVolatileDBFrom !(StreamTo blk) !(NonEmpty (RealPoint blk))
    -- ^ Stream to the upper bound. Afterwards, start streaming the path (the
    -- second parameter) from the VolatileDB.
  deriving (Generic, NoThunks)

-- | Advance an iterator by one step.
--
-- Reads the current 'IteratorState' from the given variable and dispatches on
-- it: a 'Closed' iterator yields 'IteratorExhausted', the other states are
-- handled by the corresponding @nextIn*@ helper below, each of which updates
-- the state variable as needed before returning.
implIteratorNext ::
     forall m blk b. (IOLike m, HasHeader blk)
  => ResourceRegistry m
  -> StrictTVar m (IteratorState m blk b)
  -> BlockComponent blk b
  -> IteratorEnv m blk
  -> m (IteratorResult blk b)
implIteratorNext registry varItState blockComponent IteratorEnv{..} =
    atomically (readTVar varItState) >>= \case
      Closed ->
        return IteratorExhausted
      InImmutableDB continueAfter immIt immEnd ->
        nextInImmutableDB continueAfter immIt immEnd
      InImmutableDBRetry continueAfter immIt immPts ->
        nextInImmutableDBRetry (Just continueAfter) immIt immPts
      InVolatileDB continueAfter volPts ->
        nextInVolatileDB continueAfter volPts
  where
    trace = traceWith itTracer

    -- | Read the next block while in the 'InVolatileDB' state.
    nextInVolatileDB ::
         StreamFrom blk
         -- ^ In case the block corresponding to the first point in
         -- the path is missing from the VolatileDB, we can use this
         -- lower bound to try to stream it from the ImmutableDB (if
         -- the block indeed has been moved there).
      -> NonEmpty (RealPoint blk)
      -> m (IteratorResult blk b)
    nextInVolatileDB continueFrom (pt@(realPointHash -> hash) NE.:| pts) =
      VolatileDB.getBlockComponent itVolatileDB blockComponent hash >>= \case
        -- Block is missing
        Nothing -> do
            trace $ BlockMissingFromVolatileDB pt
            -- Try if we can stream a block from the ImmutableDB that was
            -- previously in the VolatileDB. This will only work if the block
            -- was part of the current chain, otherwise it will not have been
            -- copied to the ImmutableDB.
            --
            -- This call cannot throw a 'ReadFutureSlotError' or a
            -- 'ReadFutureEBBError' because if the block is missing, it /must/
            -- have been garbage-collected, which means that its slot was
            -- older than the slot of the tip of the ImmutableDB.
            (fmap ImmutableDB.tipToRealPoint <$>
              atomically (ImmutableDB.getTip itImmutableDB)) >>= \case
              Origin ->
                -- The block was in the VolatileDB, but isn't anymore. This can
                -- only happen due to GC. It's not guaranteed that GC will have
                -- moved /that/ block to the ImmutableDb (so it might have just
                -- disappeared altogether), /but/ after GC the ImmutableDB
                -- cannot be empty (because GC will only be triggered after some
                -- newly immutable blocks have been copied to the ImmutableDB).
                error "nextInVolatileDB: impossible"
              NotOrigin tip -> do
                errOrIt <- ImmutableDB.stream
                  itImmutableDB
                  registry
                  ((,) <$> getPoint <*> blockComponent)
                  continueFrom
                  (StreamToInclusive tip)
                case errOrIt of
                  -- The block was not found in the ImmutableDB, it must have
                  -- been garbage-collected
                  Left  _ -> do
                    trace $ BlockGCedFromVolatileDB pt
                    return $ IteratorBlockGCed pt
                  -- Pass 'Nothing': the ImmutableDB iterator was just opened
                  -- and nothing has been streamed from it yet.
                  Right immIt ->
                    nextInImmutableDBRetry Nothing immIt (pt NE.:| pts)

        -- Block is there
        Just b | Just pts' <- NE.nonEmpty pts -> do
          let continueFrom' = StreamFromExclusive (realPointToPoint pt)
          atomically $ writeTVar varItState (InVolatileDB continueFrom' pts')
          return $ IteratorResult b
        -- No more points, so we can stop
        Just b -> do
          atomically $ writeTVar varItState Closed
          return $ IteratorResult b

    -- | Read the next block while in the 'InImmutableDB' state.
    nextInImmutableDB ::
         StreamFrom blk
      -> ImmutableDB.Iterator m blk (Point blk, b)
      -> InImmutableDBEnd blk
      -> m (IteratorResult blk b)
    nextInImmutableDB continueFrom immIt immEnd =
      selectResult immIt >>= \case
        NotDone (pt, b) -> do
          let continueFrom' = StreamFromExclusive pt
          atomically $ writeTVar varItState (InImmutableDB continueFrom' immIt immEnd)
          return $ IteratorResult b
        -- 'DoneAfter' indicates that this is the last element in the stream.
        -- If a VolatileDB path is pending, continue there on the next call.
        DoneAfter (pt, b) | SwitchToVolatileDBFrom _ pts <- immEnd -> do
          let continueFrom' = StreamFromExclusive pt
          atomically $ writeTVar varItState (InVolatileDB continueFrom' pts)
          return $ IteratorResult b
        DoneAfter (_pt, b) -> do
          atomically $ writeTVar varItState Closed
          return $ IteratorResult b
        Done | SwitchToVolatileDBFrom _ pts <- immEnd ->
          nextInVolatileDB continueFrom pts
        Done -> do
          -- No need to switch to the VolatileDB, so we can stop
          atomically $ writeTVar varItState Closed
          return IteratorExhausted

    -- | Read the next block while in the 'InImmutableDBRetry' state.
    --
    -- We try to stream blocks that we suspect are now in the ImmutableDB
    -- because they are no longer in the VolatileDB. We don't know this for
    -- sure, so we must check whether they match the expected points.
    nextInImmutableDBRetry ::
         Maybe (StreamFrom blk)
         -- ^ 'Nothing' iff the iterator was just opened and nothing has been
         -- streamed from it yet. This is used to avoid switching right back
         -- to the VolatileDB if we came from there.
      -> ImmutableDB.Iterator m blk (Point blk, b)
      -> NonEmpty (RealPoint blk)
      -> m (IteratorResult blk b)
    nextInImmutableDBRetry mbContinueFrom immIt (expectedPt NE.:| pts) =
      selectResult immIt >>= \case
        NotDone (actualPt, b) | actualPt == realPointToPoint expectedPt -> do
          trace $ BlockWasCopiedToImmutableDB expectedPt
          let continueFrom' = StreamFromExclusive (realPointToPoint expectedPt)
          case NE.nonEmpty pts of
            -- The path is finished although the ImmutableDB iterator is not:
            -- close the iterator ourselves.
            Nothing   -> do
              atomically $ writeTVar varItState Closed
              ImmutableDB.iteratorClose immIt
            Just pts' ->
              atomically $ writeTVar varItState $
                InImmutableDBRetry continueFrom' immIt pts'
          return $ IteratorResult b

        DoneAfter (actualPt, b) | actualPt == realPointToPoint expectedPt -> do
          -- 'DoneAfter': 'selectResult' will have closed the ImmutableDB iterator
          -- already
          trace $ BlockWasCopiedToImmutableDB expectedPt
          let continueFrom' = StreamFromExclusive (realPointToPoint expectedPt)
          case NE.nonEmpty pts of
            Nothing   -> atomically $ writeTVar varItState Closed
            Just pts' -> do
              atomically $ writeTVar varItState $ InVolatileDB continueFrom' pts'
              trace SwitchBackToVolatileDB
          return $ IteratorResult b

        -- Point mismatch or 'Done'. Close the ImmutableDB Iterator (idempotent).
        _ -> ImmutableDB.iteratorClose immIt *> case mbContinueFrom of
          -- We just switched to this state and the iterator was just opened.
          -- The block must be GC'ed, since we opened the iterator because it
          -- was missing from the VolatileDB and now it is not in the
          -- ImmutableDB either.
          Nothing -> do
            atomically $ writeTVar varItState Closed
            trace $ BlockGCedFromVolatileDB expectedPt
            return $ IteratorBlockGCed expectedPt

          -- We have already streamed something from the iterator. We can try
          -- looking in the VolatileDB again. If we hadn't streamed something
          -- yet, switching to the VolatileDB would be pointless as we just
          -- came from there.
          Just continueFrom -> do
            trace SwitchBackToVolatileDB
            nextInVolatileDB continueFrom (expectedPt NE.:| pts)

    -- | Given an ImmutableDB iterator, try to stream a value from it and
    -- convert it to a 'Done'. See the documentation of 'Done' for more
    -- details.
    --
    -- Note that this function closes the iterator explicitly when the return
    -- value is 'DoneAfter'. When the return value is 'Done' (the iterator
    -- reported exhaustion) no explicit close is issued here.
    -- NOTE(review): presumably the ImmutableDB iterator is already closed
    -- once it is exhausted -- confirm against the ImmutableDB API.
    selectResult ::
         ImmutableDB.Iterator m blk (Point blk, b)
      -> m (Done (Point blk, b))
    selectResult immIt = do
        itRes   <-                        ImmutableDB.iteratorNext    immIt
        -- Checked /after/ advancing, so this tells us whether the element we
        -- just read (if any) was the last one.
        hasNext <- isJust <$> atomically (ImmutableDB.iteratorHasNext immIt)
        case itRes of
          ImmutableDB.IteratorResult blk -> select blk hasNext
          ImmutableDB.IteratorExhausted  -> return Done
      where
        select blk hasNext
          | hasNext
          = return $ NotDone blk
          | otherwise
          = ImmutableDB.iteratorClose immIt $> DoneAfter blk

-- | Outcome of pulling one element from an ImmutableDB iterator; produced by
-- 'selectResult' inside 'implIteratorNext'.
data Done blk
  = Done
    -- ^ Nothing left to return: the iterator is exhausted or its upper bound
    -- was reached.
  | DoneAfter blk
    -- ^ This block is the final one: return it, after which the iterator is
    -- finished. Its upper /inclusive/ bound must have been reached.
  | NotDone blk
    -- ^ Return this block; the iterator still has more to give.

-- | Convert an ImmutableDB 'ImmutableDB.MissingBlock' error into the ChainDB
-- 'UnknownRange' error, by wrapping the point of the missing block in the
-- 'MissingBlock' constructor of 'UnknownRange'.
missingBlockToUnknownRange ::
     ImmutableDB.MissingBlock blk
  -> UnknownRange blk
missingBlockToUnknownRange = MissingBlock . ImmutableDB.missingBlockPoint

-- | Close all open 'Iterator's.
--
-- This /can/ be called when the ChainDB is already closed.
--
-- The closing actions currently stored in 'cdbIterators' are read in a single
-- STM transaction and then executed one after the other, outside of that
-- transaction.
closeAllIterators :: IOLike m => ChainDbEnv m blk -> m ()
closeAllIterators CDB{cdbIterators} = do
    -- Only 'cdbIterators' is needed, so bind just that field instead of
    -- bringing every field of the environment into scope.
    iteratorClosers <- atomically $ Map.elems <$> readTVar cdbIterators
    -- Note that each closer removes its entry from the 'cdbIterators' map.
    sequence_ iteratorClosers