diff --git a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs index 2a101b7b3..641d4a47f 100644 --- a/cs/benchmark/FasterSpanByteYcsbBenchmark.cs +++ b/cs/benchmark/FasterSpanByteYcsbBenchmark.cs @@ -83,6 +83,11 @@ internal FasterSpanByteYcsbBenchmark(KeySpanByte[] i_keys_, KeySpanByte[] t_keys store = new FasterKV (testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true, MemorySizeBits = 35 }, new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath }); + + if (testLoader.SecondaryIndexType.HasFlag(SecondaryIndexType.Key)) + store.SecondaryIndexBroker.AddIndex(new NullKeyIndex()); + if (testLoader.SecondaryIndexType.HasFlag(SecondaryIndexType.Value)) + store.SecondaryIndexBroker.AddIndex(new NullValueIndex()); } internal void Dispose() diff --git a/cs/benchmark/FasterYcsbBenchmark.cs b/cs/benchmark/FasterYcsbBenchmark.cs index 9e8380079..2695ad6df 100644 --- a/cs/benchmark/FasterYcsbBenchmark.cs +++ b/cs/benchmark/FasterYcsbBenchmark.cs @@ -83,6 +83,11 @@ internal FASTER_YcsbBenchmark(Key[] i_keys_, Key[] t_keys_, TestLoader testLoade store = new FasterKV (testLoader.MaxKey / 2, new LogSettings { LogDevice = device, PreallocateLog = true }, new CheckpointSettings { CheckPointType = CheckpointType.Snapshot, CheckpointDir = testLoader.BackupPath }); + + if (testLoader.SecondaryIndexType.HasFlag(SecondaryIndexType.Key)) + store.SecondaryIndexBroker.AddIndex(new NullKeyIndex()); + if (testLoader.SecondaryIndexType.HasFlag(SecondaryIndexType.Value)) + store.SecondaryIndexBroker.AddIndex(new NullValueIndex()); } internal void Dispose() diff --git a/cs/benchmark/Options.cs b/cs/benchmark/Options.cs index c138ed222..b599af92d 100644 --- a/cs/benchmark/Options.cs +++ b/cs/benchmark/Options.cs @@ -38,6 +38,14 @@ class Options "\n 1 = RecordInfo.SpinLock()")] public int LockImpl { get; set; } + [Option('x', "index", Required = false, Default = 0, + HelpText = "Secondary index type(s); these implement a no-op index to test the overhead on FasterKV operations:" + + "\n 0 = None (default)" + + "\n 1 = Key-based index" + + "\n 2 = Value-based index" + + "\n 3 = Both index types")] + public int SecondaryIndexType { get; set; } + [Option('i', "iterations", Required = false, Default = 1, HelpText = "Number of iterations of the test to run")] public int IterationCount { get; set; } @@ -95,7 +103,7 @@ class Options public string GetOptionsString() { static string boolStr(bool value) => value ? "y" : "n"; - return $"d: {DistributionName.ToLower()}; n: {NumaStyle}; r: {ReadPercent}; t: {ThreadCount}; z: {LockImpl}; i: {IterationCount};" + return $"d: {DistributionName.ToLower()}; n: {NumaStyle}; r: {ReadPercent}; t: {ThreadCount}; x: {SecondaryIndexType}; z: {LockImpl}; i: {IterationCount};" + $" sd: {boolStr(UseSmallData)}; sm: {boolStr(UseSmallMemoryLog)}; sy: {boolStr(this.UseSyntheticData)}; noaff: {boolStr(this.NoThreadAffinity)};" + $" chkptms: {this.PeriodicCheckpointMilliseconds}; chkpttype: {(this.PeriodicCheckpointMilliseconds > 0 ? this.PeriodicCheckpointType.ToString() : "None")}; chkptincr: {boolStr(this.PeriodicCheckpointTryIncremental)}"; } diff --git a/cs/benchmark/SecondaryIndexes.cs b/cs/benchmark/SecondaryIndexes.cs new file mode 100644 index 000000000..ff2b19e23 --- /dev/null +++ b/cs/benchmark/SecondaryIndexes.cs @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.benchmark +{ + class NullKeyIndex : ISecondaryKeyIndex + { + public string Name => "KeyIndex"; + + public bool IsMutable => true; + + public void SetSessionSlot(long slot) { } + + public void Delete(ref Key key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void Insert(ref Key key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void Upsert(ref Key key, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryTruncate(long newBeginAddress) { } + + public void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo recoveredPci) { } + + public void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo primaryCheckpointInfo) { } + + public PrimaryCheckpointInfo Recover(PrimaryCheckpointInfo recoveredPci, bool undoNextVersion) => default; + + public Task RecoverAsync(PrimaryCheckpointInfo recoveredPci, bool undoNextVersion, CancellationToken cancellationToken = default) => default; + + public void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + } + + class NullValueIndex : ISecondaryValueIndex + { + public string Name => "ValueIndex"; + + public bool IsMutable => true; + + public void SetSessionSlot(long slot) { } + + public void Delete(ref Key key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void Insert(ref Key key, ref Value value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void Upsert(ref Key key, ref Value value, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryTruncate(long newBeginAddress) { } + + public void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo recoveredPci) { } + + public void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo primaryCheckpointInfo) { } + + public PrimaryCheckpointInfo Recover(PrimaryCheckpointInfo recoveredPci, bool undoNextVersion) => default; + + public Task RecoverAsync(PrimaryCheckpointInfo recoveredPci, bool undoNextVersion, CancellationToken cancellationToken = default) => default; // Not used for this class + + public void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + } +} diff --git a/cs/benchmark/TestLoader.cs b/cs/benchmark/TestLoader.cs index 72501a1bb..12335ce9a 100644 --- a/cs/benchmark/TestLoader.cs +++ b/cs/benchmark/TestLoader.cs @@ -25,6 +25,7 @@ class TestLoader internal BenchmarkType BenchmarkType; internal LockImpl LockImpl; + internal SecondaryIndexType SecondaryIndexType; internal string Distribution; internal Key[] init_keys = default; @@ -63,6 +64,10 @@ static bool verifyOption(bool isValid, string name) if (!verifyOption(Enum.IsDefined(typeof(LockImpl), this.LockImpl), "Lock Implementation")) return false; + this.SecondaryIndexType = (SecondaryIndexType)Options.SecondaryIndexType; + if (!verifyOption(Enum.IsDefined(typeof(SecondaryIndexType), this.SecondaryIndexType), "Secondary Index Type")) + return false; + if (!verifyOption(Options.IterationCount > 0, "Iteration Count")) return false; @@ -80,6 +85,7 @@ static bool verifyOption(bool isValid, string name) this.TxnCount = this.Options.UseSmallData ? 10000000 : 1000000000; this.MaxKey = this.Options.UseSmallData ? 1 << 22 : 1 << 28; + Console.WriteLine($"Scenario: {this.BenchmarkType}, Locking: {(LockImpl)Options.LockImpl}, Indexing: {(SecondaryIndexType)Options.SecondaryIndexType}"); return true; } @@ -355,7 +361,7 @@ internal bool MaybeRecoverStore(FasterKV store) catch (Exception ex) { var suffix = Directory.Exists(this.BackupPath) ? "" : " (directory does not exist)"; - Console.WriteLine($"Unable to recover prior store: {ex.Message}{suffix}"); + Console.WriteLine($" Unable to recover prior store: {ex.Message}{suffix}"); } } return false; diff --git a/cs/benchmark/YcsbConstants.cs b/cs/benchmark/YcsbConstants.cs index cbbd5e522..1f0fdd2a9 100644 --- a/cs/benchmark/YcsbConstants.cs +++ b/cs/benchmark/YcsbConstants.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using FASTER.core; +using System; namespace FASTER.benchmark { @@ -18,6 +19,15 @@ enum LockImpl : int RecordInfo }; + [Flags] + enum SecondaryIndexType : int + { + None = 0, + Key = 1, + Value = 2, + Both = 3 + }; + enum AddressLineNum : int { Before = 1, diff --git a/cs/benchmark/scripts/compare_runs.ps1 b/cs/benchmark/scripts/compare_runs.ps1 index 9547312e8..200151087 100644 --- a/cs/benchmark/scripts/compare_runs.ps1 +++ b/cs/benchmark/scripts/compare_runs.ps1 @@ -40,7 +40,7 @@ param ( ) class Result : System.IComparable, System.IEquatable[Object] { - # To make things work in one class, name the properties "Left", "Right", and "Diff"--they aren't displayed until the Diff is calculated. + # To make things work in one class, name the properties "Baseline", "Current", and "Diff"--they aren't displayed until the Diff is calculated. [double]$BaselineMean [double]$BaselineStdDev [double]$CurrentMean @@ -56,6 +56,7 @@ class Result : System.IComparable, System.IEquatable[Object] { [string]$Distribution [int]$ReadPercent [uint]$ThreadCount + [uint]$IndexMode [uint]$LockMode [uint]$Iterations [bool]$SmallData @@ -80,6 +81,7 @@ class Result : System.IComparable, System.IEquatable[Object] { "d" { $this.Distribution = $value } "r" { $this.ReadPercent = $value } "t" { $this.ThreadCount = $value } + "x" { $this.IndexMode = $value } "z" { $this.LockMode = $value } "i" { $this.Iterations = $value } "sd" { $this.SmallData = $value -eq "y" } @@ -98,6 +100,7 @@ class Result : System.IComparable, System.IEquatable[Object] { $this.Distribution = $other.Distribution $this.ReadPercent = $other.ReadPercent $this.ThreadCount = $other.ThreadCount + $this.IndexMode = $other.IndexMode $this.LockMode = $other.LockMode $this.Iterations = $other.Iterations $this.SmallData = $other.SmallData @@ -165,6 +168,7 @@ class Result : System.IComparable, System.IEquatable[Object] { -and $this.Distribution -eq $other.Distribution -and $this.ReadPercent -eq $other.ReadPercent -and $this.ThreadCount -eq $other.ThreadCount + -and $this.IndexMode -eq $other.IndexMode -and $this.LockMode -eq $other.LockMode -and $this.Iterations -eq $other.Iterations -and $this.SmallData -eq $other.SmallData @@ -177,7 +181,7 @@ class Result : System.IComparable, System.IEquatable[Object] { } [int] GetHashCode() { - return ($this.Numa, $this.Distribution, $this.ReadPercent, $this.ThreadCount, $this.LockMode, + return ($this.Numa, $this.Distribution, $this.ReadPercent, $this.ThreadCount, $this.IndexMode, $this.LockMode, $this.Iterations, $this.SmallData, $this.SmallMemory, $this.SyntheticData, $this.NoAff, $this.ChkptMs, $this.ChkptType, $this.ChkptIncr).GetHashCode(); } diff --git a/cs/benchmark/scripts/run_benchmark.ps1 b/cs/benchmark/scripts/run_benchmark.ps1 index d94171d90..62d085b88 100644 --- a/cs/benchmark/scripts/run_benchmark.ps1 +++ b/cs/benchmark/scripts/run_benchmark.ps1 @@ -30,6 +30,10 @@ Number of threads to use. Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. +.PARAMETER IndexMode + Indexing mode to use: 0 = None, 1 = Key, 2 = Value, 3 = both. + Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. + .PARAMETER LockMode Locking mode to use: 0 = No locking, 1 = RecordInfo locking Used primarily to debug changes to this script or do a quick one-off run; the default is multiple counts as defined in the script. @@ -83,6 +87,7 @@ param ( [Parameter(Mandatory=$true)] [string[]]$ExeDirs, [Parameter(Mandatory=$false)] [int]$RunSeconds = 30, [Parameter(Mandatory=$false)] [int]$ThreadCount = -1, + [Parameter(Mandatory=$false)] [int]$IndexMode = -1, [Parameter(Mandatory=$false)] [int]$LockMode = -1, [Parameter(Mandatory=$false)] [int[]]$ReadPercentages, [Parameter(Mandatory=$false)] [switch]$UseRecover, @@ -132,6 +137,7 @@ $iterations = 7 $distributions = ("uniform", "zipf") $readPercents = (0, 100) $threadCounts = (1, 20, 40, 60, 80) +$indexModes = (0, 1, 2) #, 3) $lockModes = (0, 1) $smallDatas = (0) #, 1) $smallMemories = (0) #, 1) @@ -141,6 +147,9 @@ $k = "" if ($ThreadCount -ge 0) { $threadCounts = ($ThreadCount) } +if ($IndexMode -ge 0) { + $indexModes = ($IndexMode) +} if ($LockMode -ge 0) { $lockModes = ($LockMode) } @@ -155,6 +164,7 @@ if ($UseRecover) { $permutations = $distributions.Count * $readPercents.Count * $threadCounts.Count * + $indexModes.Count * $lockModes.Count * $smallDatas.Count * $smallMemories.Count * @@ -164,27 +174,29 @@ $permutation = 1 foreach ($d in $distributions) { foreach ($r in $readPercents) { foreach ($t in $threadCounts) { - foreach ($z in $lockModes) { - foreach ($sd in $smallDatas) { - foreach ($sm in $smallMemories) { - foreach ($sy in $syntheticDatas) { - Write-Host - Write-Host "Permutation $permutation of $permutations" + foreach ($x in $indexModes) { + foreach ($z in $lockModes) { + foreach ($sd in $smallDatas) { + foreach ($sm in $smallMemories) { + foreach ($sy in $syntheticDatas) { + Write-Host + Write-Host "Permutation $permutation of $permutations" - # Only certain combinations of Numa/Threads are supported - $n = ($t -lt 48) ? 0 : 1; + # Only certain combinations of Numa/Threads are supported + $n = ($t -lt 48) ? 0 : 1; - for($ii = 0; $ii -lt $exeNames.Count; ++$ii) { - $exeName = $exeNames[$ii] - $resultDir = $resultDirs[$ii] + for($ii = 0; $ii -lt $exeNames.Count; ++$ii) { + $exeName = $exeNames[$ii] + $resultDir = $resultDirs[$ii] - Write-Host - Write-Host "Permutation $permutation/$permutations generating results $($ii + 1)/$($exeNames.Count) to $resultDir for: -n $n -d $d -r $r -t $t -z $z -i $iterations --runsec $RunSeconds $k" + Write-Host + Write-Host "Permutation $permutation/$permutations generating results $($ii + 1)/$($exeNames.Count) to $resultDir for: -n $n -d $d -r $r -t $t -z $z -i $iterations --runsec $RunSeconds $k" - # RunSec and Recover are for one-off operations and are not recorded in the filenames. - & "$exeName" -b 0 -n $n -d $d -r $r -t $t -z $z -i $iterations --runsec $RunSeconds $k | Tee-Object "$resultDir/results_n-$($n)_d-$($d)_r-$($r)_t-$($t)_z-$($z).txt" + # RunSec and Recover are for one-off operations and are not recorded in the filenames. + & "$exeName" -b 0 -n $n -d $d -r $r -t $t -x $x -z $z -i $iterations --runsec $RunSeconds $k | Tee-Object "$resultDir/results_n-$($n)_d-$($d)_r-$($r)_t-$($t)_x-$($x)_z-$($z).txt" + } + ++$permutation } - ++$permutation } } } diff --git a/cs/samples/ReadAddress/Types.cs b/cs/samples/ReadAddress/Types.cs index f1306a949..b27f3d1dc 100644 --- a/cs/samples/ReadAddress/Types.cs +++ b/cs/samples/ReadAddress/Types.cs @@ -50,7 +50,7 @@ public class Functions : AdvancedSimpleFunctions // Track the recordInfo for its PreviousAddress. public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status, RecordInfo recordInfo) { - if (!(ctx is null)) + if (ctx is not null) { ctx.recordInfo = recordInfo; ctx.status = status; diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs index 37f745f50..802d04cf6 100644 --- a/cs/src/core/Allocator/AllocatorBase.cs +++ b/cs/src/core/Allocator/AllocatorBase.cs @@ -230,12 +230,16 @@ public abstract partial class AllocatorBase : IDisposable /// /// Observer for records entering read-only region /// - internal IObserver> OnReadOnlyObserver; + internal IObserver>[] OnReadOnlyObservers = Array.Empty>>(); /// /// Observer for records getting evicted from memory (page closed) /// - internal IObserver> OnEvictionObserver; + internal IObserver>[] OnEvictionObservers = Array.Empty>>(); + + // Observer membership locks + private readonly object readOnlyObserverLock = new object(); + private readonly object evictionObserverLock = new object(); /// /// The "event" to be waited on for flush completion by the initiator of an operation @@ -825,8 +829,7 @@ public virtual void Dispose() epoch.Dispose(); bufferPool.Free(); - OnReadOnlyObserver?.OnCompleted(); - OnEvictionObserver?.OnCompleted(); + this.OnCompleted(); } /// @@ -886,6 +889,86 @@ public int EmptyPageCount /// internal abstract void DeleteFromMemory(); + internal void AddReadOnlyObserver(IObserver> observer) + => AddObserver(ref OnReadOnlyObservers, observer, readOnlyObserverLock); + internal void RemoveReadOnlyObserver(IObserver> observer) + => RemoveObserver(ref OnReadOnlyObservers, observer, readOnlyObserverLock); + + internal void AddEvictionObserver(IObserver> observer) + => AddObserver(ref OnEvictionObservers, observer, evictionObserverLock); + internal void RemoveEvictionObserver(IObserver> observer) + => RemoveObserver(ref OnEvictionObservers, observer, evictionObserverLock); + + internal static void AddObserver(ref IObserver>[] observers, IObserver> observer, object lockObject) + { + lock (lockObject) + { + if (observers.Length == 0) + { + observers = new[] { observer }; + return; + } + var equatable = observer as IEquatable>>; + foreach (var existing in observers) + { + bool found = equatable is { } && existing is IEquatable>> existingEquatable + ? equatable.Equals(existingEquatable) + : observer.Equals(existing); + if (found) + return; + } + var vec = new IObserver>[observers.Length + 1]; + Array.Copy(observers, vec, observers.Length); + vec[observers.Length] = observer; + observers = vec; + } + } + + internal static void RemoveObserver(ref IObserver>[] observers, IObserver> observer, object lockObject) + { + lock (lockObject) + { + if (observers.Length == 0) + return; + var equatable = observer as IEquatable>>; + for (var ii = 0; ii < observers.Length; ++ii) + { + var existing = observers[ii]; + bool found = equatable is { } && existing is IEquatable>> existingEquatable + ? equatable.Equals(existingEquatable) + : observer.Equals(existing); + if (found) + { + if (observers.Length == 1) + { + observers = Array.Empty>>(); + return; + } + var vec = new IObserver>[observers.Length - 1]; + if (ii > 0) + Array.Copy(observers, vec, ii); + if (ii < observers.Length - 1) + Array.Copy(observers, ii + 1, vec, ii, observers.Length - ii - 1); + observers = vec; + } + } + } + } + + void OnCompleted() + { + OnCompleted(ref this.OnReadOnlyObservers); + OnCompleted(ref this.OnEvictionObservers); + } + + internal static void OnCompleted(ref IObserver>[] observers) + { + var localObservers = observers; + if (localObservers is null || Interlocked.CompareExchange(ref observers, null, localObservers) != localObservers) + return; + foreach (var observer in localObservers) + observer.OnCompleted(); + } /// /// Segment size @@ -1241,10 +1324,13 @@ public void OnPagesMarkedReadOnly(long newSafeReadOnlyAddress) if (Utility.MonotonicUpdate(ref SafeReadOnlyAddress, newSafeReadOnlyAddress, out long oldSafeReadOnlyAddress)) { Debug.WriteLine("SafeReadOnly shifted from {0:X} to {1:X}", oldSafeReadOnlyAddress, newSafeReadOnlyAddress); - if (OnReadOnlyObserver != null) + + var localObservers = this.OnReadOnlyObservers; + foreach (var observer in localObservers) { + // TODO: Parallelize OPMRO using var iter = Scan(oldSafeReadOnlyAddress, newSafeReadOnlyAddress, ScanBufferingMode.NoBuffering); - OnReadOnlyObserver?.OnNext(iter); + observer.OnNext(iter); } AsyncFlushPages(oldSafeReadOnlyAddress, newSafeReadOnlyAddress); } diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs index 1189d3cc5..27c436ca0 100644 --- a/cs/src/core/Allocator/BlittableAllocator.cs +++ b/cs/src/core/Allocator/BlittableAllocator.cs @@ -340,8 +340,12 @@ public override IFasterScanIterator Scan(long beginAddress, long end /// internal override void MemoryPageScan(long beginAddress, long endAddress) { - using var iter = new BlittableScanIterator(this, beginAddress, endAddress, ScanBufferingMode.NoBuffering, epoch, true); - OnEvictionObserver?.OnNext(iter); + var localObservers = OnEvictionObservers; + foreach (var observer in localObservers) + { + using var iter = new BlittableScanIterator(this, beginAddress, endAddress, ScanBufferingMode.NoBuffering, epoch, true); + observer.OnNext(iter); + } } /// diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs index c00d2811d..da2ca3edf 100644 --- a/cs/src/core/Allocator/GenericAllocator.cs +++ b/cs/src/core/Allocator/GenericAllocator.cs @@ -1030,12 +1030,16 @@ internal override void MemoryPageScan(long beginAddress, long endAddress) int start = (int)(beginAddress & PageSizeMask) / recordSize; int count = (int)(endAddress - beginAddress) / recordSize; int end = start + count; - using var iter = new MemoryPageScanIterator(values[page], start, end); Debug.Assert(epoch.ThisInstanceProtected()); try { epoch.Suspend(); - OnEvictionObserver?.OnNext(iter); + var localObservers = OnEvictionObservers; + foreach (var observer in localObservers) + { + using var iter = new MemoryPageScanIterator(values[page], start, end); + observer.OnNext(iter); + } } finally { diff --git a/cs/src/core/Allocator/IFasterScanIterator.cs b/cs/src/core/Allocator/IFasterScanIterator.cs index 3101c3fc9..55698679f 100644 --- a/cs/src/core/Allocator/IFasterScanIterator.cs +++ b/cs/src/core/Allocator/IFasterScanIterator.cs @@ -70,5 +70,15 @@ public interface IFasterScanIterator : IDisposable /// Next address /// long NextAddress { get; } + + /// + /// The starting address of the scan + /// + long BeginAddress { get; } + + /// + /// The ending address of the scan + /// + long EndAddress { get; } } } diff --git a/cs/src/core/Allocator/MemoryPageScanIterator.cs b/cs/src/core/Allocator/MemoryPageScanIterator.cs index 2d82f5622..6b54a2b49 100644 --- a/cs/src/core/Allocator/MemoryPageScanIterator.cs +++ b/cs/src/core/Allocator/MemoryPageScanIterator.cs @@ -16,7 +16,7 @@ namespace FASTER.core class MemoryPageScanIterator : IFasterScanIterator { readonly Record[] page; - readonly int end; + readonly int start, end; int offset; @@ -25,6 +25,7 @@ public MemoryPageScanIterator(Record[] page, int start, int end) this.page = new Record[page.Length]; Array.Copy(page, start, this.page, start, end - start); offset = start - 1; + this.start = start; this.end = end; } @@ -32,6 +33,10 @@ public MemoryPageScanIterator(Record[] page, int start, int end) public long NextAddress => offset + 1; + public long BeginAddress => start; + + public long EndAddress => end; + public void Dispose() { } diff --git a/cs/src/core/Allocator/ScanIteratorBase.cs b/cs/src/core/Allocator/ScanIteratorBase.cs index 37a110ec6..21c0db3bd 100644 --- a/cs/src/core/Allocator/ScanIteratorBase.cs +++ b/cs/src/core/Allocator/ScanIteratorBase.cs @@ -52,6 +52,16 @@ public abstract class ScanIteratorBase /// public long NextAddress => nextAddress; + /// + /// The starting address of the scan + /// + public long BeginAddress => beginAddress; + + /// + /// The ending address of the scan + /// + public long EndAddress => endAddress; + /// /// Constructor /// diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs index 9c8db19f4..e1ebae221 100644 --- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs +++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs @@ -464,8 +464,12 @@ public override IFasterScanIterator Scan(long beginAddress, long end /// internal override void MemoryPageScan(long beginAddress, long endAddress) { - using var iter = new VariableLengthBlittableScanIterator(this, beginAddress, endAddress, ScanBufferingMode.NoBuffering, epoch, true); - OnEvictionObserver?.OnNext(iter); + var localObservers = OnEvictionObservers; + foreach (var observer in localObservers) + { + using var iter = new VariableLengthBlittableScanIterator(this, beginAddress, endAddress, ScanBufferingMode.NoBuffering, epoch, true); + observer.OnNext(iter); + } } diff --git a/cs/src/core/Allocator/WorkQueueLIFO.cs b/cs/src/core/Allocator/WorkQueueLIFO.cs index 42eb76a62..852a7dfc7 100644 --- a/cs/src/core/Allocator/WorkQueueLIFO.cs +++ b/cs/src/core/Allocator/WorkQueueLIFO.cs @@ -54,7 +54,7 @@ public void EnqueueAndTryWork(T work, bool asTask) private void ProcessQueue() { - // Process items in qork queue + // Process items in work queue while (true) { while (_queue.TryPop(out var workItem)) diff --git a/cs/src/core/Async/DeleteAsync.cs b/cs/src/core/Async/DeleteAsync.cs index 48b408641..69c419a1d 100644 --- a/cs/src/core/Async/DeleteAsync.cs +++ b/cs/src/core/Async/DeleteAsync.cs @@ -27,6 +27,18 @@ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext< internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); } while (internalStatus == OperationStatus.RETRY_NOW); output = default; + + if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) + { + if (pendingContext.IsNewRecord) + { + Debug.Assert(internalStatus == OperationStatus.SUCCESS); + + // No need to lock here; we have just written a new record with a tombstone, so it will not be changed + // TODO - but this can race with an INSERT of the same key... + fasterKV.UpdateSIForDelete(ref pendingContext.key.Get(), new RecordId(pendingContext.recordInfo, pendingContext.logicalAddress), isNewRecord: true, fasterSession.SecondaryIndexSessionBroker); + } + } return TranslateStatus(internalStatus); } @@ -103,7 +115,17 @@ internal ValueTask> DeleteAsync>(new DeleteAsyncResult((Status)internalStatus)); + } Debug.Assert(internalStatus == OperationStatus.ALLOCATE_FAILED); } finally diff --git a/cs/src/core/Async/RMWAsync.cs b/cs/src/core/Async/RMWAsync.cs index 3276aa652..c80b5a5df 100644 --- a/cs/src/core/Async/RMWAsync.cs +++ b/cs/src/core/Async/RMWAsync.cs @@ -32,7 +32,16 @@ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext< pendingContext.serialNum, asyncOp, out flushEvent, out newDiskRequest); output = pendingContext.output; - if (status == Status.PENDING && !newDiskRequest.IsDefault()) + if (status == Status.OK || status == Status.NOTFOUND) + { + if (pendingContext.IsNewRecord) + { + long physicalAddress = fasterKV.hlog.GetPhysicalAddress(pendingContext.logicalAddress); + fasterKV.UpdateSIForInsert>(ref fasterKV.hlog.GetKey(physicalAddress), ref fasterKV.hlog.GetValue(physicalAddress), + ref fasterKV.hlog.GetInfo(physicalAddress), pendingContext.logicalAddress, fasterSession); + } + } + else if (status == Status.PENDING && !newDiskRequest.IsDefault()) { flushEvent = default; this.diskRequest = newDiskRequest; @@ -163,12 +172,33 @@ private Status CallInternalRMW(IFasterSession>(ref key, ref this.hlog.GetValue(physicalAddress), + ref this.hlog.GetInfo(physicalAddress), pcontext.logicalAddress, fasterSession); + pcontext.IsNewRecord = false; // So DoFastOperation does not do this again + } return (Status)internalStatus; + } if (internalStatus == OperationStatus.ALLOCATE_FAILED) return Status.PENDING; // This plus diskRequest.IsDefault() means allocate failed flushEvent = default; - return HandleOperationStatus(currentCtx, currentCtx, ref pcontext, fasterSession, internalStatus, asyncOp, out diskRequest); + var status = HandleOperationStatus(currentCtx, currentCtx, ref pcontext, fasterSession, internalStatus, asyncOp, out diskRequest); + if (status != Status.PENDING) + { + if (pcontext.IsNewRecord) + { + Debug.Assert(status == Status.OK || status == Status.NOTFOUND); + long physicalAddress = this.hlog.GetPhysicalAddress(pcontext.logicalAddress); + UpdateSIForInsert>(ref key, ref this.hlog.GetValue(physicalAddress), + ref this.hlog.GetInfo(physicalAddress), pcontext.logicalAddress, fasterSession); + pcontext.IsNewRecord = false; // So DoFastOperation does not do this again + } + } + return status; } private static async ValueTask> SlowRmwAsync( diff --git a/cs/src/core/Async/ReadAsync.cs b/cs/src/core/Async/ReadAsync.cs index a6b472c82..149405ec7 100644 --- a/cs/src/core/Async/ReadAsync.cs +++ b/cs/src/core/Async/ReadAsync.cs @@ -189,7 +189,7 @@ internal ValueTask> ReadAsync> SlowReadAsync( + internal static async ValueTask> SlowReadAsync( FasterKV @this, IFasterSession fasterSession, FasterExecutionContext currentCtx, diff --git a/cs/src/core/Async/UpsertAsync.cs b/cs/src/core/Async/UpsertAsync.cs index a5e9096a7..40ae7eaa7 100644 --- a/cs/src/core/Async/UpsertAsync.cs +++ b/cs/src/core/Async/UpsertAsync.cs @@ -27,6 +27,15 @@ public Status DoFastOperation(FasterKV fasterKV, ref PendingContext< internalStatus = fasterKV.InternalUpsert(ref pendingContext.key.Get(), ref pendingContext.value.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, currentCtx, pendingContext.serialNum); } while (internalStatus == OperationStatus.RETRY_NOW); output = default; + if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) + { + if (pendingContext.IsNewRecord) + { + long physicalAddress = fasterKV.hlog.GetPhysicalAddress(pendingContext.logicalAddress); + fasterKV.UpdateSIForInsert>(ref pendingContext.key.Get(), ref pendingContext.value.Get(), + ref fasterKV.hlog.GetInfo(physicalAddress), pendingContext.logicalAddress, fasterSession); + } + } return TranslateStatus(internalStatus); } @@ -103,7 +112,16 @@ private ValueTask> UpsertAsync>(ref key, ref this.hlog.GetValue(physicalAddress), + ref this.hlog.GetInfo(physicalAddress), pcontext.logicalAddress, fasterSession); + } return new ValueTask>(new UpsertAsyncResult((Status)internalStatus)); + } + Debug.Assert(internalStatus == OperationStatus.ALLOCATE_FAILED); } finally diff --git a/cs/src/core/ClientSession/AdvancedClientSession.cs b/cs/src/core/ClientSession/AdvancedClientSession.cs index b770aae93..d82faa2b0 100644 --- a/cs/src/core/ClientSession/AdvancedClientSession.cs +++ b/cs/src/core/ClientSession/AdvancedClientSession.cs @@ -39,6 +39,8 @@ public sealed class AdvancedClientSession>.NotAsyncSessionErr; internal AdvancedClientSession( @@ -68,7 +70,7 @@ internal AdvancedClientSession( } else { - if (!(fht.hlog is VariableLengthBlittableAllocator)) + if (fht.hlog is not VariableLengthBlittableAllocator) Debug.WriteLine("Warning: Session param of variableLengthStruct provided for non-varlen allocator"); } @@ -101,7 +103,7 @@ internal AdvancedClientSession( private void UpdateVarlen(ref IVariableLengthStruct variableLengthStruct) { - if (!(fht.hlog is VariableLengthBlittableAllocator)) + if (fht.hlog is not VariableLengthBlittableAllocator) return; if (typeof(Value) == typeof(SpanByte) && typeof(Input) == typeof(SpanByte)) @@ -148,6 +150,7 @@ public void Dispose() this.completedOutputs?.Dispose(); CompletePending(true); fht.DisposeClientSession(ID); + SecondaryIndexSessionBroker.Dispose(); // Session runs on a single thread if (!SupportAsync) @@ -976,7 +979,8 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref Reco private bool ConcurrentWriterNoLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst, ref recordInfo, address); + return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst, ref recordInfo, address) + && _clientSession.fht.UpdateSIForIPU(ref key, ref dst, new RecordId(recordInfo, address), this.SecondaryIndexSessionBroker); } private bool ConcurrentWriterLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) @@ -1063,7 +1067,8 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Ou private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address); + return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output, ref recordInfo, address) + && _clientSession.fht.UpdateSIForIPU(ref key, ref value, new RecordId(recordInfo, address), this.SecondaryIndexSessionBroker); } private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) @@ -1132,6 +1137,8 @@ public IHeapContainer GetHeapContainer(ref Input input) public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) => throw new NotImplementedException(); + + public SecondaryIndexSessionBroker SecondaryIndexSessionBroker => _clientSession.SecondaryIndexSessionBroker; } } } diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs index 965b00e94..8ed860d22 100644 --- a/cs/src/core/ClientSession/ClientSession.cs +++ b/cs/src/core/ClientSession/ClientSession.cs @@ -39,6 +39,8 @@ public sealed class ClientSession internal readonly InternalFasterSession FasterSession; + internal readonly SecondaryIndexSessionBroker SecondaryIndexSessionBroker = new(); + internal const string NotAsyncSessionErr = "Session does not support async operations"; internal ClientSession( @@ -68,7 +70,7 @@ internal ClientSession( } else { - if (!(fht.hlog is VariableLengthBlittableAllocator)) + if (fht.hlog is not VariableLengthBlittableAllocator) Debug.WriteLine("Warning: Session param of variableLengthStruct provided for non-varlen allocator"); } @@ -101,7 +103,7 @@ internal ClientSession( private void UpdateVarlen(ref IVariableLengthStruct variableLengthStruct) { - if (!(fht.hlog is VariableLengthBlittableAllocator)) + if (fht.hlog is not VariableLengthBlittableAllocator) return; if (typeof(Value) == typeof(SpanByte) && typeof(Input) == typeof(SpanByte)) @@ -153,6 +155,7 @@ public void Dispose() this.completedOutputs?.Dispose(); CompletePending(true); fht.DisposeClientSession(ID); + SecondaryIndexSessionBroker.Dispose(); // Session runs on a single thread if (!SupportAsync) @@ -1045,7 +1048,7 @@ public void ConcurrentReaderLock(ref Key key, ref Input input, ref Value value, } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) + public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => !this.SupportsLocking ? ConcurrentWriterNoLock(ref key, ref src, ref dst, ref recordInfo, address) : ConcurrentWriterLock(ref key, ref src, ref dst, ref recordInfo, address); @@ -1054,7 +1057,8 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref Reco private bool ConcurrentWriterNoLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst); + return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst) + && _clientSession.fht.UpdateSIForIPU(ref key, ref dst, new RecordId(recordInfo, address), this.SecondaryIndexSessionBroker); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -1076,13 +1080,13 @@ private bool ConcurrentWriterLock(ref Key key, ref Value src, ref Value dst, ref public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) { if (!this.SupportsLocking) - ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address); + ConcurrentDeleterNoLock(ref recordInfo); else ConcurrentDeleterLock(ref key, ref value, ref recordInfo, address); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address) + private void ConcurrentDeleterNoLock(ref RecordInfo recordInfo) { // Non-Advanced IFunctions has no ConcurrentDeleter recordInfo.Version = _clientSession.ctx.version; @@ -1095,7 +1099,7 @@ private void ConcurrentDeleterLock(ref Key key, ref Value value, ref RecordInfo this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); try { - ConcurrentDeleterNoLock(ref key, ref value, ref recordInfo, address); + ConcurrentDeleterNoLock(ref recordInfo); } finally { @@ -1141,7 +1145,8 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Ou private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) { recordInfo.Version = _clientSession.ctx.version; - return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output); + return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref output) + && _clientSession.fht.UpdateSIForIPU(ref key, ref value, new RecordId(recordInfo, address), this.SecondaryIndexSessionBroker); } private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Output output, ref Value value, ref RecordInfo recordInfo, long address) @@ -1209,6 +1214,8 @@ public IHeapContainer GetHeapContainer(ref Input input) public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) => throw new NotImplementedException(); + + public SecondaryIndexSessionBroker SecondaryIndexSessionBroker => _clientSession.SecondaryIndexSessionBroker; } } } diff --git a/cs/src/core/Index/Common/CompletedOutput.cs b/cs/src/core/Index/Common/CompletedOutput.cs index 107853595..5e3d8025c 100644 --- a/cs/src/core/Index/Common/CompletedOutput.cs +++ b/cs/src/core/Index/Common/CompletedOutput.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Collections.Generic; namespace FASTER.core { @@ -94,7 +93,7 @@ public struct CompletedOutput public ref TInput Input => ref inputContainer.Get(); /// - /// The output for this pending operation. + /// The output for this pending operation. It is the caller's responsibility to dispose this if necessary; will not try to dispose this member. /// public TOutput Output; diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs index 80ba150bf..47b8df441 100644 --- a/cs/src/core/Index/Common/Contexts.cs +++ b/cs/src/core/Index/Common/Contexts.cs @@ -18,7 +18,6 @@ internal enum OperationType READ, RMW, UPSERT, - INSERT, DELETE } @@ -96,6 +95,7 @@ internal struct PendingContext internal const byte kNoKey = 0x10; internal const byte kSkipCopyReadsToTail = 0x20; internal const byte kIsAsync = 0x40; + internal const byte kIsNewRecord = 0x80; [MethodImpl(MethodImplOptions.AggressiveInlining)] internal IHeapContainer DetachKey() @@ -168,6 +168,12 @@ internal bool IsAsync set => operationFlags = value ? (byte)(operationFlags | kIsAsync) : (byte)(operationFlags & ~kIsAsync); } + internal bool IsNewRecord + { + get => (operationFlags & kIsNewRecord) != 0; + set => operationFlags = value ? (byte)(operationFlags | kIsNewRecord) : (byte)(operationFlags & ~kIsNewRecord); + } + public void Dispose() { key?.Dispose(); @@ -266,7 +272,7 @@ public struct HybridLogRecoveryInfo /// public int nextVersion; /// - /// Flushed logical address + /// Flushed logical address; indicates the latest immutable address on the main FASTER log at recovery time. /// public long flushedLogicalAddress; /// @@ -588,37 +594,39 @@ public HybridLogCheckpointInfo Transfer() } public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits, - bool scanDelta, long recoverTo) + bool scanDelta = false, long recoverTo = -1) { deltaFileDevice = checkpointManager.GetDeltaLogDevice(token); - deltaFileDevice.Initialize(-1); - if (deltaFileDevice.GetFileSize(0) > 0) + if (deltaFileDevice is not null) { - deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); - deltaLog.InitializeForReads(); - info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo); - } - else - { - info.Recover(token, checkpointManager, null); + deltaFileDevice.Initialize(-1); + if (deltaFileDevice.GetFileSize(0) > 0) + { + deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); + deltaLog.InitializeForReads(); + info.Recover(token, checkpointManager, deltaLog, scanDelta, recoverTo); + return; + } } + info.Recover(token, checkpointManager, null); } public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits, out byte[] commitCookie, bool scanDelta = false, long recoverTo = -1) { deltaFileDevice = checkpointManager.GetDeltaLogDevice(token); - deltaFileDevice.Initialize(-1); - if (deltaFileDevice.GetFileSize(0) > 0) - { - deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); - deltaLog.InitializeForReads(); - info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo); - } - else + if (deltaFileDevice is not null) { - info.Recover(token, checkpointManager, out commitCookie); + deltaFileDevice.Initialize(-1); + if (deltaFileDevice.GetFileSize(0) > 0) + { + deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1); + deltaLog.InitializeForReads(); + info.Recover(token, checkpointManager, out commitCookie, deltaLog, scanDelta, recoverTo); + return; + } } + info.Recover(token, checkpointManager, out commitCookie); } public bool IsDefault() diff --git a/cs/src/core/Index/Common/HeapContainer.cs b/cs/src/core/Index/Common/HeapContainer.cs index c46d94940..77f1cb0a8 100644 --- a/cs/src/core/Index/Common/HeapContainer.cs +++ b/cs/src/core/Index/Common/HeapContainer.cs @@ -12,9 +12,8 @@ namespace FASTER.core public interface IHeapContainer : IDisposable { /// - /// Get object + /// Get a reference to the contained object /// - /// ref T Get(); } diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs index f4b30578f..25dd594b3 100644 --- a/cs/src/core/Index/FASTER/FASTER.cs +++ b/cs/src/core/Index/FASTER/FASTER.cs @@ -5,7 +5,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; -using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; @@ -35,11 +34,15 @@ public partial class FasterKV : FasterBase, { internal readonly AllocatorBase hlog; private readonly AllocatorBase readcache; - private readonly IFasterEqualityComparer comparer; + + /// + /// Compares two keys + /// + protected readonly IFasterEqualityComparer comparer; internal readonly bool UseReadCache; private readonly CopyReadsToTail CopyReadsToTail; - private readonly bool FoldOverSnapshot; + internal readonly bool UseFoldOverCheckpoint; internal readonly int sectorSize; private readonly bool WriteDefaultOnDelete; internal bool RelaxedCPR; @@ -84,6 +87,11 @@ public partial class FasterKV : FasterBase, internal ConcurrentDictionary _recoveredSessions; + /// + /// Manages secondary indexes for this FASTER instance. + /// + public SecondaryIndexBroker SecondaryIndexBroker { get; } + /// /// Create FASTER instance /// @@ -135,17 +143,13 @@ public FasterKV(long size, LogSettings logSettings, } else { - checkpointManager = checkpointSettings.CheckpointManager ?? - new DeviceLogCommitCheckpointManager - (new LocalStorageNamedDeviceFactory(), - new DefaultCheckpointNamingScheme( - new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName), removeOutdated: checkpointSettings.RemoveOutdated); + checkpointManager = checkpointSettings.CheckpointManager ?? Utility.CreateDefaultCheckpointManager(checkpointSettings); } if (checkpointSettings.CheckpointManager == null) disposeCheckpointManager = true; - FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; + UseFoldOverCheckpoint = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver; CopyReadsToTail = logSettings.CopyReadsToTail; if (logSettings.ReadCacheSettings != null) @@ -221,6 +225,8 @@ public FasterKV(long size, LogSettings logSettings, hlog.Initialize(); + this.SecondaryIndexBroker = new SecondaryIndexBroker(this); + sectorSize = (int)logSettings.LogDevice.SectorSize; Initialize(size, sectorSize); @@ -244,7 +250,7 @@ public FasterKV(long size, LogSettings logSettings, /// operation such as growing the index). Use CompleteCheckpointAsync to wait completion. /// public bool TakeFullCheckpoint(out Guid token, long targetVersion = -1) - => TakeFullCheckpoint(out token, this.FoldOverSnapshot ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion); + => TakeFullCheckpoint(out token, this.UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, targetVersion); /// /// Initiate full checkpoint @@ -353,17 +359,7 @@ public bool TakeIndexCheckpoint(out Guid token) /// /// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion. public bool TakeHybridLogCheckpoint(out Guid token, long targetVersion = -1) - { - ISynchronizationTask backend; - if (FoldOverSnapshot) - backend = new FoldOverCheckpointTask(); - else - backend = new SnapshotCheckpointTask(); - - var result = StartStateMachine(new HybridLogCheckpointStateMachine(backend, targetVersion)); - token = _hybridLogCheckpointToken; - return result; - } + => TakeHybridLogCheckpoint(out token, UseFoldOverCheckpoint ? CheckpointType.FoldOver : CheckpointType.Snapshot, tryIncremental: false, targetVersion); /// /// Initiate log-only checkpoint @@ -631,7 +627,6 @@ internal Status ContextUpsert(ref Key key while (internalStatus == OperationStatus.RETRY_NOW); Status status; - if (internalStatus == OperationStatus.SUCCESS || internalStatus == OperationStatus.NOTFOUND) { status = (Status)internalStatus; @@ -641,11 +636,65 @@ internal Status ContextUpsert(ref Key key status = HandleOperationStatus(sessionCtx, sessionCtx, ref pcontext, fasterSession, internalStatus, false, out _); } + if (pcontext.IsNewRecord) + { + Debug.Assert(status == Status.OK); + ref RecordInfo recordInfo = ref this.hlog.GetInfo(this.hlog.GetPhysicalAddress(pcontext.logicalAddress)); + UpdateSIForInsert(ref key, ref value, ref recordInfo, pcontext.logicalAddress, fasterSession); + } + Debug.Assert(serialNo >= sessionCtx.serialNum, "Operation serial numbers must be non-decreasing"); sessionCtx.serialNum = serialNo; return status; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool UpdateSIForIPU(ref Key key, ref Value value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + // KeyIndexes do not need notification of in-place updates because the key does not change. + if (this.SecondaryIndexBroker.MutableValueIndexCount > 0) + this.SecondaryIndexBroker.Upsert(ref key, ref value, recordId, indexSessionBroker); + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void UpdateSIForInsert(ref Key key, ref Value value, ref RecordInfo recordInfo, long address, FasterSession fasterSession) + where FasterSession : IFasterSession + { + if (!fasterSession.SupportsLocking) + UpdateSIForInsertNoLock(ref key, ref value, ref recordInfo, address, fasterSession.SecondaryIndexSessionBroker); + else + UpdateSIForInsertLock(ref key, ref value, ref recordInfo, address, fasterSession); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void UpdateSIForInsertNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address, SecondaryIndexSessionBroker indexSessionBroker) + { + if (!recordInfo.Invalid && !recordInfo.Tombstone) + { + var recordId = new RecordId(recordInfo, address); + if (this.SecondaryIndexBroker.MutableKeyIndexCount > 0) + this.SecondaryIndexBroker.Insert(ref key, recordId, indexSessionBroker); + if (this.SecondaryIndexBroker.MutableValueIndexCount > 0) + this.SecondaryIndexBroker.Insert(ref key, ref value, recordId, indexSessionBroker); + } + } + + private void UpdateSIForInsertLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address, FasterSession fasterSession) + where FasterSession : IFasterSession + { + long context = 0; + fasterSession.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context); + try + { + UpdateSIForInsertNoLock(ref key, ref value, ref recordInfo, address, fasterSession.SecondaryIndexSessionBroker); + } + finally + { + fasterSession.Unlock(ref recordInfo, ref key, ref value, LockType.Exclusive, context); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal Status ContextRMW(ref Key key, ref Input input, ref Output output, Context context, FasterSession fasterSession, long serialNo, FasterExecutionContext sessionCtx) @@ -668,6 +717,15 @@ internal Status ContextRMW(ref Key key, r status = HandleOperationStatus(sessionCtx, sessionCtx, ref pcontext, fasterSession, internalStatus, false, out _); } + if (pcontext.IsNewRecord) + { + Debug.Assert(status == Status.OK || status == Status.NOTFOUND); + long physicalAddress = this.hlog.GetPhysicalAddress(pcontext.logicalAddress); + ref RecordInfo recordInfo = ref this.hlog.GetInfo(physicalAddress); + ref Value value = ref this.hlog.GetValue(physicalAddress); + UpdateSIForInsert(ref key, ref value, ref recordInfo, pcontext.logicalAddress, fasterSession); + } + Debug.Assert(serialNo >= sessionCtx.serialNum, "Operation serial numbers must be non-decreasing"); sessionCtx.serialNum = serialNo; return status; @@ -699,6 +757,15 @@ internal Status ContextDelete( status = HandleOperationStatus(sessionCtx, sessionCtx, ref pcontext, fasterSession, internalStatus, false, out _); } + if (pcontext.IsNewRecord) + { + Debug.Assert(status == Status.OK); + + // No need to lock here; we have just written a new record with a tombstone, so it will not be changed + // TODO - but this can race with an INSERT of the same key... + this.UpdateSIForDelete(ref key, new RecordId(pcontext.recordInfo, pcontext.logicalAddress), isNewRecord: true, fasterSession.SecondaryIndexSessionBroker); + } + Debug.Assert(serialNo >= sessionCtx.serialNum, "Operation serial numbers must be non-decreasing"); sessionCtx.serialNum = serialNo; return status; diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs index 2752cd1b3..56cc26058 100644 --- a/cs/src/core/Index/FASTER/FASTERImpl.cs +++ b/cs/src/core/Index/FASTER/FASTERImpl.cs @@ -396,7 +396,7 @@ internal OperationStatus InternalUpsert( goto CreateNewRecord; } -#region Entry latch operation + #region Entry latch operation if (sessionCtx.phase != Phase.REST) { latchDestination = AcquireLatchUpsert(sessionCtx, bucket, ref status, ref latchOperation, ref entry, logicalAddress); @@ -578,6 +578,7 @@ private OperationStatus CreateNewRecordUpsert( { ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress); if (!recordInfo.Tombstone) - { - if (FoldOverSnapshot) + { + if (UseFoldOverCheckpoint) { Debug.Assert(recordInfo.Version == sessionCtx.version); } @@ -780,9 +781,9 @@ internal OperationStatus InternalRMW( } } -#endregion + #endregion -#region Create new record + #region Create new record CreateNewRecord: if (latchDestination != LatchDestination.CreatePendingContext) { @@ -972,6 +973,7 @@ ref hlog.GetValue(newPhysicalAddress, newPhysicalAddress + actualSize), if (foundEntry.word == entry.word) { pendingContext.logicalAddress = newLogicalAddress; + pendingContext.IsNewRecord = this.SecondaryIndexBroker.HasMutableIndexes; } else { @@ -1210,6 +1212,7 @@ internal OperationStatus InternalDelete( if (foundEntry.word == entry.word) { pendingContext.logicalAddress = newLogicalAddress; + pendingContext.IsNewRecord = this.SecondaryIndexBroker.HasMutableIndexes; status = OperationStatus.SUCCESS; goto LatchRelease; } @@ -1255,6 +1258,17 @@ internal OperationStatus InternalDelete( return status; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal bool UpdateSIForDelete(ref Key key, RecordId recordId, bool isNewRecord, SecondaryIndexSessionBroker indexSessionBroker) + { + // TODO: if isNewRecord, we've added a new record to mark a delete, but the index operation won't have the correct recordId here. Should we read it? + if (!isNewRecord) + recordId = default; + if (this.SecondaryIndexBroker.MutableKeyIndexCount > 0 || this.SecondaryIndexBroker.MutableValueIndexCount > 0) + this.SecondaryIndexBroker.Delete(ref key, recordId, indexSessionBroker); + return true; + } + #endregion #region ContainsKeyInMemory @@ -1402,7 +1416,9 @@ internal void InternalContinuePendingReadCopyToTail(ref key, ref recordValue, ref recordInfo, newLogicalAddress, fasterSession); + } return status; } else @@ -2007,7 +2029,7 @@ internal OperationStatus InternalTryCopyToTail fht, Functions functions, long unti public long NextAddress => enumerationPhase == 0 ? iter1.NextAddress : iter2.NextAddress; + public long BeginAddress => enumerationPhase == 0 ? iter1.BeginAddress : iter2.BeginAddress; + + public long EndAddress => enumerationPhase == 0 ? iter1.EndAddress : iter2.EndAddress; + public void Dispose() { iter1?.Dispose(); diff --git a/cs/src/core/Index/FASTER/FASTERLegacy.cs b/cs/src/core/Index/FASTER/FASTERLegacy.cs index e8d55a9fd..ffb6b2d15 100644 --- a/cs/src/core/Index/FASTER/FASTERLegacy.cs +++ b/cs/src/core/Index/FASTER/FASTERLegacy.cs @@ -413,6 +413,8 @@ public IHeapContainer GetHeapContainer(ref Input input) public bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false) => throw new NotImplementedException(); + + public SecondaryIndexSessionBroker SecondaryIndexSessionBroker => null; } } diff --git a/cs/src/core/Index/FASTER/LogAccessor.cs b/cs/src/core/Index/FASTER/LogAccessor.cs index e93275d33..c52a98db8 100644 --- a/cs/src/core/Index/FASTER/LogAccessor.cs +++ b/cs/src/core/Index/FASTER/LogAccessor.cs @@ -150,28 +150,26 @@ public void ShiftHeadAddress(long newHeadAddress, bool wait) /// /// Subscribe to records (in batches) as they become read-only in the log - /// Currently, we support only one subscriber to the log (easy to extend) /// Subscriber only receives new log updates from the time of subscription onwards /// To scan the historical part of the log, use the Scan(...) method /// /// Observer to which scan iterator is pushed public IDisposable Subscribe(IObserver> readOnlyObserver) { - allocator.OnReadOnlyObserver = readOnlyObserver; - return new LogSubscribeDisposable(allocator, true); + allocator.AddReadOnlyObserver(readOnlyObserver); + return new LogSubscribeDisposable(allocator, readOnlyObserver, true); } /// /// Subscribe to records (in batches) as they get evicted from main memory. - /// Currently, we support only one subscriber to the log (easy to extend) /// Subscriber only receives eviction updates from the time of subscription onwards /// To scan the historical part of the log, use the Scan(...) method /// /// Observer to which scan iterator is pushed public IDisposable SubscribeEvictions(IObserver> evictionObserver) { - allocator.OnEvictionObserver = evictionObserver; - return new LogSubscribeDisposable(allocator, false); + allocator.AddEvictionObserver(evictionObserver); + return new LogSubscribeDisposable(allocator, evictionObserver, false); } /// @@ -180,20 +178,22 @@ public IDisposable SubscribeEvictions(IObserver> class LogSubscribeDisposable : IDisposable { private readonly AllocatorBase allocator; + private readonly IObserver> observer; private readonly bool readOnly; - public LogSubscribeDisposable(AllocatorBase allocator, bool readOnly) + public LogSubscribeDisposable(AllocatorBase allocator, IObserver> observer, bool readOnly) { this.allocator = allocator; + this.observer = observer; this.readOnly = readOnly; } public void Dispose() { if (readOnly) - allocator.OnReadOnlyObserver = null; + allocator.RemoveReadOnlyObserver(observer); else - allocator.OnEvictionObserver = null; + allocator.RemoveEvictionObserver(observer); } } diff --git a/cs/src/core/Index/Interfaces/IFasterSession.cs b/cs/src/core/Index/Interfaces/IFasterSession.cs index aa35804c1..94663b78b 100644 --- a/cs/src/core/Index/Interfaces/IFasterSession.cs +++ b/cs/src/core/Index/Interfaces/IFasterSession.cs @@ -1,4 +1,7 @@ -namespace FASTER.core +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +namespace FASTER.core { /// /// Provides thread management and callback to checkpoint completion (called state machine). @@ -24,5 +27,7 @@ internal interface IFasterSession : IAdvance bool CompletePendingWithOutputs(out CompletedOutputIterator completedOutputs, bool wait = false, bool spinWaitForCommit = false); IHeapContainer GetHeapContainer(ref Input input); + + SecondaryIndexSessionBroker SecondaryIndexSessionBroker { get; } } } \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs index 7409521f4..7d3f9a498 100644 --- a/cs/src/core/Index/Recovery/Checkpoint.cs +++ b/cs/src/core/Index/Recovery/Checkpoint.cs @@ -99,9 +99,16 @@ internal void InitializeIndexCheckpoint(Guid indexToken) internal void InitializeHybridLogCheckpoint(Guid hybridLogToken, int version) { + // At this point we do not know the FlushedUntilAddress + this.SecondaryIndexBroker.OnPrimaryCheckpointInitiated(new PrimaryCheckpointInfo(version, Constants.kInvalidAddress)); _hybridLogCheckpoint.Initialize(hybridLogToken, version, checkpointManager); } + internal PrimaryCheckpointInfo GetCurrentPrimaryCheckpointInfo() + => GetCurrentPrimaryCheckpointInfo(_hybridLogCheckpoint.info); + + internal PrimaryCheckpointInfo GetCurrentPrimaryCheckpointInfo(HybridLogRecoveryInfo info) + => new PrimaryCheckpointInfo(info.version, UseFoldOverCheckpoint ? info.finalLogicalAddress : info.flushedLogicalAddress); // #endregion } } \ No newline at end of file diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs index a2ccfc506..291e73123 100644 --- a/cs/src/core/Index/Recovery/ICheckpointManager.cs +++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs @@ -82,7 +82,7 @@ public interface ICheckpointManager : IDisposable /// whether or not to scan through the delta log to acquire latest entry /// version upper bound to scan for in the delta log. Function will return the largest version metadata no greater than the given version. /// Metadata, or null if invalid - byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta, long recoverTo); + byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog, bool scanDelta = false, long recoverTo = -1); /// /// Get list of index checkpoint tokens, in order of usage preference diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs index 47a52d536..9cf447e10 100644 --- a/cs/src/core/Index/Recovery/Recovery.cs +++ b/cs/src/core/Index/Recovery/Recovery.cs @@ -304,6 +304,7 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck // Recover session information hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress); _recoveredSessions = recoveredHLCInfo.info.continueTokens; + checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid); recoveredHLCInfo.Dispose(); } @@ -348,6 +349,7 @@ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogical hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress); _recoveredSessions = recoveredHLCInfo.info.continueTokens; + checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid); recoveredHLCInfo.Dispose(); } diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs index 8e50fee92..c4e1143fb 100644 --- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs +++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs @@ -40,6 +40,7 @@ public virtual void GlobalBeforeEnteringState(SystemState next, faster.lastVersion = lastVersion; break; case Phase.REST: + faster.SecondaryIndexBroker.OnPrimaryCheckpointCompleted(faster.GetCurrentPrimaryCheckpointInfo()); faster._hybridLogCheckpoint.Dispose(); var nextTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); faster.checkpointTcs.SetResult(new LinkedCheckpointInfo { NextTask = nextTcs.Task }); diff --git a/cs/src/core/SecondaryIndex/ISecondaryIndex.cs b/cs/src/core/SecondaryIndex/ISecondaryIndex.cs new file mode 100644 index 000000000..730c60e6f --- /dev/null +++ b/cs/src/core/SecondaryIndex/ISecondaryIndex.cs @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.core +{ + /// + /// Base, non-generic interface for a SecondaryIndex in FASTER. + /// + public interface ISecondaryIndex + { + /// + /// The friendly identifier of the index. + /// + string Name { get; } + + /// + /// If true, the index is updated immediately on each FasterKV operation; otherwise it is updated only when record pages go ReadOnly. + /// + bool IsMutable { get; } + + /// + /// The slot id to be passed to ; called by . + /// + void SetSessionSlot(long slot); + + /// + /// Returns the latest checkpoint token for this index. + /// + /// The info of the primary checkpoint that has just started + /// Called when the Primary FKV has begun checkpointing the log (either by itself or as part of a full checkpoint). + void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo currentCheckpointInfo); + + /// + /// Provides information about a just-completed Primary FasterKV checkpoint. + /// + /// The info of the primary checkpoint that has just completed + /// Called when the Primary FKV has completed checkpointing the log (either by itself or as part of a full checkpoint). The Secondary Index should + /// hold this information in-memory so it can store it to and retrieve it from its own checkpoint information, so the can replay + /// records from that point + void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo completedCheckpointInfo); + + /// + /// Sychronous function to begin recovery of a secondary index; the index recovers itself here and returns a indicating the last Primary FasterKV + /// checkpoint to which it recovered. The will replay records since then, until the index is up to date with the Primary FasterKV. + /// + /// Info of the checkpoint token that was just recovered by the Primary FasterKV; we need to find the most recent Secondary checkpoint whose currentCheckpoint is earlier + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + /// A task wrapping the Primary FasterKV checkpoint info stored in the secondary index checkpoint that was recovered, or default if not found. + /// This is called after the Primary FKV has recovered itself and before it is open for operations. + PrimaryCheckpointInfo Recover(PrimaryCheckpointInfo primaryRecoveredPci, bool undoNextVersion); + + /// + /// Asychronous function to begin recovery of a secondary index; the index recovers itself here and returns a indicating the last Primary FasterKV + /// checkpoint to which it recovered. The will replay records since then, until the index is up to date with the Primary FasterKV. + /// + /// Info of the checkpoint token that was just recovered by the Primary FasterKV; we need to find the most recent Secondary checkpoint whose currentCheckpoint is earlier + /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log) + /// Allow cancellation of the operation + /// A task wrapping the Primary FasterKV checkpoint info stored in the secondary index checkpoint that was recovered, or default if not found. + /// This is called after the Primary FKV has recovered itself and before it is open for operations. + Task RecoverAsync(PrimaryCheckpointInfo primaryRecoveredPci, bool undoNextVersion, CancellationToken cancellationToken = default); + + /// + /// Called when the Primary FKV has set the new BeginAddress. + /// + /// + void OnPrimaryTruncate(long newBeginAddress); + } +} \ No newline at end of file diff --git a/cs/src/core/SecondaryIndex/ISecondaryKeyIndex.cs b/cs/src/core/SecondaryIndex/ISecondaryKeyIndex.cs new file mode 100644 index 000000000..5533cf4df --- /dev/null +++ b/cs/src/core/SecondaryIndex/ISecondaryKeyIndex.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Threading; + +namespace FASTER.core +{ + /// + /// Interface for a FASTER SecondaryIndex that is derived from the FasterKV Key generic parameter. + /// + public interface ISecondaryKeyIndex : ISecondaryIndex + { + /// + /// Inserts a key into the secondary index. Called only for mutable indexes, on the initial insert of a Key. KeyIndexes do not take RecordIds + /// because they reflect the current value of the primary FasterKV Key. + /// + /// The key to be inserted; always mutable + /// The identifier of the record containing the ; may be used to generate a list + /// The for the primary FasterKV session making this call + /// + /// If the index is mutable and the is already there, this call should be ignored, because it is the result + /// of a race in which the record in the primary FasterKV was updated after the initial insert but before this method + /// was called. + /// + void Insert(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Upserts a key into the secondary index. This may be called either immediately during a FasterKV operation, or during recovery. + /// + /// The key to be upserted + /// The identifier of the record containing the + /// Whether the recordId was in the mutable region of FASTER. If true, the record may subsequently be Upserted or Deleted. + /// The for the primary FasterKV session making this call + /// + /// For an immutable index, this is the only call made on the interface, when the page containing the has moved to ReadOnly. + /// In this case, is false, and the index may move the to an immutable storage area. + /// + void Upsert(ref TKVKey key, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Removes a key from the secondary index. Called only for mutable indexes. + /// + /// The key to be removed + /// The identifier of the record to be removed for the + /// The for the primary FasterKV session making this call + void Delete(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Scans a range from a page that has gone readonly. + /// + /// The iterator over the region that is going readonly + /// The for the primary FasterKV session making this call + /// The value type for the + void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Scans a range of pages after or has completed. + /// + /// The iterator over the records past the extent of recovery that are being replayed + /// The for the primary FasterKV session making this call + /// The value type for the + void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker); + } +} \ No newline at end of file diff --git a/cs/src/core/SecondaryIndex/ISecondaryValueIndex.cs b/cs/src/core/SecondaryIndex/ISecondaryValueIndex.cs new file mode 100644 index 000000000..8e5cb2c99 --- /dev/null +++ b/cs/src/core/SecondaryIndex/ISecondaryValueIndex.cs @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System.Threading; + +namespace FASTER.core +{ + /// + /// Interface for a FASTER SecondaryIndex that is derived from the FasterKV Value generic parameter. + /// + public interface ISecondaryValueIndex : ISecondaryIndex + { + /// + /// Inserts a recordId into the secondary index, with the associated value from which the index derives its key(s). + /// Called only for mutable indexes, on the initial insert of a Key. + /// + /// The key for the being inserted + /// The value to be inserted; always mutable + /// The identifier of the record containing the + /// The for the primary FasterKV session making this call + /// + /// If the index is mutable and the is already there for this , + /// this call should be ignored, because it is the result of a race in which the record in the primary FasterKV was + /// updated after the initial insert but before this method was called, so the on this call + /// would overwrite it with an obsolete value. + /// + void Insert(ref TKVKey key, ref TKVValue value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Upserts a recordId into the secondary index, with the associated value from which the index derives its key(s). + /// This may be called either immediately during a FasterKV operation, or during recovery. + /// + /// The key for the being upserted + /// The value to be upserted + /// The identifier of the record containing the + /// Whether the recordId was in the mutable region of FASTER; if so, it may subsequently be Upserted or Deleted. + /// The for the primary FasterKV session making this call + /// + /// For an immutable index, this is the only call made on the interface, when the page containing the has moved to ReadOnly. + /// In this case, is false, and the index may move the to an immutable storage area. + /// + void Upsert(ref TKVKey key, ref TKVValue value, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Removes a recordId from the secondary index. Called only for mutable indexes. + /// + /// The key for the being deleted + /// The recordId to be removed + /// The for the primary FasterKV session making this call + void Delete(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Scans a range from a page that has gone readonly. + /// + /// The iterator over the region that is going readonly + /// The for the primary FasterKV session making this call + void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker); + + /// + /// Scans a range of pages after or has completed. + /// + /// The iterator over the records past the extent of recovery that are being replayed + /// The for the primary FasterKV session making this call + void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker); + } +} \ No newline at end of file diff --git a/cs/src/core/SecondaryIndex/PrimaryCheckpointInfo.cs b/cs/src/core/SecondaryIndex/PrimaryCheckpointInfo.cs new file mode 100644 index 000000000..dbeae0f9f --- /dev/null +++ b/cs/src/core/SecondaryIndex/PrimaryCheckpointInfo.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Diagnostics; + +namespace FASTER.core +{ + /// + /// Carries information about a Primary FasterKV checkpoint. + /// + /// Should be carried by Secondary Index recovery info to be returned to the SecondaryIndexBroker + /// so it knows where to start replaying records to the index. + public struct PrimaryCheckpointInfo : IComparable + { + /// + /// The database version. + /// + public int Version; + + /// + /// The latest immutable address on the Primary FasterKV log at recovery time + /// + public long FlushedUntilAddress; + + /// + /// Constructor + /// + public PrimaryCheckpointInfo(int version, long address) + { + this.Version = version; + this.FlushedUntilAddress = address; + } + + /// + /// Indicates whether this instance has not been assigned values. + /// + public bool IsDefault() => this.Version == 0 && this.FlushedUntilAddress == 0; + + /// + /// We only consider version here + public int CompareTo(PrimaryCheckpointInfo other) => this.Version.CompareTo(other.Version); + + #region Serialization + + /// + /// Serialized byte size of data members + /// + public const int SerializedSize = 8 + 4; + + /// + /// Constructs from a byte array. + /// + public PrimaryCheckpointInfo(byte[] metadata) + { + var offset = 0; + + var slice = metadata.Slice(0, 4); + this.Version = BitConverter.ToInt32(slice, 0); + offset += slice.Length; + + slice = metadata.Slice(offset, 8); + this.FlushedUntilAddress = BitConverter.ToInt64(slice, 0); + offset += slice.Length; + + Debug.Assert(offset == SerializedSize); + } + + /// + /// Converts to a byte array for serialization. + /// + /// + public byte[] ToByteArray() + { + var result = new byte[SerializedSize]; + var offset = 0; + + var bytes = BitConverter.GetBytes(Version); + Array.Copy(bytes, 0, result, offset, bytes.Length); + offset += bytes.Length; + + bytes = BitConverter.GetBytes(FlushedUntilAddress); + Array.Copy(bytes, 0, result, offset, bytes.Length); + offset += bytes.Length; + + Debug.Assert(offset == SerializedSize); + return result; + } + + /// + /// Calculates checksum of data members + /// + public readonly long Checksum() => this.Version ^ this.FlushedUntilAddress; + + #endregion Serialization + + /// + public override string ToString() => $"{nameof(Version)} {Version}, {nameof(FlushedUntilAddress)} {FlushedUntilAddress}"; + } +} diff --git a/cs/src/core/SecondaryIndex/QueryRecord.cs b/cs/src/core/SecondaryIndex/QueryRecord.cs new file mode 100644 index 000000000..28013271b --- /dev/null +++ b/cs/src/core/SecondaryIndex/QueryRecord.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; + +namespace FASTER.core +{ + /// + /// The wrapper around the provider data stored in the primary faster instance. + /// + /// The type of the key in the primary FasterKV instance + /// The type of the value in the primary FasterKV instance + /// Having this class enables separation between the LogicalAddress stored in the Index-implementing + /// FasterKV instances, and the actual and + /// types. + public class QueryRecord : IDisposable + { + internal IHeapContainer keyContainer; + internal IHeapContainer valueContainer; + + internal QueryRecord(IHeapContainer keyContainer, IHeapContainer valueContainer, RecordId recordId) + { + this.keyContainer = keyContainer; + this.valueContainer = valueContainer; + this.RecordId = recordId; + } + + /// + /// A reference to the record key + /// + public ref TKVKey KeyRef => ref this.keyContainer.Get(); + + /// + /// A reference to the record value + /// + public ref TKVValue ValueRef => ref this.valueContainer.Get(); + + /// + /// The ID of the record with this key and value; useful for post-query processing across multiple indexes. + /// + public RecordId RecordId { get; } + + /// + public void Dispose() + { + this.keyContainer?.Dispose(); + this.keyContainer = null; + this.valueContainer?.Dispose(); + this.valueContainer = null; + } + + /// + public override string ToString() => $"Key = {this.keyContainer.Get()}; Value = {this.valueContainer.Get()}"; + } +} diff --git a/cs/src/core/SecondaryIndex/QuerySegment.cs b/cs/src/core/SecondaryIndex/QuerySegment.cs new file mode 100644 index 000000000..6e7308ff5 --- /dev/null +++ b/cs/src/core/SecondaryIndex/QuerySegment.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Collections; +using System.Collections.Generic; + +namespace FASTER.core +{ + /// + /// A segment of a segmented query (one that is retrieved via a resumable enumeration using the Continuation token). + /// + /// + /// + public class QuerySegment : IEnumerable>, IDisposable + { + /// + /// The records for the current segment + /// + public List> Results { get; } + + /// + /// The token to be used to continue the query + /// + public string ContinuationToken { get; } + + /// + /// Indicates whether the query has returned all records. + /// + public bool IsQueryComplete { get; } + + /// + /// Constructor + /// + public QuerySegment(List> results, bool isComplete, string continuationToken) + { + this.Results = results; + this.IsQueryComplete = isComplete; + this.ContinuationToken = continuationToken; + } + + /// + public IEnumerator> GetEnumerator() => this.Results.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => this.GetEnumerator(); + + /// + public void Dispose() + { + this.Results.ForEach(record => record.Dispose()); + this.Results.Clear(); + } + } +} diff --git a/cs/src/core/SecondaryIndex/ReadOnlyObserver.cs b/cs/src/core/SecondaryIndex/ReadOnlyObserver.cs new file mode 100644 index 000000000..c6ad10154 --- /dev/null +++ b/cs/src/core/SecondaryIndex/ReadOnlyObserver.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; + +namespace FASTER.core +{ + class ReadOnlyObserver : IObserver> + { + readonly SecondaryIndexBroker secondaryIndexBroker; + + internal ReadOnlyObserver(SecondaryIndexBroker sib) => this.secondaryIndexBroker = sib; + + public void OnCompleted() + { + // Called when AllocatorBase is Disposed + } + + public void OnError(Exception error) + { + // Apparently not called by FASTER + } + + public void OnNext(IFasterScanIterator iter) + { + // We're not operating in the context of a FasterKV session, so we need our own sessionBroker. + using var indexSessionBroker = new SecondaryIndexSessionBroker(); + secondaryIndexBroker.ScanReadOnlyPages(iter, indexSessionBroker); + } + } +} diff --git a/cs/src/core/SecondaryIndex/RecordId.cs b/cs/src/core/SecondaryIndex/RecordId.cs new file mode 100644 index 000000000..6719fe2f0 --- /dev/null +++ b/cs/src/core/SecondaryIndex/RecordId.cs @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; + +namespace FASTER.core +{ + /// + /// Encapsulates the address and version of a record in the log + /// + public struct RecordId : IComparable + { + private long word; + + internal RecordId(RecordInfo recordInfo, long address) : this(recordInfo.Version, address) { } + + internal RecordId(int version, long address) + { + this.word = default; + this.Address = address; + this.Version = version; + } + + /// + /// The version of the record + /// + public int Version + { + get + { + return (int)(((word & RecordInfo.kVersionMaskInWord) >> RecordInfo.kVersionShiftInWord) & RecordInfo.kVersionMaskInInteger); + } + set + { + word &= ~RecordInfo.kVersionMaskInWord; + word |= ((value & RecordInfo.kVersionMaskInInteger) << RecordInfo.kVersionShiftInWord); + } + } + + /// + /// The logical address of the record + /// + public long Address + { + get + { + return (word & RecordInfo.kPreviousAddressMask); + } + set + { + word &= ~RecordInfo.kPreviousAddressMask; + word |= (value & RecordInfo.kPreviousAddressMask); + } + } + + /// + public int CompareTo(RecordId other) + { + var cmp = this.Address.CompareTo(other.Address); + return cmp == 0 ? this.Version.CompareTo(other.Version) : cmp; + } + + /// + /// Check that the passed record address and version matches this RecordInfo + /// + public bool Equals(long address, int version) => this.Address == address && this.Version == version; + + /// + /// Whether this is a default instance of RecordId + /// + /// This is a method instead of property so it will not be serialized + public bool IsDefault() => this.Address == Constants.kInvalidAddress; + + /// + public override string ToString() => $"address {this.Address}, version {this.Version}"; + } +} diff --git a/cs/src/core/SecondaryIndex/SecondaryIndexBroker.cs b/cs/src/core/SecondaryIndex/SecondaryIndexBroker.cs new file mode 100644 index 000000000..103c8e860 --- /dev/null +++ b/cs/src/core/SecondaryIndex/SecondaryIndexBroker.cs @@ -0,0 +1,317 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Collections.Generic; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +#pragma warning disable IDE0056 // Use index operator (^ is not supported on .NET Framework or NETCORE pre-3.0) + +namespace FASTER.core +{ + /// + /// Manages the list of secondary indexes in the FasterKV. + /// + public class SecondaryIndexBroker + { + private ISecondaryKeyIndex[] allKeyIndexes; + private ISecondaryValueIndex[] allValueIndexes; + + // Use arrays for faster traversal. + private ISecondaryKeyIndex[] mutableKeyIndexes = Array.Empty>(); + internal int MutableKeyIndexCount => mutableKeyIndexes.Length; + + private ISecondaryValueIndex[] mutableValueIndexes = Array.Empty>(); + internal int MutableValueIndexCount => mutableValueIndexes.Length; + + readonly object membershipLock = new(); + + readonly FasterKV primaryFkv; + IDisposable logSubscribeDisposable; // Used if we implement index removal, if we go to zero indexes; Dispose() and null this then. + + // Rather than compare to null, use this flag. + internal bool IsSecondaryFkv = false; + + internal SecondaryIndexBroker(FasterKV pFkv) => this.primaryFkv = pFkv; + + /// + /// Adds a secondary index to the list. + /// + /// + public void AddIndex(ISecondaryIndex index) + { + bool isMutable = false; + + void AppendToArray(ref TIndex[] vec, TIndex idx) + { + var resizedVec = new TIndex[vec is null ? 1 : vec.Length + 1]; + if (vec is not null) + Array.Copy(vec, resizedVec, vec.Length); + resizedVec[resizedVec.Length - 1] = idx; + vec = resizedVec; + } + + bool addSpecific(ref TIndex[] allVec, ref TIndex[] mutableVec, TIndex idx) + where TIndex : ISecondaryIndex + { + if (idx is null) + return false; + if (idx.IsMutable) + { + AppendToArray(ref mutableVec, idx); + isMutable = true; + } + AppendToArray(ref allVec, idx); + return true; + } + + lock (membershipLock) + { + if (!addSpecific(ref allKeyIndexes, ref mutableKeyIndexes, index as ISecondaryKeyIndex) + && !addSpecific(ref allValueIndexes, ref mutableValueIndexes, index as ISecondaryValueIndex)) + throw new SecondaryIndexException("Object is not a KeyIndex or ValueIndex"); + this.HasMutableIndexes |= isMutable; // Note: removing indexes will have to recalculate this + index.SetSessionSlot(SecondaryIndexSessionBroker.NextSessionSlot++); + + if (logSubscribeDisposable is null) + logSubscribeDisposable = primaryFkv.Log.Subscribe(new ReadOnlyObserver(primaryFkv.SecondaryIndexBroker)); + } + } + + /// + /// The number of indexes registered. + /// + internal int Count => (allKeyIndexes?.Length ?? 0) + (allValueIndexes?.Length ?? 0); + + /// + /// The number of indexes registered. + /// + internal bool HasMutableIndexes { get; private set; } + + // On failure of an operation, a SecondaryIndexException is thrown by the Index + + #region Mutable KeyIndexes + /// + /// Inserts a mutable key into all mutable secondary key indexes. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void Insert(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + var mki = this.mutableKeyIndexes; + foreach (var keyIndex in mki) + keyIndex.Insert(ref key, recordId, indexSessionBroker); + } + + /// + /// Upserts a mutable key into all mutable secondary key indexes. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void Upsert(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + var mki = this.mutableKeyIndexes; + foreach (var keyIndex in mki) + keyIndex.Upsert(ref key, recordId, isMutableRecord: false, indexSessionBroker); + } + #endregion Mutable KeyIndexes + + #region Mutable ValueIndexes + /// + /// Inserts a recordId keyed by a mutable value into all mutable secondary value indexes. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void Insert(ref TKVKey key, ref TKVValue value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + var mvi = this.mutableValueIndexes; + foreach (var valueIndex in mvi) + valueIndex.Insert(ref key, ref value, recordId, indexSessionBroker); + } + + /// + /// Upserts a recordId keyed by a mutable value into all mutable secondary value indexes. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void Upsert(ref TKVKey key, ref TKVValue value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + var mvi = this.mutableValueIndexes; + foreach (var valueIndex in mvi) + valueIndex.Upsert(ref key, ref value, recordId, isMutableRecord:false, indexSessionBroker); + } + #endregion Mutable ValueIndexes + + #region Mutable Key and Value Indexes + + /// + /// Deletes recordId for a key from all mutable secondary key indexes. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + internal void Delete(ref TKVKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + var mki = this.mutableKeyIndexes; + foreach (var keyIndex in mki) + keyIndex.Delete(ref key, recordId, indexSessionBroker); + + var mvi = this.mutableValueIndexes; + foreach (var valueIndex in mvi) + valueIndex.Delete(ref key, recordId, indexSessionBroker); + } + #endregion Mutable Key and Value Indexes + + /// + /// Upserts a readonly key into all secondary key indexes and readonly values into secondary value indexes. + /// + internal void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) + { + // Use 'iter' for the first index; open others only if we have more than one index. + var inputIter = iter; + + IFasterScanIterator GetIter() + { + var localIter = inputIter ?? primaryFkv.Log.Scan(iter.BeginAddress, iter.EndAddress, ScanBufferingMode.NoBuffering); + inputIter = null; + return localIter; + } + + void ReleaseIter(IFasterScanIterator localIter) + { + if (localIter != iter) + localIter.Dispose(); + } + + var ki = this.allKeyIndexes; + if (ki is not null) + { + foreach (var keyIndex in ki) + { + var localIter = GetIter(); + keyIndex.ScanReadOnlyPages(localIter, indexSessionBroker); + ReleaseIter(localIter); + } + } + var vi = this.allValueIndexes; + if (vi is not null) + { + foreach (var valueIndex in vi) + { + var localIter = GetIter(); + valueIndex.ScanReadOnlyPages(localIter, indexSessionBroker); + ReleaseIter(localIter); + } + } + } + + internal void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo currentPci) + { + var ki = this.allKeyIndexes; + if (ki is not null) + { + foreach (var keyIndex in ki) + keyIndex.OnPrimaryCheckpointInitiated(currentPci); + } + var vi = this.allValueIndexes; + if (vi is not null) + { + foreach (var valueIndex in vi) + valueIndex.OnPrimaryCheckpointInitiated(currentPci); + } + } + + internal void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo completedPci) + { + var ki = this.allKeyIndexes; + if (ki is not null) + { + foreach (var keyIndex in ki) + keyIndex.OnPrimaryCheckpointCompleted(completedPci); + } + var vi = this.allValueIndexes; + if (vi is not null) + { + foreach (var valueIndex in vi) + valueIndex.OnPrimaryCheckpointCompleted(completedPci); + } + } + + internal void Recover(PrimaryCheckpointInfo primaryRecoveredPci, bool undoNextVersion) + { + if (this.IsSecondaryFkv) + return; + + // This is called during recovery, before the PrimaryFKV is open for operations, so we do not have to worry about things changing. + // We're not operating in the context of a FasterKV session, so we need our own sessionBroker. + using var indexSessionBroker = new SecondaryIndexSessionBroker(); + + var tasks = new List(); + + var ki = this.allKeyIndexes; + if (ki is not null) + { + foreach (var keyIndex in ki) + tasks.Add(Task.Run(() => RecoverIndex(keyIndex, default, primaryRecoveredPci, undoNextVersion, indexSessionBroker))); + } + var vi = this.allValueIndexes; + if (vi is not null) + { + foreach (var valueIndex in vi) + tasks.Add(Task.Run(() => RecoverIndex(default, valueIndex, primaryRecoveredPci, undoNextVersion, indexSessionBroker))); + } + + Task.WaitAll(tasks.ToArray()); + } + + internal async Task RecoverAsync(PrimaryCheckpointInfo primaryRecoveredPci, bool undoNextVersion) + { + if (this.IsSecondaryFkv) + return; + + // This is called during recovery, before the PrimaryFKV is open for operations, so we do not have to worry about things changing + // We're not operating in the context of a FasterKV session, so we need our own sessionBroker. + using var indexSessionBroker = new SecondaryIndexSessionBroker(); + + var tasks = new List(); + + var ki = this.allKeyIndexes; + if (ki is not null) + { + foreach (var keyIndex in ki) + tasks.Add(RecoverIndexAsync(keyIndex, default, primaryRecoveredPci, undoNextVersion, indexSessionBroker)); + } + var vi = this.allValueIndexes; + if (vi is not null) + { + foreach (var valueIndex in vi) + tasks.Add(RecoverIndexAsync(default, valueIndex, primaryRecoveredPci, undoNextVersion, indexSessionBroker)); + } + + // Must await so we don't Dispose indexSessionBroker before the tasks complete. + await Task.WhenAll(tasks.ToArray()); + } + + void RecoverIndex(ISecondaryKeyIndex keyIndex, ISecondaryValueIndex valueIndex, PrimaryCheckpointInfo recoveredPci, bool undoNextVersion, SecondaryIndexSessionBroker indexSessionBroker) + { + var pci = ((ISecondaryIndex)keyIndex ?? valueIndex).Recover(recoveredPci, undoNextVersion); + RollIndexForward(keyIndex, valueIndex, pci, indexSessionBroker); + } + + async Task RecoverIndexAsync(ISecondaryKeyIndex keyIndex, ISecondaryValueIndex valueIndex, PrimaryCheckpointInfo recoveredPci, bool undoNextVersion, SecondaryIndexSessionBroker indexSessionBroker) + { + var pci = await ((ISecondaryIndex)keyIndex ?? valueIndex).RecoverAsync(recoveredPci, undoNextVersion); + await Task.Run(() => RollIndexForward(keyIndex, valueIndex, pci, indexSessionBroker)); + } + + private void RollIndexForward(ISecondaryKeyIndex keyIndex, ISecondaryValueIndex valueIndex, PrimaryCheckpointInfo pci, SecondaryIndexSessionBroker indexSessionBroker) + { + var index = (ISecondaryIndex)keyIndex ?? valueIndex; + var endAddress = index.IsMutable ? primaryFkv.Log.TailAddress : primaryFkv.Log.ReadOnlyAddress; + if (pci.FlushedUntilAddress < endAddress) + { + var startAddress = Math.Max(pci.FlushedUntilAddress, primaryFkv.Log.BeginAddress); + using var iter = primaryFkv.Log.Scan(startAddress, endAddress); + if (keyIndex is not null) + keyIndex.RecoveryReplay(iter, indexSessionBroker); + else + valueIndex.RecoveryReplay(iter, indexSessionBroker); + } + } + } +} diff --git a/cs/src/core/SecondaryIndex/SecondaryIndexException.cs b/cs/src/core/SecondaryIndex/SecondaryIndexException.cs new file mode 100644 index 000000000..015650b64 --- /dev/null +++ b/cs/src/core/SecondaryIndex/SecondaryIndexException.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Runtime.Serialization; + +namespace FASTER.core +{ + /// + /// FASTER exception base type + /// + public class SecondaryIndexException : FasterException + { + /// + public SecondaryIndexException() + { + } + + /// + public SecondaryIndexException(string message) : base(message) + { + } + + /// + public SecondaryIndexException(string message, Exception innerException) : base(message, innerException) + { + } + + /// + public SecondaryIndexException(SerializationInfo info, StreamingContext context) : base(info, context) + { + } + } +} \ No newline at end of file diff --git a/cs/src/core/SecondaryIndex/SecondaryIndexSessionBroker.cs b/cs/src/core/SecondaryIndex/SecondaryIndexSessionBroker.cs new file mode 100644 index 000000000..fb96db930 --- /dev/null +++ b/cs/src/core/SecondaryIndex/SecondaryIndexSessionBroker.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using System.Threading; + +namespace FASTER.core +{ + /// + /// Allows an index to register its own concept of sessions to be attached to a primary FasterKV session. + /// + public class SecondaryIndexSessionBroker : IDisposable + { + readonly object sessionLock = new object(); + + // This never decreases in size (if we support removal, we'll just null the slot). + object[] indexSessions = new object[0]; + + // The next free session slot; these slots are only valid for the current instantiation of the FasterKV. + // If we support removing indexes, add a stack of free slots, and null the slot in SecondaryIndexSessionBroker.indexSessions. + internal static long NextSessionSlot = 0; + + /// + /// Gets a previously registered session object attached by an index to this primary FasterKV session. + /// + /// The value passed to by + /// The session object that was attached by an index to this primary FasterKV session, or null if none was attached. + public object GetSessionObject(long slot) => slot < this.indexSessions.Length ? this.indexSessions[slot] : null; + + /// + /// REgisters a session object to be attached by an index to this primary FasterKV session. + /// + /// The value passed to by + /// + /// The session object to be attached by an index to this primary FasterKV session. + public object SetSessionObject(long slot, object sessionObject) + { + if (slot >= this.indexSessions.Length) + { + if (slot > NextSessionSlot) + throw new FasterException("Secondary index session slot is out of range"); + + lock (sessionLock) + { + var vec = new object[slot + 1]; + Array.Copy(this.indexSessions, vec, this.indexSessions.Length); + this.indexSessions = vec; + } + } + + this.indexSessions[slot] = sessionObject; + return sessionObject; + } + + /// + public void Dispose() + { + var sessions = this.indexSessions; + if (sessions is null) + return; + + sessions = Interlocked.CompareExchange(ref this.indexSessions, null, sessions); + if (sessions is null) + return; + + foreach (var session in sessions) + { + if (session is IDisposable idisp) + idisp.Dispose(); + } + + GC.SuppressFinalize(this); + } + } +} diff --git a/cs/src/core/Utilities/Utility.cs b/cs/src/core/Utilities/Utility.cs index 433af9d51..3b0e63f78 100644 --- a/cs/src/core/Utilities/Utility.cs +++ b/cs/src/core/Utilities/Utility.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.IO; using System.Reflection; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; @@ -68,6 +69,14 @@ internal static bool IsBlittable() return true; } + internal static byte[] Slice(this byte[] source, int offset, int length) + { + // ArraySegment doesn't support ToArray() on all platforms we support + var result = new byte[length]; + Array.Copy(source, offset, result, 0, length); + return result; + } + /// /// Check if two byte arrays of given length are equal /// @@ -325,5 +334,11 @@ private static async Task SlowWithCancellationAsync(Task task, Cancella // make sure any exceptions in the task get unwrapped and exposed to the caller. return await task; } + + internal static ICheckpointManager CreateDefaultCheckpointManager(CheckpointSettings checkpointSettings) + => new DeviceLogCommitCheckpointManager + (new LocalStorageNamedDeviceFactory(), + new DefaultCheckpointNamingScheme( + new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName), removeOutdated: checkpointSettings.RemoveOutdated); } } \ No newline at end of file diff --git a/cs/test/BasicFASTERTests.cs b/cs/test/BasicFASTERTests.cs index 3c1eadb8e..1e93526ca 100644 --- a/cs/test/BasicFASTERTests.cs +++ b/cs/test/BasicFASTERTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Diagnostics; using System.Linq; using FASTER.core; using NUnit.Framework; @@ -244,7 +245,12 @@ public unsafe void NativeInMemWriteRead2() public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType) { InputStruct input = default; - int count = 200; + const int RandSeed = 10; + const int RandRange = 10000; + const int NumRecs = 200; + + Random r = new Random(RandSeed); + var sw = Stopwatch.StartNew(); string filename = path + "TestShiftHeadAddress" + deviceType.ToString() + ".log"; log = TestUtils.CreateTestDevice(deviceType, filename); @@ -253,20 +259,21 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType session = fht.For(new Functions()).NewSession(); - Random r = new Random(10); - for (int c = 0; c < count; c++) + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; session.Upsert(ref key1, ref value, Empty.Default, 0); } + Console.WriteLine($"Time to insert {NumRecs} records: {sw.ElapsedMilliseconds} ms"); - r = new Random(10); + r = new Random(RandSeed); + sw.Restart(); - for (int c = 0; c < count; c++) + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); OutputStruct output = default; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; @@ -279,20 +286,30 @@ public unsafe void TestShiftHeadAddress([Values] TestUtils.DeviceType deviceType Assert.AreEqual(value.vfield1, output.value.vfield1); Assert.AreEqual(value.vfield2, output.value.vfield2); } + Console.WriteLine($"Time to read {NumRecs} in-memory records: {sw.ElapsedMilliseconds} ms"); // Shift head and retry - should not find in main memory now fht.Log.FlushAndEvict(true); - r = new Random(10); - for (int c = 0; c < count; c++) + r = new Random(RandSeed); + sw.Restart(); + + for (int c = 0; c < NumRecs; c++) { - var i = r.Next(10000); + var i = r.Next(RandRange); OutputStruct output = default; var key1 = new KeyStruct { kfield1 = i, kfield2 = i + 1 }; + var value = new ValueStruct { vfield1 = i, vfield2 = i + 1 }; + Status foundStatus = session.Read(ref key1, ref input, ref output, Empty.Default, 0); Assert.AreEqual(Status.PENDING, foundStatus); - session.CompletePending(true); + session.CompletePendingWithOutputs(out var outputs, wait: true); + Assert.IsTrue(outputs.Next()); + Assert.AreEqual(value.vfield1, outputs.Current.Output.value.vfield1); + outputs.Current.Dispose(); + Assert.IsFalse(outputs.Next()); } + Console.WriteLine($"Time to read {NumRecs} on-disk records: {sw.ElapsedMilliseconds} ms"); } [Test] diff --git a/cs/test/LockTests.cs b/cs/test/LockTests.cs index 35d131579..225da38f9 100644 --- a/cs/test/LockTests.cs +++ b/cs/test/LockTests.cs @@ -15,11 +15,6 @@ internal class LockTests { internal class Functions : AdvancedSimpleFunctions { - public override void ConcurrentReader(ref int key, ref int input, ref int value, ref int dst, ref RecordInfo recordInfo, long address) - { - dst = value; - } - bool Increment(ref int dst) { ++dst; diff --git a/cs/test/ObjectTestTypes.cs b/cs/test/ObjectTestTypes.cs index fd883443c..46a19831f 100644 --- a/cs/test/ObjectTestTypes.cs +++ b/cs/test/ObjectTestTypes.cs @@ -230,7 +230,6 @@ public override void SingleReader(ref MyKey key, ref MyInput input, ref MyValue { if (dst == null) dst = new MyOutput(); - dst.value = value; } @@ -288,7 +287,6 @@ public class MyLargeValue public MyLargeValue() { - } public MyLargeValue(int size) diff --git a/cs/test/ObserverTests.cs b/cs/test/ObserverTests.cs new file mode 100644 index 000000000..418132e6e --- /dev/null +++ b/cs/test/ObserverTests.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using NUnit.Framework; +using System; + +namespace FASTER.test +{ + [TestFixture] + class BasicObserverTests + { + internal IObserver>[] TestObservers; + readonly object lockObject = new object(); + + class TestObserver : IObserver> + { + internal int Id; + internal bool IsCompleted = false; + internal int OnNextCalls = 0; + + internal TestObserver(int id) => this.Id = id; + + public void OnCompleted() => this.IsCompleted = true; + + public void OnError(Exception error) { /* Apparently not called by FASTER */ } + + public void OnNext(IFasterScanIterator iter) { ++this.OnNextCalls; } + } + + [SetUp] + public void Setup() + { + this.TestObservers = Array.Empty>>(); + } + + [TearDown] + public void TearDown() + { + } + + [Test] + [Category("FasterLog")] + public void ObserverMembershipTests() + { + const int NumObservers = 5; + for (var ii = 0; ii < NumObservers; ++ii) + { + AllocatorBase.AddObserver(ref TestObservers, new TestObserver(ii), lockObject); + } + + Assert.AreEqual(NumObservers, TestObservers.Length); + + // We don't actually use the iterator in this test. + IFasterScanIterator iter = null; + + foreach (var observer in this.TestObservers) + observer.OnNext(iter); + foreach (var observer in this.TestObservers) + Assert.AreEqual(1, (observer as TestObserver).OnNextCalls); + + const int toRemove = 1; + AllocatorBase.RemoveObserver(ref TestObservers, TestObservers[toRemove], lockObject); + Assert.AreEqual(NumObservers - 1, TestObservers.Length); + + for (var ii = 0; ii < NumObservers - 1; ++ii) + { + int comparand = ii < toRemove ? ii : ii + 1; + Assert.AreEqual(comparand, (TestObservers[ii] as TestObserver).Id); + } + + AllocatorBase.RemoveObserver(ref TestObservers, TestObservers[0], lockObject); + Assert.AreEqual(NumObservers - 2, TestObservers.Length); + AllocatorBase.RemoveObserver(ref TestObservers, TestObservers[TestObservers.Length - 1], lockObject); + Assert.AreEqual(NumObservers - 3, TestObservers.Length); + + foreach (var observer in this.TestObservers) + observer.OnNext(iter); + foreach (var observer in this.TestObservers) + Assert.AreEqual(2, (observer as TestObserver).OnNextCalls); + + while (TestObservers.Length > 0) + { + AllocatorBase.RemoveObserver(ref TestObservers, TestObservers[0], lockObject); + } + Assert.AreEqual(0, TestObservers.Length); + } + } +} diff --git a/cs/test/ReadAddressTests.cs b/cs/test/ReadAddressTests.cs index 8cc222b56..2c98a4031 100644 --- a/cs/test/ReadAddressTests.cs +++ b/cs/test/ReadAddressTests.cs @@ -7,12 +7,9 @@ using NUnit.Framework; using System.Threading.Tasks; using System.Threading; -using System.Collections.Generic; -using System.Diagnostics; namespace FASTER.test.readaddress { -#if false // TODO temporarily deactivated due to removal of addresses from single-writer callbacks (also add UpsertAsync where we do RMWAsync/Upsert); update to new test format [TestFixture] public class ReadAddressTests { @@ -63,12 +60,41 @@ public void Reset() } } + private class InsertValueIndex : ISecondaryValueIndex + { + public long lastWriteAddress; + + public string Name => nameof(InsertValueIndex); + + public bool IsMutable => true; + + public void SetSessionSlot(long slot) { } + + public void Delete(ref Key key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void Insert(ref Key key, ref Value value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) => lastWriteAddress = recordId.Address; + + public void Upsert(ref Key key, ref Value value, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryTruncate(long newBeginAddress) { } + + public void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + + public void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo recoveredPCI) { } + + public void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo primaryCheckpointInfo) { } + + public PrimaryCheckpointInfo Recover(PrimaryCheckpointInfo recoveredPCI, bool undoNextVersion) => default; + + public Task RecoverAsync(PrimaryCheckpointInfo recoveredPCI, bool undoNextVersion, CancellationToken cancellationToken = default) => default; + + public void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + } + private static long SetReadOutput(long key, long value) => (key << 32) | value; internal class Functions : AdvancedSimpleFunctions { - internal long lastWriteAddress = Constants.kInvalidAddress; - public override void ConcurrentReader(ref Key key, ref Value input, ref Value value, ref Value dst, ref RecordInfo recordInfo, long address) => dst.value = SetReadOutput(key.key, value.value); @@ -78,26 +104,7 @@ public override void SingleReader(ref Key key, ref Value input, ref Value value, // Return false to force a chain of values. public override bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address) => false; - public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref RecordInfo recordInfo, long address) => false; - - // Record addresses - public override void SingleWriter(ref Key key, ref Value src, ref Value dst, long address) - { - this.lastWriteAddress = address; - base.SingleWriter(ref key, ref src, ref dst, address); - } - - public override void InitialUpdater(ref Key key, ref Value input, ref Value value) - { - this.lastWriteAddress = address; - base.InitialUpdater(ref key, ref input, ref value); - } - - public override void CopyUpdater(ref Key key, ref Value input, ref Value oldValue, ref Value newValue) - { - this.lastWriteAddress = newAddress; - base.CopyUpdater(ref key, ref input, ref oldValue, ref newValue); - } + public override bool InPlaceUpdater(ref Key key, ref Value input, ref Value value, ref Value output, ref RecordInfo recordInfo, long address) => false; // Track the recordInfo for its PreviousAddress. public override void ReadCompletionCallback(ref Key key, ref Value input, ref Value output, Context ctx, Status status, RecordInfo recordInfo) @@ -128,6 +135,7 @@ private class TestStore : IDisposable internal IDevice logDevice; internal string testDir; private readonly bool flush; + readonly InsertValueIndex insertValueIndex = new InsertValueIndex(); internal long[] InsertAddresses = new long[numKeys]; @@ -156,6 +164,8 @@ internal TestStore(bool useReadCache, CopyReadsToTail copyReadsToTail, bool flus serializerSettings: null, comparer: new Key.Comparer() ); + + this.fkv.SecondaryIndexBroker.AddIndex(insertValueIndex); } internal async ValueTask Flush() @@ -186,19 +196,21 @@ internal async Task Populate(bool useRMW, bool useAsync) prevLap = lap; } - var key = new Key(ii % keyMod); - var value = new Value(key.key + LapOffset(lap)); + Key key = new(ii % keyMod); + Value value = new(key.key + LapOffset(lap)); + Value output = new(); var status = useRMW ? useAsync - ? (await session.RMWAsync(ref key, ref value, context, serialNo: lap)).Complete() - : session.RMW(ref key, ref value, serialNo: lap) + ? ((await session.RMWAsync(ref key, ref value, context, serialNo: lap)).Complete()).status + : session.RMW(ref key, ref value, ref output, serialNo: lap) : session.Upsert(ref key, ref value, serialNo: lap); if (status == Status.PENDING) await session.CompletePendingAsync(); - InsertAddresses[ii] = functions.lastWriteAddress; + Assert.IsTrue(insertValueIndex.lastWriteAddress > 0); + InsertAddresses[ii] = insertValueIndex.lastWriteAddress; //Assert.IsTrue(session.ctx.HasNoPendingRequests); // Illustrate that deleted records can be shown as well (unless overwritten by in-place operations, which are not done here) @@ -418,14 +430,13 @@ public async Task ReadAtAddressAsyncTests(bool useReadCache, CopyReadsToTail cop } } - // Test is similar to others but tests the Overload where RadFlag.none is set -- probably don't need all combinations of test but doesn't hurt + // Test is similar to others but tests the Overload where ReadFlag.none is set -- probably don't need all combinations of test but doesn't hurt [TestCase(false, CopyReadsToTail.None, false, false)] [TestCase(false, CopyReadsToTail.FromStorage, true, true)] [TestCase(true, CopyReadsToTail.None, false, true)] [Category("FasterKV")] public async Task ReadAtAddressAsyncReadFlagsNoneTests(bool useReadCache, CopyReadsToTail copyReadsToTail, bool useRMW, bool flush) { - CancellationToken cancellationToken; using var testStore = new TestStore(useReadCache, copyReadsToTail, flush); await testStore.Populate(useRMW, useAsync: true); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -452,7 +463,7 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(bool useReadCache, CopyRe var saveOutput = output; var saveRecordInfo = recordInfo; - readAsyncResult = await session.ReadAtAddressAsync(readAtAddress, ref input, ReadFlags.None, default, serialNo: maxLap + 1, cancellationToken); + readAsyncResult = await session.ReadAtAddressAsync(readAtAddress, ref input, ReadFlags.None, default, serialNo: maxLap + 1); (status, output) = readAsyncResult.Complete(out recordInfo); Assert.AreEqual(saveOutput, output); @@ -469,7 +480,6 @@ public async Task ReadAtAddressAsyncReadFlagsNoneTests(bool useReadCache, CopyRe [Category("FasterKV")] public async Task ReadAtAddressAsyncReadFlagsSkipCacheTests(bool useReadCache, CopyReadsToTail copyReadsToTail, bool useRMW, bool flush) { - CancellationToken cancellationToken; using var testStore = new TestStore(useReadCache, copyReadsToTail, flush); await testStore.Populate(useRMW, useAsync: true); using var session = testStore.fkv.For(new Functions()).NewSession(); @@ -496,7 +506,7 @@ public async Task ReadAtAddressAsyncReadFlagsSkipCacheTests(bool useReadCache, C var saveOutput = output; var saveRecordInfo = recordInfo; - readAsyncResult = await session.ReadAtAddressAsync(readAtAddress, ref input, ReadFlags.SkipReadCache, default, maxLap + 1, cancellationToken); + readAsyncResult = await session.ReadAtAddressAsync(readAtAddress, ref input, ReadFlags.SkipReadCache, default, maxLap + 1); (status, output) = readAsyncResult.Complete(out recordInfo); Assert.AreEqual(saveOutput, output); @@ -575,7 +585,7 @@ public async Task ReadNoKeyAsyncTests(bool useReadCache, CopyReadsToTail copyRea await testStore.Flush(); } } -#endif + [TestFixture] public class ReadMinAddressTests { diff --git a/cs/test/RecoveryTestTypes.cs b/cs/test/RecoveryTestTypes.cs index a0f0bf74c..a3f6b649b 100644 --- a/cs/test/RecoveryTestTypes.cs +++ b/cs/test/RecoveryTestTypes.cs @@ -13,6 +13,8 @@ public struct AdId : IFasterEqualityComparer public long GetHashCode64(ref AdId key) => Utility.GetHashCode(key.adId); public bool Equals(ref AdId k1, ref AdId k2) => k1.adId == k2.adId; + + public override string ToString() => adId.ToString(); } public struct AdInput diff --git a/cs/test/SimpleIndexBase.cs b/cs/test/SimpleIndexBase.cs new file mode 100644 index 000000000..8e3ce6111 --- /dev/null +++ b/cs/test/SimpleIndexBase.cs @@ -0,0 +1,411 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using FASTER.core; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace FASTER.test.SecondaryIndex.SimpleIndex +{ + class SimpleIndexBase + { + protected internal long sessionSlot = 0; + protected readonly string indexType; + private Guid sessionId = Guid.Empty; + + protected SimpleIndexBase(string name, bool isKeyIndex, bool isMutableIndex) + { + this.Name = name; + this.IsMutable = isMutableIndex; + this.indexType = isKeyIndex ? "KeyIndex" : "ValueIndex"; + } + + public string Name { get; private set; } + + public bool IsMutable { get; private set; } + + public void SetSessionSlot(long slot) => this.sessionSlot = slot; + + protected void VerifySession(SecondaryIndexSessionBroker indexSessionBroker, bool isMutableRecord = true) + { + Assert.IsNotNull(indexSessionBroker); + var sessionObject = indexSessionBroker.GetSessionObject(this.sessionSlot); + + // For these tests, we will always do all mutable inserts before checkpointing and getting readonly inserts. + // The readonly inserts come from a different SecondaryIndexSessionBroker (owned by the SecondaryIndexBroker + // for the ReadOnlyObserver), so we expect not to find a session there on the first call. + if (isMutableRecord) + Assert.AreEqual(sessionObject is null, this.sessionId == Guid.Empty); + + if (sessionObject is not SimpleIndexSession session) + { + if (sessionObject is not null) + Assert.Fail($"Unexpected session object type {sessionObject.GetType().Name} for {indexType}"); + + if (this.sessionId == Guid.Empty) + this.sessionId = Guid.NewGuid(); + session = new SimpleIndexSession() { Id = this.sessionId }; + indexSessionBroker.SetSessionObject(this.sessionSlot, session); + } + Assert.AreEqual(this.sessionId, session.Id); + } + + public void OnPrimaryCheckpointInitiated(PrimaryCheckpointInfo recoveredPCI) { } + + public void OnPrimaryCheckpointCompleted(PrimaryCheckpointInfo primaryCheckpointInfo) { } + + public PrimaryCheckpointInfo Recover(PrimaryCheckpointInfo recoveredPCI, bool undoNextVersion) => default; + + public Task RecoverAsync(PrimaryCheckpointInfo recoveredPCI, bool undoNextVersion, CancellationToken cancellationToken = default) => default; + } + + class SimpleKeyIndexBase : SimpleIndexBase + where TKey : IComparable + { + // Value is IsMutable + protected internal readonly Dictionary keys = new Dictionary(); + + protected SimpleKeyIndexBase(string name, bool isMutableIndex) + : base(name, isKeyIndex: true, isMutableIndex) + { + } + + // Name methods "BaseXxx" instead of using virtuals so we don't get unwanted implementations, e.g. of Delete taking a key for the ValueIndex. + + internal void BaseDelete(ref TKey key, SecondaryIndexSessionBroker indexSessionBroker) + { + VerifySession(indexSessionBroker); + + Assert.IsTrue(this.keys.TryGetValue(key, out bool isMutable)); + Assert.IsFalse(isMutable); + this.keys.Remove(key); + } + + private void UpdateKey(ref TKey key, bool isMutable, SecondaryIndexSessionBroker indexSessionBroker) + { + VerifySession(indexSessionBroker, isMutable); + + // Key indexes just track the key, and a mutable record for a key may be added after the page for its previous record goes immutable. + this.keys[key] = isMutable; + } + + internal void BaseInsert(ref TKey key, SecondaryIndexSessionBroker indexSessionBroker) + => UpdateKey(ref key, isMutable: true, indexSessionBroker); + + internal void BaseUpsert(ref TKey key, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) + => UpdateKey(ref key, isMutableRecord, indexSessionBroker); + + // Pseudo-range index + internal void Query(TKey from, TKey to, int expectedCount) + { + var keysFound = this.keys.Keys.Where(k => k.CompareTo(from) >= 0 && k.CompareTo(to) <= 0).OrderBy(k => k).ToArray(); + Assert.AreEqual(expectedCount, keysFound.Length); + } + + internal static void VerifyIntIndex(ISecondaryIndex index, PrimaryFasterKV store) + { + const int from = 42, to = 101; + var indexBase = index as SimpleKeyIndexBase; + Assert.AreEqual(indexBase.IsMutable ? SimpleIndexUtils.NumKeys : 0, indexBase.keys.Count); + + if (store is null) + { + Assert.IsFalse(indexBase.keys.Values.Any(isMutable => !isMutable)); + indexBase.Query(from, to, to - from + 1); + return; + } + + store.Checkpoint(); + Assert.IsFalse(indexBase.keys.Values.Any(isMutable => isMutable)); + indexBase.Query(from, to, to - from + 1); + } + + internal static void VerifyMixedIntIndexes(ISecondaryIndex mutableIndex, ISecondaryIndex immutableIndex, PrimaryFasterKV store) + { + const int from = 42, to = 101; + var mutableIndexBase = mutableIndex as SimpleKeyIndexBase; + var immutableIndexBase = immutableIndex as SimpleKeyIndexBase; + Assert.AreEqual(SimpleIndexUtils.NumKeys, mutableIndexBase.keys.Count); + Assert.AreEqual(0, immutableIndexBase.keys.Count); + + store.Checkpoint(); + Assert.AreEqual(SimpleIndexUtils.NumKeys, mutableIndexBase.keys.Count); + Assert.AreEqual(SimpleIndexUtils.NumKeys, immutableIndexBase.keys.Count); + + mutableIndexBase.Query(from, to, to - from + 1); + immutableIndexBase.Query(from, to, to - from + 1); + } + } + + class SimpleValueIndexBase : SimpleIndexBase + { + protected internal readonly Dictionary> MutableRecords = new Dictionary>(); + protected internal readonly Dictionary> ImmutableRecords = new Dictionary>(); + private readonly Dictionary reverseLookup = new Dictionary(); + readonly Func valueIndexKeyFunc; + + protected SimpleValueIndexBase(string name, Func indexKeyFunc, bool isMutableIndex) + : base(name, isKeyIndex: false, isMutableIndex) + { + this.valueIndexKeyFunc = indexKeyFunc; + } + + // Name methods "BaseXxx" instead of using virtuals so we don't get unwanted implementations, e.g. of Delete taking a key for the ValueIndex. + + internal void BaseDelete(RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + Assert.IsFalse(recordId.IsDefault()); + VerifySession(indexSessionBroker); + if (!reverseLookup.TryGetValue(recordId, out TValue key)) + Assert.Fail($"RecordId {recordId} not found in revserse lookup for {indexType}"); + + VerifyNotImmutable(ref key, recordId); + if (!MutableRecords.ContainsKey(key)) + Assert.Fail($"{indexType} '{key}' not found as index key"); + MutableRecords.Remove(key); + + Assert.IsTrue(reverseLookup.ContainsKey(recordId)); + reverseLookup.Remove(recordId); + } + + internal void BaseInsert(ref TValue rawKey, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + { + VerifySession(indexSessionBroker); + Assert.IsFalse(recordId.IsDefault()); + var key = this.valueIndexKeyFunc(rawKey); + VerifyNotImmutable(ref key, recordId); + if (!MutableRecords.TryGetValue(key, out var recordIds)) + { + recordIds = new List(); + MutableRecords[key] = recordIds; + } + else if (recordIds.Contains(recordId)) + { + return; + } + recordIds.Add(recordId); + AddToReverseLookup(ref key, recordId); + } + + internal void BaseUpsert(ref TValue rawKey, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) + { + VerifySession(indexSessionBroker, isMutableRecord); + if (isMutableRecord) + { + BaseInsert(ref rawKey, recordId, indexSessionBroker); + return; + } + + // Move from mutable to immutable + var key = this.valueIndexKeyFunc(rawKey); + VerifyNotImmutable(ref key, recordId); + VerifySession(indexSessionBroker, isMutableRecord); + if (MutableRecords.TryGetValue(key, out var recordIds) && recordIds.Contains(recordId)) + { + recordIds.Remove(recordId); + if (recordIds.Count == 0) + MutableRecords.Remove(key); + } + + if (!ImmutableRecords.TryGetValue(key, out recordIds)) + { + recordIds = new List(); + ImmutableRecords[key] = recordIds; + } + recordIds.Add(recordId); + AddToReverseLookup(ref key, recordId); + } + + private readonly List emptyRecordList = new List(); + + internal RecordId[] Query(TValue rawKey) + { + var key = this.valueIndexKeyFunc(rawKey); + if (!MutableRecords.TryGetValue(key, out var mutableRecordList)) + mutableRecordList = emptyRecordList; + if (!ImmutableRecords.TryGetValue(key, out var immutableRecordList)) + immutableRecordList = emptyRecordList; + + return mutableRecordList.Concat(immutableRecordList).ToArray(); + } + + private void VerifyNotImmutable(ref TValue key, RecordId recordId) + { + if (ImmutableRecords.TryGetValue(key, out var recordIds) && recordIds.Contains(recordId)) + Assert.Fail($"Unexpected update of recordId {recordId} for {indexType} '{key}'"); + } + + private void AddToReverseLookup(ref TValue key, RecordId recordId) + { + if (reverseLookup.TryGetValue(recordId, out TValue existingKey)) + { + if (!existingKey.Equals(key)) + Assert.Fail($"Unexpected update of recordId {recordId} for {indexType} '{key}'"); + return; + } + reverseLookup[recordId] = key; + } + + internal static void VerifyMutableIntIndex(ISecondaryIndex secondaryIndex, int indexKeyDivisor, int queryKeyOffset) + { + var indexBase = secondaryIndex as SimpleValueIndexBase; + Assert.AreEqual(SimpleIndexUtils.NumKeys / indexKeyDivisor, indexBase.MutableRecords.Count); + Assert.AreEqual(0, indexBase.ImmutableRecords.Count); + + var records = indexBase.Query(42 + queryKeyOffset); + Assert.AreEqual(indexKeyDivisor, records.Length); + } + + internal static void VerifyImmutableIntIndex(ISecondaryIndex secondaryIndex, int indexKeyDivisor, int queryKeyOffset, PrimaryFasterKV store) + { + var indexBase = secondaryIndex as SimpleValueIndexBase; + Assert.AreEqual(0, indexBase.MutableRecords.Count); + Assert.AreEqual(0, indexBase.ImmutableRecords.Count); + + store.Checkpoint(); + Assert.AreEqual(0, indexBase.MutableRecords.Count); + Assert.AreEqual(SimpleIndexUtils.NumKeys / indexKeyDivisor, indexBase.ImmutableRecords.Count); + + var records = indexBase.Query(42 + queryKeyOffset); + Assert.AreEqual(indexKeyDivisor, records.Length); + } + + internal static void VerifyMixedIntIndexes(ISecondaryIndex mutableIndex, ISecondaryIndex immutableIndex, int indexKeyDivisor, int queryKeyOffset, PrimaryFasterKV store) + { + var mutableIndexBase = mutableIndex as SimpleValueIndexBase; + var immutableIndexBase = immutableIndex as SimpleValueIndexBase; + Assert.AreEqual(SimpleIndexUtils.NumKeys / indexKeyDivisor, mutableIndexBase.MutableRecords.Count); + Assert.AreEqual(0, mutableIndexBase.ImmutableRecords.Count); + Assert.AreEqual(0, immutableIndexBase.MutableRecords.Count); + Assert.AreEqual(0, immutableIndexBase.ImmutableRecords.Count); + + store.Checkpoint(); + Assert.AreEqual(0, mutableIndexBase.MutableRecords.Count); + Assert.AreEqual(SimpleIndexUtils.NumKeys / indexKeyDivisor, mutableIndexBase.ImmutableRecords.Count); + Assert.AreEqual(0, immutableIndexBase.MutableRecords.Count); + Assert.AreEqual(SimpleIndexUtils.NumKeys / indexKeyDivisor, immutableIndexBase.ImmutableRecords.Count); + + var records = mutableIndexBase.Query(42 + queryKeyOffset); + Assert.AreEqual(indexKeyDivisor, records.Length); + + records = immutableIndexBase.Query(42 + queryKeyOffset); + Assert.AreEqual(indexKeyDivisor, records.Length); + } + } + + internal class SimpleIndexSession + { + public Guid Id { get; set; } + } + + internal class PrimaryFasterKV + { + private string testPath; + internal FasterKV fkv; + private IDevice log; + + internal void Setup() + { + testPath = TestContext.CurrentContext.TestDirectory + "/" + Path.GetRandomFileName(); + if (!Directory.Exists(testPath)) + Directory.CreateDirectory(testPath); + + log = Devices.CreateLogDevice(testPath + $"/{TestContext.CurrentContext.Test.Name}.log", false); + + fkv = new FasterKV(SimpleIndexUtils.KeySpace, new LogSettings { LogDevice = log }, + new CheckpointSettings { CheckpointDir = testPath, CheckPointType = CheckpointType.FoldOver } + ); + } + + internal void Populate(bool useAdvancedFunctions, bool useRMW, bool isAsync) + { + if (useAdvancedFunctions) + SimpleIndexUtils.PopulateIntsWithAdvancedFunctions(this.fkv, useRMW, isAsync); + else + SimpleIndexUtils.PopulateInts(this.fkv, useRMW, isAsync); + } + + internal void Checkpoint() + { + this.fkv.TakeFullCheckpoint(out _); + this.fkv.CompleteCheckpointAsync().GetAwaiter().GetResult(); + } + + internal void TearDown() + { + fkv.Dispose(); + fkv = null; + log.Dispose(); + TestUtils.DeleteDirectory(testPath); + } + } + + static class SimpleIndexUtils + { + internal const int NumKeys = 2_000; + internal const long KeySpace = 1L << 14; + + internal const int ValueStart = 10_000; + + internal static void PopulateInts(FasterKV fkv, bool useRMW, bool isAsync) + { + using var session = fkv.NewSession(new SimpleFunctions()); + + // Prpcess the batch of input data + for (int key = 0; key < NumKeys; key++) + { + var value = key + ValueStart; + if (useRMW) + { + if (isAsync) + session.RMWAsync(ref key, ref value).GetAwaiter().GetResult().Complete(); + else + session.RMW(ref key, ref value); + } + else + { + if (isAsync) + session.UpsertAsync(ref key, ref value).GetAwaiter().GetResult().Complete(); + else + session.Upsert(ref key, ref value); + } + } + + // Make sure operations are completed + session.CompletePending(true); + } + + internal static void PopulateIntsWithAdvancedFunctions(FasterKV fkv, bool useRMW, bool isAsync) + { + using var session = fkv.NewSession(new AdvancedSimpleFunctions()); + + // Prpcess the batch of input data + for (int key = 0; key < NumKeys; key++) + { + var value = key + ValueStart; + if (useRMW) + { + if (isAsync) + session.RMWAsync(ref key, ref value).GetAwaiter().GetResult().Complete(); + else + session.RMW(ref key, ref value); + } + else + { + if (isAsync) + session.UpsertAsync(ref key, ref value).GetAwaiter().GetResult().Complete(); + else + session.Upsert(ref key, ref value); + } + } + + // Make sure operations are completed + session.CompletePending(true); + } + } +} diff --git a/cs/test/SimpleKeyIndexTests.cs b/cs/test/SimpleKeyIndexTests.cs new file mode 100644 index 000000000..de536e65d --- /dev/null +++ b/cs/test/SimpleKeyIndexTests.cs @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT license. + +using System; +using FASTER.core; +using NUnit.Framework; + +namespace FASTER.test.SecondaryIndex.SimpleIndex +{ + class SimpleKeyIndexTests + { + class SimpleKeyIndex : SimpleKeyIndexBase, ISecondaryKeyIndex + where TKey : IComparable + { + internal SimpleKeyIndex(string name, bool isMutableIndex) : base(name, isMutableIndex: isMutableIndex) { } + + public void Delete(ref TKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + => BaseDelete(ref key, indexSessionBroker); + + public void Insert(ref TKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + => BaseInsert(ref key, indexSessionBroker); + + public void Upsert(ref TKey key, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) + => BaseUpsert(ref key, isMutableRecord, indexSessionBroker); + + public void OnPrimaryTruncate(long newBeginAddress) { } + + public void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) + { + while (iter.GetNext(out var recordInfo)) + Upsert(ref iter.GetKey(), new RecordId(recordInfo.Version, iter.CurrentAddress), isMutableRecord: false, indexSessionBroker); + } + + public void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + } + + readonly PrimaryFasterKV store = new PrimaryFasterKV(); + + [SetUp] + public void Setup() => store.Setup(); + + [TearDown] + public void TearDown() => store.TearDown(); + + private ISecondaryIndex CreateIndex(bool isMutable, bool isAsync) + => new SimpleKeyIndex($"{TestContext.CurrentContext.Test.Name}_mutable_{(isAsync ? "async" : "sync")}", isMutable); + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void MutableInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var secondaryIndex = CreateIndex(isMutable: true, isAsync); + store.fkv.SecondaryIndexBroker.AddIndex(secondaryIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleKeyIndexBase.VerifyIntIndex(secondaryIndex, store: null); + } + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void ImmutableInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var secondaryIndex = CreateIndex(isMutable: false, isAsync); + store.fkv.SecondaryIndexBroker.AddIndex(secondaryIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleKeyIndexBase.VerifyIntIndex(secondaryIndex, store); + } + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void MixedInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var mutableIndex = CreateIndex(isMutable: true, isAsync); + var immutableIndex = CreateIndex(isMutable: false, isAsync); + store.fkv.SecondaryIndexBroker.AddIndex(mutableIndex); + store.fkv.SecondaryIndexBroker.AddIndex(immutableIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleKeyIndexBase.VerifyMixedIntIndexes(mutableIndex, immutableIndex, store); + } + } +} diff --git a/cs/test/SimpleRecoveryTest.cs b/cs/test/SimpleRecoveryTest.cs index b0c39997a..1ae2cd2c8 100644 --- a/cs/test/SimpleRecoveryTest.cs +++ b/cs/test/SimpleRecoveryTest.cs @@ -72,7 +72,7 @@ public async ValueTask LocalDeviceSimpleRecoveryTest([Values] CheckpointType che { checkpointManager = new DeviceLogCommitCheckpointManager( new LocalStorageNamedDeviceFactory(), - new DefaultCheckpointNamingScheme($"{TestUtils.MethodTestDir}/{TestUtils.AzureTestDirectory}")); + new DefaultCheckpointNamingScheme($"{TestUtils.MethodTestDir}/chkpt")); await SimpleRecoveryTest1_Worker(checkpointType, isAsync, testCommitCookie); checkpointManager.PurgeAll(); } @@ -140,9 +140,16 @@ private async ValueTask SimpleRecoveryTest1_Worker(CheckpointType checkpointType var status = session2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0); if (status == Status.PENDING) - session2.CompletePending(true); + { + session2.CompletePendingWithOutputs(out var outputs, wait: true); + Assert.IsTrue(outputs.Next()); + output = outputs.Current.Output; + Assert.IsFalse(outputs.Next()); + outputs.Current.Dispose(); + } else - Assert.IsTrue(output.value.numClicks == key); + Assert.AreEqual(Status.OK, status); + Assert.AreEqual(key, output.value.numClicks); } session2.Dispose(); } @@ -193,7 +200,7 @@ public async ValueTask SimpleRecoveryTest2([Values]CheckpointType checkpointType session2.CompletePending(true); else { - Assert.IsTrue(output.value.numClicks == key); + Assert.AreEqual(key, output.value.numClicks); } } session2.Dispose(); @@ -248,8 +255,8 @@ public class AdSimpleFunctions : FunctionsBase : SimpleValueIndexBase, ISecondaryValueIndex + { + internal SimpleValueIndex(string name, Func indexKeyFunc, bool isMutableIndex) : base(name, indexKeyFunc, isMutableIndex: isMutableIndex) { } + + public void Delete(ref TKey key, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + => BaseDelete(recordId, indexSessionBroker); + + public void Insert(ref TKey key, ref TValue value, RecordId recordId, SecondaryIndexSessionBroker indexSessionBroker) + => BaseInsert(ref value, recordId, indexSessionBroker); + + public void Upsert(ref TKey key, ref TValue value, RecordId recordId, bool isMutableRecord, SecondaryIndexSessionBroker indexSessionBroker) + => BaseUpsert(ref value, recordId, isMutableRecord, indexSessionBroker); + + public void OnPrimaryTruncate(long newBeginAddress) { } + + public void ScanReadOnlyPages(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) + { + while (iter.GetNext(out var recordInfo)) + Upsert(ref iter.GetKey(), ref iter.GetValue(), new RecordId(recordInfo.Version, iter.CurrentAddress), isMutableRecord: false, indexSessionBroker); + } + + public void RecoveryReplay(IFasterScanIterator iter, SecondaryIndexSessionBroker indexSessionBroker) { } + } + + private const int valueDivisor = 50; + readonly PrimaryFasterKV store = new PrimaryFasterKV(); + + [SetUp] + public void Setup() => store.Setup(); + + [TearDown] + public void TearDown() => store.TearDown(); + + private ISecondaryIndex CreateIndex(bool isMutable, bool isAsync, Func indexKeyFunc) + => new SimpleValueIndex($"{TestContext.CurrentContext.Test.Name}_mutable_{(isAsync ? "async" : "sync")}", indexKeyFunc, isMutable); + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void MutableInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var secondaryIndex = CreateIndex(isMutable: true, isAsync, rawValue => (rawValue - SimpleIndexUtils.ValueStart) / valueDivisor); + store.fkv.SecondaryIndexBroker.AddIndex(secondaryIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleValueIndexBase.VerifyMutableIntIndex(secondaryIndex, valueDivisor, SimpleIndexUtils.ValueStart); + } + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void ImmutableInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var secondaryIndex = CreateIndex(isMutable: false, isAsync, rawValue => (rawValue - SimpleIndexUtils.ValueStart) / valueDivisor); + store.fkv.SecondaryIndexBroker.AddIndex(secondaryIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleValueIndexBase.VerifyImmutableIntIndex(secondaryIndex, valueDivisor, SimpleIndexUtils.ValueStart, store); + } + + [Test] + [Category(TestUtils.SecondaryIndexCategory)] + public void MixedInsertTest([Values] bool useAdvancedFunctions, [Values] bool useRMW, [Values] bool isAsync) + { + var mutableIndex = CreateIndex(isMutable: true, isAsync, rawValue => (rawValue - SimpleIndexUtils.ValueStart) / valueDivisor); + var immutableIndex = CreateIndex(isMutable: false, isAsync, rawValue => (rawValue - SimpleIndexUtils.ValueStart) / valueDivisor); + store.fkv.SecondaryIndexBroker.AddIndex(mutableIndex); + store.fkv.SecondaryIndexBroker.AddIndex(immutableIndex); + store.Populate(useAdvancedFunctions, useRMW, isAsync); + SimpleValueIndexBase.VerifyMixedIntIndexes(mutableIndex, immutableIndex, valueDivisor, SimpleIndexUtils.ValueStart, store); + } + } +} diff --git a/cs/test/TestUtils.cs b/cs/test/TestUtils.cs index 1e4200c84..4096c1186 100644 --- a/cs/test/TestUtils.cs +++ b/cs/test/TestUtils.cs @@ -14,6 +14,9 @@ namespace FASTER.test { internal static class TestUtils { + internal const string SecondaryIndexCategory = "SecondaryIndex"; + internal const string HashValueIndexCategory = "HashValueIndex"; + /// /// Delete a directory recursively ///