From 90eff700b895ca660394bb7d4738e954ece567c2 Mon Sep 17 00:00:00 2001 From: Bill Biwer Date: Mon, 18 May 2015 14:01:53 -0500 Subject: [PATCH 1/6] Added non-generic IAggregateRepository and implementations --- .../EventStoreAggregateRepository.cs | 241 ++++++++++++- .../InMemoryAggregateRepository.cs | 34 +- .../InMemoryJsonAggregateRepository.cs | 39 ++- .../MementoBasedAggregateRepository.cs | 63 ++++ .../SqlServerAggregateRepository.cs | 316 ++++++++++++++++-- .../LogicalTypeRegistryBuilder.cs | 1 + .../Domain/AggregateRepositoryTests.cs | 190 +++++++++++ .../EventStoreAggregateRepositoryTests.cs | 20 +- .../InMemoryAggregateRepositoryTests.cs | 15 +- .../InMemoryJsonAggregateRepositoryTests.cs | 35 +- .../SqlServerAggregateRepositoryTests.cs | 28 +- .../Modeling/Domain/Aggregate.cs | 35 +- .../Domain/AggregateEventsLoadFrame.cs | 11 + .../Domain/AggregateEventsSaveFrame.cs | 15 + .../Modeling/Domain/AggregateLoadFrame.cs | 12 + .../Modeling/Domain/AggregateMemento.cs | 24 ++ .../Modeling/Domain/AggregateRepository.cs | 144 +++++++- .../Modeling/Domain/AggregateSaveFrame.cs | 14 + .../Modeling/Domain/AggregateSnapshot.cs | 3 +- .../Domain/AggregateSnapshotLoadFrame.cs | 11 + .../Domain/AggregateSnapshotSaveFrame.cs | 11 + .../Modeling/Domain/IAggregate.cs | 11 + .../Modeling/Domain/IAggregateRepository.cs | 7 + .../Modeling/Domain/IAggregateSnapshot.cs | 11 +- .../Modeling/Domain/IScalarValue.cs | 2 + 25 files changed, 1216 insertions(+), 77 deletions(-) diff --git a/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs b/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs index 918b370..571ea82 100644 --- a/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/EventStoreAggregateRepository.cs @@ -21,23 +21,254 @@ // THE SOFTWARE. using System; -using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; - using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; -using Newtonsoft.Json; using iSynaptic.Commons; -using iSynaptic.Commons.Collections.Generic; using iSynaptic.Commons.Linq; -using iSynaptic.Modeling; using iSynaptic.Modeling.Domain; using iSynaptic.Serialization; +using Newtonsoft.Json; namespace iSynaptic.Core.Persistence { + public class EventStoreAggregateRepository : AggregateRepository + { + private static readonly Guid _offsetEventId = new Guid(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1); + private static readonly EventData _offsetEvent = new EventData(_offsetEventId, "streamOffset", true, Encoding.Default.GetBytes("{}"), null); + + private readonly ILogicalTypeRegistry _logicalTypeRegistry; + + private readonly JsonSerializer _dataSerializer; + private readonly JsonSerializer _metadataSerializer; + + private readonly Func _connectionFactory; + + public EventStoreAggregateRepository(ILogicalTypeRegistry logicalTypeRegistry, Func connectionFactory) + { + _logicalTypeRegistry = Guard.NotNull(logicalTypeRegistry, "logicalTypeRegistry"); + _connectionFactory = Guard.NotNull(connectionFactory, "connectionFactory"); + + _dataSerializer = JsonSerializerBuilder.Build(logicalTypeRegistry); + + var metadataSerializerSettings = JsonSerializerBuilder.BuildSettings(logicalTypeRegistry); + metadataSerializerSettings.TypeNameHandling = TypeNameHandling.None; + + _metadataSerializer = JsonSerializer.Create(metadataSerializerSettings); + } + + protected override async Task GetEvents(object id, int minVersion, int maxVersion) + { + var maxCount = (maxVersion - minVersion) + 1; + + if (maxCount <= 0) + return null; + + using (var cn = _connectionFactory()) + { + await cn.ConnectAsync().ConfigureAwait(false); + + String streamId = BuildStreamIdentifier(id); + + var metadataResult = await cn.GetStreamMetadataAsync(streamId).ConfigureAwait(false); + if (metadataResult.MetastreamVersion == ExpectedVersion.NoStream) + return null; + + string aggregateTypeString; + + if (!metadataResult.StreamMetadata.TryGetValue("aggregateType", out aggregateTypeString)) + { + throw new InvalidOperationException("Aggregate type is not specified in event stream metadata."); + } + + var resolvedEvents = (await cn.ReadStreamEventsForwardAsync(streamId, minVersion, maxCount, false).ConfigureAwait(false)) + .ToMaybe() + .Where(x => x.Status == SliceReadStatus.Success) + .SelectMany(x => x.Events) + .ToArray(); + + if (resolvedEvents.Length > 0) + { + Type aggregateType = _logicalTypeRegistry.LookupActualType(LogicalType.Parse(aggregateTypeString)); + + var events = resolvedEvents + .Select(x => x.Event.Data) + .Select(Encoding.Default.GetString) + .Select(x => _dataSerializer.Deserialize(x)); + + return new AggregateEventsLoadFrame(aggregateType, id, events); + } + + return null; + } + } + + protected async override Task SaveEvents(AggregateEventsSaveFrame frame) + { + var aggregateType = frame.AggregateType; + var id = frame.Id; + var evts = frame.Events.ToArray(); + + if (evts.Length <= 0) + throw new ArgumentException("There are no events to save.", "frame"); + + String streamId = BuildStreamIdentifier(id); + using (var cn = _connectionFactory()) + { + await cn.ConnectAsync().ConfigureAwait(false); + + bool isNewStream = evts[0].Version == 1; + + try + { + var events = evts.Select(e => BuildEventData(e.EventId, e, aggregateType)); + + if (isNewStream) + { + var metadata = BuildStreamMetadata(aggregateType); + await cn.SetStreamMetadataAsync(streamId, ExpectedVersion.NoStream, metadata).ConfigureAwait(false); + + events = new[] { _offsetEvent }.Concat(events); + } + + int expectedVersion = isNewStream + ? ExpectedVersion.EmptyStream + : evts[0].Version - 1; + + await cn.AppendToStreamAsync( + streamId, + expectedVersion, + events).ConfigureAwait(false); + + } + catch (WrongExpectedVersionException ex) + { + throw new AggregateConcurrencyException(ex); + } + } + } + + protected override async Task GetSnapshot(object id, int maxVersion) + { + using (var cn = _connectionFactory()) + { + await cn.ConnectAsync().ConfigureAwait(false); + + String snapshotStreamId = BuildSnapshotStreamIdentifier(id); + + var metadataResult = await cn.GetStreamMetadataAsync(snapshotStreamId).ConfigureAwait(false); + if (metadataResult.MetastreamVersion == ExpectedVersion.NoStream) + return null; + + string aggregateTypeString; + + if (!metadataResult.StreamMetadata.TryGetValue("aggregateType", out aggregateTypeString)) + { + throw new InvalidOperationException("Aggregate type is not specified in event stream metadata."); + } + + var resolvedEvent = (await cn.ReadStreamEventsForwardAsync(snapshotStreamId, 0, int.MaxValue, false).ConfigureAwait(false)) + .ToMaybe() + .Where(x => x.Status == SliceReadStatus.Success) + .SelectMany(x => x.Events) + .TrySingle(); + + var snapshot = resolvedEvent + .Select(x => x.Event.Data) + .Select(Encoding.Default.GetString) + .Select(x => _dataSerializer.Deserialize(x)) + .Where(x => x.Version <= maxVersion); + + if (snapshot.HasValue) + { + Type aggregateType = _logicalTypeRegistry.LookupActualType(LogicalType.Parse(aggregateTypeString)); + + return new AggregateSnapshotLoadFrame(aggregateType, id, snapshot.Value); + } + + return null; + } + } + + protected async override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) + { + using (var cn = _connectionFactory()) + { + await cn.ConnectAsync().ConfigureAwait(false); + + var aggregateType = frame.AggregateType; + var id = frame.Id; + var snapshot = frame.Snapshot; + + String streamId = BuildSnapshotStreamIdentifier(id); + + var metadata = BuildSnapshotStreamMetadata(aggregateType); + + var stream = await cn.ReadStreamEventsForwardAsync(streamId, 0, int.MaxValue, false).ConfigureAwait(false); + if (stream.Status == SliceReadStatus.StreamNotFound) + { + await cn.SetStreamMetadataAsync(streamId, ExpectedVersion.NoStream, metadata).ConfigureAwait(false); + + await cn.AppendToStreamAsync( + streamId, + ExpectedVersion.EmptyStream, + BuildEventData( + snapshot.SnapshotId, + snapshot, + aggregateType) + ).ConfigureAwait(false); + } + else + { + await cn.AppendToStreamAsync( + streamId, + stream.LastEventNumber, + BuildEventData( + snapshot.SnapshotId, + snapshot, + aggregateType) + ).ConfigureAwait(false); + } + } + } + + private StreamMetadata BuildSnapshotStreamMetadata(Type aggregateType) + { + return StreamMetadata.Build() + .SetMaxCount(1) + .SetCustomProperty("aggregateType", _logicalTypeRegistry.LookupLogicalType(aggregateType).ToString()) + .Build(); + } + + private StreamMetadata BuildStreamMetadata(Type aggregateType) + { + return StreamMetadata.Build() + .SetCustomProperty("aggregateType", _logicalTypeRegistry.LookupLogicalType(aggregateType).ToString()) + .Build(); + } + + protected EventData BuildEventData(Guid id, object data, Type aggregateType) + { + return new EventData(id, + _logicalTypeRegistry.LookupLogicalType(data.GetType()).ToString(), + true, + Encoding.Default.GetBytes(_dataSerializer.Serialize(data)), + null); + } + + protected virtual String BuildStreamIdentifier(object id) + { + return _dataSerializer.Serialize(id); + } + + protected virtual String BuildSnapshotStreamIdentifier(object id) + { + return String.Format("{0}-snapshot", BuildStreamIdentifier(id)); + } + } + public class EventStoreAggregateRepository : AggregateRepository where TAggregate : class, IAggregate where TIdentifier : IEquatable diff --git a/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs b/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs index 4d1b07b..e6e89ed 100644 --- a/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/InMemoryAggregateRepository.cs @@ -27,10 +27,42 @@ using iSynaptic.Commons; using iSynaptic.Commons.Collections.Generic; using iSynaptic.Modeling.Domain; -using iSynaptic.Serialization; namespace iSynaptic.Core.Persistence { + public class InMemoryAggregateRepository : MementoBasedAggregateRepository + { + private readonly Dictionary _state = + new Dictionary(); + + protected override Task> TryLoadMemento(object id) + { + lock (_state) + { + return Task.FromResult(_state.TryGetValue(id)); + } + } + + protected override async Task StoreMemento(Func>> mementoFactory) + { + bool lockTaken = false; + + try + { + Monitor.Enter(_state, ref lockTaken); + + var memento = await mementoFactory(); + _state[memento.Key] = memento.Value; + + } + finally + { + if (lockTaken) + Monitor.Exit(_state); + } + } + } + public class InMemoryAggregateRepository : MementoBasedAggregateRepository where TAggregate : class, IAggregate where TIdentifier : IEquatable diff --git a/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs b/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs index 3dacd07..d312f14 100644 --- a/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/InMemoryJsonAggregateRepository.cs @@ -27,12 +27,47 @@ using Newtonsoft.Json; using iSynaptic.Commons; using iSynaptic.Commons.Collections.Generic; -using iSynaptic.Modeling; using iSynaptic.Modeling.Domain; using iSynaptic.Serialization; namespace iSynaptic.Core.Persistence { + public class InMemoryJsonAggregateRepository : MementoBasedAggregateRepository + { + private readonly Dictionary _state = + new Dictionary(); + + private readonly JsonSerializer _serializer; + + public InMemoryJsonAggregateRepository(JsonSerializer serializer) + { + _serializer = Guard.NotNull(serializer, "serializer"); + } + + protected override Task> TryLoadMemento(object id) + { + return Task.FromResult(_state.TryGetValue(_serializer.Serialize(id)) + .Select(json => _serializer.Deserialize(json))); + } + + protected override async Task StoreMemento(Func>> mementoFactory) + { + bool lockTaken = false; + try + { + Monitor.Enter(_state, ref lockTaken); + + var memento = await mementoFactory(); + _state[_serializer.Serialize(memento.Key)] = _serializer.Serialize(memento.Value); + } + finally + { + if (lockTaken) + Monitor.Exit(_state); + } + } + } + public class InMemoryJsonAggregateRepository : MementoBasedAggregateRepository where TAggregate : class, IAggregate where TIdentifier : IEquatable @@ -70,4 +105,4 @@ protected override async Task StoreMemento(Func> TryLoadMemento(object id); + protected abstract Task StoreMemento(Func>> mementoFactory); + + protected override async Task GetSnapshot(object id, int maxVersion) + { + return (await TryLoadMemento(id)) + .Where(x => x.Snapshot.Select(y => y.Version <= maxVersion).ValueOrDefault()) + .Select(x => new AggregateSnapshotLoadFrame(x.AggregateType, id, x.Snapshot.Value)) + .ValueOrDefault(); + } + + protected override async Task GetEvents(object id, int minVersion, int maxVersion) + { + return (await TryLoadMemento(id)) + .Select(x => new AggregateEventsLoadFrame( + x.AggregateType, + id, + x.Events + .SkipWhile(y => y.Version < minVersion) + .TakeWhile(y => y.Version <= maxVersion))) + .ValueOrDefault(); + } + + protected override Task SaveSnapshot(AggregateSnapshotSaveFrame frame) + { + return StoreMemento(async () => + { + var aggregateType = frame.AggregateType; + var snapshot = frame.Snapshot; + + var state = (await TryLoadMemento(snapshot.Id)).ValueOrDefault(); + + return KeyValuePair.Create(snapshot.Id, new AggregateMemento(aggregateType, snapshot.ToMaybe(), state != null ? state.Events : null)); + }); + } + + protected override Task SaveEvents(AggregateEventsSaveFrame frame) + { + var aggregateType = frame.AggregateType; + var id = frame.Id; + var events = frame.Events.ToArray(); + + return StoreMemento(async () => + KeyValuePair.Create(id, (await TryLoadMemento(id)) + .Select(x => + { + var lastEvent = x.Events.TryLast(); + var actualVersion = lastEvent.Select(y => y.Version).ValueOrDefault(); + + if (actualVersion != frame.ExpectedVersion) + throw new AggregateConcurrencyException(); + + return new AggregateMemento( + aggregateType, + x.Snapshot, + x.Events.Concat(events.SkipWhile(y => y.Version <= lastEvent.Select(z => z.Version).ValueOrDefault()))); + }) + .ValueOrDefault(() => new AggregateMemento(aggregateType, Maybe.NoValue, events)))); + } + } + public abstract class MementoBasedAggregateRepository : AggregateRepository where TAggregate : class, IAggregate where TIdentifier : IEquatable diff --git a/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs b/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs index 2356559..8d1380f 100644 --- a/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs +++ b/src/iSynaptic.Core.Persistence/SqlServerAggregateRepository.cs @@ -36,7 +36,7 @@ namespace iSynaptic.Core.Persistence { - public static class SqlServerAggregateRepository + public class SqlServerAggregateRepository : AggregateRepository { private static readonly Regex _scriptRegex = new Regex(@"(?