]>
Commit | Line | Data |
---|---|---|
1a4d82fc JJ |
1 | //! A single-producer single-consumer concurrent queue |
2 | //! | |
3 | //! This module contains the implementation of an SPSC queue which can be used | |
bd371182 | 4 | //! concurrently between two threads. This data structure is safe to use and |
1a4d82fc JJ |
5 | //! enforces the semantics that there is one pusher and one popper. |
6 | ||
7cac9316 XL |
7 | // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue |
8 | ||
1a4d82fc | 9 | use core::cell::UnsafeCell; |
60c5eb7d | 10 | use core::ptr; |
1a4d82fc | 11 | |
532ac7d7 XL |
12 | use crate::boxed::Box; |
13 | use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; | |
1a4d82fc | 14 | |
abe05a73 XL |
15 | use super::cache_aligned::CacheAligned; |
16 | ||
1a4d82fc JJ |
// One link in the singly-linked list of messages flowing from the producer
// to the consumer.
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,         // `None` once consumed, allowing the node to be reused
    cached: bool,             // whether this node is counted in the node cache
    next: AtomicPtr<Node<T>>, // successor in the queue; null at the end of the list
}
26 | ||
27 | /// The single-producer single-consumer queue. This structure is not cloneable, | |
28 | /// but it can be safely shared in an Arc if it is guaranteed that there | |
29 | /// is only one popper and one pusher touching the queue at any one point in | |
30 | /// time. | |
60c5eb7d | 31 | pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> { |
1a4d82fc | 32 | // consumer fields |
abe05a73 XL |
33 | consumer: CacheAligned<Consumer<T, ConsumerAddition>>, |
34 | ||
35 | // producer fields | |
36 | producer: CacheAligned<Producer<T, ProducerAddition>>, | |
37 | } | |
38 | ||
39 | struct Consumer<T, Addition> { | |
1a4d82fc | 40 | tail: UnsafeCell<*mut Node<T>>, // where to pop from |
60c5eb7d XL |
41 | tail_prev: AtomicPtr<Node<T>>, // where to pop from |
42 | cache_bound: usize, // maximum cache size | |
43 | cached_nodes: AtomicUsize, // number of nodes marked as cachable | |
abe05a73 XL |
44 | addition: Addition, |
45 | } | |
1a4d82fc | 46 | |
abe05a73 | 47 | struct Producer<T, Addition> { |
1a4d82fc JJ |
48 | head: UnsafeCell<*mut Node<T>>, // where to push to |
49 | first: UnsafeCell<*mut Node<T>>, // where to get new nodes from | |
50 | tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail | |
abe05a73 | 51 | addition: Addition, |
1a4d82fc JJ |
52 | } |
53 | ||
60c5eb7d | 54 | unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {} |
1a4d82fc | 55 | |
60c5eb7d | 56 | unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {} |
1a4d82fc | 57 | |
c34b1796 | 58 | impl<T> Node<T> { |
1a4d82fc | 59 | fn new() -> *mut Node<T> { |
62682a34 SL |
60 | Box::into_raw(box Node { |
61 | value: None, | |
abe05a73 | 62 | cached: false, |
62682a34 SL |
63 | next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), |
64 | }) | |
1a4d82fc JJ |
65 | } |
66 | } | |
67 | ||
abe05a73 | 68 | impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { |
abe05a73 XL |
69 | /// Creates a new queue. With given additional elements in the producer and |
70 | /// consumer portions of the queue. | |
71 | /// | |
72 | /// Due to the performance implications of cache-contention, | |
73 | /// we wish to keep fields used mainly by the producer on a separate cache | |
74 | /// line than those used by the consumer. | |
75 | /// Since cache lines are usually 64 bytes, it is unreasonably expensive to | |
76 | /// allocate one for small fields, so we allow users to insert additional | |
77 | /// fields into the cache lines already allocated by this for the producer | |
78 | /// and consumer. | |
1a4d82fc JJ |
79 | /// |
80 | /// This is unsafe as the type system doesn't enforce a single | |
81 | /// consumer-producer relationship. It also allows the consumer to `pop` | |
82 | /// items while there is a `peek` active due to all methods having a | |
83 | /// non-mutable receiver. | |
84 | /// | |
85 | /// # Arguments | |
86 | /// | |
87 | /// * `bound` - This queue implementation is implemented with a linked | |
88 | /// list, and this means that a push is always a malloc. In | |
89 | /// order to amortize this cost, an internal cache of nodes is | |
90 | /// maintained to prevent a malloc from always being | |
91 | /// necessary. This bound is the limit on the size of the | |
92 | /// cache (if desired). If the value is 0, then the cache has | |
93 | /// no bound. Otherwise, the cache will never grow larger than | |
94 | /// `bound` (although the queue itself could be much larger. | |
abe05a73 XL |
95 | pub unsafe fn with_additions( |
96 | bound: usize, | |
97 | producer_addition: ProducerAddition, | |
98 | consumer_addition: ConsumerAddition, | |
99 | ) -> Self { | |
1a4d82fc JJ |
100 | let n1 = Node::new(); |
101 | let n2 = Node::new(); | |
102 | (*n1).next.store(n2, Ordering::Relaxed); | |
103 | Queue { | |
abe05a73 XL |
104 | consumer: CacheAligned::new(Consumer { |
105 | tail: UnsafeCell::new(n2), | |
106 | tail_prev: AtomicPtr::new(n1), | |
107 | cache_bound: bound, | |
108 | cached_nodes: AtomicUsize::new(0), | |
60c5eb7d | 109 | addition: consumer_addition, |
abe05a73 XL |
110 | }), |
111 | producer: CacheAligned::new(Producer { | |
112 | head: UnsafeCell::new(n2), | |
113 | first: UnsafeCell::new(n1), | |
114 | tail_copy: UnsafeCell::new(n1), | |
60c5eb7d | 115 | addition: producer_addition, |
abe05a73 | 116 | }), |
1a4d82fc JJ |
117 | } |
118 | } | |
119 | ||
120 | /// Pushes a new value onto this queue. Note that to use this function | |
121 | /// safely, it must be externally guaranteed that there is only one pusher. | |
122 | pub fn push(&self, t: T) { | |
123 | unsafe { | |
124 | // Acquire a node (which either uses a cached one or allocates a new | |
125 | // one), and then append this to the 'head' node. | |
126 | let n = self.alloc(); | |
127 | assert!((*n).value.is_none()); | |
128 | (*n).value = Some(t); | |
85aaf69f | 129 | (*n).next.store(ptr::null_mut(), Ordering::Relaxed); |
abe05a73 XL |
130 | (**self.producer.head.get()).next.store(n, Ordering::Release); |
131 | *(&self.producer.head).get() = n; | |
1a4d82fc JJ |
132 | } |
133 | } | |
134 | ||
135 | unsafe fn alloc(&self) -> *mut Node<T> { | |
136 | // First try to see if we can consume the 'first' node for our uses. | |
abe05a73 XL |
137 | if *self.producer.first.get() != *self.producer.tail_copy.get() { |
138 | let ret = *self.producer.first.get(); | |
139 | *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); | |
1a4d82fc JJ |
140 | return ret; |
141 | } | |
142 | // If the above fails, then update our copy of the tail and try | |
143 | // again. | |
60c5eb7d | 144 | *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); |
abe05a73 XL |
145 | if *self.producer.first.get() != *self.producer.tail_copy.get() { |
146 | let ret = *self.producer.first.get(); | |
147 | *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); | |
1a4d82fc JJ |
148 | return ret; |
149 | } | |
150 | // If all of that fails, then we have to allocate a new node | |
151 | // (there's nothing in the node cache). | |
152 | Node::new() | |
153 | } | |
154 | ||
155 | /// Attempts to pop a value from this queue. Remember that to use this type | |
156 | /// safely you must ensure that there is only one popper at a time. | |
157 | pub fn pop(&self) -> Option<T> { | |
158 | unsafe { | |
159 | // The `tail` node is not actually a used node, but rather a | |
160 | // sentinel from where we should start popping from. Hence, look at | |
161 | // tail's next field and see if we can use it. If we do a pop, then | |
162 | // the current tail node is a candidate for going into the cache. | |
abe05a73 | 163 | let tail = *self.consumer.tail.get(); |
1a4d82fc | 164 | let next = (*tail).next.load(Ordering::Acquire); |
60c5eb7d XL |
165 | if next.is_null() { |
166 | return None; | |
167 | } | |
1a4d82fc JJ |
168 | assert!((*next).value.is_some()); |
169 | let ret = (*next).value.take(); | |
170 | ||
abe05a73 XL |
171 | *self.consumer.0.tail.get() = next; |
172 | if self.consumer.cache_bound == 0 { | |
173 | self.consumer.tail_prev.store(tail, Ordering::Release); | |
1a4d82fc | 174 | } else { |
abe05a73 XL |
175 | let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); |
176 | if cached_nodes < self.consumer.cache_bound && !(*tail).cached { | |
177 | self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); | |
178 | (*tail).cached = true; | |
179 | } | |
180 | ||
181 | if (*tail).cached { | |
182 | self.consumer.tail_prev.store(tail, Ordering::Release); | |
1a4d82fc | 183 | } else { |
abe05a73 | 184 | (*self.consumer.tail_prev.load(Ordering::Relaxed)) |
60c5eb7d XL |
185 | .next |
186 | .store(next, Ordering::Relaxed); | |
1a4d82fc JJ |
187 | // We have successfully erased all references to 'tail', so |
188 | // now we can safely drop it. | |
c34b1796 | 189 | let _: Box<Node<T>> = Box::from_raw(tail); |
1a4d82fc JJ |
190 | } |
191 | } | |
e9174d1e | 192 | ret |
1a4d82fc JJ |
193 | } |
194 | } | |
195 | ||
196 | /// Attempts to peek at the head of the queue, returning `None` if the queue | |
197 | /// has no data currently | |
198 | /// | |
199 | /// # Warning | |
200 | /// The reference returned is invalid if it is not used before the consumer | |
201 | /// pops the value off the queue. If the producer then pushes another value | |
202 | /// onto the queue, it will overwrite the value pointed to by the reference. | |
e9174d1e | 203 | pub fn peek(&self) -> Option<&mut T> { |
1a4d82fc JJ |
204 | // This is essentially the same as above with all the popping bits |
205 | // stripped out. | |
206 | unsafe { | |
abe05a73 | 207 | let tail = *self.consumer.tail.get(); |
1a4d82fc | 208 | let next = (*tail).next.load(Ordering::Acquire); |
e9174d1e | 209 | if next.is_null() { None } else { (*next).value.as_mut() } |
1a4d82fc JJ |
210 | } |
211 | } | |
abe05a73 XL |
212 | |
213 | pub fn producer_addition(&self) -> &ProducerAddition { | |
214 | &self.producer.addition | |
215 | } | |
216 | ||
217 | pub fn consumer_addition(&self) -> &ConsumerAddition { | |
218 | &self.consumer.addition | |
219 | } | |
1a4d82fc JJ |
220 | } |
221 | ||
abe05a73 | 222 | impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> { |
1a4d82fc JJ |
223 | fn drop(&mut self) { |
224 | unsafe { | |
abe05a73 | 225 | let mut cur = *self.producer.first.get(); |
1a4d82fc JJ |
226 | while !cur.is_null() { |
227 | let next = (*cur).next.load(Ordering::Relaxed); | |
c34b1796 | 228 | let _n: Box<Node<T>> = Box::from_raw(cur); |
1a4d82fc JJ |
229 | cur = next; |
230 | } | |
231 | } | |
232 | } | |
233 | } | |
234 | ||
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
    use super::Queue;
    use crate::sync::mpsc::channel;
    use crate::sync::Arc;
    use crate::thread;

    // Basic FIFO ordering and emptiness behavior with an unbounded cache.
    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }

    // `peek` returns a live reference without consuming the element.
    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::with_additions(0, (), ());
            queue.push(vec![1]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                }
                None => unreachable!(),
            }

            match queue.pop() {
                Some(vec) => {
                    assert_eq!(&*vec, &[1]);
                }
                None => unreachable!(),
            }
        }
    }

    // Dropping a non-empty queue must free the queued boxes without leaking.
    #[test]
    fn drop_full() {
        unsafe {
            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
            // `Box::new` replaces the unstable `box` syntax.
            q.push(Box::new(1));
            q.push(Box::new(2));
        }
    }

    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::with_additions(0, (), ());
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }

    // Cross-thread stress: one pusher thread, one popper thread, with both
    // an unbounded (0) and a tiny (1) node cache.
    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: usize) {
            let q = Arc::new(Queue::with_additions(bound, (), ()));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = thread::spawn(move || {
                for _ in 0..100000 {
                    loop {
                        match q2.pop() {
                            Some(1) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in 0..100000 {
                q.push(1);
            }
            rx.recv().unwrap();
        }
    }
}