{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE InstanceSigs        #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}
-- | Pull-based streams of values living in 'STM'.
module Control.Concurrent.STM.Extras.Stream
  ( STMStream
  , readOne
  , readN
  , foldM
  , unfold
  , unfoldOn
  , singleton
  , dedupe
  ) where

import Control.Applicative (Alternative (..), Applicative (..))
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM qualified as STM
import Control.Monad (guard)
import Data.Bifunctor (Bifunctor (..))
import Data.Foldable (traverse_)
import Numeric.Natural (Natural)

-- | An STM stream of 'a's (poor man's pull-based FRP).
--
-- Running the wrapped transaction yields the next value together with the
-- remainder of the stream; a remainder of 'Nothing' marks the end of the
-- stream. Every stream therefore produces at least one value.
newtype STMStream a = STMStream{ unSTMStream :: STM (a, Maybe (STMStream a)) }
  deriving Functor

-- | Join a stream of streams by producing values from the latest stream only.
joinStream :: STMStream (STMStream a) -> STMStream a
joinStream STMStream{unSTMStream} = STMStream $ unSTMStream >>= go
  where
    -- The first component is the inner stream currently being drained, the
    -- second is what is left of the outer stream (if anything).
    go :: (STMStream a, Maybe (STMStream (STMStream a))) -> STM (a, Maybe (STMStream a))
    -- The outer stream has ended: the result is just the current inner stream.
    go (STMStream currentStream, Nothing) = currentStream
    go (STMStream currentStream, Just ns@(STMStream nextStream)) = do
      -- Try the current inner stream first; if it retries, see whether the
      -- outer stream has a new inner stream for us.
      vl <- fmap Left currentStream <|> fmap Right nextStream
      case vl of
        -- The inner stream produced a value and continues.
        Left (a, Just currentStream') ->
          pure (a, Just $ STMStream $ go (currentStream', Just ns))
        -- The inner stream produced its last value: carry on with whatever
        -- the outer stream produces next.
        Left (a, Nothing) ->
          pure (a, Just $ joinStream ns)
        -- The outer stream produced a new inner stream: switch to it.
        Right ns' ->
          go ns'

-- | Build a stream containing only one element.
singleton :: STM a -> STMStream a
singleton = STMStream . fmap (, Nothing)

instance Applicative STMStream where
  -- A one-element stream holding the given value.
  pure = singleton . pure

  -- | Updates when one of the two sides updates
  liftA2 :: forall a b c. (a -> b -> c) -> STMStream a -> STMStream b -> STMStream c
  liftA2 f (STMStream l) (STMStream r) = STMStream $ do
    -- The first combined value needs a value from both sides.
    x <- (,) <$> l <*> r
    let go :: ((a, Maybe (STMStream a)), (b, Maybe (STMStream b))) -> STM (c, Maybe (STMStream c))
        -- Both sides continue: emit the combination of the current values,
        -- then wait for whichever side updates next (left is tried first).
        go ((currentL, Just (STMStream restL)), (currentR, Just (STMStream restR))) = do
          let next = do
                v <- fmap Left restL <|> fmap Right restR
                case v of
                  Left (newL, restL') ->
                    go ((newL, restL'), (currentR, Just $ STMStream restR))
                  Right (newR, restR') ->
                    go ((currentL, Just $ STMStream restL), (newR, restR'))
          pure (f currentL currentR, Just $ STMStream next)
        -- The right side has ended: its last value is fixed, so the rest of
        -- the result is the rest of the left side mapped with that value.
        go ((currentL, Just (STMStream restL)), (currentR, Nothing)) =
          let apply = flip f currentR
          in pure
               ( f currentL currentR
               , Just $ STMStream $ fmap (first apply . second (fmap (fmap apply))) restL
               )
        -- The left side has ended: symmetric to the case above.
        go ((currentL, Nothing), (currentR, Just (STMStream restR))) =
          let apply = f currentL
          in pure
               ( f currentL currentR
               , Just $ STMStream $ fmap (first apply . second (fmap (fmap apply))) restR
               )
        -- Both sides have ended: emit the final combination and stop.
        go ((currentL, Nothing), (currentR, Nothing)) =
          pure (f currentL currentR, Nothing)
    go x

instance Monad STMStream where
  x >>= f = joinStream (f <$> x)

instance Eq a => Semigroup (STMStream a) where
  -- Note: We remove consecutive duplicates from the stream, so it is now
  -- never possible to construct a stream like: 1,1,1,1...
  --
  -- 'dedupe' is applied once, around the whole merged stream, rather than at
  -- every merge step, so the number of 'dedupe' layers a value has to pass
  -- through does not grow with the number of elements read.
  ls <> rs = dedupe (mergeStreams ls rs)

instance Eq a => Monoid (STMStream a) where
  mappend = (<>)
  -- A stream that never produces a value: reading from it always retries.
  mempty = STMStream STM.retry

-- | Interleave two streams without removing duplicates.
--
-- Each step yields a value from the left stream if it has one, and otherwise
-- from the right stream. The side that did not produce the value is kept
-- unchanged for the next step. Once one side has ended, the remainder is the
-- other side on its own.
mergeStreams :: STMStream a -> STMStream a -> STMStream a
mergeStreams ls@(STMStream l) rs@(STMStream r) = STMStream $ do
  next <- fmap Left l <|> fmap Right r
  pure $ case next of
    Left (v, k)  -> (v, Just $ maybe rs (`mergeStreams` rs) k)
    Right (v, k) -> (v, Just $ maybe ls (mergeStreams ls) k)

-- | Remove consecutive duplicates from a stream.
--
-- The last element of the stream is always emitted, even if it is equal to
-- the element before it, because a stream can only end by producing a value.
dedupe :: forall a. Eq a => STMStream a -> STMStream a
dedupe = STMStream . go Nothing
  where
    -- The first argument is the most recently emitted value, if any.
    go :: Maybe a -> STMStream a -> STM (a, Maybe (STMStream a))
    go lastVl (STMStream s) = do
      (next, ms) <- s
      case ms of
        Nothing -> pure (next, Nothing)
        Just s' ->
          if Just next /= lastVl
            then pure (next, Just $ STMStream $ go (Just next) s')
            -- Duplicate: skip it and read on within the same transaction.
            else go lastVl s'

-- | Produce an infinite stream of values from an STM (i.e. watch it for
--   updates). Uses the Eq instance to not output the same value twice in a
--   row.
unfold :: forall a. Eq a => STM a -> STMStream a
unfold = unfoldOn id

-- | Produce an infinite stream of values from an STM (i.e. watch it for
--   updates). Uses the Eq instance of 'b' to not output the same value twice
--   in a row.
unfoldOn :: forall a b. Eq b => (a -> b) -> STM a -> STMStream a
unfoldOn f tx = STMStream $ go Nothing
  where
    -- The argument is the projection of the most recently emitted value.
    go :: Maybe b -> STM (a, Maybe (STMStream a))
    go lastVl = do
      next <- tx
      let next' = f next
      -- Retry the transaction while the projected value is unchanged.
      traverse_ (\previous -> guard (previous /= next')) lastVl
      pure (next, Just $ STMStream $ go (Just next'))

-- | Read the first event from the stream.
readOne :: STMStream a -> IO (a, Maybe (STMStream a))
readOne STMStream{unSTMStream} = STM.atomically unSTMStream

-- | Read a number of events from the stream. Blocks until all events
--   have been received. Returns fewer events if the stream ends early.
readN :: Natural -> STMStream a -> IO [a]
readN 0 _ = pure []
readN k s = do
  (a, s') <- readOne s
  case s' of
    Nothing   -> pure [a]
    Just rest -> (:) a <$> readN (pred k) rest

-- | Consume a stream. Blocks until the stream has terminated.
foldM ::
  STMStream a -- ^ The stream
  -> (a -> IO ()) -- ^ Event handler
  -> IO () -- ^ Handler for the end of the stream
  -> IO ()
foldM s handleEvent handleStop = do
  (v, next) <- readOne s
  handleEvent v
  case next of
    Nothing -> handleStop
    Just s' -> foldM s' handleEvent handleStop