Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ library
unordered-containers >= 0.1 && < 1,
vector ^>= 0.13,
vector-algorithms ^>= 0.9,
brotli >= 0.0 && < 1,
zlib >= 0.5 && < 1,
zstd >= 0.1.2.0 && < 0.2,
mmap >= 0.5.8 && < 0.6,
Expand Down
104 changes: 83 additions & 21 deletions src/DataFrame/IO/Parquet/Page.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module DataFrame.IO.Parquet.Page where

import qualified Codec.Compression.Brotli as Brotli
import qualified Codec.Compression.GZip as GZip
import qualified Codec.Compression.Zstd.Streaming as Zstd
import Data.Bits
Expand All @@ -15,6 +16,7 @@ import DataFrame.IO.Parquet.Thrift
import DataFrame.IO.Parquet.Types
import GHC.Float
import qualified Snappy
import Control.Monad (when)

isDataPage :: Page -> Bool
isDataPage page = case pageTypeHeader (pageHeader page) of
Expand All @@ -36,27 +38,87 @@ readPage c columnBytes =

let compressed = BS.take (fromIntegral $ compressedPageSize hdr) rem

fullData <- case c of
ZSTD -> do
result <- Zstd.decompress
drainZstd result compressed []
where
drainZstd (Zstd.Consume f) input acc = do
result <- f input
drainZstd result BS.empty acc
drainZstd (Zstd.Produce chunk next) _ acc = do
result <- next
drainZstd result BS.empty (acc <> [chunk])
drainZstd (Zstd.Done final) _ acc =
pure $ BS.concat (acc <> [final])
drainZstd (Zstd.Error msg msg2) _ _ =
error ("ZSTD error: " ++ msg ++ " " ++ msg2)
SNAPPY -> case Snappy.decompress compressed of
Left e -> error (show e)
Right res -> pure res
UNCOMPRESSED -> pure compressed
GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed)))
other -> error ("Unsupported compression type: " ++ show other)
fullData <- case pageTypeHeader hdr of
DataPageHeaderV2{..} -> do
let { repLen = fromIntegral repetitionLevelByteLength
; defLen = fromIntegral definitionLevelByteLength
; prefixLen = repLen + defLen
}

when (prefixLen > BS.length compressed) $
error $
"readPage: DataPageHeaderV2 prefixLen (" ++ show prefixLen
++ ") > compressedPageSize (" ++ show (BS.length compressed) ++ ")"

let { prefix = BS.take prefixLen compressed
; body = BS.drop prefixLen compressed
}

body' <-
if dataPageHeaderV2IsCompressed
then case c of
ZSTD -> do
result <- Zstd.decompress
drainZstd result body []
where
drainZstd (Zstd.Consume f) input acc = do
result <- f input
drainZstd result BS.empty acc
drainZstd (Zstd.Produce chunk next) _ acc = do
result <- next
drainZstd result BS.empty (acc <> [chunk])
drainZstd (Zstd.Done final) _ acc =
pure $ BS.concat (acc <> [final])
drainZstd (Zstd.Error msg msg2) _ _ =
error ("ZSTD error: " ++ msg ++ " " ++ msg2)

SNAPPY -> case Snappy.decompress body of
Left e -> error (show e)
Right res -> pure res

UNCOMPRESSED -> pure body

GZIP ->
pure (LB.toStrict (GZip.decompress (LB.fromStrict body)))

BROTLI ->
pure (LB.toStrict (Brotli.decompress (LB.fromStrict body)))

other -> error ("Unsupported compression type: " ++ show other)
else
pure body

pure (prefix <> body')

_ -> case c of
ZSTD -> do
result <- Zstd.decompress
drainZstd result compressed []
where
drainZstd (Zstd.Consume f) input acc = do
result <- f input
drainZstd result BS.empty acc
drainZstd (Zstd.Produce chunk next) _ acc = do
result <- next
drainZstd result BS.empty (acc <> [chunk])
drainZstd (Zstd.Done final) _ acc =
pure $ BS.concat (acc <> [final])
drainZstd (Zstd.Error msg msg2) _ _ =
error ("ZSTD error: " ++ msg ++ " " ++ msg2)

SNAPPY -> case Snappy.decompress compressed of
Left e -> error (show e)
Right res -> pure res

UNCOMPRESSED -> pure compressed

GZIP ->
pure (LB.toStrict (GZip.decompress (LB.fromStrict compressed)))

BROTLI ->
pure (LB.toStrict (Brotli.decompress (LB.fromStrict compressed)))

other -> error ("Unsupported compression type: " ++ show other)
pure
( Just $ Page hdr fullData
, BS.drop (fromIntegral $ compressedPageSize hdr) rem
Expand Down
13 changes: 13 additions & 0 deletions tests/Parquet.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ allTypesDictionary =
(unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet"))
)

-- | Regression test: a Brotli-compressed Parquet file containing a large
-- string map column must load successfully.
--
-- 'TestCase' already accepts an @IO ()@ assertion, so the file is read
-- directly in IO — no 'unsafePerformIO' needed (unlike the older sibling
-- tests, which wrap a pure comparison around an unsafely-performed read).
brotliLargeStringMapLoads :: Test
brotliLargeStringMapLoads =
  TestCase $ do
    df <- D.readParquet "./tests/data/large_string_map.brotli.parquet"
    -- (0, 0) would mean the file was "read" but decoded to an empty frame;
    -- any real content yields non-zero dimensions.
    assertBool
      "large_string_map.brotli.parquet should load"
      (D.dimensions df /= (0, 0))

selectedColumnsWithOpts :: Test
selectedColumnsWithOpts =
TestCase
Expand Down Expand Up @@ -931,6 +943,7 @@ tests =
[ allTypesPlain
, allTypesPlainSnappy
, allTypesDictionary
, brotliLargeStringMapLoads
, selectedColumnsWithOpts
, rowRangeWithOpts
, predicateWithOpts
Expand Down