diff --git a/dataframe.cabal b/dataframe.cabal index 4a984cd..2b7eaed 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -86,6 +86,7 @@ library DataFrame.IO.Parquet.Compression, DataFrame.IO.Parquet.Encoding, DataFrame.IO.Parquet.Page, + DataFrame.IO.Parquet.Seeking, DataFrame.IO.Parquet.Time, DataFrame.IO.Parquet.Types, DataFrame.Lazy.IO.CSV, @@ -138,6 +139,8 @@ library stm >= 2.5 && < 3, filepath >= 1.4 && < 2, Glob >= 0.10 && < 1, + streamly-core, + streamly-bytestring, hs-source-dirs: src c-sources: cbits/process_csv.c diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 243c3c3..6424427 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -14,7 +14,7 @@ import Data.Either import Data.IORef import Data.Int import qualified Data.List as L -import qualified Data.Map as M +import qualified Data.Map.Strict as M import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding @@ -38,7 +38,9 @@ import DataFrame.IO.Parquet.Types import System.Directory (doesDirectoryExist) import qualified Data.Vector.Unboxed as VU +import DataFrame.IO.Parquet.Seeking import System.FilePath (()) +import System.IO (IOMode (ReadMode)) -- Options ----------------------------------------------------------------- @@ -61,6 +63,10 @@ data ParquetReadOptions = ParquetReadOptions -- ^ Optional row filter expression applied before projection. , rowRange :: Maybe (Int, Int) -- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics. + , forceNonSeekable :: Maybe Bool + {- ^ Force reader to buffer entire file in memory, even if it is seekable. + For internal testing only, to test the fallback path. 
+ -} } deriving (Eq, Show) @@ -82,6 +88,7 @@ defaultParquetReadOptions = { selectedColumns = Nothing , predicate = Nothing , rowRange = Nothing + , forceNonSeekable = Nothing } -- Public API -------------------------------------------------------------- @@ -131,8 +138,8 @@ cleanColPath nodes path = go nodes path False p : go (sChildren n) ps False readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame -readParquetWithOpts opts path = do - (fileMetadata, contents) <- readMetadataFromPath path +readParquetWithOpts opts path = withFileBufferedOrSeekable (forceNonSeekable opts) path ReadMode $ \file -> do + fileMetadata <- readMetadataFromHandle file let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) let columnNames = map fst columnPaths let leafNames = map (last . T.splitOn ".") columnNames @@ -205,7 +212,11 @@ readParquetWithOpts opts path = do else colDataPageOffset let colLength = columnTotalCompressedSize metadata - let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) + columnBytes <- + seekAndReadBytes + (Just (AbsoluteSeek, fromIntegral colStart)) + (fromIntegral colLength) + file pages <- readAllPages (columnCodec metadata) columnBytes @@ -237,13 +248,13 @@ readParquetWithOpts opts path = do Nothing -> do mc <- DI.newMutableColumn totalRows column DI.copyIntoMutableColumn mc 0 column - modifyIORef colMutMap (M.insert colFullName mc) - modifyIORef colOffMap (M.insert colFullName (DI.columnLength column)) + modifyIORef' colMutMap (M.insert colFullName mc) + modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column)) Just mc -> do off <- (M.! 
colFullName) <$> readIORef colOffMap DI.copyIntoMutableColumn mc off column - modifyIORef colOffMap (M.adjust (+ DI.columnLength column) colFullName) - modifyIORef lTypeMap (M.insert colFullName lType) + modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName) + modifyIORef' lTypeMap (M.insert colFullName lType) finalMutMap <- readIORef colMutMap finalColMap <- @@ -322,28 +333,40 @@ applyReadOptions opts = -- File and metadata parsing ----------------------------------------------- +-- | Read the whole file into memory at once, validate the magic string, and return the parsed metadata together with the entire file ByteString readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString) readMetadataFromPath path = do contents <- BSO.readFile path - let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents + let (size, magicString) = readMetadataSizeFromFooter contents when (magicString /= "PAR1") $ error "Invalid Parquet file" meta <- readMetadata contents size pure (meta, contents) -readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) -readMetadataSizeFromFooter contents = +-- | Read only the footer from the end of the file, validate the magic string, and return the parsed metadata (the file contents are not returned) +readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata +readMetadataFromHandle sh = do + footerBs <- readLastBytes (fromIntegral footerSize) sh + let (size, magicString) = readMetadataSizeFromFooterSlice footerBs + when (magicString /= "PAR1") $ error "Invalid Parquet file" + readMetadataByHandleMetaSize sh size + +-- | Takes the last 8 bytes of the file (the footer slice) to parse the metadata size and magic string +readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString) +readMetadataSizeFromFooterSlice contents = let - footerOffSet = BSO.length contents - 8 sizeBytes = map (fromIntegral @Word8 @Int32 . BSO.index contents) - [footerOffSet .. footerOffSet + 3] + [0 .. 3] size = fromIntegral $ L.foldl' (.|.) 
0 $ zipWith shift sizeBytes [0, 8, 16, 24] - magicStringBytes = map (BSO.index contents) [footerOffSet + 4 .. footerOffSet + 7] + magicStringBytes = map (BSO.index contents) [4 .. 7] magicString = BSO.pack magicStringBytes in (size, magicString) +readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) +readMetadataSizeFromFooter = readMetadataSizeFromFooterSlice . BSO.takeEnd 8 + -- Schema navigation ------------------------------------------------------- getColumnPaths :: [SchemaElement] -> [(T.Text, Int)] diff --git a/src/DataFrame/IO/Parquet/Binary.hs b/src/DataFrame/IO/Parquet/Binary.hs index 3fc85bc..147beea 100644 --- a/src/DataFrame/IO/Parquet/Binary.hs +++ b/src/DataFrame/IO/Parquet/Binary.hs @@ -100,7 +100,7 @@ readAndAdvance :: IORef Int -> BS.ByteString -> IO Word8 readAndAdvance bufferPos buffer = do pos <- readIORef bufferPos let b = BS.index buffer pos - modifyIORef bufferPos (+ 1) + modifyIORef' bufferPos (+ 1) return b readVarIntFromBuffer :: (Integral a) => BS.ByteString -> IORef Int -> IO a diff --git a/src/DataFrame/IO/Parquet/Seeking.hs b/src/DataFrame/IO/Parquet/Seeking.hs new file mode 100644 index 0000000..21888b1 --- /dev/null +++ b/src/DataFrame/IO/Parquet/Seeking.hs @@ -0,0 +1,144 @@ +{- | This module contains low-level utilities around file seeking + +potentially also contains all Streamly related low-level utilities. + +later this module can be renamed / moved to an internal module. 
+-} +module DataFrame.IO.Parquet.Seeking ( + SeekableHandle (getSeekableHandle), + SeekMode (..), + FileBufferedOrSeekable (..), + advanceBytes, + mkFileBufferedOrSeekable, + mkSeekableHandle, + readLastBytes, + seekAndReadBytes, + seekAndStreamBytes, + withFileBufferedOrSeekable, +) where + +import Control.Monad +import Control.Monad.IO.Class +import qualified Data.ByteString as BS +import Data.IORef +import Data.Int +import Data.Word +import Streamly.Data.Stream (Stream) +import qualified Streamly.Data.Stream as S +import qualified Streamly.External.ByteString as SBS +import qualified Streamly.FileSystem.Handle as SHandle +import System.IO + +{- | This handle carries a proof that it must be seekable. +Note: Handle and SeekableHandle are not thread safe, should not be +shared across threads, be aware when running parallel/concurrent code. + +Not seekable: + - stdin / stdout + - pipes / FIFOs + +But regular files are always seekable. Parquet fundamentally wants random +access, a non-seekable source will not support efficient access without +buffering the entire file. +-} +newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle} + +{- | If we truly want to support non-seekable files, we need to also consider the case +to buffer the entire file in memory. + +Not thread safe, contains mutable reference (as Handle already is). + +If we need concurrent / parallel parsing or something, we need to read into ByteString +first, not sharing the same handle. +-} +data FileBufferedOrSeekable + = FileBuffered !(IORef Int64) !BS.ByteString + | FileSeekable !SeekableHandle + +-- | Smart constructor for SeekableHandle +mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle) +mkSeekableHandle h = do + seekable <- hIsSeekable h + pure $ if seekable then Just (SeekableHandle h) else Nothing + +-- | For testing only +type ForceNonSeekable = Maybe Bool + +{- | Smart constructor for FileBufferedOrSeekable, tries to keep in the seekable case +if possible. 
+-} +mkFileBufferedOrSeekable :: + ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable +mkFileBufferedOrSeekable forceNonSeek h = do + seekable <- hIsSeekable h + if not seekable || forceNonSeek == Just True + then FileBuffered <$> newIORef 0 <*> BS.hGetContents h + else pure $ FileSeekable $ SeekableHandle h + +{- | With / bracket pattern for FileBufferedOrSeekable + +Warning: do not return the FileBufferedOrSeekable outside the scope of the action as +it will be closed. +-} +withFileBufferedOrSeekable :: + ForceNonSeekable -> + FilePath -> + IOMode -> + (FileBufferedOrSeekable -> IO a) -> + IO a +withFileBufferedOrSeekable forceNonSeek path ioMode action = withFile path ioMode $ \h -> do + fbos <- mkFileBufferedOrSeekable forceNonSeek h + action fbos + +-- | Read from the end, useful for reading metadata without loading entire file +readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString +readLastBytes n (FileSeekable sh) = do + let h = getSeekableHandle sh + hSeek h SeekFromEnd (negate n) + S.fold SBS.write (SHandle.read h) +readLastBytes n (FileBuffered i bs) = do + writeIORef i (fromIntegral $ BS.length bs) + when (n > fromIntegral (BS.length bs)) $ error "lastBytes: n > length bs" + pure $ BS.drop (BS.length bs - fromIntegral n) bs + +-- | Note: this does not guarantee n bytes (if it ends early) +advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString +advanceBytes = seekAndReadBytes Nothing + +-- | Note: this does not guarantee n bytes (if it ends early) +seekAndReadBytes :: + Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString +seekAndReadBytes mSeek len f = seekAndStreamBytes mSeek len f >>= S.fold SBS.write + +{- | Warning: the stream produced from this function accesses the mutable handle. +If multiple streams are pulled from the same handle at the same time, chaos happens. 
+Make sure there is only one stream running at one time for each SeekableHandle, +and streams are not read again when they are not used anymore. +-} +seekAndStreamBytes :: + (MonadIO m) => + Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8) +seekAndStreamBytes mSeek len f = do + liftIO $ + case mSeek of + Nothing -> pure () + Just (seekMode, seekTo) -> fSeek f seekMode seekTo + pure $ S.take len $ fRead f + +fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO () +fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo +fSeek (FileBuffered i bs) AbsoluteSeek seekTo = writeIORef i (fromIntegral seekTo) +fSeek (FileBuffered i bs) RelativeSeek seekTo = modifyIORef' i (+ fromIntegral seekTo) +fSeek (FileBuffered i bs) SeekFromEnd seekTo = writeIORef i (fromIntegral $ BS.length bs + fromIntegral seekTo) + +fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8 +fRead (FileSeekable (SeekableHandle h)) = SHandle.read h +fRead (FileBuffered i bs) = S.concatEffect $ do + pos <- liftIO $ readIORef i + pure $ + S.mapM + ( \x -> do + liftIO (modifyIORef' i (+ 1)) + pure x + ) + (S.unfold SBS.reader (BS.drop (fromIntegral pos) bs)) diff --git a/src/DataFrame/IO/Parquet/Thrift.hs b/src/DataFrame/IO/Parquet/Thrift.hs index 239371c..ec518fb 100644 --- a/src/DataFrame/IO/Parquet/Thrift.hs +++ b/src/DataFrame/IO/Parquet/Thrift.hs @@ -21,6 +21,7 @@ import qualified Data.Vector as V import qualified Data.Vector.Unboxed as VU import Data.Word import DataFrame.IO.Parquet.Binary +import DataFrame.IO.Parquet.Seeking import DataFrame.IO.Parquet.Types import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame, unsafeGetColumn) @@ -329,6 +330,17 @@ skipList buf pos = do let elemType = toTType sizeAndType replicateM_ sizeOnly (skipFieldData elemType buf pos) +{- | This avoids reading entire bytestring at once: it uses the seekable handle + seeks it to the end of the file to read the 
metadata +-} +readMetadataByHandleMetaSize :: FileBufferedOrSeekable -> Int -> IO FileMetadata +readMetadataByHandleMetaSize sh metaSize = do + let lastFieldId = 0 + bs <- readLastBytes (fromIntegral $ metaSize + footerSize) sh + bufferPos <- newIORef 0 + readFileMetaData defaultMetadata bs bufferPos lastFieldId + +-- | metadata starts from (L - 8 - meta_size) to L - 8 - 1. readMetadata :: BS.ByteString -> Int -> IO FileMetadata readMetadata contents size = do let metadataStartPos = BS.length contents - footerSize - size diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 8eb699e..87486cb 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -13,6 +13,12 @@ import Data.Time import GHC.IO (unsafePerformIO) import Test.HUnit +-- | For fallback path testing +readParquetNonSeekable :: FilePath -> IO D.DataFrame +readParquetNonSeekable = + D.readParquetWithOpts + (D.defaultParquetReadOptions{D.forceNonSeekable = Just True}) + allTypes :: D.DataFrame allTypes = D.fromNamedColumns @@ -53,13 +59,20 @@ allTypes = ) ] +testBothReadParquetPaths :: ((FilePath -> IO D.DataFrame) -> Test) -> Test +testBothReadParquetPaths test = + TestList + [ test D.readParquet + , test readParquetNonSeekable + ] + allTypesPlain :: Test -allTypesPlain = +allTypesPlain = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlain" allTypes - (unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_plain.parquet")) ) allTypesTinyPagesDimensions :: Test @@ -155,7 +168,7 @@ tinyPagesLast10 = ] allTypesTinyPagesLastFew :: Test -allTypesTinyPagesLastFew = +allTypesTinyPagesLastFew = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesTinyPages dimensions" @@ -164,27 +177,27 @@ allTypesTinyPagesLastFew = -- Excluding doubles because they are weird to compare. ( fmap (D.takeLast 10 . 
D.exclude ["double_col"]) - (D.readParquet "./tests/data/alltypes_tiny_pages.parquet") + (readParquet "./tests/data/alltypes_tiny_pages.parquet") ) ) ) allTypesPlainSnappy :: Test -allTypesPlainSnappy = +allTypesPlainSnappy = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlainSnappy" (D.filter (F.col @Int32 "id") (`elem` [6, 7]) allTypes) - (unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.snappy.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_plain.snappy.parquet")) ) allTypesDictionary :: Test -allTypesDictionary = +allTypesDictionary = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlainSnappy" (D.filter (F.col @Int32 "id") (`elem` [0, 1]) allTypes) - (unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_dictionary.parquet")) ) selectedColumnsWithOpts :: Test @@ -460,12 +473,12 @@ transactions = ] transactionsTest :: Test -transactionsTest = +transactionsTest = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "transactions" transactions - (unsafePerformIO (D.readParquet "./tests/data/transactions.parquet")) + (unsafePerformIO (readParquet "./tests/data/transactions.parquet")) ) mtCarsDataset :: D.DataFrame