diff --git a/CMakeLists.txt b/CMakeLists.txt index 44d8029..3c32ecd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -79,10 +79,18 @@ else() message(STATUS "Building unit tests!") add_executable(async_unit_test) # Add module files - target_sources(async_unit_test - PRIVATE + target_sources(async_unit_test PUBLIC + FILE_SET CXX_MODULES + TYPE CXX_MODULES + FILES + tests/util.cppm + PRIVATE tests/main.test.cpp - tests/async.test.cpp + tests/basics.test.cpp + tests/blocked_by.test.cpp + tests/cancel.test.cpp + tests/guards.test.cpp + tests/proxy.test.cpp ) target_compile_features(async_unit_test PUBLIC cxx_std_23) diff --git a/benchmarks/benchmark.cpp b/benchmarks/benchmark.cpp index 3f4103f..aad9486 100644 --- a/benchmarks/benchmark.cpp +++ b/benchmarks/benchmark.cpp @@ -222,6 +222,27 @@ static void bm_virtual_call_variant(benchmark::State& state) } BENCHMARK(bm_virtual_call_variant); +/** + * @brief Sync wait a future + * + * This is only safe is the operation never blocks by time. For this benchmark, + * that should always be the case. + * + * @tparam T - type of the future + * @param p_future - future to finish + * @return auto - result T + */ +template +auto sync_wait(async::future& p_future) +{ + while (not p_future.done()) { + p_future.resume(); + } + if constexpr (not std::is_void_v) { + return std::move(p_future.value()); + } +} + // ---------------------------------------------------------------------------- // 3. FUTURE SYNC: Non-coroutine functions returning future, 3 levels deep // These functions directly construct future with a value (no coroutine) @@ -238,7 +259,7 @@ __attribute__((noinline)) async::future sync_future_level2( int x) { auto f = sync_future_level3(ctx, x); - return f.sync_wait() + 1; + return sync_wait(f) + 1; } __attribute__((noinline)) async::future sync_future_level1( @@ -246,7 +267,7 @@ __attribute__((noinline)) async::future sync_future_level1( int x) { auto f = sync_future_level2(ctx, x); - return f.sync_wait() + 1; + return sync_wait(f) + 1; } struct benchmark_context : public async::context { @@ -271,7 +292,7 @@ static void bm_future_sync_return(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = sync_future_level1(ctx, input); - int result = f.sync_wait(); + int result = sync_wait(f); benchmark::DoNotOptimize(result); } } @@ -308,7 +329,7 @@ static void bm_future_coroutine(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = coro_level1(ctx, input); - int result = f.sync_wait(); + int result = sync_wait(f); benchmark::DoNotOptimize(result); } } @@ -349,7 +370,7 @@ static void bm_future_sync_await(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = sync_in_coro_level1(ctx, input); - int result = f.sync_wait(); + int result = sync_wait(f); benchmark::DoNotOptimize(result); } } @@ -370,7 +391,8 @@ __attribute__((noinline)) async::future mixed_sync_level2( async::context& ctx, int x) { - return mixed_sync_level3(ctx, x).sync_wait() + 1; + auto future = mixed_sync_level3(ctx, x); + return sync_wait(future) + 1; } __attribute__((noinline)) async::future mixed_coro_level1( @@ -389,7 +411,7 @@ static void bm_future_mixed(benchmark::State& state) int input = 42; for (auto _ : state) { auto f = mixed_coro_level1(ctx, input); - int result = f.sync_wait(); + int result = sync_wait(f); benchmark::DoNotOptimize(result); } } @@ -431,7 +453,7 @@ static void bm_future_void_coroutine(benchmark::State& state) int output = 0; for (auto _ : state) { auto f = void_coro_level1(ctx, output, input); - f.sync_wait(); + sync_wait(f); benchmark::DoNotOptimize(f); benchmark::DoNotOptimize(output); } diff --git a/conanfile.py b/conanfile.py index 9f7b777..a9f3598 100644 --- a/conanfile.py +++ b/conanfile.py @@ -33,7 +33,7 @@ class async_context_conan(ConanFile): description = ("Implementation of C++20 coroutines targeting embedded system by eliminating the usage of the global heap and providing a 'context' which contains a coroutine stack frame and other useful utilities for scheduling.") topics = ("async", "coroutines", "stack", "scheduling", "scheduler") settings = "compiler", "build_type", "os", "arch" - exports_sources = "modules/*", "benchmarks/*", "tests/*", "CMakeLists.txt", "*.cmake.in", "LICENSE" + exports_sources = "modules/*", "benchmarks/*", "tests/*", "CMakeLists.txt", "LICENSE" package_type = "static-library" shared = False diff --git a/modules/async_context.cppm b/modules/async_context.cppm index 1b525b0..f99dcf8 100644 --- a/modules/async_context.cppm +++ b/modules/async_context.cppm @@ -23,7 +23,6 @@ module; #include #include #include -#include #include #include #include @@ -42,30 +41,73 @@ export using u8 = std::uint8_t; export using byte = std::uint8_t; export using usize = std::size_t; export using uptr = std::uintptr_t; + constexpr size_t mask = sizeof(uptr) - 1uz; constexpr size_t shift = std::countr_zero(sizeof(uptr)); export enum class blocked_by : u8 { - /// Not blocked by anything, ready to run! + /// Not blocked by anything, ready to run, can be resumed. nothing = 0, /// Blocked by a time duration that must elapse before resuming. + /// + /// Another way of saying this is that the active coroutine is requesting to + /// be rescheduled at or after the time duration provided. The sleep time + /// provided is the minimum that a scheduler must wait before resuming the + /// coroutine. If the coroutine is resumed earlier, then this is erroneous + /// behavior. This behavior is clearly wrong, but is well defined. The + /// coroutine will resume earlier than it had anticipated, which can cause + /// other problems down the line. For example, if a coroutine resets a device + /// and must wait 50ms before attempting to communicate with it again. If that + /// time isn't upheld, then the code may thrown an exception when the device + /// is not online by the time of resumption. + /// + /// This blocked by state is special in that it is not poll-able. Unlike the + /// blocked by states below, when a coroutine requests to be rescheduled, the + /// scheduler must ensure that the context/future it is bound to is resumed at + /// the right time. time = 1, /// Blocked by an I/O operation (DMA, bus transaction, etc.). /// An interrupt or I/O completion will call unblock() when ready. + /// + /// This blocked by state is poll-able, meaning that the coroutine may be + /// resumed before the context is unblocked. + /// Coroutines MUST check that their I/O operations have completed before + /// continuing on with their operations. If a coroutine is resumed and their + /// I/O operation is still pending, those coroutines should block themselves + /// by I/O again to signal to the scheduler that they are not ready yet. + /// + /// A time estimate may be provided to the scheduler to give extra information + /// about when to poll or reschedule the context again. The time information + /// may be ignored. io = 2, - /// Blocked by a synchronization primitive or resource contention. + /// Blocked by a resource contention. + /// /// Examples: mutex, semaphore, two coroutines competing for an I2C bus. - /// The transition handler may integrate with OS schedulers or implement - /// priority inheritance strategies. + /// + /// If the coroutine is resumed, the coroutine should check that it can + /// acquire the resource before assuming that it can. Just like I/O, if the + /// coroutine determines that its still blocked by sync, then it must re-block + /// itself by sync. sync = 3, /// Blocked by an external coroutine outside the async::context system. - /// Examples: co_awaiting std::task, std::generator, or third-party async - /// types. The transition handler has no control over scheduling - it can only - /// wait for the external coroutine's await_resume() to call unblock(). + /// + /// Examples: co_awaiting a std::task, std::generator, or third-party + /// coroutine library. + /// + /// A coroutine invoking a 3rd party async library is considered to be a + /// coroutine supervisor. A coroutine supervisor stays as the active coroutine + /// on its context, and must manually resume the 3rd party async library until + /// it finishes. This is important since the async context scheduler has no + /// knowledge of the 3rd party async operation and how it works. + /// + /// If the external async library has the ability to call unblock() on the + /// context, then it should, but is not mandated to do so. Like I/O, this is + /// pollable by a scheduler and the coroutine code should block on external if + /// the external coroutine is still active. external = 4, }; @@ -104,6 +146,10 @@ export struct bad_coroutine_alloc : std::bad_alloc context const* violator; }; +/** + * @brief Thrown when a coroutine awaits a cancelled future + * + */ export class operation_cancelled : public std::exception { [[nodiscard]] char const* what() const noexcept override @@ -122,7 +168,7 @@ export class operation_cancelled : public std::exception * @brief The data type for sleep time duration * */ -using sleep_duration = std::chrono::nanoseconds; +export using sleep_duration = std::chrono::nanoseconds; /** * @brief Information about the block state when context::schedule is called @@ -210,13 +256,6 @@ public: m_active_handle = p_active_handle; } - void sync_wait() - { - while (m_active_handle != std::noop_coroutine()) { - m_active_handle.resume(); - } - } - constexpr bool done() { return m_active_handle == std::noop_coroutine(); @@ -224,7 +263,11 @@ public: void resume() { - m_active_handle.resume(); + // We cannot resume the a coroutine blocked by time. + // Only the scheduler can unblock a context state. + if (m_state != blocked_by::time) { + m_active_handle.resume(); + } } /** @@ -431,6 +474,60 @@ private: } }; +export class basic_context : public context +{ +public: + basic_context() = default; + ~basic_context() override = default; + + [[nodiscard]] constexpr sleep_duration pending_delay() const noexcept + { + return m_pending_delay; + } + + /** + * @brief Perform sync_wait operation + * + * @tparam DelayFunc + * @param p_delay - a delay function, that accepts a sleep duration and + * returns void. + */ + void sync_wait(std::invocable auto&& p_delay) + { + while (active_handle() != std::noop_coroutine()) { + active_handle().resume(); + + if (state() == blocked_by::time && m_pending_delay > sleep_duration(0)) { + p_delay(m_pending_delay); + m_pending_delay = sleep_duration(0); + unblock_without_notification(); + } + } + } + +private: + /** + * @brief Forwards the schedule call to the original context + * + * @param p_block_state - state that this context has been set to + * @param p_block_info - information about the blocking conditions + */ + void do_schedule(blocked_by p_block_state, + block_info p_block_info) noexcept override + { + if (p_block_state == blocked_by::time) { + if (auto* ex = std::get_if(&p_block_info)) { + m_pending_delay = *ex; + } else { + m_pending_delay = sleep_duration{ 0 }; + } + } + // Ignore the rest and poll them... + } + + sleep_duration m_pending_delay{ 0 }; +}; + export class context_token { public: @@ -508,6 +605,24 @@ private: uptr m_context_address = 0U; }; +export struct io +{ + io(sleep_duration p_duration = sleep_duration{ 0u }) + : m_duration(p_duration) + { + } + sleep_duration m_duration; +}; + +export struct sync +{ + sync(context_token p_context) + : m_context(p_context) + { + } + context_token m_context; +}; + // ============================================================================= // // Promise Base @@ -596,6 +711,12 @@ public: return m_context->block_by_time(p_sleep_duration); } + constexpr std::suspend_always await_transform() noexcept + { + m_context->block_by_io(); + return {}; + } + template constexpr U&& await_transform(U&& p_awaitable) noexcept { @@ -644,6 +765,15 @@ using monostate_or = std::conditional_t, std::monostate, T>; struct cancelled_state {}; +/** + * @brief Represents a future that is currently busy. + * + * The purpose of this state is to report that a future is currently in a busy + * state without exposing the coroutine handle. + */ +struct busy_state +{}; + /** * @brief Defines the states that a future can be in * @@ -657,6 +787,7 @@ using future_state = cancelled_state, // 2 - cancelled std::exception_ptr // 3 - exception >; + template struct final_awaiter { @@ -770,8 +901,9 @@ public: full_handle_type::from_address(handle.address()) .promise() .get_context() - .active_handle() .resume(); + } else if (std::holds_alternative(m_state)) { + std::rethrow_exception(std::get(m_state)); } } @@ -799,14 +931,12 @@ public: } /** - * @brief Extract result value from async operation. - * - * Throws std::bad_variant_access if `done()` return false or `cancelled()` - * return true. + * @brief Extract value from async operation. * * @return Type - reference to the value from this async operation. + * @throws std::bad_variant_access if `has_value()` return false */ - [[nodiscard]] constexpr monostate_or& result() + [[nodiscard]] constexpr monostate_or& value() requires(not std::is_void_v) { return std::get(m_state); @@ -874,41 +1004,6 @@ public: return awaiter{ *this }; } - /** - * @brief Run future synchronously until the future is done - * - */ - void sync_wait() - requires(std::is_void_v) - { - while (not done()) { - resume(); - } - - if (auto* ex = std::get_if(&m_state)) [[unlikely]] { - std::rethrow_exception(*ex); - } - } - - /** - * @brief Run synchronously until the future is done and return its result - * - * @returns monostate_or - Returns a reference to contained object - */ - monostate_or& sync_wait() - requires(not std::is_void_v) - { - while (not done()) { - resume(); - } - - if (auto* ex = std::get_if(&m_state)) [[unlikely]] { - std::rethrow_exception(*ex); - } - - return std::get(m_state); - } - template constexpr future(U&& p_value) noexcept requires std::is_constructible_v diff --git a/tests/async.test.cpp b/tests/async.test.cpp deleted file mode 100644 index f43f731..0000000 --- a/tests/async.test.cpp +++ /dev/null @@ -1,692 +0,0 @@ -// Copyright 2024 - 2025 Khalil Estell and the libhal contributors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include - -#include - -import async_context; - -namespace async { -std::ostream& operator<<(std::ostream& out, blocked_by b) -{ - switch (b) { - case blocked_by::nothing: - return out << "nothing"; - case blocked_by::time: - return out << "time"; - case blocked_by::io: - return out << "io"; - case blocked_by::sync: - return out << "sync"; - case blocked_by::external: - return out << "external"; - default: - // For unknown values we print the numeric value - return out << "blocked_by(" << static_cast(b) << ')'; - } -} -} // namespace async - -bool resumption_occurred = false; - -struct thread_info -{ - async::context* sync_context = nullptr; - int sleep_count = 0; - bool io_block = false; -}; - -struct test_context : public async::context -{ - std::shared_ptr info; - std::array m_stack{}; - - test_context(std::shared_ptr const& p_info) - : info(p_info) - { - this->initialize_stack_memory(m_stack); - } - test_context() - : info(std::make_shared()) - { - this->initialize_stack_memory(m_stack); - } - -private: - void do_schedule(async::blocked_by p_block_state, - async::block_info p_block_info) noexcept override - { - std::println("Scheduler called!", info->sleep_count); - - switch (p_block_state) { - case async::blocked_by::time: { - if (std::holds_alternative(p_block_info)) { - std::println("sleep for: {}", - std::get(p_block_info)); - info->sleep_count++; - std::println("Sleep count = {}!", info->sleep_count); - } - break; - } - case async::blocked_by::sync: { - if (std::holds_alternative(p_block_info)) { - auto* context = std::get(p_block_info); - std::println( - "Coroutine ({}) is blocked by syncronization with coroutine ({})", - static_cast(this), - static_cast(context)); - info->sync_context = context; - } - break; - } - case async::blocked_by::io: { - info->io_block = true; - break; - } - case async::blocked_by::nothing: { - std::println("Context ({}) has been unblocked!", - static_cast(this)); - break; - } - default: { - break; - } - } - } -}; - -namespace async { -void async_context_suite() -{ - using namespace boost::ut; - - "coroutine with time-based blocking and sync_wait"_test = []() { - // Setup - test_context ctx; - - static constexpr int expected_return_value = 5; - - auto print_and_wait_coroutine = [](async::context&) -> async::future { - using namespace std::chrono_literals; - std::println("Printed from a coroutine"); - co_await 100ns; - resumption_occurred = true; - co_await 100ns; - co_return expected_return_value; - }; - - // Exercise - auto future_print = print_and_wait_coroutine(ctx); - expect(that % 0 < ctx.memory_used()); - auto value = future_print.sync_wait(); - - // Verify - expect(that % resumption_occurred); - expect(that % future_print.done()); - expect(that % 0 == ctx.memory_used()); - expect(that % 2 == ctx.info->sleep_count); - expect(that % expected_return_value == value); - }; - - "block_by_io and block_by_sync notify scheduler correctly"_test = []() { - // Setup - auto info = std::make_shared(); - test_context ctx1(info); - test_context ctx2(info); - - resumption_occurred = false; - - auto test_coro = [&ctx2](async::context& p_context) -> async::future { - using namespace std::chrono_literals; - std::println("Printed from a coroutine"); - co_await 100ns; - resumption_occurred = true; - co_await p_context.block_by_io(); - co_await p_context.block_by_sync(&ctx2); - co_return; - }; - - // Exercise 1 - auto blocked_by_testing = test_coro(ctx1); - - // Verify 1 - expect(that % not resumption_occurred); - expect(that % 0 < ctx1.memory_used()); - expect(that % 0 == ctx2.memory_used()); - - // Exercise 2 - blocked_by_testing.sync_wait(); - - // Verify 2 - expect(that % resumption_occurred); - expect(that % blocked_by_testing.done()); - expect(that % info->io_block); - expect(that % &ctx2 == info->sync_context); - expect(that % 0 == ctx1.memory_used()); - expect(that % 0 == ctx2.memory_used()); - }; - - "Context Token"_test = []() { - // Setup - test_context ctx1; - test_context ctx2; - - async::context_token io_in_use; - - auto single_resource = - [&](async::context& p_context) -> async::future { - using namespace std::chrono_literals; - - std::println("Executing 'single_resource' coroutine"); - while (io_in_use) { - // TODO(#44): For some reason this segfaults on Linux - // std::println("Resource unavailable, blocked by {}", - // io_in_use.address()); - co_await io_in_use.set_as_block_by_sync(p_context); - } - - // Block next coroutine from using this resource - io_in_use = p_context; - - // setup dma transaction... - - // It cannot be assumed that the scheduler will not sync_wait() this - // coroutine, thus, a loop is required to sure that the async operation - // has actually completed. - while (io_in_use == p_context) { - std::println("Waiting on io complete flag, blocking by I/O"); - // Continually notify that this is blocked by IO - co_await p_context.block_by_io(); - } - - std::println("IO operation complete! Returning!"); - - co_return; - }; - - std::println("🧱 Future setup"); - auto access_first = single_resource(ctx1); - auto access_second = single_resource(ctx2); - - expect(that % 0 < ctx1.memory_used()); - expect(that % 0 < ctx2.memory_used()); - - auto check_access_first_blocked_by = - [&](async::blocked_by p_state = async::blocked_by::io, - std::source_location const& p_location = - std::source_location::current()) { - expect(that % p_state == ctx1.state()) - << "ctx1 state mismatch, line: " << p_location.line() << '\n'; - }; - - auto check_access_second_blocked_by = - [&](async::blocked_by p_state = async::blocked_by::nothing, - std::source_location const& p_location = - std::source_location::current()) { - expect(that % p_state == ctx2.state()) - << "ctx2 state mismatch, line: " << p_location.line() << '\n'; - }; - - // access_first will claim the resource and will return control, and be - // blocked by IO. - std::println("▶️ Resume 1st: 1"); - access_first.resume(); - - check_access_first_blocked_by(); - check_access_second_blocked_by(); - - std::println("▶️ Resume 1st: 2"); - access_first.resume(); - - check_access_first_blocked_by(); - check_access_second_blocked_by(); - - std::println("▶️ Resume 1st: 3"); - access_first.resume(); - - check_access_first_blocked_by(); - check_access_second_blocked_by(); - - std::println("▶️ Resume 2nd: 1"); - access_second.resume(); - - check_access_first_blocked_by(); - check_access_second_blocked_by(async::blocked_by::sync); - - io_in_use.unblock_and_clear(); - - check_access_first_blocked_by(async::blocked_by::nothing); - check_access_second_blocked_by(async::blocked_by::sync); - - std::println("▶️ Resume 2nd: 2"); - access_second.resume(); - - // Resuming access_second shouldn't change the state of anything - check_access_first_blocked_by(async::blocked_by::nothing); - check_access_second_blocked_by(async::blocked_by::io); - - std::println("▶️ Resume 1st: 4, this should finish the operation"); - access_first.resume(); - - expect(that % ctx1.state() == async::blocked_by::nothing); - expect(that % access_first.done()); - - check_access_second_blocked_by(async::blocked_by::io); - access_second.resume(); - check_access_second_blocked_by(async::blocked_by::io); - - io_in_use.unblock_and_clear(); - access_second.resume(); - - expect(that % ctx2.state() == async::blocked_by::nothing); - expect(that % access_second.done()); - - expect(that % 0 == ctx1.memory_used()); - expect(that % 0 == ctx2.memory_used()); - }; - - struct raii_counter - { - raii_counter(std::pair p_counts) - : counts(p_counts) - { - std::println("🔨 Constructing..."); - (*counts.first)++; - } - - ~raii_counter() // NOLINT(bugprone-exception-escape) - { - std::println("💥 Destructing..."); - (*counts.second)++; - } - std::pair counts; - }; - - "Cancellation"_test = []() { - // Setup - test_context ctx; - - std::println("===================================="); - std::println("Running cancellation test"); - std::println("===================================="); - - std::pair count{ 0, 0 }; - int ends_reached = 0; - - auto get_counter = [&count]() -> auto { - return raii_counter( - std::make_pair(&count.first, &count.second)); - }; - - auto a = [get_counter, - &ends_reached](async::context& p_ctx) -> future { - std::println("entering a"); - raii_counter counter = get_counter(); - co_await std::suspend_always{}; - std::println("a exited"); - ends_reached++; - co_return; - }; - - auto b = - [a, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering b"); - raii_counter counter = get_counter(); - co_await a(p_ctx); - std::println("b exited"); - ends_reached++; - co_return; - }; - - auto c = - [b, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering c"); - raii_counter counter = get_counter(); - co_await b(p_ctx); - std::println("c exited"); - ends_reached++; - co_return; - }; - - { - expect(count == std::make_pair(0, 0)) - << "count is {" << count.first << ", " << count.second << "}\n"; - expect(that % ends_reached == 0); - - auto future = c(ctx); - - expect(count == std::make_pair(0, 0)) - << "count is {" << count.first << ", " << count.second << "}\n"; - expect(that % ends_reached == 0); - - std::println("Resume until future reaches suspension @ coroutine A"); - future.resume(); - - expect(count == std::make_pair(3, 0)) - << "count is {" << count.first << ", " << count.second << "}\n"; - expect(that % ends_reached == 0); - expect(that % 0 < ctx.memory_used()); - } // destroy future - - expect(count == std::make_pair(3, 3)) - << "count is {" << count.first << ", " << count.second << "}\n"; - expect(that % ends_reached == 0); - expect(that % 0 == ctx.memory_used()); - - std::println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"); - }; - - "Context Cancellation"_test = []() { - // Setup - test_context ctx; - - std::println("===================================="); - std::println("Running Context Cancellation"); - std::println("===================================="); - - std::pair count{ 0, 0 }; - int ends_reached = 0; - - auto get_counter = [&count]() -> auto { - return raii_counter( - std::make_pair(&count.first, &count.second)); - }; - - auto a = [get_counter, - &ends_reached](async::context& p_ctx) -> future { - std::println("entering a"); - raii_counter counter = get_counter(); - co_await std::suspend_always{}; - std::println("a exited"); - ends_reached++; - co_return; - }; - auto b = - [a, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering b"); - raii_counter counter = get_counter(); - co_await a(p_ctx); - std::println("b exited"); - ends_reached++; - co_return; - }; - auto c = - [b, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering c"); - raii_counter counter = get_counter(); - co_await b(p_ctx); - std::println("c exited"); - ends_reached++; - co_return; - }; - - expect(count == std::make_pair(0, 0)); - expect(that % ends_reached == 0); - - auto future = c(ctx); - - expect(count == std::make_pair(0, 0)); - expect(that % ends_reached == 0); - - std::println("Resume until future reaches suspension @ coroutine A"); - future.resume(); - - expect(count == std::make_pair(3, 0)); - expect(that % ends_reached == 0); - expect(that % 0 < ctx.memory_used()); - expect(that % false == future.has_value()); - expect(that % false == future.done()); - - ctx.unsafe_cancel(); - - expect(count == std::make_pair(3, 3)); - expect(that % ends_reached == 0); - expect(that % 0 == ctx.memory_used()); - expect(that % false == future.has_value()); - // Unfortunately, context doesn't have the information necessary to this - // future. The future is invalid, but we currently cannot change its state - // from the perview of the context. - expect(that % false == future.done()); - - std::println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"); - }; - - "Exception Propagation"_test = []() { - // Setup - test_context ctx; - - std::println("===================================="); - std::println("Running Exception Propagation Test"); - std::println("===================================="); - - struct raii_counter - { - raii_counter(std::pair p_counts) - : counts(p_counts) - { - std::println("🔨 Constructing..."); - (*counts.first)++; - } - - ~raii_counter() // NOLINT(bugprone-exception-escape) - { - std::println("💥 Destructing..."); - (*counts.second)++; - } - std::pair counts; - }; - - std::pair count{ 0, 0 }; - int ends_reached = 0; - - auto get_counter = [&count]() -> auto { - return raii_counter( - std::make_pair(&count.first, &count.second)); - }; - - bool should_throw = true; - auto a = [get_counter, &should_throw, &ends_reached]( - async::context& p_ctx) -> future { - std::println("entering a"); - raii_counter counter = get_counter(); - co_await std::suspend_always{}; - if (should_throw) { - throw std::runtime_error("Throwing this error for the test"); - } - std::println("a exited"); - ends_reached++; - co_return; - }; - auto b = - [a, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering b"); - raii_counter counter = get_counter(); - co_await a(p_ctx); - std::println("b exited"); - ends_reached++; - co_return; - }; - auto c = - [b, get_counter, &ends_reached](async::context& p_ctx) -> future { - std::println("entering c"); - raii_counter counter = get_counter(); - co_await b(p_ctx); - std::println("c exited"); - ends_reached++; - co_return; - }; - - expect(count == std::make_pair(0, 0)); - expect(that % ends_reached == 0); - - auto future = c(ctx); - - expect(count == std::make_pair(0, 0)); - expect(that % ends_reached == 0); - - std::println("Resume until future reaches suspension @ coroutine A"); - future.resume(); - - expect(throws([&future]() { - std::println("This resume should throw an runtime_error"); - future.sync_wait(); - })) - << "runtime_error Exception was not caught!"; - expect(that % true == future.done()); - expect(that % false == future.has_value()); - expect(count == std::make_pair(3, 3)) - << "count is {" << count.first << ", " << count.second << "}\n"; - expect(that % ends_reached == 0); - expect(that % 0 == ctx.memory_used()); - }; - - "Proxy Context (normal behavior, no timeout)"_test = []() { - // Setup - test_context ctx; - std::println("===================================="); - std::println("Running Proxy Context Test (no timeout normal behavior)"); - std::println("===================================="); - - static constexpr auto expected_suspensions = 5; - static constexpr auto timeout_count = expected_suspensions + 2; - auto suspension_count = 0; - - auto b = [&suspension_count](async::context&) -> future { - while (suspension_count < expected_suspensions) { - suspension_count++; - // TODO(#44): For some reason this segfaults on Linux - // std::println("p_suspend_count = {}!", suspension_count); - co_await std::suspend_always{}; - } - co_return expected_suspensions; - }; - - auto a = [b](async::context& p_ctx) -> future { - std::println("Entered coroutine a!"); - auto proxy = async::proxy_context::from(p_ctx); - std::println("Made a proxy!"); - int counter = timeout_count; - auto supervised_future = b(proxy); - - while (not supervised_future.done()) { - std::println("supervised_future not done()!"); - if (counter <= 0) { - std::println("TIMEDOUT detected!"); - break; - } - std::println("resuming supervised_future..."); - supervised_future.resume(); - - std::println("suspending ourself..."); - co_await std::suspend_always{}; - counter--; - } - - std::println("finished while loop()!"); - - if (counter > 0) { - std::println("✅ SUCCESS!"); - co_return supervised_future.sync_wait(); - } - - std::println("TIMED OUT!!"); - - co_return -1; - }; - - auto my_future = a(ctx); - auto value = my_future.sync_wait(); - - expect(that % my_future.done()); - expect(that % expected_suspensions == value); - expect(that % 0 == ctx.memory_used()); - expect(that % suspension_count == expected_suspensions); - }; - - "Proxy Coroutines Timeout"_test = []() { - // Setup - test_context ctx; - std::println("===================================="); - std::println("Running Proxy Context Test (with timeout)"); - std::println("===================================="); - - static constexpr auto expected_suspensions = 5; - static constexpr auto timeout_count = expected_suspensions - 2; - auto suspension_count = 0; - - auto b = [&suspension_count](async::context&) -> future { - suspension_count = 0; - while (suspension_count < expected_suspensions) { - suspension_count++; - // TODO(#44): For some reason this segfaults on Linux - // std::println("p_suspend_count = {}!", suspension_count); - co_await std::suspend_always{}; - } - co_return expected_suspensions; - }; - - auto a = [b](async::context& p_ctx) -> future { - std::println("Entered coroutine a!"); - auto proxy = async::proxy_context::from(p_ctx); - std::println("Made a proxy!"); - int counter = timeout_count; - auto supervised_future = b(proxy); - - while (not supervised_future.done()) { - std::println("supervised_future not done()!"); - if (counter <= 0) { - std::println("TIMEDOUT detected!"); - break; - } - std::println("resuming supervised_future..."); - supervised_future.resume(); - - std::println("suspending ourself..."); - co_await std::suspend_always{}; - counter--; - } - - std::println("finished while loop()!"); - - if (counter > 0) { - std::println("✅ SUCCESS!"); - co_return supervised_future.sync_wait(); - } - - std::println("‼️ TIMED OUT!!"); - - co_return -1; - }; - - auto my_future = a(ctx); - auto value = my_future.sync_wait(); - auto value2 = a(ctx).sync_wait(); - - expect(that % my_future.done()); - expect(that % -1 == value); - expect(that % -1 == value2); - expect(that % suspension_count == timeout_count); - expect(that % 0 == ctx.memory_used()); - }; -#if 0 -#endif -}; -} // namespace async diff --git a/tests/basics.test.cpp b/tests/basics.test.cpp new file mode 100644 index 0000000..76e883b --- /dev/null +++ b/tests/basics.test.cpp @@ -0,0 +1,206 @@ +#include + +#include + +import async_context; +import test_utils; + +boost::ut::suite<"basics"> basics = []() { + using namespace boost::ut; + + "sync return"_test = []() { + // Setup + test_context ctx; + + static constexpr int expected_return_value = 5; + + unsigned step = 0; + auto sync_coroutine = [&step](async::context&) -> async::future { + step = 1; + return expected_return_value; + }; + + // Exercise + auto future = sync_coroutine(ctx); + + // Verify + expect(that % 0 == ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 1 == step); + }; + + "co_return"_test = []() { + // Setup + test_context ctx; + + static constexpr int expected_return_value = 5; + unsigned step = 0; + auto async_coroutine = [&step](async::context&) -> async::future { + step = 1; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start and finish coroutine + future.resume(); + + // Verify 2 + expect(that % 0 == ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 1 == step); + }; + + "suspend then co_return"_test = []() { + // Setup + test_context ctx; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto async_coroutine = [&step](async::context&) -> async::future { + step = 1; + co_await std::suspend_always{}; + step = 2; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = async_coroutine(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start and suspend coroutine + future.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3: resume and co_return from coroutine + future.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 2 == step); + }; + + "co_await coroutine"_test = []() { + // Setup + test_context ctx; + + static constexpr int expected_return_value = 1413; + unsigned step = 0; + auto co2 = [&step](async::context&) -> async::future { + step = 2; + co_await std::suspend_always{}; + // skipped as the co1 will immediately resume + step = 3; + co_return expected_return_value; + }; + auto co = [&step, &co2](async::context& p_ctx) -> async::future { + step = 1; // skipped as the co2 will immediately start + co_await co2(p_ctx); + step = 4; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start, enter co_2, start immediately and suspend + future.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 2 == step); + + // Exercise 3: resume, co_2 co_returns, immediately resumes parent, return + future.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_return_value == future.value()); + expect(that % 4 == step); + }; + + "co_await coroutine"_test = []() { + // Setup + test_context ctx; + + static constexpr int return_value1 = 1413; + static constexpr int return_value2 = 4324; + static constexpr int expected_total = return_value1 + return_value2; + + unsigned step = 0; + auto co2 = [](async::context&) -> async::future { + return return_value1; + }; + + auto co = [&step, &co2](async::context& p_ctx) -> async::future { + step = 1; // skipped as the co2 will immediately start + auto val = co_await co2(p_ctx); + step = 2; + co_return val + return_value2; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2: start, call co_2, returns value immediately and co_returns + future.resume(); + + // Verify 3 + expect(that % 0 == ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % expected_total == future.value()); + expect(that % 2 == step); + }; +}; diff --git a/tests/blocked_by.test.cpp b/tests/blocked_by.test.cpp new file mode 100644 index 0000000..c851c93 --- /dev/null +++ b/tests/blocked_by.test.cpp @@ -0,0 +1,210 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +boost::ut::suite<"blocking_states"> blocking_states = []() { + using namespace boost::ut; + using namespace std::chrono_literals; + + "co_await 10ms & co_await 50ms"_test = []() { + // Setup + test_context ctx; + + static constexpr int expected_return_value = 8748; + unsigned step = 0; + auto co = [&step](async::context&) -> async::future { + step = 1; + co_await 10ms; + step = 2; + co_await 25ms; + step = 3; + co_return expected_return_value; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % ctx.info->scheduled_called_once); + expect(that % 1 == ctx.info->sleep_count); + expect(that % not future.done()); + expect(that % 10ms == ctx.info->last_sleep_time); + + expect(that % not future.has_value()); + expect(that % 1 == step); + + // Exercise 3 + future.resume(); + + // Verify 3 + expect(that % 0 < ctx.memory_used()); + expect(that % 2 == ctx.info->sleep_count); + expect(that % not future.done()); + expect(that % not future.has_value()); + expect(that % 2 == step); + expect(that % 25ms == ctx.info->last_sleep_time); + + // Exercise 4 + future.resume(); + + // Verify 4 + expect(that % 0 == ctx.memory_used()); + expect(that % 2 == ctx.info->sleep_count); + expect(that % future.done()); + expect(that % future.has_value()); + expect(that % 3 == step); + expect(that % expected_return_value == future.value()); + }; + + "context::block_by_io() "_test = []() { + // Setup + test_context ctx; + + unsigned step = 0; + bool io_complete = false; + + auto co = [&step, + &io_complete](async::context& p_ctx) -> async::future { + step = 1; + io_complete = false; + + while (not io_complete) { + co_await p_ctx.block_by_io(); + } + + step = 2; + + co_return; + }; + + // Exercise 1 + auto future = co(ctx); + + // Verify 1 + expect(that % 0 < ctx.memory_used()); + expect(that % not ctx.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % 0 == step); + + // Exercise 2: enter loop and block by io + future.resume(); + + // Verify 2 + expect(that % 0 < ctx.memory_used()); + expect(that % ctx.info->scheduled_called_once); + expect(that % ctx.info->io_block); + expect(that % not future.done()); + expect(that % 1 == step); + + // Exercise 3: stay in loop and re-block on io + future.resume(); + + // Verify 3 + expect(that % 0 < ctx.memory_used()); + expect(that % ctx.info->scheduled_called_once); + expect(that % ctx.info->io_block); + expect(that % not future.done()); + expect(that % 1 == step); + + // Exercise 4: unblock IO and resume to final suspend + io_complete = true; + future.resume(); + + // Verify 4 + expect(that % 0 == ctx.memory_used()); + expect(that % ctx.info->scheduled_called_once); + expect(that % ctx.info->io_block); + expect(that % future.done()); + expect(that % 2 == step); + }; + + "blocked_by time, io, & sync"_test = []() { + // Setup + auto info = std::make_shared(); + test_context ctx1(info); + test_context ctx2(info); + + int step = 0; + + auto co = [&](async::context& p_context) -> async::future { + using namespace std::chrono_literals; + step = 1; + co_await 100ns; + step = 2; + co_await p_context.block_by_io(); + step = 3; + co_await p_context.block_by_sync(&ctx2); + step = 4; + co_return; + }; + + // Exercise 1 + auto future = co(ctx1); + + // Verify 1 + expect(that % 0 < ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + expect(that % not ctx1.info->scheduled_called_once); + expect(that % not future.done()); + expect(that % 0 == step); + + // Exercise 2 + future.resume(); + + // Verify 2 + expect(that % 0 < ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + expect(that % ctx1.info->scheduled_called_once); + expect(that % 100ns == ctx1.info->last_sleep_time); + expect(that % not ctx1.info->io_block); + expect(that % not future.done()); + expect(that % 1 == step); + + // Exercise 3 + future.resume(); + + // Verify 3 + expect(that % 0 < ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + expect(that % ctx1.info->scheduled_called_once); + expect(that % ctx1.info->io_block) << "context should be blocked by IO"; + expect(that % not future.done()); + expect(that % 2 == step); + + // Exercise 4: move to blocked by sync + ctx1.unblock(); + future.resume(); + + // Verify 4 + expect(that % 0 < ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + expect(that % not future.done()); + expect(that % &ctx2 == ctx1.info->sync_context) + << "sync context should be &ctx2"; + expect(that % 3 == step); + + // Exercise 5: finish + future.resume(); + + // Verify 5 + expect(that % 0 == ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + expect(that % future.done()); + expect(that % 4 == step); + }; +}; diff --git a/tests/cancel.test.cpp b/tests/cancel.test.cpp new file mode 100644 index 0000000..a4bc241 --- /dev/null +++ b/tests/cancel.test.cpp @@ -0,0 +1,218 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +boost::ut::suite<"cancellation_tests"> cancellation_tests = []() { + using namespace boost::ut; + using namespace std::chrono_literals; + + "Cancellation"_test = []() { + // Setup + test_context ctx; + + std::pair count{ 0, 0 }; + int ends_reached = 0; + + auto get_counter = [&count]() -> auto { + return raii_counter( + std::make_pair(&count.first, &count.second)); + }; + + auto a = [get_counter, + &ends_reached](async::context& p_ctx) -> async::future { + std::println("[future cancel] entering a"); + raii_counter counter = get_counter(); + co_await std::suspend_always{}; + std::println("[future cancel] a exited"); + ends_reached++; + co_return; + }; + + auto b = [a, get_counter, &ends_reached]( + async::context& p_ctx) -> async::future { + std::println("[future cancel] entering b"); + raii_counter counter = get_counter(); + co_await a(p_ctx); + std::println("[future cancel] b exited"); + ends_reached++; + co_return; + }; + + auto c = [b, get_counter, &ends_reached]( + async::context& p_ctx) -> async::future { + std::println("[future cancel] entering c"); + raii_counter counter = get_counter(); + co_await b(p_ctx); + std::println("[future cancel] c exited"); + ends_reached++; + co_return; + }; + + { + expect(count == std::make_pair(0, 0)) + << "count is {" << count.first << ", " << count.second << "}\n"; + expect(that % ends_reached == 0); + + auto future = c(ctx); + + expect(count == std::make_pair(0, 0)) + << "count is {" << count.first << ", " << count.second << "}\n"; + expect(that % ends_reached == 0); + + future.resume(); + + expect(count == std::make_pair(3, 0)) + << "count is {" << count.first << ", " << count.second << "}\n"; + expect(that % ends_reached == 0); + expect(that % 0 < ctx.memory_used()); + } // destroy future + + expect(count == std::make_pair(3, 3)) + << "count is {" << count.first << ", " << count.second << "}\n"; + expect(that % ends_reached == 0); + expect(that % 0 == ctx.memory_used()); + }; + + "Context Cancellation"_test = []() { + // Setup + test_context ctx; + + std::pair count{ 0, 0 }; + int ends_reached = 0; + + auto get_counter = [&count]() -> auto { + return raii_counter( + std::make_pair(&count.first, &count.second)); + }; + + auto a = [get_counter, + &ends_reached](async::context& p_ctx) -> async::future { + std::println("[context cancel] entering a"); + raii_counter counter = get_counter(); + co_await std::suspend_always{}; + std::println("[context cancel] a exited"); + ends_reached++; + co_return; + }; + auto b = [a, get_counter, &ends_reached]( + async::context& p_ctx) -> async::future { + std::println("[context cancel] entering b"); + raii_counter counter = get_counter(); + co_await a(p_ctx); + std::println("[context cancel] b exited"); + ends_reached++; + co_return; + }; + auto c = [b, get_counter, &ends_reached]( + async::context& p_ctx) -> async::future { + std::println("[context cancel] entering c"); + raii_counter counter = get_counter(); + co_await b(p_ctx); + std::println("[context cancel] c exited"); + ends_reached++; + co_return; + }; + + expect(count == std::make_pair(0, 0)); + expect(that % ends_reached == 0); + + auto future = c(ctx); + + expect(count == std::make_pair(0, 0)); + expect(that % ends_reached == 0); + + future.resume(); + + expect(count == std::make_pair(3, 0)); + expect(that % ends_reached == 0); + expect(that % 0 < ctx.memory_used()); + expect(that % false == future.has_value()); + expect(that % false == future.done()); + + ctx.unsafe_cancel(); + + expect(count == std::make_pair(3, 3)); + expect(that % ends_reached == 0); + expect(that % 0 == ctx.memory_used()); + expect(that % false == future.has_value()); + // Unfortunately, context doesn't have the information necessary to this + // future. The future is invalid, but we currently cannot change its state + // from the perview of the context. + expect(that % false == future.done()); + }; + + "Exception Propagation"_test = []() { + // Setup + test_context ctx; + + std::pair count{ 0, 0 }; + int ends_reached = 0; + + auto get_counter = [&count]() -> auto { + return raii_counter( + std::make_pair(&count.first, &count.second)); + }; + + bool should_throw = true; + int step = 0; + auto a = [&](async::context& p_ctx) -> async::future { + step = 3; + std::println("[exception] entering a"); + raii_counter counter = get_counter(); + co_await std::suspend_always{}; + step = 4; + if (should_throw) { + std::println("[exception] ‼️ throwing runtime error"); + throw std::runtime_error("Throwing this error for the test"); + } + std::println("[exception] a exited"); + ends_reached++; + co_return; + }; + auto b = [&](async::context& p_ctx) -> async::future { + step = 2; + std::println("[exception] entering b"); + raii_counter counter = get_counter(); + co_await a(p_ctx); + std::println("[exception] b exited"); + ends_reached++; + co_return; + }; + auto c = [&](async::context& p_ctx) -> async::future { + step = 1; + std::println("[exception] entering c"); + raii_counter counter = get_counter(); + co_await b(p_ctx); + std::println("[exception] c exited"); + ends_reached++; + co_return; + }; + + expect(count == std::make_pair(0, 0)); + expect(that % ends_reached == 0); + + auto future = c(ctx); + + expect(count == std::make_pair(0, 0)); + expect(that % ends_reached == 0); + + std::println("Resume until future reaches suspension @ coroutine A"); + future.resume(); + + expect(throws([&]() { + future.resume(); + future.resume(); + })) + << "runtime_error exception was not thrown!"; + expect(that % future.done()); + expect(that % not future.has_value()); + expect(count == std::make_pair(3, 3)) + << "count is {" << count.first << ", " << count.second << "}\n"; + expect(that % ends_reached == 0); + expect(that % 0 == ctx.memory_used()); + }; +}; diff --git a/tests/guards.test.cpp b/tests/guards.test.cpp new file mode 100644 index 0000000..3215aad --- /dev/null +++ b/tests/guards.test.cpp @@ -0,0 +1,130 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +boost::ut::suite<"guards_tests"> guards_tests = []() { + using namespace boost::ut; + using namespace std::chrono_literals; + "Context Token"_test = []() { + // Setup + test_context ctx1; + test_context ctx2; + + async::context_token io_in_use; + + auto single_resource = + [&](async::context& p_context) -> async::future { + using namespace std::chrono_literals; + + std::println("Executing 'single_resource' coroutine"); + while (io_in_use) { + // TODO(#44): For some reason this segfaults on Linux + // std::println("Resource unavailable, blocked by {}", + // io_in_use.address()); + co_await io_in_use.set_as_block_by_sync(p_context); + } + + // Block next coroutine from using this resource + io_in_use = p_context; + + // setup dma transaction... + + // It cannot be assumed that the scheduler will not sync_wait() this + // coroutine, thus, a loop is required to sure that the async operation + // has actually completed. + while (io_in_use == p_context) { + std::println("Waiting on io complete flag, blocking by I/O"); + // Continually notify that this is blocked by IO + co_await p_context.block_by_io(); + } + + std::println("IO operation complete! Returning!"); + + co_return; + }; + + std::println("🧱 Future setup"); + auto access_first = single_resource(ctx1); + auto access_second = single_resource(ctx2); + + expect(that % 0 < ctx1.memory_used()); + expect(that % 0 < ctx2.memory_used()); + + auto check_access_first_blocked_by = + [&](async::blocked_by p_state = async::blocked_by::io, + std::source_location const& p_location = + std::source_location::current()) { + expect(that % p_state == ctx1.state()) + << "ctx1 state mismatch, line: " << p_location.line() << '\n'; + }; + + auto check_access_second_blocked_by = + [&](async::blocked_by p_state = async::blocked_by::nothing, + std::source_location const& p_location = + std::source_location::current()) { + expect(that % p_state == ctx2.state()) + << "ctx2 state mismatch, line: " << p_location.line() << '\n'; + }; + + // access_first will claim the resource and will return control, and be + // blocked by IO. + std::println("▶️ Resume 1st: 1"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 1st: 2"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 1st: 3"); + access_first.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(); + + std::println("▶️ Resume 2nd: 1"); + access_second.resume(); + + check_access_first_blocked_by(); + check_access_second_blocked_by(async::blocked_by::sync); + + io_in_use.unblock_and_clear(); + + check_access_first_blocked_by(async::blocked_by::nothing); + check_access_second_blocked_by(async::blocked_by::sync); + + std::println("▶️ Resume 2nd: 2"); + access_second.resume(); + + // Resuming access_second shouldn't change the state of anything + check_access_first_blocked_by(async::blocked_by::nothing); + check_access_second_blocked_by(async::blocked_by::io); + + std::println("▶️ Resume 1st: 4, this should finish the operation"); + access_first.resume(); + + expect(that % ctx1.state() == async::blocked_by::nothing); + expect(that % access_first.done()); + + check_access_second_blocked_by(async::blocked_by::io); + access_second.resume(); + check_access_second_blocked_by(async::blocked_by::io); + + io_in_use.unblock_and_clear(); + access_second.resume(); + + expect(that % ctx2.state() == async::blocked_by::nothing); + expect(that % access_second.done()); + + expect(that % 0 == ctx1.memory_used()); + expect(that % 0 == ctx2.memory_used()); + }; +}; diff --git a/tests/main.test.cpp b/tests/main.test.cpp index 9482d00..40e2379 100644 --- a/tests/main.test.cpp +++ b/tests/main.test.cpp @@ -12,16 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// export module libahl_unit_tests; - namespace async { -// Extern position dependant test go here. Refrain from using this whenever -// possible. -extern void async_context_suite(); } // namespace async int main() { - // Position dependent test go below: - async::async_context_suite(); } diff --git a/tests/proxy.test.cpp b/tests/proxy.test.cpp new file mode 100644 index 0000000..7837415 --- /dev/null +++ b/tests/proxy.test.cpp @@ -0,0 +1,160 @@ +#include +#include + +#include + +import async_context; +import test_utils; + +boost::ut::suite<"proxy_tests"> proxy_tests = []() { + using namespace boost::ut; + using namespace std::chrono_literals; + + "Proxy Context (normal behavior, no timeout)"_test = []() { + // Setup + test_context ctx; + std::println("===================================="); + std::println("Running Proxy Context Test (no timeout normal behavior)"); + std::println("===================================="); + + static constexpr auto expected_suspensions = 5; + static constexpr auto timeout_count = expected_suspensions + 2; + auto suspension_count = 0; + + auto b = [&suspension_count](async::context&) -> async::future { + while (suspension_count < expected_suspensions) { + suspension_count++; + // TODO(#44): For some reason this segfaults on Linux + // std::println("p_suspend_count = {}!", suspension_count); + co_await std::suspend_always{}; + } + co_return expected_suspensions; + }; + + auto a = [b](async::context& p_ctx) -> async::future { + std::println("Entered coroutine a!"); + auto proxy = async::proxy_context::from(p_ctx); + std::println("Made a proxy!"); + int counter = timeout_count; + auto supervised_future = b(proxy); + + while (not supervised_future.done()) { + std::println("supervised_future not done()!"); + if (counter <= 0) { + std::println("TIMEDOUT detected!"); + break; + } + std::println("resuming supervised_future..."); + supervised_future.resume(); + + std::println("suspending ourself..."); + co_await std::suspend_always{}; + counter--; + } + + std::println("finished while loop()!"); + + if (counter > 0) { + if (supervised_future.has_value()) { + std::println("✅ SUCCESS!"); + co_return supervised_future.value(); + } else { + std::println("‼️ No value after completion!"); + co_return -2; + } + } else { + std::println("‼️ TIMED OUT!!"); + co_return -1; + } + + std::println("TIMED OUT!!"); + + co_return -1; + }; + + auto my_future = a(ctx); + while (not my_future.done()) { + my_future.resume(); + } + + expect(my_future.has_value()); + auto value = my_future.value(); + + expect(that % my_future.done()); + expect(that % expected_suspensions == value); + expect(that % 0 == ctx.memory_used()); + expect(that % suspension_count == expected_suspensions); + }; + + "Proxy Coroutines Timeout"_test = []() { + // Setup + test_context ctx; + std::println("===================================="); + std::println("Running Proxy Context Test (with timeout)"); + std::println("===================================="); + + static constexpr auto expected_suspensions = 5; + static constexpr auto timeout_count = expected_suspensions - 2; + auto suspension_count = 0; + + auto b = [&suspension_count](async::context&) -> async::future { + suspension_count = 0; + while (suspension_count < expected_suspensions) { + suspension_count++; + // TODO(#44): For some reason this segfaults on Linux + // std::println("p_suspend_count = {}!", suspension_count); + co_await std::suspend_always{}; + } + co_return expected_suspensions; + }; + + auto a = [b](async::context& p_ctx) -> async::future { + std::println("Entered coroutine a!"); + auto proxy = async::proxy_context::from(p_ctx); + std::println("Made a proxy!"); + int counter = timeout_count; + auto supervised_future = b(proxy); + + while (not supervised_future.done()) { + std::println("supervised_future not done()!"); + if (counter <= 0) { + std::println("TIMEDOUT detected!"); + break; + } + std::println("resuming supervised_future..."); + supervised_future.resume(); + + std::println("suspending ourself..."); + co_await std::suspend_always{}; + counter--; + } + + std::println("finished while loop()!"); + + if (counter > 0) { + if (supervised_future.has_value()) { + std::println("✅ SUCCESS!"); + co_return supervised_future.value(); + } else { + std::println("‼️ No value after completion!"); + co_return -2; + } + } else { + std::println("‼️ TIMED OUT!!"); + co_return -1; + } + }; + + auto future = a(ctx); + + while (not future.done()) { + future.resume(); + } + auto value = future.value(); + + expect(that % future.done()); + expect(that % -1 == value); + expect(that % suspension_count == timeout_count); + expect(that % 0 == ctx.memory_used()); + }; +}; diff --git a/tests/util.cppm b/tests/util.cppm new file mode 100644 index 0000000..52abe51 --- /dev/null +++ b/tests/util.cppm @@ -0,0 +1,131 @@ +module; + +#include +#include +#include +#include +#include +#include +#include + +#include + +export module test_utils; + +import async_context; + +export namespace async { +std::ostream& operator<<(std::ostream& out, blocked_by b) +{ + switch (b) { + case blocked_by::nothing: + return out << "nothing"; + case blocked_by::time: + return out << "time"; + case blocked_by::io: + return out << "io"; + case blocked_by::sync: + return out << "sync"; + case blocked_by::external: + return out << "external"; + default: + // For unknown values we print the numeric value + return out << "blocked_by(" << static_cast(b) << ')'; + } +} +} // namespace async + +export { + struct thread_info + { + async::context* sync_context = nullptr; + int sleep_count = 0; + bool io_block = false; + async::sleep_duration last_sleep_time = {}; + bool scheduled_called_once = false; + }; + + struct test_context : public async::basic_context + { + std::shared_ptr info; + std::array m_stack{}; + + test_context(std::shared_ptr const& p_info) + : info(p_info) + { + this->initialize_stack_memory(m_stack); + } + test_context() + : info(std::make_shared()) + { + this->initialize_stack_memory(m_stack); + } + + private: + void do_schedule(async::blocked_by p_block_state, + async::block_info p_block_info) noexcept override + { + info->scheduled_called_once = true; + + switch (p_block_state) { + case async::blocked_by::time: { + if (std::holds_alternative(p_block_info)) { + auto const time = std::get(p_block_info); + info->sleep_count++; + info->last_sleep_time = time; + } + unblock_without_notification(); + break; + } + case async::blocked_by::sync: { + if (std::holds_alternative(p_block_info)) { + auto* context = std::get(p_block_info); + std::println( + "Coroutine ({}) is blocked by syncronization with coroutine ({})", + static_cast(this), + static_cast(context)); + info->sync_context = context; + } + break; + } + case async::blocked_by::io: { + info->io_block = true; + break; + } + case async::blocked_by::nothing: { + std::println("Context ({}) has been unblocked!", + static_cast(this)); + break; + } + default: { + break; + } + } + } + }; + + struct sleep_counter + { + int count = 0; + + void operator()(async::sleep_duration) + { + count++; + } + }; + + struct raii_counter + { + raii_counter(std::pair p_counts) + : counts(p_counts) + { + (*counts.first)++; + } + + ~raii_counter() // NOLINT(bugprone-exception-escape) + { + (*counts.second)++; + } + std::pair counts; + }; +}