// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "PG.h"
// #include "msg/Messenger.h"
#include "messages/MOSDRepScrub.h"
// #include "common/cmdparse.h"
// #include "common/ceph_context.h"

#include "common/errno.h"
#include "common/ceph_releases.h"
#include "common/config.h"
#include "OSD.h"
#include "OpRequest.h"
#include "ScrubStore.h"
#include "Session.h"
#include "osd/scheduler/OpSchedulerItem.h"

#include "common/Timer.h"
#include "common/perf_counters.h"

#include "messages/MOSDOp.h"
#include "messages/MOSDPGNotify.h"
// #include "messages/MOSDPGLog.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDPGBackfillRemove.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPushReply.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDECSubOpWrite.h"
#include "messages/MOSDECSubOpWriteReply.h"
#include "messages/MOSDECSubOpRead.h"
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDScrubReserve.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDRepScrubMap.h"
#include "messages/MOSDPGRecoveryDelete.h"
#include "messages/MOSDPGRecoveryDeleteReply.h"

#include "common/BackTrace.h"
#include "common/EventTrace.h"

#ifdef WITH_LTTNG
#define TRACEPOINT_DEFINE
#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
#include "tracing/pg.h"
#undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
#undef TRACEPOINT_DEFINE
#else
#define tracepoint(...)
#endif

#include <sstream>

#define dout_context cct
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

using namespace ceph::osd::scheduler;

template <class T>
static ostream& _prefix(std::ostream *_dout, T *t)
{
  return t->gen_prefix(*_dout);
}

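// Reference counting: PGRef holders pin the PG via get()/put(), tagging each
// reference so leaks can be attributed. With PG_DEBUG_REFS defined, per-tag
// counts and per-reference backtraces are also recorded (see get_with_id()
// and dump_live_ids() below).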
void PG::get(const char* tag)
{
  int after = ++ref;
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
                                 << "tag " << (tag ? tag : "(none)") << " "
                                 << (after - 1) << " -> " << after << dendl;
#ifdef PG_DEBUG_REFS
  std::lock_guard l(_ref_id_lock);
  _tag_counts[tag]++;
#endif
}

void PG::put(const char* tag)
{
#ifdef PG_DEBUG_REFS
  {
    std::lock_guard l(_ref_id_lock);
    auto tag_counts_entry = _tag_counts.find(tag);
    ceph_assert(tag_counts_entry != _tag_counts.end());
    --tag_counts_entry->second;
    if (tag_counts_entry->second == 0) {
      _tag_counts.erase(tag_counts_entry);
    }
  }
#endif
  auto local_cct = cct;
  int after = --ref;
  lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
                                       << "tag " << (tag ? tag : "(none)") << " "
                                       << (after + 1) << " -> " << after
                                       << dendl;
  if (after == 0)
    delete this;
}

#ifdef PG_DEBUG_REFS
uint64_t PG::get_with_id()
{
  ref++;
  std::lock_guard l(_ref_id_lock);
  uint64_t id = ++_ref_id;
  BackTrace bt(0);
  stringstream ss;
  bt.print(ss);
  lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
                                 << " got id " << id << " "
                                 << (ref - 1) << " -> " << ref
                                 << dendl;
  ceph_assert(!_live_ids.count(id));
  _live_ids.insert(make_pair(id, ss.str()));
  return id;
}

void PG::put_with_id(uint64_t id)
{
  int newref = --ref;
  lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
                                 << " put id " << id << " "
                                 << (newref + 1) << " -> " << newref
                                 << dendl;
  {
    std::lock_guard l(_ref_id_lock);
    ceph_assert(_live_ids.count(id));
    _live_ids.erase(id);
  }
  if (!newref)  // last reference dropped
    delete this;
}

void PG::dump_live_ids()
{
  std::lock_guard l(_ref_id_lock);
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
  for (map<uint64_t, string>::iterator i = _live_ids.begin();
       i != _live_ids.end();
       ++i) {
    dout(0) << "\t\tid: " << *i << dendl;
  }
  dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
  for (map<string, uint64_t>::iterator i = _tag_counts.begin();
       i != _tag_counts.end();
       ++i) {
    dout(0) << "\t\tid: " << *i << dendl;
  }
}
#endif

PG::PG(OSDService *o, OSDMapRef curmap,
       const PGPool &_pool, spg_t p) :
  pg_whoami(o->whoami, p.shard),
  pg_id(p),
  coll(p),
  osd(o),
  cct(o->cct),
  osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
  snap_mapper(
    cct,
    &osdriver,
    p.ps(),
    p.get_split_bits(_pool.info.get_pg_num()),
    _pool.id,
    p.shard),
  trace_endpoint("0.0.0.0", 0, "PG"),
  info_struct_v(0),
  pgmeta_oid(p.make_pgmeta_oid()),
  stat_queue_item(this),
  scrub_queued(false),
  recovery_queued(false),
  recovery_ops_active(0),
  backfill_reserving(false),
  pg_stats_publish_valid(false),
  finish_sync_event(NULL),
  scrub_after_recovery(false),
  active_pushes(0),
  recovery_state(
    o->cct,
    pg_whoami,
    p,
    _pool,
    curmap,
    this,
    this),
  pool(recovery_state.get_pool()),
  info(recovery_state.get_info())
{
#ifdef PG_DEBUG_REFS
  osd->add_pgid(p, this);
#endif
#ifdef WITH_BLKIN
  std::stringstream ss;
  ss << "PG " << info.pgid;
  trace_endpoint.copy_name(ss.str());
#endif
}

PG::~PG()
{
#ifdef PG_DEBUG_REFS
  osd->remove_pgid(info.pgid, this);
#endif
}

void PG::lock(bool no_lockdep) const
{
#ifdef CEPH_DEBUG_MUTEX
  _lock.lock(no_lockdep);
#else
  _lock.lock();
  locked_by = std::this_thread::get_id();
#endif
  // if we have unrecorded dirty state with the lock dropped, there is a bug
  ceph_assert(!recovery_state.debug_has_dirty_state());

  dout(30) << "lock" << dendl;
}

bool PG::is_locked() const
{
  return ceph_mutex_is_locked(_lock);
}

void PG::unlock() const
{
  //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
  ceph_assert(!recovery_state.debug_has_dirty_state());
#ifndef CEPH_DEBUG_MUTEX
  locked_by = {};
#endif
  _lock.unlock();
}

std::ostream& PG::gen_prefix(std::ostream& out) const
{
  OSDMapRef mapref = recovery_state.get_osdmap();
#ifdef CEPH_DEBUG_MUTEX
  if (_lock.is_locked_by_me()) {
#else
  if (locked_by == std::this_thread::get_id()) {
#endif
    out << "osd." << osd->whoami
        << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
        << " " << *this << " ";
  } else {
    out << "osd." << osd->whoami
        << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
        << " pg[" << pg_id.pgid << "(unlocked)] ";
  }
  return out;
}

PerfCounters &PG::get_peering_perf() {
  return *(osd->recoverystate_perf);
}

PerfCounters &PG::get_perf_logger() {
  return *(osd->logger);
}

void PG::log_state_enter(const char *state) {
  osd->pg_recovery_stats.log_enter(state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  osd->pg_recovery_stats.log_exit(
    state_name, ceph_clock_now() - enter_time, events, event_dur);
}

/********* PG **********/

void PG::remove_snap_mapped_object(
  ObjectStore::Transaction &t, const hobject_t &soid)
{
  t.remove(
    coll,
    ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
  clear_object_snap_mapping(&t, soid);
}

void PG::clear_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  if (soid.snap < CEPH_MAXSNAP) {
    int r = snap_mapper.remove_oid(
      soid,
      &_t);
    if (!(r == 0 || r == -ENOENT)) {
      derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
      ceph_abort();
    }
  }
}

void PG::update_object_snap_mapping(
  ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
{
  OSDriver::OSTransaction _t(osdriver.get_transaction(t));
  ceph_assert(soid.snap < CEPH_MAXSNAP);
  int r = snap_mapper.remove_oid(
    soid,
    &_t);
  if (!(r == 0 || r == -ENOENT)) {
    derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  snap_mapper.add_oid(
    soid,
    snaps,
    &_t);
}

/******* PG ***********/
void PG::clear_primary_state()
{
  projected_log = PGLog::IndexedLog();

  snap_trimq.clear();
  snap_trimq_repeat.clear();
  finish_sync_event = 0;  // so that _finish_recovery doesn't go off in another thread
  release_pg_backoffs();

  scrubber.reserved_peers.clear();
  scrub_after_recovery = false;

  agent_clear();
}

PG::Scrubber::Scrubber()
 : local_reserved(false), remote_reserved(false), reserve_failed(false),
   epoch_start(0),
   active(false),
   shallow_errors(0), deep_errors(0), fixed(0),
   must_scrub(false), must_deep_scrub(false), must_repair(false),
   need_auto(false), time_for_deep(false),
   auto_repair(false),
   check_repair(false),
   deep_scrub_on_error(false),
   num_digest_updates_pending(0),
   state(INACTIVE),
   deep(false)
{}

PG::Scrubber::~Scrubber() {}

bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
  // only check MOSDOp
  if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
    return true;

  auto req = op->get_req<MOSDOp>();
  auto priv = req->get_connection()->get_priv();
  auto session = static_cast<Session*>(priv.get());
  if (!session) {
    dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
    return false;
  }
  OSDCap& caps = session->caps;
  priv.reset();

  const string &key = req->get_hobj().get_key().empty() ?
    req->get_oid().name :
    req->get_hobj().get_key();

  bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
                             pool.info.application_metadata,
                             key,
                             op->need_read_cap(),
                             op->need_write_cap(),
                             op->classes(),
                             session->get_peer_socket_addr());

  dout(20) << "op_has_sufficient_caps "
           << "session=" << session
           << " pool=" << pool.id << " (" << pool.name
           << " " << req->get_hobj().nspace
           << ")"
           << " pool_app_metadata=" << pool.info.application_metadata
           << " need_read_cap=" << op->need_read_cap()
           << " need_write_cap=" << op->need_write_cap()
           << " classes=" << op->classes()
           << " -> " << (cap ? "yes" : "NO")
           << dendl;
  return cap;
}

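// For illustration: a client authenticated with an OSD cap such as
// "allow rw pool=rbd" passes the is_capable() check above only for ops
// whose pool, namespace, object key, read/write needs, and class calls
// are all covered by that grant; anything else takes the "NO" branch.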
bool PG::requeue_scrub(bool high_priority)
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  if (scrub_queued) {
    dout(10) << __func__ << ": already queued" << dendl;
    return false;
  } else {
    dout(10) << __func__ << ": queueing" << dendl;
    scrub_queued = true;
    osd->queue_for_scrub(this, high_priority);
    return true;
  }
}

void PG::queue_recovery()
{
  if (!is_primary() || !is_peered()) {
    dout(10) << "queue_recovery -- not primary or not peered " << dendl;
    ceph_assert(!recovery_queued);
  } else if (recovery_queued) {
    dout(10) << "queue_recovery -- already queued" << dendl;
  } else {
    dout(10) << "queue_recovery -- queuing" << dendl;
    recovery_queued = true;
    osd->queue_for_recovery(this);
  }
}

bool PG::queue_scrub()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  if (is_scrubbing()) {
    return false;
  }
  // An interrupted recovery repair could leave this set.
  state_clear(PG_STATE_REPAIR);
  if (scrubber.need_auto) {
    scrubber.must_scrub = true;
    scrubber.must_deep_scrub = true;
    scrubber.auto_repair = true;
    scrubber.need_auto = false;
  }
  scrubber.priority = scrubber.must_scrub ?
    cct->_conf->osd_requested_scrub_priority : get_scrub_priority();
  scrubber.must_scrub = false;
  state_set(PG_STATE_SCRUBBING);
  if (scrubber.must_deep_scrub) {
    state_set(PG_STATE_DEEP_SCRUB);
    scrubber.must_deep_scrub = false;
  }
  if (scrubber.must_repair || scrubber.auto_repair) {
    state_set(PG_STATE_REPAIR);
    scrubber.must_repair = false;
  }
  requeue_scrub();
  return true;
}

unsigned PG::get_scrub_priority()
{
  // a higher value -> a higher priority
  int64_t pool_scrub_priority = 0;
  pool.info.opts.get(pool_opts_t::SCRUB_PRIORITY, &pool_scrub_priority);
  return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
}

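// Recovery completion is asynchronous: finish_recovery() returns a Context
// (stashed in finish_sync_event) that fires later, so _finish_recovery()
// must re-check, under the PG lock, that it is still the current
// finish_sync_event and that the PG has not since started deleting.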
Context *PG::finish_recovery()
{
  dout(10) << "finish_recovery" << dendl;
  ceph_assert(info.last_complete == info.last_update);

  clear_recovery_state();

  /*
   * sync all this before purging strays. but don't block!
   */
  finish_sync_event = new C_PG_FinishRecovery(this);
  return finish_sync_event;
}

void PG::_finish_recovery(Context *c)
{
  std::scoped_lock locker{*this};
  if (recovery_state.is_deleting() || !is_clean()) {
    dout(10) << __func__ << " raced with delete or repair" << dendl;
    return;
  }
  // When recovery is initiated by a repair, that flag is left on
  state_clear(PG_STATE_REPAIR);
  if (c == finish_sync_event) {
    dout(10) << "_finish_recovery" << dendl;
    finish_sync_event = 0;
    recovery_state.purge_strays();

    publish_stats_to_osd();

    if (scrub_after_recovery) {
      dout(10) << "_finish_recovery requeueing for scrub" << dendl;
      scrub_after_recovery = false;
      scrubber.must_deep_scrub = true;
      scrubber.check_repair = true;
      queue_scrub();
    }
  } else {
    dout(10) << "_finish_recovery -- stale" << dendl;
  }
}

void PG::start_recovery_op(const hobject_t& soid)
{
  dout(10) << "start_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active >= 0);
  recovery_ops_active++;
#ifdef DEBUG_RECOVERY_OIDS
  recovering_oids.insert(soid);
#endif
  osd->start_recovery_op(this, soid);
}

void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
{
  dout(10) << "finish_recovery_op " << soid
#ifdef DEBUG_RECOVERY_OIDS
           << " (" << recovering_oids << ")"
#endif
           << dendl;
  ceph_assert(recovery_ops_active > 0);
  recovery_ops_active--;
#ifdef DEBUG_RECOVERY_OIDS
  ceph_assert(recovering_oids.count(soid));
  recovering_oids.erase(recovering_oids.find(soid));
#endif
  osd->finish_recovery_op(this, soid, dequeue);

  if (!dequeue) {
    queue_recovery();
  }
}

void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
  recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);

  child->update_snap_mapper_bits(split_bits);

  child->snap_trimq = snap_trimq;
  child->snap_trimq_repeat = snap_trimq_repeat;

  _split_into(child_pgid, child, split_bits);

  // release all backoffs for simplicity
  release_backoffs(hobject_t(), hobject_t::get_max());
}

void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
{
  recovery_state.start_split_stats(childpgs, out);
}

void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
{
  recovery_state.finish_split_stats(stats, t);
}

void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
                    unsigned split_bits,
                    const pg_merge_meta_t& last_pg_merge_meta)
{
  dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
           << dendl;
  map<spg_t, PeeringState*> source_ps;
  for (auto &&source : sources) {
    source_ps.emplace(source.first, &source.second->recovery_state);
  }
  recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);

  for (auto& i : sources) {
    auto& source = i.second;
    // wipe out source's pgmeta
    rctx.transaction.remove(source->coll, source->pgmeta_oid);

    // merge (and destroy source collection)
    rctx.transaction.merge_collection(source->coll, coll, split_bits);
  }

  // merge_collection does this, but maybe all of our sources were missing.
  rctx.transaction.collection_set_bits(coll, split_bits);

  snap_mapper.update_bits(split_bits);
}

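// Backoffs tell clients to stop sending ops for an object range [begin,end).
// Each Backoff is tracked both in this PG's "backoffs" map (under
// backoff_lock) and in the owning Session; the add/release/clear paths below
// keep the two sides consistent and notify clients via MOSDBackoff
// BLOCK/UNBLOCK messages.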
void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
{
  auto con = s->con;
  if (!con)   // OSD::ms_handle_reset clears s->con without a lock
    return;
  auto b = s->have_backoff(info.pgid, begin);
  if (b) {
    derr << __func__ << " already have backoff for " << s << " begin " << begin
         << " " << *b << dendl;
    ceph_abort();
  }
  std::lock_guard l(backoff_lock);
  b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
  backoffs[begin].insert(b);
  s->add_backoff(b);
  dout(10) << __func__ << " session " << s << " added " << *b << dendl;
  con->send_message(
    new MOSDBackoff(
      info.pgid,
      get_osdmap_epoch(),
      CEPH_OSD_BACKOFF_OP_BLOCK,
      b->id,
      begin,
      end));
}

void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
{
  dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
  vector<ceph::ref_t<Backoff>> bv;
  {
    std::lock_guard l(backoff_lock);
    auto p = backoffs.lower_bound(begin);
    while (p != backoffs.end()) {
      int r = cmp(p->first, end);
      dout(20) << __func__ << " ? " << r << " " << p->first
               << " " << p->second << dendl;
      // note: must still examine begin=end=p->first case
      if (r > 0 || (r == 0 && begin < end)) {
        break;
      }
      dout(20) << __func__ << " checking " << p->first
               << " " << p->second << dendl;
      auto q = p->second.begin();
      while (q != p->second.end()) {
        dout(20) << __func__ << " checking " << *q << dendl;
        int r = cmp((*q)->begin, begin);
        if (r == 0 || (r > 0 && (*q)->end < end)) {
          bv.push_back(*q);
          q = p->second.erase(q);
        } else {
          ++q;
        }
      }
      if (p->second.empty()) {
        p = backoffs.erase(p);
      } else {
        ++p;
      }
    }
  }
  for (auto b : bv) {
    std::lock_guard l(b->lock);
    dout(10) << __func__ << " " << *b << dendl;
    if (b->session) {
      ceph_assert(b->pg == this);
      ConnectionRef con = b->session->con;
      if (con) {   // OSD::ms_handle_reset clears s->con without a lock
        con->send_message(
          new MOSDBackoff(
            info.pgid,
            get_osdmap_epoch(),
            CEPH_OSD_BACKOFF_OP_UNBLOCK,
            b->id,
            b->begin,
            b->end));
      }
      if (b->is_new()) {
        b->state = Backoff::STATE_DELETING;
      } else {
        b->session->rm_backoff(b);
        b->session.reset();
      }
      b->pg.reset();
    }
  }
}

void PG::clear_backoffs()
{
  dout(10) << __func__ << " " << dendl;
  map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
  {
    std::lock_guard l(backoff_lock);
    ls.swap(backoffs);
  }
  for (auto& p : ls) {
    for (auto& b : p.second) {
      std::lock_guard l(b->lock);
      dout(10) << __func__ << " " << *b << dendl;
      if (b->session) {
        ceph_assert(b->pg == this);
        if (b->is_new()) {
          b->state = Backoff::STATE_DELETING;
        } else {
          b->session->rm_backoff(b);
          b->session.reset();
        }
        b->pg.reset();
      }
    }
  }
}

// called by Session::clear_backoffs()
void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
{
  dout(10) << __func__ << " " << *b << dendl;
  std::lock_guard l(backoff_lock);
  ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
  ceph_assert(b->pg == this);
  auto p = backoffs.find(b->begin);
  // may race with release_backoffs()
  if (p != backoffs.end()) {
    auto q = p->second.find(b);
    if (q != p->second.end()) {
      p->second.erase(q);
      if (p->second.empty()) {
        backoffs.erase(p);
      }
    }
  }
}

void PG::clear_recovery_state()
{
  dout(10) << "clear_recovery_state" << dendl;

  finish_sync_event = 0;

  hobject_t soid;
  while (recovery_ops_active > 0) {
#ifdef DEBUG_RECOVERY_OIDS
    soid = *recovering_oids.begin();
#endif
    finish_recovery_op(soid, true);
  }

  backfill_info.clear();
  peer_backfill_info.clear();
  waiting_on_backfill.clear();
  _clear_recovery_state();  // pg impl specific hook
}

void PG::cancel_recovery()
{
  dout(10) << "cancel_recovery" << dendl;
  clear_recovery_state();
}

void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
  for (set<pg_shard_t>::iterator i = probe_set.begin();
       i != probe_set.end();
       ++i) {
    probe_targets.insert(i->osd);
  }
}

void PG::send_cluster_message(
  int target, Message *m,
  epoch_t epoch, bool share_map_update=false)
{
  ConnectionRef con = osd->get_con_osd_cluster(
    target, get_osdmap_epoch());
  if (!con) {
    m->put();
    return;
  }

  if (share_map_update) {
    osd->maybe_share_map(con.get(), get_osdmap());
  }
  osd->send_message_osd_cluster(m, con.get());
}

void PG::clear_probe_targets()
{
  std::lock_guard l(heartbeat_peer_lock);
  probe_targets.clear();
}

void PG::update_heartbeat_peers(set<int> new_peers)
{
  bool need_update = false;
  heartbeat_peer_lock.lock();
  if (new_peers == heartbeat_peers) {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
  } else {
    dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
    heartbeat_peers.swap(new_peers);
    need_update = true;
  }
  heartbeat_peer_lock.unlock();

  if (need_update)
    osd->need_heartbeat_peer_update();
}

bool PG::check_in_progress_op(
  const osd_reqid_t &r,
  eversion_t *version,
  version_t *user_version,
  int *return_code,
  vector<pg_log_op_return_item_t> *op_returns
  ) const
{
  return (
    projected_log.get_request(r, version, user_version, return_code,
                              op_returns) ||
    recovery_state.get_pg_log().get_log().get_request(
      r, version, user_version, return_code, op_returns));
}

void PG::publish_stats_to_osd()
{
  if (!is_primary())
    return;

  std::lock_guard l{pg_stats_publish_lock};
  auto stats = recovery_state.prepare_stats_for_publish(
    pg_stats_publish_valid,
    pg_stats_publish,
    unstable_stats);
  if (stats) {
    pg_stats_publish = stats.value();
    pg_stats_publish_valid = true;
  }
}

unsigned PG::get_target_pg_log_entries() const
{
  return osd->get_target_pg_log_entries();
}

void PG::clear_publish_stats()
{
  dout(15) << "clear_stats" << dendl;
  std::lock_guard l{pg_stats_publish_lock};
  pg_stats_publish_valid = false;
}

/**
 * initialize a newly instantiated pg
 *
 * Initialize PG state, as when a PG is initially created, or when it
 * is first instantiated on the current node.
 *
 * @param role our role/rank
 * @param newup up set
 * @param newacting acting set
 * @param history pg history
 * @param pi past_intervals
 * @param backfill true if info should be marked as backfill
 * @param t transaction to write out our new state in
 */
void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  bool backfill,
  ObjectStore::Transaction &t)
{
  recovery_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, backfill, t);
}

void PG::shutdown()
{
  ch->flush();
  std::scoped_lock l{*this};
  recovery_state.shutdown();
  on_shutdown();
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

void PG::upgrade(ObjectStore *store)
{
  dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
          << dendl;
  ceph_assert(info_struct_v <= 10);
  ObjectStore::Transaction t;

  // <do upgrade steps here>

  // finished upgrade!
  ceph_assert(info_struct_v == 10);

  // update infover_key
  if (info_struct_v < pg_latest_struct_v) {
    map<string,bufferlist> v;
    __u8 ver = pg_latest_struct_v;
    encode(ver, v[string(infover_key)]);
    t.omap_setkeys(coll, pgmeta_oid, v);
  }

  recovery_state.force_write_state(t);

  ObjectStore::CollectionHandle ch = store->open_collection(coll);
  int r = store->queue_transaction(ch, std::move(t));
  if (r != 0) {
    derr << __func__ << ": queue_transaction returned "
         << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  ceph_assert(r == 0);

  C_SaferCond waiter;
  if (!ch->flush_commit(&waiter)) {
    waiter.wait();
  }
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

void PG::prepare_write(
  pg_info_t &info,
  pg_info_t &last_written_info,
  PastIntervals &past_intervals,
  PGLog &pglog,
  bool dirty_info,
  bool dirty_big_info,
  bool need_write_epoch,
  ObjectStore::Transaction &t)
{
  info.stats.stats.add(unstable_stats);
  unstable_stats.clear();
  map<string,bufferlist> km;
  string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      cct,
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      cct->_conf->osd_fast_info,
      osd->logger,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll, pgmeta_oid, pool.info.require_rollback());
  if (!km.empty())
    t.omap_setkeys(coll, pgmeta_oid, km);
  if (!key_to_remove.empty())
    t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
}

#pragma GCC diagnostic ignored "-Wpragmas"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"

bool PG::_has_removal_flag(ObjectStore *store,
                           spg_t pgid)
{
  coll_t coll(pgid);
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());

  // first try new way
  set<string> keys;
  keys.insert("_remove");
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
      values.size() == 1)
    return true;

  return false;
}

int PG::peek_map_epoch(ObjectStore *store,
                       spg_t pgid,
                       epoch_t *pepoch)
{
  coll_t coll(pgid);
  ghobject_t legacy_infos_oid(OSD::make_infos_oid());
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  epoch_t cur_epoch = 0;

  // validate collection name
  ceph_assert(coll.is_pg());

  // try for v8
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(epoch_key));
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  if (r == 0) {
    ceph_assert(values.size() == 2);

    // sanity check version
    auto bp = values[string(infover_key)].cbegin();
    __u8 struct_v = 0;
    decode(struct_v, bp);
    ceph_assert(struct_v >= 8);

    // get epoch
    bp = values[string(epoch_key)].begin();
    decode(cur_epoch, bp);
  } else {
    // probably bug 10617; see OSD::load_pgs()
    return -1;
  }

  *pepoch = cur_epoch;
  return 0;
}

#pragma GCC diagnostic pop
#pragma GCC diagnostic warning "-Wpragmas"

bool PG::check_log_for_corruption(ObjectStore *store)
{
  /// TODO: this method needs to work with the omap log
  return true;
}

//! Get the name we're going to save our corrupt pg log as
std::string PG::get_corrupt_pg_log_name() const
{
  const int MAX_BUF = 512;
  char buf[MAX_BUF];
  struct tm tm_buf;
  time_t my_time(time(NULL));
  const struct tm *t = localtime_r(&my_time, &tm_buf);
  int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
  if (ret == 0) {
    dout(0) << "strftime failed" << dendl;
    return "corrupt_log_unknown_time";
  }
  string out(buf);
  out += stringify(info.pgid);
  return out;
}

int PG::read_info(
  ObjectStore *store, spg_t pgid, const coll_t &coll,
  pg_info_t &info, PastIntervals &past_intervals,
  __u8 &struct_v)
{
  set<string> keys;
  keys.insert(string(infover_key));
  keys.insert(string(info_key));
  keys.insert(string(biginfo_key));
  keys.insert(string(fastinfo_key));
  ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
  map<string,bufferlist> values;
  auto ch = store->open_collection(coll);
  ceph_assert(ch);
  int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
  ceph_assert(r == 0);
  ceph_assert(values.size() == 3 ||
              values.size() == 4);

  auto p = values[string(infover_key)].cbegin();
  decode(struct_v, p);
  ceph_assert(struct_v >= 10);

  p = values[string(info_key)].begin();
  decode(info, p);

  p = values[string(biginfo_key)].begin();
  decode(past_intervals, p);
  decode(info.purged_snaps, p);

  p = values[string(fastinfo_key)].begin();
  if (!p.end()) {
    pg_fast_info_t fast;
    decode(fast, p);
    fast.try_apply_to(&info);
  }
  return 0;
}

void PG::read_state(ObjectStore *store)
{
  PastIntervals past_intervals_from_disk;
  pg_info_t info_from_disk;
  int r = read_info(
    store,
    pg_id,
    coll,
    info_from_disk,
    past_intervals_from_disk,
    info_struct_v);
  ceph_assert(r >= 0);

  if (info_struct_v < pg_compat_struct_v) {
    derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
         << " an older version first." << dendl;
    ceph_abort_msg("PG too old to upgrade");
  }

  recovery_state.init_from_disk_state(
    std::move(info_from_disk),
    std::move(past_intervals_from_disk),
    [this, store] (PGLog &pglog) {
      ostringstream oss;
      pglog.read_log_and_missing(
        store,
        ch,
        pgmeta_oid,
        info,
        oss,
        cct->_conf->osd_ignore_stale_divergent_priors,
        cct->_conf->osd_debug_verify_missing_on_start);

      if (oss.tellp())
        osd->clog->error() << oss.str();
      return 0;
    });

  if (info_struct_v < pg_latest_struct_v) {
    upgrade(store);
  }

  // initialize current mapping
  {
    int primary, up_primary;
    vector<int> acting, up;
    get_osdmap()->pg_to_up_acting_osds(
      pg_id.pgid, &up, &up_primary, &acting, &primary);
    recovery_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
  }

  // init pool options
  store->set_collection_opts(ch, pool.info.opts);

  PeeringCtx rctx(ceph_release_t::unknown);
  handle_initialize(rctx);
  // note: we don't activate here because we know the OSD will advance maps
  // during boot.
  write_if_dirty(rctx.transaction);
  store->queue_transaction(ch, std::move(rctx.transaction));
}

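// Keep the SnapMapper (the snap -> object index consulted by the snap
// trimmer) in sync with the log entries being applied: deletes drop an
// object's mapping, clones/promotes add one, and modifies update it.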
void PG::update_snap_map(
  const vector<pg_log_entry_t> &log_entries,
  ObjectStore::Transaction &t)
{
  for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
       i != log_entries.end();
       ++i) {
    OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
    if (i->soid.snap < CEPH_MAXSNAP) {
      if (i->is_delete()) {
        int r = snap_mapper.remove_oid(
          i->soid,
          &_t);
        if (r != 0)
          derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
        // On removal tolerate missing key corruption
        ceph_assert(r == 0 || r == -ENOENT);
      } else if (i->is_update()) {
        ceph_assert(i->snaps.length() > 0);
        vector<snapid_t> snaps;
        bufferlist snapbl = i->snaps;
        auto p = snapbl.cbegin();
        try {
          decode(snaps, p);
        } catch (...) {
          derr << __func__ << " decode snaps failure on " << *i << dendl;
          snaps.clear();
        }
        set<snapid_t> _snaps(snaps.begin(), snaps.end());

        if (i->is_clone() || i->is_promote()) {
          snap_mapper.add_oid(
            i->soid,
            _snaps,
            &_t);
        } else if (i->is_modify()) {
          int r = snap_mapper.update_snaps(
            i->soid,
            _snaps,
            0,
            &_t);
          ceph_assert(r == 0);
        } else {
          ceph_assert(i->is_clean());
        }
      }
    }
  }
}

/**
 * filter trimming|trimmed snaps out of snapcontext
 */
void PG::filter_snapc(vector<snapid_t> &snaps)
{
  // nothing needs to trim, we can return immediately
  if (snap_trimq.empty() && info.purged_snaps.empty())
    return;

  bool filtering = false;
  vector<snapid_t> newsnaps;
  for (vector<snapid_t>::iterator p = snaps.begin();
       p != snaps.end();
       ++p) {
    if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
      if (!filtering) {
        // start building a new vector with what we've seen so far
        dout(10) << "filter_snapc filtering " << snaps << dendl;
        newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
        filtering = true;
      }
      dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
    } else {
      if (filtering)
        newsnaps.push_back(*p);  // continue building new vector
    }
  }
  if (filtering) {
    snaps.swap(newsnaps);
    dout(10) << "filter_snapc result " << snaps << dendl;
  }
}

void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
{
  for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
       it != m.end();
       ++it)
    requeue_ops(it->second);
  m.clear();
}

void PG::requeue_op(OpRequestRef op)
{
  auto p = waiting_for_map.find(op->get_source());
  if (p != waiting_for_map.end()) {
    dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
             << dendl;
    p->second.push_front(op);
  } else {
    dout(20) << __func__ << " " << op << dendl;
    osd->enqueue_front(
      OpSchedulerItem(
        unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
        op->get_req()->get_cost(),
        op->get_req()->get_priority(),
        op->get_req()->get_recv_stamp(),
        op->get_req()->get_source().num(),
        get_osdmap_epoch()));
  }
}

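// Note the iteration order below: ops are requeued back-to-front with
// enqueue_front()/push_front(), which preserves their original ordering
// at the head of the queue.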
void PG::requeue_ops(list<OpRequestRef> &ls)
{
  for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
       i != ls.rend();
       ++i) {
    requeue_op(*i);
  }
  ls.clear();
}

void PG::requeue_map_waiters()
{
  epoch_t epoch = get_osdmap_epoch();
  auto p = waiting_for_map.begin();
  while (p != waiting_for_map.end()) {
    if (epoch < p->second.front()->min_epoch) {
      dout(20) << __func__ << " " << p->first << " front op "
               << p->second.front() << " must still wait, doing nothing"
               << dendl;
      ++p;
    } else {
      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
        auto req = *q;
        osd->enqueue_front(OpSchedulerItem(
          unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
          req->get_req()->get_cost(),
          req->get_req()->get_priority(),
          req->get_req()->get_recv_stamp(),
          req->get_req()->get_source().num(),
          epoch));
      }
      p = waiting_for_map.erase(p);
    }
  }
}

// ==========================================================================================
// SCRUB

/*
 * when holding pg and sched_scrub_lock, then the states are:
 *   scheduling:
 *     scrubber.local_reserved = true
 *     scrubber.active = false
 *     scrubber.reserved_peers includes whoami
 *     osd->scrubs_local++
 *   scheduling, replica declined:
 *     scrubber.local_reserved = true
 *     scrubber.reserved_peers includes -1
 *     osd->scrubs_local++
 *   pending:
 *     scrubber.local_reserved = true
 *     scrubber.active = false
 *     scrubber.reserved_peers.size() == acting.size();
 *     pg on scrub_wq
 *     osd->scrubs_local++
 *   scrubbing:
 *     scrubber.local_reserved = true;
 *     scrubber.active = true
 *     scrubber.reserved_peers empty
 */

// returns true if a scrub has been newly kicked off
bool PG::sched_scrub()
{
  ceph_assert(ceph_mutex_is_locked(_lock));
  ceph_assert(!is_scrubbing());
  if (!(is_primary() && is_active() && is_clean())) {
    return false;
  }

  // All processing the first time through commits us to whatever
  // choices are made.
  if (!scrubber.local_reserved) {
    dout(20) << __func__ << ": Start processing pg " << info.pgid << dendl;

    bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
                              pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
    bool allow_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
                         pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
    bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
    bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair
                               && get_pgbackend()->auto_repair_supported());

    scrubber.time_for_deep = false;
    // Clear these in case user issues the scrub/repair command during
    // the scheduling of the scrub/repair (e.g. request reservation)
    scrubber.deep_scrub_on_error = false;
    scrubber.auto_repair = false;

    // All periodic scrub handling goes here because must_scrub is
    // always set for must_deep_scrub and must_repair.
    if (!scrubber.must_scrub) {
      ceph_assert(!scrubber.must_deep_scrub && !scrubber.must_repair);
      // Handle deep scrub determination only if allowed
      if (allow_deep_scrub) {
        // Initial entry and scheduled scrubs without nodeep_scrub set get here
        if (scrubber.need_auto) {
          dout(20) << __func__ << ": need repair after scrub errors" << dendl;
          scrubber.time_for_deep = true;
        } else {
          double deep_scrub_interval = 0;
          pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
          if (deep_scrub_interval <= 0) {
            deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
          }
          scrubber.time_for_deep = ceph_clock_now() >=
            info.history.last_deep_scrub_stamp + deep_scrub_interval;

          bool deep_coin_flip = false;
          // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
          // we will deep scrub because this function is called often.
          if (!scrubber.time_for_deep && allow_scrub)
            deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
          dout(20) << __func__ << ": time_for_deep=" << scrubber.time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;

          scrubber.time_for_deep = (scrubber.time_for_deep || deep_coin_flip);
        }

        if (!scrubber.time_for_deep && has_deep_errors) {
          osd->clog->info() << "osd." << osd->whoami
                            << " pg " << info.pgid
                            << " Deep scrub errors, upgrading scrub to deep-scrub";
          scrubber.time_for_deep = true;
        }

        if (try_to_auto_repair) {
          if (scrubber.time_for_deep) {
            dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
            scrubber.auto_repair = true;
          } else if (allow_scrub) {
            dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
            scrubber.deep_scrub_on_error = true;
          }
        }
      } else { // !allow_deep_scrub
        dout(20) << __func__ << ": nodeep_scrub set" << dendl;
        if (has_deep_errors) {
          osd->clog->error() << "osd." << osd->whoami
                             << " pg " << info.pgid
                             << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
          return false;
        }
      }

      // NOSCRUB set, so skip regular scrubs
      if (!allow_scrub && !scrubber.time_for_deep) {
        return false;
      }
      // scrubber.must_scrub
    } else if (!scrubber.must_deep_scrub && has_deep_errors) {
      osd->clog->error() << "osd." << osd->whoami
                         << " pg " << info.pgid
                         << " Regular scrub request, deep-scrub details will be lost";
    }
    // Unless precluded, this was handled above
    scrubber.need_auto = false;

    ceph_assert(scrubber.reserved_peers.empty());
    bool allow_scrubbing = cct->_conf->osd_scrub_during_recovery ||
                           (cct->_conf->osd_repair_during_recovery && scrubber.must_repair) ||
                           !osd->is_recovery_active();
    if (allow_scrubbing &&
        osd->inc_scrubs_local()) {
      dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
      scrubber.local_reserved = true;
      scrubber.reserved_peers.insert(pg_whoami);
      scrub_reserve_replicas();
    } else {
      dout(20) << __func__ << ": failed to reserve locally" << dendl;
      return false;
    }
  }

  if (scrubber.local_reserved) {
    if (scrubber.reserve_failed) {
      dout(20) << __func__ << ": failed, a peer declined" << dendl;
      clear_scrub_reserved();
      scrub_unreserve_replicas();
      return false;
    } else if (scrubber.reserved_peers.size() == get_actingset().size()) {
      dout(20) << __func__ << ": success, reserved self and replicas" << dendl;
      if (scrubber.time_for_deep) {
        dout(10) << __func__ << ": scrub will be deep" << dendl;
        state_set(PG_STATE_DEEP_SCRUB);
        scrubber.time_for_deep = false;
      }
      queue_scrub();
    } else {
      // none declined, since scrubber.local_reserved is set
      dout(20) << __func__ << ": reserved " << scrubber.reserved_peers
               << ", waiting for replicas" << dendl;
    }
  }
  return true;
}

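// Scrub registration: the OSD keeps a schedule of pending scrubs keyed by
// stamp. reg_next_scrub() picks the stamp (a "must" stamp for operator
// requests, now() for invalid stats, otherwise last_scrub_stamp) and
// registers this PG; unreg_next_scrub() removes it, e.g. so a history
// change can re-register with a fresh stamp.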
bool PG::is_scrub_registered()
{
  return !scrubber.scrub_reg_stamp.is_zero();
}

void PG::reg_next_scrub()
{
  if (!is_primary())
    return;

  utime_t reg_stamp;
  bool must = false;
  if (scrubber.must_scrub || scrubber.need_auto) {
    // Set the smallest time that isn't utime_t()
    reg_stamp = Scrubber::scrub_must_stamp();
    must = true;
  } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
    reg_stamp = ceph_clock_now();
    must = true;
  } else {
    reg_stamp = info.history.last_scrub_stamp;
  }
  // note down the sched_time, so we can locate this scrub, and remove it
  // later on.
  double scrub_min_interval = 0, scrub_max_interval = 0;
  pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
  pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
  ceph_assert(!is_scrub_registered());
  scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
                                               reg_stamp,
                                               scrub_min_interval,
                                               scrub_max_interval,
                                               must);
  dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
           << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
}

void PG::unreg_next_scrub()
{
  if (is_scrub_registered()) {
    osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
    scrubber.scrub_reg_stamp = utime_t();
  }
}

void PG::on_info_history_change()
{
  unreg_next_scrub();
  reg_next_scrub();
}

void PG::scrub_requested(bool deep, bool repair, bool need_auto)
{
  unreg_next_scrub();
  if (need_auto) {
    scrubber.need_auto = true;
  } else {
    scrubber.must_scrub = true;
    scrubber.must_deep_scrub = deep || repair;
    scrubber.must_repair = repair;
    // User might intervene, so clear this
    scrubber.need_auto = false;
  }
  reg_next_scrub();
}

void PG::clear_ready_to_merge() {
  osd->clear_ready_to_merge(this);
}

void PG::queue_want_pg_temp(const vector<int> &wanted) {
  osd->queue_want_pg_temp(get_pgid().pgid, wanted);
}

void PG::clear_want_pg_temp() {
  osd->remove_want_pg_temp(get_pgid().pgid);
}

void PG::on_role_change() {
  requeue_ops(waiting_for_peered);
  plpg_on_role_change();
}

void PG::on_new_interval() {
  scrub_queued = false;
  projected_last_update = eversion_t();
  cancel_recovery();
}

epoch_t PG::oldest_stored_osdmap() {
  return osd->get_superblock().oldest_map;
}

OstreamTemp PG::get_clog_info() {
  return osd->clog->info();
}

OstreamTemp PG::get_clog_debug() {
  return osd->clog->debug();
}

OstreamTemp PG::get_clog_error() {
  return osd->clog->error();
}

void PG::schedule_event_after(
  PGPeeringEventRef event,
  float delay) {
  std::lock_guard lock(osd->recovery_request_lock);
  osd->recovery_request_timer.add_event_after(
    delay,
    new QueuePeeringEvt(
      this,
      std::move(event)));
}

void PG::request_local_background_io_reservation(
  unsigned priority,
  PGPeeringEventRef on_grant,
  PGPeeringEventRef on_preempt) {
  osd->local_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, on_grant) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, on_preempt) : nullptr);
}

void PG::update_local_background_io_priority(
  unsigned priority) {
  osd->local_reserver.update_priority(
    pg_id,
    priority);
}

void PG::cancel_local_background_io_reservation() {
  osd->local_reserver.cancel_reservation(
    pg_id);
}

void PG::request_remote_recovery_reservation(
  unsigned priority,
  PGPeeringEventRef on_grant,
  PGPeeringEventRef on_preempt) {
  osd->remote_reserver.request_reservation(
    pg_id,
    on_grant ? new QueuePeeringEvt(
      this, on_grant) : nullptr,
    priority,
    on_preempt ? new QueuePeeringEvt(
      this, on_preempt) : nullptr);
}

void PG::cancel_remote_recovery_reservation() {
  osd->remote_reserver.cancel_reservation(
    pg_id);
}

void PG::schedule_event_on_commit(
  ObjectStore::Transaction &t,
  PGPeeringEventRef on_commit)
{
  t.register_on_commit(new QueuePeeringEvt(this, on_commit));
}

void PG::on_active_exit()
{
  backfill_reserving = false;
  agent_stop();
}

void PG::on_active_advmap(const OSDMapRef &osdmap)
{
  const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
  auto i = new_removed_snaps.find(get_pgid().pool());
  if (i != new_removed_snaps.end()) {
    bool bad = false;
    for (auto j : i->second) {
      if (snap_trimq.intersects(j.first, j.second)) {
        decltype(snap_trimq) added, overlap;
        added.insert(j.first, j.second);
        overlap.intersection_of(snap_trimq, added);
        derr << __func__ << " removed_snaps already contains "
             << overlap << dendl;
        bad = true;
        snap_trimq.union_of(added);
      } else {
        snap_trimq.insert(j.first, j.second);
      }
    }
    dout(10) << __func__ << " new removed_snaps " << i->second
             << ", snap_trimq now " << snap_trimq << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }

  const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
  auto j = new_purged_snaps.find(get_pgid().pgid.pool());
  if (j != new_purged_snaps.end()) {
    bool bad = false;
    for (auto k : j->second) {
      if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
        interval_set<snapid_t> rm, overlap;
        rm.insert(k.first, k.second);
        overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
        derr << __func__ << " purged_snaps does not contain "
             << rm << ", only " << overlap << dendl;
        recovery_state.adjust_purged_snaps(
          [&overlap](auto &purged_snaps) {
            purged_snaps.subtract(overlap);
          });
        // This can currently happen in the normal (if unlikely) course of
        // events. Because adding snaps to purged_snaps does not increase
        // the pg version or add a pg log entry, we don't reliably propagate
        // purged_snaps additions to other OSDs.
        // One example:
        //  - purge S
        //  - primary and replicas update purged_snaps
        //  - no object updates
        //  - pg mapping changes, new primary on different node
        //  - new primary pg version == eversion_t(), so info is not
        //    propagated.
        //bad = true;
      } else {
        recovery_state.adjust_purged_snaps(
          [&k](auto &purged_snaps) {
            purged_snaps.erase(k.first, k.second);
          });
      }
    }
    dout(10) << __func__ << " new purged_snaps " << j->second
             << ", now " << recovery_state.get_info().purged_snaps << dendl;
    ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
  }
}

void PG::queue_snap_retrim(snapid_t snap)
{
  if (!is_active() ||
      !is_primary()) {
    dout(10) << __func__ << " snap " << snap << " - not active and primary"
             << dendl;
    return;
  }
  if (!snap_trimq.contains(snap)) {
    snap_trimq.insert(snap);
    snap_trimq_repeat.insert(snap);
    dout(20) << __func__ << " snap " << snap
             << ", trimq now " << snap_trimq
             << ", repeat " << snap_trimq_repeat << dendl;
    kick_snap_trim();
  } else {
    dout(20) << __func__ << " snap " << snap
             << " already in trimq " << snap_trimq << dendl;
  }
}

void PG::on_active_actmap()
{
  if (cct->_conf->osd_check_for_log_corruption)
    check_log_for_corruption(osd->store);

  if (recovery_state.is_active()) {
    dout(10) << "Active: kicking snap trim" << dendl;
    kick_snap_trim();
  }

  if (recovery_state.is_peered() &&
      !recovery_state.is_clean() &&
      !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
      (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
       recovery_state.is_degraded())) {
    queue_recovery();
  }
}

void PG::on_backfill_reserved()
{
  backfill_reserving = false;
  queue_recovery();
}

void PG::on_backfill_canceled()
{
  if (!waiting_on_backfill.empty()) {
    waiting_on_backfill.clear();
    finish_recovery_op(hobject_t::get_max());
  }
}

void PG::on_recovery_reserved()
{
  queue_recovery();
}

void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
{
  osd->set_not_ready_to_merge_target(pgid, src);
}

void PG::set_not_ready_to_merge_source(pg_t pgid)
{
  osd->set_not_ready_to_merge_source(pgid);
}

void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
{
  osd->set_ready_to_merge_target(this, lu, les, lec);
}

void PG::set_ready_to_merge_source(eversion_t lu)
{
  osd->set_ready_to_merge_source(this, lu);
}

void PG::send_pg_created(pg_t pgid)
{
  osd->send_pg_created(pgid);
}

ceph::signedspan PG::get_mnow()
{
  return osd->get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return osd->get_hb_stamps(peer);
}

void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
{
  auto spgid = info.pgid;
  auto o = osd;
  osd->mono_timer.add_event(
    delay,
    [o, lpr, spgid]() {
      o->queue_renew_lease(lpr, spgid);
    });
}

void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
{
  osd->queue_check_readable(info.pgid, lpr, delay);
}

void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
{
  pglog.rebuild_missing_set_with_deletes(
    osd->store,
    ch,
    recovery_state.get_info());
}

9f95a23c 1846void PG::on_activate_committed()
91327a77 1847{
9f95a23c
TL
1848 if (!is_primary()) {
1849 // waiters
1850 if (recovery_state.needs_flush() == 0) {
1851 requeue_ops(waiting_for_peered);
1852 } else if (!waiting_for_peered.empty()) {
1853 dout(10) << __func__ << " flushes in progress, moving "
1854 << waiting_for_peered.size() << " items to waiting_for_flush"
1855 << dendl;
1856 ceph_assert(waiting_for_flush.empty());
1857 waiting_for_flush.swap(waiting_for_peered);
91327a77 1858 }
9f95a23c
TL
1859 }
1860}
91327a77 1861
9f95a23c
TL
1862void PG::do_replica_scrub_map(OpRequestRef op)
1863{
1864 auto m = op->get_req<MOSDRepScrubMap>();
1865 dout(7) << __func__ << " " << *m << dendl;
1866 if (m->map_epoch < info.history.same_interval_since) {
1867 dout(10) << __func__ << " discarding old from "
1868 << m->map_epoch << " < " << info.history.same_interval_since
1869 << dendl;
1870 return;
1871 }
1872 if (!scrubber.is_chunky_scrub_active()) {
1873 dout(10) << __func__ << " scrub isn't active" << dendl;
1874 return;
1875 }
91327a77 1876
9f95a23c 1877 op->mark_started();
b32b8144 1878
9f95a23c
TL
1879 auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
1880 scrubber.received_maps[m->from].decode(p, info.pgid.pool());
1881 dout(10) << "map version is "
1882 << scrubber.received_maps[m->from].valid_through
1883 << dendl;
7c673cae 1884
9f95a23c
TL
1885 dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
1886 << dendl;
1887 ceph_assert(scrubber.waiting_on_whom.count(m->from));
1888 scrubber.waiting_on_whom.erase(m->from);
1889 if (m->preempted) {
1890 dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
1891 scrub_preempted = true;
1892 }
1893 if (scrubber.waiting_on_whom.empty()) {
1894 requeue_scrub(ops_blocked_by_scrub());
7c673cae
FG
1895 }
1896}
1897
9f95a23c
TL
1898// send scrub v3 messages (chunky scrub)
1899void PG::_request_scrub_map(
1900 pg_shard_t replica, eversion_t version,
1901 hobject_t start, hobject_t end,
1902 bool deep,
1903 bool allow_preemption)
7c673cae 1904{
9f95a23c
TL
1905 ceph_assert(replica != pg_whoami);
1906 dout(10) << "scrub requesting scrubmap from osd." << replica
1907 << " deep " << (int)deep << dendl;
1908 MOSDRepScrub *repscrubop = new MOSDRepScrub(
1909 spg_t(info.pgid.pgid, replica.shard), version,
1910 get_osdmap_epoch(),
1911 get_last_peering_reset(),
1912 start, end, deep,
1913 allow_preemption,
1914 scrubber.priority,
1915 ops_blocked_by_scrub());
1916 // default priority, we want the rep scrub processed prior to any recovery
1917 // or client io messages (we are holding a lock!)
1918 osd->send_message_osd_cluster(
1919 replica.osd, repscrubop, get_osdmap_epoch());
7c673cae
FG
1920}
1921
9f95a23c 1922void PG::handle_scrub_reserve_request(OpRequestRef op)
7c673cae 1923{
9f95a23c
TL
1924 dout(7) << __func__ << " " << *op->get_req() << dendl;
1925 op->mark_started();
1926 if (scrubber.remote_reserved) {
1927 dout(10) << __func__ << " ignoring reserve request: Already reserved"
1928 << dendl;
7c673cae 1929 return;
7c673cae 1930 }
9f95a23c
TL
1931 if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
1932 osd->inc_scrubs_remote()) {
1933 scrubber.remote_reserved = true;
b32b8144 1934 } else {
9f95a23c
TL
1935 dout(20) << __func__ << ": failed to reserve remotely" << dendl;
1936 scrubber.remote_reserved = false;
11fdf7f2 1937 }
9f95a23c
TL
1938 auto m = op->get_req<MOSDScrubReserve>();
1939 Message *reply = new MOSDScrubReserve(
1940 spg_t(info.pgid.pgid, get_primary().shard),
1941 m->map_epoch,
1942 scrubber.remote_reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
1943 pg_whoami);
1944 osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
1945}
11fdf7f2 1946
9f95a23c
TL
1947void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
1948{
1949 dout(7) << __func__ << " " << *op->get_req() << dendl;
1950 op->mark_started();
1951 if (!scrubber.local_reserved) {
1952 dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
1953 return;
1954 }
1955 if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
1956 dout(10) << " already had osd." << from << " reserved" << dendl;
7c673cae 1957 } else {
9f95a23c
TL
1958 dout(10) << " osd." << from << " scrub reserve = success" << dendl;
1959 scrubber.reserved_peers.insert(from);
1960 sched_scrub();
7c673cae 1961 }
7c673cae
FG
1962}
1963
9f95a23c 1964void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
7c673cae 1965{
9f95a23c
TL
1966 dout(7) << __func__ << " " << *op->get_req() << dendl;
1967 op->mark_started();
1968 if (!scrubber.local_reserved) {
1969 dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
1970 return;
1971 }
1972 if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
1973 dout(10) << " already had osd." << from << " reserved" << dendl;
1974 } else {
1975 /* One decline stops this pg from being scheduled for scrubbing. */
1976 dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
1977 scrubber.reserve_failed = true;
1978 sched_scrub();
1979 }
7c673cae
FG
1980}
1981
9f95a23c 1982void PG::handle_scrub_reserve_release(OpRequestRef op)
7c673cae 1983{
9f95a23c
TL
1984 dout(7) << __func__ << " " << *op->get_req() << dendl;
1985 op->mark_started();
1986 clear_scrub_reserved();
7c673cae
FG
1987}
1988
9f95a23c
TL
1989// Compute pending backfill data
1990static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
11fdf7f2 1991{
9f95a23c
TL
1992 lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
1993 << (local_bytes >> 10) << "KiB"
1994 << " primary usage " << (bf_bytes >> 10)
1995 << "KiB" << dendl;
11fdf7f2 1996
9f95a23c
TL
1997 return std::max((int64_t)0, bf_bytes - local_bytes);
1998}
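// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// A worked example of the clamp above: the pending amount is simply the
// primary's usage minus what is already local, never negative.
#if 0
#include <algorithm>
#include <cassert>
#include <cstdint>

static int64_t pending_backfill_model(int64_t bf_bytes, int64_t local_bytes) {
  return std::max<int64_t>(0, bf_bytes - local_bytes);
}

int main() {
  // primary reports 10 GiB, 4 GiB already local: 6 GiB still pending
  assert(pending_backfill_model(10LL << 30, 4LL << 30) == (6LL << 30));
  // local copy already exceeds the primary's usage: clamp to zero
  assert(pending_backfill_model(4LL << 30, 10LL << 30) == 0);
}
#endif
// ---- end sketch ----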
7c673cae 1999
7c673cae 2000
9f95a23c
TL
2001 // We can zero the value of primary_num_bytes with just an atomic store.
2002 // However, setting it above zero reserves space for backfill and requires
2003 // the OSDService::stat_lock, which protects all OSD usage
2004bool PG::try_reserve_recovery_space(
2005 int64_t primary_bytes, int64_t local_bytes) {
2006 // Use tentative_backfill_full() to make sure enough
2007 // space is available to handle the target bytes from the primary.
7c673cae 2008
9f95a23c
TL
2009 // TODO: If we passed num_objects from primary we could account for
2010 // an estimate of the metadata overhead.
7c673cae 2011
9f95a23c
TL
2012 // TODO: If we had compressed_allocated and compressed_original from primary
2013 // we could compute compression ratio and adjust accordingly.
7c673cae 2014
9f95a23c
TL
2015 // XXX: There is no way to get omap overhead and this would only apply
2016 // to whatever (possibly different) partition is storing the database.
7c673cae 2017
9f95a23c
TL
2018 // update_osd_stat() from heartbeat will do this on a new
2019 // statfs using ps->primary_bytes.
2020 uint64_t pending_adjustment = 0;
2021 if (primary_bytes) {
2022 // For an erasure-coded pool, overestimate by a full stripe per object
2023 // because we don't know how each object rounds to the nearest stripe
2024 if (pool.info.is_erasure()) {
2025 primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
2026 primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
2027 info.stats.stats.sum.num_objects;
2028 local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
2029 local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
2030 info.stats.stats.sum.num_objects;
2031 }
2032 pending_adjustment = pending_backfill(
2033 cct,
2034 primary_bytes,
2035 local_bytes);
2036 dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
2037 << "KiB"
2038 << " local " << (local_bytes >> 10) << "KiB"
2039 << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
2040 << dendl;
7c673cae 2041 }
7c673cae 2042
9f95a23c
TL
2043 // This lock protects not only the OSDService stats but also the setting of
2044 // the pg primary_bytes. That's why we don't immediately unlock
2045 std::lock_guard l{osd->stat_lock};
2046 osd_stat_t cur_stat = osd->osd_stat;
2047 if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
2048 (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
2049 dout(10) << "backfill reservation rejected: failure injection"
2050 << dendl;
2051 return false;
2052 } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
2053 osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
2054 dout(10) << "backfill reservation rejected: backfill full"
2055 << dendl;
2056 return false;
2057 } else {
2058 // Don't reserve space if we skipped the reservation check; this is used
2059 // to test the other backfill full check AND in case a corruption
2060 // of num_bytes requires ignoring that value and trying the
2061 // backfill anyway.
2062 if (primary_bytes &&
2063 !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
2064 primary_num_bytes.store(primary_bytes);
2065 local_num_bytes.store(local_bytes);
2066 } else {
2067 unreserve_recovery_space();
2068 }
2069 return true;
7c673cae
FG
2070 }
2071}
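// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// The erasure-coded adjustment above, with hypothetical numbers: divide
// by the data chunk count to get the per-shard share, then over-estimate
// by one stripe chunk per object for rounding. E.g. k=4 data chunks,
// 16 KiB chunk size, 1 GiB reported, 10000 objects:
//   1 GiB / 4 = 256 MiB, plus 16 KiB * 10000 ~= 156 MiB -> ~412 MiB.
#if 0
#include <cstdint>

static int64_t ec_adjust_model(int64_t bytes, int64_t num_objects,
                               int64_t data_chunks, int64_t chunk_size) {
  return bytes / data_chunks + chunk_size * num_objects;
}
#endif
// ---- end sketch ----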
2072
9f95a23c
TL
2073void PG::unreserve_recovery_space() {
2074 primary_num_bytes.store(0);
2075 local_num_bytes.store(0);
2076 return;
2077}
7c673cae 2078
9f95a23c 2079void PG::clear_scrub_reserved()
7c673cae 2080{
9f95a23c
TL
2081 scrubber.reserved_peers.clear();
2082 scrubber.reserve_failed = false;
7c673cae 2083
9f95a23c
TL
2084 if (scrubber.local_reserved) {
2085 scrubber.local_reserved = false;
2086 osd->dec_scrubs_local();
7c673cae 2087 }
9f95a23c
TL
2088 if (scrubber.remote_reserved) {
2089 scrubber.remote_reserved = false;
2090 osd->dec_scrubs_remote();
7c673cae 2091 }
7c673cae
FG
2092}
2093
9f95a23c 2094void PG::scrub_reserve_replicas()
7c673cae 2095{
9f95a23c
TL
2096 ceph_assert(recovery_state.get_backfill_targets().empty());
2097 std::vector<std::pair<int, Message*>> messages;
2098 messages.reserve(get_actingset().size());
2099 epoch_t e = get_osdmap_epoch();
2100 for (set<pg_shard_t>::iterator i = get_actingset().begin();
2101 i != get_actingset().end();
2102 ++i) {
2103 if (*i == pg_whoami) continue;
2104 dout(10) << "scrub requesting reserve from osd." << *i << dendl;
2105 Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
2106 MOSDScrubReserve::REQUEST, pg_whoami);
2107 messages.push_back(std::make_pair(i->osd, m));
2108 }
2109 if (!messages.empty()) {
2110 osd->send_message_osd_cluster(messages, e);
2111 }
7c673cae
FG
2112}
2113
9f95a23c 2114void PG::scrub_unreserve_replicas()
7c673cae 2115{
9f95a23c
TL
2116 ceph_assert(recovery_state.get_backfill_targets().empty());
2117 std::vector<std::pair<int, Message*>> messages;
2118 messages.reserve(get_actingset().size());
2119 epoch_t e = get_osdmap_epoch();
2120 for (set<pg_shard_t>::iterator i = get_actingset().begin();
2121 i != get_actingset().end();
2122 ++i) {
2123 if (*i == pg_whoami) continue;
2124 dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
2125 Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
2126 MOSDScrubReserve::RELEASE, pg_whoami);
2127 messages.push_back(std::make_pair(i->osd, m));
2128 }
2129 if (!messages.empty()) {
2130 osd->send_message_osd_cluster(messages, e);
7c673cae 2131 }
7c673cae
FG
2132}
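// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// The reservation handshake driven by the handlers above, in rough
// message order (REQUEST/GRANT/REJECT/RELEASE are the MOSDScrubReserve
// ops seen in this file; the timeline itself is an editor's summary):
//
//   primary                              replica
//     | -- REQUEST ----------------------> | handle_scrub_reserve_request()
//     |                                    |   inc_scrubs_remote() if allowed
//     | <---------- GRANT (or REJECT) ---- |
//     | handle_scrub_reserve_grant()/      |
//     | handle_scrub_reserve_reject()      |
//     |   ... scrub runs ...               |
//     | -- RELEASE ----------------------> | handle_scrub_reserve_release()
//     |                                    |   clear_scrub_reserved()
//
// A single REJECT sets scrubber.reserve_failed, so one declining replica
// keeps the whole PG from being scheduled for scrubbing this round.
// ---- end sketch ----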
2133
9f95a23c 2134void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
7c673cae 2135{
9f95a23c
TL
2136 ObjectStore::Transaction t;
2137 eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
2138 for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
2139 i != rollback_obs.end();
2140 ++i) {
2141 if (i->generation < trimmed_to.version) {
2142 dout(10) << __func__ << " osd." << osd->whoami
2143 << " pg " << info.pgid
2144 << " found obsolete rollback obj "
2145 << *i << " generation < trimmed_to "
2146 << trimmed_to
2147 << "...repaired" << dendl;
2148 t.remove(coll, *i);
2149 }
2150 }
2151 if (!t.empty()) {
2152 derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
2153 << dendl;
2154 osd->store->queue_transaction(ch, std::move(t), NULL);
2155 }
7c673cae
FG
2156}
2157
9f95a23c 2158void PG::_scan_snaps(ScrubMap &smap)
7c673cae 2159{
9f95a23c
TL
2160 hobject_t head;
2161 SnapSet snapset;
7c673cae 2162
9f95a23c
TL
2163 // The test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
2164 // that the caller used clean_meta_map() and that it works properly.
2165 dout(20) << __func__ << " start" << dendl;
7c673cae 2166
9f95a23c
TL
2167 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2168 i != smap.objects.rend();
2169 ++i) {
2170 const hobject_t &hoid = i->first;
2171 ScrubMap::object &o = i->second;
7c673cae 2172
9f95a23c 2173 dout(20) << __func__ << " " << hoid << dendl;
7c673cae 2174
9f95a23c
TL
2175 ceph_assert(!hoid.is_snapdir());
2176 if (hoid.is_head()) {
2177 // parse the SnapSet
2178 bufferlist bl;
2179 if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
2180 continue;
2181 }
2182 bl.push_back(o.attrs[SS_ATTR]);
2183 auto p = bl.cbegin();
2184 try {
2185 decode(snapset, p);
2186 } catch(...) {
2187 continue;
2188 }
2189 head = hoid.get_head();
2190 continue;
2191 }
2192 if (hoid.snap < CEPH_MAXSNAP) {
2193 // check and if necessary fix snap_mapper
2194 if (hoid.get_head() != head) {
2195 derr << __func__ << " no head for " << hoid << " (have " << head << ")"
2196 << dendl;
2197 continue;
2198 }
2199 set<snapid_t> obj_snaps;
2200 auto p = snapset.clone_snaps.find(hoid.snap);
2201 if (p == snapset.clone_snaps.end()) {
2202 derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
2203 << dendl;
2204 continue;
2205 }
2206 obj_snaps.insert(p->second.begin(), p->second.end());
2207 set<snapid_t> cur_snaps;
2208 int r = snap_mapper.get_snaps(hoid, &cur_snaps);
2209 if (r != 0 && r != -ENOENT) {
2210 derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
2211 ceph_abort();
2212 }
2213 if (r == -ENOENT || cur_snaps != obj_snaps) {
2214 ObjectStore::Transaction t;
2215 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2216 if (r == 0) {
2217 r = snap_mapper.remove_oid(hoid, &_t);
2218 if (r != 0) {
2219 derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
2220 << dendl;
2221 ceph_abort();
2222 }
2223 osd->clog->error() << "osd." << osd->whoami
2224 << " found snap mapper error on pg "
2225 << info.pgid
2226 << " oid " << hoid << " snaps in mapper: "
2227 << cur_snaps << ", oi: "
2228 << obj_snaps
2229 << "...repaired";
2230 } else {
2231 osd->clog->error() << "osd." << osd->whoami
2232 << " found snap mapper error on pg "
2233 << info.pgid
2234 << " oid " << hoid << " snaps missing in mapper"
2235 << ", should be: "
2236 << obj_snaps
2237 << " was " << cur_snaps << " r " << r
2238 << "...repaired";
2239 }
2240 snap_mapper.add_oid(hoid, obj_snaps, &_t);
7c673cae 2241
9f95a23c
TL
2242 // wait for repair to apply to avoid confusing other bits of the system.
2243 {
2244 ceph::condition_variable my_cond;
2245 ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
2246 int r = 0;
2247 bool done;
2248 t.register_on_applied_sync(
2249 new C_SafeCond(my_lock, my_cond, &done, &r));
2250 r = osd->store->queue_transaction(ch, std::move(t));
2251 if (r != 0) {
2252 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2253 << dendl;
2254 } else {
2255 std::unique_lock l{my_lock};
2256 my_cond.wait(l, [&done] { return done;});
2257 }
2258 }
2259 }
2260 }
7c673cae 2261 }
7c673cae
FG
2262}
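// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// The "queue a transaction, then block until it is applied" idiom used
// above, in isolation; C_SafeCond wires a Context up to exactly this
// kind of mutex/condvar/flag triple. Names here are hypothetical.
#if 0
#include <condition_variable>
#include <mutex>

struct WaitForApplied {
  std::mutex lock;
  std::condition_variable cond;
  bool done = false;

  void on_applied() {           // invoked from the on_applied_sync hook
    std::lock_guard<std::mutex> l(lock);
    done = true;
    cond.notify_all();
  }
  void wait() {                 // called by the thread that queued the txn
    std::unique_lock<std::mutex> l(lock);
    cond.wait(l, [this] { return done; });
  }
};
#endif
// ---- end sketch ----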
2263
9f95a23c 2264void PG::_repair_oinfo_oid(ScrubMap &smap)
7c673cae 2265{
9f95a23c
TL
2266 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2267 i != smap.objects.rend();
2268 ++i) {
2269 const hobject_t &hoid = i->first;
2270 ScrubMap::object &o = i->second;
7c673cae 2271
9f95a23c
TL
2272 bufferlist bl;
2273 if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
2274 continue;
2275 }
2276 bl.push_back(o.attrs[OI_ATTR]);
2277 object_info_t oi;
2278 try {
2279 oi.decode(bl);
2280 } catch(...) {
2281 continue;
2282 }
2283 if (oi.soid != hoid) {
2284 ObjectStore::Transaction t;
2285 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2286 osd->clog->error() << "osd." << osd->whoami
2287 << " found object info error on pg "
2288 << info.pgid
2289 << " oid " << hoid << " oid in object info: "
2290 << oi.soid
2291 << "...repaired";
2292 // Fix object info
2293 oi.soid = hoid;
2294 bl.clear();
2295 encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
7c673cae 2296
9f95a23c
TL
2297 bufferptr bp(bl.c_str(), bl.length());
2298 o.attrs[OI_ATTR] = bp;
7c673cae 2299
9f95a23c
TL
2300 t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
2301 int r = osd->store->queue_transaction(ch, std::move(t));
2302 if (r != 0) {
2303 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2304 << dendl;
2305 }
7c673cae
FG
2306 }
2307 }
7c673cae 2308}
9f95a23c
TL
2309int PG::build_scrub_map_chunk(
2310 ScrubMap &map,
2311 ScrubMapBuilder &pos,
2312 hobject_t start,
2313 hobject_t end,
2314 bool deep,
2315 ThreadPool::TPHandle &handle)
7c673cae 2316{
9f95a23c
TL
2317 dout(10) << __func__ << " [" << start << "," << end << ") "
2318 << " pos " << pos
2319 << dendl;
11fdf7f2 2320
9f95a23c
TL
2321 // start
2322 while (pos.empty()) {
2323 pos.deep = deep;
2324 map.valid_through = info.last_update;
11fdf7f2 2325
9f95a23c
TL
2326 // objects
2327 vector<ghobject_t> rollback_obs;
2328 pos.ret = get_pgbackend()->objects_list_range(
2329 start,
2330 end,
2331 &pos.ls,
2332 &rollback_obs);
2333 if (pos.ret < 0) {
2334 dout(5) << "objects_list_range error: " << pos.ret << dendl;
2335 return pos.ret;
2336 }
2337 if (pos.ls.empty()) {
2338 break;
2339 }
2340 _scan_rollback_obs(rollback_obs);
2341 pos.pos = 0;
2342 return -EINPROGRESS;
2343 }
11fdf7f2 2344
9f95a23c
TL
2345 // scan objects
2346 while (!pos.done()) {
2347 int r = get_pgbackend()->be_scan_list(map, pos);
2348 if (r == -EINPROGRESS) {
2349 return r;
2350 }
2351 }
11fdf7f2 2352
9f95a23c
TL
2353 // finish
2354 dout(20) << __func__ << " finishing" << dendl;
2355 ceph_assert(pos.done());
2356 _repair_oinfo_oid(map);
2357 if (!is_primary()) {
2358 ScrubMap for_meta_scrub;
2359 // In case we restarted smaller chunk, clear old data
2360 scrubber.cleaned_meta_map.clear_from(scrubber.start);
2361 scrubber.cleaned_meta_map.insert(map);
2362 scrubber.clean_meta_map(for_meta_scrub);
2363 _scan_snaps(for_meta_scrub);
7c673cae 2364 }
9f95a23c
TL
2365
2366 dout(20) << __func__ << " done, got " << map.objects.size() << " items"
2367 << dendl;
7c673cae
FG
2368 return 0;
2369}
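// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// build_scrub_map_chunk() is a resumable step function: its cursor lives
// in the ScrubMapBuilder and -EINPROGRESS means "requeue me and call
// again". The real callers requeue onto the op queue; a synchronous
// driver (hypothetical) would look like this:
#if 0
#include <cerrno>

int step();             // stand-in for build_scrub_map_chunk(map, pos, ...)
void handle_error(int);

void drive_to_completion() {
  int r;
  do {
    r = step();         // each call makes bounded progress
  } while (r == -EINPROGRESS);
  if (r < 0)
    handle_error(r);
}
#endif
// ---- end sketch ----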
2370
9f95a23c
TL
2371void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
2372 if (!store)
2373 return;
2374 struct OnComplete : Context {
2375 std::unique_ptr<Scrub::Store> store;
2376 explicit OnComplete(
2377 std::unique_ptr<Scrub::Store> &&store)
2378 : store(std::move(store)) {}
2379 void finish(int) override {}
2380 };
2381 store->cleanup(t);
2382 t->register_on_complete(new OnComplete(std::move(store)));
2383 ceph_assert(!store);
2384}
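// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// The OnComplete trick above in miniature: move a resource into a
// completion callback so it stays alive until the transaction finishes,
// then let the callback's destruction free it. Names are hypothetical.
#if 0
#include <memory>

struct Resource { /* e.g. the Scrub::Store being retired */ };

struct Completion {
  std::unique_ptr<Resource> held;
  explicit Completion(std::unique_ptr<Resource>&& r) : held(std::move(r)) {}
  void finish() { /* nothing to do; deleting `this` frees `held` */ }
};
#endif
// ---- end sketch ----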
7c673cae 2385
9f95a23c
TL
2386void PG::repair_object(
2387 const hobject_t &soid,
2388 const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
2389 const set<pg_shard_t> &bad_peers)
2390{
2391 set<pg_shard_t> ok_shards;
2392 for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
d2e6a577 2393
9f95a23c
TL
2394 dout(10) << "repair_object " << soid
2395 << " bad_peers osd.{" << bad_peers << "},"
2396 << " ok_peers osd.{" << ok_shards << "}" << dendl;
11fdf7f2 2397
9f95a23c
TL
2398 const ScrubMap::object &po = ok_peers.back().first;
2399 eversion_t v;
2400 object_info_t oi;
2401 try {
2402 bufferlist bv;
2403 if (po.attrs.count(OI_ATTR)) {
2404 bv.push_back(po.attrs.find(OI_ATTR)->second);
2405 }
2406 auto bliter = bv.cbegin();
2407 decode(oi, bliter);
2408 } catch (...) {
2409 dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
2410 << soid << dendl;
2411 ceph_abort();
11fdf7f2
TL
2412 }
2413
9f95a23c
TL
2414 if (bad_peers.count(get_primary())) {
2415 // We should only be scrubbing if the PG is clean.
2416 ceph_assert(waiting_for_unreadable_object.empty());
2417 dout(10) << __func__ << ": primary = " << get_primary() << dendl;
11fdf7f2
TL
2418 }
2419
9f95a23c
TL
2420 /* No need to pass ok_peers, they must not be missing the object, so
2421 * force_object_missing will add them to missing_loc anyway */
2422 recovery_state.force_object_missing(bad_peers, soid, oi.version);
7c673cae
FG
2423}
2424
9f95a23c
TL
2425/* replica_scrub
2426 *
2427 * Wait for last_update_applied to match msg->scrub_to as above. Wait
2428 * for pushes to complete in case of recent recovery. Build a single
2429 * scrubmap of objects that are in the range [msg->start, msg->end).
2430 */
2431void PG::replica_scrub(
2432 OpRequestRef op,
2433 ThreadPool::TPHandle &handle)
7c673cae 2434{
9f95a23c
TL
2435 auto msg = op->get_req<MOSDRepScrub>();
2436 ceph_assert(!scrubber.active_rep_scrub);
2437 dout(7) << "replica_scrub" << dendl;
7c673cae 2438
9f95a23c
TL
2439 if (msg->map_epoch < info.history.same_interval_since) {
2440 dout(10) << "replica_scrub discarding old replica_scrub from "
2441 << msg->map_epoch << " < " << info.history.same_interval_since
2442 << dendl;
2443 return;
7c673cae 2444 }
7c673cae 2445
9f95a23c
TL
2446 ceph_assert(msg->chunky);
2447 if (active_pushes > 0) {
2448 dout(10) << "waiting for active pushes to finish" << dendl;
2449 scrubber.active_rep_scrub = op;
7c673cae 2450 return;
7c673cae 2451 }
7c673cae 2452
9f95a23c
TL
2453 scrubber.state = Scrubber::BUILD_MAP_REPLICA;
2454 scrubber.replica_scrub_start = msg->min_epoch;
2455 scrubber.start = msg->start;
2456 scrubber.end = msg->end;
2457 scrubber.max_end = msg->end;
2458 scrubber.deep = msg->deep;
2459 scrubber.epoch_start = info.history.same_interval_since;
2460 if (msg->priority) {
2461 scrubber.priority = msg->priority;
7c673cae 2462 } else {
9f95a23c 2463 scrubber.priority = get_scrub_priority();
7c673cae 2464 }
7c673cae 2465
9f95a23c
TL
2466 scrub_can_preempt = msg->allow_preemption;
2467 scrub_preempted = false;
2468 scrubber.replica_scrubmap_pos.reset();
7c673cae 2469
9f95a23c 2470 requeue_scrub(msg->high_priority);
7c673cae
FG
2471}
2472
9f95a23c
TL
2473/* Scrub:
2474 * PG_STATE_SCRUBBING is set when the scrub is queued
2475 *
2476 * scrub will be chunky if all OSDs in PG support chunky scrub
2477 * scrub will fail if OSDs are too old.
7c673cae 2478 */
9f95a23c 2479void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
7c673cae 2480{
9f95a23c
TL
2481 OSDService *osds = osd;
2482 double scrub_sleep = osds->osd->scrub_sleep_time(scrubber.must_scrub);
2483 if (scrub_sleep > 0 &&
2484 (scrubber.state == PG::Scrubber::NEW_CHUNK ||
2485 scrubber.state == PG::Scrubber::INACTIVE) &&
2486 scrubber.needs_sleep) {
2487 ceph_assert(!scrubber.sleeping);
2488 dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
7c673cae 2489
9f95a23c
TL
2490 // Do an async sleep so we don't block the op queue
2491 spg_t pgid = get_pgid();
2492 int state = scrubber.state;
2493 auto scrub_requeue_callback =
2494 new LambdaContext([osds, pgid, state](int r) {
2495 PGRef pg = osds->osd->lookup_lock_pg(pgid);
2496 if (pg == nullptr) {
2497 lgeneric_dout(osds->osd->cct, 20)
2498 << "scrub_requeue_callback: Could not find "
2499 << "PG " << pgid << " can't complete scrub requeue after sleep"
2500 << dendl;
2501 return;
494da23a 2502 }
9f95a23c
TL
2503 pg->scrubber.sleeping = false;
2504 pg->scrubber.needs_sleep = false;
2505 lgeneric_dout(pg->cct, 20)
2506 << "scrub_requeue_callback: slept for "
2507 << ceph_clock_now() - pg->scrubber.sleep_start
2508 << ", re-queuing scrub with state " << state << dendl;
2509 pg->scrub_queued = false;
2510 pg->requeue_scrub();
2511 pg->scrubber.sleep_start = utime_t();
2512 pg->unlock();
2513 });
2514 std::lock_guard l(osd->sleep_lock);
2515 osd->sleep_timer.add_event_after(scrub_sleep,
2516 scrub_requeue_callback);
2517 scrubber.sleeping = true;
2518 scrubber.sleep_start = ceph_clock_now();
2519 return;
2520 }
2521 if (pg_has_reset_since(queued)) {
2522 return;
2523 }
2524 ceph_assert(scrub_queued);
2525 scrub_queued = false;
2526 scrubber.needs_sleep = true;
7c673cae 2527
9f95a23c
TL
2528 // for the replica
2529 if (!is_primary() &&
2530 scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
2531 chunky_scrub(handle);
2532 return;
7c673cae 2533 }
494da23a 2534
9f95a23c
TL
2535 if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
2536 dout(10) << "scrub -- not primary or active or not clean" << dendl;
2537 state_clear(PG_STATE_SCRUBBING);
2538 state_clear(PG_STATE_REPAIR);
2539 state_clear(PG_STATE_DEEP_SCRUB);
2540 publish_stats_to_osd();
2541 return;
2542 }
7c673cae 2543
9f95a23c
TL
2544 if (!scrubber.active) {
2545 ceph_assert(recovery_state.get_backfill_targets().empty());
3efd9988 2546
9f95a23c 2547 scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
3efd9988 2548
9f95a23c
TL
2549 dout(10) << "starting a new chunky scrub" << dendl;
2550 }
7c673cae 2551
9f95a23c 2552 chunky_scrub(handle);
7c673cae
FG
2553}
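// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// Design note on the sleep callback above: it deliberately captures the
// pg id rather than a PG pointer and re-resolves the PG with
// lookup_lock_pg() when the timer fires, because the PG may have been
// deleted while asleep (hence the nullptr path that only logs). The same
// capture pattern, with hypothetical names:
#if 0
auto requeue_cb = [service, pgid /* not `this`! */](int) {
  auto pg = service->lookup_lock_pg(pgid);  // revalidate under lock
  if (!pg)
    return;            // PG went away while we slept
  pg->resume_work();   // stand-in for requeue_scrub()
  pg->unlock();
};
#endif
// ---- end sketch ----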
2554
9f95a23c
TL
2555/*
2556 * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
2557 * chunk.
2558 *
2559 * The object store is partitioned into chunks which end on hash boundaries. For
2560 * each chunk, the following logic is performed:
2561 *
2562 * (1) Block writes on the chunk
2563 * (2) Request maps from replicas
2564 * (3) Wait for pushes to be applied (after recovery)
2565 * (4) Wait for writes to flush on the chunk
2566 * (5) Wait for maps from replicas
2567 * (6) Compare / repair all scrub maps
2568 * (7) Wait for digest updates to apply
2569 *
2570 * This logic is encoded in the mostly linear state machine:
2571 *
2572 * +------------------+
2573 * _________v__________ |
2574 * | | |
2575 * | INACTIVE | |
2576 * |____________________| |
2577 * | |
2578 * | +----------+ |
2579 * _________v___v______ | |
2580 * | | | |
2581 * | NEW_CHUNK | | |
2582 * |____________________| | |
2583 * | | |
2584 * _________v__________ | |
2585 * | | | |
2586 * | WAIT_PUSHES | | |
2587 * |____________________| | |
2588 * | | |
2589 * _________v__________ | |
2590 * | | | |
2591 * | WAIT_LAST_UPDATE | | |
2592 * |____________________| | |
2593 * | | |
2594 * _________v__________ | |
2595 * | | | |
2596 * | BUILD_MAP | | |
2597 * |____________________| | |
2598 * | | |
2599 * _________v__________ | |
2600 * | | | |
2601 * | WAIT_REPLICAS | | |
2602 * |____________________| | |
2603 * | | |
2604 * _________v__________ | |
2605 * | | | |
2606 * | COMPARE_MAPS | | |
2607 * |____________________| | |
2608 * | | |
2609 * | | |
2610 * _________v__________ | |
2611 * | | | |
2612 * |WAIT_DIGEST_UPDATES | | |
2613 * |____________________| | |
2614 * | | | |
2615 * | +----------+ |
2616 * _________v__________ |
2617 * | | |
2618 * | FINISH | |
2619 * |____________________| |
2620 * | |
2621 * +------------------+
2622 *
2623 * The primary determines the last update from the subset by walking the log. If
2624 * it sees a log entry pertaining to an object in the chunk, it tells the replicas
2625 * to wait until that update is applied before building a scrub map. Both the
2626 * primary and replicas will wait for any active pushes to be applied.
2627 *
2628 * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
2629 *
2630 * scrubber.state encodes the current state of the scrub (refer to state diagram
2631 * for details).
2632 */
2633void PG::chunky_scrub(ThreadPool::TPHandle &handle)
11fdf7f2 2634{
9f95a23c
TL
2635 // check for map changes
2636 if (scrubber.is_chunky_scrub_active()) {
2637 if (scrubber.epoch_start != info.history.same_interval_since) {
2638 dout(10) << "scrub pg changed, aborting" << dendl;
2639 scrub_clear_state();
2640 scrub_unreserve_replicas();
2641 return;
2642 }
2643 }
11fdf7f2 2644
9f95a23c
TL
2645 bool done = false;
2646 int ret;
11fdf7f2 2647
9f95a23c
TL
2648 while (!done) {
2649 dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
2650 << " [" << scrubber.start << "," << scrubber.end << ")"
2651 << " max_end " << scrubber.max_end << dendl;
7c673cae 2652
9f95a23c
TL
2653 switch (scrubber.state) {
2654 case PG::Scrubber::INACTIVE:
2655 dout(10) << "scrub start" << dendl;
2656 ceph_assert(is_primary());
7c673cae 2657
9f95a23c
TL
2658 publish_stats_to_osd();
2659 scrubber.epoch_start = info.history.same_interval_since;
2660 scrubber.active = true;
7c673cae 2661
9f95a23c
TL
2662 {
2663 ObjectStore::Transaction t;
2664 scrubber.cleanup_store(&t);
2665 scrubber.store.reset(Scrub::Store::create(osd->store, &t,
2666 info.pgid, coll));
2667 osd->store->queue_transaction(ch, std::move(t), nullptr);
2668 }
7c673cae 2669
9f95a23c
TL
2670 // Don't include temporary objects when scrubbing
2671 scrubber.start = info.pgid.pgid.get_hobj_start();
2672 scrubber.state = PG::Scrubber::NEW_CHUNK;
7c673cae 2673
9f95a23c
TL
2674 {
2675 bool repair = state_test(PG_STATE_REPAIR);
2676 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
2677 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
2678 stringstream oss;
2679 oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
2680 osd->clog->debug(oss);
2681 }
7c673cae 2682
9f95a23c
TL
2683 scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
2684 "osd_scrub_max_preemptions");
2685 scrubber.preempt_divisor = 1;
2686 break;
7c673cae 2687
9f95a23c
TL
2688 case PG::Scrubber::NEW_CHUNK:
2689 scrubber.primary_scrubmap = ScrubMap();
2690 scrubber.received_maps.clear();
7c673cae 2691
9f95a23c
TL
2692 // begin (possible) preemption window
2693 if (scrub_preempted) {
2694 scrubber.preempt_left--;
2695 scrubber.preempt_divisor *= 2;
2696 dout(10) << __func__ << " preempted, " << scrubber.preempt_left
2697 << " left" << dendl;
2698 scrub_preempted = false;
2699 }
2700 scrub_can_preempt = scrubber.preempt_left > 0;
7c673cae 2701
9f95a23c
TL
2702 {
2703 /* get the start and end of our scrub chunk
2704 *
2705 * Our scrub chunk has an important restriction we're going to need to
2706 * respect. We can't let head be start or end.
2707 * Using a half-open interval means that if end == head,
2708 * we'd scrub/lock head and the clone right next to head in different
2709 * chunks which would allow us to miss clones created between
2710 * scrubbing that chunk and scrubbing the chunk including head.
2711 * This isn't true for any of the other clones since clones can
2712 * only be created "just to the left of" head. There is one exception
2713 * to this: promotion of clones which always happens to the left of the
2714 * left-most clone, but promote_object checks the scrubber in that
2715 * case, so it should be ok. Also, it's ok to "miss" clones at the
2716 * left end of the range if we are a tier because they may legitimately
2717 * not exist (see _scrub).
2718 */
2719 int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
2720 scrubber.preempt_divisor);
2721 int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
2722 scrubber.preempt_divisor);
2723 hobject_t start = scrubber.start;
2724 hobject_t candidate_end;
2725 vector<hobject_t> objects;
2726 ret = get_pgbackend()->objects_list_partial(
2727 start,
2728 min,
2729 max,
2730 &objects,
2731 &candidate_end);
2732 ceph_assert(ret >= 0);
7c673cae 2733
9f95a23c
TL
2734 if (!objects.empty()) {
2735 hobject_t back = objects.back();
2736 while (candidate_end.is_head() &&
2737 candidate_end == back.get_head()) {
2738 candidate_end = back;
2739 objects.pop_back();
2740 if (objects.empty()) {
2741 ceph_assert(0 ==
2742 "Somehow we got more than 2 objects which"
2743 "have the same head but are not clones");
2744 }
2745 back = objects.back();
2746 }
2747 if (candidate_end.is_head()) {
2748 ceph_assert(candidate_end != back.get_head());
2749 candidate_end = candidate_end.get_object_boundary();
2750 }
2751 } else {
2752 ceph_assert(candidate_end.is_max());
2753 }
7c673cae 2754
9f95a23c
TL
2755 if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
2756 // we'll be requeued by whatever made us unavailable for scrub
2757 dout(10) << __func__ << ": scrub blocked somewhere in range "
2758 << "[" << scrubber.start << ", " << candidate_end << ")"
2759 << dendl;
2760 done = true;
2761 break;
2762 }
2763 scrubber.end = candidate_end;
2764 if (scrubber.end > scrubber.max_end)
2765 scrubber.max_end = scrubber.end;
2766 }
7c673cae 2767
9f95a23c
TL
2768 // walk the log to find the latest update that affects our chunk
2769 scrubber.subset_last_update = eversion_t();
2770 for (auto p = projected_log.log.rbegin();
2771 p != projected_log.log.rend();
2772 ++p) {
2773 if (p->soid >= scrubber.start &&
2774 p->soid < scrubber.end) {
2775 scrubber.subset_last_update = p->version;
2776 break;
2777 }
2778 }
2779 if (scrubber.subset_last_update == eversion_t()) {
2780 for (list<pg_log_entry_t>::const_reverse_iterator p =
2781 recovery_state.get_pg_log().get_log().log.rbegin();
2782 p != recovery_state.get_pg_log().get_log().log.rend();
2783 ++p) {
2784 if (p->soid >= scrubber.start &&
2785 p->soid < scrubber.end) {
2786 scrubber.subset_last_update = p->version;
2787 break;
2788 }
2789 }
2790 }
7c673cae 2791
9f95a23c
TL
2792 scrubber.state = PG::Scrubber::WAIT_PUSHES;
2793 break;
7c673cae 2794
9f95a23c
TL
2795 case PG::Scrubber::WAIT_PUSHES:
2796 if (active_pushes == 0) {
2797 scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
2798 } else {
2799 dout(15) << "wait for pushes to apply" << dendl;
2800 done = true;
2801 }
2802 break;
7c673cae 2803
9f95a23c
TL
2804 case PG::Scrubber::WAIT_LAST_UPDATE:
2805 if (recovery_state.get_last_update_applied() <
2806 scrubber.subset_last_update) {
2807 // will be requeued by op_applied
2808 dout(15) << "wait for EC read/modify/writes to queue" << dendl;
2809 done = true;
2810 break;
2811 }
7c673cae 2812
9f95a23c
TL
2813 // ask replicas to scan
2814 scrubber.waiting_on_whom.insert(pg_whoami);
7c673cae 2815
9f95a23c
TL
2816 // request maps from replicas
2817 for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
2818 i != get_acting_recovery_backfill().end();
2819 ++i) {
2820 if (*i == pg_whoami) continue;
2821 _request_scrub_map(*i, scrubber.subset_last_update,
2822 scrubber.start, scrubber.end, scrubber.deep,
2823 scrubber.preempt_left > 0);
2824 scrubber.waiting_on_whom.insert(*i);
2825 }
2826 dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
2827 << dendl;
7c673cae 2828
9f95a23c
TL
2829 scrubber.state = PG::Scrubber::BUILD_MAP;
2830 scrubber.primary_scrubmap_pos.reset();
2831 break;
7c673cae 2832
9f95a23c
TL
2833 case PG::Scrubber::BUILD_MAP:
2834 ceph_assert(recovery_state.get_last_update_applied() >=
2835 scrubber.subset_last_update);
224ce89b 2836
9f95a23c
TL
2837 // build my own scrub map
2838 if (scrub_preempted) {
2839 dout(10) << __func__ << " preempted" << dendl;
2840 scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
2841 break;
2842 }
2843 ret = build_scrub_map_chunk(
2844 scrubber.primary_scrubmap,
2845 scrubber.primary_scrubmap_pos,
2846 scrubber.start, scrubber.end,
2847 scrubber.deep,
2848 handle);
2849 if (ret == -EINPROGRESS) {
2850 requeue_scrub();
2851 done = true;
2852 break;
2853 }
2854 scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
2855 break;
b32b8144 2856
9f95a23c
TL
2857 case PG::Scrubber::BUILD_MAP_DONE:
2858 if (scrubber.primary_scrubmap_pos.ret < 0) {
2859 dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
2860 << ", aborting" << dendl;
2861 scrub_clear_state();
2862 scrub_unreserve_replicas();
2863 return;
2864 }
2865 dout(10) << __func__ << " waiting_on_whom was "
2866 << scrubber.waiting_on_whom << dendl;
2867 ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
2868 scrubber.waiting_on_whom.erase(pg_whoami);
7c673cae 2869
9f95a23c
TL
2870 scrubber.state = PG::Scrubber::WAIT_REPLICAS;
2871 break;
7c673cae 2872
9f95a23c
TL
2873 case PG::Scrubber::WAIT_REPLICAS:
2874 if (!scrubber.waiting_on_whom.empty()) {
2875 // will be requeued by do_replica_scrub_map
2876 dout(10) << "wait for replicas to build scrub map" << dendl;
2877 done = true;
2878 break;
2879 }
2880 // end (possible) preemption window
2881 scrub_can_preempt = false;
2882 if (scrub_preempted) {
2883 dout(10) << __func__ << " preempted, restarting chunk" << dendl;
2884 scrubber.state = PG::Scrubber::NEW_CHUNK;
2885 } else {
2886 scrubber.state = PG::Scrubber::COMPARE_MAPS;
2887 }
2888 break;
7c673cae 2889
9f95a23c
TL
2890 case PG::Scrubber::COMPARE_MAPS:
2891 ceph_assert(recovery_state.get_last_update_applied() >=
2892 scrubber.subset_last_update);
2893 ceph_assert(scrubber.waiting_on_whom.empty());
7c673cae 2894
9f95a23c
TL
2895 scrub_compare_maps();
2896 scrubber.start = scrubber.end;
2897 scrubber.run_callbacks();
7c673cae 2898
9f95a23c
TL
2899 // requeue the writes from the chunk that just finished
2900 requeue_ops(waiting_for_scrub);
7c673cae 2901
9f95a23c 2902 scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
7c673cae 2903
9f95a23c 2904 // fall-thru
7c673cae 2905
9f95a23c
TL
2906 case PG::Scrubber::WAIT_DIGEST_UPDATES:
2907 if (scrubber.num_digest_updates_pending) {
2908 dout(10) << __func__ << " waiting on "
2909 << scrubber.num_digest_updates_pending
2910 << " digest updates" << dendl;
2911 done = true;
2912 break;
2913 }
7c673cae 2914
9f95a23c
TL
2915 scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
2916 "osd_scrub_max_preemptions");
2917 scrubber.preempt_divisor = 1;
7c673cae 2918
9f95a23c
TL
2919 if (!(scrubber.end.is_max())) {
2920 scrubber.state = PG::Scrubber::NEW_CHUNK;
2921 requeue_scrub();
2922 done = true;
2923 } else {
2924 scrubber.state = PG::Scrubber::FINISH;
2925 }
7c673cae 2926
9f95a23c 2927 break;
224ce89b 2928
9f95a23c
TL
2929 case PG::Scrubber::FINISH:
2930 scrub_finish();
2931 scrubber.state = PG::Scrubber::INACTIVE;
2932 done = true;
7c673cae 2933
9f95a23c
TL
2934 if (!snap_trimq.empty()) {
2935 dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
2936 snap_trimmer_scrub_complete();
2937 }
7c673cae 2938
9f95a23c 2939 break;
7c673cae 2940
9f95a23c
TL
2941 case PG::Scrubber::BUILD_MAP_REPLICA:
2942 // build my own scrub map
2943 if (scrub_preempted) {
2944 dout(10) << __func__ << " preempted" << dendl;
2945 ret = 0;
11fdf7f2 2946 } else {
9f95a23c
TL
2947 ret = build_scrub_map_chunk(
2948 scrubber.replica_scrubmap,
2949 scrubber.replica_scrubmap_pos,
2950 scrubber.start, scrubber.end,
2951 scrubber.deep,
2952 handle);
11fdf7f2 2953 }
9f95a23c
TL
2954 if (ret == -EINPROGRESS) {
2955 requeue_scrub();
2956 done = true;
2957 break;
11fdf7f2 2958 }
9f95a23c
TL
2959 // reply
2960 {
2961 MOSDRepScrubMap *reply = new MOSDRepScrubMap(
2962 spg_t(info.pgid.pgid, get_primary().shard),
2963 scrubber.replica_scrub_start,
2964 pg_whoami);
2965 reply->preempted = scrub_preempted;
2966 ::encode(scrubber.replica_scrubmap, reply->get_data());
2967 osd->send_message_osd_cluster(
2968 get_primary().osd, reply,
2969 scrubber.replica_scrub_start);
2970 }
2971 scrub_preempted = false;
2972 scrub_can_preempt = false;
2973 scrubber.state = PG::Scrubber::INACTIVE;
2974 scrubber.replica_scrubmap = ScrubMap();
2975 scrubber.replica_scrubmap_pos = ScrubMapBuilder();
2976 scrubber.start = hobject_t();
2977 scrubber.end = hobject_t();
2978 scrubber.max_end = hobject_t();
2979 done = true;
2980 break;
7c673cae 2981
9f95a23c
TL
2982 default:
2983 ceph_abort();
7c673cae 2984 }
7c673cae 2985 }
9f95a23c
TL
2986 dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
2987 << " [" << scrubber.start << "," << scrubber.end << ")"
2988 << " max_end " << scrubber.max_end << dendl;
7c673cae
FG
2989}
2990
9f95a23c 2991bool PG::write_blocked_by_scrub(const hobject_t& soid)
7c673cae 2992{
9f95a23c
TL
2993 if (soid < scrubber.start || soid >= scrubber.end) {
2994 return false;
7c673cae 2995 }
9f95a23c
TL
2996 if (scrub_can_preempt) {
2997 if (!scrub_preempted) {
2998 dout(10) << __func__ << " " << soid << " preempted" << dendl;
2999 scrub_preempted = true;
3000 } else {
3001 dout(10) << __func__ << " " << soid << " already preempted" << dendl;
7c673cae 3002 }
9f95a23c 3003 return false;
7c673cae 3004 }
9f95a23c 3005 return true;
7c673cae
FG
3006}
3007
9f95a23c 3008bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
7c673cae 3009{
9f95a23c
TL
3010 // does [start, end] intersect [scrubber.start, scrubber.max_end)
3011 return (start < scrubber.max_end &&
3012 end >= scrubber.start);
7c673cae
FG
3013}
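// ---- Illustrative sketch (editor's addition, not part of PG.cc) ----
// The test above is the standard closed-vs-half-open interval overlap
// check: [start, end] meets [scrubber.start, scrubber.max_end) iff
// start < max_end && end >= scrubber.start. With a scrub window of
// [10, 20): [0, 9] misses (9 < 10), [0, 10] touches (10 >= 10),
// [19, 25] touches (19 < 20), and [20, 25] misses (20 is excluded).
// ---- end sketch ----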
3014
9f95a23c 3015void PG::scrub_clear_state(bool has_error)
7c673cae 3016{
9f95a23c
TL
3017 ceph_assert(is_locked());
3018 state_clear(PG_STATE_SCRUBBING);
3019 if (!has_error)
3020 state_clear(PG_STATE_REPAIR);
3021 state_clear(PG_STATE_DEEP_SCRUB);
3022 publish_stats_to_osd();
7c673cae 3023
9f95a23c
TL
3024 // local -> nothing.
3025 if (scrubber.local_reserved) {
3026 osd->dec_scrubs_local();
3027 scrubber.local_reserved = false;
3028 scrubber.reserved_peers.clear();
7c673cae
FG
3029 }
3030
9f95a23c
TL
3031 requeue_ops(waiting_for_scrub);
3032
3033 scrubber.reset();
7c673cae 3034
9f95a23c
TL
3035 // type-specific state clear
3036 _scrub_clear_state();
7c673cae
FG
3037}
3038
9f95a23c 3039void PG::scrub_compare_maps()
7c673cae 3040{
9f95a23c 3041 dout(10) << __func__ << " has maps, analyzing" << dendl;
11fdf7f2 3042
9f95a23c
TL
3043 // construct authoritative scrub map for type specific scrubbing
3044 scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
3045 map<hobject_t,
3046 pair<std::optional<uint32_t>,
3047 std::optional<uint32_t>>> missing_digest;
11fdf7f2 3048
9f95a23c
TL
3049 map<pg_shard_t, ScrubMap *> maps;
3050 maps[pg_whoami] = &scrubber.primary_scrubmap;
11fdf7f2 3051
9f95a23c
TL
3052 for (const auto& i : get_acting_recovery_backfill()) {
3053 if (i == pg_whoami) continue;
3054 dout(2) << __func__ << " replica " << i << " has "
3055 << scrubber.received_maps[i].objects.size()
3056 << " items" << dendl;
3057 maps[i] = &scrubber.received_maps[i];
7c673cae
FG
3058 }
3059
9f95a23c 3060 set<hobject_t> master_set;
7c673cae 3061
9f95a23c
TL
3062 // Construct master set
3063 for (const auto& map : maps) {
3064 for (const auto& i : map.second->objects) {
3065 master_set.insert(i.first);
3066 }
7c673cae
FG
3067 }
3068
9f95a23c
TL
3069 stringstream ss;
3070 get_pgbackend()->be_omap_checks(maps, master_set,
3071 scrubber.omap_stats, ss);
7c673cae 3072
9f95a23c
TL
3073 if (!ss.str().empty()) {
3074 osd->clog->warn(ss);
3075 }
7c673cae 3076
9f95a23c
TL
3077 if (recovery_state.get_acting().size() > 1) {
3078 dout(10) << __func__ << " comparing replica scrub maps" << dendl;
7c673cae 3079
9f95a23c
TL
3080 // Map from object with errors to good peer
3081 map<hobject_t, list<pg_shard_t>> authoritative;
7c673cae 3082
9f95a23c
TL
3083 dout(2) << __func__ << get_primary() << " has "
3084 << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
7c673cae 3085
9f95a23c
TL
3086 ss.str("");
3087 ss.clear();
7c673cae 3088
9f95a23c
TL
3089 get_pgbackend()->be_compare_scrubmaps(
3090 maps,
3091 master_set,
3092 state_test(PG_STATE_REPAIR),
3093 scrubber.missing,
3094 scrubber.inconsistent,
3095 authoritative,
3096 missing_digest,
3097 scrubber.shallow_errors,
3098 scrubber.deep_errors,
3099 scrubber.store.get(),
3100 info.pgid, recovery_state.get_acting(),
3101 ss);
3102 dout(2) << ss.str() << dendl;
7c673cae 3103
9f95a23c
TL
3104 if (!ss.str().empty()) {
3105 osd->clog->error(ss);
3106 }
7c673cae 3107
9f95a23c
TL
3108 for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
3109 i != authoritative.end();
3110 ++i) {
3111 list<pair<ScrubMap::object, pg_shard_t> > good_peers;
3112 for (list<pg_shard_t>::const_iterator j = i->second.begin();
3113 j != i->second.end();
3114 ++j) {
3115 good_peers.emplace_back(maps[*j]->objects[i->first], *j);
3116 }
3117 scrubber.authoritative.emplace(i->first, good_peers);
3118 }
7c673cae 3119
9f95a23c
TL
3120 for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
3121 i != authoritative.end();
3122 ++i) {
3123 scrubber.cleaned_meta_map.objects.erase(i->first);
3124 scrubber.cleaned_meta_map.objects.insert(
3125 *(maps[i->second.back()]->objects.find(i->first))
3126 );
3127 }
3128 }
7c673cae 3129
9f95a23c
TL
3130 ScrubMap for_meta_scrub;
3131 scrubber.clean_meta_map(for_meta_scrub);
7c673cae 3132
9f95a23c
TL
3133 // ok, do the pg-type specific scrubbing
3134 scrub_snapshot_metadata(for_meta_scrub, missing_digest);
3135 // Called here on the primary, which can use the authoritative map built above
3136 _scan_snaps(for_meta_scrub);
3137 if (!scrubber.store->empty()) {
3138 if (state_test(PG_STATE_REPAIR)) {
3139 dout(10) << __func__ << ": discarding scrub results" << dendl;
3140 scrubber.store->flush(nullptr);
3141 } else {
3142 dout(10) << __func__ << ": updating scrub object" << dendl;
3143 ObjectStore::Transaction t;
3144 scrubber.store->flush(&t);
3145 osd->store->queue_transaction(ch, std::move(t), nullptr);
3146 }
3147 }
11fdf7f2
TL
3148}
3149
9f95a23c 3150bool PG::scrub_process_inconsistent()
7c673cae 3151{
9f95a23c
TL
3152 dout(10) << __func__ << ": checking authoritative" << dendl;
3153 bool repair = state_test(PG_STATE_REPAIR);
3154 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
3155 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
3156
3157 // scrubber.authoritative only stores objects which are missing or inconsistent.
3158 if (!scrubber.authoritative.empty()) {
3159 stringstream ss;
3160 ss << info.pgid << " " << mode << " "
3161 << scrubber.missing.size() << " missing, "
3162 << scrubber.inconsistent.size() << " inconsistent objects";
3163 dout(2) << ss.str() << dendl;
3164 osd->clog->error(ss);
3165 if (repair) {
3166 state_clear(PG_STATE_CLEAN);
3167 for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
3168 scrubber.authoritative.begin();
3169 i != scrubber.authoritative.end();
3170 ++i) {
3171 auto missing_entry = scrubber.missing.find(i->first);
3172 if (missing_entry != scrubber.missing.end()) {
3173 repair_object(
3174 i->first,
3175 i->second,
3176 missing_entry->second);
3177 scrubber.fixed += missing_entry->second.size();
3178 }
3179 if (scrubber.inconsistent.count(i->first)) {
3180 repair_object(
3181 i->first,
3182 i->second,
3183 scrubber.inconsistent[i->first]);
3184 scrubber.fixed += scrubber.inconsistent[i->first].size();
3185 }
3186 }
3187 }
7c673cae 3188 }
9f95a23c 3189 return (!scrubber.authoritative.empty() && repair);
7c673cae
FG
3190}
3191
9f95a23c
TL
3192bool PG::ops_blocked_by_scrub() const {
3193 return (!waiting_for_scrub.empty());
7c673cae
FG
3194}
3195
9f95a23c
TL
3196// the part that actually finalizes a scrub
3197void PG::scrub_finish()
7c673cae 3198{
9f95a23c
TL
3199 dout(20) << __func__ << dendl;
3200 bool repair = state_test(PG_STATE_REPAIR);
3201 bool do_auto_scrub = false;
3202 // if the repair request came from auto-repair and there are too many errors,
3203 // we would like to cancel auto-repair
3204 if (repair && scrubber.auto_repair
3205 && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
3206 state_clear(PG_STATE_REPAIR);
3207 repair = false;
3208 }
3209 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
3210 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
7c673cae 3211
9f95a23c
TL
3212 // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
3213 if (scrubber.deep_scrub_on_error
3214 && scrubber.authoritative.size()
3215 && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
3216 ceph_assert(!deep_scrub);
3217 do_auto_scrub = true;
3218 dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
3219 }
3220 scrubber.deep_scrub_on_error = false;
7c673cae 3221
9f95a23c
TL
3222 // type-specific finish (can tally more errors)
3223 _scrub_finish();
7c673cae 3224
9f95a23c 3225 bool has_error = scrub_process_inconsistent();
11fdf7f2 3226
9f95a23c
TL
3227 {
3228 stringstream oss;
3229 oss << info.pgid.pgid << " " << mode << " ";
3230 int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
3231 if (total_errors)
3232 oss << total_errors << " errors";
3233 else
3234 oss << "ok";
3235 if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
3236 oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
3237 << " remaining deep scrub error details lost)";
3238 if (repair)
3239 oss << ", " << scrubber.fixed << " fixed";
3240 if (total_errors)
3241 osd->clog->error(oss);
3242 else
3243 osd->clog->debug(oss);
11fdf7f2 3244 }
7c673cae 3245
9f95a23c
TL
3246 // Since we don't know which errors were fixed, we can only clear them
3247 // when every one has been fixed.
3248 if (repair) {
3249 if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
3250 ceph_assert(deep_scrub);
3251 scrubber.shallow_errors = scrubber.deep_errors = 0;
3252 dout(20) << __func__ << " All may be fixed" << dendl;
3253 } else if (has_error) {
3254 // Deep scrub in order to get corrected error counts
3255 scrub_after_recovery = true;
3256 dout(20) << __func__ << " Set scrub_after_recovery" << dendl;
3257 } else if (scrubber.shallow_errors || scrubber.deep_errors) {
3258 // We have errors but nothing can be fixed, so there is no repair
3259 // possible.
3260 state_set(PG_STATE_FAILED_REPAIR);
3261 dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
3262 << " error(s) present with no repair possible" << dendl;
3263 }
3264 }
7c673cae 3265
9f95a23c
TL
3266 {
3267 // finish up
3268 ObjectStore::Transaction t;
3269 recovery_state.update_stats(
3270 [this, deep_scrub](auto &history, auto &stats) {
3271 utime_t now = ceph_clock_now();
3272 history.last_scrub = recovery_state.get_info().last_update;
3273 history.last_scrub_stamp = now;
3274 if (scrubber.deep) {
3275 history.last_deep_scrub = recovery_state.get_info().last_update;
3276 history.last_deep_scrub_stamp = now;
3277 }
7c673cae 3278
9f95a23c
TL
3279 if (deep_scrub) {
3280 if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
3281 history.last_clean_scrub_stamp = now;
3282 stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
3283 stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
3284 stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
3285 stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
3286 stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
3287 dout(25) << "scrub_finish shard " << pg_whoami << " num_omap_bytes = "
3288 << stats.stats.sum.num_omap_bytes << " num_omap_keys = "
3289 << stats.stats.sum.num_omap_keys << dendl;
3290 } else {
3291 stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
3292 // XXX: a fresh last_clean_scrub_stamp doesn't mean the pg is free of
3293 // inconsistencies caused by deep-scrub errors
3294 if (scrubber.shallow_errors == 0)
3295 history.last_clean_scrub_stamp = now;
3296 }
3297 stats.stats.sum.num_scrub_errors =
3298 stats.stats.sum.num_shallow_scrub_errors +
3299 stats.stats.sum.num_deep_scrub_errors;
3300 if (scrubber.check_repair) {
3301 scrubber.check_repair = false;
3302 if (info.stats.stats.sum.num_scrub_errors) {
3303 state_set(PG_STATE_FAILED_REPAIR);
3304 dout(10) << "scrub_finish " << info.stats.stats.sum.num_scrub_errors
3305 << " error(s) still present after re-scrub" << dendl;
3306 }
3307 }
3308 return true;
3309 },
3310 &t);
3311 int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
3312 ceph_assert(tr == 0);
7c673cae
FG
3313 }
3314
9f95a23c
TL
3315 if (has_error) {
3316 queue_peering_event(
3317 PGPeeringEventRef(
3318 std::make_shared<PGPeeringEvent>(
3319 get_osdmap_epoch(),
3320 get_osdmap_epoch(),
3321 PeeringState::DoRecovery())));
3322 }
7c673cae 3323
9f95a23c
TL
3324 scrub_clear_state(has_error);
3325 scrub_unreserve_replicas();
7c673cae 3326
9f95a23c
TL
3327 if (do_auto_scrub) {
3328 scrub_requested(false, false, true);
7c673cae 3329 }
11fdf7f2 3330
9f95a23c
TL
3331 if (is_active() && is_primary()) {
3332 recovery_state.share_pg_info();
3333 }
7c673cae
FG
3334}
3335
9f95a23c 3336bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
7c673cae 3337{
9f95a23c
TL
3338 if (get_last_peering_reset() > reply_epoch ||
3339 get_last_peering_reset() > query_epoch) {
3340 dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
3341 << " last_peering_reset " << get_last_peering_reset()
3342 << dendl;
3343 return true;
3344 }
3345 return false;
7c673cae
FG
3346}
3347
9f95a23c
TL
3348struct FlushState {
3349 PGRef pg;
3350 epoch_t epoch;
3351 FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
3352 ~FlushState() {
3353 std::scoped_lock l{*pg};
3354 if (!pg->pg_has_reset_since(epoch)) {
3355 pg->recovery_state.complete_flush();
3356 }
7c673cae 3357 }
9f95a23c
TL
3358};
3359typedef std::shared_ptr<FlushState> FlushStateRef;
7c673cae 3360
9f95a23c 3361void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
7c673cae 3362{
9f95a23c
TL
3363 // flush in progress ops
3364 FlushStateRef flush_trigger (std::make_shared<FlushState>(
3365 this, get_osdmap_epoch()));
3366 t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
3367 t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
7c673cae
FG
3368}
3369
9f95a23c 3370bool PG::try_flush_or_schedule_async()
11fdf7f2 3371{
9f95a23c
TL
3372
3373 Context *c = new QueuePeeringEvt(
3374 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
3375 if (!ch->flush_commit(c)) {
3376 return false;
3377 } else {
3378 delete c;
3379 return true;
3380 }
11fdf7f2
TL
3381}
3382
ostream& operator<<(ostream& out, const PG& pg)
{
  out << pg.recovery_state;
  if (pg.scrubber.must_repair)
    out << " MUST_REPAIR";
  if (pg.scrubber.auto_repair)
    out << " AUTO_REPAIR";
  if (pg.scrubber.check_repair)
    out << " CHECK_REPAIR";
  if (pg.scrubber.deep_scrub_on_error)
    out << " DEEP_SCRUB_ON_ERROR";
  if (pg.scrubber.must_deep_scrub)
    out << " MUST_DEEP_SCRUB";
  if (pg.scrubber.must_scrub)
    out << " MUST_SCRUB";
  if (pg.scrubber.time_for_deep)
    out << " TIME_FOR_DEEP";
  if (pg.scrubber.need_auto)
    out << " NEED_AUTO";

  if (pg.recovery_ops_active)
    out << " rops=" << pg.recovery_ops_active;

  //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
  if (pg.recovery_state.have_missing()) {
    out << " m=" << pg.recovery_state.get_num_missing();
    if (pg.is_primary()) {
      uint64_t unfound = pg.recovery_state.get_num_unfound();
      if (unfound)
        out << " u=" << unfound;
    }
  }
  if (!pg.is_clean()) {
    out << " mbc=" << pg.recovery_state.get_missing_by_count();
  }
  if (!pg.snap_trimq.empty()) {
    out << " trimq=";
    // only show a count if the set is large
    if (pg.snap_trimq.num_intervals() > 16) {
      out << pg.snap_trimq.size();
      if (!pg.snap_trimq_repeat.empty()) {
        out << "(" << pg.snap_trimq_repeat.size() << ")";
      }
    } else {
      out << pg.snap_trimq;
      if (!pg.snap_trimq_repeat.empty()) {
        out << "(" << pg.snap_trimq_repeat << ")";
      }
    }
  }
  if (!pg.recovery_state.get_info().purged_snaps.empty()) {
    out << " ps="; // snap trim queue / purged snaps
    if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
      out << pg.recovery_state.get_info().purged_snaps.size();
    } else {
      out << pg.recovery_state.get_info().purged_snaps;
    }
  }

  out << "]";

  return out;
}

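// The closing "]" above pairs with the opening bracket emitted by
// pg.recovery_state's operator<<, which prints the "pg[<pgid> ..." prefix
// and peering status; this wrapper appends scrub flags, recovery counters,
// and snap-trim/purged-snaps summaries.  An illustrative (not verbatim)
// status line:
//
//   pg[1.7 ... active+clean ... MUST_SCRUB rops=3 m=12 u=2 trimq=4 ps=[1~3]]
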
bool PG::can_discard_op(OpRequestRef& op)
{
  auto m = op->get_req<MOSDOp>();
  if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
    dout(20) << " discard " << *m << dendl;
    return true;
  }

  if (m->get_map_epoch() < info.history.same_primary_since) {
    dout(7) << " changed after " << m->get_map_epoch()
            << ", dropping " << *m << dendl;
    return true;
  }

  if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
                         CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      !is_primary() &&
      m->get_map_epoch() < info.history.same_interval_since) {
    // Note: the Objecter will resend on interval change without the primary
    // changing if it actually sent to a replica.  If the primary hasn't
    // changed since the send epoch, we got it, and we're primary, it won't
    // have resent even if the interval did change, as it sent it to the
    // primary (us).
    return true;
  }

  if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
    // >= luminous client
    if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
      // >= nautilus client
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
        dout(7) << __func__ << " sent before last_force_op_resend "
                << pool.info.last_force_op_resend
                << ", dropping " << *m << dendl;
        return true;
      }
    } else {
      // < nautilus client (luminous or mimic)
      if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
        dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
                << pool.info.last_force_op_resend_prenautilus
                << ", dropping " << *m << dendl;
        return true;
      }
    }
    if (m->get_map_epoch() < info.history.last_epoch_split) {
      dout(7) << __func__ << " pg split in "
              << info.history.last_epoch_split << ", dropping" << dendl;
      return true;
    }
  } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
    // < luminous client
    if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
      dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
              << pool.info.last_force_op_resend_preluminous
              << ", dropping " << *m << dendl;
      return true;
    }
  }

  return false;
}

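// The resend-epoch checks above form a feature-keyed ladder:
//
//   client features                  op must be newer than
//   -------------------------------  -------------------------------------
//   RESEND_ON_SPLIT + NAUTILUS       pool last_force_op_resend
//   RESEND_ON_SPLIT (pre-nautilus)   pool last_force_op_resend_prenautilus
//   OSD_POOLRESEND only              pool last_force_op_resend_preluminous
//
// RESEND_ON_SPLIT clients also resend across PG splits on their own, which
// is why only that branch additionally drops ops sent before
// info.history.last_epoch_split.
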
template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef& op)
{
  auto m = op->get_req<T>();
  ceph_assert(m->get_type() == MSGTYPE);

  int from = m->get_source().num();

  // If a replica is marked down in a new osdmap before this PG has advanced
  // to that map, replies already in flight from it can arrive out of order:
  // the primary resets the connection when it handles the osdmap marking
  // the replica down, and resets the messenger session again when the
  // replica reconnects.  To avoid acting on such out-of-order replies,
  // discard messages from any OSD the next map shows as down.
  OSDMapRef next_map = osd->get_next_osdmap();
  if (next_map->is_down(from))
    return true;
  /* Mostly, this overlaps with the old_peering_msg
   * condition.  An important exception is pushes
   * sent by replicas not in the acting set, since
   * if such a replica goes down it does not cause
   * a new interval. */
  if (next_map->get_down_at(from) >= m->map_epoch)
    return true;

  // same pg?
  //  if the pg changes _at all_, we reset and repeer!
  if (old_peering_msg(m->map_epoch, m->map_epoch)) {
    dout(10) << "can_discard_replica_op pg changed " << info.history
             << " after " << m->map_epoch
             << ", dropping" << dendl;
    return true;
  }
  return false;
}

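// This template is instantiated once per replica-to-primary message type
// from can_discard_request(), e.g.
//
//   can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
//
// The MSGTYPE non-type parameter lets the ceph_assert above confirm that
// the decoded message really matches the dispatch case that routed it here.
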
bool PG::can_discard_scan(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGScan>();
  ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);

  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
    dout(10) << " got old scan, ignoring" << dendl;
    return true;
  }
  return false;
}

bool PG::can_discard_backfill(OpRequestRef op)
{
  auto m = op->get_req<MOSDPGBackfill>();
  ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);

  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
    dout(10) << " got old backfill, ignoring" << dendl;
    return true;
  }

  return false;
}

bool PG::can_discard_request(OpRequestRef& op)
{
  switch (op->get_req()->get_type()) {
  case CEPH_MSG_OSD_OP:
    return can_discard_op(op);
  case CEPH_MSG_OSD_BACKOFF:
    return false; // never discard
  case MSG_OSD_REPOP:
    return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
  case MSG_OSD_PG_PUSH:
    return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
  case MSG_OSD_PG_PULL:
    return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
  case MSG_OSD_PG_PUSH_REPLY:
    return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
  case MSG_OSD_REPOPREPLY:
    return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
  case MSG_OSD_PG_RECOVERY_DELETE:
    return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
  case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
    return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
  case MSG_OSD_EC_WRITE:
    return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
  case MSG_OSD_EC_WRITE_REPLY:
    return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
  case MSG_OSD_EC_READ:
    return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
  case MSG_OSD_EC_READ_REPLY:
    return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
  case MSG_OSD_REP_SCRUB:
    return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
  case MSG_OSD_SCRUB_RESERVE:
    return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
  case MSG_OSD_REP_SCRUBMAP:
    return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
    return can_discard_replica_op<
      MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);

  case MSG_OSD_PG_SCAN:
    return can_discard_scan(op);
  case MSG_OSD_PG_BACKFILL:
    return can_discard_backfill(op);
  case MSG_OSD_PG_BACKFILL_REMOVE:
    return can_discard_replica_op<MOSDPGBackfillRemove,
                                  MSG_OSD_PG_BACKFILL_REMOVE>(op);
  }
  return true;
}

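// Note the fall-through: any message type not handled above is discarded
// (return true).  A dispatch-side sketch of the intended use (assuming a
// locked PG and an OpRequestRef op; not verbatim from the OSD code):
//
//   if (pg->can_discard_request(op)) {
//     return;  // drop silently; the sender resends if still relevant
//   }
//   // ... hand op to the PG's do_request() path ...
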
void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << evt->get_desc() << dendl;
  ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
  if (old_peering_evt(evt)) {
    dout(10) << "discard old " << evt->get_desc() << dendl;
  } else {
    recovery_state.handle_event(evt, &rctx);
  }
  // write_if_dirty regardless of path above to ensure we capture any work
  // done by OSD::advance_pg().
  write_if_dirty(rctx.transaction);
}

void PG::queue_peering_event(PGPeeringEventRef evt)
{
  if (old_peering_evt(evt))
    return;
  osd->osd->enqueue_peering_evt(info.pgid, evt);
}

void PG::queue_null(epoch_t msg_epoch,
                    epoch_t query_epoch)
{
  dout(10) << "null" << dendl;
  queue_peering_event(
    PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
                                                       NullEvt())));
}

void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
{
  /*
   * if we couldn't start any recovery ops and things are still
   * unfound, see if we can discover more missing object locations.
   * It may be that our initial locations were bad and we errored
   * out while trying to pull.
   */
  if (!recovery_state.discover_all_missing(rctx)) {
    string action;
    if (state_test(PG_STATE_BACKFILLING)) {
      auto evt = PGPeeringEventRef(
        new PGPeeringEvent(
          queued,
          queued,
          PeeringState::UnfoundBackfill()));
      queue_peering_event(evt);
      action = "in backfill";
    } else if (state_test(PG_STATE_RECOVERING)) {
      auto evt = PGPeeringEventRef(
        new PGPeeringEvent(
          queued,
          queued,
          PeeringState::UnfoundRecovery()));
      queue_peering_event(evt);
      action = "in recovery";
    } else {
      action = "already out of recovery/backfill";
    }
    dout(10) << __func__ << ": no luck, giving up on this pg for now ("
             << action << ")" << dendl;
  } else {
    dout(10) << __func__ << ": no luck, giving up on this pg for now "
             << "(queue_recovery)" << dendl;
    queue_recovery();
  }
}

void PG::handle_advance_map(
  OSDMapRef osdmap, OSDMapRef lastmap,
  vector<int>& newup, int up_primary,
  vector<int>& newacting, int acting_primary,
  PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
  osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
  recovery_state.advance_map(
    osdmap,
    lastmap,
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
}

void PG::handle_activate_map(PeeringCtx &rctx)
{
  dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
           << dendl;
  recovery_state.activate_map(rctx);

  requeue_map_waiters();
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  dout(10) << __func__ << dendl;
  PeeringState::Initialize evt;
  recovery_state.handle_event(evt, &rctx);
}

void PG::handle_query_state(Formatter *f)
{
  dout(10) << "handle_query_state" << dendl;
  PeeringState::QueryState q(f);
  recovery_state.handle_event(q, 0);

  if (is_primary() && is_active()) {
    f->open_object_section("scrub");
    f->dump_stream("scrubber.epoch_start") << scrubber.epoch_start;
    f->dump_bool("scrubber.active", scrubber.active);
    f->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber.state));
    f->dump_stream("scrubber.start") << scrubber.start;
    f->dump_stream("scrubber.end") << scrubber.end;
    f->dump_stream("scrubber.max_end") << scrubber.max_end;
    f->dump_stream("scrubber.subset_last_update") << scrubber.subset_last_update;
    f->dump_bool("scrubber.deep", scrubber.deep);
    {
      f->open_array_section("scrubber.waiting_on_whom");
      for (set<pg_shard_t>::iterator p = scrubber.waiting_on_whom.begin();
           p != scrubber.waiting_on_whom.end();
           ++p) {
        f->dump_stream("shard") << *p;
      }
      f->close_section();
    }
    f->close_section();
  }
}

void PG::init_collection_pool_opts()
{
  auto r = osd->store->set_collection_opts(ch, pool.info.opts);
  if (r < 0 && r != -EOPNOTSUPP) {
    derr << __func__ << " set_collection_opts returned error: " << r << dendl;
  }
}

void PG::on_pool_change()
{
  init_collection_pool_opts();
  plpg_on_pool_change();
}

void PG::C_DeleteMore::complete(int r) {
  ceph_assert(r == 0);
  pg->lock();
  if (!pg->pg_has_reset_since(epoch)) {
    pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
  }
  pg->unlock();
  delete this;
}

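// C_DeleteMore and do_delete_work() below form a loop: each pass deletes at
// most one transaction's worth of objects and registers a C_DeleteMore on
// commit, which re-queues the PG for deletion; the loop ends when a listing
// comes back empty and the collection itself is removed.  The
// pg_has_reset_since() check breaks the chain if the PG was reset or
// recreated in the meantime.
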
void PG::do_delete_work(ObjectStore::Transaction &t)
{
  dout(10) << __func__ << dendl;

  {
    float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
    if (osd_delete_sleep > 0 && delete_needs_sleep) {
      epoch_t e = get_osdmap()->get_epoch();
      PGRef pgref(this);
      auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
        dout(20) << __func__ << " wake up at "
                 << ceph_clock_now()
                 << ", re-queuing delete" << dendl;
        std::scoped_lock locker{*this};
        delete_needs_sleep = false;
        if (!pg_has_reset_since(e)) {
          osd->queue_for_pg_delete(get_pgid(), e);
        }
      });

      auto delete_schedule_time = ceph::real_clock::now();
      delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
      std::lock_guard l{osd->sleep_lock};
      osd->sleep_timer.add_event_at(delete_schedule_time,
                                    delete_requeue_callback);
      dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
      return;
    }
  }

  delete_needs_sleep = true;

  vector<ghobject_t> olist;
  int max = std::min(osd->store->get_ideal_list_max(),
                     (int)cct->_conf->osd_target_transaction_size);
  ghobject_t next;
  osd->store->collection_list(
    ch,
    next,
    ghobject_t::get_max(),
    max,
    &olist,
    &next);
  dout(20) << __func__ << " " << olist << dendl;

  OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
  int64_t num = 0;
  for (auto& oid : olist) {
    if (oid == pgmeta_oid) {
      continue;
    }
    if (oid.is_pgmeta()) {
      osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
                        << " during PG removal";
    }
    int r = snap_mapper.remove_oid(oid.hobj, &_t);
    if (r != 0 && r != -ENOENT) {
      ceph_abort();
    }
    t.remove(coll, oid);
    ++num;
  }
  if (num) {
    dout(20) << __func__ << " deleting " << num << " objects" << dendl;
    Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
    t.register_on_commit(fin);
  } else {
    dout(20) << __func__ << " finished" << dendl;
    if (cct->_conf->osd_inject_failure_on_pg_removal) {
      _exit(1);
    }

    // final flush here to ensure completions drop refs.  Of particular
    // concern are the SnapMapper ContainerContexts.
    {
      PGRef pgref(this);
      PGLog::clear_info_log(info.pgid, &t);
      t.remove_collection(coll);
      t.register_on_commit(new ContainerContext<PGRef>(pgref));
      t.register_on_applied(new ContainerContext<PGRef>(pgref));
      osd->store->queue_transaction(ch, std::move(t));
    }
    ch->flush();

    if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
      dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
      ch = osd->store->create_new_collection(coll);
      create_pg_collection(t,
                           info.pgid,
                           info.pgid.get_split_bits(pool.info.get_pg_num()));
      init_pg_ondisk(t, info.pgid, &pool.info);
      recovery_state.reset_last_persisted();
    } else {
      recovery_state.set_delete_complete();

      // cancel reserver here, since the PG is about to get deleted and the
      // exit() methods don't run when that happens.
      osd->local_reserver.cancel_reservation(info.pgid);

      osd->logger->dec(l_osd_pg_removing);
    }
  }
}

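// Deletion throttling, summarized: when osd_delete_sleep > 0, every other
// invocation parks the PG on the sleep timer instead of deleting, capping
// the rate at roughly one chunk (osd_target_transaction_size objects) per
// osd_delete_sleep seconds per PG.  A generic sketch of the alternating
// sleep/work pattern (hypothetical names, not the Ceph API):
//
//   if (sleep_interval > 0 && needs_sleep) {
//     timer.add_event_after(sleep_interval, [this] {
//       needs_sleep = false;
//       requeue_work();        // the next pass does the real work
//     });
//     return;                  // nothing deleted on this pass
//   }
//   needs_sleep = true;        // force a sleep before the following chunk
//   delete_one_chunk();
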
int PG::pg_stat_adjust(osd_stat_t *ns)
{
  osd_stat_t &new_stat = *ns;
  if (is_primary()) {
    return 0;
  }
  // Adjust the kb_used by adding pending backfill data
  uint64_t reserved_num_bytes = get_reserved_num_bytes();

  // For now we don't consider projected space gains here.  An optional
  // two-pass backfill that frees up space in a first pass could be
  // triggered when at nearfull or near to backfillfull.
  if (reserved_num_bytes > 0) {
    // TODO: Handle compression by adjusting by the PG's average
    // compression percentage.
    dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
             << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
    if (new_stat.statfs.available > reserved_num_bytes)
      new_stat.statfs.available -= reserved_num_bytes;
    else
      new_stat.statfs.available = 0;
    dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
    return 1;
  }
  return 0;
}

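// Worked example (hypothetical numbers): on a replica with
// statfs.available = 100 GiB and 30 GiB of incoming backfill reserved,
// the adjusted available becomes 70 GiB, so the kb_used this OSD reports
// rises by the reserved amount; if reservations ever exceeded the raw
// available space, available clamps to 0 rather than going negative.
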
ostream& operator<<(ostream& out, const PG::BackfillInterval& bi)
{
  out << "BackfillInfo(" << bi.begin << "-" << bi.end
      << " " << bi.objects.size() << " objects";
  if (!bi.objects.empty())
    out << " " << bi.objects;
  out << ")";
  return out;
}

void PG::dump_pgstate_history(Formatter *f)
{
  std::scoped_lock l{*this};
  recovery_state.dump_history(f);
}

void PG::dump_missing(Formatter *f)
{
  for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
    f->open_object_section("object");
    f->dump_object("oid", i.first);
    f->dump_object("missing_info", i.second);
    if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
      f->dump_bool(
        "unfound",
        recovery_state.get_missing_loc().is_unfound(i.first));
      f->open_array_section("locations");
      for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
        f->dump_object("shard", l);
      }
      f->close_section();
    }
    f->close_section();
  }
}

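// Output shape produced per missing object (illustrative; the enclosing
// array section is opened by the caller):
//
//   {"oid": ..., "missing_info": ...,
//    "unfound": true|false,           // only when recovery is still needed
//    "locations": ["<shard>", ...]}   // ditto
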
void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
{
  std::lock_guard l{pg_stats_publish_lock};
  if (pg_stats_publish_valid) {
    f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
  }
}

void PG::with_heartbeat_peers(std::function<void(int)> f)
{
  std::lock_guard l{heartbeat_peer_lock};
  for (auto p : heartbeat_peers) {
    f(p);
  }
  for (auto p : probe_targets) {
    f(p);
  }
}

uint64_t PG::get_min_alloc_size() const {
  return osd->store->get_min_alloc_size();
}