// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
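//!
//! A minimal usage sketch (not compiled as a doctest; this module is internal
//! to `std`, and the single-producer/single-consumer contract is upheld by the
//! caller rather than the compiler):
//!
//! ```ignore
//! // SAFETY: exactly one thread pushes and exactly one thread pops.
//! let queue = Arc::new(unsafe { Queue::with_additions(0, (), ()) });
//!
//! let producer = queue.clone();
//! let handle = thread::spawn(move || {
//!     for i in 0..10 {
//!         producer.push(i);
//!     }
//! });
//!
//! handle.join().unwrap();
//! let mut received = Vec::new();
//! while let Some(v) = queue.pop() {
//!     received.push(v);
//! }
//! assert_eq!(received, (0..10).collect::<Vec<_>>());
//! ```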

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue

use boxed::Box;
use core::ptr;
use core::cell::UnsafeCell;

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;

// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,         // nullable for re-use of nodes
    cached: bool,             // This node goes into the node cache
    next: AtomicPtr<Node<T>>, // next node in the queue
}

/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
    // consumer fields
    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,

    // producer fields
    producer: CacheAligned<Producer<T, ProducerAddition>>,
}

struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>,  // last node returned to the cache; the producer reuses nodes before it
    cache_bound: usize,             // maximum cache size
    cached_nodes: AtomicUsize,      // number of nodes marked as cachable
    addition: Addition,
}

struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
    addition: Addition,
}

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }

impl<T> Node<T> {
    fn new() -> *mut Node<T> {
        Box::into_raw(box Node {
            value: None,
            cached: false,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
        })
    }
}

impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {

    /// Creates a new queue with the given additional elements stored in the
    /// producer and consumer portions of the queue.
    ///
    /// Due to the performance implications of cache contention, we wish to
    /// keep fields used mainly by the producer on a separate cache line from
    /// those used by the consumer. Since cache lines are usually 64 bytes, it
    /// is unreasonably expensive to allocate one for small fields, so we allow
    /// users to insert additional fields into the cache lines already
    /// allocated by this queue for the producer and consumer.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    /// * `bound` - This queue implementation is implemented with a linked
    ///             list, and this means that a push is always a malloc. In
    ///             order to amortize this cost, an internal cache of nodes is
    ///             maintained to prevent a malloc from always being
    ///             necessary. This bound is the limit on the size of the
    ///             cache (if desired). If the value is 0, then the cache has
    ///             no bound. Otherwise, the cache will never grow larger than
    ///             `bound` (although the queue itself could be much larger).
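    ///
    /// # Examples
    ///
    /// A construction sketch (not compiled as a doctest; the module is
    /// private, and the additions shown here are purely illustrative):
    ///
    /// ```ignore
    /// // Cache at most 128 spare nodes, and co-locate an AtomicBool with the
    /// // producer fields and an AtomicUsize with the consumer fields.
    /// let q = unsafe {
    ///     Queue::with_additions(128, AtomicBool::new(false), AtomicUsize::new(0))
    /// };
    /// q.push(1);
    /// assert_eq!(q.pop(), Some(1));
    /// q.producer_addition().store(true, Ordering::SeqCst);
    /// ```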
    pub unsafe fn with_additions(
        bound: usize,
        producer_addition: ProducerAddition,
        consumer_addition: ConsumerAddition,
    ) -> Self {
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            consumer: CacheAligned::new(Consumer {
                tail: UnsafeCell::new(n2),
                tail_prev: AtomicPtr::new(n1),
                cache_bound: bound,
                cached_nodes: AtomicUsize::new(0),
                addition: consumer_addition
            }),
            producer: CacheAligned::new(Producer {
                head: UnsafeCell::new(n2),
                first: UnsafeCell::new(n1),
                tail_copy: UnsafeCell::new(n1),
                addition: producer_addition
            }),
        }
    }

    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
            (**self.producer.head.get()).next.store(n, Ordering::Release);
            *(&self.producer.head).get() = n;
        }
    }

    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
        *self.producer.0.tail_copy.get() =
            self.consumer.tail_prev.load(Ordering::Acquire);
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }

    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            *self.consumer.0.tail.get() = next;
            if self.consumer.cache_bound == 0 {
                self.consumer.tail_prev.store(tail, Ordering::Release);
            } else {
                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
                    // Count the newly cached node so that `cache_bound` is
                    // actually enforced.
                    self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
                    (*tail).cached = true;
                }

                if (*tail).cached {
                    self.consumer.tail_prev.store(tail, Ordering::Release);
                } else {
                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
                        .next.store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = Box::from_raw(tail);
                }
            }
            ret
        }
    }

    /// Attempts to peek at the head of the queue, returning `None` if the
    /// queue currently has no data.
    ///
    /// # Warning
    /// The reference returned is invalid if it is not used before the consumer
    /// pops the value off the queue. If the producer then pushes another value
    /// onto the queue, it will overwrite the value pointed to by the reference.
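    ///
    /// A usage sketch (not compiled as a doctest) of the intended pattern:
    /// consume the peeked reference before the next `pop` or `push`:
    ///
    /// ```ignore
    /// queue.push(String::from("hello"));
    /// if let Some(s) = queue.peek() {
    ///     // `s` must not outlive this block; popping (or a subsequent push
    ///     // by the producer) may invalidate it.
    ///     assert_eq!(s.as_str(), "hello");
    /// }
    /// assert_eq!(queue.pop(), Some(String::from("hello")));
    /// ```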
    pub fn peek(&self) -> Option<&mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }

    pub fn producer_addition(&self) -> &ProducerAddition {
        &self.producer.addition
    }

    pub fn consumer_addition(&self) -> &ConsumerAddition {
        &self.consumer.addition
    }
}

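// Dropping the queue walks the `next` chain starting at `producer.first`, the
// oldest node still owned by the queue; the node cache and the in-flight queue
// share this single linked list, so every remaining node is freed exactly once.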
impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = *self.producer.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}

#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use sync::Arc;
    use super::Queue;
    use thread;
    use sync::mpsc::channel;

    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }

    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(vec![1]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }

            match queue.pop() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }
        }
    }

    #[test]
    fn drop_full() {
        unsafe {
            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
            q.push(box 1);
            q.push(box 2);
        }
    }

    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::with_additions(0, (), ());
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }
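
    // An extra sketch of a test exercising a nonzero cache bound. The exact
    // reuse behaviour of the internal node cache is an implementation detail;
    // this only checks that values still round-trip correctly when `bound > 0`.
    #[test]
    fn smoke_bound_cache_reuse() {
        unsafe {
            let q = Queue::with_additions(2, (), ());
            for i in 0..10 {
                q.push(i);
            }
            for i in 0..10 {
                assert_eq!(q.pop(), Some(i));
            }
            assert_eq!(q.pop(), None);
            // Interleave pushes and pops so that retired nodes can be reused.
            for i in 0..10 {
                q.push(i);
                assert_eq!(q.pop(), Some(i));
            }
            assert_eq!(q.pop(), None);
        }
    }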

    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: usize) {
            let q = Arc::new(Queue::with_additions(bound, (), ()));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = thread::spawn(move|| {
                for _ in 0..100000 {
                    loop {
                        match q2.pop() {
                            Some(1) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in 0..100000 {
                q.push(1);
            }
            rx.recv().unwrap();
        }
    }
}