Skip to content
Open
75 changes: 75 additions & 0 deletions include/boost/lockfree/detail/move_payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// boost lockfree: move_payload helper
//
// Copyright (C) 2011 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED
#define BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED

#include <boost/mpl/if.hpp>
#include <boost/type_traits/is_convertible.hpp>

#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4512) // assignment operator could not be generated
#endif

namespace boost {
namespace lockfree {
namespace detail {

struct move_convertible
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = std::move(t);
}
};

struct move_constructible_and_assignable
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = U(std::move(t));
}
};

template <typename T, typename U>
void move_payload(T & t, U & u)
{
typedef typename boost::mpl::if_<typename boost::is_convertible<T, U>::type,
move_convertible,
move_constructible_and_assignable
>::type move_type;
move_type::move(t, u);
}

template <typename T>
struct consume_via_move
{
consume_via_move(T & out):
out_(out)
{}

template <typename U>
void operator()(U & element)
{
move_payload(element, out_);
}

T & out_;
};


}}}

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_DETAIL_COPY_PAYLOAD_HPP_INCLUDED */
129 changes: 107 additions & 22 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,25 @@
#include <algorithm>
#include <memory>

#include <boost/config.hpp> // for BOOST_NO_CXX11_

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>

#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <boost/type_traits/is_copy_constructible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
#include <boost/lockfree/detail/move_payload.hpp>
#include <boost/move/move.hpp>
#endif
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>

Expand Down Expand Up @@ -98,7 +105,26 @@ class ringbuffer_base
return write_available(write_index, read_index, max_size);
}

bool push(T const & t, T * buffer, size_t max_size)
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t, T * buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );

if( next == read_index_.load( memory_order_acquire ) )
return false; /* ringbuffer is full */

new (buffer + write_index) T(boost::move(t)); // move-construct

write_index_.store( next, memory_order_release );

return true;
}
#endif

template<typename U>
typename boost::enable_if<boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T,U>::value>, bool>::type
push(U const & t, U * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);
Expand Down Expand Up @@ -382,21 +408,30 @@ class ringbuffer_base
return write_index == read_index;
}

template< class OutputIterator >
OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )

template<class OutputIterator>
typename boost::enable_if<boost::has_trivial_destructor<T>, OutputIterator>::type
copy_and_delete(T * first, T * last, OutputIterator out)
{
if (boost::has_trivial_destructor<T>::value) {
return std::copy(first, last, out); // will use memcpy if possible
} else {
for (; first != last; ++first, ++out) {
*out = *first;
first->~T();
}
return out;
return std::copy(first, last, out); // will use memcpy if possible
}

template<class OutputIterator>
typename boost::disable_if<boost::has_trivial_destructor<T>, OutputIterator>::type
copy_and_delete(T * first, T * last, OutputIterator out)
{
for (; first != last; ++first, ++out) {
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
*out = boost::move(*first);
#else
*out = *first;
#endif
first->~T();
}
return out;
}

template< class Functor >
template<class Functor>
void run_functor_and_delete( T * first, T * last, Functor & functor )
{
for (; first != last; ++first) {
Expand Down Expand Up @@ -445,11 +480,21 @@ class compile_time_sized_ringbuffer:
}

public:
bool push(T const & t)

template<typename U>
typename boost::enable_if<boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T, U>::value>,bool>::type
push(U const & t)
{
return ringbuffer_base<T>::push(t, data(), max_size);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t)
{
return ringbuffer_base<T>::push(boost::move(t), data(), max_size);
}
#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -558,11 +603,20 @@ class runtime_sized_ringbuffer:
Alloc::deallocate(array_, max_elements_);
}

bool push(T const & t)
template< typename U>
typename boost::enable_if< boost::integral_constant<bool, boost::is_copy_constructible<U>::value && boost::is_same<T,U>::value>, bool >::type
push(U const & t)
{
return ringbuffer_base<T>::push(t, &*array_, max_elements_);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T && t)
{
return ringbuffer_base<T>::push(boost::move(t), &*array_, max_elements_);
}
#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -756,10 +810,26 @@ class spsc_queue:
*
* \note Thread-safe and wait-free
* */
bool push(T const & t)
bool
push(T const & u)
{
return base_type::push(u);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
/** Pushes object t to the ringbuffer.
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */
bool push(T && t)
{
return base_type::push(t);
return base_type::push(boost::move(t));
}
#endif

/** Pops one object from ringbuffer.
*
Expand All @@ -772,25 +842,40 @@ class spsc_queue:
bool pop ()
{
detail::consume_noop consume_functor;
return consume_one( consume_functor );
return consume_one(consume_functor);
}

/** Pops one object from ringbuffer.
/** Pops one object from ringbuffer. If it is move assignable it will be moved,
* otherwise it will be copied.
*
* \pre only one thread is allowed to pop data to the spsc_queue
* \post if ringbuffer is not empty, object will be copied to ret.
* \post if ringbuffer is not empty, object will be assigned to ret.
* \return true, if the pop operation is successful, false if ringbuffer was empty.
*
* \note Thread-safe and wait-free
*/
#ifdef BOOST_NO_CXX11_RVALUE_REFERENCES

template <typename U>
typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
pop (U & ret)
pop(U & ret)
{
detail::consume_via_copy<U> consume_functor(ret);
return consume_one( consume_functor );
return consume_one(consume_functor);
}

#else

template <typename U>
typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
pop(U & ret)
{
detail::consume_via_move<U> consume_functor( ret );
return consume_one(consume_functor);
}

#endif

/** Pushes as many objects from the array t as there is space.
*
* \pre only one thread is allowed to push data to the spsc_queue
Expand Down
32 changes: 31 additions & 1 deletion test/spsc_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <iostream>
#include <memory>

#include "test_helpers.hpp"
#include "test_common.hpp"
#include "test_helpers.hpp"

using namespace boost;
using namespace boost::lockfree;
Expand Down Expand Up @@ -405,3 +405,33 @@ BOOST_AUTO_TEST_CASE( spsc_queue_reset_test )

BOOST_REQUIRE(f.empty());
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES

BOOST_AUTO_TEST_CASE( spsc_queue_unique_ptr_push_pop_test )
{
spsc_queue<std::unique_ptr<int[]>, capacity<64> > f;

BOOST_REQUIRE(f.empty());

unique_ptr<int[]> in;
unique_ptr<int[]> out;

const int fortytwo = 42;

in.reset( new int[1] );
in[0] = fortytwo;
int* data = in.get();

BOOST_REQUIRE( f.push( std::move(in) ) );
BOOST_REQUIRE( f.pop(out) );

BOOST_REQUIRE( out.get() == data );
BOOST_REQUIRE( out[0] == fortytwo );

f.reset();

BOOST_REQUIRE(f.empty());
}

#endif