]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | ||
5 | #ifndef CEPH_OSDMAPMAPPING_H | |
6 | #define CEPH_OSDMAPMAPPING_H | |
7 | ||
#include <deque>
#include <map>
#include <memory>
#include <vector>

#include "osd/osd_types.h"
#include "common/WorkQueue.h"
13 | ||
14 | class OSDMap; | |
15 | ||
/// work queue to perform work on batches of pgids on multiple CPUs
class ParallelPGMapper {
public:
  /// One parallelizable unit of work over an OSDMap.  The job is split
  /// into shards (contiguous PG ranges within a pool); 'shards' counts
  /// those still queued or in flight, and the job is complete when it
  /// drops to zero.
  struct Job {
    utime_t start, finish;   ///< start set at construction; finish presumably set on completion by finish_one() (defined out of line) -- confirm in .cc
    unsigned shards = 0;     ///< shards still outstanding (guarded by lock)
    const OSDMap *osdmap;    ///< map to process; not owned by the job
    bool aborted = false;    ///< set by abort(); pending shards become no-ops
    Context *onfinish = nullptr;  ///< completion callback (see set_finish_event)

    Mutex lock = {"ParallelPGMapper::Job::lock"};  ///< protects shards, aborted, onfinish
    Cond cond;  ///< signaled when shards drops to zero

    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
    virtual ~Job() {
      assert(shards == 0);  // must not destroy a job with shards outstanding
    }

    // child must implement this
    /// process PGs [ps_begin, ps_end) of pool 'poolid'
    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
    /// invoked once after all shards have been processed
    virtual void complete() = 0;

    /// Register a completion event.  If the job is already done, 'fin'
    /// fires immediately with 0; otherwise it is stored and fired when
    /// the last shard finishes.  Note that fin->complete() is invoked
    /// outside the lock.
    void set_finish_event(Context *fin) {
      lock.Lock();
      if (shards == 0) {
	// already done.
	lock.Unlock();
	fin->complete(0);
      } else {
	// set finisher
	onfinish = fin;
	lock.Unlock();
      }
    }
    /// true once every shard has finished
    bool is_done() {
      Mutex::Locker l(lock);
      return shards == 0;
    }
    /// elapsed time between job start and finish
    utime_t get_duration() {
      return finish - start;
    }
    /// block until all shards have finished
    void wait() {
      Mutex::Locker l(lock);
      while (shards > 0) {
	cond.Wait(lock);
      }
    }
    /// Wait until the job completes or 'duration' seconds have elapsed
    /// since the job *started* (not since this call).  Returns true if
    /// the job completed, false on timeout.
    bool wait_for(double duration) {
      utime_t until = start;
      until += duration;
      Mutex::Locker l(lock);
      while (shards > 0) {
	if (ceph_clock_now() >= until) {
	  return false;
	}
	cond.Wait(lock);
      }
      return true;
    }
    /// Abort the job: not-yet-processed shards are retired as no-ops
    /// (see WQ::_dequeue), and any registered finish event is completed
    /// with -ECANCELED (outside the lock).  Blocks until in-flight
    /// shards drain.
    void abort() {
      Context *fin = nullptr;
      {
	Mutex::Locker l(lock);
	aborted = true;
	fin = onfinish;
	onfinish = nullptr;
	while (shards > 0) {
	  cond.Wait(lock);
	}
      }
      if (fin) {
	fin->complete(-ECANCELED);
      }
    }

    /// account for a newly queued shard
    void start_one() {
      Mutex::Locker l(lock);
      ++shards;
    }
    /// account for a completed shard; defined out of line
    void finish_one();
  };

protected:
  CephContext *cct;

  /// one queued shard: the [begin, end) PG range within 'pool' for 'job'
  struct Item {
    Job *job;
    int64_t pool;
    unsigned begin, end;

    Item(Job *j, int64_t p, unsigned b, unsigned e)
      : job(j),
	pool(p),
	begin(b),
	end(e) {}
  };
  std::deque<Item*> q;  ///< pending shards, consumed by WQ::_dequeue

  /// thread pool work queue feeding Items to _process()
  struct WQ : public ThreadPool::WorkQueue<Item> {
    ParallelPGMapper *m;

    WQ(ParallelPGMapper *m_, ThreadPool *tp)
      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
	m(m_) {}

    bool _enqueue(Item *i) override {
      m->q.push_back(i);
      return true;
    }
    void _dequeue(Item *i) override {
      ceph_abort();  // removing a specific item is not supported
    }
    /// Pop the next item; items whose job has been aborted are retired
    /// (counted via finish_one()) and freed without being returned.
    Item *_dequeue() override {
      while (!m->q.empty()) {
	Item *i = m->q.front();
	m->q.pop_front();
	if (i->job->aborted) {
	  i->job->finish_one();
	  delete i;
	} else {
	  return i;
	}
      }
      return nullptr;
    }

    void _process(Item *i, ThreadPool::TPHandle &h) override;

    void _clear() override {
      assert(_empty());
    }

    bool _empty() override {
      return m->q.empty();
    }
  } wq;

public:
  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
    : cct(cct),
      wq(this, tp) {}

  /// split 'job' into shards of at most 'pgs_per_item' PGs and enqueue
  /// them; defined out of line
  void queue(
    Job *job,
    unsigned pgs_per_item);

  /// wait for the work queue to drain
  void drain() {
    wq.drain();
  }
};
166 | ||
167 | ||
168 | /// a precalculated mapping of every PG for a given OSDMap | |
169 | class OSDMapMapping { | |
170 | public: | |
171 | MEMPOOL_CLASS_HELPERS(); | |
172 | private: | |
173 | ||
174 | struct PoolMapping { | |
175 | MEMPOOL_CLASS_HELPERS(); | |
176 | ||
177 | unsigned size = 0; | |
178 | unsigned pg_num = 0; | |
179 | mempool::osdmap_mapping::vector<int32_t> table; | |
180 | ||
181 | size_t row_size() const { | |
182 | return | |
183 | 1 + // acting_primary | |
184 | 1 + // up_primary | |
185 | 1 + // num acting | |
186 | 1 + // num up | |
187 | size + // acting | |
188 | size; // up | |
189 | } | |
190 | ||
191 | PoolMapping(int s, int p) | |
192 | : size(s), | |
193 | pg_num(p), | |
194 | table(pg_num * row_size()) { | |
195 | } | |
196 | ||
197 | void get(size_t ps, | |
198 | std::vector<int> *up, | |
199 | int *up_primary, | |
200 | std::vector<int> *acting, | |
201 | int *acting_primary) const { | |
202 | const int32_t *row = &table[row_size() * ps]; | |
203 | if (acting_primary) { | |
204 | *acting_primary = row[0]; | |
205 | } | |
206 | if (up_primary) { | |
207 | *up_primary = row[1]; | |
208 | } | |
209 | if (acting) { | |
210 | acting->resize(row[2]); | |
211 | for (int i = 0; i < row[2]; ++i) { | |
212 | (*acting)[i] = row[4 + i]; | |
213 | } | |
214 | } | |
215 | if (up) { | |
216 | up->resize(row[3]); | |
217 | for (int i = 0; i < row[3]; ++i) { | |
218 | (*up)[i] = row[4 + size + i]; | |
219 | } | |
220 | } | |
221 | } | |
222 | ||
223 | void set(size_t ps, | |
224 | const std::vector<int>& up, | |
225 | int up_primary, | |
226 | const std::vector<int>& acting, | |
227 | int acting_primary) { | |
228 | int32_t *row = &table[row_size() * ps]; | |
229 | row[0] = acting_primary; | |
230 | row[1] = up_primary; | |
231 | row[2] = acting.size(); | |
232 | row[3] = up.size(); | |
233 | for (int i = 0; i < row[2]; ++i) { | |
234 | row[4 + i] = acting[i]; | |
235 | } | |
236 | for (int i = 0; i < row[3]; ++i) { | |
237 | row[4 + size + i] = up[i]; | |
238 | } | |
239 | } | |
240 | }; | |
241 | ||
242 | mempool::osdmap_mapping::map<int64_t,PoolMapping> pools; | |
243 | mempool::osdmap_mapping::vector< | |
244 | mempool::osdmap_mapping::vector<pg_t>> acting_rmap; // osd -> pg | |
245 | //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg | |
b5b8bbf5 | 246 | epoch_t epoch = 0; |
7c673cae FG |
247 | uint64_t num_pgs = 0; |
248 | ||
249 | void _init_mappings(const OSDMap& osdmap); | |
250 | void _update_range( | |
251 | const OSDMap& map, | |
252 | int64_t pool, | |
253 | unsigned pg_begin, unsigned pg_end); | |
254 | ||
255 | void _build_rmap(const OSDMap& osdmap); | |
256 | ||
257 | void _start(const OSDMap& osdmap) { | |
258 | _init_mappings(osdmap); | |
259 | } | |
260 | void _finish(const OSDMap& osdmap); | |
261 | ||
262 | void _dump(); | |
263 | ||
264 | friend class ParallelPGMapper; | |
265 | ||
266 | struct MappingJob : public ParallelPGMapper::Job { | |
267 | OSDMapMapping *mapping; | |
268 | MappingJob(const OSDMap *osdmap, OSDMapMapping *m) | |
269 | : Job(osdmap), mapping(m) { | |
270 | mapping->_start(*osdmap); | |
271 | } | |
272 | void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override { | |
273 | mapping->_update_range(*osdmap, pool, ps_begin, ps_end); | |
274 | } | |
275 | void complete() override { | |
276 | mapping->_finish(*osdmap); | |
277 | } | |
278 | }; | |
279 | ||
280 | public: | |
281 | void get(pg_t pgid, | |
282 | std::vector<int> *up, | |
283 | int *up_primary, | |
284 | std::vector<int> *acting, | |
285 | int *acting_primary) const { | |
286 | auto p = pools.find(pgid.pool()); | |
287 | assert(p != pools.end()); | |
288 | assert(pgid.ps() < p->second.pg_num); | |
289 | p->second.get(pgid.ps(), up, up_primary, acting, acting_primary); | |
290 | } | |
291 | ||
292 | const mempool::osdmap_mapping::vector<pg_t>& get_osd_acting_pgs(unsigned osd) { | |
293 | assert(osd < acting_rmap.size()); | |
294 | return acting_rmap[osd]; | |
295 | } | |
296 | /* unsued | |
297 | const std::vector<pg_t>& get_osd_up_pgs(unsigned osd) { | |
298 | assert(osd < up_rmap.size()); | |
299 | return up_rmap[osd]; | |
300 | } | |
301 | */ | |
302 | ||
303 | void update(const OSDMap& map); | |
304 | void update(const OSDMap& map, pg_t pgid); | |
305 | ||
306 | std::unique_ptr<MappingJob> start_update( | |
307 | const OSDMap& map, | |
308 | ParallelPGMapper& mapper, | |
309 | unsigned pgs_per_item) { | |
310 | std::unique_ptr<MappingJob> job(new MappingJob(&map, this)); | |
311 | mapper.queue(job.get(), pgs_per_item); | |
312 | return job; | |
313 | } | |
314 | ||
315 | epoch_t get_epoch() const { | |
316 | return epoch; | |
317 | } | |
318 | ||
319 | uint64_t get_num_pgs() const { | |
320 | return num_pgs; | |
321 | } | |
322 | }; | |
323 | ||
324 | ||
325 | #endif |