Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
mpsc_queue.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2020 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_core/mpsc_queue.h
10//! @brief Multi-producer single-consumer queue.
11
12#ifndef ROC_CORE_MPSC_QUEUE_H_
13#define ROC_CORE_MPSC_QUEUE_H_
14
15#include "roc_core/atomic_ops.h"
20#include "roc_core/panic.h"
21
22namespace roc {
23namespace core {
24
25//! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26//!
27//! Provides sequential consistency.
28//!
29//! Based on Dmitry Vyukov's algorithm:
30//! - http://tiny.cc/3d3moz
31//! - https://int08h.com/post/ode-to-a-vyukov-queue/
32//! - https://github.com/samanbarghi/MPSCQ
33//!
34//! @tparam T defines object type, it should inherit MpscQueueNode.
35//!
36//! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37//! element ownership when it's added to the queue and release ownership when it's
38//! removed from the queue.
39template <class T, template <class TT> class OwnershipPolicy = RefCountedOwnership>
40class MpscQueue : public NonCopyable<> {
41public:
42 //! Pointer type.
43 //! @remarks
44 //! either raw or smart pointer depending on the ownership policy.
46
// Destructor: empties the queue so that no object remains owned by it.
47 ~MpscQueue() {
48 // drain the queue: each successful pop releases ownership of one object
49 while (pop_front_exclusive()) { // blocking variant: may spin while a concurrent push_back() is running
50 } // empty body: the pop in the loop condition does all the work; NULL (empty queue) ends the loop
51 }
52
53 //! Add object to the end of the queue.
54 //! Can be called concurrently.
55 //! Acquires ownership of @p obj.
56 //! After this call returns, any thread calling pop_front_exclusive() or
57 //! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
58 //! that the latter can still fail if called concurrently with push_back().
59 //! @note
60 //! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
61 //! and wait-free, i.e. it never waits for sleeping threads and never spins.
62 //! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
63 //! but not wait-free, i.e. it never waits for sleeping threads, but with a low
64 //! probability can spin while there are concurrent non-sleeping push_back()
65 //! calls (because of the spin loop in the implementation of atomic exchange).
66 //! - Concurrent try_pop_front_exclusive() and pop_front_exclusive() calls do not affect this operation.
67 //! Only concurrent push_back() calls can make it spin.
68 void push_back(T& obj) {
70 // NOTE(review): original line 69 is absent from this listing; the ownership acquisition promised by the doc comment is not visible here -- confirm against the real header
71 MpscQueueNode::MpscQueueData* node = obj.mpsc_queue_data();
72 // the queue is intrusive: it links the node data obtained from obj above, and the actual enqueue is delegated to the implementation class
73 impl_.push_back(node);
74 }
75
76 //! Try to remove object from the beginning of the queue (non-blocking version).
77 //! Should NOT be called concurrently.
78 //! Releases ownership of the returned object.
79 //! @remarks
80 //! - Returns NULL if the queue is empty.
81 //! - May return NULL even if the queue is actually non-empty, in particular if
82 //! concurrent push_back() call is running, or if the push_back() results were
83 //! not fully published yet.
84 //! @note
85 //! - This operation is both lock-free and wait-free on all architectures, i.e. it
86 //! never waits for sleeping threads and never spins indefinitely.
88 MpscQueueNode::MpscQueueData* node = impl_.pop_front(false);
89 if (!node) {
90 return NULL;
91 }
92
93 Pointer obj = static_cast<T*>(node->container_of());
95
96 return obj;
97 }
98
99 //! Remove object from the beginning of the queue (blocking version).
100 //! Should NOT be called concurrently.
101 //! Releases ownership of the returned object.
102 //! @remarks
103 //! - Returns NULL if the queue is empty.
104 //! - May spin while a concurrent push_back() call is running.
105 //! @note
106 //! - This operation is NOT lock-free (or wait-free). It may spin until all
107 //! concurrent push_back() calls are finished.
108 //! - On the "fast-path", however, this operation does not wait for any
109 //! threads and just performs a few atomic reads and writes.
111 MpscQueueNode::MpscQueueData* node = impl_.pop_front(true);
112 if (!node) {
113 return NULL;
114 }
115
116 Pointer obj = static_cast<T*>(node->container_of());
118
119 return obj;
120 }
121
122private:
123 MpscQueueImpl impl_;
124};
125
126} // namespace core
127} // namespace roc
128
129#endif // ROC_CORE_MPSC_QUEUE_H_
Multi-producer single-consumer queue internal implementation class.
void push_back(MpscQueueNode::MpscQueueData *node)
Add object to the end of the queue.
MpscQueueNode::MpscQueueData * pop_front(bool can_spin)
Remove object from the beginning of the queue.
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition mpsc_queue.h:40
void push_back(T &obj)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of obj....
Definition mpsc_queue.h:68
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition mpsc_queue.h:110
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition mpsc_queue.h:87
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition mpsc_queue.h:45
Base class for non-copyable objects.
Definition noncopyable.h:23
Shared ownership intrusive pointer.
Definition shared_ptr.h:32
Multi-producer single-consumer queue internal implementation.
MpscQueue node.
Root namespace.
Non-copyable object.
Ownership policies.
Panic.
MpscQueueNode * container_of()
Get MpscQueueNode object that contains this MpscQueueData object.