{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-}
module Ouroboros.Network.Subscription.PeerState
( SuspendDecision (..)
, suspend
, PeerState (..)
, threadsToCancel
, PeerStates (..)
, newPeerStatesVar
, newPeerStatesVarSTM
, cleanPeerStates
, runSuspendDecision
, registerConsumer
, unregisterConsumer
, registerProducer
, unregisterProducer
, BeforeConnect
, ConnectDecision (..)
, runBeforeConnect
, beforeConnectTx
, DiffTime
, alterAndLookup
) where
import Control.Exception (Exception, SomeException (..), assert)
import Control.Monad.State
import qualified Data.Map as Map
import Data.Map.Strict (Map)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Typeable (eqT, (:~:) (..))
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer
import Data.Semigroup.Action
data SuspendDecision t
= SuspendPeer !t !t
| SuspendConsumer !t
| Throw
deriving (SuspendDecision t -> SuspendDecision t -> Bool
(SuspendDecision t -> SuspendDecision t -> Bool)
-> (SuspendDecision t -> SuspendDecision t -> Bool)
-> Eq (SuspendDecision t)
forall t. Eq t => SuspendDecision t -> SuspendDecision t -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: SuspendDecision t -> SuspendDecision t -> Bool
$c/= :: forall t. Eq t => SuspendDecision t -> SuspendDecision t -> Bool
== :: SuspendDecision t -> SuspendDecision t -> Bool
$c== :: forall t. Eq t => SuspendDecision t -> SuspendDecision t -> Bool
Eq, Eq (SuspendDecision t)
Eq (SuspendDecision t)
-> (SuspendDecision t -> SuspendDecision t -> Ordering)
-> (SuspendDecision t -> SuspendDecision t -> Bool)
-> (SuspendDecision t -> SuspendDecision t -> Bool)
-> (SuspendDecision t -> SuspendDecision t -> Bool)
-> (SuspendDecision t -> SuspendDecision t -> Bool)
-> (SuspendDecision t -> SuspendDecision t -> SuspendDecision t)
-> (SuspendDecision t -> SuspendDecision t -> SuspendDecision t)
-> Ord (SuspendDecision t)
SuspendDecision t -> SuspendDecision t -> Bool
SuspendDecision t -> SuspendDecision t -> Ordering
SuspendDecision t -> SuspendDecision t -> SuspendDecision t
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
forall t. Ord t => Eq (SuspendDecision t)
forall t. Ord t => SuspendDecision t -> SuspendDecision t -> Bool
forall t.
Ord t =>
SuspendDecision t -> SuspendDecision t -> Ordering
forall t.
Ord t =>
SuspendDecision t -> SuspendDecision t -> SuspendDecision t
min :: SuspendDecision t -> SuspendDecision t -> SuspendDecision t
$cmin :: forall t.
Ord t =>
SuspendDecision t -> SuspendDecision t -> SuspendDecision t
max :: SuspendDecision t -> SuspendDecision t -> SuspendDecision t
$cmax :: forall t.
Ord t =>
SuspendDecision t -> SuspendDecision t -> SuspendDecision t
>= :: SuspendDecision t -> SuspendDecision t -> Bool
$c>= :: forall t. Ord t => SuspendDecision t -> SuspendDecision t -> Bool
> :: SuspendDecision t -> SuspendDecision t -> Bool
$c> :: forall t. Ord t => SuspendDecision t -> SuspendDecision t -> Bool
<= :: SuspendDecision t -> SuspendDecision t -> Bool
$c<= :: forall t. Ord t => SuspendDecision t -> SuspendDecision t -> Bool
< :: SuspendDecision t -> SuspendDecision t -> Bool
$c< :: forall t. Ord t => SuspendDecision t -> SuspendDecision t -> Bool
compare :: SuspendDecision t -> SuspendDecision t -> Ordering
$ccompare :: forall t.
Ord t =>
SuspendDecision t -> SuspendDecision t -> Ordering
$cp1Ord :: forall t. Ord t => Eq (SuspendDecision t)
Ord, Int -> SuspendDecision t -> ShowS
[SuspendDecision t] -> ShowS
SuspendDecision t -> String
(Int -> SuspendDecision t -> ShowS)
-> (SuspendDecision t -> String)
-> ([SuspendDecision t] -> ShowS)
-> Show (SuspendDecision t)
forall t. Show t => Int -> SuspendDecision t -> ShowS
forall t. Show t => [SuspendDecision t] -> ShowS
forall t. Show t => SuspendDecision t -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [SuspendDecision t] -> ShowS
$cshowList :: forall t. Show t => [SuspendDecision t] -> ShowS
show :: SuspendDecision t -> String
$cshow :: forall t. Show t => SuspendDecision t -> String
showsPrec :: Int -> SuspendDecision t -> ShowS
$cshowsPrec :: forall t. Show t => Int -> SuspendDecision t -> ShowS
Show, a -> SuspendDecision b -> SuspendDecision a
(a -> b) -> SuspendDecision a -> SuspendDecision b
(forall a b. (a -> b) -> SuspendDecision a -> SuspendDecision b)
-> (forall a b. a -> SuspendDecision b -> SuspendDecision a)
-> Functor SuspendDecision
forall a b. a -> SuspendDecision b -> SuspendDecision a
forall a b. (a -> b) -> SuspendDecision a -> SuspendDecision b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> SuspendDecision b -> SuspendDecision a
$c<$ :: forall a b. a -> SuspendDecision b -> SuspendDecision a
fmap :: (a -> b) -> SuspendDecision a -> SuspendDecision b
$cfmap :: forall a b. (a -> b) -> SuspendDecision a -> SuspendDecision b
Functor)
consumerSuspendedUntil :: SuspendDecision t -> Maybe t
consumerSuspendedUntil :: SuspendDecision t -> Maybe t
consumerSuspendedUntil (SuspendPeer t
_ t
consT) = t -> Maybe t
forall a. a -> Maybe a
Just t
consT
consumerSuspendedUntil (SuspendConsumer t
consT) = t -> Maybe t
forall a. a -> Maybe a
Just t
consT
consumerSuspendedUntil SuspendDecision t
Throw = Maybe t
forall a. Maybe a
Nothing
producerSuspendedUntil :: SuspendDecision t -> Maybe t
producerSuspendedUntil :: SuspendDecision t -> Maybe t
producerSuspendedUntil (SuspendPeer t
prodT t
_) = t -> Maybe t
forall a. a -> Maybe a
Just t
prodT
producerSuspendedUntil (SuspendConsumer t
_) = Maybe t
forall a. Maybe a
Nothing
producerSuspendedUntil SuspendDecision t
Throw = Maybe t
forall a. Maybe a
Nothing
instance Ord t => Semigroup (SuspendDecision t) where
SuspendDecision t
Throw <> :: SuspendDecision t -> SuspendDecision t -> SuspendDecision t
<> SuspendDecision t
_ = SuspendDecision t
forall t. SuspendDecision t
Throw
SuspendDecision t
_ <> SuspendDecision t
Throw = SuspendDecision t
forall t. SuspendDecision t
Throw
SuspendPeer t
prodT t
consT <> SuspendPeer t
prodT' t
consT'
= t -> t -> SuspendDecision t
forall t. t -> t -> SuspendDecision t
SuspendPeer (t
prodT t -> t -> t
forall a. Ord a => a -> a -> a
`max` t
prodT') (t
consT t -> t -> t
forall a. Ord a => a -> a -> a
`max` t
consT')
SuspendConsumer t
consT <> SuspendPeer t
prodT t
consT'
= t -> t -> SuspendDecision t
forall t. t -> t -> SuspendDecision t
SuspendPeer t
prodT (t
consT t -> t -> t
forall a. Ord a => a -> a -> a
`max` t
consT')
SuspendPeer t
prodT t
consT <> SuspendConsumer t
consT'
= t -> t -> SuspendDecision t
forall t. t -> t -> SuspendDecision t
SuspendPeer t
prodT (t
consT t -> t -> t
forall a. Ord a => a -> a -> a
`max` t
consT')
SuspendConsumer t
consT <> SuspendConsumer t
consT'
= t -> SuspendDecision t
forall t. t -> SuspendDecision t
SuspendConsumer (t
consT t -> t -> t
forall a. Ord a => a -> a -> a
`max` t
consT')
data PeerState m
= HotPeer !(Set (Async m ())) !(Set (Async m ()))
| SuspendedConsumer !(Set (Async m ())) !Time
| SuspendedPeer !Time !Time
| ColdPeer
instance ( MonadAsync m
, Show (ThreadId m)
, Ord (ThreadId m)
) => Show (PeerState m) where
show :: PeerState m -> String
show (HotPeer Set (Async m ())
producers Set (Async m ())
consumers)
= String
"HotPeer"
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set (ThreadId m) -> String
forall a. Show a => a -> String
show ((Async m () -> ThreadId m) -> Set (Async m ()) -> Set (ThreadId m)
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map Async m () -> ThreadId m
forall (m :: * -> *) a. MonadAsync m => Async m a -> ThreadId m
asyncThreadId Set (Async m ())
producers)
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set (ThreadId m) -> String
forall a. Show a => a -> String
show ((Async m () -> ThreadId m) -> Set (Async m ()) -> Set (ThreadId m)
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map Async m () -> ThreadId m
forall (m :: * -> *) a. MonadAsync m => Async m a -> ThreadId m
asyncThreadId Set (Async m ())
consumers)
show (SuspendedConsumer Set (Async m ())
producers Time
consT)
= String
"SuspendedConsumer"
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Set (ThreadId m) -> String
forall a. Show a => a -> String
show ((Async m () -> ThreadId m) -> Set (Async m ()) -> Set (ThreadId m)
forall b a. Ord b => (a -> b) -> Set a -> Set b
Set.map Async m () -> ThreadId m
forall (m :: * -> *) a. MonadAsync m => Async m a -> ThreadId m
asyncThreadId Set (Async m ())
producers)
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Time -> String
forall a. Show a => a -> String
show Time
consT
show (SuspendedPeer Time
prodT Time
consT)
= String
"SuspendedPeer"
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Time -> String
forall a. Show a => a -> String
show Time
prodT
String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
" "
String -> ShowS
forall a. [a] -> [a] -> [a]
++ Time -> String
forall a. Show a => a -> String
show Time
consT
show PeerState m
ColdPeer = String
"ColdPeer"
deriving instance Eq (Async m ()) => Eq (PeerState m)
deriving instance Ord (Async m ()) => Ord (PeerState m)
instance SAct (SuspendDecision Time) (Maybe (PeerState m)) where
Maybe (PeerState m)
_ <| :: Maybe (PeerState m) -> SuspendDecision Time -> Maybe (PeerState m)
<| SuspendDecision Time
Throw = Maybe (PeerState m)
forall a. Maybe a
Nothing
Maybe (PeerState m)
Nothing <| SuspendDecision Time
_ = Maybe (PeerState m)
forall a. Maybe a
Nothing
(Just PeerState m
ColdPeer) <| (SuspendConsumer Time
consT)
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer Set (Async m ())
forall a. Set a
Set.empty Time
consT
(Just PeerState m
ColdPeer) <| (SuspendPeer Time
prodT Time
consT)
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer Time
prodT Time
consT)
(Just (HotPeer Set (Async m ())
producers Set (Async m ())
_consumers)) <| (SuspendConsumer Time
consT)
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer Set (Async m ())
producers Time
consT
(Just (HotPeer Set (Async m ())
_prodcuers Set (Async m ())
_consumers)) <| (SuspendPeer Time
prodT Time
consT)
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer Time
prodT Time
consT
(Just (SuspendedConsumer Set (Async m ())
producers Time
consT)) <| (SuspendConsumer Time
consT')
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer Set (Async m ())
producers (Time
consT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max` Time
consT')
(Just (SuspendedConsumer Set (Async m ())
_producers Time
consT)) <| (SuspendPeer Time
prodT Time
consT')
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer Time
prodT (Time
consT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max` Time
consT')
(Just (SuspendedPeer Time
prodT Time
consT)) <| SuspendDecision Time
cmd
= case SuspendDecision Time -> Maybe Time
forall t. SuspendDecision t -> Maybe t
producerSuspendedUntil SuspendDecision Time
cmd of
Maybe Time
Nothing ->
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer
Time
prodT
(Time -> (Time -> Time) -> Maybe Time -> Time
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Time
consT (Time
consT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max`) (Maybe Time -> Time) -> Maybe Time -> Time
forall a b. (a -> b) -> a -> b
$ SuspendDecision Time -> Maybe Time
forall t. SuspendDecision t -> Maybe t
consumerSuspendedUntil SuspendDecision Time
cmd)
Just Time
prodT' ->
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer
(Time
prodT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max` Time
prodT')
(Time -> (Time -> Time) -> Maybe Time -> Time
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Time
consT (Time
consT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max`) (Maybe Time -> Time) -> Maybe Time -> Time
forall a b. (a -> b) -> a -> b
$ SuspendDecision Time -> Maybe Time
forall t. SuspendDecision t -> Maybe t
consumerSuspendedUntil SuspendDecision Time
cmd)
threadsToCancel :: Ord (Async m ())
=> PeerState m
-> SuspendDecision diffTime
-> Set (Async m ())
threadsToCancel :: PeerState m -> SuspendDecision diffTime -> Set (Async m ())
threadsToCancel PeerState m
_ SuspendDecision diffTime
Throw
= Set (Async m ())
forall a. Set a
Set.empty
threadsToCancel PeerState m
ColdPeer SuspendDecision diffTime
_
= Set (Async m ())
forall a. Set a
Set.empty
threadsToCancel (HotPeer Set (Async m ())
_producers Set (Async m ())
consumers) SuspendConsumer{}
= Set (Async m ())
consumers
threadsToCancel (HotPeer Set (Async m ())
consumers Set (Async m ())
producers) SuspendPeer{}
= Set (Async m ())
consumers Set (Async m ()) -> Set (Async m ()) -> Set (Async m ())
forall a. Semigroup a => a -> a -> a
<> Set (Async m ())
producers
threadsToCancel SuspendedConsumer{} SuspendConsumer{}
= Set (Async m ())
forall a. Set a
Set.empty
threadsToCancel (SuspendedConsumer Set (Async m ())
producers Time
_consT) SuspendPeer{}
= Set (Async m ())
producers
threadsToCancel SuspendedPeer{} SuspendDecision diffTime
_cmd
= Set (Async m ())
forall a. Set a
Set.empty
suspend :: Ord (Async m ())
=> Maybe (PeerState m)
-> SuspendDecision Time
-> ( Set (Async m ())
, Maybe (PeerState m)
)
suspend :: Maybe (PeerState m)
-> SuspendDecision Time -> (Set (Async m ()), Maybe (PeerState m))
suspend Maybe (PeerState m)
mbps SuspendDecision Time
cmd = ( Set (Async m ())
-> (PeerState m -> Set (Async m ()))
-> Maybe (PeerState m)
-> Set (Async m ())
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Set (Async m ())
forall a. Set a
Set.empty (PeerState m -> SuspendDecision Time -> Set (Async m ())
forall (m :: * -> *) diffTime.
Ord (Async m ()) =>
PeerState m -> SuspendDecision diffTime -> Set (Async m ())
`threadsToCancel` SuspendDecision Time
cmd) Maybe (PeerState m)
mbps
, Maybe (PeerState m)
mbps Maybe (PeerState m) -> SuspendDecision Time -> Maybe (PeerState m)
forall s x. SAct s x => x -> s -> x
<| SuspendDecision Time
cmd
)
data PeerStates m addr where
PeerStates :: !(Map addr (PeerState m))
-> PeerStates m addr
ThrowException :: Exception e
=> e
-> PeerStates m addr
instance Show addr
=> Show (PeerStates IO addr) where
show :: PeerStates IO addr -> String
show (PeerStates Map addr (PeerState IO)
ps) = String
"PeerStates " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Map addr (PeerState IO) -> String
forall a. Show a => a -> String
show Map addr (PeerState IO)
ps
show (ThrowException e
e) = String
"ThrowException " String -> ShowS
forall a. [a] -> [a] -> [a]
++ e -> String
forall a. Show a => a -> String
show e
e
instance Eq addr
=> Eq (PeerStates IO addr) where
ThrowException (e
_ :: e) == :: PeerStates IO addr -> PeerStates IO addr -> Bool
== ThrowException (e
_ :: e') =
case Maybe (e :~: e)
forall k (a :: k) (b :: k).
(Typeable a, Typeable b) =>
Maybe (a :~: b)
eqT :: Maybe (e :~: e') of
Maybe (e :~: e)
Nothing -> Bool
False
Just e :~: e
Refl -> Bool
True
PeerStates Map addr (PeerState IO)
ps == PeerStates Map addr (PeerState IO)
ps' = Map addr (PeerState IO)
ps Map addr (PeerState IO) -> Map addr (PeerState IO) -> Bool
forall a. Eq a => a -> a -> Bool
== Map addr (PeerState IO)
ps'
PeerStates IO addr
_ == PeerStates IO addr
_ = Bool
False
newPeerStatesVarSTM :: MonadSTM m => STM m (StrictTVar m (PeerStates m addr))
newPeerStatesVarSTM :: STM m (StrictTVar m (PeerStates m addr))
newPeerStatesVarSTM = PeerStates m addr -> STM m (StrictTVar m (PeerStates m addr))
forall (m :: * -> *) a. MonadSTM m => a -> STM m (StrictTVar m a)
newTVar (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates Map addr (PeerState m)
forall k a. Map k a
Map.empty)
newPeerStatesVar :: MonadSTM m => m (StrictTVar m (PeerStates m addr))
newPeerStatesVar :: m (StrictTVar m (PeerStates m addr))
newPeerStatesVar = STM m (StrictTVar m (PeerStates m addr))
-> m (StrictTVar m (PeerStates m addr))
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically STM m (StrictTVar m (PeerStates m addr))
forall (m :: * -> *) addr.
MonadSTM m =>
STM m (StrictTVar m (PeerStates m addr))
newPeerStatesVarSTM
cleanPeerStates :: ( MonadSTM m
, MonadAsync m
, MonadTime m
, MonadTimer m
)
=> DiffTime
-> StrictTVar m (PeerStates m addr)
-> m ()
cleanPeerStates :: DiffTime -> StrictTVar m (PeerStates m addr) -> m ()
cleanPeerStates DiffTime
interval StrictTVar m (PeerStates m addr)
v = m ()
go
where
go :: m ()
go = do
DiffTime -> m ()
forall (m :: * -> *). MonadDelay m => DiffTime -> m ()
threadDelay DiffTime
interval
Time
t <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
Bool
continue <- STM m Bool -> m Bool
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
PeerStates m addr
s <- StrictTVar m (PeerStates m addr) -> STM m (PeerStates m addr)
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m (PeerStates m addr)
v
case PeerStates m addr
s of
ThrowException e
_
-> Bool -> STM m Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
PeerStates Map addr (PeerState m)
ps
-> Bool
True Bool -> STM m () -> STM m Bool
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ (StrictTVar m (PeerStates m addr) -> PeerStates m addr -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m (PeerStates m addr)
v (PeerStates m addr -> STM m ()) -> PeerStates m addr -> STM m ()
forall a b. (a -> b) -> a -> b
$! (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates (Map addr (PeerState m) -> PeerStates m addr)
-> Map addr (PeerState m) -> PeerStates m addr
forall a b. (a -> b) -> a -> b
$ (PeerState m -> Maybe (PeerState m))
-> Map addr (PeerState m) -> Map addr (PeerState m)
forall a b k. (a -> Maybe b) -> Map k a -> Map k b
Map.mapMaybe (Time -> PeerState m -> Maybe (PeerState m)
forall (m :: * -> *). Time -> PeerState m -> Maybe (PeerState m)
cleanPeerState Time
t) Map addr (PeerState m)
ps))
if Bool
continue
then m ()
go
else () -> m ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
cleanPeerState :: Time -> PeerState m -> Maybe (PeerState m)
cleanPeerState :: Time -> PeerState m -> Maybe (PeerState m)
cleanPeerState Time
_t ColdPeer{} = Maybe (PeerState m)
forall a. Maybe a
Nothing
cleanPeerState Time
_ ps :: PeerState m
ps@HotPeer{} = PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
cleanPeerState Time
t ps :: PeerState m
ps@(SuspendedConsumer Set (Async m ())
producers Time
consT)
| Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
producers Bool -> Bool -> Bool
&& Time
consT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
>= Time
t
= Maybe (PeerState m)
forall a. Maybe a
Nothing
| Time
consT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
>= Time
t
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
producers Set (Async m ())
forall a. Set a
Set.empty)
| Bool
otherwise
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
cleanPeerState Time
t ps :: PeerState m
ps@(SuspendedPeer Time
prodT Time
consT)
| Time
prodT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
t
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
| Time
consT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
t
= PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer Set (Async m ())
forall a. Set a
Set.empty Time
consT)
| Bool
otherwise
= Maybe (PeerState m)
forall a. Maybe a
Nothing
runSuspendDecision
:: forall m addr e.
( Ord addr
, Ord (Async m ())
, Exception e
)
=> Time
-> addr
-> e
-> SuspendDecision DiffTime
-> PeerStates m addr
-> ( PeerStates m addr
, Set (Async m ())
)
runSuspendDecision :: Time
-> addr
-> e
-> SuspendDecision DiffTime
-> PeerStates m addr
-> (PeerStates m addr, Set (Async m ()))
runSuspendDecision Time
_t addr
_addr e
_e SuspendDecision DiffTime
_cmd ps0 :: PeerStates m addr
ps0@ThrowException{} =
( PeerStates m addr
ps0
, Set (Async m ())
forall a. Set a
Set.empty
)
runSuspendDecision Time
_t addr
_addr e
e SuspendDecision DiffTime
Throw PeerStates m addr
_ =
( SomeException -> PeerStates m addr
forall e (m :: * -> *) addr. Exception e => e -> PeerStates m addr
ThrowException (e -> SomeException
forall e. Exception e => e -> SomeException
SomeException e
e)
, Set (Async m ())
forall a. Set a
Set.empty
)
runSuspendDecision Time
t addr
addr e
_e SuspendDecision DiffTime
cmd (PeerStates Map addr (PeerState m)
ps0) =
(Map addr (PeerState m), Maybe (Set (Async m ())))
-> (PeerStates m addr, Set (Async m ()))
gn ((Map addr (PeerState m), Maybe (Set (Async m ())))
-> (PeerStates m addr, Set (Async m ())))
-> (Map addr (PeerState m), Maybe (Set (Async m ())))
-> (PeerStates m addr, Set (Async m ()))
forall a b. (a -> b) -> a -> b
$ (Maybe (PeerState m) -> (Set (Async m ()), Maybe (PeerState m)))
-> addr
-> Map addr (PeerState m)
-> (Map addr (PeerState m), Maybe (Set (Async m ())))
forall k s a.
Ord k =>
(Maybe a -> (s, Maybe a)) -> k -> Map k a -> (Map k a, Maybe s)
alterAndLookup Maybe (PeerState m) -> (Set (Async m ()), Maybe (PeerState m))
fn addr
addr Map addr (PeerState m)
ps0
where
fn :: Maybe (PeerState m)
-> ( Set (Async m ())
, Maybe (PeerState m)
)
fn :: Maybe (PeerState m) -> (Set (Async m ()), Maybe (PeerState m))
fn Maybe (PeerState m)
mbps = ( Set (Async m ())
-> (PeerState m -> Set (Async m ()))
-> Maybe (PeerState m)
-> Set (Async m ())
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Set (Async m ())
forall a. Set a
Set.empty (PeerState m -> SuspendDecision DiffTime -> Set (Async m ())
forall (m :: * -> *) diffTime.
Ord (Async m ()) =>
PeerState m -> SuspendDecision diffTime -> Set (Async m ())
`threadsToCancel` SuspendDecision DiffTime
cmd) Maybe (PeerState m)
mbps
, Maybe (PeerState m)
mbps Maybe (PeerState m) -> SuspendDecision Time -> Maybe (PeerState m)
forall s x. SAct s x => x -> s -> x
<| ((DiffTime -> Time -> Time) -> Time -> DiffTime -> Time
forall a b c. (a -> b -> c) -> b -> a -> c
flip DiffTime -> Time -> Time
addTime Time
t (DiffTime -> Time)
-> SuspendDecision DiffTime -> SuspendDecision Time
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SuspendDecision DiffTime
cmd)
)
gn :: ( Map addr (PeerState m)
, Maybe (Set (Async m ()))
)
-> ( PeerStates m addr
, Set (Async m ())
)
gn :: (Map addr (PeerState m), Maybe (Set (Async m ())))
-> (PeerStates m addr, Set (Async m ()))
gn (Map addr (PeerState m)
ps, Maybe (Set (Async m ()))
Nothing) = (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates Map addr (PeerState m)
ps, Set (Async m ())
forall a. Set a
Set.empty)
gn (Map addr (PeerState m)
ps, Just Set (Async m ())
s) = (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates Map addr (PeerState m)
ps, Set (Async m ())
s)
alterAndLookup
:: forall k s a.
Ord k
=> (Maybe a -> (s, Maybe a))
-> k
-> Map k a
-> ( Map k a
, Maybe s
)
alterAndLookup :: (Maybe a -> (s, Maybe a)) -> k -> Map k a -> (Map k a, Maybe s)
alterAndLookup Maybe a -> (s, Maybe a)
f k
k Map k a
m = State (Maybe s) (Map k a) -> Maybe s -> (Map k a, Maybe s)
forall s a. State s a -> s -> (a, s)
runState ((Maybe a -> StateT (Maybe s) Identity (Maybe a))
-> k -> Map k a -> State (Maybe s) (Map k a)
forall (f :: * -> *) k a.
(Functor f, Ord k) =>
(Maybe a -> f (Maybe a)) -> k -> Map k a -> f (Map k a)
Map.alterF Maybe a -> StateT (Maybe s) Identity (Maybe a)
g k
k Map k a
m) Maybe s
forall a. Maybe a
Nothing
where
g :: Maybe a -> State (Maybe s) (Maybe a)
g :: Maybe a -> StateT (Maybe s) Identity (Maybe a)
g Maybe a
mba = case Maybe a -> (s, Maybe a)
f Maybe a
mba of
(s
s, Maybe a
mba') -> Maybe a
mba' Maybe a
-> StateT (Maybe s) Identity ()
-> StateT (Maybe s) Identity (Maybe a)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ (Maybe s -> Maybe s) -> StateT (Maybe s) Identity ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify' (Maybe s -> Maybe s -> Maybe s
forall a b. a -> b -> a
const (s -> Maybe s
forall a. a -> Maybe a
Just s
s))
registerProducer :: forall m addr.
( Ord addr
, Ord (Async m ())
)
=> addr
-> Async m ()
-> PeerStates m addr
-> PeerStates m addr
registerProducer :: addr -> Async m () -> PeerStates m addr -> PeerStates m addr
registerProducer addr
_addr Async m ()
_tid ps :: PeerStates m addr
ps@ThrowException{} = PeerStates m addr
ps
registerProducer addr
addr Async m ()
tid (PeerStates Map addr (PeerState m)
peerStates) =
Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates (Map addr (PeerState m) -> PeerStates m addr)
-> Map addr (PeerState m) -> PeerStates m addr
forall a b. (a -> b) -> a -> b
$ (Maybe (PeerState m) -> Maybe (PeerState m))
-> addr -> Map addr (PeerState m) -> Map addr (PeerState m)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe (PeerState m) -> Maybe (PeerState m)
fn addr
addr Map addr (PeerState m)
peerStates
where
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn Maybe (PeerState m)
Nothing =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer (Async m () -> Set (Async m ())
forall a. a -> Set a
Set.singleton Async m ()
tid) Set (Async m ())
forall a. Set a
Set.empty)
fn (Just (HotPeer Set (Async m ())
producers Set (Async m ())
consumers)) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer (Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.insert` Set (Async m ())
producers) Set (Async m ())
consumers)
fn (Just PeerState m
ColdPeer) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer (Async m () -> Set (Async m ())
forall a. a -> Set a
Set.singleton Async m ()
tid) Set (Async m ())
forall a. Set a
Set.empty)
fn (Just (SuspendedConsumer Set (Async m ())
producers Time
consT)) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer (Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.insert` Set (Async m ())
producers) Time
consT)
fn (Just ps :: PeerState m
ps@SuspendedPeer{}) =
Bool -> Maybe (PeerState m) -> Maybe (PeerState m)
forall a. HasCallStack => Bool -> a -> a
assert Bool
False (Maybe (PeerState m) -> Maybe (PeerState m))
-> Maybe (PeerState m) -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
unregisterProducer :: forall m addr.
( Ord addr
, Ord (Async m ())
)
=> addr
-> Async m ()
-> PeerStates m addr
-> PeerStates m addr
unregisterProducer :: addr -> Async m () -> PeerStates m addr -> PeerStates m addr
unregisterProducer addr
_addr Async m ()
_tid ps :: PeerStates m addr
ps@ThrowException{} = PeerStates m addr
ps
unregisterProducer addr
addr Async m ()
tid (PeerStates Map addr (PeerState m)
peerStates) =
Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates (Map addr (PeerState m) -> PeerStates m addr)
-> Map addr (PeerState m) -> PeerStates m addr
forall a b. (a -> b) -> a -> b
$ (Maybe (PeerState m) -> Maybe (PeerState m))
-> addr -> Map addr (PeerState m) -> Map addr (PeerState m)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe (PeerState m) -> Maybe (PeerState m)
fn addr
addr Map addr (PeerState m)
peerStates
where
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn Maybe (PeerState m)
Nothing = Maybe (PeerState m)
forall a. Maybe a
Nothing
fn (Just (HotPeer Set (Async m ())
producers Set (Async m ())
consumers)) =
let producers' :: Set (Async m ())
producers' = Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.delete` Set (Async m ())
producers
in if Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
producers' Bool -> Bool -> Bool
&& Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
consumers
then Maybe (PeerState m)
forall a. Maybe a
Nothing
else PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
producers' Set (Async m ())
consumers)
fn (Just PeerState m
ColdPeer) = Maybe (PeerState m)
forall a. Maybe a
Nothing
fn (Just p :: PeerState m
p@SuspendedPeer{}) = PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
p
fn (Just (SuspendedConsumer Set (Async m ())
producers Time
consT)) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer (Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.delete` Set (Async m ())
producers) Time
consT)
registerConsumer :: forall m addr.
( Ord addr
, Ord (Async m ())
)
=> addr
-> Async m ()
-> PeerStates m addr
-> PeerStates m addr
registerConsumer :: addr -> Async m () -> PeerStates m addr -> PeerStates m addr
registerConsumer addr
_addr Async m ()
_tid ps :: PeerStates m addr
ps@ThrowException{} = PeerStates m addr
ps
registerConsumer addr
addr Async m ()
tid (PeerStates Map addr (PeerState m)
peerStates) =
Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates (Map addr (PeerState m) -> PeerStates m addr)
-> Map addr (PeerState m) -> PeerStates m addr
forall a b. (a -> b) -> a -> b
$ (Maybe (PeerState m) -> Maybe (PeerState m))
-> addr -> Map addr (PeerState m) -> Map addr (PeerState m)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe (PeerState m) -> Maybe (PeerState m)
fn addr
addr Map addr (PeerState m)
peerStates
where
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn Maybe (PeerState m)
Nothing =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
forall a. Set a
Set.empty (Async m () -> Set (Async m ())
forall a. a -> Set a
Set.singleton Async m ()
tid))
fn (Just (HotPeer Set (Async m ())
producers Set (Async m ())
consumers)) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
producers (Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.insert` Set (Async m ())
consumers))
fn (Just PeerState m
ColdPeer) =
PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
forall a. Set a
Set.empty (Async m () -> Set (Async m ())
forall a. a -> Set a
Set.singleton Async m ()
tid))
fn (Just PeerState m
ps) =
Bool -> Maybe (PeerState m) -> Maybe (PeerState m)
forall a. HasCallStack => Bool -> a -> a
assert Bool
False (Maybe (PeerState m) -> Maybe (PeerState m))
-> Maybe (PeerState m) -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
unregisterConsumer :: forall m addr.
( Ord addr
, Ord (Async m ())
)
=> addr
-> Async m ()
-> PeerStates m addr
-> PeerStates m addr
unregisterConsumer :: addr -> Async m () -> PeerStates m addr -> PeerStates m addr
unregisterConsumer addr
_addr Async m ()
_tid ps :: PeerStates m addr
ps@ThrowException{} = PeerStates m addr
ps
unregisterConsumer addr
addr Async m ()
tid (PeerStates Map addr (PeerState m)
peerStates) =
Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates (Map addr (PeerState m) -> PeerStates m addr)
-> Map addr (PeerState m) -> PeerStates m addr
forall a b. (a -> b) -> a -> b
$ (Maybe (PeerState m) -> Maybe (PeerState m))
-> addr -> Map addr (PeerState m) -> Map addr (PeerState m)
forall k a.
Ord k =>
(Maybe a -> Maybe a) -> k -> Map k a -> Map k a
Map.alter Maybe (PeerState m) -> Maybe (PeerState m)
fn addr
addr Map addr (PeerState m)
peerStates
where
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn :: Maybe (PeerState m) -> Maybe (PeerState m)
fn Maybe (PeerState m)
Nothing = Maybe (PeerState m)
forall a. Maybe a
Nothing
fn (Just (HotPeer Set (Async m ())
producers Set (Async m ())
consumers)) =
let consumers' :: Set (Async m ())
consumers' = Async m ()
tid Async m () -> Set (Async m ()) -> Set (Async m ())
forall a. Ord a => a -> Set a -> Set a
`Set.delete` Set (Async m ())
consumers
in if Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
producers Bool -> Bool -> Bool
&& Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
consumers'
then Maybe (PeerState m)
forall a. Maybe a
Nothing
else PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
producers Set (Async m ())
consumers')
fn (Just PeerState m
ColdPeer) = Maybe (PeerState m)
forall a. Maybe a
Nothing
fn (Just PeerState m
ps) = PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
ps
data ConnectDecision s
= AllowConnection !s
| DisallowConnection !s
deriving a -> ConnectDecision b -> ConnectDecision a
(a -> b) -> ConnectDecision a -> ConnectDecision b
(forall a b. (a -> b) -> ConnectDecision a -> ConnectDecision b)
-> (forall a b. a -> ConnectDecision b -> ConnectDecision a)
-> Functor ConnectDecision
forall a b. a -> ConnectDecision b -> ConnectDecision a
forall a b. (a -> b) -> ConnectDecision a -> ConnectDecision b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> ConnectDecision b -> ConnectDecision a
$c<$ :: forall a b. a -> ConnectDecision b -> ConnectDecision a
fmap :: (a -> b) -> ConnectDecision a -> ConnectDecision b
$cfmap :: forall a b. (a -> b) -> ConnectDecision a -> ConnectDecision b
Functor
type BeforeConnect m s addr = Time -> addr -> s -> STM m (ConnectDecision s)
runBeforeConnect :: ( MonadSTM m
, MonadTime m
)
=> StrictTVar m s
-> BeforeConnect m s addr
-> addr
-> m Bool
runBeforeConnect :: StrictTVar m s -> BeforeConnect m s addr -> addr -> m Bool
runBeforeConnect StrictTVar m s
sVar BeforeConnect m s addr
beforeConnect addr
addr = do
Time
t <- m Time
forall (m :: * -> *). MonadMonotonicTime m => m Time
getMonotonicTime
STM m Bool -> m Bool
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
STM m a -> m a
atomically (STM m Bool -> m Bool) -> STM m Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
ConnectDecision s
d <- StrictTVar m s -> STM m s
forall (m :: * -> *) a. MonadSTM m => StrictTVar m a -> STM m a
readTVar StrictTVar m s
sVar STM m s
-> (s -> STM m (ConnectDecision s)) -> STM m (ConnectDecision s)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= BeforeConnect m s addr
beforeConnect Time
t addr
addr
case ConnectDecision s
d of
AllowConnection s
s -> Bool
True Bool -> STM m () -> STM m Bool
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ StrictTVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m s
sVar s
s
DisallowConnection s
s -> Bool
False Bool -> STM m () -> STM m Bool
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ StrictTVar m s -> s -> STM m ()
forall (m :: * -> *) a.
(MonadSTM m, HasCallStack) =>
StrictTVar m a -> a -> STM m ()
writeTVar StrictTVar m s
sVar s
s
beforeConnectTx
:: forall m addr.
( MonadSTM m
, Ord addr
)
=> BeforeConnect m
(PeerStates m addr)
addr
beforeConnectTx :: BeforeConnect m (PeerStates m addr) addr
beforeConnectTx Time
_t addr
_addr ps :: PeerStates m addr
ps@ThrowException{} = ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr)))
-> ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr))
forall a b. (a -> b) -> a -> b
$ PeerStates m addr -> ConnectDecision (PeerStates m addr)
forall s. s -> ConnectDecision s
DisallowConnection PeerStates m addr
ps
beforeConnectTx Time
t addr
addr (PeerStates Map addr (PeerState m)
s) =
case (Maybe (PeerState m) -> (ConnectDecision (), Maybe (PeerState m)))
-> addr
-> Map addr (PeerState m)
-> (Map addr (PeerState m), Maybe (ConnectDecision ()))
forall k s a.
Ord k =>
(Maybe a -> (s, Maybe a)) -> k -> Map k a -> (Map k a, Maybe s)
alterAndLookup Maybe (PeerState m) -> (ConnectDecision (), Maybe (PeerState m))
fn addr
addr Map addr (PeerState m)
s of
(Map addr (PeerState m)
s', Maybe (ConnectDecision ())
mbd) -> case Maybe (ConnectDecision ())
mbd of
Maybe (ConnectDecision ())
Nothing -> ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr)))
-> ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr))
forall a b. (a -> b) -> a -> b
$ PeerStates m addr -> ConnectDecision (PeerStates m addr)
forall s. s -> ConnectDecision s
DisallowConnection (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates Map addr (PeerState m)
s')
Just ConnectDecision ()
d -> ConnectDecision (PeerStates m addr)
-> STM m (ConnectDecision (PeerStates m addr))
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map addr (PeerState m) -> PeerStates m addr
forall addr (m :: * -> *).
Map addr (PeerState m) -> PeerStates m addr
PeerStates Map addr (PeerState m)
s' PeerStates m addr
-> ConnectDecision () -> ConnectDecision (PeerStates m addr)
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ ConnectDecision ()
d)
where
fn :: Maybe (PeerState m)
-> ( ConnectDecision ()
, Maybe (PeerState m)
)
fn :: Maybe (PeerState m) -> (ConnectDecision (), Maybe (PeerState m))
fn Maybe (PeerState m)
Nothing = ( () -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection ()
, PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
forall (m :: * -> *). PeerState m
ColdPeer
)
fn (Just p :: PeerState m
p@ColdPeer{}) = ( () -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection ()
, PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
p
)
fn (Just p :: PeerState m
p@(HotPeer Set (Async m ())
producers Set (Async m ())
consumers))
= if Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
producers Bool -> Bool -> Bool
&& Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
consumers
then ( () -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection ()
, PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
forall (m :: * -> *). PeerState m
ColdPeer
)
else ( () -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection ()
, PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
p
)
fn (Just p :: PeerState m
p@(SuspendedConsumer Set (Async m ())
producers Time
consT)) =
if Time
consT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
t
then if Set (Async m ()) -> Bool
forall a. Set a -> Bool
Set.null Set (Async m ())
producers
then (() -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
forall (m :: * -> *). PeerState m
ColdPeer)
else (() -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (Set (Async m ()) -> Set (Async m ()) -> PeerState m
forall (m :: * -> *).
Set (Async m ()) -> Set (Async m ()) -> PeerState m
HotPeer Set (Async m ())
producers Set (Async m ())
forall a. Set a
Set.empty))
else (() -> ConnectDecision ()
forall s. s -> ConnectDecision s
DisallowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
p)
fn (Just p :: PeerState m
p@(SuspendedPeer Time
prodT Time
consT)) =
if Time
t Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
prodT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`max` Time
consT
then if Time
t Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
prodT Time -> Time -> Time
forall a. Ord a => a -> a -> a
`min` Time
consT
then (() -> ConnectDecision ()
forall s. s -> ConnectDecision s
DisallowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
p)
else if Time
prodT Time -> Time -> Bool
forall a. Ord a => a -> a -> Bool
< Time
consT
then
(() -> ConnectDecision ()
forall s. s -> ConnectDecision s
DisallowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Set (Async m ()) -> Time -> PeerState m
forall (m :: * -> *). Set (Async m ()) -> Time -> PeerState m
SuspendedConsumer Set (Async m ())
forall a. Set a
Set.empty Time
consT)
else
(() -> ConnectDecision ()
forall s. s -> ConnectDecision s
DisallowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just (PeerState m -> Maybe (PeerState m))
-> PeerState m -> Maybe (PeerState m)
forall a b. (a -> b) -> a -> b
$ Time -> Time -> PeerState m
forall (m :: * -> *). Time -> Time -> PeerState m
SuspendedPeer Time
prodT Time
prodT)
else (() -> ConnectDecision ()
forall s. s -> ConnectDecision s
AllowConnection (), PeerState m -> Maybe (PeerState m)
forall a. a -> Maybe a
Just PeerState m
forall (m :: * -> *). PeerState m
ColdPeer)