//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
9 | #[cfg(all(test, not(target_os = "emscripten")))] |
10 | mod tests; | |
11 | ||
use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;
// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,         // nullable for re-use of nodes; `None` once popped
    cached: bool,             // this node goes into the node cache (set at most once by `pop`)
    next: AtomicPtr<Node<T>>, // next node in the queue; null marks the end
}
/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
    // The two halves are wrapped in `CacheAligned` so the fields touched by
    // the consumer and those touched by the producer live on separate cache
    // lines, avoiding false sharing between the two threads.

    // consumer fields
    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,

    // producer fields
    producer: CacheAligned<Producer<T, ProducerAddition>>,
}
// Consumer-side state; only the single popper thread writes these fields.
struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // where to pop from (sentinel; `tail.next` holds the value)
    tail_prev: AtomicPtr<Node<T>>,  // node behind `tail`; the producer reads this to reclaim popped nodes
    cache_bound: usize,             // maximum cache size (0 means unbounded)
    cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
    addition: Addition,             // user-supplied extras sharing the consumer's cache line
}
// Producer-side state; only the single pusher thread writes these fields.
struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new (recycled) nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // producer's stale copy of the consumer's `tail_prev`; between first/tail
    addition: Addition,                  // user-supplied extras sharing the producer's cache line
}
// SAFETY(review): `T: Send` is required because values of `T` cross from the
// pusher thread to the popper thread; the additions must be `Send + Sync`
// since they are reachable by shared reference from either side. Note these
// impls do NOT enforce the single-pusher/single-popper contract — that is the
// caller's obligation via the `unsafe` constructor. Confirm against the
// module docs if this queue is reused elsewhere.
unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
c34b1796 | 61 | impl<T> Node<T> { |
1a4d82fc | 62 | fn new() -> *mut Node<T> { |
62682a34 SL |
63 | Box::into_raw(box Node { |
64 | value: None, | |
abe05a73 | 65 | cached: false, |
62682a34 SL |
66 | next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), |
67 | }) | |
1a4d82fc JJ |
68 | } |
69 | } | |
70 | ||
abe05a73 | 71 | impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { |
abe05a73 XL |
72 | /// Creates a new queue. With given additional elements in the producer and |
73 | /// consumer portions of the queue. | |
74 | /// | |
75 | /// Due to the performance implications of cache-contention, | |
76 | /// we wish to keep fields used mainly by the producer on a separate cache | |
77 | /// line than those used by the consumer. | |
78 | /// Since cache lines are usually 64 bytes, it is unreasonably expensive to | |
79 | /// allocate one for small fields, so we allow users to insert additional | |
80 | /// fields into the cache lines already allocated by this for the producer | |
81 | /// and consumer. | |
1a4d82fc JJ |
82 | /// |
83 | /// This is unsafe as the type system doesn't enforce a single | |
84 | /// consumer-producer relationship. It also allows the consumer to `pop` | |
85 | /// items while there is a `peek` active due to all methods having a | |
86 | /// non-mutable receiver. | |
87 | /// | |
88 | /// # Arguments | |
89 | /// | |
90 | /// * `bound` - This queue implementation is implemented with a linked | |
91 | /// list, and this means that a push is always a malloc. In | |
92 | /// order to amortize this cost, an internal cache of nodes is | |
93 | /// maintained to prevent a malloc from always being | |
94 | /// necessary. This bound is the limit on the size of the | |
95 | /// cache (if desired). If the value is 0, then the cache has | |
96 | /// no bound. Otherwise, the cache will never grow larger than | |
97 | /// `bound` (although the queue itself could be much larger. | |
abe05a73 XL |
98 | pub unsafe fn with_additions( |
99 | bound: usize, | |
100 | producer_addition: ProducerAddition, | |
101 | consumer_addition: ConsumerAddition, | |
102 | ) -> Self { | |
1a4d82fc JJ |
103 | let n1 = Node::new(); |
104 | let n2 = Node::new(); | |
105 | (*n1).next.store(n2, Ordering::Relaxed); | |
106 | Queue { | |
abe05a73 XL |
107 | consumer: CacheAligned::new(Consumer { |
108 | tail: UnsafeCell::new(n2), | |
109 | tail_prev: AtomicPtr::new(n1), | |
110 | cache_bound: bound, | |
111 | cached_nodes: AtomicUsize::new(0), | |
60c5eb7d | 112 | addition: consumer_addition, |
abe05a73 XL |
113 | }), |
114 | producer: CacheAligned::new(Producer { | |
115 | head: UnsafeCell::new(n2), | |
116 | first: UnsafeCell::new(n1), | |
117 | tail_copy: UnsafeCell::new(n1), | |
60c5eb7d | 118 | addition: producer_addition, |
abe05a73 | 119 | }), |
1a4d82fc JJ |
120 | } |
121 | } | |
122 | ||
123 | /// Pushes a new value onto this queue. Note that to use this function | |
124 | /// safely, it must be externally guaranteed that there is only one pusher. | |
125 | pub fn push(&self, t: T) { | |
126 | unsafe { | |
127 | // Acquire a node (which either uses a cached one or allocates a new | |
128 | // one), and then append this to the 'head' node. | |
129 | let n = self.alloc(); | |
130 | assert!((*n).value.is_none()); | |
131 | (*n).value = Some(t); | |
85aaf69f | 132 | (*n).next.store(ptr::null_mut(), Ordering::Relaxed); |
abe05a73 XL |
133 | (**self.producer.head.get()).next.store(n, Ordering::Release); |
134 | *(&self.producer.head).get() = n; | |
1a4d82fc JJ |
135 | } |
136 | } | |
137 | ||
138 | unsafe fn alloc(&self) -> *mut Node<T> { | |
139 | // First try to see if we can consume the 'first' node for our uses. | |
abe05a73 XL |
140 | if *self.producer.first.get() != *self.producer.tail_copy.get() { |
141 | let ret = *self.producer.first.get(); | |
142 | *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); | |
1a4d82fc JJ |
143 | return ret; |
144 | } | |
145 | // If the above fails, then update our copy of the tail and try | |
146 | // again. | |
60c5eb7d | 147 | *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire); |
abe05a73 XL |
148 | if *self.producer.first.get() != *self.producer.tail_copy.get() { |
149 | let ret = *self.producer.first.get(); | |
150 | *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); | |
1a4d82fc JJ |
151 | return ret; |
152 | } | |
153 | // If all of that fails, then we have to allocate a new node | |
154 | // (there's nothing in the node cache). | |
155 | Node::new() | |
156 | } | |
157 | ||
158 | /// Attempts to pop a value from this queue. Remember that to use this type | |
159 | /// safely you must ensure that there is only one popper at a time. | |
160 | pub fn pop(&self) -> Option<T> { | |
161 | unsafe { | |
162 | // The `tail` node is not actually a used node, but rather a | |
163 | // sentinel from where we should start popping from. Hence, look at | |
164 | // tail's next field and see if we can use it. If we do a pop, then | |
165 | // the current tail node is a candidate for going into the cache. | |
abe05a73 | 166 | let tail = *self.consumer.tail.get(); |
1a4d82fc | 167 | let next = (*tail).next.load(Ordering::Acquire); |
60c5eb7d XL |
168 | if next.is_null() { |
169 | return None; | |
170 | } | |
1a4d82fc JJ |
171 | assert!((*next).value.is_some()); |
172 | let ret = (*next).value.take(); | |
173 | ||
abe05a73 XL |
174 | *self.consumer.0.tail.get() = next; |
175 | if self.consumer.cache_bound == 0 { | |
176 | self.consumer.tail_prev.store(tail, Ordering::Release); | |
1a4d82fc | 177 | } else { |
abe05a73 XL |
178 | let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); |
179 | if cached_nodes < self.consumer.cache_bound && !(*tail).cached { | |
180 | self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); | |
181 | (*tail).cached = true; | |
182 | } | |
183 | ||
184 | if (*tail).cached { | |
185 | self.consumer.tail_prev.store(tail, Ordering::Release); | |
1a4d82fc | 186 | } else { |
abe05a73 | 187 | (*self.consumer.tail_prev.load(Ordering::Relaxed)) |
60c5eb7d XL |
188 | .next |
189 | .store(next, Ordering::Relaxed); | |
1a4d82fc JJ |
190 | // We have successfully erased all references to 'tail', so |
191 | // now we can safely drop it. | |
c34b1796 | 192 | let _: Box<Node<T>> = Box::from_raw(tail); |
1a4d82fc JJ |
193 | } |
194 | } | |
e9174d1e | 195 | ret |
1a4d82fc JJ |
196 | } |
197 | } | |
198 | ||
199 | /// Attempts to peek at the head of the queue, returning `None` if the queue | |
200 | /// has no data currently | |
201 | /// | |
202 | /// # Warning | |
203 | /// The reference returned is invalid if it is not used before the consumer | |
204 | /// pops the value off the queue. If the producer then pushes another value | |
205 | /// onto the queue, it will overwrite the value pointed to by the reference. | |
e9174d1e | 206 | pub fn peek(&self) -> Option<&mut T> { |
1a4d82fc JJ |
207 | // This is essentially the same as above with all the popping bits |
208 | // stripped out. | |
209 | unsafe { | |
abe05a73 | 210 | let tail = *self.consumer.tail.get(); |
1a4d82fc | 211 | let next = (*tail).next.load(Ordering::Acquire); |
e9174d1e | 212 | if next.is_null() { None } else { (*next).value.as_mut() } |
1a4d82fc JJ |
213 | } |
214 | } | |
abe05a73 XL |
215 | |
216 | pub fn producer_addition(&self) -> &ProducerAddition { | |
217 | &self.producer.addition | |
218 | } | |
219 | ||
220 | pub fn consumer_addition(&self) -> &ConsumerAddition { | |
221 | &self.consumer.addition | |
222 | } | |
1a4d82fc JJ |
223 | } |
224 | ||
abe05a73 | 225 | impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> { |
1a4d82fc JJ |
226 | fn drop(&mut self) { |
227 | unsafe { | |
abe05a73 | 228 | let mut cur = *self.producer.first.get(); |
1a4d82fc JJ |
229 | while !cur.is_null() { |
230 | let next = (*cur).next.load(Ordering::Relaxed); | |
c34b1796 | 231 | let _n: Box<Node<T>> = Box::from_raw(cur); |
1a4d82fc JJ |
232 | cur = next; |
233 | } | |
234 | } | |
235 | } | |
236 | } |