1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/smart_ptr/make_local_shared.hpp>
6 #include "crimson/osd/shard_services.h"
8 #include "messages/MOSDAlive.h"
9 #include "messages/MOSDMap.h"
10 #include "messages/MOSDPGCreated.h"
11 #include "messages/MOSDPGTemp.h"
13 #include "osd/osd_perf_counters.h"
14 #include "osd/PeeringState.h"
15 #include "crimson/common/config_proxy.h"
16 #include "crimson/mgr/client.h"
17 #include "crimson/mon/MonClient.h"
18 #include "crimson/net/Messenger.h"
19 #include "crimson/net/Connection.h"
20 #include "crimson/os/cyanstore/cyan_store.h"
21 #include "crimson/osd/osdmap_service.h"
22 #include "crimson/osd/osd_operations/pg_advance_map.h"
23 #include "crimson/osd/pg.h"
24 #include "crimson/osd/pg_meta.h"
// File-local logging helper: returns the seastar logger for the osd subsystem.
// NOTE(review): extraction artifact — original line numbers are fused into the
// text and interior lines (including the closing brace) are missing; code kept
// byte-identical.
27 seastar::logger
& logger() {
28 return crimson::get_logger(ceph_subsys_osd
);
34 namespace crimson::osd
{
// PerShardState constructor. Visible member-init list: wraps the per-shard
// store (store.get_sharded_store()), stores the perf counters, builds the
// throttler from local_conf(), seeds a tid counter with this shard's id
// shifted into the top 8 bits of ceph_tid_t (so tids are unique across
// shards), and records startup_time.
// NOTE(review): several parameter/init lines are missing from this extraction
// (numbering jumps 36->38, 45->47); fragments kept byte-identical.
36 PerShardState::PerShardState(
38 ceph::mono_time startup_time
,
40 PerfCounters
*recoverystate_perf
,
41 crimson::os::FuturizedStore
&store
)
43 store(store
.get_sharded_store()),
44 perf(perf
), recoverystate_perf(recoverystate_perf
),
45 throttler(crimson::common::local_conf()),
47 static_cast<ceph_tid_t
>(seastar::this_shard_id()) <<
48 (std::numeric_limits
<ceph_tid_t
>::digits
- 8)),
49 startup_time(startup_time
)
// Dumps in-flight operations by iterating the op registry with a lambda that
// captures the Formatter. NOTE(review): the lambda body and surrounding braces
// (original lines 53/55-56) are missing from this extraction — presumably each
// op is dumped into *f; confirm against the full source.
52 seastar::future
<> PerShardState::dump_ops_in_flight(Formatter
*f
) const
54 registry
.for_each_op([f
](const auto &op
) {
57 return seastar::now();
// Stops every PG on this shard in parallel; each element's ->stop() future is
// awaited by parallel_for_each. NOTE(review): the iterated container and
// lambda header (original lines 64-65) are missing from this extraction —
// presumably the pg_map entries; confirm against the full source.
60 seastar::future
<> PerShardState::stop_pgs()
63 return seastar::parallel_for_each(
66 return p
.second
->stop();
// Collects pg_stat_t for every PG on this shard for which this OSD is
// primary, keyed by raw pg_t (pgid.pgid). reported_epoch is refreshed from
// the current osdmap before the stats are emplaced (moved) into the result.
// NOTE(review): closing braces / trailing lines are missing from this
// extraction; code kept byte-identical.
70 std::map
<pg_t
, pg_stat_t
> PerShardState::get_pg_stats() const
73 std::map
<pg_t
, pg_stat_t
> ret
;
74 for (auto [pgid
, pg
] : pg_map
.get_pgs()) {
75 if (pg
->is_primary()) {
76 auto stats
= pg
->get_stats();
77 // todo: update reported_epoch,reported_seq,last_fresh
78 stats
.reported_epoch
= osdmap
->get_epoch();
79 ret
.emplace(pgid
.pgid
, std::move(stats
));
// Starts a PGAdvanceMap operation for every PG on this shard (in parallel)
// so each PG advances to the new map; only the operation's completion future
// (.second) is returned from the per-PG lambda. NOTE(review): the epoch
// parameter and parts of the start_operation argument list (original lines
// 87-89, 95-96) are missing from this extraction.
85 seastar::future
<> PerShardState::broadcast_map_to_pgs(
86 ShardServices
&shard_services
,
90 auto &pgs
= pg_map
.get_pgs();
91 return seastar::parallel_for_each(
92 pgs
.begin(), pgs
.end(),
93 [=, &shard_services
](auto& pg
) {
94 return shard_services
.start_operation
<PGAdvanceMap
>(
97 PeeringCtx
{}, false).second
;
// Looks up a PG on this shard by spg_t; simply forwards to pg_map.get_pg().
// NOTE(review): braces (original lines 102-103, 105) are missing from this
// extraction; code kept byte-identical.
101 Ref
<PG
> PerShardState::get_pg(spg_t pgid
)
104 return pg_map
.get_pg(pgid
);
// Returns the HeartbeatStamps for a peer osd, creating and caching a new
// ref-counted instance via try_emplace on first use. NOTE(review): the check
// guarding the assignment (original line 111, presumably `if (added)`) is
// missing from this extraction — as shown, the stamps entry is (re)assigned
// unconditionally; confirm against the full source.
107 HeartbeatStampsRef
PerShardState::get_hb_stamps(int peer
)
110 auto [stamps
, added
] = heartbeat_stamps
.try_emplace(peer
);
112 stamps
->second
= ceph::make_ref
<HeartbeatStamps
>(peer
);
114 return stamps
->second
;
// OSDSingletonState constructor. Visible init list: names the osdmap gate,
// stores messenger refs, and sizes the local/remote recovery reservers from
// osd_max_backfills / osd_min_recovery_priority and the snap reserver from
// osd_max_trimming_pgs. Body: registers this object as a config observer,
// seeds the osdmap cache with an empty OSDMap at epoch 0, and builds +
// registers the osd and recoverystate perf counters with the cct collection.
// NOTE(review): some init-list entries (original lines 127-131, 134-136,
// 139-141) are missing from this extraction; code kept byte-identical.
117 OSDSingletonState::OSDSingletonState(
119 crimson::net::Messenger
&cluster_msgr
,
120 crimson::net::Messenger
&public_msgr
,
121 crimson::mon::Client
&monc
,
122 crimson::mgr::Client
&mgrc
)
124 osdmap_gate("OSDSingletonState::osdmap_gate"),
125 cluster_msgr(cluster_msgr
),
126 public_msgr(public_msgr
),
132 crimson::common::local_conf()->osd_max_backfills
,
133 crimson::common::local_conf()->osd_min_recovery_priority
),
137 crimson::common::local_conf()->osd_max_backfills
,
138 crimson::common::local_conf()->osd_min_recovery_priority
),
142 crimson::common::local_conf()->osd_max_trimming_pgs
)
144 crimson::common::local_conf().add_observer(this);
145 osdmaps
[0] = boost::make_local_shared
<OSDMap
>();
147 perf
= build_osd_logger(&cct
);
148 cct
.get_perfcounters_collection()->add(perf
);
150 recoverystate_perf
= build_recoverystate_perf(&cct
);
151 cct
.get_perfcounters_collection()->add(recoverystate_perf
);
// Sends a message to a peer OSD over the cluster messenger, but drops it
// (resolving immediately) when the target is marked down in the current
// osdmap, or when the peer came up after from_epoch (up_from > from_epoch),
// since the message was addressed to an older incarnation. Otherwise connects
// to the peer's first cluster address and sends.
154 seastar::future
<> OSDSingletonState::send_to_osd(
155 int peer
, MessageURef m
, epoch_t from_epoch
)
157 if (osdmap
->is_down(peer
)) {
158 logger().info("{}: osd.{} is_down", __func__
, peer
);
159 return seastar::now();
160 } else if (osdmap
->get_info(peer
).up_from
> from_epoch
) {
161 logger().info("{}: osd.{} {} > {}", __func__
, peer
,
162 osdmap
->get_info(peer
).up_from
, from_epoch
);
163 return seastar::now();
165 auto conn
= cluster_msgr
.connect(
166 osdmap
->get_cluster_addrs(peer
).front(), CEPH_ENTITY_TYPE_OSD
);
167 return conn
->send(std::move(m
));
// Subscribes to osdmap updates from the monitors starting at `epoch`
// (one-shot). Renews the mon subscriptions only if the want-set actually
// changed or force_request is set. NOTE(review): the second half of the `||`
// condition (original line 176, presumably force_request) is missing from
// this extraction.
171 seastar::future
<> OSDSingletonState::osdmap_subscribe(
172 version_t epoch
, bool force_request
)
174 logger().info("{}({})", __func__
, epoch
);
175 if (monc
.sub_want_increment("osdmap", epoch
, CEPH_SUBSCRIBE_ONETIME
) ||
177 return monc
.renew_subs();
179 return seastar::now();
// Records a desired pg_temp mapping for a pgid. The want is queued into
// pg_temp_wanted only if there is no matching pending entry (not found in
// pg_temp_pending, or the pending acting set / forced flag differs).
// NOTE(review): the pgid and forced parameters and part of the condition
// (original lines 184, 186-187, 191) are missing from this extraction.
183 void OSDSingletonState::queue_want_pg_temp(
185 const vector
<int>& want
,
188 auto p
= pg_temp_pending
.find(pgid
);
189 if (p
== pg_temp_pending
.end() ||
190 p
->second
.acting
!= want
||
192 pg_temp_wanted
[pgid
] = {want
, forced
};
// Forgets any pg_temp request for pgid: removes it from both the wanted
// (not-yet-sent) and the pending (sent, awaiting map) sets.
196 void OSDSingletonState::remove_want_pg_temp(pg_t pgid
)
198 pg_temp_wanted
.erase(pgid
);
199 pg_temp_pending
.erase(pgid
);
// Moves all pending (already-sent) pg_temp requests back into the wanted set
// so they will be re-sent; wanted entries win on key collision (map::merge
// keeps the destination's element). Old sizes are captured for the debug log.
// NOTE(review): the log statement's first lines (original lines 208-212) are
// missing from this extraction.
202 void OSDSingletonState::requeue_pg_temp()
204 unsigned old_wanted
= pg_temp_wanted
.size();
205 unsigned old_pending
= pg_temp_pending
.size();
206 pg_temp_wanted
.merge(pg_temp_pending
);
207 pg_temp_pending
.clear();
213 pg_temp_wanted
.size());
// Sends the queued pg_temp wants to the monitor. Builds up to two MOSDPGTemp
// messages — ms[0] for normal and ms[1] for forced entries (indexed by the
// per-entry forced flag) — each stamped with the current osdmap epoch, lazily
// allocated on first use. All wanted entries are then moved into
// pg_temp_pending and both messages (skipping nullptr slots) are sent in
// parallel. NOTE(review): the lazy-allocation guard and the per-message lambda
// header (original lines 224, 233-234, 236) are missing from this extraction.
216 seastar::future
<> OSDSingletonState::send_pg_temp()
218 if (pg_temp_wanted
.empty())
219 return seastar::now();
220 logger().debug("{}: {}", __func__
, pg_temp_wanted
);
221 MURef
<MOSDPGTemp
> ms
[2] = {nullptr, nullptr};
222 for (auto& [pgid
, pg_temp
] : pg_temp_wanted
) {
223 auto& m
= ms
[pg_temp
.forced
];
225 m
= crimson::make_message
<MOSDPGTemp
>(osdmap
->get_epoch());
226 m
->forced
= pg_temp
.forced
;
228 m
->pg_temp
.emplace(pgid
, pg_temp
.acting
);
230 pg_temp_pending
.merge(pg_temp_wanted
);
231 pg_temp_wanted
.clear();
232 return seastar::parallel_for_each(std::begin(ms
), std::end(ms
),
235 return monc
.send_message(std::move(m
));
237 return seastar::now();
// Stream formatter for pg_temp_t: prints the acting set, and appends a marker
// when the entry is forced. NOTE(review): the ostream parameter line and the
// forced-branch body (original lines 243, 248-250) are missing from this
// extraction.
242 std::ostream
& operator<<(
244 const OSDSingletonState::pg_temp_t
& pg_temp
)
246 out
<< pg_temp
.acting
;
247 if (pg_temp
.forced
) {
// Notifies the monitor that a single PG has been created. Asserts the cluster
// requires at least luminous (MOSDPGCreated exists since then), remembers the
// pgid in pg_created so it can be re-sent/pruned later, and sends the message.
253 seastar::future
<> OSDSingletonState::send_pg_created(pg_t pgid
)
255 logger().debug(__func__
);
256 auto o
= get_osdmap();
257 ceph_assert(o
->require_osd_release
>= ceph_release_t::luminous
);
258 pg_created
.insert(pgid
);
259 return monc
.send_message(crimson::make_message
<MOSDPGCreated
>(pgid
));
// Re-sends MOSDPGCreated for every remembered pgid in pg_created, in
// parallel (used after reconnect/map change so the mon does not miss any).
// NOTE(review): the per-element lambda header (original line 268) is missing
// from this extraction.
262 seastar::future
<> OSDSingletonState::send_pg_created()
264 logger().debug(__func__
);
265 auto o
= get_osdmap();
266 ceph_assert(o
->require_osd_release
>= ceph_release_t::luminous
);
267 return seastar::parallel_for_each(pg_created
,
269 return monc
.send_message(crimson::make_message
<MOSDPGCreated
>(pgid
));
// Drops pg_created entries whose pool no longer exists or no longer has the
// CREATING flag — the mon no longer needs creation acks for those PGs.
// Erases while iterating via iterator-returning erase().
// NOTE(review): the "keeping" debug line as extracted has one "{}" placeholder
// but two arguments (__func__, *i) — looks like the format string should be
// "{} keeping {}"; confirm against the full source. The else branch / closing
// braces (original lines 283, 285-287) are missing from this extraction.
273 void OSDSingletonState::prune_pg_created()
275 logger().debug(__func__
);
276 auto o
= get_osdmap();
277 auto i
= pg_created
.begin();
278 while (i
!= pg_created
.end()) {
279 auto p
= o
->get_pg_pool(i
->pool());
280 if (!p
|| !p
->has_flag(pg_pool_t::FLAG_CREATING
)) {
281 logger().debug("{} pruning {}", __func__
, *i
);
282 i
= pg_created
.erase(i
);
284 logger().debug(" keeping {}", __func__
, *i
);
// Tells the mon (via MOSDAlive) that this OSD wants its up_thru advanced to
// `want`. Flow visible here: raise up_thru_wanted when want is newer, else
// log and skip; skip if this OSD does not exist in the osdmap; if the wanted
// value is still ahead of the map's recorded up_thru for us, send MOSDAlive
// stamped with the current epoch; otherwise log that the map already caught
// up and do nothing. NOTE(review): `} if (...)` on the DNE branch reads like
// a dropped `else` / missing line from the extraction; several log/brace
// lines (original 291-297, 300, 304, 313) are also missing.
290 seastar::future
<> OSDSingletonState::send_alive(const epoch_t want
)
293 "{} want={} up_thru_wanted={}",
298 if (want
> up_thru_wanted
) {
299 up_thru_wanted
= want
;
301 logger().debug("{} want={} <= up_thru_wanted={}; skipping",
302 __func__
, want
, up_thru_wanted
);
303 return seastar::now();
305 if (!osdmap
->exists(whoami
)) {
306 logger().warn("{} DNE", __func__
);
307 return seastar::now();
308 } if (const epoch_t up_thru
= osdmap
->get_up_thru(whoami
);
309 up_thru_wanted
> up_thru
) {
310 logger().debug("{} up_thru_wanted={} up_thru={}", __func__
, want
, up_thru
);
311 return monc
.send_message(
312 crimson::make_message
<MOSDAlive
>(osdmap
->get_epoch(), want
));
314 logger().debug("{} {} <= {}", __func__
, want
, osdmap
->get_up_thru(whoami
));
315 return seastar::now();
// Config-observer hook: returns the static NULL-terminated list of config
// keys this object reacts to (see handle_conf_change). NOTE(review): the
// first entry (original line 322, presumably "osd_max_backfills") and the
// terminating nullptr/return are missing from this extraction.
319 const char** OSDSingletonState::get_tracked_conf_keys() const
321 static const char* KEYS
[] = {
323 "osd_min_recovery_priority",
324 "osd_max_trimming_pgs",
// Config-observer hook: pushes changed values into the reservers —
// osd_max_backfills -> local/remote reserver max, osd_min_recovery_priority
// -> local/remote reserver min priority, osd_max_trimming_pgs -> snap
// reserver max. Keys must match get_tracked_conf_keys().
330 void OSDSingletonState::handle_conf_change(
331 const ConfigProxy
& conf
,
332 const std::set
<std::string
> &changed
)
334 if (changed
.count("osd_max_backfills")) {
335 local_reserver
.set_max(conf
->osd_max_backfills
);
336 remote_reserver
.set_max(conf
->osd_max_backfills
);
338 if (changed
.count("osd_min_recovery_priority")) {
339 local_reserver
.set_min_priority(conf
->osd_min_recovery_priority
);
340 remote_reserver
.set_min_priority(conf
->osd_min_recovery_priority
);
342 if (changed
.count("osd_max_trimming_pgs")) {
343 snap_reserver
.set_max(conf
->osd_max_trimming_pgs
);
// Returns the cached OSDMap for epoch e if present in the osdmaps cache;
// otherwise loads it from disk via load_map() and inserts the result into
// the cache before returning it.
347 seastar::future
<OSDSingletonState::local_cached_map_t
>
348 OSDSingletonState::get_local_map(epoch_t e
)
350 // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
351 if (auto found
= osdmaps
.find(e
); found
) {
352 return seastar::make_ready_future
<local_cached_map_t
>(std::move(found
));
354 return load_map(e
).then([e
, this](std::unique_ptr
<OSDMap
> osdmap
) {
355 return seastar::make_ready_future
<local_cached_map_t
>(
356 osdmaps
.insert(e
, std::move(osdmap
)));
// Queues the encoded map for epoch e into the meta collection via the given
// transaction, then caches the bufferlist in map_bl_cache (moved in — the
// store_map call above it takes bl by reference before the move).
361 void OSDSingletonState::store_map_bl(
362 ceph::os::Transaction
& t
,
363 epoch_t e
, bufferlist
&& bl
)
365 meta_coll
->store_map(t
, e
, bl
);
366 map_bl_cache
.insert(e
, std::move(bl
));
// Returns the encoded OSDMap bufferlist for an epoch: served from
// map_bl_cache when present, otherwise loaded from the meta collection.
// NOTE(review): the parameter line (original lines 370-371, presumably
// `epoch_t e`) is missing from this extraction.
369 seastar::future
<bufferlist
> OSDSingletonState::load_map_bl(
372 if (std::optional
<bufferlist
> found
= map_bl_cache
.find(e
); found
) {
373 return seastar::make_ready_future
<bufferlist
>(*found
);
375 return meta_coll
->load_map(e
);
// Loads the encoded maps for the inclusive epoch range [first, last] and
// returns them as map<epoch_t, bufferlist>. Uses seastar::map_reduce over a
// counting-iterator range: the map step loads each epoch's bl (load_map_bl)
// and pairs it with its epoch; the reduce step emplaces each pair into the
// accumulating std::map. Asserts first <= last up front.
// NOTE(review): the parameter lines and the mapper lambda header (original
// lines 380-382, 388) are missing from this extraction.
379 seastar::future
<std::map
<epoch_t
, bufferlist
>> OSDSingletonState::load_map_bls(
383 logger().debug("{} loading maps [{},{}]",
384 __func__
, first
, last
);
385 ceph_assert(first
<= last
);
386 return seastar::map_reduce(boost::make_counting_iterator
<epoch_t
>(first
),
387 boost::make_counting_iterator
<epoch_t
>(last
+ 1),
389 return load_map_bl(e
).then([e
](auto&& bl
) {
390 return seastar::make_ready_future
<std::pair
<epoch_t
, bufferlist
>>(
391 std::make_pair(e
, std::move(bl
)));
394 std::map
<epoch_t
, bufferlist
>{},
395 [](auto&& bls
, auto&& epoch_bl
) {
396 bls
.emplace(std::move(epoch_bl
));
397 return std::move(bls
);
// Loads and decodes the OSDMap for epoch e: allocates an empty OSDMap, then
// (for a nonzero epoch, per the visible fallthrough return) fetches the
// encoded bl via load_map_bl and decodes into it. NOTE(review): the guard
// around the load (original line 404, presumably `if (e > 0)`) and the decode
// call (original line 406) are missing from this extraction; the final return
// hands back the still-empty map.
401 seastar::future
<std::unique_ptr
<OSDMap
>> OSDSingletonState::load_map(epoch_t e
)
403 auto o
= std::make_unique
<OSDMap
>();
405 return load_map_bl(e
).then([o
=std::move(o
)](bufferlist bl
) mutable {
407 return seastar::make_ready_future
<std::unique_ptr
<OSDMap
>>(std::move(o
));
410 return seastar::make_ready_future
<std::unique_ptr
<OSDMap
>>(std::move(o
));
// Persists every map carried by an MOSDMap message for epochs
// [start, m->get_last()]. For each epoch e, sequentially (do_for_each):
//  - full map present in m->maps: decode it, store the encoded bl via
//    store_map_bl, cache the decoded map in osdmaps;
//  - otherwise an incremental in m->incremental_maps: load map e-1, apply
//    the decoded Incremental, re-encode with the incremental's
//    encode_features | CEPH_FEATURE_RESERVED, store + cache;
//  - neither: log an error ("MOSDMap lied...") and continue.
// NOTE(review): `std::move(std::move(p->second))` is a redundant double
// move — should be a single std::move; cannot fix here without touching
// code. The Incremental decode call and several closing-brace lines
// (original 433, 435, 440-441, 444-445) are missing from this extraction.
414 seastar::future
<> OSDSingletonState::store_maps(ceph::os::Transaction
& t
,
415 epoch_t start
, Ref
<MOSDMap
> m
)
417 return seastar::do_for_each(
418 boost::make_counting_iterator(start
),
419 boost::make_counting_iterator(m
->get_last() + 1),
420 [&t
, m
, this](epoch_t e
) {
421 if (auto p
= m
->maps
.find(e
); p
!= m
->maps
.end()) {
422 auto o
= std::make_unique
<OSDMap
>();
423 o
->decode(p
->second
);
424 logger().info("store_maps osdmap.{}", e
);
425 store_map_bl(t
, e
, std::move(std::move(p
->second
)));
426 osdmaps
.insert(e
, std::move(o
));
427 return seastar::now();
428 } else if (auto p
= m
->incremental_maps
.find(e
);
429 p
!= m
->incremental_maps
.end()) {
430 return load_map(e
- 1).then([e
, bl
=p
->second
, &t
, this](auto o
) {
431 OSDMap::Incremental inc
;
432 auto i
= bl
.cbegin();
434 o
->apply_incremental(inc
);
436 o
->encode(fbl
, inc
.encode_features
| CEPH_FEATURE_RESERVED
);
437 store_map_bl(t
, e
, std::move(fbl
));
438 osdmaps
.insert(e
, std::move(o
));
439 return seastar::now();
442 logger().error("MOSDMap lied about what maps it had?");
443 return seastar::now();
// Constructs a PG object for pgid. Two concurrent preparatory steps
// (seastar::when_all):
//  - get_pool_info_for_pg: if the pool still exists in create_map, copy its
//    pg_pool_t, pool name, and (for EC pools) erasure-code profile; if the
//    pool was deleted, fall back to get_pool_info() which reads the final
//    pg_pool_t off disk;
//  - get_collection: create the new collection (do_create) or open the
//    existing one for the PG's coll_t.
// The continuation unpacks both results (get0()) and builds the PG with this
// OSD's pg_shard_t {whoami, pgid.shard}.
// NOTE(review): the pgid/do_create parameters, several returns/braces, and
// the tail of the PG construction (original lines 450-452, 462, 465-467,
// 469, 472-473, 476, 478, 480-481, 489-500) are missing from this
// extraction.
448 seastar::future
<Ref
<PG
>> ShardServices::make_pg(
449 OSDMapService::cached_map_t create_map
,
453 using ec_profile_t
= std::map
<std::string
, std::string
>;
454 auto get_pool_info_for_pg
= [create_map
, pgid
, this] {
455 if (create_map
->have_pg_pool(pgid
.pool())) {
456 pg_pool_t pi
= *create_map
->get_pg_pool(pgid
.pool());
457 std::string name
= create_map
->get_pool_name(pgid
.pool());
458 ec_profile_t ec_profile
;
459 if (pi
.is_erasure()) {
460 ec_profile
= create_map
->get_erasure_code_profile(
461 pi
.erasure_code_profile
);
463 return seastar::make_ready_future
<
464 std::tuple
<pg_pool_t
,std::string
, ec_profile_t
>
468 std::move(ec_profile
)));
470 // pool was deleted; grab final pg_pool_t off disk.
471 return get_pool_info(pgid
.pool());
474 auto get_collection
= [pgid
, do_create
, this] {
475 const coll_t cid
{pgid
};
477 return get_store().create_new_collection(cid
);
479 return get_store().open_collection(cid
);
482 return seastar::when_all(
483 std::move(get_pool_info_for_pg
),
484 std::move(get_collection
)
485 ).then([pgid
, create_map
, this](auto &&ret
) {
486 auto [pool
, name
, ec_profile
] = std::move(std::get
<0>(ret
).get0());
487 auto coll
= std::move(std::get
<1>(ret
).get0());
488 return seastar::make_ready_future
<Ref
<PG
>>(
491 pg_shard_t
{local_state
.whoami
, pgid
.shard
},
// Drives PG creation from a PGCreateInfo. Visible flow:
//  1. Fetch the map for info->epoch (get_map(info->epoch)); look up the
//     pool. Creation is canceled (pg_map.pg_creation_canceled + a null
//     Ref<PG> result) when: the pool no longer exists, the pool lacks the
//     crimson flag, or the pool no longer has FLAG_CREATING (guards against
//     stale create messages after initial pgs were created and pgs may have
//     split/merged). Also asserts the cluster requires >= octopus.
//  2. Otherwise build the PG (continuation around original line 551) and
//     thread (pg, startmap) through.
//  3. Final stage: bail with a null Ref<PG> if no pg was built; compute
//     up/acting sets and primaries via pg_to_up_acting_osds, this OSD's role
//     via calc_pg_role, create the pg collection with split bits from the
//     pool's pg_num, initialize with past_intervals, then run PGAdvanceMap
//     to the current epoch and resolve with the PG.
// NOTE(review): many interior lines are missing from this extraction
// (do_with setup, log-call heads, make_pg invocation, pg->init arguments,
// rctx creation, closing braces); fragments kept byte-identical.
501 seastar::future
<Ref
<PG
>> ShardServices::handle_pg_create_info(
502 std::unique_ptr
<PGCreateInfo
> info
) {
503 return seastar::do_with(
506 -> seastar::future
<Ref
<PG
>> {
507 return get_map(info
->epoch
).then(
508 [&info
, this](cached_map_t startmap
)
509 -> seastar::future
<std::tuple
<Ref
<PG
>, cached_map_t
>> {
510 const spg_t
&pgid
= info
->pgid
;
512 int64_t pool_id
= pgid
.pgid
.pool();
513 const pg_pool_t
*pool
= get_map()->get_pg_pool(pool_id
);
516 "{} ignoring pgid {}, pool dne",
519 local_state
.pg_map
.pg_creation_canceled(pgid
);
520 return seastar::make_ready_future
<
521 std::tuple
<Ref
<PG
>, OSDMapService::cached_map_t
>
522 >(std::make_tuple(Ref
<PG
>(), startmap
));
523 } else if (!pool
->is_crimson()) {
525 "{} ignoring pgid {}, pool lacks crimson flag",
528 local_state
.pg_map
.pg_creation_canceled(pgid
);
529 return seastar::make_ready_future
<
530 std::tuple
<Ref
<PG
>, OSDMapService::cached_map_t
>
531 >(std::make_tuple(Ref
<PG
>(), startmap
));
533 ceph_assert(get_map()->require_osd_release
>=
534 ceph_release_t::octopus
);
535 if (!pool
->has_flag(pg_pool_t::FLAG_CREATING
)) {
536 // this ensures we do not process old creating messages after the
537 // pool's initial pgs have been created (and pg are subsequently
538 // allowed to split or merge).
540 "{} dropping {} create, pool does not have CREATING flag set",
543 local_state
.pg_map
.pg_creation_canceled(pgid
);
544 return seastar::make_ready_future
<
545 std::tuple
<Ref
<PG
>, OSDMapService::cached_map_t
>
546 >(std::make_tuple(Ref
<PG
>(), startmap
));
551 ).then([startmap
=std::move(startmap
)](auto pg
) mutable {
552 return seastar::make_ready_future
<
553 std::tuple
<Ref
<PG
>, OSDMapService::cached_map_t
>
554 >(std::make_tuple(std::move(pg
), std::move(startmap
)));
556 }).then([this, &info
](auto &&ret
)
557 ->seastar::future
<Ref
<PG
>> {
558 auto [pg
, startmap
] = std::move(ret
);
560 return seastar::make_ready_future
<Ref
<PG
>>(Ref
<PG
>());
561 const pg_pool_t
* pp
= startmap
->get_pg_pool(info
->pgid
.pool());
563 int up_primary
, acting_primary
;
564 vector
<int> up
, acting
;
565 startmap
->pg_to_up_acting_osds(
566 info
->pgid
.pgid
, &up
, &up_primary
, &acting
, &acting_primary
);
568 int role
= startmap
->calc_pg_role(
569 pg_shard_t(local_state
.whoami
, info
->pgid
.shard
),
573 create_pg_collection(
576 info
->pgid
.get_split_bits(pp
->get_pg_num()));
589 info
->past_intervals
,
592 return start_operation
<PGAdvanceMap
>(
593 *this, pg
, get_map()->get_epoch(), std::move(rctx
), true
594 ).second
.then([pg
=pg
] {
595 return seastar::make_ready_future
<Ref
<PG
>>(pg
);
// Returns the PG for pgid, creating it if necessary. When a PGCreateInfo is
// supplied (per the visible branch structure): register a waiter on pg_map
// (wait_for_pg), and if this caller is the one that initiates creation, mark
// the pgid as creating and fire-and-forget handle_pg_create_info ((void)
// discard is deliberate — completion is observed through pg_map); the waiter
// future is returned. Otherwise the existing PG is returned immediately via
// the ertr ready marker. NOTE(review): the pgid parameter, the `if (info)` /
// `if (creating)` guards, and the handle_pg_create_info argument lines
// (original 605-606, 608-609, 612, 615-616, 618) are missing from this
// extraction.
602 ShardServices::get_or_create_pg_ret
603 ShardServices::get_or_create_pg(
604 PGMap::PGCreationBlockingEvent::TriggerI
&& trigger
,
607 std::unique_ptr
<PGCreateInfo
> info
)
610 auto [fut
, creating
] = local_state
.pg_map
.wait_for_pg(
611 std::move(trigger
), pgid
);
613 local_state
.pg_map
.set_creating(pgid
);
614 (void)handle_pg_create_info(
617 return std::move(fut
);
619 return get_or_create_pg_ret(
620 get_or_create_pg_ertr::ready_future_marker
{},
621 local_state
.pg_map
.get_pg(pgid
));
// Blocks until the PG for pgid exists on this shard: registers the trigger
// with pg_map and returns only the waiter future (.first), never initiating
// creation (contrast with get_or_create_pg).
625 ShardServices::wait_for_pg_ret
626 ShardServices::wait_for_pg(
627 PGMap::PGCreationBlockingEvent::TriggerI
&& trigger
, spg_t pgid
)
629 return local_state
.pg_map
.wait_for_pg(std::move(trigger
), pgid
).first
;
// Loads an existing PG from disk at startup: reads the PG's epoch from its
// PGMeta, obtains the corresponding map (continuation at original line 639,
// body missing from this extraction — presumably get_local_map(e)), builds
// the PG object via make_pg(..., do_create=false), then reads its persistent
// state (pg->read_state). Any exception is logged and converted to
// ceph_abort — a PG that exists on disk but cannot load is fatal; the
// make_exception_future after the abort is unreachable return-type glue.
632 seastar::future
<Ref
<PG
>> ShardServices::load_pg(spg_t pgid
)
635 logger().debug("{}: {}", __func__
, pgid
);
637 return seastar::do_with(PGMeta(get_store(), pgid
), [](auto& pg_meta
) {
638 return pg_meta
.get_epoch();
639 }).then([this](epoch_t e
) {
641 }).then([pgid
, this](auto&& create_map
) {
642 return make_pg(std::move(create_map
), pgid
, false);
643 }).then([this](Ref
<PG
> pg
) {
644 return pg
->read_state(&get_store()).then([pg
] {
645 return seastar::make_ready_future
<Ref
<PG
>>(std::move(pg
));
647 }).handle_exception([pgid
](auto ep
) {
648 logger().info("pg {} saw exception on load {}", pgid
, ep
);
649 ceph_abort("Could not load pg" == 0);
650 return seastar::make_exception_future
<Ref
<PG
>>(ep
);
// Submits the transaction accumulated in a PeeringCtx to the store against
// the given collection; an empty transaction short-circuits to a ready
// future. The ctx's transaction is moved out and reset afterwards so the ctx
// can be reused. NOTE(review): the collection argument to do_transaction and
// the final return of `ret` (original lines 663, 666-667) are missing from
// this extraction.
654 seastar::future
<> ShardServices::dispatch_context_transaction(
655 crimson::os::CollectionRef col
, PeeringCtx
&ctx
) {
656 if (ctx
.transaction
.empty()) {
657 logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
658 return seastar::now();
661 logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
662 auto ret
= get_store().do_transaction(
664 std::move(ctx
.transaction
));
665 ctx
.reset_transaction();
// Sends every buffered recovery message: iterates the per-peer message map in
// parallel, and for each peer sends its messages in parallel via send_to_osd
// stamped with the current osdmap epoch (send_to_osd drops messages for
// down/restarted peers). The message map is cleared afterwards.
// NOTE(review): closing braces and the return of `ret` (original lines
// 679-680, 682-683) are missing from this extraction.
669 seastar::future
<> ShardServices::dispatch_context_messages(
670 BufferedRecoveryMessages
&&ctx
)
672 auto ret
= seastar::parallel_for_each(std::move(ctx
.message_map
),
673 [this](auto& osd_messages
) {
674 auto& [peer
, messages
] = osd_messages
;
675 logger().debug("dispatch_context_messages sending messages to {}", peer
);
676 return seastar::parallel_for_each(
677 std::move(messages
), [=, peer
=peer
, this](auto& m
) {
678 return send_to_osd(peer
, std::move(m
), local_state
.osdmap
->get_epoch());
681 ctx
.message_map
.clear();
// Dispatches both halves of a PeeringCtx concurrently: the buffered recovery
// messages, and (only when a collection was supplied) the accumulated
// transaction. Asserts that a non-empty transaction always comes with a
// collection. NOTE(review): the ctx parameter line and the when_all_succeed
// continuation head (original lines 687-688, 694) are missing from this
// extraction.
685 seastar::future
<> ShardServices::dispatch_context(
686 crimson::os::CollectionRef col
,
689 ceph_assert(col
|| ctx
.transaction
.empty());
690 return seastar::when_all_succeed(
691 dispatch_context_messages(
692 BufferedRecoveryMessages
{ctx
}),
693 col
? dispatch_context_transaction(col
, ctx
) : seastar::now()
695 return seastar::now();
// Sends map updates to a peer starting at epoch `first` over the given
// connection. Two visible paths:
//  - first >= superblock.oldest_map: we still hold all needed epochs — load
//    bls for [first, superblock.newest_map] (load head missing from this
//    extraction, presumably load_map_bls), build an MOSDMap with
//    cluster_osdmap_trim_lower_bound = first, newest_map from the
//    superblock, and the loaded maps, then send.
//  - otherwise: the peer is too far behind for incrementals — send only the
//    current full map (load_map_bl(osdmap->get_epoch())), with the trim
//    lower bound set from superblock.oldest_map (see the in-code TODO about
//    tracking cluster_osdmap_trim_lower_bound properly).
// NOTE(review): the `first` parameter line, MOSDMap constructor heads, the
// TODO-comment terminator, and closing braces (original lines 701-702, 704,
// 708, 714-715, 719, 726, 731-732) are missing from this extraction.
699 seastar::future
<> OSDSingletonState::send_incremental_map(
700 crimson::net::Connection
&conn
,
703 if (first
>= superblock
.oldest_map
) {
705 first
, superblock
.newest_map
706 ).then([this, &conn
, first
](auto&& bls
) {
707 auto m
= crimson::make_message
<MOSDMap
>(
709 osdmap
->get_encoding_features());
710 m
->cluster_osdmap_trim_lower_bound
= first
;
711 m
->newest_map
= superblock
.newest_map
;
712 m
->maps
= std::move(bls
);
713 return conn
.send(std::move(m
));
716 return load_map_bl(osdmap
->get_epoch()
717 ).then([this, &conn
](auto&& bl
) mutable {
718 auto m
= crimson::make_message
<MOSDMap
>(
720 osdmap
->get_encoding_features());
721 /* TODO: once we support the tracking of superblock's
722 * cluster_osdmap_trim_lower_bound, the MOSDMap should
723 * be populated with this value instead of the oldest_map.
724 * See: OSD::handle_osd_map for how classic updates the
725 * cluster's trim lower bound.
727 m
->cluster_osdmap_trim_lower_bound
= superblock
.oldest_map
;
728 m
->newest_map
= superblock
.newest_map
;
729 m
->maps
.emplace(osdmap
->get_epoch(), std::move(bl
));
730 return conn
.send(std::move(m
));
735 seastar::future
<> OSDSingletonState::send_incremental_map_to_osd(
739 if (osdmap
->is_down(osd
)) {
740 logger().info("{}: osd.{} is_down", __func__
, osd
);
741 return seastar::now();
743 auto conn
= cluster_msgr
.connect(
744 osdmap
->get_cluster_addrs(osd
).front(), CEPH_ENTITY_TYPE_OSD
);
745 return send_incremental_map(*conn
, first
);