Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY
#include <boost/config.hpp> // for BOOST_LIKELY and BOOST_HAS_RVALUE_REFS

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <boost/type_traits/is_copy_constructible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
Expand Down Expand Up @@ -112,6 +113,42 @@ class ringbuffer_base

return true;
}
#ifdef BOOST_HAS_RVALUE_REFS

bool push(T&& t, T * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);

if (next == read_index_.load(memory_order_acquire))
return false; /* ringbuffer is full */

new (buffer + write_index) T(std::move(t)); // move-construct

write_index_.store(next, memory_order_release);

return true;
}


template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(T * buffer, size_t max_size, Args&&... args )
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);

if (next == read_index_.load(memory_order_acquire))
return false; /* ringbuffer is full */

new (buffer + write_index) T(std::forward<Args>(args)...); // emplace
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps new (buffer + write_index) T{std::forward<Args>(args)...};


write_index_.store(next, memory_order_release);

return true;
}

#endif

size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
{
Expand Down Expand Up @@ -450,6 +487,22 @@ class compile_time_sized_ringbuffer:
return ringbuffer_base<T>::push(t, data(), max_size);
}

#ifdef BOOST_HAS_RVALUE_REFS
bool push(T&& t)
{
return ringbuffer_base<T>::push(std::move(t), data(), max_size);
}


template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return ringbuffer_base<T>::emplace(data(), max_size, std::forward<Args>(args)...);
}

#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -562,6 +615,23 @@ class runtime_sized_ringbuffer:
{
return ringbuffer_base<T>::push(t, &*array_, max_elements_);
}
#ifdef BOOST_HAS_RVALUE_REFS

bool push(T&& t)
{
return ringbuffer_base<T>::push(std::move(t), &*array_, max_elements_);
}



template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return ringbuffer_base<T>::emplace(&*array_, max_elements_, std::forward<Args>(args)...);
}

#endif

template <typename Functor>
bool consume_one(Functor & f)
Expand Down Expand Up @@ -761,6 +831,41 @@ class spsc_queue:
return base_type::push(t);
}


#ifdef BOOST_HAS_RVALUE_REFS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want to detect the absence of
BOOST_NO_CXX11_RVALUE_REFERENCES for push, and the absence of both BOOST_NO_CXX11_RVALUE_REFERENCES and BOOST_NO_CXX11_VARIADIC_TEMPLATES for emplace.


/** Pushes object t to the ringbuffer via move construction/
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */

bool push(T&& t)
{
return base_type::push(std::move(t));
}

/** Emplaces an instance of T to the ringbuffer via direct initialization using the given constructor arguments
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */

template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return base_type::emplace(std::forward<Args>(args)...);
}

#endif

/** Pops one object from ringbuffer.
*
* \pre only one thread is allowed to pop data to the spsc_queue
Expand Down