diff --git a/CHANGELOG.md b/CHANGELOG.md index a3cc7ac..9d6e7b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,11 @@ TODO ### Fixed +- Fixed error handling for additional rounds. Cleanup. + [#32](https://github.com/netsec-ethz/scion-java-multiping/pull/32) + +### Fixed + - Fixed mangled output with mode SHOW_SCMP_ONLY [#31](https://github.com/netsec-ethz/scion-java-multiping/pull/31) - Fixed occurrence of ISD=0 / "AS not listed" diff --git a/src/main/java/org/scion/multiping/PingAll.java b/src/main/java/org/scion/multiping/PingAll.java index 3d4a45e..985dbbc 100644 --- a/src/main/java/org/scion/multiping/PingAll.java +++ b/src/main/java/org/scion/multiping/PingAll.java @@ -212,8 +212,13 @@ private void runAS(ParseAssignments.HostEntry remote) throws IOException { if (msgs[0] != null && bestPath.get() != null && REPEAT > 1) { try (ScionProvider.Sync sender = service.getSync()) { for (int i = 1; i < msgs.length; i++) { - List messages = sender.sendTracerouteRequest(bestPath.get()); - msgs[i] = messages.get(messages.size() - 1); + try { + List messages = sender.sendTracerouteRequest(bestPath.get()); + msgs[i] = messages.get(messages.size() - 1); + } catch (IOException e) { + msgs[i] = Scmp.TracerouteMessage.createEmpty(bestPath.get()); + msgs[i].setTimedOut(1_000_000_000); + } } } } @@ -284,7 +289,7 @@ private Scmp.TimedMessage findPaths(List paths, Ref bestOut, long is case FASTEST_TR: return findFastestTR(paths, bestOut, isdAs); case FASTEST_TR_ASYNC: - return findFastestTRasync(paths, bestOut, isdAs); + return findFastestTraceAsync(paths, bestOut, isdAs); case SHORTEST_TR: return findShortestTR(paths, bestOut, isdAs); case SHORTEST_ECHO: @@ -374,37 +379,9 @@ private Scmp.TracerouteMessage findFastestTR(List paths, Ref refBest } } - private Scmp.TracerouteMessage findFastestTRasync( + private Scmp.TracerouteMessage findFastestTraceAsync( List paths, Ref refBest, long isdAs) { - ConcurrentHashMap messages = new ConcurrentHashMap<>(); - CountDownLatch barrier = new CountDownLatch(paths.size()); - AtomicInteger errors = new AtomicInteger(); - ScmpSenderAsync.ResponseHandler handler = - new ScmpSenderAsync.ResponseHandler() { - @Override - public void onResponse(Scmp.TimedMessage msg) { - barrier.countDown(); - messages.put(msg.getSequenceNumber(), msg); - } - - @Override - public void onTimeout(Scmp.TimedMessage msg) { - barrier.countDown(); - messages.put(msg.getSequenceNumber(), msg); - } - - @Override - public void onError(Scmp.ErrorMessage msg) { - errors.incrementAndGet(); - barrier.countDown(); - } - - @Override - public void onException(Throwable t) { - errors.incrementAndGet(); - barrier.countDown(); - } - }; + PingResponseHandler handler = new PingResponseHandler(paths.size()); // Send all requests try (ScionProvider.Async sender = service.getAsync(handler)) { @@ -414,26 +391,20 @@ public void onException(Throwable t) { } // Wait for all messages to be received, BEFORE closing the "sender". - if (!barrier.await(1100, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException( - "Missing messages: " + barrier.getCount() + "/" + paths.size()); - } + handler.await(); } catch (IOException e) { println("ERROR: " + e.getMessage()); summary.incAsError(isdAs); return null; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); } - if (errors.get() > 0 && messages.isEmpty()) { + if (handler.hasErrors() && handler.messages.isEmpty()) { summary.incAsError(isdAs); return null; } Scmp.TracerouteMessage best = null; - for (Scmp.TimedMessage tm : messages.values()) { + for (Scmp.TimedMessage tm : handler.messages.values()) { Scmp.TracerouteMessage msg = (Scmp.TracerouteMessage) tm; summary.checkTotalMax(msg.getIsdAs(), msg); @@ -455,4 +426,55 @@ public void onException(Throwable t) { } return best; } + + private static class PingResponseHandler implements ScmpSenderAsync.ResponseHandler { + private final Map messages = new ConcurrentHashMap<>(); + private final CountDownLatch barrier; + private final AtomicInteger errors = new AtomicInteger(); + private final int nPaths; + + private PingResponseHandler(int nPaths) { + this.nPaths = nPaths; + barrier = new CountDownLatch(nPaths); + } + + @Override + public void onResponse(Scmp.TimedMessage msg) { + barrier.countDown(); + messages.put(msg.getSequenceNumber(), msg); + } + + @Override + public void onTimeout(Scmp.TimedMessage msg) { + barrier.countDown(); + messages.put(msg.getSequenceNumber(), msg); + } + + @Override + public void onError(Scmp.ErrorMessage msg) { + errors.incrementAndGet(); + barrier.countDown(); + } + + @Override + public void onException(Throwable t) { + errors.incrementAndGet(); + barrier.countDown(); + } + + void await() { + try { + if (!barrier.await(1100, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Missing messages: " + barrier.getCount() + "/" + nPaths); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + public boolean hasErrors() { + return errors.get() > 0; + } + } }