From e15782de4d81ce6dfc2e5cc4697ae9366bf8af0a Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 07:39:09 -0500 Subject: [PATCH 1/6] HDDS-14237. Simplify ManagedDirectSlice to use ByteBuffers Change-Id: I1cc243d0dadbe647ad9dc7b94822ba7dc6a83e1c --- .../utils/db/managed/ManagedDirectSlice.java | 106 ++++-------------- .../db/managed/TestManagedDirectSlice.java | 42 +------ .../hdds/utils/db/RDBSstFileWriter.java | 8 +- 3 files changed, 27 insertions(+), 129 deletions(-) diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java index 52b8b5aabb1e..1c2be3be36de 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedDirectSlice.java @@ -17,98 +17,34 @@ package org.apache.hadoop.hdds.utils.db.managed; -import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB.NOT_FOUND; - -import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; -import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; -import org.apache.ratis.util.function.CheckedConsumer; -import org.apache.ratis.util.function.CheckedFunction; import org.rocksdb.DirectSlice; -import org.rocksdb.RocksDBException; /** - * ManagedDirectSlice is a managed wrapper around the DirectSlice object. It ensures - * proper handling of native resources associated with DirectSlice, utilizing - * the ManagedObject infrastructure to prevent resource leaks. It works in tandem - * with a ByteBuffer, which acts as the data source for the managed slice. + * ManagedDirectSlice extends {@link DirectSlice} and manages a slice over direct + * {@link ByteBuffer} memory. The constructor initializes the slice with the given ByteBuffer and + * sets the slice's prefix and length based on the buffer's position and remaining bytes. + * + * The slice operates directly on the provided buffer's memory, relying on the parent + * {@link DirectSlice} for the underlying native direct-buffer handling. + * + * Constructor: + * - Initializes the ManagedDirectSlice instance with a provided ByteBuffer. + * - Sets the slice length to the buffer's remaining bytes. + * - Removes the prefix based on the buffer's position. * - * This class overrides certain operations to tightly control the lifecycle and - * behavior of the DirectSlice it manages. It specifically caters to use cases - * where the slice is used in RocksDB operations, providing methods for safely - * interacting with the slice for put-like operations. + * NOTE: This class should only be used with a ByteBuffer whose position and limit remain immutable for the lifetime + * of this ManagedDirectSlice instance. That is, the ByteBuffer's position and limit must not be modified + * externally while the ManagedDirectSlice is in use, and the value in the byte buffer should only be accessed via + * this instance.
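+ *
+ * For illustration, a typical use mirrors the {@code RDBSstFileWriter#delete(CodecBuffer)} change in this
+ * patch, where {@code key} is a {@code CodecBuffer} backed by direct memory and {@code sstFileWriter} is a
+ * RocksDB {@code SstFileWriter}:
+ * <pre>{@code
+ * try (ManagedDirectSlice slice = new ManagedDirectSlice(key.asReadOnlyByteBuffer())) {
+ *   // the slice wraps the buffer's readable bytes; any DirectSlice-based API can consume it
+ *   sstFileWriter.delete(slice);
+ * }
+ * }</pre>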
*/ -public class ManagedDirectSlice extends ManagedObject { - - private final ByteBuffer data; +public class ManagedDirectSlice extends DirectSlice { public ManagedDirectSlice(ByteBuffer data) { - super(new DirectSlice(data)); - this.data = data; - } - - @Override - public DirectSlice get() { - throw new UnsupportedOperationException("get() is not supported."); - } - - /** - * Executes the provided consumer on the internal {@code DirectSlice} after - * adjusting the slice's prefix and length based on the current position and - * remaining data in the associated {@code ByteBuffer}. If the consumer throws - * a {@code RocksDBException}, it is wrapped and rethrown as a - * {@code RocksDatabaseException}. - * - * @param consumer the operation to perform on the managed {@code DirectSlice}. - * The consumer must handle a {@code DirectSlice} and may throw - * a {@code RocksDBException}. - * @throws RocksDatabaseException if the provided consumer throws a - * {@code RocksDBException}. - */ - public void putFromBuffer(CheckedConsumer consumer) - throws RocksDatabaseException { - DirectSlice slice = super.get(); - slice.removePrefix(this.data.position()); - slice.setLength(this.data.remaining()); - try { - consumer.accept(slice); - } catch (RocksDBException e) { - throw new RocksDatabaseException("Error while performing put op with directSlice", e); - } - data.position(data.limit()); - } - - /** - * Retrieves data from the associated DirectSlice into the buffer managed by this instance. - * The supplied function is applied to the DirectSlice to process the data, and the method - * adjusts the buffer's position and limit based on the result. - * - * @param function a function that operates on a DirectSlice and returns the number - * of bytes written to the buffer, or a specific "not found" value - * if the operation fails. The function may throw a RocksDBException. - * @return the number of bytes written to the buffer if successful, or a specific - * "not found" value indicating the requested data was absent. - * @throws RocksDatabaseException if the provided function throws a RocksDBException, - * wrapping the original exception. 
- */ - public int getToBuffer(CheckedFunction function) - throws RocksDatabaseException { - DirectSlice slice = super.get(); - slice.removePrefix(this.data.position()); - slice.setLength(this.data.remaining()); - try { - int lengthWritten = function.apply(slice); - if (lengthWritten != NOT_FOUND) { - this.data.limit(Math.min(data.limit(), data.position() + lengthWritten)); - } - return lengthWritten; - } catch (RocksDBException e) { - throw new RocksDatabaseException("Error while performing put op with directSlice", e); - } - } - - @VisibleForTesting - DirectSlice getDirectSlice() { - return super.get(); + super(data); + this.removePrefix(data.position()); + this.setLength(data.remaining()); } } diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java index c332d32704f8..f13aba39a7ba 100644 --- a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java +++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TestManagedDirectSlice.java @@ -22,11 +22,8 @@ import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.rocksdb.DirectSlice; /** * Tests for ManagedDirectSlice. @@ -39,48 +36,15 @@ public class TestManagedDirectSlice { @ParameterizedTest @CsvSource({"0, 1024", "1024, 1024", "512, 1024", "0, 100", "10, 512", "0, 0"}) - public void testManagedDirectSliceWithOffsetMovedAheadByteBuffer(int offset, int numberOfBytesWritten) - throws RocksDatabaseException { + public void testManagedDirectSliceWithOffset(int offset, int numberOfBytesWritten) { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); byte[] randomBytes = RandomUtils.secure().nextBytes(numberOfBytesWritten); byteBuffer.put(randomBytes); byteBuffer.flip(); + byteBuffer.position(offset); try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer); ManagedSlice slice = new ManagedSlice(Arrays.copyOfRange(randomBytes, offset, numberOfBytesWritten))) { - byteBuffer.position(offset); - directSlice.putFromBuffer((ds) -> { - DirectSlice directSliceFromByteBuffer = directSlice.getDirectSlice(); - assertEquals(numberOfBytesWritten - offset, ds.size()); - assertEquals(0, directSliceFromByteBuffer.compare(slice)); - assertEquals(0, slice.compare(directSliceFromByteBuffer)); - }); - Assertions.assertEquals(numberOfBytesWritten, byteBuffer.position()); - } - } - - @ParameterizedTest - @CsvSource({"0, 1024, 512", "1024, 1024, 5", "512, 1024, 600", "0, 100, 80", "10, 512, 80", "0, 0, 10", - "100, 256, -1"}) - public void testManagedDirectSliceWithOpPutToByteBuffer(int offset, int maxNumberOfBytesWrite, - int numberOfBytesToWrite) throws RocksDatabaseException { - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); - byte[] randomBytes = RandomUtils.secure().nextBytes(offset); - byteBuffer.put(randomBytes); - try (ManagedDirectSlice directSlice = new ManagedDirectSlice(byteBuffer)) { - byteBuffer.position(offset); - byteBuffer.limit(Math.min(offset + maxNumberOfBytesWrite, 1024)); - assertEquals(numberOfBytesToWrite, directSlice.getToBuffer((ds) -> { - assertEquals(byteBuffer.remaining(), ds.size()); - return 
numberOfBytesToWrite; - })); - Assertions.assertEquals(offset, byteBuffer.position()); - if (numberOfBytesToWrite == -1) { - assertEquals(offset + maxNumberOfBytesWrite, byteBuffer.limit()); - } else { - Assertions.assertEquals(Math.min(Math.min(offset + numberOfBytesToWrite, 1024), maxNumberOfBytesWrite), - byteBuffer.limit()); - } - + assertEquals(slice, directSlice); } } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java index 14f553a9b185..a689e9fdea14 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java @@ -85,11 +85,9 @@ public void delete(byte[] key) throws RocksDatabaseException { public void delete(CodecBuffer key) throws RocksDatabaseException { try (ManagedDirectSlice slice = new ManagedDirectSlice(key.asReadOnlyByteBuffer())) { - slice.putFromBuffer(directSlice -> { - sstFileWriter.delete(directSlice); - keyCounter.incrementAndGet(); - }); - } catch (RocksDatabaseException e) { + sstFileWriter.delete(slice); + keyCounter.incrementAndGet(); + } catch (RocksDBException e) { closeOnFailure(); throw new RocksDatabaseException("Failed to delete key (length=" + key.readableBytes() + "), sstFile=" + sstFile.getAbsolutePath(), e); From 792fe8afd6e12604c1720261af530ecd5387c8ac Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 16:12:43 -0500 Subject: [PATCH 2/6] HDDS-14241. Refactor RDBBatchOperation to support various Operations by abstracting out implementation of each operation Change-Id: Idf5b9a4b9f66eae41a9d832ddbf22e43d1027344 --- .../hdds/utils/db/RDBBatchOperation.java | 236 +++++++++++++----- 1 file changed, 174 insertions(+), 62 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 87d9ffc625cd..1d2f9ef456d9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -20,12 +20,15 @@ import static org.apache.hadoop.hdds.StringUtils.bytes2String; import com.google.common.base.Preconditions; +import java.io.Closeable; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; @@ -42,6 +45,9 @@ public final class RDBBatchOperation implements BatchOperation { static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); + private static final String PUT_OP = "PUT"; + private static final String DELETE_OP = "DELETE"; + private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); @@ -50,8 +56,6 @@ public final class RDBBatchOperation implements BatchOperation { private final OpCache opCache = new 
OpCache(); - private enum Op { DELETE } - public static RDBBatchOperation newAtomicOperation() { return newAtomicOperation(new ManagedWriteBatch()); } @@ -75,7 +79,7 @@ private static String countSize2String(int count, long size) { } /** - * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#ops}. + * The key type of {@link RDBBatchOperation.OpCache.FamilyCache#batchOps}. * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ @@ -97,10 +101,6 @@ static final class Bytes { this.hash = ByteBuffer.wrap(array).hashCode(); } - byte[] array() { - return array; - } - ByteBuffer asReadOnlyByteBuffer() { return buffer.asReadOnlyByteBuffer(); } @@ -135,6 +135,138 @@ public String toString() { } } + private abstract class Operation implements Closeable { + + private Operation() { + } + + abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; + + abstract int keyLen(); + + abstract int valLen(); + + int totalLength() { + return keyLen() + valLen(); + } + + abstract String getOpType(); + + @Override + public void close() { + } + } + + /** + * Delete operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteOperation extends Operation { + private final byte[] key; + + private DeleteOperation(byte[] key) { + super(); + this.key = Objects.requireNonNull(key, "key == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDelete(batch, this.key); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public String getOpType() { + return DELETE_OP; + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. + */ + private final class CodecBufferPutOperation extends Operation { + private final CodecBuffer key; + private final CodecBuffer value; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value) { + super(); + this.key = key; + this.value = value; + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); + } + + @Override + public int keyLen() { + return key.readableBytes(); + } + + @Override + public int valLen() { + return value.readableBytes(); + } + + @Override + public String getOpType() { + return PUT_OP; + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + value.release(); + } + super.close(); + } + } + + /** + * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. 
+ */ + private final class ByteArrayPutOperation extends Operation { + private final byte[] key; + private final byte[] value; + + private ByteArrayPutOperation(byte[] key, byte[] value) { + super(); + this.key = Objects.requireNonNull(key, "key == null"); + this.value = Objects.requireNonNull(value, "value == null"); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchPut(batch, key, value); + } + + @Override + public int keyLen() { + return key.length; + } + + @Override + public int valLen() { + return value.length; + } + + @Override + public String getOpType() { + return PUT_OP; + } + } + /** Cache and deduplicate db ops (put/delete). */ private class OpCache { /** A (family name -> {@link FamilyCache}) map. */ @@ -143,13 +275,18 @@ private class OpCache { /** A cache for a {@link ColumnFamily}. */ private class FamilyCache { private final ColumnFamily family; + /** - * A (dbKey -> dbValue) map, where the dbKey type is {@link Bytes} - * and the dbValue type is {@link Object}. - * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op. - * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}). + * A mapping of keys to operations for batch processing in the {@link FamilyCache}. + * The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer + * for efficient equality and hashing. The values are instances of {@link Operation}, representing + * different types of operations that can be applied to a {@link ColumnFamily}. + * + * This field is intended to store pending batch updates before they are written to the database. + * It supports operations such as additions and deletions while maintaining the ability to overwrite + * existing entries when necessary. */ - private final Map ops = new HashMap<>(); + private final Map batchOps = new HashMap<>(); private boolean isCommit; private long batchSize; @@ -166,22 +303,9 @@ private class FamilyCache { void prepareBatchWrite() throws RocksDatabaseException { Preconditions.checkState(!isCommit, "%s is already committed.", this); isCommit = true; - for (Map.Entry op : ops.entrySet()) { - final Bytes key = op.getKey(); - final Object value = op.getValue(); - if (value instanceof byte[]) { - family.batchPut(writeBatch, key.array(), (byte[]) value); - } else if (value instanceof CodecBuffer) { - family.batchPut(writeBatch, key.asReadOnlyByteBuffer(), - ((CodecBuffer) value).asReadOnlyByteBuffer()); - } else if (value == Op.DELETE) { - family.batchDelete(writeBatch, key.array()); - } else { - throw new IllegalStateException("Unexpected value: " + value - + ", class=" + value.getClass().getSimpleName()); - } + for (Operation op : batchOps.values()) { + op.apply(family, writeBatch); } - debug(this::summary); } @@ -194,48 +318,35 @@ void clear() { final boolean warn = !isCommit && batchSize > 0; String details = warn ? 
summary() : null; - for (Object value : ops.values()) { - if (value instanceof CodecBuffer) { - ((CodecBuffer) value).release(); // the key will also be released - } - } - ops.clear(); + IOUtils.close(LOG, batchOps.values()); + batchOps.clear(); if (warn) { LOG.warn("discarding changes {}", details); } } - void putOrDelete(Bytes key, int keyLen, Object val, int valLen) { - Preconditions.checkState(!isCommit, "%s is already committed.", this); - batchSize += keyLen + valLen; - // remove previous first in order to call release() - final Object previous = ops.remove(key); + private void deleteIfExist(Bytes key) { + final Operation previous = batchOps.remove(key); if (previous != null) { - final boolean isPut = previous != Op.DELETE; - final int preLen; - if (!isPut) { - preLen = 0; - } else if (previous instanceof CodecBuffer) { - final CodecBuffer previousValue = (CodecBuffer) previous; - preLen = previousValue.readableBytes(); - previousValue.release(); // key will also be released - } else if (previous instanceof byte[]) { - preLen = ((byte[]) previous).length; - } else { - throw new IllegalStateException("Unexpected previous: " + previous - + ", class=" + previous.getClass().getSimpleName()); - } - discardedSize += keyLen + preLen; + previous.close(); + discardedSize += previous.totalLength(); discardedCount++; - debug(() -> String.format("%s overwriting a previous %s", this, - isPut ? "put (value: " + byteSize2String(preLen) + ")" : "del")); + debug(() -> String.format("%s overwriting a previous %s[valLen => %s]", this, previous.getOpType(), + previous.valLen())); } - final Object overwritten = ops.put(key, val); + } + + void overWriteOpIfExist(Bytes key, Operation operation) { + Preconditions.checkState(!isCommit, "%s is already committed.", this); + deleteIfExist(key); + batchSize += operation.totalLength(); + Operation overwritten = batchOps.put(key, operation); Preconditions.checkState(overwritten == null); debug(() -> String.format("%s %s, %s; key=%s", this, - valLen == 0 ? delString(keyLen) : putString(keyLen, valLen), + DELETE_OP.equals(operation.getOpType()) ? delString(operation.totalLength()) : putString(operation.keyLen(), + operation.valLen()), batchSizeDiscardedString(), key)); } @@ -243,19 +354,20 @@ void put(CodecBuffer key, CodecBuffer value) { putCount++; // always release the key with the value - value.getReleaseFuture().thenAccept(v -> key.release()); - putOrDelete(new Bytes(key), key.readableBytes(), - value, value.readableBytes()); + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value)); } void put(byte[] key, byte[] value) { putCount++; - putOrDelete(new Bytes(key), key.length, value, value.length); + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value)); } void delete(byte[] key) { delCount++; - putOrDelete(new Bytes(key), key.length, Op.DELETE, 0); + Bytes keyBytes = new Bytes(key); + overWriteOpIfExist(keyBytes, new DeleteOperation(key)); } String putString(int keySize, int valueSize) { From deb7900d94dc1f02c8cf96252e84041a641e965c Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 16:40:22 -0500 Subject: [PATCH 3/6] HDDS-14238. 
Move RDBBatchOperation Byte comparison to native comparison for optimization Change-Id: Ia7655ff5148197be488a2c1151ec7fd1d6f9d452 --- .../hdds/utils/db/RDBBatchOperation.java | 73 +++++++++++-------- .../hadoop/hdds/utils/db/TestCodec.java | 18 +++-- pom.xml | 1 + 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 1d2f9ef456d9..663a395f8aa2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -17,8 +17,6 @@ package org.apache.hadoop.hdds.utils.db; -import static org.apache.hadoop.hdds.StringUtils.bytes2String; - import com.google.common.base.Preconditions; import java.io.Closeable; import java.nio.ByteBuffer; @@ -30,10 +28,13 @@ import java.util.function.Supplier; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions; import org.apache.ratis.util.TraditionalBinaryPrefix; import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.AbstractSlice; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,26 +84,33 @@ private static String countSize2String(int count, long size) { * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes { - private final byte[] array; - private final CodecBuffer buffer; + static final class Bytes implements Closeable { + private AbstractSlice slice; /** Cache the hash value. */ - private final int hash; + private int hash; Bytes(CodecBuffer buffer) { - this.array = null; - this.buffer = Objects.requireNonNull(buffer, "buffer == null"); - this.hash = buffer.asReadOnlyByteBuffer().hashCode(); + Objects.requireNonNull(buffer, "buffer == null"); + if (buffer.isDirect()) { + initWithDirectByteBuffer(buffer.asReadOnlyByteBuffer()); + } else { + initWithByteArray(buffer.getArray()); + } } Bytes(byte[] array) { - this.array = array; - this.buffer = null; + Objects.requireNonNull(array, "array == null"); + initWithByteArray(array); + } + + private void initWithByteArray(byte[] array) { + this.slice = new ManagedSlice(array); this.hash = ByteBuffer.wrap(array).hashCode(); } - ByteBuffer asReadOnlyByteBuffer() { - return buffer.asReadOnlyByteBuffer(); + private void initWithDirectByteBuffer(ByteBuffer byteBuffer) { + this.slice = new ManagedDirectSlice(byteBuffer); + this.hash = byteBuffer.hashCode(); } @Override @@ -116,11 +124,7 @@ public boolean equals(Object obj) { if (this.hash != that.hash) { return false; } - final ByteBuffer thisBuf = this.array != null ? - ByteBuffer.wrap(this.array) : this.asReadOnlyByteBuffer(); - final ByteBuffer thatBuf = that.array != null ? - ByteBuffer.wrap(that.array) : that.asReadOnlyByteBuffer(); - return thisBuf.equals(thatBuf); + return slice.equals(that.slice); } @Override @@ -130,14 +134,20 @@ public int hashCode() { @Override public String toString() { - return array != null ? 
bytes2String(array) - : bytes2String(asReadOnlyByteBuffer()); + return slice.toString(); + } + + @Override + public void close() { + slice.close(); } } private abstract class Operation implements Closeable { + private final Bytes keyBytes; - private Operation() { + private Operation(Bytes keyBytes) { + this.keyBytes = keyBytes; } abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; @@ -154,6 +164,9 @@ int totalLength() { @Override public void close() { + if (keyBytes != null) { + keyBytes.close(); + } } } @@ -163,8 +176,8 @@ public void close() { private final class DeleteOperation extends Operation { private final byte[] key; - private DeleteOperation(byte[] key) { - super(); + private DeleteOperation(byte[] key, Bytes keyBytes) { + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); this.key = Objects.requireNonNull(key, "key == null"); } @@ -197,8 +210,8 @@ private final class CodecBufferPutOperation extends Operation { private final CodecBuffer value; private final AtomicBoolean closed = new AtomicBoolean(false); - private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value) { - super(); + private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + super(keyBytes); this.key = key; this.value = value; } @@ -240,8 +253,8 @@ private final class ByteArrayPutOperation extends Operation { private final byte[] key; private final byte[] value; - private ByteArrayPutOperation(byte[] key, byte[] value) { - super(); + private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { + super(keyBytes); this.key = Objects.requireNonNull(key, "key == null"); this.value = Objects.requireNonNull(value, "value == null"); } @@ -355,19 +368,19 @@ void put(CodecBuffer key, CodecBuffer value) { // always release the key with the value Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value)); + overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); } void put(byte[] key, byte[] value) { putCount++; Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value)); + overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); } void delete(byte[] key) { delCount++; Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new DeleteOperation(key)); + overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); } String putString(int keySize, int valueSize) { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java index f695f2864055..88eca2b02f6b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestCodec.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.common.primitives.Shorts; @@ -35,6 +36,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; import org.apache.hadoop.hdds.utils.db.RDBBatchOperation.Bytes; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; 
import org.slf4j.Logger; @@ -49,6 +51,7 @@ public final class TestCodec { static { CodecBuffer.enableLeakDetection(); + ManagedRocksObjectUtils.loadRocksDBLibrary(); } @Test @@ -295,14 +298,13 @@ public static void runTest(Codec codec, T original, static void runTestBytes(T object, Codec codec) throws IOException { final byte[] array = codec.toPersistedFormat(object); final Bytes fromArray = new Bytes(array); - - try (CodecBuffer buffer = codec.toCodecBuffer(object, - CodecBuffer.Allocator.HEAP)) { - final Bytes fromBuffer = new Bytes(buffer); - - assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); - assertEquals(fromArray, fromBuffer); - assertEquals(fromBuffer, fromArray); + for (CodecBuffer.Allocator allocator : ImmutableList.of(CodecBuffer.Allocator.HEAP, CodecBuffer.Allocator.DIRECT)) { + try (CodecBuffer buffer = codec.toCodecBuffer(object, allocator)) { + final Bytes fromBuffer = new Bytes(buffer); + assertEquals(fromArray.hashCode(), fromBuffer.hashCode()); + assertEquals(fromArray, fromBuffer); + assertEquals(fromBuffer, fromArray); + } } } } diff --git a/pom.xml b/pom.xml index 73ed23ae01ff..fef4c5177d0b 100644 --- a/pom.xml +++ b/pom.xml @@ -1965,6 +1965,7 @@ org.rocksdb.ColumnFamilyHandle org.rocksdb.Env org.rocksdb.Statistics + org.rocksdb.AbstractSlice org.rocksdb.RocksDB.* From ee1dad3613afe5448cb83782f89f100c2e63a848 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 16:44:57 -0500 Subject: [PATCH 4/6] HDDS-14166. Completely get rid of byte array operations from RDBBatchOperation Change-Id: I4a7c62d8cc91173a1374ba5f3515f3c9ac99376f --- .../hdds/utils/db/CodecBufferCodec.java | 6 +- .../hdds/utils/db/RDBBatchOperation.java | 79 +++++++++---------- .../apache/hadoop/hdds/utils/db/RDBTable.java | 8 ++ .../hadoop/hdds/utils/db/RocksDatabase.java | 16 +--- .../hadoop/hdds/utils/db/TypedTable.java | 14 +++- 5 files changed, 61 insertions(+), 62 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java index 9d2944fab66e..416cc8bb9c16 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java @@ -39,12 +39,12 @@ */ public final class CodecBufferCodec implements Codec { - private static final Codec DIRECT_INSTANCE = new CodecBufferCodec(true); - private static final Codec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); + private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true); + private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); private final CodecBuffer.Allocator allocator; - public static Codec get(boolean direct) { + public static CodecBufferCodec get(boolean direct) { return direct ? 
DIRECT_INSTANCE : NON_DIRECT_INSTANCE; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 663a395f8aa2..ecc9d37d7756 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -50,6 +50,7 @@ public final class RDBBatchOperation implements BatchOperation { private static final String DELETE_OP = "DELETE"; private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); @@ -174,21 +175,22 @@ public void close() { * Delete operation to be applied to a {@link ColumnFamily} batch. */ private final class DeleteOperation extends Operation { - private final byte[] key; + private final CodecBuffer key; + private final AtomicBoolean closed = new AtomicBoolean(false); - private DeleteOperation(byte[] key, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes, "keyBytes == null")); + private DeleteOperation(CodecBuffer key, Bytes keyBytes) { + super(keyBytes); this.key = Objects.requireNonNull(key, "key == null"); } @Override public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDelete(batch, this.key); + family.batchDelete(batch, this.key.asReadOnlyByteBuffer()); } @Override public int keyLen() { - return key.length; + return key.readableBytes(); } @Override @@ -200,17 +202,25 @@ public int valLen() { public String getOpType() { return DELETE_OP; } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + key.release(); + } + super.close(); + } } /** * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. */ - private final class CodecBufferPutOperation extends Operation { + private final class PutOperation extends Operation { private final CodecBuffer key; private final CodecBuffer value; private final AtomicBoolean closed = new AtomicBoolean(false); - private CodecBufferPutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { + private PutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { super(keyBytes); this.key = key; this.value = value; @@ -246,40 +256,6 @@ public void close() { } } - /** - * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. - */ - private final class ByteArrayPutOperation extends Operation { - private final byte[] key; - private final byte[] value; - - private ByteArrayPutOperation(byte[] key, byte[] value, Bytes keyBytes) { - super(keyBytes); - this.key = Objects.requireNonNull(key, "key == null"); - this.value = Objects.requireNonNull(value, "value == null"); - } - - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key, value); - } - - @Override - public int keyLen() { - return key.length; - } - - @Override - public int valLen() { - return value.length; - } - - @Override - public String getOpType() { - return PUT_OP; - } - } - /** Cache and deduplicate db ops (put/delete). */ private class OpCache { /** A (family name -> {@link FamilyCache}) map. 
*/ @@ -368,16 +344,25 @@ void put(CodecBuffer key, CodecBuffer value) { // always release the key with the value Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new CodecBufferPutOperation(key, value, keyBytes)); + overWriteOpIfExist(keyBytes, new PutOperation(key, value, keyBytes)); } void put(byte[] key, byte[] value) { putCount++; + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value); Bytes keyBytes = new Bytes(key); - overWriteOpIfExist(keyBytes, new ByteArrayPutOperation(key, value, keyBytes)); + overWriteOpIfExist(keyBytes, new PutOperation(keyBuffer, valueBuffer, keyBytes)); } void delete(byte[] key) { + delCount++; + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + Bytes keyBytes = new Bytes(keyBuffer); + overWriteOpIfExist(keyBytes, new DeleteOperation(keyBuffer, keyBytes)); + } + + void delete(CodecBuffer key) { delCount++; Bytes keyBytes = new Bytes(key); overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); @@ -420,6 +405,10 @@ void delete(ColumnFamily family, byte[] key) { .delete(key); } + void delete(ColumnFamily family, CodecBuffer key) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); + } + /** Prepare batch write for the entire cache. */ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { for (Map.Entry e : name2cache.entrySet()) { @@ -496,6 +485,10 @@ public void delete(ColumnFamily family, byte[] key) { opCache.delete(family, key); } + public void delete(ColumnFamily family, CodecBuffer key) { + opCache.delete(family, key); + } + public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { opCache.put(family, key, value); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f732735cbe35..2aef5daa3c90 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce db.deleteRange(family, beginKey, endKey); } + void deleteWithBatch(BatchOperation batch, CodecBuffer key) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).delete(family, key); + } else { + throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName()); + } + } + @Override public void deleteWithBatch(BatchOperation batch, byte[] key) { if (batch instanceof RDBBatchOperation) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 659954a861bd..5aff93518044 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() { return handle; } - public void batchDelete(ManagedWriteBatch writeBatch, byte[] key) + public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { writeBatch.delete(getHandle(), key); @@ -308,20 +308,6 @@ public void 
batchDelete(ManagedWriteBatch writeBatch, byte[] key) } } - public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value) - throws RocksDatabaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("batchPut array key {}", bytes2String(key)); - LOG.debug("batchPut array value {}", bytes2String(value)); - } - - try (UncheckedAutoCloseable ignored = acquire()) { - writeBatch.put(getHandle(), key, value); - } catch (RocksDBException e) { - throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e); - } - } - public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key, ByteBuffer value) throws RocksDatabaseException { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 8000d48c6187..59e924529ce4 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException { @Override public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { - rawTable.deleteWithBatch(batch, encodeKey(key)); + if (supportCodecBuffer) { + CodecBuffer keyBuffer = null; + try { + keyBuffer = keyCodec.toDirectCodecBuffer(key); + // The buffers will be released after commit. + rawTable.deleteWithBatch(batch, keyBuffer); + } catch (Exception e) { + IOUtils.closeQuietly(keyBuffer); + throw e; + } + } else { + rawTable.deleteWithBatch(batch, encodeKey(key)); + } } @Override From 78e1c8dd331d0d7f7229fd609d138338ced7c147 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Wed, 24 Dec 2025 18:57:53 -0500 Subject: [PATCH 5/6] HDDS-14239. 
Implement DeleteRange with batch op with minimal code changes and simple logic Change-Id: If659fd2cf71d76909cb81c4362e2e02d85930635 --- .../container/metadata/DatanodeTable.java | 5 + hadoop-hdds/framework/pom.xml | 6 + .../hdds/utils/db/RDBBatchOperation.java | 221 ++++++++++++++++-- .../apache/hadoop/hdds/utils/db/RDBTable.java | 9 + .../hadoop/hdds/utils/db/RocksDatabase.java | 10 + .../apache/hadoop/hdds/utils/db/Table.java | 8 + .../hadoop/hdds/utils/db/TypedTable.java | 5 + .../hdds/utils/db/InMemoryTestTable.java | 5 + .../hdds/utils/db/TestRDBBatchOperation.java | 132 +++++++++++ .../hdds/utils/db/TestRDBTableStore.java | 84 ++++++- hadoop-hdds/managed-rocksdb/pom.xml | 22 +- .../TrackingUtilManagedWriteBatch.java | 195 ++++++++++++++++ .../hadoop/ozone/om/OmSnapshotManager.java | 7 +- pom.xml | 6 + 14 files changed, 676 insertions(+), 39 deletions(-) create mode 100644 hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java create mode 100644 hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index ed7a05027e8b..5b39147f3e29 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -73,6 +73,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException table.deleteWithBatch(batch, key); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + table.deleteRangeWithBatch(batch, beginKey, endKey); + } + @Override public final KeyValueIterator iterator(KEY prefix, IteratorType type) { throw new UnsupportedOperationException("Iterating tables directly is not" + diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index ea7f08edd82b..27654930c412 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -307,6 +307,12 @@ test-jar test + + org.apache.ozone + hdds-managed-rocksdb + test-jar + test + org.apache.ozone hdds-test-utils diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index ecc9d37d7756..d3cd4ed7224a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -20,12 +20,17 @@ import com.google.common.base.Preconditions; import java.io.Closeable; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; @@ -48,6 +53,7 @@ public final class RDBBatchOperation 
implements BatchOperation { private static final String PUT_OP = "PUT"; private static final String DELETE_OP = "DELETE"; + private static final String DELETE_RANGE_OP = "DELETE_RANGE"; private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); @@ -85,7 +91,7 @@ private static String countSize2String(int count, long size) { * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes implements Closeable { + static final class Bytes implements Comparable, Closeable { private AbstractSlice slice; /** Cache the hash value. */ private int hash; @@ -138,6 +144,12 @@ public String toString() { return slice.toString(); } + // This method mimics the ByteWiseComparator in RocksDB. + @Override + public int compareTo(RDBBatchOperation.Bytes that) { + return this.slice.compare(that.slice); + } + @Override public void close() { slice.close(); @@ -157,6 +169,10 @@ private Operation(Bytes keyBytes) { abstract int valLen(); + Bytes getKey() { + return keyBytes; + } + int totalLength() { return keyLen() + valLen(); } @@ -179,7 +195,7 @@ private final class DeleteOperation extends Operation { private final AtomicBoolean closed = new AtomicBoolean(false); private DeleteOperation(CodecBuffer key, Bytes keyBytes) { - super(keyBytes); + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); this.key = Objects.requireNonNull(key, "key == null"); } @@ -221,7 +237,7 @@ private final class PutOperation extends Operation { private final AtomicBoolean closed = new AtomicBoolean(false); private PutOperation(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { - super(keyBytes); + super(Objects.requireNonNull(keyBytes, "keyBytes == null")); this.key = key; this.value = value; } @@ -256,6 +272,55 @@ public void close() { } } + /** + * Delete range operation to be applied to a {@link ColumnFamily} batch. + */ + private final class DeleteRangeOperation extends Operation { + private final byte[] startKey; + private final byte[] endKey; + private final Bytes startKeyBytes; + private final Bytes endKeyBytes; + + private DeleteRangeOperation(byte[] startKey, byte[] endKey) { + super(null); + this.startKey = Objects.requireNonNull(startKey, "startKey == null"); + this.endKey = Objects.requireNonNull(endKey, "endKey == null"); + this.startKeyBytes = new Bytes(startKey); + this.endKeyBytes = new Bytes(endKey); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDeleteRange(batch, startKey, endKey); + } + + @Override + public int keyLen() { + return startKey.length + endKey.length; + } + + @Override + public int valLen() { + return 0; + } + + @Override + public String getOpType() { + return DELETE_RANGE_OP; + } + + @Override + public void close() { + super.close(); + startKeyBytes.close(); + endKeyBytes.close(); + } + + private boolean contains(Bytes key) { + return startKeyBytes.compareTo(key) <= 0 && endKeyBytes.compareTo(key) > 0; + } + } + /** Cache and deduplicate db ops (put/delete). */ private class OpCache { /** A (family name -> {@link FamilyCache}) map. */ @@ -264,18 +329,41 @@ private class OpCache { /** A cache for a {@link ColumnFamily}. */ private class FamilyCache { private final ColumnFamily family; - /** - * A mapping of keys to operations for batch processing in the {@link FamilyCache}. 
- * The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer - * for efficient equality and hashing. The values are instances of {@link Operation}, representing - * different types of operations that can be applied to a {@link ColumnFamily}. + * A mapping of operation keys to their respective indices in {@code FamilyCache}. + * + * Key details: + * - Maintains a mapping of unique operation keys to their insertion or processing order. + * - Used internally to manage and sort operations during batch writes. + * - Facilitates filtering, overwriting, or deletion of operations based on their keys. + * + * Constraints: + * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. + * - Each key is associated with a unique integer index to track insertion order. + * + * This field plays a critical role in managing the logical consistency and proper execution + * order of operations stored in the batch when interacting with a RocksDB-backed system. + */ + private final Map opsKeys = new HashMap<>(); + /** + * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. * - * This field is intended to store pending batch updates before they are written to the database. - * It supports operations such as additions and deletions while maintaining the ability to overwrite - * existing entries when necessary. + * This map serves as the primary container for recording operations in preparation for a batch write + * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines + * its insertion order and ensures correct sequencing during batch execution. + * + * Key characteristics: + * - Stores operations of type {@code Operation}. + * - Uses a unique integer key (index) for mapping each operation. + * - Serves as an intermediary structure during batch preparation and execution. + * + * Usage context: + * - This map is managed as part of the batch-writing process, which involves organizing, + * filtering, and applying multiple operations in a single cohesive batch. + * - Operations stored in this map are expected to define specific actions (e.g., put, delete, + * delete range) and their associated data (e.g., keys, values). */ - private final Map batchOps = new HashMap<>(); + private final Map batchOps = new HashMap<>(); private boolean isCommit; private long batchSize; @@ -283,17 +371,84 @@ private class FamilyCache { private int discardedCount; private int putCount; private int delCount; + private int delRangeCount; + private AtomicInteger opIndex; FamilyCache(ColumnFamily family) { this.family = family; + this.opIndex = new AtomicInteger(0); + } + + private DeleteRangeOperation findFirstDeleteRangeMatchingRange(Collection deleteRangeOps, + Bytes key) { + for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { + if (deleteRangeOp.contains(key)) { + return deleteRangeOp; + } + } + return null; } - /** Prepare batch write for the entire family. */ + /** + * Prepares a batch write operation for a RocksDB-backed system. + * + * This method ensures the orderly execution of operations accumulated in the batch, + * respecting their respective types and order of insertion. + * + * Key functionalities: + * 1. Ensures that the batch is not already committed before proceeding. + * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order. + * 3. 
Filters and adapts operations to account for any delete range operations that might + * affect other operations in the batch: + * - Operations with keys that fall within the range specified by a delete range operation + * are discarded. + * - Delete range operations are executed in their correct order. + * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. + * 5. Logs a summary of the batch execution for debugging purposes. + * + * Throws: + * - RocksDatabaseException if any error occurs while applying operations to the write batch. + * + * Prerequisites: + * - The method assumes that the operations are represented by `Operation` objects, each of which + * encapsulates the logic for its specific type. + * - Delete range operations must be represented by the `DeleteRangeOperation` class. + */ void prepareBatchWrite() throws RocksDatabaseException { Preconditions.checkState(!isCommit, "%s is already committed.", this); isCommit = true; - for (Operation op : batchOps.values()) { - op.apply(family, writeBatch); + // Sort Entries based on opIndex and flush the operation to the batch in the same order. + List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) + .map(Map.Entry::getValue).collect(Collectors.toList()); + TreeMap deleteRangeIndices = new TreeMap<>(); + int index = 0; + for (Operation op : ops) { + if (DELETE_RANGE_OP.equals(op.getOpType())) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + deleteRangeIndices.put(index, deleteRangeOp); + } + index++; + } + + for (int idx = 0; idx < ops.size(); idx++) { + Operation op = ops.get(idx); + if (DELETE_RANGE_OP.equals(op.getOpType())) { + op.apply(family, writeBatch); + } else { + // Find the first delete range op matching which would contain the key after the + // operation has occurred. If there is no such operation then perform the operation otherwise discard the + // op. + DeleteRangeOperation deleteRangeOp = findFirstDeleteRangeMatchingRange( + deleteRangeIndices.tailMap(idx, false).values(), op.getKey()); + if (deleteRangeOp == null) { + op.apply(family, writeBatch); + } else { + debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + op.getKey(), deleteRangeOp.startKeyBytes, deleteRangeOp.endKeyBytes)); + discardedCount++; + discardedSize += op.totalLength(); + } + } } debug(this::summary); } @@ -316,8 +471,10 @@ void clear() { } private void deleteIfExist(Bytes key) { - final Operation previous = batchOps.remove(key); - if (previous != null) { + // remove previous first in order to call release() + if (opsKeys.containsKey(key)) { + int previousIndex = opsKeys.remove(key); + final Operation previous = batchOps.remove(previousIndex); previous.close(); discardedSize += previous.totalLength(); discardedCount++; @@ -330,9 +487,10 @@ void overWriteOpIfExist(Bytes key, Operation operation) { Preconditions.checkState(!isCommit, "%s is already committed.", this); deleteIfExist(key); batchSize += operation.totalLength(); - Operation overwritten = batchOps.put(key, operation); - Preconditions.checkState(overwritten == null); - + int newIndex = opIndex.getAndIncrement(); + final Integer overwrittenOpKey = opsKeys.put(key, newIndex); + final Operation overwrittenOp = batchOps.put(newIndex, operation); + Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null); debug(() -> String.format("%s %s, %s; key=%s", this, DELETE_OP.equals(operation.getOpType()) ? 
delString(operation.totalLength()) : putString(operation.keyLen(), operation.valLen()), @@ -351,7 +509,7 @@ void put(byte[] key, byte[] value) { putCount++; CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value); - Bytes keyBytes = new Bytes(key); + Bytes keyBytes = new Bytes(keyBuffer); overWriteOpIfExist(keyBytes, new PutOperation(keyBuffer, valueBuffer, keyBytes)); } @@ -359,6 +517,7 @@ void delete(byte[] key) { delCount++; CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); Bytes keyBytes = new Bytes(keyBuffer); + overWriteOpIfExist(keyBytes, new DeleteOperation(keyBuffer, keyBytes)); } @@ -368,6 +527,11 @@ void delete(CodecBuffer key) { overWriteOpIfExist(keyBytes, new DeleteOperation(key, keyBytes)); } + void deleteRange(byte[] startKey, byte[] endKey) { + delRangeCount++; + batchOps.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); + } + String putString(int keySize, int valueSize) { return String.format("put(key: %s, value: %s), #put=%s", byteSize2String(keySize), byteSize2String(valueSize), putCount); @@ -409,6 +573,11 @@ void delete(ColumnFamily family, CodecBuffer key) { name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); } + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .deleteRange(startKey, endKey); + } + /** Prepare batch write for the entire cache. */ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { for (Map.Entry e : name2cache.entrySet()) { @@ -430,6 +599,7 @@ String getCommitString() { int opSize = 0; int discardedCount = 0; int discardedSize = 0; + int delRangeCount = 0; for (FamilyCache f : name2cache.values()) { putCount += f.putCount; @@ -437,12 +607,13 @@ String getCommitString() { opSize += f.batchSize; discardedCount += f.discardedCount; discardedSize += f.discardedSize; + delRangeCount += f.delRangeCount; } final int opCount = putCount + delCount; return String.format( - "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s", - putCount, delCount, + "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", + putCount, delCount, delRangeCount, countSize2String(opCount, opSize), countSize2String(discardedCount, discardedSize), countSize2String(opCount - discardedCount, opSize - discardedSize)); @@ -496,4 +667,8 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { public void put(ColumnFamily family, byte[] key, byte[] value) { opCache.put(family, key, value); } + + public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + opCache.deleteRange(family, startKey, endKey); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 2aef5daa3c90..ec9d900a1d47 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -211,6 +211,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } + @Override + public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey); + } else { + throw 
new IllegalArgumentException("batch should be RDBBatchOperation"); + } + } + @Override public KeyValueIterator iterator(byte[] prefix, IteratorType type) throws RocksDatabaseException { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 5aff93518044..621178f687d5 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -308,6 +308,16 @@ public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key) } } + public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey) + throws RocksDatabaseException { + try (UncheckedAutoCloseable ignored = acquire()) { + writeBatch.deleteRange(getHandle(), beginKey, endKey); + } catch (RocksDBException e) { + throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " + + bytes2String(endKey), e); + } + } + public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key, ByteBuffer value) throws RocksDatabaseException { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index fc0490344062..6904f22d7d8c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException */ void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException; + /** + * Deletes a range of keys from the metadata store as part of a batch operation. + * @param batch Batch operation to perform the delete operation. + * @param beginKey start metadata key, inclusive. + * @param endKey end metadata key, exclusive. + */ + void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException; + /** * Deletes a range of keys from the metadata store. 
* diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 59e924529ce4..bd0f6321b5bc 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -392,6 +392,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException } } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey)); + } + @Override public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException { rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey)); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 1dbb5029713a..7f2ce3fc3a5b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -90,6 +90,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) { throw new UnsupportedOperationException(); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) { + throw new UnsupportedOperationException(); + } + @Override public void deleteRange(KEY beginKey, KEY endKey) { map.subMap(beginKey, endKey).clear(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java new file mode 100644 index 000000000000..4f6db2a45bf2 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType;
+import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+/**
+ * Test class for verifying batch operations with delete ranges using RDBBatchOperation and a
+ * TrackingUtilManagedWriteBatch that records every operation applied to it.
+ *
+ * This test class includes:
+ * - Mocking and tracking of put, delete, and delete range operations within a batch operation.
+ * - Validation of the committed operations using assertions on the collected data.
+ * - Verifying that the batch operation interacts correctly with the RocksDatabase and
+ *   ColumnFamilyHandle components.
+ *
+ * The test method:
+ * 1. Sets up mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily objects.
+ * 2. Mocks their batch methods to forward operations to the tracking write batch, issues puts,
+ *    deletes, and delete ranges, commits the batch, and asserts the recorded operations.
+ */
+public class TestRDBBatchOperation {
+
+  static {
+    ManagedRocksObjectUtils.loadRocksDBLibrary();
+  }
+
+  private static Operation getOperation(String key, String value, OpType opType) {
+    return new Operation(string2Bytes(key), value == null ?
null : string2Bytes(value), opType); + } + + @Test + public void testBatchOperationWithDeleteRange() throws RocksDatabaseException, CodecException, RocksDBException { + try (TrackingUtilManagedWriteBatch writeBatch = new TrackingUtilManagedWriteBatch(); + RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) { + ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class); + RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class); + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2)); + return null; + }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class)); + + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2)); + return null; + }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class)); + + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1)); + return null; + }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class)); + + when(columnFamily.getHandle()).thenReturn(columnFamilyHandle); + when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test")); + when(columnFamily.getName()).thenReturn("test"); + Codec codec = StringCodec.get(); + // OP1 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01")); + // OP2 + batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02")); + // OP3 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03")); + // OP4 + batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04")); + // OP5 + batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05")); + // OP6 : This delete operation should get skipped because of OP11 + batchOperation.delete(columnFamily, codec.toPersistedFormat("key10")); + // OP7 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key01"), codec.toPersistedFormat("key02")); + // OP8 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("key03")); + // OP9 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04")); + // OP10 + batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05")); + // OP11 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("key12")); + // OP12 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key09"), codec.toPersistedFormat("key10")); + + RocksDatabase db = Mockito.mock(RocksDatabase.class); + doNothing().when(db).batchWrite(any()); + batchOperation.commit(db); + List expectedOps = ImmutableList.of( + getOperation("key03", "value04", OpType.PUT_DIRECT), + getOperation("key05", null, OpType.DELETE_DIRECT), + getOperation("key01", "key02", OpType.DELETE_RANGE_INDIRECT), + getOperation("key02", "key03", OpType.DELETE_RANGE_INDIRECT), + getOperation("key04", "value04", OpType.PUT_DIRECT), + getOperation("key06", "key12", OpType.DELETE_RANGE_INDIRECT), + getOperation("key09", "key10", OpType.DELETE_RANGE_INDIRECT)); + 
assertEquals(ImmutableMap.of("test", expectedOps), writeBatch.getOperations()); + } + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 2741834c9d75..cd155f27a96f 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.hadoop.hdds.StringUtils.bytes2String; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -66,7 +67,7 @@ public class TestRDBTableStore { public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80; private static int count = 0; private final List families = - Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), + Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), "First", "Second", "Third", "Fourth", "Fifth", "Sixth", "Seventh", @@ -635,21 +636,21 @@ public void testPrefixedRangeKVs() throws Exception { // test start with a middle key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + "3"); + bytes2String(samplePrefix) + "3"); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix); assertEquals(2, rangeKVs.size()); // test with a filter - final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1"); + final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1"); startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix)); + bytes2String(samplePrefix)); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix, filter1); assertEquals(1, rangeKVs.size()); // test start with a non-exist key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + 123); + bytes2String(samplePrefix) + 123); rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix); assertEquals(0, rangeKVs.size()); } @@ -775,4 +776,77 @@ private void populateTable(Table table, } } } + + @Test + public void batchDeleteWithRange() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value = + RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8); + testTable.put(startKey, value); + testTable.put(keyInRange1, value); + testTable.put(keyInRange2, value); + testTable.put(endKey, value); + assertNotNull(testTable.get(startKey)); + assertNotNull(testTable.get(keyInRange1)); + assertNotNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + + //when + testTable.deleteRangeWithBatch(batch, startKey, endKey); + rdbStore.commitBatchOperation(batch); + + //then + assertNull(testTable.get(startKey)); + assertNull(testTable.get(keyInRange1)); + assertNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + } + } + + @Test + 
public void orderOfBatchOperations() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value1 = ("value1-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value2 = ("value2-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value3 = ("value3-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + + //when + testTable.putWithBatch(batch, startKey, value1); + testTable.putWithBatch(batch, keyInRange1, value1); + testTable.deleteWithBatch(batch, keyInRange1); + // ops map key should be <, 1> + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value2); + testTable.putWithBatch(batch, keyInRange1, value2); + // ops map key is <, 2>. + testTable.deleteRangeWithBatch(batch, startKey, keyInRange1); + testTable.putWithBatch(batch, endKey, value1); + testTable.putWithBatch(batch, endKey, value2); + // ops map key is <, 3>. + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value3); + + rdbStore.commitBatchOperation(batch); + + //then + assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey))); + assertNull(testTable.get(keyInRange1)); + assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey))); + } + } } diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml index 5e6976500f96..1a1fb3a82be6 100644 --- a/hadoop-hdds/managed-rocksdb/pom.xml +++ b/hadoop-hdds/managed-rocksdb/pom.xml @@ -25,11 +25,6 @@ Apache Ozone HDDS Managed RocksDB Apache Ozone Managed RocksDB library - - - true - - com.google.guava @@ -63,6 +58,11 @@ org.slf4j slf4j-api + + org.apache.commons + commons-lang3 + test + @@ -74,6 +74,18 @@ none + + org.apache.maven.plugins + maven-jar-plugin + + + test-jar + + test-jar + + + + diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java new file mode 100644 index 000000000000..eca3977d1efa --- /dev/null +++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatch.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db.managed; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** + * The TrackingUtilManagedWriteBatch class extends ManagedWriteBatch to provide functionality + * for tracking operations in a managed write batch context. Operations such as put, delete, + * merge, and delete range are managed and tracked, along with their corresponding operation types. + * + * This class supports direct and indirect operation types, delineated in the OpType enumeration. + * Direct operations are created using ByteBuffers while indirect operations are created using + * byte arrays. + */ +public class TrackingUtilManagedWriteBatch extends ManagedWriteBatch { + + private final Map> operations = new HashMap<>(); + + /** + * The OpType enumeration defines the different types of operations performed in a batch. + */ + public enum OpType { + PUT_DIRECT, + DELETE_DIRECT, + MERGE_DIRECT, + DELETE_RANGE_INDIRECT, + PUT_INDIRECT, + DELETE_INDIRECT, + MERGE_INDIRECT, + } + + /** + * The Operation class represents an individual operation to be performed in the context of + * a batch operation, such as a database write, delete, or merge. Each operation is characterized + * by a key, value, and an operation type (OpType). + * + * Operations can be of different types, as defined in the OpType enumeration, which include + * actions such as put, delete, merge, and delete range, either direct or indirect. + */ + public static class Operation { + private final byte[] key; + private final byte[] value; + private final OpType opType; + + public Operation(byte[] key, byte[] value, OpType opType) { + this.key = Arrays.copyOf(key, key.length); + this.value = value == null ? null : Arrays.copyOf(value, value.length); + this.opType = opType; + } + + public Operation(byte[] key, OpType opType) { + this(key, null, opType); + } + + @Override + public final boolean equals(Object o) { + if (!(o instanceof Operation)) { + return false; + } + + Operation operation = (Operation) o; + return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) && + opType == operation.opType; + } + + @Override + public final int hashCode() { + return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode(); + } + + @Override + public String toString() { + return "Operation{" + + "key=" + bytes2String(key) + + ", value=" + (value == null ? 
null : bytes2String(value)) + + ", opType=" + opType + + '}'; + } + } + + public Map> getOperations() { + return operations; + } + + public TrackingUtilManagedWriteBatch() { + super(); + } + + private byte[] convert(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, OpType.DELETE_INDIRECT)); + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void delete(byte[] key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, OpType.DELETE_INDIRECT)); + } + + @Override + public void delete(ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void deleteRange(byte[] beginKey, byte[] endKey) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey) + throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_INDIRECT)); + } + + @Override + public void merge(byte[] key, byte[] value) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_INDIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.PUT_INDIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void put(byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, value, OpType.PUT_INDIRECT)); + } + + @Override + public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void close() { + super.close(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index b488997b5228..77ed8a824874 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; @@ -584,11 +583,7 @@ private static void deleteKeysFromTableWithBucketPrefix(OMMetadataManager metada String endKey = getLexicographicallyHigherString(prefix); LOG.debug("Deleting key range from {} - startKey: {}, endKey: {}", table.getName(), prefix, endKey); - try (TableIterator itr = table.keyIterator(prefix)) { - while (itr.hasNext()) { - table.deleteWithBatch(batchOperation, itr.next()); - } - } + table.deleteRangeWithBatch(batchOperation, prefix, endKey); } @VisibleForTesting diff --git a/pom.xml b/pom.xml index fef4c5177d0b..d68b95b6982e 100644 --- a/pom.xml +++ b/pom.xml @@ -1075,6 +1075,12 @@ hdds-managed-rocksdb ${hdds.version} + + org.apache.ozone + hdds-managed-rocksdb + ${hdds.version} + test-jar + org.apache.ozone hdds-rocks-native From 9c927e5261b6ca748e8132c9b2da5215b5e6b5c9 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Thu, 25 Dec 2025 09:18:21 -0500 Subject: [PATCH 6/6] HDDS-14245. Optimize Prepare Batch looping to optimize delete range search using prefix count index structure Change-Id: I704dab8380d1dd35faa2b9453ef90b439937e95b --- .../hdds/utils/db/RDBBatchOperation.java | 81 ++++--- .../hadoop/hdds/utils/db/RangeQueryIndex.java | 191 +++++++++++++++ .../hdds/utils/db/TestRangeQueryIndex.java | 217 ++++++++++++++++++ 3 files changed, 448 insertions(+), 41 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java create mode 100644 hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index d3cd4ed7224a..808a34ca45f2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -19,14 +19,15 @@ import com.google.common.base.Preconditions; import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.TreeMap; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -185,6 +186,11 @@ public void close() { keyBytes.close(); } } + + @Override + public String toString() { + return getOpType() + ", key=" + keyBytes; + } } /** @@ -278,15 +284,13 @@ public void close() { private final class DeleteRangeOperation extends Operation { private final byte[] startKey; private final byte[] endKey; - private final Bytes startKeyBytes; - private final Bytes endKeyBytes; + private final RangeQueryIndex.Range 
rangeEntry; private DeleteRangeOperation(byte[] startKey, byte[] endKey) { super(null); this.startKey = Objects.requireNonNull(startKey, "startKey == null"); this.endKey = Objects.requireNonNull(endKey, "endKey == null"); - this.startKeyBytes = new Bytes(startKey); - this.endKeyBytes = new Bytes(endKey); + this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey)); } @Override @@ -312,12 +316,13 @@ public String getOpType() { @Override public void close() { super.close(); - startKeyBytes.close(); - endKeyBytes.close(); + rangeEntry.getStartInclusive().close(); + rangeEntry.getEndExclusive().close(); } - private boolean contains(Bytes key) { - return startKeyBytes.compareTo(key) <= 0 && endKeyBytes.compareTo(key) > 0; + @Override + public String toString() { + return getOpType() + ", rangeEntry=" + rangeEntry; } } @@ -379,16 +384,6 @@ private class FamilyCache { this.opIndex = new AtomicInteger(0); } - private DeleteRangeOperation findFirstDeleteRangeMatchingRange(Collection deleteRangeOps, - Bytes key) { - for (DeleteRangeOperation deleteRangeOp : deleteRangeOps) { - if (deleteRangeOp.contains(key)) { - return deleteRangeOp; - } - } - return null; - } - /** * Prepares a batch write operation for a RocksDB-backed system. * @@ -420,37 +415,41 @@ void prepareBatchWrite() throws RocksDatabaseException { // Sort Entries based on opIndex and flush the operation to the batch in the same order. List ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) .map(Map.Entry::getValue).collect(Collectors.toList()); - TreeMap deleteRangeIndices = new TreeMap<>(); - int index = 0; + Set> deleteRangeEntries = new HashSet<>(); for (Operation op : ops) { if (DELETE_RANGE_OP.equals(op.getOpType())) { DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; - deleteRangeIndices.put(index, deleteRangeOp); + deleteRangeEntries.add(deleteRangeOp.rangeEntry); } - index++; } - - for (int idx = 0; idx < ops.size(); idx++) { - Operation op = ops.get(idx); - if (DELETE_RANGE_OP.equals(op.getOpType())) { - op.apply(family, writeBatch); - } else { - // Find the first delete range op matching which would contain the key after the - // operation has occurred. If there is no such operation then perform the operation otherwise discard the - // op. - DeleteRangeOperation deleteRangeOp = findFirstDeleteRangeMatchingRange( - deleteRangeIndices.tailMap(idx, false).values(), op.getKey()); - if (deleteRangeOp == null) { + try { + RangeQueryIndex rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries); + for (Operation op : ops) { + if (DELETE_RANGE_OP.equals(op.getOpType())) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry); op.apply(family, writeBatch); } else { - debug(() -> String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", - op.getKey(), deleteRangeOp.startKeyBytes, deleteRangeOp.endKeyBytes)); - discardedCount++; - discardedSize += op.totalLength(); + // Find a delete range op matching which would contain the key after the + // operation has occurred. If there is no such operation then perform the operation otherwise discard the + // op. 
+ if (!rangeQueryIdx.containsIntersectingRange(op.getKey())) { + op.apply(family, writeBatch); + } else { + debug(() -> { + RangeQueryIndex.Range deleteRangeOp = rangeQueryIdx.getFirstIntersectingRange(op.getKey()); + return String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + op.getKey(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive()); + }); + discardedCount++; + discardedSize += op.totalLength(); + } } } + debug(this::summary); + } catch (IOException e) { + throw new RocksDatabaseException("Failed to prepare batch write", e); } - debug(this::summary); } private String summary() { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java new file mode 100644 index 000000000000..89c6d16905ab --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; + +/** + * An index for answering "does this point fall within any of these ranges?" efficiently. + * + *

+ * <p>The indexed ranges are half-open intervals of the form
+ * {@code [startInclusive, endExclusive)}.
+ *
+ * <p>Core idea (sweep-line / prefix-sum over range boundaries):
+ * Instead of scanning every range on each query, this index stores a sorted map from
+ * boundary points to a running count of "active" ranges at that point.
+ *
+ * <ul>
+ *   <li>For each range {@code [s, e)}, we add a delta {@code +1} at {@code s} and a delta
+ * {@code -1} at {@code e}.</li>
+ *   <li>We then convert the deltas into a prefix sum in key order, so every boundary key
+ * stores the number of ranges active at that coordinate.</li>
+ *   <li>For any query point {@code k}, the active count is {@code floorEntry(k).value}.
+ * If it is {@code > 0}, then {@code k} intersects at least one range.</li>
+ * </ul>
+ *
+ * <p>Update model: this index supports only removing ranges that were part of the
+ * initial set. Removal updates the prefix sums for keys in {@code [startInclusive, endExclusive)}
+ * (net effect of removing {@code +1} at start and {@code -1} at end).
+ *
+ * <p>Complexities:
+ * <ul>
+ *   <li>Build: {@code O(R log B)} where {@code R} is #ranges and {@code B} is #distinct boundaries.</li>
+ *   <li>{@link #containsIntersectingRange(Object)}: {@code O(log B)}.</li>
+ *   <li>{@link #removeRange(Range)}: {@code O(log B + K)} where {@code K} is #boundaries in the range.</li>
+ * </ul>
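+ *
+ * <p>Illustrative usage (a minimal sketch; {@code Integer} boundaries and {@code java.util}
+ * collections are used here only for the example, while the batch-write path in
+ * RDBBatchOperation indexes {@code Bytes} ranges):
+ * <pre>{@code
+ *   Range<Integer> r1 = new Range<>(10, 20);   // [10, 20)
+ *   Range<Integer> r2 = new Range<>(30, 40);   // [30, 40)
+ *   RangeQueryIndex<Integer> index = new RangeQueryIndex<>(new HashSet<>(Arrays.asList(r1, r2)));
+ *   index.containsIntersectingRange(15);   // true: 15 falls inside [10, 20)
+ *   index.containsIntersectingRange(20);   // false: the end boundary is exclusive
+ *   index.removeRange(r1);                 // only ranges from the initial set can be removed
+ *   index.containsIntersectingRange(15);   // false once [10, 20) is gone
+ * }</pre>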
+ *
+ * @param <T> boundary type (must be {@link Comparable} to be stored in a {@link TreeMap})
+ */
+class RangeQueryIndex<T extends Comparable<T>> {
+
+  private final TreeMap<T, Integer> rangeCountIndexMap;
+  private final Set<Range<T>> ranges;
+
+  RangeQueryIndex(Set<Range<T>> ranges) {
+    this.rangeCountIndexMap = new TreeMap<>();
+    this.ranges = ranges;
+    init();
+  }
+
+  private void init() {
+    // Phase 1: store boundary deltas (+1 at start, -1 at end).
+    for (Range<T> range : ranges) {
+      rangeCountIndexMap.compute(range.startInclusive, (k, v) -> v == null ? 1 : v + 1);
+      rangeCountIndexMap.compute(range.endExclusive, (k, v) -> v == null ? -1 : v - 1);
+    }
+
+    // Phase 2: convert deltas to prefix sums so each key holds the active range count at that coordinate.
+    int totalCount = 0;
+    for (Map.Entry<T, Integer> entry : rangeCountIndexMap.entrySet()) {
+      totalCount += entry.getValue();
+      entry.setValue(totalCount);
+    }
+  }
+
+  /**
+   * Remove a range from the index.
+   *

+   * <p>This method assumes the range set is "popped" over time (ranges are removed but not added).
+   * Internally, removing {@code [s, e)} decreases the active count by 1 for all boundary keys in
+   * {@code [s, e)} and leaves counts outside the range unchanged.
+   *
+   * @throws IOException if the given {@code range} is not part of the indexed set
+   */
+  void removeRange(Range<T> range) throws IOException {
+    if (!ranges.contains(range)) {
+      throw new IOException(String.format("Range %s not found in index structure : %s", range, ranges));
+    }
+    ranges.remove(range);
+    for (Map.Entry<T, Integer> entry : rangeCountIndexMap.subMap(range.startInclusive, true,
+        range.endExclusive, false).entrySet()) {
+      entry.setValue(entry.getValue() - 1);
+    }
+  }
+
+  /**
+   * @return true iff {@code key} is contained in at least one indexed range.
+   *
+   * <p>Implementation detail: uses {@link TreeMap#floorEntry(Object)} to find the last boundary
+   * at or before {@code key}, and checks the prefix-summed active count at that point.
+   */
+  boolean containsIntersectingRange(T key) {
+    Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
+    if (countEntry == null) {
+      return false;
+    }
+    return countEntry.getValue() > 0;
+  }
+
+  /**
+   * Returns an intersecting range containing {@code key}, if any.
+   *
+   * <p>This method first checks {@link #containsIntersectingRange(Comparable)} using the index;
+   * if the count indicates an intersection exists, it then scans the backing {@link #ranges}
+   * set to find a concrete {@link Range} that contains {@code key}.
+   *
+   * <p>Note that because {@link #ranges} is a {@link Set}, "first" refers to whatever iteration
+   * order that set provides (it is not guaranteed to be deterministic unless the provided set is).
+   *
+   * @return a containing range, or null if none intersect
+   */
+  Range<T> getFirstIntersectingRange(T key) {
+    Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
+    if (countEntry == null) {
+      return null;
+    }
+    for (Range<T> range : ranges) {
+      if (range.contains(key)) {
+        return range;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * A half-open interval {@code [startInclusive, endExclusive)}.
+   *

For a value {@code k} to be contained, it must satisfy: + * {@code startInclusive <= k < endExclusive} (according to {@link Comparable#compareTo(Object)}).
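+   *
+   * <p>A quick check of the half-open contract (an illustrative sketch only, with {@code Integer}
+   * boundaries standing in for the real key type):
+   * <pre>{@code
+   *   new Range<>(10, 20).contains(10);  // true:  the start is inclusive
+   *   new Range<>(10, 20).contains(20);  // false: the end is exclusive
+   * }</pre>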

+ */ + static final class Range> { + private final T startInclusive; + private final T endExclusive; + + Range(T startInclusive, T endExclusive) { + this.startInclusive = Objects.requireNonNull(startInclusive, "start == null"); + this.endExclusive = Objects.requireNonNull(endExclusive, "end == null"); + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return Objects.hash(startInclusive, endExclusive); + } + + T getStartInclusive() { + return startInclusive; + } + + T getEndExclusive() { + return endExclusive; + } + + /** + * @return true iff {@code key} is within {@code [startInclusive, endExclusive)}. + */ + public boolean contains(T key) { + return startInclusive.compareTo(key) <= 0 && key.compareTo(endExclusive) < 0; + } + + @Override + public String toString() { + return "Range{" + + "startInclusive=" + startInclusive + + ", endExclusive=" + endExclusive + + '}'; + } + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java new file mode 100644 index 000000000000..aa75d053902e --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; +import org.apache.hadoop.hdds.utils.db.RangeQueryIndex.Range; +import org.junit.jupiter.api.Test; + +/** + * Test class for validating the behavior and functionality of the {@code RangeQueryIndex} class. + * + *

This class contains a collection of unit tests to ensure correct behavior of the range + * indexing system under various scenarios, such as intersections, overlaps, boundary conditions, + * and removal of range objects. The tests leverage the {@code Range} class for defining + * half-open intervals and test different operations provided by the {@code RangeQueryIndex}. + * + *

The tested operations include: + * - Checking for intersecting ranges. + * - Retrieving the first intersecting range. + * - Handling overlaps and nested ranges. + * - Adjacency between ranges. + * - Behaviors when handling duplicate ranges or ranges with identical bounds but different instances. + * - Error conditions when attempting invalid removals of ranges. + */ +public class TestRangeQueryIndex { + + @Test + public void testContainsIntersectingRangeHalfOpenBoundaries() { + final Range r1 = new Range<>(10, 20); // [10, 20) + final Range r2 = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Before first range + assertFalse(index.containsIntersectingRange(0)); + assertFalse(index.containsIntersectingRange(9)); + + // Start is inclusive + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + + // End is exclusive + assertFalse(index.containsIntersectingRange(20)); + assertFalse(index.containsIntersectingRange(29)); + + // Second range + assertTrue(index.containsIntersectingRange(30)); + assertTrue(index.containsIntersectingRange(39)); + assertFalse(index.containsIntersectingRange(40)); + assertFalse(index.containsIntersectingRange(100)); + } + + @Test + public void testGetFirstIntersectingRangeAndRemovalWithOverlaps() throws Exception { + // Use LinkedHashSet to make iteration order deterministic for getFirstIntersectingRange(). + final Range r2 = new Range<>(5, 15); // overlaps with r1 for [5, 10) + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r2); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + assertTrue(index.containsIntersectingRange(7)); + final Range first = index.getFirstIntersectingRange(7); + assertNotNull(first); + assertSame(r2, first, "should return the first containing range in set iteration order"); + + index.removeRange(r2); + assertTrue(index.containsIntersectingRange(7), "still intersecting due to remaining overlapping range"); + assertSame(r1, index.getFirstIntersectingRange(7)); + + index.removeRange(r1); + assertFalse(index.containsIntersectingRange(7)); + assertNull(index.getFirstIntersectingRange(7)); + } + + @Test + public void testAdjacentRangesShareBoundary() { + final Range left = new Range<>(0, 10); // [0, 10) + final Range right = new Range<>(10, 20); // [10, 20) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(left); + ranges.add(right); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // End is exclusive for left; start is inclusive for right. + assertTrue(index.containsIntersectingRange(9)); + assertTrue(index.containsIntersectingRange(0)); + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + assertFalse(index.containsIntersectingRange(20)); + } + + @Test + public void testMultipleOverlapsAndNestedRangesRemovalOrder() throws Exception { + // rOuter covers everything; rMid overlaps partially; rInner is nested. 
+ final Range rOuter = new Range<>(0, 100); // [0, 100) + final Range rMid = new Range<>(20, 80); // [20, 80) + final Range rInner = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(rOuter); + ranges.add(rMid); + ranges.add(rInner); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Covered by outer only + assertTrue(index.containsIntersectingRange(10)); + assertSame(rOuter, index.getFirstIntersectingRange(10)); + + // Covered by all three + assertTrue(index.containsIntersectingRange(35)); + assertSame(rOuter, index.getFirstIntersectingRange(35)); + + // Remove the middle range first. + index.removeRange(rMid); + assertTrue(index.containsIntersectingRange(35), "still covered by outer + inner"); + + // Remove the inner range next. + index.removeRange(rInner); + assertTrue(index.containsIntersectingRange(35), "still covered by outer"); + + // Now remove the outer range; should become uncovered. + index.removeRange(rOuter); + assertFalse(index.containsIntersectingRange(35)); + assertNull(index.getFirstIntersectingRange(35)); + } + + @Test + public void testDuplicateSameBoundsDifferentInstances() throws Exception { + // Range.equals is identity-based, so two ranges with the same bounds can co-exist in the Set. + final Range r1 = new Range<>(0, 10); + final Range r2 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + assertTrue(index.containsIntersectingRange(5)); + + // Remove one instance: should still intersect due to the other. + index.removeRange(r1); + assertTrue(index.containsIntersectingRange(5)); + + // Remove the second instance: now it should not intersect. + index.removeRange(r2); + assertFalse(index.containsIntersectingRange(5)); + } + + @Test + public void testRemoveSameInstanceTwiceThrows() throws Exception { + final Range r = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + index.removeRange(r); + assertThrows(IOException.class, () -> index.removeRange(r)); + } + + @Test + public void testRemoveRangeNotFoundThrows() throws Exception { + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Range.equals is identity-based, so a different object with same bounds is not "found". + final Range sameBoundsDifferentInstance = new Range<>(0, 10); + assertThrows(IOException.class, () -> index.removeRange(sameBoundsDifferentInstance)); + + // Removing the original instance works. + index.removeRange(r1); + assertFalse(index.containsIntersectingRange(0)); + } + + @Test + public void testRemoveRangeDifferentBoundsThrows() { + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + assertThrows(IOException.class, () -> index.removeRange(new Range<>(1, 2))); + assertTrue(index.containsIntersectingRange(1), "index should remain unchanged after failed remove"); + } +} + +
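
A minimal usage sketch of the new Table#deleteRangeWithBatch API, assuming a store and table set up as in
TestRDBTableStore above (rdbStore, beginKey, endKey, and value are placeholder names):

    // Queue puts and a range delete in one atomic batch. On commit, RDBBatchOperation discards any
    // queued put/delete whose key falls inside a delete range issued later in the batch, then flushes
    // the surviving ops and the range deletes to RocksDB via WriteBatch#deleteRange.
    Table<byte[], byte[]> table = rdbStore.getTable("Fifth");
    try (BatchOperation batch = rdbStore.initBatchOperation()) {
      table.putWithBatch(batch, beginKey, value);           // discarded: beginKey lies in [beginKey, endKey)
      table.putWithBatch(batch, endKey, value);             // kept: the end key is exclusive
      table.deleteRangeWithBatch(batch, beginKey, endKey);  // deletes every key in [beginKey, endKey)
      rdbStore.commitBatchOperation(batch);
    }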