]>
git.proxmox.com Git - ceph.git/blob - ceph/src/osd/OSDMapMapping.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #ifndef CEPH_OSDMAPMAPPING_H
6 #define CEPH_OSDMAPMAPPING_H
11 #include "osd/osd_types.h"
12 #include "common/WorkQueue.h"
16 /// work queue to perform work on batches of pgids on multiple CPUs
17 class ParallelPGMapper
{
20 utime_t start
, finish
;
24 Context
*onfinish
= nullptr;
26 Mutex lock
= {"ParallelPGMapper::Job::lock"};
29 Job(const OSDMap
*om
) : start(ceph_clock_now()), osdmap(om
) {}
34 // child must implement this
35 virtual void process(int64_t poolid
, unsigned ps_begin
, unsigned ps_end
) = 0;
36 virtual void complete() = 0;
38 void set_finish_event(Context
*fin
) {
51 Mutex::Locker
l(lock
);
54 utime_t
get_duration() {
55 return finish
- start
;
58 Mutex::Locker
l(lock
);
63 bool wait_for(double duration
) {
64 utime_t until
= start
;
66 Mutex::Locker
l(lock
);
68 if (ceph_clock_now() >= until
) {
76 Context
*fin
= nullptr;
78 Mutex::Locker
l(lock
);
87 fin
->complete(-ECANCELED
);
92 Mutex::Locker
l(lock
);
106 Item(Job
*j
, int64_t p
, unsigned b
, unsigned e
)
114 struct WQ
: public ThreadPool::WorkQueue
<Item
> {
117 WQ(ParallelPGMapper
*m_
, ThreadPool
*tp
)
118 : ThreadPool::WorkQueue
<Item
>("ParallelPGMapper::WQ", 0, 0, tp
),
121 bool _enqueue(Item
*i
) override
{
125 void _dequeue(Item
*i
) override
{
128 Item
*_dequeue() override
{
129 while (!m
->q
.empty()) {
130 Item
*i
= m
->q
.front();
132 if (i
->job
->aborted
) {
133 i
->job
->finish_one();
142 void _process(Item
*i
, ThreadPool::TPHandle
&h
) override
;
144 void _clear() override
{
148 bool _empty() override
{
154 ParallelPGMapper(CephContext
*cct
, ThreadPool
*tp
)
160 unsigned pgs_per_item
);
168 /// a precalculated mapping of every PG for a given OSDMap
169 class OSDMapMapping
{
171 MEMPOOL_CLASS_HELPERS();
175 MEMPOOL_CLASS_HELPERS();
179 mempool::osdmap_mapping::vector
<int32_t> table
;
181 size_t row_size() const {
183 1 + // acting_primary
191 PoolMapping(int s
, int p
)
194 table(pg_num
* row_size()) {
198 std::vector
<int> *up
,
200 std::vector
<int> *acting
,
201 int *acting_primary
) const {
202 const int32_t *row
= &table
[row_size() * ps
];
203 if (acting_primary
) {
204 *acting_primary
= row
[0];
207 *up_primary
= row
[1];
210 acting
->resize(row
[2]);
211 for (int i
= 0; i
< row
[2]; ++i
) {
212 (*acting
)[i
] = row
[4 + i
];
217 for (int i
= 0; i
< row
[3]; ++i
) {
218 (*up
)[i
] = row
[4 + size
+ i
];
224 const std::vector
<int>& up
,
226 const std::vector
<int>& acting
,
227 int acting_primary
) {
228 int32_t *row
= &table
[row_size() * ps
];
229 row
[0] = acting_primary
;
231 row
[2] = acting
.size();
233 for (int i
= 0; i
< row
[2]; ++i
) {
234 row
[4 + i
] = acting
[i
];
236 for (int i
= 0; i
< row
[3]; ++i
) {
237 row
[4 + size
+ i
] = up
[i
];
242 mempool::osdmap_mapping::map
<int64_t,PoolMapping
> pools
;
243 mempool::osdmap_mapping::vector
<
244 mempool::osdmap_mapping::vector
<pg_t
>> acting_rmap
; // osd -> pg
245 //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg
247 uint64_t num_pgs
= 0;
249 void _init_mappings(const OSDMap
& osdmap
);
253 unsigned pg_begin
, unsigned pg_end
);
255 void _build_rmap(const OSDMap
& osdmap
);
257 void _start(const OSDMap
& osdmap
) {
258 _init_mappings(osdmap
);
260 void _finish(const OSDMap
& osdmap
);
264 friend class ParallelPGMapper
;
266 struct MappingJob
: public ParallelPGMapper::Job
{
267 OSDMapMapping
*mapping
;
268 MappingJob(const OSDMap
*osdmap
, OSDMapMapping
*m
)
269 : Job(osdmap
), mapping(m
) {
270 mapping
->_start(*osdmap
);
272 void process(int64_t pool
, unsigned ps_begin
, unsigned ps_end
) override
{
273 mapping
->_update_range(*osdmap
, pool
, ps_begin
, ps_end
);
275 void complete() override
{
276 mapping
->_finish(*osdmap
);
282 std::vector
<int> *up
,
284 std::vector
<int> *acting
,
285 int *acting_primary
) const {
286 auto p
= pools
.find(pgid
.pool());
287 assert(p
!= pools
.end());
288 assert(pgid
.ps() < p
->second
.pg_num
);
289 p
->second
.get(pgid
.ps(), up
, up_primary
, acting
, acting_primary
);
292 const mempool::osdmap_mapping::vector
<pg_t
>& get_osd_acting_pgs(unsigned osd
) {
293 assert(osd
< acting_rmap
.size());
294 return acting_rmap
[osd
];
297 const std::vector<pg_t>& get_osd_up_pgs(unsigned osd) {
298 assert(osd < up_rmap.size());
303 void update(const OSDMap
& map
);
304 void update(const OSDMap
& map
, pg_t pgid
);
306 std::unique_ptr
<MappingJob
> start_update(
308 ParallelPGMapper
& mapper
,
309 unsigned pgs_per_item
) {
310 std::unique_ptr
<MappingJob
> job(new MappingJob(&map
, this));
311 mapper
.queue(job
.get(), pgs_per_item
);
315 epoch_t
get_epoch() const {
319 uint64_t get_num_pgs() const {