Core: Detect and merge duplicate DVs for a data file and merge them before committing #15006
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,215 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.deletes.BaseDVFileWriter; | ||
| import org.apache.iceberg.deletes.DVFileWriter; | ||
| import org.apache.iceberg.deletes.Deletes; | ||
| import org.apache.iceberg.deletes.PositionDeleteIndex; | ||
| import org.apache.iceberg.encryption.EncryptionManager; | ||
| import org.apache.iceberg.io.FileIO; | ||
| import org.apache.iceberg.io.LocationProvider; | ||
| import org.apache.iceberg.io.OutputFileFactory; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.types.Comparators; | ||
| import org.apache.iceberg.util.Tasks; | ||
|
|
||
| class DVUtil { | ||
| private DVUtil() {} | ||
|
|
||
| /** | ||
| * Merges duplicate DVs for the same data file and writes the merged DV Puffin files. If there is | ||
| * exactly 1 DV for a given data file then it is returned as is | ||
| * | ||
| * @param dvsByFile map of data file location to DVs | ||
| * @return a list containing both any newly merged DVs and any DVs that are already valid | ||
| */ | ||
| static List<DeleteFile> mergeAndWriteDvsIfRequired( | ||
|
Contributor: Typo:

Contributor: Looks like this happens quite a bit. We usually don't follow the Java convention of changing acronyms to camel case. For instance, DV util ->
||
| Map<String, List<DeleteFile>> dvsByFile, | ||
| ExecutorService pool, | ||
| LocationProvider locationProvider, | ||
| EncryptionManager encryptionManager, | ||
| FileIO fileIO, | ||
| Map<Integer, PartitionSpec> specs) { | ||
| List<DeleteFile> finalDvs = Lists.newArrayList(); | ||
| Map<String, List<DeleteFile>> duplicateDvsByFile = Maps.newLinkedHashMap(); | ||
| for (Map.Entry<String, List<DeleteFile>> dvsForFile : dvsByFile.entrySet()) { | ||
| List<DeleteFile> dvs = dvsForFile.getValue(); | ||
| if (!dvs.isEmpty()) { | ||
| if (dvs.size() == 1) { | ||
| finalDvs.addAll(dvs); | ||
| } else { | ||
| duplicateDvsByFile.put(dvsForFile.getKey(), dvs); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (!duplicateDvsByFile.isEmpty()) { | ||
| List<DeleteFile> duplicateDvs = | ||
| duplicateDvsByFile.values().stream().flatMap(List::stream).collect(Collectors.toList()); | ||
|
|
||
| Map<String, PositionDeleteIndex> mergedIndices = | ||
| readAndMergeDvs(duplicateDvs, pool, fileIO, encryptionManager, specs); | ||
| finalDvs.addAll( | ||
| writeMergedDVs( | ||
| mergedIndices, | ||
| duplicateDvsByFile, | ||
| locationProvider, | ||
| encryptionManager, | ||
| fileIO, | ||
| specs)); | ||
| } | ||
|
|
||
| return finalDvs; | ||
| } | ||
|
|
||
| /** | ||
| * Reads all DVs and merges the position indices per referenced data file | ||
| * | ||
| * @param io the FileIO to use for reading DV files | ||
| * @param encryptionManager the EncryptionManager for decrypting DV files | ||
| * @param duplicateDvs list of dvs to read and merge | ||
| * @param specsById map of partition spec ID to partition spec | ||
| * @param pool executor service for reading DVs | ||
| * @return map of referenced data file location to the merged position delete index | ||
| */ | ||
| private static Map<String, PositionDeleteIndex> readAndMergeDvs( | ||
| List<DeleteFile> duplicateDvs, | ||
| ExecutorService pool, | ||
| FileIO io, | ||
| EncryptionManager encryptionManager, | ||
| Map<Integer, PartitionSpec> specsById) { | ||
| // Read all duplicate DVs in parallel | ||
| PositionDeleteIndex[] duplicateDvPositions = new PositionDeleteIndex[duplicateDvs.size()]; | ||
| Tasks.range(duplicateDvPositions.length) | ||
| .executeWith(pool) | ||
| .stopOnFailure() | ||
|
Contributor: Since this is in the commit path, do we want to retry?

Contributor (author): I don't think we should? This is all file I/O and I'd expect FileIOs under the hood to already be retrying. I'm not sure we should compound the retries, or that there's any benefit to doing that beyond what FileIOs are already doing, especially at this point where we're doing extra work in the commit path already.
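For context, a minimal sketch of what opting into retries at this layer could look like, using the existing Tasks builder options; the retry count, backoff values, and the choice to retry only on RuntimeIOException are assumptions, and whether this is worthwhile on top of FileIO-level retries is exactly the open question above.

```java
// Hypothetical sketch only: retry DV reads at this layer in addition to any
// retries the underlying FileIO already performs.
Tasks.range(duplicateDvPositions.length)
    .executeWith(pool)
    .retry(3) // assumed retry count
    .exponentialBackoff(100, 2000, 30000, 2.0) // assumed min/max/total wait (ms) and scale
    .onlyRetryOn(RuntimeIOException.class) // assumption: only retry transient I/O failures
    .stopOnFailure()
    .throwFailureWhenFinished()
    .run(
        i ->
            duplicateDvPositions[i] =
                Deletes.readDV(duplicateDvs.get(i), io, encryptionManager));
```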
||
| .throwFailureWhenFinished() | ||
| .run( | ||
| i -> | ||
| duplicateDvPositions[i] = | ||
| Deletes.readDV(duplicateDvs.get(i), io, encryptionManager)); | ||
|
Contributor: As noted earlier, there should be no need to pass
||
|
|
||
| // Build a grouping of referenced file to indices of the corresponding duplicate DVs | ||
| Map<String, List<Integer>> dvIndicesByDataFile = Maps.newLinkedHashMap(); | ||
|
Contributor: Why use linked hashmaps?
||
| for (int i = 0; i < duplicateDvs.size(); i++) { | ||
| dvIndicesByDataFile | ||
| .computeIfAbsent(duplicateDvs.get(i).referencedDataFile(), k -> Lists.newArrayList()) | ||
| .add(i); | ||
| } | ||
|
|
||
| // Validate and merge per referenced file, caching comparators by spec ID | ||
| Map<Integer, Comparator<StructLike>> comparatorsBySpecId = Maps.newHashMap(); | ||
| Map<String, PositionDeleteIndex> result = Maps.newHashMap(); | ||
| for (Map.Entry<String, List<Integer>> entry : dvIndicesByDataFile.entrySet()) { | ||
|
Contributor: It looks like the reason for the complex logic here is to validate that the DVs can be merged. As I noted below, I think that should be done before reading. If you restructure that way, then all you need is to loop over the

Contributor (author): I think you're right about this; we can really simplify if we just do the validation in that initial loop where we construct the duplicate mapping. Everything we need to validate is at the metadata level. Then we can just work off the assumption later on that everything can be merged, and just loop through the indices and merge.
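A rough sketch of that restructuring, moving the metadata-level checks into the initial grouping loop in mergeAndWriteDvsIfRequired so the read-and-merge step can assume every group is mergeable (the exact shape here is an assumption, reusing the names from the code above):

```java
// Hypothetical sketch: validate duplicates while grouping, before reading any Puffin files.
for (Map.Entry<String, List<DeleteFile>> dvsForFile : dvsByFile.entrySet()) {
  List<DeleteFile> dvs = dvsForFile.getValue();
  if (dvs.size() == 1) {
    finalDvs.addAll(dvs);
  } else if (dvs.size() > 1) {
    DeleteFile first = dvs.get(0);
    Comparator<StructLike> partitionComparator =
        Comparators.forType(specs.get(first.specId()).partitionType());
    for (int i = 1; i < dvs.size(); i++) {
      // same metadata-level checks as validateDVCanBeMerged: data sequence number,
      // spec ID, and partition tuple must match the first DV
      validateDVCanBeMerged(dvs.get(i), first, partitionComparator);
    }
    duplicateDvsByFile.put(dvsForFile.getKey(), dvs);
  }
}
```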
||
| List<Integer> dvIndicesForFile = entry.getValue(); | ||
| int firstDVIndex = dvIndicesForFile.get(0); | ||
| PositionDeleteIndex mergedIndexForFile = duplicateDvPositions[firstDVIndex]; | ||
| DeleteFile firstDv = duplicateDvs.get(firstDVIndex); | ||
|
|
||
| Comparator<StructLike> partitionComparator = | ||
| comparatorsBySpecId.computeIfAbsent( | ||
| firstDv.specId(), id -> Comparators.forType(specsById.get(id).partitionType())); | ||
|
|
||
| for (int i = 1; i < dvIndicesForFile.size(); i++) { | ||
| int dvIndex = dvIndicesForFile.get(i); | ||
| DeleteFile dv = duplicateDvs.get(dvIndex); | ||
| validateDVCanBeMerged(dv, firstDv, partitionComparator); | ||
|
Contributor: I think it is a bit late to validate that DVs can be merged because they were all read into memory. I would move this check to the start of the method.

Contributor: This is a good reason to pass the original map in since it is available to the calling method.
||
| mergedIndexForFile.merge(duplicateDvPositions[dvIndex]); | ||
| } | ||
|
|
||
| result.put(entry.getKey(), mergedIndexForFile); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| private static void validateDVCanBeMerged( | ||
|
Contributor: For method names, it's shorter if you avoid past tense. For example:
||
| DeleteFile first, DeleteFile second, Comparator<StructLike> partitionComparator) { | ||
| Preconditions.checkArgument( | ||
| Objects.equals(first.dataSequenceNumber(), second.dataSequenceNumber()), | ||
| "Cannot merge duplicate added DVs when data sequence numbers are different, " | ||
| + "expected all to be added with sequence %s, but got %s", | ||
| first.dataSequenceNumber(), | ||
| second.dataSequenceNumber()); | ||
|
|
||
| Preconditions.checkArgument( | ||
| first.specId() == second.specId(), | ||
| "Cannot merge duplicate added DVs when partition specs are different, " | ||
| + "expected all to be added with spec %s, but got %s", | ||
| first.specId(), | ||
| second.specId()); | ||
|
|
||
| Preconditions.checkArgument( | ||
| partitionComparator.compare(first.partition(), second.partition()) == 0, | ||
| "Cannot merge duplicate added DVs when partition tuples are different"); | ||
| } | ||
|
|
||
| // Produces a single Puffin file containing the merged DVs | ||
| private static List<DeleteFile> writeMergedDVs( | ||
| Map<String, PositionDeleteIndex> mergedIndices, | ||
|
Contributor: There aren't indices here.
||
| Map<String, List<DeleteFile>> dvsByFile, | ||
| LocationProvider locationProvider, | ||
| EncryptionManager encryptionManager, | ||
| FileIO fileIO, | ||
| Map<Integer, PartitionSpec> specsById) { | ||
| try (DVFileWriter dvFileWriter = | ||
| new BaseDVFileWriter( | ||
| // Use an unpartitioned spec for the location provider for the puffin containing | ||
| // all the merged DVs | ||
| OutputFileFactory.builderFor( | ||
|
Contributor: This
||
| locationProvider, | ||
| encryptionManager, | ||
| () -> fileIO, | ||
| PartitionSpec.unpartitioned(), | ||
| FileFormat.PUFFIN, | ||
| 1, | ||
| 1) | ||
| .build(), | ||
| path -> null)) { | ||
|
|
||
| for (Map.Entry<String, PositionDeleteIndex> entry : mergedIndices.entrySet()) { | ||
| String referencedLocation = entry.getKey(); | ||
| PositionDeleteIndex mergedPositions = entry.getValue(); | ||
| List<DeleteFile> duplicateDvs = dvsByFile.get(referencedLocation); | ||
| DeleteFile firstDV = duplicateDvs.get(0); | ||
|
Contributor: I'd combine this with the previous line. You don't need the duplicates after this, just the first.
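Presumably the suggested collapse is just (sketch):

```java
// Only the first DV per referenced file is needed here.
DeleteFile firstDV = dvsByFile.get(referencedLocation).get(0);
```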
||
| dvFileWriter.delete( | ||
| referencedLocation, | ||
| mergedPositions, | ||
| specsById.get(firstDV.specId()), | ||
| firstDV.partition()); | ||
| } | ||
|
|
||
| dvFileWriter.close(); | ||
| return dvFileWriter.result().deleteFiles(); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ | |
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.encryption.EncryptedOutputFile; | ||
| import org.apache.iceberg.events.CreateSnapshotEvent; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
|
|
@@ -47,6 +48,7 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Streams; | ||
| import org.apache.iceberg.util.CharSequenceSet; | ||
| import org.apache.iceberg.util.ContentFileUtil; | ||
| import org.apache.iceberg.util.DataFileSet; | ||
|
|
@@ -55,6 +57,7 @@ | |
| import org.apache.iceberg.util.PartitionSet; | ||
| import org.apache.iceberg.util.SnapshotUtil; | ||
| import org.apache.iceberg.util.Tasks; | ||
| import org.apache.iceberg.util.ThreadPools; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -86,8 +89,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> { | |
| // update data | ||
| private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap(); | ||
| private Long newDataFilesDataSequenceNumber; | ||
| private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap(); | ||
| private final Set<String> newDVRefs = Sets.newHashSet(); | ||
| private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList(); | ||
|
Contributor: I see below that this doesn't include DVs. But DVs are still position deletes, so we should probably find a better name.
||
| private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newLinkedHashMap(); | ||
|
Comment on lines +92 to +93

Contributor (author): @rdblue These are 2 disjoint fields, one for a list of v2 deletes and a multimap for DVs. I personally think our tests should get away from expecting a certain order in manifests and just assert the contents (or at least have validate methods that express whether they are strict about the ordering or not). As we get into V4, maybe we'll make implementation choices for ordering entries in a certain way, but in the current state of things it was a bit of a hindrance to making changes here. I didn't make the test change since it's fairly large and can be distracting from this change, and I figured the linked hash map has negligible overhead, so we can just preserve the existing behavior.
||
| private final List<ManifestFile> appendManifests = Lists.newArrayList(); | ||
| private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList(); | ||
| private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); | ||
|
|
@@ -222,7 +225,7 @@ protected boolean addsDataFiles() { | |
| } | ||
|
|
||
| protected boolean addsDeleteFiles() { | ||
| return !newDeleteFilesBySpec.isEmpty(); | ||
| return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty(); | ||
|
Contributor: This isn't entirely correct because it assumes that the lists in

    protected boolean addsDeleteFiles() {
      return !positionAndEqualityDeletes.isEmpty()
          || dvsByReferencedFile.values().stream().anyMatch(dvs -> !dvs.isEmpty());
    }
||
| } | ||
|
|
||
| /** Add a data file to the new snapshot. */ | ||
|
|
@@ -265,15 +268,14 @@ private void addInternal(DeleteFile file) { | |
| "Cannot find partition spec %s for delete file: %s", | ||
| file.specId(), | ||
| file.location()); | ||
|
|
||
| DeleteFileSet deleteFiles = | ||
| newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); | ||
| if (deleteFiles.add(file)) { | ||
| addedFilesSummary.addedFile(spec, file); | ||
|
Contributor (author): Because we may be merging duplicates, we don't update the summary for delete files until after we dedupe and are just about to write the new manifests.
||
| hasNewDeleteFiles = true; | ||
| if (ContentFileUtil.isDV(file)) { | ||
| newDVRefs.add(file.referencedDataFile()); | ||
| } | ||
| hasNewDeleteFiles = true; | ||
amogh-jahagirdar marked this conversation as resolved.
|
||
| if (ContentFileUtil.isDV(file)) { | ||
| List<DeleteFile> dvsForReferencedFile = | ||
| dvsByReferencedFile.computeIfAbsent( | ||
| file.referencedDataFile(), newFile -> Lists.newArrayList()); | ||
| dvsForReferencedFile.add(file); | ||
| } else { | ||
| positionAndEqualityDeletes.add(file); | ||
|
Contributor: In v3, position deletes are no longer allowed to be added to the table, so should this have a check that the table is either version 2 or the deletes are equality deletes? I know that we don't want to add the complexity right now to rewrite v2 position deletes as DVs, but it doesn't seem like we should allow them to be added through the public API.

Contributor: We should do this in a separate commit. I wouldn't want to backport it to 1.10.x, but we should definitely check because there isn't one elsewhere that I see.
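A sketch of the kind of guard being discussed for addInternal; the format-version accessor shown here is an assumption about what is available in this class:

```java
// Hypothetical check: on v3+ tables, only DVs and equality deletes may be added.
int formatVersion = ops().current().formatVersion(); // assumed accessor
Preconditions.checkArgument(
    formatVersion < 3
        || ContentFileUtil.isDV(file)
        || file.content() == FileContent.EQUALITY_DELETES,
    "Cannot add position delete file to a v%s table, expected a DV: %s",
    formatVersion,
    file.location());
```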
||
| } | ||
| } | ||
|
|
||
|
|
@@ -814,7 +816,7 @@ protected void validateAddedDVs( | |
| Expression conflictDetectionFilter, | ||
| Snapshot parent) { | ||
| // skip if there is no current table state or this operation doesn't add new DVs | ||
| if (parent == null || newDVRefs.isEmpty()) { | ||
| if (parent == null || dvsByReferencedFile.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -847,7 +849,7 @@ private void validateAddedDVs( | |
| DeleteFile file = entry.file(); | ||
| if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) { | ||
| ValidationException.check( | ||
| !newDVRefs.contains(file.referencedDataFile()), | ||
| !dvsByReferencedFile.containsKey(file.referencedDataFile()), | ||
|
Contributor: I think this change is correct, but I want to note that in the future we could avoid failing by merging DVs, as long as that is allowed by the operation being committed.

Contributor (author): Yeah, I had an old PR out for this: https://github.com/apache/iceberg/pull/11693/files#diff-410ff1b47d9a44a2fd5dbd103cad9463d82c8f4f51aa1be63b8b403123ab6e0e (probably a bad PR title, since by definition if the positions are disjoint for the operation, it's not conflicting).
||
| "Found concurrently added DV for %s: %s", | ||
| file.referencedDataFile(), | ||
| ContentFileUtil.dvDesc(file)); | ||
|
|
@@ -1042,7 +1044,7 @@ private List<ManifestFile> newDataFilesAsManifests() { | |
| } | ||
|
|
||
| private Iterable<ManifestFile> prepareDeleteManifests() { | ||
| if (newDeleteFilesBySpec.isEmpty()) { | ||
| if (!addsDeleteFiles()) { | ||
| return ImmutableList.of(); | ||
| } | ||
|
|
||
|
|
@@ -1060,9 +1062,34 @@ private List<ManifestFile> newDeleteFilesAsManifests() { | |
| } | ||
|
|
||
| if (cachedNewDeleteManifests.isEmpty()) { | ||
| for (Map.Entry<String, List<DeleteFile>> entry : dvsByReferencedFile.entrySet()) { | ||
| if (entry.getValue().size() > 1) { | ||
| LOG.warn( | ||
| "Attempted to commit {} duplicate DVs for data file {} in table {}. " | ||
| + "Merging duplicates, and original DVs will be orphaned.", | ||
|
Contributor: "Will be orphaned"? I think this is a bit strong of a warning and could lead to incorrect issue reports. It would be nice to detect whether the Puffin file is actually orphaned by checking whether it has other DVs. It's fine to leave a Puffin file in place if it has live DVs from this commit, and it's fine to delete it if we see that all of its DVs will be merged. We probably don't need to do all that in this commit, though. I'd probably leave this out and just have a warning about merging, without the part about orphan DVs.
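In that spirit, the log line might be softened to something like the following (sketch; the wording is a suggestion only):

```java
// Hypothetical softened message: report the merge without claiming the original
// Puffin files will be orphaned, since they may still contain live DVs.
LOG.warn(
    "Detected {} DVs for data file {} in table {}; merging them into a single DV before committing",
    entry.getValue().size(),
    entry.getKey(),
    tableName);
```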
||
| entry.getValue().size(), | ||
| entry.getKey(), | ||
| tableName); | ||
| } | ||
| } | ||
|
|
||
| List<DeleteFile> finalDVs = | ||
| DVUtil.mergeAndWriteDvsIfRequired( | ||
| dvsByReferencedFile, | ||
| ThreadPools.getDeleteWorkerPool(), | ||
|
Contributor: I'd put this at the end, since it is an optimization and not an argument that changes the result.
||
| ops().locationProvider(), | ||
| ops().encryption(), | ||
| ops().io(), | ||
| ops().current().specsById()); | ||
| // Prevent committing duplicate V2 deletes by deduping them | ||
amogh-jahagirdar marked this conversation as resolved.
|
||
| Map<Integer, List<DeleteFile>> newDeleteFilesBySpec = | ||
| Streams.stream(Iterables.concat(finalDVs, DeleteFileSet.of(positionAndEqualityDeletes))) | ||
|
Contributor: Why does this use a
||
| .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber())) | ||
|
Contributor: This doesn't seem correct. The delete files are already wrapped when passed into this class. At a minimum,

Contributor (author): mergedDVs would be constructed with the right data sequence number, but this was done because we validate that every DeleteFile that we want to commit is an instance of
But you're right, we don't need this for positionAndEqualityDeletes.
||
| .collect(Collectors.groupingBy(ContentFile::specId)); | ||
| newDeleteFilesBySpec.forEach( | ||
| (specId, deleteFiles) -> { | ||
| PartitionSpec spec = ops().current().spec(specId); | ||
| deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file)); | ||
amogh-jahagirdar marked this conversation as resolved.
Contributor: If the cache is invalidated, this will double-count all delete files, which I think we have to fix. That's a bit ugly. The cleanest way I can think of is to keep a summary builder for data files and then combine the current set of deletes with data files here with a new builder, or possibly just clear the current builder and add everything.
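One possible shape for that fix, rebuilding a dedicated delete-file summary whenever the cached delete manifests are regenerated (the separate builder and how it is combined with the data-file summary are assumptions):

```java
// Hypothetical sketch: recompute the delete-file summary from scratch each time the
// cached delete manifests are (re)generated, so nothing is double-counted.
SnapshotSummary.Builder deleteFilesSummary = SnapshotSummary.builder();
newDeleteFilesBySpec.forEach(
    (specId, deleteFiles) -> {
      PartitionSpec spec = ops().current().spec(specId);
      deleteFiles.forEach(file -> deleteFilesSummary.addedFile(spec, file));
      cachedNewDeleteManifests.addAll(writeDeleteManifests(deleteFiles, spec));
    });
// deleteFilesSummary would then be merged with the data-file summary when the
// snapshot summary is built (the merge mechanism is an assumption).
```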
||
| List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec); | ||
| cachedNewDeleteManifests.addAll(newDeleteManifests); | ||
| }); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,4 +43,21 @@ public interface DVFileWriter extends Closeable { | |
| * @return the writer result | ||
| */ | ||
| DeleteWriteResult result(); | ||
|
|
||
| /** | ||
| * Marks every position that is deleted in positionDeleteIndex as deleted in the given data file. | ||
| * Implementations should merge with existing position indices for the provided path | ||
| * | ||
| * @param path the data file path | ||
| * @param positionDeleteIndex the position delete index containing all the positions to delete | ||
| * @param spec the data file partition spec | ||
| * @param partition the data file partition | ||
| */ | ||
| default void delete( | ||
| String path, | ||
| PositionDeleteIndex positionDeleteIndex, | ||
| PartitionSpec spec, | ||
| StructLike partition) { | ||
| throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported"); | ||
|
Contributor: Minor: This could be implemented by deleting specific positions from the delete index.

Contributor: Error messages generally shouldn't use identifiers from code. Would this be more helpful to users if you used "Bulk deletes are not supported" or similar?
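As a rough illustration of the first suggestion, the default could delegate to the existing single-position delete method instead of throwing (sketch; assumes PositionDeleteIndex exposes forEach over its positions):

```java
// Hypothetical default: apply each position in the index via the existing
// delete(String, long, PartitionSpec, StructLike) method.
default void delete(
    String path,
    PositionDeleteIndex positionDeleteIndex,
    PartitionSpec spec,
    StructLike partition) {
  positionDeleteIndex.forEach(position -> delete(path, position, spec, partition));
}
```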
||
| } | ||
| } | ||
Should DVUtil be in deletes? That would require making some of this public.