// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "PG.h"
#include "messages/MOSDRepScrub.h"

#include "common/errno.h"
#include "common/ceph_releases.h"
#include "common/config.h"
#include "OSD.h"
#include "OpRequest.h"
#include "osd/scrubber/ScrubStore.h"
#include "osd/scrubber/pg_scrubber.h"
#include "osd/scheduler/OpSchedulerItem.h"
#include "Session.h"

#include "common/Timer.h"
#include "common/perf_counters.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGBackfillRemove.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDScrubReserve.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDRepScrubMap.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"

#include "common/BackTrace.h"
#include "common/EventTrace.h"

#ifdef WITH_LTTNG
#define TRACEPOINT_DEFINE
#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
#include "tracing/pg.h"
#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
#undef TRACEPOINT_DEFINE
#else
#define tracepoint(...)
#endif

#include <sstream>

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

using std::list;
using std::map;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::unique_ptr;
using std::vector;

using ceph::bufferlist;
using ceph::bufferptr;
using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using namespace ceph::osd::scheduler;

template <class T>
static ostream& _prefix(std::ostream *_dout, T *t)
{
  return t->gen_prefix(*_dout);
}

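// Reference counting: PGs are intrusively refcounted via the atomic 'ref';
// get()/put() adjust it, and the PG deletes itself when the count reaches
// zero. With PG_DEBUG_REFS, per-tag counts (and, below, per-reference ids
// with captured backtraces) help track down leaked references.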
void PG::get(const char* tag)
{
  int after = ++ref;
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
                                 << "tag " << (tag ? tag : "(none)") << " "
                                 << (after - 1) << " -> " << after << dendl;
#ifdef PG_DEBUG_REFS
  std::lock_guard l(_ref_id_lock);
  _tag_counts[tag]++;
#endif
}

void PG::put(const char* tag)
{
#ifdef PG_DEBUG_REFS
  {
    std::lock_guard l(_ref_id_lock);
    auto tag_counts_entry = _tag_counts.find(tag);
    ceph_assert(tag_counts_entry != _tag_counts.end());
    --tag_counts_entry->second;
    if (tag_counts_entry->second == 0) {
      _tag_counts.erase(tag_counts_entry);
    }
  }
#endif
  auto local_cct = cct;
  int after = --ref;
  lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
                                       << "tag " << (tag ? tag : "(none)") << " "
                                       << (after + 1) << " -> " << after
                                       << dendl;
  if (after == 0)
    delete this;
}

#ifdef PG_DEBUG_REFS
uint64_t PG::get_with_id()
{
  ref++;
  std::lock_guard l(_ref_id_lock);
  uint64_t id = ++_ref_id;
  ClibBackTrace bt(0);
  stringstream ss;
  bt.print(ss);
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
                                 << " got id " << id << " "
                                 << (ref - 1) << " -> " << ref
                                 << dendl;
  ceph_assert(!_live_ids.count(id));
  _live_ids.insert(make_pair(id, ss.str()));
  return id;
}

void PG::put_with_id(uint64_t id)
{
  int newref = --ref;
  lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
                                 << " put id " << id << " "
                                 << (newref + 1) << " -> " << newref
                                 << dendl;
  {
    std::lock_guard l(_ref_id_lock);
    ceph_assert(_live_ids.count(id));
    _live_ids.erase(id);
  }
  // delete only when the new refcount hits zero (matches PG::put above)
  if (!newref)
    delete this;
}

void PG::dump_live_ids()
{
  std::lock_guard l(_ref_id_lock);
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
  for (map<uint64_t, string>::iterator i = _live_ids.begin();
       i != _live_ids.end();
       ++i) {
    dout(0) << "\t\tid: " << *i << dendl;
  }
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
  for (map<string, uint64_t>::iterator i = _tag_counts.begin();
       i != _tag_counts.end();
       ++i) {
    dout(0) << "\t\ttag: " << *i << dendl;
  }
}
#endif

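// Construction: a PG is identified by an spg_t and owned by one OSDService.
// Most peering-related state lives in the embedded PeeringState
// (recovery_state); 'pool' and 'info' are const views into it.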
PG::PG(OSDService *o, OSDMapRef curmap,
       const PGPool &_pool, spg_t p) :
  pg_whoami(o->whoami, p.shard),
  pg_id(p),
  coll(p),
  osd(o),
  cct(o->cct),
  osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
  snap_mapper(
    cct,
    &osdriver,
    p.ps(),
    p.get_split_bits(_pool.info.get_pg_num()),
    _pool.id,
    p.shard),
  trace_endpoint("0.0.0.0", 0, "PG"),
  info_struct_v(0),
  pgmeta_oid(p.make_pgmeta_oid()),
  stat_queue_item(this),
  recovery_queued(false),
  recovery_ops_active(0),
  backfill_reserving(false),
  finish_sync_event(NULL),
  scrub_after_recovery(false),
  active_pushes(0),
  recovery_state(
    o->cct,
    pg_whoami,
    p,
    _pool,
    curmap,
    this,
    this),
  pool(recovery_state.get_pool()),
  info(recovery_state.get_info())
{
#ifdef PG_DEBUG_REFS
  osd->add_pgid(p, this);
#endif
#ifdef WITH_BLKIN
  std::stringstream ss;
  ss << "PG " << info.pgid;
  trace_endpoint.copy_name(ss.str());
#endif
}

PG::~PG()
{
#ifdef PG_DEBUG_REFS
  osd->remove_pgid(info.pgid, this);
#endif
}

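// Locking: the PG mutex serializes all PG state changes. Without
// CEPH_DEBUG_MUTEX, the owning thread id is tracked by hand in 'locked_by'
// so gen_prefix() can tell whether the caller holds the lock. A typical
// (illustrative) caller does:
//   pg->lock();
//   ... inspect or mutate PG state ...
//   pg->unlock();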
void PG::lock(bool no_lockdep) const
{
#ifdef CEPH_DEBUG_MUTEX
  _lock.lock(no_lockdep);
#else
  _lock.lock();
  locked_by = std::this_thread::get_id();
#endif
  // if we have unrecorded dirty state with the lock dropped, there is a bug
  ceph_assert(!recovery_state.debug_has_dirty_state());

  dout(30) << "lock" << dendl;
}

bool PG::is_locked() const
{
  return ceph_mutex_is_locked(_lock);
}

void PG::unlock() const
{
  //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
  ceph_assert(!recovery_state.debug_has_dirty_state());
#ifndef CEPH_DEBUG_MUTEX
  locked_by = {};
#endif
  _lock.unlock();
}

std::ostream& PG::gen_prefix(std::ostream& out) const
{
  OSDMapRef mapref = recovery_state.get_osdmap();
#ifdef CEPH_DEBUG_MUTEX
  if (_lock.is_locked_by_me()) {
#else
  if (locked_by == std::this_thread::get_id()) {
#endif
    out << "osd." << osd->whoami
        << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
        << " " << *this << " ";
  } else {
    out << "osd." << osd->whoami
        << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
        << " pg[" << pg_id.pgid << "(unlocked)] ";
  }
  return out;
}

PerfCounters &PG::get_peering_perf() {
  return *(osd->recoverystate_perf);
}

PerfCounters &PG::get_perf_logger() {
  return *(osd->logger);
}

void PG::log_state_enter(const char *state) {
  osd->pg_recovery_stats.log_enter(state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  osd->pg_recovery_stats.log_exit(
    state_name, ceph_clock_now() - enter_time, events, event_dur);
}

/********* PG **********/
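
// Snap mapping: the SnapMapper maintains the object -> snaps reverse index
// that the snap trimmer walks. The helpers below keep that index in sync
// with object removals and updates, inside the same transaction as the
// object-store change.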
void PG::remove_snap_mapped_object(
  ObjectStore::Transaction &t, const hobject_t &soid)
{
  t.remove(
    coll,
    ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
  clear_object_snap_mapping(&t, soid);
}

void PG::clear_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  if (soid.snap < CEPH_MAXSNAP) {
    int r = snap_mapper.remove_oid(
      soid,
      &_t);
    if (!(r == 0 || r == -ENOENT)) {
      derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
      ceph_abort();
    }
  }
}

void PG::update_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  ceph_assert(soid.snap < CEPH_MAXSNAP);
  int r = snap_mapper.remove_oid(
    soid,
    &_t);
  if (!(r == 0 || r == -ENOENT)) {
    derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  snap_mapper.add_oid(
    soid,
    snaps,
    &_t);
}

void PG::clear_primary_state()
{
  dout(20) << __func__ << dendl;

  projected_log = PGLog::IndexedLog();

  snap_trimq.clear();
  snap_trimq_repeat.clear();
  finish_sync_event = 0;  // so that _finish_recovery doesn't go off in another thread
  release_pg_backoffs();

  if (m_scrubber) {
    m_scrubber->discard_replica_reservations();
  }
  scrub_after_recovery = false;

  agent_clear();
}

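// Cap checking: an op is admitted only if the client session's OSDCap grants
// the needed read/write/class access for this pool, namespace and object
// key. For example (hypothetical cap string for illustration), a client
// authorized with 'allow rw pool=rbd' passes for ops on pool "rbd" but is
// rejected for any other pool.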
bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
  // only check MOSDOp
  if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
    return true;

  auto req = op->get_req<MOSDOp>();
  auto priv = req->get_connection()->get_priv();
  auto session = static_cast<Session*>(priv.get());
  if (!session) {
    dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
    return false;
  }
  OSDCap& caps = session->caps;
  priv.reset();

  const string &key = req->get_hobj().get_key().empty() ?
    req->get_oid().name :
    req->get_hobj().get_key();

  bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
                             pool.info.application_metadata,
                             key,
                             op->need_read_cap(),
                             op->need_write_cap(),
                             op->classes(),
                             session->get_peer_socket_addr());

  dout(20) << "op_has_sufficient_caps "
           << "session=" << session
           << " pool=" << pool.id << " (" << pool.name
           << " " << req->get_hobj().nspace
           << ")"
           << " pool_app_metadata=" << pool.info.application_metadata
           << " need_read_cap=" << op->need_read_cap()
           << " need_write_cap=" << op->need_write_cap()
           << " classes=" << op->classes()
           << " -> " << (cap ? "yes" : "NO")
           << dendl;
  return cap;
}

void PG::queue_recovery()
{
  if (!is_primary() || !is_peered()) {
    dout(10) << "queue_recovery -- not primary or not peered " << dendl;
    ceph_assert(!recovery_queued);
  } else if (recovery_queued) {
    dout(10) << "queue_recovery -- already queued" << dendl;
  } else {
    dout(10) << "queue_recovery -- queuing" << dendl;
    recovery_queued = true;
    osd->queue_for_recovery(this);
  }
}

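// After a repair-triggered recovery finishes we immediately queue a
// high-priority deep scrub with check_repair set, so the result of the
// repair gets verified.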
void PG::queue_scrub_after_repair()
{
  dout(10) << __func__ << dendl;
  ceph_assert(ceph_mutex_is_locked(_lock));

  m_planned_scrub.must_deep_scrub = true;
  m_planned_scrub.check_repair = true;
  m_planned_scrub.must_scrub = true;

  if (is_scrub_queued_or_active()) {
    dout(10) << __func__ << ": scrubbing already ("
             << (is_scrubbing() ? "active)" : "queued)") << dendl;
    return;
  }

  m_scrubber->set_op_parameters(m_planned_scrub);
  dout(15) << __func__ << ": queueing" << dendl;

  m_scrubber->set_queued_or_active();
  osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
}

unsigned PG::get_scrub_priority()
{
  // a higher value -> a higher priority
  int64_t pool_scrub_priority =
    pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
  return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
}

Context *PG::finish_recovery()
{
  dout(10) << "finish_recovery" << dendl;
  ceph_assert(info.last_complete == info.last_update);

  clear_recovery_state();

  /*
   * sync all this before purging strays. but don't block!
   */
  finish_sync_event = new C_PG_FinishRecovery(this);
  return finish_sync_event;
}

void PG::_finish_recovery(Context* c)
{
  dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
           << is_clean() << dendl;

  std::scoped_lock locker{*this};
  if (recovery_state.is_deleting() || !is_clean()) {
    dout(10) << __func__ << " raced with delete or repair" << dendl;
    return;
  }
  // When recovery is initiated by a repair, that flag is left on
  state_clear(PG_STATE_REPAIR);
  if (c == finish_sync_event) {
    dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
    finish_sync_event = 0;
    recovery_state.purge_strays();

    publish_stats_to_osd();

    if (scrub_after_recovery) {
      dout(10) << "_finish_recovery requeueing for scrub" << dendl;
      scrub_after_recovery = false;
      queue_scrub_after_repair();
    }
  } else {
    dout(10) << "_finish_recovery -- stale" << dendl;
  }
}

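// Recovery-op accounting: start_recovery_op()/finish_recovery_op() keep
// recovery_ops_active balanced and mirror each op to the OSD, so the
// OSD-wide recovery throttle can track in-flight work. With
// DEBUG_RECOVERY_OIDS the oids themselves are recorded as well.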
void PG::start_recovery_op(const hobject_t& soid)
{
  dout(10) << "start_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active >= 0);
  recovery_ops_active++;
#ifdef DEBUG_RECOVERY_OIDS
  recovering_oids.insert(soid);
#endif
  osd->start_recovery_op(this, soid);
}

void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
{
  dout(10) << "finish_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active > 0);
  recovery_ops_active--;
#ifdef DEBUG_RECOVERY_OIDS
  ceph_assert(recovering_oids.count(soid));
  recovering_oids.erase(recovering_oids.find(soid));
#endif
  osd->finish_recovery_op(this, soid, dequeue);

  if (!dequeue) {
    queue_recovery();
  }
}

void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
  recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);

  child->update_snap_mapper_bits(split_bits);

  child->snap_trimq = snap_trimq;
  child->snap_trimq_repeat = snap_trimq_repeat;

  _split_into(child_pgid, child, split_bits);

  // release all backoffs for simplicity
  release_backoffs(hobject_t(), hobject_t::get_max());
}

void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
{
  recovery_state.start_split_stats(childpgs, out);
}

void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
{
  recovery_state.finish_split_stats(stats, t);
}

void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                    unsigned split_bits,
                    const pg_merge_meta_t& last_pg_merge_meta)
{
  dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
           << dendl;
  map<spg_t, PeeringState*> source_ps;
  for (auto &&source : sources) {
    source_ps.emplace(source.first, &source.second->recovery_state);
  }
  recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);

  for (auto& i : sources) {
    auto& source = i.second;
    // wipe out source's pgmeta
    rctx.transaction.remove(source->coll, source->pgmeta_oid);

    // merge (and destroy source collection)
    rctx.transaction.merge_collection(source->coll, coll, split_bits);
  }

  // merge_collection does this, but maybe all of our sources were missing.
  rctx.transaction.collection_set_bits(coll, split_bits);

  snap_mapper.update_bits(split_bits);
}

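// Backoffs: when the PG cannot service ops for a range of objects it sends
// CEPH_OSD_BACKOFF_OP_BLOCK for [begin,end) to the client session; the
// client withholds ops for that range until a matching
// CEPH_OSD_BACKOFF_OP_UNBLOCK (see release_backoffs()) arrives.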
void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
{
  auto con = s->con;
  if (!con)   // OSD::ms_handle_reset clears s->con without a lock
    return;
  auto b = s->have_backoff(info.pgid, begin);
  if (b) {
    derr << __func__ << " already have backoff for " << s << " begin " << begin
         << " " << *b << dendl;
    ceph_abort();
  }
  std::lock_guard l(backoff_lock);
  b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
  backoffs[begin].insert(b);
  s->add_backoff(b);
  dout(10) << __func__ << " session " << s << " added " << *b << dendl;
  con->send_message(
    new MOSDBackoff(
      info.pgid,
      get_osdmap_epoch(),
      CEPH_OSD_BACKOFF_OP_BLOCK,
      b->id,
      begin,
      end));
}

void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
{
  dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
  vector<ceph::ref_t<Backoff>> bv;
  {
    std::lock_guard l(backoff_lock);
    auto p = backoffs.lower_bound(begin);
    while (p != backoffs.end()) {
      int r = cmp(p->first, end);
      dout(20) << __func__ << " ? " << r << " " << p->first
               << " " << p->second << dendl;
      // note: must still examine begin=end=p->first case
      if (r > 0 || (r == 0 && begin < end)) {
        break;
      }
      dout(20) << __func__ << " checking " << p->first
               << " " << p->second << dendl;
      auto q = p->second.begin();
      while (q != p->second.end()) {
        dout(20) << __func__ << " checking " << *q << dendl;
        int rr = cmp((*q)->begin, begin);
        if (rr == 0 || (rr > 0 && (*q)->end < end)) {
          bv.push_back(*q);
          q = p->second.erase(q);
        } else {
          ++q;
        }
      }
      if (p->second.empty()) {
        p = backoffs.erase(p);
      } else {
        ++p;
      }
    }
  }
  for (auto b : bv) {
    std::lock_guard l(b->lock);
    dout(10) << __func__ << " " << *b << dendl;
    if (b->session) {
      ceph_assert(b->pg == this);
      ConnectionRef con = b->session->con;
      if (con) {   // OSD::ms_handle_reset clears s->con without a lock
        con->send_message(
          new MOSDBackoff(
            info.pgid,
            get_osdmap_epoch(),
            CEPH_OSD_BACKOFF_OP_UNBLOCK,
            b->id,
            b->begin,
            b->end));
      }
      if (b->is_new()) {
        b->state = Backoff::STATE_DELETING;
      } else {
        b->session->rm_backoff(b);
        b->session.reset();
      }
      b->pg.reset();
    }
  }
}

void PG::clear_backoffs()
{
  dout(10) << __func__ << " " << dendl;
  map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
  {
    std::lock_guard l(backoff_lock);
    ls.swap(backoffs);
  }
  for (auto& p : ls) {
    for (auto& b : p.second) {
      std::lock_guard l(b->lock);
      dout(10) << __func__ << " " << *b << dendl;
      if (b->session) {
        ceph_assert(b->pg == this);
        if (b->is_new()) {
          b->state = Backoff::STATE_DELETING;
        } else {
          b->session->rm_backoff(b);
          b->session.reset();
        }
        b->pg.reset();
      }
    }
  }
}

// called by Session::clear_backoffs()
void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
{
  dout(10) << __func__ << " " << *b << dendl;
  std::lock_guard l(backoff_lock);
  ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
  ceph_assert(b->pg == this);
  auto p = backoffs.find(b->begin);
  // may race with release_backoffs()
  if (p != backoffs.end()) {
    auto q = p->second.find(b);
    if (q != p->second.end()) {
      p->second.erase(q);
      if (p->second.empty()) {
        backoffs.erase(p);
      }
    }
  }
}

void PG::clear_recovery_state()
{
  dout(10) << "clear_recovery_state" << dendl;

  finish_sync_event = 0;

  hobject_t soid;
  while (recovery_ops_active > 0) {
#ifdef DEBUG_RECOVERY_OIDS
    soid = *recovering_oids.begin();
#endif
    finish_recovery_op(soid, true);
  }

  backfill_info.clear();
  peer_backfill_info.clear();
  waiting_on_backfill.clear();
  _clear_recovery_state();  // pg impl specific hook
}

void PG::cancel_recovery()
{
  dout(10) << "cancel_recovery" << dendl;
  clear_recovery_state();
}

void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
  for (set<pg_shard_t>::iterator i = probe_set.begin();
       i != probe_set.end();
       ++i) {
    probe_targets.insert(i->osd);
  }
}

void PG::send_cluster_message(
  int target, MessageRef m,
  epoch_t epoch, bool share_map_update)
{
  ConnectionRef con = osd->get_con_osd_cluster(
    target, get_osdmap_epoch());
  if (!con) {
    return;
  }

  if (share_map_update) {
    osd->maybe_share_map(con.get(), get_osdmap());
  }
  osd->send_message_osd_cluster(m, con.get());
}

void PG::clear_probe_targets()
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
}

void PG::update_heartbeat_peers(set<int> new_peers)
{
  bool need_update = false;
  heartbeat_peer_lock.lock();
  if (new_peers == heartbeat_peers) {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
  } else {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
    heartbeat_peers.swap(new_peers);
    need_update = true;
  }
  heartbeat_peer_lock.unlock();

  if (need_update)
    osd->need_heartbeat_peer_update();
}

bool PG::check_in_progress_op(
  const osd_reqid_t &r,
  eversion_t *version,
  version_t *user_version,
  int *return_code,
  vector<pg_log_op_return_item_t> *op_returns
  ) const
{
  return (
    projected_log.get_request(r, version, user_version, return_code,
                              op_returns) ||
    recovery_state.get_pg_log().get_log().get_request(
      r, version, user_version, return_code, op_returns));
}

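// Stats publication: only the primary reports stats upward. The scrubber's
// schedule status is folded into the published pg_stat_t, and
// pg_stats_publish caches the last value handed to the OSD.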
void PG::publish_stats_to_osd()
{
  if (!is_primary())
    return;

  ceph_assert(m_scrubber);
  recovery_state.update_stats_wo_resched(
    [scrubber = m_scrubber.get()](pg_history_t& hist,
                                  pg_stat_t& info) mutable -> void {
      info.scrub_sched_status = scrubber->get_schedule();
    });

  std::lock_guard l{pg_stats_publish_lock};
  auto stats =
    recovery_state.prepare_stats_for_publish(pg_stats_publish, unstable_stats);
  if (stats) {
    pg_stats_publish = std::move(stats);
  }
}

unsigned PG::get_target_pg_log_entries() const
{
  return osd->get_target_pg_log_entries();
}

void PG::clear_publish_stats()
{
  dout(15) << "clear_stats" << dendl;
  std::lock_guard l{pg_stats_publish_lock};
  pg_stats_publish.reset();
}

/**
 * initialize a newly instantiated pg
 *
 * Initialize PG state, as when a PG is initially created, or when it
 * is first instantiated on the current node.
 *
 * @param role our role/rank
 * @param newup up set
 * @param new_up_primary primary of the up set
 * @param newacting acting set
 * @param new_acting_primary primary of the acting set
 * @param history pg history
 * @param pi past_intervals
 * @param t transaction to write out our new state in
 */
void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  ObjectStore::Transaction &t)
{
  recovery_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, t);
}

void PG::shutdown()
{
  ch->flush();
  std::scoped_lock l{*this};
  recovery_state.shutdown();
  on_shutdown();
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

void PG::upgrade(ObjectStore *store)
{
  dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
          << dendl;
  ceph_assert(info_struct_v <= 10);
  ObjectStore::Transaction t;

  // <do upgrade steps here>

  // finished upgrade!
  ceph_assert(info_struct_v == 10);

  // update infover_key
  if (info_struct_v < pg_latest_struct_v) {
    map<string,bufferlist> v;
    __u8 ver = pg_latest_struct_v;
    encode(ver, v[string(infover_key)]);
    t.omap_setkeys(coll, pgmeta_oid, v);
  }

  recovery_state.force_write_state(t);

  ObjectStore::CollectionHandle ch = store->open_collection(coll);
  int r = store->queue_transaction(ch, std::move(t));
  if (r != 0) {
    derr << __func__ << ": queue_transaction returned "
         << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  ceph_assert(r == 0);

  C_SaferCond waiter;
  if (!ch->flush_commit(&waiter)) {
    waiter.wait();
  }
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

void PG::prepare_write(
  pg_info_t &info,
  pg_info_t &last_written_info,
  PastIntervals &past_intervals,
  PGLog &pglog,
  bool dirty_info,
  bool dirty_big_info,
  bool need_write_epoch,
  ObjectStore::Transaction &t)
{
  info.stats.stats.add(unstable_stats);
  unstable_stats.clear();
  map<string,bufferlist> km;
  string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      cct,
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      cct->_conf->osd_fast_info,
      osd->logger,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll, pgmeta_oid, pool.info.require_rollback());
  if (!km.empty())
    t.omap_setkeys(coll, pgmeta_oid, km);
  if (!key_to_remove.empty())
    t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

bool PG::_has_removal_flag(ObjectStore *store,
                           spg_t pgid)
{
  coll_t coll(pgid);
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());

  // first try new way
  set<string> keys;
  keys.insert("_remove");
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
      values.size() == 1)
    return true;

  return false;
}

int PG::peek_map_epoch(ObjectStore *store,
                       spg_t pgid,
                       epoch_t *pepoch)
{
  coll_t coll(pgid);
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  epoch_t cur_epoch = 0;

  // validate collection name
  ceph_assert(coll.is_pg());

  // try for v8
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(epoch_key));
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  if (r == 0) {
    ceph_assert(values.size() == 2);

    // sanity check version
    auto bp = values[string(infover_key)].cbegin();
    __u8 struct_v = 0;
    decode(struct_v, bp);
    ceph_assert(struct_v >= 8);

    // get epoch
    bp = values[string(epoch_key)].begin();
    decode(cur_epoch, bp);
  } else {
    // probably bug 10617; see OSD::load_pgs()
    return -1;
  }

  *pepoch = cur_epoch;
  return 0;
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

bool PG::check_log_for_corruption(ObjectStore *store)
{
  /// TODO: this method needs to work with the omap log
  return true;
}

//! Get the name we're going to save our corrupt pg log as
std::string PG::get_corrupt_pg_log_name() const
{
  const int MAX_BUF = 512;
  char buf[MAX_BUF];
  struct tm tm_buf;
  time_t my_time(time(NULL));
  const struct tm *t = localtime_r(&my_time, &tm_buf);
  int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
  if (ret == 0) {
    dout(0) << "strftime failed" << dendl;
    return "corrupt_log_unknown_time";
  }
  string out(buf);
  out += stringify(info.pgid);
  return out;
}

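// On-disk PG metadata lives in the omap of the pgmeta object: infover_key
// holds the struct version, info_key the pg_info_t, biginfo_key the past
// intervals plus purged snaps, and fastinfo_key an optional small delta
// applied on top of info_key.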
int PG::read_info(
  ObjectStore *store, spg_t pgid, const coll_t &coll,
  pg_info_t &info, PastIntervals &past_intervals,
  __u8 &struct_v)
{
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(info_key));
  keys.insert(string(biginfo_key));
  keys.insert(string(fastinfo_key));
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  ceph_assert(r == 0);
  ceph_assert(values.size() == 3 ||
              values.size() == 4);

  auto p = values[string(infover_key)].cbegin();
  decode(struct_v, p);
  ceph_assert(struct_v >= 10);

  p = values[string(info_key)].begin();
  decode(info, p);

  p = values[string(biginfo_key)].begin();
  decode(past_intervals, p);
  decode(info.purged_snaps, p);

  p = values[string(fastinfo_key)].begin();
  if (!p.end()) {
    pg_fast_info_t fast;
    decode(fast, p);
    fast.try_apply_to(&info);
  }
  return 0;
}

void PG::read_state(ObjectStore *store)
{
  PastIntervals past_intervals_from_disk;
  pg_info_t info_from_disk;
  int r = read_info(
    store,
    pg_id,
    coll,
    info_from_disk,
    past_intervals_from_disk,
    info_struct_v);
  ceph_assert(r >= 0);

  if (info_struct_v < pg_compat_struct_v) {
    derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
         << " an older version first." << dendl;
    ceph_abort_msg("PG too old to upgrade");
  }

  recovery_state.init_from_disk_state(
    std::move(info_from_disk),
    std::move(past_intervals_from_disk),
    [this, store] (PGLog &pglog) {
      ostringstream oss;
      pglog.read_log_and_missing(
        store,
        ch,
        pgmeta_oid,
        info,
        oss,
        cct->_conf->osd_ignore_stale_divergent_priors,
        cct->_conf->osd_debug_verify_missing_on_start);

      if (oss.tellp())
        osd->clog->error() << oss.str();
      return 0;
    });

  if (info_struct_v < pg_latest_struct_v) {
    upgrade(store);
  }

  // initialize current mapping
  {
    int primary, up_primary;
    vector<int> acting, up;
    get_osdmap()->pg_to_up_acting_osds(
      pg_id.pgid, &up, &up_primary, &acting, &primary);
    recovery_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
  }

  // init pool options
  store->set_collection_opts(ch, pool.info.opts);

  PeeringCtx rctx;
  handle_initialize(rctx);
  // note: we don't activate here because we know the OSD will advance maps
  // during boot.
  write_if_dirty(rctx.transaction);
  store->queue_transaction(ch, std::move(rctx.transaction));
}

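// Applying log entries to the snap mapper: deletes drop the oid,
// clones/promotes add it with the entry's snap set, and modifies update the
// existing mapping in place.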
void PG::update_snap_map(
  const vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction &t)
{
  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
    OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
    if (i->soid.snap < CEPH_MAXSNAP) {
      if (i->is_delete()) {
        int r = snap_mapper.remove_oid(
          i->soid,
          &_t);
        if (r)
          derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
        // On removal tolerate missing key corruption
        ceph_assert(r == 0 || r == -ENOENT);
      } else if (i->is_update()) {
        ceph_assert(i->snaps.length() > 0);
        vector<snapid_t> snaps;
        bufferlist snapbl = i->snaps;
        auto p = snapbl.cbegin();
        try {
          decode(snaps, p);
        } catch (...) {
          derr << __func__ << " decode snaps failure on " << *i << dendl;
          snaps.clear();
        }
        set<snapid_t> _snaps(snaps.begin(), snaps.end());

        if (i->is_clone() || i->is_promote()) {
          snap_mapper.add_oid(
            i->soid,
            _snaps,
            &_t);
        } else if (i->is_modify()) {
          int r = snap_mapper.update_snaps(
            i->soid,
            _snaps,
            0,
            &_t);
          ceph_assert(r == 0);
        } else {
          ceph_assert(i->is_clean());
        }
      }
    }
  }
}

/**
 * filter trimming|trimmed snaps out of snapcontext
 */
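// Worked example (illustrative): with snap_trimq = {3} and snaps = [4,3,1],
// filter_snapc() rewrites snaps to [4,1].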
void PG::filter_snapc(vector<snapid_t> &snaps)
{
  // nothing needs to trim, we can return immediately
  if (snap_trimq.empty() && info.purged_snaps.empty())
    return;

  bool filtering = false;
  vector<snapid_t> newsnaps;
  for (vector<snapid_t>::iterator p = snaps.begin();
       p != snaps.end();
       ++p) {
    if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
      if (!filtering) {
        // start building a new vector with what we've seen so far
        dout(10) << "filter_snapc filtering " << snaps << dendl;
        newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
        filtering = true;
      }
      dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
    } else {
      if (filtering)
        newsnaps.push_back(*p);  // continue building new vector
    }
  }
  if (filtering) {
    snaps.swap(newsnaps);
    dout(10) << "filter_snapc result " << snaps << dendl;
  }
}

void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
{
  for (auto it = m.begin(); it != m.end(); ++it)
    requeue_ops(it->second);
  m.clear();
}

void PG::requeue_op(OpRequestRef op)
{
  auto p = waiting_for_map.find(op->get_source());
  if (p != waiting_for_map.end()) {
    dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
             << dendl;
    p->second.push_front(op);
  } else {
    dout(20) << __func__ << " " << op << dendl;
    osd->enqueue_front(
      OpSchedulerItem(
        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
        op->get_req()->get_source().num(),
        get_osdmap_epoch()));
  }
}

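// Note: lists are requeued back-to-front below; each push to the front of
// the queue then restores the original op ordering.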
void PG::requeue_ops(list<OpRequestRef> &ls)
{
  for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
       i != ls.rend();
       ++i) {
    requeue_op(*i);
  }
  ls.clear();
}

void PG::requeue_map_waiters()
{
  epoch_t epoch = get_osdmap_epoch();
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end()) {
    if (epoch < p->second.front()->min_epoch) {
      dout(20) << __func__ << " " << p->first << " front op "
               << p->second.front() << " must still wait, doing nothing"
               << dendl;
      ++p;
    } else {
      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
        auto req = *q;
        osd->enqueue_front(OpSchedulerItem(
          unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
          req->get_req()->get_cost(),
          req->get_req()->get_priority(),
          req->get_req()->get_recv_stamp(),
          req->get_req()->get_source().num(),
          epoch));
      }
      p = waiting_for_map.erase(p);
    }
  }
}

bool PG::get_must_scrub() const
{
  dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
  return m_planned_scrub.must_scrub;
}

unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
{
  return m_scrubber->scrub_requeue_priority(with_priority);
}

unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
{
  return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
}

// ==========================================================================================
// SCRUB

/*
 * implementation note:
 * PG::sched_scrub() is called only once per specific scrub session.
 * That call commits us to whatever choices are made (deep/shallow, etc.).
 * Unless we fail to start the scrub, the 'planned scrub' flag-set is 'frozen'
 * into PgScrubber's m_flags, then cleared.
 */
Scrub::schedule_result_t PG::sched_scrub()
{
  dout(15) << __func__ << " pg(" << info.pgid
           << (is_active() ? ") <active>" : ") <not-active>")
           << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
  ceph_assert(ceph_mutex_is_locked(_lock));
  ceph_assert(m_scrubber);

  if (is_scrub_queued_or_active()) {
    return Scrub::schedule_result_t::already_started;
  }

  if (!is_primary() || !is_active() || !is_clean()) {
    return Scrub::schedule_result_t::bad_pg_state;
  }

  // analyse the combination of the requested scrub flags, the osd/pool
  // configuration and the PG status to determine whether we should scrub
  // now, and what type of scrub that should be.
  auto updated_flags = verify_scrub_mode();
  if (!updated_flags) {
    // the stars do not align for starting a scrub for this PG at this time
    // (due to configuration or priority issues).
    // The reason was already reported by the callee.
    dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
    return Scrub::schedule_result_t::preconditions;
  }

  // try to reserve the local OSD resources. If failing: no harm. We will
  // be retried by the OSD later on.
  if (!m_scrubber->reserve_local()) {
    dout(10) << __func__ << ": failed to reserve locally" << dendl;
    return Scrub::schedule_result_t::no_local_resources;
  }

  // we can commit to the updated flags now, as nothing will stop the scrub
  m_planned_scrub = *updated_flags;

  // An interrupted recovery repair could leave this set.
  state_clear(PG_STATE_REPAIR);

  // Pass control to the scrubber. It is the scrubber that handles the
  // replicas' resources reservations.
  m_scrubber->set_op_parameters(m_planned_scrub);

  dout(10) << __func__ << ": queueing" << dendl;
  m_scrubber->set_queued_or_active();
  osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
  return Scrub::schedule_result_t::scrub_initiated;
}

double PG::next_deepscrub_interval() const
{
  double deep_scrub_interval =
    pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
  if (deep_scrub_interval <= 0.0)
    deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
  return info.history.last_deep_scrub_stamp + deep_scrub_interval;
}

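// Deep-scrub cadence: a deep scrub is due when a repair asked for one
// (need_auto), when last_deep_scrub_stamp is older than the configured
// interval, when prior deep errors exist, or when a random coin flip
// (osd_deep_scrub_randomize_ratio) upgrades a regular scrub.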
bool PG::is_time_for_deep(bool allow_deep_scrub,
                          bool allow_scrub,
                          bool has_deep_errors,
                          const requested_scrub_t& planned) const
{
  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? "
           << allow_deep_scrub << dendl;

  if (!allow_deep_scrub)
    return false;

  if (planned.need_auto) {
    dout(10) << __func__ << ": need repair after scrub errors" << dendl;
    return true;
  }

  if (ceph_clock_now() >= next_deepscrub_interval()) {
    dout(20) << __func__ << ": now (" << ceph_clock_now() << ") >= time for deep ("
             << next_deepscrub_interval() << ")" << dendl;
    return true;
  }

  if (has_deep_errors) {
    osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
                      << " Deep scrub errors, upgrading scrub to deep-scrub";
    return true;
  }

  // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this
  // function is called often, we will probably be deep-scrubbing most of
  // the time.
  if (allow_scrub) {
    bool deep_coin_flip =
      (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;

    dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
             << " deep_coin_flip=" << deep_coin_flip << dendl;

    if (deep_coin_flip)
      return true;
  }

  return false;
}

bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
                                    bool try_to_auto_repair,
                                    bool allow_regular_scrub,
                                    bool has_deep_errors,
                                    requested_scrub_t& planned) const
{
  ceph_assert(!planned.must_deep_scrub && !planned.must_repair);

  if (!allow_deep_scrub && has_deep_errors) {
    osd->clog->error()
      << "osd." << osd->whoami << " pg " << info.pgid
      << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
    return false;
  }

  if (allow_deep_scrub) {
    // Initial entry and scheduled scrubs without nodeep_scrub set get here

    planned.time_for_deep =
      is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);

    if (try_to_auto_repair) {
      if (planned.time_for_deep) {
        dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
        planned.auto_repair = true;
      } else if (allow_regular_scrub) {
        dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
                 << dendl;
        planned.deep_scrub_on_error = true;
      }
    }
  }

  dout(20) << __func__ << " updated flags: " << planned
           << " allow_regular_scrub: " << allow_regular_scrub << dendl;

  // NOSCRUB so skip regular scrubs
  if (!allow_regular_scrub && !planned.time_for_deep) {
    return false;
  }

  return true;
}

std::optional<requested_scrub_t> PG::verify_scrub_mode() const
{
  const bool allow_regular_scrub =
    !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
      pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
  const bool allow_deep_scrub =
    allow_regular_scrub &&
    !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
  const bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
  const bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair &&
                                   get_pgbackend()->auto_repair_supported());

  dout(10) << __func__ << " pg: " << info.pgid
           << " allow: " << allow_regular_scrub << "/" << allow_deep_scrub
           << " deep errs: " << has_deep_errors
           << " auto-repair: " << try_to_auto_repair << " ("
           << cct->_conf->osd_scrub_auto_repair << ")" << dendl;

  auto upd_flags = m_planned_scrub;

  upd_flags.time_for_deep = false;
  // Clear these in case user issues the scrub/repair command during
  // the scheduling of the scrub/repair (e.g. request reservation)
  upd_flags.deep_scrub_on_error = false;
  upd_flags.auto_repair = false;

  if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
    osd->clog->error()
      << "osd." << osd->whoami << " pg " << info.pgid
      << " Regular scrub request, deep-scrub details will be lost";
  }

  if (!upd_flags.must_scrub) {
    // All periodic scrub handling goes here because must_scrub is
    // always set for must_deep_scrub and must_repair.

    const bool can_start_periodic = verify_periodic_scrub_mode(
      allow_deep_scrub, try_to_auto_repair, allow_regular_scrub,
      has_deep_errors, upd_flags);
    if (!can_start_periodic) {
      // "I don't want no scrub"
      dout(20) << __func__ << ": no periodic scrubs allowed" << dendl;
      return std::nullopt;
    }
  }

  // scrubbing while recovering?

  bool prevented_by_recovery =
    osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
    (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);

  if (prevented_by_recovery) {
    dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
    return std::nullopt;
  }

  upd_flags.need_auto = false;
  return upd_flags;
}

/*
 * Note: on_info_history_change() is used in the two cases where we are not
 * sure whether the role of the PG has changed, and if so - whether that
 * change was relayed to the scrub queue.
 */
void PG::on_info_history_change()
{
  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") << dendl;

  ceph_assert(m_scrubber);
  m_scrubber->on_maybe_registration_change(m_planned_scrub);
}

void PG::reschedule_scrub()
{
  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") << dendl;

  // we are assuming no change in primary status
  if (is_primary()) {
    ceph_assert(m_scrubber);
    m_scrubber->update_scrub_job(m_planned_scrub);
  }
}

void PG::on_primary_status_change(bool was_primary, bool now_primary)
{
  // make sure we have a working scrubber when becoming a primary

  if (was_primary != now_primary) {
    ceph_assert(m_scrubber);
    m_scrubber->on_primary_change(m_planned_scrub);
  }
}

void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
  ceph_assert(m_scrubber);
  m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
}

void PG::clear_ready_to_merge() {
  osd->clear_ready_to_merge(this);
}

void PG::queue_want_pg_temp(const vector<int> &wanted) {
  osd->queue_want_pg_temp(get_pgid().pgid, wanted);
}

void PG::clear_want_pg_temp() {
  osd->remove_want_pg_temp(get_pgid().pgid);
}

void PG::on_role_change() {
  requeue_ops(waiting_for_peered);
  plpg_on_role_change();
}

void PG::on_new_interval()
{
  projected_last_update = eversion_t();
  cancel_recovery();

  ceph_assert(m_scrubber);
  // log some scrub data before we react to the interval
  dout(20) << __func__ << (is_scrub_queued_or_active() ? " scrubbing " : " ")
           << "flags: " << m_planned_scrub << dendl;

  m_scrubber->on_maybe_registration_change(m_planned_scrub);
}

epoch_t PG::oldest_stored_osdmap() {
  return osd->get_superblock().oldest_map;
}

OstreamTemp PG::get_clog_info() {
  return osd->clog->info();
}

OstreamTemp PG::get_clog_debug() {
  return osd->clog->debug();
}

OstreamTemp PG::get_clog_error() {
  return osd->clog->error();
}

void PG::schedule_event_after(
  PGPeeringEventRef event,
  float delay) {
  std::lock_guard lock(osd->recovery_request_lock);
  osd->recovery_request_timer.add_event_after(
    delay,
    new QueuePeeringEvt(
      this,
      std::move(event)));
}

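// Reservations: local and remote recovery/backfill slots are obtained
// through the OSD's reservers; the on_grant/on_preempt peering events are
// queued back to this PG when the reservation is granted or preempted.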
void PG::request_local_background_io_reservation(
  unsigned priority,
  PGPeeringEventURef on_grant,
  PGPeeringEventURef on_preempt) {
  osd->local_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

void PG::update_local_background_io_priority(
  unsigned priority) {
  osd->local_reserver.update_priority(
    pg_id,
    priority);
}

void PG::cancel_local_background_io_reservation() {
  osd->local_reserver.cancel_reservation(
    pg_id);
}

void PG::request_remote_recovery_reservation(
  unsigned priority,
  PGPeeringEventURef on_grant,
  PGPeeringEventURef on_preempt) {
  osd->remote_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

void PG::cancel_remote_recovery_reservation() {
  osd->remote_reserver.cancel_reservation(
    pg_id);
}

void PG::schedule_event_on_commit(
  ObjectStore::Transaction &t,
  PGPeeringEventRef on_commit)
{
  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
}

void PG::on_activate(interval_set<snapid_t> snaps)
{
  ceph_assert(!m_scrubber->are_callbacks_pending());
  ceph_assert(callbacks_for_degraded_object.empty());
  snap_trimq = snaps;
  release_pg_backoffs();
  projected_last_update = info.last_update;
}

void PG::on_active_exit()
{
  backfill_reserving = false;
  agent_stop();
}

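// On each new map while active: fold the map's new_removed_snaps into
// snap_trimq for trimming, and reconcile new_purged_snaps against the
// purged_snaps we have recorded.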
9f95a23c 1701void PG::on_active_advmap(const OSDMapRef &osdmap)
11fdf7f2 1702{
9f95a23c
TL
1703 const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
1704 auto i = new_removed_snaps.find(get_pgid().pool());
1705 if (i != new_removed_snaps.end()) {
1706 bool bad = false;
1707 for (auto j : i->second) {
1708 if (snap_trimq.intersects(j.first, j.second)) {
1709 decltype(snap_trimq) added, overlap;
1710 added.insert(j.first, j.second);
1711 overlap.intersection_of(snap_trimq, added);
1712 derr << __func__ << " removed_snaps already contains "
1713 << overlap << dendl;
1714 bad = true;
1715 snap_trimq.union_of(added);
1716 } else {
1717 snap_trimq.insert(j.first, j.second);
1718 }
1719 }
1720 dout(10) << __func__ << " new removed_snaps " << i->second
1721 << ", snap_trimq now " << snap_trimq << dendl;
1722 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
1723 }
1724
1725 const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
1726 auto j = new_purged_snaps.find(get_pgid().pgid.pool());
1727 if (j != new_purged_snaps.end()) {
1728 bool bad = false;
1729 for (auto k : j->second) {
1730 if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
1731 interval_set<snapid_t> rm, overlap;
1732 rm.insert(k.first, k.second);
1733 overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
1734 derr << __func__ << " purged_snaps does not contain "
1735 << rm << ", only " << overlap << dendl;
1736 recovery_state.adjust_purged_snaps(
1737 [&overlap](auto &purged_snaps) {
1738 purged_snaps.subtract(overlap);
1739 });
1740 // This can currently happen in the normal (if unlikely) course of
1741 // events. Because adding snaps to purged_snaps does not increase
1742 // the pg version or add a pg log entry, we don't reliably propagate
1743 // purged_snaps additions to other OSDs.
1744 // One example:
1745 // - purge S
1746 // - primary and replicas update purged_snaps
1747 // - no object updates
1748 // - pg mapping changes, new primary on different node
1749 // - new primary pg version == eversion_t(), so info is not
1750 // propagated.
1751 //bad = true;
1752 } else {
1753 recovery_state.adjust_purged_snaps(
1754 [&k](auto &purged_snaps) {
1755 purged_snaps.erase(k.first, k.second);
1756 });
11fdf7f2
TL
1757 }
1758 }
9f95a23c
TL
1759 dout(10) << __func__ << " new purged_snaps " << j->second
1760 << ", now " << recovery_state.get_info().purged_snaps << dendl;
1761 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
11fdf7f2 1762 }
11fdf7f2
TL
1763}
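
// Hedged illustration (not part of the original file): the snap bookkeeping
// above is plain interval_set arithmetic. A minimal sketch of the same
// operations, assuming only include/interval_set.h:
#if 0
void interval_set_sketch() {
  interval_set<snapid_t> trimq, added, overlap;
  trimq.insert(snapid_t(1), 4);             // tracks snaps [1, 5)
  added.insert(snapid_t(3), 4);             // incoming removal [3, 7)
  if (trimq.intersects(snapid_t(3), 4)) {   // [3, 5) already queued
    overlap.intersection_of(trimq, added);  // overlap == [3, 5)
    trimq.union_of(added);                  // trimq grows to [1, 7)
  }
}
#endif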
1764
9f95a23c 1765void PG::queue_snap_retrim(snapid_t snap)
7c673cae 1766{
9f95a23c
TL
1767 if (!is_active() ||
1768 !is_primary()) {
1769 dout(10) << __func__ << " snap " << snap << " - not both active and primary"
1770 << dendl;
7c673cae 1771 return;
7c673cae 1772 }
9f95a23c
TL
1773 if (!snap_trimq.contains(snap)) {
1774 snap_trimq.insert(snap);
1775 snap_trimq_repeat.insert(snap);
1776 dout(20) << __func__ << " snap " << snap
1777 << ", trimq now " << snap_trimq
1778 << ", repeat " << snap_trimq_repeat << dendl;
1779 kick_snap_trim();
1780 } else {
1781 dout(20) << __func__ << " snap " << snap
1782 << " already in trimq " << snap_trimq << dendl;
7c673cae 1783 }
7c673cae
FG
1784}
1785
9f95a23c 1786void PG::on_active_actmap()
7c673cae 1787{
9f95a23c
TL
1788 if (cct->_conf->osd_check_for_log_corruption)
1789 check_log_for_corruption(osd->store);
1790
1791
1792 if (recovery_state.is_active()) {
1793 dout(10) << "Active: kicking snap trim" << dendl;
1794 kick_snap_trim();
7c673cae 1795 }
9f95a23c
TL
1796
1797 if (recovery_state.is_peered() &&
1798 !recovery_state.is_clean() &&
1799 !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
1800 (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
1801 recovery_state.is_degraded())) {
1802 queue_recovery();
7c673cae
FG
1803 }
1804}
1805
9f95a23c 1806void PG::on_backfill_reserved()
7c673cae 1807{
9f95a23c
TL
1808 backfill_reserving = false;
1809 queue_recovery();
7c673cae
FG
1810}
1811
9f95a23c 1812void PG::on_backfill_canceled()
7c673cae 1813{
9f95a23c
TL
1814 if (!waiting_on_backfill.empty()) {
1815 waiting_on_backfill.clear();
1816 finish_recovery_op(hobject_t::get_max());
7c673cae
FG
1817 }
1818}
1819
9f95a23c 1820void PG::on_recovery_reserved()
7c673cae 1821{
9f95a23c 1822 queue_recovery();
7c673cae
FG
1823}
1824
9f95a23c 1825void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
7c673cae 1826{
9f95a23c 1827 osd->set_not_ready_to_merge_target(pgid, src);
7c673cae
FG
1828}
1829
9f95a23c 1830void PG::set_not_ready_to_merge_source(pg_t pgid)
7c673cae 1831{
9f95a23c 1832 osd->set_not_ready_to_merge_source(pgid);
7c673cae
FG
1833}
1834
9f95a23c 1835void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
7c673cae 1836{
9f95a23c 1837 osd->set_ready_to_merge_target(this, lu, les, lec);
7c673cae
FG
1838}
1839
9f95a23c 1840void PG::set_ready_to_merge_source(eversion_t lu)
7c673cae 1841{
9f95a23c 1842 osd->set_ready_to_merge_source(this, lu);
7c673cae
FG
1843}
1844
9f95a23c 1845void PG::send_pg_created(pg_t pgid)
7c673cae 1846{
9f95a23c
TL
1847 osd->send_pg_created(pgid);
1848}
7c673cae 1849
9f95a23c
TL
1850ceph::signedspan PG::get_mnow()
1851{
1852 return osd->get_mnow();
1853}
7c673cae 1854
9f95a23c
TL
1855HeartbeatStampsRef PG::get_hb_stamps(int peer)
1856{
1857 return osd->get_hb_stamps(peer);
7c673cae
FG
1858}
1859
9f95a23c
TL
1860void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
1861{
1862 auto spgid = info.pgid;
1863 auto o = osd;
1864 osd->mono_timer.add_event(
1865 delay,
1866 [o, lpr, spgid]() {
1867 o->queue_renew_lease(lpr, spgid);
1868 });
1869}
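
// Hedged note (not in the original file): the lambda captures the raw
// OSDService pointer and the spg_t by value rather than a PGRef, presumably
// so that a pending mono_timer event does not pin this PG in memory;
// queue_renew_lease() re-resolves the PG by id when the event fires.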
7c673cae 1870
9f95a23c 1871void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
7c673cae 1872{
9f95a23c 1873 osd->queue_check_readable(info.pgid, lpr, delay);
7c673cae
FG
1874}
1875
9f95a23c 1876void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
91327a77 1877{
9f95a23c
TL
1878 pglog.rebuild_missing_set_with_deletes(
1879 osd->store,
1880 ch,
1881 recovery_state.get_info());
91327a77
AA
1882}
1883
9f95a23c 1884void PG::on_activate_committed()
91327a77 1885{
9f95a23c
TL
1886 if (!is_primary()) {
1887 // waiters
1888 if (recovery_state.needs_flush() == 0) {
1889 requeue_ops(waiting_for_peered);
1890 } else if (!waiting_for_peered.empty()) {
1891 dout(10) << __func__ << " flushes in progress, moving "
1892 << waiting_for_peered.size() << " items to waiting_for_flush"
1893 << dendl;
1894 ceph_assert(waiting_for_flush.empty());
1895 waiting_for_flush.swap(waiting_for_peered);
91327a77 1896 }
9f95a23c
TL
1897 }
1898}
91327a77 1899
9f95a23c
TL
1900// Compute pending backfill data
1901static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
11fdf7f2 1902{
9f95a23c
TL
1903 lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
1904 << (local_bytes >> 10) << "KiB"
1905 << " primary usage " << (bf_bytes >> 10)
1906 << "KiB" << dendl;
11fdf7f2 1907
9f95a23c
TL
1908 return std::max((int64_t)0, bf_bytes - local_bytes);
1909}
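
// Hedged usage sketch (illustration only): the helper just clamps the
// primary/local delta at zero, so a replica that already stores more than
// the primary reports reserves no extra space.
#if 0
void pending_backfill_sketch(CephContext *cct) {
  ceph_assert(pending_backfill(cct, 10 << 20, 4 << 20) == (6 << 20)); // 6 MiB pending
  ceph_assert(pending_backfill(cct, 4 << 20, 10 << 20) == 0);         // clamped to 0
}
#endif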
7c673cae 1910
7c673cae 1911
9f95a23c
TL
1912// We can zero the value of primary num_bytes as just an atomic.
1913// However, setting above zero reserves space for backfill and requires
1914// the OSDService::stat_lock which protects all OSD usage
1915bool PG::try_reserve_recovery_space(
1916 int64_t primary_bytes, int64_t local_bytes) {
1917 // Use tentative_backfill_full() to make sure enough
1918 // space is available to handle target bytes from primary.
7c673cae 1919
9f95a23c
TL
1920 // TODO: If we passed num_objects from primary we could account for
1921 // an estimate of the metadata overhead.
7c673cae 1922
9f95a23c
TL
1923 // TODO: If we had compressed_allocated and compressed_original from primary
1924 // we could compute compression ratio and adjust accordingly.
7c673cae 1925
9f95a23c
TL
1926 // XXX: There is no way to get omap overhead and this would only apply
1927 // to whatever (possibly different) partition is storing the database.
7c673cae 1928
9f95a23c
TL
1929 // update_osd_stat() from heartbeat will do this on a new
1930 // statfs using ps->primary_bytes.
1931 uint64_t pending_adjustment = 0;
1932 if (primary_bytes) {
1933 // For erasure coded pool overestimate by a full stripe per object
1934 // because we don't know how each object was rounded to the nearest stripe
1935 if (pool.info.is_erasure()) {
1936 primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
1937 primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
1938 info.stats.stats.sum.num_objects;
1939 local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
1940 local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
1941 info.stats.stats.sum.num_objects;
1942 }
1943 pending_adjustment = pending_backfill(
1944 cct,
1945 primary_bytes,
1946 local_bytes);
1947 dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
1948 << "KiB"
1949 << " local " << (local_bytes >> 10) << "KiB"
1950 << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
1951 << dendl;
7c673cae 1952 }
7c673cae 1953
9f95a23c
TL
1954 // This lock protects not only the OSDService stats but also the setting
1955 // of the pg primary_bytes. That's why we don't immediately unlock.
1956 std::lock_guard l{osd->stat_lock};
1957 osd_stat_t cur_stat = osd->osd_stat;
1958 if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
1959 (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
1960 dout(10) << "backfill reservation rejected: failure injection"
1961 << dendl;
1962 return false;
1963 } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
1964 osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
1965 dout(10) << "backfill reservation rejected: backfill full"
1966 << dendl;
1967 return false;
1968 } else {
1969 // Don't reserve space if the reservation check was skipped; this is
1970 // used to test the other backfill-full check AND in case corruption
1971 // of num_bytes requires ignoring that value and trying the
1972 // backfill anyway.
1973 if (primary_bytes &&
1974 !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
1975 primary_num_bytes.store(primary_bytes);
1976 local_num_bytes.store(local_bytes);
1977 } else {
1978 unreserve_recovery_space();
1979 }
1980 return true;
7c673cae
FG
1981 }
1982}
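
// Hedged worked example (not part of the original file): for an EC pool
// with k=4 data chunks, a 64 KiB stripe chunk and 100 objects, the
// adjustment above rewrites a reported primary usage of 400 MiB as
//   400 MiB / 4 + 64 KiB * 100 = 100 MiB + 6.25 MiB
// i.e. this shard's share plus one full stripe chunk per object to cover
// the unknown per-object rounding.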
1983
9f95a23c
TL
1984void PG::unreserve_recovery_space() {
1985 primary_num_bytes.store(0);
1986 local_num_bytes.store(0);
7c673cae
FG
1987}
1988
9f95a23c 1989void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
7c673cae 1990{
9f95a23c
TL
1991 ObjectStore::Transaction t;
1992 eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
1993 for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
1994 i != rollback_obs.end();
1995 ++i) {
1996 if (i->generation < trimmed_to.version) {
1997 dout(10) << __func__ << " osd." << osd->whoami
1998 << " pg " << info.pgid
1999 << " found obsolete rollback obj "
2000 << *i << " generation < trimmed_to "
2001 << trimmed_to
2002 << "...repaired" << dendl;
2003 t.remove(coll, *i);
2004 }
2005 }
2006 if (!t.empty()) {
2007 derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
2008 << dendl;
2009 osd->store->queue_transaction(ch, std::move(t), NULL);
2010 }
7c673cae
FG
2011}
2012
7c673cae 2013
9f95a23c 2014void PG::_repair_oinfo_oid(ScrubMap &smap)
7c673cae 2015{
9f95a23c
TL
2016 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2017 i != smap.objects.rend();
2018 ++i) {
2019 const hobject_t &hoid = i->first;
2020 ScrubMap::object &o = i->second;
7c673cae 2021
9f95a23c
TL
2022 bufferlist bl;
2023 if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
2024 continue;
2025 }
2026 bl.push_back(o.attrs[OI_ATTR]);
2027 object_info_t oi;
2028 try {
2029 oi.decode(bl);
2030 } catch(...) {
2031 continue;
2032 }
2033 if (oi.soid != hoid) {
2034 ObjectStore::Transaction t;
2035 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2036 osd->clog->error() << "osd." << osd->whoami
2037 << " found object info error on pg "
2038 << info.pgid
2039 << " oid " << hoid << " oid in object info: "
2040 << oi.soid
2041 << "...repaired";
2042 // Fix object info
2043 oi.soid = hoid;
2044 bl.clear();
2045 encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
7c673cae 2046
9f95a23c
TL
2047 bufferptr bp(bl.c_str(), bl.length());
2048 o.attrs[OI_ATTR] = bp;
7c673cae 2049
9f95a23c
TL
2050 t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
2051 int r = osd->store->queue_transaction(ch, std::move(t));
2052 if (r != 0) {
2053 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2054 << dendl;
2055 }
7c673cae
FG
2056 }
2057 }
7c673cae 2058}
7c673cae 2059
9f95a23c
TL
2060void PG::repair_object(
2061 const hobject_t &soid,
2062 const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
2063 const set<pg_shard_t> &bad_peers)
2064{
2065 set<pg_shard_t> ok_shards;
2066 for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
d2e6a577 2067
9f95a23c
TL
2068 dout(10) << "repair_object " << soid
2069 << " bad_peers osd.{" << bad_peers << "},"
2070 << " ok_peers osd.{" << ok_shards << "}" << dendl;
11fdf7f2 2071
9f95a23c
TL
2072 const ScrubMap::object &po = ok_peers.back().first;
2073 eversion_t v;
2074 object_info_t oi;
2075 try {
2076 bufferlist bv;
2077 if (po.attrs.count(OI_ATTR)) {
2078 bv.push_back(po.attrs.find(OI_ATTR)->second);
2079 }
2080 auto bliter = bv.cbegin();
2081 decode(oi, bliter);
2082 } catch (...) {
2083 dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
2084 << soid << dendl;
2085 ceph_abort();
11fdf7f2
TL
2086 }
2087
9f95a23c
TL
2088 if (bad_peers.count(get_primary())) {
2089 // We should only be scrubbing if the PG is clean.
2090 ceph_assert(waiting_for_unreadable_object.empty());
2091 dout(10) << __func__ << ": primary = " << get_primary() << dendl;
11fdf7f2
TL
2092 }
2093
9f95a23c
TL
2094 /* No need to pass ok_peers, they must not be missing the object, so
2095 * force_object_missing will add them to missing_loc anyway */
2096 recovery_state.force_object_missing(bad_peers, soid, oi.version);
7c673cae
FG
2097}
2098
20effc67 2099void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
7c673cae 2100{
20effc67
TL
2101 dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
2102 ceph_assert(m_scrubber);
2103 if (is_active()) {
f67539c2 2104 ((*m_scrubber).*fn)(epoch_queued);
7c673cae 2105 } else {
f67539c2
TL
2106 // pg might be in the process of being deleted
2107 dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ")
20effc67 2108 << (is_active() ? "(active) " : "(not active) ") << dendl;
7c673cae 2109 }
f67539c2 2110}
7c673cae 2111
20effc67
TL
2112void PG::forward_scrub_event(ScrubSafeAPI fn,
2113 epoch_t epoch_queued,
2114 Scrub::act_token_t act_token,
2115 std::string_view desc)
f67539c2 2116{
20effc67
TL
2117 dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
2118 << " token: " << act_token << dendl;
2119 ceph_assert(m_scrubber);
2120 if (is_active()) {
2121 ((*m_scrubber).*fn)(epoch_queued, act_token);
2122 } else {
2123 // pg might be in the process of being deleted
2124 dout(5) << __func__ << " refusing to forward. "
2125 << (is_clean() ? "(clean) " : "(not clean) ")
2126 << (is_active() ? "(active) " : "(not active) ") << dendl;
2127 }
7c673cae
FG
2128}
2129
20effc67 2130void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
f6b5b4d7 2131{
20effc67
TL
2132 dout(10) << __func__ << " (op)" << dendl;
2133 ceph_assert(m_scrubber);
2134 m_scrubber->replica_scrub_op(op);
f6b5b4d7
TL
2135}
2136
f67539c2 2137void PG::replica_scrub(epoch_t epoch_queued,
20effc67 2138 Scrub::act_token_t act_token,
f67539c2
TL
2139 [[maybe_unused]] ThreadPool::TPHandle& handle)
2140{
2141 dout(10) << __func__ << " queued at: " << epoch_queued
2142 << (is_primary() ? " (primary)" : " (replica)") << dendl;
20effc67
TL
2143 forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
2144 "StartReplica/nw");
f67539c2 2145}
7c673cae 2146
f67539c2
TL
2147bool PG::ops_blocked_by_scrub() const
2148{
2149 return !waiting_for_scrub.empty();
2150}
11fdf7f2 2151
f67539c2
TL
2152Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
2153{
2154 return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
2155 : Scrub::scrub_prio_t::high_priority;
7c673cae
FG
2156}
2157
9f95a23c 2158bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
7c673cae 2159{
f67539c2
TL
2160 if (auto last_reset = get_last_peering_reset();
2161 last_reset > reply_epoch || last_reset > query_epoch) {
2162 dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
2163 << query_epoch << " last_peering_reset " << last_reset << dendl;
9f95a23c
TL
2164 return true;
2165 }
2166 return false;
7c673cae
FG
2167}
2168
9f95a23c
TL
2169struct FlushState {
2170 PGRef pg;
2171 epoch_t epoch;
2172 FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
2173 ~FlushState() {
2174 std::scoped_lock l{*pg};
2175 if (!pg->pg_has_reset_since(epoch)) {
2176 pg->recovery_state.complete_flush();
2177 }
7c673cae 2178 }
9f95a23c
TL
2179};
2180typedef std::shared_ptr<FlushState> FlushStateRef;
7c673cae 2181
9f95a23c 2182void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
7c673cae 2183{
9f95a23c
TL
2184 // flush in progress ops
2185 FlushStateRef flush_trigger (std::make_shared<FlushState>(
2186 this, get_osdmap_epoch()));
2187 t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
2188 t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
7c673cae
FG
2189}
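
// Hedged note (illustration only): FlushState is a pure RAII trigger. Both
// callbacks registered above hold a shared_ptr to the same FlushState, so
// ~FlushState (and thus complete_flush()) runs exactly once, when whichever
// of on_applied/on_commit fires last drops the final reference.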
2190
9f95a23c 2191bool PG::try_flush_or_schedule_async()
11fdf7f2 2192{
9f95a23c
TL
2193 Context *c = new QueuePeeringEvt(
2194 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
2195 if (!ch->flush_commit(c)) {
2196 return false;
2197 } else {
2198 delete c;
2199 return true;
2200 }
11fdf7f2
TL
2201}
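
// Hedged note: as used above, flush_commit(c) returning true means the
// collection is already stable and the callback will never be invoked
// (hence the delete); returning false means the store has taken ownership
// of c and will complete it once the flush commits, re-queueing the
// IntervalFlush event.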
2202
9f95a23c 2203ostream& operator<<(ostream& out, const PG& pg)
11fdf7f2 2204{
9f95a23c 2205 out << pg.recovery_state;
f67539c2
TL
2206
2207 // listing all scrub-related flags - both current and "planned next scrub"
2208 if (pg.is_scrubbing()) {
2209 out << *pg.m_scrubber;
2210 }
2211 out << pg.m_planned_scrub;
11fdf7f2 2212
9f95a23c
TL
2213 if (pg.recovery_ops_active)
2214 out << " rops=" << pg.recovery_ops_active;
11fdf7f2 2215
9f95a23c
TL
2216 //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
2217 if (pg.recovery_state.have_missing()) {
2218 out << " m=" << pg.recovery_state.get_num_missing();
2219 if (pg.is_primary()) {
2220 uint64_t unfound = pg.recovery_state.get_num_unfound();
2221 if (unfound)
2222 out << " u=" << unfound;
2223 }
2224 }
2225 if (!pg.is_clean()) {
2226 out << " mbc=" << pg.recovery_state.get_missing_by_count();
2227 }
2228 if (!pg.snap_trimq.empty()) {
2229 out << " trimq=";
2230 // only show a count if the set is large
2231 if (pg.snap_trimq.num_intervals() > 16) {
2232 out << pg.snap_trimq.size();
2233 if (!pg.snap_trimq_repeat.empty()) {
2234 out << "(" << pg.snap_trimq_repeat.size() << ")";
2235 }
2236 } else {
2237 out << pg.snap_trimq;
2238 if (!pg.snap_trimq_repeat.empty()) {
2239 out << "(" << pg.snap_trimq_repeat << ")";
2240 }
2241 }
2242 }
2243 if (!pg.recovery_state.get_info().purged_snaps.empty()) {
2244 out << " ps="; // snap trim queue / purged snaps
2245 if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
2246 out << pg.recovery_state.get_info().purged_snaps.size();
2247 } else {
2248 out << pg.recovery_state.get_info().purged_snaps;
2249 }
11fdf7f2 2250 }
11fdf7f2 2251
9f95a23c 2252 out << "]";
9f95a23c 2253 return out;
11fdf7f2
TL
2254}
2255
9f95a23c 2256bool PG::can_discard_op(OpRequestRef& op)
7c673cae 2257{
9f95a23c
TL
2258 auto m = op->get_req<MOSDOp>();
2259 if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
2260 dout(20) << " discard " << *m << dendl;
2261 return true;
2262 }
7c673cae 2263
9f95a23c
TL
2264 if (m->get_map_epoch() < info.history.same_primary_since) {
2265 dout(7) << " changed after " << m->get_map_epoch()
2266 << ", dropping " << *m << dendl;
2267 return true;
2268 }
7c673cae 2269
9f95a23c
TL
2270 if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
2271 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
2272 !is_primary() &&
2273 m->get_map_epoch() < info.history.same_interval_since) {
2274 // Note: the Objecter will resend on interval change without the primary
2275 // changing if it actually sent to a replica. If the primary hasn't
2276 // changed since the send epoch, we got it, and we're primary, it won't
2277 // have resent even if the interval did change as it sent it to the primary
2278 // (us).
2279 return true;
7c673cae 2280 }
7c673cae 2281
7c673cae 2282
9f95a23c
TL
2283 if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
2284 // >= luminous client
2285 if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
2286 // >= nautilus client
2287 if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
2288 dout(7) << __func__ << " sent before last_force_op_resend "
2289 << pool.info.last_force_op_resend
2290 << ", dropping" << *m << dendl;
2291 return true;
2292 }
2293 } else {
2294 // < nautilus client (luminous or mimic)
2295 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
2296 dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
2297 << pool.info.last_force_op_resend_prenautilus
2298 << ", dropping" << *m << dendl;
2299 return true;
2300 }
7c673cae 2301 }
9f95a23c
TL
2302 if (m->get_map_epoch() < info.history.last_epoch_split) {
2303 dout(7) << __func__ << " pg split in "
2304 << info.history.last_epoch_split << ", dropping" << dendl;
2305 return true;
7c673cae 2306 }
9f95a23c
TL
2307 } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
2308 // < luminous client
2309 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
2310 dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
2311 << pool.info.last_force_op_resend_preluminous
2312 << ", dropping" << *m << dendl;
2313 return true;
7c673cae
FG
2314 }
2315 }
2316
9f95a23c 2317 return false;
7c673cae
FG
2318}
2319
9f95a23c
TL
2320template<typename T, int MSGTYPE>
2321bool PG::can_discard_replica_op(OpRequestRef& op)
7c673cae 2322{
9f95a23c
TL
2323 auto m = op->get_req<T>();
2324 ceph_assert(m->get_type() == MSGTYPE);
7c673cae 2325
9f95a23c 2326 int from = m->get_source().num();
7c673cae 2327
9f95a23c
TL
2328 // if a repop is replied after a replica goes down in a new osdmap, and
2329 // before the pg advances to this new osdmap, the repop replies before this
2330 // repop can be discarded by that replica OSD, because the primary resets the
2331 // connection to it when handling the new osdmap marking it down, and also
2332 // resets the messenger session when the replica reconnects. To avoid
2333 // out-of-order replies, the messages from that replica should be discarded.
2334 OSDMapRef next_map = osd->get_next_osdmap();
f67539c2
TL
2335 if (next_map->is_down(from)) {
2336 dout(20) << " " << __func__ << " dead: osd." << from << " is down in the next map" << dendl;
9f95a23c 2337 return true;
f67539c2 2338 }
9f95a23c
TL
2339 /* Mostly, this overlaps with the old_peering_msg
2340 * condition. An important exception is pushes
2341 * sent by replicas not in the acting set, since
2342 * if such a replica goes down it does not cause
2343 * a new interval. */
f67539c2
TL
2344 if (next_map->get_down_at(from) >= m->map_epoch) {
2345 dout(20) << " " << __func__ << " dead: osd." << from << " went down at or after map_epoch" << dendl;
9f95a23c 2346 return true;
f67539c2 2347 }
7c673cae 2348
9f95a23c
TL
2349 // same pg?
2350 // if pg changes _at all_, we reset and repeer!
2351 if (old_peering_msg(m->map_epoch, m->map_epoch)) {
2352 dout(10) << "can_discard_replica_op pg changed " << info.history
2353 << " after " << m->map_epoch
2354 << ", dropping" << dendl;
2355 return true;
7c673cae 2356 }
9f95a23c 2357 return false;
7c673cae
FG
2358}
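
// Hedged usage note: this template is instantiated once per replica message
// type by can_discard_request() below, e.g.
//   can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
// so the MSGTYPE assert catches a mismatched dispatch-table entry early.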
2359
9f95a23c 2360bool PG::can_discard_scan(OpRequestRef op)
7c673cae 2361{
9f95a23c
TL
2362 auto m = op->get_req<MOSDPGScan>();
2363 ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
7c673cae 2364
9f95a23c
TL
2365 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2366 dout(10) << " got old scan, ignoring" << dendl;
2367 return true;
7c673cae 2368 }
9f95a23c 2369 return false;
7c673cae
FG
2370}
2371
9f95a23c 2372bool PG::can_discard_backfill(OpRequestRef op)
7c673cae 2373{
9f95a23c
TL
2374 auto m = op->get_req<MOSDPGBackfill>();
2375 ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
7c673cae 2376
9f95a23c
TL
2377 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2378 dout(10) << " got old backfill, ignoring" << dendl;
2379 return true;
7c673cae
FG
2380 }
2381
9f95a23c 2382 return false;
7c673cae 2383
7c673cae
FG
2384}
2385
9f95a23c 2386bool PG::can_discard_request(OpRequestRef& op)
7c673cae 2387{
9f95a23c
TL
2388 switch (op->get_req()->get_type()) {
2389 case CEPH_MSG_OSD_OP:
2390 return can_discard_op(op);
2391 case CEPH_MSG_OSD_BACKOFF:
2392 return false; // never discard
2393 case MSG_OSD_REPOP:
2394 return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
2395 case MSG_OSD_PG_PUSH:
2396 return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
2397 case MSG_OSD_PG_PULL:
2398 return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
2399 case MSG_OSD_PG_PUSH_REPLY:
2400 return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
2401 case MSG_OSD_REPOPREPLY:
2402 return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
2403 case MSG_OSD_PG_RECOVERY_DELETE:
2404 return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
7c673cae 2405
9f95a23c
TL
2406 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
2407 return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
7c673cae 2408
9f95a23c
TL
2409 case MSG_OSD_EC_WRITE:
2410 return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
2411 case MSG_OSD_EC_WRITE_REPLY:
2412 return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
2413 case MSG_OSD_EC_READ:
2414 return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
2415 case MSG_OSD_EC_READ_REPLY:
2416 return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
2417 case MSG_OSD_REP_SCRUB:
2418 return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
2419 case MSG_OSD_SCRUB_RESERVE:
2420 return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
2421 case MSG_OSD_REP_SCRUBMAP:
2422 return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
2423 case MSG_OSD_PG_UPDATE_LOG_MISSING:
2424 return can_discard_replica_op<
2425 MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
2426 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
2427 return can_discard_replica_op<
2428 MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
2429
2430 case MSG_OSD_PG_SCAN:
2431 return can_discard_scan(op);
2432 case MSG_OSD_PG_BACKFILL:
2433 return can_discard_backfill(op);
2434 case MSG_OSD_PG_BACKFILL_REMOVE:
2435 return can_discard_replica_op<MOSDPGBackfillRemove,
2436 MSG_OSD_PG_BACKFILL_REMOVE>(op);
7c673cae 2437 }
9f95a23c 2438 return true;
7c673cae
FG
2439}
2440
9f95a23c 2441void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
7c673cae 2442{
9f95a23c
TL
2443 dout(10) << __func__ << ": " << evt->get_desc() << dendl;
2444 ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
2445 if (old_peering_evt(evt)) {
2446 dout(10) << "discard old " << evt->get_desc() << dendl;
2447 } else {
2448 recovery_state.handle_event(evt, &rctx);
7c673cae 2449 }
9f95a23c
TL
2450 // write_if_dirty regardless of path above to ensure we capture any work
2451 // done by OSD::advance_pg().
2452 write_if_dirty(rctx.transaction);
7c673cae
FG
2453}
2454
9f95a23c 2455void PG::queue_peering_event(PGPeeringEventRef evt)
7c673cae 2456{
9f95a23c
TL
2457 if (old_peering_evt(evt))
2458 return;
2459 osd->osd->enqueue_peering_evt(info.pgid, evt);
7c673cae
FG
2460}
2461
9f95a23c
TL
2462void PG::queue_null(epoch_t msg_epoch,
2463 epoch_t query_epoch)
7c673cae 2464{
9f95a23c
TL
2465 dout(10) << "null" << dendl;
2466 queue_peering_event(
2467 PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
2468 NullEvt())));
7c673cae
FG
2469}
2470
9f95a23c 2471void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
7c673cae 2472{
9f95a23c
TL
2473 /*
2474 * if we couldn't start any recovery ops and things are still
2475 * unfound, see if we can discover more missing object locations.
2476 * It may be that our initial locations were bad and we errored
2477 * out while trying to pull.
2478 */
2479 if (!recovery_state.discover_all_missing(rctx)) {
2480 string action;
2481 if (state_test(PG_STATE_BACKFILLING)) {
2482 auto evt = PGPeeringEventRef(
2483 new PGPeeringEvent(
2484 queued,
2485 queued,
2486 PeeringState::UnfoundBackfill()));
2487 queue_peering_event(evt);
2488 action = "in backfill";
2489 } else if (state_test(PG_STATE_RECOVERING)) {
2490 auto evt = PGPeeringEventRef(
2491 new PGPeeringEvent(
2492 queued,
2493 queued,
2494 PeeringState::UnfoundRecovery()));
2495 queue_peering_event(evt);
2496 action = "in recovery";
2497 } else {
2498 action = "already out of recovery/backfill";
7c673cae 2499 }
9f95a23c
TL
2500 dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
2501 } else {
2502 dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
2503 queue_recovery();
7c673cae 2504 }
7c673cae
FG
2505}
2506
9f95a23c
TL
2507void PG::handle_advance_map(
2508 OSDMapRef osdmap, OSDMapRef lastmap,
2509 vector<int>& newup, int up_primary,
2510 vector<int>& newacting, int acting_primary,
2511 PeeringCtx &rctx)
7c673cae 2512{
9f95a23c
TL
2513 dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
2514 osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
2515 recovery_state.advance_map(
2516 osdmap,
2517 lastmap,
2518 newup,
2519 up_primary,
2520 newacting,
2521 acting_primary,
2522 rctx);
7c673cae
FG
2523}
2524
9f95a23c 2525void PG::handle_activate_map(PeeringCtx &rctx)
7c673cae 2526{
9f95a23c
TL
2527 dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
2528 << dendl;
2529 recovery_state.activate_map(rctx);
7c673cae 2530
9f95a23c 2531 requeue_map_waiters();
7c673cae
FG
2532}
2533
9f95a23c 2534void PG::handle_initialize(PeeringCtx &rctx)
7c673cae 2535{
9f95a23c
TL
2536 dout(10) << __func__ << dendl;
2537 PeeringState::Initialize evt;
2538 recovery_state.handle_event(evt, &rctx);
7c673cae
FG
2539}
2540
f67539c2 2541
9f95a23c 2542void PG::handle_query_state(Formatter *f)
7c673cae 2543{
9f95a23c
TL
2544 dout(10) << "handle_query_state" << dendl;
2545 PeeringState::QueryState q(f);
2546 recovery_state.handle_event(q, 0);
7c673cae
FG
2547}
2548
9f95a23c 2549void PG::init_collection_pool_opts()
11fdf7f2 2550{
9f95a23c
TL
2551 auto r = osd->store->set_collection_opts(ch, pool.info.opts);
2552 if (r < 0 && r != -EOPNOTSUPP) {
2553 derr << __func__ << " set_collection_opts returns error: " << r << dendl;
11fdf7f2 2554 }
11fdf7f2
TL
2555}
2556
9f95a23c 2557void PG::on_pool_change()
7c673cae 2558{
9f95a23c
TL
2559 init_collection_pool_opts();
2560 plpg_on_pool_change();
7c673cae
FG
2561}
2562
9f95a23c
TL
2563void PG::C_DeleteMore::complete(int r) {
2564 ceph_assert(r == 0);
2565 pg->lock();
2566 if (!pg->pg_has_reset_since(epoch)) {
2567 pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
7c673cae 2568 }
9f95a23c
TL
2569 pg->unlock();
2570 delete this;
7c673cae
FG
2571}
2572
f67539c2
TL
2573std::pair<ghobject_t, bool> PG::do_delete_work(
2574 ObjectStore::Transaction &t,
2575 ghobject_t _next)
7c673cae 2576{
9f95a23c 2577 dout(10) << __func__ << dendl;
7c673cae 2578
9f95a23c
TL
2579 {
2580 float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
2581 if (osd_delete_sleep > 0 && delete_needs_sleep) {
2582 epoch_t e = get_osdmap()->get_epoch();
2583 PGRef pgref(this);
2584 auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
20effc67 2585 dout(20) << "do_delete_work() [cb] wake up at "
9f95a23c
TL
2586 << ceph_clock_now()
2587 << ", re-queuing delete" << dendl;
2588 std::scoped_lock locker{*this};
2589 delete_needs_sleep = false;
2590 if (!pg_has_reset_since(e)) {
2591 osd->queue_for_pg_delete(get_pgid(), e);
2592 }
2593 });
7c673cae 2594
9f95a23c
TL
2595 auto delete_schedule_time = ceph::real_clock::now();
2596 delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
2597 std::lock_guard l{osd->sleep_lock};
2598 osd->sleep_timer.add_event_at(delete_schedule_time,
2599 delete_requeue_callback);
2600 dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
f67539c2 2601 return std::make_pair(_next, true);
9f95a23c
TL
2602 }
2603 }
7c673cae 2604
9f95a23c 2605 delete_needs_sleep = true;
7c673cae 2606
adb31ebb
TL
2607 ghobject_t next;
2608
9f95a23c
TL
2609 vector<ghobject_t> olist;
2610 int max = std::min(osd->store->get_ideal_list_max(),
2611 (int)cct->_conf->osd_target_transaction_size);
adb31ebb 2612
9f95a23c
TL
2613 osd->store->collection_list(
2614 ch,
adb31ebb 2615 _next,
9f95a23c
TL
2616 ghobject_t::get_max(),
2617 max,
2618 &olist,
2619 &next);
2620 dout(20) << __func__ << " " << olist << dendl;
7c673cae 2621
adb31ebb
TL
2622 // make sure we've removed everything
2623 // by doing one more listing from the beginning
2624 if (_next != ghobject_t() && olist.empty()) {
2625 next = ghobject_t();
2626 osd->store->collection_list(
2627 ch,
2628 next,
2629 ghobject_t::get_max(),
2630 max,
2631 &olist,
2632 &next);
20effc67
TL
2633 for (auto& oid : olist) {
2634 if (oid == pgmeta_oid) {
2635 dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
2636 } else {
2637 dout(0) << __func__ << " additional unexpected onode:"
2638 << " a new onode has appeared since PG removal started "
2639 << oid << dendl;
b3b6e05e 2640 }
adb31ebb
TL
2641 }
2642 }
2643
9f95a23c
TL
2644 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2645 int64_t num = 0;
2646 for (auto& oid : olist) {
2647 if (oid == pgmeta_oid) {
7c673cae
FG
2648 continue;
2649 }
9f95a23c
TL
2650 if (oid.is_pgmeta()) {
2651 osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
2652 << " during PG removal";
7c673cae 2653 }
9f95a23c
TL
2654 int r = snap_mapper.remove_oid(oid.hobj, &_t);
2655 if (r != 0 && r != -ENOENT) {
2656 ceph_abort();
7c673cae 2657 }
9f95a23c
TL
2658 t.remove(coll, oid);
2659 ++num;
7c673cae 2660 }
f67539c2 2661 bool running = true;
9f95a23c
TL
2662 if (num) {
2663 dout(20) << __func__ << " deleting " << num << " objects" << dendl;
2664 Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
2665 t.register_on_commit(fin);
7c673cae 2666 } else {
9f95a23c
TL
2667 if (cct->_conf->osd_inject_failure_on_pg_removal) {
2668 _exit(1);
7c673cae 2669 }
7c673cae 2670
9f95a23c
TL
2671 // final flush here to ensure completions drop refs. Of particular concern
2672 // are the SnapMapper ContainerContexts.
2673 {
2674 PGRef pgref(this);
2675 PGLog::clear_info_log(info.pgid, &t);
2676 t.remove_collection(coll);
2677 t.register_on_commit(new ContainerContext<PGRef>(pgref));
2678 t.register_on_applied(new ContainerContext<PGRef>(pgref));
2679 osd->store->queue_transaction(ch, std::move(t));
7c673cae 2680 }
9f95a23c 2681 ch->flush();
7c673cae 2682
9f95a23c
TL
2683 if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
2684 dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
2685 ch = osd->store->create_new_collection(coll);
2686 create_pg_collection(t,
2687 info.pgid,
2688 info.pgid.get_split_bits(pool.info.get_pg_num()));
2689 init_pg_ondisk(t, info.pgid, &pool.info);
2690 recovery_state.reset_last_persisted();
2691 } else {
2692 recovery_state.set_delete_complete();
7c673cae 2693
9f95a23c
TL
2694 // cancel reserver here, since the PG is about to get deleted and the
2695 // exit() methods don't run when that happens.
2696 osd->local_reserver.cancel_reservation(info.pgid);
7c673cae 2697
f67539c2 2698 running = false;
9f95a23c 2699 }
7c673cae 2700 }
f67539c2 2701 return {next, running};
7c673cae
FG
2702}
2703
9f95a23c 2704int PG::pg_stat_adjust(osd_stat_t *ns)
7c673cae 2705{
9f95a23c
TL
2706 osd_stat_t &new_stat = *ns;
2707 if (is_primary()) {
2708 return 0;
7c673cae 2709 }
9f95a23c
TL
2710 // Adjust the kb_used by adding pending backfill data
2711 uint64_t reserved_num_bytes = get_reserved_num_bytes();
7c673cae 2712
9f95a23c
TL
2713 // For now we don't consider projected space gains here.
2714 // I suggest we have an optional 2-pass backfill that frees up
2715 // space in a first pass. This could be triggered when at nearfull
2716 // or near to backfillfull.
2717 if (reserved_num_bytes > 0) {
2718 // TODO: Handle compression by adjusting by the PGs average
2719 // compression percentage.
2720 dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
2721 << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2722 if (new_stat.statfs.available > reserved_num_bytes)
2723 new_stat.statfs.available -= reserved_num_bytes;
2724 else
2725 new_stat.statfs.available = 0;
2726 dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2727 return 1;
7c673cae 2728 }
9f95a23c 2729 return 0;
7c673cae
FG
2730}
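
// Hedged worked example (illustration only): with statfs reporting 100 GiB
// available and a 10 GiB backfill reservation held by this replica, the
// adjustment above reports 90 GiB available (so kb_used grows accordingly);
// a reservation larger than the available space simply clamps it to zero.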
2731
11fdf7f2
TL
2732void PG::dump_pgstate_history(Formatter *f)
2733{
9f95a23c
TL
2734 std::scoped_lock l{*this};
2735 recovery_state.dump_history(f);
11fdf7f2 2736}
7c673cae 2737
11fdf7f2
TL
2738void PG::dump_missing(Formatter *f)
2739{
9f95a23c 2740 for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
11fdf7f2
TL
2741 f->open_object_section("object");
2742 f->dump_object("oid", i.first);
2743 f->dump_object("missing_info", i.second);
9f95a23c
TL
2744 if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
2745 f->dump_bool(
2746 "unfound",
2747 recovery_state.get_missing_loc().is_unfound(i.first));
11fdf7f2 2748 f->open_array_section("locations");
9f95a23c 2749 for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
11fdf7f2
TL
2750 f->dump_object("shard", l);
2751 }
2752 f->close_section();
2753 }
2754 f->close_section();
2755 }
2756}
2757
20effc67 2758void PG::with_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)>&& f)
11fdf7f2 2759{
9f95a23c 2760 std::lock_guard l{pg_stats_publish_lock};
20effc67
TL
2761 if (pg_stats_publish) {
2762 f(*pg_stats_publish, pg_stats_publish->get_effective_last_epoch_clean());
11fdf7f2 2763 }
11fdf7f2
TL
2764}
2765
20effc67 2766void PG::with_heartbeat_peers(std::function<void(int)>&& f)
11fdf7f2 2767{
9f95a23c 2768 std::lock_guard l{heartbeat_peer_lock};
11fdf7f2
TL
2769 for (auto p : heartbeat_peers) {
2770 f(p);
2771 }
2772 for (auto p : probe_targets) {
2773 f(p);
2774 }
9f95a23c
TL
2775}
2776
2777uint64_t PG::get_min_alloc_size() const {
2778 return osd->store->get_min_alloc_size();
11fdf7f2 2779}