{-# LANGUAGE DataKinds           #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE InstanceSigs        #-}
{-# LANGUAGE KindSignatures      #-}
{-# LANGUAGE NamedFieldPuns      #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE TypeApplications    #-}

module Control.Concurrent.STM.Extras.Stream
  ( STMStream
  , readOne
  , readN
  , foldM
  , unfold
  , unfoldOn
  , singleton
  , dedupe
  ) where

import Control.Applicative (Alternative (..), Applicative (..))
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM qualified as STM
import Control.Monad (guard)
import Data.Bifunctor (Bifunctor (..))
import Data.Foldable (traverse_)
import Numeric.Natural (Natural)

-- | An STM stream of 'a's (poor man's pull-based FRP).
--
-- A stream is a single STM transaction that, when run, yields the next
-- value together with the remainder of the stream. A remainder of
-- 'Nothing' means the stream has terminated after this value.
newtype STMStream a = STMStream
    { unSTMStream :: STM (a, Maybe (STMStream a))
      -- ^ Run one step: the next value and, if any, the rest of the stream.
    }
    deriving Functor

-- | Join a stream of streams by producing values from the latest stream only.
--
-- The first step waits for the outer stream to deliver an inner stream.
-- Afterwards each step first tries the current inner stream and, only if
-- that transaction retries, polls the outer stream for a replacement
-- ('<|>' on 'STM' is left-biased). Once the outer stream has terminated,
-- the result is simply the remainder of the last inner stream.
joinStream :: STMStream (STMStream a) -> STMStream a
joinStream (STMStream outer) = STMStream (outer >>= go)
  where
    -- The argument pairs the inner stream currently being forwarded with
    -- the remainder of the outer stream (if it has not terminated).
    go :: (STMStream a, Maybe (STMStream (STMStream a))) -> STM (a, Maybe (STMStream a))
    go (STMStream current, Nothing) = current
    go (STMStream current, Just pending@(STMStream nextInner)) = do
        step <- (Left <$> current) <|> (Right <$> nextInner)
        case step of
            -- The current inner stream produced a value and continues.
            Left (a, Just current') ->
                pure (a, Just (STMStream (go (current', Just pending))))
            -- The current inner stream produced its last value; carry on
            -- with whatever the outer stream delivers next.
            Left (a, Nothing) ->
                pure (a, Just (joinStream pending))
            -- The outer stream delivered a new inner stream: switch to it
            -- within the same transaction.
            Right switched ->
                go switched

-- | Build a stream containing only one element: the result of running the
-- given transaction. The stream terminates right after that element.
singleton :: STM a -> STMStream a
singleton tx = STMStream (fmap (\x -> (x, Nothing)) tx)

instance Applicative STMStream where
    -- A single-element stream holding the given value.
    pure = singleton . pure

    -- | Updates when one of the two sides updates
    --
    -- The first element needs a value from both sides in the same
    -- transaction. After that, each step polls the left side first and the
    -- right side only if the left retries, and recombines the new value
    -- with the most recent value of the other side.
    liftA2 :: forall a b c. (a -> b -> c) -> STMStream a -> STMStream b -> STMStream c
    liftA2 f (STMStream l) (STMStream r) = STMStream $ do
        initial <- (,) <$> l <*> r
        go initial
      where
        -- Takes the latest value and remainder of each side.
        go :: ((a, Maybe (STMStream a)), (b, Maybe (STMStream b))) -> STM (c, Maybe (STMStream c))
        -- Both sides are still live: emit the combination and keep
        -- watching both.
        go ((currentL, Just (STMStream restL)), (currentR, Just (STMStream restR))) =
            pure (f currentL currentR, Just (STMStream next))
          where
            next = do
                v <- fmap Left restL <|> fmap Right restR
                case v of
                    Left (newL, restL')  -> go ((newL, restL'), (currentR, Just (STMStream restR)))
                    Right (newR, restR') -> go ((currentL, Just (STMStream restL)), (newR, restR'))
        -- The right side has terminated: its last value is fixed, so the
        -- remainder is the left remainder mapped with that value.
        go ((currentL, Just (STMStream restL)), (currentR, Nothing)) =
            pure (f currentL currentR, Just (STMStream (mapRest (`f` currentR) restL)))
        -- The left side has terminated: symmetric to the case above.
        go ((currentL, Nothing), (currentR, Just (STMStream restR))) =
            pure (f currentL currentR, Just (STMStream (mapRest (f currentL) restR)))
        -- Both sides have terminated: this is the last element.
        go ((currentL, Nothing), (currentR, Nothing)) =
            pure (f currentL currentR, Nothing)

        -- Apply a function to the value of a stream step and to every value
        -- of its remainder.
        mapRest :: (x -> c) -> STM (x, Maybe (STMStream x)) -> STM (c, Maybe (STMStream c))
        mapRest g = fmap (bimap g (fmap (fmap g)))

instance Monad STMStream where
    -- Map every value to a stream and flatten the result with 'joinStream'.
    x >>= f = joinStream (fmap f x)

instance Eq a => Semigroup (STMStream a) where
  -- Merge two streams: each step yields a value from the left stream if it
  -- has one available, otherwise from the right stream.
  --
  -- Note: We remove consecutive duplicates from the stream, so it is now
  -- never possible to construct a stream like: 1,1,1,1...
  --
  -- The continuation is built with the 'Semigroup' instance of 'Maybe': if
  -- the side that fired has terminated, the continuation is the other side
  -- alone; otherwise it is the remainder merged (via this instance) with
  -- the other side, which was not advanced in this step.
  --
  -- NOTE(review): because the continuation goes through this '<>' again,
  -- it is wrapped in a fresh 'dedupe' on every step.
  ls@(STMStream l) <> rs@(STMStream r) = dedupe (STMStream merged)
    where
      merged = do
        next <- (Left <$> l) <|> (Right <$> r)
        case next of
          Left  (v, k) -> pure (v, k <> Just rs)
          Right (v, k) -> pure (v, Just ls <> k)

instance Eq a => Monoid (STMStream a) where
    mappend = (<>)
    -- The empty stream never yields a value: reading from it always retries.
    mempty = STMStream STM.retry

-- | Remove consecutive duplicates from a stream.
--
-- A value equal to the previously emitted one is skipped, and the next
-- step of the underlying stream is run within the same transaction.
-- The final element of the underlying stream is always emitted, even if it
-- equals the previous one, because a step has to produce a value.
dedupe :: forall a. Eq a => STMStream a -> STMStream a
dedupe = STMStream . go Nothing
  where
    -- The first argument is the most recently emitted value, if any.
    go :: Maybe a -> STMStream a -> STM (a, Maybe (STMStream a))
    go previous (STMStream step) = do
      (next, remainder) <- step
      case remainder of
        Nothing -> pure (next, Nothing)
        Just rest
          | Just next /= previous ->
              pure (next, Just (STMStream (go (Just next) rest)))
          | otherwise ->
              go previous rest

-- | Produce an infinite stream of values from an STM (i.e. watch it for
-- updates). Uses the 'Eq' instance to not output the same value twice in a
-- row. This is 'unfoldOn' comparing the values themselves.
unfold :: forall a. Eq a => STM a -> STMStream a
unfold tx = unfoldOn id tx

-- | Produce an infinite stream of values from an STM (i.e. watch it for
-- updates). Uses the 'Eq' instance of 'b' to not output the same value twice
-- in a row.
--
-- Each step runs the transaction and projects the result with the given
-- function. If the projection equals the one from the previous step, the
-- step retries, so a reader blocks until the transaction yields something
-- with a different projection.
unfoldOn :: forall a b. Eq b => (a -> b) -> STM a -> STMStream a
unfoldOn f tx = STMStream (go Nothing)
  where
    -- The argument is the projection of the previously emitted value.
    go :: Maybe b -> STM (a, Maybe (STMStream a))
    go previous = do
        next <- tx
        let key = f next
        -- 'guard' in 'STM' retries the transaction when the condition fails.
        traverse_ (\old -> guard (old /= key)) previous
        pure (next, Just (STMStream (go (Just key))))

-- | Read the first event from the stream.
--
-- Runs one step of the stream atomically and returns its value together
-- with the remainder of the stream ('Nothing' if the stream has ended).
readOne :: STMStream a -> IO (a, Maybe (STMStream a))
readOne (STMStream step) = STM.atomically step

-- | Read a number of events from the stream. Blocks until all events
--   have been received.
--
--   Returns fewer events than requested if the stream terminates early.
--   Asking for zero events returns immediately without reading.
readN :: Natural -> STMStream a -> IO [a]
readN 0 _ = pure []
readN k s = do
    (a, remainder) <- readOne s
    case remainder of
        Nothing   -> pure [a]
        Just rest -> (a :) <$> readN (pred k) rest

-- | Consume a stream. Blocks until the stream has terminated.
--
-- The event handler is run for every value, including the last one; the
-- stop handler is run once, after the last value has been handled.
foldM ::
    STMStream a -- ^ The stream
    -> (a -> IO ()) -- ^ Event handler
    -> IO () -- ^ Handler for the end of the stream
    -> IO ()
foldM s handleEvent handleStop = do
    (v, next) <- readOne s
    handleEvent v
    case next of
        Nothing -> handleStop
        Just s' -> foldM s' handleEvent handleStop