]> git.proxmox.com Git - ceph.git/blame - ceph/src/osd/PG.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / osd / PG.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "PG.h"
7c673cae 16#include "messages/MOSDRepScrub.h"
7c673cae
FG
17
18#include "common/errno.h"
9f95a23c 19#include "common/ceph_releases.h"
7c673cae
FG
20#include "common/config.h"
21#include "OSD.h"
22#include "OpRequest.h"
23#include "ScrubStore.h"
f67539c2 24#include "pg_scrubber.h"
7c673cae 25#include "Session.h"
9f95a23c 26#include "osd/scheduler/OpSchedulerItem.h"
7c673cae
FG
27
28#include "common/Timer.h"
29#include "common/perf_counters.h"
30
31#include "messages/MOSDOp.h"
32#include "messages/MOSDPGNotify.h"
7c673cae 33#include "messages/MOSDPGInfo.h"
7c673cae
FG
34#include "messages/MOSDPGScan.h"
35#include "messages/MOSDPGBackfill.h"
36#include "messages/MOSDPGBackfillRemove.h"
37#include "messages/MBackfillReserve.h"
38#include "messages/MRecoveryReserve.h"
39#include "messages/MOSDPGPush.h"
40#include "messages/MOSDPGPushReply.h"
41#include "messages/MOSDPGPull.h"
42#include "messages/MOSDECSubOpWrite.h"
43#include "messages/MOSDECSubOpWriteReply.h"
44#include "messages/MOSDECSubOpRead.h"
45#include "messages/MOSDECSubOpReadReply.h"
46#include "messages/MOSDPGUpdateLogMissing.h"
47#include "messages/MOSDPGUpdateLogMissingReply.h"
48#include "messages/MOSDBackoff.h"
49#include "messages/MOSDScrubReserve.h"
7c673cae 50#include "messages/MOSDRepOp.h"
7c673cae
FG
51#include "messages/MOSDRepOpReply.h"
52#include "messages/MOSDRepScrubMap.h"
c07f9fc5
FG
53#include "messages/MOSDPGRecoveryDelete.h"
54#include "messages/MOSDPGRecoveryDeleteReply.h"
7c673cae
FG
55
56#include "common/BackTrace.h"
57#include "common/EventTrace.h"
58
59#ifdef WITH_LTTNG
60#define TRACEPOINT_DEFINE
61#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
62#include "tracing/pg.h"
63#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
64#undef TRACEPOINT_DEFINE
65#else
66#define tracepoint(...)
67#endif
68
69#include <sstream>
70
71#define dout_context cct
72#define dout_subsys ceph_subsys_osd
73#undef dout_prefix
74#define dout_prefix _prefix(_dout, this)
75
f67539c2
TL
76using std::list;
77using std::map;
78using std::ostringstream;
79using std::pair;
80using std::set;
81using std::string;
82using std::stringstream;
83using std::unique_ptr;
84using std::vector;
85
86using ceph::bufferlist;
87using ceph::bufferptr;
88using ceph::decode;
89using ceph::encode;
90using ceph::Formatter;
91
9f95a23c 92using namespace ceph::osd::scheduler;
7c673cae
FG
93
94template <class T>
95static ostream& _prefix(std::ostream *_dout, T *t)
96{
11fdf7f2 97 return t->gen_prefix(*_dout);
7c673cae
FG
98}
99
7c673cae
FG
100void PG::get(const char* tag)
101{
11fdf7f2
TL
102 int after = ++ref;
103 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
104 << "tag " << (tag ? tag : "(none") << " "
105 << (after - 1) << " -> " << after << dendl;
7c673cae 106#ifdef PG_DEBUG_REFS
11fdf7f2 107 std::lock_guard l(_ref_id_lock);
7c673cae
FG
108 _tag_counts[tag]++;
109#endif
110}
111
112void PG::put(const char* tag)
113{
114#ifdef PG_DEBUG_REFS
115 {
11fdf7f2 116 std::lock_guard l(_ref_id_lock);
7c673cae 117 auto tag_counts_entry = _tag_counts.find(tag);
11fdf7f2 118 ceph_assert(tag_counts_entry != _tag_counts.end());
7c673cae
FG
119 --tag_counts_entry->second;
120 if (tag_counts_entry->second == 0) {
121 _tag_counts.erase(tag_counts_entry);
122 }
123 }
124#endif
11fdf7f2
TL
125 auto local_cct = cct;
126 int after = --ref;
127 lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
128 << "tag " << (tag ? tag : "(none") << " "
129 << (after + 1) << " -> " << after
130 << dendl;
131 if (after == 0)
7c673cae
FG
132 delete this;
133}
134
135#ifdef PG_DEBUG_REFS
136uint64_t PG::get_with_id()
137{
138 ref++;
11fdf7f2 139 std::lock_guard l(_ref_id_lock);
7c673cae
FG
140 uint64_t id = ++_ref_id;
141 BackTrace bt(0);
142 stringstream ss;
143 bt.print(ss);
11fdf7f2
TL
144 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
145 << " got id " << id << " "
146 << (ref - 1) << " -> " << ref
147 << dendl;
148 ceph_assert(!_live_ids.count(id));
7c673cae
FG
149 _live_ids.insert(make_pair(id, ss.str()));
150 return id;
151}
152
153void PG::put_with_id(uint64_t id)
154{
11fdf7f2
TL
155 int newref = --ref;
156 lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
157 << " put id " << id << " "
158 << (newref + 1) << " -> " << newref
159 << dendl;
7c673cae 160 {
11fdf7f2
TL
161 std::lock_guard l(_ref_id_lock);
162 ceph_assert(_live_ids.count(id));
7c673cae
FG
163 _live_ids.erase(id);
164 }
11fdf7f2 165 if (newref)
7c673cae
FG
166 delete this;
167}
168
169void PG::dump_live_ids()
170{
11fdf7f2 171 std::lock_guard l(_ref_id_lock);
7c673cae
FG
172 dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
173 for (map<uint64_t, string>::iterator i = _live_ids.begin();
174 i != _live_ids.end();
175 ++i) {
176 dout(0) << "\t\tid: " << *i << dendl;
177 }
178 dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
179 for (map<string, uint64_t>::iterator i = _tag_counts.begin();
180 i != _tag_counts.end();
181 ++i) {
182 dout(0) << "\t\tid: " << *i << dendl;
183 }
184}
185#endif
186
7c673cae
FG
187PG::PG(OSDService *o, OSDMapRef curmap,
188 const PGPool &_pool, spg_t p) :
9f95a23c 189 pg_whoami(o->whoami, p.shard),
11fdf7f2
TL
190 pg_id(p),
191 coll(p),
7c673cae
FG
192 osd(o),
193 cct(o->cct),
194 osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
195 snap_mapper(
196 cct,
197 &osdriver,
198 p.ps(),
11fdf7f2 199 p.get_split_bits(_pool.info.get_pg_num()),
7c673cae
FG
200 _pool.id,
201 p.shard),
7c673cae 202 trace_endpoint("0.0.0.0", 0, "PG"),
7c673cae 203 info_struct_v(0),
7c673cae 204 pgmeta_oid(p.make_pgmeta_oid()),
7c673cae
FG
205 stat_queue_item(this),
206 scrub_queued(false),
207 recovery_queued(false),
208 recovery_ops_active(0),
7c673cae 209 backfill_reserving(false),
7c673cae 210 pg_stats_publish_valid(false),
7c673cae 211 finish_sync_event(NULL),
7c673cae
FG
212 scrub_after_recovery(false),
213 active_pushes(0),
9f95a23c
TL
214 recovery_state(
215 o->cct,
216 pg_whoami,
217 p,
218 _pool,
219 curmap,
220 this,
221 this),
222 pool(recovery_state.get_pool()),
223 info(recovery_state.get_info())
7c673cae
FG
224{
225#ifdef PG_DEBUG_REFS
226 osd->add_pgid(p, this);
227#endif
228#ifdef WITH_BLKIN
229 std::stringstream ss;
230 ss << "PG " << info.pgid;
231 trace_endpoint.copy_name(ss.str());
232#endif
7c673cae
FG
233}
234
235PG::~PG()
236{
7c673cae
FG
237#ifdef PG_DEBUG_REFS
238 osd->remove_pgid(info.pgid, this);
239#endif
240}
241
7c673cae
FG
242void PG::lock(bool no_lockdep) const
243{
9f95a23c
TL
244#ifdef CEPH_DEBUG_MUTEX
245 _lock.lock(no_lockdep);
246#else
247 _lock.lock();
248 locked_by = std::this_thread::get_id();
249#endif
7c673cae 250 // if we have unrecorded dirty state with the lock dropped, there is a bug
9f95a23c 251 ceph_assert(!recovery_state.debug_has_dirty_state());
7c673cae
FG
252
253 dout(30) << "lock" << dendl;
254}
255
9f95a23c
TL
256bool PG::is_locked() const
257{
258 return ceph_mutex_is_locked(_lock);
259}
260
261void PG::unlock() const
262{
263 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
264 ceph_assert(!recovery_state.debug_has_dirty_state());
265#ifndef CEPH_DEBUG_MUTEX
266 locked_by = {};
267#endif
268 _lock.unlock();
269}
270
11fdf7f2 271std::ostream& PG::gen_prefix(std::ostream& out) const
7c673cae 272{
9f95a23c
TL
273 OSDMapRef mapref = recovery_state.get_osdmap();
274#ifdef CEPH_DEBUG_MUTEX
7c673cae 275 if (_lock.is_locked_by_me()) {
9f95a23c
TL
276#else
277 if (locked_by == std::this_thread::get_id()) {
278#endif
7c673cae
FG
279 out << "osd." << osd->whoami
280 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
281 << " " << *this << " ";
282 } else {
283 out << "osd." << osd->whoami
284 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
9f95a23c 285 << " pg[" << pg_id.pgid << "(unlocked)] ";
7c673cae 286 }
11fdf7f2 287 return out;
7c673cae 288}
7c673cae 289
9f95a23c
TL
290PerfCounters &PG::get_peering_perf() {
291 return *(osd->recoverystate_perf);
7c673cae
FG
292}
293
9f95a23c
TL
294PerfCounters &PG::get_perf_logger() {
295 return *(osd->logger);
296}
7c673cae 297
9f95a23c
TL
298void PG::log_state_enter(const char *state) {
299 osd->pg_recovery_stats.log_enter(state);
300}
7c673cae 301
9f95a23c
TL
302void PG::log_state_exit(
303 const char *state_name, utime_t enter_time,
304 uint64_t events, utime_t event_dur) {
305 osd->pg_recovery_stats.log_exit(
306 state_name, ceph_clock_now() - enter_time, events, event_dur);
7c673cae 307}
f67539c2 308
9f95a23c 309/********* PG **********/
7c673cae
FG
310
311void PG::remove_snap_mapped_object(
312 ObjectStore::Transaction &t, const hobject_t &soid)
313{
314 t.remove(
315 coll,
316 ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
317 clear_object_snap_mapping(&t, soid);
318}
319
320void PG::clear_object_snap_mapping(
321 ObjectStore::Transaction *t, const hobject_t &soid)
322{
323 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
324 if (soid.snap < CEPH_MAXSNAP) {
325 int r = snap_mapper.remove_oid(
326 soid,
327 &_t);
328 if (!(r == 0 || r == -ENOENT)) {
329 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
330 ceph_abort();
331 }
332 }
333}
334
335void PG::update_object_snap_mapping(
336 ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
337{
338 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
11fdf7f2 339 ceph_assert(soid.snap < CEPH_MAXSNAP);
7c673cae
FG
340 int r = snap_mapper.remove_oid(
341 soid,
342 &_t);
343 if (!(r == 0 || r == -ENOENT)) {
344 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
345 ceph_abort();
346 }
347 snap_mapper.add_oid(
348 soid,
349 snaps,
350 &_t);
351}
352
9f95a23c
TL
353/******* PG ***********/
354void PG::clear_primary_state()
7c673cae 355{
f67539c2
TL
356 dout(20) << __func__ << dendl;
357
9f95a23c 358 projected_log = PGLog::IndexedLog();
7c673cae 359
9f95a23c
TL
360 snap_trimq.clear();
361 snap_trimq_repeat.clear();
362 finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
363 release_pg_backoffs();
7c673cae 364
f67539c2
TL
365 if (m_scrubber) {
366 m_scrubber->discard_replica_reservations();
367 }
9f95a23c
TL
368 scrub_after_recovery = false;
369
370 agent_clear();
7c673cae
FG
371}
372
91327a77 373
9f95a23c
TL
374bool PG::op_has_sufficient_caps(OpRequestRef& op)
375{
376 // only check MOSDOp
377 if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
c07f9fc5 378 return true;
9f95a23c
TL
379
380 auto req = op->get_req<MOSDOp>();
381 auto priv = req->get_connection()->get_priv();
382 auto session = static_cast<Session*>(priv.get());
383 if (!session) {
384 dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
c07f9fc5 385 return false;
7c673cae 386 }
9f95a23c
TL
387 OSDCap& caps = session->caps;
388 priv.reset();
7c673cae 389
9f95a23c
TL
390 const string &key = req->get_hobj().get_key().empty() ?
391 req->get_oid().name :
392 req->get_hobj().get_key();
91327a77 393
9f95a23c
TL
394 bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
395 pool.info.application_metadata,
396 key,
397 op->need_read_cap(),
398 op->need_write_cap(),
399 op->classes(),
400 session->get_peer_socket_addr());
91327a77 401
9f95a23c
TL
402 dout(20) << "op_has_sufficient_caps "
403 << "session=" << session
404 << " pool=" << pool.id << " (" << pool.name
405 << " " << req->get_hobj().nspace
406 << ")"
407 << " pool_app_metadata=" << pool.info.application_metadata
408 << " need_read_cap=" << op->need_read_cap()
409 << " need_write_cap=" << op->need_write_cap()
410 << " classes=" << op->classes()
411 << " -> " << (cap ? "yes" : "NO")
412 << dendl;
413 return cap;
7c673cae
FG
414}
415
9f95a23c 416void PG::queue_recovery()
91327a77 417{
9f95a23c
TL
418 if (!is_primary() || !is_peered()) {
419 dout(10) << "queue_recovery -- not primary or not peered " << dendl;
420 ceph_assert(!recovery_queued);
421 } else if (recovery_queued) {
422 dout(10) << "queue_recovery -- already queued" << dendl;
91327a77 423 } else {
9f95a23c
TL
424 dout(10) << "queue_recovery -- queuing" << dendl;
425 recovery_queued = true;
426 osd->queue_for_recovery(this);
91327a77
AA
427 }
428}
9f95a23c 429
f67539c2 430void PG::queue_scrub_after_repair()
7c673cae 431{
f67539c2 432 dout(10) << __func__ << dendl;
9f95a23c 433 ceph_assert(ceph_mutex_is_locked(_lock));
f67539c2
TL
434
435 m_planned_scrub.must_deep_scrub = true;
436 m_planned_scrub.check_repair = true;
437 m_planned_scrub.must_scrub = true;
438
9f95a23c 439 if (is_scrubbing()) {
f67539c2
TL
440 dout(10) << __func__ << ": scrubbing already" << dendl;
441 return;
9f95a23c 442 }
f67539c2
TL
443 if (scrub_queued) {
444 dout(10) << __func__ << ": already queued" << dendl;
445 return;
446 }
447
448 m_scrubber->set_op_parameters(m_planned_scrub);
449 dout(15) << __func__ << ": queueing" << dendl;
450
451 scrub_queued = true;
452 osd->queue_scrub_after_repair(this, Scrub::scrub_prio_t::high_priority);
9f95a23c 453}
7c673cae 454
9f95a23c
TL
455unsigned PG::get_scrub_priority()
456{
457 // a higher value -> a higher priority
f67539c2
TL
458 int64_t pool_scrub_priority =
459 pool.info.opts.value_or(pool_opts_t::SCRUB_PRIORITY, (int64_t)0);
9f95a23c
TL
460 return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
461}
7c673cae 462
9f95a23c
TL
463Context *PG::finish_recovery()
464{
465 dout(10) << "finish_recovery" << dendl;
466 ceph_assert(info.last_complete == info.last_update);
7c673cae 467
9f95a23c 468 clear_recovery_state();
92f5a8d4 469
9f95a23c
TL
470 /*
471 * sync all this before purging strays. but don't block!
472 */
473 finish_sync_event = new C_PG_FinishRecovery(this);
474 return finish_sync_event;
7c673cae
FG
475}
476
f67539c2 477void PG::_finish_recovery(Context* c)
7c673cae 478{
f67539c2
TL
479 dout(15) << __func__ << " finish_sync_event? " << finish_sync_event << " clean? "
480 << is_clean() << dendl;
481
9f95a23c
TL
482 std::scoped_lock locker{*this};
483 if (recovery_state.is_deleting() || !is_clean()) {
484 dout(10) << __func__ << " raced with delete or repair" << dendl;
485 return;
7c673cae 486 }
9f95a23c
TL
487 // When recovery is initiated by a repair, that flag is left on
488 state_clear(PG_STATE_REPAIR);
489 if (c == finish_sync_event) {
f67539c2 490 dout(15) << __func__ << " scrub_after_recovery? " << scrub_after_recovery << dendl;
9f95a23c
TL
491 finish_sync_event = 0;
492 recovery_state.purge_strays();
7c673cae 493
9f95a23c
TL
494 publish_stats_to_osd();
495
496 if (scrub_after_recovery) {
497 dout(10) << "_finish_recovery requeueing for scrub" << dendl;
498 scrub_after_recovery = false;
f67539c2 499 queue_scrub_after_repair();
7c673cae 500 }
9f95a23c
TL
501 } else {
502 dout(10) << "_finish_recovery -- stale" << dendl;
7c673cae 503 }
9f95a23c 504}
7c673cae 505
9f95a23c
TL
506void PG::start_recovery_op(const hobject_t& soid)
507{
508 dout(10) << "start_recovery_op " << soid
509#ifdef DEBUG_RECOVERY_OIDS
510 << " (" << recovering_oids << ")"
511#endif
512 << dendl;
513 ceph_assert(recovery_ops_active >= 0);
514 recovery_ops_active++;
515#ifdef DEBUG_RECOVERY_OIDS
516 recovering_oids.insert(soid);
517#endif
518 osd->start_recovery_op(this, soid);
7c673cae
FG
519}
520
9f95a23c 521void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
7c673cae 522{
9f95a23c
TL
523 dout(10) << "finish_recovery_op " << soid
524#ifdef DEBUG_RECOVERY_OIDS
f67539c2 525 << " (" << recovering_oids << ")"
9f95a23c
TL
526#endif
527 << dendl;
528 ceph_assert(recovery_ops_active > 0);
529 recovery_ops_active--;
530#ifdef DEBUG_RECOVERY_OIDS
531 ceph_assert(recovering_oids.count(soid));
532 recovering_oids.erase(recovering_oids.find(soid));
533#endif
534 osd->finish_recovery_op(this, soid, dequeue);
7c673cae 535
9f95a23c
TL
536 if (!dequeue) {
537 queue_recovery();
7c673cae 538 }
7c673cae
FG
539}
540
9f95a23c
TL
541void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
542{
543 recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);
7c673cae 544
9f95a23c 545 child->update_snap_mapper_bits(split_bits);
7c673cae 546
9f95a23c
TL
547 child->snap_trimq = snap_trimq;
548 child->snap_trimq_repeat = snap_trimq_repeat;
549
550 _split_into(child_pgid, child, split_bits);
551
552 // release all backoffs for simplicity
553 release_backoffs(hobject_t(), hobject_t::get_max());
7c673cae
FG
554}
555
9f95a23c 556void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
7c673cae 557{
9f95a23c 558 recovery_state.start_split_stats(childpgs, out);
7c673cae
FG
559}
560
9f95a23c
TL
561void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
562{
563 recovery_state.finish_split_stats(stats, t);
564}
565
566void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
567 unsigned split_bits,
568 const pg_merge_meta_t& last_pg_merge_meta)
569{
570 dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
571 << dendl;
572 map<spg_t, PeeringState*> source_ps;
573 for (auto &&source : sources) {
574 source_ps.emplace(source.first, &source.second->recovery_state);
575 }
576 recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);
577
578 for (auto& i : sources) {
579 auto& source = i.second;
580 // wipe out source's pgmeta
581 rctx.transaction.remove(source->coll, source->pgmeta_oid);
582
583 // merge (and destroy source collection)
584 rctx.transaction.merge_collection(source->coll, coll, split_bits);
7c673cae
FG
585 }
586
9f95a23c
TL
587 // merge_collection does this, but maybe all of our sources were missing.
588 rctx.transaction.collection_set_bits(coll, split_bits);
589
590 snap_mapper.update_bits(split_bits);
7c673cae
FG
591}
592
9f95a23c 593void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
7c673cae 594{
9f95a23c
TL
595 auto con = s->con;
596 if (!con) // OSD::ms_handle_reset clears s->con without a lock
597 return;
598 auto b = s->have_backoff(info.pgid, begin);
599 if (b) {
600 derr << __func__ << " already have backoff for " << s << " begin " << begin
601 << " " << *b << dendl;
602 ceph_abort();
7c673cae 603 }
9f95a23c
TL
604 std::lock_guard l(backoff_lock);
605 b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
606 backoffs[begin].insert(b);
607 s->add_backoff(b);
608 dout(10) << __func__ << " session " << s << " added " << *b << dendl;
609 con->send_message(
610 new MOSDBackoff(
611 info.pgid,
612 get_osdmap_epoch(),
613 CEPH_OSD_BACKOFF_OP_BLOCK,
614 b->id,
615 begin,
616 end));
7c673cae
FG
617}
618
9f95a23c 619void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
7c673cae 620{
9f95a23c
TL
621 dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
622 vector<ceph::ref_t<Backoff>> bv;
623 {
624 std::lock_guard l(backoff_lock);
625 auto p = backoffs.lower_bound(begin);
626 while (p != backoffs.end()) {
627 int r = cmp(p->first, end);
628 dout(20) << __func__ << " ? " << r << " " << p->first
629 << " " << p->second << dendl;
630 // note: must still examine begin=end=p->first case
631 if (r > 0 || (r == 0 && begin < end)) {
632 break;
633 }
634 dout(20) << __func__ << " checking " << p->first
635 << " " << p->second << dendl;
636 auto q = p->second.begin();
637 while (q != p->second.end()) {
638 dout(20) << __func__ << " checking " << *q << dendl;
639 int r = cmp((*q)->begin, begin);
640 if (r == 0 || (r > 0 && (*q)->end < end)) {
641 bv.push_back(*q);
642 q = p->second.erase(q);
643 } else {
644 ++q;
645 }
646 }
647 if (p->second.empty()) {
648 p = backoffs.erase(p);
649 } else {
650 ++p;
651 }
7c673cae
FG
652 }
653 }
9f95a23c
TL
654 for (auto b : bv) {
655 std::lock_guard l(b->lock);
656 dout(10) << __func__ << " " << *b << dendl;
657 if (b->session) {
658 ceph_assert(b->pg == this);
659 ConnectionRef con = b->session->con;
660 if (con) { // OSD::ms_handle_reset clears s->con without a lock
661 con->send_message(
662 new MOSDBackoff(
663 info.pgid,
664 get_osdmap_epoch(),
665 CEPH_OSD_BACKOFF_OP_UNBLOCK,
666 b->id,
667 b->begin,
668 b->end));
7c673cae 669 }
9f95a23c
TL
670 if (b->is_new()) {
671 b->state = Backoff::STATE_DELETING;
7c673cae 672 } else {
9f95a23c
TL
673 b->session->rm_backoff(b);
674 b->session.reset();
7c673cae 675 }
9f95a23c
TL
676 b->pg.reset();
677 }
7c673cae 678 }
7c673cae
FG
679}
680
9f95a23c 681void PG::clear_backoffs()
7c673cae 682{
9f95a23c
TL
683 dout(10) << __func__ << " " << dendl;
684 map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
685 {
686 std::lock_guard l(backoff_lock);
687 ls.swap(backoffs);
688 }
689 for (auto& p : ls) {
690 for (auto& b : p.second) {
691 std::lock_guard l(b->lock);
692 dout(10) << __func__ << " " << *b << dendl;
693 if (b->session) {
694 ceph_assert(b->pg == this);
695 if (b->is_new()) {
696 b->state = Backoff::STATE_DELETING;
697 } else {
698 b->session->rm_backoff(b);
699 b->session.reset();
700 }
701 b->pg.reset();
702 }
703 }
704 }
705}
7c673cae 706
9f95a23c
TL
707// called by Session::clear_backoffs()
708void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
709{
710 dout(10) << __func__ << " " << *b << dendl;
711 std::lock_guard l(backoff_lock);
712 ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
713 ceph_assert(b->pg == this);
714 auto p = backoffs.find(b->begin);
715 // may race with release_backoffs()
716 if (p != backoffs.end()) {
717 auto q = p->second.find(b);
718 if (q != p->second.end()) {
719 p->second.erase(q);
720 if (p->second.empty()) {
721 backoffs.erase(p);
722 }
723 }
724 }
725}
7c673cae 726
f67539c2 727void PG::clear_recovery_state()
9f95a23c
TL
728{
729 dout(10) << "clear_recovery_state" << dendl;
7c673cae 730
9f95a23c 731 finish_sync_event = 0;
7c673cae 732
9f95a23c
TL
733 hobject_t soid;
734 while (recovery_ops_active > 0) {
735#ifdef DEBUG_RECOVERY_OIDS
736 soid = *recovering_oids.begin();
737#endif
738 finish_recovery_op(soid, true);
739 }
7c673cae 740
9f95a23c
TL
741 backfill_info.clear();
742 peer_backfill_info.clear();
743 waiting_on_backfill.clear();
744 _clear_recovery_state(); // pg impl specific hook
745}
7c673cae 746
9f95a23c
TL
747void PG::cancel_recovery()
748{
749 dout(10) << "cancel_recovery" << dendl;
750 clear_recovery_state();
7c673cae
FG
751}
752
9f95a23c
TL
753void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
754{
755 std::lock_guard l(heartbeat_peer_lock);
756 probe_targets.clear();
757 for (set<pg_shard_t>::iterator i = probe_set.begin();
758 i != probe_set.end();
7c673cae 759 ++i) {
9f95a23c 760 probe_targets.insert(i->osd);
7c673cae 761 }
7c673cae
FG
762}
763
9f95a23c 764void PG::send_cluster_message(
f67539c2 765 int target, MessageRef m,
9f95a23c
TL
766 epoch_t epoch, bool share_map_update=false)
767{
768 ConnectionRef con = osd->get_con_osd_cluster(
769 target, get_osdmap_epoch());
770 if (!con) {
11fdf7f2
TL
771 return;
772 }
7c673cae 773
9f95a23c
TL
774 if (share_map_update) {
775 osd->maybe_share_map(con.get(), get_osdmap());
7c673cae 776 }
9f95a23c
TL
777 osd->send_message_osd_cluster(m, con.get());
778}
7c673cae 779
9f95a23c
TL
780void PG::clear_probe_targets()
781{
782 std::lock_guard l(heartbeat_peer_lock);
783 probe_targets.clear();
784}
7c673cae 785
9f95a23c
TL
786void PG::update_heartbeat_peers(set<int> new_peers)
787{
788 bool need_update = false;
789 heartbeat_peer_lock.lock();
790 if (new_peers == heartbeat_peers) {
791 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
792 } else {
793 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
794 heartbeat_peers.swap(new_peers);
795 need_update = true;
11fdf7f2 796 }
9f95a23c 797 heartbeat_peer_lock.unlock();
11fdf7f2 798
9f95a23c
TL
799 if (need_update)
800 osd->need_heartbeat_peer_update();
801}
11fdf7f2 802
11fdf7f2 803
9f95a23c
TL
804bool PG::check_in_progress_op(
805 const osd_reqid_t &r,
806 eversion_t *version,
807 version_t *user_version,
808 int *return_code,
809 vector<pg_log_op_return_item_t> *op_returns
810 ) const
811{
812 return (
813 projected_log.get_request(r, version, user_version, return_code,
814 op_returns) ||
815 recovery_state.get_pg_log().get_log().get_request(
816 r, version, user_version, return_code, op_returns));
11fdf7f2
TL
817}
818
9f95a23c 819void PG::publish_stats_to_osd()
11fdf7f2 820{
9f95a23c
TL
821 if (!is_primary())
822 return;
11fdf7f2 823
9f95a23c
TL
824 std::lock_guard l{pg_stats_publish_lock};
825 auto stats = recovery_state.prepare_stats_for_publish(
826 pg_stats_publish_valid,
827 pg_stats_publish,
828 unstable_stats);
829 if (stats) {
830 pg_stats_publish = stats.value();
831 pg_stats_publish_valid = true;
11fdf7f2 832 }
11fdf7f2
TL
833}
834
9f95a23c 835unsigned PG::get_target_pg_log_entries() const
11fdf7f2 836{
9f95a23c 837 return osd->get_target_pg_log_entries();
11fdf7f2
TL
838}
839
9f95a23c 840void PG::clear_publish_stats()
11fdf7f2 841{
9f95a23c
TL
842 dout(15) << "clear_stats" << dendl;
843 std::lock_guard l{pg_stats_publish_lock};
844 pg_stats_publish_valid = false;
7c673cae
FG
845}
846
847/**
9f95a23c 848 * initialize a newly instantiated pg
7c673cae 849 *
9f95a23c
TL
850 * Initialize PG state, as when a PG is initially created, or when it
851 * is first instantiated on the current node.
7c673cae 852 *
9f95a23c
TL
853 * @param role our role/rank
854 * @param newup up set
855 * @param newacting acting set
856 * @param history pg history
857 * @param pi past_intervals
858 * @param backfill true if info should be marked as backfill
859 * @param t transaction to write out our new state in
7c673cae 860 */
9f95a23c
TL
861void PG::init(
862 int role,
863 const vector<int>& newup, int new_up_primary,
864 const vector<int>& newacting, int new_acting_primary,
865 const pg_history_t& history,
866 const PastIntervals& pi,
867 bool backfill,
868 ObjectStore::Transaction &t)
869{
870 recovery_state.init(
871 role, newup, new_up_primary, newacting,
872 new_acting_primary, history, pi, backfill, t);
873}
7c673cae 874
9f95a23c
TL
875void PG::shutdown()
876{
877 ch->flush();
878 std::scoped_lock l{*this};
879 recovery_state.shutdown();
880 on_shutdown();
881}
7c673cae 882
9f95a23c
TL
883#pragma GCC diagnostic ignored "-Wpragmas"
884#pragma GCC diagnostic push
885#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
7c673cae 886
9f95a23c
TL
887void PG::upgrade(ObjectStore *store)
888{
889 dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
890 << dendl;
891 ceph_assert(info_struct_v <= 10);
892 ObjectStore::Transaction t;
7c673cae 893
9f95a23c 894 // <do upgrade steps here>
7c673cae 895
9f95a23c
TL
896 // finished upgrade!
897 ceph_assert(info_struct_v == 10);
7c673cae 898
9f95a23c
TL
899 // update infover_key
900 if (info_struct_v < pg_latest_struct_v) {
901 map<string,bufferlist> v;
902 __u8 ver = pg_latest_struct_v;
903 encode(ver, v[string(infover_key)]);
904 t.omap_setkeys(coll, pgmeta_oid, v);
7c673cae 905 }
7c673cae 906
9f95a23c 907 recovery_state.force_write_state(t);
7c673cae 908
9f95a23c
TL
909 ObjectStore::CollectionHandle ch = store->open_collection(coll);
910 int r = store->queue_transaction(ch, std::move(t));
911 if (r != 0) {
912 derr << __func__ << ": queue_transaction returned "
913 << cpp_strerror(r) << dendl;
914 ceph_abort();
7c673cae 915 }
9f95a23c 916 ceph_assert(r == 0);
7c673cae 917
9f95a23c
TL
918 C_SaferCond waiter;
919 if (!ch->flush_commit(&waiter)) {
920 waiter.wait();
7c673cae 921 }
9f95a23c 922}
7c673cae 923
9f95a23c
TL
924#pragma GCC diagnostic pop
925#pragma GCC diagnostic warning "-Wpragmas"
7c673cae 926
9f95a23c
TL
927void PG::prepare_write(
928 pg_info_t &info,
929 pg_info_t &last_written_info,
930 PastIntervals &past_intervals,
931 PGLog &pglog,
932 bool dirty_info,
933 bool dirty_big_info,
934 bool need_write_epoch,
935 ObjectStore::Transaction &t)
936{
937 info.stats.stats.add(unstable_stats);
938 unstable_stats.clear();
939 map<string,bufferlist> km;
940 string key_to_remove;
941 if (dirty_big_info || dirty_info) {
942 int ret = prepare_info_keymap(
943 cct,
944 &km,
945 &key_to_remove,
11fdf7f2 946 get_osdmap_epoch(),
9f95a23c
TL
947 info,
948 last_written_info,
949 past_intervals,
950 dirty_big_info,
951 need_write_epoch,
952 cct->_conf->osd_fast_info,
953 osd->logger,
954 this);
955 ceph_assert(ret == 0);
7c673cae 956 }
9f95a23c
TL
957 pglog.write_log_and_missing(
958 t, &km, coll, pgmeta_oid, pool.info.require_rollback());
959 if (!km.empty())
960 t.omap_setkeys(coll, pgmeta_oid, km);
961 if (!key_to_remove.empty())
962 t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
963}
7c673cae 964
9f95a23c
TL
965#pragma GCC diagnostic ignored "-Wpragmas"
966#pragma GCC diagnostic push
967#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
7c673cae 968
9f95a23c
TL
969bool PG::_has_removal_flag(ObjectStore *store,
970 spg_t pgid)
971{
972 coll_t coll(pgid);
973 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
7c673cae 974
9f95a23c
TL
975 // first try new way
976 set<string> keys;
977 keys.insert("_remove");
978 map<string,bufferlist> values;
979 auto ch = store->open_collection(coll);
980 ceph_assert(ch);
981 if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
982 values.size() == 1)
983 return true;
7c673cae 984
9f95a23c
TL
985 return false;
986}
c07f9fc5 987
9f95a23c
TL
988int PG::peek_map_epoch(ObjectStore *store,
989 spg_t pgid,
990 epoch_t *pepoch)
991{
992 coll_t coll(pgid);
993 ghobject_t legacy_infos_oid(OSD::make_infos_oid());
994 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
995 epoch_t cur_epoch = 0;
7c673cae 996
9f95a23c
TL
997 // validate collection name
998 ceph_assert(coll.is_pg());
7c673cae 999
9f95a23c
TL
1000 // try for v8
1001 set<string> keys;
1002 keys.insert(string(infover_key));
1003 keys.insert(string(epoch_key));
1004 map<string,bufferlist> values;
1005 auto ch = store->open_collection(coll);
1006 ceph_assert(ch);
1007 int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
1008 if (r == 0) {
1009 ceph_assert(values.size() == 2);
7c673cae 1010
9f95a23c
TL
1011 // sanity check version
1012 auto bp = values[string(infover_key)].cbegin();
1013 __u8 struct_v = 0;
1014 decode(struct_v, bp);
1015 ceph_assert(struct_v >= 8);
91327a77 1016
9f95a23c
TL
1017 // get epoch
1018 bp = values[string(epoch_key)].begin();
1019 decode(cur_epoch, bp);
1020 } else {
1021 // probably bug 10617; see OSD::load_pgs()
1022 return -1;
1023 }
7c673cae 1024
9f95a23c
TL
1025 *pepoch = cur_epoch;
1026 return 0;
1027}
7c673cae 1028
9f95a23c
TL
1029#pragma GCC diagnostic pop
1030#pragma GCC diagnostic warning "-Wpragmas"
7c673cae 1031
9f95a23c
TL
1032bool PG::check_log_for_corruption(ObjectStore *store)
1033{
1034 /// TODO: this method needs to work with the omap log
1035 return true;
1036}
7c673cae 1037
9f95a23c
TL
1038//! Get the name we're going to save our corrupt page log as
1039std::string PG::get_corrupt_pg_log_name() const
1040{
1041 const int MAX_BUF = 512;
1042 char buf[MAX_BUF];
1043 struct tm tm_buf;
1044 time_t my_time(time(NULL));
1045 const struct tm *t = localtime_r(&my_time, &tm_buf);
1046 int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
1047 if (ret == 0) {
1048 dout(0) << "strftime failed" << dendl;
1049 return "corrupt_log_unknown_time";
7c673cae 1050 }
9f95a23c
TL
1051 string out(buf);
1052 out += stringify(info.pgid);
1053 return out;
7c673cae
FG
1054}
1055
9f95a23c
TL
/**
 * Read this PG's persistent metadata from the pgmeta object's omap:
 * the info struct, past intervals, purged snaps, and (optionally) the
 * fast-info delta, which is applied on top of the base info.
 *
 * @param store        objectstore to read from
 * @param pgid         pg being loaded
 * @param coll         pg collection
 * @param info         [out] decoded pg_info_t
 * @param past_intervals [out] decoded PastIntervals
 * @param struct_v     [out] on-disk encoding version (must be >= 10)
 * @return 0 (failures assert)
 */
int PG::read_info(
  ObjectStore *store, spg_t pgid, const coll_t &coll,
  pg_info_t &info, PastIntervals &past_intervals,
  __u8 &struct_v)
{
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(info_key));
  keys.insert(string(biginfo_key));
  keys.insert(string(fastinfo_key));
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  ceph_assert(r == 0);
  // fastinfo_key is optional; the other three keys must be present
  ceph_assert(values.size() == 3 ||
	      values.size() == 4);

  auto p = values[string(infover_key)].cbegin();
  decode(struct_v, p);
  ceph_assert(struct_v >= 10);

  p = values[string(info_key)].begin();
  decode(info, p);

  // biginfo carries past_intervals followed by purged_snaps
  p = values[string(biginfo_key)].begin();
  decode(past_intervals, p);
  decode(info.purged_snaps, p);

  // fast-info, if present, overlays the most frequently updated fields
  p = values[string(fastinfo_key)].begin();
  if (!p.end()) {
    pg_fast_info_t fast;
    decode(fast, p);
    fast.try_apply_to(&info);
  }
  return 0;
}
1094
9f95a23c
TL
/**
 * Load this PG's full state from disk at OSD startup: info, past
 * intervals, and the pg log/missing set; then initialize the current
 * up/acting mapping and kick the peering state machine.
 */
void PG::read_state(ObjectStore *store)
{
  PastIntervals past_intervals_from_disk;
  pg_info_t info_from_disk;
  int r = read_info(
    store,
    pg_id,
    coll,
    info_from_disk,
    past_intervals_from_disk,
    info_struct_v);
  ceph_assert(r >= 0);

  // refuse to load encodings older than the oldest we can still upgrade
  if (info_struct_v < pg_compat_struct_v) {
    derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
	 << " an older version first." << dendl;
    ceph_abort_msg("PG too old to upgrade");
  }

  recovery_state.init_from_disk_state(
    std::move(info_from_disk),
    std::move(past_intervals_from_disk),
    [this, store] (PGLog &pglog) {
      ostringstream oss;
      pglog.read_log_and_missing(
	store,
	ch,
	pgmeta_oid,
	info,
	oss,
	cct->_conf->osd_ignore_stale_divergent_priors,
	cct->_conf->osd_debug_verify_missing_on_start);

      // anything the log reader complained about goes to the cluster log
      if (oss.tellp())
	osd->clog->error() << oss.str();
      return 0;
    });

  // rewrite on-disk state to the latest encoding if needed
  if (info_struct_v < pg_latest_struct_v) {
    upgrade(store);
  }

  // initialize current mapping
  {
    int primary, up_primary;
    vector<int> acting, up;
    get_osdmap()->pg_to_up_acting_osds(
      pg_id.pgid, &up, &up_primary, &acting, &primary);
    recovery_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
  }

  // init pool options
  store->set_collection_opts(ch, pool.info.opts);

  PeeringCtx rctx(ceph_release_t::unknown);
  handle_initialize(rctx);
  // note: we don't activate here because we know the OSD will advance maps
  // during boot.
  write_if_dirty(rctx.transaction);
  store->queue_transaction(ch, std::move(rctx.transaction));
}
1161
9f95a23c
TL
/**
 * Apply the SnapMapper updates implied by a batch of pg log entries,
 * recording the object->snap mapping changes in transaction @t.
 */
void PG::update_snap_map(
  const vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction &t)
{
  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
    OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
    // only objects below CEPH_MAXSNAP (i.e. clones) are tracked
    if (i->soid.snap < CEPH_MAXSNAP) {
      if (i->is_delete()) {
	int r = snap_mapper.remove_oid(
	  i->soid,
	  &_t);
	if (r)
	  derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
	// On removal tolerate missing key corruption
	ceph_assert(r == 0 || r == -ENOENT);
      } else if (i->is_update()) {
	ceph_assert(i->snaps.length() > 0);
	// snaps for this entry were encoded into the log entry's bufferlist
	vector<snapid_t> snaps;
	bufferlist snapbl = i->snaps;
	auto p = snapbl.cbegin();
	try {
	  decode(snaps, p);
	} catch (...) {
	  // tolerate a corrupt snaps blob: log it and carry on with none
	  derr << __func__ << " decode snaps failure on " << *i << dendl;
	  snaps.clear();
	}
	set<snapid_t> _snaps(snaps.begin(), snaps.end());

	if (i->is_clone() || i->is_promote()) {
	  // new clone: establish its mapping
	  snap_mapper.add_oid(
	    i->soid,
	    _snaps,
	    &_t);
	} else if (i->is_modify()) {
	  // modified clone: replace its mapping
	  int r = snap_mapper.update_snaps(
	    i->soid,
	    _snaps,
	    0,
	    &_t);
	  ceph_assert(r == 0);
	} else {
	  ceph_assert(i->is_clean());
	}
      }
    }
  }
}
1209
9f95a23c
TL
1210/**
1211 * filter trimming|trimmed snaps out of snapcontext
1212 */
1213void PG::filter_snapc(vector<snapid_t> &snaps)
7c673cae 1214{
9f95a23c
TL
1215 // nothing needs to trim, we can return immediately
1216 if (snap_trimq.empty() && info.purged_snaps.empty())
1217 return;
1218
1219 bool filtering = false;
1220 vector<snapid_t> newsnaps;
1221 for (vector<snapid_t>::iterator p = snaps.begin();
1222 p != snaps.end();
1223 ++p) {
1224 if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
1225 if (!filtering) {
1226 // start building a new vector with what we've seen so far
1227 dout(10) << "filter_snapc filtering " << snaps << dendl;
1228 newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
1229 filtering = true;
1230 }
1231 dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
1232 } else {
1233 if (filtering)
1234 newsnaps.push_back(*p); // continue building new vector
d2e6a577 1235 }
c07f9fc5 1236 }
9f95a23c
TL
1237 if (filtering) {
1238 snaps.swap(newsnaps);
1239 dout(10) << "filter_snapc result " << snaps << dendl;
a8e16298 1240 }
a8e16298
TL
1241}
1242
9f95a23c 1243void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
a8e16298 1244{
f67539c2 1245 for (auto it = m.begin(); it != m.end(); ++it)
9f95a23c
TL
1246 requeue_ops(it->second);
1247 m.clear();
c07f9fc5 1248}
7c673cae 1249
// Requeue a single op. If other ops from the same source are already
// waiting for a newer osdmap, this op must line up behind them to preserve
// per-source ordering; otherwise it goes straight to the front of the
// op queue.
void PG::requeue_op(OpRequestRef op)
{
  auto p = waiting_for_map.find(op->get_source());
  if (p != waiting_for_map.end()) {
    dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
	     << dendl;
    p->second.push_front(op);
  } else {
    dout(20) << __func__ << " " << op << dendl;
    osd->enqueue_front(
      OpSchedulerItem(
	unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
	op->get_req()->get_cost(),
	op->get_req()->get_priority(),
	op->get_req()->get_recv_stamp(),
	op->get_req()->get_source().num(),
	get_osdmap_epoch()));
  }
}
7c673cae 1269
9f95a23c 1270void PG::requeue_ops(list<OpRequestRef> &ls)
c07f9fc5 1271{
9f95a23c
TL
1272 for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
1273 i != ls.rend();
1274 ++i) {
1275 requeue_op(*i);
c07f9fc5 1276 }
9f95a23c 1277 ls.clear();
7c673cae
FG
1278}
1279
// Requeue ops that were waiting for a newer osdmap, now that our map may
// have advanced. Sources whose front op still requires a future epoch keep
// waiting; the rest are pushed back onto the op queue.
void PG::requeue_map_waiters()
{
  epoch_t epoch = get_osdmap_epoch();
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end()) {
    if (epoch < p->second.front()->min_epoch) {
      dout(20) << __func__ << " " << p->first << " front op "
	       << p->second.front() << " must still wait, doing nothing"
	       << dendl;
      ++p;
    } else {
      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
      // enqueue_front in reverse order to preserve the original ordering
      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
	auto req = *q;
	osd->enqueue_front(OpSchedulerItem(
	  unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
	  req->get_req()->get_cost(),
	  req->get_req()->get_priority(),
	  req->get_req()->get_recv_stamp(),
	  req->get_req()->get_source().num(),
	  epoch));
      }
      p = waiting_for_map.erase(p);
    }
  }
}
7c673cae 1306
f67539c2
TL
// Whether an operator-requested ("must") scrub is pending for this PG.
bool PG::get_must_scrub() const
{
  dout(20) << __func__ << " must_scrub? " << (m_planned_scrub.must_scrub ? "true" : "false") << dendl;
  return m_planned_scrub.must_scrub;
}

// Delegate requeue-priority selection to the scrubber.
unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
{
  return m_scrubber->scrub_requeue_priority(with_priority);
}

// Same, but starting from a caller-suggested priority.
unsigned int PG::scrub_requeue_priority(Scrub::scrub_prio_t with_priority, unsigned int suggested_priority) const
{
  return m_scrubber->scrub_requeue_priority(with_priority, suggested_priority);
}
7c673cae 1322
9f95a23c
TL
1323// ==========================================================================================
1324// SCRUB
7c673cae 1325
/*
 * implementation note:
 * PG::sched_scrub() is called only once per a specific scrub session.
 * That call commits us to the whatever choices are made (deep/shallow, etc').
 * Unless failing to start scrubbing, the 'planned scrub' flag-set is 'frozen' into
 * PgScrubber's m_flags, then cleared.
 */
bool PG::sched_scrub()
{
  dout(15) << __func__ << " pg(" << info.pgid
	   << (is_active() ? ") <active>" : ") <not-active>")
	   << (is_clean() ? " <clean>" : " <not-clean>") << dendl;
  ceph_assert(ceph_mutex_is_locked(_lock));

  // a scrub session is already in flight for this PG
  if (m_scrubber && m_scrubber->is_scrub_active()) {
    return false;
  }

  // only the primary of an active+clean PG may start a scrub
  if (!is_primary() || !is_active() || !is_clean()) {
    return false;
  }

  if (scrub_queued) {
    // only applicable to the very first time a scrub event is queued
    // (until handled and posted to the scrub FSM)
    dout(10) << __func__ << ": already queued" << dendl;
    return false;
  }

  // analyse the combination of the requested scrub flags, the osd/pool configuration
  // and the PG status to determine whether we should scrub now, and what type of scrub
  // should that be.
  auto updated_flags = verify_scrub_mode();
  if (!updated_flags) {
    // the stars do not align for starting a scrub for this PG at this time
    // (due to configuration or priority issues)
    // The reason was already reported by the callee.
    dout(10) << __func__ << ": failed to initiate a scrub" << dendl;
    return false;
  }

  // try to reserve the local OSD resources. If failing: no harm. We will
  // be retried by the OSD later on.
  if (!m_scrubber->reserve_local()) {
    dout(10) << __func__ << ": failed to reserve locally" << dendl;
    return false;
  }

  // can commit to the updated flags now, as nothing will stop the scrub
  m_planned_scrub = *updated_flags;

  // An interrupted recovery repair could leave this set.
  state_clear(PG_STATE_REPAIR);

  // Pass control to the scrubber. It is the scrubber that handles the replicas'
  // resources reservations.
  m_scrubber->set_op_parameters(m_planned_scrub);

  dout(10) << __func__ << ": queueing" << dendl;

  scrub_queued = true;
  osd->queue_for_scrub(this, Scrub::scrub_prio_t::low_priority);
  return true;
}
1390
1391double PG::next_deepscrub_interval() const
1392{
1393 double deep_scrub_interval =
1394 pool.info.opts.value_or(pool_opts_t::DEEP_SCRUB_INTERVAL, 0.0);
1395 if (deep_scrub_interval <= 0.0)
1396 deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
1397 return info.history.last_deep_scrub_stamp + deep_scrub_interval;
1398}
1399
// Decide whether the upcoming periodic scrub should be a deep scrub:
// either a repair is needed, the deep interval has elapsed, prior deep
// errors force an upgrade, or a random coin flip says so.
bool PG::is_time_for_deep(bool allow_deep_scrub,
			  bool allow_scrub,
			  bool has_deep_errors,
			  const requested_scrub_t& planned) const
{
  dout(10) << __func__ << ": need_auto?" << planned.need_auto << " allow_deep_scrub? " << allow_deep_scrub << dendl;

  if (!allow_deep_scrub)
    return false;

  if (planned.need_auto) {
    dout(10) << __func__ << ": need repair after scrub errors" << dendl;
    return true;
  }

  // the configured deep-scrub interval has elapsed
  if (ceph_clock_now() >= next_deepscrub_interval())
    return true;

  if (has_deep_errors) {
    osd->clog->info() << "osd." << osd->whoami << " pg " << info.pgid
		      << " Deep scrub errors, upgrading scrub to deep-scrub";
    return true;
  }

  // we only flip coins if 'allow_scrub' is asserted. Otherwise - as this function is
  // called often, we will probably be deep-scrubbing most of the time.
  if (allow_scrub) {
    bool deep_coin_flip =
      (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;

    dout(15) << __func__ << ": time_for_deep=" << planned.time_for_deep
	     << " deep_coin_flip=" << deep_coin_flip << dendl;

    if (deep_coin_flip)
      return true;
  }

  return false;
}
1439
// Determine whether a *periodic* (not operator-requested) scrub may start,
// updating @planned with the deep/auto-repair decisions. Returns false if
// no scrub should be scheduled now.
bool PG::verify_periodic_scrub_mode(bool allow_deep_scrub,
				    bool try_to_auto_repair,
				    bool allow_regular_scrub,
				    bool has_deep_errors,
				    requested_scrub_t& planned) const

{
  // operator-forced scrubs never reach this path
  ceph_assert(!planned.must_deep_scrub && !planned.must_repair);

  if (!allow_deep_scrub && has_deep_errors) {
    osd->clog->error()
      << "osd." << osd->whoami << " pg " << info.pgid
      << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
    return false;
  }

  if (allow_deep_scrub) {
    // Initial entry and scheduled scrubs without nodeep_scrub set get here

    planned.time_for_deep =
      is_time_for_deep(allow_deep_scrub, allow_regular_scrub, has_deep_errors, planned);

    if (try_to_auto_repair) {
      if (planned.time_for_deep) {
	dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
	planned.auto_repair = true;
      } else if (allow_regular_scrub) {
	dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found"
		 << dendl;
	planned.deep_scrub_on_error = true;
      }
    }
  }

  dout(20) << __func__ << " updated flags: " << planned
	   << " allow_regular_scrub: " << allow_regular_scrub << dendl;

  // NOSCRUB so skip regular scrubs
  if (!allow_regular_scrub && !planned.time_for_deep) {
    return false;
  }

  return true;
}
1484
// Examine the requested scrub flags, the osd/pool configuration and the PG
// state, and return the flag-set to use for the upcoming scrub -- or
// std::nullopt if a scrub cannot start now.
std::optional<requested_scrub_t> PG::verify_scrub_mode() const
{
  dout(10) << __func__ << " processing pg " << info.pgid << dendl;

  bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
			    pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
  bool allow_regular_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
			       pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
  bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
  bool try_to_auto_repair =
    (cct->_conf->osd_scrub_auto_repair && get_pgbackend()->auto_repair_supported());

  // work on a copy; commit only happens in sched_scrub() if all checks pass
  auto upd_flags = m_planned_scrub;

  upd_flags.time_for_deep = false;
  // Clear these in case user issues the scrub/repair command during
  // the scheduling of the scrub/repair (e.g. request reservation)
  upd_flags.deep_scrub_on_error = false;
  upd_flags.auto_repair = false;

  if (upd_flags.must_scrub && !upd_flags.must_deep_scrub && has_deep_errors) {
    osd->clog->error() << "osd." << osd->whoami << " pg " << info.pgid
		       << " Regular scrub request, deep-scrub details will be lost";
  }

  if (!upd_flags.must_scrub) {
    // All periodic scrub handling goes here because must_scrub is
    // always set for must_deep_scrub and must_repair.

    bool can_start_periodic =
      verify_periodic_scrub_mode(allow_deep_scrub, try_to_auto_repair,
				 allow_regular_scrub, has_deep_errors, upd_flags);
    if (!can_start_periodic) {
      return std::nullopt;
    }
  }

  // scrubbing while recovering?

  bool prevented_by_recovery =
    osd->is_recovery_active() && !cct->_conf->osd_scrub_during_recovery &&
    (!cct->_conf->osd_repair_during_recovery || !upd_flags.must_repair);

  if (prevented_by_recovery) {
    dout(20) << __func__ << ": scrubbing prevented during recovery" << dendl;
    return std::nullopt;
  }

  upd_flags.need_auto = false;
  return upd_flags;
}
1536
// Register this PG's next scheduled scrub with the OSD's scrub queue.
void PG::reg_next_scrub()
{
  // NOTE(review): unlike on_info_history_change(), there is no m_scrubber
  // null-check here -- presumably callers guarantee the scrubber exists;
  // confirm.
  m_scrubber->reg_next_scrub(m_planned_scrub);
}

// Scrub scheduling is derived from history timestamps, so re-register
// whenever the info history changes.
void PG::on_info_history_change()
{
  if (m_scrubber) {
    m_scrubber->unreg_next_scrub();
    m_scrubber->reg_next_scrub(m_planned_scrub);
  }
}

// Operator-requested scrub/repair: the scrubber records the request into
// m_planned_scrub.
void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
  m_scrubber->scrub_requested(scrub_level, scrub_type, m_planned_scrub);
}
7c673cae 1554
9f95a23c
TL
// Thin pass-throughs to OSDService merge / pg_temp bookkeeping.
void PG::clear_ready_to_merge() {
  osd->clear_ready_to_merge(this);
}

void PG::queue_want_pg_temp(const vector<int> &wanted) {
  osd->queue_want_pg_temp(get_pgid().pgid, wanted);
}

void PG::clear_want_pg_temp() {
  osd->remove_want_pg_temp(get_pgid().pgid);
}
7c673cae 1566
9f95a23c
TL
void PG::on_role_change() {
  // ops queued while peered may no longer be valid under the new role
  requeue_ops(waiting_for_peered);
  plpg_on_role_change();
}

void PG::on_new_interval() {
  dout(20) << __func__ << " scrub_queued was " << scrub_queued << " flags: " << m_planned_scrub << dendl;
  scrub_queued = false;             // any queued scrub event is now stale
  projected_last_update = eversion_t();
  cancel_recovery();
}
7c673cae 1578
9f95a23c
TL
// Oldest full osdmap epoch still stored on this OSD.
epoch_t PG::oldest_stored_osdmap() {
  return osd->get_superblock().oldest_map;
}

// Cluster-log stream accessors at the three severities used by the
// recovery-state machinery.
OstreamTemp PG::get_clog_info() {
  return osd->clog->info();
}

OstreamTemp PG::get_clog_debug() {
  return osd->clog->debug();
}

OstreamTemp PG::get_clog_error() {
  return osd->clog->error();
}
7c673cae 1594
9f95a23c
TL
// Queue a peering event for this PG after 'delay' seconds, via the shared
// recovery request timer (hence the lock).
void PG::schedule_event_after(
  PGPeeringEventRef event,
  float delay) {
  std::lock_guard lock(osd->recovery_request_lock);
  osd->recovery_request_timer.add_event_after(
    delay,
    new QueuePeeringEvt(
      this,
      std::move(event)));
}
7c673cae 1605
9f95a23c
TL
// Ask the local reserver for a background-io slot; on_grant/on_preempt (if
// provided) are queued back to this PG as peering events when the
// reservation is granted or preempted.
void PG::request_local_background_io_reservation(
  unsigned priority,
  PGPeeringEventURef on_grant,
  PGPeeringEventURef on_preempt) {
  osd->local_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

// Change the priority of an existing local reservation request.
void PG::update_local_background_io_priority(
  unsigned priority) {
  osd->local_reserver.update_priority(
    pg_id,
    priority);
}

void PG::cancel_local_background_io_reservation() {
  osd->local_reserver.cancel_reservation(
    pg_id);
}
7c673cae 1630
9f95a23c
TL
// Like the local variant, but against the remote reserver (slots we grant
// to recovery work initiated by other OSDs' primaries).
void PG::request_remote_recovery_reservation(
  unsigned priority,
  PGPeeringEventURef on_grant,
  PGPeeringEventURef on_preempt) {
  osd->remote_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, std::move(on_grant)) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, std::move(on_preempt)) : nullptr);
}

void PG::cancel_remote_recovery_reservation() {
  osd->remote_reserver.cancel_reservation(
    pg_id);
}
1648
9f95a23c
TL
// Queue a peering event back to this PG when transaction @t commits.
void PG::schedule_event_on_commit(
  ObjectStore::Transaction &t,
  PGPeeringEventRef on_commit)
{
  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
}

// Entering Active: adopt the snaps still to be trimmed, release client
// backoffs, and reset the projected last_update.
void PG::on_activate(interval_set<snapid_t> snaps)
{
  ceph_assert(!m_scrubber->are_callbacks_pending());
  ceph_assert(callbacks_for_degraded_object.empty());
  snap_trimq = snaps;
  release_pg_backoffs();
  projected_last_update = info.last_update;
}
1664
// Leaving the Active state: any in-flight backfill reservation attempt is
// void, and the tiering agent must stop.
void PG::on_active_exit()
{
  backfill_reserving = false;
  agent_stop();
}
1670
// Process a new osdmap while Active: fold the map's newly-removed snaps
// into snap_trimq, and reconcile our purged_snaps with the map's
// newly-purged set.
void PG::on_active_advmap(const OSDMapRef &osdmap)
{
  const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
  auto i = new_removed_snaps.find(get_pgid().pool());
  if (i != new_removed_snaps.end()) {
    bool bad = false;
    for (auto j : i->second) {
      if (snap_trimq.intersects(j.first, j.second)) {
	// overlap means our cached trim queue disagrees with the map
	decltype(snap_trimq) added, overlap;
	added.insert(j.first, j.second);
	overlap.intersection_of(snap_trimq, added);
	derr << __func__ << " removed_snaps already contains "
	     << overlap << dendl;
	bad = true;
	snap_trimq.union_of(added);
      } else {
	snap_trimq.insert(j.first, j.second);
      }
    }
    dout(10) << __func__ << " new removed_snaps " << i->second
	     << ", snap_trimq now " << snap_trimq << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }

  const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
  auto j = new_purged_snaps.find(get_pgid().pgid.pool());
  if (j != new_purged_snaps.end()) {
    bool bad = false;
    for (auto k : j->second) {
      if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
	interval_set<snapid_t> rm, overlap;
	rm.insert(k.first, k.second);
	overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
	derr << __func__ << " purged_snaps does not contain "
	     << rm << ", only " << overlap << dendl;
	recovery_state.adjust_purged_snaps(
	  [&overlap](auto &purged_snaps) {
	    purged_snaps.subtract(overlap);
	  });
	// This can currently happen in the normal (if unlikely) course of
	// events. Because adding snaps to purged_snaps does not increase
	// the pg version or add a pg log entry, we don't reliably propagate
	// purged_snaps additions to other OSDs.
	// One example:
	//  - purge S
	//  - primary and replicas update purged_snaps
	//  - no object updates
	//  - pg mapping changes, new primary on different node
	//  - new primary pg version == eversion_t(), so info is not
	//    propagated.
	//bad = true;
      } else {
	recovery_state.adjust_purged_snaps(
	  [&k](auto &purged_snaps) {
	    purged_snaps.erase(k.first, k.second);
	  });
      }
    }
    dout(10) << __func__ << " new purged_snaps " << j->second
	     << ", now " << recovery_state.get_info().purged_snaps << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }
}
1734
// Queue a snap for (re)trimming on the primary. Also recorded in
// snap_trimq_repeat -- NOTE(review): presumably so the snap is trimmed
// again even if it was already partially handled; confirm.
void PG::queue_snap_retrim(snapid_t snap)
{
  if (!is_active() ||
      !is_primary()) {
    dout(10) << __func__ << " snap " << snap << " - not active and primary"
	     << dendl;
    return;
  }
  if (!snap_trimq.contains(snap)) {
    snap_trimq.insert(snap);
    snap_trimq_repeat.insert(snap);
    dout(20) << __func__ << " snap " << snap
	     << ", trimq now " << snap_trimq
	     << ", repeat " << snap_trimq_repeat << dendl;
    kick_snap_trim();
  } else {
    dout(20) << __func__ << " snap " << snap
	     << " already in trimq " << snap_trimq << dendl;
  }
}
1755
// Called on map activation while in the Active state: optionally audit the
// log, kick snap trimming, and queue recovery/backfill if permitted by the
// osdmap flags.
void PG::on_active_actmap()
{
  if (cct->_conf->osd_check_for_log_corruption)
    check_log_for_corruption(osd->store);


  if (recovery_state.is_active()) {
    dout(10) << "Active: kicking snap trim" << dendl;
    kick_snap_trim();
  }

  // NOREBALANCE only blocks recovery of non-degraded PGs; NOBACKFILL
  // blocks it outright
  if (recovery_state.is_peered() &&
      !recovery_state.is_clean() &&
      !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
      (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
       recovery_state.is_degraded())) {
    queue_recovery();
  }
}
1775
// Backfill reservation granted: clear the pending flag and start work.
void PG::on_backfill_reserved()
{
  backfill_reserving = false;
  queue_recovery();
}

// Backfill cancelled: drop waiting peers and close out the recovery op
// that was opened for the backfill (keyed on the max hobject).
void PG::on_backfill_canceled()
{
  if (!waiting_on_backfill.empty()) {
    waiting_on_backfill.clear();
    finish_recovery_op(hobject_t::get_max());
  }
}

void PG::on_recovery_reserved()
{
  queue_recovery();
}
1794
// Thin pass-throughs to OSDService merge bookkeeping.
void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
{
  osd->set_not_ready_to_merge_target(pgid, src);
}

void PG::set_not_ready_to_merge_source(pg_t pgid)
{
  osd->set_not_ready_to_merge_source(pgid);
}

void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
{
  osd->set_ready_to_merge_target(this, lu, les, lec);
}

void PG::set_ready_to_merge_source(eversion_t lu)
{
  osd->set_ready_to_merge_source(this, lu);
}

void PG::send_pg_created(pg_t pgid)
{
  osd->send_pg_created(pgid);
}

// monotonic "now", used by the read-lease machinery
ceph::signedspan PG::get_mnow()
{
  return osd->get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return osd->get_hb_stamps(peer);
}
1829
9f95a23c
TL
// Arm a timer that requeues a lease-renewal event for this PG after
// 'delay'. The lambda captures copies (o, spgid) rather than 'this' --
// NOTE(review): presumably so the callback does not dangle if the PG is
// destroyed before it fires; confirm.
void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
{
  auto spgid = info.pgid;
  auto o = osd;
  osd->mono_timer.add_event(
    delay,
    [o, lpr, spgid]() {
      o->queue_renew_lease(lpr, spgid);
    });
}

void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
{
  osd->queue_check_readable(info.pgid, lpr, delay);
}
1845
// Delegate to PGLog: rebuild the missing set so that delete entries are
// represented as well.
void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
{
  pglog.rebuild_missing_set_with_deletes(
    osd->store,
    ch,
    recovery_state.get_info());
}
1853
// Replica-side activation commit: requeue ops that were waiting for the PG
// to become peered, or park them on waiting_for_flush while log flushes
// are still in progress.
void PG::on_activate_committed()
{
  if (!is_primary()) {
    // waiters
    if (recovery_state.needs_flush() == 0) {
      requeue_ops(waiting_for_peered);
    } else if (!waiting_for_peered.empty()) {
      dout(10) << __func__ << " flushes in progress, moving "
	       << waiting_for_peered.size() << " items to waiting_for_flush"
	       << dendl;
      ceph_assert(waiting_for_flush.empty());
      waiting_for_flush.swap(waiting_for_peered);
    }
  }
}
91327a77 1869
9f95a23c
TL
1870// Compute pending backfill data
1871static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
11fdf7f2 1872{
9f95a23c
TL
1873 lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
1874 << (local_bytes >> 10) << "KiB"
1875 << " primary usage " << (bf_bytes >> 10)
1876 << "KiB" << dendl;
11fdf7f2 1877
9f95a23c
TL
1878 return std::max((int64_t)0, bf_bytes - local_bytes);
1879}
7c673cae 1880
7c673cae 1881
9f95a23c
TL
// We can zero the value of primary num_bytes as just an atomic.
// However, setting above zero reserves space for backfill and requires
// the OSDService::stat_lock which protects all OSD usage
//
// Returns false if the reservation is rejected (injected failure or
// backfill-full); on success records the byte counts for later use by
// the heartbeat's usage accounting.
bool PG::try_reserve_recovery_space(
  int64_t primary_bytes, int64_t local_bytes) {
  // Use tentative_bacfill_full() to make sure enough
  // space is available to handle target bytes from primary.

  // TODO: If we passed num_objects from primary we could account for
  // an estimate of the metadata overhead.

  // TODO: If we had compressed_allocated and compressed_original from primary
  // we could compute compression ratio and adjust accordingly.

  // XXX: There is no way to get omap overhead and this would only apply
  // to whatever possibly different partition that is storing the database.

  // update_osd_stat() from heartbeat will do this on a new
  // statfs using ps->primary_bytes.
  uint64_t pending_adjustment = 0;
  if (primary_bytes) {
    // For erasure coded pool overestimate by a full stripe per object
    // because we don't know how each objected rounded to the nearest stripe
    if (pool.info.is_erasure()) {
      primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
	info.stats.stats.sum.num_objects;
      local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
      local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
	info.stats.stats.sum.num_objects;
    }
    pending_adjustment = pending_backfill(
      cct,
      primary_bytes,
      local_bytes);
    dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
	     << "KiB"
	     << " local " << (local_bytes >> 10) << "KiB"
	     << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
	     << dendl;
  }

  // This lock protects not only the stats OSDService but also setting the
  // pg primary_bytes. That's why we don't immediately unlock
  std::lock_guard l{osd->stat_lock};
  osd_stat_t cur_stat = osd->osd_stat;
  if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
      (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
    dout(10) << "backfill reservation rejected: failure injection"
	     << dendl;
    return false;
  } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
	     osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
    dout(10) << "backfill reservation rejected: backfill full"
	     << dendl;
    return false;
  } else {
    // Don't reserve space if skipped reservation check, this is used
    // to test the other backfill full check AND in case a corruption
    // of num_bytes requires ignoring that value and trying the
    // backfill anyway.
    if (primary_bytes &&
	!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
      primary_num_bytes.store(primary_bytes);
      local_num_bytes.store(local_bytes);
    } else {
      unreserve_recovery_space();
    }
    return true;
  }
}
1953
9f95a23c
TL
1954void PG::unreserve_recovery_space() {
1955 primary_num_bytes.store(0);
1956 local_num_bytes.store(0);
7c673cae
FG
1957}
1958
9f95a23c 1959void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
7c673cae 1960{
9f95a23c
TL
1961 ObjectStore::Transaction t;
1962 eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
1963 for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
1964 i != rollback_obs.end();
1965 ++i) {
1966 if (i->generation < trimmed_to.version) {
1967 dout(10) << __func__ << "osd." << osd->whoami
1968 << " pg " << info.pgid
1969 << " found obsolete rollback obj "
1970 << *i << " generation < trimmed_to "
1971 << trimmed_to
1972 << "...repaired" << dendl;
1973 t.remove(coll, *i);
1974 }
1975 }
1976 if (!t.empty()) {
1977 derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
1978 << dendl;
1979 osd->store->queue_transaction(ch, std::move(t), NULL);
1980 }
7c673cae
FG
1981}
1982
7c673cae 1983
// Walk a scrub map and repair any object whose stored object_info_t
// records a different oid than the object it is attached to: rewrite the
// OI attribute (both in the scrub map and on disk) with the correct oid.
void PG::_repair_oinfo_oid(ScrubMap &smap)
{
  for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
       i != smap.objects.rend();
       ++i) {
    const hobject_t &hoid = i->first;
    ScrubMap::object &o = i->second;

    bufferlist bl;
    if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
      continue;  // no object-info attribute: nothing to check
    }
    bl.push_back(o.attrs[OI_ATTR]);
    object_info_t oi;
    try {
      oi.decode(bl);
    } catch(...) {
      continue;  // undecodable OI is handled elsewhere; skip here
    }
    if (oi.soid != hoid) {
      ObjectStore::Transaction t;
      OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
      osd->clog->error() << "osd." << osd->whoami
			 << " found object info error on pg "
			 << info.pgid
			 << " oid " << hoid << " oid in object info: "
			 << oi.soid
			 << "...repaired";
      // Fix object info
      oi.soid = hoid;
      bl.clear();
      encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));

      // update the in-memory scrub map copy as well
      bufferptr bp(bl.c_str(), bl.length());
      o.attrs[OI_ATTR] = bp;

      t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
      int r = osd->store->queue_transaction(ch, std::move(t));
      if (r != 0) {
	derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
	     << dendl;
      }
    }
  }
}
7c673cae 2029
9f95a23c
TL
2030void PG::repair_object(
2031 const hobject_t &soid,
2032 const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
2033 const set<pg_shard_t> &bad_peers)
2034{
2035 set<pg_shard_t> ok_shards;
2036 for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
d2e6a577 2037
9f95a23c
TL
2038 dout(10) << "repair_object " << soid
2039 << " bad_peers osd.{" << bad_peers << "},"
2040 << " ok_peers osd.{" << ok_shards << "}" << dendl;
11fdf7f2 2041
9f95a23c
TL
2042 const ScrubMap::object &po = ok_peers.back().first;
2043 eversion_t v;
2044 object_info_t oi;
2045 try {
2046 bufferlist bv;
2047 if (po.attrs.count(OI_ATTR)) {
2048 bv.push_back(po.attrs.find(OI_ATTR)->second);
2049 }
2050 auto bliter = bv.cbegin();
2051 decode(oi, bliter);
2052 } catch (...) {
2053 dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
2054 << soid << dendl;
2055 ceph_abort();
11fdf7f2
TL
2056 }
2057
9f95a23c
TL
2058 if (bad_peers.count(get_primary())) {
2059 // We should only be scrubbing if the PG is clean.
2060 ceph_assert(waiting_for_unreadable_object.empty());
2061 dout(10) << __func__ << ": primary = " << get_primary() << dendl;
11fdf7f2
TL
2062 }
2063
9f95a23c
TL
2064 /* No need to pass ok_peers, they must not be missing the object, so
2065 * force_object_missing will add them to missing_loc anyway */
2066 recovery_state.force_object_missing(bad_peers, soid, oi.version);
7c673cae
FG
2067}
2068
f67539c2 2069void PG::forward_scrub_event(ScrubAPI fn, epoch_t epoch_queued)
7c673cae 2070{
f67539c2
TL
2071 dout(20) << __func__ << " queued at: " << epoch_queued << dendl;
2072 if (is_active() && m_scrubber) {
2073 ((*m_scrubber).*fn)(epoch_queued);
7c673cae 2074 } else {
f67539c2
TL
2075 // pg might be in the process of being deleted
2076 dout(5) << __func__ << " refusing to forward. " << (is_clean() ? "(clean) " : "(not clean) ") <<
2077 (is_active() ? "(active) " : "(not active) ") << dendl;
7c673cae 2078 }
f67539c2 2079}
7c673cae 2080
f67539c2
TL
2081void PG::replica_scrub(OpRequestRef op, ThreadPool::TPHandle& handle)
2082{
2083 dout(10) << __func__ << " (op)" << dendl;
2084 if (m_scrubber)
2085 m_scrubber->replica_scrub_op(op);
7c673cae
FG
2086}
2087
f67539c2
TL
2088void PG::scrub(epoch_t epoch_queued, ThreadPool::TPHandle& handle)
2089{
2090 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2091 // a new scrub
9f95a23c 2092 scrub_queued = false;
f67539c2 2093 forward_scrub_event(&ScrubPgIF::initiate_regular_scrub, epoch_queued);
7c673cae
FG
2094}
2095
f67539c2
TL
2096// note: no need to secure OSD resources for a recovery scrub
2097void PG::recovery_scrub(epoch_t epoch_queued,
2098 [[maybe_unused]] ThreadPool::TPHandle& handle)
f6b5b4d7 2099{
f67539c2
TL
2100 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2101 // a new scrub
2102 scrub_queued = false;
2103 forward_scrub_event(&ScrubPgIF::initiate_scrub_after_repair, epoch_queued);
f6b5b4d7
TL
2104}
2105
f67539c2
TL
2106void PG::replica_scrub(epoch_t epoch_queued,
2107 [[maybe_unused]] ThreadPool::TPHandle& handle)
2108{
2109 dout(10) << __func__ << " queued at: " << epoch_queued
2110 << (is_primary() ? " (primary)" : " (replica)") << dendl;
2111 scrub_queued = false;
2112 forward_scrub_event(&ScrubPgIF::send_start_replica, epoch_queued);
7c673cae
FG
2113}
2114
f67539c2
TL
2115void PG::scrub_send_scrub_resched(epoch_t epoch_queued,
2116 [[maybe_unused]] ThreadPool::TPHandle& handle)
7c673cae 2117{
f67539c2
TL
2118 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2119 scrub_queued = false;
2120 forward_scrub_event(&ScrubPgIF::send_scrub_resched, epoch_queued);
7c673cae
FG
2121}
2122
f67539c2
TL
2123void PG::scrub_send_resources_granted(epoch_t epoch_queued,
2124 [[maybe_unused]] ThreadPool::TPHandle& handle)
7c673cae 2125{
f67539c2
TL
2126 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2127 forward_scrub_event(&ScrubPgIF::send_remotes_reserved, epoch_queued);
7c673cae
FG
2128}
2129
f67539c2
TL
2130void PG::scrub_send_resources_denied(epoch_t epoch_queued,
2131 [[maybe_unused]] ThreadPool::TPHandle& handle)
7c673cae 2132{
f67539c2
TL
2133 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2134 forward_scrub_event(&ScrubPgIF::send_reservation_failure, epoch_queued);
7c673cae
FG
2135}
2136
f67539c2
TL
2137void PG::replica_scrub_resched(epoch_t epoch_queued,
2138 [[maybe_unused]] ThreadPool::TPHandle& handle)
7c673cae 2139{
f67539c2
TL
2140 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2141 scrub_queued = false;
2142 forward_scrub_event(&ScrubPgIF::send_sched_replica, epoch_queued);
11fdf7f2
TL
2143}
2144
f67539c2
TL
2145void PG::scrub_send_pushes_update(epoch_t epoch_queued,
2146 [[maybe_unused]] ThreadPool::TPHandle& handle)
2147{
2148 dout(10) << __func__ << " queued at: " << epoch_queued << dendl;
2149 forward_scrub_event(&ScrubPgIF::active_pushes_notification, epoch_queued);
7c673cae
FG
2150}
2151
f67539c2
TL
2152void PG::scrub_send_replica_pushes(epoch_t epoch_queued,
2153 [[maybe_unused]] ThreadPool::TPHandle& handle)
2154{
2155 dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
2156 forward_scrub_event(&ScrubPgIF::send_replica_pushes_upd, epoch_queued);
7c673cae
FG
2157}
2158
f67539c2
TL
2159void PG::scrub_send_applied_update(epoch_t epoch_queued,
2160 [[maybe_unused]] ThreadPool::TPHandle& handle)
7c673cae 2161{
f67539c2
TL
2162 dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
2163 forward_scrub_event(&ScrubPgIF::update_applied_notification, epoch_queued);
2164}
7c673cae 2165
f67539c2
TL
2166void PG::scrub_send_unblocking(epoch_t epoch_queued,
2167 [[maybe_unused]] ThreadPool::TPHandle& handle)
2168{
2169 dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
2170 forward_scrub_event(&ScrubPgIF::send_scrub_unblock, epoch_queued);
2171}
7c673cae 2172
f67539c2
TL
2173void PG::scrub_send_digest_update(epoch_t epoch_queued,
2174 [[maybe_unused]] ThreadPool::TPHandle& handle)
2175{
2176 dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
2177 forward_scrub_event(&ScrubPgIF::digest_update_notification, epoch_queued);
2178}
7c673cae 2179
f67539c2
TL
2180void PG::scrub_send_replmaps_ready(epoch_t epoch_queued,
2181 [[maybe_unused]] ThreadPool::TPHandle& handle)
2182{
2183 dout(15) << __func__ << " queued at: " << epoch_queued << dendl;
2184 forward_scrub_event(&ScrubPgIF::send_replica_maps_ready, epoch_queued);
2185}
7c673cae 2186
f67539c2
TL
2187bool PG::ops_blocked_by_scrub() const
2188{
2189 return !waiting_for_scrub.empty();
2190}
11fdf7f2 2191
f67539c2
TL
2192Scrub::scrub_prio_t PG::is_scrub_blocking_ops() const
2193{
2194 return waiting_for_scrub.empty() ? Scrub::scrub_prio_t::low_priority
2195 : Scrub::scrub_prio_t::high_priority;
7c673cae
FG
2196}
2197
9f95a23c 2198bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
7c673cae 2199{
f67539c2
TL
2200 if (auto last_reset = get_last_peering_reset();
2201 last_reset > reply_epoch || last_reset > query_epoch) {
2202 dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch "
2203 << query_epoch << " last_peering_reset " << last_reset << dendl;
9f95a23c
TL
2204 return true;
2205 }
2206 return false;
7c673cae
FG
2207}
2208
// RAII trigger used by start_flush_on_transaction(): when the last reference
// (held by the transaction's on_applied and on_commit contexts) is dropped,
// the destructor notifies the peering state machine that the flush finished —
// unless the PG has been reset since the flush began.
struct FlushState {
  PGRef pg;       // keeps the PG alive until both contexts have fired
  epoch_t epoch;  // epoch at which the flush was started
  FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
  ~FlushState() {
    std::scoped_lock l{*pg};
    if (!pg->pg_has_reset_since(epoch)) {
      pg->recovery_state.complete_flush();
    }
  }
};
typedef std::shared_ptr<FlushState> FlushStateRef;
7c673cae 2221
9f95a23c 2222void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
7c673cae 2223{
9f95a23c
TL
2224 // flush in progress ops
2225 FlushStateRef flush_trigger (std::make_shared<FlushState>(
2226 this, get_osdmap_epoch()));
2227 t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
2228 t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
7c673cae
FG
2229}
2230
9f95a23c 2231bool PG::try_flush_or_schedule_async()
11fdf7f2 2232{
9f95a23c
TL
2233 Context *c = new QueuePeeringEvt(
2234 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
2235 if (!ch->flush_commit(c)) {
2236 return false;
2237 } else {
2238 delete c;
2239 return true;
2240 }
11fdf7f2
TL
2241}
2242
// Pretty-printer used by dout/derr and status output.  Renders the peering
// state, scrub flags, recovery-op count, missing/unfound tallies, the
// snap-trim queue and purged snaps.  Keep the format stable: log parsers
// may depend on it.
ostream& operator<<(ostream& out, const PG& pg)
{
  out << pg.recovery_state;

  // listing all scrub-related flags - both current and "planned next scrub"
  if (pg.is_scrubbing()) {
    out << *pg.m_scrubber;
  }
  out << pg.m_planned_scrub;

  if (pg.recovery_ops_active)
    out << " rops=" << pg.recovery_ops_active;

  //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
  if (pg.recovery_state.have_missing()) {
    out << " m=" << pg.recovery_state.get_num_missing();
    if (pg.is_primary()) {
      uint64_t unfound = pg.recovery_state.get_num_unfound();
      if (unfound)
	out << " u=" << unfound;
    }
  }
  if (!pg.is_clean()) {
    out << " mbc=" << pg.recovery_state.get_missing_by_count();
  }
  if (!pg.snap_trimq.empty()) {
    out << " trimq=";
    // only show a count if the set is large
    if (pg.snap_trimq.num_intervals() > 16) {
      out << pg.snap_trimq.size();
      if (!pg.snap_trimq_repeat.empty()) {
	out << "(" << pg.snap_trimq_repeat.size() << ")";
      }
    } else {
      out << pg.snap_trimq;
      if (!pg.snap_trimq_repeat.empty()) {
	out << "(" << pg.snap_trimq_repeat << ")";
      }
    }
  }
  if (!pg.recovery_state.get_info().purged_snaps.empty()) {
    out << " ps="; // snap trim queue / purged snaps
    // As with trimq above, collapse large interval sets to a count.
    if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
      out << pg.recovery_state.get_info().purged_snaps.size();
    } else {
      out << pg.recovery_state.get_info().purged_snaps;
    }
  }

  out << "]";
  return out;
}
2295
// Decide whether an incoming client op can be dropped without processing.
// Reasons: disconnected client, op sent before the current primary took
// over, balanced/localized reads sent to a stale replica mapping, or ops
// sent before a forced-resend boundary (pool flag change or PG split) —
// in all of those cases the client is guaranteed to resend.
bool PG::can_discard_op(OpRequestRef& op)
{
  auto m = op->get_req<MOSDOp>();
  if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
    dout(20) << " discard " << *m << dendl;
    return true;
  }

  // Sent before this primary was elected: the client will resend.
  if (m->get_map_epoch() < info.history.same_primary_since) {
    dout(7) << " changed after " << m->get_map_epoch()
	    << ", dropping " << *m << dendl;
    return true;
  }

  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
			 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      !is_primary() &&
      m->get_map_epoch() < info.history.same_interval_since) {
    // Note: the Objecter will resend on interval change without the primary
    // changing if it actually sent to a replica.  If the primary hasn't
    // changed since the send epoch, we got it, and we're primary, it won't
    // have resent even if the interval did change as it sent it to the primary
    // (us).
    return true;
  }


  // Forced-resend epochs differ by client feature level; check the one the
  // connected client actually honours.
  if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
    // >= luminous client
    if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
      // >= nautilus client
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
	dout(7) << __func__ << " sent before last_force_op_resend "
		<< pool.info.last_force_op_resend
		<< ", dropping" << *m << dendl;
	return true;
      }
    } else {
      // == < nautilus client (luminous or mimic)
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
	dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
		<< pool.info.last_force_op_resend_prenautilus
		<< ", dropping" << *m << dendl;
	return true;
      }
    }
    // Luminous+ clients resend on PG split, so pre-split ops are stale.
    if (m->get_map_epoch() < info.history.last_epoch_split) {
      dout(7) << __func__ << " pg split in "
	      << info.history.last_epoch_split << ", dropping" << dendl;
      return true;
    }
  } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
    // < luminous client
    if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
      dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
	      << pool.info.last_force_op_resend_preluminous
	      << ", dropping" << *m << dendl;
      return true;
    }
  }

  return false;
}
2359
// Decide whether an inter-OSD (replica) message can be dropped.  A message
// is stale if its sender is down in the next osdmap, went down at or after
// the message's epoch, or if the PG has been through a peering reset since
// the message was sent.
//
// @tparam T       concrete message type to cast to
// @tparam MSGTYPE expected wire message type (sanity-checked)
template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef& op)
{
  auto m = op->get_req<T>();
  ceph_assert(m->get_type() == MSGTYPE);

  int from = m->get_source().num();

  // if a repop is replied after a replica goes down in a new osdmap, and
  // before the pg advances to this new osdmap, the repop replies before this
  // repop can be discarded by that replica OSD, because the primary resets the
  // connection to it when handling the new osdmap marking it down, and also
  // resets the messenger sesssion when the replica reconnects. to avoid the
  // out-of-order replies, the messages from that replica should be discarded.
  OSDMapRef next_map = osd->get_next_osdmap();
  if (next_map->is_down(from)) {
    dout(20) << " " << __func__ << " dead for nextmap is down " << from << dendl;
    return true;
  }
  /* Mostly, this overlaps with the old_peering_msg
   * condition.  An important exception is pushes
   * sent by replicas not in the acting set, since
   * if such a replica goes down it does not cause
   * a new interval. */
  if (next_map->get_down_at(from) >= m->map_epoch) {
    dout(20) << " " << __func__ << " dead for 'get_down_at' " << from << dendl;
    return true;
  }

  // same pg?
  //  if pg changes _at all_, we reset and repeer!
  if (old_peering_msg(m->map_epoch, m->map_epoch)) {
    dout(10) << "can_discard_replica_op pg changed " << info.history
	     << " after " << m->map_epoch
	     << ", dropping" << dendl;
    return true;
  }
  return false;
}
2399
9f95a23c 2400bool PG::can_discard_scan(OpRequestRef op)
7c673cae 2401{
9f95a23c
TL
2402 auto m = op->get_req<MOSDPGScan>();
2403 ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
7c673cae 2404
9f95a23c
TL
2405 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2406 dout(10) << " got old scan, ignoring" << dendl;
2407 return true;
7c673cae 2408 }
9f95a23c 2409 return false;
7c673cae
FG
2410}
2411
9f95a23c 2412bool PG::can_discard_backfill(OpRequestRef op)
7c673cae 2413{
9f95a23c
TL
2414 auto m = op->get_req<MOSDPGBackfill>();
2415 ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
7c673cae 2416
9f95a23c
TL
2417 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
2418 dout(10) << " got old backfill, ignoring" << dendl;
2419 return true;
7c673cae
FG
2420 }
2421
9f95a23c 2422 return false;
7c673cae 2423
7c673cae
FG
2424}
2425
// Dispatch on message type to the appropriate staleness check.  Returns
// true if the request should be dropped.  Note the default: any message
// type not listed here is discarded (return true at the end).
bool PG::can_discard_request(OpRequestRef& op)
{
  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
    return can_discard_op(op);
  case CEPH_MSG_OSD_BACKOFF:
    return false; // never discard
  // Replication / recovery traffic: all use the replica-op staleness rules.
  case MSG_OSD_REPOP:
    return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
  case MSG_OSD_PG_PUSH:
    return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
  case MSG_OSD_PG_PULL:
    return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
  case MSG_OSD_PG_PUSH_REPLY:
    return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
  case MSG_OSD_REPOPREPLY:
    return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
  case MSG_OSD_PG_RECOVERY_DELETE:
    return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);

  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);

  // Erasure-coded sub-ops.
  case MSG_OSD_EC_WRITE:
    return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
  case MSG_OSD_EC_WRITE_REPLY:
    return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
  case MSG_OSD_EC_READ:
    return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
  case MSG_OSD_EC_READ_REPLY:
    return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
  // Scrub traffic.
  case MSG_OSD_REP_SCRUB:
    return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
  case MSG_OSD_SCRUB_RESERVE:
    return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
  case MSG_OSD_REP_SCRUBMAP:
    return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);

  // Backfill control messages have their own query-epoch checks.
  case MSG_OSD_PG_SCAN:
    return can_discard_scan(op);
  case MSG_OSD_PG_BACKFILL:
    return can_discard_backfill(op);
  case MSG_OSD_PG_BACKFILL_REMOVE:
    return can_discard_replica_op<MOSDPGBackfillRemove,
				  MSG_OSD_PG_BACKFILL_REMOVE>(op);
  }
  return true;
}
2480
9f95a23c 2481void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
7c673cae 2482{
9f95a23c
TL
2483 dout(10) << __func__ << ": " << evt->get_desc() << dendl;
2484 ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
2485 if (old_peering_evt(evt)) {
2486 dout(10) << "discard old " << evt->get_desc() << dendl;
2487 } else {
2488 recovery_state.handle_event(evt, &rctx);
7c673cae 2489 }
9f95a23c
TL
2490 // write_if_dirty regardless of path above to ensure we capture any work
2491 // done by OSD::advance_pg().
2492 write_if_dirty(rctx.transaction);
7c673cae
FG
2493}
2494
9f95a23c 2495void PG::queue_peering_event(PGPeeringEventRef evt)
7c673cae 2496{
9f95a23c
TL
2497 if (old_peering_evt(evt))
2498 return;
2499 osd->osd->enqueue_peering_evt(info.pgid, evt);
7c673cae
FG
2500}
2501
9f95a23c
TL
2502void PG::queue_null(epoch_t msg_epoch,
2503 epoch_t query_epoch)
7c673cae 2504{
9f95a23c
TL
2505 dout(10) << "null" << dendl;
2506 queue_peering_event(
2507 PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
2508 NullEvt())));
7c673cae
FG
2509}
2510
// Called when recovery stalls with unfound objects: ask the peering state
// machine to query more peers for missing-object locations.  If no query
// could be started, escalate with an Unfound{Backfill,Recovery} event so
// the state machine can transition out; otherwise requeue recovery.
void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
{
  /*
    * if we couldn't start any recovery ops and things are still
    * unfound, see if we can discover more missing object locations.
    * It may be that our initial locations were bad and we errored
    * out while trying to pull.
    */
  if (!recovery_state.discover_all_missing(rctx)) {
    string action;
    if (state_test(PG_STATE_BACKFILLING)) {
      auto evt = PGPeeringEventRef(
	new PGPeeringEvent(
	  queued,
	  queued,
	  PeeringState::UnfoundBackfill()));
      queue_peering_event(evt);
      action = "in backfill";
    } else if (state_test(PG_STATE_RECOVERING)) {
      auto evt = PGPeeringEventRef(
	new PGPeeringEvent(
	  queued,
	  queued,
	  PeeringState::UnfoundRecovery()));
      queue_peering_event(evt);
      action = "in recovery";
    } else {
      action = "already out of recovery/backfill";
    }
    dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
  } else {
    // Queries were sent; try recovery again once replies arrive.
    dout(10) << __func__ << ": no luck, giving up on this pg for now (queue_recovery)" << dendl;
    queue_recovery();
  }
}
2546
9f95a23c
TL
2547void PG::handle_advance_map(
2548 OSDMapRef osdmap, OSDMapRef lastmap,
2549 vector<int>& newup, int up_primary,
2550 vector<int>& newacting, int acting_primary,
2551 PeeringCtx &rctx)
7c673cae 2552{
9f95a23c
TL
2553 dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
2554 osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
2555 recovery_state.advance_map(
2556 osdmap,
2557 lastmap,
2558 newup,
2559 up_primary,
2560 newacting,
2561 acting_primary,
2562 rctx);
7c673cae
FG
2563}
2564
9f95a23c 2565void PG::handle_activate_map(PeeringCtx &rctx)
7c673cae 2566{
9f95a23c
TL
2567 dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
2568 << dendl;
2569 recovery_state.activate_map(rctx);
7c673cae 2570
9f95a23c 2571 requeue_map_waiters();
7c673cae
FG
2572}
2573
9f95a23c 2574void PG::handle_initialize(PeeringCtx &rctx)
7c673cae 2575{
9f95a23c
TL
2576 dout(10) << __func__ << dendl;
2577 PeeringState::Initialize evt;
2578 recovery_state.handle_event(evt, &rctx);
7c673cae
FG
2579}
2580
f67539c2 2581
9f95a23c 2582void PG::handle_query_state(Formatter *f)
7c673cae 2583{
9f95a23c
TL
2584 dout(10) << "handle_query_state" << dendl;
2585 PeeringState::QueryState q(f);
2586 recovery_state.handle_event(q, 0);
7c673cae 2587
f67539c2
TL
2588 // This code has moved to after the close of recovery_state array.
2589 // I don't think that scrub is a recovery state
2590 if (is_primary() && is_active() && m_scrubber && m_scrubber->is_scrub_active()) {
2591 m_scrubber->handle_query_state(f);
9f95a23c 2592 }
7c673cae
FG
2593}
2594
9f95a23c 2595void PG::init_collection_pool_opts()
11fdf7f2 2596{
9f95a23c
TL
2597 auto r = osd->store->set_collection_opts(ch, pool.info.opts);
2598 if (r < 0 && r != -EOPNOTSUPP) {
2599 derr << __func__ << " set_collection_opts returns error:" << r << dendl;
11fdf7f2 2600 }
11fdf7f2
TL
2601}
2602
9f95a23c 2603void PG::on_pool_change()
7c673cae 2604{
9f95a23c
TL
2605 init_collection_pool_opts();
2606 plpg_on_pool_change();
7c673cae
FG
2607}
2608
9f95a23c
TL
2609void PG::C_DeleteMore::complete(int r) {
2610 ceph_assert(r == 0);
2611 pg->lock();
2612 if (!pg->pg_has_reset_since(epoch)) {
2613 pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
7c673cae 2614 }
9f95a23c
TL
2615 pg->unlock();
2616 delete this;
7c673cae
FG
2617}
2618
// Perform one batch of PG deletion work.
//
// Returns {next_object, still_running}:
//  - if a delete-sleep is configured and due, schedules a requeue and
//    returns {_next, true} without doing any work;
//  - otherwise removes up to one transaction's worth of objects and
//    registers a C_DeleteMore to continue;
//  - when the collection is empty, removes the collection itself and either
//    finishes the deletion or, if we raced with a PG merge, reinstantiates
//    the collection ({next, false} only on successful completion).
std::pair<ghobject_t, bool> PG::do_delete_work(
  ObjectStore::Transaction &t,
  ghobject_t _next)
{
  dout(10) << __func__ << dendl;

  {
    // Optional throttle: back off between deletion batches.
    float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
    if (osd_delete_sleep > 0 && delete_needs_sleep) {
      epoch_t e = get_osdmap()->get_epoch();
      PGRef pgref(this);
      auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
	dout(20) << __func__ << " wake up at "
		 << ceph_clock_now()
		 << ", re-queuing delete" << dendl;
	std::scoped_lock locker{*this};
	delete_needs_sleep = false;
	if (!pg_has_reset_since(e)) {
	  osd->queue_for_pg_delete(get_pgid(), e);
	}
      });

      auto delete_schedule_time = ceph::real_clock::now();
      delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
      std::lock_guard l{osd->sleep_lock};
      osd->sleep_timer.add_event_at(delete_schedule_time,
				    delete_requeue_callback);
      dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
      return std::make_pair(_next, true);
    }
  }

  // Next invocation should sleep first (reset by the requeue callback).
  delete_needs_sleep = true;

  ghobject_t next;

  vector<ghobject_t> olist;
  int max = std::min(osd->store->get_ideal_list_max(),
		     (int)cct->_conf->osd_target_transaction_size);

  osd->store->collection_list(
    ch,
    _next,
    ghobject_t::get_max(),
    max,
    &olist,
    &next);
  dout(20) << __func__ << " " << olist << dendl;

  // make sure we've removed everything
  // by one more listing from the beginning
  if (_next != ghobject_t() && olist.empty()) {
    next = ghobject_t();
    osd->store->collection_list(
      ch,
      next,
      ghobject_t::get_max(),
      max,
      &olist,
      &next);
    if (!olist.empty()) {
      for (auto& oid : olist) {
	if (oid == pgmeta_oid) {
	  dout(20) << __func__ << " removing pgmeta object " << oid << dendl;
	} else {
	  dout(0) << __func__ << " additional unexpected onode"
		  <<" new onode has appeared since PG removal started"
		  << oid << dendl;
	}
      }
    }
  }

  OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
  int64_t num = 0;
  for (auto& oid : olist) {
    if (oid == pgmeta_oid) {
      // The pgmeta object is removed last, with the collection itself.
      continue;
    }
    if (oid.is_pgmeta()) {
      osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
			<< " during PG removal";
    }
    // Keep the snap mapper consistent with the object removal.
    int r = snap_mapper.remove_oid(oid.hobj, &_t);
    if (r != 0 && r != -ENOENT) {
      ceph_abort();
    }
    t.remove(coll, oid);
    ++num;
  }
  bool running = true;
  if (num) {
    dout(20) << __func__ << " deleting " << num << " objects" << dendl;
    // Continue with the next batch once this one commits.
    Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
    t.register_on_commit(fin);
  } else {
    if (cct->_conf->osd_inject_failure_on_pg_removal) {
      _exit(1);
    }

    // final flush here to ensure completions drop refs.  Of particular concern
    // are the SnapMapper ContainerContexts.
    {
      PGRef pgref(this);
      PGLog::clear_info_log(info.pgid, &t);
      t.remove_collection(coll);
      t.register_on_commit(new ContainerContext<PGRef>(pgref));
      t.register_on_applied(new ContainerContext<PGRef>(pgref));
      osd->store->queue_transaction(ch, std::move(t));
    }
    ch->flush();

    if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
      // Raced with a PG merge: this PG is needed again, so recreate its
      // (just removed) collection and on-disk metadata.
      dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
      ch = osd->store->create_new_collection(coll);
      create_pg_collection(t,
			   info.pgid,
			   info.pgid.get_split_bits(pool.info.get_pg_num()));
      init_pg_ondisk(t, info.pgid, &pool.info);
      recovery_state.reset_last_persisted();
    } else {
      recovery_state.set_delete_complete();

      // cancel reserver here, since the PG is about to get deleted and the
      // exit() methods don't run when that happens.
      osd->local_reserver.cancel_reservation(info.pgid);

      running = false;
    }
  }
  return {next, running};
}
2751
9f95a23c 2752int PG::pg_stat_adjust(osd_stat_t *ns)
7c673cae 2753{
9f95a23c
TL
2754 osd_stat_t &new_stat = *ns;
2755 if (is_primary()) {
2756 return 0;
7c673cae 2757 }
9f95a23c
TL
2758 // Adjust the kb_used by adding pending backfill data
2759 uint64_t reserved_num_bytes = get_reserved_num_bytes();
7c673cae 2760
9f95a23c
TL
2761 // For now we don't consider projected space gains here
2762 // I suggest we have an optional 2 pass backfill that frees up
2763 // space in a first pass. This could be triggered when at nearfull
2764 // or near to backfillfull.
2765 if (reserved_num_bytes > 0) {
2766 // TODO: Handle compression by adjusting by the PGs average
2767 // compression precentage.
2768 dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
2769 << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2770 if (new_stat.statfs.available > reserved_num_bytes)
2771 new_stat.statfs.available -= reserved_num_bytes;
2772 else
2773 new_stat.statfs.available = 0;
2774 dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
2775 return 1;
7c673cae 2776 }
9f95a23c 2777 return 0;
7c673cae
FG
2778}
2779
11fdf7f2
TL
2780void PG::dump_pgstate_history(Formatter *f)
2781{
9f95a23c
TL
2782 std::scoped_lock l{*this};
2783 recovery_state.dump_history(f);
11fdf7f2 2784}
7c673cae 2785
// Dump every missing object, its missing_info, and — when it still needs
// recovery — whether it is unfound plus the shards known to hold a copy.
void PG::dump_missing(Formatter *f)
{
  for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
    f->open_object_section("object");
    f->dump_object("oid", i.first);
    f->dump_object("missing_info", i.second);
    if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
      f->dump_bool(
	"unfound",
	recovery_state.get_missing_loc().is_unfound(i.first));
      f->open_array_section("locations");
      for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
	f->dump_object("shard", l);
      }
      f->close_section();
    }
    f->close_section();
  }
}
2805
2806void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
2807{
9f95a23c 2808 std::lock_guard l{pg_stats_publish_lock};
11fdf7f2
TL
2809 if (pg_stats_publish_valid) {
2810 f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
2811 }
11fdf7f2
TL
2812}
2813
2814void PG::with_heartbeat_peers(std::function<void(int)> f)
2815{
9f95a23c 2816 std::lock_guard l{heartbeat_peer_lock};
11fdf7f2
TL
2817 for (auto p : heartbeat_peers) {
2818 f(p);
2819 }
2820 for (auto p : probe_targets) {
2821 f(p);
2822 }
9f95a23c
TL
2823}
2824
2825uint64_t PG::get_min_alloc_size() const {
2826 return osd->store->get_min_alloc_size();
11fdf7f2 2827}