]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/PG.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / osd / PG.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "PG.h"
16 #include "messages/MOSDRepScrub.h"
17
18 #include "common/errno.h"
19 #include "common/ceph_releases.h"
20 #include "common/config.h"
21 #include "OSD.h"
22 #include "OpRequest.h"
23 #include "osd/scrubber/ScrubStore.h"
24 #include "osd/scrubber/pg_scrubber.h"
25 #include "osd/scheduler/OpSchedulerItem.h"
26 #include "Session.h"
27
28 #include "common/Timer.h"
29 #include "common/perf_counters.h"
30
31 #include "messages/MOSDOp.h"
32 #include "messages/MOSDPGScan.h"
33 #include "messages/MOSDPGBackfill.h"
34 #include "messages/MOSDPGBackfillRemove.h"
35 #include "messages/MBackfillReserve.h"
36 #include "messages/MRecoveryReserve.h"
37 #include "messages/MOSDPGPush.h"
38 #include "messages/MOSDPGPushReply.h"
39 #include "messages/MOSDPGPull.h"
40 #include "messages/MOSDECSubOpWrite.h"
41 #include "messages/MOSDECSubOpWriteReply.h"
42 #include "messages/MOSDECSubOpRead.h"
43 #include "messages/MOSDECSubOpReadReply.h"
44 #include "messages/MOSDPGUpdateLogMissing.h"
45 #include "messages/MOSDPGUpdateLogMissingReply.h"
46 #include "messages/MOSDBackoff.h"
47 #include "messages/MOSDScrubReserve.h"
48 #include "messages/MOSDRepOp.h"
49 #include "messages/MOSDRepOpReply.h"
50 #include "messages/MOSDRepScrubMap.h"
51 #include "messages/MOSDPGRecoveryDelete.h"
52 #include "messages/MOSDPGRecoveryDeleteReply.h"
53
54 #include "common/BackTrace.h"
55 #include "common/EventTrace.h"
56
57 #ifdef WITH_LTTNG
58 #define TRACEPOINT_DEFINE
59 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
60 #include "tracing/pg.h"
61 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
62 #undef TRACEPOINT_DEFINE
63 #else
64 #define tracepoint(...)
65 #endif
66
67 #include <sstream>
68
69 #define dout_context cct
70 #define dout_subsys ceph_subsys_osd
71 #undef dout_prefix
72 #define dout_prefix _prefix(_dout, this)
73
74 using std::list;
75 using std::map;
76 using std::ostringstream;
77 using std::pair;
78 using std::set;
79 using std::string;
80 using std::stringstream;
81 using std::unique_ptr;
82 using std::vector;
83
84 using ceph::bufferlist;
85 using ceph::bufferptr;
86 using ceph::decode;
87 using ceph::encode;
88 using ceph::Formatter;
89
90 using namespace ceph::osd::scheduler;
91
92 template <class T>
93 static ostream& _prefix(std::ostream *_dout, T *t)
94 {
95 return t->gen_prefix(*_dout);
96 }
97
98 void PG::get(const char* tag)
99 {
100 int after = ++ref;
101 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
102 << "tag " << (tag ? tag : "(none") << " "
103 << (after - 1) << " -> " << after << dendl;
104 #ifdef PG_DEBUG_REFS
105 std::lock_guard l(_ref_id_lock);
106 _tag_counts[tag]++;
107 #endif
108 }
109
110 void PG::put(const char* tag)
111 {
112 #ifdef PG_DEBUG_REFS
113 {
114 std::lock_guard l(_ref_id_lock);
115 auto tag_counts_entry = _tag_counts.find(tag);
116 ceph_assert(tag_counts_entry != _tag_counts.end());
117 --tag_counts_entry->second;
118 if (tag_counts_entry->second == 0) {
119 _tag_counts.erase(tag_counts_entry);
120 }
121 }
122 #endif
123 auto local_cct = cct;
124 int after = --ref;
125 lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
126 << "tag " << (tag ? tag : "(none") << " "
127 << (after + 1) << " -> " << after
128 << dendl;
129 if (after == 0)
130 delete this;
131 }
132
133 #ifdef PG_DEBUG_REFS
134 uint64_t PG::get_with_id()
135 {
136 ref++;
137 std::lock_guard l(_ref_id_lock);
138 uint64_t id = ++_ref_id;
139 ClibBackTrace bt(0);
140 stringstream ss;
141 bt.print(ss);
142 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
143 << " got id " << id << " "
144 << (ref - 1) << " -> " << ref
145 << dendl;
146 ceph_assert(!_live_ids.count(id));
147 _live_ids.insert(make_pair(id, ss.str()));
148 return id;
149 }
150
151 void PG::put_with_id(uint64_t id)
152 {
153 int newref = --ref;
154 lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
155 << " put id " << id << " "
156 << (newref + 1) << " -> " << newref
157 << dendl;
158 {
159 std::lock_guard l(_ref_id_lock);
160 ceph_assert(_live_ids.count(id));
161 _live_ids.erase(id);
162 }
163 if (newref)
164 delete this;
165 }
166
167 void PG::dump_live_ids()
168 {
169 std::lock_guard l(_ref_id_lock);
170 dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
171 for (map<uint64_t, string>::iterator i = _live_ids.begin();
172 i != _live_ids.end();
173 ++i) {
174 dout(0) << "\t\tid: " << *i << dendl;
175 }
176 dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
177 for (map<string, uint64_t>::iterator i = _tag_counts.begin();
178 i != _tag_counts.end();
179 ++i) {
180 dout(0) << "\t\tid: " << *i << dendl;
181 }
182 }
183 #endif
184
185 PG::PG(OSDService *o, OSDMapRef curmap,
186 const PGPool &_pool, spg_t p) :
187 pg_whoami(o->whoami, p.shard),
188 pg_id(p),
189 coll(p),
190 osd(o),
191 cct(o->cct),
192 osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
193 snap_mapper(
194 cct,
195 &osdriver,
196 p.ps(),
197 p.get_split_bits(_pool.info.get_pg_num()),
198 _pool.id,
199 p.shard),
200 trace_endpoint("0.0.0.0", 0, "PG"),
201 info_struct_v(0),
202 pgmeta_oid(p.make_pgmeta_oid()),
203 stat_queue_item(this),
204 recovery_queued(false),
205 recovery_ops_active(0),
206 backfill_reserving(false),
207 finish_sync_event(NULL),
208 scrub_after_recovery(false),
209 active_pushes(0),
210 recovery_state(
211 o->cct,
212 pg_whoami,
213 p,
214 _pool,
215 curmap,
216 this,
217 this),
218 pool(recovery_state.get_pool()),
219 info(recovery_state.get_info())
220 {
221 #ifdef PG_DEBUG_REFS
222 osd->add_pgid(p, this);
223 #endif
224 #ifdef WITH_BLKIN
225 std::stringstream ss;
226 ss << "PG " << info.pgid;
227 trace_endpoint.copy_name(ss.str());
228 #endif
229 }
230
231 PG::~PG()
232 {
233 #ifdef PG_DEBUG_REFS
234 osd->remove_pgid(info.pgid, this);
235 #endif
236 }
237
238 void PG::lock(bool no_lockdep) const
239 {
240 #ifdef CEPH_DEBUG_MUTEX
241 _lock.lock(no_lockdep);
242 #else
243 _lock.lock();
244 locked_by = std::this_thread::get_id();
245 #endif
246 // if we have unrecorded dirty state with the lock dropped, there is a bug
247 ceph_assert(!recovery_state.debug_has_dirty_state());
248
249 dout(30) << "lock" << dendl;
250 }
251
252 bool PG::is_locked() const
253 {
254 return ceph_mutex_is_locked(_lock);
255 }
256
257 void PG::unlock() const
258 {
259 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
260 ceph_assert(!recovery_state.debug_has_dirty_state());
261 #ifndef CEPH_DEBUG_MUTEX
262 locked_by = {};
263 #endif
264 _lock.unlock();
265 }
266
267 std::ostream& PG::gen_prefix(std::ostream& out) const
268 {
269 OSDMapRef mapref = recovery_state.get_osdmap();
270 #ifdef CEPH_DEBUG_MUTEX
271 if (_lock.is_locked_by_me()) {
272 #else
273 if (locked_by == std::this_thread::get_id()) {
274 #endif
275 out << "osd." << osd->whoami
276 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
277 << " " << *this << " ";
278 } else {
279 out << "osd." << osd->whoami
280 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
281 << " pg[" << pg_id.pgid << "(unlocked)] ";
282 }
283 return out;
284 }
285
286 PerfCounters &PG::get_peering_perf() {
287 return *(osd->recoverystate_perf);
288 }
289
290 PerfCounters &PG::get_perf_logger() {
291 return *(osd->logger);
292 }
293
294 void PG::log_state_enter(const char *state) {
295 osd->pg_recovery_stats.log_enter(state);
296 }
297
298 void PG::log_state_exit(
299 const char *state_name, utime_t enter_time,
300 uint64_t events, utime_t event_dur) {
301 osd->pg_recovery_stats.log_exit(
302 state_name, ceph_clock_now() - enter_time, events, event_dur);
303 }
304
305 /********* PG **********/
306
307 void PG::remove_snap_mapped_object(
308 ObjectStore::Transaction &t, const hobject_t &soid)
309 {
310 t.remove(
311 coll,
312 ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
313 clear_object_snap_mapping(&t, soid);
314 }
315
316 void PG::clear_object_snap_mapping(
317 ObjectStore::Transaction *t, const hobject_t &soid)
318 {
319 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
320 if (soid.snap < CEPH_MAXSNAP) {
321 int r = snap_mapper.remove_oid(
322 soid,
323 &_t);
324 if (!(r == 0 || r == -ENOENT)) {
325 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
326 ceph_abort();
327 }
328 }
329 }
330
331 void PG::update_object_snap_mapping(
332 ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
333 {
334 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
335 ceph_assert(soid.snap < CEPH_MAXSNAP);
336 int r = snap_mapper.remove_oid(
337 soid,
338 &_t);
339 if (!(r == 0 || r == -ENOENT)) {
340 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
341 ceph_abort();
342 }
343 snap_mapper.add_oid(
344 soid,
345 snaps,
346 &_t);
347 }
348
349 /******* PG ***********/
350 void PG::clear_primary_state()
351 {
352 dout(20) << __func__ << dendl;
353
354 projected_log = PGLog::IndexedLog();
355
356 snap_trimq.clear();
357 snap_trimq_repeat.clear();
358 finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
359 release_pg_backoffs();
360
361 if (m_scrubber) {
362 m_scrubber->discard_replica_reservations();
363 }
364 scrub_after_recovery = false;
365
366 agent_clear();
367 }
368
369
370 bool PG::op_has_sufficient_caps(OpRequestRef& op)
371 {
372 // only check MOSDOp
373 if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
374 return true;
375
376 auto req = op->get_req<MOSDOp>();
377 auto priv = req->get_connection()->get_priv();
378 auto session = static_cast<Session*>(priv.get());
379 if (!session) {
380 dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
381 return false;
382 }
383 OSDCap& caps = session->caps;
384 priv.reset();
385
386 const string &key = req->get_hobj().get_key().empty() ?
387 req->get_oid().name :
388 req->get_hobj().get_key();
389
390 bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
391 pool.info.application_metadata,
392 key,
393 op->need_read_cap(),
394 op->need_write_cap(),
395 op->classes(),
396 session->get_peer_socket_addr());
397
398 dout(20) << "op_has_sufficient_caps "
399 << "session=" << session
400 << " pool=" << pool.id << " (" << pool.name
401 << " " << req->get_hobj().nspace
402 << ")"
403 << " pool_app_metadata=" << pool.info.application_metadata
404 << " need_read_cap=" << op->need_read_cap()
405 << " need_write_cap=" << op->need_write_cap()
406 << " classes=" << op->classes()
407 << " -> " << (cap ? "yes" : "NO")
408 << dendl;
409 return cap;
410 }
411
412 void PG::queue_recovery()
413 {
414 if (!is_primary() || !is_peered()) {
415 dout(10) << "queue_recovery -- not primary or not peered " << dendl;
416 ceph_assert(!recovery_queued);
417 } else if (recovery_queued) {
418 dout(10) << "queue_recovery -- already queued" << dendl;
419 } else {
420 dout(10) << "queue_recovery -- queuing" << dendl;
421 recovery_queued = true;
422 osd->queue_for_recovery(this);
423 }
424 }
425
426 void PG::queue_scrub_after_repair()
427 {
428 dout(10) << __func__ << dendl;
429 ceph_assert(ceph_mutex_is_locked(_lock));
430
431 m_planned_scrub.must_deep_scrub = true;
432 m_planned_scrub.check_repair = true;
433 m_planned_scrub.must_scrub = true;
434
435 if (is_scrub_queued_or_active()) {
436 dout(10) << __func__ << ": scrubbing already ("
437 << (is_scrubbing() ? "active)" : "queued)") << dendl;
438 return;
439 }
440
441 m_scrubber->set_op_parameters(m_planned_scrub);
442 dout(15) << __func__ << ": queueing" << dendl;
443
444 osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
445 }
446
447 unsigned PG::get_scrub_priority()
448 {
449 // a higher value -> a higher priority
450 int64_t pool_scrub_priority =
451 pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
452 return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
453 }
454
455 Context *PG::finish_recovery()
456 {
457 dout(10) << "finish_recovery" << dendl;
458 ceph_assert(info.last_complete == info.last_update);
459
460 clear_recovery_state();
461
462 /*
463 * sync all this before purging strays. but don't block!
464 */
465 finish_sync_event = new C_PG_FinishRecovery(this);
466 return finish_sync_event;
467 }
468
469 void PG::_finish_recovery(Context* c)
470 {
471 dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
472 << is_clean() << dendl;
473
474 std::scoped_lock locker{*this};
475 if (recovery_state.is_deleting() || !is_clean()) {
476 dout(10) << __func__ << " raced with delete or repair" << dendl;
477 return;
478 }
479 // When recovery is initiated by a repair, that flag is left on
480 state_clear(PG_STATE_REPAIR);
481 if (c == finish_sync_event) {
482 dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
483 finish_sync_event = 0;
484 recovery_state.purge_strays();
485
486 publish_stats_to_osd();
487
488 if (scrub_after_recovery) {
489 dout(10) << "_finish_recovery requeueing for scrub" << dendl;
490 scrub_after_recovery = false;
491 queue_scrub_after_repair();
492 }
493 } else {
494 dout(10) << "_finish_recovery -- stale" << dendl;
495 }
496 }
497
498 void PG::start_recovery_op(const hobject_t& soid)
499 {
500 dout(10) << "start_recovery_op " << soid
501 #ifdef DEBUG_RECOVERY_OIDS
502 << " (" << recovering_oids << ")"
503 #endif
504 << dendl;
505 ceph_assert(recovery_ops_active >= 0);
506 recovery_ops_active++;
507 #ifdef DEBUG_RECOVERY_OIDS
508 recovering_oids.insert(soid);
509 #endif
510 osd->start_recovery_op(this, soid);
511 }
512
513 void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
514 {
515 dout(10) << "finish_recovery_op " << soid
516 #ifdef DEBUG_RECOVERY_OIDS
517 << " (" << recovering_oids << ")"
518 #endif
519 << dendl;
520 ceph_assert(recovery_ops_active > 0);
521 recovery_ops_active--;
522 #ifdef DEBUG_RECOVERY_OIDS
523 ceph_assert(recovering_oids.count(soid));
524 recovering_oids.erase(recovering_oids.find(soid));
525 #endif
526 osd->finish_recovery_op(this, soid, dequeue);
527
528 if (!dequeue) {
529 queue_recovery();
530 }
531 }
532
533 void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
534 {
535 recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);
536
537 child->update_snap_mapper_bits(split_bits);
538
539 child->snap_trimq = snap_trimq;
540 child->snap_trimq_repeat = snap_trimq_repeat;
541
542 _split_into(child_pgid, child, split_bits);
543
544 // release all backoffs for simplicity
545 release_backoffs(hobject_t(), hobject_t::get_max());
546 }
547
548 void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
549 {
550 recovery_state.start_split_stats(childpgs, out);
551 }
552
553 void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
554 {
555 recovery_state.finish_split_stats(stats, t);
556 }
557
558 void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
559 unsigned split_bits,
560 const pg_merge_meta_t& last_pg_merge_meta)
561 {
562 dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
563 << dendl;
564 map<spg_t, PeeringState*> source_ps;
565 for (auto &&source : sources) {
566 source_ps.emplace(source.first, &source.second->recovery_state);
567 }
568 recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);
569
570 for (auto& i : sources) {
571 auto& source = i.second;
572 // wipe out source's pgmeta
573 rctx.transaction.remove(source->coll, source->pgmeta_oid);
574
575 // merge (and destroy source collection)
576 rctx.transaction.merge_collection(source->coll, coll, split_bits);
577 }
578
579 // merge_collection does this, but maybe all of our sources were missing.
580 rctx.transaction.collection_set_bits(coll, split_bits);
581
582 snap_mapper.update_bits(split_bits);
583 }
584
585 void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
586 {
587 auto con = s->con;
588 if (!con) // OSD::ms_handle_reset clears s->con without a lock
589 return;
590 auto b = s->have_backoff(info.pgid, begin);
591 if (b) {
592 derr << __func__ << " already have backoff for " << s << " begin " << begin
593 << " " << *b << dendl;
594 ceph_abort();
595 }
596 std::lock_guard l(backoff_lock);
597 b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
598 backoffs[begin].insert(b);
599 s->add_backoff(b);
600 dout(10) << __func__ << " session " << s << " added " << *b << dendl;
601 con->send_message(
602 new MOSDBackoff(
603 info.pgid,
604 get_osdmap_epoch(),
605 CEPH_OSD_BACKOFF_OP_BLOCK,
606 b->id,
607 begin,
608 end));
609 }
610
611 void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
612 {
613 dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
614 vector<ceph::ref_t<Backoff>> bv;
615 {
616 std::lock_guard l(backoff_lock);
617 auto p = backoffs.lower_bound(begin);
618 while (p != backoffs.end()) {
619 int r = cmp(p->first, end);
620 dout(20) << __func__ << " ? " << r << " " << p->first
621 << " " << p->second << dendl;
622 // note: must still examine begin=end=p->first case
623 if (r > 0 || (r == 0 && begin < end)) {
624 break;
625 }
626 dout(20) << __func__ << " checking " << p->first
627 << " " << p->second << dendl;
628 auto q = p->second.begin();
629 while (q != p->second.end()) {
630 dout(20) << __func__ << " checking " << *q << dendl;
631 int rr = cmp((*q)->begin, begin);
632 if (rr == 0 || (rr > 0 && (*q)->end < end)) {
633 bv.push_back(*q);
634 q = p->second.erase(q);
635 } else {
636 ++q;
637 }
638 }
639 if (p->second.empty()) {
640 p = backoffs.erase(p);
641 } else {
642 ++p;
643 }
644 }
645 }
646 for (auto b : bv) {
647 std::lock_guard l(b->lock);
648 dout(10) << __func__ << " " << *b << dendl;
649 if (b->session) {
650 ceph_assert(b->pg == this);
651 ConnectionRef con = b->session->con;
652 if (con) { // OSD::ms_handle_reset clears s->con without a lock
653 con->send_message(
654 new MOSDBackoff(
655 info.pgid,
656 get_osdmap_epoch(),
657 CEPH_OSD_BACKOFF_OP_UNBLOCK,
658 b->id,
659 b->begin,
660 b->end));
661 }
662 if (b->is_new()) {
663 b->state = Backoff::STATE_DELETING;
664 } else {
665 b->session->rm_backoff(b);
666 b->session.reset();
667 }
668 b->pg.reset();
669 }
670 }
671 }
672
673 void PG::clear_backoffs()
674 {
675 dout(10) << __func__ << " " << dendl;
676 map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
677 {
678 std::lock_guard l(backoff_lock);
679 ls.swap(backoffs);
680 }
681 for (auto& p : ls) {
682 for (auto& b : p.second) {
683 std::lock_guard l(b->lock);
684 dout(10) << __func__ << " " << *b << dendl;
685 if (b->session) {
686 ceph_assert(b->pg == this);
687 if (b->is_new()) {
688 b->state = Backoff::STATE_DELETING;
689 } else {
690 b->session->rm_backoff(b);
691 b->session.reset();
692 }
693 b->pg.reset();
694 }
695 }
696 }
697 }
698
699 // called by Session::clear_backoffs()
700 void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
701 {
702 dout(10) << __func__ << " " << *b << dendl;
703 std::lock_guard l(backoff_lock);
704 ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
705 ceph_assert(b->pg == this);
706 auto p = backoffs.find(b->begin);
707 // may race with release_backoffs()
708 if (p != backoffs.end()) {
709 auto q = p->second.find(b);
710 if (q != p->second.end()) {
711 p->second.erase(q);
712 if (p->second.empty()) {
713 backoffs.erase(p);
714 }
715 }
716 }
717 }
718
719 void PG::clear_recovery_state()
720 {
721 dout(10) << "clear_recovery_state" << dendl;
722
723 finish_sync_event = 0;
724
725 hobject_t soid;
726 while (recovery_ops_active > 0) {
727 #ifdef DEBUG_RECOVERY_OIDS
728 soid = *recovering_oids.begin();
729 #endif
730 finish_recovery_op(soid, true);
731 }
732
733 backfill_info.clear();
734 peer_backfill_info.clear();
735 waiting_on_backfill.clear();
736 _clear_recovery_state(); // pg impl specific hook
737 }
738
739 void PG::cancel_recovery()
740 {
741 dout(10) << "cancel_recovery" << dendl;
742 clear_recovery_state();
743 }
744
745 void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
746 {
747 std::lock_guard l(heartbeat_peer_lock);
748 probe_targets.clear();
749 for (set<pg_shard_t>::iterator i = probe_set.begin();
750 i != probe_set.end();
751 ++i) {
752 probe_targets.insert(i->osd);
753 }
754 }
755
756 void PG::send_cluster_message(
757 int target, MessageRef m,
758 epoch_t epoch, bool share_map_update)
759 {
760 ConnectionRef con = osd->get_con_osd_cluster(
761 target, get_osdmap_epoch());
762 if (!con) {
763 return;
764 }
765
766 if (share_map_update) {
767 osd->maybe_share_map(con.get(), get_osdmap());
768 }
769 osd->send_message_osd_cluster(m, con.get());
770 }
771
772 void PG::clear_probe_targets()
773 {
774 std::lock_guard l(heartbeat_peer_lock);
775 probe_targets.clear();
776 }
777
778 void PG::update_heartbeat_peers(set<int> new_peers)
779 {
780 bool need_update = false;
781 heartbeat_peer_lock.lock();
782 if (new_peers == heartbeat_peers) {
783 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
784 } else {
785 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
786 heartbeat_peers.swap(new_peers);
787 need_update = true;
788 }
789 heartbeat_peer_lock.unlock();
790
791 if (need_update)
792 osd->need_heartbeat_peer_update();
793 }
794
795
796 bool PG::check_in_progress_op(
797 const osd_reqid_t &r,
798 eversion_t *version,
799 version_t *user_version,
800 int *return_code,
801 vector<pg_log_op_return_item_t> *op_returns
802 ) const
803 {
804 return (
805 projected_log.get_request(r, version, user_version, return_code,
806 op_returns) ||
807 recovery_state.get_pg_log().get_log().get_request(
808 r, version, user_version, return_code, op_returns));
809 }
810
811 void PG::publish_stats_to_osd()
812 {
813 if (!is_primary())
814 return;
815
816 ceph_assert(m_scrubber);
817 recovery_state.update_stats_wo_resched(
818 [scrubber = m_scrubber.get()](pg_history_t& hist,
819 pg_stat_t& info) mutable -> void {
820 info.scrub_sched_status = scrubber->get_schedule();
821 });
822
823 std::lock_guard l{pg_stats_publish_lock};
824 auto stats =
825 recovery_state.prepare_stats_for_publish(pg_stats_publish, unstable_stats);
826 if (stats) {
827 pg_stats_publish = std::move(stats);
828 }
829 }
830
831 unsigned PG::get_target_pg_log_entries() const
832 {
833 return osd->get_target_pg_log_entries();
834 }
835
836 void PG::clear_publish_stats()
837 {
838 dout(15) << "clear_stats" << dendl;
839 std::lock_guard l{pg_stats_publish_lock};
840 pg_stats_publish.reset();
841 }
842
843 /**
844 * initialize a newly instantiated pg
845 *
846 * Initialize PG state, as when a PG is initially created, or when it
847 * is first instantiated on the current node.
848 *
849 * @param role our role/rank
850 * @param newup up set
851 * @param newacting acting set
852 * @param history pg history
853 * @param pi past_intervals
854 * @param backfill true if info should be marked as backfill
855 * @param t transaction to write out our new state in
856 */
857 void PG::init(
858 int role,
859 const vector<int>& newup, int new_up_primary,
860 const vector<int>& newacting, int new_acting_primary,
861 const pg_history_t& history,
862 const PastIntervals& pi,
863 ObjectStore::Transaction &t)
864 {
865 recovery_state.init(
866 role, newup, new_up_primary, newacting,
867 new_acting_primary, history, pi, t);
868 }
869
870 void PG::shutdown()
871 {
872 ch->flush();
873 std::scoped_lock l{*this};
874 recovery_state.shutdown();
875 on_shutdown();
876 }
877
878 #pragma GCC diagnostic ignored "-Wpragmas"
879 #pragma GCC diagnostic push
880 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
881
882 void PG::upgrade(ObjectStore *store)
883 {
884 dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
885 << dendl;
886 ceph_assert(info_struct_v <= 10);
887 ObjectStore::Transaction t;
888
889 // <do upgrade steps here>
890
891 // finished upgrade!
892 ceph_assert(info_struct_v == 10);
893
894 // update infover_key
895 if (info_struct_v < pg_latest_struct_v) {
896 map<string,bufferlist> v;
897 __u8 ver = pg_latest_struct_v;
898 encode(ver, v[string(infover_key)]);
899 t.omap_setkeys(coll, pgmeta_oid, v);
900 }
901
902 recovery_state.force_write_state(t);
903
904 ObjectStore::CollectionHandle ch = store->open_collection(coll);
905 int r = store->queue_transaction(ch, std::move(t));
906 if (r != 0) {
907 derr << __func__ << ": queue_transaction returned "
908 << cpp_strerror(r) << dendl;
909 ceph_abort();
910 }
911 ceph_assert(r == 0);
912
913 C_SaferCond waiter;
914 if (!ch->flush_commit(&waiter)) {
915 waiter.wait();
916 }
917 }
918
919 #pragma GCC diagnostic pop
920 #pragma GCC diagnostic warning "-Wpragmas"
921
922 void PG::prepare_write(
923 pg_info_t &info,
924 pg_info_t &last_written_info,
925 PastIntervals &past_intervals,
926 PGLog &pglog,
927 bool dirty_info,
928 bool dirty_big_info,
929 bool need_write_epoch,
930 ObjectStore::Transaction &t)
931 {
932 info.stats.stats.add(unstable_stats);
933 unstable_stats.clear();
934 map<string,bufferlist> km;
935 string key_to_remove;
936 if (dirty_big_info || dirty_info) {
937 int ret = prepare_info_keymap(
938 cct,
939 &km,
940 &key_to_remove,
941 get_osdmap_epoch(),
942 info,
943 last_written_info,
944 past_intervals,
945 dirty_big_info,
946 need_write_epoch,
947 cct->_conf->osd_fast_info,
948 osd->logger,
949 this);
950 ceph_assert(ret == 0);
951 }
952 pglog.write_log_and_missing(
953 t, &km, coll, pgmeta_oid, pool.info.require_rollback());
954 if (!km.empty())
955 t.omap_setkeys(coll, pgmeta_oid, km);
956 if (!key_to_remove.empty())
957 t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
958 }
959
960 #pragma GCC diagnostic ignored "-Wpragmas"
961 #pragma GCC diagnostic push
962 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
963
964 bool PG::_has_removal_flag(ObjectStore *store,
965 spg_t pgid)
966 {
967 coll_t coll(pgid);
968 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
969
970 // first try new way
971 set<string> keys;
972 keys.insert("_remove");
973 map<string,bufferlist> values;
974 auto ch = store->open_collection(coll);
975 ceph_assert(ch);
976 if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
977 values.size() == 1)
978 return true;
979
980 return false;
981 }
982
983 int PG::peek_map_epoch(ObjectStore *store,
984 spg_t pgid,
985 epoch_t *pepoch)
986 {
987 coll_t coll(pgid);
988 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
989 epoch_t cur_epoch = 0;
990
991 // validate collection name
992 ceph_assert(coll.is_pg());
993
994 // try for v8
995 set<string> keys;
996 keys.insert(string(infover_key));
997 keys.insert(string(epoch_key));
998 map<string,bufferlist> values;
999 auto ch = store->open_collection(coll);
1000 ceph_assert(ch);
1001 int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
1002 if (r == 0) {
1003 ceph_assert(values.size() == 2);
1004
1005 // sanity check version
1006 auto bp = values[string(infover_key)].cbegin();
1007 __u8 struct_v = 0;
1008 decode(struct_v, bp);
1009 ceph_assert(struct_v >= 8);
1010
1011 // get epoch
1012 bp = values[string(epoch_key)].begin();
1013 decode(cur_epoch, bp);
1014 } else {
1015 // probably bug 10617; see OSD::load_pgs()
1016 return -1;
1017 }
1018
1019 *pepoch = cur_epoch;
1020 return 0;
1021 }
1022
1023 #pragma GCC diagnostic pop
1024 #pragma GCC diagnostic warning "-Wpragmas"
1025
1026 bool PG::check_log_for_corruption(ObjectStore *store)
1027 {
1028 /// TODO: this method needs to work with the omap log
1029 return true;
1030 }
1031
1032 //! Get the name we're going to save our corrupt page log as
1033 std::string PG::get_corrupt_pg_log_name() const
1034 {
1035 const int MAX_BUF = 512;
1036 char buf[MAX_BUF];
1037 struct tm tm_buf;
1038 time_t my_time(time(NULL));
1039 const struct tm *t = localtime_r(&my_time, &tm_buf);
1040 int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
1041 if (ret == 0) {
1042 dout(0) << "strftime failed" << dendl;
1043 return "corrupt_log_unknown_time";
1044 }
1045 string out(buf);
1046 out += stringify(info.pgid);
1047 return out;
1048 }
1049
1050 int PG::read_info(
1051 ObjectStore *store, spg_t pgid, const coll_t &coll,
1052 pg_info_t &info, PastIntervals &past_intervals,
1053 __u8 &struct_v)
1054 {
1055 set<string> keys;
1056 keys.insert(string(infover_key));
1057 keys.insert(string(info_key));
1058 keys.insert(string(biginfo_key));
1059 keys.insert(string(fastinfo_key));
1060 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
1061 map<string,bufferlist> values;
1062 auto ch = store->open_collection(coll);
1063 ceph_assert(ch);
1064 int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
1065 ceph_assert(r == 0);
1066 ceph_assert(values.size() == 3 ||
1067 values.size() == 4);
1068
1069 auto p = values[string(infover_key)].cbegin();
1070 decode(struct_v, p);
1071 ceph_assert(struct_v >= 10);
1072
1073 p = values[string(info_key)].begin();
1074 decode(info, p);
1075
1076 p = values[string(biginfo_key)].begin();
1077 decode(past_intervals, p);
1078 decode(info.purged_snaps, p);
1079
1080 p = values[string(fastinfo_key)].begin();
1081 if (!p.end()) {
1082 pg_fast_info_t fast;
1083 decode(fast, p);
1084 fast.try_apply_to(&info);
1085 }
1086 return 0;
1087 }
1088
1089 void PG::read_state(ObjectStore *store)
1090 {
1091 PastIntervals past_intervals_from_disk;
1092 pg_info_t info_from_disk;
1093 int r = read_info(
1094 store,
1095 pg_id,
1096 coll,
1097 info_from_disk,
1098 past_intervals_from_disk,
1099 info_struct_v);
1100 ceph_assert(r >= 0);
1101
1102 if (info_struct_v < pg_compat_struct_v) {
1103 derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
1104 << " an older version first." << dendl;
1105 ceph_abort_msg("PG too old to upgrade");
1106 }
1107
1108 recovery_state.init_from_disk_state(
1109 std::move(info_from_disk),
1110 std::move(past_intervals_from_disk),
1111 [this, store] (PGLog &pglog) {
1112 ostringstream oss;
1113 pglog.read_log_and_missing(
1114 store,
1115 ch,
1116 pgmeta_oid,
1117 info,
1118 oss,
1119 cct->_conf->osd_ignore_stale_divergent_priors,
1120 cct->_conf->osd_debug_verify_missing_on_start);
1121
1122 if (oss.tellp())
1123 osd->clog->error() << oss.str();
1124 return 0;
1125 });
1126
1127 if (info_struct_v < pg_latest_struct_v) {
1128 upgrade(store);
1129 }
1130
1131 // initialize current mapping
1132 {
1133 int primary, up_primary;
1134 vector<int> acting, up;
1135 get_osdmap()->pg_to_up_acting_osds(
1136 pg_id.pgid, &up, &up_primary, &acting, &primary);
1137 recovery_state.init_primary_up_acting(
1138 up,
1139 acting,
1140 up_primary,
1141 primary);
1142 recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
1143 }
1144
1145 // init pool options
1146 store->set_collection_opts(ch, pool.info.opts);
1147
1148 PeeringCtx rctx;
1149 handle_initialize(rctx);
1150 // note: we don't activate here because we know the OSD will advance maps
1151 // during boot.
1152 write_if_dirty(rctx.transaction);
1153 store->queue_transaction(ch, std::move(rctx.transaction));
1154 }
1155
1156 void PG::update_snap_map(
1157 const vector<pg_log_entry_t> &log_entries,
1158 ObjectStore::Transaction &t)
1159 {
1160 for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
1161 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
1162 if (i->soid.snap < CEPH_MAXSNAP) {
1163 if (i->is_delete()) {
1164 int r = snap_mapper.remove_oid(
1165 i->soid,
1166 &_t);
1167 if (r)
1168 derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
1169 // On removal tolerate missing key corruption
1170 ceph_assert(r == 0 || r == -ENOENT);
1171 } else if (i->is_update()) {
1172 ceph_assert(i->snaps.length() > 0);
1173 vector<snapid_t> snaps;
1174 bufferlist snapbl = i->snaps;
1175 auto p = snapbl.cbegin();
1176 try {
1177 decode(snaps, p);
1178 } catch (...) {
1179 derr << __func__ << " decode snaps failure on " << *i << dendl;
1180 snaps.clear();
1181 }
1182 set<snapid_t> _snaps(snaps.begin(), snaps.end());
1183
1184 if (i->is_clone() || i->is_promote()) {
1185 snap_mapper.add_oid(
1186 i->soid,
1187 _snaps,
1188 &_t);
1189 } else if (i->is_modify()) {
1190 int r = snap_mapper.update_snaps(
1191 i->soid,
1192 _snaps,
1193 0,
1194 &_t);
1195 ceph_assert(r == 0);
1196 } else {
1197 ceph_assert(i->is_clean());
1198 }
1199 }
1200 }
1201 }
1202 }
1203
1204 /**
1205 * filter trimming|trimmed snaps out of snapcontext
1206 */
1207 void PG::filter_snapc(vector<snapid_t> &snaps)
1208 {
1209 // nothing needs to trim, we can return immediately
1210 if (snap_trimq.empty() && info.purged_snaps.empty())
1211 return;
1212
1213 bool filtering = false;
1214 vector<snapid_t> newsnaps;
1215 for (vector<snapid_t>::iterator p = snaps.begin();
1216 p != snaps.end();
1217 ++p) {
1218 if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
1219 if (!filtering) {
1220 // start building a new vector with what we've seen so far
1221 dout(10) << "filter_snapc filtering " << snaps << dendl;
1222 newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
1223 filtering = true;
1224 }
1225 dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
1226 } else {
1227 if (filtering)
1228 newsnaps.push_back(*p); // continue building new vector
1229 }
1230 }
1231 if (filtering) {
1232 snaps.swap(newsnaps);
1233 dout(10) << "filter_snapc result " << snaps << dendl;
1234 }
1235 }
1236
1237 void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
1238 {
1239 for (auto it = m.begin(); it != m.end(); ++it)
1240 requeue_ops(it->second);
1241 m.clear();
1242 }
1243
1244 void PG::requeue_op(OpRequestRef op)
1245 {
1246 auto p = waiting_for_map.find(op->get_source());
1247 if (p != waiting_for_map.end()) {
1248 dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
1249 << dendl;
1250 p->second.push_front(op);
1251 } else {
1252 dout(20) << __func__ << " " << op << dendl;
1253 osd->enqueue_front(
1254 OpSchedulerItem(
1255 unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
1256 op->get_req()->get_cost(),
1257 op->get_req()->get_priority(),
1258 op->get_req()->get_recv_stamp(),
1259 op->get_req()->get_source().num(),
1260 get_osdmap_epoch()));
1261 }
1262 }
1263
1264 void PG::requeue_ops(list<OpRequestRef> &ls)
1265 {
1266 for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
1267 i != ls.rend();
1268 ++i) {
1269 requeue_op(*i);
1270 }
1271 ls.clear();
1272 }
1273
1274 void PG::requeue_map_waiters()
1275 {
1276 epoch_t epoch = get_osdmap_epoch();
1277 auto p = waiting_for_map.begin();
1278 while (p != waiting_for_map.end()) {
1279 if (epoch < p->second.front()->min_epoch) {
1280 dout(20) << __func__ << " " << p->first << " front op "
1281 << p->second.front() << " must still wait, doing nothing"
1282 << dendl;
1283 ++p;
1284 } else {
1285 dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
1286 for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
1287 auto req = *q;
1288 osd->enqueue_front(OpSchedulerItem(
1289 unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
1290 req->get_req()->get_cost(),
1291 req->get_req()->get_priority(),
1292 req->get_req()->get_recv_stamp(),
1293 req->get_req()->get_source().num(),
1294 epoch));
1295 }
1296 p = waiting_for_map.erase(p);
1297 }
1298 }
1299 }
1300
1301 bool PG::get_must_scrub() const
1302 {
1303 dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
1304 return m_planned_scrub.must_scrub;
1305 }
1306
1307 unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
1308 {
1309 return m_scrubber->scrub_requeue_priority(with_priority);
1310 }
1311
1312 unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
1313 {
1314 return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
1315 }
1316
1317 // ==========================================================================================
1318 // SCRUB
1319
1320 /*
1321 * implementation note:
1322 * PG::sched_scrub() is called only once per a specific scrub session.
1323 * That call commits us to the whatever choices are made (deep/shallow, etc').
1324 * Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
1325 * PgScrubber's m_flags, then cleared.
1326 */
1327 Scrub::schedule_result_t PG::sched_scrub()
1328 {
1329 using Scrub::schedule_result_t;
1330 dout(15) << __func__ << " pg(" << info.pgid
1331 << (is_active() ? ") <active>" : ") <not-active>")
1332 << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
1333 ceph_assert(ceph_mutex_is_locked(_lock));
1334 ceph_assert(m_scrubber);
1335
1336 if (is_scrub_queued_or_active()) {
1337 return schedule_result_t::already_started;
1338 }
1339
1340 if (!is_primary() || !is_active() || !is_clean()) {
1341 return schedule_result_t::bad_pg_state;
1342 }
1343
1344 if (state_test(PG_STATE_SNAPTRIM) || state_test(PG_STATE_SNAPTRIM_WAIT)) {
1345 // note that the trimmer checks scrub status when setting 'snaptrim_wait'
1346 // (on the transition from NotTrimming to Trimming/WaitReservation),
1347 // i.e. some time before setting 'snaptrim'.
1348 dout(10) << __func__ << ": cannot scrub while snap-trimming" << dendl;
1349 return schedule_result_t::bad_pg_state;
1350 }
1351
1352 // analyse the combination of the requested scrub flags, the osd/pool configuration
1353 // and the PG status to determine whether we should scrub now, and what type of scrub
1354 // should that be.
1355 auto updated_flags = verify_scrub_mode();
1356 if (!updated_flags) {
1357 // the stars do not align for starting a scrub for this PG at this time
1358 // (due to configuration or priority issues)
1359 // The reason was already reported by the callee.
1360 dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
1361 return schedule_result_t::preconditions;
1362 }
1363
1364 // try to reserve the local OSD resources. If failing: no harm. We will
1365 // be retried by the OSD later on.
1366 if (!m_scrubber->reserve_local()) {
1367 dout(10) << __func__ << ": failed to reserve locally" << dendl;
1368 return schedule_result_t::no_local_resources;
1369 }
1370
1371 // can commit to the updated flags now, as nothing will stop the scrub
1372 m_planned_scrub = *updated_flags;
1373
1374 // An interrupted recovery repair could leave this set.
1375 state_clear(PG_STATE_REPAIR);
1376
1377 // Pass control to the scrubber. It is the scrubber that handles the replicas'
1378 // resources reservations.
1379 m_scrubber->set_op_parameters(m_planned_scrub);
1380
1381 dout(10) << __func__ << ": queueing" << dendl;
1382 osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
1383 return schedule_result_t::scrub_initiated;
1384 }
1385
1386 double PG::next_deepscrub_interval() const
1387 {
1388 double deep_scrub_interval =
1389 pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
1390 if (deep_scrub_interval <= 0.0)
1391 deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
1392 return info.history.last_deep_scrub_stamp + deep_scrub_interval;
1393 }
1394
1395 bool PG::is_time_for_deep(bool allow_deep_scrub,
1396 bool allow_scrub,
1397 bool has_deep_errors,
1398 const requested_scrub_t& planned) const
1399 {
1400 dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
1401 << allow_deep_scrub << dendl;
1402
1403 if (!allow_deep_scrub)
1404 return false;
1405
1406 if (planned.need_auto) {
1407 dout(10) << __func__ << ": need repair after scrub errors" << dendl;
1408 return true;
1409 }
1410
1411 if (ceph_clock_now() >= next_deepscrub_interval()) {
1412 dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
1413 << next_deepscrub_interval() << ")" << dendl;
1414 return true;
1415 }
1416
1417 if (has_deep_errors) {
1418 osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
1419 << " Deep scrub errors, upgrading scrub to deep-scrub";
1420 return true;
1421 }
1422
1423 // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function is
1424 // called often, we will probably be deep-scrubbing most of the time.
1425 if (allow_scrub) {
1426 bool deep_coin_flip =
1427 (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
1428
1429 dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
1430 << " deep_coin_flip=" << deep_coin_flip << dendl;
1431
1432 if (deep_coin_flip)
1433 return true;
1434 }
1435
1436 return false;
1437 }
1438
1439 bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
1440 bool try_to_auto_repair,
1441 bool allow_regular_scrub,
1442 bool has_deep_errors,
1443 requested_scrub_t& planned) const
1444
1445 {
1446 ceph_assert(!planned.must_deep_scrub && !planned.must_repair);
1447
1448 if (!allow_deep_scrub && has_deep_errors) {
1449 osd->clog->error()
1450 << "osd." << osd->whoami << " pg " << info.pgid
1451 << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
1452 return false;
1453 }
1454
1455 if (allow_deep_scrub) {
1456 // Initial entry and scheduled scrubs without nodeep_scrub set get here
1457
1458 planned.time_for_deep =
1459 is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);
1460
1461 if (try_to_auto_repair) {
1462 if (planned.time_for_deep) {
1463 dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
1464 planned.auto_repair = true;
1465 } else if (allow_regular_scrub) {
1466 dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
1467 << dendl;
1468 planned.deep_scrub_on_error = true;
1469 }
1470 }
1471 }
1472
1473 dout(20) << __func__ << " updated flags: " << planned
1474 << " allow_regular_scrub: " << allow_regular_scrub << dendl;
1475
1476 // NOSCRUB so skip regular scrubs
1477 if (!allow_regular_scrub && !planned.time_for_deep) {
1478 return false;
1479 }
1480
1481 return true;
1482 }
1483
1484 std::optional<requested_scrub_t> PG::verify_scrub_mode() const
1485 {
1486 const bool allow_regular_scrub =
1487 !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
1488 pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
1489 const bool allow_deep_scrub =
1490 allow_regular_scrub &&
1491 !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
1492 pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
1493 const bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
1494 const bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair &&
1495 get_pgbackend()->auto_repair_supported());
1496
1497 dout(10) << __func__ << " pg: " << info.pgid
1498 << " allow: " << allow_regular_scrub << "/" << allow_deep_scrub
1499 << " deep errs: " << has_deep_errors
1500 << " auto-repair: " << try_to_auto_repair << " ("
1501 << cct->_conf->osd_scrub_auto_repair << ")" << dendl;
1502
1503 auto upd_flags = m_planned_scrub;
1504
1505 upd_flags.time_for_deep = false;
1506 // Clear these in case user issues the scrub/repair command during
1507 // the scheduling of the scrub/repair (e.g. request reservation)
1508 upd_flags.deep_scrub_on_error = false;
1509 upd_flags.auto_repair = false;
1510
1511 if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
1512 osd->clog->error()
1513 << "osd." << osd->whoami << " pg " << info.pgid
1514 << " Regular scrub request, deep-scrub details will be lost";
1515 }
1516
1517 if (!upd_flags.must_scrub) {
1518 // All periodic scrub handling goes here because must_scrub is
1519 // always set for must_deep_scrub and must_repair.
1520
1521 const bool can_start_periodic = verify_periodic_scrub_mode(
1522 allow_deep_scrub, try_to_auto_repair, allow_regular_scrub,
1523 has_deep_errors, upd_flags);
1524 if (!can_start_periodic) {
1525 // "I don't want no scrub"
1526 dout(20) << __func__ << ": no periodic scrubs allowed" << dendl;
1527 return std::nullopt;
1528 }
1529 }
1530
1531 // scrubbing while recovering?
1532
1533 bool prevented_by_recovery =
1534 osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
1535 (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);
1536
1537 if (prevented_by_recovery) {
1538 dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
1539 return std::nullopt;
1540 }
1541
1542 upd_flags.need_auto = false;
1543 return upd_flags;
1544 }
1545
1546 /*
1547 * Note: on_info_history_change() is used in those two cases where we're not sure
1548 * whether the role of the PG was changed, and if so - was this change relayed to the
1549 * scrub-queue.
1550 */
1551 void PG::on_info_history_change()
1552 {
1553 dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
1554
1555 ceph_assert(m_scrubber);
1556 m_scrubber->on_maybe_registration_change(m_planned_scrub);
1557 }
1558
1559 void PG::reschedule_scrub()
1560 {
1561 dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") <<dendl;
1562
1563 // we are assuming no change in primary status
1564 if (is_primary()) {
1565 ceph_assert(m_scrubber);
1566 m_scrubber->update_scrub_job(m_planned_scrub);
1567 }
1568 }
1569
1570 void PG::on_primary_status_change(bool was_primary, bool now_primary)
1571 {
1572 // make sure we have a working scrubber when becoming a primary
1573
1574 if (was_primary != now_primary) {
1575 ceph_assert(m_scrubber);
1576 m_scrubber->on_primary_change(m_planned_scrub);
1577 }
1578 }
1579
1580 void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
1581 {
1582 ceph_assert(m_scrubber);
1583 m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
1584 }
1585
1586 void PG::clear_ready_to_merge() {
1587 osd->clear_ready_to_merge(this);
1588 }
1589
1590 void PG::queue_want_pg_temp(const vector<int> &wanted) {
1591 osd->queue_want_pg_temp(get_pgid().pgid, wanted);
1592 }
1593
1594 void PG::clear_want_pg_temp() {
1595 osd->remove_want_pg_temp(get_pgid().pgid);
1596 }
1597
1598 void PG::on_role_change() {
1599 requeue_ops(waiting_for_peered);
1600 plpg_on_role_change();
1601 }
1602
1603 void PG::on_new_interval()
1604 {
1605 projected_last_update = eversion_t();
1606 cancel_recovery();
1607
1608 assert(m_scrubber);
1609 // log some scrub data before we react to the interval
1610 dout(20) << __func__ << (is_scrub_queued_or_active() ? " scrubbing " : " ")
1611 << "flags: " << m_planned_scrub << dendl;
1612
1613 m_scrubber->on_maybe_registration_change(m_planned_scrub);
1614 }
1615
1616 epoch_t PG::oldest_stored_osdmap() {
1617 return osd->get_superblock().oldest_map;
1618 }
1619
1620 OstreamTemp PG::get_clog_info() {
1621 return osd->clog->info();
1622 }
1623
1624 OstreamTemp PG::get_clog_debug() {
1625 return osd->clog->debug();
1626 }
1627
1628 OstreamTemp PG::get_clog_error() {
1629 return osd->clog->error();
1630 }
1631
1632 void PG::schedule_event_after(
1633 PGPeeringEventRef event,
1634 float delay) {
1635 std::lock_guard lock(osd->recovery_request_lock);
1636 osd->recovery_request_timer.add_event_after(
1637 delay,
1638 new QueuePeeringEvt(
1639 this,
1640 std::move(event)));
1641 }
1642
1643 void PG::request_local_background_io_reservation(
1644 unsigned priority,
1645 PGPeeringEventURef on_grant,
1646 PGPeeringEventURef on_preempt) {
1647 osd->local_reserver.request_reservation(
1648 pg_id,
1649 on_grant ? new QueuePeeringEvt(
1650 this, std::move(on_grant)) : nullptr,
1651 priority,
1652 on_preempt ? new QueuePeeringEvt(
1653 this, std::move(on_preempt)) : nullptr);
1654 }
1655
1656 void PG::update_local_background_io_priority(
1657 unsigned priority) {
1658 osd->local_reserver.update_priority(
1659 pg_id,
1660 priority);
1661 }
1662
1663 void PG::cancel_local_background_io_reservation() {
1664 osd->local_reserver.cancel_reservation(
1665 pg_id);
1666 }
1667
1668 void PG::request_remote_recovery_reservation(
1669 unsigned priority,
1670 PGPeeringEventURef on_grant,
1671 PGPeeringEventURef on_preempt) {
1672 osd->remote_reserver.request_reservation(
1673 pg_id,
1674 on_grant ? new QueuePeeringEvt(
1675 this, std::move(on_grant)) : nullptr,
1676 priority,
1677 on_preempt ? new QueuePeeringEvt(
1678 this, std::move(on_preempt)) : nullptr);
1679 }
1680
1681 void PG::cancel_remote_recovery_reservation() {
1682 osd->remote_reserver.cancel_reservation(
1683 pg_id);
1684 }
1685
1686 void PG::schedule_event_on_commit(
1687 ObjectStore::Transaction &t,
1688 PGPeeringEventRef on_commit)
1689 {
1690 t.register_on_commit(new QueuePeeringEvt(this, on_commit));
1691 }
1692
1693 void PG::on_activate(interval_set<snapid_t> snaps)
1694 {
1695 ceph_assert(!m_scrubber->are_callbacks_pending());
1696 ceph_assert(callbacks_for_degraded_object.empty());
1697 snap_trimq = snaps;
1698 release_pg_backoffs();
1699 projected_last_update = info.last_update;
1700 }
1701
1702 void PG::on_active_exit()
1703 {
1704 backfill_reserving = false;
1705 agent_stop();
1706 }
1707
1708 void PG::on_active_advmap(const OSDMapRef &osdmap)
1709 {
1710 const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
1711 auto i = new_removed_snaps.find(get_pgid().pool());
1712 if (i != new_removed_snaps.end()) {
1713 bool bad = false;
1714 for (auto j : i->second) {
1715 if (snap_trimq.intersects(j.first, j.second)) {
1716 decltype(snap_trimq) added, overlap;
1717 added.insert(j.first, j.second);
1718 overlap.intersection_of(snap_trimq, added);
1719 derr << __func__ << " removed_snaps already contains "
1720 << overlap << dendl;
1721 bad = true;
1722 snap_trimq.union_of(added);
1723 } else {
1724 snap_trimq.insert(j.first, j.second);
1725 }
1726 }
1727 dout(10) << __func__ << " new removed_snaps " << i->second
1728 << ", snap_trimq now " << snap_trimq << dendl;
1729 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
1730 }
1731
1732 const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
1733 auto j = new_purged_snaps.find(get_pgid().pgid.pool());
1734 if (j != new_purged_snaps.end()) {
1735 bool bad = false;
1736 for (auto k : j->second) {
1737 if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
1738 interval_set<snapid_t> rm, overlap;
1739 rm.insert(k.first, k.second);
1740 overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
1741 derr << __func__ << " purged_snaps does not contain "
1742 << rm << ", only " << overlap << dendl;
1743 recovery_state.adjust_purged_snaps(
1744 [&overlap](auto &purged_snaps) {
1745 purged_snaps.subtract(overlap);
1746 });
1747 // This can currently happen in the normal (if unlikely) course of
1748 // events. Because adding snaps to purged_snaps does not increase
1749 // the pg version or add a pg log entry, we don't reliably propagate
1750 // purged_snaps additions to other OSDs.
1751 // One example:
1752 // - purge S
1753 // - primary and replicas update purged_snaps
1754 // - no object updates
1755 // - pg mapping changes, new primary on different node
1756 // - new primary pg version == eversion_t(), so info is not
1757 // propagated.
1758 //bad = true;
1759 } else {
1760 recovery_state.adjust_purged_snaps(
1761 [&k](auto &purged_snaps) {
1762 purged_snaps.erase(k.first, k.second);
1763 });
1764 }
1765 }
1766 dout(10) << __func__ << " new purged_snaps " << j->second
1767 << ", now " << recovery_state.get_info().purged_snaps << dendl;
1768 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
1769 }
1770 }
1771
1772 void PG::queue_snap_retrim(snapid_t snap)
1773 {
1774 if (!is_active() ||
1775 !is_primary()) {
1776 dout(10) << __func__ << " snap " << snap << " - not active and primary"
1777 << dendl;
1778 return;
1779 }
1780 if (!snap_trimq.contains(snap)) {
1781 snap_trimq.insert(snap);
1782 snap_trimq_repeat.insert(snap);
1783 dout(20) << __func__ << " snap " << snap
1784 << ", trimq now " << snap_trimq
1785 << ", repeat " << snap_trimq_repeat << dendl;
1786 kick_snap_trim();
1787 } else {
1788 dout(20) << __func__ << " snap " << snap
1789 << " already in trimq " << snap_trimq << dendl;
1790 }
1791 }
1792
1793 void PG::on_active_actmap()
1794 {
1795 if (cct->_conf->osd_check_for_log_corruption)
1796 check_log_for_corruption(osd->store);
1797
1798
1799 if (recovery_state.is_active()) {
1800 dout(10) << "Active: kicking snap trim" << dendl;
1801 kick_snap_trim();
1802 }
1803
1804 if (recovery_state.is_peered() &&
1805 !recovery_state.is_clean() &&
1806 !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
1807 (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
1808 recovery_state.is_degraded())) {
1809 queue_recovery();
1810 }
1811 }
1812
1813 void PG::on_backfill_reserved()
1814 {
1815 backfill_reserving = false;
1816 queue_recovery();
1817 }
1818
1819 void PG::on_backfill_canceled()
1820 {
1821 if (!waiting_on_backfill.empty()) {
1822 waiting_on_backfill.clear();
1823 finish_recovery_op(hobject_t::get_max());
1824 }
1825 }
1826
1827 void PG::on_recovery_reserved()
1828 {
1829 queue_recovery();
1830 }
1831
1832 void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
1833 {
1834 osd->set_not_ready_to_merge_target(pgid, src);
1835 }
1836
1837 void PG::set_not_ready_to_merge_source(pg_t pgid)
1838 {
1839 osd->set_not_ready_to_merge_source(pgid);
1840 }
1841
1842 void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
1843 {
1844 osd->set_ready_to_merge_target(this, lu, les, lec);
1845 }
1846
1847 void PG::set_ready_to_merge_source(eversion_t lu)
1848 {
1849 osd->set_ready_to_merge_source(this, lu);
1850 }
1851
1852 void PG::send_pg_created(pg_t pgid)
1853 {
1854 osd->send_pg_created(pgid);
1855 }
1856
1857 ceph::signedspan PG::get_mnow()
1858 {
1859 return osd->get_mnow();
1860 }
1861
1862 HeartbeatStampsRef PG::get_hb_stamps(int peer)
1863 {
1864 return osd->get_hb_stamps(peer);
1865 }
1866
1867 void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
1868 {
1869 auto spgid = info.pgid;
1870 auto o = osd;
1871 osd->mono_timer.add_event(
1872 delay,
1873 [o, lpr, spgid]() {
1874 o->queue_renew_lease(lpr, spgid);
1875 });
1876 }
1877
1878 void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
1879 {
1880 osd->queue_check_readable(info.pgid, lpr, delay);
1881 }
1882
1883 void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
1884 {
1885 pglog.rebuild_missing_set_with_deletes(
1886 osd->store,
1887 ch,
1888 recovery_state.get_info());
1889 }
1890
1891 void PG::on_activate_committed()
1892 {
1893 if (!is_primary()) {
1894 // waiters
1895 if (recovery_state.needs_flush() == 0) {
1896 requeue_ops(waiting_for_peered);
1897 } else if (!waiting_for_peered.empty()) {
1898 dout(10) << __func__ << " flushes in progress, moving "
1899 << waiting_for_peered.size() << " items to waiting_for_flush"
1900 << dendl;
1901 ceph_assert(waiting_for_flush.empty());
1902 waiting_for_flush.swap(waiting_for_peered);
1903 }
1904 }
1905 }
1906
1907 // Compute pending backfill data
1908 static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
1909 {
1910 lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
1911 << (local_bytes >> 10) << "KiB"
1912 << " primary usage " << (bf_bytes >> 10)
1913 << "KiB" << dendl;
1914
1915 return std::max((int64_t)0, bf_bytes - local_bytes);
1916 }
1917
1918
1919 // We can zero the value of primary num_bytes as just an atomic.
1920 // However, setting above zero reserves space for backfill and requires
1921 // the OSDService::stat_lock which protects all OSD usage
1922 bool PG::try_reserve_recovery_space(
1923 int64_t primary_bytes, int64_t local_bytes) {
1924 // Use tentative_bacfill_full() to make sure enough
1925 // space is available to handle target bytes from primary.
1926
1927 // TODO: If we passed num_objects from primary we could account for
1928 // an estimate of the metadata overhead.
1929
1930 // TODO: If we had compressed_allocated and compressed_original from primary
1931 // we could compute compression ratio and adjust accordingly.
1932
1933 // XXX: There is no way to get omap overhead and this would only apply
1934 // to whatever possibly different partition that is storing the database.
1935
1936 // update_osd_stat() from heartbeat will do this on a new
1937 // statfs using ps->primary_bytes.
1938 uint64_t pending_adjustment = 0;
1939 if (primary_bytes) {
1940 // For erasure coded pool overestimate by a full stripe per object
1941 // because we don't know how each objected rounded to the nearest stripe
1942 if (pool.info.is_erasure()) {
1943 primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
1944 primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
1945 info.stats.stats.sum.num_objects;
1946 local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
1947 local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
1948 info.stats.stats.sum.num_objects;
1949 }
1950 pending_adjustment = pending_backfill(
1951 cct,
1952 primary_bytes,
1953 local_bytes);
1954 dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
1955 << "KiB"
1956 << " local " << (local_bytes >> 10) << "KiB"
1957 << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
1958 << dendl;
1959 }
1960
1961 // This lock protects not only the stats OSDService but also setting the
1962 // pg primary_bytes. That's why we don't immediately unlock
1963 std::lock_guard l{osd->stat_lock};
1964 osd_stat_t cur_stat = osd->osd_stat;
1965 if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
1966 (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
1967 dout(10) << "backfill reservation rejected: failure injection"
1968 << dendl;
1969 return false;
1970 } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
1971 osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
1972 dout(10) << "backfill reservation rejected: backfill full"
1973 << dendl;
1974 return false;
1975 } else {
1976 // Don't reserve space if skipped reservation check, this is used
1977 // to test the other backfill full check AND in case a corruption
1978 // of num_bytes requires ignoring that value and trying the
1979 // backfill anyway.
1980 if (primary_bytes &&
1981 !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
1982 primary_num_bytes.store(primary_bytes);
1983 local_num_bytes.store(local_bytes);
1984 } else {
1985 unreserve_recovery_space();
1986 }
1987 return true;
1988 }
1989 }
1990
1991 void PG::unreserve_recovery_space() {
1992 primary_num_bytes.store(0);
1993 local_num_bytes.store(0);
1994 }
1995
1996 void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
1997 {
1998 ObjectStore::Transaction t;
1999 eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
2000 for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
2001 i != rollback_obs.end();
2002 ++i) {
2003 if (i->generation < trimmed_to.version) {
2004 dout(10) << __func__ << "osd." << osd->whoami
2005 << " pg " << info.pgid
2006 << " found obsolete rollback obj "
2007 << *i << " generation < trimmed_to "
2008 << trimmed_to
2009 << "...repaired" << dendl;
2010 t.remove(coll, *i);
2011 }
2012 }
2013 if (!t.empty()) {
2014 derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
2015 << dendl;
2016 osd->store->queue_transaction(ch, std::move(t), NULL);
2017 }
2018 }
2019
2020
2021 void PG::_repair_oinfo_oid(ScrubMap &smap)
2022 {
2023 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2024 i != smap.objects.rend();
2025 ++i) {
2026 const hobject_t &hoid = i->first;
2027 ScrubMap::object &o = i->second;
2028
2029 bufferlist bl;
2030 if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
2031 continue;
2032 }
2033 bl.push_back(o.attrs[OI_ATTR]);
2034 object_info_t oi;
2035 try {
2036 oi.decode(bl);
2037 } catch(...) {
2038 continue;
2039 }
2040 if (oi.soid != hoid) {
2041 ObjectStore::Transaction t;
2042 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2043 osd->clog->error() << "osd." << osd->whoami
2044 << " found object info error on pg "
2045 << info.pgid
2046 << " oid " << hoid << " oid in object info: "
2047 << oi.soid
2048 << "...repaired";
2049 // Fix object info
2050 oi.soid = hoid;
2051 bl.clear();
2052 encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
2053
2054 bufferptr bp(bl.c_str(), bl.length());
2055 o.attrs[OI_ATTR] = bp;
2056
2057 t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
2058 int r = osd->store->queue_transaction(ch, std::move(t));
2059 if (r != 0) {
2060 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2061 << dendl;
2062 }
2063 }
2064 }
2065 }
2066
2067 void PG::repair_object(
2068 const hobject_t &soid,
2069 const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
2070 const set<pg_shard_t> &bad_peers)
2071 {
2072 set<pg_shard_t> ok_shards;
2073 for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
2074
2075 dout(10) << "repair_object " << soid
2076 << " bad_peers osd.{" << bad_peers << "},"
2077 << " ok_peers osd.{" << ok_shards << "}" << dendl;
2078
2079 const ScrubMap::object &po = ok_peers.back().first;
2080 eversion_t v;
2081 object_info_t oi;
2082 try {
2083 bufferlist bv;
2084 if (po.attrs.count(OI_ATTR)) {
2085 bv.push_back(po.attrs.find(OI_ATTR)->second);
2086 }
2087 auto bliter = bv.cbegin();
2088 decode(oi, bliter);
2089 } catch (...) {
2090 dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
2091 << soid << dendl;
2092 ceph_abort();
2093 }
2094
2095 if (bad_peers.count(get_primary())) {
2096 // We should only be scrubbing if the PG is clean.
2097 ceph_assert(waiting_for_unreadable_object.empty());
2098 dout(10) << __func__ << ": primary = " << get_primary() << dendl;
2099 }
2100
2101 /* No need to pass ok_peers, they must not be missing the object, so
2102 * force_object_missing will add them to missing_loc anyway */
2103 recovery_state.force_object_missing(bad_peers, soid, oi.version);
2104 }
2105
2106 void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
2107 {
2108 dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
2109 ceph_assert(m_scrubber);
2110 if (is_active()) {
2111 ((*m_scrubber).*fn)(epoch_queued);
2112 } else {
2113 // pg might be in the process of being deleted
2114 dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
2115 (is_active() ? "(active) " : "(not active) ") << dendl;
2116 }
2117 }
2118
2119 void PG::forward_scrub_event(ScrubSafeAPI fn,
2120 epoch_t epoch_queued,
2121 Scrub::act_token_t act_token,
2122 std::string_view desc)
2123 {
2124 dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
2125 << " token: " << act_token << dendl;
2126 ceph_assert(m_scrubber);
2127 if (is_active()) {
2128 ((*m_scrubber).*fn)(epoch_queued, act_token);
2129 } else {
2130 // pg might be in the process of being deleted
2131 dout(5) << __func__ << " refusing to forward. "
2132 << (is_clean() ? "(clean) " : "(not clean) ")
2133 << (is_active() ? "(active) " : "(not active) ") << dendl;
2134 }
2135 }
2136
2137 void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
2138 {
2139 dout(10) << __func__ << " (op)" << dendl;
2140 ceph_assert(m_scrubber);
2141 m_scrubber->replica_scrub_op(op);
2142 }
2143
2144 void PG::replica_scrub(epoch_t epoch_queued,
2145 Scrub::act_token_t act_token,
2146 [[maybe_unused]] ThreadPool::TPHandle& handle)
2147 {
2148 dout(10) << __func__ << " queued at: " << epoch_queued
2149 << (is_primary() ? " (primary)" : " (replica)") << dendl;
2150 forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
2151 "StartReplica/nw");
2152 }
2153
2154 bool PG::ops_blocked_by_scrub() const
2155 {
2156 return !waiting_for_scrub.empty();
2157 }
2158
2159 Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
2160 {
2161 return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
2162 : Scrub::scrub_prio_t::high_priority;
2163 }
2164
2165 bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
2166 {
2167 if (auto last_reset = get_last_peering_reset();
2168 last_reset > reply_epoch || last_reset > query_epoch) {
2169 dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
2170 << query_epoch << " last_peering_reset " << last_reset << dendl;
2171 return true;
2172 }
2173 return false;
2174 }
2175
2176 struct FlushState {
2177 PGRef pg;
2178 epoch_t epoch;
2179 FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
2180 ~FlushState() {
2181 std::scoped_lock l{*pg};
2182 if (!pg->pg_has_reset_since(epoch)) {
2183 pg->recovery_state.complete_flush();
2184 }
2185 }
2186 };
2187 typedef std::shared_ptr<FlushState> FlushStateRef;
2188
2189 void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
2190 {
2191 // flush in progress ops
2192 FlushStateRef flush_trigger (std::make_shared<FlushState>(
2193 this, get_osdmap_epoch()));
2194 t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
2195 t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
2196 }
2197
2198 bool PG::try_flush_or_schedule_async()
2199 {
2200 Context *c = new QueuePeeringEvt(
2201 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
2202 if (!ch->flush_commit(c)) {
2203 return false;
2204 } else {
2205 delete c;
2206 return true;
2207 }
2208 }
2209
2210 ostream& operator<<(ostream& out, const PG& pg)
2211 {
2212 out << pg.recovery_state;
2213
2214 // listing all scrub-related flags - both current and "planned next scrub"
2215 if (pg.is_scrubbing()) {
2216 out << *pg.m_scrubber;
2217 }
2218 out << pg.m_planned_scrub;
2219
2220 if (pg.recovery_ops_active)
2221 out << " rops=" << pg.recovery_ops_active;
2222
2223 //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
2224 if (pg.recovery_state.have_missing()) {
2225 out << " m=" << pg.recovery_state.get_num_missing();
2226 if (pg.is_primary()) {
2227 uint64_t unfound = pg.recovery_state.get_num_unfound();
2228 if (unfound)
2229 out << " u=" << unfound;
2230 }
2231 }
2232 if (!pg.is_clean()) {
2233 out << " mbc=" << pg.recovery_state.get_missing_by_count();
2234 }
2235 if (!pg.snap_trimq.empty()) {
2236 out << " trimq=";
2237 // only show a count if the set is large
2238 if (pg.snap_trimq.num_intervals() > 16) {
2239 out << pg.snap_trimq.size();
2240 if (!pg.snap_trimq_repeat.empty()) {
2241 out << "(" << pg.snap_trimq_repeat.size() << ")";
2242 }
2243 } else {
2244 out << pg.snap_trimq;
2245 if (!pg.snap_trimq_repeat.empty()) {
2246 out << "(" << pg.snap_trimq_repeat << ")";
2247 }
2248 }
2249 }
2250 if (!pg.recovery_state.get_info().purged_snaps.empty()) {
2251 out << " ps="; // snap trim queue / purged snaps
2252 if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
2253 out << pg.recovery_state.get_info().purged_snaps.size();
2254 } else {
2255 out << pg.recovery_state.get_info().purged_snaps;
2256 }
2257 }
2258
2259 out << "]";
2260 return out;
2261 }
2262
2263 bool PG::can_discard_op(OpRequestRef& op)
2264 {
2265 auto m = op->get_req<MOSDOp>();
2266 if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
2267 dout(20) << " discard " << *m << dendl;
2268 return true;
2269 }
2270
2271 if (m->get_map_epoch() < info.history.same_primary_since) {
2272 dout(7) << " changed after " << m->get_map_epoch()
2273 << ", dropping " << *m << dendl;
2274 return true;
2275 }
2276
2277 if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
2278 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
2279 !is_primary() &&
2280 m->get_map_epoch() < info.history.same_interval_since) {
2281 // Note: the Objecter will resend on interval change without the primary
2282 // changing if it actually sent to a replica. If the primary hasn't
2283 // changed since the send epoch, we got it, and we're primary, it won't
2284 // have resent even if the interval did change as it sent it to the primary
2285 // (us).
2286 return true;
2287 }
2288
2289
2290 if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
2291 // >= luminous client
2292 if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
2293 // >= nautilus client
2294 if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
2295 dout(7) << __func__ << " sent before last_force_op_resend "
2296 << pool.info.last_force_op_resend
2297 << ", dropping" << *m << dendl;
2298 return true;
2299 }
2300 } else {
2301 // == < nautilus client (luminous or mimic)
2302 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
2303 dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
2304 << pool.info.last_force_op_resend_prenautilus
2305 << ", dropping" << *m << dendl;
2306 return true;
2307 }
2308 }
2309 if (m->get_map_epoch() < info.history.last_epoch_split) {
2310 dout(7) << __func__ << " pg split in "
2311 << info.history.last_epoch_split << ", dropping" << dendl;
2312 return true;
2313 }
2314 } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
2315 // < luminous client
2316 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
2317 dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
2318 << pool.info.last_force_op_resend_preluminous
2319 << ", dropping" << *m << dendl;
2320 return true;
2321 }
2322 }
2323
2324 return false;
2325 }
2326
2327 template<typename T, int MSGTYPE>
2328 bool PG::can_discard_replica_op(OpRequestRef& op)
2329 {
2330 auto m = op->get_req<T>();
2331 ceph_assert(m->get_type() == MSGTYPE);
2332
2333 int from = m->get_source().num();
2334
2335 // if a repop is replied after a replica goes down in a new osdmap, and
2336 // before the pg advances to this new osdmap, the repop replies before this
2337 // repop can be discarded by that replica OSD, because the primary resets the
2338 // connection to it when handling the new osdmap marking it down, and also
2339 // resets the messenger sesssion when the replica reconnects. to avoid the
2340 // out-of-order replies, the messages from that replica should be discarded.
2341 OSDMapRef next_map = osd->get_next_osdmap();
2342 if (next_map->is_down(from)) {
2343 dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
2344 return true;
2345 }
2346 /* Mostly, this overlaps with the old_peering_msg
2347 * condition. An important exception is pushes
2348 * sent by replicas not in the acting set, since
2349 * if such a replica goes down it does not cause
2350 * a new interval. */
2351 if (next_map->get_down_at(from) >= m->map_epoch) {
2352 dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
2353 return true;
2354 }
2355
2356 // same pg?
2357 // if pg changes _at all_, we reset and repeer!
2358 if (old_peering_msg(m->map_epoch, m->map_epoch)) {
2359 dout(10) << "can_discard_replica_op pg changed " << info.history
2360 << " after " << m->map_epoch
2361 << ", dropping" << dendl;
2362 return true;
2363 }
2364 return false;
2365 }
2366
2367 bool PG::can_discard_scan(OpRequestRef op)
2368 {
2369 auto m = op->get_req<MOSDPGScan>();
2370 ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
2371
2372 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2373 dout(10) << " got old scan, ignoring" << dendl;
2374 return true;
2375 }
2376 return false;
2377 }
2378
2379 bool PG::can_discard_backfill(OpRequestRef op)
2380 {
2381 auto m = op->get_req<MOSDPGBackfill>();
2382 ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
2383
2384 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2385 dout(10) << " got old backfill, ignoring" << dendl;
2386 return true;
2387 }
2388
2389 return false;
2390
2391 }
2392
2393 bool PG::can_discard_request(OpRequestRef& op)
2394 {
2395 switch (op->get_req()->get_type()) {
2396 case CEPH_MSG_OSD_OP:
2397 return can_discard_op(op);
2398 case CEPH_MSG_OSD_BACKOFF:
2399 return false; // never discard
2400 case MSG_OSD_REPOP:
2401 return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
2402 case MSG_OSD_PG_PUSH:
2403 return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
2404 case MSG_OSD_PG_PULL:
2405 return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
2406 case MSG_OSD_PG_PUSH_REPLY:
2407 return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
2408 case MSG_OSD_REPOPREPLY:
2409 return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
2410 case MSG_OSD_PG_RECOVERY_DELETE:
2411 return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
2412
2413 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
2414 return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
2415
2416 case MSG_OSD_EC_WRITE:
2417 return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
2418 case MSG_OSD_EC_WRITE_REPLY:
2419 return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
2420 case MSG_OSD_EC_READ:
2421 return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
2422 case MSG_OSD_EC_READ_REPLY:
2423 return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
2424 case MSG_OSD_REP_SCRUB:
2425 return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
2426 case MSG_OSD_SCRUB_RESERVE:
2427 return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
2428 case MSG_OSD_REP_SCRUBMAP:
2429 return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
2430 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2431 return can_discard_replica_op<
2432 MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
2433 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2434 return can_discard_replica_op<
2435 MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
2436
2437 case MSG_OSD_PG_SCAN:
2438 return can_discard_scan(op);
2439 case MSG_OSD_PG_BACKFILL:
2440 return can_discard_backfill(op);
2441 case MSG_OSD_PG_BACKFILL_REMOVE:
2442 return can_discard_replica_op<MOSDPGBackfillRemove,
2443 MSG_OSD_PG_BACKFILL_REMOVE>(op);
2444 }
2445 return true;
2446 }
2447
2448 void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
2449 {
2450 dout(10) << __func__ << ": " << evt->get_desc() << dendl;
2451 ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
2452 if (old_peering_evt(evt)) {
2453 dout(10) << "discard old " << evt->get_desc() << dendl;
2454 } else {
2455 recovery_state.handle_event(evt, &rctx);
2456 }
2457 // write_if_dirty regardless of path above to ensure we capture any work
2458 // done by OSD::advance_pg().
2459 write_if_dirty(rctx.transaction);
2460 }
2461
2462 void PG::queue_peering_event(PGPeeringEventRef evt)
2463 {
2464 if (old_peering_evt(evt))
2465 return;
2466 osd->osd->enqueue_peering_evt(info.pgid, evt);
2467 }
2468
2469 void PG::queue_null(epoch_t msg_epoch,
2470 epoch_t query_epoch)
2471 {
2472 dout(10) << "null" << dendl;
2473 queue_peering_event(
2474 PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
2475 NullEvt())));
2476 }
2477
2478 void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
2479 {
2480 /*
2481 * if we couldn't start any recovery ops and things are still
2482 * unfound, see if we can discover more missing object locations.
2483 * It may be that our initial locations were bad and we errored
2484 * out while trying to pull.
2485 */
2486 if (!recovery_state.discover_all_missing(rctx)) {
2487 string action;
2488 if (state_test(PG_STATE_BACKFILLING)) {
2489 auto evt = PGPeeringEventRef(
2490 new PGPeeringEvent(
2491 queued,
2492 queued,
2493 PeeringState::UnfoundBackfill()));
2494 queue_peering_event(evt);
2495 action = "in backfill";
2496 } else if (state_test(PG_STATE_RECOVERING)) {
2497 auto evt = PGPeeringEventRef(
2498 new PGPeeringEvent(
2499 queued,
2500 queued,
2501 PeeringState::UnfoundRecovery()));
2502 queue_peering_event(evt);
2503 action = "in recovery";
2504 } else {
2505 action = "already out of recovery/backfill";
2506 }
2507 dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
2508 } else {
2509 dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
2510 queue_recovery();
2511 }
2512 }
2513
2514 void PG::handle_advance_map(
2515 OSDMapRef osdmap, OSDMapRef lastmap,
2516 vector<int>& newup, int up_primary,
2517 vector<int>& newacting, int acting_primary,
2518 PeeringCtx &rctx)
2519 {
2520 dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
2521 osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
2522 recovery_state.advance_map(
2523 osdmap,
2524 lastmap,
2525 newup,
2526 up_primary,
2527 newacting,
2528 acting_primary,
2529 rctx);
2530 }
2531
2532 void PG::handle_activate_map(PeeringCtx &rctx)
2533 {
2534 dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
2535 << dendl;
2536 recovery_state.activate_map(rctx);
2537
2538 requeue_map_waiters();
2539 }
2540
2541 void PG::handle_initialize(PeeringCtx &rctx)
2542 {
2543 dout(10) << __func__ << dendl;
2544 PeeringState::Initialize evt;
2545 recovery_state.handle_event(evt, &rctx);
2546 }
2547
2548
2549 void PG::handle_query_state(Formatter *f)
2550 {
2551 dout(10) << "handle_query_state" << dendl;
2552 PeeringState::QueryState q(f);
2553 recovery_state.handle_event(q, 0);
2554 }
2555
2556 void PG::init_collection_pool_opts()
2557 {
2558 auto r = osd->store->set_collection_opts(ch, pool.info.opts);
2559 if (r < 0 && r != -EOPNOTSUPP) {
2560 derr << __func__ << " set_collection_opts returns error:" << r << dendl;
2561 }
2562 }
2563
2564 void PG::on_pool_change()
2565 {
2566 init_collection_pool_opts();
2567 plpg_on_pool_change();
2568 }
2569
2570 void PG::C_DeleteMore::complete(int r) {
2571 ceph_assert(r == 0);
2572 pg->lock();
2573 if (!pg->pg_has_reset_since(epoch)) {
2574 pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
2575 }
2576 pg->unlock();
2577 delete this;
2578 }
2579
2580 std::pair<ghobject_t, bool> PG::do_delete_work(
2581 ObjectStore::Transaction &t,
2582 ghobject_t _next)
2583 {
2584 dout(10) << __func__ << dendl;
2585
2586 {
2587 float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
2588 if (osd_delete_sleep > 0 && delete_needs_sleep) {
2589 epoch_t e = get_osdmap()->get_epoch();
2590 PGRef pgref(this);
2591 auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
2592 dout(20) << "do_delete_work() [cb] wake up at "
2593 << ceph_clock_now()
2594 << ", re-queuing delete" << dendl;
2595 std::scoped_lock locker{*this};
2596 delete_needs_sleep = false;
2597 if (!pg_has_reset_since(e)) {
2598 osd->queue_for_pg_delete(get_pgid(), e);
2599 }
2600 });
2601
2602 auto delete_schedule_time = ceph::real_clock::now();
2603 delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
2604 std::lock_guard l{osd->sleep_lock};
2605 osd->sleep_timer.add_event_at(delete_schedule_time,
2606 delete_requeue_callback);
2607 dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
2608 return std::make_pair(_next, true);
2609 }
2610 }
2611
2612 delete_needs_sleep = true;
2613
2614 ghobject_t next;
2615
2616 vector<ghobject_t> olist;
2617 int max = std::min(osd->store->get_ideal_list_max(),
2618 (int)cct->_conf->osd_target_transaction_size);
2619
2620 osd->store->collection_list(
2621 ch,
2622 _next,
2623 ghobject_t::get_max(),
2624 max,
2625 &olist,
2626 &next);
2627 dout(20) << __func__ << " " << olist << dendl;
2628
2629 // make sure we've removed everything
2630 // by one more listing from the beginning
2631 if (_next != ghobject_t() && olist.empty()) {
2632 next = ghobject_t();
2633 osd->store->collection_list(
2634 ch,
2635 next,
2636 ghobject_t::get_max(),
2637 max,
2638 &olist,
2639 &next);
2640 for (auto& oid : olist) {
2641 if (oid == pgmeta_oid) {
2642 dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
2643 } else {
2644 dout(0) << __func__ << " additional unexpected onode"
2645 <<" new onode has appeared since PG removal started"
2646 << oid << dendl;
2647 }
2648 }
2649 }
2650
2651 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2652 int64_t num = 0;
2653 for (auto& oid : olist) {
2654 if (oid == pgmeta_oid) {
2655 continue;
2656 }
2657 if (oid.is_pgmeta()) {
2658 osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
2659 << " during PG removal";
2660 }
2661 int r = snap_mapper.remove_oid(oid.hobj, &_t);
2662 if (r != 0 && r != -ENOENT) {
2663 ceph_abort();
2664 }
2665 t.remove(coll, oid);
2666 ++num;
2667 }
2668 bool running = true;
2669 if (num) {
2670 dout(20) << __func__ << " deleting " << num << " objects" << dendl;
2671 Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
2672 t.register_on_commit(fin);
2673 } else {
2674 if (cct->_conf->osd_inject_failure_on_pg_removal) {
2675 _exit(1);
2676 }
2677
2678 // final flush here to ensure completions drop refs. Of particular concern
2679 // are the SnapMapper ContainerContexts.
2680 {
2681 PGRef pgref(this);
2682 PGLog::clear_info_log(info.pgid, &t);
2683 t.remove_collection(coll);
2684 t.register_on_commit(new ContainerContext<PGRef>(pgref));
2685 t.register_on_applied(new ContainerContext<PGRef>(pgref));
2686 osd->store->queue_transaction(ch, std::move(t));
2687 }
2688 ch->flush();
2689
2690 if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
2691 dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
2692 ch = osd->store->create_new_collection(coll);
2693 create_pg_collection(t,
2694 info.pgid,
2695 info.pgid.get_split_bits(pool.info.get_pg_num()));
2696 init_pg_ondisk(t, info.pgid, &pool.info);
2697 recovery_state.reset_last_persisted();
2698 } else {
2699 recovery_state.set_delete_complete();
2700
2701 // cancel reserver here, since the PG is about to get deleted and the
2702 // exit() methods don't run when that happens.
2703 osd->local_reserver.cancel_reservation(info.pgid);
2704
2705 running = false;
2706 }
2707 }
2708 return {next, running};
2709 }
2710
2711 int PG::pg_stat_adjust(osd_stat_t *ns)
2712 {
2713 osd_stat_t &new_stat = *ns;
2714 if (is_primary()) {
2715 return 0;
2716 }
2717 // Adjust the kb_used by adding pending backfill data
2718 uint64_t reserved_num_bytes = get_reserved_num_bytes();
2719
2720 // For now we don't consider projected space gains here
2721 // I suggest we have an optional 2 pass backfill that frees up
2722 // space in a first pass. This could be triggered when at nearfull
2723 // or near to backfillfull.
2724 if (reserved_num_bytes > 0) {
2725 // TODO: Handle compression by adjusting by the PGs average
2726 // compression precentage.
2727 dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
2728 << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2729 if (new_stat.statfs.available > reserved_num_bytes)
2730 new_stat.statfs.available -= reserved_num_bytes;
2731 else
2732 new_stat.statfs.available = 0;
2733 dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2734 return 1;
2735 }
2736 return 0;
2737 }
2738
2739 void PG::dump_pgstate_history(Formatter *f)
2740 {
2741 std::scoped_lock l{*this};
2742 recovery_state.dump_history(f);
2743 }
2744
2745 void PG::dump_missing(Formatter *f)
2746 {
2747 for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
2748 f->open_object_section("object");
2749 f->dump_object("oid", i.first);
2750 f->dump_object("missing_info", i.second);
2751 if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
2752 f->dump_bool(
2753 "unfound",
2754 recovery_state.get_missing_loc().is_unfound(i.first));
2755 f->open_array_section("locations");
2756 for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
2757 f->dump_object("shard", l);
2758 }
2759 f->close_section();
2760 }
2761 f->close_section();
2762 }
2763 }
2764
2765 void PG::with_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)>&& f)
2766 {
2767 std::lock_guard l{pg_stats_publish_lock};
2768 if (pg_stats_publish) {
2769 f(*pg_stats_publish, pg_stats_publish->get_effective_last_epoch_clean());
2770 }
2771 }
2772
2773 void PG::with_heartbeat_peers(std::function<void(int)>&& f)
2774 {
2775 std::lock_guard l{heartbeat_peer_lock};
2776 for (auto p : heartbeat_peers) {
2777 f(p);
2778 }
2779 for (auto p : probe_targets) {
2780 f(p);
2781 }
2782 }
2783
2784 uint64_t PG::get_min_alloc_size() const {
2785 return osd->store->get_min_alloc_size();
2786 }