From 9b08e1fe45294668f60f2a9a1df199e6b9a02fe0 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Thu, 18 Dec 2025 15:38:51 -0500 Subject: [PATCH 1/4] HDDS-14211. Rename RDBBatchOperation as AtomicRDBBatchOperation and introduce base abstract class RDBBatchOperation Change-Id: Ia81bd22e402fb8e96a613760bce9fd97ac51d849 --- .../container/metadata/DatanodeTable.java | 5 +- .../metadata/SchemaOneDeletedBlocksTable.java | 2 +- .../scm/metadata/DBTransactionBuffer.java | 5 +- .../utils/db/AtomicRDBBatchOperation.java | 683 ++++++++++++++++++ .../hadoop/hdds/utils/db/BatchOperation.java | 6 +- .../hdds/utils/db/BatchOperationHandler.java | 4 +- .../hdds/utils/db/RDBBatchOperation.java | 671 +---------------- .../apache/hadoop/hdds/utils/db/RDBStore.java | 18 +- .../apache/hadoop/hdds/utils/db/RDBTable.java | 8 +- .../apache/hadoop/hdds/utils/db/Table.java | 5 +- .../hadoop/hdds/utils/db/TypedTable.java | 5 +- .../hadoop/hdds/utils/db/TestCodec.java | 2 +- .../hdds/utils/db/TestRDBBatchOperation.java | 5 +- .../hdds/utils/db/TestRDBTableStore.java | 8 +- .../hdds/utils/db/TestTypedRDBTableStore.java | 4 +- .../hdds/scm/ha/SCMHADBTransactionBuffer.java | 3 +- .../scm/ha/SCMHADBTransactionBufferImpl.java | 8 +- .../scm/ha/SCMHADBTransactionBufferStub.java | 10 +- .../hadoop/ozone/recon/TestReconTasks.java | 6 +- .../OMSnapshotMoveDeletedKeysResponse.java | 5 +- .../OMSnapshotMoveTableKeysResponse.java | 5 +- .../ozone/om/TestOmSnapshotManager.java | 8 +- .../bucket/TestOMBucketCreateResponse.java | 3 +- .../bucket/TestOMBucketDeleteResponse.java | 3 +- .../TestOMBucketSetPropertyResponse.java | 3 +- .../file/TestOMDirectoryCreateResponse.java | 3 +- .../om/response/key/TestOMKeyResponse.java | 2 +- .../s3/multipart/TestS3MultipartResponse.java | 2 +- .../TestOMDelegationTokenResponse.java | 2 +- .../TestOMSnapshotCreateResponse.java | 2 +- .../TestOMSnapshotDeleteResponse.java | 3 +- .../response/volume/TestOMVolumeResponse.java | 3 +- .../TestSnapshotRequestAndResponse.java | 2 +- .../impl/OzoneManagerServiceProviderImpl.java | 5 +- .../recon/tasks/ContainerKeyMapperHelper.java | 2 +- .../recon/tasks/FileSizeCountTaskHelper.java | 2 +- .../tasks/NSSummaryTaskDbEventHandler.java | 2 +- .../ozone/recon/tasks/OmTableInsightTask.java | 2 +- .../recon/api/TestContainerEndpoint.java | 4 +- ...TestReconContainerMetadataManagerImpl.java | 18 +- .../TestReconNamespaceSummaryManagerImpl.java | 2 +- .../tasks/AbstractNSSummaryTaskTest.java | 2 +- 42 files changed, 816 insertions(+), 727 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index 5b39147f3e29..4d78d1838d49 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -69,12 +69,13 @@ public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, } @Override - public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException, RocksDatabaseException { table.deleteWithBatch(batch, key); } @Override - public void deleteRangeWithBatch(BatchOperation batch, 
KEY beginKey, KEY endKey) throws CodecException { + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) + throws CodecException, RocksDatabaseException { table.deleteRangeWithBatch(batch, beginKey, endKey); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java index 913d6e30d1a9..486d45adfa2e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java @@ -69,7 +69,7 @@ public void delete(String key) throws RocksDatabaseException, CodecException { } @Override - public void deleteWithBatch(BatchOperation batch, String key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, String key) throws CodecException, RocksDatabaseException { super.deleteWithBatch(batch, prefix(key)); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java index 58dacfab6210..b460c8324789 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.metadata; +import java.io.Closeable; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; @@ -24,13 +25,11 @@ /** * DB transaction that abstracts the updates to the underlying datastore. */ -public interface DBTransactionBuffer { +public interface DBTransactionBuffer extends Closeable { void addToBuffer(Table table, KEY key, VALUE value) throws RocksDatabaseException, CodecException; void removeFromBuffer(Table table, KEY key) throws RocksDatabaseException, CodecException; - - void close() throws RocksDatabaseException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java new file mode 100644 index 000000000000..c0027730d914 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java @@ -0,0 +1,683 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedBytes; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; +import org.apache.ratis.util.TraditionalBinaryPrefix; +import org.apache.ratis.util.UncheckedAutoCloseable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Batch operation implementation for rocks db. + * Note that a {@link AtomicRDBBatchOperation} object only for one batch. + * Also, this class is not threadsafe. + */ +public class AtomicRDBBatchOperation extends RDBBatchOperation { + static final Logger LOG = LoggerFactory.getLogger(AtomicRDBBatchOperation.class); + + private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + + private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); + + private final ManagedWriteBatch writeBatch; + + private final OpCache opCache = new OpCache(); + + private enum Op { DELETE, PUT, DELETE_RANGE } + + private static void debug(Supplier message) { + if (LOG.isTraceEnabled()) { + LOG.trace("\n{}", message.get()); + } + } + + private static String byteSize2String(long length) { + return TraditionalBinaryPrefix.long2String(length, "B", 2); + } + + private static String countSize2String(int count, long size) { + return count + " (" + byteSize2String(size) + ")"; + } + + /** + * The key type of {@link AtomicRDBBatchOperation.OpCache.FamilyCache#opsKeys}. + * To implement {@link #equals(Object)} and {@link #hashCode()} + * based on the contents of the bytes. + */ + static final class Bytes implements Comparable { + private final byte[] array; + private final CodecBuffer buffer; + /** Cache the hash value. */ + private final int hash; + + Bytes(CodecBuffer buffer) { + this.array = null; + this.buffer = Objects.requireNonNull(buffer, "buffer == null"); + this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + } + + Bytes(byte[] array) { + this.array = array; + this.buffer = null; + this.hash = ByteBuffer.wrap(array).hashCode(); + } + + ByteBuffer asReadOnlyByteBuffer() { + return buffer.asReadOnlyByteBuffer(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (!(obj instanceof Bytes)) { + return false; + } + final Bytes that = (Bytes) obj; + if (this.hash != that.hash) { + return false; + } + final ByteBuffer thisBuf = this.array != null ? + ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); + final ByteBuffer thatBuf = that.array != null ? + ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); + return thisBuf.equals(thatBuf); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + return array != null ? 
bytes2String(array) + : bytes2String(asReadOnlyByteBuffer()); + } + + // This method mimics the ByteWiseComparator in RocksDB. + @Override + public int compareTo(AtomicRDBBatchOperation.Bytes that) { + final ByteBuffer thisBuf = this.array != null ? + ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); + final ByteBuffer thatBuf = that.array != null ? + ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); + + for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) { + int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i)); + if (cmp != 0) { + return cmp; + } + } + return thisBuf.remaining() - thatBuf.remaining(); + } + } + + private abstract class Operation implements Closeable { + private Bytes keyBytes; + + private Operation(Bytes keyBytes) { + this.keyBytes = keyBytes; + } + + abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; + + abstract int keyLen(); + + abstract int valLen(); + + Bytes getKey() { + return keyBytes; + } + + int totalLength() { + return keyLen() + valLen(); + } + + abstract Op getOpType(); + + @Override + public void close() { + } + } + + /** + * Delete operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteOperation extends Operation { + private final byte[] key; + + private DeleteOperation(byte[] key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); + this.key = Objects.requireNonNull(key, "key == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDelete(batch, this.key); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public Op getOpType() { + return Op.DELETE; + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. + */ + private final class CodecBufferPutOperation extends Operation { + private final CodecBuffer key; + private final CodecBuffer value; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(keyBytes); + this.key = key; + this.value = value; + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); + } + + @Override + public int keyLen() { + return key.readableBytes(); + } + + @Override + public int valLen() { + return value.readableBytes(); + } + + @Override + public Op getOpType() { + return Op.PUT; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + value.release(); + } + super.close(); + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. 
+ */ + private final class ByteArrayPutOperation extends Operation { + private final byte[] key; + private final byte[] value; + + private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes)); + this.key = Objects.requireNonNull(key, "key == null"); + this.value = Objects.requireNonNull(value, "value == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key, value); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return value.length; + } + + @Override + public Op getOpType() { + return Op.PUT; + } + } + + /** + * Delete range operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteRangeOperation extends Operation { + private final byte[] startKey; + private final byte[] endKey; + + private DeleteRangeOperation(byte[] startKey, byte[] endKey) { + super(null); + this.startKey = Objects.requireNonNull(startKey, "startKey == null"); + this.endKey = Objects.requireNonNull(endKey, "endKey == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDeleteRange(batch, startKey, endKey); + } + + @Override + public int keyLen() { + return startKey.length + endKey.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public Op getOpType() { + return Op.DELETE_RANGE; + } + } + + /** Cache and deduplicate db ops (put/delete). */ + private class OpCache { + /** A (family name -> {@link FamilyCache}) map. */ + private final Map name2cache = new HashMap<>(); + + /** A cache for a {@link ColumnFamily}. */ + private class FamilyCache { + private final ColumnFamily family; + /** + * A mapping of operation keys to their respective indices in {@code FamilyCache}. + * + * Key details: + * - Maintains a mapping of unique operation keys to their insertion or processing order. + * - Used internally to manage and sort operations during batch writes. + * - Facilitates filtering, overwriting, or deletion of operations based on their keys. + * + * Constraints: + * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. + * - Each key is associated with a unique integer index to track insertion order. + * + * This field plays a critical role in managing the logical consistency and proper execution + * order of operations stored in the batch when interacting with a RocksDB-backed system. + */ + private final Map opsKeys = new HashMap<>(); + /** + * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. + * + * This map serves as the primary container for recording operations in preparation for a batch write + * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines + * its insertion order and ensures correct sequencing during batch execution. + * + * Key characteristics: + * - Stores operations of type {@code Operation}. + * - Uses a unique integer key (index) for mapping each operation. + * - Serves as an intermediary structure during batch preparation and execution. + * + * Usage context: + * - This map is managed as part of the batch-writing process, which involves organizing, + * filtering, and applying multiple operations in a single cohesive batch. 
+ * - Operations stored in this map are expected to define specific actions (e.g., put, delete, + * delete range) and their associated data (e.g., keys, values). + */ + private final Map batchOps = new HashMap<>(); + private boolean isCommit; + + private long batchSize; + private long discardedSize; + private int discardedCount; + private int putCount; + private int delCount; + private int delRangeCount; + private AtomicInteger opIndex; + + FamilyCache(ColumnFamily family) { + this.family = family; + this.opIndex = new AtomicInteger(0); + } + + /** + * Prepares a batch write operation for a RocksDB-backed system. + * + * This method ensures the orderly execution of operations accumulated in the batch, + * respecting their respective types and order of insertion. + * + * Key functionalities: + * 1. Ensures that the batch is not already committed before proceeding. + * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order. + * 3. Filters and adapts operations to account for any delete range operations that might + * affect other operations in the batch: + * - Operations with keys that fall within the range specified by a delete range operation + * are discarded. + * - Delete range operations are executed in their correct order. + * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. + * 5. Logs a summary of the batch execution for debugging purposes. + * + * Throws: + * - RocksDatabaseException if any error occurs while applying operations to the write batch. + * + * Prerequisites: + * - The method assumes that the operations are represented by `Operation` objects, each of which + * encapsulates the logic for its specific type. + * - Delete range operations must be represented by the `DeleteRangeOperation` class. + */ + void prepareBatchWrite() throws RocksDatabaseException { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + isCommit = true; + // Sort Entries based on opIndex and flush the operation to the batch in the same order. + List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) + .map(Map.Entry::getValue).collect(Collectors.toList()); + List> deleteRangeIndices = new ArrayList<>(); + int index = 0; + int prevIndex = -2; + for (Operation op : ops) { + if (Op.DELETE_RANGE == op.getOpType()) { + if (index - prevIndex > 1) { + deleteRangeIndices.add(new ArrayList<>()); + } + List continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1); + continuousIndices.add(index); + prevIndex = index; + } + index++; + } + // This is to apply the last batch of entries after the last DeleteRangeOperation. + deleteRangeIndices.add(Collections.emptyList()); + int startIndex = 0; + for (List continuousDeleteRangeIndices : deleteRangeIndices) { + List deleteRangeOps = continuousDeleteRangeIndices.stream() + .map(i -> (DeleteRangeOperation)ops.get(i)) + .collect(Collectors.toList()); + List> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream() + .map(i -> (DeleteRangeOperation)ops.get(i)) + .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey))) + .collect(Collectors.toList()); + int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0); + + for (int i = startIndex; i < firstOpIndex; i++) { + Operation op = ops.get(i); + Bytes key = op.getKey(); + // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key + // doesn't fall [startKey, endKey) range. 
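+ // Illustrative example with hypothetical keys: given a pending deleteRange("b", "d"),
+ // an earlier put("b", ...) or delete("c") is discarded because "b" and "c" fall in
+ // ["b", "d"), while an earlier put("d", ...) is kept since the end key is exclusive.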
+ boolean keyInRange = false; + Pair deleteRange = null; + for (Pair deleteRangeOp : deleteRangeOpsRanges) { + if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) { + keyInRange = true; + deleteRange = deleteRangeOp; + break; + } + } + if (!keyInRange) { + op.apply(family, writeBatch); + } else { + Pair finalDeleteRange = deleteRange; + debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + bytes2String(key.asReadOnlyByteBuffer()), + bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()), + bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer()))); + discardedCount++; + discardedSize += op.totalLength(); + } + } + for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { + // Apply the delete range operation to the batch. + deleteRangeOp.apply(family, writeBatch); + } + // Update the startIndex to start from the next operation after the delete range operation. + startIndex = firstOpIndex + continuousDeleteRangeIndices.size(); + } + debug(this::summary); + } + + private String summary() { + return String.format(" %s %s, #put=%s, #del=%s", this, + batchSizeDiscardedString(), putCount, delCount); + } + + void clear() { + final boolean warn = !isCommit && batchSize > 0; + String details = warn ? summary() : null; + + IOUtils.close(LOG, batchOps.values()); + batchOps.clear(); + + if (warn) { + LOG.warn("discarding changes {}", details); + } + } + + private void deleteIfExist(Bytes key, boolean removeFromIndexMap) { + // remove previous first in order to call release() + if (opsKeys.containsKey(key)) { + int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key); + final Operation previous = batchOps.remove(previousIndex); + previous.close(); + discardedSize += previous.totalLength(); + discardedCount++; + debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), + previous.valLen())); + } + } + + void overWriteOpIfExist(Bytes key, Operation operation) { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + deleteIfExist(key, true); + batchSize += operation.totalLength(); + int newIndex = opIndex.getAndIncrement(); + final Integer overwritten = opsKeys.put(key, newIndex); + batchOps.put(newIndex, operation); + Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten)); + debug(() -> String.format("%s %s, %s; key=%s", this, + Op.DELETE == operation.getOpType() ? 
delString(operation.totalLength()) : putString(operation.keyLen(), + operation.valLen()), + batchSizeDiscardedString(), key)); + } + + void put(CodecBuffer key, CodecBuffer value) { + putCount++; + + // always release the key with the value + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); + } + + void put(byte[] key, byte[] value) { + putCount++; + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); + } + + void delete(byte[] key) { + delCount++; + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); + } + + void deleteRange(byte[] startKey, byte[] endKey) { + delRangeCount++; + batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); + } + + String putString(int keySize, int valueSize) { + return String.format("put(key: %s, value: %s), #put=%s", + byteSize2String(keySize), byteSize2String(valueSize), putCount); + } + + String delString(int keySize) { + return String.format("del(key: %s), #del=%s", + byteSize2String(keySize), delCount); + } + + String batchSizeDiscardedString() { + return String.format("batchSize=%s, discarded: %s", + byteSize2String(batchSize), + countSize2String(discardedCount, discardedSize)); + } + + @Override + public String toString() { + return name + ": " + family.getName(); + } + } + + void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) { + name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) + .put(key, value); + } + + void put(ColumnFamily f, byte[] key, byte[] value) { + name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) + .put(key, value); + } + + void delete(ColumnFamily family, byte[] key) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .delete(key); + } + + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .deleteRange(startKey, endKey); + } + + /** Prepare batch write for the entire cache. 
*/ + UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { + for (Map.Entry e : name2cache.entrySet()) { + e.getValue().prepareBatchWrite(); + } + return this::clear; + } + + void clear() { + for (Map.Entry e : name2cache.entrySet()) { + e.getValue().clear(); + } + name2cache.clear(); + } + + String getCommitString() { + int putCount = 0; + int delCount = 0; + int opSize = 0; + int discardedCount = 0; + int discardedSize = 0; + int delRangeCount = 0; + + for (FamilyCache f : name2cache.values()) { + putCount += f.putCount; + delCount += f.delCount; + opSize += f.batchSize; + discardedCount += f.discardedCount; + discardedSize += f.discardedSize; + delRangeCount += f.delRangeCount; + } + + final int opCount = putCount + delCount; + return String.format( + "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", + putCount, delCount, delRangeCount, + countSize2String(opCount, opSize), + countSize2String(discardedCount, discardedSize), + countSize2String(opCount - discardedCount, opSize - discardedSize)); + } + } + + public AtomicRDBBatchOperation() { + this(new ManagedWriteBatch()); + } + + public AtomicRDBBatchOperation(ManagedWriteBatch writeBatch) { + this.writeBatch = writeBatch; + } + + @Override + public String toString() { + return name; + } + + @Override + void commit(RocksDatabase db) throws RocksDatabaseException { + debug(() -> String.format("%s: commit %s", + name, opCache.getCommitString())); + try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { + db.batchWrite(writeBatch); + } + } + + @Override + void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException { + debug(() -> String.format("%s: commit-with-writeOptions %s", + name, opCache.getCommitString())); + try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { + db.batchWrite(writeBatch, writeOptions); + } + } + + @Override + public void close() { + debug(() -> String.format("%s: close", name)); + writeBatch.close(); + opCache.clear(); + } + + void delete(ColumnFamily family, byte[] key) { + opCache.delete(family, key); + } + + void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { + opCache.put(family, key, value); + } + + void put(ColumnFamily family, byte[] key, byte[] value) { + opCache.put(family, key, value); + } + + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + opCache.deleteRange(family, startKey, endKey); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java index 7233422eeff8..26c5a2a761ba 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java @@ -17,11 +17,11 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.Closeable; + /** * Class represents a batch operation, collects multiple db operation. 
*/ -public interface BatchOperation extends AutoCloseable { +public interface BatchOperation extends Closeable { - @Override - void close(); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java index 179247f89373..4fc93bf9ed63 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.IOException; + /** * Create and commit batch operation for one DB. */ @@ -36,5 +38,5 @@ public interface BatchOperationHandler { * * @param operation which contains all the required batch operation. */ - void commitBatchOperation(BatchOperation operation) throws RocksDatabaseException; + void commitBatchOperation(BatchOperation operation) throws IOException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index f7b025ed98f8..805d5caee593 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,665 +17,46 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.UnsignedBytes; -import java.io.Closeable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.utils.IOUtils; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; -import org.apache.ratis.util.TraditionalBinaryPrefix; -import org.apache.ratis.util.UncheckedAutoCloseable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Batch operation implementation for rocks db. - * Note that a {@link RDBBatchOperation} object only for one batch. - * Also, this class is not threadsafe. + * Interface for performing batch operations on a RocksDB database. + * + * Provides methods to perform operations on a specific column family within + * a database, such as inserting or deleting a key-value pair or deleting + * a range of keys. These batch operations are designed for use in scenarios + * where multiple database modifications need to be grouped together, ensuring + * efficiency and atomicity. + * + * This interface extends {@link BatchOperation}, inheriting functionality for + * managing batch sizes and allowing cleanup of batch resources via the + * {@link #close()} method. 
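+ *
+ * Illustrative usage sketch, assuming {@code store} is an open {@link RDBStore} and
+ * {@code table} one of its tables:
+ * <pre>{@code
+ * try (RDBBatchOperation batch = store.initBatchOperation()) {
+ *   table.putWithBatch(batch, key1, value1);
+ *   table.deleteWithBatch(batch, key2);
+ *   store.commitBatchOperation(batch);
+ * }
+ * }</pre>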
*/ -public class RDBBatchOperation implements BatchOperation { - static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); - - private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); - - private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); - - private final ManagedWriteBatch writeBatch; - - private final OpCache opCache = new OpCache(); - - private enum Op { DELETE, PUT, DELETE_RANGE } - - private static void debug(Supplier message) { - if (LOG.isTraceEnabled()) { - LOG.trace("\n{}", message.get()); - } - } - - private static String byteSize2String(long length) { - return TraditionalBinaryPrefix.long2String(length, "B", 2); - } - - private static String countSize2String(int count, long size) { - return count + " (" + byteSize2String(size) + ")"; - } - - /** - * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}. - * To implement {@link #equals(Object)} and {@link #hashCode()} - * based on the contents of the bytes. - */ - static final class Bytes implements Comparable { - private final byte[] array; - private final CodecBuffer buffer; - /** Cache the hash value. */ - private final int hash; - - Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); - } - - Bytes(byte[] array) { - this.array = array; - this.buffer = null; - this.hash = ByteBuffer.wrap(array).hashCode(); - } - - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (!(obj instanceof Bytes)) { - return false; - } - final Bytes that = (Bytes) obj; - if (this.hash != that.hash) { - return false; - } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); - } - - @Override - public int hashCode() { - return hash; - } - - @Override - public String toString() { - return array != null ? bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); - } - - // This method mimics the ByteWiseComparator in RocksDB. - @Override - public int compareTo(RDBBatchOperation.Bytes that) { - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - - for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) { - int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i)); - if (cmp != 0) { - return cmp; - } - } - return thisBuf.remaining() - thatBuf.remaining(); - } - } - - private abstract class Operation implements Closeable { - private Bytes keyBytes; - - private Operation(Bytes keyBytes) { - this.keyBytes = keyBytes; - } - - abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; - - abstract int keyLen(); - - abstract int valLen(); - - Bytes getKey() { - return keyBytes; - } - - int totalLength() { - return keyLen() + valLen(); - } - - abstract Op getOpType(); - - @Override - public void close() { - } - } - - /** - * Delete operation to be applied to a {@link ColumnFamily} batch. 
- */ - private final class DeleteOperation extends Operation { - private final byte[] key; - - private DeleteOperation(byte[] key, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes, "keyBytes == null")); - this.key = Objects.requireNonNull(key, "key == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDelete(batch, this.key); - } - - @Override - public int keyLen() { - return key.length; - } - - @Override - public int valLen() { - return 0; - } - - @Override - public Op getOpType() { - return Op.DELETE; - } - } - - /** - * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. - */ - private final class CodecBufferPutOperation extends Operation { - private final CodecBuffer key; - private final CodecBuffer value; - private final AtomicBoolean closed = new AtomicBoolean(false); - - private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { - super(keyBytes); - this.key = key; - this.value = value; - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); - } - - @Override - public int keyLen() { - return key.readableBytes(); - } - - @Override - public int valLen() { - return value.readableBytes(); - } - - @Override - public Op getOpType() { - return Op.PUT; - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - key.release(); - value.release(); - } - super.close(); - } - } - - /** - * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. - */ - private final class ByteArrayPutOperation extends Operation { - private final byte[] key; - private final byte[] value; - - private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes)); - this.key = Objects.requireNonNull(key, "key == null"); - this.value = Objects.requireNonNull(value, "value == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key, value); - } +public abstract class RDBBatchOperation implements BatchOperation { - @Override - public int keyLen() { - return key.length; - } + abstract void delete(ColumnFamily family, byte[] key) throws RocksDatabaseException; - @Override - public int valLen() { - return value.length; - } + abstract void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) throws RocksDatabaseException; - @Override - public Op getOpType() { - return Op.PUT; - } - } - - /** - * Delete range operation to be applied to a {@link ColumnFamily} batch. 
- */ - private final class DeleteRangeOperation extends Operation { - private final byte[] startKey; - private final byte[] endKey; - - private DeleteRangeOperation(byte[] startKey, byte[] endKey) { - super(null); - this.startKey = Objects.requireNonNull(startKey, "startKey == null"); - this.endKey = Objects.requireNonNull(endKey, "endKey == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDeleteRange(batch, startKey, endKey); - } - - @Override - public int keyLen() { - return startKey.length + endKey.length; - } - - @Override - public int valLen() { - return 0; - } - - @Override - public Op getOpType() { - return Op.DELETE_RANGE; - } - } - - /** Cache and deduplicate db ops (put/delete). */ - private class OpCache { - /** A (family name -> {@link FamilyCache}) map. */ - private final Map name2cache = new HashMap<>(); - - /** A cache for a {@link ColumnFamily}. */ - private class FamilyCache { - private final ColumnFamily family; - /** - * A mapping of operation keys to their respective indices in {@code FamilyCache}. - * - * Key details: - * - Maintains a mapping of unique operation keys to their insertion or processing order. - * - Used internally to manage and sort operations during batch writes. - * - Facilitates filtering, overwriting, or deletion of operations based on their keys. - * - * Constraints: - * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. - * - Each key is associated with a unique integer index to track insertion order. - * - * This field plays a critical role in managing the logical consistency and proper execution - * order of operations stored in the batch when interacting with a RocksDB-backed system. - */ - private final Map opsKeys = new HashMap<>(); - /** - * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. - * - * This map serves as the primary container for recording operations in preparation for a batch write - * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines - * its insertion order and ensures correct sequencing during batch execution. - * - * Key characteristics: - * - Stores operations of type {@code Operation}. - * - Uses a unique integer key (index) for mapping each operation. - * - Serves as an intermediary structure during batch preparation and execution. - * - * Usage context: - * - This map is managed as part of the batch-writing process, which involves organizing, - * filtering, and applying multiple operations in a single cohesive batch. - * - Operations stored in this map are expected to define specific actions (e.g., put, delete, - * delete range) and their associated data (e.g., keys, values). - */ - private final Map batchOps = new HashMap<>(); - private boolean isCommit; - - private long batchSize; - private long discardedSize; - private int discardedCount; - private int putCount; - private int delCount; - private int delRangeCount; - private AtomicInteger opIndex; - - FamilyCache(ColumnFamily family) { - this.family = family; - this.opIndex = new AtomicInteger(0); - } - - /** - * Prepares a batch write operation for a RocksDB-backed system. - * - * This method ensures the orderly execution of operations accumulated in the batch, - * respecting their respective types and order of insertion. - * - * Key functionalities: - * 1. Ensures that the batch is not already committed before proceeding. - * 2. 
Sorts all operations by their `opIndex` to maintain a consistent execution order. - * 3. Filters and adapts operations to account for any delete range operations that might - * affect other operations in the batch: - * - Operations with keys that fall within the range specified by a delete range operation - * are discarded. - * - Delete range operations are executed in their correct order. - * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. - * 5. Logs a summary of the batch execution for debugging purposes. - * - * Throws: - * - RocksDatabaseException if any error occurs while applying operations to the write batch. - * - * Prerequisites: - * - The method assumes that the operations are represented by `Operation` objects, each of which - * encapsulates the logic for its specific type. - * - Delete range operations must be represented by the `DeleteRangeOperation` class. - */ - void prepareBatchWrite() throws RocksDatabaseException { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - isCommit = true; - // Sort Entries based on opIndex and flush the operation to the batch in the same order. - List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) - .map(Map.Entry::getValue).collect(Collectors.toList()); - List> deleteRangeIndices = new ArrayList<>(); - int index = 0; - int prevIndex = -2; - for (Operation op : ops) { - if (Op.DELETE_RANGE == op.getOpType()) { - if (index - prevIndex > 1) { - deleteRangeIndices.add(new ArrayList<>()); - } - List continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1); - continuousIndices.add(index); - prevIndex = index; - } - index++; - } - // This is to apply the last batch of entries after the last DeleteRangeOperation. - deleteRangeIndices.add(Collections.emptyList()); - int startIndex = 0; - for (List continuousDeleteRangeIndices : deleteRangeIndices) { - List deleteRangeOps = continuousDeleteRangeIndices.stream() - .map(i -> (DeleteRangeOperation)ops.get(i)) - .collect(Collectors.toList()); - List> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream() - .map(i -> (DeleteRangeOperation)ops.get(i)) - .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey))) - .collect(Collectors.toList()); - int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0); - - for (int i = startIndex; i < firstOpIndex; i++) { - Operation op = ops.get(i); - Bytes key = op.getKey(); - // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key - // doesn't fall [startKey, endKey) range. - boolean keyInRange = false; - Pair deleteRange = null; - for (Pair deleteRangeOp : deleteRangeOpsRanges) { - if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) { - keyInRange = true; - deleteRange = deleteRangeOp; - break; - } - } - if (!keyInRange) { - op.apply(family, writeBatch); - } else { - Pair finalDeleteRange = deleteRange; - debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", - bytes2String(key.asReadOnlyByteBuffer()), - bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()), - bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer()))); - discardedCount++; - discardedSize += op.totalLength(); - } - } - for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { - // Apply the delete range operation to the batch. 
- deleteRangeOp.apply(family, writeBatch); - } - // Update the startIndex to start from the next operation after the delete range operation. - startIndex = firstOpIndex + continuousDeleteRangeIndices.size(); - } - debug(this::summary); - } - - private String summary() { - return String.format(" %s %s, #put=%s, #del=%s", this, - batchSizeDiscardedString(), putCount, delCount); - } - - void clear() { - final boolean warn = !isCommit && batchSize > 0; - String details = warn ? summary() : null; - - IOUtils.close(LOG, batchOps.values()); - batchOps.clear(); - - if (warn) { - LOG.warn("discarding changes {}", details); - } - } - - private void deleteIfExist(Bytes key, boolean removeFromIndexMap) { - // remove previous first in order to call release() - if (opsKeys.containsKey(key)) { - int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key); - final Operation previous = batchOps.remove(previousIndex); - previous.close(); - discardedSize += previous.totalLength(); - discardedCount++; - debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), - previous.valLen())); - } - } - - void overWriteOpIfExist(Bytes key, Operation operation) { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - deleteIfExist(key, true); - batchSize += operation.totalLength(); - int newIndex = opIndex.getAndIncrement(); - final Integer overwritten = opsKeys.put(key, newIndex); - batchOps.put(newIndex, operation); - Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten)); - debug(() -> String.format("%s %s, %s; key=%s", this, - Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(), - operation.valLen()), - batchSizeDiscardedString(), key)); - } - - void put(CodecBuffer key, CodecBuffer value) { - putCount++; - - // always release the key with the value - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); - } - - void put(byte[] key, byte[] value) { - putCount++; - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); - } - - void delete(byte[] key) { - delCount++; - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); - } - - void deleteRange(byte[] startKey, byte[] endKey) { - delRangeCount++; - batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); - } - - String putString(int keySize, int valueSize) { - return String.format("put(key: %s, value: %s), #put=%s", - byteSize2String(keySize), byteSize2String(valueSize), putCount); - } + abstract void put(ColumnFamily family, byte[] key, byte[] value) throws RocksDatabaseException; - String delString(int keySize) { - return String.format("del(key: %s), #del=%s", - byteSize2String(keySize), delCount); - } + abstract void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) throws RocksDatabaseException; - String batchSizeDiscardedString() { - return String.format("batchSize=%s, discarded: %s", - byteSize2String(batchSize), - countSize2String(discardedCount, discardedSize)); - } + abstract void commit(RocksDatabase db) throws RocksDatabaseException; - @Override - public String toString() { - return name + ": " + family.getName(); - } - } - - void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) { - name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) - .put(key, value); - } - - void 
put(ColumnFamily f, byte[] key, byte[] value) { - name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) - .put(key, value); - } - - void delete(ColumnFamily family, byte[] key) { - name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) - .delete(key); - } - - void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { - name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) - .deleteRange(startKey, endKey); - } - - /** Prepare batch write for the entire cache. */ - UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { - for (Map.Entry e : name2cache.entrySet()) { - e.getValue().prepareBatchWrite(); - } - return this::clear; - } - - void clear() { - for (Map.Entry e : name2cache.entrySet()) { - e.getValue().clear(); - } - name2cache.clear(); - } - - String getCommitString() { - int putCount = 0; - int delCount = 0; - int opSize = 0; - int discardedCount = 0; - int discardedSize = 0; - int delRangeCount = 0; - - for (FamilyCache f : name2cache.values()) { - putCount += f.putCount; - delCount += f.delCount; - opSize += f.batchSize; - discardedCount += f.discardedCount; - discardedSize += f.discardedSize; - delRangeCount += f.delRangeCount; - } - - final int opCount = putCount + delCount; - return String.format( - "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", - putCount, delCount, delRangeCount, - countSize2String(opCount, opSize), - countSize2String(discardedCount, discardedSize), - countSize2String(opCount - discardedCount, opSize - discardedSize)); - } - } - - public RDBBatchOperation() { - writeBatch = new ManagedWriteBatch(); - } - - public RDBBatchOperation(ManagedWriteBatch writeBatch) { - this.writeBatch = writeBatch; - } - - @Override - public String toString() { - return name; - } - - public void commit(RocksDatabase db) throws RocksDatabaseException { - debug(() -> String.format("%s: commit %s", - name, opCache.getCommitString())); - try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { - db.batchWrite(writeBatch); - } - } - - public void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException { - debug(() -> String.format("%s: commit-with-writeOptions %s", - name, opCache.getCommitString())); - try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { - db.batchWrite(writeBatch, writeOptions); - } - } - - @Override - public void close() { - debug(() -> String.format("%s: close", name)); - writeBatch.close(); - opCache.clear(); - } - - public void delete(ColumnFamily family, byte[] key) { - opCache.delete(family, key); - } - - public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { - opCache.put(family, key, value); - } + abstract void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException; - public void put(ColumnFamily family, byte[] key, byte[] value) { - opCache.put(family, key, value); + // TODO: Remove this once recon components code implements BatchOperationHandler and make use of mocked batch + // operation. 
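+ // Outside of tests, instances are expected to be obtained via RDBStore#initBatchOperation(),
+ // which delegates to this factory.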
+ @VisibleForTesting + public static RDBBatchOperation newAtomicOperation() { + return new AtomicRDBBatchOperation(); } - public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { - opCache.deleteRange(family, startKey, endKey); + static RDBBatchOperation newAtomicOperation(ManagedWriteBatch writeBatch) { + return new AtomicRDBBatchOperation(writeBatch); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index c833fed6ab49..498adc5891fd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics; import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.RocksDBCheckpointDifferHolder; @@ -273,14 +274,21 @@ public long getEstimatedKeyCount() throws RocksDatabaseException { } @Override - public BatchOperation initBatchOperation() { - return new RDBBatchOperation(); + public RDBBatchOperation initBatchOperation() { + return RDBBatchOperation.newAtomicOperation(); + } + + public RDBBatchOperation initBatchOperation(ManagedWriteBatch writeBatch) { + return RDBBatchOperation.newAtomicOperation(writeBatch); } @Override - public void commitBatchOperation(BatchOperation operation) - throws RocksDatabaseException { - ((RDBBatchOperation) operation).commit(db); + public void commitBatchOperation(BatchOperation operation) throws IOException { + ((RDBBatchOperation)operation).commit(db); + } + + public void commitBatchOperation(RDBBatchOperation operation, ManagedWriteOptions writeOptions) throws IOException { + operation.commit(db, writeOptions); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 045f020b2fe3..329971d9df1a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -70,7 +70,7 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException { db.put(family, key, value); } - void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) { + void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).put(family, key, value); } else { @@ -80,7 +80,7 @@ void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) { } @Override - public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) { + public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).put(family, key, value); } else { @@ -194,7 +194,7 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce } @Override - public void deleteWithBatch(BatchOperation batch, byte[] key) { + 
public void deleteWithBatch(BatchOperation batch, byte[] key) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).delete(family, key); } else { @@ -204,7 +204,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } @Override - public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) { + public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey); } else { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 6904f22d7d8c..3d53d9a997a8 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -132,7 +132,7 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException * @param batch the batch operation * @param key metadata key */ - void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException; + void deleteWithBatch(BatchOperation batch, KEY key) throws RocksDatabaseException, CodecException; /** * Deletes a range of keys from the metadata store as part of a batch operation. @@ -140,7 +140,8 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException * @param beginKey start metadata key, inclusive. * @param endKey end metadata key, exclusive. */ - void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException; + void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws RocksDatabaseException, + CodecException; /** * Deletes a range of keys from the metadata store. 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 6d2fa3a99ffb..738050fc667f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -376,12 +376,13 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException { } @Override - public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException, RocksDatabaseException { rawTable.deleteWithBatch(batch, encodeKey(key)); } @Override - public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) + throws CodecException, RocksDatabaseException { rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey)); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f2864055..230118cd7d6e 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -34,7 +34,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; -import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.AtomicRDBBatchOperation.Bytes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java index bbf53b9a9608..5e5b6f757797 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,7 +55,7 @@ * 2. 
Mocking of methods to track operations performed on*/ public class TestRDBBatchOperation { @Test - public void testBatchOperationWithDeleteRange() throws RocksDatabaseException { + public void testBatchOperationWithDeleteRange() throws IOException { final List, Integer>> deleteKeyRangePairs = new ArrayList<>(); final List, Integer>> putKeys = new ArrayList<>(); final List> deleteKeys = new ArrayList<>(); @@ -80,7 +81,7 @@ public void testBatchOperationWithDeleteRange() throws RocksDatabaseException { }).when(writeBatch).delete(Mockito.any(ColumnFamilyHandle.class), Mockito.any(byte[].class)); }); - RDBBatchOperation batchOperation = new RDBBatchOperation()) { + RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation()) { ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class); RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class); doAnswer((i) -> { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index cd155f27a96f..9f56a1c9c648 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -260,7 +260,7 @@ public void deleteRange() throws Exception { @Test public void batchPut() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given byte[] key = RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8); @@ -280,7 +280,7 @@ public void batchPut() throws Exception { @Test public void batchDelete() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given byte[] key = @@ -780,7 +780,7 @@ private void populateTable(Table table, @Test public void batchDeleteWithRange() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String keyStr = RandomStringUtils.secure().next(10); @@ -814,7 +814,7 @@ public void batchDeleteWithRange() throws Exception { @Test public void orderOfBatchOperations() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String keyStr = RandomStringUtils.secure().next(10); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java index 8bf3e59e210f..c945a421296a 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java @@ -161,7 +161,7 @@ public void delete() throws Exception { @Test public void batchPut() throws Exception { final Table testTable = createTypedTable("Fourth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given 
String key = RandomStringUtils.secure().next(10); @@ -180,7 +180,7 @@ public void batchPut() throws Exception { @Test public void batchDelete() throws Exception { final Table testTable = createTypedTable("Fourth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String key = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java index 7acf03424d84..540910d040a7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.ha; +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer; import org.apache.hadoop.hdds.utils.TransactionInfo; @@ -42,7 +43,7 @@ public interface SCMHADBTransactionBuffer AtomicReference getLatestSnapshotRef(); - void flush() throws RocksDatabaseException, CodecException; + void flush() throws IOException; boolean shouldFlush(long snapshotWaitTime); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java index 4b1243fd53db..a27db5160f12 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY; import com.google.common.base.Preconditions; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -78,7 +79,8 @@ public void addToBuffer(Table table, KEY key, VALUE val } @Override - public void removeFromBuffer(Table table, KEY key) throws CodecException { + public void removeFromBuffer(Table table, KEY key) + throws CodecException, RocksDatabaseException { rwLock.readLock().lock(); try { txFlushPending.getAndIncrement(); @@ -121,7 +123,7 @@ public AtomicReference getLatestSnapshotRef() { } @Override - public void flush() throws RocksDatabaseException, CodecException { + public void flush() throws IOException { rwLock.writeLock().lock(); try { // write latest trx info into trx table in the same batch @@ -190,7 +192,7 @@ public String toString() { } @Override - public void close() { + public void close() throws IOException { if (currentBatchOperation != null) { currentBatchOperation.close(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java index 3e7803407f05..dfffcd789182 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.ha; +import java.io.IOException; import java.util.concurrent.atomic.AtomicReference; import 
java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hdds.utils.TransactionInfo; @@ -49,7 +50,7 @@ private BatchOperation getCurrentBatchOperation() { if (dbStore != null) { currentBatchOperation = dbStore.initBatchOperation(); } else { - currentBatchOperation = new RDBBatchOperation(); + currentBatchOperation = RDBBatchOperation.newAtomicOperation(); } } return currentBatchOperation; @@ -67,7 +68,8 @@ public void addToBuffer(Table table, KEY key, VALUE val } @Override - public void removeFromBuffer(Table table, KEY key) throws CodecException { + public void removeFromBuffer(Table table, KEY key) + throws CodecException, RocksDatabaseException { rwLock.readLock().lock(); try { table.deleteWithBatch(getCurrentBatchOperation(), key); @@ -102,7 +104,7 @@ public AtomicReference getLatestSnapshotRef() { } @Override - public void flush() throws RocksDatabaseException { + public void flush() throws IOException { rwLock.writeLock().lock(); try { if (dbStore != null) { @@ -127,7 +129,7 @@ public void init() { } @Override - public void close() throws RocksDatabaseException { + public void close() throws IOException { flush(); } } diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java index ed0ddde04c98..3de70d665700 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java @@ -148,7 +148,7 @@ public void testMissingContainerDownNode() throws Exception { .allocateContainer(RatisReplicationConfig.getInstance(ONE), "test"); long containerID = containerInfo.getContainerID(); - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerID, 2L); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); @@ -264,7 +264,7 @@ public void testEmptyMissingContainerDownNode() throws Exception { // Now add a container to key mapping count as 3. This data is used to // identify if container is empty in terms of keys mapped to container. - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerID, 3L); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); @@ -302,7 +302,7 @@ public void testEmptyMissingContainerDownNode() throws Exception { // Now remove keys from container. This data is used to // identify if container is empty in terms of keys mapped to container. 
- try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerID, 0L); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java index f5558dcfbf9d..1ee26a608b2f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java @@ -26,6 +26,7 @@ import java.util.Objects; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -99,7 +100,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, RDBStore nextSnapshotStore = (RDBStore) nextOmSnapshot.getMetadataManager().getStore(); // Init Batch Operation for snapshot db. - try (BatchOperation writeBatch = + try (RDBBatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) { processKeys(writeBatch, nextOmSnapshot.getMetadataManager()); processDirs(writeBatch, nextOmSnapshot.getMetadataManager(), @@ -118,7 +119,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, // Update From Snapshot Deleted Table. RDBStore fromSnapshotStore = (RDBStore) fromOmSnapshot.getMetadataManager().getStore(); - try (BatchOperation fromSnapshotBatchOp = + try (RDBBatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) { processReclaimKeys(fromSnapshotBatchOp, fromOmSnapshot.getMetadataManager()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java index c9ed469d6caa..25c768bbb639 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; @@ -106,7 +107,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get(); RDBStore nextSnapshotStore = (RDBStore) nextOmSnapshot.getMetadataManager().getStore(); // Init Batch Operation for snapshot db. 
- try (BatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) { + try (RDBBatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) { addKeysToNextSnapshot(writeBatch, nextOmSnapshot.getMetadataManager()); nextSnapshotStore.commitBatchOperation(writeBatch); nextSnapshotStore.getDb().flushWal(true); @@ -120,7 +121,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation // Update From Snapshot Deleted Table. RDBStore fromSnapshotStore = (RDBStore) fromOmSnapshot.getMetadataManager().getStore(); - try (BatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) { + try (RDBBatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) { deleteKeysFromSnapshot(fromSnapshotBatchOp, fromOmSnapshot.getMetadataManager()); fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp); fromSnapshotStore.getDb().flushWal(true); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java index dd7195802ed1..fc4e5b7408d5 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java @@ -223,7 +223,7 @@ public void testCloseOnEviction() throws IOException, snapshotChainManager.addSnapshot(first); snapshotChainManager.addSnapshot(second); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); // create the first snapshot checkpoint OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(), first, rdbBatchOperation); @@ -238,7 +238,7 @@ public void testCloseOnEviction() throws IOException, firstSnapshot.getMetadataManager(), "store", firstSnapshotStore); // create second snapshot checkpoint (which will be used for eviction) - rdbBatchOperation = new RDBBatchOperation(); + rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(), second, rdbBatchOperation); om.getMetadataManager().getStore().commitBatchOperation(rdbBatchOperation); @@ -749,7 +749,7 @@ public void testCreateSnapshotIdempotent() throws Exception { when(snapshotInfoTable.get(first.getTableKey())).thenReturn(first); // Create first checkpoint for the snapshot checkpoint - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(), first, rdbBatchOperation); om.getMetadataManager().getStore().commitBatchOperation(rdbBatchOperation); @@ -758,7 +758,7 @@ public void testCreateSnapshotIdempotent() throws Exception { logCapturer.clearOutput(); // Create checkpoint again for the same snapshot. 
- rdbBatchOperation = new RDBBatchOperation(); + rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(), first, rdbBatchOperation); om.getMetadataManager().getStore().commitBatchOperation(rdbBatchOperation); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java index 7ae5f878a268..b56353459783 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.IOException; import java.nio.file.Path; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -58,7 +59,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java index 3699b91cd275..428027f727a7 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; +import java.io.IOException; import java.nio.file.Path; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -58,7 +59,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java index 562549357646..f2db3c9b0224 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.IOException; import java.nio.file.Path; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -58,7 +59,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java index aa152a5d2b76..eaa369d2a01d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java +++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.UUID; @@ -66,7 +67,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java index 02afd43960e0..0aaea10eaa41 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java @@ -124,7 +124,7 @@ protected OzoneConfiguration getOzoneConfiguration() { } @AfterEach - public void stop() { + public void stop() throws IOException { framework().clearInlineMocks(); if (batchOperation != null) { batchOperation.close(); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java index ac56273d628c..123d34f4838d 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java @@ -77,7 +77,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java index 13568e0ca230..1e4aa61cbf12 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java @@ -51,7 +51,7 @@ public void setup() throws IOException { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java index 6bef4b84247b..e3f1ce8fa1d1 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java @@ -85,7 +85,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java index bdb23b65f2c8..fd18d4590ff4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import java.io.File; +import java.io.IOException; import java.nio.file.Path; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -77,7 +78,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java index e8d1707bbe9d..cd035ec7b4f0 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om.response.volume; +import java.io.IOException; import java.nio.file.Path; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.BatchOperation; @@ -47,7 +48,7 @@ public void setup() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws IOException { if (batchOperation != null) { batchOperation.close(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java index 9c6f033b907b..fc937a10a28a 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java @@ -191,7 +191,7 @@ public void baseSetup() throws Exception { } @AfterEach - public void stop() { + public void stop() throws IOException { omMetrics.unRegister(); omSnapshotIntMetrics.unregister(); framework().clearInlineMocks(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index d4edd2894f9d..76f512cbc506 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -655,10 +655,9 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc // Events gets populated in events list in OMDBUpdatesHandler with call back for put/delete/update writeBatch.iterate(omdbUpdatesHandler); // Commit the OM DB transactions in recon rocks DB and sync here. 
- try (RDBBatchOperation rdbBatchOperation = - new RDBBatchOperation(writeBatch)) { + try (RDBBatchOperation rdbBatchOperation = rocksDBStore.initBatchOperation(writeBatch)) { try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) { - rdbBatchOperation.commit(rocksDB, wOpts); + rocksDBStore.commitBatchOperation(rdbBatchOperation, wOpts); } } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java index 626376ac09a9..3e60ceceb6ba 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java @@ -430,7 +430,7 @@ private static void writeToTheDB(Map localContainer ReconContainerMetadataManager reconContainerMetadataManager) throws IOException { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { // Write container key mappings (local per-task data) localContainerKeyMap.keySet().forEach((ContainerKeyPrefix key) -> { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java index b82fcd556a1b..4981083b5025 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java @@ -274,7 +274,7 @@ public static void writeCountsToDB(Map fileSizeCountMap, return; } - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { for (Map.Entry entry : fileSizeCountMap.entrySet()) { FileSizeCountKey key = entry.getKey(); Long deltaCount = entry.getValue(); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java index 974c109bb42b..3569bae71a18 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java @@ -61,7 +61,7 @@ public ReconOMMetadataManager getReconOMMetadataManager() { private void updateNSSummariesToDB(Map nsSummaryMap, Collection objectIdsToBeDeleted) throws IOException { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { for (Map.Entry entry : nsSummaryMap.entrySet()) { try { reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation, entry.getKey(), entry.getValue()); diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java index 8027966231de..59862cebb81f 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java @@ -352,7 +352,7 @@ private void 
handleUpdateEvent(OMDBUpdateEvent event, * @param dataMap Map containing the updated count and size information. */ private void writeDataToDB(Map dataMap) { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { for (Entry entry : dataMap.entrySet()) { String key = entry.getKey(); Long value = entry.getValue(); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java index 7df56a57be65..3889f2e005b8 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerEndpoint.java @@ -1468,7 +1468,7 @@ public void testGetContainerInsightsNonOMContainers() .stream().map(entry -> entry.getKey()).collect( Collectors.toList()); deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { reconContainerMetadataManager .batchDeleteContainerMapping(rdbBatchOperation, key); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); @@ -1505,7 +1505,7 @@ public void testGetContainerInsightsNonOMContainersWithPrevKey() reconContainerMetadataManager.getKeyPrefixesForContainer(2).entrySet() .stream().map(entry -> entry.getKey()).collect(Collectors.toList()); deletedContainerKeyList.forEach((ContainerKeyPrefix key) -> { - try (RDBBatchOperation rdbBatchOperation = new RDBBatchOperation()) { + try (RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation()) { reconContainerMetadataManager.batchDeleteContainerMapping( rdbBatchOperation, key); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java index 0a3d9429a7c8..22c3d6da4f44 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconContainerMetadataManagerImpl.java @@ -80,7 +80,7 @@ public void setUp() throws Exception { private void populateKeysInContainers(long containerId1, long containerId2) throws Exception { - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); ContainerKeyPrefix containerKeyPrefix1 = ContainerKeyPrefix.get( containerId1, keyPrefix1, 0); reconContainerMetadataManager @@ -119,7 +119,7 @@ public void testInitNewContainerDB() throws Exception { "V1/B2/K3", 0); prefixCounts.put(ckp3, 3); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); for (Map.Entry entry : prefixCounts.entrySet()) { reconContainerMetadataManager.batchStoreContainerKeyMapping( @@ -164,7 +164,7 @@ public void testBatchStoreContainerKeyMapping() throws Exception { prefixCounts.put(keyPrefix2, 2); prefixCounts.put(keyPrefix3, 3); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation 
rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); for (Map.Entry entry : prefixCounts.entrySet()) { ContainerKeyPrefix containerKeyPrefix = ContainerKeyPrefix.get( containerId, entry.getKey(), 0); @@ -192,7 +192,7 @@ public void testBatchStoreContainerKeyMapping() throws Exception { public void testStoreContainerKeyCount() throws Exception { long containerId = 1L; long nextContainerId = 2L; - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerId, 2L); reconContainerMetadataManager @@ -204,7 +204,7 @@ public void testStoreContainerKeyCount() throws Exception { assertEquals(3, reconContainerMetadataManager.getKeyCountForContainer(nextContainerId)); - RDBBatchOperation rdbBatchOperation2 = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation2 = RDBBatchOperation.newAtomicOperation(); reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation2, containerId, 20L); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation2); @@ -216,7 +216,7 @@ public void testStoreContainerKeyCount() throws Exception { public void testGetKeyCountForContainer() throws Exception { long containerId = 1L; long nextContainerId = 2L; - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerId, 2L); reconContainerMetadataManager @@ -236,7 +236,7 @@ public void testGetKeyCountForContainer() throws Exception { public void testDoesContainerExists() throws Exception { long containerId = 1L; long nextContainerId = 2L; - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); reconContainerMetadataManager .batchStoreContainerKeyCounts(rdbBatchOperation, containerId, 2L); reconContainerMetadataManager @@ -254,7 +254,7 @@ public void testDoesContainerExists() throws Exception { public void testGetCountForContainerKeyPrefix() throws Exception { long containerId = System.currentTimeMillis(); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); reconContainerMetadataManager.batchStoreContainerKeyMapping( rdbBatchOperation, ContainerKeyPrefix.get(containerId, keyPrefix1), 2); reconContainerMetadataManager.commitBatchOperation(rdbBatchOperation); @@ -413,7 +413,7 @@ public void testDeleteContainerMapping() throws Exception { } }); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); ContainerKeyPrefix prefixForDelete = ContainerKeyPrefix.get( containerId, keyPrefix2, 0); reconContainerMetadataManager diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconNamespaceSummaryManagerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconNamespaceSummaryManagerImpl.java index b4e62e9d03c5..339e4160c317 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconNamespaceSummaryManagerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestReconNamespaceSummaryManagerImpl.java @@ -114,7 +114,7 @@ private void putThreeNSMetadata() 
throws IOException { hmap.put(1L, new NSSummary(1, 2, 2 * 3, testBucket, TEST_CHILD_DIR, "dir1", -1)); hmap.put(2L, new NSSummary(3, 4, 4 * 3, testBucket, TEST_CHILD_DIR, "dir2", -1)); hmap.put(3L, new NSSummary(5, 6, 6 * 3, testBucket, TEST_CHILD_DIR, "dir3", -1)); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); for (Map.Entry entry: hmap.entrySet()) { reconNamespaceSummaryManager.batchStoreNSSummaries(rdbBatchOperation, (long)entry.getKey(), (NSSummary)entry.getValue()); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/AbstractNSSummaryTaskTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/AbstractNSSummaryTaskTest.java index fe461e18857c..833ca449b43d 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/AbstractNSSummaryTaskTest.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/AbstractNSSummaryTaskTest.java @@ -159,7 +159,7 @@ public List commonSetUpTestReprocess(Runnable reprocessTask, long... List result = new ArrayList<>(); NSSummary staleNSSummary = new NSSummary(); - RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(); + RDBBatchOperation rdbBatchOperation = RDBBatchOperation.newAtomicOperation(); getReconNamespaceSummaryManager().batchStoreNSSummaries(rdbBatchOperation, -1L, staleNSSummary); getReconNamespaceSummaryManager().commitBatchOperation(rdbBatchOperation); From dc5161075866c7cbe8fb4114ff3ec908ca3bdae0 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 22 Dec 2025 19:22:26 +0100 Subject: [PATCH 2/4] fix checkstyle --- .../java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 264e5cd70bc6..b6bc7cf0c21e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hdds.utils.db; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; From 08d572edee66a2d83d8c8d07cad3dc8743690650 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 22 Dec 2025 19:23:37 +0100 Subject: [PATCH 3/4] fix pmd --- .../apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java index c0027730d914..28d17d3b5679 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java @@ -665,18 +665,22 @@ public void close() { opCache.clear(); } + @Override void delete(ColumnFamily family, byte[] key) { opCache.delete(family, key); } + @Override void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { opCache.put(family, key, value); } + @Override 
void put(ColumnFamily family, byte[] key, byte[] value) { opCache.put(family, key, value); } + @Override void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { opCache.deleteRange(family, startKey, endKey); } From f6f14114c1ff65bf76569878cf46664331e1d5fa Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Mon, 22 Dec 2025 19:24:15 +0100 Subject: [PATCH 4/4] fix findbugs --- .../ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index 76f512cbc506..ae0da3a935c9 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -74,7 +74,6 @@ import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; -import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; @@ -633,7 +632,6 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc latestSequenceNumberOfOM = dbUpdates.getLatestSequenceNumber(); RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore(); - final RocksDatabase rocksDB = rocksDBStore.getDb(); numUpdates = dbUpdates.getData().size(); if (numUpdates > 0) { metrics.incrNumUpdatesInDeltaTotal(numUpdates);
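
Reviewer note (illustrative only, not part of the patch): the sketch below shows the caller-side pattern after this rename, assuming an already-opened RDBStore named "store" and a raw byte[]-keyed table obtained from it (the "Fifth" table used in TestRDBTableStore is the stand-in here). It relies only on calls visible in this diff: initBatchOperation() now typed as RDBBatchOperation and backed by AtomicRDBBatchOperation, the *WithBatch mutators that now declare RocksDatabaseException, and commitBatchOperation(), which now throws IOException (the diff treats RocksDatabaseException and CodecException as IOException subtypes, e.g. in the simplified flush() signature). The helper class and key names are hypothetical.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.Table;

/** Hypothetical helper, only for illustrating the renamed batch API. */
public final class AtomicBatchUsageSketch {

  private AtomicBatchUsageSketch() {
  }

  /** Buffers one put and one delete, then commits them as a single atomic batch. */
  public static void writeAtomically(RDBStore store, Table<byte[], byte[]> table)
      throws IOException {
    // initBatchOperation() is now declared to return RDBBatchOperation; the concrete
    // instance is an AtomicRDBBatchOperation created via newAtomicOperation().
    try (RDBBatchOperation batch = store.initBatchOperation()) {
      byte[] key = "key-1".getBytes(StandardCharsets.UTF_8);
      byte[] value = "value-1".getBytes(StandardCharsets.UTF_8);
      byte[] staleKey = "key-0".getBytes(StandardCharsets.UTF_8);

      // The *WithBatch mutators only stage operations in the batch; after this patch
      // they also declare RocksDatabaseException.
      table.putWithBatch(batch, key, value);
      table.deleteWithBatch(batch, staleKey);

      // Nothing reaches RocksDB until the batch is committed; commitBatchOperation
      // is now declared to throw IOException.
      store.commitBatchOperation(batch);
    }
    // Callers without an RDBStore at hand (see the Recon changes above) create the
    // batch with RDBBatchOperation.newAtomicOperation() and still commit it through
    // the owning store or metadata manager.
  }
}

The store-level overloads initBatchOperation(ManagedWriteBatch) and commitBatchOperation(RDBBatchOperation, ManagedWriteOptions) keep the Recon delta-apply path in OzoneManagerServiceProviderImpl on the RDBStore API instead of reaching into RocksDatabase directly, which is what allows patch 4/4 to drop the unused rocksDB local.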