1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
8 #include <boost/intrusive_ptr.hpp>
9 #include <seastar/core/future.hh>
11 #include "include/common_fwd.h"
12 #include "osd_operation.h"
13 #include "msg/MessageRef.h"
14 #include "crimson/common/exception.h"
15 #include "crimson/common/shared_lru.h"
16 #include "crimson/os/futurized_collection.h"
17 #include "osd/PeeringState.h"
18 #include "crimson/osd/osdmap_service.h"
19 #include "crimson/osd/osdmap_gate.h"
20 #include "crimson/osd/osd_meta.h"
21 #include "crimson/osd/object_context.h"
22 #include "crimson/osd/pg_map.h"
23 #include "crimson/osd/state.h"
24 #include "common/AsyncReserver.h"
26 namespace crimson::net
{
30 namespace crimson::mgr
{
34 namespace crimson::mon
{
38 namespace crimson::os
{
44 class BufferedRecoveryMessages
;
46 namespace crimson::osd
{
// NOTE(review): shard 0 is the designated core for OSD-wide singletons,
// because seastar::sharded<>::start_single places the instance on core 0.
48 // seastar::sharded puts start_single on core 0
49 constexpr core_id_t PRIMARY_CORE
= 0;
// NOTE(review): the enclosing class declaration line is not visible in this
// chunk; from the members below this is the per-shard (per-reactor) state
// holder — confirm the class name (presumably PerShardState) in the full file.
56 * Per-shard state holding instances local to each shard.
// ShardServices and PGShardManager reach into private members directly.
59 friend class ShardServices
;
60 friend class PGShardManager
;
// OSDMap handle aliases re-exported from OSDMapService.
61 using cached_map_t
= OSDMapService::cached_map_t
;
62 using local_cached_map_t
= OSDMapService::local_cached_map_t
;
// Shard id captured at construction; used below to assert that this state
// is only ever touched from its owning shard.
64 const core_id_t core
= seastar::this_shard_id();
65 #define assert_core() ceph_assert(seastar::this_shard_id() == core);
// Non-owning reference to this shard's store backend (lifetime managed
// elsewhere — presumably by the OSD; confirm).
68 crimson::os::FuturizedStore::Shard
&store
;
69 crimson::common::CephContext cct
;
// Perf counters: raw non-owning pointers, null until registered elsewhere.
71 PerfCounters
*perf
= nullptr;
72 PerfCounters
*recoverystate_perf
= nullptr;
// Registry and throttler for operations started on this shard.
75 OSDOperationRegistry registry
;
76 OperationThrottler throttler
;
// Dumps in-flight operations of this shard into the given Formatter.
78 seastar::future
<> dump_ops_in_flight(Formatter
*f
) const;
// Shard-local cached OSDMap handle and its accessors.
// NOTE(review): several closing braces/bodies are elided from this view;
// visible code kept byte-identical.
81 OSDMapService::cached_map_t osdmap
;
82 const auto &get_osdmap() const {
// Replaces the shard-local map handle; takes ownership of the new handle.
86 void update_map(OSDMapService::cached_map_t new_osdmap
) {
88 osdmap
= std::move(new_osdmap
);
90 void set_up_epoch(epoch_t epoch
) {
95 // prevent creating new osd operations when system is shutting down,
96 // this is necessary because there are chances that a new operation
97 // is created, after the interruption of all ongoing operations, and
98 // creates and waits on a new and may-never-resolve future, in which
99 // case the shutdown may never succeed.
100 bool stopping
= false;
// Stops the per-shard operation registry, logging entry first.
101 seastar::future
<> stop_registry() {
103 crimson::get_logger(ceph_subsys_osd
).info("PerShardState::{}", __func__
);
105 return registry
.stop();
// Shard-local PG management: shutdown, stats collection, and pushing a new
// map to every PG on this shard.
111 seastar::future
<> stop_pgs();
112 std::map
<pg_t
, pg_stat_t
> get_pg_stats() const;
113 seastar::future
<> broadcast_map_to_pgs(
114 ShardServices
&shard_services
,
// Looks up a PG hosted on this shard; presumably returns a null Ref when
// the pgid is absent — confirm against PGMap.
117 Ref
<PG
> get_pg(spg_t pgid
);
// Invokes f(pgid, pg_ref) for every PG registered in this shard's pg_map.
118 template <typename F
>
119 void for_each_pg(F
&&f
) const {
121 for (auto &pg
: pg_map
.get_pgs()) {
122 std::invoke(f
, pg
.first
, pg
.second
);
// Creates an operation of type T in the per-shard registry and schedules
// its start() on the next reactor poll (seastar::yield defers it).
// Returns {operation ref, future resolving when the op completes}.
// Throws system_shutdown_exception if the shard is already stopping, so no
// new never-resolving futures are created during shutdown (see `stopping`).
126 template <typename T
, typename
... Args
>
127 auto start_operation(Args
&&... args
) {
129 if (__builtin_expect(stopping
, false)) {
130 throw crimson::common::system_shutdown_exception();
132 auto op
= registry
.create_operation
<T
>(std::forward
<Args
>(args
)...);
133 crimson::get_logger(ceph_subsys_osd
).info(
134 "PerShardState::{}, {}", __func__
, *op
);
// yield() defers start() so the caller gets the pair before the op runs.
135 auto fut
= seastar::yield().then([op
] {
// finally([op]) keeps the operation alive for the whole execution, not
// just while the continuation chain is being scheduled.
136 return op
->start().finally([op
/* by copy */] {
137 // ensure the op's lifetime is appropriate. It is not enough to
138 // guarantee it's alive at the scheduling stages (i.e. `then()`
139 // calling) but also during the actual execution (i.e. when passed
140 // lambdas are actually run).
143 return std::make_pair(std::move(op
), std::move(fut
));
// Interruptible variant of start_operation: identical flow, but the deferral
// and continuation are wrapped via InterruptorT::make_interruptible /
// then_interruptible so the op can participate in interrupt conditions.
146 template <typename InterruptorT
, typename T
, typename
... Args
>
147 auto start_operation_may_interrupt(Args
&&... args
) {
149 if (__builtin_expect(stopping
, false)) {
150 throw crimson::common::system_shutdown_exception();
152 auto op
= registry
.create_operation
<T
>(std::forward
<Args
>(args
)...);
153 crimson::get_logger(ceph_subsys_osd
).info(
154 "PerShardState::{}, {}", __func__
, *op
);
155 auto fut
= InterruptorT::make_interruptible(
157 ).then_interruptible([op
] {
// Copying op into finally() pins its lifetime through actual execution.
158 return op
->start().finally([op
/* by copy */] {
159 // ensure the op's lifetime is appropriate. It is not enough to
160 // guarantee it's alive at the scheduling stages (i.e. `then()`
161 // calling) but also during the actual execution (i.e. when passed
162 // lambdas are actually run).
165 return std::make_pair(std::move(op
), std::move(fut
));
168 // tids for ops i issue, prefixed with core id to ensure uniqueness
// NOTE(review): the counter member and the tid-composition body are elided
// from this view; presumably tid = (core << shift) | next_tid — confirm.
170 ceph_tid_t
get_tid() {
// Heartbeat stamp bookkeeping, keyed by peer osd id.
175 HeartbeatStampsRef
get_hb_stamps(int peer
);
176 std::map
<int, HeartbeatStampsRef
> heartbeat_stamps
;
// Monotonic time base captured at construction; get_mnow() reports the
// signed span elapsed since then.
179 const ceph::mono_time startup_time
;
180 ceph::signedspan
get_mnow() const {
182 return ceph::mono_clock::now() - startup_time
;
// Constructor parameter tail (leading parameters elided from this view).
188 ceph::mono_time startup_time
,
190 PerfCounters
*recoverystate_perf
,
191 crimson::os::FuturizedStore
&store
);
197  * OSD-wide singleton holding instances that need to be accessible
// Lives on PRIMARY_CORE; also observes config changes (md_config_obs_t).
200 class OSDSingletonState
: public md_config_obs_t
{
201 friend class ShardServices
;
202 friend class PGShardManager
;
203 using cached_map_t
= OSDMapService::cached_map_t
;
204 using local_cached_map_t
= OSDMapService::local_cached_map_t
;
// Constructor parameters: messengers plus mon/mgr clients, all borrowed
// by reference (owned elsewhere).
209 crimson::net::Messenger
&cluster_msgr
,
210 crimson::net::Messenger
&public_msgr
,
211 crimson::mon::Client
&monc
,
212 crimson::mgr::Client
&mgrc
);
217 crimson::common::CephContext cct
;
// Non-owning perf counter pointers, null until registered.
218 PerfCounters
*perf
= nullptr;
219 PerfCounters
*recoverystate_perf
= nullptr;
// Epoch-keyed caches: decoded OSDMaps and their encoded bufferlists.
223 SharedLRU
<epoch_t
, OSDMap
> osdmaps
;
224 SimpleLRU
<epoch_t
, bufferlist
, false> map_bl_cache
;
// Singleton-wide current map handle and updater.
227 cached_map_t
&get_osdmap() { return osdmap
; }
228 void update_map(cached_map_t new_osdmap
) {
229 osdmap
= std::move(new_osdmap
);
// Gate blocking operations until the singleton has seen a map epoch.
231 OSD_OSDMapGate osdmap_gate
;
// Borrowed messengers for intra-cluster and client-facing traffic.
233 crimson::net::Messenger
&cluster_msgr
;
234 crimson::net::Messenger
&public_msgr
;
// Sends m to the given peer osd, valid as of from_epoch.
236 seastar::future
<> send_to_osd(int peer
, MessageURef m
, epoch_t from_epoch
);
238 crimson::mon::Client
&monc
;
// Subscribes to osdmap updates from the monitor starting at epoch.
239 seastar::future
<> osdmap_subscribe(version_t epoch
, bool force_request
);
241 crimson::mgr::Client
&mgrc
;
// Lazily-initialized metadata collection wrapper.
243 std::unique_ptr
<OSDMeta
> meta_coll
;
244 template <typename
... Args
>
245 void init_meta_coll(Args
&&... args
) {
246 meta_coll
= std::make_unique
<OSDMeta
>(std::forward
<Args
>(args
)...);
// Precondition (presumably): init_meta_coll was called first — confirm.
248 OSDMeta
&get_meta_coll() {
253 OSDSuperblock superblock
;
254 void set_superblock(OSDSuperblock _superblock
) {
255 superblock
= std::move(_superblock
);
// Incremental map delivery to a connection or a peer osd.
258 seastar::future
<> send_incremental_map(
259 crimson::net::Connection
&conn
,
262 seastar::future
<> send_incremental_map_to_osd(int osd
, epoch_t first
);
// Loads the final pool info for a (presumably deleted) pool from meta.
264 auto get_pool_info(int64_t poolid
) {
265 return get_meta_coll().load_final_pool_info(poolid
);
268 // global pg temp state
// pg_temp_t: holds a wanted acting set plus the forced flag (full struct
// partially elided from this view).
270 std::vector
<int> acting
;
// Wanted vs. already-sent pg_temp requests, keyed by pg.
273 std::map
<pg_t
, pg_temp_t
> pg_temp_wanted
;
274 std::map
<pg_t
, pg_temp_t
> pg_temp_pending
;
275 friend std::ostream
& operator<<(std::ostream
&, const pg_temp_t
&);
// Queue/remove/requeue pg_temp wishes; send_pg_temp flushes them to the mon.
277 void queue_want_pg_temp(pg_t pgid
, const std::vector
<int>& want
,
278 bool forced
= false);
279 void remove_want_pg_temp(pg_t pgid
);
280 void requeue_pg_temp();
281 seastar::future
<> send_pg_temp();
283 // TODO: add config to control mapping
// Maps PGs to shards; initialized over all smp cores starting at 0.
284 PGShardMapping pg_to_shard_mapping
{0, seastar::smp::count
};
// PGs whose creation has been acknowledged to the mon.
286 std::set
<pg_t
> pg_created
;
287 seastar::future
<> send_pg_created(pg_t pgid
);
288 seastar::future
<> send_pg_created();
289 void prune_pg_created();
// Finisher that runs completions inline (no queueing indirection).
291 struct DirectFinisher
{
292 void queue(Context
*c
) {
// Recovery reservation bookkeeping: local, remote and snap-trim reservers.
296 AsyncReserver
<spg_t
, DirectFinisher
> local_reserver
;
297 AsyncReserver
<spg_t
, DirectFinisher
> remote_reserver
;
298 AsyncReserver
<spg_t
, DirectFinisher
> snap_reserver
;
// Highest up_thru epoch we have asked the mon for; send_alive requests more.
300 epoch_t up_thru_wanted
= 0;
301 seastar::future
<> send_alive(epoch_t want
);
// md_config_obs_t interface: which keys we track and how we react.
303 const char** get_tracked_conf_keys() const final
;
304 void handle_conf_change(
305 const ConfigProxy
& conf
,
306 const std::set
<std::string
> &changed
) final
;
// Map retrieval: decoded local handle, freshly decoded map, or raw bl;
// presumably served from the osdmaps/map_bl_cache LRUs when hot — confirm.
308 seastar::future
<local_cached_map_t
> get_local_map(epoch_t e
);
309 seastar::future
<std::unique_ptr
<OSDMap
>> load_map(epoch_t e
);
310 seastar::future
<bufferlist
> load_map_bl(epoch_t e
);
311 seastar::future
<std::map
<epoch_t
, bufferlist
>>
312 load_map_bls(epoch_t first
, epoch_t last
);
// Map persistence into the given transaction.
313 void store_map_bl(ceph::os::Transaction
& t
,
314 epoch_t e
, bufferlist
&& bl
);
315 seastar::future
<> store_maps(ceph::os::Transaction
& t
,
316 epoch_t start
, Ref
<MOSDMap
> m
);
320  * Represents services available to each PG
// Combines the shard-local state with cross-core access to the singleton.
322 class ShardServices
: public OSDMapService
{
323 friend class PGShardManager
;
324 using cached_map_t
= OSDMapService::cached_map_t
;
325 using local_cached_map_t
= OSDMapService::local_cached_map_t
;
327 PerShardState local_state
;
// Handle to the sharded singleton; invoke_on routes calls to PRIMARY_CORE
// (target core argument elided from this view — confirm).
328 seastar::sharded
<OSDSingletonState
> &osd_singleton_state
;
// Runs f(singleton, args...) on the core owning OSDSingletonState.
330 template <typename F
, typename
... Args
>
331 auto with_singleton(F
&&f
, Args
&&... args
) {
332 return osd_singleton_state
.invoke_on(
335 std::forward
<Args
>(args
)...
// Forwarding-method generators. FORWARD* delegate to a local member;
// FORWARD_TO_OSD_SINGLETON* wrap the call in with_singleton so it executes
// on the singleton's core. (No comments inside the macro bodies: a bare
// line would terminate the backslash continuations.)
339 #define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
340 template <typename... Args> \
341 auto FROM_METHOD(Args&&... args) const { \
342 return TARGET.TO_METHOD(std::forward<Args>(args)...); \
345 #define FORWARD(FROM_METHOD, TO_METHOD, TARGET) \
346 template <typename... Args> \
347 auto FROM_METHOD(Args&&... args) { \
348 return TARGET.TO_METHOD(std::forward<Args>(args)...); \
351 #define FORWARD_TO_LOCAL(METHOD) FORWARD(METHOD, METHOD, local_state)
352 #define FORWARD_TO_LOCAL_CONST(METHOD) FORWARD_CONST( \
353 METHOD, METHOD, local_state) \
355 #define FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, TARGET) \
356 template <typename... Args> \
357 auto METHOD(Args&&... args) { \
358 return with_singleton( \
359 [](auto &local_state, auto&&... args) { \
360 return local_state.TARGET( \
361 std::forward<decltype(args)>(args)...); \
362 }, std::forward<Args>(args)...); \
364 #define FORWARD_TO_OSD_SINGLETON(METHOD) \
365 FORWARD_TO_OSD_SINGLETON_TARGET(METHOD, METHOD)
// Constructor: forwards PSSArgs to the embedded PerShardState and stores the
// sharded-singleton reference (some parameter lines elided from this view).
368 template <typename
... PSSArgs
>
370 seastar::sharded
<OSDSingletonState
> &osd_singleton_state
,
372 : local_state(std::forward
<PSSArgs
>(args
)...),
373 osd_singleton_state(osd_singleton_state
) {}
375 FORWARD_TO_OSD_SINGLETON(send_to_osd
)
// Access to this shard's store backend.
377 crimson::os::FuturizedStore::Shard
&get_store() {
378 return local_state
.store
;
// Removes the PG from the shard-local map, then drops its entry from the
// global pg-to-shard mapping on the singleton's core.
381 auto remove_pg(spg_t pgid
) {
382 local_state
.pg_map
.remove_pg(pgid
);
383 return with_singleton(
384 [pgid
](auto &osstate
) {
385 osstate
.pg_to_shard_mapping
.remove_pg(pgid
);
389 crimson::common::CephContext
*get_cct() {
390 return &(local_state
.cct
);
// Thin wrappers over PerShardState operation starters (see that class for
// shutdown and lifetime semantics).
393 template <typename T
, typename
... Args
>
394 auto start_operation(Args
&&... args
) {
395 return local_state
.start_operation
<T
>(std::forward
<Args
>(args
)...);
398 template <typename InterruptorT
, typename T
, typename
... Args
>
399 auto start_operation_may_interrupt(Args
&&... args
) {
400 return local_state
.start_operation_may_interrupt
<
401 InterruptorT
, T
>(std::forward
<Args
>(args
)...);
404 auto &get_registry() { return local_state
.registry
; }
// Perf logger accessors; deref of possibly-null pointers — callers
// presumably only use these after counters are registered (confirm).
407 PerfCounters
&get_recoverystate_perf_logger() {
408 return *local_state
.recoverystate_perf
;
410 PerfCounters
&get_perf_logger() {
411 return *local_state
.perf
;
415 FORWARD_TO_LOCAL_CONST(dump_ops_in_flight
);
417 // Local PG Management
// Constructs a PG instance from the given creation-time map.
418 seastar::future
<Ref
<PG
>> make_pg(
419 cached_map_t create_map
,
422 seastar::future
<Ref
<PG
>> handle_pg_create_info(
423 std::unique_ptr
<PGCreateInfo
> info
);
// errorated get-or-create / wait paths share PGMap's error set.
425 using get_or_create_pg_ertr
= PGMap::wait_for_pg_ertr
;
426 using get_or_create_pg_ret
= get_or_create_pg_ertr::future
<Ref
<PG
>>;
427 get_or_create_pg_ret
get_or_create_pg(
428 PGMap::PGCreationBlockingEvent::TriggerI
&&,
431 std::unique_ptr
<PGCreateInfo
> info
);
433 using wait_for_pg_ertr
= PGMap::wait_for_pg_ertr
;
434 using wait_for_pg_ret
= wait_for_pg_ertr::future
<Ref
<PG
>>;
435 wait_for_pg_ret
wait_for_pg(
436 PGMap::PGCreationBlockingEvent::TriggerI
&&, spg_t pgid
);
// Loads an existing PG from the store.
437 seastar::future
<Ref
<PG
>> load_pg(spg_t pgid
);
439 /// Dispatch and reset ctx transaction
440 seastar::future
<> dispatch_context_transaction(
441 crimson::os::CollectionRef col
, PeeringCtx
&ctx
);
443 /// Dispatch and reset ctx messages
444 seastar::future
<> dispatch_context_messages(
445 BufferedRecoveryMessages
&&ctx
);
447 /// Dispatch ctx and dispose of context
448 seastar::future
<> dispatch_context(
449 crimson::os::CollectionRef col
,
452 /// Dispatch ctx and dispose of ctx, transaction must be empty
453 seastar::future
<> dispatch_context(
// Delegates to the collection-taking overload with an empty collection.
455 return dispatch_context({}, std::move(ctx
));
458 /// Return per-core tid
459 ceph_tid_t
get_tid() { return local_state
.get_tid(); }
461 /// Return core-local pg count * number of cores
462 unsigned get_num_local_pgs() const {
463 return local_state
.pg_map
.get_pg_count();
// OSDMapService interface: shard-local map/epoch accessors.
467 cached_map_t
get_map() const final
{ return local_state
.get_osdmap(); }
468 epoch_t
get_up_epoch() const final
{ return local_state
.up_epoch
; }
// Fetches the map for epoch e from the singleton's core: get_local_map
// there, wrap it in a foreign_ptr for cross-core transfer, then convert it
// into a locally-usable shared handle on the calling shard.
469 seastar::future
<cached_map_t
> get_map(epoch_t e
) final
{
470 return with_singleton(
471 [](auto &sstate
, epoch_t e
) {
472 return sstate
.get_local_map(
474 ).then([](auto lmap
) {
475 return seastar::foreign_ptr
<local_cached_map_t
>(lmap
);
477 }, e
).then([](auto fmap
) {
478 return make_local_shared_foreign(std::move(fmap
));
// Generated forwarders (see FORWARD* macros above): each expands to a
// variadic method delegating either to the singleton's core or to a local
// member. Comments between invocations are safe (unlike macro bodies).
482 FORWARD_TO_OSD_SINGLETON(get_pool_info
)
// Throttled execution via this shard's OperationThrottler.
483 FORWARD(with_throttle_while
, with_throttle_while
, local_state
.throttler
)
485 FORWARD_TO_OSD_SINGLETON(send_incremental_map
)
486 FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd
)
488 FORWARD_TO_OSD_SINGLETON(osdmap_subscribe
)
489 FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp
)
490 FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp
)
491 FORWARD_TO_OSD_SINGLETON(requeue_pg_temp
)
492 FORWARD_TO_OSD_SINGLETON(send_pg_created
)
493 FORWARD_TO_OSD_SINGLETON(send_alive
)
494 FORWARD_TO_OSD_SINGLETON(send_pg_temp
)
495 FORWARD_TO_LOCAL_CONST(get_mnow
)
496 FORWARD_TO_LOCAL(get_hb_stamps
)
498 FORWARD(pg_created
, pg_created
, local_state
.pg_map
)
// Reservation forwarders targeting the singleton's AsyncReservers.
500 FORWARD_TO_OSD_SINGLETON_TARGET(
501 local_update_priority
,
502 local_reserver
.update_priority
)
503 FORWARD_TO_OSD_SINGLETON_TARGET(
504 local_cancel_reservation
,
505 local_reserver
.cancel_reservation
)
506 FORWARD_TO_OSD_SINGLETON_TARGET(
507 local_dump_reservations
,
509 FORWARD_TO_OSD_SINGLETON_TARGET(
510 remote_cancel_reservation
,
511 remote_reserver
.cancel_reservation
)
512 FORWARD_TO_OSD_SINGLETON_TARGET(
513 remote_dump_reservations
,
514 remote_reserver
.dump
)
515 FORWARD_TO_OSD_SINGLETON_TARGET(
516 snap_cancel_reservation
,
517 snap_reserver
.cancel_reservation
)
518 FORWARD_TO_OSD_SINGLETON_TARGET(
519 snap_dump_reservations
,
// Wraps a Context so that, when completed on the singleton's core, its
// finish runs back on `core` via smp::submit_to. Null contexts pass through.
// The raw `new LambdaContext` ownership presumably transfers to the
// reserver, which deletes it on completion — confirm AsyncReserver contract.
522 Context
*invoke_context_on_core(core_id_t core
, Context
*c
) {
523 if (!c
) return nullptr;
524 return new LambdaContext([core
, c
](int code
) {
// Fire-and-forget: the cross-core submit future is intentionally dropped.
525 std::ignore
= seastar::smp::submit_to(
// Requests a local recovery reservation; callbacks are rebound to the
// calling shard before being handed to the singleton's local_reserver.
532 seastar::future
<> local_request_reservation(
534 Context
*on_reserved
,
536 Context
*on_preempt
) {
537 return with_singleton(
538 [item
, prio
](OSDSingletonState
&singleton
,
539 Context
*wrapped_on_reserved
, Context
*wrapped_on_preempt
) {
540 return singleton
.local_reserver
.request_reservation(
546 invoke_context_on_core(seastar::this_shard_id(), on_reserved
),
547 invoke_context_on_core(seastar::this_shard_id(), on_preempt
));
// Same flow against the remote_reserver.
549 seastar::future
<> remote_request_reservation(
551 Context
*on_reserved
,
553 Context
*on_preempt
) {
554 return with_singleton(
555 [item
, prio
](OSDSingletonState
&singleton
,
556 Context
*wrapped_on_reserved
, Context
*wrapped_on_preempt
) {
557 return singleton
.remote_reserver
.request_reservation(
563 invoke_context_on_core(seastar::this_shard_id(), on_reserved
),
564 invoke_context_on_core(seastar::this_shard_id(), on_preempt
));
// Snap-trim variant: only a reserved callback, no preemption context.
566 seastar::future
<> snap_request_reservation(
568 Context
*on_reserved
,
570 return with_singleton(
571 [item
, prio
](OSDSingletonState
&singleton
,
572 Context
*wrapped_on_reserved
) {
573 return singleton
.snap_reserver
.request_reservation(
578 invoke_context_on_core(seastar::this_shard_id(), on_reserved
));
// Scrub the generator macros so they don't leak to includers of this header.
583 #undef FORWARD_TO_OSD_SINGLETON
584 #undef FORWARD_TO_LOCAL
585 #undef FORWARD_TO_LOCAL_CONST
// fmt >= 9 no longer falls back to operator<< automatically; opt pg_temp_t
// into ostream-based formatting explicitly (closing #endif elided here).
590 #if FMT_VERSION >= 90000
591 template <> struct fmt::formatter
<crimson::osd::OSDSingletonState::pg_temp_t
> : fmt::ostream_formatter
{};