{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS_GHC -Wno-partial-fields #-}
module Ouroboros.Network.TxSubmission.Inbound
( txSubmissionInbound
, TxSubmissionMempoolWriter (..)
, TraceTxSubmissionInbound (..)
, TxSubmissionProtocolError (..)
, ProcessedTxCount (..)
) where
import Data.Foldable (foldl', toList)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Sequence.Strict (StrictSeq)
import qualified Data.Sequence.Strict as Seq
import qualified Data.Set as Set
import Data.Word (Word16)
import GHC.Generics (Generic)
import NoThunks.Class (NoThunks (..), unsafeNoThunks)
import Cardano.Prelude (forceElemsToWHNF)
import Control.Exception (assert)
import Control.Monad (unless)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadSTM.Strict (checkInvariant)
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Network.TypedProtocol.Pipelined (N, Nat (..), natToInt)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission2.Server
import Ouroboros.Network.TxSubmission.Mempool.Reader
(MempoolSnapshot (..), TxSubmissionMempoolReader (..))
-- | Mempool operations required by the inbound side of the tx-submission
-- protocol handler.
--
-- The record is parameterised over the transaction identifier type @txid@,
-- the transaction type @tx@, an index type @idx@ (not mentioned by any field
-- of this record) and the monad @m@ in which the mempool is updated.
--
data TxSubmissionMempoolWriter txid tx idx m =
     TxSubmissionMempoolWriter {

       -- | Compute the identifier of a transaction.
       --
       -- 'txSubmissionInbound' uses it to key received transactions so they
       -- can be matched against the identifiers that were requested.
       --
       txId          :: tx -> txid,

       -- | Hand a batch of transactions to the mempool.
       --
       -- 'txSubmissionInbound' treats the returned list as the identifiers
       -- of the transactions that were accepted: its length is reported as
       -- 'ptxcAccepted'.
       --
       mempoolAddTxs :: [tx] -> m [txid]
     }
-- | Accepted / rejected breakdown for one batch of received transactions,
-- as reported through 'TraceTxSubmissionProcessed'.
--
data ProcessedTxCount = ProcessedTxCount {
      -- | Number of transaction identifiers returned by the mempool writer
      -- for the batch.
      ptxcAccepted :: Int
      -- | Number of transactions collected in the reply minus
      -- 'ptxcAccepted'.
    , ptxcRejected :: Int
    }
  deriving (Eq, Show)
-- | Trace events emitted by 'txSubmissionInbound'.
--
-- The @txid@ and @tx@ parameters are phantom: no constructor carries a value
-- of either type.
--
data TraceTxSubmissionInbound txid tx =
    -- | Number of transactions contained in a reply; emitted before the
    -- ready transactions are handed to the mempool.
    TraceTxSubmissionCollected Int
    -- | Accepted / rejected breakdown; emitted after the mempool call.
  | TraceTxSubmissionProcessed ProcessedTxCount
    -- | Emitted by the action supplied to the blocking txid request for the
    -- case where that request yields no reply.
    -- NOTE(review): presumably this is the peer ending the protocol - confirm.
  | TraceTxInboundTerminated
    -- | The idle state found transactions it can still request; the 'Int' is
    -- the current pipelining depth.
  | TraceTxInboundCanRequestMoreTxs Int
    -- | The idle state found no transactions it can request; the 'Int' is
    -- the current pipelining depth.
  | TraceTxInboundCannotRequestMoreTxs Int
  deriving (Eq, Show)
-- | Protocol violations detected by 'txSubmissionInbound' while validating a
-- reply from the peer. Values of this type are thrown as exceptions.
--
data TxSubmissionProtocolError =
       -- | A reply contained a transaction whose identifier was not among
       -- the identifiers requested.
       ProtocolErrorTxNotRequested
       -- | A reply contained more transaction identifiers than requested.
     | ProtocolErrorTxIdsNotRequested
  deriving Show
-- | Allows 'TxSubmissionProtocolError' to be thrown with 'throwIO' and
-- supplies a human-readable message for each violation.
instance Exception TxSubmissionProtocolError where
  displayException ProtocolErrorTxNotRequested =
      "The peer replied with a transaction we did not ask for."
  displayException ProtocolErrorTxIdsNotRequested =
      "The peer replied with more txids than we asked for."
-- | State threaded through the 'txSubmissionInbound' server between protocol
-- steps. All fields are strict.
--
data ServerState txid tx = ServerState {
       -- | Number of transaction identifiers requested from the peer for
       -- which no reply has been processed yet. Set when a txid request is
       -- sent and reduced by the requested amount when its reply arrives.
       --
       requestedTxIdsInFlight :: !Word16,

       -- | Transaction identifiers not yet acknowledged, kept as a sequence.
       -- Acknowledgement always removes a prefix of this sequence.
       --
       unacknowledgedTxIds    :: !(StrictSeq txid),

       -- | Transaction identifiers (with their size) that can still be
       -- requested. The idle state asks for transactions only while this map
       -- is non-empty.
       --
       availableTxids         :: !(Map txid TxSizeInBytes),

       -- | Entries for transaction identifiers whose request has been
       -- answered but which have not been acknowledged yet.
       --
       -- An identifier maps to 'Nothing' when it was requested but the reply
       -- did not contain the transaction, or when it was re-inserted because
       -- the same identifier occurs again later in 'unacknowledgedTxIds'.
       --
       bufferedTxs            :: !(Map txid (Maybe tx)),

       -- | Number of identifiers already removed from 'unacknowledgedTxIds'
       -- that still have to be acknowledged to the peer. It is sent with the
       -- next txid request and then reset to zero.
       --
       numTxsToAcknowledge    :: !Word16
     }
  deriving (Show, Generic)
-- | Thunk checking for 'ServerState'. No methods are defined, so the class
-- defaults apply (the type derives 'Generic').
instance ( NoThunks txid
         , NoThunks tx
         ) => NoThunks (ServerState txid tx)
-- | The state the server starts in: nothing in flight, nothing
-- unacknowledged, nothing available or buffered, nothing to acknowledge.
initialServerState :: ServerState txid tx
initialServerState = ServerState 0 Seq.empty Map.empty Map.empty 0
-- | Inbound (server) side of the tx-submission protocol.
--
-- The server starts in 'serverIdle' with no pipelined requests outstanding
-- and with 'initialServerState'. The 'NodeToNodeVersion' argument is
-- accepted but not used.
--
txSubmissionInbound
  :: forall txid tx idx m.
     ( Ord txid
     , NoThunks txid
     , NoThunks tx
     , MonadSTM m
     , MonadThrow m
     )
  => Tracer m (TraceTxSubmissionInbound txid tx)
  -> Word16
     -- ^ Upper bound applied (together with a fixed internal limit) to the
     -- number of txids asked for in a blocking request.
     -- NOTE(review): presumably the maximum number of unacknowledged txids
     -- allowed by the protocol - confirm against the caller.
  -> TxSubmissionMempoolReader txid tx idx m
  -> TxSubmissionMempoolWriter txid tx idx m
  -> NodeToNodeVersion
  -> TxSubmissionServerPipelined txid tx m ()
txSubmissionInbound tracer maxUnacked mpReader mpWriter _version =
    TxSubmissionServerPipelined $
      continueWithStateM (serverIdle Zero) initialServerState
  where
    -- Fixed limit on the number of txids asked for in one request.
    maxTxIdsToRequest = 3 :: Word16
    -- Fixed limit of 2; not referenced by 'serverIdle' or 'handleReply'.
    -- NOTE(review): presumably consumed where transactions are requested
    -- (serverReqTxs) - confirm.
    maxTxToRequest    = 2 :: Word16

    -- Bring the mempool operations into scope under their field names.
    TxSubmissionMempoolReader{mempoolGetSnapshot} = mpReader

    TxSubmissionMempoolWriter
      { txId
      , mempoolAddTxs
      } = mpWriter
serverIdle :: forall (n :: N).
Nat n
-> StatefulM (ServerState txid tx) n txid tx m
serverIdle :: Nat n -> StatefulM (ServerState txid tx) n txid tx m
serverIdle Nat n
n = (ServerState txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulM (ServerState txid tx) n txid tx m
forall s (n :: N) txid tx (m :: * -> *).
(s -> m (ServerStIdle n txid tx m ())) -> StatefulM s n txid tx m
StatefulM ((ServerState txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulM (ServerState txid tx) n txid tx m)
-> (ServerState txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulM (ServerState txid tx) n txid tx m
forall a b. (a -> b) -> a -> b
$ \ServerState txid tx
st -> case Nat n
n of
Nat n
Zero -> do
if ServerState txid tx -> Bool
forall k. ServerState k tx -> Bool
canRequestMoreTxs ServerState txid tx
st
then do
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxInboundCanRequestMoreTxs (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n))
ServerStIdle n txid tx m () -> m (ServerStIdle n txid tx m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle n txid tx m () -> m (ServerStIdle n txid tx m ()))
-> ServerStIdle n txid tx m () -> m (ServerStIdle n txid tx m ())
forall a b. (a -> b) -> a -> b
$ Stateful (ServerState txid tx) n txid tx m
-> ServerState txid tx -> ServerStIdle n txid tx m ()
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
Stateful s n txid tx m -> s -> ServerStIdle n txid tx m ()
continueWithState (Nat n -> Stateful (ServerState txid tx) n txid tx m
forall (n :: N).
Nat n -> Stateful (ServerState txid tx) n txid tx m
serverReqTxs Nat n
forall (n :: N). ('Z ~ n) => Nat n
Zero) ServerState txid tx
st
else do
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxInboundCannotRequestMoreTxs (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n))
let numTxIdsToRequest :: Word16
numTxIdsToRequest = Word16
maxTxIdsToRequest Word16 -> Word16 -> Word16
forall a. Ord a => a -> a -> a
`min` Word16
maxUnacked
Bool
-> m (ServerStIdle 'Z txid tx m ())
-> m (ServerStIdle 'Z txid tx m ())
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (ServerState txid tx -> Word16
forall txid tx. ServerState txid tx -> Word16
requestedTxIdsInFlight ServerState txid tx
st Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
0
Bool -> Bool -> Bool
&& StrictSeq txid -> Bool
forall a. StrictSeq a -> Bool
Seq.null (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st)
Bool -> Bool -> Bool
&& Map txid TxSizeInBytes -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState txid tx -> Map txid TxSizeInBytes
forall txid tx. ServerState txid tx -> Map txid TxSizeInBytes
availableTxids ServerState txid tx
st)
Bool -> Bool -> Bool
&& Map txid (Maybe tx) -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState txid tx -> Map txid (Maybe tx)
forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs ServerState txid tx
st)) (m (ServerStIdle 'Z txid tx m ())
-> m (ServerStIdle 'Z txid tx m ()))
-> m (ServerStIdle 'Z txid tx m ())
-> m (ServerStIdle 'Z txid tx m ())
forall a b. (a -> b) -> a -> b
$
ServerStIdle 'Z txid tx m () -> m (ServerStIdle 'Z txid tx m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle 'Z txid tx m () -> m (ServerStIdle 'Z txid tx m ()))
-> ServerStIdle 'Z txid tx m () -> m (ServerStIdle 'Z txid tx m ())
forall a b. (a -> b) -> a -> b
$
Word16
-> Word16
-> m ()
-> (NonEmpty (txid, TxSizeInBytes)
-> m (ServerStIdle 'Z txid tx m ()))
-> ServerStIdle 'Z txid tx m ()
forall (m :: * -> *) a txid tx.
Word16
-> Word16
-> m a
-> (NonEmpty (txid, TxSizeInBytes)
-> m (ServerStIdle 'Z txid tx m a))
-> ServerStIdle 'Z txid tx m a
SendMsgRequestTxIdsBlocking
(ServerState txid tx -> Word16
forall txid tx. ServerState txid tx -> Word16
numTxsToAcknowledge ServerState txid tx
st)
Word16
numTxIdsToRequest
(Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer TraceTxSubmissionInbound txid tx
forall txid tx. TraceTxSubmissionInbound txid tx
TraceTxInboundTerminated)
( StatefulCollect (ServerState txid tx) 'Z txid tx m
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle 'Z txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulCollect s n txid tx m
-> s -> Collect txid tx -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (Nat 'Z -> StatefulCollect (ServerState txid tx) 'Z txid tx m
forall (n :: N).
Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply Nat 'Z
forall (n :: N). ('Z ~ n) => Nat n
Zero) ServerState txid tx
st {
numTxsToAcknowledge :: Word16
numTxsToAcknowledge = Word16
0,
requestedTxIdsInFlight :: Word16
requestedTxIdsInFlight = Word16
numTxIdsToRequest
}
(Collect txid tx -> m (ServerStIdle 'Z txid tx m ()))
-> (NonEmpty (txid, TxSizeInBytes) -> Collect txid tx)
-> NonEmpty (txid, TxSizeInBytes)
-> m (ServerStIdle 'Z txid tx m ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Word16 -> [(txid, TxSizeInBytes)] -> Collect txid tx
forall txid tx.
Word16 -> [(txid, TxSizeInBytes)] -> Collect txid tx
CollectTxIds Word16
numTxIdsToRequest
([(txid, TxSizeInBytes)] -> Collect txid tx)
-> (NonEmpty (txid, TxSizeInBytes) -> [(txid, TxSizeInBytes)])
-> NonEmpty (txid, TxSizeInBytes)
-> Collect txid tx
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NonEmpty (txid, TxSizeInBytes) -> [(txid, TxSizeInBytes)]
forall a. NonEmpty a -> [a]
NonEmpty.toList)
Succ Nat n
n' -> if ServerState txid tx -> Bool
forall k. ServerState k tx -> Bool
canRequestMoreTxs ServerState txid tx
st
then do
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxInboundCanRequestMoreTxs (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n))
ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ()))
-> ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ())
forall a b. (a -> b) -> a -> b
$ Maybe (ServerStIdle ('S n) txid tx m ())
-> (Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> ServerStIdle ('S n) txid tx m ()
forall (n :: N) txid tx (m :: * -> *) a.
Maybe (ServerStIdle ('S n) txid tx m a)
-> (Collect txid tx -> m (ServerStIdle n txid tx m a))
-> ServerStIdle ('S n) txid tx m a
CollectPipelined
(ServerStIdle ('S n) txid tx m ()
-> Maybe (ServerStIdle ('S n) txid tx m ())
forall a. a -> Maybe a
Just (Stateful (ServerState txid tx) ('S n) txid tx m
-> ServerState txid tx -> ServerStIdle ('S n) txid tx m ()
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
Stateful s n txid tx m -> s -> ServerStIdle n txid tx m ()
continueWithState (Nat ('S n) -> Stateful (ServerState txid tx) ('S n) txid tx m
forall (n :: N).
Nat n -> Stateful (ServerState txid tx) n txid tx m
serverReqTxs (Nat n -> Nat ('S n)
forall (m :: N) (n :: N). (m ~ 'S n) => Nat n -> Nat m
Succ Nat n
n')) ServerState txid tx
st))
(StatefulCollect (ServerState txid tx) n txid tx m
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulCollect s n txid tx m
-> s -> Collect txid tx -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
forall (n :: N).
Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply Nat n
n') ServerState txid tx
st)
else do
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxInboundCannotRequestMoreTxs (Nat n -> Int
forall (n :: N). Nat n -> Int
natToInt Nat n
n))
ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ()))
-> ServerStIdle ('S n) txid tx m ()
-> m (ServerStIdle ('S n) txid tx m ())
forall a b. (a -> b) -> a -> b
$ Maybe (ServerStIdle ('S n) txid tx m ())
-> (Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> ServerStIdle ('S n) txid tx m ()
forall (n :: N) txid tx (m :: * -> *) a.
Maybe (ServerStIdle ('S n) txid tx m a)
-> (Collect txid tx -> m (ServerStIdle n txid tx m a))
-> ServerStIdle ('S n) txid tx m a
CollectPipelined
Maybe (ServerStIdle ('S n) txid tx m ())
forall a. Maybe a
Nothing
(StatefulCollect (ServerState txid tx) n txid tx m
-> ServerState txid tx
-> Collect txid tx
-> m (ServerStIdle n txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulCollect s n txid tx m
-> s -> Collect txid tx -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
forall (n :: N).
Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply Nat n
n') ServerState txid tx
st)
where
canRequestMoreTxs :: ServerState k tx -> Bool
canRequestMoreTxs :: ServerState k tx -> Bool
canRequestMoreTxs ServerState k tx
st =
Bool -> Bool
not (Map k TxSizeInBytes -> Bool
forall k a. Map k a -> Bool
Map.null (ServerState k tx -> Map k TxSizeInBytes
forall txid tx. ServerState txid tx -> Map txid TxSizeInBytes
availableTxids ServerState k tx
st))
handleReply :: forall (n :: N).
Nat n
-> StatefulCollect (ServerState txid tx) n txid tx m
handleReply :: Nat n -> StatefulCollect (ServerState txid tx) n txid tx m
handleReply Nat n
n = (ServerState txid tx
-> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect (ServerState txid tx) n txid tx m
forall s (n :: N) txid tx (m :: * -> *).
(s -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect s n txid tx m
StatefulCollect ((ServerState txid tx
-> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect (ServerState txid tx) n txid tx m)
-> (ServerState txid tx
-> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-> StatefulCollect (ServerState txid tx) n txid tx m
forall a b. (a -> b) -> a -> b
$ \ServerState txid tx
st Collect txid tx
collect -> case Collect txid tx
collect of
CollectTxIds Word16
reqNo [(txid, TxSizeInBytes)]
txids -> do
let txidsSeq :: StrictSeq txid
txidsSeq = [txid] -> StrictSeq txid
forall a. [a] -> StrictSeq a
Seq.fromList (((txid, TxSizeInBytes) -> txid)
-> [(txid, TxSizeInBytes)] -> [txid]
forall a b. (a -> b) -> [a] -> [b]
map (txid, TxSizeInBytes) -> txid
forall a b. (a, b) -> a
fst [(txid, TxSizeInBytes)]
txids)
txidsMap :: Map txid TxSizeInBytes
txidsMap = [(txid, TxSizeInBytes)] -> Map txid TxSizeInBytes
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(txid, TxSizeInBytes)]
txids
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq txid
txidsSeq Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
reqNo) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorTxIdsNotRequested
let st' :: ServerState txid tx
st' = ServerState txid tx
st {
requestedTxIdsInFlight :: Word16
requestedTxIdsInFlight = ServerState txid tx -> Word16
forall txid tx. ServerState txid tx -> Word16
requestedTxIdsInFlight ServerState txid tx
st Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Word16
reqNo
}
MempoolSnapshot txid tx idx
mpSnapshot <- STM m (MempoolSnapshot txid tx idx)
-> m (MempoolSnapshot txid tx idx)
forall (m :: * -> *) a.
(MonadSTM m, ?callStack::CallStack) =>
STM m a -> m a
atomically STM m (MempoolSnapshot txid tx idx)
mempoolGetSnapshot
StatefulM (ServerState txid tx) n txid tx m
-> ServerState txid tx -> m (ServerStIdle n txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulM s n txid tx m -> s -> m (ServerStIdle n txid tx m ())
continueWithStateM
(Nat n -> StatefulM (ServerState txid tx) n txid tx m
forall (n :: N).
Nat n -> StatefulM (ServerState txid tx) n txid tx m
serverIdle Nat n
n)
(ServerState txid tx
-> StrictSeq txid
-> Map txid TxSizeInBytes
-> MempoolSnapshot txid tx idx
-> ServerState txid tx
acknowledgeTxIds ServerState txid tx
st' StrictSeq txid
txidsSeq Map txid TxSizeInBytes
txidsMap MempoolSnapshot txid tx idx
mpSnapshot)
CollectTxs [txid]
txids [tx]
txs -> do
let txsMap :: Map txid tx
txsMap :: Map txid tx
txsMap = [(txid, tx)] -> Map txid tx
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [ (tx -> txid
txId tx
tx, tx
tx) | tx
tx <- [tx]
txs ]
txidsReceived :: Set txid
txidsReceived = Map txid tx -> Set txid
forall k a. Map k a -> Set k
Map.keysSet Map txid tx
txsMap
txidsRequested :: Set txid
txidsRequested = [txid] -> Set txid
forall a. Ord a => [a] -> Set a
Set.fromList [txid]
txids
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Set txid
txidsReceived Set txid -> Set txid -> Bool
forall a. Ord a => Set a -> Set a -> Bool
`Set.isSubsetOf` Set txid
txidsRequested) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorTxNotRequested
let txIdsRequestedWithTxsReceived :: Map txid (Maybe tx)
txIdsRequestedWithTxsReceived :: Map txid (Maybe tx)
txIdsRequestedWithTxsReceived =
(tx -> Maybe tx) -> Map txid tx -> Map txid (Maybe tx)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map tx -> Maybe tx
forall a. a -> Maybe a
Just Map txid tx
txsMap
Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> (txid -> Maybe tx) -> Set txid -> Map txid (Maybe tx)
forall k a. (k -> a) -> Set k -> Map k a
Map.fromSet (Maybe tx -> txid -> Maybe tx
forall a b. a -> b -> a
const Maybe tx
forall a. Maybe a
Nothing) Set txid
txidsRequested
bufferedTxs1 :: Map txid (Maybe tx)
bufferedTxs1 = ServerState txid tx -> Map txid (Maybe tx)
forall txid tx. ServerState txid tx -> Map txid (Maybe tx)
bufferedTxs ServerState txid tx
st Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<> Map txid (Maybe tx)
txIdsRequestedWithTxsReceived
(StrictSeq txid
acknowledgedTxIds, StrictSeq txid
unacknowledgedTxIds') =
(txid -> Bool)
-> StrictSeq txid -> (StrictSeq txid, StrictSeq txid)
forall a. (a -> Bool) -> StrictSeq a -> (StrictSeq a, StrictSeq a)
Seq.spanl (txid -> Map txid (Maybe tx) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
`Map.member` Map txid (Maybe tx)
bufferedTxs1) (ServerState txid tx -> StrictSeq txid
forall txid tx. ServerState txid tx -> StrictSeq txid
unacknowledgedTxIds ServerState txid tx
st)
txsReady :: [tx]
txsReady = (txid -> [tx] -> [tx]) -> [tx] -> StrictSeq txid -> [tx]
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (\txid
txid [tx]
r -> [tx] -> (tx -> [tx]) -> Maybe tx -> [tx]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [tx]
r (tx -> [tx] -> [tx]
forall a. a -> [a] -> [a]
:[tx]
r) (Map txid (Maybe tx)
bufferedTxs1 Map txid (Maybe tx) -> txid -> Maybe tx
forall k a. Ord k => Map k a -> k -> a
Map.! txid
txid))
[] StrictSeq txid
acknowledgedTxIds
bufferedTxs2 :: Map txid (Maybe tx)
bufferedTxs2 = (Map txid (Maybe tx) -> txid -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> StrictSeq txid -> Map txid (Maybe tx)
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' ((txid -> Map txid (Maybe tx) -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> txid -> Map txid (Maybe tx)
forall a b c. (a -> b -> c) -> b -> a -> c
flip txid -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete)
Map txid (Maybe tx)
bufferedTxs1 StrictSeq txid
acknowledgedTxIds
live :: [txid]
live = (txid -> Bool) -> [txid] -> [txid]
forall a. (a -> Bool) -> [a] -> [a]
filter (txid -> StrictSeq txid -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` StrictSeq txid
unacknowledgedTxIds') ([txid] -> [txid]) -> [txid] -> [txid]
forall a b. (a -> b) -> a -> b
$ StrictSeq txid -> [txid]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList StrictSeq txid
acknowledgedTxIds
bufferedTxs3 :: Map txid (Maybe tx)
bufferedTxs3 = Map txid (Maybe tx) -> Map txid (Maybe tx)
forall (t :: * -> *) a. Foldable t => t a -> t a
forceElemsToWHNF (Map txid (Maybe tx) -> Map txid (Maybe tx))
-> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a b. (a -> b) -> a -> b
$ Map txid (Maybe tx)
bufferedTxs2 Map txid (Maybe tx) -> Map txid (Maybe tx) -> Map txid (Maybe tx)
forall a. Semigroup a => a -> a -> a
<>
([(txid, Maybe tx)] -> Map txid (Maybe tx)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList ([txid] -> [Maybe tx] -> [(txid, Maybe tx)]
forall a b. [a] -> [b] -> [(a, b)]
zip [txid]
live (Maybe tx -> [Maybe tx]
forall a. a -> [a]
repeat Maybe tx
forall a. Maybe a
Nothing)))
let !collected :: Int
collected = [tx] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [tx]
txs
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (TraceTxSubmissionInbound txid tx -> m ())
-> TraceTxSubmissionInbound txid tx -> m ()
forall a b. (a -> b) -> a -> b
$
Int -> TraceTxSubmissionInbound txid tx
forall txid tx. Int -> TraceTxSubmissionInbound txid tx
TraceTxSubmissionCollected Int
collected
[txid]
txidsAccepted <- [tx] -> m [txid]
mempoolAddTxs [tx]
txsReady
let !accepted :: Int
accepted = [txid] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [txid]
txidsAccepted
Tracer m (TraceTxSubmissionInbound txid tx)
-> TraceTxSubmissionInbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionInbound txid tx)
tracer (TraceTxSubmissionInbound txid tx -> m ())
-> TraceTxSubmissionInbound txid tx -> m ()
forall a b. (a -> b) -> a -> b
$ ProcessedTxCount -> TraceTxSubmissionInbound txid tx
forall txid tx.
ProcessedTxCount -> TraceTxSubmissionInbound txid tx
TraceTxSubmissionProcessed ProcessedTxCount :: Int -> Int -> ProcessedTxCount
ProcessedTxCount {
ptxcAccepted :: Int
ptxcAccepted = Int
accepted
, ptxcRejected :: Int
ptxcRejected = Int
collected Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
accepted
}
StatefulM (ServerState txid tx) n txid tx m
-> ServerState txid tx -> m (ServerStIdle n txid tx m ())
forall s (n :: N) txid tx (m :: * -> *).
NoThunks s =>
StatefulM s n txid tx m -> s -> m (ServerStIdle n txid tx m ())
continueWithStateM (Nat n -> StatefulM (ServerState txid tx) n txid tx m
forall (n :: N).
Nat n -> StatefulM (ServerState txid tx) n txid tx m
serverIdle Nat n
n) ServerState txid tx
st {
bufferedTxs :: Map txid (Maybe tx)
bufferedTxs = Map txid (Maybe tx)
bufferedTxs3,
unacknowledgedTxIds :: StrictSeq txid
unacknowledgedTxIds = StrictSeq txid
unacknowledgedTxIds',
numTxsToAcknowledge :: Word16
numTxsToAcknowledge = ServerState txid tx -> Word16
forall txid tx. ServerState txid tx -> Word16
numTxsToAcknowledge ServerState txid tx
st
Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
+ Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq txid -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq txid
acknowledgedTxIds)
}
-- | Fold a freshly received batch of txids into the server state.
--
-- Arguments, in order: the current state, the new txids in the order they
-- were received, the same txids keyed to their advertised sizes, and a
-- mempool snapshot (only its 'mempoolHasTx' predicate is consulted).
--
-- The returned state has:
--
-- * 'availableTxids' extended with the new txids that can be requested;
-- * 'bufferedTxs' extended with a 'Nothing' entry for every new txid that is
--   already in the mempool, then pruned of entries that were acknowledged;
-- * 'unacknowledgedTxIds' extended with the new txids, minus the prefix that
--   could be acknowledged;
-- * 'numTxsToAcknowledge' increased by the length of that prefix.
--
-- An empty batch leaves the state untouched.
--
-- NOTE(review): the extracted source listed this signature twice and had
-- hover-tooltip types interleaved with the code; both were removed.
acknowledgeTxIds :: ServerState txid tx
                 -> StrictSeq txid
                 -> Map txid TxSizeInBytes
                 -> MempoolSnapshot txid tx idx
                 -> ServerState txid tx
acknowledgeTxIds st txidsSeq _ _ | Seq.null txidsSeq = st
acknowledgeTxIds st txidsSeq txidsMap MempoolSnapshot{mempoolHasTx} =
    st {
      availableTxids      = availableTxids',
      bufferedTxs         = bufferedTxs'',
      unacknowledgedTxIds = unacknowledgedTxIds'',
      numTxsToAcknowledge = numTxsToAcknowledge st
                          + fromIntegral (Seq.length acknowledgedTxIds)
    }
  where
    -- Split the new txids on mempool membership: 'ignoredTxids' are already
    -- in the mempool, 'availableTxidsMp' are not.
    (ignoredTxids, availableTxidsMp) =
      Map.partitionWithKey
        (\txid _ -> mempoolHasTx txid)
        txidsMap

    -- The new txids that do not already occur in the unacknowledged sequence
    -- of the incoming state.
    availableTxidsU =
      Map.filterWithKey
        (\txid _ -> notElem txid (unacknowledgedTxIds st))
        txidsMap

    -- Only txids passing both filters become available. '<>' on 'Map' is a
    -- left-biased union, so entries already present in the state win.
    availableTxids' =
      availableTxids st <> Map.intersection availableTxidsMp availableTxidsU

    -- Txids we will not request (already in the mempool) are recorded in the
    -- buffer with no transaction attached.
    bufferedTxs' = bufferedTxs st
                <> Map.map (const Nothing) ignoredTxids

    unacknowledgedTxIds' = unacknowledgedTxIds st <> txidsSeq

    -- The longest prefix of the unacknowledged sequence whose txids all have
    -- a buffer entry is acknowledged; the remainder stays unacknowledged.
    (acknowledgedTxIds, unacknowledgedTxIds'') =
      Seq.spanl (`Map.member` bufferedTxs') unacknowledgedTxIds'

    -- Drop the buffer entry of every acknowledged txid, except when the same
    -- txid still occurs in the remaining unacknowledged sequence.  The
    -- result's elements are forced to WHNF.
    bufferedTxs'' =
      forceElemsToWHNF $
        foldl' (\m txid -> if elem txid unacknowledgedTxIds''
                             then m
                             else Map.delete txid m)
               bufferedTxs' acknowledgedTxIds
-- | Build the protocol step that requests a batch of transactions.
--
-- Takes the first @maxTxToRequest@ entries (in key order, as given by
-- 'Map.splitAt') out of 'availableTxids', sends a pipelined request for
-- their txids, and continues with 'serverReqTxIds' at pipelining depth
-- @'Succ' n@ with those entries removed from the state.
--
-- NOTE(review): @maxTxToRequest@ is not defined in this block; presumably it
-- is bound by the enclosing definition — confirm.  The extracted source
-- duplicated the signature and interleaved hover-tooltip types with the
-- code; both were removed.
serverReqTxs :: forall (n :: N).
                Nat n
             -> Stateful (ServerState txid tx) n txid tx m
serverReqTxs n = Stateful $ \st -> do
    let (txsToRequest, availableTxids') =
          Map.splitAt (fromIntegral maxTxToRequest) (availableTxids st)

    SendMsgRequestTxsPipelined
      (Map.keys txsToRequest)
      (continueWithStateM (serverReqTxIds (Succ n)) st {
         availableTxids = availableTxids'
       })
-- | Build the protocol step that (possibly) requests more txids.
--
-- The number of txids to request is the remaining room under @maxUnacked@
-- after subtracting the current unacknowledged txids and the txids already
-- requested but not yet received, capped at @maxTxIdsToRequest@.
--
-- * If that number is positive, a pipelined txid request is sent that also
--   carries the pending 'numTxsToAcknowledge'; the state then records the
--   extra in-flight txids, resets 'numTxsToAcknowledge' to zero and
--   continues with @serverIdle@ at depth @'Succ' n@.
-- * Otherwise nothing is sent and @serverIdle@ continues at depth @n@ with
--   the state unchanged.
--
-- NOTE(review): @maxUnacked@, @maxTxIdsToRequest@ and @serverIdle@ are not
-- defined in this block; presumably they are bound by the enclosing
-- definition — confirm.  The extracted source duplicated the signature and
-- interleaved hover-tooltip types with the code; both were removed.
serverReqTxIds :: forall (n :: N).
                  Nat n
               -> StatefulM (ServerState txid tx) n txid tx m
serverReqTxIds n = StatefulM $ \st -> do
    -- NOTE(review): this is 'Word16' arithmetic, so the subtraction would
    -- wrap around if the unacknowledged plus in-flight txids ever exceeded
    -- maxUnacked; presumably an invariant maintained elsewhere rules that
    -- out — confirm.
    let numTxIdsToRequest =
              (maxUnacked
                - fromIntegral (Seq.length (unacknowledgedTxIds st))
                - requestedTxIdsInFlight st)
              `min` maxTxIdsToRequest

    if numTxIdsToRequest > 0
      then pure $ SendMsgRequestTxIdsPipelined
        (numTxsToAcknowledge st)
        numTxIdsToRequest
        (continueWithStateM (serverIdle (Succ n)) st {
              requestedTxIdsInFlight = requestedTxIdsInFlight st
                                     + numTxIdsToRequest,
              numTxsToAcknowledge    = 0
            })
      else continueWithStateM (serverIdle n) st
-- | A pure continuation: given a state of type @s@, yields the next
-- 'ServerStIdle' protocol step (at pipelining depth @n@).
newtype Stateful s n txid tx m = Stateful (s -> ServerStIdle n txid tx m ())
-- | A monadic continuation: given a state of type @s@, runs an action in @m@
-- that yields the next 'ServerStIdle' protocol step (at pipelining depth
-- @n@).
newtype StatefulM s n txid tx m
  = StatefulM (s -> m (ServerStIdle n txid tx m ()))
-- | A monadic continuation that, in addition to the state of type @s@,
-- receives a collected pipelined result ('Collect') before yielding the next
-- 'ServerStIdle' protocol step (at pipelining depth @n@).
newtype StatefulCollect s n txid tx m
  = StatefulCollect (s -> Collect txid tx -> m (ServerStIdle n txid tx m ()))
-- | Run a 'Stateful' continuation on the given state.
--
-- The state is forced to WHNF by the bang pattern, then inspected with
-- 'unsafeNoThunks'; the rendered thunk report, if any, is handed to
-- 'checkInvariant' together with the continuation's result.
--
-- NOTE(review): presumably 'checkInvariant' only fails on a 'Just' when
-- assertions are enabled — confirm against its definition.  The extracted
-- source duplicated the signature and interleaved hover-tooltip types with
-- the code; both were removed.
continueWithState :: NoThunks s
                  => Stateful s n txid tx m
                  -> s
                  -> ServerStIdle n txid tx m ()
continueWithState (Stateful f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)
-- | Run a 'StatefulM' continuation on the given state.
--
-- The state is forced to WHNF by the bang pattern, then inspected with
-- 'unsafeNoThunks'; the rendered thunk report, if any, is handed to
-- 'checkInvariant' together with the continuation's monadic action.
--
-- NOTE(review): presumably 'checkInvariant' only fails on a 'Just' when
-- assertions are enabled — confirm against its definition.  The extracted
-- source duplicated the signature and interleaved hover-tooltip types with
-- the code; both were removed.
continueWithStateM :: NoThunks s
                   => StatefulM s n txid tx m
                   -> s
                   -> m (ServerStIdle n txid tx m ())
continueWithStateM (StatefulM f) !st =
    checkInvariant (show <$> unsafeNoThunks st) (f st)
-- | Run a 'StatefulCollect' continuation on the given state and collected
-- pipelined result.
--
-- The state is forced to WHNF by the bang pattern, then inspected with
-- 'unsafeNoThunks'; the rendered thunk report, if any, is handed to
-- 'checkInvariant' together with the continuation's monadic action.  The
-- 'Collect' argument is passed through without being checked.
--
-- NOTE(review): presumably 'checkInvariant' only fails on a 'Just' when
-- assertions are enabled — confirm against its definition.  The extracted
-- source duplicated the signature and interleaved hover-tooltip types with
-- the code; both were removed.
collectAndContinueWithState :: NoThunks s
                            => StatefulCollect s n txid tx m
                            -> s
                            -> Collect txid tx
                            -> m (ServerStIdle n txid tx m ())
collectAndContinueWithState (StatefulCollect f) !st c =
    checkInvariant (show <$> unsafeNoThunks st) (f st c)