]> git.proxmox.com Git - rustc.git/blobdiff - vendor/rustc-rayon/src/iter/par_bridge.rs
New upstream version 1.63.0+dfsg1
[rustc.git] / vendor / rustc-rayon / src / iter / par_bridge.rs
index 07d26de08c1edb0b51e5ad0e1e03c79988be6465..339ac1a321fe79b79acd4f59e2c9955a6e2b9cb9 100644 (file)
@@ -44,7 +44,7 @@ use crate::iter::ParallelIterator;
 /// assert_eq!(&*output, &["one!", "three!", "two!"]);
 /// ```
 pub trait ParallelBridge: Sized {
-    /// Create a bridge from this type to a `ParallelIterator`.
+    /// Creates a bridge from this type to a `ParallelIterator`.
     fn par_bridge(self) -> IterBridge<Self>;
 }
 
@@ -125,16 +125,19 @@ where
         let mut count = self.split_count.load(Ordering::SeqCst);
 
         loop {
-            let done = self.done.load(Ordering::SeqCst);
+            // Check if the iterator is exhausted *and* we've consumed every item from it.
+            let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
+
             match count.checked_sub(1) {
                 Some(new_count) if !done => {
-                    let last_count =
-                        self.split_count
-                            .compare_and_swap(count, new_count, Ordering::SeqCst);
-                    if last_count == count {
-                        return (self.clone(), Some(self));
-                    } else {
-                        count = last_count;
+                    match self.split_count.compare_exchange_weak(
+                        count,
+                        new_count,
+                        Ordering::SeqCst,
+                        Ordering::SeqCst,
+                    ) {
+                        Ok(_) => return (self.clone(), Some(self)),
+                        Err(last_count) => count = last_count,
                     }
                 }
                 _ => {
@@ -157,13 +160,26 @@ where
                     }
                 }
                 Steal::Empty => {
+                    // Don't storm the mutex if we're already done.
                     if self.done.load(Ordering::SeqCst) {
-                        // the iterator is out of items, no use in continuing
-                        return folder;
+                        // Someone might have pushed more between our `steal()` and `done.load()`
+                        if self.items.is_empty() {
+                            // The iterator is out of items, no use in continuing
+                            return folder;
+                        }
                     } else {
                         // our cache is out of items, time to load more from the iterator
                         match self.iter.try_lock() {
                             Ok(mut guard) => {
+                                // Check `done` again in case we raced with the previous lock
+                                // holder on its way out.
+                                if self.done.load(Ordering::SeqCst) {
+                                    if self.items.is_empty() {
+                                        return folder;
+                                    }
+                                    continue;
+                                }
+
                                 let count = current_num_threads();
                                 let count = (count * count) * 2;
 
@@ -184,7 +200,7 @@ where
                             }
                             Err(TryLockError::WouldBlock) => {
                                 // someone else has the mutex, just sit tight until it's ready
-                                yield_now(); //TODO: use a thread=pool-aware yield? (#548)
+                                yield_now(); //TODO: use a thread-pool-aware yield? (#548)
                             }
                             Err(TryLockError::Poisoned(_)) => {
                                 // any panics from other threads will have been caught by the pool,