From 5514d48c4a0a1d5a926283000d817ee22f17e6a1 Mon Sep 17 00:00:00 2001 From: Angel Hudgins Date: Thu, 22 Jan 2026 16:22:40 +0100 Subject: [PATCH 1/2] fix!: MeasureManyStream now properly terminates The stream now returns None when all ping requests have been sent and all responses received. Previously, poll_next_unpin would return Poll::Pending forever after processing all results, causing while let loops to hang indefinitely. Changes: - poll_next_unpin now returns Poll> instead of Poll<...> - Added completion check: when send_queue is empty AND in_flight is empty, return Poll::Ready(None) - Updated Stream impl to pass through the Option directly - Added v4_done/v6_done flags to DualstackMeasureManyStream to track completion state of both underlying streams - Removed unused task::ready imports Closes #3 --- src/lib.rs | 33 ++++++++++++++++++++++++--------- src/pinger.rs | 20 ++++++++++++-------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0462e56..33b4a9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,8 @@ rustdoc::broken_intra_doc_links )] +#[cfg(feature = "stream")] +use std::pin::Pin; use std::{ io, marker::PhantomData, @@ -30,8 +32,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -#[cfg(feature = "stream")] -use std::{pin::Pin, task::ready}; #[cfg(feature = "stream")] use futures_core::Stream; @@ -88,6 +88,8 @@ impl DualstackPinger { DualstackMeasureManyStream { v4: self.v4.measure_many(addresses_v4), v6: self.v6.measure_many(addresses_v6), + v4_done: false, + v6_done: false, } } } @@ -105,16 +107,30 @@ impl DualstackPinger { pub struct DualstackMeasureManyStream<'a, I: Iterator> { v4: MeasureManyStream<'a, Ipv4Addr, FilterIpAddr>, v6: MeasureManyStream<'a, Ipv6Addr, FilterIpAddr>, + v4_done: bool, + v6_done: bool, } impl> DualstackMeasureManyStream<'_, I> { - pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(IpAddr, Duration)> { - if let Poll::Ready((v4, rtt)) = self.v4.poll_next_unpin(cx) { - return Poll::Ready((IpAddr::V4(v4), rtt)); + pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll> { + if !self.v4_done { + match self.v4.poll_next_unpin(cx) { + Poll::Ready(Some((v4, rtt))) => return Poll::Ready(Some((IpAddr::V4(v4), rtt))), + Poll::Ready(None) => self.v4_done = true, + Poll::Pending => {} + } + } + + if !self.v6_done { + match self.v6.poll_next_unpin(cx) { + Poll::Ready(Some((v6, rtt))) => return Poll::Ready(Some((IpAddr::V6(v6), rtt))), + Poll::Ready(None) => self.v6_done = true, + Poll::Pending => {} + } } - if let Poll::Ready((v6, rtt)) = self.v6.poll_next_unpin(cx) { - return Poll::Ready((IpAddr::V6(v6), rtt)); + if self.v4_done && self.v6_done { + return Poll::Ready(None); } Poll::Pending @@ -126,8 +142,7 @@ impl + Unpin> Stream for DualstackMeasureManyStream<' type Item = (IpAddr, Duration); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let result = ready!(self.as_mut().poll_next_unpin(cx)); - Poll::Ready(Some(result)) + self.as_mut().poll_next_unpin(cx) } } diff --git a/src/pinger.rs b/src/pinger.rs index 8ced061..670b80d 100644 --- a/src/pinger.rs +++ b/src/pinger.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "stream")] +use std::pin::Pin; use std::{ collections::HashMap, future::poll_fn, @@ -11,8 +13,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -#[cfg(feature = "stream")] -use std::{pin::Pin, task::ready}; use bytes::BytesMut; #[cfg(feature = "stream")] @@ -210,15 +210,20 @@ pub struct MeasureManyStream<'a, V: IpVersion, I: Iterator> { } impl> MeasureManyStream<'_, V, I> { - pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(V, Duration)> { - // Try to see if another `MeasureManyStream` got it - if let Poll::Ready(Some((addr, rtt))) = self.poll_next_from_different_round(cx) { - return Poll::Ready((addr, rtt)); + pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll> { + // Try to receive a response (may be from a different round) + if let Poll::Ready(maybe_reply) = self.poll_next_from_different_round(cx) { + return Poll::Ready(maybe_reply); } // Try to send ICMP echo requests self.poll_next_icmp_replies(cx); + // Check if we're done: no more addresses to send AND no responses pending + if self.send_queue.peek().is_none() && self.in_flight.is_empty() { + return Poll::Ready(None); + } + Poll::Pending } @@ -269,8 +274,7 @@ impl + Unpin> Stream for MeasureManyStream<' type Item = (V, Duration); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let result = ready!(self.as_mut().poll_next_unpin(cx)); - Poll::Ready(Some(result)) + self.as_mut().poll_next_unpin(cx) } } From 87895f3eb16f3ad561af6ccc187e9f89236c720e Mon Sep 17 00:00:00 2001 From: Angel Hudgins Date: Thu, 22 Jan 2026 16:22:58 +0100 Subject: [PATCH 2/2] test: add regression tests for stream termination Tests verify that MeasureManyStream properly returns None when: - Single ping completes - Multiple pings to different addresses complete - Empty address list is provided Tests run on both current_thread and multi_thread runtimes. --- tests/stream_termination.rs | 116 ++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 tests/stream_termination.rs diff --git a/tests/stream_termination.rs b/tests/stream_termination.rs new file mode 100644 index 0000000..60a4725 --- /dev/null +++ b/tests/stream_termination.rs @@ -0,0 +1,116 @@ +//! Regression tests for stream termination. +//! +//! The `MeasureManyStream` must properly terminate (return `None`) when all +//! ping requests have been sent and all responses have been received. Without +//! this, `while let Some(...) = stream.next().await` loops hang forever. + +use std::{net::IpAddr, time::Duration}; + +use futures_util::StreamExt; +use massping::DualstackPinger; +use tokio::time; + +/// Test that the stream properly terminates after receiving all responses. +/// +/// This is a regression test for the bug where `poll_next_unpin` always +/// returned `Poll::Pending` after processing all results, causing +/// `while let` loops to hang indefinitely. +#[tokio::test(flavor = "current_thread")] +async fn stream_terminates_after_single_ping() { + let pinger = DualstackPinger::new().unwrap(); + let localhost: IpAddr = "127.0.0.1".parse().unwrap(); + let mut stream = pinger.measure_many([localhost].into_iter()); + + let mut count = 0; + + // This should complete - not hang forever + let result = time::timeout(Duration::from_secs(5), async { + while let Some((addr, rtt)) = stream.next().await { + assert_eq!(addr, localhost); + assert!(rtt < Duration::from_secs(1), "RTT too high: {rtt:?}"); + count += 1; + } + }) + .await; + + assert!( + result.is_ok(), + "stream did not terminate - hung in while let loop" + ); + assert_eq!(count, 1, "expected exactly 1 ping response"); +} + +/// Test that the stream properly terminates after receiving multiple responses. +/// +/// Note: We use different addresses because `in_flight` is keyed by address, +/// so pinging the same address multiple times in one `measure_many` call +/// would overwrite the previous entry. +#[tokio::test(flavor = "current_thread")] +async fn stream_terminates_after_multiple_pings() { + let pinger = DualstackPinger::new().unwrap(); + + // Use different loopback addresses (127.0.0.x all route to localhost) + let addresses: Vec = vec![ + "127.0.0.1".parse().unwrap(), + "127.0.0.2".parse().unwrap(), + "127.0.0.3".parse().unwrap(), + ]; + let mut stream = pinger.measure_many(addresses.iter().copied()); + + let mut count = 0; + + let result = time::timeout(Duration::from_secs(5), async { + while let Some((_addr, rtt)) = stream.next().await { + assert!(rtt < Duration::from_secs(1), "RTT too high: {rtt:?}"); + count += 1; + } + }) + .await; + + assert!( + result.is_ok(), + "stream did not terminate - hung in while let loop" + ); + assert_eq!(count, 3, "expected exactly 3 ping responses"); +} + +/// Test that an empty address list terminates immediately. +#[tokio::test(flavor = "current_thread")] +async fn stream_terminates_with_empty_input() { + let pinger = DualstackPinger::new().unwrap(); + let addresses: Vec = vec![]; + let mut stream = pinger.measure_many(addresses.into_iter()); + + let result = time::timeout(Duration::from_secs(1), async { + let first = stream.next().await; + assert!(first.is_none(), "expected None for empty address list"); + }) + .await; + + assert!(result.is_ok(), "stream did not terminate for empty input"); +} + +/// Test stream termination with multi_thread runtime as a baseline. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn stream_terminates_multi_thread() { + let pinger = DualstackPinger::new().unwrap(); + let localhost: IpAddr = "127.0.0.1".parse().unwrap(); + let mut stream = pinger.measure_many([localhost].into_iter()); + + let mut count = 0; + + let result = time::timeout(Duration::from_secs(5), async { + while let Some((addr, rtt)) = stream.next().await { + assert_eq!(addr, localhost); + assert!(rtt < Duration::from_secs(1), "RTT too high: {rtt:?}"); + count += 1; + } + }) + .await; + + assert!( + result.is_ok(), + "stream did not terminate - hung in while let loop" + ); + assert_eq!(count, 1, "expected exactly 1 ping response"); +}