{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
module Cardano.Streaming
  ( withChainSyncEventStream
  , CS.ChainSyncEvent (..)
  , CS.ChainSyncEventException (..)

  --
  , CS.mkConnectInfo
  , CS.mkLocalNodeConnectInfo

  -- * Stream blocks and ledger states
  , blocks
  , blocksPipelined
  , ledgerStates
  , ledgerStatesPipelined
  , foldLedgerState
  , foldLedgerStateEvents
  , getEnvAndInitialLedgerStateHistory
  , CS.ignoreRollbacks
  )
where

import Control.Concurrent qualified as IO
import Control.Concurrent.Async (ExceptionInLinkedThread (ExceptionInLinkedThread), link, withAsync)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException (SomeException), catch, throw)
import Control.Exception qualified as IO
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (runExceptT)
import Data.Foldable (forM_)
import Data.Function ((&))
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Word (Word32)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

import Cardano.Api qualified as C
import Cardano.Api.ChainSync.Client (ClientStIdle (SendMsgFindIntersect, SendMsgRequestNext),
                                     ClientStIntersect (ClientStIntersect, recvMsgIntersectFound, recvMsgIntersectNotFound),
                                     ClientStNext (ClientStNext, recvMsgRollBackward, recvMsgRollForward))
import Cardano.Slotting.Slot (WithOrigin (At, Origin))

import Cardano.Streaming.Callbacks qualified as CS
import Cardano.Streaming.Helpers qualified as CS

-- | `withChainSyncEventStream` uses the chain-sync mini-protocol to
-- connect to a locally running node and fetch blocks from the given
-- starting point.
withChainSyncEventStream ::
  -- | Path to the node socket
  FilePath ->
  C.NetworkId ->
  -- | The point on the chain to start streaming from
  [C.ChainPoint] ->
  -- | The stream consumer
  (Stream (Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r -> IO b) ->
  IO b
withChainSyncEventStream :: FilePath
-> NetworkId
-> [ChainPoint]
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
    -> IO b)
-> IO b
withChainSyncEventStream FilePath
socketPath NetworkId
networkId [ChainPoint]
points Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b
consumer = do
  -- The chain-sync client runs in a different thread passing the blocks it
  -- receives to the stream consumer through a MVar. The chain-sync client
  -- thread and the stream consumer will each block on each other and stay
  -- in lockstep.
  --
  -- NOTE: choosing a MVar is a tradeoff towards simplicity. In this case a
  -- (bounded) queue could perform better. Indeed a properly-sized buffer
  -- can reduce the time the two threads are blocked waiting for each
  -- other. The problem here is "properly-sized". A bounded queue like
  -- Control.Concurrent.STM.TBQueue allows us to specify a max queue length
  -- but block size can vary a lot (TODO quantify this) depending on the
  -- era. We have an alternative implementation with customizable queue
  -- size (TBMQueue) but it needs to be extracted from the
  -- plutus-chain-index-core package. Using a simple MVar doesn't seem to
  -- slow down marconi's indexing, likely because the difference is
  -- negligeable compared to existing network and IO latencies.  Therefore,
  -- let's stick with a MVar now and revisit later.
  MVar (ChainSyncEvent (BlockInMode CardanoMode))
nextBlockVar <- IO (MVar (ChainSyncEvent (BlockInMode CardanoMode)))
forall a. IO (MVar a)
newEmptyMVar

  let client :: ChainSyncClient (BlockInMode CardanoMode) ChainPoint ChainTip IO ()
client = [ChainPoint]
-> MVar (ChainSyncEvent (BlockInMode CardanoMode))
-> ChainSyncClient
     (BlockInMode CardanoMode) ChainPoint ChainTip IO ()
forall e.
[ChainPoint]
-> MVar (ChainSyncEvent e)
-> ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient [ChainPoint]
points MVar (ChainSyncEvent (BlockInMode CardanoMode))
nextBlockVar
      localNodeClientProtocols :: LocalNodeClientProtocols
  (BlockInMode CardanoMode)
  ChainPoint
  ChainTip
  slot
  tx
  txid
  txerr
  query
  IO
localNodeClientProtocols =
        LocalNodeClientProtocols :: forall block point tip slot tx txid txerr (query :: * -> *)
       (m :: * -> *).
LocalChainSyncClient block point tip m
-> Maybe (LocalTxSubmissionClient tx txerr m ())
-> Maybe (LocalStateQueryClient block point query m ())
-> Maybe (LocalTxMonitorClient txid tx slot m ())
-> LocalNodeClientProtocols
     block point tip slot tx txid txerr query m
C.LocalNodeClientProtocols
          { localChainSyncClient :: LocalChainSyncClient
  (BlockInMode CardanoMode) ChainPoint ChainTip IO
C.localChainSyncClient = ChainSyncClient (BlockInMode CardanoMode) ChainPoint ChainTip IO ()
-> LocalChainSyncClient
     (BlockInMode CardanoMode) ChainPoint ChainTip IO
forall block point tip (m :: * -> *).
ChainSyncClient block point tip m ()
-> LocalChainSyncClient block point tip m
C.LocalChainSyncClient ChainSyncClient (BlockInMode CardanoMode) ChainPoint ChainTip IO ()
client,
            localStateQueryClient :: Maybe
  (LocalStateQueryClient
     (BlockInMode CardanoMode) ChainPoint query IO ())
C.localStateQueryClient = Maybe
  (LocalStateQueryClient
     (BlockInMode CardanoMode) ChainPoint query IO ())
forall a. Maybe a
Nothing,
            localTxMonitoringClient :: Maybe (LocalTxMonitorClient txid tx slot IO ())
C.localTxMonitoringClient = Maybe (LocalTxMonitorClient txid tx slot IO ())
forall a. Maybe a
Nothing,
            localTxSubmissionClient :: Maybe (LocalTxSubmissionClient tx txerr IO ())
C.localTxSubmissionClient = Maybe (LocalTxSubmissionClient tx txerr IO ())
forall a. Maybe a
Nothing
          }
      connectInfo :: LocalNodeConnectInfo CardanoMode
connectInfo = NetworkId -> FilePath -> LocalNodeConnectInfo CardanoMode
CS.mkLocalNodeConnectInfo NetworkId
networkId FilePath
socketPath

  IO () -> (Async () -> IO b) -> IO b
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync (LocalNodeConnectInfo CardanoMode
-> LocalNodeClientProtocolsInMode CardanoMode -> IO ()
forall mode.
LocalNodeConnectInfo mode
-> LocalNodeClientProtocolsInMode mode -> IO ()
C.connectToLocalNode LocalNodeConnectInfo CardanoMode
connectInfo LocalNodeClientProtocolsInMode CardanoMode
forall slot tx txid txerr (query :: * -> *).
LocalNodeClientProtocols
  (BlockInMode CardanoMode)
  ChainPoint
  ChainTip
  slot
  tx
  txid
  txerr
  query
  IO
localNodeClientProtocols) ((Async () -> IO b) -> IO b) -> (Async () -> IO b) -> IO b
forall a b. (a -> b) -> a -> b
$ \Async ()
a -> do
    -- Make sure all exceptions in the client thread are passed to the consumer thread
    Async () -> IO ()
forall a. Async a -> IO ()
link Async ()
a
    -- Run the consumer
    Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r -> IO b
consumer (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
 -> IO b)
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO b
forall a b. (a -> b) -> a -> b
$ IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall (m :: * -> *) a r. Monad m => m a -> Stream (Of a) m r
S.repeatM (IO (ChainSyncEvent (BlockInMode CardanoMode))
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r)
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall a b. (a -> b) -> a -> b
$ MVar (ChainSyncEvent (BlockInMode CardanoMode))
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
forall a. MVar a -> IO a
takeMVar MVar (ChainSyncEvent (BlockInMode CardanoMode))
nextBlockVar
  -- Let's rethrow exceptions from the client thread unwrapped, so that the
  -- consumer does not have to know anything about async
  IO b -> (ExceptionInLinkedThread -> IO b) -> IO b
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` \(ExceptionInLinkedThread Async a
_ (SomeException e
e)) -> e -> IO b
forall a e. Exception e => e -> a
throw e
e

-- | `chainSyncStreamingClient` is the client that connects to a local node
-- and runs the chain-sync mini-protocol. This client is fire-and-forget
-- and does not require any control.
--
-- If the starting point is such that an intersection cannot be found, this
-- client will throw a NoIntersectionFound exception.
chainSyncStreamingClient ::
  [C.ChainPoint] ->
  MVar (CS.ChainSyncEvent e) ->
  C.ChainSyncClient e C.ChainPoint C.ChainTip IO ()
chainSyncStreamingClient :: [ChainPoint]
-> MVar (ChainSyncEvent e)
-> ChainSyncClient e ChainPoint ChainTip IO ()
chainSyncStreamingClient [ChainPoint]
points MVar (ChainSyncEvent e)
nextChainEventVar =
  IO (ClientStIdle e ChainPoint ChainTip IO ())
-> ChainSyncClient e ChainPoint ChainTip IO ()
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
C.ChainSyncClient (IO (ClientStIdle e ChainPoint ChainTip IO ())
 -> ChainSyncClient e ChainPoint ChainTip IO ())
-> IO (ClientStIdle e ChainPoint ChainTip IO ())
-> ChainSyncClient e ChainPoint ChainTip IO ()
forall a b. (a -> b) -> a -> b
$ ClientStIdle e ChainPoint ChainTip IO ()
-> IO (ClientStIdle e ChainPoint ChainTip IO ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientStIdle e ChainPoint ChainTip IO ()
 -> IO (ClientStIdle e ChainPoint ChainTip IO ()))
-> ClientStIdle e ChainPoint ChainTip IO ()
-> IO (ClientStIdle e ChainPoint ChainTip IO ())
forall a b. (a -> b) -> a -> b
$ [ChainPoint]
-> ClientStIntersect e ChainPoint ChainTip IO ()
-> ClientStIdle e ChainPoint ChainTip IO ()
forall point header tip (m :: * -> *) a.
[point]
-> ClientStIntersect header point tip m a
-> ClientStIdle header point tip m a
SendMsgFindIntersect [ChainPoint]
points ClientStIntersect e ChainPoint ChainTip IO ()
forall a. ClientStIntersect e ChainPoint ChainTip IO a
onIntersect
  where
    onIntersect :: ClientStIntersect e ChainPoint ChainTip IO a
onIntersect =
      ClientStIntersect :: forall header point tip (m :: * -> *) a.
(point -> tip -> ChainSyncClient header point tip m a)
-> (tip -> ChainSyncClient header point tip m a)
-> ClientStIntersect header point tip m a
ClientStIntersect
        { recvMsgIntersectFound :: ChainPoint
-> ChainTip -> ChainSyncClient e ChainPoint ChainTip IO a
recvMsgIntersectFound = \ChainPoint
cp ChainTip
ct ->
            IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
C.ChainSyncClient (IO (ClientStIdle e ChainPoint ChainTip IO a)
 -> ChainSyncClient e ChainPoint ChainTip IO a)
-> IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall a b. (a -> b) -> a -> b
$ do
              MVar (ChainSyncEvent e) -> ChainSyncEvent e -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (ChainSyncEvent e)
nextChainEventVar (ChainPoint -> ChainTip -> ChainSyncEvent e
forall a. ChainPoint -> ChainTip -> ChainSyncEvent a
CS.RollBackward ChainPoint
cp ChainTip
ct)
              IO (ClientStIdle e ChainPoint ChainTip IO a)
forall a. IO (ClientStIdle e ChainPoint ChainTip IO a)
sendRequestNext,
          recvMsgIntersectNotFound :: ChainTip -> ChainSyncClient e ChainPoint ChainTip IO a
recvMsgIntersectNotFound =
            -- There is nothing we can do here
            ChainSyncEventException
-> ChainTip -> ChainSyncClient e ChainPoint ChainTip IO a
forall a e. Exception e => e -> a
throw ChainSyncEventException
CS.NoIntersectionFound
        }

    sendRequestNext :: IO (ClientStIdle e ChainPoint ChainTip IO a)
sendRequestNext =
      ClientStIdle e ChainPoint ChainTip IO a
-> IO (ClientStIdle e ChainPoint ChainTip IO a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ClientStIdle e ChainPoint ChainTip IO a
 -> IO (ClientStIdle e ChainPoint ChainTip IO a))
-> ClientStIdle e ChainPoint ChainTip IO a
-> IO (ClientStIdle e ChainPoint ChainTip IO a)
forall a b. (a -> b) -> a -> b
$ ClientStNext e ChainPoint ChainTip IO a
-> IO (ClientStNext e ChainPoint ChainTip IO a)
-> ClientStIdle e ChainPoint ChainTip IO a
forall header point tip (m :: * -> *) a.
ClientStNext header point tip m a
-> m (ClientStNext header point tip m a)
-> ClientStIdle header point tip m a
SendMsgRequestNext ClientStNext e ChainPoint ChainTip IO a
onNext (ClientStNext e ChainPoint ChainTip IO a
-> IO (ClientStNext e ChainPoint ChainTip IO a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ClientStNext e ChainPoint ChainTip IO a
onNext)
      where
        onNext :: ClientStNext e ChainPoint ChainTip IO a
onNext =
          ClientStNext :: forall header point tip (m :: * -> *) a.
(header -> tip -> ChainSyncClient header point tip m a)
-> (point -> tip -> ChainSyncClient header point tip m a)
-> ClientStNext header point tip m a
ClientStNext
            { recvMsgRollForward :: e -> ChainTip -> ChainSyncClient e ChainPoint ChainTip IO a
recvMsgRollForward = \e
bim ChainTip
ct ->
                IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
C.ChainSyncClient (IO (ClientStIdle e ChainPoint ChainTip IO a)
 -> ChainSyncClient e ChainPoint ChainTip IO a)
-> IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall a b. (a -> b) -> a -> b
$ do
                  MVar (ChainSyncEvent e) -> ChainSyncEvent e -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (ChainSyncEvent e)
nextChainEventVar (e -> ChainTip -> ChainSyncEvent e
forall a. a -> ChainTip -> ChainSyncEvent a
CS.RollForward e
bim ChainTip
ct)
                  IO (ClientStIdle e ChainPoint ChainTip IO a)
sendRequestNext,
              recvMsgRollBackward :: ChainPoint
-> ChainTip -> ChainSyncClient e ChainPoint ChainTip IO a
recvMsgRollBackward = \ChainPoint
cp ChainTip
ct ->
                IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall header point tip (m :: * -> *) a.
m (ClientStIdle header point tip m a)
-> ChainSyncClient header point tip m a
C.ChainSyncClient (IO (ClientStIdle e ChainPoint ChainTip IO a)
 -> ChainSyncClient e ChainPoint ChainTip IO a)
-> IO (ClientStIdle e ChainPoint ChainTip IO a)
-> ChainSyncClient e ChainPoint ChainTip IO a
forall a b. (a -> b) -> a -> b
$ do
                  MVar (ChainSyncEvent e) -> ChainSyncEvent e -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (ChainSyncEvent e)
nextChainEventVar (ChainPoint -> ChainTip -> ChainSyncEvent e
forall a. ChainPoint -> ChainTip -> ChainSyncEvent a
CS.RollBackward ChainPoint
cp ChainTip
ct)
                  IO (ClientStIdle e ChainPoint ChainTip IO a)
sendRequestNext
            }

-- | Create stream of @ChainSyncEvent (BlockInMode CardanoMode)@ from
-- a node at @socketPath@ with @networkId@ starting at @point@.
blocks
  :: C.LocalNodeConnectInfo C.CardanoMode -> C.ChainPoint
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocks :: LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
blocks LocalNodeConnectInfo CardanoMode
con ChainPoint
chainPoint = do
  Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan <- IO (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
-> Stream
     (Of (ChainSyncEvent (BlockInMode CardanoMode)))
     IO
     (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
forall a. IO (Chan a)
IO.newChan
  Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ())
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall a b. (a -> b) -> a -> b
$ IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ())
-> IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO ()
CS.linkedAsync (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> (ChainSyncEvent (BlockInMode CardanoMode) -> IO ())
-> IO ()
CS.blocksCallback LocalNodeConnectInfo CardanoMode
con ChainPoint
chainPoint ((ChainSyncEvent (BlockInMode CardanoMode) -> IO ()) -> IO ())
-> (ChainSyncEvent (BlockInMode CardanoMode) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Chan (ChainSyncEvent (BlockInMode CardanoMode))
-> ChainSyncEvent (BlockInMode CardanoMode) -> IO ()
forall a. Chan a -> a -> IO ()
IO.writeChan Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan
  IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall (m :: * -> *) a r. Monad m => m a -> Stream (Of a) m r
S.repeatM (IO (ChainSyncEvent (BlockInMode CardanoMode))
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r)
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall a b. (a -> b) -> a -> b
$ Chan (ChainSyncEvent (BlockInMode CardanoMode))
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
forall a. Chan a -> IO a
IO.readChan Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan

blocksPipelined
  :: Word32 -> C.LocalNodeConnectInfo C.CardanoMode -> C.ChainPoint
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
blocksPipelined :: Word32
-> LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
blocksPipelined Word32
pipelineSize LocalNodeConnectInfo CardanoMode
con ChainPoint
chainPoint = do
  Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan <- IO (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
-> Stream
     (Of (ChainSyncEvent (BlockInMode CardanoMode)))
     IO
     (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (Chan (ChainSyncEvent (BlockInMode CardanoMode)))
forall a. IO (Chan a)
IO.newChan
  Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ())
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall a b. (a -> b) -> a -> b
$ IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ()
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ())
-> IO ()
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO ()
CS.linkedAsync (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Word32
-> LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> (ChainSyncEvent (BlockInMode CardanoMode) -> IO ())
-> IO ()
CS.blocksCallbackPipelined Word32
pipelineSize LocalNodeConnectInfo CardanoMode
con ChainPoint
chainPoint ((ChainSyncEvent (BlockInMode CardanoMode) -> IO ()) -> IO ())
-> (ChainSyncEvent (BlockInMode CardanoMode) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Chan (ChainSyncEvent (BlockInMode CardanoMode))
-> ChainSyncEvent (BlockInMode CardanoMode) -> IO ()
forall a. Chan a -> a -> IO ()
IO.writeChan Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan
  IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall (m :: * -> *) a r. Monad m => m a -> Stream (Of a) m r
S.repeatM (IO (ChainSyncEvent (BlockInMode CardanoMode))
 -> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r)
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall a b. (a -> b) -> a -> b
$ Chan (ChainSyncEvent (BlockInMode CardanoMode))
-> IO (ChainSyncEvent (BlockInMode CardanoMode))
forall a. Chan a -> IO a
IO.readChan Chan (ChainSyncEvent (BlockInMode CardanoMode))
chan

-- * Ledger states

-- | Get a stream of permanent ledger states
ledgerStates :: FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStates :: FilePath
-> FilePath -> ValidationMode -> Stream (Of LedgerState) IO r
ledgerStates FilePath
config FilePath
socket ValidationMode
validationMode = do
  (Env
env, LedgerStateHistory
initialLedgerStateHistory) <- IO (Env, LedgerStateHistory)
-> Stream (Of LedgerState) IO (Env, LedgerStateHistory)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Env, LedgerStateHistory)
 -> Stream (Of LedgerState) IO (Env, LedgerStateHistory))
-> IO (Env, LedgerStateHistory)
-> Stream (Of LedgerState) IO (Env, LedgerStateHistory)
forall a b. (a -> b) -> a -> b
$ FilePath -> IO (Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory FilePath
config
  LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall r.
LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
blocks (Env -> FilePath -> LocalNodeConnectInfo CardanoMode
CS.mkConnectInfo Env
env FilePath
socket) ChainPoint
C.ChainPointAtGenesis
    Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
    -> Stream (Of LedgerState) IO r)
-> Stream (Of LedgerState) IO r
forall a b. a -> (a -> b) -> b
& Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
forall r.
Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
foldLedgerState Env
env LedgerStateHistory
initialLedgerStateHistory ValidationMode
validationMode

-- | Get a stream of ledger states over a pipelined chain sync
ledgerStatesPipelined
  :: Word32 -> FilePath -> FilePath -> C.ValidationMode -> S.Stream (S.Of C.LedgerState) IO r
ledgerStatesPipelined :: Word32
-> FilePath
-> FilePath
-> ValidationMode
-> Stream (Of LedgerState) IO r
ledgerStatesPipelined Word32
pipelineSize FilePath
config FilePath
socket ValidationMode
validationMode = do
  (Env
env, LedgerStateHistory
initialLedgerStateHistory) <- IO (Env, LedgerStateHistory)
-> Stream (Of LedgerState) IO (Env, LedgerStateHistory)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Env, LedgerStateHistory)
 -> Stream (Of LedgerState) IO (Env, LedgerStateHistory))
-> IO (Env, LedgerStateHistory)
-> Stream (Of LedgerState) IO (Env, LedgerStateHistory)
forall a b. (a -> b) -> a -> b
$ FilePath -> IO (Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory FilePath
config
  Word32
-> LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
forall r.
Word32
-> LocalNodeConnectInfo CardanoMode
-> ChainPoint
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
blocksPipelined Word32
pipelineSize (Env -> FilePath -> LocalNodeConnectInfo CardanoMode
CS.mkConnectInfo Env
env FilePath
socket) ChainPoint
C.ChainPointAtGenesis
    Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
    -> Stream (Of LedgerState) IO r)
-> Stream (Of LedgerState) IO r
forall a b. a -> (a -> b) -> b
& Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
forall r.
Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
foldLedgerState Env
env LedgerStateHistory
initialLedgerStateHistory ValidationMode
validationMode

-- * Apply block

-- | Fold a stream of blocks into a stream of ledger states. This is
-- implemented in a similar way as `foldBlocks` in
-- cardano-api:Cardano.Api.LedgerState, the difference being that this
-- keeps waiting for more blocks when chainsync server and client are
-- fully synchronized.
foldLedgerState
  :: C.Env -> LedgerStateHistory -> C.ValidationMode
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
  -> S.Stream (S.Of C.LedgerState) IO r
foldLedgerState :: Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
foldLedgerState Env
env LedgerStateHistory
initialLedgerStateHistory ValidationMode
validationMode =
  ((BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))
 -> LedgerState)
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
-> Stream (Of LedgerState) IO r
forall (m :: * -> *) a b r.
Monad m =>
(a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
S.map ((LedgerState, [LedgerEvent]) -> LedgerState
forall a b. (a, b) -> a
fst ((LedgerState, [LedgerEvent]) -> LedgerState)
-> ((BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))
    -> (LedgerState, [LedgerEvent]))
-> (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))
-> LedgerState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))
-> (LedgerState, [LedgerEvent])
forall a b. (a, b) -> b
snd) (Stream
   (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
 -> Stream (Of LedgerState) IO r)
-> (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
    -> Stream
         (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r)
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream (Of LedgerState) IO r
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
forall r.
Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
foldLedgerStateEvents Env
env LedgerStateHistory
initialLedgerStateHistory ValidationMode
validationMode

-- | Like `foldLedgerState`, but also produces blocks and `C.LedgerEvent`s.
foldLedgerStateEvents
  :: C.Env -> LedgerStateHistory -> C.ValidationMode
  -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
  -> S.Stream (S.Of (C.BlockInMode C.CardanoMode, LedgerStateEvents)) IO r
foldLedgerStateEvents :: Env
-> LedgerStateHistory
-> ValidationMode
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
foldLedgerStateEvents Env
env LedgerStateHistory
initialLedgerStateHistory ValidationMode
validationMode = LedgerStateHistory
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
forall r.
LedgerStateHistory
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
loop LedgerStateHistory
initialLedgerStateHistory
  where
    applyBlock_ :: C.LedgerState -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
    applyBlock_ :: LedgerState -> Block era -> IO (LedgerState, [LedgerEvent])
applyBlock_ LedgerState
ledgerState Block era
block = Env
-> LedgerState
-> ValidationMode
-> Block era
-> IO (LedgerState, [LedgerEvent])
forall era.
Env
-> LedgerState
-> ValidationMode
-> Block era
-> IO (LedgerState, [LedgerEvent])
applyBlockThrow Env
env LedgerState
ledgerState ValidationMode
validationMode Block era
block

    loop
      :: LedgerStateHistory
      -> S.Stream (S.Of (CS.ChainSyncEvent (C.BlockInMode C.CardanoMode))) IO r
      -> S.Stream (S.Of (C.BlockInMode C.CardanoMode, LedgerStateEvents)) IO r
    loop :: LedgerStateHistory
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
loop LedgerStateHistory
ledgerStateHistory Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
source = IO
  (Either
     r
     (ChainSyncEvent (BlockInMode CardanoMode),
      Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r))
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     (Either
        r
        (ChainSyncEvent (BlockInMode CardanoMode),
         Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO
     (Either
        r
        (ChainSyncEvent (BlockInMode CardanoMode),
         Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r))
forall (m :: * -> *) a r.
Monad m =>
Stream (Of a) m r -> m (Either r (a, Stream (Of a) m r))
S.next Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
source) Stream
  (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
  IO
  (Either
     r
     (ChainSyncEvent (BlockInMode CardanoMode),
      Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r))
-> (Either
      r
      (ChainSyncEvent (BlockInMode CardanoMode),
       Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r)
    -> Stream
         (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r)
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left r
r -> r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r
      Right (ChainSyncEvent (BlockInMode CardanoMode)
chainSyncEvent, Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
source') -> do
        LedgerStateHistory
ledgerStateHistory' <- case ChainSyncEvent (BlockInMode CardanoMode)
chainSyncEvent of
          CS.RollForward (blockInMode :: BlockInMode CardanoMode
blockInMode@(C.BlockInMode Block era
block EraInMode era CardanoMode
_)) ChainTip
_ct -> do
            (LedgerState, [LedgerEvent])
newLedgerState <- IO (LedgerState, [LedgerEvent])
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     (LedgerState, [LedgerEvent])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (LedgerState, [LedgerEvent])
 -> Stream
      (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
      IO
      (LedgerState, [LedgerEvent]))
-> IO (LedgerState, [LedgerEvent])
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     (LedgerState, [LedgerEvent])
forall a b. (a -> b) -> a -> b
$ LedgerState -> Block era -> IO (LedgerState, [LedgerEvent])
forall era.
LedgerState -> Block era -> IO (LedgerState, [LedgerEvent])
applyBlock_ (LedgerStateHistory -> LedgerState
getLastLedgerState LedgerStateHistory
ledgerStateHistory) Block era
block
            let (LedgerStateHistory
ledgerStateHistory', LedgerStateHistory
committedStates) = Env
-> LedgerStateHistory
-> SlotNo
-> (LedgerState, [LedgerEvent])
-> BlockInMode CardanoMode
-> (LedgerStateHistory, LedgerStateHistory)
forall a.
Env
-> History a
-> SlotNo
-> a
-> BlockInMode CardanoMode
-> (History a, History a)
pushLedgerState Env
env LedgerStateHistory
ledgerStateHistory (BlockInMode CardanoMode -> SlotNo
CS.bimSlotNo BlockInMode CardanoMode
blockInMode) (LedgerState, [LedgerEvent])
newLedgerState BlockInMode CardanoMode
blockInMode
            LedgerStateHistory
-> ((SlotNo, (LedgerState, [LedgerEvent]),
     WithOrigin (BlockInMode CardanoMode))
    -> Stream
         (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ())
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ LedgerStateHistory
committedStates (((SlotNo, (LedgerState, [LedgerEvent]),
   WithOrigin (BlockInMode CardanoMode))
  -> Stream
       (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ())
 -> Stream
      (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ())
-> ((SlotNo, (LedgerState, [LedgerEvent]),
     WithOrigin (BlockInMode CardanoMode))
    -> Stream
         (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ())
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ()
forall a b. (a -> b) -> a -> b
$ \(SlotNo
_, (LedgerState
ledgerState, [LedgerEvent]
ledgerEvents), WithOrigin (BlockInMode CardanoMode)
currBlockMay) -> case WithOrigin (BlockInMode CardanoMode)
currBlockMay of
              WithOrigin (BlockInMode CardanoMode)
Origin       -> ()
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
              At BlockInMode CardanoMode
currBlock -> (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO ()
forall (m :: * -> *) a. Monad m => a -> Stream (Of a) m ()
S.yield (BlockInMode CardanoMode
currBlock, (LedgerState
ledgerState, [LedgerEvent]
ledgerEvents))
            LedgerStateHistory
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     LedgerStateHistory
forall (f :: * -> *) a. Applicative f => a -> f a
pure LedgerStateHistory
ledgerStateHistory'
          CS.RollBackward ChainPoint
cp ChainTip
_ct -> LedgerStateHistory
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     LedgerStateHistory
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LedgerStateHistory
 -> Stream
      (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
      IO
      LedgerStateHistory)
-> LedgerStateHistory
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent])))
     IO
     LedgerStateHistory
forall a b. (a -> b) -> a -> b
$ case ChainPoint
cp of
            ChainPoint
C.ChainPointAtGenesis -> LedgerStateHistory
initialLedgerStateHistory
            C.ChainPoint SlotNo
slotNo Hash BlockHeader
_ -> LedgerStateHistory -> SlotNo -> LedgerStateHistory
forall a. History a -> SlotNo -> History a
rollBackLedgerStateHist LedgerStateHistory
ledgerStateHistory SlotNo
slotNo

        LedgerStateHistory
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
forall r.
LedgerStateHistory
-> Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> Stream
     (Of (BlockInMode CardanoMode, (LedgerState, [LedgerEvent]))) IO r
loop LedgerStateHistory
ledgerStateHistory' Stream (Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
source'

getEnvAndInitialLedgerStateHistory :: FilePath -> IO (C.Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory :: FilePath -> IO (Env, LedgerStateHistory)
getEnvAndInitialLedgerStateHistory FilePath
configPath = do
  (Env
env, LedgerState
initialLedgerState) <- (InitialLedgerStateError -> IO (Env, LedgerState))
-> ((Env, LedgerState) -> IO (Env, LedgerState))
-> Either InitialLedgerStateError (Env, LedgerState)
-> IO (Env, LedgerState)
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either InitialLedgerStateError -> IO (Env, LedgerState)
forall a e. Exception e => e -> a
IO.throw (Env, LedgerState) -> IO (Env, LedgerState)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either InitialLedgerStateError (Env, LedgerState)
 -> IO (Env, LedgerState))
-> IO (Either InitialLedgerStateError (Env, LedgerState))
-> IO (Env, LedgerState)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< (ExceptT InitialLedgerStateError IO (Env, LedgerState)
-> IO (Either InitialLedgerStateError (Env, LedgerState))
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT InitialLedgerStateError IO (Env, LedgerState)
 -> IO (Either InitialLedgerStateError (Env, LedgerState)))
-> ExceptT InitialLedgerStateError IO (Env, LedgerState)
-> IO (Either InitialLedgerStateError (Env, LedgerState))
forall a b. (a -> b) -> a -> b
$ FilePath -> ExceptT InitialLedgerStateError IO (Env, LedgerState)
C.initialLedgerState FilePath
configPath)
  let initialLedgerStateHistory :: LedgerStateHistory
initialLedgerStateHistory = LedgerState -> LedgerStateHistory
singletonLedgerStateHistory LedgerState
initialLedgerState
  (Env, LedgerStateHistory) -> IO (Env, LedgerStateHistory)
forall (m :: * -> *) a. Monad m => a -> m a
return (Env
env, LedgerStateHistory
initialLedgerStateHistory)


applyBlockThrow :: C.Env -> C.LedgerState -> C.ValidationMode -> C.Block era -> IO (C.LedgerState, [C.LedgerEvent])
applyBlockThrow :: Env
-> LedgerState
-> ValidationMode
-> Block era
-> IO (LedgerState, [LedgerEvent])
applyBlockThrow Env
env LedgerState
ledgerState ValidationMode
validationMode Block era
block = case Env
-> LedgerState
-> ValidationMode
-> Block era
-> Either LedgerStateError (LedgerState, [LedgerEvent])
forall era.
Env
-> LedgerState
-> ValidationMode
-> Block era
-> Either LedgerStateError (LedgerState, [LedgerEvent])
C.applyBlock Env
env LedgerState
ledgerState ValidationMode
validationMode Block era
block of
  Left LedgerStateError
err -> LedgerStateError -> IO (LedgerState, [LedgerEvent])
forall a e. Exception e => e -> a
IO.throw LedgerStateError
err
  Right (LedgerState, [LedgerEvent])
ls -> (LedgerState, [LedgerEvent]) -> IO (LedgerState, [LedgerEvent])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (LedgerState, [LedgerEvent])
ls

-- * Copy-paste code
--
-- The following is pasted in from cardano-api:Cardano.Api.LedgerState.
-- (`getLastLedgerState` and `singletonLedgerStateHistory` aren't a
-- direct copy-paste, but they are extracted from within `foldBlocks`)


-- | A history of k (security parameter) recent ledger states. The head is the
-- most recent item. Elements are:
--
-- * Slot number that a new block occurred
-- * The ledger state and events after applying the new block
-- * The new block
--
type LedgerStateHistory = History LedgerStateEvents
type History a = Seq (C.SlotNo, a, WithOrigin (C.BlockInMode C.CardanoMode))

type LedgerStateEvents = (C.LedgerState, [C.LedgerEvent])

-- | Add a new ledger state to the history
pushLedgerState
  :: C.Env                -- ^ Environment used to get the security param, k.
  -> History a          -- ^ History of k items.
  -> C.SlotNo             -- ^ Slot number of the new item.
  -> a                  -- ^ New item to add to the history
  -> C.BlockInMode C.CardanoMode
                        -- ^ The block that (when applied to the previous
                        -- item) resulted in the new item.
  -> (History a, History a)
  -- ^ ( The new history with the new item appended
  --   , Any existing items that are now past the security parameter
  --      and hence can no longer be rolled back.
  --   )
pushLedgerState :: Env
-> History a
-> SlotNo
-> a
-> BlockInMode CardanoMode
-> (History a, History a)
pushLedgerState Env
env History a
hist SlotNo
ix a
st BlockInMode CardanoMode
block
  = Int -> History a -> (History a, History a)
forall a. Int -> Seq a -> (Seq a, Seq a)
Seq.splitAt
      (Word64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Word64 -> Int) -> Word64 -> Int
forall a b. (a -> b) -> a -> b
$ Env -> Word64
C.envSecurityParam Env
env Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ Word64
1)
      ((SlotNo
ix, a
st, BlockInMode CardanoMode -> WithOrigin (BlockInMode CardanoMode)
forall t. t -> WithOrigin t
At BlockInMode CardanoMode
block) (SlotNo, a, WithOrigin (BlockInMode CardanoMode))
-> History a -> History a
forall a. a -> Seq a -> Seq a
Seq.:<| History a
hist)

rollBackLedgerStateHist :: History a -> C.SlotNo -> History a
rollBackLedgerStateHist :: History a -> SlotNo -> History a
rollBackLedgerStateHist History a
hist SlotNo
maxInc = ((SlotNo, a, WithOrigin (BlockInMode CardanoMode)) -> Bool)
-> History a -> History a
forall a. (a -> Bool) -> Seq a -> Seq a
Seq.dropWhileL ((SlotNo -> SlotNo -> Bool
forall a. Ord a => a -> a -> Bool
> SlotNo
maxInc) (SlotNo -> Bool)
-> ((SlotNo, a, WithOrigin (BlockInMode CardanoMode)) -> SlotNo)
-> (SlotNo, a, WithOrigin (BlockInMode CardanoMode))
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (\(SlotNo
x,a
_,WithOrigin (BlockInMode CardanoMode)
_) -> SlotNo
x)) History a
hist

getLastLedgerState :: LedgerStateHistory -> C.LedgerState
getLastLedgerState :: LedgerStateHistory -> LedgerState
getLastLedgerState LedgerStateHistory
ledgerStates' = LedgerState
-> ((SlotNo, (LedgerState, [LedgerEvent]),
     WithOrigin (BlockInMode CardanoMode))
    -> LedgerState)
-> Maybe
     (SlotNo, (LedgerState, [LedgerEvent]),
      WithOrigin (BlockInMode CardanoMode))
-> LedgerState
forall b a. b -> (a -> b) -> Maybe a -> b
maybe
  (FilePath -> LedgerState
forall a. HasCallStack => FilePath -> a
error FilePath
"Impossible! Missing Ledger state")
  (\(SlotNo
_,(LedgerState
ledgerState, [LedgerEvent]
_),WithOrigin (BlockInMode CardanoMode)
_) -> LedgerState
ledgerState)
  (Int
-> LedgerStateHistory
-> Maybe
     (SlotNo, (LedgerState, [LedgerEvent]),
      WithOrigin (BlockInMode CardanoMode))
forall a. Int -> Seq a -> Maybe a
Seq.lookup Int
0 LedgerStateHistory
ledgerStates')

singletonLedgerStateHistory :: C.LedgerState -> LedgerStateHistory
singletonLedgerStateHistory :: LedgerState -> LedgerStateHistory
singletonLedgerStateHistory LedgerState
ledgerState = (SlotNo, (LedgerState, [LedgerEvent]),
 WithOrigin (BlockInMode CardanoMode))
-> LedgerStateHistory
forall a. a -> Seq a
Seq.singleton (SlotNo
0, (LedgerState
ledgerState, []), WithOrigin (BlockInMode CardanoMode)
forall t. WithOrigin t
Origin)