{-# LANGUAGE BangPatterns             #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts         #-}
{-# LANGUAGE LambdaCase               #-}
{-# LANGUAGE NamedFieldPuns           #-}
{-# LANGUAGE RankNTypes               #-}
{-# LANGUAGE ScopedTypeVariables      #-}
{-# LANGUAGE TupleSections            #-}
{-# LANGUAGE TypeFamilies             #-}
module Ouroboros.Consensus.Storage.ImmutableDB.Impl.Parser (
    BlockSummary (..)
  , ChunkFileError (..)
  , parseChunkFile
  ) where

import           Codec.CBOR.Decoding (Decoder)
import           Data.Bifunctor (first)
import qualified Data.ByteString.Lazy as Lazy
import           Data.Functor ((<&>))
import           Data.Word (Word64)
import           Streaming (Of, Stream)
import qualified Streaming as S
import qualified Streaming.Prelude as S

import           Ouroboros.Consensus.Block hiding (headerHash)
import           Ouroboros.Consensus.Util.CBOR (withStreamIncrementalOffsets)
import           Ouroboros.Consensus.Util.IOLike

import           Ouroboros.Consensus.Storage.Common
import           Ouroboros.Consensus.Storage.FS.API (HasFS)
import           Ouroboros.Consensus.Storage.FS.CRC

import           Ouroboros.Consensus.Storage.FS.API.Types (FsPath)
import qualified Ouroboros.Consensus.Storage.ImmutableDB.Impl.Index.Secondary as Secondary
import           Ouroboros.Consensus.Storage.ImmutableDB.Impl.Types
import           Ouroboros.Consensus.Storage.Serialisation (DecodeDisk (..),
                     HasBinaryBlockInfo (..))

-- | Information about a block returned by the parser.
--
-- The fields of this record are strict to make sure that by evaluating this
-- record to WHNF, we no longer hold on to the entire block. Otherwise, we might
-- accidentally keep all blocks in a single file in memory during parsing.
data BlockSummary blk = BlockSummary {
      -- | The secondary index entry reconstructed for the block.
      summaryEntry   :: !(Secondary.Entry blk)
      -- | The block number of the block.
    , summaryBlockNo :: !BlockNo
      -- | The slot number of the block.
    , summarySlotNo  :: !SlotNo
    }

-- | Parse the contents of a chunk file.
--
-- * The parser decodes each block in the chunk. When one of them fails to
--   decode, a 'ChunkErrRead' error is returned.
--
-- * Each block's checksum is checked against its given expected checksum
--   (coming from the secondary index). When a checksum doesn't match, a
--   'ChunkErrCorrupt' error is returned. When the secondary index is missing or
--   corrupt, and there are no or fewer expected checksums, we use the given
--   (more expensive) integrity checking function instead of checksum
--   comparison.
--
-- * We check that each block fits onto the previous one by checking the hashes.
--   If not, we return a 'ChunkErrHashMismatch' error.
--
-- * An error is returned in the form of:
--
--   > 'Maybe' ('ChunkFileError' blk, 'Word64')
--
--   The 'Word64' corresponds to the offset in the file where the last valid
--   entry ends. Truncating to this offset will remove all invalid data from the
--   file and just leave the valid entries before it. Note that we are not using
--   'Either' because the error might occur after some valid entries have been
--   parsed successfully, in which case we still want these valid entries, but
--   also want to know about the error so we can truncate the file to get rid of
--   the unparseable data.
--
parseChunkFile ::
     forall m blk h r.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
     -- ^ Used to obtain the on-disk block decoder.
  -> HasFS m h
     -- ^ File system handle used to stream the chunk file.
  -> (blk -> Bool)   -- ^ Check integrity of the block. 'False' = corrupt.
  -> FsPath
     -- ^ Path of the chunk file to parse.
  -> [CRC]
     -- ^ Expected checksums, one per block, in file order. May be shorter
     -- than the number of blocks in the file (or empty).
  -> (   Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
      -> m r
     )
     -- ^ Continuation consuming the stream of valid entries (each paired with
     -- the block's previous hash). The stream's return value is the error, if
     -- any, that stopped the parser.
  -> m r
parseChunkFile ccfg hasFS isNotCorrupt fsPath expectedChecksums k =
      withStreamIncrementalOffsets hasFS decoder fsPath
        ( k
        . checkIfHashesLineUp
        . checkEntries expectedChecksums
          -- Lift a decoding failure into a 'ChunkFileError', keeping the offset.
        . fmap (fmap (first ChunkErrRead))
        )
  where
    -- | Decode a block and compute the CRC of its raw bytes at the same time.
    -- Both are forced before the pair is returned.
    decoder :: forall s. Decoder s (Lazy.ByteString -> (blk, CRC))
    decoder = decodeDisk ccfg <&> \mkBlk bs ->
      let !blk      = mkBlk bs
          !checksum = computeCRC bs
      in (blk, checksum)

    -- | Go over the expected checksums and blocks in parallel. Stop with an
    -- error when a block is corrupt. Yield correct entries along the way.
    --
    -- If there's an expected checksum and it matches the block's checksum,
    -- then the block is correct. Continue with the next.
    --
    -- If they do not match or if there's no expected checksum in the stream,
    -- check the integrity of the block (expensive). When corrupt, stop
    -- parsing blocks and return an error that the block is corrupt. When not
    -- corrupt, continue with the next.
    checkEntries
      :: [CRC]
         -- ^ Expected checksums
      -> Stream (Of (Word64, (Word64, (blk, CRC))))
                m
                (Maybe (ChunkFileError blk, Word64))
         -- ^ Input stream of blocks (with additional info)
      -> Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
    checkEntries = \expected -> mapAccumS expected updateAcc
      where
        -- The state threaded through the stream is the list of expected
        -- checksums that have not been consumed yet.
        updateAcc
          :: [CRC]
          -> (Word64, (Word64, (blk, CRC)))
          -> Either (Maybe (ChunkFileError blk, Word64))
                    ( (BlockSummary blk, ChainHash blk)
                    , [CRC]
                    )
        updateAcc expected blkAndInfo@(offset, (_, (blk, checksum))) =
            case expected of
              expectedChecksum:expected'
                | expectedChecksum == checksum
                -> Right (entryAndPrevHash, expected')
              -- No expected entry or a mismatch
              _ | isNotCorrupt blk
                  -- The (expensive) integrity check passed, so continue
                -> Right (entryAndPrevHash, drop 1 expected)
                | otherwise
                  -- The block is corrupt, stop
                -> Left $ Just (ChunkErrCorrupt (blockPoint blk), offset)
          where
            entryAndPrevHash = entryForBlockAndInfo blkAndInfo

    -- | Build the 'BlockSummary' and previous hash for a decoded block, given
    -- its offset in the file and the checksum computed by the decoder.
    entryForBlockAndInfo
      :: (Word64, (Word64, (blk, CRC)))
      -> (BlockSummary blk, ChainHash blk)
    entryForBlockAndInfo (offset, (_size, (blk, checksum))) =
        (blockSummary, prevHash)
      where
        -- Don't accidentally hold on to the block!
        !prevHash = blockPrevHash blk
        entry = Secondary.Entry {
              blockOffset  = Secondary.BlockOffset  offset
            , headerOffset = Secondary.HeaderOffset headerOffset
            , headerSize   = Secondary.HeaderSize   headerSize
            , checksum     = checksum
            , headerHash   = blockHash blk
            , blockOrEBB   = case blockIsEBB blk of
                Just epoch -> EBB epoch
                Nothing    -> Block (blockSlot blk)
            }
        !blockSummary = BlockSummary {
              summaryEntry   = entry
            , summaryBlockNo = blockNo blk
            , summarySlotNo  = blockSlot blk
            }
        BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk

    -- | Check that the previous hash of each block (except the first) equals
    -- the hash of the block before it. On a mismatch, stop with a
    -- 'ChunkErrHashMismatch' error carrying the offset of the mismatching
    -- block.
    checkIfHashesLineUp
      :: Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
      -> Stream (Of (BlockSummary blk, ChainHash blk))
                m
                (Maybe (ChunkFileError blk, Word64))
    checkIfHashesLineUp = mapAccumS0 checkFirst checkNext
      where
        -- We pass the hash of the previous block around as the state (@s@).
        checkFirst x@(BlockSummary { summaryEntry }, _) =
            Right (x, Secondary.headerHash summaryEntry)

        checkNext hashOfPrevBlock x@(BlockSummary { summaryEntry }, prevHash)
          | prevHash == BlockHash hashOfPrevBlock
          = Right (x, Secondary.headerHash summaryEntry)
          | otherwise
          = Left (Just (err, offset))
            where
              err    = ChunkErrHashMismatch hashOfPrevBlock prevHash
              offset = Secondary.unBlockOffset $ Secondary.blockOffset summaryEntry

{-------------------------------------------------------------------------------
  Streaming utilities
-------------------------------------------------------------------------------}

-- | Thread some state through a 'Stream'. An early return is possible by
-- returning 'Left'.
--
-- For each element of the input stream, the update function is applied to the
-- current state and the element:
--
-- * On @'Right' (b, s')@, @b@ is yielded and processing continues with the
--   new state @s'@.
--
-- * On @'Left' r@, the remainder of the input stream is dropped and @r@
--   becomes the return value of the resulting stream.
--
-- When the input stream ends, its return value is passed on unchanged.
mapAccumS
  :: Monad m
  => s  -- ^ Initial state
  -> (s -> a -> Either r (b, s))
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS st0 updateAcc = go st0
  where
    go st input = S.lift (S.next input) >>= \case
      -- The input stream is exhausted: propagate its return value.
      Left  r           -> return r
      Right (a, input') -> case updateAcc st a of
        -- Early return requested: stop without consuming @input'@.
        Left r         -> return r
        Right (b, st') -> S.yield b *> go st' input'

-- | Variant of 'mapAccumS' that calls the first function argument on the
-- first element in the stream to construct the initial state. For all
-- elements in the stream after the first one, the second function argument is
-- used.
mapAccumS0
  :: forall m a b r s. Monad m
  => (a -> Either r (b, s))
  -> (s -> a -> Either r (b, s))
  -> Stream (Of a) m r
  -> Stream (Of b) m r
mapAccumS0 initAcc updateAcc = mapAccumS Nothing updateAcc'
  where
    -- The state is wrapped in 'Maybe': 'Nothing' means no element has been
    -- seen yet, so 'initAcc' is used; afterwards the state is always 'Just'
    -- and 'updateAcc' is used.
    updateAcc' :: Maybe s -> a -> Either r (b, Maybe s)
    updateAcc' mbSt = fmap (fmap Just) . maybe initAcc updateAcc mbSt