1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
8 #include <boost/intrusive_ptr.hpp>
9 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
10 #include <boost/smart_ptr/local_shared_ptr.hpp>
11 #include <seastar/core/future.hh>
12 #include <seastar/core/shared_future.hh>
13 #include <seastar/core/sleep.hh>
15 #include "common/dout.h"
16 #include "crimson/net/Fwd.h"
17 #include "messages/MOSDRepOpReply.h"
18 #include "messages/MOSDOpReply.h"
19 #include "os/Transaction.h"
20 #include "osd/osd_types.h"
21 #include "crimson/osd/object_context.h"
22 #include "osd/PeeringState.h"
24 #include "crimson/common/type_helpers.h"
25 #include "crimson/os/futurized_collection.h"
26 #include "crimson/osd/backfill_state.h"
27 #include "crimson/osd/osd_operations/client_request.h"
28 #include "crimson/osd/osd_operations/peering_event.h"
29 #include "crimson/osd/osd_operations/replicated_request.h"
30 #include "crimson/osd/osd_operations/background_recovery.h"
31 #include "crimson/osd/shard_services.h"
32 #include "crimson/osd/osdmap_gate.h"
33 #include "crimson/osd/pg_recovery.h"
34 #include "crimson/osd/pg_recovery_listener.h"
35 #include "crimson/osd/recovery_backend.h"
41 class osd_op_params_t
;
47 namespace crimson::net
{
51 namespace crimson::os
{
55 namespace crimson::osd
{
59 class PG
: public boost::intrusive_ref_counter
<
61 boost::thread_unsafe_counter
>,
62 public PGRecoveryListener
,
63 PeeringState::PeeringListener
,
66 using ec_profile_t
= std::map
<std::string
,std::string
>;
67 using cached_map_t
= boost::local_shared_ptr
<const OSDMap
>;
69 ClientRequest::PGPipeline client_request_pg_pipeline
;
70 PeeringEvent::PGPipeline peering_request_pg_pipeline
;
71 RepRequest::PGPipeline replicated_request_pg_pipeline
;
75 crimson::os::CollectionRef coll_ref
;
76 ghobject_t pgmeta_oid
;
78 seastar::timer
<seastar::lowres_clock
> check_readable_timer
;
79 seastar::timer
<seastar::lowres_clock
> renew_lease_timer
;
84 crimson::os::CollectionRef coll_ref
,
88 ShardServices
&shard_services
,
89 ec_profile_t profile
);
93 const pg_shard_t
& get_pg_whoami() const final
{
97 const spg_t
& get_pgid() const final
{
101 PGBackend
& get_backend() {
104 const PGBackend
& get_backend() const {
108 epoch_t
get_osdmap_epoch() const final
{
109 return peering_state
.get_osdmap_epoch();
112 eversion_t
get_pg_trim_to() const {
113 return peering_state
.get_pg_trim_to();
116 eversion_t
get_min_last_complete_ondisk() const {
117 return peering_state
.get_min_last_complete_ondisk();
120 const pg_info_t
& get_info() const final
{
121 return peering_state
.get_info();
124 // DoutPrefixProvider
125 std::ostream
& gen_prefix(std::ostream
& out
) const final
{
128 crimson::common::CephContext
*get_cct() const final
{
129 return shard_services
.get_cct();
131 unsigned get_subsys() const final
{
132 return ceph_subsys_osd
;
135 crimson::os::CollectionRef
get_collection_ref() {
142 pg_info_t
&last_written_info
,
143 PastIntervals
&past_intervals
,
147 bool need_write_epoch
,
148 ceph::os::Transaction
&t
) final
;
150 void on_info_history_change() final
{
151 // Not needed yet -- mainly for scrub scheduling
154 void scrub_requested(scrub_level_t scrub_level
, scrub_type_t scrub_type
) final
;
156 uint64_t get_snap_trimq_size() const final
{
160 void send_cluster_message(
161 int osd
, MessageRef m
,
162 epoch_t epoch
, bool share_map_update
=false) final
{
163 (void)shard_services
.send_to_osd(osd
, m
, epoch
);
166 void send_pg_created(pg_t pgid
) final
{
167 (void)shard_services
.send_pg_created(pgid
);
170 bool try_flush_or_schedule_async() final
;
172 void start_flush_on_transaction(
173 ceph::os::Transaction
&t
) final
{
174 t
.register_on_commit(
175 new LambdaContext([this](int r
){
176 peering_state
.complete_flush();
180 void on_flushed() final
{
181 // will be needed for unblocking IO operations/peering
184 template <typename T
>
185 void start_peering_event_operation(T
&&evt
, float delay
= 0) {
186 (void) shard_services
.start_operation
<LocalPeeringEvent
>(
192 std::forward
<T
>(evt
));
195 void schedule_event_after(
196 PGPeeringEventRef event
,
198 start_peering_event_operation(std::move(*event
), delay
);
200 std::vector
<pg_shard_t
> get_replica_recovery_order() const final
{
201 return peering_state
.get_replica_recovery_order();
203 void request_local_background_io_reservation(
205 PGPeeringEventURef on_grant
,
206 PGPeeringEventURef on_preempt
) final
{
207 shard_services
.local_reserver
.request_reservation(
209 on_grant
? make_lambda_context([this, on_grant
=std::move(on_grant
)] (int) {
210 start_peering_event_operation(std::move(*on_grant
));
213 on_preempt
? make_lambda_context(
214 [this, on_preempt
=std::move(on_preempt
)] (int) {
215 start_peering_event_operation(std::move(*on_preempt
));
219 void update_local_background_io_priority(
220 unsigned priority
) final
{
221 shard_services
.local_reserver
.update_priority(
226 void cancel_local_background_io_reservation() final
{
227 shard_services
.local_reserver
.cancel_reservation(
231 void request_remote_recovery_reservation(
233 PGPeeringEventURef on_grant
,
234 PGPeeringEventURef on_preempt
) final
{
235 shard_services
.remote_reserver
.request_reservation(
237 on_grant
? make_lambda_context([this, on_grant
=std::move(on_grant
)] (int) {
238 start_peering_event_operation(std::move(*on_grant
));
241 on_preempt
? make_lambda_context(
242 [this, on_preempt
=std::move(on_preempt
)] (int) {
243 start_peering_event_operation(std::move(*on_preempt
));
247 void cancel_remote_recovery_reservation() final
{
248 shard_services
.remote_reserver
.cancel_reservation(
252 void schedule_event_on_commit(
253 ceph::os::Transaction
&t
,
254 PGPeeringEventRef on_commit
) final
{
255 t
.register_on_commit(
257 [this, on_commit
=std::move(on_commit
)](int) {
258 start_peering_event_operation(std::move(*on_commit
));
262 void update_heartbeat_peers(set
<int> peers
) final
{
265 void set_probe_targets(const set
<pg_shard_t
> &probe_set
) final
{
268 void clear_probe_targets() final
{
271 void queue_want_pg_temp(const std::vector
<int> &wanted
) final
{
272 shard_services
.queue_want_pg_temp(pgid
.pgid
, wanted
);
274 void clear_want_pg_temp() final
{
275 shard_services
.remove_want_pg_temp(pgid
.pgid
);
277 void publish_stats_to_osd() final
{
281 (void) peering_state
.prepare_stats_for_publish(
284 object_stat_collection_t());
286 void clear_publish_stats() final
{
289 void check_recovery_sources(const OSDMapRef
& newmap
) final
{
292 void check_blocklisted_watchers() final
{
295 void clear_primary_state() final
{
299 void queue_check_readable(epoch_t last_peering_reset
,
300 ceph::timespan delay
) final
;
301 void recheck_readable() final
;
303 unsigned get_target_pg_log_entries() const final
;
305 void on_pool_change() final
{
308 void on_role_change() final
{
311 void on_change(ceph::os::Transaction
&t
) final
;
312 void on_activate(interval_set
<snapid_t
> to_trim
) final
;
313 void on_activate_complete() final
;
314 void on_new_interval() final
{
317 Context
*on_clean() final
{
318 // Not needed yet (will be needed for IO unblocking)
321 void on_activate_committed() final
{
322 // Not needed yet (will be needed for IO unblocking)
324 void on_active_exit() final
{
328 void on_removal(ceph::os::Transaction
&t
) final
{
331 std::pair
<ghobject_t
, bool>
332 do_delete_work(ceph::os::Transaction
&t
, ghobject_t _next
) final
;
334 // merge/split not ready
// Stub: PG merge is not supported by crimson yet (see the
// "merge/split not ready" note above), so there is no ready-to-merge
// state to clear.
335 void clear_ready_to_merge() final
{}
// Stub: merge is not supported by crimson; intentionally ignore the
// not-ready notification for a merge target PG (pgid = target, src =
// merge source).
336 void set_not_ready_to_merge_target(pg_t pgid
, pg_t src
) final
{}
// Stub: merge is not supported by crimson; intentionally ignore the
// not-ready notification for a merge source PG.
337 void set_not_ready_to_merge_source(pg_t pgid
) final
{}
// Stub: merge is not supported by crimson; intentionally ignore the
// ready notification. Parameters presumably carry the target's
// last_update (lu) and last epoch started/clean (les/lec) — TODO
// confirm against PeeringState::PeeringListener.
338 void set_ready_to_merge_target(eversion_t lu
, epoch_t les
, epoch_t lec
) final
{}
// Stub: merge is not supported by crimson; intentionally ignore the
// ready notification for a merge source PG.
339 void set_ready_to_merge_source(eversion_t lu
) final
{}
341 void on_active_actmap() final
{
344 void on_active_advmap(const OSDMapRef
&osdmap
) final
{
347 epoch_t
oldest_stored_osdmap() final
{
352 void on_backfill_reserved() final
{
353 recovery_handler
->on_backfill_reserved();
355 void on_backfill_canceled() final
{
356 ceph_assert(0 == "Not implemented");
359 void on_recovery_reserved() final
{
360 recovery_handler
->start_pglogbased_recovery();
364 bool try_reserve_recovery_space(
365 int64_t primary_num_bytes
, int64_t local_num_bytes
) final
{
// Stub: pairs with try_reserve_recovery_space() above; crimson does
// not track recovery space reservations yet, so there is nothing to
// release here.
369 void unreserve_recovery_space() final
{}
371 struct PGLogEntryHandler
: public PGLog::LogEntryHandler
{
373 ceph::os::Transaction
*t
;
// Bind the handler to its owning PG and the transaction into which
// log-entry side effects will be recorded. Both are stored as raw
// (non-owning) pointers; the caller keeps them alive for the
// handler's lifetime.
374 PGLogEntryHandler(PG
*pg
, ceph::os::Transaction
*t
) : pg(pg
), t(t
) {}
377 void remove(const hobject_t
&hoid
) override
{
380 void try_stash(const hobject_t
&hoid
, version_t v
) override
{
383 void rollback(const pg_log_entry_t
&entry
) override
{
386 void rollforward(const pg_log_entry_t
&entry
) override
{
389 void trim(const pg_log_entry_t
&entry
) override
{
393 PGLog::LogEntryHandlerRef
get_log_handler(
394 ceph::os::Transaction
&t
) final
{
395 return std::make_unique
<PG::PGLogEntryHandler
>(this, &t
);
398 void rebuild_missing_set_with_deletes(PGLog
&pglog
) final
{
399 ceph_assert(0 == "Impossible for crimson");
402 PerfCounters
&get_peering_perf() final
{
403 return shard_services
.get_recoverystate_perf_logger();
405 PerfCounters
&get_perf_logger() final
{
406 return shard_services
.get_perf_logger();
409 void log_state_enter(const char *state
) final
;
411 const char *state_name
, utime_t enter_time
,
412 uint64_t events
, utime_t event_dur
) final
;
414 void dump_recovery_info(Formatter
*f
) const final
{
417 OstreamTemp
get_clog_info() final
{
418 // not needed yet: replace with not a stub (needs to be wired up to monc)
419 return OstreamTemp(CLOG_INFO
, nullptr);
421 OstreamTemp
get_clog_debug() final
{
422 // not needed yet: replace with not a stub (needs to be wired up to monc)
423 return OstreamTemp(CLOG_DEBUG
, nullptr);
425 OstreamTemp
get_clog_error() final
{
426 // not needed yet: replace with not a stub (needs to be wired up to monc)
427 return OstreamTemp(CLOG_ERROR
, nullptr);
430 ceph::signedspan
get_mnow() final
;
431 HeartbeatStampsRef
get_hb_stamps(int peer
) final
;
432 void schedule_renew_lease(epoch_t plr
, ceph::timespan delay
) final
;
436 bool is_primary() const final
{
437 return peering_state
.is_primary();
439 bool is_nonprimary() const {
440 return peering_state
.is_nonprimary();
442 bool is_peered() const final
{
443 return peering_state
.is_peered();
445 bool is_recovering() const final
{
446 return peering_state
.is_recovering();
448 bool is_backfilling() const final
{
449 return peering_state
.is_backfilling();
451 pg_stat_t
get_stats() {
452 auto stats
= peering_state
.prepare_stats_for_publish(
455 object_stat_collection_t());
459 bool get_need_up_thru() const {
460 return peering_state
.get_need_up_thru();
462 epoch_t
get_same_interval_since() const {
463 return get_info().history
.same_interval_since
;
466 const auto& get_pool() const {
467 return peering_state
.get_pool();
469 pg_shard_t
get_primary() const {
470 return peering_state
.get_primary();
473 /// initialize created PG
476 const std::vector
<int>& up
,
478 const std::vector
<int>& acting
,
480 const pg_history_t
& history
,
481 const PastIntervals
& pim
,
483 ceph::os::Transaction
&t
);
485 seastar::future
<> read_state(crimson::os::FuturizedStore
* store
);
487 void do_peering_event(
488 PGPeeringEvent
& evt
, PeeringCtx
&rctx
);
490 void handle_advance_map(cached_map_t next_map
, PeeringCtx
&rctx
);
491 void handle_activate_map(PeeringCtx
&rctx
);
492 void handle_initialize(PeeringCtx
&rctx
);
494 static hobject_t
get_oid(const MOSDOp
&m
);
495 static RWState::State
get_lock_type(const OpInfo
&op_info
);
496 static std::optional
<hobject_t
> resolve_oid(
497 const SnapSet
&snapset
,
498 const hobject_t
&oid
);
500 using load_obc_ertr
= crimson::errorator
<
501 crimson::ct_error::object_corrupted
>;
503 load_obc_ertr::future
<crimson::osd::ObjectContextRef
>
504 load_head_obc(ObjectContextRef obc
);
506 load_obc_ertr::future
<>
507 reload_obc(crimson::osd::ObjectContext
& obc
) const;
510 using with_obc_func_t
=
511 std::function
<load_obc_ertr::future
<> (ObjectContextRef
)>;
513 template<RWState::State State
>
514 load_obc_ertr::future
<> with_head_obc(hobject_t oid
, with_obc_func_t
&& func
);
516 load_obc_ertr::future
<> with_locked_obc(
518 const OpInfo
&op_info
,
520 with_obc_func_t
&& f
);
522 seastar::future
<> handle_rep_op(Ref
<MOSDRepOp
> m
);
523 void handle_rep_op_reply(crimson::net::ConnectionRef conn
,
524 const MOSDRepOpReply
& m
);
526 void print(std::ostream
& os
) const;
527 void dump_primary(Formatter
*);
530 template<RWState::State State
>
531 load_obc_ertr::future
<> with_clone_obc(hobject_t oid
, with_obc_func_t
&& func
);
533 load_obc_ertr::future
<ObjectContextRef
> get_locked_obc(
535 const hobject_t
&oid
,
536 RWState::State type
);
538 void do_peering_event(
539 const boost::statechart::event_base
&evt
,
541 osd_op_params_t
&& fill_op_params_bump_pg_version(
542 osd_op_params_t
&& osd_op_p
,
544 const bool user_modify
);
545 seastar::future
<Ref
<MOSDOpReply
>> handle_failed_op(
546 const std::error_code
& e
,
547 ObjectContextRef obc
,
548 const OpsExecuter
& ox
,
549 const MOSDOp
& m
) const;
550 seastar::future
<Ref
<MOSDOpReply
>> do_osd_ops(
552 ObjectContextRef obc
,
553 const OpInfo
&op_info
);
554 seastar::future
<Ref
<MOSDOpReply
>> do_pg_ops(Ref
<MOSDOp
> m
);
555 seastar::future
<> submit_transaction(const OpInfo
& op_info
,
556 const std::vector
<OSDOp
>& ops
,
557 ObjectContextRef
&& obc
,
558 ceph::os::Transaction
&& txn
,
559 const osd_op_params_t
& oop
);
562 OSDMapGate osdmap_gate
;
563 ShardServices
&shard_services
;
568 cached_map_t
get_osdmap() { return osdmap
; }
569 eversion_t
next_version() {
570 return eversion_t(get_osdmap_epoch(),
571 ++projected_last_update
.version
);
573 ShardServices
& get_shard_services() final
{
574 return shard_services
;
576 seastar::future
<> stop();
579 std::unique_ptr
<PGBackend
> backend
;
580 std::unique_ptr
<RecoveryBackend
> recovery_backend
;
581 std::unique_ptr
<PGRecovery
> recovery_handler
;
583 PeeringState peering_state
;
584 eversion_t projected_last_update
;
586 RecoveryBackend
* get_recovery_backend() final
{
587 return recovery_backend
.get();
589 PGRecovery
* get_recovery_handler() final
{
590 return recovery_handler
.get();
592 PeeringState
& get_peering_state() final
{
593 return peering_state
;
595 bool has_reset_since(epoch_t epoch
) const final
{
596 return peering_state
.pg_has_reset_since(epoch
);
599 const pg_missing_tracker_t
& get_local_missing() const {
600 return peering_state
.get_pg_log().get_missing();
602 epoch_t
get_last_peering_reset() const final
{
603 return peering_state
.get_last_peering_reset();
605 const set
<pg_shard_t
> &get_acting_recovery_backfill() const {
606 return peering_state
.get_acting_recovery_backfill();
608 bool is_backfill_target(pg_shard_t osd
) const {
609 return peering_state
.is_backfill_target(osd
);
611 void begin_peer_recover(pg_shard_t peer
, const hobject_t oid
) {
612 peering_state
.begin_peer_recover(peer
, oid
);
614 uint64_t min_peer_features() const {
615 return peering_state
.get_min_peer_features();
617 const map
<hobject_t
, set
<pg_shard_t
>>&
618 get_missing_loc_shards() const {
619 return peering_state
.get_missing_loc().get_missing_locs();
621 const map
<pg_shard_t
, pg_missing_t
> &get_shard_missing() const {
622 return peering_state
.get_peer_missing();
624 const pg_missing_const_i
* get_shard_missing(pg_shard_t shard
) const {
625 if (shard
== pg_whoami
)
626 return &get_local_missing();
628 auto it
= peering_state
.get_peer_missing().find(shard
);
629 if (it
== peering_state
.get_peer_missing().end())
635 int get_recovery_op_priority() const {
637 get_pool().info
.opts
.get(pool_opts_t::RECOVERY_OP_PRIORITY
, &pri
);
638 return pri
> 0 ? pri
: crimson::common::local_conf()->osd_recovery_op_priority
;
640 seastar::future
<> mark_unfound_lost(int) {
641 // TODO: see PrimaryLogPG::mark_all_unfound_lost()
642 return seastar::now();
646 // instead of seastar::gate, we use a boolean flag to indicate
647 // whether the system is shutting down, as we don't need to track
648 // continuations here.
649 bool stopping
= false;
651 class WaitForActiveBlocker
: public BlockerT
<WaitForActiveBlocker
> {
655 seastar::shared_promise
<> p
;
658 void dump_detail(Formatter
*f
) const;
661 static constexpr const char *type_name
= "WaitForActiveBlocker";
// Remember the PG this blocker belongs to as a raw (non-owning)
// back-pointer; the blocker itself is a member of that PG (see the
// wait_for_active_blocker member below), so the pointer cannot
// outlive it.
663 WaitForActiveBlocker(PG
*pg
) : pg(pg
) {}
665 blocking_future
<> wait();
666 seastar::future
<> stop();
667 } wait_for_active_blocker
;
669 friend std::ostream
& operator<<(std::ostream
&, const PG
& pg
);
670 friend class ClientRequest
;
671 friend class PGAdvanceMap
;
672 friend class PeeringEvent
;
673 friend class RepRequest
;
674 friend class BackfillRecovery
;
675 friend struct PGFacade
;
677 seastar::future
<bool> find_unfound() {
678 return seastar::make_ready_future
<bool>(true);
681 template <typename MsgType
>
682 bool can_discard_replica_op(const MsgType
& m
) const;
683 bool can_discard_op(const MOSDOp
& m
) const;
684 bool is_missing_object(const hobject_t
& soid
) const {
685 return peering_state
.get_pg_log().get_missing().get_items().count(soid
);
687 bool is_unreadable_object(const hobject_t
&oid
,
688 eversion_t
* v
= 0) const final
{
689 return is_missing_object(oid
) ||
690 !peering_state
.get_missing_loc().readable_with_acting(
691 oid
, get_actingset(), v
);
693 bool is_degraded_or_backfilling_object(const hobject_t
& soid
) const;
694 const set
<pg_shard_t
> &get_actingset() const {
695 return peering_state
.get_actingset();
699 BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline
;
702 std::ostream
& operator<<(std::ostream
&, const PG
& pg
);