]>
Commit | Line | Data |
---|---|---|
e74abb32 | 1 | use crossbeam_deque::{Steal, Stealer, Worker}; |
532ac7d7 XL |
2 | |
3 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | |
4 | use std::sync::{Mutex, TryLockError}; | |
5 | use std::thread::yield_now; | |
6 | ||
6a06907d XL |
7 | use crate::current_num_threads; |
8 | use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer}; | |
9 | use crate::iter::ParallelIterator; | |
532ac7d7 XL |
10 | |
11 | /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. | |
12 | /// | |
13 | /// This creates a "bridge" from a sequential iterator to a parallel one, by distributing its items | |
14 | /// across the Rayon thread pool. This has the advantage of being able to parallelize just about | |
15 | /// anything, but the resulting `ParallelIterator` can be less efficient than if you started with | |
16 | /// `par_iter` instead. However, it can still be useful for iterators that are difficult to | |
17 | /// parallelize by other means, like channels or file or network I/O. | |
18 | /// | |
19 | /// The resulting iterator is not guaranteed to keep the order of the original iterator. | |
20 | /// | |
21 | /// # Examples | |
22 | /// | |
23 | /// To use this trait, take an existing `Iterator` and call `par_bridge` on it. After that, you can | |
24 | /// use any of the `ParallelIterator` methods: | |
25 | /// | |
26 | /// ``` | |
27 | /// use rayon::iter::ParallelBridge; | |
28 | /// use rayon::prelude::ParallelIterator; | |
29 | /// use std::sync::mpsc::channel; | |
30 | /// | |
31 | /// let rx = { | |
32 | /// let (tx, rx) = channel(); | |
33 | /// | |
34 | /// tx.send("one!"); | |
35 | /// tx.send("two!"); | |
36 | /// tx.send("three!"); | |
37 | /// | |
38 | /// rx | |
39 | /// }; | |
40 | /// | |
41 | /// let mut output: Vec<&'static str> = rx.into_iter().par_bridge().collect(); | |
42 | /// output.sort_unstable(); | |
43 | /// | |
44 | /// assert_eq!(&*output, &["one!", "three!", "two!"]); | |
45 | /// ``` | |
46 | pub trait ParallelBridge: Sized { | |
923072b8 | 47 | /// Creates a bridge from this type to a `ParallelIterator`. |
532ac7d7 XL |
48 | fn par_bridge(self) -> IterBridge<Self>; |
49 | } | |
50 | ||
51 | impl<T: Iterator + Send> ParallelBridge for T | |
52 | where | |
53 | T::Item: Send, | |
54 | { | |
55 | fn par_bridge(self) -> IterBridge<Self> { | |
56 | IterBridge { iter: self } | |
57 | } | |
58 | } | |
59 | ||
/// `IterBridge` is a parallel iterator that wraps a sequential iterator.
///
/// Values of this type come from the `par_bridge` method of `ParallelBridge`; the
/// [`ParallelBridge`] documentation has the details.
///
/// [`ParallelBridge`]: trait.ParallelBridge.html
#[derive(Clone, Debug)]
pub struct IterBridge<Iter> {
    /// The wrapped sequential iterator whose items are handed out in parallel.
    iter: Iter,
}
70 | ||
71 | impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter> | |
72 | where | |
73 | Iter::Item: Send, | |
74 | { | |
75 | type Item = Iter::Item; | |
76 | ||
77 | fn drive_unindexed<C>(self, consumer: C) -> C::Result | |
78 | where | |
79 | C: UnindexedConsumer<Self::Item>, | |
80 | { | |
81 | let split_count = AtomicUsize::new(current_num_threads()); | |
e74abb32 XL |
82 | let worker = Worker::new_fifo(); |
83 | let stealer = worker.stealer(); | |
532ac7d7 | 84 | let done = AtomicBool::new(false); |
e74abb32 | 85 | let iter = Mutex::new((self.iter, worker)); |
532ac7d7 XL |
86 | |
87 | bridge_unindexed( | |
88 | IterParallelProducer { | |
89 | split_count: &split_count, | |
90 | done: &done, | |
91 | iter: &iter, | |
92 | items: stealer, | |
93 | }, | |
94 | consumer, | |
95 | ) | |
96 | } | |
97 | } | |
98 | ||
6a06907d | 99 | struct IterParallelProducer<'a, Iter: Iterator> { |
532ac7d7 XL |
100 | split_count: &'a AtomicUsize, |
101 | done: &'a AtomicBool, | |
e74abb32 | 102 | iter: &'a Mutex<(Iter, Worker<Iter::Item>)>, |
532ac7d7 XL |
103 | items: Stealer<Iter::Item>, |
104 | } | |
105 | ||
106 | // manual clone because T doesn't need to be Clone, but the derive assumes it should be | |
107 | impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { | |
108 | fn clone(&self) -> Self { | |
109 | IterParallelProducer { | |
110 | split_count: self.split_count, | |
111 | done: self.done, | |
112 | iter: self.iter, | |
113 | items: self.items.clone(), | |
114 | } | |
115 | } | |
116 | } | |
117 | ||
impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
where
    Iter::Item: Send,
{
    type Item = Iter::Item;

    /// Splits off another producer that shares the same iterator, deque and flags.
    ///
    /// A split succeeds only while the shared `split_count` budget is non-zero and
    /// the bridge is not fully drained (iterator exhausted *and* deque empty).
    /// Otherwise this returns `(self, None)`.
    fn split(self) -> (Self, Option<Self>) {
        let mut count = self.split_count.load(Ordering::SeqCst);

        loop {
            // Check if the iterator is exhausted *and* we've consumed every item from it.
            let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();

            match count.checked_sub(1) {
                Some(new_count) if !done => {
                    // Try to claim one unit of the split budget. On failure (contention
                    // or a spurious `_weak` failure), retry with the value observed.
                    match self.split_count.compare_exchange_weak(
                        count,
                        new_count,
                        Ordering::SeqCst,
                        Ordering::SeqCst,
                    ) {
                        Ok(_) => return (self.clone(), Some(self)),
                        Err(last_count) => count = last_count,
                    }
                }
                // Budget exhausted (`count == 0`) or nothing left to hand out.
                _ => {
                    return (self, None);
                }
            }
        }
    }

    /// Feeds items into `folder` until it reports `full()` or the bridge is drained.
    ///
    /// Items are stolen from the shared FIFO deque. When the deque is empty and the
    /// iterator is not yet exhausted, whichever thread wins `try_lock` refills the
    /// deque with up to `2 * current_num_threads()^2` items from the sequential
    /// iterator. Threads that lose the race call `yield_now()` and retry. A poisoned
    /// mutex ends this fold early and returns the folder as-is.
    ///
    /// NOTE(review): the mutex stays held across `iter.next()`, and waiting threads
    /// spin rather than block. If `iter.next()` itself runs Rayon work on this
    /// thread and that work re-enters this `fold_with`, the nested call can only
    /// ever observe `WouldBlock` while the outer frame holds the lock. Confirm
    /// whether that can livelock.
    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        loop {
            match self.items.steal() {
                Steal::Success(it) => {
                    folder = folder.consume(it);
                    if folder.full() {
                        return folder;
                    }
                }
                Steal::Empty => {
                    // Don't storm the mutex if we're already done.
                    if self.done.load(Ordering::SeqCst) {
                        // Someone might have pushed more between our `steal()` and `done.load()`
                        if self.items.is_empty() {
                            // The iterator is out of items, no use in continuing
                            return folder;
                        }
                    } else {
                        // our cache is out of items, time to load more from the iterator
                        match self.iter.try_lock() {
                            Ok(mut guard) => {
                                // Check `done` again in case we raced with the previous lock
                                // holder on its way out. If items remain, go back to
                                // stealing (the guard is released by `continue`).
                                if self.done.load(Ordering::SeqCst) {
                                    if self.items.is_empty() {
                                        return folder;
                                    }
                                    continue;
                                }

                                // Refill batch size: 2 * threads^2 items per lock acquisition.
                                let count = current_num_threads();
                                let count = (count * count) * 2;

                                let (ref mut iter, ref worker) = *guard;

                                // while worker.len() < count {
                                // FIXME the new deque doesn't let us count items. We can just
                                // push a number of items, but that doesn't consider active
                                // stealers elsewhere.
                                for _ in 0..count {
                                    if let Some(it) = iter.next() {
                                        worker.push(it);
                                    } else {
                                        // Publish exhaustion so other threads stop
                                        // trying to lock and refill.
                                        self.done.store(true, Ordering::SeqCst);
                                        break;
                                    }
                                }
                            }
                            Err(TryLockError::WouldBlock) => {
                                // someone else has the mutex, just sit tight until it's ready
                                yield_now(); //TODO: use a thread-pool-aware yield? (#548)
                            }
                            Err(TryLockError::Poisoned(_)) => {
                                // any panics from other threads will have been caught by the pool,
                                // and will be re-thrown when joined - just exit
                                return folder;
                            }
                        }
                    }
                }
                // Lost a race with a concurrent stealer; simply try again.
                Steal::Retry => (),
            }
        }
    }
}