From 5456e7646c3a34e2b8150eca141e939efa9ae9f1 Mon Sep 17 00:00:00 2001 From: Andrew Zonenberg Date: Mon, 29 Dec 2025 23:45:48 -0800 Subject: [PATCH 1/2] Initial version of credit based flow control. Doesn't yet include flush logic to clear queued waveforms when trigger is disarmed --- .../Waveform Buffer Readers/DataServer.cs | 69 +++++++++++++++---- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs index 5b0f3dd..da687c2 100644 --- a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs +++ b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs @@ -102,28 +102,73 @@ private void LoopListener(ILogger logger, CancellationToken cancelToken) private void LoopSession(ILogger logger, Socket socket, CancellationToken cancelToken) { string sessionID = socket.RemoteEndPoint?.ToString() ?? "Unknown"; + try { + Span ack = stackalloc byte[4]; Span cmdBuf = stackalloc byte[1]; + uint nack = 0; + uint inflight = 0; + bool creditMode = false; while (true) { cancelToken.ThrowIfCancellationRequested(); - int read = 0; - read = socket.Receive(cmdBuf); - if (read == 0) - break; - byte cmd = cmdBuf[0]; - switch (cmd) + //New credit-based flow control path + if(creditMode) { - case (byte)'K': - SendScopehalOld(socket, cancelToken); - break; - case (byte)'S': + //See if we have data ready to read. Grab the ACKs if so (may be >1 queued) + while(socket.Poll(1000, SelectMode.SelectRead)) + { + //Get the ACK number (if we see one). + //TODO: this assumes all 4 bytes are always in the same TCP segment. Probably reasonable + //but for max robustness we'd want to handle partial acks somehow + int read = 0; + read = socket.Receive(ack); + if(read == 4) + { + nack = BitConverter.ToUInt32(ack); + inflight = sequenceNumber - nack; + logger.LogInformation($"Got ACK: {nack}, last sequenceNumber={sequenceNumber}, {inflight} in flight"); + } + else + logger.LogInformation("TODO handle partial read"); + } + inflight = sequenceNumber - nack; + + //Figure out how many un-acked waveforms we have, block if >5 in flight + if(inflight >= 5) + { + //logger.LogInformation($"{inflight} waveforms in flight, blocking"); + continue; + } + else SendScopehal(socket, cancelToken); + } + + //Legacy path (plus entry to credit mode) + else + { + int read = 0; + read = socket.Receive(cmdBuf); + if (read == 0) break; - default: - break; + + byte cmd = cmdBuf[0]; + switch (cmd) + { + case (byte)'K': + SendScopehalOld(socket, cancelToken); + break; + case (byte)'S': + SendScopehal(socket, cancelToken); + break; + case (byte)'C': + creditMode = true; + break; + default: + break; + } } } } From 76355a84b9022592d789eae41bcaee45434891e1 Mon Sep 17 00:00:00 2001 From: Andrew Zonenberg Date: Tue, 30 Dec 2025 01:39:35 -0800 Subject: [PATCH 2/2] Finished initial version of credit-based flow control --- source/TS.NET.Engine/EngineManager.cs | 2 +- source/TS.NET.Engine/Threads/ScpiServer.cs | 23 ++++++- .../Waveform Buffer Readers/DataServer.cs | 60 +++++++++++-------- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/source/TS.NET.Engine/EngineManager.cs b/source/TS.NET.Engine/EngineManager.cs index 4d2de61..5171db3 100644 --- a/source/TS.NET.Engine/EngineManager.cs +++ b/source/TS.NET.Engine/EngineManager.cs @@ -195,7 +195,7 @@ public bool TryStart(string configurationFile, string calibrationFile, string de switch (thunderscopeSettings.WaveformBufferReader) { case "DataServer": - DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer); + DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer, scpiServer); waveformBufferReader = dataServer; break; case "None": diff --git a/source/TS.NET.Engine/Threads/ScpiServer.cs b/source/TS.NET.Engine/Threads/ScpiServer.cs index fea4123..02c70ce 100644 --- a/source/TS.NET.Engine/Threads/ScpiServer.cs +++ b/source/TS.NET.Engine/Threads/ScpiServer.cs @@ -13,6 +13,8 @@ class ScpiServer : TcpServer, IThread private readonly BlockingRequestResponse hardwareControl; private readonly BlockingRequestResponse processingControl; + private uint sequence = 0; + public ScpiServer(ILogger logger, ThunderscopeSettings settings, IPAddress address, @@ -47,6 +49,16 @@ public void Start(SemaphoreSlim startSemaphore) { base.Stop(); } + + public void OnUpdateSequence(uint seq) + { + sequence = seq; + } + + public uint GetSequence() + { + return sequence; + } } internal class ScpiSession : TcpSession @@ -55,9 +67,10 @@ internal class ScpiSession : TcpSession private readonly BlockingRequestResponse hardwareControl; private readonly BlockingRequestResponse processingControl; private readonly ThunderscopeSettings settings; + private ScpiServer server; public ScpiSession( - TcpServer server, + ScpiServer server, ILogger logger, ThunderscopeSettings settings, BlockingRequestResponse hardwareControl, @@ -67,6 +80,7 @@ public ScpiSession( this.settings = settings; this.hardwareControl = hardwareControl; this.processingControl = processingControl; + this.server = server; } protected override void OnConnected() @@ -103,7 +117,7 @@ protected override void OnError(SocketError error) logger.LogDebug($"SCPI session caught an error with code {error}"); } - public static string? ProcessSCPICommand( + public string? ProcessSCPICommand( ILogger logger, ThunderscopeSettings settings, BlockingRequestResponse hardwareControl, @@ -629,6 +643,11 @@ protected override void OnError(SocketError error) logger.LogError($"MODE? - No response from {nameof(processingControl.Response.Reader)}"); return "Error: No/bad response from channel.\n"; } + case "SEQNUM?": + { + uint seq = this.server.GetSequence(); + return $"{seq}\n"; + } } } else if (subject?.StartsWith("ACQ") == true) diff --git a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs index da687c2..387a4f3 100644 --- a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs +++ b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs @@ -12,6 +12,7 @@ internal class DataServer : IThread private readonly IPAddress address; private readonly int port; private readonly ICaptureBufferReader captureBuffer; + private readonly ScpiServer scpi; private CancellationTokenSource? listenerCancelTokenSource; private Task? taskListener; @@ -26,13 +27,15 @@ public DataServer( ThunderscopeSettings settings, IPAddress address, int port, - ICaptureBufferReader captureBuffer) + ICaptureBufferReader captureBuffer, + ScpiServer scpi) { this.logger = logger; this.settings = settings; this.address = address; this.port = port; this.captureBuffer = captureBuffer; + this.scpi = scpi; } public void Start(SemaphoreSlim startSemaphore) @@ -99,16 +102,37 @@ private void LoopListener(ILogger logger, CancellationToken cancelToken) } } + private void CheckForACKs(ILogger logger, Socket socket) + { + //See if we have data ready to read. Grab the ACKs if so (may be >1 queued) + Span ack = stackalloc byte[4]; + while(socket.Poll(1000, SelectMode.SelectRead)) + { + //Get the ACK number (if we see one). + //TODO: this assumes all 4 bytes are always in the same TCP segment. Probably reasonable + //but for max robustness we'd want to handle partial acks somehow + int read = 0; + read = socket.Receive(ack); + if(read == 4) + { + nack = BitConverter.ToUInt32(ack); + inflight = sequenceNumber - nack; + logger.LogInformation($"Got ACK: {nack}, last sequenceNumber={sequenceNumber}, {inflight} in flight"); + } + else + logger.LogInformation("TODO handle partial read"); + } + } + + private uint nack = 0; + private uint inflight = 0; private void LoopSession(ILogger logger, Socket socket, CancellationToken cancelToken) { string sessionID = socket.RemoteEndPoint?.ToString() ?? "Unknown"; try { - Span ack = stackalloc byte[4]; Span cmdBuf = stackalloc byte[1]; - uint nack = 0; - uint inflight = 0; bool creditMode = false; while (true) { @@ -117,33 +141,21 @@ private void LoopSession(ILogger logger, Socket socket, CancellationToken cancel //New credit-based flow control path if(creditMode) { - //See if we have data ready to read. Grab the ACKs if so (may be >1 queued) - while(socket.Poll(1000, SelectMode.SelectRead)) - { - //Get the ACK number (if we see one). - //TODO: this assumes all 4 bytes are always in the same TCP segment. Probably reasonable - //but for max robustness we'd want to handle partial acks somehow - int read = 0; - read = socket.Receive(ack); - if(read == 4) - { - nack = BitConverter.ToUInt32(ack); - inflight = sequenceNumber - nack; - logger.LogInformation($"Got ACK: {nack}, last sequenceNumber={sequenceNumber}, {inflight} in flight"); - } - else - logger.LogInformation("TODO handle partial read"); - } + CheckForACKs(logger, socket); inflight = sequenceNumber - nack; //Figure out how many un-acked waveforms we have, block if >5 in flight + //TODO: tune this for latency/throughput tradeoff? if(inflight >= 5) - { - //logger.LogInformation($"{inflight} waveforms in flight, blocking"); continue; - } + + //Send data else + { SendScopehal(socket, cancelToken); + logger.LogInformation($"Sending waveform (seq={sequenceNumber})"); + scpi.OnUpdateSequence(sequenceNumber); + } } //Legacy path (plus entry to credit mode)