// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 smarttab

#include "./pg_scrubber.h"  // the '.' notation used to affect clang-format order

#include <cmath>
#include <iostream>
#include <vector>

#include "debug.h"

#include "common/errno.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDRepScrub.h"
#include "messages/MOSDRepScrubMap.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrubReserve.h"

#include "osd/OSD.h"
#include "osd/osd_types_fmt.h"
#include "ScrubStore.h"
#include "scrub_machine.h"

using std::list;
using std::map;
using std::pair;
using std::set;
using std::stringstream;
using std::vector;
using namespace Scrub;
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace std::literals;

#define dout_context (m_osds->cct)
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

template <class T>
static ostream& _prefix(std::ostream* _dout, T* t)
{
  return t->gen_prefix(*_dout);
}

ostream& operator<<(ostream& out, const scrub_flags_t& sf)
{
  if (sf.auto_repair)
    out << " AUTO_REPAIR";
  if (sf.check_repair)
    out << " CHECK_REPAIR";
  if (sf.deep_scrub_on_error)
    out << " DEEP_SCRUB_ON_ERROR";
  if (sf.required)
    out << " REQ_SCRUB";

  return out;
}

ostream& operator<<(ostream& out, const requested_scrub_t& sf)
{
  if (sf.must_repair)
    out << " MUST_REPAIR";
  if (sf.auto_repair)
    out << " planned AUTO_REPAIR";
  if (sf.check_repair)
    out << " planned CHECK_REPAIR";
  if (sf.deep_scrub_on_error)
    out << " planned DEEP_SCRUB_ON_ERROR";
  if (sf.must_deep_scrub)
    out << " MUST_DEEP_SCRUB";
  if (sf.must_scrub)
    out << " MUST_SCRUB";
  if (sf.time_for_deep)
    out << " TIME_FOR_DEEP";
  if (sf.need_auto)
    out << " NEED_AUTO";
  if (sf.req_scrub)
    out << " planned REQ_SCRUB";

  return out;
}

/*
 * if the incoming message is from a previous interval, it must mean
 * PrimaryLogPG::on_change() was called when that interval ended. We can safely discard
 * the stale message.
 */
bool PgScrubber::check_interval(epoch_t epoch_to_verify)
{
  return epoch_to_verify >= m_pg->get_same_interval_since();
}

bool PgScrubber::is_message_relevant(epoch_t epoch_to_verify)
{
  if (!m_active) {
    // not scrubbing. We can assume that the scrub was already terminated, and we
    // can silently discard the incoming event.
    return false;
  }

  // is this a message from before we started this scrub?
  if (epoch_to_verify < m_epoch_start) {
    return false;
  }

  // has a new interval started?
  if (!check_interval(epoch_to_verify)) {
    // if this is a new interval, on_change() has already terminated that
    // old scrub.
    return false;
  }

  ceph_assert(is_primary());

  // were we instructed to abort?
  return verify_against_abort(epoch_to_verify);
}

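/*
 * checks whether an 'abort scrub' condition is in effect. If it is - and
 * this is the first time we notice it (i.e. the triggering epoch is later
 * than the last abort we have handled) - the ongoing scrub is terminated.
 * Returns 'true' if the scrub operation may proceed.
 */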
bool PgScrubber::verify_against_abort(epoch_t epoch_to_verify)
{
  if (!should_abort()) {
    return true;
  }

  dout(10) << __func__ << " aborting. incoming epoch: " << epoch_to_verify
           << " vs last-aborted: " << m_last_aborted << dendl;

  // if we were not aware of the abort before - kill the scrub.
  if (epoch_to_verify > m_last_aborted) {
    scrub_clear_state();
    m_last_aborted = std::max(epoch_to_verify, m_epoch_start);
  }
  return false;
}

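/*
 * checks the current 'no scrubbing' configuration: the relevant OSDMap flags
 * and per-pool flags ('noscrub', and - for deep scrubs - 'nodeep-scrub').
 * Note that operator-requested ('required') scrubs are exempt.
 */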
bool PgScrubber::should_abort() const
{
  if (m_flags.required) {
    return false;  // not stopping 'required' scrubs for configuration changes
  }

  if (m_is_deep) {
    if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
        m_pg->pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
      dout(10) << "nodeep_scrub set, aborting" << dendl;
      return true;
    }
  }

  if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
      m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
    dout(10) << "noscrub set, aborting" << dendl;
    return true;
  }

  return false;
}

// initiating state-machine events --------------------------------

/*
 * a note re the checks performed before sending scrub-initiating messages:
 *
 * For those ('StartScrub', 'AfterRepairScrub') scrub-initiation messages that
 * possibly were in the queue while the PG changed state and became unavailable for
 * scrubbing:
 *
 * The check_interval() catches all major changes to the PG. As for the other conditions
 * we may check (and see is_message_relevant() above):
 *
 * - we are not 'active' yet, so must not check against is_active(), and:
 *
 * - the 'abort' flags were just verified (when the triggering message was queued). As
 *   those are only modified at human speeds - they need not be queried again.
 *
 * Some of the considerations above are also relevant to the replica-side initiation
 * ('StartReplica' & 'StartReplicaNoWait').
 */

void PgScrubber::initiate_regular_scrub(epoch_t epoch_queued)
{
  dout(15) << __func__ << " epoch: " << epoch_queued << dendl;
  // we may have lost our Primary status while the message languished in the queue
  if (check_interval(epoch_queued)) {
    dout(10) << "scrubber event -->> StartScrub epoch: " << epoch_queued << dendl;
    reset_epoch(epoch_queued);
    m_fsm->process_event(StartScrub{});
    dout(10) << "scrubber event --<< StartScrub" << dendl;
  } else {
    clear_queued_or_active();  // also restarts snap trimming
  }
}

void PgScrubber::initiate_scrub_after_repair(epoch_t epoch_queued)
{
  dout(15) << __func__ << " epoch: " << epoch_queued << dendl;
  // we may have lost our Primary status while the message languished in the queue
  if (check_interval(epoch_queued)) {
    dout(10) << "scrubber event -->> AfterRepairScrub epoch: " << epoch_queued << dendl;
    reset_epoch(epoch_queued);
    m_fsm->process_event(AfterRepairScrub{});
    dout(10) << "scrubber event --<< AfterRepairScrub" << dendl;
  } else {
    clear_queued_or_active();  // also restarts snap trimming
  }
}

void PgScrubber::send_scrub_unblock(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Unblocked{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_scrub_resched(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(InternalSchedScrub{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_start_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
           << " token: " << token << dendl;
  if (is_primary()) {
    // shouldn't happen. Ignore
    dout(1) << "got a replica scrub request while Primary!" << dendl;
    return;
  }

  if (check_interval(epoch_queued) && is_token_current(token)) {
    // save us some time by not waiting for updates if there are none
    // to wait for. Affects the transition from NotActive into either
    // ReplicaWaitUpdates or ActiveReplica.
    if (pending_active_pushes())
      m_fsm->process_event(StartReplica{});
    else
      m_fsm->process_event(StartReplicaNoWait{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_sched_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
           << " token: " << token << dendl;
  if (check_interval(epoch_queued) && is_token_current(token)) {
    m_fsm->process_event(SchedReplica{});  // retest for map availability
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::active_pushes_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(ActivePushesUpd{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::update_applied_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(UpdatesApplied{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::digest_update_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(DigestUpdate{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_local_map_done(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Scrub::IntLocalMapDone{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_replica_maps_ready(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(GotReplicas{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_replica_pushes_upd(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(ReplicaPushesUpd{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_remotes_reserved(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  // note: scrub is not active yet
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(RemotesReserved{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_reservation_failure(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {  // do not check for 'active'!
    m_fsm->process_event(ReservationFailure{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_full_reset(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  m_fsm->process_event(Scrub::FullReset{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_chunk_free(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(Scrub::SelectedChunkFree{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_chunk_busy(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(Scrub::ChunkIsBusy{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_get_next_chunk(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Scrub::NextChunk{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_scrub_is_finished(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  // can't check for "active"

  m_fsm->process_event(Scrub::ScrubFinished{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_maps_compared(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  m_fsm->process_event(Scrub::MapsCompared{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

// -----------------

bool PgScrubber::is_reserving() const
{
  return m_fsm->is_reserving();
}

void PgScrubber::reset_epoch(epoch_t epoch_queued)
{
  dout(10) << __func__ << " state deep? " << state_test(PG_STATE_DEEP_SCRUB) << dendl;
  m_fsm->assert_not_active();

  m_epoch_start = epoch_queued;
  m_needs_sleep = true;
  m_is_deep = state_test(PG_STATE_DEEP_SCRUB);
  update_op_mode_text();
}

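/*
 * the 'priority' of a requeued scrub event: high-priority events are
 * promoted to (at least) 'osd_client_op_priority', so that they are not
 * starved behind regular queue entries.
 */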
unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
{
  unsigned int qu_priority = m_flags.priority;

  if (with_priority == Scrub::scrub_prio_t::high_priority) {
    qu_priority =
      std::max(qu_priority, (unsigned int)m_pg->get_cct()->_conf->osd_client_op_priority);
  }
  return qu_priority;
}

unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priority,
                                                unsigned int suggested_priority) const
{
  if (with_priority == Scrub::scrub_prio_t::high_priority) {
    suggested_priority = std::max(suggested_priority,
                                  (unsigned int)m_pg->cct->_conf->osd_client_op_priority);
  }
  return suggested_priority;
}

// ///////////////////////////////////////////////////////////////////// //
// scrub-op registration handling

void PgScrubber::unregister_from_osd()
{
  if (m_scrub_job) {
    dout(15) << __func__ << " prev. state: " << registration_state() << dendl;
    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
  }
}

bool PgScrubber::is_scrub_registered() const
{
  return m_scrub_job && m_scrub_job->in_queues;
}

std::string_view PgScrubber::registration_state() const
{
  if (m_scrub_job) {
    return m_scrub_job->registration_state();
  }
  return "(no sched job)"sv;
}

void PgScrubber::rm_from_osd_scrubbing()
{
  // make sure the OSD won't try to scrub this one just now
  unregister_from_osd();
}

void PgScrubber::on_primary_change(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica ")
           << " flags: " << request_flags << dendl;

  if (!m_scrub_job) {
    return;
  }

  dout(15) << __func__ << " scrub-job state: " << m_scrub_job->state_desc() << dendl;

  if (is_primary()) {
    auto suggested = determine_scrub_time(request_flags);
    m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested);
  } else {
    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
  }

  dout(15) << __func__ << " done " << registration_state() << dendl;
}

void PgScrubber::on_maybe_registration_change(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica/other ")
           << registration_state() << " flags: " << request_flags << dendl;

  on_primary_change(request_flags);
  dout(15) << __func__ << " done " << registration_state() << dendl;
}

void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << " flags: " << request_flags << dendl;

  {
    // verify that the 'in_q' status matches our "Primariority"
    if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) {
      dout(1) << __func__ << " !!! primary but not scheduled! " << dendl;
    }
  }

  if (is_primary() && m_scrub_job) {
    auto suggested = determine_scrub_time(request_flags);
    m_osds->get_scrub_services().update_job(m_scrub_job, suggested);
  }

  dout(15) << __func__ << " done " << registration_state() << dendl;
}

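/*
 * the 'suggested' scrub schedule for this PG:
 * - operator-requested scrubs (and 'need-auto' ones) are 'must' scrubs, with
 *   the earliest timestamp possible;
 * - PGs with invalid statistics (when 'osd_scrub_invalid_stats' is set) are
 *   also scheduled as mandatory - for 'now';
 * - otherwise - a regular periodic scrub, based on the last-scrub stamp and
 *   on the pool's min/max scrub-interval options.
 */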
ScrubQueue::sched_params_t
PgScrubber::determine_scrub_time(const requested_scrub_t& request_flags) const
{
  ScrubQueue::sched_params_t res;

  if (!is_primary()) {
    return res;  // with ok_to_scrub set to 'false'
  }

  if (request_flags.must_scrub || request_flags.need_auto) {

    // Set the smallest time that isn't utime_t()
    res.proposed_time = PgScrubber::scrub_must_stamp();
    res.is_must = ScrubQueue::must_scrub_t::mandatory;
    // we do not need the interval data in this case

  } else if (m_pg->info.stats.stats_invalid &&
             m_pg->cct->_conf->osd_scrub_invalid_stats) {
    res.proposed_time = ceph_clock_now();
    res.is_must = ScrubQueue::must_scrub_t::mandatory;

  } else {
    res.proposed_time = m_pg->info.history.last_scrub_stamp;
    res.min_interval =
      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
    res.max_interval =
      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
  }

  dout(15) << __func__ << " suggested: " << res.proposed_time << " hist: "
           << m_pg->info.history.last_scrub_stamp << " v:" << m_pg->info.stats.stats_invalid
           << " / " << m_pg->cct->_conf->osd_scrub_invalid_stats << " must:"
           << (res.is_must == ScrubQueue::must_scrub_t::mandatory ? "y" : "n")
           << " pool min: " << res.min_interval
           << dendl;
  return res;
}

void PgScrubber::scrub_requested(scrub_level_t scrub_level,
                                 scrub_type_t scrub_type,
                                 requested_scrub_t& req_flags)
{
  dout(10) << __func__ << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ")
           << (scrub_type == scrub_type_t::do_repair ? " repair-scrub " : " not-repair ")
           << " prev stamp: " << m_scrub_job->get_sched_time()
           << " registered? " << registration_state()
           << dendl;

  req_flags.must_scrub = true;
  req_flags.must_deep_scrub =
    (scrub_level == scrub_level_t::deep) || (scrub_type == scrub_type_t::do_repair);
  req_flags.must_repair = (scrub_type == scrub_type_t::do_repair);
  // User might intervene, so clear this
  req_flags.need_auto = false;
  req_flags.req_scrub = true;

  dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags << dendl;

  update_scrub_job(req_flags);
  m_pg->publish_stats_to_osd();
}


void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags)
{
  dout(10) << __func__ << " flags: " << request_flags << dendl;

  request_flags.need_auto = true;
  update_scrub_job(request_flags);
}

bool PgScrubber::reserve_local()
{
  // try to create the reservation object (which translates into asking the
  // OSD for the local scrub resource). If failing - undo it immediately

  m_local_osd_resource.emplace(m_osds);
  if (m_local_osd_resource->is_reserved()) {
    dout(15) << __func__ << ": local resources reserved" << dendl;
    return true;
  }

  dout(10) << __func__ << ": failed to reserve local scrub resources" << dendl;
  m_local_osd_resource.reset();
  return false;
}

// ----------------------------------------------------------------------------

bool PgScrubber::has_pg_marked_new_updates() const
{
  auto last_applied = m_pg->recovery_state.get_last_update_applied();
  dout(10) << __func__ << " recovery last: " << last_applied
           << " vs. scrub's: " << m_subset_last_update << dendl;

  return last_applied >= m_subset_last_update;
}

void PgScrubber::set_subset_last_update(eversion_t e)
{
  m_subset_last_update = e;
  dout(15) << __func__ << " last-update: " << e << dendl;
}

void PgScrubber::on_applied_when_primary(const eversion_t& applied_version)
{
  // we are only interested in updates if we are the Primary, and in state
  // WaitLastUpdate
  if (m_fsm->is_accepting_updates() && (applied_version >= m_subset_last_update)) {
    m_osds->queue_scrub_applied_update(m_pg, m_pg->is_scrub_blocking_ops());
    dout(15) << __func__ << " update: " << applied_version
             << " vs. required: " << m_subset_last_update << dendl;
  }
}

/*
 * The selected range is set directly into 'm_start' and 'm_end'.
 * Also set in the process:
 * - m_subset_last_update
 * - m_max_end
 * - end
 * - start
 */
bool PgScrubber::select_range()
{
  m_primary_scrubmap = ScrubMap{};
  m_received_maps.clear();

  /* get the start and end of our scrub chunk
   *
   * Our scrub chunk has an important restriction we're going to need to
   * respect. We can't let head be start or end.
   * Using a half-open interval means that if end == head,
   * we'd scrub/lock head and the clone right next to head in different
   * chunks which would allow us to miss clones created between
   * scrubbing that chunk and scrubbing the chunk including head.
   * This isn't true for any of the other clones since clones can
   * only be created "just to the left of" head. There is one exception
   * to this: promotion of clones which always happens to the left of the
   * left-most clone, but promote_object checks the scrubber in that
   * case, so it should be ok. Also, it's ok to "miss" clones at the
   * left end of the range if we are a tier because they may legitimately
   * not exist (see _scrub).
   */
  int min_idx = static_cast<int>(std::max<int64_t>(
    3, m_pg->get_cct()->_conf->osd_scrub_chunk_min / (int)preemption_data.chunk_divisor()));

  int max_idx = static_cast<int>(std::max<int64_t>(
    min_idx,
    m_pg->get_cct()->_conf->osd_scrub_chunk_max / (int)preemption_data.chunk_divisor()));

  dout(10) << __func__ << " Min: " << min_idx << " Max: " << max_idx
           << " Div: " << preemption_data.chunk_divisor() << dendl;

  hobject_t start = m_start;
  hobject_t candidate_end;
  std::vector<hobject_t> objects;
  int ret = m_pg->get_pgbackend()->objects_list_partial(start, min_idx, max_idx, &objects,
                                                        &candidate_end);
  ceph_assert(ret >= 0);

  if (!objects.empty()) {

    hobject_t back = objects.back();
    while (candidate_end.is_head() && candidate_end == back.get_head()) {
      candidate_end = back;
      objects.pop_back();
      if (objects.empty()) {
        ceph_assert(0 ==
677 "Somehow we got more than 2 objects which"
678 "have the same head but are not clones");
      }
      back = objects.back();
    }

    if (candidate_end.is_head()) {
      ceph_assert(candidate_end != back.get_head());
      candidate_end = candidate_end.get_object_boundary();
    }

  } else {
    ceph_assert(candidate_end.is_max());
  }

  // is that range free for us? if not - we will be rescheduled later by whoever
  // triggered us this time

  if (!m_pg->_range_available_for_scrub(m_start, candidate_end)) {
    // we'll be requeued by whatever made us unavailable for scrub
    dout(10) << __func__ << ": scrub blocked somewhere in range "
             << "[" << m_start << ", " << candidate_end << ")" << dendl;
    return false;
  }

  m_end = candidate_end;
  if (m_end > m_max_end)
    m_max_end = m_end;

  dout(15) << __func__ << " range selected: " << m_start << " //// " << m_end << " //// "
           << m_max_end << dendl;

  // debug: be 'blocked' if told so by the 'pg scrub_debug block' asok command
  if (m_debug_blockrange > 0) {
    m_debug_blockrange--;
    return false;
  }
  return true;
}

void PgScrubber::select_range_n_notify()
{
  if (select_range()) {
    // the next chunk to handle is not blocked
    dout(20) << __func__ << ": selection OK" << dendl;
    m_osds->queue_scrub_chunk_free(m_pg, Scrub::scrub_prio_t::low_priority);

  } else {
    // we will wait for the objects range to become available for scrubbing
    dout(10) << __func__ << ": selected chunk is busy" << dendl;
    m_osds->queue_scrub_chunk_busy(m_pg, Scrub::scrub_prio_t::low_priority);
  }
}

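/*
 * called for any write targeting an object in the currently-scrubbed chunk.
 * Returns 'true' if the write must wait until the chunk is released. If the
 * scrub session is preemptable - it is preempted instead (freeing the
 * range), and 'false' is returned, allowing the write to proceed.
 */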
bool PgScrubber::write_blocked_by_scrub(const hobject_t& soid)
{
  if (soid < m_start || soid >= m_end) {
    return false;
  }

  dout(20) << __func__ << " " << soid << " can preempt? "
           << preemption_data.is_preemptable() << " already preempted? "
           << preemption_data.was_preempted() << dendl;

  if (preemption_data.was_preempted()) {
    // otherwise - write requests arriving while 'already preempted' is set
    // but 'preemptable' is not - will not be allowed to continue, and will
    // not be requeued on time.
    return false;
  }

  if (preemption_data.is_preemptable()) {

    dout(10) << __func__ << " " << soid << " preempted" << dendl;

    // signal the preemption
    preemption_data.do_preempt();
    m_end = m_start;  // free the range we were scrubbing

    return false;
  }
  return true;
}

bool PgScrubber::range_intersects_scrub(const hobject_t& start, const hobject_t& end)
{
  // does [start, end] intersect [scrubber.start, scrubber.m_max_end)
  return (start < m_max_end && end >= m_start);
}

Scrub::BlockedRangeWarning PgScrubber::acquire_blocked_alarm()
{
  return std::make_unique<blocked_range_t>(m_osds, ceph::timespan{300s}, m_pg_id);
}

/**
 * if we are required to sleep:
 *	arrange a callback sometime later.
 *	be sure to be able to identify a stale callback.
 * Otherwise: perform a requeue (i.e. - rescheduling through the OSD queue)
 *	anyway.
 */
void PgScrubber::add_delayed_scheduling()
{
  m_end = m_start;  // not blocking any range now

  milliseconds sleep_time{0ms};
  if (m_needs_sleep) {
    double scrub_sleep =
      1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required);
    sleep_time = milliseconds{int64_t(scrub_sleep)};
  }
  dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? "
           << m_needs_sleep << dendl;

  if (sleep_time.count()) {
    // schedule a transition for some 'sleep_time' ms in the future

    m_needs_sleep = false;
    m_sleep_started_at = ceph_clock_now();

    // the following log line is used by osd-scrub-test.sh
    dout(20) << __func__ << " scrub state is PendingTimer, sleeping" << dendl;

    // the 'delayer' for crimson is different. Will be factored out.

    spg_t pgid = m_pg->get_pgid();
    auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this](
                                      [[maybe_unused]] int r) mutable {
      PGRef pg = osds->osd->lookup_lock_pg(pgid);
      if (!pg) {
        lgeneric_subdout(g_ceph_context, osd, 10)
          << "scrub_requeue_callback: Could not find "
          << "PG " << pgid << " can't complete scrub requeue after sleep" << dendl;
        return;
      }
      scrbr->m_needs_sleep = true;
      lgeneric_dout(scrbr->get_pg_cct(), 7)
        << "scrub_requeue_callback: slept for "
        << ceph_clock_now() - scrbr->m_sleep_started_at << ", re-queuing scrub" << dendl;

      scrbr->m_sleep_started_at = utime_t{};
      osds->queue_for_scrub_resched(&(*pg), Scrub::scrub_prio_t::low_priority);
      pg->unlock();
    });

    std::lock_guard l(m_osds->sleep_lock);
    m_osds->sleep_timer.add_event_after(sleep_time.count() / 1000.0f, callbk);

  } else {
    // just a requeue
    m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::high_priority);
  }
}

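/*
 * searches for the most recent log entry that affects an object within the
 * selected chunk - first in the projected log, then in the PG log itself.
 * Returns the version of that entry, or a null eversion_t if there is none.
 */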
eversion_t PgScrubber::search_log_for_updates() const
{
  auto& projected = m_pg->projected_log.log;
  auto pi = find_if(
    projected.crbegin(), projected.crend(),
    [this](const auto& e) -> bool { return e.soid >= m_start && e.soid < m_end; });

  if (pi != projected.crend())
    return pi->version;

  // there was no relevant update entry in the log

  auto& log = m_pg->recovery_state.get_pg_log().get_log().log;
  auto p = find_if(log.crbegin(), log.crend(), [this](const auto& e) -> bool {
    return e.soid >= m_start && e.soid < m_end;
  });

  if (p == log.crend())
    return eversion_t{};
  else
    return p->version;
}

void PgScrubber::get_replicas_maps(bool replica_can_preempt)
{
  dout(10) << __func__ << " started in epoch/interval: " << m_epoch_start << "/"
           << m_interval_start
           << " pg same_interval_since: " << m_pg->info.history.same_interval_since
           << dendl;

  m_primary_scrubmap_pos.reset();

  // ask replicas to scan and send maps
  for (const auto& i : m_pg->get_acting_recovery_backfill()) {

    if (i == m_pg_whoami)
      continue;

    m_maps_status.mark_replica_map_request(i);
    _request_scrub_map(i, m_subset_last_update, m_start, m_end, m_is_deep,
                       replica_can_preempt);
  }

875 dout(10) << __func__ << " awaiting" << m_maps_status << dendl;
}

bool PgScrubber::was_epoch_changed() const
{
  // for crimson we have m_pg->get_info().history.same_interval_since
  dout(10) << __func__ << " epoch_start: " << m_interval_start
           << " from pg: " << m_pg->get_history().same_interval_since << dendl;

  return m_interval_start < m_pg->get_history().same_interval_since;
}

void PgScrubber::mark_local_map_ready()
{
  m_maps_status.mark_local_map_ready();
}

bool PgScrubber::are_all_maps_available() const
{
  return m_maps_status.are_all_maps_available();
}

std::string PgScrubber::dump_awaited_maps() const
{
  return m_maps_status.dump();
}

void PgScrubber::update_op_mode_text()
{
  auto visible_repair = state_test(PG_STATE_REPAIR);
  m_mode_desc = (visible_repair ? "repair" : (m_is_deep ? "deep-scrub" : "scrub"));

  dout(10) << __func__ << ": repair: visible: " << (visible_repair ? "true" : "false")
           << ", internal: " << (m_is_repair ? "true" : "false")
           << ". Displayed: " << m_mode_desc << dendl;
}

void PgScrubber::_request_scrub_map(pg_shard_t replica,
                                    eversion_t version,
                                    hobject_t start,
                                    hobject_t end,
                                    bool deep,
                                    bool allow_preemption)
{
  ceph_assert(replica != m_pg_whoami);
  dout(10) << __func__ << " scrubmap from osd." << replica
           << (deep ? " deep" : " shallow") << dendl;

  auto repscrubop =
    new MOSDRepScrub(spg_t(m_pg->info.pgid.pgid, replica.shard), version,
                     get_osdmap_epoch(), m_pg->get_last_peering_reset(), start, end, deep,
                     allow_preemption, m_flags.priority, m_pg->ops_blocked_by_scrub());

  // default priority. We want the replica-scrub processed prior to any recovery
  // or client io messages (we are holding a lock!)
  m_osds->send_message_osd_cluster(replica.osd, repscrubop, get_osdmap_epoch());
}

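/*
 * queues the removal of the Scrub::Store objects. The (otherwise empty)
 * 'OnComplete' context only serves to keep the Store instance alive until
 * the transaction is applied.
 */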
void PgScrubber::cleanup_store(ObjectStore::Transaction* t)
{
  if (!m_store)
    return;

  struct OnComplete : Context {
    std::unique_ptr<Scrub::Store> store;
    explicit OnComplete(std::unique_ptr<Scrub::Store>&& store) : store(std::move(store))
    {}
    void finish(int) override {}
  };
  m_store->cleanup(t);
  t->register_on_complete(new OnComplete(std::move(m_store)));
  ceph_assert(!m_store);
}

void PgScrubber::on_init()
{
  // going upwards from 'inactive'
  ceph_assert(!is_scrub_active());
  m_pg->reset_objects_scrubbed();
  preemption_data.reset();
  m_pg->publish_stats_to_osd();
  m_interval_start = m_pg->get_history().same_interval_since;

  dout(10) << __func__ << " start same_interval:" << m_interval_start << dendl;

  // create a new store
  {
    ObjectStore::Transaction t;
    cleanup_store(&t);
    m_store.reset(
      Scrub::Store::create(m_pg->osd->store, &t, m_pg->info.pgid, m_pg->coll));
    m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
  }

  m_start = m_pg->info.pgid.pgid.get_hobj_start();
  m_active = true;
  ++m_sessions_counter;
  m_pg->publish_stats_to_osd();
}

void PgScrubber::on_replica_init()
{
  m_active = true;
  ++m_sessions_counter;
}

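/*
 * walks the chunk's objects in reverse order (in a ScrubMap, clones precede
 * their head object - thus the head, with its SnapSet, is reached first).
 * For each clone, the SnapMapper data is verified against the head's
 * SnapSet, and repaired if found to be missing or inconsistent.
 */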
void PgScrubber::_scan_snaps(ScrubMap& smap)
{
  hobject_t head;
  SnapSet snapset;

  // Test qa/standalone/scrub/osd-scrub-snaps.sh greps for the strings
  // in this function
  dout(15) << "_scan_snaps starts" << dendl;

  for (auto i = smap.objects.rbegin(); i != smap.objects.rend(); ++i) {

    const hobject_t& hoid = i->first;
    ScrubMap::object& o = i->second;

    dout(20) << __func__ << " " << hoid << dendl;

    ceph_assert(!hoid.is_snapdir());
    if (hoid.is_head()) {
      // parse the SnapSet
      bufferlist bl;
      if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
        continue;
      }
      bl.push_back(o.attrs[SS_ATTR]);
      auto p = bl.cbegin();
      try {
        decode(snapset, p);
      } catch (...) {
        continue;
      }
      head = hoid.get_head();
      continue;
    }

    if (hoid.snap < CEPH_MAXSNAP) {
      // check and if necessary fix snap_mapper
      if (hoid.get_head() != head) {
        derr << __func__ << " no head for " << hoid << " (have " << head << ")" << dendl;
        continue;
      }
      set<snapid_t> obj_snaps;
      auto p = snapset.clone_snaps.find(hoid.snap);
      if (p == snapset.clone_snaps.end()) {
        derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset << dendl;
        continue;
      }
      obj_snaps.insert(p->second.begin(), p->second.end());
      set<snapid_t> cur_snaps;
      int r = m_pg->snap_mapper.get_snaps(hoid, &cur_snaps);
      if (r != 0 && r != -ENOENT) {
        derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
        ceph_abort();
      }
      if (r == -ENOENT || cur_snaps != obj_snaps) {
        ObjectStore::Transaction t;
        OSDriver::OSTransaction _t(m_pg->osdriver.get_transaction(&t));
        if (r == 0) {
          r = m_pg->snap_mapper.remove_oid(hoid, &_t);
          if (r != 0) {
            derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
            ceph_abort();
          }
          m_pg->osd->clog->error()
            << "osd." << m_pg->osd->whoami << " found snap mapper error on pg "
            << m_pg->info.pgid << " oid " << hoid << " snaps in mapper: " << cur_snaps
            << ", oi: " << obj_snaps << "...repaired";
        } else {
          m_pg->osd->clog->error()
            << "osd." << m_pg->osd->whoami << " found snap mapper error on pg "
            << m_pg->info.pgid << " oid " << hoid << " snaps missing in mapper"
            << ", should be: " << obj_snaps << " was " << cur_snaps << " r " << r
            << "...repaired";
        }
        m_pg->snap_mapper.add_oid(hoid, obj_snaps, &_t);

        // wait for repair to apply to avoid confusing other bits of the system.
        {
          dout(15) << __func__ << " wait on repair!" << dendl;

          ceph::condition_variable my_cond;
          ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
          int e = 0;
          bool done;

          t.register_on_applied_sync(new C_SafeCond(my_lock, my_cond, &done, &e));

          e = m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t));
          if (e != 0) {
            derr << __func__ << ": queue_transaction got " << cpp_strerror(e) << dendl;
          } else {
            std::unique_lock l{my_lock};
            my_cond.wait(l, [&done] { return done; });
          }
        }
      }
    }
  }
}

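/*
 * asks the backend to build the Primary's scrub map for the current chunk.
 * A return value of -EINPROGRESS means the backend needs more rounds: we
 * requeue ourselves (via the OSD scrub queue) to continue later.
 */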
int PgScrubber::build_primary_map_chunk()
{
  epoch_t map_building_since = m_pg->get_osdmap_epoch();
  dout(20) << __func__ << ": initiated at epoch " << map_building_since << dendl;

  auto ret = build_scrub_map_chunk(m_primary_scrubmap, m_primary_scrubmap_pos, m_start,
                                   m_end, m_is_deep);

  if (ret == -EINPROGRESS) {
    // reschedule another round of asking the backend to collect the scrub data
    m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::low_priority);
  }
  return ret;
}

int PgScrubber::build_replica_map_chunk()
{
  dout(10) << __func__ << " interval start: " << m_interval_start
           << " current token: " << m_current_token << " epoch: " << m_epoch_start
           << " deep: " << m_is_deep << dendl;

  auto ret = build_scrub_map_chunk(replica_scrubmap, replica_scrubmap_pos, m_start, m_end,
                                   m_is_deep);

  switch (ret) {

    case -EINPROGRESS:
      // must wait for the backend to finish. No external event source.
      // (note: previous version used low priority here. Now switched to using the
      // priority of the original message)
      m_osds->queue_for_rep_scrub_resched(m_pg, m_replica_request_priority,
                                          m_flags.priority, m_current_token);
      break;

    case 0: {
      // finished!
      m_cleaned_meta_map.clear_from(m_start);
      m_cleaned_meta_map.insert(replica_scrubmap);
      auto for_meta_scrub = clean_meta_map();
      _scan_snaps(for_meta_scrub);

      // the local map has been created. Send it to the primary.
      // Note: once the message reaches the Primary, it may ask us for another
      // chunk - and we better be done with the current scrub. Thus - the preparation of
      // the reply message is separate, and we clear the scrub state before actually
      // sending it.

      auto reply = prep_replica_map_msg(PreemptionNoted::no_preemption);
      replica_handling_done();
      dout(15) << __func__ << " chunk map sent " << dendl;
      send_replica_map(reply);
    } break;

    default:
      // negative retval: build_scrub_map_chunk() signalled an error
      // Pre-Pacific code ignored this option, treating it as a success.
      // \todo Add an error flag in the returning message.
      dout(1) << "Error! Aborting. ActiveReplica::react(SchedReplica) Ret: " << ret
              << dendl;
      replica_handling_done();
      // only in debug mode for now:
      assert(false && "backend error");
      break;
  };

  return ret;
}

int PgScrubber::build_scrub_map_chunk(
  ScrubMap& map, ScrubMapBuilder& pos, hobject_t start, hobject_t end, bool deep)
{
  dout(10) << __func__ << " [" << start << "," << end << ") "
           << " pos " << pos << " Deep: " << deep << dendl;

  // start
  while (pos.empty()) {

    pos.deep = deep;
    map.valid_through = m_pg->info.last_update;

    // objects
    vector<ghobject_t> rollback_obs;
    pos.ret =
      m_pg->get_pgbackend()->objects_list_range(start, end, &pos.ls, &rollback_obs);
    dout(10) << __func__ << " while pos empty " << pos.ret << dendl;
    if (pos.ret < 0) {
      dout(5) << "objects_list_range error: " << pos.ret << dendl;
      return pos.ret;
    }
    dout(10) << __func__ << " pos.ls.empty()? " << (pos.ls.empty() ? "+" : "-") << dendl;
    if (pos.ls.empty()) {
      break;
    }
    m_pg->_scan_rollback_obs(rollback_obs);
    pos.pos = 0;
    return -EINPROGRESS;
  }

  // scan objects
  while (!pos.done()) {

    int r = m_pg->get_pgbackend()->be_scan_list(map, pos);
    dout(30) << __func__ << " BE returned " << r << dendl;
    if (r == -EINPROGRESS) {
      dout(20) << __func__ << " in progress" << dendl;
      return r;
    }
  }

  // finish
  dout(20) << __func__ << " finishing" << dendl;
  ceph_assert(pos.done());
  m_pg->_repair_oinfo_oid(map);

  dout(20) << __func__ << " done, got " << map.objects.size() << " items" << dendl;
  return 0;
}

/*
 * Process:
 * Building a map of objects suitable for snapshot validation.
 * The data in m_cleaned_meta_map is the leftover partial items that need to
 * be completed before they can be processed.
 *
 * Snapshots in maps precede the head object, which is why we are scanning backwards.
 */
ScrubMap PgScrubber::clean_meta_map()
{
  ScrubMap for_meta_scrub;

  if (m_end.is_max() || m_cleaned_meta_map.objects.empty()) {
    m_cleaned_meta_map.swap(for_meta_scrub);
  } else {
    auto iter = m_cleaned_meta_map.objects.end();
    --iter;  // not empty, see 'if' clause
    auto begin = m_cleaned_meta_map.objects.begin();
    if (iter->first.has_snapset()) {
      ++iter;
    } else {
      while (iter != begin) {
        auto next = iter--;
        if (next->first.get_head() != iter->first.get_head()) {
          ++iter;
          break;
        }
      }
    }
    for_meta_scrub.objects.insert(begin, iter);
    m_cleaned_meta_map.objects.erase(begin, iter);
  }

  return for_meta_scrub;
}

void PgScrubber::run_callbacks()
{
  std::list<Context*> to_run;
  to_run.swap(m_callbacks);

  for (auto& tr : to_run) {
    tr->complete(0);
  }
}

void PgScrubber::maps_compare_n_cleanup()
{
  scrub_compare_maps();
  m_start = m_end;
  run_callbacks();
  requeue_waiting();
  m_osds->queue_scrub_maps_compared(m_pg, Scrub::scrub_prio_t::low_priority);
}

Scrub::preemption_t& PgScrubber::get_preemptor()
{
  return preemption_data;
}

/*
 * Process note: called for the arriving "give me your map, replica!" request.
 * Unlike the original implementation, we do not requeue the Op waiting for
 * updates. Instead - we trigger the FSM.
 */
void PgScrubber::replica_scrub_op(OpRequestRef op)
{
  op->mark_started();
  auto msg = op->get_req<MOSDRepScrub>();
  dout(10) << __func__ << " pg:" << m_pg->pg_id
           << " Msg: map_epoch:" << msg->map_epoch
           << " min_epoch:" << msg->min_epoch << " deep?" << msg->deep << dendl;

  // are we still processing a previous scrub-map request without noticing that
  // the interval changed? won't see it here, but rather at the reservation
  // stage.

  if (msg->map_epoch < m_pg->info.history.same_interval_since) {
    dout(10) << "replica_scrub_op discarding old replica_scrub from "
             << msg->map_epoch << " < "
             << m_pg->info.history.same_interval_since << dendl;

    // is there a general sync issue? are we holding a stale reservation?
    // not checking now - assuming we will actively react to interval change.

    return;
  }

  if (is_queued_or_active()) {
    // this is a bug!
    // Somehow, we have received a new scrub request from our Primary, before
    // having finished with the previous one. Did we go through an interval
    // change without resetting the FSM? Possible responses:
    // - crashing (the original assert_not_active() implemented that one), or
    // - trying to recover:
    //  - (logging enough information to debug this scenario)
    //  - reset the FSM.
    m_osds->clog->warn() << fmt::format(
      "{}: error: a second scrub-op received while handling the previous one",
      __func__);

    scrub_clear_state();
    m_osds->clog->warn() << fmt::format(
      "{}: after a reset. Now handling the new OP", __func__);
  }
  // make sure the FSM is at NotActive
  m_fsm->assert_not_active();

  replica_scrubmap = ScrubMap{};
  replica_scrubmap_pos = ScrubMapBuilder{};

  m_replica_min_epoch = msg->min_epoch;
  m_start = msg->start;
  m_end = msg->end;
  m_max_end = msg->end;
  m_is_deep = msg->deep;
  m_interval_start = m_pg->info.history.same_interval_since;
  m_replica_request_priority = msg->high_priority
                                 ? Scrub::scrub_prio_t::high_priority
                                 : Scrub::scrub_prio_t::low_priority;
  m_flags.priority = msg->priority ? msg->priority : m_pg->get_scrub_priority();

  preemption_data.reset();
  preemption_data.force_preemptability(msg->allow_preemption);

  replica_scrubmap_pos.reset();

  set_queued_or_active();
  m_osds->queue_for_rep_scrub(m_pg, m_replica_request_priority,
                              m_flags.priority, m_current_token);
}

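/*
 * translates the 'planned scrub' flags into the parameters of the now
 * starting scrub session: the session flags (m_flags), the relevant PG
 * state-bits, and the 'repair' mode. Also marks us as 'queued or active'.
 */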
void PgScrubber::set_op_parameters(requested_scrub_t& request)
{
  dout(10) << __func__ << " input: " << request << dendl;

  set_queued_or_active();  // we are fully committed now.

  // write down the epoch of starting a new scrub. Will be used
  // to discard stale messages from previous aborted scrubs.
  m_epoch_start = m_pg->get_osdmap_epoch();

  m_flags.check_repair = request.check_repair;
  m_flags.auto_repair = request.auto_repair || request.need_auto;
  m_flags.required = request.req_scrub || request.must_scrub;

  m_flags.priority = (request.must_scrub || request.need_auto)
                       ? get_pg_cct()->_conf->osd_requested_scrub_priority
                       : m_pg->get_scrub_priority();

  state_set(PG_STATE_SCRUBBING);

  // will we be deep-scrubbing?
  if (request.must_deep_scrub || request.need_auto || request.time_for_deep) {
    state_set(PG_STATE_DEEP_SCRUB);
  }

  // m_is_repair is set for either 'must_repair' or 'repair-on-the-go' (i.e.
  // deep-scrub with the auto_repair configuration flag set). m_is_repair value
  // determines the scrubber behavior.
  // PG_STATE_REPAIR, on the other hand, is only used for status reports (inc. the
  // PG status as appearing in the logs).
  m_is_repair = request.must_repair || m_flags.auto_repair;
  if (request.must_repair) {
    state_set(PG_STATE_REPAIR);
    // not calling update_op_mode_text() yet, as m_is_deep not set yet
  }

  // the publishing here is required for synchronization with tests
  m_pg->publish_stats_to_osd();
  m_flags.deep_scrub_on_error = request.deep_scrub_on_error;
}

void PgScrubber::scrub_compare_maps()
{
  dout(10) << __func__ << " has maps, analyzing" << dendl;

  // construct authoritative scrub map for type-specific scrubbing
  m_cleaned_meta_map.insert(m_primary_scrubmap);
  map<hobject_t, pair<std::optional<uint32_t>, std::optional<uint32_t>>> missing_digest;

  map<pg_shard_t, ScrubMap*> maps;
  maps[m_pg_whoami] = &m_primary_scrubmap;

  for (const auto& i : m_pg->get_acting_recovery_backfill()) {
    if (i == m_pg_whoami)
      continue;
    dout(2) << __func__ << " replica " << i << " has "
            << m_received_maps[i].objects.size() << " items" << dendl;
    maps[i] = &m_received_maps[i];
  }

  set<hobject_t> master_set;

  // Construct master set
  for (const auto& map : maps) {
    for (const auto& i : map.second->objects) {
      master_set.insert(i.first);
    }
  }

  stringstream ss;
  m_pg->get_pgbackend()->be_omap_checks(maps, master_set, m_omap_stats, ss);

  if (!ss.str().empty()) {
    m_osds->clog->warn(ss);
  }

  if (m_pg->recovery_state.get_acting_recovery_backfill().size() > 1) {

    dout(10) << __func__ << " comparing replica scrub maps" << dendl;

    // Map from object with errors to good peer
    map<hobject_t, list<pg_shard_t>> authoritative;

    dout(2) << __func__ << ": primary (" << m_pg->get_primary() << ") has "
            << m_primary_scrubmap.objects.size() << " items" << dendl;
    m_pg->add_objects_scrubbed_count(m_primary_scrubmap.objects.size());

    ss.str("");
    ss.clear();

    m_pg->get_pgbackend()->be_compare_scrubmaps(
      maps, master_set, m_is_repair, m_missing, m_inconsistent,
      authoritative, missing_digest, m_shallow_errors, m_deep_errors, m_store.get(),
      m_pg->info.pgid, m_pg->recovery_state.get_acting(), ss);

    if (!ss.str().empty()) {
      m_osds->clog->error(ss);
    }

    for (auto& i : authoritative) {
      list<pair<ScrubMap::object, pg_shard_t>> good_peers;
      for (list<pg_shard_t>::const_iterator j = i.second.begin(); j != i.second.end();
           ++j) {
        good_peers.emplace_back(maps[*j]->objects[i.first], *j);
      }
      m_authoritative.emplace(i.first, good_peers);
    }

    for (auto i = authoritative.begin(); i != authoritative.end(); ++i) {
      m_cleaned_meta_map.objects.erase(i->first);
      m_cleaned_meta_map.objects.insert(
        *(maps[i->second.back()]->objects.find(i->first)));
    }
  }

  auto for_meta_scrub = clean_meta_map();

  // ok, do the pg-type specific scrubbing

  // (Validates consistency of the object info and snap sets)
  scrub_snapshot_metadata(for_meta_scrub, missing_digest);

  // called here (on the primary), _scan_snaps() can use the authoritative map; a replica cannot
1453 _scan_snaps(for_meta_scrub);
1454
1455 if (!m_store->empty()) {
1456
1457 if (m_is_repair) {
1458 dout(10) << __func__ << ": discarding scrub results" << dendl;
1459 m_store->flush(nullptr);
1460 } else {
1461 dout(10) << __func__ << ": updating scrub object" << dendl;
1462 ObjectStore::Transaction t;
1463 m_store->flush(&t);
1464 m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
1465 }
1466 }
1467 }
1468
1469 ScrubMachineListener::MsgAndEpoch PgScrubber::prep_replica_map_msg(
1470 PreemptionNoted was_preempted)
1471 {
1472 dout(10) << __func__ << " min epoch:" << m_replica_min_epoch << dendl;
1473
1474 auto reply =
1475 make_message<MOSDRepScrubMap>(spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard),
1476 m_replica_min_epoch, m_pg_whoami);
1477
1478 reply->preempted = (was_preempted == PreemptionNoted::preempted);
1479 ::encode(replica_scrubmap, reply->get_data());
1480
1481 return ScrubMachineListener::MsgAndEpoch{reply, m_replica_min_epoch};
1482 }
1483
1484 void PgScrubber::send_replica_map(const MsgAndEpoch& preprepared)
1485 {
1486 m_pg->send_cluster_message(m_pg->get_primary().osd, preprepared.m_msg,
1487 preprepared.m_epoch, false);
1488 }
1489
1490 void PgScrubber::send_preempted_replica()
1491 {
1492 auto reply =
1493 make_message<MOSDRepScrubMap>(spg_t{m_pg->info.pgid.pgid, m_pg->get_primary().shard},
1494 m_replica_min_epoch, m_pg_whoami);
1495
1496 reply->preempted = true;
1497 ::encode(replica_scrubmap, reply->get_data()); // must not skip this
1498 m_pg->send_cluster_message(m_pg->get_primary().osd, reply, m_replica_min_epoch, false);
1499 }
1500
1501 /*
1502 * - if the replica lets us know it was interrupted, we mark the chunk as interrupted.
1503 * The state-machine will react to that when all replica maps are received.
1504 * - when all maps are received, we signal the FSM with the GotReplicas event (see
1505 * scrub_send_replmaps_ready()). Note that due to the no-reentrancy limitations of the
1506 * FSM, we do not 'process' the event directly. Instead - it is queued for the OSD to
1507 * handle.
1508 */
1509 void PgScrubber::map_from_replica(OpRequestRef op)
1510 {
1511 auto m = op->get_req<MOSDRepScrubMap>();
1512 dout(15) << __func__ << " " << *m << dendl;
1513
1514 if (m->map_epoch < m_pg->info.history.same_interval_since) {
1515 dout(10) << __func__ << " discarding old from " << m->map_epoch << " < "
1516 << m_pg->info.history.same_interval_since << dendl;
1517 return;
1518 }
1519
1520 auto p = const_cast<bufferlist&>(m->get_data()).cbegin();
1521
1522 m_received_maps[m->from].decode(p, m_pg->info.pgid.pool());
1523 dout(15) << "map version is " << m_received_maps[m->from].valid_through << dendl;
1524
1525 auto [is_ok, err_txt] = m_maps_status.mark_arriving_map(m->from);
1526 if (!is_ok) {
1527 // previously an unexpected map was triggering an assert. Now, as scrubs can be
1528 // aborted at any time, the chances of this happening have increased, and aborting is
1529 // not justified
1530 dout(1) << __func__ << err_txt << " from OSD " << m->from << dendl;
1531 return;
1532 }
1533
1534 if (m->preempted) {
1535 dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
1536 preemption_data.do_preempt();
1537 }
1538
1539 if (m_maps_status.are_all_maps_available()) {
1540 dout(15) << __func__ << " all repl-maps available" << dendl;
1541 m_osds->queue_scrub_got_repl_maps(m_pg, m_pg->is_scrub_blocking_ops());
1542 }
1543 }
1544
1545 void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
1546 {
1547 dout(10) << __func__ << " " << *op->get_req() << dendl;
1548 op->mark_started();
1549 auto request_ep = op->get_req<MOSDScrubReserve>()->get_map_epoch();
1550
1551 /*
1552 * if we are currently holding a reservation, then:
1553 * either (1) we, the scrubber, did not yet notice an interval change. The remembered
1554 * reservation epoch is from before our interval, and we can silently discard the
1555 * reservation (no message is required).
1556 * or:
1557 * (2) the interval hasn't changed, but the same Primary that (we think) holds the
1558 * lock just sent us a new request. Note that we know it's the same Primary, as
1559 * otherwise the interval would have changed.
1560 * Ostensibly we can discard & redo the reservation. But then we
1561 * will be temporarily releasing the OSD resource - and might not be able to grab it
1562 * again. Thus, we simply treat this as a successful new request
1563 * (but mark the fact that if there is a previous request from the primary to
1564 * scrub a specific chunk - that request is now defunct).
1565 */
1566
1567 if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) {
1568 // we are holding a stale reservation from a past epoch
1569 m_remote_osd_resource.reset();
1570 dout(10) << __func__ << " cleared existing stale reservation" << dendl;
1571 }
1572
1573 if (request_ep < m_pg->get_same_interval_since()) {
1574 // will not ack stale requests
1575 return;
1576 }
1577
1578 bool granted{false};
1579 if (m_remote_osd_resource.has_value()) {
1580
1581 dout(10) << __func__ << " already reserved." << dendl;
1582
1583 /*
1584 * it might well be that we did not yet finish handling the latest scrub-op from
1585 * our primary. This happens, for example, if 'noscrub' was set via a command, then
1586 * reset. The primary in this scenario will remain in the same interval, but we do need
1587 * to reset our internal state (otherwise - the first renewed 'give me your scrub map'
1588 * from the primary will see us in active state, crashing the OSD).
1589 */
1590 advance_token();
1591 granted = true;
1592
1593 } else if (m_pg->cct->_conf->osd_scrub_during_recovery ||
1594 !m_osds->is_recovery_active()) {
1595 m_remote_osd_resource.emplace(this, m_pg, m_osds, request_ep);
1596 // OSD resources allocated?
1597 granted = m_remote_osd_resource->is_reserved();
1598 if (!granted) {
1599 // just forget it
1600 m_remote_osd_resource.reset();
1601 dout(20) << __func__ << ": failed to reserve remotely" << dendl;
1602 }
1603 }
1604
1605 dout(10) << __func__ << " reserved? " << (granted ? "yes" : "no") << dendl;
1606
1607 Message* reply = new MOSDScrubReserve(
1608 spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), request_ep,
1609 granted ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT, m_pg_whoami);
1610
1611 m_osds->send_message_osd_cluster(reply, op->get_req()->get_connection());
1612 }
1613
1614 void PgScrubber::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
1615 {
1616 dout(10) << __func__ << " " << *op->get_req() << dendl;
1617 op->mark_started();
1618
1619 if (m_reservations.has_value()) {
1620 m_reservations->handle_reserve_grant(op, from);
1621 } else {
1622 derr << __func__ << ": received unsolicited reservation grant from osd " << from
1623 << " (" << op << ")" << dendl;
1624 }
1625 }
1626
1627 void PgScrubber::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
1628 {
1629 dout(10) << __func__ << " " << *op->get_req() << dendl;
1630 op->mark_started();
1631
1632 if (m_reservations.has_value()) {
1633 // there is an active reservation process. No action is required otherwise.
1634 m_reservations->handle_reserve_reject(op, from);
1635 }
1636 }
1637
1638 void PgScrubber::handle_scrub_reserve_release(OpRequestRef op)
1639 {
1640 dout(10) << __func__ << " " << *op->get_req() << dendl;
1641 op->mark_started();
1642
1643 /*
1644 * this specific scrub session has terminated. All incoming events carrying the old
1645 * tag will be discarded.
1646 */
1647 advance_token();
1648 m_remote_osd_resource.reset();
1649 }
1650
1651 void PgScrubber::discard_replica_reservations()
1652 {
1653 dout(10) << __func__ << dendl;
1654 if (m_reservations.has_value()) {
1655 m_reservations->discard_all();
1656 }
1657 }
1658
1659 void PgScrubber::clear_scrub_reservations()
1660 {
1661 dout(10) << __func__ << dendl;
1662 m_reservations.reset(); // the remote reservations
1663 m_local_osd_resource.reset(); // the local reservation
1664 m_remote_osd_resource.reset(); // we as replica reserved for a Primary
1665 }
1666
1667 void PgScrubber::message_all_replicas(int32_t opcode, std::string_view op_text)
1668 {
1669 ceph_assert(m_pg->recovery_state.get_backfill_targets().empty());
1670
1671 std::vector<pair<int, Message*>> messages;
1672 messages.reserve(m_pg->get_actingset().size());
1673
1674 epoch_t epoch = get_osdmap_epoch();
1675
1676 for (auto& p : m_pg->get_actingset()) {
1677
1678 if (p == m_pg_whoami)
1679 continue;
1680
1681 dout(10) << "scrub requesting " << op_text << " from osd." << p << " Epoch: " << epch
1682 << dendl;
1683 Message* m = new MOSDScrubReserve(spg_t(m_pg->info.pgid.pgid, p.shard), epoch, opcode,
1684 m_pg_whoami);
1685 messages.push_back(std::make_pair(p.osd, m));
1686 }
1687
1688 if (!messages.empty()) {
1689 m_osds->send_message_osd_cluster(messages, epoch);
1690 }
1691 }
1692
1693 void PgScrubber::unreserve_replicas()
1694 {
1695 dout(10) << __func__ << dendl;
1696 m_reservations.reset();
1697 }
1698
1699 void PgScrubber::set_reserving_now()
1700 {
1701 m_osds->get_scrub_services().set_reserving_now();
1702 }
1703
1704 void PgScrubber::clear_reserving_now()
1705 {
1706 m_osds->get_scrub_services().clear_reserving_now();
1707 }
1708
1709 void PgScrubber::set_queued_or_active()
1710 {
1711 m_queued_or_active = true;
1712 }
1713
1714 void PgScrubber::clear_queued_or_active()
1715 {
1716 if (m_queued_or_active) {
1717 m_queued_or_active = false;
1718 // and just in case snap trimming was blocked by the aborted scrub
1719 m_pg->snap_trimmer_scrub_complete();
1720 }
1721 }
1722
1723 bool PgScrubber::is_queued_or_active() const
1724 {
1725 return m_queued_or_active;
1726 }
1727
1728 [[nodiscard]] bool PgScrubber::scrub_process_inconsistent()
1729 {
1730 dout(10) << __func__ << ": checking authoritative (mode="
1731 << m_mode_desc << ", auth remaining #: " << m_authoritative.size()
1732 << ")" << dendl;
1733
1734 // m_authoritative only stores objects that are missing or inconsistent.
1735 if (!m_authoritative.empty()) {
1736
1737 stringstream ss;
1738 ss << m_pg->info.pgid << " " << m_mode_desc << " " << m_missing.size() << " missing, "
1739 << m_inconsistent.size() << " inconsistent objects";
1740 dout(2) << ss.str() << dendl;
1741 m_osds->clog->error(ss);
1742
1743 if (m_is_repair) {
1744 state_clear(PG_STATE_CLEAN);
1745 // we know we have a problem, so it's OK to set the user-visible flag
1746 // even if we only reached here via auto-repair
1747 state_set(PG_STATE_REPAIR);
1748 update_op_mode_text();
1749
1750 for (const auto& [hobj, shard_list] : m_authoritative) {
1751
1752 auto missing_entry = m_missing.find(hobj);
1753
1754 if (missing_entry != m_missing.end()) {
1755 m_pg->repair_object(hobj, shard_list, missing_entry->second);
1756 m_fixed_count += missing_entry->second.size();
1757 }
1758
1759 if (m_inconsistent.count(hobj)) {
1760 m_pg->repair_object(hobj, shard_list, m_inconsistent[hobj]);
1761 m_fixed_count += m_inconsistent[hobj].size();
1762 }
1763 }
1764 }
1765 }
1766 return (!m_authoritative.empty() && m_is_repair);
1767 }
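// The accounting above sums, for every authoritative object, the shards listed
// as missing plus those listed as inconsistent. A compact restatement of that
// accumulation (a std-only sketch with stand-in types, not the actual Ceph
// structures):
//
//   #include <cstddef>
//   #include <map>
//   #include <set>
//   #include <string>
//
//   using Obj = std::string;       // stands in for hobject_t
//   using Shards = std::set<int>;  // stands in for the per-object shard lists
//
//   std::size_t count_fixes(const std::map<Obj, Shards>& authoritative,
//                           const std::map<Obj, Shards>& missing,
//                           const std::map<Obj, Shards>& inconsistent) {
//     std::size_t fixed = 0;
//     for (const auto& [obj, shards] : authoritative) {
//       (void)shards;  // the authoritative shard list itself is not counted
//       if (auto m = missing.find(obj); m != missing.end())
//         fixed += m->second.size();
//       if (auto i = inconsistent.find(obj); i != inconsistent.end())
//         fixed += i->second.size();
//     }
//     return fixed;
//   }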
1768
1769 /*
1770 * note: only called for the Primary.
1771 */
1772 void PgScrubber::scrub_finish()
1773 {
1774 dout(10) << __func__ << " before flags: " << m_flags
1775 << ". repair state: " << (state_test(PG_STATE_REPAIR) ? "repair" : "no-repair")
1776 << ". deep_scrub_on_error: " << m_flags.deep_scrub_on_error << dendl;
1777
1778 ceph_assert(m_pg->is_locked());
1779 ceph_assert(is_queued_or_active());
1780
1781 m_pg->m_planned_scrub = requested_scrub_t{};
1782
1783 // if the repair request came from auto-repair and the number of errors is
1784 // above the limit, cancel the auto-repair
1785 if (m_is_repair && m_flags.auto_repair &&
1786 m_authoritative.size() > m_pg->cct->_conf->osd_scrub_auto_repair_num_errors) {
1787
1788 dout(10) << __func__ << " undoing the repair" << dendl;
1789 state_clear(PG_STATE_REPAIR); // not expected to be set, anyway
1790 m_is_repair = false;
1791 update_op_mode_text();
1792 }
1793
1794 bool do_auto_scrub = false;
1795
1796 // if a regular scrub had errors within the limit, do a deep scrub to auto repair
1797 if (m_flags.deep_scrub_on_error && !m_authoritative.empty() &&
1798 m_authoritative.size() <= m_pg->cct->_conf->osd_scrub_auto_repair_num_errors) {
1799 ceph_assert(!m_is_deep);
1800 do_auto_scrub = true;
1801 dout(15) << __func__ << " Try to auto repair after scrub errors" << dendl;
1802 }
1803
1804 m_flags.deep_scrub_on_error = false;
1805
1806 // type-specific finish (can tally more errors)
1807 _scrub_finish();
1808
1809 bool has_error = scrub_process_inconsistent();
1810
1811 {
1812 stringstream oss;
1813 oss << m_pg->info.pgid.pgid << " " << m_mode_desc << " ";
1814 int total_errors = m_shallow_errors + m_deep_errors;
1815 if (total_errors)
1816 oss << total_errors << " errors";
1817 else
1818 oss << "ok";
1819 if (!m_is_deep && m_pg->info.stats.stats.sum.num_deep_scrub_errors)
1820 oss << " ( " << m_pg->info.stats.stats.sum.num_deep_scrub_errors
1821 << " remaining deep scrub error details lost)";
1822 if (m_is_repair)
1823 oss << ", " << m_fixed_count << " fixed";
1824 if (total_errors)
1825 m_osds->clog->error(oss);
1826 else
1827 m_osds->clog->debug(oss);
1828 }
1829
1830 // Since we don't know which errors were fixed, we can only clear them
1831 // when every one has been fixed.
1832 if (m_is_repair) {
1833 if (m_fixed_count == m_shallow_errors + m_deep_errors) {
1834
1835 ceph_assert(m_is_deep);
1836 m_shallow_errors = 0;
1837 m_deep_errors = 0;
1838 dout(20) << __func__ << " All may be fixed" << dendl;
1839
1840 } else if (has_error) {
1841
1842 // Deep scrub in order to get corrected error counts
1843 m_pg->scrub_after_recovery = true;
1844 m_pg->m_planned_scrub.req_scrub =
1845 m_pg->m_planned_scrub.req_scrub || m_flags.required;
1846
1847 dout(20) << __func__ << " Current 'required': " << m_flags.required
1848 << " Planned 'req_scrub': " << m_pg->m_planned_scrub.req_scrub << dendl;
1849
1850 } else if (m_shallow_errors || m_deep_errors) {
1851
1852 // We have errors but nothing can be fixed, so there is no repair
1853 // possible.
1854 state_set(PG_STATE_FAILED_REPAIR);
1855 dout(10) << __func__ << " " << (m_shallow_errors + m_deep_errors)
1856 << " error(s) present with no repair possible" << dendl;
1857 }
1858 }
1859
1860 {
1861 // finish up
1862 ObjectStore::Transaction t;
1863 m_pg->recovery_state.update_stats(
1864 [this](auto& history, auto& stats) {
1865 dout(10) << "m_pg->recovery_state.update_stats()" << dendl;
1866 utime_t now = ceph_clock_now();
1867 history.last_scrub = m_pg->recovery_state.get_info().last_update;
1868 history.last_scrub_stamp = now;
1869 if (m_is_deep) {
1870 history.last_deep_scrub = m_pg->recovery_state.get_info().last_update;
1871 history.last_deep_scrub_stamp = now;
1872 }
1873
1874 if (m_is_deep) {
1875 if ((m_shallow_errors == 0) && (m_deep_errors == 0))
1876 history.last_clean_scrub_stamp = now;
1877 stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors;
1878 stats.stats.sum.num_deep_scrub_errors = m_deep_errors;
1879 stats.stats.sum.num_large_omap_objects = m_omap_stats.large_omap_objects;
1880 stats.stats.sum.num_omap_bytes = m_omap_stats.omap_bytes;
1881 stats.stats.sum.num_omap_keys = m_omap_stats.omap_keys;
1882 dout(25) << "scrub_finish shard " << m_pg_whoami
1883 << " num_omap_bytes = " << stats.stats.sum.num_omap_bytes
1884 << " num_omap_keys = " << stats.stats.sum.num_omap_keys << dendl;
1885 } else {
1886 stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors;
1887 // XXX: a fresh last_clean_scrub_stamp does not imply that the PG is
1888 // consistent, as deep-scrub errors may still be present
1889 if (m_shallow_errors == 0)
1890 history.last_clean_scrub_stamp = now;
1891 }
1892 stats.stats.sum.num_scrub_errors = stats.stats.sum.num_shallow_scrub_errors +
1893 stats.stats.sum.num_deep_scrub_errors;
1894 if (m_flags.check_repair) {
1895 m_flags.check_repair = false;
1896 if (m_pg->info.stats.stats.sum.num_scrub_errors) {
1897 state_set(PG_STATE_FAILED_REPAIR);
1898 dout(10) << "scrub_finish " << m_pg->info.stats.stats.sum.num_scrub_errors
1899 << " error(s) still present after re-scrub" << dendl;
1900 }
1901 }
1902 return true;
1903 },
1904 &t);
1905 int tr = m_osds->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
1906 ceph_assert(tr == 0);
1907 }
1908
1909 if (has_error) {
1910 m_pg->queue_peering_event(PGPeeringEventRef(std::make_shared<PGPeeringEvent>(
1911 get_osdmap_epoch(), get_osdmap_epoch(), PeeringState::DoRecovery())));
1912 } else {
1913 m_is_repair = false;
1914 state_clear(PG_STATE_REPAIR);
1915 update_op_mode_text();
1916 }
1917
1918 cleanup_on_finish();
1919 if (do_auto_scrub) {
1920 request_rescrubbing(m_pg->m_planned_scrub);
1921 }
1922
1923 if (m_pg->is_active() && m_pg->is_primary()) {
1924 m_pg->recovery_state.share_pg_info();
1925 }
1926 }
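// The repair bookkeeping in scrub_finish() ends in a three-way verdict: all
// errors fixed - clear the counters; some fixed but errors remain - deep-scrub
// again after recovery to re-count; nothing fixable - mark the PG as
// failed-repair. Restated as a sketch (hypothetical helper, not Ceph code):
//
//   enum class Verdict { all_fixed, recount_after_recovery, failed_repair, none };
//
//   inline Verdict repair_verdict(int fixed, int shallow, int deep, bool has_error) {
//     if (fixed == shallow + deep) return Verdict::all_fixed;
//     if (has_error) return Verdict::recount_after_recovery;
//     if (shallow || deep) return Verdict::failed_repair;
//     return Verdict::none;
//   }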
1927
1928 void PgScrubber::on_digest_updates()
1929 {
1930 dout(10) << __func__ << " #pending: " << num_digest_updates_pending
1931 << (m_end.is_max() ? " <last chunk>" : " <mid chunk>")
1932 << (is_queued_or_active() ? "" : " ** not marked as scrubbing **")
1933 << dendl;
1934
1935 if (num_digest_updates_pending > 0) {
1936 // do nothing for now. We will be called again when new updates arrive
1937 return;
1938 }
1939
1940 // got all updates, and finished with this chunk. Any more?
1941 if (m_end.is_max()) {
1942 m_osds->queue_scrub_is_finished(m_pg);
1943 } else {
1944 // go get a new chunk (via "requeue")
1945 preemption_data.reset();
1946 m_osds->queue_scrub_next_chunk(m_pg, m_pg->is_scrub_blocking_ops());
1947 }
1948 }
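// on_digest_updates() is the per-chunk join point: nothing happens until the
// last pending digest update lands; then the scrub either finishes (the chunk
// reached the end of the hash space) or requeues for the next chunk. The
// control flow, as a tiny sketch (hypothetical names):
//
//   enum class Next { wait, finished, next_chunk };
//
//   inline Next after_digests(int pending, bool at_last_chunk) {
//     if (pending > 0) return Next::wait;
//     return at_last_chunk ? Next::finished : Next::next_chunk;
//   }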
1949
1950 /*
1951 * note that the set of flags fetched from the PG (m_pg->m_planned_scrub)
1952 * is cleared once scrubbing starts; some of the values dumped here are
1953 * thus transitory.
1954 */
1955 void PgScrubber::dump_scrubber(ceph::Formatter* f,
1956 const requested_scrub_t& request_flags) const
1957 {
1958 f->open_object_section("scrubber");
1959
1960 if (m_active) { // TBD replace with PR#42780's test
1961 f->dump_bool("active", true);
1962 dump_active_scrubber(f, state_test(PG_STATE_DEEP_SCRUB));
1963 } else {
1964 f->dump_bool("active", false);
1965 f->dump_bool("must_scrub",
1966 (m_pg->m_planned_scrub.must_scrub || m_flags.required));
1967 f->dump_bool("must_deep_scrub", request_flags.must_deep_scrub);
1968 f->dump_bool("must_repair", request_flags.must_repair);
1969 f->dump_bool("need_auto", request_flags.need_auto);
1970
1971 f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time();
1972
1973 // note that we are repeating logic that is coded elsewhere (currently PG.cc).
1974 // This is not optimal.
1975 bool deep_expected = (ceph_clock_now() >= m_pg->next_deepscrub_interval()) ||
1976 request_flags.must_deep_scrub || request_flags.need_auto;
1977 auto sched_state =
1978 m_scrub_job->scheduling_state(ceph_clock_now(), deep_expected);
1979 f->dump_string("schedule", sched_state);
1980 }
1981
1982 if (m_publish_sessions) {
1983 f->dump_int("test_sequence",
1984 m_sessions_counter); // an ever-increasing number used by tests
1985 }
1986
1987 f->close_section();
1988 }
1989
1990 void PgScrubber::dump_active_scrubber(ceph::Formatter* f, bool is_deep) const
1991 {
1992 f->dump_stream("epoch_start") << m_interval_start;
1993 f->dump_stream("start") << m_start;
1994 f->dump_stream("end") << m_end;
1995 f->dump_stream("max_end") << m_max_end;
1996 f->dump_stream("subset_last_update") << m_subset_last_update;
1997 // note that m_is_deep will be set some time after PG_STATE_DEEP_SCRUB is
1998 // asserted. Thus, using the latter.
1999 f->dump_bool("deep", is_deep);
2000
2001 // dump the scrub-type flags
2002 f->dump_bool("req_scrub", m_flags.required);
2003 f->dump_bool("auto_repair", m_flags.auto_repair);
2004 f->dump_bool("check_repair", m_flags.check_repair);
2005 f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error);
2006 f->dump_unsigned("priority", m_flags.priority);
2007
2008 f->dump_int("shallow_errors", m_shallow_errors);
2009 f->dump_int("deep_errors", m_deep_errors);
2010 f->dump_int("fixed", m_fixed_count);
2011 {
2012 f->open_array_section("waiting_on_whom");
2013 for (const auto& p : m_maps_status.get_awaited()) {
2014 f->dump_stream("shard") << p;
2015 }
2016 f->close_section();
2017 }
2018 f->dump_string("schedule", "scrubbing");
2019 }
2020
2021 pg_scrubbing_status_t PgScrubber::get_schedule() const
2022 {
2023 dout(25) << __func__ << dendl;
2024
2025 if (!m_scrub_job) {
2026 return pg_scrubbing_status_t{};
2027 }
2028
2029 auto now_is = ceph_clock_now();
2030
2031 if (m_active) {
2032 // report current scrub info, including updated duration
2033 int32_t duration = (utime_t{now_is} - scrub_begin_stamp).sec();
2034
2035 return pg_scrubbing_status_t{
2036 utime_t{},
2037 duration,
2038 pg_scrub_sched_status_t::active,
2039 true, // active
2040 (m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow),
2041 false /* is periodic? unknown, actually */};
2042 }
2043 if (m_scrub_job->state != ScrubQueue::qu_state_t::registered) {
2044 return pg_scrubbing_status_t{utime_t{},
2045 0,
2046 pg_scrub_sched_status_t::not_queued,
2047 false,
2048 scrub_level_t::shallow,
2049 false};
2050 }
2051
2052 // Will the next scrub surely be a deep one? Note that a deep scrub might be
2053 // selected even if we report a regular scrub here.
2054 bool deep_expected = (now_is >= m_pg->next_deepscrub_interval()) ||
2055 m_pg->m_planned_scrub.must_deep_scrub ||
2056 m_pg->m_planned_scrub.need_auto;
2057 scrub_level_t expected_level =
2058 deep_expected ? scrub_level_t::deep : scrub_level_t::shallow;
2059 bool periodic = !m_pg->m_planned_scrub.must_scrub &&
2060 !m_pg->m_planned_scrub.need_auto &&
2061 !m_pg->m_planned_scrub.must_deep_scrub;
2062
2063 // are we ripe for scrubbing?
2064 if (now_is > m_scrub_job->schedule.scheduled_at) {
2065 // we are waiting for our turn at the OSD.
2066 return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at,
2067 0,
2068 pg_scrub_sched_status_t::queued,
2069 false,
2070 expected_level,
2071 periodic};
2072 }
2073
2074 return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at,
2075 0,
2076 pg_scrub_sched_status_t::scheduled,
2077 false,
2078 expected_level,
2079 periodic};
2080 }
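// get_schedule() derives the externally-visible status in a fixed order:
// actively scrubbing beats everything; an unregistered job is 'not queued';
// otherwise a job whose scheduled time has passed is 'queued' (waiting for its
// turn at the OSD), else merely 'scheduled'. A std-only sketch of that
// precedence (names are illustrative):
//
//   enum class SchedStatus { active, not_queued, queued, scheduled };
//
//   inline SchedStatus derive_status(bool active, bool registered,
//                                    double now, double scheduled_at) {
//     if (active) return SchedStatus::active;
//     if (!registered) return SchedStatus::not_queued;
//     return (now > scheduled_at) ? SchedStatus::queued : SchedStatus::scheduled;
//   }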
2081
2082 void PgScrubber::handle_query_state(ceph::Formatter* f)
2083 {
2084 dout(15) << __func__ << dendl;
2085
2086 f->open_object_section("scrub");
2087 f->dump_stream("scrubber.epoch_start") << m_interval_start;
2088 f->dump_bool("scrubber.active", m_active);
2089 f->dump_stream("scrubber.start") << m_start;
2090 f->dump_stream("scrubber.end") << m_end;
2091 f->dump_stream("scrubber.max_end") << m_max_end;
2092 f->dump_stream("scrubber.subset_last_update") << m_subset_last_update;
2093 f->dump_bool("scrubber.deep", m_is_deep);
2094 {
2095 f->open_array_section("scrubber.waiting_on_whom");
2096 for (const auto& p : m_maps_status.get_awaited()) {
2097 f->dump_stream("shard") << p;
2098 }
2099 f->close_section();
2100 }
2101
2102 f->dump_string("comment", "DEPRECATED - may be removed in the next release");
2103
2104 f->close_section();
2105 }
2106
2107 PgScrubber::~PgScrubber()
2108 {
2109 if (m_scrub_job) {
2110 // make sure the OSD won't try to scrub this one just now
2111 rm_from_osd_scrubbing();
2112 m_scrub_job.reset();
2113 }
2114 }
2115
2116 PgScrubber::PgScrubber(PG* pg)
2117 : m_pg{pg}
2118 , m_pg_id{pg->pg_id}
2119 , m_osds{m_pg->osd}
2120 , m_pg_whoami{pg->pg_whoami}
2121 , preemption_data{pg}
2122 {
2123 m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
2124 m_fsm->initiate();
2125
2126 m_scrub_job = ceph::make_ref<ScrubQueue::ScrubJob>(m_osds->cct, m_pg->pg_id,
2127 m_osds->get_nodeid());
2128 }
2129
2130 void PgScrubber::set_scrub_begin_time()
2131 {
2132 scrub_begin_stamp = ceph_clock_now();
2133 }
2134
2135 void PgScrubber::set_scrub_duration()
2136 {
2137 utime_t stamp = ceph_clock_now();
2138 utime_t duration = stamp - scrub_begin_stamp;
2139 m_pg->recovery_state.update_stats([=](auto& history, auto& stats) {
2140 stats.last_scrub_duration = ceill(duration.to_msec()/1000.0);
2141 stats.scrub_duration = double(duration);
2142 return true;
2143 });
2144 }
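// last_scrub_duration is the elapsed time rounded up to whole seconds (so a
// 2500 ms scrub is reported as 3 s, and any non-zero duration as at least 1 s),
// while scrub_duration keeps the exact floating-point value. The rounding, in
// isolation (an assumed-equivalent, std-only sketch):
//
//   #include <cmath>
//
//   inline long long whole_seconds(long long msec) {
//     return static_cast<long long>(std::ceil(msec / 1000.0));  // 2500 -> 3
//   }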
2145
2146 void PgScrubber::reserve_replicas()
2147 {
2148 dout(10) << __func__ << dendl;
2149 m_reservations.emplace(m_pg, m_pg_whoami, m_scrub_job);
2150 }
2151
2152 void PgScrubber::cleanup_on_finish()
2153 {
2154 dout(10) << __func__ << dendl;
2155 ceph_assert(m_pg->is_locked());
2156
2157 state_clear(PG_STATE_SCRUBBING);
2158 state_clear(PG_STATE_DEEP_SCRUB);
2159 m_pg->publish_stats_to_osd();
2160
2161 clear_scrub_reservations();
2162 m_pg->publish_stats_to_osd();
2163
2164 requeue_waiting();
2165
2166 reset_internal_state();
2167 m_pg->publish_stats_to_osd();
2168 m_flags = scrub_flags_t{};
2169
2170 // type-specific state clear
2171 _scrub_clear_state();
2172 }
2173
2174 // uses process_event(), so must be invoked externally
2175 void PgScrubber::scrub_clear_state()
2176 {
2177 dout(10) << __func__ << dendl;
2178
2179 clear_pgscrub_state();
2180 m_fsm->process_event(FullReset{});
2181 }
2182
2183 /*
2184 * note: does not access the state-machine
2185 */
2186 void PgScrubber::clear_pgscrub_state()
2187 {
2188 dout(10) << __func__ << dendl;
2189 ceph_assert(m_pg->is_locked());
2190
2191 state_clear(PG_STATE_SCRUBBING);
2192 state_clear(PG_STATE_DEEP_SCRUB);
2193
2194 state_clear(PG_STATE_REPAIR);
2195
2196 clear_scrub_reservations();
2197 m_pg->publish_stats_to_osd();
2198
2199 requeue_waiting();
2200
2201 reset_internal_state();
2202 m_flags = scrub_flags_t{};
2203
2204 // type-specific state clear
2205 _scrub_clear_state();
2206 m_pg->publish_stats_to_osd();
2207 }
2208
2209 void PgScrubber::replica_handling_done()
2210 {
2211 dout(10) << __func__ << dendl;
2212
2213 state_clear(PG_STATE_SCRUBBING);
2214 state_clear(PG_STATE_DEEP_SCRUB);
2215
2216 reset_internal_state();
2217 }
2218
2219 /*
2220 * note: performs run_callbacks()
2221 * note: reservations-related variables are not reset here
2222 */
2223 void PgScrubber::reset_internal_state()
2224 {
2225 dout(10) << __func__ << dendl;
2226
2227 preemption_data.reset();
2228 m_maps_status.reset();
2229 m_received_maps.clear();
2230
2231 m_start = hobject_t{};
2232 m_end = hobject_t{};
2233 m_max_end = hobject_t{};
2234 m_subset_last_update = eversion_t{};
2235 m_shallow_errors = 0;
2236 m_deep_errors = 0;
2237 m_fixed_count = 0;
2238 m_omap_stats = omap_stat_t{};
2239
2240 run_callbacks();
2241
2242 m_inconsistent.clear();
2243 m_missing.clear();
2244 m_authoritative.clear();
2245 num_digest_updates_pending = 0;
2246 m_primary_scrubmap = ScrubMap{};
2247 m_primary_scrubmap_pos.reset();
2248 replica_scrubmap = ScrubMap{};
2249 replica_scrubmap_pos.reset();
2250 m_cleaned_meta_map = ScrubMap{};
2251 m_needs_sleep = true;
2252 m_sleep_started_at = utime_t{};
2253
2254 m_active = false;
2255 clear_queued_or_active();
2256 ++m_sessions_counter;
2257 }
2258
2259 // note: this is only applicable to a replica:
2260 void PgScrubber::advance_token()
2261 {
2262 dout(10) << __func__ << " was: " << m_current_token << dendl;
2263 m_current_token++;
2264
2265 // advance_token() is called on the assumption that no scrubbing is in progress.
2266 // We verify that nonetheless: if we are in fact still handling a stale request,
2267 // both our internal state and the FSM state are cleared.
2268 replica_handling_done();
2269 m_fsm->process_event(FullReset{});
2270 }
2271
2272 bool PgScrubber::is_token_current(Scrub::act_token_t received_token)
2273 {
2274 if (received_token == 0 || received_token == m_current_token) {
2275 return true;
2276 }
2277 dout(5) << __func__ << " obsolete token (" << received_token
2278 << " vs current " << m_current_token << ")" << dendl;
2279
2280 return false;
2281 }
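// The token pair above (advance_token() / is_token_current()) is a classic
// generation counter: bumping the counter invalidates every event stamped with
// an older value, while a zero token (an untagged event) is always accepted.
// In miniature (std-only sketch, not Ceph API):
//
//   #include <cstdint>
//
//   struct TokenGate {
//     std::uint64_t current = 0;
//
//     std::uint64_t advance() { return ++current; }  // invalidates older events
//
//     bool is_current(std::uint64_t seen) const {
//       return seen == 0 || seen == current;  // 0 == 'not token-tagged'
//     }
//   };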
2282
2283 const OSDMapRef& PgScrubber::get_osdmap() const
2284 {
2285 return m_pg->get_osdmap();
2286 }
2287
2288 ostream& operator<<(ostream& out, const PgScrubber& scrubber)
2289 {
2290 return out << scrubber.m_flags;
2291 }
2292
2293 std::ostream& PgScrubber::gen_prefix(std::ostream& out) const
2294 {
2295 const auto fsm_state = m_fsm ? m_fsm->current_states_desc() : "- :";
2296 if (m_pg) {
2297 return m_pg->gen_prefix(out) << "scrubber " << fsm_state << ": ";
2298 } else {
2299 return out << " scrubber [~] " << fsm_state << ": ";
2300 }
2301 }
2302
2303 void PgScrubber::log_cluster_warning(const std::string& warning) const
2304 {
2305 m_osds->clog->do_log(CLOG_WARN, warning);
2306 }
2307
2308 ostream& PgScrubber::show(ostream& out) const
2309 {
2310 return out << " [ " << m_pg_id << ": " << m_flags << " ] ";
2311 }
2312
2313 int PgScrubber::asok_debug(std::string_view cmd,
2314 std::string param,
2315 Formatter* f,
2316 stringstream& ss)
2317 {
2318 dout(10) << __func__ << " cmd: " << cmd << " param: " << param << dendl;
2319
2320 if (cmd == "block") {
2321 // set a flag that will cause the next 'select_range' to report a blocked object
2322 m_debug_blockrange = 1;
2323
2324 } else if (cmd == "unblock") {
2325 // send an 'unblock' event, as if a blocked range was freed
2326 m_debug_blockrange = 0;
2327 m_fsm->process_event(Unblocked{});
2328
2329 } else if ((cmd == "set") || (cmd == "unset")) {
2330
2331 if (param == "sessions") {
2332 // set/reset the inclusion of the scrub sessions counter in 'query' output
2333 m_publish_sessions = (cmd == "set");
2334
2335 } else if (param == "block") {
2336 if (cmd == "set") {
2337 // set a flag that will cause the next 'select_range' to report a blocked object
2338 m_debug_blockrange = 1;
2339 } else {
2340 // send an 'unblock' event, as if a blocked range was freed
2341 m_debug_blockrange = 0;
2342 m_fsm->process_event(Unblocked{});
2343 }
2344 }
2345 }
2346
2347 return 0;
2348 }
2349 // ///////////////////// preemption_data_t //////////////////////////////////
2350
2351 PgScrubber::preemption_data_t::preemption_data_t(PG* pg) : m_pg{pg}
2352 {
2353 m_left = static_cast<int>(
2354 m_pg->get_cct()->_conf.get_val<uint64_t>("osd_scrub_max_preemptions"));
2355 }
2356
2357 void PgScrubber::preemption_data_t::reset()
2358 {
2359 std::lock_guard<std::mutex> lk{m_preemption_lock};
2360
2361 m_preemptable = false;
2362 m_preempted = false;
2363 m_left =
2364 static_cast<int>(m_pg->cct->_conf.get_val<uint64_t>("osd_scrub_max_preemptions"));
2365 m_size_divisor = 1;
2366 }
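// preemption_data_t is, at heart, a mutex-guarded preemption budget: each
// successful preemption decrements m_left, and once the budget is spent the
// chunk is no longer preemptable. The core of that idea as a self-contained
// sketch (hypothetical class, std-only):
//
//   #include <mutex>
//
//   class PreemptBudget {
//     std::mutex lk;
//     int left;
//
//    public:
//     explicit PreemptBudget(int max_preemptions) : left{max_preemptions} {}
//
//     bool try_preempt() {  // false once the budget is exhausted
//       std::lock_guard g{lk};
//       if (left <= 0) return false;
//       --left;
//       return true;
//     }
//   };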
2367
2368
2369 // ///////////////////// ReplicaReservations //////////////////////////////////
2370 namespace Scrub {
2371
2372 void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch)
2373 {
2374 auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, peer.shard), epoch,
2375 MOSDScrubReserve::RELEASE, m_pg->pg_whoami);
2376 m_osds->send_message_osd_cluster(peer.osd, m, epoch);
2377 }
2378
2379 ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob)
2380 : m_pg{pg}
2381 , m_acting_set{pg->get_actingset()}
2382 , m_osds{m_pg->get_pg_osd(ScrubberPasskey())}
2383 , m_pending{static_cast<int>(m_acting_set.size()) - 1}
2384 , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())}
2385 , m_scrub_job{scrubjob}
2386 {
2387 epoch_t epoch = m_pg->get_osdmap_epoch();
2388
2389 {
2390 std::stringstream prefix;
2391 prefix << "osd." << m_osds->whoami << " ep: " << epoch
2392 << " scrubber::ReplicaReservations pg[" << pg->pg_id << "]: ";
2393 m_log_msg_prefix = prefix.str();
2394 }
2395
2396 // handle the special case of no replicas
2397 if (m_pending <= 0) {
2398 // just signal the scrub state-machine to continue
2399 send_all_done();
2400
2401 } else {
2402
2403 for (auto p : m_acting_set) {
2404 if (p == whoami)
2405 continue;
2406 auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, p.shard), epoch,
2407 MOSDScrubReserve::REQUEST, m_pg->pg_whoami);
2408 m_osds->send_message_osd_cluster(p.osd, m, epoch);
2409 m_waited_for_peers.push_back(p);
2410 dout(10) << __func__ << ": reserve " << p.osd << dendl;
2411 }
2412 }
2413 }
2414
2415 void ReplicaReservations::send_all_done()
2416 {
2417 m_osds->queue_for_scrub_granted(m_pg, scrub_prio_t::low_priority);
2418 }
2419
2420 void ReplicaReservations::send_reject()
2421 {
2422 m_scrub_job->resources_failure = true;
2423 m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
2424 }
2425
2426 void ReplicaReservations::discard_all()
2427 {
2428 dout(10) << __func__ << ": " << m_reserved_peers << dendl;
2429
2430 m_had_rejections = true; // preventing late-coming responses from triggering events
2431 m_reserved_peers.clear();
2432 m_waited_for_peers.clear();
2433 }
2434
2435 ReplicaReservations::~ReplicaReservations()
2436 {
2437 m_had_rejections = true; // preventing late-coming responses from triggering events
2438
2439 // send un-reserve messages to all reserved replicas. We do not wait for an answer
2440 // (there will not be one). Other incoming messages will be discarded on the way,
2441 // by our owner.
2442 epoch_t epoch = m_pg->get_osdmap_epoch();
2443
2444 for (auto& p : m_reserved_peers) {
2445 release_replica(p, epoch);
2446 }
2447 m_reserved_peers.clear();
2448
2449 // note: the release follows on the heels of the request. When we tried otherwise,
2450 // grants that followed a reject arrived after the whole scrub machine state had
2451 // been reset, causing leaked reservations.
2452 for (auto& p : m_waited_for_peers) {
2453 release_replica(p, epoch);
2454 }
2455 m_waited_for_peers.clear();
2456 }
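// The destructor above is the RAII half of the reservation protocol: releases
// are sent both to peers that granted and to peers still awaited, since a grant
// may cross the release on the wire. The pattern in isolation (std-only sketch
// with a hypothetical callback):
//
//   #include <functional>
//   #include <vector>
//
//   struct PeerReleaser {
//     std::vector<int> granted;             // peers that acked our reservation
//     std::vector<int> pending;             // peers we are still waiting on
//     std::function<void(int)> release;     // sends one release message
//
//     ~PeerReleaser() {
//       for (int p : granted) release(p);
//       for (int p : pending) release(p);   // a grant may still be in flight
//     }
//   };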
2457
2458 /**
2459 * @ATTN we would not reach here if the ReplicaReservations object managed by the
2460 * scrubber had been reset.
2461 */
2462 void ReplicaReservations::handle_reserve_grant(OpRequestRef op, pg_shard_t from)
2463 {
2464 dout(10) << __func__ << ": granted by " << from << dendl;
2465 op->mark_started();
2466
2467 {
2468 // reduce the number of extra release messages. Not a must, but the log is cleaner
2469 auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
2470 if (w != m_waited_for_peers.end())
2471 m_waited_for_peers.erase(w);
2472 }
2473
2474 // are we forced to reject the reservation?
2475 if (m_had_rejections) {
2476
2477 dout(10) << __func__ << ": rejecting late-coming reservation from "
2478 << from << dendl;
2479 release_replica(from, m_pg->get_osdmap_epoch());
2480
2481 } else if (std::find(m_reserved_peers.begin(), m_reserved_peers.end(), from) !=
2482 m_reserved_peers.end()) {
2483
2484 dout(10) << __func__ << ": already had osd." << from << " reserved" << dendl;
2485
2486 } else {
2487
2488 dout(10) << __func__ << ": osd." << from << " scrub reserve = success"
2489 << dendl;
2490 m_reserved_peers.push_back(from);
2491 if (--m_pending == 0) {
2492 send_all_done();
2493 }
2494 }
2495 }
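// Grant handling is a countdown with a duplicate guard: a peer is counted at
// most once, and the 'all reserved' event fires exactly when the pending count
// reaches zero. The same logic as a free function (std-only sketch):
//
//   #include <algorithm>
//   #include <vector>
//
//   // returns true exactly once: when the last expected grant arrives
//   inline bool note_grant(std::vector<int>& reserved, int from, int& pending) {
//     if (std::find(reserved.begin(), reserved.end(), from) != reserved.end())
//       return false;  // duplicate grant - already counted
//     reserved.push_back(from);
//     return --pending == 0;
//   }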
2496
2497 void ReplicaReservations::handle_reserve_reject(OpRequestRef op, pg_shard_t from)
2498 {
2499 dout(10) << __func__ << ": rejected by " << from << dendl;
2500 dout(15) << __func__ << ": " << *op->get_req() << dendl;
2501 op->mark_started();
2502
2503 {
2504 // reduce the number of extra release messages. Not a must, but the log is cleaner
2505 auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
2506 if (w != m_waited_for_peers.end())
2507 m_waited_for_peers.erase(w);
2508 }
2509
2510 if (m_had_rejections) {
2511
2512 // our failure was already handled when the first rejection arrived
2513 dout(15) << __func__ << ": ignoring late-coming rejection from "
2514 << from << dendl;
2515
2516 } else if (std::find(m_reserved_peers.begin(), m_reserved_peers.end(), from) !=
2517 m_reserved_peers.end()) {
2518
2519 dout(10) << __func__ << ": already had osd." << from << " reserved" << dendl;
2520
2521 } else {
2522
2523 dout(10) << __func__ << ": osd." << from << " scrub reserve = fail" << dendl;
2524 m_had_rejections = true; // preventing any additional notifications
2525 send_reject();
2526 }
2527 }
2528
2529 std::ostream& ReplicaReservations::gen_prefix(std::ostream& out) const
2530 {
2531 return out << m_log_msg_prefix;
2532 }
2533
2534 // ///////////////////// LocalReservation //////////////////////////////////
2535
2536 // note: no dout()s in LocalReservation functions. The caller logs the interactions.
2537 LocalReservation::LocalReservation(OSDService* osds)
2538 : m_osds{osds}
2539 {
2540 if (m_osds->get_scrub_services().inc_scrubs_local()) {
2541 // the failure is signalled by not having m_holding_local_reservation set
2542 m_holding_local_reservation = true;
2543 }
2544 }
2545
2546 LocalReservation::~LocalReservation()
2547 {
2548 if (m_holding_local_reservation) {
2549 m_holding_local_reservation = false;
2550 m_osds->get_scrub_services().dec_scrubs_local();
2551 }
2552 }
2553
2554 // ///////////////////// ReservedByRemotePrimary ///////////////////////////////
2555
2556 ReservedByRemotePrimary::ReservedByRemotePrimary(const PgScrubber* scrubber,
2557 PG* pg,
2558 OSDService* osds,
2559 epoch_t epoch)
2560 : m_scrubber{scrubber}
2561 , m_pg{pg}
2562 , m_osds{osds}
2563 , m_reserved_at{epoch}
2564 {
2565 if (!m_osds->get_scrub_services().inc_scrubs_remote()) {
2566 dout(10) << __func__ << ": failed to reserve at Primary request" << dendl;
2567 // the failure is signalled by not having m_reserved_by_remote_primary set
2568 return;
2569 }
2570
2571 dout(20) << __func__ << ": scrub resources reserved at Primary request" << dendl;
2572 m_reserved_by_remote_primary = true;
2573 }
2574
2575 bool ReservedByRemotePrimary::is_stale() const
2576 {
2577 return m_reserved_at < m_pg->get_same_interval_since();
2578 }
2579
2580 ReservedByRemotePrimary::~ReservedByRemotePrimary()
2581 {
2582 if (m_reserved_by_remote_primary) {
2583 m_reserved_by_remote_primary = false;
2584 m_osds->get_scrub_services().dec_scrubs_remote();
2585 }
2586 }
2587
2588 std::ostream& ReservedByRemotePrimary::gen_prefix(std::ostream& out) const
2589 {
2590 return m_scrubber->gen_prefix(out);
2591 }
2592
2593 // ///////////////////// MapsCollectionStatus ////////////////////////////////
2594
2595 auto MapsCollectionStatus::mark_arriving_map(pg_shard_t from)
2596 -> std::tuple<bool, std::string_view>
2597 {
2598 auto fe = std::find(m_maps_awaited_for.begin(), m_maps_awaited_for.end(), from);
2599 if (fe != m_maps_awaited_for.end()) {
2600 // we are indeed waiting for a map from this replica
2601 m_maps_awaited_for.erase(fe);
2602 return std::tuple{true, ""sv};
2603 } else {
2604 return std::tuple{false, " unsolicited scrub-map"sv};
2605 }
2606 }
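// mark_arriving_map() reports both whether the map was expected and, for the
// caller's log line, why not. The erase-if-awaited pattern on its own (std-only
// sketch; stand-in types):
//
//   #include <algorithm>
//   #include <string_view>
//   #include <tuple>
//   #include <vector>
//
//   inline std::tuple<bool, std::string_view>
//   take_expected(std::vector<int>& awaited, int from) {
//     auto it = std::find(awaited.begin(), awaited.end(), from);
//     if (it == awaited.end()) return {false, " unsolicited scrub-map"};
//     awaited.erase(it);  // no longer awaited
//     return {true, ""};
//   }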
2607
2608 void MapsCollectionStatus::reset()
2609 {
2610 *this = MapsCollectionStatus{};
2611 }
2612
2613 std::string MapsCollectionStatus::dump() const
2614 {
2615 std::string all;
2616 for (const auto& rp : m_maps_awaited_for) {
2617 all.append(rp.get_osd() + " "s);
2618 }
2619 return all;
2620 }
2621
2622 ostream& operator<<(ostream& out, const MapsCollectionStatus& sf)
2623 {
2624 out << " [ ";
2625 for (const auto& rp : sf.m_maps_awaited_for) {
2626 out << rp.get_osd() << " ";
2627 }
2628 if (!sf.m_local_map_ready) {
2629 out << " local ";
2630 }
2631 return out << " ] ";
2632 }
2633
2634 // ///////////////////// blocked_range_t ///////////////////////////////
2635
2636 blocked_range_t::blocked_range_t(OSDService* osds, ceph::timespan waittime, spg_t pg_id)
2637 : m_osds{osds}
2638 {
2639 auto now_is = std::chrono::system_clock::now();
2640 m_callbk = new LambdaContext([now_is, pg_id, osds]([[maybe_unused]] int r) {
2641 std::time_t now_c = std::chrono::system_clock::to_time_t(now_is);
2642 char buf[50];
2643 strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", std::localtime(&now_c));
2644 lgeneric_subdout(g_ceph_context, osd, 10)
2645 << "PgScrubber: " << pg_id << " blocked on an object for too long (since " << buf
2646 << ")" << dendl;
2647 osds->clog->warn() << "osd." << osds->whoami << " PgScrubber: " << pg_id << " blocked on an object for too long (since " << buf << ")";
2648 return;
2649 });
2650
2651 std::lock_guard l(m_osds->sleep_lock);
2652 m_osds->sleep_timer.add_event_after(waittime, m_callbk);
2653 }
2654
2655 blocked_range_t::~blocked_range_t()
2656 {
2657 std::lock_guard l(m_osds->sleep_lock);
2658 m_osds->sleep_timer.cancel_event(m_callbk);
2659 }
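// blocked_range_t arms a warning timer on construction and cancels it on
// destruction, so the warning fires only if the blocked range outlives the
// grace period. The same arm/cancel lifetime, reduced to the standard library
// (a sketch; Ceph uses the OSD's sleep_timer rather than a dedicated thread):
//
//   #include <chrono>
//   #include <condition_variable>
//   #include <functional>
//   #include <mutex>
//   #include <thread>
//
//   class WarnAfter {
//     std::mutex m;
//     std::condition_variable cv;
//     bool cancelled = false;
//     std::thread t;
//
//    public:
//     WarnAfter(std::chrono::milliseconds grace, std::function<void()> warn)
//       : t{[this, grace, warn] {
//           std::unique_lock l{m};
//           if (!cv.wait_for(l, grace, [this] { return cancelled; }))
//             warn();  // not cancelled in time: the range is stuck
//         }} {}
//
//     ~WarnAfter() {
//       { std::lock_guard l{m}; cancelled = true; }
//       cv.notify_one();
//       t.join();
//     }
//   };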
2660
2661 } // namespace Scrub