]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/ScrubStack.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / mds / ScrubStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
23 #undef dout_prefix
24 #define dout_prefix _prefix(_dout, mdcache->mds)
25
26 using namespace std;
27
28 static std::ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
29 return *_dout << "mds." << mds->get_nodeid() << ".scrubstack ";
30 }
31
32 std::ostream &operator<<(std::ostream &os, const ScrubStack::State &state) {
33 switch(state) {
34 case ScrubStack::STATE_RUNNING:
35 os << "RUNNING";
36 break;
37 case ScrubStack::STATE_IDLE:
38 os << "IDLE";
39 break;
40 case ScrubStack::STATE_PAUSING:
41 os << "PAUSING";
42 break;
43 case ScrubStack::STATE_PAUSED:
44 os << "PAUSED";
45 break;
46 default:
47 ceph_abort();
48 }
49
50 return os;
51 }
52
// Unlink 'obj' (an inode or dirfrag) from the scrub stack, dropping the
// PIN_SCRUBQUEUE reference taken at enqueue time and decrementing
// stack_size. The object must currently be linked on a list.
void ScrubStack::dequeue(MDSCacheObject *obj)
{
  dout(20) << "dequeue " << *obj << " from ScrubStack" << dendl;
  ceph_assert(obj->item_scrub.is_on_list());
  obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
  obj->item_scrub.remove_myself();
  stack_size--;
}
61
/**
 * Push a single object onto the scrub stack.
 *
 * @param obj a CInode or CDir (anything else asserts)
 * @param header shared scrub parameters for this scrub operation
 * @param top true: push on top of the stack (depth-first descent);
 *            false: push at the bottom
 * @return 0 on success (including the purging no-op case),
 *         -CEPHFS_EBUSY if the object is already being scrubbed
 */
int ScrubStack::_enqueue(MDSCacheObject *obj, ScrubHeaderRef& header, bool top)
{
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  if (CInode *in = dynamic_cast<CInode*>(obj)) {
    if (in->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *in << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }
    if(in->state_test(CInode::STATE_PURGING)) {
      dout(10) << *obj << " is purging, skip pushing into scrub stack" << dendl;
      // treating this as success since purge will make sure this inode goes away
      return 0;
    }

    dout(10) << __func__ << " with {" << *in << "}" << ", top=" << top << dendl;
    in->scrub_initialize(header);
  } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
    if (dir->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *dir << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }
    if(dir->get_inode()->state_test(CInode::STATE_PURGING)) {
      dout(10) << *obj << " is purging, skip pushing into scrub stack" << dendl;
      // treating this as success since purge will make sure this dir inode goes away
      return 0;
    }

    dout(10) << __func__ << " with {" << *dir << "}" << ", top=" << top << dendl;
    // The edge directory must be in memory
    // auth pin is released in scrub_dirfrag()/abort_pending_scrubs()
    dir->auth_pin(this);
    dir->scrub_initialize(header);
  } else {
    ceph_assert(0 == "queue dentry to scrub stack");
  }

  dout(20) << "enqueue " << *obj << " to " << (top ? "top" : "bottom") << " of ScrubStack" << dendl;
  // take the pin/count only once even if the object is being re-positioned
  if (!obj->item_scrub.is_on_list()) {
    obj->get(MDSCacheObject::PIN_SCRUBQUEUE);
    stack_size++;
  }
  if (top)
    scrub_stack.push_front(&obj->item_scrub);
  else
    scrub_stack.push_back(&obj->item_scrub);
  return 0;
}
108
/**
 * Queue an inode for scrubbing under 'header' and start work.
 *
 * Registers the header in scrubbing_map keyed by tag, optionally queues
 * this rank's ~mdsdir first (kept on top), then queues 'in' and kicks
 * the scrub loop.
 *
 * @return 0 on success; -CEPHFS_EAGAIN while an abort is in progress,
 *         -CEPHFS_EEXIST on a tag collision, or an _enqueue() error.
 */
int ScrubStack::enqueue(CInode *in, ScrubHeaderRef& header, bool top)
{
  // abort in progress
  if (clear_stack)
    return -CEPHFS_EAGAIN;

  header->set_origin(in->ino());
  auto ret = scrubbing_map.emplace(header->get_tag(), header);
  if (!ret.second) {
    dout(10) << __func__ << " with {" << *in << "}"
	     << ", conflicting tag " << header->get_tag() << dendl;
    return -CEPHFS_EEXIST;
  }
  // NOTE(review): on the error paths below the tag stays registered in
  // scrubbing_map even though nothing was queued — confirm whether it
  // should be erased on failure.
  if (header->get_scrub_mdsdir()) {
    filepath fp;
    mds_rank_t rank;
    rank = mdcache->mds->get_nodeid();
    // rank is presumably always in range here; otherwise fp stays empty
    // and get_ino() resolves to inode 0 — TODO confirm unreachable
    if(rank >= 0 && rank < MAX_MDS) {
      fp.set_path("", MDS_INO_MDSDIR(rank));
    }
    int r = _enqueue(mdcache->get_inode(fp.get_ino()), header, true);
    if (r < 0) {
      return r;
    }
    //to make sure mdsdir is always on the top
    top = false;
  }
  int r = _enqueue(in, header, top);
  if (r < 0)
    return r;

  clog_scrub_summary(in);

  kick_off_scrubs();
  return 0;
}
145
146 void ScrubStack::add_to_waiting(MDSCacheObject *obj)
147 {
148 scrubs_in_progress++;
149 obj->item_scrub.remove_myself();
150 scrub_waiting.push_back(&obj->item_scrub);
151 }
152
153 void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick)
154 {
155 scrubs_in_progress--;
156 if (obj->item_scrub.is_on_list()) {
157 obj->item_scrub.remove_myself();
158 scrub_stack.push_front(&obj->item_scrub);
159 if (kick)
160 kick_off_scrubs();
161 }
162 }
163
164 class C_RetryScrub : public MDSInternalContext {
165 public:
166 C_RetryScrub(ScrubStack *s, MDSCacheObject *o) :
167 MDSInternalContext(s->mdcache->mds), stack(s), obj(o) {
168 stack->add_to_waiting(obj);
169 }
170 void finish(int r) override {
171 stack->remove_from_waiting(obj);
172 }
173 private:
174 ScrubStack *stack;
175 MDSCacheObject *obj;
176 };
177
178 void ScrubStack::kick_off_scrubs()
179 {
180 ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
181 dout(20) << __func__ << ": state=" << state << dendl;
182
183 if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
184 if (scrubs_in_progress == 0) {
185 dout(10) << __func__ << ": in progress scrub operations finished, "
186 << stack_size << " in the stack" << dendl;
187
188 State final_state = state;
189 if (clear_stack) {
190 abort_pending_scrubs();
191 final_state = STATE_IDLE;
192 }
193 if (state == STATE_PAUSING) {
194 final_state = STATE_PAUSED;
195 }
196
197 set_state(final_state);
198 complete_control_contexts(0);
199 }
200
201 return;
202 }
203
204 dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
205 "progress and " << stack_size << " in the stack" << dendl;
206 elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
207 while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
208 if (it.end()) {
209 if (scrubs_in_progress == 0) {
210 set_state(STATE_IDLE);
211 }
212
213 return;
214 }
215
216 assert(state == STATE_RUNNING || state == STATE_IDLE);
217 set_state(STATE_RUNNING);
218
219 if (CInode *in = dynamic_cast<CInode*>(*it)) {
220 dout(20) << __func__ << " examining " << *in << dendl;
221 ++it;
222
223 if (!validate_inode_auth(in))
224 continue;
225
226 if (!in->is_dir()) {
227 // it's a regular file, symlink, or hard link
228 dequeue(in); // we only touch it this once, so remove from stack
229
230 scrub_file_inode(in);
231 } else {
232 bool added_children = false;
233 bool done = false; // it's done, so pop it off the stack
234 scrub_dir_inode(in, &added_children, &done);
235 if (done) {
236 dout(20) << __func__ << " dir inode, done" << dendl;
237 dequeue(in);
238 }
239 if (added_children) {
240 // dirfrags were queued at top of stack
241 it = scrub_stack.begin();
242 }
243 }
244 } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
245 auto next = it;
246 ++next;
247 bool done = false; // it's done, so pop it off the stack
248 scrub_dirfrag(dir, &done);
249 if (done) {
250 dout(20) << __func__ << " dirfrag, done" << dendl;
251 ++it; // child inodes were queued at bottom of stack
252 dequeue(dir);
253 } else {
254 it = next;
255 }
256 } else {
257 ceph_assert(0 == "dentry in scrub stack");
258 }
259 }
260 }
261
/**
 * Check whether 'in' can be scrubbed locally right now.
 *
 * @return true iff the inode is auth here and can be auth-pinned.
 * Returns false in every other case after arranging a retry: a waiter is
 * registered for frozen/ambiguous-auth/degraded conditions, or the scrub
 * is forwarded to the auth MDS (MMDSScrub::OP_QUEUEINO) and the inode is
 * parked on the waiting list until the ACK arrives.
 */
bool ScrubStack::validate_inode_auth(CInode *in)
{
  if (in->is_auth()) {
    if (!in->can_auth_pin()) {
      dout(10) << __func__ << " can't auth pin" << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
      return false;
    }
    return true;
  } else {
    MDSRank *mds = mdcache->mds;
    if (in->is_ambiguous_auth()) {
      dout(10) << __func__ << " ambiguous auth" << dendl;
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
    } else if (mds->is_cluster_degraded()) {
      dout(20) << __func__ << " cluster degraded" << dendl;
      mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
    } else {
      // forward the scrub request to the auth MDS and track it so the
      // ACK (handle_scrub OP_QUEUEINO_ACK) can resume us
      ScrubHeaderRef header = in->get_scrub_header();
      ceph_assert(header);

      auto ret = remote_scrubs.emplace(std::piecewise_construct,
				       std::forward_as_tuple(in),
				       std::forward_as_tuple());
      ceph_assert(ret.second); // FIXME: parallel scrubs?
      auto &scrub_r = ret.first->second;
      scrub_r.tag = header->get_tag();

      mds_rank_t auth = in->authority().first;
      dout(10) << __func__ << " forward to mds." << auth << dendl;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
				       std::move(in->scrub_queued_frags()),
				       header->get_tag(), header->get_origin(),
				       header->is_internal_tag(), header->get_force(),
				       header->get_recursive(), header->get_repair());
      mdcache->mds->send_message_mds(r, auth);

      scrub_r.gather_set.insert(auth);
      // wait for ACK
      add_to_waiting(in);
    }
    return false;
  }
}
306
/**
 * Scrub a directory inode: queue all of its leaf dirfrags.
 *
 * Locally-auth, fetched, pinnable frags are pushed on top of the stack
 * (depth-first) and recorded in the inode's queued fragset. Frags auth
 * on another rank are batched per-rank and forwarded via
 * MMDSScrub::OP_QUEUEDIR; frozen/ambiguous/unfetched frags register
 * waiters and the whole inode is retried later.
 *
 * @param added_children set to true if any dirfrag was pushed locally
 * @param done set to true only when nothing is pending and the final
 *             validation of the inode itself has been started
 */
void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
{
  dout(10) << __func__ << " " << *in << dendl;
  ceph_assert(in->is_auth());
  MDSRank *mds = mdcache->mds;

  ScrubHeaderRef header = in->get_scrub_header();
  ceph_assert(header);

  MDSGatherBuilder gather(g_ceph_context);

  auto &queued = in->scrub_queued_frags();
  std::map<mds_rank_t, fragset_t> scrub_remote;

  frag_vec_t frags;
  in->dirfragtree.get_leaves(frags);
  dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
  for (auto &fg : frags) {
    if (queued.contains(fg))
      continue;  // already queued on a previous pass over this inode
    CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
    if (!dir->is_auth()) {
      if (dir->is_ambiguous_auth()) {
	dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
	dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
      } else if (mds->is_cluster_degraded()) {
	dout(20) << __func__ << " cluster degraded" << dendl;
	mds->wait_for_cluster_recovered(gather.new_sub());
      } else {
	mds_rank_t auth = dir->authority().first;
	scrub_remote[auth].insert_raw(fg);
      }
    } else if (!dir->can_auth_pin()) {
      dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
      dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
    } else if (dir->get_version() == 0) {
      dout(20) << __func__ << " barebones " << *dir << dendl;
      dir->fetch_keys({}, gather.new_sub());
    } else {
      _enqueue(dir, header, true);
      queued.insert_raw(dir->get_frag());
      *added_children = true;
    }
  }

  queued.simplify();

  if (gather.has_subs()) {
    // retry this whole inode once the blocked frags become available
    gather.set_finisher(new C_RetryScrub(this, in));
    gather.activate();
    return;
  }

  if (!scrub_remote.empty()) {
    // track the outstanding remote queue requests; ACKs arrive via
    // handle_scrub(OP_QUEUEDIR_ACK)
    auto ret = remote_scrubs.emplace(std::piecewise_construct,
				     std::forward_as_tuple(in),
				     std::forward_as_tuple());
    ceph_assert(ret.second); // FIXME: parallel scrubs?
    auto &scrub_r = ret.first->second;
    scrub_r.tag = header->get_tag();

    for (auto& p : scrub_remote) {
      p.second.simplify();
      dout(20) << __func__ << " forward " << p.second << " to mds." << p.first << dendl;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
				       std::move(p.second), header->get_tag(),
				       header->get_origin(), header->is_internal_tag(),
				       header->get_force(), header->get_recursive(),
				       header->get_repair());
      mds->send_message_mds(r, p.first);
      scrub_r.gather_set.insert(p.first);
    }
    // wait for ACKs
    add_to_waiting(in);
    return;
  }

  scrub_dir_inode_final(in);

  *done = true;
  dout(10) << __func__ << " done" << dendl;
}
389
// Completion context holding the result buffer for an asynchronous
// CInode::validate_disk_state() call. It counts itself against
// scrubs_in_progress for its whole lifetime so the concurrency limit in
// kick_off_scrubs() accounts for pending validations.
class C_InodeValidated : public MDSInternalContext
{
public:
  ScrubStack *stack;
  CInode::validated_data result;  // filled in by validate_disk_state()
  CInode *target;

  C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
    : MDSInternalContext(mds), stack(stack_), target(target_)
  {
    stack->scrubs_in_progress++;
  }
  void finish(int r) override {
    // publish the outcome first, then free our slot and let the scrub
    // loop pick up more work
    stack->_validate_inode_done(target, r, result);
    stack->scrubs_in_progress--;
    stack->kick_off_scrubs();
  }
};
408
409 void ScrubStack::scrub_dir_inode_final(CInode *in)
410 {
411 dout(20) << __func__ << " " << *in << dendl;
412
413 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
414 in->validate_disk_state(&fin->result, fin);
415 return;
416 }
417
/**
 * Scrub one dirfrag that sits on top of the stack.
 *
 * Fetches the frag if incomplete (retrying later). In recursive mode
 * every dentry is scrubbed; primary child inodes are queued at the
 * bottom of the stack, and non-directory dentries unchanged since the
 * last recursive scrub are skipped unless 'force' is set. Finally the
 * frag itself is checked (scrub_local) and its auth pin released.
 *
 * @param done set to true when the frag has been fully processed
 */
void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
{
  ceph_assert(dir != NULL);

  dout(10) << __func__ << " " << *dir << dendl;

  if (!dir->is_complete()) {
    dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
    dout(10) << __func__ << " incomplete, fetching" << dendl;
    return;
  }

  ScrubHeaderRef header = dir->get_scrub_header();
  version_t last_scrub = dir->scrub_info()->last_recursive.version;
  if (header->get_recursive()) {
    auto next_seq = mdcache->get_global_snaprealm()->get_newest_seq()+1;
    for (auto it = dir->begin(); it != dir->end(); /* nop */) {
      auto [dnk, dn] = *it;
      ++it; /* trim (in the future) may remove dentry */

      if (dn->scrub(next_seq)) {
	std::string path;
	dir->get_inode()->make_path_string(path, true);
	clog->warn() << "Scrub error on dentry " << *dn
		     << " see " << g_conf()->name
		     << " log and `damage ls` output for details";
      }

      // only head (non-snapshot) dentries decide what gets queued below
      if (dnk.snapid != CEPH_NOSNAP) {
	continue;
      }

      CDentry::linkage_t *dnl = dn->get_linkage();
      if (dn->get_version() <= last_scrub &&
	  dnl->get_remote_d_type() != DT_DIR &&
	  !header->get_force()) {
	dout(15) << __func__ << " skip dentry " << dnk
		 << ", no change since last scrub" << dendl;
	continue;
      }
      if (dnl->is_primary()) {
	_enqueue(dnl->get_inode(), header, false);
      } else if (dnl->is_remote()) {
	// TODO: check remote linkage
      }
    }
  }

  if (!dir->scrub_local()) {
    std::string path;
    dir->get_inode()->make_path_string(path, true);
    clog->warn() << "Scrub error on dir " << dir->ino()
		 << " (" << path << ") see " << g_conf()->name
		 << " log and `damage ls` output for details";
  }

  dir->scrub_finished();
  dir->auth_unpin(this);  // pin taken in _enqueue()

  *done = true;
  dout(10) << __func__ << " done" << dendl;
}
480
481 void ScrubStack::scrub_file_inode(CInode *in)
482 {
483 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
484 // At this stage the DN is already past scrub_initialize, so
485 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
486 in->validate_disk_state(&fin->result, fin);
487 }
488
/**
 * Handle the result of an asynchronous CInode::validate_disk_state().
 *
 * Unrepaired backtrace failures are recorded in the damage table as
 * remote linkage damage; unrepaired inode-structure failures are
 * recorded against the parent dentry. The outcome is reported to the
 * cluster log (verbose JSON goes to the MDS log) and the inode's scrub
 * is marked finished.
 */
void ScrubStack::_validate_inode_done(CInode *in, int r,
				      const CInode::validated_data &result)
{
  LogChannelRef clog = mdcache->mds->clog;
  const ScrubHeaderRefConst header = in->scrub_info()->header;

  std::string path;
  if (!result.passed_validation) {
    // Build path string for use in messages
    in->make_path_string(path, true);
  }

  if (result.backtrace.checked && !result.backtrace.passed &&
      !result.backtrace.repaired)
  {
    // Record backtrace fails as remote linkage damage, as
    // we may not be able to resolve hard links to this inode
    mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
  } else if (result.inode.checked && !result.inode.passed &&
	     !result.inode.repaired) {
    // Record damaged inode structures as damaged dentries as
    // that is where they are stored
    auto parent = in->get_projected_parent_dn();
    if (parent) {
      auto dir = parent->get_dir();
      mdcache->mds->damage_table.notify_dentry(
	dir->inode->ino(), dir->frag, parent->last, parent->get_name(), path);
    }
  }

  // Inform the cluster log if we found an error
  if (!result.passed_validation) {
    if (result.all_damage_repaired()) {
      clog->info() << "Scrub repaired inode " << in->ino()
		   << " (" << path << ")";
    } else {
      clog->warn() << "Scrub error on inode " << in->ino()
		   << " (" << path << ") see " << g_conf()->name
		   << " log and `damage ls` output for details";
    }

    // Put the verbose JSON output into the MDS log for later inspection
    JSONFormatter f;
    result.dump(&f);
    CachedStackStringStream css;
    f.flush(*css);
    derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
	 << dendl;
  } else {
    dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
  }

  in->scrub_finished();
}
543
544 void ScrubStack::complete_control_contexts(int r) {
545 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
546
547 for (auto &ctx : control_ctxs) {
548 ctx->complete(r);
549 }
550 control_ctxs.clear();
551 }
552
553 void ScrubStack::set_state(State next_state) {
554 if (state != next_state) {
555 dout(20) << __func__ << ", from state=" << state << ", to state="
556 << next_state << dendl;
557 state = next_state;
558 clog_scrub_summary();
559 }
560 }
561
562 bool ScrubStack::scrub_in_transition_state() {
563 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
564 dout(20) << __func__ << ": state=" << state << dendl;
565
566 // STATE_RUNNING is considered as a transition state so as to
567 // "delay" the scrub control operation.
568 if (state == STATE_RUNNING || state == STATE_PAUSING) {
569 return true;
570 }
571
572 return false;
573 }
574
/**
 * Build a one-line human-readable summary of scrub activity, e.g.
 * "idle", "active paths [...]", "pausing+aborting ...".
 * Returns a view into pooled stream storage — consume it promptly.
 */
std::string_view ScrubStack::scrub_summary() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  bool have_more = false;
  CachedStackStringStream cs;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      return "idle";
    *cs << "idle+waiting";
  }

  if (state == STATE_RUNNING) {
    if (clear_stack) {
      *cs << "aborting";
    } else {
      *cs << "active";
    }
  } else {
    if (state == STATE_PAUSING) {
      have_more = true;
      *cs << "pausing";
    } else if (state == STATE_PAUSED) {
      have_more = true;
      *cs << "paused";
    }

    if (clear_stack) {
      if (have_more) {
	*cs << "+";
      }
      *cs << "aborting";
    }
  }

  // append the origin path (or ino) of every registered scrub
  if (!scrubbing_map.empty()) {
    *cs << " paths [";
    bool first = true;
    for (auto &p : scrubbing_map) {
      if (!first)
	*cs << ",";
      auto& header = p.second;
      if (CInode *in = mdcache->get_inode(header->get_origin()))
	*cs << scrub_inode_path(in);
      else
	*cs << "#" << header->get_origin();
      first = false;
    }
    *cs << "]";
  }

  return cs->strv();
}
628
/**
 * Dump machine-readable scrub status into 'f': a "result" section with
 * an overall status string plus one "scrubs" entry per registered scrub
 * tag (path, tag and option flags).
 */
void ScrubStack::scrub_status(Formatter *f) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  f->open_object_section("result");

  CachedStackStringStream css;
  bool have_more = false;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      *css << "no active scrubs running";
    else
      *css << state << " (waiting for more scrubs)";
  } else if (state == STATE_RUNNING) {
    if (clear_stack) {
      *css << "ABORTING";
    } else {
      *css << "scrub active";
    }
    *css << " (" << stack_size << " inodes in the stack)";
  } else {
    if (state == STATE_PAUSING || state == STATE_PAUSED) {
      have_more = true;
      *css << state;
    }
    if (clear_stack) {
      if (have_more) {
	*css << "+";
      }
      *css << "ABORTING";
    }

    *css << " (" << stack_size << " inodes in the stack)";
  }
  f->dump_string("status", css->strv());

  f->open_object_section("scrubs");

  for (auto& p : scrubbing_map) {
    have_more = false;  // reused below to comma-separate option flags
    auto& header = p.second;

    std::string tag(header->get_tag());
    f->open_object_section(tag.c_str()); // scrub id

    if (CInode *in = mdcache->get_inode(header->get_origin()))
      f->dump_string("path", scrub_inode_path(in));
    else
      f->dump_stream("path") << "#" << header->get_origin();

    f->dump_string("tag", header->get_tag());

    CachedStackStringStream optcss;
    if (header->get_recursive()) {
      *optcss << "recursive";
      have_more = true;
    }
    if (header->get_repair()) {
      if (have_more) {
	*optcss << ",";
      }
      *optcss << "repair";
      have_more = true;
    }
    if (header->get_force()) {
      if (have_more) {
	*optcss << ",";
      }
      *optcss << "force";
    }
    if (header->get_scrub_mdsdir()) {
      if (have_more) {
	*optcss << ",";
      }
      *optcss << "scrub_mdsdir";
    }

    f->dump_string("options", optcss->strv());
    f->close_section(); // scrub id
  }
  f->close_section(); // scrubs
  f->close_section(); // result
}
712
713 void ScrubStack::abort_pending_scrubs() {
714 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
715 ceph_assert(clear_stack);
716
717 auto abort_one = [this](MDSCacheObject *obj) {
718 if (CInode *in = dynamic_cast<CInode*>(obj)) {
719 in->scrub_aborted();
720 } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
721 dir->scrub_aborted();
722 dir->auth_unpin(this);
723 } else {
724 ceph_abort(0 == "dentry in scrub stack");
725 }
726 };
727 for (auto it = scrub_stack.begin(); !it.end(); ++it)
728 abort_one(*it);
729 for (auto it = scrub_waiting.begin(); !it.end(); ++it)
730 abort_one(*it);
731
732 stack_size = 0;
733 scrub_stack.clear();
734 scrub_waiting.clear();
735
736 for (auto& p : remote_scrubs)
737 remove_from_waiting(p.first, false);
738 remote_scrubs.clear();
739
740 clear_stack = false;
741 }
742
743 void ScrubStack::send_state_message(int op) {
744 MDSRank *mds = mdcache->mds;
745 set<mds_rank_t> up_mds;
746 mds->get_mds_map()->get_up_mds_set(up_mds);
747 for (auto& r : up_mds) {
748 if (r == 0)
749 continue;
750 auto m = make_message<MMDSScrub>(op);
751 mds->send_message_mds(m, r);
752 }
753 }
754
/**
 * Abort all running and pending scrubs.
 *
 * Rank 0 records the abort epoch and broadcasts OP_ABORT to its peers.
 * If scrubs are still in flight, the abort completes later from
 * kick_off_scrubs() and on_finish is parked in control_ctxs; otherwise
 * pending scrubs are dropped immediately and on_finish completes now.
 *
 * @param on_finish may be nullptr (e.g. when triggered by a peer message)
 */
void ScrubStack::scrub_abort(Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  dout(10) << __func__ << ": aborting with " << scrubs_in_progress
	   << " scrubs in progress and " << stack_size << " in the"
	   << " stack" << dendl;

  if (mdcache->mds->get_nodeid() == 0) {
    scrub_epoch_last_abort = scrub_epoch;
    scrub_any_peer_aborting = true;
    send_state_message(MMDSScrub::OP_ABORT);
  }

  clear_stack = true;
  if (scrub_in_transition_state()) {
    // in-flight operations must drain first; kick_off_scrubs() will
    // finish the abort and complete our context
    if (on_finish)
      control_ctxs.push_back(on_finish);
    return;
  }

  abort_pending_scrubs();
  if (state != STATE_PAUSED)
    set_state(STATE_IDLE);

  if (on_finish)
    on_finish->complete(0);
}
782
783 void ScrubStack::scrub_pause(Context *on_finish) {
784 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
785
786 dout(10) << __func__ << ": pausing with " << scrubs_in_progress
787 << " scrubs in progress and " << stack_size << " in the"
788 << " stack" << dendl;
789
790 if (mdcache->mds->get_nodeid() == 0)
791 send_state_message(MMDSScrub::OP_PAUSE);
792
793 // abort is in progress
794 if (clear_stack) {
795 if (on_finish)
796 on_finish->complete(-CEPHFS_EINVAL);
797 return;
798 }
799
800 bool done = scrub_in_transition_state();
801 if (done) {
802 set_state(STATE_PAUSING);
803 if (on_finish)
804 control_ctxs.push_back(on_finish);
805 return;
806 }
807
808 set_state(STATE_PAUSED);
809 if (on_finish)
810 on_finish->complete(0);
811 }
812
/**
 * Resume a pausing/paused scrub.
 *
 * Rank 0 broadcasts OP_RESUME to peers. Pending pause contexts are
 * cancelled with -CEPHFS_ECANCELED; a fully paused stack is restarted.
 *
 * NOTE(review): the declared return type is bool but an errno-style int
 * is returned, so failure (-CEPHFS_EINVAL) converts to true and success
 * to false — confirm callers only use this as "had an error".
 */
bool ScrubStack::scrub_resume() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  dout(20) << __func__ << ": state=" << state << dendl;

  if (mdcache->mds->get_nodeid() == 0)
    send_state_message(MMDSScrub::OP_RESUME);

  int r = 0;

  if (clear_stack) {
    // an abort is already tearing the stack down
    r = -CEPHFS_EINVAL;
  } else if (state == STATE_PAUSING) {
    set_state(STATE_RUNNING);
    complete_control_contexts(-CEPHFS_ECANCELED);
  } else if (state == STATE_PAUSED) {
    set_state(STATE_RUNNING);
    kick_off_scrubs();
  }

  return r;
}
834
835 // send current scrub summary to cluster log
836 void ScrubStack::clog_scrub_summary(CInode *in) {
837 if (in) {
838 std::string what;
839 if (clear_stack) {
840 what = "aborted";
841 } else if (in->scrub_is_in_progress()) {
842 what = "queued";
843 } else {
844 what = "completed";
845 }
846 clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
847 }
848
849 clog->info() << "scrub summary: " << scrub_summary();
850 }
851
852 void ScrubStack::dispatch(const cref_t<Message> &m)
853 {
854 switch (m->get_type()) {
855 case MSG_MDS_SCRUB:
856 handle_scrub(ref_cast<MMDSScrub>(m));
857 break;
858
859 case MSG_MDS_SCRUB_STATS:
860 handle_scrub_stats(ref_cast<MMDSScrubStats>(m));
861 break;
862
863 default:
864 derr << " scrub stack unknown message " << m->get_type() << dendl_impl;
865 ceph_abort_msg("scrub stack unknown message");
866 }
867 }
868
869 bool ScrubStack::remove_inode_if_stacked(CInode *in) {
870 MDSCacheObject *obj = dynamic_cast<MDSCacheObject*>(in);
871 if(obj->item_scrub.is_on_list()) {
872 dout(20) << "removing inode " << *in << " from scrub_stack" << dendl;
873 obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
874 obj->item_scrub.remove_myself();
875 stack_size--;
876 return true;
877 }
878 return false;
879 }
880
/**
 * Handle an MMDSScrub queue/control message from a peer MDS.
 *
 * OP_QUEUEDIR: queue the requested dirfrags we are auth for (waiting
 *   first for frozen frags to thaw), then ACK with the frags actually
 *   queued.
 * OP_QUEUEDIR_ACK: resolve the matching remote_scrubs gather for the
 *   directory inode, merging the frags the peer queued; when the gather
 *   empties, resume the waiting inode.
 * OP_QUEUEINO: queue the inode locally and ACK.
 * OP_QUEUEINO_ACK: the single remote queued the inode; finish its scrub
 *   locally and continue the loop.
 * OP_ABORT / OP_PAUSE / OP_RESUME: apply the control op locally.
 */
void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
{

  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;

  switch (m->get_op()) {
  case MMDSScrub::OP_QUEUEDIR:
    {
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);

      std::vector<CDir*> dfs;
      MDSGatherBuilder gather(g_ceph_context);
      for (const auto& fg : m->get_frags()) {
	CDir *dir = diri->get_dirfrag(fg);
	if (!dir) {
	  dout(10) << __func__ << " no frag " << fg << dendl;
	  continue;
	}
	if (!dir->is_auth()) {
	  dout(10) << __func__ << " not auth " << *dir << dendl;
	  continue;
	}
	if (!dir->can_auth_pin()) {
	  dout(10) << __func__ << " can't auth pin " << *dir << dendl;
	  dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
	  continue;
	}
	dfs.push_back(dir);
      }

      if (gather.has_subs()) {
	// replay this whole message once the frozen frags thaw
	gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
	gather.activate();
	return;
      }

      fragset_t queued;
      if (!dfs.empty()) {
	// reuse the header for this tag if already known, otherwise
	// reconstruct it from the message fields
	ScrubHeaderRef header;
	if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	  header = it->second;
	} else {
	  header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
						 m->is_force(), m->is_recursive(),
						 m->is_repair());
	  header->set_origin(m->get_origin());
	  scrubbing_map.emplace(header->get_tag(), header);
	}
	for (auto dir : dfs) {
	  queued.insert_raw(dir->get_frag());
	  _enqueue(dir, header, true);
	}
	queued.simplify();
	kick_off_scrubs();
      }

      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEDIR_ACK:
    {
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);
      auto it = remote_scrubs.find(diri);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag) {
	if (it->second.gather_set.erase(from)) {
	  // record the frags the peer actually queued
	  auto &queued = diri->scrub_queued_frags();
	  for (auto &fg : m->get_frags())
	    queued.insert_raw(fg);
	  queued.simplify();

	  if (it->second.gather_set.empty()) {
	    // all peers answered: resume the waiting directory inode
	    remote_scrubs.erase(it);

	    const auto& header = diri->get_scrub_header();
	    header->set_epoch_last_forwarded(scrub_epoch);
	    remove_from_waiting(diri);
	  }
	}
      }
    }
    break;
  case MMDSScrub::OP_QUEUEINO:
    {
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);

      ScrubHeaderRef header;
      if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	header = it->second;
      } else {
	header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
					       m->is_force(), m->is_recursive(),
					       m->is_repair());
	header->set_origin(m->get_origin());
	scrubbing_map.emplace(header->get_tag(), header);
      }

      _enqueue(in, header, true);
      in->scrub_queued_frags() = m->get_frags();
      kick_off_scrubs();

      // the ACK carries no frags for inode queueing
      fragset_t queued;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEINO_ACK:
    {
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);
      auto it = remote_scrubs.find(in);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag &&
	  it->second.gather_set.erase(from)) {
	// an inode is only ever forwarded to a single auth rank
	ceph_assert(it->second.gather_set.empty());
	remote_scrubs.erase(it);

	remove_from_waiting(in, false);
	dequeue(in);

	const auto& header = in->get_scrub_header();
	header->set_epoch_last_forwarded(scrub_epoch);
	in->scrub_finished();

	kick_off_scrubs();
      }
    }
    break;
  case MMDSScrub::OP_ABORT:
    scrub_abort(nullptr);
    break;
  case MMDSScrub::OP_PAUSE:
    scrub_pause(nullptr);
    break;
  case MMDSScrub::OP_RESUME:
    scrub_resume();
    break;
  default:
    derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
    ceph_abort_msg("scrub stack unknown scrub operation");
  }
}
1030
/**
 * Handle an MMDSScrubStats message.
 *
 * From rank 0 (the coordinator): adopt its epoch, drop scrub headers for
 * tags the coordinator reports finished, and reply with our still-active
 * tags plus whether we are aborting.
 * From any other rank (we are rank 0): record that rank's ack for the
 * current epoch in mds_scrub_stats.
 */
void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
{
  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(7) << __func__ << " " << *m << " from mds." << from << dendl;

  if (from == 0) {
    // re-sync to the coordinator's previous epoch if we fell behind
    if (scrub_epoch != m->get_epoch() - 1) {
      scrub_epoch = m->get_epoch() - 1;
      for (auto& p : scrubbing_map) {
	if (p.second->get_epoch_last_forwarded())
	  p.second->set_epoch_last_forwarded(scrub_epoch);
      }
    }
    bool any_finished = false;
    bool any_repaired = false;
    std::set<std::string> scrubbing_tags;
    for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
      auto& header = it->second;
      if (header->get_num_pending() ||
	  header->get_epoch_last_forwarded() >= scrub_epoch) {
	// still active locally this epoch
	scrubbing_tags.insert(it->first);
	++it;
      } else if (m->is_finished(it->first)) {
	any_finished = true;
	if (header->get_repaired())
	  any_repaired = true;
	scrubbing_map.erase(it++);
      } else {
	++it;
      }
    }

    scrub_epoch = m->get_epoch();

    auto ack = make_message<MMDSScrubStats>(scrub_epoch,
					    std::move(scrubbing_tags), clear_stack);
    mdcache->mds->send_message_mds(ack, 0);

    if (any_finished)
      clog_scrub_summary();
    if (any_repaired)
      mdcache->mds->mdlog->trim_all();
  } else {
    // we are rank 0: only accept acks for the current epoch
    if (scrub_epoch == m->get_epoch() &&
	(size_t)from < mds_scrub_stats.size()) {
      auto& stat = mds_scrub_stats[from];
      stat.epoch_acked = m->get_epoch();
      stat.scrubbing_tags = m->get_scrubbing_tags();
      stat.aborting = m->is_aborting();
    }
  }
}
1083
/**
 * Periodic tick on rank 0: advance the scrub stats epoch, broadcast
 * MMDSScrubStats to all other up ranks, and retire scrub headers that no
 * rank (including us) has reported active across a fully-acked epoch.
 */
void ScrubStack::advance_scrub_status()
{
  if (!scrub_any_peer_aborting && scrubbing_map.empty())
    return;

  MDSRank *mds = mdcache->mds;

  set<mds_rank_t> up_mds;
  mds->get_mds_map()->get_up_mds_set(up_mds);
  auto up_max = *up_mds.rbegin();

  bool update_scrubbing = false;
  std::set<std::string> scrubbing_tags;

  if (up_max == 0) {
    // we are the only up MDS: nothing to wait for
    update_scrubbing = true;
    scrub_any_peer_aborting = false;
  } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
    bool any_aborting = false;
    bool fully_acked = true;
    for (const auto& stat : mds_scrub_stats) {
      if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
	any_aborting = true;
      if (stat.epoch_acked != scrub_epoch) {
	fully_acked = false;
	continue;
      }
      scrubbing_tags.insert(stat.scrubbing_tags.begin(),
			    stat.scrubbing_tags.end());
    }
    if (!any_aborting)
      scrub_any_peer_aborting = false;
    if (fully_acked) {
      // handle_scrub_stats() reports scrub is still in-progress if it has
      // forwarded any object to other mds since previous epoch. Let's assume,
      // at time 'A', we got scrub stats from all mds for previous epoch. If
      // a scrub is not reported by any mds, we know there is no forward of
      // the scrub since time 'A'. So we can consider the scrub is finished.
      if (scrub_epoch_fully_acked + 1 == scrub_epoch)
	update_scrubbing = true;
      scrub_epoch_fully_acked = scrub_epoch;
    }
  }

  if (mds_scrub_stats.size() != (size_t)up_max + 1)
    mds_scrub_stats.resize((size_t)up_max + 1);
  // rank 0 (us) implicitly acks the epoch it is about to broadcast
  mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;

  bool any_finished = false;
  bool any_repaired = false;

  for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
    auto& header = it->second;
    if (header->get_num_pending() ||
	header->get_epoch_last_forwarded() >= scrub_epoch) {
      if (update_scrubbing && up_max != 0)
	scrubbing_tags.insert(it->first);
      ++it;
    } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
      // no longer being scrubbed globally
      any_finished = true;
      if (header->get_repaired())
	any_repaired = true;
      scrubbing_map.erase(it++);
    } else {
      ++it;
    }
  }

  ++scrub_epoch;

  for (auto& r : up_mds) {
    if (r == 0)
      continue;
    auto m = update_scrubbing ?
      make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
      make_message<MMDSScrubStats>(scrub_epoch);
    mds->send_message_mds(m, r);
  }

  if (any_finished)
    clog_scrub_summary();
  if (any_repaired)
    mdcache->mds->mdlog->trim_all();
}
1169
/**
 * React to the failure of MDS rank 'mds'.
 *
 * If rank 0 (the scrub coordinator) failed, abort everything. Otherwise
 * remove the failed rank from every pending remote-scrub gather; inodes
 * whose gathers thereby complete are moved back to the stack and the
 * scrub loop is restarted.
 */
void ScrubStack::handle_mds_failure(mds_rank_t mds)
{
  if (mds == 0) {
    scrub_abort(nullptr);
    return;
  }

  bool kick = false;
  for (auto it = remote_scrubs.begin(); it != remote_scrubs.end(); ) {
    // post-increment inside erase() keeps the iterator valid
    if (it->second.gather_set.erase(mds) &&
	it->second.gather_set.empty()) {
      CInode *in = it->first;
      remote_scrubs.erase(it++);
      remove_from_waiting(in, false);
      kick = true;
    } else {
      ++it;
    }
  }
  if (kick)
    kick_off_scrubs();
}