Skip to content

Commit 10178f4

Browse files
committed
Refactor message class to command envelope
1 parent d3961b9 commit 10178f4

File tree

15 files changed

+90
-88
lines changed

15 files changed

+90
-88
lines changed

src/InEngine.Core/AbstractCommand.cs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ public void UpdateProgress(int tick)
5151
#region Scheduling
5252
public virtual void Execute(IJobExecutionContext context)
5353
{
54-
var properties = GetType().GetProperties();
55-
context.MergedJobDataMap.ToList().ForEach(x => {
56-
var property = properties.FirstOrDefault(y => y.Name == x.Key);
57-
if (property != null)
58-
property.SetValue(this, x.Value);
59-
});
54+
if (context != null) {
55+
var properties = GetType().GetProperties();
56+
context.MergedJobDataMap.ToList().ForEach(x => {
57+
var property = properties.FirstOrDefault(y => y.Name == x.Key);
58+
if (property != null)
59+
property.SetValue(this, x.Value);
60+
});
61+
}
6062

6163
try
6264
{

src/InEngine.Core/Queuing/Clients/Database/MessageModel.cs renamed to src/InEngine.Core/Queuing/Clients/Database/CommandEnvelopeModel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace InEngine.Core.Queuing.Clients.Database
22
{
3-
public class MessageModel : Message
3+
public class CommandEnvelopeModel : CommandEnvelope
44
{
55
public string Status { get; set; }
66
public string QueueName { get; set; }

src/InEngine.Core/Queuing/Clients/Database/MessageStatus.cs renamed to src/InEngine.Core/Queuing/Clients/Database/CommandEnvelopeStatus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace InEngine.Core.Queuing.Clients.Database
22
{
3-
public static class MessageStatus
3+
public static class CommandEnvelopeStatus
44
{
55
public static string Pending { get => "Pending"; }
66
public static string InProgress { get => "InProgress"; }

src/InEngine.Core/Queuing/Clients/Database/QueueDbContext.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace InEngine.Core.Queuing.Clients.Database
55
{
66
public class QueueDbContext : DbContext
77
{
8-
public DbSet<MessageModel> Messages { get; set; }
8+
public DbSet<CommandEnvelopeModel> Messages { get; set; }
99
public string MessageTableName { get; set; }
1010

1111
public QueueDbContext(string messageTableName)
@@ -21,7 +21,7 @@ public QueueDbContext(DbConnection existingConnection, bool contextOwnsConnectio
2121

2222
protected override void OnModelCreating(DbModelBuilder modelBuilder)
2323
{
24-
modelBuilder.Entity<MessageModel>().ToTable(MessageTableName);
24+
modelBuilder.Entity<CommandEnvelopeModel>().ToTable(MessageTableName);
2525
}
2626
}
2727
}

src/InEngine.Core/Queuing/Clients/DatabaseClient.cs

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public void Publish(ICommand command)
2121

2222
using (var context = new QueueDbContext(QueueBaseName))
2323
{
24-
context.Messages.Add(new MessageModel() {
25-
Status = MessageStatus.Pending,
24+
context.Messages.Add(new CommandEnvelopeModel() {
25+
Status = CommandEnvelopeStatus.Pending,
2626
IsCompressed = UseCompression,
2727
CommandClassName = command.GetType().FullName,
2828
CommandAssemblyName = command.GetType().Assembly.GetName().Name + ".dll",
@@ -35,7 +35,7 @@ public void Publish(ICommand command)
3535
public bool Consume()
3636
{
3737

38-
IMessage message;
38+
ICommandEnvelope commandEnvelope;
3939
using (var conn = new SqlConnection())
4040
{
4141
conn.Open();
@@ -48,10 +48,10 @@ public bool Consume()
4848
context.Database.UseTransaction(transaction);
4949
var messageModel = context.Messages
5050
.OrderBy(x => x.QueuedAt)
51-
.FirstOrDefault(x => x.Status == MessageStatus.Pending&& x.QueueName == QueueName);
52-
messageModel.Status = MessageStatus.InProgress;
51+
.FirstOrDefault(x => x.Status == CommandEnvelopeStatus.Pending&& x.QueueName == QueueName);
52+
messageModel.Status = CommandEnvelopeStatus.InProgress;
5353
context.SaveChanges();
54-
message = messageModel;
54+
commandEnvelope = messageModel;
5555
}
5656

5757
transaction.Commit();
@@ -66,7 +66,7 @@ public bool Consume()
6666

6767
try
6868
{
69-
Queue.ExtractCommandInstanceFromMessage(message).Run();
69+
Queue.ExtractCommandInstanceFromMessageAndRun(commandEnvelope);
7070
}
7171
catch (Exception exception)
7272
{
@@ -80,18 +80,18 @@ public bool Consume()
8080
using (var context = new QueueDbContext(conn, false, QueueBaseName))
8181
{
8282
context.Database.UseTransaction(transaction);
83-
var messageModel = context.Messages.FirstOrDefault(x => x.Id == message.Id);
84-
messageModel.Status = MessageStatus.Failed;
83+
var messageModel = context.Messages.FirstOrDefault(x => x.Id == commandEnvelope.Id);
84+
messageModel.Status = CommandEnvelopeStatus.Failed;
8585
context.SaveChanges();
86-
message = messageModel;
86+
commandEnvelope = messageModel;
8787
}
8888

8989
transaction.Commit();
9090
}
9191
catch (Exception dbException)
9292
{
9393
transaction.Rollback();
94-
throw new CommandFailedException("Failed to set queue message from in-progress to failed.", dbException);
94+
throw new CommandFailedException("Failed to set queue commandEnvelope from in-progress to failed.", dbException);
9595
}
9696
}
9797
}
@@ -110,42 +110,42 @@ public bool Consume()
110110
using (var context = new QueueDbContext(conn, false, QueueBaseName))
111111
{
112112
context.Database.UseTransaction(transaction);
113-
var messageModel = context.Messages.FirstOrDefault(x => x.Id == message.Id);
114-
messageModel.Status = MessageStatus.Completed;
113+
var messageModel = context.Messages.FirstOrDefault(x => x.Id == commandEnvelope.Id);
114+
messageModel.Status = CommandEnvelopeStatus.Completed;
115115
context.SaveChanges();
116-
message = messageModel;
116+
commandEnvelope = messageModel;
117117
}
118118
transaction.Commit();
119119
}
120120
catch (Exception dbException)
121121
{
122122
transaction.Rollback();
123-
throw new CommandFailedException("Failed to set queue message from in-progress to completed.", dbException);
123+
throw new CommandFailedException("Failed to set queue commandEnvelope from in-progress to completed.", dbException);
124124
}
125125
}
126126
}
127127
}
128128
catch (Exception exception)
129129
{
130-
throw new CommandFailedException($"Failed to remove completed message from queue", exception);
130+
throw new CommandFailedException($"Failed to remove completed commandEnvelope from queue", exception);
131131
}
132132

133133
return true;
134134
}
135135

136136
public long GetPendingQueueLength()
137137
{
138-
return GetQueueLength(MessageStatus.Pending);
138+
return GetQueueLength(CommandEnvelopeStatus.Pending);
139139
}
140140

141141
public long GetInProgressQueueLength()
142142
{
143-
return GetQueueLength(MessageStatus.InProgress);
143+
return GetQueueLength(CommandEnvelopeStatus.InProgress);
144144
}
145145

146146
public long GetFailedQueueLength()
147147
{
148-
return GetQueueLength(MessageStatus.Failed);
148+
return GetQueueLength(CommandEnvelopeStatus.Failed);
149149
}
150150

151151
public long GetQueueLength(string status)
@@ -158,19 +158,19 @@ public long GetQueueLength(string status)
158158

159159
public bool ClearPendingQueue()
160160
{
161-
ClearQueue(MessageStatus.Pending);
161+
ClearQueue(CommandEnvelopeStatus.Pending);
162162
return true;
163163
}
164164

165165
public bool ClearInProgressQueue()
166166
{
167-
ClearQueue(MessageStatus.InProgress);
167+
ClearQueue(CommandEnvelopeStatus.InProgress);
168168
return true;
169169
}
170170

171171
public bool ClearFailedQueue()
172172
{
173-
ClearQueue(MessageStatus.Failed);
173+
ClearQueue(CommandEnvelopeStatus.Failed);
174174
return true;
175175
}
176176

@@ -198,10 +198,10 @@ public void RepublishFailedMessages()
198198
{
199199
context.Database.UseTransaction(transaction);
200200
var messageModels = context.Messages
201-
.Where(x => x.QueueName == QueueName && x.Status == MessageStatus.Pending)
201+
.Where(x => x.QueueName == QueueName && x.Status == CommandEnvelopeStatus.Pending)
202202
.OrderBy(x => x.QueuedAt);
203203
foreach(var messageModel in messageModels)
204-
messageModel.Status = MessageStatus.InProgress;
204+
messageModel.Status = CommandEnvelopeStatus.InProgress;
205205
context.SaveChanges();
206206
}
207207

@@ -216,30 +216,30 @@ public void RepublishFailedMessages()
216216
}
217217
}
218218

219-
public List<IMessage> PeekPendingMessages(long from, long to)
219+
public List<ICommandEnvelope> PeekPendingMessages(long from, long to)
220220
{
221-
return GetMessages(MessageStatus.Pending, from, to);
221+
return GetMessages(CommandEnvelopeStatus.Pending, from, to);
222222
}
223223

224-
public List<IMessage> PeekInProgressMessages(long from, long to)
224+
public List<ICommandEnvelope> PeekInProgressMessages(long from, long to)
225225
{
226-
return GetMessages(MessageStatus.InProgress, from, to);
226+
return GetMessages(CommandEnvelopeStatus.InProgress, from, to);
227227
}
228228

229-
public List<IMessage> PeekFailedMessages(long from, long to)
229+
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
230230
{
231-
return GetMessages(MessageStatus.Failed, from, to);
231+
return GetMessages(CommandEnvelopeStatus.Failed, from, to);
232232
}
233233

234-
public List<IMessage> GetMessages(string status, long from, long to)
234+
public List<ICommandEnvelope> GetMessages(string status, long from, long to)
235235
{
236236
using (var context = new QueueDbContext(QueueBaseName))
237237
{
238238
return context.Messages
239239
.Where(x => x.QueueName == QueueName && x.Status == status)
240240
.Skip(Convert.ToInt32(from))
241241
.Take(Convert.ToInt32(to - from))
242-
.Select(x => x as IMessage)
242+
.Select(x => x as ICommandEnvelope)
243243
.ToList();
244244
}
245245
}

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void Publish(ICommand command)
4646
{
4747
if (!Directory.Exists(PendingQueuePath))
4848
Directory.CreateDirectory(PendingQueuePath);
49-
var serializedMessage = new Message()
49+
var serializedMessage = new CommandEnvelope()
5050
{
5151
IsCompressed = UseCompression,
5252
CommandClassName = command.GetType().FullName,
@@ -79,15 +79,15 @@ public bool Consume()
7979
return false;
8080
}
8181

82-
var message = File.ReadAllText(inProgressFilePath).DeserializeFromJson<Message>();
82+
var commandEnvelope = File.ReadAllText(inProgressFilePath).DeserializeFromJson<CommandEnvelope>();
8383
try
8484
{
85-
Queue.ExtractCommandInstanceFromMessageAndRun(message as IMessage);
85+
Queue.ExtractCommandInstanceFromMessageAndRun(commandEnvelope as ICommandEnvelope);
8686
}
8787
catch (Exception exception)
8888
{
8989
File.Move(inProgressFilePath, Path.Combine(FailedQueuePath, fileInfo.Name));
90-
throw new CommandFailedException("Failed to consume message.", exception);
90+
throw new CommandFailedException("Failed to consume commandEnvelope.", exception);
9191
}
9292

9393
try
@@ -96,7 +96,7 @@ public bool Consume()
9696
}
9797
catch (Exception exception)
9898
{
99-
throw new CommandFailedException("Failed to move message from in-progress queue.", exception);
99+
throw new CommandFailedException("Failed to move commandEnvelope from in-progress queue.", exception);
100100
}
101101
return true;
102102
}
@@ -148,22 +148,22 @@ public bool ClearQueue(string queuePath)
148148
return true;
149149
}
150150

151-
public List<IMessage> PeekFailedMessages(long from, long to)
151+
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
152152
{
153153
return PeekMessages(FailedQueuePath, from, to);
154154
}
155155

156-
public List<IMessage> PeekInProgressMessages(long from, long to)
156+
public List<ICommandEnvelope> PeekInProgressMessages(long from, long to)
157157
{
158158
return PeekMessages(InProgressQueuePath, from, to);
159159
}
160160

161-
public List<IMessage> PeekPendingMessages(long from, long to)
161+
public List<ICommandEnvelope> PeekPendingMessages(long from, long to)
162162
{
163163
return PeekMessages(PendingQueuePath, from, to);
164164
}
165165

166-
public List<IMessage> PeekMessages(string queuePath, long from, long to)
166+
public List<ICommandEnvelope> PeekMessages(string queuePath, long from, long to)
167167
{
168168
var maxResults = Convert.ToInt32(to + from);
169169
var files = new DirectoryInfo(queuePath)
@@ -172,11 +172,11 @@ public List<IMessage> PeekMessages(string queuePath, long from, long to)
172172
.ToList();
173173

174174
if (files.Count() <= maxResults)
175-
return files.Select(x => File.ReadAllText(x.FullName).DeserializeFromJson<Message>() as IMessage).ToList();
175+
return files.Select(x => File.ReadAllText(x.FullName).DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope).ToList();
176176

177177
return files
178178
.GetRange(Convert.ToInt32(from), Convert.ToInt32(to))
179-
.Select(x => File.ReadAllText(x.FullName).DeserializeFromJson<Message>() as IMessage)
179+
.Select(x => File.ReadAllText(x.FullName).DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope)
180180
.ToList();
181181
}
182182
}

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Publish(ICommand command)
3232
{
3333
Redis.ListLeftPush(
3434
PendingQueueName,
35-
new Message() {
35+
new CommandEnvelope() {
3636
IsCompressed = UseCompression,
3737
CommandClassName = command.GetType().FullName,
3838
CommandAssemblyName = command.GetType().Assembly.GetName().Name + ".dll",
@@ -47,13 +47,13 @@ public bool Consume()
4747
var serializedMessage = rawRedisMessageValue.ToString();
4848
if (serializedMessage == null)
4949
return false;
50-
var message = serializedMessage.DeserializeFromJson<Message>();
51-
if (message == null)
50+
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
51+
if (commandEnvelope == null)
5252
return false;
5353

5454
try
5555
{
56-
Queue.ExtractCommandInstanceFromMessageAndRun(message as IMessage);
56+
Queue.ExtractCommandInstanceFromMessageAndRun(commandEnvelope as ICommandEnvelope);
5757
}
5858
catch (Exception exception)
5959
{
@@ -68,7 +68,7 @@ public bool Consume()
6868
}
6969
catch (Exception exception)
7070
{
71-
throw new CommandFailedException($"Failed to remove completed message from queue: {InProgressQueueName}", exception);
71+
throw new CommandFailedException($"Failed to remove completed commandEnvelope from queue: {InProgressQueueName}", exception);
7272
}
7373

7474
return true;
@@ -109,26 +109,26 @@ public void RepublishFailedMessages()
109109
Redis.ListRightPopLeftPush(FailedQueueName, PendingQueueName);
110110
}
111111

112-
public List<IMessage> PeekPendingMessages(long from, long to)
112+
public List<ICommandEnvelope> PeekPendingMessages(long from, long to)
113113
{
114114
return GetMessages(PendingQueueName, from, to);
115115
}
116116

117-
public List<IMessage> PeekInProgressMessages(long from, long to)
117+
public List<ICommandEnvelope> PeekInProgressMessages(long from, long to)
118118
{
119119
return GetMessages(InProgressQueueName, from, to);
120120
}
121121

122-
public List<IMessage> PeekFailedMessages(long from, long to)
122+
public List<ICommandEnvelope> PeekFailedMessages(long from, long to)
123123
{
124124
return GetMessages(FailedQueueName, from, to);
125125
}
126126

127-
public List<IMessage> GetMessages(string queueName, long from, long to)
127+
public List<ICommandEnvelope> GetMessages(string queueName, long from, long to)
128128
{
129129
return Redis.ListRange(queueName, from, to)
130130
.ToStringArray()
131-
.Select(x => x.DeserializeFromJson<Message>() as IMessage).ToList();
131+
.Select(x => x.DeserializeFromJson<CommandEnvelope>() as ICommandEnvelope).ToList();
132132
}
133133
}
134134
}

0 commit comments

Comments
 (0)