// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H

#include <vector>
#include <map>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"

class OSDMap;

/// work queue to perform work on batches of pgids on multiple CPUs
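//
// Usage sketch (hypothetical, not taken from an existing caller): derive a
// Job, override both process() overloads and complete(), queue it on a
// mapper backed by a ThreadPool, then wait for or abort it.  `cct`, `tp`,
// `osdmap`, and `pgs_to_map` are assumed to exist in the caller; `CountJob`
// is illustrative only.
//
//   struct CountJob : public ParallelPGMapper::Job {
//     std::atomic<uint64_t> pgs_seen{0};
//     explicit CountJob(const OSDMap *om) : Job(om) {}
//     void process(const std::vector<pg_t>& pgs) override {
//       pgs_seen += pgs.size();            // explicit pg batches
//     }
//     void process(int64_t, unsigned ps_begin, unsigned ps_end) override {
//       pgs_seen += ps_end - ps_begin;     // per-pool ps ranges
//     }
//     void complete() override {}          // runs after the last shard
//   };
//
//   ParallelPGMapper mapper(cct, &tp);     // tp is a ThreadPool
//   CountJob job(&osdmap);
//   mapper.queue(&job, 128, pgs_to_map);   // items of at most 128 pgs each
//   job.wait();                            // or wait_for()/abort()
//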
class ParallelPGMapper {
public:
  struct Job {
    utime_t start, finish;
    unsigned shards = 0;
    const OSDMap *osdmap;
    bool aborted = false;
    Context *onfinish = nullptr;

    ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
    ceph::condition_variable cond;

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      ceph_assert(shards == 0);
    }

    // A child must override both forms of process(); only the one that
    // matches how the job was queued (explicit pg list vs. per-pool ps
    // ranges) is actually invoked.
    virtual void process(const std::vector<pg_t>& pgs) = 0;
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    virtual void complete() = 0;

    void set_finish_event(Context *fin) {
      lock.lock();
      if (shards == 0) {
        // already done.
        lock.unlock();
        fin->complete(0);
      } else {
        // set finisher
        onfinish = fin;
        lock.unlock();
      }
    }
    bool is_done() {
      std::lock_guard l(lock);
      return shards == 0;
    }
    utime_t get_duration() {
      return finish - start;
    }
    void wait() {
      std::unique_lock l(lock);
      cond.wait(l, [this] { return shards == 0; });
    }
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      std::unique_lock l(lock);
      while (shards > 0) {
        if (ceph_clock_now() >= until) {
          return false;
        }
        cond.wait(l);
      }
      return true;
    }
    void abort() {
      Context *fin = nullptr;
      {
        std::unique_lock l(lock);
        aborted = true;
        fin = onfinish;
        onfinish = nullptr;
        cond.wait(l, [this] { return shards == 0; });
      }
      if (fin) {
        fin->complete(-ECANCELED);
      }
    }

    void start_one() {
      std::lock_guard l(lock);
      ++shards;
    }
    void finish_one();
  };

protected:
  CephContext *cct;

  struct Item {
    Job *job;
    int64_t pool;
    unsigned begin, end;
    std::vector<pg_t> pgs;

    Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(pgs) {}
    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
        pool(p),
        begin(b),
        end(e) {}
  };
  std::deque<Item*> q;

  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ",
                                    ceph::timespan::zero(),
                                    ceph::timespan::zero(),
                                    tp),
        m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();
    }
    Item *_dequeue() override {
      while (!m->q.empty()) {
        Item *i = m->q.front();
        m->q.pop_front();
        if (i->job->aborted) {
          i->job->finish_one();
          delete i;
        } else {
          return i;
        }
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      ceph_assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  void queue(
    Job *job,
    unsigned pgs_per_item,
    const std::vector<pg_t>& input_pgs);

  void drain() {
    wq.drain();
  }
};


/// a precalculated mapping of every PG for a given OSDMap
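//
// Usage sketch (hypothetical; `osdmap`, `mapper`, and `pgid` are assumed to
// exist in the caller, and 128 is an arbitrary batch size):
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);                 // synchronous full rebuild
//
//   std::vector<int> up, acting;
//   int up_primary, acting_primary;
//   mapping.get(pgid, &up, &up_primary, &acting, &acting_primary);
//
//   // or rebuild asynchronously across a ParallelPGMapper's thread pool:
//   auto job = mapping.start_update(osdmap, mapper, 128);
//   job->wait();
//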
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;
    unsigned pg_num = 0;
    bool erasure = false;
    mempool::osdmap_mapping::vector<int32_t> table;

    size_t row_size() const {
      return
        1 + // acting_primary
        1 + // up_primary
        1 + // num acting
        1 + // num up
        size + // acting
        size;  // up
    }
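
    // Per-pg row layout in 'table' (indices mirror get()/set() below):
    //   row[0]                       acting_primary
    //   row[1]                       up_primary
    //   row[2]                       number of acting entries stored
    //   row[3]                       number of up entries stored
    //   row[4 .. 4+size-1]           acting osds
    //   row[4+size .. 4+2*size-1]    up osds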

    PoolMapping(int s, int p, bool e)
      : size(s),
        pg_num(p),
        erasure(e),
        table(pg_num * row_size()) {
    }

    void get(size_t ps,
             std::vector<int> *up,
             int *up_primary,
             std::vector<int> *acting,
             int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
        *acting_primary = row[0];
      }
      if (up_primary) {
        *up_primary = row[1];
      }
      if (acting) {
        acting->resize(row[2]);
        for (int i = 0; i < row[2]; ++i) {
          (*acting)[i] = row[4 + i];
        }
      }
      if (up) {
        up->resize(row[3]);
        for (int i = 0; i < row[3]; ++i) {
          (*up)[i] = row[4 + size + i];
        }
      }
    }

    void set(size_t ps,
             const std::vector<int>& up,
             int up_primary,
             const std::vector<int>& acting,
             int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      // these should always be <= the pool size, but just in case, avoid
      // blowing out the array.  Note that our mapping is not completely
      // accurate in this case--this is just to avoid crashing.
      row[2] = std::min<int32_t>(acting.size(), size);
      row[3] = std::min<int32_t>(up.size(), size);
      for (int i = 0; i < row[2]; ++i) {
        row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
        row[4 + size + i] = up[i];
      }
    }
  };

  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;
  uint64_t num_pgs = 0;

  void _init_mappings(const OSDMap& osdmap);
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  void _build_rmap(const OSDMap& osdmap);

  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    void process(const std::vector<pg_t>& pgs) override {}
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };

public:
  void get(pg_t pgid,
           std::vector<int> *up,
           int *up_primary,
           std::vector<int> *acting,
           int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

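  // Look up the acting primary for pgid and the spg_t it serves.  For
  // erasure-coded pools this finds the shard index the primary holds and
  // returns false if the primary does not appear in the acting set; for
  // replicated pools it always returns true with a shardless spg_t.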
  bool get_primary_and_shard(pg_t pgid,
                             int *acting_primary,
                             spg_t *spgid) {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    std::vector<int> acting;
    p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
    if (p->second.erasure) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
        if (acting[i] == *acting_primary) {
          *spgid = spg_t(pgid, shard_id_t(i));
          return true;
        }
      }
      return false;
    } else {
      *spgid = spg_t(pgid);
      return true;
    }
  }

  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    ceph_assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }

  void update(const OSDMap& map);
  void update(const OSDMap& map, pg_t pgid);

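  // Kick off an asynchronous rebuild on 'mapper'.  The caller owns the
  // returned job and must keep it alive until it completes (e.g. via
  // job->wait(), job->set_finish_event(), or job->abort()).  Passing an
  // empty pg list asks the mapper to split the work into per-pool ranges
  // of at most pgs_per_item placement groups.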
  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item, {});
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};


#endif