Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ library
DataFrame.IO.Parquet.Compression,
DataFrame.IO.Parquet.Encoding,
DataFrame.IO.Parquet.Page,
DataFrame.IO.Parquet.Seeking,
DataFrame.IO.Parquet.Time,
DataFrame.IO.Parquet.Types,
DataFrame.Lazy.IO.CSV,
Expand Down Expand Up @@ -138,6 +139,8 @@ library
stm >= 2.5 && < 3,
filepath >= 1.4 && < 2,
Glob >= 0.10 && < 1,
streamly-core,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cabal check fails without version bounds. @adithyaov please advise on bounds.

streamly-bytestring,

hs-source-dirs: src
c-sources: cbits/process_csv.c
Expand Down
51 changes: 37 additions & 14 deletions src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Data.Either
import Data.IORef
import Data.Int
import qualified Data.List as L
import qualified Data.Map as M
import qualified Data.Map.Strict as M
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Text.Encoding
Expand All @@ -38,7 +38,9 @@ import DataFrame.IO.Parquet.Types
import System.Directory (doesDirectoryExist)

import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Seeking
import System.FilePath ((</>))
import System.IO (IOMode (ReadMode))

-- Options -----------------------------------------------------------------

Expand All @@ -61,6 +63,10 @@ data ParquetReadOptions = ParquetReadOptions
-- ^ Optional row filter expression applied before projection.
, rowRange :: Maybe (Int, Int)
-- ^ Optional row slice @(start, end)@ with start-inclusive/end-exclusive semantics.
, forceNonSeekable :: Maybe Bool
{- ^ Force reader to buffer entire file in memory, even if it is seekable.
For internal testing only, to test the fallback path.
-}
}
deriving (Eq, Show)

Expand All @@ -82,6 +88,7 @@ defaultParquetReadOptions =
{ selectedColumns = Nothing
, predicate = Nothing
, rowRange = Nothing
, forceNonSeekable = Nothing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than putting this in the public API we'd much rather make a separate testing endpoint.

Something like:

-- production path
readParquetWithOpts opts path = withSeekable path ReadMode (readHelper opts path)

-- This would be the testing function.
_readParquetWithOpts opts path = withFilebuffer path ReadMode (readHelper opts path)

I actually don't know if there is a good way to inject behaviour into Haskell tests in this way. Lemme ask around then get back to this but separating the functions seems like a good first start to me.

}

-- Public API --------------------------------------------------------------
Expand Down Expand Up @@ -131,8 +138,8 @@ cleanColPath nodes path = go nodes path False
p : go (sChildren n) ps False

readParquetWithOpts :: ParquetReadOptions -> FilePath -> IO DataFrame
readParquetWithOpts opts path = do
(fileMetadata, contents) <- readMetadataFromPath path
readParquetWithOpts opts path = withFileBufferedOrSeekable (forceNonSeekable opts) path ReadMode $ \file -> do
fileMetadata <- readMetadataFromHandle file
let columnPaths = getColumnPaths (drop 1 $ schema fileMetadata)
let columnNames = map fst columnPaths
let leafNames = map (last . T.splitOn ".") columnNames
Expand Down Expand Up @@ -205,7 +212,11 @@ readParquetWithOpts opts path = do
else colDataPageOffset
let colLength = columnTotalCompressedSize metadata

let columnBytes = BSO.take (fromIntegral colLength) (BSO.drop (fromIntegral colStart) contents)
columnBytes <-
seekAndReadBytes
(Just (AbsoluteSeek, fromIntegral colStart))
(fromIntegral colLength)
file

pages <- readAllPages (columnCodec metadata) columnBytes

Expand Down Expand Up @@ -237,13 +248,13 @@ readParquetWithOpts opts path = do
Nothing -> do
mc <- DI.newMutableColumn totalRows column
DI.copyIntoMutableColumn mc 0 column
modifyIORef colMutMap (M.insert colFullName mc)
modifyIORef colOffMap (M.insert colFullName (DI.columnLength column))
modifyIORef' colMutMap (M.insert colFullName mc)
modifyIORef' colOffMap (M.insert colFullName (DI.columnLength column))
Just mc -> do
off <- (M.! colFullName) <$> readIORef colOffMap
DI.copyIntoMutableColumn mc off column
modifyIORef colOffMap (M.adjust (+ DI.columnLength column) colFullName)
modifyIORef lTypeMap (M.insert colFullName lType)
modifyIORef' colOffMap (M.adjust (+ DI.columnLength column) colFullName)
modifyIORef' lTypeMap (M.insert colFullName lType)

finalMutMap <- readIORef colMutMap
finalColMap <-
Expand Down Expand Up @@ -322,28 +333,40 @@ applyReadOptions opts =

-- File and metadata parsing -----------------------------------------------

-- | read the file in memory at once, parse magicString and return the entire file ByteString
readMetadataFromPath :: FilePath -> IO (FileMetadata, BSO.ByteString)
readMetadataFromPath path = do
contents <- BSO.readFile path
let (size, magicString) = contents `seq` readMetadataSizeFromFooter contents
let (size, magicString) = readMetadataSizeFromFooter contents
when (magicString /= "PAR1") $ error "Invalid Parquet file"
meta <- readMetadata contents size
pure (meta, contents)

readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter contents =
-- | Read the footer from the end of the file, validate the magic string, and parse the file metadata
readMetadataFromHandle :: FileBufferedOrSeekable -> IO FileMetadata
readMetadataFromHandle sh = do
footerBs <- readLastBytes (fromIntegral footerSize) sh
let (size, magicString) = readMetadataSizeFromFooterSlice footerBs
when (magicString /= "PAR1") $ error "Invalid Parquet file"
readMetadataByHandleMetaSize sh size

-- | Takes the last 8 bytes of the file to parse the metadata size and magic string
readMetadataSizeFromFooterSlice :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooterSlice contents =
let
footerOffSet = BSO.length contents - 8
sizeBytes =
map
(fromIntegral @Word8 @Int32 . BSO.index contents)
[footerOffSet .. footerOffSet + 3]
[0 .. 3]
size = fromIntegral $ L.foldl' (.|.) 0 $ zipWith shift sizeBytes [0, 8, 16, 24]
magicStringBytes = map (BSO.index contents) [footerOffSet + 4 .. footerOffSet + 7]
magicStringBytes = map (BSO.index contents) [4 .. 7]
magicString = BSO.pack magicStringBytes
in
(size, magicString)

readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
readMetadataSizeFromFooter = readMetadataSizeFromFooterSlice . BSO.takeEnd 8

-- Schema navigation -------------------------------------------------------

getColumnPaths :: [SchemaElement] -> [(T.Text, Int)]
Expand Down
2 changes: 1 addition & 1 deletion src/DataFrame/IO/Parquet/Binary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ readAndAdvance :: IORef Int -> BS.ByteString -> IO Word8
readAndAdvance bufferPos buffer = do
pos <- readIORef bufferPos
let b = BS.index buffer pos
modifyIORef bufferPos (+ 1)
modifyIORef' bufferPos (+ 1)
return b

readVarIntFromBuffer :: (Integral a) => BS.ByteString -> IORef Int -> IO a
Expand Down
144 changes: 144 additions & 0 deletions src/DataFrame/IO/Parquet/Seeking.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
{- | This module contains low-level utilities around file seeking

potentially also contains all Streamly related low-level utilities.

later this module can be renamed / moved to an internal module.
-}
module DataFrame.IO.Parquet.Seeking (
SeekableHandle (getSeekableHandle),
SeekMode (..),
FileBufferedOrSeekable (..),
advanceBytes,
mkFileBufferedOrSeekable,
mkSeekableHandle,
readLastBytes,
seekAndReadBytes,
seekAndStreamBytes,
withFileBufferedOrSeekable,
) where

import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString as BS
import Data.IORef
import Data.Int
import Data.Word
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as S
import qualified Streamly.External.ByteString as SBS
import qualified Streamly.FileSystem.Handle as SHandle
import System.IO

{- | A 'Handle' that has been checked to be seekable (see 'mkSeekableHandle').

Note: 'Handle' and 'SeekableHandle' are not thread safe and should not be
shared across threads; be aware of this when running parallel/concurrent code.

Not seekable:
- stdin / stdout
- pipes / FIFOs

Regular files are always seekable. Parquet fundamentally wants random
access; a non-seekable source cannot support efficient access without
buffering the entire file.
-}
newtype SeekableHandle = SeekableHandle {getSeekableHandle :: Handle}

{- | A file source that is either fully buffered in memory (for non-seekable
inputs) or backed by a seekable handle.

'FileBuffered' pairs the whole file contents with a mutable logical read
position; 'FileSeekable' delegates positioning to the underlying OS handle.

Not thread safe: contains a mutable reference (as 'Handle' already does).
If we truly need concurrent / parallel parsing, we must read into a
ByteString first rather than sharing the same handle.
-}
data FileBufferedOrSeekable
  = FileBuffered !(IORef Int64) !BS.ByteString
  | FileSeekable !SeekableHandle

-- | Smart constructor: wrap a 'Handle' only when the OS reports it seekable.
mkSeekableHandle :: Handle -> IO (Maybe SeekableHandle)
mkSeekableHandle h = do
  canSeek <- hIsSeekable h
  if canSeek
    then pure (Just (SeekableHandle h))
    else pure Nothing

-- | For testing only: @Just True@ forces the in-memory fallback path even for
-- seekable files; @Nothing@ and @Just False@ keep the normal behaviour.
type ForceNonSeekable = Maybe Bool

{- | Smart constructor for 'FileBufferedOrSeekable'. Prefers the seekable
representation; falls back to buffering the whole file in memory when the
handle is not seekable (or when forced to, for testing).
-}
mkFileBufferedOrSeekable ::
  ForceNonSeekable -> Handle -> IO FileBufferedOrSeekable
mkFileBufferedOrSeekable forceNonSeek h = do
  canSeek <- hIsSeekable h
  if canSeek && forceNonSeek /= Just True
    then pure (FileSeekable (SeekableHandle h))
    else FileBuffered <$> newIORef 0 <*> BS.hGetContents h

{- | Bracket-style helper: open @path@, build a 'FileBufferedOrSeekable' over
the handle, run the action, then close the handle.

Warning: do not let the 'FileBufferedOrSeekable' escape the scope of the
action — the underlying handle is closed when this returns.
-}
withFileBufferedOrSeekable ::
  ForceNonSeekable ->
  FilePath ->
  IOMode ->
  (FileBufferedOrSeekable -> IO a) ->
  IO a
withFileBufferedOrSeekable forceNonSeek path ioMode action =
  withFile path ioMode (mkFileBufferedOrSeekable forceNonSeek >=> action)

{- | Read the last @n@ bytes of the source — useful for reading Parquet footer
metadata without loading the entire file.

In the buffered case the logical cursor is left at end-of-input, matching the
seekable case (the handle sits at EOF after the read). Raises an error when
@n@ exceeds the buffered length; the seekable case relies on 'hSeek' to
reject an out-of-range offset.
-}
readLastBytes :: Integer -> FileBufferedOrSeekable -> IO BS.ByteString
readLastBytes n (FileSeekable sh) = do
  let h = getSeekableHandle sh
  hSeek h SeekFromEnd (negate n)
  S.fold SBS.write (SHandle.read h)
readLastBytes n (FileBuffered i bs) = do
  let len = BS.length bs
  -- Validate BEFORE mutating the cursor so a failed call leaves the
  -- buffered position untouched (the original wrote the IORef first).
  when (n > fromIntegral len) $ error "readLastBytes: n > length bs"
  writeIORef i (fromIntegral len)
  pure $ BS.drop (len - fromIntegral n) bs

-- | Read up to @n@ bytes from the current position without seeking first.
-- Note: this does not guarantee @n@ bytes (the source may end early).
advanceBytes :: Int -> FileBufferedOrSeekable -> IO BS.ByteString
advanceBytes n source = seekAndReadBytes Nothing n source

-- | Optionally seek, then read up to @len@ bytes into a strict ByteString.
-- Note: this does not guarantee @len@ bytes (the source may end early).
seekAndReadBytes ::
  Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> IO BS.ByteString
seekAndReadBytes mSeek len f = do
  bytes <- seekAndStreamBytes mSeek len f
  S.fold SBS.write bytes

{- | Optionally seek, then produce a stream of at most @len@ bytes.

Warning: the stream produced by this function accesses the mutable handle.
If multiple streams are pulled from the same handle at the same time, chaos
ensues. Make sure only one stream runs at a time per 'SeekableHandle', and
that streams are never read again once abandoned.
-}
seekAndStreamBytes ::
  (MonadIO m) =>
  Maybe (SeekMode, Integer) -> Int -> FileBufferedOrSeekable -> m (Stream m Word8)
seekAndStreamBytes mSeek len f = do
  liftIO $ forM_ mSeek (uncurry (fSeek f))
  pure $ S.take len (fRead f)

-- | Move the logical read position: delegates to 'hSeek' for the seekable
-- case and updates the in-memory cursor for the buffered case.
-- NOTE(review): the buffered cursor is not range-checked here; out-of-range
-- positions are clamped later by 'BS.drop' in 'fRead'.
fSeek :: FileBufferedOrSeekable -> SeekMode -> Integer -> IO ()
fSeek (FileSeekable (SeekableHandle h)) seekMode seekTo = hSeek h seekMode seekTo
-- Wildcards for the unused buffer avoid -Wunused-matches warnings.
fSeek (FileBuffered i _) AbsoluteSeek seekTo = writeIORef i (fromIntegral seekTo)
fSeek (FileBuffered i _) RelativeSeek seekTo = modifyIORef' i (+ fromIntegral seekTo)
fSeek (FileBuffered i bs) SeekFromEnd seekTo = writeIORef i (fromIntegral $ BS.length bs + fromIntegral seekTo)

-- | Produce a byte stream from the current logical position. For a seekable
-- handle this streams straight from the 'Handle'; for the buffered case the
-- stream starts at the stored cursor and bumps it by one for each byte
-- consumed, so a partially-consumed stream leaves the cursor mid-buffer.
fRead :: (MonadIO m) => FileBufferedOrSeekable -> Stream m Word8
fRead (FileSeekable (SeekableHandle h)) = SHandle.read h
fRead (FileBuffered i bs) = S.concatEffect $ do
  -- Snapshot the cursor once, when the stream is first evaluated.
  pos <- liftIO $ readIORef i
  pure $
    S.mapM
      -- Advance the shared cursor for every byte actually pulled downstream.
      ( \x -> do
          liftIO (modifyIORef' i (+ 1))
          pure x
      )
      (S.unfold SBS.reader (BS.drop (fromIntegral pos) bs))
12 changes: 12 additions & 0 deletions src/DataFrame/IO/Parquet/Thrift.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as VU
import Data.Word
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Seeking
import DataFrame.IO.Parquet.Types
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame, unsafeGetColumn)
Expand Down Expand Up @@ -329,6 +330,17 @@ skipList buf pos = do
let elemType = toTType sizeAndType
replicateM_ sizeOnly (skipFieldData elemType buf pos)

{- | Parse the Parquet file metadata by reading only the trailing
@metaSize + footerSize@ bytes of the file, avoiding a whole-file read.
-}
readMetadataByHandleMetaSize :: FileBufferedOrSeekable -> Int -> IO FileMetadata
readMetadataByHandleMetaSize sh metaSize = do
  footerAndMeta <- readLastBytes (fromIntegral (metaSize + footerSize)) sh
  bufferPos <- newIORef 0
  -- 0 is the initial field id for the Thrift decoder (was bound as
  -- @lastFieldId@ in an earlier revision).
  readFileMetaData defaultMetadata footerAndMeta bufferPos 0

-- | metadata starts from (L - 8 - meta_size) to L - 8 - 1.
readMetadata :: BS.ByteString -> Int -> IO FileMetadata
readMetadata contents size = do
let metadataStartPos = BS.length contents - footerSize - size
Expand Down
Loading
Loading