From c1b9e540e1fbf05bd8f7daa0bb14c65c5ea84542 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Thu, 22 Jan 2026 02:15:27 -0500 Subject: [PATCH 01/16] Adds a separate JobQueue to Loop It is useful to be able to have a job queue separate from the Loop's main job queue so that jobs depending on something which will be destroyed can be cancelled without affecting other classes using the Loop. For example, A and B both use the Loop and B will be destroyed, with its destructor happening on the Loop. If some other class can queue jobs which reference B (or any of its members) *after* B's destructor is queued, but before its destructor finishes (and those other classes are informed of its destruction), those jobs would execute after B's destructor and segfault. Instead, if B owns a JobQueue and jobs which would reference it are queued onto *that* queue, then when B is destroyed, so is that queue, but any jobs in the Loop's main queue (or any other separate JobQueues) are unaffected. --- include/oxen/quic/btstream.hpp | 1 + include/oxen/quic/datagram.hpp | 1 + include/oxen/quic/endpoint.hpp | 1 + include/oxen/quic/loop.hpp | 175 +++++++++++++++++++++++++++++---- include/oxen/quic/stream.hpp | 1 + src/loop.cpp | 80 +++++++++++---- 6 files changed, 223 insertions(+), 36 deletions(-) diff --git a/include/oxen/quic/btstream.hpp b/include/oxen/quic/btstream.hpp index 30ac7d5c..932766d3 100644 --- a/include/oxen/quic/btstream.hpp +++ b/include/oxen/quic/btstream.hpp @@ -247,6 +247,7 @@ namespace oxen::quic friend struct sent_request; friend class Network; friend class Loop; + friend class JobQueue; protected: template diff --git a/include/oxen/quic/datagram.hpp b/include/oxen/quic/datagram.hpp index 98547a23..789a14a2 100644 --- a/include/oxen/quic/datagram.hpp +++ b/include/oxen/quic/datagram.hpp @@ -329,6 +329,7 @@ namespace oxen::quic protected: friend class Connection; friend class Loop; + friend class JobQueue; friend struct dgram::rotating_buffer; friend class TestHelper; 
diff --git a/include/oxen/quic/endpoint.hpp b/include/oxen/quic/endpoint.hpp index ad55fb4a..5ca08507 100644 --- a/include/oxen/quic/endpoint.hpp +++ b/include/oxen/quic/endpoint.hpp @@ -159,6 +159,7 @@ namespace oxen::quic private: friend class Network; friend class Loop; + friend class JobQueue; friend class Connection; friend struct connection_callbacks; friend class TestHelper; diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 20ce8a3b..ea3851a9 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -19,6 +19,7 @@ namespace oxen::quic struct Ticker { friend class Loop; + friend class JobQueue; private: event_ptr ev; @@ -59,6 +60,7 @@ namespace oxen::quic class Wakeable { friend class Loop; + friend class JobQueue; event_ptr ev; std::function f; @@ -70,21 +72,22 @@ namespace oxen::quic void wake(); }; - class Loop + class JobQueue { - protected: - std::unique_ptr<::event_base, void (*)(struct ::event_base*)> ev_loop; - std::thread loop_thread; - std::thread::id loop_thread_id; + friend class Loop; + + std::shared_ptr running{std::make_shared(true)}; event_ptr job_waker; std::queue job_queue; std::mutex job_queue_mutex; + Loop& loop; + void add_oneshot_event(std::chrono::microseconds delay, std::function hook); - private: std::list> tickers; + std::list> wakeables; std::shared_ptr make_ticker(); @@ -94,19 +97,17 @@ namespace oxen::quic struct OneShotDelayed; std::list delayed_events; - public: - Loop(); + void setup_job_waker(); + void process_job_queue(); - Loop(const Loop&) = delete; - Loop(Loop&&) = delete; - Loop& operator=(Loop&&) = delete; - Loop& operator=(Loop) = delete; + bool inside() const; - virtual ~Loop(); + ::event_base* get_event_base() const; - ::event_base* get_event_base() const { return ev_loop.get(); } + public: + JobQueue(Loop& l); - bool inside() const { return std::this_thread::get_id() == loop_thread_id; } + ~JobQueue(); // Returns a pointer deleter that defers the actual destruction call to 
this network // object's event loop. @@ -269,10 +270,150 @@ namespace oxen::quic { call_soon([ptr = std::move(ptr)]() mutable { ptr.reset(); }); } + }; + + class Loop + { + friend class JobQueue; + + protected: + std::unique_ptr<::event_base, void (*)(struct ::event_base*)> ev_loop; + std::thread loop_thread; + std::thread::id loop_thread_id; private: - void setup_job_waker(); + std::unique_ptr main_queue; - void process_job_queue(); + std::unique_ptr create_job_queue() { return std::make_unique(*this); } + + public: + Loop(); + + Loop(const Loop&) = delete; + Loop(Loop&&) = delete; + Loop& operator=(Loop&&) = delete; + Loop& operator=(Loop) = delete; + + virtual ~Loop(); + + ::event_base* get_event_base() const { return ev_loop.get(); } + + // Get an independent JobQueue to use for jobs, call_later, loop deleters, etc. + // + // Do not use this unless you know you need it. + // + // The interface is the same as `Loop::call` et al, but when this JobQueue is + // destroyed the Loop can live on. The purpose of this is if you have multiple + // components using the same Loop and one of those components may have jobs queued + // which reference it *after* its destructor, that component can instead own this + // JobQueue and those jobs will not be processed, but anything else using the Loop + // will be unaffected. + // + // The owner of this JobQueue is responsible for making sure that the JobQueue is + // deleted on the loop thread. The easiest way to do this is to make sure the owning + // object is deleted on the loop thread. + std::unique_ptr make_job_queue(); + + bool inside() const { return std::this_thread::get_id() == loop_thread_id; } + + // FIXME: this *may* be superfluous with the addition of JobQueue, but since it's + // public I'm not sure if we've used it outside this class... 
+ // Returns a pointer deleter that defers invocation of a custom deleter to the event loop + template Callable> + auto wrapped_deleter(Callable f) + { + return main_queue->wrapped_deleter(std::move(f)); + } + + // Similar in concept to std::make_shared, but it creates the shared pointer with a + // custom deleter that dispatches actual object destruction to the network's event loop for + // thread safety, and waits for destruction of the overlying object to complete before + // returning. + template + std::shared_ptr make_shared(Args&&... args) + { + return main_queue->make_shared(std::forward(args)...); + } + + // Similar to the above make_shared, but instead of forwarding arguments for the + // construction of the object, it creates the shared_ptr from the already created object ptr + // and wraps the object's deleter in a wrapped_deleter + template Callable> + std::shared_ptr shared_ptr(T* obj, Callable&& deleter) + { + return main_queue->shared_ptr(obj, std::move(deleter)); + } + + /// Calls `f()` on the event loop. If the caller is already in the event loop thread then + /// f() is called immediately; otherwise it is scheduled on the event loop thread at the + /// next available opportunity. + template Callable> + void call(Callable&& f) + { + return main_queue->call(std::move(f)); + } + + // Calls `f()` on the event loop and returns its value. If this is called from within the + // event loop thread then this simply calls and returns the result of `f()`. If *not* in + // the event loop then a call to `f()` is scheduled on the event loop for the next available + // opportunity and then the current thread blocks until that call is invoked, then returns + // it back to the caller. + template ()())> + Ret call_get(Callable&& f) + { + return main_queue->call_get(std::move(f)); + } + + /// Sets up a task `f()` to be called on the event loop periodically. + /// + /// `interval` controls the interval on which the task will be called. 
+ /// + /// `start_immediately` controls whether the task is scheduled on the event loop right away + /// (true, the default), or not (false). If not started immediately then the task will not + /// fire until `start()` is called on it. (Note that this parameter does not mean "call + /// immediately" -- it simply controls whether the initial timer for the first call is + /// started or not). + /// + /// The ticker will remain active as long as the loop remains active and the returned Ticker + /// object is kept alive. + template Callable> + [[nodiscard]] std::shared_ptr call_every( + std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) + { + return main_queue->call_every(interval, std::move(f), start_immediately); + } + + /// Schedules a call of `f()` on the event loop after a delay. + template Callable> + void call_later(std::chrono::microseconds delay, Callable hook) + { + main_queue->call_later(delay, std::move(hook)); + } + + /// Creates a Wakeable event tied to this event loop that can be manually triggered when + /// desired to schedule an invocation of the callback. Unlike call_soon, this is idempotent + /// (i.e. multiple wakeups before it actually runs do not schedule multiple calls). Note + /// that this call only constructs the event, but does not initially schedule it. + std::shared_ptr make_wakeable(std::function hook) + { + return main_queue->make_wakeable(std::move(hook)); + } + + /// Schedules a call of `f()` at the next available opportunity on the event loop. Unlike + /// `call()`, `call_soon()` never calls f() immediately even if already inside the event + /// loop. + template Callable> + void call_soon(Callable f) + { + main_queue->call_soon(std::move(f)); + } + + /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the event + /// loop. Asynchronous. 
+ template + void reset_soon(std::shared_ptr&& ptr) + { + call_soon([ptr = std::move(ptr)]() mutable { ptr.reset(); }); + } }; } // namespace oxen::quic diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 129f006f..ece2e180 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -67,6 +67,7 @@ namespace oxen::quic friend class Connection; friend class Network; friend class Loop; + friend class JobQueue; protected: template diff --git a/src/loop.cpp b/src/loop.cpp index a9aaeb2b..cf91d787 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -147,10 +147,10 @@ namespace oxen::quic log::debug(log_cat, "Started libevent loop with backend {}", event_base_get_method(ev_loop.get())); - setup_job_waker(); - std::promise p; + main_queue = create_job_queue(); + loop_thread = std::thread{[this, &p] { log::debug(log_cat, "Starting event loop run"); p.set_value(); @@ -164,17 +164,31 @@ namespace oxen::quic log::info(log_cat, "libevent loop is started"); } - struct Loop::OneShotDelayed + std::unique_ptr Loop::make_job_queue() { - Loop& loop; + return main_queue->call_get([this]() { return create_job_queue(); }); + } + + struct JobQueue::OneShotDelayed + { + JobQueue& jq; std::function f; - OneShotDelayed(Loop& loop, std::function f) : loop{loop}, f{std::move(f)} {} + OneShotDelayed(JobQueue& jq_, std::function f) : jq{jq_}, f{std::move(f)} {} }; - Loop::~Loop() + JobQueue::JobQueue(Loop& l) : loop{l} { - log::debug(log_cat, "Shutting down loop..."); + setup_job_waker(); + } + + JobQueue::~JobQueue() + { + log::debug(log_cat, "Destroying loop job queue."); + *running = false; + + [[maybe_unused]] auto res = event_del(job_waker.get()); + assert(res == 0); for (auto& t : tickers) { @@ -188,6 +202,15 @@ namespace oxen::quic for (auto* osd : delayed_events) delete osd; delayed_events.clear(); + } + + Loop::~Loop() + { + log::debug(log_cat, "Shutting down loop..."); + + // JobQueue has a canary such that if it's processing jobs as it is 
destroyed + // it should be safe, but we *do* want to stop/destroy it on the loop thread. + main_queue->call_get([this]() { main_queue.reset(); }); event_base_loopbreak(ev_loop.get()); loop_thread.join(); @@ -199,7 +222,7 @@ namespace oxen::quic #endif } - std::shared_ptr Loop::make_ticker() + std::shared_ptr JobQueue::make_ticker() { std::erase_if(tickers, [](auto& wp) { return wp.expired(); }); auto t = make_shared(); @@ -207,12 +230,12 @@ namespace oxen::quic return t; } - std::shared_ptr Loop::make_wakeable(std::function callback) + std::shared_ptr JobQueue::make_wakeable(std::function callback) { auto w = make_shared(); w->f = std::move(callback); w->ev.reset(event_new( - ev_loop.get(), + loop.ev_loop.get(), -1, 0, [](evutil_socket_t, short, void* w) { @@ -221,45 +244,49 @@ namespace oxen::quic wakeable->f(); }, w.get())); + wakeables.emplace_back(w); return w; } void Wakeable::wake() { + if (!ev) + return; + event_active(ev.get(), 0, 0); } - void Loop::setup_job_waker() + void JobQueue::setup_job_waker() { // Almost identical to the generic make_wakeable, except that we avoid the std::function and // its implicit virtual function call. 
job_waker.reset(event_new( - ev_loop.get(), + loop.ev_loop.get(), -1, 0, [](evutil_socket_t, short, void* self) { log::trace(log_cat, "processing job queue"); - static_cast(self)->process_job_queue(); + static_cast(self)->process_job_queue(); }, this)); assert(job_waker); } - void Loop::add_oneshot_event(std::chrono::microseconds delay, std::function hook) + void JobQueue::add_oneshot_event(std::chrono::microseconds delay, std::function hook) { auto* handler = new OneShotDelayed{*this, std::move(hook)}; delayed_events.push_back(handler); auto& h = *handler; const auto delay_tv = loop_time_to_timeval(delay); event_base_once( - get_event_base(), + loop.get_event_base(), -1, EV_TIMEOUT, [](evutil_socket_t, short, void* e) mutable { auto* h = static_cast(e); if (h->f) h->f(); - auto& de = h->loop.delayed_events; + auto& de = h->jq.delayed_events; if (auto it = std::find(de.begin(), de.end(), h); it != de.end()) de.erase(it); delete h; @@ -268,7 +295,7 @@ namespace oxen::quic &delay_tv); } - void Loop::process_job_queue() + void JobQueue::process_job_queue() { log::trace(log_cat, "Event loop processing job queue"); assert(inside()); @@ -280,7 +307,12 @@ namespace oxen::quic job_queue.swap(swapped_queue); } - while (not swapped_queue.empty()) + // copy shared_ptr as a "running" canary, as this object's destructor + // should eventually be one of the queued jobs, after which no further jobs + // should run. + auto running_ptr = running; + + while (not swapped_queue.empty() && *running_ptr) { auto job = swapped_queue.front(); swapped_queue.pop(); @@ -288,8 +320,18 @@ namespace oxen::quic } } + bool JobQueue::inside() const + { + return loop.inside(); + } + + ::event_base* JobQueue::get_event_base() const + { + return loop.get_event_base(); + } + // Wrapper around event_active so that we can keep libevent out of the public headers. 
- void Loop::activate(::event& evt) + void JobQueue::activate(::event& evt) { event_active(&evt, 0, 0); } From 591830a2d26643ed8e5fd085edae2d474dfa891d Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Thu, 22 Jan 2026 23:58:55 -0500 Subject: [PATCH 02/16] Add test for JobQueue Adds a test that jobs queued onto a JobQueue from a Loop after that JobQueue is destroyed indeed do not execute, and that jobs queued onto that loop's main JobQueue which would execute *after* that job *do* execute. --- tests/CMakeLists.txt | 1 + tests/job-queue.cpp | 66 ++++++++++++++++++++++++++++++++++++++++++++ tests/utils.hpp | 5 ++++ 3 files changed, 72 insertions(+) create mode 100644 tests/job-queue.cpp diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 83c7e8ed..a2c1079a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,6 +51,7 @@ if(LIBQUIC_BUILD_TESTS) 014-0rtt.cpp 015-bt-encoding.cpp 016-stateless_reset.cpp + job-queue.cpp case_logger.cpp ) diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp new file mode 100644 index 00000000..f7679b47 --- /dev/null +++ b/tests/job-queue.cpp @@ -0,0 +1,66 @@ +#include + +#include "unit_test.hpp" +#include "utils.hpp" + +#ifndef _WIN32 +extern "C" +{ +#include +} +#endif + +namespace oxen::quic::test +{ + using namespace std::literals; + + TEST_CASE("JobQueue - One extra queue", "[jobqueue]") + { + SECTION("Extra queue jobs go away, others do not.") + { + Loop loop; + auto jq = loop.make_job_queue(); + + callback_waiter queued{[](){}}; + callback_waiter good{[](){}}; + callback_waiter bad{[](){}}; + callback_waiter main_ok{[](){}}; + + // queue 3 jobs: + // + // one should run then queue a job onto the main job queue, which + // should later execute without issue + // + // the next should destroy the created job queue + // + // the third should not execute + loop.call([&](){ + jq->call_soon([&]() { + good.call(); + }); + + jq->call_soon([&]() { + jq.reset(); + }); + + jq->call_soon([&]() { + bad.call(); + }); + + 
// call_soon so it gets queued, as it is being called from inside the loop. + loop.call_soon([&](){ + main_ok.call(); + }); + + queued.call(); + }); + + REQUIRE(queued.wait(10ms)); + + REQUIRE(good.wait(10ms)); + REQUIRE_FALSE(bad.wait(10ms)); + REQUIRE(main_ok.wait(10ms)); + } + } + +} // namespace oxen::quic::test diff --git a/tests/utils.hpp b/tests/utils.hpp index e1fcfd40..00b93ece 100644 --- a/tests/utils.hpp +++ b/tests/utils.hpp @@ -266,6 +266,11 @@ namespace oxen::quic return func(std::forward(args)...); }; } + + void call() + { + this->operator Func_t()(); + } }; /// Waits for some condition to be satisfied, sleeping between checks. Returns the result of From 56b77756bd5cfcd99d208afe3926dc0324c307d4 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Fri, 23 Jan 2026 02:05:59 -0500 Subject: [PATCH 03/16] add tests for multiple loop job queues Adds some tests for the event queue behaving correctly when multiple job queues are created/destroyed -- in this case, "ticker" tests were added. There are also bug fixes related to said tickers, exposed by said tests. --- include/oxen/quic/loop.hpp | 13 ++++--- src/loop.cpp | 21 ++++++------ tests/job-queue.cpp | 70 ++++++++++++++++++++++++++------------ tests/utils.hpp | 5 +-- 4 files changed, 68 insertions(+), 41 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index ea3851a9..8905f2c3 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -25,15 +25,14 @@ namespace oxen::quic event_ptr ev; timeval interval; std::function f; + std::shared_ptr alive; void init_event( ::event_base* loop, std::chrono::microseconds _t, std::function task, bool start_immediately = true); - Ticker() = default; + Ticker(std::shared_ptr keepalive) : alive{keepalive} {} public: - ~Ticker(); - /** Starts the repeating event on the given interval on Ticker creation. Does nothing if * already active. 
Returns: @@ -64,8 +63,9 @@ namespace oxen::quic event_ptr ev; std::function f; + std::shared_ptr alive; - Wakeable() = default; + Wakeable(std::shared_ptr keepalive) : alive{keepalive} {} public: /// Call to schedule f() to be called, if not already scheduled. @@ -312,6 +312,11 @@ namespace oxen::quic // The owner of this JobQueue is responsible for making sure that the JobQueue is // deleted on the loop thread. The easiest way to do this is to make sure the owning // object is deleted on the loop thread. + // + // If you keep objects alive which you created with this queue, e.g. tickers, wakeables, + // etc. you are responsible for making sure any concrete references to them (especially + // shared_ptr) are gone before the JobQueue is. Their destructors are (necessarily and + // intentionally) jobs on the job queue from which they spawned. std::unique_ptr make_job_queue(); bool inside() const { return std::this_thread::get_id() == loop_thread_id; } diff --git a/src/loop.cpp b/src/loop.cpp index cf91d787..4d42e1ba 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -46,12 +46,14 @@ namespace oxen::quic bool Ticker::stop() { - if (event_del(ev.get()) != 0) + if (!alive) + return true; + + if (ev && event_del(ev.get()) != 0) { log::warning(log_cat, "EventHandler failed to pause repeating event!"); return false; } - return true; } @@ -89,12 +91,6 @@ namespace oxen::quic log::warning(log_cat, "Failed to immediately start one-off event!"); } - Ticker::~Ticker() - { - ev.reset(); - f = nullptr; - } - static std::vector get_ev_methods() { std::vector ev_methods_avail; @@ -224,15 +220,18 @@ namespace oxen::quic std::shared_ptr JobQueue::make_ticker() { + if (!running) + return nullptr; + std::erase_if(tickers, [](auto& wp) { return wp.expired(); }); - auto t = make_shared(); + auto t = make_shared(running); tickers.emplace_back(t); return t; } std::shared_ptr JobQueue::make_wakeable(std::function callback) { - auto w = make_shared(); + auto w = make_shared(running); w->f = 
std::move(callback); w->ev.reset(event_new( loop.ev_loop.get(), @@ -250,7 +249,7 @@ namespace oxen::quic void Wakeable::wake() { - if (!ev) + if (!ev || !alive) return; event_active(ev.get(), 0, 0); diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index f7679b47..5595ff8a 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -1,8 +1,8 @@ -#include - #include "unit_test.hpp" #include "utils.hpp" +#include + #ifndef _WIN32 extern "C" { @@ -21,10 +21,10 @@ namespace oxen::quic::test Loop loop; auto jq = loop.make_job_queue(); - callback_waiter queued{[](){}}; - callback_waiter good{[](){}}; - callback_waiter bad{[](){}}; - callback_waiter main_ok{[](){}}; + callback_waiter queued{[]() {}}; + callback_waiter good{[]() {}}; + callback_waiter bad{[]() {}}; + callback_waiter main_ok{[]() {}}; // queue 3 jobs: // @@ -34,26 +34,18 @@ namespace oxen::quic::test // the next should destroy the created job queue // // the third should not execute - loop.call([&](){ - jq->call_soon([&]() { - good.call(); - }); + loop.call([&]() { + jq->call_soon([&]() { good.call(); }); - jq->call_soon([&]() { - jq.reset(); - }); + jq->call_soon([&]() { jq.reset(); }); - jq->call_soon([&]() { - bad.call(); - }); + jq->call_soon([&]() { bad.call(); }); - // call_soon so it gets queued, as it is being called from inside the loop. - loop.call_soon([&](){ - main_ok.call(); - }); + // call_soon so it gets queued, as it is being called from inside the loop. 
+ loop.call_soon([&]() { main_ok.call(); }); - queued.call(); - }); + queued.call(); + }); REQUIRE(queued.wait(10ms)); @@ -61,6 +53,40 @@ namespace oxen::quic::test REQUIRE_FALSE(bad.wait(10ms)); REQUIRE(main_ok.wait(10ms)); } + + SECTION("Tickers stop when their JobQueue dies") + { + Loop loop; + auto jq = loop.make_job_queue(); + + callback_waiter queued{[]() {}}; + + std::atomic bad_count = 0; + std::atomic good_count = 0; + std::shared_ptr bad; + std::shared_ptr good; + + // increment each counter every interval + loop.call([&]() { + bad = jq->call_every(1ms, [&]() { bad_count++; }); + good = loop.call_every(1ms, [&]() { good_count++; }); + + loop.call_later(20ms, [&]() { + // our ticker references must expire before the job queue does + bad.reset(); + + jq.reset(); + }); + + queued.call(); + }); + + REQUIRE(queued.wait(10ms)); + std::this_thread::sleep_for(40ms); + + // allows for a bit of stupid timing, should be sufficient + REQUIRE(good_count > bad_count + 5); + } } } // namespace oxen::quic::test diff --git a/tests/utils.hpp b/tests/utils.hpp index 00b93ece..006552fc 100644 --- a/tests/utils.hpp +++ b/tests/utils.hpp @@ -267,10 +267,7 @@ namespace oxen::quic }; } - void call() - { - this->operator Func_t()(); - } + void call() { this->operator Func_t()(); } }; /// Waits for some condition to be satisfied, sleeping between checks. 
Returns the result of From 2b96b6a3faf4ed058a67e5cc4155289b7326f23a Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Fri, 23 Jan 2026 17:33:35 -0500 Subject: [PATCH 04/16] version bump for new job queueing --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ea64135c..b11d839f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ if(CCACHE_PROGRAM) endif() project(libquic - VERSION 1.7.1 + VERSION 1.8.0 DESCRIPTION "Modular QUIC library for stream and connection management" LANGUAGES ${LANGS}) From 6fb9f02a8a1d61934b89bc8476c05751af023c0e Mon Sep 17 00:00:00 2001 From: Jason Rhinelander Date: Tue, 27 Jan 2026 17:59:16 -0400 Subject: [PATCH 05/16] JobQueue tweaks - add JobQueue::stop() to be able to stop without destruction - make JobQueue::stop()/destruction synchronize itself rather than require the caller to worry about synchronization. - refactor Loop construction a little to put all the initialization into a static function rather than doing it inline in the constructor itself: this allows the next point: - Hold a plain JobQueue rather than a unique_ptr inside Loop - Delete `loop.make_job_queue()` -- the caller can just go `std::make_unique(loop)` or use some other smart pointer or even just a plain object. - fix some forwarding std::move's that should be std::forwards instead. --- include/oxen/quic/loop.hpp | 81 +++++++++++++++++++++----------------- src/loop.cpp | 50 +++++++++++++---------- tests/job-queue.cpp | 14 +++---- 3 files changed, 81 insertions(+), 64 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 8905f2c3..ae279a92 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -72,6 +72,27 @@ namespace oxen::quic void wake(); }; + // Get an independent JobQueue to use for jobs, call_later, loop deleters, etc. + // + // Do not use this unless you know you need it. 
+ // + // The interface is the same as `Loop::call` et al, but a JobQueue can have a shorter lifetime + // than the Loop on which it runs. The purpose of this is if you have multiple components using + // the same Loop and one of those components may have jobs queued which reference it *after* its + // destructor, that component can instead own this JobQueue and those jobs will not be + // processed, but anything else using the Loop will be unaffected. + // + // Effectively this allows a subqueue of events that can be cancelled (via JobQueue destruction) + // without needing to cancel jobs of unrelated job queues. + // + // This queue can be stopped or destroyed *off* the loop thread, if necessary, but note that + // stopping and destruction still require that the loop thread is usable to perform the actual + // destruction, and so the loop this class uses must outlive this queue. + // + // If you keep objects alive which you created with this queue, e.g. tickers, wakeables, etc. + // you are responsible for making sure any concrete references to them (especially shared_ptr) + // are gone before the JobQueue is. Their destructors are (necessarily and intentionally) jobs + // on the job queue from which they spawned. class JobQueue { friend class Loop; @@ -107,6 +128,17 @@ namespace oxen::quic public: JobQueue(Loop& l); + // Cancels all jobs in the queue and deletes this job queue's event from the event loop. + // This is normally called automatically during destruction, but can be called before + // destruction if needed. This method does nothing if the queue has already been stopped. + // + // Stopping is terminal (i.e. there is no way to restart a queue other than replacing it). + // + // Note that this method requires the event loop and will block until the owning Loop is + // able to process it. + void stop(); + + // Calls stop() if not already called. 
~JobQueue(); // Returns a pointer deleter that defers the actual destruction call to this network @@ -282,9 +314,7 @@ namespace oxen::quic std::thread::id loop_thread_id; private: - std::unique_ptr main_queue; - - std::unique_ptr create_job_queue() { return std::make_unique(*this); } + JobQueue main_queue{*this}; public: Loop(); @@ -298,36 +328,15 @@ namespace oxen::quic ::event_base* get_event_base() const { return ev_loop.get(); } - // Get an independent JobQueue to use for jobs, call_later, loop deleters, etc. - // - // Do not use this unless you know you need it. - // - // The interface is the same as `Loop::call` et al, but when this JobQueue is - // destroyed the Loop can live on. The purpose of this is if you have multiple - // components using the same Loop and one of those components may have jobs queued - // which reference it *after* its destructor, that component can instead own this - // JobQueue and those jobs will not be processed, but anything else using the Loop - // will be unaffected. - // - // The owner of this JobQueue is responsible for making sure that the JobQueue is - // deleted on the loop thread. The easiest way to do this is to make sure the owning - // object is deleted on the loop thread. - // - // If you keep objects alive which you created with this queue, e.g. tickers, wakeables, - // etc. you are responsible for making sure any concrete references to them (especially - // shared_ptr) are gone before the JobQueue is. Their destructors are (necessarily and - // intentionally) jobs on the job queue from which they spawned. - std::unique_ptr make_job_queue(); - bool inside() const { return std::this_thread::get_id() == loop_thread_id; } // FIXME: this *may* be superfluous with the addition of JobQueue, but since it's // public I'm not sure if we've used it outside this class... 
// Returns a pointer deleter that defers invocation of a custom deleter to the event loop template Callable> - auto wrapped_deleter(Callable f) + auto wrapped_deleter(Callable&& f) { - return main_queue->wrapped_deleter(std::move(f)); + return main_queue.wrapped_deleter(std::forward(f)); } // Similar in concept to std::make_shared, but it creates the shared pointer with a @@ -337,7 +346,7 @@ namespace oxen::quic template std::shared_ptr make_shared(Args&&... args) { - return main_queue->make_shared(std::forward(args)...); + return main_queue.make_shared(std::forward(args)...); } // Similar to the above make_shared, but instead of forwarding arguments for the @@ -346,7 +355,7 @@ namespace oxen::quic template Callable> std::shared_ptr shared_ptr(T* obj, Callable&& deleter) { - return main_queue->shared_ptr(obj, std::move(deleter)); + return main_queue.shared_ptr(obj, std::forward(deleter)); } /// Calls `f()` on the event loop. If the caller is already in the event loop thread then @@ -355,7 +364,7 @@ namespace oxen::quic template Callable> void call(Callable&& f) { - return main_queue->call(std::move(f)); + main_queue.call(std::forward(f)); } // Calls `f()` on the event loop and returns its value. If this is called from within the @@ -366,7 +375,7 @@ namespace oxen::quic template ()())> Ret call_get(Callable&& f) { - return main_queue->call_get(std::move(f)); + return main_queue.call_get(std::forward(f)); } /// Sets up a task `f()` to be called on the event loop periodically. @@ -385,14 +394,14 @@ namespace oxen::quic [[nodiscard]] std::shared_ptr call_every( std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) { - return main_queue->call_every(interval, std::move(f), start_immediately); + return main_queue.call_every(interval, std::forward(f), start_immediately); } /// Schedules a call of `f()` on the event loop after a delay. 
template Callable> - void call_later(std::chrono::microseconds delay, Callable hook) + void call_later(std::chrono::microseconds delay, Callable&& hook) { - main_queue->call_later(delay, std::move(hook)); + main_queue.call_later(delay, std::forward(hook)); } /// Creates a Wakeable event tied to this event loop that can be manually triggered when @@ -401,16 +410,16 @@ namespace oxen::quic /// that this call only constructs the event, but does not initially schedule it. std::shared_ptr make_wakeable(std::function hook) { - return main_queue->make_wakeable(std::move(hook)); + return main_queue.make_wakeable(std::move(hook)); } /// Schedules a call of `f()` at the next available opportunity on the event loop. Unlike /// `call()`, `call_soon()` never calls f() immediately even if already inside the event /// loop. template Callable> - void call_soon(Callable f) + void call_soon(Callable&& f) { - main_queue->call_soon(std::move(f)); + main_queue.call_soon(std::forward(f)); } /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the event diff --git a/src/loop.cpp b/src/loop.cpp index 4d42e1ba..eaf5ec3b 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -99,9 +99,7 @@ namespace oxen::quic return ev_methods_avail; } - Loop::Loop() : ev_loop{nullptr, ::event_base_free} - { - log::trace(log_cat, "Beginning loop context creation with new ev loop thread"); + static ::event_base* make_ev_loop() { #ifdef _WIN32 { @@ -139,14 +137,14 @@ namespace oxen::quic event_config_set_flag(ev_conf.get(), EVENT_BASE_FLAG_NO_CACHE_TIME); event_config_set_flag(ev_conf.get(), EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST); - ev_loop = {event_base_new_with_config(ev_conf.get()), event_base_free}; - - log::debug(log_cat, "Started libevent loop with backend {}", event_base_get_method(ev_loop.get())); + auto ev_loop = event_base_new_with_config(ev_conf.get()); + log::debug(log_cat, "Started libevent loop with backend {}", event_base_get_method(ev_loop)); + return ev_loop; + } + 
Loop::Loop() : ev_loop{make_ev_loop(), ::event_base_free} + { std::promise p; - - main_queue = create_job_queue(); - loop_thread = std::thread{[this, &p] { log::debug(log_cat, "Starting event loop run"); p.set_value(); @@ -160,11 +158,6 @@ namespace oxen::quic log::info(log_cat, "libevent loop is started"); } - std::unique_ptr Loop::make_job_queue() - { - return main_queue->call_get([this]() { return create_job_queue(); }); - } - struct JobQueue::OneShotDelayed { JobQueue& jq; @@ -178,13 +171,27 @@ namespace oxen::quic setup_job_waker(); } - JobQueue::~JobQueue() + JobQueue::~JobQueue() { + log::debug(log_cat, "Destryoing job queue."); + if (job_waker) + stop(); + } + + void JobQueue::stop() { - log::debug(log_cat, "Destroying loop job queue."); + // Synchronization point: if we aren't on the loop, recurse into it: + if (!loop.inside()) { + loop.call_get([this] { stop(); }); + return; + } + + if (!job_waker) + return; + + log::debug(log_cat, "Stopping/cancelling job queue events"); *running = false; - [[maybe_unused]] auto res = event_del(job_waker.get()); - assert(res == 0); + job_waker.reset(); for (auto& t : tickers) { @@ -204,9 +211,10 @@ namespace oxen::quic { log::debug(log_cat, "Shutting down loop..."); - // JobQueue has a canary such that if it's processing jobs as it is destroyed - // it should be safe, but we *do* want to stop/destroy it on the loop thread. - main_queue->call_get([this]() { main_queue.reset(); }); + // JobQueue has a canary such that if it's processing jobs as it is destroyed it should be + // safe, but we *do* want to stop/destroy it before general member destruction (and on the + // loop thread, implemented by stop() itself). 
+ main_queue.stop(); event_base_loopbreak(ev_loop.get()); loop_thread.join(); diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index 5595ff8a..d6d16c42 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -19,7 +19,7 @@ namespace oxen::quic::test SECTION("Extra queue jobs go away, others do not.") { Loop loop; - auto jq = loop.make_job_queue(); + JobQueue jq{loop}; callback_waiter queued{[]() {}}; callback_waiter good{[]() {}}; @@ -35,11 +35,11 @@ namespace oxen::quic::test // // the third should not execute loop.call([&]() { - jq->call_soon([&]() { good.call(); }); + jq.call_soon([&]() { good.call(); }); - jq->call_soon([&]() { jq.reset(); }); + jq.call_soon([&]() { jq.stop(); }); - jq->call_soon([&]() { bad.call(); }); + jq.call_soon([&]() { bad.call(); }); // call_soon so it gets queued, as it is being called from inside the loop. loop.call_soon([&]() { main_ok.call(); }); @@ -57,7 +57,7 @@ namespace oxen::quic::test SECTION("Tickers stop when their JobQueue dies") { Loop loop; - auto jq = loop.make_job_queue(); + JobQueue jq{loop}; callback_waiter queued{[]() {}}; @@ -68,14 +68,14 @@ namespace oxen::quic::test // increment each counter every interval loop.call([&]() { - bad = jq->call_every(1ms, [&]() { bad_count++; }); + bad = jq.call_every(1ms, [&]() { bad_count++; }); good = loop.call_every(1ms, [&]() { good_count++; }); loop.call_later(20ms, [&]() { // our ticker references must expire before the job queue does bad.reset(); - jq.reset(); + jq.stop(); }); queued.call(); From 94d7a336909d434731187219c86385d4e6e28a7e Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 01:53:37 -0500 Subject: [PATCH 06/16] minor comment fixes --- include/oxen/quic/loop.hpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index ae279a92..daa79c65 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -76,7 +76,7 @@ namespace oxen::quic // // Do not use 
this unless you know you need it. // - // The interface is the same as `Loop::call` et al, but a JobQueue can have a shorter lifetime + // The interface is the same as `Loop::call` and similar, but a JobQueue can have a shorter lifetime // than the Loop on which it runs. The purpose of this is if you have multiple components using // the same Loop and one of those components may have jobs queued which reference it *after* its // destructor, that component can instead own this JobQueue and those jobs will not be @@ -330,8 +330,6 @@ namespace oxen::quic bool inside() const { return std::this_thread::get_id() == loop_thread_id; } - // FIXME: this *may* be superfluous with the addition of JobQueue, but since it's - // public I'm not sure if we've used it outside this class... // Returns a pointer deleter that defers invocation of a custom deleter to the event loop template Callable> auto wrapped_deleter(Callable&& f) From f3bda9016c391ae7c650229a0652b03b96d12161 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 02:03:13 -0500 Subject: [PATCH 07/16] Enforce owner of ticker/wakeable responsibility to destroy them Removes some unnecessary containers/checks on Ticker and Wakeable, as it is the owner's responsibility to ensure these are destroyed before the Loop from which they were created stops. 
--- include/oxen/quic/loop.hpp | 13 ++++--------- src/loop.cpp | 35 +++++++++++------------------------ 2 files changed, 15 insertions(+), 33 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index daa79c65..fb7fa2d7 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -25,12 +25,11 @@ namespace oxen::quic event_ptr ev; timeval interval; std::function f; - std::shared_ptr alive; void init_event( ::event_base* loop, std::chrono::microseconds _t, std::function task, bool start_immediately = true); - Ticker(std::shared_ptr keepalive) : alive{keepalive} {} + Ticker() = default; public: /** Starts the repeating event on the given interval on Ticker creation. Does nothing if @@ -63,9 +62,8 @@ namespace oxen::quic event_ptr ev; std::function f; - std::shared_ptr alive; - Wakeable(std::shared_ptr keepalive) : alive{keepalive} {} + Wakeable() = default; public: /// Call to schedule f() to be called, if not already scheduled. @@ -107,9 +105,6 @@ namespace oxen::quic void add_oneshot_event(std::chrono::microseconds delay, std::function hook); - std::list> tickers; - std::list> wakeables; - std::shared_ptr make_ticker(); // call_later events aren't guaranteed to get properly disposed off if the event loop stops @@ -240,8 +235,8 @@ namespace oxen::quic /// immediately" -- it simply controls whether the initial timer for the first call is /// started or not). /// - /// The ticker will remain active as long the loop remains active and the returned Ticker - /// object is kept alive. + /// The owner of the Ticker is responsible for making sure it does not outlive the Loop + /// from which it was created. 
template Callable> [[nodiscard]] std::shared_ptr call_every( std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) diff --git a/src/loop.cpp b/src/loop.cpp index eaf5ec3b..9eb87cc5 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -46,9 +46,6 @@ namespace oxen::quic bool Ticker::stop() { - if (!alive) - return true; - if (ev && event_del(ev.get()) != 0) { log::warning(log_cat, "EventHandler failed to pause repeating event!"); @@ -88,7 +85,7 @@ namespace oxen::quic this)); if (start_immediately and not start()) - log::warning(log_cat, "Failed to immediately start one-off event!"); + log::warning(log_cat, "Failed to immediately start repeating event!"); } static std::vector get_ev_methods() @@ -193,15 +190,6 @@ namespace oxen::quic job_waker.reset(); - for (auto& t : tickers) - { - if (auto tick = t.lock()) - { - tick->f = nullptr; - tick->stop(); - } - } - for (auto* osd : delayed_events) delete osd; delayed_events.clear(); @@ -231,15 +219,19 @@ namespace oxen::quic if (!running) return nullptr; - std::erase_if(tickers, [](auto& wp) { return wp.expired(); }); - auto t = make_shared(running); - tickers.emplace_back(t); - return t; + return make_shared(); } std::shared_ptr JobQueue::make_wakeable(std::function callback) { - auto w = make_shared(running); + if (!callback) + { + // FIXME: should this throw/assert? 
+ log::error(log_cat, "Not making Wakeable with empty callback."); + return nullptr; + } + + auto w = make_shared(); w->f = std::move(callback); w->ev.reset(event_new( loop.ev_loop.get(), @@ -247,19 +239,14 @@ namespace oxen::quic 0, [](evutil_socket_t, short, void* w) { auto* wakeable = static_cast(w); - if (wakeable->f) - wakeable->f(); + wakeable->f(); }, w.get())); - wakeables.emplace_back(w); return w; } void Wakeable::wake() { - if (!ev || !alive) - return; - event_active(ev.get(), 0, 0); } From c6ebe64a4b1a6d6d562a0f7be691711e858addfc Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 02:41:29 -0500 Subject: [PATCH 08/16] Move Ticker and Wakeable creation back to Loop Since Ticker and Wakeable create their own events and their lifetime is managed by the owner, they do not need to be creatable via both JobQueue and Loop. Since their lifetime is no longer cleaned up automatically by a JobQueue, that test section has been removed. "one-shot" jobs created via `call_later` remain on JobQueue, however, as the creator of that job does not keep a reference to/own it (so it needs to die with the JobQueue when it dies). --- include/oxen/quic/loop.hpp | 46 +++++++------------------------------- src/loop.cpp | 14 +++--------- tests/job-queue.cpp | 34 ---------------------------- 3 files changed, 11 insertions(+), 83 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index fb7fa2d7..867b3766 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -105,8 +105,6 @@ namespace oxen::quic void add_oneshot_event(std::chrono::microseconds delay, std::function hook); - std::shared_ptr make_ticker(); - // call_later events aren't guaranteed to get properly disposed off if the event loop stops // before it fires, so we stash it in here temporarily and remove it when fired. During the // Loop destructor, then, if there's anything left that's one that needs to be cleaned up. 
@@ -118,8 +116,6 @@ namespace oxen::quic bool inside() const; - ::event_base* get_event_base() const; - public: JobQueue(Loop& l); @@ -225,27 +221,6 @@ namespace oxen::quic return fut.get(); } - /// Sets up a task `f()` to be called on the event loop periodically. - /// - /// `interval` controls the interval on which the task will be called. - /// - /// `start_immediately` controls whether the task is scheduled on the event loop right away - /// (true, the default), or not (false). If not started immediately then the task will not - /// fire until `start()` is called on it. (Note that this parameter does not mean "call - /// immediately" -- it simply controls whether the initial timer for the first call is - /// started or not). - /// - /// The owner of the Ticker is responsible for making sure it does not outlive the Loop - /// from which it was created. - template Callable> - [[nodiscard]] std::shared_ptr call_every( - std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) - { - auto h = make_ticker(); - h->init_event(get_event_base(), interval, std::forward(f), start_immediately); - return h; - } - /// Schedules a call of `f()` on the event loop after a delay. template Callable> void call_later(std::chrono::microseconds delay, Callable hook) @@ -268,12 +243,6 @@ namespace oxen::quic } } - /// Creates a Wakeable event tied to this event loop that can be manually triggered when - /// desired to schedule an invocation of the callback. Unlike call_soon, this is idempotent - /// (i.e. multiple wakeups before it actually runs does not schedule multiple calls). Note - /// that this call only constructs the event, but does not initially schedule it. - std::shared_ptr make_wakeable(std::function hook); - static void activate(::event& evt); /// Schedules a call of `f()` at the next available opportunity on the event loop. 
Unlike @@ -311,6 +280,8 @@ namespace oxen::quic private: JobQueue main_queue{*this}; + std::shared_ptr make_ticker(); + public: Loop(); @@ -381,13 +352,15 @@ namespace oxen::quic /// immediately" -- it simply controls whether the initial timer for the first call is /// started or not). /// - /// The ticker will remain active as long the loop remains active and the returned Ticker - /// object is kept alive. + /// The owner of the Ticker is responsible for making sure it does not outlive the Loop + /// from which it was created. template Callable> [[nodiscard]] std::shared_ptr call_every( std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) { - return main_queue.call_every(interval, std::forward(f), start_immediately); + auto h = make_ticker(); + h->init_event(get_event_base(), interval, std::forward(f), start_immediately); + return h; } /// Schedules a call of `f()` on the event loop after a delay. @@ -401,10 +374,7 @@ namespace oxen::quic /// desired to schedule an invocation of the callback. Unlike call_soon, this is idempotent /// (i.e. multiple wakeups before it actually runs does not schedule multiple calls). Note /// that this call only constructs the event, but does not initially schedule it. - std::shared_ptr make_wakeable(std::function hook) - { - return main_queue.make_wakeable(std::move(hook)); - } + std::shared_ptr make_wakeable(std::function hook); /// Schedules a call of `f()` at the next available opportunity on the event loop. 
Unlike /// `call()`, `call_soon()` never calls f() immediately even if already inside the event diff --git a/src/loop.cpp b/src/loop.cpp index 9eb87cc5..0d1f128b 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -214,15 +214,12 @@ namespace oxen::quic #endif } - std::shared_ptr JobQueue::make_ticker() + std::shared_ptr Loop::make_ticker() { - if (!running) - return nullptr; - return make_shared(); } - std::shared_ptr JobQueue::make_wakeable(std::function callback) + std::shared_ptr Loop::make_wakeable(std::function callback) { if (!callback) { @@ -234,7 +231,7 @@ namespace oxen::quic auto w = make_shared(); w->f = std::move(callback); w->ev.reset(event_new( - loop.ev_loop.get(), + ev_loop.get(), -1, 0, [](evutil_socket_t, short, void* w) { @@ -319,11 +316,6 @@ namespace oxen::quic return loop.inside(); } - ::event_base* JobQueue::get_event_base() const - { - return loop.get_event_base(); - } - // Wrapper around event_active so that we can keep libevent out of the public headers. void JobQueue::activate(::event& evt) { diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index d6d16c42..c29c6deb 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -53,40 +53,6 @@ namespace oxen::quic::test REQUIRE_FALSE(bad.wait(10ms)); REQUIRE(main_ok.wait(10ms)); } - - SECTION("Tickers stop when their JobQueue dies") - { - Loop loop; - JobQueue jq{loop}; - - callback_waiter queued{[]() {}}; - - std::atomic bad_count = 0; - std::atomic good_count = 0; - std::shared_ptr bad; - std::shared_ptr good; - - // increment each counter every interval - loop.call([&]() { - bad = jq.call_every(1ms, [&]() { bad_count++; }); - good = loop.call_every(1ms, [&]() { good_count++; }); - - loop.call_later(20ms, [&]() { - // our ticker references must expire before the job queue does - bad.reset(); - - jq.stop(); - }); - - queued.call(); - }); - - REQUIRE(queued.wait(10ms)); - std::this_thread::sleep_for(40ms); - - // allows for a bit of stupid timing, should be sufficient - 
REQUIRE(good_count > bad_count + 5); - } } } // namespace oxen::quic::test From 097d0f439489a6ecba7c5e921a3c58b9bd71cb01 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 02:57:00 -0500 Subject: [PATCH 09/16] update some comments on Loop, JobQueue --- include/oxen/quic/loop.hpp | 69 ++++++++++++++------------------------ 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 867b3766..470a5447 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -105,9 +105,9 @@ namespace oxen::quic void add_oneshot_event(std::chrono::microseconds delay, std::function hook); - // call_later events aren't guaranteed to get properly disposed off if the event loop stops - // before it fires, so we stash it in here temporarily and remove it when fired. During the - // Loop destructor, then, if there's anything left that's one that needs to be cleaned up. + // call_later events aren't guaranteed to get properly disposed of if the job queue is + // destroyed before it fires, so we stash it in here temporarily and remove it when fired. + // During the JobQueue destructor, any unfired events need to be cleaned up. struct OneShotDelayed; std::list delayed_events; @@ -132,15 +132,14 @@ namespace oxen::quic // Calls stop() if not already called. ~JobQueue(); - // Returns a pointer deleter that defers the actual destruction call to this network - // object's event loop. 
+ // Returns a pointer deleter that defers the actual destruction call to this JobQueue template auto loop_deleter() { return [this](T* ptr) { call_get([ptr] { delete ptr; }); }; } - // Returns a pointer deleter that defers invocation of a custom deleter to the event loop + // Returns a pointer deleter that defers invocation of a custom deleter to this JobQueue template Callable> auto wrapped_deleter(Callable f) { @@ -150,9 +149,8 @@ namespace oxen::quic } // Similar in concept to std::make_shared, but it creates the shared pointer with a - // custom deleter that dispatches actual object destruction to the network's event loop for - // thread safety, and waits for destruction of the overlying object to complete before - // returning. + // custom deleter that dispatches actual object destruction to this JobQueue for thread + // safety, and waits for destruction of the overlying object to complete before returning. template std::shared_ptr make_shared(Args&&... args) { @@ -169,9 +167,8 @@ namespace oxen::quic return std::shared_ptr(obj, wrapped_deleter(std::forward(deleter))); } - /// Calls `f()` on the event loop. If the caller is already in the event loop thread then - /// f() is called immediately; otherwise it is scheduled on the event loop thread at the - /// next available opportunity. + /// Calls `f()` on the JobQueue. If the caller is already in the Loop thread then + /// f() is called immediately; otherwise it is scheduled at the end of the queue. template Callable> void call(Callable&& f) { @@ -185,9 +182,9 @@ namespace oxen::quic } } - // Calls `f()` on the event loop and returns its value. If this is called from within the - // event loop thread then this simply calls and returns the result of `f()`. If *not* in - // the event loop then a call to `f()` is scheduled on the event loop for the next available + // Calls `f()` on the JobQueue and returns its value. 
If this is called from within the + // Loop thread then this simply calls and returns the result of `f()`. If *not* in + // the Loop thread then a call to `f()` is scheduled on the JobQueue for the next available // opportunity and then the current thread blocks until that call is invoked, then returns // it back to the caller. template ()())> @@ -221,7 +218,7 @@ namespace oxen::quic return fut.get(); } - /// Schedules a call of `f()` on the event loop after a delay. + /// Schedules a call of `f()` on the JobQueue after a delay. template Callable> void call_later(std::chrono::microseconds delay, Callable hook) { @@ -245,9 +242,9 @@ namespace oxen::quic static void activate(::event& evt); - /// Schedules a call of `f()` at the next available opportunity on the event loop. Unlike - /// `call()`, `call_soon()` never calls f() immediately even if already inside the event - /// loop. + /// Schedules a call of `f()` at the next available opportunity on the JobQueue. Unlike + /// `call()`, `call_soon()` never calls f() immediately even if already inside the Loop + /// thread. template Callable> void call_soon(Callable f) { @@ -259,8 +256,8 @@ namespace oxen::quic activate(*job_waker); } - /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the event - /// loop. Asyncronous. + /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the + /// JobQueue. Asynchronous. template void reset_soon(std::shared_ptr&& ptr) { @@ -296,46 +293,35 @@ namespace oxen::quic bool inside() const { return std::this_thread::get_id() == loop_thread_id; } - // Returns a pointer deleter that defers invocation of a custom deleter to the event loop + // See JobQueue::wrapped_deleter, applies to Loop's main event queue. 
template Callable> auto wrapped_deleter(Callable&& f) { return main_queue.wrapped_deleter(std::forward(f)); } - // Similar in concept to std::make_shared, but it creates the shared pointer with a - // custom deleter that dispatches actual object destruction to the network's event loop for - // thread safety, and waits for destruction of the overlying object to complete before - // returning. + // See JobQueue::make_shared, applies to Loop's main event queue. template std::shared_ptr make_shared(Args&&... args) { return main_queue.make_shared(std::forward(args)...); } - // Similar to the above make_shared, but instead of forwarding arguments for the - // construction of the object, it creates the shared_ptr from the already created object ptr - // and wraps the object's deleter in a wrapped_deleter + // See JobQueue::shared_ptr, applies to Loop's main event queue. template Callable> std::shared_ptr shared_ptr(T* obj, Callable&& deleter) { return main_queue.shared_ptr(obj, std::forward(deleter)); } - /// Calls `f()` on the event loop. If the caller is already in the event loop thread then - /// f() is called immediately; otherwise it is scheduled on the event loop thread at the - /// next available opportunity. + // See JobQueue::call, applies to Loop's main event queue. template Callable> void call(Callable&& f) { main_queue.call(std::forward(f)); } - // Calls `f()` on the event loop and returns its value. If this is called from within the - // event loop thread then this simply calls and returns the result of `f()`. If *not* in - // the event loop then a call to `f()` is scheduled on the event loop for the next available - // opportunity and then the current thread blocks until that call is invoked, then returns - // it back to the caller. + // See JobQueue::call_get, applies to Loop's main event queue. template ()())> Ret call_get(Callable&& f) { @@ -363,7 +349,7 @@ namespace oxen::quic return h; } - /// Schedules a call of `f()` on the event loop after a delay. 
+ // See JobQueue::call_later, applies to Loop's main event queue. template Callable> void call_later(std::chrono::microseconds delay, Callable&& hook) { @@ -376,17 +362,14 @@ namespace oxen::quic /// that this call only constructs the event, but does not initially schedule it. std::shared_ptr make_wakeable(std::function hook); - /// Schedules a call of `f()` at the next available opportunity on the event loop. Unlike - /// `call()`, `call_soon()` never calls f() immediately even if already inside the event - /// loop. + // See JobQueue::call_soon, applies to Loop's main event queue. template Callable> void call_soon(Callable&& f) { main_queue.call_soon(std::forward(f)); } - /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the event - /// loop. Asyncronous. + // See JobQueue::reset_soon, applies to Loop's main event queue. template void reset_soon(std::shared_ptr&& ptr) { From d1794965734a74a0bd008deaf3ecd726fae8df77 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 03:26:05 -0500 Subject: [PATCH 10/16] use event_add instead of oneshot for call_later It makes sense for JobQueue to have its own `call_later`, so the user doesn't have to manage a bunch of one-off jobs. As such, JobQueue needs to manage those jobs and cancel them when it is stopped/destroyed, so event_base_once can't work here. I don't *think* this has any performance implications. 
--- src/loop.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/loop.cpp b/src/loop.cpp index 0d1f128b..bb69266b 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -159,8 +159,15 @@ namespace oxen::quic { JobQueue& jq; std::function f; + event_ptr ev; OneShotDelayed(JobQueue& jq_, std::function f) : jq{jq_}, f{std::move(f)} {} + + ~OneShotDelayed() + { + if (ev) + event_del(ev.get()); + } }; JobQueue::JobQueue(Loop& l) : loop{l} @@ -269,10 +276,10 @@ namespace oxen::quic delayed_events.push_back(handler); auto& h = *handler; const auto delay_tv = loop_time_to_timeval(delay); - event_base_once( + h.ev.reset(event_new( loop.get_event_base(), -1, - EV_TIMEOUT, + 0, [](evutil_socket_t, short, void* e) mutable { auto* h = static_cast(e); if (h->f) @@ -282,8 +289,8 @@ namespace oxen::quic de.erase(it); delete h; }, - &h, - &delay_tv); + &h)); + event_add(h.ev.get(), &delay_tv); } void JobQueue::process_job_queue() From 64c503a491b2cf42f7edc7d0ab606c319e5dcedd Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 03:56:39 -0500 Subject: [PATCH 11/16] Make Endpoint have its own JobQueue Where applicable, its children (Connections, Streams, Datagrams, etc.) will use this JobQueue instead of the main Loop one. There are a few places with timers and such that were not changed, but they clean up their own events on destruction in those cases so it should not be an issue. 
--- include/oxen/quic/btstream.hpp | 2 +- include/oxen/quic/connection.hpp | 4 +-- include/oxen/quic/endpoint.hpp | 5 ++-- include/oxen/quic/iochannel.hpp | 2 +- include/oxen/quic/loop.hpp | 2 +- src/btstream.cpp | 4 +-- src/connection.cpp | 50 ++++++++++++++++---------------- src/datagram.cpp | 8 ++--- src/endpoint.cpp | 36 +++++++++++------------ src/iochannel.cpp | 2 +- src/stream.cpp | 36 +++++++++++------------ 11 files changed, 76 insertions(+), 75 deletions(-) diff --git a/include/oxen/quic/btstream.hpp b/include/oxen/quic/btstream.hpp index 932766d3..d8cb99c8 100644 --- a/include/oxen/quic/btstream.hpp +++ b/include/oxen/quic/btstream.hpp @@ -282,7 +282,7 @@ namespace oxen::quic auto req = std::make_shared(*this, encode_command(ep, rid, body), rid, std::forward(opts)...); if (req->cb) - loop.call([this, r = std::move(req)]() mutable { + endpoint.job_queue.call([this, r = std::move(req)]() mutable { if (auto* req = add_sent_request(std::move(r))) send(std::move(req->data)); }); diff --git a/include/oxen/quic/connection.hpp b/include/oxen/quic/connection.hpp index 3d180a8c..5921e4a5 100644 --- a/include/oxen/quic/connection.hpp +++ b/include/oxen/quic/connection.hpp @@ -126,7 +126,7 @@ namespace oxen::quic // has a forward declaration; the user of this method needs to have the full definition // available to call this. return std::static_pointer_cast(queue_incoming_stream_impl([&](Connection& c, EndpointDeferred& e) { - return e.loop.template make_shared(c, e, std::forward(args)...); + return e.job_queue.template make_shared(c, e, std::forward(args)...); })); } @@ -148,7 +148,7 @@ namespace oxen::quic std::shared_ptr open_stream(Args&&... 
args) { return std::static_pointer_cast(open_stream_impl([&](Connection& c, EndpointDeferred& e) { - return e.loop.template make_shared(c, e, std::forward(args)...); + return e.job_queue.template make_shared(c, e, std::forward(args)...); })); } diff --git a/include/oxen/quic/endpoint.hpp b/include/oxen/quic/endpoint.hpp index 5ca08507..d84d0afe 100644 --- a/include/oxen/quic/endpoint.hpp +++ b/include/oxen/quic/endpoint.hpp @@ -50,6 +50,7 @@ namespace oxen::quic connection_closed_callback connection_close_cb; Loop& loop; + JobQueue job_queue{loop}; template void listen(Opt&&... opts) @@ -58,7 +59,7 @@ namespace oxen::quic (0 + ... + std::is_convertible_v, std::shared_ptr>) == 1, "listen() requires exactly one std::shared_ptr argument"); - loop.call_get([&opts..., this]() { + job_queue.call_get([&opts..., this]() { if (inbound_ctx) throw std::logic_error{"Cannot call listen() more than once"}; @@ -82,7 +83,7 @@ namespace oxen::quic if (_local.is_ipv6() && !remote.is_ipv6()) remote.map_ipv4_as_ipv6(); - return loop.call_get([this, &opts..., remote = std::move(remote)]() mutable { + return job_queue.call_get([this, &opts..., remote = std::move(remote)]() mutable { // initialize client context and client tls context simultaneously auto outbound_ctx = std::make_shared(Direction::OUTBOUND, std::forward(opts)...); _assign_context_globals(*outbound_ctx); diff --git a/include/oxen/quic/iochannel.hpp b/include/oxen/quic/iochannel.hpp index cc71c217..a9a4c37f 100644 --- a/include/oxen/quic/iochannel.hpp +++ b/include/oxen/quic/iochannel.hpp @@ -121,7 +121,7 @@ namespace oxen::quic typename EP = Endpoint> Ret call_get_accessor(T (Class::*getter)() const) const { - return static_cast(endpoint).loop.call_get( + return static_cast(endpoint).job_queue.call_get( [this, &getter]() -> Ret { return (static_cast(this)->*getter)(); }); } }; diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 470a5447..f19bebe1 100644 --- a/include/oxen/quic/loop.hpp +++ 
b/include/oxen/quic/loop.hpp @@ -114,9 +114,9 @@ namespace oxen::quic void setup_job_waker(); void process_job_queue(); + public: bool inside() const; - public: JobQueue(Loop& l); // Cancels all jobs in the queue and deletes this job queue's event from the event loop. diff --git a/src/btstream.cpp b/src/btstream.cpp index 163ddcfa..53c83b91 100644 --- a/src/btstream.cpp +++ b/src/btstream.cpp @@ -169,7 +169,7 @@ namespace oxen::quic void BTRequestStream::register_handler(std::string ep, std::function func) { - loop.call([this, ep = std::move(ep), func = std::move(func)]() mutable { + endpoint.job_queue.call([this, ep = std::move(ep), func = std::move(func)]() mutable { registered_endpoints[std::move(ep)] = std::move(func); }); } @@ -177,7 +177,7 @@ namespace oxen::quic void BTRequestStream::register_generic_handler(std::function request_handler) { log::debug(log_cat, "BTRequestStream set generic request handler"); - loop.call([this, func = std::move(request_handler)]() mutable { generic_handler = std::move(func); }); + endpoint.job_queue.call([this, func = std::move(request_handler)]() mutable { generic_handler = std::move(func); }); } void BTRequestStream::handle_input(message msg) diff --git a/src/connection.cpp b/src/connection.cpp index 369a98af..d67a6f53 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -478,7 +478,7 @@ namespace oxen::quic void Connection::set_new_path(Path new_path) { - _loop.call([this, new_path]() { _path = new_path; }); + _endpoint.job_queue.call([this, new_path]() { _path = new_path; }); } int Connection::recv_token(const uint8_t* token, size_t tokenlen) @@ -588,12 +588,12 @@ namespace oxen::quic void Connection::set_remote_addr(const ngtcp2_addr& new_remote) { - _loop.call([this, new_remote]() { _path.set_new_remote(new_remote); }); + _endpoint.job_queue.call([this, new_remote]() { _path.set_new_remote(new_remote); }); } void Connection::set_local_addr(Address new_local) { - _loop.call([this, new_local]() { + 
_endpoint.job_queue.call([this, new_local]() { Path new_path{new_local, _path.remote}; _path = new_path; }); @@ -629,7 +629,7 @@ namespace oxen::quic void Connection::halt_events() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); packet_io_trigger.reset(); packet_retransmit_timer.reset(); log::debug(log_cat, "Connection ({}) io trigger/retransmit timer events halted", reference_id()); @@ -637,7 +637,7 @@ namespace oxen::quic void Connection::packet_io_ready() { - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); if (packet_io_trigger) event_active(packet_io_trigger.get(), 0, 0); // else we've reset the trigger (via halt_events), which means the connection is closing/draining/etc. @@ -650,12 +650,12 @@ namespace oxen::quic std::shared_ptr Connection::datagrams() { - return _loop.call_get([this] { return dgrams; }); + return _endpoint.job_queue.call_get([this] { return dgrams; }); } void Connection::revert_early_channels() { - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); log::debug(log_cat, "Client reverting early stream data"); // We need to re-open any opened streams because the remote rejected early data, and when @@ -734,7 +734,7 @@ namespace oxen::quic break; case NGTCP2_ERR_DRAINING: log::trace(log_cat, "Note: {} is draining; signaling endpoint to drain connection", reference_id()); - _loop.call_soon([wself = weak_from_this()]() { + _endpoint.job_queue.call_soon([wself = weak_from_this()]() { if (auto self = wself.lock()) { log::debug(log_cat, "Endpoint draining connection {}", self->reference_id()); @@ -819,7 +819,7 @@ namespace oxen::quic if (!stream && default_stream) stream = default_stream(*this, _endpoint); if (!stream) - stream = _loop.make_shared( + stream = _endpoint.job_queue.make_shared( *this, _endpoint, context->stream_data_cb, context->stream_close_cb, context->stream_fin_cb); return stream; @@ -828,7 +828,7 @@ namespace oxen::quic 
std::shared_ptr Connection::queue_incoming_stream_impl( std::function(Connection& c, Endpoint& e)> make_stream) { - return _loop.call_get([this, &make_stream]() { + return _endpoint.job_queue.call_get([this, &make_stream]() { std::shared_ptr stream; if (make_stream) stream = make_stream(*this, _endpoint); @@ -870,7 +870,7 @@ namespace oxen::quic std::shared_ptr Connection::open_stream_impl( std::function(Connection& c, Endpoint& e)> make_stream) { - return _loop.call_get([this, &make_stream]() { + return _endpoint.job_queue.call_get([this, &make_stream]() { std::shared_ptr stream; if (make_stream) stream = make_stream(*this, _endpoint); @@ -918,7 +918,7 @@ namespace oxen::quic std::shared_ptr Connection::get_stream_impl(int64_t id) { - return _loop.call_get([this, id]() -> std::shared_ptr { + return _endpoint.job_queue.call_get([this, id]() -> std::shared_ptr { if (auto it = _streams.find(id); it != _streams.end()) return it->second; @@ -1404,7 +1404,7 @@ namespace oxen::quic if (uint64_t app_err_code = context->stream_open_cb ? context->stream_open_cb(*stream) : 0; app_err_code != 0) { log::info(log_cat, "stream_open_callback returned error code {}, closing stream {}", app_err_code, id); - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); stream->close(app_err_code); return 0; } @@ -1666,7 +1666,7 @@ namespace oxen::quic std::string_view Connection::selected_alpn() const { - return _loop.call_get( + return _endpoint.job_queue.call_get( [this]() { return (handshaked or establish_hook_called) ? get_session()->selected_alpn() : ""sv; }); } @@ -1708,7 +1708,7 @@ namespace oxen::quic { if (!_max_dgram_size_changed) return std::nullopt; - return _loop.call_get([this]() -> std::optional { + return _endpoint.job_queue.call_get([this]() -> std::optional { // Check it again via an exchange, in case someone raced us here if (_max_dgram_size_changed.exchange(false)) return _last_max_dgram_piece * (_packet_splitting ? 
2 : 1); @@ -1858,12 +1858,12 @@ namespace oxen::quic : nullptr; if (context->config.datagram_support) - dgrams = _loop.make_shared( + dgrams = _endpoint.job_queue.make_shared( *this, _endpoint, context->dgram_data_cb ? context->dgram_data_cb : ep.dgram_recv_cb, context->config.dgram_queue_limit); - pseudo_stream = _loop.make_shared(*this, _endpoint); + pseudo_stream = _endpoint.job_queue.make_shared(*this, _endpoint); pseudo_stream->_stream_id = -1; const auto d_str = is_outbound() ? "outbound" : "inbound"; @@ -2084,35 +2084,35 @@ namespace oxen::quic size_t Connection::num_streams_active() const { - return _loop.call_get([this] { return _streams.size(); }); + return _endpoint.job_queue.call_get([this] { return _streams.size(); }); } size_t Connection::num_streams_pending() const { - return _loop.call_get([this] { return pending_streams.size(); }); + return _endpoint.job_queue.call_get([this] { return pending_streams.size(); }); } uint64_t Connection::get_max_streams() const { - return _loop.call_get([this] { return _max_streams; }); + return _endpoint.job_queue.call_get([this] { return _max_streams; }); } uint64_t Connection::get_streams_available() const { - return _loop.call_get([this] { return ngtcp2_conn_get_streams_bidi_left(*this); }); + return _endpoint.job_queue.call_get([this] { return ngtcp2_conn_get_streams_bidi_left(*this); }); } Path Connection::path() const { - return _loop.call_get([this] { return _path; }); + return _endpoint.job_queue.call_get([this] { return _path; }); } Address Connection::local() const { - return _loop.call_get([this] { return _path.local; }); + return _endpoint.job_queue.call_get([this] { return _path.local; }); } Address Connection::remote() const { - return _loop.call_get([this] { return _path.remote; }); + return _endpoint.job_queue.call_get([this] { return _path.remote; }); } size_t Connection::get_max_datagram_size() const { - return _loop.call_get([this] { return get_max_datagram_piece() * (_packet_splitting ? 
2 : 1); }); + return _endpoint.job_queue.call_get([this] { return get_max_datagram_piece() * (_packet_splitting ? 2 : 1); }); } Connection::~Connection() diff --git a/src/datagram.cpp b/src/datagram.cpp index 388ee9ed..914c93d9 100644 --- a/src/datagram.cpp +++ b/src/datagram.cpp @@ -51,19 +51,19 @@ namespace oxen::quic void Datagrams::set_split_datagram_lookahead(int n) { - loop.call([this, val = n >= 0 ? static_cast(n) : dgram::queue::DEFAULT_SPLIT_LOOKAHEAD] { + endpoint.job_queue.call([this, val = n >= 0 ? static_cast(n) : dgram::queue::DEFAULT_SPLIT_LOOKAHEAD] { log::debug(log_cat, "Changing split datagram lookahead from {} to {}", _send_buffer.split_lookahead, val); _send_buffer.split_lookahead = val; }); } int Datagrams::get_split_datagram_lookahead() const { - return loop.call_get([this] { return static_cast(_send_buffer.split_lookahead); }); + return endpoint.job_queue.call_get([this] { return static_cast(_send_buffer.split_lookahead); }); } void Datagrams::send_impl(std::span data, std::shared_ptr keep_alive) { - loop.call([this, data, keep_alive = std::move(keep_alive)]() mutable { + endpoint.job_queue.call([this, data, keep_alive = std::move(keep_alive)]() mutable { if (!_conn) { log::debug(log_cat, "Unable to send datagram: connection has gone away"); @@ -122,7 +122,7 @@ namespace oxen::quic { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(datagram.loop.inside()); + assert(datagram.endpoint.job_queue.inside()); assert(datagram._conn); auto idx = dgid >> 2; diff --git a/src/endpoint.cpp b/src/endpoint.cpp index dadf6b25..884b6776 100644 --- a/src/endpoint.cpp +++ b/src/endpoint.cpp @@ -113,7 +113,7 @@ namespace oxen::quic ConnectionID Endpoint::next_reference_id() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(job_queue.inside()); return ConnectionID{++_next_rid}; } @@ -127,11 +127,11 @@ namespace oxen::quic void Endpoint::manually_receive_packet(Packet&& pkt) { - if (loop.inside()) + if 
(job_queue.inside()) return handle_packet(std::move(pkt)); pkt.ensure_owned_data(); - loop.call_soon([this, packet = std::move(pkt)]() mutable { handle_packet(std::move(packet)); }); + job_queue.call_soon([this, packet = std::move(pkt)]() mutable { handle_packet(std::move(packet)); }); } void Endpoint::_init_internals() @@ -229,7 +229,7 @@ namespace oxen::quic { // We need to defer this because we aren't allowed to close connections during some other // callback, and can't guarantee we aren't in such a callback. - loop.call_soon([wself = weak_from_this(), d] { + job_queue.call_soon([wself = weak_from_this(), d] { if (auto self = wself.lock()) self->_close_conns(d); }); @@ -250,7 +250,7 @@ namespace oxen::quic Endpoint::~Endpoint() { - assert(loop.inside()); + assert(job_queue.inside()); _close_conns(std::nullopt); // Close it here rather than via member destruction because it still owns a callback that @@ -260,7 +260,7 @@ namespace oxen::quic void Endpoint::schedule_conn_cleanup(Connection& conn) { - loop.call_later( + job_queue.call_later( std::chrono::microseconds{(ngtcp2_conn_get_pto(conn) * 3 + 999) / 1000}, [this, wself = weak_from_this(), cid = conn.reference_id()] { auto self = wself.lock(); @@ -367,7 +367,7 @@ namespace oxen::quic void Endpoint::drop_connection(Connection& conn, io_error err) { log::debug(log_cat, "Scheduling drop connection ({}) with errcode {}", conn.reference_id(), err.code()); - loop.call_soon([wself = weak_from_this(), &conn, err] { + job_queue.call_soon([wself = weak_from_this(), &conn, err] { if (auto self = wself.lock()) self->_drop_connection(conn, err); }); @@ -377,7 +377,7 @@ namespace oxen::quic { if (!msg) msg = ec.strerror(); - loop.call_soon([wself = weak_from_this(), + job_queue.call_soon([wself = weak_from_this(), connid = conn.reference_id(), ec = std::move(ec), msg = std::move(*msg)]() mutable { @@ -426,7 +426,7 @@ namespace oxen::quic { log::debug(log_cat, "Closing connection ({})", conn.reference_id()); - 
assert(loop.inside()); + assert(job_queue.inside()); if (conn.is_closing() || conn.is_draining()) return; @@ -532,7 +532,7 @@ namespace oxen::quic // Defer destruction until the next event loop tick because there are code paths that // can land here from within an ongoing connection method and so it isn't safe to allow // the Connection to get destroyed right now. - loop.reset_soon(std::move(it->second)); + job_queue.reset_soon(std::move(it->second)); // We do want to remove it from `conns`, though, because some scheduled callbacks check // for `rid` being still in the endpoint and so, in that respect, we want the connection // to be considered gone even if its destructor doesn't fire yet. @@ -544,7 +544,7 @@ namespace oxen::quic void Endpoint::initial_association(Connection& conn) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(job_queue.inside()); auto dir_str = conn.is_outbound() ? "CLIENT"s : "SERVER"s; auto n = ngtcp2_conn_get_scid(conn, nullptr); @@ -568,7 +568,7 @@ namespace oxen::quic void Endpoint::associate_reset(const uint8_t* token, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (!token) { log::debug(log_cat, "Cannot add a null reset token"); @@ -585,7 +585,7 @@ namespace oxen::quic void Endpoint::dissociate_reset(const uint8_t* token, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (!token) return; @@ -614,7 +614,7 @@ namespace oxen::quic void Endpoint::associate_cid(const quic_cid& qcid, Connection& conn, bool weakly) { - assert(loop.inside()); + assert(job_queue.inside()); log::trace( log_cat, "{} associating CID:{} to {}", conn.is_inbound() ? 
"SERVER" : "CLIENT", qcid, conn.reference_id()); @@ -625,14 +625,14 @@ namespace oxen::quic void Endpoint::associate_cid(const ngtcp2_cid* cid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (cid->datalen) return associate_cid(quic_cid{*cid}, conn); } void Endpoint::dissociate_cid(const quic_cid& qcid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); log::trace( log_cat, "{} dissociating CID:{} to {}", conn.is_inbound() ? "SERVER" : "CLIENT", qcid, conn.reference_id()); @@ -642,7 +642,7 @@ namespace oxen::quic void Endpoint::dissociate_cid(const ngtcp2_cid* cid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (cid->datalen) return dissociate_cid(quic_cid{*cid}, conn); } @@ -967,7 +967,7 @@ namespace oxen::quic log::debug(log_cat, "Constructing path using packet path: {}", pkt.path); - assert(loop.inside()); + assert(job_queue.inside()); auto next_rid = next_reference_id(); diff --git a/src/iochannel.cpp b/src/iochannel.cpp index 27ba593c..1563ac1b 100644 --- a/src/iochannel.cpp +++ b/src/iochannel.cpp @@ -15,7 +15,7 @@ namespace oxen::quic std::shared_ptr IOChannel::get_conn() { - return loop.call_get([this] { return _conn ? _conn->shared_from_this() : nullptr; }); + return endpoint.job_queue.call_get([this] { return _conn ? 
_conn->shared_from_this() : nullptr; }); } void IOChannel::send(std::string&& data) diff --git a/src/stream.cpp b/src/stream.cpp index faf04e08..27d3b16d 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -66,7 +66,7 @@ namespace oxen::quic throw std::logic_error{ "Invalid enable_watermarks() call: alarm watermark ({}) must be > clear watermark ({})"_format( alarm, clear)}; - loop.call_get([&] { + endpoint.job_queue.call_get([&] { if (_is_closing || _send_fin) { log::debug(log_cat, "Failed to set watermarks; stream is not active!"); @@ -94,7 +94,7 @@ namespace oxen::quic void Stream::disable_watermarks() { - loop.call_get([this] { + endpoint.job_queue.call_get([this] { if (!_watermarking) return; _watermarking.reset(); @@ -107,7 +107,7 @@ namespace oxen::quic void Stream::pause() { - loop.call_get([this]() { + endpoint.job_queue.call_get([this]() { if (not _paused) { log::debug(log_cat, "Pausing stream ID:{}", _stream_id); @@ -121,7 +121,7 @@ namespace oxen::quic void Stream::resume() { - loop.call_get([this]() { + endpoint.job_queue.call_get([this]() { if (_paused) { log::debug(log_cat, "Resuming stream ID:{}", _stream_id); @@ -140,26 +140,26 @@ namespace oxen::quic bool Stream::is_paused() const { - return loop.call_get([this]() { return _paused; }); + return endpoint.job_queue.call_get([this]() { return _paused; }); } bool Stream::writable() const { - return loop.call_get([this] { return !(_is_closing || _send_fin || _sent_fin); }); + return endpoint.job_queue.call_get([this] { return !(_is_closing || _send_fin || _sent_fin); }); } bool Stream::readable() const { - return loop.call_get([this] { return !(_is_closing || _received_fin); }); + return endpoint.job_queue.call_get([this] { return !(_is_closing || _received_fin); }); } bool Stream::is_ready() const { - return loop.call_get([this] { return _ready; }); + return endpoint.job_queue.call_get([this] { return _ready; }); } std::optional Stream::watermark_status() const { - return loop.call_get([this]() -> 
std::optional { + return endpoint.job_queue.call_get([this]() -> std::optional { if (!_watermarking) return std::nullopt; return _watermark_alarm; @@ -175,7 +175,7 @@ namespace oxen::quic void Stream::send_fin() { - loop.call([this] { + endpoint.job_queue.call([this] { _send_fin = true; _conn->packet_io_ready(); }); @@ -188,7 +188,7 @@ namespace oxen::quic // NB: this *must* be a call (not a call_soon) because Connection calls on a short-lived // Stream that won't survive a return to the event loop. - loop.call([this, app_err_code]() { + endpoint.job_queue.call([this, app_err_code]() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); if (_is_closing) @@ -216,15 +216,15 @@ namespace oxen::quic void Stream::set_data_callback(stream_data_callback cb) { - loop.call_get([&] { _data_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _data_callback = std::move(cb); }); } void Stream::set_close_callback(stream_close_callback cb) { - loop.call_get([&] { _close_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _close_callback = std::move(cb); }); } void Stream::set_fin_callback(std::function cb) { - loop.call_get([&] { _fin_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _fin_callback = std::move(cb); }); } void Stream::closed(uint64_t app_code) @@ -277,7 +277,7 @@ namespace oxen::quic void Stream::append_buffer(std::span buffer, std::shared_ptr keep_alive) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(endpoint.job_queue.inside()); assert(_conn); _unsent_size += buffer.size(); @@ -368,7 +368,7 @@ namespace oxen::quic void Stream::revert_stream() { - assert(loop.inside()); + assert(endpoint.job_queue.inside()); log::trace(log_cat, "Stream (ID:{}) reverting after early data rejected...", _stream_id); _unacked_size = 0; _current_buffer_index = 0; @@ -424,7 +424,7 @@ namespace oxen::quic // still actually alive. 
(But if we're already in the event loop the lambda fires // immediately and we don't want to have to do an extra refcount increment/decrement). std::optional> wself; - if (!loop.inside()) + if (!endpoint.job_queue.inside()) wself = weak_from_this(); // In theory, `endpoint` that we use here might be inaccessible as well, but unlike conn @@ -432,7 +432,7 @@ namespace oxen::quic // events) the application has control and responsibility for keeping the network/endpoint // alive at least as long as all the Connections/Streams that instances that were attached // to it. - loop.call([this, wself = std::move(wself), data, ka = std::move(keep_alive)]() { + endpoint.job_queue.call([this, wself = std::move(wself), data, ka = std::move(keep_alive)]() { std::shared_ptr sself; if (wself) { From adfb7bf6565582125334f0216214cbe9f3aee76b Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 03:58:58 -0500 Subject: [PATCH 12/16] clang format --- src/endpoint.cpp | 6 +++--- src/loop.cpp | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/endpoint.cpp b/src/endpoint.cpp index 884b6776..0104f8cf 100644 --- a/src/endpoint.cpp +++ b/src/endpoint.cpp @@ -378,9 +378,9 @@ namespace oxen::quic if (!msg) msg = ec.strerror(); job_queue.call_soon([wself = weak_from_this(), - connid = conn.reference_id(), - ec = std::move(ec), - msg = std::move(*msg)]() mutable { + connid = conn.reference_id(), + ec = std::move(ec), + msg = std::move(*msg)]() mutable { if (auto self = wself.lock()) if (auto it = self->conns.find(connid); it != self->conns.end() && it->second) self->_close_connection(*it->second, std::move(ec), std::move(msg)); diff --git a/src/loop.cpp b/src/loop.cpp index bb69266b..d41788c3 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -96,7 +96,8 @@ namespace oxen::quic return ev_methods_avail; } - static ::event_base* make_ev_loop() { + static ::event_base* make_ev_loop() + { #ifdef _WIN32 { @@ -175,7 +176,8 @@ namespace oxen::quic 
setup_job_waker(); } - JobQueue::~JobQueue() { + JobQueue::~JobQueue() + { log::debug(log_cat, "Destryoing job queue."); if (job_waker) stop(); @@ -184,7 +186,8 @@ namespace oxen::quic void JobQueue::stop() { // Synchronization point: if we aren't on the loop, recurse into it: - if (!loop.inside()) { + if (!loop.inside()) + { loop.call_get([this] { stop(); }); return; } From 3190e9d79a5dcfdcc658812133e3221a595b7ce8 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 14:25:19 -0500 Subject: [PATCH 13/16] Add JobQueue destruction safety to call_get When a JobQueue is destroyed with a pending call_get on its queue, the destructor of that callable will now release the promise with a future error; otherwise a race condition could cause another thread to hang permanently. --- include/oxen/quic/loop.hpp | 37 +++++++++++++++++++++++-------------- tests/job-queue.cpp | 29 ++++++++++++++++++++++++++--- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index f19bebe1..42397f01 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -195,25 +195,34 @@ namespace oxen::quic return f(); } - std::promise prom; - auto fut = prom.get_future(); + struct CallGetter + { + std::shared_ptr> prom{std::make_shared>()}; + Callable& f; - call_soon([&f, &prom] { - try + void operator()() { - if constexpr (!std::is_void_v) - prom.set_value(f()); - else + try { - f(); - prom.set_value(); + if constexpr (!std::is_void_v) + prom->set_value(f()); + else + { + f(); + prom->set_value(); + } + } + catch (...) + { + prom->set_exception(std::current_exception()); } } - catch (...) 
- { - prom.set_exception(std::current_exception()); - } - }); + }; + + CallGetter g{.f = f}; + auto fut = g.prom->get_future(); + + call_soon(std::move(g)); return fut.get(); } diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index c29c6deb..09d0fd8c 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -16,11 +16,11 @@ namespace oxen::quic::test TEST_CASE("JobQueue - One extra queue", "[jobqueue]") { + Loop loop; + JobQueue jq{loop}; + SECTION("Extra queue jobs go away, others do not.") { - Loop loop; - JobQueue jq{loop}; - callback_waiter queued{[]() {}}; callback_waiter good{[]() {}}; callback_waiter bad{[]() {}}; @@ -53,6 +53,29 @@ namespace oxen::quic::test REQUIRE_FALSE(bad.wait(10ms)); REQUIRE(main_ok.wait(10ms)); } + + SECTION("call_get exception if JobQueue goes away before fulfilled") + { + bool foo{false}; + + jq.call([&]() { + // this needs to happen after the call_get below is queued. hopefully there + // won't be some fruit-flavored platform where this sleep is insufficient. + std::this_thread::sleep_for(10ms); + jq.stop(); + }); + + try + { + foo = jq.call_get([&]() { return true; }); + } + catch (std::future_error& e) + { + // this is the expected case + } + + REQUIRE_FALSE(foo); + } } } // namespace oxen::quic::test From 085c68d8b0fbf410c24ed9154c888163166642f4 Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 15:14:21 -0500 Subject: [PATCH 14/16] change try/catch test to require throw --- tests/job-queue.cpp | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index 09d0fd8c..481a7a51 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -56,8 +56,6 @@ namespace oxen::quic::test SECTION("call_get exception if JobQueue goes away before fulfilled") { - bool foo{false}; - jq.call([&]() { // this needs to happen after the call_get below is queued. hopefully there // won't be some fruit-flavored platform where this sleep is insufficient. 
@@ -65,16 +63,7 @@ namespace oxen::quic::test jq.stop(); }); - try - { - foo = jq.call_get([&]() { return true; }); - } - catch (std::future_error& e) - { - // this is the expected case - } - - REQUIRE_FALSE(foo); + REQUIRE_THROWS_AS(jq.call_get([&]() {}), std::future_error); } } From b72ab24f3aafc10167d895fbd81362c018d9995a Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 21:21:09 -0500 Subject: [PATCH 15/16] fix jobqueue test and throw on call after stop --- include/oxen/quic/loop.hpp | 2 ++ src/loop.cpp | 13 +++++++++ tests/job-queue.cpp | 54 +++++++++++++++++++++++++++++++++++++- 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 42397f01..d9214b5f 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -259,6 +259,8 @@ namespace oxen::quic { { std::lock_guard lock{job_queue_mutex}; + if (!*running) + throw std::runtime_error{"Attempting to queue job onto stopped loop."}; job_queue.emplace(std::move(f)); } diff --git a/src/loop.cpp b/src/loop.cpp index d41788c3..9d9b398b 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -192,6 +192,7 @@ namespace oxen::quic return; } + std::lock_guard l{job_queue_mutex}; if (!job_waker) return; @@ -200,6 +201,9 @@ namespace oxen::quic job_waker.reset(); + // Why does std::queue not have a clear() method? 
+ std::queue{}.swap(job_queue); + for (auto* osd : delayed_events) delete osd; delayed_events.clear(); @@ -275,6 +279,15 @@ namespace oxen::quic void JobQueue::add_oneshot_event(std::chrono::microseconds delay, std::function hook) { + // lock if not in loop thread, to make running check safe -- most uses of this should be + // from the loop thread, so this shouldn't be a bottleneck + std::unique_lock l{job_queue_mutex}; + if (!inside()) + l.lock(); + + if (!*running) + throw std::runtime_error{"Attempting to queue job onto stopped loop."}; + auto* handler = new OneShotDelayed{*this, std::move(hook)}; delayed_events.push_back(handler); auto& h = *handler; diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp index 481a7a51..e92055fd 100644 --- a/tests/job-queue.cpp +++ b/tests/job-queue.cpp @@ -54,6 +54,46 @@ namespace oxen::quic::test REQUIRE(main_ok.wait(10ms)); } + SECTION("call exception if invoked after JobQueue stopped") + { + callback_waiter stopped{[]() {}}; + bool soon_failed{false}; + bool later_failed{false}; + + jq.call([&]() { + jq.stop(); + + try + { + jq.call_soon([]() {}); + } + catch (const std::exception& e) + { + soon_failed = true; + } + + try + { + jq.call_later(1s, []() {}); + } + catch (const std::exception& e) + { + later_failed = true; + } + + stopped.call(); + }); + + CHECK_NOFAIL(stopped.wait(10ms)); + + REQUIRE(soon_failed); + REQUIRE(later_failed); + REQUIRE_THROWS_AS(jq.call([]() {}), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_soon([]() {}), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_get([]() { return 0; }), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_later(1s, []() {}), std::runtime_error); + } + SECTION("call_get exception if JobQueue goes away before fulfilled") { jq.call([&]() { @@ -63,7 +103,19 @@ namespace oxen::quic::test jq.stop(); }); - REQUIRE_THROWS_AS(jq.call_get([&]() {}), std::future_error); + bool success{false}; + try + { + jq.call_get([&]() {}); + } + catch (const std::future_error& e) + { + success = 
true; + } + catch (const std::exception& e) + {} + + CHECK_NOFAIL(success); } } From ee1240fe8388778a43d82e3d1dc5613a1a404cfb Mon Sep 17 00:00:00 2001 From: Thomas Winget Date: Wed, 28 Jan 2026 21:30:16 -0500 Subject: [PATCH 16/16] Defer locking the unique_lock in add_oneshot_event so it is only locked off the loop thread --- src/loop.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/loop.cpp b/src/loop.cpp index 9d9b398b..78f458fb 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -281,7 +281,7 @@ namespace oxen::quic { // lock if not in loop thread, to make running check safe -- most uses of this should be // from the loop thread, so this shouldn't be a bottleneck - std::unique_lock l{job_queue_mutex}; + std::unique_lock l{job_queue_mutex, std::defer_lock}; if (!inside()) l.lock();