{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
module Ouroboros.Consensus.Storage.VolatileDB.Impl.Parser (
ParseError (..)
, ParsedBlockInfo (..)
, parseBlockFile
, extractBlockInfo
) where
import Data.Bifunctor (bimap)
import qualified Data.ByteString.Lazy as Lazy
import Data.Word (Word64)
import Streaming.Prelude (Of (..), Stream)
import qualified Streaming.Prelude as S
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Util.CBOR (ReadIncrementalErr (..),
withStreamIncrementalOffsets)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Storage.FS.API (HasFS)
import Ouroboros.Consensus.Storage.FS.API.Types (FsPath)
import Ouroboros.Consensus.Storage.Serialisation
import Ouroboros.Consensus.Storage.VolatileDB.API (BlockInfo (..))
import Ouroboros.Consensus.Storage.VolatileDB.Impl.Types
-- | What the parser records about a single block found in a file.
--
-- Every field is strict, so evaluating a 'ParsedBlockInfo' to WHNF also
-- evaluates each of its fields to WHNF.
--
-- NOTE(review): presumably the strictness is there so that a parsed entry
-- does not keep the decoded block alive through unevaluated thunks; confirm
-- against the callers before relaxing it.
data ParsedBlockInfo blk = ParsedBlockInfo {
      -- | Offset of the block within the parsed file.
      pbiBlockOffset :: !BlockOffset
      -- | Size of the block as reported by the incremental decoder.
    , pbiBlockSize   :: !BlockSize
      -- | Summary of the block, as computed by 'extractBlockInfo'.
    , pbiBlockInfo   :: !(BlockInfo blk)
      -- | Nested context obtained by 'unnest'ing the block's header.
    , pbiNestedCtxt  :: !(SomeSecond (NestedCtxt Header) blk)
    }
-- | Parse the file at the given path as a sequence of blocks.
--
-- The file is decoded incrementally via 'withStreamIncrementalOffsets' using
-- 'decodeDisk'. Each stream element is consumed as
-- @(offset, (size, block))@.
--
-- The result is a pair of:
--
-- * the 'ParsedBlockInfo' of every block accepted before parsing stopped, in
--   the order in which the stream produced them;
--
-- * 'Nothing' when the stream ended without an error, or 'Just' an error
--   together with a 'BlockOffset':
--
--     * 'BlockReadErr' with the offset returned by the stream when decoding
--       failed;
--
--     * 'BlockCorruptedErr' with the hash and offset of the first block that
--       the corruption check (third argument) rejected.
--
-- When the validation policy equals 'NoValidation', the corruption check is
-- never run and every successfully decoded block is accepted.
parseBlockFile ::
     forall m blk h.
     ( IOLike m
     , GetPrevHash blk
     , HasBinaryBlockInfo blk
     , HasNestedContent Header blk
     , DecodeDisk blk (Lazy.ByteString -> blk)
     )
  => CodecConfig blk
  -> HasFS m h
  -> (blk -> Bool)
     -- ^ Returns 'True' when the block is /not/ corrupt
  -> BlockValidationPolicy
  -> FsPath
  -> m ( [ParsedBlockInfo blk]
       , Maybe (ParseError blk, BlockOffset)
       )
parseBlockFile ccfg hasFS isNotCorrupt validationPolicy fsPath =
    withStreamIncrementalOffsets hasFS (decodeDisk ccfg) fsPath $
      checkEntries []
  where
    -- Whether the corruption check must be skipped altogether.
    noValidation :: Bool
    noValidation = validationPolicy == NoValidation

    -- Consume the stream one block at a time. The first argument accumulates
    -- the accepted blocks in reverse order; it is reversed once, on exit.
    checkEntries ::
         [ParsedBlockInfo blk]
      -> Stream (Of (Word64, (Word64, blk)))
                m
                (Maybe (ReadIncrementalErr, Word64))
      -> m ( [ParsedBlockInfo blk]
           , Maybe (ParseError blk, BlockOffset)
           )
    checkEntries parsed stream = S.next stream >>= \case
      -- End of the stream, possibly carrying a decoding error and its offset.
      Left mbErr
        -> return (reverse parsed, bimap BlockReadErr BlockOffset <$> mbErr)
      Right ((offset, (size, blk)), stream')
        -- '||' short-circuits, so 'isNotCorrupt' is not called when
        -- validation is disabled.
        | noValidation || isNotCorrupt blk
        -> let -- The bangs force both values before recursing, rather than
               -- accumulating thunks that refer to @blk@.
               !blockInfo = extractBlockInfo blk
               !newParsed = ParsedBlockInfo {
                   pbiBlockOffset = BlockOffset offset
                   -- NOTE: 'fromIntegral' narrows the 'Word64' size to the
                   -- 'Word32' held by 'BlockSize'; a size of 2^32 or more
                   -- would silently wrap around.
                 , pbiBlockSize   = BlockSize $ fromIntegral size
                 , pbiBlockInfo   = blockInfo
                 , pbiNestedCtxt  = case unnest (getHeader blk) of
                                      DepPair nestedCtxt _ -> SomeSecond nestedCtxt
                 }
           in checkEntries (newParsed : parsed) stream'
        -- The block failed the corruption check: stop here and report it.
        | otherwise
        -> let !hash = blockHash blk
           in return ( reverse parsed
                     , Just (BlockCorruptedErr hash, BlockOffset offset)
                     )
-- | Build the 'BlockInfo' for a block.
--
-- Hash, slot number, block number, EBB flag and previous hash are read
-- directly from the block; the header offset and header size come from
-- 'getBinaryBlockInfo'.
extractBlockInfo ::
     (GetPrevHash blk, HasBinaryBlockInfo blk)
  => blk
  -> BlockInfo blk
extractBlockInfo blk = BlockInfo {
      biHash         = blockHash     blk
    , biSlotNo       = blockSlot     blk
    , biBlockNo      = blockNo       blk
    , biIsEBB        = blockToIsEBB  blk
    , biPrevHash     = blockPrevHash blk
    , biHeaderOffset = headerOffset
    , biHeaderSize   = headerSize
    }
  where
    -- Field puns (NamedFieldPuns) bind 'headerOffset' and 'headerSize'.
    BinaryBlockInfo { headerOffset, headerSize } = getBinaryBlockInfo blk