Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions include/boost/lockfree/detail/move_payload.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// boost lockfree: move_payload helper
//
// Copyright (C) 2011 Tim Blechmann
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED
#define BOOST_LOCKFREE_DETAIL_MOVE_PAYLOAD_HPP_INCLUDED

#include <boost/mpl/if.hpp>
#include <boost/type_traits/is_convertible.hpp>

#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4512) // assignment operator could not be generated
#endif

namespace boost {
namespace lockfree {
namespace detail {

struct move_convertible
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = std::move(t);
}
};

struct move_constructible_and_assignable
{
template <typename T, typename U>
static void move(T & t, U & u)
{
u = U(std::move(t));
}
};

template <typename T, typename U>
void move_payload(T & t, U & u)
{
typedef typename boost::mpl::if_<typename boost::is_convertible<T, U>::type,
move_convertible,
move_constructible_and_assignable
>::type move_type;
move_type::move(t, u);
}

template <typename T>
struct consume_via_move
{
consume_via_move(T & out):
out_(out)
{}

template <typename U>
void operator()(U & element)
{
move_payload(element, out_);
}

T & out_;
};


}}}

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_DETAIL_COPY_PAYLOAD_HPP_INCLUDED */
100 changes: 82 additions & 18 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@

#include <algorithm>
#include <memory>
#include <type_traits>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/move_payload.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>

Expand Down Expand Up @@ -98,7 +100,24 @@ class ringbuffer_base
return write_available(write_index, read_index, max_size);
}

bool push(T const & t, T * buffer, size_t max_size)
bool push( T && t, T * buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );

if( next == read_index_.load( memory_order_acquire ) )
return false; /* ringbuffer is full */

new (buffer + write_index) T(std::move(t)); // move-construct

write_index_.store( next, memory_order_release );

return true;
}

template<typename T>
typename std::enable_if< std::is_copy_constructible<T>::value, bool >::type
push(T const & t, T * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);
Expand Down Expand Up @@ -382,18 +401,23 @@ class ringbuffer_base
return write_index == read_index;
}

template< class OutputIterator >
OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
template< class OutputIterator>
typename std::enable_if< boost::has_trivial_destructor<T>::value, OutputIterator >::type
copy_and_delete( T * first, T * last, OutputIterator out )
{
if (boost::has_trivial_destructor<T>::value) {
return std::copy(first, last, out); // will use memcpy if possible
} else {
for (; first != last; ++first, ++out) {
*out = *first;
first->~T();
}
return out;
return std::copy(first, last, out); // will use memcpy if possible
}


template<class OutputIterator>
typename std::enable_if< ! boost::has_trivial_destructor<T>::value, OutputIterator >::type
copy_and_delete( T * first, T * last, OutputIterator out )
{
for (; first != last; ++first, ++out) {
*out = std::move(*first);
first->~T();
}
return out;
}

template< class Functor >
Expand Down Expand Up @@ -445,11 +469,18 @@ class compile_time_sized_ringbuffer:
}

public:

template< typename = std::enable_if< std::is_copy_constructible<T>::value, bool >::type >
bool push(T const & t)
{
return ringbuffer_base<T>::push(t, data(), max_size);
}

bool push(T&& t)
{
return ringbuffer_base<T>::push(std::move(t), data(), max_size);
}

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -558,11 +589,18 @@ class runtime_sized_ringbuffer:
Alloc::deallocate(array_, max_elements_);
}

bool push(T const & t)
template< typename = std::enable_if< std::is_copy_constructible<T>::value, bool >::type >
bool
push(T const & t)
{
return ringbuffer_base<T>::push(t, &*array_, max_elements_);
}

bool push(T&& t)
{
return ringbuffer_base<T>::push(std::move(t), &*array_, max_elements_);
}

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -756,11 +794,25 @@ class spsc_queue:
*
* \note Thread-safe and wait-free
* */
bool push(T const & t)
template< typename = std::enable_if< std::is_copy_constructible<T>::value, bool >::type >
bool push(T const & t)
{
return base_type::push(t);
}

/** Pushes object t to the ringbuffer.
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */
bool push( T && t )
{
return base_type::push( std::move( t ) );
}

/** Pops one object from ringbuffer.
*
* \pre only one thread is allowed to pop data to the spsc_queue
Expand All @@ -775,22 +827,34 @@ class spsc_queue:
return consume_one( consume_functor );
}

/** Pops one object from ringbuffer.
/** Pops one object from ringbuffer. If it is move assignable it will be moved,
* otherwise it will be copied.
*
* \pre only one thread is allowed to pop data to the spsc_queue
* \post if ringbuffer is not empty, object will be copied to ret.
* \post if ringbuffer is not empty, object will be assigned to ret.
* \return true, if the pop operation is successful, false if ringbuffer was empty.
*
* \note Thread-safe and wait-free
*/
template <typename U>
typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
pop (U & ret)
typename boost::enable_if<typename integral_constant<bool,
is_convertible<T, U>::value && !std::is_move_assignable<T>::value>::type, bool>::type
pop ( U & ret)
{
detail::consume_via_copy<U> consume_functor(ret);
return consume_one( consume_functor );
}

template <typename U>
typename boost::enable_if<typename integral_constant<bool,
is_convertible<T, U>::value && std::is_move_assignable<T>::value>::type, bool>::type
pop ( U & ret )
{
detail::consume_via_move<U> consume_functor( ret );
return consume_one( consume_functor );
}


/** Pushes as many objects from the array t as there is space.
*
* \pre only one thread is allowed to push data to the spsc_queue
Expand Down
30 changes: 29 additions & 1 deletion test/spsc_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include <iostream>
#include <memory>

#include "test_helpers.hpp"
#include "test_common.hpp"
#include "test_helpers.hpp"

using namespace boost;
using namespace boost::lockfree;
Expand Down Expand Up @@ -405,3 +405,31 @@ BOOST_AUTO_TEST_CASE( spsc_queue_reset_test )

BOOST_REQUIRE(f.empty());
}


BOOST_AUTO_TEST_CASE( spsc_queue_unique_ptr_push_pop_test )
{
spsc_queue<std::unique_ptr<int[]>, capacity<64> > f;

BOOST_REQUIRE(f.empty());

unique_ptr<int[]> in;
unique_ptr<int[]> out;

const int fortytwo = 42;

in.reset( new int[1] );
in[0] = fortytwo;
int* data = in.get();

BOOST_REQUIRE( f.push( std::move(in) ) );
BOOST_REQUIRE( f.pop(out) );

BOOST_REQUIRE( out.get() == data );
BOOST_REQUIRE( out[0] == fortytwo );

f.reset();

BOOST_REQUIRE(f.empty());
}