]> git.proxmox.com Git - rustc.git/blame - vendor/rayon/src/iter/par_bridge.rs
New upstream version 1.69.0+dfsg1
[rustc.git] / vendor / rayon / src / iter / par_bridge.rs
CommitLineData
416331ca 1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9ffffee4 2use std::sync::Mutex;
416331ca 3
f035d41b
XL
4use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
5use crate::iter::ParallelIterator;
9ffffee4 6use crate::{current_num_threads, current_thread_index};
416331ca
XL
7
8/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
9///
10/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
11/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
12/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
13/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
14/// parallelize by other means, like channels or file or network I/O.
15///
16/// The resulting iterator is not guaranteed to keep the order of the original iterator.
17///
18/// # Examples
19///
20/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
21/// use any of the `ParallelIterator` methods:
22///
23/// ```
24/// use rayon::iter::ParallelBridge;
25/// use rayon::prelude::ParallelIterator;
26/// use std::sync::mpsc::channel;
27///
28/// let rx = {
29/// let (tx, rx) = channel();
30///
31/// tx.send("one!");
32/// tx.send("two!");
33/// tx.send("three!");
34///
35/// rx
36/// };
37///
38/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
39/// output.sort_unstable();
40///
41/// assert_eq!(&*output, &["one!", "three!", "two!"]);
42/// ```
43pub trait ParallelBridge: Sized {
f035d41b 44 /// Creates a bridge from this type to a `ParallelIterator`.
416331ca
XL
45 fn par_bridge(self) -> IterBridge<Self>;
46}
47
48impl<T: Iterator + Send> ParallelBridge for T
49where
50 T::Item: Send,
51{
52 fn par_bridge(self) -> IterBridge<Self> {
53 IterBridge { iter: self }
54 }
55}
56
57/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
58///
59/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
60/// [`ParallelBridge`] documentation for details.
61///
62/// [`ParallelBridge`]: trait.ParallelBridge.html
63#[derive(Debug, Clone)]
64pub struct IterBridge<Iter> {
65 iter: Iter,
66}
67
68impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
69where
70 Iter::Item: Send,
71{
72 type Item = Iter::Item;
73
74 fn drive_unindexed<C>(self, consumer: C) -> C::Result
75 where
76 C: UnindexedConsumer<Self::Item>,
77 {
9ffffee4
FG
78 let num_threads = current_num_threads();
79 let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();
416331ca
XL
80
81 bridge_unindexed(
9ffffee4
FG
82 &IterParallelProducer {
83 split_count: AtomicUsize::new(num_threads),
84 iter: Mutex::new(self.iter.fuse()),
85 threads_started: &threads_started,
416331ca
XL
86 },
87 consumer,
88 )
89 }
90}
91
9ffffee4
FG
92struct IterParallelProducer<'a, Iter> {
93 split_count: AtomicUsize,
94 iter: Mutex<std::iter::Fuse<Iter>>,
95 threads_started: &'a [AtomicBool],
416331ca
XL
96}
97
9ffffee4 98impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
416331ca
XL
99 type Item = Iter::Item;
100
101 fn split(self) -> (Self, Option<Self>) {
102 let mut count = self.split_count.load(Ordering::SeqCst);
103
104 loop {
9ffffee4
FG
105 // Check if the iterator is exhausted
106 if let Some(new_count) = count.checked_sub(1) {
107 match self.split_count.compare_exchange_weak(
108 count,
109 new_count,
110 Ordering::SeqCst,
111 Ordering::SeqCst,
112 ) {
113 Ok(_) => return (self, Some(self)),
114 Err(last_count) => count = last_count,
416331ca 115 }
9ffffee4
FG
116 } else {
117 return (self, None);
416331ca
XL
118 }
119 }
120 }
121
122 fn fold_with<F>(self, mut folder: F) -> F
123 where
124 F: Folder<Self::Item>,
125 {
9ffffee4
FG
126 // Guard against work-stealing-induced recursion, in case `Iter::next()`
127 // calls rayon internally, so we don't deadlock our mutex. We might also
128 // be recursing via `folder` methods, which doesn't present a mutex hazard,
129 // but it's lower overhead for us to just check this once, rather than
130 // updating additional shared state on every mutex lock/unlock.
131 // (If this isn't a rayon thread, then there's no work-stealing anyway...)
132 if let Some(i) = current_thread_index() {
133 // Note: If the number of threads in the pool ever grows dynamically, then
134 // we'll end up sharing flags and may falsely detect recursion -- that's
135 // still fine for overall correctness, just not optimal for parallelism.
136 let thread_started = &self.threads_started[i % self.threads_started.len()];
137 if thread_started.swap(true, Ordering::Relaxed) {
138 // We can't make progress with a nested mutex, so just return and let
139 // the outermost loop continue with the rest of the iterator items.
140 return folder;
141 }
142 }
143
416331ca 144 loop {
9ffffee4
FG
145 if let Ok(mut iter) = self.iter.lock() {
146 if let Some(it) = iter.next() {
147 drop(iter);
416331ca
XL
148 folder = folder.consume(it);
149 if folder.full() {
150 return folder;
151 }
9ffffee4
FG
152 } else {
153 return folder;
416331ca 154 }
9ffffee4
FG
155 } else {
156 // any panics from other threads will have been caught by the pool,
157 // and will be re-thrown when joined - just exit
158 return folder;
416331ca
XL
159 }
160 }
161 }
162}