{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Ouroboros.Network.TxSubmission.Outbound
( txSubmissionOutbound
, TraceTxSubmissionOutbound (..)
, TxSubmissionProtocolError (..)
) where
import Data.Foldable (find)
import qualified Data.List.NonEmpty as NonEmpty
import Data.Maybe (catMaybes, isNothing)
import Data.Sequence.Strict (StrictSeq)
import qualified Data.Sequence.Strict as Seq
import Data.Word (Word16)
import Control.Exception (assert)
import Control.Monad (unless, when)
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow
import Control.Tracer (Tracer, traceWith)
import Ouroboros.Network.Mux (ControlMessage, ControlMessageSTM,
timeoutWithControlMessage)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.TxSubmission2.Client
import Ouroboros.Network.TxSubmission.Mempool.Reader
(MempoolSnapshot (..), TxSubmissionMempoolReader (..))
data TraceTxSubmissionOutbound txid tx
= TraceTxSubmissionOutboundRecvMsgRequestTxs
[txid]
| TraceTxSubmissionOutboundSendMsgReplyTxs
[tx]
| TraceControlMessage ControlMessage
deriving Int -> TraceTxSubmissionOutbound txid tx -> ShowS
[TraceTxSubmissionOutbound txid tx] -> ShowS
TraceTxSubmissionOutbound txid tx -> String
(Int -> TraceTxSubmissionOutbound txid tx -> ShowS)
-> (TraceTxSubmissionOutbound txid tx -> String)
-> ([TraceTxSubmissionOutbound txid tx] -> ShowS)
-> Show (TraceTxSubmissionOutbound txid tx)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall txid tx.
(Show txid, Show tx) =>
Int -> TraceTxSubmissionOutbound txid tx -> ShowS
forall txid tx.
(Show txid, Show tx) =>
[TraceTxSubmissionOutbound txid tx] -> ShowS
forall txid tx.
(Show txid, Show tx) =>
TraceTxSubmissionOutbound txid tx -> String
showList :: [TraceTxSubmissionOutbound txid tx] -> ShowS
$cshowList :: forall txid tx.
(Show txid, Show tx) =>
[TraceTxSubmissionOutbound txid tx] -> ShowS
show :: TraceTxSubmissionOutbound txid tx -> String
$cshow :: forall txid tx.
(Show txid, Show tx) =>
TraceTxSubmissionOutbound txid tx -> String
showsPrec :: Int -> TraceTxSubmissionOutbound txid tx -> ShowS
$cshowsPrec :: forall txid tx.
(Show txid, Show tx) =>
Int -> TraceTxSubmissionOutbound txid tx -> ShowS
Show
data TxSubmissionProtocolError =
ProtocolErrorAckedTooManyTxids
| ProtocolErrorRequestedNothing
| ProtocolErrorRequestedTooManyTxids Word16 Word16
| ProtocolErrorRequestBlocking
| ProtocolErrorRequestNonBlocking
| ProtocolErrorRequestedUnavailableTx
deriving Int -> TxSubmissionProtocolError -> ShowS
[TxSubmissionProtocolError] -> ShowS
TxSubmissionProtocolError -> String
(Int -> TxSubmissionProtocolError -> ShowS)
-> (TxSubmissionProtocolError -> String)
-> ([TxSubmissionProtocolError] -> ShowS)
-> Show TxSubmissionProtocolError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [TxSubmissionProtocolError] -> ShowS
$cshowList :: [TxSubmissionProtocolError] -> ShowS
show :: TxSubmissionProtocolError -> String
$cshow :: TxSubmissionProtocolError -> String
showsPrec :: Int -> TxSubmissionProtocolError -> ShowS
$cshowsPrec :: Int -> TxSubmissionProtocolError -> ShowS
Show
instance Exception TxSubmissionProtocolError where
displayException :: TxSubmissionProtocolError -> String
displayException TxSubmissionProtocolError
ProtocolErrorAckedTooManyTxids =
String
"The peer tried to acknowledged more txids than are available to do so."
displayException (ProtocolErrorRequestedTooManyTxids Word16
reqNo Word16
maxUnacked) =
String
"The peer requested " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Word16 -> String
forall a. Show a => a -> String
show Word16
reqNo String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" txids which would put the "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"total in flight over the limit of " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Word16 -> String
forall a. Show a => a -> String
show Word16
maxUnacked
displayException TxSubmissionProtocolError
ProtocolErrorRequestedNothing =
String
"The peer requested zero txids."
displayException TxSubmissionProtocolError
ProtocolErrorRequestBlocking =
String
"The peer made a blocking request for more txids when there are still "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"unacknowledged txids. It should have used a non-blocking request."
displayException TxSubmissionProtocolError
ProtocolErrorRequestNonBlocking =
String
"The peer made a non-blocking request for more txids when there are "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"no unacknowledged txids. It should have used a blocking request."
displayException TxSubmissionProtocolError
ProtocolErrorRequestedUnavailableTx =
String
"The peer requested a transaction which is not available, either "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"because it was never available or because it was previously requested."
txSubmissionOutbound
:: forall txid tx idx m.
(Ord txid, Ord idx, MonadSTM m, MonadThrow m)
=> Tracer m (TraceTxSubmissionOutbound txid tx)
-> Word16
-> TxSubmissionMempoolReader txid tx idx m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionClient txid tx m ()
txSubmissionOutbound :: Tracer m (TraceTxSubmissionOutbound txid tx)
-> Word16
-> TxSubmissionMempoolReader txid tx idx m
-> NodeToNodeVersion
-> ControlMessageSTM m
-> TxSubmissionClient txid tx m ()
txSubmissionOutbound Tracer m (TraceTxSubmissionOutbound txid tx)
tracer Word16
maxUnacked TxSubmissionMempoolReader{idx
STM m (MempoolSnapshot txid tx idx)
mempoolZeroIdx :: forall txid tx idx (m :: * -> *).
TxSubmissionMempoolReader txid tx idx m -> idx
mempoolGetSnapshot :: forall txid tx idx (m :: * -> *).
TxSubmissionMempoolReader txid tx idx m
-> STM m (MempoolSnapshot txid tx idx)
mempoolZeroIdx :: idx
mempoolGetSnapshot :: STM m (MempoolSnapshot txid tx idx)
..} NodeToNodeVersion
_version ControlMessageSTM m
controlMessageSTM =
m (ClientStIdle txid tx m ()) -> TxSubmissionClient txid tx m ()
forall txid tx (m :: * -> *) a.
m (ClientStIdle txid tx m a) -> TxSubmissionClient txid tx m a
TxSubmissionClient (ClientStIdle txid tx m () -> m (ClientStIdle txid tx m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
client StrictSeq (txid, idx)
forall a. StrictSeq a
Seq.empty idx
mempoolZeroIdx))
where
client :: StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
client :: StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
client !StrictSeq (txid, idx)
unackedSeq !idx
lastIdx =
ClientStIdle :: forall txid tx (m :: * -> *) a.
(forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> Word16 -> Word16 -> m (ClientStTxIds blocking txid tx m a))
-> ([txid] -> m (ClientStTxs txid tx m a))
-> ClientStIdle txid tx m a
ClientStIdle { forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> Word16 -> Word16 -> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds :: forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> Word16 -> Word16 -> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds :: forall (blocking :: StBlockingStyle).
TokBlockingStyle blocking
-> Word16 -> Word16 -> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds, [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs }
where
recvMsgRequestTxIds :: forall blocking.
TokBlockingStyle blocking
-> Word16
-> Word16
-> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds :: TokBlockingStyle blocking
-> Word16 -> Word16 -> m (ClientStTxIds blocking txid tx m ())
recvMsgRequestTxIds TokBlockingStyle blocking
blocking Word16
ackNo Word16
reqNo = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
ackNo Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq (txid, idx) -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq (txid, idx)
unackedSeq)) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorAckedTooManyTxids
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ( Int -> Word16
forall a b. (Integral a, Num b) => a -> b
fromIntegral (StrictSeq (txid, idx) -> Int
forall a. StrictSeq a -> Int
Seq.length StrictSeq (txid, idx)
unackedSeq)
Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
- Word16
ackNo
Word16 -> Word16 -> Word16
forall a. Num a => a -> a -> a
+ Word16
reqNo
Word16 -> Word16 -> Bool
forall a. Ord a => a -> a -> Bool
> Word16
maxUnacked) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (Word16 -> Word16 -> TxSubmissionProtocolError
ProtocolErrorRequestedTooManyTxids Word16
reqNo Word16
maxUnacked)
let !unackedSeq' :: StrictSeq (txid, idx)
unackedSeq' = Int -> StrictSeq (txid, idx) -> StrictSeq (txid, idx)
forall a. Int -> StrictSeq a -> StrictSeq a
Seq.drop (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
ackNo) StrictSeq (txid, idx)
unackedSeq
Maybe [(txid, idx, TxSizeInBytes)]
mbtxs <- case TokBlockingStyle blocking
blocking of
TokBlockingStyle blocking
TokBlocking -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
reqNo Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorRequestedNothing
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (StrictSeq (txid, idx) -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq (txid, idx)
unackedSeq') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorRequestBlocking
ControlMessageSTM m
-> STM m [(txid, idx, TxSizeInBytes)]
-> m (Maybe [(txid, idx, TxSizeInBytes)])
forall (m :: * -> *) a.
MonadSTM m =>
ControlMessageSTM m -> STM m a -> m (Maybe a)
timeoutWithControlMessage ControlMessageSTM m
controlMessageSTM (STM m [(txid, idx, TxSizeInBytes)]
-> m (Maybe [(txid, idx, TxSizeInBytes)]))
-> STM m [(txid, idx, TxSizeInBytes)]
-> m (Maybe [(txid, idx, TxSizeInBytes)])
forall a b. (a -> b) -> a -> b
$
do
MempoolSnapshot{idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter :: forall txid tx idx.
MempoolSnapshot txid tx idx -> idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter :: idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter} <- STM m (MempoolSnapshot txid tx idx)
mempoolGetSnapshot
let txs :: [(txid, idx, TxSizeInBytes)]
txs = idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter idx
lastIdx
Bool -> STM m ()
forall (m :: * -> *). MonadSTM m => Bool -> STM m ()
check (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ [(txid, idx, TxSizeInBytes)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(txid, idx, TxSizeInBytes)]
txs)
[(txid, idx, TxSizeInBytes)] -> STM m [(txid, idx, TxSizeInBytes)]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int -> [(txid, idx, TxSizeInBytes)] -> [(txid, idx, TxSizeInBytes)]
forall a. Int -> [a] -> [a]
take (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
reqNo) [(txid, idx, TxSizeInBytes)]
txs)
TokBlockingStyle blocking
TokNonBlocking -> do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Word16
reqNo Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
0 Bool -> Bool -> Bool
&& Word16
ackNo Word16 -> Word16 -> Bool
forall a. Eq a => a -> a -> Bool
== Word16
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorRequestedNothing
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (StrictSeq (txid, idx) -> Bool
forall a. StrictSeq a -> Bool
Seq.null StrictSeq (txid, idx)
unackedSeq') (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorRequestNonBlocking
STM m (Maybe [(txid, idx, TxSizeInBytes)])
-> m (Maybe [(txid, idx, TxSizeInBytes)])
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m (Maybe [(txid, idx, TxSizeInBytes)])
-> m (Maybe [(txid, idx, TxSizeInBytes)]))
-> STM m (Maybe [(txid, idx, TxSizeInBytes)])
-> m (Maybe [(txid, idx, TxSizeInBytes)])
forall a b. (a -> b) -> a -> b
$ do
MempoolSnapshot{idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter :: idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter :: forall txid tx idx.
MempoolSnapshot txid tx idx -> idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter} <- STM m (MempoolSnapshot txid tx idx)
mempoolGetSnapshot
let txs :: [(txid, idx, TxSizeInBytes)]
txs = idx -> [(txid, idx, TxSizeInBytes)]
mempoolTxIdsAfter idx
lastIdx
Maybe [(txid, idx, TxSizeInBytes)]
-> STM m (Maybe [(txid, idx, TxSizeInBytes)])
forall (m :: * -> *) a. Monad m => a -> m a
return ([(txid, idx, TxSizeInBytes)] -> Maybe [(txid, idx, TxSizeInBytes)]
forall a. a -> Maybe a
Just ([(txid, idx, TxSizeInBytes)]
-> Maybe [(txid, idx, TxSizeInBytes)])
-> [(txid, idx, TxSizeInBytes)]
-> Maybe [(txid, idx, TxSizeInBytes)]
forall a b. (a -> b) -> a -> b
$ Int -> [(txid, idx, TxSizeInBytes)] -> [(txid, idx, TxSizeInBytes)]
forall a. Int -> [a] -> [a]
take (Word16 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word16
reqNo) [(txid, idx, TxSizeInBytes)]
txs)
ClientStTxIds blocking txid tx m ()
-> m (ClientStTxIds blocking txid tx m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientStTxIds blocking txid tx m ()
-> m (ClientStTxIds blocking txid tx m ()))
-> ClientStTxIds blocking txid tx m ()
-> m (ClientStTxIds blocking txid tx m ())
forall a b. (a -> b) -> a -> b
$! case (Maybe [(txid, idx, TxSizeInBytes)]
mbtxs, TokBlockingStyle blocking
blocking) of
(Maybe [(txid, idx, TxSizeInBytes)]
Nothing, TokBlockingStyle blocking
TokBlocking) -> () -> ClientStTxIds 'StBlocking txid tx m ()
forall a txid tx (m :: * -> *).
a -> ClientStTxIds 'StBlocking txid tx m a
SendMsgDone ()
(Maybe [(txid, idx, TxSizeInBytes)]
Nothing, TokBlockingStyle blocking
TokNonBlocking) -> String -> ClientStTxIds blocking txid tx m ()
forall a. HasCallStack => String -> a
error String
"txSubmissionOutbound: impossible happend!"
(Just [(txid, idx, TxSizeInBytes)]
txs, TokBlockingStyle blocking
_) ->
Bool
-> ClientStTxIds blocking txid tx m ()
-> ClientStTxIds blocking txid tx m ()
forall a. HasCallStack => Bool -> a -> a
assert (((txid, idx, TxSizeInBytes) -> Bool)
-> [(txid, idx, TxSizeInBytes)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all (\(txid
_, idx
idx, TxSizeInBytes
_) -> idx
idx idx -> idx -> Bool
forall a. Ord a => a -> a -> Bool
> idx
lastIdx) [(txid, idx, TxSizeInBytes)]
txs) (ClientStTxIds blocking txid tx m ()
-> ClientStTxIds blocking txid tx m ())
-> ClientStTxIds blocking txid tx m ()
-> ClientStTxIds blocking txid tx m ()
forall a b. (a -> b) -> a -> b
$
let !unackedSeq'' :: StrictSeq (txid, idx)
unackedSeq'' = StrictSeq (txid, idx)
unackedSeq' StrictSeq (txid, idx)
-> StrictSeq (txid, idx) -> StrictSeq (txid, idx)
forall a. Semigroup a => a -> a -> a
<> [(txid, idx)] -> StrictSeq (txid, idx)
forall a. [a] -> StrictSeq a
Seq.fromList
[ (txid
txid, idx
idx) | (txid
txid, idx
idx, TxSizeInBytes
_) <- [(txid, idx, TxSizeInBytes)]
txs ]
!lastIdx' :: idx
lastIdx'
| [(txid, idx, TxSizeInBytes)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(txid, idx, TxSizeInBytes)]
txs = idx
lastIdx
| Bool
otherwise = idx
idx where (txid
_, idx
idx, TxSizeInBytes
_) = [(txid, idx, TxSizeInBytes)] -> (txid, idx, TxSizeInBytes)
forall a. [a] -> a
last [(txid, idx, TxSizeInBytes)]
txs
txs' :: [(txid, TxSizeInBytes)]
txs' :: [(txid, TxSizeInBytes)]
txs' = [ (txid
txid, TxSizeInBytes
size) | (txid
txid, idx
_, TxSizeInBytes
size) <- [(txid, idx, TxSizeInBytes)]
txs ]
client' :: ClientStIdle txid tx m ()
client' = StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
client StrictSeq (txid, idx)
unackedSeq'' idx
lastIdx'
in case TokBlockingStyle blocking
blocking of
TokBlockingStyle blocking
TokNonBlocking -> BlockingReplyList 'StNonBlocking (txid, TxSizeInBytes)
-> ClientStIdle txid tx m ()
-> ClientStTxIds 'StNonBlocking txid tx m ()
forall (blocking :: StBlockingStyle) txid tx (m :: * -> *) a.
BlockingReplyList blocking (txid, TxSizeInBytes)
-> ClientStIdle txid tx m a -> ClientStTxIds blocking txid tx m a
SendMsgReplyTxIds ([(txid, TxSizeInBytes)]
-> BlockingReplyList 'StNonBlocking (txid, TxSizeInBytes)
forall a. [a] -> BlockingReplyList 'StNonBlocking a
NonBlockingReply [(txid, TxSizeInBytes)]
txs') ClientStIdle txid tx m ()
client'
TokBlockingStyle blocking
TokBlocking -> BlockingReplyList 'StBlocking (txid, TxSizeInBytes)
-> ClientStIdle txid tx m ()
-> ClientStTxIds 'StBlocking txid tx m ()
forall (blocking :: StBlockingStyle) txid tx (m :: * -> *) a.
BlockingReplyList blocking (txid, TxSizeInBytes)
-> ClientStIdle txid tx m a -> ClientStTxIds blocking txid tx m a
SendMsgReplyTxIds (NonEmpty (txid, TxSizeInBytes)
-> BlockingReplyList 'StBlocking (txid, TxSizeInBytes)
forall a. NonEmpty a -> BlockingReplyList 'StBlocking a
BlockingReply NonEmpty (txid, TxSizeInBytes)
txs'') ClientStIdle txid tx m ()
client'
where
txs'' :: NonEmpty (txid, TxSizeInBytes)
txs'' = case [(txid, TxSizeInBytes)] -> Maybe (NonEmpty (txid, TxSizeInBytes))
forall a. [a] -> Maybe (NonEmpty a)
NonEmpty.nonEmpty [(txid, TxSizeInBytes)]
txs' of
Just NonEmpty (txid, TxSizeInBytes)
x -> NonEmpty (txid, TxSizeInBytes)
x
Maybe (NonEmpty (txid, TxSizeInBytes))
Nothing -> String -> NonEmpty (txid, TxSizeInBytes)
forall a. HasCallStack => String -> a
error String
"txSubmissionOutbound: empty transaction's list"
recvMsgRequestTxs :: [txid]
-> m (ClientStTxs txid tx m ())
recvMsgRequestTxs :: [txid] -> m (ClientStTxs txid tx m ())
recvMsgRequestTxs [txid]
txids = do
Tracer m (TraceTxSubmissionOutbound txid tx)
-> TraceTxSubmissionOutbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionOutbound txid tx)
tracer ([txid] -> TraceTxSubmissionOutbound txid tx
forall txid tx. [txid] -> TraceTxSubmissionOutbound txid tx
TraceTxSubmissionOutboundRecvMsgRequestTxs [txid]
txids)
MempoolSnapshot{idx -> Maybe tx
mempoolLookupTx :: forall txid tx idx. MempoolSnapshot txid tx idx -> idx -> Maybe tx
mempoolLookupTx :: idx -> Maybe tx
mempoolLookupTx} <- STM m (MempoolSnapshot txid tx idx)
-> m (MempoolSnapshot txid tx idx)
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (MempoolSnapshot txid tx idx)
mempoolGetSnapshot
let txidxs :: [Maybe (txid, idx)]
txidxs = [ ((txid, idx) -> Bool) -> StrictSeq (txid, idx) -> Maybe (txid, idx)
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find (\(txid
t,idx
_) -> txid
t txid -> txid -> Bool
forall a. Eq a => a -> a -> Bool
== txid
txid) StrictSeq (txid, idx)
unackedSeq | txid
txid <- [txid]
txids ]
txidxs' :: [idx]
txidxs' = ((txid, idx) -> idx) -> [(txid, idx)] -> [idx]
forall a b. (a -> b) -> [a] -> [b]
map (txid, idx) -> idx
forall a b. (a, b) -> b
snd ([(txid, idx)] -> [idx]) -> [(txid, idx)] -> [idx]
forall a b. (a -> b) -> a -> b
$ [Maybe (txid, idx)] -> [(txid, idx)]
forall a. [Maybe a] -> [a]
catMaybes [Maybe (txid, idx)]
txidxs
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ((Maybe (txid, idx) -> Bool) -> [Maybe (txid, idx)] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
any Maybe (txid, idx) -> Bool
forall a. Maybe a -> Bool
isNothing [Maybe (txid, idx)]
txidxs) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
TxSubmissionProtocolError -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO TxSubmissionProtocolError
ProtocolErrorRequestedUnavailableTx
let txs :: [tx]
txs = [Maybe tx] -> [tx]
forall a. [Maybe a] -> [a]
catMaybes ((idx -> Maybe tx) -> [idx] -> [Maybe tx]
forall a b. (a -> b) -> [a] -> [b]
map idx -> Maybe tx
mempoolLookupTx [idx]
txidxs')
client' :: ClientStIdle txid tx m ()
client' = StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
client StrictSeq (txid, idx)
unackedSeq idx
lastIdx
Tracer m (TraceTxSubmissionOutbound txid tx)
-> TraceTxSubmissionOutbound txid tx -> m ()
forall (m :: * -> *) a. Tracer m a -> a -> m ()
traceWith Tracer m (TraceTxSubmissionOutbound txid tx)
tracer ([tx] -> TraceTxSubmissionOutbound txid tx
forall txid tx. [tx] -> TraceTxSubmissionOutbound txid tx
TraceTxSubmissionOutboundSendMsgReplyTxs [tx]
txs)
ClientStTxs txid tx m () -> m (ClientStTxs txid tx m ())
forall (m :: * -> *) a. Monad m => a -> m a
return (ClientStTxs txid tx m () -> m (ClientStTxs txid tx m ()))
-> ClientStTxs txid tx m () -> m (ClientStTxs txid tx m ())
forall a b. (a -> b) -> a -> b
$ [tx] -> ClientStIdle txid tx m () -> ClientStTxs txid tx m ()
forall tx txid (m :: * -> *) a.
[tx] -> ClientStIdle txid tx m a -> ClientStTxs txid tx m a
SendMsgReplyTxs [tx]
txs ClientStIdle txid tx m ()
client'