Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/rpp/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
*/
#pragma once
#include "config.h"
#include "semaphore.h"
#include "mutex.h"
#include "condition_variable.h"
#include <vector>
#include <thread> // std::this_thread::yield()
#include <type_traits> // std::is_trivially_destructible_v
Expand All @@ -22,7 +23,7 @@ namespace rpp
* @note This is not optimized for speed, but has acceptable performance
* and due to its simplicity it won't randomly deadlock on you.
*/
template<class T>
template<class T, class mutex_t = rpp::mutex, class cond_var_t = rpp::condition_variable>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should instead replace with a singular locking strategy pattern

template<class T, class sync_t = rpp::mutex_sync>
class concurrent_queue
{
    using mutex_t = typename sync_t::mutex_t;
    uinsg cond_var_t = typename sync_t::cond_var_t;
    using lock_t = std::unique_lock<mutex_t>;

    std::unique_ptr<sync_t> DefaultSync;
    sync_t& Sync;

public:
    concurrent_queue()
        : DefaultSync{std::make_unique<sync_t>())
        , Sync{*DefaultSync}
    {}

    concurrent_queue(sync_t& sync) noexcept
        : Sync{sync}
    {}
};

I'm not sure which is more optimal, unique ptr or std::optional, needs to be investigated

class concurrent_queue
{
// for faster performance the Head and Tail are always within the linear range
Expand All @@ -38,15 +39,25 @@ namespace rpp
using time_point = clock::time_point;

private:
using mutex_t = rpp::mutex;
using lock_t = rpp::semaphore::lock_t;
mutable mutex_t Mutex;
mutable rpp::condition_variable Waiter;
using lock_t = std::unique_lock<mutex_t>;
std::optional<mutex_t> MutexStorage;
std::optional<cond_var_t> WaiterStorage;
mutex_t& Mutex;
cond_var_t& Waiter;
// special state flag for all waiters to immediately exit the queue
std::atomic_bool Cleared = false;

public:
concurrent_queue() = default;
concurrent_queue()
: Mutex{MutexStorage.emplace()}
, Waiter{WaiterStorage.emplace()}
{}

concurrent_queue(mutex_t& mutex, cond_var_t& waiter)
: Mutex{mutex}
, Waiter{waiter}
{}

~concurrent_queue() noexcept
{
// safely lock, clear and notify all waiters to give up
Expand All @@ -59,6 +70,8 @@ namespace rpp
concurrent_queue& operator=(const concurrent_queue&) = delete;

concurrent_queue(concurrent_queue&& q) noexcept
: Mutex{MutexStorage.emplace()}
, Waiter{WaiterStorage.emplace()}
{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should copy the syncing properly depending on which strategy is used

        concurrent_queue(concurrent_queue&& q) noexcept
            : DefaultSync{} // ensure DefaultSync is constructed before emplace()
            , Sync{q.DefaultSync ? this->DefaultSync.emplace() : q.Sync}
        {

// safely swap the states from q to default empty state
std::lock_guard lock { q.mutex() };
Expand Down
80 changes: 80 additions & 0 deletions src/rpp/shm_mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#if __linux__
#include "shm_mutex.h"
#include "debugging.h"
#include <condition_variable>

// TODO: throw instead of logging errors
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, throwing is also problematic. We have a lot of code that we don't want to crash because of somewhat optional mutexes. If the mutex really fails we can just log it and return.


namespace rpp {
shm_mutex::shm_mutex() noexcept
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
if (int err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0)
{
LogError("shm_mutex: pthread_mutexattr_setpshared failed: error code %d", err);
return;
}
pthread_mutex_init(&handle, &attr);
}

shm_mutex::~shm_mutex() noexcept
{
if (int err = pthread_mutex_destroy(&handle); err != 0)
{
LogError("shm_mutex: pthread_mutex_destroy failed: error code %d", err);
}
}

void shm_mutex::lock()
{
if (int err = pthread_mutex_lock(&handle); err != 0)
{
LogError("shm_mutex: pthread_mutex_lock failed: error code %d", err);
}
}

bool shm_mutex::try_lock()
{
int err = pthread_mutex_trylock(&handle);
if (err != 0 && err != EBUSY)
{
LogError("shm_mutex: pthread_mutex_trylock failed: error code %d", err);
return false;
}
return err == 0;
}

void shm_mutex::unlock()
{
if (int err = pthread_mutex_unlock(&handle); err != 0)
{
LogError("shm_mutex: pthread_mutex_unlock failed: error code %d", err);
}
}

// -------------------------------------------------------

shm_cond_var::shm_cond_var()
{
pthread_condattr_t attr;
pthread_condattr_init(&attr);
if (int err = pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); err != 0)
{
LogError("shm_mutex: pthread_mutexattr_setpshared failed: error code %d", err);
return;
}
pthread_cond_init(&handle, &attr);
}

shm_cond_var::~shm_cond_var() noexcept
{
if (int err = pthread_cond_destroy(&handle); err != 0)
{
LogError("shm_mutex: pthread_mutex_destroy failed: error code %d", err);
}
}

} // namespace rpp

#endif // __linux__
135 changes: 135 additions & 0 deletions src/rpp/shm_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#pragma once
#if __linux__
#include <pthread.h>
#include <chrono>
#include <mutex>
#include <condition_variable>

namespace rpp {

template<class Clock, class Duration>
inline timespec to_timespec(const std::chrono::time_point<Clock, Duration>& tp)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should move away from std::chrono tbh, it's slow and worse than rpp::TimePoint by now

{
using namespace std::chrono;

// Get duration since the system_clock epoch, in nanoseconds
auto d = tp.time_since_epoch();
auto ns = duration_cast<nanoseconds>(d).count();

timespec ts;
ts.tv_sec = ns / 1'000'000'000;
ts.tv_nsec = ns % 1'000'000'000;
return ts;
}

class shm_mutex
{
pthread_mutex_t handle;
public:
using native_handle_type = pthread_mutex_t*;

shm_mutex() noexcept;
~shm_mutex() noexcept;

// No copy/move
shm_mutex(const shm_mutex&) = delete;
shm_mutex& operator=(const shm_mutex&) = delete;
shm_mutex(shm_mutex&&) = delete;
shm_mutex& operator=(shm_mutex&&) = delete;

void lock();
bool try_lock();
void unlock();

native_handle_type native_handle() noexcept { return &handle; }
};

class shm_cond_var
{
shm_mutex cs;
pthread_cond_t handle;

using clock = std::chrono::high_resolution_clock;
using duration = clock::duration;
using time_point = clock::time_point;
public:
using native_handle_type = pthread_cond_t*;
shm_cond_var();
~shm_cond_var() noexcept;

// No copy/move
shm_cond_var(const shm_cond_var&) = delete;
shm_cond_var& operator=(const shm_cond_var&) = delete;
shm_cond_var(shm_cond_var&&) = delete;
shm_cond_var& operator=(shm_cond_var&&) = delete;

void notify_one()
{
pthread_cond_signal(&handle);
}
void notify_all()
{
pthread_cond_broadcast(&handle);
}

void wait(std::unique_lock<shm_mutex>& lock)
{
cs.lock();
lock.unlock();
(void)pthread_cond_wait(&handle, cs.native_handle());
cs.unlock();
lock.lock();
}

std::cv_status wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time)
{
auto abs_time = clock::now() + rel_time;
return wait_until(lock, abs_time);
}

template<class Predicate>
void wait(std::unique_lock<shm_mutex>& mtx, const Predicate& stop_waiting)
{
while (!stop_waiting())
wait(mtx);
}

std::cv_status wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time)
{
cs.lock();
lock.unlock();
timespec abstime = to_timespec(abs_time);
int err = pthread_cond_timedwait(&handle, cs.native_handle(), &abstime);
std::cv_status status = (err == 0) ? std::cv_status::no_timeout : std::cv_status::timeout; // TODO: handle other errors
cs.unlock();
lock.lock();
return status;
}

template<class Predicate>
bool wait_until(std::unique_lock<shm_mutex>& lock, const time_point& abs_time,
const Predicate& stop_waiting)
{
while (!stop_waiting())
if (wait_until(lock, abs_time) == std::cv_status::timeout)
return stop_waiting();
return true;
}

template<class Predicate>
bool wait_for(std::unique_lock<shm_mutex>& lock, const duration& rel_time,
const Predicate& stop_waiting)
{
auto abs_time = clock::now() + rel_time;
while (!stop_waiting())
if (wait_until(lock, abs_time) == std::cv_status::timeout)
return stop_waiting();
return true;
}

native_handle_type native_handle() noexcept { return &handle; }
};

} // namespace rpp

#endif // __linux__
Loading