]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/ScrubStack.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mds / ScrubStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "ScrubStack.h"
16 #include "common/Finisher.h"
17 #include "mds/MDSRank.h"
18 #include "mds/MDCache.h"
19 #include "mds/MDSContinuation.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_mds
23 #undef dout_prefix
24 #define dout_prefix _prefix(_dout, mdcache->mds)
25 static ostream& _prefix(std::ostream *_dout, MDSRank *mds) {
26 return *_dout << "mds." << mds->get_nodeid() << ".scrubstack ";
27 }
28
29 std::ostream &operator<<(std::ostream &os, const ScrubStack::State &state) {
30 switch(state) {
31 case ScrubStack::STATE_RUNNING:
32 os << "RUNNING";
33 break;
34 case ScrubStack::STATE_IDLE:
35 os << "IDLE";
36 break;
37 case ScrubStack::STATE_PAUSING:
38 os << "PAUSING";
39 break;
40 case ScrubStack::STATE_PAUSED:
41 os << "PAUSED";
42 break;
43 default:
44 ceph_abort();
45 }
46
47 return os;
48 }
49
50 void ScrubStack::dequeue(MDSCacheObject *obj)
51 {
52 dout(20) << "dequeue " << *obj << " from ScrubStack" << dendl;
53 ceph_assert(obj->item_scrub.is_on_list());
54 obj->put(MDSCacheObject::PIN_SCRUBQUEUE);
55 obj->item_scrub.remove_myself();
56 stack_size--;
57 }
58
// Push an inode or dirfrag onto the scrub stack and mark it as scrubbing.
// Returns 0 on success or -CEPHFS_EBUSY if the object is already being
// scrubbed. Only CInode and CDir objects are legal; anything else aborts.
int ScrubStack::_enqueue(MDSCacheObject *obj, ScrubHeaderRef& header, bool top)
{
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  if (CInode *in = dynamic_cast<CInode*>(obj)) {
    if (in->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *in << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }

    dout(10) << __func__ << " with {" << *in << "}" << ", top=" << top << dendl;
    in->scrub_initialize(header);
  } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
    if (dir->scrub_is_in_progress()) {
      dout(10) << __func__ << " with {" << *dir << "}" << ", already in scrubbing" << dendl;
      return -CEPHFS_EBUSY;
    }

    dout(10) << __func__ << " with {" << *dir << "}" << ", top=" << top << dendl;
    // The edge directory must be in memory
    // (auth pin held until scrub_finished/scrub_aborted releases it)
    dir->auth_pin(this);
    dir->scrub_initialize(header);
  } else {
    ceph_assert(0 == "queue dentry to scrub stack");
  }

  dout(20) << "enqueue " << *obj << " to " << (top ? "top" : "bottom") << " of ScrubStack" << dendl;
  // Pin and count the object only on first insertion; re-queuing an
  // already-listed object just moves it within the stack.
  if (!obj->item_scrub.is_on_list()) {
    obj->get(MDSCacheObject::PIN_SCRUBQUEUE);
    stack_size++;
  }
  if (top)
    scrub_stack.push_front(&obj->item_scrub);
  else
    scrub_stack.push_back(&obj->item_scrub);
  return 0;
}
95
// Public entry point: register a new scrub rooted at 'in' and kick it off.
// Returns 0 on success, -CEPHFS_EAGAIN while an abort is in progress,
// -CEPHFS_EEXIST on a duplicate tag, or the error from _enqueue().
int ScrubStack::enqueue(CInode *in, ScrubHeaderRef& header, bool top)
{
  // abort in progress
  if (clear_stack)
    return -CEPHFS_EAGAIN;

  header->set_origin(in->ino());
  // One header per tag; a colliding tag means a conflicting scrub request.
  auto ret = scrubbing_map.emplace(header->get_tag(), header);
  if (!ret.second) {
    dout(10) << __func__ << " with {" << *in << "}"
	     << ", conflicting tag " << header->get_tag() << dendl;
    return -CEPHFS_EEXIST;
  }

  // NOTE(review): if _enqueue() fails the tag stays in scrubbing_map;
  // presumably reaped later via the scrub-stats epoch logic — verify.
  int r = _enqueue(in, header, top);
  if (r < 0)
    return r;

  clog_scrub_summary(in);

  kick_off_scrubs();
  return 0;
}
119
120 void ScrubStack::add_to_waiting(MDSCacheObject *obj)
121 {
122 scrubs_in_progress++;
123 obj->item_scrub.remove_myself();
124 scrub_waiting.push_back(&obj->item_scrub);
125 }
126
127 void ScrubStack::remove_from_waiting(MDSCacheObject *obj, bool kick)
128 {
129 scrubs_in_progress--;
130 if (obj->item_scrub.is_on_list()) {
131 obj->item_scrub.remove_myself();
132 scrub_stack.push_front(&obj->item_scrub);
133 if (kick)
134 kick_off_scrubs();
135 }
136 }
137
// Completion context that parks an object on the waiting list at
// construction time and requeues it (kicking the stack) when fired.
class C_RetryScrub : public MDSInternalContext {
public:
  C_RetryScrub(ScrubStack *s, MDSCacheObject *o) :
    MDSInternalContext(s->mdcache->mds), stack(s), obj(o) {
    stack->add_to_waiting(obj);
  }
  void finish(int r) override {
    // presumably kick defaults to true here (declared in the header) —
    // the retried object should restart scrub processing; verify.
    stack->remove_from_waiting(obj);
  }
private:
  ScrubStack *stack;
  MDSCacheObject *obj;
};
151
152 void ScrubStack::kick_off_scrubs()
153 {
154 ceph_assert(ceph_mutex_is_locked(mdcache->mds->mds_lock));
155 dout(20) << __func__ << ": state=" << state << dendl;
156
157 if (clear_stack || state == STATE_PAUSING || state == STATE_PAUSED) {
158 if (scrubs_in_progress == 0) {
159 dout(10) << __func__ << ": in progress scrub operations finished, "
160 << stack_size << " in the stack" << dendl;
161
162 State final_state = state;
163 if (clear_stack) {
164 abort_pending_scrubs();
165 final_state = STATE_IDLE;
166 }
167 if (state == STATE_PAUSING) {
168 final_state = STATE_PAUSED;
169 }
170
171 set_state(final_state);
172 complete_control_contexts(0);
173 }
174
175 return;
176 }
177
178 dout(20) << __func__ << " entering with " << scrubs_in_progress << " in "
179 "progress and " << stack_size << " in the stack" << dendl;
180 elist<MDSCacheObject*>::iterator it = scrub_stack.begin();
181 while (g_conf()->mds_max_scrub_ops_in_progress > scrubs_in_progress) {
182 if (it.end()) {
183 if (scrubs_in_progress == 0) {
184 set_state(STATE_IDLE);
185 }
186
187 return;
188 }
189
190 assert(state == STATE_RUNNING || state == STATE_IDLE);
191 set_state(STATE_RUNNING);
192
193 if (CInode *in = dynamic_cast<CInode*>(*it)) {
194 dout(20) << __func__ << " examining " << *in << dendl;
195 ++it;
196
197 if (!validate_inode_auth(in))
198 continue;
199
200 if (!in->is_dir()) {
201 // it's a regular file, symlink, or hard link
202 dequeue(in); // we only touch it this once, so remove from stack
203
204 scrub_file_inode(in);
205 } else {
206 bool added_children = false;
207 bool done = false; // it's done, so pop it off the stack
208 scrub_dir_inode(in, &added_children, &done);
209 if (done) {
210 dout(20) << __func__ << " dir inode, done" << dendl;
211 dequeue(in);
212 }
213 if (added_children) {
214 // dirfrags were queued at top of stack
215 it = scrub_stack.begin();
216 }
217 }
218 } else if (CDir *dir = dynamic_cast<CDir*>(*it)) {
219 auto next = it;
220 ++next;
221 bool done = false; // it's done, so pop it off the stack
222 scrub_dirfrag(dir, &done);
223 if (done) {
224 dout(20) << __func__ << " dirfrag, done" << dendl;
225 ++it; // child inodes were queued at bottom of stack
226 dequeue(dir);
227 } else {
228 it = next;
229 }
230 } else {
231 ceph_assert(0 == "dentry in scrub stack");
232 }
233 }
234 }
235
// Check whether we can scrub 'in' locally right now.
// Returns true only when we are auth and can auth-pin; otherwise arranges
// for a retry (waiter) or forwards the scrub to the auth MDS and returns
// false so the caller skips this inode for now.
bool ScrubStack::validate_inode_auth(CInode *in)
{
  if (in->is_auth()) {
    if (!in->can_auth_pin()) {
      dout(10) << __func__ << " can't auth pin" << dendl;
      in->add_waiter(CInode::WAIT_UNFREEZE, new C_RetryScrub(this, in));
      return false;
    }
    return true;
  } else {
    MDSRank *mds = mdcache->mds;
    if (in->is_ambiguous_auth()) {
      dout(10) << __func__ << " ambiguous auth" << dendl;
      in->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, new C_RetryScrub(this, in));
    } else if (mds->is_cluster_degraded()) {
      dout(20) << __func__ << " cluster degraded" << dendl;
      mds->wait_for_cluster_recovered(new C_RetryScrub(this, in));
    } else {
      // We are not auth: hand the inode over to its auth MDS and track the
      // outstanding request in remote_scrubs until the ACK arrives.
      ScrubHeaderRef header = in->get_scrub_header();
      ceph_assert(header);

      auto ret = remote_scrubs.emplace(std::piecewise_construct,
				       std::forward_as_tuple(in),
				       std::forward_as_tuple());
      ceph_assert(ret.second); // FIXME: parallel scrubs?
      auto &scrub_r = ret.first->second;
      scrub_r.tag = header->get_tag();

      mds_rank_t auth = in->authority().first;
      dout(10) << __func__ << " forward to mds." << auth << dendl;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO, in->ino(),
				       std::move(in->scrub_queued_frags()),
				       header->get_tag(), header->get_origin(),
				       header->is_internal_tag(), header->get_force(),
				       header->get_recursive(), header->get_repair());
      mdcache->mds->send_message_mds(r, auth);

      scrub_r.gather_set.insert(auth);
      // wait for ACK
      add_to_waiting(in);
    }
    return false;
  }
}
280
281 void ScrubStack::scrub_dir_inode(CInode *in, bool *added_children, bool *done)
282 {
283 dout(10) << __func__ << " " << *in << dendl;
284 ceph_assert(in->is_auth());
285 MDSRank *mds = mdcache->mds;
286
287 ScrubHeaderRef header = in->get_scrub_header();
288 ceph_assert(header);
289
290 MDSGatherBuilder gather(g_ceph_context);
291
292 auto &queued = in->scrub_queued_frags();
293 std::map<mds_rank_t, fragset_t> scrub_remote;
294
295 frag_vec_t frags;
296 in->dirfragtree.get_leaves(frags);
297 dout(20) << __func__ << "recursive mode, frags " << frags << dendl;
298 for (auto &fg : frags) {
299 if (queued.contains(fg))
300 continue;
301 CDir *dir = in->get_or_open_dirfrag(mdcache, fg);
302 if (!dir->is_auth()) {
303 if (dir->is_ambiguous_auth()) {
304 dout(20) << __func__ << " ambiguous auth " << *dir << dendl;
305 dir->add_waiter(MDSCacheObject::WAIT_SINGLEAUTH, gather.new_sub());
306 } else if (mds->is_cluster_degraded()) {
307 dout(20) << __func__ << " cluster degraded" << dendl;
308 mds->wait_for_cluster_recovered(gather.new_sub());
309 } else {
310 mds_rank_t auth = dir->authority().first;
311 scrub_remote[auth].insert_raw(fg);
312 }
313 } else if (!dir->can_auth_pin()) {
314 dout(20) << __func__ << " freezing/frozen " << *dir << dendl;
315 dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
316 } else if (dir->get_version() == 0) {
317 dout(20) << __func__ << " barebones " << *dir << dendl;
318 dir->fetch(gather.new_sub());
319 } else {
320 _enqueue(dir, header, true);
321 queued.insert_raw(dir->get_frag());
322 *added_children = true;
323 }
324 }
325
326 queued.simplify();
327
328 if (gather.has_subs()) {
329 gather.set_finisher(new C_RetryScrub(this, in));
330 gather.activate();
331 return;
332 }
333
334 if (!scrub_remote.empty()) {
335 auto ret = remote_scrubs.emplace(std::piecewise_construct,
336 std::forward_as_tuple(in),
337 std::forward_as_tuple());
338 ceph_assert(ret.second); // FIXME: parallel scrubs?
339 auto &scrub_r = ret.first->second;
340 scrub_r.tag = header->get_tag();
341
342 for (auto& p : scrub_remote) {
343 p.second.simplify();
344 dout(20) << __func__ << " forward " << p.second << " to mds." << p.first << dendl;
345 auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR, in->ino(),
346 std::move(p.second), header->get_tag(),
347 header->get_origin(), header->is_internal_tag(),
348 header->get_force(), header->get_recursive(),
349 header->get_repair());
350 mds->send_message_mds(r, p.first);
351 scrub_r.gather_set.insert(p.first);
352 }
353 // wait for ACKs
354 add_to_waiting(in);
355 return;
356 }
357
358 scrub_dir_inode_final(in);
359
360 *done = true;
361 dout(10) << __func__ << " done" << dendl;
362 }
363
// Completion for CInode::validate_disk_state(): holds the validation
// result buffer and, when fired, reports it back to the stack and resumes
// scrub processing. Counts as an in-progress scrub while outstanding.
class C_InodeValidated : public MDSInternalContext
{
public:
  ScrubStack *stack;
  CInode::validated_data result;  // filled in by validate_disk_state()
  CInode *target;

  C_InodeValidated(MDSRank *mds, ScrubStack *stack_, CInode *target_)
    : MDSInternalContext(mds), stack(stack_), target(target_)
  {
    stack->scrubs_in_progress++;
  }
  void finish(int r) override {
    // process the result first, then release our slot and kick the pump
    stack->_validate_inode_done(target, r, result);
    stack->scrubs_in_progress--;
    stack->kick_off_scrubs();
  }
};
382
383 void ScrubStack::scrub_dir_inode_final(CInode *in)
384 {
385 dout(20) << __func__ << " " << *in << dendl;
386
387 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
388 in->validate_disk_state(&fin->result, fin);
389 return;
390 }
391
// Scrub a single dirfrag. Fetches it first if incomplete (retrying later);
// in recursive mode queues each changed primary dentry's inode at the
// bottom of the stack, then validates the frag locally and releases it.
// Sets *done when the frag has been fully processed.
void ScrubStack::scrub_dirfrag(CDir *dir, bool *done)
{
  ceph_assert(dir != NULL);

  dout(10) << __func__ << " " << *dir << dendl;

  if (!dir->is_complete()) {
    dir->fetch(new C_RetryScrub(this, dir), true); // already auth pinned
    dout(10) << __func__ << " incomplete, fetching" << dendl;
    return;
  }

  ScrubHeaderRef header = dir->get_scrub_header();
  version_t last_scrub = dir->scrub_info()->last_recursive.version;
  if (header->get_recursive()) {
    for (auto it = dir->begin(); it != dir->end(); ++it) {
      // only scrub the head version of each dentry
      if (it->first.snapid != CEPH_NOSNAP)
	continue;
      CDentry *dn = it->second;
      CDentry::linkage_t *dnl = dn->get_linkage();
      // skip dentries unchanged since the last recursive scrub, unless
      // forced; directories are never skipped this way
      if (dn->get_version() <= last_scrub &&
	  dnl->get_remote_d_type() != DT_DIR &&
	  !header->get_force()) {
	dout(15) << __func__ << " skip dentry " << it->first
		 << ", no change since last scrub" << dendl;
	continue;
      }
      if (dnl->is_primary()) {
	_enqueue(dnl->get_inode(), header, false);
      } else if (dnl->is_remote()) {
	// TODO: check remote linkage
      }
    }
  }

  dir->scrub_local();

  // release the auth pin taken in _enqueue()
  dir->scrub_finished();
  dir->auth_unpin(this);

  *done = true;
  dout(10) << __func__ << " done" << dendl;
}
435
436 void ScrubStack::scrub_file_inode(CInode *in)
437 {
438 C_InodeValidated *fin = new C_InodeValidated(mdcache->mds, this, in);
439 // At this stage the DN is already past scrub_initialize, so
440 // it's in the cache, it has PIN_SCRUBQUEUE and it is authpinned
441 in->validate_disk_state(&fin->result, fin);
442 }
443
// Handle the result of an inode's on-disk validation: record any damage in
// the damage table, report pass/repair/error to the cluster log (with the
// verbose JSON detail in the MDS log), and mark the inode's scrub finished.
void ScrubStack::_validate_inode_done(CInode *in, int r,
				      const CInode::validated_data &result)
{
  LogChannelRef clog = mdcache->mds->clog;
  const ScrubHeaderRefConst header = in->scrub_info()->header;

  std::string path;
  if (!result.passed_validation) {
    // Build path string for use in messages
    in->make_path_string(path, true);
  }

  if (result.backtrace.checked && !result.backtrace.passed &&
      !result.backtrace.repaired)
  {
    // Record backtrace fails as remote linkage damage, as
    // we may not be able to resolve hard links to this inode
    mdcache->mds->damage_table.notify_remote_damaged(in->ino(), path);
  } else if (result.inode.checked && !result.inode.passed &&
	     !result.inode.repaired) {
    // Record damaged inode structures as damaged dentries as
    // that is where they are stored
    auto parent = in->get_projected_parent_dn();
    if (parent) {
      auto dir = parent->get_dir();
      mdcache->mds->damage_table.notify_dentry(
	dir->inode->ino(), dir->frag, parent->last, parent->get_name(), path);
    }
  }

  // Inform the cluster log if we found an error
  if (!result.passed_validation) {
    if (result.all_damage_repaired()) {
      clog->info() << "Scrub repaired inode " << in->ino()
		   << " (" << path << ")";
    } else {
      clog->warn() << "Scrub error on inode " << in->ino()
		   << " (" << path << ") see " << g_conf()->name
		   << " log and `damage ls` output for details";
    }

    // Put the verbose JSON output into the MDS log for later inspection
    JSONFormatter f;
    result.dump(&f);
    CachedStackStringStream css;
    f.flush(*css);
    derr << __func__ << " scrub error on inode " << *in << ": " << css->strv()
	 << dendl;
  } else {
    dout(10) << __func__ << " scrub passed on inode " << *in << dendl;
  }

  in->scrub_finished();
}
498
499 void ScrubStack::complete_control_contexts(int r) {
500 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
501
502 for (auto &ctx : control_ctxs) {
503 ctx->complete(r);
504 }
505 control_ctxs.clear();
506 }
507
508 void ScrubStack::set_state(State next_state) {
509 if (state != next_state) {
510 dout(20) << __func__ << ", from state=" << state << ", to state="
511 << next_state << dendl;
512 state = next_state;
513 clog_scrub_summary();
514 }
515 }
516
517 bool ScrubStack::scrub_in_transition_state() {
518 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
519 dout(20) << __func__ << ": state=" << state << dendl;
520
521 // STATE_RUNNING is considered as a transition state so as to
522 // "delay" the scrub control operation.
523 if (state == STATE_RUNNING || state == STATE_PAUSING) {
524 return true;
525 }
526
527 return false;
528 }
529
// Build the one-line scrub summary shown in the cluster log, e.g.
// "active paths [/a,/b]" or "paused+aborting". Returned view is backed by
// a cached stack stringstream, so callers must consume it promptly.
std::string_view ScrubStack::scrub_summary() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  bool have_more = false;
  CachedStackStringStream cs;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      return "idle";
    // idle locally but scrubs still registered (e.g. running on peers)
    *cs << "idle+waiting";
  }

  if (state == STATE_RUNNING) {
    if (clear_stack) {
      *cs << "aborting";
    } else {
      *cs << "active";
    }
  } else {
    if (state == STATE_PAUSING) {
      have_more = true;
      *cs << "pausing";
    } else if (state == STATE_PAUSED) {
      have_more = true;
      *cs << "paused";
    }

    if (clear_stack) {
      if (have_more) {
	*cs << "+";
      }
      *cs << "aborting";
    }
  }

  // append the origin path of every registered scrub
  if (!scrubbing_map.empty()) {
    *cs << " paths [";
    bool first = true;
    for (auto &p : scrubbing_map) {
      if (!first)
	*cs << ",";
      auto& header = p.second;
      if (CInode *in = mdcache->get_inode(header->get_origin()))
	*cs << scrub_inode_path(in);
      else
	*cs << "#" << header->get_origin();
      first = false;
    }
    *cs << "]";
  }

  return cs->strv();
}
583
// Dump the detailed scrub status (overall state plus one section per
// registered scrub with its path, tag and options) into the formatter,
// for the `scrub status` admin command.
void ScrubStack::scrub_status(Formatter *f) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  f->open_object_section("result");

  CachedStackStringStream css;
  bool have_more = false;

  if (state == STATE_IDLE) {
    if (scrubbing_map.empty())
      *css << "no active scrubs running";
    else
      *css << state << " (waiting for more scrubs)";
  } else if (state == STATE_RUNNING) {
    if (clear_stack) {
      *css << "ABORTING";
    } else {
      *css << "scrub active";
    }
    *css << " (" << stack_size << " inodes in the stack)";
  } else {
    if (state == STATE_PAUSING || state == STATE_PAUSED) {
      have_more = true;
      *css << state;
    }
    if (clear_stack) {
      if (have_more) {
	*css << "+";
      }
      *css << "ABORTING";
    }

    *css << " (" << stack_size << " inodes in the stack)";
  }
  f->dump_string("status", css->strv());

  f->open_object_section("scrubs");

  for (auto& p : scrubbing_map) {
    // have_more is reused below to comma-separate the option list
    have_more = false;
    auto& header = p.second;

    std::string tag(header->get_tag());
    f->open_object_section(tag.c_str()); // scrub id

    if (CInode *in = mdcache->get_inode(header->get_origin()))
      f->dump_string("path", scrub_inode_path(in));
    else
      f->dump_stream("path") << "#" << header->get_origin();

    f->dump_string("tag", header->get_tag());

    CachedStackStringStream optcss;
    if (header->get_recursive()) {
      *optcss << "recursive";
      have_more = true;
    }
    if (header->get_repair()) {
      if (have_more) {
	*optcss << ",";
      }
      *optcss << "repair";
      have_more = true;
    }
    if (header->get_force()) {
      if (have_more) {
	*optcss << ",";
      }
      *optcss << "force";
    }

    f->dump_string("options", optcss->strv());
    f->close_section(); // scrub id
  }
  f->close_section(); // scrubs
  f->close_section(); // result
}
661
662 void ScrubStack::abort_pending_scrubs() {
663 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
664 ceph_assert(clear_stack);
665
666 auto abort_one = [this](MDSCacheObject *obj) {
667 if (CInode *in = dynamic_cast<CInode*>(obj)) {
668 in->scrub_aborted();
669 } else if (CDir *dir = dynamic_cast<CDir*>(obj)) {
670 dir->scrub_aborted();
671 dir->auth_unpin(this);
672 } else {
673 ceph_abort(0 == "dentry in scrub stack");
674 }
675 };
676 for (auto it = scrub_stack.begin(); !it.end(); ++it)
677 abort_one(*it);
678 for (auto it = scrub_waiting.begin(); !it.end(); ++it)
679 abort_one(*it);
680
681 stack_size = 0;
682 scrub_stack.clear();
683 scrub_waiting.clear();
684
685 for (auto& p : remote_scrubs)
686 remove_from_waiting(p.first, false);
687 remote_scrubs.clear();
688
689 clear_stack = false;
690 }
691
692 void ScrubStack::send_state_message(int op) {
693 MDSRank *mds = mdcache->mds;
694 set<mds_rank_t> up_mds;
695 mds->get_mds_map()->get_up_mds_set(up_mds);
696 for (auto& r : up_mds) {
697 if (r == 0)
698 continue;
699 auto m = make_message<MMDSScrub>(op);
700 mds->send_message_mds(m, r);
701 }
702 }
703
// Abort all scrubs. Rank 0 additionally broadcasts the abort to its peers.
// If scrubs are still in flight the abort is deferred (on_finish queued);
// otherwise pending scrubs are dropped immediately and on_finish fires now.
void ScrubStack::scrub_abort(Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));

  dout(10) << __func__ << ": aborting with " << scrubs_in_progress
	   << " scrubs in progress and " << stack_size << " in the"
	   << " stack" << dendl;

  if (mdcache->mds->get_nodeid() == 0) {
    // remember which epoch the abort happened in so peer ACKs can be
    // distinguished from stale ones
    scrub_epoch_last_abort = scrub_epoch;
    scrub_any_peer_aborting = true;
    send_state_message(MMDSScrub::OP_ABORT);
  }

  clear_stack = true;
  if (scrub_in_transition_state()) {
    // kick_off_scrubs() will finish the abort once in-flight work drains
    if (on_finish)
      control_ctxs.push_back(on_finish);
    return;
  }

  abort_pending_scrubs();
  if (state != STATE_PAUSED)
    set_state(STATE_IDLE);

  if (on_finish)
    on_finish->complete(0);
}
731
732 void ScrubStack::scrub_pause(Context *on_finish) {
733 ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
734
735 dout(10) << __func__ << ": pausing with " << scrubs_in_progress
736 << " scrubs in progress and " << stack_size << " in the"
737 << " stack" << dendl;
738
739 if (mdcache->mds->get_nodeid() == 0)
740 send_state_message(MMDSScrub::OP_PAUSE);
741
742 // abort is in progress
743 if (clear_stack) {
744 if (on_finish)
745 on_finish->complete(-CEPHFS_EINVAL);
746 return;
747 }
748
749 bool done = scrub_in_transition_state();
750 if (done) {
751 set_state(STATE_PAUSING);
752 if (on_finish)
753 control_ctxs.push_back(on_finish);
754 return;
755 }
756
757 set_state(STATE_PAUSED);
758 if (on_finish)
759 on_finish->complete(0);
760 }
761
// Resume scrub processing after a pause. Rank 0 broadcasts the resume.
// NOTE(review): the computed int error code (-CEPHFS_EINVAL during an
// abort) is implicitly converted to the bool return, so any error reads
// as 'true' — confirm what callers expect from this return value.
bool ScrubStack::scrub_resume() {
  ceph_assert(ceph_mutex_is_locked_by_me(mdcache->mds->mds_lock));
  dout(20) << __func__ << ": state=" << state << dendl;

  if (mdcache->mds->get_nodeid() == 0)
    send_state_message(MMDSScrub::OP_RESUME);

  int r = 0;

  if (clear_stack) {
    // an abort is already tearing the stack down; resume is invalid
    r = -CEPHFS_EINVAL;
  } else if (state == STATE_PAUSING) {
    // pause had not completed yet; cancel the queued pause contexts
    set_state(STATE_RUNNING);
    complete_control_contexts(-CEPHFS_ECANCELED);
  } else if (state == STATE_PAUSED) {
    set_state(STATE_RUNNING);
    kick_off_scrubs();
  }

  return r;
}
783
784 // send current scrub summary to cluster log
785 void ScrubStack::clog_scrub_summary(CInode *in) {
786 if (in) {
787 std::string what;
788 if (clear_stack) {
789 what = "aborted";
790 } else if (in->scrub_is_in_progress()) {
791 what = "queued";
792 } else {
793 what = "completed";
794 }
795 clog->info() << "scrub " << what << " for path: " << scrub_inode_path(in);
796 }
797
798 clog->info() << "scrub summary: " << scrub_summary();
799 }
800
801 void ScrubStack::dispatch(const cref_t<Message> &m)
802 {
803 switch (m->get_type()) {
804 case MSG_MDS_SCRUB:
805 handle_scrub(ref_cast<MMDSScrub>(m));
806 break;
807
808 case MSG_MDS_SCRUB_STATS:
809 handle_scrub_stats(ref_cast<MMDSScrubStats>(m));
810 break;
811
812 default:
813 derr << " scrub stack unknown message " << m->get_type() << dendl_impl;
814 ceph_abort_msg("scrub stack unknown message");
815 }
816 }
817
// Handle an MMDSScrub message from a peer MDS:
//  - OP_QUEUEDIR / OP_QUEUEINO: a peer forwarded work we are auth for;
//    queue it locally (creating a header for the tag if needed) and ACK.
//  - OP_QUEUEDIR_ACK / OP_QUEUEINO_ACK: a forward we sent was accepted;
//    clear the remote_scrubs tracking and resume the waiting object.
//  - OP_ABORT / OP_PAUSE / OP_RESUME: scrub control relayed from rank 0.
void ScrubStack::handle_scrub(const cref_t<MMDSScrub> &m)
{

  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(10) << __func__ << " " << *m << " from mds." << from << dendl;

  switch (m->get_op()) {
  case MMDSScrub::OP_QUEUEDIR:
    {
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);

      // collect the requested frags we can actually work on right now
      std::vector<CDir*> dfs;
      MDSGatherBuilder gather(g_ceph_context);
      for (const auto& fg : m->get_frags()) {
	CDir *dir = diri->get_dirfrag(fg);
	if (!dir) {
	  dout(10) << __func__ << " no frag " << fg << dendl;
	  continue;
	}
	if (!dir->is_auth()) {
	  dout(10) << __func__ << " not auth " << *dir << dendl;
	  continue;
	}
	if (!dir->can_auth_pin()) {
	  dout(10) << __func__ << " can't auth pin " << *dir << dendl;
	  dir->add_waiter(CDir::WAIT_UNFREEZE, gather.new_sub());
	  continue;
	}
	dfs.push_back(dir);
      }

      // frozen frags: retry the whole message once they thaw
      if (gather.has_subs()) {
	gather.set_finisher(new C_MDS_RetryMessage(mdcache->mds, m));
	gather.activate();
	return;
      }

      fragset_t queued;
      if (!dfs.empty()) {
	// reuse the header for this tag if we have one, else create it
	// from the fields carried in the message
	ScrubHeaderRef header;
	if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	  header = it->second;
	} else {
	  header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
						 m->is_force(), m->is_recursive(),
						 m->is_repair());
	  header->set_origin(m->get_origin());
	  scrubbing_map.emplace(header->get_tag(), header);
	}
	for (auto dir : dfs) {
	  // NOTE(review): _enqueue() return value ignored; an -EBUSY frag
	  // would still be reported as queued in the ACK — verify.
	  queued.insert_raw(dir->get_frag());
	  _enqueue(dir, header, true);
	}
	queued.simplify();
	kick_off_scrubs();
      }

      // tell the sender which frags we actually took on
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEDIR_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEDIR_ACK:
    {
      CInode *diri = mdcache->get_inode(m->get_ino());
      ceph_assert(diri);
      auto it = remote_scrubs.find(diri);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag) {
	if (it->second.gather_set.erase(from)) {
	  // merge the frags the peer accepted into our queued set
	  auto &queued = diri->scrub_queued_frags();
	  for (auto &fg : m->get_frags())
	    queued.insert_raw(fg);
	  queued.simplify();

	  // last outstanding ACK: the inode can make progress again
	  if (it->second.gather_set.empty()) {
	    remote_scrubs.erase(it);

	    const auto& header = diri->get_scrub_header();
	    header->set_epoch_last_forwarded(scrub_epoch);
	    remove_from_waiting(diri);
	  }
	}
      }
    }
    break;
  case MMDSScrub::OP_QUEUEINO:
    {
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);

      // reuse or create the header for this tag, as in OP_QUEUEDIR
      ScrubHeaderRef header;
      if (auto it = scrubbing_map.find(m->get_tag()); it != scrubbing_map.end()) {
	header = it->second;
      } else {
	header = std::make_shared<ScrubHeader>(m->get_tag(), m->is_internal_tag(),
					       m->is_force(), m->is_recursive(),
					       m->is_repair());
	header->set_origin(m->get_origin());
	scrubbing_map.emplace(header->get_tag(), header);
      }

      _enqueue(in, header, true);
      in->scrub_queued_frags() = m->get_frags();
      kick_off_scrubs();

      // the INO ack carries no frags
      fragset_t queued;
      auto r = make_message<MMDSScrub>(MMDSScrub::OP_QUEUEINO_ACK, m->get_ino(),
				       std::move(queued), m->get_tag());
      mdcache->mds->send_message_mds(r, from);
    }
    break;
  case MMDSScrub::OP_QUEUEINO_ACK:
    {
      CInode *in = mdcache->get_inode(m->get_ino());
      ceph_assert(in);
      auto it = remote_scrubs.find(in);
      if (it != remote_scrubs.end() &&
	  m->get_tag() == it->second.tag &&
	  it->second.gather_set.erase(from)) {
	// an inode forward only ever targets a single auth rank
	ceph_assert(it->second.gather_set.empty());
	remote_scrubs.erase(it);

	// the auth MDS now owns this scrub; drop it from our lists
	remove_from_waiting(in, false);
	dequeue(in);

	const auto& header = in->get_scrub_header();
	header->set_epoch_last_forwarded(scrub_epoch);
	in->scrub_finished();

	kick_off_scrubs();
      }
    }
    break;
  case MMDSScrub::OP_ABORT:
    scrub_abort(nullptr);
    break;
  case MMDSScrub::OP_PAUSE:
    scrub_pause(nullptr);
    break;
  case MMDSScrub::OP_RESUME:
    scrub_resume();
    break;
  default:
    derr << " scrub stack unknown scrub operation " << m->get_op() << dendl_impl;
    ceph_abort_msg("scrub stack unknown scrub operation");
  }
}
967
// Handle a scrub-stats message. From rank 0 it announces a new epoch:
// sync our epoch, retire scrubs rank 0 reports finished, and reply with
// the tags still active here. From any other rank (we are rank 0) it is
// that peer's reply, recorded into mds_scrub_stats.
void ScrubStack::handle_scrub_stats(const cref_t<MMDSScrubStats> &m)
{
  mds_rank_t from = mds_rank_t(m->get_source().num());
  dout(7) << __func__ << " " << *m << " from mds." << from << dendl;

  if (from == 0) {
    if (scrub_epoch != m->get_epoch() - 1) {
      // we missed epoch(s); clamp forwarded-epoch markers so headers are
      // not considered stale relative to the new epoch
      scrub_epoch = m->get_epoch() - 1;
      for (auto& p : scrubbing_map) {
	if (p.second->get_epoch_last_forwarded())
	  p.second->set_epoch_last_forwarded(scrub_epoch);
      }
    }
    bool any_finished = false;
    bool any_repaired = false;
    std::set<std::string> scrubbing_tags;
    for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
      auto& header = it->second;
      if (header->get_num_pending() ||
	  header->get_epoch_last_forwarded() >= scrub_epoch) {
	// still active locally: report it back to rank 0
	scrubbing_tags.insert(it->first);
	++it;
      } else if (m->is_finished(it->first)) {
	// rank 0 says this scrub is done cluster-wide; retire it
	any_finished = true;
	if (header->get_repaired())
	  any_repaired = true;
	scrubbing_map.erase(it++);
      } else {
	++it;
      }
    }

    scrub_epoch = m->get_epoch();

    auto ack = make_message<MMDSScrubStats>(scrub_epoch,
					    std::move(scrubbing_tags), clear_stack);
    mdcache->mds->send_message_mds(ack, 0);

    if (any_finished)
      clog_scrub_summary();
    if (any_repaired)
      // flush the log so repairs become durable promptly
      mdcache->mds->mdlog->trim_all();
  } else {
    // rank 0 side: record the peer's reply for the current epoch only
    if (scrub_epoch == m->get_epoch() &&
	(size_t)from < mds_scrub_stats.size()) {
      auto& stat = mds_scrub_stats[from];
      stat.epoch_acked = m->get_epoch();
      stat.scrubbing_tags = m->get_scrubbing_tags();
      stat.aborting = m->is_aborting();
    }
  }
}
1020
// Rank 0 periodic tick: advance the scrub-stats epoch, retire scrubs that
// no rank (including us) reported active for a full acked epoch, and
// broadcast the new epoch (with the still-active tag set when known) to
// all other up ranks.
void ScrubStack::advance_scrub_status()
{
  if (!scrub_any_peer_aborting && scrubbing_map.empty())
    return;

  MDSRank *mds = mdcache->mds;

  set<mds_rank_t> up_mds;
  mds->get_mds_map()->get_up_mds_set(up_mds);
  auto up_max = *up_mds.rbegin();

  bool update_scrubbing = false;
  std::set<std::string> scrubbing_tags;

  if (up_max == 0) {
    // single-rank cluster: no peers to wait on
    update_scrubbing = true;
    scrub_any_peer_aborting = false;
  } else if (mds_scrub_stats.size() > (size_t)(up_max)) {
    // merge the tags every peer reported for the current epoch; only
    // trust the merged view once all peers have acked this epoch
    bool any_aborting = false;
    bool fully_acked = true;
    for (const auto& stat : mds_scrub_stats) {
      if (stat.aborting || stat.epoch_acked <= scrub_epoch_last_abort)
	any_aborting = true;
      if (stat.epoch_acked != scrub_epoch) {
	fully_acked = false;
	continue;
      }
      scrubbing_tags.insert(stat.scrubbing_tags.begin(),
			    stat.scrubbing_tags.end());
    }
    if (!any_aborting)
      scrub_any_peer_aborting = false;
    if (fully_acked) {
      // handle_scrub_stats() reports scrub is still in-progress if it has
      // forwarded any object to other mds since previous epoch. Let's assume,
      // at time 'A', we got scrub stats from all mds for previous epoch. If
      // a scrub is not reported by any mds, we know there is no forward of
      // the scrub since time 'A'. So we can consider the scrub is finished.
      if (scrub_epoch_fully_acked + 1 == scrub_epoch)
	update_scrubbing = true;
      scrub_epoch_fully_acked = scrub_epoch;
    }
  }

  if (mds_scrub_stats.size() != (size_t)up_max + 1)
    mds_scrub_stats.resize((size_t)up_max + 1);
  // rank 0 (ourselves) implicitly acks the epoch we are about to start
  mds_scrub_stats.at(0).epoch_acked = scrub_epoch + 1;

  bool any_finished = false;
  bool any_repaired = false;

  for (auto it = scrubbing_map.begin(); it != scrubbing_map.end(); ) {
    auto& header = it->second;
    if (header->get_num_pending() ||
	header->get_epoch_last_forwarded() >= scrub_epoch) {
      // still active here; include in the broadcast tag set
      if (update_scrubbing && up_max != 0)
	scrubbing_tags.insert(it->first);
      ++it;
    } else if (update_scrubbing && !scrubbing_tags.count(it->first)) {
      // no longer being scrubbed globally
      any_finished = true;
      if (header->get_repaired())
	any_repaired = true;
      scrubbing_map.erase(it++);
    } else {
      ++it;
    }
  }

  ++scrub_epoch;

  for (auto& r : up_mds) {
    if (r == 0)
      continue;
    auto m = update_scrubbing ?
      make_message<MMDSScrubStats>(scrub_epoch, scrubbing_tags) :
      make_message<MMDSScrubStats>(scrub_epoch);
    mds->send_message_mds(m, r);
  }

  if (any_finished)
    clog_scrub_summary();
  if (any_repaired)
    // flush the log so repairs become durable promptly
    mdcache->mds->mdlog->trim_all();
}
1106
// React to a peer MDS failure. If rank 0 (the scrub coordinator) died,
// abort everything. Otherwise drop the failed rank from every pending
// remote-forward gather; objects whose gather becomes empty are moved
// back to the stack and scrubbing is kicked to retry them.
void ScrubStack::handle_mds_failure(mds_rank_t mds)
{
  if (mds == 0) {
    scrub_abort(nullptr);
    return;
  }

  bool kick = false;
  for (auto it = remote_scrubs.begin(); it != remote_scrubs.end(); ) {
    if (it->second.gather_set.erase(mds) &&
	it->second.gather_set.empty()) {
      CInode *in = it->first;
      // post-increment erase keeps the iterator valid
      remote_scrubs.erase(it++);
      remove_from_waiting(in, false);
      kick = true;
    } else {
      ++it;
    }
  }
  if (kick)
    kick_off_scrubs();
}