]>
Commit | Line | Data |
---|---|---|
7cac9316 XL |
1 | // Copyright 2017 The Rust Project Developers. See the COPYRIGHT |
2 | // file at the top-level directory of this distribution and at | |
3 | // http://rust-lang.org/COPYRIGHT. | |
4 | // | |
5 | // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
6 | // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
7 | // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
8 | // option. This file may not be copied, modified, or distributed | |
9 | // except according to those terms. | |
1a4d82fc JJ |
10 | |
11 | //! A mostly lock-free multi-producer, single consumer queue. | |
12 | //! | |
13 | //! This module contains an implementation of a concurrent MPSC queue. This | |
bd371182 | 14 | //! queue can be used to share data between threads, and is also used as the |
1a4d82fc JJ |
15 | //! building block of channels in rust. |
16 | //! | |
17 | //! Note that the current implementation of this queue has a caveat of the `pop` | |
18 | //! method, and see the method for more information about it. Due to this | |
19 | //! caveat, this queue may not be appropriate for all use-cases. | |
20 | ||
1a4d82fc JJ |
21 | // http://www.1024cores.net/home/lock-free-algorithms |
22 | // /queues/non-intrusive-mpsc-node-based-queue | |
23 | ||
24 | pub use self::PopResult::*; | |
25 | ||
1a4d82fc | 26 | use alloc::boxed::Box; |
85aaf69f | 27 | use core::ptr; |
1a4d82fc JJ |
28 | use core::cell::UnsafeCell; |
29 | ||
30 | use sync::atomic::{AtomicPtr, Ordering}; | |
31 | ||
32 | /// A result of the `pop` function. | |
33 | pub enum PopResult<T> { | |
34 | /// Some data has been popped | |
35 | Data(T), | |
36 | /// The queue is empty | |
37 | Empty, | |
38 | /// The queue is in an inconsistent state. Popping data should succeed, but | |
39 | /// some pushers have yet to make enough progress in order allow a pop to | |
40 | /// succeed. It is recommended that a pop() occur "in the near future" in | |
41 | /// order to see if the sender has made progress or not | |
42 | Inconsistent, | |
43 | } | |
44 | ||
45 | struct Node<T> { | |
46 | next: AtomicPtr<Node<T>>, | |
47 | value: Option<T>, | |
48 | } | |
49 | ||
50 | /// The multi-producer single-consumer structure. This is not cloneable, but it | |
51 | /// may be safely shared so long as it is guaranteed that there is only one | |
52 | /// popper at a time (many pushers are allowed). | |
53 | pub struct Queue<T> { | |
54 | head: AtomicPtr<Node<T>>, | |
55 | tail: UnsafeCell<*mut Node<T>>, | |
56 | } | |
57 | ||
c34b1796 AL |
58 | unsafe impl<T: Send> Send for Queue<T> { } |
59 | unsafe impl<T: Send> Sync for Queue<T> { } | |
1a4d82fc JJ |
60 | |
61 | impl<T> Node<T> { | |
62 | unsafe fn new(v: Option<T>) -> *mut Node<T> { | |
62682a34 | 63 | Box::into_raw(box Node { |
85aaf69f | 64 | next: AtomicPtr::new(ptr::null_mut()), |
1a4d82fc JJ |
65 | value: v, |
66 | }) | |
67 | } | |
68 | } | |
69 | ||
c34b1796 | 70 | impl<T> Queue<T> { |
1a4d82fc JJ |
71 | /// Creates a new queue that is safe to share among multiple producers and |
72 | /// one consumer. | |
73 | pub fn new() -> Queue<T> { | |
74 | let stub = unsafe { Node::new(None) }; | |
75 | Queue { | |
76 | head: AtomicPtr::new(stub), | |
77 | tail: UnsafeCell::new(stub), | |
78 | } | |
79 | } | |
80 | ||
81 | /// Pushes a new value onto this queue. | |
82 | pub fn push(&self, t: T) { | |
83 | unsafe { | |
84 | let n = Node::new(Some(t)); | |
85 | let prev = self.head.swap(n, Ordering::AcqRel); | |
86 | (*prev).next.store(n, Ordering::Release); | |
87 | } | |
88 | } | |
89 | ||
90 | /// Pops some data from this queue. | |
91 | /// | |
92 | /// Note that the current implementation means that this function cannot | |
93 | /// return `Option<T>`. It is possible for this queue to be in an | |
94 | /// inconsistent state where many pushes have succeeded and completely | |
95 | /// finished, but pops cannot return `Some(t)`. This inconsistent state | |
96 | /// happens when a pusher is pre-empted at an inopportune moment. | |
97 | /// | |
98 | /// This inconsistent state means that this queue does indeed have data, but | |
99 | /// it does not currently have access to it at this time. | |
100 | pub fn pop(&self) -> PopResult<T> { | |
101 | unsafe { | |
102 | let tail = *self.tail.get(); | |
103 | let next = (*tail).next.load(Ordering::Acquire); | |
104 | ||
105 | if !next.is_null() { | |
106 | *self.tail.get() = next; | |
107 | assert!((*tail).value.is_none()); | |
108 | assert!((*next).value.is_some()); | |
109 | let ret = (*next).value.take().unwrap(); | |
c34b1796 | 110 | let _: Box<Node<T>> = Box::from_raw(tail); |
1a4d82fc JJ |
111 | return Data(ret); |
112 | } | |
113 | ||
114 | if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} | |
115 | } | |
116 | } | |
117 | } | |
118 | ||
c34b1796 | 119 | impl<T> Drop for Queue<T> { |
1a4d82fc JJ |
120 | fn drop(&mut self) { |
121 | unsafe { | |
122 | let mut cur = *self.tail.get(); | |
123 | while !cur.is_null() { | |
124 | let next = (*cur).next.load(Ordering::Relaxed); | |
c34b1796 | 125 | let _: Box<Node<T>> = Box::from_raw(cur); |
1a4d82fc JJ |
126 | cur = next; |
127 | } | |
128 | } | |
129 | } | |
130 | } | |
131 | ||
c30ab7b3 | 132 | #[cfg(all(test, not(target_os = "emscripten")))] |
1a4d82fc | 133 | mod tests { |
1a4d82fc JJ |
134 | use sync::mpsc::channel; |
135 | use super::{Queue, Data, Empty, Inconsistent}; | |
136 | use sync::Arc; | |
85aaf69f | 137 | use thread; |
1a4d82fc JJ |
138 | |
139 | #[test] | |
140 | fn test_full() { | |
c34b1796 | 141 | let q: Queue<Box<_>> = Queue::new(); |
85aaf69f SL |
142 | q.push(box 1); |
143 | q.push(box 2); | |
1a4d82fc JJ |
144 | } |
145 | ||
146 | #[test] | |
147 | fn test() { | |
85aaf69f SL |
148 | let nthreads = 8; |
149 | let nmsgs = 1000; | |
1a4d82fc JJ |
150 | let q = Queue::new(); |
151 | match q.pop() { | |
152 | Empty => {} | |
153 | Inconsistent | Data(..) => panic!() | |
154 | } | |
155 | let (tx, rx) = channel(); | |
156 | let q = Arc::new(q); | |
157 | ||
85aaf69f | 158 | for _ in 0..nthreads { |
1a4d82fc JJ |
159 | let tx = tx.clone(); |
160 | let q = q.clone(); | |
85aaf69f SL |
161 | thread::spawn(move|| { |
162 | for i in 0..nmsgs { | |
1a4d82fc JJ |
163 | q.push(i); |
164 | } | |
165 | tx.send(()).unwrap(); | |
166 | }); | |
167 | } | |
168 | ||
85aaf69f | 169 | let mut i = 0; |
1a4d82fc JJ |
170 | while i < nthreads * nmsgs { |
171 | match q.pop() { | |
172 | Empty | Inconsistent => {}, | |
173 | Data(_) => { i += 1 } | |
174 | } | |
175 | } | |
176 | drop(tx); | |
85aaf69f | 177 | for _ in 0..nthreads { |
1a4d82fc JJ |
178 | rx.recv().unwrap(); |
179 | } | |
180 | } | |
181 | } |