//! Traits and functions used to implement parallel iteration. These are
//! low-level details -- users of parallel iterators should not need to
//! interact with them directly. See [the `plumbing` README][r] for a general overview.
//!
//! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
7 use crate::join_context
;
9 use super::IndexedParallelIterator
;
14 /// The `ProducerCallback` trait is a kind of generic closure,
15 /// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in
16 /// the plumbing README][r] for more details.
18 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback
19 /// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html
20 pub trait ProducerCallback
<T
> {
21 /// The type of value returned by this callback. Analogous to
22 /// [`Output` from the `FnOnce` trait][Output].
24 /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output
27 /// Invokes the callback with the given producer as argument. The
28 /// key point of this trait is that this method is generic over
29 /// `P`, and hence implementors must be defined for any producer.
30 fn callback
<P
>(self, producer
: P
) -> Self::Output
32 P
: Producer
<Item
= T
>;
35 /// A `Producer` is effectively a "splittable `IntoIterator`". That
36 /// is, a producer is a value which can be converted into an iterator
37 /// at any time: at that point, it simply produces items on demand,
38 /// like any iterator. But what makes a `Producer` special is that,
39 /// *before* we convert to an iterator, we can also **split** it at a
40 /// particular point using the `split_at` method. This will yield up
41 /// two producers, one producing the items before that point, and one
42 /// producing the items after that point (these two producers can then
43 /// independently be split further, or be converted into iterators).
44 /// In Rayon, this splitting is used to divide between threads.
45 /// See [the `plumbing` README][r] for further details.
47 /// Note that each producer will always produce a fixed number of
48 /// items N. However, this number N is not queryable through the API;
49 /// the consumer is expected to track it.
51 /// NB. You might expect `Producer` to extend the `IntoIterator`
52 /// trait. However, [rust-lang/rust#20671][20671] prevents us from
53 /// declaring the DoubleEndedIterator and ExactSizeIterator
54 /// constraints on a required IntoIterator trait, so we inline
55 /// IntoIterator here until that issue is fixed.
57 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
58 /// [20671]: https://github.com/rust-lang/rust/issues/20671
59 pub trait Producer
: Send
+ Sized
{
60 /// The type of item that will be produced by this producer once
61 /// it is converted into an iterator.
64 /// The type of iterator we will become.
65 type IntoIter
: Iterator
<Item
= Self::Item
> + DoubleEndedIterator
+ ExactSizeIterator
;
67 /// Convert `self` into an iterator; at this point, no more parallel splits
69 fn into_iter(self) -> Self::IntoIter
;
71 /// The minimum number of items that we will process
72 /// sequentially. Defaults to 1, which means that we will split
73 /// all the way down to a single item. This can be raised higher
74 /// using the [`with_min_len`] method, which will force us to
75 /// create sequential tasks at a larger granularity. Note that
76 /// Rayon automatically normally attempts to adjust the size of
77 /// parallel splits to reduce overhead, so this should not be
80 /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len
81 fn min_len(&self) -> usize {
85 /// The maximum number of items that we will process
86 /// sequentially. Defaults to MAX, which means that we can choose
87 /// not to split at all. This can be lowered using the
88 /// [`with_max_len`] method, which will force us to create more
89 /// parallel tasks. Note that Rayon automatically normally
90 /// attempts to adjust the size of parallel splits to reduce
91 /// overhead, so this should not be needed.
93 /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len
94 fn max_len(&self) -> usize {
98 /// Split into two producers; one produces items `0..index`, the
99 /// other `index..N`. Index must be less than or equal to `N`.
100 fn split_at(self, index
: usize) -> (Self, Self);
102 /// Iterate the producer, feeding each element to `folder`, and
103 /// stop when the folder is full (or all elements have been consumed).
105 /// The provided implementation is sufficient for most iterables.
106 fn fold_with
<F
>(self, folder
: F
) -> F
108 F
: Folder
<Self::Item
>,
110 folder
.consume_iter(self.into_iter())
114 /// A consumer is effectively a [generalized "fold" operation][fold],
115 /// and in fact each consumer will eventually be converted into a
116 /// [`Folder`]. What makes a consumer special is that, like a
117 /// [`Producer`], it can be **split** into multiple consumers using
118 /// the `split_at` method. When a consumer is split, it produces two
119 /// consumers, as well as a **reducer**. The two consumers can be fed
120 /// items independently, and when they are done the reducer is used to
121 /// combine their two results into one. See [the `plumbing`
122 /// README][r] for further details.
124 /// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
125 /// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
126 /// [`Folder`]: trait.Folder.html
127 /// [`Producer`]: trait.Producer.html
128 pub trait Consumer
<Item
>: Send
+ Sized
{
129 /// The type of folder that this consumer can be converted into.
130 type Folder
: Folder
<Item
, Result
= Self::Result
>;
132 /// The type of reducer that is produced if this consumer is split.
133 type Reducer
: Reducer
<Self::Result
>;
135 /// The type of result that this consumer will ultimately produce.
138 /// Divide the consumer into two consumers, one processing items
139 /// `0..index` and one processing items from `index..`. Also
140 /// produces a reducer that can be used to reduce the results at
142 fn split_at(self, index
: usize) -> (Self, Self, Self::Reducer
);
144 /// Convert the consumer into a folder that can consume items
145 /// sequentially, eventually producing a final result.
146 fn into_folder(self) -> Self::Folder
;
148 /// Hint whether this `Consumer` would like to stop processing
149 /// further items, e.g. if a search has been completed.
150 fn full(&self) -> bool
;
/// The `Folder` trait encapsulates [the standard fold
/// operation][fold]. It can be fed many items using the `consume`
/// method. At the end, once all items have been consumed, it can then
/// be converted (using `complete`) into a final value.
///
/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold
pub trait Folder<Item>: Sized {
    /// The type of result that will ultimately be produced by the folder.
    type Result;

    /// Consume next item and return new sequential state.
    fn consume(self, item: Item) -> Self;

    /// Consume items from the iterator until full, and return new sequential state.
    ///
    /// This method is **optional**. The default simply iterates over
    /// `iter`, invoking `consume` and checking after each iteration
    /// whether `full` returns false. Because the check happens *after*
    /// each `consume`, at least one item is consumed from a non-empty
    /// iterator even if the folder was already full.
    ///
    /// The main reason to override it is if you can provide a more
    /// specialized, efficient implementation.
    fn consume_iter<I>(mut self, iter: I) -> Self
    where
        I: IntoIterator<Item = Item>,
    {
        for item in iter {
            self = self.consume(item);
            if self.full() {
                break;
            }
        }
        self
    }

    /// Finish consuming items, produce final result.
    fn complete(self) -> Self::Result;

    /// Hint whether this `Folder` would like to stop processing
    /// further items, e.g. if a search has been completed.
    fn full(&self) -> bool;
}
/// The reducer is the final step of a `Consumer` -- after a consumer
/// has been split into two parts, and each of those parts has been
/// fully processed, we are left with two results. The reducer is then
/// used to combine those two results into one. See [the `plumbing`
/// README][r] for further details.
///
/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md
pub trait Reducer<Result> {
    /// Reduce two final results into one; this is executed after a
    /// split.
    fn reduce(self, left: Result, right: Result) -> Result;
}
208 /// A stateless consumer can be freely copied. These consumers can be
209 /// used like regular consumers, but they also support a
210 /// `split_off_left` method that does not take an index to split, but
211 /// simply splits at some arbitrary point (`for_each`, for example,
212 /// produces an unindexed consumer).
213 pub trait UnindexedConsumer
<I
>: Consumer
<I
> {
214 /// Splits off a "left" consumer and returns it. The `self`
215 /// consumer should then be used to consume the "right" portion of
216 /// the data. (The ordering matters for methods like find_first --
217 /// values produced by the returned value are given precedence
218 /// over values produced by `self`.) Once the left and right
219 /// halves have been fully consumed, you should reduce the results
220 /// with the result of `to_reducer`.
221 fn split_off_left(&self) -> Self;
223 /// Creates a reducer that can be used to combine the results from
224 /// a split consumer.
225 fn to_reducer(&self) -> Self::Reducer
;
228 /// A variant on `Producer` which does not know its exact length or
229 /// cannot represent it in a `usize`. These producers act like
230 /// ordinary producers except that they cannot be told to split at a
231 /// particular point. Instead, you just ask them to split 'somewhere'.
233 /// (In principle, `Producer` could extend this trait; however, it
234 /// does not because to do so would require producers to carry their
235 /// own length with them.)
236 pub trait UnindexedProducer
: Send
+ Sized
{
237 /// The type of item returned by this producer.
240 /// Split midway into a new producer if possible, otherwise return `None`.
241 fn split(self) -> (Self, Option
<Self>);
243 /// Iterate the producer, feeding each element to `folder`, and
244 /// stop when the folder is full (or all elements have been consumed).
245 fn fold_with
<F
>(self, folder
: F
) -> F
247 F
: Folder
<Self::Item
>;
250 /// A splitter controls the policy for splitting into smaller work items.
252 /// Thief-splitting is an adaptive policy that starts by splitting into
253 /// enough jobs for every worker thread, and then resets itself whenever a
254 /// job is actually stolen into a different thread.
255 #[derive(Clone, Copy)]
257 /// The `splits` tell us approximately how many remaining times we'd
258 /// like to split this job. We always just divide it by two though, so
259 /// the effective number of pieces will be `next_power_of_two()`.
265 fn new() -> Splitter
{
267 splits
: crate::current_num_threads(),
272 fn try_split(&mut self, stolen
: bool
) -> bool
{
273 let Splitter { splits }
= *self;
276 // This job was stolen! Reset the number of desired splits to the
277 // thread count, if that's more than we had remaining anyway.
278 self.splits
= cmp
::max(crate::current_num_threads(), self.splits
/ 2);
280 } else if splits
> 0 {
281 // We have splits remaining, make it so.
285 // Not stolen, and no more splits -- we're done!
291 /// The length splitter is built on thief-splitting, but additionally takes
292 /// into account the remaining length of the iterator.
293 #[derive(Clone, Copy)]
294 struct LengthSplitter
{
297 /// The smallest we're willing to divide into. Usually this is just 1,
298 /// but you can choose a larger working size with `with_min_len()`.
302 impl LengthSplitter
{
303 /// Create a new splitter based on lengths.
305 /// The `min` is a hard lower bound. We'll never split below that, but
306 /// of course an iterator might start out smaller already.
308 /// The `max` is an upper bound on the working size, used to determine
309 /// the minimum number of times we need to split to get under that limit.
310 /// The adaptive algorithm may very well split even further, but never
311 /// smaller than the `min`.
313 fn new(min
: usize, max
: usize, len
: usize) -> LengthSplitter
{
314 let mut splitter
= LengthSplitter
{
315 inner
: Splitter
::new(),
316 min
: cmp
::max(min
, 1),
319 // Divide the given length by the max working length to get the minimum
320 // number of splits we need to get under that max. This rounds down,
321 // but the splitter actually gives `next_power_of_two()` pieces anyway.
322 // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces.
323 let min_splits
= len
/ cmp
::max(max
, 1);
325 // Only update the value if it's not splitting enough already.
326 if min_splits
> splitter
.inner
.splits
{
327 splitter
.inner
.splits
= min_splits
;
334 fn try_split(&mut self, len
: usize, stolen
: bool
) -> bool
{
335 // If splitting wouldn't make us too small, try the inner splitter.
336 len
/ 2 >= self.min
&& self.inner
.try_split(stolen
)
340 /// This helper function is used to "connect" a parallel iterator to a
341 /// consumer. It will convert the `par_iter` into a producer P and
342 /// then pull items from P and feed them to `consumer`, splitting and
343 /// creating parallel threads as needed.
345 /// This is useful when you are implementing your own parallel
346 /// iterators: it is often used as the definition of the
347 /// [`drive_unindexed`] or [`drive`] methods.
349 /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
350 /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
351 pub fn bridge
<I
, C
>(par_iter
: I
, consumer
: C
) -> C
::Result
353 I
: IndexedParallelIterator
,
354 C
: Consumer
<I
::Item
>,
356 let len
= par_iter
.len();
357 return par_iter
.with_producer(Callback { len, consumer }
);
364 impl<C
, I
> ProducerCallback
<I
> for Callback
<C
>
368 type Output
= C
::Result
;
369 fn callback
<P
>(self, producer
: P
) -> C
::Result
371 P
: Producer
<Item
= I
>,
373 bridge_producer_consumer(self.len
, producer
, self.consumer
)
378 /// This helper function is used to "connect" a producer and a
379 /// consumer. You may prefer to call [`bridge`], which wraps this
380 /// function. This function will draw items from `producer` and feed
381 /// them to `consumer`, splitting and creating parallel tasks when
384 /// This is useful when you are implementing your own parallel
385 /// iterators: it is often used as the definition of the
386 /// [`drive_unindexed`] or [`drive`] methods.
388 /// [`bridge`]: fn.bridge.html
389 /// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed
390 /// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive
391 pub fn bridge_producer_consumer
<P
, C
>(len
: usize, producer
: P
, consumer
: C
) -> C
::Result
394 C
: Consumer
<P
::Item
>,
396 let splitter
= LengthSplitter
::new(producer
.min_len(), producer
.max_len(), len
);
397 return helper(len
, false, splitter
, producer
, consumer
);
402 mut splitter
: LengthSplitter
,
408 C
: Consumer
<P
::Item
>,
411 consumer
.into_folder().complete()
412 } else if splitter
.try_split(len
, migrated
) {
414 let (left_producer
, right_producer
) = producer
.split_at(mid
);
415 let (left_consumer
, right_consumer
, reducer
) = consumer
.split_at(mid
);
416 let (left_result
, right_result
) = join_context(
436 reducer
.reduce(left_result
, right_result
)
438 producer
.fold_with(consumer
.into_folder()).complete()
443 /// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer.
445 /// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html
446 pub fn bridge_unindexed
<P
, C
>(producer
: P
, consumer
: C
) -> C
::Result
448 P
: UnindexedProducer
,
449 C
: UnindexedConsumer
<P
::Item
>,
451 let splitter
= Splitter
::new();
452 bridge_unindexed_producer_consumer(false, splitter
, producer
, consumer
)
455 fn bridge_unindexed_producer_consumer
<P
, C
>(
457 mut splitter
: Splitter
,
462 P
: UnindexedProducer
,
463 C
: UnindexedConsumer
<P
::Item
>,
466 consumer
.into_folder().complete()
467 } else if splitter
.try_split(migrated
) {
468 match producer
.split() {
469 (left_producer
, Some(right_producer
)) => {
470 let (reducer
, left_consumer
, right_consumer
) =
471 (consumer
.to_reducer(), consumer
.split_off_left(), consumer
);
472 let bridge
= bridge_unindexed_producer_consumer
;
473 let (left_result
, right_result
) = join_context(
474 |context
| bridge(context
.migrated(), splitter
, left_producer
, left_consumer
),
475 |context
| bridge(context
.migrated(), splitter
, right_producer
, right_consumer
),
477 reducer
.reduce(left_result
, right_result
)
479 (producer
, None
) => producer
.fold_with(consumer
.into_folder()).complete(),
482 producer
.fold_with(consumer
.into_folder()).complete()