Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
DataFrame.Internal.Interpreter,
DataFrame.Internal.Parsing,
DataFrame.Internal.Column,
DataFrame.Internal.Binary,
DataFrame.Internal.Statistics,
DataFrame.Display.Terminal.PrettyPrint,
DataFrame.Display.Terminal.Colours,
Expand Down Expand Up @@ -246,6 +247,7 @@ test-suite tests
build-depends: base >= 4 && < 5,
bytestring >= 0.11 && < 0.13,
dataframe >= 0.5 && < 1,
bytestring >= 0.11 && < 0.13,
directory >= 1.3.0.0 && < 2,
HUnit ^>= 1.6,
QuickCheck >= 2 && < 3,
Expand Down
13 changes: 4 additions & 9 deletions src/DataFrame/IO/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ module DataFrame.IO.Parquet where

import Control.Exception (throw)
import Control.Monad
import Data.Bits
import qualified Data.ByteString as BSO
import Data.Either
import Data.IORef
Expand All @@ -20,8 +19,8 @@ import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Data.Word
import DataFrame.Errors (DataFrameException (ColumnNotFoundException))
import DataFrame.Internal.Binary (littleEndianWord32)
import qualified DataFrame.Internal.Column as DI
import DataFrame.Internal.DataFrame (DataFrame)
import DataFrame.Internal.Expression (Expr, getColumns)
Expand Down Expand Up @@ -334,13 +333,9 @@ readMetadataSizeFromFooter :: BSO.ByteString -> (Int, BSO.ByteString)
-- Split the last 8 bytes of the file contents into the metadata size and the
-- trailing magic bytes. The first four footer bytes are decoded as a
-- little-endian 32-bit value; the following four are returned verbatim.
--
-- NOTE(review): inputs shorter than 8 bytes are not rejected here, so
-- 'footerOffSet' can be negative — confirm callers validate the length.
readMetadataSizeFromFooter contents =
    let
        footerOffSet = BSO.length contents - 8
        footer = BSO.drop footerOffSet contents
        size = fromIntegral (littleEndianWord32 footer)
        magicString = BSO.take 4 (BSO.drop 4 footer)
     in
        (size, magicString)


Expand Down
38 changes: 0 additions & 38 deletions src/DataFrame/IO/Parquet/Binary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,6 @@ import qualified Foreign.Marshal.Alloc as Foreign
import qualified Foreign.Ptr as Foreign
import qualified Foreign.Storable as Foreign

littleEndianWord32 :: BS.ByteString -> Word32
littleEndianWord32 bytes
| BS.length bytes >= 4 =
foldr
(.|.)
0
( zipWith
(\b i -> fromIntegral b `shiftL` i)
(BS.unpack $ BS.take 4 bytes)
[0, 8, 16, 24]
)
| otherwise =
littleEndianWord32 (BS.take 4 $ bytes `BS.append` BS.pack [0, 0, 0, 0])

littleEndianWord64 :: BS.ByteString -> Word64
littleEndianWord64 bytes =
foldr
(.|.)
0
( zipWith
(\b i -> fromIntegral b `shiftL` i)
(BS.unpack $ BS.take 8 bytes)
[0, 8 ..]
)

littleEndianInt32 :: BS.ByteString -> Int32
littleEndianInt32 = fromIntegral . littleEndianWord32

word64ToLittleEndian :: Word64 -> BS.ByteString
word64ToLittleEndian w =
BS.map
(\i -> fromIntegral (w `shiftR` fromIntegral i))
(BS.pack [0, 8, 16, 24, 32, 40, 48, 56])

word32ToLittleEndian :: Word32 -> BS.ByteString
word32ToLittleEndian w =
BS.map (\i -> fromIntegral (w `shiftR` fromIntegral i)) (BS.pack [0, 8, 16, 24])

readUVarInt :: BS.ByteString -> (Word64, BS.ByteString)
readUVarInt xs = loop xs 0 0 0
where
Expand Down
6 changes: 5 additions & 1 deletion src/DataFrame/IO/Parquet/Dictionary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import Data.Time
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Encoding
import DataFrame.IO.Parquet.Levels
import DataFrame.IO.Parquet.Time
import DataFrame.IO.Parquet.Types
import DataFrame.Internal.Binary (
littleEndianInt32,
littleEndianWord32,
littleEndianWord64,
)
import qualified DataFrame.Internal.Column as DI
import GHC.Float

Expand Down
3 changes: 2 additions & 1 deletion src/DataFrame/IO/Parquet/Encoding.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import qualified Data.ByteString.Unsafe as BSU
import Data.List (foldl')
import qualified Data.Vector.Unboxed as VU
import Data.Word
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Binary (readUVarInt)
import DataFrame.Internal.Binary (littleEndianWord32)

ceilLog2 :: Int -> Int
ceilLog2 x
Expand Down
2 changes: 1 addition & 1 deletion src/DataFrame/IO/Parquet/Levels.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import Data.Int
import Data.List
import qualified Data.Text as T

import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Encoding
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import DataFrame.Internal.Binary (littleEndianWord32)

readLevelsV1 ::
Int -> Int -> Int -> BS.ByteString -> ([Int], [Int], BS.ByteString)
Expand Down
5 changes: 5 additions & 0 deletions src/DataFrame/IO/Parquet/Page.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import qualified Data.Vector.Unboxed as VU
import DataFrame.IO.Parquet.Binary
import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import DataFrame.Internal.Binary (
littleEndianInt32,
littleEndianWord32,
littleEndianWord64,
)
import GHC.Float
import qualified Snappy

Expand Down
7 changes: 6 additions & 1 deletion src/DataFrame/IO/Parquet/Time.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import qualified Data.ByteString as BS
import Data.Time
import Data.Word

import DataFrame.IO.Parquet.Binary
import DataFrame.Internal.Binary (
littleEndianWord32,
littleEndianWord64,
word32ToLittleEndian,
word64ToLittleEndian,
)

int96ToUTCTime :: BS.ByteString -> UTCTime
int96ToUTCTime bytes
Expand Down
105 changes: 105 additions & 0 deletions src/DataFrame/Internal/Binary.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
module DataFrame.Internal.Binary where

import Data.Bits
import qualified Data.ByteString as BS
import Data.Int
import Data.Word

-- | Decode the leading bytes of a 'BS.ByteString' as a little-endian 'Word32'
-- (byte 0 is the least significant byte).
--
-- Only the first four bytes are read. When fewer than four bytes are
-- available, the missing positions are treated as zero, so the empty string
-- decodes to @0@.
littleEndianWord32 :: BS.ByteString -> Word32
littleEndianWord32 bytes
    | len >= 4 =
        -- The guard above proves indices 0..3 are in range. Review note: an
        -- unchecked index would also be fine on this path.
        assembleWord32
            (BS.index bytes 0)
            (BS.index bytes 1)
            (BS.index bytes 2)
            (BS.index bytes 3)
    | otherwise =
        -- Short input: substitute zero for every position past the end.
        assembleWord32
            (byteAtOrZero len bytes 0)
            (byteAtOrZero len bytes 1)
            (byteAtOrZero len bytes 2)
            (byteAtOrZero len bytes 3)
  where
    len = BS.length bytes
{-# INLINE littleEndianWord32 #-}

-- | Decode the leading bytes of a 'BS.ByteString' as a little-endian 'Word64'
-- (byte 0 is the least significant byte).
--
-- Only the first eight bytes are read. When fewer than eight bytes are
-- available, the missing positions are treated as zero.
littleEndianWord64 :: BS.ByteString -> Word64
littleEndianWord64 bytes =
    assembleWord64 (at 0) (at 1) (at 2) (at 3) (at 4) (at 5) (at 6) (at 7)
  where
    len = BS.length bytes

    -- Pick the accessor once: direct indexing when all eight bytes exist,
    -- otherwise the zero-filling lookup.
    at :: Int -> Word8
    at
        | len >= 8 = BS.index bytes
        | otherwise = byteAtOrZero len bytes
{-# INLINE littleEndianWord64 #-}

-- | Decode via 'littleEndianWord32' and convert the result to 'Int32' with
-- 'fromIntegral'.
littleEndianInt32 :: BS.ByteString -> Int32
littleEndianInt32 bytes = fromIntegral (littleEndianWord32 bytes)
{-# INLINE littleEndianInt32 #-}

-- | Serialise a 'Word64' as eight bytes, least significant byte first.
word64ToLittleEndian :: Word64 -> BS.ByteString
word64ToLittleEndian w = BS.pack (map byteAt [0, 8 .. 56])
  where
    -- Byte of @w@ that starts at the given bit offset; 'fromIntegral'
    -- keeps only the low eight bits after the shift.
    byteAt :: Int -> Word8
    byteAt bitOffset = fromIntegral (w `unsafeShiftR` bitOffset)
{-# INLINE word64ToLittleEndian #-}

-- | Serialise a 'Word32' as four bytes, least significant byte first.
word32ToLittleEndian :: Word32 -> BS.ByteString
word32ToLittleEndian w = BS.pack (map byteAt [0, 8 .. 24])
  where
    -- Byte of @w@ that starts at the given bit offset; 'fromIntegral'
    -- keeps only the low eight bits after the shift.
    byteAt :: Int -> Word8
    byteAt bitOffset = fromIntegral (w `unsafeShiftR` bitOffset)
{-# INLINE word32ToLittleEndian #-}

-- | Look up byte @i@ of @bytes@, yielding @0@ when @i@ is negative or not
-- below @len@.
--
-- @len@ is supplied by the caller rather than recomputed here; the callers in
-- this module pass @BS.length bytes@.
byteAtOrZero :: Int -> BS.ByteString -> Int -> Word8
byteAtOrZero len bytes i =
    if i < 0 || i >= len
        then 0
        else BS.index bytes i
{-# INLINE byteAtOrZero #-}

-- | Combine four bytes into a 'Word32'; @b0@ becomes the least significant
-- byte and @b3@ the most significant.
assembleWord32 :: Word8 -> Word8 -> Word8 -> Word8 -> Word32
assembleWord32 b0 b1 b2 b3 = lowPair .|. highPair
  where
    widen :: Word8 -> Word32
    widen = fromIntegral

    -- Bits 0..15 and bits 16..31 respectively.
    lowPair = widen b0 .|. (widen b1 `unsafeShiftL` 8)
    highPair = (widen b2 `unsafeShiftL` 16) .|. (widen b3 `unsafeShiftL` 24)
{-# INLINE assembleWord32 #-}

-- | Combine eight bytes into a 'Word64'; @b0@ becomes the least significant
-- byte and @b7@ the most significant.
assembleWord64 ::
    Word8 -> Word8 -> Word8 -> Word8 -> Word8 -> Word8 -> Word8 -> Word8 -> Word64
assembleWord64 b0 b1 b2 b3 b4 b5 b6 b7 =
    lowHalf .|. (highHalf `unsafeShiftL` 32)
  where
    widen :: Word8 -> Word64
    widen = fromIntegral

    -- Pack four bytes into bits 0..31, first argument lowest.
    quad :: Word8 -> Word8 -> Word8 -> Word8 -> Word64
    quad x0 x1 x2 x3 =
        widen x0
            .|. (widen x1 `unsafeShiftL` 8)
            .|. (widen x2 `unsafeShiftL` 16)
            .|. (widen x3 `unsafeShiftL` 24)

    lowHalf = quad b0 b1 b2 b3
    highHalf = quad b4 b5 b6 b7
{-# INLINE assembleWord64 #-}
36 changes: 4 additions & 32 deletions src/DataFrame/Lazy/IO/Binary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Data.Bits (setBit, shiftL, testBit, (.|.))
import Data.Maybe (fromMaybe, isJust)
import Data.Type.Equality (TestEquality (testEquality), type (:~:) (Refl))
import Data.Word (Word16, Word32, Word64, Word8)
import qualified DataFrame.Internal.Binary as Binary
import DataFrame.Internal.Column (Column (..))
import DataFrame.Internal.DataFrame (DataFrame (..))
import Foreign (ForeignPtr, castForeignPtr, plusForeignPtr, sizeOf)
Expand Down Expand Up @@ -362,37 +363,12 @@ readWord16LE bs off
-- | Read a little-endian 'Word32' from @bs@ at byte offset @off@.
--
-- Yields @Left "unexpected end of input"@ when fewer than four bytes remain
-- at @off@; otherwise @Right (off + 4, value)@, i.e. the offset just past the
-- bytes consumed together with the decoded value. Decoding is delegated to
-- 'Binary.littleEndianWord32'.
--
-- NOTE(review): a negative @off@ is not rejected here — confirm callers never
-- pass one.
readWord32LE :: BS.ByteString -> Int -> ParseResult Word32
readWord32LE bs off
    | off + 4 > BS.length bs = Left "unexpected end of input"
    | otherwise = Right (off + 4, Binary.littleEndianWord32 (BS.drop off bs))

-- | Read a little-endian 'Word64' from @bs@ at byte offset @off@.
--
-- Yields @Left "unexpected end of input"@ when fewer than eight bytes remain
-- at @off@; otherwise @Right (off + 8, value)@, i.e. the offset just past the
-- bytes consumed together with the decoded value. Decoding is delegated to
-- 'Binary.littleEndianWord64'.
--
-- NOTE(review): a negative @off@ is not rejected here — confirm callers never
-- pass one.
readWord64LE :: BS.ByteString -> Int -> ParseResult Word64
readWord64LE bs off
    | off + 8 > BS.length bs = Left "unexpected end of input"
    | otherwise = Right (off + 8, Binary.littleEndianWord64 (BS.drop off bs))

-- | Read @n@ consecutive Word32LE values starting at offset @off@.
readWord32Array :: BS.ByteString -> Int -> Int -> Either String [Word32]
Expand All @@ -401,11 +377,7 @@ readWord32Array bs off n
| otherwise =
Right
[ let i = off + k * 4
b0 = fromIntegral (BSU.unsafeIndex bs i) :: Word32
b1 = fromIntegral (BSU.unsafeIndex bs (i + 1)) :: Word32
b2 = fromIntegral (BSU.unsafeIndex bs (i + 2)) :: Word32
b3 = fromIntegral (BSU.unsafeIndex bs (i + 3)) :: Word32
in b0 .|. (b1 `shiftL` 8) .|. (b2 `shiftL` 16) .|. (b3 `shiftL` 24)
in Binary.littleEndianWord32 (BS.drop i bs)
| k <- [0 .. n - 1]
]

Expand Down
Loading
Loading