{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server (localTxMonitorServer) where
import Ouroboros.Network.Protocol.LocalTxMonitor.Server
import Ouroboros.Network.Protocol.LocalTxMonitor.Type
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool.API
import Ouroboros.Consensus.Util.IOLike
localTxMonitorServer ::
forall blk idx m.
( MonadSTM m
, LedgerSupportsMempool blk
, Eq idx
)
=> Mempool m blk idx
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
localTxMonitorServer :: Mempool m blk idx
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
localTxMonitorServer Mempool m blk idx
mempool =
m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
-> LocalTxMonitorServer (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
m (ServerStIdle txid tx slot m a)
-> LocalTxMonitorServer txid tx slot m a
LocalTxMonitorServer (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle)
where
serverStIdle
:: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle :: ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle =
ServerStIdle :: forall txid tx slot (m :: * -> *) a.
m (ServerStAcquiring txid tx slot m a)
-> m a -> ServerStIdle txid tx slot m a
ServerStIdle
{ recvMsgDone :: m ()
recvMsgDone = do
() -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
, recvMsgAcquire :: m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgAcquire = do
(MempoolCapacityBytes, MempoolSnapshot blk idx)
s <- STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall a b. (a -> b) -> a -> b
$ (,) (MempoolCapacityBytes
-> MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m MempoolCapacityBytes
-> STM
m
(MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Mempool m blk idx -> STM m MempoolCapacityBytes
forall (m :: * -> *) blk idx.
Mempool m blk idx -> STM m MempoolCapacityBytes
getCapacity Mempool m blk idx
mempool STM
m
(MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m (MempoolSnapshot blk idx)
-> STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Mempool m blk idx -> STM m (MempoolSnapshot blk idx)
forall (m :: * -> *) blk idx.
Mempool m blk idx -> STM m (MempoolSnapshot blk idx)
getSnapshot Mempool m blk idx
mempool
ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring (MempoolCapacityBytes, MempoolSnapshot blk idx)
s
}
serverStAcquiring
:: (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring :: (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring s :: (MempoolCapacityBytes, MempoolSnapshot blk idx)
s@(MempoolCapacityBytes
_, MempoolSnapshot blk idx
snapshot) =
SlotNo
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
forall slot txid tx (m :: * -> *) a.
slot
-> ServerStAcquired txid tx slot m a
-> ServerStAcquiring txid tx slot m a
SendMsgAcquired (MempoolSnapshot blk idx -> SlotNo
forall blk idx. MempoolSnapshot blk idx -> SlotNo
snapshotSlotNo MempoolSnapshot blk idx
snapshot) ((MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (MempoolCapacityBytes, MempoolSnapshot blk idx)
s (MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
forall blk idx.
MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
snapshotTxs MempoolSnapshot blk idx
snapshot))
serverStAcquired
:: (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired :: (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired s :: (MempoolCapacityBytes, MempoolSnapshot blk idx)
s@(MempoolCapacityBytes
capacity, MempoolSnapshot blk idx
snapshot) [(Validated (GenTx blk), idx)]
txs =
ServerStAcquired :: forall txid tx slot (m :: * -> *) a.
m (ServerStBusy 'NextTx txid tx slot m a)
-> (txid -> m (ServerStBusy 'HasTx txid tx slot m a))
-> m (ServerStBusy 'GetSizes txid tx slot m a)
-> m (ServerStAcquiring txid tx slot m a)
-> m (ServerStIdle txid tx slot m a)
-> ServerStAcquired txid tx slot m a
ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgNextTx =
case [(Validated (GenTx blk), idx)]
txs of
[] ->
ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Maybe (GenTx blk)
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall tx txid slot (m :: * -> *) a.
Maybe tx
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'NextTx txid tx slot m a
SendMsgReplyNextTx Maybe (GenTx blk)
forall a. Maybe a
Nothing ((MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (MempoolCapacityBytes, MempoolSnapshot blk idx)
s [])
(Validated (GenTx blk) -> GenTx blk
forall blk.
LedgerSupportsMempool blk =>
Validated (GenTx blk) -> GenTx blk
txForgetValidated -> GenTx blk
h, idx
_):[(Validated (GenTx blk), idx)]
q ->
ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Maybe (GenTx blk)
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'NextTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall tx txid slot (m :: * -> *) a.
Maybe tx
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'NextTx txid tx slot m a
SendMsgReplyNextTx (GenTx blk -> Maybe (GenTx blk)
forall a. a -> Maybe a
Just GenTx blk
h) ((MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (MempoolCapacityBytes, MempoolSnapshot blk idx)
s [(Validated (GenTx blk), idx)]
q)
, recvMsgHasTx :: GenTxId blk
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgHasTx = \GenTxId blk
txid ->
ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ Bool
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'HasTx (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
Bool
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'HasTx txid tx slot m a
SendMsgReplyHasTx (MempoolSnapshot blk idx -> GenTxId blk -> Bool
forall blk idx. MempoolSnapshot blk idx -> GenTxId blk -> Bool
snapshotHasTx MempoolSnapshot blk idx
snapshot GenTxId blk
txid) ((MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (MempoolCapacityBytes, MempoolSnapshot blk idx)
s [(Validated (GenTx blk), idx)]
txs)
, recvMsgGetSizes :: m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgGetSizes = do
let MempoolSize{Word32
msNumTxs :: MempoolSize -> Word32
msNumTxs :: Word32
msNumTxs,Word32
msNumBytes :: MempoolSize -> Word32
msNumBytes :: Word32
msNumBytes} = MempoolSnapshot blk idx -> MempoolSize
forall blk idx. MempoolSnapshot blk idx -> MempoolSize
snapshotMempoolSize MempoolSnapshot blk idx
snapshot
let sizes :: MempoolSizeAndCapacity
sizes = MempoolSizeAndCapacity :: Word32 -> Word32 -> Word32 -> MempoolSizeAndCapacity
MempoolSizeAndCapacity
{ capacityInBytes :: Word32
capacityInBytes = MempoolCapacityBytes -> Word32
getMempoolCapacityBytes MempoolCapacityBytes
capacity
, sizeInBytes :: Word32
sizeInBytes = Word32
msNumBytes
, numberOfTxs :: Word32
numberOfTxs = Word32
msNumTxs
}
ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy
'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ MempoolSizeAndCapacity
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
-> ServerStBusy 'GetSizes (GenTxId blk) (GenTx blk) SlotNo m ()
forall txid tx slot (m :: * -> *) a.
MempoolSizeAndCapacity
-> ServerStAcquired txid tx slot m a
-> ServerStBusy 'GetSizes txid tx slot m a
SendMsgReplyGetSizes MempoolSizeAndCapacity
sizes ((MempoolCapacityBytes, MempoolSnapshot blk idx)
-> [(Validated (GenTx blk), idx)]
-> ServerStAcquired (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquired (MempoolCapacityBytes, MempoolSnapshot blk idx)
s [(Validated (GenTx blk), idx)]
txs)
, recvMsgAwaitAcquire :: m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgAwaitAcquire = do
(MempoolCapacityBytes, MempoolSnapshot blk idx)
s' <- STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall a b. (a -> b) -> a -> b
$ do
s' :: (MempoolCapacityBytes, MempoolSnapshot blk idx)
s'@(MempoolCapacityBytes
_, MempoolSnapshot blk idx
snapshot') <- (,) (MempoolCapacityBytes
-> MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m MempoolCapacityBytes
-> STM
m
(MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Mempool m blk idx -> STM m MempoolCapacityBytes
forall (m :: * -> *) blk idx.
Mempool m blk idx -> STM m MempoolCapacityBytes
getCapacity Mempool m blk idx
mempool STM
m
(MempoolSnapshot blk idx
-> (MempoolCapacityBytes, MempoolSnapshot blk idx))
-> STM m (MempoolSnapshot blk idx)
-> STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Mempool m blk idx -> STM m (MempoolSnapshot blk idx)
forall (m :: * -> *) blk idx.
Mempool m blk idx -> STM m (MempoolSnapshot blk idx)
getSnapshot Mempool m blk idx
mempool
(MempoolCapacityBytes, MempoolSnapshot blk idx)
s' (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> STM m ()
-> STM m (MempoolCapacityBytes, MempoolSnapshot blk idx)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not (MempoolSnapshot blk idx
snapshot MempoolSnapshot blk idx -> MempoolSnapshot blk idx -> Bool
`isSameSnapshot` MempoolSnapshot blk idx
snapshot'))
ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()))
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ())
forall a b. (a -> b) -> a -> b
$ (MempoolCapacityBytes, MempoolSnapshot blk idx)
-> ServerStAcquiring (GenTxId blk) (GenTx blk) SlotNo m ()
serverStAcquiring (MempoolCapacityBytes, MempoolSnapshot blk idx)
s'
, recvMsgRelease :: m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
recvMsgRelease =
ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
-> m (ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure ServerStIdle (GenTxId blk) (GenTx blk) SlotNo m ()
serverStIdle
}
isSameSnapshot
:: MempoolSnapshot blk idx
-> MempoolSnapshot blk idx
-> Bool
isSameSnapshot :: MempoolSnapshot blk idx -> MempoolSnapshot blk idx -> Bool
isSameSnapshot MempoolSnapshot blk idx
a MempoolSnapshot blk idx
b =
((Validated (GenTx blk), idx) -> idx
forall a b. (a, b) -> b
snd ((Validated (GenTx blk), idx) -> idx)
-> [(Validated (GenTx blk), idx)] -> [idx]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
forall blk idx.
MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
snapshotTxs MempoolSnapshot blk idx
a) [idx] -> [idx] -> Bool
forall a. Eq a => a -> a -> Bool
== ((Validated (GenTx blk), idx) -> idx
forall a b. (a, b) -> b
snd ((Validated (GenTx blk), idx) -> idx)
-> [(Validated (GenTx blk), idx)] -> [idx]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
forall blk idx.
MempoolSnapshot blk idx -> [(Validated (GenTx blk), idx)]
snapshotTxs MempoolSnapshot blk idx
b)
Bool -> Bool -> Bool
&&
MempoolSnapshot blk idx -> SlotNo
forall blk idx. MempoolSnapshot blk idx -> SlotNo
snapshotSlotNo MempoolSnapshot blk idx
a SlotNo -> SlotNo -> Bool
forall a. Eq a => a -> a -> Bool
== MempoolSnapshot blk idx -> SlotNo
forall blk idx. MempoolSnapshot blk idx -> SlotNo
snapshotSlotNo MempoolSnapshot blk idx
b