// ceph/src/osd/OSDMapMapping.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4
5 #ifndef CEPH_OSDMAPMAPPING_H
6 #define CEPH_OSDMAPMAPPING_H
7
#include <deque>
#include <map>
#include <vector>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
#include "common/Cond.h"
14
15 class OSDMap;
16
17 /// work queue to perform work on batches of pgids on multiple CPUs
18 class ParallelPGMapper {
19 public:
20 struct Job {
21 utime_t start, finish;
22 unsigned shards = 0;
23 const OSDMap *osdmap;
24 bool aborted = false;
25 Context *onfinish = nullptr;
26
27 ceph::mutex lock = ceph::make_mutex("ParallelPGMapper::Job::lock");
28 ceph::condition_variable cond;
29
30 Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
31 virtual ~Job() {
32 ceph_assert(shards == 0);
33 }
34
35 // child must implement either form of process
36 virtual void process(const std::vector<pg_t>& pgs) = 0;
37 virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
38 virtual void complete() = 0;
39
40 void set_finish_event(Context *fin) {
41 lock.lock();
42 if (shards == 0) {
43 // already done.
44 lock.unlock();
45 fin->complete(0);
46 } else {
47 // set finisher
48 onfinish = fin;
49 lock.unlock();
50 }
51 }
52 bool is_done() {
53 std::lock_guard l(lock);
54 return shards == 0;
55 }
56 utime_t get_duration() {
57 return finish - start;
58 }
59 void wait() {
60 std::unique_lock l(lock);
61 cond.wait(l, [this] { return shards == 0; });
62 }
63 bool wait_for(double duration) {
64 utime_t until = start;
65 until += duration;
66 std::unique_lock l(lock);
67 while (shards > 0) {
68 if (ceph_clock_now() >= until) {
69 return false;
70 }
71 cond.wait(l);
72 }
73 return true;
74 }
75 void abort() {
76 Context *fin = nullptr;
77 {
78 std::unique_lock l(lock);
79 aborted = true;
80 fin = onfinish;
81 onfinish = nullptr;
82 cond.wait(l, [this] { return shards == 0; });
83 }
84 if (fin) {
85 fin->complete(-ECANCELED);
86 }
87 }
88
89 void start_one() {
90 std::lock_guard l(lock);
91 ++shards;
92 }
93 void finish_one();
94 };
95
96 protected:
97 CephContext *cct;
98
99 struct Item {
100 Job *job;
101 int64_t pool;
102 unsigned begin, end;
103 std::vector<pg_t> pgs;
104
105 Item(Job *j, std::vector<pg_t> pgs) : job(j), pgs(pgs) {}
106 Item(Job *j, int64_t p, unsigned b, unsigned e)
107 : job(j),
108 pool(p),
109 begin(b),
110 end(e) {}
111 };
112 std::deque<Item*> q;
113
114 struct WQ : public ThreadPool::WorkQueue<Item> {
115 ParallelPGMapper *m;
116
117 WQ(ParallelPGMapper *m_, ThreadPool *tp)
118 : ThreadPool::WorkQueue<Item>(
119 "ParallelPGMapper::WQ",
120 ceph::make_timespan(m_->cct->_conf->threadpool_default_timeout),
121 ceph::timespan::zero(),
122 tp),
123 m(m_) {}
124
125 bool _enqueue(Item *i) override {
126 m->q.push_back(i);
127 return true;
128 }
129 void _dequeue(Item *i) override {
130 ceph_abort();
131 }
132 Item *_dequeue() override {
133 while (!m->q.empty()) {
134 Item *i = m->q.front();
135 m->q.pop_front();
136 if (i->job->aborted) {
137 i->job->finish_one();
138 delete i;
139 } else {
140 return i;
141 }
142 }
143 return nullptr;
144 }
145
146 void _process(Item *i, ThreadPool::TPHandle &h) override;
147
148 void _clear() override {
149 ceph_assert(_empty());
150 }
151
152 bool _empty() override {
153 return m->q.empty();
154 }
155 } wq;
156
157 public:
158 ParallelPGMapper(CephContext *cct, ThreadPool *tp)
159 : cct(cct),
160 wq(this, tp) {}
161
162 void queue(
163 Job *job,
164 unsigned pgs_per_item,
165 const std::vector<pg_t>& input_pgs);
166
167 void drain() {
168 wq.drain();
169 }
170 };
171
172
173 /// a precalculated mapping of every PG for a given OSDMap
/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  // Forward mapping for a single pool: a flat table with one
  // fixed-width row of int32_t per ps.  Row layout (see get()/set()):
  //   [0] acting_primary, [1] up_primary, [2] num acting, [3] num up,
  //   [4 .. 4+size)         acting osds,
  //   [4+size .. 4+2*size)  up osds
  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;     // pool size: max width of the up/acting sets
    unsigned pg_num = 0;   // number of PGs (rows) in this pool
    bool erasure = false;  // true if the pool is erasure-coded
    mempool::osdmap_mapping::vector<int32_t> table;  // pg_num * row_size() entries

    size_t row_size() const {
      return
	1 + // acting_primary
	1 + // up_primary
	1 + // num acting
	1 + // num up
	size + // acting
	size; // up
    }

    PoolMapping(int s, int p, bool e)
      : size(s),
	pg_num(p),
	erasure(e),
	table(pg_num * row_size()) {
    }

    // Read the stored mapping for the given ps.  Each out param may be
    // nullptr if the caller does not need that piece.
    void get(size_t ps,
	     std::vector<int> *up,
	     int *up_primary,
	     std::vector<int> *acting,
	     int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
	*acting_primary = row[0];
      }
      if (up_primary) {
	*up_primary = row[1];
      }
      if (acting) {
	acting->resize(row[2]);
	for (int i = 0; i < row[2]; ++i) {
	  (*acting)[i] = row[4 + i];
	}
      }
      if (up) {
	up->resize(row[3]);
	for (int i = 0; i < row[3]; ++i) {
	  (*up)[i] = row[4 + size + i];
	}
      }
    }

    // Store the mapping for the given ps into its table row.
    void set(size_t ps,
	     const std::vector<int>& up,
	     int up_primary,
	     const std::vector<int>& acting,
	     int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      // these should always be <= the pool size, but just in case, avoid
      // blowing out the array.  Note that our mapping is not completely
      // accurate in this case--this is just to avoid crashing.
      row[2] = std::min<int32_t>(acting.size(), size);
      row[3] = std::min<int32_t>(up.size(), size);
      for (int i = 0; i < row[2]; ++i) {
	row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
	row[4 + size + i] = up[i];
      }
    }
  };

  // pool id -> forward mapping table for that pool
  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  // reverse mapping: for each osd, the pgs it appears in (indexed by
  // osd id; populated by _build_rmap — see the .cc)
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;     // epoch of the map this mapping corresponds to
  uint64_t num_pgs = 0;  // total pg count across all pools

  // set up the per-pool tables for the pools in osdmap (in the .cc)
  void _init_mappings(const OSDMap& osdmap);
  // compute and store the mapping for ps range [pg_begin, pg_end) of
  // one pool (in the .cc)
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  // rebuild acting_rmap from the forward tables (in the .cc)
  void _build_rmap(const OSDMap& osdmap);

  // called when a MappingJob is constructed
  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  // called when a MappingJob completes (in the .cc)
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

  // ParallelPGMapper job that (re)builds this mapping: each shard fills
  // in a ps range of one pool via _update_range().
  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    // only the range form of process() is used; the pg-list form is a no-op
    void process(const std::vector<pg_t>& pgs) override {}
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };
  friend class OSDMapTest;
  // for testing only
  void update(const OSDMap& map);

public:
  // Look up the precalculated mapping for pgid; out params may be
  // nullptr.  Asserts that the pool exists and ps is within pg_num.
  void get(pg_t pgid,
	   std::vector<int> *up,
	   int *up_primary,
	   std::vector<int> *acting,
	   int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

  // Fetch the acting primary and the spg_t for pgid.  For erasure-coded
  // pools the shard id is the primary's index in the acting set;
  // returns false if the primary is not found in acting.
  bool get_primary_and_shard(pg_t pgid,
			     int *acting_primary,
			     spg_t *spgid) {
    auto p = pools.find(pgid.pool());
    ceph_assert(p != pools.end());
    ceph_assert(pgid.ps() < p->second.pg_num);
    std::vector<int> acting;
    p->second.get(pgid.ps(), nullptr, nullptr, &acting, acting_primary);
    if (p->second.erasure) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
	if (acting[i] == *acting_primary) {
	  *spgid = spg_t(pgid, shard_id_t(i));
	  return true;
	}
      }
      return false;
    } else {
      // replicated pools have no shard
      *spgid = spg_t(pgid);
      return true;
    }
  }

  // pgs associated with the given osd in the reverse map; asserts the
  // osd id is within range
  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    ceph_assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }

  // recompute the mapping for a single pg (in the .cc)
  void update(const OSDMap& map, pg_t pgid);

  // Kick off an asynchronous full rebuild of this mapping via mapper.
  // The caller owns the returned job and may wait()/abort() on it.
  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item, {});
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};
352
353
354 #endif