diff --git a/include/boost/lockfree/spsc_queue.hpp b/include/boost/lockfree/spsc_queue.hpp index 5ecfb2a..ad507ef 100644 --- a/include/boost/lockfree/spsc_queue.hpp +++ b/include/boost/lockfree/spsc_queue.hpp @@ -18,10 +18,11 @@ #include #include #include -#include // for BOOST_LIKELY +#include // for BOOST_LIKELY and BOOST_HAS_RVALUE_REFS #include #include +#include #include #include @@ -112,6 +113,42 @@ class ringbuffer_base return true; } +#ifdef BOOST_HAS_RVALUE_REFS + + bool push(T&& t, T * buffer, size_t max_size) + { + const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread + const size_t next = next_index(write_index, max_size); + + if (next == read_index_.load(memory_order_acquire)) + return false; /* ringbuffer is full */ + + new (buffer + write_index) T(std::move(t)); // move-construct + + write_index_.store(next, memory_order_release); + + return true; + } + + + template + typename boost::enable_if< typename boost::is_constructible::type, bool>::type + emplace(T * buffer, size_t max_size, Args&&... args ) + { + const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread + const size_t next = next_index(write_index, max_size); + + if (next == read_index_.load(memory_order_acquire)) + return false; /* ringbuffer is full */ + + new (buffer + write_index) T(std::forward(args)...); // emplace + + write_index_.store(next, memory_order_release); + + return true; + } + +#endif size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) { @@ -450,6 +487,22 @@ class compile_time_sized_ringbuffer: return ringbuffer_base::push(t, data(), max_size); } +#ifdef BOOST_HAS_RVALUE_REFS + bool push(T&& t) + { + return ringbuffer_base::push(std::move(t), data(), max_size); + } + + + template + typename boost::enable_if< typename boost::is_constructible::type, bool>::type + emplace(Args&&... args) + { + return ringbuffer_base::emplace(data(), max_size, std::forward(args)...); + } + +#endif + template bool consume_one(Functor & f) { @@ -562,6 +615,23 @@ class runtime_sized_ringbuffer: { return ringbuffer_base::push(t, &*array_, max_elements_); } +#ifdef BOOST_HAS_RVALUE_REFS + + bool push(T&& t) + { + return ringbuffer_base::push(std::move(t), &*array_, max_elements_); + } + + + + template + typename boost::enable_if< typename boost::is_constructible::type, bool>::type + emplace(Args&&... args) + { + return ringbuffer_base::emplace(&*array_, max_elements_, std::forward(args)...); + } + +#endif template bool consume_one(Functor & f) @@ -761,6 +831,41 @@ class spsc_queue: return base_type::push(t); } + +#ifdef BOOST_HAS_RVALUE_REFS + + /** Pushes object t to the ringbuffer via move construction/ + * + * \pre only one thread is allowed to push data to the spsc_queue + * \post object will be pushed to the spsc_queue, unless it is full. + * \return true, if the push operation is successful. + * + * \note Thread-safe and wait-free + * */ + + bool push(T&& t) + { + return base_type::push(std::move(t)); + } + + /** Emplaces an instance of T to the ringbuffer via direct initialization using the given constructor arguments + * + * \pre only one thread is allowed to push data to the spsc_queue + * \post object will be pushed to the spsc_queue, unless it is full. + * \return true, if the push operation is successful. + * + * \note Thread-safe and wait-free + * */ + + template + typename boost::enable_if< typename boost::is_constructible::type, bool>::type + emplace(Args&&... args) + { + return base_type::emplace(std::forward(args)...); + } + +#endif + /** Pops one object from ringbuffer. * * \pre only one thread is allowed to pop data to the spsc_queue