diff --git a/README.md b/README.md index 2e9f23e..ba1585e 100644 --- a/README.md +++ b/README.md @@ -174,4 +174,184 @@ The serialization format varies depending on the element type: Example types that work automatically with this support: - Protocol Buffer message types: `IList` -- Primitive types: `IList`, `IList`, `IList`, etc. \ No newline at end of file +- Primitive types: `IList`, `IList`, `IList`, etc. + +## Merge Operators + +RocksDb.Extensions provides powerful support for RocksDB's merge operators, enabling efficient atomic read-modify-write operations without requiring separate read and write operations. This is particularly useful for counters, list operations, and other accumulative data structures. + +### What are Merge Operators? + +Merge operators allow you to apply atomic transformations to values in RocksDB without needing to: +1. Read the current value +2. Modify it in your application code +3. Write it back + +Instead, you submit a "merge operand" (the change to apply), and RocksDB handles the merge internally. 
This provides several benefits: + +- **Performance**: Eliminates the round-trip read before write +- **Atomicity**: Operations are applied atomically at the database level +- **Correctness**: Avoids race conditions in concurrent scenarios +- **Efficiency**: Merge operands can be combined during compaction + +### Creating a Mergeable Store + +To use merge operators, inherit from `MergeableRocksDbStore` instead of `RocksDbStore`: + +```csharp +public class TagsStore : MergeableRocksDbStore, CollectionOperation> +{ + public TagsStore(IMergeAccessor, CollectionOperation> mergeAccessor) + : base(mergeAccessor) { } + + public void AddTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Add(tags)); + } + + public void RemoveTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Remove(tags)); + } +} +``` + +Key differences from regular stores: +- Three type parameters: `TKey`, `TValue`, and `TOperand` +- `TValue` is the actual value stored in the database +- `TOperand` is the merge operand (the change/delta to apply) +- `Merge()` method for applying operations + +### Registering a Mergeable Store + +Register your mergeable store with a merge operator implementation: + +```csharp +rocksDbBuilder.AddMergeableStore, TagsStore, CollectionOperation>( + columnFamily: "tags", + mergeOperator: new ListMergeOperator() +); +``` + +### Built-in Merge Operator: ListMergeOperator + +The library includes `ListMergeOperator` for atomic list operations with add/remove support: + +**Features:** +- Add multiple items to a list atomically +- Remove multiple items from a list atomically +- Operations are applied in order +- Removes only the first occurrence of each item (like `List.Remove()`) +- Non-existent items in remove operations are silently ignored +- Efficient partial merge: combines multiple Add operations during compaction + +**Example Usage:** + +```csharp +// Add tags without reading the current list +tagsStore.AddTags("article-1", 
"csharp", "dotnet", "rocksdb"); + +// Remove specific tags +tagsStore.RemoveTags("article-1", "rocksdb"); + +// Read the current value +if (tagsStore.TryGet("article-1", out var tags)) +{ + // tags now contains: ["csharp", "dotnet"] +} +``` + +### Creating Custom Merge Operators + +You can create custom merge operators by implementing `IMergeOperator`: + +```csharp +public class CounterMergeOperator : IMergeOperator +{ + public string Name => "CounterMergeOperator"; + + public (bool Success, long Value) FullMerge(long existingValue, ReadOnlySpan operands) + { + var result = existingValue; // default (0) when the key has no stored value yet + foreach (var operand in operands) + { + result += operand; + } + return (true, result); + } + + public (bool Success, long Operand) PartialMerge(ReadOnlySpan operands) + { + // Counters are commutative - we can combine all increments + long sum = 0; + foreach (var operand in operands) + { + sum += operand; + } + return (true, sum); + } +} + +// Usage in a store +public class CounterStore : MergeableRocksDbStore +{ + public CounterStore(IMergeAccessor mergeAccessor) + : base(mergeAccessor) { } + + public void Increment(string key, long delta = 1) => Merge(key, delta); + public void Decrement(string key, long delta = 1) => Merge(key, -delta); +} + +// Registration +rocksDbBuilder.AddMergeableStore( + "counters", + new CounterMergeOperator() +); +``` + +### Understanding IMergeOperator Methods + +**FullMerge:** +- Called during Get operations to produce the final value +- Receives the existing value (or null/default) and all pending merge operands +- Must apply all operands in order and return the final result +- Returns `(bool Success, TValue Value)` - Success=false indicates merge failure + +**PartialMerge:** +- Called during compaction to optimize storage +- Combines multiple operands without knowing the existing value +- Returns `(bool Success, TOperand 
Operand)`: + - Success=true: operands were successfully combined + - Success=false: operands cannot be safely combined (RocksDB keeps them separate) +- Optimization only - if partial merge fails, RocksDB will call FullMerge later + +**When to return Success=false in PartialMerge:** +- Operations are order-dependent (like Add followed by Remove) +- Operations require knowledge of the existing value +- Operations cannot be combined safely + +### Use Cases for Merge Operators + +1. **Counters**: Increment/decrement without reading + - `TValue=long`, `TOperand=long` + +2. **List Append**: Add items to lists + - `TValue=IList`, `TOperand=IList` or `CollectionOperation` + +3. **Set Operations**: Union, intersection, difference + - `TValue=ISet`, `TOperand=SetOperation` + +4. **JSON Updates**: Merge JSON objects or arrays + - `TValue=JsonDocument`, `TOperand=JsonPatch` + +5. **Time Series**: Append time-stamped events + - `TValue=IList`, `TOperand=Event` + +### Best Practices + +- Use merge operators when you need atomic updates without reading first +- Implement PartialMerge for commutative/associative operations to improve compaction efficiency +- Return Success=false in PartialMerge when operations cannot be safely combined +- The merge operator's `Name` property must remain consistent across database opens +- Test your merge operators thoroughly, especially edge cases with null/empty values +- Consider serialization overhead - simpler operands often perform better diff --git a/src/RocksDb.Extensions/IMergeAccessor.cs b/src/RocksDb.Extensions/IMergeAccessor.cs new file mode 100644 index 0000000..9c5605e --- /dev/null +++ b/src/RocksDb.Extensions/IMergeAccessor.cs @@ -0,0 +1,17 @@ +using System.ComponentModel; + +namespace RocksDb.Extensions; + +#pragma warning disable CS1591 + +/// +/// This interface is not intended to be used directly by the clients of the library. +/// It provides merge operation support with a separate operand type. 
+/// +[EditorBrowsable(EditorBrowsableState.Never)] +public interface IMergeAccessor : IRocksDbAccessor +{ + void Merge(TKey key, TOperand operand); +} + +#pragma warning restore CS1591 diff --git a/src/RocksDb.Extensions/IMergeOperator.cs b/src/RocksDb.Extensions/IMergeOperator.cs new file mode 100644 index 0000000..195ef54 --- /dev/null +++ b/src/RocksDb.Extensions/IMergeOperator.cs @@ -0,0 +1,62 @@ +namespace RocksDb.Extensions; + +/// +/// Defines a merge operator for RocksDB that enables atomic read-modify-write operations. +/// Merge operators allow efficient updates without requiring a separate read before write, +/// which is particularly useful for counters, list appends, set unions, and other accumulative operations. +/// +/// The type of the value stored in the database. +/// The type of the merge operand (the delta/change to apply). +/// +/// The separation of and allows for flexible merge patterns: +/// +/// For counters: TValue=long, TOperand=long (same type) +/// For list append: TValue=IList<T>, TOperand=IList<T> (same type) +/// For list with add/remove: TValue=IList<T>, TOperand=CollectionOperation<T> (different types) +/// +/// +public interface IMergeOperator +{ + /// + /// Gets the name of the merge operator. This name is stored in the database + /// and must remain consistent across database opens. + /// + string Name { get; } + + /// + /// Performs a full merge of the existing value with one or more operands. + /// Called when a Get operation encounters merge operands and needs to produce the final value. + /// + /// The existing value in the database, or null/default if no value exists. + /// The span of merge operands to apply, in order. + /// A tuple containing: + /// + /// Success: true if the merge operation succeeded; false if it failed. + /// Value: The merged result when Success is true; otherwise a default value. 
+ /// + /// + /// + /// This method must handle the case where existingValue is null (for reference types) or default (for value types). + /// The Success flag allows graceful error handling - if false is returned, the Get operation will return as if the key doesn't exist. + /// + (bool Success, TValue Value) FullMerge(TValue? existingValue, ReadOnlySpan operands); + + /// + /// Performs a partial merge of multiple operands without the existing value. + /// Called during compaction to combine multiple merge operands into a single operand. + /// This is an optimization that reduces the number of operands that need to be stored. + /// + /// The span of merge operands to combine, in order. + /// A tuple containing: + /// + /// Success: true if operands were successfully combined; false if it's unsafe to combine without knowing the existing value. + /// Operand: The combined operand when Success is true; otherwise null. + /// + /// + /// + /// Return (false, null) when it's not safe to combine operands without knowing the existing value. + /// When false is returned, RocksDB will keep the operands separate and call FullMerge later. + /// This allows for more efficient storage when operations are commutative and associative. + /// + (bool Success, TOperand Operand) PartialMerge(ReadOnlySpan operands); +} diff --git a/src/RocksDb.Extensions/IRocksDbBuilder.cs b/src/RocksDb.Extensions/IRocksDbBuilder.cs index 7d1124c..2edf242 100644 --- a/src/RocksDb.Extensions/IRocksDbBuilder.cs +++ b/src/RocksDb.Extensions/IRocksDbBuilder.cs @@ -22,4 +22,37 @@ public interface IRocksDbBuilder /// Use GetRequiredKeyedService<TStore>(columnFamily) to retrieve a specific store instance. /// IRocksDbBuilder AddStore(string columnFamily) where TStore : RocksDbStore; + + /// + /// Adds a mergeable RocksDB store to the builder for the specified column family. 
+ /// This method enforces that the store inherits from + /// and requires a merge operator, providing compile-time safety for merge operations. + /// + /// The name of the column family to associate with the store. + /// The merge operator to use for atomic read-modify-write operations. + /// The type of the store's key. + /// The type of the store's values (the stored state). + /// The type of the merge operand (the delta/change to apply). + /// The type of the store to add. Must inherit from . + /// The builder instance for method chaining. + /// Thrown if the specified column family is already registered. + /// + /// + /// Use this method when your store needs merge operations. The constraint + /// ensures that only stores designed for merge operations can be registered with this method. + /// + /// + /// The separation of and allows for flexible patterns: + /// + /// Counters: TValue=long, TOperand=long (same type) + /// List append: TValue=IList<T>, TOperand=IList<T> (same type) + /// List with add/remove: TValue=IList<T>, TOperand=CollectionOperation<T> (different types) + /// + /// + /// + /// For stores that don't need merge operations, use instead. + /// + /// + IRocksDbBuilder AddMergeableStore(string columnFamily, IMergeOperator mergeOperator) + where TStore : MergeableRocksDbStore; } \ No newline at end of file diff --git a/src/RocksDb.Extensions/ListOperationSerializer.cs b/src/RocksDb.Extensions/ListOperationSerializer.cs new file mode 100644 index 0000000..1a7953f --- /dev/null +++ b/src/RocksDb.Extensions/ListOperationSerializer.cs @@ -0,0 +1,84 @@ +using System.Buffers; +using RocksDb.Extensions.MergeOperators; + +namespace RocksDb.Extensions; + +/// +/// Serializes CollectionOperation<T> which contains an operation type (Add/Remove) and a list of items. 
+/// +/// +/// +/// The serialized format consists of: +/// - 1 byte: Operation type (0 = Add, 1 = Remove) +/// - Remaining bytes: Serialized list using FixedSizeListSerializer (for primitives) or VariableSizeListSerializer (for complex types) +/// +/// +/// Space efficiency optimization: +/// - For primitive types (int, long, bool, etc.), uses FixedSizeListSerializer which stores: +/// - 4 bytes: list count +/// - N * elementSize bytes: all elements (no per-element size prefix) +/// Example: List<int> with 3 elements = 4 + (3 * 4) = 16 bytes +/// +/// +/// - For non-primitive types (strings, objects, protobuf messages), uses VariableSizeListSerializer which stores: +/// - 4 bytes: list count +/// - For each element: 4 bytes size prefix + element data +/// Example: List<string> with ["ab", "cde"] = 4 + (4+2) + (4+3) = 17 bytes +/// +/// +internal class ListOperationSerializer : ISerializer> +{ + private readonly ISerializer> _listSerializer; + + public ListOperationSerializer(ISerializer itemSerializer) + { + // Use FixedSizeListSerializer for primitive types to avoid storing size for each element + // Use VariableSizeListSerializer for non-primitive types where elements may vary in size + _listSerializer = typeof(T).IsPrimitive + ? 
new FixedSizeListSerializer(itemSerializer) + : new VariableSizeListSerializer(itemSerializer); + } + + public bool TryCalculateSize(ref CollectionOperation value, out int size) + { + // 1 byte for operation type + size of the list + size = sizeof(byte); + + var items = value.Items; + if (_listSerializer.TryCalculateSize(ref items, out var listSize)) + { + size += listSize; + return true; + } + + return false; + } + + public void WriteTo(ref CollectionOperation value, ref Span span) + { + // Write operation type (1 byte) + span[0] = (byte)value.Type; + + // Write the list using the list serializer + var listSpan = span.Slice(sizeof(byte)); + var items = value.Items; + _listSerializer.WriteTo(ref items, ref listSpan); + } + + public void WriteTo(ref CollectionOperation value, IBufferWriter buffer) + { + throw new NotImplementedException(); + } + + public CollectionOperation Deserialize(ReadOnlySpan buffer) + { + // Read operation type + var operationType = (OperationType)buffer[0]; + + // Read the list using the list serializer + var listBuffer = buffer.Slice(sizeof(byte)); + var items = _listSerializer.Deserialize(listBuffer); + + return new CollectionOperation(operationType, items); + } +} diff --git a/src/RocksDb.Extensions/MergeAccessor.cs b/src/RocksDb.Extensions/MergeAccessor.cs new file mode 100644 index 0000000..6825a2e --- /dev/null +++ b/src/RocksDb.Extensions/MergeAccessor.cs @@ -0,0 +1,88 @@ +using System.Buffers; +using CommunityToolkit.HighPerformance.Buffers; + +namespace RocksDb.Extensions; + +internal class MergeAccessor : RocksDbAccessor, IMergeAccessor +{ + private readonly ISerializer _operandSerializer; + + public MergeAccessor( + RocksDbContext db, + ColumnFamily columnFamily, + ISerializer keySerializer, + ISerializer valueSerializer, + ISerializer operandSerializer) : base(db, columnFamily, keySerializer, valueSerializer) + { + _operandSerializer = operandSerializer; + } + + public void Merge(TKey key, TOperand operand) + { + byte[]? 
rentedKeyBuffer = null; + bool useSpanAsKey; + // ReSharper disable once AssignmentInConditionalExpression + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) + ? keySize < MaxStackSize + ? stackalloc byte[keySize] + : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) + : Span.Empty; + + ReadOnlySpan keySpan = keyBuffer; + ArrayPoolBufferWriter? keyBufferWriter = null; + + byte[]? rentedOperandBuffer = null; + bool useSpanAsOperand; + // ReSharper disable once AssignmentInConditionalExpression + Span operandBuffer = (useSpanAsOperand = _operandSerializer.TryCalculateSize(ref operand, out var operandSize)) + ? operandSize < MaxStackSize + ? stackalloc byte[operandSize] + : (rentedOperandBuffer = ArrayPool.Shared.Rent(operandSize)).AsSpan(0, operandSize) + : Span.Empty; + + + ReadOnlySpan operandSpan = operandBuffer; + ArrayPoolBufferWriter? operandBufferWriter = null; + + try + { + if (useSpanAsKey) + { + KeySerializer.WriteTo(ref key, ref keyBuffer); + } + else + { + keyBufferWriter = new ArrayPoolBufferWriter(); + KeySerializer.WriteTo(ref key, keyBufferWriter); + keySpan = keyBufferWriter.WrittenSpan; + } + + if (useSpanAsOperand) + { + _operandSerializer.WriteTo(ref operand, ref operandBuffer); + } + else + { + operandBufferWriter = new ArrayPoolBufferWriter(); + _operandSerializer.WriteTo(ref operand, operandBufferWriter); + operandSpan = operandBufferWriter.WrittenSpan; + } + + RocksDbContext.Db.Merge(keySpan, operandSpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); + } + finally + { + keyBufferWriter?.Dispose(); + operandBufferWriter?.Dispose(); + if (rentedKeyBuffer is not null) + { + ArrayPool.Shared.Return(rentedKeyBuffer); + } + + if (rentedOperandBuffer is not null) + { + ArrayPool.Shared.Return(rentedOperandBuffer); + } + } + } +} diff --git a/src/RocksDb.Extensions/MergeOperatorConfig.cs b/src/RocksDb.Extensions/MergeOperatorConfig.cs new file mode 100644 index 0000000..f4b0215 --- 
/dev/null +++ b/src/RocksDb.Extensions/MergeOperatorConfig.cs @@ -0,0 +1,142 @@ +using System.Buffers; +using System.Runtime.CompilerServices; +using CommunityToolkit.HighPerformance.Buffers; + +namespace RocksDb.Extensions; + +/// +/// Internal configuration for a merge operator associated with a column family. +/// +internal class MergeOperatorConfig +{ + /// + /// Gets the name of the merge operator. + /// + public string Name { get; set; } = null!; + + /// + /// Gets the full merge callback delegate. + /// + public global::RocksDbSharp.MergeOperators.FullMergeFunc FullMerge { get; set; } = null!; + + /// + /// Gets the partial merge callback delegate. + /// + public global::RocksDbSharp.MergeOperators.PartialMergeFunc PartialMerge { get; set; } = null!; + + internal static MergeOperatorConfig CreateMergeOperatorConfig( + IMergeOperator mergeOperator, + ISerializer valueSerializer, + ISerializer operandSerializer) + { + return new MergeOperatorConfig + { + Name = mergeOperator.Name, + FullMerge = (ReadOnlySpan _, bool hasExistingValue, ReadOnlySpan existingValue, global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, out bool success) => + { + return FullMergeCallback(hasExistingValue, existingValue, operands, mergeOperator, valueSerializer, operandSerializer, out success); + }, + PartialMerge = (ReadOnlySpan _, global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, out bool success) => + { + return PartialMergeCallback(operands, mergeOperator, operandSerializer, out success); + } + }; + } + + private static byte[] FullMergeCallback(bool hasExistingValue, + ReadOnlySpan existingValue, + global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, + IMergeOperator mergeOperator, + ISerializer valueSerializer, + ISerializer operandSerializer, + out bool success) + { + var operandArray = ArrayPool.Shared.Rent(operands.Count); + try + { + // Deserialize inside the try: a throwing serializer must surface as success = false, not escape the callback. + var existing = hasExistingValue ? 
valueSerializer.Deserialize(existingValue) : default!; + + for (int i = 0; i < operands.Count; i++) + { + operandArray[i] = operandSerializer.Deserialize(operands.Get(i)); + } + + var operandSpan = operandArray.AsSpan(0, operands.Count); + var (mergeSuccess, result) = mergeOperator.FullMerge(existing, operandSpan); + + if (!mergeSuccess) + { + success = false; + return Array.Empty(); + } + + success = true; + return SerializeValue(result, valueSerializer); + } + catch + { + // RocksDB callback context doesn't allow exceptions to propagate + success = false; + return Array.Empty(); + } + finally + { + ArrayPool.Shared.Return(operandArray, clearArray: RuntimeHelpers.IsReferenceOrContainsReferences()); + } + } + + private static byte[] PartialMergeCallback(global::RocksDbSharp.MergeOperators.OperandsEnumerator operands, + IMergeOperator mergeOperator, + ISerializer operandSerializer, + out bool success) + { + var operandArray = ArrayPool.Shared.Rent(operands.Count); + try + { + for (int i = 0; i < operands.Count; i++) + { + operandArray[i] = operandSerializer.Deserialize(operands.Get(i)); + } + + var operandSpan = operandArray.AsSpan(0, operands.Count); + var (mergeSuccess, result) = mergeOperator.PartialMerge(operandSpan); + + if (!mergeSuccess) + { + success = false; + return Array.Empty(); + } + + success = true; + return SerializeValue(result, operandSerializer); + } + catch + { + // RocksDB callback context doesn't allow exceptions to propagate + success = false; + return Array.Empty(); + } + finally + { + ArrayPool.Shared.Return(operandArray, clearArray: RuntimeHelpers.IsReferenceOrContainsReferences()); + } + } + + private static byte[] SerializeValue(T value, ISerializer serializer) + { + if (serializer.TryCalculateSize(ref value, out var size)) + { + var buffer = new byte[size]; + var span = buffer.AsSpan(); + serializer.WriteTo(ref value, ref span); + return buffer; + } + else + { + using 
var bufferWriter = new ArrayPoolBufferWriter(); + serializer.WriteTo(ref value, bufferWriter); + return bufferWriter.WrittenSpan.ToArray(); + } + } +} diff --git a/src/RocksDb.Extensions/MergeOperators/CollectionOperation.cs b/src/RocksDb.Extensions/MergeOperators/CollectionOperation.cs new file mode 100644 index 0000000..a0c8c34 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/CollectionOperation.cs @@ -0,0 +1,49 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// Represents an operation (add or remove) to apply to a collection via merge. +/// +/// The type of elements in the collection. +public class CollectionOperation +{ + /// + /// Gets the type of operation to perform. + /// + public OperationType Type { get; } + + /// + /// Gets the items to add or remove. + /// + public IList Items { get; } + + /// + /// Creates a new collection operation. + /// + /// The type of operation. + /// The items to add or remove. + public CollectionOperation(OperationType type, IList items) + { + Type = type; + Items = items ?? throw new ArgumentNullException(nameof(items)); + } + + /// + /// Creates an Add operation for the specified items. + /// + public static CollectionOperation Add(params T[] items) => new(OperationType.Add, items); + + /// + /// Creates an Add operation for the specified items. + /// + public static CollectionOperation Add(IList items) => new(OperationType.Add, items); + + /// + /// Creates a Remove operation for the specified items. + /// + public static CollectionOperation Remove(params T[] items) => new(OperationType.Remove, items); + + /// + /// Creates a Remove operation for the specified items. 
+ /// + public static CollectionOperation Remove(IList items) => new(OperationType.Remove, items); +} diff --git a/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs b/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs new file mode 100644 index 0000000..ff09b84 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/ListMergeOperator.cs @@ -0,0 +1,88 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// A merge operator that supports both adding and removing items from a list. +/// Each merge operand is a CollectionOperation that specifies whether to add or remove items. +/// Operations are applied in order, enabling atomic list modifications without read-before-write. +/// +/// The type of elements in the list. +/// +/// +/// The value type stored in RocksDB is IList<T> (the actual list contents), +/// while merge operands are CollectionOperation<T> (the operations to apply). +/// +/// +/// Remove operations delete the first occurrence of each item (same as ). +/// If an item to remove doesn't exist in the list, the operation is silently ignored. +/// +/// +/// The partial merge optimization can combine multiple Add operations into a single operation, +/// but will refuse to combine when Remove operations are present (returns Success = false). +/// This ensures correctness since the order of Add and Remove operations matters. +/// +/// +public class ListMergeOperator : IMergeOperator, CollectionOperation> +{ + /// + public string Name => $"ListMergeOperator<{typeof(T).Name}>"; + + /// + public (bool Success, IList Value) FullMerge(IList? existingValue, ReadOnlySpan> operands) + { + // Start with existing items or empty list + var result = existingValue != null ? 
new List(existingValue) : new List(); + + // Apply all operands in order + foreach (var operand in operands) + { + ApplyOperation(result, operand); + } + + return (true, result); + } + + /// + public (bool Success, CollectionOperation Operand) PartialMerge(ReadOnlySpan> operands) + { + var allAdds = new List(); + + foreach (var operand in operands) + { + if (operand.Type == OperationType.Remove) + { + // If there are any removes, we can't safely combine without knowing the existing state + // Return false to signal that RocksDB should keep operands separate + return (false, null!); + } + } + + foreach (var operand in operands) + { + foreach (var item in operand.Items) + { + allAdds.Add(item); + } + } + + // Only adds present - safe to combine + return (true, CollectionOperation.Add(allAdds)); + } + + private static void ApplyOperation(List result, CollectionOperation operation) + { + switch (operation.Type) + { + case OperationType.Add: + result.AddRange(operation.Items); + break; + + case OperationType.Remove: + foreach (var item in operation.Items) + { + result.Remove(item); // Removes first occurrence + } + + break; + } + } +} \ No newline at end of file diff --git a/src/RocksDb.Extensions/MergeOperators/OperationType.cs b/src/RocksDb.Extensions/MergeOperators/OperationType.cs new file mode 100644 index 0000000..d6305f6 --- /dev/null +++ b/src/RocksDb.Extensions/MergeOperators/OperationType.cs @@ -0,0 +1,17 @@ +namespace RocksDb.Extensions.MergeOperators; + +/// +/// Specifies the type of operation to perform on a collection. +/// +public enum OperationType +{ + /// + /// Add items to the collection. + /// + Add, + + /// + /// Remove items from the collection. 
+ /// + Remove +} \ No newline at end of file diff --git a/src/RocksDb.Extensions/MergeableRocksDbStore.cs b/src/RocksDb.Extensions/MergeableRocksDbStore.cs new file mode 100644 index 0000000..a4f94a9 --- /dev/null +++ b/src/RocksDb.Extensions/MergeableRocksDbStore.cs @@ -0,0 +1,66 @@ +namespace RocksDb.Extensions; + +/// +/// Base class for a RocksDB store that supports merge operations. +/// Inherit from this class when you need to use RocksDB's merge operator functionality +/// for efficient atomic read-modify-write operations. +/// +/// The type of the store's keys. +/// The type of the store's values. +/// The type of the merge operand. +/// +/// +/// Merge operations are useful for: +/// - Counters: Increment/decrement without reading current value (TValue=long, TOperand=long) +/// - Lists: Append items without reading the entire list (TValue=IList<T>, TOperand=IList<T>) +/// - Lists with add/remove: Modify lists atomically (TValue=IList<T>, TOperand=CollectionOperation<T>) +/// +/// +/// When using this base class, you must register the store with a merge operator using +/// . 
+/// +/// +/// +/// +/// // Counter store where value and operand are the same type +/// public class CounterStore : MergeableRocksDbStore<string, long, long> +/// { +/// public CounterStore(IMergeAccessor<string, long, long> mergeAccessor) +/// : base(mergeAccessor) { } +/// +/// public void Increment(string key, long delta = 1) => Merge(key, delta); +/// } +/// +/// // Tags store where value is IList<string> but operand is CollectionOperation<string> +/// public class TagsStore : MergeableRocksDbStore<string, IList<string>, CollectionOperation<string>> +/// { +/// public TagsStore(IMergeAccessor<string, IList<string>, CollectionOperation<string>> mergeAccessor) +/// : base(mergeAccessor) { } +/// +/// public void AddTag(string key, string tag) => Merge(key, CollectionOperation<string>.Add(tag)); +/// public void RemoveTag(string key, string tag) => Merge(key, CollectionOperation<string>.Remove(tag)); +/// } +/// +/// +public abstract class MergeableRocksDbStore : RocksDbStoreBase +{ + private readonly IMergeAccessor _mergeAccessor; + + /// + /// Initializes a new instance of the class. + /// + /// The RocksDB accessor to use for database operations. + protected MergeableRocksDbStore(IMergeAccessor mergeAccessor) : base(mergeAccessor) + { + _mergeAccessor = mergeAccessor; + } + + /// + /// Performs an atomic merge operation on the value associated with the specified key. + /// This operation uses RocksDB's merge operator to combine the operand with the existing value + /// without requiring a separate read operation. + /// + /// The key to merge the operand with. + /// The operand to merge with the existing value. 
+ public void Merge(TKey key, TOperand operand) => _mergeAccessor.Merge(key, operand); +} diff --git a/src/RocksDb.Extensions/RocksDb.Extensions.csproj b/src/RocksDb.Extensions/RocksDb.Extensions.csproj index a7e0c85..87ef619 100644 --- a/src/RocksDb.Extensions/RocksDb.Extensions.csproj +++ b/src/RocksDb.Extensions/RocksDb.Extensions.csproj @@ -27,6 +27,6 @@ - + diff --git a/src/RocksDb.Extensions/RocksDbAccessor.cs b/src/RocksDb.Extensions/RocksDbAccessor.cs index 8ad8fe5..bf93edf 100644 --- a/src/RocksDb.Extensions/RocksDbAccessor.cs +++ b/src/RocksDb.Extensions/RocksDbAccessor.cs @@ -7,22 +7,24 @@ namespace RocksDb.Extensions; internal class RocksDbAccessor : IRocksDbAccessor, ISpanDeserializer { - private const int MaxStackSize = 256; + private protected const int MaxStackSize = 256; - private readonly ISerializer _keySerializer; + protected readonly ISerializer KeySerializer; private readonly ISerializer _valueSerializer; - private readonly RocksDbContext _rocksDbContext; - private readonly ColumnFamily _columnFamily; + private protected readonly RocksDbContext RocksDbContext; + private protected readonly ColumnFamily ColumnFamily; private readonly bool _checkIfExists; + + private readonly object _syncRoot = new(); public RocksDbAccessor(RocksDbContext rocksDbContext, ColumnFamily columnFamily, ISerializer keySerializer, ISerializer valueSerializer) { - _rocksDbContext = rocksDbContext; - _columnFamily = columnFamily; - _keySerializer = keySerializer; + RocksDbContext = rocksDbContext; + ColumnFamily = columnFamily; + KeySerializer = keySerializer; _valueSerializer = valueSerializer; _checkIfExists = typeof(TValue).IsValueType; @@ -34,7 +36,7 @@ public void Remove(TKey key) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? 
stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -47,16 +49,16 @@ public void Remove(TKey key) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - _rocksDbContext.Db.Remove(keySpan, _columnFamily.Handle); + RocksDbContext.Db.Remove(keySpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); } finally { @@ -73,7 +75,7 @@ public void Put(TKey key, TValue value) byte[]? rentedKeyBuffer = null; bool useSpanAsKey; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpanAsKey = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? 
stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -99,12 +101,12 @@ public void Put(TKey key, TValue value) { if (useSpanAsKey) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } @@ -119,7 +121,7 @@ public void Put(TKey key, TValue value) valueSpan = valueBufferWriter.WrittenSpan; } - _rocksDbContext.Db.Put(keySpan, valueSpan, _columnFamily.Handle); + RocksDbContext.Db.Put(keySpan, valueSpan, ColumnFamily.Handle, RocksDbContext.WriteOptions); } finally { @@ -143,7 +145,7 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? 
stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -156,22 +158,22 @@ public bool TryGet(TKey key, [MaybeNullWhen(false)] out TValue value) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - if (_checkIfExists && _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle) == false) + if (_checkIfExists && RocksDbContext.Db.HasKey(keySpan, ColumnFamily.Handle) == false) { value = default; return false; } - value = _rocksDbContext.Db.Get(keySpan, this, _columnFamily.Handle); + value = RocksDbContext.Db.Get(keySpan, this, ColumnFamily.Handle); return value != null; } finally @@ -202,7 +204,7 @@ public void PutRange(ReadOnlySpan keys, ReadOnlySpan values) AddToBatch(keys[i], values[i], batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } public void PutRange(ReadOnlySpan values, Func keySelector) @@ -215,7 +217,7 @@ public void PutRange(ReadOnlySpan values, Func keySelector AddToBatch(key, value, batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } public void PutRange(IReadOnlyList<(TKey key, TValue value)> items) @@ -227,7 +229,7 @@ public void PutRange(IReadOnlyList<(TKey key, TValue value)> items) AddToBatch(key, value, batch); } - _rocksDbContext.Db.Write(batch); + RocksDbContext.Db.Write(batch, RocksDbContext.WriteOptions); } private void AddToBatch(TKey key, TValue value, WriteBatch batch) @@ -235,7 +237,7 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) byte[]? 
rentedKeyBuffer = null; bool useSpanAsKey; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpanAsKey = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpanAsKey = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -261,12 +263,12 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) { if (useSpanAsKey) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } @@ -280,8 +282,8 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) _valueSerializer.WriteTo(ref value, valueBufferWriter); valueSpan = valueBufferWriter.WrittenSpan; } - - _ = batch.Put(keySpan, valueSpan, _columnFamily.Handle); + + _ = batch.Put(keySpan, valueSpan, ColumnFamily.Handle); } finally { @@ -298,21 +300,21 @@ private void AddToBatch(TKey key, TValue value, WriteBatch batch) } } } - + public IEnumerable GetAllKeys() { - using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); while (iterator.Valid()) { - yield return _keySerializer.Deserialize(iterator.Key()); + yield return KeySerializer.Deserialize(iterator.Key()); _ = iterator.Next(); } } public IEnumerable GetAllValues() { - using var iterator = _rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); while (iterator.Valid()) { @@ -320,10 +322,10 @@ public IEnumerable GetAllValues() _ = iterator.Next(); } } - + public int Count() { - using var iterator = 
_rocksDbContext.Db.NewIterator(_columnFamily.Handle); + using var iterator = RocksDbContext.Db.NewIterator(ColumnFamily.Handle); _ = iterator.SeekToFirst(); var count = 0; while (iterator.Valid()) @@ -341,7 +343,7 @@ public bool HasKey(TKey key) bool useSpan; // ReSharper disable once AssignmentInConditionalExpression - Span keyBuffer = (useSpan = _keySerializer.TryCalculateSize(ref key, out var keySize)) + Span keyBuffer = (useSpan = KeySerializer.TryCalculateSize(ref key, out var keySize)) ? keySize < MaxStackSize ? stackalloc byte[keySize] : (rentedKeyBuffer = ArrayPool.Shared.Rent(keySize)).AsSpan(0, keySize) @@ -354,16 +356,16 @@ public bool HasKey(TKey key) { if (useSpan) { - _keySerializer.WriteTo(ref key, ref keyBuffer); + KeySerializer.WriteTo(ref key, ref keyBuffer); } else { keyBufferWriter = new ArrayPoolBufferWriter(); - _keySerializer.WriteTo(ref key, keyBufferWriter); + KeySerializer.WriteTo(ref key, keyBufferWriter); keySpan = keyBufferWriter.WrittenSpan; } - return _rocksDbContext.Db.HasKey(keySpan, _columnFamily.Handle); + return RocksDbContext.Db.HasKey(keySpan, ColumnFamily.Handle); } finally { @@ -374,14 +376,18 @@ public bool HasKey(TKey key) } } } - + public void Clear() { - var prevColumnFamilyHandle = _columnFamily.Handle; - _rocksDbContext.Db.DropColumnFamily(_columnFamily.Name); - _columnFamily.Handle = _rocksDbContext.Db.CreateColumnFamily(_rocksDbContext.ColumnFamilyOptions, _columnFamily.Name); + lock (_syncRoot) + { + var prevColumnFamilyHandle = ColumnFamily.Handle; + RocksDbContext.Db.DropColumnFamily(ColumnFamily.Name); - Native.Instance.rocksdb_column_family_handle_destroy(prevColumnFamilyHandle.Handle); - } -} + var cfOptions = RocksDbContext.CreateColumnFamilyOptions(ColumnFamily.Name); + ColumnFamily.Handle = RocksDbContext.Db.CreateColumnFamily(cfOptions, ColumnFamily.Name); + Native.Instance.rocksdb_column_family_handle_destroy(prevColumnFamilyHandle.Handle); + } + } +} \ No newline at end of file diff --git 
a/src/RocksDb.Extensions/RocksDbBuilder.cs b/src/RocksDb.Extensions/RocksDbBuilder.cs index 4123075..9c88392 100644 --- a/src/RocksDb.Extensions/RocksDbBuilder.cs +++ b/src/RocksDb.Extensions/RocksDbBuilder.cs @@ -22,7 +22,10 @@ public IRocksDbBuilder AddStore(string columnFamily) where throw new InvalidOperationException($"{columnFamily} is already registered."); } - _ = _serviceCollection.Configure(options => { options.ColumnFamilies.Add(columnFamily); }); + _ = _serviceCollection.Configure(options => + { + options.ColumnFamilies.Add(columnFamily); + }); _serviceCollection.AddKeyedSingleton(columnFamily, (provider, _) => { @@ -31,6 +34,7 @@ public IRocksDbBuilder AddStore(string columnFamily) where var rocksDbOptions = provider.GetRequiredService>(); var keySerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); var valueSerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + var rocksDbAccessor = new RocksDbAccessor( rocksDbContext, new ColumnFamily(columnFamilyHandle, columnFamily), @@ -45,6 +49,49 @@ public IRocksDbBuilder AddStore(string columnFamily) where return this; } + public IRocksDbBuilder AddMergeableStore(string columnFamily, IMergeOperator mergeOperator) + where TStore : MergeableRocksDbStore + { + if (!_columnFamilyLookup.Add(columnFamily)) + { + throw new InvalidOperationException($"{columnFamily} is already registered."); + } + + _ = _serviceCollection.Configure(options => + { + options.ColumnFamilies.Add(columnFamily); + + var valueSerializer = CreateSerializer(options.SerializerFactories); + var operandSerializer = CreateSerializer(options.SerializerFactories); + var config = MergeOperatorConfig.CreateMergeOperatorConfig(mergeOperator, valueSerializer, operandSerializer); + options.MergeOperators[columnFamily] = config; + }); + + _serviceCollection.AddKeyedSingleton(columnFamily, (provider, _) => + { + var rocksDbContext = provider.GetRequiredService(); + var columnFamilyHandle = 
rocksDbContext.Db.GetColumnFamily(columnFamily); + var rocksDbOptions = provider.GetRequiredService>(); + var keySerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + var valueSerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + var operandSerializer = CreateSerializer(rocksDbOptions.Value.SerializerFactories); + + var rocksDbAccessor = new MergeAccessor( + rocksDbContext, + new ColumnFamily(columnFamilyHandle, columnFamily), + keySerializer, + valueSerializer, + operandSerializer + ); + + return ActivatorUtilities.CreateInstance(provider, rocksDbAccessor); + }); + + _serviceCollection.TryAddSingleton(typeof(TStore), provider => provider.GetRequiredKeyedService(columnFamily)); + + return this; + } + private static ISerializer CreateSerializer(IReadOnlyList serializerFactories) { var type = typeof(T); @@ -70,11 +117,25 @@ private static ISerializer CreateSerializer(IReadOnlyList) Activator.CreateInstance(typeof(FixedSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer); + return (ISerializer) Activator.CreateInstance(typeof(FixedSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer)!; } // Use variable size list serializer for non-primitive types - return (ISerializer) Activator.CreateInstance(typeof(VariableSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer); + return (ISerializer) Activator.CreateInstance(typeof(VariableSizeListSerializer<>).MakeGenericType(elementType), scalarSerializer)!; + } + + // Handle CollectionOperation for the ListMergeOperator + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(MergeOperators.CollectionOperation<>)) + { + var itemType = type.GetGenericArguments()[0]; + + // Create the item serializer + var itemSerializer = typeof(RocksDbBuilder).GetMethod(nameof(CreateSerializer), BindingFlags.NonPublic | BindingFlags.Static) + ?.MakeGenericMethod(itemType) + .Invoke(null, new object[] { serializerFactories }); + + // Create 
ListOperationSerializer + return (ISerializer) Activator.CreateInstance(typeof(ListOperationSerializer<>).MakeGenericType(itemType), itemSerializer)!; } throw new InvalidOperationException($"Type {type.FullName} cannot be used as RocksDbStore key/value. " + diff --git a/src/RocksDb.Extensions/RocksDbContext.cs b/src/RocksDb.Extensions/RocksDbContext.cs index 4cc489e..4ba20ca 100644 --- a/src/RocksDb.Extensions/RocksDbContext.cs +++ b/src/RocksDb.Extensions/RocksDbContext.cs @@ -5,9 +5,9 @@ namespace RocksDb.Extensions; internal class RocksDbContext : IDisposable { - private readonly RocksDbSharp.RocksDb _rocksDb; + private readonly WriteOptions _writeOptions; + private readonly Dictionary _mergeOperators; private readonly Cache _cache; - private readonly ColumnFamilyOptions _userSpecifiedOptions; private const long BlockCacheSize = 50 * 1024 * 1024L; private const long BlockSize = 4096L; @@ -17,23 +17,7 @@ internal class RocksDbContext : IDisposable public RocksDbContext(IOptions options) { var dbOptions = new DbOptions(); - _userSpecifiedOptions = new ColumnFamilyOptions(); - var tableConfig = new BlockBasedTableOptions(); _cache = Cache.CreateLru(BlockCacheSize); - tableConfig.SetBlockCache(_cache); - tableConfig.SetBlockSize(BlockSize); - - var filter = BloomFilterPolicy.Create(); - tableConfig.SetFilterPolicy(filter); - _userSpecifiedOptions.SetBlockBasedTableFactory(tableConfig); - _userSpecifiedOptions.SetWriteBufferSize(WriteBufferSize); - _userSpecifiedOptions.SetCompression(Compression.No); - _userSpecifiedOptions.SetCompactionStyle(Compaction.Universal); - _userSpecifiedOptions.SetMaxWriteBufferNumberToMaintain(MaxWriteBuffers); - _userSpecifiedOptions.SetCreateIfMissing(); - _userSpecifiedOptions.SetCreateMissingColumnFamilies(); - _userSpecifiedOptions.SetErrorIfExists(false); - _userSpecifiedOptions.SetInfoLogLevel(InfoLogLevel.Error); // this is the recommended way to increase parallelism in RocksDb // note that the current implementation of 
setIncreaseParallelism affects the number @@ -45,25 +29,38 @@ public RocksDbContext(IOptions options) dbOptions.IncreaseParallelism(Math.Max(Environment.ProcessorCount, 2)); dbOptions.SetCreateIfMissing(); dbOptions.SetCreateMissingColumnFamilies(); + dbOptions.SetErrorIfExists(false); + dbOptions.SetInfoLogLevel(InfoLogLevel.Error); dbOptions.SetUseDirectReads(options.Value.UseDirectReads); dbOptions.SetUseDirectIoForFlushAndCompaction(options.Value.UseDirectIoForFlushAndCompaction); + dbOptions.EnableStatistics(); + dbOptions.SetMaxWriteBufferNumber(MaxWriteBuffers); + dbOptions.SetWriteBufferSize(WriteBufferSize); + dbOptions.SetCompression(Compression.No); + dbOptions.SetCompactionStyle(Compaction.Universal); + + var tableConfig = new BlockBasedTableOptions(); + tableConfig.SetBlockCache(_cache); + tableConfig.SetBlockSize(BlockSize); + + var filter = BloomFilterPolicy.Create(); + tableConfig.SetFilterPolicy(filter); + + dbOptions.SetBlockBasedTableFactory(tableConfig); + + _writeOptions = new WriteOptions(); + _writeOptions.DisableWal(1); - var fOptions = new FlushOptions(); - fOptions.SetWaitForFlush(options.Value.WaitForFlush); - - var writeOptions = new WriteOptions(); - writeOptions.DisableWal(1); - - _userSpecifiedOptions.EnableStatistics(); - - var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies, _userSpecifiedOptions); + _mergeOperators = options.Value.MergeOperators; + + var columnFamilies = CreateColumnFamilies(options.Value.ColumnFamilies); if (options.Value.DeleteExistingDatabaseOnStartup) { DestroyDatabase(options.Value.Path); } - _rocksDb = RocksDbSharp.RocksDb.Open(dbOptions, options.Value.Path, columnFamilies); + Db = RocksDbSharp.RocksDb.Open(dbOptions, options.Value.Path, columnFamilies); } private static void DestroyDatabase(string path) @@ -72,16 +69,33 @@ private static void DestroyDatabase(string path) Native.Instance.rocksdb_destroy_db(dbOptions.Handle, path); } - public RocksDbSharp.RocksDb Db => _rocksDb; + public 
RocksDbSharp.RocksDb Db { get; } + + public WriteOptions WriteOptions => _writeOptions; + + public ColumnFamilyOptions CreateColumnFamilyOptions(string columnFamilyName) + { + var cfOptions = new ColumnFamilyOptions(); + if (_mergeOperators.TryGetValue(columnFamilyName, out var mergeOperatorConfig)) + { + var mergeOp = global::RocksDbSharp.MergeOperators.Create( + mergeOperatorConfig.Name, + mergeOperatorConfig.PartialMerge, + mergeOperatorConfig.FullMerge); + + cfOptions.SetMergeOperator(mergeOp); + } + + return cfOptions; + } - public ColumnFamilyOptions ColumnFamilyOptions => _userSpecifiedOptions; - private static ColumnFamilies CreateColumnFamilies(IReadOnlyList columnFamilyNames, - ColumnFamilyOptions columnFamilyOptions) + private ColumnFamilies CreateColumnFamilies(IReadOnlyList columnFamilyNames) { - var columnFamilies = new ColumnFamilies(columnFamilyOptions); + var columnFamilies = new ColumnFamilies(); foreach (var columnFamilyName in columnFamilyNames) { + var columnFamilyOptions = CreateColumnFamilyOptions(columnFamilyName); columnFamilies.Add(columnFamilyName, columnFamilyOptions); } diff --git a/src/RocksDb.Extensions/RocksDbOptions.cs b/src/RocksDb.Extensions/RocksDbOptions.cs index f8c9a60..09f5202 100644 --- a/src/RocksDb.Extensions/RocksDbOptions.cs +++ b/src/RocksDb.Extensions/RocksDbOptions.cs @@ -33,6 +33,12 @@ public class RocksDbOptions internal List ColumnFamilies { get; } = new(); + /// + /// Internal dictionary of merge operators per column family. + /// Column family names are case-sensitive, matching RocksDB's behavior. + /// + internal Dictionary MergeOperators { get; } = new(); + /// /// Enables direct I/O mode for reads, which bypasses the OS page cache. 
/// diff --git a/src/RocksDb.Extensions/RocksDbStore.cs b/src/RocksDb.Extensions/RocksDbStore.cs new file mode 100644 index 0000000..3b573e6 --- /dev/null +++ b/src/RocksDb.Extensions/RocksDbStore.cs @@ -0,0 +1,24 @@ +namespace RocksDb.Extensions; + +/// +/// Base class for a RocksDB store that provides basic operations such as add, update, remove, get and get all. +/// +/// The type of the store's keys. +/// The type of the store's values. +public abstract class RocksDbStore : RocksDbStoreBase +{ + /// + /// Initializes a new instance of the class with the specified RocksDB accessor. + /// + /// The RocksDB accessor to use for database operations. + protected RocksDbStore(IRocksDbAccessor rocksDbAccessor) : base(rocksDbAccessor) + { + } + + /// + /// Gets all the values in the store. (Obsolete, use GetAllValues instead) + /// + /// An enumerable collection of all the values in the store. + [Obsolete("Use GetAllValues() instead.")] + public IEnumerable GetAll() => GetAllValues(); +} diff --git a/src/RocksDb.Extensions/IRocksDbStore.cs b/src/RocksDb.Extensions/RocksDbStoreBase.cs similarity index 88% rename from src/RocksDb.Extensions/IRocksDbStore.cs rename to src/RocksDb.Extensions/RocksDbStoreBase.cs index c39f6b9..0d04dbe 100644 --- a/src/RocksDb.Extensions/IRocksDbStore.cs +++ b/src/RocksDb.Extensions/RocksDbStoreBase.cs @@ -1,21 +1,28 @@ +using System.ComponentModel; using System.Diagnostics.CodeAnalysis; namespace RocksDb.Extensions; /// -/// Base class for a RocksDB store that provides basic operations such as add, update, remove, get and get all. +/// Base class containing common operations for RocksDB stores. +/// This class is not intended for direct use by library consumers. +/// Use or instead. /// /// The type of the store's keys. /// The type of the store's values. 
-public abstract class RocksDbStore +[EditorBrowsable(EditorBrowsableState.Never)] +public abstract class RocksDbStoreBase { private readonly IRocksDbAccessor _rocksDbAccessor; /// - /// Initializes a new instance of the class with the specified RocksDB accessor. + /// Initializes a new instance of the class. /// /// The RocksDB accessor to use for database operations. - protected RocksDbStore(IRocksDbAccessor rocksDbAccessor) => _rocksDbAccessor = rocksDbAccessor; + protected internal RocksDbStoreBase(IRocksDbAccessor rocksDbAccessor) + { + _rocksDbAccessor = rocksDbAccessor; + } /// /// Removes the specified key and its associated value from the store. @@ -64,13 +71,6 @@ public abstract class RocksDbStore /// An enumerable collection of all the values in the store. public IEnumerable GetAllValues() => _rocksDbAccessor.GetAllValues(); - /// - /// Gets all the values in the store. (Obsolete, use GetAllValues instead) - /// - /// An enumerable collection of all the values in the store. - [Obsolete("Use GetAllValues() instead.")] - public IEnumerable GetAll() => GetAllValues(); - /// /// Determines whether the store contains a value for a specific key. /// @@ -108,3 +108,4 @@ public abstract class RocksDbStore /// An enumerable collection of all the keys in the store. 
public IEnumerable GetAllKeys() => _rocksDbAccessor.GetAllKeys(); } + diff --git a/src/RocksDb.Extensions/VariableSizeListSerializer.cs b/src/RocksDb.Extensions/VariableSizeListSerializer.cs index 6e51e5c..0c6e47b 100644 --- a/src/RocksDb.Extensions/VariableSizeListSerializer.cs +++ b/src/RocksDb.Extensions/VariableSizeListSerializer.cs @@ -39,6 +39,12 @@ public bool TryCalculateSize(ref IList value, out int size) size += sizeof(int); size += elementSize; } + else + { + // Element serializer can't calculate size, so we can't either + size = 0; + return false; + } } return true; diff --git a/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs b/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs new file mode 100644 index 0000000..ae997d5 --- /dev/null +++ b/test/RocksDb.Extensions.Tests/MergeOperatorTests.cs @@ -0,0 +1,341 @@ +using NUnit.Framework; +using RocksDb.Extensions.MergeOperators; +using RocksDb.Extensions.ProtoBufNet; +using RocksDb.Extensions.Tests.Protos; +using RocksDb.Extensions.Tests.Utils; + +namespace RocksDb.Extensions.Tests; + +public class MergeOperatorTests +{ + [Test] + public void should_add_to_existing_list_using_merge_operation() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act + store.Put(key, new List { "csharp", "dotnet" }); + store.AddTags(key, "rocksdb"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(3)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Contain("rocksdb")); + } + + [Test] + public void should_add_items_to_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, 
CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act + store.AddTags(key, "csharp", "dotnet"); + store.AddTags(key, "rocksdb"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(3)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Contain("rocksdb")); + } + + [Test] + public void should_remove_items_from_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Add items, then remove some + store.Merge(key, CollectionOperation.Add("csharp", "dotnet", "java", "python")); + store.RemoveTags(key, "java", "python"); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + Assert.That(tags, Does.Contain("csharp")); + Assert.That(tags, Does.Contain("dotnet")); + Assert.That(tags, Does.Not.Contain("java")); + Assert.That(tags, Does.Not.Contain("python")); + } + + [Test] + public void should_handle_mixed_add_and_remove_operations() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Interleave adds and removes + store.AddTags(key, "a", "b", "c"); + store.RemoveTags(key, "b"); + store.AddTags(key, "d", "e"); + store.RemoveTags(key, "a", "e"); + + // Assert - Should have: c, d + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + 
Assert.That(tags, Does.Contain("c")); + Assert.That(tags, Does.Contain("d")); + } + + [Test] + public void should_handle_remove_nonexistent_item_gracefully() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Try to remove items that don't exist + store.AddTags(key, "csharp"); + store.RemoveTags(key, "nonexistent"); + + // Assert - Original item should still be there + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(1)); + Assert.That(tags, Does.Contain("csharp")); + } + + [Test] + public void should_remove_only_first_occurrence_of_duplicate_items() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Add duplicate items, then remove one + store.AddTags(key, "tag", "tag", "tag"); + store.RemoveTags(key, "tag"); + + // Assert - Should have 2 remaining + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(2)); + Assert.That(tags[0], Is.EqualTo("tag")); + Assert.That(tags[1], Is.EqualTo("tag")); + } + + [Test] + public void should_add_primitive_types_to_list_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, ScoresStore, CollectionOperation>( + "scores", + new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "player-1"; + + // Act + store.AddScores(key, 100, 200); + store.AddScores(key, 300); + + // Assert + Assert.That(store.TryGet(key, out var scores), Is.True); + Assert.That(scores, Is.Not.Null); + 
Assert.That(scores!.Count, Is.EqualTo(3)); + Assert.That(scores, Does.Contain(100)); + Assert.That(scores, Does.Contain(200)); + Assert.That(scores, Does.Contain(300)); + } + + [Test] + public void should_add_and_remove_primitive_types_using_list_merge_operator() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, ScoresStore, CollectionOperation>( + "scores", + new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "player-1"; + + // Act - Add items, then remove some + store.AddScores(key, 100, 200, 300, 400); + store.RemoveScores(key, 200, 400); // Remove middle values + + // Assert + Assert.That(store.TryGet(key, out var scores), Is.True); + Assert.That(scores, Is.Not.Null); + Assert.That(scores!.Count, Is.EqualTo(2)); + Assert.That(scores, Does.Contain(100)); + Assert.That(scores, Does.Contain(300)); + Assert.That(scores, Does.Not.Contain(200)); + Assert.That(scores, Does.Not.Contain(400)); + } + + [Test] + public void should_preserve_merge_operator_after_clear() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, TagsStore, CollectionOperation>("tags", new ListMergeOperator()); + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + // Act - Add items, clear the store, then use merge operation again + store.AddTags(key, "csharp", "dotnet"); + Assert.That(store.TryGet(key, out var tagsBeforeClear), Is.True); + Assert.That(tagsBeforeClear!.Count, Is.EqualTo(2)); + + store.Clear(); + + // After clear, merge operations should still work + store.AddTags(key, "rocksdb", "database"); + store.AddTags(key, "performance"); + + // Assert + Assert.That(store.TryGet(key, out var tagsAfterClear), Is.True); + Assert.That(tagsAfterClear, Is.Not.Null); + Assert.That(tagsAfterClear!.Count, Is.EqualTo(3)); + Assert.That(tagsAfterClear, Does.Contain("rocksdb")); + Assert.That(tagsAfterClear, Does.Contain("database")); + 
Assert.That(tagsAfterClear, Does.Contain("performance")); + + // Old data should be gone + Assert.That(tagsAfterClear, Does.Not.Contain("csharp")); + Assert.That(tagsAfterClear, Does.Not.Contain("dotnet")); + } + + [Test] + public void should_not_lose_data_when_using_protobufnet_serializer_with_merge_operators() + { + // Arrange + using var testFixture = TestFixture.Create(rockDb => + { + rockDb.AddMergeableStore, ProtoNetTagsStore, CollectionOperation>( + "protonet-tags", + new ListMergeOperator()); + }, options => + { + options.SerializerFactories.Clear(); + options.SerializerFactories.Add(new PrimitiveTypesSerializerFactory()); // For string keys + options.SerializerFactories.Add(new ProtoBufNetSerializerFactory()); // For ProtoNetTag values + }); + + var store = testFixture.GetStore(); + var key = "article-1"; + + var tag1 = new ProtoNetTag { Id = 1, Name = "csharp" }; + var tag2 = new ProtoNetTag { Id = 2, Name = "dotnet" }; + var tag3 = new ProtoNetTag { Id = 3, Name = "rocksdb" }; + + // Act - Use Put first to establish initial value, then Merge + store.Put(key, new List { tag1 }); + store.AddTags(key, tag2); + store.AddTags(key, tag3); + + // Assert + Assert.That(store.TryGet(key, out var tags), Is.True); + Assert.That(tags, Is.Not.Null); + Assert.That(tags!.Count, Is.EqualTo(3), "All tags should be preserved"); + Assert.That(tags, Does.Contain(tag1)); + Assert.That(tags, Does.Contain(tag2)); + Assert.That(tags, Does.Contain(tag3)); + } + + private class TagsStore : MergeableRocksDbStore, CollectionOperation> + { + public TagsStore(IMergeAccessor, CollectionOperation> mergeAccessor) + : base(mergeAccessor) + { + } + + public void AddTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Add(tags)); + } + + public void RemoveTags(string key, params string[] tags) + { + Merge(key, CollectionOperation.Remove(tags)); + } + } + + private class ScoresStore : MergeableRocksDbStore, CollectionOperation> + { + public ScoresStore(IMergeAccessor, 
CollectionOperation> mergeAccessor) + : base(mergeAccessor) + { + } + + public void AddScores(string key, params int[] scores) + { + Merge(key, CollectionOperation.Add(scores)); + } + + public void RemoveScores(string key, params int[] scores) + { + Merge(key, CollectionOperation.Remove(scores)); + } + } + + private class ProtoNetTagsStore : MergeableRocksDbStore, CollectionOperation> + { + public ProtoNetTagsStore(IMergeAccessor, CollectionOperation> mergeAccessor) + : base(mergeAccessor) + { + } + + public void AddTags(string key, params ProtoNetTag[] tags) + { + Merge(key, CollectionOperation.Add(tags)); + } + + public void RemoveTags(string key, params ProtoNetTag[] tags) + { + Merge(key, CollectionOperation.Remove(tags)); + } + } +} \ No newline at end of file diff --git a/test/RocksDb.Extensions.Tests/Protos/ProtoNetTag.cs b/test/RocksDb.Extensions.Tests/Protos/ProtoNetTag.cs new file mode 100644 index 0000000..c5a7097 --- /dev/null +++ b/test/RocksDb.Extensions.Tests/Protos/ProtoNetTag.cs @@ -0,0 +1,25 @@ +using System.Runtime.Serialization; +using ProtoBuf; + +namespace RocksDb.Extensions.Tests.Protos; + +[DataContract] +public class ProtoNetTag +{ + [ProtoMember(1)] + public int Id { get; set; } + + [ProtoMember(2)] + public string Name { get; set; } = null!; + + public override bool Equals(object? obj) + { + return obj is ProtoNetTag tag && Id == tag.Id && Name == tag.Name; + } + + public override int GetHashCode() + { + return HashCode.Combine(Id, Name); + } +} +