{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}

-- | Monadic side of the Mempool implementation.
--
-- Using the functions defined in Ouroboros.Consensus.Mempool.Impl.Pure,
-- a dedicated constructor 'openMempool' is provided to encapsulate the mempool
-- functionality.
--
-- The implementation is based on a MempoolEnv that captures the relevant
-- variables to manage the mempool and is then used to craft functions that
-- conform to the Mempool datatype API.
--
-- The operations performed on the Mempool are written in a pure fashion in
-- Ouroboros.Consensus.Mempool.Impl.Pure.
module Ouroboros.Consensus.Mempool.Impl (
    openMempool
    -- * For testing purposes
  , LedgerInterface (..)
  , chainDBLedgerInterface
  , openMempoolWithoutSyncThread
  ) where

import           Control.Monad.Except
import           Data.Typeable

import           Control.Tracer

import           Ouroboros.Consensus.Storage.ChainDB (ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB

import           Ouroboros.Consensus.Block
import           Ouroboros.Consensus.HeaderValidation
import           Ouroboros.Consensus.Ledger.Abstract
import           Ouroboros.Consensus.Ledger.Extended
import           Ouroboros.Consensus.Ledger.SupportsMempool
import           Ouroboros.Consensus.Mempool.API
import           Ouroboros.Consensus.Mempool.Impl.Pure
import           Ouroboros.Consensus.Mempool.Impl.Types
import           Ouroboros.Consensus.Mempool.TxSeq (TicketNo, zeroTicketNo)
import           Ouroboros.Consensus.Util (whenJust)
import           Ouroboros.Consensus.Util.IOLike
import           Ouroboros.Consensus.Util.ResourceRegistry
import           Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)

{-------------------------------------------------------------------------------
  Top-level API
-------------------------------------------------------------------------------}

-- | Create a @Mempool m blk TicketNo@ in @m@ to manipulate the mempool. It
-- will also fork a thread that syncs the mempool and the ledger when the ledger
-- changes.
--
-- The environment is built with 'initMempoolEnv', the background thread is
-- started with 'forkSyncStateOnTipPointChange', and the resulting 'Mempool'
-- record is assembled from that same environment by 'mkMempool'.
openMempool
  :: ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => ResourceRegistry m
     -- ^ Registry handed to the forked sync thread
  -> LedgerInterface m blk
     -- ^ Source of the current ledger state
  -> LedgerConfig blk
  -> MempoolCapacityBytesOverride
  -> Tracer m (TraceEventMempool blk)
  -> (GenTx blk -> TxSizeInBytes)
     -- ^ Size function, later exposed as 'getTxSize'
  -> m (Mempool m blk TicketNo)
openMempool registry ledger cfg capacityOverride tracer txSize = do
    env <- initMempoolEnv ledger cfg capacityOverride tracer txSize
    -- The sync thread and the returned 'Mempool' share the same environment
    -- (and therefore the same state variable).
    forkSyncStateOnTipPointChange registry env
    return $ mkMempool env

-- | Unlike 'openMempool', this function does not fork a background thread
-- that synchronises with the ledger state whenever the latter changes.
--
-- It only initialises the environment with 'initMempoolEnv' and wraps it
-- with 'mkMempool'.
--
-- Intended for testing purposes.
openMempoolWithoutSyncThread
  :: ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => LedgerInterface m blk
  -> LedgerConfig blk
  -> MempoolCapacityBytesOverride
  -> Tracer m (TraceEventMempool blk)
  -> (GenTx blk -> TxSizeInBytes)
  -> m (Mempool m blk TicketNo)
openMempoolWithoutSyncThread ledger cfg capacityOverride tracer txSize =
    mkMempool <$> initMempoolEnv ledger cfg capacityOverride tracer txSize

-- | Build the 'Mempool' record of operations on top of a 'MempoolEnv'.
--
-- Every field closes over the components of the given environment; no state
-- is created here.
mkMempool
  :: ( IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => MempoolEnv m blk -> Mempool m blk TicketNo
mkMempool mpEnv = Mempool
    { tryAddTxs      = implTryAddTxs istate cfg txSize trcr
    , removeTxs      = \txs -> do
        -- The internal state and the ledger state are read, and the removal
        -- is applied, within a single STM transaction; the resulting trace
        -- event (if any) is emitted only after the transaction has finished.
        mTrace <- atomically $ do
          is <- readTVar istate
          ls <- getCurrentLedgerState ldgr
          let p = pureRemoveTxs cfg co txs is ls
          runRemoveTxs istate p
        whenJust mTrace (traceWith trcr)
    , syncWithLedger = implSyncWithLedger mpEnv
      -- Snapshot of the internal state as it currently is.
    , getSnapshot    = implSnapshotFromIS <$> readTVar istate
      -- Snapshot computed by 'pureGetSnapshotFor' from the given forge
      -- ledger state and the current internal state.
    , getSnapshotFor = \fls -> pureGetSnapshotFor cfg fls co <$> readTVar istate
    , getCapacity    = isCapacity <$> readTVar istate
    , getTxSize      = txSize
    , zeroIdx        = zeroTicketNo
    }
  where
    -- Unpack the environment once so the fields above can use short names.
    MempoolEnv { mpEnvStateVar         = istate
               , mpEnvLedgerCfg        = cfg
               , mpEnvTxSize           = txSize
               , mpEnvTracer           = trcr
               , mpEnvLedger           = ldgr
               , mpEnvCapacityOverride = co
               } = mpEnv

-- | Abstract interface needed to run a Mempool.
data LedgerInterface m blk = LedgerInterface
    { getCurrentLedgerState :: STM m (LedgerState blk)
      -- ^ STM action yielding the current ledger state
    }

-- | Create a 'LedgerInterface' from a 'ChainDB'.
--
-- The ledger state is obtained by projecting 'ledgerState' out of the
-- extended ledger state returned by 'ChainDB.getCurrentLedger'.
chainDBLedgerInterface ::
     (IOLike m, IsLedger (LedgerState blk))
  => ChainDB m blk -> LedgerInterface m blk
chainDBLedgerInterface chainDB = LedgerInterface
    { getCurrentLedgerState = ledgerState <$> ChainDB.getCurrentLedger chainDB
    }

{-------------------------------------------------------------------------------
  Mempool environment
-------------------------------------------------------------------------------}

-- | The mempool environment captures all the associated variables wrt the
-- Mempool and is accessed by the Mempool interface on demand to perform the
-- different operations.
data MempoolEnv m blk = MempoolEnv {
      mpEnvLedger           :: LedgerInterface m blk
      -- ^ Used to read the current ledger state
    , mpEnvLedgerCfg        :: LedgerConfig blk
      -- ^ Ledger configuration passed to the pure mempool functions
    , mpEnvStateVar         :: StrictTVar m (InternalState blk)
      -- ^ Variable holding the mempool's 'InternalState'
    , mpEnvTracer           :: Tracer m (TraceEventMempool blk)
      -- ^ Tracer for mempool events
    , mpEnvTxSize           :: GenTx blk -> TxSizeInBytes
      -- ^ Size function for transactions
    , mpEnvCapacityOverride :: MempoolCapacityBytesOverride
      -- ^ Capacity override passed to the pure mempool functions
    }

-- | Initialise a 'MempoolEnv'.
--
-- Reads the current ledger state through the 'LedgerInterface', ticks it
-- with 'tickLedgerState' (as 'ForgeInUnknownSlot'), and allocates the state
-- variable with the result of 'initInternalState', starting from
-- 'zeroTicketNo'.
initMempoolEnv :: ( IOLike m
                  , NoThunks (GenTxId blk)
                  , LedgerSupportsMempool blk
                  , ValidateEnvelope blk
                  )
               => LedgerInterface m blk
               -> LedgerConfig blk
               -> MempoolCapacityBytesOverride
               -> Tracer m (TraceEventMempool blk)
               -> (GenTx blk -> TxSizeInBytes)
               -> m (MempoolEnv m blk)
initMempoolEnv ledgerInterface cfg capacityOverride tracer txSize = do
    st <- atomically $ getCurrentLedgerState ledgerInterface
    let (slot, st') = tickLedgerState cfg (ForgeInUnknownSlot st)
    isVar <- newTVarIO $ initInternalState capacityOverride zeroTicketNo slot st'
    return MempoolEnv
      { mpEnvLedger           = ledgerInterface
      , mpEnvLedgerCfg        = cfg
      , mpEnvStateVar         = isVar
      , mpEnvTracer           = tracer
      , mpEnvTxSize           = txSize
      , mpEnvCapacityOverride = capacityOverride
      }

-- | Spawn a thread which syncs the 'Mempool' state whenever the 'LedgerState'
-- changes.
--
-- The thread is created with 'forkLinkedWatcher' using the given registry.
-- The watcher reads the tip point of the current ledger state and, when
-- notified, runs 'implSyncWithLedger', discarding the returned snapshot.
forkSyncStateOnTipPointChange :: forall m blk. (
                                   IOLike m
                                 , LedgerSupportsMempool blk
                                 , HasTxId (GenTx blk)
                                 , ValidateEnvelope blk
                                 )
                              => ResourceRegistry m
                              -> MempoolEnv m blk
                              -> m ()
forkSyncStateOnTipPointChange registry menv =
    void $ forkLinkedWatcher
      registry
      "Mempool.syncStateOnTipPointChange"
      Watcher {
          wFingerprint = id
          -- NOTE(review): presumably 'Nothing' means there is no initial
          -- fingerprint to compare against -- confirm against 'Watcher'.
        , wInitial     = Nothing
        , wNotify      = action
        , wReader      = getCurrentTip
        }
  where
    -- The tip point itself is not needed: syncing re-reads the ledger state.
    action :: Point blk -> m ()
    action _tipPoint = void $ implSyncWithLedger menv

    -- Using the tip ('Point') allows for quicker equality checks
    getCurrentTip :: STM m (Point blk)
    getCurrentTip =
          ledgerTipPoint (Proxy @blk)
      <$> getCurrentLedgerState (mpEnvLedger menv)

-- | Sync the mempool's internal state with the current ledger state and
-- return the resulting snapshot.
--
-- Reading both states, computing 'pureSyncWithLedger', and applying it with
-- 'runSyncWithLedger' all happen in one STM transaction. The trace event, if
-- one was produced, is emitted after the transaction has finished.
implSyncWithLedger ::
     forall m blk. (
       IOLike m
     , LedgerSupportsMempool blk
     , HasTxId (GenTx blk)
     , ValidateEnvelope blk
     )
  => MempoolEnv m blk
  -> m (MempoolSnapshot blk TicketNo)
implSyncWithLedger menv = do
  (mTrace, mp) <- atomically $ do
    is <- readTVar istate
    ls <- getCurrentLedgerState ldgrInterface
    let p = pureSyncWithLedger is ls cfg co
    runSyncWithLedger istate p
  whenJust mTrace (traceWith trcr)
  return mp
  where
    MempoolEnv { mpEnvStateVar         = istate
               , mpEnvLedger           = ldgrInterface
               , mpEnvTracer           = trcr
               , mpEnvLedgerCfg        = cfg
               , mpEnvCapacityOverride = co
               } = menv