]>
Commit | Line | Data |
---|---|---|
416331ca | 1 | use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; |
2c00a5a8 | 2 | use std::slice; |
2c00a5a8 XL |
3 | |
4 | mod consumer; | |
5 | use self::consumer::CollectConsumer; | |
f035d41b | 6 | use self::consumer::CollectResult; |
2c00a5a8 XL |
7 | use super::unzip::unzip_indexed; |
8 | ||
9 | mod test; | |
10 | ||
11 | /// Collects the results of the exact iterator into the specified vector. | |
12 | /// | |
416331ca XL |
13 | /// This is called by `IndexedParallelIterator::collect_into_vec`. |
14 | pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>) | |
15 | where | |
16 | I: IndexedParallelIterator<Item = T>, | |
17 | T: Send, | |
2c00a5a8 XL |
18 | { |
19 | v.truncate(0); // clear any old data | |
f035d41b XL |
20 | let len = pi.len(); |
21 | Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer)); | |
2c00a5a8 XL |
22 | } |
23 | ||
24 | /// Collects the results of the iterator into the specified vector. | |
25 | /// | |
26 | /// Technically, this only works for `IndexedParallelIterator`, but we're faking a | |
27 | /// bit of specialization here until Rust can do that natively. Callers are | |
28 | /// using `opt_len` to find the length before calling this, and only exact | |
29 | /// iterators will return anything but `None` there. | |
30 | /// | |
31 | /// Since the type system doesn't understand that contract, we have to allow | |
32 | /// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement | |
33 | /// `UnindexedConsumer`. That implementation panics `unreachable!` in case | |
34 | /// there's a bug where we actually do try to use this unindexed. | |
35 | fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>) | |
416331ca XL |
36 | where |
37 | I: ParallelIterator<Item = T>, | |
38 | T: Send, | |
2c00a5a8 | 39 | { |
f035d41b | 40 | Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer)); |
2c00a5a8 XL |
41 | } |
42 | ||
43 | /// Unzips the results of the exact iterator into the specified vectors. | |
44 | /// | |
416331ca XL |
45 | /// This is called by `IndexedParallelIterator::unzip_into_vecs`. |
46 | pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>) | |
47 | where | |
48 | I: IndexedParallelIterator<Item = (A, B)>, | |
49 | A: Send, | |
50 | B: Send, | |
2c00a5a8 XL |
51 | { |
52 | // clear any old data | |
53 | left.truncate(0); | |
54 | right.truncate(0); | |
55 | ||
56 | let len = pi.len(); | |
f035d41b XL |
57 | Collect::new(right, len).with_consumer(|right_consumer| { |
58 | let mut right_result = None; | |
59 | Collect::new(left, len).with_consumer(|left_consumer| { | |
60 | let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer); | |
61 | right_result = Some(right_r); | |
62 | left_r | |
63 | }); | |
64 | right_result.unwrap() | |
65 | }); | |
2c00a5a8 XL |
66 | } |
67 | ||
/// Manages the vector being collected into, together with the number of
/// items that are about to be written to it.
struct Collect<'c, T: Send> {
    /// Destination vector; collected items are placed after its current contents.
    vec: &'c mut Vec<T>,
    /// Number of items expected to be written.
    len: usize,
}
73 | ||
74 | impl<'c, T: Send + 'c> Collect<'c, T> { | |
75 | fn new(vec: &'c mut Vec<T>, len: usize) -> Self { | |
f035d41b | 76 | Collect { vec, len } |
2c00a5a8 XL |
77 | } |
78 | ||
f035d41b XL |
79 | /// Create a consumer on the slice of memory we are collecting into. |
80 | /// | |
81 | /// The consumer needs to be used inside the scope function, and the | |
82 | /// complete collect result passed back. | |
83 | /// | |
84 | /// This method will verify the collect result, and panic if the slice | |
85 | /// was not fully written into. Otherwise, in the successful case, | |
86 | /// the vector is complete with the collected result. | |
87 | fn with_consumer<F>(mut self, scope_fn: F) | |
88 | where | |
1b1a35ee | 89 | F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, |
f035d41b | 90 | { |
2c00a5a8 | 91 | unsafe { |
f035d41b XL |
92 | let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); |
93 | let result = scope_fn(CollectConsumer::new(slice)); | |
94 | ||
95 | // The CollectResult represents a contiguous part of the | |
96 | // slice, that has been written to. | |
97 | // On unwind here, the CollectResult will be dropped. | |
98 | // If some producers on the way did not produce enough elements, | |
99 | // partial CollectResults may have been dropped without | |
100 | // being reduced to the final result, and we will see | |
101 | // that as the length coming up short. | |
102 | // | |
103 | // Here, we assert that `slice` is fully initialized. This is | |
104 | // checked by the following assert, which verifies if a | |
105 | // complete CollectResult was produced; if the length is | |
106 | // correct, it is necessarily covering the target slice. | |
107 | // Since we know that the consumer cannot have escaped from | |
108 | // `drive` (by parametricity, essentially), we know that any | |
109 | // stores that will happen, have happened. Unless some code is buggy, | |
110 | // that means we should have seen `len` total writes. | |
111 | let actual_writes = result.len(); | |
416331ca XL |
112 | assert!( |
113 | actual_writes == self.len, | |
114 | "expected {} total writes, but got {}", | |
115 | self.len, | |
116 | actual_writes | |
117 | ); | |
f035d41b XL |
118 | |
119 | // Release the result's mutable borrow and "proxy ownership" | |
120 | // of the elements, before the vector takes it over. | |
121 | result.release_ownership(); | |
122 | ||
2c00a5a8 XL |
123 | let new_len = self.vec.len() + self.len; |
124 | self.vec.set_len(new_len); | |
125 | } | |
126 | } | |
f035d41b XL |
127 | |
128 | /// Reserve space for `len` more elements in the vector, | |
129 | /// and return a slice to the uninitialized tail of the vector | |
130 | /// | |
131 | /// Safety: The tail slice is uninitialized | |
132 | unsafe fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [T] { | |
133 | // Reserve the new space. | |
134 | vec.reserve(len); | |
135 | ||
136 | // Get a correct borrow, then extend it for the newly added length. | |
137 | let start = vec.len(); | |
138 | let slice = &mut vec[start..]; | |
139 | slice::from_raw_parts_mut(slice.as_mut_ptr(), len) | |
140 | } | |
2c00a5a8 XL |
141 | } |
142 | ||
f035d41b | 143 | /// Extends a vector with items from a parallel iterator. |
2c00a5a8 | 144 | impl<T> ParallelExtend<T> for Vec<T> |
416331ca XL |
145 | where |
146 | T: Send, | |
2c00a5a8 XL |
147 | { |
148 | fn par_extend<I>(&mut self, par_iter: I) | |
416331ca XL |
149 | where |
150 | I: IntoParallelIterator<Item = T>, | |
2c00a5a8 XL |
151 | { |
152 | // See the vec_collect benchmarks in rayon-demo for different strategies. | |
153 | let par_iter = par_iter.into_par_iter(); | |
154 | match par_iter.opt_len() { | |
155 | Some(len) => { | |
156 | // When Rust gets specialization, we can get here for indexed iterators | |
157 | // without relying on `opt_len`. Until then, `special_extend()` fakes | |
158 | // an unindexed mode on the promise that `opt_len()` is accurate. | |
159 | special_extend(par_iter, len, self); | |
160 | } | |
161 | None => { | |
162 | // This works like `extend`, but `Vec::append` is more efficient. | |
e74abb32 XL |
163 | let list = super::extend::collect(par_iter); |
164 | self.reserve(super::extend::len(&list)); | |
2c00a5a8 XL |
165 | for mut vec in list { |
166 | self.append(&mut vec); | |
167 | } | |
168 | } | |
169 | } | |
170 | } | |
171 | } |