]>
Commit | Line | Data |
---|---|---|
532ac7d7 XL |
1 | use super::plumbing::*; |
2 | use super::ParallelIterator; | |
3 | ||
4 | use super::private::Try; | |
5 | use std::sync::atomic::{AtomicBool, Ordering}; | |
6 | ||
e74abb32 | 7 | pub(super) fn try_reduce<PI, R, ID, T>(pi: PI, identity: ID, reduce_op: R) -> T |
532ac7d7 XL |
8 | where |
9 | PI: ParallelIterator<Item = T>, | |
10 | R: Fn(T::Ok, T::Ok) -> T + Sync, | |
11 | ID: Fn() -> T::Ok + Sync, | |
12 | T: Try + Send, | |
13 | { | |
14 | let full = AtomicBool::new(false); | |
15 | let consumer = TryReduceConsumer { | |
16 | identity: &identity, | |
17 | reduce_op: &reduce_op, | |
18 | full: &full, | |
19 | }; | |
20 | pi.drive_unindexed(consumer) | |
21 | } | |
22 | ||
/// Consumer state for `try_reduce`.
///
/// Holds only shared references, so copies of the consumer all refer to the
/// same closures and the same flag.
struct TryReduceConsumer<'r, R, ID> {
    /// Produces the starting value for each new folder.
    identity: &'r ID,
    /// Combines two successful values.
    reduce_op: &'r R,
    /// Set to `true` once a folder holds an error; read by `full()`.
    full: &'r AtomicBool,
}
28 | ||
29 | impl<'r, R, ID> Copy for TryReduceConsumer<'r, R, ID> {} | |
30 | ||
31 | impl<'r, R, ID> Clone for TryReduceConsumer<'r, R, ID> { | |
32 | fn clone(&self) -> Self { | |
33 | *self | |
34 | } | |
35 | } | |
36 | ||
37 | impl<'r, R, ID, T> Consumer<T> for TryReduceConsumer<'r, R, ID> | |
38 | where | |
39 | R: Fn(T::Ok, T::Ok) -> T + Sync, | |
40 | ID: Fn() -> T::Ok + Sync, | |
41 | T: Try + Send, | |
42 | { | |
43 | type Folder = TryReduceFolder<'r, R, T>; | |
44 | type Reducer = Self; | |
45 | type Result = T; | |
46 | ||
47 | fn split_at(self, _index: usize) -> (Self, Self, Self) { | |
48 | (self, self, self) | |
49 | } | |
50 | ||
51 | fn into_folder(self) -> Self::Folder { | |
52 | TryReduceFolder { | |
53 | reduce_op: self.reduce_op, | |
54 | result: Ok((self.identity)()), | |
55 | full: self.full, | |
56 | } | |
57 | } | |
58 | ||
59 | fn full(&self) -> bool { | |
60 | self.full.load(Ordering::Relaxed) | |
61 | } | |
62 | } | |
63 | ||
64 | impl<'r, R, ID, T> UnindexedConsumer<T> for TryReduceConsumer<'r, R, ID> | |
65 | where | |
66 | R: Fn(T::Ok, T::Ok) -> T + Sync, | |
67 | ID: Fn() -> T::Ok + Sync, | |
68 | T: Try + Send, | |
69 | { | |
70 | fn split_off_left(&self) -> Self { | |
71 | *self | |
72 | } | |
73 | ||
74 | fn to_reducer(&self) -> Self::Reducer { | |
75 | *self | |
76 | } | |
77 | } | |
78 | ||
79 | impl<'r, R, ID, T> Reducer<T> for TryReduceConsumer<'r, R, ID> | |
80 | where | |
81 | R: Fn(T::Ok, T::Ok) -> T + Sync, | |
82 | T: Try, | |
83 | { | |
84 | fn reduce(self, left: T, right: T) -> T { | |
85 | match (left.into_result(), right.into_result()) { | |
86 | (Ok(left), Ok(right)) => (self.reduce_op)(left, right), | |
87 | (Err(e), _) | (_, Err(e)) => T::from_error(e), | |
88 | } | |
89 | } | |
90 | } | |
91 | ||
6a06907d | 92 | struct TryReduceFolder<'r, R, T: Try> { |
532ac7d7 XL |
93 | reduce_op: &'r R, |
94 | result: Result<T::Ok, T::Error>, | |
95 | full: &'r AtomicBool, | |
96 | } | |
97 | ||
98 | impl<'r, R, T> Folder<T> for TryReduceFolder<'r, R, T> | |
99 | where | |
100 | R: Fn(T::Ok, T::Ok) -> T, | |
101 | T: Try, | |
102 | { | |
103 | type Result = T; | |
104 | ||
e74abb32 | 105 | fn consume(mut self, item: T) -> Self { |
532ac7d7 | 106 | let reduce_op = self.reduce_op; |
e74abb32 XL |
107 | if let Ok(left) = self.result { |
108 | self.result = match item.into_result() { | |
109 | Ok(right) => reduce_op(left, right).into_result(), | |
110 | Err(error) => Err(error), | |
111 | }; | |
532ac7d7 | 112 | } |
e74abb32 XL |
113 | if self.result.is_err() { |
114 | self.full.store(true, Ordering::Relaxed) | |
532ac7d7 | 115 | } |
e74abb32 | 116 | self |
532ac7d7 XL |
117 | } |
118 | ||
119 | fn complete(self) -> T { | |
120 | match self.result { | |
121 | Ok(ok) => T::from_ok(ok), | |
122 | Err(error) => T::from_error(error), | |
123 | } | |
124 | } | |
125 | ||
126 | fn full(&self) -> bool { | |
127 | self.full.load(Ordering::Relaxed) | |
128 | } | |
129 | } |