// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H
#include <deque>
#include <map>
#include <memory>
#include <vector>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"

class OSDMap;
16 | ||
17 | /// work queue to perform work on batches of pgids on multiple CPUs | |
18 | class ParallelPGMapper { | |
19 | public: | |
20 | struct Job { | |
21 | utime_t start, finish; | |
22 | unsigned shards = 0; | |
23 | const OSDMap *osdmap; | |
24 | bool aborted = false; | |
25 | Context *onfinish = nullptr; | |
26 | ||
9f95a23c TL |
27 | ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock"); |
28 | ceph::condition_variable cond; | |
7c673cae FG |
29 | |
30 | Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {} | |
31 | virtual ~Job() { | |
11fdf7f2 | 32 | ceph_assert(shards == 0); |
7c673cae FG |
33 | } |
34 | ||
494da23a | 35 | // child must implement either form of process |
f67539c2 | 36 | virtual void process(const std::vector<pg_t>& pgs) = 0; |
7c673cae FG |
37 | virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0; |
38 | virtual void complete() = 0; | |
39 | ||
40 | void set_finish_event(Context *fin) { | |
9f95a23c | 41 | lock.lock(); |
7c673cae FG |
42 | if (shards == 0) { |
43 | // already done. | |
9f95a23c | 44 | lock.unlock(); |
7c673cae FG |
45 | fin->complete(0); |
46 | } else { | |
47 | // set finisher | |
48 | onfinish = fin; | |
9f95a23c | 49 | lock.unlock(); |
7c673cae FG |
50 | } |
51 | } | |
52 | bool is_done() { | |
11fdf7f2 | 53 | std::lock_guard l(lock); |
7c673cae FG |
54 | return shards == 0; |
55 | } | |
56 | utime_t get_duration() { | |
57 | return finish - start; | |
58 | } | |
59 | void wait() { | |
9f95a23c TL |
60 | std::unique_lock l(lock); |
61 | cond.wait(l, [this] { return shards == 0; }); | |
7c673cae FG |
62 | } |
63 | bool wait_for(double duration) { | |
64 | utime_t until = start; | |
65 | until += duration; | |
9f95a23c | 66 | std::unique_lock l(lock); |
7c673cae FG |
67 | while (shards > 0) { |
68 | if (ceph_clock_now() >= until) { | |
69 | return false; | |
70 | } | |
9f95a23c | 71 | cond.wait(l); |
7c673cae FG |
72 | } |
73 | return true; | |
74 | } | |
75 | void abort() { | |
76 | Context *fin = nullptr; | |
77 | { | |
9f95a23c | 78 | std::unique_lock l(lock); |
7c673cae FG |
79 | aborted = true; |
80 | fin = onfinish; | |
81 | onfinish = nullptr; | |
9f95a23c | 82 | cond.wait(l, [this] { return shards == 0; }); |
7c673cae FG |
83 | } |
84 | if (fin) { | |
85 | fin->complete(-ECANCELED); | |
86 | } | |
87 | } | |
88 | ||
89 | void start_one() { | |
11fdf7f2 | 90 | std::lock_guard l(lock); |
7c673cae FG |
91 | ++shards; |
92 | } | |
93 | void finish_one(); | |
94 | }; | |
95 | ||
96 | protected: | |
97 | CephContext *cct; | |
98 | ||
99 | struct Item { | |
100 | Job *job; | |
101 | int64_t pool; | |
102 | unsigned begin, end; | |
f67539c2 | 103 | std::vector<pg_t> pgs; |
7c673cae | 104 | |
f67539c2 | 105 | Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(pgs) {} |
7c673cae FG |
106 | Item(Job *j, int64_t p, unsigned b, unsigned e) |
107 | : job(j), | |
108 | pool(p), | |
109 | begin(b), | |
110 | end(e) {} | |
111 | }; | |
112 | std::deque<Item*> q; | |
113 | ||
114 | struct WQ : public ThreadPool::WorkQueue<Item> { | |
115 | ParallelPGMapper *m; | |
116 | ||
117 | WQ(ParallelPGMapper *m_, ThreadPool *tp) | |
f67539c2 TL |
118 | : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", |
119 | ceph::timespan::zero(), | |
120 | ceph::timespan::zero(), | |
121 | tp), | |
7c673cae FG |
122 | m(m_) {} |
123 | ||
124 | bool _enqueue(Item *i) override { | |
125 | m->q.push_back(i); | |
126 | return true; | |
127 | } | |
128 | void _dequeue(Item *i) override { | |
129 | ceph_abort(); | |
130 | } | |
131 | Item *_dequeue() override { | |
132 | while (!m->q.empty()) { | |
133 | Item *i = m->q.front(); | |
134 | m->q.pop_front(); | |
135 | if (i->job->aborted) { | |
136 | i->job->finish_one(); | |
137 | delete i; | |
138 | } else { | |
139 | return i; | |
140 | } | |
141 | } | |
142 | return nullptr; | |
143 | } | |
144 | ||
145 | void _process(Item *i, ThreadPool::TPHandle &h) override; | |
146 | ||
147 | void _clear() override { | |
11fdf7f2 | 148 | ceph_assert(_empty()); |
7c673cae FG |
149 | } |
150 | ||
151 | bool _empty() override { | |
152 | return m->q.empty(); | |
153 | } | |
154 | } wq; | |
155 | ||
156 | public: | |
157 | ParallelPGMapper(CephContext *cct, ThreadPool *tp) | |
158 | : cct(cct), | |
159 | wq(this, tp) {} | |
160 | ||
161 | void queue( | |
162 | Job *job, | |
494da23a | 163 | unsigned pgs_per_item, |
f67539c2 | 164 | const std::vector<pg_t>& input_pgs); |
7c673cae FG |
165 | |
166 | void drain() { | |
167 | wq.drain(); | |
168 | } | |
169 | }; | |
170 | ||
171 | ||
172 | /// a precalculated mapping of every PG for a given OSDMap | |
173 | class OSDMapMapping { | |
174 | public: | |
175 | MEMPOOL_CLASS_HELPERS(); | |
176 | private: | |
177 | ||
178 | struct PoolMapping { | |
179 | MEMPOOL_CLASS_HELPERS(); | |
180 | ||
181 | unsigned size = 0; | |
182 | unsigned pg_num = 0; | |
11fdf7f2 | 183 | bool erasure = false; |
7c673cae FG |
184 | mempool::osdmap_mapping::vector<int32_t> table; |
185 | ||
186 | size_t row_size() const { | |
187 | return | |
188 | 1 + // acting_primary | |
189 | 1 + // up_primary | |
190 | 1 + // num acting | |
191 | 1 + // num up | |
192 | size + // acting | |
193 | size; // up | |
194 | } | |
195 | ||
11fdf7f2 | 196 | PoolMapping(int s, int p, bool e) |
7c673cae FG |
197 | : size(s), |
198 | pg_num(p), | |
11fdf7f2 | 199 | erasure(e), |
7c673cae FG |
200 | table(pg_num * row_size()) { |
201 | } | |
202 | ||
203 | void get(size_t ps, | |
204 | std::vector<int> *up, | |
205 | int *up_primary, | |
206 | std::vector<int> *acting, | |
207 | int *acting_primary) const { | |
208 | const int32_t *row = &table[row_size() * ps]; | |
209 | if (acting_primary) { | |
210 | *acting_primary = row[0]; | |
211 | } | |
212 | if (up_primary) { | |
213 | *up_primary = row[1]; | |
214 | } | |
215 | if (acting) { | |
216 | acting->resize(row[2]); | |
217 | for (int i = 0; i < row[2]; ++i) { | |
218 | (*acting)[i] = row[4 + i]; | |
219 | } | |
220 | } | |
221 | if (up) { | |
222 | up->resize(row[3]); | |
223 | for (int i = 0; i < row[3]; ++i) { | |
224 | (*up)[i] = row[4 + size + i]; | |
225 | } | |
226 | } | |
227 | } | |
228 | ||
229 | void set(size_t ps, | |
230 | const std::vector<int>& up, | |
231 | int up_primary, | |
232 | const std::vector<int>& acting, | |
233 | int acting_primary) { | |
234 | int32_t *row = &table[row_size() * ps]; | |
235 | row[0] = acting_primary; | |
236 | row[1] = up_primary; | |
11fdf7f2 TL |
237 | // these should always be <= the pool size, but just in case, avoid |
238 | // blowing out the array. Note that our mapping is not completely | |
239 | // accurate in this case--this is just to avoid crashing. | |
240 | row[2] = std::min<int32_t>(acting.size(), size); | |
241 | row[3] = std::min<int32_t>(up.size(), size); | |
7c673cae FG |
242 | for (int i = 0; i < row[2]; ++i) { |
243 | row[4 + i] = acting[i]; | |
244 | } | |
245 | for (int i = 0; i < row[3]; ++i) { | |
246 | row[4 + size + i] = up[i]; | |
247 | } | |
248 | } | |
249 | }; | |
250 | ||
251 | mempool::osdmap_mapping::map<int64_t,PoolMapping> pools; | |
252 | mempool::osdmap_mapping::vector< | |
253 | mempool::osdmap_mapping::vector<pg_t>> acting_rmap; // osd -> pg | |
254 | //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg | |
b5b8bbf5 | 255 | epoch_t epoch = 0; |
7c673cae FG |
256 | uint64_t num_pgs = 0; |
257 | ||
258 | void _init_mappings(const OSDMap& osdmap); | |
259 | void _update_range( | |
260 | const OSDMap& map, | |
261 | int64_t pool, | |
262 | unsigned pg_begin, unsigned pg_end); | |
263 | ||
264 | void _build_rmap(const OSDMap& osdmap); | |
265 | ||
266 | void _start(const OSDMap& osdmap) { | |
267 | _init_mappings(osdmap); | |
268 | } | |
269 | void _finish(const OSDMap& osdmap); | |
270 | ||
271 | void _dump(); | |
272 | ||
273 | friend class ParallelPGMapper; | |
274 | ||
275 | struct MappingJob : public ParallelPGMapper::Job { | |
276 | OSDMapMapping *mapping; | |
277 | MappingJob(const OSDMap *osdmap, OSDMapMapping *m) | |
278 | : Job(osdmap), mapping(m) { | |
279 | mapping->_start(*osdmap); | |
280 | } | |
f67539c2 | 281 | void process(const std::vector<pg_t>& pgs) override {} |
7c673cae FG |
282 | void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override { |
283 | mapping->_update_range(*osdmap, pool, ps_begin, ps_end); | |
284 | } | |
285 | void complete() override { | |
286 | mapping->_finish(*osdmap); | |
287 | } | |
288 | }; | |
289 | ||
290 | public: | |
291 | void get(pg_t pgid, | |
292 | std::vector<int> *up, | |
293 | int *up_primary, | |
294 | std::vector<int> *acting, | |
295 | int *acting_primary) const { | |
296 | auto p = pools.find(pgid.pool()); | |
11fdf7f2 TL |
297 | ceph_assert(p != pools.end()); |
298 | ceph_assert(pgid.ps() < p->second.pg_num); | |
7c673cae FG |
299 | p->second.get(pgid.ps(), up, up_primary, acting, acting_primary); |
300 | } | |
301 | ||
11fdf7f2 TL |
302 | bool get_primary_and_shard(pg_t pgid, |
303 | int *acting_primary, | |
304 | spg_t *spgid) { | |
305 | auto p = pools.find(pgid.pool()); | |
306 | ceph_assert(p != pools.end()); | |
307 | ceph_assert(pgid.ps() < p->second.pg_num); | |
9f95a23c | 308 | std::vector<int> acting; |
11fdf7f2 TL |
309 | p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary); |
310 | if (p->second.erasure) { | |
311 | for (uint8_t i = 0; i < acting.size(); ++i) { | |
312 | if (acting[i] == *acting_primary) { | |
313 | *spgid = spg_t(pgid, shard_id_t(i)); | |
314 | return true; | |
315 | } | |
316 | } | |
317 | return false; | |
318 | } else { | |
319 | *spgid = spg_t(pgid); | |
320 | return true; | |
321 | } | |
322 | } | |
323 | ||
7c673cae | 324 | const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) { |
11fdf7f2 | 325 | ceph_assert(osd < acting_rmap.size()); |
7c673cae FG |
326 | return acting_rmap[osd]; |
327 | } | |
7c673cae FG |
328 | |
329 | void update(const OSDMap& map); | |
330 | void update(const OSDMap& map, pg_t pgid); | |
331 | ||
332 | std::unique_ptr<MappingJob> start_update( | |
333 | const OSDMap& map, | |
334 | ParallelPGMapper& mapper, | |
335 | unsigned pgs_per_item) { | |
336 | std::unique_ptr<MappingJob> job(new MappingJob(&map, this)); | |
494da23a | 337 | mapper.queue(job.get(), pgs_per_item, {}); |
7c673cae FG |
338 | return job; |
339 | } | |
340 | ||
341 | epoch_t get_epoch() const { | |
342 | return epoch; | |
343 | } | |
344 | ||
345 | uint64_t get_num_pgs() const { | |
346 | return num_pgs; | |
347 | } | |
348 | }; | |


#endif