Core: Detect duplicate DVs for a data file and merge them before committing #15006
amogh-jahagirdar wants to merge 28 commits into apache:main from
Conversation
Still cleaning some things up, so leaving this in draft, but feel free to comment. Basically there are cases in Spark where a data file can be split across multiple tasks, and if deletes happen to touch each of those splits we'd incorrectly produce multiple DVs for a given data file (discovered this recently with a user who had Spark AQE enabled, but I think file splitting can happen in more cases). We currently throw on read in such cases, but ideally we prevent this on write by detecting and merging pre-commit. The reason this is done behind the API is largely to be defensive from a library perspective: if an engine/integration happens to produce multiple DVs, we can at least fix them up pre-commit. If there are too many to reasonably rewrite on a single node, engines could do distributed writes to fix them up before handing the files to the API, but from a library perspective it seems reasonable to pay this overhead to prevent bad commits across any integration.
core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
Probably keep a boolean in case we detect a duplicate. That way we don't have to pay the price of grouping by referenced file every time to detect possible duplicates; only if we detect one at the time of adding it do we do the dedupe/merge.
We also could just keep a mapping specific to duplicates. That shrinks down how much work we need to do because instead of grouping by every referenced data file in case of duplicates, we just go through the duplicates set. It's maybe a little more memory, but if we consider that we expect duplicates to generally be rare, it feels like a generally better solution.
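For illustration, a rough sketch of that duplicate-specific tracking (the class and method names are hypothetical, not the PR's code): only referenced data files that actually collide are recorded, so the pre-commit pass only has to walk that set.

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

// Hypothetical sketch, not the implementation in this PR
class DuplicateDvTracker {
  // all DVs keyed by the data file they reference
  private final Map<String, List<DeleteFile>> dvsByReferencedFile = Maps.newHashMap();
  // only the referenced files that received more than one DV
  private final Set<String> duplicatedFiles = Sets.newHashSet();

  void add(DeleteFile dv) {
    List<DeleteFile> dvs =
        dvsByReferencedFile.computeIfAbsent(
            dv.referencedDataFile(), ignored -> Lists.newArrayList());
    dvs.add(dv);
    if (dvs.size() > 1) {
      duplicatedFiles.add(dv.referencedDataFile());
    }
  }

  // the pre-commit fix-up only needs to visit these files
  Set<String> duplicatedFiles() {
    return duplicatedFiles;
  }

  boolean hasDuplicates() {
    return !duplicatedFiles.isEmpty();
  }
}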
    DeleteFileSet deleteFiles =
        newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create());
    if (deleteFiles.add(file)) {
      addedFilesSummary.addedFile(spec, file);
Because we may be merging duplicates, we don't update the summary for delete files until after we dedupe and are just about to write the new manifests.
    Pair<List<PositionDelete<?>>, DeleteFile> deletesA =
        deleteFile(tab, dataFileA, new Object[] {"aa"}, new Object[] {"a"});
    Pair<List<PositionDelete<?>>, DeleteFile> deletesB =
        deleteFile(tab, dataFileA, new Object[] {"bb"}, new Object[] {"b"});
This fix surfaced an issue in some of the TestPositionDeletesTable tests where we were setting the wrong data file for a delete file; we'd add a second DV for the same data file, which would then get merged by the new logic and break some of the later assertions.
    // Add Data Files with EQ and POS deletes
    DeleteFile fileADeletes = fileADeletes();
    DeleteFile fileA2Deletes = fileA2Deletes();
    DeleteFile fileBDeletes = fileBDeletes();
This test had to be fixed after the recent changes because the file paths for data files B and B2 were previously set to the same value, so the DVs for both referenced the same file (which probably wasn't the intention of these tests) and counted as duplicates. After this change we merge the DVs in the commit, and the merged DV then gets treated as a dangling delete and fails some of the assertions.
Since these tests are only testing the equality delete case, we can simplify by removing the usage of the fileB deletes; it's a more minimal test that tests the same thing.
Also note, I'd generally take this in a separate PR, but I think there's a good argument that this change should go into a 1.10.2 patch release to prevent invalid table states; in that case we'd need to keep these changes together.
  public static PositionDeleteIndex readDV(
      DeleteFile deleteFile, FileIO fileIO, EncryptionManager encryptionManager) {
    Preconditions.checkArgument(
        ContentFileUtil.isDV(deleteFile), "Delete file must be a deletion vector");
"Cannot read, not a deletion vector: %s", deleteFile.location()?
    Preconditions.checkArgument(
        ContentFileUtil.isDV(deleteFile), "Delete file must be a deletion vector");
    InputFile inputFile =
        EncryptingFileIO.combine(fileIO, encryptionManager).newInputFile(deleteFile);
I think that the caller should combine these so that the FileIO can handle the DeleteFile directly. We don't want utility methods creating a one-time-use EncryptingFileIO. This should just assume that the FileIO has been wrapped appropriately for the table.
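As a minimal sketch of that caller-side wrapping (the helper name is illustrative), assuming the caller has a Table in hand: combine the table's FileIO and EncryptionManager once and pass the wrapped FileIO down, so readDV does not build a one-time-use EncryptingFileIO itself.

import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.io.FileIO;

// Illustrative helper: wrap once at the table level, reuse everywhere
class TableIOWrapping {
  static FileIO wrappedIO(Table table) {
    return EncryptingFileIO.combine(table.io(), table.encryption());
  }
}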
    }
  }

  public static byte[] readBytes(InputFile inputFile, long offset, int length) {
I think this ends up being confusing because RangeReadable#readFully ends up calling the method above. It isn't clear which one to use, except that this one allocates a byte array. That's not related to the difference in behavior, where this one looks for a specific FileIO capability. It would be reasonable to call IOUtil.readFully(inputFile.newStream(), buffer, offset, buffer.length) to reuse a buffer, even though that would miss the RangeReadable optimization.
To improve this:
- It should also be named readFully to capture the semantics: this will not return after a partial read. (This should also be covered by the method's Javadoc.)
- It should accept a destination buffer so that the difference doesn't make people choose one or the other.
- It should also use the local implementation of readFully(InputStream, byte[]) instead of the Guava one.
- It will need a second offset. The readFully implementation above uses offset as the location in the destination buffer, not in the source stream (which is assumed to be in position).
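To make that concrete, a minimal sketch of a readFully with a destination buffer and a separate file offset (class and parameter names are illustrative, not the final API), assuming a RangeReadable fast path and a plain read loop as the fallback:

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;

class ReadFullySketch {
  // reads exactly `length` bytes starting at `posInFile` into `buffer` at `bufferOffset`
  static void readFully(
      InputFile inputFile, long posInFile, byte[] buffer, int bufferOffset, int length) {
    try (SeekableInputStream stream = inputFile.newStream()) {
      if (stream instanceof RangeReadable) {
        // range-read optimization when the stream supports it
        ((RangeReadable) stream).readFully(posInFile, buffer, bufferOffset, length);
      } else {
        // local readFully-style loop instead of the Guava helper
        stream.seek(posInFile);
        int read = 0;
        while (read < length) {
          int n = stream.read(buffer, bufferOffset + read, length - read);
          if (n < 0) {
            throw new IOException("Unexpected end of stream");
          }
          read += n;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}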
      PositionDeleteIndex positionDeleteIndex,
      PartitionSpec spec,
      StructLike partition) {
    throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported");
Minor: This could be implemented by deleting specific positions from the delete index.
        taskId);
  }

  public static Builder builderFor(
Why doesn't this create a builder and then configure it? Why does all of this need to be passed into the builder constructor? Seems like that defeats the purpose of the builder.
If the idea was to be able to create a builder without passing a table, that seems reasonable. But this should use as many builder methods as possible.
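For comparison, the existing entry point already composes well with builder methods; a hedged sketch of the preferred style (the operation id value is just an example):

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFileFactory;

class OutputFileFactoryUsage {
  static OutputFileFactory puffinFactory(Table table, int partitionId, long taskId) {
    // configure through builder methods rather than a long builder constructor
    return OutputFileFactory.builderFor(table, partitionId, taskId)
        .format(FileFormat.PUFFIN)
        .operationId("merge-dvs") // illustrative operation id
        .build();
  }
}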
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.Tasks;

class DVUtil {
Should DVUtil be in deletes? That would require making some of this public.
      PositionDeleteIndex positionDeleteIndex,
      PartitionSpec spec,
      StructLike partition) {
    throw new UnsupportedOperationException("Delete with positionDeleteIndex is not supported");
Error messages generally shouldn't use identifiers from code. Would this be more helpful to users if you used "Bulk deletes are not supported" or similar?
  private Long newDataFilesDataSequenceNumber;
  private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
  private final Set<String> newDVRefs = Sets.newHashSet();
  private final List<DeleteFile> positionAndEqualityDeletes = Lists.newArrayList();
I see below that this doesn't include DVs. But DVs are still position deletes, so we should probably find a better name.
              file.referencedDataFile(), newFile -> Lists.newArrayList());
      dvsForReferencedFile.add(file);
    } else {
      positionAndEqualityDeletes.add(file);
In v3, position deletes are no longer allowed to be added to the table, so should this have a check that the table is either version 2 or the deletes are equality deletes?
I know that we don't want to add the complexity right now to rewrite v2 position deletes as DVs, but it doesn't seem like we should allow them to be added through the public API.
We should do this in a separate commit. I wouldn't want to backport it to 1.10.x, but we should definitely check, because there isn't a check elsewhere that I can see.
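A rough sketch of the guard discussed above, as a hypothetical follow-up (helper name, placement, and message wording are assumptions): reject new non-DV position deletes on v3 tables while still allowing equality deletes.

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ContentFileUtil;

class AddedDeleteFileChecks {
  // hypothetical check run when a delete file is added; formatVersion comes from table metadata
  static void validateNewDeleteFile(int formatVersion, DeleteFile file) {
    Preconditions.checkArgument(
        formatVersion < 3
            || ContentFileUtil.isDV(file)
            || file.content() == FileContent.EQUALITY_DELETES,
        "Cannot add v2 position delete file to a v%s table: %s",
        formatVersion,
        file.location());
  }
}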
  protected boolean addsDeleteFiles() {
-   return !newDeleteFilesBySpec.isEmpty();
+   return !positionAndEqualityDeletes.isEmpty() || !dvsByReferencedFile.isEmpty();
This isn't entirely correct because it assumes that the lists in dvsByReferencedFile are non-empty. It would probably be better to check:

protected boolean addsDeleteFiles() {
  return !positionAndEqualityDeletes.isEmpty()
      || dvsByReferencedFile.values().stream().anyMatch(dvs -> !dvs.isEmpty());
}

    if (entry.getValue().size() > 1) {
      LOG.warn(
          "Attempted to commit {} duplicate DVs for data file {} in table {}. "
              + "Merging duplicates, and original DVs will be orphaned.",
Will be orphaned? I think this is a bit strong of a warning and could lead to incorrect issue reports.
It would be nice to detect whether the puffin file is actually orphaned by checking whether it has other DVs. It's fine to leave a Puffin file in place if it has live DVs from this commit. And it's fine to delete it if we see that all of its DVs will be merged.
We probably don't need to do all that in this commit, though. I'd probably leave this out and just have a warning about merging, without the part about orphan DVs.
   * @param dvsByFile map of data file location to DVs
   * @return a list containing both any newly merged DVs and any DVs that are already valid
   */
  static List<DeleteFile> mergeAndWriteDvsIfRequired(
Looks like this happens quite a bit. We usually don't follow the Java convention of changing acronyms to camel case. For instance, DV util -> DVUtil instead of DvUtil, and file IO -> fileIO. (The exception is ID, which was added early and does usually change to fieldId.)
            ops().current().specsById());
    // Prevent commiting duplicate V2 deletes by deduping them
    Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
        Streams.stream(Iterables.concat(finalDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
Why does this use a DeleteFileSet? Do we think that there are cases where not using one is a behavior change? I'd prefer failing if there are duplicate deletes.
    // Prevent commiting duplicate V2 deletes by deduping them
    Map<Integer, List<DeleteFile>> newDeleteFilesBySpec =
        Streams.stream(Iterables.concat(finalDVs, DeleteFileSet.of(positionAndEqualityDeletes)))
            .map(file -> Delegates.pendingDeleteFile(file, file.dataSequenceNumber()))
This doesn't seem correct. The delete files are already wrapped when passed into this class. At a minimum, positionAndEqualityDeletes don't need to be re-wrapped. And merged DVs should already be constructed with the right dataSequenceNumber because that's an internal rewrite.
mergedDVs would be constructed with the right data sequence number, but this was done because we validate that every DeleteFile we want to commit is an instance of PendingDeleteFile, and fail otherwise. I didn't want to do that in DVUtil because I didn't want to make assumptions about how a caller would use it.
But you're right, we don't need this for positionAndEqualityDeletes.
    newDeleteFilesBySpec.forEach(
        (specId, deleteFiles) -> {
          PartitionSpec spec = ops().current().spec(specId);
          deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
If the cache is invalidated, this will double-count all delete files, which I think we have to fix. That's a bit ugly. The cleanest way I can think of is to keep a summary builder for data files and then combine the current set of deletes with data files here with a new builder, or possibly just clear the current builder and add everything.
    List<DeleteFile> finalDVs =
        DVUtil.mergeAndWriteDvsIfRequired(
            dvsByReferencedFile,
            ThreadPools.getDeleteWorkerPool(),
I'd put this at the end, since it is an optimization and not an argument that changes the result.
    return result;
  }

  private static void validateDVCanBeMerged(
For method names, it's shorter if you avoid past tense. For example: validateCanMerge rather than validateCanBeMerged. I also think since this is DVUtil you can get away with not having DV in each method name.
                    Deletes.readDV(duplicateDvs.get(i), io, encryptionManager));

    // Build a grouping of referenced file to indices of the corresponding duplicate DVs
    Map<String, List<Integer>> dvIndicesByDataFile = Maps.newLinkedHashMap();

        .run(
            i ->
                duplicateDvPositions[i] =
                    Deletes.readDV(duplicateDvs.get(i), io, encryptionManager));
As noted earlier, there should be no need to pass encryptionManager separately.
      for (int i = 1; i < dvIndicesForFile.size(); i++) {
        int dvIndex = dvIndicesForFile.get(i);
        DeleteFile dv = duplicateDvs.get(dvIndex);
        validateDVCanBeMerged(dv, firstDv, partitionComparator);
I think it is a bit late to validate that DVs can be merged because they were all read into memory. I would move this check to the start of the method.
This is a good reason to pass the original map in since it is available to the calling method.
    // Validate and merge per referenced file, caching comparators by spec ID
    Map<Integer, Comparator<StructLike>> comparatorsBySpecId = Maps.newHashMap();
    Map<String, PositionDeleteIndex> result = Maps.newHashMap();
    for (Map.Entry<String, List<Integer>> entry : dvIndicesByDataFile.entrySet()) {
It looks like the reason for the complex logic here is to validate that the DVs can be merged. As I noted below, I think that should be done before reading. If you restructure that way, then all you need is to loop over the PositionDeleteIndex array and duplicateDVs at the same time and either add a new entry for the DeleteFile#referencedDataFile or merge them. I think that would really simplify this part.
I think you're right about this; we can really simplify if we just do the validation in that initial loop where we construct the duplicate mapping. Everything we need to validate is at the metadata level. Then we can work off the assumption later on that everything can be merged, and just loop through the indices and merge.
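A sketch of that simplified shape, assuming the metadata-level validation already happened while building the duplicate mapping, and assuming PositionDeleteIndex exposes position iteration via forEach and accumulation via delete (if it doesn't, the fold would go through whatever merge primitive the index offers):

import java.util.List;
import java.util.Map;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class MergeLoopSketch {
  // walk the duplicate DVs and their in-memory indexes in lockstep,
  // folding every duplicate into a single index per referenced data file
  static Map<String, PositionDeleteIndex> merge(
      List<DeleteFile> duplicateDvs, PositionDeleteIndex[] positions) {
    Map<String, PositionDeleteIndex> mergedByFile = Maps.newLinkedHashMap();
    for (int i = 0; i < duplicateDvs.size(); i++) {
      String referencedFile = duplicateDvs.get(i).referencedDataFile();
      PositionDeleteIndex existing = mergedByFile.get(referencedFile);
      if (existing == null) {
        mergedByFile.put(referencedFile, positions[i]);
      } else {
        // fold the duplicate's deleted positions into the index kept for this file
        positions[i].forEach(existing::delete);
      }
    }
    return mergedByFile;
  }
}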
  // Produces a single Puffin file containing the merged DVs
  private static List<DeleteFile> writeMergedDVs(
      Map<String, PositionDeleteIndex> mergedIndices,

      String referencedLocation = entry.getKey();
      PositionDeleteIndex mergedPositions = entry.getValue();
      List<DeleteFile> duplicateDvs = dvsByFile.get(referencedLocation);
      DeleteFile firstDV = duplicateDvs.get(0);
I'd combine with the previous line. You don't need the duplicates after this, just the first.
        new BaseDVFileWriter(
            // Use an unpartitioned spec for the location provider for the puffin containing
            // all the merged DVs
            OutputFileFactory.builderFor(
This OutputFileFactory has a fake partition spec and IDs. Maybe we don't need to use it, then. All this really needs to do is generate a file name, add the Puffin extension, and call the location provider. I'd rather just do those things directly than fake an OutputFileFactory, which is intended for tasks. This could also use the snapshot ID.
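A minimal sketch of doing those steps directly against the table's LocationProvider, assuming the snapshot id is available; the file-name pattern here is just an example:

import java.util.UUID;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.io.LocationProvider;

class MergedDvLocations {
  // generate a file name, add the Puffin extension, and ask the location provider
  static String newMergedDvLocation(LocationProvider locations, long snapshotId) {
    String fileName =
        FileFormat.PUFFIN.addExtension(
            String.format("merged-dvs-%d-%s", snapshotId, UUID.randomUUID()));
    return locations.newDataLocation(fileName);
  }
}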
While writers are generally expected to merge DVs for a given data file before attempting to commit, we probably want a safeguard in the commit path in case this assumption is violated. This has been observed when AQE is enabled in Spark and a data file is split across multiple tasks (it really just depends on how files and deletes are split); multiple DVs are then produced for a given data file and committed. Currently, after that commit, reads fail because the DeleteFileIndex detects the duplicates.
Arguably, there should be a safeguard on the commit path that detects duplicates and fixes them up to prevent invalid table states. Doing this behind the API covers any engine integration using the library.
This change updates MergingSnapshotProducer to track duplicate DVs for a data file, merge them, and produce a Puffin file per merged DV. Note that since we generally expect duplicates to be rare, we don't expect many small Puffin files to be produced, so we don't add additional logic to coalesce them into larger files; these can also be compacted later. In the case of large-scale duplicates, engines should arguably fix those up before handing off to the commit path.