// ceph/src/mds/ScrubStack.cc — from ceph.git (reef 18.1.2).
// Gitweb navigation header converted to a comment so the file remains
// syntactically valid C++.
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
23 #undef dout_prefix
24 #define dout_prefix _prefix(_dout, mdcache->mds)
25
26 using namespace std;
27
28 static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
29 return *_dout << "mds." << mds->get_nodeid() << ".scrubstack ";
30 }
31
32 std::ostream &operator<<(std::ostream &os, const ScrubStack::State &state) {
33 switch(state) {
34 case ScrubStack::STATE_RUNNING:
35 os << "RUNNING";
36 break;
37 case ScrubStack::STATE_IDLE:
38 os << "IDLE";
39 break;
40 case ScrubStack::STATE_PAUSING:
41 os << "PAUSING";
42 break;
43 case ScrubStack::STATE_PAUSED:
44 os << "PAUSED";
45 break;
46 default:
47 ceph_abort();
48 }
49
50 return os;
51 }
52
// Unlink obj from whichever elist it is on (scrub_stack or scrub_waiting)
// and drop the PIN_SCRUBQUEUE reference taken by _enqueue().  The object
// must currently be linked; stack_size counts objects on both lists.
void ScrubStack::dequeue(MDSCacheObject *obj)
{
  dout(20) << "dequeue " << *obj << " from ScrubStack" << dendl;
  ceph_assert(obj->item_scrub.is_on_list());
  obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
  obj->item_scrub.remove_myself();
  stack_size--;
}
61
// Place one object (a CInode or a CDir) onto the scrub stack and mark it
// scrubbing via scrub_initialize().  top=true pushes to the front of the
// stack (depth-first traversal), top=false to the back.
// Returns 0 on success, -CEPHFS_EBUSY if the object is already being
// scrubbed.  Caller must hold mds_lock.
int ScrubStack::_enqueue(MDSCacheObject *obj, ScrubHeaderRef& header, bool top)
{
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  if (CInode *in = dynamic_cast<CInode*>(obj)) {
    if (in->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *in << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }

    dout(10) << __func__ << " with {" << *in << "}" << ", top=" << top << dendl;
    in->scrub_initialize(header);
  } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
    if (dir->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *dir << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }

    dout(10) << __func__ << " with {" << *dir << "}" << ", top=" << top << dendl;
    // The edge directory must be in memory
    dir->auth_pin(this);
    dir->scrub_initialize(header);
  } else {
    // only inodes and dirfrags ever land on the scrub stack
    ceph_assert(0 == "queue dentry to scrub stack");
  }

  dout(20) << "enqueue " << *obj << " to " << (top ? "top" : "bottom") << " of ScrubStack" << dendl;
  // take a pin the first time the object is linked; re-pushing an
  // already-linked object just moves it within the list
  if (!obj->item_scrub.is_on_list()) {
    obj->get(MDSCacheObject::PIN_SCRUBQUEUE);
    stack_size++;
  }
  if (top)
    scrub_stack.push_front(&obj->item_scrub);
  else
    scrub_stack.push_back(&obj->item_scrub);
  return 0;
}
98
99 int ScrubStack::enqueue(CInode *in, ScrubHeaderRef& header, bool top)
100 {
101 // abort in progress
102 if (clear_stack)
103 return -CEPHFS_EAGAIN;
104
105 header->set_origin(in->ino());
106 auto ret = scrubbing_map.emplace(header->get_tag(), header);
107 if (!ret.second) {
108 dout(10) << __func__ << " with {" << *in << "}"
109 << ", conflicting tag " << header->get_tag() << dendl;
110 return -CEPHFS_EEXIST;
111 }
112
113 int r = _enqueue(in, header, top);
114 if (r < 0)
115 return r;
116
117 clog_scrub_summary(in);
118
119 kick_off_scrubs();
120 return 0;
121 }
122
// Move obj from the scrub stack onto the waiting list (used while we
// wait for an unfreeze, a fetch, or a remote ACK).  The slot it holds
// is counted as in-progress so the pump does not over-schedule.
void ScrubStack::add_to_waiting(MDSCacheObject *obj)
{
  scrubs_in_progress++;
  obj->item_scrub.remove_myself();
  scrub_waiting.push_back(&obj->item_scrub);
}
129
// Inverse of add_to_waiting(): release the in-progress slot and, if the
// object is still linked (i.e. not already torn down by an abort), move
// it back to the front of the scrub stack.  kick=false lets callers
// batch several removals before restarting the pump themselves.
void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick)
{
  scrubs_in_progress--;
  if (obj->item_scrub.is_on_list()) {
    obj->item_scrub.remove_myself();
    scrub_stack.push_front(&obj->item_scrub);
    if (kick)
      kick_off_scrubs();
  }
}
140
141 class C_RetryScrub : public MDSInternalContext {
142 public:
143 C_RetryScrub(ScrubStack *s, MDSCacheObject *o) :
144 MDSInternalContext(s->mdcache->mds), stack(s), obj(o) {
145 stack->add_to_waiting(obj);
146 }
147 void finish(int r) override {
148 stack->remove_from_waiting(obj);
149 }
150 private:
151 ScrubStack *stack;
152 MDSCacheObject *obj;
153 };
154
155 void ScrubStack::kick_off_scrubs()
156 {
157 ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
158 dout(20) << __func__ << ": state=" << state << dendl;
159
160 if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
161 if (scrubs_in_progress == 0) {
162 dout(10) << __func__ << ": in progress scrub operations finished, "
163 << stack_size << " in the stack" << dendl;
164
165 State final_state = state;
166 if (clear_stack) {
167 abort_pending_scrubs();
168 final_state = STATE_IDLE;
169 }
170 if (state == STATE_PAUSING) {
171 final_state = STATE_PAUSED;
172 }
173
174 set_state(final_state);
175 complete_control_contexts(0);
176 }
177
178 return;
179 }
180
181 dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
182 "progress and " << stack_size << " in the stack" << dendl;
183 elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
184 while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
185 if (it.end()) {
186 if (scrubs_in_progress == 0) {
187 set_state(STATE_IDLE);
188 }
189
190 return;
191 }
192
193 assert(state == STATE_RUNNING || state == STATE_IDLE);
194 set_state(STATE_RUNNING);
195
196 if (CInode *in = dynamic_cast<CInode*>(*it)) {
197 dout(20) << __func__ << " examining " << *in << dendl;
198 ++it;
199
200 if (!validate_inode_auth(in))
201 continue;
202
203 if (!in->is_dir()) {
204 // it's a regular file, symlink, or hard link
205 dequeue(in); // we only touch it this once, so remove from stack
206
207 scrub_file_inode(in);
208 } else {
209 bool added_children = false;
210 bool done = false; // it's done, so pop it off the stack
211 scrub_dir_inode(in, &added_children, &done);
212 if (done) {
213 dout(20) << __func__ << " dir inode, done" << dendl;
214 dequeue(in);
215 }
216 if (added_children) {
217 // dirfrags were queued at top of stack
218 it = scrub_stack.begin();
219 }
220 }
221 } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
222 auto next = it;
223 ++next;
224 bool done = false; // it's done, so pop it off the stack
225 scrub_dirfrag(dir, &done);
226 if (done) {
227 dout(20) << __func__ << " dirfrag, done" << dendl;
228 ++it; // child inodes were queued at bottom of stack
229 dequeue(dir);
230 } else {
231 it = next;
232 }
233 } else {
234 ceph_assert(0 == "dentry in scrub stack");
235 }
236 }
237 }
238
// Check that we can scrub this inode right now.  Returns true only when
// the inode is auth here and can be auth-pinned.  Otherwise the inode is
// parked on the waiting list (retry on unfreeze / single-auth / cluster
// recovery) or forwarded to its auth MDS, and false is returned.
bool ScrubStack::validate_inode_auth(CInode *in)
{
  if (in->is_auth()) {
    if (!in->can_auth_pin()) {
      // frozen or freezing: retry once the subtree thaws
      dout(10) << __func__ << " can't auth pin" << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
      return false;
    }
    return true;
  } else {
    MDSRank *mds = mdcache->mds;
    if (in->is_ambiguous_auth()) {
      // mid-migration: wait until authority settles
      dout(10) << __func__ << " ambiguous auth" << dendl;
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
    } else if (mds->is_cluster_degraded()) {
      dout(20) << __func__ << " cluster degraded" << dendl;
      mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
    } else {
      // not auth here: forward the scrub to the auth MDS and track the
      // forward in remote_scrubs until the ACK comes back
      ScrubHeaderRef header = in->get_scrub_header();
      ceph_assert(header);

      auto ret = remote_scrubs.emplace(std::piecewise_construct,
                                       std::forward_as_tuple(in),
                                       std::forward_as_tuple());
      ceph_assert(ret.second); // FIXME: parallel scrubs?
      auto &scrub_r = ret.first->second;
      scrub_r.tag = header->get_tag();

      mds_rank_t auth = in->authority().first;
      dout(10) << __func__ << " forward to mds." << auth << dendl;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
                                       std::move(in->scrub_queued_frags()),
                                       header->get_tag(), header->get_origin(),
                                       header->is_internal_tag(), header->get_force(),
                                       header->get_recursive(), header->get_repair());
      mdcache->mds->send_message_mds(r, auth);

      scrub_r.gather_set.insert(auth);
      // wait for ACK
      add_to_waiting(in);
    }
    return false;
  }
}
283
284 void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
285 {
286 dout(10) << __func__ << " " << *in << dendl;
287 ceph_assert(in->is_auth());
288 MDSRank *mds = mdcache->mds;
289
290 ScrubHeaderRef header = in->get_scrub_header();
291 ceph_assert(header);
292
293 MDSGatherBuilder gather(g_ceph_context);
294
295 auto &queued = in->scrub_queued_frags();
296 std::map<mds_rank_t, fragset_t> scrub_remote;
297
298 frag_vec_t frags;
299 in->dirfragtree.get_leaves(frags);
300 dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
301 for (auto &fg : frags) {
302 if (queued.contains(fg))
303 continue;
304 CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
305 if (!dir->is_auth()) {
306 if (dir->is_ambiguous_auth()) {
307 dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
308 dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
309 } else if (mds->is_cluster_degraded()) {
310 dout(20) << __func__ << " cluster degraded" << dendl;
311 mds->wait_for_cluster_recovered(gather.new_sub());
312 } else {
313 mds_rank_t auth = dir->authority().first;
314 scrub_remote[auth].insert_raw(fg);
315 }
316 } else if (!dir->can_auth_pin()) {
317 dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
318 dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
319 } else if (dir->get_version() == 0) {
320 dout(20) << __func__ << " barebones " << *dir << dendl;
321 dir->fetch_keys({}, gather.new_sub());
322 } else {
323 _enqueue(dir, header, true);
324 queued.insert_raw(dir->get_frag());
325 *added_children = true;
326 }
327 }
328
329 queued.simplify();
330
331 if (gather.has_subs()) {
332 gather.set_finisher(new C_RetryScrub(this, in));
333 gather.activate();
334 return;
335 }
336
337 if (!scrub_remote.empty()) {
338 auto ret = remote_scrubs.emplace(std::piecewise_construct,
339 std::forward_as_tuple(in),
340 std::forward_as_tuple());
341 ceph_assert(ret.second); // FIXME: parallel scrubs?
342 auto &scrub_r = ret.first->second;
343 scrub_r.tag = header->get_tag();
344
345 for (auto& p : scrub_remote) {
346 p.second.simplify();
347 dout(20) << __func__ << " forward " << p.second << " to mds." << p.first << dendl;
348 auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
349 std::move(p.second), header->get_tag(),
350 header->get_origin(), header->is_internal_tag(),
351 header->get_force(), header->get_recursive(),
352 header->get_repair());
353 mds->send_message_mds(r, p.first);
354 scrub_r.gather_set.insert(p.first);
355 }
356 // wait for ACKs
357 add_to_waiting(in);
358 return;
359 }
360
361 scrub_dir_inode_final(in);
362
363 *done = true;
364 dout(10) << __func__ << " done" << dendl;
365 }
366
// Completion for CInode::validate_disk_state().  Holds an in-progress
// scrub slot for the lifetime of the async validation; on completion it
// reports the result through _validate_inode_done() and restarts the
// pump.
class C_InodeValidated : public MDSInternalContext
{
public:
  ScrubStack *stack;
  CInode::validated_data result;  // filled in by validate_disk_state()
  CInode *target;

  C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
    : MDSInternalContext(mds), stack(stack_), target(target_)
  {
    // occupy a scrub slot while the validation is in flight
    stack->scrubs_in_progress++;
  }
  void finish(int r) override {
    stack->_validate_inode_done(target, r, result);
    stack->scrubs_in_progress--;
    stack->kick_off_scrubs();
  }
};
385
386 void ScrubStack::scrub_dir_inode_final(CInode *in)
387 {
388 dout(20) << __func__ << " " << *in << dendl;
389
390 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
391 in->validate_disk_state(&fin->result, fin);
392 return;
393 }
394
395 void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
396 {
397 ceph_assert(dir != NULL);
398
399 dout(10) << __func__ << " " << *dir << dendl;
400
401 if (!dir->is_complete()) {
402 dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
403 dout(10) << __func__ << " incomplete, fetching" << dendl;
404 return;
405 }
406
407 ScrubHeaderRef header = dir->get_scrub_header();
408 version_t last_scrub = dir->scrub_info()->last_recursive.version;
409 if (header->get_recursive()) {
410 auto next_seq = mdcache->get_global_snaprealm()->get_newest_seq()+1;
411 for (auto it = dir->begin(); it != dir->end(); /* nop */) {
412 auto [dnk, dn] = *it;
413 ++it; /* trim (in the future) may remove dentry */
414
415 if (dn->scrub(next_seq)) {
416 std::string path;
417 dir->get_inode()->make_path_string(path, true);
418 clog->warn() << "Scrub error on dentry " << *dn
419 << " see " << g_conf()->name
420 << " log and `damage ls` output for details";
421 }
422
423 if (dnk.snapid != CEPH_NOSNAP) {
424 continue;
425 }
426
427 CDentry::linkage_t *dnl = dn->get_linkage();
428 if (dn->get_version() <= last_scrub &&
429 dnl->get_remote_d_type() != DT_DIR &&
430 !header->get_force()) {
431 dout(15) << __func__ << " skip dentry " << dnk
432 << ", no change since last scrub" << dendl;
433 continue;
434 }
435 if (dnl->is_primary()) {
436 _enqueue(dnl->get_inode(), header, false);
437 } else if (dnl->is_remote()) {
438 // TODO: check remote linkage
439 }
440 }
441 }
442
443 if (!dir->scrub_local()) {
444 std::string path;
445 dir->get_inode()->make_path_string(path, true);
446 clog->warn() << "Scrub error on dir " << dir->ino()
447 << " (" << path << ") see " << g_conf()->name
448 << " log and `damage ls` output for details";
449 }
450
451 dir->scrub_finished();
452 dir->auth_unpin(this);
453
454 *done = true;
455 dout(10) << __func__ << " done" << dendl;
456 }
457
458 void ScrubStack::scrub_file_inode(CInode *in)
459 {
460 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
461 // At this stage the DN is already past scrub_initialize, so
462 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
463 in->validate_disk_state(&fin->result, fin);
464 }
465
// Post-process a completed inode validation: record damage in the
// damage table, report the outcome to the cluster log (with the full
// JSON detail in the MDS log), and mark the inode's scrub finished.
void ScrubStack::_validate_inode_done(CInode *in, int r,
                                      const CInode::validated_data &result)
{
  LogChannelRef clog = mdcache->mds->clog;
  const ScrubHeaderRefConst header = in->scrub_info()->header;

  std::string path;
  if (!result.passed_validation) {
    // Build path string for use in messages
    in->make_path_string(path, true);
  }

  if (result.backtrace.checked && !result.backtrace.passed &&
      !result.backtrace.repaired)
  {
    // Record backtrace fails as remote linkage damage, as
    // we may not be able to resolve hard links to this inode
    mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
  } else if (result.inode.checked && !result.inode.passed &&
             !result.inode.repaired) {
    // Record damaged inode structures as damaged dentries as
    // that is where they are stored
    auto parent = in->get_projected_parent_dn();
    if (parent) {
      auto dir = parent->get_dir();
      mdcache->mds->damage_table.notify_dentry(
        dir->inode->ino(), dir->frag, parent->last, parent->get_name(), path);
    }
  }

  // Inform the cluster log if we found an error
  if (!result.passed_validation) {
    if (result.all_damage_repaired()) {
      clog->info() << "Scrub repaired inode " << in->ino()
                   << " (" << path << ")";
    } else {
      clog->warn() << "Scrub error on inode " << in->ino()
                   << " (" << path << ") see " << g_conf()->name
                   << " log and `damage ls` output for details";
    }

    // Put the verbose JSON output into the MDS log for later inspection
    JSONFormatter f;
    result.dump(&f);
    CachedStackStringStream css;
    f.flush(*css);
    derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
         << dendl;
  } else {
    dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
  }

  in->scrub_finished();
}
520
// Fire and clear all queued scrub-control completions (contexts stashed
// by scrub_abort()/scrub_pause() while a state transition was pending).
// r is passed to each context.  Caller must hold mds_lock.
void ScrubStack::complete_control_contexts(int r) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  for (auto &ctx : control_ctxs) {
    ctx->complete(r);
  }
  control_ctxs.clear();
}
529
530 void ScrubStack::set_state(State next_state) {
531 if (state != next_state) {
532 dout(20) << __func__ << ", from state=" << state << ", to state="
533 << next_state << dendl;
534 state = next_state;
535 clog_scrub_summary();
536 }
537 }
538
539 bool ScrubStack::scrub_in_transition_state() {
540 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
541 dout(20) << __func__ << ": state=" << state << dendl;
542
543 // STATE_RUNNING is considered as a transition state so as to
544 // "delay" the scrub control operation.
545 if (state == STATE_RUNNING || state == STATE_PAUSING) {
546 return true;
547 }
548
549 return false;
550 }
551
// Build the one-line scrub summary shown in the cluster log, e.g.
// "active paths [/a,/b]" or "paused+aborting".  Returns a view into a
// thread-local stream buffer.  Caller must hold mds_lock.
std::string_view ScrubStack::scrub_summary() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  bool have_more = false;
  CachedStackStringStream cs;

  if (state == STATE_IDLE) {
    // idle with registered tags means we are waiting on peers to finish
    if (scrubbing_map.empty())
      return "idle";
    *cs << "idle+waiting";
  }

  if (state == STATE_RUNNING) {
    if (clear_stack) {
      *cs << "aborting";
    } else {
      *cs << "active";
    }
  } else {
    // PAUSING/PAUSED (and the IDLE fall-through above) may also carry a
    // "+aborting" suffix
    if (state == STATE_PAUSING) {
      have_more = true;
      *cs << "pausing";
    } else if (state == STATE_PAUSED) {
      have_more = true;
      *cs << "paused";
    }

    if (clear_stack) {
      if (have_more) {
        *cs << "+";
      }
      *cs << "aborting";
    }
  }

  // append the origin path of every registered scrub
  if (!scrubbing_map.empty()) {
    *cs << " paths [";
    bool first = true;
    for (auto &p : scrubbing_map) {
      if (!first)
        *cs << ",";
      auto& header = p.second;
      if (CInode *in = mdcache->get_inode(header->get_origin()))
        *cs << scrub_inode_path(in);
      else
        *cs << "#" << header->get_origin();
      first = false;
    }
    *cs << "]";
  }

  return cs->strv();
}
605
// Dump the detailed scrub status (for `scrub status` asok/tell command):
// an overall status line plus one section per registered scrub with its
// path, tag and options.  Caller must hold mds_lock.
void ScrubStack::scrub_status(Formatter *f) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  f->open_object_section("result");

  CachedStackStringStream css;
  bool have_more = false;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      *css << "no active scrubs running";
    else
      *css << state << " (waiting for more scrubs)";
  } else if (state == STATE_RUNNING) {
    if (clear_stack) {
      *css << "ABORTING";
    } else {
      *css << "scrub active";
    }
    *css << " (" << stack_size << " inodes in the stack)";
  } else {
    // PAUSING/PAUSED, possibly combined with an abort
    if (state == STATE_PAUSING || state == STATE_PAUSED) {
      have_more = true;
      *css << state;
    }
    if (clear_stack) {
      if (have_more) {
        *css << "+";
      }
      *css << "ABORTING";
    }

    *css << " (" << stack_size << " inodes in the stack)";
  }
  f->dump_string("status", css->strv());

  f->open_object_section("scrubs");

  for (auto& p : scrubbing_map) {
    // have_more is reused here to comma-separate the options string
    have_more = false;
    auto& header = p.second;

    std::string tag(header->get_tag());
    f->open_object_section(tag.c_str()); // scrub id

    if (CInode *in = mdcache->get_inode(header->get_origin()))
      f->dump_string("path", scrub_inode_path(in));
    else
      f->dump_stream("path") << "#" << header->get_origin();

    f->dump_string("tag", header->get_tag());

    CachedStackStringStream optcss;
    if (header->get_recursive()) {
      *optcss << "recursive";
      have_more = true;
    }
    if (header->get_repair()) {
      if (have_more) {
        *optcss << ",";
      }
      *optcss << "repair";
      have_more = true;
    }
    if (header->get_force()) {
      if (have_more) {
        *optcss << ",";
      }
      *optcss << "force";
    }

    f->dump_string("options", optcss->strv());
    f->close_section(); // scrub id
  }
  f->close_section(); // scrubs
  f->close_section(); // result
}
683
684 void ScrubStack::abort_pending_scrubs() {
685 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
686 ceph_assert(clear_stack);
687
688 auto abort_one = [this](MDSCacheObject *obj) {
689 if (CInode *in = dynamic_cast<CInode*>(obj)) {
690 in->scrub_aborted();
691 } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
692 dir->scrub_aborted();
693 dir->auth_unpin(this);
694 } else {
695 ceph_abort(0 == "dentry in scrub stack");
696 }
697 };
698 for (auto it = scrub_stack.begin(); !it.end(); ++it)
699 abort_one(*it);
700 for (auto it = scrub_waiting.begin(); !it.end(); ++it)
701 abort_one(*it);
702
703 stack_size = 0;
704 scrub_stack.clear();
705 scrub_waiting.clear();
706
707 for (auto& p : remote_scrubs)
708 remove_from_waiting(p.first, false);
709 remote_scrubs.clear();
710
711 clear_stack = false;
712 }
713
714 void ScrubStack::send_state_message(int op) {
715 MDSRank *mds = mdcache->mds;
716 set<mds_rank_t> up_mds;
717 mds->get_mds_map()->get_up_mds_set(up_mds);
718 for (auto& r : up_mds) {
719 if (r == 0)
720 continue;
721 auto m = make_message<MMDSScrub>(op);
722 mds->send_message_mds(m, r);
723 }
724 }
725
// Abort all scrubbing.  Rank 0 fans the abort out to its peers.  If work
// is still in flight the abort is deferred (on_finish is parked in
// control_ctxs and completed by kick_off_scrubs() once drained);
// otherwise the stack is cleared immediately.  Caller must hold mds_lock.
void ScrubStack::scrub_abort(Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  dout(10) << __func__ << ": aborting with " << scrubs_in_progress
           << " scrubs in progress and " << stack_size << " in the"
           << " stack" << dendl;

  if (mdcache->mds->get_nodeid() == 0) {
    // remember the epoch of this abort so peer ACKs can be tracked
    scrub_epoch_last_abort = scrub_epoch;
    scrub_any_peer_aborting = true;
    send_state_message(MMDSScrub::OP_ABORT);
  }

  clear_stack = true;
  if (scrub_in_transition_state()) {
    if (on_finish)
      control_ctxs.push_back(on_finish);
    return;
  }

  abort_pending_scrubs();
  if (state != STATE_PAUSED)
    set_state(STATE_IDLE);

  if (on_finish)
    on_finish->complete(0);
}
753
754 void ScrubStack::scrub_pause(Context *on_finish) {
755 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
756
757 dout(10) << __func__ << ": pausing with " << scrubs_in_progress
758 << " scrubs in progress and " << stack_size << " in the"
759 << " stack" << dendl;
760
761 if (mdcache->mds->get_nodeid() == 0)
762 send_state_message(MMDSScrub::OP_PAUSE);
763
764 // abort is in progress
765 if (clear_stack) {
766 if (on_finish)
767 on_finish->complete(-CEPHFS_EINVAL);
768 return;
769 }
770
771 bool done = scrub_in_transition_state();
772 if (done) {
773 set_state(STATE_PAUSING);
774 if (on_finish)
775 control_ctxs.push_back(on_finish);
776 return;
777 }
778
779 set_state(STATE_PAUSED);
780 if (on_finish)
781 on_finish->complete(0);
782 }
783
// Resume a paused (or pausing) scrub.  Rank 0 fans the resume out to its
// peers.  A resume during an abort fails; a resume while pausing cancels
// the pending pause contexts with -CEPHFS_ECANCELED.
// NOTE(review): declared bool but returns the int code r — callers see
// `true` on failure (-CEPHFS_EINVAL converts to true) and `false` on
// success.  Confirm callers expect this before changing the signature.
bool ScrubStack::scrub_resume() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  dout(20) << __func__ << ": state=" << state << dendl;

  if (mdcache->mds->get_nodeid() == 0)
    send_state_message(MMDSScrub::OP_RESUME);

  int r = 0;

  if (clear_stack) {
    r = -CEPHFS_EINVAL;
  } else if (state == STATE_PAUSING) {
    set_state(STATE_RUNNING);
    complete_control_contexts(-CEPHFS_ECANCELED);
  } else if (state == STATE_PAUSED) {
    set_state(STATE_RUNNING);
    kick_off_scrubs();
  }

  return r;
}
805
806 // send current scrub summary to cluster log
807 void ScrubStack::clog_scrub_summary(CInode *in) {
808 if (in) {
809 std::string what;
810 if (clear_stack) {
811 what = "aborted";
812 } else if (in->scrub_is_in_progress()) {
813 what = "queued";
814 } else {
815 what = "completed";
816 }
817 clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
818 }
819
820 clog->info() << "scrub summary: " << scrub_summary();
821 }
822
// Entry point for scrub-related messages from peer MDS ranks; routes to
// the per-type handlers and aborts on anything unexpected.
void ScrubStack::dispatch(const cref_t<Message> &m)
{
  switch (m->get_type()) {
  case MSG_MDS_SCRUB:
    handle_scrub(ref_cast<MMDSScrub>(m));
    break;

  case MSG_MDS_SCRUB_STATS:
    handle_scrub_stats(ref_cast<MMDSScrubStats>(m));
    break;

  default:
    derr << " scrub stack unknown message " << m->get_type() << dendl_impl;
    ceph_abort_msg("scrub stack unknown message");
  }
}
839
// Handle an MMDSScrub message from a peer: queue requests forwarded to
// us (QUEUEDIR/QUEUEINO), ACKs for scrubs we forwarded out
// (QUEUEDIR_ACK/QUEUEINO_ACK), and cluster-wide control ops
// (ABORT/PAUSE/RESUME) relayed by rank 0.
void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
{

  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;

  switch (m->get_op()) {
  case MMDSScrub::OP_QUEUEDIR:
    {
      // a peer asked us to scrub some dirfrags of an inode we are auth
      // for; collect the frags that are present, auth and pinnable
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);

      std::vector<CDir*> dfs;
      MDSGatherBuilder gather(g_ceph_context);
      for (const auto& fg : m->get_frags()) {
	CDir *dir = diri->get_dirfrag(fg);
	if (!dir) {
	  dout(10) << __func__ << " no frag " << fg << dendl;
	  continue;
	}
	if (!dir->is_auth()) {
	  dout(10) << __func__ << " not auth " << *dir << dendl;
	  continue;
	}
	if (!dir->can_auth_pin()) {
	  dout(10) << __func__ << " can't auth pin " << *dir << dendl;
	  dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
	  continue;
	}
	dfs.push_back(dir);
      }

      // if any frag is frozen, retry the whole message after unfreeze
      if (gather.has_subs()) {
	gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
	gather.activate();
	return;
      }

      fragset_t queued;
      if (!dfs.empty()) {
	// reuse the header registered under this tag, or register a new
	// one reconstructed from the message
	ScrubHeaderRef header;
	if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	  header = it->second;
	} else {
	  header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
						 m->is_force(), m->is_recursive(),
						 m->is_repair());
	  header->set_origin(m->get_origin());
	  scrubbing_map.emplace(header->get_tag(), header);
	}
	for (auto dir : dfs) {
	  queued.insert_raw(dir->get_frag());
	  _enqueue(dir, header, true);
	}
	queued.simplify();
	kick_off_scrubs();
      }

      // tell the requester which frags we actually accepted
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEDIR_ACK:
    {
      // a peer acknowledged dirfrags we forwarded in scrub_dir_inode();
      // record them as queued and, once all peers replied, resume the
      // origin inode
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);
      auto it = remote_scrubs.find(diri);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag) {
	if (it->second.gather_set.erase(from)) {
	  auto &queued = diri->scrub_queued_frags();
	  for (auto &fg : m->get_frags())
	    queued.insert_raw(fg);
	  queued.simplify();

	  if (it->second.gather_set.empty()) {
	    remote_scrubs.erase(it);

	    const auto& header = diri->get_scrub_header();
	    header->set_epoch_last_forwarded(scrub_epoch);
	    remove_from_waiting(diri);
	  }
	}
      }
    }
    break;
  case MMDSScrub::OP_QUEUEINO:
    {
      // a peer forwarded an inode we are auth for (see
      // validate_inode_auth()); queue it locally and ACK immediately
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);

      ScrubHeaderRef header;
      if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	header = it->second;
      } else {
	header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
					       m->is_force(), m->is_recursive(),
					       m->is_repair());
	header->set_origin(m->get_origin());
	scrubbing_map.emplace(header->get_tag(), header);
      }

      _enqueue(in, header, true);
      in->scrub_queued_frags() = m->get_frags();
      kick_off_scrubs();

      fragset_t queued;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEINO_ACK:
    {
      // the auth MDS accepted the inode we forwarded: we are done with
      // it locally, so dequeue it and mark its scrub finished here
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);
      auto it = remote_scrubs.find(in);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag &&
	  it->second.gather_set.erase(from)) {
	ceph_assert(it->second.gather_set.empty());
	remote_scrubs.erase(it);

	remove_from_waiting(in, false);
	dequeue(in);

	const auto& header = in->get_scrub_header();
	header->set_epoch_last_forwarded(scrub_epoch);
	in->scrub_finished();

	kick_off_scrubs();
      }
    }
    break;
  case MMDSScrub::OP_ABORT:
    scrub_abort(nullptr);
    break;
  case MMDSScrub::OP_PAUSE:
    scrub_pause(nullptr);
    break;
  case MMDSScrub::OP_RESUME:
    scrub_resume();
    break;
  default:
    derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
    ceph_abort_msg("scrub stack unknown scrub operation");
  }
}
989
// Handle an MMDSScrubStats message.  From rank 0: sync our scrub epoch,
// prune tags that are finished cluster-wide, and reply with the set of
// tags still active here.  From other ranks (we are rank 0): record the
// peer's per-epoch ack for advance_scrub_status() to aggregate.
void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
{
  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(7) << __func__ << " " << *m << " from mds." << from << dendl;

  if (from == 0) {
    if (scrub_epoch != m->get_epoch() - 1) {
      // we missed epochs (e.g. after replay); treat every known header
      // as freshly forwarded so it is not pruned prematurely
      scrub_epoch = m->get_epoch() - 1;
      for (auto& p : scrubbing_map) {
	if (p.second->get_epoch_last_forwarded())
	  p.second->set_epoch_last_forwarded(scrub_epoch);
      }
    }
    bool any_finished = false;
    bool any_repaired = false;
    std::set<std::string> scrubbing_tags;
    for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
      auto& header = it->second;
      if (header->get_num_pending() ||
	  header->get_epoch_last_forwarded() >= scrub_epoch) {
	// still active locally
	scrubbing_tags.insert(it->first);
	++it;
      } else if (m->is_finished(it->first)) {
	// rank 0 says this tag is done cluster-wide: drop it
	any_finished = true;
	if (header->get_repaired())
	  any_repaired = true;
	scrubbing_map.erase(it++);
      } else {
	++it;
      }
    }

    scrub_epoch = m->get_epoch();

    auto ack = make_message<MMDSScrubStats>(scrub_epoch,
					    std::move(scrubbing_tags), clear_stack);
    mdcache->mds->send_message_mds(ack, 0);

    if (any_finished)
      clog_scrub_summary();
    if (any_repaired)
      // flush the journal so repairs become durable promptly
      mdcache->mds->mdlog->trim_all();
  } else {
    // we are rank 0: stash this peer's stats for the current epoch
    if (scrub_epoch == m->get_epoch() &&
	(size_t)from < mds_scrub_stats.size()) {
      auto& stat = mds_scrub_stats[from];
      stat.epoch_acked = m->get_epoch();
      stat.scrubbing_tags = m->get_scrubbing_tags();
      stat.aborting = m->is_aborting();
    }
  }
}
1042
// Periodic tick on rank 0: aggregate the per-peer scrub stats collected
// by handle_scrub_stats(), prune tags no rank is scrubbing any more,
// bump the scrub epoch and broadcast the new epoch (with the merged
// tag set when it is authoritative) to all other up ranks.
void ScrubStack::advance_scrub_status()
{
  if (!scrub_any_peer_aborting && scrubbing_map.empty())
    return;

  MDSRank *mds = mdcache->mds;

  set<mds_rank_t> up_mds;
  mds->get_mds_map()->get_up_mds_set(up_mds);
  auto up_max = *up_mds.rbegin();

  bool update_scrubbing = false;
  std::set<std::string> scrubbing_tags;

  if (up_max == 0) {
    // single-rank cluster: our own view is authoritative
    update_scrubbing = true;
    scrub_any_peer_aborting = false;
  } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
    // merge every peer's acked tag set; only trust the merge once all
    // peers have acked the current epoch
    bool any_aborting = false;
    bool fully_acked = true;
    for (const auto& stat : mds_scrub_stats) {
      if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
	any_aborting = true;
      if (stat.epoch_acked != scrub_epoch) {
	fully_acked = false;
	continue;
      }
      scrubbing_tags.insert(stat.scrubbing_tags.begin(),
			    stat.scrubbing_tags.end());
    }
    if (!any_aborting)
      scrub_any_peer_aborting = false;
    if (fully_acked) {
      // handle_scrub_stats() reports scrub is still in-progress if it has
      // forwarded any object to other mds since previous epoch. Let's assume,
      // at time 'A', we got scrub stats from all mds for previous epoch. If
      // a scrub is not reported by any mds, we know there is no forward of
      // the scrub since time 'A'. So we can consider the scrub is finished.
      if (scrub_epoch_fully_acked + 1 == scrub_epoch)
	update_scrubbing = true;
      scrub_epoch_fully_acked = scrub_epoch;
    }
  }

  if (mds_scrub_stats.size() != (size_t)up_max + 1)
    mds_scrub_stats.resize((size_t)up_max + 1);
  // rank 0 (us) implicitly acks the epoch it is about to publish
  mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;

  bool any_finished = false;
  bool any_repaired = false;

  for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
    auto& header = it->second;
    if (header->get_num_pending() ||
	header->get_epoch_last_forwarded() >= scrub_epoch) {
      // still active locally: advertise it in the merged set
      if (update_scrubbing && up_max != 0)
	scrubbing_tags.insert(it->first);
      ++it;
    } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
      // no longer being scrubbed globally
      any_finished = true;
      if (header->get_repaired())
	any_repaired = true;
      scrubbing_map.erase(it++);
    } else {
      ++it;
    }
  }

  ++scrub_epoch;

  for (auto& r : up_mds) {
    if (r == 0)
      continue;
    auto m = update_scrubbing ?
      make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
      make_message<MMDSScrubStats>(scrub_epoch);
    mds->send_message_mds(m, r);
  }

  if (any_finished)
    clog_scrub_summary();
  if (any_repaired)
    // flush the journal so repairs become durable promptly
    mdcache->mds->mdlog->trim_all();
}
1128
1129 void ScrubStack::handle_mds_failure(mds_rank_t mds)
1130 {
1131 if (mds == 0) {
1132 scrub_abort(nullptr);
1133 return;
1134 }
1135
1136 bool kick = false;
1137 for (auto it = remote_scrubs.begin(); it != remote_scrubs.end(); ) {
1138 if (it->second.gather_set.erase(mds) &&
1139 it->second.gather_set.empty()) {
1140 CInode *in = it->first;
1141 remote_scrubs.erase(it++);
1142 remove_from_waiting(in, false);
1143 kick = true;
1144 } else {
1145 ++it;
1146 }
1147 }
1148 if (kick)
1149 kick_off_scrubs();
1150 }