From 6f836f6b0a2aca0777c64b4f34f0579650f3e4f5 Mon Sep 17 00:00:00 2001 From: Kenny Daniel Date: Wed, 17 Dec 2025 02:08:17 -0800 Subject: [PATCH] Update squirreling --- bin/tools/parquetDataSource.js | 39 +++++++++++++++++--------- package.json | 33 +++++++++++----------- src/lib/workers/parquetWorker.ts | 6 ++-- src/lib/workers/parquetWorkerClient.ts | 3 +- src/lib/workers/types.ts | 3 +- 5 files changed, 48 insertions(+), 36 deletions(-) diff --git a/bin/tools/parquetDataSource.js b/bin/tools/parquetDataSource.js index d4d837ea..9607ccdc 100644 --- a/bin/tools/parquetDataSource.js +++ b/bin/tools/parquetDataSource.js @@ -1,10 +1,12 @@ +import { parquetSchema } from 'hyparquet' import { parquetPlan } from 'hyparquet/src/plan.js' import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js' import { whereToParquetFilter } from './parquetFilter.js' /** - * @import { AsyncBuffer, FileMetaData } from 'hyparquet' - * @import { AsyncDataSource, AsyncRow } from 'squirreling' + * @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet' + * @import { AsyncDataSource } from 'squirreling' + * @import { AsyncCells } from 'squirreling/src/types.js' */ /** @@ -12,34 +14,43 @@ import { whereToParquetFilter } from './parquetFilter.js' * * @param {AsyncBuffer} file * @param {FileMetaData} metadata - * @param {import('hyparquet-compressors').Compressors} compressors + * @param {Compressors} compressors * @returns {AsyncDataSource} */ export function parquetDataSource(file, metadata, compressors) { return { - async *getRows(hints) { + async *scan(hints) { const options = { file, metadata, compressors, columns: hints?.columns, + // convert WHERE clause to parquet pushdown filter filter: whereToParquetFilter(hints?.where), + filterStrict: false, } + + // TODO: check that columns exist in parquet file + let { columns } = options + if (!columns?.length) { + const schema = parquetSchema(metadata) + columns = schema.children.map(col => col.element.name) + } + const plan = parquetPlan(options) - let count = 0 for (const subplan of plan.groups) { + // Read row group const rg = readRowGroup(options, plan, subplan) + // Transpose to materialized rows const rows = await asyncGroupToRows(rg, 0, rg.groupRows, undefined, 'object') - for (const asyncRow of rows) { - /** @type {AsyncRow} */ - const row = {} - for (const [key, value] of Object.entries(asyncRow)) { - row[key] = () => Promise.resolve(value) + // Convert to AsyncRow generator + for (const row of rows) { + /** @type {AsyncCells} */ + const cells = {} + for (const [key, value] of Object.entries(row)) { + cells[key] = () => Promise.resolve(value) } - yield row - count++ - // Check limit after each row - if (hints?.limit !== undefined && count >= hints.limit) return + yield { columns, cells } } } }, diff --git a/package.json b/package.json index e9e4459a..c0f6d28c 100644 --- a/package.json +++ b/package.json @@ -55,37 +55,36 @@ "watch:url": "NODE_ENV=development nodemon bin/cli.js https://hyperparam.blob.core.windows.net/hyperparam/starcoderdata-js-00000-of-00065.parquet" }, "dependencies": { - "hightable": "0.24.1", - "hyparquet": "1.22.1", + "hightable": "0.25.0", + "hyparquet": "1.23.2", "hyparquet-compressors": "1.1.1", "icebird": "0.3.1", - "squirreling": "0.4.8" + "squirreling": "0.6.0" }, "devDependencies": { - "@eslint/js": "9.39.1", - "@storybook/react-vite": "10.1.4", - "@testing-library/react": "16.3.0", - "@types/node": "24.10.2", + "@storybook/react-vite": "10.1.9", + "@testing-library/react": "16.3.1", + "@types/node": "25.0.3", "@types/react": "19.2.7", "@types/react-dom": "19.2.3", "@vitejs/plugin-react": "5.1.2", - "@vitest/coverage-v8": "4.0.15", - "eslint": "9.39.1", + "@vitest/coverage-v8": "4.0.16", + "eslint": "9.39.2", "eslint-plugin-react": "7.37.5", "eslint-plugin-react-hooks": "7.0.1", - "eslint-plugin-react-refresh": "0.4.24", - "eslint-plugin-storybook": "10.1.4", + "eslint-plugin-react-refresh": "0.4.26", + "eslint-plugin-storybook": "10.1.9", "globals": "16.5.0", "jsdom": "27.3.0", "nodemon": "3.1.11", "npm-run-all": "4.1.5", - "react": "19.2.1", - "react-dom": "19.2.1", - "storybook": "10.1.4", + "react": "19.2.3", + "react-dom": "19.2.3", + "storybook": "10.1.9", "typescript": "5.9.3", - "typescript-eslint": "8.49.0", - "vite": "7.2.7", - "vitest": "4.0.15" + "typescript-eslint": "8.50.0", + "vite": "7.3.0", + "vitest": "4.0.16" }, "peerDependencies": { "react": "^18.3.1 || ^19", diff --git a/src/lib/workers/parquetWorker.ts b/src/lib/workers/parquetWorker.ts index 333f0ddc..4985f6ec 100644 --- a/src/lib/workers/parquetWorker.ts +++ b/src/lib/workers/parquetWorker.ts @@ -1,5 +1,5 @@ -import type { ColumnData } from 'hyparquet' -import { AsyncBuffer, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet' +import { AsyncBuffer, ColumnData, parquetQuery, parquetRead, parquetReadObjects } from 'hyparquet' +import type { SubColumnData } from 'hyparquet/src/types.js' import { compressors } from 'hyparquet-compressors' import type { ChunkMessage, ClientMessage, CompleteMessage, PageMessage, ParquetQueryResolveMessage, ParquetReadObjectsResolveMessage, ParquetReadResolveMessage, RejectMessage, Rows } from './types.js' import { fromToAsyncBuffer } from './utils.js' @@ -52,7 +52,7 @@ self.onmessage = async ({ data }: { data: ClientMessage }) => { function onChunk(chunk: ColumnData) { postChunkMessage({ chunk, queryId }) } - function onPage(page: ColumnData) { + function onPage(page: SubColumnData) { postPageMessage({ page, queryId }) } } diff --git a/src/lib/workers/parquetWorkerClient.ts b/src/lib/workers/parquetWorkerClient.ts index 7234f2eb..cfdc4f64 100644 --- a/src/lib/workers/parquetWorkerClient.ts +++ b/src/lib/workers/parquetWorkerClient.ts @@ -1,4 +1,5 @@ import type { ColumnData } from 'hyparquet' +import type { SubColumnData } from 'hyparquet/src/types.js' import ParquetWorker from './parquetWorker?worker&inline' import type { ClientMessage, ParquetQueryWorkerOptions, ParquetReadObjectsWorkerOptions, ParquetReadWorkerOptions, Rows, WorkerMessage } from './types.js' /// ^ the worker is bundled with the main thread code (inline) which is easier for users to import @@ -9,7 +10,7 @@ let nextQueryId = 0 interface Agent { onComplete?: ((rows: Rows) => void) onChunk?: (chunk: ColumnData) => void - onPage?: (page: ColumnData) => void + onPage?: (page: SubColumnData) => void reject: (error: Error) => void parquetReadResolve?: () => void parquetReadObjectsResolve?: (rows: Rows) => void diff --git a/src/lib/workers/types.ts b/src/lib/workers/types.ts index caa868c3..89c901eb 100644 --- a/src/lib/workers/types.ts +++ b/src/lib/workers/types.ts @@ -1,5 +1,6 @@ import type { ColumnData, ParquetReadOptions } from 'hyparquet' import { parquetQuery } from 'hyparquet' +import { SubColumnData } from 'hyparquet/src/types.js' // https://github.com/hyparam/hyparquet/pull/105 type ParquetQueryFilter = Exclude[0]['filter'], undefined> @@ -88,7 +89,7 @@ export interface ChunkMessage extends QueryId { } export interface PageMessage extends QueryId { kind: 'onPage' - page: ColumnData + page: SubColumnData } export interface RejectMessage extends QueryId { kind: 'onReject'