From 98309f37de0b11651916db36cdf772640b1a0ef1 Mon Sep 17 00:00:00 2001 From: Sivapriya Venkateswarar Date: Thu, 5 Mar 2026 12:03:28 +0400 Subject: [PATCH] Add BROTLI compression support for Parquet decoding --- dataframe.cabal | 1 + src/DataFrame/IO/Parquet/Page.hs | 104 ++++++++++++++++++++++++------- tests/Parquet.hs | 13 ++++ 3 files changed, 97 insertions(+), 21 deletions(-) diff --git a/dataframe.cabal b/dataframe.cabal index 488c8e7..d2a4982 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -125,6 +125,7 @@ library unordered-containers >= 0.1 && < 1, vector ^>= 0.13, vector-algorithms ^>= 0.9, + brotli >= 0.0 && < 1, zlib >= 0.5 && < 1, zstd >= 0.1.2.0 && < 0.2, mmap >= 0.5.8 && < 0.6, diff --git a/src/DataFrame/IO/Parquet/Page.hs b/src/DataFrame/IO/Parquet/Page.hs index cfda036..8a8d46e 100644 --- a/src/DataFrame/IO/Parquet/Page.hs +++ b/src/DataFrame/IO/Parquet/Page.hs @@ -3,6 +3,7 @@ module DataFrame.IO.Parquet.Page where +import qualified Codec.Compression.Brotli as Brotli import qualified Codec.Compression.GZip as GZip import qualified Codec.Compression.Zstd.Streaming as Zstd import Data.Bits @@ -15,6 +16,7 @@ import DataFrame.IO.Parquet.Thrift import DataFrame.IO.Parquet.Types import GHC.Float import qualified Snappy +import Control.Monad (when) isDataPage :: Page -> Bool isDataPage page = case pageTypeHeader (pageHeader page) of @@ -36,27 +38,87 @@ readPage c columnBytes = let compressed = BS.take (fromIntegral $ compressedPageSize hdr) rem - fullData <- case c of - ZSTD -> do - result <- Zstd.decompress - drainZstd result compressed [] - where - drainZstd (Zstd.Consume f) input acc = do - result <- f input - drainZstd result BS.empty acc - drainZstd (Zstd.Produce chunk next) _ acc = do - result <- next - drainZstd result BS.empty (acc <> [chunk]) - drainZstd (Zstd.Done final) _ acc = - pure $ BS.concat (acc <> [final]) - drainZstd (Zstd.Error msg msg2) _ _ = - error ("ZSTD error: " ++ msg ++ " " ++ msg2) - SNAPPY -> case Snappy.decompress compressed of - Left e -> error (show e) - Right res -> pure res - UNCOMPRESSED -> pure compressed - GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed))) - other -> error ("Unsupported compression type: " ++ show other) + fullData <- case pageTypeHeader hdr of + DataPageHeaderV2{..} -> do + let { repLen = fromIntegral repetitionLevelByteLength + ; defLen = fromIntegral definitionLevelByteLength + ; prefixLen = repLen + defLen + } + + when (prefixLen > BS.length compressed) $ + error $ + "readPage: DataPageHeaderV2 prefixLen (" ++ show prefixLen + ++ ") > compressedPageSize (" ++ show (BS.length compressed) ++ ")" + + let { prefix = BS.take prefixLen compressed + ; body = BS.drop prefixLen compressed + } + + body' <- + if dataPageHeaderV2IsCompressed + then case c of + ZSTD -> do + result <- Zstd.decompress + drainZstd result body [] + where + drainZstd (Zstd.Consume f) input acc = do + result <- f input + drainZstd result BS.empty acc + drainZstd (Zstd.Produce chunk next) _ acc = do + result <- next + drainZstd result BS.empty (acc <> [chunk]) + drainZstd (Zstd.Done final) _ acc = + pure $ BS.concat (acc <> [final]) + drainZstd (Zstd.Error msg msg2) _ _ = + error ("ZSTD error: " ++ msg ++ " " ++ msg2) + + SNAPPY -> case Snappy.decompress body of + Left e -> error (show e) + Right res -> pure res + + UNCOMPRESSED -> pure body + + GZIP -> + pure (LB.toStrict (GZip.decompress (LB.fromStrict body))) + + BROTLI -> + pure (LB.toStrict (Brotli.decompress (LB.fromStrict body))) + + other -> error ("Unsupported compression type: " ++ show other) + else + pure body + + pure (prefix <> body') + + _ -> case c of + ZSTD -> do + result <- Zstd.decompress + drainZstd result compressed [] + where + drainZstd (Zstd.Consume f) input acc = do + result <- f input + drainZstd result BS.empty acc + drainZstd (Zstd.Produce chunk next) _ acc = do + result <- next + drainZstd result BS.empty (acc <> [chunk]) + drainZstd (Zstd.Done final) _ acc = + pure $ BS.concat (acc <> [final]) + drainZstd (Zstd.Error msg msg2) _ _ = + error ("ZSTD error: " ++ msg ++ " " ++ msg2) + + SNAPPY -> case Snappy.decompress compressed of + Left e -> error (show e) + Right res -> pure res + + UNCOMPRESSED -> pure compressed + + GZIP -> + pure (LB.toStrict (GZip.decompress (LB.fromStrict compressed))) + + BROTLI -> + pure (LB.toStrict (Brotli.decompress (LB.fromStrict compressed))) + + other -> error ("Unsupported compression type: " ++ show other) pure ( Just $ Page hdr fullData , BS.drop (fromIntegral $ compressedPageSize hdr) rem diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 82cf506..0caefa4 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -187,6 +187,18 @@ allTypesDictionary = (unsafePerformIO (D.readParquet "./tests/data/alltypes_dictionary.parquet")) ) +brotliLargeStringMapLoads :: Test +brotliLargeStringMapLoads = + TestCase $ + assertBool + "large_string_map.brotli.parquet should load" + ( unsafePerformIO + ( do + df <- D.readParquet "./tests/data/large_string_map.brotli.parquet" + pure (D.dimensions df /= (0, 0)) + ) + ) + selectedColumnsWithOpts :: Test selectedColumnsWithOpts = TestCase @@ -931,6 +943,7 @@ tests = [ allTypesPlain , allTypesPlainSnappy , allTypesDictionary + , brotliLargeStringMapLoads , selectedColumnsWithOpts , rowRangeWithOpts , predicateWithOpts