// vendor/rustc-rayon/src/iter/par_bridge.rs (vendored in rustc 1.63.0+dfsg1)
use crossbeam_deque::{Steal, Stealer, Worker};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Mutex, TryLockError};
use std::thread::yield_now;

use crate::current_num_threads;
use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
use crate::iter::ParallelIterator;

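// Implementation note: `par_bridge` shares three pieces of state across the pool's threads:
// an atomic budget of remaining producer splits, a `done` flag set once the source iterator
// is exhausted, and a mutex guarding the iterator together with the owning (`Worker`) side
// of a crossbeam work-stealing deque. Pool threads steal buffered items from the deque and,
// when it runs dry, take the mutex to refill it from the sequential source.
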
/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
/// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items
/// across the Rayon thread pool. This has the advantage of being able to parallelize just about
/// anything, but the resulting `ParallelIterator` can be less efficient than if you started with
/// `par_iter` instead. However, it can still be useful for iterators that are difficult to
/// parallelize by other means, like channels or file or network I/O.
///
/// The resulting iterator is not guaranteed to keep the order of the original iterator.
///
/// # Examples
///
/// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can
/// use any of the `ParallelIterator` methods:
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
/// use std::sync::mpsc::channel;
///
/// let rx = {
///     let (tx, rx) = channel();
///
///     tx.send("one!");
///     tx.send("two!");
///     tx.send("three!");
///
///     rx
/// };
///
/// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect();
/// output.sort_unstable();
///
/// assert_eq!(&*output, &["one!", "three!", "two!"]);
/// ```
pub trait ParallelBridge: Sized {
    /// Creates a bridge from this type to a `ParallelIterator`.
    fn par_bridge(self) -> IterBridge<Self>;
}

impl<T: Iterator + Send> ParallelBridge for T
where
    T::Item: Send,
{
    fn par_bridge(self) -> IterBridge<Self> {
        IterBridge { iter: self }
    }
}

/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// This type is created when using the `par_bridge` method on `ParallelBridge`. See the
/// [`ParallelBridge`] documentation for details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
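///
/// # Example
///
/// A minimal sketch: any `Send` iterator over `Send` items can be bridged and then driven
/// with the usual `ParallelIterator` methods.
///
/// ```
/// use rayon::iter::ParallelBridge;
/// use rayon::prelude::ParallelIterator;
///
/// // A plain sequential range, bridged onto the thread pool and summed in parallel.
/// let total: u64 = (0u64..1_000).par_bridge().sum();
/// assert_eq!(total, 499_500);
/// ```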
#[derive(Debug, Clone)]
pub struct IterBridge<Iter> {
    iter: Iter,
}

impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        let split_count = AtomicUsize::new(current_num_threads());
        let worker = Worker::new_fifo();
        let stealer = worker.stealer();
        let done = AtomicBool::new(false);
        let iter = Mutex::new((self.iter, worker));

        bridge_unindexed(
            IterParallelProducer {
                split_count: &split_count,
                done: &done,
                iter: &iter,
                items: stealer,
            },
            consumer,
        )
    }
}

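// Shared state for the bridged iterator. Every clone of the producer sees the same
// `split_count` (how many more times `split` may succeed), the same `done` flag, and the
// same mutex-protected `(iterator, deque owner)` pair, while holding its own `Stealer`
// handle onto the deque of buffered items.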
struct IterParallelProducer<'a, Iter: Iterator> {
    split_count: &'a AtomicUsize,
    done: &'a AtomicBool,
    iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
    items: Stealer<Iter::Item>,
}

// manual Clone impl because `Iter` doesn't need to be `Clone`, but the derive would require it
impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
    fn clone(&self) -> Self {
        IterParallelProducer {
            split_count: self.split_count,
            done: self.done,
            iter: self.iter,
            items: self.items.clone(),
        }
    }
}

impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

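    // Splitting strategy: the split budget in `split_count` starts at the number of pool
    // threads, and each successful split atomically decrements it, bounding how many
    // producers are created. Splitting also stops once the source iterator is exhausted
    // and the buffered items have been drained.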
    fn split(self) -> (Self, Option<Self>) {
        let mut count = self.split_count.load(Ordering::SeqCst);

        loop {
            // Check if the iterator is exhausted *and* we've consumed every item from it.
            let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();

            match count.checked_sub(1) {
                Some(new_count) if !done => {
                    match self.split_count.compare_exchange_weak(
                        count,
                        new_count,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => return (self.clone(), Some(self)),
                        Err(last_count) => count = last_count,
                    }
                }
                _ => {
                    return (self, None);
                }
            }
        }
    }

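    // Folding strategy: repeatedly steal buffered items from the deque. When the deque is
    // empty, either finish (if the source iterator is exhausted and fully drained) or try
    // to take the mutex and refill the deque from the sequential source; contended workers
    // yield and retry rather than block.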
    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        loop {
            match self.items.steal() {
                Steal::Success(it) => {
                    folder = folder.consume(it);
                    if folder.full() {
                        return folder;
                    }
                }
                Steal::Empty => {
                    // Don't storm the mutex if we're already done.
                    if self.done.load(Ordering::SeqCst) {
                        // Someone might have pushed more between our `steal()` and `done.load()`
                        if self.items.is_empty() {
                            // The iterator is out of items, no use in continuing
                            return folder;
                        }
                    } else {
                        // our cache is out of items, time to load more from the iterator
                        match self.iter.try_lock() {
                            Ok(mut guard) => {
                                // Check `done` again in case we raced with the previous lock
                                // holder on its way out.
                                if self.done.load(Ordering::SeqCst) {
                                    if self.items.is_empty() {
                                        return folder;
                                    }
                                    continue;
                                }

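                                // Refill in a batch: pull up to `2 * num_threads^2` items from
                                // the sequential source while holding the lock, amortizing the
                                // cost of the mutex across many stolen items.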
                                let count = current_num_threads();
                                let count = (count * count) * 2;

                                let (ref mut iter, ref worker) = *guard;

                                // while worker.len() < count {
                                // FIXME the new deque doesn't let us count items. We can just
                                // push a number of items, but that doesn't consider active
                                // stealers elsewhere.
                                for _ in 0..count {
                                    if let Some(it) = iter.next() {
                                        worker.push(it);
                                    } else {
                                        self.done.store(true, Ordering::SeqCst);
                                        break;
                                    }
                                }
                            }
                            Err(TryLockError::WouldBlock) => {
                                // someone else has the mutex, just sit tight until it's ready
                                yield_now(); //TODO: use a thread-pool-aware yield? (#548)
                            }
                            Err(TryLockError::Poisoned(_)) => {
                                // any panics from other threads will have been caught by the pool,
                                // and will be re-thrown when joined - just exit
                                return folder;
                            }
                        }
                    }
                }
                Steal::Retry => (),
            }
        }
    }
}