-
Notifications
You must be signed in to change notification settings - Fork 2
Add pthread mutex and condvar, incorporate in concurrent_queue #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,8 @@ | |
| */ | ||
| #pragma once | ||
| #include "config.h" | ||
| #include "semaphore.h" | ||
| #include "mutex.h" | ||
| #include "condition_variable.h" | ||
| #include <vector> | ||
| #include <thread> // std::this_thread::yield() | ||
| #include <type_traits> // std::is_trivially_destructible_v | ||
|
|
@@ -22,7 +23,7 @@ namespace rpp | |
| * @note This is not optimized for speed, but has acceptable performance | ||
| * and due to its simplicity it won't randomly deadlock on you. | ||
| */ | ||
| template<class T> | ||
| template<class T, class mutex_t = rpp::mutex, class cond_var_t = rpp::condition_variable> | ||
| class concurrent_queue | ||
| { | ||
| // for faster performance the Head and Tail are always within the linear range | ||
|
|
@@ -38,15 +39,25 @@ namespace rpp | |
| using time_point = clock::time_point; | ||
|
|
||
| private: | ||
| using mutex_t = rpp::mutex; | ||
| using lock_t = rpp::semaphore::lock_t; | ||
| mutable mutex_t Mutex; | ||
| mutable rpp::condition_variable Waiter; | ||
| using lock_t = std::unique_lock<mutex_t>; | ||
| std::optional<mutex_t> MutexStorage; | ||
| std::optional<cond_var_t> WaiterStorage; | ||
| mutex_t& Mutex; | ||
| cond_var_t& Waiter; | ||
| // special state flag for all waiters to immediately exit the queue | ||
| std::atomic_bool Cleared = false; | ||
|
|
||
| public: | ||
| concurrent_queue() = default; | ||
| concurrent_queue() | ||
| : Mutex{MutexStorage.emplace()} | ||
| , Waiter{WaiterStorage.emplace()} | ||
| {} | ||
|
|
||
| concurrent_queue(mutex_t& mutex, cond_var_t& waiter) | ||
| : Mutex{mutex} | ||
| , Waiter{waiter} | ||
| {} | ||
|
|
||
| ~concurrent_queue() noexcept | ||
| { | ||
| // safely lock, clear and notify all waiters to give up | ||
|
|
@@ -59,6 +70,8 @@ namespace rpp | |
| concurrent_queue& operator=(const concurrent_queue&) = delete; | ||
|
|
||
| concurrent_queue(concurrent_queue&& q) noexcept | ||
| : Mutex{MutexStorage.emplace()} | ||
| , Waiter{WaiterStorage.emplace()} | ||
| { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should copy the syncing properly depending on which strategy is used concurrent_queue(concurrent_queue&& q) noexcept
: DefaultSync{} // ensure DefaultSync is constructed before emplace()
, Sync{q.DefaultSync ? this->DefaultSync.emplace() : q.Sync}
{ |
||
| // safely swap the states from q to default empty state | ||
| std::lock_guard lock { q.mutex() }; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| #if __linux__ | ||
| #include "shm_mutex.h" | ||
| #include "debugging.h" | ||
| #include <condition_variable> | ||
|
|
||
| // TODO: throw instead of logging errors | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, throwing is also problematic. We have a lot of code that we don't want to crash because of somewhat optional mutexes. If the mutex really fails we can just log it and return. |
||
|
|
||
| namespace rpp { | ||
| shm_mutex::shm_mutex() noexcept | ||
| { | ||
| pthread_mutexattr_t attr; | ||
| pthread_mutexattr_init(&attr); | ||
| if (int err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutexattr_setpshared failed: error code %d", err); | ||
| return; | ||
| } | ||
| pthread_mutex_init(&handle, &attr); | ||
| } | ||
|
|
||
| shm_mutex::~shm_mutex() noexcept | ||
| { | ||
| if (int err = pthread_mutex_destroy(&handle); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutex_destroy failed: error code %d", err); | ||
| } | ||
| } | ||
|
|
||
| void shm_mutex::lock() | ||
| { | ||
| if (int err = pthread_mutex_lock(&handle); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutex_lock failed: error code %d", err); | ||
| } | ||
| } | ||
|
|
||
| bool shm_mutex::try_lock() | ||
| { | ||
| int err = pthread_mutex_trylock(&handle); | ||
| if (err != 0 && err != EBUSY) | ||
| { | ||
| LogError("shm_mutex: pthread_mutex_trylock failed: error code %d", err); | ||
| return false; | ||
| } | ||
| return err == 0; | ||
| } | ||
|
|
||
| void shm_mutex::unlock() | ||
| { | ||
| if (int err = pthread_mutex_unlock(&handle); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutex_unlock failed: error code %d", err); | ||
| } | ||
| } | ||
|
|
||
| // ------------------------------------------------------- | ||
|
|
||
| shm_cond_var::shm_cond_var() | ||
| { | ||
| pthread_condattr_t attr; | ||
| pthread_condattr_init(&attr); | ||
| if (int err = pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutexattr_setpshared failed: error code %d", err); | ||
| return; | ||
| } | ||
| pthread_cond_init(&handle, &attr); | ||
| } | ||
|
|
||
| shm_cond_var::~shm_cond_var() noexcept | ||
| { | ||
| if (int err = pthread_cond_destroy(&handle); err != 0) | ||
| { | ||
| LogError("shm_mutex: pthread_mutex_destroy failed: error code %d", err); | ||
| } | ||
| } | ||
|
|
||
| } // namespace rpp | ||
|
|
||
| #endif // __linux__ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| #pragma once | ||
| #if __linux__ | ||
| #include <pthread.h> | ||
| #include <chrono> | ||
| #include <mutex> | ||
| #include <condition_variable> | ||
|
|
||
| namespace rpp { | ||
|
|
||
| template<class Clock, class Duration> | ||
| inline timespec to_timespec(const std::chrono::time_point<Clock, Duration>& tp) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should move away from std::chrono tbh, it's slow and worse than rpp::TimePoint by now |
||
| { | ||
| using namespace std::chrono; | ||
|
|
||
| // Get duration since the system_clock epoch, in nanoseconds | ||
| auto d = tp.time_since_epoch(); | ||
| auto ns = duration_cast<nanoseconds>(d).count(); | ||
|
|
||
| timespec ts; | ||
| ts.tv_sec = ns / 1'000'000'000; | ||
| ts.tv_nsec = ns % 1'000'000'000; | ||
| return ts; | ||
| } | ||
|
|
||
| class shm_mutex | ||
| { | ||
| pthread_mutex_t handle; | ||
| public: | ||
| using native_handle_type = pthread_mutex_t*; | ||
|
|
||
| shm_mutex() noexcept; | ||
| ~shm_mutex() noexcept; | ||
|
|
||
| // No copy/move | ||
| shm_mutex(const shm_mutex&) = delete; | ||
| shm_mutex& operator=(const shm_mutex&) = delete; | ||
| shm_mutex(shm_mutex&&) = delete; | ||
| shm_mutex& operator=(shm_mutex&&) = delete; | ||
|
|
||
| void lock(); | ||
| bool try_lock(); | ||
| void unlock(); | ||
|
|
||
| native_handle_type native_handle() noexcept { return &handle; } | ||
| }; | ||
|
|
||
| class shm_cond_var | ||
| { | ||
| shm_mutex cs; | ||
| pthread_cond_t handle; | ||
|
|
||
| using clock = std::chrono::high_resolution_clock; | ||
| using duration = clock::duration; | ||
| using time_point = clock::time_point; | ||
| public: | ||
| using native_handle_type = pthread_cond_t*; | ||
| shm_cond_var(); | ||
| ~shm_cond_var() noexcept; | ||
|
|
||
| // No copy/move | ||
| shm_cond_var(const shm_cond_var&) = delete; | ||
| shm_cond_var& operator=(const shm_cond_var&) = delete; | ||
| shm_cond_var(shm_cond_var&&) = delete; | ||
| shm_cond_var& operator=(shm_cond_var&&) = delete; | ||
|
|
||
| void notify_one() | ||
| { | ||
| pthread_cond_signal(&handle); | ||
| } | ||
| void notify_all() | ||
| { | ||
| pthread_cond_broadcast(&handle); | ||
| } | ||
|
|
||
| void wait(std::unique_lock<shm_mutex>& lock) | ||
| { | ||
| cs.lock(); | ||
| lock.unlock(); | ||
| (void)pthread_cond_wait(&handle, cs.native_handle()); | ||
| cs.unlock(); | ||
| lock.lock(); | ||
| } | ||
|
|
||
| std::cv_status wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time) | ||
| { | ||
| auto abs_time = clock::now() + rel_time; | ||
| return wait_until(lock, abs_time); | ||
| } | ||
|
|
||
| template<class Predicate> | ||
| void wait(std::unique_lock<shm_mutex>& mtx, const Predicate& stop_waiting) | ||
| { | ||
| while (!stop_waiting()) | ||
| wait(mtx); | ||
| } | ||
|
|
||
| std::cv_status wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time) | ||
| { | ||
| cs.lock(); | ||
| lock.unlock(); | ||
| timespec abstime = to_timespec(abs_time); | ||
| int err = pthread_cond_timedwait(&handle, cs.native_handle(), &abstime); | ||
| std::cv_status status = (err == 0) ? std::cv_status::no_timeout : std::cv_status::timeout; // TODO: handle other errors | ||
| cs.unlock(); | ||
| lock.lock(); | ||
| return status; | ||
| } | ||
|
|
||
| template<class Predicate> | ||
| bool wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time, | ||
| const Predicate& stop_waiting) | ||
| { | ||
| while (!stop_waiting()) | ||
| if (wait_until(lock, abs_time) == std::cv_status::timeout) | ||
| return stop_waiting(); | ||
| return true; | ||
| } | ||
|
|
||
| template<class Predicate> | ||
| bool wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time, | ||
| const Predicate& stop_waiting) | ||
| { | ||
| auto abs_time = clock::now() + rel_time; | ||
| while (!stop_waiting()) | ||
| if (wait_until(lock, abs_time) == std::cv_status::timeout) | ||
| return stop_waiting(); | ||
| return true; | ||
| } | ||
|
|
||
| native_handle_type native_handle() noexcept { return &handle; } | ||
| }; | ||
|
|
||
| } // namespace rpp | ||
|
|
||
| #endif // __linux__ | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should instead replace with a singular locking strategy pattern
I'm not sure which is more optimal, unique ptr or std::optional, needs to be investigated