From 4e59ee0e1858f46f6dfa3b071683d8cce3fc8d94 Mon Sep 17 00:00:00 2001 From: Eiko Date: Sun, 15 Mar 2026 17:35:28 +0000 Subject: [PATCH 1/2] refactor:Refactor Parquet reader to avoid whole-file loading in memory Read Parquet metadata from the footer and fetch column chunk bytes by seek instead of loading the entire file into memory up front. This keeps the current page decoding path intact while reducing peak memory usage for normal file reads, ensuring that only the column chunks needed are loaded into memory. One column chunk at a time so extra memory is bounded by the size of the column chunk. This is also the first step towards a streaming reader. --- dataframe.cabal | 3 + src/DataFrame/IO/Parquet.hs | 49 +++++++--- src/DataFrame/IO/Parquet/Binary.hs | 2 +- src/DataFrame/IO/Parquet/Seeking.hs | 144 ++++++++++++++++++++++++++++ src/DataFrame/IO/Parquet/Thrift.hs | 12 +++ tests/Parquet.hs | 33 +++++-- 6 files changed, 219 insertions(+), 24 deletions(-) create mode 100644 src/DataFrame/IO/Parquet/Seeking.hs diff --git a/dataframe.cabal b/dataframe.cabal index 4a984cd..2b7eaed 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -86,6 +86,7 @@ library DataFrame.IO.Parquet.Compression, DataFrame.IO.Parquet.Encoding, DataFrame.IO.Parquet.Page, + DataFrame.IO.Parquet.Seeking, DataFrame.IO.Parquet.Time, DataFrame.IO.Parquet.Types, DataFrame.Lazy.IO.CSV, @@ -138,6 +139,8 @@ library stm >= 2.5 && < 3, filepath >= 1.4 && < 2, Glob >= 0.10 && < 1, + streamly-core, + streamly-bytestring, hs-source-dirs: src c-sources: cbits/process_csv.c diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 243c3c3..0f0503b 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -38,7 +38,9 @@ import DataFrame.IO.Parquet.Types import System.Directory (doesDirectoryExist) import qualified Data.Vector.Unboxed as VU +import DataFrame.IO.Parquet.Seeking import System.FilePath (()) +import System.IO (IOMode (ReadMode)) -- Options 
----------------------------------------------------------------- @@ -61,6 +63,10 @@ data ParquetReadOptions = ParquetReadOptions -- ^ Optional row filter expression applied before projection. , rowRange :: Maybe (Int, Int) -- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics. + , forceNonSeekable :: Maybe Bool + {- ^ Force reader to buffer entire file in memory, even if it is seekable. + For internal testing only, to test the fallback path. + -} } deriving (Eq, Show) @@ -82,6 +88,7 @@ defaultParquetReadOptions = { selectedColumns = Nothing , predicate = Nothing , rowRange = Nothing + , forceNonSeekable = Nothing } -- Public API -------------------------------------------------------------- @@ -131,8 +138,8 @@ cleanColPath nodes path = go nodes path False p : go (sChildren n) ps False readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame -readParquetWithOpts opts path = do - (fileMetadata, contents) <- readMetadataFromPath path +readParquetWithOpts opts path = withFileBufferedOrSeekable (forceNonSeekable opts) path ReadMode $ \file -> do + fileMetadata <- readMetadataFromHandle file let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) let columnNames = map fst columnPaths let leafNames = map (last . 
T.splitOn ".") columnNames @@ -205,7 +212,11 @@ readParquetWithOpts opts path = do else colDataPageOffset let colLength = columnTotalCompressedSize metadata - let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) + columnBytes <- + seekAndReadBytes + (Just (AbsoluteSeek, fromIntegral colStart)) + (fromIntegral colLength) + file pages <- readAllPages (columnCodec metadata) columnBytes @@ -237,13 +248,13 @@ readParquetWithOpts opts path = do Nothing -> do mc <- DI.newMutableColumn totalRows column DI.copyIntoMutableColumn mc 0 column - modifyIORef colMutMap (M.insert colFullName mc) - modifyIORef colOffMap (M.insert colFullName (DI.columnLength column)) + modifyIORef' colMutMap (M.insert colFullName mc) + modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column)) Just mc -> do off <- (M.! colFullName) <$> readIORef colOffMap DI.copyIntoMutableColumn mc off column - modifyIORef colOffMap (M.adjust (+ DI.columnLength column) colFullName) - modifyIORef lTypeMap (M.insert colFullName lType) + modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName) + modifyIORef' lTypeMap (M.insert colFullName lType) finalMutMap <- readIORef colMutMap finalColMap <- @@ -322,28 +333,40 @@ applyReadOptions opts = -- File and metadata parsing ----------------------------------------------- +-- | read the file in memory at once, parse magicString and return the entire file ByteString readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString) readMetadataFromPath path = do contents <- BSO.readFile path - let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents + let (size, magicString) = readMetadataSizeFromFooter contents when (magicString /= "PAR1") $ error "Invalid Parquet file" meta <- readMetadata contents size pure (meta, contents) -readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) -readMetadataSizeFromFooter contents = +-- | read from the end of the file, parse 
magicString from the footer, then parse the FileMetadata without loading the whole file +readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata +readMetadataFromHandle sh = do + footerBs <- readLastBytes (fromIntegral footerSize) sh + let (size, magicString) = readMetadataSizeFromFooterSlice footerBs + when (magicString /= "PAR1") $ error "Invalid Parquet file" + readMetadataByHandleMetaSize sh size + +-- | Takes the last 8 bytes of the file to parse metadata size and magic string +readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString) +readMetadataSizeFromFooterSlice contents = let - footerOffSet = BSO.length contents - 8 sizeBytes = map (fromIntegral @Word8 @Int32 . BSO.index contents) - [footerOffSet .. footerOffSet + 3] + [0 .. 3] size = fromIntegral $ L.foldl' (.|.) 0 $ zipWith shift sizeBytes [0, 8, 16, 24] - magicStringBytes = map (BSO.index contents) [footerOffSet + 4 .. footerOffSet + 7] + magicStringBytes = map (BSO.index contents) [4 .. 7] magicString = BSO.pack magicStringBytes in (size, magicString) +readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) +readMetadataSizeFromFooter = readMetadataSizeFromFooterSlice . 
BSO.takeEnd 8 + -- Schema navigation ------------------------------------------------------- getColumnPaths :: [SchemaElement] -> [(T.Text, Int)] diff --git a/src/DataFrame/IO/Parquet/Binary.hs b/src/DataFrame/IO/Parquet/Binary.hs index 3fc85bc..147beea 100644 --- a/src/DataFrame/IO/Parquet/Binary.hs +++ b/src/DataFrame/IO/Parquet/Binary.hs @@ -100,7 +100,7 @@ readAndAdvance :: IORef Int -> BS.ByteString -> IO Word8 readAndAdvance bufferPos buffer = do pos <- readIORef bufferPos let b = BS.index buffer pos - modifyIORef bufferPos (+ 1) + modifyIORef' bufferPos (+ 1) return b readVarIntFromBuffer :: (Integral a) => BS.ByteString -> IORef Int -> IO a diff --git a/src/DataFrame/IO/Parquet/Seeking.hs b/src/DataFrame/IO/Parquet/Seeking.hs new file mode 100644 index 0000000..21888b1 --- /dev/null +++ b/src/DataFrame/IO/Parquet/Seeking.hs @@ -0,0 +1,144 @@ +{- | This module contains low-level utilities around file seeking + +potentially also contains all Streamly related low-level utilities. + +later this module can be renamed / moved to an internal module. +-} +module DataFrame.IO.Parquet.Seeking ( + SeekableHandle (getSeekableHandle), + SeekMode (..), + FileBufferedOrSeekable (..), + advanceBytes, + mkFileBufferedOrSeekable, + mkSeekableHandle, + readLastBytes, + seekAndReadBytes, + seekAndStreamBytes, + withFileBufferedOrSeekable, +) where + +import Control.Monad +import Control.Monad.IO.Class +import qualified Data.ByteString as BS +import Data.IORef +import Data.Int +import Data.Word +import Streamly.Data.Stream (Stream) +import qualified Streamly.Data.Stream as S +import qualified Streamly.External.ByteString as SBS +import qualified Streamly.FileSystem.Handle as SHandle +import System.IO + +{- | This handle carries a proof that it must be seekable. +Note: Handle and SeekableHandle are not thread safe, should not be +shared across threads, be aware when running parallel/concurrent code. 
+ +Not seekable: + - stdin / stdout + - pipes / FIFOs + +But regular files are always seekable. Parquet fundamentally wants random +access, a non-seekable source will not support efficient access without +buffering the entire file. +-} +newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle} + +{- | If we truly want to support non-seekable files, we need to also consider the case +to buffer the entire file in memory. + +Not thread safe, contains mutable reference (as Handle already is). + +If we need concurrent / parallel parsing or something, we need to read into ByteString +first, not sharing the same handle. +-} +data FileBufferedOrSeekable + = FileBuffered !(IORef Int64) !BS.ByteString + | FileSeekable !SeekableHandle + +-- | Smart constructor for SeekableHandle +mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle) +mkSeekableHandle h = do + seekable <- hIsSeekable h + pure $ if seekable then Just (SeekableHandle h) else Nothing + +-- | For testing only +type ForceNonSeekable = Maybe Bool + +{- | Smart constructor for FileBufferedOrSeekable, tries to keep in the seekable case +if possible. +-} +mkFileBufferedOrSeekable :: + ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable +mkFileBufferedOrSeekable forceNonSeek h = do + seekable <- hIsSeekable h + if not seekable || forceNonSeek == Just True + then FileBuffered <$> newIORef 0 <*> BS.hGetContents h + else pure $ FileSeekable $ SeekableHandle h + +{- | With / bracket pattern for FileBufferedOrSeekable + +Warning: do not return the FileBufferedOrSeekable outside the scope of the action as +it will be closed. 
+-} +withFileBufferedOrSeekable :: + ForceNonSeekable -> + FilePath -> + IOMode -> + (FileBufferedOrSeekable -> IO a) -> + IO a +withFileBufferedOrSeekable forceNonSeek path ioMode action = withFile path ioMode $ \h -> do + fbos <- mkFileBufferedOrSeekable forceNonSeek h + action fbos + +-- | Read from the end, useful for reading metadata without loading entire file +readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString +readLastBytes n (FileSeekable sh) = do + let h = getSeekableHandle sh + hSeek h SeekFromEnd (negate n) + S.fold SBS.write (SHandle.read h) +readLastBytes n (FileBuffered i bs) = do + writeIORef i (fromIntegral $ BS.length bs) + when (n > fromIntegral (BS.length bs)) $ error "lastBytes: n > length bs" + pure $ BS.drop (BS.length bs - fromIntegral n) bs + +-- | Note: this does not guarantee n bytes (if it ends early) +advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString +advanceBytes = seekAndReadBytes Nothing + +-- | Note: this does not guarantee n bytes (if it ends early) +seekAndReadBytes :: + Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString +seekAndReadBytes mSeek len f = seekAndStreamBytes mSeek len f >>= S.fold SBS.write + +{- | Warning: the stream produced from this function accesses the mutable handle. +If multiple streams are pulled from the same handle at the same time, chaos happens. +Make sure there is only one stream running at one time for each SeekableHandle, +and streams are not read again when they are not used anymore. 
+-} +seekAndStreamBytes :: + (MonadIO m) => + Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8) +seekAndStreamBytes mSeek len f = do + liftIO $ + case mSeek of + Nothing -> pure () + Just (seekMode, seekTo) -> fSeek f seekMode seekTo + pure $ S.take len $ fRead f + +fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO () +fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo +fSeek (FileBuffered i bs) AbsoluteSeek seekTo = writeIORef i (fromIntegral seekTo) +fSeek (FileBuffered i bs) RelativeSeek seekTo = modifyIORef' i (+ fromIntegral seekTo) +fSeek (FileBuffered i bs) SeekFromEnd seekTo = writeIORef i (fromIntegral $ BS.length bs + fromIntegral seekTo) + +fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8 +fRead (FileSeekable (SeekableHandle h)) = SHandle.read h +fRead (FileBuffered i bs) = S.concatEffect $ do + pos <- liftIO $ readIORef i + pure $ + S.mapM + ( \x -> do + liftIO (modifyIORef' i (+ 1)) + pure x + ) + (S.unfold SBS.reader (BS.drop (fromIntegral pos) bs)) diff --git a/src/DataFrame/IO/Parquet/Thrift.hs b/src/DataFrame/IO/Parquet/Thrift.hs index 239371c..ec518fb 100644 --- a/src/DataFrame/IO/Parquet/Thrift.hs +++ b/src/DataFrame/IO/Parquet/Thrift.hs @@ -21,6 +21,7 @@ import qualified Data.Vector as V import qualified Data.Vector.Unboxed as VU import Data.Word import DataFrame.IO.Parquet.Binary +import DataFrame.IO.Parquet.Seeking import DataFrame.IO.Parquet.Types import qualified DataFrame.Internal.Column as DI import DataFrame.Internal.DataFrame (DataFrame, unsafeGetColumn) @@ -329,6 +330,17 @@ skipList buf pos = do let elemType = toTType sizeAndType replicateM_ sizeOnly (skipFieldData elemType buf pos) +{- | This avoids reading entire bytestring at once: it uses the seekable handle + seeks it to the end of the file to read the metadata +-} +readMetadataByHandleMetaSize :: FileBufferedOrSeekable -> Int -> IO FileMetadata +readMetadataByHandleMetaSize sh metaSize = do + 
let lastFieldId = 0 + bs <- readLastBytes (fromIntegral $ metaSize + footerSize) sh + bufferPos <- newIORef 0 + readFileMetaData defaultMetadata bs bufferPos lastFieldId + +-- | metadata starts from (L - 8 - meta_size) to L - 8 - 1. readMetadata :: BS.ByteString -> Int -> IO FileMetadata readMetadata contents size = do let metadataStartPos = BS.length contents - footerSize - size diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 8eb699e..87486cb 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -13,6 +13,12 @@ import Data.Time import GHC.IO (unsafePerformIO) import Test.HUnit +-- | For fallback path testing +readParquetNonSeekable :: FilePath -> IO D.DataFrame +readParquetNonSeekable = + D.readParquetWithOpts + (D.defaultParquetReadOptions{D.forceNonSeekable = Just True}) + allTypes :: D.DataFrame allTypes = D.fromNamedColumns @@ -53,13 +59,20 @@ allTypes = ) ] +testBothReadParquetPaths :: ((FilePath -> IO D.DataFrame) -> Test) -> Test +testBothReadParquetPaths test = + TestList + [ test D.readParquet + , test readParquetNonSeekable + ] + allTypesPlain :: Test -allTypesPlain = +allTypesPlain = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlain" allTypes - (unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_plain.parquet")) ) allTypesTinyPagesDimensions :: Test @@ -155,7 +168,7 @@ tinyPagesLast10 = ] allTypesTinyPagesLastFew :: Test -allTypesTinyPagesLastFew = +allTypesTinyPagesLastFew = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesTinyPages dimensions" @@ -164,27 +177,27 @@ allTypesTinyPagesLastFew = -- Excluding doubles because they are weird to compare. ( fmap (D.takeLast 10 . 
D.exclude ["double_col"]) - (D.readParquet "./tests/data/alltypes_tiny_pages.parquet") + (readParquet "./tests/data/alltypes_tiny_pages.parquet") ) ) ) allTypesPlainSnappy :: Test -allTypesPlainSnappy = +allTypesPlainSnappy = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlainSnappy" (D.filter (F.col @Int32 "id") (`elem` [6, 7]) allTypes) - (unsafePerformIO (D.readParquet "./tests/data/alltypes_plain.snappy.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_plain.snappy.parquet")) ) allTypesDictionary :: Test -allTypesDictionary = +allTypesDictionary = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "allTypesPlainSnappy" (D.filter (F.col @Int32 "id") (`elem` [0, 1]) allTypes) - (unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet")) + (unsafePerformIO (readParquet "./tests/data/alltypes_dictionary.parquet")) ) selectedColumnsWithOpts :: Test @@ -460,12 +473,12 @@ transactions = ] transactionsTest :: Test -transactionsTest = +transactionsTest = testBothReadParquetPaths $ \readParquet -> TestCase ( assertEqual "transactions" transactions - (unsafePerformIO (D.readParquet "./tests/data/transactions.parquet")) + (unsafePerformIO (readParquet "./tests/data/transactions.parquet")) ) mtCarsDataset :: D.DataFrame From 2618b5387539c192f7db09ef60ea84530d4f46c0 Mon Sep 17 00:00:00 2001 From: Eiko Date: Sun, 15 Mar 2026 18:30:43 +0000 Subject: [PATCH 2/2] Use Map.Strict to reduce the possibility of unevaluated thunk leak --- src/DataFrame/IO/Parquet.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index 0f0503b..6424427 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -14,7 +14,7 @@ import Data.Either import Data.IORef import Data.Int import qualified Data.List as L -import qualified Data.Map as M +import qualified Data.Map.Strict as M import qualified Data.Set as S import qualified 
Data.Text as T import Data.Text.Encoding