Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ TODO

### Fixed

- Fixed error handling for additional rounds. Cleanup.
[#32](https://github.com/netsec-ethz/scion-java-multiping/pull/32)

### Fixed

- Fixed mangled output with mode SHOW_SCMP_ONLY
[#31](https://github.com/netsec-ethz/scion-java-multiping/pull/31)
- Fixed occurrence of ISD=0 / "AS not listed"
Expand Down
106 changes: 64 additions & 42 deletions src/main/java/org/scion/multiping/PingAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,13 @@ private void runAS(ParseAssignments.HostEntry remote) throws IOException {
if (msgs[0] != null && bestPath.get() != null && REPEAT > 1) {
try (ScionProvider.Sync sender = service.getSync()) {
for (int i = 1; i < msgs.length; i++) {
List<Scmp.TracerouteMessage> messages = sender.sendTracerouteRequest(bestPath.get());
msgs[i] = messages.get(messages.size() - 1);
try {
List<Scmp.TracerouteMessage> messages = sender.sendTracerouteRequest(bestPath.get());
msgs[i] = messages.get(messages.size() - 1);
} catch (IOException e) {
msgs[i] = Scmp.TracerouteMessage.createEmpty(bestPath.get());
msgs[i].setTimedOut(1_000_000_000);
}
}
}
}
Expand Down Expand Up @@ -284,7 +289,7 @@ private Scmp.TimedMessage findPaths(List<Path> paths, Ref<Path> bestOut, long is
case FASTEST_TR:
return findFastestTR(paths, bestOut, isdAs);
case FASTEST_TR_ASYNC:
return findFastestTRasync(paths, bestOut, isdAs);
return findFastestTraceAsync(paths, bestOut, isdAs);
case SHORTEST_TR:
return findShortestTR(paths, bestOut, isdAs);
case SHORTEST_ECHO:
Expand Down Expand Up @@ -374,37 +379,9 @@ private Scmp.TracerouteMessage findFastestTR(List<Path> paths, Ref<Path> refBest
}
}

private Scmp.TracerouteMessage findFastestTRasync(
private Scmp.TracerouteMessage findFastestTraceAsync(
List<Path> paths, Ref<Path> refBest, long isdAs) {
ConcurrentHashMap<Integer, Scmp.TimedMessage> messages = new ConcurrentHashMap<>();
CountDownLatch barrier = new CountDownLatch(paths.size());
AtomicInteger errors = new AtomicInteger();
ScmpSenderAsync.ResponseHandler handler =
new ScmpSenderAsync.ResponseHandler() {
@Override
public void onResponse(Scmp.TimedMessage msg) {
barrier.countDown();
messages.put(msg.getSequenceNumber(), msg);
}

@Override
public void onTimeout(Scmp.TimedMessage msg) {
barrier.countDown();
messages.put(msg.getSequenceNumber(), msg);
}

@Override
public void onError(Scmp.ErrorMessage msg) {
errors.incrementAndGet();
barrier.countDown();
}

@Override
public void onException(Throwable t) {
errors.incrementAndGet();
barrier.countDown();
}
};
PingResponseHandler handler = new PingResponseHandler(paths.size());

// Send all requests
try (ScionProvider.Async sender = service.getAsync(handler)) {
Expand All @@ -414,26 +391,20 @@ public void onException(Throwable t) {
}

// Wait for all messages to be received, BEFORE closing the "sender".
if (!barrier.await(1100, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException(
"Missing messages: " + barrier.getCount() + "/" + paths.size());
}
handler.await();
} catch (IOException e) {
println("ERROR: " + e.getMessage());
summary.incAsError(isdAs);
return null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}

if (errors.get() > 0 && messages.isEmpty()) {
if (handler.hasErrors() && handler.messages.isEmpty()) {
summary.incAsError(isdAs);
return null;
}

Scmp.TracerouteMessage best = null;
for (Scmp.TimedMessage tm : messages.values()) {
for (Scmp.TimedMessage tm : handler.messages.values()) {
Scmp.TracerouteMessage msg = (Scmp.TracerouteMessage) tm;
summary.checkTotalMax(msg.getIsdAs(), msg);

Expand All @@ -455,4 +426,55 @@ public void onException(Throwable t) {
}
return best;
}

private static class PingResponseHandler implements ScmpSenderAsync.ResponseHandler {
private final Map<Integer, Scmp.TimedMessage> messages = new ConcurrentHashMap<>();
private final CountDownLatch barrier;
private final AtomicInteger errors = new AtomicInteger();
private final int nPaths;

private PingResponseHandler(int nPaths) {
this.nPaths = nPaths;
barrier = new CountDownLatch(nPaths);
}

@Override
public void onResponse(Scmp.TimedMessage msg) {
barrier.countDown();
messages.put(msg.getSequenceNumber(), msg);
}

@Override
public void onTimeout(Scmp.TimedMessage msg) {
barrier.countDown();
messages.put(msg.getSequenceNumber(), msg);
}

@Override
public void onError(Scmp.ErrorMessage msg) {
errors.incrementAndGet();
barrier.countDown();
}

@Override
public void onException(Throwable t) {
errors.incrementAndGet();
barrier.countDown();
}

void await() {
try {
if (!barrier.await(1100, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Missing messages: " + barrier.getCount() + "/" + nPaths);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

public boolean hasErrors() {
return errors.get() > 0;
}
}
}