// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <memory>
#include <optional>
#include <boost/intrusive_ptr.hpp>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/smart_ptr/local_shared_ptr.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/sleep.hh>

#include "common/dout.h"
#include "crimson/net/Fwd.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDOpReply.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"

#include "crimson/common/type_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/backfill_state.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/osd_operations/background_recovery.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"

class MQuery;
class OSDMap;
class PGBackend;
class PGPeeringEvent;
class osd_op_params_t;

namespace recovery {
  class Context;
}

namespace crimson::net {
  class Messenger;
}

namespace crimson::os {
  class FuturizedStore;
}

namespace crimson::osd {
class ClientRequest;
class OpsExecuter;

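/**
 * PG - crimson's per-placement-group state and logic.
 *
 * Reference counted with a thread-unsafe counter, as a PG is owned by
 * a single seastar shard.  Acts as the PGRecoveryListener for the
 * recovery machinery, as the PeeringState::PeeringListener receiving
 * callbacks from the peering state machine shared with the classic OSD
 * (osd/PeeringState.h), and as a DoutPrefixProvider for logging.
 */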
class PG : public boost::intrusive_ref_counter<
  PG,
  boost::thread_unsafe_counter>,
  public PGRecoveryListener,
  PeeringState::PeeringListener,
  DoutPrefixProvider
{
  using ec_profile_t = std::map<std::string, std::string>;
  using cached_map_t = boost::local_shared_ptr<const OSDMap>;

  ClientRequest::PGPipeline client_request_pg_pipeline;
  PeeringEvent::PGPipeline peering_request_pg_pipeline;
  RepRequest::PGPipeline replicated_request_pg_pipeline;

  spg_t pgid;
  pg_shard_t pg_whoami;
  crimson::os::CollectionRef coll_ref;
  ghobject_t pgmeta_oid;

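  // timers backing the read-lease machinery; see queue_check_readable()
  // and schedule_renew_lease() below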
  seastar::timer<seastar::lowres_clock> check_readable_timer;
  seastar::timer<seastar::lowres_clock> renew_lease_timer;

public:
  PG(spg_t pgid,
     pg_shard_t pg_shard,
     crimson::os::CollectionRef coll_ref,
     pg_pool_t&& pool,
     std::string&& name,
     cached_map_t osdmap,
     ShardServices &shard_services,
     ec_profile_t profile);

  ~PG();

  const pg_shard_t& get_pg_whoami() const final {
    return pg_whoami;
  }

  const spg_t& get_pgid() const final {
    return pgid;
  }

  PGBackend& get_backend() {
    return *backend;
  }
  const PGBackend& get_backend() const {
    return *backend;
  }
  // EpochSource
  epoch_t get_osdmap_epoch() const final {
    return peering_state.get_osdmap_epoch();
  }

  eversion_t get_pg_trim_to() const {
    return peering_state.get_pg_trim_to();
  }

  eversion_t get_min_last_complete_ondisk() const {
    return peering_state.get_min_last_complete_ondisk();
  }

  const pg_info_t& get_info() const final {
    return peering_state.get_info();
  }

  // DoutPrefixProvider
  std::ostream& gen_prefix(std::ostream& out) const final {
    return out << *this;
  }
  crimson::common::CephContext *get_cct() const final {
    return shard_services.get_cct();
  }
  unsigned get_subsys() const final {
    return ceph_subsys_osd;
  }

  crimson::os::CollectionRef get_collection_ref() {
    return coll_ref;
  }

  // PeeringListener
  void prepare_write(
    pg_info_t &info,
    pg_info_t &last_written_info,
    PastIntervals &past_intervals,
    PGLog &pglog,
    bool dirty_info,
    bool dirty_big_info,
    bool need_write_epoch,
    ceph::os::Transaction &t) final;

  void on_info_history_change() final {
    // Not needed yet -- mainly for scrub scheduling
  }

  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) final;

  uint64_t get_snap_trimq_size() const final {
    return 0;
  }

  void send_cluster_message(
    int osd, MessageRef m,
    epoch_t epoch, bool share_map_update=false) final {
    (void)shard_services.send_to_osd(osd, m, epoch);
  }

  void send_pg_created(pg_t pgid) final {
    (void)shard_services.send_pg_created(pgid);
  }

  bool try_flush_or_schedule_async() final;

  void start_flush_on_transaction(
    ceph::os::Transaction &t) final {
    t.register_on_commit(
      new LambdaContext([this](int r) {
        peering_state.complete_flush();
      }));
  }

  void on_flushed() final {
    // will be needed for unblocking IO operations/peering
  }

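  // wraps a peering state machine event into a LocalPeeringEvent
  // operation and submits it to the shard's operation registry; the
  // returned future is deliberately discarded, as the operation's
  // lifetime is tracked by the registry itself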
  template <typename T>
  void start_peering_event_operation(T &&evt, float delay = 0) {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      delay,
      std::forward<T>(evt));
  }

  void schedule_event_after(
    PGPeeringEventRef event,
    float delay) final {
    start_peering_event_operation(std::move(*event), delay);
  }
  std::vector<pg_shard_t> get_replica_recovery_order() const final {
    return peering_state.get_replica_recovery_order();
  }
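  // local/remote recovery reservations: the grant and preempt callbacks
  // are re-injected into the peering state machine as peering events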
  void request_local_background_io_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) final {
    shard_services.local_reserver.request_reservation(
      pgid,
      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
      }) : nullptr,
      priority,
      on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
          start_peering_event_operation(std::move(*on_preempt));
        }) : nullptr);
  }

  void update_local_background_io_priority(
    unsigned priority) final {
    shard_services.local_reserver.update_priority(
      pgid,
      priority);
  }

  void cancel_local_background_io_reservation() final {
    shard_services.local_reserver.cancel_reservation(
      pgid);
  }

  void request_remote_recovery_reservation(
    unsigned priority,
    PGPeeringEventURef on_grant,
    PGPeeringEventURef on_preempt) final {
    shard_services.remote_reserver.request_reservation(
      pgid,
      on_grant ? make_lambda_context([this, on_grant=std::move(on_grant)] (int) {
        start_peering_event_operation(std::move(*on_grant));
      }) : nullptr,
      priority,
      on_preempt ? make_lambda_context(
        [this, on_preempt=std::move(on_preempt)] (int) {
          start_peering_event_operation(std::move(*on_preempt));
        }) : nullptr);
  }

  void cancel_remote_recovery_reservation() final {
    shard_services.remote_reserver.cancel_reservation(
      pgid);
  }

  void schedule_event_on_commit(
    ceph::os::Transaction &t,
    PGPeeringEventRef on_commit) final {
    t.register_on_commit(
      make_lambda_context(
        [this, on_commit=std::move(on_commit)](int) {
          start_peering_event_operation(std::move(*on_commit));
        }));
  }

  void update_heartbeat_peers(set<int> peers) final {
    // Not needed yet
  }
  void set_probe_targets(const set<pg_shard_t> &probe_set) final {
    // Not needed yet
  }
  void clear_probe_targets() final {
    // Not needed yet
  }
  void queue_want_pg_temp(const std::vector<int> &wanted) final {
    shard_services.queue_want_pg_temp(pgid.pgid, wanted);
  }
  void clear_want_pg_temp() final {
    shard_services.remove_want_pg_temp(pgid.pgid);
  }
  void publish_stats_to_osd() final {
    if (!is_primary())
      return;

    (void) peering_state.prepare_stats_for_publish(
      false,
      pg_stat_t(),
      object_stat_collection_t());
  }
  void clear_publish_stats() final {
    // Not needed yet
  }
  void check_recovery_sources(const OSDMapRef& newmap) final {
    // Not needed yet
  }
  void check_blocklisted_watchers() final {
    // Not needed yet
  }
  void clear_primary_state() final {
    // Not needed yet
  }

  void queue_check_readable(epoch_t last_peering_reset,
                            ceph::timespan delay) final;
  void recheck_readable() final;

  unsigned get_target_pg_log_entries() const final;

  void on_pool_change() final {
    // Not needed yet
  }
  void on_role_change() final {
    // Not needed yet
  }
  void on_change(ceph::os::Transaction &t) final;
  void on_activate(interval_set<snapid_t> to_trim) final;
  void on_activate_complete() final;
  void on_new_interval() final {
    // Not needed yet
  }
  Context *on_clean() final {
    // Not needed yet (will be needed for IO unblocking)
    return nullptr;
  }
  void on_activate_committed() final {
    // Not needed yet (will be needed for IO unblocking)
  }
  void on_active_exit() final {
    // Not needed yet
  }

  void on_removal(ceph::os::Transaction &t) final {
    // TODO
  }
  std::pair<ghobject_t, bool>
  do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;

  // merge/split not ready
  void clear_ready_to_merge() final {}
  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
  void set_not_ready_to_merge_source(pg_t pgid) final {}
  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
  void set_ready_to_merge_source(eversion_t lu) final {}

  void on_active_actmap() final {
    // Not needed yet
  }
  void on_active_advmap(const OSDMapRef &osdmap) final {
    // Not needed yet
  }
  epoch_t oldest_stored_osdmap() final {
    // TODO
    return 0;
  }

  void on_backfill_reserved() final {
    recovery_handler->on_backfill_reserved();
  }
  void on_backfill_canceled() final {
    ceph_assert(0 == "Not implemented");
  }

  void on_recovery_reserved() final {
    recovery_handler->start_pglogbased_recovery();
  }

  bool try_reserve_recovery_space(
    int64_t primary_num_bytes, int64_t local_num_bytes) final {
    // TODO
    return true;
  }
  void unreserve_recovery_space() final {}

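  // callback handler given to PGLog for merging and trimming the log;
  // every entry point is still a TODO stub in crimson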
  struct PGLogEntryHandler : public PGLog::LogEntryHandler {
    PG *pg;
    ceph::os::Transaction *t;
    PGLogEntryHandler(PG *pg, ceph::os::Transaction *t) : pg(pg), t(t) {}

    // LogEntryHandler
    void remove(const hobject_t &hoid) override {
      // TODO
    }
    void try_stash(const hobject_t &hoid, version_t v) override {
      // TODO
    }
    void rollback(const pg_log_entry_t &entry) override {
      // TODO
    }
    void rollforward(const pg_log_entry_t &entry) override {
      // TODO
    }
    void trim(const pg_log_entry_t &entry) override {
      // TODO
    }
  };
  PGLog::LogEntryHandlerRef get_log_handler(
    ceph::os::Transaction &t) final {
    return std::make_unique<PG::PGLogEntryHandler>(this, &t);
  }

  void rebuild_missing_set_with_deletes(PGLog &pglog) final {
    ceph_assert(0 == "Impossible for crimson");
  }

  PerfCounters &get_peering_perf() final {
    return shard_services.get_recoverystate_perf_logger();
  }
  PerfCounters &get_perf_logger() final {
    return shard_services.get_perf_logger();
  }

  void log_state_enter(const char *state) final;
  void log_state_exit(
    const char *state_name, utime_t enter_time,
    uint64_t events, utime_t event_dur) final;

  void dump_recovery_info(Formatter *f) const final {
  }

  OstreamTemp get_clog_info() final {
    // not needed yet: still a stub (needs to be wired up to monc)
    return OstreamTemp(CLOG_INFO, nullptr);
  }
  OstreamTemp get_clog_debug() final {
    // not needed yet: still a stub (needs to be wired up to monc)
    return OstreamTemp(CLOG_DEBUG, nullptr);
  }
  OstreamTemp get_clog_error() final {
    // not needed yet: still a stub (needs to be wired up to monc)
    return OstreamTemp(CLOG_ERROR, nullptr);
  }

  ceph::signedspan get_mnow() final;
  HeartbeatStampsRef get_hb_stamps(int peer) final;
  void schedule_renew_lease(epoch_t plr, ceph::timespan delay) final;

  // Utility
  bool is_primary() const final {
    return peering_state.is_primary();
  }
  bool is_nonprimary() const {
    return peering_state.is_nonprimary();
  }
  bool is_peered() const final {
    return peering_state.is_peered();
  }
  bool is_recovering() const final {
    return peering_state.is_recovering();
  }
  bool is_backfilling() const final {
    return peering_state.is_backfilling();
  }
  pg_stat_t get_stats() {
    auto stats = peering_state.prepare_stats_for_publish(
      false,
      pg_stat_t(),
      object_stat_collection_t());
    ceph_assert(stats);
    return *stats;
  }
  bool get_need_up_thru() const {
    return peering_state.get_need_up_thru();
  }
  epoch_t get_same_interval_since() const {
    return get_info().history.same_interval_since;
  }

  const auto& get_pool() const {
    return peering_state.get_pool();
  }
  pg_shard_t get_primary() const {
    return peering_state.get_primary();
  }

  /// initialize created PG
  void init(
    int role,
    const std::vector<int>& up,
    int up_primary,
    const std::vector<int>& acting,
    int acting_primary,
    const pg_history_t& history,
    const PastIntervals& pim,
    bool backfill,
    ceph::os::Transaction &t);

  seastar::future<> read_state(crimson::os::FuturizedStore* store);

  void do_peering_event(
    PGPeeringEvent& evt, PeeringCtx &rctx);

  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
  void handle_activate_map(PeeringCtx &rctx);
  void handle_initialize(PeeringCtx &rctx);

  static hobject_t get_oid(const MOSDOp &m);
  static RWState::State get_lock_type(const OpInfo &op_info);
  static std::optional<hobject_t> resolve_oid(
    const SnapSet &snapset,
    const hobject_t &oid);

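  // errorator enumerating the errors that object-context loading may
  // produce; callers have to handle (or forward) object_corrupted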
  using load_obc_ertr = crimson::errorator<
    crimson::ct_error::object_corrupted>;

  load_obc_ertr::future<crimson::osd::ObjectContextRef>
  load_head_obc(ObjectContextRef obc);

  load_obc_ertr::future<>
  reload_obc(crimson::osd::ObjectContext& obc) const;

public:
  using with_obc_func_t =
    std::function<load_obc_ertr::future<> (ObjectContextRef)>;

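  // with_head_obc() runs `func` with the head object context of `oid`
  // loaded and locked according to `State`, releasing the lock once the
  // returned future resolves.  A minimal usage sketch (hypothetical
  // caller, error handling elided):
  //
  //   pg->with_head_obc<RWState::RWREAD>(
  //     oid,
  //     [](auto obc) -> load_obc_ertr::future<> {
  //       // inspect obc->obs here
  //       return load_obc_ertr::now();
  //     });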
  template<RWState::State State>
  load_obc_ertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);

  load_obc_ertr::future<> with_locked_obc(
    Ref<MOSDOp> &m,
    const OpInfo &op_info,
    Operation *op,
    with_obc_func_t&& f);

  seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
  void handle_rep_op_reply(crimson::net::ConnectionRef conn,
                           const MOSDRepOpReply& m);

  void print(std::ostream& os) const;
  void dump_primary(Formatter*);

private:
  template<RWState::State State>
  load_obc_ertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);

  load_obc_ertr::future<ObjectContextRef> get_locked_obc(
    Operation *op,
    const hobject_t &oid,
    RWState::State type);

  void do_peering_event(
    const boost::statechart::event_base &evt,
    PeeringCtx &rctx);
  osd_op_params_t&& fill_op_params_bump_pg_version(
    osd_op_params_t&& osd_op_p,
    Ref<MOSDOp> m,
    const bool user_modify);
  seastar::future<Ref<MOSDOpReply>> handle_failed_op(
    const std::error_code& e,
    ObjectContextRef obc,
    const OpsExecuter& ox,
    const MOSDOp& m) const;
  seastar::future<Ref<MOSDOpReply>> do_osd_ops(
    Ref<MOSDOp> m,
    ObjectContextRef obc,
    const OpInfo &op_info);
  seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
  seastar::future<> submit_transaction(const OpInfo& op_info,
                                       const std::vector<OSDOp>& ops,
                                       ObjectContextRef&& obc,
                                       ceph::os::Transaction&& txn,
                                       const osd_op_params_t& oop);

private:
  OSDMapGate osdmap_gate;
  ShardServices &shard_services;

  cached_map_t osdmap;

public:
  cached_map_t get_osdmap() { return osdmap; }
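  // version assigned to the next client write: the current map epoch
  // plus a per-PG counter bumped on every projected update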
  eversion_t next_version() {
    return eversion_t(get_osdmap_epoch(),
                      ++projected_last_update.version);
  }
  ShardServices& get_shard_services() final {
    return shard_services;
  }
  seastar::future<> stop();

private:
  std::unique_ptr<PGBackend> backend;
  std::unique_ptr<RecoveryBackend> recovery_backend;
  std::unique_ptr<PGRecovery> recovery_handler;

  PeeringState peering_state;
  eversion_t projected_last_update;
public:
  RecoveryBackend* get_recovery_backend() final {
    return recovery_backend.get();
  }
  PGRecovery* get_recovery_handler() final {
    return recovery_handler.get();
  }
  PeeringState& get_peering_state() final {
    return peering_state;
  }
  bool has_reset_since(epoch_t epoch) const final {
    return peering_state.pg_has_reset_since(epoch);
  }

  const pg_missing_tracker_t& get_local_missing() const {
    return peering_state.get_pg_log().get_missing();
  }
  epoch_t get_last_peering_reset() const final {
    return peering_state.get_last_peering_reset();
  }
  const set<pg_shard_t> &get_acting_recovery_backfill() const {
    return peering_state.get_acting_recovery_backfill();
  }
  bool is_backfill_target(pg_shard_t osd) const {
    return peering_state.is_backfill_target(osd);
  }
  void begin_peer_recover(pg_shard_t peer, const hobject_t oid) {
    peering_state.begin_peer_recover(peer, oid);
  }
  uint64_t min_peer_features() const {
    return peering_state.get_min_peer_features();
  }
  const map<hobject_t, set<pg_shard_t>>&
  get_missing_loc_shards() const {
    return peering_state.get_missing_loc().get_missing_locs();
  }
  const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
    return peering_state.get_peer_missing();
  }
  const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
    if (shard == pg_whoami)
      return &get_local_missing();
    else {
      auto it = peering_state.get_peer_missing().find(shard);
      if (it == peering_state.get_peer_missing().end())
        return nullptr;
      else
        return &it->second;
    }
  }
  int get_recovery_op_priority() const {
    int64_t pri = 0;
    get_pool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
    return pri > 0 ? pri : crimson::common::local_conf()->osd_recovery_op_priority;
  }
  seastar::future<> mark_unfound_lost(int) {
    // TODO: see PrimaryLogPG::mark_all_unfound_lost()
    return seastar::now();
  }

private:
  // instead of seastar::gate, we use a boolean flag to indicate
  // whether the system is shutting down, as we don't need to track
  // continuations here.
  bool stopping = false;

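  // parks client operations until the PG goes active; on_active()
  // fulfills the shared promise that wait() callers block on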
  class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
    PG *pg;

    const spg_t pgid;
    seastar::shared_promise<> p;

  protected:
    void dump_detail(Formatter *f) const;

  public:
    static constexpr const char *type_name = "WaitForActiveBlocker";

    WaitForActiveBlocker(PG *pg) : pg(pg) {}
    void on_active();
    blocking_future<> wait();
    seastar::future<> stop();
  } wait_for_active_blocker;

  friend std::ostream& operator<<(std::ostream&, const PG& pg);
  friend class ClientRequest;
  friend class PGAdvanceMap;
  friend class PeeringEvent;
  friend class RepRequest;
  friend class BackfillRecovery;
  friend struct PGFacade;
private:
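  // stub: always resolves to true; unfound-object search is not
  // implemented in crimson yet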
  seastar::future<bool> find_unfound() {
    return seastar::make_ready_future<bool>(true);
  }

  template <typename MsgType>
  bool can_discard_replica_op(const MsgType& m) const;
  bool can_discard_op(const MOSDOp& m) const;
  bool is_missing_object(const hobject_t& soid) const {
    return peering_state.get_pg_log().get_missing().get_items().count(soid);
  }
  bool is_unreadable_object(const hobject_t &oid,
                            eversion_t* v = nullptr) const final {
    return is_missing_object(oid) ||
      !peering_state.get_missing_loc().readable_with_acting(
        oid, get_actingset(), v);
  }
  bool is_degraded_or_backfilling_object(const hobject_t& soid) const;
  const set<pg_shard_t> &get_actingset() const {
    return peering_state.get_actingset();
  }

private:
  BackfillRecovery::BackfillRecoveryPipeline backfill_pipeline;
};

std::ostream& operator<<(std::ostream&, const PG& pg);

}