{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
module Control.Concurrent.STM.Extras.Stream
( STMStream
, readOne
, readN
, foldM
, unfold
, unfoldOn
, singleton
, dedupe
) where
import Control.Applicative (Alternative (..), Applicative (..))
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM qualified as STM
import Control.Monad (guard)
import Data.Bifunctor (Bifunctor (..))
import Data.Foldable (traverse_)
import Numeric.Natural (Natural)
-- | A stream of values produced inside 'STM'.
--
-- Running 'unSTMStream' yields the next value of the stream together with
-- the rest of the stream. A rest of 'Nothing' means that the value just
-- produced was the last one.
newtype STMStream a = STMStream{ unSTMStream :: STM (a, Maybe (STMStream a)) }
  deriving Functor
-- | Flatten a stream of streams into a single stream.
--
-- The first transaction reads an inner stream from the outer stream and
-- then a value from that inner stream. Afterwards each step first tries the
-- current inner stream and, only if that would block, the outer stream
-- (this is the left-biased '<|>' of 'STM'). When the outer stream yields a
-- new inner stream, the new one replaces the current one.
joinStream :: STMStream (STMStream a) -> STMStream a
joinStream STMStream{unSTMStream} = STMStream $ unSTMStream >>= go where

  go :: (STMStream a, Maybe (STMStream (STMStream a))) -> STM (a, Maybe (STMStream a))
  -- The outer stream has ended: the result is whatever is left of the
  -- current inner stream.
  go (STMStream currentStream, Nothing) = currentStream
  go (STMStream currentStream, Just ns@(STMStream nextStream)) = do
    vl <- fmap Left currentStream <|> fmap Right nextStream
    case vl of
      -- The inner stream produced a value and has more to come.
      Left (a, Just currentStream') -> pure (a, Just $ STMStream $ go (currentStream', Just ns))
      -- The inner stream produced its last value; carry on with the outer
      -- stream, which is still live.
      Left (a, Nothing)             -> pure (a, Just $ joinStream ns)
      -- The outer stream produced a new inner stream; switch to it within
      -- the same transaction.
      Right ns'                     -> go ns'
-- | Build a stream with exactly one element: the result of the given
-- transaction. The rest of the stream is 'Nothing'.
singleton :: STM a -> STMStream a
singleton = STMStream . fmap (, Nothing)
-- | 'pure' is a one-element stream. 'liftA2' combines the latest values of
-- two streams and produces a new element whenever either side produces one.
instance Applicative STMStream where
  pure = singleton . pure

  -- The first element needs a value from both sides (read in a single
  -- transaction). After that, a new element is produced whenever either
  -- side yields a new value, combined with the most recent value of the
  -- other side. The left side is tried first.
  liftA2 :: forall a b c. (a -> b -> c) -> STMStream a -> STMStream b -> STMStream c
  liftA2 f (STMStream l) (STMStream r) = STMStream $ do
    x <- (,) <$> l <*> r
    let go :: ((a, Maybe (STMStream a)), (b, Maybe (STMStream b))) -> STM (c, Maybe (STMStream c))
        -- Both sides are still live: wait for whichever updates next.
        go ((currentL, Just (STMStream restL)), (currentR, Just (STMStream restR))) = do
          let next = do
                v <- fmap Left restL <|> fmap Right restR
                case v of
                  Left (newL, restL')  -> go ((newL, restL'), (currentR, Just $ STMStream restR))
                  Right (newR, restR') -> go ((currentL, Just $ STMStream restL), (newR, restR'))
          pure (f currentL currentR, Just $ STMStream next)
        -- The right side has ended: map the rest of the left side with the
        -- last value of the right side.
        go ((currentL, Just (STMStream restL)), (currentR, Nothing)) =
          let apply = flip f currentR in
          pure (f currentL currentR, Just $ STMStream $ fmap (first apply . second (fmap (fmap apply))) restL)
        -- The left side has ended: map the rest of the right side with the
        -- last value of the left side.
        go ((currentL, Nothing), (currentR, Just (STMStream restR))) =
          let apply = f currentL in
          pure (f currentL currentR, Just $ STMStream $ fmap (first apply . second (fmap (fmap apply))) restR)
        -- Both sides have ended: this is the last element.
        go ((currentL, Nothing), (currentR, Nothing)) =
          pure (f currentL currentR, Nothing)
    go x
-- | Bind maps the continuation over the stream and flattens the resulting
-- stream of streams with 'joinStream'.
instance Monad STMStream where
  x >>= f = joinStream (f <$> x)
-- | Merge two streams. Each step takes the next value of the left stream
-- if one is available and otherwise the next value of the right stream.
-- The result is passed through 'dedupe'.
instance Eq a => Semigroup (STMStream a) where
  ls@(STMStream l) <> rs@(STMStream r) = dedupe $
    STMStream $ do
      next <- fmap Left l <|> fmap Right r
      -- The rest is combined with the 'Semigroup' instance of 'Maybe': the
      -- remainder of the side that produced the value (if any) is merged
      -- with the whole of the other side, so the rest is always a 'Just'.
      case next of
        Left (v, k)  -> pure (v, k <> Just rs)
        Right (v, k) -> pure (v, Just ls <> k)
-- | 'mempty' is a stream whose transaction always retries, so it never
-- produces a value.
instance Eq a => Monoid (STMStream a) where
  mappend = (<>)
  mempty = STMStream STM.retry
-- | Remove consecutive duplicates from a stream.
--
-- A value equal to the previously emitted one is skipped. The last element
-- of the input stream is always emitted, even if it repeats the previous
-- value, because a stream step has to produce a value.
dedupe :: forall a. Eq a => STMStream a -> STMStream a
dedupe = STMStream . go Nothing
  where
    -- The first argument is the most recently emitted value, if any.
    go :: Maybe a -> STMStream a -> STM (a, Maybe (STMStream a))
    go lastVl (STMStream s) = do
      (next, ms) <- s
      case ms of
        Nothing -> pure (next, Nothing)
        Just s' ->
          if Just next /= lastVl
            then pure (next, Just $ STMStream $ go (Just next) s')
            -- Duplicate: skip it and read on within the same transaction.
            else go lastVl s'
-- | Turn a transaction into a stream of its successive distinct results.
-- This is 'unfoldOn' with 'id' as the projection.
unfold :: forall a. Eq a => STM a -> STMStream a
unfold = unfoldOn id
-- | Turn a transaction into a stream by running it repeatedly.
--
-- A result is emitted only if its projection under @f@ differs from the
-- projection of the previously emitted result; otherwise the step retries.
-- The rest of the stream is always a 'Just', so the stream never ends.
unfoldOn :: forall a b. Eq b => (a -> b) -> STM a -> STMStream a
unfoldOn f tx = STMStream $ go Nothing where
  -- The argument is the projection of the most recently emitted value.
  go :: Maybe b -> STM (a, Maybe (STMStream a))
  go lastVl = do
    next <- tx
    let next' = f next
    -- 'guard' fails (retrying the transaction) when the projection has not
    -- changed. Nothing is checked for the very first element.
    traverse_ (\previous -> guard (previous /= next')) lastVl
    pure (next, Just $ STMStream $ go (Just next'))
-- | Read the next element of the stream in a single atomic transaction.
-- Returns the element and the rest of the stream ('Nothing' if the stream
-- has ended).
readOne :: STMStream a -> IO (a, Maybe (STMStream a))
readOne STMStream{unSTMStream} = STM.atomically unSTMStream
-- | Read up to the given number of elements from the stream, one
-- transaction per element. Returns fewer elements if the stream ends
-- early, and the empty list if zero elements are requested.
readN :: Natural -> STMStream a -> IO [a]
readN 0 _ = pure []
readN k s = do
  (a, s') <- readOne s
  case s' of
    Nothing   -> pure [a]
    Just rest -> (:) a <$> readN (pred k) rest
-- | Consume a stream, running the event handler on every element in order.
-- When the stream has ended, the stop handler is run once, after the last
-- element has been handled.
foldM ::
  STMStream a -- ^ The stream
  -> (a -> IO ()) -- ^ Event handler
  -> IO () -- ^ Handler for the end of the stream
  -> IO ()
foldM s handleEvent handleStop = do
  (v, next) <- readOne s
  handleEvent v
  case next of
    Nothing -> handleStop
    Just s' -> foldM s' handleEvent handleStop