{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Plutus.ChainIndex.Events where
import Cardano.BM.Trace (Trace)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (flushTBMQueue, isFullTBMQueue)
import Control.Monad (forever, void)
import Data.Maybe (catMaybes, isJust)
import Numeric.Natural (Natural)
import Plutus.ChainIndex qualified as CI
import Plutus.ChainIndex.Lib (ChainSyncEvent (Resume, RollBackward, RollForward), EventsQueue, RunRequirements,
runChainIndexDuringSync)
import Plutus.ChainIndex.SyncStats (SyncLog, getSyncState, isSyncStateSynced, logProgress)
import Plutus.ChainIndex.Types (tipAsPoint)
import Plutus.Monitoring.Util (PrettyObject (PrettyObject), convertLog, runLogEffects)
import System.Clock (Clock (Monotonic), diffTimeSpec, getTime)
-- | Pause between two consecutive checks of the events queue, expressed in
-- microseconds (the unit expected by 'threadDelay'): 2 seconds.
period :: Int
period = 2_000_000
-- | Size that a single 'ChainSyncEvent' contributes to the events queue.
--
-- * A 'RollForward' carrying a 'CI.Block' is measured by the number of
--   transactions in that block, as long as the sync state computed from the
--   block tip and the node tip is not reported as synced by
--   'isSyncStateSynced'.
-- * A 'RollForward' received while synced, and every other event, is measured
--   as @maxQueueSize + 1@.
--
-- NOTE(review): returning more than @maxQueueSize@ presumably makes the queue
-- count as full straight away, so that such events are processed without
-- waiting for a batch to accumulate -- confirm against the queue
-- implementation that consumes this measure.
measureEventQueueSizeByTxs :: Natural -> ChainSyncEvent -> Natural
measureEventQueueSizeByTxs maxQueueSize (RollForward (CI.Block syncTip transactions) nodeTip)
  | isSyncStateSynced syncState = maxQueueSize + 1
  | otherwise = fromIntegral (length transactions)
  where
    -- Compare where this block sits against the tip reported by the node.
    syncState = getSyncState (tipAsPoint syncTip) (tipAsPoint nodeTip)
measureEventQueueSizeByTxs maxQueueSize _ = maxQueueSize + 1
-- | Consume the events queue forever, one batch per iteration.
--
-- Each iteration:
--
--   1. polls the queue every 'period' microseconds until 'isFullTBMQueue'
--      reports it as full, then takes the queued events with 'flushTBMQueue';
--   2. applies the batch to the chain index (see @processEvents@ below);
--   3. reports the batch, together with the elapsed monotonic time, through
--      'logProgress'.
--
-- Arguments: the @trace@ receiving 'SyncLog' messages wrapped in
-- 'PrettyObject', the @runReq@ handed to 'runChainIndexDuringSync', and the
-- @eventsQueue@ to drain. The body is wrapped in 'forever', so the action does
-- not return normally.
processEventsQueue :: Trace IO (PrettyObject SyncLog) -> RunRequirements -> EventsQueue -> IO ()
processEventsQueue trace runReq eventsQueue = forever $ do
  start <- getTime Monotonic
  eventsToProcess <- waitUntilEvents
  processEvents eventsToProcess
  end <- getTime Monotonic
  void $ runLogEffects (convertLog PrettyObject trace) $
    logProgress eventsToProcess (diffTimeSpec end start)
  where
    -- Poll until the queue reports itself full, then flush it. The fullness
    -- check and the flush run in two separate STM transactions.
    waitUntilEvents :: IO [ChainSyncEvent]
    waitUntilEvents = do
      isFull <- atomically $ isFullTBMQueue eventsQueue
      if isFull
        then atomically $ flushTBMQueue eventsQueue
        else threadDelay period >> waitUntilEvents

    -- Apply a batch of events in order. The result of each
    -- 'runChainIndexDuringSync' call is discarded.
    processEvents :: [ChainSyncEvent] -> IO ()
    processEvents [] = pure ()
    processEvents events@(event : restEvents) = case event of
      Resume resumePoint -> do
        void $ runChainIndexDuringSync runReq $ CI.resumeSync resumePoint
        processEvents restEvents
      RollBackward backwardPoint _ -> do
        void $ runChainIndexDuringSync runReq $ CI.rollback backwardPoint
        processEvents restEvents
      RollForward _ _ -> do
        -- Take the longest run of consecutive roll-forward events (starting
        -- with the current one) and append all of their blocks in one call.
        let (rollForwardEvents, restEvents') = span isRollForwardEvt events
            blocks = catMaybes (map getBlock rollForwardEvents)
        void $ runChainIndexDuringSync runReq $ CI.appendBlocks blocks
        processEvents restEvents'

    -- The block carried by a roll-forward event, if the event is one.
    getBlock (RollForward block _) = Just block
    getBlock _ = Nothing

    isRollForwardEvt = isJust . getBlock