diff --git a/source/TS.NET.Engine/EngineManager.cs b/source/TS.NET.Engine/EngineManager.cs index 4d2de61..5171db3 100644 --- a/source/TS.NET.Engine/EngineManager.cs +++ b/source/TS.NET.Engine/EngineManager.cs @@ -195,7 +195,7 @@ public bool TryStart(string configurationFile, string calibrationFile, string de switch (thunderscopeSettings.WaveformBufferReader) { case "DataServer": - DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer); + DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer, scpiServer); waveformBufferReader = dataServer; break; case "None": diff --git a/source/TS.NET.Engine/Threads/ScpiServer.cs b/source/TS.NET.Engine/Threads/ScpiServer.cs index fea4123..02c70ce 100644 --- a/source/TS.NET.Engine/Threads/ScpiServer.cs +++ b/source/TS.NET.Engine/Threads/ScpiServer.cs @@ -13,6 +13,8 @@ class ScpiServer : TcpServer, IThread private readonly BlockingRequestResponse hardwareControl; private readonly BlockingRequestResponse processingControl; + private uint sequence = 0; + public ScpiServer(ILogger logger, ThunderscopeSettings settings, IPAddress address, @@ -47,6 +49,16 @@ public void Start(SemaphoreSlim startSemaphore) { base.Stop(); } + + public void OnUpdateSequence(uint seq) + { + sequence = seq; + } + + public uint GetSequence() + { + return sequence; + } } internal class ScpiSession : TcpSession @@ -55,9 +67,10 @@ internal class ScpiSession : TcpSession private readonly BlockingRequestResponse hardwareControl; private readonly BlockingRequestResponse processingControl; private readonly ThunderscopeSettings settings; + private ScpiServer server; public ScpiSession( - TcpServer server, + ScpiServer server, ILogger logger, ThunderscopeSettings settings, BlockingRequestResponse hardwareControl, @@ -67,6 +80,7 @@ public ScpiSession( this.settings = settings; this.hardwareControl = hardwareControl; this.processingControl = processingControl; + this.server = server; } protected override void OnConnected() @@ -103,7 +117,7 @@ protected override void OnError(SocketError error) logger.LogDebug($"SCPI session caught an error with code {error}"); } - public static string? ProcessSCPICommand( + public string? ProcessSCPICommand( ILogger logger, ThunderscopeSettings settings, BlockingRequestResponse hardwareControl, @@ -629,6 +643,11 @@ protected override void OnError(SocketError error) logger.LogError($"MODE? - No response from {nameof(processingControl.Response.Reader)}"); return "Error: No/bad response from channel.\n"; } + case "SEQNUM?": + { + uint seq = this.server.GetSequence(); + return $"{seq}\n"; + } } } else if (subject?.StartsWith("ACQ") == true) diff --git a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs index 5b0f3dd..387a4f3 100644 --- a/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs +++ b/source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs @@ -12,6 +12,7 @@ internal class DataServer : IThread private readonly IPAddress address; private readonly int port; private readonly ICaptureBufferReader captureBuffer; + private readonly ScpiServer scpi; private CancellationTokenSource? listenerCancelTokenSource; private Task? taskListener; @@ -26,13 +27,15 @@ public DataServer( ThunderscopeSettings settings, IPAddress address, int port, - ICaptureBufferReader captureBuffer) + ICaptureBufferReader captureBuffer, + ScpiServer scpi) { this.logger = logger; this.settings = settings; this.address = address; this.port = port; this.captureBuffer = captureBuffer; + this.scpi = scpi; } public void Start(SemaphoreSlim startSemaphore) @@ -99,31 +102,85 @@ private void LoopListener(ILogger logger, CancellationToken cancelToken) } } + private void CheckForACKs(ILogger logger, Socket socket) + { + //See if we have data ready to read. Grab the ACKs if so (may be >1 queued) + Span ack = stackalloc byte[4]; + while(socket.Poll(1000, SelectMode.SelectRead)) + { + //Get the ACK number (if we see one). + //TODO: this assumes all 4 bytes are always in the same TCP segment. Probably reasonable + //but for max robustness we'd want to handle partial acks somehow + int read = 0; + read = socket.Receive(ack); + if(read == 4) + { + nack = BitConverter.ToUInt32(ack); + inflight = sequenceNumber - nack; + logger.LogInformation($"Got ACK: {nack}, last sequenceNumber={sequenceNumber}, {inflight} in flight"); + } + else + logger.LogInformation("TODO handle partial read"); + } + } + + private uint nack = 0; + private uint inflight = 0; private void LoopSession(ILogger logger, Socket socket, CancellationToken cancelToken) { string sessionID = socket.RemoteEndPoint?.ToString() ?? "Unknown"; + try { Span cmdBuf = stackalloc byte[1]; + bool creditMode = false; while (true) { cancelToken.ThrowIfCancellationRequested(); - int read = 0; - read = socket.Receive(cmdBuf); - if (read == 0) - break; - byte cmd = cmdBuf[0]; - switch (cmd) + //New credit-based flow control path + if(creditMode) { - case (byte)'K': - SendScopehalOld(socket, cancelToken); - break; - case (byte)'S': + CheckForACKs(logger, socket); + inflight = sequenceNumber - nack; + + //Figure out how many un-acked waveforms we have, block if >5 in flight + //TODO: tune this for latency/throughput tradeoff? + if(inflight >= 5) + continue; + + //Send data + else + { SendScopehal(socket, cancelToken); + logger.LogInformation($"Sending waveform (seq={sequenceNumber})"); + scpi.OnUpdateSequence(sequenceNumber); + } + } + + //Legacy path (plus entry to credit mode) + else + { + int read = 0; + read = socket.Receive(cmdBuf); + if (read == 0) break; - default: - break; + + byte cmd = cmdBuf[0]; + switch (cmd) + { + case (byte)'K': + SendScopehalOld(socket, cancelToken); + break; + case (byte)'S': + SendScopehal(socket, cancelToken); + break; + case (byte)'C': + creditMode = true; + break; + default: + break; + } } } }