]>
Commit | Line | Data |
---|---|---|
064997fb FG |
1 | use crate::lock::RwLock; |
2 | use crate::mapref::multiple::{RefMulti, RefMutMulti}; | |
3 | use crate::util; | |
4 | use crate::{DashMap, HashMap}; | |
5 | use core::hash::{BuildHasher, Hash}; | |
6 | use rayon::iter::plumbing::UnindexedConsumer; | |
7 | use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; | |
8 | use std::collections::hash_map::RandomState; | |
9 | use std::sync::Arc; | |
10 | ||
11 | impl<K, V, S> ParallelExtend<(K, V)> for DashMap<K, V, S> | |
12 | where | |
13 | K: Send + Sync + Eq + Hash, | |
14 | V: Send + Sync, | |
15 | S: Send + Sync + Clone + BuildHasher, | |
16 | { | |
17 | fn par_extend<I>(&mut self, par_iter: I) | |
18 | where | |
19 | I: IntoParallelIterator<Item = (K, V)>, | |
20 | { | |
21 | (&*self).par_extend(par_iter); | |
22 | } | |
23 | } | |
24 | ||
25 | // Since we don't actually need mutability, we can implement this on a | |
26 | // reference, similar to `io::Write for &File`. | |
27 | impl<K, V, S> ParallelExtend<(K, V)> for &'_ DashMap<K, V, S> | |
28 | where | |
29 | K: Send + Sync + Eq + Hash, | |
30 | V: Send + Sync, | |
31 | S: Send + Sync + Clone + BuildHasher, | |
32 | { | |
33 | fn par_extend<I>(&mut self, par_iter: I) | |
34 | where | |
35 | I: IntoParallelIterator<Item = (K, V)>, | |
36 | { | |
37 | let &mut map = self; | |
38 | par_iter.into_par_iter().for_each(move |(key, value)| { | |
39 | map.insert(key, value); | |
40 | }); | |
41 | } | |
42 | } | |
43 | ||
44 | impl<K, V, S> FromParallelIterator<(K, V)> for DashMap<K, V, S> | |
45 | where | |
46 | K: Send + Sync + Eq + Hash, | |
47 | V: Send + Sync, | |
48 | S: Send + Sync + Clone + Default + BuildHasher, | |
49 | { | |
50 | fn from_par_iter<I>(par_iter: I) -> Self | |
51 | where | |
52 | I: IntoParallelIterator<Item = (K, V)>, | |
53 | { | |
54 | let map = Self::default(); | |
55 | (&map).par_extend(par_iter); | |
56 | map | |
57 | } | |
58 | } | |
59 | ||
60 | // Implementation note: while the shards will iterate in parallel, we flatten | |
61 | // sequentially within each shard (`flat_map_iter`), because the standard | |
62 | // `HashMap` only implements `ParallelIterator` by collecting to a `Vec` first. | |
63 | // There is real parallel support in the `hashbrown/rayon` feature, but we don't | |
64 | // always use that map. | |
65 | ||
66 | impl<K, V, S> IntoParallelIterator for DashMap<K, V, S> | |
67 | where | |
68 | K: Send + Eq + Hash, | |
69 | V: Send, | |
70 | S: Send + Clone + BuildHasher, | |
71 | { | |
72 | type Iter = OwningIter<K, V, S>; | |
73 | type Item = (K, V); | |
74 | ||
75 | fn into_par_iter(self) -> Self::Iter { | |
76 | OwningIter { | |
77 | shards: self.shards, | |
78 | } | |
79 | } | |
80 | } | |
81 | ||
82 | pub struct OwningIter<K, V, S = RandomState> { | |
2b03887a | 83 | pub(super) shards: Box<[RwLock<HashMap<K, V, S>>]>, |
064997fb FG |
84 | } |
85 | ||
86 | impl<K, V, S> ParallelIterator for OwningIter<K, V, S> | |
87 | where | |
88 | K: Send + Eq + Hash, | |
89 | V: Send, | |
90 | S: Send + Clone + BuildHasher, | |
91 | { | |
92 | type Item = (K, V); | |
93 | ||
94 | fn drive_unindexed<C>(self, consumer: C) -> C::Result | |
95 | where | |
96 | C: UnindexedConsumer<Self::Item>, | |
97 | { | |
98 | Vec::from(self.shards) | |
99 | .into_par_iter() | |
100 | .flat_map_iter(|shard| { | |
101 | shard | |
102 | .into_inner() | |
103 | .into_iter() | |
104 | .map(|(k, v)| (k, v.into_inner())) | |
105 | }) | |
106 | .drive_unindexed(consumer) | |
107 | } | |
108 | } | |
109 | ||
110 | // This impl also enables `IntoParallelRefIterator::par_iter` | |
111 | impl<'a, K, V, S> IntoParallelIterator for &'a DashMap<K, V, S> | |
112 | where | |
113 | K: Send + Sync + Eq + Hash, | |
114 | V: Send + Sync, | |
115 | S: Send + Sync + Clone + BuildHasher, | |
116 | { | |
117 | type Iter = Iter<'a, K, V, S>; | |
118 | type Item = RefMulti<'a, K, V, S>; | |
119 | ||
120 | fn into_par_iter(self) -> Self::Iter { | |
121 | Iter { | |
122 | shards: &self.shards, | |
123 | } | |
124 | } | |
125 | } | |
126 | ||
127 | pub struct Iter<'a, K, V, S = RandomState> { | |
2b03887a | 128 | pub(super) shards: &'a [RwLock<HashMap<K, V, S>>], |
064997fb FG |
129 | } |
130 | ||
131 | impl<'a, K, V, S> ParallelIterator for Iter<'a, K, V, S> | |
132 | where | |
133 | K: Send + Sync + Eq + Hash, | |
134 | V: Send + Sync, | |
135 | S: Send + Sync + Clone + BuildHasher, | |
136 | { | |
137 | type Item = RefMulti<'a, K, V, S>; | |
138 | ||
139 | fn drive_unindexed<C>(self, consumer: C) -> C::Result | |
140 | where | |
141 | C: UnindexedConsumer<Self::Item>, | |
142 | { | |
143 | self.shards | |
144 | .into_par_iter() | |
145 | .flat_map_iter(|shard| { | |
146 | let guard = shard.read(); | |
147 | let sref: &'a HashMap<K, V, S> = unsafe { util::change_lifetime_const(&*guard) }; | |
148 | ||
149 | let guard = Arc::new(guard); | |
150 | sref.iter().map(move |(k, v)| { | |
151 | let guard = Arc::clone(&guard); | |
152 | unsafe { RefMulti::new(guard, k, v.get()) } | |
153 | }) | |
154 | }) | |
155 | .drive_unindexed(consumer) | |
156 | } | |
157 | } | |
158 | ||
159 | // This impl also enables `IntoParallelRefMutIterator::par_iter_mut` | |
160 | impl<'a, K, V, S> IntoParallelIterator for &'a mut DashMap<K, V, S> | |
161 | where | |
162 | K: Send + Sync + Eq + Hash, | |
163 | V: Send + Sync, | |
164 | S: Send + Sync + Clone + BuildHasher, | |
165 | { | |
166 | type Iter = IterMut<'a, K, V, S>; | |
167 | type Item = RefMutMulti<'a, K, V, S>; | |
168 | ||
169 | fn into_par_iter(self) -> Self::Iter { | |
170 | IterMut { | |
171 | shards: &self.shards, | |
172 | } | |
173 | } | |
174 | } | |
175 | ||
4b012472 | 176 | impl<K, V, S> DashMap<K, V, S> |
064997fb FG |
177 | where |
178 | K: Send + Sync + Eq + Hash, | |
179 | V: Send + Sync, | |
180 | S: Send + Sync + Clone + BuildHasher, | |
181 | { | |
182 | // Unlike `IntoParallelRefMutIterator::par_iter_mut`, we only _need_ `&self`. | |
183 | pub fn par_iter_mut(&self) -> IterMut<'_, K, V, S> { | |
184 | IterMut { | |
185 | shards: &self.shards, | |
186 | } | |
187 | } | |
188 | } | |
189 | ||
190 | pub struct IterMut<'a, K, V, S = RandomState> { | |
191 | shards: &'a [RwLock<HashMap<K, V, S>>], | |
192 | } | |
193 | ||
194 | impl<'a, K, V, S> ParallelIterator for IterMut<'a, K, V, S> | |
195 | where | |
196 | K: Send + Sync + Eq + Hash, | |
197 | V: Send + Sync, | |
198 | S: Send + Sync + Clone + BuildHasher, | |
199 | { | |
200 | type Item = RefMutMulti<'a, K, V, S>; | |
201 | ||
202 | fn drive_unindexed<C>(self, consumer: C) -> C::Result | |
203 | where | |
204 | C: UnindexedConsumer<Self::Item>, | |
205 | { | |
206 | self.shards | |
207 | .into_par_iter() | |
208 | .flat_map_iter(|shard| { | |
209 | let mut guard = shard.write(); | |
210 | let sref: &'a mut HashMap<K, V, S> = | |
211 | unsafe { util::change_lifetime_mut(&mut *guard) }; | |
212 | ||
213 | let guard = Arc::new(guard); | |
214 | sref.iter_mut().map(move |(k, v)| { | |
215 | let guard = Arc::clone(&guard); | |
216 | unsafe { RefMutMulti::new(guard, k, v.get_mut()) } | |
217 | }) | |
218 | }) | |
219 | .drive_unindexed(consumer) | |
220 | } | |
221 | } |