6 changes: 6 additions & 0 deletions hadoop-hdds/framework/pom.xml
@@ -307,6 +307,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-managed-rocksdb</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
       <artifactId>hdds-test-utils</artifactId>
CodecBufferCodec.java
@@ -39,12 +39,12 @@
  */
 public final class CodecBufferCodec implements Codec<CodecBuffer> {

-  private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
-  private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
+  private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
+  private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);

   private final CodecBuffer.Allocator allocator;

-  public static Codec<CodecBuffer> get(boolean direct) {
+  public static CodecBufferCodec get(boolean direct) {
     return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
   }

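Why narrow the return type: get(boolean) previously returned Codec<CodecBuffer>, so call sites needing the concrete class had to cast. Returning CodecBufferCodec lets RDBBatchOperation (below) hold the direct instance in a typed constant. A minimal caller sketch; the helper method is invented, and fromPersistedFormat is assumed to throw no checked exception, matching its use in this PR:

  // Sketch only: a typed call site enabled by the narrowed return type.
  static CodecBuffer toDirectBuffer(byte[] bytes) {
    CodecBufferCodec codec = CodecBufferCodec.get(true); // direct (off-heap) instance
    CodecBuffer buffer = codec.fromPersistedFormat(bytes);
    return buffer; // caller owns the buffer and must release() it
  }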
DBStore.java
@@ -21,7 +21,6 @@

 import java.io.File;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -192,20 +191,20 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
    * @return a closable iterator over merged key-value pairs, where each key corresponds
    * to a collection of values from the tables
    */
-  default <KEY> ClosableIterator<KeyValue<KEY, Collection<Object>>> getMergeIterator(
+  default <KEY> ClosableIterator<KeyValue<KEY, List<Object>>> getMergeIterator(
Contributor: This method is currently only used in tests. What is it for?

       Comparator<KEY> keyComparator, KEY prefix, Table<KEY, Object>... table) {
     List<Object> tableValues = IntStream.range(0, table.length).mapToObj(i -> null).collect(Collectors.toList());
     KeyValue<KEY, Object> defaultNullValue = newKeyValue(null, null);
     Comparator<KeyValue<KEY, Object>> comparator = Comparator.comparing(KeyValue::getKey, keyComparator);
     return new MinHeapMergeIterator<KeyValue<KEY, Object>, Table.KeyValueIterator<KEY, Object>,
-        KeyValue<KEY, Collection<Object>>>(table.length, comparator) {
+        KeyValue<KEY, List<Object>>>(table.length, comparator) {
       @Override
       protected Table.KeyValueIterator<KEY, Object> getIterator(int idx) throws IOException {
         return table[idx].iterator(prefix);
       }

       @Override
-      protected KeyValue<KEY, Collection<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
+      protected KeyValue<KEY, List<Object>> merge(Map<Integer, KeyValue<KEY, Object>> keysToMerge) {
         KEY key = keysToMerge.values().stream().findAny()
             .orElseThrow(() -> new NoSuchElementException("No keys found")).getKey();
         for (int i = 0; i < tableValues.size(); i++) {
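On the reviewer's question above: the default method drains several tables' iterators through a MinHeapMergeIterator, emitting for each key one List<Object> with one slot per table argument (null where that table has no entry for the key). A hypothetical usage sketch; store, tableA, tableB, and the String key type are invented, and a null prefix is assumed to mean "iterate everything", matching Table.iterator(prefix):

  // Hypothetical caller of getMergeIterator (names invented for illustration).
  void scanBoth(DBStore store, Table<String, Object> tableA, Table<String, Object> tableB)
      throws IOException {
    try (ClosableIterator<KeyValue<String, List<Object>>> it =
        store.getMergeIterator(Comparator.naturalOrder(), null, tableA, tableB)) {
      while (it.hasNext()) {
        List<Object> values = it.next().getValue();
        Object fromA = values.get(0); // slot 0: first table argument
        Object fromB = values.get(1); // slot 1: second table argument (null if absent)
      }
    }
  }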
RDBBatchOperation.java
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
   static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);

   private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+  private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);

   private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();

@@ -135,16 +136,26 @@ public void close() {
     }
   }

-  private abstract static class Op implements Closeable {
+  private abstract static class SingleKeyOp extends Op {
Contributor, on lines -138 to +139: Move SingleKeyOp to below Op.

+    private final CodecBuffer keyBuffer;
     private final Bytes keyBytes;

-    private Op(Bytes keyBytes) {
-      this.keyBytes = keyBytes;
+    private SingleKeyOp(CodecBuffer keyBuffer) {
+      this.keyBuffer = Objects.requireNonNull(keyBuffer);
+      this.keyBytes = Bytes.newBytes(keyBuffer);
     }

-    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;
+    CodecBuffer getKeyBuffer() {
+      return keyBuffer;
+    }

-    abstract int keyLen();
+    Bytes getKeyBytes() {
+      return keyBytes;
+    }
+
+    int keyLen() {
+      return getKeyBuffer().readableBytes();
+    }

     int valLen() {
       return 0;
@@ -155,100 +166,75 @@ int totalLength() {
     }

     @Override
-    public void close() {
-      if (keyBytes != null) {
-        keyBytes.close();
+    boolean closeImpl() {
+      if (super.closeImpl()) {
+        IOUtils.close(LOG, keyBuffer, keyBytes);
+        return true;
       }
+      return false;
     }
   }

   /**
    * Delete operation to be applied to a {@link ColumnFamily} batch.
    */
-  private static final class DeleteOp extends Op {
-    private final byte[] key;
+  private abstract static class Op implements Closeable {
+    private final AtomicBoolean closed = new AtomicBoolean(false);

-    private DeleteOp(byte[] key, Bytes keyBytes) {
-      super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
-      this.key = Objects.requireNonNull(key, "key == null");
-    }
+    abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;

-    @Override
-    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchDelete(batch, this.key);
+    abstract int totalLength();
+
+    boolean closeImpl() {
+      return closed.compareAndSet(false, true);
     }

     @Override
-    public int keyLen() {
-      return key.length;
+    public final void close() {
+      closeImpl();
     }
   }

   /**
-   * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
+   * Delete operation to be applied to a {@link ColumnFamily} batch.
    */
-  private final class PutOp extends Op {
-    private final CodecBuffer key;
-    private final CodecBuffer value;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
+  private static final class DeleteOp extends SingleKeyOp {

-    private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = key;
-      this.value = value;
+    private DeleteOp(CodecBuffer key) {
+      super(key);
     }

     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
-    }
-
-    @Override
-    public int keyLen() {
-      return key.readableBytes();
-    }
-
-    @Override
-    public int valLen() {
-      return value.readableBytes();
-    }
-
-    @Override
-    public void close() {
-      if (closed.compareAndSet(false, true)) {
-        key.release();
-        value.release();
-      }
-      super.close();
+      family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer());
     }
   }

   /**
-   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
+   * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api.
    */
-  private static final class ByteArrayPutOp extends Op {
-    private final byte[] key;
-    private final byte[] value;
+  private final class PutOp extends SingleKeyOp {
+    private final CodecBuffer value;

-    private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = Objects.requireNonNull(key, "key == null");
+    private PutOp(CodecBuffer key, CodecBuffer value) {
+      super(Objects.requireNonNull(key, "key == null"));
       this.value = Objects.requireNonNull(value, "value == null");
     }

     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key, value);
+      family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer());
     }

     @Override
-    public int keyLen() {
-      return key.length;
+    public int valLen() {
+      return value.readableBytes();
     }

     @Override
-    public int valLen() {
-      return value.length;
+    boolean closeImpl() {
+      if (super.closeImpl()) {
+        IOUtils.close(LOG, value);
+        return true;
+      }
+      return false;
     }
   }
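Both closeImpl() overrides above funnel through the AtomicBoolean in Op, so an op that is overwritten in the cache (closed once by deleteIfExist) and closed again when the batch itself closes releases its buffers exactly once. The same pattern in a standalone, compilable sketch:

  import java.util.concurrent.atomic.AtomicBoolean;

  // Standalone sketch of the close-once pattern used by Op and its subclasses.
  abstract class CloseOnce implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** @return true only for the first caller, which must release resources. */
    protected boolean closeImpl() {
      return closed.compareAndSet(false, true);
    }

    @Override
    public final void close() {
      closeImpl(); // safe to call repeatedly
    }
  }

  final class BufferOp extends CloseOnce {
    @Override
    protected boolean closeImpl() {
      if (super.closeImpl()) {
        // the single winner releases the underlying buffers here
        return true;
      }
      return false;
    }
  }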

@@ -271,7 +257,7 @@ private class FamilyCache {
     * It supports operations such as additions and deletions while maintaining the ability to overwrite
     * existing entries when necessary.
     */
-    private final Map<Bytes, Op> ops = new HashMap<>();
+    private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
    private boolean isCommit;

    private long batchSize;
@@ -312,7 +298,7 @@ void clear() {
     }

     private void deleteIfExist(Bytes key) {
-      final Op previous = ops.remove(key);
+      final SingleKeyOp previous = ops.remove(key);
       if (previous != null) {
         previous.close();
         discardedSize += previous.totalLength();
@@ -322,8 +308,9 @@ private void deleteIfExist(Bytes key) {
       }
     }

-    void overwriteIfExists(Bytes key, Op op) {
+    void overwriteIfExists(SingleKeyOp op) {
       Preconditions.checkState(!isCommit, "%s is already committed.", this);
+      Bytes key = op.getKeyBytes();
       deleteIfExist(key);
       batchSize += op.totalLength();
       Op overwritten = ops.put(key, op);
@@ -336,21 +323,25 @@ void overwriteIfExists(Bytes key, Op op) {

     void put(CodecBuffer key, CodecBuffer value) {
       putCount++;
-      // always release the key with the value
-      Bytes keyBytes = Bytes.newBytes(key);
-      overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes));
+      overwriteIfExists(new PutOp(key, value));
     }

     void put(byte[] key, byte[] value) {
       putCount++;
-      Bytes keyBytes = new Bytes(key);
-      overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
+      CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+      CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value);
+      overwriteIfExists(new PutOp(keyBuffer, valueBuffer));
     }

     void delete(byte[] key) {
Contributor, on lines 329 to 336: Remove all the put and delete methods that take byte[]. Except for tests, they are only used by RDBTable once; just convert the byte[] to a CodecBuffer there.

       delCount++;
-      Bytes keyBytes = new Bytes(key);
-      overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+      CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+      overwriteIfExists(new DeleteOp(keyBuffer));
     }

+    void delete(CodecBuffer key) {
+      delCount++;
+      overwriteIfExists(new DeleteOp(key));
+    }
+
     String putString(int keySize, int valueSize) {
@@ -388,6 +379,10 @@ void delete(ColumnFamily family, byte[] key) {
         .delete(key);
   }

+  void delete(ColumnFamily family, CodecBuffer key) {
+    name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
+  }
+
   /** Prepare batch write for the entire cache. */
   UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
     for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -464,6 +459,10 @@ public void delete(ColumnFamily family, byte[] key) {
     opCache.delete(family, key);
   }

+  public void delete(ColumnFamily family, CodecBuffer key) {
+    opCache.delete(family, key);
+  }
+
   public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
     opCache.put(family, key, value);
   }
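Note how ByteArrayPutOp disappears: the byte[] overloads now normalize into direct CodecBuffers up front, so a byte[] put and a CodecBuffer put of the same key meet in the same Map<Bytes, SingleKeyOp> and the later one overwrites the earlier. A sketch of that equivalence; batch and family are invented, and the public byte[] put overload on RDBBatchOperation is assumed to be retained by this PR (the reviewer above suggests removing it):

  // Sketch: both puts below queue a PutOp(CodecBuffer, CodecBuffer) for the same key.
  void equivalentPuts(RDBBatchOperation batch, ColumnFamily family) {
    byte[] key = {1};
    batch.put(family, key, new byte[] {2}); // wrapped internally via DIRECT_CODEC_BUFFER_CODEC

    CodecBuffer k = CodecBufferCodec.get(true).fromPersistedFormat(key);
    CodecBuffer v = CodecBufferCodec.get(true).fromPersistedFormat(new byte[] {3});
    batch.put(family, k, v); // overwrites the queued op for the same key bytes
  }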
RDBTable.java
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseException
     db.deleteRange(family, beginKey, endKey);
   }

+  void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
+    if (batch instanceof RDBBatchOperation) {
+      ((RDBBatchOperation) batch).delete(family, key);
+    } else {
+      throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
+    }
+  }
+
   @Override
   public void deleteWithBatch(BatchOperation batch, byte[] key) {
     if (batch instanceof RDBBatchOperation) {
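The new overload stays package-private; only the byte[] variant belongs to the Table interface, and TypedTable (below) picks the CodecBuffer path when the codecs support it. For orientation, a hedged end-to-end sketch of a batched write, assuming DBStore's initBatchOperation/commitBatchOperation and Table's putWithBatch, as in Ozone's public table API:

  // Hypothetical flow (store, table, and keys invented for illustration).
  void applyBatch(DBStore store, Table<String, String> table) throws IOException {
    try (BatchOperation batch = store.initBatchOperation()) {
      table.putWithBatch(batch, "key1", "value1"); // queued in the op cache
      table.deleteWithBatch(batch, "key2");        // queued delete
      store.commitBatchOperation(batch);           // one atomic RocksDB write batch
    } // close() releases any CodecBuffers still queued
  }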
RocksDatabase.java
@@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
     return handle;
   }

-  public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
+  public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
       throws RocksDatabaseException {
     try (UncheckedAutoCloseable ignored = acquire()) {
       writeBatch.delete(getHandle(), key);
@@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
     }
   }

-  public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
-      throws RocksDatabaseException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("batchPut array key {}", bytes2String(key));
-      LOG.debug("batchPut array value {}", bytes2String(value));
-    }
-
-    try (UncheckedAutoCloseable ignored = acquire()) {
-      writeBatch.put(getHandle(), key, value);
-    } catch (RocksDBException e) {
-      throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
-    }
-  }
-
   public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
       ByteBuffer value) throws RocksDatabaseException {
     if (LOG.isDebugEnabled()) {
TypedTable.java
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {

   @Override
   public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
-    rawTable.deleteWithBatch(batch, encodeKey(key));
+    if (supportCodecBuffer) {
+      CodecBuffer keyBuffer = null;
+      try {
+        keyBuffer = keyCodec.toDirectCodecBuffer(key);
+        // The buffers will be released after commit.
+        rawTable.deleteWithBatch(batch, keyBuffer);
+      } catch (Exception e) {
+        IOUtils.closeQuietly(keyBuffer);
+        throw e;
+      }
+    } else {
+      rawTable.deleteWithBatch(batch, encodeKey(key));
+    }
   }

   @Override
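The pattern here is ownership transfer: the direct key buffer is released locally only if the hand-off fails; on success the batch owns it, and SingleKeyOp.closeImpl() releases it after commit. The same idiom as a generic, compilable sketch (helper name and functional shape invented):

  import java.util.function.Consumer;
  import java.util.function.Supplier;

  final class HandOff {
    /** Allocates a resource and passes it to sink; releases it only on failure. */
    static <T extends AutoCloseable> void allocateAndHandOff(Supplier<T> allocate, Consumer<T> sink)
        throws Exception {
      T resource = null;
      try {
        resource = allocate.get();
        sink.accept(resource); // success: the receiver now owns the resource
      } catch (Exception e) {
        if (resource != null) {
          resource.close(); // failure: release before rethrowing
        }
        throw e;
      }
    }
  }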