{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.Channel
  ( Channel (..)
  , toChannel
  , fromChannel
  , createPipeConnectedChannels
  , hoistChannel
  , isoKleisliChannel
  , fixedInputChannel
  , mvarsAsChannel
  , handlesAsChannel
  , createConnectedChannels
  , createConnectedBufferedChannels
  , createConnectedBufferedChannelsSTM
  , createPipelineTestChannels
  , channelEffect
  , delayChannel
  , loggingChannel
  ) where

import           Control.Monad ((>=>))
import           Control.Monad.Class.MonadSay
import           Control.Monad.Class.MonadTimer
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import           Data.ByteString.Lazy.Internal (smallChunkSize)
import           Numeric.Natural

import qualified System.IO as IO (Handle, hFlush, hIsEOF)

import           Control.Monad.Class.MonadSTM

import qualified Network.Mux.Channel as Mx


-- | One end of a duplex channel. It is a reliable, ordered channel of some
-- medium. The medium does not imply message boundaries, it can be just bytes.
--
data Channel m a = Channel {

       -- | Write output to the channel.
       --
       -- It may raise exceptions (as appropriate for the monad and kind of
       -- channel).
       --
       send :: a -> m (),

       -- | Read some input from the channel, or @Nothing@ to indicate EOF.
       --
       -- Note that having received EOF it is still possible to send.
       -- The EOF condition is however monotonic.
       --
       -- It may raise exceptions (as appropriate for the monad and kind of
       -- channel).
       --
       recv :: m (Maybe a)
     }

-- TODO: eliminate the second Channel type and these conversion functions.

-- | View a mux 'Mx.Channel' as a 'Channel' of lazy 'LBS.ByteString's.
--
-- The @send@ and @recv@ actions of the argument are reused unchanged.
--
fromChannel :: Mx.Channel m
            -> Channel m LBS.ByteString
fromChannel Mx.Channel { Mx.send, Mx.recv } = Channel {
    send = send,
    recv = recv
  }

-- | View a 'Channel' of lazy 'LBS.ByteString's as a mux 'Mx.Channel'.
--
-- The @send@ and @recv@ actions of the argument are reused unchanged; this
-- is the inverse of 'fromChannel'.
--
toChannel :: Channel m LBS.ByteString
          -> Mx.Channel m
toChannel Channel { send, recv } = Mx.Channel {
    Mx.send = send,
    Mx.recv = recv
  }

-- | Create a local pipe, with both ends in this process, and expose that as
-- a pair of 'Channel's, one for each end.
--
-- This is primarily for testing purposes since it does not allow actual IPC.
--
-- Both ends are obtained from 'Mx.createPipeConnectedChannels' and converted
-- with 'fromChannel'.
--
createPipeConnectedChannels :: IO (Channel IO LBS.ByteString,
                                   Channel IO LBS.ByteString)
createPipeConnectedChannels =
    (\(a, b) -> (fromChannel a, fromChannel b))
    <$> Mx.createPipeConnectedChannels

-- | Given an isomorphism between @a@ and @b@ (in Kleisli category), transform
-- a @'Channel' m a@ into @'Channel' m b@.
--
-- Outgoing @b@ values are converted with the second function before being
-- passed to the underlying 'send'; incoming @a@ values are converted with the
-- first function after the underlying 'recv'. An EOF (@Nothing@) is passed
-- through without running the conversion.
--
isoKleisliChannel
  :: forall a b m. Monad m
  => (a -> m b)
  -> (b -> m a)
  -> Channel m a
  -> Channel m b
isoKleisliChannel f finv Channel{send, recv} = Channel {
    send = finv >=> send,
    recv = recv >>= traverse f
  }


-- | Change the monad of a 'Channel' by applying a natural transformation to
-- both its 'send' and 'recv' actions.
--
hoistChannel
  :: (forall x . m x -> n x)
  -> Channel m a
  -> Channel n a
hoistChannel nat channel = Channel
  { send = nat . send channel
  , recv = nat (recv channel)
  }

-- | A 'Channel' with a fixed input, and where all output is discarded.
--
-- The input is guaranteed to be supplied via 'recv' with the given chunk
-- boundaries: each call to 'recv' yields the next element of the list, and
-- @Nothing@ once the list is exhausted.
--
-- This is only useful for testing. In particular the fixed chunk boundaries
-- can be used to test that framing and other codecs work with any possible
-- chunking.
--
fixedInputChannel :: MonadSTM m => [a] -> m (Channel m a)
fixedInputChannel xs0 = do
    v <- atomically $ newTVar xs0
    return Channel {send, recv = recv v}
  where
    -- Pop the next chunk off the remaining input held in the 'TVar'.
    recv v = atomically $ do
               xs <- readTVar v
               case xs of
                 []      -> return Nothing
                 (x:xs') -> writeTVar v xs' >> return (Just x)

    -- All output is silently dropped.
    send _ = return ()


-- | Make a 'Channel' from a pair of 'TMVar's, one for reading and one for
-- writing.
--
-- 'send' blocks until the write buffer is empty; 'recv' blocks until the read
-- buffer is full. 'recv' never reports EOF: it always returns a @Just@.
--
mvarsAsChannel :: MonadSTM m
               => TMVar m a
               -> TMVar m a
               -> Channel m a
mvarsAsChannel bufferRead bufferWrite =
    Channel{send, recv}
  where
    send x = atomically (putTMVar bufferWrite x)
    recv   = atomically (Just <$> takeTMVar bufferRead)


-- | Create a pair of channels that are connected via one-place buffers.
--
-- This is primarily useful for testing protocols.
--
createConnectedChannels :: MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels = do
    -- Create two TMVars to act as the channel buffer (one for each direction)
    -- and use them to make both ends of a bidirectional channel
    bufferA <- atomically newEmptyTMVar
    bufferB <- atomically newEmptyTMVar

    -- Each end reads from the buffer the other end writes to.
    return (mvarsAsChannel bufferB bufferA,
            mvarsAsChannel bufferA bufferB)


-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /blocks/ when 'send' would exceed the maximum buffer size.
-- Use this variant when you want the environment rather than the 'Peer' to
-- limit the pipelining.
--
-- This is primarily useful for testing protocols.
--
createConnectedBufferedChannels :: forall m a. MonadSTM m
                                => Natural -> m (Channel m a, Channel m a)
createConnectedBufferedChannels sz = do
    (chan1, chan2) <- atomically $ createConnectedBufferedChannelsSTM sz
    pure (wrap chan1, wrap chan2)
  where
    -- Run each 'STM' channel operation as its own transaction in @m@.
    wrap :: Channel (STM m) a -> Channel m a
    wrap Channel{send, recv} = Channel
      { send = atomically . send
      , recv = atomically recv
      }

-- | As 'createConnectedBufferedChannels', but in 'STM'.
--
-- The returned channels run their 'send' and 'recv' directly in @'STM' m@;
-- 'recv' never reports EOF.
--
-- TODO: it should return a pair of `Channel m a`.
createConnectedBufferedChannelsSTM :: MonadSTM m
                                   => Natural -> STM m (Channel (STM m) a, Channel (STM m) a)
createConnectedBufferedChannelsSTM sz = do
    -- Create two TBQueues to act as the channel buffers (one for each
    -- direction) and use them to make both ends of a bidirectional channel
    bufferA <- newTBQueue sz
    bufferB <- newTBQueue sz

    return (queuesAsChannel bufferB bufferA,
            queuesAsChannel bufferA bufferB)
  where
    queuesAsChannel bufferRead bufferWrite =
        Channel{send, recv}
      where
        send x = writeTBQueue bufferWrite x
        recv   = Just <$> readTBQueue bufferRead


-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /fails/ (by calling 'error') when 'send' would exceed the
-- maximum buffer size.
-- Use this variant when you want the 'PeerPipelined' to limit the pipelining
-- itself, and you want to check that it does not exceed the expected level of
-- pipelining.
--
-- This is primarily useful for testing protocols.
--
createPipelineTestChannels :: MonadSTM m
                           => Natural -> m (Channel m a, Channel m a)
createPipelineTestChannels sz = do
    -- Create two TBQueues to act as the channel buffers (one for each
    -- direction) and use them to make both ends of a bidirectional channel
    bufferA <- atomically $ newTBQueue sz
    bufferB <- atomically $ newTBQueue sz

    return (queuesAsChannel bufferB bufferA,
            queuesAsChannel bufferA bufferB)
  where
    queuesAsChannel bufferRead bufferWrite =
        Channel{send, recv}
      where
        -- Check for a full queue and write in the same transaction, so a
        -- full buffer is reported as a failure instead of blocking.
        send x = atomically $ do
                   full <- isFullTBQueue bufferWrite
                   if full then error failureMsg
                           else writeTBQueue bufferWrite x
        recv   = atomically (Just <$> readTBQueue bufferRead)

    failureMsg = "createPipelineTestChannels: "
              ++ "maximum pipeline depth exceeded: " ++ show sz


-- | Make a 'Channel' from a pair of IO 'Handle's, one for reading and one
-- for writing.
--
-- The Handles should be open in the appropriate read or write mode, and in
-- binary mode. Writes are flushed after each write, so it is safe to use
-- a buffering mode.
--
-- For bidirectional handles it is safe to pass the same handle for both.
--
handlesAsChannel :: IO.Handle -- ^ Read handle
                 -> IO.Handle -- ^ Write handle
                 -> Channel IO LBS.ByteString
handlesAsChannel hndRead hndWrite =
    Channel{send, recv}
  where
    send :: LBS.ByteString -> IO ()
    send chunk = do
      LBS.hPut hndWrite chunk
      IO.hFlush hndWrite

    -- Returns 'Nothing' at EOF, otherwise whatever 'BS.hGetSome' yields for
    -- a request of up to 'smallChunkSize' bytes, as a lazy bytestring.
    recv :: IO (Maybe LBS.ByteString)
    recv = do
      eof <- IO.hIsEOF hndRead
      if eof
        then return Nothing
        else Just . LBS.fromStrict <$> BS.hGetSome hndRead smallChunkSize


-- | Transform a channel to add an extra action before /every/ send and after
-- /every/ receive.
--
-- The receive action is also run when the underlying 'recv' reports EOF; it
-- is then given @Nothing@.
--
channelEffect :: forall m a.
                 Monad m
              => (a -> m ())        -- ^ Action before 'send'
              -> (Maybe a -> m ())  -- ^ Action after 'recv'
              -> Channel m a
              -> Channel m a
channelEffect beforeSend afterRecv Channel{send, recv} =
    Channel{
      send = \x -> do
        beforeSend x
        send x

    , recv = do
        mx <- recv
        afterRecv mx
        return mx
    }

-- | Delay a channel on the receiver end.
--
-- Every 'recv' waits for the given delay after the underlying 'recv' has
-- returned, including when it returned EOF. 'send' is not delayed.
--
-- This is intended for testing, as a crude approximation of network delays.
-- More accurate models along these lines are of course possible.
--
delayChannel :: ( MonadSTM m
                , MonadTimer m
                )
             => DiffTime
             -> Channel m a
             -> Channel m a
delayChannel delay = channelEffect (\_ -> return ())
                                   (\_ -> threadDelay delay)


-- | Channel which logs sent and received messages.
--
-- Messages are logged with 'say', prefixed by the shown identifier and
-- @:send:@ or @:recv:@. A sent message is logged before it is passed to the
-- underlying 'send'; a received message is logged after the underlying
-- 'recv' returns. EOF is not logged.
--
loggingChannel :: ( MonadSay m
                  , Show id
                  , Show a
                  )
               => id
               -> Channel m a
               -> Channel m a
loggingChannel ident Channel{send,recv} =
  Channel {
    send = loggingSend,
    recv = loggingRecv
  }
 where
  loggingSend a = do
    say (show ident ++ ":send:" ++ show a)
    send a

  loggingRecv = do
    msg <- recv
    case msg of
      Nothing -> return ()
      Just a  -> say (show ident ++ ":recv:" ++ show a)
    return msg