]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. |
2 | * Redistribution and use in source and binary forms, with or without | |
3 | * modification, are permitted provided that the following conditions are met: | |
4 | * | |
5 | * 1. Redistributions of source code must retain the above copyright notice, | |
6 | * this list of conditions and the following disclaimer. | |
7 | * | |
8 | * 2. Redistributions in binary form must reproduce the above copyright | |
9 | * notice, this list of conditions and the following disclaimer in the | |
10 | * documentation and/or other materials provided with the distribution. | |
11 | * | |
12 | * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED | |
13 | * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF | |
14 | * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT | |
15 | * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, | |
16 | * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT | |
17 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR | |
18 | * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF | |
19 | * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE | |
20 | * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF | |
21 | * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
22 | * | |
23 | * The views and conclusions contained in the software and documentation are | |
24 | * those of the authors and should not be interpreted as representing official | |
25 | * policies, either expressed or implied, of Dmitry Vyukov. | |
26 | */ | |
27 | ||
28 | //! A mostly lock-free multi-producer, single consumer queue. | |
29 | //! | |
30 | //! This module contains an implementation of a concurrent MPSC queue. This | |
bd371182 | 31 | //! queue can be used to share data between threads, and is also used as the |
1a4d82fc JJ |
32 | //! building block of channels in rust. |
33 | //! | |
34 | //! Note that the current implementation of this queue has a caveat of the `pop` | |
35 | //! method, and see the method for more information about it. Due to this | |
36 | //! caveat, this queue may not be appropriate for all use-cases. | |
37 | ||
1a4d82fc JJ |
38 | // http://www.1024cores.net/home/lock-free-algorithms |
39 | // /queues/non-intrusive-mpsc-node-based-queue | |
40 | ||
41 | pub use self::PopResult::*; | |
42 | ||
43 | use core::prelude::*; | |
44 | ||
45 | use alloc::boxed::Box; | |
85aaf69f | 46 | use core::ptr; |
1a4d82fc JJ |
47 | use core::cell::UnsafeCell; |
48 | ||
49 | use sync::atomic::{AtomicPtr, Ordering}; | |
50 | ||
51 | /// A result of the `pop` function. | |
52 | pub enum PopResult<T> { | |
53 | /// Some data has been popped | |
54 | Data(T), | |
55 | /// The queue is empty | |
56 | Empty, | |
57 | /// The queue is in an inconsistent state. Popping data should succeed, but | |
58 | /// some pushers have yet to make enough progress in order allow a pop to | |
59 | /// succeed. It is recommended that a pop() occur "in the near future" in | |
60 | /// order to see if the sender has made progress or not | |
61 | Inconsistent, | |
62 | } | |
63 | ||
64 | struct Node<T> { | |
65 | next: AtomicPtr<Node<T>>, | |
66 | value: Option<T>, | |
67 | } | |
68 | ||
69 | /// The multi-producer single-consumer structure. This is not cloneable, but it | |
70 | /// may be safely shared so long as it is guaranteed that there is only one | |
71 | /// popper at a time (many pushers are allowed). | |
72 | pub struct Queue<T> { | |
73 | head: AtomicPtr<Node<T>>, | |
74 | tail: UnsafeCell<*mut Node<T>>, | |
75 | } | |
76 | ||
c34b1796 AL |
77 | unsafe impl<T: Send> Send for Queue<T> { } |
78 | unsafe impl<T: Send> Sync for Queue<T> { } | |
1a4d82fc JJ |
79 | |
80 | impl<T> Node<T> { | |
81 | unsafe fn new(v: Option<T>) -> *mut Node<T> { | |
62682a34 | 82 | Box::into_raw(box Node { |
85aaf69f | 83 | next: AtomicPtr::new(ptr::null_mut()), |
1a4d82fc JJ |
84 | value: v, |
85 | }) | |
86 | } | |
87 | } | |
88 | ||
c34b1796 | 89 | impl<T> Queue<T> { |
1a4d82fc JJ |
90 | /// Creates a new queue that is safe to share among multiple producers and |
91 | /// one consumer. | |
92 | pub fn new() -> Queue<T> { | |
93 | let stub = unsafe { Node::new(None) }; | |
94 | Queue { | |
95 | head: AtomicPtr::new(stub), | |
96 | tail: UnsafeCell::new(stub), | |
97 | } | |
98 | } | |
99 | ||
100 | /// Pushes a new value onto this queue. | |
101 | pub fn push(&self, t: T) { | |
102 | unsafe { | |
103 | let n = Node::new(Some(t)); | |
104 | let prev = self.head.swap(n, Ordering::AcqRel); | |
105 | (*prev).next.store(n, Ordering::Release); | |
106 | } | |
107 | } | |
108 | ||
109 | /// Pops some data from this queue. | |
110 | /// | |
111 | /// Note that the current implementation means that this function cannot | |
112 | /// return `Option<T>`. It is possible for this queue to be in an | |
113 | /// inconsistent state where many pushes have succeeded and completely | |
114 | /// finished, but pops cannot return `Some(t)`. This inconsistent state | |
115 | /// happens when a pusher is pre-empted at an inopportune moment. | |
116 | /// | |
117 | /// This inconsistent state means that this queue does indeed have data, but | |
118 | /// it does not currently have access to it at this time. | |
119 | pub fn pop(&self) -> PopResult<T> { | |
120 | unsafe { | |
121 | let tail = *self.tail.get(); | |
122 | let next = (*tail).next.load(Ordering::Acquire); | |
123 | ||
124 | if !next.is_null() { | |
125 | *self.tail.get() = next; | |
126 | assert!((*tail).value.is_none()); | |
127 | assert!((*next).value.is_some()); | |
128 | let ret = (*next).value.take().unwrap(); | |
c34b1796 | 129 | let _: Box<Node<T>> = Box::from_raw(tail); |
1a4d82fc JJ |
130 | return Data(ret); |
131 | } | |
132 | ||
133 | if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} | |
134 | } | |
135 | } | |
136 | } | |
137 | ||
85aaf69f | 138 | #[stable(feature = "rust1", since = "1.0.0")] |
c34b1796 | 139 | impl<T> Drop for Queue<T> { |
1a4d82fc JJ |
140 | fn drop(&mut self) { |
141 | unsafe { | |
142 | let mut cur = *self.tail.get(); | |
143 | while !cur.is_null() { | |
144 | let next = (*cur).next.load(Ordering::Relaxed); | |
c34b1796 | 145 | let _: Box<Node<T>> = Box::from_raw(cur); |
1a4d82fc JJ |
146 | cur = next; |
147 | } | |
148 | } | |
149 | } | |
150 | } | |
151 | ||
152 | #[cfg(test)] | |
153 | mod tests { | |
154 | use prelude::v1::*; | |
155 | ||
156 | use sync::mpsc::channel; | |
157 | use super::{Queue, Data, Empty, Inconsistent}; | |
158 | use sync::Arc; | |
85aaf69f | 159 | use thread; |
1a4d82fc JJ |
160 | |
161 | #[test] | |
162 | fn test_full() { | |
c34b1796 | 163 | let q: Queue<Box<_>> = Queue::new(); |
85aaf69f SL |
164 | q.push(box 1); |
165 | q.push(box 2); | |
1a4d82fc JJ |
166 | } |
167 | ||
168 | #[test] | |
169 | fn test() { | |
85aaf69f SL |
170 | let nthreads = 8; |
171 | let nmsgs = 1000; | |
1a4d82fc JJ |
172 | let q = Queue::new(); |
173 | match q.pop() { | |
174 | Empty => {} | |
175 | Inconsistent | Data(..) => panic!() | |
176 | } | |
177 | let (tx, rx) = channel(); | |
178 | let q = Arc::new(q); | |
179 | ||
85aaf69f | 180 | for _ in 0..nthreads { |
1a4d82fc JJ |
181 | let tx = tx.clone(); |
182 | let q = q.clone(); | |
85aaf69f SL |
183 | thread::spawn(move|| { |
184 | for i in 0..nmsgs { | |
1a4d82fc JJ |
185 | q.push(i); |
186 | } | |
187 | tx.send(()).unwrap(); | |
188 | }); | |
189 | } | |
190 | ||
85aaf69f | 191 | let mut i = 0; |
1a4d82fc JJ |
192 | while i < nthreads * nmsgs { |
193 | match q.pop() { | |
194 | Empty | Inconsistent => {}, | |
195 | Data(_) => { i += 1 } | |
196 | } | |
197 | } | |
198 | drop(tx); | |
85aaf69f | 199 | for _ in 0..nthreads { |
1a4d82fc JJ |
200 | rx.recv().unwrap(); |
201 | } | |
202 | } | |
203 | } |