ceph.git: ceph/src/osd/PG.cc (import 15.2.9)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "PG.h"
16 // #include "msg/Messenger.h"
17 #include "messages/MOSDRepScrub.h"
18 // #include "common/cmdparse.h"
19 // #include "common/ceph_context.h"
20
21 #include "common/errno.h"
22 #include "common/ceph_releases.h"
23 #include "common/config.h"
24 #include "OSD.h"
25 #include "OpRequest.h"
26 #include "ScrubStore.h"
27 #include "Session.h"
28 #include "osd/scheduler/OpSchedulerItem.h"
29
30 #include "common/Timer.h"
31 #include "common/perf_counters.h"
32
33 #include "messages/MOSDOp.h"
34 #include "messages/MOSDPGNotify.h"
35 // #include "messages/MOSDPGLog.h"
36 #include "messages/MOSDPGInfo.h"
37 #include "messages/MOSDPGScan.h"
38 #include "messages/MOSDPGBackfill.h"
39 #include "messages/MOSDPGBackfillRemove.h"
40 #include "messages/MBackfillReserve.h"
41 #include "messages/MRecoveryReserve.h"
42 #include "messages/MOSDPGPush.h"
43 #include "messages/MOSDPGPushReply.h"
44 #include "messages/MOSDPGPull.h"
45 #include "messages/MOSDECSubOpWrite.h"
46 #include "messages/MOSDECSubOpWriteReply.h"
47 #include "messages/MOSDECSubOpRead.h"
48 #include "messages/MOSDECSubOpReadReply.h"
49 #include "messages/MOSDPGUpdateLogMissing.h"
50 #include "messages/MOSDPGUpdateLogMissingReply.h"
51 #include "messages/MOSDBackoff.h"
52 #include "messages/MOSDScrubReserve.h"
53 #include "messages/MOSDRepOp.h"
54 #include "messages/MOSDRepOpReply.h"
55 #include "messages/MOSDRepScrubMap.h"
56 #include "messages/MOSDPGRecoveryDelete.h"
57 #include "messages/MOSDPGRecoveryDeleteReply.h"
58
59 #include "common/BackTrace.h"
60 #include "common/EventTrace.h"
61
62 #ifdef WITH_LTTNG
63 #define TRACEPOINT_DEFINE
64 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
65 #include "tracing/pg.h"
66 #undef TRACEPOINT_PROBE_DYNAMIC_LINKAGE
67 #undef TRACEPOINT_DEFINE
68 #else
69 #define tracepoint(...)
70 #endif
71
72 #include <sstream>
73
74 #define dout_context cct
75 #define dout_subsys ceph_subsys_osd
76 #undef dout_prefix
77 #define dout_prefix _prefix(_dout, this)
78
79 using namespace ceph::osd::scheduler;
80
81 template <class T>
82 static ostream& _prefix(std::ostream *_dout, T *t)
83 {
84 return t->gen_prefix(*_dout);
85 }
86
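// Intrusive reference counting for PG: get()/put() bump and drop 'ref', and
// the PG deletes itself when the count reaches zero. With PG_DEBUG_REFS
// enabled, per-tag counts (and, below, per-id backtraces) are also tracked to
// help find leaked references.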
87 void PG::get(const char* tag)
88 {
89 int after = ++ref;
90 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " "
91 << "tag " << (tag ? tag : "(none") << " "
92 << (after - 1) << " -> " << after << dendl;
93 #ifdef PG_DEBUG_REFS
94 std::lock_guard l(_ref_id_lock);
95 _tag_counts[tag]++;
96 #endif
97 }
98
99 void PG::put(const char* tag)
100 {
101 #ifdef PG_DEBUG_REFS
102 {
103 std::lock_guard l(_ref_id_lock);
104 auto tag_counts_entry = _tag_counts.find(tag);
105 ceph_assert(tag_counts_entry != _tag_counts.end());
106 --tag_counts_entry->second;
107 if (tag_counts_entry->second == 0) {
108 _tag_counts.erase(tag_counts_entry);
109 }
110 }
111 #endif
112 auto local_cct = cct;
113 int after = --ref;
114 lgeneric_subdout(local_cct, refs, 5) << "PG::put " << this << " "
115 << "tag " << (tag ? tag : "(none") << " "
116 << (after + 1) << " -> " << after
117 << dendl;
118 if (after == 0)
119 delete this;
120 }
121
122 #ifdef PG_DEBUG_REFS
123 uint64_t PG::get_with_id()
124 {
125 ref++;
126 std::lock_guard l(_ref_id_lock);
127 uint64_t id = ++_ref_id;
128 BackTrace bt(0);
129 stringstream ss;
130 bt.print(ss);
131 lgeneric_subdout(cct, refs, 5) << "PG::get " << this << " " << info.pgid
132 << " got id " << id << " "
133 << (ref - 1) << " -> " << ref
134 << dendl;
135 ceph_assert(!_live_ids.count(id));
136 _live_ids.insert(make_pair(id, ss.str()));
137 return id;
138 }
139
140 void PG::put_with_id(uint64_t id)
141 {
142 int newref = --ref;
143 lgeneric_subdout(cct, refs, 5) << "PG::put " << this << " " << info.pgid
144 << " put id " << id << " "
145 << (newref + 1) << " -> " << newref
146 << dendl;
147 {
148 std::lock_guard l(_ref_id_lock);
149 ceph_assert(_live_ids.count(id));
150 _live_ids.erase(id);
151 }
152 if (!newref)
153 delete this;
154 }
155
156 void PG::dump_live_ids()
157 {
158 std::lock_guard l(_ref_id_lock);
159 dout(0) << "\t" << __func__ << ": " << info.pgid << " live ids:" << dendl;
160 for (map<uint64_t, string>::iterator i = _live_ids.begin();
161 i != _live_ids.end();
162 ++i) {
163 dout(0) << "\t\tid: " << *i << dendl;
164 }
165 dout(0) << "\t" << __func__ << ": " << info.pgid << " live tags:" << dendl;
166 for (map<string, uint64_t>::iterator i = _tag_counts.begin();
167 i != _tag_counts.end();
168 ++i) {
169 dout(0) << "\t\ttag: " << *i << dendl;
170 }
171 }
172 #endif
173
174 PG::PG(OSDService *o, OSDMapRef curmap,
175 const PGPool &_pool, spg_t p) :
176 pg_whoami(o->whoami, p.shard),
177 pg_id(p),
178 coll(p),
179 osd(o),
180 cct(o->cct),
181 osdriver(osd->store, coll_t(), OSD::make_snapmapper_oid()),
182 snap_mapper(
183 cct,
184 &osdriver,
185 p.ps(),
186 p.get_split_bits(_pool.info.get_pg_num()),
187 _pool.id,
188 p.shard),
189 trace_endpoint("0.0.0.0", 0, "PG"),
190 info_struct_v(0),
191 pgmeta_oid(p.make_pgmeta_oid()),
192 stat_queue_item(this),
193 scrub_queued(false),
194 recovery_queued(false),
195 recovery_ops_active(0),
196 backfill_reserving(false),
197 pg_stats_publish_valid(false),
198 finish_sync_event(NULL),
199 scrub_after_recovery(false),
200 save_req_scrub(false),
201 active_pushes(0),
202 recovery_state(
203 o->cct,
204 pg_whoami,
205 p,
206 _pool,
207 curmap,
208 this,
209 this),
210 pool(recovery_state.get_pool()),
211 info(recovery_state.get_info())
212 {
213 #ifdef PG_DEBUG_REFS
214 osd->add_pgid(p, this);
215 #endif
216 #ifdef WITH_BLKIN
217 std::stringstream ss;
218 ss << "PG " << info.pgid;
219 trace_endpoint.copy_name(ss.str());
220 #endif
221 }
222
223 PG::~PG()
224 {
225 #ifdef PG_DEBUG_REFS
226 osd->remove_pgid(info.pgid, this);
227 #endif
228 }
229
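// PG big lock. Under CEPH_DEBUG_MUTEX the mutex itself supports lockdep (with
// an explicit bypass via no_lockdep); otherwise the owning thread id is
// recorded in locked_by so gen_prefix() can tell whether the caller holds the
// lock. Dirty recovery_state observed with the lock dropped indicates a bug.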
230 void PG::lock(bool no_lockdep) const
231 {
232 #ifdef CEPH_DEBUG_MUTEX
233 _lock.lock(no_lockdep);
234 #else
235 _lock.lock();
236 locked_by = std::this_thread::get_id();
237 #endif
238 // if we have unrecorded dirty state with the lock dropped, there is a bug
239 ceph_assert(!recovery_state.debug_has_dirty_state());
240
241 dout(30) << "lock" << dendl;
242 }
243
244 bool PG::is_locked() const
245 {
246 return ceph_mutex_is_locked(_lock);
247 }
248
249 void PG::unlock() const
250 {
251 //generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
252 ceph_assert(!recovery_state.debug_has_dirty_state());
253 #ifndef CEPH_DEBUG_MUTEX
254 locked_by = {};
255 #endif
256 _lock.unlock();
257 }
258
259 std::ostream& PG::gen_prefix(std::ostream& out) const
260 {
261 OSDMapRef mapref = recovery_state.get_osdmap();
262 #ifdef CEPH_DEBUG_MUTEX
263 if (_lock.is_locked_by_me()) {
264 #else
265 if (locked_by == std::this_thread::get_id()) {
266 #endif
267 out << "osd." << osd->whoami
268 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
269 << " " << *this << " ";
270 } else {
271 out << "osd." << osd->whoami
272 << " pg_epoch: " << (mapref ? mapref->get_epoch():0)
273 << " pg[" << pg_id.pgid << "(unlocked)] ";
274 }
275 return out;
276 }
277
278 PerfCounters &PG::get_peering_perf() {
279 return *(osd->recoverystate_perf);
280 }
281
282 PerfCounters &PG::get_perf_logger() {
283 return *(osd->logger);
284 }
285
286 void PG::log_state_enter(const char *state) {
287 osd->pg_recovery_stats.log_enter(state);
288 }
289
290 void PG::log_state_exit(
291 const char *state_name, utime_t enter_time,
292 uint64_t events, utime_t event_dur) {
293 osd->pg_recovery_stats.log_exit(
294 state_name, ceph_clock_now() - enter_time, events, event_dur);
295 }
296
297 /********* PG **********/
298
299 void PG::remove_snap_mapped_object(
300 ObjectStore::Transaction &t, const hobject_t &soid)
301 {
302 t.remove(
303 coll,
304 ghobject_t(soid, ghobject_t::NO_GEN, pg_whoami.shard));
305 clear_object_snap_mapping(&t, soid);
306 }
307
308 void PG::clear_object_snap_mapping(
309 ObjectStore::Transaction *t, const hobject_t &soid)
310 {
311 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
312 if (soid.snap < CEPH_MAXSNAP) {
313 int r = snap_mapper.remove_oid(
314 soid,
315 &_t);
316 if (!(r == 0 || r == -ENOENT)) {
317 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
318 ceph_abort();
319 }
320 }
321 }
322
323 void PG::update_object_snap_mapping(
324 ObjectStore::Transaction *t, const hobject_t &soid, const set<snapid_t> &snaps)
325 {
326 OSDriver::OSTransaction _t(osdriver.get_transaction(t));
327 ceph_assert(soid.snap < CEPH_MAXSNAP);
328 int r = snap_mapper.remove_oid(
329 soid,
330 &_t);
331 if (!(r == 0 || r == -ENOENT)) {
332 derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
333 ceph_abort();
334 }
335 snap_mapper.add_oid(
336 soid,
337 snaps,
338 &_t);
339 }
340
341 /******* PG ***********/
342 void PG::clear_primary_state()
343 {
344 projected_log = PGLog::IndexedLog();
345
346 snap_trimq.clear();
347 snap_trimq_repeat.clear();
348 finish_sync_event = 0; // so that _finish_recovery doesn't go off in another thread
349 release_pg_backoffs();
350
351 scrubber.reserved_peers.clear();
352 scrub_after_recovery = false;
353 save_req_scrub = false;
354
355 agent_clear();
356 }
357
358 PG::Scrubber::Scrubber()
359 : local_reserved(false), remote_reserved(false), reserve_failed(false),
360 epoch_start(0),
361 active(false),
362 shallow_errors(0), deep_errors(0), fixed(0),
363 must_scrub(false), must_deep_scrub(false), must_repair(false),
364 need_auto(false), req_scrub(false), time_for_deep(false),
365 auto_repair(false),
366 check_repair(false),
367 deep_scrub_on_error(false),
368 num_digest_updates_pending(0),
369 state(INACTIVE),
370 deep(false)
371 {}
372
373 PG::Scrubber::~Scrubber() {}
374
375 bool PG::op_has_sufficient_caps(OpRequestRef& op)
376 {
377 // only check MOSDOp
378 if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
379 return true;
380
381 auto req = op->get_req<MOSDOp>();
382 auto priv = req->get_connection()->get_priv();
383 auto session = static_cast<Session*>(priv.get());
384 if (!session) {
385 dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
386 return false;
387 }
388 OSDCap& caps = session->caps;
389 priv.reset();
390
391 const string &key = req->get_hobj().get_key().empty() ?
392 req->get_oid().name :
393 req->get_hobj().get_key();
394
395 bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
396 pool.info.application_metadata,
397 key,
398 op->need_read_cap(),
399 op->need_write_cap(),
400 op->classes(),
401 session->get_peer_socket_addr());
402
403 dout(20) << "op_has_sufficient_caps "
404 << "session=" << session
405 << " pool=" << pool.id << " (" << pool.name
406 << " " << req->get_hobj().nspace
407 << ")"
408 << " pool_app_metadata=" << pool.info.application_metadata
409 << " need_read_cap=" << op->need_read_cap()
410 << " need_write_cap=" << op->need_write_cap()
411 << " classes=" << op->classes()
412 << " -> " << (cap ? "yes" : "NO")
413 << dendl;
414 return cap;
415 }
416
417 bool PG::requeue_scrub(bool high_priority)
418 {
419 ceph_assert(ceph_mutex_is_locked(_lock));
420 if (scrub_queued) {
421 dout(10) << __func__ << ": already queued" << dendl;
422 return false;
423 } else {
424 dout(10) << __func__ << ": queueing" << dendl;
425 scrub_queued = true;
426 osd->queue_for_scrub(this, high_priority);
427 return true;
428 }
429 }
430
431 void PG::queue_recovery()
432 {
433 if (!is_primary() || !is_peered()) {
434 dout(10) << "queue_recovery -- not primary or not peered " << dendl;
435 ceph_assert(!recovery_queued);
436 } else if (recovery_queued) {
437 dout(10) << "queue_recovery -- already queued" << dendl;
438 } else {
439 dout(10) << "queue_recovery -- queuing" << dendl;
440 recovery_queued = true;
441 osd->queue_for_recovery(this);
442 }
443 }
444
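// Translate any pending scrub request flags into PG state bits and queue the
// scrub: need_auto (set after scrub errors) is promoted to a deep, auto-repair
// scrub, and must_deep_scrub / must_repair are consumed into
// PG_STATE_DEEP_SCRUB / PG_STATE_REPAIR before requeue_scrub() is called.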
445 bool PG::queue_scrub()
446 {
447 ceph_assert(ceph_mutex_is_locked(_lock));
448 if (is_scrubbing()) {
449 return false;
450 }
451 // An interrupted recovery repair could leave this set.
452 state_clear(PG_STATE_REPAIR);
453 if (scrubber.need_auto) {
454 scrubber.must_scrub = true;
455 scrubber.must_deep_scrub = true;
456 scrubber.auto_repair = true;
457 scrubber.need_auto = false;
458 }
459 scrubber.priority = scrubber.must_scrub ?
460 cct->_conf->osd_requested_scrub_priority : get_scrub_priority();
461 scrubber.must_scrub = false;
462 state_set(PG_STATE_SCRUBBING);
463 if (scrubber.must_deep_scrub) {
464 state_set(PG_STATE_DEEP_SCRUB);
465 scrubber.must_deep_scrub = false;
466 }
467 if (scrubber.must_repair || scrubber.auto_repair) {
468 state_set(PG_STATE_REPAIR);
469 scrubber.must_repair = false;
470 }
471 requeue_scrub();
472 return true;
473 }
474
475 unsigned PG::get_scrub_priority()
476 {
477 // a higher value -> a higher priority
478 int64_t pool_scrub_priority = 0;
479 pool.info.opts.get(pool_opts_t::SCRUB_PRIORITY, &pool_scrub_priority);
480 return pool_scrub_priority > 0 ? pool_scrub_priority : cct->_conf->osd_scrub_priority;
481 }
482
483 Context *PG::finish_recovery()
484 {
485 dout(10) << "finish_recovery" << dendl;
486 ceph_assert(info.last_complete == info.last_update);
487
488 clear_recovery_state();
489
490 /*
491 * sync all this before purging strays. but don't block!
492 */
493 finish_sync_event = new C_PG_FinishRecovery(this);
494 return finish_sync_event;
495 }
496
497 void PG::_finish_recovery(Context *c)
498 {
499 std::scoped_lock locker{*this};
500 if (recovery_state.is_deleting() || !is_clean()) {
501 dout(10) << __func__ << " raced with delete or repair" << dendl;
502 return;
503 }
504 // When recovery is initiated by a repair, that flag is left on
505 state_clear(PG_STATE_REPAIR);
506 if (c == finish_sync_event) {
507 dout(10) << "_finish_recovery" << dendl;
508 finish_sync_event = 0;
509 recovery_state.purge_strays();
510
511 publish_stats_to_osd();
512
513 if (scrub_after_recovery) {
514 dout(10) << "_finish_recovery requeueing for scrub" << dendl;
515 scrub_after_recovery = false;
516 scrubber.must_deep_scrub = true;
517 scrubber.check_repair = true;
518 // We remember whether req_scrub was set when scrub_after_recovery was set to true
519 scrubber.req_scrub = save_req_scrub;
520 queue_scrub();
521 }
522 } else {
523 dout(10) << "_finish_recovery -- stale" << dendl;
524 }
525 }
526
527 void PG::start_recovery_op(const hobject_t& soid)
528 {
529 dout(10) << "start_recovery_op " << soid
530 #ifdef DEBUG_RECOVERY_OIDS
531 << " (" << recovering_oids << ")"
532 #endif
533 << dendl;
534 ceph_assert(recovery_ops_active >= 0);
535 recovery_ops_active++;
536 #ifdef DEBUG_RECOVERY_OIDS
537 recovering_oids.insert(soid);
538 #endif
539 osd->start_recovery_op(this, soid);
540 }
541
542 void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
543 {
544 dout(10) << "finish_recovery_op " << soid
545 #ifdef DEBUG_RECOVERY_OIDS
546 << " (" << recovering_oids << ")"
547 #endif
548 << dendl;
549 ceph_assert(recovery_ops_active > 0);
550 recovery_ops_active--;
551 #ifdef DEBUG_RECOVERY_OIDS
552 ceph_assert(recovering_oids.count(soid));
553 recovering_oids.erase(recovering_oids.find(soid));
554 #endif
555 osd->finish_recovery_op(this, soid, dequeue);
556
557 if (!dequeue) {
558 queue_recovery();
559 }
560 }
561
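// PG split: hand the child its share of the peering state, narrow its snap
// mapper to the new split bits, copy the snap trim queues, run the
// backend-specific _split_into() hook, and drop all backoffs on the parent
// for simplicity.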
562 void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
563 {
564 recovery_state.split_into(child_pgid, &child->recovery_state, split_bits);
565
566 child->update_snap_mapper_bits(split_bits);
567
568 child->snap_trimq = snap_trimq;
569 child->snap_trimq_repeat = snap_trimq_repeat;
570
571 _split_into(child_pgid, child, split_bits);
572
573 // release all backoffs for simplicity
574 release_backoffs(hobject_t(), hobject_t::get_max());
575 }
576
577 void PG::start_split_stats(const set<spg_t>& childpgs, vector<object_stat_sum_t> *out)
578 {
579 recovery_state.start_split_stats(childpgs, out);
580 }
581
582 void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
583 {
584 recovery_state.finish_split_stats(stats, t);
585 }
586
587 void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
588 unsigned split_bits,
589 const pg_merge_meta_t& last_pg_merge_meta)
590 {
591 dout(10) << __func__ << " from " << sources << " split_bits " << split_bits
592 << dendl;
593 map<spg_t, PeeringState*> source_ps;
594 for (auto &&source : sources) {
595 source_ps.emplace(source.first, &source.second->recovery_state);
596 }
597 recovery_state.merge_from(source_ps, rctx, split_bits, last_pg_merge_meta);
598
599 for (auto& i : sources) {
600 auto& source = i.second;
601 // wipe out source's pgmeta
602 rctx.transaction.remove(source->coll, source->pgmeta_oid);
603
604 // merge (and destroy source collection)
605 rctx.transaction.merge_collection(source->coll, coll, split_bits);
606 }
607
608 // merge_collection does this, but maybe all of our sources were missing.
609 rctx.transaction.collection_set_bits(coll, split_bits);
610
611 snap_mapper.update_bits(split_bits);
612 }
613
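// Client backoffs: add_backoff() registers a Backoff for [begin, end) under
// backoff_lock and tells the client session to hold off with a
// CEPH_OSD_BACKOFF_OP_BLOCK message. release_backoffs() later drops the
// registration and, when the connection is still up, sends the matching
// CEPH_OSD_BACKOFF_OP_UNBLOCK; clear_backoffs() simply drops everything.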
614 void PG::add_backoff(const ceph::ref_t<Session>& s, const hobject_t& begin, const hobject_t& end)
615 {
616 auto con = s->con;
617 if (!con) // OSD::ms_handle_reset clears s->con without a lock
618 return;
619 auto b = s->have_backoff(info.pgid, begin);
620 if (b) {
621 derr << __func__ << " already have backoff for " << s << " begin " << begin
622 << " " << *b << dendl;
623 ceph_abort();
624 }
625 std::lock_guard l(backoff_lock);
626 b = ceph::make_ref<Backoff>(info.pgid, this, s, ++s->backoff_seq, begin, end);
627 backoffs[begin].insert(b);
628 s->add_backoff(b);
629 dout(10) << __func__ << " session " << s << " added " << *b << dendl;
630 con->send_message(
631 new MOSDBackoff(
632 info.pgid,
633 get_osdmap_epoch(),
634 CEPH_OSD_BACKOFF_OP_BLOCK,
635 b->id,
636 begin,
637 end));
638 }
639
640 void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
641 {
642 dout(10) << __func__ << " [" << begin << "," << end << ")" << dendl;
643 vector<ceph::ref_t<Backoff>> bv;
644 {
645 std::lock_guard l(backoff_lock);
646 auto p = backoffs.lower_bound(begin);
647 while (p != backoffs.end()) {
648 int r = cmp(p->first, end);
649 dout(20) << __func__ << " ? " << r << " " << p->first
650 << " " << p->second << dendl;
651 // note: must still examine begin=end=p->first case
652 if (r > 0 || (r == 0 && begin < end)) {
653 break;
654 }
655 dout(20) << __func__ << " checking " << p->first
656 << " " << p->second << dendl;
657 auto q = p->second.begin();
658 while (q != p->second.end()) {
659 dout(20) << __func__ << " checking " << *q << dendl;
660 int r = cmp((*q)->begin, begin);
661 if (r == 0 || (r > 0 && (*q)->end < end)) {
662 bv.push_back(*q);
663 q = p->second.erase(q);
664 } else {
665 ++q;
666 }
667 }
668 if (p->second.empty()) {
669 p = backoffs.erase(p);
670 } else {
671 ++p;
672 }
673 }
674 }
675 for (auto b : bv) {
676 std::lock_guard l(b->lock);
677 dout(10) << __func__ << " " << *b << dendl;
678 if (b->session) {
679 ceph_assert(b->pg == this);
680 ConnectionRef con = b->session->con;
681 if (con) { // OSD::ms_handle_reset clears s->con without a lock
682 con->send_message(
683 new MOSDBackoff(
684 info.pgid,
685 get_osdmap_epoch(),
686 CEPH_OSD_BACKOFF_OP_UNBLOCK,
687 b->id,
688 b->begin,
689 b->end));
690 }
691 if (b->is_new()) {
692 b->state = Backoff::STATE_DELETING;
693 } else {
694 b->session->rm_backoff(b);
695 b->session.reset();
696 }
697 b->pg.reset();
698 }
699 }
700 }
701
702 void PG::clear_backoffs()
703 {
704 dout(10) << __func__ << " " << dendl;
705 map<hobject_t,set<ceph::ref_t<Backoff>>> ls;
706 {
707 std::lock_guard l(backoff_lock);
708 ls.swap(backoffs);
709 }
710 for (auto& p : ls) {
711 for (auto& b : p.second) {
712 std::lock_guard l(b->lock);
713 dout(10) << __func__ << " " << *b << dendl;
714 if (b->session) {
715 ceph_assert(b->pg == this);
716 if (b->is_new()) {
717 b->state = Backoff::STATE_DELETING;
718 } else {
719 b->session->rm_backoff(b);
720 b->session.reset();
721 }
722 b->pg.reset();
723 }
724 }
725 }
726 }
727
728 // called by Session::clear_backoffs()
729 void PG::rm_backoff(const ceph::ref_t<Backoff>& b)
730 {
731 dout(10) << __func__ << " " << *b << dendl;
732 std::lock_guard l(backoff_lock);
733 ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
734 ceph_assert(b->pg == this);
735 auto p = backoffs.find(b->begin);
736 // may race with release_backoffs()
737 if (p != backoffs.end()) {
738 auto q = p->second.find(b);
739 if (q != p->second.end()) {
740 p->second.erase(q);
741 if (p->second.empty()) {
742 backoffs.erase(p);
743 }
744 }
745 }
746 }
747
748 void PG::clear_recovery_state()
749 {
750 dout(10) << "clear_recovery_state" << dendl;
751
752 finish_sync_event = 0;
753
754 hobject_t soid;
755 while (recovery_ops_active > 0) {
756 #ifdef DEBUG_RECOVERY_OIDS
757 soid = *recovering_oids.begin();
758 #endif
759 finish_recovery_op(soid, true);
760 }
761
762 backfill_info.clear();
763 peer_backfill_info.clear();
764 waiting_on_backfill.clear();
765 _clear_recovery_state(); // pg impl specific hook
766 }
767
768 void PG::cancel_recovery()
769 {
770 dout(10) << "cancel_recovery" << dendl;
771 clear_recovery_state();
772 }
773
774 void PG::set_probe_targets(const set<pg_shard_t> &probe_set)
775 {
776 std::lock_guard l(heartbeat_peer_lock);
777 probe_targets.clear();
778 for (set<pg_shard_t>::iterator i = probe_set.begin();
779 i != probe_set.end();
780 ++i) {
781 probe_targets.insert(i->osd);
782 }
783 }
784
785 void PG::send_cluster_message(
786 int target, Message *m,
787 epoch_t epoch, bool share_map_update=false)
788 {
789 ConnectionRef con = osd->get_con_osd_cluster(
790 target, get_osdmap_epoch());
791 if (!con) {
792 m->put();
793 return;
794 }
795
796 if (share_map_update) {
797 osd->maybe_share_map(con.get(), get_osdmap());
798 }
799 osd->send_message_osd_cluster(m, con.get());
800 }
801
802 void PG::clear_probe_targets()
803 {
804 std::lock_guard l(heartbeat_peer_lock);
805 probe_targets.clear();
806 }
807
808 void PG::update_heartbeat_peers(set<int> new_peers)
809 {
810 bool need_update = false;
811 heartbeat_peer_lock.lock();
812 if (new_peers == heartbeat_peers) {
813 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " unchanged" << dendl;
814 } else {
815 dout(10) << "update_heartbeat_peers " << heartbeat_peers << " -> " << new_peers << dendl;
816 heartbeat_peers.swap(new_peers);
817 need_update = true;
818 }
819 heartbeat_peer_lock.unlock();
820
821 if (need_update)
822 osd->need_heartbeat_peer_update();
823 }
824
825
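// Look up a request id first in the in-memory projected log and then in the
// persisted pg log, returning its version/user_version/return code if the op
// has already been seen.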
826 bool PG::check_in_progress_op(
827 const osd_reqid_t &r,
828 eversion_t *version,
829 version_t *user_version,
830 int *return_code,
831 vector<pg_log_op_return_item_t> *op_returns
832 ) const
833 {
834 return (
835 projected_log.get_request(r, version, user_version, return_code,
836 op_returns) ||
837 recovery_state.get_pg_log().get_log().get_request(
838 r, version, user_version, return_code, op_returns));
839 }
840
841 void PG::publish_stats_to_osd()
842 {
843 if (!is_primary())
844 return;
845
846 std::lock_guard l{pg_stats_publish_lock};
847 auto stats = recovery_state.prepare_stats_for_publish(
848 pg_stats_publish_valid,
849 pg_stats_publish,
850 unstable_stats);
851 if (stats) {
852 pg_stats_publish = stats.value();
853 pg_stats_publish_valid = true;
854 }
855 }
856
857 unsigned PG::get_target_pg_log_entries() const
858 {
859 return osd->get_target_pg_log_entries();
860 }
861
862 void PG::clear_publish_stats()
863 {
864 dout(15) << "clear_stats" << dendl;
865 std::lock_guard l{pg_stats_publish_lock};
866 pg_stats_publish_valid = false;
867 }
868
869 /**
870 * initialize a newly instantiated pg
871 *
872 * Initialize PG state, as when a PG is initially created, or when it
873 * is first instantiated on the current node.
874 *
875 * @param role our role/rank
876 * @param newup up set
877 * @param newacting acting set
878 * @param history pg history
879 * @param pi past_intervals
880 * @param backfill true if info should be marked as backfill
881 * @param t transaction to write out our new state in
882 */
883 void PG::init(
884 int role,
885 const vector<int>& newup, int new_up_primary,
886 const vector<int>& newacting, int new_acting_primary,
887 const pg_history_t& history,
888 const PastIntervals& pi,
889 bool backfill,
890 ObjectStore::Transaction &t)
891 {
892 recovery_state.init(
893 role, newup, new_up_primary, newacting,
894 new_acting_primary, history, pi, backfill, t);
895 }
896
897 void PG::shutdown()
898 {
899 ch->flush();
900 std::scoped_lock l{*this};
901 recovery_state.shutdown();
902 on_shutdown();
903 }
904
905 #pragma GCC diagnostic ignored "-Wpragmas"
906 #pragma GCC diagnostic push
907 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
908
909 void PG::upgrade(ObjectStore *store)
910 {
911 dout(0) << __func__ << " " << info_struct_v << " -> " << pg_latest_struct_v
912 << dendl;
913 ceph_assert(info_struct_v <= 10);
914 ObjectStore::Transaction t;
915
916 // <do upgrade steps here>
917
918 // finished upgrade!
919 ceph_assert(info_struct_v == 10);
920
921 // update infover_key
922 if (info_struct_v < pg_latest_struct_v) {
923 map<string,bufferlist> v;
924 __u8 ver = pg_latest_struct_v;
925 encode(ver, v[string(infover_key)]);
926 t.omap_setkeys(coll, pgmeta_oid, v);
927 }
928
929 recovery_state.force_write_state(t);
930
931 ObjectStore::CollectionHandle ch = store->open_collection(coll);
932 int r = store->queue_transaction(ch, std::move(t));
933 if (r != 0) {
934 derr << __func__ << ": queue_transaction returned "
935 << cpp_strerror(r) << dendl;
936 ceph_abort();
937 }
938 ceph_assert(r == 0);
939
940 C_SaferCond waiter;
941 if (!ch->flush_commit(&waiter)) {
942 waiter.wait();
943 }
944 }
945
946 #pragma GCC diagnostic pop
947 #pragma GCC diagnostic warning "-Wpragmas"
948
949 void PG::prepare_write(
950 pg_info_t &info,
951 pg_info_t &last_written_info,
952 PastIntervals &past_intervals,
953 PGLog &pglog,
954 bool dirty_info,
955 bool dirty_big_info,
956 bool need_write_epoch,
957 ObjectStore::Transaction &t)
958 {
959 info.stats.stats.add(unstable_stats);
960 unstable_stats.clear();
961 map<string,bufferlist> km;
962 string key_to_remove;
963 if (dirty_big_info || dirty_info) {
964 int ret = prepare_info_keymap(
965 cct,
966 &km,
967 &key_to_remove,
968 get_osdmap_epoch(),
969 info,
970 last_written_info,
971 past_intervals,
972 dirty_big_info,
973 need_write_epoch,
974 cct->_conf->osd_fast_info,
975 osd->logger,
976 this);
977 ceph_assert(ret == 0);
978 }
979 pglog.write_log_and_missing(
980 t, &km, coll, pgmeta_oid, pool.info.require_rollback());
981 if (!km.empty())
982 t.omap_setkeys(coll, pgmeta_oid, km);
983 if (!key_to_remove.empty())
984 t.omap_rmkey(coll, pgmeta_oid, key_to_remove);
985 }
986
987 #pragma GCC diagnostic ignored "-Wpragmas"
988 #pragma GCC diagnostic push
989 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
990
991 bool PG::_has_removal_flag(ObjectStore *store,
992 spg_t pgid)
993 {
994 coll_t coll(pgid);
995 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
996
997 // first try new way
998 set<string> keys;
999 keys.insert("_remove");
1000 map<string,bufferlist> values;
1001 auto ch = store->open_collection(coll);
1002 ceph_assert(ch);
1003 if (store->omap_get_values(ch, pgmeta_oid, keys, &values) == 0 &&
1004 values.size() == 1)
1005 return true;
1006
1007 return false;
1008 }
1009
1010 int PG::peek_map_epoch(ObjectStore *store,
1011 spg_t pgid,
1012 epoch_t *pepoch)
1013 {
1014 coll_t coll(pgid);
1015 ghobject_t legacy_infos_oid(OSD::make_infos_oid());
1016 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
1017 epoch_t cur_epoch = 0;
1018
1019 // validate collection name
1020 ceph_assert(coll.is_pg());
1021
1022 // try for v8
1023 set<string> keys;
1024 keys.insert(string(infover_key));
1025 keys.insert(string(epoch_key));
1026 map<string,bufferlist> values;
1027 auto ch = store->open_collection(coll);
1028 ceph_assert(ch);
1029 int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
1030 if (r == 0) {
1031 ceph_assert(values.size() == 2);
1032
1033 // sanity check version
1034 auto bp = values[string(infover_key)].cbegin();
1035 __u8 struct_v = 0;
1036 decode(struct_v, bp);
1037 ceph_assert(struct_v >= 8);
1038
1039 // get epoch
1040 bp = values[string(epoch_key)].begin();
1041 decode(cur_epoch, bp);
1042 } else {
1043 // probably bug 10617; see OSD::load_pgs()
1044 return -1;
1045 }
1046
1047 *pepoch = cur_epoch;
1048 return 0;
1049 }
1050
1051 #pragma GCC diagnostic pop
1052 #pragma GCC diagnostic warning "-Wpragmas"
1053
1054 bool PG::check_log_for_corruption(ObjectStore *store)
1055 {
1056 /// TODO: this method needs to work with the omap log
1057 return true;
1058 }
1059
1060 //! Get the name we're going to save our corrupt pg log as
1061 std::string PG::get_corrupt_pg_log_name() const
1062 {
1063 const int MAX_BUF = 512;
1064 char buf[MAX_BUF];
1065 struct tm tm_buf;
1066 time_t my_time(time(NULL));
1067 const struct tm *t = localtime_r(&my_time, &tm_buf);
1068 int ret = strftime(buf, sizeof(buf), "corrupt_log_%Y-%m-%d_%k:%M_", t);
1069 if (ret == 0) {
1070 dout(0) << "strftime failed" << dendl;
1071 return "corrupt_log_unknown_time";
1072 }
1073 string out(buf);
1074 out += stringify(info.pgid);
1075 return out;
1076 }
1077
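// Load pg_info_t and PastIntervals from the pgmeta object's omap
// (infover/info/biginfo and, optionally, fastinfo). When the fastinfo key is
// present, its contents are applied on top of the decoded info.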
1078 int PG::read_info(
1079 ObjectStore *store, spg_t pgid, const coll_t &coll,
1080 pg_info_t &info, PastIntervals &past_intervals,
1081 __u8 &struct_v)
1082 {
1083 set<string> keys;
1084 keys.insert(string(infover_key));
1085 keys.insert(string(info_key));
1086 keys.insert(string(biginfo_key));
1087 keys.insert(string(fastinfo_key));
1088 ghobject_t pgmeta_oid(pgid.make_pgmeta_oid());
1089 map<string,bufferlist> values;
1090 auto ch = store->open_collection(coll);
1091 ceph_assert(ch);
1092 int r = store->omap_get_values(ch, pgmeta_oid, keys, &values);
1093 ceph_assert(r == 0);
1094 ceph_assert(values.size() == 3 ||
1095 values.size() == 4);
1096
1097 auto p = values[string(infover_key)].cbegin();
1098 decode(struct_v, p);
1099 ceph_assert(struct_v >= 10);
1100
1101 p = values[string(info_key)].begin();
1102 decode(info, p);
1103
1104 p = values[string(biginfo_key)].begin();
1105 decode(past_intervals, p);
1106 decode(info.purged_snaps, p);
1107
1108 p = values[string(fastinfo_key)].begin();
1109 if (!p.end()) {
1110 pg_fast_info_t fast;
1111 decode(fast, p);
1112 fast.try_apply_to(&info);
1113 }
1114 return 0;
1115 }
1116
1117 void PG::read_state(ObjectStore *store)
1118 {
1119 PastIntervals past_intervals_from_disk;
1120 pg_info_t info_from_disk;
1121 int r = read_info(
1122 store,
1123 pg_id,
1124 coll,
1125 info_from_disk,
1126 past_intervals_from_disk,
1127 info_struct_v);
1128 ceph_assert(r >= 0);
1129
1130 if (info_struct_v < pg_compat_struct_v) {
1131 derr << "PG needs upgrade, but on-disk data is too old; upgrade to"
1132 << " an older version first." << dendl;
1133 ceph_abort_msg("PG too old to upgrade");
1134 }
1135
1136 recovery_state.init_from_disk_state(
1137 std::move(info_from_disk),
1138 std::move(past_intervals_from_disk),
1139 [this, store] (PGLog &pglog) {
1140 ostringstream oss;
1141 pglog.read_log_and_missing(
1142 store,
1143 ch,
1144 pgmeta_oid,
1145 info,
1146 oss,
1147 cct->_conf->osd_ignore_stale_divergent_priors,
1148 cct->_conf->osd_debug_verify_missing_on_start);
1149
1150 if (oss.tellp())
1151 osd->clog->error() << oss.str();
1152 return 0;
1153 });
1154
1155 if (info_struct_v < pg_latest_struct_v) {
1156 upgrade(store);
1157 }
1158
1159 // initialize current mapping
1160 {
1161 int primary, up_primary;
1162 vector<int> acting, up;
1163 get_osdmap()->pg_to_up_acting_osds(
1164 pg_id.pgid, &up, &up_primary, &acting, &primary);
1165 recovery_state.init_primary_up_acting(
1166 up,
1167 acting,
1168 up_primary,
1169 primary);
1170 recovery_state.set_role(OSDMap::calc_pg_role(pg_whoami, acting));
1171 }
1172
1173 // init pool options
1174 store->set_collection_opts(ch, pool.info.opts);
1175
1176 PeeringCtx rctx(ceph_release_t::unknown);
1177 handle_initialize(rctx);
1178 // note: we don't activate here because we know the OSD will advance maps
1179 // during boot.
1180 write_if_dirty(rctx.transaction);
1181 store->queue_transaction(ch, std::move(rctx.transaction));
1182 }
1183
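// Keep the SnapMapper in sync with the pg log: deletes remove the object's
// snap mapping, clones/promotes add a mapping for the decoded snap set, and
// modifies update it. A snap-vector decode failure is tolerated by falling
// back to an empty set rather than aborting.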
1184 void PG::update_snap_map(
1185 const vector<pg_log_entry_t> &log_entries,
1186 ObjectStore::Transaction &t)
1187 {
1188 for (vector<pg_log_entry_t>::const_iterator i = log_entries.begin();
1189 i != log_entries.end();
1190 ++i) {
1191 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
1192 if (i->soid.snap < CEPH_MAXSNAP) {
1193 if (i->is_delete()) {
1194 int r = snap_mapper.remove_oid(
1195 i->soid,
1196 &_t);
1197 if (r != 0)
1198 derr << __func__ << " remove_oid " << i->soid << " failed with " << r << dendl;
1199 // On removal tolerate missing key corruption
1200 ceph_assert(r == 0 || r == -ENOENT);
1201 } else if (i->is_update()) {
1202 ceph_assert(i->snaps.length() > 0);
1203 vector<snapid_t> snaps;
1204 bufferlist snapbl = i->snaps;
1205 auto p = snapbl.cbegin();
1206 try {
1207 decode(snaps, p);
1208 } catch (...) {
1209 derr << __func__ << " decode snaps failure on " << *i << dendl;
1210 snaps.clear();
1211 }
1212 set<snapid_t> _snaps(snaps.begin(), snaps.end());
1213
1214 if (i->is_clone() || i->is_promote()) {
1215 snap_mapper.add_oid(
1216 i->soid,
1217 _snaps,
1218 &_t);
1219 } else if (i->is_modify()) {
1220 int r = snap_mapper.update_snaps(
1221 i->soid,
1222 _snaps,
1223 0,
1224 &_t);
1225 ceph_assert(r == 0);
1226 } else {
1227 ceph_assert(i->is_clean());
1228 }
1229 }
1230 }
1231 }
1232 }
1233
1234 /**
1235 * filter trimming|trimmed snaps out of snapcontext
1236 */
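// For example (hypothetical values): if snaps = [7,5,4,2] and snap 4 is in
// snap_trimq or info.purged_snaps, the vector is rewritten in place to
// [7,5,2]; if nothing matches, snaps is left untouched.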
1237 void PG::filter_snapc(vector<snapid_t> &snaps)
1238 {
1239 // nothing needs to trim, we can return immediately
1240 if (snap_trimq.empty() && info.purged_snaps.empty())
1241 return;
1242
1243 bool filtering = false;
1244 vector<snapid_t> newsnaps;
1245 for (vector<snapid_t>::iterator p = snaps.begin();
1246 p != snaps.end();
1247 ++p) {
1248 if (snap_trimq.contains(*p) || info.purged_snaps.contains(*p)) {
1249 if (!filtering) {
1250 // start building a new vector with what we've seen so far
1251 dout(10) << "filter_snapc filtering " << snaps << dendl;
1252 newsnaps.insert(newsnaps.begin(), snaps.begin(), p);
1253 filtering = true;
1254 }
1255 dout(20) << "filter_snapc removing trimq|purged snap " << *p << dendl;
1256 } else {
1257 if (filtering)
1258 newsnaps.push_back(*p); // continue building new vector
1259 }
1260 }
1261 if (filtering) {
1262 snaps.swap(newsnaps);
1263 dout(10) << "filter_snapc result " << snaps << dendl;
1264 }
1265 }
1266
1267 void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
1268 {
1269 for (map<hobject_t, list<OpRequestRef>>::iterator it = m.begin();
1270 it != m.end();
1271 ++it)
1272 requeue_ops(it->second);
1273 m.clear();
1274 }
1275
1276 void PG::requeue_op(OpRequestRef op)
1277 {
1278 auto p = waiting_for_map.find(op->get_source());
1279 if (p != waiting_for_map.end()) {
1280 dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
1281 << dendl;
1282 p->second.push_front(op);
1283 } else {
1284 dout(20) << __func__ << " " << op << dendl;
1285 osd->enqueue_front(
1286 OpSchedulerItem(
1287 unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
1288 op->get_req()->get_cost(),
1289 op->get_req()->get_priority(),
1290 op->get_req()->get_recv_stamp(),
1291 op->get_req()->get_source().num(),
1292 get_osdmap_epoch()));
1293 }
1294 }
1295
1296 void PG::requeue_ops(list<OpRequestRef> &ls)
1297 {
1298 for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
1299 i != ls.rend();
1300 ++i) {
1301 requeue_op(*i);
1302 }
1303 ls.clear();
1304 }
1305
1306 void PG::requeue_map_waiters()
1307 {
1308 epoch_t epoch = get_osdmap_epoch();
1309 auto p = waiting_for_map.begin();
1310 while (p != waiting_for_map.end()) {
1311 if (epoch < p->second.front()->min_epoch) {
1312 dout(20) << __func__ << " " << p->first << " front op "
1313 << p->second.front() << " must still wait, doing nothing"
1314 << dendl;
1315 ++p;
1316 } else {
1317 dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
1318 for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
1319 auto req = *q;
1320 osd->enqueue_front(OpSchedulerItem(
1321 unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
1322 req->get_req()->get_cost(),
1323 req->get_req()->get_priority(),
1324 req->get_req()->get_recv_stamp(),
1325 req->get_req()->get_source().num(),
1326 epoch));
1327 }
1328 p = waiting_for_map.erase(p);
1329 }
1330 }
1331 }
1332
1333
1334 // ==========================================================================================
1335 // SCRUB
1336
1337 /*
1338 * when holding pg and sched_scrub_lock, then the states are:
1339 * scheduling:
1340 * scrubber.local_reserved = true
1341 * scrubber.active = false
1342 * scrubber.reserved_peers includes whoami
1343 * osd->scrubs_local++
1344 * scheduling, replica declined:
1345 * scrubber.local_reserved = true
1346 * scrubber.reserved_peers includes -1
1347 * osd->scrubs_local++
1348 * pending:
1349 * scrubber.local_reserved = true
1350 * scrubber.active = false
1351 * scrubber.reserved_peers.size() == acting.size();
1352 * pg on scrub_wq
1353 * osd->scrubs_local++
1354 * scrubbing:
1355 * scrubber.local_reserved = true;
1356 * scrubber.active = true
1357 * scrubber.reserved_peers empty
1358 */
1359
1360 // returns true if a scrub has been newly kicked off
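// The first pass through reserves locally (osd->inc_scrubs_local()) and asks
// the replicas to reserve; later calls wait until either a peer declines
// (reserve_failed) or every member of the acting set has granted, at which
// point the scrub is queued, possibly upgraded to a deep scrub.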
1361 bool PG::sched_scrub()
1362 {
1363 ceph_assert(ceph_mutex_is_locked(_lock));
1364 ceph_assert(!is_scrubbing());
1365 if (!(is_primary() && is_active() && is_clean())) {
1366 return false;
1367 }
1368
1369 // All processing the first time through commits us to whatever
1370 // choices are made.
1371 if (!scrubber.local_reserved) {
1372 dout(20) << __func__ << ": Start processing pg " << info.pgid << dendl;
1373
1374 bool allow_deep_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
1375 pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB));
1376 bool allow_scrub = !(get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
1377 pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB));
1378 bool has_deep_errors = (info.stats.stats.sum.num_deep_scrub_errors > 0);
1379 bool try_to_auto_repair = (cct->_conf->osd_scrub_auto_repair
1380 && get_pgbackend()->auto_repair_supported());
1381
1382 scrubber.time_for_deep = false;
1383 // Clear these in case user issues the scrub/repair command during
1384 // the scheduling of the scrub/repair (e.g. request reservation)
1385 scrubber.deep_scrub_on_error = false;
1386 scrubber.auto_repair = false;
1387
1388 // All periodic scrub handling goes here because must_scrub is
1389 // always set for must_deep_scrub and must_repair.
1390 if (!scrubber.must_scrub) {
1391 ceph_assert(!scrubber.must_deep_scrub && !scrubber.must_repair);
1392 // Handle deep scrub determination only if allowed
1393 if (allow_deep_scrub) {
1394 // Initial entry and scheduled scrubs without nodeep_scrub set get here
1395 if (scrubber.need_auto) {
1396 dout(20) << __func__ << ": need repair after scrub errors" << dendl;
1397 scrubber.time_for_deep = true;
1398 } else {
1399 double deep_scrub_interval = 0;
1400 pool.info.opts.get(pool_opts_t::DEEP_SCRUB_INTERVAL, &deep_scrub_interval);
1401 if (deep_scrub_interval <= 0) {
1402 deep_scrub_interval = cct->_conf->osd_deep_scrub_interval;
1403 }
1404 scrubber.time_for_deep = ceph_clock_now() >=
1405 info.history.last_deep_scrub_stamp + deep_scrub_interval;
1406
1407 bool deep_coin_flip = false;
1408 // If we randomize when !allow_scrub && allow_deep_scrub, then it guarantees
1409 // we will deep scrub because this function is called often.
1410 if (!scrubber.time_for_deep && allow_scrub)
1411 deep_coin_flip = (rand() % 100) < cct->_conf->osd_deep_scrub_randomize_ratio * 100;
1412 dout(20) << __func__ << ": time_for_deep=" << scrubber.time_for_deep << " deep_coin_flip=" << deep_coin_flip << dendl;
1413
1414 scrubber.time_for_deep = (scrubber.time_for_deep || deep_coin_flip);
1415 }
1416
1417 if (!scrubber.time_for_deep && has_deep_errors) {
1418 osd->clog->info() << "osd." << osd->whoami
1419 << " pg " << info.pgid
1420 << " Deep scrub errors, upgrading scrub to deep-scrub";
1421 scrubber.time_for_deep = true;
1422 }
1423
1424 if (try_to_auto_repair) {
1425 if (scrubber.time_for_deep) {
1426 dout(20) << __func__ << ": auto repair with deep scrubbing" << dendl;
1427 scrubber.auto_repair = true;
1428 } else if (allow_scrub) {
1429 dout(20) << __func__ << ": auto repair with scrubbing, rescrub if errors found" << dendl;
1430 scrubber.deep_scrub_on_error = true;
1431 }
1432 }
1433 } else { // !allow_deep_scrub
1434 dout(20) << __func__ << ": nodeep_scrub set" << dendl;
1435 if (has_deep_errors) {
1436 osd->clog->error() << "osd." << osd->whoami
1437 << " pg " << info.pgid
1438 << " Regular scrub skipped due to deep-scrub errors and nodeep-scrub set";
1439 return false;
1440 }
1441 }
1442
1443 //NOSCRUB so skip regular scrubs
1444 if (!allow_scrub && !scrubber.time_for_deep) {
1445 return false;
1446 }
1447 // scrubber.must_scrub
1448 } else if (!scrubber.must_deep_scrub && has_deep_errors) {
1449 osd->clog->error() << "osd." << osd->whoami
1450 << " pg " << info.pgid
1451 << " Regular scrub request, deep-scrub details will be lost";
1452 }
1453 // Unless precluded, this was handled above
1454 scrubber.need_auto = false;
1455
1456 ceph_assert(scrubber.reserved_peers.empty());
1457 bool allow_scrubbing = cct->_conf->osd_scrub_during_recovery ||
1458 (cct->_conf->osd_repair_during_recovery && scrubber.must_repair) ||
1459 !osd->is_recovery_active();
1460 if (allow_scrubbing &&
1461 osd->inc_scrubs_local()) {
1462 dout(20) << __func__ << ": reserved locally, reserving replicas" << dendl;
1463 scrubber.local_reserved = true;
1464 scrubber.reserved_peers.insert(pg_whoami);
1465 scrub_reserve_replicas();
1466 } else {
1467 dout(20) << __func__ << ": failed to reserve locally" << dendl;
1468 return false;
1469 }
1470 }
1471
1472 if (scrubber.local_reserved) {
1473 if (scrubber.reserve_failed) {
1474 dout(20) << __func__ << ": failed, a peer declined" << dendl;
1475 clear_scrub_reserved();
1476 scrub_unreserve_replicas();
1477 return false;
1478 } else if (scrubber.reserved_peers.size() == get_actingset().size()) {
1479 dout(20) << __func__ << ": success, reserved self and replicas" << dendl;
1480 if (scrubber.time_for_deep) {
1481 dout(10) << __func__ << ": scrub will be deep" << dendl;
1482 state_set(PG_STATE_DEEP_SCRUB);
1483 scrubber.time_for_deep = false;
1484 }
1485 queue_scrub();
1486 } else {
1487 // none declined, since scrubber.local_reserved is set
1488 dout(20) << __func__ << ": reserved " << scrubber.reserved_peers
1489 << ", waiting for replicas" << dendl;
1490 }
1491 }
1492 return true;
1493 }
1494
1495 bool PG::is_scrub_registered()
1496 {
1497 return !scrubber.scrub_reg_stamp.is_zero();
1498 }
1499
1500 void PG::reg_next_scrub()
1501 {
1502 if (!is_primary())
1503 return;
1504
1505 utime_t reg_stamp;
1506 bool must = false;
1507 if (scrubber.must_scrub || scrubber.need_auto) {
1508 // Set the smallest time that isn't utime_t()
1509 reg_stamp = Scrubber::scrub_must_stamp();
1510 must = true;
1511 } else if (info.stats.stats_invalid && cct->_conf->osd_scrub_invalid_stats) {
1512 reg_stamp = ceph_clock_now();
1513 must = true;
1514 } else {
1515 reg_stamp = info.history.last_scrub_stamp;
1516 }
1517 // note down the sched_time, so we can locate this scrub, and remove it
1518 // later on.
1519 double scrub_min_interval = 0, scrub_max_interval = 0;
1520 pool.info.opts.get(pool_opts_t::SCRUB_MIN_INTERVAL, &scrub_min_interval);
1521 pool.info.opts.get(pool_opts_t::SCRUB_MAX_INTERVAL, &scrub_max_interval);
1522 ceph_assert(!is_scrub_registered());
1523 scrubber.scrub_reg_stamp = osd->reg_pg_scrub(info.pgid,
1524 reg_stamp,
1525 scrub_min_interval,
1526 scrub_max_interval,
1527 must);
1528 dout(10) << __func__ << " pg " << pg_id << " register next scrub, scrub time "
1529 << scrubber.scrub_reg_stamp << ", must = " << (int)must << dendl;
1530 }
1531
1532 void PG::unreg_next_scrub()
1533 {
1534 if (is_scrub_registered()) {
1535 osd->unreg_pg_scrub(info.pgid, scrubber.scrub_reg_stamp);
1536 scrubber.scrub_reg_stamp = utime_t();
1537 }
1538 }
1539
1540 void PG::on_info_history_change()
1541 {
1542 unreg_next_scrub();
1543 reg_next_scrub();
1544 }
1545
1546 void PG::scrub_requested(bool deep, bool repair, bool need_auto)
1547 {
1548 unreg_next_scrub();
1549 if (need_auto) {
1550 scrubber.need_auto = true;
1551 } else {
1552 scrubber.must_scrub = true;
1553 scrubber.must_deep_scrub = deep || repair;
1554 scrubber.must_repair = repair;
1555 // User might intervene, so clear this
1556 scrubber.need_auto = false;
1557 scrubber.req_scrub = true;
1558 }
1559 reg_next_scrub();
1560 }
1561
1562 void PG::clear_ready_to_merge() {
1563 osd->clear_ready_to_merge(this);
1564 }
1565
1566 void PG::queue_want_pg_temp(const vector<int> &wanted) {
1567 osd->queue_want_pg_temp(get_pgid().pgid, wanted);
1568 }
1569
1570 void PG::clear_want_pg_temp() {
1571 osd->remove_want_pg_temp(get_pgid().pgid);
1572 }
1573
1574 void PG::on_role_change() {
1575 requeue_ops(waiting_for_peered);
1576 plpg_on_role_change();
1577 }
1578
1579 void PG::on_new_interval() {
1580 scrub_queued = false;
1581 projected_last_update = eversion_t();
1582 cancel_recovery();
1583 }
1584
1585 epoch_t PG::oldest_stored_osdmap() {
1586 return osd->get_superblock().oldest_map;
1587 }
1588
1589 OstreamTemp PG::get_clog_info() {
1590 return osd->clog->info();
1591 }
1592
1593 OstreamTemp PG::get_clog_debug() {
1594 return osd->clog->debug();
1595 }
1596
1597 OstreamTemp PG::get_clog_error() {
1598 return osd->clog->error();
1599 }
1600
1601 void PG::schedule_event_after(
1602 PGPeeringEventRef event,
1603 float delay) {
1604 std::lock_guard lock(osd->recovery_request_lock);
1605 osd->recovery_request_timer.add_event_after(
1606 delay,
1607 new QueuePeeringEvt(
1608 this,
1609 std::move(event)));
1610 }
1611
1612 void PG::request_local_background_io_reservation(
1613 unsigned priority,
1614 PGPeeringEventRef on_grant,
1615 PGPeeringEventRef on_preempt) {
1616 osd->local_reserver.request_reservation(
1617 pg_id,
1618 on_grant ? new QueuePeeringEvt(
1619 this, on_grant) : nullptr,
1620 priority,
1621 on_preempt ? new QueuePeeringEvt(
1622 this, on_preempt) : nullptr);
1623 }
1624
1625 void PG::update_local_background_io_priority(
1626 unsigned priority) {
1627 osd->local_reserver.update_priority(
1628 pg_id,
1629 priority);
1630 }
1631
1632 void PG::cancel_local_background_io_reservation() {
1633 osd->local_reserver.cancel_reservation(
1634 pg_id);
1635 }
1636
1637 void PG::request_remote_recovery_reservation(
1638 unsigned priority,
1639 PGPeeringEventRef on_grant,
1640 PGPeeringEventRef on_preempt) {
1641 osd->remote_reserver.request_reservation(
1642 pg_id,
1643 on_grant ? new QueuePeeringEvt(
1644 this, on_grant) : nullptr,
1645 priority,
1646 on_preempt ? new QueuePeeringEvt(
1647 this, on_preempt) : nullptr);
1648 }
1649
1650 void PG::cancel_remote_recovery_reservation() {
1651 osd->remote_reserver.cancel_reservation(
1652 pg_id);
1653 }
1654
1655 void PG::schedule_event_on_commit(
1656 ObjectStore::Transaction &t,
1657 PGPeeringEventRef on_commit)
1658 {
1659 t.register_on_commit(new QueuePeeringEvt(this, on_commit));
1660 }
1661
1662 void PG::on_active_exit()
1663 {
1664 backfill_reserving = false;
1665 agent_stop();
1666 }
1667
1668 void PG::on_active_advmap(const OSDMapRef &osdmap)
1669 {
1670 const auto& new_removed_snaps = osdmap->get_new_removed_snaps();
1671 auto i = new_removed_snaps.find(get_pgid().pool());
1672 if (i != new_removed_snaps.end()) {
1673 bool bad = false;
1674 for (auto j : i->second) {
1675 if (snap_trimq.intersects(j.first, j.second)) {
1676 decltype(snap_trimq) added, overlap;
1677 added.insert(j.first, j.second);
1678 overlap.intersection_of(snap_trimq, added);
1679 derr << __func__ << " removed_snaps already contains "
1680 << overlap << dendl;
1681 bad = true;
1682 snap_trimq.union_of(added);
1683 } else {
1684 snap_trimq.insert(j.first, j.second);
1685 }
1686 }
1687 dout(10) << __func__ << " new removed_snaps " << i->second
1688 << ", snap_trimq now " << snap_trimq << dendl;
1689 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
1690 }
1691
1692 const auto& new_purged_snaps = osdmap->get_new_purged_snaps();
1693 auto j = new_purged_snaps.find(get_pgid().pgid.pool());
1694 if (j != new_purged_snaps.end()) {
1695 bool bad = false;
1696 for (auto k : j->second) {
1697 if (!recovery_state.get_info().purged_snaps.contains(k.first, k.second)) {
1698 interval_set<snapid_t> rm, overlap;
1699 rm.insert(k.first, k.second);
1700 overlap.intersection_of(recovery_state.get_info().purged_snaps, rm);
1701 derr << __func__ << " purged_snaps does not contain "
1702 << rm << ", only " << overlap << dendl;
1703 recovery_state.adjust_purged_snaps(
1704 [&overlap](auto &purged_snaps) {
1705 purged_snaps.subtract(overlap);
1706 });
1707 // This can currently happen in the normal (if unlikely) course of
1708 // events. Because adding snaps to purged_snaps does not increase
1709 // the pg version or add a pg log entry, we don't reliably propagate
1710 // purged_snaps additions to other OSDs.
1711 // One example:
1712 // - purge S
1713 // - primary and replicas update purged_snaps
1714 // - no object updates
1715 // - pg mapping changes, new primary on different node
1716 // - new primary pg version == eversion_t(), so info is not
1717 // propagated.
1718 //bad = true;
1719 } else {
1720 recovery_state.adjust_purged_snaps(
1721 [&k](auto &purged_snaps) {
1722 purged_snaps.erase(k.first, k.second);
1723 });
1724 }
1725 }
1726 dout(10) << __func__ << " new purged_snaps " << j->second
1727 << ", now " << recovery_state.get_info().purged_snaps << dendl;
1728 ceph_assert(!bad || !cct->_conf->osd_debug_verify_cached_snaps);
1729 }
1730 }
1731
1732 void PG::queue_snap_retrim(snapid_t snap)
1733 {
1734 if (!is_active() ||
1735 !is_primary()) {
1736 dout(10) << __func__ << " snap " << snap << " - not active and primary"
1737 << dendl;
1738 return;
1739 }
1740 if (!snap_trimq.contains(snap)) {
1741 snap_trimq.insert(snap);
1742 snap_trimq_repeat.insert(snap);
1743 dout(20) << __func__ << " snap " << snap
1744 << ", trimq now " << snap_trimq
1745 << ", repeat " << snap_trimq_repeat << dendl;
1746 kick_snap_trim();
1747 } else {
1748 dout(20) << __func__ << " snap " << snap
1749 << " already in trimq " << snap_trimq << dendl;
1750 }
1751 }
1752
1753 void PG::on_active_actmap()
1754 {
1755 if (cct->_conf->osd_check_for_log_corruption)
1756 check_log_for_corruption(osd->store);
1757
1758
1759 if (recovery_state.is_active()) {
1760 dout(10) << "Active: kicking snap trim" << dendl;
1761 kick_snap_trim();
1762 }
1763
1764 if (recovery_state.is_peered() &&
1765 !recovery_state.is_clean() &&
1766 !recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOBACKFILL) &&
1767 (!recovery_state.get_osdmap()->test_flag(CEPH_OSDMAP_NOREBALANCE) ||
1768 recovery_state.is_degraded())) {
1769 queue_recovery();
1770 }
1771 }
1772
1773 void PG::on_backfill_reserved()
1774 {
1775 backfill_reserving = false;
1776 queue_recovery();
1777 }
1778
1779 void PG::on_backfill_canceled()
1780 {
1781 if (!waiting_on_backfill.empty()) {
1782 waiting_on_backfill.clear();
1783 finish_recovery_op(hobject_t::get_max());
1784 }
1785 }
1786
1787 void PG::on_recovery_reserved()
1788 {
1789 queue_recovery();
1790 }
1791
1792 void PG::set_not_ready_to_merge_target(pg_t pgid, pg_t src)
1793 {
1794 osd->set_not_ready_to_merge_target(pgid, src);
1795 }
1796
1797 void PG::set_not_ready_to_merge_source(pg_t pgid)
1798 {
1799 osd->set_not_ready_to_merge_source(pgid);
1800 }
1801
1802 void PG::set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec)
1803 {
1804 osd->set_ready_to_merge_target(this, lu, les, lec);
1805 }
1806
1807 void PG::set_ready_to_merge_source(eversion_t lu)
1808 {
1809 osd->set_ready_to_merge_source(this, lu);
1810 }
1811
1812 void PG::send_pg_created(pg_t pgid)
1813 {
1814 osd->send_pg_created(pgid);
1815 }
1816
1817 ceph::signedspan PG::get_mnow()
1818 {
1819 return osd->get_mnow();
1820 }
1821
1822 HeartbeatStampsRef PG::get_hb_stamps(int peer)
1823 {
1824 return osd->get_hb_stamps(peer);
1825 }
1826
1827 void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
1828 {
1829 auto spgid = info.pgid;
1830 auto o = osd;
1831 osd->mono_timer.add_event(
1832 delay,
1833 [o, lpr, spgid]() {
1834 o->queue_renew_lease(lpr, spgid);
1835 });
1836 }
1837
1838 void PG::queue_check_readable(epoch_t lpr, ceph::timespan delay)
1839 {
1840 osd->queue_check_readable(info.pgid, lpr, delay);
1841 }
1842
1843 void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
1844 {
1845 pglog.rebuild_missing_set_with_deletes(
1846 osd->store,
1847 ch,
1848 recovery_state.get_info());
1849 }
1850
1851 void PG::on_activate_committed()
1852 {
1853 if (!is_primary()) {
1854 // waiters
1855 if (recovery_state.needs_flush() == 0) {
1856 requeue_ops(waiting_for_peered);
1857 } else if (!waiting_for_peered.empty()) {
1858 dout(10) << __func__ << " flushes in progress, moving "
1859 << waiting_for_peered.size() << " items to waiting_for_flush"
1860 << dendl;
1861 ceph_assert(waiting_for_flush.empty());
1862 waiting_for_flush.swap(waiting_for_peered);
1863 }
1864 }
1865 }
1866
1867 void PG::do_replica_scrub_map(OpRequestRef op)
1868 {
1869 auto m = op->get_req<MOSDRepScrubMap>();
1870 dout(7) << __func__ << " " << *m << dendl;
1871 if (m->map_epoch < info.history.same_interval_since) {
1872 dout(10) << __func__ << " discarding old from "
1873 << m->map_epoch << " < " << info.history.same_interval_since
1874 << dendl;
1875 return;
1876 }
1877 if (!scrubber.is_chunky_scrub_active()) {
1878 dout(10) << __func__ << " scrub isn't active" << dendl;
1879 return;
1880 }
1881
1882 op->mark_started();
1883
1884 auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
1885 scrubber.received_maps[m->from].decode(p, info.pgid.pool());
1886 dout(10) << "map version is "
1887 << scrubber.received_maps[m->from].valid_through
1888 << dendl;
1889
1890 dout(10) << __func__ << " waiting_on_whom was " << scrubber.waiting_on_whom
1891 << dendl;
1892 ceph_assert(scrubber.waiting_on_whom.count(m->from));
1893 scrubber.waiting_on_whom.erase(m->from);
1894 if (m->preempted) {
1895 dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
1896 scrub_preempted = true;
1897 }
1898 if (scrubber.waiting_on_whom.empty()) {
1899 requeue_scrub(ops_blocked_by_scrub());
1900 }
1901 }
1902
1903 // send scrub v3 messages (chunky scrub)
1904 void PG::_request_scrub_map(
1905 pg_shard_t replica, eversion_t version,
1906 hobject_t start, hobject_t end,
1907 bool deep,
1908 bool allow_preemption)
1909 {
1910 ceph_assert(replica != pg_whoami);
1911 dout(10) << "scrub requesting scrubmap from osd." << replica
1912 << " deep " << (int)deep << dendl;
1913 MOSDRepScrub *repscrubop = new MOSDRepScrub(
1914 spg_t(info.pgid.pgid, replica.shard), version,
1915 get_osdmap_epoch(),
1916 get_last_peering_reset(),
1917 start, end, deep,
1918 allow_preemption,
1919 scrubber.priority,
1920 ops_blocked_by_scrub());
1921 // default priority, we want the rep scrub processed prior to any recovery
1922 // or client io messages (we are holding a lock!)
1923 osd->send_message_osd_cluster(
1924 replica.osd, repscrubop, get_osdmap_epoch());
1925 }
1926
1927 void PG::handle_scrub_reserve_request(OpRequestRef op)
1928 {
1929 dout(7) << __func__ << " " << *op->get_req() << dendl;
1930 op->mark_started();
1931 if (scrubber.remote_reserved) {
1932 dout(10) << __func__ << " ignoring reserve request: Already reserved"
1933 << dendl;
1934 return;
1935 }
1936 if ((cct->_conf->osd_scrub_during_recovery || !osd->is_recovery_active()) &&
1937 osd->inc_scrubs_remote()) {
1938 scrubber.remote_reserved = true;
1939 } else {
1940 dout(20) << __func__ << ": failed to reserve remotely" << dendl;
1941 scrubber.remote_reserved = false;
1942 }
1943 auto m = op->get_req<MOSDScrubReserve>();
1944 Message *reply = new MOSDScrubReserve(
1945 spg_t(info.pgid.pgid, get_primary().shard),
1946 m->map_epoch,
1947 scrubber.remote_reserved ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT,
1948 pg_whoami);
1949 osd->send_message_osd_cluster(reply, op->get_req()->get_connection());
1950 }
1951
1952 void PG::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
1953 {
1954 dout(7) << __func__ << " " << *op->get_req() << dendl;
1955 op->mark_started();
1956 if (!scrubber.local_reserved) {
1957 dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
1958 return;
1959 }
1960 if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
1961 dout(10) << " already had osd." << from << " reserved" << dendl;
1962 } else {
1963 dout(10) << " osd." << from << " scrub reserve = success" << dendl;
1964 scrubber.reserved_peers.insert(from);
1965 sched_scrub();
1966 }
1967 }
1968
1969 void PG::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
1970 {
1971 dout(7) << __func__ << " " << *op->get_req() << dendl;
1972 op->mark_started();
1973 if (!scrubber.local_reserved) {
1974 dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
1975 return;
1976 }
1977 if (scrubber.reserved_peers.find(from) != scrubber.reserved_peers.end()) {
1978 dout(10) << " already had osd." << from << " reserved" << dendl;
1979 } else {
1980 /* One decline stops this pg from being scheduled for scrubbing. */
1981 dout(10) << " osd." << from << " scrub reserve = fail" << dendl;
1982 scrubber.reserve_failed = true;
1983 sched_scrub();
1984 }
1985 }
1986
1987 void PG::handle_scrub_reserve_release(OpRequestRef op)
1988 {
1989 dout(7) << __func__ << " " << *op->get_req() << dendl;
1990 op->mark_started();
1991 clear_scrub_reserved();
1992 }
1993
1994 // Compute pending backfill data
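// A small worked example with hypothetical numbers: if the primary reports
// bf_bytes = 10 GiB for this PG and we already hold local_bytes = 3 GiB of
// it, the pending backfill is 7 GiB; if local usage already exceeds the
// primary's figure, the result is clamped to 0.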
1995 static int64_t pending_backfill(CephContext *cct, int64_t bf_bytes, int64_t local_bytes)
1996 {
1997 lgeneric_dout(cct, 20) << __func__ << " Adjust local usage "
1998 << (local_bytes >> 10) << "KiB"
1999 << " primary usage " << (bf_bytes >> 10)
2000 << "KiB" << dendl;
2001
2002 return std::max((int64_t)0, bf_bytes - local_bytes);
2003 }
2004
2005
2006 // We can zero the value of primary num_bytes with just an atomic store.
2007 // However, setting it above zero reserves space for backfill and requires
2008 // the OSDService::stat_lock, which protects all OSD usage accounting.
2009 bool PG::try_reserve_recovery_space(
2010 int64_t primary_bytes, int64_t local_bytes) {
2011 // Use tentative_backfill_full() to make sure enough
2012 // space is available to handle the target bytes from the primary.
2013
2014 // TODO: If we passed num_objects from primary we could account for
2015 // an estimate of the metadata overhead.
2016
2017 // TODO: If we had compressed_allocated and compressed_original from primary
2018 // we could compute compression ratio and adjust accordingly.
2019
2020 // XXX: There is no way to get omap overhead and this would only apply
2021 // to whatever (possibly different) partition is storing the database.
2022
2023 // update_osd_stat() from heartbeat will do this on a new
2024 // statfs using ps->primary_bytes.
2025 uint64_t pending_adjustment = 0;
2026 if (primary_bytes) {
2027 // For an erasure coded pool, overestimate by a full stripe per object
2028 // because we don't know how each object rounded to the nearest stripe
2029 if (pool.info.is_erasure()) {
2030 primary_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
2031 primary_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
2032 info.stats.stats.sum.num_objects;
2033 local_bytes /= (int)get_pgbackend()->get_ec_data_chunk_count();
2034 local_bytes += get_pgbackend()->get_ec_stripe_chunk_size() *
2035 info.stats.stats.sum.num_objects;
2036 }
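// Hypothetical example of the overestimate above: with an EC profile of
// k = 4 data chunks and a 4 KiB stripe chunk size, 1 GiB of primary_bytes
// for 1000 objects becomes 1 GiB / 4 + 1000 * 4 KiB; local_bytes gets the
// same adjustment so the two values stay comparable.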
2037 pending_adjustment = pending_backfill(
2038 cct,
2039 primary_bytes,
2040 local_bytes);
2041 dout(10) << __func__ << " primary_bytes " << (primary_bytes >> 10)
2042 << "KiB"
2043 << " local " << (local_bytes >> 10) << "KiB"
2044 << " pending_adjustments " << (pending_adjustment >> 10) << "KiB"
2045 << dendl;
2046 }
2047
2048 // This lock protects not only the OSDService stats but also setting the
2049 // pg primary_bytes. That's why we don't immediately unlock.
2050 std::lock_guard l{osd->stat_lock};
2051 osd_stat_t cur_stat = osd->osd_stat;
2052 if (cct->_conf->osd_debug_reject_backfill_probability > 0 &&
2053 (rand()%1000 < (cct->_conf->osd_debug_reject_backfill_probability*1000.0))) {
2054 dout(10) << "backfill reservation rejected: failure injection"
2055 << dendl;
2056 return false;
2057 } else if (!cct->_conf->osd_debug_skip_full_check_in_backfill_reservation &&
2058 osd->tentative_backfill_full(this, pending_adjustment, cur_stat)) {
2059 dout(10) << "backfill reservation rejected: backfill full"
2060 << dendl;
2061 return false;
2062 } else {
2063 // Don't reserve space if we skipped the reservation check; this is used
2064 // to test the other backfill full check, AND in case a corruption
2065 // of num_bytes requires ignoring that value and trying the
2066 // backfill anyway.
2067 if (primary_bytes &&
2068 !cct->_conf->osd_debug_skip_full_check_in_backfill_reservation) {
2069 primary_num_bytes.store(primary_bytes);
2070 local_num_bytes.store(local_bytes);
2071 } else {
2072 unreserve_recovery_space();
2073 }
2074 return true;
2075 }
2076 }
2077
2078 void PG::unreserve_recovery_space() {
2079 primary_num_bytes.store(0);
2080 local_num_bytes.store(0);
2081 return;
2082 }
2083
2084 void PG::clear_scrub_reserved()
2085 {
2086 scrubber.reserved_peers.clear();
2087 scrubber.reserve_failed = false;
2088
2089 if (scrubber.local_reserved) {
2090 scrubber.local_reserved = false;
2091 osd->dec_scrubs_local();
2092 }
2093 if (scrubber.remote_reserved) {
2094 scrubber.remote_reserved = false;
2095 osd->dec_scrubs_remote();
2096 }
2097 }
2098
2099 void PG::scrub_reserve_replicas()
2100 {
2101 ceph_assert(recovery_state.get_backfill_targets().empty());
2102 std::vector<std::pair<int, Message*>> messages;
2103 messages.reserve(get_actingset().size());
2104 epoch_t e = get_osdmap_epoch();
2105 for (set<pg_shard_t>::iterator i = get_actingset().begin();
2106 i != get_actingset().end();
2107 ++i) {
2108 if (*i == pg_whoami) continue;
2109 dout(10) << "scrub requesting reserve from osd." << *i << dendl;
2110 Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
2111 MOSDScrubReserve::REQUEST, pg_whoami);
2112 messages.push_back(std::make_pair(i->osd, m));
2113 }
2114 if (!messages.empty()) {
2115 osd->send_message_osd_cluster(messages, e);
2116 }
2117 }
2118
2119 void PG::scrub_unreserve_replicas()
2120 {
2121 ceph_assert(recovery_state.get_backfill_targets().empty());
2122 std::vector<std::pair<int, Message*>> messages;
2123 messages.reserve(get_actingset().size());
2124 epoch_t e = get_osdmap_epoch();
2125 for (set<pg_shard_t>::iterator i = get_actingset().begin();
2126 i != get_actingset().end();
2127 ++i) {
2128 if (*i == pg_whoami) continue;
2129 dout(10) << "scrub requesting unreserve from osd." << *i << dendl;
2130 Message* m = new MOSDScrubReserve(spg_t(info.pgid.pgid, i->shard), e,
2131 MOSDScrubReserve::RELEASE, pg_whoami);
2132 messages.push_back(std::make_pair(i->osd, m));
2133 }
2134 if (!messages.empty()) {
2135 osd->send_message_osd_cluster(messages, e);
2136 }
2137 }
2138
2139 void PG::_scan_rollback_obs(const vector<ghobject_t> &rollback_obs)
2140 {
2141 ObjectStore::Transaction t;
2142 eversion_t trimmed_to = recovery_state.get_last_rollback_info_trimmed_to_applied();
2143 for (vector<ghobject_t>::const_iterator i = rollback_obs.begin();
2144 i != rollback_obs.end();
2145 ++i) {
2146 if (i->generation < trimmed_to.version) {
2147 dout(10) << __func__ << " osd." << osd->whoami
2148 << " pg " << info.pgid
2149 << " found obsolete rollback obj "
2150 << *i << " generation < trimmed_to "
2151 << trimmed_to
2152 << "...repaired" << dendl;
2153 t.remove(coll, *i);
2154 }
2155 }
2156 if (!t.empty()) {
2157 derr << __func__ << ": queueing trans to clean up obsolete rollback objs"
2158 << dendl;
2159 osd->store->queue_transaction(ch, std::move(t), NULL);
2160 }
2161 }
2162
2163 void PG::_scan_snaps(ScrubMap &smap)
2164 {
2165 hobject_t head;
2166 SnapSet snapset;
2167
2168 // Test qa/standalone/scrub/osd-scrub-snaps.sh uses this message to verify
2169 // that the caller is using clean_meta_map() and that it works properly.
2170 dout(20) << __func__ << " start" << dendl;
2171
2172 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2173 i != smap.objects.rend();
2174 ++i) {
2175 const hobject_t &hoid = i->first;
2176 ScrubMap::object &o = i->second;
2177
2178 dout(20) << __func__ << " " << hoid << dendl;
2179
2180 ceph_assert(!hoid.is_snapdir());
2181 if (hoid.is_head()) {
2182 // parse the SnapSet
2183 bufferlist bl;
2184 if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
2185 continue;
2186 }
2187 bl.push_back(o.attrs[SS_ATTR]);
2188 auto p = bl.cbegin();
2189 try {
2190 decode(snapset, p);
2191 } catch(...) {
2192 continue;
2193 }
2194 head = hoid.get_head();
2195 continue;
2196 }
2197 if (hoid.snap < CEPH_MAXSNAP) {
2198 // check and if necessary fix snap_mapper
2199 if (hoid.get_head() != head) {
2200 derr << __func__ << " no head for " << hoid << " (have " << head << ")"
2201 << dendl;
2202 continue;
2203 }
2204 set<snapid_t> obj_snaps;
2205 auto p = snapset.clone_snaps.find(hoid.snap);
2206 if (p == snapset.clone_snaps.end()) {
2207 derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
2208 << dendl;
2209 continue;
2210 }
2211 obj_snaps.insert(p->second.begin(), p->second.end());
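// obj_snaps now holds the authoritative snap list for this clone, taken from
// the head's SnapSet; below we compare it against what the snap mapper has
// recorded and rewrite the mapper entry if the two disagree.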
2212 set<snapid_t> cur_snaps;
2213 int r = snap_mapper.get_snaps(hoid, &cur_snaps);
2214 if (r != 0 && r != -ENOENT) {
2215 derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
2216 ceph_abort();
2217 }
2218 if (r == -ENOENT || cur_snaps != obj_snaps) {
2219 ObjectStore::Transaction t;
2220 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2221 if (r == 0) {
2222 r = snap_mapper.remove_oid(hoid, &_t);
2223 if (r != 0) {
2224 derr << __func__ << ": remove_oid returned " << cpp_strerror(r)
2225 << dendl;
2226 ceph_abort();
2227 }
2228 osd->clog->error() << "osd." << osd->whoami
2229 << " found snap mapper error on pg "
2230 << info.pgid
2231 << " oid " << hoid << " snaps in mapper: "
2232 << cur_snaps << ", oi: "
2233 << obj_snaps
2234 << "...repaired";
2235 } else {
2236 osd->clog->error() << "osd." << osd->whoami
2237 << " found snap mapper error on pg "
2238 << info.pgid
2239 << " oid " << hoid << " snaps missing in mapper"
2240 << ", should be: "
2241 << obj_snaps
2242 << " was " << cur_snaps << " r " << r
2243 << "...repaired";
2244 }
2245 snap_mapper.add_oid(hoid, obj_snaps, &_t);
2246
2247 // wait for repair to apply to avoid confusing other bits of the system.
2248 {
2249 ceph::condition_variable my_cond;
2250 ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
2251 int r = 0;
2252 bool done;
2253 t.register_on_applied_sync(
2254 new C_SafeCond(my_lock, my_cond, &done, &r));
2255 r = osd->store->queue_transaction(ch, std::move(t));
2256 if (r != 0) {
2257 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2258 << dendl;
2259 } else {
2260 std::unique_lock l{my_lock};
2261 my_cond.wait(l, [&done] { return done;});
2262 }
2263 }
2264 }
2265 }
2266 }
2267 }
2268
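// If an object's stored object_info (OI_ATTR) records an oid other than the
// object it is attached to, rewrite the attribute so oi.soid matches the
// actual object and persist the fix through the object store.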
2269 void PG::_repair_oinfo_oid(ScrubMap &smap)
2270 {
2271 for (map<hobject_t, ScrubMap::object>::reverse_iterator i = smap.objects.rbegin();
2272 i != smap.objects.rend();
2273 ++i) {
2274 const hobject_t &hoid = i->first;
2275 ScrubMap::object &o = i->second;
2276
2277 bufferlist bl;
2278 if (o.attrs.find(OI_ATTR) == o.attrs.end()) {
2279 continue;
2280 }
2281 bl.push_back(o.attrs[OI_ATTR]);
2282 object_info_t oi;
2283 try {
2284 oi.decode(bl);
2285 } catch(...) {
2286 continue;
2287 }
2288 if (oi.soid != hoid) {
2289 ObjectStore::Transaction t;
2290 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
2291 osd->clog->error() << "osd." << osd->whoami
2292 << " found object info error on pg "
2293 << info.pgid
2294 << " oid " << hoid << " oid in object info: "
2295 << oi.soid
2296 << "...repaired";
2297 // Fix object info
2298 oi.soid = hoid;
2299 bl.clear();
2300 encode(oi, bl, get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
2301
2302 bufferptr bp(bl.c_str(), bl.length());
2303 o.attrs[OI_ATTR] = bp;
2304
2305 t.setattr(coll, ghobject_t(hoid), OI_ATTR, bl);
2306 int r = osd->store->queue_transaction(ch, std::move(t));
2307 if (r != 0) {
2308 derr << __func__ << ": queue_transaction got " << cpp_strerror(r)
2309 << dendl;
2310 }
2311 }
2312 }
2313 }
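
// Build (part of) the scrub map for [start, end). This runs incrementally:
// it returns -EINPROGRESS while there is more listing or scanning to do (the
// caller requeues the scrub and we resume from pos), a negative error if the
// object listing fails, and 0 once the chunk is complete.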
2314 int PG::build_scrub_map_chunk(
2315 ScrubMap &map,
2316 ScrubMapBuilder &pos,
2317 hobject_t start,
2318 hobject_t end,
2319 bool deep,
2320 ThreadPool::TPHandle &handle)
2321 {
2322 dout(10) << __func__ << " [" << start << "," << end << ") "
2323 << " pos " << pos
2324 << dendl;
2325
2326 // start
2327 while (pos.empty()) {
2328 pos.deep = deep;
2329 map.valid_through = info.last_update;
2330
2331 // objects
2332 vector<ghobject_t> rollback_obs;
2333 pos.ret = get_pgbackend()->objects_list_range(
2334 start,
2335 end,
2336 &pos.ls,
2337 &rollback_obs);
2338 if (pos.ret < 0) {
2339 dout(5) << "objects_list_range error: " << pos.ret << dendl;
2340 return pos.ret;
2341 }
2342 if (pos.ls.empty()) {
2343 break;
2344 }
2345 _scan_rollback_obs(rollback_obs);
2346 pos.pos = 0;
2347 return -EINPROGRESS;
2348 }
2349
2350 // scan objects
2351 while (!pos.done()) {
2352 int r = get_pgbackend()->be_scan_list(map, pos);
2353 if (r == -EINPROGRESS) {
2354 return r;
2355 }
2356 }
2357
2358 // finish
2359 dout(20) << __func__ << " finishing" << dendl;
2360 ceph_assert(pos.done());
2361 _repair_oinfo_oid(map);
2362 if (!is_primary()) {
2363 ScrubMap for_meta_scrub;
2364 // In case we restarted a smaller chunk, clear old data
2365 scrubber.cleaned_meta_map.clear_from(scrubber.start);
2366 scrubber.cleaned_meta_map.insert(map);
2367 scrubber.clean_meta_map(for_meta_scrub);
2368 _scan_snaps(for_meta_scrub);
2369 }
2370
2371 dout(20) << __func__ << " done, got " << map.objects.size() << " items"
2372 << dendl;
2373 return 0;
2374 }
2375
2376 void PG::Scrubber::cleanup_store(ObjectStore::Transaction *t) {
2377 if (!store)
2378 return;
2379 struct OnComplete : Context {
2380 std::unique_ptr<Scrub::Store> store;
2381 explicit OnComplete(
2382 std::unique_ptr<Scrub::Store> &&store)
2383 : store(std::move(store)) {}
2384 void finish(int) override {}
2385 };
2386 store->cleanup(t);
2387 t->register_on_complete(new OnComplete(std::move(store)));
2388 ceph_assert(!store);
2389 }
2390
2391 void PG::repair_object(
2392 const hobject_t &soid,
2393 const list<pair<ScrubMap::object, pg_shard_t> > &ok_peers,
2394 const set<pg_shard_t> &bad_peers)
2395 {
2396 set<pg_shard_t> ok_shards;
2397 for (auto &&peer: ok_peers) ok_shards.insert(peer.second);
2398
2399 dout(10) << "repair_object " << soid
2400 << " bad_peers osd.{" << bad_peers << "},"
2401 << " ok_peers osd.{" << ok_shards << "}" << dendl;
2402
2403 const ScrubMap::object &po = ok_peers.back().first;
2404 eversion_t v;
2405 object_info_t oi;
2406 try {
2407 bufferlist bv;
2408 if (po.attrs.count(OI_ATTR)) {
2409 bv.push_back(po.attrs.find(OI_ATTR)->second);
2410 }
2411 auto bliter = bv.cbegin();
2412 decode(oi, bliter);
2413 } catch (...) {
2414 dout(0) << __func__ << ": Need version of replica, bad object_info_t: "
2415 << soid << dendl;
2416 ceph_abort();
2417 }
2418
2419 if (bad_peers.count(get_primary())) {
2420 // We should only be scrubbing if the PG is clean.
2421 ceph_assert(waiting_for_unreadable_object.empty());
2422 dout(10) << __func__ << ": primary = " << get_primary() << dendl;
2423 }
2424
2425 /* No need to pass ok_peers, they must not be missing the object, so
2426 * force_object_missing will add them to missing_loc anyway */
2427 recovery_state.force_object_missing(bad_peers, soid, oi.version);
2428 }
2429
2430 /* replica_scrub
2431 *
2432 * Wait for last_update_applied to match msg->scrub_to as above. Wait
2433 * for pushes to complete in case of recent recovery. Build a single
2434 * scrubmap of objects that are in the range [msg->start, msg->end).
2435 */
2436 void PG::replica_scrub(
2437 OpRequestRef op,
2438 ThreadPool::TPHandle &handle)
2439 {
2440 auto msg = op->get_req<MOSDRepScrub>();
2441 ceph_assert(!scrubber.active_rep_scrub);
2442 dout(7) << "replica_scrub" << dendl;
2443
2444 if (msg->map_epoch < info.history.same_interval_since) {
2445 dout(10) << "replica_scrub discarding old replica_scrub from "
2446 << msg->map_epoch << " < " << info.history.same_interval_since
2447 << dendl;
2448 return;
2449 }
2450
2451 ceph_assert(msg->chunky);
2452 if (active_pushes > 0) {
2453 dout(10) << "waiting for active pushes to finish" << dendl;
2454 scrubber.active_rep_scrub = op;
2455 return;
2456 }
2457
2458 scrubber.state = Scrubber::BUILD_MAP_REPLICA;
2459 scrubber.replica_scrub_start = msg->min_epoch;
2460 scrubber.start = msg->start;
2461 scrubber.end = msg->end;
2462 scrubber.max_end = msg->end;
2463 scrubber.deep = msg->deep;
2464 scrubber.epoch_start = info.history.same_interval_since;
2465 if (msg->priority) {
2466 scrubber.priority = msg->priority;
2467 } else {
2468 scrubber.priority = get_scrub_priority();
2469 }
2470
2471 scrub_can_preempt = msg->allow_preemption;
2472 scrub_preempted = false;
2473 scrubber.replica_scrubmap_pos.reset();
2474
2475 requeue_scrub(msg->high_priority);
2476 }
2477
2478 /* Scrub:
2479 * PG_STATE_SCRUBBING is set when the scrub is queued
2480 *
2481 * scrub will be chunky if all OSDs in PG support chunky scrub
2482 * scrub will fail if OSDs are too old.
2483 */
2484 void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
2485 {
2486 OSDService *osds = osd;
2487 double scrub_sleep = osds->osd->scrub_sleep_time(scrubber.must_scrub);
2488 if (scrub_sleep > 0 &&
2489 (scrubber.state == PG::Scrubber::NEW_CHUNK ||
2490 scrubber.state == PG::Scrubber::INACTIVE) &&
2491 scrubber.needs_sleep) {
2492 ceph_assert(!scrubber.sleeping);
2493 dout(20) << __func__ << " state is INACTIVE|NEW_CHUNK, sleeping" << dendl;
2494
2495 // Do an async sleep so we don't block the op queue
2496 spg_t pgid = get_pgid();
2497 int state = scrubber.state;
2498 auto scrub_requeue_callback =
2499 new LambdaContext([osds, pgid, state](int r) {
2500 PGRef pg = osds->osd->lookup_lock_pg(pgid);
2501 if (pg == nullptr) {
2502 lgeneric_dout(osds->osd->cct, 20)
2503 << "scrub_requeue_callback: Could not find "
2504 << "PG " << pgid << " can't complete scrub requeue after sleep"
2505 << dendl;
2506 return;
2507 }
2508 pg->scrubber.sleeping = false;
2509 pg->scrubber.needs_sleep = false;
2510 lgeneric_dout(pg->cct, 20)
2511 << "scrub_requeue_callback: slept for "
2512 << ceph_clock_now() - pg->scrubber.sleep_start
2513 << ", re-queuing scrub with state " << state << dendl;
2514 pg->scrub_queued = false;
2515 pg->requeue_scrub();
2516 pg->scrubber.sleep_start = utime_t();
2517 pg->unlock();
2518 });
2519 std::lock_guard l(osd->sleep_lock);
2520 osd->sleep_timer.add_event_after(scrub_sleep,
2521 scrub_requeue_callback);
2522 scrubber.sleeping = true;
2523 scrubber.sleep_start = ceph_clock_now();
2524 return;
2525 }
2526 if (pg_has_reset_since(queued)) {
2527 return;
2528 }
2529 ceph_assert(scrub_queued);
2530 scrub_queued = false;
2531 scrubber.needs_sleep = true;
2532
2533 // for the replica
2534 if (!is_primary() &&
2535 scrubber.state == PG::Scrubber::BUILD_MAP_REPLICA) {
2536 chunky_scrub(handle);
2537 return;
2538 }
2539
2540 if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
2541 dout(10) << "scrub -- not primary or active or not clean" << dendl;
2542 state_clear(PG_STATE_SCRUBBING);
2543 state_clear(PG_STATE_REPAIR);
2544 state_clear(PG_STATE_DEEP_SCRUB);
2545 publish_stats_to_osd();
2546 return;
2547 }
2548
2549 if (!scrubber.active) {
2550 ceph_assert(recovery_state.get_backfill_targets().empty());
2551
2552 scrubber.deep = state_test(PG_STATE_DEEP_SCRUB);
2553
2554 dout(10) << "starting a new chunky scrub" << dendl;
2555 }
2556
2557 chunky_scrub(handle);
2558 }
2559
2560 void PG::abort_scrub()
2561 {
2562 scrub_clear_state();
2563 scrub_unreserve_replicas();
2564 }
2565
2566 /*
2567 * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
2568 * chunk.
2569 *
2570 * The object store is partitioned into chunks which end on hash boundaries. For
2571 * each chunk, the following logic is performed:
2572 *
2573 * (1) Block writes on the chunk
2574 * (2) Request maps from replicas
2575 * (3) Wait for pushes to be applied (after recovery)
2576 * (4) Wait for writes to flush on the chunk
2577 * (5) Wait for maps from replicas
2578 * (6) Compare / repair all scrub maps
2579 * (7) Wait for digest updates to apply
2580 *
2581 * This logic is encoded in the mostly linear state machine:
2582 *
2583 * +------------------+
2584 * _________v__________ |
2585 * | | |
2586 * | INACTIVE | |
2587 * |____________________| |
2588 * | |
2589 * | +----------+ |
2590 * _________v___v______ | |
2591 * | | | |
2592 * | NEW_CHUNK | | |
2593 * |____________________| | |
2594 * | | |
2595 * _________v__________ | |
2596 * | | | |
2597 * | WAIT_PUSHES | | |
2598 * |____________________| | |
2599 * | | |
2600 * _________v__________ | |
2601 * | | | |
2602 * | WAIT_LAST_UPDATE | | |
2603 * |____________________| | |
2604 * | | |
2605 * _________v__________ | |
2606 * | | | |
2607 * | BUILD_MAP | | |
2608 * |____________________| | |
2609 * | | |
2610 * _________v__________ | |
2611 * | | | |
2612 * | WAIT_REPLICAS | | |
2613 * |____________________| | |
2614 * | | |
2615 * _________v__________ | |
2616 * | | | |
2617 * | COMPARE_MAPS | | |
2618 * |____________________| | |
2619 * | | |
2620 * | | |
2621 * _________v__________ | |
2622 * | | | |
2623 * |WAIT_DIGEST_UPDATES | | |
2624 * |____________________| | |
2625 * | | | |
2626 * | +----------+ |
2627 * _________v__________ |
2628 * | | |
2629 * | FINISH | |
2630 * |____________________| |
2631 * | |
2632 * +------------------+
2633 *
2634 * The primary determines the last update from the subset by walking the log. If
2635 * it sees a log entry pertaining to a file in the chunk, it tells the replicas
2636 * to wait until that update is applied before building a scrub map. Both the
2637 * primary and replicas will wait for any active pushes to be applied.
2638 *
2639 * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
2640 *
2641 * scrubber.state encodes the current state of the scrub (refer to state diagram
2642 * for details).
2643 */
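// Note on preemption (a summary of the code below): while a chunk is being
// built, a client write targeting that chunk may preempt the scrub. The chunk
// is then retried from NEW_CHUNK with its size divisor doubled (i.e. smaller
// chunks), until osd_scrub_max_preemptions preemptions have been spent, after
// which further preemption is disallowed for the chunk.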
2644 void PG::chunky_scrub(ThreadPool::TPHandle &handle)
2645 {
2646 // check for map changes
2647 if (scrubber.is_chunky_scrub_active()) {
2648 if (scrubber.epoch_start != info.history.same_interval_since) {
2649 dout(10) << "scrub pg changed, aborting" << dendl;
2650 abort_scrub();
2651 return;
2652 }
2653 }
2654
2655 bool done = false;
2656 int ret;
2657
2658 while (!done) {
2659 dout(20) << "scrub state " << Scrubber::state_string(scrubber.state)
2660 << " [" << scrubber.start << "," << scrubber.end << ")"
2661 << " max_end " << scrubber.max_end << dendl;
2662
2663 switch (scrubber.state) {
2664 case PG::Scrubber::INACTIVE:
2665 dout(10) << "scrub start" << dendl;
2666 ceph_assert(is_primary());
2667
2668 publish_stats_to_osd();
2669 scrubber.epoch_start = info.history.same_interval_since;
2670 scrubber.active = true;
2671
2672 {
2673 ObjectStore::Transaction t;
2674 scrubber.cleanup_store(&t);
2675 scrubber.store.reset(Scrub::Store::create(osd->store, &t,
2676 info.pgid, coll));
2677 osd->store->queue_transaction(ch, std::move(t), nullptr);
2678 }
2679
2680 // Don't include temporary objects when scrubbing
2681 scrubber.start = info.pgid.pgid.get_hobj_start();
2682 scrubber.state = PG::Scrubber::NEW_CHUNK;
2683
2684 {
2685 bool repair = state_test(PG_STATE_REPAIR);
2686 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
2687 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
2688 stringstream oss;
2689 oss << info.pgid.pgid << " " << mode << " starts" << std::endl;
2690 osd->clog->debug(oss);
2691 }
2692
2693 scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
2694 "osd_scrub_max_preemptions");
2695 scrubber.preempt_divisor = 1;
2696 break;
2697
2698 case PG::Scrubber::NEW_CHUNK:
2699 scrubber.primary_scrubmap = ScrubMap();
2700 scrubber.received_maps.clear();
2701
2702 // begin (possible) preemption window
2703 if (scrub_preempted) {
2704 scrubber.preempt_left--;
2705 scrubber.preempt_divisor *= 2;
2706 dout(10) << __func__ << " preempted, " << scrubber.preempt_left
2707 << " left" << dendl;
2708 scrub_preempted = false;
2709 }
2710 scrub_can_preempt = scrubber.preempt_left > 0;
2711
2712 {
2713 /* get the start and end of our scrub chunk
2714 *
2715 * Our scrub chunk has an important restriction we're going to need to
2716 * respect. We can't let head be start or end.
2717 * Using a half-open interval means that if end == head,
2718 * we'd scrub/lock head and the clone right next to head in different
2719 * chunks which would allow us to miss clones created between
2720 * scrubbing that chunk and scrubbing the chunk including head.
2721 * This isn't true for any of the other clones since clones can
2722 * only be created "just to the left of" head. There is one exception
2723 * to this: promotion of clones which always happens to the left of the
2724 * left-most clone, but promote_object checks the scrubber in that
2725 * case, so it should be ok. Also, it's ok to "miss" clones at the
2726 * left end of the range if we are a tier because they may legitimately
2727 * not exist (see _scrub).
2728 */
2729 ceph_assert(scrubber.preempt_divisor > 0);
2730 int min = std::max<int64_t>(3, cct->_conf->osd_scrub_chunk_min /
2731 scrubber.preempt_divisor);
2732 int max = std::max<int64_t>(min, cct->_conf->osd_scrub_chunk_max /
2733 scrubber.preempt_divisor);
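// e.g., with hypothetical config values osd_scrub_chunk_min = 5 and
// osd_scrub_chunk_max = 25: after one preemption preempt_divisor is 2, so
// min = max(3, 5/2) = 3 and max = max(3, 25/2) = 12, so the retried chunk
// covers fewer objects and is less likely to collide with client writes.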
2734 hobject_t start = scrubber.start;
2735 hobject_t candidate_end;
2736 vector<hobject_t> objects;
2737 ret = get_pgbackend()->objects_list_partial(
2738 start,
2739 min,
2740 max,
2741 &objects,
2742 &candidate_end);
2743 ceph_assert(ret >= 0);
2744
2745 if (!objects.empty()) {
2746 hobject_t back = objects.back();
2747 while (candidate_end.is_head() &&
2748 candidate_end == back.get_head()) {
2749 candidate_end = back;
2750 objects.pop_back();
2751 if (objects.empty()) {
2752 ceph_assert(0 ==
2753 "Somehow we got more than 2 objects which "
2754 "have the same head but are not clones");
2755 }
2756 back = objects.back();
2757 }
2758 if (candidate_end.is_head()) {
2759 ceph_assert(candidate_end != back.get_head());
2760 candidate_end = candidate_end.get_object_boundary();
2761 }
2762 } else {
2763 ceph_assert(candidate_end.is_max());
2764 }
2765
2766 if (!_range_available_for_scrub(scrubber.start, candidate_end)) {
2767 // we'll be requeued by whatever made us unavailable for scrub
2768 dout(10) << __func__ << ": scrub blocked somewhere in range "
2769 << "[" << scrubber.start << ", " << candidate_end << ")"
2770 << dendl;
2771 done = true;
2772 break;
2773 }
2774 scrubber.end = candidate_end;
2775 if (scrubber.end > scrubber.max_end)
2776 scrubber.max_end = scrubber.end;
2777 }
2778
2779 // walk the log to find the latest update that affects our chunk
2780 scrubber.subset_last_update = eversion_t();
2781 for (auto p = projected_log.log.rbegin();
2782 p != projected_log.log.rend();
2783 ++p) {
2784 if (p->soid >= scrubber.start &&
2785 p->soid < scrubber.end) {
2786 scrubber.subset_last_update = p->version;
2787 break;
2788 }
2789 }
2790 if (scrubber.subset_last_update == eversion_t()) {
2791 for (list<pg_log_entry_t>::const_reverse_iterator p =
2792 recovery_state.get_pg_log().get_log().log.rbegin();
2793 p != recovery_state.get_pg_log().get_log().log.rend();
2794 ++p) {
2795 if (p->soid >= scrubber.start &&
2796 p->soid < scrubber.end) {
2797 scrubber.subset_last_update = p->version;
2798 break;
2799 }
2800 }
2801 }
2802
2803 scrubber.state = PG::Scrubber::WAIT_PUSHES;
2804 break;
2805
2806 case PG::Scrubber::WAIT_PUSHES:
2807 if (active_pushes == 0) {
2808 scrubber.state = PG::Scrubber::WAIT_LAST_UPDATE;
2809 } else {
2810 dout(15) << "wait for pushes to apply" << dendl;
2811 done = true;
2812 }
2813 break;
2814
2815 case PG::Scrubber::WAIT_LAST_UPDATE:
2816 if (recovery_state.get_last_update_applied() <
2817 scrubber.subset_last_update) {
2818 // will be requeued by op_applied
2819 dout(15) << "wait for EC read/modify/writes to queue" << dendl;
2820 done = true;
2821 break;
2822 }
2823
2824 // ask replicas to scan
2825 scrubber.waiting_on_whom.insert(pg_whoami);
2826
2827 // request maps from replicas
2828 for (set<pg_shard_t>::iterator i = get_acting_recovery_backfill().begin();
2829 i != get_acting_recovery_backfill().end();
2830 ++i) {
2831 if (*i == pg_whoami) continue;
2832 _request_scrub_map(*i, scrubber.subset_last_update,
2833 scrubber.start, scrubber.end, scrubber.deep,
2834 scrubber.preempt_left > 0);
2835 scrubber.waiting_on_whom.insert(*i);
2836 }
2837 dout(10) << __func__ << " waiting_on_whom " << scrubber.waiting_on_whom
2838 << dendl;
2839
2840 scrubber.state = PG::Scrubber::BUILD_MAP;
2841 scrubber.primary_scrubmap_pos.reset();
2842 break;
2843
2844 case PG::Scrubber::BUILD_MAP:
2845 ceph_assert(recovery_state.get_last_update_applied() >=
2846 scrubber.subset_last_update);
2847
2848 // build my own scrub map
2849 if (scrub_preempted) {
2850 dout(10) << __func__ << " preempted" << dendl;
2851 scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
2852 break;
2853 }
2854 ret = build_scrub_map_chunk(
2855 scrubber.primary_scrubmap,
2856 scrubber.primary_scrubmap_pos,
2857 scrubber.start, scrubber.end,
2858 scrubber.deep,
2859 handle);
2860 if (ret == -EINPROGRESS) {
2861 requeue_scrub();
2862 done = true;
2863 break;
2864 }
2865 scrubber.state = PG::Scrubber::BUILD_MAP_DONE;
2866 break;
2867
2868 case PG::Scrubber::BUILD_MAP_DONE:
2869 if (scrubber.primary_scrubmap_pos.ret < 0) {
2870 dout(5) << "error: " << scrubber.primary_scrubmap_pos.ret
2871 << ", aborting" << dendl;
2872 scrub_clear_state();
2873 scrub_unreserve_replicas();
2874 return;
2875 }
2876 dout(10) << __func__ << " waiting_on_whom was "
2877 << scrubber.waiting_on_whom << dendl;
2878 ceph_assert(scrubber.waiting_on_whom.count(pg_whoami));
2879 scrubber.waiting_on_whom.erase(pg_whoami);
2880
2881 scrubber.state = PG::Scrubber::WAIT_REPLICAS;
2882 break;
2883
2884 case PG::Scrubber::WAIT_REPLICAS:
2885 if (!scrubber.waiting_on_whom.empty()) {
2886 // will be requeued by do_replica_scrub_map
2887 dout(10) << "wait for replicas to build scrub map" << dendl;
2888 done = true;
2889 break;
2890 }
2891 // Since repair is only by request and we need to scrub afterward,
2892 // treat it the same as req_scrub.
2893 if (!scrubber.req_scrub) {
2894 if (state_test(PG_STATE_DEEP_SCRUB)) {
2895 if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
2896 pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
2897 dout(10) << "nodeep_scrub set, aborting" << dendl;
2898 abort_scrub();
2899 return;
2900 }
2901 } else if (state_test(PG_STATE_SCRUBBING)) {
2902 if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) || pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
2903 dout(10) << "noscrub set, aborting" << dendl;
2904 abort_scrub();
2905 return;
2906 }
2907 }
2908 }
2909 // end (possible) preemption window
2910 scrub_can_preempt = false;
2911 if (scrub_preempted) {
2912 dout(10) << __func__ << " preempted, restarting chunk" << dendl;
2913 scrubber.state = PG::Scrubber::NEW_CHUNK;
2914 } else {
2915 scrubber.state = PG::Scrubber::COMPARE_MAPS;
2916 }
2917 break;
2918
2919 case PG::Scrubber::COMPARE_MAPS:
2920 ceph_assert(recovery_state.get_last_update_applied() >=
2921 scrubber.subset_last_update);
2922 ceph_assert(scrubber.waiting_on_whom.empty());
2923
2924 scrub_compare_maps();
2925 scrubber.start = scrubber.end;
2926 scrubber.run_callbacks();
2927
2928 // requeue the writes from the chunk that just finished
2929 requeue_ops(waiting_for_scrub);
2930
2931 scrubber.state = PG::Scrubber::WAIT_DIGEST_UPDATES;
2932
2933 // fall-thru
2934
2935 case PG::Scrubber::WAIT_DIGEST_UPDATES:
2936 if (scrubber.num_digest_updates_pending) {
2937 dout(10) << __func__ << " waiting on "
2938 << scrubber.num_digest_updates_pending
2939 << " digest updates" << dendl;
2940 done = true;
2941 break;
2942 }
2943
2944 scrubber.preempt_left = cct->_conf.get_val<uint64_t>(
2945 "osd_scrub_max_preemptions");
2946 scrubber.preempt_divisor = 1;
2947
2948 if (!(scrubber.end.is_max())) {
2949 scrubber.state = PG::Scrubber::NEW_CHUNK;
2950 requeue_scrub();
2951 done = true;
2952 } else {
2953 scrubber.state = PG::Scrubber::FINISH;
2954 }
2955
2956 break;
2957
2958 case PG::Scrubber::FINISH:
2959 scrub_finish();
2960 scrubber.state = PG::Scrubber::INACTIVE;
2961 done = true;
2962
2963 if (!snap_trimq.empty()) {
2964 dout(10) << "scrub finished, requeuing snap_trimmer" << dendl;
2965 snap_trimmer_scrub_complete();
2966 }
2967
2968 break;
2969
2970 case PG::Scrubber::BUILD_MAP_REPLICA:
2971 // build my own scrub map
2972 if (scrub_preempted) {
2973 dout(10) << __func__ << " preempted" << dendl;
2974 ret = 0;
2975 } else {
2976 ret = build_scrub_map_chunk(
2977 scrubber.replica_scrubmap,
2978 scrubber.replica_scrubmap_pos,
2979 scrubber.start, scrubber.end,
2980 scrubber.deep,
2981 handle);
2982 }
2983 if (ret == -EINPROGRESS) {
2984 requeue_scrub();
2985 done = true;
2986 break;
2987 }
2988 // reply
2989 {
2990 MOSDRepScrubMap *reply = new MOSDRepScrubMap(
2991 spg_t(info.pgid.pgid, get_primary().shard),
2992 scrubber.replica_scrub_start,
2993 pg_whoami);
2994 reply->preempted = scrub_preempted;
2995 ::encode(scrubber.replica_scrubmap, reply->get_data());
2996 osd->send_message_osd_cluster(
2997 get_primary().osd, reply,
2998 scrubber.replica_scrub_start);
2999 }
3000 scrub_preempted = false;
3001 scrub_can_preempt = false;
3002 scrubber.state = PG::Scrubber::INACTIVE;
3003 scrubber.replica_scrubmap = ScrubMap();
3004 scrubber.replica_scrubmap_pos = ScrubMapBuilder();
3005 scrubber.start = hobject_t();
3006 scrubber.end = hobject_t();
3007 scrubber.max_end = hobject_t();
3008 done = true;
3009 break;
3010
3011 default:
3012 ceph_abort();
3013 }
3014 }
3015 dout(20) << "scrub final state " << Scrubber::state_string(scrubber.state)
3016 << " [" << scrubber.start << "," << scrubber.end << ")"
3017 << " max_end " << scrubber.max_end << dendl;
3018 }
3019
3020 bool PG::write_blocked_by_scrub(const hobject_t& soid)
3021 {
3022 if (soid < scrubber.start || soid >= scrubber.end) {
3023 return false;
3024 }
3025 if (scrub_can_preempt) {
3026 if (!scrub_preempted) {
3027 dout(10) << __func__ << " " << soid << " preempted" << dendl;
3028 scrub_preempted = true;
3029 } else {
3030 dout(10) << __func__ << " " << soid << " already preempted" << dendl;
3031 }
3032 return false;
3033 }
3034 return true;
3035 }
3036
3037 bool PG::range_intersects_scrub(const hobject_t &start, const hobject_t& end)
3038 {
3039 // does [start, end] intersect [scrubber.start, scrubber.max_end)
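// i.e. the caller's range is treated as closed (end inclusive) while the
// scrub range is half-open (max_end exclusive), hence the mixed comparisons.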
3040 return (start < scrubber.max_end &&
3041 end >= scrubber.start);
3042 }
3043
3044 void PG::scrub_clear_state(bool has_error)
3045 {
3046 ceph_assert(is_locked());
3047 state_clear(PG_STATE_SCRUBBING);
3048 if (!has_error)
3049 state_clear(PG_STATE_REPAIR);
3050 state_clear(PG_STATE_DEEP_SCRUB);
3051 publish_stats_to_osd();
3052
3053 scrubber.req_scrub = false;
3054 // local -> nothing.
3055 if (scrubber.local_reserved) {
3056 osd->dec_scrubs_local();
3057 scrubber.local_reserved = false;
3058 scrubber.reserved_peers.clear();
3059 }
3060
3061 requeue_ops(waiting_for_scrub);
3062
3063 scrubber.reset();
3064
3065 // type-specific state clear
3066 _scrub_clear_state();
3067 }
3068
3069 void PG::scrub_compare_maps()
3070 {
3071 dout(10) << __func__ << " has maps, analyzing" << dendl;
3072
3073 // construct authoritative scrub map for type specific scrubbing
3074 scrubber.cleaned_meta_map.insert(scrubber.primary_scrubmap);
3075 map<hobject_t,
3076 pair<std::optional<uint32_t>,
3077 std::optional<uint32_t>>> missing_digest;
3078
3079 map<pg_shard_t, ScrubMap *> maps;
3080 maps[pg_whoami] = &scrubber.primary_scrubmap;
3081
3082 for (const auto& i : get_acting_recovery_backfill()) {
3083 if (i == pg_whoami) continue;
3084 dout(2) << __func__ << " replica " << i << " has "
3085 << scrubber.received_maps[i].objects.size()
3086 << " items" << dendl;
3087 maps[i] = &scrubber.received_maps[i];
3088 }
3089
3090 set<hobject_t> master_set;
3091
3092 // Construct master set
3093 for (const auto& map : maps) {
3094 for (const auto& i : map.second->objects) {
3095 master_set.insert(i.first);
3096 }
3097 }
3098
3099 stringstream ss;
3100 get_pgbackend()->be_omap_checks(maps, master_set,
3101 scrubber.omap_stats, ss);
3102
3103 if (!ss.str().empty()) {
3104 osd->clog->warn(ss);
3105 }
3106
3107 if (recovery_state.get_acting().size() > 1) {
3108 dout(10) << __func__ << " comparing replica scrub maps" << dendl;
3109
3110 // Map from object with errors to good peer
3111 map<hobject_t, list<pg_shard_t>> authoritative;
3112
3113 dout(2) << __func__ << " " << get_primary() << " has "
3114 << scrubber.primary_scrubmap.objects.size() << " items" << dendl;
3115
3116 ss.str("");
3117 ss.clear();
3118
3119 get_pgbackend()->be_compare_scrubmaps(
3120 maps,
3121 master_set,
3122 state_test(PG_STATE_REPAIR),
3123 scrubber.missing,
3124 scrubber.inconsistent,
3125 authoritative,
3126 missing_digest,
3127 scrubber.shallow_errors,
3128 scrubber.deep_errors,
3129 scrubber.store.get(),
3130 info.pgid, recovery_state.get_acting(),
3131 ss);
3132 dout(2) << ss.str() << dendl;
3133
3134 if (!ss.str().empty()) {
3135 osd->clog->error(ss);
3136 }
3137
3138 for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
3139 i != authoritative.end();
3140 ++i) {
3141 list<pair<ScrubMap::object, pg_shard_t> > good_peers;
3142 for (list<pg_shard_t>::const_iterator j = i->second.begin();
3143 j != i->second.end();
3144 ++j) {
3145 good_peers.emplace_back(maps[*j]->objects[i->first], *j);
3146 }
3147 scrubber.authoritative.emplace(i->first, good_peers);
3148 }
3149
3150 for (map<hobject_t, list<pg_shard_t>>::iterator i = authoritative.begin();
3151 i != authoritative.end();
3152 ++i) {
3153 scrubber.cleaned_meta_map.objects.erase(i->first);
3154 scrubber.cleaned_meta_map.objects.insert(
3155 *(maps[i->second.back()]->objects.find(i->first))
3156 );
3157 }
3158 }
3159
3160 ScrubMap for_meta_scrub;
3161 scrubber.clean_meta_map(for_meta_scrub);
3162
3163 // ok, do the pg-type specific scrubbing
3164 scrub_snapshot_metadata(for_meta_scrub, missing_digest);
3165 // Called here on the primary; the map may contain authoritative entries taken from replicas where the primary's copy wasn't authoritative
3166 _scan_snaps(for_meta_scrub);
3167 if (!scrubber.store->empty()) {
3168 if (state_test(PG_STATE_REPAIR)) {
3169 dout(10) << __func__ << ": discarding scrub results" << dendl;
3170 scrubber.store->flush(nullptr);
3171 } else {
3172 dout(10) << __func__ << ": updating scrub object" << dendl;
3173 ObjectStore::Transaction t;
3174 scrubber.store->flush(&t);
3175 osd->store->queue_transaction(ch, std::move(t), nullptr);
3176 }
3177 }
3178 }
3179
3180 bool PG::scrub_process_inconsistent()
3181 {
3182 dout(10) << __func__ << ": checking authoritative" << dendl;
3183 bool repair = state_test(PG_STATE_REPAIR);
3184 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
3185 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
3186
3187 // authoritative only stores objects which are missing or inconsistent.
3188 if (!scrubber.authoritative.empty()) {
3189 stringstream ss;
3190 ss << info.pgid << " " << mode << " "
3191 << scrubber.missing.size() << " missing, "
3192 << scrubber.inconsistent.size() << " inconsistent objects";
3193 dout(2) << ss.str() << dendl;
3194 osd->clog->error(ss);
3195 if (repair) {
3196 state_clear(PG_STATE_CLEAN);
3197 for (map<hobject_t, list<pair<ScrubMap::object, pg_shard_t> >>::iterator i =
3198 scrubber.authoritative.begin();
3199 i != scrubber.authoritative.end();
3200 ++i) {
3201 auto missing_entry = scrubber.missing.find(i->first);
3202 if (missing_entry != scrubber.missing.end()) {
3203 repair_object(
3204 i->first,
3205 i->second,
3206 missing_entry->second);
3207 scrubber.fixed += missing_entry->second.size();
3208 }
3209 if (scrubber.inconsistent.count(i->first)) {
3210 repair_object(
3211 i->first,
3212 i->second,
3213 scrubber.inconsistent[i->first]);
3214 scrubber.fixed += scrubber.inconsistent[i->first].size();
3215 }
3216 }
3217 }
3218 }
3219 return (!scrubber.authoritative.empty() && repair);
3220 }
3221
3222 bool PG::ops_blocked_by_scrub() const {
3223 return (waiting_for_scrub.size() != 0);
3224 }
3225
3226 // the part that actually finalizes a scrub
3227 void PG::scrub_finish()
3228 {
3229 dout(20) << __func__ << dendl;
3230 bool repair = state_test(PG_STATE_REPAIR);
3231 bool do_auto_scrub = false;
3232 // if the repair request comes from auto-repair and there are a large number of errors,
3233 // we would like to cancel auto-repair
3234 if (repair && scrubber.auto_repair
3235 && scrubber.authoritative.size() > cct->_conf->osd_scrub_auto_repair_num_errors) {
3236 state_clear(PG_STATE_REPAIR);
3237 repair = false;
3238 }
3239 bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
3240 const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
3241
3242 // if a regular scrub had errors within the limit, do a deep scrub to auto repair.
3243 if (scrubber.deep_scrub_on_error
3244 && scrubber.authoritative.size()
3245 && scrubber.authoritative.size() <= cct->_conf->osd_scrub_auto_repair_num_errors) {
3246 ceph_assert(!deep_scrub);
3247 do_auto_scrub = true;
3248 dout(20) << __func__ << " Try to auto repair after scrub errors" << dendl;
3249 }
3250 scrubber.deep_scrub_on_error = false;
3251
3252 // type-specific finish (can tally more errors)
3253 _scrub_finish();
3254
3255 bool has_error = scrub_process_inconsistent();
3256
3257 {
3258 stringstream oss;
3259 oss << info.pgid.pgid << " " << mode << " ";
3260 int total_errors = scrubber.shallow_errors + scrubber.deep_errors;
3261 if (total_errors)
3262 oss << total_errors << " errors";
3263 else
3264 oss << "ok";
3265 if (!deep_scrub && info.stats.stats.sum.num_deep_scrub_errors)
3266 oss << " ( " << info.stats.stats.sum.num_deep_scrub_errors
3267 << " remaining deep scrub error details lost)";
3268 if (repair)
3269 oss << ", " << scrubber.fixed << " fixed";
3270 if (total_errors)
3271 osd->clog->error(oss);
3272 else
3273 osd->clog->debug(oss);
3274 }
3275
3276 // Since we don't know which errors were fixed, we can only clear them
3277 // when every one has been fixed.
3278 if (repair) {
3279 if (scrubber.fixed == scrubber.shallow_errors + scrubber.deep_errors) {
3280 ceph_assert(deep_scrub);
3281 scrubber.shallow_errors = scrubber.deep_errors = 0;
3282 dout(20) << __func__ << " All may be fixed" << dendl;
3283 } else if (has_error) {
3284 // Deep scrub in order to get corrected error counts
3285 scrub_after_recovery = true;
3286 save_req_scrub = scrubber.req_scrub;
3287 dout(20) << __func__ << " Set scrub_after_recovery, req_scrub=" << save_req_scrub << dendl;
3288 } else if (scrubber.shallow_errors || scrubber.deep_errors) {
3289 // We have errors but nothing can be fixed, so there is no repair
3290 // possible.
3291 state_set(PG_STATE_FAILED_REPAIR);
3292 dout(10) << __func__ << " " << (scrubber.shallow_errors + scrubber.deep_errors)
3293 << " error(s) present with no repair possible" << dendl;
3294 }
3295 }
3296
3297 {
3298 // finish up
3299 ObjectStore::Transaction t;
3300 recovery_state.update_stats(
3301 [this, deep_scrub](auto &history, auto &stats) {
3302 utime_t now = ceph_clock_now();
3303 history.last_scrub = recovery_state.get_info().last_update;
3304 history.last_scrub_stamp = now;
3305 if (scrubber.deep) {
3306 history.last_deep_scrub = recovery_state.get_info().last_update;
3307 history.last_deep_scrub_stamp = now;
3308 }
3309
3310 if (deep_scrub) {
3311 if ((scrubber.shallow_errors == 0) && (scrubber.deep_errors == 0))
3312 history.last_clean_scrub_stamp = now;
3313 stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
3314 stats.stats.sum.num_deep_scrub_errors = scrubber.deep_errors;
3315 stats.stats.sum.num_large_omap_objects = scrubber.omap_stats.large_omap_objects;
3316 stats.stats.sum.num_omap_bytes = scrubber.omap_stats.omap_bytes;
3317 stats.stats.sum.num_omap_keys = scrubber.omap_stats.omap_keys;
3318 dout(25) << "scrub_finish shard " << pg_whoami << " num_omap_bytes = "
3319 << stats.stats.sum.num_omap_bytes << " num_omap_keys = "
3320 << stats.stats.sum.num_omap_keys << dendl;
3321 } else {
3322 stats.stats.sum.num_shallow_scrub_errors = scrubber.shallow_errors;
3323 // XXX: last_clean_scrub_stamp doesn't mean the pg is not inconsistent
3324 // because of deep-scrub errors
3325 if (scrubber.shallow_errors == 0)
3326 history.last_clean_scrub_stamp = now;
3327 }
3328 stats.stats.sum.num_scrub_errors =
3329 stats.stats.sum.num_shallow_scrub_errors +
3330 stats.stats.sum.num_deep_scrub_errors;
3331 if (scrubber.check_repair) {
3332 scrubber.check_repair = false;
3333 if (info.stats.stats.sum.num_scrub_errors) {
3334 state_set(PG_STATE_FAILED_REPAIR);
3335 dout(10) << "scrub_finish " << info.stats.stats.sum.num_scrub_errors
3336 << " error(s) still present after re-scrub" << dendl;
3337 }
3338 }
3339 return true;
3340 },
3341 &t);
3342 int tr = osd->store->queue_transaction(ch, std::move(t), NULL);
3343 ceph_assert(tr == 0);
3344 }
3345
3346 if (has_error) {
3347 queue_peering_event(
3348 PGPeeringEventRef(
3349 std::make_shared<PGPeeringEvent>(
3350 get_osdmap_epoch(),
3351 get_osdmap_epoch(),
3352 PeeringState::DoRecovery())));
3353 }
3354
3355 scrub_clear_state(has_error);
3356 scrub_unreserve_replicas();
3357
3358 if (do_auto_scrub) {
3359 scrub_requested(false, false, true);
3360 }
3361
3362 if (is_active() && is_primary()) {
3363 recovery_state.share_pg_info();
3364 }
3365 }
3366
3367 bool PG::old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch)
3368 {
3369 if (get_last_peering_reset() > reply_epoch ||
3370 get_last_peering_reset() > query_epoch) {
3371 dout(10) << "old_peering_msg reply_epoch " << reply_epoch << " query_epoch " << query_epoch
3372 << " last_peering_reset " << get_last_peering_reset()
3373 << dendl;
3374 return true;
3375 }
3376 return false;
3377 }
3378
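// RAII helper for start_flush_on_transaction(): the transaction's on_applied
// and on_commit callbacks each hold a reference; once both have fired, the
// last reference drops and the destructor takes the PG lock and delivers
// complete_flush(), unless the PG has been reset since the flush started.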
3379 struct FlushState {
3380 PGRef pg;
3381 epoch_t epoch;
3382 FlushState(PG *pg, epoch_t epoch) : pg(pg), epoch(epoch) {}
3383 ~FlushState() {
3384 std::scoped_lock l{*pg};
3385 if (!pg->pg_has_reset_since(epoch)) {
3386 pg->recovery_state.complete_flush();
3387 }
3388 }
3389 };
3390 typedef std::shared_ptr<FlushState> FlushStateRef;
3391
3392 void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
3393 {
3394 // flush in progress ops
3395 FlushStateRef flush_trigger (std::make_shared<FlushState>(
3396 this, get_osdmap_epoch()));
3397 t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
3398 t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
3399 }
3400
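// Returns true if the collection is already flushed (nothing to wait for);
// otherwise the IntervalFlush peering event is queued once the in-flight
// commits complete, and we return false.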
3401 bool PG::try_flush_or_schedule_async()
3402 {
3403
3404 Context *c = new QueuePeeringEvt(
3405 this, get_osdmap_epoch(), PeeringState::IntervalFlush());
3406 if (!ch->flush_commit(c)) {
3407 return false;
3408 } else {
3409 delete c;
3410 return true;
3411 }
3412 }
3413
3414 ostream& operator<<(ostream& out, const PG& pg)
3415 {
3416 out << pg.recovery_state;
3417 if (pg.scrubber.must_repair)
3418 out << " MUST_REPAIR";
3419 if (pg.scrubber.auto_repair)
3420 out << " AUTO_REPAIR";
3421 if (pg.scrubber.check_repair)
3422 out << " CHECK_REPAIR";
3423 if (pg.scrubber.deep_scrub_on_error)
3424 out << " DEEP_SCRUB_ON_ERROR";
3425 if (pg.scrubber.must_deep_scrub)
3426 out << " MUST_DEEP_SCRUB";
3427 if (pg.scrubber.must_scrub)
3428 out << " MUST_SCRUB";
3429 if (pg.scrubber.time_for_deep)
3430 out << " TIME_FOR_DEEP";
3431 if (pg.scrubber.need_auto)
3432 out << " NEED_AUTO";
3433 if (pg.scrubber.req_scrub)
3434 out << " REQ_SCRUB";
3435
3436 if (pg.recovery_ops_active)
3437 out << " rops=" << pg.recovery_ops_active;
3438
3439 //out << " (" << pg.pg_log.get_tail() << "," << pg.pg_log.get_head() << "]";
3440 if (pg.recovery_state.have_missing()) {
3441 out << " m=" << pg.recovery_state.get_num_missing();
3442 if (pg.is_primary()) {
3443 uint64_t unfound = pg.recovery_state.get_num_unfound();
3444 if (unfound)
3445 out << " u=" << unfound;
3446 }
3447 }
3448 if (!pg.is_clean()) {
3449 out << " mbc=" << pg.recovery_state.get_missing_by_count();
3450 }
3451 if (!pg.snap_trimq.empty()) {
3452 out << " trimq=";
3453 // only show a count if the set is large
3454 if (pg.snap_trimq.num_intervals() > 16) {
3455 out << pg.snap_trimq.size();
3456 if (!pg.snap_trimq_repeat.empty()) {
3457 out << "(" << pg.snap_trimq_repeat.size() << ")";
3458 }
3459 } else {
3460 out << pg.snap_trimq;
3461 if (!pg.snap_trimq_repeat.empty()) {
3462 out << "(" << pg.snap_trimq_repeat << ")";
3463 }
3464 }
3465 }
3466 if (!pg.recovery_state.get_info().purged_snaps.empty()) {
3467 out << " ps="; // snap trim queue / purged snaps
3468 if (pg.recovery_state.get_info().purged_snaps.num_intervals() > 16) {
3469 out << pg.recovery_state.get_info().purged_snaps.size();
3470 } else {
3471 out << pg.recovery_state.get_info().purged_snaps;
3472 }
3473 }
3474
3475 out << "]";
3476
3477
3478 return out;
3479 }
3480
3481 bool PG::can_discard_op(OpRequestRef& op)
3482 {
3483 auto m = op->get_req<MOSDOp>();
3484 if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
3485 dout(20) << " discard " << *m << dendl;
3486 return true;
3487 }
3488
3489 if (m->get_map_epoch() < info.history.same_primary_since) {
3490 dout(7) << " changed after " << m->get_map_epoch()
3491 << ", dropping " << *m << dendl;
3492 return true;
3493 }
3494
3495 if ((m->get_flags() & (CEPH_OSD_FLAG_BALANCE_READS |
3496 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
3497 !is_primary() &&
3498 m->get_map_epoch() < info.history.same_interval_since) {
3499 // Note: the Objecter will resend on interval change without the primary
3500 // changing if it actually sent to a replica. If the primary hasn't
3501 // changed since the send epoch, we got it, and we're primary, it won't
3502 // have resent even if the interval did change as it sent it to the primary
3503 // (us).
3504 return true;
3505 }
3506
3507
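// Clients of different vintages track the pool's "force op resend" epoch
// differently, so pick the variant of last_force_op_resend that matches the
// client's advertised features before deciding whether this op predates it.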
3508 if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
3509 // >= luminous client
3510 if (m->get_connection()->has_feature(CEPH_FEATURE_SERVER_NAUTILUS)) {
3511 // >= nautilus client
3512 if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
3513 dout(7) << __func__ << " sent before last_force_op_resend "
3514 << pool.info.last_force_op_resend
3515 << ", dropping " << *m << dendl;
3516 return true;
3517 }
3518 } else {
3519 // i.e. a pre-nautilus client (luminous or mimic)
3520 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_prenautilus()) {
3521 dout(7) << __func__ << " sent before last_force_op_resend_prenautilus "
3522 << pool.info.last_force_op_resend_prenautilus
3523 << ", dropping " << *m << dendl;
3524 return true;
3525 }
3526 }
3527 if (m->get_map_epoch() < info.history.last_epoch_split) {
3528 dout(7) << __func__ << " pg split in "
3529 << info.history.last_epoch_split << ", dropping" << dendl;
3530 return true;
3531 }
3532 } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
3533 // < luminous client
3534 if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
3535 dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
3536 << pool.info.last_force_op_resend_preluminous
3537 << ", dropping " << *m << dendl;
3538 return true;
3539 }
3540 }
3541
3542 return false;
3543 }
3544
3545 template<typename T, int MSGTYPE>
3546 bool PG::can_discard_replica_op(OpRequestRef& op)
3547 {
3548 auto m = op->get_req<T>();
3549 ceph_assert(m->get_type() == MSGTYPE);
3550
3551 int from = m->get_source().num();
3552
3553 // if a repop is replied after a replica goes down in a new osdmap, and
3554 // before the pg advances to this new osdmap, the repop replies before this
3555 // repop can be discarded by that replica OSD, because the primary resets the
3556 // connection to it when handling the new osdmap marking it down, and also
3557 // resets the messenger session when the replica reconnects. To avoid the
3558 // out-of-order replies, the messages from that replica should be discarded.
3559 OSDMapRef next_map = osd->get_next_osdmap();
3560 if (next_map->is_down(from))
3561 return true;
3562 /* Mostly, this overlaps with the old_peering_msg
3563 * condition. An important exception is pushes
3564 * sent by replicas not in the acting set, since
3565 * if such a replica goes down it does not cause
3566 * a new interval. */
3567 if (next_map->get_down_at(from) >= m->map_epoch)
3568 return true;
3569
3570 // same pg?
3571 // if pg changes _at all_, we reset and repeer!
3572 if (old_peering_msg(m->map_epoch, m->map_epoch)) {
3573 dout(10) << "can_discard_replica_op pg changed " << info.history
3574 << " after " << m->map_epoch
3575 << ", dropping" << dendl;
3576 return true;
3577 }
3578 return false;
3579 }
3580
3581 bool PG::can_discard_scan(OpRequestRef op)
3582 {
3583 auto m = op->get_req<MOSDPGScan>();
3584 ceph_assert(m->get_type() == MSG_OSD_PG_SCAN);
3585
3586 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
3587 dout(10) << " got old scan, ignoring" << dendl;
3588 return true;
3589 }
3590 return false;
3591 }
3592
3593 bool PG::can_discard_backfill(OpRequestRef op)
3594 {
3595 auto m = op->get_req<MOSDPGBackfill>();
3596 ceph_assert(m->get_type() == MSG_OSD_PG_BACKFILL);
3597
3598 if (old_peering_msg(m->map_epoch, m->query_epoch)) {
3599 dout(10) << " got old backfill, ignoring" << dendl;
3600 return true;
3601 }
3602
3603 return false;
3604
3605 }
3606
3607 bool PG::can_discard_request(OpRequestRef& op)
3608 {
3609 switch (op->get_req()->get_type()) {
3610 case CEPH_MSG_OSD_OP:
3611 return can_discard_op(op);
3612 case CEPH_MSG_OSD_BACKOFF:
3613 return false; // never discard
3614 case MSG_OSD_REPOP:
3615 return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
3616 case MSG_OSD_PG_PUSH:
3617 return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
3618 case MSG_OSD_PG_PULL:
3619 return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
3620 case MSG_OSD_PG_PUSH_REPLY:
3621 return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
3622 case MSG_OSD_REPOPREPLY:
3623 return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
3624 case MSG_OSD_PG_RECOVERY_DELETE:
3625 return can_discard_replica_op<MOSDPGRecoveryDelete, MSG_OSD_PG_RECOVERY_DELETE>(op);
3626
3627 case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
3628 return can_discard_replica_op<MOSDPGRecoveryDeleteReply, MSG_OSD_PG_RECOVERY_DELETE_REPLY>(op);
3629
3630 case MSG_OSD_EC_WRITE:
3631 return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
3632 case MSG_OSD_EC_WRITE_REPLY:
3633 return can_discard_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
3634 case MSG_OSD_EC_READ:
3635 return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
3636 case MSG_OSD_EC_READ_REPLY:
3637 return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
3638 case MSG_OSD_REP_SCRUB:
3639 return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
3640 case MSG_OSD_SCRUB_RESERVE:
3641 return can_discard_replica_op<MOSDScrubReserve, MSG_OSD_SCRUB_RESERVE>(op);
3642 case MSG_OSD_REP_SCRUBMAP:
3643 return can_discard_replica_op<MOSDRepScrubMap, MSG_OSD_REP_SCRUBMAP>(op);
3644 case MSG_OSD_PG_UPDATE_LOG_MISSING:
3645 return can_discard_replica_op<
3646 MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(op);
3647 case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
3648 return can_discard_replica_op<
3649 MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
3650
3651 case MSG_OSD_PG_SCAN:
3652 return can_discard_scan(op);
3653 case MSG_OSD_PG_BACKFILL:
3654 return can_discard_backfill(op);
3655 case MSG_OSD_PG_BACKFILL_REMOVE:
3656 return can_discard_replica_op<MOSDPGBackfillRemove,
3657 MSG_OSD_PG_BACKFILL_REMOVE>(op);
3658 }
3659 return true;
3660 }
3661
3662 void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
3663 {
3664 dout(10) << __func__ << ": " << evt->get_desc() << dendl;
3665 ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
3666 if (old_peering_evt(evt)) {
3667 dout(10) << "discard old " << evt->get_desc() << dendl;
3668 } else {
3669 recovery_state.handle_event(evt, &rctx);
3670 }
3671 // write_if_dirty regardless of path above to ensure we capture any work
3672 // done by OSD::advance_pg().
3673 write_if_dirty(rctx.transaction);
3674 }
3675
3676 void PG::queue_peering_event(PGPeeringEventRef evt)
3677 {
3678 if (old_peering_evt(evt))
3679 return;
3680 osd->osd->enqueue_peering_evt(info.pgid, evt);
3681 }
3682
3683 void PG::queue_null(epoch_t msg_epoch,
3684 epoch_t query_epoch)
3685 {
3686 dout(10) << "null" << dendl;
3687 queue_peering_event(
3688 PGPeeringEventRef(std::make_shared<PGPeeringEvent>(msg_epoch, query_epoch,
3689 NullEvt())));
3690 }
3691
3692 void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
3693 {
3694 /*
3695 * if we couldn't start any recovery ops and things are still
3696 * unfound, see if we can discover more missing object locations.
3697 * It may be that our initial locations were bad and we errored
3698 * out while trying to pull.
3699 */
3700 if (!recovery_state.discover_all_missing(rctx)) {
3701 string action;
3702 if (state_test(PG_STATE_BACKFILLING)) {
3703 auto evt = PGPeeringEventRef(
3704 new PGPeeringEvent(
3705 queued,
3706 queued,
3707 PeeringState::UnfoundBackfill()));
3708 queue_peering_event(evt);
3709 action = "in backfill";
3710 } else if (state_test(PG_STATE_RECOVERING)) {
3711 auto evt = PGPeeringEventRef(
3712 new PGPeeringEvent(
3713 queued,
3714 queued,
3715 PeeringState::UnfoundRecovery()));
3716 queue_peering_event(evt);
3717 action = "in recovery";
3718 } else {
3719 action = "already out of recovery/backfill";
3720 }
3721 dout(10) << __func__ << ": no luck, giving up on this pg for now (" << action << ")" << dendl;
3722 } else {
3723     dout(10) << __func__ << ": no luck yet, requeuing recovery (queue_recovery)" << dendl;
3724 queue_recovery();
3725 }
3726 }
3727
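// Advance this PG to a newer OSDMap epoch: record the epoch on the owning
// shard's slot, then let the recovery state machine process the new
// up/acting sets.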
3728 void PG::handle_advance_map(
3729 OSDMapRef osdmap, OSDMapRef lastmap,
3730 vector<int>& newup, int up_primary,
3731 vector<int>& newacting, int acting_primary,
3732 PeeringCtx &rctx)
3733 {
3734 dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
3735 osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
3736 recovery_state.advance_map(
3737 osdmap,
3738 lastmap,
3739 newup,
3740 up_primary,
3741 newacting,
3742 acting_primary,
3743 rctx);
3744 }
3745
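// Activate the map we just advanced to and requeue any ops that were
// waiting for it.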
3746 void PG::handle_activate_map(PeeringCtx &rctx)
3747 {
3748 dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
3749 << dendl;
3750 recovery_state.activate_map(rctx);
3751
3752 requeue_map_waiters();
3753 }
3754
3755 void PG::handle_initialize(PeeringCtx &rctx)
3756 {
3757 dout(10) << __func__ << dendl;
3758 PeeringState::Initialize evt;
3759 recovery_state.handle_event(evt, &rctx);
3760 }
3761
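// Service a state query: run a QueryState event through the state machine,
// then, if we are the active primary, dump the scrubber's current state.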
3762 void PG::handle_query_state(Formatter *f)
3763 {
3764 dout(10) << "handle_query_state" << dendl;
3765 PeeringState::QueryState q(f);
3766 recovery_state.handle_event(q, 0);
3767
3768 if (is_primary() && is_active()) {
3769 f->open_object_section("scrub");
3770 f->dump_stream("scrubber.epoch_start") << scrubber.epoch_start;
3771 f->dump_bool("scrubber.active", scrubber.active);
3772 f->dump_string("scrubber.state", PG::Scrubber::state_string(scrubber.state));
3773 f->dump_stream("scrubber.start") << scrubber.start;
3774 f->dump_stream("scrubber.end") << scrubber.end;
3775 f->dump_stream("scrubber.max_end") << scrubber.max_end;
3776 f->dump_stream("scrubber.subset_last_update") << scrubber.subset_last_update;
3777 f->dump_bool("scrubber.deep", scrubber.deep);
3778 {
3779 f->open_array_section("scrubber.waiting_on_whom");
3780 for (set<pg_shard_t>::iterator p = scrubber.waiting_on_whom.begin();
3781 p != scrubber.waiting_on_whom.end();
3782 ++p) {
3783 f->dump_stream("shard") << *p;
3784 }
3785 f->close_section();
3786 }
3787 f->close_section();
3788 }
3789 }
3790
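// Apply the pool's per-collection options to the backing object store;
// errors other than -EOPNOTSUPP are logged.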
3791 void PG::init_collection_pool_opts()
3792 {
3793 auto r = osd->store->set_collection_opts(ch, pool.info.opts);
3794 if (r < 0 && r != -EOPNOTSUPP) {
3795     derr << __func__ << " set_collection_opts returned error: " << r << dendl;
3796 }
3797 }
3798
3799 void PG::on_pool_change()
3800 {
3801 init_collection_pool_opts();
3802 plpg_on_pool_change();
3803 }
3804
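// Commit callback that chains PG deletion: once the previous batch commits,
// requeue the PG for more delete work unless it has been reset since the
// epoch captured when the callback was created.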
3805 void PG::C_DeleteMore::complete(int r) {
3806 ceph_assert(r == 0);
3807 pg->lock();
3808 if (!pg->pg_has_reset_since(epoch)) {
3809 pg->osd->queue_for_pg_delete(pg->get_pgid(), epoch);
3810 }
3811 pg->unlock();
3812 delete this;
3813 }
3814
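// Remove one batch of this PG's objects and return the cursor from which
// the next call should resume.  When osd_delete_sleep is set, batches
// alternate with a timed sleep to throttle deletion.  Once the listing
// comes back empty, the collection itself is removed and deletion is
// marked complete, unless we raced with a PG merge.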
3815 ghobject_t PG::do_delete_work(ObjectStore::Transaction &t,
3816 ghobject_t _next)
3817 {
3818 dout(10) << __func__ << dendl;
3819
3820 {
3821 float osd_delete_sleep = osd->osd->get_osd_delete_sleep();
3822 if (osd_delete_sleep > 0 && delete_needs_sleep) {
3823 epoch_t e = get_osdmap()->get_epoch();
3824 PGRef pgref(this);
3825 auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
3826 dout(20) << __func__ << " wake up at "
3827 << ceph_clock_now()
3828 << ", re-queuing delete" << dendl;
3829 std::scoped_lock locker{*this};
3830 delete_needs_sleep = false;
3831 if (!pg_has_reset_since(e)) {
3832 osd->queue_for_pg_delete(get_pgid(), e);
3833 }
3834 });
3835
3836 auto delete_schedule_time = ceph::real_clock::now();
3837 delete_schedule_time += ceph::make_timespan(osd_delete_sleep);
3838 std::lock_guard l{osd->sleep_lock};
3839 osd->sleep_timer.add_event_at(delete_schedule_time,
3840 delete_requeue_callback);
3841 dout(20) << __func__ << " Delete scheduled at " << delete_schedule_time << dendl;
3842 return _next;
3843 }
3844 }
3845
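// Re-arm the sleep so the next batch is throttled again.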
3846 delete_needs_sleep = true;
3847
3848 ghobject_t next;
3849
3850 vector<ghobject_t> olist;
3851 int max = std::min(osd->store->get_ideal_list_max(),
3852 (int)cct->_conf->osd_target_transaction_size);
3853
3854 osd->store->collection_list(
3855 ch,
3856 _next,
3857 ghobject_t::get_max(),
3858 max,
3859 &olist,
3860 &next);
3861 dout(20) << __func__ << " " << olist << dendl;
3862
3863   // make sure we've removed everything by doing
3864   // one more listing from the beginning
3865 if (_next != ghobject_t() && olist.empty()) {
3866 next = ghobject_t();
3867 osd->store->collection_list(
3868 ch,
3869 next,
3870 ghobject_t::get_max(),
3871 max,
3872 &olist,
3873 &next);
3874 if (!olist.empty()) {
3875 dout(0) << __func__ << " additional unexpected onode list"
3876       << " (new onodes have appeared since PG removal started): "
3877 << olist << dendl;
3878 }
3879 }
3880
3881 OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
3882 int64_t num = 0;
3883 for (auto& oid : olist) {
3884 if (oid == pgmeta_oid) {
3885 continue;
3886 }
3887 if (oid.is_pgmeta()) {
3888 osd->clog->warn() << info.pgid << " found stray pgmeta-like " << oid
3889 << " during PG removal";
3890 }
3891 int r = snap_mapper.remove_oid(oid.hobj, &_t);
3892 if (r != 0 && r != -ENOENT) {
3893 ceph_abort();
3894 }
3895 t.remove(coll, oid);
3896 ++num;
3897 }
3898 if (num) {
3899 dout(20) << __func__ << " deleting " << num << " objects" << dendl;
3900 Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
3901 t.register_on_commit(fin);
3902 } else {
3903 if (cct->_conf->osd_inject_failure_on_pg_removal) {
3904 _exit(1);
3905 }
3906
3907 // final flush here to ensure completions drop refs. Of particular concern
3908 // are the SnapMapper ContainerContexts.
3909 {
3910 PGRef pgref(this);
3911 PGLog::clear_info_log(info.pgid, &t);
3912 t.remove_collection(coll);
3913 t.register_on_commit(new ContainerContext<PGRef>(pgref));
3914 t.register_on_applied(new ContainerContext<PGRef>(pgref));
3915 osd->store->queue_transaction(ch, std::move(t));
3916 }
3917 ch->flush();
3918
3919 if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
3920 dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
3921 ch = osd->store->create_new_collection(coll);
3922 create_pg_collection(t,
3923 info.pgid,
3924 info.pgid.get_split_bits(pool.info.get_pg_num()));
3925 init_pg_ondisk(t, info.pgid, &pool.info);
3926 recovery_state.reset_last_persisted();
3927 } else {
3928 recovery_state.set_delete_complete();
3929
3930 // cancel reserver here, since the PG is about to get deleted and the
3931 // exit() methods don't run when that happens.
3932 osd->local_reserver.cancel_reservation(info.pgid);
3933
3934 osd->logger->dec(l_osd_pg_removing);
3935 }
3936 }
3937 return next;
3938 }
3939
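// On a non-primary, inflate the reported usage by the bytes reserved for an
// incoming backfill, so fullness checks account for data that is about to
// arrive.  Returns 1 if the stats were adjusted, 0 otherwise.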
3940 int PG::pg_stat_adjust(osd_stat_t *ns)
3941 {
3942 osd_stat_t &new_stat = *ns;
3943 if (is_primary()) {
3944 return 0;
3945 }
3946   // Adjust the reported kb_used upward to account for pending backfill data
3947 uint64_t reserved_num_bytes = get_reserved_num_bytes();
3948
3949   // For now we don't consider projected space gains here.
3950   // A possible improvement is an optional two-pass backfill that frees up
3951   // space in a first pass; this could be triggered when at nearfull
3952   // or near backfillfull.
3953 if (reserved_num_bytes > 0) {
3954     // TODO: Handle compression by adjusting for the PG's average
3955     // compression percentage.
3956 dout(20) << __func__ << " reserved_num_bytes " << (reserved_num_bytes >> 10) << "KiB"
3957 << " Before kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
3958 if (new_stat.statfs.available > reserved_num_bytes)
3959 new_stat.statfs.available -= reserved_num_bytes;
3960 else
3961 new_stat.statfs.available = 0;
3962 dout(20) << __func__ << " After kb_used " << new_stat.statfs.kb_used() << "KiB" << dendl;
3963 return 1;
3964 }
3965 return 0;
3966 }
3967
3968 ostream& operator<<(ostream& out, const PG::BackfillInterval& bi)
3969 {
3970 out << "BackfillInfo(" << bi.begin << "-" << bi.end
3971 << " " << bi.objects.size() << " objects";
3972 if (!bi.objects.empty())
3973 out << " " << bi.objects;
3974 out << ")";
3975 return out;
3976 }
3977
3978 void PG::dump_pgstate_history(Formatter *f)
3979 {
3980 std::scoped_lock l{*this};
3981 recovery_state.dump_history(f);
3982 }
3983
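// Dump every missing object, and for those still needing recovery include
// whether the object is unfound and which shards are known to hold it.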
3984 void PG::dump_missing(Formatter *f)
3985 {
3986 for (auto& i : recovery_state.get_pg_log().get_missing().get_items()) {
3987 f->open_object_section("object");
3988 f->dump_object("oid", i.first);
3989 f->dump_object("missing_info", i.second);
3990 if (recovery_state.get_missing_loc().needs_recovery(i.first)) {
3991 f->dump_bool(
3992 "unfound",
3993 recovery_state.get_missing_loc().is_unfound(i.first));
3994 f->open_array_section("locations");
3995 for (auto l : recovery_state.get_missing_loc().get_locations(i.first)) {
3996 f->dump_object("shard", l);
3997 }
3998 f->close_section();
3999 }
4000 f->close_section();
4001 }
4002 }
4003
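// Call f with the most recently published stats and their effective
// last_epoch_clean, but only if a valid snapshot has been published.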
4004 void PG::get_pg_stats(std::function<void(const pg_stat_t&, epoch_t lec)> f)
4005 {
4006 std::lock_guard l{pg_stats_publish_lock};
4007 if (pg_stats_publish_valid) {
4008 f(pg_stats_publish, pg_stats_publish.get_effective_last_epoch_clean());
4009 }
4010 }
4011
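// Call f for each heartbeat peer and probe target, under
// heartbeat_peer_lock.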
4012 void PG::with_heartbeat_peers(std::function<void(int)> f)
4013 {
4014 std::lock_guard l{heartbeat_peer_lock};
4015 for (auto p : heartbeat_peers) {
4016 f(p);
4017 }
4018 for (auto p : probe_targets) {
4019 f(p);
4020 }
4021 }
4022
4023 uint64_t PG::get_min_alloc_size() const {
4024 return osd->store->get_min_alloc_size();
4025 }