1 // Copyright 2017 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! A single-producer single-consumer concurrent queue
13 //! This module contains the implementation of an SPSC queue which can be used
14 //! concurrently between two threads. This data structure is safe to use and
15 //! enforces the semantics that there is one pusher and one popper.
17 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
use core::cell::UnsafeCell;
use core::ptr;

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;
// Node within the linked list queue of messages to send
//
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
struct Node<T> {
    value: Option<T>,         // nullable for re-use of nodes
    cached: bool,             // This node goes into the node cache
    next: AtomicPtr<Node<T>>, // next node in the queue
}
37 /// The single-producer single-consumer queue. This structure is not cloneable,
38 /// but it can be safely shared in an Arc if it is guaranteed that there
39 /// is only one popper and one pusher touching the queue at any one point in
41 pub struct Queue
<T
, ProducerAddition
=(), ConsumerAddition
=()> {
43 consumer
: CacheAligned
<Consumer
<T
, ConsumerAddition
>>,
46 producer
: CacheAligned
<Producer
<T
, ProducerAddition
>>,
49 struct Consumer
<T
, Addition
> {
50 tail
: UnsafeCell
<*mut Node
<T
>>, // where to pop from
51 tail_prev
: AtomicPtr
<Node
<T
>>, // where to pop from
52 cache_bound
: usize, // maximum cache size
53 cached_nodes
: AtomicUsize
, // number of nodes marked as cachable
57 struct Producer
<T
, Addition
> {
58 head
: UnsafeCell
<*mut Node
<T
>>, // where to push to
59 first
: UnsafeCell
<*mut Node
<T
>>, // where to get new nodes from
60 tail_copy
: UnsafeCell
<*mut Node
<T
>>, // between first/tail
64 unsafe impl<T
: Send
, P
: Send
+ Sync
, C
: Send
+ Sync
> Send
for Queue
<T
, P
, C
> { }
66 unsafe impl<T
: Send
, P
: Send
+ Sync
, C
: Send
+ Sync
> Sync
for Queue
<T
, P
, C
> { }
69 fn new() -> *mut Node
<T
> {
70 Box
::into_raw(box Node
{
73 next
: AtomicPtr
::new(ptr
::null_mut
::<Node
<T
>>()),
78 impl<T
, ProducerAddition
, ConsumerAddition
> Queue
<T
, ProducerAddition
, ConsumerAddition
> {
80 /// Creates a new queue. With given additional elements in the producer and
81 /// consumer portions of the queue.
83 /// Due to the performance implications of cache-contention,
84 /// we wish to keep fields used mainly by the producer on a separate cache
85 /// line than those used by the consumer.
86 /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
87 /// allocate one for small fields, so we allow users to insert additional
88 /// fields into the cache lines already allocated by this for the producer
91 /// This is unsafe as the type system doesn't enforce a single
92 /// consumer-producer relationship. It also allows the consumer to `pop`
93 /// items while there is a `peek` active due to all methods having a
94 /// non-mutable receiver.
98 /// * `bound` - This queue implementation is implemented with a linked
99 /// list, and this means that a push is always a malloc. In
100 /// order to amortize this cost, an internal cache of nodes is
101 /// maintained to prevent a malloc from always being
102 /// necessary. This bound is the limit on the size of the
103 /// cache (if desired). If the value is 0, then the cache has
104 /// no bound. Otherwise, the cache will never grow larger than
105 /// `bound` (although the queue itself could be much larger.
106 pub unsafe fn with_additions(
108 producer_addition
: ProducerAddition
,
109 consumer_addition
: ConsumerAddition
,
111 let n1
= Node
::new();
112 let n2
= Node
::new();
113 (*n1
).next
.store(n2
, Ordering
::Relaxed
);
115 consumer
: CacheAligned
::new(Consumer
{
116 tail
: UnsafeCell
::new(n2
),
117 tail_prev
: AtomicPtr
::new(n1
),
119 cached_nodes
: AtomicUsize
::new(0),
120 addition
: consumer_addition
122 producer
: CacheAligned
::new(Producer
{
123 head
: UnsafeCell
::new(n2
),
124 first
: UnsafeCell
::new(n1
),
125 tail_copy
: UnsafeCell
::new(n1
),
126 addition
: producer_addition
131 /// Pushes a new value onto this queue. Note that to use this function
132 /// safely, it must be externally guaranteed that there is only one pusher.
133 pub fn push(&self, t
: T
) {
135 // Acquire a node (which either uses a cached one or allocates a new
136 // one), and then append this to the 'head' node.
137 let n
= self.alloc();
138 assert
!((*n
).value
.is_none());
139 (*n
).value
= Some(t
);
140 (*n
).next
.store(ptr
::null_mut(), Ordering
::Relaxed
);
141 (**self.producer
.head
.get()).next
.store(n
, Ordering
::Release
);
142 *(&self.producer
.head
).get() = n
;
146 unsafe fn alloc(&self) -> *mut Node
<T
> {
147 // First try to see if we can consume the 'first' node for our uses.
148 if *self.producer
.first
.get() != *self.producer
.tail_copy
.get() {
149 let ret
= *self.producer
.first
.get();
150 *self.producer
.0.first
.get() = (*ret
).next
.load(Ordering
::Relaxed
);
153 // If the above fails, then update our copy of the tail and try
155 *self.producer
.0.tail_copy
.get() =
156 self.consumer
.tail_prev
.load(Ordering
::Acquire
);
157 if *self.producer
.first
.get() != *self.producer
.tail_copy
.get() {
158 let ret
= *self.producer
.first
.get();
159 *self.producer
.0.first
.get() = (*ret
).next
.load(Ordering
::Relaxed
);
162 // If all of that fails, then we have to allocate a new node
163 // (there's nothing in the node cache).
167 /// Attempts to pop a value from this queue. Remember that to use this type
168 /// safely you must ensure that there is only one popper at a time.
169 pub fn pop(&self) -> Option
<T
> {
171 // The `tail` node is not actually a used node, but rather a
172 // sentinel from where we should start popping from. Hence, look at
173 // tail's next field and see if we can use it. If we do a pop, then
174 // the current tail node is a candidate for going into the cache.
175 let tail
= *self.consumer
.tail
.get();
176 let next
= (*tail
).next
.load(Ordering
::Acquire
);
177 if next
.is_null() { return None }
178 assert
!((*next
).value
.is_some());
179 let ret
= (*next
).value
.take();
181 *self.consumer
.0.tail
.get() = next
;
182 if self.consumer
.cache_bound
== 0 {
183 self.consumer
.tail_prev
.store(tail
, Ordering
::Release
);
185 let cached_nodes
= self.consumer
.cached_nodes
.load(Ordering
::Relaxed
);
186 if cached_nodes
< self.consumer
.cache_bound
&& !(*tail
).cached
{
187 self.consumer
.cached_nodes
.store(cached_nodes
, Ordering
::Relaxed
);
188 (*tail
).cached
= true;
192 self.consumer
.tail_prev
.store(tail
, Ordering
::Release
);
194 (*self.consumer
.tail_prev
.load(Ordering
::Relaxed
))
195 .next
.store(next
, Ordering
::Relaxed
);
196 // We have successfully erased all references to 'tail', so
197 // now we can safely drop it.
198 let _
: Box
<Node
<T
>> = Box
::from_raw(tail
);
205 /// Attempts to peek at the head of the queue, returning `None` if the queue
206 /// has no data currently
209 /// The reference returned is invalid if it is not used before the consumer
210 /// pops the value off the queue. If the producer then pushes another value
211 /// onto the queue, it will overwrite the value pointed to by the reference.
212 pub fn peek(&self) -> Option
<&mut T
> {
213 // This is essentially the same as above with all the popping bits
216 let tail
= *self.consumer
.tail
.get();
217 let next
= (*tail
).next
.load(Ordering
::Acquire
);
218 if next
.is_null() { None }
else { (*next).value.as_mut() }
222 pub fn producer_addition(&self) -> &ProducerAddition
{
223 &self.producer
.addition
226 pub fn consumer_addition(&self) -> &ConsumerAddition
{
227 &self.consumer
.addition
231 impl<T
, ProducerAddition
, ConsumerAddition
> Drop
for Queue
<T
, ProducerAddition
, ConsumerAddition
> {
234 let mut cur
= *self.producer
.first
.get();
235 while !cur
.is_null() {
236 let next
= (*cur
).next
.load(Ordering
::Relaxed
);
237 let _n
: Box
<Node
<T
>> = Box
::from_raw(cur
);
244 #[cfg(all(test, not(target_os = "emscripten")))]
249 use sync
::mpsc
::channel
;
254 let queue
= Queue
::with_additions(0, (), ());
257 assert_eq
!(queue
.pop(), Some(1));
258 assert_eq
!(queue
.pop(), Some(2));
259 assert_eq
!(queue
.pop(), None
);
262 assert_eq
!(queue
.pop(), Some(3));
263 assert_eq
!(queue
.pop(), Some(4));
264 assert_eq
!(queue
.pop(), None
);
271 let queue
= Queue
::with_additions(0, (), ());
274 // Ensure the borrowchecker works
277 assert_eq
!(&*vec
, &[1]);
279 None
=> unreachable
!()
284 assert_eq
!(&*vec
, &[1]);
286 None
=> unreachable
!()
294 let q
: Queue
<Box
<_
>> = Queue
::with_additions(0, (), ());
303 let q
= Queue
::with_additions(0, (), ());
306 assert_eq
!(q
.pop(), Some(1));
307 assert_eq
!(q
.pop(), Some(2));
308 assert_eq
!(q
.pop(), None
);
311 assert_eq
!(q
.pop(), Some(3));
312 assert_eq
!(q
.pop(), Some(4));
313 assert_eq
!(q
.pop(), None
);
324 unsafe fn stress_bound(bound
: usize) {
325 let q
= Arc
::new(Queue
::with_additions(bound
, (), ()));
327 let (tx
, rx
) = channel();
329 let _t
= thread
::spawn(move|| {
339 tx
.send(()).unwrap();