// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab


#ifndef CEPH_OSDMAPMAPPING_H
#define CEPH_OSDMAPMAPPING_H

#include <vector>
#include <map>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"

class OSDMap;

/// work queue to perform work on batches of pgids on multiple CPUs
class ParallelPGMapper {
public:
  struct Job {
    utime_t start, finish;
    unsigned shards = 0;
    const OSDMap *osdmap;
    bool aborted = false;
    Context *onfinish = nullptr;

    Mutex lock = {"ParallelPGMapper::Job::lock"};
    Cond cond;

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      assert(shards == 0);
    }

    // child must implement this
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    virtual void complete() = 0;

    void set_finish_event(Context *fin) {
      lock.Lock();
      if (shards == 0) {
        // already done.
        lock.Unlock();
        fin->complete(0);
      } else {
        // set finisher
        onfinish = fin;
        lock.Unlock();
      }
    }
    bool is_done() {
      Mutex::Locker l(lock);
      return shards == 0;
    }
    utime_t get_duration() {
      return finish - start;
    }
    void wait() {
      Mutex::Locker l(lock);
      while (shards > 0) {
        cond.Wait(lock);
      }
    }
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      Mutex::Locker l(lock);
      while (shards > 0) {
        if (ceph_clock_now() >= until) {
          return false;
        }
        cond.Wait(lock);
      }
      return true;
    }
    void abort() {
      Context *fin = nullptr;
      {
        Mutex::Locker l(lock);
        aborted = true;
        fin = onfinish;
        onfinish = nullptr;
        while (shards > 0) {
          cond.Wait(lock);
        }
      }
      if (fin) {
        fin->complete(-ECANCELED);
      }
    }

    void start_one() {
      Mutex::Locker l(lock);
      ++shards;
    }
    void finish_one();
  };

protected:
  CephContext *cct;

  struct Item {
    Job *job;
    int64_t pool;
    unsigned begin, end;

    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
        pool(p),
        begin(b),
        end(e) {}
  };
  std::deque<Item*> q;

  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
        m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();
    }
    Item *_dequeue() override {
      while (!m->q.empty()) {
        Item *i = m->q.front();
        m->q.pop_front();
        if (i->job->aborted) {
          i->job->finish_one();
          delete i;
        } else {
          return i;
        }
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  void queue(
    Job *job,
    unsigned pgs_per_item);

  void drain() {
    wq.drain();
  }
};
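
// Typical usage (illustrative sketch only; MyJob and the local variables
// are hypothetical placeholders, not part of this header): a caller derives
// from Job, implements process() for one [ps_begin, ps_end) slice of a pool
// and complete() for the final step, then queues the job and waits for all
// shards to finish. OSDMapMapping::MappingJob below follows this pattern.
//
//   struct MyJob : ParallelPGMapper::Job {
//     explicit MyJob(const OSDMap *om) : Job(om) {}
//     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
//       // examine PGs [ps_begin, ps_end) of 'pool' against *osdmap
//     }
//     void complete() override {
//       // all shards done; publish results
//     }
//   };
//
//   MyJob job(&osdmap);
//   mapper.queue(&job, 128 /* pgs_per_item */);
//   job.wait();   // or job.set_finish_event(ctx) for async completion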


/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
public:
  MEMPOOL_CLASS_HELPERS();
private:

  struct PoolMapping {
    MEMPOOL_CLASS_HELPERS();

    unsigned size = 0;
    unsigned pg_num = 0;
    mempool::osdmap_mapping::vector<int32_t> table;
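    // table holds one fixed-size row per PG (see row_size() and get()/set()):
    //   row[0]                     acting_primary
    //   row[1]                     up_primary
    //   row[2]                     number of acting osds
    //   row[3]                     number of up osds
    //   row[4 .. 4+size)           acting osds
    //   row[4+size .. 4+2*size)    up osds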

    size_t row_size() const {
      return
        1 + // acting_primary
        1 + // up_primary
        1 + // num acting
        1 + // num up
        size + // acting
        size;  // up
    }

    PoolMapping(int s, int p)
      : size(s),
        pg_num(p),
        table(pg_num * row_size()) {
    }

    void get(size_t ps,
             std::vector<int> *up,
             int *up_primary,
             std::vector<int> *acting,
             int *acting_primary) const {
      const int32_t *row = &table[row_size() * ps];
      if (acting_primary) {
        *acting_primary = row[0];
      }
      if (up_primary) {
        *up_primary = row[1];
      }
      if (acting) {
        acting->resize(row[2]);
        for (int i = 0; i < row[2]; ++i) {
          (*acting)[i] = row[4 + i];
        }
      }
      if (up) {
        up->resize(row[3]);
        for (int i = 0; i < row[3]; ++i) {
          (*up)[i] = row[4 + size + i];
        }
      }
    }

    void set(size_t ps,
             const std::vector<int>& up,
             int up_primary,
             const std::vector<int>& acting,
             int acting_primary) {
      int32_t *row = &table[row_size() * ps];
      row[0] = acting_primary;
      row[1] = up_primary;
      row[2] = acting.size();
      row[3] = up.size();
      for (int i = 0; i < row[2]; ++i) {
        row[4 + i] = acting[i];
      }
      for (int i = 0; i < row[3]; ++i) {
        row[4 + size + i] = up[i];
      }
    }
  };

  mempool::osdmap_mapping::map<int64_t,PoolMapping> pools;
  mempool::osdmap_mapping::vector<
    mempool::osdmap_mapping::vector<pg_t>> acting_rmap;  // osd -> pg
  //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
  epoch_t epoch = 0;
  uint64_t num_pgs = 0;

  void _init_mappings(const OSDMap& osdmap);
  void _update_range(
    const OSDMap& map,
    int64_t pool,
    unsigned pg_begin, unsigned pg_end);

  void _build_rmap(const OSDMap& osdmap);

  void _start(const OSDMap& osdmap) {
    _init_mappings(osdmap);
  }
  void _finish(const OSDMap& osdmap);

  void _dump();

  friend class ParallelPGMapper;

  struct MappingJob : public ParallelPGMapper::Job {
    OSDMapMapping *mapping;
    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
      : Job(osdmap), mapping(m) {
      mapping->_start(*osdmap);
    }
    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
    }
    void complete() override {
      mapping->_finish(*osdmap);
    }
  };

public:
  void get(pg_t pgid,
           std::vector<int> *up,
           int *up_primary,
           std::vector<int> *acting,
           int *acting_primary) const {
    auto p = pools.find(pgid.pool());
    assert(p != pools.end());
    assert(pgid.ps() < p->second.pg_num);
    p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
  }

  const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) {
    assert(osd < acting_rmap.size());
    return acting_rmap[osd];
  }
  /* unused
  const std::vector<pg_t>& get_osd_up_pgs(unsigned osd) {
    assert(osd < up_rmap.size());
    return up_rmap[osd];
  }
  */

  void update(const OSDMap& map);
  void update(const OSDMap& map, pg_t pgid);

  std::unique_ptr<MappingJob> start_update(
    const OSDMap& map,
    ParallelPGMapper& mapper,
    unsigned pgs_per_item) {
    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
    mapper.queue(job.get(), pgs_per_item);
    return job;
  }

  epoch_t get_epoch() const {
    return epoch;
  }

  uint64_t get_num_pgs() const {
    return num_pgs;
  }
};
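
// Typical usage (illustrative sketch only; osdmap, mapper, pgid and the
// up/acting vectors are caller-provided placeholders): the synchronous path
// recomputes every PG mapping on the calling thread, while the parallel path
// shards the work across a ParallelPGMapper's thread pool; the mapping is
// usable once the returned job has been waited on (or the mapper drained).
//
//   OSDMapMapping mapping;
//   mapping.update(osdmap);                    // synchronous
//   mapping.get(pgid, &up, &up_primary, &acting, &acting_primary);
//
//   auto job = mapping.start_update(osdmap, mapper, 128 /* pgs_per_item */);
//   job->wait();                               // or mapper.drain()
//   mapping.get(pgid, &up, &up_primary, &acting, &acting_primary);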


#endif