diff --git a/CMakeLists.txt b/CMakeLists.txt index ea64135c..b11d839f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ if(CCACHE_PROGRAM) endif() project(libquic - VERSION 1.7.1 + VERSION 1.8.0 DESCRIPTION "Modular QUIC library for stream and connection management" LANGUAGES ${LANGS}) diff --git a/include/oxen/quic/btstream.hpp b/include/oxen/quic/btstream.hpp index 30ac7d5c..d8cb99c8 100644 --- a/include/oxen/quic/btstream.hpp +++ b/include/oxen/quic/btstream.hpp @@ -247,6 +247,7 @@ namespace oxen::quic friend struct sent_request; friend class Network; friend class Loop; + friend class JobQueue; protected: template @@ -281,7 +282,7 @@ namespace oxen::quic auto req = std::make_shared(*this, encode_command(ep, rid, body), rid, std::forward(opts)...); if (req->cb) - loop.call([this, r = std::move(req)]() mutable { + endpoint.job_queue.call([this, r = std::move(req)]() mutable { if (auto* req = add_sent_request(std::move(r))) send(std::move(req->data)); }); diff --git a/include/oxen/quic/connection.hpp b/include/oxen/quic/connection.hpp index 3d180a8c..5921e4a5 100644 --- a/include/oxen/quic/connection.hpp +++ b/include/oxen/quic/connection.hpp @@ -126,7 +126,7 @@ namespace oxen::quic // has a forward declaration; the user of this method needs to have the full definition // available to call this. return std::static_pointer_cast(queue_incoming_stream_impl([&](Connection& c, EndpointDeferred& e) { - return e.loop.template make_shared(c, e, std::forward(args)...); + return e.job_queue.template make_shared(c, e, std::forward(args)...); })); } @@ -148,7 +148,7 @@ namespace oxen::quic std::shared_ptr open_stream(Args&&... args) { return std::static_pointer_cast(open_stream_impl([&](Connection& c, EndpointDeferred& e) { - return e.loop.template make_shared(c, e, std::forward(args)...); + return e.job_queue.template make_shared(c, e, std::forward(args)...); })); } diff --git a/include/oxen/quic/datagram.hpp b/include/oxen/quic/datagram.hpp index 98547a23..789a14a2 100644 --- a/include/oxen/quic/datagram.hpp +++ b/include/oxen/quic/datagram.hpp @@ -329,6 +329,7 @@ namespace oxen::quic protected: friend class Connection; friend class Loop; + friend class JobQueue; friend struct dgram::rotating_buffer; friend class TestHelper; diff --git a/include/oxen/quic/endpoint.hpp b/include/oxen/quic/endpoint.hpp index ad55fb4a..d84d0afe 100644 --- a/include/oxen/quic/endpoint.hpp +++ b/include/oxen/quic/endpoint.hpp @@ -50,6 +50,7 @@ namespace oxen::quic connection_closed_callback connection_close_cb; Loop& loop; + JobQueue job_queue{loop}; template void listen(Opt&&... opts) @@ -58,7 +59,7 @@ namespace oxen::quic (0 + ... 
+ std::is_convertible_v, std::shared_ptr>) == 1, "listen() requires exactly one std::shared_ptr argument"); - loop.call_get([&opts..., this]() { + job_queue.call_get([&opts..., this]() { if (inbound_ctx) throw std::logic_error{"Cannot call listen() more than once"}; @@ -82,7 +83,7 @@ namespace oxen::quic if (_local.is_ipv6() && !remote.is_ipv6()) remote.map_ipv4_as_ipv6(); - return loop.call_get([this, &opts..., remote = std::move(remote)]() mutable { + return job_queue.call_get([this, &opts..., remote = std::move(remote)]() mutable { // initialize client context and client tls context simultaneously auto outbound_ctx = std::make_shared(Direction::OUTBOUND, std::forward(opts)...); _assign_context_globals(*outbound_ctx); @@ -159,6 +160,7 @@ namespace oxen::quic private: friend class Network; friend class Loop; + friend class JobQueue; friend class Connection; friend struct connection_callbacks; friend class TestHelper; diff --git a/include/oxen/quic/iochannel.hpp b/include/oxen/quic/iochannel.hpp index cc71c217..a9a4c37f 100644 --- a/include/oxen/quic/iochannel.hpp +++ b/include/oxen/quic/iochannel.hpp @@ -121,7 +121,7 @@ namespace oxen::quic typename EP = Endpoint> Ret call_get_accessor(T (Class::*getter)() const) const { - return static_cast(endpoint).loop.call_get( + return static_cast(endpoint).job_queue.call_get( [this, &getter]() -> Ret { return (static_cast(this)->*getter)(); }); } }; diff --git a/include/oxen/quic/loop.hpp b/include/oxen/quic/loop.hpp index 20ce8a3b..d9214b5f 100644 --- a/include/oxen/quic/loop.hpp +++ b/include/oxen/quic/loop.hpp @@ -19,6 +19,7 @@ namespace oxen::quic struct Ticker { friend class Loop; + friend class JobQueue; private: event_ptr ev; @@ -31,8 +32,6 @@ namespace oxen::quic Ticker() = default; public: - ~Ticker(); - /** Starts the repeating event on the given interval on Ticker creation. Does nothing if * already active. Returns: @@ -59,6 +58,7 @@ namespace oxen::quic class Wakeable { friend class Loop; + friend class JobQueue; event_ptr ev; std::function f; @@ -70,53 +70,76 @@ namespace oxen::quic void wake(); }; - class Loop + // Get an independent JobQueue to use for jobs, call_later, loop deleters, etc. + // + // Do not use this unless you know you need it. + // + // The interface is the same as `Loop::call` and similar, but a JobQueue can have a shorter lifetime + // than the Loop on which it runs. The purpose of this is if you have multiple components using + // the same Loop and one of those components may have jobs queued which reference it *after* its + // destructor, that component can instead own this JobQueue and those jobs will not be + // processed, but anything else using the Loop will be unaffected. + // + // Effectively this allows a subqueue of events that can be cancelled (via JobQueue destruction) + // without needing to cancel jobs of unrelated job queues. + // + // This queue can be stopped or destroyed *off* the loop thread, if necessary, but note that + // stopping and destruction still requires that the loop thread is usable to perform the actual + // destruction, and so the loop this class uses must outlive this queue. + // + // If you keep objects alive which you created with this queue, e.g. tickers, wakeables, etc. + // you are responsible for making sure any concrete references to them (especially shared_ptr) + // are gone before the JobQueue is. Their destructors are (necessarily and intentionally) jobs + // on the job queue from which they spawned. 
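A minimal usage sketch of the ownership pattern described in the comment above; the `Telemetry` component and its members are illustrative assumptions, not part of this header:

    // A component that shares a Loop with other users but owns its own JobQueue, so work it
    // queues dies with it while unrelated jobs on the same Loop keep running.
    struct Telemetry
    {
        JobQueue queue;

        explicit Telemetry(Loop& loop) : queue{loop} {}

        void flush_soon()
        {
            // Runs on the loop thread, but only if this Telemetry (and thus its JobQueue)
            // still exists when the queue is drained.
            queue.call_soon([this] { flush(); });
        }

        void flush();

        // The implicit ~Telemetry() stops `queue`, cancelling any still-pending jobs and
        // call_later events without touching jobs queued by other components on the Loop.
    };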
+ class JobQueue { - protected: - std::unique_ptr<::event_base, void (*)(struct ::event_base*)> ev_loop; - std::thread loop_thread; - std::thread::id loop_thread_id; + friend class Loop; + + std::shared_ptr running{std::make_shared(true)}; event_ptr job_waker; std::queue job_queue; std::mutex job_queue_mutex; - void add_oneshot_event(std::chrono::microseconds delay, std::function hook); + Loop& loop; - private: - std::list> tickers; - - std::shared_ptr make_ticker(); + void add_oneshot_event(std::chrono::microseconds delay, std::function hook); - // call_later events aren't guaranteed to get properly disposed off if the event loop stops - // before it fires, so we stash it in here temporarily and remove it when fired. During the - // Loop destructor, then, if there's anything left that's one that needs to be cleaned up. + // call_later events aren't guaranteed to get properly disposed off if the job queue is + // destroyed before it fires, so we stash it in here temporarily and remove it when fired. + // During the JobQueue destructor, then any unfired events need to be cleaned up. struct OneShotDelayed; std::list delayed_events; - public: - Loop(); + void setup_job_waker(); + void process_job_queue(); - Loop(const Loop&) = delete; - Loop(Loop&&) = delete; - Loop& operator=(Loop&&) = delete; - Loop& operator=(Loop) = delete; + public: + bool inside() const; - virtual ~Loop(); + JobQueue(Loop& l); - ::event_base* get_event_base() const { return ev_loop.get(); } + // Cancels all jobs in the queue and deletes this job queue's event from the event loop. + // This is normally called automatically during destruction, but can be called before + // destruction if needed. This method does nothing if the queue has already been stopped. + // + // Stopping is terminal (i.e. there is no way to restart a queue other than replacing it). + // + // Note that this method requires the event loop and will block until the owning Loop is + // able to process it. + void stop(); - bool inside() const { return std::this_thread::get_id() == loop_thread_id; } + // Calls stop() if not already called. + ~JobQueue(); - // Returns a pointer deleter that defers the actual destruction call to this network - // object's event loop. + // Returns a pointer deleter that defers the actual destruction call to this JobQueue template auto loop_deleter() { return [this](T* ptr) { call_get([ptr] { delete ptr; }); }; } - // Returns a pointer deleter that defers invocation of a custom deleter to the event loop + // Returns a pointer deleter that defers invocation of a custom deleter to this JobQueue template Callable> auto wrapped_deleter(Callable f) { @@ -126,9 +149,8 @@ namespace oxen::quic } // Similar in concept to std::make_shared, but it creates the shared pointer with a - // custom deleter that dispatches actual object destruction to the network's event loop for - // thread safety, and waits for destruction of the overlying object to complete before - // returning. + // custom deleter that dispatches actual object destruction to this JobQueue for thread + // safety, and waits for destruction of the overlying object to complete before returning. template std::shared_ptr make_shared(Args&&... args) { @@ -145,9 +167,8 @@ namespace oxen::quic return std::shared_ptr(obj, wrapped_deleter(std::forward(deleter))); } - /// Calls `f()` on the event loop. If the caller is already in the event loop thread then - /// f() is called immediately; otherwise it is scheduled on the event loop thread at the - /// next available opportunity. 
+ /// Calls `f()` on the JobQueue. If the caller is already in the Loop thread then + /// f() is called immediately; otherwise it is scheduled at the end of the queue. template Callable> void call(Callable&& f) { @@ -161,9 +182,9 @@ namespace oxen::quic } } - // Calls `f()` on the event loop and returns its value. If this is called from within the - // event loop thread then this simply calls and returns the result of `f()`. If *not* in - // the event loop then a call to `f()` is scheduled on the event loop for the next available + // Calls `f()` on the JobQueue and returns its value. If this is called from within the + // Loop thread then this simply calls and returns the result of `f()`. If *not* in + // the Loop thread then a call to `f()` is scheduled on the JobQueue for the next available // opportunity and then the current thread blocks until that call is invoked, then returns // it back to the caller. template ()())> @@ -174,51 +195,39 @@ namespace oxen::quic return f(); } - std::promise prom; - auto fut = prom.get_future(); + struct CallGetter + { + std::shared_ptr> prom{std::make_shared>()}; + Callable& f; - call_soon([&f, &prom] { - try + void operator()() { - if constexpr (!std::is_void_v) - prom.set_value(f()); - else + try { - f(); - prom.set_value(); + if constexpr (!std::is_void_v) + prom->set_value(f()); + else + { + f(); + prom->set_value(); + } + } + catch (...) + { + prom->set_exception(std::current_exception()); } } - catch (...) - { - prom.set_exception(std::current_exception()); - } - }); + }; - return fut.get(); - } + CallGetter g{.f = f}; + auto fut = g.prom->get_future(); - /// Sets up a task `f()` to be called on the event loop periodically. - /// - /// `interval` controls the interval on which the task will be called. - /// - /// `start_immediately` controls whether the task is scheduled on the event loop right away - /// (true, the default), or not (false). If not started immediately then the task will not - /// fire until `start()` is called on it. (Note that this parameter does not mean "call - /// immediately" -- it simply controls whether the initial timer for the first call is - /// started or not). - /// - /// The ticker will remain active as long the loop remains active and the returned Ticker - /// object is kept alive. - template Callable> - [[nodiscard]] std::shared_ptr call_every( - std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) - { - auto h = make_ticker(); - h->init_event(get_event_base(), interval, std::forward(f), start_immediately); - return h; + call_soon(std::move(g)); + + return fut.get(); } - /// Schedules a call of `f()` on the event loop after a delay. + /// Schedules a call of `f()` on the JobQueue after a delay. template Callable> void call_later(std::chrono::microseconds delay, Callable hook) { @@ -240,39 +249,142 @@ namespace oxen::quic } } - /// Creates a Wakeable event tied to this event loop that can be manually triggered when - /// desired to schedule an invocation of the callback. Unlike call_soon, this is idempotent - /// (i.e. multiple wakeups before it actually runs does not schedule multiple calls). Note - /// that this call only constructs the event, but does not initially schedule it. - std::shared_ptr make_wakeable(std::function hook); - static void activate(::event& evt); - /// Schedules a call of `f()` at the next available opportunity on the event loop. Unlike - /// `call()`, `call_soon()` never calls f() immediately even if already inside the event - /// loop. 
+ /// Schedules a call of `f()` at the next available opportunity on the JobQueue. Unlike + /// `call()`, `call_soon()` never calls f() immediately even if already inside the Loop + /// thread. template Callable> void call_soon(Callable f) { { std::lock_guard lock{job_queue_mutex}; + if (!*running) + throw std::runtime_error{"Attempting to queue job onto stopped loop."}; job_queue.emplace(std::move(f)); } activate(*job_waker); } - /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the event - /// loop. Asyncronous. + /// Takes any type of shared_ptr and schedules a reset of that shared pointer on the + /// JobQueue. Asyncronous. template void reset_soon(std::shared_ptr&& ptr) { call_soon([ptr = std::move(ptr)]() mutable { ptr.reset(); }); } + }; + + class Loop + { + friend class JobQueue; + + protected: + std::unique_ptr<::event_base, void (*)(struct ::event_base*)> ev_loop; + std::thread loop_thread; + std::thread::id loop_thread_id; private: - void setup_job_waker(); + JobQueue main_queue{*this}; - void process_job_queue(); + std::shared_ptr make_ticker(); + + public: + Loop(); + + Loop(const Loop&) = delete; + Loop(Loop&&) = delete; + Loop& operator=(Loop&&) = delete; + Loop& operator=(Loop) = delete; + + virtual ~Loop(); + + ::event_base* get_event_base() const { return ev_loop.get(); } + + bool inside() const { return std::this_thread::get_id() == loop_thread_id; } + + // See JobQueue::wrapped_deleter, applies to Loop's main event queue. + template Callable> + auto wrapped_deleter(Callable&& f) + { + return main_queue.wrapped_deleter(std::forward(f)); + } + + // See JobQueue::make_shared, applies to Loop's main event queue. + template + std::shared_ptr make_shared(Args&&... args) + { + return main_queue.make_shared(std::forward(args)...); + } + + // See JobQueue::shared_ptr, applies to Loop's main event queue. + template Callable> + std::shared_ptr shared_ptr(T* obj, Callable&& deleter) + { + return main_queue.shared_ptr(obj, std::forward(deleter)); + } + + // See JobQueue::call, applies to Loop's main event queue. + template Callable> + void call(Callable&& f) + { + main_queue.call(std::forward(f)); + } + + // See JobQueue::call_get, applies to Loop's main event queue. + template ()())> + Ret call_get(Callable&& f) + { + return main_queue.call_get(std::forward(f)); + } + + /// Sets up a task `f()` to be called on the event loop periodically. + /// + /// `interval` controls the interval on which the task will be called. + /// + /// `start_immediately` controls whether the task is scheduled on the event loop right away + /// (true, the default), or not (false). If not started immediately then the task will not + /// fire until `start()` is called on it. (Note that this parameter does not mean "call + /// immediately" -- it simply controls whether the initial timer for the first call is + /// started or not). + /// + /// The owner of the Ticker is responsible for making sure it does not outlive the Loop + /// from which it was created. + template Callable> + [[nodiscard]] std::shared_ptr call_every( + std::chrono::microseconds interval, Callable&& f, bool start_immediately = true) + { + auto h = make_ticker(); + h->init_event(get_event_base(), interval, std::forward(f), start_immediately); + return h; + } + + // See JobQueue::call_later, applies to Loop's main event queue. 
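A brief sketch of how the main-queue forwarding interface above might be used; illustrative only, and assumes <atomic> and the std::chrono literals are available:

    Loop loop;
    std::atomic<int> counter{0};

    // Runs inline if already on the loop thread, otherwise queued on the main queue:
    loop.call([&] { ++counter; });

    // Blocks until the loop thread has produced the value:
    int snapshot = loop.call_get([&] { return counter.load(); });

    // One-shot job, fired on the loop thread roughly 250ms from now:
    loop.call_later(250ms, [&] { ++counter; });

    // Periodic job; the returned Ticker must stay alive and must not outlive `loop`:
    auto ticker = loop.call_every(1s, [&] { ++counter; });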
+ template Callable> + void call_later(std::chrono::microseconds delay, Callable&& hook) + { + main_queue.call_later(delay, std::forward(hook)); + } + + /// Creates a Wakeable event tied to this event loop that can be manually triggered when + /// desired to schedule an invocation of the callback. Unlike call_soon, this is idempotent + /// (i.e. multiple wakeups before it actually runs does not schedule multiple calls). Note + /// that this call only constructs the event, but does not initially schedule it. + std::shared_ptr make_wakeable(std::function hook); + + // See JobQueue::call_soon, applies to Loop's main event queue. + template Callable> + void call_soon(Callable&& f) + { + main_queue.call_soon(std::forward(f)); + } + + // See JobQueue::reset_soon, applies to Loop's main event queue. + template + void reset_soon(std::shared_ptr&& ptr) + { + call_soon([ptr = std::move(ptr)]() mutable { ptr.reset(); }); + } }; } // namespace oxen::quic diff --git a/include/oxen/quic/stream.hpp b/include/oxen/quic/stream.hpp index 129f006f..ece2e180 100644 --- a/include/oxen/quic/stream.hpp +++ b/include/oxen/quic/stream.hpp @@ -67,6 +67,7 @@ namespace oxen::quic friend class Connection; friend class Network; friend class Loop; + friend class JobQueue; protected: template diff --git a/src/btstream.cpp b/src/btstream.cpp index 163ddcfa..53c83b91 100644 --- a/src/btstream.cpp +++ b/src/btstream.cpp @@ -169,7 +169,7 @@ namespace oxen::quic void BTRequestStream::register_handler(std::string ep, std::function func) { - loop.call([this, ep = std::move(ep), func = std::move(func)]() mutable { + endpoint.job_queue.call([this, ep = std::move(ep), func = std::move(func)]() mutable { registered_endpoints[std::move(ep)] = std::move(func); }); } @@ -177,7 +177,7 @@ namespace oxen::quic void BTRequestStream::register_generic_handler(std::function request_handler) { log::debug(log_cat, "BTRequestStream set generic request handler"); - loop.call([this, func = std::move(request_handler)]() mutable { generic_handler = std::move(func); }); + endpoint.job_queue.call([this, func = std::move(request_handler)]() mutable { generic_handler = std::move(func); }); } void BTRequestStream::handle_input(message msg) diff --git a/src/connection.cpp b/src/connection.cpp index 369a98af..d67a6f53 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -478,7 +478,7 @@ namespace oxen::quic void Connection::set_new_path(Path new_path) { - _loop.call([this, new_path]() { _path = new_path; }); + _endpoint.job_queue.call([this, new_path]() { _path = new_path; }); } int Connection::recv_token(const uint8_t* token, size_t tokenlen) @@ -588,12 +588,12 @@ namespace oxen::quic void Connection::set_remote_addr(const ngtcp2_addr& new_remote) { - _loop.call([this, new_remote]() { _path.set_new_remote(new_remote); }); + _endpoint.job_queue.call([this, new_remote]() { _path.set_new_remote(new_remote); }); } void Connection::set_local_addr(Address new_local) { - _loop.call([this, new_local]() { + _endpoint.job_queue.call([this, new_local]() { Path new_path{new_local, _path.remote}; _path = new_path; }); @@ -629,7 +629,7 @@ namespace oxen::quic void Connection::halt_events() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); packet_io_trigger.reset(); packet_retransmit_timer.reset(); log::debug(log_cat, "Connection ({}) io trigger/retransmit timer events halted", reference_id()); @@ -637,7 +637,7 @@ namespace oxen::quic void Connection::packet_io_ready() { - 
assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); if (packet_io_trigger) event_active(packet_io_trigger.get(), 0, 0); // else we've reset the trigger (via halt_events), which means the connection is closing/draining/etc. @@ -650,12 +650,12 @@ namespace oxen::quic std::shared_ptr Connection::datagrams() { - return _loop.call_get([this] { return dgrams; }); + return _endpoint.job_queue.call_get([this] { return dgrams; }); } void Connection::revert_early_channels() { - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); log::debug(log_cat, "Client reverting early stream data"); // We need to re-open any opened streams because the remote rejected early data, and when @@ -734,7 +734,7 @@ namespace oxen::quic break; case NGTCP2_ERR_DRAINING: log::trace(log_cat, "Note: {} is draining; signaling endpoint to drain connection", reference_id()); - _loop.call_soon([wself = weak_from_this()]() { + _endpoint.job_queue.call_soon([wself = weak_from_this()]() { if (auto self = wself.lock()) { log::debug(log_cat, "Endpoint draining connection {}", self->reference_id()); @@ -819,7 +819,7 @@ namespace oxen::quic if (!stream && default_stream) stream = default_stream(*this, _endpoint); if (!stream) - stream = _loop.make_shared( + stream = _endpoint.job_queue.make_shared( *this, _endpoint, context->stream_data_cb, context->stream_close_cb, context->stream_fin_cb); return stream; @@ -828,7 +828,7 @@ namespace oxen::quic std::shared_ptr Connection::queue_incoming_stream_impl( std::function(Connection& c, Endpoint& e)> make_stream) { - return _loop.call_get([this, &make_stream]() { + return _endpoint.job_queue.call_get([this, &make_stream]() { std::shared_ptr stream; if (make_stream) stream = make_stream(*this, _endpoint); @@ -870,7 +870,7 @@ namespace oxen::quic std::shared_ptr Connection::open_stream_impl( std::function(Connection& c, Endpoint& e)> make_stream) { - return _loop.call_get([this, &make_stream]() { + return _endpoint.job_queue.call_get([this, &make_stream]() { std::shared_ptr stream; if (make_stream) stream = make_stream(*this, _endpoint); @@ -918,7 +918,7 @@ namespace oxen::quic std::shared_ptr Connection::get_stream_impl(int64_t id) { - return _loop.call_get([this, id]() -> std::shared_ptr { + return _endpoint.job_queue.call_get([this, id]() -> std::shared_ptr { if (auto it = _streams.find(id); it != _streams.end()) return it->second; @@ -1404,7 +1404,7 @@ namespace oxen::quic if (uint64_t app_err_code = context->stream_open_cb ? context->stream_open_cb(*stream) : 0; app_err_code != 0) { log::info(log_cat, "stream_open_callback returned error code {}, closing stream {}", app_err_code, id); - assert(_loop.inside()); + assert(_endpoint.job_queue.inside()); stream->close(app_err_code); return 0; } @@ -1666,7 +1666,7 @@ namespace oxen::quic std::string_view Connection::selected_alpn() const { - return _loop.call_get( + return _endpoint.job_queue.call_get( [this]() { return (handshaked or establish_hook_called) ? get_session()->selected_alpn() : ""sv; }); } @@ -1708,7 +1708,7 @@ namespace oxen::quic { if (!_max_dgram_size_changed) return std::nullopt; - return _loop.call_get([this]() -> std::optional { + return _endpoint.job_queue.call_get([this]() -> std::optional { // Check it again via an exchange, in case someone raced us here if (_max_dgram_size_changed.exchange(false)) return _last_max_dgram_piece * (_packet_splitting ? 
2 : 1); @@ -1858,12 +1858,12 @@ namespace oxen::quic : nullptr; if (context->config.datagram_support) - dgrams = _loop.make_shared( + dgrams = _endpoint.job_queue.make_shared( *this, _endpoint, context->dgram_data_cb ? context->dgram_data_cb : ep.dgram_recv_cb, context->config.dgram_queue_limit); - pseudo_stream = _loop.make_shared(*this, _endpoint); + pseudo_stream = _endpoint.job_queue.make_shared(*this, _endpoint); pseudo_stream->_stream_id = -1; const auto d_str = is_outbound() ? "outbound" : "inbound"; @@ -2084,35 +2084,35 @@ namespace oxen::quic size_t Connection::num_streams_active() const { - return _loop.call_get([this] { return _streams.size(); }); + return _endpoint.job_queue.call_get([this] { return _streams.size(); }); } size_t Connection::num_streams_pending() const { - return _loop.call_get([this] { return pending_streams.size(); }); + return _endpoint.job_queue.call_get([this] { return pending_streams.size(); }); } uint64_t Connection::get_max_streams() const { - return _loop.call_get([this] { return _max_streams; }); + return _endpoint.job_queue.call_get([this] { return _max_streams; }); } uint64_t Connection::get_streams_available() const { - return _loop.call_get([this] { return ngtcp2_conn_get_streams_bidi_left(*this); }); + return _endpoint.job_queue.call_get([this] { return ngtcp2_conn_get_streams_bidi_left(*this); }); } Path Connection::path() const { - return _loop.call_get([this] { return _path; }); + return _endpoint.job_queue.call_get([this] { return _path; }); } Address Connection::local() const { - return _loop.call_get([this] { return _path.local; }); + return _endpoint.job_queue.call_get([this] { return _path.local; }); } Address Connection::remote() const { - return _loop.call_get([this] { return _path.remote; }); + return _endpoint.job_queue.call_get([this] { return _path.remote; }); } size_t Connection::get_max_datagram_size() const { - return _loop.call_get([this] { return get_max_datagram_piece() * (_packet_splitting ? 2 : 1); }); + return _endpoint.job_queue.call_get([this] { return get_max_datagram_piece() * (_packet_splitting ? 2 : 1); }); } Connection::~Connection() diff --git a/src/datagram.cpp b/src/datagram.cpp index 388ee9ed..914c93d9 100644 --- a/src/datagram.cpp +++ b/src/datagram.cpp @@ -51,19 +51,19 @@ namespace oxen::quic void Datagrams::set_split_datagram_lookahead(int n) { - loop.call([this, val = n >= 0 ? static_cast(n) : dgram::queue::DEFAULT_SPLIT_LOOKAHEAD] { + endpoint.job_queue.call([this, val = n >= 0 ? 
static_cast(n) : dgram::queue::DEFAULT_SPLIT_LOOKAHEAD] { log::debug(log_cat, "Changing split datagram lookahead from {} to {}", _send_buffer.split_lookahead, val); _send_buffer.split_lookahead = val; }); } int Datagrams::get_split_datagram_lookahead() const { - return loop.call_get([this] { return static_cast(_send_buffer.split_lookahead); }); + return endpoint.job_queue.call_get([this] { return static_cast(_send_buffer.split_lookahead); }); } void Datagrams::send_impl(std::span data, std::shared_ptr keep_alive) { - loop.call([this, data, keep_alive = std::move(keep_alive)]() mutable { + endpoint.job_queue.call([this, data, keep_alive = std::move(keep_alive)]() mutable { if (!_conn) { log::debug(log_cat, "Unable to send datagram: connection has gone away"); @@ -122,7 +122,7 @@ namespace oxen::quic { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(datagram.loop.inside()); + assert(datagram.endpoint.job_queue.inside()); assert(datagram._conn); auto idx = dgid >> 2; diff --git a/src/endpoint.cpp b/src/endpoint.cpp index dadf6b25..0104f8cf 100644 --- a/src/endpoint.cpp +++ b/src/endpoint.cpp @@ -113,7 +113,7 @@ namespace oxen::quic ConnectionID Endpoint::next_reference_id() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(job_queue.inside()); return ConnectionID{++_next_rid}; } @@ -127,11 +127,11 @@ namespace oxen::quic void Endpoint::manually_receive_packet(Packet&& pkt) { - if (loop.inside()) + if (job_queue.inside()) return handle_packet(std::move(pkt)); pkt.ensure_owned_data(); - loop.call_soon([this, packet = std::move(pkt)]() mutable { handle_packet(std::move(packet)); }); + job_queue.call_soon([this, packet = std::move(pkt)]() mutable { handle_packet(std::move(packet)); }); } void Endpoint::_init_internals() @@ -229,7 +229,7 @@ namespace oxen::quic { // We need to defer this because we aren't allowed to close connections during some other // callback, and can't guarantee we aren't in such a callback. 
- loop.call_soon([wself = weak_from_this(), d] { + job_queue.call_soon([wself = weak_from_this(), d] { if (auto self = wself.lock()) self->_close_conns(d); }); @@ -250,7 +250,7 @@ namespace oxen::quic Endpoint::~Endpoint() { - assert(loop.inside()); + assert(job_queue.inside()); _close_conns(std::nullopt); // Close it here rather than via member destruction because it still owns a callback that @@ -260,7 +260,7 @@ namespace oxen::quic void Endpoint::schedule_conn_cleanup(Connection& conn) { - loop.call_later( + job_queue.call_later( std::chrono::microseconds{(ngtcp2_conn_get_pto(conn) * 3 + 999) / 1000}, [this, wself = weak_from_this(), cid = conn.reference_id()] { auto self = wself.lock(); @@ -367,7 +367,7 @@ namespace oxen::quic void Endpoint::drop_connection(Connection& conn, io_error err) { log::debug(log_cat, "Scheduling drop connection ({}) with errcode {}", conn.reference_id(), err.code()); - loop.call_soon([wself = weak_from_this(), &conn, err] { + job_queue.call_soon([wself = weak_from_this(), &conn, err] { if (auto self = wself.lock()) self->_drop_connection(conn, err); }); @@ -377,10 +377,10 @@ namespace oxen::quic { if (!msg) msg = ec.strerror(); - loop.call_soon([wself = weak_from_this(), - connid = conn.reference_id(), - ec = std::move(ec), - msg = std::move(*msg)]() mutable { + job_queue.call_soon([wself = weak_from_this(), + connid = conn.reference_id(), + ec = std::move(ec), + msg = std::move(*msg)]() mutable { if (auto self = wself.lock()) if (auto it = self->conns.find(connid); it != self->conns.end() && it->second) self->_close_connection(*it->second, std::move(ec), std::move(msg)); @@ -426,7 +426,7 @@ namespace oxen::quic { log::debug(log_cat, "Closing connection ({})", conn.reference_id()); - assert(loop.inside()); + assert(job_queue.inside()); if (conn.is_closing() || conn.is_draining()) return; @@ -532,7 +532,7 @@ namespace oxen::quic // Defer destruction until the next event loop tick because there are code paths that // can land here from within an ongoing connection method and so it isn't safe to allow // the Connection to get destroyed right now. - loop.reset_soon(std::move(it->second)); + job_queue.reset_soon(std::move(it->second)); // We do want to remove it from `conns`, though, because some scheduled callbacks check // for `rid` being still in the endpoint and so, in that respect, we want the connection // to be considered gone even if its destructor doesn't fire yet. @@ -544,7 +544,7 @@ namespace oxen::quic void Endpoint::initial_association(Connection& conn) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(job_queue.inside()); auto dir_str = conn.is_outbound() ? "CLIENT"s : "SERVER"s; auto n = ngtcp2_conn_get_scid(conn, nullptr); @@ -568,7 +568,7 @@ namespace oxen::quic void Endpoint::associate_reset(const uint8_t* token, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (!token) { log::debug(log_cat, "Cannot add a null reset token"); @@ -585,7 +585,7 @@ namespace oxen::quic void Endpoint::dissociate_reset(const uint8_t* token, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (!token) return; @@ -614,7 +614,7 @@ namespace oxen::quic void Endpoint::associate_cid(const quic_cid& qcid, Connection& conn, bool weakly) { - assert(loop.inside()); + assert(job_queue.inside()); log::trace( log_cat, "{} associating CID:{} to {}", conn.is_inbound() ? 
"SERVER" : "CLIENT", qcid, conn.reference_id()); @@ -625,14 +625,14 @@ namespace oxen::quic void Endpoint::associate_cid(const ngtcp2_cid* cid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (cid->datalen) return associate_cid(quic_cid{*cid}, conn); } void Endpoint::dissociate_cid(const quic_cid& qcid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); log::trace( log_cat, "{} dissociating CID:{} to {}", conn.is_inbound() ? "SERVER" : "CLIENT", qcid, conn.reference_id()); @@ -642,7 +642,7 @@ namespace oxen::quic void Endpoint::dissociate_cid(const ngtcp2_cid* cid, Connection& conn) { - assert(loop.inside()); + assert(job_queue.inside()); if (cid->datalen) return dissociate_cid(quic_cid{*cid}, conn); } @@ -967,7 +967,7 @@ namespace oxen::quic log::debug(log_cat, "Constructing path using packet path: {}", pkt.path); - assert(loop.inside()); + assert(job_queue.inside()); auto next_rid = next_reference_id(); diff --git a/src/iochannel.cpp b/src/iochannel.cpp index 27ba593c..1563ac1b 100644 --- a/src/iochannel.cpp +++ b/src/iochannel.cpp @@ -15,7 +15,7 @@ namespace oxen::quic std::shared_ptr IOChannel::get_conn() { - return loop.call_get([this] { return _conn ? _conn->shared_from_this() : nullptr; }); + return endpoint.job_queue.call_get([this] { return _conn ? _conn->shared_from_this() : nullptr; }); } void IOChannel::send(std::string&& data) diff --git a/src/loop.cpp b/src/loop.cpp index a9aaeb2b..78f458fb 100644 --- a/src/loop.cpp +++ b/src/loop.cpp @@ -46,12 +46,11 @@ namespace oxen::quic bool Ticker::stop() { - if (event_del(ev.get()) != 0) + if (ev && event_del(ev.get()) != 0) { log::warning(log_cat, "EventHandler failed to pause repeating event!"); return false; } - return true; } @@ -86,13 +85,7 @@ namespace oxen::quic this)); if (start_immediately and not start()) - log::warning(log_cat, "Failed to immediately start one-off event!"); - } - - Ticker::~Ticker() - { - ev.reset(); - f = nullptr; + log::warning(log_cat, "Failed to immediately start repeating event!"); } static std::vector get_ev_methods() @@ -103,9 +96,8 @@ namespace oxen::quic return ev_methods_avail; } - Loop::Loop() : ev_loop{nullptr, ::event_base_free} + static ::event_base* make_ev_loop() { - log::trace(log_cat, "Beginning loop context creation with new ev loop thread"); #ifdef _WIN32 { @@ -143,14 +135,14 @@ namespace oxen::quic event_config_set_flag(ev_conf.get(), EVENT_BASE_FLAG_NO_CACHE_TIME); event_config_set_flag(ev_conf.get(), EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST); - ev_loop = {event_base_new_with_config(ev_conf.get()), event_base_free}; - - log::debug(log_cat, "Started libevent loop with backend {}", event_base_get_method(ev_loop.get())); - - setup_job_waker(); + auto ev_loop = event_base_new_with_config(ev_conf.get()); + log::debug(log_cat, "Started libevent loop with backend {}", event_base_get_method(ev_loop)); + return ev_loop; + } + Loop::Loop() : ev_loop{make_ev_loop(), ::event_base_free} + { std::promise p; - loop_thread = std::thread{[this, &p] { log::debug(log_cat, "Starting event loop run"); p.set_value(); @@ -164,30 +156,67 @@ namespace oxen::quic log::info(log_cat, "libevent loop is started"); } - struct Loop::OneShotDelayed + struct JobQueue::OneShotDelayed { - Loop& loop; + JobQueue& jq; std::function f; + event_ptr ev; + + OneShotDelayed(JobQueue& jq_, std::function f) : jq{jq_}, f{std::move(f)} {} - OneShotDelayed(Loop& loop, std::function f) : loop{loop}, f{std::move(f)} {} + ~OneShotDelayed() + { + if (ev) + event_del(ev.get()); + } }; - 
Loop::~Loop() + JobQueue::JobQueue(Loop& l) : loop{l} { - log::debug(log_cat, "Shutting down loop..."); + setup_job_waker(); + } - for (auto& t : tickers) + JobQueue::~JobQueue() + { + log::debug(log_cat, "Destroying job queue."); + if (job_waker) + stop(); + } + + void JobQueue::stop() + { + // Synchronization point: if we aren't on the loop, recurse into it: + if (!loop.inside()) { - if (auto tick = t.lock()) - { - tick->f = nullptr; - tick->stop(); - } + loop.call_get([this] { stop(); }); + return; } + std::lock_guard l{job_queue_mutex}; + if (!job_waker) + return; + + log::debug(log_cat, "Stopping/cancelling job queue events"); + *running = false; + + job_waker.reset(); + + // Why does std::queue not have a clear() method? + std::queue{}.swap(job_queue); + for (auto* osd : delayed_events) delete osd; delayed_events.clear(); + } + + Loop::~Loop() + { + log::debug(log_cat, "Shutting down loop..."); + + // JobQueue has a canary such that if it's processing jobs as it is destroyed it should be + // safe, but we *do* want to stop/destroy it before general member destruction (and on the + // loop thread, implemented by stop() itself). + main_queue.stop(); event_base_loopbreak(ev_loop.get()); loop_thread.join(); @@ -201,14 +230,18 @@ namespace oxen::quic std::shared_ptr Loop::make_ticker() { - std::erase_if(tickers, [](auto& wp) { return wp.expired(); }); - auto t = make_shared(); - tickers.emplace_back(t); - return t; + return make_shared(); } std::shared_ptr Loop::make_wakeable(std::function callback) { + if (!callback) + { + // FIXME: should this throw/assert? + log::error(log_cat, "Not making Wakeable with empty callback."); + return nullptr; + } + auto w = make_shared(); w->f = std::move(callback); w->ev.reset(event_new( @@ -217,8 +250,7 @@ namespace oxen::quic 0, [](evutil_socket_t, short, void* w) { auto* wakeable = static_cast(w); - if (wakeable->f) - wakeable->f(); + wakeable->f(); }, w.get())); return w; @@ -229,46 +261,55 @@ namespace oxen::quic event_active(ev.get(), 0, 0); } - void Loop::setup_job_waker() + void JobQueue::setup_job_waker() { // Almost identical to the generic make_wakeable, except that we avoid the std::function and // its implicit virtual function call.
job_waker.reset(event_new( - ev_loop.get(), + loop.ev_loop.get(), -1, 0, [](evutil_socket_t, short, void* self) { log::trace(log_cat, "processing job queue"); - static_cast(self)->process_job_queue(); + static_cast(self)->process_job_queue(); }, this)); assert(job_waker); } - void Loop::add_oneshot_event(std::chrono::microseconds delay, std::function hook) + void JobQueue::add_oneshot_event(std::chrono::microseconds delay, std::function hook) { + // lock if not in loop thread, to make running check safe -- most uses of this should be + // from the loop thread, so this shouldn't be a bottleneck + std::unique_lock l{job_queue_mutex, std::defer_lock}; + if (!inside()) + l.lock(); + + if (!*running) + throw std::runtime_error{"Attempting to queue job onto stopped loop."}; + auto* handler = new OneShotDelayed{*this, std::move(hook)}; delayed_events.push_back(handler); auto& h = *handler; const auto delay_tv = loop_time_to_timeval(delay); - event_base_once( - get_event_base(), + h.ev.reset(event_new( + loop.get_event_base(), -1, - EV_TIMEOUT, + 0, [](evutil_socket_t, short, void* e) mutable { auto* h = static_cast(e); if (h->f) h->f(); - auto& de = h->loop.delayed_events; + auto& de = h->jq.delayed_events; if (auto it = std::find(de.begin(), de.end(), h); it != de.end()) de.erase(it); delete h; }, - &h, - &delay_tv); + &h)); + event_add(h.ev.get(), &delay_tv); } - void Loop::process_job_queue() + void JobQueue::process_job_queue() { log::trace(log_cat, "Event loop processing job queue"); assert(inside()); @@ -280,7 +321,12 @@ namespace oxen::quic job_queue.swap(swapped_queue); } - while (not swapped_queue.empty()) + // copy shared_ptr as a "running" canary, as this object's destructor + // should eventually be one of the queued jobs, after which no further jobs + // should run. + auto running_ptr = running; + + while (not swapped_queue.empty() && *running_ptr) { auto job = swapped_queue.front(); swapped_queue.pop(); @@ -288,8 +334,13 @@ namespace oxen::quic } } + bool JobQueue::inside() const + { + return loop.inside(); + } + // Wrapper around event_active so that we can keep libevent out of the public headers. 
- void Loop::activate(::event& evt) + void JobQueue::activate(::event& evt) { event_active(&evt, 0, 0); } diff --git a/src/stream.cpp b/src/stream.cpp index faf04e08..27d3b16d 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -66,7 +66,7 @@ namespace oxen::quic throw std::logic_error{ "Invalid enable_watermarks() call: alarm watermark ({}) must be > clear watermark ({})"_format( alarm, clear)}; - loop.call_get([&] { + endpoint.job_queue.call_get([&] { if (_is_closing || _send_fin) { log::debug(log_cat, "Failed to set watermarks; stream is not active!"); @@ -94,7 +94,7 @@ namespace oxen::quic void Stream::disable_watermarks() { - loop.call_get([this] { + endpoint.job_queue.call_get([this] { if (!_watermarking) return; _watermarking.reset(); @@ -107,7 +107,7 @@ namespace oxen::quic void Stream::pause() { - loop.call_get([this]() { + endpoint.job_queue.call_get([this]() { if (not _paused) { log::debug(log_cat, "Pausing stream ID:{}", _stream_id); @@ -121,7 +121,7 @@ namespace oxen::quic void Stream::resume() { - loop.call_get([this]() { + endpoint.job_queue.call_get([this]() { if (_paused) { log::debug(log_cat, "Resuming stream ID:{}", _stream_id); @@ -140,26 +140,26 @@ namespace oxen::quic bool Stream::is_paused() const { - return loop.call_get([this]() { return _paused; }); + return endpoint.job_queue.call_get([this]() { return _paused; }); } bool Stream::writable() const { - return loop.call_get([this] { return !(_is_closing || _send_fin || _sent_fin); }); + return endpoint.job_queue.call_get([this] { return !(_is_closing || _send_fin || _sent_fin); }); } bool Stream::readable() const { - return loop.call_get([this] { return !(_is_closing || _received_fin); }); + return endpoint.job_queue.call_get([this] { return !(_is_closing || _received_fin); }); } bool Stream::is_ready() const { - return loop.call_get([this] { return _ready; }); + return endpoint.job_queue.call_get([this] { return _ready; }); } std::optional Stream::watermark_status() const { - return loop.call_get([this]() -> std::optional { + return endpoint.job_queue.call_get([this]() -> std::optional { if (!_watermarking) return std::nullopt; return _watermark_alarm; @@ -175,7 +175,7 @@ namespace oxen::quic void Stream::send_fin() { - loop.call([this] { + endpoint.job_queue.call([this] { _send_fin = true; _conn->packet_io_ready(); }); @@ -188,7 +188,7 @@ namespace oxen::quic // NB: this *must* be a call (not a call_soon) because Connection calls on a short-lived // Stream that won't survive a return to the event loop. 
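// For contrast, a small illustrative aside (hypothetical queue `q` and counter `x`, assuming
// we are already on the loop thread):
//     q.call([&] { ++x; });       // runs inline; x is incremented before call() returns
//     q.call_soon([&] { ++x; });  // always deferred; x is incremented on a later queue drain
// so a call_soon here could fire after this short-lived Stream has already been destroyed.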
- loop.call([this, app_err_code]() { + endpoint.job_queue.call([this, app_err_code]() { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); if (_is_closing) @@ -216,15 +216,15 @@ namespace oxen::quic void Stream::set_data_callback(stream_data_callback cb) { - loop.call_get([&] { _data_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _data_callback = std::move(cb); }); } void Stream::set_close_callback(stream_close_callback cb) { - loop.call_get([&] { _close_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _close_callback = std::move(cb); }); } void Stream::set_fin_callback(std::function cb) { - loop.call_get([&] { _fin_callback = std::move(cb); }); + endpoint.job_queue.call_get([&] { _fin_callback = std::move(cb); }); } void Stream::closed(uint64_t app_code) @@ -277,7 +277,7 @@ namespace oxen::quic void Stream::append_buffer(std::span buffer, std::shared_ptr keep_alive) { log::trace(log_cat, "{} called", __PRETTY_FUNCTION__); - assert(loop.inside()); + assert(endpoint.job_queue.inside()); assert(_conn); _unsent_size += buffer.size(); @@ -368,7 +368,7 @@ namespace oxen::quic void Stream::revert_stream() { - assert(loop.inside()); + assert(endpoint.job_queue.inside()); log::trace(log_cat, "Stream (ID:{}) reverting after early data rejected...", _stream_id); _unacked_size = 0; _current_buffer_index = 0; @@ -424,7 +424,7 @@ namespace oxen::quic // still actually alive. (But if we're already in the event loop the lambda fires // immediately and we don't want to have to do an extra refcount increment/decrement). std::optional> wself; - if (!loop.inside()) + if (!endpoint.job_queue.inside()) wself = weak_from_this(); // In theory, `endpoint` that we use here might be inaccessible as well, but unlike conn @@ -432,7 +432,7 @@ namespace oxen::quic // events) the application has control and responsibility for keeping the network/endpoint // alive at least as long as all the Connections/Streams that instances that were attached // to it. 
- loop.call([this, wself = std::move(wself), data, ka = std::move(keep_alive)]() { + endpoint.job_queue.call([this, wself = std::move(wself), data, ka = std::move(keep_alive)]() { std::shared_ptr sself; if (wself) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 83c7e8ed..a2c1079a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -51,6 +51,7 @@ if(LIBQUIC_BUILD_TESTS) 014-0rtt.cpp 015-bt-encoding.cpp 016-stateless_reset.cpp + job-queue.cpp case_logger.cpp ) diff --git a/tests/job-queue.cpp b/tests/job-queue.cpp new file mode 100644 index 00000000..e92055fd --- /dev/null +++ b/tests/job-queue.cpp @@ -0,0 +1,122 @@ +#include "unit_test.hpp" +#include "utils.hpp" + +#include + +#ifndef _WIN32 +extern "C" +{ +#include +} +#endif + +namespace oxen::quic::test +{ + using namespace std::literals; + + TEST_CASE("JobQueue - One extra queue", "[jobqueue]") + { + Loop loop; + JobQueue jq{loop}; + + SECTION("Extra queue jobs go away, others do not.") + { + callback_waiter queued{[]() {}}; + callback_waiter good{[]() {}}; + callback_waiter bad{[]() {}}; + callback_waiter main_ok{[]() {}}; + + // queue 3 jobs: + // + // one should run then queue a job onto the main job queue, which + // should later execute without issue + // + // the next should destroy the created job queue + // + // the third should not execute + loop.call([&]() { + jq.call_soon([&]() { good.call(); }); + + jq.call_soon([&]() { jq.stop(); }); + + jq.call_soon([&]() { bad.call(); }); + + // call_soon so it gets queued, as it is being called from inside the loop. + loop.call_soon([&]() { main_ok.call(); }); + + queued.call(); + }); + + REQUIRE(queued.wait(10ms)); + + REQUIRE(good.wait(10ms)); + REQUIRE_FALSE(bad.wait(10ms)); + REQUIRE(main_ok.wait(10ms)); + } + + SECTION("call exception if invoked after JobQueue stopped") + { + callback_waiter stopped{[]() {}}; + bool soon_failed{false}; + bool later_failed{false}; + + jq.call([&]() { + jq.stop(); + + try + { + jq.call_soon([]() {}); + } + catch (const std::exception& e) + { + soon_failed = true; + } + + try + { + jq.call_later(1s, []() {}); + } + catch (const std::exception& e) + { + later_failed = true; + } + + stopped.call(); + }); + + CHECK_NOFAIL(stopped.wait(10ms)); + + REQUIRE(soon_failed); + REQUIRE(later_failed); + REQUIRE_THROWS_AS(jq.call([]() {}), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_soon([]() {}), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_get([]() { return 0; }), std::runtime_error); + REQUIRE_THROWS_AS(jq.call_later(1s, []() {}), std::runtime_error); + } + + SECTION("call_get exception if JobQueue goes away before fulfilled") + { + jq.call([&]() { + // this needs to happen after the call_get below is queued. hopefully there + // won't be some fruit-flavored platform where this sleep is insufficient. + std::this_thread::sleep_for(10ms); + jq.stop(); + }); + + bool success{false}; + try + { + jq.call_get([&]() {}); + } + catch (const std::future_error& e) + { + success = true; + } + catch (const std::exception& e) + {} + + CHECK_NOFAIL(success); + } + } + +} // namespace oxen::quic::test diff --git a/tests/utils.hpp b/tests/utils.hpp index e1fcfd40..006552fc 100644 --- a/tests/utils.hpp +++ b/tests/utils.hpp @@ -266,6 +266,8 @@ namespace oxen::quic return func(std::forward(args)...); }; } + + void call() { this->operator Func_t()(); } }; /// Waits for some condition to be satisfied, sleeping between checks. Returns the result of