use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use super::cache_aligned::CacheAligned;
+
// Node within the linked list queue of messages to send
struct Node<T> {
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
// is it worth it?
value: Option<T>, // nullable for re-use of nodes
+ cached: bool, // This node goes into the node cache
next: AtomicPtr<Node<T>>, // next node in the queue
}
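
One reading of the FIXME above, as a sketch rather than part of the patch: store the payload as a `MaybeUninit<T>` and rely on the queue's own invariant (the value is written by `push` and taken by `pop` exactly while the node sits between the consumer's tail sentinel and the producer's head), so no `Option` discriminant is needed. `UninitNode` is a made-up name for illustration:

use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::AtomicPtr;

// Sketch only: the value is initialized precisely while the node is "live"
// in the queue, which the push/pop protocol already guarantees.
struct UninitNode<T> {
    value: MaybeUninit<T>,          // payload storage, possibly uninitialized
    cached: bool,                   // this node goes into the node cache
    next: AtomicPtr<UninitNode<T>>, // next node in the queue
}

impl<T> UninitNode<T> {
    fn new() -> *mut UninitNode<T> {
        Box::into_raw(Box::new(UninitNode {
            value: MaybeUninit::uninit(),
            cached: false,
            next: AtomicPtr::new(ptr::null_mut()),
        }))
    }
}
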
/// The single-consumer single-producer queue structure. It is not `Sync`,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
// consumer fields
+ consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+ // producer fields
+ producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
tail: UnsafeCell<*mut Node<T>>, // where to pop from
tail_prev: AtomicPtr<Node<T>>, // last tail published to the producer (bounds node reuse)
+ cache_bound: usize, // maximum cache size
+ cached_nodes: AtomicUsize, // number of nodes marked as cachable
+ addition: Addition,
+}
- // producer fields
+struct Producer<T, Addition> {
head: UnsafeCell<*mut Node<T>>, // where to push to
first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
- // Cache maintenance fields. Additions and subtractions are stored
- // separately in order to allow them to use nonatomic addition/subtraction.
- cache_bound: usize,
- cache_additions: AtomicUsize,
- cache_subtractions: AtomicUsize,
+ addition: Addition,
}
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
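
The `CacheAligned` wrapper imported from `super::cache_aligned` is not shown in this diff. Assuming it is the usual over-aligned newtype with `Deref`, a sketch of what the producer/consumer split relies on looks like this (the 64-byte alignment and the exact trait set are assumptions):

use std::ops::{Deref, DerefMut};

// Illustrative sketch: force the wrapped value onto its own cache line so
// the producer's and consumer's hot fields never share a line (avoiding
// false sharing between the two threads).
#[repr(align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub fn new(t: T) -> CacheAligned<T> {
        CacheAligned(t)
    }
}

impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}

The public tuple field is what allows accesses such as `self.producer.0.first` below, while `Deref` allows the shorter `self.producer.first` form.
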
impl<T> Node<T> {
fn new() -> *mut Node<T> {
Box::into_raw(box Node {
value: None,
+ cached: false,
next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
})
}
}
-impl<T> Queue<T> {
- /// Creates a new queue.
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+ /// Creates a new queue with the given additional elements stored in the
+ /// producer and consumer portions of the queue.
+ ///
+ /// Due to the performance implications of cache contention, we wish to keep
+ /// fields used mainly by the producer on a separate cache line from those
+ /// used by the consumer. Since cache lines are usually 64 bytes, it is
+ /// unreasonably expensive to allocate a whole line for small fields, so we
+ /// allow users to insert additional fields into the cache lines already
+ /// allocated by this queue for the producer and consumer.
///
/// This is unsafe as the type system doesn't enforce a single
/// consumer-producer relationship. It also allows the consumer to `pop`
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger).
- pub unsafe fn new(bound: usize) -> Queue<T> {
+ pub unsafe fn with_additions(
+ bound: usize,
+ producer_addition: ProducerAddition,
+ consumer_addition: ConsumerAddition,
+ ) -> Self {
let n1 = Node::new();
let n2 = Node::new();
(*n1).next.store(n2, Ordering::Relaxed);
Queue {
- tail: UnsafeCell::new(n2),
- tail_prev: AtomicPtr::new(n1),
- head: UnsafeCell::new(n2),
- first: UnsafeCell::new(n1),
- tail_copy: UnsafeCell::new(n1),
- cache_bound: bound,
- cache_additions: AtomicUsize::new(0),
- cache_subtractions: AtomicUsize::new(0),
+ consumer: CacheAligned::new(Consumer {
+ tail: UnsafeCell::new(n2),
+ tail_prev: AtomicPtr::new(n1),
+ cache_bound: bound,
+ cached_nodes: AtomicUsize::new(0),
+ addition: consumer_addition
+ }),
+ producer: CacheAligned::new(Producer {
+ head: UnsafeCell::new(n2),
+ first: UnsafeCell::new(n1),
+ tail_copy: UnsafeCell::new(n1),
+ addition: producer_addition
+ }),
}
}
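
For intuition about the constructor above (illustrative, not part of the patch): it links two fresh nodes `n1 -> n2`, points the consumer's `tail` and the producer's `head` at the `n2` sentinel, and publishes `n1` through `tail_prev`, so a queue with nothing pushed yet simply reports empty:

fn fresh_queue_is_empty() {
    // Illustrative sketch, not part of the patch.
    unsafe {
        let q = Queue::with_additions(0, (), ());
        assert_eq!(q.pop(), None); // only the sentinel chain exists so far
        q.push(5);
        assert_eq!(q.peek().copied(), Some(5));
        assert_eq!(q.pop(), Some(5));
        assert_eq!(q.pop(), None);
    }
}
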
assert!((*n).value.is_none());
(*n).value = Some(t);
(*n).next.store(ptr::null_mut(), Ordering::Relaxed);
- (**self.head.get()).next.store(n, Ordering::Release);
- *self.head.get() = n;
+ (**self.producer.head.get()).next.store(n, Ordering::Release);
+ *self.producer.0.head.get() = n;
}
}
unsafe fn alloc(&self) -> *mut Node<T> {
// First try to see if we can consume the 'first' node for our uses.
- // We try to avoid as many atomic instructions as possible here, so
- // the addition to cache_subtractions is not atomic (plus we're the
- // only one subtracting from the cache).
- if *self.first.get() != *self.tail_copy.get() {
- if self.cache_bound > 0 {
- let b = self.cache_subtractions.load(Ordering::Relaxed);
- self.cache_subtractions.store(b + 1, Ordering::Relaxed);
- }
- let ret = *self.first.get();
- *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ if *self.producer.first.get() != *self.producer.tail_copy.get() {
+ let ret = *self.producer.first.get();
+ *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If the above fails, then update our copy of the tail and try
// again.
- *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
- if *self.first.get() != *self.tail_copy.get() {
- if self.cache_bound > 0 {
- let b = self.cache_subtractions.load(Ordering::Relaxed);
- self.cache_subtractions.store(b + 1, Ordering::Relaxed);
- }
- let ret = *self.first.get();
- *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ *self.producer.0.tail_copy.get() =
+ self.consumer.tail_prev.load(Ordering::Acquire);
+ if *self.producer.first.get() != *self.producer.tail_copy.get() {
+ let ret = *self.producer.first.get();
+ *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If all of that fails, then we have to allocate a new node.
Node::new()
}
// The `tail` node is not actually a used node, but rather a
// sentinel from where we should start popping from. Hence, look at
// tail's next field and see if we can use it. If we do a pop, then
// the current tail node is a candidate for going into the cache.
- let tail = *self.tail.get();
+ let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() { return None }
assert!((*next).value.is_some());
let ret = (*next).value.take();
- *self.tail.get() = next;
- if self.cache_bound == 0 {
- self.tail_prev.store(tail, Ordering::Release);
+ *self.consumer.0.tail.get() = next;
+ if self.consumer.cache_bound == 0 {
+ self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
- // FIXME: this is dubious with overflow.
- let additions = self.cache_additions.load(Ordering::Relaxed);
- let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
- let size = additions - subtractions;
-
- if size < self.cache_bound {
- self.tail_prev.store(tail, Ordering::Release);
- self.cache_additions.store(additions + 1, Ordering::Relaxed);
+ let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+ if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+ self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
+ (*tail).cached = true;
+ }
+
+ if (*tail).cached {
+ self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
- (*self.tail_prev.load(Ordering::Relaxed))
- .next.store(next, Ordering::Relaxed);
+ (*self.consumer.tail_prev.load(Ordering::Relaxed))
+ .next.store(next, Ordering::Relaxed);
// We have successfully erased all references to 'tail', so
// now we can safely drop it.
let _: Box<Node<T>> = Box::from_raw(tail);
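
To make the node-cache accounting concrete, here is an illustrative walkthrough (not part of the patch): with a bound of 1, the first node retired by `pop` is marked `cached` and keeps circulating through `tail_prev`, while every later retired node fails the `cached_nodes < cache_bound` check, gets unlinked, and is freed as above.

fn cache_bound_walkthrough() {
    // Illustrative sketch, not part of the patch.
    let q = unsafe { Queue::with_additions(1, (), ()) };
    q.push('a');
    q.push('b');
    q.push('c');
    assert_eq!(q.pop(), Some('a')); // retires the old sentinel; it becomes the one cached node
    assert_eq!(q.pop(), Some('b')); // this retired node exceeds the bound and is freed
    assert_eq!(q.pop(), Some('c')); // likewise freed; at most one node stays cached
}
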
// This is essentially the same as above with all the popping bits
// stripped out.
unsafe {
- let tail = *self.tail.get();
+ let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() { None } else { (*next).value.as_mut() }
}
}
+
+ pub fn producer_addition(&self) -> &ProducerAddition {
+ &self.producer.addition
+ }
+
+ pub fn consumer_addition(&self) -> &ConsumerAddition {
+ &self.consumer.addition
+ }
}
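
As a usage sketch of the accessors above (the addition types here are invented for illustration), per-endpoint state rides on the same cache line as the matching half of the queue:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Hypothetical endpoint state, not part of this patch.
struct ProducerState {
    pushes: AtomicUsize, // producer-side counter, shares the producer's cache line
}

struct ConsumerState {
    sleeping: AtomicBool, // consumer-side flag, shares the consumer's cache line
}

fn additions_sketch() {
    let q = unsafe {
        Queue::with_additions(
            128, // node-cache bound
            ProducerState { pushes: AtomicUsize::new(0) },
            ConsumerState { sleeping: AtomicBool::new(false) },
        )
    };
    q.push(1u32);
    q.producer_addition().pushes.fetch_add(1, Ordering::Relaxed);
    q.consumer_addition().sleeping.store(false, Ordering::Relaxed);
    assert_eq!(q.pop(), Some(1));
}
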
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
fn drop(&mut self) {
unsafe {
- let mut cur = *self.first.get();
+ let mut cur = *self.producer.first.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _n: Box<Node<T>> = Box::from_raw(cur);
#[test]
fn smoke() {
unsafe {
- let queue = Queue::new(0);
+ let queue = Queue::with_additions(0, (), ());
queue.push(1);
queue.push(2);
assert_eq!(queue.pop(), Some(1));
#[test]
fn peek() {
unsafe {
- let queue = Queue::new(0);
+ let queue = Queue::with_additions(0, (), ());
queue.push(vec![1]);
// Ensure the borrowchecker works
#[test]
fn drop_full() {
unsafe {
- let q: Queue<Box<_>> = Queue::new(0);
+ let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
q.push(box 1);
q.push(box 2);
}
#[test]
fn smoke_bound() {
unsafe {
- let q = Queue::new(0);
+ let q = Queue::with_additions(0, (), ());
q.push(1);
q.push(2);
assert_eq!(q.pop(), Some(1));
}
unsafe fn stress_bound(bound: usize) {
- let q = Arc::new(Queue::new(bound));
+ let q = Arc::new(Queue::with_additions(bound, (), ()));
let (tx, rx) = channel();
let q2 = q.clone();