1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #ifndef CEPH_OSDMAPMAPPING_H
6 #define CEPH_OSDMAPMAPPING_H
11 #include "osd/osd_types.h"
12 #include "common/WorkQueue.h"
13 #include "common/Cond.h"
17 /// work queue to perform work on batches of pgids on multiple CPUs
// ---------------------------------------------------------------------------
// NOTE(review): this block is a corrupted extraction of a C++ header: each
// statement is hard-wrapped mid-token, the original file's line numbers are
// embedded at the start of many lines, and gaps in that embedded numbering
// (e.g. 18 -> 21, 40 -> 53, 106 -> 114) show that many source lines are
// missing entirely.  It cannot compile as-is.  Restore the pristine header
// before attempting any functional change; the comments below annotate only
// what the visible fragments themselves establish.
// ---------------------------------------------------------------------------
18 class ParallelPGMapper
{
// Fragment of a nested job type: timing fields, an optional completion
// Context, and a mutex/condvar pair.  The `shards == 0` wait predicates
// further down show the condvar is used to wait for outstanding work shards
// to drain.
21 utime_t start
, finish
;
25 Context
*onfinish
= nullptr;
27 ceph::mutex lock
= ceph::make_mutex("ParallelPGMapper::Job::lock");
28 ceph::condition_variable cond
;
// Constructor fragment: records the start time and a borrowed OSDMap pointer.
30 Job(const OSDMap
*om
) : start(ceph_clock_now()), osdmap(om
) {}
// Assertion fragment (enclosing function is in missing lines — presumably the
// destructor; TODO confirm): requires that no shards remain outstanding.
32 ceph_assert(shards
== 0);
35 // child must implement either form of process
36 virtual void process(const std::vector
<pg_t
>& pgs
) = 0;
37 virtual void process(int64_t poolid
, unsigned ps_begin
, unsigned ps_end
) = 0;
38 virtual void complete() = 0;
// set_finish_event(): only the signature and lock acquisition are visible;
// the rest of the body falls in missing lines.
40 void set_finish_event(Context
*fin
) {
53 std::lock_guard
l(lock
);
56 utime_t
get_duration() {
57 return finish
- start
;
// wait fragment: blocks until every shard has finished (shards == 0).
60 std::unique_lock
l(lock
);
61 cond
.wait(l
, [this] { return shards
== 0; });
// wait_for(): timed variant.  `until` is visibly initialized from `start`
// only; the line adding `duration` to it presumably falls in a missing line
// — TODO confirm against the pristine header.
63 bool wait_for(double duration
) {
64 utime_t until
= start
;
66 std::unique_lock
l(lock
);
68 if (ceph_clock_now() >= until
) {
// Abort/teardown fragment: takes the pending Context (if any), waits for the
// shards to drain, then completes it with -ECANCELED.
76 Context
*fin
= nullptr;
78 std::unique_lock
l(lock
);
82 cond
.wait(l
, [this] { return shards
== 0; });
85 fin
->complete(-ECANCELED
);
90 std::lock_guard
l(lock
);
// Item: one unit of queued work — either an explicit pg list (first
// constructor) or, per the second constructor's parameters, a pool id plus a
// [b, e) range.
103 std::vector
<pg_t
> pgs
;
105 Item(Job
*j
, std::vector
<pg_t
> pgs
) : job(j
), pgs(pgs
) {}
106 Item(Job
*j
, int64_t p
, unsigned b
, unsigned e
)
// Inner work queue over Item driven by a ThreadPool; its timeout visibly
// comes from the threadpool_default_timeout config option.
114 struct WQ
: public ThreadPool::WorkQueue
<Item
> {
117 WQ(ParallelPGMapper
*m_
, ThreadPool
*tp
)
118 : ThreadPool::WorkQueue
<Item
>(
119 "ParallelPGMapper::WQ",
120 ceph::make_timespan(m_
->cct
->_conf
->threadpool_default_timeout
),
121 ceph::timespan::zero(),
125 bool _enqueue(Item
*i
) override
{
129 void _dequeue(Item
*i
) override
{
// _dequeue(): visibly skips queued items whose job is flagged aborted,
// calling finish_one() on the job for each skipped item.
132 Item
*_dequeue() override
{
133 while (!m
->q
.empty()) {
134 Item
*i
= m
->q
.front();
136 if (i
->job
->aborted
) {
137 i
->job
->finish_one();
146 void _process(Item
*i
, ThreadPool::TPHandle
&h
) override
;
148 void _clear() override
{
149 ceph_assert(_empty());
152 bool _empty() override
{
// Public surface fragment: constructor plus (apparently) a queue(...)
// declaration taking a batching size and an explicit pg list — the
// declaration's opening line is missing.
158 ParallelPGMapper(CephContext
*cct
, ThreadPool
*tp
)
164 unsigned pgs_per_item
,
165 const std::vector
<pg_t
>& input_pgs
);
173 /// a precalculated mapping of every PG for a given OSDMap
// ---------------------------------------------------------------------------
// NOTE(review): like the class above, this block is a corrupted extraction —
// lines are hard-wrapped mid-token, original line numbers are embedded in the
// text, and gaps in that numbering show missing lines; the class is also
// truncated at the end of the visible region (get_num_pgs() opens and the
// text ends).  It cannot compile as-is.  Restore the pristine header before
// attempting functional changes; comments below annotate only what the
// visible fragments establish.
// ---------------------------------------------------------------------------
174 class OSDMapMapping
{
176 MEMPOOL_CLASS_HELPERS();
180 MEMPOOL_CLASS_HELPERS();
// PoolMapping fragment: a flat int32 table holding one fixed-size row per PG.
// From the get()/set() fragments below, each row is laid out as:
//   [0]                  acting_primary
//   [1]                  up_primary
//   [2]                  acting count   (clamped to `size` in set())
//   [3]                  up count       (clamped to `size` in set())
//   [4 .. 4+size)        acting osds
//   [4+size .. 4+2*size) up osds
// where `size` is presumably the pool size — TODO confirm; its declaration
// falls in missing lines.
184 bool erasure
= false;
185 mempool::osdmap_mapping::vector
<int32_t> table
;
187 size_t row_size() const {
189 1 + // acting_primary
197 PoolMapping(int s
, int p
, bool e
)
201 table(pg_num
* row_size()) {
// get(): decode one PG's row into the output pointers.  acting_primary is
// visibly nullptr-guarded; any guards for the other outputs fall in missing
// lines.
205 std::vector
<int> *up
,
207 std::vector
<int> *acting
,
208 int *acting_primary
) const {
209 const int32_t *row
= &table
[row_size() * ps
];
210 if (acting_primary
) {
211 *acting_primary
= row
[0];
214 *up_primary
= row
[1];
217 acting
->resize(row
[2]);
218 for (int i
= 0; i
< row
[2]; ++i
) {
219 (*acting
)[i
] = row
[4 + i
];
224 for (int i
= 0; i
< row
[3]; ++i
) {
225 (*up
)[i
] = row
[4 + size
+ i
];
// set(): encode one PG's row (inverse of get()).
231 const std::vector
<int>& up
,
233 const std::vector
<int>& acting
,
234 int acting_primary
) {
235 int32_t *row
= &table
[row_size() * ps
];
236 row
[0] = acting_primary
;
238 // these should always be <= the pool size, but just in case, avoid
239 // blowing out the array. Note that our mapping is not completely
240 // accurate in this case--this is just to avoid crashing.
241 row
[2] = std::min
<int32_t>(acting
.size(), size
);
242 row
[3] = std::min
<int32_t>(up
.size(), size
);
243 for (int i
= 0; i
< row
[2]; ++i
) {
244 row
[4 + i
] = acting
[i
];
246 for (int i
= 0; i
< row
[3]; ++i
) {
247 row
[4 + size
+ i
] = up
[i
];
// Class state: per-pool PoolMapping table plus a reverse map of
// osd -> acting pgs (see get_osd_acting_pgs() below).
252 mempool::osdmap_mapping::map
<int64_t,PoolMapping
> pools
;
253 mempool::osdmap_mapping::vector
<
254 mempool::osdmap_mapping::vector
<pg_t
>> acting_rmap
; // osd -> pg
255 //unused: mempool::osdmap_mapping::vector<std::vector<pg_t>> up_rmap; // osd -> pg
257 uint64_t num_pgs
= 0;
// Internal (re)build helpers; _start() visibly calls _init_mappings().
259 void _init_mappings(const OSDMap
& osdmap
);
263 unsigned pg_begin
, unsigned pg_end
);
265 void _build_rmap(const OSDMap
& osdmap
);
267 void _start(const OSDMap
& osdmap
) {
268 _init_mappings(osdmap
);
270 void _finish(const OSDMap
& osdmap
);
274 friend class ParallelPGMapper
;
// MappingJob: a ParallelPGMapper::Job that fills in this OSDMapMapping.
// Only the (pool, ps range) form of process() does work here; the pg-list
// form is a visible no-op.  complete() finalizes via _finish().
276 struct MappingJob
: public ParallelPGMapper::Job
{
277 OSDMapMapping
*mapping
;
278 MappingJob(const OSDMap
*osdmap
, OSDMapMapping
*m
)
279 : Job(osdmap
), mapping(m
) {
280 mapping
->_start(*osdmap
);
282 void process(const std::vector
<pg_t
>& pgs
) override
{}
283 void process(int64_t pool
, unsigned ps_begin
, unsigned ps_end
) override
{
284 mapping
->_update_range(*osdmap
, pool
, ps_begin
, ps_end
);
286 void complete() override
{
287 mapping
->_finish(*osdmap
);
290 friend class OSDMapTest
;
292 void update(const OSDMap
& map
);
// Lookup fragment: fetch the precalculated mapping for one pgid; asserts the
// pool is known and the placement seed is within the pool's pg_num.  (The
// signature's opening line — name and pgid parameter — is missing.)
296 std::vector
<int> *up
,
298 std::vector
<int> *acting
,
299 int *acting_primary
) const {
300 auto p
= pools
.find(pgid
.pool());
301 ceph_assert(p
!= pools
.end());
302 ceph_assert(pgid
.ps() < p
->second
.pg_num
);
303 p
->second
.get(pgid
.ps(), up
, up_primary
, acting
, acting_primary
);
// get_primary_and_shard(): when the pool's `erasure` flag is set, the spg_t
// shard is visibly the index of the primary within the acting set; otherwise
// the single-argument spg_t(pgid) form is used.
306 bool get_primary_and_shard(pg_t pgid
,
309 auto p
= pools
.find(pgid
.pool());
310 ceph_assert(p
!= pools
.end());
311 ceph_assert(pgid
.ps() < p
->second
.pg_num
);
312 std::vector
<int> acting
;
313 p
->second
.get(pgid
.ps(), nullptr, nullptr, &acting
, acting_primary
);
314 if (p
->second
.erasure
) {
315 for (uint8_t i
= 0; i
< acting
.size(); ++i
) {
316 if (acting
[i
] == *acting_primary
) {
317 *spgid
= spg_t(pgid
, shard_id_t(i
));
323 *spgid
= spg_t(pgid
);
// Reverse lookup: all pgs for which `osd` is in the acting set.
328 const mempool::osdmap_mapping::vector
<pg_t
>& get_osd_acting_pgs(unsigned osd
) {
329 ceph_assert(osd
< acting_rmap
.size());
330 return acting_rmap
[osd
];
333 void update(const OSDMap
& map
, pg_t pgid
);
// start_update(): kicks off a parallel rebuild.  Ownership of the MappingJob
// stays with the caller (unique_ptr) while the mapper queue holds a raw
// pointer to it — the caller must keep the job alive until it completes.
335 std::unique_ptr
<MappingJob
> start_update(
337 ParallelPGMapper
& mapper
,
338 unsigned pgs_per_item
) {
339 std::unique_ptr
<MappingJob
> job(new MappingJob(&map
, this));
340 mapper
.queue(job
.get(), pgs_per_item
, {});
// Accessor fragments (bodies truncated by the extraction).
344 epoch_t
get_epoch() const {
348 uint64_t get_num_pgs() const {