1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
24 #define dout_prefix _prefix(_dout, mdcache->mds)
28 static std::ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
29 return *_dout
<< "mds." << mds
->get_nodeid() << ".scrubstack ";
32 std::ostream
&operator<<(std::ostream
&os
, const ScrubStack::State
&state
) {
34 case ScrubStack::STATE_RUNNING
:
37 case ScrubStack::STATE_IDLE
:
40 case ScrubStack::STATE_PAUSING
:
43 case ScrubStack::STATE_PAUSED
:
53 void ScrubStack::dequeue(MDSCacheObject
*obj
)
55 dout(20) << "dequeue " << *obj
<< " from ScrubStack" << dendl
;
56 ceph_assert(obj
->item_scrub
.is_on_list());
57 obj
->put(MDSCacheObject::PIN_SCRUBQUEUE
);
58 obj
->item_scrub
.remove_myself();
62 int ScrubStack::_enqueue(MDSCacheObject
*obj
, ScrubHeaderRef
& header
, bool top
)
64 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
65 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
66 if (in
->scrub_is_in_progress()) {
67 dout(10) << __func__
<< " with {" << *in
<< "}" << ", already in scrubbing" << dendl
;
70 if(in
->state_test(CInode::STATE_PURGING
)) {
71 dout(10) << *obj
<< " is purging, skip pushing into scrub stack" << dendl
;
72 // treating this as success since purge will make sure this inode goes away
76 dout(10) << __func__
<< " with {" << *in
<< "}" << ", top=" << top
<< dendl
;
77 in
->scrub_initialize(header
);
78 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
79 if (dir
->scrub_is_in_progress()) {
80 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", already in scrubbing" << dendl
;
83 if(dir
->get_inode()->state_test(CInode::STATE_PURGING
)) {
84 dout(10) << *obj
<< " is purging, skip pushing into scrub stack" << dendl
;
85 // treating this as success since purge will make sure this dir inode goes away
89 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", top=" << top
<< dendl
;
90 // The edge directory must be in memory
92 dir
->scrub_initialize(header
);
94 ceph_assert(0 == "queue dentry to scrub stack");
97 dout(20) << "enqueue " << *obj
<< " to " << (top
? "top" : "bottom") << " of ScrubStack" << dendl
;
98 if (!obj
->item_scrub
.is_on_list()) {
99 obj
->get(MDSCacheObject::PIN_SCRUBQUEUE
);
103 scrub_stack
.push_front(&obj
->item_scrub
);
105 scrub_stack
.push_back(&obj
->item_scrub
);
109 int ScrubStack::enqueue(CInode
*in
, ScrubHeaderRef
& header
, bool top
)
113 return -CEPHFS_EAGAIN
;
115 header
->set_origin(in
->ino());
116 auto ret
= scrubbing_map
.emplace(header
->get_tag(), header
);
118 dout(10) << __func__
<< " with {" << *in
<< "}"
119 << ", conflicting tag " << header
->get_tag() << dendl
;
120 return -CEPHFS_EEXIST
;
122 if (header
->get_scrub_mdsdir()) {
125 rank
= mdcache
->mds
->get_nodeid();
126 if(rank
>= 0 && rank
< MAX_MDS
) {
127 fp
.set_path("", MDS_INO_MDSDIR(rank
));
129 int r
= _enqueue(mdcache
->get_inode(fp
.get_ino()), header
, true);
133 //to make sure mdsdir is always on the top
136 int r
= _enqueue(in
, header
, top
);
140 clog_scrub_summary(in
);
146 void ScrubStack::add_to_waiting(MDSCacheObject
*obj
)
148 scrubs_in_progress
++;
149 obj
->item_scrub
.remove_myself();
150 scrub_waiting
.push_back(&obj
->item_scrub
);
153 void ScrubStack::remove_from_waiting(MDSCacheObject
*obj
, bool kick
)
155 scrubs_in_progress
--;
156 if (obj
->item_scrub
.is_on_list()) {
157 obj
->item_scrub
.remove_myself();
158 scrub_stack
.push_front(&obj
->item_scrub
);
164 class C_RetryScrub
: public MDSInternalContext
{
166 C_RetryScrub(ScrubStack
*s
, MDSCacheObject
*o
) :
167 MDSInternalContext(s
->mdcache
->mds
), stack(s
), obj(o
) {
168 stack
->add_to_waiting(obj
);
170 void finish(int r
) override
{
171 stack
->remove_from_waiting(obj
);
// Drain the scrub stack: start as many scrub operations as the
// mds_max_scrub_ops_in_progress config allows, after first handling any
// pending pause/abort state transition.
// NOTE(review): this chunk is an incomplete extraction — several
// original lines (braces, returns, the iterator-end check, dequeue
// calls) are missing; the bracketed comments mark the gaps.
void ScrubStack::kick_off_scrubs()
{
  ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
  dout(20) << __func__ << ": state=" << state << dendl;

  // pause/abort pending: once in-flight ops drain, settle into the
  // final state and complete queued control contexts
  if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
    if (scrubs_in_progress == 0) {
      dout(10) << __func__ << ": in progress scrub operations finished, "
               << stack_size << " in the stack" << dendl;

      State final_state = state;
      // [not in this extraction: original lines 189/192 — presumably the
      //  clear_stack guard around the next two statements]
      abort_pending_scrubs();
      final_state = STATE_IDLE;
      if (state == STATE_PAUSING) {
        final_state = STATE_PAUSED;
      }
      set_state(final_state);
      complete_control_contexts(0);
      // [not in this extraction: original lines 199-203 — closing braces
      //  and an early return]

  dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
              "progress and " << stack_size << " in the stack" << dendl;
  elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
  while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
    // [not in this extraction: original line 208 — presumably the
    //  it.end() check that ends the loop]
    if (scrubs_in_progress == 0) {
      set_state(STATE_IDLE);
      // [not in this extraction: original lines 211-215 — braces/return]

    assert(state == STATE_RUNNING || state == STATE_IDLE);
    set_state(STATE_RUNNING);

    if (CInode *in = dynamic_cast<CInode*>(*it)) {
      dout(20) << __func__ << " examining " << *in << dendl;
      // [not in this extraction: original lines 221-222 — iterator advance]

      if (!validate_inode_auth(in))
        // [not in this extraction: original lines 224-227 — continue and
        //  the !is_dir() branch head]
      // it's a regular file, symlink, or hard link
      dequeue(in); // we only touch it this once, so remove from stack
      scrub_file_inode(in);
      // [not in this extraction: original line 231 — else-branch head]
      bool added_children = false;
      bool done = false; // it's done, so pop it off the stack
      scrub_dir_inode(in, &added_children, &done);
      // [not in this extraction: original line 235 — if (done) guard]
      dout(20) << __func__ << " dir inode, done" << dendl;
      // [not in this extraction: original lines 237-238 — dequeue + brace]
      if (added_children) {
        // dirfrags were queued at top of stack
        it = scrub_stack.begin();
      // [not in this extraction: original lines 242-243 — closing braces]
    } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
      // [not in this extraction: original lines 245-246]
      bool done = false; // it's done, so pop it off the stack
      scrub_dirfrag(dir, &done);
      // [not in this extraction: original line 249 — if (done) guard]
      dout(20) << __func__ << " dirfrag, done" << dendl;
      ++it; // child inodes were queued at bottom of stack
      // [not in this extraction: original lines 252-256 — dequeue,
      //  closing braces and the else head]
      ceph_assert(0 == "dentry in scrub stack");
      // [not in this extraction: original lines 258-260 — closing braces]
262 bool ScrubStack::validate_inode_auth(CInode
*in
)
265 if (!in
->can_auth_pin()) {
266 dout(10) << __func__
<< " can't auth pin" << dendl
;
267 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_RetryScrub(this, in
));
272 MDSRank
*mds
= mdcache
->mds
;
273 if (in
->is_ambiguous_auth()) {
274 dout(10) << __func__
<< " ambiguous auth" << dendl
;
275 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_RetryScrub(this, in
));
276 } else if (mds
->is_cluster_degraded()) {
277 dout(20) << __func__
<< " cluster degraded" << dendl
;
278 mds
->wait_for_cluster_recovered(new C_RetryScrub(this, in
));
280 ScrubHeaderRef header
= in
->get_scrub_header();
283 auto ret
= remote_scrubs
.emplace(std::piecewise_construct
,
284 std::forward_as_tuple(in
),
285 std::forward_as_tuple());
286 ceph_assert(ret
.second
); // FIXME: parallel scrubs?
287 auto &scrub_r
= ret
.first
->second
;
288 scrub_r
.tag
= header
->get_tag();
290 mds_rank_t auth
= in
->authority().first
;
291 dout(10) << __func__
<< " forward to mds." << auth
<< dendl
;
292 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEINO
, in
->ino(),
293 std::move(in
->scrub_queued_frags()),
294 header
->get_tag(), header
->get_origin(),
295 header
->is_internal_tag(), header
->get_force(),
296 header
->get_recursive(), header
->get_repair());
297 mdcache
->mds
->send_message_mds(r
, auth
);
299 scrub_r
.gather_set
.insert(auth
);
// Scrub a directory inode: queue its local dirfrags (fetching/unfreezing
// them first if needed via the gather), forward non-auth frags to their
// authoritative MDS (OP_QUEUEDIR), and finally validate the inode itself.
// NOTE(review): incomplete extraction — bracketed comments mark missing
// original lines (the recursive-mode guard, `continue`s, else heads,
// closing braces, gather activation, *done update).
void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
{
  dout(10) << __func__ << " " << *in << dendl;
  ceph_assert(in->is_auth());
  MDSRank *mds = mdcache->mds;

  ScrubHeaderRef header = in->get_scrub_header();

  MDSGatherBuilder gather(g_ceph_context);

  auto &queued = in->scrub_queued_frags();
  std::map<mds_rank_t, fragset_t> scrub_remote;
  // [not in this extraction: original lines 320-321 — presumably the
  //  recursive-mode guard and the frag list declaration used below]
  in->dirfragtree.get_leaves(frags);
  dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
  for (auto &fg : frags) {
    if (queued.contains(fg))
      // [not in this extraction: original line 326 — continue]
    CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
    if (!dir->is_auth()) {
      if (dir->is_ambiguous_auth()) {
        dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
        dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
      } else if (mds->is_cluster_degraded()) {
        dout(20) << __func__ << " cluster degraded" << dendl;
        mds->wait_for_cluster_recovered(gather.new_sub());
      // [not in this extraction: original line 335 — else head]
      mds_rank_t auth = dir->authority().first;
      scrub_remote[auth].insert_raw(fg);
      // [not in this extraction: original line 338 — closing brace]
    } else if (!dir->can_auth_pin()) {
      dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
      dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
    } else if (dir->get_version() == 0) {
      dout(20) << __func__ << " barebones " << *dir << dendl;
      dir->fetch_keys({}, gather.new_sub());
      // [not in this extraction: original line 345 — else head]
      _enqueue(dir, header, true);
      queued.insert_raw(dir->get_frag());
      *added_children = true;
      // [not in this extraction: original lines 349-353 — closing braces]
  if (gather.has_subs()) {
    gather.set_finisher(new C_RetryScrub(this, in));
    // [not in this extraction: original lines 356-359 — presumably
    //  gather.activate() and an early return]
  if (!scrub_remote.empty()) {
    auto ret = remote_scrubs.emplace(std::piecewise_construct,
                                     std::forward_as_tuple(in),
                                     std::forward_as_tuple());
    ceph_assert(ret.second); // FIXME: parallel scrubs?
    auto &scrub_r = ret.first->second;
    scrub_r.tag = header->get_tag();
    // [not in this extraction: original line 367]
    for (auto& p : scrub_remote) {
      // [not in this extraction: original line 369 — presumably
      //  p.second.simplify()]
      dout(20) << __func__ << " forward " << p.second << " to mds." << p.first
               << dendl;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
                                       std::move(p.second), header->get_tag(),
                                       header->get_origin(), header->is_internal_tag(),
                                       header->get_force(), header->get_recursive(),
                                       header->get_repair());
      mds->send_message_mds(r, p.first);
      scrub_r.gather_set.insert(p.first);
      // [not in this extraction: original lines 378-383 — closing braces,
      //  presumably add_to_waiting(in) and an early return]
  scrub_dir_inode_final(in);
  // [not in this extraction: original lines 385-386 — presumably *done = true]
  dout(10) << __func__ << " done" << dendl;
  // [not in this extraction: original line 388 — closing brace]
390 class C_InodeValidated
: public MDSInternalContext
394 CInode::validated_data result
;
397 C_InodeValidated(MDSRank
*mds
, ScrubStack
*stack_
, CInode
*target_
)
398 : MDSInternalContext(mds
), stack(stack_
), target(target_
)
400 stack
->scrubs_in_progress
++;
402 void finish(int r
) override
{
403 stack
->_validate_inode_done(target
, r
, result
);
404 stack
->scrubs_in_progress
--;
405 stack
->kick_off_scrubs();
409 void ScrubStack::scrub_dir_inode_final(CInode
*in
)
411 dout(20) << __func__
<< " " << *in
<< dendl
;
413 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
414 in
->validate_disk_state(&fin
->result
, fin
);
// Scrub one dirfrag: fetch it if incomplete, scrub each dentry (queueing
// primary child inodes at the bottom of the stack in recursive mode),
// then run the dirfrag's local scrub and release the auth pin taken in
// _enqueue().
// NOTE(review): incomplete extraction — bracketed comments mark missing
// original lines (returns, `continue`s, local path declarations, braces).
void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
{
  ceph_assert(dir != NULL);

  dout(10) << __func__ << " " << *dir << dendl;

  if (!dir->is_complete()) {
    dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
    dout(10) << __func__ << " incomplete, fetching" << dendl;
    // [not in this extraction: original lines 427-429 — return + brace]
  ScrubHeaderRef header = dir->get_scrub_header();
  version_t last_scrub = dir->scrub_info()->last_recursive.version;
  if (header->get_recursive()) {
    auto next_seq = mdcache->get_global_snaprealm()->get_newest_seq()+1;
    for (auto it = dir->begin(); it != dir->end(); /* nop */) {
      auto [dnk, dn] = *it;
      ++it; /* trim (in the future) may remove dentry */
      // [not in this extraction: original line 437]
      if (dn->scrub(next_seq)) {
        // [not in this extraction: original line 439 — presumably the
        //  std::string path declaration used below]
        dir->get_inode()->make_path_string(path, true);
        clog->warn() << "Scrub error on dentry " << *dn
                     << " see " << g_conf()->name
                     << " log and `damage ls` output for details";
        // [not in this extraction: original lines 444-445]
      if (dnk.snapid != CEPH_NOSNAP) {
        // [not in this extraction: original lines 447-449 — presumably
        //  continue + brace]
      CDentry::linkage_t *dnl = dn->get_linkage();
      if (dn->get_version() <= last_scrub &&
          dnl->get_remote_d_type() != DT_DIR &&
          !header->get_force()) {
        dout(15) << __func__ << " skip dentry " << dnk
                 << ", no change since last scrub" << dendl;
        // [not in this extraction: original lines 456-457 — continue + brace]
      if (dnl->is_primary()) {
        _enqueue(dnl->get_inode(), header, false);
      } else if (dnl->is_remote()) {
        // TODO: check remote linkage
        // [not in this extraction: original lines 462-465 — closing braces]
  if (!dir->scrub_local()) {
    // [not in this extraction: original line 467 — presumably the
    //  std::string path declaration used below]
    dir->get_inode()->make_path_string(path, true);
    clog->warn() << "Scrub error on dir " << dir->ino()
                 << " (" << path << ") see " << g_conf()->name
                 << " log and `damage ls` output for details";
    // [not in this extraction: original lines 472-473]
  dir->scrub_finished();
  dir->auth_unpin(this);  // pin was taken in _enqueue()
  // [not in this extraction: original lines 476-477 — presumably *done = true]
  dout(10) << __func__ << " done" << dendl;
  // [not in this extraction: original line 479 — closing brace]
481 void ScrubStack::scrub_file_inode(CInode
*in
)
483 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
484 // At this stage the DN is already past scrub_initialize, so
485 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
486 in
->validate_disk_state(&fin
->result
, fin
);
// Handle the result of an asynchronous inode validation: record damage
// in the damage table, report to the cluster log, and mark the inode's
// scrub finished.
// NOTE(review): incomplete extraction — bracketed comments mark missing
// original lines (the path declaration, else heads, braces, formatter
// plumbing around the derr dump).
void ScrubStack::_validate_inode_done(CInode *in, int r,
                                      const CInode::validated_data &result)
{
  LogChannelRef clog = mdcache->mds->clog;
  const ScrubHeaderRefConst header = in->scrub_info()->header;
  // [not in this extraction: original lines 494-495 — presumably the
  //  std::string path declaration used below]
  if (!result.passed_validation) {
    // Build path string for use in messages
    in->make_path_string(path, true);
    // [not in this extraction: original lines 499-500]
    if (result.backtrace.checked && !result.backtrace.passed &&
        !result.backtrace.repaired)
    // [not in this extraction: original line 503 — opening brace]
      // Record backtrace fails as remote linkage damage, as
      // we may not be able to resolve hard links to this inode
      mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
    } else if (result.inode.checked && !result.inode.passed &&
               !result.inode.repaired) {
      // Record damaged inode structures as damaged dentries as
      // that is where they are stored
      auto parent = in->get_projected_parent_dn();
      // [not in this extraction: original line 512 — presumably an
      //  if (parent) guard]
      auto dir = parent->get_dir();
      mdcache->mds->damage_table.notify_dentry(
        dir->inode->ino(), dir->frag, parent->last, parent->get_name(), path);
      // [not in this extraction: original lines 516-518 — closing braces]
  // Inform the cluster log if we found an error
  if (!result.passed_validation) {
    if (result.all_damage_repaired()) {
      clog->info() << "Scrub repaired inode " << in->ino()
                   << " (" << path << ")";
      // [not in this extraction: original line 524 — else head]
      clog->warn() << "Scrub error on inode " << in->ino()
                   << " (" << path << ") see " << g_conf()->name
                   << " log and `damage ls` output for details";
      // [not in this extraction: original lines 528-529]
    // Put the verbose JSON output into the MDS log for later inspection
    // [not in this extraction: original lines 531-532 — presumably a
    //  JSON formatter that result is dumped into]
    CachedStackStringStream css;
    // [not in this extraction: original line 534 — presumably the
    //  formatter flush into css]
    derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
    // [not in this extraction: original lines 536-537 — dendl terminator
    //  and else head]
    dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
    // [not in this extraction: original lines 539-540]
  in->scrub_finished();
  // [not in this extraction: original lines 542 — closing brace]
544 void ScrubStack::complete_control_contexts(int r
) {
545 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
547 for (auto &ctx
: control_ctxs
) {
550 control_ctxs
.clear();
553 void ScrubStack::set_state(State next_state
) {
554 if (state
!= next_state
) {
555 dout(20) << __func__
<< ", from state=" << state
<< ", to state="
556 << next_state
<< dendl
;
558 clog_scrub_summary();
562 bool ScrubStack::scrub_in_transition_state() {
563 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
564 dout(20) << __func__
<< ": state=" << state
<< dendl
;
566 // STATE_RUNNING is considered as a transition state so as to
567 // "delay" the scrub control operation.
568 if (state
== STATE_RUNNING
|| state
== STATE_PAUSING
) {
// Build a short human-readable summary of scrub activity ("idle",
// "active", paused, ...) plus the paths/origins being scrubbed.
// NOTE(review): incomplete extraction — most branch bodies and the
// return plumbing are missing; bracketed comments mark the gaps.
std::string_view ScrubStack::scrub_summary() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  bool have_more = false;
  CachedStackStringStream cs;
  // [not in this extraction: original line 580]
  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      // [not in this extraction: original line 583 — presumably an early
      //  return of "idle"]
    *cs << "idle+waiting";
    // [not in this extraction: original lines 585-586]
  if (state == STATE_RUNNING) {
    // [not in this extraction: original lines 588-593 — active-state body]
  if (state == STATE_PAUSING) {
    // [not in this extraction: original lines 595-596]
  } else if (state == STATE_PAUSED) {
    // [not in this extraction: original lines 598-609]
  if (!scrubbing_map.empty()) {
    // [not in this extraction: original lines 611-612]
    for (auto &p : scrubbing_map) {
      // [not in this extraction: original lines 614-615]
      auto& header = p.second;
      if (CInode *in = mdcache->get_inode(header->get_origin()))
        *cs << scrub_inode_path(in);
      // [not in this extraction: original line 619 — else head]
      *cs << "#" << header->get_origin();
      // [not in this extraction: original lines 621-627 — separators,
      //  closing braces and the return statement]
// Dump scrub status into @p f for the `scrub status` asok/tell command:
// an overall status string plus one section per active scrub tag with
// its path, tag and options.
// NOTE(review): incomplete extraction — bracketed comments mark missing
// original lines (else heads, braces, several option branches).
void ScrubStack::scrub_status(Formatter *f) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  f->open_object_section("result");

  CachedStackStringStream css;
  bool have_more = false;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      *css << "no active scrubs running";
    // [not in this extraction: original line 640 — else head]
    *css << state << " (waiting for more scrubs)";
  } else if (state == STATE_RUNNING) {
    // [not in this extraction: original lines 643-645]
    *css << "scrub active";
    // [not in this extraction: original line 647]
    *css << " (" << stack_size << " inodes in the stack)";
    // [not in this extraction: original line 649 — else head]
    if (state == STATE_PAUSING || state == STATE_PAUSED) {
      // [not in this extraction: original lines 651-660]
      *css << " (" << stack_size << " inodes in the stack)";
      // [not in this extraction: original line 662]
  f->dump_string("status", css->strv());
  // [not in this extraction: original line 664]
  f->open_object_section("scrubs");
  // [not in this extraction: original line 666]
  for (auto& p : scrubbing_map) {
    // [not in this extraction: original line 668]
    auto& header = p.second;
    // [not in this extraction: original line 670]
    std::string tag(header->get_tag());
    f->open_object_section(tag.c_str()); // scrub id
    // [not in this extraction: original line 673]
    if (CInode *in = mdcache->get_inode(header->get_origin()))
      f->dump_string("path", scrub_inode_path(in));
    // [not in this extraction: original line 676 — else head]
    f->dump_stream("path") << "#" << header->get_origin();
    // [not in this extraction: original line 678]
    f->dump_string("tag", header->get_tag());
    // [not in this extraction: original line 680]
    CachedStackStringStream optcss;
    if (header->get_recursive()) {
      *optcss << "recursive";
      // [not in this extraction: original lines 684-685]
    if (header->get_repair()) {
      // [not in this extraction: original lines 687-692]
    if (header->get_force()) {
      // [not in this extraction: original lines 694-698]
    if (header->get_scrub_mdsdir()) {
      // [not in this extraction: original lines 700-702]
      *optcss << "scrub_mdsdir";
      // [not in this extraction: original lines 704-705]
    f->dump_string("options", optcss->strv());
    f->close_section(); // scrub id
    // [not in this extraction: original line 708 — closing brace]
  f->close_section(); // scrubs
  f->close_section(); // result
  // [not in this extraction: original line 711 — closing brace]
713 void ScrubStack::abort_pending_scrubs() {
714 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
715 ceph_assert(clear_stack
);
717 auto abort_one
= [this](MDSCacheObject
*obj
) {
718 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
720 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
721 dir
->scrub_aborted();
722 dir
->auth_unpin(this);
724 ceph_abort(0 == "dentry in scrub stack");
727 for (auto it
= scrub_stack
.begin(); !it
.end(); ++it
)
729 for (auto it
= scrub_waiting
.begin(); !it
.end(); ++it
)
734 scrub_waiting
.clear();
736 for (auto& p
: remote_scrubs
)
737 remove_from_waiting(p
.first
, false);
738 remote_scrubs
.clear();
743 void ScrubStack::send_state_message(int op
) {
744 MDSRank
*mds
= mdcache
->mds
;
745 set
<mds_rank_t
> up_mds
;
746 mds
->get_mds_map()->get_up_mds_set(up_mds
);
747 for (auto& r
: up_mds
) {
750 auto m
= make_message
<MMDSScrub
>(op
);
751 mds
->send_message_mds(m
, r
);
755 void ScrubStack::scrub_abort(Context
*on_finish
) {
756 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
758 dout(10) << __func__
<< ": aborting with " << scrubs_in_progress
759 << " scrubs in progress and " << stack_size
<< " in the"
760 << " stack" << dendl
;
762 if (mdcache
->mds
->get_nodeid() == 0) {
763 scrub_epoch_last_abort
= scrub_epoch
;
764 scrub_any_peer_aborting
= true;
765 send_state_message(MMDSScrub::OP_ABORT
);
769 if (scrub_in_transition_state()) {
771 control_ctxs
.push_back(on_finish
);
775 abort_pending_scrubs();
776 if (state
!= STATE_PAUSED
)
777 set_state(STATE_IDLE
);
780 on_finish
->complete(0);
783 void ScrubStack::scrub_pause(Context
*on_finish
) {
784 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
786 dout(10) << __func__
<< ": pausing with " << scrubs_in_progress
787 << " scrubs in progress and " << stack_size
<< " in the"
788 << " stack" << dendl
;
790 if (mdcache
->mds
->get_nodeid() == 0)
791 send_state_message(MMDSScrub::OP_PAUSE
);
793 // abort is in progress
796 on_finish
->complete(-CEPHFS_EINVAL
);
800 bool done
= scrub_in_transition_state();
802 set_state(STATE_PAUSING
);
804 control_ctxs
.push_back(on_finish
);
808 set_state(STATE_PAUSED
);
810 on_finish
->complete(0);
// Resume scrubbing after a pause. Rank 0 broadcasts the resume to its
// peers; a pending pause is cancelled (-CEPHFS_ECANCELED to its
// waiters), a completed pause is undone.
// NOTE(review): incomplete extraction — the result variable, the first
// branch (presumably the clear_stack/abort-in-progress case) and the
// return path are missing.
bool ScrubStack::scrub_resume() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  dout(20) << __func__ << ": state=" << state << dendl;

  if (mdcache->mds->get_nodeid() == 0)
    send_state_message(MMDSScrub::OP_RESUME);
  // [not in this extraction: original lines 819-823 — presumably a
  //  result variable and an if (clear_stack) branch]
  } else if (state == STATE_PAUSING) {
    set_state(STATE_RUNNING);
    complete_control_contexts(-CEPHFS_ECANCELED);
  } else if (state == STATE_PAUSED) {
    set_state(STATE_RUNNING);
    // [not in this extraction: original lines 829-833 — presumably
    //  kick_off_scrubs(), closing braces and the return statement]
// send current scrub summary to cluster log
// When @p in is given, log a per-path message about that inode's scrub;
// otherwise log the overall scrub_summary() string.
// NOTE(review): incomplete extraction — the in-guard, the `what` string
// construction and the surrounding braces are missing.
void ScrubStack::clog_scrub_summary(CInode *in) {
  // [not in this extraction: original lines 837-840 — presumably
  //  if (in) { std::string what; plus the first state check]
  } else if (in->scrub_is_in_progress()) {
    // [not in this extraction: original lines 842-845]
    clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
    // [not in this extraction: original lines 847-848 — else head]
    clog->info() << "scrub summary: " << scrub_summary();
852 void ScrubStack::dispatch(const cref_t
<Message
> &m
)
854 switch (m
->get_type()) {
856 handle_scrub(ref_cast
<MMDSScrub
>(m
));
859 case MSG_MDS_SCRUB_STATS
:
860 handle_scrub_stats(ref_cast
<MMDSScrubStats
>(m
));
864 derr
<< " scrub stack unknown message " << m
->get_type() << dendl_impl
;
865 ceph_abort_msg("scrub stack unknown message");
869 bool ScrubStack::remove_inode_if_stacked(CInode
*in
) {
870 MDSCacheObject
*obj
= dynamic_cast<MDSCacheObject
*>(in
);
871 if(obj
->item_scrub
.is_on_list()) {
872 dout(20) << "removing inode " << *in
<< " from scrub_stack" << dendl
;
873 obj
->put(MDSCacheObject::PIN_SCRUBQUEUE
);
874 obj
->item_scrub
.remove_myself();
// Handle an MMDSScrub message from a peer MDS:
//  - OP_QUEUEDIR / OP_QUEUEINO: a peer forwards work we are auth for;
//    enqueue it locally (creating a ScrubHeader for the tag if we don't
//    have one yet) and reply with the matching ACK op listing what was
//    queued.
//  - OP_QUEUEDIR_ACK / OP_QUEUEINO_ACK: a peer confirms it queued the
//    work we forwarded; fold the acked frags into our bookkeeping and
//    release the parked object once the gather set is empty.
//  - OP_ABORT / OP_PAUSE / OP_RESUME: apply a cluster-wide control op.
// NOTE(review): incomplete extraction — case-scope braces, break
// statements, null checks and several statements are missing; bracketed
// comments mark the gaps.
void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
{
  // [not in this extraction: original lines 882-883]
  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;
  // [not in this extraction: original line 886]
  switch (m->get_op()) {
  case MMDSScrub::OP_QUEUEDIR:
    // [not in this extraction: original line 889 — case-scope brace]
      CInode *diri = mdcache->get_inode(m->get_ino());
      // [not in this extraction: original lines 891-892 — presumably a
      //  null/validity check on diri]
      std::vector<CDir*> dfs;
      MDSGatherBuilder gather(g_ceph_context);
      for (const auto& fg : m->get_frags()) {
        CDir *dir = diri->get_dirfrag(fg);
        // [not in this extraction: original line 897 — null-dir guard]
        dout(10) << __func__ << " no frag " << fg << dendl;
        // [not in this extraction: original lines 899-900 — continue/brace]
        if (!dir->is_auth()) {
          dout(10) << __func__ << " not auth " << *dir << dendl;
          // [not in this extraction: original lines 903-904 — continue/brace]
        if (!dir->can_auth_pin()) {
          dout(10) << __func__ << " can't auth pin " << *dir << dendl;
          dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
          // [not in this extraction: original lines 908-912 — continue,
          //  braces and presumably dfs.push_back(dir)]
        if (gather.has_subs()) {
          gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
          // [not in this extraction: original lines 915-920 — presumably
          //  gather.activate(), return, and the queued-fragset declaration]
        ScrubHeaderRef header;
        if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
          // [not in this extraction: original lines 923-924 — presumably
          //  header = it->second; and the else head]
          header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
                                                 m->is_force(), m->is_recursive(),
          // [not in this extraction: original line 927 — the remaining
          //  constructor arguments]
          header->set_origin(m->get_origin());
          scrubbing_map.emplace(header->get_tag(), header);
          // [not in this extraction: original line 930 — closing brace]
        for (auto dir : dfs) {
          queued.insert_raw(dir->get_frag());
          _enqueue(dir, header, true);
          // [not in this extraction: original lines 934-938 — brace,
          //  presumably queued.simplify() and kick_off_scrubs()]
        auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
                                         std::move(queued), m->get_tag());
        mdcache->mds->send_message_mds(r, from);
        // [not in this extraction: original lines 942-943 — brace + break]
  case MMDSScrub::OP_QUEUEDIR_ACK:
    // [not in this extraction: original line 945 — case-scope brace]
      CInode *diri = mdcache->get_inode(m->get_ino());
      // [not in this extraction: original line 947]
      auto it = remote_scrubs.find(diri);
      if (it != remote_scrubs.end() &&
          m->get_tag() == it->second.tag) {
        if (it->second.gather_set.erase(from)) {
          auto &queued = diri->scrub_queued_frags();
          for (auto &fg : m->get_frags())
            queued.insert_raw(fg);
          // [not in this extraction: original lines 955-956 — presumably
          //  queued.simplify()]
          if (it->second.gather_set.empty()) {
            remote_scrubs.erase(it);
            // [not in this extraction: original line 959]
            const auto& header = diri->get_scrub_header();
            header->set_epoch_last_forwarded(scrub_epoch);
            remove_from_waiting(diri);
            // [not in this extraction: original lines 963-967 — closing
            //  braces + break]
  case MMDSScrub::OP_QUEUEINO:
    // [not in this extraction: original line 969 — case-scope brace]
      CInode *in = mdcache->get_inode(m->get_ino());
      // [not in this extraction: original lines 971-972]
      ScrubHeaderRef header;
      if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
        // [not in this extraction: original lines 975-976 — presumably
        //  header = it->second; and the else head]
        header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
                                               m->is_force(), m->is_recursive(),
        // [not in this extraction: original line 979 — the remaining
        //  constructor arguments]
        header->set_origin(m->get_origin());
        scrubbing_map.emplace(header->get_tag(), header);
        // [not in this extraction: original lines 982-983]
      _enqueue(in, header, true);
      in->scrub_queued_frags() = m->get_frags();
      // [not in this extraction: original lines 986-988 — presumably
      //  kick_off_scrubs() and the queued-fragset declaration]
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
                                       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
      // [not in this extraction: original lines 992-993 — brace + break]
  case MMDSScrub::OP_QUEUEINO_ACK:
    // [not in this extraction: original line 995 — case-scope brace]
      CInode *in = mdcache->get_inode(m->get_ino());
      // [not in this extraction: original line 997]
      auto it = remote_scrubs.find(in);
      if (it != remote_scrubs.end() &&
          m->get_tag() == it->second.tag &&
          it->second.gather_set.erase(from)) {
        ceph_assert(it->second.gather_set.empty());
        remote_scrubs.erase(it);
        // [not in this extraction: original line 1004]
        remove_from_waiting(in, false);
        // [not in this extraction: original lines 1006-1007]
        const auto& header = in->get_scrub_header();
        header->set_epoch_last_forwarded(scrub_epoch);
        in->scrub_finished();
        // [not in this extraction: original lines 1011-1015 — presumably
        //  kick_off_scrubs(), closing braces and break]
  case MMDSScrub::OP_ABORT:
    scrub_abort(nullptr);
    // [not in this extraction: original line 1018 — break]
  case MMDSScrub::OP_PAUSE:
    scrub_pause(nullptr);
    // [not in this extraction: original line 1021 — break]
  case MMDSScrub::OP_RESUME:
    // [not in this extraction: original lines 1023-1025 — presumably
    //  scrub_resume(), break and the default label]
    derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
    ceph_abort_msg("scrub stack unknown scrub operation");
    // [not in this extraction: original lines 1028-1029 — closing braces]
// Handle an MMDSScrubStats message. When it comes from rank 0 (the
// epoch coordinator) we roll our local scrub_epoch forward, drop tags
// that are globally finished, and reply with our still-scrubbing tags;
// on rank 0 we record the per-rank ack/stats instead.
// NOTE(review): incomplete extraction — the from==0 branch split, loop
// increments and closing braces are missing; bracketed comments mark
// the gaps.
void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
{
  // [not in this extraction: original line 1032]
  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(7) << __func__ << " " << *m << " from mds." << from << dendl;
  // [not in this extraction: original lines 1035-1036 — presumably the
  //  if (from == 0) branch head]
  if (scrub_epoch != m->get_epoch() - 1) {
    scrub_epoch = m->get_epoch() - 1;
    for (auto& p : scrubbing_map) {
      if (p.second->get_epoch_last_forwarded())
        p.second->set_epoch_last_forwarded(scrub_epoch);
      // [not in this extraction: original lines 1042-1043 — closing braces]
    bool any_finished = false;
    bool any_repaired = false;
    std::set<std::string> scrubbing_tags;
    for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
      auto& header = it->second;
      if (header->get_num_pending() ||
          header->get_epoch_last_forwarded() >= scrub_epoch) {
        scrubbing_tags.insert(it->first);
        // [not in this extraction: original line 1052 — presumably ++it]
      } else if (m->is_finished(it->first)) {
        any_finished = true;
        if (header->get_repaired())
          any_repaired = true;
        scrubbing_map.erase(it++);
        // [not in this extraction: original lines 1058-1062 — presumably
        //  the else { ++it; } arm and closing braces]
    scrub_epoch = m->get_epoch();
    // [not in this extraction: original line 1064]
    auto ack = make_message<MMDSScrubStats>(scrub_epoch,
                                            std::move(scrubbing_tags), clear_stack);
    mdcache->mds->send_message_mds(ack, 0);
    // [not in this extraction: original lines 1068-1069 — presumably an
    //  if (any_finished) guard]
    clog_scrub_summary();
    // [not in this extraction: original line 1071 — presumably an
    //  if (any_repaired) guard]
    mdcache->mds->mdlog->trim_all();
    // [not in this extraction: original line 1073 — presumably the else
    //  head of the from==0 split]
  if (scrub_epoch == m->get_epoch() &&
      (size_t)from < mds_scrub_stats.size()) {
    auto& stat = mds_scrub_stats[from];
    stat.epoch_acked = m->get_epoch();
    stat.scrubbing_tags = m->get_scrubbing_tags();
    stat.aborting = m->is_aborting();
    // [not in this extraction: original lines 1080-1082 — closing braces]
// Periodic epoch advance, run on rank 0: collect the per-rank scrub
// stats acked for the current epoch, decide whether the global
// "scrubbing tags" set can be updated, retire tags no longer scrubbed
// anywhere, bump the epoch, and broadcast MMDSScrubStats to all peers.
// NOTE(review): incomplete extraction — the single-rank fast path, loop
// increments, several guards and the epoch bump itself are missing;
// bracketed comments mark the gaps.
void ScrubStack::advance_scrub_status()
{
  // [not in this extraction: original line 1085]
  if (!scrub_any_peer_aborting && scrubbing_map.empty())
    // [not in this extraction: original lines 1087-1088 — return]
  MDSRank *mds = mdcache->mds;
  // [not in this extraction: original line 1090]
  set<mds_rank_t> up_mds;
  mds->get_mds_map()->get_up_mds_set(up_mds);
  auto up_max = *up_mds.rbegin();
  // [not in this extraction: original line 1094]
  bool update_scrubbing = false;
  std::set<std::string> scrubbing_tags;
  // [not in this extraction: original lines 1097-1098 — presumably the
  //  single-active-rank (up_max == 0) branch head]
    update_scrubbing = true;
    scrub_any_peer_aborting = false;
  } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
    bool any_aborting = false;
    bool fully_acked = true;
    for (const auto& stat : mds_scrub_stats) {
      if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
        any_aborting = true;
      if (stat.epoch_acked != scrub_epoch) {
        fully_acked = false;
        // [not in this extraction: original lines 1109-1110 — presumably
        //  continue and a closing brace]
      scrubbing_tags.insert(stat.scrubbing_tags.begin(),
                            stat.scrubbing_tags.end());
      // [not in this extraction: original lines 1113-1114 — closing brace
      //  and presumably an if (!any_aborting) guard]
      scrub_any_peer_aborting = false;
      // [not in this extraction: original line 1116 — presumably an
      //  if (fully_acked) guard]
      // handle_scrub_stats() reports scrub is still in-progress if it has
      // forwarded any object to other mds since previous epoch. Let's assume,
      // at time 'A', we got scrub stats from all mds for previous epoch. If
      // a scrub is not reported by any mds, we know there is no forward of
      // the scrub since time 'A'. So we can consider the scrub is finished.
      if (scrub_epoch_fully_acked + 1 == scrub_epoch)
        update_scrubbing = true;
      scrub_epoch_fully_acked = scrub_epoch;
      // [not in this extraction: original lines 1125-1127 — closing braces
      //  and presumably the scrub_epoch increment]
  if (mds_scrub_stats.size() != (size_t)up_max + 1)
    mds_scrub_stats.resize((size_t)up_max + 1);
  mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;
  // [not in this extraction: original line 1131]
  bool any_finished = false;
  bool any_repaired = false;
  // [not in this extraction: original line 1134]
  for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
    auto& header = it->second;
    if (header->get_num_pending() ||
        header->get_epoch_last_forwarded() >= scrub_epoch) {
      if (update_scrubbing && up_max != 0)
        scrubbing_tags.insert(it->first);
      // [not in this extraction: original line 1141 — presumably ++it]
    } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
      // no longer being scrubbed globally
      any_finished = true;
      if (header->get_repaired())
        any_repaired = true;
      scrubbing_map.erase(it++);
      // [not in this extraction: original lines 1148-1154 — presumably the
      //  else { ++it; } arm, closing braces and a peers guard]
  for (auto& r : up_mds) {
    // [not in this extraction: original lines 1156-1157 — presumably a
    //  skip of rank 0 itself]
    auto m = update_scrubbing ?
      make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
      make_message<MMDSScrubStats>(scrub_epoch);
    mds->send_message_mds(m, r);
    // [not in this extraction: original lines 1162-1164 — closing braces
    //  and presumably an if (any_finished) guard]
  clog_scrub_summary();
  // [not in this extraction: original line 1166 — presumably an
  //  if (any_repaired) guard]
  mdcache->mds->mdlog->trim_all();
1170 void ScrubStack::handle_mds_failure(mds_rank_t mds
)
1173 scrub_abort(nullptr);
1178 for (auto it
= remote_scrubs
.begin(); it
!= remote_scrubs
.end(); ) {
1179 if (it
->second
.gather_set
.erase(mds
) &&
1180 it
->second
.gather_set
.empty()) {
1181 CInode
*in
= it
->first
;
1182 remote_scrubs
.erase(it
++);
1183 remove_from_waiting(in
, false);