From 8c98fd9d1cd5f50aa34ace82bf025b95b9d484f0 Mon Sep 17 00:00:00 2001 From: Ryan Braun Date: Tue, 30 Nov 2021 09:59:04 -0600 Subject: [PATCH] Added logic to check the health of a client connection. --- src/WebSocketExtensions/Extensions.cs | 13 +++- .../HealthMessageReceivedEventArgs.cs | 17 +++++ .../HttpListenerWebSocketServer.cs | 3 +- .../WebListenerWebSocketServer.cs | 68 ++++++++++++++++++- .../WebListenerWebSocketServerBehavior.cs | 1 + src/WebSocketExtensions/WebSocketClient.cs | 4 +- .../WebSocketExtensions.csproj | 6 +- src/WebSocketExtensions/WebSocketMessage.cs | 15 +++- 8 files changed, 116 insertions(+), 11 deletions(-) create mode 100644 src/WebSocketExtensions/HealthMessageReceivedEventArgs.cs diff --git a/src/WebSocketExtensions/Extensions.cs b/src/WebSocketExtensions/Extensions.cs index f0fadb0..0023352 100644 --- a/src/WebSocketExtensions/Extensions.cs +++ b/src/WebSocketExtensions/Extensions.cs @@ -36,9 +36,15 @@ public async static Task ReceiveMessageAsync(this WebSocket we } else { - return new WebSocketMessage(Encoding.UTF8.GetString(arr), connectionId); + if (receivedResult.Count > 0) + { + return new WebSocketMessage(Encoding.UTF8.GetString(arr), connectionId); + } + else + { + return new WebSocketMessage(connectionId); + } } - } } else if (receivedResult.MessageType == WebSocketMessageType.Close) @@ -98,6 +104,7 @@ public static async Task ProcessIncomingMessages( Guid connectionId, Action messageBehavior, Action binaryBehavior, + Action healthBehavior, Action closeBehavior, Action logInfo, CancellationToken cancellationToken = default(CancellationToken)) @@ -117,7 +124,7 @@ public static async Task ProcessIncomingMessages( break; } - msg.SetMessageHandlers(messageBehavior, binaryBehavior, webSocket); + msg.SetMessageHandlers(messageBehavior, binaryBehavior, healthBehavior, webSocket); messageQueue.Push(msg); } diff --git a/src/WebSocketExtensions/HealthMessageReceivedEventArgs.cs b/src/WebSocketExtensions/HealthMessageReceivedEventArgs.cs new file mode 100644 index 0000000..24cb521 --- /dev/null +++ b/src/WebSocketExtensions/HealthMessageReceivedEventArgs.cs @@ -0,0 +1,17 @@ +using System; +using System.Net.WebSockets; + +namespace WebSocketExtensions +{ + public class HealthMessageReceivedEventArgs + { + public Guid ConnectionId { get; set; } + public WebSocket WebSocket { get; set; } + + public HealthMessageReceivedEventArgs(WebSocket webSocket, Guid connectionId) + { + WebSocket = webSocket; + ConnectionId = connectionId; + } + } +} \ No newline at end of file diff --git a/src/WebSocketExtensions/HttpListenerWebSocketServer.cs b/src/WebSocketExtensions/HttpListenerWebSocketServer.cs index e91070c..8eade52 100644 --- a/src/WebSocketExtensions/HttpListenerWebSocketServer.cs +++ b/src/WebSocketExtensions/HttpListenerWebSocketServer.cs @@ -263,9 +263,10 @@ private async Task handleClient(HttpListenerContext listener { var stringBehavior = MakeSafe(behavior.OnStringMessage, "behavior.OnStringMessage"); var binaryBehavior = MakeSafe(behavior.OnBinaryMessage, "behavior.OnBinaryMessage"); + var healthBehavior = MakeSafe((e) => { }, "behavior.OnHealthMessage"); var closeBehavior = MakeSafe((r) => behavior.OnClose(new WebSocketClosedEventArgs(connectionId, r)), "behavior.OnClose"); - await webSocketContext.WebSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, closeBehavior, _logInfo, token); + await webSocketContext.WebSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, healthBehavior, closeBehavior, _logInfo, token); } } finally diff --git a/src/WebSocketExtensions/WebListenerWebSocketServer.cs b/src/WebSocketExtensions/WebListenerWebSocketServer.cs index 795422a..9ab7ebe 100644 --- a/src/WebSocketExtensions/WebListenerWebSocketServer.cs +++ b/src/WebSocketExtensions/WebListenerWebSocketServer.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Net; @@ -15,21 +16,68 @@ namespace WebSocketExtensions public class WebListenerWebSocketServer : WebSocketReciever, IDisposable { private ConcurrentDictionary _clients; + private ConcurrentDictionary _clientsLastReceived; + private ConcurrentDictionary _clientCancellationTokenSources; private ConcurrentDictionary> _behaviors; private WebListener _webListener; private Task _listenTask; private PagingMessageQueue _messageQueue; private CancellationTokenSource _cancellationTokenSource; + private System.Timers.Timer _healthTimer; private int _connectedClientCount = 0; private readonly long _queueThrottleLimit; + private readonly TimeSpan _pingIntervalTimeSpan; + private readonly TimeSpan _timeoutIntervalTimeSpan; + private bool _isDisposing = false; - public WebListenerWebSocketServer(Action logger = null, long queueThrottleLimitBytes = long.MaxValue) : base(logger) + public WebListenerWebSocketServer(Action logger = null, long queueThrottleLimitBytes = long.MaxValue, int pingIntervalMilliseconds = 30000, int timeoutIntervalMilliseconds = 120000) : base(logger) { _behaviors = new ConcurrentDictionary>(); _clients = new ConcurrentDictionary(); + _clientsLastReceived = new ConcurrentDictionary(); + _clientCancellationTokenSources = new ConcurrentDictionary(); _queueThrottleLimit = queueThrottleLimitBytes; + _pingIntervalTimeSpan = TimeSpan.FromMilliseconds(pingIntervalMilliseconds); + _timeoutIntervalTimeSpan = TimeSpan.FromMilliseconds(timeoutIntervalMilliseconds); + + _healthTimer = new System.Timers.Timer(pingIntervalMilliseconds); + + _healthTimer.Elapsed += _healthTimer_Elapsed; + + _healthTimer.Start(); + } + + private void _healthTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) + { + foreach(var kvp in _clients) + { + if (_clientsLastReceived.TryGetValue(kvp.Key, out DateTime lastReceived)) + { + var lastReceivedDifference = DateTime.UtcNow - lastReceived; + + if (lastReceivedDifference > _timeoutIntervalTimeSpan) + { + if (_clientCancellationTokenSources.TryGetValue(kvp.Key, out CancellationTokenSource clientCancellationTokenSource)) + { + _logInfo($"Health check triggering client cancellation: {kvp.Key}."); + + clientCancellationTokenSource.Cancel(); + } + } + else if (lastReceivedDifference > _pingIntervalTimeSpan) + { + SendStringAsync(kvp.Key, ""); + } + } + else + { + updateLastReceived(kvp.Key); + + SendStringAsync(kvp.Key, ""); + } + } } public IList GetActiveConnectionIds() @@ -193,6 +241,11 @@ private async Task listenLoop(WebListener listener, CancellationToken tok) _logInfo($"Listening loop stopped."); } + private void updateLastReceived(Guid connectionId) + { + _clientsLastReceived.AddOrUpdate(connectionId, DateTime.UtcNow, (key, oldValue) => DateTime.UtcNow); + } + private async Task handleClient(RequestContext requestContext, Func behaviorBuilder, CancellationToken token) where TWebSocketBehavior : WebListenerWebSocketServerBehavior { @@ -245,15 +298,20 @@ private async Task handleClient(RequestContext requestContex return; } + var clientCancellationTokenSource = new CancellationTokenSource(); + try { + _clientCancellationTokenSources.TryAdd(connectionId, clientCancellationTokenSource); + using (webSocket) { var stringBehavior = MakeSafe(behavior.OnStringMessage, "behavior.OnStringMessage"); var binaryBehavior = MakeSafe(behavior.OnBinaryMessage, "behavior.OnBinaryMessage"); + var healthBehavior = MakeSafe((r) => { updateLastReceived(r.ConnectionId); behavior.OnHealthMessage(r); }, "behavior.OnHealthMessage"); var closeBehavior = MakeSafe((r) => behavior.OnClose(new WebSocketClosedEventArgs(connectionId, r)), "behavior.OnClose"); - await webSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, closeBehavior, _logInfo, token); + await webSocket.ProcessIncomingMessages(_messageQueue, connectionId, stringBehavior, binaryBehavior, healthBehavior, closeBehavior, _logInfo, CancellationTokenSource.CreateLinkedTokenSource(token, clientCancellationTokenSource.Token).Token); } } finally @@ -271,6 +329,11 @@ private async Task handleClient(RequestContext requestContex } _logInfo($"Completed HandleClient task for connection id '{connectionId}'."); + + _clientCancellationTokenSources.TryRemove(connectionId, out clientCancellationTokenSource); + + webSocket.Dispose(); + clientCancellationTokenSource.Dispose(); } } @@ -300,6 +363,7 @@ public void Dispose() if (!_isDisposing) { _isDisposing = true; + _healthTimer.Dispose(); stopListeningThread(); _messageQueue?.CompleteAdding(); } diff --git a/src/WebSocketExtensions/WebListenerWebSocketServerBehavior.cs b/src/WebSocketExtensions/WebListenerWebSocketServerBehavior.cs index 7b2b247..d5555ba 100644 --- a/src/WebSocketExtensions/WebListenerWebSocketServerBehavior.cs +++ b/src/WebSocketExtensions/WebListenerWebSocketServerBehavior.cs @@ -12,6 +12,7 @@ public virtual void OnConnectionEstablished(Guid connectionId, RequestContext re public virtual bool OnValidateContext(RequestContext requestContext, ref int errorStatusCode, ref string statusDescription) { return true; } public virtual void OnStringMessage(StringMessageReceivedEventArgs e) { } public virtual void OnBinaryMessage(BinaryMessageReceivedEventArgs e) { } + public virtual void OnHealthMessage(HealthMessageReceivedEventArgs e) { } public virtual void OnClose(WebSocketClosedEventArgs e) { } public virtual void OnError(ErrorEventArgs e) { } } diff --git a/src/WebSocketExtensions/WebSocketClient.cs b/src/WebSocketExtensions/WebSocketClient.cs index 0027ba7..0967f49 100644 --- a/src/WebSocketExtensions/WebSocketClient.cs +++ b/src/WebSocketExtensions/WebSocketClient.cs @@ -10,6 +10,7 @@ public class WebSocketClient : WebSocketReciever, IDisposable { public Action MessageHandler { get; set; } = (e) => { }; public Action BinaryHandler { get; set; } = (e) => { }; + public Action HealthHandler { get; set; } = (e) => { }; public Action CloseHandler { get; set; } = (e) => { }; public Action ConfigureOptionsBeforeConnect { get; set; } = (e) => { }; @@ -38,11 +39,12 @@ public WebSocketClient(Action logger = null, Guid? clientId = null var messageBehavior = MakeSafe(MessageHandler, "MessageHandler"); var binaryBehavior = MakeSafe(BinaryHandler, "BinaryHandler"); + var healthBehavior = MakeSafe((e) => { _client.SendStringAsync(""); HealthHandler(e); }, "HealthHandler"); _closeBehavior = MakeSafe(CloseHandler, "CloseHandler"); _messageQueue = new PagingMessageQueue("WebSocketClient", _logError, _recieveQueueLimitBytes); - _incomingMessagesTask = Task.Factory.StartNew(async () => await _client.ProcessIncomingMessages(_messageQueue, _clientId, messageBehavior, binaryBehavior, _closeBehavior, _logInfo, _cancellationTokenSource.Token)); + _incomingMessagesTask = Task.Factory.StartNew(async () => await _client.ProcessIncomingMessages(_messageQueue, _clientId, messageBehavior, binaryBehavior, healthBehavior, _closeBehavior, _logInfo, _cancellationTokenSource.Token)); } public Action MakeSafe(Action torun, string handlerName) diff --git a/src/WebSocketExtensions/WebSocketExtensions.csproj b/src/WebSocketExtensions/WebSocketExtensions.csproj index 9244869..fedc18f 100644 --- a/src/WebSocketExtensions/WebSocketExtensions.csproj +++ b/src/WebSocketExtensions/WebSocketExtensions.csproj @@ -13,9 +13,9 @@ https://github.com/maxfridbe/websocketextensions git Feel free to contribute, Integration Tests Work but no other gaurentees. - 2.1.6.0 - 2.1.6.0 - 2.1.6.0 + 2.1.7.0 + 2.1.7.0 + 2.1.7.0 true snupkg diff --git a/src/WebSocketExtensions/WebSocketMessage.cs b/src/WebSocketExtensions/WebSocketMessage.cs index f014ede..fc9daff 100644 --- a/src/WebSocketExtensions/WebSocketMessage.cs +++ b/src/WebSocketExtensions/WebSocketMessage.cs @@ -8,6 +8,7 @@ public class WebSocketMessage : IDisposable { public Action StringBehavior { get; private set; } public Action BinaryBehavior { get; private set; } + public Action HealthBehavior { get; private set; } public Guid ConnectionId { get; private set; } public bool InMemory => _bindata != null; @@ -25,6 +26,11 @@ public class WebSocketMessage : IDisposable private string _pagePath = null; private static string PAGING_TEMP_PATH = Path.GetTempPath(); + public WebSocketMessage(Guid connectionId) + { + ConnectionId = connectionId; + } + public WebSocketMessage(byte[] data, Guid connectionId) { _bindata = data; ConnectionId = connectionId; @@ -72,10 +78,12 @@ public byte[] GetBinData() public void SetMessageHandlers( Action messageBehavior, Action binaryBehavior, + Action healthBehavior, WebSocket webSocket) { StringBehavior = messageBehavior; BinaryBehavior = binaryBehavior; + HealthBehavior = healthBehavior; _webSocket = webSocket; } @@ -93,9 +101,13 @@ public void HandleMessage(Action logError) } else if (Exception != null) { - logError($"Exception in read thread of connection {ConnectionId}:\r\n {ExceptionMessage}\r\n{Exception}\r\n{ (Exception.InnerException != null ? Exception.InnerException.ToString() : String.Empty) }"); } + else + { + var args = new HealthMessageReceivedEventArgs(_webSocket, ConnectionId); + HealthBehavior(args); + } } public void Dispose() @@ -107,6 +119,7 @@ public void Dispose() _bindata = null; StringBehavior = null; BinaryBehavior = null; + HealthBehavior = null; } } }