-
Notifications
You must be signed in to change notification settings - Fork 587
HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE #9552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
f47ebe4
61091f2
95cfe2b
d9b1480
aa1841a
48639d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation { | |
| static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); | ||
|
|
||
| private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); | ||
| private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); | ||
|
|
||
| private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); | ||
|
|
||
|
|
@@ -135,16 +136,26 @@ public void close() { | |
| } | ||
| } | ||
|
|
||
| private abstract static class Op implements Closeable { | ||
| private abstract static class SingleKeyOp extends Op { | ||
|
Comment on lines
-138
to
+139
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move SingleKeyOp to below Op. |
||
| private final CodecBuffer keyBuffer; | ||
| private final Bytes keyBytes; | ||
|
|
||
| private Op(Bytes keyBytes) { | ||
| this.keyBytes = keyBytes; | ||
| private SingleKeyOp(CodecBuffer keyBuffer) { | ||
| this.keyBuffer = Objects.requireNonNull(keyBuffer); | ||
| this.keyBytes = Bytes.newBytes(keyBuffer); | ||
| } | ||
|
|
||
| abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; | ||
| CodecBuffer getKeyBuffer() { | ||
| return keyBuffer; | ||
| } | ||
|
|
||
| abstract int keyLen(); | ||
| Bytes getKeyBytes() { | ||
| return keyBytes; | ||
| } | ||
|
|
||
| int keyLen() { | ||
| return getKeyBuffer().readableBytes(); | ||
| } | ||
|
|
||
| int valLen() { | ||
| return 0; | ||
|
|
@@ -155,100 +166,75 @@ int totalLength() { | |
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (keyBytes != null) { | ||
| keyBytes.close(); | ||
| boolean closeImpl() { | ||
| if (super.closeImpl()) { | ||
| IOUtils.close(LOG, keyBuffer, keyBytes); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Delete operation to be applied to a {@link ColumnFamily} batch. | ||
| */ | ||
| private static final class DeleteOp extends Op { | ||
| private final byte[] key; | ||
| private abstract static class Op implements Closeable { | ||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
|
|
||
| private DeleteOp(byte[] key, Bytes keyBytes) { | ||
| super(Objects.requireNonNull(keyBytes, "keyBytes == null")); | ||
| this.key = Objects.requireNonNull(key, "key == null"); | ||
| } | ||
| abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchDelete(batch, this.key); | ||
| abstract int totalLength(); | ||
|
|
||
| boolean closeImpl() { | ||
| return closed.compareAndSet(false, true); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.length; | ||
| public final void close() { | ||
| closeImpl(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. | ||
| * Delete operation to be applied to a {@link ColumnFamily} batch. | ||
| */ | ||
| private final class PutOp extends Op { | ||
| private final CodecBuffer key; | ||
| private final CodecBuffer value; | ||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
| private static final class DeleteOp extends SingleKeyOp { | ||
|
|
||
| private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { | ||
| super(keyBytes); | ||
| this.key = key; | ||
| this.value = value; | ||
| private DeleteOp(CodecBuffer key) { | ||
| super(key); | ||
| } | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public int valLen() { | ||
| return value.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (closed.compareAndSet(false, true)) { | ||
| key.release(); | ||
| value.release(); | ||
| } | ||
| super.close(); | ||
| family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. | ||
| */ | ||
| private static final class ByteArrayPutOp extends Op { | ||
| private final byte[] key; | ||
| private final byte[] value; | ||
| private final class PutOp extends SingleKeyOp { | ||
| private final CodecBuffer value; | ||
|
|
||
| private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) { | ||
| super(keyBytes); | ||
| this.key = Objects.requireNonNull(key, "key == null"); | ||
| private PutOp(CodecBuffer key, CodecBuffer value) { | ||
| super(Objects.requireNonNull(key, "key == null")); | ||
| this.value = Objects.requireNonNull(value, "value == null"); | ||
| } | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchPut(batch, key, value); | ||
| family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.length; | ||
| public int valLen() { | ||
| return value.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public int valLen() { | ||
| return value.length; | ||
| boolean closeImpl() { | ||
| if (super.closeImpl()) { | ||
| IOUtils.close(LOG, value); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -271,7 +257,7 @@ private class FamilyCache { | |
| * It supports operations such as additions and deletions while maintaining the ability to overwrite | ||
| * existing entries when necessary. | ||
| */ | ||
| private final Map<Bytes, Op> ops = new HashMap<>(); | ||
| private final Map<Bytes, SingleKeyOp> ops = new HashMap<>(); | ||
| private boolean isCommit; | ||
|
|
||
| private long batchSize; | ||
|
|
@@ -312,7 +298,7 @@ void clear() { | |
| } | ||
|
|
||
| private void deleteIfExist(Bytes key) { | ||
| final Op previous = ops.remove(key); | ||
| final SingleKeyOp previous = ops.remove(key); | ||
| if (previous != null) { | ||
| previous.close(); | ||
| discardedSize += previous.totalLength(); | ||
|
|
@@ -322,8 +308,9 @@ private void deleteIfExist(Bytes key) { | |
| } | ||
| } | ||
|
|
||
| void overwriteIfExists(Bytes key, Op op) { | ||
| void overwriteIfExists(SingleKeyOp op) { | ||
| Preconditions.checkState(!isCommit, "%s is already committed.", this); | ||
| Bytes key = op.getKeyBytes(); | ||
| deleteIfExist(key); | ||
| batchSize += op.totalLength(); | ||
| Op overwritten = ops.put(key, op); | ||
|
|
@@ -336,21 +323,25 @@ void overwriteIfExists(Bytes key, Op op) { | |
|
|
||
| void put(CodecBuffer key, CodecBuffer value) { | ||
| putCount++; | ||
| // always release the key with the value | ||
| Bytes keyBytes = Bytes.newBytes(key); | ||
| overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); | ||
| overwriteIfExists(new PutOp(key, value)); | ||
| } | ||
|
|
||
| void put(byte[] key, byte[] value) { | ||
| putCount++; | ||
| Bytes keyBytes = new Bytes(key); | ||
| overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes)); | ||
| CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); | ||
| CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value); | ||
| overwriteIfExists(new PutOp(keyBuffer, valueBuffer)); | ||
| } | ||
|
|
||
| void delete(byte[] key) { | ||
|
Comment on lines
329
to
336
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove all the put and delete methods with byte[]. Except for tests, they are only used by RDBTable once. Just convert the byte[] to CodecBuffer there. |
||
| delCount++; | ||
| Bytes keyBytes = new Bytes(key); | ||
| overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes)); | ||
| CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key); | ||
| overwriteIfExists(new DeleteOp(keyBuffer)); | ||
| } | ||
|
|
||
| void delete(CodecBuffer key) { | ||
| delCount++; | ||
| overwriteIfExists(new DeleteOp(key)); | ||
| } | ||
|
|
||
| String putString(int keySize, int valueSize) { | ||
|
|
@@ -388,6 +379,10 @@ void delete(ColumnFamily family, byte[] key) { | |
| .delete(key); | ||
| } | ||
|
|
||
| void delete(ColumnFamily family, CodecBuffer key) { | ||
| name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); | ||
| } | ||
|
|
||
| /** Prepare batch write for the entire cache. */ | ||
| UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { | ||
| for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) { | ||
|
|
@@ -464,6 +459,10 @@ public void delete(ColumnFamily family, byte[] key) { | |
| opCache.delete(family, key); | ||
| } | ||
|
|
||
| public void delete(ColumnFamily family, CodecBuffer key) { | ||
| opCache.delete(family, key); | ||
| } | ||
|
|
||
| public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { | ||
| opCache.put(family, key, value); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method currently only used in test. What is it for?