// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>

#include "common/dout.h"
#include "crimson/net/Fwd.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"

#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
namespace crimson::net {
}

namespace crimson::os {
}

namespace crimson::osd {
class PG : public boost::intrusive_ref_counter<
    PG,
    boost::thread_unsafe_counter>,
  PeeringState::PeeringListener,
  DoutPrefixProvider
{
  using ec_profile_t = std::map<std::string, std::string>;
  using cached_map_t = boost::local_shared_ptr<const OSDMap>;

  ClientRequest::PGPipeline client_request_pg_pipeline;
  PeeringEvent::PGPipeline peering_request_pg_pipeline;
  RepRequest::PGPipeline replicated_request_pg_pipeline;

  spg_t pgid;
  pg_shard_t pg_whoami;
  crimson::os::CollectionRef coll_ref;
  ghobject_t pgmeta_oid;

  seastar::timer<seastar::lowres_clock> check_readable_timer;
  seastar::timer<seastar::lowres_clock> renew_lease_timer;
public:
  PG(spg_t pgid,
     pg_shard_t pg_shard,
     crimson::os::CollectionRef coll_ref,
     pg_pool_t&& pool,
     std::string&& name,
     cached_map_t osdmap,
     ShardServices &shard_services,
     ec_profile_t profile);
  const pg_shard_t& get_pg_whoami() const {
    return pg_whoami;
  }

  const spg_t& get_pgid() const {
    return pgid;
  }

  PGBackend& get_backend() {
    return *backend;
  }
  const PGBackend& get_backend() const {
    return *backend;
  }

  epoch_t get_osdmap_epoch() const final {
    return peering_state.get_osdmap_epoch();
  }
  // DoutPrefixProvider
  std::ostream& gen_prefix(std::ostream& out) const final {
    return out << *this;
  }
  crimson::common::CephContext *get_cct() const final {
    return shard_services.get_cct();
  }
  unsigned get_subsys() const final {
    return ceph_subsys_osd;
  }

  crimson::os::CollectionRef get_collection_ref() {
    return coll_ref;
  }
  void prepare_write(
    pg_info_t &info,
    pg_info_t &last_written_info,
    PastIntervals &past_intervals,
    PGLog &pglog,
    bool dirty_info,
    bool dirty_big_info,
    bool need_write_epoch,
    ceph::os::Transaction &t) final;
  void on_info_history_change() final {
    // Not needed yet -- mainly for scrub scheduling
  }

  void scrub_requested(bool deep, bool repair, bool need_auto = false) final {
    ceph_assert(0 == "Not implemented");
  }

  uint64_t get_snap_trimq_size() const final {
    return 0;
  }
  void send_cluster_message(
    int osd, Message *m,
    epoch_t epoch, bool share_map_update=false) final {
    (void)shard_services.send_to_osd(osd, m, epoch);
  }
  void send_pg_created(pg_t pgid) final {
    (void)shard_services.send_pg_created(pgid);
  }
  bool try_flush_or_schedule_async() final;

  void start_flush_on_transaction(
    ceph::os::Transaction &t) final {
    t.register_on_commit(
      new LambdaContext([this](int r) {
        peering_state.complete_flush();
      }));
  }

  void on_flushed() final {
    // will be needed for unblocking IO operations/peering
  }
  void schedule_event_after(
    PGPeeringEventRef event,
    float delay) final {
    ceph_assert(0 == "Not implemented yet");
  }
  void request_local_background_io_reservation(
    unsigned priority,
    PGPeeringEventRef on_grant,
    PGPeeringEventRef on_preempt) final {
    ceph_assert(0 == "Not implemented yet");
  }
  void update_local_background_io_priority(
    unsigned priority) final {
    ceph_assert(0 == "Not implemented yet");
  }
  void cancel_local_background_io_reservation() final {
    // Not implemented yet, but gets called on exit() from some states
  }
  void request_remote_recovery_reservation(
    unsigned priority,
    PGPeeringEventRef on_grant,
    PGPeeringEventRef on_preempt) final {
    ceph_assert(0 == "Not implemented yet");
  }
  void cancel_remote_recovery_reservation() final {
    // Not implemented yet, but gets called on exit() from some states
  }
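
  // The override below defers a peering event until the given transaction
  // commits: the event is captured in an on-commit callback and, once the
  // store confirms the write, requeued through shard_services as a
  // LocalPeeringEvent.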
  void schedule_event_on_commit(
    ceph::os::Transaction &t,
    PGPeeringEventRef on_commit) final {
    t.register_on_commit(
      new LambdaContext(
        [this, on_commit=std::move(on_commit)](int r) {
          shard_services.start_operation<LocalPeeringEvent>(
            std::move(*on_commit));
        }));
  }

  void update_heartbeat_peers(set<int> peers) final {}
  void set_probe_targets(const set<pg_shard_t> &probe_set) final {}
  void clear_probe_targets() final {}
  void queue_want_pg_temp(const std::vector<int> &wanted) final {
    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
  }
  void clear_want_pg_temp() final {
    shard_services.remove_want_pg_temp(pgid.pgid);
  }
  void publish_stats_to_osd() final {}
  void clear_publish_stats() final {}
  void check_recovery_sources(const OSDMapRef& newmap) final {}
  void check_blacklisted_watchers() final {}
  void clear_primary_state() final {}

  void queue_check_readable(epoch_t last_peering_reset,
                            ceph::timespan delay) final;
  void recheck_readable() final;

  unsigned get_target_pg_log_entries() const final;

  void on_pool_change() final {}
  void on_role_change() final {}
  void on_change(ceph::os::Transaction &t) final {}
  void on_activate(interval_set<snapid_t> to_trim) final;
  void on_activate_complete() final;
  void on_new_interval() final {}
  Context *on_clean() final {
    // Not needed yet (will be needed for IO unblocking)
    return nullptr;
  }
  void on_activate_committed() final {
    // Not needed yet (will be needed for IO unblocking)
  }
  void on_active_exit() final {}

  void on_removal(ceph::os::Transaction &t) final {}
  void do_delete_work(ceph::os::Transaction &t) final;

  // merge/split not ready
  void clear_ready_to_merge() final {}
  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
  void set_not_ready_to_merge_source(pg_t pgid) final {}
  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
  void set_ready_to_merge_source(eversion_t lu) final {}

  void on_active_actmap() final {}
  void on_active_advmap(const OSDMapRef &osdmap) final {}
  epoch_t oldest_stored_osdmap() final {
    return 0;
  }

  void on_backfill_reserved() final {
    ceph_assert(0 == "Not implemented");
  }
  void on_backfill_canceled() final {
    ceph_assert(0 == "Not implemented");
  }
  void on_recovery_reserved() final {
    ceph_assert(0 == "Not implemented");
  }

  bool try_reserve_recovery_space(
    int64_t primary_num_bytes, int64_t local_num_bytes) final {
    return true;
  }
  void unreserve_recovery_space() final {}
  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
    PG *pg;
    ceph::os::Transaction *t;
    PGLogEntryHandler(PG *pg, ceph::os::Transaction *t) : pg(pg), t(t) {}

    void remove(const hobject_t &hoid) override {}
    void try_stash(const hobject_t &hoid, version_t v) override {}
    void rollback(const pg_log_entry_t &entry) override {}
    void rollforward(const pg_log_entry_t &entry) override {}
    void trim(const pg_log_entry_t &entry) override {}
  };
  PGLog::LogEntryHandlerRef get_log_handler(
    ceph::os::Transaction &t) final {
    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
  }

  void rebuild_missing_set_with_deletes(PGLog &pglog) final {
    ceph_assert(0 == "Impossible for crimson");
  }

  PerfCounters &get_peering_perf() final {
    return shard_services.get_recoverystate_perf_logger();
  }
  PerfCounters &get_perf_logger() final {
    return shard_services.get_perf_logger();
  }

  void log_state_enter(const char *state) final;
  void log_state_exit(
    const char *state_name, utime_t enter_time,
    uint64_t events, utime_t event_dur) final;

  void dump_recovery_info(Formatter *f) const final {}

  OstreamTemp get_clog_info() final {
    // not needed yet: stub until this is wired up to monc
    return OstreamTemp(CLOG_INFO, nullptr);
  }
  OstreamTemp get_clog_debug() final {
    // not needed yet: stub until this is wired up to monc
    return OstreamTemp(CLOG_DEBUG, nullptr);
  }
  OstreamTemp get_clog_error() final {
    // not needed yet: stub until this is wired up to monc
    return OstreamTemp(CLOG_ERROR, nullptr);
  }

  ceph::signedspan get_mnow() final;
  HeartbeatStampsRef get_hb_stamps(int peer) final;
  void schedule_renew_lease(epoch_t plr, ceph::timespan delay) final;

  bool is_primary() const {
    return peering_state.is_primary();
  }
  pg_stat_t get_stats() {
    auto stats = peering_state.prepare_stats_for_publish(
      false,
      pg_stat_t(),
      object_stat_collection_t());
    return *stats;
  }
  bool get_need_up_thru() const {
    return peering_state.get_need_up_thru();
  }

  const auto& get_pool() const {
    return peering_state.get_pool();
  }

  /// initialize created PG
  void init(
    int role,
    const std::vector<int>& up,
    int up_primary,
    const std::vector<int>& acting,
    int acting_primary,
    const pg_history_t& history,
    const PastIntervals& pim,
    bool backfill,
    ceph::os::Transaction &t);

  seastar::future<> read_state(crimson::os::FuturizedStore* store);

  void do_peering_event(
    PGPeeringEvent& evt, PeeringCtx &rctx);
  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
  void handle_activate_map(PeeringCtx &rctx);
  void handle_initialize(PeeringCtx &rctx);

  static std::pair<hobject_t, RWState::State> get_oid_and_lock(
    const MOSDOp &m,
    const OpInfo &op_info);
  static std::optional<hobject_t> resolve_oid(
    const SnapSet &snapset,
    const hobject_t &oid);
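
  // Object-context loading reports recoverable failures through an
  // errorator rather than an exceptional future: a load_obc_ertr future
  // either resolves with a value or carries
  // crimson::ct_error::object_corrupted, which callers consume with
  // safe_then() as with_locked_obc() does below.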
  using load_obc_ertr = crimson::errorator<
    crimson::ct_error::object_corrupted>;

  load_obc_ertr::future<
    std::pair<crimson::osd::ObjectContextRef, bool>>
  get_or_load_clone_obc(
    hobject_t oid, crimson::osd::ObjectContextRef head_obc);

  load_obc_ertr::future<
    std::pair<crimson::osd::ObjectContextRef, bool>>
  get_or_load_head_obc(hobject_t oid);

  load_obc_ertr::future<ObjectContextRef> get_locked_obc(
    Operation *op,
    const hobject_t &oid,
    RWState::State type);
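
  // A minimal sketch of how a client-request handler might use
  // with_locked_obc(); the message, op_info, op and lambda are hypothetical
  // call-site values, not declarations from this header:
  //
  //   return pg->with_locked_obc(m, op_info, op,
  //     [pg, m](auto obc) {
  //       // obc is locked according to op_info and released automatically
  //       // once the future returned by the lambda resolves.
  //       return pg->do_osd_ops(m, obc);
  //     });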
  template <typename F>
  auto with_locked_obc(
    Ref<MOSDOp> &m,
    const OpInfo &op_info,
    Operation *op,
    F &&f) {
    auto [oid, type] = get_oid_and_lock(*m, op_info);
    return get_locked_obc(op, oid, type)
      .safe_then([this, f=std::forward<F>(f), type=type](auto obc) {
        return f(obc).finally([this, obc, type=type] {
          obc->put_lock_type(type);
          return load_obc_ertr::now();
        });
      });
  }

  seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
  void handle_rep_op_reply(crimson::net::Connection* conn,
                           const MOSDRepOpReply& m);

  void print(std::ostream& os) const;

  void do_peering_event(
    const boost::statechart::event_base &evt,
    PeeringCtx &rctx);
  seastar::future<Ref<MOSDOpReply>> do_osd_ops(
    Ref<MOSDOp> m,
    ObjectContextRef obc);
  seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
  seastar::future<> do_osd_op(
    ObjectState& os,
    OSDOp& osd_op,
    ceph::os::Transaction& txn);
  seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
                                             const std::string& nspace,
                                             uint64_t limit);
  seastar::future<> submit_transaction(ObjectContextRef&& obc,
                                       ceph::os::Transaction&& txn);

  OSDMapGate osdmap_gate;
  ShardServices &shard_services;

  cached_map_t osdmap;
  cached_map_t get_osdmap() { return osdmap; }

  std::unique_ptr<PGBackend> backend;

  PeeringState peering_state;
  eversion_t projected_last_update;

  class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
    PG *pg;
    seastar::shared_promise<> p;

    void dump_detail(Formatter *f) const;

  public:
    static constexpr const char *type_name = "WaitForActiveBlocker";

    WaitForActiveBlocker(PG *pg) : pg(pg) {}
    blocking_future<> wait();
  } wait_for_active_blocker;

  friend std::ostream& operator<<(std::ostream&, const PG& pg);
  friend class ClientRequest;
  friend class PGAdvanceMap;
  friend class PeeringEvent;
  friend class RepRequest;
};

std::ostream& operator<<(std::ostream&, const PG& pg);

}