diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index 5b39147f3e2..4d78d1838d4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -69,12 +69,13 @@ public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, } @Override - public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException, RocksDatabaseException { table.deleteWithBatch(batch, key); } @Override - public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) + throws CodecException, RocksDatabaseException { table.deleteRangeWithBatch(batch, beginKey, endKey); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java index 913d6e30d1a..486d45adfa2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/SchemaOneDeletedBlocksTable.java @@ -69,7 +69,7 @@ public void delete(String key) throws RocksDatabaseException, CodecException { } @Override - public void deleteWithBatch(BatchOperation batch, String key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, String key) throws CodecException, RocksDatabaseException { super.deleteWithBatch(batch, prefix(key)); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java index 58dacfab621..b460c832478 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/DBTransactionBuffer.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.metadata; +import java.io.Closeable; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; @@ -24,13 +25,11 @@ /** * DB transaction that abstracts the updates to the underlying datastore. 
*/ -public interface DBTransactionBuffer { +public interface DBTransactionBuffer extends Closeable { void addToBuffer(Table table, KEY key, VALUE value) throws RocksDatabaseException, CodecException; void removeFromBuffer(Table table, KEY key) throws RocksDatabaseException, CodecException; - - void close() throws RocksDatabaseException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java new file mode 100644 index 00000000000..28d17d3b567 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/AtomicRDBBatchOperation.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedBytes; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; +import org.apache.ratis.util.TraditionalBinaryPrefix; +import org.apache.ratis.util.UncheckedAutoCloseable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Batch operation implementation for rocks db. + * Note that a {@link AtomicRDBBatchOperation} object only for one batch. + * Also, this class is not threadsafe. 
+ */ +public class AtomicRDBBatchOperation extends RDBBatchOperation { + static final Logger LOG = LoggerFactory.getLogger(AtomicRDBBatchOperation.class); + + private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + + private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); + + private final ManagedWriteBatch writeBatch; + + private final OpCache opCache = new OpCache(); + + private enum Op { DELETE, PUT, DELETE_RANGE } + + private static void debug(Supplier message) { + if (LOG.isTraceEnabled()) { + LOG.trace("\n{}", message.get()); + } + } + + private static String byteSize2String(long length) { + return TraditionalBinaryPrefix.long2String(length, "B", 2); + } + + private static String countSize2String(int count, long size) { + return count + " (" + byteSize2String(size) + ")"; + } + + /** + * The key type of {@link AtomicRDBBatchOperation.OpCache.FamilyCache#opsKeys}. + * To implement {@link #equals(Object)} and {@link #hashCode()} + * based on the contents of the bytes. + */ + static final class Bytes implements Comparable { + private final byte[] array; + private final CodecBuffer buffer; + /** Cache the hash value. */ + private final int hash; + + Bytes(CodecBuffer buffer) { + this.array = null; + this.buffer = Objects.requireNonNull(buffer, "buffer == null"); + this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + } + + Bytes(byte[] array) { + this.array = array; + this.buffer = null; + this.hash = ByteBuffer.wrap(array).hashCode(); + } + + ByteBuffer asReadOnlyByteBuffer() { + return buffer.asReadOnlyByteBuffer(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (!(obj instanceof Bytes)) { + return false; + } + final Bytes that = (Bytes) obj; + if (this.hash != that.hash) { + return false; + } + final ByteBuffer thisBuf = this.array != null ? + ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); + final ByteBuffer thatBuf = that.array != null ? + ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); + return thisBuf.equals(thatBuf); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + return array != null ? bytes2String(array) + : bytes2String(asReadOnlyByteBuffer()); + } + + // This method mimics the ByteWiseComparator in RocksDB. + @Override + public int compareTo(AtomicRDBBatchOperation.Bytes that) { + final ByteBuffer thisBuf = this.array != null ? + ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); + final ByteBuffer thatBuf = that.array != null ? + ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); + + for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) { + int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i)); + if (cmp != 0) { + return cmp; + } + } + return thisBuf.remaining() - thatBuf.remaining(); + } + } + + private abstract class Operation implements Closeable { + private Bytes keyBytes; + + private Operation(Bytes keyBytes) { + this.keyBytes = keyBytes; + } + + abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; + + abstract int keyLen(); + + abstract int valLen(); + + Bytes getKey() { + return keyBytes; + } + + int totalLength() { + return keyLen() + valLen(); + } + + abstract Op getOpType(); + + @Override + public void close() { + } + } + + /** + * Delete operation to be applied to a {@link ColumnFamily} batch. 
+ */ + private final class DeleteOperation extends Operation { + private final byte[] key; + + private DeleteOperation(byte[] key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); + this.key = Objects.requireNonNull(key, "key == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDelete(batch, this.key); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public Op getOpType() { + return Op.DELETE; + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. + */ + private final class CodecBufferPutOperation extends Operation { + private final CodecBuffer key; + private final CodecBuffer value; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(keyBytes); + this.key = key; + this.value = value; + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); + } + + @Override + public int keyLen() { + return key.readableBytes(); + } + + @Override + public int valLen() { + return value.readableBytes(); + } + + @Override + public Op getOpType() { + return Op.PUT; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + value.release(); + } + super.close(); + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. + */ + private final class ByteArrayPutOperation extends Operation { + private final byte[] key; + private final byte[] value; + + private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes)); + this.key = Objects.requireNonNull(key, "key == null"); + this.value = Objects.requireNonNull(value, "value == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key, value); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return value.length; + } + + @Override + public Op getOpType() { + return Op.PUT; + } + } + + /** + * Delete range operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteRangeOperation extends Operation { + private final byte[] startKey; + private final byte[] endKey; + + private DeleteRangeOperation(byte[] startKey, byte[] endKey) { + super(null); + this.startKey = Objects.requireNonNull(startKey, "startKey == null"); + this.endKey = Objects.requireNonNull(endKey, "endKey == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDeleteRange(batch, startKey, endKey); + } + + @Override + public int keyLen() { + return startKey.length + endKey.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public Op getOpType() { + return Op.DELETE_RANGE; + } + } + + /** Cache and deduplicate db ops (put/delete). */ + private class OpCache { + /** A (family name -> {@link FamilyCache}) map. */ + private final Map name2cache = new HashMap<>(); + + /** A cache for a {@link ColumnFamily}. 
*/ + private class FamilyCache { + private final ColumnFamily family; + /** + * A mapping of operation keys to their respective indices in {@code FamilyCache}. + * + * Key details: + * - Maintains a mapping of unique operation keys to their insertion or processing order. + * - Used internally to manage and sort operations during batch writes. + * - Facilitates filtering, overwriting, or deletion of operations based on their keys. + * + * Constraints: + * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. + * - Each key is associated with a unique integer index to track insertion order. + * + * This field plays a critical role in managing the logical consistency and proper execution + * order of operations stored in the batch when interacting with a RocksDB-backed system. + */ + private final Map opsKeys = new HashMap<>(); + /** + * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. + * + * This map serves as the primary container for recording operations in preparation for a batch write + * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines + * its insertion order and ensures correct sequencing during batch execution. + * + * Key characteristics: + * - Stores operations of type {@code Operation}. + * - Uses a unique integer key (index) for mapping each operation. + * - Serves as an intermediary structure during batch preparation and execution. + * + * Usage context: + * - This map is managed as part of the batch-writing process, which involves organizing, + * filtering, and applying multiple operations in a single cohesive batch. + * - Operations stored in this map are expected to define specific actions (e.g., put, delete, + * delete range) and their associated data (e.g., keys, values). + */ + private final Map batchOps = new HashMap<>(); + private boolean isCommit; + + private long batchSize; + private long discardedSize; + private int discardedCount; + private int putCount; + private int delCount; + private int delRangeCount; + private AtomicInteger opIndex; + + FamilyCache(ColumnFamily family) { + this.family = family; + this.opIndex = new AtomicInteger(0); + } + + /** + * Prepares a batch write operation for a RocksDB-backed system. + * + * This method ensures the orderly execution of operations accumulated in the batch, + * respecting their respective types and order of insertion. + * + * Key functionalities: + * 1. Ensures that the batch is not already committed before proceeding. + * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order. + * 3. Filters and adapts operations to account for any delete range operations that might + * affect other operations in the batch: + * - Operations with keys that fall within the range specified by a delete range operation + * are discarded. + * - Delete range operations are executed in their correct order. + * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. + * 5. Logs a summary of the batch execution for debugging purposes. + * + * Throws: + * - RocksDatabaseException if any error occurs while applying operations to the write batch. + * + * Prerequisites: + * - The method assumes that the operations are represented by `Operation` objects, each of which + * encapsulates the logic for its specific type. + * - Delete range operations must be represented by the `DeleteRangeOperation` class. 
+ */ + void prepareBatchWrite() throws RocksDatabaseException { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + isCommit = true; + // Sort Entries based on opIndex and flush the operation to the batch in the same order. + List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) + .map(Map.Entry::getValue).collect(Collectors.toList()); + List> deleteRangeIndices = new ArrayList<>(); + int index = 0; + int prevIndex = -2; + for (Operation op : ops) { + if (Op.DELETE_RANGE == op.getOpType()) { + if (index - prevIndex > 1) { + deleteRangeIndices.add(new ArrayList<>()); + } + List continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1); + continuousIndices.add(index); + prevIndex = index; + } + index++; + } + // This is to apply the last batch of entries after the last DeleteRangeOperation. + deleteRangeIndices.add(Collections.emptyList()); + int startIndex = 0; + for (List continuousDeleteRangeIndices : deleteRangeIndices) { + List deleteRangeOps = continuousDeleteRangeIndices.stream() + .map(i -> (DeleteRangeOperation)ops.get(i)) + .collect(Collectors.toList()); + List> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream() + .map(i -> (DeleteRangeOperation)ops.get(i)) + .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey))) + .collect(Collectors.toList()); + int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0); + + for (int i = startIndex; i < firstOpIndex; i++) { + Operation op = ops.get(i); + Bytes key = op.getKey(); + // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key + // doesn't fall [startKey, endKey) range. + boolean keyInRange = false; + Pair deleteRange = null; + for (Pair deleteRangeOp : deleteRangeOpsRanges) { + if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) { + keyInRange = true; + deleteRange = deleteRangeOp; + break; + } + } + if (!keyInRange) { + op.apply(family, writeBatch); + } else { + Pair finalDeleteRange = deleteRange; + debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + bytes2String(key.asReadOnlyByteBuffer()), + bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()), + bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer()))); + discardedCount++; + discardedSize += op.totalLength(); + } + } + for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { + // Apply the delete range operation to the batch. + deleteRangeOp.apply(family, writeBatch); + } + // Update the startIndex to start from the next operation after the delete range operation. + startIndex = firstOpIndex + continuousDeleteRangeIndices.size(); + } + debug(this::summary); + } + + private String summary() { + return String.format(" %s %s, #put=%s, #del=%s", this, + batchSizeDiscardedString(), putCount, delCount); + } + + void clear() { + final boolean warn = !isCommit && batchSize > 0; + String details = warn ? summary() : null; + + IOUtils.close(LOG, batchOps.values()); + batchOps.clear(); + + if (warn) { + LOG.warn("discarding changes {}", details); + } + } + + private void deleteIfExist(Bytes key, boolean removeFromIndexMap) { + // remove previous first in order to call release() + if (opsKeys.containsKey(key)) { + int previousIndex = removeFromIndexMap ? 
opsKeys.remove(key) : opsKeys.get(key); + final Operation previous = batchOps.remove(previousIndex); + previous.close(); + discardedSize += previous.totalLength(); + discardedCount++; + debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), + previous.valLen())); + } + } + + void overWriteOpIfExist(Bytes key, Operation operation) { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + deleteIfExist(key, true); + batchSize += operation.totalLength(); + int newIndex = opIndex.getAndIncrement(); + final Integer overwritten = opsKeys.put(key, newIndex); + batchOps.put(newIndex, operation); + Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten)); + debug(() -> String.format("%s %s, %s; key=%s", this, + Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(), + operation.valLen()), + batchSizeDiscardedString(), key)); + } + + void put(CodecBuffer key, CodecBuffer value) { + putCount++; + + // always release the key with the value + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); + } + + void put(byte[] key, byte[] value) { + putCount++; + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); + } + + void delete(byte[] key) { + delCount++; + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); + } + + void deleteRange(byte[] startKey, byte[] endKey) { + delRangeCount++; + batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); + } + + String putString(int keySize, int valueSize) { + return String.format("put(key: %s, value: %s), #put=%s", + byteSize2String(keySize), byteSize2String(valueSize), putCount); + } + + String delString(int keySize) { + return String.format("del(key: %s), #del=%s", + byteSize2String(keySize), delCount); + } + + String batchSizeDiscardedString() { + return String.format("batchSize=%s, discarded: %s", + byteSize2String(batchSize), + countSize2String(discardedCount, discardedSize)); + } + + @Override + public String toString() { + return name + ": " + family.getName(); + } + } + + void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) { + name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) + .put(key, value); + } + + void put(ColumnFamily f, byte[] key, byte[] value) { + name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) + .put(key, value); + } + + void delete(ColumnFamily family, byte[] key) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .delete(key); + } + + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .deleteRange(startKey, endKey); + } + + /** Prepare batch write for the entire cache. 
*/ + UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { + for (Map.Entry e : name2cache.entrySet()) { + e.getValue().prepareBatchWrite(); + } + return this::clear; + } + + void clear() { + for (Map.Entry e : name2cache.entrySet()) { + e.getValue().clear(); + } + name2cache.clear(); + } + + String getCommitString() { + int putCount = 0; + int delCount = 0; + int opSize = 0; + int discardedCount = 0; + int discardedSize = 0; + int delRangeCount = 0; + + for (FamilyCache f : name2cache.values()) { + putCount += f.putCount; + delCount += f.delCount; + opSize += f.batchSize; + discardedCount += f.discardedCount; + discardedSize += f.discardedSize; + delRangeCount += f.delRangeCount; + } + + final int opCount = putCount + delCount; + return String.format( + "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", + putCount, delCount, delRangeCount, + countSize2String(opCount, opSize), + countSize2String(discardedCount, discardedSize), + countSize2String(opCount - discardedCount, opSize - discardedSize)); + } + } + + public AtomicRDBBatchOperation() { + this(new ManagedWriteBatch()); + } + + public AtomicRDBBatchOperation(ManagedWriteBatch writeBatch) { + this.writeBatch = writeBatch; + } + + @Override + public String toString() { + return name; + } + + @Override + void commit(RocksDatabase db) throws RocksDatabaseException { + debug(() -> String.format("%s: commit %s", + name, opCache.getCommitString())); + try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { + db.batchWrite(writeBatch); + } + } + + @Override + void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException { + debug(() -> String.format("%s: commit-with-writeOptions %s", + name, opCache.getCommitString())); + try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { + db.batchWrite(writeBatch, writeOptions); + } + } + + @Override + public void close() { + debug(() -> String.format("%s: close", name)); + writeBatch.close(); + opCache.clear(); + } + + @Override + void delete(ColumnFamily family, byte[] key) { + opCache.delete(family, key); + } + + @Override + void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { + opCache.put(family, key, value); + } + + @Override + void put(ColumnFamily family, byte[] key, byte[] value) { + opCache.put(family, key, value); + } + + @Override + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + opCache.deleteRange(family, startKey, endKey); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java index 7233422eeff..26c5a2a761b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperation.java @@ -17,11 +17,11 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.Closeable; + /** * Class represents a batch operation, collects multiple db operation. 
*/ -public interface BatchOperation extends AutoCloseable { +public interface BatchOperation extends Closeable { - @Override - void close(); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java index 179247f8937..4fc93bf9ed6 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java @@ -17,6 +17,8 @@ package org.apache.hadoop.hdds.utils.db; +import java.io.IOException; + /** * Create and commit batch operation for one DB. */ @@ -36,5 +38,5 @@ public interface BatchOperationHandler { * * @param operation which contains all the required batch operation. */ - void commitBatchOperation(BatchOperation operation) throws RocksDatabaseException; + void commitBatchOperation(BatchOperation operation) throws IOException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 49693bd2967..b6bc7cf0c21 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,669 +17,44 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; - -import com.google.common.base.Preconditions; -import com.google.common.primitives.UnsignedBytes; -import java.io.Closeable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; -import org.apache.ratis.util.TraditionalBinaryPrefix; -import org.apache.ratis.util.UncheckedAutoCloseable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Batch operation implementation for rocks db. - * Note that a {@link RDBBatchOperation} object only for one batch. - * Also, this class is not threadsafe. + * Interface for performing batch operations on a RocksDB database. + * + * Provides methods to perform operations on a specific column family within + * a database, such as inserting or deleting a key-value pair or deleting + * a range of keys. These batch operations are designed for use in scenarios + * where multiple database modifications need to be grouped together, ensuring + * efficiency and atomicity. + * + * This interface extends {@link BatchOperation}, inheriting functionality for + * managing batch sizes and allowing cleanup of batch resources via the + * {@link #close()} method. 
*/ -public final class RDBBatchOperation implements BatchOperation { - static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); +public abstract class RDBBatchOperation implements BatchOperation { + + abstract void delete(ColumnFamily family, byte[] key) throws RocksDatabaseException; - private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + abstract void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) throws RocksDatabaseException; - private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); + abstract void put(ColumnFamily family, byte[] key, byte[] value) throws RocksDatabaseException; - private final ManagedWriteBatch writeBatch; + abstract void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) throws RocksDatabaseException; - private final OpCache opCache = new OpCache(); + abstract void commit(RocksDatabase db) throws RocksDatabaseException; - private enum Op { DELETE, PUT, DELETE_RANGE } + abstract void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException; + // TODO: Remove this once recon components code implements BatchOperationHandler and make use of mocked batch + // operation. public static RDBBatchOperation newAtomicOperation() { - return newAtomicOperation(new ManagedWriteBatch()); + return new AtomicRDBBatchOperation(); } public static RDBBatchOperation newAtomicOperation(ManagedWriteBatch writeBatch) { - return new RDBBatchOperation(writeBatch); - } - - private static void debug(Supplier message) { - if (LOG.isTraceEnabled()) { - LOG.trace("\n{}", message.get()); - } - } - - private static String byteSize2String(long length) { - return TraditionalBinaryPrefix.long2String(length, "B", 2); - } - - private static String countSize2String(int count, long size) { - return count + " (" + byteSize2String(size) + ")"; - } - - /** - * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#opsKeys}. - * To implement {@link #equals(Object)} and {@link #hashCode()} - * based on the contents of the bytes. - */ - static final class Bytes implements Comparable { - private final byte[] array; - private final CodecBuffer buffer; - /** Cache the hash value. */ - private final int hash; - - Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); - } - - Bytes(byte[] array) { - this.array = array; - this.buffer = null; - this.hash = ByteBuffer.wrap(array).hashCode(); - } - - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } else if (!(obj instanceof Bytes)) { - return false; - } - final Bytes that = (Bytes) obj; - if (this.hash != that.hash) { - return false; - } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); - } - - @Override - public int hashCode() { - return hash; - } - - @Override - public String toString() { - return array != null ? bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); - } - - // This method mimics the ByteWiseComparator in RocksDB. - @Override - public int compareTo(RDBBatchOperation.Bytes that) { - final ByteBuffer thisBuf = this.array != null ? 
- ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - - for (int i = 0; i < Math.min(thisBuf.remaining(), thatBuf.remaining()); i++) { - int cmp = UnsignedBytes.compare(thisBuf.get(i), thatBuf.get(i)); - if (cmp != 0) { - return cmp; - } - } - return thisBuf.remaining() - thatBuf.remaining(); - } - } - - private abstract class Operation implements Closeable { - private Bytes keyBytes; - - private Operation(Bytes keyBytes) { - this.keyBytes = keyBytes; - } - - abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; - - abstract int keyLen(); - - abstract int valLen(); - - Bytes getKey() { - return keyBytes; - } - - int totalLength() { - return keyLen() + valLen(); - } - - abstract Op getOpType(); - - @Override - public void close() { - } - } - - /** - * Delete operation to be applied to a {@link ColumnFamily} batch. - */ - private final class DeleteOperation extends Operation { - private final byte[] key; - - private DeleteOperation(byte[] key, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes, "keyBytes == null")); - this.key = Objects.requireNonNull(key, "key == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDelete(batch, this.key); - } - - @Override - public int keyLen() { - return key.length; - } - - @Override - public int valLen() { - return 0; - } - - @Override - public Op getOpType() { - return Op.DELETE; - } - } - - /** - * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. - */ - private final class CodecBufferPutOperation extends Operation { - private final CodecBuffer key; - private final CodecBuffer value; - private final AtomicBoolean closed = new AtomicBoolean(false); - - private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { - super(keyBytes); - this.key = key; - this.value = value; - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); - } - - @Override - public int keyLen() { - return key.readableBytes(); - } - - @Override - public int valLen() { - return value.readableBytes(); - } - - @Override - public Op getOpType() { - return Op.PUT; - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - key.release(); - value.release(); - } - super.close(); - } - } - - /** - * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. - */ - private final class ByteArrayPutOperation extends Operation { - private final byte[] key; - private final byte[] value; - - private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes)); - this.key = Objects.requireNonNull(key, "key == null"); - this.value = Objects.requireNonNull(value, "value == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key, value); - } - - @Override - public int keyLen() { - return key.length; - } - - @Override - public int valLen() { - return value.length; - } - - @Override - public Op getOpType() { - return Op.PUT; - } - } - - /** - * Delete range operation to be applied to a {@link ColumnFamily} batch. 
- */ - private final class DeleteRangeOperation extends Operation { - private final byte[] startKey; - private final byte[] endKey; - - private DeleteRangeOperation(byte[] startKey, byte[] endKey) { - super(null); - this.startKey = Objects.requireNonNull(startKey, "startKey == null"); - this.endKey = Objects.requireNonNull(endKey, "endKey == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDeleteRange(batch, startKey, endKey); - } - - @Override - public int keyLen() { - return startKey.length + endKey.length; - } - - @Override - public int valLen() { - return 0; - } - - @Override - public Op getOpType() { - return Op.DELETE_RANGE; - } - } - - /** Cache and deduplicate db ops (put/delete). */ - private class OpCache { - /** A (family name -> {@link FamilyCache}) map. */ - private final Map name2cache = new HashMap<>(); - - /** A cache for a {@link ColumnFamily}. */ - private class FamilyCache { - private final ColumnFamily family; - /** - * A mapping of operation keys to their respective indices in {@code FamilyCache}. - * - * Key details: - * - Maintains a mapping of unique operation keys to their insertion or processing order. - * - Used internally to manage and sort operations during batch writes. - * - Facilitates filtering, overwriting, or deletion of operations based on their keys. - * - * Constraints: - * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. - * - Each key is associated with a unique integer index to track insertion order. - * - * This field plays a critical role in managing the logical consistency and proper execution - * order of operations stored in the batch when interacting with a RocksDB-backed system. - */ - private final Map opsKeys = new HashMap<>(); - /** - * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. - * - * This map serves as the primary container for recording operations in preparation for a batch write - * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines - * its insertion order and ensures correct sequencing during batch execution. - * - * Key characteristics: - * - Stores operations of type {@code Operation}. - * - Uses a unique integer key (index) for mapping each operation. - * - Serves as an intermediary structure during batch preparation and execution. - * - * Usage context: - * - This map is managed as part of the batch-writing process, which involves organizing, - * filtering, and applying multiple operations in a single cohesive batch. - * - Operations stored in this map are expected to define specific actions (e.g., put, delete, - * delete range) and their associated data (e.g., keys, values). - */ - private final Map batchOps = new HashMap<>(); - private boolean isCommit; - - private long batchSize; - private long discardedSize; - private int discardedCount; - private int putCount; - private int delCount; - private int delRangeCount; - private AtomicInteger opIndex; - - FamilyCache(ColumnFamily family) { - this.family = family; - this.opIndex = new AtomicInteger(0); - } - - /** - * Prepares a batch write operation for a RocksDB-backed system. - * - * This method ensures the orderly execution of operations accumulated in the batch, - * respecting their respective types and order of insertion. - * - * Key functionalities: - * 1. Ensures that the batch is not already committed before proceeding. - * 2. 
Sorts all operations by their `opIndex` to maintain a consistent execution order. - * 3. Filters and adapts operations to account for any delete range operations that might - * affect other operations in the batch: - * - Operations with keys that fall within the range specified by a delete range operation - * are discarded. - * - Delete range operations are executed in their correct order. - * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. - * 5. Logs a summary of the batch execution for debugging purposes. - * - * Throws: - * - RocksDatabaseException if any error occurs while applying operations to the write batch. - * - * Prerequisites: - * - The method assumes that the operations are represented by `Operation` objects, each of which - * encapsulates the logic for its specific type. - * - Delete range operations must be represented by the `DeleteRangeOperation` class. - */ - void prepareBatchWrite() throws RocksDatabaseException { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - isCommit = true; - // Sort Entries based on opIndex and flush the operation to the batch in the same order. - List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) - .map(Map.Entry::getValue).collect(Collectors.toList()); - List> deleteRangeIndices = new ArrayList<>(); - int index = 0; - int prevIndex = -2; - for (Operation op : ops) { - if (Op.DELETE_RANGE == op.getOpType()) { - if (index - prevIndex > 1) { - deleteRangeIndices.add(new ArrayList<>()); - } - List continuousIndices = deleteRangeIndices.get(deleteRangeIndices.size() - 1); - continuousIndices.add(index); - prevIndex = index; - } - index++; - } - // This is to apply the last batch of entries after the last DeleteRangeOperation. - deleteRangeIndices.add(Collections.emptyList()); - int startIndex = 0; - for (List continuousDeleteRangeIndices : deleteRangeIndices) { - List deleteRangeOps = continuousDeleteRangeIndices.stream() - .map(i -> (DeleteRangeOperation)ops.get(i)) - .collect(Collectors.toList()); - List> deleteRangeOpsRanges = continuousDeleteRangeIndices.stream() - .map(i -> (DeleteRangeOperation)ops.get(i)) - .map(i -> Pair.of(new Bytes(i.startKey), new Bytes(i.endKey))) - .collect(Collectors.toList()); - int firstOpIndex = continuousDeleteRangeIndices.isEmpty() ? ops.size() : continuousDeleteRangeIndices.get(0); - - for (int i = startIndex; i < firstOpIndex; i++) { - Operation op = ops.get(i); - Bytes key = op.getKey(); - // Compare the key with the startKey and endKey of the delete range operation. Add to Batch if key - // doesn't fall [startKey, endKey) range. - boolean keyInRange = false; - Pair deleteRange = null; - for (Pair deleteRangeOp : deleteRangeOpsRanges) { - if (key.compareTo(deleteRangeOp.getLeft()) >= 0 && key.compareTo(deleteRangeOp.getRight()) < 0) { - keyInRange = true; - deleteRange = deleteRangeOp; - break; - } - } - if (!keyInRange) { - op.apply(family, writeBatch); - } else { - Pair finalDeleteRange = deleteRange; - debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", - bytes2String(key.asReadOnlyByteBuffer()), - bytes2String(finalDeleteRange.getKey().asReadOnlyByteBuffer()), - bytes2String(finalDeleteRange.getRight().asReadOnlyByteBuffer()))); - discardedCount++; - discardedSize += op.totalLength(); - } - } - for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { - // Apply the delete range operation to the batch. 
- deleteRangeOp.apply(family, writeBatch); - } - // Update the startIndex to start from the next operation after the delete range operation. - startIndex = firstOpIndex + continuousDeleteRangeIndices.size(); - } - debug(this::summary); - } - - private String summary() { - return String.format(" %s %s, #put=%s, #del=%s", this, - batchSizeDiscardedString(), putCount, delCount); - } - - void clear() { - final boolean warn = !isCommit && batchSize > 0; - String details = warn ? summary() : null; - - IOUtils.close(LOG, batchOps.values()); - batchOps.clear(); - - if (warn) { - LOG.warn("discarding changes {}", details); - } - } - - private void deleteIfExist(Bytes key, boolean removeFromIndexMap) { - // remove previous first in order to call release() - if (opsKeys.containsKey(key)) { - int previousIndex = removeFromIndexMap ? opsKeys.remove(key) : opsKeys.get(key); - final Operation previous = batchOps.remove(previousIndex); - previous.close(); - discardedSize += previous.totalLength(); - discardedCount++; - debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), - previous.valLen())); - } - } - - void overWriteOpIfExist(Bytes key, Operation operation) { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - deleteIfExist(key, true); - batchSize += operation.totalLength(); - int newIndex = opIndex.getAndIncrement(); - final Integer overwritten = opsKeys.put(key, newIndex); - batchOps.put(newIndex, operation); - Preconditions.checkState(overwritten == null || !batchOps.containsKey(overwritten)); - debug(() -> String.format("%s %s, %s; key=%s", this, - Op.DELETE == operation.getOpType() ? delString(operation.totalLength()) : putString(operation.keyLen(), - operation.valLen()), - batchSizeDiscardedString(), key)); - } - - void put(CodecBuffer key, CodecBuffer value) { - putCount++; - - // always release the key with the value - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); - } - - void put(byte[] key, byte[] value) { - putCount++; - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); - } - - void delete(byte[] key) { - delCount++; - Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); - } - - void deleteRange(byte[] startKey, byte[] endKey) { - delRangeCount++; - batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); - } - - String putString(int keySize, int valueSize) { - return String.format("put(key: %s, value: %s), #put=%s", - byteSize2String(keySize), byteSize2String(valueSize), putCount); - } - - String delString(int keySize) { - return String.format("del(key: %s), #del=%s", - byteSize2String(keySize), delCount); - } - - String batchSizeDiscardedString() { - return String.format("batchSize=%s, discarded: %s", - byteSize2String(batchSize), - countSize2String(discardedCount, discardedSize)); - } - - @Override - public String toString() { - return name + ": " + family.getName(); - } - } - - void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) { - name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) - .put(key, value); - } - - void put(ColumnFamily f, byte[] key, byte[] value) { - name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) - .put(key, value); - } - - void delete(ColumnFamily family, byte[] key) { - name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) - 
.delete(key); - } - - void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { - name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) - .deleteRange(startKey, endKey); - } - - /** Prepare batch write for the entire cache. */ - UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { - for (Map.Entry e : name2cache.entrySet()) { - e.getValue().prepareBatchWrite(); - } - return this::clear; - } - - void clear() { - for (Map.Entry e : name2cache.entrySet()) { - e.getValue().clear(); - } - name2cache.clear(); - } - - String getCommitString() { - int putCount = 0; - int delCount = 0; - int opSize = 0; - int discardedCount = 0; - int discardedSize = 0; - int delRangeCount = 0; - - for (FamilyCache f : name2cache.values()) { - putCount += f.putCount; - delCount += f.delCount; - opSize += f.batchSize; - discardedCount += f.discardedCount; - discardedSize += f.discardedSize; - delRangeCount += f.delRangeCount; - } - - final int opCount = putCount + delCount; - return String.format( - "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", - putCount, delCount, delRangeCount, - countSize2String(opCount, opSize), - countSize2String(discardedCount, discardedSize), - countSize2String(opCount - discardedCount, opSize - discardedSize)); - } - } - - private RDBBatchOperation(ManagedWriteBatch writeBatch) { - this.writeBatch = writeBatch; - } - - @Override - public String toString() { - return name; - } - - public void commit(RocksDatabase db) throws RocksDatabaseException { - debug(() -> String.format("%s: commit %s", - name, opCache.getCommitString())); - try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { - db.batchWrite(writeBatch); - } - } - - public void commit(RocksDatabase db, ManagedWriteOptions writeOptions) throws RocksDatabaseException { - debug(() -> String.format("%s: commit-with-writeOptions %s", - name, opCache.getCommitString())); - try (UncheckedAutoCloseable ignored = opCache.prepareBatchWrite()) { - db.batchWrite(writeBatch, writeOptions); - } - } - - @Override - public void close() { - debug(() -> String.format("%s: close", name)); - writeBatch.close(); - opCache.clear(); - } - - public void delete(ColumnFamily family, byte[] key) { - opCache.delete(family, key); - } - - public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { - opCache.put(family, key, value); - } - - public void put(ColumnFamily family, byte[] key, byte[] value) { - opCache.put(family, key, value); - } - - public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { - opCache.deleteRange(family, startKey, endKey); + return new AtomicRDBBatchOperation(writeBatch); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index efccdf31aef..498adc5891f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedStatistics; import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; import 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.RocksDBCheckpointDifferHolder; @@ -273,14 +274,21 @@ public long getEstimatedKeyCount() throws RocksDatabaseException { } @Override - public BatchOperation initBatchOperation() { + public RDBBatchOperation initBatchOperation() { return RDBBatchOperation.newAtomicOperation(); } + public RDBBatchOperation initBatchOperation(ManagedWriteBatch writeBatch) { + return RDBBatchOperation.newAtomicOperation(writeBatch); + } + @Override - public void commitBatchOperation(BatchOperation operation) - throws RocksDatabaseException { - ((RDBBatchOperation) operation).commit(db); + public void commitBatchOperation(BatchOperation operation) throws IOException { + ((RDBBatchOperation)operation).commit(db); + } + + public void commitBatchOperation(RDBBatchOperation operation, ManagedWriteOptions writeOptions) throws IOException { + operation.commit(db, writeOptions); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 045f020b2fe..329971d9df1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -70,7 +70,7 @@ public void put(byte[] key, byte[] value) throws RocksDatabaseException { db.put(family, key, value); } - void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) { + void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).put(family, key, value); } else { @@ -80,7 +80,7 @@ void putWithBatch(BatchOperation batch, CodecBuffer key, CodecBuffer value) { } @Override - public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) { + public void putWithBatch(BatchOperation batch, byte[] key, byte[] value) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).put(family, key, value); } else { @@ -194,7 +194,7 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce } @Override - public void deleteWithBatch(BatchOperation batch, byte[] key) { + public void deleteWithBatch(BatchOperation batch, byte[] key) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).delete(family, key); } else { @@ -204,7 +204,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } @Override - public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) { + public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) throws RocksDatabaseException { if (batch instanceof RDBBatchOperation) { ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey); } else { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 6904f22d7d8..3d53d9a997a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -132,7 +132,7 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException * @param batch the batch operation * @param key metadata key */ - void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException; 
+ void deleteWithBatch(BatchOperation batch, KEY key) throws RocksDatabaseException, CodecException; /** * Deletes a range of keys from the metadata store as part of a batch operation. @@ -140,7 +140,8 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException * @param beginKey start metadata key, inclusive. * @param endKey end metadata key, exclusive. */ - void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException; + void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws RocksDatabaseException, + CodecException; /** * Deletes a range of keys from the metadata store. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 6d2fa3a99ff..738050fc667 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -376,12 +376,13 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException { } @Override - public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { + public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException, RocksDatabaseException { rawTable.deleteWithBatch(batch, encodeKey(key)); } @Override - public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) + throws CodecException, RocksDatabaseException { rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey)); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f286405..230118cd7d6 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -34,7 +34,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; -import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.AtomicRDBBatchOperation.Bytes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java index bd33ab070ce..5e5b6f75779 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,7 +55,7 @@ * 2. 
Mocking of methods to track operations performed on*/ public class TestRDBBatchOperation { @Test - public void testBatchOperationWithDeleteRange() throws RocksDatabaseException { + public void testBatchOperationWithDeleteRange() throws IOException { final List, Integer>> deleteKeyRangePairs = new ArrayList<>(); final List, Integer>> putKeys = new ArrayList<>(); final List> deleteKeys = new ArrayList<>(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index cd155f27a96..9f56a1c9c64 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -260,7 +260,7 @@ public void deleteRange() throws Exception { @Test public void batchPut() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given byte[] key = RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8); @@ -280,7 +280,7 @@ public void batchPut() throws Exception { @Test public void batchDelete() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given byte[] key = @@ -780,7 +780,7 @@ private void populateTable(Table table, @Test public void batchDeleteWithRange() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String keyStr = RandomStringUtils.secure().next(10); @@ -814,7 +814,7 @@ public void batchDeleteWithRange() throws Exception { @Test public void orderOfBatchOperations() throws Exception { final Table testTable = rdbStore.getTable("Fifth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String keyStr = RandomStringUtils.secure().next(10); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java index 8bf3e59e210..c945a421296 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java @@ -161,7 +161,7 @@ public void delete() throws Exception { @Test public void batchPut() throws Exception { final Table testTable = createTypedTable("Fourth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String key = RandomStringUtils.secure().next(10); @@ -180,7 +180,7 @@ public void batchPut() throws Exception { @Test public void batchDelete() throws Exception { final Table testTable = createTypedTable("Fourth"); - try (BatchOperation batch = rdbStore.initBatchOperation()) { + try (RDBBatchOperation batch = rdbStore.initBatchOperation()) { //given String key = diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
index 7acf03424d8..540910d040a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBuffer.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.ha;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -42,7 +43,7 @@ public interface SCMHADBTransactionBuffer
   AtomicReference getLatestSnapshotRef();
 
-  void flush() throws RocksDatabaseException, CodecException;
+  void flush() throws IOException;
 
   boolean shouldFlush(long snapshotWaitTime);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index 4b1243fd53d..a27db5160f1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
 import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -78,7 +79,8 @@ public void addToBuffer(Table table, KEY key, VALUE val
   }
 
   @Override
-  public void removeFromBuffer(Table table, KEY key) throws CodecException {
+  public void removeFromBuffer(Table table, KEY key)
+      throws CodecException, RocksDatabaseException {
     rwLock.readLock().lock();
     try {
       txFlushPending.getAndIncrement();
@@ -121,7 +123,7 @@ public AtomicReference getLatestSnapshotRef() {
   }
 
   @Override
-  public void flush() throws RocksDatabaseException, CodecException {
+  public void flush() throws IOException {
     rwLock.writeLock().lock();
     try {
       // write latest trx info into trx table in the same batch
@@ -190,7 +192,7 @@ public String toString() {
   }
 
   @Override
-  public void close() {
+  public void close() throws IOException {
     if (currentBatchOperation != null) {
       currentBatchOperation.close();
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
index 2e7b3fdb0dd..dfffcd78918 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferStub.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.scm.ha;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -67,7 +68,8 @@ public void addToBuffer(Table table, KEY key, VALUE val
   }
 
   @Override
-  public void removeFromBuffer(Table table, KEY key) throws CodecException {
+  public void removeFromBuffer(Table table, KEY key)
+      throws CodecException, RocksDatabaseException {
     rwLock.readLock().lock();
     try {
       table.deleteWithBatch(getCurrentBatchOperation(), key);
@@ -102,7 +104,7 @@ public AtomicReference getLatestSnapshotRef() {
   }
 
   @Override
-  public void flush() throws RocksDatabaseException {
+  public void flush() throws IOException {
     rwLock.writeLock().lock();
     try {
       if (dbStore != null) {
@@ -127,7 +129,7 @@ public void init() {
   }
 
   @Override
-  public void close() throws RocksDatabaseException {
+  public void close() throws IOException {
     flush();
   }
 }
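A hedged sketch of the caller-side effect of the SCM HA buffer changes above (the flushAndClose helper is hypothetical; only shouldFlush and flush come from the interface shown in this patch): flush() and close() now declare plain IOException, so the buffer fits directly into try-with-resources with a single throws clause.

import java.io.IOException;
import org.apache.hadoop.hdds.scm.ha.SCMHADBTransactionBuffer;

// Illustrative only: the buffer instance is assumed to come from SCM
// initialization code that is not part of this patch.
void flushAndClose(SCMHADBTransactionBuffer buffer, long snapshotWaitTime) throws IOException {
  try (SCMHADBTransactionBuffer b = buffer) {   // close() now throws IOException
    if (b.shouldFlush(snapshotWaitTime)) {
      b.flush();                                // previously RocksDatabaseException, CodecException
    }
  }
}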
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
index f5558dcfbf9..1ee26a608b2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
@@ -26,6 +26,7 @@
 import java.util.Objects;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -99,7 +100,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,
     RDBStore nextSnapshotStore =
         (RDBStore) nextOmSnapshot.getMetadataManager().getStore();
     // Init Batch Operation for snapshot db.
-    try (BatchOperation writeBatch =
+    try (RDBBatchOperation writeBatch =
         nextSnapshotStore.initBatchOperation()) {
       processKeys(writeBatch, nextOmSnapshot.getMetadataManager());
       processDirs(writeBatch, nextOmSnapshot.getMetadataManager(),
@@ -118,7 +119,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager,
     // Update From Snapshot Deleted Table.
     RDBStore fromSnapshotStore =
         (RDBStore) fromOmSnapshot.getMetadataManager().getStore();
-    try (BatchOperation fromSnapshotBatchOp =
+    try (RDBBatchOperation fromSnapshotBatchOp =
         fromSnapshotStore.initBatchOperation()) {
       processReclaimKeys(fromSnapshotBatchOp,
           fromOmSnapshot.getMetadataManager());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java
index c9ed469d6ca..25c768bbb63 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveTableKeysResponse.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -106,7 +107,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation
     OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get();
     RDBStore nextSnapshotStore = (RDBStore) nextOmSnapshot.getMetadataManager().getStore();
     // Init Batch Operation for snapshot db.
-    try (BatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) {
+    try (RDBBatchOperation writeBatch = nextSnapshotStore.initBatchOperation()) {
       addKeysToNextSnapshot(writeBatch, nextOmSnapshot.getMetadataManager());
       nextSnapshotStore.commitBatchOperation(writeBatch);
       nextSnapshotStore.getDb().flushWal(true);
@@ -120,7 +121,7 @@ protected void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation
     // Update From Snapshot Deleted Table.
     RDBStore fromSnapshotStore = (RDBStore) fromOmSnapshot.getMetadataManager().getStore();
-    try (BatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) {
+    try (RDBBatchOperation fromSnapshotBatchOp = fromSnapshotStore.initBatchOperation()) {
       deleteKeysFromSnapshot(fromSnapshotBatchOp, fromOmSnapshot.getMetadataManager());
       fromSnapshotStore.commitBatchOperation(fromSnapshotBatchOp);
       fromSnapshotStore.getDb().flushWal(true);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java
index 7ae5f878a26..b5635345978 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketCreateResponse.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,7 +59,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java
index 3699b91cd27..428027f727a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketDeleteResponse.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,7 +59,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java
index 56254935764..f2db3c9b022 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/bucket/TestOMBucketSetPropertyResponse.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,7 +59,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java
index aa152a5d2b7..eaa369d2a01 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/file/TestOMDirectoryCreateResponse.java
@@ -20,6 +20,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.UUID;
@@ -66,7 +67,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
index 02afd43960e..0aaea10eaa4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/key/TestOMKeyResponse.java
@@ -124,7 +124,7 @@ protected OzoneConfiguration getOzoneConfiguration() {
   }
 
   @AfterEach
-  public void stop() {
+  public void stop() throws IOException {
     framework().clearInlineMocks();
     if (batchOperation != null) {
       batchOperation.close();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
index ac56273d628..123d34f4838 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
@@ -77,7 +77,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java
index 13568e0ca23..1e4aa61cbf1 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/security/TestOMDelegationTokenResponse.java
@@ -51,7 +51,7 @@ public void setup() throws IOException {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
index 6bef4b84247..e3f1ce8fa1d 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
@@ -85,7 +85,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
index bdb23b65f2c..fd18d4590ff 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotDeleteResponse.java
@@ -25,6 +25,7 @@
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.UUID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -77,7 +78,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java
index e8d1707bbe9..cd035ec7b4f 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume/TestOMVolumeResponse.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om.response.volume;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -47,7 +48,7 @@ public void setup() throws Exception {
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     if (batchOperation != null) {
       batchOperation.close();
     }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
index 9c6f033b907..fc937a10a28 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotRequestAndResponse.java
@@ -191,7 +191,7 @@ public void baseSetup() throws Exception {
   }
 
   @AfterEach
-  public void stop() {
+  public void stop() throws IOException {
     omMetrics.unRegister();
     omSnapshotIntMetrics.unregister();
     framework().clearInlineMocks();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index dca33c759b8..ae0da3a935c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -74,7 +74,6 @@
 import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
-import org.apache.hadoop.hdds.utils.db.RocksDatabase;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
@@ -633,7 +632,6 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc
       latestSequenceNumberOfOM = dbUpdates.getLatestSequenceNumber();
       RDBStore rocksDBStore = (RDBStore) omMetadataManager.getStore();
-      final RocksDatabase rocksDB = rocksDBStore.getDb();
       numUpdates = dbUpdates.getData().size();
       if (numUpdates > 0) {
         metrics.incrNumUpdatesInDeltaTotal(numUpdates);
@@ -655,10 +653,9 @@ ImmutablePair innerGetAndApplyDeltaUpdatesFromOM(long fromSequenc
           // Events gets populated in events list in OMDBUpdatesHandler with call back for put/delete/update
           writeBatch.iterate(omdbUpdatesHandler);
           // Commit the OM DB transactions in recon rocks DB and sync here.
-          try (RDBBatchOperation rdbBatchOperation =
-              RDBBatchOperation.newAtomicOperation(writeBatch)) {
+          try (RDBBatchOperation rdbBatchOperation = rocksDBStore.initBatchOperation(writeBatch)) {
             try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
-              rdbBatchOperation.commit(rocksDB, wOpts);
+              rocksDBStore.commitBatchOperation(rdbBatchOperation, wOpts);
             }
           }
         }
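To summarize the Recon change above, a minimal sketch of the new commit path (rocksDBStore and writeBatch are the variables from the hunk; nothing else is assumed): the OM delta WriteBatch is wrapped and committed through the store instead of through RDBBatchOperation.newAtomicOperation and a locally fetched RocksDatabase handle.

// Sketch of the pattern applied above: rocksDBStore is Recon's RDBStore and
// writeBatch is the ManagedWriteBatch built from the OM delta updates.
try (RDBBatchOperation rdbBatchOperation = rocksDBStore.initBatchOperation(writeBatch)) {
  try (ManagedWriteOptions wOpts = new ManagedWriteOptions()) {
    // The store commits against its own RocksDatabase, so the explicit
    // rocksDBStore.getDb() handle used by the old code is no longer needed.
    rocksDBStore.commitBatchOperation(rdbBatchOperation, wOpts);
  }
}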