Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 3 additions & 24 deletions include/boost/lockfree/detail/freelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,6 @@ class alignas( cacheline_bytes ) freelist_stack : Alloc
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( const ArgumentType& arg )
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
new ( node ) T( arg );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
Expand All @@ -108,7 +99,7 @@ class alignas( cacheline_bytes ) freelist_stack : Alloc
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
new ( node ) T( arg1, arg2 );
new ( node ) T( std::forward< ArgumentType1 >( arg1 ), std::forward< ArgumentType2 >( arg2 ) );
return node;
}

Expand Down Expand Up @@ -454,18 +445,6 @@ class fixed_size_freelist : NodeStorage
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( const ArgumentType& arg )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
return NULL;

T* node = NodeStorage::nodes() + node_index;
new ( node ) T( arg );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
Expand All @@ -479,14 +458,14 @@ class fixed_size_freelist : NodeStorage
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
T* construct( const ArgumentType1& arg1, const ArgumentType2& arg2 )
T* construct( ArgumentType1&& arg1, ArgumentType2&& arg2 )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
return NULL;

T* node = NodeStorage::nodes() + node_index;
new ( node ) T( arg1, arg2 );
new ( node ) T( std::forward< ArgumentType1 >( arg1 ), std::forward< ArgumentType2 >( arg2 ) );
return node;
}

Expand Down
39 changes: 30 additions & 9 deletions include/boost/lockfree/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,11 @@ class queue

bool do_push_node( node* n )
{
handle_type node_handle = pool.get_handle( n );

if ( n == NULL )
return false;

handle_type node_handle = pool.get_handle( n );

for ( ;; ) {
tagged_node_handle tail = tail_.load( memory_order_acquire );
node* tail_node = pool.get_pointer( tail );
Expand Down Expand Up @@ -393,27 +393,48 @@ class queue
* \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node
* will be allocated from the OS. This may not be lock-free. \throws if memory allocator throws
* */
bool unsynchronized_push( const T& t )
{
return unsynchronized_push_impl( t );
}

/// \copydoc boost::lockfree::queue::unsynchronized_push(const T& t)
bool unsynchronized_push( T&& t )
{
node* n = pool.template construct< false, false >( std::forward< T >( t ), pool.null_handle() );
return unsynchronized_push_impl( std::forward< T >( t ) );
}

private:
#ifndef BOOST_DOXYGEN_INVOKED
template < typename U >
bool unsynchronized_push_impl( U&& t )
{
node* n = pool.template construct< false, false >( std::forward< U >( t ), pool.null_handle() );

if ( n == NULL )
return false;

handle_type node_handle = pool.get_handle( n );

for ( ;; ) {
tagged_node_handle tail = tail_.load( memory_order_relaxed );
tagged_node_handle next = tail->next.load( memory_order_relaxed );
node* next_ptr = next.get_ptr();
tagged_node_handle tail = tail_.load( memory_order_relaxed );
node* tail_node = pool.get_pointer( tail );
tagged_node_handle next = tail_node->next.load( memory_order_relaxed );
node* next_ptr = pool.get_pointer( next );

if ( next_ptr == 0 ) {
tail->next.store( tagged_node_handle( n, next.get_next_tag() ), memory_order_relaxed );
tail_.store( tagged_node_handle( n, tail.get_next_tag() ), memory_order_relaxed );
tail_node->next.store( tagged_node_handle( node_handle, next.get_next_tag() ), memory_order_relaxed );
tail_.store( tagged_node_handle( node_handle, tail.get_next_tag() ), memory_order_relaxed );
return true;
} else
tail_.store( tagged_node_handle( next_ptr, tail.get_next_tag() ), memory_order_relaxed );
tail_.store( tagged_node_handle( pool.get_handle( next_ptr ), tail.get_next_tag() ),
memory_order_relaxed );
}
}

#endif
public:

/** Pops object from queue.
*
* \post if pop operation is successful, object will be copied to ret.
Expand Down
2 changes: 1 addition & 1 deletion include/boost/lockfree/stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class stack
tagged_node_handle old_tos = tos.load( detail::memory_order_relaxed );
node* old_tos_pointer = pool.get_pointer( old_tos );

if ( !pool.get_pointer( old_tos ) )
if ( !old_tos_pointer )
return false;

node* new_tos_ptr = pool.get_pointer( old_tos_pointer->next );
Expand Down
146 changes: 146 additions & 0 deletions test/queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,150 @@ BOOST_AUTO_TEST_CASE( queue_uses_optional )
BOOST_TEST_REQUIRE( pop_to_optional );
}

BOOST_AUTO_TEST_CASE( queue_uses_optional_capacity )
{
boost::lockfree::queue< int, boost::lockfree::capacity< 64 > > stk;

bool pop_to_nullopt = stk.pop( boost::lockfree::uses_optional ) == std::nullopt;
BOOST_TEST_REQUIRE( pop_to_nullopt );

stk.push( 53 );
bool pop_to_optional = stk.pop( boost::lockfree::uses_optional ) == 53;
BOOST_TEST_REQUIRE( pop_to_optional );
}

#endif

BOOST_AUTO_TEST_CASE( fixed_size_queue_test_exhausted )
{
queue< int, capacity< 2 > > f;

BOOST_TEST_REQUIRE( f.push( 1 ) );
BOOST_TEST_REQUIRE( f.push( 2 ) );
BOOST_TEST_REQUIRE( !f.push( 3 ) );

int out;
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == 1 );
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == 2 );
BOOST_TEST_REQUIRE( !f.pop( out ) );
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( bounded_queue_test_exhausted )
{
queue< int > f( 2 );

BOOST_TEST_REQUIRE( f.bounded_push( 1 ) );
BOOST_TEST_REQUIRE( f.bounded_push( 2 ) );
BOOST_TEST_REQUIRE( !f.bounded_push( 3 ) );

int out;
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == 1 );
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == 2 );
BOOST_TEST_REQUIRE( !f.pop( out ) );
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( queue_unsynchronized_push_const_ref )
{
queue< int > f( 64 );

BOOST_TEST_REQUIRE( f.empty() );

const int a = 42;
const int b = 43;

f.unsynchronized_push( a );
f.unsynchronized_push( b );

int i1( 0 ), i2( 0 );
BOOST_TEST_REQUIRE( f.unsynchronized_pop( i1 ) );
BOOST_TEST_REQUIRE( i1 == 42 );
BOOST_TEST_REQUIRE( f.unsynchronized_pop( i2 ) );
BOOST_TEST_REQUIRE( i2 == 43 );
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( queue_consume_one_capacity_test )
{
queue< int, capacity< 64 > > f;

BOOST_TEST_REQUIRE( f.empty() );

f.push( 10 );
f.push( 20 );

bool success1 = f.consume_one( []( int i ) {
BOOST_TEST_REQUIRE( i == 10 );
} );

bool success2 = f.consume_one( []( int i ) {
BOOST_TEST_REQUIRE( i == 20 );
} );

BOOST_TEST_REQUIRE( success1 );
BOOST_TEST_REQUIRE( success2 );
BOOST_TEST_REQUIRE( !f.consume_one( []( int ) {} ) );
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( queue_consume_all_capacity_test )
{
queue< int, capacity< 64 > > f;

BOOST_TEST_REQUIRE( f.empty() );

f.push( 1 );
f.push( 2 );
f.push( 3 );

size_t consumed = f.consume_all( []( int ) {} );

BOOST_TEST_REQUIRE( consumed == 3u );
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( queue_empty_pop_test )
{
queue< int > f( 64 );

int out = 0xDEAD;
BOOST_TEST_REQUIRE( !f.pop( out ) );
BOOST_TEST_REQUIRE( !f.unsynchronized_pop( out ) );
BOOST_TEST_REQUIRE( !f.consume_one( []( int ) {} ) );
BOOST_TEST_REQUIRE( f.consume_all( []( int ) {} ) == 0u );
}

BOOST_AUTO_TEST_CASE( queue_push_pop_many )
{
queue< int > f( 64 );

for ( int i = 0; i < 100; ++i )
BOOST_TEST_REQUIRE( f.push( i ) );

for ( int i = 0; i < 100; ++i ) {
int out;
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == i );
}
BOOST_TEST_REQUIRE( f.empty() );
}

BOOST_AUTO_TEST_CASE( queue_push_pop_many_capacity )
{
queue< int, capacity< 128 > > f;

for ( int i = 0; i < 100; ++i )
BOOST_TEST_REQUIRE( f.push( i ) );

for ( int i = 0; i < 100; ++i ) {
int out;
BOOST_TEST_REQUIRE( f.pop( out ) );
BOOST_TEST_REQUIRE( out == i );
}
BOOST_TEST_REQUIRE( f.empty() );
}
18 changes: 18 additions & 0 deletions test/queue_unbounded_stress_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,21 @@ BOOST_AUTO_TEST_CASE( queue_test_unbounded )
boost::lockfree::queue< long > q( 128 );
tester->run( q );
}

BOOST_AUTO_TEST_CASE( queue_test_unbounded_asymmetric_many_writers )
{
typedef queue_stress_tester< false > tester_type;
std::unique_ptr< tester_type > tester( new tester_type( 1, 4 ) );

boost::lockfree::queue< long > q( 128 );
tester->run( q );
}

BOOST_AUTO_TEST_CASE( queue_test_unbounded_asymmetric_many_readers )
{
typedef queue_stress_tester< false > tester_type;
std::unique_ptr< tester_type > tester( new tester_type( 4, 1 ) );

boost::lockfree::queue< long > q( 128 );
tester->run( q );
}
Loading