ceph/src/osd/PG.h (Ceph Pacific 16.2.2)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_PG_H
16 #define CEPH_PG_H
17
18 #include <boost/scoped_ptr.hpp>
19 #include <boost/container/flat_set.hpp>
20 #include "include/mempool.h"
21
22 // re-include our assert to clobber boost's
23 #include "include/ceph_assert.h"
24 #include "include/common_fwd.h"
25
26 #include "include/types.h"
27 #include "include/stringify.h"
28 #include "osd_types.h"
29 #include "include/xlist.h"
30 #include "SnapMapper.h"
31 #include "Session.h"
32 #include "common/Timer.h"
33
34 #include "PGLog.h"
35 #include "OSDMap.h"
36 #include "messages/MOSDPGLog.h"
37 #include "include/str_list.h"
38 #include "PGBackend.h"
39 #include "PGPeeringEvent.h"
40 #include "PeeringState.h"
41 #include "recovery_types.h"
42 #include "MissingLoc.h"
43 #include "scrubber_common.h"
44
45 #include "mgr/OSDPerfMetricTypes.h"
46
47 #include <atomic>
48 #include <list>
49 #include <memory>
50 #include <string>
51 #include <tuple>
52
53 //#define DEBUG_RECOVERY_OIDS // track the set of recovering oids explicitly, to find counting bugs
54 //#define PG_DEBUG_REFS // track provenance of pg refs, helpful for finding leaks
55
56 class OSD;
57 class OSDService;
58 class OSDShard;
59 class OSDShardPGSlot;
60 class MOSDPGScan;
61 class MOSDPGBackfill;
62 class MOSDPGInfo;
63
64 class PG;
65 struct OpRequest;
66 typedef OpRequest::Ref OpRequestRef;
67 class MOSDPGLog;
68 class DynamicPerfStats;
69 class PgScrubber;
70
71 namespace Scrub {
72 class Store;
73 class ReplicaReservations;
74 class LocalReservation;
75 class ReservedByRemotePrimary;
76 }
77
78 #ifdef PG_DEBUG_REFS
79 #include "common/tracked_int_ptr.hpp"
80 uint64_t get_with_id(PG *pg);
81 void put_with_id(PG *pg, uint64_t id);
82 typedef TrackedIntPtr<PG> PGRef;
83 #else
84 typedef boost::intrusive_ptr<PG> PGRef;
85 #endif
86
87 class PGRecoveryStats {
88 struct per_state_info {
89 uint64_t enter, exit; // enter/exit counts
90 uint64_t events;
91 utime_t event_time; // time spent processing events
92 utime_t total_time; // total time in state
93 utime_t min_time, max_time;
94
95 // cppcheck-suppress unreachableCode
96 per_state_info() : enter(0), exit(0), events(0) {}
97 };
98 std::map<const char *,per_state_info> info;
99 ceph::mutex lock = ceph::make_mutex("PGRecoveryStats::lock");
100
101 public:
102 PGRecoveryStats() = default;
103
104 void reset() {
105 std::lock_guard l(lock);
106 info.clear();
107 }
108 void dump(ostream& out) {
109 std::lock_guard l(lock);
110 for (std::map<const char *,per_state_info>::iterator p = info.begin(); p != info.end(); ++p) {
111 per_state_info& i = p->second;
112 out << i.enter << "\t" << i.exit << "\t"
113 << i.events << "\t" << i.event_time << "\t"
114 << i.total_time << "\t"
115 << i.min_time << "\t" << i.max_time << "\t"
116 << p->first << "\n";
117 }
118 }
119
120 void dump_formatted(ceph::Formatter *f) {
121 std::lock_guard l(lock);
122 f->open_array_section("pg_recovery_stats");
123 for (std::map<const char *,per_state_info>::iterator p = info.begin();
124 p != info.end(); ++p) {
125 per_state_info& i = p->second;
126 f->open_object_section("recovery_state");
127 f->dump_int("enter", i.enter);
128 f->dump_int("exit", i.exit);
129 f->dump_int("events", i.events);
130 f->dump_stream("event_time") << i.event_time;
131 f->dump_stream("total_time") << i.total_time;
132 f->dump_stream("min_time") << i.min_time;
133 f->dump_stream("max_time") << i.max_time;
134 std::vector<std::string> states;
135 get_str_vec(p->first, "/", states);
136 f->open_array_section("nested_states");
137 for (std::vector<std::string>::iterator st = states.begin();
138 st != states.end(); ++st) {
139 f->dump_string("state", *st);
140 }
141 f->close_section();
142 f->close_section();
143 }
144 f->close_section();
145 }
146
147 void log_enter(const char *s) {
148 std::lock_guard l(lock);
149 info[s].enter++;
150 }
151 void log_exit(const char *s, utime_t dur, uint64_t events, utime_t event_dur) {
152 std::lock_guard l(lock);
153 per_state_info &i = info[s];
154 i.exit++;
155 i.total_time += dur;
156 if (dur > i.max_time)
157 i.max_time = dur;
158 if (dur < i.min_time || i.min_time == utime_t())
159 i.min_time = dur;
160 i.events += events;
161 i.event_time += event_dur;
162 }
163 };
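// Usage sketch (not part of the original header; the driver code below is
// illustrative): the log_state_enter()/log_state_exit() overrides declared
// further below forward peering state-machine transitions into an instance of
// this class, using slash-separated state paths that dump_formatted() splits
// into the "nested_states" array.
//
//   PGRecoveryStats stats;
//   stats.log_enter("Started/Primary/Active");
//   // ... state machine runs, accumulating events ...
//   stats.log_exit("Started/Primary/Active", dur, n_events, ev_dur);
//   stats.dump_formatted(f);   // per-state enter/exit counts and timings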
164
165 /** PG - Replica Placement Group
166 *
167 */
168
169 class PG : public DoutPrefixProvider, public PeeringState::PeeringListener {
170 friend struct NamedState;
171 friend class PeeringState;
172 friend class PgScrubber;
173 friend class PrimaryLogScrub;
174 friend class Scrub::ReplicaReservations;
175 friend class Scrub::LocalReservation; // dout()-only friendship
176 friend class Scrub::ReservedByRemotePrimary; // dout()-only friendship
177
178 public:
179 const pg_shard_t pg_whoami;
180 const spg_t pg_id;
181
182 std::unique_ptr<ScrubPgIF> m_scrubber;
183
184 /// flags detailing scheduling/operation characteristics of the next scrub
185 requested_scrub_t m_planned_scrub;
186 /// scrubbing state for both Primary & replicas
187 bool is_scrub_active() const { return m_scrubber->is_scrub_active(); }
188
189 public:
190 // -- members --
191 const coll_t coll;
192
193 ObjectStore::CollectionHandle ch;
194
195 // -- methods --
196 std::ostream& gen_prefix(std::ostream& out) const override;
197 CephContext *get_cct() const override {
198 return cct;
199 }
200 unsigned get_subsys() const override {
201 return ceph_subsys_osd;
202 }
203
204 const char* const get_current_state() const {
205 return recovery_state.get_current_state();
206 }
207
208 const OSDMapRef& get_osdmap() const {
209 ceph_assert(is_locked());
210 return recovery_state.get_osdmap();
211 }
212
213 epoch_t get_osdmap_epoch() const override final {
214 return recovery_state.get_osdmap()->get_epoch();
215 }
216
217 PerfCounters &get_peering_perf() override;
218 PerfCounters &get_perf_logger() override;
219 void log_state_enter(const char *state) override;
220 void log_state_exit(
221 const char *state_name, utime_t enter_time,
222 uint64_t events, utime_t event_dur) override;
223
224 void lock(bool no_lockdep = false) const;
225 void unlock() const;
226 bool is_locked() const;
227
228 const spg_t& get_pgid() const {
229 return pg_id;
230 }
231
232 const PGPool& get_pool() const {
233 return pool;
234 }
235 uint64_t get_last_user_version() const {
236 return info.last_user_version;
237 }
238 const pg_history_t& get_history() const {
239 return info.history;
240 }
241 bool get_need_up_thru() const {
242 return recovery_state.get_need_up_thru();
243 }
244 epoch_t get_same_interval_since() const {
245 return info.history.same_interval_since;
246 }
247
248 static void set_last_scrub_stamp(
249 utime_t t, pg_history_t &history, pg_stat_t &stats) {
250 stats.last_scrub_stamp = t;
251 history.last_scrub_stamp = t;
252 }
253
254 void set_last_scrub_stamp(utime_t t) {
255 recovery_state.update_stats(
256 [=](auto &history, auto &stats) {
257 set_last_scrub_stamp(t, history, stats);
258 return true;
259 });
260 }
261
262 static void set_last_deep_scrub_stamp(
263 utime_t t, pg_history_t &history, pg_stat_t &stats) {
264 stats.last_deep_scrub_stamp = t;
265 history.last_deep_scrub_stamp = t;
266 }
267
268 void set_last_deep_scrub_stamp(utime_t t) {
269 recovery_state.update_stats(
270 [=](auto &history, auto &stats) {
271 set_last_deep_scrub_stamp(t, history, stats);
272 return true;
273 });
274 }
275
276 bool is_deleting() const {
277 return recovery_state.is_deleting();
278 }
279 bool is_deleted() const {
280 return recovery_state.is_deleted();
281 }
282 bool is_nonprimary() const {
283 return recovery_state.is_nonprimary();
284 }
285 bool is_primary() const {
286 return recovery_state.is_primary();
287 }
288 bool pg_has_reset_since(epoch_t e) {
289 ceph_assert(is_locked());
290 return recovery_state.pg_has_reset_since(e);
291 }
292
293 bool is_ec_pg() const {
294 return recovery_state.is_ec_pg();
295 }
296 int get_role() const {
297 return recovery_state.get_role();
298 }
299 const std::vector<int> get_acting() const {
300 return recovery_state.get_acting();
301 }
302 const std::set<pg_shard_t> &get_actingset() const {
303 return recovery_state.get_actingset();
304 }
305 int get_acting_primary() const {
306 return recovery_state.get_acting_primary();
307 }
308 pg_shard_t get_primary() const {
309 return recovery_state.get_primary();
310 }
311 const std::vector<int> get_up() const {
312 return recovery_state.get_up();
313 }
314 int get_up_primary() const {
315 return recovery_state.get_up_primary();
316 }
317 const PastIntervals& get_past_intervals() const {
318 return recovery_state.get_past_intervals();
319 }
320 bool is_acting_recovery_backfill(pg_shard_t osd) const {
321 return recovery_state.is_acting_recovery_backfill(osd);
322 }
323 const std::set<pg_shard_t> &get_acting_recovery_backfill() const {
324 return recovery_state.get_acting_recovery_backfill();
325 }
326 bool is_acting(pg_shard_t osd) const {
327 return recovery_state.is_acting(osd);
328 }
329 bool is_up(pg_shard_t osd) const {
330 return recovery_state.is_up(osd);
331 }
332 static bool has_shard(bool ec, const std::vector<int>& v, pg_shard_t osd) {
333 return PeeringState::has_shard(ec, v, osd);
334 }
335
336 /// initialize created PG
337 void init(
338 int role,
339 const std::vector<int>& up,
340 int up_primary,
341 const std::vector<int>& acting,
342 int acting_primary,
343 const pg_history_t& history,
344 const PastIntervals& pim,
345 bool backfill,
346 ObjectStore::Transaction &t);
347
348 /// read existing pg state off disk
349 void read_state(ObjectStore *store);
350 static int peek_map_epoch(ObjectStore *store, spg_t pgid, epoch_t *pepoch);
351
352 static int get_latest_struct_v() {
353 return pg_latest_struct_v;
354 }
355 static int get_compat_struct_v() {
356 return pg_compat_struct_v;
357 }
358 static int read_info(
359 ObjectStore *store, spg_t pgid, const coll_t &coll,
360 pg_info_t &info, PastIntervals &past_intervals,
361 __u8 &);
362 static bool _has_removal_flag(ObjectStore *store, spg_t pgid);
363
364 void rm_backoff(const ceph::ref_t<Backoff>& b);
365
366 void update_snap_mapper_bits(uint32_t bits) {
367 snap_mapper.update_bits(bits);
368 }
369 void start_split_stats(const std::set<spg_t>& childpgs, std::vector<object_stat_sum_t> *v);
370 virtual void split_colls(
371 spg_t child,
372 int split_bits,
373 int seed,
374 const pg_pool_t *pool,
375 ObjectStore::Transaction &t) = 0;
376 void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
377 void merge_from(std::map<spg_t,PGRef>& sources, PeeringCtx &rctx,
378 unsigned split_bits,
379 const pg_merge_meta_t& last_pg_merge_meta);
380 void finish_split_stats(const object_stat_sum_t& stats,
381 ObjectStore::Transaction &t);
382
383 void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
384 /**
385 * a special version of PG::scrub(), which:
386 * - is initiated after repair, and
387 * - is not required to allocate local/remote OSD scrub resources
388 */
389 void recovery_scrub(epoch_t queued, ThreadPool::TPHandle &handle);
390 void replica_scrub(epoch_t queued, ThreadPool::TPHandle &handle);
391 void replica_scrub_resched(epoch_t queued, ThreadPool::TPHandle &handle);
392
393 /// Queues a PGScrubResourcesOK message. Will translate into 'RemotesReserved' FSM event
394 void scrub_send_resources_granted(epoch_t queued, ThreadPool::TPHandle &handle);
395 void scrub_send_resources_denied(epoch_t queued, ThreadPool::TPHandle &handle);
396 void scrub_send_scrub_resched(epoch_t queued, ThreadPool::TPHandle &handle);
397 void scrub_send_pushes_update(epoch_t queued, ThreadPool::TPHandle &handle);
398 void scrub_send_applied_update(epoch_t queued, ThreadPool::TPHandle &handle);
399 void scrub_send_unblocking(epoch_t epoch_queued, ThreadPool::TPHandle &handle);
400 void scrub_send_digest_update(epoch_t epoch_queued, ThreadPool::TPHandle &handle);
401 void scrub_send_replmaps_ready(epoch_t epoch_queued, ThreadPool::TPHandle &handle);
402 void scrub_send_replica_pushes(epoch_t queued, ThreadPool::TPHandle &handle);
403
404 void reg_next_scrub();
405
406 void queue_want_pg_temp(const std::vector<int> &wanted) override;
407 void clear_want_pg_temp() override;
408
409 void on_new_interval() override;
410
411 void on_role_change() override;
412 virtual void plpg_on_role_change() = 0;
413
414 void init_collection_pool_opts();
415 void on_pool_change() override;
416 virtual void plpg_on_pool_change() = 0;
417
418 void on_info_history_change() override;
419
420 void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override;
421
422 uint64_t get_snap_trimq_size() const override {
423 return snap_trimq.size();
424 }
425 unsigned get_target_pg_log_entries() const override;
426
427 void clear_publish_stats() override;
428 void clear_primary_state() override;
429
430 epoch_t oldest_stored_osdmap() override;
431 OstreamTemp get_clog_error() override;
432 OstreamTemp get_clog_info() override;
433 OstreamTemp get_clog_debug() override;
434
435 void schedule_event_after(
436 PGPeeringEventRef event,
437 float delay) override;
438 void request_local_background_io_reservation(
439 unsigned priority,
440 PGPeeringEventURef on_grant,
441 PGPeeringEventURef on_preempt) override;
442 void update_local_background_io_priority(
443 unsigned priority) override;
444 void cancel_local_background_io_reservation() override;
445
446 void request_remote_recovery_reservation(
447 unsigned priority,
448 PGPeeringEventURef on_grant,
449 PGPeeringEventURef on_preempt) override;
450 void cancel_remote_recovery_reservation() override;
451
452 void schedule_event_on_commit(
453 ObjectStore::Transaction &t,
454 PGPeeringEventRef on_commit) override;
455
456 void on_active_exit() override;
457
458 Context *on_clean() override {
459 if (is_active()) {
460 kick_snap_trim();
461 }
462 requeue_ops(waiting_for_clean_to_primary_repair);
463 return finish_recovery();
464 }
465
466 void on_activate(interval_set<snapid_t> snaps) override;
467
468 void on_activate_committed() override;
469
470 void on_active_actmap() override;
471 void on_active_advmap(const OSDMapRef &osdmap) override;
472
473 void queue_snap_retrim(snapid_t snap);
474
475 void on_backfill_reserved() override;
476 void on_backfill_canceled() override;
477 void on_recovery_reserved() override;
478
479 bool is_forced_recovery_or_backfill() const {
480 return recovery_state.is_forced_recovery_or_backfill();
481 }
482
483 PGLog::LogEntryHandlerRef get_log_handler(
484 ObjectStore::Transaction &t) override {
485 return std::make_unique<PG::PGLogEntryHandler>(this, &t);
486 }
487
488 std::pair<ghobject_t, bool> do_delete_work(ObjectStore::Transaction &t,
489 ghobject_t _next) override;
490
491 void clear_ready_to_merge() override;
492 void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
493 void set_not_ready_to_merge_source(pg_t pgid) override;
494 void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override;
495 void set_ready_to_merge_source(eversion_t lu) override;
496
497 void send_pg_created(pg_t pgid) override;
498
499 ceph::signedspan get_mnow() override;
500 HeartbeatStampsRef get_hb_stamps(int peer) override;
501 void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override;
502 void queue_check_readable(epoch_t lpr, ceph::timespan delay) override;
503
504 void rebuild_missing_set_with_deletes(PGLog &pglog) override;
505
506 void queue_peering_event(PGPeeringEventRef evt);
507 void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx);
508 void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
509 void queue_flushed(epoch_t started_at);
510 void handle_advance_map(
511 OSDMapRef osdmap, OSDMapRef lastmap,
512 std::vector<int>& newup, int up_primary,
513 std::vector<int>& newacting, int acting_primary,
514 PeeringCtx &rctx);
515 void handle_activate_map(PeeringCtx &rctx);
516 void handle_initialize(PeeringCtx &rctx);
517 void handle_query_state(ceph::Formatter *f);
518
519 /**
520 * @param ops_begun returns how many recovery ops the function started
521 * @returns true if any useful work was accomplished; false otherwise
522 */
523 virtual bool start_recovery_ops(
524 uint64_t max,
525 ThreadPool::TPHandle &handle,
526 uint64_t *ops_begun) = 0;
527
528 // more work after the above, but with a PeeringCtx
529 void find_unfound(epoch_t queued, PeeringCtx &rctx);
530
531 virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
532
533 void dump_pgstate_history(ceph::Formatter *f);
534 void dump_missing(ceph::Formatter *f);
535
536 void get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f);
537 void with_heartbeat_peers(std::function<void(int)> f);
538
539 void shutdown();
540 virtual void on_shutdown() = 0;
541
542 bool get_must_scrub() const;
543 bool sched_scrub();
544
545 unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const;
546 /// the version that refers to flags_.priority
547 unsigned int scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const;
548 private:
549 // auxiliaries used by sched_scrub():
550 double next_deepscrub_interval() const;
551
552 /// should we perform deep scrub?
553 bool is_time_for_deep(bool allow_deep_scrub,
554 bool allow_scrub,
555 bool has_deep_errors,
556 const requested_scrub_t& planned) const;
557
558 /**
559 * Verify the various 'next scrub' flags in m_planned_scrub against configuration
560 * and scrub-related timestamps.
561 *
562 * @returns an updated copy of m_planned_scrub (or nothing if no scrubbing)
563 */
564 std::optional<requested_scrub_t> verify_scrub_mode() const;
565
566 bool verify_periodic_scrub_mode(bool allow_deep_scrub,
567 bool try_to_auto_repair,
568 bool allow_regular_scrub,
569 bool has_deep_errors,
570 requested_scrub_t& planned) const;
571
572 using ScrubAPI = void (ScrubPgIF::*)(epoch_t epoch_queued);
573 void forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued);
574
575 public:
576 virtual void do_request(
577 OpRequestRef& op,
578 ThreadPool::TPHandle &handle
579 ) = 0;
580 virtual void clear_cache() = 0;
581 virtual int get_cache_obj_count() = 0;
582
583 virtual void snap_trimmer(epoch_t epoch_queued) = 0;
584 virtual void do_command(
585 const std::string_view& prefix,
586 const cmdmap_t& cmdmap,
587 const ceph::buffer::list& idata,
588 std::function<void(int,const std::string&,ceph::buffer::list&)> on_finish) = 0;
589
590 virtual bool agent_work(int max) = 0;
591 virtual bool agent_work(int max, int agent_flush_quota) = 0;
592 virtual void agent_stop() = 0;
593 virtual void agent_delay() = 0;
594 virtual void agent_clear() = 0;
595 virtual void agent_choose_mode_restart() = 0;
596
597 struct C_DeleteMore : public Context {
598 PGRef pg;
599 epoch_t epoch;
600 C_DeleteMore(PG *p, epoch_t e) : pg(p), epoch(e) {}
601 void finish(int r) override {
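      // never expected to run: complete() is overridden (see PG.cc) and does not call finish()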
602 ceph_abort();
603 }
604 void complete(int r) override;
605 };
606
607 void _delete_some(ObjectStore::Transaction *t);
608
609 virtual void set_dynamic_perf_stats_queries(
610 const std::list<OSDPerfMetricQuery> &queries) {
611 }
612 virtual void get_dynamic_perf_stats(DynamicPerfStats *stats) {
613 }
614
615 uint64_t get_min_alloc_size() const;
616
617 // reference counting
618 #ifdef PG_DEBUG_REFS
619 uint64_t get_with_id();
620 void put_with_id(uint64_t);
621 void dump_live_ids();
622 #endif
623 void get(const char* tag);
624 void put(const char* tag);
625 int get_num_ref() {
626 return ref;
627 }
628
629 // ctor
630 PG(OSDService *o, OSDMapRef curmap,
631 const PGPool &pool, spg_t p);
632 ~PG() override;
633
634 // prevent copying
635 explicit PG(const PG& rhs) = delete;
636 PG& operator=(const PG& rhs) = delete;
637
638 protected:
639 // -------------
640 // protected
641 OSDService *osd;
642 public:
643 OSDShard *osd_shard = nullptr;
644 OSDShardPGSlot *pg_slot = nullptr;
645 protected:
646 CephContext *cct;
647
648 // locking and reference counting.
649 // I destroy myself when the reference count hits zero.
650 // lock() should be called before doing anything.
651 // get() should be called on pointer copy (to another thread, etc.).
652 // put() should be called on destruction of some previously copied pointer.
653 // unlock() when done with the current pointer (_most common_).
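// Typical access pattern from another thread (sketch only):
//   PGRef pg = ...;   // copying a PGRef bumps the refcount via get("intptr")
//   pg->lock();
//   // ... examine or modify PG state ...
//   pg->unlock();     // the PGRef destructor drops the ref via put("intptr")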
654 mutable ceph::mutex _lock = ceph::make_mutex("PG::_lock");
655 #ifndef CEPH_DEBUG_MUTEX
656 mutable std::thread::id locked_by;
657 #endif
658 std::atomic<unsigned int> ref{0};
659
660 #ifdef PG_DEBUG_REFS
661 ceph::mutex _ref_id_lock = ceph::make_mutex("PG::_ref_id_lock");
662 std::map<uint64_t, std::string> _live_ids;
663 std::map<std::string, uint64_t> _tag_counts;
664 uint64_t _ref_id = 0;
665
666 friend uint64_t get_with_id(PG *pg) { return pg->get_with_id(); }
667 friend void put_with_id(PG *pg, uint64_t id) { return pg->put_with_id(id); }
668 #endif
669
670 private:
671 friend void intrusive_ptr_add_ref(PG *pg) {
672 pg->get("intptr");
673 }
674 friend void intrusive_ptr_release(PG *pg) {
675 pg->put("intptr");
676 }
677
678
679 // =====================
680
681 protected:
682 OSDriver osdriver;
683 SnapMapper snap_mapper;
684
685 virtual PGBackend *get_pgbackend() = 0;
686 virtual const PGBackend* get_pgbackend() const = 0;
687
688 protected:
689 void requeue_map_waiters();
690
691 protected:
692
693 ZTracer::Endpoint trace_endpoint;
694
695
696 protected:
697 __u8 info_struct_v = 0;
698 void upgrade(ObjectStore *store);
699
700 protected:
701 ghobject_t pgmeta_oid;
702
703 // ------------------
704 interval_set<snapid_t> snap_trimq;
705 std::set<snapid_t> snap_trimq_repeat;
706
707 /* You should not use these items without taking their respective queue locks
708 * (if they have one) */
709 xlist<PG*>::item stat_queue_item;
710 bool scrub_queued;
711 bool recovery_queued;
712
713 int recovery_ops_active;
714 std::set<pg_shard_t> waiting_on_backfill;
715 #ifdef DEBUG_RECOVERY_OIDS
716 multiset<hobject_t> recovering_oids;
717 #endif
718
719 public:
720 bool dne() { return info.dne(); }
721
722 virtual void send_cluster_message(
723 int osd, MessageRef m, epoch_t epoch, bool share_map_update) override;
724
725 protected:
726 epoch_t get_last_peering_reset() const {
727 return recovery_state.get_last_peering_reset();
728 }
729
730 /* heartbeat peers */
731 void set_probe_targets(const std::set<pg_shard_t> &probe_set) override;
732 void clear_probe_targets() override;
733
734 ceph::mutex heartbeat_peer_lock =
735 ceph::make_mutex("PG::heartbeat_peer_lock");
736 std::set<int> heartbeat_peers;
737 std::set<int> probe_targets;
738
739 protected:
740 BackfillInterval backfill_info;
741 std::map<pg_shard_t, BackfillInterval> peer_backfill_info;
742 bool backfill_reserving;
743
744 // The primary's num_bytes and local num_bytes for this pg, only valid
745 // during backfill for non-primary shards.
746 // Both of these are adjusted for EC to reflect the on-disk bytes
747 std::atomic<int64_t> primary_num_bytes = 0;
748 std::atomic<int64_t> local_num_bytes = 0;
749
750 public:
751 // Space reserved for backfill is primary_num_bytes - local_num_bytes
752 // We don't care that the difference itself isn't computed atomically.
753 uint64_t get_reserved_num_bytes() {
754 int64_t primary = primary_num_bytes.load();
755 int64_t local = local_num_bytes.load();
756 if (primary > local)
757 return primary - local;
758 else
759 return 0;
760 }
761
762 bool is_remote_backfilling() {
763 return primary_num_bytes.load() > 0;
764 }
765
766 bool try_reserve_recovery_space(int64_t primary, int64_t local) override;
767 void unreserve_recovery_space() override;
768
769 // If num_bytes is inconsistent and local_num_bytes goes negative,
770 // it's ok, because it would then be ignored.
771
772 // The value of num_bytes could be negative,
773 // but we don't let local_num_bytes go negative.
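  // (the compare-and-exchange loops below clamp the stored value at zero)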
774 void add_local_num_bytes(int64_t num_bytes) {
775 if (num_bytes) {
776 int64_t prev_bytes = local_num_bytes.load();
777 int64_t new_bytes;
778 do {
779 new_bytes = prev_bytes + num_bytes;
780 if (new_bytes < 0)
781 new_bytes = 0;
782 } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
783 }
784 }
785 void sub_local_num_bytes(int64_t num_bytes) {
786 ceph_assert(num_bytes >= 0);
787 if (num_bytes) {
788 int64_t prev_bytes = local_num_bytes.load();
789 int64_t new_bytes;
790 do {
791 new_bytes = prev_bytes - num_bytes;
792 if (new_bytes < 0)
793 new_bytes = 0;
794 } while(!local_num_bytes.compare_exchange_weak(prev_bytes, new_bytes));
795 }
796 }
797 // The value of num_bytes could be negative,
798 // but we don't let info.stats.stats.sum.num_bytes go negative.
799 void add_num_bytes(int64_t num_bytes) {
800 ceph_assert(ceph_mutex_is_locked_by_me(_lock));
801 if (num_bytes) {
802 recovery_state.update_stats(
803 [num_bytes](auto &history, auto &stats) {
804 stats.stats.sum.num_bytes += num_bytes;
805 if (stats.stats.sum.num_bytes < 0) {
806 stats.stats.sum.num_bytes = 0;
807 }
808 return false;
809 });
810 }
811 }
812 void sub_num_bytes(int64_t num_bytes) {
813 ceph_assert(ceph_mutex_is_locked_by_me(_lock));
814 ceph_assert(num_bytes >= 0);
815 if (num_bytes) {
816 recovery_state.update_stats(
817 [num_bytes](auto &history, auto &stats) {
818 stats.stats.sum.num_bytes -= num_bytes;
819 if (stats.stats.sum.num_bytes < 0) {
820 stats.stats.sum.num_bytes = 0;
821 }
822 return false;
823 });
824 }
825 }
826
827 // Only used in testing so not worried about needing the PG lock here
828 int64_t get_stats_num_bytes() {
829 std::lock_guard l{_lock};
830 int64_t num_bytes = info.stats.stats.sum.num_bytes;
831 if (pool.info.is_erasure()) {
832 num_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
833 // Round up each object by a stripe
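  // (worked example with hypothetical numbers: k=4 data chunks, a 64 KiB
  //  stripe chunk and 1000 objects turn 4 GiB of logical bytes into
  //  4 GiB / 4 = 1 GiB per shard plus 1000 * 64 KiB = 62.5 MiB of round-up)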
834 num_bytes += get_pgbackend()->get_ec_stripe_chunk_size() * info.stats.stats.sum.num_objects;
835 }
836 int64_t lnb = local_num_bytes.load();
837 if (lnb && lnb != num_bytes) {
838 lgeneric_dout(cct, 0) << this << " " << info.pgid << " num_bytes mismatch "
839 << lnb << " vs stats "
840 << info.stats.stats.sum.num_bytes << " / chunk "
841 << get_pgbackend()->get_ec_data_chunk_count()
842 << dendl;
843 }
844 return num_bytes;
845 }
846
847 protected:
848
849 /*
850 * blocked request wait hierarchy
851 *
852 * In order to preserve request ordering we need to be careful about the
853 * order in which blocked requests get requeued. Generally speaking, we
854 * push the requests back up to the op_wq in reverse order (most recent
855 * request first) so that they come back out again in the original order.
856 * However, because there are multiple wait queues, we need to requeue
857 * waitlists in order. Generally speaking, we requeue the wait lists
858 * that are checked first.
859 *
860 * Here are the various wait lists, in the order they are used during
861 * request processing, with notes:
862 *
863 * - waiting_for_map
864 * - may start or stop blocking at any time (depending on client epoch)
865 * - waiting_for_peered
866 * - !is_peered()
867 * - only starts blocking on interval change; never restarts
868 * - waiting_for_flush
869 * - flushes_in_progress
870 * - waiting for final flush during activate
871 * - waiting_for_active
872 * - !is_active()
873 * - only starts blocking on interval change; never restarts
874 * - waiting_for_readable
875 * - now > readable_until
876 * - unblocks when we get fresh(er) osd_pings
877 * - waiting_for_scrub
878 * - starts and stops blocking for varying intervals during scrub
879 * - waiting_for_unreadable_object
880 * - never restarts once object is readable (* except for EIO?)
881 * - waiting_for_degraded_object
882 * - never restarts once object is writeable (* except for EIO?)
883 * - waiting_for_blocked_object
884 * - starts and stops based on proxied op activity
885 * - obc rwlocks
886 * - starts and stops based on read/write activity
887 *
888 * Notes:
889 *
890 * 1. During an interval change, we requeue *everything* in the above order.
891 *
892 * 2. When an obc rwlock is released, we check for a scrub block and requeue
893 * the op there if it applies. We ignore the unreadable/degraded/blocked
894 * queues because we assume they cannot apply at that time (this is
895 * probably mostly true).
896 *
897 * 3. The requeue_ops helper will push ops onto the waiting_for_map list if
898 * it is non-empty.
899 *
900 * These three behaviors are generally sufficient to maintain ordering, with
901 * the possible exception of cases where we make an object degraded or
902 * unreadable that was previously okay, e.g. when scrub or op processing
903 * encounter an unexpected error. FIXME.
904 */
905
906 // ops with newer maps than ours (or blocked behind them)
907 // track these by client, since inter-request ordering doesn't otherwise
908 // matter.
909 std::unordered_map<entity_name_t,std::list<OpRequestRef>> waiting_for_map;
910
911 // ops waiting on peered
912 std::list<OpRequestRef> waiting_for_peered;
913
914 /// ops waiting on readable
915 std::list<OpRequestRef> waiting_for_readable;
916
917 // ops waiting on active (require peered as well)
918 std::list<OpRequestRef> waiting_for_active;
919 std::list<OpRequestRef> waiting_for_flush;
920 std::list<OpRequestRef> waiting_for_scrub;
921
922 std::list<OpRequestRef> waiting_for_cache_not_full;
923 std::list<OpRequestRef> waiting_for_clean_to_primary_repair;
924 std::map<hobject_t, std::list<OpRequestRef>> waiting_for_unreadable_object,
925 waiting_for_degraded_object,
926 waiting_for_blocked_object;
927
928 std::set<hobject_t> objects_blocked_on_cache_full;
929 std::map<hobject_t,snapid_t> objects_blocked_on_degraded_snap;
930 std::map<hobject_t,ObjectContextRef> objects_blocked_on_snap_promotion;
931
932 // Callbacks should assume pg (and nothing else) is locked
933 std::map<hobject_t, std::list<Context*>> callbacks_for_degraded_object;
934
935 std::map<eversion_t,
936 std::list<
937 std::tuple<OpRequestRef, version_t, int,
938 std::vector<pg_log_op_return_item_t>>>> waiting_for_ondisk;
939
940 void requeue_object_waiters(std::map<hobject_t, std::list<OpRequestRef>>& m);
941 void requeue_op(OpRequestRef op);
942 void requeue_ops(std::list<OpRequestRef> &l);
943
944 // stats that persist lazily
945 object_stat_collection_t unstable_stats;
946
947 // publish stats
948 ceph::mutex pg_stats_publish_lock =
949 ceph::make_mutex("PG::pg_stats_publish_lock");
950 bool pg_stats_publish_valid;
951 pg_stat_t pg_stats_publish;
952
953 friend class TestOpsSocketHook;
954 void publish_stats_to_osd() override;
955
956 bool needs_recovery() const {
957 return recovery_state.needs_recovery();
958 }
959 bool needs_backfill() const {
960 return recovery_state.needs_backfill();
961 }
962
963 bool all_unfound_are_queried_or_lost(const OSDMapRef osdmap) const;
964
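// PGLogEntryHandler forwards log-driven object cleanup (remove, stash,
// rollback, rollforward, trim) to the PGBackend, so replicated and
// erasure-coded backends can implement these operations differently.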
965 struct PGLogEntryHandler : public PGLog::LogEntryHandler {
966 PG *pg;
967 ObjectStore::Transaction *t;
968 PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}
969
970 // LogEntryHandler
971 void remove(const hobject_t &hoid) override {
972 pg->get_pgbackend()->remove(hoid, t);
973 }
974 void try_stash(const hobject_t &hoid, version_t v) override {
975 pg->get_pgbackend()->try_stash(hoid, v, t);
976 }
977 void rollback(const pg_log_entry_t &entry) override {
978 ceph_assert(entry.can_rollback());
979 pg->get_pgbackend()->rollback(entry, t);
980 }
981 void rollforward(const pg_log_entry_t &entry) override {
982 pg->get_pgbackend()->rollforward(entry, t);
983 }
984 void trim(const pg_log_entry_t &entry) override {
985 pg->get_pgbackend()->trim(entry, t);
986 }
987 };
988
989 void update_object_snap_mapping(
990 ObjectStore::Transaction *t, const hobject_t &soid,
991 const std::set<snapid_t> &snaps);
992 void clear_object_snap_mapping(
993 ObjectStore::Transaction *t, const hobject_t &soid);
994 void remove_snap_mapped_object(
995 ObjectStore::Transaction& t, const hobject_t& soid);
996
997 bool have_unfound() const {
998 return recovery_state.have_unfound();
999 }
1000 uint64_t get_num_unfound() const {
1001 return recovery_state.get_num_unfound();
1002 }
1003
1004 virtual void check_local() = 0;
1005
1006 void purge_strays();
1007
1008 void update_heartbeat_peers(std::set<int> peers) override;
1009
1010 Context *finish_sync_event;
1011
1012 Context *finish_recovery();
1013 void _finish_recovery(Context *c);
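// C_PG_FinishRecovery hands itself back to _finish_recovery(), which (in
// PG.cc) is expected to check the context against finish_sync_event so that
// callbacks left over from a superseded recovery attempt are ignored.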
1014 struct C_PG_FinishRecovery : public Context {
1015 PGRef pg;
1016 explicit C_PG_FinishRecovery(PG *p) : pg(p) {}
1017 void finish(int r) override {
1018 pg->_finish_recovery(this);
1019 }
1020 };
1021 void cancel_recovery();
1022 void clear_recovery_state();
1023 virtual void _clear_recovery_state() = 0;
1024 void start_recovery_op(const hobject_t& soid);
1025 void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
1026
1027 virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
1028
1029 friend class C_OSD_RepModify_Commit;
1030 friend struct C_DeleteMore;
1031
1032 // -- backoff --
1033 ceph::mutex backoff_lock = // orders inside Backoff::lock
1034 ceph::make_mutex("PG::backoff_lock");
1035 std::map<hobject_t,std::set<ceph::ref_t<Backoff>>> backoffs;
1036
1037 void add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end);
1038 void release_backoffs(const hobject_t& begin, const hobject_t& end);
1039 void release_backoffs(const hobject_t& o) {
1040 release_backoffs(o, o);
1041 }
1042 void clear_backoffs();
1043
1044 void add_pg_backoff(const ceph::ref_t<Session>& s) {
1045 hobject_t begin = info.pgid.pgid.get_hobj_start();
1046 hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
1047 add_backoff(s, begin, end);
1048 }
1049 public:
1050 void release_pg_backoffs() {
1051 hobject_t begin = info.pgid.pgid.get_hobj_start();
1052 hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
1053 release_backoffs(begin, end);
1054 }
1055
1056 // -- scrub --
1057 protected:
1058 bool scrub_after_recovery;
1059
1060 int active_pushes;
1061
1062 void repair_object(
1063 const hobject_t &soid,
1064 const std::list<std::pair<ScrubMap::object, pg_shard_t> > &ok_peers,
1065 const std::set<pg_shard_t> &bad_peers);
1066
1067 [[nodiscard]] bool ops_blocked_by_scrub() const;
1068 [[nodiscard]] Scrub::scrub_prio_t is_scrub_blocking_ops() const;
1069
1070 void _repair_oinfo_oid(ScrubMap &map);
1071 void _scan_rollback_obs(const std::vector<ghobject_t> &rollback_obs);
1072 /**
1073 * returns true if [begin, end) is good to scrub at this time
1074 * a false return value obliges the implementer to requeue scrub when the
1075 * condition preventing scrub clears
1076 */
1077 virtual bool _range_available_for_scrub(
1078 const hobject_t &begin, const hobject_t &end) = 0;
1079
1080 /**
1081 * Initiate the process that will create our scrub map for the Primary.
1082 * (triggered by MSG_OSD_REP_SCRUB)
1083 */
1084 void replica_scrub(OpRequestRef op, ThreadPool::TPHandle &handle);
1085
1086 // -- recovery state --
1087
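// QueuePeeringEvt is typically registered as a transaction commit callback
// (e.g. via schedule_event_on_commit() above) so that the wrapped peering
// event is only queued once the corresponding ObjectStore transaction has
// committed; finish() re-takes the PG lock before queueing the event.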
1088 struct QueuePeeringEvt : Context {
1089 PGRef pg;
1090 PGPeeringEventRef evt;
1091
1092 template <class EVT>
1093 QueuePeeringEvt(PG *pg, epoch_t epoch, EVT evt) :
1094 pg(pg), evt(std::make_shared<PGPeeringEvent>(epoch, epoch, evt)) {}
1095
1096 QueuePeeringEvt(PG *pg, PGPeeringEventRef evt) :
1097 pg(pg), evt(std::move(evt)) {}
1098
1099 void finish(int r) override {
1100 pg->lock();
1101 pg->queue_peering_event(std::move(evt));
1102 pg->unlock();
1103 }
1104 };
1105
1106
1107 public:
1108 int pg_stat_adjust(osd_stat_t *new_stat);
1109 protected:
1110 bool delete_needs_sleep = false;
1111
1112 protected:
1113 bool state_test(uint64_t m) const { return recovery_state.state_test(m); }
1114 void state_set(uint64_t m) { recovery_state.state_set(m); }
1115 void state_clear(uint64_t m) { recovery_state.state_clear(m); }
1116
1117 bool is_complete() const {
1118 return recovery_state.is_complete();
1119 }
1120 bool should_send_notify() const {
1121 return recovery_state.should_send_notify();
1122 }
1123
1124 bool is_active() const { return recovery_state.is_active(); }
1125 bool is_activating() const { return recovery_state.is_activating(); }
1126 bool is_peering() const { return recovery_state.is_peering(); }
1127 bool is_down() const { return recovery_state.is_down(); }
1128 bool is_recovery_unfound() const { return recovery_state.is_recovery_unfound(); }
1129 bool is_backfill_unfound() const { return recovery_state.is_backfill_unfound(); }
1130 bool is_incomplete() const { return recovery_state.is_incomplete(); }
1131 bool is_clean() const { return recovery_state.is_clean(); }
1132 bool is_degraded() const { return recovery_state.is_degraded(); }
1133 bool is_undersized() const { return recovery_state.is_undersized(); }
1134 bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); } // Primary only
1135 bool is_remapped() const { return recovery_state.is_remapped(); }
1136 bool is_peered() const { return recovery_state.is_peered(); }
1137 bool is_recovering() const { return recovery_state.is_recovering(); }
1138 bool is_premerge() const { return recovery_state.is_premerge(); }
1139 bool is_repair() const { return recovery_state.is_repair(); }
1140 bool is_laggy() const { return state_test(PG_STATE_LAGGY); }
1141 bool is_wait() const { return state_test(PG_STATE_WAIT); }
1142
1143 bool is_empty() const { return recovery_state.is_empty(); }
1144
1145 // pg on-disk state
1146 void do_pending_flush();
1147
1148 public:
1149 virtual void prepare_write(
1150 pg_info_t &info,
1151 pg_info_t &last_written_info,
1152 PastIntervals &past_intervals,
1153 PGLog &pglog,
1154 bool dirty_info,
1155 bool dirty_big_info,
1156 bool need_write_epoch,
1157 ObjectStore::Transaction &t) override;
1158
1159 void write_if_dirty(PeeringCtx &rctx) {
1160 write_if_dirty(rctx.transaction);
1161 }
1162 protected:
1163 void write_if_dirty(ObjectStore::Transaction& t) {
1164 recovery_state.write_if_dirty(t);
1165 }
1166
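// projected_log and projected_last_update cover updates that are in flight on
// the primary but not yet recorded in the on-disk PG log; check_in_progress_op()
// consults them for duplicate-op detection, and get_next_version() allocates
// versions strictly beyond both the projected and persisted log heads.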
1167 PGLog::IndexedLog projected_log;
1168 bool check_in_progress_op(
1169 const osd_reqid_t &r,
1170 eversion_t *version,
1171 version_t *user_version,
1172 int *return_code,
1173 std::vector<pg_log_op_return_item_t> *op_returns) const;
1174 eversion_t projected_last_update;
1175 eversion_t get_next_version() const {
1176 eversion_t at_version(
1177 get_osdmap_epoch(),
1178 projected_last_update.version+1);
1179 ceph_assert(at_version > info.last_update);
1180 ceph_assert(at_version > recovery_state.get_pg_log().get_head());
1181 ceph_assert(at_version > projected_last_update);
1182 return at_version;
1183 }
1184
1185 bool check_log_for_corruption(ObjectStore *store);
1186
1187 std::string get_corrupt_pg_log_name() const;
1188
1189 void update_snap_map(
1190 const std::vector<pg_log_entry_t> &log_entries,
1191 ObjectStore::Transaction& t);
1192
1193 void filter_snapc(std::vector<snapid_t> &snaps);
1194
1195 virtual void kick_snap_trim() = 0;
1196 virtual void snap_trimmer_scrub_complete() = 0;
1197
1198 void queue_recovery();
1199 void queue_scrub_after_repair();
1200 unsigned int get_scrub_priority();
1201
1202 bool try_flush_or_schedule_async() override;
1203 void start_flush_on_transaction(
1204 ObjectStore::Transaction &t) override;
1205
1206 void update_history(const pg_history_t& history) {
1207 recovery_state.update_history(history);
1208 }
1209
1210 // OpRequest queueing
1211 bool can_discard_op(OpRequestRef& op);
1212 bool can_discard_scan(OpRequestRef op);
1213 bool can_discard_backfill(OpRequestRef op);
1214 bool can_discard_request(OpRequestRef& op);
1215
1216 template<typename T, int MSGTYPE>
1217 bool can_discard_replica_op(OpRequestRef& op);
1218
1219 bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
1220 bool old_peering_evt(PGPeeringEventRef evt) {
1221 return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
1222 }
1223 bool have_same_or_newer_map(epoch_t e) {
1224 return e <= get_osdmap_epoch();
1225 }
1226
1227 bool op_has_sufficient_caps(OpRequestRef& op);
1228
1229 // abstract bits
1230 friend struct FlushState;
1231
1232 friend ostream& operator<<(ostream& out, const PG& pg);
1233
1234 protected:
1235 PeeringState recovery_state;
1236
1237 // ref to recovery_state.pool
1238 const PGPool &pool;
1239
1240 // ref to recovery_state.info
1241 const pg_info_t &info;
1242 };
1243
1244 #endif