//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
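//
// Illustrative usage (a minimal sketch mirroring the `smoke` test at the
// bottom of this file; the `unsafe` block is required because nothing in the
// types enforces the single-producer/single-consumer contract):
//
//     let queue = unsafe { Queue::with_additions(0, (), ()) };
//     queue.push(1);
//     queue.push(2);
//     assert_eq!(queue.pop(), Some(1));
//     assert_eq!(queue.pop(), Some(2));
//     assert_eq!(queue.pop(), None);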

use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;

// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,         // nullable for re-use of nodes
    cached: bool,             // This node goes into the node cache
    next: AtomicPtr<Node<T>>, // next node in the queue
}

/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
    // consumer fields
    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,

    // producer fields
    producer: CacheAligned<Producer<T, ProducerAddition>>,
}
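//
// Sharing sketch (a hedged illustration mirroring the `stress` test below,
// with `Arc` and `thread` as imported in the test module): the queue is not
// cloneable, so cross-thread use goes through an `Arc`, with exactly one
// thread pushing and one thread popping:
//
//     let q = Arc::new(unsafe { Queue::<u32>::with_additions(0, (), ()) });
//     let q2 = q.clone();
//     thread::spawn(move || q2.push(1));
//     while q.pop().is_none() {}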

struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>,  // last node released back to the producer's node cache
    cache_bound: usize,             // maximum cache size
    cached_nodes: AtomicUsize,      // number of nodes marked as cachable
    addition: Addition,
}

struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // cached copy of `tail_prev`; nodes in `first..tail_copy` are reusable
    addition: Addition,
}
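//
// Node lifecycle, as read from the fields above and the `push`/`alloc`/`pop`
// bodies below (a descriptive sketch, not normative documentation): all nodes
// live on one linked list ordered roughly `first -> tail_copy -> tail -> head`.
// The producer recycles nodes from the `first..tail_copy` segment (refreshing
// `tail_copy` from the consumer's `tail_prev` when it runs out), pushes onto
// `head`, and the consumer pops by advancing the `tail` sentinel, either
// publishing the old sentinel back through `tail_prev` or freeing it.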

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}

impl<T> Node<T> {
    fn new() -> *mut Node<T> {
        Box::into_raw(box Node {
            value: None,
            cached: false,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
        })
    }
}

impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
    /// Creates a new queue with the given additional elements stored in the
    /// producer and consumer portions of the queue.
    ///
    /// Due to the performance implications of cache contention,
    /// we wish to keep fields used mainly by the producer on a separate cache
    /// line from those used by the consumer.
    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
    /// allocate one for small fields, so we allow users to insert additional
    /// fields into the cache lines already allocated by this for the producer
    /// and consumer.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    /// * `bound` - This queue is implemented with a linked list, and this
    ///   means that a push is always a malloc. In order to amortize this
    ///   cost, an internal cache of nodes is maintained to prevent a malloc
    ///   from always being necessary. This bound is the limit on the size of
    ///   the cache (if desired). If the value is 0, then the cache has no
    ///   bound. Otherwise, the cache will never grow larger than `bound`
    ///   (although the queue itself could be much larger).
    pub unsafe fn with_additions(
        bound: usize,
        producer_addition: ProducerAddition,
        consumer_addition: ConsumerAddition,
    ) -> Self {
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            consumer: CacheAligned::new(Consumer {
                tail: UnsafeCell::new(n2),
                tail_prev: AtomicPtr::new(n1),
                cache_bound: bound,
                cached_nodes: AtomicUsize::new(0),
                addition: consumer_addition,
            }),
            producer: CacheAligned::new(Producer {
                head: UnsafeCell::new(n2),
                first: UnsafeCell::new(n1),
                tail_copy: UnsafeCell::new(n1),
                addition: producer_addition,
            }),
        }
    }
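    // Construction sketch with non-trivial additions (hypothetical types; the
    // real callers elsewhere in `sync::mpsc` supply their own state here).
    // Each addition lives on the cache line of the side that mostly touches it:
    //
    //     let q = unsafe {
    //         Queue::<u32, AtomicUsize, AtomicUsize>::with_additions(
    //             128,                 // cache at most 128 recycled nodes
    //             AtomicUsize::new(0), // producer-side counter (illustrative)
    //             AtomicUsize::new(0), // consumer-side counter (illustrative)
    //         )
    //     };
    //     q.producer_addition().fetch_add(1, Ordering::Relaxed);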

    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
            (**self.producer.head.get()).next.store(n, Ordering::Release);
            *(&self.producer.head).get() = n;
        }
    }

    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
        *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }

    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() {
                return None;
            }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            *self.consumer.0.tail.get() = next;
            if self.consumer.cache_bound == 0 {
                self.consumer.tail_prev.store(tail, Ordering::Release);
            } else {
                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
                    // Count the newly cached node so the bound is respected.
                    self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
                    (*tail).cached = true;
                }

                if (*tail).cached {
                    self.consumer.tail_prev.store(tail, Ordering::Release);
                } else {
                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
                        .next
                        .store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = Box::from_raw(tail);
                }
            }
            ret
        }
    }

    /// Attempts to peek at the head of the queue, returning `None` if the queue
    /// has no data currently.
    ///
    /// # Warning
    /// The reference returned is invalid if it is not used before the consumer
    /// pops the value off the queue. If the producer then pushes another value
    /// onto the queue, it will overwrite the value pointed to by the reference.
    pub fn peek(&self) -> Option<&mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }
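    //
    // Peek-then-pop sketch (illustrative only, for a hypothetical queue `q` of
    // integers): the reference from `peek` must be consumed before this side
    // pops, since popping frees or recycles the node backing it:
    //
    //     if let Some(first) = q.peek() {
    //         *first += 1; // inspect or mutate in place...
    //     }
    //     let _ = q.pop(); // ...and only then pop.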

    pub fn producer_addition(&self) -> &ProducerAddition {
        &self.producer.addition
    }

    pub fn consumer_addition(&self) -> &ConsumerAddition {
        &self.consumer.addition
    }
}

impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = *self.producer.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}

#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use super::Queue;
    use crate::sync::mpsc::channel;
    use crate::sync::Arc;
    use crate::thread;

    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }

    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(vec![1]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                }
                None => unreachable!(),
            }

            match queue.pop() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                }
                None => unreachable!(),
            }
        }
    }

    #[test]
    fn drop_full() {
        unsafe {
            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
            q.push(box 1);
            q.push(box 2);
        }
    }

    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::with_additions(0, (), ());
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }

    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: usize) {
            let q = Arc::new(Queue::with_additions(bound, (), ()));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = thread::spawn(move || {
                for _ in 0..100000 {
                    loop {
                        match q2.pop() {
                            Some(1) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in 0..100000 {
                q.push(1);
            }
            rx.recv().unwrap();
        }
    }
}