Commits
28 commits
82cced9
Core: Merge DVs referencing the same data files as a safeguard
amogh-jahagirdar Jan 9, 2026
e41943d
Fix dangling delete tests
amogh-jahagirdar Jan 9, 2026
76e24e4
Simplification in OutputFileFactory
amogh-jahagirdar Jan 9, 2026
a740ff9
minor optimization
amogh-jahagirdar Jan 9, 2026
11ffc2f
cleanup, make outputfilefactory take in more fields so that we don't …
amogh-jahagirdar Jan 10, 2026
772e3c2
change the duplicate tracking algorithm, fix spark tests
amogh-jahagirdar Jan 10, 2026
3404a86
Add more tests for multiple DVs and w equality deletes
amogh-jahagirdar Jan 11, 2026
c04d0e0
Rebase and fix spark 4.1 tests
amogh-jahagirdar Jan 11, 2026
a39b073
more cleanup, put dvfilewriter in try w resources
amogh-jahagirdar Jan 11, 2026
a079d22
Add logging, some more cleanup
amogh-jahagirdar Jan 12, 2026
d7eadb0
more cleanup
amogh-jahagirdar Jan 12, 2026
0a053a6
Make dv refs a multimap, group by partition to write single puffin fo…
amogh-jahagirdar Jan 15, 2026
6b04dd9
Filter files with duplicates before sifting through them and merging
amogh-jahagirdar Jan 15, 2026
a50fb32
update old comment
amogh-jahagirdar Jan 15, 2026
301f0fe
Use an unpartitioned spec for the output file factory for the final p…
amogh-jahagirdar Jan 16, 2026
112b086
address feedback
amogh-jahagirdar Jan 16, 2026
eecacad
Add some spacing to make precondition checks more readable
amogh-jahagirdar Jan 16, 2026
9673f85
more style stuff
amogh-jahagirdar Jan 16, 2026
4104097
Update delete loader to use IOUtil.readDV API
amogh-jahagirdar Jan 16, 2026
9972ce0
Update interface documentation
amogh-jahagirdar Jan 16, 2026
e75bb03
address Ryan's feedback
amogh-jahagirdar Jan 25, 2026
6bccc52
Dedupe pos/equality deletes to preserve previous behavior, prevent an…
amogh-jahagirdar Jan 26, 2026
9bb9c56
Make DV tracking a linked hashmap to preserve ordering of entries
amogh-jahagirdar Jan 27, 2026
85801f1
Remove passing in specs to util
amogh-jahagirdar Jan 27, 2026
669f125
more cleanup
amogh-jahagirdar Jan 27, 2026
a5a5ea0
address feedback
amogh-jahagirdar Feb 5, 2026
67f3d17
cleanup
amogh-jahagirdar Feb 5, 2026
8087558
more consistent naming
amogh-jahagirdar Feb 5, 2026
215 changes: 215 additions & 0 deletions core/src/main/java/org/apache/iceberg/DVUtil.java
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.Tasks;

class DVUtil {
Contributor:
Should DVUtil be in deletes? That would require making some of this public.

private DVUtil() {}

/**
* Merges duplicate DVs for the same data file and writes the merged DV Puffin files. If there is
* exactly one DV for a given data file, it is returned as is.
*
* @param dvsByFile map of data file location to DVs
* @return a list containing both any newly merged DVs and any DVs that are already valid
*/
static List<DeleteFile> mergeAndWriteDvsIfRequired(
Contributor:
Typo: Dvs should be DVs.

Contributor:
Looks like this happens quite a bit. We usually don't follow the Java convention of changing acronyms to camel case. For instance, DV util -> DVUtil instead of DvUtil and file IO -> fileIO. (The exception is ID that was added early and does usually change to fieldId)

Map<String, List<DeleteFile>> dvsByFile,
ExecutorService pool,
LocationProvider locationProvider,
EncryptionManager encryptionManager,
FileIO fileIO,
Map<Integer, PartitionSpec> specs) {
List<DeleteFile> finalDvs = Lists.newArrayList();
Map<String, List<DeleteFile>> duplicateDvsByFile = Maps.newLinkedHashMap();
for (Map.Entry<String, List<DeleteFile>> dvsForFile : dvsByFile.entrySet()) {
List<DeleteFile> dvs = dvsForFile.getValue();
if (!dvs.isEmpty()) {
if (dvs.size() == 1) {
finalDvs.addAll(dvs);
} else {
duplicateDvsByFile.put(dvsForFile.getKey(), dvs);
}
}
}

if (!duplicateDvsByFile.isEmpty()) {
List<DeleteFile> duplicateDvs =
duplicateDvsByFile.values().stream().flatMap(List::stream).collect(Collectors.toList());

Map<String, PositionDeleteIndex> mergedIndices =
readAndMergeDvs(duplicateDvs, pool, fileIO, encryptionManager, specs);
finalDvs.addAll(
writeMergedDVs(
mergedIndices,
duplicateDvsByFile,
locationProvider,
encryptionManager,
fileIO,
specs));
}

return finalDvs;
}

/**
* Reads all DVs and merges the position indices per referenced data file.
*
* @param duplicateDvs list of DVs to read and merge
* @param pool executor service for reading DVs
* @param io the FileIO to use for reading DV files
* @param encryptionManager the EncryptionManager for decrypting DV files
* @param specsById map of partition spec ID to partition spec
* @return map of referenced data file location to the merged position delete index
*/
private static Map<String, PositionDeleteIndex> readAndMergeDvs(
List<DeleteFile> duplicateDvs,
ExecutorService pool,
FileIO io,
EncryptionManager encryptionManager,
Map<Integer, PartitionSpec> specsById) {
// Read all duplicate DVs in parallel
PositionDeleteIndex[] duplicateDvPositions = new PositionDeleteIndex[duplicateDvs.size()];
Tasks.range(duplicateDvPositions.length)
.executeWith(pool)
.stopOnFailure()
Contributor:
Since this is in the commit path, do we want to retry?

Contributor Author (amogh-jahagirdar, Feb 4, 2026):
I don't think we should? This is all file I/O and I think I'd expect FileIOs under the hood to already be retrying. I'm not sure we should really compound the retries or if there's any benefit to doing that past what FileIO's are already doing, especially at this point where we're doing extra work in the commit path already.

.throwFailureWhenFinished()
.run(
i ->
duplicateDvPositions[i] =
Deletes.readDV(duplicateDvs.get(i), io, encryptionManager));
Contributor:
As noted earlier, there should be no need to pass encryptionManager separately.
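
A minimal sketch of what that could look like, assuming the earlier suggestion is to bundle the two handles with EncryptingFileIO (an existing Iceberg helper) so only a single IO object is threaded through:

  // illustrative only: wrap the FileIO and EncryptionManager once instead of
  // passing the EncryptionManager as a separate parameter everywhere
  FileIO encryptingIO = EncryptingFileIO.combine(io, encryptionManager);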


// Build a grouping of referenced file to indices of the corresponding duplicate DVs
Map<String, List<Integer>> dvIndicesByDataFile = Maps.newLinkedHashMap();
Contributor:
Why use linked hashmaps?

for (int i = 0; i < duplicateDvs.size(); i++) {
dvIndicesByDataFile
.computeIfAbsent(duplicateDvs.get(i).referencedDataFile(), k -> Lists.newArrayList())
.add(i);
}

// Validate and merge per referenced file, caching comparators by spec ID
Map<Integer, Comparator<StructLike>> comparatorsBySpecId = Maps.newHashMap();
Map<String, PositionDeleteIndex> result = Maps.newHashMap();
for (Map.Entry<String, List<Integer>> entry : dvIndicesByDataFile.entrySet()) {
Contributor:
It looks like the reason for the complex logic here is to validate that the DVs can be merged. As I noted below, I think that should be done before reading. If you restructure that way, then all you need is to loop over the PositionDeleteIndex array and duplicateDVs at the same time, either adding a new entry for the DeleteFile#referencedDataFile or merging into the existing one. I think that would really simplify this part.

Contributor Author:
I think you're right about this, we can really simplify if we just do the validation in that initial loop where we construct the duplicate mapping. Everything we need to validate is at the metadata level. Then we can just work off the assumption later on that everything can be merged, and just loop through the indices and merge.
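
A rough sketch of the simplified merge loop described here, assuming the metadata-level validation has already happened while building duplicateDvsByFile (names reuse the variables in this diff):

  Map<String, PositionDeleteIndex> merged = Maps.newHashMap();
  for (int i = 0; i < duplicateDvs.size(); i++) {
    // all DVs referencing the same data file are assumed safe to merge at this point
    PositionDeleteIndex positions = duplicateDvPositions[i];
    merged.merge(
        duplicateDvs.get(i).referencedDataFile(),
        positions,
        (existing, next) -> {
          existing.merge(next);
          return existing;
        });
  }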

List<Integer> dvIndicesForFile = entry.getValue();
int firstDVIndex = dvIndicesForFile.get(0);
PositionDeleteIndex mergedIndexForFile = duplicateDvPositions[firstDVIndex];
DeleteFile firstDv = duplicateDvs.get(firstDVIndex);

Comparator<StructLike> partitionComparator =
comparatorsBySpecId.computeIfAbsent(
firstDv.specId(), id -> Comparators.forType(specsById.get(id).partitionType()));

for (int i = 1; i < dvIndicesForFile.size(); i++) {
int dvIndex = dvIndicesForFile.get(i);
DeleteFile dv = duplicateDvs.get(dvIndex);
validateDVCanBeMerged(dv, firstDv, partitionComparator);
Contributor:
I think it is a bit late to validate that DVs can be merged because they were all read into memory. I would move this check to the start of the method.

Contributor (rdblue, Feb 19, 2026):
This is a good reason to pass the original map in since it is available to the calling method.
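
For illustration, the up-front check could look roughly like this, run over duplicateDvsByFile before any DV content is read; the validation helper and comparator cache mirror the ones in this diff, and specsById is the spec map passed into the utility:

  Map<Integer, Comparator<StructLike>> comparatorsBySpecId = Maps.newHashMap();
  for (List<DeleteFile> dvs : duplicateDvsByFile.values()) {
    DeleteFile first = dvs.get(0);
    Comparator<StructLike> comparator =
        comparatorsBySpecId.computeIfAbsent(
            first.specId(), id -> Comparators.forType(specsById.get(id).partitionType()));
    for (DeleteFile dv : dvs.subList(1, dvs.size())) {
      // fails fast on mismatched sequence numbers, specs, or partition tuples
      validateDVCanBeMerged(dv, first, comparator);
    }
  }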

mergedIndexForFile.merge(duplicateDvPositions[dvIndex]);
}

result.put(entry.getKey(), mergedIndexForFile);
}

return result;
}

private static void validateDVCanBeMerged(
Contributor:
For method names, it's shorter if you avoid past tense. For example: validateCanMerge rather than validateCanBeMerged. I also think since this is DVUtil you can get away with not having DV in each method name.

DeleteFile first, DeleteFile second, Comparator<StructLike> partitionComparator) {
Preconditions.checkArgument(
Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber()),
"Cannot merge duplicate added DVs when data sequence numbers are different, "
+ "expected all to be added with sequence %s, but got %s",
first.dataSequenceNumber(),
second.dataSequenceNumber());

Preconditions.checkArgument(
first.specId() == second.specId(),
"Cannot merge duplicate added DVs when partition specs are different, "
+ "expected all to be added with spec %s, but got %s",
first.specId(),
second.specId());

Preconditions.checkArgument(
partitionComparator.compare(first.partition(), second.partition()) == 0,
"Cannot merge duplicate added DVs when partition tuples are different");
}

// Produces a single Puffin file containing the merged DVs
private static List<DeleteFile> writeMergedDVs(
Map<String, PositionDeleteIndex> mergedIndices,
Contributor:
There aren't indices here.

Map<String, List<DeleteFile>> dvsByFile,
LocationProvider locationProvider,
EncryptionManager encryptionManager,
FileIO fileIO,
Map<Integer, PartitionSpec> specsById) {
try (DVFileWriter dvFileWriter =
new BaseDVFileWriter(
// Use an unpartitioned spec for the location provider for the puffin containing
// all the merged DVs
OutputFileFactory.builderFor(
Contributor:
This OutputFileFactory has a fake partition spec and IDs. Maybe we don't need to use it, then. All this really needs to do is generate a file name, add the Puffin extension, and call the location provider. I'd rather just do those things directly and not need to fake OutputFileFactory, which is intended for tasks. This could also use the snapshot ID.
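
A hedged sketch of generating the Puffin location directly, without OutputFileFactory; the snapshot-ID-based file name is illustrative, while addExtension, newDataLocation, newOutputFile, and encrypt are existing Iceberg APIs:

  // build a unique Puffin file name, resolve it with the location provider,
  // and encrypt the output file directly
  String fileName =
      FileFormat.PUFFIN.addExtension(
          String.format("%d-%s-deletes", snapshotId, UUID.randomUUID()));
  EncryptedOutputFile outputFile =
      encryptionManager.encrypt(fileIO.newOutputFile(locationProvider.newDataLocation(fileName)));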

locationProvider,
encryptionManager,
() -> fileIO,
PartitionSpec.unpartitioned(),
FileFormat.PUFFIN,
1,
1)
.build(),
path -> null)) {

for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndices.entrySet()) {
String referencedLocation = entry.getKey();
PositionDeleteIndex mergedPositions = entry.getValue();
List<DeleteFile> duplicateDvs = dvsByFile.get(referencedLocation);
DeleteFile firstDV = duplicateDvs.get(0);
Contributor:
I'd combine with the previous line. You don't need the duplicates after this, just the first.

dvFileWriter.delete(
referencedLocation,
mergedPositions,
specsById.get(firstDV.specId()),
firstDV.partition());
}

dvFileWriter.close();
return dvFileWriter.result().deleteFiles();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
57 changes: 42 additions & 15 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.ValidationException;
@@ -47,6 +48,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
@@ -55,6 +57,7 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -86,8 +89,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// update data
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
private final Set<String> newDVRefs = Sets.newHashSet();
private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
Contributor:
allDeletes?

Contributor:
I see below that this doesn't include DVs. But DVs are still position deletes, so we should probably find a better name.

private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap();
Comment on lines +92 to +93
Contributor Author (amogh-jahagirdar, Jan 27, 2026):
@rdblue These are 2 disjoint fields, one for a list of v2 deletes and a multimap for DVs.
The map is a LinkedHashMap because we have a bunch of tests which have expectations on the exact order of entries in a manifest. The previous change didn't require anything extra for ordering because we worked with the deleteFilesBySpec, which inherently preserved the order.

I personally think our tests should probably get away from expecting a certain order in manifests, and just assert the contents (or at least have validate methods that express either being strict on the ordering or not). As we get into V4, maybe we'll make implementation choices for ordering entries in a certain way, but in the current state of things it was kind of a hindrance to making changes here.

I didn't make the test change since it's fairly large and can be distracting from this change, and I figured the LinkedHashMap has negligible overhead, so we can just preserve the existing behavior.

private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
@@ -222,7 +225,7 @@ protected boolean addsDataFiles() {
}

protected boolean addsDeleteFiles() {
return !newDeleteFilesBySpec.isEmpty();
return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty();
Contributor:
This isn't entirely correct because it assumes that the lists in dvsByReferencedFile are non-empty. It would probably be better to check:

  protected boolean addsDeleteFiles() {
    return !positionAndEqualityDeletes.isEmpty()
        || dvsByReferencedFile.values().stream().anyMatch(dvs -> !dvs.isEmpty());
  }

}

/** Add a data file to the new snapshot. */
@@ -265,15 +268,14 @@ private void addInternal(DeleteFile file) {
"Cannot find partition spec %s for delete file: %s",
file.specId(),
file.location());

DeleteFileSet deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
if (deleteFiles.add(file)) {
addedFilesSummary.addedFile(spec, file);
Contributor Author:
Because we may be merging duplicates, we don't update the summary for delete files until after we dedupe and are just about to write the new manifests.

hasNewDeleteFiles = true;
if (ContentFileUtil.isDV(file)) {
newDVRefs.add(file.referencedDataFile());
}
hasNewDeleteFiles = true;
if (ContentFileUtil.isDV(file)) {
List<DeleteFile> dvsForReferencedFile =
dvsByReferencedFile.computeIfAbsent(
file.referencedDataFile(), newFile -> Lists.newArrayList());
dvsForReferencedFile.add(file);
} else {
positionAndEqualityDeletes.add(file);
Contributor:
In v3, position deletes are no longer allowed to be added to the table, so should this have a check that the table is either version 2 or the deletes are equality deletes?

I know that we don't want to add the complexity right now to rewrite v2 position deletes as DVs, but it doesn't seem like we should allow them to be added through the public API.

Contributor:
We should do this in a separate commit. I wouldn't want to backport it to 1.10.x but we should definitely check because there isn't one elsewhere that I see.
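
One possible shape for that follow-up check, sketched here under the assumption that the format version is read from the current table metadata as elsewhere in this class:

  // reject v2-style position deletes on v3+ tables while still allowing equality deletes
  if (!ContentFileUtil.isDV(file) && file.content() == FileContent.POSITION_DELETES) {
    Preconditions.checkArgument(
        ops().current().formatVersion() < 3,
        "Cannot add position delete files to a v%s table; use deletion vectors instead",
        ops().current().formatVersion());
  }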

}
}

@@ -814,7 +816,7 @@ protected void validateAddedDVs(
Expression conflictDetectionFilter,
Snapshot parent) {
// skip if there is no current table state or this operation doesn't add new DVs
if (parent == null || newDVRefs.isEmpty()) {
if (parent == null || dvsByReferencedFile.isEmpty()) {
return;
}

@@ -847,7 +849,7 @@ private void validateAddedDVs(
DeleteFile file = entry.file();
if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
ValidationException.check(
!newDVRefs.contains(file.referencedDataFile()),
!dvsByReferencedFile.containsKey(file.referencedDataFile()),
Contributor:
I think this change is correct, but I want to note that in the future we could avoid failing by merging DVs as long as that is allowed by the operation being committed.

Contributor Author:
Yeah had an old PR out for this https://github.com/apache/iceberg/pull/11693/files#diff-410ff1b47d9a44a2fd5dbd103cad9463d82c8f4f51aa1be63b8b403123ab6e0e (probably a bad PR title since by definition for the operation if the positions are disjoint, it's not conflicting)

"Found concurrently added DV for %s: %s",
file.referencedDataFile(),
ContentFileUtil.dvDesc(file));
@@ -1042,7 +1044,7 @@ private List<ManifestFile> newDataFilesAsManifests() {
}

private Iterable<ManifestFile> prepareDeleteManifests() {
if (newDeleteFilesBySpec.isEmpty()) {
if (!addsDeleteFiles()) {
return ImmutableList.of();
}

@@ -1060,9 +1062,34 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
}

if (cachedNewDeleteManifests.isEmpty()) {
for (Map.Entry<String, List<DeleteFile>> entry : dvsByReferencedFile.entrySet()) {
if (entry.getValue().size() > 1) {
LOG.warn(
"Attempted to commit {} duplicate DVs for data file {} in table {}. "
+ "Merging duplicates, and original DVs will be orphaned.",
Contributor:
Will be orphaned? I think this is a bit strong of a warning and could lead to incorrect issue reports.

It would be nice to detect whether the puffin file is actually orphaned by checking whether it has other DVs. It's fine to leave a Puffin file in place if it has live DVs from this commit. And it's fine to delete it if we see that all of its DVs will be merged.

We probably don't need to do all that in this commit, though. I'd probably leave this out and just have a warning about merging, without the part about orphan DVs.

entry.getValue().size(),
entry.getKey(),
tableName);
}
}

List<DeleteFile> finalDVs =
DVUtil.mergeAndWriteDvsIfRequired(
dvsByReferencedFile,
ThreadPools.getDeleteWorkerPool(),
Contributor:
I'd put this at the end, since it is an optimization and not an argument that changes the result.

ops().locationProvider(),
ops().encryption(),
ops().io(),
ops().current().specsById());
// Prevent committing duplicate V2 deletes by deduping them
Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
Streams.stream(Iterables.concat(finalDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
Contributor:
Why does this use a DeleteFileSet? Do we think that there are cases where not using one is a behavior change? I'd prefer failing if there are duplicate deletes.
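
The stricter alternative preferred here might look roughly like the following sketch, failing on duplicates instead of silently deduplicating:

  DeleteFileSet dedupedDeletes = DeleteFileSet.of(positionAndEqualityDeletes);
  Preconditions.checkArgument(
      dedupedDeletes.size() == positionAndEqualityDeletes.size(),
      "Found duplicate position or equality delete files");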

.map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
Contributor:
This doesn't seem correct. The delete files are already wrapped when passed into this class. At a minimum, positionAndEqualityDeletes don't need to be re-wrapped. And merged DVs should already be constructed with the right dataSequenceNumber because that's an internal rewrite.

Contributor Author:
mergedDVs would be constructed with the right data sequence number, but this was done because we validate that every DeleteFile that we want to commit is an instance of PendingDeleteFile; otherwise it fails. I didn't want to do that in DVUtil because I didn't want to make assumptions about how a caller would use it.

But you're right we don't need this for positionAndEqualityDeletes.
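
Based on this exchange, the re-wrapping could be limited to the merged DVs, along these lines (Delegates.pendingDeleteFile is the internal helper already used in this diff):

  // only the newly written merged DVs need wrapping; position/equality deletes
  // were already wrapped when they were added
  Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
      Stream.<DeleteFile>concat(
              finalDVs.stream().map(dv -> Delegates.pendingDeleteFile(dv, dv.dataSequenceNumber())),
              positionAndEqualityDeletes.stream())
          .collect(Collectors.groupingBy(ContentFile::specId));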

.collect(Collectors.groupingBy(ContentFile::specId));
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops().current().spec(specId);
deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
Contributor:
If the cache is invalidated, this will double-count all delete files, which I think we have to fix. That's a bit ugly. The cleanest way I can think of is to keep a summary builder for data files and then combine the current set of deletes with data files here with a new builder, or possibly just clear the current builder and add everything.
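
One of the options mentioned here, rebuilding the delete-file portion of the summary with a fresh builder each time manifests are (re)written, could be sketched as follows; how the fresh builder is merged back into the overall summary is left out:

  SnapshotSummary.Builder deleteFilesSummary = SnapshotSummary.builder();
  newDeleteFilesBySpec.forEach(
      (specId, deleteFiles) -> {
        PartitionSpec spec = ops().current().spec(specId);
        // counting into a fresh builder avoids double-counting when the manifest
        // cache is invalidated and this method runs again
        deleteFiles.forEach(file -> deleteFilesSummary.addedFile(spec, file));
      });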

List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
@@ -71,6 +71,17 @@ public void delete(String path, long pos, PartitionSpec spec, StructLike partition) {
positions.delete(pos);
}

@Override
public void delete(
String path,
PositionDeleteIndex positionDeleteIndex,
PartitionSpec spec,
StructLike partition) {
Deletes deletes =
deletesByPath.computeIfAbsent(path, key -> new Deletes(path, spec, partition));
deletes.positions().merge(positionDeleteIndex);
}

@Override
public DeleteWriteResult result() {
Preconditions.checkState(result != null, "Cannot get result from unclosed writer");
17 changes: 17 additions & 0 deletions core/src/main/java/org/apache/iceberg/deletes/DVFileWriter.java
@@ -43,4 +43,21 @@ public interface DVFileWriter extends Closeable {
* @return the writer result
*/
DeleteWriteResult result();

/**
* Marks every position that is deleted in positionDeleteIndex as deleted in the given data file.
* Implementations should merge with existing position indices for the provided path.
*
* @param path the data file path
* @param positionDeleteIndex the position delete index containing all the positions to delete
* @param spec the data file partition spec
* @param partition the data file partition
*/
default void delete(
String path,
PositionDeleteIndex positionDeleteIndex,
PartitionSpec spec,
StructLike partition) {
throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported");
Contributor:
Minor: This could be implemented by deleting specific positions from the delete index.
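
As a sketch of that suggestion, assuming PositionDeleteIndex exposes a forEach over its positions, the default method could delegate to the existing single-position delete instead of throwing:

  default void delete(
      String path,
      PositionDeleteIndex positionDeleteIndex,
      PartitionSpec spec,
      StructLike partition) {
    // delegate each deleted position to the existing single-position method
    positionDeleteIndex.forEach(pos -> delete(path, pos, spec, partition));
  }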

Contributor:
Error messages generally shouldn't use identifiers from code. Would this be more helpful to users if you used "Bulk deletes are not supported" or similar?

}
}