// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H

#include <vector>
#include <map>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"

class OSDMap;

/// work queue to perform work on batches of pgids on multiple CPUs
class ParallelPGMapper {
public:
  struct Job {
    utime_t start, finish;
    unsigned shards = 0;
    const OSDMap *osdmap;
    bool aborted = false;
    Context *onfinish = nullptr;

    Mutex lock = {"ParallelPGMapper::Job::lock"};
    Cond cond;

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      ceph_assert(shards == 0);
    }

    // child must implement either form of process
    virtual void process(const vector<pg_t>& pgs) = 0;
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    virtual void complete() = 0;

    void set_finish_event(Context *fin) {
      lock.Lock();
      if (shards == 0) {
        // already done.
        lock.Unlock();
        fin->complete(0);
      } else {
        // set finisher
        onfinish = fin;
        lock.Unlock();
      }
    }
    bool is_done() {
      std::lock_guard l(lock);
      return shards == 0;
    }
    utime_t get_duration() {
      return finish - start;
    }
    void wait() {
      std::lock_guard l(lock);
      while (shards > 0) {
        cond.Wait(lock);
      }
    }
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      std::lock_guard l(lock);
      while (shards > 0) {
        if (ceph_clock_now() >= until) {
          return false;
        }
        cond.Wait(lock);
      }
      return true;
    }
    void abort() {
      Context *fin = nullptr;
      {
        std::lock_guard l(lock);
        aborted = true;
        fin = onfinish;
        onfinish = nullptr;
        while (shards > 0) {
          cond.Wait(lock);
        }
      }
      if (fin) {
        fin->complete(-ECANCELED);
      }
    }

    void start_one() {
      std::lock_guard l(lock);
      ++shards;
    }
    void finish_one();
  };

protected:
  CephContext *cct;

  struct Item {
    Job *job;
    int64_t pool;
    unsigned begin, end;
    vector<pg_t> pgs;

    Item(Job *j, vector<pg_t> pgs) : job(j), pgs(pgs) {}
    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
        pool(p),
        begin(b),
        end(e) {}
  };
  std::deque<Item*> q;

  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
        m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();
    }
    Item *_dequeue() override {
      while (!m->q.empty()) {
        Item *i = m->q.front();
        m->q.pop_front();
        if (i->job->aborted) {
          i->job->finish_one();
          delete i;
        } else {
          return i;
        }
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      ceph_assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  void queue(
    Job *job,
    unsigned pgs_per_item,
    const vector<pg_t>& input_pgs);

  void drain() {
    wq.drain();
  }
};
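
// Hypothetical usage sketch (not part of this header): how a Job subclass is
// typically wired up and driven.  The ExampleJob name and the pgs_per_item
// value of 64 are illustrative assumptions; which process() overload gets
// called, and how the work is split into items, is decided by
// ParallelPGMapper::queue() in the .cc.
//
//   struct ExampleJob : ParallelPGMapper::Job {
//     explicit ExampleJob(const OSDMap *om) : Job(om) {}
//     void process(const vector<pg_t>& pgs) override {
//       // invoked for items queued with an explicit pg list
//     }
//     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
//       // invoked for per-pool [ps_begin, ps_end) ranges
//     }
//     void complete() override {
//       // invoked once the last shard has finished
//     }
//   };
//
//   ExampleJob job(&osdmap);     // osdmap and mapper are assumed to exist
//   mapper.queue(&job, 64, {});  // split the osdmap's pgs into ~64-pg items
//   job.wait();                  // block until all shards are done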


/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;
    unsigned pg_num = 0;
    bool erasure = false;
    mempool::osdmap_mapping::vector<int32_t> table;

    size_t row_size() const {
      return
        1 + // acting_primary
        1 + // up_primary
        1 + // num acting
        1 + // num up
        size + // acting
        size;  // up
    }
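
    // Worked example (inferred from row_size() plus get()/set() below, not
    // extra data stored in the table): for a replicated pool with size == 3,
    // each pg row spans 1 + 1 + 1 + 1 + 3 + 3 = 10 int32_t entries:
    //
    //   row[0]     acting_primary
    //   row[1]     up_primary
    //   row[2]     number of acting entries
    //   row[3]     number of up entries
    //   row[4..6]  acting osds
    //   row[7..9]  up osds
    //
    // so the row for placement seed ps starts at &table[10 * ps].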

    PoolMapping(int s, int p, bool e)
      : size(s),
        pg_num(p),
        erasure(e),
        table(pg_num * row_size()) {
    }

    void get(size_t ps,
             std::vector<int> *up,
             int *up_primary,
             std::vector<int> *acting,
             int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
        *acting_primary = row[0];
      }
      if (up_primary) {
        *up_primary = row[1];
      }
      if (acting) {
        acting->resize(row[2]);
        for (int i = 0; i < row[2]; ++i) {
          (*acting)[i] = row[4 + i];
        }
      }
      if (up) {
        up->resize(row[3]);
        for (int i = 0; i < row[3]; ++i) {
          (*up)[i] = row[4 + size + i];
        }
      }
    }

    void set(size_t ps,
             const std::vector<int>& up,
             int up_primary,
             const std::vector<int>& acting,
             int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      // these should always be <= the pool size, but just in case, avoid
      // blowing out the array.  Note that our mapping is not completely
      // accurate in this case--this is just to avoid crashing.
      row[2] = std::min<int32_t>(acting.size(), size);
      row[3] = std::min<int32_t>(up.size(), size);
      for (int i = 0; i < row[2]; ++i) {
        row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
        row[4 + size + i] = up[i];
      }
    }
  };

  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;
  uint64_t num_pgs = 0;

  void _init_mappings(const OSDMap& osdmap);
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  void _build_rmap(const OSDMap& osdmap);

  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    void process(const vector<pg_t>& pgs) override {}
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };

public:
  void get(pg_t pgid,
           std::vector<int> *up,
           int *up_primary,
           std::vector<int> *acting,
           int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

  bool get_primary_and_shard(pg_t pgid,
                             int *acting_primary,
                             spg_t *spgid) {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    vector<int> acting;
    p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
    if (p->second.erasure) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
        if (acting[i] == *acting_primary) {
          *spgid = spg_t(pgid, shard_id_t(i));
          return true;
        }
      }
      return false;
    } else {
      *spgid = spg_t(pgid);
      return true;
    }
  }

  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    ceph_assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }

  void update(const OSDMap& map);
  void update(const OSDMap& map, pg_t pgid);

  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item, {});
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};
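
// Minimal usage sketch (an assumption for illustration, not code from this
// header): the osdmap, mapper, and pgid variables and the pgs_per_item value
// of 128 are hypothetical.
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);                              // synchronous rebuild
//
//   // or fan the rebuild out over a ParallelPGMapper's thread pool
//   auto job = mapping.start_update(osdmap, mapper, 128);
//   job->wait();
//
//   std::vector<int> up, acting;
//   int up_primary, acting_primary;
//   mapping.get(pgid, &up, &up_primary, &acting, &acting_primary);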

#endif