{-# LANGUAGE DeriveFunctor         #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE StandaloneDeriving    #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE UndecidableInstances  #-}

-- | This module contains peer state management and error policies.
--
module Ouroboros.Network.Subscription.PeerState
  ( SuspendDecision (..)
  , suspend
    -- * PeerStates and its operations
  , PeerState (..)
  , threadsToCancel
  , PeerStates (..)
  , newPeerStatesVar
  , newPeerStatesVarSTM
  , cleanPeerStates
  , runSuspendDecision
  , registerConsumer
  , unregisterConsumer
  , registerProducer
  , unregisterProducer
  , BeforeConnect
  , ConnectDecision (..)
  , runBeforeConnect
  , beforeConnectTx
    -- * Re-exports
  , DiffTime
    -- * Auxiliary functions
  , alterAndLookup
  ) where

import           Control.Exception (Exception, SomeException (..), assert)
import           Control.Monad.State
import qualified Data.Map as Map
import           Data.Map.Strict (Map)
import           Data.Set (Set)
import qualified Data.Set as Set
import           Data.Typeable (eqT, (:~:) (..))

import           Control.Monad.Class.MonadAsync
import           Control.Monad.Class.MonadFork
import           Control.Monad.Class.MonadSTM.Strict
import           Control.Monad.Class.MonadTime
import           Control.Monad.Class.MonadTimer

import           Data.Semigroup.Action

-- | Semigroup of commands which acts on 'PeerState'.  The @t@ variable might
-- be instantiated to 'DiffTime' (a relative suspension, as accepted by
-- 'runSuspendDecision') or to 'Time' (an absolute deadline, as stored in
-- 'PeerState').
--
-- This semigroup allows to either suspend both consumer and producer or just
-- the consumer part.
--
data SuspendDecision t
    = SuspendPeer !t !t
    -- ^ peer is suspended; the first @t@ is the time until which a local
    -- producer is suspended, the second one is the time until which a local
    -- consumer is suspended.
    | SuspendConsumer !t
    -- ^ suspend local consumer \/ initiator side until @t@ (this means we are
    -- not allowing to communicate with the producer \/ responder of a remote
    -- peer).
    | Throw
    -- ^ throw an error from the main thread.
    deriving (Eq, Ord, Show, Functor)

-- | Time until which the local consumer is suspended by the given decision.
-- Both 'SuspendPeer' and 'SuspendConsumer' carry such a time; 'Throw' does
-- not, so it yields 'Nothing'.
--
consumerSuspendedUntil :: SuspendDecision t -> Maybe t
consumerSuspendedUntil cmd = case cmd of
    SuspendPeer _ consT   -> Just consT
    SuspendConsumer consT -> Just consT
    Throw                 -> Nothing

-- | Time until which the local producer is suspended by the given decision.
-- Only 'SuspendPeer' suspends the producer; 'SuspendConsumer' and 'Throw'
-- yield 'Nothing'.
--
producerSuspendedUntil :: SuspendDecision t -> Maybe t
producerSuspendedUntil cmd = case cmd of
    SuspendPeer prodT _ -> Just prodT
    SuspendConsumer _   -> Nothing
    Throw               -> Nothing

-- | The semigroup instance.  Note that composing 'SuspendPeer' with
-- 'SuspendConsumer' gives 'SuspendPeer'.  'SuspendPeer' and 'SuspendConsumer'
-- form a sub-semigroup.
--
-- 'Throw' is absorbing on either side; otherwise suspension times of the same
-- kind are combined by taking the later one.
--
instance Ord t => Semigroup (SuspendDecision t) where
    -- 'Throw' wins regardless of what it is combined with
    Throw <> _ = Throw
    _ <> Throw = Throw

    SuspendPeer prodT consT <> SuspendPeer prodT' consT'
      = SuspendPeer (max prodT prodT') (max consT consT')

    -- only one side carries a producer time, so it is kept as is
    SuspendConsumer consT <> SuspendPeer prodT consT'
      = SuspendPeer prodT (max consT consT')
    SuspendPeer prodT consT <> SuspendConsumer consT'
      = SuspendPeer prodT (max consT consT')

    SuspendConsumer consT <> SuspendConsumer consT'
      = SuspendConsumer (max consT consT')


-- | State of a single peer as tracked in 'PeerStates'.  Thread sets hold
-- 'Async' handles running in @m@; suspension deadlines are absolute 'Time's.
--
data PeerState m
  = HotPeer !(Set (Async m ())) !(Set (Async m ()))
  -- ^ active peer with its producers and consumer threads
  | SuspendedConsumer !(Set (Async m ())) !Time
  -- ^ suspended consumer: with producer threads and time until the consumer is
  -- suspended
  | SuspendedPeer !Time !Time
  -- ^ suspended peer: producer & consumer suspend time
  | ColdPeer
  -- ^ peer with no opened connections in either direction

-- | Renders the constructor name followed by its fields, separated by single
-- spaces.  Thread sets are rendered as the set of their 'asyncThreadId's.
--
instance ( MonadAsync m
         , Show (ThreadId m)
         , Ord (ThreadId m)
         )  => Show (PeerState m) where
    show peerState = unwords $ case peerState of
      HotPeer producers consumers ->
        [ "HotPeer"
        , show (Set.map asyncThreadId producers)
        , show (Set.map asyncThreadId consumers)
        ]
      SuspendedConsumer producers consT ->
        [ "SuspendedConsumer"
        , show (Set.map asyncThreadId producers)
        , show consT
        ]
      SuspendedPeer prodT consT ->
        [ "SuspendedPeer"
        , show prodT
        , show consT
        ]
      ColdPeer ->
        [ "ColdPeer" ]

-- | Derived (structural) equality; it needs equality on the stored 'Async'
-- handles.
deriving instance Eq (Async m ()) => Eq (PeerState m)

-- | Derived (structural) ordering; it needs an ordering on the stored 'Async'
-- handles.
deriving instance Ord (Async m ()) => Ord (PeerState m)

-- | Action of 'SuspendDecision' on @Maybe 'PeerState'@.  We use this action
-- together with 'Map.alter' function.
--
-- Note: 'SuspendDecision' does not act on 'PeerState', only the sub-semigroup
-- generated by 'SuspendConsumer' and 'SuspendPeer' does.
--
-- Whenever the peer already carries a suspension time of the same kind, the
-- later of the old and the new time is kept.
--
instance SAct (SuspendDecision Time) (Maybe (PeerState m)) where

    -- this means we will remove the entry from the state map; this is fine
    -- since we are about to throw an exception to kill a node.
    _ <| Throw   = Nothing
    Nothing <| _ = Nothing

    -- this might apply when a connection to a 'ColdPeer' threw an
    -- exception.
    (Just ColdPeer) <| (SuspendConsumer consT)
      = Just (SuspendedConsumer Set.empty consT)
    (Just ColdPeer) <| (SuspendPeer prodT consT)
      = Just (SuspendedPeer prodT consT)

    -- suspending a hot peer: the tracked consumer threads are dropped from
    -- the state; producers are kept only when just the consumer is suspended.
    (Just (HotPeer producers _consumers)) <| (SuspendConsumer consT)
      = Just (SuspendedConsumer producers consT)
    (Just (HotPeer _producers _consumers)) <| (SuspendPeer prodT consT)
      = Just (SuspendedPeer prodT consT)

    (Just (SuspendedConsumer producers consT)) <| (SuspendConsumer consT')
      = Just (SuspendedConsumer producers (max consT consT'))
    (Just (SuspendedConsumer _producers consT)) <| (SuspendPeer prodT consT')
      = Just (SuspendedPeer prodT (max consT consT'))

    -- an already suspended peer stays suspended; each deadline is only ever
    -- pushed further into the future by the times the command carries.
    (Just (SuspendedPeer prodT consT)) <| cmd
      = Just $ SuspendedPeer
                 (extend prodT (producerSuspendedUntil cmd))
                 (extend consT (consumerSuspendedUntil cmd))
      where
        -- keep the current deadline unless the command supplies a later one
        extend :: Time -> Maybe Time -> Time
        extend current = maybe current (max current)

-- | Threads which need to be cancelled when updating the 'PeerState' with
-- 'SuspendDecision'.
--
-- Only the constructor of the decision is inspected, never the times it
-- carries.  'Throw' cancels nothing here, and neither does any decision on a
-- 'ColdPeer' or an already 'SuspendedPeer', since those track no threads.
--
threadsToCancel :: Ord (Async m ())
                => PeerState m
                -> SuspendDecision diffTime
                -> Set (Async m ())
threadsToCancel peerState cmd = case (cmd, peerState) of
    (Throw, _)
      -> Set.empty
    (_, ColdPeer)
      -> Set.empty
    (_, SuspendedPeer{})
      -> Set.empty

    -- suspending only the consumer leaves producer threads running
    (SuspendConsumer{}, HotPeer _producers consumers)
      -> consumers
    (SuspendConsumer{}, SuspendedConsumer{})
      -> Set.empty

    -- suspending the whole peer cancels every tracked thread
    (SuspendPeer{}, HotPeer producers consumers)
      -> producers <> consumers
    (SuspendPeer{}, SuspendedConsumer producers _consT)
      -> producers


-- | Action of 'SuspendDecision' on @Maybe 'PeerState'@.  Action laws are only
-- satisfied for the sub-semigroup formed by 'SuspendPeer' and
-- 'SuspendConsumer'.
--
-- Returns the threads which have to be cancelled (see 'threadsToCancel';
-- empty when there is no peer state) together with the updated peer state.
--
suspend :: Ord (Async m ())
        => Maybe (PeerState m)
        -> SuspendDecision Time
        -> ( Set (Async m ())
           , Maybe (PeerState m)
           )
suspend mbps cmd = (cancelled, mbps <| cmd)
  where
    -- computed from the state /before/ the decision is applied
    cancelled = maybe Set.empty (`threadsToCancel` cmd) mbps


-- | Map from addresses to 'PeerState's; it will be shared in a 'StrictTVar'.
--
-- The monad @m@ is kept abstract (it fixes the type of the 'Async' handles
-- stored in each 'PeerState'), which is useful for tests.
--
data PeerStates m addr where
     -- | Map of peer states
     PeerStates :: !(Map addr (PeerState m))
                -> PeerStates m addr

     -- | Or an exception to throw
     ThrowException :: Exception e
                    => e
                    -> PeerStates m addr

-- | Shows the constructor name followed by either the map of peer states or
-- the stored exception.
--
instance Show addr
         => Show (PeerStates IO addr) where
    show states = case states of
      PeerStates ps    -> "PeerStates "     ++ show ps
      ThrowException e -> "ThrowException " ++ show e

-- TODO: move to Test.PeerStates as eqPeerStates
--
-- Two 'ThrowException' values are considered equal when the stored exceptions
-- have the same type (the exception values themselves are not compared); two
-- 'PeerStates' values are equal when their maps are; mixed constructors are
-- never equal.
instance Eq addr
         => Eq (PeerStates IO addr) where
    ThrowException (_ :: e) == ThrowException (_ :: e') =
      -- 'eqT' compares the two existentially hidden exception types
      case eqT :: Maybe (e :~: e') of
        Just Refl -> True
        Nothing   -> False
    PeerStates ps == PeerStates ps' = ps == ps'
    _ == _ = False


-- | Create, inside an 'STM' transaction, a fresh 'StrictTVar' holding an
-- empty map of peer states.
--
newPeerStatesVarSTM :: MonadSTM m => STM m (StrictTVar m (PeerStates m addr))
newPeerStatesVarSTM = newTVar (PeerStates Map.empty)

-- | Like 'newPeerStatesVarSTM', but runs the transaction with 'atomically'.
--
newPeerStatesVar :: MonadSTM m => m (StrictTVar m (PeerStates m addr))
newPeerStatesVar = atomically newPeerStatesVarSTM


-- | Periodically clean 'PeerStates'.  It will stop when 'PeerStates' becomes
-- 'ThrowException'.
--
-- Every @interval@ the current monotonic time is read and, in a single STM
-- transaction, each entry of the map is passed through the local
-- @cleanPeerState@: cold peers and peers whose suspensions have all expired
-- are removed, partially expired suspensions are downgraded, and everything
-- else is kept unchanged.
--
-- A suspension deadline @d@ is treated as still in force while @d >= now@ and
-- as expired once @d < now@.
--
cleanPeerStates :: ( MonadSTM   m
                   , MonadAsync m
                   , MonadTime  m
                   , MonadTimer m
                   )
                => DiffTime
                -> StrictTVar m (PeerStates m addr)
                -> m ()
cleanPeerStates interval v = go
  where
    go = do
      threadDelay interval
      t <- getMonotonicTime
      continue <- atomically $ do
        s <- readTVar v
        case s of
          -- nothing left to maintain; tell the loop to stop
          ThrowException _
            -> pure False
          -- force the new 'PeerStates' before storing it in the variable
          PeerStates ps
            -> True <$ (writeTVar v $! PeerStates (Map.mapMaybe (cleanPeerState t) ps))

      if continue
        then go
        else pure ()


    -- Decide what happens to a single entry at time @t@; 'Nothing' removes
    -- the entry from the map.
    cleanPeerState :: Time -> PeerState m -> Maybe (PeerState m)
    cleanPeerState _t ColdPeer{}   = Nothing
    cleanPeerState _  ps@HotPeer{} = Just ps
    cleanPeerState  t ps@(SuspendedConsumer producers consT)
      -- the consumer is still suspended
      | consT >= t
      = Just ps

      -- the consumer is not suspended anymore, and there is no producer
      -- thread running, we can safely remove the peer from 'PeerStates'
      | Set.null producers
      = Nothing

      -- the consumer is not suspended anymore, there are running producer
      -- threads, and thus return a 'HotPeer'.
      | otherwise
      = Just (HotPeer producers Set.empty)

    cleanPeerState  t ps@(SuspendedPeer prodT consT)
      -- the producer is still suspended
      | prodT >= t
      = Just ps

      -- only the consumer is still suspended
      | consT >= t
      = Just (SuspendedConsumer Set.empty consT)

      -- the peer is not suspended any more
      | otherwise
      = Nothing



-- | Update 'PeerStates' for a given 'addr', using 'suspend', and return
-- threads which must be cancelled.
--
-- This is more efficient than using the action of 'SuspendDecision' on
-- 'PeerStates', since it only uses a single dictionary lookup to update the
-- state and return the set of threads to be cancelled.
--
-- * If the state already is 'ThrowException' it is returned unchanged.
-- * A 'Throw' decision replaces the state with 'ThrowException' wrapping the
--   given exception; no threads are returned for cancellation in that case.
-- * Otherwise the relative times of the decision are turned into absolute
--   ones by adding them to the given current time, and the entry of @addr@
--   is updated accordingly.
--
runSuspendDecision
    :: forall m addr e.
       ( Ord addr
       , Ord (Async m ())
       , Exception e
       )
    => Time
    -> addr
    -> e
    -> SuspendDecision DiffTime
    -> PeerStates m addr
    -> ( PeerStates m addr
       , Set (Async m ())
       )
runSuspendDecision _t _addr _e _cmd ps0@ThrowException{} =
    ( ps0
    , Set.empty
    )
runSuspendDecision _t _addr  e  Throw _ =
    ( ThrowException (SomeException e)
    , Set.empty
    )
runSuspendDecision  t  addr _e  cmd (PeerStates ps0) =
    finish (alterAndLookup updatePeer addr ps0)
  where
    -- the decision with its durations converted to absolute deadlines
    absoluteCmd :: SuspendDecision Time
    absoluteCmd = fmap (`addTime` t) cmd

    updatePeer :: Maybe (PeerState m)
               -> ( Set (Async m ())
                  , Maybe (PeerState m)
                  )
    updatePeer mbps = suspend mbps absoluteCmd

    -- wrap the updated map and default the cancelled threads to the empty set
    finish :: ( Map addr (PeerState m)
              , Maybe (Set (Async m ()))
              )
           -> ( PeerStates m addr
              , Set (Async m ())
              )
    finish (ps, Nothing)      = (PeerStates ps, Set.empty)
    finish (ps, Just threads) = (PeerStates ps, threads)



-- | Alter the value stored under a key and, in the same traversal, return the
-- auxiliary result produced by the update function.
--
-- Using pure 'State' monad and 'Map.alterF' to avoid searching the map twice:
-- the state starts as 'Nothing' and is overwritten with the auxiliary result
-- when the update function is run.
--
alterAndLookup
    :: forall k s a.
       Ord k
    => (Maybe a -> (s, Maybe a))
    -> k
    -> Map k a
    -> ( Map k a
       , Maybe s
       )
alterAndLookup f k m = runState (Map.alterF update k m) Nothing
  where
    -- run @f@ on the looked-up value, record its auxiliary result in the
    -- state and hand the new value back to 'Map.alterF'
    update :: Maybe a -> State (Maybe s) (Maybe a)
    update mba = case f mba of
      (s, mba') -> mba' <$ put (Just s)


--
-- Various callbacks
--


-- | Register a producer thread for the given address in 'PeerStates'.
--
-- * 'ThrowException' states are returned unchanged.
-- * An unknown or 'ColdPeer' address becomes a 'HotPeer' with this single
--   producer and no consumers.
-- * For 'HotPeer' and 'SuspendedConsumer' the thread is added to the set of
--   producers.
-- * Registering a producer on a 'SuspendedPeer' is not expected: it trips an
--   'assert'; when assertions are disabled the state is left unchanged.
--
registerProducer :: forall m addr.
                    ( Ord addr
                    , Ord (Async m ())
                    )
                 => addr
                 -> Async m ()
                 -> PeerStates m addr
                 -> PeerStates m addr
registerProducer _addr _tid ps@ThrowException{} = ps
registerProducer addr  tid  (PeerStates peerStates) =
    PeerStates $ Map.alter fn addr peerStates
  where
    fn :: Maybe (PeerState m) -> Maybe (PeerState m)
    fn Nothing =
        Just (HotPeer (Set.singleton tid) Set.empty)
    fn (Just (HotPeer producers consumers)) =
        Just (HotPeer (tid `Set.insert` producers) consumers)
    fn (Just ColdPeer) =
        Just (HotPeer (Set.singleton tid) Set.empty)
    fn (Just (SuspendedConsumer producers consT)) =
        Just (SuspendedConsumer (tid `Set.insert` producers) consT)
    fn (Just ps@SuspendedPeer{}) =
        -- registerProducer on a suspended peer
        assert False $ Just ps

-- | Unregister a producer thread for the given address in 'PeerStates'.
--
-- * 'ThrowException' states are returned unchanged.
-- * An unknown address stays unknown; a 'ColdPeer' entry is removed.
-- * For a 'HotPeer' the thread is removed from the producers; if no producers
--   and no consumers remain, the entry is removed from the map.
-- * For a 'SuspendedConsumer' the thread is removed from the producers; the
--   entry is kept even if no producers remain.
-- * A 'SuspendedPeer' is left unchanged.
--
unregisterProducer :: forall m addr.
                      ( Ord addr
                      , Ord (Async m ())
                      )
                   => addr
                   -> Async m ()
                   -> PeerStates m addr
                   -> PeerStates m addr
unregisterProducer _addr _tid ps@ThrowException{} = ps
unregisterProducer addr  tid  (PeerStates peerStates) =
    PeerStates $ Map.alter fn addr peerStates
  where
    fn :: Maybe (PeerState m) -> Maybe (PeerState m)
    fn Nothing = Nothing
    fn (Just (HotPeer producers consumers)) =
        let producers' = tid `Set.delete` producers
        in if Set.null producers' && Set.null consumers
             then Nothing
             else Just (HotPeer producers' consumers)
    fn (Just ColdPeer) = Nothing
    fn (Just p@SuspendedPeer{}) = Just p
    fn (Just (SuspendedConsumer producers consT)) =
        Just (SuspendedConsumer (tid `Set.delete` producers) consT)


-- | Register a consumer thread for the given address in 'PeerStates'.
--
-- * 'ThrowException' states are returned unchanged.
-- * An unknown or 'ColdPeer' address becomes a 'HotPeer' with this single
--   consumer and no producers.
-- * For a 'HotPeer' the thread is added to the set of consumers.
-- * Registering a consumer on any other (suspended) state is not expected: it
--   trips an 'assert'; when assertions are disabled the state is left
--   unchanged.
--
registerConsumer :: forall m addr.
                    ( Ord addr
                    , Ord (Async m ())
                    )
                 => addr
                 -> Async m ()
                 -> PeerStates m addr
                 -> PeerStates m addr
registerConsumer _addr _tid ps@ThrowException{} = ps
registerConsumer addr  tid  (PeerStates peerStates) =
    PeerStates $ Map.alter fn addr peerStates
  where
    fn :: Maybe (PeerState m) -> Maybe (PeerState m)
    fn  Nothing =
        Just (HotPeer Set.empty (Set.singleton tid))
    fn  (Just (HotPeer producers consumers)) =
        Just (HotPeer producers (tid `Set.insert` consumers))
    fn (Just ColdPeer) =
        Just (HotPeer Set.empty (Set.singleton tid))
    fn (Just ps) =
        -- registerConsumer on a suspended peer
        assert False $ Just ps


-- | Unregister a consumer thread for the given address in 'PeerStates'.
--
-- * 'ThrowException' states are returned unchanged.
-- * An unknown address stays unknown; a 'ColdPeer' entry is removed.
-- * For a 'HotPeer' the thread is removed from the consumers; if no producers
--   and no consumers remain, the entry is removed from the map.
-- * Any other (suspended) state is left unchanged.
--
unregisterConsumer :: forall m addr.
                      ( Ord addr
                      , Ord (Async m ())
                      )
                   => addr
                   -> Async m ()
                   -> PeerStates m addr
                   -> PeerStates m addr
unregisterConsumer _addr _tid ps@ThrowException{} = ps
unregisterConsumer addr  tid (PeerStates peerStates) =
    PeerStates $ Map.alter fn addr peerStates
  where
    fn :: Maybe (PeerState m) -> Maybe (PeerState m)
    fn Nothing = Nothing
    fn (Just (HotPeer producers consumers)) =
      let consumers' = tid `Set.delete` consumers
      in if Set.null producers && Set.null consumers'
           then Nothing
           else Just (HotPeer producers consumers')
    fn (Just ColdPeer) = Nothing
    fn (Just ps) = Just ps


-- | Before connecting with a peer we make a decision to either connect to it
-- or not.  Both alternatives carry a value of type @s@ (strictly); the
-- 'Functor' instance maps over that value and keeps the decision itself.
--
data ConnectDecision s
    = AllowConnection !s
    | DisallowConnection !s
  deriving Functor

-- | Check state before connecting to a remote peer.  The callback is given
-- a 'Time', the peer address and the current state, and returns
-- a 'ConnectDecision' carrying the state to store.  We will connect only if
-- it returns 'AllowConnection'.
--
type BeforeConnect m s addr = Time -> addr -> s -> STM m (ConnectDecision s)

-- | Run a 'BeforeConnect' callback in a 'MonadTime' monad.
--
-- The current monotonic time is read first; then, in a single STM
-- transaction, the state is read from the 'StrictTVar', passed to the
-- callback and the state returned by the callback is written back.  The
-- state is written back for both decisions.
--
-- Returns 'True' for 'AllowConnection' and 'False' for 'DisallowConnection'.
--
runBeforeConnect :: ( MonadSTM  m
                    , MonadTime m
                    )
                 => StrictTVar m s
                 -> BeforeConnect m s addr
                 -> addr
                 -> m Bool
runBeforeConnect sVar beforeConnect addr = do
    t <- getMonotonicTime
    atomically $ do
      d <- readTVar sVar >>= beforeConnect t addr
      case d of
        AllowConnection s    -> True  <$ writeTVar sVar s
        DisallowConnection s -> False <$ writeTVar sVar s


-- | 'BeforeConnect' callback: it updates the peer state and returns
-- a 'ConnectDecision' saying whether to connect to the peer or not.  If a peer
-- hasn't been recorded in 'PeerStates', we add it (as a 'ColdPeer') and allow
-- connecting to it.  In the 'ThrowException' state connections are always
-- disallowed and the state is returned unchanged.
--
beforeConnectTx
  :: forall m addr.
     ( MonadSTM m
     , Ord addr
     )
  => BeforeConnect m
      (PeerStates m addr)
      addr

beforeConnectTx _t _addr ps@ThrowException{} = pure $ DisallowConnection ps

beforeConnectTx  t  addr (PeerStates s) =
    case alterAndLookup fn addr s of
      (s', mbd) -> case mbd of
        -- no decision was produced; be conservative and disallow
        Nothing -> pure $ DisallowConnection (PeerStates s')
        Just d  -> pure (PeerStates s' <$ d)
  where
    fn :: Maybe (PeerState m)
       -> ( ConnectDecision ()
          , Maybe (PeerState m)
          )

    -- we see the peer for the first time; consider it as a good peer and
    -- try to connect to it.
    fn Nothing = ( AllowConnection ()
                 , Just ColdPeer
                 )

    fn (Just p@ColdPeer{}) = ( AllowConnection ()
                             , Just p
                             )

    fn (Just p@(HotPeer producers consumers))
      = if Set.null producers && Set.null consumers
        -- the peer has no registered producers nor consumers, thus it should
        -- be marked as a 'ColdPeer'
        then ( AllowConnection ()
             , Just ColdPeer
             )
        else ( AllowConnection ()
             , Just p
             )

    -- NOTE(review): here the connection is allowed only strictly after
    -- @consT@, while the 'SuspendedPeer' case below already allows it at the
    -- expiry instant; confirm whether the difference is intended.
    fn (Just p@(SuspendedConsumer producers consT)) =
      if consT < t
        then if Set.null producers
               -- the consumer is not suspended any longer, and it has no
               -- producers; thus it's a 'ColdPeer'.
               then (AllowConnection (), Just ColdPeer)
               else (AllowConnection (), Just (HotPeer producers Set.empty))
        else (DisallowConnection (), Just p)

    fn (Just p@(SuspendedPeer prodT consT)) =
      if t < prodT `max` consT
          then if t < prodT `min` consT
                 -- both suspensions are still in force
                 then (DisallowConnection (), Just p)
                 else if prodT < consT
                   then -- prodT ≤ t < consT
                        -- allow the remote peer to connect to us, but we're
                        -- still not allowed to connect to it.
                        (DisallowConnection (), Just $ SuspendedConsumer Set.empty consT)
                   else -- consT ≤ t < prodT
                        -- the local consumer is suspended shorter than local
                        -- producer; In this case we suspend both until `prodT`.
                        -- This means we effectively make local consumer
                        -- errors more severe than the ones which come from
                        -- a local producer.
                        (DisallowConnection (), Just $ SuspendedPeer prodT prodT)

          -- the peer is not suspended any longer
          else (AllowConnection (), Just ColdPeer)