{-# LANGUAGE BangPatterns        #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.TxSubmission.Outbound
  ( txSubmissionOutbound
  , TraceTxSubmissionOutbound (..)
  , TxSubmissionProtocolError (..)
  ) where

import           Data.Foldable (find)
import qualified Data.List.NonEmpty as NonEmpty
import           Data.Maybe (catMaybes, isNothing)
import           Data.Sequence.Strict (StrictSeq)
import qualified Data.Sequence.Strict as Seq
import           Data.Word (Word16)

import           Control.Exception (assert)
import           Control.Monad (unless, when)
import           Control.Monad.Class.MonadSTM
import           Control.Monad.Class.MonadThrow
import           Control.Tracer (Tracer, traceWith)

import           Ouroboros.Network.Mux (ControlMessage, ControlMessageSTM,
                     timeoutWithControlMessage)
import           Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import           Ouroboros.Network.Protocol.TxSubmission2.Client
import           Ouroboros.Network.TxSubmission.Mempool.Reader
                     (MempoolSnapshot (..), TxSubmissionMempoolReader (..))


-- | Trace events for the outbound side of the tx-submission protocol.
--
-- The first two constructors are emitted by 'txSubmissionOutbound' when it
-- handles a request for transactions.
data TraceTxSubmissionOutbound txid tx
  = -- | The IDs of the transactions requested.
    TraceTxSubmissionOutboundRecvMsgRequestTxs [txid]
  | -- | The transactions to be sent in the response.
    TraceTxSubmissionOutboundSendMsgReplyTxs [tx]
  | -- | Wraps a 'ControlMessage'.
    --
    -- NOTE(review): not traced anywhere in the visible part of this module;
    -- presumably used by other components - confirm.
    TraceControlMessage ControlMessage
  deriving (Show)

-- | Protocol violations committed by the remote peer.  Each of them is
-- thrown as an exception by 'txSubmissionOutbound'.
data TxSubmissionProtocolError
  = -- | The peer acknowledged more txids than are currently unacknowledged.
    ProtocolErrorAckedTooManyTxids
  | -- | The peer requested zero txids.
    ProtocolErrorRequestedNothing
  | -- | The request would put the number of unacknowledged txids over the
    -- limit.  Fields: the number of txids requested, and the limit.
    ProtocolErrorRequestedTooManyTxids Word16 Word16
  | -- | A blocking request was made while txids were still unacknowledged.
    ProtocolErrorRequestBlocking
  | -- | A non-blocking request was made while no txids were unacknowledged.
    ProtocolErrorRequestNonBlocking
  | -- | The peer requested a transaction whose txid is not among the
    -- unacknowledged ones.
    ProtocolErrorRequestedUnavailableTx
  deriving (Show)

-- | Renders each protocol violation as a human-readable sentence.
instance Exception TxSubmissionProtocolError where
  displayException err = case err of
    ProtocolErrorAckedTooManyTxids ->
      "The peer tried to acknowledged more txids than are available to do so."

    -- The first field is the number of txids requested, the second the limit.
    ProtocolErrorRequestedTooManyTxids reqNo maxUnacked ->
      concat
        [ "The peer requested "
        , show reqNo
        , " txids which would put the "
        , "total in flight over the limit of "
        , show maxUnacked
        ]

    ProtocolErrorRequestedNothing ->
      "The peer requested zero txids."

    ProtocolErrorRequestBlocking ->
      "The peer made a blocking request for more txids when there are still "
        <> "unacknowledged txids. It should have used a non-blocking request."

    ProtocolErrorRequestNonBlocking ->
      "The peer made a non-blocking request for more txids when there are "
        <> "no unacknowledged txids. It should have used a blocking request."

    ProtocolErrorRequestedUnavailableTx ->
      "The peer requested a transaction which is not available, either "
        <> "because it was never available or because it was previously requested."


-- | The outbound (client) side of the tx-submission protocol.
--
-- The client keeps two pieces of state between requests:
--
-- * the sequence of txids (with their mempool index) that were sent to the
--   peer and not yet acknowledged by it, oldest first;
--
-- * the mempool index of the last txid that was sent.
--
-- Requests that violate the protocol make the handler throw a
-- 'TxSubmissionProtocolError'.
txSubmissionOutbound
  :: forall txid tx idx m.
     (Ord txid, Ord idx, MonadSTM m, MonadThrow m)
  => Tracer m (TraceTxSubmissionOutbound txid tx)
  -> Word16         -- ^ Maximum number of unacknowledged txids allowed
  -> TxSubmissionMempoolReader txid tx idx m
  -> NodeToNodeVersion
  -> ControlMessageSTM m
  -> TxSubmissionClient txid tx m ()
txSubmissionOutbound tracer maxUnacked TxSubmissionMempoolReader{..} _version controlMessageSTM =
    -- Start with nothing unacknowledged, at the mempool's zero index.
    TxSubmissionClient (pure (client Seq.empty mempoolZeroIdx))
  where
    -- The idle state, parametrised by the unacknowledged txids and the index
    -- of the last txid sent so far.
    client :: StrictSeq (txid, idx) -> idx -> ClientStIdle txid tx m ()
    client !unackedSeq !lastIdx =
        ClientStIdle { recvMsgRequestTxIds, recvMsgRequestTxs }
      where
        -- Handle a request for more txids: @ackNo@ is the number of txids
        -- the peer acknowledges, @reqNo@ the number of new txids it asks for.
        recvMsgRequestTxIds :: forall blocking.
                               TokBlockingStyle blocking
                            -> Word16
                            -> Word16
                            -> m (ClientStTxIds blocking txid tx m ())
        recvMsgRequestTxIds blocking ackNo reqNo = do

          -- Both limit checks are done in 'Int'.  'Word16' arithmetic wraps
          -- around modulo 2^16, so a large 'reqNo' (e.g. 65535 with a few
          -- txids still unacknowledged) would make the sum wrap to a small
          -- number and slip past the 'maxUnacked' limit.
          let unackedNo :: Int
              unackedNo = Seq.length unackedSeq

          when (fromIntegral ackNo > unackedNo) $
            throwIO ProtocolErrorAckedTooManyTxids

          -- 'ackNo' cannot exceed 'unackedNo' here, so the difference is
          -- non-negative.
          when (  unackedNo
                - fromIntegral ackNo
                + fromIntegral reqNo
                > fromIntegral maxUnacked) $
            throwIO (ProtocolErrorRequestedTooManyTxids reqNo maxUnacked)

          -- Update our tracking state to remove the number of txids that the
          -- peer has acknowledged.
          let !unackedSeq' = Seq.drop (fromIntegral ackNo) unackedSeq

          -- Grab info about any new txs after the last tx idx we've seen,
          -- up to the number that the peer has requested.
          mbtxs <- case blocking of
            TokBlocking -> do
              when (reqNo == 0) $
                throwIO ProtocolErrorRequestedNothing
              -- A blocking request is only valid once everything has been
              -- acknowledged.
              unless (Seq.null unackedSeq') $
                throwIO ProtocolErrorRequestBlocking

              -- Retry until the mempool has something new.  A 'Nothing'
              -- result ends the protocol below; presumably it means the
              -- control message asked us to stop - confirm against
              -- 'timeoutWithControlMessage'.
              timeoutWithControlMessage controlMessageSTM $
                do
                  MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
                  let txs = mempoolTxIdsAfter lastIdx
                  check (not $ null txs)
                  pure (take (fromIntegral reqNo) txs)

            TokNonBlocking -> do
              when (reqNo == 0 && ackNo == 0) $
                throwIO ProtocolErrorRequestedNothing
              -- A non-blocking request is only valid while some txids
              -- remain unacknowledged.
              when (Seq.null unackedSeq') $
                throwIO ProtocolErrorRequestNonBlocking

              -- Reply with whatever is available right now, possibly nothing.
              atomically $ do
                MempoolSnapshot{mempoolTxIdsAfter} <- mempoolGetSnapshot
                let txs = mempoolTxIdsAfter lastIdx
                return (Just $ take (fromIntegral reqNo) txs)

          return $! case (mbtxs, blocking) of
            (Nothing, TokBlocking)    -> SendMsgDone ()
            -- The non-blocking branch above always returns 'Just'.
            (Nothing, TokNonBlocking) -> error "txSubmissionOutbound: impossible happend!"
            (Just txs, _) ->
              -- These txs should all be fresh
              assert (all (\(_, idx, _) -> idx > lastIdx) txs) $
                -- Update our tracking state with any extra txs available.
                let !unackedSeq'' = unackedSeq' <> Seq.fromList
                                      [ (txid, idx) | (txid, idx, _) <- txs ]
                    !lastIdx'
                      | null txs  = lastIdx
                      | otherwise = idx where (_, idx, _) = last txs
                    txs'         :: [(txid, TxSizeInBytes)]
                    txs'          = [ (txid, size) | (txid, _, size) <- txs ]
                    client'       = client unackedSeq'' lastIdx'

                -- Our reply type is different in the blocking vs non-blocking cases
                in case blocking of
                    TokNonBlocking -> SendMsgReplyTxIds (NonBlockingReply txs') client'
                    TokBlocking    -> SendMsgReplyTxIds (BlockingReply   txs'') client'
                      where
                        txs'' = case NonEmpty.nonEmpty txs' of
                          Just  x -> x
                          Nothing -> error "txSubmissionOutbound: empty transaction's list"
                        -- Assert txs is non-empty: we blocked until txs was non-null,
                        -- and we know reqNo > 0, hence take reqNo txs is non-null.


        -- Handle a request for the transactions with the given txids.
        recvMsgRequestTxs :: [txid]
                          -> m (ClientStTxs txid tx m ())
        recvMsgRequestTxs txids = do
          -- Trace the IDs of the transactions requested.
          traceWith tracer (TraceTxSubmissionOutboundRecvMsgRequestTxs txids)

          MempoolSnapshot{mempoolLookupTx} <- atomically mempoolGetSnapshot

          -- Map each requested txid to its entry in the unacknowledged
          -- window; 'Nothing' marks a txid that is not in the window.
          -- The window size is expected to be small (currently 10) so the find is acceptable.
          let txidxs  = [ find (\(t,_) -> t == txid) unackedSeq | txid <- txids ]
              txidxs' = map snd $ catMaybes txidxs

          when (any isNothing txidxs) $
            throwIO ProtocolErrorRequestedUnavailableTx

          -- The 'mempoolLookupTx' will return nothing if the transaction is no
          -- longer in the mempool. This is good. Neither the sending nor
          -- receiving side wants to forward txs that are no longer of interest.
          let txs          = catMaybes (map mempoolLookupTx txidxs')
              -- Serving txs does not change the tracking state.
              client'      = client unackedSeq lastIdx

          -- Trace the transactions to be sent in the response.
          traceWith tracer (TraceTxSubmissionOutboundSendMsgReplyTxs txs)

          return $ SendMsgReplyTxs txs client'