]>
Commit | Line | Data |
---|---|---|
2c00a5a8 XL |
1 | use super::plumbing::*; |
2 | use super::*; | |
532ac7d7 | 3 | use std::sync::atomic::{AtomicBool, Ordering}; |
2c00a5a8 | 4 | |
e74abb32 | 5 | pub(super) fn find<I, P>(pi: I, find_op: P) -> Option<I::Item> |
532ac7d7 XL |
6 | where |
7 | I: ParallelIterator, | |
8 | P: Fn(&I::Item) -> bool + Sync, | |
2c00a5a8 XL |
9 | { |
10 | let found = AtomicBool::new(false); | |
11 | let consumer = FindConsumer::new(&find_op, &found); | |
12 | pi.drive_unindexed(consumer) | |
13 | } | |
14 | ||
6a06907d | 15 | struct FindConsumer<'p, P> { |
2c00a5a8 XL |
16 | find_op: &'p P, |
17 | found: &'p AtomicBool, | |
18 | } | |
19 | ||
20 | impl<'p, P> FindConsumer<'p, P> { | |
21 | fn new(find_op: &'p P, found: &'p AtomicBool) -> Self { | |
e74abb32 | 22 | FindConsumer { find_op, found } |
2c00a5a8 XL |
23 | } |
24 | } | |
25 | ||
26 | impl<'p, T, P: 'p> Consumer<T> for FindConsumer<'p, P> | |
532ac7d7 XL |
27 | where |
28 | T: Send, | |
29 | P: Fn(&T) -> bool + Sync, | |
2c00a5a8 XL |
30 | { |
31 | type Folder = FindFolder<'p, T, P>; | |
32 | type Reducer = FindReducer; | |
33 | type Result = Option<T>; | |
34 | ||
35 | fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { | |
36 | (self.split_off_left(), self, FindReducer) | |
37 | } | |
38 | ||
39 | fn into_folder(self) -> Self::Folder { | |
40 | FindFolder { | |
41 | find_op: self.find_op, | |
42 | found: self.found, | |
43 | item: None, | |
44 | } | |
45 | } | |
46 | ||
47 | fn full(&self) -> bool { | |
48 | self.found.load(Ordering::Relaxed) | |
49 | } | |
50 | } | |
51 | ||
2c00a5a8 | 52 | impl<'p, T, P: 'p> UnindexedConsumer<T> for FindConsumer<'p, P> |
532ac7d7 XL |
53 | where |
54 | T: Send, | |
55 | P: Fn(&T) -> bool + Sync, | |
2c00a5a8 XL |
56 | { |
57 | fn split_off_left(&self) -> Self { | |
58 | FindConsumer::new(self.find_op, self.found) | |
59 | } | |
60 | ||
61 | fn to_reducer(&self) -> Self::Reducer { | |
62 | FindReducer | |
63 | } | |
64 | } | |
65 | ||
6a06907d | 66 | struct FindFolder<'p, T, P> { |
2c00a5a8 XL |
67 | find_op: &'p P, |
68 | found: &'p AtomicBool, | |
69 | item: Option<T>, | |
70 | } | |
71 | ||
72 | impl<'p, T, P> Folder<T> for FindFolder<'p, T, P> | |
532ac7d7 XL |
73 | where |
74 | P: Fn(&T) -> bool + 'p, | |
2c00a5a8 XL |
75 | { |
76 | type Result = Option<T>; | |
77 | ||
78 | fn consume(mut self, item: T) -> Self { | |
79 | if (self.find_op)(&item) { | |
80 | self.found.store(true, Ordering::Relaxed); | |
81 | self.item = Some(item); | |
82 | } | |
83 | self | |
84 | } | |
85 | ||
e74abb32 XL |
86 | fn consume_iter<I>(mut self, iter: I) -> Self |
87 | where | |
88 | I: IntoIterator<Item = T>, | |
89 | { | |
90 | fn not_full<T>(found: &AtomicBool) -> impl Fn(&T) -> bool + '_ { | |
91 | move |_| !found.load(Ordering::Relaxed) | |
92 | } | |
93 | ||
94 | self.item = iter | |
95 | .into_iter() | |
96 | // stop iterating if another thread has found something | |
97 | .take_while(not_full(&self.found)) | |
98 | .find(self.find_op); | |
99 | if self.item.is_some() { | |
100 | self.found.store(true, Ordering::Relaxed) | |
101 | } | |
102 | self | |
103 | } | |
104 | ||
2c00a5a8 XL |
105 | fn complete(self) -> Self::Result { |
106 | self.item | |
107 | } | |
108 | ||
109 | fn full(&self) -> bool { | |
110 | self.found.load(Ordering::Relaxed) | |
111 | } | |
112 | } | |
113 | ||
2c00a5a8 XL |
114 | struct FindReducer; |
115 | ||
116 | impl<T> Reducer<Option<T>> for FindReducer { | |
117 | fn reduce(self, left: Option<T>, right: Option<T>) -> Option<T> { | |
118 | left.or(right) | |
119 | } | |
120 | } |