1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
24 #define dout_prefix _prefix(_dout, mdcache->mds)
25 static ostream
& _prefix(std::ostream
*_dout
, MDSRank
*mds
) {
26 return *_dout
<< "mds." << mds
->get_nodeid() << ".scrubstack ";
// Stream-insertion operator for ScrubStack::State, used by the dout()
// calls below ("state=" << state).
// NOTE(review): this paste is truncated — the `switch (state)` header, the
// per-case output statements/breaks, the default case and the closing
// braces were lost (embedded numbering jumps 31 -> 34 -> 37 -> 40).
// Presumably each case streamed a human-readable state name into `os`;
// confirm against the original file. Text below kept verbatim.
29 std::ostream
&operator<<(std::ostream
&os
, const ScrubStack::State
&state
) {
31 case ScrubStack::STATE_RUNNING
:
34 case ScrubStack::STATE_IDLE
:
37 case ScrubStack::STATE_PAUSING
:
40 case ScrubStack::STATE_PAUSED
:
// Remove `obj` from the scrub stack: drop the PIN_SCRUBQUEUE reference
// taken when it was enqueued and unlink its intrusive list item.
// Precondition (asserted): the object is currently on a scrub list.
// NOTE(review): lines lost here (embedded numbering jumps 55 -> 59) — the
// enclosing braces and, presumably, a stack-size bookkeeping decrement;
// confirm against the original. Text below kept verbatim.
50 void ScrubStack::dequeue(MDSCacheObject
*obj
)
52 dout(20) << "dequeue " << *obj
<< " from ScrubStack" << dendl
;
53 ceph_assert(obj
->item_scrub
.is_on_list());
54 obj
->put(MDSCacheObject::PIN_SCRUBQUEUE
);
55 obj
->item_scrub
.remove_myself();
// Internal enqueue: initialize scrubbing state on an inode or dirfrag and
// push it onto the scrub stack (front when top==true, back otherwise).
// Requires the MDS lock (asserted). Objects already being scrubbed are
// logged and (presumably) skipped. Anything that is neither a CInode nor a
// CDir (i.e. a dentry) trips the assert at the bottom.
// NOTE(review): truncated paste — return statements, else-branches and
// braces were lost (numbering gaps at 65-67, 73-75, 78, 80, 82-83, 87-89,
// 91, 93-95). Text below kept verbatim.
59 int ScrubStack::_enqueue(MDSCacheObject
*obj
, ScrubHeaderRef
& header
, bool top
)
61 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
62 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
63 if (in
->scrub_is_in_progress()) {
64 dout(10) << __func__
<< " with {" << *in
<< "}" << ", already in scrubbing" << dendl
;
68 dout(10) << __func__
<< " with {" << *in
<< "}" << ", top=" << top
<< dendl
;
69 in
->scrub_initialize(header
);
70 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
71 if (dir
->scrub_is_in_progress()) {
72 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", already in scrubbing" << dendl
;
76 dout(10) << __func__
<< " with {" << *dir
<< "}" << ", top=" << top
<< dendl
;
77 // The edge directory must be in memory
79 dir
->scrub_initialize(header
);
81 ceph_assert(0 == "queue dentry to scrub stack");
84 dout(20) << "enqueue " << *obj
<< " to " << (top
? "top" : "bottom") << " of ScrubStack" << dendl
;
85 if (!obj
->item_scrub
.is_on_list()) {
86 obj
->get(MDSCacheObject::PIN_SCRUBQUEUE
);
90 scrub_stack
.push_front(&obj
->item_scrub
);
92 scrub_stack
.push_back(&obj
->item_scrub
);
// Public entry point for starting a scrub rooted at `in`: record the scrub
// origin in the header, register the header in scrubbing_map keyed by tag
// (returning -CEPHFS_EEXIST on a tag collision), then delegate to
// _enqueue(). Returns -CEPHFS_EAGAIN in some early-out case whose
// condition was lost from this paste.
// NOTE(review): truncated — the guard before the EAGAIN return, the
// emplace-success check, the post-enqueue handling of `r` and the closing
// brace are missing (numbering gaps at 97-99, 101, 104, 108-109, 111-113,
// 115+). Text below kept verbatim.
96 int ScrubStack::enqueue(CInode
*in
, ScrubHeaderRef
& header
, bool top
)
100 return -CEPHFS_EAGAIN
;
102 header
->set_origin(in
->ino());
103 auto ret
= scrubbing_map
.emplace(header
->get_tag(), header
);
105 dout(10) << __func__
<< " with {" << *in
<< "}"
106 << ", conflicting tag " << header
->get_tag() << dendl
;
107 return -CEPHFS_EEXIST
;
110 int r
= _enqueue(in
, header
, top
);
114 clog_scrub_summary(in
);
120 void ScrubStack::add_to_waiting(MDSCacheObject
*obj
)
122 scrubs_in_progress
++;
123 obj
->item_scrub
.remove_myself();
124 scrub_waiting
.push_back(&obj
->item_scrub
);
// Counterpart of add_to_waiting(): take `obj` off the waiting list and put
// it back at the front of the scrub stack, dropping the in-progress count.
// NOTE(review): the `kick` parameter is never referenced in the surviving
// text — the paste lost lines 133-137 (presumably the conditional
// kick_off_scrubs() call and closing braces); confirm against the
// original. Text below kept verbatim.
127 void ScrubStack::remove_from_waiting(MDSCacheObject
*obj
, bool kick
)
129 scrubs_in_progress
--;
130 if (obj
->item_scrub
.is_on_list()) {
131 obj
->item_scrub
.remove_myself();
132 scrub_stack
.push_front(&obj
->item_scrub
);
// Completion context that parks an object on the waiting list at
// construction time and re-queues it (remove_from_waiting) when the
// awaited event fires. Used wherever a scrub must wait for unfreeze,
// single-auth resolution or cluster recovery.
// NOTE(review): the member declarations for `stack` and `obj` (referenced
// by the ctor init-list) plus access specifiers and closing braces were
// lost from this paste (numbering gaps at 139, 143, 146+). Text below
// kept verbatim.
138 class C_RetryScrub
: public MDSInternalContext
{
140 C_RetryScrub(ScrubStack
*s
, MDSCacheObject
*o
) :
141 MDSInternalContext(s
->mdcache
->mds
), stack(s
), obj(o
) {
142 stack
->add_to_waiting(obj
);
144 void finish(int r
) override
{
145 stack
->remove_from_waiting(obj
);
// Main scrub driver: while under mds_max_scrub_ops_in_progress, walk the
// scrub stack and dispatch work — file inodes are dequeued and validated
// once (scrub_file_inode), directory inodes go through scrub_dir_inode
// (which may push dirfrags on top of the stack), dirfrags go through
// scrub_dirfrag (which may queue child inodes at the bottom). Also drives
// the pause/abort state machine: when clear_stack or a pause is pending
// and all in-flight scrubs have drained, it settles into IDLE/PAUSED and
// completes any queued control contexts.
// NOTE(review): heavily truncated paste — loop braces, iterator
// advancement on several paths, the done/dequeue handling and the
// dentry-default branch context are missing (many numbering gaps).
// NOTE(review): line "190" uses bare assert() while the rest of the file
// uses ceph_assert() — worth unifying in the original.
// Text below kept verbatim.
152 void ScrubStack::kick_off_scrubs()
154 ceph_assert(ceph_mutex_is_locked(mdcache
->mds
->mds_lock
));
155 dout(20) << __func__
<< ": state=" << state
<< dendl
;
157 if (clear_stack
|| state
== STATE_PAUSING
|| state
== STATE_PAUSED
) {
158 if (scrubs_in_progress
== 0) {
159 dout(10) << __func__
<< ": in progress scrub operations finished, "
160 << stack_size
<< " in the stack" << dendl
;
162 State final_state
= state
;
164 abort_pending_scrubs();
165 final_state
= STATE_IDLE
;
167 if (state
== STATE_PAUSING
) {
168 final_state
= STATE_PAUSED
;
171 set_state(final_state
);
172 complete_control_contexts(0);
178 dout(20) << __func__
<< " entering with " << scrubs_in_progress
<< " in "
179 "progress and " << stack_size
<< " in the stack" << dendl
;
180 elist
<MDSCacheObject
*>::iterator it
= scrub_stack
.begin();
181 while (g_conf()->mds_max_scrub_ops_in_progress
> scrubs_in_progress
) {
183 if (scrubs_in_progress
== 0) {
184 set_state(STATE_IDLE
);
190 assert(state
== STATE_RUNNING
|| state
== STATE_IDLE
);
191 set_state(STATE_RUNNING
);
193 if (CInode
*in
= dynamic_cast<CInode
*>(*it
)) {
194 dout(20) << __func__
<< " examining " << *in
<< dendl
;
197 if (!validate_inode_auth(in
))
201 // it's a regular file, symlink, or hard link
202 dequeue(in
); // we only touch it this once, so remove from stack
204 scrub_file_inode(in
);
206 bool added_children
= false;
207 bool done
= false; // it's done, so pop it off the stack
208 scrub_dir_inode(in
, &added_children
, &done
);
210 dout(20) << __func__
<< " dir inode, done" << dendl
;
213 if (added_children
) {
214 // dirfrags were queued at top of stack
215 it
= scrub_stack
.begin();
218 } else if (CDir
*dir
= dynamic_cast<CDir
*>(*it
)) {
221 bool done
= false; // it's done, so pop it off the stack
222 scrub_dirfrag(dir
, &done
);
224 dout(20) << __func__
<< " dirfrag, done" << dendl
;
225 ++it
; // child inodes were queued at bottom of stack
231 ceph_assert(0 == "dentry in scrub stack");
// Check whether `in` can be scrubbed locally right now. If the inode is
// frozen, has ambiguous auth, or the cluster is degraded, a C_RetryScrub
// is registered and (presumably) false is returned. If another rank is
// authoritative, the scrub is forwarded there with an OP_QUEUEINO message
// and tracked in remote_scrubs until the ack arrives.
// NOTE(review): truncated — the return statements, the is_auth() branch
// and closing braces are missing (numbering gaps at 237-238, 242-245,
// 253, 255-256, 263, 272, 274+). Text below kept verbatim.
236 bool ScrubStack::validate_inode_auth(CInode
*in
)
239 if (!in
->can_auth_pin()) {
240 dout(10) << __func__
<< " can't auth pin" << dendl
;
241 in
->add_waiter(CInode::WAIT_UNFREEZE
, new C_RetryScrub(this, in
));
246 MDSRank
*mds
= mdcache
->mds
;
247 if (in
->is_ambiguous_auth()) {
248 dout(10) << __func__
<< " ambiguous auth" << dendl
;
249 in
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, new C_RetryScrub(this, in
));
250 } else if (mds
->is_cluster_degraded()) {
251 dout(20) << __func__
<< " cluster degraded" << dendl
;
252 mds
->wait_for_cluster_recovered(new C_RetryScrub(this, in
));
254 ScrubHeaderRef header
= in
->get_scrub_header();
257 auto ret
= remote_scrubs
.emplace(std::piecewise_construct
,
258 std::forward_as_tuple(in
),
259 std::forward_as_tuple());
260 ceph_assert(ret
.second
); // FIXME: parallel scrubs?
261 auto &scrub_r
= ret
.first
->second
;
262 scrub_r
.tag
= header
->get_tag();
264 mds_rank_t auth
= in
->authority().first
;
265 dout(10) << __func__
<< " forward to mds." << auth
<< dendl
;
266 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEINO
, in
->ino(),
267 std::move(in
->scrub_queued_frags()),
268 header
->get_tag(), header
->get_origin(),
269 header
->is_internal_tag(), header
->get_force(),
270 header
->get_recursive(), header
->get_repair());
271 mdcache
->mds
->send_message_mds(r
, auth
);
273 scrub_r
.gather_set
.insert(auth
);
// Scrub an auth directory inode. In recursive mode, walk the leaf
// dirfrags: locally-auth, fetchable frags are enqueued at the top of the
// stack (setting *added_children); non-auth frags are batched per remote
// rank into scrub_remote and forwarded via OP_QUEUEDIR messages tracked in
// remote_scrubs; frags that are frozen / ambiguous-auth / unfetched get
// gather waiters and the whole inode retries via C_RetryScrub. Finally
// scrub_dir_inode_final() validates the inode itself.
// NOTE(review): truncated — the recursive-mode guard, frags declaration,
// early returns, several closing braces and the `done` handling are
// missing (numbering gaps at 286, 288-289, 291, 294-295, 300, 309, 312,
// 319, 323-327, 330-333, 341, 343, 352-357, 359-360). Text below kept
// verbatim.
281 void ScrubStack::scrub_dir_inode(CInode
*in
, bool *added_children
, bool *done
)
283 dout(10) << __func__
<< " " << *in
<< dendl
;
284 ceph_assert(in
->is_auth());
285 MDSRank
*mds
= mdcache
->mds
;
287 ScrubHeaderRef header
= in
->get_scrub_header();
290 MDSGatherBuilder
gather(g_ceph_context
);
292 auto &queued
= in
->scrub_queued_frags();
293 std::map
<mds_rank_t
, fragset_t
> scrub_remote
;
296 in
->dirfragtree
.get_leaves(frags
);
297 dout(20) << __func__
<< "recursive mode, frags " << frags
<< dendl
;
298 for (auto &fg
: frags
) {
299 if (queued
.contains(fg
))
301 CDir
*dir
= in
->get_or_open_dirfrag(mdcache
, fg
);
302 if (!dir
->is_auth()) {
303 if (dir
->is_ambiguous_auth()) {
304 dout(20) << __func__
<< " ambiguous auth " << *dir
<< dendl
;
305 dir
->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH
, gather
.new_sub());
306 } else if (mds
->is_cluster_degraded()) {
307 dout(20) << __func__
<< " cluster degraded" << dendl
;
308 mds
->wait_for_cluster_recovered(gather
.new_sub());
310 mds_rank_t auth
= dir
->authority().first
;
311 scrub_remote
[auth
].insert_raw(fg
);
313 } else if (!dir
->can_auth_pin()) {
314 dout(20) << __func__
<< " freezing/frozen " << *dir
<< dendl
;
315 dir
->add_waiter(CDir::WAIT_UNFREEZE
, gather
.new_sub());
316 } else if (dir
->get_version() == 0) {
317 dout(20) << __func__
<< " barebones " << *dir
<< dendl
;
318 dir
->fetch(gather
.new_sub());
320 _enqueue(dir
, header
, true);
321 queued
.insert_raw(dir
->get_frag());
322 *added_children
= true;
328 if (gather
.has_subs()) {
329 gather
.set_finisher(new C_RetryScrub(this, in
));
334 if (!scrub_remote
.empty()) {
335 auto ret
= remote_scrubs
.emplace(std::piecewise_construct
,
336 std::forward_as_tuple(in
),
337 std::forward_as_tuple());
338 ceph_assert(ret
.second
); // FIXME: parallel scrubs?
339 auto &scrub_r
= ret
.first
->second
;
340 scrub_r
.tag
= header
->get_tag();
342 for (auto& p
: scrub_remote
) {
344 dout(20) << __func__
<< " forward " << p
.second
<< " to mds." << p
.first
<< dendl
;
345 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEDIR
, in
->ino(),
346 std::move(p
.second
), header
->get_tag(),
347 header
->get_origin(), header
->is_internal_tag(),
348 header
->get_force(), header
->get_recursive(),
349 header
->get_repair());
350 mds
->send_message_mds(r
, p
.first
);
351 scrub_r
.gather_set
.insert(p
.first
);
358 scrub_dir_inode_final(in
);
361 dout(10) << __func__
<< " done" << dendl
;
// Completion context for CInode::validate_disk_state(): counts itself as
// an in-progress scrub for its whole lifetime, and on finish feeds the
// validation `result` to _validate_inode_done() before re-running the
// scrub loop via kick_off_scrubs().
// NOTE(review): the declarations of the `stack` and `target` members
// (referenced by the ctor init-list), access specifiers and closing braces
// were lost from this paste (numbering gaps at 365-367, 369-370, 373, 375,
// 380+). Text below kept verbatim.
364 class C_InodeValidated
: public MDSInternalContext
368 CInode::validated_data result
;
371 C_InodeValidated(MDSRank
*mds
, ScrubStack
*stack_
, CInode
*target_
)
372 : MDSInternalContext(mds
), stack(stack_
), target(target_
)
374 stack
->scrubs_in_progress
++;
376 void finish(int r
) override
{
377 stack
->_validate_inode_done(target
, r
, result
);
378 stack
->scrubs_in_progress
--;
379 stack
->kick_off_scrubs();
383 void ScrubStack::scrub_dir_inode_final(CInode
*in
)
385 dout(20) << __func__
<< " " << *in
<< dendl
;
387 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
388 in
->validate_disk_state(&fin
->result
, fin
);
// Scrub one dirfrag. If the frag is not fully loaded, fetch it and retry
// via C_RetryScrub. In recursive mode, iterate the head (non-snapshot)
// dentries: dentries unchanged since the last recursive scrub (and not
// directories, unless force is set) are skipped; primary links enqueue the
// child inode at the bottom of the stack; remote links are a TODO. Then
// mark the frag finished and drop the auth pin.
// NOTE(review): truncated — early returns after the fetch, the `continue`
// for skipped/snapshotted dentries, loop braces and the `*done` handling
// are missing (numbering gaps at 393, 395, 397, 401-403, 409, 417-418,
// 423-428, 431-432). Text below kept verbatim.
392 void ScrubStack::scrub_dirfrag(CDir
*dir
, bool *done
)
394 ceph_assert(dir
!= NULL
);
396 dout(10) << __func__
<< " " << *dir
<< dendl
;
398 if (!dir
->is_complete()) {
399 dir
->fetch(new C_RetryScrub(this, dir
), true); // already auth pinned
400 dout(10) << __func__
<< " incomplete, fetching" << dendl
;
404 ScrubHeaderRef header
= dir
->get_scrub_header();
405 version_t last_scrub
= dir
->scrub_info()->last_recursive
.version
;
406 if (header
->get_recursive()) {
407 for (auto it
= dir
->begin(); it
!= dir
->end(); ++it
) {
408 if (it
->first
.snapid
!= CEPH_NOSNAP
)
410 CDentry
*dn
= it
->second
;
411 CDentry::linkage_t
*dnl
= dn
->get_linkage();
412 if (dn
->get_version() <= last_scrub
&&
413 dnl
->get_remote_d_type() != DT_DIR
&&
414 !header
->get_force()) {
415 dout(15) << __func__
<< " skip dentry " << it
->first
416 << ", no change since last scrub" << dendl
;
419 if (dnl
->is_primary()) {
420 _enqueue(dnl
->get_inode(), header
, false);
421 } else if (dnl
->is_remote()) {
422 // TODO: check remote linkage
429 dir
->scrub_finished();
430 dir
->auth_unpin(this);
433 dout(10) << __func__
<< " done" << dendl
;
436 void ScrubStack::scrub_file_inode(CInode
*in
)
438 C_InodeValidated
*fin
= new C_InodeValidated(mdcache
->mds
, this, in
);
439 // At this stage the DN is already past scrub_initialize, so
440 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
441 in
->validate_disk_state(&fin
->result
, fin
);
// Consume the result of an asynchronous inode validation. On failed
// validation, registers damage in the damage table (backtrace failures as
// remote-linkage damage, inode-structure failures as dentry damage at the
// projected parent), emits a cluster-log info/warn line depending on
// whether everything was auto-repaired, and dumps verbose JSON to the MDS
// log; finally marks the inode's scrub finished.
// NOTE(review): truncated — the declaration of `path`, several braces,
// the else-branches and the JSON dump_async/format lines are missing
// (numbering gaps at 446, 449-450, 454-455, 458, 467, 471-473, 479,
// 483-484, 486-487, 489, 491-492, 494-495, 497). Text below kept
// verbatim.
444 void ScrubStack::_validate_inode_done(CInode
*in
, int r
,
445 const CInode::validated_data
&result
)
447 LogChannelRef clog
= mdcache
->mds
->clog
;
448 const ScrubHeaderRefConst header
= in
->scrub_info()->header
;
451 if (!result
.passed_validation
) {
452 // Build path string for use in messages
453 in
->make_path_string(path
, true);
456 if (result
.backtrace
.checked
&& !result
.backtrace
.passed
&&
457 !result
.backtrace
.repaired
)
459 // Record backtrace fails as remote linkage damage, as
460 // we may not be able to resolve hard links to this inode
461 mdcache
->mds
->damage_table
.notify_remote_damaged(in
->ino(), path
);
462 } else if (result
.inode
.checked
&& !result
.inode
.passed
&&
463 !result
.inode
.repaired
) {
464 // Record damaged inode structures as damaged dentries as
465 // that is where they are stored
466 auto parent
= in
->get_projected_parent_dn();
468 auto dir
= parent
->get_dir();
469 mdcache
->mds
->damage_table
.notify_dentry(
470 dir
->inode
->ino(), dir
->frag
, parent
->last
, parent
->get_name(), path
);
474 // Inform the cluster log if we found an error
475 if (!result
.passed_validation
) {
476 if (result
.all_damage_repaired()) {
477 clog
->info() << "Scrub repaired inode " << in
->ino()
478 << " (" << path
<< ")";
480 clog
->warn() << "Scrub error on inode " << in
->ino()
481 << " (" << path
<< ") see " << g_conf()->name
482 << " log and `damage ls` output for details";
485 // Put the verbose JSON output into the MDS log for later inspection
488 CachedStackStringStream css
;
490 derr
<< __func__
<< " scrub error on inode " << *in
<< ": " << css
->strv()
493 dout(10) << __func__
<< " scrub passed on inode " << *in
<< dendl
;
496 in
->scrub_finished();
// Complete every queued scrub-control context (pause/abort waiters) with
// result `r`, then clear the queue. Requires the MDS lock (asserted).
// NOTE(review): the loop body (presumably ctx->complete(r);) and closing
// braces were lost from this paste (numbering gaps at 503-504, 506-507).
// Text below kept verbatim.
499 void ScrubStack::complete_control_contexts(int r
) {
500 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
502 for (auto &ctx
: control_ctxs
) {
505 control_ctxs
.clear();
// Transition the scrub state machine, logging the change and pushing a
// fresh summary to the cluster log.
// NOTE(review): the actual assignment (presumably `state = next_state;`,
// original line 512) and the closing braces are missing from this paste —
// confirm against the original. Text below kept verbatim.
508 void ScrubStack::set_state(State next_state
) {
509 if (state
!= next_state
) {
510 dout(20) << __func__
<< ", from state=" << state
<< ", to state="
511 << next_state
<< dendl
;
513 clog_scrub_summary();
// Report whether the stack is in a transition state (RUNNING or PAUSING)
// during which control operations must be deferred. Requires the MDS lock.
// NOTE(review): the return statements and closing braces were lost from
// this paste (numbering gaps at 520, 524+). Text below kept verbatim.
517 bool ScrubStack::scrub_in_transition_state() {
518 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
519 dout(20) << __func__
<< ": state=" << state
<< dendl
;
521 // STATE_RUNNING is considered as a transition state so as to
521 // (comment continues below — original lines 521-522 kept verbatim)
522 // "delay" the scrub control operation.
523 if (state
== STATE_RUNNING
|| state
== STATE_PAUSING
) {
// Build a one-line human-readable scrub summary ("idle", "idle+waiting",
// active paths, pause/abort progress) for the cluster log and status
// output, listing the origin path (or "#<ino>") of each active scrub tag.
// NOTE(review): heavily truncated — the "idle"/"active" branches, the
// pause/abort wording, the have_more separators and the final return of
// the buffered string are missing (numbering gaps throughout). Text below
// kept verbatim.
530 std::string_view
ScrubStack::scrub_summary() {
531 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
533 bool have_more
= false;
534 CachedStackStringStream cs
;
536 if (state
== STATE_IDLE
) {
537 if (scrubbing_map
.empty())
539 *cs
<< "idle+waiting";
542 if (state
== STATE_RUNNING
) {
549 if (state
== STATE_PAUSING
) {
552 } else if (state
== STATE_PAUSED
) {
565 if (!scrubbing_map
.empty()) {
568 for (auto &p
: scrubbing_map
) {
571 auto& header
= p
.second
;
572 if (CInode
*in
= mdcache
->get_inode(header
->get_origin()))
573 *cs
<< scrub_inode_path(in
);
575 *cs
<< "#" << header
->get_origin();
// Emit the detailed scrub status as structured Formatter output: a
// top-level "status" string mirroring scrub_summary(), then a "scrubs"
// section with one object per active tag (path or "#<ino>", tag, and the
// header's options: recursive/repair/force).
// NOTE(review): truncated — the pause/abort status wording, option
// separators and several branch bodies are missing (numbering gaps
// throughout). Text below kept verbatim.
584 void ScrubStack::scrub_status(Formatter
*f
) {
585 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
587 f
->open_object_section("result");
589 CachedStackStringStream css
;
590 bool have_more
= false;
592 if (state
== STATE_IDLE
) {
593 if (scrubbing_map
.empty())
594 *css
<< "no active scrubs running";
596 *css
<< state
<< " (waiting for more scrubs)";
597 } else if (state
== STATE_RUNNING
) {
601 *css
<< "scrub active";
603 *css
<< " (" << stack_size
<< " inodes in the stack)";
605 if (state
== STATE_PAUSING
|| state
== STATE_PAUSED
) {
616 *css
<< " (" << stack_size
<< " inodes in the stack)";
618 f
->dump_string("status", css
->strv());
620 f
->open_object_section("scrubs");
622 for (auto& p
: scrubbing_map
) {
624 auto& header
= p
.second
;
626 std::string
tag(header
->get_tag());
627 f
->open_object_section(tag
.c_str()); // scrub id
629 if (CInode
*in
= mdcache
->get_inode(header
->get_origin()))
630 f
->dump_string("path", scrub_inode_path(in
));
632 f
->dump_stream("path") << "#" << header
->get_origin();
634 f
->dump_string("tag", header
->get_tag());
636 CachedStackStringStream optcss
;
637 if (header
->get_recursive()) {
638 *optcss
<< "recursive";
641 if (header
->get_repair()) {
648 if (header
->get_force()) {
655 f
->dump_string("options", optcss
->strv());
656 f
->close_section(); // scrub id
658 f
->close_section(); // scrubs
659 f
->close_section(); // result
// Abort every queued scrub: run `abort_one` over both the main stack and
// the waiting list (inodes get their scrub state aborted; dirfrags are
// additionally auth-unpinned; dentries are impossible by invariant), clear
// the waiting list, and tear down all outstanding remote-scrub tracking.
// Only legal while a clear is pending (clear_stack asserted) and under the
// MDS lock.
// NOTE(review): truncated — the inode-abort body, the per-iteration
// abort_one(*it) calls, the stack clear and closing braces are missing
// (numbering gaps at 665, 668, 672, 674-675, 677, 679-682, 684, 688+).
// Text below kept verbatim.
662 void ScrubStack::abort_pending_scrubs() {
663 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
664 ceph_assert(clear_stack
);
666 auto abort_one
= [this](MDSCacheObject
*obj
) {
667 if (CInode
*in
= dynamic_cast<CInode
*>(obj
)) {
669 } else if (CDir
*dir
= dynamic_cast<CDir
*>(obj
)) {
670 dir
->scrub_aborted();
671 dir
->auth_unpin(this);
673 ceph_abort(0 == "dentry in scrub stack");
676 for (auto it
= scrub_stack
.begin(); !it
.end(); ++it
)
678 for (auto it
= scrub_waiting
.begin(); !it
.end(); ++it
)
683 scrub_waiting
.clear();
685 for (auto& p
: remote_scrubs
)
686 remove_from_waiting(p
.first
, false);
687 remote_scrubs
.clear();
// Broadcast a scrub control op (OP_ABORT/OP_PAUSE/OP_RESUME) to the up MDS
// ranks. Callers gate this on being rank 0.
// NOTE(review): the loop presumably skipped the local rank — the guard
// (original lines 697-698) and the closing braces were lost from this
// paste; confirm against the original. Text below kept verbatim.
692 void ScrubStack::send_state_message(int op
) {
693 MDSRank
*mds
= mdcache
->mds
;
694 set
<mds_rank_t
> up_mds
;
695 mds
->get_mds_map()->get_up_mds_set(up_mds
);
696 for (auto& r
: up_mds
) {
699 auto m
= make_message
<MMDSScrub
>(op
);
700 mds
->send_message_mds(m
, r
);
// Abort all scrubs. Rank 0 records the abort epoch, flags peers as
// aborting and broadcasts OP_ABORT. If the stack is mid-transition the
// abort is deferred: the clear is (presumably) flagged and `on_finish`
// queued on control_ctxs; otherwise pending scrubs are dropped
// immediately, the state settles to IDLE (unless PAUSED, which is
// preserved) and `on_finish` completes with 0. `on_finish` may be nullptr
// (the OP_ABORT message path passes nullptr).
// NOTE(review): truncated — the clear_stack assignment, the nullptr
// guards around on_finish and closing braces are missing (numbering gaps
// at 706, 710, 715-717, 719, 721-723, 727-728, 730). Text below kept
// verbatim.
704 void ScrubStack::scrub_abort(Context
*on_finish
) {
705 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
707 dout(10) << __func__
<< ": aborting with " << scrubs_in_progress
708 << " scrubs in progress and " << stack_size
<< " in the"
709 << " stack" << dendl
;
711 if (mdcache
->mds
->get_nodeid() == 0) {
712 scrub_epoch_last_abort
= scrub_epoch
;
713 scrub_any_peer_aborting
= true;
714 send_state_message(MMDSScrub::OP_ABORT
);
718 if (scrub_in_transition_state()) {
720 control_ctxs
.push_back(on_finish
);
724 abort_pending_scrubs();
725 if (state
!= STATE_PAUSED
)
726 set_state(STATE_IDLE
);
729 on_finish
->complete(0);
// Pause scrubbing. Rank 0 broadcasts OP_PAUSE to peers. A pause during an
// in-progress abort fails with -CEPHFS_EINVAL. If the stack is in a
// transition state the pause is deferred (state -> PAUSING, `on_finish`
// queued); otherwise it pauses immediately (state -> PAUSED, `on_finish`
// completed with 0). `on_finish` may be nullptr on the message path.
// NOTE(review): truncated — the clear_stack/abort check guarding the
// EINVAL return, the `done` branch structure, nullptr guards and closing
// braces are missing (numbering gaps at 734, 738, 741, 743-744, 746-748,
// 750, 752, 754-756, 758, 760). Text below kept verbatim.
732 void ScrubStack::scrub_pause(Context
*on_finish
) {
733 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
735 dout(10) << __func__
<< ": pausing with " << scrubs_in_progress
736 << " scrubs in progress and " << stack_size
<< " in the"
737 << " stack" << dendl
;
739 if (mdcache
->mds
->get_nodeid() == 0)
740 send_state_message(MMDSScrub::OP_PAUSE
);
742 // abort is in progress
745 on_finish
->complete(-CEPHFS_EINVAL
);
749 bool done
= scrub_in_transition_state();
751 set_state(STATE_PAUSING
);
753 control_ctxs
.push_back(on_finish
);
757 set_state(STATE_PAUSED
);
759 on_finish
->complete(0);
// Resume scrubbing after a pause. Rank 0 broadcasts OP_RESUME. Resuming
// while PAUSING cancels the queued pause waiters with -CEPHFS_ECANCELED;
// resuming from PAUSED goes straight back to RUNNING. Returns a status
// code whose non-default branches were lost from this paste.
// NOTE(review): truncated — the leading branch (original lines 768-772,
// presumably the abort-in-progress / already-running cases), the
// kick_off_scrubs call after resuming from PAUSED, and the return of the
// result are missing. Text below kept verbatim.
762 bool ScrubStack::scrub_resume() {
763 ceph_assert(ceph_mutex_is_locked_by_me(mdcache
->mds
->mds_lock
));
764 dout(20) << __func__
<< ": state=" << state
<< dendl
;
766 if (mdcache
->mds
->get_nodeid() == 0)
767 send_state_message(MMDSScrub::OP_RESUME
);
773 } else if (state
== STATE_PAUSING
) {
774 set_state(STATE_RUNNING
);
775 complete_control_contexts(-CEPHFS_ECANCELED
);
776 } else if (state
== STATE_PAUSED
) {
777 set_state(STATE_RUNNING
);
// NOTE(review): truncated — the clog declaration, the branch that chooses
// `what` for an inode (started vs in-progress vs completed) and closing
// braces are missing (numbering gaps at 786-789, 791-794, 796-797, 799).
// When `in` is non-null a per-inode "scrub <what> for path: ..." line is
// logged; otherwise the overall scrub_summary() is logged. Text below
// kept verbatim.
784 // send current scrub summary to cluster log
785 void ScrubStack::clog_scrub_summary(CInode
*in
) {
790 } else if (in
->scrub_is_in_progress()) {
795 clog
->info() << "scrub " << what
<< " for path: " << scrub_inode_path(in
);
798 clog
->info() << "scrub summary: " << scrub_summary();
// Message entry point: route MMDSScrub to handle_scrub() and
// MMDSScrubStats to handle_scrub_stats(); anything else aborts.
// NOTE(review): truncated — the MSG_MDS_SCRUB case label, the `break`
// statements, the default label and closing braces are missing (numbering
// gaps at 802, 804, 806-807, 810-812, 815+). Text below kept verbatim.
801 void ScrubStack::dispatch(const cref_t
<Message
> &m
)
803 switch (m
->get_type()) {
805 handle_scrub(ref_cast
<MMDSScrub
>(m
));
808 case MSG_MDS_SCRUB_STATS
:
809 handle_scrub_stats(ref_cast
<MMDSScrubStats
>(m
));
813 derr
<< " scrub stack unknown message " << m
->get_type() << dendl_impl
;
814 ceph_abort_msg("scrub stack unknown message");
// Handle an MMDSScrub message from a peer rank, switching on op:
//  - OP_QUEUEDIR: locally enqueue the requested dirfrags of the given
//    inode (waiting on freezes via a gather/retry), creating or reusing a
//    ScrubHeader for the tag, then reply with OP_QUEUEDIR_ACK listing the
//    frags actually queued.
//  - OP_QUEUEDIR_ACK: fold the acked frags into the origin inode's queued
//    set; when the gather set drains, bump the header's last-forwarded
//    epoch and release the inode from the waiting list.
//  - OP_QUEUEINO / OP_QUEUEINO_ACK: same handshake for a whole inode
//    forwarded to its auth rank.
//  - OP_ABORT / OP_PAUSE: apply the control op locally (nullptr context);
//    OP_RESUME presumably ditto.
// NOTE(review): heavily truncated — case braces/breaks, several guards
// (missing-inode checks, fragset_t `queued` declarations on the QUEUEDIR /
// QUEUEINO paths), the dfs push_backs, and chunks of the ack paths were
// lost (many numbering gaps, e.g. 826, 828-829, 834, 836-837, 840-841,
// 845-849, 852-857, 860-861, 864, 867, 871-875, 879-880, 882, 884,
// 892-893, 896, 900-904, 906, 908-909, 912-913, 916, 919-920, 923-925,
// 929-930, 932, 934, 941, 943-944, 948-952, 955, 958, 960-962, 965+).
// Text below kept verbatim.
818 void ScrubStack::handle_scrub(const cref_t
<MMDSScrub
> &m
)
821 mds_rank_t from
= mds_rank_t(m
->get_source().num());
822 dout(10) << __func__
<< " " << *m
<< " from mds." << from
<< dendl
;
824 switch (m
->get_op()) {
825 case MMDSScrub::OP_QUEUEDIR
:
827 CInode
*diri
= mdcache
->get_inode(m
->get_ino());
830 std::vector
<CDir
*> dfs
;
831 MDSGatherBuilder
gather(g_ceph_context
);
832 for (const auto& fg
: m
->get_frags()) {
833 CDir
*dir
= diri
->get_dirfrag(fg
);
835 dout(10) << __func__
<< " no frag " << fg
<< dendl
;
838 if (!dir
->is_auth()) {
839 dout(10) << __func__
<< " not auth " << *dir
<< dendl
;
842 if (!dir
->can_auth_pin()) {
843 dout(10) << __func__
<< " can't auth pin " << *dir
<< dendl
;
844 dir
->add_waiter(CDir::WAIT_UNFREEZE
, gather
.new_sub());
850 if (gather
.has_subs()) {
851 gather
.set_finisher(new C_MDS_RetryMessage(mdcache
->mds
, m
));
858 ScrubHeaderRef header
;
859 if (auto it
= scrubbing_map
.find(m
->get_tag()); it
!= scrubbing_map
.end()) {
862 header
= std::make_shared
<ScrubHeader
>(m
->get_tag(), m
->is_internal_tag(),
863 m
->is_force(), m
->is_recursive(),
865 header
->set_origin(m
->get_origin());
866 scrubbing_map
.emplace(header
->get_tag(), header
);
868 for (auto dir
: dfs
) {
869 queued
.insert_raw(dir
->get_frag());
870 _enqueue(dir
, header
, true);
876 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEDIR_ACK
, m
->get_ino(),
877 std::move(queued
), m
->get_tag());
878 mdcache
->mds
->send_message_mds(r
, from
);
881 case MMDSScrub::OP_QUEUEDIR_ACK
:
883 CInode
*diri
= mdcache
->get_inode(m
->get_ino());
885 auto it
= remote_scrubs
.find(diri
);
886 if (it
!= remote_scrubs
.end() &&
887 m
->get_tag() == it
->second
.tag
) {
888 if (it
->second
.gather_set
.erase(from
)) {
889 auto &queued
= diri
->scrub_queued_frags();
890 for (auto &fg
: m
->get_frags())
891 queued
.insert_raw(fg
);
894 if (it
->second
.gather_set
.empty()) {
895 remote_scrubs
.erase(it
);
897 const auto& header
= diri
->get_scrub_header();
898 header
->set_epoch_last_forwarded(scrub_epoch
);
899 remove_from_waiting(diri
);
905 case MMDSScrub::OP_QUEUEINO
:
907 CInode
*in
= mdcache
->get_inode(m
->get_ino());
910 ScrubHeaderRef header
;
911 if (auto it
= scrubbing_map
.find(m
->get_tag()); it
!= scrubbing_map
.end()) {
914 header
= std::make_shared
<ScrubHeader
>(m
->get_tag(), m
->is_internal_tag(),
915 m
->is_force(), m
->is_recursive(),
917 header
->set_origin(m
->get_origin());
918 scrubbing_map
.emplace(header
->get_tag(), header
);
921 _enqueue(in
, header
, true);
922 in
->scrub_queued_frags() = m
->get_frags();
926 auto r
= make_message
<MMDSScrub
>(MMDSScrub::OP_QUEUEINO_ACK
, m
->get_ino(),
927 std::move(queued
), m
->get_tag());
928 mdcache
->mds
->send_message_mds(r
, from
);
931 case MMDSScrub::OP_QUEUEINO_ACK
:
933 CInode
*in
= mdcache
->get_inode(m
->get_ino());
935 auto it
= remote_scrubs
.find(in
);
936 if (it
!= remote_scrubs
.end() &&
937 m
->get_tag() == it
->second
.tag
&&
938 it
->second
.gather_set
.erase(from
)) {
939 ceph_assert(it
->second
.gather_set
.empty());
940 remote_scrubs
.erase(it
);
942 remove_from_waiting(in
, false);
945 const auto& header
= in
->get_scrub_header();
946 header
->set_epoch_last_forwarded(scrub_epoch
);
947 in
->scrub_finished();
953 case MMDSScrub::OP_ABORT
:
954 scrub_abort(nullptr);
956 case MMDSScrub::OP_PAUSE
:
957 scrub_pause(nullptr);
959 case MMDSScrub::OP_RESUME
:
963 derr
<< " scrub stack unknown scrub operation " << m
->get_op() << dendl_impl
;
964 ceph_abort_msg("scrub stack unknown scrub operation");
// Handle an MMDSScrubStats message. On a non-zero rank (the branch whose
// guard was lost from this paste): adopt the sender's epoch, keep tags
// that are still pending or recently forwarded, drop tags the sender
// reports finished, then reply to rank 0 with this rank's still-scrubbing
// tags and clear_stack flag. On rank 0 it records the sender's
// epoch/tags/aborting into mds_scrub_stats. Also refreshes the cluster-log
// summary and trims the MDS log under conditions whose guards were lost.
// NOTE(review): truncated — the rank-0 vs non-rank-0 branch structure,
// the any_finished/any_repaired uses and several braces are missing
// (numbering gaps at 969, 972-973, 979-980, 989, 991, 993, 995-999, 1001,
// 1005-1006, 1008, 1010, 1017+). Text below kept verbatim.
968 void ScrubStack::handle_scrub_stats(const cref_t
<MMDSScrubStats
> &m
)
970 mds_rank_t from
= mds_rank_t(m
->get_source().num());
971 dout(7) << __func__
<< " " << *m
<< " from mds." << from
<< dendl
;
974 if (scrub_epoch
!= m
->get_epoch() - 1) {
975 scrub_epoch
= m
->get_epoch() - 1;
976 for (auto& p
: scrubbing_map
) {
977 if (p
.second
->get_epoch_last_forwarded())
978 p
.second
->set_epoch_last_forwarded(scrub_epoch
);
981 bool any_finished
= false;
982 bool any_repaired
= false;
983 std::set
<std::string
> scrubbing_tags
;
984 for (auto it
= scrubbing_map
.begin(); it
!= scrubbing_map
.end(); ) {
985 auto& header
= it
->second
;
986 if (header
->get_num_pending() ||
987 header
->get_epoch_last_forwarded() >= scrub_epoch
) {
988 scrubbing_tags
.insert(it
->first
);
990 } else if (m
->is_finished(it
->first
)) {
992 if (header
->get_repaired())
994 scrubbing_map
.erase(it
++);
1000 scrub_epoch
= m
->get_epoch();
1002 auto ack
= make_message
<MMDSScrubStats
>(scrub_epoch
,
1003 std::move(scrubbing_tags
), clear_stack
);
1004 mdcache
->mds
->send_message_mds(ack
, 0);
1007 clog_scrub_summary();
1009 mdcache
->mds
->mdlog
->trim_all();
1011 if (scrub_epoch
== m
->get_epoch() &&
1012 (size_t)from
< mds_scrub_stats
.size()) {
1013 auto& stat
= mds_scrub_stats
[from
];
1014 stat
.epoch_acked
= m
->get_epoch();
1015 stat
.scrubbing_tags
= m
->get_scrubbing_tags();
1016 stat
.aborting
= m
->is_aborting();
// Rank-0 periodic epoch driver. Early-outs when nothing is scrubbing and
// no peer is aborting. When every peer has acked the current epoch
// ("fully acked"), collects the union of still-scrubbing tags, retires
// local scrub headers that no rank reports active any more (see the
// time-'A' reasoning in the comment block below), bumps the epoch,
// pre-acks rank 0 in mds_scrub_stats, broadcasts MMDSScrubStats to the
// other up ranks, refreshes the cluster-log summary and trims the MDS
// log under conditions whose guards were lost from this paste.
// NOTE(review): truncated — the single-MDS fast path around line 1036,
// the fully_acked gate, the epoch increment, the rank!=0 skip in the send
// loop, and the uses of any_finished/any_repaired are among the missing
// lines (numbering gaps at 1022, 1024-1025, 1027, 1031, 1034-1035,
// 1046-1047, 1050-1051, 1053, 1062-1064, 1068, 1071, 1078, 1085-1091,
// 1093-1094, 1099-1101, 1103, 1105). Text below kept verbatim.
1021 void ScrubStack::advance_scrub_status()
1023 if (!scrub_any_peer_aborting
&& scrubbing_map
.empty())
1026 MDSRank
*mds
= mdcache
->mds
;
1028 set
<mds_rank_t
> up_mds
;
1029 mds
->get_mds_map()->get_up_mds_set(up_mds
);
1030 auto up_max
= *up_mds
.rbegin();
1032 bool update_scrubbing
= false;
1033 std::set
<std::string
> scrubbing_tags
;
1036 update_scrubbing
= true;
1037 scrub_any_peer_aborting
= false;
1038 } else if (mds_scrub_stats
.size() > (size_t)(up_max
)) {
1039 bool any_aborting
= false;
1040 bool fully_acked
= true;
1041 for (const auto& stat
: mds_scrub_stats
) {
1042 if (stat
.aborting
|| stat
.epoch_acked
<= scrub_epoch_last_abort
)
1043 any_aborting
= true;
1044 if (stat
.epoch_acked
!= scrub_epoch
) {
1045 fully_acked
= false;
1048 scrubbing_tags
.insert(stat
.scrubbing_tags
.begin(),
1049 stat
.scrubbing_tags
.end());
1052 scrub_any_peer_aborting
= false;
1054 // handle_scrub_stats() reports scrub is still in-progress if it has
1055 // forwarded any object to other mds since previous epoch. Let's assume,
1056 // at time 'A', we got scrub stats from all mds for previous epoch. If
1057 // a scrub is not reported by any mds, we know there is no forward of
1058 // the scrub since time 'A'. So we can consider the scrub is finished.
1059 if (scrub_epoch_fully_acked
+ 1 == scrub_epoch
)
1060 update_scrubbing
= true;
1061 scrub_epoch_fully_acked
= scrub_epoch
;
1065 if (mds_scrub_stats
.size() != (size_t)up_max
+ 1)
1066 mds_scrub_stats
.resize((size_t)up_max
+ 1);
1067 mds_scrub_stats
.at(0).epoch_acked
= scrub_epoch
+ 1;
1069 bool any_finished
= false;
1070 bool any_repaired
= false;
1072 for (auto it
= scrubbing_map
.begin(); it
!= scrubbing_map
.end(); ) {
1073 auto& header
= it
->second
;
1074 if (header
->get_num_pending() ||
1075 header
->get_epoch_last_forwarded() >= scrub_epoch
) {
1076 if (update_scrubbing
&& up_max
!= 0)
1077 scrubbing_tags
.insert(it
->first
);
1079 } else if (update_scrubbing
&& !scrubbing_tags
.count(it
->first
)) {
1080 // no longer being scrubbed globally
1081 any_finished
= true;
1082 if (header
->get_repaired())
1083 any_repaired
= true;
1084 scrubbing_map
.erase(it
++);
1092 for (auto& r
: up_mds
) {
1095 auto m
= update_scrubbing
?
1096 make_message
<MMDSScrubStats
>(scrub_epoch
, scrubbing_tags
) :
1097 make_message
<MMDSScrubStats
>(scrub_epoch
);
1098 mds
->send_message_mds(m
, r
);
1102 clog_scrub_summary();
1104 mdcache
->mds
->mdlog
->trim_all();
1107 void ScrubStack::handle_mds_failure(mds_rank_t mds
)
1110 scrub_abort(nullptr);
1115 for (auto it
= remote_scrubs
.begin(); it
!= remote_scrubs
.end(); ) {
1116 if (it
->second
.gather_set
.erase(mds
) &&
1117 it
->second
.gather_set
.empty()) {
1118 CInode
*in
= it
->first
;
1119 remote_scrubs
.erase(it
++);
1120 remove_from_waiting(in
, false);