Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/TS.NET.Engine/EngineManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public bool TryStart(string configurationFile, string calibrationFile, string de
switch (thunderscopeSettings.WaveformBufferReader)
{
case "DataServer":
DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer);
DataServer dataServer = new(loggerFactory.CreateLogger(nameof(DataServer)), thunderscopeSettings, System.Net.IPAddress.Any, 5026, captureBuffer, scpiServer);
waveformBufferReader = dataServer;
break;
case "None":
Expand Down
23 changes: 21 additions & 2 deletions source/TS.NET.Engine/Threads/ScpiServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class ScpiServer : TcpServer, IThread
private readonly BlockingRequestResponse<HardwareRequestDto, HardwareResponseDto> hardwareControl;
private readonly BlockingRequestResponse<ProcessingRequestDto, ProcessingResponseDto> processingControl;

private uint sequence = 0;

public ScpiServer(ILogger logger,
ThunderscopeSettings settings,
IPAddress address,
Expand Down Expand Up @@ -47,6 +49,16 @@ public void Start(SemaphoreSlim startSemaphore)
{
base.Stop();
}

public void OnUpdateSequence(uint seq)
{
sequence = seq;
}

public uint GetSequence()
{
return sequence;
}
}

internal class ScpiSession : TcpSession
Expand All @@ -55,9 +67,10 @@ internal class ScpiSession : TcpSession
private readonly BlockingRequestResponse<HardwareRequestDto, HardwareResponseDto> hardwareControl;
private readonly BlockingRequestResponse<ProcessingRequestDto, ProcessingResponseDto> processingControl;
private readonly ThunderscopeSettings settings;
private ScpiServer server;

public ScpiSession(
TcpServer server,
ScpiServer server,
ILogger logger,
ThunderscopeSettings settings,
BlockingRequestResponse<HardwareRequestDto, HardwareResponseDto> hardwareControl,
Expand All @@ -67,6 +80,7 @@ public ScpiSession(
this.settings = settings;
this.hardwareControl = hardwareControl;
this.processingControl = processingControl;
this.server = server;
}

protected override void OnConnected()
Expand Down Expand Up @@ -103,7 +117,7 @@ protected override void OnError(SocketError error)
logger.LogDebug($"SCPI session caught an error with code {error}");
}

public static string? ProcessSCPICommand(
public string? ProcessSCPICommand(
ILogger logger,
ThunderscopeSettings settings,
BlockingRequestResponse<HardwareRequestDto, HardwareResponseDto> hardwareControl,
Expand Down Expand Up @@ -629,6 +643,11 @@ protected override void OnError(SocketError error)
logger.LogError($"MODE? - No response from {nameof(processingControl.Response.Reader)}");
return "Error: No/bad response from channel.\n";
}
case "SEQNUM?":
{
uint seq = this.server.GetSequence();
return $"{seq}\n";
}
}
}
else if (subject?.StartsWith("ACQ") == true)
Expand Down
83 changes: 70 additions & 13 deletions source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal class DataServer : IThread
private readonly IPAddress address;
private readonly int port;
private readonly ICaptureBufferReader captureBuffer;
private readonly ScpiServer scpi;

private CancellationTokenSource? listenerCancelTokenSource;
private Task? taskListener;
Expand All @@ -26,13 +27,15 @@ public DataServer(
ThunderscopeSettings settings,
IPAddress address,
int port,
ICaptureBufferReader captureBuffer)
ICaptureBufferReader captureBuffer,
ScpiServer scpi)
{
this.logger = logger;
this.settings = settings;
this.address = address;
this.port = port;
this.captureBuffer = captureBuffer;
this.scpi = scpi;
}

public void Start(SemaphoreSlim startSemaphore)
Expand Down Expand Up @@ -99,31 +102,85 @@ private void LoopListener(ILogger logger, CancellationToken cancelToken)
}
}

private void CheckForACKs(ILogger logger, Socket socket)
{
//See if we have data ready to read. Grab the ACKs if so (may be >1 queued)
Span<byte> ack = stackalloc byte[4];
while(socket.Poll(1000, SelectMode.SelectRead))
{
//Get the ACK number (if we see one).
//TODO: this assumes all 4 bytes are always in the same TCP segment. Probably reasonable
//but for max robustness we'd want to handle partial acks somehow
int read = 0;
read = socket.Receive(ack);
if(read == 4)
{
nack = BitConverter.ToUInt32(ack);
inflight = sequenceNumber - nack;
logger.LogInformation($"Got ACK: {nack}, last sequenceNumber={sequenceNumber}, {inflight} in flight");
}
else
logger.LogInformation("TODO handle partial read");
}
}

private uint nack = 0;
private uint inflight = 0;
private void LoopSession(ILogger logger, Socket socket, CancellationToken cancelToken)
{
string sessionID = socket.RemoteEndPoint?.ToString() ?? "Unknown";

try
{
Span<byte> cmdBuf = stackalloc byte[1];
bool creditMode = false;
while (true)
{
cancelToken.ThrowIfCancellationRequested();
int read = 0;
read = socket.Receive(cmdBuf);
if (read == 0)
break;

byte cmd = cmdBuf[0];
switch (cmd)
//New credit-based flow control path
if(creditMode)
{
case (byte)'K':
SendScopehalOld(socket, cancelToken);
break;
case (byte)'S':
CheckForACKs(logger, socket);
inflight = sequenceNumber - nack;

//Figure out how many un-acked waveforms we have, block if >5 in flight
//TODO: tune this for latency/throughput tradeoff?
if(inflight >= 5)
continue;

//Send data
else
{
SendScopehal(socket, cancelToken);
logger.LogInformation($"Sending waveform (seq={sequenceNumber})");
scpi.OnUpdateSequence(sequenceNumber);
}
}

//Legacy path (plus entry to credit mode)
else
{
int read = 0;
read = socket.Receive(cmdBuf);
if (read == 0)
break;
default:
break;

byte cmd = cmdBuf[0];
switch (cmd)
{
case (byte)'K':
SendScopehalOld(socket, cancelToken);
break;
case (byte)'S':
SendScopehal(socket, cancelToken);
break;
case (byte)'C':
creditMode = true;
break;
default:
break;
}
}
}
}
Expand Down