]> git.proxmox.com Git - rustc.git/blame - src/libstd/sync/mpsc/mpsc_queue.rs
Imported Upstream version 1.2.0+dfsg1
[rustc.git] / src / libstd / sync / mpsc / mpsc_queue.rs
CommitLineData
1a4d82fc
JJ
1/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2 * Redistribution and use in source and binary forms, with or without
3 * modification, are permitted provided that the following conditions are met:
4 *
5 * 1. Redistributions of source code must retain the above copyright notice,
6 * this list of conditions and the following disclaimer.
7 *
8 * 2. Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
11 *
12 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22 *
23 * The views and conclusions contained in the software and documentation are
24 * those of the authors and should not be interpreted as representing official
25 * policies, either expressed or implied, of Dmitry Vyukov.
26 */
27
//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between threads, and is also used as the
//! building block of channels in Rust.
//!
//! Note that the current implementation of this queue has a caveat in its
//! `pop` method; see that method's documentation for details. Due to this
//! caveat, this queue may not be appropriate for all use cases.
37
1a4d82fc
JJ
38// http://www.1024cores.net/home/lock-free-algorithms
39// /queues/non-intrusive-mpsc-node-based-queue
40
41pub use self::PopResult::*;
42
43use core::prelude::*;
44
45use alloc::boxed::Box;
85aaf69f 46use core::ptr;
1a4d82fc
JJ
47use core::cell::UnsafeCell;
48
49use sync::atomic::{AtomicPtr, Ordering};
50
/// The outcome of a single `pop` attempt on the queue.
pub enum PopResult<T> {
    /// A value was successfully removed from the queue.
    Data(T),
    /// There is nothing in the queue.
    Empty,
    /// The queue is in an inconsistent state: popping data should succeed,
    /// but some pushers have not yet made enough progress to allow a pop to
    /// succeed. Callers are advised to call `pop()` again "in the near
    /// future" to see whether the sender has made progress.
    Inconsistent,
}
63
/// One heap-allocated link of the queue's singly linked list.
struct Node<T> {
    /// Pointer to the node pushed after this one. It starts out null and is
    /// filled in by the pusher that enqueues the following node.
    next: AtomicPtr<Node<T>>,
    /// The payload carried by this node. The stub node created with the
    /// queue holds `None`, and `pop` takes the value out of a node when it
    /// hands the data to the consumer.
    value: Option<T>,
}
68
69/// The multi-producer single-consumer structure. This is not cloneable, but it
70/// may be safely shared so long as it is guaranteed that there is only one
71/// popper at a time (many pushers are allowed).
72pub struct Queue<T> {
73 head: AtomicPtr<Node<T>>,
74 tail: UnsafeCell<*mut Node<T>>,
75}
76
c34b1796
AL
77unsafe impl<T: Send> Send for Queue<T> { }
78unsafe impl<T: Send> Sync for Queue<T> { }
1a4d82fc
JJ
79
80impl<T> Node<T> {
81 unsafe fn new(v: Option<T>) -> *mut Node<T> {
62682a34 82 Box::into_raw(box Node {
85aaf69f 83 next: AtomicPtr::new(ptr::null_mut()),
1a4d82fc
JJ
84 value: v,
85 })
86 }
87}
88
c34b1796 89impl<T> Queue<T> {
1a4d82fc
JJ
90 /// Creates a new queue that is safe to share among multiple producers and
91 /// one consumer.
92 pub fn new() -> Queue<T> {
93 let stub = unsafe { Node::new(None) };
94 Queue {
95 head: AtomicPtr::new(stub),
96 tail: UnsafeCell::new(stub),
97 }
98 }
99
100 /// Pushes a new value onto this queue.
101 pub fn push(&self, t: T) {
102 unsafe {
103 let n = Node::new(Some(t));
104 let prev = self.head.swap(n, Ordering::AcqRel);
105 (*prev).next.store(n, Ordering::Release);
106 }
107 }
108
109 /// Pops some data from this queue.
110 ///
111 /// Note that the current implementation means that this function cannot
112 /// return `Option<T>`. It is possible for this queue to be in an
113 /// inconsistent state where many pushes have succeeded and completely
114 /// finished, but pops cannot return `Some(t)`. This inconsistent state
115 /// happens when a pusher is pre-empted at an inopportune moment.
116 ///
117 /// This inconsistent state means that this queue does indeed have data, but
118 /// it does not currently have access to it at this time.
119 pub fn pop(&self) -> PopResult<T> {
120 unsafe {
121 let tail = *self.tail.get();
122 let next = (*tail).next.load(Ordering::Acquire);
123
124 if !next.is_null() {
125 *self.tail.get() = next;
126 assert!((*tail).value.is_none());
127 assert!((*next).value.is_some());
128 let ret = (*next).value.take().unwrap();
c34b1796 129 let _: Box<Node<T>> = Box::from_raw(tail);
1a4d82fc
JJ
130 return Data(ret);
131 }
132
133 if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
134 }
135 }
136}
137
85aaf69f 138#[stable(feature = "rust1", since = "1.0.0")]
c34b1796 139impl<T> Drop for Queue<T> {
1a4d82fc
JJ
140 fn drop(&mut self) {
141 unsafe {
142 let mut cur = *self.tail.get();
143 while !cur.is_null() {
144 let next = (*cur).next.load(Ordering::Relaxed);
c34b1796 145 let _: Box<Node<T>> = Box::from_raw(cur);
1a4d82fc
JJ
146 cur = next;
147 }
148 }
149 }
150}
151
#[cfg(test)]
mod tests {
    use prelude::v1::*;

    use sync::mpsc::channel;
    use super::{Queue, Data, Empty, Inconsistent};
    use sync::Arc;
    use thread;

    /// Pushes boxed values and never pops them, so the queue is dropped with
    /// nodes still linked.
    #[test]
    fn test_full() {
        let q: Queue<Box<_>> = Queue::new();
        q.push(Box::new(1));
        q.push(Box::new(2));
    }

    /// Several producer threads push concurrently while this thread pops
    /// until every message has been received.
    #[test]
    fn test() {
        let nthreads = 8;
        let nmsgs = 1000;
        let queue = Queue::new();
        // A freshly created queue must report `Empty`.
        match queue.pop() {
            Empty => {}
            Inconsistent | Data(..) => panic!()
        }
        let (done_tx, done_rx) = channel();
        let queue = Arc::new(queue);

        for _ in 0..nthreads {
            let done_tx = done_tx.clone();
            let producer_queue = queue.clone();
            thread::spawn(move|| {
                for i in 0..nmsgs {
                    producer_queue.push(i);
                }
                done_tx.send(()).unwrap();
            });
        }

        // Spin on `pop` until all messages have arrived; `Empty` and
        // `Inconsistent` both just mean "try again".
        let expected = nthreads * nmsgs;
        let mut received = 0;
        while received < expected {
            if let Data(_) = queue.pop() {
                received += 1;
            }
        }
        drop(done_tx);
        // Wait for every producer to signal completion.
        for _ in 0..nthreads {
            done_rx.recv().unwrap();
        }
    }
}