Conversation
@NivinCS it's not clear from this document which interfaces go into the SPI. Can you please elaborate on this design from the perspective of the plugin author: what SPI interfaces need to be implemented, and how does this SPI integrate into the engine? The document currently misses this foundational detail.
Hi @NivinCS, it's not easy for readers to tell what will be added or changed in each component: the syntax, analyzer, SPI, planner, operator, function, etc. Do you think it'll help if you organize by component after introducing the overall logic flow?
Hi Tim,

@NivinCS I took a closer look at the TVF Implementation and now I understand what you meant by applying to partitions. I got confused when it came to partitioning, but now I think I understand: the partition is actually the partition in the JVector index, not the partition of the table. Yes, we should create a split for each of these partitions via TableFunctionProcessorProvider. This all looks fine to me from a table function standpoint. Thanks!
mohsaka left a comment:
Looks good from a table function standpoint.
aditi-pandit left a comment:
Thanks @NivinCS for this writeup. I have a bunch of questions.
RFC-0022-jvector-integration.md (Outdated):

Coordinator-side operator that aggregates results from all workers to produce the global top-K. Either a k-way merge or a brute-force search can be used to efficiently maintain the best K results.

```java
public class TopKAggregationOperator implements Operator {
```
Why is this a new operator? Can we achieve the same functionality with a custom aggregation function or another table operator (which is a data processor, versus the previous split processor)?
We introduced a new TopKAggregationOperator to merge pre-sorted results produced by workers. One limitation we observed with custom aggregation functions is that they process input rows in an arbitrary order, which causes the loss of the sorted order already produced by the workers.
Since the TableFunctionDataProcessor runs on workers and processes partitions independently, it cannot perform aggregation across results from all workers. A coordinator-side aggregation step is therefore required.
While Presto already provides result aggregation through the TopNOperator, it is primarily designed for general use cases. In our scenario, the LIMIT clause already constrains the result to top-K, and TopNOperator uses a heap-based algorithm that does not take advantage of pre-sorted inputs, making it less efficient.
It's not possible to take advantage of sortedness unless the top-K is co-located with the prior results from the worker. If there is an exchange of the worker results, then it's not guaranteed that the top-K operator will see sorted results from all the workers. The buffers between workers are mixed, losing the sortedness. I don't think you can get by without using a heap-based algorithm like TopN.
Were you able to prototype this work with TopN but were not happy with its performance?
During our initial prototype, we used Presto's TopNOperator to compute the final Top-K results. This prototype ran on a single node with a relatively small dataset (around 10k vectors), and we did not observe any issues in that setup. Since the JVector index search returns results in sorted order, we considered introducing a customised operator to potentially achieve better performance.
However, as you pointed out, the sorted order from workers is not preserved during data transfer to the coordinator. We can continue to use the existing TopNOperator for global Top-K aggregation. This approach allows us to reuse existing functionality and avoids introducing any changes to Presto's core components.
doc_id BIGINT,
title VARCHAR,
content VARCHAR,
embedding ARRAY(REAL), -- 768-dimensional embedding
I understand this is a column in the table, but can you give the SQL to insert data into this table?
Added a sample INSERT query in the RFC, including a realistic 768-dimension embedding example (shortened for readability):
INSERT INTO documents (
doc_id,
title,
content,
embedding,
created_date
)
VALUES (
1001,
'Vector Search in Presto',
'This document explains how vector similarity search works in Presto.',
ARRAY[
0.0123, -0.3456, 0.7891, 0.4567, -0.1123,
0.9981, -0.2234, 0.3345, 0.6678, -0.5543
-- ... continue until 768 REAL values
],
DATE '2026-03-01'
);
Having such a column looks hard to use and could also be problematic, as we would want embeddings to be maintained as the data changes.
There seem to be special vectorizer-style components for this: https://github.com/timescale/pgai/blob/main/docs/vectorizer/overview.md
We should explore something along those lines for this work.
Thanks for sharing the pgai vectorizer doc. We did some research on vectorizers after going through it.
Based on that, the approach we’re converging on is:
- Treat embeddings as derived data, not user-managed columns.
- Use a vectorizer-style component that maintains embeddings asynchronously.
In environments where a distributed processing engine is available, this can be implemented using background Spark jobs:
- Spark runs in the background to generate and refresh embeddings from the source table.
- Embeddings are stored in a separate, system-managed table/index.
- Updates and deletes are handled via batch or incremental Spark jobs, keeping embeddings in sync without changing the user table.
This model is only feasible in watsonx.data, where we have a managed Spark runtime and job orchestration.
In OSS Presto, this part is intentionally out of scope:
- Users would need to run their own ingestion / embedding pipeline (Spark, Flink, etc.).
- Presto would only support retrieval and similarity search over precomputed embeddings, not embedding generation or lifecycle management.
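As a rough illustration of the "separate, system-managed table" mentioned above, such a table could look like the sketch below; the table name, columns, and snapshot-tracking column are illustrative assumptions, not part of the RFC:

```sql
-- Hypothetical system-managed embeddings table, maintained by an external
-- pipeline (Spark, Flink, etc.); the user-facing documents table stays unchanged.
CREATE TABLE documents_embeddings (
    doc_id BIGINT,              -- key back into the source documents table
    embedding ARRAY(REAL),      -- derived 768-dimensional embedding
    source_snapshot_id BIGINT   -- source snapshot the embedding was derived from
);
```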
RFC-0022-jvector-integration.md (Outdated):

### Architecture Overview (index building flow)

The implementation follows a **begin-execute-finish pattern** with distributed execution: once the user calls the `CREATE INDEX` statement, the coordinator validates parameters and generates splits for each partition. Each worker then processes its assigned splits to build the index. The coordinator aggregates results and finalizes the index creation.
It seems below that a CALL procedure, not a CREATE INDEX command, is used by the user for index building. Can you please clarify?
Yes, you're right. We invoke a CALL procedure, not a CREATE INDEX statement. Updated in the RFC.
#### 1. Index Caching

**VectorIndexCache**
In which component will this be used?
The cache is connector-specific and lives entirely within the Iceberg connector implementation, making it a plugin-level optimisation rather than a core Presto feature.
Manages indexes for individual partitions, supporting incremental index updates as partitions are added or removed.

```java
public interface PartitionAwareIndexManager {
```
In which Presto components will this be used?
RFC-0022-jvector-integration.md (Outdated):

**ParallelIndexBuilder**

Builds multiple partition indexes concurrently using Presto's distributed execution framework:
Since the index building happens concurrently on all workers, the parallelism seemed implicit to me. Are you suggesting that this will be used to introduce parallelism within a single worker itself?
You’re right. A ParallelIndexBuilder is not needed, as parallelism is implicit.
Hi @gggrace14, thanks for the suggestion. The design intentionally does not introduce changes to core components; the impact is limited to connector implementations only, which plug into existing engine behavior. We can add a short section to explicitly clarify the non-impact on other components if needed.
tdcmeehan left a comment:
I think there's a hidden presumption in this PR: that "row IDs" are stable. To be clear, there is no concept of row ID in V1 tables; in V2 tables they do exist but are not stable; and only in V3 tables are row IDs stable. By stable, I mean that they survive things like compaction, snapshot expiration, branches, etc. If there's a hidden dependency on the Iceberg V3 table format, which has stable row IDs, we should call it out explicitly in this RFC. If not, then we need to mention this as a potentially significant limitation of this design. It's significant because a compaction will wipe out the indices. Likewise, if the table is copy-on-write, a single update or delete will at best wipe out the indices, and at worst we'll return stale data.
**Benefits:**
- ✅ Versioning: Index tied to specific snapshot
- ✅ Consistency: Validate index via snapshot ID comparison
- ✅ Cleanup: Expired snapshots auto-clean index references
How do the vector mappings and indexes get cleaned up?
RFC-0022-jvector-integration.md (Outdated):

```java
}
```

**TopKAggregationOperator**
I was under the impression that this RFC was purely at the connector/plugin layer; however, if this is a new node, how exactly will it be planned and executed? This does seem like a core Presto change. Presumably this will be planned, but how? How does the planner know to insert this node?
We had a similar discussion with @aditi-pandit as well. As mentioned in this comment, we have updated the RFC to use the existing TopNOperator for global Top-K aggregation, thereby avoiding the introduction of any changes to Presto's core components.
How does the planner know to insert the TopNOperator?
The Table-Valued Function (TVF) planning explicitly declares the required ordering and limit, so that the engine's existing planning rules naturally introduce a TopN during plan generation. In our use case, we apply ORDER BY ann.score LIMIT k to the Table-Valued Function output.
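For illustration, a query of roughly the following shape would let the planner place the TopN over the TVF output. The argument names follow the approx_nearest_neighbors signature quoted later in this thread; the exact invocation syntax is an assumption:

```sql
-- ORDER BY ... LIMIT over the TVF output is what the planner
-- folds into a TopN node during plan generation.
SELECT ann.row_id, ann.score
FROM TABLE(
    system.approx_nearest_neighbors(
        query_vector => ARRAY[0.0123, -0.3456, 0.7891],  -- truncated example vector
        column_name => 'embedding',
        "limit" => 10)) AS ann
ORDER BY ann.score
LIMIT 10;
```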
hantangwangd left a comment:
Thanks for this great feature.
Before describing the ANN search flow and index building flow in detail, it might be beneficial to begin by introducing the concept of partitions as used in this proposal, clarifying their relationship to both Iceberg table partitions and index partitions, and outlining the philosophy behind partition-based distributed execution. This would provide crucial context to readers before diving into the two flows.
RFC-0022-jvector-integration.md (Outdated):

**2. Metadata Implementation for Begin/Finish**

Implements the distributed procedure lifecycle in ConnectorMetadata:

```java
public class IcebergMetadata implements ConnectorMetadata {

    /**
     * Begin distributed procedure execution.
     * Validates parameters and returns execution handle.
     *
     * @param session The connector session
     * @param procedureName Qualified procedure name
     * @param tableLayoutHandle Handle to the target table
     * @param arguments Procedure arguments
     * @return BuildVectorIndexHandle with execution context
     */
    @Override
    public ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
            ConnectorSession session,
            QualifiedObjectName procedureName,
            ConnectorTableLayoutHandle tableLayoutHandle,
            List<Object> arguments);
```
One point here is not entirely clear to me. Are you suggesting we implement a new IcebergMetadata specifically for vector index building and searching? Or, in other words, is the goal here to introduce a new Iceberg catalog type dedicated to vector indexing? If that's not the case, then I believe we likely don't need to re-implement these two methods; the actual logic should be handled within BuildVectorIndexDistributedProcedure. What are your thoughts on this?
You’re right. There’s no need to introduce a new IcebergMetadata implementation or a separate Iceberg catalog type for vector index building.
The index build logic can be handled entirely within BuildVectorIndexDistributedProcedure, with the required metadata updates performed during the finish phase.
Based on this, we don’t need to re-implement those metadata methods. I’ve updated the RFC accordingly. Thanks for calling this out.
RFC-0022-jvector-integration.md (Outdated):

@Override
public ConnectorSplitSource getSplits(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorTableLayoutHandle layout,
        SplitSchedulingContext splitSchedulingContext);
Is it intentional to override this method, rather than the following table-function-specific method?
@Override
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorTableFunctionHandle function)
+1... I think getSplits will need information about the underlying documents table and index, and that information will be passed in the ConnectorTableFunctionHandle.
Yes, we should be using ConnectorTableFunctionHandle. We’ve updated the RFC accordingly. Thank you for pointing this out.
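A minimal sketch of what that could look like, creating one split per index partition. AnnFunctionHandle and AnnSplit are hypothetical connector types carrying the index location and search parameters; only FixedSplitSource is existing Presto SPI:

```java
// Sketch only: assumes the table-function-specific getSplits overload quoted above.
// AnnFunctionHandle and AnnSplit are hypothetical connector-side types.
@Override
public ConnectorSplitSource getSplits(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorTableFunctionHandle function)
{
    AnnFunctionHandle handle = (AnnFunctionHandle) function;

    // One split per index partition; each split carries everything the worker
    // needs to load that partition's index and run the ANN search.
    List<ConnectorSplit> splits = handle.getIndexPartitions().stream()
            .map(partition -> new AnnSplit(
                    partition.getIndexLocation(),
                    handle.getQueryVector(),
                    handle.getLimit()))
            .collect(Collectors.toList());

    return new FixedSplitSource(splits);
}
```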
RFC-0022-jvector-integration.md (Outdated):

5. **ConnectorSplitManager**:
- Generates splits for parallel index building
- Creates one split per partition for partitioned tables
- Each split contains partition-specific information (data location, vector column, parameters)
Is the detailed explanation for this part missing below?
I have updated the RFC with detailed information for this. Thank you!
aditi-pandit left a comment:
@NivinCS: I had another question, about security-related features. Most of the vector search libraries used for RAG also mention how they provide secure access. Is that a concern at all in this work?
RFC-0022-jvector-integration.md (Outdated):

```java
}
```

### Architecture Overview (index building flow)
The document would be more readable if you described the index building before the query part, as the index build is a key part of the user workflow.
Thanks for suggesting this. Updated the RFC accordingly.
DATE '2026-03-01'
);

-- Find top 10 most similar documents to a query
Since the user is expected to create an index as well, it would be good to add that SQL here too.
Thanks for suggesting this. Updated the RFC to include the index-creation procedure call.
doc_id BIGINT,
title VARCHAR,
content VARCHAR,
embedding ARRAY(REAL), -- 768-dimensional embedding
Also, most vector search implementations in databases/query engines make the embedding a special vector type. Is that something you have considered? If yes, what would be the implications for this proposal?
We did consider introducing a dedicated vector data type in both Iceberg and Presto early in the design. However, doing so would require changes across multiple layers: a new type definition in Iceberg, corresponding reader/writer and schema support, and engine-level type handling in Presto. This would likely require maintaining a temporary Iceberg fork until the type is accepted by the Iceberg community, significantly increasing delivery and maintenance cost.
Most databases introduce a vector type primarily to enable efficient vector operations and indexing. In our design, those capabilities are already provided by the JVector library itself, while Presto only needs a way to pass embeddings to the index. Representing embeddings as ARRAY is sufficient for this purpose and avoids introducing new type semantics into the engine.
Additionally, Presto has historically been conservative about adding new primitive types unless there is a strong, engine-level justification. As discussed earlier (and as suggested by @tdcmeehan), introducing a vector data type should be considered only if there is a clear requirement that cannot be met with existing types. At this stage, the proposal intentionally minimises scope by reusing ARRAY, keeping the design simpler, more portable, and aligned with both Presto and Iceberg community expectations.
We see a dedicated vector type as possible future work if concrete limitations emerge that cannot be addressed with the current approach.
RFC-0022-jvector-integration.md (Outdated):

 * - Arguments: query_vector (ARRAY(REAL)), column_name (VARCHAR), limit (BIGINT)
 * - Return type: TABLE(row_id BIGINT, score REAL)
 */
public ApproxNearestNeighborsFunction() { ... }
Can you give more details... It seems like the return type will be DescribedTableReturnTypeSpecification.
The input parameters also don't seem to include a TABLE argument, which is fine.
But it would be good if you wrote more specifics here.
Thanks for suggesting this. Updated the RFC with detailed information about the ApproxNearestNeighborsFunction method and its return type.
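For readers of this thread, a rough sketch of what that declaration could look like. DescribedTableReturnTypeSpecification is taken from the discussion above; AbstractConnectorTableFunction, ScalarArgumentSpecification, and Descriptor are assumed names modeled on existing TVF SPIs and may differ in the final RFC:

```java
// Sketch only: class, builder, and field names are assumptions, not the final SPI.
public class ApproxNearestNeighborsFunction
        extends AbstractConnectorTableFunction
{
    public ApproxNearestNeighborsFunction()
    {
        super(
                "system",
                "approx_nearest_neighbors",
                ImmutableList.of(
                        ScalarArgumentSpecification.builder()
                                .name("QUERY_VECTOR")
                                .type(new ArrayType(REAL))
                                .build(),
                        ScalarArgumentSpecification.builder()
                                .name("COLUMN_NAME")
                                .type(VARCHAR)
                                .build(),
                        ScalarArgumentSpecification.builder()
                                .name("LIMIT")
                                .type(BIGINT)
                                .build()),
                // Fixed output shape: TABLE(row_id BIGINT, score REAL)
                new DescribedTableReturnTypeSpecification(new Descriptor(ImmutableList.of(
                        new Descriptor.Field("row_id", Optional.of(BIGINT)),
                        new Descriptor.Field("score", Optional.of(REAL))))));
    }
}
```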
RFC-0022-jvector-integration.md (Outdated):

);

-- Refresh an index (rebuild with latest data)
CALL system.refresh_vector_index(
Indexes can get out of sync. What happens behind the scenes here? Will you be executing a DROP followed by a CREATE?
Other offerings seem to handle this; see https://docs.databricks.com/aws/en/vector-search/vector-search
**VectorIndexCache**

Worker-side cache for loaded indexes and mappings using any caching mechanism. Reduces repeated downloads from S3/HDFS.
Do you have any particular caching system in mind? How do you ensure stickiness, so that subsequent queries use the same worker for the same split?
We don't have a particular caching system in mind at this stage. During our internal discussions this point came up, and we thought it would be better to keep this as a future enhancement. We have a basic idea of the approach, but we need to explore the specific caching implementation and worker-affinity mechanisms further.
The VectorIndexCache interface is intentionally designed to be implementation-agnostic, allowing flexibility to choose the most appropriate caching strategy (e.g., Caffeine, Guava, memory-mapped files, or distributed caches like Alluxio) based on performance testing and production requirements. Similarly, the split-to-worker affinity mechanism to ensure cache locality also needs to be evaluated, considering options like consistent hashing, custom node selectors, or accepting cache misses as acceptable overhead.
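As a minimal sketch, assuming Caffeine were the chosen backend and a getIndex(indexLocation)-style method as described in this thread, the VectorIndexCache could wrap a loading cache keyed by index location. VectorIndex and IndexStorage are hypothetical placeholders for the connector's index representation and its S3/HDFS loader:

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.time.Duration;

// Sketch only: VectorIndex and IndexStorage are hypothetical connector types.
public class CaffeineVectorIndexCache
        implements VectorIndexCache
{
    private final LoadingCache<String, VectorIndex> cache;

    public CaffeineVectorIndexCache(IndexStorage storage)
    {
        this.cache = Caffeine.newBuilder()
                .maximumSize(64)                        // bound the number of resident indexes
                .expireAfterAccess(Duration.ofHours(1)) // evict cold indexes
                .build(storage::loadIndex);             // download from S3/HDFS on a miss
    }

    @Override
    public VectorIndex getIndex(String indexLocation)
    {
        return cache.get(indexLocation);
    }
}
```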
Worker-side cache for loaded indexes and mappings using any caching mechanism. Reduces repeated downloads from S3/HDFS.

```java
public interface VectorIndexCache {
```
Which component is using this? I'm not following where this fits in.
The VectorIndexCache is used by the connector's TableFunctionSplitProcessor implementation (specifically ANNSplitProcessor), a Presto SPI component that executes on worker nodes during split execution; it calls getIndex() to load the vector index from the cache before performing the JVector ANN search. (This can be accommodated as a future enhancement to improve performance by reducing repeated downloads from S3/HDFS storage.)
#### 2. Adaptive Search Parameters

**AdaptiveSearchParameterOptimizer**
Where is this used? I presume in split generation in the table function?
The AdaptiveSearchParameterOptimizer is used in the ANNSplitProcessor during split execution on worker nodes to dynamically calculate the optimal ef_search parameter for HNSW searches, based on index size, K value, and target recall. This is planned as a future enhancement; static configuration will be used initially.
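A minimal sketch of such a heuristic is below; the formula and constants are illustrative assumptions, not the RFC's actual policy:

```java
// Sketch only: an illustrative ef_search heuristic. ef_search must be at least k,
// and is grown with index size and the desired recall target.
public final class AdaptiveSearchParameterOptimizer
{
    private AdaptiveSearchParameterOptimizer() {}

    public static int efSearch(int k, long indexSize, double targetRecall)
    {
        // Larger indexes benefit from a wider search frontier.
        double sizeFactor = Math.log10(Math.max(indexSize, 10));
        // Pushing recall past ~0.9 requires a much wider search.
        double recallBoost = 1.0 + Math.max(0.0, targetRecall - 0.9) * 20.0;
        return Math.max(k, (int) Math.ceil(k * sizeFactor * recallBoost));
    }
}

// Example: efSearch(10, 1_000_000, 0.95) -> max(10, ceil(10 * 6 * 2.0)) = 120
```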
partitioning = ARRAY['created_date']
);

-- sample INSERT query including a realistic 768-dimension embedding example (shortened for readability)
All the work here is described in the context of Iceberg... but if we use another table format, like Delta, many of these SQL constructs are still applicable.
Is there any particular reason to tie this implementation to Iceberg? Is it possible to separate this implementation into generic and format-specific pieces, wired to the different table formats?
We had a discussion with Gopi on this, and the conclusion is that Iceberg is our current priority. While this approach can be extended to support other table formats in the future, that can be considered later. We can also look into separating the generic components from the Iceberg-specific pieces and work with Guy on the Iceberg-related aspects.
We had a discussion with @gopikris, and the conclusion was that this is similar to how database indexes work today. If a user has access to a table, they automatically have access to the associated vector index as well. There is no need for any separate or explicit access control for the vector index.