*
*/
-#include <iostream>
-
#include "ScrubStack.h"
#include "common/Finisher.h"
#include "mds/MDSRank.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
-#define dout_prefix _prefix(_dout, scrubstack->mdcache->mds)
-static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
+#define dout_prefix _prefix(_dout, mdcache->mds)
+
+using namespace std;
+
+static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
return *_dout << "mds." << mds->get_nodeid() << ".scrubstack ";
}
return os;
}
-void ScrubStack::push_inode(CInode *in)
+void ScrubStack::dequeue(MDSCacheObject *obj)
{
- dout(20) << "pushing " << *in << " on top of ScrubStack" << dendl;
- if (!in->item_scrub.is_on_list()) {
- in->get(CInode::PIN_SCRUBQUEUE);
- stack_size++;
- }
- inode_stack.push_front(&in->item_scrub);
+ dout(20) << "dequeue " << *obj << " from ScrubStack" << dendl;
+ ceph_assert(obj->item_scrub.is_on_list());
+ obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
+ obj->item_scrub.remove_myself();
+ stack_size--;
}
-void ScrubStack::push_inode_bottom(CInode *in)
+int ScrubStack::_enqueue(MDSCacheObject *obj, ScrubHeaderRef& header, bool top)
{
- dout(20) << "pushing " << *in << " on bottom of ScrubStack" << dendl;
- if (!in->item_scrub.is_on_list()) {
- in->get(CInode::PIN_SCRUBQUEUE);
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+ if (CInode *in = dynamic_cast<CInode*>(obj)) {
+ if (in->scrub_is_in_progress()) {
+ dout(10) << __func__ << " with {" << *in << "}" << ", already in scrubbing" << dendl;
+ return -CEPHFS_EBUSY;
+ }
+
+ dout(10) << __func__ << " with {" << *in << "}" << ", top=" << top << dendl;
+ in->scrub_initialize(header);
+ } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
+ if (dir->scrub_is_in_progress()) {
+ dout(10) << __func__ << " with {" << *dir << "}" << ", already in scrubbing" << dendl;
+ return -CEPHFS_EBUSY;
+ }
+
+ dout(10) << __func__ << " with {" << *dir << "}" << ", top=" << top << dendl;
+ // The edge directory must be in memory
+ dir->auth_pin(this);
+ dir->scrub_initialize(header);
+ } else {
+ ceph_assert(0 == "queue dentry to scrub stack");
+ }
+
+ dout(20) << "enqueue " << *obj << " to " << (top ? "top" : "bottom") << " of ScrubStack" << dendl;
+ if (!obj->item_scrub.is_on_list()) {
+ obj->get(MDSCacheObject::PIN_SCRUBQUEUE);
stack_size++;
}
- inode_stack.push_back(&in->item_scrub);
+ if (top)
+ scrub_stack.push_front(&obj->item_scrub);
+ else
+ scrub_stack.push_back(&obj->item_scrub);
+ return 0;
}
-void ScrubStack::pop_inode(CInode *in)
+int ScrubStack::enqueue(CInode *in, ScrubHeaderRef& header, bool top)
{
- dout(20) << "popping " << *in
- << " off of ScrubStack" << dendl;
- ceph_assert(in->item_scrub.is_on_list());
- in->put(CInode::PIN_SCRUBQUEUE);
- in->item_scrub.remove_myself();
- stack_size--;
+ // abort in progress
+ if (clear_stack)
+ return -CEPHFS_EAGAIN;
+
+ header->set_origin(in->ino());
+ auto ret = scrubbing_map.emplace(header->get_tag(), header);
+ if (!ret.second) {
+ dout(10) << __func__ << " with {" << *in << "}"
+ << ", conflicting tag " << header->get_tag() << dendl;
+ return -CEPHFS_EEXIST;
+ }
+
+ int r = _enqueue(in, header, top);
+ if (r < 0)
+ return r;
+
+ clog_scrub_summary(in);
+
+ kick_off_scrubs();
+ return 0;
}
-void ScrubStack::_enqueue_inode(CInode *in, CDentry *parent,
- ScrubHeaderRef& header,
- MDSContext *on_finish, bool top)
+void ScrubStack::add_to_waiting(MDSCacheObject *obj)
{
- dout(10) << __func__ << " with {" << *in << "}"
- << ", on_finish=" << on_finish << ", top=" << top << dendl;
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
- in->scrub_initialize(parent, header, on_finish);
- if (top)
- push_inode(in);
- else
- push_inode_bottom(in);
+ scrubs_in_progress++;
+ obj->item_scrub.remove_myself();
+ scrub_waiting.push_back(&obj->item_scrub);
}
-void ScrubStack::enqueue_inode(CInode *in, ScrubHeaderRef& header,
- MDSContext *on_finish, bool top)
+void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick)
{
- // abort in progress
- if (clear_inode_stack) {
- on_finish->complete(-EAGAIN);
- return;
+ scrubs_in_progress--;
+ if (obj->item_scrub.is_on_list()) {
+ obj->item_scrub.remove_myself();
+ scrub_stack.push_front(&obj->item_scrub);
+ if (kick)
+ kick_off_scrubs();
}
-
- _enqueue_inode(in, NULL, header, on_finish, top);
- kick_off_scrubs();
}
+class C_RetryScrub : public MDSInternalContext {
+public:
+ C_RetryScrub(ScrubStack *s, MDSCacheObject *o) :
+ MDSInternalContext(s->mdcache->mds), stack(s), obj(o) {
+ stack->add_to_waiting(obj);
+ }
+ void finish(int r) override {
+ stack->remove_from_waiting(obj);
+ }
+private:
+ ScrubStack *stack;
+ MDSCacheObject *obj;
+};
+
void ScrubStack::kick_off_scrubs()
{
- ceph_assert(mdcache->mds->mds_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
- if (clear_inode_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
+ if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
if (scrubs_in_progress == 0) {
dout(10) << __func__ << ": in progress scrub operations finished, "
<< stack_size << " in the stack" << dendl;
State final_state = state;
- if (clear_inode_stack) {
+ if (clear_stack) {
abort_pending_scrubs();
final_state = STATE_IDLE;
}
dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
"progress and " << stack_size << " in the stack" << dendl;
- bool can_continue = true;
- elist<CInode*>::iterator i = inode_stack.begin();
- while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress &&
- can_continue) {
- if (i.end()) {
+ elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
+ while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
+ if (it.end()) {
if (scrubs_in_progress == 0) {
set_state(STATE_IDLE);
}
assert(state == STATE_RUNNING || state == STATE_IDLE);
set_state(STATE_RUNNING);
- CInode *curi = *i;
- ++i; // we have our reference, push iterator forward
+ if (CInode *in = dynamic_cast<CInode*>(*it)) {
+ dout(20) << __func__ << " examining " << *in << dendl;
+ ++it;
- dout(20) << __func__ << " examining " << *curi << dendl;
+ if (!validate_inode_auth(in))
+ continue;
- if (!curi->is_dir()) {
- // it's a regular file, symlink, or hard link
- pop_inode(curi); // we only touch it this once, so remove from stack
+ if (!in->is_dir()) {
+ // it's a regular file, symlink, or hard link
+ dequeue(in); // we only touch it this once, so remove from stack
- if (!curi->scrub_info()->on_finish) {
- scrubs_in_progress++;
- curi->scrub_set_finisher(&scrub_kick);
+ scrub_file_inode(in);
+ } else {
+ bool added_children = false;
+ bool done = false; // it's done, so pop it off the stack
+ scrub_dir_inode(in, &added_children, &done);
+ if (done) {
+ dout(20) << __func__ << " dir inode, done" << dendl;
+ dequeue(in);
+ }
+ if (added_children) {
+ // dirfrags were queued at top of stack
+ it = scrub_stack.begin();
+ }
}
- scrub_file_inode(curi);
- can_continue = true;
- } else {
- bool completed; // it's done, so pop it off the stack
- bool terminal; // not done, but we can start ops on other directories
- bool progress; // it added new dentries to the top of the stack
- scrub_dir_inode(curi, &progress, &terminal, &completed);
- if (completed) {
- dout(20) << __func__ << " dir completed" << dendl;
- pop_inode(curi);
- } else if (progress) {
- dout(20) << __func__ << " dir progressed" << dendl;
- // we added new stuff to top of stack, so reset ourselves there
- i = inode_stack.begin();
+ } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
+ auto next = it;
+ ++next;
+ bool done = false; // it's done, so pop it off the stack
+ scrub_dirfrag(dir, &done);
+ if (done) {
+ dout(20) << __func__ << " dirfrag, done" << dendl;
+ ++it; // child inodes were queued at bottom of stack
+ dequeue(dir);
} else {
- dout(20) << __func__ << " dir no-op" << dendl;
+ it = next;
}
+ } else {
+ ceph_assert(0 == "dentry in scrub stack");
+ }
+ }
+}
- can_continue = progress || terminal || completed;
+bool ScrubStack::validate_inode_auth(CInode *in)
+{
+ if (in->is_auth()) {
+ if (!in->can_auth_pin()) {
+ dout(10) << __func__ << " can't auth pin" << dendl;
+ in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
+ return false;
+ }
+ return true;
+ } else {
+ MDSRank *mds = mdcache->mds;
+ if (in->is_ambiguous_auth()) {
+ dout(10) << __func__ << " ambiguous auth" << dendl;
+ in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
+ } else if (mds->is_cluster_degraded()) {
+ dout(20) << __func__ << " cluster degraded" << dendl;
+ mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
+ } else {
+ ScrubHeaderRef header = in->get_scrub_header();
+ ceph_assert(header);
+
+ auto ret = remote_scrubs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(in),
+ std::forward_as_tuple());
+ ceph_assert(ret.second); // FIXME: parallel scrubs?
+ auto &scrub_r = ret.first->second;
+ scrub_r.tag = header->get_tag();
+
+ mds_rank_t auth = in->authority().first;
+ dout(10) << __func__ << " forward to mds." << auth << dendl;
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
+ std::move(in->scrub_queued_frags()),
+ header->get_tag(), header->get_origin(),
+ header->is_internal_tag(), header->get_force(),
+ header->get_recursive(), header->get_repair());
+ mdcache->mds->send_message_mds(r, auth);
+
+ scrub_r.gather_set.insert(auth);
+ // wait for ACK
+ add_to_waiting(in);
}
+ return false;
}
}
-void ScrubStack::scrub_dir_inode(CInode *in,
- bool *added_children,
- bool *terminal,
- bool *done)
+void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
{
dout(10) << __func__ << " " << *in << dendl;
-
- *added_children = false;
- bool all_frags_terminal = true;
- bool all_frags_done = true;
+ ceph_assert(in->is_auth());
+ MDSRank *mds = mdcache->mds;
ScrubHeaderRef header = in->get_scrub_header();
- ceph_assert(header != nullptr);
-
- if (header->get_recursive()) {
- frag_vec_t scrubbing_frags;
- list<CDir*> scrubbing_cdirs;
- in->scrub_dirfrags_scrubbing(&scrubbing_frags);
- dout(20) << __func__ << " iterating over " << scrubbing_frags.size()
- << " scrubbing frags" << dendl;
- for (const auto& fg : scrubbing_frags) {
- // turn frags into CDir *
- CDir *dir = in->get_dirfrag(fg);
- if (dir) {
- scrubbing_cdirs.push_back(dir);
- dout(25) << __func__ << " got CDir " << *dir << " presently scrubbing" << dendl;
+ ceph_assert(header);
+
+ MDSGatherBuilder gather(g_ceph_context);
+
+ auto &queued = in->scrub_queued_frags();
+ std::map<mds_rank_t, fragset_t> scrub_remote;
+
+ frag_vec_t frags;
+ in->dirfragtree.get_leaves(frags);
+ dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
+ for (auto &fg : frags) {
+ if (queued.contains(fg))
+ continue;
+ CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
+ if (!dir->is_auth()) {
+ if (dir->is_ambiguous_auth()) {
+ dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
+ dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
+ } else if (mds->is_cluster_degraded()) {
+ dout(20) << __func__ << " cluster degraded" << dendl;
+ mds->wait_for_cluster_recovered(gather.new_sub());
} else {
- in->scrub_dirfrag_finished(fg);
- dout(25) << __func__ << " missing dirfrag " << fg << " skip scrubbing" << dendl;
+ mds_rank_t auth = dir->authority().first;
+ scrub_remote[auth].insert_raw(fg);
}
+ } else if (!dir->can_auth_pin()) {
+ dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
+ dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
+ } else if (dir->get_version() == 0) {
+ dout(20) << __func__ << " barebones " << *dir << dendl;
+ dir->fetch(gather.new_sub());
+ } else {
+ _enqueue(dir, header, true);
+ queued.insert_raw(dir->get_frag());
+ *added_children = true;
}
-
- dout(20) << __func__ << " consuming from " << scrubbing_cdirs.size()
- << " scrubbing cdirs" << dendl;
-
- list<CDir*>::iterator i = scrubbing_cdirs.begin();
- while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
- // select next CDir
- CDir *cur_dir = NULL;
- if (i != scrubbing_cdirs.end()) {
- cur_dir = *i;
- ++i;
- dout(20) << __func__ << " got cur_dir = " << *cur_dir << dendl;
- } else {
- bool ready = get_next_cdir(in, &cur_dir);
- dout(20) << __func__ << " get_next_cdir ready=" << ready << dendl;
-
- if (ready && cur_dir) {
- scrubbing_cdirs.push_back(cur_dir);
- } else if (!ready) {
- // We are waiting for load of a frag
- all_frags_done = false;
- all_frags_terminal = false;
- break;
- } else {
- // Finished with all frags
- break;
- }
- }
- // scrub that CDir
- bool frag_added_children = false;
- bool frag_terminal = true;
- bool frag_done = false;
- scrub_dirfrag(cur_dir, header,
- &frag_added_children, &frag_terminal, &frag_done);
- if (frag_done) {
- cur_dir->inode->scrub_dirfrag_finished(cur_dir->frag);
- }
- *added_children |= frag_added_children;
- all_frags_terminal = all_frags_terminal && frag_terminal;
- all_frags_done = all_frags_done && frag_done;
- }
-
- dout(20) << "finished looping; all_frags_terminal=" << all_frags_terminal
- << ", all_frags_done=" << all_frags_done << dendl;
- } else {
- dout(20) << "!scrub_recursive" << dendl;
}
- if (all_frags_done) {
- assert (!*added_children); // can't do this if children are still pending
+ queued.simplify();
- // OK, so now I can... fire off a validate on the dir inode, and
- // when it completes, come through here again, noticing that we've
- // set a flag to indicate the validate happened, and
- scrub_dir_inode_final(in);
+ if (gather.has_subs()) {
+ gather.set_finisher(new C_RetryScrub(this, in));
+ gather.activate();
+ return;
}
- *terminal = all_frags_terminal;
- *done = all_frags_done;
- dout(10) << __func__ << " is exiting " << *terminal << " " << *done << dendl;
- return;
-}
-
-bool ScrubStack::get_next_cdir(CInode *in, CDir **new_dir)
-{
- dout(20) << __func__ << " on " << *in << dendl;
- frag_t next_frag;
- int r = in->scrub_dirfrag_next(&next_frag);
- assert (r >= 0);
-
- if (r == 0) {
- // we got a frag to scrub, otherwise it would be ENOENT
- dout(25) << "looking up new frag " << next_frag << dendl;
- CDir *next_dir = in->get_or_open_dirfrag(mdcache, next_frag);
- if (!next_dir->is_complete()) {
- scrubs_in_progress++;
- next_dir->fetch(&scrub_kick);
- dout(25) << "fetching frag from RADOS" << dendl;
- return false;
+ if (!scrub_remote.empty()) {
+ auto ret = remote_scrubs.emplace(std::piecewise_construct,
+ std::forward_as_tuple(in),
+ std::forward_as_tuple());
+ ceph_assert(ret.second); // FIXME: parallel scrubs?
+ auto &scrub_r = ret.first->second;
+ scrub_r.tag = header->get_tag();
+
+ for (auto& p : scrub_remote) {
+ p.second.simplify();
+ dout(20) << __func__ << " forward " << p.second << " to mds." << p.first << dendl;
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
+ std::move(p.second), header->get_tag(),
+ header->get_origin(), header->is_internal_tag(),
+ header->get_force(), header->get_recursive(),
+ header->get_repair());
+ mds->send_message_mds(r, p.first);
+ scrub_r.gather_set.insert(p.first);
}
- *new_dir = next_dir;
- dout(25) << "returning dir " << *new_dir << dendl;
- return true;
+ // wait for ACKs
+ add_to_waiting(in);
+ return;
}
- ceph_assert(r == ENOENT);
- // there are no dirfrags left
- *new_dir = NULL;
- return true;
+
+ scrub_dir_inode_final(in);
+
+ *done = true;
+ dout(10) << __func__ << " done" << dendl;
}
class C_InodeValidated : public MDSInternalContext
{
- public:
- ScrubStack *stack;
- CInode::validated_data result;
- CInode *target;
-
- C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
- : MDSInternalContext(mds), stack(stack_), target(target_)
- {}
+public:
+ ScrubStack *stack;
+ CInode::validated_data result;
+ CInode *target;
- void finish(int r) override
- {
- stack->_validate_inode_done(target, r, result);
- }
+ C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
+ : MDSInternalContext(mds), stack(stack_), target(target_)
+ {
+ stack->scrubs_in_progress++;
+ }
+ void finish(int r) override {
+ stack->_validate_inode_done(target, r, result);
+ stack->scrubs_in_progress--;
+ stack->kick_off_scrubs();
+ }
};
-
void ScrubStack::scrub_dir_inode_final(CInode *in)
{
dout(20) << __func__ << " " << *in << dendl;
- // Two passes through this function. First one triggers inode validation,
- // second one sets finally_done
- // FIXME: kind of overloading scrub_in_progress here, using it while
- // dentry is still on stack to indicate that we have finished
- // doing our validate_disk_state on the inode
- // FIXME: the magic-constructing scrub_info() is going to leave
- // an unneeded scrub_infop lying around here
- if (!in->scrub_info()->children_scrubbed) {
- if (!in->scrub_info()->on_finish) {
- scrubs_in_progress++;
- in->scrub_set_finisher(&scrub_kick);
- }
-
- in->scrub_children_finished();
- C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
- in->validate_disk_state(&fin->result, fin);
- }
-
+ C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
+ in->validate_disk_state(&fin->result, fin);
return;
}
-void ScrubStack::scrub_dirfrag(CDir *dir,
- ScrubHeaderRef& header,
- bool *added_children, bool *is_terminal,
- bool *done)
+void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
{
ceph_assert(dir != NULL);
- dout(20) << __func__ << " on " << *dir << dendl;
- *added_children = false;
- *is_terminal = false;
- *done = false;
+ dout(10) << __func__ << " " << *dir << dendl;
-
- if (!dir->scrub_info()->directory_scrubbing) {
- // Get the frag complete before calling
- // scrub initialize, so that it can populate its lists
- // of dentries.
- if (!dir->is_complete()) {
- scrubs_in_progress++;
- dir->fetch(&scrub_kick);
- return;
- }
-
- dir->scrub_initialize(header);
+ if (!dir->is_complete()) {
+ dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
+ dout(10) << __func__ << " incomplete, fetching" << dendl;
+ return;
}
- int r = 0;
- while(r == 0) {
- CDentry *dn = NULL;
- scrubs_in_progress++;
- r = dir->scrub_dentry_next(&scrub_kick, &dn);
- if (r != EAGAIN) {
- scrubs_in_progress--;
- }
-
- if (r == EAGAIN) {
- // Drop out, CDir fetcher will call back our kicker context
- dout(20) << __func__ << " waiting for fetch on " << *dir << dendl;
- return;
- }
-
- if (r == ENOENT) {
- // Nothing left to scrub, are we done?
- std::list<CDentry*> scrubbing;
- dir->scrub_dentries_scrubbing(&scrubbing);
- if (scrubbing.empty()) {
- dout(20) << __func__ << " dirfrag done: " << *dir << dendl;
- // FIXME: greg: What's the diff meant to be between done and terminal
- dir->scrub_finished();
- *done = true;
- *is_terminal = true;
- } else {
- dout(20) << __func__ << " " << scrubbing.size() << " dentries still "
- "scrubbing in " << *dir << dendl;
+ ScrubHeaderRef header = dir->get_scrub_header();
+ version_t last_scrub = dir->scrub_info()->last_recursive.version;
+ if (header->get_recursive()) {
+ for (auto it = dir->begin(); it != dir->end(); ++it) {
+ if (it->first.snapid != CEPH_NOSNAP)
+ continue;
+ CDentry *dn = it->second;
+ CDentry::linkage_t *dnl = dn->get_linkage();
+ if (dn->get_version() <= last_scrub &&
+ dnl->get_remote_d_type() != DT_DIR &&
+ !header->get_force()) {
+ dout(15) << __func__ << " skip dentry " << it->first
+ << ", no change since last scrub" << dendl;
+ continue;
+ }
+ if (dnl->is_primary()) {
+ _enqueue(dnl->get_inode(), header, false);
+ } else if (dnl->is_remote()) {
+ // TODO: check remote linkage
}
- return;
}
+ }
- // scrub_dentry_next defined to only give EAGAIN, ENOENT, 0 -- we should
- // never get random IO errors here.
- ceph_assert(r == 0);
+ dir->scrub_local();
- _enqueue_inode(dn->get_projected_inode(), dn, header, NULL, true);
+ dir->scrub_finished();
+ dir->auth_unpin(this);
- *added_children = true;
- }
+ *done = true;
+ dout(10) << __func__ << " done" << dendl;
}
void ScrubStack::scrub_file_inode(CInode *in)
{
// Record backtrace fails as remote linkage damage, as
// we may not be able to resolve hard links to this inode
- mdcache->mds->damage_table.notify_remote_damaged(in->inode.ino, path);
+ mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
} else if (result.inode.checked && !result.inode.passed &&
!result.inode.repaired) {
// Record damaged inode structures as damaged dentries as
// Put the verbose JSON output into the MDS log for later inspection
JSONFormatter f;
result.dump(&f);
- std::ostringstream out;
- f.flush(out);
- derr << __func__ << " scrub error on inode " << *in << ": " << out.str()
+ CachedStackStringStream css;
+ f.flush(*css);
+ derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
<< dendl;
} else {
dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
}
- MDSContext *c = NULL;
- in->scrub_finished(&c);
-
- if (in == header->get_origin()) {
- scrub_origins.erase(in);
- if (!header->get_recursive()) {
- if (r >= 0) { // we got into the scrubbing dump it
- result.dump(&(header->get_formatter()));
- } else { // we failed the lookup or something; dump ourselves
- header->get_formatter().open_object_section("results");
- header->get_formatter().dump_int("return_code", r);
- header->get_formatter().close_section(); // results
- }
- }
- }
- if (c) {
- finisher->queue(new MDSIOContextWrapper(mdcache->mds, c), 0);
- }
+ in->scrub_finished();
}
-ScrubStack::C_KickOffScrubs::C_KickOffScrubs(MDCache *mdcache, ScrubStack *s)
- : MDSInternalContext(mdcache->mds), stack(s) { }
-
void ScrubStack::complete_control_contexts(int r) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
for (auto &ctx : control_ctxs) {
ctx->complete(r);
dout(20) << __func__ << ", from state=" << state << ", to state="
<< next_state << dendl;
state = next_state;
+ clog_scrub_summary();
}
}
bool ScrubStack::scrub_in_transition_state() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
// STATE_RUNNING is considered as a transition state so as to
return false;
}
+std::string_view ScrubStack::scrub_summary() {
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+
+ bool have_more = false;
+ CachedStackStringStream cs;
+
+ if (state == STATE_IDLE) {
+ if (scrubbing_map.empty())
+ return "idle";
+ *cs << "idle+waiting";
+ }
+
+ if (state == STATE_RUNNING) {
+ if (clear_stack) {
+ *cs << "aborting";
+ } else {
+ *cs << "active";
+ }
+ } else {
+ if (state == STATE_PAUSING) {
+ have_more = true;
+ *cs << "pausing";
+ } else if (state == STATE_PAUSED) {
+ have_more = true;
+ *cs << "paused";
+ }
+
+ if (clear_stack) {
+ if (have_more) {
+ *cs << "+";
+ }
+ *cs << "aborting";
+ }
+ }
+
+ if (!scrubbing_map.empty()) {
+ *cs << " paths [";
+ bool first = true;
+ for (auto &p : scrubbing_map) {
+ if (!first)
+ *cs << ",";
+ auto& header = p.second;
+ if (CInode *in = mdcache->get_inode(header->get_origin()))
+ *cs << scrub_inode_path(in);
+ else
+ *cs << "#" << header->get_origin();
+ first = false;
+ }
+ *cs << "]";
+ }
+
+ return cs->strv();
+}
+
void ScrubStack::scrub_status(Formatter *f) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
f->open_object_section("result");
- std::stringstream ss;
+ CachedStackStringStream css;
bool have_more = false;
if (state == STATE_IDLE) {
- ss << "no active scrubs running";
+ if (scrubbing_map.empty())
+ *css << "no active scrubs running";
+ else
+ *css << state << " (waiting for more scrubs)";
} else if (state == STATE_RUNNING) {
- if (clear_inode_stack) {
- ss << "ABORTING";
+ if (clear_stack) {
+ *css << "ABORTING";
} else {
- ss << "scrub active";
+ *css << "scrub active";
}
- ss << " (" << stack_size << " inodes in the stack)";
+ *css << " (" << stack_size << " inodes in the stack)";
} else {
if (state == STATE_PAUSING || state == STATE_PAUSED) {
have_more = true;
- ss << state;
+ *css << state;
}
- if (clear_inode_stack) {
+ if (clear_stack) {
if (have_more) {
- ss << "+";
+ *css << "+";
}
- ss << "ABORTING";
+ *css << "ABORTING";
}
- ss << " (" << stack_size << " inodes in the stack)";
+ *css << " (" << stack_size << " inodes in the stack)";
}
- f->dump_string("status", ss.str());
+ f->dump_string("status", css->strv());
f->open_object_section("scrubs");
- for (auto &inode : scrub_origins) {
+
+ for (auto& p : scrubbing_map) {
have_more = false;
- ScrubHeaderRefConst header = inode->get_scrub_header();
+ auto& header = p.second;
std::string tag(header->get_tag());
f->open_object_section(tag.c_str()); // scrub id
- std::string path;
- inode->make_path_string(path, true);
- f->dump_string("path", path.empty() ? "/" : path.c_str());
+ if (CInode *in = mdcache->get_inode(header->get_origin()))
+ f->dump_string("path", scrub_inode_path(in));
+ else
+ f->dump_stream("path") << "#" << header->get_origin();
- std::stringstream optss;
+ f->dump_string("tag", header->get_tag());
+
+ CachedStackStringStream optcss;
if (header->get_recursive()) {
- optss << "recursive";
+ *optcss << "recursive";
have_more = true;
}
if (header->get_repair()) {
if (have_more) {
- optss << ",";
+ *optcss << ",";
}
- optss << "repair";
+ *optcss << "repair";
have_more = true;
}
if (header->get_force()) {
if (have_more) {
- optss << ",";
+ *optcss << ",";
}
- optss << "force";
+ *optcss << "force";
}
- f->dump_string("options", optss.str());
+ f->dump_string("options", optcss->strv());
f->close_section(); // scrub id
}
f->close_section(); // scrubs
}
void ScrubStack::abort_pending_scrubs() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
- ceph_assert(clear_inode_stack);
-
- for (auto inode = inode_stack.begin(); !inode.end(); ++inode) {
- CInode *in = *inode;
- if (in == in->scrub_info()->header->get_origin()) {
- scrub_origins.erase(in);
- }
-
- MDSContext *ctx = nullptr;
- in->scrub_aborted(&ctx);
- if (ctx != nullptr) {
- ctx->complete(-ECANCELED);
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
+ ceph_assert(clear_stack);
+
+ auto abort_one = [this](MDSCacheObject *obj) {
+ if (CInode *in = dynamic_cast<CInode*>(obj)) {
+ in->scrub_aborted();
+ } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
+ dir->scrub_aborted();
+ dir->auth_unpin(this);
+ } else {
+ ceph_abort(0 == "dentry in scrub stack");
}
- }
+ };
+ for (auto it = scrub_stack.begin(); !it.end(); ++it)
+ abort_one(*it);
+ for (auto it = scrub_waiting.begin(); !it.end(); ++it)
+ abort_one(*it);
stack_size = 0;
- inode_stack.clear();
- clear_inode_stack = false;
+ scrub_stack.clear();
+ scrub_waiting.clear();
+
+ for (auto& p : remote_scrubs)
+ remove_from_waiting(p.first, false);
+ remote_scrubs.clear();
+
+ clear_stack = false;
+}
+
+void ScrubStack::send_state_message(int op) {
+ MDSRank *mds = mdcache->mds;
+ set<mds_rank_t> up_mds;
+ mds->get_mds_map()->get_up_mds_set(up_mds);
+ for (auto& r : up_mds) {
+ if (r == 0)
+ continue;
+ auto m = make_message<MMDSScrub>(op);
+ mds->send_message_mds(m, r);
+ }
}
void ScrubStack::scrub_abort(Context *on_finish) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
- ceph_assert(on_finish != nullptr);
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(10) << __func__ << ": aborting with " << scrubs_in_progress
<< " scrubs in progress and " << stack_size << " in the"
<< " stack" << dendl;
- clear_inode_stack = true;
+ if (mdcache->mds->get_nodeid() == 0) {
+ scrub_epoch_last_abort = scrub_epoch;
+ scrub_any_peer_aborting = true;
+ send_state_message(MMDSScrub::OP_ABORT);
+ }
+
+ clear_stack = true;
if (scrub_in_transition_state()) {
- control_ctxs.push_back(on_finish);
+ if (on_finish)
+ control_ctxs.push_back(on_finish);
return;
}
abort_pending_scrubs();
- if (state != STATE_PAUSED) {
+ if (state != STATE_PAUSED)
set_state(STATE_IDLE);
- }
- on_finish->complete(0);
+
+ if (on_finish)
+ on_finish->complete(0);
}
void ScrubStack::scrub_pause(Context *on_finish) {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
- ceph_assert(on_finish != nullptr);
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(10) << __func__ << ": pausing with " << scrubs_in_progress
<< " scrubs in progress and " << stack_size << " in the"
<< " stack" << dendl;
+ if (mdcache->mds->get_nodeid() == 0)
+ send_state_message(MMDSScrub::OP_PAUSE);
+
// abort is in progress
- if (clear_inode_stack) {
- on_finish->complete(-EINVAL);
+ if (clear_stack) {
+ if (on_finish)
+ on_finish->complete(-CEPHFS_EINVAL);
return;
}
bool done = scrub_in_transition_state();
if (done) {
set_state(STATE_PAUSING);
- control_ctxs.push_back(on_finish);
+ if (on_finish)
+ control_ctxs.push_back(on_finish);
return;
}
set_state(STATE_PAUSED);
- on_finish->complete(0);
+ if (on_finish)
+ on_finish->complete(0);
}
bool ScrubStack::scrub_resume() {
- ceph_assert(mdcache->mds->mds_lock.is_locked_by_me());
+ ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
dout(20) << __func__ << ": state=" << state << dendl;
+ if (mdcache->mds->get_nodeid() == 0)
+ send_state_message(MMDSScrub::OP_RESUME);
+
int r = 0;
- if (clear_inode_stack) {
- r = -EINVAL;
+ if (clear_stack) {
+ r = -CEPHFS_EINVAL;
} else if (state == STATE_PAUSING) {
set_state(STATE_RUNNING);
- complete_control_contexts(-ECANCELED);
+ complete_control_contexts(-CEPHFS_ECANCELED);
} else if (state == STATE_PAUSED) {
set_state(STATE_RUNNING);
kick_off_scrubs();
return r;
}
+
+// send current scrub summary to cluster log
+void ScrubStack::clog_scrub_summary(CInode *in) {
+ if (in) {
+ std::string what;
+ if (clear_stack) {
+ what = "aborted";
+ } else if (in->scrub_is_in_progress()) {
+ what = "queued";
+ } else {
+ what = "completed";
+ }
+ clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
+ }
+
+ clog->info() << "scrub summary: " << scrub_summary();
+}
+
+void ScrubStack::dispatch(const cref_t<Message> &m)
+{
+ switch (m->get_type()) {
+ case MSG_MDS_SCRUB:
+ handle_scrub(ref_cast<MMDSScrub>(m));
+ break;
+
+ case MSG_MDS_SCRUB_STATS:
+ handle_scrub_stats(ref_cast<MMDSScrubStats>(m));
+ break;
+
+ default:
+ derr << " scrub stack unknown message " << m->get_type() << dendl_impl;
+ ceph_abort_msg("scrub stack unknown message");
+ }
+}
+
+void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
+{
+
+ mds_rank_t from = mds_rank_t(m->get_source().num());
+ dout(10) << __func__ << " " << *m << " from mds." << from << dendl;
+
+ switch (m->get_op()) {
+ case MMDSScrub::OP_QUEUEDIR:
+ {
+ CInode *diri = mdcache->get_inode(m->get_ino());
+ ceph_assert(diri);
+
+ std::vector<CDir*> dfs;
+ MDSGatherBuilder gather(g_ceph_context);
+ for (const auto& fg : m->get_frags()) {
+ CDir *dir = diri->get_dirfrag(fg);
+ if (!dir) {
+ dout(10) << __func__ << " no frag " << fg << dendl;
+ continue;
+ }
+ if (!dir->is_auth()) {
+ dout(10) << __func__ << " not auth " << *dir << dendl;
+ continue;
+ }
+ if (!dir->can_auth_pin()) {
+ dout(10) << __func__ << " can't auth pin " << *dir << dendl;
+ dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
+ continue;
+ }
+ dfs.push_back(dir);
+ }
+
+ if (gather.has_subs()) {
+ gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
+ gather.activate();
+ return;
+ }
+
+ fragset_t queued;
+ if (!dfs.empty()) {
+ ScrubHeaderRef header;
+ if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
+ header = it->second;
+ } else {
+ header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
+ m->is_force(), m->is_recursive(),
+ m->is_repair());
+ header->set_origin(m->get_origin());
+ scrubbing_map.emplace(header->get_tag(), header);
+ }
+ for (auto dir : dfs) {
+ queued.insert_raw(dir->get_frag());
+ _enqueue(dir, header, true);
+ }
+ queued.simplify();
+ kick_off_scrubs();
+ }
+
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
+ std::move(queued), m->get_tag());
+ mdcache->mds->send_message_mds(r, from);
+ }
+ break;
+ case MMDSScrub::OP_QUEUEDIR_ACK:
+ {
+ CInode *diri = mdcache->get_inode(m->get_ino());
+ ceph_assert(diri);
+ auto it = remote_scrubs.find(diri);
+ if (it != remote_scrubs.end() &&
+ m->get_tag() == it->second.tag) {
+ if (it->second.gather_set.erase(from)) {
+ auto &queued = diri->scrub_queued_frags();
+ for (auto &fg : m->get_frags())
+ queued.insert_raw(fg);
+ queued.simplify();
+
+ if (it->second.gather_set.empty()) {
+ remote_scrubs.erase(it);
+
+ const auto& header = diri->get_scrub_header();
+ header->set_epoch_last_forwarded(scrub_epoch);
+ remove_from_waiting(diri);
+ }
+ }
+ }
+ }
+ break;
+ case MMDSScrub::OP_QUEUEINO:
+ {
+ CInode *in = mdcache->get_inode(m->get_ino());
+ ceph_assert(in);
+
+ ScrubHeaderRef header;
+ if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
+ header = it->second;
+ } else {
+ header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
+ m->is_force(), m->is_recursive(),
+ m->is_repair());
+ header->set_origin(m->get_origin());
+ scrubbing_map.emplace(header->get_tag(), header);
+ }
+
+ _enqueue(in, header, true);
+ in->scrub_queued_frags() = m->get_frags();
+ kick_off_scrubs();
+
+ fragset_t queued;
+ auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
+ std::move(queued), m->get_tag());
+ mdcache->mds->send_message_mds(r, from);
+ }
+ break;
+ case MMDSScrub::OP_QUEUEINO_ACK:
+ {
+ CInode *in = mdcache->get_inode(m->get_ino());
+ ceph_assert(in);
+ auto it = remote_scrubs.find(in);
+ if (it != remote_scrubs.end() &&
+ m->get_tag() == it->second.tag &&
+ it->second.gather_set.erase(from)) {
+ ceph_assert(it->second.gather_set.empty());
+ remote_scrubs.erase(it);
+
+ remove_from_waiting(in, false);
+ dequeue(in);
+
+ const auto& header = in->get_scrub_header();
+ header->set_epoch_last_forwarded(scrub_epoch);
+ in->scrub_finished();
+
+ kick_off_scrubs();
+ }
+ }
+ break;
+ case MMDSScrub::OP_ABORT:
+ scrub_abort(nullptr);
+ break;
+ case MMDSScrub::OP_PAUSE:
+ scrub_pause(nullptr);
+ break;
+ case MMDSScrub::OP_RESUME:
+ scrub_resume();
+ break;
+ default:
+ derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
+ ceph_abort_msg("scrub stack unknown scrub operation");
+ }
+}
+
+void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
+{
+ mds_rank_t from = mds_rank_t(m->get_source().num());
+ dout(7) << __func__ << " " << *m << " from mds." << from << dendl;
+
+ if (from == 0) {
+ if (scrub_epoch != m->get_epoch() - 1) {
+ scrub_epoch = m->get_epoch() - 1;
+ for (auto& p : scrubbing_map) {
+ if (p.second->get_epoch_last_forwarded())
+ p.second->set_epoch_last_forwarded(scrub_epoch);
+ }
+ }
+ bool any_finished = false;
+ bool any_repaired = false;
+ std::set<std::string> scrubbing_tags;
+ for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
+ auto& header = it->second;
+ if (header->get_num_pending() ||
+ header->get_epoch_last_forwarded() >= scrub_epoch) {
+ scrubbing_tags.insert(it->first);
+ ++it;
+ } else if (m->is_finished(it->first)) {
+ any_finished = true;
+ if (header->get_repaired())
+ any_repaired = true;
+ scrubbing_map.erase(it++);
+ } else {
+ ++it;
+ }
+ }
+
+ scrub_epoch = m->get_epoch();
+
+ auto ack = make_message<MMDSScrubStats>(scrub_epoch,
+ std::move(scrubbing_tags), clear_stack);
+ mdcache->mds->send_message_mds(ack, 0);
+
+ if (any_finished)
+ clog_scrub_summary();
+ if (any_repaired)
+ mdcache->mds->mdlog->trim_all();
+ } else {
+ if (scrub_epoch == m->get_epoch() &&
+ (size_t)from < mds_scrub_stats.size()) {
+ auto& stat = mds_scrub_stats[from];
+ stat.epoch_acked = m->get_epoch();
+ stat.scrubbing_tags = m->get_scrubbing_tags();
+ stat.aborting = m->is_aborting();
+ }
+ }
+}
+
+void ScrubStack::advance_scrub_status()
+{
+ if (!scrub_any_peer_aborting && scrubbing_map.empty())
+ return;
+
+ MDSRank *mds = mdcache->mds;
+
+ set<mds_rank_t> up_mds;
+ mds->get_mds_map()->get_up_mds_set(up_mds);
+ auto up_max = *up_mds.rbegin();
+
+ bool update_scrubbing = false;
+ std::set<std::string> scrubbing_tags;
+
+ if (up_max == 0) {
+ update_scrubbing = true;
+ scrub_any_peer_aborting = false;
+ } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
+ bool any_aborting = false;
+ bool fully_acked = true;
+ for (const auto& stat : mds_scrub_stats) {
+ if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
+ any_aborting = true;
+ if (stat.epoch_acked != scrub_epoch) {
+ fully_acked = false;
+ continue;
+ }
+ scrubbing_tags.insert(stat.scrubbing_tags.begin(),
+ stat.scrubbing_tags.end());
+ }
+ if (!any_aborting)
+ scrub_any_peer_aborting = false;
+ if (fully_acked) {
+      // handle_scrub_stats() reports a scrub as still in progress if it has
+      // forwarded any object to another mds since the previous epoch. Suppose
+      // that at time 'A' we received scrub stats from every mds for the
+      // previous epoch. If a scrub is not reported by any mds, there has been
+      // no forwarding of it since time 'A', so it can be considered finished.
+ if (scrub_epoch_fully_acked + 1 == scrub_epoch)
+ update_scrubbing = true;
+ scrub_epoch_fully_acked = scrub_epoch;
+ }
+ }
+
+ if (mds_scrub_stats.size() != (size_t)up_max + 1)
+ mds_scrub_stats.resize((size_t)up_max + 1);
+ mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;
+
+ bool any_finished = false;
+ bool any_repaired = false;
+
+ for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
+ auto& header = it->second;
+ if (header->get_num_pending() ||
+ header->get_epoch_last_forwarded() >= scrub_epoch) {
+ if (update_scrubbing && up_max != 0)
+ scrubbing_tags.insert(it->first);
+ ++it;
+ } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
+ // no longer being scrubbed globally
+ any_finished = true;
+ if (header->get_repaired())
+ any_repaired = true;
+ scrubbing_map.erase(it++);
+ } else {
+ ++it;
+ }
+ }
+
+ ++scrub_epoch;
+
+ for (auto& r : up_mds) {
+ if (r == 0)
+ continue;
+ auto m = update_scrubbing ?
+ make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
+ make_message<MMDSScrubStats>(scrub_epoch);
+ mds->send_message_mds(m, r);
+ }
+
+ if (any_finished)
+ clog_scrub_summary();
+ if (any_repaired)
+ mdcache->mds->mdlog->trim_all();
+}
+
+void ScrubStack::handle_mds_failure(mds_rank_t mds)
+{
+ if (mds == 0) {
+ scrub_abort(nullptr);
+ return;
+ }
+
+ bool kick = false;
+ for (auto it = remote_scrubs.begin(); it != remote_scrubs.end(); ) {
+ if (it->second.gather_set.erase(mds) &&
+ it->second.gather_set.empty()) {
+ CInode *in = it->first;
+ remote_scrubs.erase(it++);
+ remove_from_waiting(in, false);
+ kick = true;
+ } else {
+ ++it;
+ }
+ }
+ if (kick)
+ kick_off_scrubs();
+}