diff --git a/bin/chat.js b/bin/chat.js
index 137145a..4f6ca96 100644
--- a/bin/chat.js
+++ b/bin/chat.js
@@ -7,10 +7,10 @@ let outputMode = 'text' // default output mode
 
 const instructions = 'You are a machine learning web application named "Hyperparam" running on a CLI terminal.'
   + '\nYou assist users with analyzing and exploring datasets, particularly in parquet format.'
-  + ' The website and api are available at hyperparam.app.'
+  + ' The website is available at hyperparam.app.'
   + ' The Hyperparam CLI tool can list and explore local parquet files.'
   + '\nYou are on a terminal and can only output: text, emojis, terminal colors, and terminal formatting.'
-  + ' Don\'t add additional markdown or html formatting unless requested.'
+  + ' Limited markdown formatting is available: inline code blocks.'
   + (process.stdout.isTTY ? ` The terminal width is ${process.stdout.columns} characters.` : '')
 
 const colors = {
diff --git a/bin/tools/parquetDataSource.js b/bin/tools/parquetDataSource.js
index 9607ccd..d0a607c 100644
--- a/bin/tools/parquetDataSource.js
+++ b/bin/tools/parquetDataSource.js
@@ -1,11 +1,9 @@
-import { parquetSchema } from 'hyparquet'
-import { parquetPlan } from 'hyparquet/src/plan.js'
-import { asyncGroupToRows, readRowGroup } from 'hyparquet/src/rowgroup.js'
+import { parquetMetadataAsync, parquetReadObjects } from 'hyparquet'
 import { whereToParquetFilter } from './parquetFilter.js'
 
 /**
- * @import { AsyncBuffer, Compressors, FileMetaData } from 'hyparquet'
- * @import { AsyncDataSource } from 'squirreling'
+ * @import { AsyncBuffer, Compressors, FileMetaData, ParquetQueryFilter } from 'hyparquet'
+ * @import { AsyncDataSource, AsyncRow, SqlPrimitive } from 'squirreling'
  * @import { AsyncCells } from 'squirreling/src/types.js'
  */
 
@@ -20,39 +18,75 @@ import { whereToParquetFilter } from './parquetFilter.js'
 export function parquetDataSource(file, metadata, compressors) {
   return {
     async *scan(hints) {
-      const options = {
-        file,
-        metadata,
-        compressors,
-        columns: hints?.columns,
-        // convert WHERE clause to parquet pushdown filter
-        filter: whereToParquetFilter(hints?.where),
-        filterStrict: false,
-      }
+      metadata ??= await parquetMetadataAsync(file)
 
-      // TODO: check that columns exist in parquet file
-      let { columns } = options
-      if (!columns?.length) {
-        const schema = parquetSchema(metadata)
-        columns = schema.children.map(col => col.element.name)
-      }
+      // Convert WHERE AST to hyparquet filter format
+      const whereFilter = hints?.where && whereToParquetFilter(hints.where)
+      /** @type {ParquetQueryFilter | undefined} */
+      const filter = hints?.where ? whereFilter : undefined
+      const filterApplied = !filter || whereFilter
 
-      const plan = parquetPlan(options)
-      for (const subplan of plan.groups) {
-        // Read row group
-        const rg = readRowGroup(options, plan, subplan)
-        // Transpose to materialized rows
-        const rows = await asyncGroupToRows(rg, 0, rg.groupRows, undefined, 'object')
-        // Convert to AsyncRow generator
-        for (const row of rows) {
-          /** @type {AsyncCells} */
-          const cells = {}
-          for (const [key, value] of Object.entries(row)) {
-            cells[key] = () => Promise.resolve(value)
+      // Emit rows by row group
+      let groupStart = 0
+      let remainingLimit = hints?.limit ?? Infinity
+      for (const rowGroup of metadata.row_groups) {
+        const rowCount = Number(rowGroup.num_rows)
+
+        // Skip row groups by offset if where is fully applied
+        let safeOffset = 0
+        let safeLimit = rowCount
+        if (filterApplied) {
+          if (hints?.offset !== undefined && groupStart < hints.offset) {
+            safeOffset = Math.min(rowCount, hints.offset - groupStart)
           }
-          yield { columns, cells }
+          safeLimit = Math.min(rowCount - safeOffset, remainingLimit)
+          if (safeLimit <= 0 && safeOffset < rowCount) break
+        }
+        for (let i = 0; i < safeOffset; i++) {
+          // yield empty rows
+          yield asyncRow({})
+        }
+        if (safeOffset === rowCount) {
+          groupStart += rowCount
+          continue
+        }
+
+        // Read objects from this row group
+        const data = await parquetReadObjects({
+          file,
+          metadata,
+          rowStart: groupStart + safeOffset,
+          rowEnd: groupStart + safeOffset + safeLimit,
+          columns: hints?.columns,
+          filter,
+          filterStrict: false,
+          compressors,
+          useOffsetIndex: true,
+        })
+
+        // Yield each row
+        for (const row of data) {
+          yield asyncRow(row)
         }
+
+        remainingLimit -= data.length
+        groupStart += rowCount
       }
     },
   }
 }
+
+/**
+ * Creates an async row accessor that wraps a plain JavaScript object
+ *
+ * @param {Record<string, SqlPrimitive>} obj - the plain object
+ * @returns {AsyncRow} a row accessor interface
+ */
+function asyncRow(obj) {
+  /** @type {AsyncCells} */
+  const cells = {}
+  for (const [key, value] of Object.entries(obj)) {
+    cells[key] = () => Promise.resolve(value)
+  }
+  return { columns: Object.keys(obj), cells }
+}
diff --git a/bin/tools/parquetSql.js b/bin/tools/parquetSql.js
index d9c004e..7cf8cac 100644
--- a/bin/tools/parquetSql.js
+++ b/bin/tools/parquetSql.js
@@ -1,4 +1,4 @@
-import { asyncBufferFromFile, parquetMetadataAsync } from 'hyparquet'
+import { asyncBufferFromFile, asyncBufferFromUrl, parquetMetadataAsync } from 'hyparquet'
 import { compressors } from 'hyparquet-compressors'
 import { collect, executeSql } from 'squirreling'
 import { parquetDataSource } from './parquetDataSource.js'
@@ -27,9 +27,9 @@
     parameters: {
       type: 'object',
       properties: {
-        filename: {
+        file: {
           type: 'string',
-          description: 'The name of the parquet file to query.',
+          description: 'The parquet file to query, either a local file path or a URL.',
         },
         query: {
           type: 'string',
@@ -40,16 +40,16 @@
          description: 'Whether to truncate long string values in the results. If true (default), each string cell is limited to 1000 characters. If false, each string cell is limited to 10,000 characters.',
        },
      },
-      required: ['filename', 'query'],
+      required: ['file', 'query'],
    },
  },
  /**
   * @param {Record} args
   * @returns {Promise}
   */
-  async handleToolCall({ filename, query, truncate = true }) {
-    if (typeof filename !== 'string') {
-      throw new Error('Expected filename to be a string')
+  async handleToolCall({ file, query, truncate = true }) {
+    if (typeof file !== 'string') {
+      throw new Error('Expected file to be a string')
     }
     if (typeof query !== 'string' || query.trim().length === 0) {
       throw new Error('Query parameter must be a non-empty string')
@@ -59,9 +59,11 @@ export const parquetSql = {
     const startTime = performance.now()
 
     // Load parquet file and create data source
-    const file = await asyncBufferFromFile(filename)
-    const metadata = await parquetMetadataAsync(file)
-    const table = parquetDataSource(file, metadata, compressors)
+    const asyncBuffer = file.startsWith('http://') || file.startsWith('https://')
+      ? await asyncBufferFromUrl({ url: file })
+      : await asyncBufferFromFile(file)
+    const metadata = await parquetMetadataAsync(asyncBuffer)
+    const table = parquetDataSource(asyncBuffer, metadata, compressors)
 
     // Execute SQL query
     const results = await collect(executeSql({ tables: { table }, query }))
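
Usage sketch (not part of the patch): with these changes applied, the tool's file argument accepts either a local path or an http(s) URL. The import path and URL below are placeholders, and the table name follows the tables: { table } registration shown in the diff.

// Hypothetical usage of the updated parquetSql tool; URL and import path are placeholders.
import { parquetSql } from './bin/tools/parquetSql.js'

// A remote file is routed to asyncBufferFromUrl; a local path still goes through asyncBufferFromFile.
const result = await parquetSql.handleToolCall({
  file: 'https://example.com/data.parquet',
  query: 'SELECT * FROM table LIMIT 10', // the data source is registered under the name "table"
})
console.log(result)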