Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 181 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,184 @@ The serialization format varies depending on the element type:
Example types that work automatically with this support:

- Protocol Buffer message types: `IList<YourProtobufMessage>`
- Primitive types: `IList<int>`, `IList<long>`, `IList<string>`, etc.
- Primitive types: `IList<int>`, `IList<long>`, `IList<string>`, etc.

## Merge Operators

RocksDb.Extensions provides powerful support for RocksDB's merge operators, enabling efficient atomic read-modify-write operations without requiring separate read and write operations. This is particularly useful for counters, list operations, and other accumulative data structures.

### What are Merge Operators?

Merge operators allow you to apply atomic transformations to values in RocksDB without needing to:
1. Read the current value
2. Modify it in your application code
3. Write it back

Instead, you submit a "merge operand" (the change to apply), and RocksDB handles the merge internally. This provides several benefits:

- **Performance**: Eliminates the round-trip read before write
- **Atomicity**: Operations are applied atomically at the database level
- **Correctness**: Avoids race conditions in concurrent scenarios
- **Efficiency**: Merge operands can be combined during compaction

### Creating a Mergeable Store

To use merge operators, inherit from `MergeableRocksDbStore<TKey, TValue, TOperand>` instead of `RocksDbStore<TKey, TValue>`:

```csharp
public class TagsStore : MergeableRocksDbStore<string, IList<string>, CollectionOperation<string>>
{
public TagsStore(IMergeAccessor<string, IList<string>, CollectionOperation<string>> mergeAccessor)
: base(mergeAccessor) { }

public void AddTags(string key, params string[] tags)
{
Merge(key, CollectionOperation<string>.Add(tags));
}

public void RemoveTags(string key, params string[] tags)
{
Merge(key, CollectionOperation<string>.Remove(tags));
}
}
```

Key differences from regular stores:
- Three type parameters: `TKey`, `TValue`, and `TOperand`
- `TValue` is the actual value stored in the database
- `TOperand` is the merge operand (the change/delta to apply)
- `Merge()` method for applying operations

### Registering a Mergeable Store

Register your mergeable store with a merge operator implementation:

```csharp
rocksDbBuilder.AddMergeableStore<string, IList<string>, TagsStore, CollectionOperation<string>>(
columnFamily: "tags",
mergeOperator: new ListMergeOperator<string>()
);
```

### Built-in Merge Operator: ListMergeOperator

The library includes `ListMergeOperator<T>` for atomic list operations with add/remove support:

**Features:**
- Add multiple items to a list atomically
- Remove multiple items from a list atomically
- Operations are applied in order
- Removes only the first occurrence of each item (like `List<T>.Remove()`)
- Non-existent items in remove operations are silently ignored
- Efficient partial merge: combines multiple Add operations during compaction

**Example Usage:**

```csharp
// Add tags without reading the current list
tagsStore.AddTags("article-1", "csharp", "dotnet", "rocksdb");

// Remove specific tags
tagsStore.RemoveTags("article-1", "rocksdb");

// Read the current value
if (tagsStore.TryGet("article-1", out var tags))
{
// tags now contains: ["csharp", "dotnet"]
}
```

### Creating Custom Merge Operators

You can create custom merge operators by implementing `IMergeOperator<TValue, TOperand>`:

```csharp
public class CounterMergeOperator : IMergeOperator<long, long>
{
public string Name => "CounterMergeOperator";

public (bool Success, long Value) FullMerge(long? existingValue, ReadOnlySpan<long> operands)
{
var result = existingValue ?? 0;
foreach (var operand in operands)
{
result += operand;
}
return (true, result);
}

public (bool Success, long? Operand) PartialMerge(ReadOnlySpan<long> operands)
{
// Counters are commutative - we can combine all increments
long sum = 0;
foreach (var operand in operands)
{
sum += operand;
}
return (true, sum);
}
}

// Usage in a store
public class CounterStore : MergeableRocksDbStore<string, long, long>
{
public CounterStore(IMergeAccessor<string, long, long> mergeAccessor)
: base(mergeAccessor) { }

public void Increment(string key, long delta = 1) => Merge(key, delta);
public void Decrement(string key, long delta = 1) => Merge(key, -delta);
}

// Registration
rocksDbBuilder.AddMergeableStore<string, long, CounterStore, long>(
"counters",
new CounterMergeOperator()
);
```

### Understanding IMergeOperator Methods

**FullMerge:**
- Called during Get operations to produce the final value
- Receives the existing value (or null/default) and all pending merge operands
- Must apply all operands in order and return the final result
- Returns `(bool Success, TValue Value)` - Success=false indicates merge failure

**PartialMerge:**
- Called during compaction to optimize storage
- Combines multiple operands without knowing the existing value
- Returns `(bool Success, TOperand? Operand)`:
- Success=true: operands were successfully combined
- Success=false: operands cannot be safely combined (RocksDB keeps them separate)
- Optimization only - if partial merge fails, RocksDB will call FullMerge later

**When to return Success=false in PartialMerge:**
- Operations are order-dependent (like Add followed by Remove)
- Operations require knowledge of the existing value
- Operations cannot be combined safely

### Use Cases for Merge Operators

1. **Counters**: Increment/decrement without reading
- `TValue=long`, `TOperand=long`

2. **List Append**: Add items to lists
- `TValue=IList<T>`, `TOperand=IList<T>` or `CollectionOperation<T>`

3. **Set Operations**: Union, intersection, difference
- `TValue=ISet<T>`, `TOperand=SetOperation<T>`

4. **JSON Updates**: Merge JSON objects or arrays
- `TValue=JsonDocument`, `TOperand=JsonPatch`

5. **Time Series**: Append time-stamped events
- `TValue=IList<Event>`, `TOperand=Event`

### Best Practices

- Use merge operators when you need atomic updates without reading first
- Implement PartialMerge for commutative/associative operations to improve compaction efficiency
- Return Success=false in PartialMerge when operations cannot be safely combined
- The merge operator's `Name` property must remain consistent across database opens
- Test your merge operators thoroughly, especially edge cases with null/empty values
- Consider serialization overhead - simpler operands often perform better
17 changes: 17 additions & 0 deletions src/RocksDb.Extensions/IMergeAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.ComponentModel;

namespace RocksDb.Extensions;

#pragma warning disable CS1591

/// <summary>
/// This interface is not intended to be used directly by the clients of the library.
/// It provides merge operation support with a separate operand type.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public interface IMergeAccessor<TKey, TValue, in TOperand> : IRocksDbAccessor<TKey, TValue>
{
void Merge(TKey key, TOperand operand);
}

#pragma warning restore CS1591
62 changes: 62 additions & 0 deletions src/RocksDb.Extensions/IMergeOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace RocksDb.Extensions;

/// <summary>
/// Defines a merge operator for RocksDB that enables atomic read-modify-write operations.
/// Merge operators allow efficient updates without requiring a separate read before write,
/// which is particularly useful for counters, list appends, set unions, and other accumulative operations.
/// </summary>
/// <typeparam name="TValue">The type of the value stored in the database.</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <remarks>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible merge patterns:
/// <list type="bullet">
/// <item><description>For counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>For list append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>For list with add/remove: TValue=IList&lt;T&gt;, TOperand=CollectionOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </remarks>
public interface IMergeOperator<TValue, TOperand>
{
/// <summary>
/// Gets the name of the merge operator. This name is stored in the database
/// and must remain consistent across database opens.
/// </summary>
string Name { get; }

/// <summary>
/// Performs a full merge of the existing value with one or more operands.
/// Called when a Get operation encounters merge operands and needs to produce the final value.
/// </summary>
/// <param name="existingValue">The existing value in the database, or null/default if no value exists.</param>
/// <param name="operands">The span of merge operands to apply, in order.</param>
/// <returns>A tuple containing:
/// <list type="bullet">
/// <item><description><c>Success</c>: true if the merge operation succeeded; false if it failed.</description></item>
/// <item><description><c>Value</c>: The merged result when Success is true; otherwise a default value.</description></item>
/// </list>
/// </returns>
/// <remarks>
/// This method must handle the case where existingValue is null (for reference types) or default (for value types).
/// The Success flag allows graceful error handling - if false is returned, the Get operation will return as if the key doesn't exist.
/// </remarks>
(bool Success, TValue Value) FullMerge(TValue? existingValue, ReadOnlySpan<TOperand> operands);

/// <summary>
/// Performs a partial merge of multiple operands without the existing value.
/// Called during compaction to combine multiple merge operands into a single operand.
/// This is an optimization that reduces the number of operands that need to be stored.
/// </summary>
/// <param name="operands">The span of merge operands to combine, in order.</param>
/// <returns>A tuple containing:
/// <list type="bullet">
/// <item><description><c>Success</c>: true if operands were successfully combined; false if it's unsafe to combine without knowing the existing value.</description></item>
/// <item><description><c>Operand</c>: The combined operand when Success is true; otherwise null.</description></item>
/// </list>
/// </returns>
/// <remarks>
/// Return (false, null) when it's not safe to combine operands without knowing the existing value.
/// When false is returned, RocksDB will keep the operands separate and call FullMerge later.
/// This allows for more efficient storage when operations are commutative and associative.
/// </remarks>
(bool Success, TOperand Operand) PartialMerge(ReadOnlySpan<TOperand> operands);
}
33 changes: 33 additions & 0 deletions src/RocksDb.Extensions/IRocksDbBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,37 @@ public interface IRocksDbBuilder
/// Use <c>GetRequiredKeyedService&lt;TStore&gt;(columnFamily)</c> to retrieve a specific store instance.
/// </remarks>
IRocksDbBuilder AddStore<TKey, TValue, TStore>(string columnFamily) where TStore : RocksDbStore<TKey, TValue>;

/// <summary>
/// Adds a mergeable RocksDB store to the builder for the specified column family.
/// This method enforces that the store inherits from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>
/// and requires a merge operator, providing compile-time safety for merge operations.
/// </summary>
/// <param name="columnFamily">The name of the column family to associate with the store.</param>
/// <param name="mergeOperator">The merge operator to use for atomic read-modify-write operations.</param>
/// <typeparam name="TKey">The type of the store's key.</typeparam>
/// <typeparam name="TValue">The type of the store's values (the stored state).</typeparam>
/// <typeparam name="TOperand">The type of the merge operand (the delta/change to apply).</typeparam>
/// <typeparam name="TStore">The type of the store to add. Must inherit from <see cref="MergeableRocksDbStore{TKey,TValue,TOperand}"/>.</typeparam>
/// <returns>The builder instance for method chaining.</returns>
/// <exception cref="InvalidOperationException">Thrown if the specified column family is already registered.</exception>
/// <remarks>
/// <para>
/// Use this method when your store needs merge operations. The <typeparamref name="TStore"/> constraint
/// ensures that only stores designed for merge operations can be registered with this method.
/// </para>
/// <para>
/// The separation of <typeparamref name="TValue"/> and <typeparamref name="TOperand"/> allows for flexible patterns:
/// <list type="bullet">
/// <item><description>Counters: TValue=long, TOperand=long (same type)</description></item>
/// <item><description>List append: TValue=IList&lt;T&gt;, TOperand=IList&lt;T&gt; (same type)</description></item>
/// <item><description>List with add/remove: TValue=IList&lt;T&gt;, TOperand=CollectionOperation&lt;T&gt; (different types)</description></item>
/// </list>
/// </para>
/// <para>
/// For stores that don't need merge operations, use <see cref="AddStore{TKey,TValue,TStore}(string)"/> instead.
/// </para>
/// </remarks>
IRocksDbBuilder AddMergeableStore<TKey, TValue, TStore, TOperand>(string columnFamily, IMergeOperator<TValue, TOperand> mergeOperator)
where TStore : MergeableRocksDbStore<TKey, TValue, TOperand>;
}
84 changes: 84 additions & 0 deletions src/RocksDb.Extensions/ListOperationSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Buffers;
using RocksDb.Extensions.MergeOperators;

namespace RocksDb.Extensions;

/// <summary>
/// Serializes CollectionOperation&lt;T&gt; which contains an operation type (Add/Remove) and a list of items.
/// </summary>
/// <remarks>
/// <para>
/// The serialized format consists of:
/// - 1 byte: Operation type (0 = Add, 1 = Remove)
/// - Remaining bytes: Serialized list using FixedSizeListSerializer (for primitives) or VariableSizeListSerializer (for complex types)
/// </para>
/// <para>
/// Space efficiency optimization:
/// - For primitive types (int, long, bool, etc.), uses FixedSizeListSerializer which stores:
/// - 4 bytes: list count
/// - N * elementSize bytes: all elements (no per-element size prefix)
/// Example: List&lt;int&gt; with 3 elements = 4 + (3 * 4) = 16 bytes
/// </para>
/// <para>
/// - For non-primitive types (strings, objects, protobuf messages), uses VariableSizeListSerializer which stores:
/// - 4 bytes: list count
/// - For each element: 4 bytes size prefix + element data
/// Example: List&lt;string&gt; with ["ab", "cde"] = 4 + (4+2) + (4+3) = 17 bytes
/// </para>
/// </remarks>
internal class ListOperationSerializer<T> : ISerializer<CollectionOperation<T>>
{
private readonly ISerializer<IList<T>> _listSerializer;

public ListOperationSerializer(ISerializer<T> itemSerializer)
{
// Use FixedSizeListSerializer for primitive types to avoid storing size for each element
// Use VariableSizeListSerializer for non-primitive types where elements may vary in size
_listSerializer = typeof(T).IsPrimitive
? new FixedSizeListSerializer<T>(itemSerializer)
: new VariableSizeListSerializer<T>(itemSerializer);
}

public bool TryCalculateSize(ref CollectionOperation<T> value, out int size)
{
// 1 byte for operation type + size of the list
size = sizeof(byte);

var items = value.Items;
if (_listSerializer.TryCalculateSize(ref items, out var listSize))
{
size += listSize;
return true;
}

return false;
}

public void WriteTo(ref CollectionOperation<T> value, ref Span<byte> span)
{
// Write operation type (1 byte)
span[0] = (byte)value.Type;

// Write the list using the list serializer
var listSpan = span.Slice(sizeof(byte));
var items = value.Items;
_listSerializer.WriteTo(ref items, ref listSpan);
}

public void WriteTo(ref CollectionOperation<T> value, IBufferWriter<byte> buffer)
{
throw new NotImplementedException();
}

public CollectionOperation<T> Deserialize(ReadOnlySpan<byte> buffer)
{
// Read operation type
var operationType = (OperationType)buffer[0];

// Read the list using the list serializer
var listBuffer = buffer.Slice(sizeof(byte));
var items = _listSerializer.Deserialize(listBuffer);

return new CollectionOperation<T>(operationType, items);
}
}
Loading