1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
24 #define dout_prefix _prefix(_dout, mdcache->mds)
// Dout prefix helper: tags every log line emitted from this file with
// "mds.<rank>.scrubstack " so log output can be attributed to this subsystem.
// NOTE(review): the function's closing brace (original line ~30) is missing
// from this extract — reconcile with the upstream file before editing.
28 static std::ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
29 return *_dout
<< "mds." << mds
->get_nodeid() << ".scrubstack ";
// Pretty-printer for ScrubStack::State, used by the dout() logging below
// (e.g. ": state=" << state).
// NOTE(review): the switch statement header, the per-case bodies, and the
// return are missing from this extract (original lines 33, 35-36, 38-39,
// 41-42, 44+) — only the case labels survive. Verify against upstream.
32 std::ostream
&operator<<(std::ostream
&os
, const ScrubStack::State
&state
) {
34 case ScrubStack::STATE_RUNNING
:
37 case ScrubStack::STATE_IDLE
:
40 case ScrubStack::STATE_PAUSING
:
43 case ScrubStack::STATE_PAUSED
:
// Remove |obj| from the scrub stack's intrusive list and drop the
// PIN_SCRUBQUEUE reference that was taken when it was enqueued.
// Precondition (asserted): the object is currently on a scrub list.
// NOTE(review): interior lines are missing from this extract (embedded
// numbering jumps 58 -> 62; the original presumably also adjusts
// stack accounting and closes the brace) — confirm against upstream.
53 void ScrubStack::dequeue(MDSCacheObject
*obj
)
55 dout(20) << "dequeue " << *obj
<< " from ScrubStack" << dendl
;
56 ceph_assert(obj
->item_scrub
.is_on_list());
57 obj
->put(MDSCacheObject::PIN_SCRUBQUEUE
);
58 obj
->item_scrub
.remove_myself();
// Internal enqueue: initialize scrubbing state on |obj| (a CInode or CDir)
// under the given scrub header, pin it with PIN_SCRUBQUEUE if not already
// listed, and place it at the top (push_front) or bottom (push_back) of the
// scrub stack. Must be called with the MDS lock held (asserted).
// Objects already in scrubbing are logged and presumably skipped — the
// early-return/branch lines are missing from this extract (embedded
// numbering jumps, e.g. 67 -> 71, 95 -> 99); verify against upstream.
62 int ScrubStack::_enqueue(MDSCacheObject
*obj
, ScrubHeaderRef
& header
, bool top
)
64 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
// Dynamic dispatch on the concrete cache-object type: inode vs dirfrag.
65 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
66 if (in
->scrub_is_in_progress()) {
67 dout(10) << __func__
<< " with {" << *in
<< "}" << ", already in scrubbing" << dendl
;
71 dout(10) << __func__
<< " with {" << *in
<< "}" << ", top=" << top
<< dendl
;
72 in
->scrub_initialize(header
);
73 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
74 if (dir
->scrub_is_in_progress()) {
75 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", already in scrubbing" << dendl
;
79 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", top=" << top
<< dendl
;
80 // The edge directory must be in memory
82 dir
->scrub_initialize(header
);
// Dentries are never queued directly on the scrub stack.
84 ceph_assert(0 == "queue dentry to scrub stack");
87 dout(20) << "enqueue " << *obj
<< " to " << (top
? "top" : "bottom") << " of ScrubStack" << dendl
;
// Pin the object only on first insertion; re-listing must not double-pin.
88 if (!obj
->item_scrub
.is_on_list()) {
89 obj
->get(MDSCacheObject::PIN_SCRUBQUEUE
);
93 scrub_stack
.push_front(&obj
->item_scrub
);
95 scrub_stack
.push_back(&obj
->item_scrub
);
// Public entry point for queuing an inode scrub. Records the scrub origin on
// the header, registers the header in scrubbing_map keyed by tag (rejecting a
// duplicate tag with -CEPHFS_EEXIST), then delegates to _enqueue() and
// refreshes the cluster-log summary via clog_scrub_summary().
// NOTE(review): the guard that returns -CEPHFS_EAGAIN (original ~100-102) and
// several connective lines are missing from this extract — verify upstream.
99 int ScrubStack::enqueue(CInode
*in
, ScrubHeaderRef
& header
, bool top
)
103 return -CEPHFS_EAGAIN
;
105 header
->set_origin(in
->ino());
106 auto ret
= scrubbing_map
.emplace(header
->get_tag(), header
);
108 dout(10) << __func__
<< " with {" << *in
<< "}"
109 << ", conflicting tag " << header
->get_tag() << dendl
;
110 return -CEPHFS_EEXIST
;
113 int r
= _enqueue(in
, header
, top
);
117 clog_scrub_summary(in
);
// Park |obj| on the scrub_waiting list (moving it off whatever list it is
// currently on) and count it as an in-progress scrub so the kick loop's
// concurrency budget accounts for it while it waits on an external event.
123 void ScrubStack::add_to_waiting(MDSCacheObject
*obj
)
125 scrubs_in_progress
++;
126 obj
->item_scrub
.remove_myself();
127 scrub_waiting
.push_back(&obj
->item_scrub
);
// Inverse of add_to_waiting(): decrement the in-progress count and, if the
// object is still listed, move it back to the front of the main scrub stack.
// The |kick| flag presumably re-triggers kick_off_scrubs() — the lines that
// use it (original ~136-140) are missing from this extract; verify upstream.
130 void ScrubStack::remove_from_waiting(MDSCacheObject
*obj
, bool kick
)
132 scrubs_in_progress
--;
133 if (obj
->item_scrub
.is_on_list()) {
134 obj
->item_scrub
.remove_myself();
135 scrub_stack
.push_front(&obj
->item_scrub
);
// Completion context used whenever a scrub target must wait for an external
// event (unfreeze, single-auth resolution, cluster recovery, fetch). The
// constructor parks the object on the waiting list; finish() returns it to
// the scrub stack so it is retried.
// NOTE(review): member declarations and closing braces (original ~142, 146,
// 149+) are missing from this extract — verify against upstream.
141 class C_RetryScrub
: public MDSInternalContext
{
143 C_RetryScrub(ScrubStack
*s
, MDSCacheObject
*o
) :
144 MDSInternalContext(s
->mdcache
->mds
), stack(s
), obj(o
) {
145 stack
->add_to_waiting(obj
);
147 void finish(int r
) override
{
148 stack
->remove_from_waiting(obj
);
// Main scrub pump. Called with the MDS lock held (asserted). First handles
// control transitions: if an abort is pending (clear_stack) or we are
// pausing/paused, waits for outstanding scrubs to drain, settles the final
// state (IDLE after abort, PAUSED after pause) and completes queued control
// contexts. Otherwise walks the scrub stack dispatching work — file inodes
// are validated once and dequeued, directory inodes expand into dirfrags at
// the top of the stack, dirfrags queue child inodes at the bottom.
// NOTE(review): many interior lines (returns, loop braces, the done/dequeue
// handling) are missing from this extract (numbering jumps e.g. 175 -> 181,
// 228 -> 234) — verify control flow against upstream before editing.
155 void ScrubStack::kick_off_scrubs()
157 ceph_assert(ceph_mutex_is_locked(mdcache
->mds
->mds_lock
));
158 dout(20) << __func__
<< ": state=" << state
<< dendl
;
// Control-path: abort/pause wins over dispatching new work.
160 if (clear_stack
|| state
== STATE_PAUSING
|| state
== STATE_PAUSED
) {
161 if (scrubs_in_progress
== 0) {
162 dout(10) << __func__
<< ": in progress scrub operations finished, "
163 << stack_size
<< " in the stack" << dendl
;
165 State final_state
= state
;
167 abort_pending_scrubs();
168 final_state
= STATE_IDLE
;
170 if (state
== STATE_PAUSING
) {
171 final_state
= STATE_PAUSED
;
174 set_state(final_state
);
175 complete_control_contexts(0);
181 dout(20) << __func__
<< " entering with " << scrubs_in_progress
<< " in "
182 "progress and " << stack_size
<< " in the stack" << dendl
;
183 elist
<MDSCacheObject
*>::iterator it
= scrub_stack
.begin();
// Dispatch loop, bounded by the configured max concurrent scrub ops.
184 while (g_conf()->mds_max_scrub_ops_in_progress
> scrubs_in_progress
) {
186 if (scrubs_in_progress
== 0) {
187 set_state(STATE_IDLE
);
193 assert(state
== STATE_RUNNING
|| state
== STATE_IDLE
);
194 set_state(STATE_RUNNING
);
196 if (CInode
*in
= dynamic_cast<CInode
*>(*it
)) {
197 dout(20) << __func__
<< " examining " << *in
<< dendl
;
// validate_inode_auth() parks the inode on the waiting list when it
// cannot be scrubbed here yet (frozen, ambiguous auth, degraded cluster).
200 if (!validate_inode_auth(in
))
204 // it's a regular file, symlink, or hard link
205 dequeue(in
); // we only touch it this once, so remove from stack
207 scrub_file_inode(in
);
209 bool added_children
= false;
210 bool done
= false; // it's done, so pop it off the stack
211 scrub_dir_inode(in
, &added_children
, &done
);
213 dout(20) << __func__
<< " dir inode, done" << dendl
;
216 if (added_children
) {
217 // dirfrags were queued at top of stack
218 it
= scrub_stack
.begin();
221 } else if (CDir
*dir
= dynamic_cast<CDir
*>(*it
)) {
224 bool done
= false; // it's done, so pop it off the stack
225 scrub_dirfrag(dir
, &done
);
227 dout(20) << __func__
<< " dirfrag, done" << dendl
;
228 ++it
; // child inodes were queued at bottom of stack
// Only inodes and dirfrags are ever on the stack; anything else is a bug.
234 ceph_assert(0 == "dentry in scrub stack");
// Decide whether |in| can be scrubbed by this rank right now. If the inode
// is frozen, has ambiguous auth, or the cluster is degraded, register a
// C_RetryScrub waiter and defer. If another rank is authoritative, forward
// the scrub there via an MMDSScrub OP_QUEUEINO message, tracking the
// outstanding forward in remote_scrubs so the ACK can be matched up.
// NOTE(review): the return statements and several branch/closing lines are
// missing from this extract (numbering jumps e.g. 244 -> 249, 276 -> 284) —
// the exact true/false return points must be confirmed against upstream.
239 bool ScrubStack::validate_inode_auth(CInode
*in
)
242 if (!in
->can_auth_pin()) {
243 dout(10) << __func__
<< " can't auth pin" << dendl
;
244 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_RetryScrub(this, in
));
249 MDSRank
*mds
= mdcache
->mds
;
250 if (in
->is_ambiguous_auth()) {
251 dout(10) << __func__
<< " ambiguous auth" << dendl
;
252 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_RetryScrub(this, in
));
253 } else if (mds
->is_cluster_degraded()) {
254 dout(20) << __func__
<< " cluster degraded" << dendl
;
255 mds
->wait_for_cluster_recovered(new C_RetryScrub(this, in
));
257 ScrubHeaderRef header
= in
->get_scrub_header();
// Track the forwarded scrub; emplace must succeed (no parallel scrub of
// the same inode is expected yet — see FIXME below).
260 auto ret
= remote_scrubs
.emplace(std::piecewise_construct
,
261 std::forward_as_tuple(in
),
262 std::forward_as_tuple());
263 ceph_assert(ret
.second
); // FIXME: parallel scrubs?
264 auto &scrub_r
= ret
.first
->second
;
265 scrub_r
.tag
= header
->get_tag();
267 mds_rank_t auth
= in
->authority().first
;
268 dout(10) << __func__
<< " forward to mds." << auth
<< dendl
;
269 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEINO
, in
->ino(),
270 std::move(in
->scrub_queued_frags()),
271 header
->get_tag(), header
->get_origin(),
272 header
->is_internal_tag(), header
->get_force(),
273 header
->get_recursive(), header
->get_repair());
274 mdcache
->mds
->send_message_mds(r
, auth
);
276 scrub_r
.gather_set
.insert(auth
);
// Scrub a directory inode (must be auth here, asserted). In recursive mode
// the dirfragtree leaves are enumerated; each frag is either (a) deferred via
// a gather waiter when its dirfrag is ambiguous-auth/frozen/cluster-degraded,
// (b) collected into scrub_remote for forwarding when another rank is auth,
// (c) fetched when barebones (version 0), or (d) locally enqueued at the top
// of the stack (setting *added_children). Remote frags are forwarded with
// MMDSScrub OP_QUEUEDIR and tracked in remote_scrubs; finally the inode
// itself is validated via scrub_dir_inode_final().
// NOTE(review): interior lines (the `frags` declaration around original 298,
// `continue`s, closing braces, and the *done handling) are missing from this
// extract — verify control flow against upstream before editing.
284 void ScrubStack::scrub_dir_inode(CInode
*in
, bool *added_children
, bool *done
)
286 dout(10) << __func__
<< " " << *in
<< dendl
;
287 ceph_assert(in
->is_auth());
288 MDSRank
*mds
= mdcache
->mds
;
290 ScrubHeaderRef header
= in
->get_scrub_header();
293 MDSGatherBuilder
gather(g_ceph_context
);
// `queued` records which frags have already been dispatched for this inode.
295 auto &queued
= in
->scrub_queued_frags();
296 std::map
<mds_rank_t
, fragset_t
> scrub_remote
;
299 in
->dirfragtree
.get_leaves(frags
);
300 dout(20) << __func__
<< "recursive mode, frags " << frags
<< dendl
;
301 for (auto &fg
: frags
) {
302 if (queued
.contains(fg
))
304 CDir
*dir
= in
->get_or_open_dirfrag(mdcache
, fg
);
305 if (!dir
->is_auth()) {
306 if (dir
->is_ambiguous_auth()) {
307 dout(20) << __func__
<< " ambiguous auth " << *dir
<< dendl
;
308 dir
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, gather
.new_sub());
309 } else if (mds
->is_cluster_degraded()) {
310 dout(20) << __func__
<< " cluster degraded" << dendl
;
311 mds
->wait_for_cluster_recovered(gather
.new_sub());
313 mds_rank_t auth
= dir
->authority().first
;
314 scrub_remote
[auth
].insert_raw(fg
);
316 } else if (!dir
->can_auth_pin()) {
317 dout(20) << __func__
<< " freezing/frozen " << *dir
<< dendl
;
318 dir
->add_waiter(CDir::WAIT_UNFREEZE
, gather
.new_sub());
319 } else if (dir
->get_version() == 0) {
320 dout(20) << __func__
<< " barebones " << *dir
<< dendl
;
321 dir
->fetch_keys({}, gather
.new_sub());
323 _enqueue(dir
, header
, true);
324 queued
.insert_raw(dir
->get_frag());
325 *added_children
= true;
// Any gathered waiters mean this inode must be retried later as a unit.
331 if (gather
.has_subs()) {
332 gather
.set_finisher(new C_RetryScrub(this, in
));
337 if (!scrub_remote
.empty()) {
338 auto ret
= remote_scrubs
.emplace(std::piecewise_construct
,
339 std::forward_as_tuple(in
),
340 std::forward_as_tuple());
341 ceph_assert(ret
.second
); // FIXME: parallel scrubs?
342 auto &scrub_r
= ret
.first
->second
;
343 scrub_r
.tag
= header
->get_tag();
345 for (auto& p
: scrub_remote
) {
347 dout(20) << __func__
<< " forward " << p
.second
<< " to mds." << p
.first
<< dendl
;
348 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEDIR
, in
->ino(),
349 std::move(p
.second
), header
->get_tag(),
350 header
->get_origin(), header
->is_internal_tag(),
351 header
->get_force(), header
->get_recursive(),
352 header
->get_repair());
353 mds
->send_message_mds(r
, p
.first
);
354 scrub_r
.gather_set
.insert(p
.first
);
361 scrub_dir_inode_final(in
);
364 dout(10) << __func__
<< " done" << dendl
;
// Completion context for CInode::validate_disk_state(): holds the validation
// result buffer, counts itself as an in-progress scrub for the lifetime of
// the validation, and on finish() feeds the result to _validate_inode_done()
// before re-kicking the scrub pump.
// NOTE(review): member declarations, access specifiers and closing braces
// (original ~368-373, 376, 378, 383+) are missing from this extract.
367 class C_InodeValidated
: public MDSInternalContext
371 CInode::validated_data result
;
374 C_InodeValidated(MDSRank
*mds
, ScrubStack
*stack_
, CInode
*target_
)
375 : MDSInternalContext(mds
), stack(stack_
), target(target_
)
377 stack
->scrubs_in_progress
++;
379 void finish(int r
) override
{
380 stack
->_validate_inode_done(target
, r
, result
);
381 stack
->scrubs_in_progress
--;
382 stack
->kick_off_scrubs();
// Final step of a directory-inode scrub: kick off an asynchronous on-disk
// validation of the inode itself; the C_InodeValidated context carries the
// result back into _validate_inode_done().
386 void ScrubStack::scrub_dir_inode_final(CInode
*in
)
388 dout(20) << __func__
<< " " << *in
<< dendl
;
390 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
391 in
->validate_disk_state(&fin
->result
, fin
);
// Scrub one dirfrag. If the frag is not fully in memory, fetch it and retry
// via C_RetryScrub. In recursive mode, iterate the dentries under a snapshot
// sequence one past the newest global seq: per-dentry scrub failures go to
// the cluster log; snapshotted dentries (snapid != CEPH_NOSNAP) and dentries
// unchanged since the last recursive scrub (unless forced, or a remote dir
// link) are skipped; primary links enqueue their inode at the bottom of the
// stack. Then scrub_local() checks the frag itself (logging on failure),
// and the frag is marked finished and auth-unpinned.
// NOTE(review): `continue`s, the `std::string path` declarations (original
// ~416, 444), and closing braces are missing from this extract — verify
// against upstream before editing.
395 void ScrubStack::scrub_dirfrag(CDir
*dir
, bool *done
)
397 ceph_assert(dir
!= NULL
);
399 dout(10) << __func__
<< " " << *dir
<< dendl
;
401 if (!dir
->is_complete()) {
402 dir
->fetch(new C_RetryScrub(this, dir
), true); // already auth pinned
403 dout(10) << __func__
<< " incomplete, fetching" << dendl
;
407 ScrubHeaderRef header
= dir
->get_scrub_header();
408 version_t last_scrub
= dir
->scrub_info()->last_recursive
.version
;
409 if (header
->get_recursive()) {
410 auto next_seq
= mdcache
->get_global_snaprealm()->get_newest_seq()+1;
411 for (auto it
= dir
->begin(); it
!= dir
->end(); /* nop */) {
412 auto [dnk
, dn
] = *it
;
413 ++it
; /* trim (in the future) may remove dentry */
415 if (dn
->scrub(next_seq
)) {
417 dir
->get_inode()->make_path_string(path
, true);
418 clog
->warn() << "Scrub error on dentry " << *dn
419 << " see " << g_conf()->name
420 << " log and `damage ls` output for details";
423 if (dnk
.snapid
!= CEPH_NOSNAP
) {
427 CDentry::linkage_t
*dnl
= dn
->get_linkage();
428 if (dn
->get_version() <= last_scrub
&&
429 dnl
->get_remote_d_type() != DT_DIR
&&
430 !header
->get_force()) {
431 dout(15) << __func__
<< " skip dentry " << dnk
432 << ", no change since last scrub" << dendl
;
435 if (dnl
->is_primary()) {
436 _enqueue(dnl
->get_inode(), header
, false);
437 } else if (dnl
->is_remote()) {
438 // TODO: check remote linkage
443 if (!dir
->scrub_local()) {
445 dir
->get_inode()->make_path_string(path
, true);
446 clog
->warn() << "Scrub error on dir " << dir
->ino()
447 << " (" << path
<< ") see " << g_conf()->name
448 << " log and `damage ls` output for details";
451 dir
->scrub_finished();
452 dir
->auth_unpin(this);
455 dout(10) << __func__
<< " done" << dendl
;
// Scrub a non-directory inode: fire off the asynchronous on-disk validation;
// the C_InodeValidated completion routes the result to _validate_inode_done().
458 void ScrubStack::scrub_file_inode(CInode
*in
)
460 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
461 // At this stage the DN is already past scrub_initialize, so
462 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
463 in
->validate_disk_state(&fin
->result
, fin
);
// Handle the result of validate_disk_state() for |in|. On failed validation,
// register the damage: backtrace failures go to the damage table as remote
// (linkage) damage, on-disk inode failures as damaged dentries at the
// projected parent. The cluster log gets an info line when all damage was
// repaired, a warn line otherwise, and the verbose JSON report is dumped to
// the MDS log. Finally the inode is marked scrub_finished().
// NOTE(review): the `std::string path` declaration (original ~471), the JSON
// dump lines around 508-511, and various braces are missing from this
// extract — verify against upstream before editing.
466 void ScrubStack::_validate_inode_done(CInode
*in
, int r
,
467 const CInode::validated_data
&result
)
469 LogChannelRef clog
= mdcache
->mds
->clog
;
470 const ScrubHeaderRefConst header
= in
->scrub_info()->header
;
473 if (!result
.passed_validation
) {
474 // Build path string for use in messages
475 in
->make_path_string(path
, true);
478 if (result
.backtrace
.checked
&& !result
.backtrace
.passed
&&
479 !result
.backtrace
.repaired
)
481 // Record backtrace fails as remote linkage damage, as
482 // we may not be able to resolve hard links to this inode
483 mdcache
->mds
->damage_table
.notify_remote_damaged(in
->ino(), path
);
484 } else if (result
.inode
.checked
&& !result
.inode
.passed
&&
485 !result
.inode
.repaired
) {
486 // Record damaged inode structures as damaged dentries as
487 // that is where they are stored
488 auto parent
= in
->get_projected_parent_dn();
490 auto dir
= parent
->get_dir();
491 mdcache
->mds
->damage_table
.notify_dentry(
492 dir
->inode
->ino(), dir
->frag
, parent
->last
, parent
->get_name(), path
);
496 // Inform the cluster log if we found an error
497 if (!result
.passed_validation
) {
498 if (result
.all_damage_repaired()) {
499 clog
->info() << "Scrub repaired inode " << in
->ino()
500 << " (" << path
<< ")";
502 clog
->warn() << "Scrub error on inode " << in
->ino()
503 << " (" << path
<< ") see " << g_conf()->name
504 << " log and `damage ls` output for details";
507 // Put the verbose JSON output into the MDS log for later inspection
510 CachedStackStringStream css
;
512 derr
<< __func__
<< " scrub error on inode " << *in
<< ": " << css
->strv()
515 dout(10) << __func__
<< " scrub passed on inode " << *in
<< dendl
;
518 in
->scrub_finished();
// Complete all queued scrub-control contexts (from abort/pause requests) with
// result |r| and clear the queue. MDS lock must be held (asserted).
// NOTE(review): the per-context completion line inside the loop (original
// ~525-526) is missing from this extract — verify upstream.
521 void ScrubStack::complete_control_contexts(int r
) {
522 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
524 for (auto &ctx
: control_ctxs
) {
527 control_ctxs
.clear();
// Transition the scrub state machine, logging the change and refreshing the
// cluster-log summary only when the state actually changes.
// NOTE(review): the assignment `state = next_state;` (original ~534) is
// missing from this extract — verify upstream.
530 void ScrubStack::set_state(State next_state
) {
531 if (state
!= next_state
) {
532 dout(20) << __func__
<< ", from state=" << state
<< ", to state="
533 << next_state
<< dendl
;
535 clog_scrub_summary();
// Report whether the stack is in a transition state (RUNNING or PAUSING),
// which causes scrub-control operations (abort/pause) to be deferred until
// in-flight work drains. MDS lock must be held (asserted).
// NOTE(review): the return statements (original ~546-549) are missing from
// this extract — verify upstream.
539 bool ScrubStack::scrub_in_transition_state() {
540 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
541 dout(20) << __func__
<< ": state=" << state
<< dendl
;
543 // STATE_RUNNING is considered as a transition state so as to
544 // "delay" the scrub control operation.
545 if (state
== STATE_RUNNING
|| state
== STATE_PAUSING
) {
// Build a human-readable one-line scrub summary ("idle", "idle+waiting",
// "active", paths being scrubbed, ...) for the cluster log. MDS lock must be
// held (asserted). Uses a cached stack stringstream; per-origin entries show
// the path when the origin inode is in cache, else "#<ino>".
// NOTE(review): most of the state-branch bodies and the final return
// (original gaps 561->564, 574->587, 597->606) are missing from this
// extract — verify upstream.
552 std::string_view
ScrubStack::scrub_summary() {
553 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
555 bool have_more
= false;
556 CachedStackStringStream cs
;
558 if (state
== STATE_IDLE
) {
559 if (scrubbing_map
.empty())
561 *cs
<< "idle+waiting";
564 if (state
== STATE_RUNNING
) {
571 if (state
== STATE_PAUSING
) {
574 } else if (state
== STATE_PAUSED
) {
587 if (!scrubbing_map
.empty()) {
590 for (auto &p
: scrubbing_map
) {
593 auto& header
= p
.second
;
594 if (CInode
*in
= mdcache
->get_inode(header
->get_origin()))
595 *cs
<< scrub_inode_path(in
);
597 *cs
<< "#" << header
->get_origin();
// Emit structured scrub status (for the asok/tell command) into |f|: a
// "result" section with a human-readable status string, then a "scrubs"
// section with one object per active scrub tag (path, tag, and the
// recursive/repair/force option string). MDS lock must be held (asserted).
// NOTE(review): several branch bodies and option-string lines (gaps
// 619->623, 627->638, 660->663, etc.) are missing from this extract —
// verify upstream.
606 void ScrubStack::scrub_status(Formatter
*f
) {
607 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
609 f
->open_object_section("result");
611 CachedStackStringStream css
;
612 bool have_more
= false;
614 if (state
== STATE_IDLE
) {
615 if (scrubbing_map
.empty())
616 *css
<< "no active scrubs running";
618 *css
<< state
<< " (waiting for more scrubs)";
619 } else if (state
== STATE_RUNNING
) {
623 *css
<< "scrub active";
625 *css
<< " (" << stack_size
<< " inodes in the stack)";
627 if (state
== STATE_PAUSING
|| state
== STATE_PAUSED
) {
638 *css
<< " (" << stack_size
<< " inodes in the stack)";
640 f
->dump_string("status", css
->strv());
642 f
->open_object_section("scrubs");
644 for (auto& p
: scrubbing_map
) {
646 auto& header
= p
.second
;
648 std::string
tag(header
->get_tag());
649 f
->open_object_section(tag
.c_str()); // scrub id
651 if (CInode
*in
= mdcache
->get_inode(header
->get_origin()))
652 f
->dump_string("path", scrub_inode_path(in
));
654 f
->dump_stream("path") << "#" << header
->get_origin();
656 f
->dump_string("tag", header
->get_tag());
658 CachedStackStringStream optcss
;
659 if (header
->get_recursive()) {
660 *optcss
<< "recursive";
663 if (header
->get_repair()) {
670 if (header
->get_force()) {
677 f
->dump_string("options", optcss
->strv());
678 f
->close_section(); // scrub id
680 f
->close_section(); // scrubs
681 f
->close_section(); // result
// Abort every queued scrub. Requires the MDS lock and an active clear_stack
// request (both asserted). The abort_one lambda dispatches per object type
// (inode vs dirfrag; dentries are a fatal error) and is applied to both the
// scrub stack and the waiting list; outstanding remote forwards are unparked
// (without kicking) and dropped.
// NOTE(review): the inode branch body (original ~690), the lambda
// invocations in the two loops (699, 701), and the stack clear lines
// (702-704) are missing from this extract — verify upstream.
684 void ScrubStack::abort_pending_scrubs() {
685 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
686 ceph_assert(clear_stack
);
688 auto abort_one
= [this](MDSCacheObject
*obj
) {
689 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
691 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
692 dir
->scrub_aborted();
693 dir
->auth_unpin(this);
695 ceph_abort(0 == "dentry in scrub stack");
698 for (auto it
= scrub_stack
.begin(); !it
.end(); ++it
)
700 for (auto it
= scrub_waiting
.begin(); !it
.end(); ++it
)
705 scrub_waiting
.clear();
707 for (auto& p
: remote_scrubs
)
708 remove_from_waiting(p
.first
, false);
709 remote_scrubs
.clear();
// Broadcast a bare MMDSScrub control message (|op| is OP_ABORT / OP_PAUSE /
// OP_RESUME at the call sites below) to the up MDS ranks.
// NOTE(review): the loop body is missing its skip-self line (original
// ~719-720) — presumably the local rank is excluded; verify upstream.
714 void ScrubStack::send_state_message(int op
) {
715 MDSRank
*mds
= mdcache
->mds
;
716 set
<mds_rank_t
> up_mds
;
717 mds
->get_mds_map()->get_up_mds_set(up_mds
);
718 for (auto& r
: up_mds
) {
721 auto m
= make_message
<MMDSScrub
>(op
);
722 mds
->send_message_mds(m
, r
);
// Abort all scrubs. Rank 0 records the abort epoch, flags peers as possibly
// aborting, and broadcasts OP_ABORT. If the stack is mid-transition the
// abort is deferred (clear_stack presumably set; context queued on
// control_ctxs); otherwise pending scrubs are dropped immediately and the
// state settles to IDLE (unless PAUSED) before completing |on_finish|.
// |on_finish| may be nullptr (see the OP_ABORT handler in handle_scrub).
// NOTE(review): the `clear_stack = true;` line (original ~741) and several
// braces/returns are missing from this extract — verify upstream.
726 void ScrubStack::scrub_abort(Context
*on_finish
) {
727 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
729 dout(10) << __func__
<< ": aborting with " << scrubs_in_progress
730 << " scrubs in progress and " << stack_size
<< " in the"
731 << " stack" << dendl
;
733 if (mdcache
->mds
->get_nodeid() == 0) {
734 scrub_epoch_last_abort
= scrub_epoch
;
735 scrub_any_peer_aborting
= true;
736 send_state_message(MMDSScrub::OP_ABORT
);
740 if (scrub_in_transition_state()) {
742 control_ctxs
.push_back(on_finish
);
746 abort_pending_scrubs();
747 if (state
!= STATE_PAUSED
)
748 set_state(STATE_IDLE
);
751 on_finish
->complete(0);
// Pause scrubbing. Rank 0 broadcasts OP_PAUSE to peers. A pause racing with
// an in-progress abort is rejected with -CEPHFS_EINVAL. Otherwise move to
// PAUSING; if the stack was already quiescent (not in a transition state) go
// straight to PAUSED and complete |on_finish|, else queue it on control_ctxs
// until the drain completes. |on_finish| may be nullptr (OP_PAUSE handler).
// NOTE(review): the clear_stack check guarding the EINVAL path (original
// ~765-766) and surrounding braces/returns are missing from this extract —
// verify upstream.
754 void ScrubStack::scrub_pause(Context
*on_finish
) {
755 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
757 dout(10) << __func__
<< ": pausing with " << scrubs_in_progress
758 << " scrubs in progress and " << stack_size
<< " in the"
759 << " stack" << dendl
;
761 if (mdcache
->mds
->get_nodeid() == 0)
762 send_state_message(MMDSScrub::OP_PAUSE
);
764 // abort is in progress
767 on_finish
->complete(-CEPHFS_EINVAL
);
771 bool done
= scrub_in_transition_state();
773 set_state(STATE_PAUSING
);
775 control_ctxs
.push_back(on_finish
);
779 set_state(STATE_PAUSED
);
781 on_finish
->complete(0);
// Resume scrubbing. Rank 0 broadcasts OP_RESUME. A resume while PAUSING
// cancels the pending pause (its control contexts complete with
// -CEPHFS_ECANCELED); from PAUSED it simply returns to RUNNING.
// NOTE(review): the leading branch before the first `} else if` (original
// ~791-794, likely the clear_stack/abort case), the kick of the scrub pump,
// and the return value lines are missing from this extract — verify upstream.
784 bool ScrubStack::scrub_resume() {
785 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
786 dout(20) << __func__
<< ": state=" << state
<< dendl
;
788 if (mdcache
->mds
->get_nodeid() == 0)
789 send_state_message(MMDSScrub::OP_RESUME
);
795 } else if (state
== STATE_PAUSING
) {
796 set_state(STATE_RUNNING
);
797 complete_control_contexts(-CEPHFS_ECANCELED
);
798 } else if (state
== STATE_PAUSED
) {
799 set_state(STATE_RUNNING
);
806 // send current scrub summary to cluster log
// With an inode argument: log a per-inode "scrub <what> for path: <path>"
// line (wording depends on whether the inode is currently being scrubbed);
// with no inode (presumably the default), log the aggregate scrub_summary().
// NOTE(review): the branch computing `what` and the clog declaration
// (original ~808-816) are missing from this extract — verify upstream.
807 void ScrubStack::clog_scrub_summary(CInode
*in
) {
812 } else if (in
->scrub_is_in_progress()) {
817 clog
->info() << "scrub " << what
<< " for path: " << scrub_inode_path(in
);
820 clog
->info() << "scrub summary: " << scrub_summary();
// Message entry point: route MMDSScrub to handle_scrub() and MMDSScrubStats
// (MSG_MDS_SCRUB_STATS) to handle_scrub_stats(); any other type is a fatal
// protocol error.
// NOTE(review): the MSG_MDS_SCRUB case label and the break/default lines
// (original ~826, 828-829, 832-834) are missing from this extract.
823 void ScrubStack::dispatch(const cref_t
<Message
> &m
)
825 switch (m
->get_type()) {
827 handle_scrub(ref_cast
<MMDSScrub
>(m
));
830 case MSG_MDS_SCRUB_STATS
:
831 handle_scrub_stats(ref_cast
<MMDSScrubStats
>(m
));
835 derr
<< " scrub stack unknown message " << m
->get_type() << dendl_impl
;
836 ceph_abort_msg("scrub stack unknown message");
// Handle an MMDSScrub message from a peer rank.
//  OP_QUEUEDIR      — a peer forwarded dirfrags of |ino| for us to scrub:
//                     gather waiters for unfetchable/frozen frags (retrying
//                     the whole message), build or reuse the ScrubHeader for
//                     the tag, enqueue each auth frag, and reply with
//                     OP_QUEUEDIR_ACK carrying the frags actually queued.
//  OP_QUEUEDIR_ACK  — match against remote_scrubs by inode+tag, fold the
//                     acked frags into the inode's queued fragset, and when
//                     the gather set drains, unpark the inode.
//  OP_QUEUEINO      — like OP_QUEUEDIR but queues the inode itself, then
//                     replies OP_QUEUEINO_ACK.
//  OP_QUEUEINO_ACK  — single-peer gather: finish the remote inode scrub.
//  OP_ABORT/OP_PAUSE— run the local abort/pause path with a null context.
//  OP_RESUME        — (body missing from this extract).
// Unknown ops are a fatal protocol error.
// NOTE(review): many connective lines are missing throughout (guard returns
// when the inode is absent, `break`s, the OP_RESUME body, the `queued`
// fragset declarations before the ACK replies, header repair argument at
// original ~886/938) — verify control flow against upstream before editing.
840 void ScrubStack::handle_scrub(const cref_t
<MMDSScrub
> &m
)
843 mds_rank_t from
= mds_rank_t(m
->get_source().num());
844 dout(10) << __func__
<< " " << *m
<< " from mds." << from
<< dendl
;
846 switch (m
->get_op()) {
847 case MMDSScrub::OP_QUEUEDIR
:
849 CInode
*diri
= mdcache
->get_inode(m
->get_ino());
852 std::vector
<CDir
*> dfs
;
853 MDSGatherBuilder
gather(g_ceph_context
);
854 for (const auto& fg
: m
->get_frags()) {
855 CDir
*dir
= diri
->get_dirfrag(fg
);
857 dout(10) << __func__
<< " no frag " << fg
<< dendl
;
860 if (!dir
->is_auth()) {
861 dout(10) << __func__
<< " not auth " << *dir
<< dendl
;
864 if (!dir
->can_auth_pin()) {
865 dout(10) << __func__
<< " can't auth pin " << *dir
<< dendl
;
866 dir
->add_waiter(CDir::WAIT_UNFREEZE
, gather
.new_sub());
// Retry the whole message once all frags are available.
872 if (gather
.has_subs()) {
873 gather
.set_finisher(new C_MDS_RetryMessage(mdcache
->mds
, m
));
880 ScrubHeaderRef header
;
881 if (auto it
= scrubbing_map
.find(m
->get_tag()); it
!= scrubbing_map
.end()) {
884 header
= std::make_shared
<ScrubHeader
>(m
->get_tag(), m
->is_internal_tag(),
885 m
->is_force(), m
->is_recursive(),
887 header
->set_origin(m
->get_origin());
888 scrubbing_map
.emplace(header
->get_tag(), header
);
890 for (auto dir
: dfs
) {
891 queued
.insert_raw(dir
->get_frag());
892 _enqueue(dir
, header
, true);
898 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEDIR_ACK
, m
->get_ino(),
899 std::move(queued
), m
->get_tag());
900 mdcache
->mds
->send_message_mds(r
, from
);
903 case MMDSScrub::OP_QUEUEDIR_ACK
:
905 CInode
*diri
= mdcache
->get_inode(m
->get_ino());
907 auto it
= remote_scrubs
.find(diri
);
908 if (it
!= remote_scrubs
.end() &&
909 m
->get_tag() == it
->second
.tag
) {
910 if (it
->second
.gather_set
.erase(from
)) {
911 auto &queued
= diri
->scrub_queued_frags();
912 for (auto &fg
: m
->get_frags())
913 queued
.insert_raw(fg
);
916 if (it
->second
.gather_set
.empty()) {
917 remote_scrubs
.erase(it
);
919 const auto& header
= diri
->get_scrub_header();
920 header
->set_epoch_last_forwarded(scrub_epoch
);
921 remove_from_waiting(diri
);
927 case MMDSScrub::OP_QUEUEINO
:
929 CInode
*in
= mdcache
->get_inode(m
->get_ino());
932 ScrubHeaderRef header
;
933 if (auto it
= scrubbing_map
.find(m
->get_tag()); it
!= scrubbing_map
.end()) {
936 header
= std::make_shared
<ScrubHeader
>(m
->get_tag(), m
->is_internal_tag(),
937 m
->is_force(), m
->is_recursive(),
939 header
->set_origin(m
->get_origin());
940 scrubbing_map
.emplace(header
->get_tag(), header
);
943 _enqueue(in
, header
, true);
944 in
->scrub_queued_frags() = m
->get_frags();
948 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEINO_ACK
, m
->get_ino(),
949 std::move(queued
), m
->get_tag());
950 mdcache
->mds
->send_message_mds(r
, from
);
953 case MMDSScrub::OP_QUEUEINO_ACK
:
955 CInode
*in
= mdcache
->get_inode(m
->get_ino());
957 auto it
= remote_scrubs
.find(in
);
958 if (it
!= remote_scrubs
.end() &&
959 m
->get_tag() == it
->second
.tag
&&
960 it
->second
.gather_set
.erase(from
)) {
// Inode forwards target a single auth rank, so the gather must drain now.
961 ceph_assert(it
->second
.gather_set
.empty());
962 remote_scrubs
.erase(it
);
964 remove_from_waiting(in
, false);
967 const auto& header
= in
->get_scrub_header();
968 header
->set_epoch_last_forwarded(scrub_epoch
);
969 in
->scrub_finished();
975 case MMDSScrub::OP_ABORT
:
976 scrub_abort(nullptr);
978 case MMDSScrub::OP_PAUSE
:
979 scrub_pause(nullptr);
981 case MMDSScrub::OP_RESUME
:
985 derr
<< " scrub stack unknown scrub operation " << m
->get_op() << dendl_impl
;
986 ceph_abort_msg("scrub stack unknown scrub operation");
// Handle an MMDSScrubStats message. On a non-zero rank (apparent from the
// reply sent to rank 0): resync the local epoch to the sender's epoch,
// normalizing per-header last-forwarded epochs on an epoch jump; classify
// each local scrub tag as still-scrubbing (pending work or forwarded this
// epoch) or finished per the sender's view; then ack rank 0 with the
// still-scrubbing tags and the local clear_stack flag, refresh the cluster
// log summary, and trim the MDS log. On rank 0: record the sender's acked
// epoch, tags and aborting flag in mds_scrub_stats.
// NOTE(review): the branch structure separating the non-zero-rank and
// rank-0 paths, any use of any_finished/any_repaired, and several braces
// are missing from this extract (gaps 1016->1022, 1026->1029, 1038->1043)
// — verify against upstream before editing.
990 void ScrubStack::handle_scrub_stats(const cref_t
<MMDSScrubStats
> &m
)
992 mds_rank_t from
= mds_rank_t(m
->get_source().num());
993 dout(7) << __func__
<< " " << *m
<< " from mds." << from
<< dendl
;
996 if (scrub_epoch
!= m
->get_epoch() - 1) {
997 scrub_epoch
= m
->get_epoch() - 1;
998 for (auto& p
: scrubbing_map
) {
999 if (p
.second
->get_epoch_last_forwarded())
1000 p
.second
->set_epoch_last_forwarded(scrub_epoch
);
1003 bool any_finished
= false;
1004 bool any_repaired
= false;
1005 std::set
<std::string
> scrubbing_tags
;
1006 for (auto it
= scrubbing_map
.begin(); it
!= scrubbing_map
.end(); ) {
1007 auto& header
= it
->second
;
1008 if (header
->get_num_pending() ||
1009 header
->get_epoch_last_forwarded() >= scrub_epoch
) {
1010 scrubbing_tags
.insert(it
->first
);
1012 } else if (m
->is_finished(it
->first
)) {
1013 any_finished
= true;
1014 if (header
->get_repaired())
1015 any_repaired
= true;
1016 scrubbing_map
.erase(it
++);
1022 scrub_epoch
= m
->get_epoch();
1024 auto ack
= make_message
<MMDSScrubStats
>(scrub_epoch
,
1025 std::move(scrubbing_tags
), clear_stack
);
1026 mdcache
->mds
->send_message_mds(ack
, 0);
1029 clog_scrub_summary();
1031 mdcache
->mds
->mdlog
->trim_all();
// Rank-0 bookkeeping of the peer's ack for the current epoch.
1033 if (scrub_epoch
== m
->get_epoch() &&
1034 (size_t)from
< mds_scrub_stats
.size()) {
1035 auto& stat
= mds_scrub_stats
[from
];
1036 stat
.epoch_acked
= m
->get_epoch();
1037 stat
.scrubbing_tags
= m
->get_scrubbing_tags();
1038 stat
.aborting
= m
->is_aborting();
// Periodic epoch driver (runs on rank 0, which writes mds_scrub_stats and
// broadcasts below). No-op when nothing is scrubbing and no peer is
// aborting. Otherwise: inspect the per-rank stats collected by
// handle_scrub_stats() — if every up rank has acked the current epoch and
// none is aborting, clear scrub_any_peer_aborting and, once two consecutive
// epochs are fully acked, treat globally-unreported tags as finished and
// prune them from scrubbing_map. Then bump the epoch, resize/refresh the
// stats table, and broadcast MMDSScrubStats (with the union of scrubbing
// tags when an update round is due) to the other up ranks.
// NOTE(review): key connective lines are missing from this extract — the
// condition opening the first branch (original ~1056-1057), the fully_acked
// guard around lines 1073-1083, the epoch increment (~1085-1086), the
// skip-self check in the send loop (~1115-1116), and any use of
// any_finished/any_repaired. Verify against upstream before editing.
1043 void ScrubStack::advance_scrub_status()
1045 if (!scrub_any_peer_aborting
&& scrubbing_map
.empty())
1048 MDSRank
*mds
= mdcache
->mds
;
1050 set
<mds_rank_t
> up_mds
;
1051 mds
->get_mds_map()->get_up_mds_set(up_mds
);
1052 auto up_max
= *up_mds
.rbegin();
1054 bool update_scrubbing
= false;
1055 std::set
<std::string
> scrubbing_tags
;
1058 update_scrubbing
= true;
1059 scrub_any_peer_aborting
= false;
1060 } else if (mds_scrub_stats
.size() > (size_t)(up_max
)) {
1061 bool any_aborting
= false;
1062 bool fully_acked
= true;
1063 for (const auto& stat
: mds_scrub_stats
) {
1064 if (stat
.aborting
|| stat
.epoch_acked
<= scrub_epoch_last_abort
)
1065 any_aborting
= true;
1066 if (stat
.epoch_acked
!= scrub_epoch
) {
1067 fully_acked
= false;
1070 scrubbing_tags
.insert(stat
.scrubbing_tags
.begin(),
1071 stat
.scrubbing_tags
.end());
1074 scrub_any_peer_aborting
= false;
1076 // handle_scrub_stats() reports scrub is still in-progress if it has
1077 // forwarded any object to other mds since previous epoch. Let's assume,
1078 // at time 'A', we got scrub stats from all mds for previous epoch. If
1079 // a scrub is not reported by any mds, we know there is no forward of
1080 // the scrub since time 'A'. So we can consider the scrub is finished.
1081 if (scrub_epoch_fully_acked
+ 1 == scrub_epoch
)
1082 update_scrubbing
= true;
1083 scrub_epoch_fully_acked
= scrub_epoch
;
1087 if (mds_scrub_stats
.size() != (size_t)up_max
+ 1)
1088 mds_scrub_stats
.resize((size_t)up_max
+ 1);
1089 mds_scrub_stats
.at(0).epoch_acked
= scrub_epoch
+ 1;
1091 bool any_finished
= false;
1092 bool any_repaired
= false;
1094 for (auto it
= scrubbing_map
.begin(); it
!= scrubbing_map
.end(); ) {
1095 auto& header
= it
->second
;
1096 if (header
->get_num_pending() ||
1097 header
->get_epoch_last_forwarded() >= scrub_epoch
) {
1098 if (update_scrubbing
&& up_max
!= 0)
1099 scrubbing_tags
.insert(it
->first
);
1101 } else if (update_scrubbing
&& !scrubbing_tags
.count(it
->first
)) {
1102 // no longer being scrubbed globally
1103 any_finished
= true;
1104 if (header
->get_repaired())
1105 any_repaired
= true;
1106 scrubbing_map
.erase(it
++);
1114 for (auto& r
: up_mds
) {
1117 auto m
= update_scrubbing
?
1118 make_message
<MMDSScrubStats
>(scrub_epoch
, scrubbing_tags
) :
1119 make_message
<MMDSScrubStats
>(scrub_epoch
);
1120 mds
->send_message_mds(m
, r
);
1124 clog_scrub_summary();
1126 mdcache
->mds
->mdlog
->trim_all();
1129 void ScrubStack::handle_mds_failure(mds_rank_t mds
)
1132 scrub_abort(nullptr);
1137 for (auto it
= remote_scrubs
.begin(); it
!= remote_scrubs
.end(); ) {
1138 if (it
->second
.gather_set
.erase(mds
) &&
1139 it
->second
.gather_set
.empty()) {
1140 CInode
*in
= it
->first
;
1141 remote_scrubs
.erase(it
++);
1142 remove_from_waiting(in
, false);