diff --git a/nuget/CommonDomain.nuspec b/nuget/CommonDomain.nuspec index 7d3da07..34c6b9b 100644 --- a/nuget/CommonDomain.nuspec +++ b/nuget/CommonDomain.nuspec @@ -12,7 +12,7 @@ A domain project for quickly implementing CQRS functionality in domain models. A domain project for quickly implementing CQRS functionality in domain models. - + diff --git a/src/proj/CommonDomain.Core/AggregateBase.cs b/src/proj/CommonDomain.Core/AggregateBase.cs index ceace40..cf07553 100644 --- a/src/proj/CommonDomain.Core/AggregateBase.cs +++ b/src/proj/CommonDomain.Core/AggregateBase.cs @@ -53,8 +53,11 @@ protected void RaiseEvent(object @event) } void IAggregate.ApplyEvent(object @event) { - this.RegisteredRoutes.Dispatch(@event); - this.Version++; + foreach (var item in ( ( @event is IEnumerable ) ? @event as IEnumerable : new []{ @event } )) + { + this.RegisteredRoutes.Dispatch(item); + this.Version++; + } } ICollection IAggregate.GetUncommittedEvents() { diff --git a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj index 7585c39..db53fea 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj +++ b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj @@ -71,8 +71,9 @@ - - ..\..\packages\EventStore.3.0.11305.44\lib\net40\EventStore.dll + + False + ..\..\..\_lib\EventStore.dll diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs index 89e101c..ace6201 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs @@ -6,11 +6,9 @@ namespace CommonDomain.Persistence.EventStore using global::EventStore; using global::EventStore.Persistence; - public class EventStoreRepository : IRepository, IDisposable + public class EventStoreRepository : IRepository { private const string AggregateTypeHeader = "AggregateType"; - private readonly IDictionary snapshots = new Dictionary(); - private readonly IDictionary streams = new Dictionary(); private readonly IStoreEvents eventStore; private readonly IConstructAggregates factory; private readonly IDetectConflicts conflictDetector; @@ -25,40 +23,22 @@ public EventStoreRepository( this.conflictDetector = conflictDetector; } - public void Dispose() - { - this.Dispose(true); - GC.SuppressFinalize(this); - } - protected virtual void Dispose(bool disposing) - { - if (!disposing) - return; + public virtual TAggregate GetById(Guid id) where TAggregate : class, IAggregate + { + return GetById(id, int.MaxValue); + } - lock (this.streams) - { - foreach (var stream in this.streams) - stream.Value.Dispose(); - - this.snapshots.Clear(); - this.streams.Clear(); - } - } - - public virtual TAggregate GetById(Guid id) where TAggregate : class, IAggregate - { - return GetById(id, int.MaxValue); - } - public virtual TAggregate GetById(Guid id, int versionToLoad) where TAggregate : class, IAggregate { - var snapshot = this.GetSnapshot(id, versionToLoad); - var stream = this.OpenStream(id, versionToLoad, snapshot); - var aggregate = this.GetAggregate(snapshot, stream); + var snapshot = GetSnapshot(id, versionToLoad); + using (var stream = OpenStream(id, versionToLoad, snapshot)) + { + var aggregate = GetAggregate(snapshot, stream); - ApplyEventsToAggregate(versionToLoad, stream, aggregate); + ApplyEventsToAggregate(versionToLoad, stream, aggregate); - return aggregate as TAggregate; + return aggregate as TAggregate; + } } private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate) { @@ -69,75 +49,65 @@ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream strea private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) { var memento = snapshot == null ? null : snapshot.Payload as IMemento; - return this.factory.Build(typeof(TAggregate), stream.StreamId, memento); + return factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders); } private Snapshot GetSnapshot(Guid id, int version) { - Snapshot snapshot; - if (!this.snapshots.TryGetValue(id, out snapshot)) - this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version); - - return snapshot; + return eventStore.Advanced.GetSnapshot(id, version); } private IEventStream OpenStream(Guid id, int version, Snapshot snapshot) { - IEventStream stream; - if (this.streams.TryGetValue(id, out stream)) - return stream; + return snapshot == null + ? eventStore.OpenStream(id, 0, version) + : eventStore.OpenStream(snapshot, version); + } - stream = snapshot == null - ? this.eventStore.OpenStream(id, 0, version) - : this.eventStore.OpenStream(snapshot, version); + public virtual void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders) + { + var headers = PrepareHeaders(aggregate, updateHeaders); + while (true) + { + using (var stream = PrepareStream(aggregate, headers)) + { + var commitEventCount = stream.CommittedEvents.Count; - return this.streams[id] = stream; - } + try + { + stream.CommitChanges(commitId); + aggregate.ClearUncommittedEvents(); + return; + } + catch (DuplicateCommitException) + { + stream.ClearChanges(); + return; + } + catch (ConcurrencyException e) + { + if (ThrowOnConflict(stream, commitEventCount)) + throw new ConflictingCommandException(e.Message, e); - public virtual void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders) - { - var headers = PrepareHeaders(aggregate, updateHeaders); - while (true) - { - var stream = this.PrepareStream(aggregate, headers); - var commitEventCount = stream.CommittedEvents.Count; - - try - { - stream.CommitChanges(commitId); - aggregate.ClearUncommittedEvents(); - return; - } - catch (DuplicateCommitException) - { - stream.ClearChanges(); - return; - } - catch (ConcurrencyException e) - { - if (this.ThrowOnConflict(stream, commitEventCount)) - throw new ConflictingCommandException(e.Message, e); - - stream.ClearChanges(); - } - catch (StorageException e) - { - throw new PersistenceException(e.Message, e); - } - } + stream.ClearChanges(); + } + catch (StorageException e) + { + throw new PersistenceException(e.Message, e); + } + } + } } private IEventStream PrepareStream(IAggregate aggregate, Dictionary headers) { - IEventStream stream; - if (!this.streams.TryGetValue(aggregate.Id, out stream)) - this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id); + IEventStream stream = eventStore.OpenStream(aggregate.Id, 0, int.MaxValue); foreach (var item in headers) - stream.UncommittedHeaders[item.Key] = item.Value; - - aggregate.GetUncommittedEvents() - .Cast() - .Select(x => new EventMessage { Body = x }) - .ToList() - .ForEach(stream.Add); + stream.UncommittedHeaders[item.Key] = item.Value; + + aggregate.GetUncommittedEvents() + .Cast() + .Select(x => new EventMessage { Body = x }) + .ToList() + .ForEach(stream.Add); return stream; } @@ -155,7 +125,7 @@ private bool ThrowOnConflict(IEventStream stream, int skip) { var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body); var uncommitted = stream.UncommittedEvents.Select(x => x.Body); - return this.conflictDetector.ConflictsWith(uncommitted, committed); + return conflictDetector.ConflictsWith(uncommitted, committed); } } } \ No newline at end of file diff --git a/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs index 438d436..bb5ef21 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs @@ -12,13 +12,15 @@ public class SagaEventStoreRepository : ISagaRepository, IDisposable private const string UndispatchedMessageHeader = "UndispatchedMessage."; private readonly IDictionary streams = new Dictionary(); private readonly IStoreEvents eventStore; + private readonly IConstructSagas sagaFactory; - public SagaEventStoreRepository(IStoreEvents eventStore) + public SagaEventStoreRepository(IStoreEvents eventStore, IConstructSagas sagaFactory) { - this.eventStore = eventStore; + this.eventStore = eventStore; + this.sagaFactory = sagaFactory; } - public void Dispose() + public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); @@ -37,9 +39,9 @@ protected virtual void Dispose(bool disposing) } } - public TSaga GetById(Guid sagaId) where TSaga : class, ISaga, new() + public TSaga GetById(Guid sagaId) where TSaga : class, ISaga { - return BuildSaga(this.OpenStream(sagaId)); + return BuildSaga(sagaId, this.OpenStream(sagaId)); } private IEventStream OpenStream(Guid sagaId) { @@ -59,9 +61,9 @@ private IEventStream OpenStream(Guid sagaId) return this.streams[sagaId] = stream; } - private static TSaga BuildSaga(IEventStream stream) where TSaga : class, ISaga, new() + private TSaga BuildSaga(Guid sagaId, IEventStream stream) where TSaga : class, ISaga { - var saga = new TSaga(); + var saga = sagaFactory.Build(sagaId); foreach (var @event in stream.CommittedEvents.Select(x => x.Body)) saga.Transition(@event); @@ -128,7 +130,7 @@ private static void Persist(IEventStream stream, Guid commitId) catch (StorageException e) { throw new PersistenceException(e.Message, e); - } + } } } } \ No newline at end of file diff --git a/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj b/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj index 4af358c..e90f23b 100644 --- a/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj +++ b/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj @@ -46,6 +46,7 @@ + diff --git a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs index 965ce57..3f1925d 100644 --- a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs +++ b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs @@ -1,9 +1,11 @@ +using System.Collections.Generic; + namespace CommonDomain.Persistence { using System; public interface IConstructAggregates { - IAggregate Build(Type type, Guid id, IMemento snapshot); + IAggregate Build(Type type, Guid id, IMemento snapshot, IDictionary headers); } } \ No newline at end of file diff --git a/src/proj/CommonDomain.Persistence/IConstructSagas.cs b/src/proj/CommonDomain.Persistence/IConstructSagas.cs new file mode 100644 index 0000000..03a3e69 --- /dev/null +++ b/src/proj/CommonDomain.Persistence/IConstructSagas.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace CommonDomain.Persistence +{ + public interface IConstructSagas + { + TSaga Build(Guid id) where TSaga : ISaga; + } +} diff --git a/src/proj/CommonDomain.Persistence/ISagaRepository.cs b/src/proj/CommonDomain.Persistence/ISagaRepository.cs index ed02c49..716654f 100644 --- a/src/proj/CommonDomain.Persistence/ISagaRepository.cs +++ b/src/proj/CommonDomain.Persistence/ISagaRepository.cs @@ -5,7 +5,7 @@ namespace CommonDomain.Persistence public interface ISagaRepository { - TSaga GetById(Guid sagaId) where TSaga : class, ISaga, new(); + TSaga GetById(Guid sagaId) where TSaga : class, ISaga; void Save(ISaga saga, Guid commitId, Action> updateHeaders); } } \ No newline at end of file