Concurrent Queue

Boost.Fiber provides a bounded, concurrent queue (MPMC, multi-producer/multi-consumer) suitable for synchronizing fibers via message passing between threads. The capacity of the internal ring buffer determines whether an enqueue or dequeue operation will be lock-free. If the internal buffer has space left, i.e. the queue is not full, the enqueue operation is lock-free for the producer. As long as the queue is not empty, a dequeue operation is guaranteed to be lock-free for the consumer.

typedef boost::fibers::concurrent_queue< int > queue_t;

void send( queue_t & queue) {
    for ( int i = 0; i < 5; ++i) {
        queue.push( i);
    }
    queue.close();
}

void recv( queue_t & queue) {
    int i;
    while ( boost::fibers::queue_op_status::success == queue.pop(i) ) {
        std::cout << "received " << i << std::endl;
    }
}

queue_t queue{ 1 };
boost::fibers::fiber f1( std::bind( send, std::ref( queue) ) );
boost::fibers::fiber f2( std::bind( recv, std::ref( queue) ) );

f1.join();
f2.join();
Enumeration queue_op_status

Queue operations return the state of the queue.

enum class queue_op_status {
    success,
    empty,
    full,
    closed,
    timeout
};
success

Effects:

Operation was successful.

empty

Effects:

The queue is empty; the operation failed.

full

Effects:

The queue is full; the operation failed.

closed

Effects:

The queue is closed; the operation failed.

timeout

Effects:

The operation did not become ready before specified timeout elapsed.
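The five status values can be handled with an ordinary switch. The following is a minimal sketch, not part of the library: it declares a local mirror of the enumeration so that it compiles on its own, and the helpers to_string() and is_retryable() are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <string>

// Local mirror of the documented enumeration so the sketch is self-contained;
// real code would use boost::fibers::queue_op_status instead.
enum class queue_op_status { success, empty, full, closed, timeout };

// Hypothetical helper: readable name of a status, e.g. for logging.
inline std::string to_string( queue_op_status s) {
    switch ( s) {
    case queue_op_status::success: return "success";
    case queue_op_status::empty:   return "empty";
    case queue_op_status::full:    return "full";
    case queue_op_status::closed:  return "closed";
    case queue_op_status::timeout: return "timeout";
    }
    return "unknown";
}

// Hypothetical policy: empty, full and timeout may succeed on a later
// attempt, while closed is final and success needs no retry.
inline bool is_retryable( queue_op_status s) {
    return queue_op_status::empty == s
        || queue_op_status::full == s
        || queue_op_status::timeout == s;
}
```

Treating closed as final matches the description of close(): once the queue is closed, no further values can arrive.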

Template concurrent_queue<>

#include <boost/fiber/concurrent_queue.hpp>

namespace boost {
namespace fibers {

template< typename T >
class concurrent_queue {
public:
    typedef T   value_type;

    explicit concurrent_queue( std::size_t capacity);

    concurrent_queue( concurrent_queue const& other) = delete;
    concurrent_queue & operator=( concurrent_queue const& other) = delete;

    void close() noexcept;

    queue_op_status push( value_type const& va);
    queue_op_status push( value_type && va);
    template< typename Rep, typename Period >
    queue_op_status push_wait_for(
        value_type const& va,
        std::chrono::duration< Rep, Period > const& timeout_duration);
    template< typename Rep, typename Period >
    queue_op_status push_wait_for(
        value_type && va,
        std::chrono::duration< Rep, Period > const& timeout_duration);
    template< typename Clock, typename Duration >
    queue_op_status push_wait_until(
        value_type const& va,
        std::chrono::time_point< Clock, Duration > const& timeout_time);
    template< typename Clock, typename Duration >
    queue_op_status push_wait_until(
        value_type && va,
        std::chrono::time_point< Clock, Duration > const& timeout_time);
    queue_op_status try_push( value_type const& va);
    queue_op_status try_push( value_type && va);

    queue_op_status pop( value_type & va);
    value_type value_pop();
    template< typename Rep, typename Period >
    queue_op_status pop_wait_for(
        value_type & va,
        std::chrono::duration< Rep, Period > const& timeout_duration);
    template< typename Clock, typename Duration >
    queue_op_status pop_wait_until(
        value_type & va,
        std::chrono::time_point< Clock, Duration > const& timeout_time);
    queue_op_status try_pop( value_type & va);
};

}}
Constructor
explicit concurrent_queue( std::size_t capacity);

Preconditions:

0 < capacity

Effects:

The constructor constructs an object of class concurrent_queue with an internal buffer of size capacity.

Throws:

fiber_error

Error Conditions:

invalid_argument: if capacity is 0.

Notes:

push(), push_wait_for() and push_wait_until() do not block while the queue holds fewer than capacity values; they can block only once the number of queued values equals capacity.
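The role of capacity can be illustrated with a small model. This is a sketch under stated assumptions, not the Boost.Fiber implementation: ring_model is a hypothetical single-threaded ring buffer, it throws std::invalid_argument where the real constructor is documented to throw fiber_error, and it assumes that try_push() reports full on a full buffer and try_pop() reports empty on an empty one.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Local mirror of the documented enumeration so the sketch is self-contained.
enum class queue_op_status { success, empty, full, closed, timeout };

// Hypothetical single-threaded stand-in, NOT the Boost.Fiber implementation.
template< typename T >
class ring_model {
    std::vector< T > slots_;      // fixed-size storage of `capacity` slots
    std::size_t      head_ = 0;   // index of the oldest value
    std::size_t      size_ = 0;   // number of stored values

public:
    explicit ring_model( std::size_t capacity) : slots_( capacity) {
        // Stand-in for the documented fiber_error (invalid_argument).
        if ( 0 == capacity) throw std::invalid_argument("capacity must be > 0");
    }

    queue_op_status try_push( T const& va) {
        if ( size_ == slots_.size() ) return queue_op_status::full;
        slots_[( head_ + size_) % slots_.size()] = va;
        ++size_;
        return queue_op_status::success;
    }

    queue_op_status try_pop( T & va) {
        if ( 0 == size_) return queue_op_status::empty;
        va = slots_[head_];
        head_ = ( head_ + 1) % slots_.size();
        --size_;
        return queue_op_status::success;
    }
};

// Counts how many values fit before the buffer reports full.
inline std::size_t pushes_before_full( std::size_t capacity) {
    ring_model< int > queue( capacity);
    std::size_t n = 0;
    while ( queue_op_status::success == queue.try_push( static_cast< int >( n) ) ) {
        ++n;
    }
    return n;
}
```

In this model exactly capacity values are accepted before the buffer is full, which is the point at which a blocking push() on the real queue would start to wait.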

Member function close()

void close() noexcept;

Effects:

Deactivates the queue. No values can be enqueued after calling this->close(). Fibers blocked in this->pop(), this->pop_wait_for() or this->pop_wait_until() return closed. Fibers blocked in this->value_pop() receive an exception.

Throws:

Nothing.

Note:

close() is like closing a pipe. It informs waiting consumers that no more values will arrive.
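The pipe analogy suggests the usual consumer pattern: keep popping until the status is no longer success. The sketch below is a hypothetical single-threaded model (closable_model and drain() are illustrative names, not library API). It assumes that values enqueued before close() can still be dequeued and that closed is reported only once the queue is empty. Where the real pop() would suspend the fiber on an empty, open queue, the model returns empty instead.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Local mirror of the documented enumeration so the sketch is self-contained.
enum class queue_op_status { success, empty, full, closed, timeout };

// Hypothetical single-threaded stand-in, NOT the Boost.Fiber implementation.
template< typename T >
class closable_model {
    std::deque< T > items_;
    bool            closed_ = false;

public:
    typedef T value_type;

    void close() noexcept { closed_ = true; }

    queue_op_status push( T const& va) {
        // Documented behaviour: no values can be enqueued after close().
        if ( closed_) return queue_op_status::closed;
        items_.push_back( va);
        return queue_op_status::success;
    }

    queue_op_status pop( T & va) {
        if ( ! items_.empty() ) {
            // Assumption: pending values are still delivered after close().
            va = items_.front();
            items_.pop_front();
            return queue_op_status::success;
        }
        // The real pop() would suspend here while the queue is open.
        return closed_ ? queue_op_status::closed : queue_op_status::empty;
    }
};

// Collects every value until pop() stops reporting success.
template< typename Queue >
std::vector< typename Queue::value_type > drain( Queue & queue) {
    std::vector< typename Queue::value_type > out;
    typename Queue::value_type va;
    while ( queue_op_status::success == queue.pop( va) ) {
        out.push_back( va);
    }
    return out;
}
```

With a producer that pushes its values and then calls close(), drain() returns all pushed values in order and then stops, mirroring the send()/recv() pair in the introductory example.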

Member function push()

queue_op_status push( value_type const& va);
queue_op_status push( value_type && va);

Effects:

If the queue is closed, returns closed. Otherwise enqueues the value, wakes up a fiber blocked in this->pop(), this->value_pop(), this->pop_wait_for() or this->pop_wait_until(), and returns success.

Throws:

Exceptions thrown by copy- or move-operations.

Member function pop()

queue_op_status pop( value_type & va);

Effects:

Dequeues a value from the queue. If the queue is empty, the fiber gets suspended until at least one new item is push()ed (return value success and va contains dequeued value) or the queue gets close()d (return value closed).

Throws:

Exceptions thrown by copy- or move-operations.

Member function value_pop()

value_type value_pop();

Effects:

Dequeues a value from the queue. If the queue is empty, the fiber gets suspended until at least one new item is push()ed or the queue gets close()d (which throws an exception).

Throws:

fiber_error if *this is closed; exceptions thrown by copy- or move-operations.

Error conditions:

std::errc::operation_not_permitted
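Because value_pop() signals a closed queue with an exception rather than a status, the consuming loop ends in a catch block. The following is a sketch under assumptions: throwing_model is a hypothetical single-threaded stand-in that throws std::system_error carrying std::errc::operation_not_permitted, whereas the real queue is documented to throw fiber_error with that error condition. An empty but still open model queue raises std::logic_error, because a single-threaded model cannot suspend.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>
#include <system_error>
#include <utility>

// Hypothetical single-threaded stand-in, NOT the Boost.Fiber implementation.
template< typename T >
class throwing_model {
    std::deque< T > items_;
    bool            closed_ = false;

public:
    void close() noexcept { closed_ = true; }

    void push( T va) {
        if ( ! closed_) items_.push_back( std::move( va) );
    }

    T value_pop() {
        if ( items_.empty() ) {
            if ( ! closed_) {
                // The real value_pop() would suspend the fiber here.
                throw std::logic_error("model cannot block on an open queue");
            }
            // Stand-in for fiber_error with the documented error condition.
            throw std::system_error(
                std::make_error_code( std::errc::operation_not_permitted),
                "queue closed");
        }
        T va = std::move( items_.front() );
        items_.pop_front();
        return va;
    }
};

// Sums all values; the "closed" exception ends the loop.
inline int sum_until_closed( throwing_model< int > & queue) {
    int sum = 0;
    try {
        for (;;) {
            sum += queue.value_pop();
        }
    } catch ( std::system_error const& e) {
        // Anything other than the expected condition is passed on.
        if ( e.code() != std::errc::operation_not_permitted) throw;
    }
    return sum;
}
```

Checking the error condition before swallowing the exception keeps unrelated failures visible to the caller.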

Member function try_pop()

queue_op_status try_pop( value_type & va);

Effects:

If the queue is empty, returns empty. If the queue is closed, returns closed. Otherwise returns success, and va contains the dequeued value.

Throws:

Exceptions thrown by copy- or move-operations.

Member function pop_wait_for()

template< typename Rep, typename Period >
queue_op_status pop_wait_for(
    value_type & va,
    std::chrono::duration< Rep, Period > const& timeout_duration);

Effects:

Accepts a std::chrono::duration and internally computes a timeout time as (system time + timeout_duration). If the queue is not empty, immediately dequeues a value from the queue. Otherwise the fiber is suspended until at least one new item is push()ed (return value success, and va contains the dequeued value), the queue gets close()d (return value closed), or the system time reaches the computed timeout time (return value timeout).

Throws:

Timeout-related exceptions, or exceptions thrown by copy- or move-operations.

Member function pop_wait_until()

template< typename Clock, typename Duration >
queue_op_status pop_wait_until(
    value_type & va,
    std::chrono::time_point< Clock, Duration > const& timeout_time);

Effects:

Accepts a std::chrono::time_point< Clock, Duration >. If the queue is not empty, immediately dequeues a value from the queue. Otherwise the fiber is suspended until at least one new item is push()ed (return value success, and va contains the dequeued value), the queue gets close()d (return value closed), or the system time reaches the passed time_point (return value timeout).

Throws:

Timeout-related exceptions, or exceptions thrown by copy- or move-operations.
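The relationship between the two timed operations can be sketched as one delegating to the other: the relative variant computes an absolute deadline and then waits for it. This is a hypothetical single-threaded model (timed_model is an illustrative name), not the library's implementation. It assumes std::chrono::steady_clock for the "system time" mentioned in the text, and it polls with a short sleep where the real queue would suspend the fiber.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <thread>

// Local mirror of the documented enumeration so the sketch is self-contained.
enum class queue_op_status { success, empty, full, closed, timeout };

// Hypothetical single-threaded stand-in, NOT the Boost.Fiber implementation.
template< typename T >
class timed_model {
    std::deque< T > items_;
    bool            closed_ = false;

public:
    void close() noexcept { closed_ = true; }
    void push( T const& va) { items_.push_back( va); }

    template< typename Clock, typename Duration >
    queue_op_status pop_wait_until(
            T & va,
            std::chrono::time_point< Clock, Duration > const& timeout_time) {
        for (;;) {
            if ( ! items_.empty() ) {
                va = items_.front();
                items_.pop_front();
                return queue_op_status::success;
            }
            if ( closed_) return queue_op_status::closed;
            if ( Clock::now() >= timeout_time) return queue_op_status::timeout;
            // The real queue suspends the fiber instead of polling.
            std::this_thread::sleep_for( std::chrono::milliseconds( 1) );
        }
    }

    template< typename Rep, typename Period >
    queue_op_status pop_wait_for(
            T & va,
            std::chrono::duration< Rep, Period > const& timeout_duration) {
        // Deadline = current time + timeout_duration (steady clock assumed).
        return pop_wait_until(
            va, std::chrono::steady_clock::now() + timeout_duration);
    }
};
```

The order of the checks reflects the three documented outcomes: an available value wins, then a closed queue, and only then the elapsed deadline.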

