// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

15#include "PG.h"
7c673cae 16#include "messages/MOSDRepScrub.h"
7c673cae
FG
17
18#include "common/errno.h"
9f95a23c 19#include "common/ceph_releases.h"
7c673cae
FG
20#include "common/config.h"
21#include "OSD.h"
22#include "OpRequest.h"
20effc67
TL
23#include "osd/scrubber/ScrubStore.h"
24#include "osd/scrubber/pg_scrubber.h"
9f95a23c 25#include "osd/scheduler/OpSchedulerItem.h"
20effc67 26#include "Session.h"
7c673cae
FG
27
28#include "common/Timer.h"
29#include "common/perf_counters.h"
30
31#include "messages/MOSDOp.h"
7c673cae
FG
32#include "messages/MOSDPGScan.h"
33#include "messages/MOSDPGBackfill.h"
34#include "messages/MOSDPGBackfillRemove.h"
35#include "messages/MBackfillReserve.h"
36#include "messages/MRecoveryReserve.h"
37#include "messages/MOSDPGPush.h"
38#include "messages/MOSDPGPushReply.h"
39#include "messages/MOSDPGPull.h"
40#include "messages/MOSDECSubOpWrite.h"
41#include "messages/MOSDECSubOpWriteReply.h"
42#include "messages/MOSDECSubOpRead.h"
43#include "messages/MOSDECSubOpReadReply.h"
44#include "messages/MOSDPGUpdateLogMissing.h"
45#include "messages/MOSDPGUpdateLogMissingReply.h"
46#include "messages/MOSDBackoff.h"
47#include "messages/MOSDScrubReserve.h"
7c673cae 48#include "messages/MOSDRepOp.h"
7c673cae
FG
49#include "messages/MOSDRepOpReply.h"
50#include "messages/MOSDRepScrubMap.h"
c07f9fc5
FG
51#include "messages/MOSDPGRecoveryDelete.h"
52#include "messages/MOSDPGRecoveryDeleteReply.h"
7c673cae
FG
53
54#include "common/BackTrace.h"
55#include "common/EventTrace.h"
56
57#ifdef WITH_LTTNG
58#define TRACEPOINT_DEFINE
59#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
60#include "tracing/pg.h"
61#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
62#undef TRACEPOINT_DEFINE
63#else
64#define tracepoint(...)
65#endif
66
67#include <sstream>
68
69#define dout_context cct
70#define dout_subsys ceph_subsys_osd
71#undef dout_prefix
72#define dout_prefix _prefix(_dout, this)
73
f67539c2
TL
74using std::list;
75using std::map;
76using std::ostringstream;
77using std::pair;
78using std::set;
79using std::string;
80using std::stringstream;
81using std::unique_ptr;
82using std::vector;
83
84using ceph::bufferlist;
85using ceph::bufferptr;
86using ceph::decode;
87using ceph::encode;
88using ceph::Formatter;
89
9f95a23c 90using namespace ceph::osd::scheduler;
7c673cae
FG
91
92template <class T>
93static ostream& _prefix(std::ostream *_dout, T *t)
94{
11fdf7f2 95 return t->gen_prefix(*_dout);
7c673cae
FG
96}
97
void PG::get(const char* tag)
{
  int after = ++ref;
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
                                 << "tag " << (tag ? tag : "(none)") << " "
                                 << (after - 1) << " -> " << after << dendl;
#ifdef PG_DEBUG_REFS
  std::lock_guard l(_ref_id_lock);
  _tag_counts[tag]++;
#endif
}

void PG::put(const char* tag)
{
#ifdef PG_DEBUG_REFS
  {
    std::lock_guard l(_ref_id_lock);
    auto tag_counts_entry = _tag_counts.find(tag);
    ceph_assert(tag_counts_entry != _tag_counts.end());
    --tag_counts_entry->second;
    if (tag_counts_entry->second == 0) {
      _tag_counts.erase(tag_counts_entry);
    }
  }
#endif
  auto local_cct = cct;
  int after = --ref;
  lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
                                       << "tag " << (tag ? tag : "(none)") << " "
                                       << (after + 1) << " -> " << after
                                       << dendl;
  if (after == 0)
    delete this;
}

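// Reference-count debugging: with PG_DEBUG_REFS defined, each reference is
// tracked under a unique id together with the acquiring backtrace, so a
// leaked reference can be attributed via dump_live_ids().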
#ifdef PG_DEBUG_REFS
uint64_t PG::get_with_id()
{
  ref++;
  std::lock_guard l(_ref_id_lock);
  uint64_t id = ++_ref_id;
  ClibBackTrace bt(0);
  stringstream ss;
  bt.print(ss);
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
                                 << " got id " << id << " "
                                 << (ref - 1) << " -> " << ref
                                 << dendl;
  ceph_assert(!_live_ids.count(id));
  _live_ids.insert(make_pair(id, ss.str()));
  return id;
}

void PG::put_with_id(uint64_t id)
{
  int newref = --ref;
  lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
                                 << " put id " << id << " "
                                 << (newref + 1) << " -> " << newref
                                 << dendl;
  {
    std::lock_guard l(_ref_id_lock);
    ceph_assert(_live_ids.count(id));
    _live_ids.erase(id);
  }
  if (!newref)
    delete this;
}

void PG::dump_live_ids()
{
  std::lock_guard l(_ref_id_lock);
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
  for (map<uint64_t, string>::iterator i = _live_ids.begin();
       i != _live_ids.end();
       ++i) {
    dout(0) << "\t\tid: " << *i << dendl;
  }
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
  for (map<string, uint64_t>::iterator i = _tag_counts.begin();
       i != _tag_counts.end();
       ++i) {
    dout(0) << "\t\ttag: " << *i << dendl;
  }
}
#endif

PG::PG(OSDService *o, OSDMapRef curmap,
       const PGPool &_pool, spg_t p) :
  pg_whoami(o->whoami, p.shard),
  pg_id(p),
  coll(p),
  osd(o),
  cct(o->cct),
  osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
  snap_mapper(
    cct,
    &osdriver,
    p.ps(),
    p.get_split_bits(_pool.info.get_pg_num()),
    _pool.id,
    p.shard),
  trace_endpoint("0.0.0.0", 0, "PG"),
  info_struct_v(0),
  pgmeta_oid(p.make_pgmeta_oid()),
  stat_queue_item(this),
  recovery_queued(false),
  recovery_ops_active(0),
  backfill_reserving(false),
  finish_sync_event(NULL),
  scrub_after_recovery(false),
  active_pushes(0),
  recovery_state(
    o->cct,
    pg_whoami,
    p,
    _pool,
    curmap,
    this,
    this),
  pool(recovery_state.get_pgpool()),
  info(recovery_state.get_info())
{
#ifdef PG_DEBUG_REFS
  osd->add_pgid(p, this);
#endif
#ifdef WITH_BLKIN
  std::stringstream ss;
  ss << "PG " << info.pgid;
  trace_endpoint.copy_name(ss.str());
#endif
}

PG::~PG()
{
#ifdef PG_DEBUG_REFS
  osd->remove_pgid(info.pgid, this);
#endif
}

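// Locking discipline: with CEPH_DEBUG_MUTEX the mutex itself tracks its
// owner (and cooperates with lockdep); otherwise we record the owning
// thread id by hand so gen_prefix() can tell whether the caller holds
// the PG lock.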
7c673cae
FG
238void PG::lock(bool no_lockdep) const
239{
9f95a23c
TL
240#ifdef CEPH_DEBUG_MUTEX
241 _lock.lock(no_lockdep);
242#else
243 _lock.lock();
244 locked_by = std::this_thread::get_id();
245#endif
7c673cae 246 // if we have unrecorded dirty state with the lock dropped, there is a bug
9f95a23c 247 ceph_assert(!recovery_state.debug_has_dirty_state());
7c673cae
FG
248
249 dout(30) << "lock" << dendl;
250}
251
9f95a23c
TL
252bool PG::is_locked() const
253{
254 return ceph_mutex_is_locked(_lock);
255}
256
257void PG::unlock() const
258{
259 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
260 ceph_assert(!recovery_state.debug_has_dirty_state());
261#ifndef CEPH_DEBUG_MUTEX
262 locked_by = {};
263#endif
264 _lock.unlock();
265}
266
11fdf7f2 267std::ostream& PG::gen_prefix(std::ostream& out) const
7c673cae 268{
9f95a23c
TL
269 OSDMapRef mapref = recovery_state.get_osdmap();
270#ifdef CEPH_DEBUG_MUTEX
7c673cae 271 if (_lock.is_locked_by_me()) {
9f95a23c
TL
272#else
273 if (locked_by == std::this_thread::get_id()) {
274#endif
7c673cae
FG
275 out << "osd." << osd->whoami
276 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
277 << " " << *this << " ";
278 } else {
279 out << "osd." << osd->whoami
280 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
9f95a23c 281 << " pg[" << pg_id.pgid << "(unlocked)] ";
7c673cae 282 }
11fdf7f2 283 return out;
7c673cae 284}
7c673cae 285
PerfCounters &PG::get_peering_perf() {
  return *(osd->recoverystate_perf);
}

PerfCounters &PG::get_perf_logger() {
  return *(osd->logger);
}

void PG::log_state_enter(const char *state) {
  osd->pg_recovery_stats.log_enter(state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  osd->pg_recovery_stats.log_exit(
    state_name, ceph_clock_now() - enter_time, events, event_dur);
}

/********* PG **********/

void PG::remove_snap_mapped_object(
  ObjectStore::Transaction &t, const hobject_t &soid)
{
  t.remove(
    coll,
    ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
  clear_object_snap_mapping(&t, soid);
}

void PG::clear_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  if (soid.snap < CEPH_MAXSNAP) {
    int r = snap_mapper.remove_oid(
      soid,
      &_t);
    if (!(r == 0 || r == -ENOENT)) {
      derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
      ceph_abort();
    }
  }
}

void PG::update_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  ceph_assert(soid.snap < CEPH_MAXSNAP);
  int r = snap_mapper.remove_oid(
    soid,
    &_t);
  if (!(r == 0 || r == -ENOENT)) {
    derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  snap_mapper.add_oid(
    soid,
    snaps,
    &_t);
}

/******* PG ***********/
void PG::clear_primary_state()
{
  dout(20) << __func__ << dendl;

  projected_log = PGLog::IndexedLog();

  snap_trimq.clear();
  snap_trimq_repeat.clear();
  finish_sync_event = 0;  // so that _finish_recovery doesn't go off in another thread
  release_pg_backoffs();

  if (m_scrubber) {
    m_scrubber->discard_replica_reservations();
  }
  scrub_after_recovery = false;

  agent_clear();
}

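// Capability check for client ops: the session's OSDCap (parsed from a cap
// string such as "allow rwx pool=rbd" - illustrative example only) is
// evaluated against the pool, namespace, object key and the op's
// read/write/class requirements.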
bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
  // only check MOSDOp
  if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
    return true;

  auto req = op->get_req<MOSDOp>();
  auto priv = req->get_connection()->get_priv();
  auto session = static_cast<Session*>(priv.get());
  if (!session) {
    dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
    return false;
  }
  OSDCap& caps = session->caps;
  priv.reset();

  const string &key = req->get_hobj().get_key().empty() ?
    req->get_oid().name :
    req->get_hobj().get_key();

  bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
                             pool.info.application_metadata,
                             key,
                             op->need_read_cap(),
                             op->need_write_cap(),
                             op->classes(),
                             session->get_peer_socket_addr());

  dout(20) << "op_has_sufficient_caps "
           << "session=" << session
           << " pool=" << pool.id << " (" << pool.name
           << " " << req->get_hobj().nspace
           << ")"
           << " pool_app_metadata=" << pool.info.application_metadata
           << " need_read_cap=" << op->need_read_cap()
           << " need_write_cap=" << op->need_write_cap()
           << " classes=" << op->classes()
           << " -> " << (cap ? "yes" : "NO")
           << dendl;
  return cap;
}

void PG::queue_recovery()
{
  if (!is_primary() || !is_peered()) {
    dout(10) << "queue_recovery -- not primary or not peered " << dendl;
    ceph_assert(!recovery_queued);
  } else if (recovery_queued) {
    dout(10) << "queue_recovery -- already queued" << dendl;
  } else {
    dout(10) << "queue_recovery -- queuing" << dendl;
    recovery_queued = true;
    // Let cost per object be the average object size
    auto num_bytes = static_cast<uint64_t>(
      std::max<int64_t>(
        0, // ensure bytes is non-negative
        info.stats.stats.sum.num_bytes));
    auto num_objects = static_cast<uint64_t>(
      std::max<int64_t>(
        1, // ensure objects is non-negative and non-zero
        info.stats.stats.sum.num_objects));
    uint64_t cost_per_object = std::max<uint64_t>(num_bytes / num_objects, 1);
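    // Worked example (hypothetical numbers): a PG holding 100 MiB across
    // 25 objects yields cost_per_object = 4 MiB, i.e. each queued recovery
    // is charged roughly one average object's worth of work.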
    osd->queue_for_recovery(
      this, cost_per_object, recovery_state.get_recovery_op_priority()
    );
  }
}

void PG::queue_scrub_after_repair()
{
  dout(10) << __func__ << dendl;
  ceph_assert(ceph_mutex_is_locked(_lock));

  m_planned_scrub.must_deep_scrub = true;
  m_planned_scrub.check_repair = true;
  m_planned_scrub.must_scrub = true;
  m_planned_scrub.calculated_to_deep = true;

  if (is_scrub_queued_or_active()) {
    dout(10) << __func__ << ": scrubbing already ("
             << (is_scrubbing() ? "active)" : "queued)") << dendl;
    return;
  }

  m_scrubber->set_op_parameters(m_planned_scrub);
  dout(15) << __func__ << ": queueing" << dendl;

  osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
}

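// Scrub priority can be overridden per pool (the "scrub_priority" pool
// option maps to pool_opts_t::SCRUB_PRIORITY); a value of 0 falls back to
// the global osd_scrub_priority setting.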
unsigned PG::get_scrub_priority()
{
  // a higher value -> a higher priority
  int64_t pool_scrub_priority =
    pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
  return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
}

Context *PG::finish_recovery()
{
  dout(10) << "finish_recovery" << dendl;
  ceph_assert(info.last_complete == info.last_update);

  clear_recovery_state();

  /*
   * sync all this before purging strays. but don't block!
   */
  finish_sync_event = new C_PG_FinishRecovery(this);
  return finish_sync_event;
}

void PG::_finish_recovery(Context* c)
{
  dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
           << is_clean() << dendl;

  std::scoped_lock locker{*this};
  if (recovery_state.is_deleting() || !is_clean()) {
    dout(10) << __func__ << " raced with delete or repair" << dendl;
    return;
  }
  // When recovery is initiated by a repair, that flag is left on
  state_clear(PG_STATE_REPAIR);
  if (c == finish_sync_event) {
    dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
    finish_sync_event = 0;
    recovery_state.purge_strays();

    publish_stats_to_osd();

    if (scrub_after_recovery) {
      dout(10) << "_finish_recovery requeueing for scrub" << dendl;
      scrub_after_recovery = false;
      queue_scrub_after_repair();
    }
  } else {
    dout(10) << "_finish_recovery -- stale" << dendl;
  }
}

void PG::start_recovery_op(const hobject_t& soid)
{
  dout(10) << "start_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active >= 0);
  recovery_ops_active++;
#ifdef DEBUG_RECOVERY_OIDS
  recovering_oids.insert(soid);
#endif
  osd->start_recovery_op(this, soid);
}

void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
{
  dout(10) << "finish_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active > 0);
  recovery_ops_active--;
#ifdef DEBUG_RECOVERY_OIDS
  ceph_assert(recovering_oids.count(soid));
  recovering_oids.erase(recovering_oids.find(soid));
#endif
  osd->finish_recovery_op(this, soid, dequeue);

  if (!dequeue) {
    queue_recovery();
  }
}

void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
  recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);

  child->update_snap_mapper_bits(split_bits);

  child->snap_trimq = snap_trimq;
  child->snap_trimq_repeat = snap_trimq_repeat;

  _split_into(child_pgid, child, split_bits);

  // release all backoffs for simplicity
  release_backoffs(hobject_t(), hobject_t::get_max());
}

void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
{
  recovery_state.start_split_stats(childpgs, out);
}

void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
{
  recovery_state.finish_split_stats(stats, t);
}

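// Fold the given source PGs into this PG (the pg_num-shrink merge path):
// peering state is merged first, then each source's pgmeta object is
// removed and its object collection is merged into ours.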
void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                    unsigned split_bits,
                    const pg_merge_meta_t& last_pg_merge_meta)
{
  dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
           << dendl;
  map<spg_t, PeeringState*> source_ps;
  for (auto &&source : sources) {
    source_ps.emplace(source.first, &source.second->recovery_state);
  }
  recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);

  for (auto& i : sources) {
    auto& source = i.second;
    // wipe out source's pgmeta
    rctx.transaction.remove(source->coll, source->pgmeta_oid);

    // merge (and destroy source collection)
    rctx.transaction.merge_collection(source->coll, coll, split_bits);
  }

  // merge_collection does this, but maybe all of our sources were missing.
  rctx.transaction.collection_set_bits(coll, split_bits);

  snap_mapper.update_bits(split_bits);
}

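// Backoffs ask a client to stop sending ops for a range of objects until
// explicitly unblocked (CEPH_OSD_BACKOFF_OP_BLOCK / _UNBLOCK); they are
// tracked both per session and per PG, keyed by the range start.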
void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
{
  auto con = s->con;
  if (!con)   // OSD::ms_handle_reset clears s->con without a lock
    return;
  auto b = s->have_backoff(info.pgid, begin);
  if (b) {
    derr << __func__ << " already have backoff for " << s << " begin " << begin
         << " " << *b << dendl;
    ceph_abort();
  }
  std::lock_guard l(backoff_lock);
  b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
  backoffs[begin].insert(b);
  s->add_backoff(b);
  dout(10) << __func__ << " session " << s << " added " << *b << dendl;
  con->send_message(
    new MOSDBackoff(
      info.pgid,
      get_osdmap_epoch(),
      CEPH_OSD_BACKOFF_OP_BLOCK,
      b->id,
      begin,
      end));
}

void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
{
  dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
  vector<ceph::ref_t<Backoff>> bv;
  {
    std::lock_guard l(backoff_lock);
    auto p = backoffs.lower_bound(begin);
    while (p != backoffs.end()) {
      int r = cmp(p->first, end);
      dout(20) << __func__ << " ? " << r << " " << p->first
               << " " << p->second << dendl;
      // note: must still examine begin=end=p->first case
      if (r > 0 || (r == 0 && begin < end)) {
        break;
      }
      dout(20) << __func__ << " checking " << p->first
               << " " << p->second << dendl;
      auto q = p->second.begin();
      while (q != p->second.end()) {
        dout(20) << __func__ << " checking " << *q << dendl;
        int rr = cmp((*q)->begin, begin);
        if (rr == 0 || (rr > 0 && (*q)->end < end)) {
          bv.push_back(*q);
          q = p->second.erase(q);
        } else {
          ++q;
        }
      }
      if (p->second.empty()) {
        p = backoffs.erase(p);
      } else {
        ++p;
      }
    }
  }
  for (auto b : bv) {
    std::lock_guard l(b->lock);
    dout(10) << __func__ << " " << *b << dendl;
    if (b->session) {
      ceph_assert(b->pg == this);
      ConnectionRef con = b->session->con;
      if (con) {   // OSD::ms_handle_reset clears s->con without a lock
        con->send_message(
          new MOSDBackoff(
            info.pgid,
            get_osdmap_epoch(),
            CEPH_OSD_BACKOFF_OP_UNBLOCK,
            b->id,
            b->begin,
            b->end));
      }
      if (b->is_new()) {
        b->state = Backoff::STATE_DELETING;
      } else {
        b->session->rm_backoff(b);
        b->session.reset();
      }
      b->pg.reset();
    }
  }
}

void PG::clear_backoffs()
{
  dout(10) << __func__ << " " << dendl;
  map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
  {
    std::lock_guard l(backoff_lock);
    ls.swap(backoffs);
  }
  for (auto& p : ls) {
    for (auto& b : p.second) {
      std::lock_guard l(b->lock);
      dout(10) << __func__ << " " << *b << dendl;
      if (b->session) {
        ceph_assert(b->pg == this);
        if (b->is_new()) {
          b->state = Backoff::STATE_DELETING;
        } else {
          b->session->rm_backoff(b);
          b->session.reset();
        }
        b->pg.reset();
      }
    }
  }
}

// called by Session::clear_backoffs()
void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
{
  dout(10) << __func__ << " " << *b << dendl;
  std::lock_guard l(backoff_lock);
  ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
  ceph_assert(b->pg == this);
  auto p = backoffs.find(b->begin);
  // may race with release_backoffs()
  if (p != backoffs.end()) {
    auto q = p->second.find(b);
    if (q != p->second.end()) {
      p->second.erase(q);
      if (p->second.empty()) {
        backoffs.erase(p);
      }
    }
  }
}

void PG::clear_recovery_state()
{
  dout(10) << "clear_recovery_state" << dendl;

  finish_sync_event = 0;

  hobject_t soid;
  while (recovery_ops_active > 0) {
#ifdef DEBUG_RECOVERY_OIDS
    soid = *recovering_oids.begin();
#endif
    finish_recovery_op(soid, true);
  }

  backfill_info.clear();
  peer_backfill_info.clear();
  waiting_on_backfill.clear();
  _clear_recovery_state();  // pg impl specific hook
}

void PG::cancel_recovery()
{
  dout(10) << "cancel_recovery" << dendl;
  clear_recovery_state();
}

void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
  for (set<pg_shard_t>::iterator i = probe_set.begin();
       i != probe_set.end();
       ++i) {
    probe_targets.insert(i->osd);
  }
}

void PG::send_cluster_message(
  int target, MessageRef m,
  epoch_t epoch, bool share_map_update)
{
  ConnectionRef con = osd->get_con_osd_cluster(
    target, get_osdmap_epoch());
  if (!con) {
    return;
  }

  if (share_map_update) {
    osd->maybe_share_map(con.get(), get_osdmap());
  }
  osd->send_message_osd_cluster(m, con.get());
}

void PG::clear_probe_targets()
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
}

void PG::update_heartbeat_peers(set<int> new_peers)
{
  bool need_update = false;
  heartbeat_peer_lock.lock();
  if (new_peers == heartbeat_peers) {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
  } else {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
    heartbeat_peers.swap(new_peers);
    need_update = true;
  }
  heartbeat_peer_lock.unlock();

  if (need_update)
    osd->need_heartbeat_peer_update();
}

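// Duplicate-op detection: look the request id up first in the projected
// log (ops applied but not yet written to the pg log) and then in the
// on-disk pg log, returning the stored version/result if it completed.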
bool PG::check_in_progress_op(
  const osd_reqid_t &r,
  eversion_t *version,
  version_t *user_version,
  int *return_code,
  vector<pg_log_op_return_item_t> *op_returns
  ) const
{
  return (
    projected_log.get_request(r, version, user_version, return_code,
                              op_returns) ||
    recovery_state.get_pg_log().get_log().get_request(
      r, version, user_version, return_code, op_returns));
}

void PG::publish_stats_to_osd()
{
  if (!is_primary())
    return;

  ceph_assert(m_scrubber);
  recovery_state.update_stats_wo_resched(
    [scrubber = m_scrubber.get()](pg_history_t& hist,
                                  pg_stat_t& info) mutable -> void {
      info.scrub_sched_status = scrubber->get_schedule();
    });

  std::lock_guard l{pg_stats_publish_lock};
  auto stats =
    recovery_state.prepare_stats_for_publish(pg_stats_publish, unstable_stats);
  if (stats) {
    pg_stats_publish = std::move(stats);
  }
}

unsigned PG::get_target_pg_log_entries() const
{
  return osd->get_target_pg_log_entries();
}

void PG::clear_publish_stats()
{
  dout(15) << "clear_stats" << dendl;
  std::lock_guard l{pg_stats_publish_lock};
  pg_stats_publish.reset();
}

/**
 * initialize a newly instantiated pg
 *
 * Initialize PG state, as when a PG is initially created, or when it
 * is first instantiated on the current node.
 *
 * @param role our role/rank
 * @param newup up set
 * @param new_up_primary up primary
 * @param newacting acting set
 * @param new_acting_primary acting primary
 * @param history pg history
 * @param pi past_intervals
 * @param t transaction to write out our new state in
 */
void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  ObjectStore::Transaction &t)
{
  recovery_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, t);
}

void PG::shutdown()
{
  ch->flush();
  std::scoped_lock l{*this};
  recovery_state.shutdown();
  on_shutdown();
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

void PG::upgrade(ObjectStore *store)
{
  dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
          << dendl;
  ceph_assert(info_struct_v <= 10);
  ObjectStore::Transaction t;

  // <do upgrade steps here>

  // finished upgrade!
  ceph_assert(info_struct_v == 10);

  // update infover_key
  if (info_struct_v < pg_latest_struct_v) {
    map<string,bufferlist> v;
    __u8 ver = pg_latest_struct_v;
    encode(ver, v[string(infover_key)]);
    t.omap_setkeys(coll, pgmeta_oid, v);
  }

  recovery_state.force_write_state(t);

  ObjectStore::CollectionHandle ch = store->open_collection(coll);
  int r = store->queue_transaction(ch, std::move(t));
  if (r != 0) {
    derr << __func__ << ": queue_transaction returned "
         << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  ceph_assert(r == 0);

  C_SaferCond waiter;
  if (!ch->flush_commit(&waiter)) {
    waiter.wait();
  }
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

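// Persist dirty PG metadata: info/biginfo (and, when enabled, the compact
// fast-info) become omap keys on the pgmeta object, written together with
// the pg log and missing set in the same transaction.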
void PG::prepare_write(
  pg_info_t &info,
  pg_info_t &last_written_info,
  PastIntervals &past_intervals,
  PGLog &pglog,
  bool dirty_info,
  bool dirty_big_info,
  bool need_write_epoch,
  ObjectStore::Transaction &t)
{
  info.stats.stats.add(unstable_stats);
  unstable_stats.clear();
  map<string,bufferlist> km;
  string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      cct,
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      cct->_conf->osd_fast_info,
      osd->logger,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll, pgmeta_oid, pool.info.require_rollback());
  if (!km.empty())
    t.omap_setkeys(coll, pgmeta_oid, km);
  if (!key_to_remove.empty())
    t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

bool PG::_has_removal_flag(ObjectStore *store,
                           spg_t pgid)
{
  coll_t coll(pgid);
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());

  // first try new way
  set<string> keys;
  keys.insert("_remove");
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
      values.size() == 1)
    return true;

  return false;
}

int PG::peek_map_epoch(ObjectStore *store,
                       spg_t pgid,
                       epoch_t *pepoch)
{
  coll_t coll(pgid);
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  epoch_t cur_epoch = 0;

  // validate collection name
  ceph_assert(coll.is_pg());

  // try for v8
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(epoch_key));
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  if (r == 0) {
    ceph_assert(values.size() == 2);

    // sanity check version
    auto bp = values[string(infover_key)].cbegin();
    __u8 struct_v = 0;
    decode(struct_v, bp);
    ceph_assert(struct_v >= 8);

    // get epoch
    bp = values[string(epoch_key)].begin();
    decode(cur_epoch, bp);
  } else {
    // probably bug 10617; see OSD::load_pgs()
    return -1;
  }

  *pepoch = cur_epoch;
  return 0;
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

bool PG::check_log_for_corruption(ObjectStore *store)
{
  /// TODO: this method needs to work with the omap log
  return true;
}

//! Get the name we're going to save our corrupt pg log as
std::string PG::get_corrupt_pg_log_name() const
{
  const int MAX_BUF = 512;
  char buf[MAX_BUF];
  struct tm tm_buf;
  time_t my_time(time(NULL));
  const struct tm *t = localtime_r(&my_time, &tm_buf);
  int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
  if (ret == 0) {
    dout(0) << "strftime failed" << dendl;
    return "corrupt_log_unknown_time";
  }
  string out(buf);
  out += stringify(info.pgid);
  return out;
}

int PG::read_info(
  ObjectStore *store, spg_t pgid, const coll_t &coll,
  pg_info_t &info, PastIntervals &past_intervals,
  __u8 &struct_v)
{
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(info_key));
  keys.insert(string(biginfo_key));
  keys.insert(string(fastinfo_key));
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  ceph_assert(r == 0);
  ceph_assert(values.size() == 3 ||
              values.size() == 4);

  auto p = values[string(infover_key)].cbegin();
  decode(struct_v, p);
  ceph_assert(struct_v >= 10);

  p = values[string(info_key)].begin();
  decode(info, p);

  p = values[string(biginfo_key)].begin();
  decode(past_intervals, p);
  decode(info.purged_snaps, p);

  p = values[string(fastinfo_key)].begin();
  if (!p.end()) {
    pg_fast_info_t fast;
    decode(fast, p);
    fast.try_apply_to(&info);
  }
  return 0;
}

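// Load this PG from disk: read info and past_intervals, replay the pg log
// and missing set, run a struct upgrade if the on-disk format is old, and
// initialize the current up/acting mapping from the OSDMap.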
void PG::read_state(ObjectStore *store)
{
  PastIntervals past_intervals_from_disk;
  pg_info_t info_from_disk;
  int r = read_info(
    store,
    pg_id,
    coll,
    info_from_disk,
    past_intervals_from_disk,
    info_struct_v);
  ceph_assert(r >= 0);

  if (info_struct_v < pg_compat_struct_v) {
    derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
         << " an older version first." << dendl;
    ceph_abort_msg("PG too old to upgrade");
  }

  recovery_state.init_from_disk_state(
    std::move(info_from_disk),
    std::move(past_intervals_from_disk),
    [this, store] (PGLog &pglog) {
      ostringstream oss;
      pglog.read_log_and_missing(
        store,
        ch,
        pgmeta_oid,
        info,
        oss,
        cct->_conf->osd_ignore_stale_divergent_priors,
        cct->_conf->osd_debug_verify_missing_on_start);

      if (oss.tellp())
        osd->clog->error() << oss.str();
      return 0;
    });

  if (info_struct_v < pg_latest_struct_v) {
    upgrade(store);
  }

  // initialize current mapping
  {
    int primary, up_primary;
    vector<int> acting, up;
    get_osdmap()->pg_to_up_acting_osds(
      pg_id.pgid, &up, &up_primary, &acting, &primary);
    recovery_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
  }

  // init pool options
  store->set_collection_opts(ch, pool.info.opts);

  PeeringCtx rctx;
  handle_initialize(rctx);
  // note: we don't activate here because we know the OSD will advance maps
  // during boot.
  write_if_dirty(rctx.transaction);
  store->queue_transaction(ch, std::move(rctx.transaction));
}

void PG::update_snap_map(
  const vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction &t)
{
  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
    OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
    if (i->soid.snap < CEPH_MAXSNAP) {
      if (i->is_delete()) {
        int r = snap_mapper.remove_oid(
          i->soid,
          &_t);
        if (r)
          derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
        // On removal tolerate missing key corruption
        ceph_assert(r == 0 || r == -ENOENT);
      } else if (i->is_update()) {
        ceph_assert(i->snaps.length() > 0);
        vector<snapid_t> snaps;
        bufferlist snapbl = i->snaps;
        auto p = snapbl.cbegin();
        try {
          decode(snaps, p);
        } catch (...) {
          derr << __func__ << " decode snaps failure on " << *i << dendl;
          snaps.clear();
        }
        set<snapid_t> _snaps(snaps.begin(), snaps.end());

        if (i->is_clone() || i->is_promote()) {
          snap_mapper.add_oid(
            i->soid,
            _snaps,
            &_t);
        } else if (i->is_modify()) {
          int r = snap_mapper.update_snaps(
            i->soid,
            _snaps,
            0,
            &_t);
          ceph_assert(r == 0);
        } else {
          ceph_assert(i->is_clean());
        }
      }
    }
  }
}

/**
 * filter trimming|trimmed snaps out of snapcontext
 */
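// e.g. (hypothetical ids) an incoming snapc of [8,6,4,2] with snap 6
// queued for trimming is rewritten to [8,4,2].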
void PG::filter_snapc(vector<snapid_t> &snaps)
{
  // nothing needs to trim, we can return immediately
  if (snap_trimq.empty() && info.purged_snaps.empty())
    return;

  bool filtering = false;
  vector<snapid_t> newsnaps;
  for (vector<snapid_t>::iterator p = snaps.begin();
       p != snaps.end();
       ++p) {
    if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
      if (!filtering) {
        // start building a new vector with what we've seen so far
        dout(10) << "filter_snapc filtering " << snaps << dendl;
        newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
        filtering = true;
      }
      dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
    } else {
      if (filtering)
        newsnaps.push_back(*p);  // continue building new vector
    }
  }
  if (filtering) {
    snaps.swap(newsnaps);
    dout(10) << "filter_snapc result " << snaps << dendl;
  }
}

void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
{
  for (auto it = m.begin(); it != m.end(); ++it)
    requeue_ops(it->second);
  m.clear();
}

void PG::requeue_op(OpRequestRef op)
{
  auto p = waiting_for_map.find(op->get_source());
  if (p != waiting_for_map.end()) {
    dout(20) << __func__ << " " << *op->get_req()
             << " (waiting_for_map " << p->first << ")"
             << dendl;
    p->second.push_front(op);
  } else {
    dout(20) << __func__ << " " << *op->get_req() << dendl;
    osd->enqueue_front(
      OpSchedulerItem(
        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
        op->get_req()->get_source().num(),
        get_osdmap_epoch()));
  }
}

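// Ops are requeued back-to-front: requeue_op() pushes each op onto the
// front of the queue, so iterating the list in reverse preserves the
// original ordering.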
void PG::requeue_ops(list<OpRequestRef> &ls)
{
  for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
       i != ls.rend();
       ++i) {
    requeue_op(*i);
  }
  ls.clear();
}

void PG::requeue_map_waiters()
{
  epoch_t epoch = get_osdmap_epoch();
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end()) {
    if (epoch < p->second.front()->min_epoch) {
      dout(20) << __func__ << " " << p->first << " front op "
               << p->second.front() << " must still wait, doing nothing"
               << dendl;
      ++p;
    } else {
      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
        auto req = *q;
        osd->enqueue_front(OpSchedulerItem(
          unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
          req->get_req()->get_cost(),
          req->get_req()->get_priority(),
          req->get_req()->get_recv_stamp(),
          req->get_req()->get_source().num(),
          epoch));
      }
      p = waiting_for_map.erase(p);
    }
  }
}

1315bool PG::get_must_scrub() const
1316{
1317 dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
1318 return m_planned_scrub.must_scrub;
1319}
1320
1321unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
1322{
1323 return m_scrubber->scrub_requeue_priority(with_priority);
1324}
1325
1326unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
1327{
1328 return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
1329}
7c673cae 1330
// ==========================================================================================
// SCRUB

/*
 * implementation note:
 * PG::sched_scrub() is called only once per specific scrub session.
 * That call commits us to whatever choices are made (deep/shallow, etc.).
 * Unless we fail to start the scrub, the 'planned scrub' flag-set is
 * 'frozen' into PgScrubber's m_flags, then cleared.
 */
Scrub::schedule_result_t PG::sched_scrub()
{
  using Scrub::schedule_result_t;
  dout(15) << __func__ << " pg(" << info.pgid
           << (is_active() ? ") <active>" : ") <not-active>")
           << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
  ceph_assert(ceph_mutex_is_locked(_lock));
  ceph_assert(m_scrubber);

  if (is_scrub_queued_or_active()) {
    return schedule_result_t::already_started;
  }

  if (!is_primary() || !is_active() || !is_clean()) {
    return schedule_result_t::bad_pg_state;
  }

  if (state_test(PG_STATE_SNAPTRIM) || state_test(PG_STATE_SNAPTRIM_WAIT)) {
    // note that the trimmer checks scrub status when setting 'snaptrim_wait'
    // (on the transition from NotTrimming to Trimming/WaitReservation),
    // i.e. some time before setting 'snaptrim'.
    dout(10) << __func__ << ": cannot scrub while snap-trimming" << dendl;
    return schedule_result_t::bad_pg_state;
  }

  // analyse the combination of the requested scrub flags, the osd/pool
  // configuration and the PG status to determine whether we should scrub
  // now, and what type of scrub it should be.
  auto updated_flags = validate_scrub_mode();
  if (!updated_flags) {
    // the stars do not align for starting a scrub for this PG at this time
    // (due to configuration or priority issues)
    // The reason was already reported by the callee.
    dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
    return schedule_result_t::preconditions;
  }

  // try to reserve the local OSD resources. If failing: no harm. We will
  // be retried by the OSD later on.
  if (!m_scrubber->reserve_local()) {
    dout(10) << __func__ << ": failed to reserve locally" << dendl;
    return schedule_result_t::no_local_resources;
  }

  // can commit to the updated flags now, as nothing will stop the scrub
  m_planned_scrub = *updated_flags;

  // An interrupted recovery repair could leave this set.
  state_clear(PG_STATE_REPAIR);

  // Pass control to the scrubber. It is the scrubber that handles the
  // replicas' resources reservations.
  m_scrubber->set_op_parameters(m_planned_scrub);

  dout(10) << __func__ << ": queueing" << dendl;
  osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
  return schedule_result_t::scrub_initiated;
}

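// Scheduling sketch: the deep-scrub deadline is last_deep_scrub_stamp plus
// the pool's deep_scrub_interval (or osd_deep_scrub_interval if the pool
// does not set one); e.g. a stamp of Monday 03:00 and a one-week interval
// put the next deep scrub at the following Monday 03:00.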
double PG::next_deepscrub_interval() const
{
  double deep_scrub_interval =
    pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
  if (deep_scrub_interval <= 0.0)
    deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
  return info.history.last_deep_scrub_stamp + deep_scrub_interval;
}

bool PG::is_time_for_deep(bool allow_deep_scrub,
                          bool allow_shallow_scrub,
                          bool has_deep_errors,
                          const requested_scrub_t& planned) const
{
  dout(10) << fmt::format(
                "{}: need-auto? {} allowed? {}/{} deep-errors? {} "
                "last_deep_scrub_stamp {}",
                __func__,
                planned.need_auto,
                allow_shallow_scrub,
                allow_deep_scrub,
                has_deep_errors,
                info.history.last_deep_scrub_stamp)
           << dendl;

  if (!allow_deep_scrub)
    return false;

  if (planned.need_auto) {
    dout(10) << __func__ << ": need repair after scrub errors" << dendl;
    return true;
  }

  if (ceph_clock_now() >= next_deepscrub_interval()) {
    dout(20) << __func__ << ": now (" << ceph_clock_now()
             << ") >= time for deep (" << next_deepscrub_interval() << ")"
             << dendl;
    return true;
  }

  if (has_deep_errors) {
    // note: the text below is matched by 'standalone' tests
    osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
                      << " Deep scrub errors, upgrading scrub to deep-scrub";
    return true;
  }

  // We only flip the random deep-scrub coin when 'allow_shallow_scrub' is
  // set. Otherwise - as this function is called often - we would end up
  // deep-scrubbing most of the time. (e.g. with a randomize ratio of 0.15,
  // roughly 15% of eligible shallow scrubs are upgraded to deep.)
  if (allow_shallow_scrub) {
    const bool deep_coin_flip =
      (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;

    dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
             << " deep_coin_flip=" << deep_coin_flip << dendl;

    if (deep_coin_flip)
      return true;
  }

  return false;
}

/*
 clang-format off

  Request details  | none    | no-scrub | no-scrub+no-deep | no-deep
  ------------------------------------------------------------------------
  ------------------------------------------------------------------------
  initiated        | shallow | shallow  | shallow          | shallow
  ------------------------------------------------------------------------
  init. + t.f.deep | deep    | deep     | shallow          | shallow
  ------------------------------------------------------------------------
  initiated deep   | deep    | deep     | deep             | deep
  ------------------------------------------------------------------------

 clang-format on
*/
std::optional<requested_scrub_t> PG::validate_initiated_scrub(
  bool allow_deep_scrub,
  bool try_to_auto_repair,
  bool time_for_deep,
  bool has_deep_errors,
  const requested_scrub_t& planned) const
{
  requested_scrub_t upd_flags{planned};

  upd_flags.time_for_deep = time_for_deep;
  upd_flags.deep_scrub_on_error = false;
  upd_flags.auto_repair = false; // will only be considered for periodic scrubs

  if (upd_flags.must_deep_scrub) {
    upd_flags.calculated_to_deep = true;
  } else if (upd_flags.time_for_deep && allow_deep_scrub) {
    upd_flags.calculated_to_deep = true;
  } else {
    upd_flags.calculated_to_deep = false;
    if (has_deep_errors) {
      osd->clog->error() << fmt::format(
        "osd.{} pg {} Regular scrub request, deep-scrub details will be lost",
        osd->whoami,
        info.pgid);
    }
  }

  return upd_flags;
}

/*
 clang-format off

 for periodic scrubs:

  Periodic type       | none    | no-scrub | no-scrub+no-deep | no-deep
  ------------------------------------------------------------------------
  ------------------------------------------------------------------------
  periodic            | shallow | x        | x                | shallow
  ------------------------------------------------------------------------
  periodic + t.f.deep | deep    | deep     | x                | shallow
  ------------------------------------------------------------------------

 clang-format on
*/
std::optional<requested_scrub_t> PG::validate_periodic_mode(
  bool allow_deep_scrub,
  bool try_to_auto_repair,
  bool allow_shallow_scrub,
  bool time_for_deep,
  bool has_deep_errors,
  const requested_scrub_t& planned) const

{
  ceph_assert(!planned.must_deep_scrub && !planned.must_repair);

  if (!allow_deep_scrub && has_deep_errors) {
    osd->clog->error()
      << "osd." << osd->whoami << " pg " << info.pgid
      << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
    return std::nullopt; // no scrubbing
  }

  requested_scrub_t upd_flags{planned};

  upd_flags.time_for_deep = time_for_deep;
  upd_flags.deep_scrub_on_error = false;
  upd_flags.auto_repair = false;
  upd_flags.calculated_to_deep = false;

  dout(20) << fmt::format("{}: allowed:{}/{} t.f.d:{} req:{}",
                          __func__,
                          allow_shallow_scrub,
                          allow_deep_scrub,
                          upd_flags.time_for_deep,
                          planned)
           << dendl;

  // should we perform a shallow scrub?
  if (allow_shallow_scrub) {
    if (!upd_flags.time_for_deep || !allow_deep_scrub) {
      if (try_to_auto_repair) {
        dout(10) << __func__
                 << ": auto repair with scrubbing, rescrub if errors found"
                 << dendl;
        upd_flags.deep_scrub_on_error = true;
      }
      dout(20) << __func__ << " will do shallow scrub (time_for_deep = "
               << upd_flags.time_for_deep << ")" << dendl;
      return upd_flags;
    }
    // else - either deep-scrub or nothing
  }

  if (upd_flags.time_for_deep) {
    if (allow_deep_scrub) {
      if (try_to_auto_repair) {
        dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
        upd_flags.auto_repair = true;
      }
      upd_flags.calculated_to_deep = true;
      dout(20) << fmt::format("{}: final: {}", __func__, upd_flags) << dendl;
      return upd_flags;
    }
    if (allow_shallow_scrub) {
      dout(20) << fmt::format("{}: final:{}", __func__, upd_flags) << dendl;
      return upd_flags;
    }
    return std::nullopt;
  }

  return std::nullopt; // no scrubbing
}


/*
 From docs.ceph.com (osd-internals/scrub):

 clang-format off

   Desired no-scrub flags & scrub type interactions:

   Periodic type       | none    | no-scrub | no-scrub+no-deep | no-deep
   ------------------------------------------------------------------------
   ------------------------------------------------------------------------
   periodic            | shallow | x        | x                | shallow
   ------------------------------------------------------------------------
   periodic + t.f.deep | deep    | deep     | x                | shallow
   ------------------------------------------------------------------------
   initiated           | shallow | shallow  | shallow          | shallow
   ------------------------------------------------------------------------
   init. + t.f.deep    | deep    | deep     | shallow          | shallow
   ------------------------------------------------------------------------
   initiated deep      | deep    | deep     | deep             | deep
   ------------------------------------------------------------------------

   "periodic" - if !must_scrub && !must_deep_scrub;
   "initiated deep" - if must_scrub && must_deep_scrub;
   "initiated" - if must_scrub && !must_deep_scrub;

 clang-format on
*/
/*
 * The returned flags collection (requested_scrub_t) is based on
 * m_planned_scrub with the following modifications:
 *
 * - calculated_to_deep will be set to shallow or deep, depending on the
 *   scrub type (according to the decision table above);
 * - deep_scrub_on_error will be determined;
 * - same for auto_repair;
 * - time_for_deep will be set to true if the scrub is periodic and the
 *   time for a deep scrub has been reached (+ some other conditions);
 *   and
 * - need_auto is cleared
 */
std::optional<requested_scrub_t> PG::validate_scrub_mode() const
{
  const bool allow_shallow_scrub =
    !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
      pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
  const bool allow_deep_scrub =
    !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
      pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
  const bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
  const bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair &&
                                   get_pgbackend()->auto_repair_supported());

  dout(10) << __func__ << " pg: " << info.pgid
           << " allow: " << allow_shallow_scrub << "/" << allow_deep_scrub
           << " deep errs: " << has_deep_errors
           << " auto-repair: " << try_to_auto_repair << " ("
           << cct->_conf->osd_scrub_auto_repair << ")" << dendl;

  // scrubbing while recovering?
  const bool prevented_by_recovery =
    osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
    (!cct->_conf->osd_repair_during_recovery || !m_planned_scrub.must_repair);

  if (prevented_by_recovery) {
    dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
    return std::nullopt;
  }

  const bool time_for_deep = is_time_for_deep(allow_deep_scrub,
                                              allow_shallow_scrub,
                                              has_deep_errors,
                                              m_planned_scrub);
  std::optional<requested_scrub_t> upd_flags;

  if (m_planned_scrub.must_scrub) {
    upd_flags = validate_initiated_scrub(allow_deep_scrub,
                                         try_to_auto_repair,
                                         time_for_deep,
                                         has_deep_errors,
                                         m_planned_scrub);
  } else {
    ceph_assert(!m_planned_scrub.must_deep_scrub);
    upd_flags = validate_periodic_mode(allow_deep_scrub,
                                       try_to_auto_repair,
                                       allow_shallow_scrub,
                                       time_for_deep,
                                       has_deep_errors,
                                       m_planned_scrub);
    if (!upd_flags) {
      dout(20) << __func__ << ": no periodic scrubs allowed" << dendl;
      return std::nullopt;
    }
  }

  dout(10) << fmt::format("{}: next scrub flags: {}", __func__, *upd_flags)
           << dendl;
  upd_flags->need_auto = false;
  return upd_flags;
}

/*
 * Note: on_info_history_change() is used in the two cases where we are not
 * sure whether the PG's role has changed, and if so - whether that change
 * was relayed to the scrub-queue.
 */
void PG::on_info_history_change()
{
  ceph_assert(m_scrubber);
  m_scrubber->on_primary_change(__func__, m_planned_scrub);
}

void PG::reschedule_scrub()
{
  dout(20) << __func__ << " for a " << (is_primary() ? "Primary" : "non-primary") << dendl;

  // we are assuming no change in primary status
  if (is_primary()) {
    ceph_assert(m_scrubber);
    m_scrubber->update_scrub_job(m_planned_scrub);
  }
}

void PG::on_primary_status_change(bool was_primary, bool now_primary)
{
  // make sure we have a working scrubber when becoming a primary
  if (was_primary != now_primary) {
    ceph_assert(m_scrubber);
    m_scrubber->on_primary_change(__func__, m_planned_scrub);
  }
}

void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
  ceph_assert(m_scrubber);
  m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
}

void PG::clear_ready_to_merge() {
  osd->clear_ready_to_merge(this);
}

void PG::queue_want_pg_temp(const vector<int> &wanted) {
  osd->queue_want_pg_temp(get_pgid().pgid, wanted);
}

void PG::clear_want_pg_temp() {
  osd->remove_want_pg_temp(get_pgid().pgid);
}

void PG::on_role_change() {
  requeue_ops(waiting_for_peered);
  plpg_on_role_change();
}

void PG::on_new_interval()
{
  projected_last_update = eversion_t();
  cancel_recovery();

  ceph_assert(m_scrubber);
  // log some scrub data before we react to the interval
  dout(20) << __func__ << (is_scrub_queued_or_active() ? " scrubbing " : " ")
           << "flags: " << m_planned_scrub << dendl;

  m_scrubber->on_primary_change(__func__, m_planned_scrub);
}

epoch_t PG::cluster_osdmap_trim_lower_bound() {
  return osd->get_superblock().cluster_osdmap_trim_lower_bound;
}

9f95a23c
TL
1765OstreamTemp PG::get_clog_info() {
1766 return osd->clog->info();
1767}
7c673cae 1768
9f95a23c
TL
1769OstreamTemp PG::get_clog_debug() {
1770 return osd->clog->debug();
1771}
7c673cae 1772
9f95a23c
TL
1773OstreamTemp PG::get_clog_error() {
1774 return osd->clog->error();
1775}
7c673cae 1776
9f95a23c
TL
1777void PG::schedule_event_after(
1778 PGPeeringEventRef event,
1779 float delay) {
1780 std::lock_guard lock(osd->recovery_request_lock);
1781 osd->recovery_request_timer.add_event_after(
1782 delay,
1783 new QueuePeeringEvt(
1784 this,
1785 std::move(event)));
1786}
7c673cae 1787
9f95a23c
TL
1788void PG::request_local_background_io_reservation(
1789 unsigned priority,
f67539c2
TL
1790 PGPeeringEventURef on_grant,
1791 PGPeeringEventURef on_preempt) {
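  // both callbacks are optional: a null PGPeeringEventURef simply registers
  // no grant/preempt notification with the reserver (see the ternaries below)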
  osd->local_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

void PG::update_local_background_io_priority(
  unsigned priority) {
  osd->local_reserver.update_priority(
    pg_id,
    priority);
}

void PG::cancel_local_background_io_reservation() {
  osd->local_reserver.cancel_reservation(
    pg_id);
}

void PG::request_remote_recovery_reservation(
  unsigned priority,
  PGPeeringEventURef on_grant,
  PGPeeringEventURef on_preempt) {
  osd->remote_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

void PG::cancel_remote_recovery_reservation() {
  osd->remote_reserver.cancel_reservation(
    pg_id);
}

void PG::schedule_event_on_commit(
  ObjectStore::Transaction &t,
  PGPeeringEventRef on_commit)
{
  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
}

void PG::on_activate(interval_set<snapid_t> snaps)
{
  ceph_assert(!m_scrubber->are_callbacks_pending());
  ceph_assert(callbacks_for_degraded_object.empty());
  snap_trimq = snaps;
  release_pg_backoffs();
  projected_last_update = info.last_update;
}

void PG::on_active_exit()
{
  backfill_reserving = false;
  agent_stop();
}

void PG::on_active_advmap(const OSDMapRef &osdmap)
{
  const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
  auto i = new_removed_snaps.find(get_pgid().pool());
  if (i != new_removed_snaps.end()) {
    bool bad = false;
    for (auto j : i->second) {
      if (snap_trimq.intersects(j.first, j.second)) {
        decltype(snap_trimq) added, overlap;
        added.insert(j.first, j.second);
        overlap.intersection_of(snap_trimq, added);
        derr << __func__ << " removed_snaps already contains "
             << overlap << dendl;
        bad = true;
        snap_trimq.union_of(added);
      } else {
        snap_trimq.insert(j.first, j.second);
      }
    }
    dout(10) << __func__ << " new removed_snaps " << i->second
             << ", snap_trimq now " << snap_trimq << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }

  const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
  auto j = new_purged_snaps.find(get_pgid().pgid.pool());
  if (j != new_purged_snaps.end()) {
    bool bad = false;
    for (auto k : j->second) {
      if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
        interval_set<snapid_t> rm, overlap;
        rm.insert(k.first, k.second);
        overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
        derr << __func__ << " purged_snaps does not contain "
             << rm << ", only " << overlap << dendl;
        recovery_state.adjust_purged_snaps(
          [&overlap](auto &purged_snaps) {
            purged_snaps.subtract(overlap);
          });
        // This can currently happen in the normal (if unlikely) course of
        // events.  Because adding snaps to purged_snaps does not increase
        // the pg version or add a pg log entry, we don't reliably propagate
        // purged_snaps additions to other OSDs.
        // One example:
        //  - purge S
        //  - primary and replicas update purged_snaps
        //  - no object updates
        //  - pg mapping changes, new primary on different node
        //  - new primary pg version == eversion_t(), so info is not
        //    propagated.
        //bad = true;
      } else {
        recovery_state.adjust_purged_snaps(
          [&k](auto &purged_snaps) {
            purged_snaps.erase(k.first, k.second);
          });
      }
    }
    dout(10) << __func__ << " new purged_snaps " << j->second
             << ", now " << recovery_state.get_info().purged_snaps << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }
}

void PG::queue_snap_retrim(snapid_t snap)
{
  if (!is_active() ||
      !is_primary()) {
    dout(10) << __func__ << " snap " << snap << " - not active and primary"
             << dendl;
    return;
  }
  if (!snap_trimq.contains(snap)) {
    snap_trimq.insert(snap);
    snap_trimq_repeat.insert(snap);
    dout(20) << __func__ << " snap " << snap
             << ", trimq now " << snap_trimq
             << ", repeat " << snap_trimq_repeat << dendl;
    kick_snap_trim();
  } else {
    dout(20) << __func__ << " snap " << snap
             << " already in trimq " << snap_trimq << dendl;
  }
}

void PG::on_active_actmap()
{
  if (cct->_conf->osd_check_for_log_corruption)
    check_log_for_corruption(osd->store);

  if (recovery_state.is_active()) {
    dout(10) << "Active: kicking snap trim" << dendl;
    kick_snap_trim();
  }

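  // queue recovery/backfill unless 'nobackfill' is set; 'norebalance'
  // blocks it only while the PG is not degraded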
  if (recovery_state.is_peered() &&
      !recovery_state.is_clean() &&
      !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
      (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
       recovery_state.is_degraded())) {
    queue_recovery();
  }
}

void PG::on_backfill_reserved()
{
  backfill_reserving = false;
  queue_recovery();
}

void PG::on_backfill_canceled()
{
  if (!waiting_on_backfill.empty()) {
    waiting_on_backfill.clear();
    finish_recovery_op(hobject_t::get_max());
  }
}

void PG::on_recovery_reserved()
{
  queue_recovery();
}

void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
{
  osd->set_not_ready_to_merge_target(pgid, src);
}

void PG::set_not_ready_to_merge_source(pg_t pgid)
{
  osd->set_not_ready_to_merge_source(pgid);
}

void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
{
  osd->set_ready_to_merge_target(this, lu, les, lec);
}

void PG::set_ready_to_merge_source(eversion_t lu)
{
  osd->set_ready_to_merge_source(this, lu);
}

void PG::send_pg_created(pg_t pgid)
{
  osd->send_pg_created(pgid);
}

ceph::signedspan PG::get_mnow() const
{
  return osd->get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return osd->get_hb_stamps(peer);
}

void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
{
  auto spgid = info.pgid;
  auto o = osd;
  osd->mono_timer.add_event(
    delay,
    [o, lpr, spgid]() {
      o->queue_renew_lease(lpr, spgid);
    });
}

void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
{
  osd->queue_check_readable(info.pgid, lpr, delay);
}

void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
{
  pglog.rebuild_missing_set_with_deletes(
    osd->store,
    ch,
    recovery_state.get_info());
}

void PG::on_activate_committed()
{
  if (!is_primary()) {
    // waiters
    if (recovery_state.needs_flush() == 0) {
      requeue_ops(waiting_for_peered);
    } else if (!waiting_for_peered.empty()) {
      dout(10) << __func__ << " flushes in progress, moving "
               << waiting_for_peered.size() << " items to waiting_for_flush"
               << dendl;
      ceph_assert(waiting_for_flush.empty());
      waiting_for_flush.swap(waiting_for_peered);
    }
  }
}

// Compute pending backfill data
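// e.g. a primary reporting bf_bytes = 10 GiB for the PG while this replica
// already holds local_bytes = 4 GiB of it yields 6 GiB still pending;
// a negative difference clamps to zero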
static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
{
  lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
                         << (local_bytes >> 10) << "KiB"
                         << " primary usage " << (bf_bytes >> 10)
                         << "KiB" << dendl;

  return std::max((int64_t)0, bf_bytes - local_bytes);
}


// We can zero the value of primary num_bytes as just an atomic.
// However, setting above zero reserves space for backfill and requires
// the OSDService::stat_lock which protects all OSD usage
bool PG::try_reserve_recovery_space(
  int64_t primary_bytes, int64_t local_bytes) {
  // Use tentative_backfill_full() to make sure enough
  // space is available to handle target bytes from primary.

  // TODO: If we passed num_objects from primary we could account for
  // an estimate of the metadata overhead.

  // TODO: If we had compressed_allocated and compressed_original from primary
  // we could compute compression ratio and adjust accordingly.

  // XXX: There is no way to get omap overhead and this would only apply
  // to whatever possibly different partition that is storing the database.

  // update_osd_stat() from heartbeat will do this on a new
  // statfs using ps->primary_bytes.
  uint64_t pending_adjustment = 0;
  if (primary_bytes) {
    // For an erasure-coded pool, overestimate by a full stripe per object,
    // because we don't know how each object rounded to the nearest stripe
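    // (illustrative: with k=4 data chunks and a 64 KiB stripe chunk, 1 GiB
    // of primary bytes becomes 256 MiB per shard plus one 64 KiB chunk per
    // object as worst-case rounding)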
    if (pool.info.is_erasure()) {
      primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
        info.stats.stats.sum.num_objects;
      local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
        info.stats.stats.sum.num_objects;
    }
    pending_adjustment = pending_backfill(
      cct,
      primary_bytes,
      local_bytes);
    dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
             << "KiB"
             << " local " << (local_bytes >> 10) << "KiB"
             << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
             << dendl;
  }

  // This lock protects not only the stats OSDService but also setting the
  // pg primary_bytes. That's why we don't immediately unlock
  std::lock_guard l{osd->stat_lock};
  osd_stat_t cur_stat = osd->osd_stat;
  if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
      (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
    dout(10) << "backfill reservation rejected: failure injection"
             << dendl;
    return false;
  } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
             osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
    dout(10) << "backfill reservation rejected: backfill full"
             << dendl;
    return false;
  } else {
    // Don't reserve space if skipped reservation check, this is used
    // to test the other backfill full check AND in case a corruption
    // of num_bytes requires ignoring that value and trying the
    // backfill anyway.
    if (primary_bytes &&
        !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
      primary_num_bytes.store(primary_bytes);
      local_num_bytes.store(local_bytes);
    } else {
      unreserve_recovery_space();
    }
    return true;
  }
}

void PG::unreserve_recovery_space() {
  primary_num_bytes.store(0);
  local_num_bytes.store(0);
}

void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
{
  ObjectStore::Transaction t;
  eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
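  // rollback objects whose generation precedes the applied "trimmed-to"
  // version can never be needed for rollback again; remove them below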
  for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
       i != rollback_obs.end();
       ++i) {
    if (i->generation < trimmed_to.version) {
      dout(10) << __func__ << " osd." << osd->whoami
               << " pg " << info.pgid
               << " found obsolete rollback obj "
               << *i << " generation < trimmed_to "
               << trimmed_to
               << "...repaired" << dendl;
      t.remove(coll, *i);
    }
  }
  if (!t.empty()) {
    derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
         << dendl;
    osd->store->queue_transaction(ch, std::move(t), NULL);
  }
}


void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued, std::string_view desc)
{
  dout(20) << __func__ << ": " << desc << " queued at: " << epoch_queued << dendl;
  ceph_assert(m_scrubber);
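  // 'fn' is a pointer to a scrubber-interface member function; it is only
  // invoked while the PG is still active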
  if (is_active()) {
    ((*m_scrubber).*fn)(epoch_queued);
  } else {
    // pg might be in the process of being deleted
    dout(5) << __func__ << " refusing to forward. "
            << (is_clean() ? "(clean) " : "(not clean) ")
            << (is_active() ? "(active) " : "(not active) ") << dendl;
  }
}

void PG::forward_scrub_event(ScrubSafeAPI fn,
                             epoch_t epoch_queued,
                             Scrub::act_token_t act_token,
                             std::string_view desc)
{
  dout(20) << __func__ << ": " << desc << " queued: " << epoch_queued
           << " token: " << act_token << dendl;
  ceph_assert(m_scrubber);
  if (is_active()) {
    ((*m_scrubber).*fn)(epoch_queued, act_token);
  } else {
    // pg might be in the process of being deleted
    dout(5) << __func__ << " refusing to forward. "
            << (is_clean() ? "(clean) " : "(not clean) ")
            << (is_active() ? "(active) " : "(not active) ") << dendl;
  }
}

void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
{
  dout(10) << __func__ << " (op)" << dendl;
  ceph_assert(m_scrubber);
  m_scrubber->replica_scrub_op(op);
}

void PG::replica_scrub(epoch_t epoch_queued,
                       Scrub::act_token_t act_token,
                       [[maybe_unused]] ThreadPool::TPHandle& handle)
{
  dout(10) << __func__ << " queued at: " << epoch_queued
           << (is_primary() ? " (primary)" : " (replica)") << dendl;
  forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued, act_token,
                      "StartReplica/nw");
}

bool PG::ops_blocked_by_scrub() const
{
  return !waiting_for_scrub.empty();
}

Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
{
  return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
                                   : Scrub::scrub_prio_t::high_priority;
}

bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
{
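  // any message sent before the last peering reset belongs to a stale interval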
  if (auto last_reset = get_last_peering_reset();
      last_reset > reply_epoch || last_reset > query_epoch) {
    dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
             << query_epoch << " last_peering_reset " << last_reset << dendl;
    return true;
  }
  return false;
}

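// RAII helper: held by both the on_applied and on_commit callbacks of a
// transaction; once both have fired and the last reference drops, the
// destructor marks the interval flush complete, unless the PG has been
// reset since.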
struct FlushState {
  PGRef pg;
  epoch_t epoch;
  FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
  ~FlushState() {
    std::scoped_lock l{*pg};
    if (!pg->pg_has_reset_since(epoch)) {
      pg->recovery_state.complete_flush();
    }
  }
};
typedef std::shared_ptr<FlushState> FlushStateRef;

void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
{
  // flush in progress ops
  FlushStateRef flush_trigger (std::make_shared<FlushState>(
    this, get_osdmap_epoch()));
  t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
  t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
}

bool PG::try_flush_or_schedule_async()
{
  Context *c = new QueuePeeringEvt(
    this, get_osdmap_epoch(), PeeringState::IntervalFlush());
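  // flush_commit() returns true when everything is already stable, in which
  // case the context is not queued and must be freed here; false means the
  // context will fire once the flush completes (the async path)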
  if (!ch->flush_commit(c)) {
    return false;
  } else {
    delete c;
    return true;
  }
}

ostream& operator<<(ostream& out, const PG& pg)
{
  out << pg.recovery_state;

  // listing all scrub-related flags - both current and "planned next scrub"
  if (pg.is_scrubbing()) {
    out << *pg.m_scrubber;
  }
  out << pg.m_planned_scrub;

  if (pg.recovery_ops_active)
    out << " rops=" << pg.recovery_ops_active;

  //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
  if (pg.recovery_state.have_missing()) {
    out << " m=" << pg.recovery_state.get_num_missing();
    if (pg.is_primary()) {
      uint64_t unfound = pg.recovery_state.get_num_unfound();
      if (unfound)
        out << " u=" << unfound;
    }
  }
  if (!pg.is_clean()) {
    out << " mbc=" << pg.recovery_state.get_missing_by_count();
  }
  if (!pg.snap_trimq.empty()) {
    out << " trimq=";
    // only show a count if the set is large
    if (pg.snap_trimq.num_intervals() > 16) {
      out << pg.snap_trimq.size();
      if (!pg.snap_trimq_repeat.empty()) {
        out << "(" << pg.snap_trimq_repeat.size() << ")";
      }
    } else {
      out << pg.snap_trimq;
      if (!pg.snap_trimq_repeat.empty()) {
        out << "(" << pg.snap_trimq_repeat << ")";
      }
    }
  }
  if (!pg.recovery_state.get_info().purged_snaps.empty()) {
    out << " ps="; // snap trim queue / purged snaps
    if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
      out << pg.recovery_state.get_info().purged_snaps.size();
    } else {
      out << pg.recovery_state.get_info().purged_snaps;
    }
  }

  out << "]";
  return out;
}

bool PG::can_discard_op(OpRequestRef& op)
{
  auto m = op->get_req<MOSDOp>();
  if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
    dout(20) << " discard " << *m << dendl;
    return true;
  }

  if (m->get_map_epoch() < info.history.same_primary_since) {
    dout(7) << " changed after " << m->get_map_epoch()
            << ", dropping " << *m << dendl;
    return true;
  }

  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
                         CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      !is_primary() &&
      m->get_map_epoch() < info.history.same_interval_since) {
    // Note: the Objecter will resend on interval change without the primary
    // changing if it actually sent to a replica.  If the primary hasn't
    // changed since the send epoch, we got it, and we're primary, it won't
    // have resent even if the interval did change as it sent it to the primary
    // (us).
    return true;
  }

  if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
    // >= luminous client
    if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
      // >= nautilus client
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
        dout(7) << __func__ << " sent before last_force_op_resend "
                << pool.info.last_force_op_resend
                << ", dropping " << *m << dendl;
        return true;
      }
    } else {
      // < nautilus client (luminous or mimic)
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
        dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
                << pool.info.last_force_op_resend_prenautilus
                << ", dropping " << *m << dendl;
        return true;
      }
    }
    if (m->get_map_epoch() < info.history.last_epoch_split) {
      dout(7) << __func__ << " pg split in "
              << info.history.last_epoch_split << ", dropping" << dendl;
      return true;
    }
  } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
    // < luminous client
    if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
      dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
              << pool.info.last_force_op_resend_preluminous
              << ", dropping " << *m << dendl;
      return true;
    }
  }

  return false;
}

template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef& op)
{
  auto m = op->get_req<T>();
  ceph_assert(m->get_type() == MSGTYPE);

  int from = m->get_source().num();

  // if a repop is replied after a replica goes down in a new osdmap, and
  // before the pg advances to this new osdmap, the repop replies before this
  // repop can be discarded by that replica OSD, because the primary resets the
  // connection to it when handling the new osdmap marking it down, and also
  // resets the messenger session when the replica reconnects. to avoid the
  // out-of-order replies, the messages from that replica should be discarded.
  OSDMapRef next_map = osd->get_next_osdmap();
  if (next_map->is_down(from)) {
    dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
    return true;
  }
  /* Mostly, this overlaps with the old_peering_msg
   * condition.  An important exception is pushes
   * sent by replicas not in the acting set, since
   * if such a replica goes down it does not cause
   * a new interval. */
  if (next_map->get_down_at(from) >= m->map_epoch) {
    dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
    return true;
  }

  // same pg?
  //  if pg changes _at all_, we reset and repeer!
  if (old_peering_msg(m->map_epoch, m->map_epoch)) {
    dout(10) << "can_discard_replica_op pg changed " << info.history
             << " after " << m->map_epoch
             << ", dropping" << dendl;
    return true;
  }
  return false;
}

bool PG::can_discard_scan(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGScan>();
  ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);

  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
    dout(10) << " got old scan, ignoring" << dendl;
    return true;
  }
  return false;
}

bool PG::can_discard_backfill(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGBackfill>();
  ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);

  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
    dout(10) << " got old backfill, ignoring" << dendl;
    return true;
  }

  return false;
}

bool PG::can_discard_request(OpRequestRef& op)
{
  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
    return can_discard_op(op);
  case CEPH_MSG_OSD_BACKOFF:
    return false; // never discard
  case MSG_OSD_REPOP:
    return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
  case MSG_OSD_PG_PUSH:
    return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
  case MSG_OSD_PG_PULL:
    return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
  case MSG_OSD_PG_PUSH_REPLY:
    return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
  case MSG_OSD_REPOPREPLY:
    return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
  case MSG_OSD_PG_RECOVERY_DELETE:
    return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);

  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);

  case MSG_OSD_EC_WRITE:
    return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
  case MSG_OSD_EC_WRITE_REPLY:
    return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
  case MSG_OSD_EC_READ:
    return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
  case MSG_OSD_EC_READ_REPLY:
    return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
  case MSG_OSD_REP_SCRUB:
    return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
  case MSG_OSD_SCRUB_RESERVE:
    return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
  case MSG_OSD_REP_SCRUBMAP:
    return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);

  case MSG_OSD_PG_SCAN:
    return can_discard_scan(op);
  case MSG_OSD_PG_BACKFILL:
    return can_discard_backfill(op);
  case MSG_OSD_PG_BACKFILL_REMOVE:
    return can_discard_replica_op<MOSDPGBackfillRemove,
                                  MSG_OSD_PG_BACKFILL_REMOVE>(op);
  }
  return true;
}

void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << evt->get_desc() << dendl;
  ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
  if (old_peering_evt(evt)) {
    dout(10) << "discard old " << evt->get_desc() << dendl;
  } else {
    recovery_state.handle_event(evt, &rctx);
  }
  // write_if_dirty regardless of path above to ensure we capture any work
  // done by OSD::advance_pg().
  write_if_dirty(rctx.transaction);
}

void PG::queue_peering_event(PGPeeringEventRef evt)
{
  if (old_peering_evt(evt))
    return;
  osd->osd->enqueue_peering_evt(info.pgid, evt);
}

void PG::queue_null(epoch_t msg_epoch,
                    epoch_t query_epoch)
{
  dout(10) << "null" << dendl;
  queue_peering_event(
    PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
                                                       NullEvt())));
}

void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
{
  /*
   * if we couldn't start any recovery ops and things are still
   * unfound, see if we can discover more missing object locations.
   * It may be that our initial locations were bad and we errored
   * out while trying to pull.
   */
  if (!recovery_state.discover_all_missing(rctx)) {
    string action;
    if (state_test(PG_STATE_BACKFILLING)) {
      auto evt = PGPeeringEventRef(
        new PGPeeringEvent(
          queued,
          queued,
          PeeringState::UnfoundBackfill()));
      queue_peering_event(evt);
      action = "in backfill";
    } else if (state_test(PG_STATE_RECOVERING)) {
      auto evt = PGPeeringEventRef(
        new PGPeeringEvent(
          queued,
          queued,
          PeeringState::UnfoundRecovery()));
      queue_peering_event(evt);
      action = "in recovery";
    } else {
      action = "already out of recovery/backfill";
    }
    dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
  } else {
    dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
    queue_recovery();
  }
}

void PG::handle_advance_map(
  OSDMapRef osdmap, OSDMapRef lastmap,
  vector<int>& newup, int up_primary,
  vector<int>& newacting, int acting_primary,
  PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
  osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
  recovery_state.advance_map(
    osdmap,
    lastmap,
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
}

void PG::handle_activate_map(PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
           << dendl;
  recovery_state.activate_map(rctx);

  requeue_map_waiters();
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  dout(10) << __func__ << dendl;
  PeeringState::Initialize evt;
  recovery_state.handle_event(evt, &rctx);
}

void PG::handle_query_state(Formatter *f)
{
  dout(10) << "handle_query_state" << dendl;
  PeeringState::QueryState q(f);
  recovery_state.handle_event(q, 0);
}

void PG::init_collection_pool_opts()
{
  auto r = osd->store->set_collection_opts(ch, pool.info.opts);
  if (r < 0 && r != -EOPNOTSUPP) {
    derr << __func__ << " set_collection_opts returns error: " << r << dendl;
  }
}

void PG::on_pool_change()
{
  init_collection_pool_opts();
  plpg_on_pool_change();
}

void PG::C_DeleteMore::complete(int r) {
  ceph_assert(r == 0);
  pg->lock();
  if (!pg->pg_has_reset_since(epoch)) {
    pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
  }
  pg->unlock();
  delete this;
}

std::pair<ghobject_t, bool> PG::do_delete_work(
  ObjectStore::Transaction &t,
  ghobject_t _next)
{
  dout(10) << __func__ << dendl;

  {
    float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
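    // pace PG deletion: when osd_delete_sleep is configured, defer this
    // round and requeue the delete from a timer callback instead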
    if (osd_delete_sleep > 0 && delete_needs_sleep) {
      epoch_t e = get_osdmap()->get_epoch();
      PGRef pgref(this);
      auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
        dout(20) << "do_delete_work() [cb] wake up at "
                 << ceph_clock_now()
                 << ", re-queuing delete" << dendl;
        std::scoped_lock locker{*this};
        delete_needs_sleep = false;
        if (!pg_has_reset_since(e)) {
          osd->queue_for_pg_delete(get_pgid(), e);
        }
      });

      auto delete_schedule_time = ceph::real_clock::now();
      delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
      std::lock_guard l{osd->sleep_lock};
      osd->sleep_timer.add_event_at(delete_schedule_time,
                                    delete_requeue_callback);
      dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
      return std::make_pair(_next, true);
    }
  }

  delete_needs_sleep = true;

  ghobject_t next;

  vector<ghobject_t> olist;
  int max = std::min(osd->store->get_ideal_list_max(),
                     (int)cct->_conf->osd_target_transaction_size);

  osd->store->collection_list(
    ch,
    _next,
    ghobject_t::get_max(),
    max,
    &olist,
    &next);
  dout(20) << __func__ << " " << olist << dendl;

  // make sure we've removed everything
  // by one more listing from the beginning
  if (_next != ghobject_t() && olist.empty()) {
    next = ghobject_t();
    osd->store->collection_list(
      ch,
      next,
      ghobject_t::get_max(),
      max,
      &olist,
      &next);
    for (auto& oid : olist) {
      if (oid == pgmeta_oid) {
        dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
      } else {
        dout(0) << __func__ << " additional unexpected onode:"
                << " a new onode has appeared since PG removal started: "
                << oid << dendl;
      }
    }
  }

  OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
  int64_t num = 0;
  for (auto& oid : olist) {
    if (oid == pgmeta_oid) {
      continue;
    }
    if (oid.is_pgmeta()) {
      osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
                        << " during PG removal";
    }
    int r = snap_mapper.remove_oid(oid.hobj, &_t);
    if (r != 0 && r != -ENOENT) {
      ceph_abort();
    }
    t.remove(coll, oid);
    ++num;
  }
  bool running = true;
  if (num) {
    dout(20) << __func__ << " deleting " << num << " objects" << dendl;
    Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
    t.register_on_commit(fin);
  } else {
    if (cct->_conf->osd_inject_failure_on_pg_removal) {
      _exit(1);
    }

    // final flush here to ensure completions drop refs.  Of particular concern
    // are the SnapMapper ContainerContexts.
    {
      PGRef pgref(this);
      PGLog::clear_info_log(info.pgid, &t);
      t.remove_collection(coll);
      t.register_on_commit(new ContainerContext<PGRef>(pgref));
      t.register_on_applied(new ContainerContext<PGRef>(pgref));
      osd->store->queue_transaction(ch, std::move(t));
    }
    ch->flush();

    if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
      dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
      ch = osd->store->create_new_collection(coll);
      create_pg_collection(t,
                           info.pgid,
                           info.pgid.get_split_bits(pool.info.get_pg_num()));
      init_pg_ondisk(t, info.pgid, &pool.info);
      recovery_state.reset_last_persisted();
    } else {
      recovery_state.set_delete_complete();

      // cancel reserver here, since the PG is about to get deleted and the
      // exit() methods don't run when that happens.
      osd->local_reserver.cancel_reservation(info.pgid);

      running = false;
    }
  }
  return {next, running};
}

int PG::pg_stat_adjust(osd_stat_t *ns)
{
  osd_stat_t &new_stat = *ns;
  if (is_primary()) {
    return 0;
  }
  // Adjust the kb_used by adding pending backfill data
  uint64_t reserved_num_bytes = get_reserved_num_bytes();

  // For now we don't consider projected space gains here.
  // I suggest we have an optional 2-pass backfill that frees up
  // space in a first pass.  This could be triggered when at nearfull
  // or near to backfillfull.
  if (reserved_num_bytes > 0) {
    // TODO: Handle compression by adjusting by the PG's average
    // compression percentage.
    dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
             << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
    if (new_stat.statfs.available > reserved_num_bytes)
      new_stat.statfs.available -= reserved_num_bytes;
    else
      new_stat.statfs.available = 0;
    dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
    return 1;
  }
  return 0;
}

void PG::dump_pgstate_history(Formatter *f)
{
  std::scoped_lock l{*this};
  recovery_state.dump_history(f);
}

void PG::dump_missing(Formatter *f)
{
  for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
    f->open_object_section("object");
    f->dump_object("oid", i.first);
    f->dump_object("missing_info", i.second);
    if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
      f->dump_bool(
        "unfound",
        recovery_state.get_missing_loc().is_unfound(i.first));
      f->open_array_section("locations");
      for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
        f->dump_object("shard", l);
      }
      f->close_section();
    }
    f->close_section();
  }
}

void PG::with_pg_stats(ceph::coarse_real_clock::time_point now_is,
                       std::function<void(const pg_stat_t&, epoch_t lec)>&& f)
{
  dout(30) << __func__ << dendl;
  // possibly update the scrub state & timers
  lock();
  if (m_scrubber) {
    m_scrubber->update_scrub_stats(now_is);
  }
  unlock();

  // now - the actual publishing
  std::lock_guard l{pg_stats_publish_lock};
  if (pg_stats_publish) {
    f(*pg_stats_publish, pg_stats_publish->get_effective_last_epoch_clean());
  }
}

void PG::with_heartbeat_peers(std::function<void(int)>&& f)
{
  std::lock_guard l{heartbeat_peer_lock};
  for (auto p : heartbeat_peers) {
    f(p);
  }
  for (auto p : probe_targets) {
    f(p);
  }
}

uint64_t PG::get_min_alloc_size() const {
  return osd->store->get_min_alloc_size();
}