diff --git a/src/rpp/concurrent_queue.h b/src/rpp/concurrent_queue.h
index 05b86b5..bf3c850 100644
--- a/src/rpp/concurrent_queue.h
+++ b/src/rpp/concurrent_queue.h
@@ -5,7 +5,8 @@
  */
 #pragma once
 #include "config.h"
-#include "semaphore.h"
+#include "mutex.h"
+#include "condition_variable.h"
+#include 
+#include <thread> // std::this_thread::yield()
+#include <type_traits> // std::is_trivially_destructible_v
@@ -22,7 +23,7 @@ namespace rpp
      * @note This is not optimized for speed, but has acceptable performance
      * and due to its simplicity it won't randomly deadlock on you.
      */
-    template<class T>
+    template<class T, class mutex_t = rpp::mutex, class cond_var_t = rpp::condition_variable>
     class concurrent_queue
     {
         // for faster performance the Head and Tail are always within the linear range
@@ -38,15 +39,25 @@
         using time_point = clock::time_point;
 
     private:
-        using mutex_t = rpp::mutex;
-        using lock_t = rpp::semaphore::lock_t;
-        mutable mutex_t Mutex;
-        mutable rpp::condition_variable Waiter;
+        using lock_t = std::unique_lock<mutex_t>;
+        std::optional<mutex_t> MutexStorage;
+        std::optional<cond_var_t> WaiterStorage;
+        mutex_t& Mutex;
+        cond_var_t& Waiter;
         // special state flag for all waiters to immediately exit the queue
         std::atomic_bool Cleared = false;
 
     public:
-        concurrent_queue() = default;
+        concurrent_queue()
+            : Mutex{MutexStorage.emplace()}
+            , Waiter{WaiterStorage.emplace()}
+        {}
+
+        concurrent_queue(mutex_t& mutex, cond_var_t& waiter)
+            : Mutex{mutex}
+            , Waiter{waiter}
+        {}
+
         ~concurrent_queue() noexcept
         {
             // safely lock, clear and notify all waiters to give up
@@ -59,6 +70,8 @@ namespace rpp
         concurrent_queue& operator=(const concurrent_queue&) = delete;
 
         concurrent_queue(concurrent_queue&& q) noexcept
+            : Mutex{MutexStorage.emplace()}
+            , Waiter{WaiterStorage.emplace()}
         {
             // safely swap the states from q to default empty state
             std::lock_guard lock { q.mutex() };
diff --git a/src/rpp/shm_mutex.cpp b/src/rpp/shm_mutex.cpp
new file mode 100644
index 0000000..6c7605e
--- /dev/null
+++ b/src/rpp/shm_mutex.cpp
@@ -0,0 +1,80 @@
+#if __linux__
+#include "shm_mutex.h"
+#include "debugging.h"
+#include <cerrno>
+
+// TODO: throw instead of logging errors
+
+namespace rpp {
+    shm_mutex::shm_mutex() noexcept
+    {
+        pthread_mutexattr_t attr;
+        pthread_mutexattr_init(&attr);
+        if (int err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0)
+        {
+            LogError("shm_mutex: pthread_mutexattr_setpshared failed: error code %d", err);
+            return;
+        }
+        pthread_mutex_init(&handle, &attr);
+    }
+
+    shm_mutex::~shm_mutex() noexcept
+    {
+        if (int err = pthread_mutex_destroy(&handle); err != 0)
+        {
+            LogError("shm_mutex: pthread_mutex_destroy failed: error code %d", err);
+        }
+    }
+
+    void shm_mutex::lock()
+    {
+        if (int err = pthread_mutex_lock(&handle); err != 0)
+        {
+            LogError("shm_mutex: pthread_mutex_lock failed: error code %d", err);
+        }
+    }
+
+    bool shm_mutex::try_lock()
+    {
+        int err = pthread_mutex_trylock(&handle);
+        if (err != 0 && err != EBUSY)
+        {
+            LogError("shm_mutex: pthread_mutex_trylock failed: error code %d", err);
+            return false;
+        }
+        return err == 0;
+    }
+
+    void shm_mutex::unlock()
+    {
+        if (int err = pthread_mutex_unlock(&handle); err != 0)
+        {
+            LogError("shm_mutex: pthread_mutex_unlock failed: error code %d", err);
+        }
+    }
+
+    // -------------------------------------------------------
+
+    shm_cond_var::shm_cond_var()
+    {
+        pthread_condattr_t attr;
+        pthread_condattr_init(&attr);
+        if (int err = pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0)
+        {
+            LogError("shm_cond_var: pthread_condattr_setpshared failed: error code %d", err);
+            return;
+        }
+        pthread_cond_init(&handle, &attr);
+    }
+
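+    // NOTE: destroying a process-shared condition variable that other threads or
+    // processes are still blocked on is undefined behaviour in POSIX, so the owner
+    // should only tear it down after all waiters have been released.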
+    shm_cond_var::~shm_cond_var() noexcept
+    {
+        if (int err = pthread_cond_destroy(&handle); err != 0)
+        {
+            LogError("shm_cond_var: pthread_cond_destroy failed: error code %d", err);
+        }
+    }
+
+} // namespace rpp
+
+#endif // __linux__
diff --git a/src/rpp/shm_mutex.h b/src/rpp/shm_mutex.h
new file mode 100644
index 0000000..8f03c1b
--- /dev/null
+++ b/src/rpp/shm_mutex.h
@@ -0,0 +1,135 @@
+#pragma once
+#if __linux__
+#include <pthread.h>
+#include <chrono>
+#include <mutex>
+#include <condition_variable>
+
+namespace rpp {
+
+    template<class Clock, class Duration>
+    inline timespec to_timespec(const std::chrono::time_point<Clock, Duration>& tp)
+    {
+        using namespace std::chrono;
+
+        // Get duration since the clock's epoch, in nanoseconds
+        auto d = tp.time_since_epoch();
+        auto ns = duration_cast<nanoseconds>(d).count();
+
+        timespec ts;
+        ts.tv_sec = ns / 1'000'000'000;
+        ts.tv_nsec = ns % 1'000'000'000;
+        return ts;
+    }
+
+    class shm_mutex
+    {
+        pthread_mutex_t handle;
+    public:
+        using native_handle_type = pthread_mutex_t*;
+
+        shm_mutex() noexcept;
+        ~shm_mutex() noexcept;
+
+        // No copy/move
+        shm_mutex(const shm_mutex&) = delete;
+        shm_mutex& operator=(const shm_mutex&) = delete;
+        shm_mutex(shm_mutex&&) = delete;
+        shm_mutex& operator=(shm_mutex&&) = delete;
+
+        void lock();
+        bool try_lock();
+        void unlock();
+
+        native_handle_type native_handle() noexcept { return &handle; }
+    };
+
+    class shm_cond_var
+    {
+        shm_mutex cs;
+        pthread_cond_t handle;
+
+        using clock = std::chrono::high_resolution_clock;
+        using duration = clock::duration;
+        using time_point = clock::time_point;
+    public:
+        using native_handle_type = pthread_cond_t*;
+
+        shm_cond_var();
+        ~shm_cond_var() noexcept;
+
+        // No copy/move
+        shm_cond_var(const shm_cond_var&) = delete;
+        shm_cond_var& operator=(const shm_cond_var&) = delete;
+        shm_cond_var(shm_cond_var&&) = delete;
+        shm_cond_var& operator=(shm_cond_var&&) = delete;
+
+        void notify_one()
+        {
+            pthread_cond_signal(&handle);
+        }
+        void notify_all()
+        {
+            pthread_cond_broadcast(&handle);
+        }
+
+        void wait(std::unique_lock<shm_mutex>& lock)
+        {
+            cs.lock();
+            lock.unlock();
+            (void)pthread_cond_wait(&handle, cs.native_handle());
+            cs.unlock();
+            lock.lock();
+        }
+
+        std::cv_status wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time)
+        {
+            auto abs_time = clock::now() + rel_time;
+            return wait_until(lock, abs_time);
+        }
+
+        template<class Predicate>
+        void wait(std::unique_lock<shm_mutex>& mtx, const Predicate& stop_waiting)
+        {
+            while (!stop_waiting())
+                wait(mtx);
+        }
+
+        std::cv_status wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time)
+        {
+            cs.lock();
+            lock.unlock();
+            timespec abstime = to_timespec(abs_time);
+            int err = pthread_cond_timedwait(&handle, cs.native_handle(), &abstime);
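+            // pthread_cond_timedwait() returns 0 when signalled and ETIMEDOUT once
+            // abs_time has passed; any other error code is currently folded into timeout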
+            std::cv_status status = (err == 0) ? std::cv_status::no_timeout : std::cv_status::timeout; // TODO: handle other errors
+            cs.unlock();
+            lock.lock();
+            return status;
+        }
+
+        template<class Predicate>
+        bool wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time,
+                        const Predicate& stop_waiting)
+        {
+            while (!stop_waiting())
+                if (wait_until(lock, abs_time) == std::cv_status::timeout)
+                    return stop_waiting();
+            return true;
+        }
+
+        template<class Predicate>
+        bool wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time,
+                      const Predicate& stop_waiting)
+        {
+            auto abs_time = clock::now() + rel_time;
+            while (!stop_waiting())
+                if (wait_until(lock, abs_time) == std::cv_status::timeout)
+                    return stop_waiting();
+            return true;
+        }
+
+        native_handle_type native_handle() noexcept { return &handle; }
+    };
+
+} // namespace rpp
+
+#endif // __linux__
diff --git a/tests/test_shm_concurrent_queue.cpp b/tests/test_shm_concurrent_queue.cpp
new file mode 100644
index 0000000..07a2dd8
--- /dev/null
+++ b/tests/test_shm_concurrent_queue.cpp
@@ -0,0 +1,624 @@
+#ifdef __linux__
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+using namespace rpp;
+using namespace std::chrono_literals;
+
+// TODO: find a way to deduplicate test for queue with pthread mutex and condition_variable
+
+TestImpl(test_concurrent_queue_pthread)
+{
+    static constexpr double MS = 1.0 / 1000.0;
+
+#if APPVEYOR
+    static constexpr double sigma_s = 0.02;
+    static constexpr double sigma_ms = sigma_s * 2000.0;
+#else
+    static constexpr double sigma_s = 0.01;
+    static constexpr double sigma_ms = sigma_s * 1000.0;
+#endif
+
+    #define CONCURRENT_QUEUE rpp::concurrent_queue<std::string, rpp::shm_mutex, rpp::shm_cond_var>
+    using Clock = CONCURRENT_QUEUE::clock;
+    using Millis = std::chrono::milliseconds;
+
+    TestInit(test_concurrent_queue_pthread)
+    {
+    }
+
+    TestCase(push_and_pop)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        std::string item3 = "item3";
+        queue.push(item3); // copy
+        AssertThat(queue.size(), 3);
+        AssertThat(queue.safe_size(), 3);
+        AssertThat(queue.empty(), false);
+
+        AssertThat(queue.pop(), "item1");
+        AssertThat(queue.pop(), "item2");
+        AssertThat(queue.pop(), "item3");
+        AssertThat(queue.size(), 0);
+        AssertThat(queue.safe_size(), 0);
+        AssertThat(queue.empty(), true);
+
+        queue.push_no_notify("item4");
+        AssertThat(queue.size(), 1);
+        AssertThat(queue.safe_size(), 1);
+        AssertThat(queue.empty(), false);
+        AssertThat(queue.pop(), "item4");
+    }
+
+    TestCase(clear)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        queue.push("item3");
+        AssertThat(queue.size(), 3);
+        AssertThat(queue.empty(), false);
+        queue.clear();
+        AssertThat(queue.size(), 0);
+        AssertThat(queue.empty(), true);
+    }
+
+    TestCase(atomic_copy)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        queue.push("item3");
+        std::vector<std::string> items = queue.atomic_copy();
+        AssertThat(items.size(), 3);
+        AssertThat(items.at(0), "item1");
+        AssertThat(items.at(1), "item2");
+        AssertThat(items.at(2), "item3");
+    }
+
+    TestCase(iterate_locked)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        queue.push("item3");
+
+        std::vector<std::string> items;
+        for (const std::string& item : queue.iterator())
+            items.push_back(item);
+        AssertThat(items.size(), 3);
+        AssertThat(items.at(0), "item1");
+        AssertThat(items.at(1), "item2");
+        AssertThat(items.at(2), "item3");
+    }
+
+    TestCase(iterate_external_lock)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        queue.push("item3");
+
+        std::vector<std::string> items;
+        {
+            std::unique_lock lock = queue.spin_lock();
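+            // iterate while holding the externally acquired lock; the iterator(lock)
+            // overload is expected to reuse it instead of taking the queue lock again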
+            for (const std::string& item : queue.iterator(lock))
+                items.push_back(item);
+        }
+        AssertThat(items.size(), 3);
+        AssertThat(items.at(0), "item1");
+        AssertThat(items.at(1), "item2");
+        AssertThat(items.at(2), "item3");
+    }
+
+    TestCase(can_erase_every_second_item)
+    {
+        concurrent_queue<int, rpp::shm_mutex, rpp::shm_cond_var> queue;
+        std::vector<int> expected;
+
+        for (int i = 0; i < 1000; ++i)
+        {
+            queue.push(i);
+            if (i % 2 != 0) expected.push_back(i);
+        }
+
+        {
+            auto iterator = queue.iterator();
+            for (auto it = iterator.begin(); it != iterator.end(); )
+            {
+                if (*it % 2 == 0)
+                    it = iterator.erase(it);
+                else
+                    ++it;
+            }
+        }
+
+        std::vector<int> items;
+        for (int item : queue.iterator())
+            items.push_back(item);
+
+        AssertThat(items.size(), 500);
+        AssertEqual(items, expected);
+    }
+
+    TestCase(rapid_growth)
+    {
+        constexpr int MAX_SIZE = 40'000;
+        CONCURRENT_QUEUE queue;
+        for (int i = 0; i < MAX_SIZE; ++i)
+            queue.push("item");
+        AssertThat(queue.size(), MAX_SIZE);
+
+        int numPopped = 0;
+        std::string item;
+        while (queue.wait_pop(item, 50ms))
+            ++numPopped;
+        AssertThat(numPopped, MAX_SIZE);
+    }
+
+    TestCase(rapid_growth_async)
+    {
+        constexpr int MAX_SIZE = 40'000;
+        CONCURRENT_QUEUE queue;
+        cfuture<> producer = rpp::async_task([&] {
+            for (int i = 0; i < MAX_SIZE; ++i)
+                queue.push("item");
+        });
+        scope_guard([&]{ producer.get(); });
+
+        rpp::Timer t;
+        int numPopped = 0;
+        std::string item;
+        while (numPopped < MAX_SIZE && queue.wait_pop(item, 100ms))
+        {
+            ++numPopped;
+        }
+
+        double elapsed = t.elapsed();
+        double megaitems_per_sec = numPopped / (elapsed * 1'000'000.0);
+        print_info("rapid_growth_async elapsed: %.2f ms %.0f Mitem/s\n", elapsed*1000, megaitems_per_sec);
+        AssertThat(numPopped, MAX_SIZE);
+    }
+
+    // try_pop() is excellent for polling scenarios where you don't want to
+    // wait for an item, but just check whether any work can be done,
+    // and otherwise continue
+    TestCase(try_pop)
+    {
+        CONCURRENT_QUEUE queue;
+        std::string item;
+        AssertThat(queue.try_pop(item), false);
+        AssertThat(item, "");
+
+        queue.push("item1");
+        AssertThat(queue.try_pop(item), true);
+        AssertThat(item, "item1");
+        AssertThat(queue.try_pop(item), false);
+        AssertThat(item, "item1");
+
+        queue.push("item2");
+        queue.push("item3");
+        AssertThat(queue.try_pop(item), true);
+        AssertThat(item, "item2");
+        AssertThat(queue.try_pop(item), true);
+        AssertThat(item, "item3");
+        AssertThat(queue.try_pop(item), false);
+        AssertThat(item, "item3");
+    }
+
+    TestCase(atomic_flush)
+    {
+        CONCURRENT_QUEUE queue;
+        queue.push("item1");
+        queue.push("item2");
+        queue.push("item3");
+
+        // count the number of tasks that were atomically processed
+        std::atomic_int numProcessed = 0;
+        cfuture<> worker = rpp::async_task([&]
+        {
+            std::string item;
+            while (queue.pop_atomic_start(item))
+            {
+                rpp::sleep_ms(1); // simulate work
+                ++numProcessed;
+                queue.pop_atomic_end();
+            }
+        });
+        scope_guard([&]{ worker.get(); });
+
+        // flush
+        rpp::Timer t;
+        while (!queue.empty())
+        {
+            rpp::sleep_us(100);
+            if (t.elapsed_ms() > 100) {
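+                // give the worker up to 100ms to drain the queue, then fail
+                // loudly instead of hanging the test suite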
+                AssertFailed("queue could not empty itself! size=%zu", queue.size());
+                break;
+            }
+        }
+        AssertThat(numProcessed, 3);
+    }
+
+    // wait_pop() is best used for producer/consumer scenarios
+    // for long-lived service threads that don't have any cancellation mechanism
+    TestCase(wait_pop_producer_consumer)
+    {
+        CONCURRENT_QUEUE queue;
+
+        cfuture<> producer = rpp::async_task([&] {
+            queue.push("item1");
+            queue.push("item2");
+            queue.push("item3");
+        });
+
+        cfuture<> consumer = rpp::async_task([&] {
+            std::string item1 = *queue.wait_pop();
+            AssertThat(item1, "item1");
+            std::string item2 = *queue.wait_pop();
+            AssertThat(item2, "item2");
+            std::string item3 = *queue.wait_pop();
+            AssertThat(item3, "item3");
+        });
+
+        producer.get();
+        consumer.get();
+    }
+
+    // wait_pop() is best used for producer/consumer scenarios
+    // for long-lived service threads that don't have any cancellation mechanism
+    // other than basic notify()
+    TestCase(wait_pop_2_producer_consumer)
+    {
+        CONCURRENT_QUEUE queue;
+
+        cfuture<> producer = rpp::async_task([&] {
+            queue.push("item1");
+            queue.push("item2");
+            queue.push("item3");
+            rpp::sleep_ms(5);
+            queue.notify_one(); // notify consumer
+        });
+
+        cfuture<> consumer = rpp::async_task([&] {
+            std::string item1, item2, item3;
+            AssertTrue(queue.wait_pop(item1));
+            AssertThat(item1, "item1");
+            AssertTrue(queue.wait_pop(item2));
+            AssertThat(item2, "item2");
+            AssertTrue(queue.wait_pop(item3));
+            AssertThat(item3, "item3");
+
+            // enter infinite wait, but we should be notified by the producer
+            std::string item4;
+            AssertFalse(queue.wait_pop(item4));
+            AssertThat(item4, "");
+        });
+
+        producer.get();
+        consumer.get();
+    }
+
+    struct PopResult
+    {
+        std::string item;
+        double elapsed_ms;
+        bool success = false;
+        explicit operator bool() const noexcept { return success; }
+    };
+
+    PopResult wait_pop_timed(CONCURRENT_QUEUE& queue, Clock::duration timeout)
+    {
+        PopResult r;
+        rpp::Timer t;
+        r.success = queue.wait_pop(r.item, timeout);
+        r.elapsed_ms = t.elapsed_millis();
+        print_info("wait_pop_timed elapsed: %.2f ms item: %s\n", r.elapsed_ms, r.item.c_str());
+        return r;
+    }
+
+    #define AssertWaitPopTimed(timeout, expectSuccess, expectItem, minElapsedMs, maxElapsedMs) \
+        AssertThat(bool(r = wait_pop_timed(queue, timeout)), expectSuccess); \
+        AssertThat(r.item, expectItem); \
+        AssertInRange(r.elapsed_ms, minElapsedMs, maxElapsedMs);
+
+    // wait infinitely until an item is pushed
+    TestCase(wait_pop_with_timeout)
+    {
+        CONCURRENT_QUEUE queue;
+
+        PopResult r;
+        AssertWaitPopTimed(5ms, false, /*item*/"", /*elapsed ms:*/ 4.0, 10.0);
+        AssertWaitPopTimed(0ms, false, /*item*/"", /*elapsed ms:*/ 0.0, 0.2);
+
+        // if someone pushes an item while we have a huge timeout,
+        // we should get it immediately
+        queue.push("item1");
+        AssertWaitPopTimed(10s, true, /*item*/"item1", /*elapsed ms:*/ 0.0, 10.0);
+        AssertWaitPopTimed(15ms, false, /*item*/"", /*elapsed ms:*/ 14.0, 20.0);
+    }
+
+    // introduce a slow producer thread so we can test our timeouts
+    TestCase(wait_pop_with_timeout_slow_producer)
+    {
+        CONCURRENT_QUEUE queue;
+        rpp::cfuture<> slow_producer = rpp::async_task([&] {
+            spin_sleep_for_ms(50);
+            queue.push("item1");
+            spin_sleep_for_ms(50);
+            queue.push("item2");
+            spin_sleep_for_ms(50);
+            queue.push("item3");
+            spin_sleep_for_ms(100);
+            queue.push("item4");
+        });
+        scope_guard([&]{ slow_producer.get(); });
+
+        PopResult r;
+        AssertWaitPopTimed(5ms, false, /*item*/"", /*elapsed ms:*/ 4.0, 11.0);
+        AssertWaitPopTimed(0ms, false, /*item*/"", /*elapsed ms:*/ 0.0, 0.5);
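+        // the producer only pushes item1 at ~50ms, so this 15ms wait should also time out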
+        AssertWaitPopTimed(15ms, false, /*item*/"", /*elapsed ms:*/ 14.0, 18.0);
+
+        AssertWaitPopTimed(50ms, true, /*item*/"item1", /*elapsed ms:*/ 15.0, 50.0); // this should not timeout
+        AssertWaitPopTimed(75ms, true, /*item*/"item2", /*elapsed ms:*/ 25.0, 55.0);
+        AssertWaitPopTimed(75ms, true, /*item*/"item3", /*elapsed ms:*/ 25.0, 55.0);
+
+        // now we enter a long wait, but we should be notified by the producer
+        AssertWaitPopTimed(1000ms, true, /*item*/"item4", /*elapsed ms:*/ 0.0, 110.0);
+    }
+
+    PopResult wait_pop_until(CONCURRENT_QUEUE& queue, Clock::time_point until)
+    {
+        PopResult r;
+        rpp::Timer t;
+        r.success = queue.wait_pop_until(r.item, until);
+        r.elapsed_ms = t.elapsed_millis();
+        print_info("wait_pop_until elapsed: %.2f ms item: %s\n", r.elapsed_ms, r.item.c_str());
+        return r;
+    }
+
+    #define AssertWaitPopUntil(until, expectSuccess, expectItem, minElapsedMs, maxElapsedMs) \
+        AssertThat(bool(r = wait_pop_until(queue, until)), expectSuccess); \
+        AssertThat(r.item, expectItem); \
+        AssertInRange(r.elapsed_ms, minElapsedMs, maxElapsedMs);
+
+    // wait until an absolute time limit
+    TestCase(wait_pop_until)
+    {
+        CONCURRENT_QUEUE queue;
+
+        PopResult r;
+
+        AssertWaitPopUntil(Clock::now()+5ms, false, /*item*/"", /*elapsed ms:*/ 2.9, 10.0);
+        AssertWaitPopUntil(Clock::now()+0ms, false, /*item*/"", /*elapsed ms:*/ 0.0, 0.2);
+
+        // if someone pushes an item while we have a huge timeout,
+        // we should get it immediately
+        queue.push("item1");
+        AssertWaitPopUntil(Clock::now()+10s, true, /*item*/"item1", /*elapsed ms:*/ 0.0, 10.0);
+        AssertWaitPopUntil(Clock::now()+15ms, false, /*item*/"", /*elapsed ms:*/ 12.9, 20.0);
+
+        // if we have an item, but `until` is in the past, it should immediately return false
+        queue.push("item2");
+        AssertWaitPopUntil(Clock::now()-15ms, false, /*item*/"", /*elapsed ms:*/ 0.0, 0.2);
+        // and now we can consume it
+        AssertWaitPopUntil(Clock::now()+15ms, true, /*item*/"item2", /*elapsed ms:*/ 0.0, 0.2);
+    }
+
+    // ensure that `wait_pop_until` gives up if timeout is reached
+    TestCase(wait_pop_until_stops_on_timeout)
+    {
+        CONCURRENT_QUEUE queue;
+        rpp::cfuture<> slow_producer = rpp::async_task([&] {
+            spin_sleep_for_ms(50);
+            queue.push("item1");
+            spin_sleep_for_ms(50);
+            queue.push("item2");
+            spin_sleep_for_ms(50);
+            queue.push("item3");
+        });
+        scope_guard([&]{ slow_producer.get(); }); // Clang doesn't have jthread yet o_O
+
+        PopResult r;
+
+        // we should only have time to receive item1 and item2
+        auto until = Clock::now() + 125ms;
+        AssertWaitPopUntil(until, true, /*item*/"item1", /*elapsed ms:*/ 20.0, 60.0);
+        AssertWaitPopUntil(until, true, /*item*/"item2", /*elapsed ms*/ 20.0, 60.0);
+        AssertWaitPopUntil(until, false, /*item*/"", /*elapsed ms*/ 20.0, 60.0);
+    }
+
+    // in general the pop_with_timeout is not very useful because
+    // it requires some external mechanism to notify the queue
+    TestCase(wait_pop_with_timeout_and_cancelcondition)
+    {
+        CONCURRENT_QUEUE queue;
+        std::atomic_bool finished = false;
+        rpp::cfuture<> slow_producer = rpp::async_task([&] {
+            spin_sleep_for_ms(50);
+            queue.push("item1");
+            spin_sleep_for_ms(50);
+            queue.push("item2");
+            spin_sleep_for_ms(50);
+            queue.push("item3");
+            spin_sleep_for_ms(50);
+            finished = true;
+            queue.notify(); // notify any waiting threads
+        });
+        scope_guard([&]{ slow_producer.get(); });
+
+        auto cancelCondition = [&] { return (bool)finished; };
+        std::string item;
+        AssertFalse(queue.wait_pop(item, 15ms, cancelCondition)); // this should timeout
+        AssertTrue(queue.wait_pop(item, 40ms, cancelCondition));
+        AssertThat(item, "item1");
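+        // item2 and item3 are produced roughly 50ms apart, so a 55ms timeout
+        // leaves just enough headroom for each of the remaining pops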
+        AssertTrue(queue.wait_pop(item, 55ms, cancelCondition));
+        AssertThat(item, "item2");
+        AssertTrue(queue.wait_pop(item, 55ms, cancelCondition));
+        AssertThat(item, "item3");
+        // now wait until producer exits by setting the cancellation condition
+        // this should not take longer than ~55ms
+        rpp::Timer t;
+        AssertFalse(queue.wait_pop(item, 1000ms, cancelCondition));
+        AssertLess(t.elapsed_millis(), 55);
+    }
+
+    // ensure that wait_pop_interval actually checks the cancelCondition
+    // with sufficient frequency
+    TestCase(wait_pop_interval)
+    {
+        CONCURRENT_QUEUE queue;
+        rpp::cfuture<> slow_producer = rpp::async_task([&]
+        {
+            spin_sleep_for_ms(50);
+            queue.push("item1");
+            spin_sleep_for_ms(50);
+            queue.push("item2");
+            spin_sleep_for_ms(50);
+            queue.push("item3");
+        });
+        scope_guard([&]{ slow_producer.get(); });
+
+        auto wait_pop_interval = [&](std::string& item, auto timeout, auto interval, auto cancel)
+        {
+            rpp::Timer t;
+            bool result = queue.wait_pop_interval(item, timeout, interval, cancel);
+            double elapsed_ms = t.elapsed_millis();
+            print_info("wait_pop_interval elapsed: %.2f ms item: %s\n", elapsed_ms, item.c_str());
+            return result;
+        };
+
+        std::string item;
+
+        // wait for 100ms with 10ms intervals, but first item will arrive within ~50ms
+        std::atomic_int counter0 = 0;
+        AssertTrue(wait_pop_interval(item, 100ms, 10ms, [&] { return ++counter0 >= 10; }));
+        AssertThat(item, "item1");
+        AssertInRange(int(counter0), 5, 9);
+
+        // wait for 20ms with 5ms intervals, it should timeout
+        std::atomic_int counter1 = 0;
+        AssertFalse(wait_pop_interval(item, 20ms, 5ms, [&] { return ++counter1 >= 10; }));
+        AssertInRange(int(counter1), 1, 6); // tolerance is VERY loose here
+
+        // wait another 30ms with 2ms intervals, and it should trigger the cancel condition
+        std::atomic_int counter2 = 0;
+        AssertFalse(wait_pop_interval(item, 30ms, 2ms, [&] { return ++counter2 >= 10; }));
+        AssertThat(int(counter2), 10); // it should have cancelled exactly at 10 checks
+
+        // wait until we pop the item finally
+        std::atomic_int counter3 = 0;
+        AssertTrue(wait_pop_interval(item, 100ms, 5ms, [&] { return ++counter3 >= 20; }));
+        AssertThat(item, "item2");
+        AssertLess(int(counter3), 20); // we should never have reached all the checks
+
+        // now wait with extremely short intervals
+        std::atomic_int counter4 = 0;
+        AssertTrue(wait_pop_interval(item, 55ms, 1ms, [&] { return ++counter4 >= 55; }));
+        AssertThat(item, "item3");
+        // we should never have reached the limit of 55
+        // however there is NO guarantee that the sleep will be 1ms, that's just a minimum hint
+        // most likely it will sleep in the 1-5ms range, so we lower the min range
+        AssertInRange(int(counter4), 15, 54);
+    }
+
+    TestCase(wait_pop_cross_thread_perf)
+    {
+        constexpr int num_iterations = 10;
+        constexpr int num_items = 100'000;
+        constexpr int total_items = num_iterations * num_items;
+        double total_time = 0.0;
+        for (int i = 0; i < num_iterations; ++i)
+        {
+            CONCURRENT_QUEUE queue;
+            rpp::Timer t;
+
+            rpp::cfuture<> producer = rpp::async_task([&] {
+                for (int i = 0; i < num_items; ++i) {
+                    queue.push("item");
+                    if (i % 1000 == 0) // yield every 1000 items
+                        std::this_thread::yield();
+                }
+            });
+            rpp::cfuture<> consumer = rpp::async_task([&] {
+                int num_received = 0;
+                std::string item;
+                // std::deque<std::string> items;
+                while (num_received < num_items) {
+                    // if (queue.try_pop_all(items)) {
+                    //     num_received += items.size();
+                    //     rpp::sleep_us(100);
+                    // }
+
+                    // if (queue.try_pop(item))
+                    //     ++num_received;
+                    // item = *queue.wait_pop();
+                    //     ++num_received;
+                    if (queue.wait_pop(item, Millis{5})) {
+                        ++num_received;
+                    }
+                }
+            });
+
+            producer.get();
+            consumer.get();
+            double elapsed = t.elapsed();
+            total_time += elapsed;
+            print_info("wait_pop consumer elapsed: %.2f ms queue capacity: %d\n", elapsed*1000, queue.capacity());
+        }
+
+        double avg_time = total_time / num_iterations;
+        double items_per_sec = double(total_items) / total_time;
+        double Mitems_per_sec = items_per_sec / 1'000'000.0;
+        print_info("AVERAGE wait_pop consumer elapsed: %.2f ms %.1f Mitems/s\n", avg_time*1000, Mitems_per_sec);
+    }
+
+    TestCase(wait_pop_interval_cross_thread_perf)
+    {
+        constexpr int num_iterations = 10;
+        constexpr int num_items = 100'000;
+        constexpr int total_items = num_iterations * num_items;
+        double total_time = 0.0;
+        for (int i = 0; i < num_iterations; ++i)
+        {
+            CONCURRENT_QUEUE queue;
+            rpp::Timer t;
+
+            rpp::cfuture<> producer = rpp::async_task([&] {
+                for (int i = 0; i < num_items; ++i) {
+                    queue.push("item");
+                    if (i % 1000 == 0) // yield every 1000 items
+                        std::this_thread::yield();
+                }
+            });
+            rpp::cfuture<> consumer = rpp::async_task([&] {
+                int num_received = 0;
+                std::string item;
+                while (num_received < num_items) {
+                    if (queue.wait_pop_interval(item, Millis{15}, Millis{5},
+                                                []{ return false; })) {
+                        ++num_received;
+                    }
+                }
+            });
+
+            producer.get();
+            consumer.get();
+            double elapsed = t.elapsed();
+            total_time += elapsed;
+            print_info("wait_pop_interval consumer elapsed: %.2f ms queue capacity: %d\n", elapsed*1000, queue.capacity());
+        }
+
+        double avg_time = total_time / num_iterations;
+        double items_per_sec = double(total_items) / total_time;
+        double Mitems_per_sec = items_per_sec / 1'000'000.0;
+        print_info("AVERAGE wait_pop_interval consumer elapsed: %.2f ms %.1f Mitems/s\n", avg_time*1000, Mitems_per_sec);
+    }
+};
+
+#endif // __linux__
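Usage note (not part of the patch): the new two-argument constructor lets the process-shared
primitives live outside the queue, e.g. in a POSIX shared memory segment, while the queue
contents themselves stay process-local. The sketch below shows one way that wiring could look.
It assumes the <T, mutex_t, cond_var_t> template argument order used in the tests above; the
segment name "/rpp_queue_sync", the include paths, and the struct name shared_sync are made up,
and all error handling is omitted.

    // sketch only: names, include paths and template argument order are assumptions
    #include <fcntl.h>     // shm_open, O_CREAT, O_RDWR
    #include <sys/mman.h>  // mmap, PROT_*, MAP_SHARED
    #include <unistd.h>    // ftruncate
    #include <new>         // placement new
    #include <string>
    #include "rpp/concurrent_queue.h"
    #include "rpp/shm_mutex.h"

    // only the synchronization primitives are placed in shared memory
    struct shared_sync
    {
        rpp::shm_mutex    mutex;  // PTHREAD_PROCESS_SHARED mutex
        rpp::shm_cond_var waiter; // PTHREAD_PROCESS_SHARED condition variable
    };

    int main()
    {
        // create (or open) a named shared memory segment big enough for the primitives
        int fd = shm_open("/rpp_queue_sync", O_CREAT | O_RDWR, 0600);
        (void)ftruncate(fd, sizeof(shared_sync));
        void* mem = mmap(nullptr, sizeof(shared_sync),
                         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

        // the first process constructs the primitives in place; a second process would
        // mmap the same segment and reinterpret_cast<shared_sync*>(mem) instead
        auto* sync = new (mem) shared_sync{};

        // the queue itself is a normal process-local object that borrows the shared
        // mutex and condition variable through the new reference constructor
        rpp::concurrent_queue<std::string, rpp::shm_mutex, rpp::shm_cond_var>
            queue { sync->mutex, sync->waiter };

        queue.push("hello");
        std::string item = *queue.wait_pop();
        return item == "hello" ? 0 : 1;
    }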