Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/WebSocketExtensions/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ public async static Task<WebSocketMessage> ReceiveMessageAsync(this WebSocket we
}
else
{
return new WebSocketMessage(Encoding.UTF8.GetString(arr), connectionId);
if (receivedResult.Count > 0)
{
return new WebSocketMessage(Encoding.UTF8.GetString(arr), connectionId);
}
else
{
return new WebSocketMessage(connectionId);
}
}

}
}
else if (receivedResult.MessageType == WebSocketMessageType.Close)
Expand Down Expand Up @@ -98,6 +104,7 @@ public static async Task ProcessIncomingMessages(
Guid connectionId,
Action<StringMessageReceivedEventArgs> messageBehavior,
Action<BinaryMessageReceivedEventArgs> binaryBehavior,
Action<HealthMessageReceivedEventArgs> healthBehavior,
Action<WebSocketReceivedResultEventArgs> closeBehavior,
Action<string> logInfo,
CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -117,7 +124,7 @@ public static async Task ProcessIncomingMessages(
break;
}

msg.SetMessageHandlers(messageBehavior, binaryBehavior, webSocket);
msg.SetMessageHandlers(messageBehavior, binaryBehavior, healthBehavior, webSocket);

messageQueue.Push(msg);
}
Expand Down
17 changes: 17 additions & 0 deletions src/WebSocketExtensions/HealthMessageReceivedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Net.WebSockets;

namespace WebSocketExtensions
{
public class HealthMessageReceivedEventArgs
{
public Guid ConnectionId { get; set; }
public WebSocket WebSocket { get; set; }

public HealthMessageReceivedEventArgs(WebSocket webSocket, Guid connectionId)
{
WebSocket = webSocket;
ConnectionId = connectionId;
}
}
}
3 changes: 2 additions & 1 deletion src/WebSocketExtensions/HttpListenerWebSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,10 @@ private async Task handleClient<TWebSocketBehavior>(HttpListenerContext listener
{
var stringBehavior = MakeSafe<StringMessageReceivedEventArgs>(behavior.OnStringMessage, "behavior.OnStringMessage");
var binaryBehavior = MakeSafe<BinaryMessageReceivedEventArgs>(behavior.OnBinaryMessage, "behavior.OnBinaryMessage");
var healthBehavior = MakeSafe<HealthMessageReceivedEventArgs>((e) => { }, "behavior.OnHealthMessage");
var closeBehavior = MakeSafe<WebSocketReceivedResultEventArgs>((r) => behavior.OnClose(new WebSocketClosedEventArgs(connectionId, r)), "behavior.OnClose");

await webSocketContext.WebSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, closeBehavior, _logInfo, token);
await webSocketContext.WebSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, healthBehavior, closeBehavior, _logInfo, token);
}
}
finally
Expand Down
68 changes: 66 additions & 2 deletions src/WebSocketExtensions/WebListenerWebSocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
Expand All @@ -15,21 +16,68 @@ namespace WebSocketExtensions
public class WebListenerWebSocketServer : WebSocketReciever, IDisposable
{
private ConcurrentDictionary<Guid, WebSocket> _clients;
private ConcurrentDictionary<Guid, DateTime> _clientsLastReceived;
private ConcurrentDictionary<Guid, CancellationTokenSource> _clientCancellationTokenSources;
private ConcurrentDictionary<string, Func<WebListenerWebSocketServerBehavior>> _behaviors;
private WebListener _webListener;
private Task _listenTask;
private PagingMessageQueue _messageQueue;
private CancellationTokenSource _cancellationTokenSource;
private System.Timers.Timer _healthTimer;

private int _connectedClientCount = 0;
private readonly long _queueThrottleLimit;
private readonly TimeSpan _pingIntervalTimeSpan;
private readonly TimeSpan _timeoutIntervalTimeSpan;

private bool _isDisposing = false;

public WebListenerWebSocketServer(Action<string, bool> logger = null, long queueThrottleLimitBytes = long.MaxValue) : base(logger)
public WebListenerWebSocketServer(Action<string, bool> logger = null, long queueThrottleLimitBytes = long.MaxValue, int pingIntervalMilliseconds = 30000, int timeoutIntervalMilliseconds = 120000) : base(logger)
{
_behaviors = new ConcurrentDictionary<string, Func<WebListenerWebSocketServerBehavior>>();
_clients = new ConcurrentDictionary<Guid, WebSocket>();
_clientsLastReceived = new ConcurrentDictionary<Guid, DateTime>();
_clientCancellationTokenSources = new ConcurrentDictionary<Guid, CancellationTokenSource>();
_queueThrottleLimit = queueThrottleLimitBytes;
_pingIntervalTimeSpan = TimeSpan.FromMilliseconds(pingIntervalMilliseconds);
_timeoutIntervalTimeSpan = TimeSpan.FromMilliseconds(timeoutIntervalMilliseconds);

_healthTimer = new System.Timers.Timer(pingIntervalMilliseconds);

_healthTimer.Elapsed += _healthTimer_Elapsed;

_healthTimer.Start();
}

private void _healthTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
foreach(var kvp in _clients)
{
if (_clientsLastReceived.TryGetValue(kvp.Key, out DateTime lastReceived))
{
var lastReceivedDifference = DateTime.UtcNow - lastReceived;

if (lastReceivedDifference > _timeoutIntervalTimeSpan)
{
if (_clientCancellationTokenSources.TryGetValue(kvp.Key, out CancellationTokenSource clientCancellationTokenSource))
{
_logInfo($"Health check triggering client cancellation: {kvp.Key}.");

clientCancellationTokenSource.Cancel();
}
}
else if (lastReceivedDifference > _pingIntervalTimeSpan)
{
SendStringAsync(kvp.Key, "");
}
}
else
{
updateLastReceived(kvp.Key);

SendStringAsync(kvp.Key, "");
}
}
}

public IList<Guid> GetActiveConnectionIds()
Expand Down Expand Up @@ -193,6 +241,11 @@ private async Task listenLoop(WebListener listener, CancellationToken tok)
_logInfo($"Listening loop stopped.");
}

private void updateLastReceived(Guid connectionId)
{
_clientsLastReceived.AddOrUpdate(connectionId, DateTime.UtcNow, (key, oldValue) => DateTime.UtcNow);
}

private async Task handleClient<TWebSocketBehavior>(RequestContext requestContext, Func<TWebSocketBehavior> behaviorBuilder, CancellationToken token)
where TWebSocketBehavior : WebListenerWebSocketServerBehavior
{
Expand Down Expand Up @@ -245,15 +298,20 @@ private async Task handleClient<TWebSocketBehavior>(RequestContext requestContex
return;
}

var clientCancellationTokenSource = new CancellationTokenSource();

try
{
_clientCancellationTokenSources.TryAdd(connectionId, clientCancellationTokenSource);

using (webSocket)
{
var stringBehavior = MakeSafe<StringMessageReceivedEventArgs>(behavior.OnStringMessage, "behavior.OnStringMessage");
var binaryBehavior = MakeSafe<BinaryMessageReceivedEventArgs>(behavior.OnBinaryMessage, "behavior.OnBinaryMessage");
var healthBehavior = MakeSafe<HealthMessageReceivedEventArgs>((r) => { updateLastReceived(r.ConnectionId); behavior.OnHealthMessage(r); }, "behavior.OnHealthMessage");
var closeBehavior = MakeSafe<WebSocketReceivedResultEventArgs>((r) => behavior.OnClose(new WebSocketClosedEventArgs(connectionId, r)), "behavior.OnClose");

await webSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, closeBehavior, _logInfo, token);
await webSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, healthBehavior, closeBehavior, _logInfo, CancellationTokenSource.CreateLinkedTokenSource(token, clientCancellationTokenSource.Token).Token);
}
}
finally
Expand All @@ -271,6 +329,11 @@ private async Task handleClient<TWebSocketBehavior>(RequestContext requestContex
}

_logInfo($"Completed HandleClient task for connection id '{connectionId}'.");

_clientCancellationTokenSources.TryRemove(connectionId, out clientCancellationTokenSource);

webSocket.Dispose();
clientCancellationTokenSource.Dispose();
}
}

Expand Down Expand Up @@ -300,6 +363,7 @@ public void Dispose()
if (!_isDisposing)
{
_isDisposing = true;
_healthTimer.Dispose();
stopListeningThread();
_messageQueue?.CompleteAdding();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public virtual void OnConnectionEstablished(Guid connectionId, RequestContext re
public virtual bool OnValidateContext(RequestContext requestContext, ref int errorStatusCode, ref string statusDescription) { return true; }
public virtual void OnStringMessage(StringMessageReceivedEventArgs e) { }
public virtual void OnBinaryMessage(BinaryMessageReceivedEventArgs e) { }
public virtual void OnHealthMessage(HealthMessageReceivedEventArgs e) { }
public virtual void OnClose(WebSocketClosedEventArgs e) { }
public virtual void OnError(ErrorEventArgs e) { }
}
Expand Down
4 changes: 3 additions & 1 deletion src/WebSocketExtensions/WebSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class WebSocketClient : WebSocketReciever, IDisposable
{
public Action<StringMessageReceivedEventArgs> MessageHandler { get; set; } = (e) => { };
public Action<BinaryMessageReceivedEventArgs> BinaryHandler { get; set; } = (e) => { };
public Action<HealthMessageReceivedEventArgs> HealthHandler { get; set; } = (e) => { };
public Action<WebSocketReceivedResultEventArgs> CloseHandler { get; set; } = (e) => { };
public Action<ClientWebSocketOptions> ConfigureOptionsBeforeConnect { get; set; } = (e) => { };

Expand Down Expand Up @@ -38,11 +39,12 @@ public WebSocketClient(Action<string, bool> logger = null, Guid? clientId = null

var messageBehavior = MakeSafe(MessageHandler, "MessageHandler");
var binaryBehavior = MakeSafe(BinaryHandler, "BinaryHandler");
var healthBehavior = MakeSafe<HealthMessageReceivedEventArgs>((e) => { _client.SendStringAsync(""); HealthHandler(e); }, "HealthHandler");
_closeBehavior = MakeSafe(CloseHandler, "CloseHandler");

_messageQueue = new PagingMessageQueue("WebSocketClient", _logError, _recieveQueueLimitBytes);

_incomingMessagesTask = Task.Factory.StartNew(async () => await _client.ProcessIncomingMessages(_messageQueue, _clientId, messageBehavior, binaryBehavior, _closeBehavior, _logInfo, _cancellationTokenSource.Token));
_incomingMessagesTask = Task.Factory.StartNew(async () => await _client.ProcessIncomingMessages(_messageQueue, _clientId, messageBehavior, binaryBehavior, healthBehavior, _closeBehavior, _logInfo, _cancellationTokenSource.Token));
}

public Action<T> MakeSafe<T>(Action<T> torun, string handlerName)
Expand Down
6 changes: 3 additions & 3 deletions src/WebSocketExtensions/WebSocketExtensions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
<RepositoryUrl>https://github.com/maxfridbe/websocketextensions</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageReleaseNotes>Feel free to contribute, Integration Tests Work but no other gaurentees.</PackageReleaseNotes>
<Version>2.1.6.0</Version>
<AssemblyVersion>2.1.6.0</AssemblyVersion>
<FileVersion>2.1.6.0</FileVersion>
<Version>2.1.7.0</Version>
<AssemblyVersion>2.1.7.0</AssemblyVersion>
<FileVersion>2.1.7.0</FileVersion>

<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
Expand Down
15 changes: 14 additions & 1 deletion src/WebSocketExtensions/WebSocketMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class WebSocketMessage : IDisposable
{
public Action<StringMessageReceivedEventArgs> StringBehavior { get; private set; }
public Action<BinaryMessageReceivedEventArgs> BinaryBehavior { get; private set; }
public Action<HealthMessageReceivedEventArgs> HealthBehavior { get; private set; }

public Guid ConnectionId { get; private set; }
public bool InMemory => _bindata != null;
Expand All @@ -25,6 +26,11 @@ public class WebSocketMessage : IDisposable
private string _pagePath = null;
private static string PAGING_TEMP_PATH = Path.GetTempPath();

public WebSocketMessage(Guid connectionId)
{
ConnectionId = connectionId;
}

public WebSocketMessage(byte[] data, Guid connectionId) {
_bindata = data;
ConnectionId = connectionId;
Expand Down Expand Up @@ -72,10 +78,12 @@ public byte[] GetBinData()
public void SetMessageHandlers(
Action<StringMessageReceivedEventArgs> messageBehavior,
Action<BinaryMessageReceivedEventArgs> binaryBehavior,
Action<HealthMessageReceivedEventArgs> healthBehavior,
WebSocket webSocket)
{
StringBehavior = messageBehavior;
BinaryBehavior = binaryBehavior;
HealthBehavior = healthBehavior;
_webSocket = webSocket;
}

Expand All @@ -93,9 +101,13 @@ public void HandleMessage(Action<string> logError)
}
else if (Exception != null)
{

logError($"Exception in read thread of connection {ConnectionId}:\r\n {ExceptionMessage}\r\n{Exception}\r\n{ (Exception.InnerException != null ? Exception.InnerException.ToString() : String.Empty) }");
}
else
{
var args = new HealthMessageReceivedEventArgs(_webSocket, ConnectionId);
HealthBehavior(args);
}
}

public void Dispose()
Expand All @@ -107,6 +119,7 @@ public void Dispose()
_bindata = null;
StringBehavior = null;
BinaryBehavior = null;
HealthBehavior = null;
}
}
}