]>
Commit | Line | Data |
---|---|---|
1 | //! Parallel iterator types for [results][std::result] | |
2 | //! | |
3 | //! You will rarely need to interact with this module directly unless you need | |
4 | //! to name one of the iterator types. | |
5 | //! | |
6 | //! [std::result]: https://doc.rust-lang.org/stable/std/result/ | |
7 | ||
8 | use iter::plumbing::*; | |
9 | use iter::*; | |
10 | use std::sync::Mutex; | |
11 | ||
12 | use option; | |
13 | ||
14 | /// Parallel iterator over a result | |
15 | #[derive(Debug, Clone)] | |
16 | pub struct IntoIter<T: Send> { | |
17 | inner: option::IntoIter<T>, | |
18 | } | |
19 | ||
20 | impl<T: Send, E> IntoParallelIterator for Result<T, E> { | |
21 | type Item = T; | |
22 | type Iter = IntoIter<T>; | |
23 | ||
24 | fn into_par_iter(self) -> Self::Iter { | |
25 | IntoIter { | |
26 | inner: self.ok().into_par_iter(), | |
27 | } | |
28 | } | |
29 | } | |
30 | ||
31 | delegate_indexed_iterator! { | |
32 | IntoIter<T> => T, | |
33 | impl<T: Send> | |
34 | } | |
35 | ||
36 | /// Parallel iterator over an immutable reference to a result | |
37 | #[derive(Debug)] | |
38 | pub struct Iter<'a, T: Sync + 'a> { | |
39 | inner: option::IntoIter<&'a T>, | |
40 | } | |
41 | ||
42 | impl<'a, T: Sync> Clone for Iter<'a, T> { | |
43 | fn clone(&self) -> Self { | |
44 | Iter { | |
45 | inner: self.inner.clone(), | |
46 | } | |
47 | } | |
48 | } | |
49 | ||
50 | impl<'a, T: Sync, E> IntoParallelIterator for &'a Result<T, E> { | |
51 | type Item = &'a T; | |
52 | type Iter = Iter<'a, T>; | |
53 | ||
54 | fn into_par_iter(self) -> Self::Iter { | |
55 | Iter { | |
56 | inner: self.as_ref().ok().into_par_iter(), | |
57 | } | |
58 | } | |
59 | } | |
60 | ||
61 | delegate_indexed_iterator! { | |
62 | Iter<'a, T> => &'a T, | |
63 | impl<'a, T: Sync + 'a> | |
64 | } | |
65 | ||
66 | /// Parallel iterator over a mutable reference to a result | |
67 | #[derive(Debug)] | |
68 | pub struct IterMut<'a, T: Send + 'a> { | |
69 | inner: option::IntoIter<&'a mut T>, | |
70 | } | |
71 | ||
72 | impl<'a, T: Send, E> IntoParallelIterator for &'a mut Result<T, E> { | |
73 | type Item = &'a mut T; | |
74 | type Iter = IterMut<'a, T>; | |
75 | ||
76 | fn into_par_iter(self) -> Self::Iter { | |
77 | IterMut { | |
78 | inner: self.as_mut().ok().into_par_iter(), | |
79 | } | |
80 | } | |
81 | } | |
82 | ||
83 | delegate_indexed_iterator! { | |
84 | IterMut<'a, T> => &'a mut T, | |
85 | impl<'a, T: Send + 'a> | |
86 | } | |
87 | ||
88 | /// Collect an arbitrary `Result`-wrapped collection. | |
89 | /// | |
90 | /// If any item is `Err`, then all previous `Ok` items collected are | |
91 | /// discarded, and it returns that error. If there are multiple errors, the | |
92 | /// one returned is not deterministic. | |
93 | impl<C, T, E> FromParallelIterator<Result<T, E>> for Result<C, E> | |
94 | where | |
95 | C: FromParallelIterator<T>, | |
96 | T: Send, | |
97 | E: Send, | |
98 | { | |
99 | fn from_par_iter<I>(par_iter: I) -> Self | |
100 | where | |
101 | I: IntoParallelIterator<Item = Result<T, E>>, | |
102 | { | |
103 | fn ok<T, E>(saved: &Mutex<Option<E>>) -> impl Fn(Result<T, E>) -> Option<T> + '_ { | |
104 | move |item| match item { | |
105 | Ok(item) => Some(item), | |
106 | Err(error) => { | |
107 | // We don't need a blocking `lock()`, as anybody | |
108 | // else holding the lock will also be writing | |
109 | // `Some(error)`, and then ours is irrelevant. | |
110 | if let Ok(mut guard) = saved.try_lock() { | |
111 | if guard.is_none() { | |
112 | *guard = Some(error); | |
113 | } | |
114 | } | |
115 | None | |
116 | } | |
117 | } | |
118 | } | |
119 | ||
120 | let saved_error = Mutex::new(None); | |
121 | let collection = par_iter | |
122 | .into_par_iter() | |
123 | .map(ok(&saved_error)) | |
124 | .while_some() | |
125 | .collect(); | |
126 | ||
127 | match saved_error.into_inner().unwrap() { | |
128 | Some(error) => Err(error), | |
129 | None => Ok(collection), | |
130 | } | |
131 | } | |
132 | } |