]>
Commit | Line | Data |
---|---|---|
7cac9316 XL |
1 | // Copyright 2017 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
1a4d82fc JJ |
10 | |
11 | //! A single-producer single-consumer concurrent queue | |
12 | //! | |
13 | //! This module contains the implementation of an SPSC queue which can be used | |
bd371182 | 14 | //! concurrently between two threads. This data structure is safe to use and |
1a4d82fc JJ |
15 | //! enforces the semantics that there is one pusher and one popper. |
16 | ||
7cac9316 XL |
17 | // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue |
18 | ||
1a4d82fc | 19 | use alloc::boxed::Box; |
85aaf69f | 20 | use core::ptr; |
1a4d82fc JJ |
21 | use core::cell::UnsafeCell; |
22 | ||
85aaf69f | 23 | use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; |
1a4d82fc JJ |
24 | |
// Node within the linked list queue of messages to send
//
// Nodes are heap-allocated and linked through raw `AtomicPtr`s; ownership is
// managed manually (see `Node::new`, `Queue::pop`, and `Drop for Queue`).
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,          // nullable for re-use of nodes: `None` while the
                               // node sits in the sentinel/cache positions
    next: AtomicPtr<Node<T>>,  // next node in the queue (null at the head)
}
33 | ||
/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
///
/// The fields are split by which thread owns them: the consumer only writes
/// the "consumer fields", the producer only writes the "producer fields", and
/// the two sides communicate exclusively through the atomics.
pub struct Queue<T> {
    // consumer fields
    tail: UnsafeCell<*mut Node<T>>, // where to pop from (sentinel node; its
                                    // `next` is the real front of the queue)
    tail_prev: AtomicPtr<Node<T>>,  // node most recently recycled to the
                                    // producer-side cache

    // producer fields
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // producer's lagging copy of
                                         // `tail_prev`, between first/tail

    // Cache maintenance fields. Additions and subtractions are stored
    // separately in order to allow them to use nonatomic addition/subtraction.
    cache_bound: usize,             // 0 means the node cache is unbounded
    cache_additions: AtomicUsize,   // incremented by the consumer only
    cache_subtractions: AtomicUsize, // incremented by the producer only
}
54 | ||
// SAFETY(review): sharing a `Queue<T>` across threads is sound only under the
// contract documented on `Queue` itself — exactly one pusher and one popper at
// a time — which is why `Queue::new` is `unsafe`. `T: Send` is required since
// values cross from the pusher's thread to the popper's.
unsafe impl<T: Send> Send for Queue<T> { }

unsafe impl<T: Send> Sync for Queue<T> { }
c34b1796 | 59 | impl<T> Node<T> { |
1a4d82fc | 60 | fn new() -> *mut Node<T> { |
62682a34 SL |
61 | Box::into_raw(box Node { |
62 | value: None, | |
63 | next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), | |
64 | }) | |
1a4d82fc JJ |
65 | } |
66 | } | |
67 | ||
impl<T> Queue<T> {
    /// Creates a new queue.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    ///   * `bound` - This queue implementation is implemented with a linked
    ///               list, and this means that a push is always a malloc. In
    ///               order to amortize this cost, an internal cache of nodes is
    ///               maintained to prevent a malloc from always being
    ///               necessary. This bound is the limit on the size of the
    ///               cache (if desired). If the value is 0, then the cache has
    ///               no bound. Otherwise, the cache will never grow larger than
    ///               `bound` (although the queue itself could be much larger).
    pub unsafe fn new(bound: usize) -> Queue<T> {
        // Start with two linked sentinel nodes `n1 -> n2`: `n2` is both the
        // initial head and tail (so the queue reads as empty), and `n1` seeds
        // the node cache via `first`/`tail_prev`/`tail_copy`.
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            tail: UnsafeCell::new(n2),
            tail_prev: AtomicPtr::new(n1),
            head: UnsafeCell::new(n2),
            first: UnsafeCell::new(n1),
            tail_copy: UnsafeCell::new(n1),
            cache_bound: bound,
            cache_additions: AtomicUsize::new(0),
            cache_subtractions: AtomicUsize::new(0),
        }
    }

    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
            // The `Release` store here publishes the node's contents to the
            // consumer, pairing with the `Acquire` load of `next` in
            // `pop`/`peek`.
            (**self.head.get()).next.store(n, Ordering::Release);
            *self.head.get() = n;
        }
    }

    // Obtains a node for the producer, reusing a cached node when one is
    // available and only falling back to `Node::new` (a malloc) otherwise.
    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        // We try to avoid as many atomic instructions as possible here, so
        // the addition to cache_subtractions is not atomic (plus we're the
        // only one subtracting from the cache).
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again. The `Acquire` load pairs with the `Release` store of
        // `tail_prev` in `pop`.
        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }

    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            // `next` becomes the new sentinel; the old sentinel `tail` is
            // either recycled into the producer's cache or freed.
            *self.tail.get() = next;
            if self.cache_bound == 0 {
                // Unbounded cache: always recycle. The `Release` store pairs
                // with the `Acquire` load in `alloc`.
                self.tail_prev.store(tail, Ordering::Release);
            } else {
                // FIXME: this is dubious with overflow.
                let additions = self.cache_additions.load(Ordering::Relaxed);
                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
                let size = additions - subtractions;

                if size < self.cache_bound {
                    self.tail_prev.store(tail, Ordering::Release);
                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
                } else {
                    // Cache is full: splice `tail` out of the cache list so the
                    // producer can never hand it out again.
                    (*self.tail_prev.load(Ordering::Relaxed))
                          .next.store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = Box::from_raw(tail);
                }
            }
            ret
        }
    }

    /// Attempts to peek at the head of the queue, returning `None` if the queue
    /// has no data currently
    ///
    /// # Warning
    /// The reference returned is invalid if it is not used before the consumer
    /// pops the value off the queue. If the producer then pushes another value
    /// onto the queue, it will overwrite the value pointed to by the reference.
    pub fn peek(&self) -> Option<&mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }
}
203 | ||
c34b1796 | 204 | impl<T> Drop for Queue<T> { |
1a4d82fc JJ |
205 | fn drop(&mut self) { |
206 | unsafe { | |
207 | let mut cur = *self.first.get(); | |
208 | while !cur.is_null() { | |
209 | let next = (*cur).next.load(Ordering::Relaxed); | |
c34b1796 | 210 | let _n: Box<Node<T>> = Box::from_raw(cur); |
1a4d82fc JJ |
211 | cur = next; |
212 | } | |
213 | } | |
214 | } | |
215 | } | |
216 | ||
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use sync::Arc;
    use super::Queue;
    use thread;
    use sync::mpsc::channel;

    // Basic single-threaded push/pop ordering with an unbounded node cache.
    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }

    // `peek` must hand out a usable reference without consuming the value;
    // the subsequent `pop` must still see it.
    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(vec![1]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }

            match queue.pop() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                },
                None => unreachable!()
            }
        }
    }

    // Dropping a non-empty queue must free the boxed elements; failures here
    // surface as leaks/aborts rather than assertion failures.
    #[test]
    fn drop_full() {
        unsafe {
            let q: Queue<Box<_>> = Queue::new(0);
            q.push(box 1);
            q.push(box 2);
        }
    }

    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::new(0);
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }

    // Hammer the queue from two threads (one pusher, one popper) with both an
    // unbounded (0) and a tiny (1) node cache.
    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: usize) {
            let q = Arc::new(Queue::new(bound));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = thread::spawn(move|| {
                // Spin-pop until each pushed value arrives; anything other
                // than the expected `1` indicates corruption.
                for _ in 0..100000 {
                    loop {
                        match q2.pop() {
                            Some(1) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in 0..100000 {
                q.push(1);
            }
            rx.recv().unwrap();
        }
    }
}