]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mpsc_queue.rs
New upstream version 1.19.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mpsc_queue.rs
CommitLineData
7cac9316
XL
1// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution and at
3// http://rust-lang.org/COPYRIGHT.
4//
5// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8// option. This file may not be copied, modified, or distributed
9// except according to those terms.
1a4d82fc
JJ
10
11//! A mostly lock-free multi-producer, single consumer queue.
12//!
13//! This module contains an implementation of a concurrent MPSC queue. This
bd371182 14//! queue can be used to share data between threads, and is also used as the
1a4d82fc
JJ
15//! building block of channels in rust.
16//!
17//! Note that the current implementation of this queue has a caveat of the `pop`
18//! method, and see the method for more information about it. Due to this
19//! caveat, this queue may not be appropriate for all use-cases.
20
1a4d82fc
JJ
21// http://www.1024cores.net/home/lock-free-algorithms
22// /queues/non-intrusive-mpsc-node-based-queue
23
24pub use self::PopResult::*;
25
1a4d82fc 26use alloc::boxed::Box;
85aaf69f 27use core::ptr;
1a4d82fc
JJ
28use core::cell::UnsafeCell;
29
30use sync::atomic::{AtomicPtr, Ordering};
31
32/// A result of the `pop` function.
33pub enum PopResult<T> {
34 /// Some data has been popped
35 Data(T),
36 /// The queue is empty
37 Empty,
38 /// The queue is in an inconsistent state. Popping data should succeed, but
39 /// some pushers have yet to make enough progress in order allow a pop to
40 /// succeed. It is recommended that a pop() occur "in the near future" in
41 /// order to see if the sender has made progress or not
42 Inconsistent,
43}
44
45struct Node<T> {
46 next: AtomicPtr<Node<T>>,
47 value: Option<T>,
48}
49
50/// The multi-producer single-consumer structure. This is not cloneable, but it
51/// may be safely shared so long as it is guaranteed that there is only one
52/// popper at a time (many pushers are allowed).
53pub struct Queue<T> {
54 head: AtomicPtr<Node<T>>,
55 tail: UnsafeCell<*mut Node<T>>,
56}
57
c34b1796
AL
58unsafe impl<T: Send> Send for Queue<T> { }
59unsafe impl<T: Send> Sync for Queue<T> { }
1a4d82fc
JJ
60
61impl<T> Node<T> {
62 unsafe fn new(v: Option<T>) -> *mut Node<T> {
62682a34 63 Box::into_raw(box Node {
85aaf69f 64 next: AtomicPtr::new(ptr::null_mut()),
1a4d82fc
JJ
65 value: v,
66 })
67 }
68}
69
c34b1796 70impl<T> Queue<T> {
1a4d82fc
JJ
71 /// Creates a new queue that is safe to share among multiple producers and
72 /// one consumer.
73 pub fn new() -> Queue<T> {
74 let stub = unsafe { Node::new(None) };
75 Queue {
76 head: AtomicPtr::new(stub),
77 tail: UnsafeCell::new(stub),
78 }
79 }
80
81 /// Pushes a new value onto this queue.
82 pub fn push(&self, t: T) {
83 unsafe {
84 let n = Node::new(Some(t));
85 let prev = self.head.swap(n, Ordering::AcqRel);
86 (*prev).next.store(n, Ordering::Release);
87 }
88 }
89
90 /// Pops some data from this queue.
91 ///
92 /// Note that the current implementation means that this function cannot
93 /// return `Option<T>`. It is possible for this queue to be in an
94 /// inconsistent state where many pushes have succeeded and completely
95 /// finished, but pops cannot return `Some(t)`. This inconsistent state
96 /// happens when a pusher is pre-empted at an inopportune moment.
97 ///
98 /// This inconsistent state means that this queue does indeed have data, but
99 /// it does not currently have access to it at this time.
100 pub fn pop(&self) -> PopResult<T> {
101 unsafe {
102 let tail = *self.tail.get();
103 let next = (*tail).next.load(Ordering::Acquire);
104
105 if !next.is_null() {
106 *self.tail.get() = next;
107 assert!((*tail).value.is_none());
108 assert!((*next).value.is_some());
109 let ret = (*next).value.take().unwrap();
c34b1796 110 let _: Box<Node<T>> = Box::from_raw(tail);
1a4d82fc
JJ
111 return Data(ret);
112 }
113
114 if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
115 }
116 }
117}
118
c34b1796 119impl<T> Drop for Queue<T> {
1a4d82fc
JJ
120 fn drop(&mut self) {
121 unsafe {
122 let mut cur = *self.tail.get();
123 while !cur.is_null() {
124 let next = (*cur).next.load(Ordering::Relaxed);
c34b1796 125 let _: Box<Node<T>> = Box::from_raw(cur);
1a4d82fc
JJ
126 cur = next;
127 }
128 }
129 }
130}
131
c30ab7b3 132#[cfg(all(test, not(target_os = "emscripten")))]
1a4d82fc 133mod tests {
1a4d82fc
JJ
134 use sync::mpsc::channel;
135 use super::{Queue, Data, Empty, Inconsistent};
136 use sync::Arc;
85aaf69f 137 use thread;
1a4d82fc
JJ
138
139 #[test]
140 fn test_full() {
c34b1796 141 let q: Queue<Box<_>> = Queue::new();
85aaf69f
SL
142 q.push(box 1);
143 q.push(box 2);
1a4d82fc
JJ
144 }
145
146 #[test]
147 fn test() {
85aaf69f
SL
148 let nthreads = 8;
149 let nmsgs = 1000;
1a4d82fc
JJ
150 let q = Queue::new();
151 match q.pop() {
152 Empty => {}
153 Inconsistent | Data(..) => panic!()
154 }
155 let (tx, rx) = channel();
156 let q = Arc::new(q);
157
85aaf69f 158 for _ in 0..nthreads {
1a4d82fc
JJ
159 let tx = tx.clone();
160 let q = q.clone();
85aaf69f
SL
161 thread::spawn(move|| {
162 for i in 0..nmsgs {
1a4d82fc
JJ
163 q.push(i);
164 }
165 tx.send(()).unwrap();
166 });
167 }
168
85aaf69f 169 let mut i = 0;
1a4d82fc
JJ
170 while i < nthreads * nmsgs {
171 match q.pop() {
172 Empty | Inconsistent => {},
173 Data(_) => { i += 1 }
174 }
175 }
176 drop(tx);
85aaf69f 177 for _ in 0..nthreads {
1a4d82fc
JJ
178 rx.recv().unwrap();
179 }
180 }
181}