// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
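//!
//! A minimal usage sketch (illustrative; this is a private module, so the
//! snippet assumes in-crate access to `Queue` and is not compiled as a
//! doctest):
//!
//! ```ignore
//! // One thread may push while another pops; both happen on one thread here
//! // purely to show the API surface.
//! let queue = unsafe { Queue::new(0) };      // 0 = unbounded node cache
//! queue.push(1);
//! queue.push(2);
//! assert_eq!(queue.pop(), Some(1));
//! assert_eq!(queue.peek().map(|v| *v), Some(2));
//! assert_eq!(queue.pop(), Some(2));
//! assert_eq!(queue.pop(), None);
//! ```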

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue

use alloc::boxed::Box;
use core::ptr;
use core::cell::UnsafeCell;

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,         // nullable for re-use of nodes
    next: AtomicPtr<Node<T>>, // next node in the queue
}

/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
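///
/// A sketch of the intended sharing pattern (illustrative only; assumes `Arc`
/// and `thread` are in scope as in the tests below, with exactly one producer
/// thread and one consumer thread):
///
/// ```ignore
/// let q = Arc::new(unsafe { Queue::new(0) });
/// let producer = q.clone();
/// thread::spawn(move || {
///     producer.push(1);              // the only thread that ever pushes
/// });
/// loop {
///     match q.pop() {                // the only thread that ever pops
///         Some(n) => { assert_eq!(n, 1); break }
///         None => {}                 // nothing yet; spin until the push lands
///     }
/// }
/// ```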
pub struct Queue<T> {
    // consumer fields
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>,  // where to pop from

    // producer fields
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail

    // Cache maintenance fields. Additions and subtractions are stored
    // separately in order to allow them to use nonatomic addition/subtraction.
    cache_bound: usize,
    cache_additions: AtomicUsize,
    cache_subtractions: AtomicUsize,
}

unsafe impl<T: Send> Send for Queue<T> { }

unsafe impl<T: Send> Sync for Queue<T> { }

impl<T> Node<T> {
    fn new() -> *mut Node<T> {
        Box::into_raw(box Node {
            value: None,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
        })
    }
}

impl<T> Queue<T> {
    /// Creates a new queue.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    ///   * `bound` - This queue is implemented with a linked list, and this
    ///               means that a push is always a malloc. In order to
    ///               amortize this cost, an internal cache of nodes is
    ///               maintained to prevent a malloc from always being
    ///               necessary. This bound is the limit on the size of the
    ///               cache (if desired). If the value is 0, then the cache has
    ///               no bound. Otherwise, the cache will never grow larger
    ///               than `bound` (although the queue itself could be much
    ///               larger).
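    ///
    /// A sketch of the two caching modes (illustrative only, not compiled as a
    /// doctest; the bound of 16 is an arbitrary example value):
    ///
    /// ```ignore
    /// // Unbounded node cache: every popped node is kept around for reuse.
    /// let unbounded = unsafe { Queue::<i32>::new(0) };
    ///
    /// // Keep at most 16 spare nodes; beyond that, popped nodes are freed.
    /// let bounded = unsafe { Queue::<i32>::new(16) };
    /// ```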
    pub unsafe fn new(bound: usize) -> Queue<T> {
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            tail: UnsafeCell::new(n2),
            tail_prev: AtomicPtr::new(n1),
            head: UnsafeCell::new(n2),
            first: UnsafeCell::new(n1),
            tail_copy: UnsafeCell::new(n1),
            cache_bound: bound,
            cache_additions: AtomicUsize::new(0),
            cache_subtractions: AtomicUsize::new(0),
        }
    }

    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
            (**self.head.get()).next.store(n, Ordering::Release);
            *self.head.get() = n;
        }
    }

    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        // We try to avoid as many atomic instructions as possible here, so
        // the addition to cache_subtractions is not atomic (plus we're the
        // only one subtracting from the cache).
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }

    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from which we should start popping. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            *self.tail.get() = next;
            if self.cache_bound == 0 {
                self.tail_prev.store(tail, Ordering::Release);
            } else {
                // FIXME: this is dubious with overflow.
                let additions = self.cache_additions.load(Ordering::Relaxed);
                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
                let size = additions - subtractions;

                if size < self.cache_bound {
                    self.tail_prev.store(tail, Ordering::Release);
                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
                } else {
                    (*self.tail_prev.load(Ordering::Relaxed))
                          .next.store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = Box::from_raw(tail);
                }
            }
            ret
        }
    }

    /// Attempts to peek at the head of the queue, returning `None` if the queue
    /// has no data currently.
    ///
    /// # Warning
    /// The reference returned is invalid if it is not used before the consumer
    /// pops the value off the queue. If the producer then pushes another value
    /// onto the queue, it will overwrite the value pointed to by the reference.
    pub fn peek(&self) -> Option<&mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = *self.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}

#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use sync::Arc;
    use super::Queue;
    use thread;
    use sync::mpsc::channel;

    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }

    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(vec![1]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }

            match queue.pop() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }
        }
    }

    #[test]
    fn drop_full() {
        unsafe {
            let q: Queue<Box<_>> = Queue::new(0);
            q.push(box 1);
            q.push(box 2);
        }
    }

    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::new(0);
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }

    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: usize) {
            let q = Arc::new(Queue::new(bound));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = thread::spawn(move|| {
                for _ in 0..100000 {
                    loop {
                        match q2.pop() {
                            Some(1) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in 0..100000 {
                q.push(1);
            }
            rx.recv().unwrap();
        }
    }
}