-
Notifications
You must be signed in to change notification settings - Fork 41
refactor: Refactor Parquet reader to avoid loading entire file in memory at once #184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ import Data.Either | |
| import Data.IORef | ||
| import Data.Int | ||
| import qualified Data.List as L | ||
| import qualified Data.Map as M | ||
| import qualified Data.Map.Strict as M | ||
| import qualified Data.Set as S | ||
| import qualified Data.Text as T | ||
| import Data.Text.Encoding | ||
|
|
@@ -38,7 +38,9 @@ import DataFrame.IO.Parquet.Types | |
| import System.Directory (doesDirectoryExist) | ||
|
|
||
| import qualified Data.Vector.Unboxed as VU | ||
| import DataFrame.IO.Parquet.Seeking | ||
| import System.FilePath ((</>)) | ||
| import System.IO (IOMode (ReadMode)) | ||
|
|
||
| -- Options ----------------------------------------------------------------- | ||
|
|
||
|
|
@@ -61,6 +63,10 @@ data ParquetReadOptions = ParquetReadOptions | |
| -- ^ Optional row filter expression applied before projection. | ||
| , rowRange :: Maybe (Int, Int) | ||
| -- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics. | ||
| , forceNonSeekable :: Maybe Bool | ||
| {- ^ Force reader to buffer entire file in memory, even if it is seekable. | ||
| For internal testing only, to test the fallback path. | ||
| -} | ||
| } | ||
| deriving (Eq, Show) | ||
|
|
||
|
|
@@ -82,6 +88,7 @@ defaultParquetReadOptions = | |
| { selectedColumns = Nothing | ||
| , predicate = Nothing | ||
| , rowRange = Nothing | ||
| , forceNonSeekable = Nothing | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rather than putting this in the public API, we'd much rather make a separate testing endpoint. Something like:
-- production path
readParquetWithOpts opts path = withSeekable path ReadMode (readHelper opts path)
-- This would be the testing function.
_readParquetWithOpts opts path = withFilebuffer path ReadMode (readHelper opts path)
I actually don't know if there is a good way to inject behaviour into Haskell tests in this way. Let me ask around and then get back to this, but separating the functions seems like a good first step to me. |
||
| } | ||
|
|
||
| -- Public API -------------------------------------------------------------- | ||
|
|
@@ -131,8 +138,8 @@ cleanColPath nodes path = go nodes path False | |
| p : go (sChildren n) ps False | ||
|
|
||
| readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame | ||
| readParquetWithOpts opts path = do | ||
| (fileMetadata, contents) <- readMetadataFromPath path | ||
| readParquetWithOpts opts path = withFileBufferedOrSeekable (forceNonSeekable opts) path ReadMode $ \file -> do | ||
| fileMetadata <- readMetadataFromHandle file | ||
| let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata) | ||
| let columnNames = map fst columnPaths | ||
| let leafNames = map (last . T.splitOn ".") columnNames | ||
|
|
@@ -205,7 +212,11 @@ readParquetWithOpts opts path = do | |
| else colDataPageOffset | ||
| let colLength = columnTotalCompressedSize metadata | ||
|
|
||
| let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents) | ||
| columnBytes <- | ||
| seekAndReadBytes | ||
| (Just (AbsoluteSeek, fromIntegral colStart)) | ||
| (fromIntegral colLength) | ||
| file | ||
|
|
||
| pages <- readAllPages (columnCodec metadata) columnBytes | ||
|
|
||
|
|
@@ -237,13 +248,13 @@ readParquetWithOpts opts path = do | |
| Nothing -> do | ||
| mc <- DI.newMutableColumn totalRows column | ||
| DI.copyIntoMutableColumn mc 0 column | ||
| modifyIORef colMutMap (M.insert colFullName mc) | ||
| modifyIORef colOffMap (M.insert colFullName (DI.columnLength column)) | ||
| modifyIORef' colMutMap (M.insert colFullName mc) | ||
| modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column)) | ||
| Just mc -> do | ||
| off <- (M.! colFullName) <$> readIORef colOffMap | ||
| DI.copyIntoMutableColumn mc off column | ||
| modifyIORef colOffMap (M.adjust (+ DI.columnLength column) colFullName) | ||
| modifyIORef lTypeMap (M.insert colFullName lType) | ||
| modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName) | ||
| modifyIORef' lTypeMap (M.insert colFullName lType) | ||
|
|
||
| finalMutMap <- readIORef colMutMap | ||
| finalColMap <- | ||
|
|
@@ -322,28 +333,40 @@ applyReadOptions opts = | |
|
|
||
| -- File and metadata parsing ----------------------------------------------- | ||
|
|
||
| -- | Read the whole file into memory at once, check the footer magic string, parse the | ||
| -- metadata, and return it together with the entire file 'BSO.ByteString'. | ||
| -- Calls 'error' when the magic string is not @PAR1@. | ||
| readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString) | ||
| readMetadataFromPath path = do | ||
| contents <- BSO.readFile path | ||
| let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents | ||
| let (size, magicString) = readMetadataSizeFromFooter contents | ||
| when (magicString /= "PAR1") $ error "Invalid Parquet file" | ||
| meta <- readMetadata contents size | ||
| pure (meta, contents) | ||
|
|
||
| readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) | ||
| readMetadataSizeFromFooter contents = | ||
| -- | Read the trailing @footerSize@ bytes of the file, check the magic string, and parse | ||
| -- the file metadata through the handle. Only the 'FileMetadata' is returned; the file | ||
| -- contents are not. Calls 'error' when the magic string is not @PAR1@. | ||
| readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata | ||
| readMetadataFromHandle sh = do | ||
| footerBs <- readLastBytes (fromIntegral footerSize) sh | ||
| let (size, magicString) = readMetadataSizeFromFooterSlice footerBs | ||
| when (magicString /= "PAR1") $ error "Invalid Parquet file" | ||
| readMetadataByHandleMetaSize sh size | ||
|
|
||
| -- | Takes the last 8 bytes of the file and parses them into the metadata size | ||
| -- (4 bytes, little-endian) and the magic string (the following 4 bytes). | ||
| readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString) | ||
| readMetadataSizeFromFooterSlice contents = | ||
| let | ||
| footerOffSet = BSO.length contents - 8 | ||
| sizeBytes = | ||
| map | ||
| (fromIntegral @Word8 @Int32 . BSO.index contents) | ||
| [footerOffSet .. footerOffSet + 3] | ||
| [0 .. 3] | ||
| size = fromIntegral $ L.foldl' (.|.) 0 $ zipWith shift sizeBytes [0, 8, 16, 24] | ||
| magicStringBytes = map (BSO.index contents) [footerOffSet + 4 .. footerOffSet + 7] | ||
| magicStringBytes = map (BSO.index contents) [4 .. 7] | ||
| magicString = BSO.pack magicStringBytes | ||
| in | ||
| (size, magicString) | ||
|
|
||
| -- | Parse the metadata size and magic string out of the last 8 bytes of @contents@. | ||
| readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString) | ||
| readMetadataSizeFromFooter contents = | ||
|     readMetadataSizeFromFooterSlice (BSO.takeEnd 8 contents) | ||
|
|
||
| -- Schema navigation ------------------------------------------------------- | ||
|
|
||
| getColumnPaths :: [SchemaElement] -> [(T.Text, Int)] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| {- | This module contains low-level utilities around file seeking | ||
|
|
||
| potentially also contains all Streamly related low-level utilities. | ||
|
|
||
| later this module can be renamed / moved to an internal module. | ||
| -} | ||
| module DataFrame.IO.Parquet.Seeking ( | ||
| SeekableHandle (getSeekableHandle), | ||
| SeekMode (..), | ||
| FileBufferedOrSeekable (..), | ||
| advanceBytes, | ||
| mkFileBufferedOrSeekable, | ||
| mkSeekableHandle, | ||
| readLastBytes, | ||
| seekAndReadBytes, | ||
| seekAndStreamBytes, | ||
| withFileBufferedOrSeekable, | ||
| ) where | ||
|
|
||
| import Control.Monad | ||
| import Control.Monad.IO.Class | ||
| import qualified Data.ByteString as BS | ||
| import Data.IORef | ||
| import Data.Int | ||
| import Data.Word | ||
| import Streamly.Data.Stream (Stream) | ||
| import qualified Streamly.Data.Stream as S | ||
| import qualified Streamly.External.ByteString as SBS | ||
| import qualified Streamly.FileSystem.Handle as SHandle | ||
| import System.IO | ||
|
|
||
| {- | This handle carries a proof that it must be seekable. | ||
| Note: Handle and SeekableHandle are not thread safe and should not be | ||
| shared across threads; be aware of this when running parallel/concurrent code. | ||
|
|
||
| Not seekable: | ||
| - stdin / stdout | ||
| - pipes / FIFOs | ||
|
|
||
| But regular files are always seekable. Parquet fundamentally wants random | ||
| access; a non-seekable source will not support efficient access without | ||
| buffering the entire file. | ||
| -} | ||
| newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle} | ||
|
|
||
| {- | If we truly want to support non-seekable files, we also need to consider the case | ||
| of buffering the entire file in memory. | ||
|
|
||
| Not thread safe: contains a mutable reference (as Handle already does). | ||
|
|
||
| If we need concurrent / parallel parsing, we need to read into a ByteString | ||
| first instead of sharing the same handle. | ||
| 'FileBuffered' holds a mutable read cursor plus the buffered file contents; | ||
| 'FileSeekable' wraps a handle that supports seeking. | ||
| -} | ||
| data FileBufferedOrSeekable | ||
| = FileBuffered !(IORef Int64) !BS.ByteString | ||
| | FileSeekable !SeekableHandle | ||
|
|
||
| -- | Smart constructor: wraps the 'Handle' in a 'SeekableHandle' when 'hIsSeekable' | ||
| -- reports it as seekable, and yields 'Nothing' otherwise. | ||
| mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle) | ||
| mkSeekableHandle h = toSeekable <$> hIsSeekable h | ||
|   where | ||
|     toSeekable ok = if ok then Just (SeekableHandle h) else Nothing | ||
|
|
||
| -- | For testing only: @Just True@ makes 'mkFileBufferedOrSeekable' buffer the whole | ||
| -- file even when the handle is seekable; any other value leaves the decision to | ||
| -- 'hIsSeekable'. | ||
| type ForceNonSeekable = Maybe Bool | ||
|
|
||
| {- | Smart constructor for 'FileBufferedOrSeekable'. | ||
| The handle stays in the seekable case whenever it is seekable and buffering was not | ||
| forced with @Just True@; otherwise its contents are read into memory and paired | ||
| with a fresh cursor starting at 0. | ||
| -} | ||
| mkFileBufferedOrSeekable :: | ||
|     ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable | ||
| mkFileBufferedOrSeekable forceNonSeek h = do | ||
|     canSeek <- hIsSeekable h | ||
|     let keepSeekable = canSeek && forceNonSeek /= Just True | ||
|     if keepSeekable | ||
|         then pure (FileSeekable (SeekableHandle h)) | ||
|         else do | ||
|             cursor <- newIORef 0 | ||
|             contents <- BS.hGetContents h | ||
|             pure (FileBuffered cursor contents) | ||
|
|
||
| {- | With / bracket pattern for 'FileBufferedOrSeekable'. | ||
| Opens @path@ in binary mode (Parquet is a binary format, so no newline or encoding | ||
| translation must be applied to the handle), builds the 'FileBufferedOrSeekable' | ||
| with 'mkFileBufferedOrSeekable' and passes it to @action@. | ||
| Warning: do not return the 'FileBufferedOrSeekable' outside the scope of the action, | ||
| as the underlying handle is closed when the action finishes. | ||
| -} | ||
| withFileBufferedOrSeekable :: | ||
|     ForceNonSeekable -> | ||
|     FilePath -> | ||
|     IOMode -> | ||
|     (FileBufferedOrSeekable -> IO a) -> | ||
|     IO a | ||
| withFileBufferedOrSeekable forceNonSeek path ioMode action = | ||
|     withBinaryFile path ioMode $ \h -> | ||
|         mkFileBufferedOrSeekable forceNonSeek h >>= action | ||
|
|
||
| -- | Read the trailing @n@ bytes, useful for reading metadata without loading the | ||
| -- entire file. The seekable case seeks @n@ bytes back from the end and reads to the | ||
| -- end; the buffered case moves the cursor to the end and slices the buffer, calling | ||
| -- 'error' when @n@ exceeds the buffer length. | ||
| readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString | ||
| readLastBytes n (FileSeekable (SeekableHandle h)) = | ||
|     hSeek h SeekFromEnd (negate n) >> S.fold SBS.write (SHandle.read h) | ||
| readLastBytes n (FileBuffered cursor bs) = do | ||
|     let total = BS.length bs | ||
|     writeIORef cursor (fromIntegral total) | ||
|     when (n > fromIntegral total) $ error "lastBytes: n > length bs" | ||
|     pure $ BS.drop (total - fromIntegral n) bs | ||
|
|
||
| -- | Read up to @len@ bytes from the current position, without seeking first. | ||
| -- Note: fewer than @len@ bytes are returned if the source ends early. | ||
| advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString | ||
| advanceBytes len file = seekAndReadBytes Nothing len file | ||
|
|
||
| {- | Optionally seek, then read up to @len@ bytes as one strict 'BS.ByteString'. | ||
| Note: this does not guarantee @len@ bytes (the source may end early); a | ||
| non-positive @len@ yields an empty result after the seek has been applied. | ||
| The bytes are fetched directly ('BS.hGet' for a seekable handle, a slice for the | ||
| buffered case) rather than by folding a byte-at-a-time stream. This leaves the read | ||
| position exactly after the returned bytes in both cases, whereas a chunked stream | ||
| reader may consume more from the handle than @len@. | ||
| -} | ||
| seekAndReadBytes :: | ||
|     Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString | ||
| seekAndReadBytes mSeek len f = do | ||
|     forM_ mSeek $ \(seekMode, seekTo) -> fSeek f seekMode seekTo | ||
|     if len <= 0 | ||
|         then pure BS.empty | ||
|         else case f of | ||
|             FileSeekable (SeekableHandle h) -> BS.hGet h len | ||
|             FileBuffered cursor bs -> do | ||
|                 pos <- readIORef cursor | ||
|                 let chunk = BS.take len (BS.drop (fromIntegral pos) bs) | ||
|                 -- Advance the cursor by what was actually returned. | ||
|                 modifyIORef' cursor (+ fromIntegral (BS.length chunk)) | ||
|                 pure chunk | ||
|
|
||
| {- | Optionally seek, then return a stream of at most @len@ bytes read from @f@. | ||
| Warning: the returned stream reads from the mutable handle / cursor inside @f@. | ||
| If several streams are pulled from the same handle at the same time, their reads | ||
| interfere with each other. Make sure only one stream is running at a time for each | ||
| handle, and that a stream is not read again once it is no longer in use. | ||
| -} | ||
| seekAndStreamBytes :: | ||
|     (MonadIO m) => | ||
|     Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8) | ||
| seekAndStreamBytes mSeek len f = do | ||
|     liftIO $ mapM_ (\(mode, target) -> fSeek f mode target) mSeek | ||
|     pure (S.take len (fRead f)) | ||
|
|
||
| -- | Move the read position: delegates to 'hSeek' for a seekable handle, and updates | ||
| -- the cursor (absolute, relative, or from the end of the buffer) in the buffered case. | ||
| fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO () | ||
| fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo | ||
| fSeek (FileBuffered cursor bs) seekMode seekTo = | ||
|     case seekMode of | ||
|         AbsoluteSeek -> writeIORef cursor offset | ||
|         RelativeSeek -> modifyIORef' cursor (+ offset) | ||
|         SeekFromEnd -> writeIORef cursor (fromIntegral (BS.length bs + fromIntegral seekTo)) | ||
|   where | ||
|     offset = fromIntegral seekTo | ||
|
|
||
| -- | Stream bytes from the current position. The buffered case starts at the cursor | ||
| -- value read when the stream is first run and bumps the cursor by one per byte yielded. | ||
| fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8 | ||
| fRead (FileSeekable (SeekableHandle h)) = SHandle.read h | ||
| fRead (FileBuffered cursor bs) = S.concatEffect $ do | ||
|     start <- liftIO (readIORef cursor) | ||
|     let remaining = BS.drop (fromIntegral start) bs | ||
|         bump byte = byte <$ liftIO (modifyIORef' cursor (+ 1)) | ||
|     pure $ S.mapM bump (S.unfold SBS.reader remaining) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cabal check fails without version bounds. @adithyaov please advise on bounds.