// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 smarttab

#include "./pg_scrubber.h"  // the '.' notation used to affect clang-format order

#include <cmath>
#include <iostream>
#include <vector>

#include "debug.h"

#include "common/errno.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDRepScrub.h"
#include "messages/MOSDRepScrubMap.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrubReserve.h"

#include "osd/OSD.h"
#include "osd/osd_types_fmt.h"
#include "ScrubStore.h"
#include "scrub_machine.h"

using std::list;
using std::map;
using std::pair;
using std::set;
using std::stringstream;
using std::vector;
using namespace Scrub;
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace std::literals;

#define dout_context (m_osds->cct)
#define dout_subsys ceph_subsys_osd
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)

template <class T>
static ostream& _prefix(std::ostream* _dout, T* t)
{
  return t->gen_prefix(*_dout);
}

ostream& operator<<(ostream& out, const scrub_flags_t& sf)
{
  if (sf.auto_repair)
    out << " AUTO_REPAIR";
  if (sf.check_repair)
    out << " CHECK_REPAIR";
  if (sf.deep_scrub_on_error)
    out << " DEEP_SCRUB_ON_ERROR";
  if (sf.required)
    out << " REQ_SCRUB";

  return out;
}

ostream& operator<<(ostream& out, const requested_scrub_t& sf)
{
  if (sf.must_repair)
    out << " MUST_REPAIR";
  if (sf.auto_repair)
    out << " planned AUTO_REPAIR";
  if (sf.check_repair)
    out << " planned CHECK_REPAIR";
  if (sf.deep_scrub_on_error)
    out << " planned DEEP_SCRUB_ON_ERROR";
  if (sf.must_deep_scrub)
    out << " MUST_DEEP_SCRUB";
  if (sf.must_scrub)
    out << " MUST_SCRUB";
  if (sf.time_for_deep)
    out << " TIME_FOR_DEEP";
  if (sf.need_auto)
    out << " NEED_AUTO";
  if (sf.req_scrub)
    out << " planned REQ_SCRUB";

  return out;
}

/*
 * if the incoming message is from a previous interval, it must mean
 * PrimaryLogPG::on_change() was called when that interval ended. We can safely
 * discard the stale message.
 */
bool PgScrubber::check_interval(epoch_t epoch_to_verify)
{
  return epoch_to_verify >= m_pg->get_same_interval_since();
}

bool PgScrubber::is_message_relevant(epoch_t epoch_to_verify)
{
  if (!m_active) {
    // not scrubbing. We can assume that the scrub was already terminated, and
    // we can silently discard the incoming event.
    return false;
  }

  // is this a message from before we started this scrub?
  if (epoch_to_verify < m_epoch_start) {
    return false;
  }

  // has a new interval started?
  if (!check_interval(epoch_to_verify)) {
    // if this is a new interval, on_change() has already terminated that
    // old scrub.
    return false;
  }

  ceph_assert(is_primary());

  // were we instructed to abort?
  return verify_against_abort(epoch_to_verify);
}

bool PgScrubber::verify_against_abort(epoch_t epoch_to_verify)
{
  if (!should_abort()) {
    return true;
  }

  dout(10) << __func__ << " aborting. incoming epoch: " << epoch_to_verify
           << " vs last-aborted: " << m_last_aborted << dendl;

  // if we were not aware of the abort before - kill the scrub.
  if (epoch_to_verify >= m_last_aborted) {
    scrub_clear_state();
    m_last_aborted = std::max(epoch_to_verify, m_epoch_start);
  }
  return false;
}

bool PgScrubber::should_abort() const
{
  if (m_flags.required) {
    return false;  // not stopping 'required' scrubs for configuration changes
  }

  if (m_is_deep) {
    if (get_osdmap()->test_flag(CEPH_OSDMAP_NODEEP_SCRUB) ||
        m_pg->pool.info.has_flag(pg_pool_t::FLAG_NODEEP_SCRUB)) {
      dout(10) << "nodeep_scrub set, aborting" << dendl;
      return true;
    }
  } else if (get_osdmap()->test_flag(CEPH_OSDMAP_NOSCRUB) ||
             m_pg->pool.info.has_flag(pg_pool_t::FLAG_NOSCRUB)) {
    dout(10) << "noscrub set, aborting" << dendl;
    return true;
  }

  return false;
}

// initiating state-machine events --------------------------------

/*
 * a note re the checks performed before sending scrub-initiating messages:
 *
 * For the scrub-initiation messages ('StartScrub', 'AfterRepairScrub') that may
 * have been sitting in the queue while the PG changed state and became
 * unavailable for scrubbing:
 *
 * check_interval() catches all major changes to the PG. As for the other
 * conditions we might have checked (see is_message_relevant() above):
 *
 * - we are not 'active' yet, so we must not check against is_active(); and
 *
 * - the 'abort' flags were just verified (when the triggering message was queued).
 *   As those flags only change at human timescales, they need not be queried again.
 *
 * Some of the considerations above are also relevant to the replica-side initiation
 * ('StartReplica' & 'StartReplicaNoWait').
 */

void PgScrubber::initiate_regular_scrub(epoch_t epoch_queued)
{
  dout(15) << __func__ << " epoch: " << epoch_queued << dendl;
  // we may have lost our Primary status while the message languished in the queue
  if (check_interval(epoch_queued)) {
    dout(10) << "scrubber event -->> StartScrub epoch: " << epoch_queued << dendl;
    reset_epoch(epoch_queued);
    m_fsm->process_event(StartScrub{});
    dout(10) << "scrubber event --<< StartScrub" << dendl;
  } else {
    clear_queued_or_active();  // also restarts snap trimming
  }
}

void PgScrubber::initiate_scrub_after_repair(epoch_t epoch_queued)
{
  dout(15) << __func__ << " epoch: " << epoch_queued << dendl;
  // we may have lost our Primary status while the message languished in the queue
  if (check_interval(epoch_queued)) {
    dout(10) << "scrubber event -->> AfterRepairScrub epoch: " << epoch_queued << dendl;
    reset_epoch(epoch_queued);
    m_fsm->process_event(AfterRepairScrub{});
    dout(10) << "scrubber event --<< AfterRepairScrub" << dendl;
  } else {
    clear_queued_or_active();  // also restarts snap trimming
  }
}

void PgScrubber::send_scrub_unblock(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Unblocked{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_scrub_resched(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(InternalSchedScrub{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_start_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
           << " token: " << token << dendl;
  if (is_primary()) {
    // shouldn't happen. Ignore
    dout(1) << "got a replica scrub request while Primary!" << dendl;
    return;
  }

  if (check_interval(epoch_queued) && is_token_current(token)) {
    // save us some time by not waiting for updates if there are none
    // to wait for. Affects the transition from NotActive into either
    // ReplicaWaitUpdates or ActiveReplica.
    if (pending_active_pushes())
      m_fsm->process_event(StartReplica{});
    else
      m_fsm->process_event(StartReplicaNoWait{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_sched_replica(epoch_t epoch_queued, Scrub::act_token_t token)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued
           << " token: " << token << dendl;
  if (check_interval(epoch_queued) && is_token_current(token)) {
    m_fsm->process_event(SchedReplica{});  // retest for map availability
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::active_pushes_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(ActivePushesUpd{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::update_applied_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(UpdatesApplied{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::digest_update_notification(epoch_t epoch_queued)
{
  // note: Primary only
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(DigestUpdate{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_local_map_done(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Scrub::IntLocalMapDone{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_replica_maps_ready(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(GotReplicas{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_replica_pushes_upd(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(ReplicaPushesUpd{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_remotes_reserved(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  // note: scrub is not active yet
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(RemotesReserved{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_reservation_failure(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {  // do not check for 'active'!
    m_fsm->process_event(ReservationFailure{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_full_reset(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  m_fsm->process_event(Scrub::FullReset{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_chunk_free(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(Scrub::SelectedChunkFree{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_chunk_busy(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (check_interval(epoch_queued)) {
    m_fsm->process_event(Scrub::ChunkIsBusy{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_get_next_chunk(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;
  if (is_message_relevant(epoch_queued)) {
    m_fsm->process_event(Scrub::NextChunk{});
  }
  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_scrub_is_finished(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  // can't check for "active"

  m_fsm->process_event(Scrub::ScrubFinished{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

void PgScrubber::send_maps_compared(epoch_t epoch_queued)
{
  dout(10) << "scrubber event -->> " << __func__ << " epoch: " << epoch_queued << dendl;

  m_fsm->process_event(Scrub::MapsCompared{});

  dout(10) << "scrubber event --<< " << __func__ << dendl;
}

// -----------------

bool PgScrubber::is_reserving() const
{
  return m_fsm->is_reserving();
}

void PgScrubber::reset_epoch(epoch_t epoch_queued)
{
  dout(10) << __func__ << " state deep? " << state_test(PG_STATE_DEEP_SCRUB) << dendl;
  m_fsm->assert_not_active();

  m_epoch_start = epoch_queued;
  m_needs_sleep = true;
  m_is_deep = state_test(PG_STATE_DEEP_SCRUB);
  update_op_mode_text();
}

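// Note: a requeued scrub event carries the priority of the original request, but
// a high-priority requeue is raised to at least 'osd_client_op_priority' - as
// implemented by the two overloads below.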
unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priority) const
{
  unsigned int qu_priority = m_flags.priority;

  if (with_priority == Scrub::scrub_prio_t::high_priority) {
    qu_priority = std::max(
      qu_priority, (unsigned int)m_pg->get_cct()->_conf->osd_client_op_priority);
  }
  return qu_priority;
}

unsigned int PgScrubber::scrub_requeue_priority(Scrub::scrub_prio_t with_priority,
                                                unsigned int suggested_priority) const
{
  if (with_priority == Scrub::scrub_prio_t::high_priority) {
    suggested_priority = std::max(
      suggested_priority, (unsigned int)m_pg->cct->_conf->osd_client_op_priority);
  }
  return suggested_priority;
}

// ///////////////////////////////////////////////////////////////////// //
// scrub-op registration handling

void PgScrubber::unregister_from_osd()
{
  if (m_scrub_job) {
    dout(15) << __func__ << " prev. state: " << registration_state() << dendl;
    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
  }
}

bool PgScrubber::is_scrub_registered() const
{
  return m_scrub_job && m_scrub_job->in_queues;
}

std::string_view PgScrubber::registration_state() const
{
  if (m_scrub_job) {
    return m_scrub_job->registration_state();
  }
  return "(no sched job)"sv;
}

void PgScrubber::rm_from_osd_scrubbing()
{
  // make sure the OSD won't try to scrub this one just now
  unregister_from_osd();
}

void PgScrubber::on_primary_change(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica ")
           << " flags: " << request_flags << dendl;

  if (!m_scrub_job) {
    return;
  }

  dout(15) << __func__ << " scrub-job state: " << m_scrub_job->state_desc() << dendl;

  if (is_primary()) {
    auto suggested = determine_scrub_time(request_flags);
    m_osds->get_scrub_services().register_with_osd(m_scrub_job, suggested);
  } else {
    m_osds->get_scrub_services().remove_from_osd_queue(m_scrub_job);
  }

  dout(15) << __func__ << " done " << registration_state() << dendl;
}

void PgScrubber::on_maybe_registration_change(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << (is_primary() ? " Primary " : " Replica/other ")
           << registration_state() << " flags: " << request_flags << dendl;

  on_primary_change(request_flags);
  dout(15) << __func__ << " done " << registration_state() << dendl;
}

void PgScrubber::update_scrub_job(const requested_scrub_t& request_flags)
{
  dout(10) << __func__ << " flags: " << request_flags << dendl;

  {
    // verify that the 'in_q' status matches our "Primariority"
    if (m_scrub_job && is_primary() && !m_scrub_job->in_queues) {
      dout(1) << __func__ << " !!! primary but not scheduled! " << dendl;
    }
  }

  if (is_primary() && m_scrub_job) {
    auto suggested = determine_scrub_time(request_flags);
    m_osds->get_scrub_services().update_job(m_scrub_job, suggested);
  }

  dout(15) << __func__ << " done " << registration_state() << dendl;
}

ScrubQueue::sched_params_t
PgScrubber::determine_scrub_time(const requested_scrub_t& request_flags) const
{
  ScrubQueue::sched_params_t res;

  if (!is_primary()) {
    return res;  // with ok_to_scrub set to 'false'
  }

  if (request_flags.must_scrub || request_flags.need_auto) {

    // Set the smallest time that isn't utime_t()
    res.proposed_time = PgScrubber::scrub_must_stamp();
    res.is_must = ScrubQueue::must_scrub_t::mandatory;
    // we do not need the interval data in this case

  } else if (m_pg->info.stats.stats_invalid &&
             m_pg->cct->_conf->osd_scrub_invalid_stats) {
    res.proposed_time = ceph_clock_now();
    res.is_must = ScrubQueue::must_scrub_t::mandatory;

  } else {
    res.proposed_time = m_pg->info.history.last_scrub_stamp;
    res.min_interval =
      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MIN_INTERVAL, 0.0);
    res.max_interval =
      m_pg->get_pool().info.opts.value_or(pool_opts_t::SCRUB_MAX_INTERVAL, 0.0);
  }

  dout(15) << __func__ << " suggested: " << res.proposed_time
           << " hist: " << m_pg->info.history.last_scrub_stamp
           << " v:" << m_pg->info.stats.stats_invalid
           << " / " << m_pg->cct->_conf->osd_scrub_invalid_stats
           << " must:" << (res.is_must == ScrubQueue::must_scrub_t::mandatory ? "y" : "n")
           << " pool min: " << res.min_interval
           << dendl;
  return res;
}

void PgScrubber::scrub_requested(scrub_level_t scrub_level,
                                 scrub_type_t scrub_type,
                                 requested_scrub_t& req_flags)
{
  dout(10) << __func__
           << (scrub_level == scrub_level_t::deep ? " deep " : " shallow ")
           << (scrub_type == scrub_type_t::do_repair ? " repair-scrub " : " not-repair ")
           << " prev stamp: " << m_scrub_job->get_sched_time()
           << " registered? " << registration_state()
           << dendl;

  req_flags.must_scrub = true;
  req_flags.must_deep_scrub =
    (scrub_level == scrub_level_t::deep) || (scrub_type == scrub_type_t::do_repair);
  req_flags.must_repair = (scrub_type == scrub_type_t::do_repair);
  // User might intervene, so clear this
  req_flags.need_auto = false;
  req_flags.req_scrub = true;

  dout(20) << __func__ << " pg(" << m_pg_id << ") planned:" << req_flags << dendl;

  update_scrub_job(req_flags);
  m_pg->publish_stats_to_osd();
}


void PgScrubber::request_rescrubbing(requested_scrub_t& request_flags)
{
  dout(10) << __func__ << " flags: " << request_flags << dendl;

  request_flags.need_auto = true;
  update_scrub_job(request_flags);
}

bool PgScrubber::reserve_local()
{
  // try to create the reservation object (which translates into asking the
  // OSD for the local scrub resource). If failing - undo it immediately

  m_local_osd_resource.emplace(m_osds);
  if (m_local_osd_resource->is_reserved()) {
    dout(15) << __func__ << ": local resources reserved" << dendl;
    return true;
  }

  dout(10) << __func__ << ": failed to reserve local scrub resources" << dendl;
  m_local_osd_resource.reset();
  return false;
}

// ----------------------------------------------------------------------------

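// have all the updates the scrubber is waiting for (i.e. everything up to
// m_subset_last_update) already been applied?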
bool PgScrubber::has_pg_marked_new_updates() const
{
  auto last_applied = m_pg->recovery_state.get_last_update_applied();
  dout(10) << __func__ << " recovery last: " << last_applied
           << " vs. scrub's: " << m_subset_last_update << dendl;

  return last_applied >= m_subset_last_update;
}

void PgScrubber::set_subset_last_update(eversion_t e)
{
  m_subset_last_update = e;
  dout(15) << __func__ << " last-update: " << e << dendl;
}

void PgScrubber::on_applied_when_primary(const eversion_t& applied_version)
{
  // we are only interested in updates if we are the Primary, and in state
  // WaitLastUpdate
  if (m_fsm->is_accepting_updates() && (applied_version >= m_subset_last_update)) {
    m_osds->queue_scrub_applied_update(m_pg, m_pg->is_scrub_blocking_ops());
    dout(15) << __func__ << " update: " << applied_version
             << " vs. required: " << m_subset_last_update << dendl;
  }
}

/*
 * The selected range is set directly into 'm_start' and 'm_end'
 * setting:
 * - m_subset_last_update
 * - m_max_end
 * - end
 * - start
 */
bool PgScrubber::select_range()
{
  m_primary_scrubmap = ScrubMap{};
  m_received_maps.clear();

  /* get the start and end of our scrub chunk
   *
   * Our scrub chunk has an important restriction we're going to need to
   * respect. We can't let head be start or end.
   * Using a half-open interval means that if end == head,
   * we'd scrub/lock head and the clone right next to head in different
   * chunks which would allow us to miss clones created between
   * scrubbing that chunk and scrubbing the chunk including head.
   * This isn't true for any of the other clones since clones can
   * only be created "just to the left of" head. There is one exception
   * to this: promotion of clones which always happens to the left of the
   * left-most clone, but promote_object checks the scrubber in that
   * case, so it should be ok. Also, it's ok to "miss" clones at the
   * left end of the range if we are a tier because they may legitimately
   * not exist (see _scrub).
   */
  int min_idx = static_cast<int>(std::max<int64_t>(
    3,
    m_pg->get_cct()->_conf->osd_scrub_chunk_min / (int)preemption_data.chunk_divisor()));

  int max_idx = static_cast<int>(std::max<int64_t>(
    min_idx,
    m_pg->get_cct()->_conf->osd_scrub_chunk_max / (int)preemption_data.chunk_divisor()));

  dout(10) << __func__ << " Min: " << min_idx << " Max: " << max_idx
           << " Div: " << preemption_data.chunk_divisor() << dendl;

  hobject_t start = m_start;
  hobject_t candidate_end;
  std::vector<hobject_t> objects;
  int ret = m_pg->get_pgbackend()->objects_list_partial(start, min_idx, max_idx,
                                                        &objects, &candidate_end);
  ceph_assert(ret >= 0);

  if (!objects.empty()) {

    hobject_t back = objects.back();
    while (candidate_end.is_head() && candidate_end == back.get_head()) {
      candidate_end = back;
      objects.pop_back();
      if (objects.empty()) {
        ceph_assert(0 ==
                    "Somehow we got more than 2 objects which "
                    "have the same head but are not clones");
      }
      back = objects.back();
    }

    if (candidate_end.is_head()) {
      ceph_assert(candidate_end != back.get_head());
      candidate_end = candidate_end.get_object_boundary();
    }

  } else {
    ceph_assert(candidate_end.is_max());
  }

  // is that range free for us? if not - we will be rescheduled later by whoever
  // triggered us this time

  if (!m_pg->_range_available_for_scrub(m_start, candidate_end)) {
    // we'll be requeued by whatever made us unavailable for scrub
    dout(10) << __func__ << ": scrub blocked somewhere in range "
             << "[" << m_start << ", " << candidate_end << ")" << dendl;
    return false;
  }

  m_end = candidate_end;
  if (m_end > m_max_end)
    m_max_end = m_end;

  dout(15) << __func__ << " range selected: " << m_start << " //// " << m_end
           << " //// " << m_max_end << dendl;

  // debug: be 'blocked' if told so by the 'pg scrub_debug block' asok command
  if (m_debug_blockrange > 0) {
    m_debug_blockrange--;
    return false;
  }
  return true;
}

void PgScrubber::select_range_n_notify()
{
  if (select_range()) {
    // the next chunk to handle is not blocked
    dout(20) << __func__ << ": selection OK" << dendl;
    m_osds->queue_scrub_chunk_free(m_pg, Scrub::scrub_prio_t::low_priority);

  } else {
    // we will wait for the objects range to become available for scrubbing
    dout(10) << __func__ << ": selected chunk is busy" << dendl;
    m_osds->queue_scrub_chunk_busy(m_pg, Scrub::scrub_prio_t::low_priority);
  }
}

bool PgScrubber::write_blocked_by_scrub(const hobject_t& soid)
{
  if (soid < m_start || soid >= m_end) {
    return false;
  }

  dout(20) << __func__ << " " << soid << " can preempt? "
           << preemption_data.is_preemptable() << " already preempted? "
           << preemption_data.was_preempted() << dendl;

  if (preemption_data.was_preempted()) {
    // do not block the write. Otherwise - writes arriving while 'already
    // preempted' is set (but 'preemptable' is no longer set) would be stalled,
    // with no one to requeue them on time.
    return false;
  }

  if (preemption_data.is_preemptable()) {

    dout(10) << __func__ << " " << soid << " preempted" << dendl;

    // signal the preemption
    preemption_data.do_preempt();
    m_end = m_start;  // free the range we were scrubbing

    return false;
  }
  return true;
}

bool PgScrubber::range_intersects_scrub(const hobject_t& start, const hobject_t& end)
{
  // does [start, end] intersect [scrubber.start, scrubber.m_max_end)
  return (start < m_max_end && end >= m_start);
}

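// arm the 'blocked range' alarm: a blocked_range_t object that warns (see
// BlockedRangeWarning) if the scrub stays blocked on its selected range for
// too long (300s here)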
Scrub::BlockedRangeWarning PgScrubber::acquire_blocked_alarm()
{
  return std::make_unique<blocked_range_t>(m_osds, ceph::timespan{300s}, m_pg_id);
}

/**
 * if we are required to sleep:
 * arrange a callback for some time later.
 * Be sure to be able to identify a stale callback.
 * Otherwise: perform a requeue (i.e. - reschedule through the OSD queue)
 * anyway.
 */
void PgScrubber::add_delayed_scheduling()
{
  m_end = m_start;  // not blocking any range now

  milliseconds sleep_time{0ms};
  if (m_needs_sleep) {
    double scrub_sleep =
      1000.0 * m_osds->get_scrub_services().scrub_sleep_time(m_flags.required);
    sleep_time = milliseconds{int64_t(scrub_sleep)};
  }
  dout(15) << __func__ << " sleep: " << sleep_time.count() << "ms. needed? "
           << m_needs_sleep << dendl;

  if (sleep_time.count()) {
    // schedule a transition for some 'sleep_time' ms in the future

    m_needs_sleep = false;
    m_sleep_started_at = ceph_clock_now();

    // the following log line is used by osd-scrub-test.sh
    dout(20) << __func__ << " scrub state is PendingTimer, sleeping" << dendl;

    // the 'delayer' for crimson is different. Will be factored out.

    spg_t pgid = m_pg->get_pgid();
    auto callbk = new LambdaContext([osds = m_osds, pgid, scrbr = this](
                                      [[maybe_unused]] int r) mutable {
      PGRef pg = osds->osd->lookup_lock_pg(pgid);
      if (!pg) {
        lgeneric_subdout(g_ceph_context, osd, 10)
          << "scrub_requeue_callback: Could not find "
          << "PG " << pgid << " can't complete scrub requeue after sleep" << dendl;
        return;
      }
      scrbr->m_needs_sleep = true;
      lgeneric_dout(scrbr->get_pg_cct(), 7)
        << "scrub_requeue_callback: slept for "
        << ceph_clock_now() - scrbr->m_sleep_started_at << ", re-queuing scrub" << dendl;

      scrbr->m_sleep_started_at = utime_t{};
      osds->queue_for_scrub_resched(&(*pg), Scrub::scrub_prio_t::low_priority);
      pg->unlock();
    });

    std::lock_guard l(m_osds->sleep_lock);
    m_osds->sleep_timer.add_event_after(sleep_time.count() / 1000.0f, callbk);

  } else {
    // just a requeue
    m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::high_priority);
  }
}

eversion_t PgScrubber::search_log_for_updates() const
{
  auto& projected = m_pg->projected_log.log;
  auto pi = find_if(
    projected.crbegin(), projected.crend(),
    [this](const auto& e) -> bool { return e.soid >= m_start && e.soid < m_end; });

  if (pi != projected.crend())
    return pi->version;

  // there was no relevant update entry in the log

  auto& log = m_pg->recovery_state.get_pg_log().get_log().log;
  auto p = find_if(log.crbegin(), log.crend(), [this](const auto& e) -> bool {
    return e.soid >= m_start && e.soid < m_end;
  });

  if (p == log.crend())
    return eversion_t{};
  else
    return p->version;
}

void PgScrubber::get_replicas_maps(bool replica_can_preempt)
{
  dout(10) << __func__ << " started in epoch/interval: " << m_epoch_start << "/"
           << m_interval_start
           << " pg same_interval_since: " << m_pg->info.history.same_interval_since
           << dendl;

  m_primary_scrubmap_pos.reset();

  // ask replicas to scan and send maps
  for (const auto& i : m_pg->get_acting_recovery_backfill()) {

    if (i == m_pg_whoami)
      continue;

    m_maps_status.mark_replica_map_request(i);
    _request_scrub_map(i, m_subset_last_update, m_start, m_end, m_is_deep,
                       replica_can_preempt);
  }

  dout(10) << __func__ << " awaiting" << m_maps_status << dendl;
}

bool PgScrubber::was_epoch_changed() const
{
  // for crimson we have m_pg->get_info().history.same_interval_since
  dout(10) << __func__ << " epoch_start: " << m_interval_start
           << " from pg: " << m_pg->get_history().same_interval_since << dendl;

  return m_interval_start < m_pg->get_history().same_interval_since;
}

void PgScrubber::mark_local_map_ready()
{
  m_maps_status.mark_local_map_ready();
}

bool PgScrubber::are_all_maps_available() const
{
  return m_maps_status.are_all_maps_available();
}

std::string PgScrubber::dump_awaited_maps() const
{
  return m_maps_status.dump();
}

void PgScrubber::update_op_mode_text()
{
  auto visible_repair = state_test(PG_STATE_REPAIR);
  m_mode_desc = (visible_repair ? "repair" : (m_is_deep ? "deep-scrub" : "scrub"));

  dout(10) << __func__ << ": repair: visible: " << (visible_repair ? "true" : "false")
           << ", internal: " << (m_is_repair ? "true" : "false")
           << ". Displayed: " << m_mode_desc << dendl;
}

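// build and send the MOSDRepScrub message asking the named replica for a
// scrub-map of the [start, end) chunk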
void PgScrubber::_request_scrub_map(pg_shard_t replica,
                                    eversion_t version,
                                    hobject_t start,
                                    hobject_t end,
                                    bool deep,
                                    bool allow_preemption)
{
  ceph_assert(replica != m_pg_whoami);
  dout(10) << __func__ << " scrubmap from osd." << replica
           << (deep ? " deep" : " shallow") << dendl;

  auto repscrubop =
    new MOSDRepScrub(spg_t(m_pg->info.pgid.pgid, replica.shard), version,
                     get_osdmap_epoch(), m_pg->get_last_peering_reset(), start, end,
                     deep, allow_preemption, m_flags.priority,
                     m_pg->ops_blocked_by_scrub());

  // default priority. We want the replica-scrub processed prior to any recovery
  // or client io messages (we are holding a lock!)
  m_osds->send_message_osd_cluster(replica.osd, repscrubop, get_osdmap_epoch());
}

void PgScrubber::cleanup_store(ObjectStore::Transaction* t)
{
  if (!m_store)
    return;

  struct OnComplete : Context {
    std::unique_ptr<Scrub::Store> store;
    explicit OnComplete(std::unique_ptr<Scrub::Store>&& store) : store(std::move(store))
    {}
    void finish(int) override {}
  };
  m_store->cleanup(t);
  t->register_on_complete(new OnComplete(std::move(m_store)));
  ceph_assert(!m_store);
}

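// Primary-only: performed when the FSM leaves 'NotActive'. Replaces the
// scrub-store with a fresh one, and marks the scrubber as active.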
void PgScrubber::on_init()
{
  // going upwards from 'inactive'
  ceph_assert(!is_scrub_active());
  m_pg->reset_objects_scrubbed();
  preemption_data.reset();
  m_pg->publish_stats_to_osd();
  m_interval_start = m_pg->get_history().same_interval_since;

  dout(10) << __func__ << " start same_interval:" << m_interval_start << dendl;

  // create a new store
  {
    ObjectStore::Transaction t;
    cleanup_store(&t);
    m_store.reset(
      Scrub::Store::create(m_pg->osd->store, &t, m_pg->info.pgid, m_pg->coll));
    m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
  }

  m_start = m_pg->info.pgid.pgid.get_hobj_start();
  m_active = true;
  ++m_sessions_counter;
  m_pg->publish_stats_to_osd();
}

void PgScrubber::on_replica_init()
{
  m_active = true;
  ++m_sessions_counter;
}

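// verify the snap_mapper's data against the SnapSets seen in the scrub-map,
// repairing the mapper (synchronously) wherever the two disagree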
void PgScrubber::_scan_snaps(ScrubMap& smap)
{
  hobject_t head;
  SnapSet snapset;

  // Test qa/standalone/scrub/osd-scrub-snaps.sh greps for the strings
  // in this function
  dout(15) << "_scan_snaps starts" << dendl;

  for (auto i = smap.objects.rbegin(); i != smap.objects.rend(); ++i) {

    const hobject_t& hoid = i->first;
    ScrubMap::object& o = i->second;

    dout(20) << __func__ << " " << hoid << dendl;

    ceph_assert(!hoid.is_snapdir());
    if (hoid.is_head()) {
      // parse the SnapSet
      bufferlist bl;
      if (o.attrs.find(SS_ATTR) == o.attrs.end()) {
        continue;
      }
      bl.push_back(o.attrs[SS_ATTR]);
      auto p = bl.cbegin();
      try {
        decode(snapset, p);
      } catch (...) {
        continue;
      }
      head = hoid.get_head();
      continue;
    }

    if (hoid.snap < CEPH_MAXSNAP) {
      // check and if necessary fix snap_mapper
      if (hoid.get_head() != head) {
        derr << __func__ << " no head for " << hoid << " (have " << head << ")"
             << dendl;
        continue;
      }
      set<snapid_t> obj_snaps;
      auto p = snapset.clone_snaps.find(hoid.snap);
      if (p == snapset.clone_snaps.end()) {
        derr << __func__ << " no clone_snaps for " << hoid << " in " << snapset
             << dendl;
        continue;
      }
      obj_snaps.insert(p->second.begin(), p->second.end());
      set<snapid_t> cur_snaps;
      int r = m_pg->snap_mapper.get_snaps(hoid, &cur_snaps);
      if (r != 0 && r != -ENOENT) {
        derr << __func__ << ": get_snaps returned " << cpp_strerror(r) << dendl;
        ceph_abort();
      }
      if (r == -ENOENT || cur_snaps != obj_snaps) {
        ObjectStore::Transaction t;
        OSDriver::OSTransaction _t(m_pg->osdriver.get_transaction(&t));
        if (r == 0) {
          r = m_pg->snap_mapper.remove_oid(hoid, &_t);
          if (r != 0) {
            derr << __func__ << ": remove_oid returned " << cpp_strerror(r) << dendl;
            ceph_abort();
          }
          m_pg->osd->clog->error()
            << "osd." << m_pg->osd->whoami << " found snap mapper error on pg "
            << m_pg->info.pgid << " oid " << hoid << " snaps in mapper: " << cur_snaps
            << ", oi: " << obj_snaps << "...repaired";
        } else {
          m_pg->osd->clog->error()
            << "osd." << m_pg->osd->whoami << " found snap mapper error on pg "
            << m_pg->info.pgid << " oid " << hoid << " snaps missing in mapper"
            << ", should be: " << obj_snaps << " was " << cur_snaps << " r " << r
            << "...repaired";
        }
        m_pg->snap_mapper.add_oid(hoid, obj_snaps, &_t);

        // wait for repair to apply to avoid confusing other bits of the system.
        {
          dout(15) << __func__ << " wait on repair!" << dendl;

          ceph::condition_variable my_cond;
          ceph::mutex my_lock = ceph::make_mutex("PG::_scan_snaps my_lock");
          int e = 0;
          bool done;

          t.register_on_applied_sync(new C_SafeCond(my_lock, my_cond, &done, &e));

          e = m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t));
          if (e != 0) {
            derr << __func__ << ": queue_transaction got " << cpp_strerror(e) << dendl;
          } else {
            std::unique_lock l{my_lock};
            my_cond.wait(l, [&done] { return done; });
          }
        }
      }
    }
  }
}

int PgScrubber::build_primary_map_chunk()
{
  epoch_t map_building_since = m_pg->get_osdmap_epoch();
  dout(20) << __func__ << ": initiated at epoch " << map_building_since << dendl;

  auto ret = build_scrub_map_chunk(m_primary_scrubmap, m_primary_scrubmap_pos,
                                   m_start, m_end, m_is_deep);

  if (ret == -EINPROGRESS) {
    // reschedule another round of asking the backend to collect the scrub data
    m_osds->queue_for_scrub_resched(m_pg, Scrub::scrub_prio_t::low_priority);
  }
  return ret;
}

int PgScrubber::build_replica_map_chunk()
{
  dout(10) << __func__ << " interval start: " << m_interval_start
           << " current token: " << m_current_token << " epoch: " << m_epoch_start
           << " deep: " << m_is_deep << dendl;

  auto ret = build_scrub_map_chunk(replica_scrubmap, replica_scrubmap_pos, m_start,
                                   m_end, m_is_deep);

  switch (ret) {

    case -EINPROGRESS:
      // must wait for the backend to finish. No external event source.
      // (note: previous version used low priority here. Now switched to using the
      // priority of the original message)
      m_osds->queue_for_rep_scrub_resched(m_pg, m_replica_request_priority,
                                          m_flags.priority, m_current_token);
      break;

    case 0: {
      // finished!
      m_cleaned_meta_map.clear_from(m_start);
      m_cleaned_meta_map.insert(replica_scrubmap);
      auto for_meta_scrub = clean_meta_map();
      _scan_snaps(for_meta_scrub);

      // the local map has been created. Send it to the primary.
      // Note: once the message reaches the Primary, it may ask us for another
      // chunk - and we better be done with the current scrub. Thus - the
      // preparation of the reply message is separate, and we clear the scrub
      // state before actually sending it.

      auto reply = prep_replica_map_msg(PreemptionNoted::no_preemption);
      replica_handling_done();
      dout(15) << __func__ << " chunk map sent " << dendl;
      send_replica_map(reply);
    } break;

    default:
      // negative retval: build_scrub_map_chunk() signalled an error
      // Pre-Pacific code ignored this option, treating it as a success.
      // \todo Add an error flag in the returning message.
      dout(1) << "Error! Aborting. ActiveReplica::react(SchedReplica) Ret: " << ret
              << dendl;
      replica_handling_done();
      // only in debug mode for now:
      assert(false && "backend error");
      break;
  };

  return ret;
}

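// the chunk-map builder: returns -EINPROGRESS for as long as the backend still
// has listing/scanning work to do for this chunk (the callers requeue themselves),
// and 0 once the map for [start, end) is complete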
int PgScrubber::build_scrub_map_chunk(
  ScrubMap& map, ScrubMapBuilder& pos, hobject_t start, hobject_t end, bool deep)
{
  dout(10) << __func__ << " [" << start << "," << end << ") "
           << " pos " << pos << " Deep: " << deep << dendl;

  // start
  while (pos.empty()) {

    pos.deep = deep;
    map.valid_through = m_pg->info.last_update;

    // objects
    vector<ghobject_t> rollback_obs;
    pos.ret =
      m_pg->get_pgbackend()->objects_list_range(start, end, &pos.ls, &rollback_obs);
    dout(10) << __func__ << " while pos empty " << pos.ret << dendl;
    if (pos.ret < 0) {
      dout(5) << "objects_list_range error: " << pos.ret << dendl;
      return pos.ret;
    }
    dout(10) << __func__ << " pos.ls.empty()? " << (pos.ls.empty() ? "+" : "-")
             << dendl;
    if (pos.ls.empty()) {
      break;
    }
    m_pg->_scan_rollback_obs(rollback_obs);
    pos.pos = 0;
    return -EINPROGRESS;
  }

  // scan objects
  while (!pos.done()) {

    int r = m_pg->get_pgbackend()->be_scan_list(map, pos);
    dout(30) << __func__ << " BE returned " << r << dendl;
    if (r == -EINPROGRESS) {
      dout(20) << __func__ << " in progress" << dendl;
      return r;
    }
  }

  // finish
  dout(20) << __func__ << " finishing" << dendl;
  ceph_assert(pos.done());
  m_pg->_repair_oinfo_oid(map);

  dout(20) << __func__ << " done, got " << map.objects.size() << " items" << dendl;
  return 0;
}

/*
 * Process:
 * Building a map of objects suitable for snapshot validation.
 * The data in m_cleaned_meta_map consists of the leftover partial items that
 * need to be completed before they can be processed.
 *
 * Snapshots in maps precede the head object, which is why we are scanning backwards.
 */
ScrubMap PgScrubber::clean_meta_map()
{
  ScrubMap for_meta_scrub;

  if (m_end.is_max() || m_cleaned_meta_map.objects.empty()) {
    m_cleaned_meta_map.swap(for_meta_scrub);
  } else {
    auto iter = m_cleaned_meta_map.objects.end();
    --iter;  // not empty, see 'if' clause
    auto begin = m_cleaned_meta_map.objects.begin();
    if (iter->first.has_snapset()) {
      ++iter;
    } else {
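      // the last object in the map is a clone (no SnapSet): walk backwards past
      // all clones sharing that head, so the whole (possibly still incomplete)
      // object-group is left in m_cleaned_meta_map for the next chunk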
      while (iter != begin) {
        auto next = iter--;
        if (next->first.get_head() != iter->first.get_head()) {
          ++iter;
          break;
        }
      }
    }
    for_meta_scrub.objects.insert(begin, iter);
    m_cleaned_meta_map.objects.erase(begin, iter);
  }

  return for_meta_scrub;
}

void PgScrubber::run_callbacks()
{
  std::list<Context*> to_run;
  to_run.swap(m_callbacks);

  for (auto& tr : to_run) {
    tr->complete(0);
  }
}

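// Primary-only: called once all maps (local & replicas') have arrived. Compares
// the maps, releases the scrubbed range, requeues waiting ops, and queues the
// 'MapsCompared' event for the FSM.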
void PgScrubber::maps_compare_n_cleanup()
{
  scrub_compare_maps();
  m_start = m_end;
  run_callbacks();
  requeue_waiting();
  m_osds->queue_scrub_maps_compared(m_pg, Scrub::scrub_prio_t::low_priority);
}

Scrub::preemption_t& PgScrubber::get_preemptor()
{
  return preemption_data;
}

/*
 * Process note: called for the arriving "give me your map, replica!" request.
 * Unlike the original implementation, we do not requeue the Op waiting for
 * updates. Instead - we trigger the FSM.
 */
void PgScrubber::replica_scrub_op(OpRequestRef op)
{
  op->mark_started();
  auto msg = op->get_req<MOSDRepScrub>();
  dout(10) << __func__ << " pg:" << m_pg->pg_id
           << " Msg: map_epoch:" << msg->map_epoch
           << " min_epoch:" << msg->min_epoch << " deep?" << msg->deep << dendl;

  // are we still processing a previous scrub-map request without noticing that
  // the interval changed? won't see it here, but rather at the reservation
  // stage.

  if (msg->map_epoch < m_pg->info.history.same_interval_since) {
    dout(10) << "replica_scrub_op discarding old replica_scrub from "
             << msg->map_epoch << " < "
             << m_pg->info.history.same_interval_since << dendl;

    // is there a general sync issue? are we holding a stale reservation?
    // not checking now - assuming we will actively react to interval change.

    return;
  }

  if (is_queued_or_active()) {
    // this is a bug!
    // Somehow, we have received a new scrub request from our Primary, before
    // having finished with the previous one. Did we go through an interval
    // change without resetting the FSM? Possible responses:
    // - crashing (the original assert_not_active() implemented that one), or
    // - trying to recover:
    //  - (logging enough information to debug this scenario)
    //  - reset the FSM.
    m_osds->clog->warn() << fmt::format(
      "{}: error: a second scrub-op received while handling the previous one",
      __func__);

    scrub_clear_state();
    m_osds->clog->warn() << fmt::format(
      "{}: after a reset. Now handling the new OP", __func__);
  }
  // make sure the FSM is at NotActive
  m_fsm->assert_not_active();

  replica_scrubmap = ScrubMap{};
  replica_scrubmap_pos = ScrubMapBuilder{};

  m_replica_min_epoch = msg->min_epoch;
  m_start = msg->start;
  m_end = msg->end;
  m_max_end = msg->end;
  m_is_deep = msg->deep;
  m_interval_start = m_pg->info.history.same_interval_since;
  m_replica_request_priority = msg->high_priority
                                 ? Scrub::scrub_prio_t::high_priority
                                 : Scrub::scrub_prio_t::low_priority;
  m_flags.priority = msg->priority ? msg->priority : m_pg->get_scrub_priority();

  preemption_data.reset();
  preemption_data.force_preemptability(msg->allow_preemption);

  replica_scrubmap_pos.reset();

  set_queued_or_active();
  m_osds->queue_for_rep_scrub(m_pg, m_replica_request_priority,
                              m_flags.priority, m_current_token);
}

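// translate the planned-scrub flags into the parameters of the upcoming scrub
// session: start epoch, repair/deep modes, priority, and the externally-visible
// PG state bits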
void PgScrubber::set_op_parameters(requested_scrub_t& request)
{
  dout(10) << __func__ << " input: " << request << dendl;

  set_queued_or_active();  // we are fully committed now.

  // write down the epoch of starting a new scrub. Will be used
  // to discard stale messages from previous aborted scrubs.
  m_epoch_start = m_pg->get_osdmap_epoch();

  m_flags.check_repair = request.check_repair;
  m_flags.auto_repair = request.auto_repair || request.need_auto;
  m_flags.required = request.req_scrub || request.must_scrub;

  m_flags.priority = (request.must_scrub || request.need_auto)
                       ? get_pg_cct()->_conf->osd_requested_scrub_priority
                       : m_pg->get_scrub_priority();

  state_set(PG_STATE_SCRUBBING);

  // will we be deep-scrubbing?
  if (request.must_deep_scrub || request.need_auto || request.time_for_deep) {
    state_set(PG_STATE_DEEP_SCRUB);
  }

  // m_is_repair is set for either 'must_repair' or 'repair-on-the-go' (i.e.
  // deep-scrub with the auto_repair configuration flag set). m_is_repair value
  // determines the scrubber behavior.
  // PG_STATE_REPAIR, on the other hand, is only used for status reports (inc. the
  // PG status as appearing in the logs).
  m_is_repair = request.must_repair || m_flags.auto_repair;
  if (request.must_repair) {
    state_set(PG_STATE_REPAIR);
    // not calling update_op_mode_text() yet, as m_is_deep not set yet
  }

  // the publishing here is required for tests synchronization
  m_pg->publish_stats_to_osd();
  m_flags.deep_scrub_on_error = request.deep_scrub_on_error;
}

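// Primary-only: build the master set of objects out of all the maps, run the
// omap checks, and have the backend pick the authoritative version of each
// inconsistent object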
void PgScrubber::scrub_compare_maps()
{
  dout(10) << __func__ << " has maps, analyzing" << dendl;

  // construct authoritative scrub map for type-specific scrubbing
  m_cleaned_meta_map.insert(m_primary_scrubmap);
  map<hobject_t, pair<std::optional<uint32_t>, std::optional<uint32_t>>> missing_digest;

  map<pg_shard_t, ScrubMap*> maps;
  maps[m_pg_whoami] = &m_primary_scrubmap;

  for (const auto& i : m_pg->get_acting_recovery_backfill()) {
    if (i == m_pg_whoami)
      continue;
    dout(2) << __func__ << " replica " << i << " has "
            << m_received_maps[i].objects.size() << " items" << dendl;
    maps[i] = &m_received_maps[i];
  }

  set<hobject_t> master_set;

  // Construct master set
  for (const auto& map : maps) {
    for (const auto& i : map.second->objects) {
      master_set.insert(i.first);
    }
  }

  stringstream ss;
  m_pg->get_pgbackend()->be_omap_checks(maps, master_set, m_omap_stats, ss);

  if (!ss.str().empty()) {
    m_osds->clog->warn(ss);
  }

  if (m_pg->recovery_state.get_acting_recovery_backfill().size() > 1) {

    dout(10) << __func__ << " comparing replica scrub maps" << dendl;

    // Map from object with errors to good peer
    map<hobject_t, list<pg_shard_t>> authoritative;

    dout(2) << __func__ << ": primary (" << m_pg->get_primary() << ") has "
            << m_primary_scrubmap.objects.size() << " items" << dendl;
    m_pg->add_objects_scrubbed_count(m_primary_scrubmap.objects.size());

    ss.str("");
    ss.clear();

    m_pg->get_pgbackend()->be_compare_scrubmaps(
      maps, master_set, m_is_repair, m_missing, m_inconsistent,
      authoritative, missing_digest, m_shallow_errors, m_deep_errors, m_store.get(),
      m_pg->info.pgid, m_pg->recovery_state.get_acting(), ss);

    if (!ss.str().empty()) {
      m_osds->clog->error(ss);
    }

    for (auto& i : authoritative) {
      list<pair<ScrubMap::object, pg_shard_t>> good_peers;
      for (list<pg_shard_t>::const_iterator j = i.second.begin();
           j != i.second.end(); ++j) {
        good_peers.emplace_back(maps[*j]->objects[i.first], *j);
      }
      m_authoritative.emplace(i.first, good_peers);
    }

    for (auto i = authoritative.begin(); i != authoritative.end(); ++i) {
      m_cleaned_meta_map.objects.erase(i->first);
      m_cleaned_meta_map.objects.insert(
        *(maps[i->second.back()]->objects.find(i->first)));
    }
  }

  auto for_meta_scrub = clean_meta_map();

  // ok, do the pg-type specific scrubbing

  // (Validates consistency of the object info and snap sets)
  scrub_snapshot_metadata(for_meta_scrub, missing_digest);

  // Called here on the primary. A caller that isn't the primary could use an
  // authoritative map instead.
  _scan_snaps(for_meta_scrub);

  if (!m_store->empty()) {

    if (m_is_repair) {
      dout(10) << __func__ << ": discarding scrub results" << dendl;
      m_store->flush(nullptr);
    } else {
      dout(10) << __func__ << ": updating scrub object" << dendl;
      ObjectStore::Transaction t;
      m_store->flush(&t);
      m_pg->osd->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
    }
  }
}

ScrubMachineListener::MsgAndEpoch PgScrubber::prep_replica_map_msg(
  PreemptionNoted was_preempted)
{
  dout(10) << __func__ << " min epoch:" << m_replica_min_epoch << dendl;

  auto reply =
    make_message<MOSDRepScrubMap>(spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard),
                                  m_replica_min_epoch, m_pg_whoami);

  reply->preempted = (was_preempted == PreemptionNoted::preempted);
  ::encode(replica_scrubmap, reply->get_data());

  return ScrubMachineListener::MsgAndEpoch{reply, m_replica_min_epoch};
}

void PgScrubber::send_replica_map(const MsgAndEpoch& preprepared)
{
  m_pg->send_cluster_message(m_pg->get_primary().osd, preprepared.m_msg,
                             preprepared.m_epoch, false);
}

void PgScrubber::send_preempted_replica()
{
  auto reply =
    make_message<MOSDRepScrubMap>(spg_t{m_pg->info.pgid.pgid, m_pg->get_primary().shard},
                                  m_replica_min_epoch, m_pg_whoami);

  reply->preempted = true;
  ::encode(replica_scrubmap, reply->get_data());  // must not skip this
  m_pg->send_cluster_message(m_pg->get_primary().osd, reply, m_replica_min_epoch, false);
}

/*
 * - if the replica lets us know it was interrupted, we mark the chunk as
 *   interrupted. The state-machine will react to that when all replica maps are
 *   received.
 * - when all maps are received, we signal the FSM with the GotReplicas event (see
 *   scrub_send_replmaps_ready()). Note that due to the no-reentrancy limitations of
 *   the FSM, we do not 'process' the event directly. Instead - it is queued for the
 *   OSD to handle.
 */
void PgScrubber::map_from_replica(OpRequestRef op)
{
  auto m = op->get_req<MOSDRepScrubMap>();
  dout(15) << __func__ << " " << *m << dendl;

  if (m->map_epoch < m_pg->info.history.same_interval_since) {
    dout(10) << __func__ << " discarding old from " << m->map_epoch << " < "
             << m_pg->info.history.same_interval_since << dendl;
    return;
  }

  auto p = const_cast<bufferlist&>(m->get_data()).cbegin();

  m_received_maps[m->from].decode(p, m_pg->info.pgid.pool());
  dout(15) << "map version is " << m_received_maps[m->from].valid_through << dendl;

  auto [is_ok, err_txt] = m_maps_status.mark_arriving_map(m->from);
  if (!is_ok) {
    // previously an unexpected map was triggering an assert. Now, as scrubs can be
    // aborted at any time, the chances of this happening have increased, and
    // aborting is not justified
    dout(1) << __func__ << err_txt << " from OSD " << m->from << dendl;
    return;
  }

  if (m->preempted) {
    dout(10) << __func__ << " replica was preempted, setting flag" << dendl;
    preemption_data.do_preempt();
  }

  if (m_maps_status.are_all_maps_available()) {
    dout(15) << __func__ << " all repl-maps available" << dendl;
    m_osds->queue_scrub_got_repl_maps(m_pg, m_pg->is_scrub_blocking_ops());
  }
}

void PgScrubber::handle_scrub_reserve_request(OpRequestRef op)
{
  dout(10) << __func__ << " " << *op->get_req() << dendl;
  op->mark_started();
  auto request_ep = op->get_req<MOSDScrubReserve>()->get_map_epoch();

  /*
   * if we are currently holding a reservation, then:
   * either (1) we, the scrubber, did not yet notice an interval change. The
   *  remembered reservation epoch is from before our interval, and we can silently
   *  discard the reservation (no message is required).
   * or:
   * (2) the interval hasn't changed, but the same Primary that (we think) holds the
   *  lock just sent us a new request. Note that we know it's the same Primary, as
   *  otherwise the interval would have changed.
   *  Ostensibly we can discard & redo the reservation. But then we
   *  will be temporarily releasing the OSD resource - and might not be able to
   *  grab it again. Thus, we simply treat this as a successful new request
   *  (but mark the fact that if there is a previous request from the primary to
   *  scrub a specific chunk - that request is now defunct).
   */

  if (m_remote_osd_resource.has_value() && m_remote_osd_resource->is_stale()) {
    // we are holding a stale reservation from a past epoch
    m_remote_osd_resource.reset();
    dout(10) << __func__ << " cleared existing stale reservation" << dendl;
  }

  if (request_ep < m_pg->get_same_interval_since()) {
    // will not ack stale requests
    return;
  }

  bool granted{false};
  if (m_remote_osd_resource.has_value()) {

    dout(10) << __func__ << " already reserved." << dendl;

    /*
     * it might well be that we did not yet finish handling the latest scrub-op
     * from our primary. This happens, for example, if 'noscrub' was set via a
     * command, then reset. The primary in this scenario will remain in the same
     * interval, but we do need to reset our internal state (otherwise - the first
     * renewed 'give me your scrub map' from the primary will see us in active
     * state, crashing the OSD).
     */
    advance_token();
    granted = true;

  } else if (m_pg->cct->_conf->osd_scrub_during_recovery ||
             !m_osds->is_recovery_active()) {
    m_remote_osd_resource.emplace(this, m_pg, m_osds, request_ep);
    // OSD resources allocated?
    granted = m_remote_osd_resource->is_reserved();
    if (!granted) {
      // just forget it
      m_remote_osd_resource.reset();
      dout(20) << __func__ << ": failed to reserve remotely" << dendl;
    }
  }

  dout(10) << __func__ << " reserved? " << (granted ? "yes" : "no") << dendl;

  Message* reply = new MOSDScrubReserve(
    spg_t(m_pg->info.pgid.pgid, m_pg->get_primary().shard), request_ep,
    granted ? MOSDScrubReserve::GRANT : MOSDScrubReserve::REJECT, m_pg_whoami);

  m_osds->send_message_osd_cluster(reply, op->get_req()->get_connection());
}

void PgScrubber::handle_scrub_reserve_grant(OpRequestRef op, pg_shard_t from)
{
  dout(10) << __func__ << " " << *op->get_req() << dendl;
  op->mark_started();

  if (m_reservations.has_value()) {
    m_reservations->handle_reserve_grant(op, from);
  } else {
    dout(20) << __func__ << ": late/unsolicited reservation grant from osd "
             << from << " (" << op << ")" << dendl;
  }
}

void PgScrubber::handle_scrub_reserve_reject(OpRequestRef op, pg_shard_t from)
{
  dout(10) << __func__ << " " << *op->get_req() << dendl;
  op->mark_started();

  if (m_reservations.has_value()) {
    // there is an active reservation process. No action is required otherwise.
    m_reservations->handle_reserve_reject(op, from);
  }
}

void PgScrubber::handle_scrub_reserve_release(OpRequestRef op)
{
  dout(10) << __func__ << " " << *op->get_req() << dendl;
  op->mark_started();

  /*
   * this specific scrub session has terminated. All incoming events carrying the
   * old tag will be discarded.
   */
  advance_token();
  m_remote_osd_resource.reset();
}

void PgScrubber::discard_replica_reservations()
{
  dout(10) << __func__ << dendl;
  if (m_reservations.has_value()) {
    m_reservations->discard_all();
  }
}

void PgScrubber::clear_scrub_reservations()
{
  dout(10) << __func__ << dendl;
  m_reservations.reset();         // the remote reservations
  m_local_osd_resource.reset();   // the local reservation
  m_remote_osd_resource.reset();  // we as replica reserved for a Primary
}

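// send a reservation-related opcode to all acting-set members (except for us)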
1665 void PgScrubber::message_all_replicas(int32_t opcode, std::string_view op_text)
1666 {
1667 ceph_assert(m_pg->recovery_state.get_backfill_targets().empty());
1668
1669 std::vector<pair<int, Message*>> messages;
1670 messages.reserve(m_pg->get_actingset().size());
1671
1672 epoch_t epch = get_osdmap_epoch();
1673
1674 for (auto& p : m_pg->get_actingset()) {
1675
1676 if (p == m_pg_whoami)
1677 continue;
1678
1679 dout(10) << "scrub requesting " << op_text << " from osd." << p << " Epoch: " << epch
1680 << dendl;
1681 Message* m = new MOSDScrubReserve(spg_t(m_pg->info.pgid.pgid, p.shard), epch, opcode,
1682 m_pg_whoami);
1683 messages.push_back(std::make_pair(p.osd, m));
1684 }
1685
1686 if (!messages.empty()) {
1687 m_osds->send_message_osd_cluster(messages, epch);
1688 }
1689 }
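
// The function above uses a collect-then-send pattern: build one (osd, message)
// pair per acting-set member other than ourselves, then hand the whole batch to
// the messenger under a single epoch. A minimal sketch of that pattern, with
// hypothetical stand-in types (illustration only, never built):
#if 0
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

int main()
{
  const int whoami = 1;
  const std::set<int> acting{1, 2, 3};  // hypothetical acting set

  std::vector<std::pair<int, std::string>> messages;
  messages.reserve(acting.size());

  for (int peer : acting) {
    if (peer == whoami)
      continue;  // never message ourselves
    messages.emplace_back(peer, "RESERVE_REQUEST");
  }

  // one batched hand-off keeps every request under the same epoch
  for (const auto& [osd, m] : messages)
    std::cout << "-> osd." << osd << ": " << m << '\n';
}
#endif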
1690
1691 void PgScrubber::unreserve_replicas()
1692 {
1693 dout(10) << __func__ << dendl;
1694 m_reservations.reset();
1695 }
1696
1697 void PgScrubber::set_reserving_now()
1698 {
1699 m_osds->get_scrub_services().set_reserving_now();
1700 }
1701
1702 void PgScrubber::clear_reserving_now()
1703 {
1704 m_osds->get_scrub_services().clear_reserving_now();
1705 }
1706
1707 void PgScrubber::set_queued_or_active()
1708 {
1709 m_queued_or_active = true;
1710 }
1711
1712 void PgScrubber::clear_queued_or_active()
1713 {
1714 if (m_queued_or_active) {
1715 m_queued_or_active = false;
1716 // and just in case snap trimming was blocked by the aborted scrub
1717 m_pg->snap_trimmer_scrub_complete();
1718 }
1719 }
1720
1721 bool PgScrubber::is_queued_or_active() const
1722 {
1723 return m_queued_or_active;
1724 }
1725
1726 [[nodiscard]] bool PgScrubber::scrub_process_inconsistent()
1727 {
1728 dout(10) << __func__ << ": checking authoritative (mode="
1729 << m_mode_desc << ", auth remaining #: " << m_authoritative.size()
1730 << ")" << dendl;
1731
1732 // the authoritative list only stores objects that are missing or inconsistent.
1733 if (!m_authoritative.empty()) {
1734
1735 stringstream ss;
1736 ss << m_pg->info.pgid << " " << m_mode_desc << " " << m_missing.size() << " missing, "
1737 << m_inconsistent.size() << " inconsistent objects";
1738 dout(2) << ss.str() << dendl;
1739 m_osds->clog->error(ss);
1740
1741 if (m_is_repair) {
1742 state_clear(PG_STATE_CLEAN);
1743 // we know we have a problem, so it's OK to set the user-visible flag
1744 // even if we only reached here via auto-repair
1745 state_set(PG_STATE_REPAIR);
1746 update_op_mode_text();
1747
1748 for (const auto& [hobj, shrd_list] : m_authoritative) {
1749
1750 auto missing_entry = m_missing.find(hobj);
1751
1752 if (missing_entry != m_missing.end()) {
1753 m_pg->repair_object(hobj, shrd_list, missing_entry->second);
1754 m_fixed_count += missing_entry->second.size();
1755 }
1756
1757 if (m_inconsistent.count(hobj)) {
1758 m_pg->repair_object(hobj, shrd_list, m_inconsistent[hobj]);
1759 m_fixed_count += m_inconsistent[hobj].size();
1760 }
1761 }
1762 }
1763 }
1764 return (!m_authoritative.empty() && m_is_repair);
1765 }
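
// The repair loop above tallies, per authoritative object, the number of
// missing and of inconsistent shard entries into the fixed-object counter. A
// worked standalone sketch of that tally, using hypothetical container types
// (illustration only, never built):
#if 0
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main()
{
  using ShardList = std::vector<int>;  // hypothetical stand-in for shard info
  const std::map<std::string, ShardList> authoritative{{"objA", {0}},
                                                       {"objB", {1}}};
  const std::map<std::string, ShardList> missing{{"objA", {1, 2}}};
  const std::map<std::string, ShardList> inconsistent{{"objB", {0}}};

  std::size_t fixed = 0;
  for (const auto& [obj, auth_shards] : authoritative) {
    (void)auth_shards;  // would be passed to the repair call
    if (auto it = missing.find(obj); it != missing.end())
      fixed += it->second.size();  // repair from the authoritative copy
    if (auto it = inconsistent.find(obj); it != inconsistent.end())
      fixed += it->second.size();
  }
  std::cout << "fixed: " << fixed << '\n';  // fixed: 3
}
#endif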
1766
1767 /*
1768 * note: only called for the Primary.
1769 */
1770 void PgScrubber::scrub_finish()
1771 {
1772 dout(10) << __func__ << " before flags: " << m_flags
1773 << ". repair state: " << (state_test(PG_STATE_REPAIR) ? "repair" : "no-repair")
1774 << ". deep_scrub_on_error: " << m_flags.deep_scrub_on_error << dendl;
1775
1776 ceph_assert(m_pg->is_locked());
1777 ceph_assert(is_queued_or_active());
1778
1779 m_pg->m_planned_scrub = requested_scrub_t{};
1780
1781 // if the repair request came from auto-repair and the number of errors
1782 // exceeds the configured limit, we cancel the auto-repair
1783 if (m_is_repair && m_flags.auto_repair &&
1784 m_authoritative.size() > m_pg->cct->_conf->osd_scrub_auto_repair_num_errors) {
1785
1786 dout(10) << __func__ << " undoing the repair" << dendl;
1787 state_clear(PG_STATE_REPAIR); // not expected to be set, anyway
1788 m_is_repair = false;
1789 update_op_mode_text();
1790 }
1791
1792 bool do_auto_scrub = false;
1793
1794 // if a regular scrub had errors within the limit, do a deep scrub to auto repair
1795 if (m_flags.deep_scrub_on_error && !m_authoritative.empty() &&
1796 m_authoritative.size() <= m_pg->cct->_conf->osd_scrub_auto_repair_num_errors) {
1797 ceph_assert(!m_is_deep);
1798 do_auto_scrub = true;
1799 dout(15) << __func__ << " Try to auto repair after scrub errors" << dendl;
1800 }
1801
1802 m_flags.deep_scrub_on_error = false;
1803
1804 // type-specific finish (can tally more errors)
1805 _scrub_finish();
1806
1807 bool has_error = scrub_process_inconsistent();
1808
1809 {
1810 stringstream oss;
1811 oss << m_pg->info.pgid.pgid << " " << m_mode_desc << " ";
1812 int total_errors = m_shallow_errors + m_deep_errors;
1813 if (total_errors)
1814 oss << total_errors << " errors";
1815 else
1816 oss << "ok";
1817 if (!m_is_deep && m_pg->info.stats.stats.sum.num_deep_scrub_errors)
1818 oss << " ( " << m_pg->info.stats.stats.sum.num_deep_scrub_errors
1819 << " remaining deep scrub error details lost)";
1820 if (m_is_repair)
1821 oss << ", " << m_fixed_count << " fixed";
1822 if (total_errors)
1823 m_osds->clog->error(oss);
1824 else
1825 m_osds->clog->debug(oss);
1826 }
1827
1828 // Since we don't know which errors were fixed, we can only clear them
1829 // when every one has been fixed.
1830 if (m_is_repair) {
1831 if (m_fixed_count == m_shallow_errors + m_deep_errors) {
1832
1833 ceph_assert(m_is_deep);
1834 m_shallow_errors = 0;
1835 m_deep_errors = 0;
1836 dout(20) << __func__ << " All may be fixed" << dendl;
1837
1838 } else if (has_error) {
1839
1840 // Deep scrub in order to get corrected error counts
1841 m_pg->scrub_after_recovery = true;
1842 m_pg->m_planned_scrub.req_scrub =
1843 m_pg->m_planned_scrub.req_scrub || m_flags.required;
1844
1845 dout(20) << __func__ << " Current 'required': " << m_flags.required
1846 << " Planned 'req_scrub': " << m_pg->m_planned_scrub.req_scrub << dendl;
1847
1848 } else if (m_shallow_errors || m_deep_errors) {
1849
1850 // We have errors but nothing can be fixed, so there is no repair
1851 // possible.
1852 state_set(PG_STATE_FAILED_REPAIR);
1853 dout(10) << __func__ << " " << (m_shallow_errors + m_deep_errors)
1854 << " error(s) present with no repair possible" << dendl;
1855 }
1856 }
1857
1858 {
1859 // finish up
1860 ObjectStore::Transaction t;
1861 m_pg->recovery_state.update_stats(
1862 [this](auto& history, auto& stats) {
1863 dout(10) << "m_pg->recovery_state.update_stats()" << dendl;
1864 utime_t now = ceph_clock_now();
1865 history.last_scrub = m_pg->recovery_state.get_info().last_update;
1866 history.last_scrub_stamp = now;
1867 if (m_is_deep) {
1868 history.last_deep_scrub = m_pg->recovery_state.get_info().last_update;
1869 history.last_deep_scrub_stamp = now;
1870 }
1871
1872 if (m_is_deep) {
1873 if ((m_shallow_errors == 0) && (m_deep_errors == 0))
1874 history.last_clean_scrub_stamp = now;
1875 stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors;
1876 stats.stats.sum.num_deep_scrub_errors = m_deep_errors;
1877 stats.stats.sum.num_large_omap_objects = m_omap_stats.large_omap_objects;
1878 stats.stats.sum.num_omap_bytes = m_omap_stats.omap_bytes;
1879 stats.stats.sum.num_omap_keys = m_omap_stats.omap_keys;
1880 dout(25) << "scrub_finish shard " << m_pg_whoami
1881 << " num_omap_bytes = " << stats.stats.sum.num_omap_bytes
1882 << " num_omap_keys = " << stats.stats.sum.num_omap_keys << dendl;
1883 } else {
1884 stats.stats.sum.num_shallow_scrub_errors = m_shallow_errors;
1885 // XXX: a fresh last_clean_scrub_stamp does not guarantee that the pg is free
1886 // of inconsistencies caused by deep-scrub errors
1887 if (m_shallow_errors == 0)
1888 history.last_clean_scrub_stamp = now;
1889 }
1890 stats.stats.sum.num_scrub_errors = stats.stats.sum.num_shallow_scrub_errors +
1891 stats.stats.sum.num_deep_scrub_errors;
1892 if (m_flags.check_repair) {
1893 m_flags.check_repair = false;
1894 if (m_pg->info.stats.stats.sum.num_scrub_errors) {
1895 state_set(PG_STATE_FAILED_REPAIR);
1896 dout(10) << "scrub_finish " << m_pg->info.stats.stats.sum.num_scrub_errors
1897 << " error(s) still present after re-scrub" << dendl;
1898 }
1899 }
1900 return true;
1901 },
1902 &t);
1903 int tr = m_osds->store->queue_transaction(m_pg->ch, std::move(t), nullptr);
1904 ceph_assert(tr == 0);
1905 }
1906
1907 if (has_error) {
1908 m_pg->queue_peering_event(PGPeeringEventRef(std::make_shared<PGPeeringEvent>(
1909 get_osdmap_epoch(), get_osdmap_epoch(), PeeringState::DoRecovery())));
1910 } else {
1911 m_is_repair = false;
1912 state_clear(PG_STATE_REPAIR);
1913 update_op_mode_text();
1914 }
1915
1916 cleanup_on_finish();
1917 if (do_auto_scrub) {
1918 request_rescrubbing(m_pg->m_planned_scrub);
1919 }
1920
1921 if (m_pg->is_active() && m_pg->is_primary()) {
1922 m_pg->recovery_state.share_pg_info();
1923 }
1924 }
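
// The repair bookkeeping in scrub_finish() boils down to three outcomes: all
// counted errors fixed (counters may be zeroed), some objects repaired
// (schedule a post-recovery deep scrub to re-count), or errors left with
// nothing repairable (failed repair). A heavily condensed sketch, with
// hypothetical names (illustration only, never built):
#if 0
#include <iostream>

enum class Outcome { AllFixed, RescrubNeeded, FailedRepair };

// 'objects_repaired' stands in for has_error above: some authoritative
// objects were actually repaired during this run.
Outcome repair_outcome(int fixed, int shallow, int deep, bool objects_repaired)
{
  if (fixed == shallow + deep)
    return Outcome::AllFixed;       // every counted error was fixed
  if (objects_repaired)
    return Outcome::RescrubNeeded;  // re-count via a post-recovery deep scrub
  return Outcome::FailedRepair;     // errors remain, nothing was repairable
}

int main()
{
  std::cout << int(repair_outcome(3, 2, 1, true)) << '\n';   // 0: all fixed
  std::cout << int(repair_outcome(1, 2, 1, true)) << '\n';   // 1: rescrub
  std::cout << int(repair_outcome(0, 2, 0, false)) << '\n';  // 2: failed repair
}
#endif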
1925
1926 void PgScrubber::on_digest_updates()
1927 {
1928 dout(10) << __func__ << " #pending: " << num_digest_updates_pending
1929 << (m_end.is_max() ? " <last chunk>" : " <mid chunk>")
1930 << (is_queued_or_active() ? "" : " ** not marked as scrubbing **")
1931 << dendl;
1932
1933 if (num_digest_updates_pending > 0) {
1934 // do nothing for now. We will be called again when new updates arrive
1935 return;
1936 }
1937
1938 // got all updates, and finished with this chunk. Any more?
1939 if (m_end.is_max()) {
1940 m_osds->queue_scrub_is_finished(m_pg);
1941 } else {
1942 // go get a new chunk (via "requeue")
1943 preemption_data.reset();
1944 m_osds->queue_scrub_next_chunk(m_pg, m_pg->is_scrub_blocking_ops());
1945 }
1946 }
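
// on_digest_updates() is the tail of a chunked pipeline: wait until every
// pending update is acknowledged, then either finish (past the last chunk) or
// requeue for the next chunk. A minimal sketch of that control flow
// (illustration only, never built):
#if 0
#include <iostream>

void on_updates(int pending, bool at_end)
{
  if (pending > 0) {
    std::cout << "waiting for " << pending << " update(s)\n";
    return;  // we will be called again as acks arrive
  }
  if (at_end)
    std::cout << "scrub finished\n";
  else
    std::cout << "requeue for next chunk\n";
}

int main()
{
  on_updates(2, false);  // waiting
  on_updates(0, false);  // requeue
  on_updates(0, true);   // finished
}
#endif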
1947
1948 /*
1949 * note that the flags-set fetched from the PG (m_pg->m_planned_scrub)
1950 * is cleared once scrubbing starts; some of the values dumped here are
1951 * thus transitory.
1952 */
1953 void PgScrubber::dump_scrubber(ceph::Formatter* f,
1954 const requested_scrub_t& request_flags) const
1955 {
1956 f->open_object_section("scrubber");
1957
1958 if (m_active) { // TBD replace with PR#42780's test
1959 f->dump_bool("active", true);
1960 dump_active_scrubber(f, state_test(PG_STATE_DEEP_SCRUB));
1961 } else {
1962 f->dump_bool("active", false);
1963 f->dump_bool("must_scrub",
1964 (m_pg->m_planned_scrub.must_scrub || m_flags.required));
1965 f->dump_bool("must_deep_scrub", request_flags.must_deep_scrub);
1966 f->dump_bool("must_repair", request_flags.must_repair);
1967 f->dump_bool("need_auto", request_flags.need_auto);
1968
1969 f->dump_stream("scrub_reg_stamp") << m_scrub_job->get_sched_time();
1970
1971 // note that we are repeating logic that is coded elsewhere (currently PG.cc).
1972 // This is not optimal.
1973 bool deep_expected = (ceph_clock_now() >= m_pg->next_deepscrub_interval()) ||
1974 request_flags.must_deep_scrub || request_flags.need_auto;
1975 auto sched_state =
1976 m_scrub_job->scheduling_state(ceph_clock_now(), deep_expected);
1977 f->dump_string("schedule", sched_state);
1978 }
1979
1980 if (m_publish_sessions) {
1981 f->dump_int("test_sequence",
1982 m_sessions_counter); // an ever-increasing number used by tests
1983 }
1984
1985 f->close_section();
1986 }
1987
1988 void PgScrubber::dump_active_scrubber(ceph::Formatter* f, bool is_deep) const
1989 {
1990 f->dump_stream("epoch_start") << m_interval_start;
1991 f->dump_stream("start") << m_start;
1992 f->dump_stream("end") << m_end;
1993 f->dump_stream("max_end") << m_max_end;
1994 f->dump_stream("subset_last_update") << m_subset_last_update;
1995 // note that m_is_deep will be set some time after PG_STATE_DEEP_SCRUB is
1996 // asserted. Thus, we use the latter here.
1997 f->dump_bool("deep", is_deep);
1998
1999 // dump the scrub-type flags
2000 f->dump_bool("req_scrub", m_flags.required);
2001 f->dump_bool("auto_repair", m_flags.auto_repair);
2002 f->dump_bool("check_repair", m_flags.check_repair);
2003 f->dump_bool("deep_scrub_on_error", m_flags.deep_scrub_on_error);
2004 f->dump_unsigned("priority", m_flags.priority);
2005
2006 f->dump_int("shallow_errors", m_shallow_errors);
2007 f->dump_int("deep_errors", m_deep_errors);
2008 f->dump_int("fixed", m_fixed_count);
2009 {
2010 f->open_array_section("waiting_on_whom");
2011 for (const auto& p : m_maps_status.get_awaited()) {
2012 f->dump_stream("shard") << p;
2013 }
2014 f->close_section();
2015 }
2016 f->dump_string("schedule", "scrubbing");
2017 }
2018
2019 pg_scrubbing_status_t PgScrubber::get_schedule() const
2020 {
2021 dout(25) << __func__ << dendl;
2022
2023 if (!m_scrub_job) {
2024 return pg_scrubbing_status_t{};
2025 }
2026
2027 auto now_is = ceph_clock_now();
2028
2029 if (m_active) {
2030 // report current scrub info, including updated duration
2031 int32_t duration = (utime_t{now_is} - scrub_begin_stamp).sec();
2032
2033 return pg_scrubbing_status_t{
2034 utime_t{},
2035 duration,
2036 pg_scrub_sched_status_t::active,
2037 true, // active
2038 (m_is_deep ? scrub_level_t::deep : scrub_level_t::shallow),
2039 false /* is periodic? unknown, actually */};
2040 }
2041 if (m_scrub_job->state != ScrubQueue::qu_state_t::registered) {
2042 return pg_scrubbing_status_t{utime_t{},
2043 0,
2044 pg_scrub_sched_status_t::not_queued,
2045 false,
2046 scrub_level_t::shallow,
2047 false};
2048 }
2049
2050 // Will the next scrub surely be a deep one? Note that deep-scrub might be
2051 // selected even if we report a regular scrub here.
2052 bool deep_expected = (now_is >= m_pg->next_deepscrub_interval()) ||
2053 m_pg->m_planned_scrub.must_deep_scrub ||
2054 m_pg->m_planned_scrub.need_auto;
2055 scrub_level_t expected_level =
2056 deep_expected ? scrub_level_t::deep : scrub_level_t::shallow;
2057 bool periodic = !m_pg->m_planned_scrub.must_scrub &&
2058 !m_pg->m_planned_scrub.need_auto &&
2059 !m_pg->m_planned_scrub.must_deep_scrub;
2060
2061 // are we ripe for scrubbing?
2062 if (now_is > m_scrub_job->schedule.scheduled_at) {
2063 // we are waiting for our turn at the OSD.
2064 return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at,
2065 0,
2066 pg_scrub_sched_status_t::queued,
2067 false,
2068 expected_level,
2069 periodic};
2070 }
2071
2072 return pg_scrubbing_status_t{m_scrub_job->schedule.scheduled_at,
2073 0,
2074 pg_scrub_sched_status_t::scheduled,
2075 false,
2076 expected_level,
2077 periodic};
2078 }
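
// get_schedule() derives the reported scrub level and the 'periodic' flag from
// the planned-scrub flags and the deep-scrub deadline: a deep scrub is expected
// once the deadline has passed or when explicitly requested. A standalone
// sketch of that derivation, with hypothetical names (illustration only, never
// built):
#if 0
#include <iostream>

struct Planned {
  bool must_scrub, must_deep_scrub, need_auto;  // hypothetical flag set
};

int main()
{
  const double now = 100.0;
  const double next_deep_due = 90.0;  // deep-scrub deadline already passed
  const Planned p{false, false, false};

  const bool deep_expected =
      (now >= next_deep_due) || p.must_deep_scrub || p.need_auto;
  const bool periodic =
      !p.must_scrub && !p.need_auto && !p.must_deep_scrub;

  std::cout << "deep expected: " << deep_expected   // 1: deadline passed
            << ", periodic: " << periodic << '\n';  // 1: nothing was forced
}
#endif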
2079
2080 void PgScrubber::handle_query_state(ceph::Formatter* f)
2081 {
2082 dout(15) << __func__ << dendl;
2083
2084 f->open_object_section("scrub");
2085 f->dump_stream("scrubber.epoch_start") << m_interval_start;
2086 f->dump_bool("scrubber.active", m_active);
2087 f->dump_stream("scrubber.start") << m_start;
2088 f->dump_stream("scrubber.end") << m_end;
2089 f->dump_stream("scrubber.max_end") << m_max_end;
2090 f->dump_stream("scrubber.subset_last_update") << m_subset_last_update;
2091 f->dump_bool("scrubber.deep", m_is_deep);
2092 {
2093 f->open_array_section("scrubber.waiting_on_whom");
2094 for (const auto& p : m_maps_status.get_awaited()) {
2095 f->dump_stream("shard") << p;
2096 }
2097 f->close_section();
2098 }
2099
2100 f->dump_string("comment", "DEPRECATED - may be removed in the next release");
2101
2102 f->close_section();
2103 }
2104
2105 PgScrubber::~PgScrubber()
2106 {
2107 if (m_scrub_job) {
2108 // make sure the OSD won't try to scrub this one just now
2109 rm_from_osd_scrubbing();
2110 m_scrub_job.reset();
2111 }
2112 }
2113
2114 PgScrubber::PgScrubber(PG* pg)
2115 : m_pg{pg}
2116 , m_pg_id{pg->pg_id}
2117 , m_osds{m_pg->osd}
2118 , m_pg_whoami{pg->pg_whoami}
2119 , preemption_data{pg}
2120 {
2121 m_fsm = std::make_unique<ScrubMachine>(m_pg, this);
2122 m_fsm->initiate();
2123
2124 m_scrub_job = ceph::make_ref<ScrubQueue::ScrubJob>(m_osds->cct, m_pg->pg_id,
2125 m_osds->get_nodeid());
2126 }
2127
2128 void PgScrubber::set_scrub_begin_time()
2129 {
2130 scrub_begin_stamp = ceph_clock_now();
2131 m_osds->clog->debug() << fmt::format(
2132 "{} {} starts",
2133 m_pg->info.pgid.pgid,
2134 m_mode_desc);
2135 }
2136
2137 void PgScrubber::set_scrub_duration()
2138 {
2139 utime_t stamp = ceph_clock_now();
2140 utime_t duration = stamp - scrub_begin_stamp;
2141 m_pg->recovery_state.update_stats([=](auto& history, auto& stats) {
2142 stats.last_scrub_duration = ceill(duration.to_msec()/1000.0);
2143 stats.scrub_duration = double(duration);
2144 return true;
2145 });
2146 }
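
// set_scrub_duration() stores the elapsed time twice: as whole seconds rounded
// up (ceil of milliseconds / 1000) and as an exact floating-point duration.
// For instance, 2350 ms yields 3 and 2.35 respectively. A standalone check of
// that arithmetic (illustration only, never built):
#if 0
#include <cmath>
#include <iostream>

int main()
{
  const long long msec = 2350;  // elapsed 2.35 s
  const long rounded = std::lround(std::ceil(msec / 1000.0));
  const double exact = msec / 1000.0;
  std::cout << rounded << " s (rounded up), " << exact << " s (exact)\n";
  // prints: 3 s (rounded up), 2.35 s (exact)
}
#endif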
2147
2148 void PgScrubber::reserve_replicas()
2149 {
2150 dout(10) << __func__ << dendl;
2151 m_reservations.emplace(m_pg, m_pg_whoami, m_scrub_job);
2152 }
2153
2154 void PgScrubber::cleanup_on_finish()
2155 {
2156 dout(10) << __func__ << dendl;
2157 ceph_assert(m_pg->is_locked());
2158
2159 state_clear(PG_STATE_SCRUBBING);
2160 state_clear(PG_STATE_DEEP_SCRUB);
2161 m_pg->publish_stats_to_osd();
2162
2163 clear_scrub_reservations();
2164 m_pg->publish_stats_to_osd();
2165
2166 requeue_waiting();
2167
2168 reset_internal_state();
2169 m_pg->publish_stats_to_osd();
2170 m_flags = scrub_flags_t{};
2171
2172 // type-specific state clear
2173 _scrub_clear_state();
2174 }
2175
2176 // uses process_event(), so must be invoked externally
2177 void PgScrubber::scrub_clear_state()
2178 {
2179 dout(10) << __func__ << dendl;
2180
2181 clear_pgscrub_state();
2182 m_fsm->process_event(FullReset{});
2183 }
2184
2185 /*
2186 * note: does not access the state-machine
2187 */
2188 void PgScrubber::clear_pgscrub_state()
2189 {
2190 dout(10) << __func__ << dendl;
2191 ceph_assert(m_pg->is_locked());
2192
2193 state_clear(PG_STATE_SCRUBBING);
2194 state_clear(PG_STATE_DEEP_SCRUB);
2195
2196 state_clear(PG_STATE_REPAIR);
2197
2198 clear_scrub_reservations();
2199 m_pg->publish_stats_to_osd();
2200
2201 requeue_waiting();
2202
2203 reset_internal_state();
2204 m_flags = scrub_flags_t{};
2205
2206 // type-specific state clear
2207 _scrub_clear_state();
2208 m_pg->publish_stats_to_osd();
2209 }
2210
2211 void PgScrubber::replica_handling_done()
2212 {
2213 dout(10) << __func__ << dendl;
2214
2215 state_clear(PG_STATE_SCRUBBING);
2216 state_clear(PG_STATE_DEEP_SCRUB);
2217
2218 reset_internal_state();
2219 }
2220
2221 /*
2222 * note: performs run_callbacks()
2223 * note: reservations-related variables are not reset here
2224 */
2225 void PgScrubber::reset_internal_state()
2226 {
2227 dout(10) << __func__ << dendl;
2228
2229 preemption_data.reset();
2230 m_maps_status.reset();
2231 m_received_maps.clear();
2232
2233 m_start = hobject_t{};
2234 m_end = hobject_t{};
2235 m_max_end = hobject_t{};
2236 m_subset_last_update = eversion_t{};
2237 m_shallow_errors = 0;
2238 m_deep_errors = 0;
2239 m_fixed_count = 0;
2240 m_omap_stats = (const struct omap_stat_t){0};
2241
2242 run_callbacks();
2243
2244 m_inconsistent.clear();
2245 m_missing.clear();
2246 m_authoritative.clear();
2247 num_digest_updates_pending = 0;
2248 m_primary_scrubmap = ScrubMap{};
2249 m_primary_scrubmap_pos.reset();
2250 replica_scrubmap = ScrubMap{};
2251 replica_scrubmap_pos.reset();
2252 m_cleaned_meta_map = ScrubMap{};
2253 m_needs_sleep = true;
2254 m_sleep_started_at = utime_t{};
2255
2256 m_active = false;
2257 clear_queued_or_active();
2258 ++m_sessions_counter;
2259 }
2260
2261 // note: applicable to the Replica only:
2262 void PgScrubber::advance_token()
2263 {
2264 dout(10) << __func__ << " was: " << m_current_token << dendl;
2265 m_current_token++;
2266
2267 // advance_token() is called on the assumption that no scrubbing is in progress.
2268 // We verify that anyway: if we are in fact still handling a stale request,
2269 // both our internal state and the FSM state are cleared.
2270 replica_handling_done();
2271 m_fsm->process_event(FullReset{});
2272 }
2273
2274 bool PgScrubber::is_token_current(Scrub::act_token_t received_token)
2275 {
2276 if (received_token == 0 || received_token == m_current_token) {
2277 return true;
2278 }
2279 dout(5) << __func__ << " obsolete token (" << received_token
2280 << " vs current " << m_current_token << dendl;
2281
2282 return false;
2283 }
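
// The token pair above invalidates in-flight replica events: every reset bumps
// the counter, and an event is acted upon only if it carries the current token
// (0 being accepted as a wildcard). A standalone sketch, with hypothetical
// names (illustration only, never built):
#if 0
#include <cstdint>
#include <iostream>

struct Replica {
  std::uint64_t current{1};
  void advance() { ++current; }  // invalidates older in-flight events
  bool is_current(std::uint64_t tok) const {
    return tok == 0 || tok == current;  // 0 acts as a wildcard
  }
};

int main()
{
  Replica r;
  const auto stale = r.current;  // an event stamped before the reset
  r.advance();
  std::cout << r.is_current(stale)       // 0: discarded as obsolete
            << r.is_current(r.current)   // 1: current
            << r.is_current(0) << '\n';  // 1: wildcard
}
#endif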
2284
2285 const OSDMapRef& PgScrubber::get_osdmap() const
2286 {
2287 return m_pg->get_osdmap();
2288 }
2289
2290 ostream& operator<<(ostream& out, const PgScrubber& scrubber)
2291 {
2292 return out << scrubber.m_flags;
2293 }
2294
2295 std::ostream& PgScrubber::gen_prefix(std::ostream& out) const
2296 {
2297 const auto fsm_state = m_fsm ? m_fsm->current_states_desc() : "- :";
2298 if (m_pg) {
2299 return m_pg->gen_prefix(out) << "scrubber " << fsm_state << ": ";
2300 } else {
2301 return out << " scrubber [~] " << fsm_state << ": ";
2302 }
2303 }
2304
2305 void PgScrubber::log_cluster_warning(const std::string& warning) const
2306 {
2307 m_osds->clog->do_log(CLOG_WARN, warning);
2308 }
2309
2310 ostream& PgScrubber::show(ostream& out) const
2311 {
2312 return out << " [ " << m_pg_id << ": " << m_flags << " ] ";
2313 }
2314
2315 int PgScrubber::asok_debug(std::string_view cmd,
2316 std::string param,
2317 Formatter* f,
2318 stringstream& ss)
2319 {
2320 dout(10) << __func__ << " cmd: " << cmd << " param: " << param << dendl;
2321
2322 if (cmd == "block") {
2323 // set a flag that will cause the next 'select_range' to report a blocked object
2324 m_debug_blockrange = 1;
2325
2326 } else if (cmd == "unblock") {
2327 // send an 'unblock' event, as if a blocked range was freed
2328 m_debug_blockrange = 0;
2329 m_fsm->process_event(Unblocked{});
2330
2331 } else if ((cmd == "set") || (cmd == "unset")) {
2332
2333 if (param == "sessions") {
2334 // set/reset the inclusion of the scrub sessions counter in 'query' output
2335 m_publish_sessions = (cmd == "set");
2336
2337 } else if (param == "block") {
2338 if (cmd == "set") {
2339 // set a flag that will cause the next 'select_range' to report a blocked object
2340 m_debug_blockrange = 1;
2341 } else {
2342 // send an 'unblock' event, as if a blocked range was freed
2343 m_debug_blockrange = 0;
2344 m_fsm->process_event(Unblocked{});
2345 }
2346 }
2347 }
2348
2349 return 0;
2350 }
2351 // ///////////////////// preemption_data_t //////////////////////////////////
2352
2353 PgScrubber::preemption_data_t::preemption_data_t(PG* pg) : m_pg{pg}
2354 {
2355 m_left = static_cast<int>(
2356 m_pg->get_cct()->_conf.get_val<uint64_t>("osd_scrub_max_preemptions"));
2357 }
2358
2359 void PgScrubber::preemption_data_t::reset()
2360 {
2361 std::lock_guard<std::mutex> lk{m_preemption_lock};
2362
2363 m_preemptable = false;
2364 m_preempted = false;
2365 m_left =
2366 static_cast<int>(m_pg->cct->_conf.get_val<uint64_t>("osd_scrub_max_preemptions"));
2367 m_size_divisor = 1;
2368 }
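
// preemption_data_t keeps a mutex-guarded preemption budget that reset()
// refills from configuration; once the budget is spent, further preemption
// attempts are refused. A standalone sketch of that pattern, with hypothetical
// names (illustration only, never built):
#if 0
#include <iostream>
#include <mutex>

class PreemptionBudget {
  std::mutex m_lock;
  int m_left{0};

 public:
  void reset(int configured_max) {
    std::lock_guard lk{m_lock};
    m_left = configured_max;  // re-read from configuration on every reset
  }
  bool try_preempt() {  // true if a preemption is still allowed
    std::lock_guard lk{m_lock};
    if (m_left <= 0)
      return false;
    --m_left;
    return true;
  }
};

int main()
{
  PreemptionBudget b;
  b.reset(2);
  std::cout << b.try_preempt() << b.try_preempt() << b.try_preempt() << '\n';
  // prints 110: the third attempt exceeds the budget
}
#endif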
2369
2370
2371 // ///////////////////// ReplicaReservations //////////////////////////////////
2372 namespace Scrub {
2373
2374 void ReplicaReservations::release_replica(pg_shard_t peer, epoch_t epoch)
2375 {
2376 auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, peer.shard), epoch,
2377 MOSDScrubReserve::RELEASE, m_pg->pg_whoami);
2378 m_osds->send_message_osd_cluster(peer.osd, m, epoch);
2379 }
2380
2381 ReplicaReservations::ReplicaReservations(PG* pg, pg_shard_t whoami, ScrubQueue::ScrubJobRef scrubjob)
2382 : m_pg{pg}
2383 , m_acting_set{pg->get_actingset()}
2384 , m_osds{m_pg->get_pg_osd(ScrubberPasskey())}
2385 , m_pending{static_cast<int>(m_acting_set.size()) - 1}
2386 , m_pg_info{m_pg->get_pg_info(ScrubberPasskey())}
2387 , m_scrub_job{scrubjob}
2388 {
2389 epoch_t epoch = m_pg->get_osdmap_epoch();
2390
2391 {
2392 std::stringstream prefix;
2393 prefix << "osd." << m_osds->whoami << " ep: " << epoch
2394 << " scrubber::ReplicaReservations pg[" << pg->pg_id << "]: ";
2395 m_log_msg_prefix = prefix.str();
2396 }
2397
2398 // handle the special case of no replicas
2399 if (m_pending <= 0) {
2400 // just signal the scrub state-machine to continue
2401 send_all_done();
2402
2403 } else {
2404
2405 for (auto p : m_acting_set) {
2406 if (p == whoami)
2407 continue;
2408 auto m = new MOSDScrubReserve(spg_t(m_pg_info.pgid.pgid, p.shard), epoch,
2409 MOSDScrubReserve::REQUEST, m_pg->pg_whoami);
2410 m_osds->send_message_osd_cluster(p.osd, m, epoch);
2411 m_waited_for_peers.push_back(p);
2412 dout(10) << __func__ << ": reserve " << p.osd << dendl;
2413 }
2414 }
2415 }
2416
2417 void ReplicaReservations::send_all_done()
2418 {
2419 m_osds->queue_for_scrub_granted(m_pg, scrub_prio_t::low_priority);
2420 }
2421
2422 void ReplicaReservations::send_reject()
2423 {
2424 m_scrub_job->resources_failure = true;
2425 m_osds->queue_for_scrub_denied(m_pg, scrub_prio_t::low_priority);
2426 }
2427
2428 void ReplicaReservations::discard_all()
2429 {
2430 dout(10) << __func__ << ": " << m_reserved_peers << dendl;
2431
2432 m_had_rejections = true; // preventing late-coming responses from triggering events
2433 m_reserved_peers.clear();
2434 m_waited_for_peers.clear();
2435 }
2436
2437 ReplicaReservations::~ReplicaReservations()
2438 {
2439 m_had_rejections = true; // preventing late-coming responses from triggering events
2440
2441 // send un-reserve messages to all reserved replicas. We do not wait for an answer
2442 // (there wouldn't be one). Other incoming messages will be discarded along the way
2443 // by our owner.
2444 epoch_t epoch = m_pg->get_osdmap_epoch();
2445
2446 for (auto& p : m_reserved_peers) {
2447 release_replica(p, epoch);
2448 }
2449 m_reserved_peers.clear();
2450
2451 // note: the release follows on the heels of the request. When we tried otherwise,
2452 // grants that followed a reject arrived after the whole scrub machine-state was
2453 // reset, causing leaked reservations.
2454 for (auto& p : m_waited_for_peers) {
2455 release_replica(p, epoch);
2456 }
2457 m_waited_for_peers.clear();
2458 }
2459
2460 /**
2461 * @ATTN we would not reach here if the ReplicaReservations object managed by the
2462 * scrubber had been reset.
2463 */
2464 void ReplicaReservations::handle_reserve_grant(OpRequestRef op, pg_shard_t from)
2465 {
2466 dout(10) << __func__ << ": granted by " << from << dendl;
2467 op->mark_started();
2468
2469 {
2470 // reduce the number of extra release messages. Not a must, but it keeps the log cleaner
2471 auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
2472 if (w != m_waited_for_peers.end())
2473 m_waited_for_peers.erase(w);
2474 }
2475
2476 // are we forced to reject the reservation?
2477 if (m_had_rejections) {
2478
2479 dout(10) << __func__ << ": rejecting late-coming reservation from "
2480 << from << dendl;
2481 release_replica(from, m_pg->get_osdmap_epoch());
2482
2483 } else if (std::find(m_reserved_peers.begin(), m_reserved_peers.end(), from) !=
2484 m_reserved_peers.end()) {
2485
2486 dout(10) << __func__ << ": already had osd." << from << " reserved" << dendl;
2487
2488 } else {
2489
2490 dout(10) << __func__ << ": osd." << from << " scrub reserve = success"
2491 << dendl;
2492 m_reserved_peers.push_back(from);
2493 if (--m_pending == 0) {
2494 send_all_done();
2495 }
2496 }
2497 }
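
// handle_reserve_grant() is a countdown: each first-time grant decrements the
// pending count, and hitting zero signals 'all replicas reserved'; duplicate
// grants, and grants arriving after a rejection, are ignored (the latter being
// released instead). A standalone sketch, with hypothetical names
// (illustration only, never built):
#if 0
#include <algorithm>
#include <iostream>
#include <vector>

int main()
{
  std::vector<int> reserved;
  int pending = 2;  // two replicas to hear from
  const bool had_rejections = false;

  auto on_grant = [&](int from) {
    if (had_rejections)
      return;  // a late grant: would be released right back
    if (std::find(reserved.begin(), reserved.end(), from) != reserved.end())
      return;  // duplicate grant from the same peer
    reserved.push_back(from);
    if (--pending == 0)
      std::cout << "all replicas reserved\n";
  };

  on_grant(2);
  on_grant(2);  // duplicate -- ignored
  on_grant(3);  // prints "all replicas reserved"
}
#endif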
2498
2499 void ReplicaReservations::handle_reserve_reject(OpRequestRef op, pg_shard_t from)
2500 {
2501 dout(10) << __func__ << ": rejected by " << from << dendl;
2502 dout(15) << __func__ << ": " << *op->get_req() << dendl;
2503 op->mark_started();
2504
2505 {
2506 // reduce the number of extra release messages. Not a must, but it keeps the log cleaner
2507 auto w = find(m_waited_for_peers.begin(), m_waited_for_peers.end(), from);
2508 if (w != m_waited_for_peers.end())
2509 m_waited_for_peers.erase(w);
2510 }
2511
2512 if (m_had_rejections) {
2513
2514 // our failure was already handled when the first rejection arrived
2515 dout(15) << __func__ << ": ignoring late-coming rejection from "
2516 << from << dendl;
2517
2518 } else if (std::find(m_reserved_peers.begin(), m_reserved_peers.end(), from) !=
2519 m_reserved_peers.end()) {
2520
2521 dout(10) << __func__ << ": already had osd." << from << " reserved" << dendl;
2522
2523 } else {
2524
2525 dout(10) << __func__ << ": osd." << from << " scrub reserve = fail" << dendl;
2526 m_had_rejections = true; // preventing any additional notifications
2527 send_reject();
2528 }
2529 }
2530
2531 std::ostream& ReplicaReservations::gen_prefix(std::ostream& out) const
2532 {
2533 return out << m_log_msg_prefix;
2534 }
2535
2536 // ///////////////////// LocalReservation //////////////////////////////////
2537
2538 // note: no dout()s in LocalReservation functions. Client logs interactions.
2539 LocalReservation::LocalReservation(OSDService* osds)
2540 : m_osds{osds}
2541 {
2542 if (m_osds->get_scrub_services().inc_scrubs_local()) {
2543 // the failure is signalled by not having m_holding_local_reservation set
2544 m_holding_local_reservation = true;
2545 }
2546 }
2547
2548 LocalReservation::~LocalReservation()
2549 {
2550 if (m_holding_local_reservation) {
2551 m_holding_local_reservation = false;
2552 m_osds->get_scrub_services().dec_scrubs_local();
2553 }
2554 }
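
// LocalReservation (like ReservedByRemotePrimary below) is a small RAII guard:
// the constructor tries to take a slot and records success in a flag, and the
// destructor returns the slot only if it was actually taken. A standalone
// sketch with a hypothetical counter standing in for the OSD-wide slot pool
// (illustration only, never built):
#if 0
#include <iostream>

int g_slots = 1;  // hypothetical stand-in for the OSD-wide scrub-slot counter

class SlotGuard {
  bool m_held{false};

 public:
  SlotGuard() {
    if (g_slots > 0) {  // inc_scrubs_local() analogue
      --g_slots;
      m_held = true;
    }  // failure is signalled by m_held staying false
  }
  ~SlotGuard() {
    if (m_held)
      ++g_slots;  // dec_scrubs_local() analogue
  }
  bool holding() const { return m_held; }
};

int main()
{
  SlotGuard a;
  SlotGuard b;  // no slot left -> not held
  std::cout << a.holding() << b.holding() << '\n';  // prints 10
}
#endif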
2555
2556 // ///////////////////// ReservedByRemotePrimary ///////////////////////////////
2557
2558 ReservedByRemotePrimary::ReservedByRemotePrimary(const PgScrubber* scrubber,
2559 PG* pg,
2560 OSDService* osds,
2561 epoch_t epoch)
2562 : m_scrubber{scrubber}
2563 , m_pg{pg}
2564 , m_osds{osds}
2565 , m_reserved_at{epoch}
2566 {
2567 if (!m_osds->get_scrub_services().inc_scrubs_remote()) {
2568 dout(10) << __func__ << ": failed to reserve at Primary request" << dendl;
2569 // the failure is signalled by not having m_reserved_by_remote_primary set
2570 return;
2571 }
2572
2573 dout(20) << __func__ << ": scrub resources reserved at Primary request" << dendl;
2574 m_reserved_by_remote_primary = true;
2575 }
2576
2577 bool ReservedByRemotePrimary::is_stale() const
2578 {
2579 return m_reserved_at < m_pg->get_same_interval_since();
2580 }
2581
2582 ReservedByRemotePrimary::~ReservedByRemotePrimary()
2583 {
2584 if (m_reserved_by_remote_primary) {
2585 m_reserved_by_remote_primary = false;
2586 m_osds->get_scrub_services().dec_scrubs_remote();
2587 }
2588 }
2589
2590 std::ostream& ReservedByRemotePrimary::gen_prefix(std::ostream& out) const
2591 {
2592 return m_scrubber->gen_prefix(out);
2593 }
2594
2595 // ///////////////////// MapsCollectionStatus ////////////////////////////////
2596
2597 auto MapsCollectionStatus::mark_arriving_map(pg_shard_t from)
2598 -> std::tuple<bool, std::string_view>
2599 {
2600 auto fe = std::find(m_maps_awaited_for.begin(), m_maps_awaited_for.end(), from);
2601 if (fe != m_maps_awaited_for.end()) {
2602 // we are indeed waiting for a map from this replica
2603 m_maps_awaited_for.erase(fe);
2604 return std::tuple{true, ""sv};
2605 } else {
2606 return std::tuple{false, " unsolicited scrub-map"sv};
2607 }
2608 }
2609
2610 void MapsCollectionStatus::reset()
2611 {
2612 *this = MapsCollectionStatus{};
2613 }
2614
2615 std::string MapsCollectionStatus::dump() const
2616 {
2617 std::string all;
2618 for (const auto& rp : m_maps_awaited_for) {
2619 all.append(rp.get_osd() + " "s);
2620 }
2621 return all;
2622 }
2623
2624 ostream& operator<<(ostream& out, const MapsCollectionStatus& sf)
2625 {
2626 out << " [ ";
2627 for (const auto& rp : sf.m_maps_awaited_for) {
2628 out << rp.get_osd() << " ";
2629 }
2630 if (!sf.m_local_map_ready) {
2631 out << " local ";
2632 }
2633 return out << " ] ";
2634 }
2635
2636 // ///////////////////// blocked_range_t ///////////////////////////////
2637
2638 blocked_range_t::blocked_range_t(OSDService* osds, ceph::timespan waittime, spg_t pg_id)
2639 : m_osds{osds}
2640 {
2641 auto now_is = std::chrono::system_clock::now();
2642 m_callbk = new LambdaContext([now_is, pg_id, osds]([[maybe_unused]] int r) {
2643 std::time_t now_c = std::chrono::system_clock::to_time_t(now_is);
2644 char buf[50];
2645 strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", std::localtime(&now_c));
2646 lgeneric_subdout(g_ceph_context, osd, 10)
2647 << "PgScrubber: " << pg_id << " blocked on an object for too long (since " << buf
2648 << ")" << dendl;
2649 osds->clog->warn() << "osd." << osds->whoami << " PgScrubber: " << pg_id << " blocked on an object for too long (since " << buf << ")";
2650 return;
2651 });
2652
2653 std::lock_guard l(m_osds->sleep_lock);
2654 m_osds->sleep_timer.add_event_after(waittime, m_callbk);
2655 }
2656
2657 blocked_range_t::~blocked_range_t()
2658 {
2659 std::lock_guard l(m_osds->sleep_lock);
2660 m_osds->sleep_timer.cancel_event(m_callbk);
2661 }
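
// blocked_range_t arms a one-shot timer on construction and cancels it on
// destruction, so the warning fires only if the guard outlives the wait
// period, i.e. the scrub stayed blocked for too long. A standalone sketch with
// std::thread standing in for the OSD's sleep timer (illustration only, never
// built):
#if 0
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class BlockedWarning {
  std::mutex m_lock;
  std::condition_variable m_cv;
  bool m_cancelled{false};
  std::thread m_timer;

 public:
  explicit BlockedWarning(std::chrono::milliseconds wait)
      : m_timer{[this, wait] {
          std::unique_lock lk{m_lock};
          // fires only if not cancelled within the wait period
          if (!m_cv.wait_for(lk, wait, [this] { return m_cancelled; }))
            std::cout << "blocked on an object for too long\n";
        }}
  {}
  ~BlockedWarning() {
    {
      std::lock_guard lk{m_lock};
      m_cancelled = true;  // cancel_event() analogue
    }
    m_cv.notify_one();
    m_timer.join();
  }
};

int main()
{
  using namespace std::chrono_literals;
  BlockedWarning w{50ms};
  std::this_thread::sleep_for(100ms);  // outlives the wait -> warning fires
}
#endif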
2662
2663 } // namespace Scrub