]> git.proxmox.com Git - rustc.git/blob - src/vendor/rustc-rayon/src/iter/collect/mod.rs
New upstream version 1.28.0~beta.14+dfsg1
[rustc.git] / src / vendor / rustc-rayon / src / iter / collect / mod.rs
1 use super::{ParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend};
2 use std::collections::LinkedList;
3 use std::slice;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5
6 mod consumer;
7 use self::consumer::CollectConsumer;
8 use super::unzip::unzip_indexed;
9
10 mod test;
11
12 /// Collects the results of the exact iterator into the specified vector.
13 ///
14 /// This is not directly public, but called by `IndexedParallelIterator::collect_into_vec`.
15 pub fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
16 where I: IndexedParallelIterator<Item = T>,
17 T: Send
18 {
19 v.truncate(0); // clear any old data
20 let mut collect = Collect::new(v, pi.len());
21 pi.drive(collect.as_consumer());
22 collect.complete();
23 }
24
25 /// Collects the results of the iterator into the specified vector.
26 ///
27 /// Technically, this only works for `IndexedParallelIterator`, but we're faking a
28 /// bit of specialization here until Rust can do that natively. Callers are
29 /// using `opt_len` to find the length before calling this, and only exact
30 /// iterators will return anything but `None` there.
31 ///
32 /// Since the type system doesn't understand that contract, we have to allow
33 /// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
34 /// `UnindexedConsumer`. That implementation panics `unreachable!` in case
35 /// there's a bug where we actually do try to use this unindexed.
36 fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
37 where I: ParallelIterator<Item = T>,
38 T: Send
39 {
40 let mut collect = Collect::new(v, len);
41 pi.drive_unindexed(collect.as_consumer());
42 collect.complete();
43 }
44
45 /// Unzips the results of the exact iterator into the specified vectors.
46 ///
47 /// This is not directly public, but called by `IndexedParallelIterator::unzip_into_vecs`.
48 pub fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
49 where I: IndexedParallelIterator<Item = (A, B)>,
50 A: Send,
51 B: Send
52 {
53 // clear any old data
54 left.truncate(0);
55 right.truncate(0);
56
57 let len = pi.len();
58 let mut left = Collect::new(left, len);
59 let mut right = Collect::new(right, len);
60
61 unzip_indexed(pi, left.as_consumer(), right.as_consumer());
62
63 left.complete();
64 right.complete();
65 }
66
67
68 /// Manage the collection vector.
/// Manage the collection vector.
///
/// Wraps a caller-provided `Vec` whose spare capacity is handed out to
/// consumers via `as_consumer`; `complete` later checks the write count and
/// commits the new length with `set_len`.
struct Collect<'c, T: Send + 'c> {
    // Total number of item writes performed by consumers; compared against
    // `len` in `complete` before the vector's length is extended.
    writes: AtomicUsize,
    // Destination vector; new items are written into its reserved tail.
    vec: &'c mut Vec<T>,
    // Exact number of items this collection is expected to produce.
    len: usize,
}
74
impl<'c, T: Send + 'c> Collect<'c, T> {
    /// Creates a new `Collect` that will append exactly `len` items to `vec`.
    fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
        Collect {
            writes: AtomicUsize::new(0),
            vec: vec,
            len: len,
        }
    }

    /// Create a consumer on a slice of our memory.
    ///
    /// The returned consumer writes items into `len` slots of the vector's
    /// reserved-but-unused tail, and records each write in `self.writes` so
    /// that `complete` can verify full initialization.
    fn as_consumer(&mut self) -> CollectConsumer<T> {
        // Reserve the new space.
        self.vec.reserve(self.len);

        // Get a correct borrow, then extend it for the newly added length.
        let start = self.vec.len();
        let mut slice = &mut self.vec[start..];
        // NOTE(review): `&mut self.vec[start..]` is an *empty* slice at the end
        // of the initialized data; `from_raw_parts_mut` then widens it to cover
        // `len` slots of spare capacity that `reserve` guaranteed to exist.
        // This exposes uninitialized memory as `&mut [T]` — sound only as long
        // as `CollectConsumer` strictly writes (never reads or drops) these
        // slots before `complete` runs. TODO confirm against `consumer.rs`.
        slice = unsafe { slice::from_raw_parts_mut(slice.as_mut_ptr(), self.len) };
        CollectConsumer::new(&self.writes, slice)
    }

    /// Update the final vector length.
    ///
    /// # Panics
    ///
    /// Panics if the consumers did not report exactly `len` writes, which
    /// would indicate a bug in the driving iterator.
    fn complete(self) {
        unsafe {
            // Here, we assert that `v` is fully initialized. This is
            // checked by the following assert, which counts how many
            // total writes occurred. Since we know that the consumer
            // cannot have escaped from `drive` (by parametricity,
            // essentially), we know that any stores that will happen,
            // have happened. Unless some code is buggy, that means we
            // should have seen `len` total writes.
            let actual_writes = self.writes.load(Ordering::Relaxed);
            assert!(actual_writes == self.len,
                    "expected {} total writes, but got {}",
                    self.len,
                    actual_writes);
            // SAFETY: the assert above established that all `len` reserved
            // slots were written, so extending the length is sound.
            let new_len = self.vec.len() + self.len;
            self.vec.set_len(new_len);
        }
    }
}
116
117
118 /// Extend a vector with items from a parallel iterator.
119 impl<T> ParallelExtend<T> for Vec<T>
120 where T: Send
121 {
122 fn par_extend<I>(&mut self, par_iter: I)
123 where I: IntoParallelIterator<Item = T>
124 {
125 // See the vec_collect benchmarks in rayon-demo for different strategies.
126 let par_iter = par_iter.into_par_iter();
127 match par_iter.opt_len() {
128 Some(len) => {
129 // When Rust gets specialization, we can get here for indexed iterators
130 // without relying on `opt_len`. Until then, `special_extend()` fakes
131 // an unindexed mode on the promise that `opt_len()` is accurate.
132 special_extend(par_iter, len, self);
133 }
134 None => {
135 // This works like `extend`, but `Vec::append` is more efficient.
136 let list: LinkedList<_> = par_iter
137 .fold(Vec::new, |mut vec, elem| {
138 vec.push(elem);
139 vec
140 })
141 .map(|vec| {
142 let mut list = LinkedList::new();
143 list.push_back(vec);
144 list
145 })
146 .reduce(LinkedList::new, |mut list1, mut list2| {
147 list1.append(&mut list2);
148 list1
149 });
150
151 self.reserve(list.iter().map(Vec::len).sum());
152 for mut vec in list {
153 self.append(&mut vec);
154 }
155 }
156 }
157 }
158 }