// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab

#pragma once

#include <memory>
#include <optional>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>

#include "common/dout.h"
#include "crimson/net/Fwd.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDOpReply.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"

#include "crimson/common/interruptible_future.h"
#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/backfill_state.h"
#include "crimson/osd/pg_interval_interrupt_condition.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"

class MQuery;
class OSDMap;
class PGBackend;
class PGPeeringEvent;
class osd_op_params_t;

namespace recovery {
  class Context;
}

namespace crimson::net {
  class Messenger;
}

namespace crimson::os {
  class FuturizedStore;
}

namespace crimson::osd {
class ClientRequest;
class OpsExecuter;

class PG : public boost::intrusive_ref_counter<
  PG,
  boost::thread_unsafe_counter>,
  public PGRecoveryListener,
  PeeringState::PeeringListener,
  DoutPrefixProvider
{
  using ec_profile_t = std::map<std::string,std::string>;
  using cached_map_t = boost::local_shared_ptr<const OSDMap>;

  ClientRequest::PGPipeline client_request_pg_pipeline;
  PeeringEvent::PGPipeline peering_request_pg_pipeline;
  RepRequest::PGPipeline replicated_request_pg_pipeline;

  spg_t pgid;
  pg_shard_t pg_whoami;
  crimson::os::CollectionRef coll_ref;
  ghobject_t pgmeta_oid;

  seastar::timer<seastar::lowres_clock> check_readable_timer;
  seastar::timer<seastar::lowres_clock> renew_lease_timer;

public:
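  // interruptible_future is a seastar future whose continuation chain is
  // cut short with an interruption error once this PG's
  // IOInterruptCondition trips (e.g. on interval change or shutdown).
  // A hedged usage sketch -- the body and names below are illustrative,
  // not part of this header:
  //
  //   interruptor::with_interruption(
  //     [pg] { return pg->do_io_work(); },  // interruptible chain
  //     [](std::exception_ptr) {            // called when interrupted
  //       return seastar::now();
  //     }, pg);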
  template <typename T = void>
  using interruptible_future =
    ::crimson::interruptible::interruptible_future<
      ::crimson::osd::IOInterruptCondition, T>;

  PG(spg_t pgid,
     pg_shard_t pg_shard,
     crimson::os::CollectionRef coll_ref,
     pg_pool_t&& pool,
     std::string&& name,
     cached_map_t osdmap,
     ShardServices &shard_services,
     ec_profile_t profile);

  ~PG();

  const pg_shard_t& get_pg_whoami() const final {
    return pg_whoami;
  }

  const spg_t& get_pgid() const final {
    return pgid;
  }

  PGBackend& get_backend() {
    return *backend;
  }
  const PGBackend& get_backend() const {
    return *backend;
  }
  // EpochSource
  epoch_t get_osdmap_epoch() const final {
    return peering_state.get_osdmap_epoch();
  }

  eversion_t get_pg_trim_to() const {
    return peering_state.get_pg_trim_to();
  }

  eversion_t get_min_last_complete_ondisk() const {
    return peering_state.get_min_last_complete_ondisk();
  }

  const pg_info_t& get_info() const final {
    return peering_state.get_info();
  }

  // DoutPrefixProvider
  std::ostream& gen_prefix(std::ostream& out) const final {
    return out << *this;
  }
  crimson::common::CephContext *get_cct() const final {
    return shard_services.get_cct();
  }
  unsigned get_subsys() const final {
    return ceph_subsys_osd;
  }

  crimson::os::CollectionRef get_collection_ref() {
    return coll_ref;
  }

  // PeeringListener
  void prepare_write(
    pg_info_t &info,
    pg_info_t &last_written_info,
    PastIntervals &past_intervals,
    PGLog &pglog,
    bool dirty_info,
    bool dirty_big_info,
    bool need_write_epoch,
    ceph::os::Transaction &t) final;

  void on_info_history_change() final {
    // Not needed yet -- mainly for scrub scheduling
  }

  /// Notify PG that Primary/Replica status has changed (to update scrub registration)
  void on_primary_status_change(bool was_primary, bool now_primary) final {
  }

  /// Need to reschedule next scrub. Assuming no change in role
  void reschedule_scrub() final {
  }

  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;

  uint64_t get_snap_trimq_size() const final {
    return 0;
  }

  void send_cluster_message(
    int osd, MessageURef m,
    epoch_t epoch, bool share_map_update=false) final {
    (void)shard_services.send_to_osd(osd, std::move(m), epoch);
  }

  void send_pg_created(pg_t pgid) final {
    (void)shard_services.send_pg_created(pgid);
  }

  bool try_flush_or_schedule_async() final;

  void start_flush_on_transaction(
    ceph::os::Transaction &t) final {
    t.register_on_commit(
      new LambdaContext([this](int r){
        peering_state.complete_flush();
      }));
  }

  void on_flushed() final {
    // will be needed for unblocking IO operations/peering
  }

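  // Re-submits a peering state machine event as a LocalPeeringEvent
  // operation via the shard services, optionally after a delay; the
  // returned operation future is deliberately dropped. A hedged sketch
  // (the event type is illustrative):
  //
  //   start_peering_event_operation(SomeEvent{}, 0 /* no delay */);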
  template <typename T>
  void start_peering_event_operation(T &&evt, float delay = 0) {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      delay,
      std::forward<T>(evt));
  }

  void schedule_event_after(
    PGPeeringEventRef event,
    float delay) final {
    start_peering_event_operation(std::move(*event), delay);
  }
  std::vector<pg_shard_t> get_replica_recovery_order() const final {
    return peering_state.get_replica_recovery_order();
  }
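  // The reservation calls below hand lambda contexts to the shard-wide
  // local/remote reservers; on grant or preemption those contexts feed
  // the stored PGPeeringEventURef back into the peering state machine
  // via start_peering_event_operation().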
  void request_local_background_io_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) final {
    shard_services.local_reserver.request_reservation(
      pgid,
      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
      }) : nullptr,
      priority,
      on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
          start_peering_event_operation(std::move(*on_preempt));
        }) : nullptr);
  }

  void update_local_background_io_priority(
    unsigned priority) final {
    shard_services.local_reserver.update_priority(
      pgid,
      priority);
  }

  void cancel_local_background_io_reservation() final {
    shard_services.local_reserver.cancel_reservation(
      pgid);
  }

  void request_remote_recovery_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) final {
    shard_services.remote_reserver.request_reservation(
      pgid,
      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
      }) : nullptr,
      priority,
      on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
          start_peering_event_operation(std::move(*on_preempt));
        }) : nullptr);
  }

  void cancel_remote_recovery_reservation() final {
    shard_services.remote_reserver.cancel_reservation(
      pgid);
  }

  void schedule_event_on_commit(
    ceph::os::Transaction &t,
    PGPeeringEventRef on_commit) final {
    t.register_on_commit(
      make_lambda_context(
        [this, on_commit=std::move(on_commit)](int) {
          start_peering_event_operation(std::move(*on_commit));
        }));
  }

  void update_heartbeat_peers(std::set<int> peers) final {
    // Not needed yet
  }
  void set_probe_targets(const std::set<pg_shard_t> &probe_set) final {
    // Not needed yet
  }
  void clear_probe_targets() final {
    // Not needed yet
  }
  void queue_want_pg_temp(const std::vector<int> &wanted) final {
    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
  }
  void clear_want_pg_temp() final {
    shard_services.remove_want_pg_temp(pgid.pgid);
  }
  void check_recovery_sources(const OSDMapRef& newmap) final {
    // Not needed yet
  }
  void check_blocklisted_watchers() final {
    // Not needed yet
  }
  void clear_primary_state() final {
    // Not needed yet
  }

  void queue_check_readable(epoch_t last_peering_reset,
                            ceph::timespan delay) final;
  void recheck_readable() final;

  unsigned get_target_pg_log_entries() const final;

  void on_pool_change() final {
    // Not needed yet
  }
  void on_role_change() final {
    // Not needed yet
  }
  void on_change(ceph::os::Transaction &t) final;
  void on_activate(interval_set<snapid_t> to_trim) final;
  void on_activate_complete() final;
  void on_new_interval() final {
    // Not needed yet
  }
  Context *on_clean() final {
    // Not needed yet (will be needed for IO unblocking)
    return nullptr;
  }
  void on_activate_committed() final {
    // Not needed yet (will be needed for IO unblocking)
  }
  void on_active_exit() final {
    // Not needed yet
  }

  void on_removal(ceph::os::Transaction &t) final {
    // TODO
  }
  std::pair<ghobject_t, bool>
  do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;

  // merge/split not ready
  void clear_ready_to_merge() final {}
  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
  void set_not_ready_to_merge_source(pg_t pgid) final {}
  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
  void set_ready_to_merge_source(eversion_t lu) final {}

  void on_active_actmap() final {
    // Not needed yet
  }
  void on_active_advmap(const OSDMapRef &osdmap) final {
    // Not needed yet
  }
  epoch_t oldest_stored_osdmap() final {
    // TODO
    return 0;
  }

  void on_backfill_reserved() final {
    recovery_handler->on_backfill_reserved();
  }
  void on_backfill_canceled() final {
    ceph_assert(0 == "Not implemented");
  }

  void on_recovery_reserved() final {
    recovery_handler->start_pglogbased_recovery();
  }


  bool try_reserve_recovery_space(
    int64_t primary_num_bytes, int64_t local_num_bytes) final {
    // TODO
    return true;
  }
  void unreserve_recovery_space() final {}

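  // Hooks invoked by PGLog while trimming, stashing, or rolling log
  // entries forward/back; all of them are still TODO stubs in crimson.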
  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
    PG *pg;
    ceph::os::Transaction *t;
    PGLogEntryHandler(PG *pg, ceph::os::Transaction *t) : pg(pg), t(t) {}

    // LogEntryHandler
    void remove(const hobject_t &hoid) override {
      // TODO
    }
    void try_stash(const hobject_t &hoid, version_t v) override {
      // TODO
    }
    void rollback(const pg_log_entry_t &entry) override {
      // TODO
    }
    void rollforward(const pg_log_entry_t &entry) override {
      // TODO
    }
    void trim(const pg_log_entry_t &entry) override {
      // TODO
    }
  };
  PGLog::LogEntryHandlerRef get_log_handler(
    ceph::os::Transaction &t) final {
    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
  }

  void rebuild_missing_set_with_deletes(PGLog &pglog) final {
    ceph_assert(0 == "Impossible for crimson");
  }

  PerfCounters &get_peering_perf() final {
    return shard_services.get_recoverystate_perf_logger();
  }
  PerfCounters &get_perf_logger() final {
    return shard_services.get_perf_logger();
  }

  void log_state_enter(const char *state) final;
  void log_state_exit(
    const char *state_name, utime_t enter_time,
    uint64_t events, utime_t event_dur) final;

  void dump_recovery_info(Formatter *f) const final {
  }

  OstreamTemp get_clog_info() final {
    // not needed yet: still a stub; needs to be wired up to monc
    return OstreamTemp(CLOG_INFO, nullptr);
  }
  OstreamTemp get_clog_debug() final {
    // not needed yet: still a stub; needs to be wired up to monc
    return OstreamTemp(CLOG_DEBUG, nullptr);
  }
  OstreamTemp get_clog_error() final {
    // not needed yet: still a stub; needs to be wired up to monc
    return OstreamTemp(CLOG_ERROR, nullptr);
  }

  ceph::signedspan get_mnow() final;
  HeartbeatStampsRef get_hb_stamps(int peer) final;
  void schedule_renew_lease(epoch_t plr, ceph::timespan delay) final;


  // Utility
  bool is_primary() const final {
    return peering_state.is_primary();
  }
  bool is_nonprimary() const {
    return peering_state.is_nonprimary();
  }
  bool is_peered() const final {
    return peering_state.is_peered();
  }
  bool is_recovering() const final {
    return peering_state.is_recovering();
  }
  bool is_backfilling() const final {
    return peering_state.is_backfilling();
  }
  uint64_t get_last_user_version() const {
    return get_info().last_user_version;
  }
  bool get_need_up_thru() const {
    return peering_state.get_need_up_thru();
  }
  epoch_t get_same_interval_since() const {
    return get_info().history.same_interval_since;
  }

  const auto& get_pool() const {
    return peering_state.get_pool();
  }
  pg_shard_t get_primary() const {
    return peering_state.get_primary();
  }

  /// initialize created PG
  void init(
    int role,
    const std::vector<int>& up,
    int up_primary,
    const std::vector<int>& acting,
    int acting_primary,
    const pg_history_t& history,
    const PastIntervals& pim,
    ceph::os::Transaction &t);

  seastar::future<> read_state(crimson::os::FuturizedStore* store);

  void do_peering_event(
    PGPeeringEvent& evt, PeeringCtx &rctx);

  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
  void handle_activate_map(PeeringCtx &rctx);
  void handle_initialize(PeeringCtx &rctx);

  static hobject_t get_oid(const hobject_t& hobj);
  static RWState::State get_lock_type(const OpInfo &op_info);
  static std::optional<hobject_t> resolve_oid(
    const SnapSet &snapset,
    const hobject_t &oid);

  using load_obc_ertr = crimson::errorator<
    crimson::ct_error::object_corrupted>;
  using load_obc_iertr =
    ::crimson::interruptible::interruptible_errorator<
      ::crimson::osd::IOInterruptCondition,
      load_obc_ertr>;
  using interruptor = ::crimson::interruptible::interruptor<
    ::crimson::osd::IOInterruptCondition>;
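  // load_obc_iertr layers the PG's interruption condition on top of
  // load_obc_ertr, so callers of the loaders below must handle both an
  // interruption and ct_error::object_corrupted. A hedged error-handling
  // sketch (handler body illustrative, not part of this header):
  //
  //   return load_head_obc(obc).safe_then_interruptible(
  //     [](auto obc) { /* use obc */ },
  //     load_obc_ertr::all_same_way([](const std::error_code& e) {
  //       /* e.g. queue the object for repair */
  //     }));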
  load_obc_iertr::future<
    std::pair<crimson::osd::ObjectContextRef, bool>>
  get_or_load_clone_obc(
    hobject_t oid, crimson::osd::ObjectContextRef head_obc);

  load_obc_iertr::future<
    std::pair<crimson::osd::ObjectContextRef, bool>>
  get_or_load_head_obc(hobject_t oid);

  load_obc_iertr::future<crimson::osd::ObjectContextRef>
  load_head_obc(ObjectContextRef obc);

  load_obc_iertr::future<>
  reload_obc(crimson::osd::ObjectContext& obc) const;

public:
  using with_obc_func_t =
    std::function<load_obc_iertr::future<> (ObjectContextRef)>;

  using obc_accessing_list_t = boost::intrusive::list<
    ObjectContext,
    ObjectContext::obc_accessing_option_t>;
  obc_accessing_list_t obc_set_accessing;

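  // The with_*_obc helpers load (or fetch from cache) the object
  // context, acquire the lock implied by the State template parameter,
  // invoke func, and release the lock once the returned future resolves.
  // A hedged usage sketch (lambda body illustrative):
  //
  //   pg.with_head_obc<RWState::RWREAD>(oid, [](auto obc) {
  //     // read-only access to the object's state via obc
  //     return PG::load_obc_iertr::now();
  //   });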
  template<RWState::State State>
  load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);

  template<RWState::State State>
  interruptible_future<> with_locked_obc(
    ObjectContextRef obc,
    with_obc_func_t&& f);
  load_obc_iertr::future<> with_locked_obc(
    const hobject_t &hobj,
    const OpInfo &op_info,
    with_obc_func_t&& f);

  interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
  void handle_rep_op_reply(crimson::net::ConnectionRef conn,
                           const MOSDRepOpReply& m);

  void print(std::ostream& os) const;
  void dump_primary(Formatter*);

private:
  template<RWState::State State>
  load_obc_iertr::future<> with_head_obc(
    ObjectContextRef obc,
    bool existed,
    with_obc_func_t&& func);
  template<RWState::State State>
  interruptible_future<> with_existing_head_obc(
    ObjectContextRef head,
    with_obc_func_t&& func);

  template<RWState::State State>
  load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
  template<RWState::State State>
  interruptible_future<> with_existing_clone_obc(
    ObjectContextRef clone, with_obc_func_t&& func);

  load_obc_iertr::future<ObjectContextRef> get_locked_obc(
    Operation *op,
    const hobject_t &oid,
    RWState::State type);

  void fill_op_params_bump_pg_version(
    osd_op_params_t& osd_op_p,
    const bool user_modify);
  using do_osd_ops_ertr = crimson::errorator<
    crimson::ct_error::eagain>;
  using do_osd_ops_iertr =
    ::crimson::interruptible::interruptible_errorator<
      ::crimson::osd::IOInterruptCondition,
      ::crimson::errorator<crimson::ct_error::eagain>>;
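  // pg_rep_op_fut_t bundles two futures: the first is intended to
  // resolve once the op has been submitted (e.g. handed off for
  // replication), the second once the whole operation has completed
  // and a Ret value can be produced.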
  template <typename Ret = void>
  using pg_rep_op_fut_t =
    std::tuple<interruptible_future<>,
               do_osd_ops_iertr::future<Ret>>;
  do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
    Ref<MOSDOp> m,
    ObjectContextRef obc,
    const OpInfo &op_info);
  using do_osd_ops_success_func_t =
    std::function<do_osd_ops_iertr::future<>()>;
  using do_osd_ops_failure_func_t =
    std::function<do_osd_ops_iertr::future<>(const std::error_code&)>;
  struct do_osd_ops_params_t;
  do_osd_ops_iertr::future<pg_rep_op_fut_t<>> do_osd_ops(
    ObjectContextRef obc,
    std::vector<OSDOp>& ops,
    const OpInfo &op_info,
    const do_osd_ops_params_t& params,
    do_osd_ops_success_func_t success_func,
    do_osd_ops_failure_func_t failure_func);
  template <class Ret, class SuccessFunc, class FailureFunc>
  do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
    seastar::lw_shared_ptr<OpsExecuter> ox,
    std::vector<OSDOp>& ops,
    const OpInfo &op_info,
    SuccessFunc&& success_func,
    FailureFunc&& failure_func);
  interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
  std::tuple<interruptible_future<>, interruptible_future<>>
  submit_transaction(
    const OpInfo& op_info,
    const std::vector<OSDOp>& ops,
    ObjectContextRef&& obc,
    ceph::os::Transaction&& txn,
    osd_op_params_t&& oop);
  interruptible_future<> repair_object(
    const hobject_t& oid,
    eversion_t& v);

private:
  OSDMapGate osdmap_gate;
  ShardServices &shard_services;

  cached_map_t osdmap;

public:
  cached_map_t get_osdmap() { return osdmap; }
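  // Projects the version the next update will carry: the current osdmap
  // epoch plus a per-PG version counter bumped ahead of commit via
  // projected_last_update.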
  eversion_t next_version() {
    return eversion_t(get_osdmap_epoch(),
                      ++projected_last_update.version);
  }
  ShardServices& get_shard_services() final {
    return shard_services;
  }
  seastar::future<> stop();
private:
  std::unique_ptr<PGBackend> backend;
  std::unique_ptr<RecoveryBackend> recovery_backend;
  std::unique_ptr<PGRecovery> recovery_handler;

  PeeringState peering_state;
  eversion_t projected_last_update;

public:
  // PeeringListener
  void publish_stats_to_osd() final;
  void clear_publish_stats() final;
  pg_stat_t get_stats() const;
private:
  std::optional<pg_stat_t> pg_stats;

public:
  RecoveryBackend* get_recovery_backend() final {
    return recovery_backend.get();
  }
  PGRecovery* get_recovery_handler() final {
    return recovery_handler.get();
  }
  PeeringState& get_peering_state() final {
    return peering_state;
  }
  bool has_reset_since(epoch_t epoch) const final {
    return peering_state.pg_has_reset_since(epoch);
  }

  const pg_missing_tracker_t& get_local_missing() const {
    return peering_state.get_pg_log().get_missing();
  }
  epoch_t get_last_peering_reset() const final {
    return peering_state.get_last_peering_reset();
  }
  const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
    return peering_state.get_acting_recovery_backfill();
  }
  bool is_backfill_target(pg_shard_t osd) const {
    return peering_state.is_backfill_target(osd);
  }
  void begin_peer_recover(pg_shard_t peer, const hobject_t oid) {
    peering_state.begin_peer_recover(peer, oid);
  }
  uint64_t min_peer_features() const {
    return peering_state.get_min_peer_features();
  }
  const std::map<hobject_t, std::set<pg_shard_t>>&
  get_missing_loc_shards() const {
    return peering_state.get_missing_loc().get_missing_locs();
  }
  const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
    return peering_state.get_peer_missing();
  }
  epoch_t get_interval_start_epoch() const {
    return get_info().history.same_interval_since;
  }
  const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
    if (shard == pg_whoami)
      return &get_local_missing();
    else {
      auto it = peering_state.get_peer_missing().find(shard);
      if (it == peering_state.get_peer_missing().end())
        return nullptr;
      else
        return &it->second;
    }
  }
  interruptible_future<std::tuple<bool, int>> already_complete(const osd_reqid_t& reqid);
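  // A positive per-pool RECOVERY_OP_PRIORITY option overrides the
  // osd_recovery_op_priority config default.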
  int get_recovery_op_priority() const {
    int64_t pri = 0;
    get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
    return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
  }
  seastar::future<> mark_unfound_lost(int) {
    // TODO: see PrimaryLogPG::mark_all_unfound_lost()
    return seastar::now();
  }

  bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch) const;

  template <typename MsgType>
  bool can_discard_replica_op(const MsgType& m) const {
    return can_discard_replica_op(m, m.map_epoch);
  }

private:
  // instead of seastar::gate, we use a boolean flag to indicate
  // whether the system is shutting down, as we don't need to track
  // continuations here.
  bool stopping = false;

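  // Operations that arrive before this PG reaches the active state park
  // on the blocker's shared promise; on_active() resolves it and wakes
  // all waiters.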
  class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
    PG *pg;

    const spg_t pgid;
    seastar::shared_promise<> p;

  protected:
    void dump_detail(Formatter *f) const;

  public:
    static constexpr const char *type_name = "WaitForActiveBlocker";

    WaitForActiveBlocker(PG *pg) : pg(pg) {}
    void on_active();
    blocking_future<> wait();
    seastar::future<> stop();
  } wait_for_active_blocker;

  friend std::ostream& operator<<(std::ostream&, const PG& pg);
  friend class ClientRequest;
  friend struct CommonClientRequest;
  friend class PGAdvanceMap;
  friend class PeeringEvent;
  friend class RepRequest;
  friend class BackfillRecovery;
  friend struct PGFacade;
  friend class InternalClientRequest;
  friend class WatchTimeoutRequest;
private:
  seastar::future<bool> find_unfound() {
    return seastar::make_ready_future<bool>(true);
  }

  bool can_discard_replica_op(const Message& m, epoch_t m_map_epoch) const;
  bool can_discard_op(const MOSDOp& m) const;
  bool is_missing_object(const hobject_t& soid) const {
    return peering_state.get_pg_log().get_missing().get_items().count(soid);
  }
  bool is_unreadable_object(const hobject_t &oid,
                            eversion_t* v = 0) const final {
    return is_missing_object(oid) ||
      !peering_state.get_missing_loc().readable_with_acting(
        oid, get_actingset(), v);
  }
  bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
  const std::set<pg_shard_t> &get_actingset() const {
    return peering_state.get_actingset();
  }

private:
  BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;

  friend class IOInterruptCondition;
};

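// Request metadata for the client-message-less do_osd_ops() overload;
// note that get_connection() currently returns nullptr regardless of the
// stored conn.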
struct PG::do_osd_ops_params_t {
  crimson::net::ConnectionRef get_connection() const {
    return nullptr;
  }
  osd_reqid_t get_reqid() const {
    return reqid;
  }
  utime_t get_mtime() const {
    return mtime;
  }
  epoch_t get_map_epoch() const {
    return map_epoch;
  }
  entity_inst_t get_orig_source_inst() const {
    return orig_source_inst;
  }
  uint64_t get_features() const {
    return features;
  }
  crimson::net::ConnectionRef conn;
  osd_reqid_t reqid;
  utime_t mtime;
  epoch_t map_epoch;
  entity_inst_t orig_source_inst;
  uint64_t features;
};

std::ostream& operator<<(std::ostream&, const PG& pg);

}