Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
mpsc_queue.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2020 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_core/mpsc_queue.h
10//! @brief Multi-producer single-consumer queue.
11
12#ifndef ROC_CORE_MPSC_QUEUE_H_
13#define ROC_CORE_MPSC_QUEUE_H_
14
#include "roc_core/atomic_ops.h"
#include "roc_core/cpu_instructions.h"
#include "roc_core/mpsc_queue_node.h"
#include "roc_core/noncopyable.h"
#include "roc_core/ownership_policy.h"
#include "roc_core/panic.h"
21
22namespace roc {
23namespace core {
24
25//! Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
26//!
27//! Provides sequential consistency.
28//!
29//! Based on Dmitry Vyukov algorithm:
30//! - http://tiny.cc/3d3moz
31//! - https://int08h.com/post/ode-to-a-vyukov-queue/
32//! - https://github.com/samanbarghi/MPSCQ
33//!
34//! @tparam T defines object type, it should inherit MpscQueueNode.
35//!
36//! @tparam OwnershipPolicy defines ownership policy which is used to acquire an
37//! element ownership when it's added to the queue and release ownership when it's
38//! removed from the queue.
39template <class T, template <class TT> class OwnershipPolicy = RefCountedOwnership>
40class MpscQueue : public NonCopyable<> {
41public:
42 //! Pointer type.
43 //! @remarks
44 //! either raw or smart pointer depending on the ownership policy.
45 typedef typename OwnershipPolicy<T>::Pointer Pointer;
46
47 MpscQueue()
48 : tail_(&stub_)
49 , head_(&stub_) {
50 }
51
52 ~MpscQueue() {
53 // release ownership of all objects
54 while (pop_front_exclusive()) {
55 }
56 }
57
58 //! Add object to the end of the queue.
59 //! Can be called concurrently.
60 //! Acquires ownership of @p obj.
61 //! After this call returns, any thread calling pop_front_exclusive() or
62 //! try_pop_front_exclusive() is guaranteed to see a non-empty queue. But note
63 //! that the latter can still fail if called concurrently with push_back().
64 //! @note
65 //! - On CPUs with atomic exchange, e.g. x86, this operation is both lock-free
66 //! and wait-free, i.e. it never waits for sleeping threads and never spins.
67 //! - On CPUs without atomic exchange, e.g. arm64, this operation is lock-free,
68 //! but not wait-free, i.e. it never waits for sleeping threads, but with a low
69 //! probability can spin while there are concurrent non-sleeping push_back()
70 //! calls (because of the spin loop in the implementation of atomic exchange).
71 //! - Concurrent try_pop_front() and pop_front() does not affect this operation.
72 //! Only concurrent push_back() calls can make it spin.
73 void push_back(T& obj) {
74 OwnershipPolicy<T>::acquire(obj);
75
76 MpscQueueNode::MpscQueueData* node = obj.mpsc_queue_data();
77
78 change_owner_(node, NULL, this);
79
80 push_node_(node);
81 }
82
83 //! Try to remove object from the beginning of the queue (non-blocking version).
84 //! Should NOT be called concurrently.
85 //! Releases ownership of the returned object.
86 //! @remarks
87 //! - Returns NULL if the queue is empty.
88 //! - May return NULL even if the queue is actially non-empty, in particular if
89 //! concurrent push_back() call is running, or if the push_back() results were
90 //! not fully published yet.
91 //! @note
92 //! - This operation is both lock-free and wait-free on all architectures, i.e. it
93 //! never waits for sleeping threads and never spins indefinitely.
95 MpscQueueNode::MpscQueueData* node = pop_node_<false>();
96 if (!node) {
97 return NULL;
98 }
99
100 change_owner_(node, this, NULL);
101
102 Pointer obj = static_cast<T*>(node->container_of());
103 OwnershipPolicy<T>::release(*obj);
104
105 return obj;
106 }
107
108 //! Remove object from the beginning of the queue (blocking version).
109 //! Should NOT be called concurrently.
110 //! Releases ownership of the returned object.
111 //! @remarks
112 //! - Returns NULL if the queue is empty.
113 //! - May spin while a concurrent push_back() call is running.
114 //! @remarks
115 //! - This operation is NOT lock-free (or wait-free). It may spin until all
116 //! concurrent push_back() calls are finished.
117 //! - On the "fast-path", however, this operation does not wait for any
118 //! threads and just performs a few atomic reads and writes.
120 MpscQueueNode::MpscQueueData* node = pop_node_<true>();
121 if (!node) {
122 return NULL;
123 }
124
125 change_owner_(node, this, NULL);
126
127 Pointer obj = static_cast<T*>(node->container_of());
128 OwnershipPolicy<T>::release(*obj);
129
130 return obj;
131 }
132
133private:
134 typedef MpscQueueNode::MpscQueueData MpscQueueData;
135
136 void change_owner_(MpscQueueData* node, void* from, void* to) {
137 void* exp = from;
138 if (!AtomicOps::compare_exchange_relaxed(node->queue, exp, to)) {
139 roc_panic("mpsc queue: unexpected node owner: from=%p to=%p cur=%p", from, to,
140 exp);
141 }
142 }
143
144 void push_node_(MpscQueueData* node) {
145 AtomicOps::store_relaxed(node->next, (MpscQueueData*)NULL);
146
147 MpscQueueData* prev = AtomicOps::exchange_seq_cst(tail_, node);
148
149 AtomicOps::store_release(prev->next, node);
150 }
151
152 template <bool CanSpin> MpscQueueData* pop_node_() {
153 MpscQueueData* head = AtomicOps::load_relaxed(head_);
154 MpscQueueData* next = AtomicOps::load_acquire(head->next);
155
156 if (head == &stub_) {
157 if (!next) {
158 if (AtomicOps::load_seq_cst(tail_) == head) {
159 // queue is empty
160 return NULL;
161 } else {
162 // queue is not empty, so head->next == NULL means that
163 // a push_node_() call is in progress
164 if (!(next = (CanSpin ? wait_next_(head) : try_wait_next_(head)))) {
165 // this may happen only if CanSpin is false
166 return NULL;
167 }
168 }
169 }
170 // remove stub from the beginning of the list
171 AtomicOps::store_relaxed(head_, next);
172 head = next;
173 next = AtomicOps::load_acquire(next->next);
174 }
175
176 if (!next) {
177 // head is not stub and head->next == NULL
178
179 if (AtomicOps::load_seq_cst(tail_) == head) {
180 // queue is empty
181 // add stub to the end of the list to ensure that we always
182 // have head->next when removing head and head wont become NULL
183 push_node_(&stub_);
184 }
185
186 // if head->next == NULL here means that a push_node_() call is in progress
187 if (!(next = (CanSpin ? wait_next_(head) : try_wait_next_(head)))) {
188 // this may happen only if CanSpin is false
189 return NULL;
190 }
191 }
192
193 // move list head to the next node
194 AtomicOps::store_relaxed(head_, next);
195
196 return head;
197 }
198
199 // Wait until concurrent push_node_() completes and node->next becomes non-NULL.
200 // This version may block indefinetely.
201 // Usually it returns immediately. It can block only if the thread performing
202 // push_node_() was interrupted exactly after updating tail and before updating
203 // next, and is now sleeping. In this rare case, this method will wait until the
204 // push_node_() thread is resumed and completed.
205 MpscQueueData* wait_next_(MpscQueueData* node) {
206 if (MpscQueueData* next = try_wait_next_(node)) {
207 return next;
208 }
209 for (;;) {
210 if (MpscQueueData* next = AtomicOps::load_seq_cst(node->next)) {
211 return next;
212 }
213 cpu_relax();
214 }
215 }
216
217 // Wait until concurrent push_node_() completes and node->next becomes non-NULL.
218 // This version is non-blocking and gives up after a few re-tries.
219 // Usually it succeedes. It can fail only in the same rare case when
220 // wait_next_() blocks.
221 MpscQueueData* try_wait_next_(MpscQueueData* node) {
222 MpscQueueData* next;
223 if ((next = AtomicOps::load_acquire(node->next))) {
224 return next;
225 }
226 if ((next = AtomicOps::load_acquire(node->next))) {
227 return next;
228 }
229 if ((next = AtomicOps::load_acquire(node->next))) {
230 return next;
231 }
232 return NULL;
233 }
234
235 MpscQueueData* tail_;
236 MpscQueueData* head_;
237
238 MpscQueueData stub_;
239};
240
241} // namespace core
242} // namespace roc
243
244#endif // ROC_CORE_MPSC_QUEUE_H_
static void store_relaxed(T1 &var, T2 val)
Atomic store (no barrier).
Definition atomic_ops.h:66
static T1 exchange_seq_cst(T1 &var, T2 val)
Atomic exchange (full barrier).
Definition atomic_ops.h:106
static T load_relaxed(const T &var)
Atomic load (no barrier).
Definition atomic_ops.h:46
static T load_acquire(const T &var)
Atomic load (acquire barrier).
Definition atomic_ops.h:51
static void store_release(T1 &var, T2 val)
Atomic store (release barrier).
Definition atomic_ops.h:71
static bool compare_exchange_relaxed(T1 &var, T1 &exp, T2 des)
Atomic compare-and-swap (no barrier).
Definition atomic_ops.h:117
static T load_seq_cst(const T &var)
Atomic load (full barrier).
Definition atomic_ops.h:56
Thread-safe lock-free node-based intrusive multi-producer single-consumer queue.
Definition mpsc_queue.h:40
void push_back(T &obj)
Add object to the end of the queue. Can be called concurrently. Acquires ownership of obj....
Definition mpsc_queue.h:73
Pointer pop_front_exclusive()
Remove object from the beginning of the queue (blocking version). Should NOT be called concurrently....
Definition mpsc_queue.h:119
Pointer try_pop_front_exclusive()
Try to remove object from the beginning of the queue (non-blocking version). Should NOT be called con...
Definition mpsc_queue.h:94
OwnershipPolicy< T >::Pointer Pointer
Pointer type.
Definition mpsc_queue.h:45
Base class for non-copyable objects.
Definition noncopyable.h:23
CPU-specific instructions.
MpscQueue node.
void cpu_relax()
CPU pause instruction.
Root namespace.
Non-copyable object.
Ownership policies.
Panic.
#define roc_panic(...)
Print error message and terminate program gracefully.
Definition panic.h:50
MpscQueueNode * container_of()
Get MpscQueueNode object that contains this ListData object.