diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index ea7f08edd82..27654930c41 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -307,6 +307,12 @@ test-jar test + + org.apache.ozone + hdds-managed-rocksdb + test-jar + test + org.apache.ozone hdds-test-utils diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java index 9d2944fab66..416cc8bb9c1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java @@ -39,12 +39,12 @@ */ public final class CodecBufferCodec implements Codec { - private static final Codec DIRECT_INSTANCE = new CodecBufferCodec(true); - private static final Codec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); + private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true); + private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false); private final CodecBuffer.Allocator allocator; - public static Codec get(boolean direct) { + public static CodecBufferCodec get(boolean direct) { return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index f83f9b3d10a..6ef5174f0f9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; -import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -192,20 +191,20 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) * @return a closable iterator over merged key-value pairs, where each key corresponds * to a collection of values from the tables */ - default ClosableIterator>> getMergeIterator( + default ClosableIterator>> getMergeIterator( Comparator keyComparator, KEY prefix, Table... 
table) { List tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList()); KeyValue defaultNullValue = newKeyValue(null, null); Comparator> comparator = Comparator.comparing(KeyValue::getKey, keyComparator); return new MinHeapMergeIterator, Table.KeyValueIterator, - KeyValue>>(table.length, comparator) { + KeyValue>>(table.length, comparator) { @Override protected Table.KeyValueIterator getIterator(int idx) throws IOException { return table[idx].iterator(prefix); } @Override - protected KeyValue> merge(Map> keysToMerge) { + protected KeyValue> merge(Map> keysToMerge) { KEY key = keysToMerge.values().stream().findAny() .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey(); for (int i = 0; i < tableValues.size(); i++) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index de181ae0c8d..549157f8975 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation { static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); + private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); @@ -135,16 +136,26 @@ public void close() { } } - private abstract static class Op implements Closeable { + private abstract static class SingleKeyOp extends Op { + private final CodecBuffer keyBuffer; private final Bytes keyBytes; - private Op(Bytes keyBytes) { - this.keyBytes = keyBytes; + private SingleKeyOp(CodecBuffer keyBuffer) { + this.keyBuffer = Objects.requireNonNull(keyBuffer); + this.keyBytes = Bytes.newBytes(keyBuffer); } - abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; + CodecBuffer getKeyBuffer() { + return keyBuffer; + } - abstract int keyLen(); + Bytes getKeyBytes() { + return keyBytes; + } + + int keyLen() { + return getKeyBuffer().readableBytes(); + } int valLen() { return 0; @@ -155,100 +166,75 @@ int totalLength() { } @Override - public void close() { - if (keyBytes != null) { - keyBytes.close(); + boolean closeImpl() { + if (super.closeImpl()) { + IOUtils.close(LOG, keyBuffer, keyBytes); + return true; } + return false; } } - /** - * Delete operation to be applied to a {@link ColumnFamily} batch. 
- */ - private static final class DeleteOp extends Op { - private final byte[] key; + private abstract static class Op implements Closeable { + private final AtomicBoolean closed = new AtomicBoolean(false); - private DeleteOp(byte[] key, Bytes keyBytes) { - super(Objects.requireNonNull(keyBytes, "keyBytes == null")); - this.key = Objects.requireNonNull(key, "key == null"); - } + abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; - @Override - public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchDelete(batch, this.key); + abstract int totalLength(); + + boolean closeImpl() { + return closed.compareAndSet(false, true); } @Override - public int keyLen() { - return key.length; + public final void close() { + closeImpl(); } } /** - * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. + * Delete operation to be applied to a {@link ColumnFamily} batch. */ - private final class PutOp extends Op { - private final CodecBuffer key; - private final CodecBuffer value; - private final AtomicBoolean closed = new AtomicBoolean(false); + private static final class DeleteOp extends SingleKeyOp { - private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { - super(keyBytes); - this.key = key; - this.value = value; + private DeleteOp(CodecBuffer key) { + super(key); } @Override public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); - } - - @Override - public int keyLen() { - return key.readableBytes(); - } - - @Override - public int valLen() { - return value.readableBytes(); - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - key.release(); - value.release(); - } - super.close(); + family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer()); } } /** - * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. + * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. */ - private static final class ByteArrayPutOp extends Op { - private final byte[] key; - private final byte[] value; + private final class PutOp extends SingleKeyOp { + private final CodecBuffer value; - private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) { - super(keyBytes); - this.key = Objects.requireNonNull(key, "key == null"); + private PutOp(CodecBuffer key, CodecBuffer value) { + super(Objects.requireNonNull(key, "key == null")); this.value = Objects.requireNonNull(value, "value == null"); } @Override public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { - family.batchPut(batch, key, value); + family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); } @Override - public int keyLen() { - return key.length; + public int valLen() { + return value.readableBytes(); } @Override - public int valLen() { - return value.length; + boolean closeImpl() { + if (super.closeImpl()) { + IOUtils.close(LOG, value); + return true; + } + return false; } } @@ -271,7 +257,7 @@ private class FamilyCache { * It supports operations such as additions and deletions while maintaining the ability to overwrite * existing entries when necessary. 
*/ - private final Map ops = new HashMap<>(); + private final Map ops = new HashMap<>(); private boolean isCommit; private long batchSize; @@ -312,7 +298,7 @@ void clear() { } private void deleteIfExist(Bytes key) { - final Op previous = ops.remove(key); + final SingleKeyOp previous = ops.remove(key); if (previous != null) { previous.close(); discardedSize += previous.totalLength(); @@ -322,8 +308,9 @@ private void deleteIfExist(Bytes key) { } } - void overwriteIfExists(Bytes key, Op op) { + void overwriteIfExists(SingleKeyOp op) { Preconditions.checkState(!isCommit, "%s is already committed.", this); + Bytes key = op.getKeyBytes(); deleteIfExist(key); batchSize += op.totalLength(); Op overwritten = ops.put(key, op); @@ -336,21 +323,25 @@ void overwriteIfExists(Bytes key, Op op) { void put(CodecBuffer key, CodecBuffer value) { putCount++; - // always release the key with the value - Bytes keyBytes = Bytes.newBytes(key); - overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); + overwriteIfExists(new PutOp(key, value)); } void put(byte[] key, byte[] value) { putCount++; - Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes)); + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value); + overwriteIfExists(new PutOp(keyBuffer, valueBuffer)); } void delete(byte[] key) { delCount++; - Bytes keyBytes = new Bytes(key); - overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes)); + CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); + overwriteIfExists(new DeleteOp(keyBuffer)); + } + + void delete(CodecBuffer key) { + delCount++; + overwriteIfExists(new DeleteOp(key)); } String putString(int keySize, int valueSize) { @@ -388,6 +379,10 @@ void delete(ColumnFamily family, byte[] key) { .delete(key); } + void delete(ColumnFamily family, CodecBuffer key) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); + } + /** Prepare batch write for the entire cache. 
*/ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { for (Map.Entry e : name2cache.entrySet()) { @@ -464,6 +459,10 @@ public void delete(ColumnFamily family, byte[] key) { opCache.delete(family, key); } + public void delete(ColumnFamily family, CodecBuffer key) { + opCache.delete(family, key); + } + public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { opCache.put(family, key, value); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index f732735cbe3..2aef5daa3c9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseExce db.deleteRange(family, beginKey, endKey); } + void deleteWithBatch(BatchOperation batch, CodecBuffer key) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).delete(family, key); + } else { + throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName()); + } + } + @Override public void deleteWithBatch(BatchOperation batch, byte[] key) { if (batch instanceof RDBBatchOperation) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 659954a861b..5aff9351804 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() { return handle; } - public void batchDelete(ManagedWriteBatch writeBatch, byte[] key) + public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key) throws RocksDatabaseException { try (UncheckedAutoCloseable ignored = acquire()) { writeBatch.delete(getHandle(), key); @@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key) } } - public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value) - throws RocksDatabaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("batchPut array key {}", bytes2String(key)); - LOG.debug("batchPut array value {}", bytes2String(value)); - } - - try (UncheckedAutoCloseable ignored = acquire()) { - writeBatch.put(getHandle(), key, value); - } catch (RocksDBException e) { - throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e); - } - } - public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key, ByteBuffer value) throws RocksDatabaseException { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 8000d48c618..59e924529ce 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException { @Override public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException { - rawTable.deleteWithBatch(batch, encodeKey(key)); + if (supportCodecBuffer) { + CodecBuffer keyBuffer = null; + try { 
+ keyBuffer = keyCodec.toDirectCodecBuffer(key); + // The buffers will be released after commit. + rawTable.deleteWithBatch(batch, keyBuffer); + } catch (Exception e) { + IOUtils.closeQuietly(keyBuffer); + throw e; + } + } else { + rawTable.deleteWithBatch(batch, encodeKey(key)); + } } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java new file mode 100644 index 00000000000..77aa812975d --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.apache.hadoop.hdds.StringUtils.string2Bytes; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.UnsignedBytes; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.OpType; +import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatchForTesting.Operation; +import org.apache.hadoop.ozone.util.ClosableIterator; +import org.apache.ratis.util.function.CheckedConsumer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** + * The TestRDBBatchOperation class provides test cases to validate the functionality of RDB batch operations + * in a RocksDB-based backend. It verifies the correct behavior of write operations using batch processing + * and ensures the integrity of operations like put and delete when performed in batch mode. 
+ */ +public class TestRDBBatchOperation { + + static { + ManagedRocksObjectUtils.loadRocksDBLibrary(); + } + + @TempDir + private Path tempDir; + + private static Operation getOperation(String key, String value, OpType opType) { + return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType); + } + + @Test + public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException { + try (TrackingUtilManagedWriteBatchForTesting writeBatch = new TrackingUtilManagedWriteBatchForTesting(); + RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) { + ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class); + RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class); + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2)); + return null; + }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class)); + + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1)); + return null; + }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class)); + + when(columnFamily.getHandle()).thenReturn(columnFamilyHandle); + when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test")); + when(columnFamily.getName()).thenReturn("test"); + Codec codec = StringCodec.get(); + // OP1: This should be skipped in favor of OP9. + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01")); + // OP2 + batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02")); + // OP3: This should be skipped in favor of OP4. 
+ batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03")); + // OP4 + batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04")); + // OP5 + batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05")); + // OP6 + batchOperation.delete(columnFamily, codec.toPersistedFormat("key10")); + // OP7 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04")); + // OP8 + batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05")); + //OP9 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011")); + + + RocksDatabase db = Mockito.mock(RocksDatabase.class); + doNothing().when(db).batchWrite(any()); + batchOperation.commit(db); + Set expectedOps = ImmutableSet.of( + getOperation("key01", "value011", OpType.PUT_DIRECT), + getOperation("key02", "value02", OpType.PUT_DIRECT), + getOperation("key03", "value04", OpType.PUT_DIRECT), + getOperation("key05", null, OpType.DELETE_DIRECT), + getOperation("key10", null, OpType.DELETE_DIRECT), + getOperation("key04", "value04", OpType.PUT_DIRECT), + getOperation("key06", "value05", OpType.PUT_DIRECT)); + assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet()); + assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test"))); + } + } + + private DBStore getDBStore(OzoneConfiguration conf, String name, String tableName) throws RocksDatabaseException { + return DBStoreBuilder.newBuilder(conf) + .setName(name).setPath(tempDir).addTable(tableName).build(); + } + + private void performPut(Table withBatchTable, BatchOperation batchOperation, + Table withoutBatchTable, String key) throws RocksDatabaseException, CodecException { + String value = getRandomString(); + withBatchTable.putWithBatch(batchOperation, key, value); + withoutBatchTable.put(key, value); + } + + private void performDelete(Table withBatchTable, BatchOperation batchOperation, + Table withoutBatchTable, String key) throws RocksDatabaseException, CodecException { + withBatchTable.deleteWithBatch(batchOperation, key); + withoutBatchTable.delete(key); + } + + private String getRandomString() { + int length = ThreadLocalRandom.current().nextInt(1, 1024); + return RandomStringUtils.secure().next(length); + } + + private void performOpWithRandomKey(CheckedConsumer op, Set keySet, + List keyList) throws IOException { + String key = getRandomString(); + op.accept(key); + if (!keySet.contains(key)) { + keyList.add(key); + keySet.add(key); + } + } + + private void performOpWithRandomPreExistingKey(CheckedConsumer op, List keyList) + throws IOException { + int randomIndex = ThreadLocalRandom.current().nextInt(0, keyList.size()); + op.accept(keyList.get(randomIndex)); + } + + @Test + public void testRDBBatchOperationWithRDB() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + String tableName = "test"; + try (DBStore dbStore1 = getDBStore(conf, "WithBatch.db", tableName); + DBStore dbStore2 = getDBStore(conf, "WithoutBatch.db", tableName)) { + try (BatchOperation batchOperation = dbStore1.initBatchOperation()) { + Table withBatchTable = dbStore1.getTable(tableName, StringCodec.get(), StringCodec.get()); + Table withoutBatchTable = dbStore2.getTable(tableName, StringCodec.get(), StringCodec.get()); + List keyList = new ArrayList<>(); + Set keySet = new HashSet<>(); + List> ops = Arrays.asList( + (key) -> 
performPut(withBatchTable, batchOperation, withoutBatchTable, key), + (key) -> performDelete(withBatchTable, batchOperation, withoutBatchTable, key)); + for (int i = 0; i < 30000; i++) { + CheckedConsumer op = ops.get(ThreadLocalRandom.current().nextInt(ops.size())); + boolean performWithPreExistingKey = ThreadLocalRandom.current().nextBoolean(); + if (performWithPreExistingKey && !keyList.isEmpty()) { + performOpWithRandomPreExistingKey(op, keyList); + } else { + performOpWithRandomKey(op, keySet, keyList); + } + } + dbStore1.commitBatchOperation(batchOperation); + } + Table withBatchTable = dbStore1.getTable(tableName, ByteArrayCodec.get(), StringCodec.get()); + Table withoutBatchTable = dbStore2.getTable(tableName, ByteArrayCodec.get(), StringCodec.get()); + try (ClosableIterator>> itr = dbStore1.getMergeIterator( + UnsignedBytes.lexicographicalComparator(), null, (Table) withBatchTable, + (Table) withoutBatchTable)) { + while (itr.hasNext()) { + KeyValue> kv = itr.next(); + String actualKey = StringCodec.get().fromPersistedFormat(kv.getKey()); + assertEquals(2, kv.getValue().size(), "Expected 2 values for key " + actualKey); + assertEquals(kv.getValue().get(0), kv.getValue().get(1), "Expected same value for key " + actualKey); + } + } + } + + } + +} diff --git a/hadoop-hdds/managed-rocksdb/pom.xml b/hadoop-hdds/managed-rocksdb/pom.xml index 5e6976500f9..1a1fb3a82be 100644 --- a/hadoop-hdds/managed-rocksdb/pom.xml +++ b/hadoop-hdds/managed-rocksdb/pom.xml @@ -25,11 +25,6 @@ Apache Ozone HDDS Managed RocksDB Apache Ozone Managed RocksDB library - - - true - - com.google.guava @@ -63,6 +58,11 @@ org.slf4j slf4j-api + + org.apache.commons + commons-lang3 + test + @@ -74,6 +74,18 @@ none + + org.apache.maven.plugins + maven-jar-plugin + + + test-jar + + test-jar + + + + diff --git a/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java new file mode 100644 index 00000000000..1c9241c1d9f --- /dev/null +++ b/hadoop-hdds/managed-rocksdb/src/test/java/org/apache/hadoop/hdds/utils/db/managed/TrackingUtilManagedWriteBatchForTesting.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.utils.db.managed; + +import static org.apache.hadoop.hdds.StringUtils.bytes2String; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +/** + * The TrackingUtilManagedWriteBatchForTesting class extends ManagedWriteBatch to track the operations + * recorded in a managed write batch. Operations such as put, delete, merge, and delete range are + * captured along with their corresponding operation types. + * + * The class distinguishes direct and non-direct operation types, as defined in the OpType enumeration: + * direct operations are recorded from ByteBuffer arguments, while non-direct operations are recorded + * from byte array arguments. + */ +public class TrackingUtilManagedWriteBatchForTesting extends ManagedWriteBatch { + + private final Map> operations = new HashMap<>(); + + /** + * The OpType enumeration defines the different types of operations performed in a batch. + */ + public enum OpType { + PUT_DIRECT, + DELETE_DIRECT, + MERGE_DIRECT, + DELETE_RANGE_INDIRECT, + PUT_NON_DIRECT, + DELETE_NON_DIRECT, + MERGE_NON_DIRECT, + } + + /** + * The Operation class represents an individual operation to be performed in the context of + * a batch operation, such as a database write, delete, or merge. Each operation is characterized + * by a key, value, and an operation type (OpType). + * + * Operations can be of different types, as defined in the OpType enumeration, which include + * actions such as put, delete, merge, and delete range, either direct or non-direct. + */ + public static class Operation { + private final byte[] key; + private final byte[] value; + private final OpType opType; + + public Operation(byte[] key, byte[] value, OpType opType) { + this.key = Arrays.copyOf(key, key.length); + this.value = value == null ? null : Arrays.copyOf(value, value.length); + this.opType = opType; + } + + public Operation(byte[] key, OpType opType) { + this(key, null, opType); + } + + @Override + public final boolean equals(Object o) { + if (!(o instanceof Operation)) { + return false; + } + + Operation operation = (Operation) o; + return Arrays.equals(key, operation.key) && Arrays.equals(value, operation.value) && + opType == operation.opType; + } + + @Override + public final int hashCode() { + return Arrays.hashCode(key) + Arrays.hashCode(value) + opType.hashCode(); + } + + @Override + public String toString() { + return "Operation{" + + "key=" + bytes2String(key) + + ", value=" + (value == null ?
null : bytes2String(value)) + + ", opType=" + opType + + '}'; + } + } + + public Map> getOperations() { + return operations; + } + + public TrackingUtilManagedWriteBatchForTesting() { + super(); + } + + private byte[] convert(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, OpType.DELETE_NON_DIRECT)); + } + + @Override + public void delete(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void delete(byte[] key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, OpType.DELETE_NON_DIRECT)); + } + + @Override + public void delete(ByteBuffer key) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), OpType.DELETE_DIRECT)); + } + + @Override + public void deleteRange(byte[] beginKey, byte[] endKey) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey) + throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(beginKey, endKey, OpType.DELETE_RANGE_INDIRECT)); + } + + @Override + public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_NON_DIRECT)); + } + + @Override + public void merge(byte[] key, byte[] value) { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.MERGE_NON_DIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(key, value, OpType.PUT_NON_DIRECT)); + } + + @Override + public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException { + operations.computeIfAbsent(bytes2String(columnFamilyHandle.getName()), k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void put(byte[] key, byte[] value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()).add(new Operation(key, value, OpType.PUT_NON_DIRECT)); + } + + @Override + public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException { + operations.computeIfAbsent("", k -> new ArrayList<>()) + .add(new Operation(convert(key), convert(value), OpType.PUT_DIRECT)); + } + + @Override + public void close() { + super.close(); + } +} diff --git a/pom.xml b/pom.xml index 813226c07b0..32bea5743c4 100644 --- a/pom.xml +++ b/pom.xml @@ -1069,6 +1069,12 @@ hdds-managed-rocksdb ${hdds.version} + + org.apache.ozone + hdds-managed-rocksdb + ${hdds.version} + test-jar + 
org.apache.ozone hdds-rocks-native
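
A minimal usage sketch of the batch path touched by this change, based only on the APIs exercised in TestRDBBatchOperation above: every put/delete issued through a BatchOperation is buffered as a CodecBuffer-backed single-key op, and a later operation on the same key replaces the earlier one before the batch is written to RocksDB. The class name BatchUsageSketch, the method name, the store name "sketch.db", the dbPath argument and the table name "test" are illustrative assumptions for this sketch only.

import java.io.IOException;
import java.nio.file.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;

/** Illustrative sketch (assumed names) of batched writes going through the CodecBuffer-backed ops. */
final class BatchUsageSketch {
  static void writeBatch(OzoneConfiguration conf, Path dbPath) throws IOException {
    try (DBStore store = DBStoreBuilder.newBuilder(conf)
        .setName("sketch.db").setPath(dbPath).addTable("test").build()) {
      Table<String, String> table = store.getTable("test", StringCodec.get(), StringCodec.get());
      try (BatchOperation batch = store.initBatchOperation()) {
        table.putWithBatch(batch, "key01", "value01");   // buffered in the batch, keyed by the key bytes
        table.putWithBatch(batch, "key01", "value011");  // replaces the earlier put on key01
        table.deleteWithBatch(batch, "key02");           // buffered delete on another key
        store.commitBatchOperation(batch);               // only the surviving op per key reaches RocksDB
      }
    }
  }
}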