]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSRank.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mds / MDSRank.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <string_view>
16 #include <typeinfo>
17 #include "common/debug.h"
18 #include "common/errno.h"
19 #include "common/async/blocked_completion.h"
20
21 #include "messages/MClientRequestForward.h"
22 #include "messages/MMDSLoadTargets.h"
23 #include "messages/MMDSTableRequest.h"
24 #include "messages/MMDSMetrics.h"
25
26 #include "mgr/MgrClient.h"
27
28 #include "MDSDaemon.h"
29 #include "MDSMap.h"
30 #include "MetricAggregator.h"
31 #include "SnapClient.h"
32 #include "SnapServer.h"
33 #include "MDBalancer.h"
34 #include "Migrator.h"
35 #include "Locker.h"
36 #include "InoTable.h"
37 #include "mon/MonClient.h"
38 #include "common/HeartbeatMap.h"
39 #include "ScrubStack.h"
40
41
42 #include "MDSRank.h"
43
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_mds
46 #undef dout_prefix
47 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
48 using TOPNSPC::common::cmd_getval;
/**
 * Asynchronous "flush journal" operation, driven as a chain of callbacks:
 *
 *   send() -> flush_mdlog() -> handle_flush_mdlog() -> clear_mdlog()
 *          -> handle_clear_mdlog() -> trim_mdlog() -> expire_segments()
 *          -> handle_expire_segments() -> trim_segments()
 *          -> trim_expired_segments() -> write_journal_head()
 *          -> handle_write_head() -> complete()
 *
 * On any error the chain short-circuits: an error string is written to
 * *ss and complete(r) fires on_finish with the error code.
 */
class C_Flush_Journal : public MDSInternalContext {
public:
  C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
                  std::ostream *ss, Context *on_finish)
    : MDSInternalContext(mds),
      mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
      whoami(mds->whoami), incarnation(mds->incarnation) {
  }

  // Entry point. Caller must hold mds_lock.
  void send() {
    assert(ceph_mutex_is_locked(mds->mds_lock));

    dout(20) << __func__ << dendl;

    // A read-only MDS cannot write the journal; fail immediately.
    if (mdcache->is_readonly()) {
      dout(5) << __func__ << ": read-only FS" << dendl;
      complete(-CEPHFS_EROFS);
      return;
    }

    // Only an active rank may drive a journal flush; otherwise succeed
    // as a no-op rather than erroring.
    if (!mds->is_active()) {
      dout(5) << __func__ << ": MDS not active, no-op" << dendl;
      complete(0);
      return;
    }

    flush_mdlog();
  }

private:

  void flush_mdlog() {
    dout(20) << __func__ << dendl;

    // I need to seal off the current segment, and then mark all
    // previous segments for expiry
    mdlog->start_new_segment();

    Context *ctx = new LambdaContext([this](int r) {
      handle_flush_mdlog(r);
    });

    // Flush initially so that all the segments older than our new one
    // will be elegible for expiry
    mdlog->flush();
    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
  }

  void handle_flush_mdlog(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      complete(r);
      return;
    }

    clear_mdlog();
  }

  void clear_mdlog() {
    dout(20) << __func__ << dendl;

    Context *ctx = new LambdaContext([this](int r) {
      handle_clear_mdlog(r);
    });

    // Because we may not be the last wait_for_safe context on MDLog,
    // and subsequent contexts might wake up in the middle of our
    // later trim_all and interfere with expiry (by e.g. marking
    // dirs/dentries dirty on previous log segments), we run a second
    // wait_for_safe here. See #10368
    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
  }

  void handle_clear_mdlog(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      complete(r);
      return;
    }

    trim_mdlog();
  }

  void trim_mdlog() {
    // Put all the old log segments into expiring or expired state
    dout(5) << __func__ << ": beginning segment expiry" << dendl;

    int ret = mdlog->trim_all();
    if (ret != 0) {
      *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
      complete(ret);
      return;
    }

    expire_segments();
  }

  void expire_segments() {
    dout(20) << __func__ << dendl;

    // Attach contexts to wait for all expiring segments to expire
    MDSGatherBuilder expiry_gather(g_ceph_context);

    const auto &expiring_segments = mdlog->get_expiring_segments();
    for (auto p : expiring_segments) {
      p->wait_for_expiry(expiry_gather.new_sub());
    }
    dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
            << " segments to expire" << dendl;

    // Nothing to wait for: go straight to trimming.
    if (!expiry_gather.has_subs()) {
      trim_segments();
      return;
    }

    Context *ctx = new LambdaContext([this](int r) {
      handle_expire_segments(r);
    });
    expiry_gather.set_finisher(new MDSInternalContextWrapper(mds, ctx));
    expiry_gather.activate();
  }

  void handle_expire_segments(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    ceph_assert(r == 0); // MDLog is not allowed to raise errors via
                         // wait_for_expiry
    trim_segments();
  }

  void trim_segments() {
    dout(20) << __func__ << dendl;

    // Bounce through the finisher so we can (re)take mds_lock before
    // trimming; we may be called back from arbitrary context.
    Context *ctx = new C_OnFinisher(new LambdaContext([this](int) {
      std::lock_guard locker(mds->mds_lock);
      trim_expired_segments();
    }), mds->finisher);
    ctx->complete(0);
  }

  void trim_expired_segments() {
    dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
            << mdlog->get_journaler()->get_trimmed_pos() << dendl;

    // Now everyone I'm interested in is expired
    mdlog->trim_expired_segments();

    dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
            << mdlog->get_journaler()->get_trimmed_pos() << dendl;

    write_journal_head();
  }

  void write_journal_head() {
    dout(20) << __func__ << dendl;

    Context *ctx = new LambdaContext([this](int r) {
      std::lock_guard locker(mds->mds_lock);
      handle_write_head(r);
    });
    // Flush the journal header so that readers will start from after
    // the flushed region
    mdlog->get_journaler()->write_head(ctx);
  }

  void handle_write_head(int r) {
    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
    } else {
      dout(5) << __func__ << ": write_head complete, all done!" << dendl;
    }

    complete(r);
  }

  // Terminal step: propagate the result to the caller's context.
  void finish(int r) override {
    dout(20) << __func__ << ": r=" << r << dendl;
    on_finish->complete(r);
  }

  MDCache *mdcache;
  MDLog *mdlog;
  std::ostream *ss;       // error text sink, owned by the caller
  Context *on_finish;     // fired exactly once with the final result

  // so as to use dout
  mds_rank_t whoami;
  int incarnation;
};
244
/**
 * Asynchronous "drop cache" operation:
 *
 *   1. recall_client_state() -- repeatedly recall client caps (re-arming a
 *      1s timer while progress is being made, bounded by recall_timeout),
 *      incrementally flushing the log and trimming cache during the waits;
 *   2. flush_journal() -- via C_Flush_Journal;
 *   3. trim_cache() -- trim until the cache stops being throttled;
 *   4. cache_status() -- dump final cache status.
 *
 * Results are accumulated into the caller-supplied Formatter; on_finish
 * fires once with the final return code.
 */
class C_Drop_Cache : public MDSInternalContext {
public:
  C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
               MDSRank *mds, uint64_t recall_timeout,
               Formatter *f, Context *on_finish)
    : MDSInternalContext(mds),
      server(server), mdcache(mdcache), mdlog(mdlog),
      recall_timeout(recall_timeout), recall_start(mono_clock::now()),
      f(f), on_finish(on_finish),
      whoami(mds->whoami), incarnation(mds->incarnation) {
  }

  void send() {
    // not really a hard requirement here, but lets ensure this in
    // case we change the logic here.
    assert(ceph_mutex_is_locked(mds->mds_lock));

    dout(20) << __func__ << dendl;
    // Opened here; the matching close_section() happens in finish().
    f->open_object_section("result");
    recall_client_state();
  }

private:
  // context which completes itself (with -CEPHFS_ETIMEDOUT) after a specified
  // timeout or when explicitly completed, whichever comes first. Note
  // that the context does not detroy itself after completion -- it
  // needs to be explicitly freed.
  class C_ContextTimeout : public MDSInternalContext {
  public:
    C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
      : MDSInternalContext(mds),
        timeout(timeout),
        on_finish(on_finish) {
    }
    ~C_ContextTimeout() {
      ceph_assert(timer_task == nullptr);
    }

    // Arm the timeout; a zero timeout means "wait forever".
    void start_timer() {
      if (!timeout) {
        return;
      }

      timer_task = new LambdaContext([this](int) {
        timer_task = nullptr;
        complete(-CEPHFS_ETIMEDOUT);
      });
      mds->timer.add_event_after(timeout, timer_task);
    }

    // Swap out on_finish under the lock so the wrapped context fires at
    // most once, whether we time out or are completed explicitly.
    void finish(int r) override {
      Context *ctx = nullptr;
      {
        std::lock_guard locker(lock);
        std::swap(on_finish, ctx);
      }
      if (ctx != nullptr) {
        ctx->complete(r);
      }
    }
    void complete(int r) override {
      if (timer_task != nullptr) {
        mds->timer.cancel_event(timer_task);
      }

      finish(r);
    }

    uint64_t timeout;
    ceph::mutex lock = ceph::make_mutex("mds::context::timeout");
    Context *on_finish = nullptr;
    Context *timer_task = nullptr;
  };

  // Trim as much of the cache as the throttle allows; returns
  // (throttled, trimmed-count) and accumulates into dentries_trimmed.
  auto do_trim() {
    auto [throttled, count] = mdcache->trim(UINT64_MAX);
    dout(10) << __func__
             << (throttled ? " (throttled)" : "")
             << " trimmed " << count << " caps" << dendl;
    dentries_trimmed += count;
    return std::make_pair(throttled, count);
  }

  // One round of cap recall. Re-arms itself (1s timer) while progress is
  // being made and the timeout has not expired; otherwise hands off to
  // handle_recall_client_state().
  void recall_client_state() {
    dout(20) << __func__ << dendl;
    auto now = mono_clock::now();
    auto duration = std::chrono::duration<double>(now-recall_start).count();

    MDSGatherBuilder gather(g_ceph_context);
    auto flags = Server::RecallFlags::STEADY|Server::RecallFlags::TRIM;
    auto [throttled, count] = server->recall_client_state(&gather, flags);
    dout(10) << __func__
             << (throttled ? " (throttled)" : "")
             << " recalled " << count << " caps" << dendl;

    caps_recalled += count;
    if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
      // Progress was made (or we were throttled): try again in 1 second.
      C_ContextTimeout *ctx = new C_ContextTimeout(
        mds, 1, new LambdaContext([this](int r) {
          recall_client_state();
      }));
      ctx->start_timer();
      gather.set_finisher(new MDSInternalContextWrapper(mds, ctx));
      gather.activate();
      mdlog->flush(); /* use down-time to incrementally flush log */
      do_trim(); /* use down-time to incrementally trim cache */
    } else {
      if (!gather.has_subs()) {
        // Nothing outstanding: recall is done.
        return handle_recall_client_state(0);
      } else if (recall_timeout > 0 && duration > recall_timeout) {
        // Out of time: detach the gather and report the timeout; the
        // recall itself keeps going in the background.
        gather.set_finisher(new C_MDSInternalNoop);
        gather.activate();
        return handle_recall_client_state(-CEPHFS_ETIMEDOUT);
      } else {
        // Wait for outstanding recalls, bounded by the remaining time.
        uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
        C_ContextTimeout *ctx = new C_ContextTimeout(
          mds, remaining, new LambdaContext([this](int r) {
              handle_recall_client_state(r);
            }));

        ctx->start_timer();
        gather.set_finisher(new MDSInternalContextWrapper(mds, ctx));
        gather.activate();
      }
    }
  }

  void handle_recall_client_state(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    // client recall section
    f->open_object_section("client_recall");
    f->dump_int("return_code", r);
    f->dump_string("message", cpp_strerror(r));
    f->dump_int("recalled", caps_recalled);
    f->close_section();

    // we can still continue after recall timeout
    flush_journal();
  }

  void flush_journal() {
    dout(20) << __func__ << dendl;

    Context *ctx = new LambdaContext([this](int r) {
        handle_flush_journal(r);
      });

    // C_Flush_Journal deletes itself via complete(); errors land in ss.
    C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
    flush_journal->send();
  }

  void handle_flush_journal(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      cmd_err(f, ss.str());
      complete(r);
      return;
    }

    // journal flush section
    f->open_object_section("flush_journal");
    f->dump_int("return_code", r);
    f->dump_string("message", ss.str());
    f->close_section();

    trim_cache();
  }

  // Trim repeatedly (1s between passes) while the trim is throttled and
  // still making progress; then report status.
  void trim_cache() {
    dout(20) << __func__ << dendl;

    auto [throttled, count] = do_trim();
    if (throttled && count > 0) {
      auto timer = new LambdaContext([this](int) {
        trim_cache();
      });
      mds->timer.add_event_after(1.0, timer);
    } else {
      cache_status();
    }
  }

  void cache_status() {
    dout(20) << __func__ << dendl;

    f->open_object_section("trim_cache");
    f->dump_int("trimmed", dentries_trimmed);
    f->close_section();

    // cache status section
    mdcache->cache_status(f);

    complete(0);
  }

  void finish(int r) override {
    dout(20) << __func__ << ": r=" << r << dendl;

    auto d = std::chrono::duration<double>(mono_clock::now()-recall_start);
    f->dump_float("duration", d.count());

    // Closes the "result" section opened in send().
    f->close_section();
    on_finish->complete(r);
  }

  Server *server;
  MDCache *mdcache;
  MDLog *mdlog;
  uint64_t recall_timeout;  // seconds; 0 means no timeout
  mono_time recall_start;
  Formatter *f;
  Context *on_finish;

  // NOTE(review): retval is never read or written in this class --
  // candidate for removal.
  int retval = 0;
  std::stringstream ss;           // collects C_Flush_Journal error text
  uint64_t caps_recalled = 0;
  uint64_t dentries_trimmed = 0;

  // so as to use dout
  mds_rank_t whoami;
  int incarnation;

  // Reset the formatter and emit a single {"error": err} object.
  void cmd_err(Formatter *f, std::string_view err) {
    f->reset();
    f->open_object_section("result");
    f->dump_string("error", err);
    f->close_section();
  }
};
476
/**
 * Construct a rank: wire up the Objecter, purge queue, cache, log,
 * balancer, tables and server/locker subsystems. Heavyweight startup
 * (threads, journal replay) happens later in MDSRankDispatcher::init()
 * and the state-transition handlers, not here.
 */
MDSRank::MDSRank(
    mds_rank_t whoami_,
    std::string fs_name_,
    ceph::mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    std::unique_ptr<MDSMap>& mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    MgrClient *mgrc,
    Context *respawn_hook_,
    Context *suicide_hook_,
    boost::asio::io_context& ioc) :
    cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
    timer(timer_), mdsmap(mdsmap_),
    objecter(new Objecter(g_ceph_context, msgr, monc_, ioc)),
    damage_table(whoami_), sessionmap(this),
    op_tracker(g_ceph_context, g_conf()->mds_enable_op_tracker,
               g_conf()->osd_num_op_tracker_shard),
    progress_thread(this), whoami(whoami_), fs_name(fs_name_),
    // Purge queue write errors are funneled into handle_write_error()
    // under mds_lock.
    purge_queue(g_ceph_context, whoami_,
      mdsmap_->get_metadata_pool(), objecter,
      new LambdaContext([this](int r) {
	  std::lock_guard l(mds_lock);
	  handle_write_error(r);
	}
      )
    ),
    metrics_handler(cct, this),
    beacon(beacon_),
    messenger(msgr), monc(monc_), mgrc(mgrc),
    respawn_hook(respawn_hook_),
    suicide_hook(suicide_hook_),
    starttime(mono_clock::now()),
    ioc(ioc)
{
  hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());

  // The metadata pool won't change in the whole life time
  // of the fs, with this we can get rid of the mds_lock
  // in many places too.
  metadata_pool = mdsmap->get_metadata_pool();

  purge_queue.update_op_limit(*mdsmap);

  // The MDS must keep writing (e.g. journal, purge) even when pools
  // report full; the Objecter is told not to honor pool-full flags.
  objecter->unset_honor_pool_full();

  finisher = new Finisher(cct, "MDSRank", "MR_Finisher");

  mdcache = new MDCache(this, purge_queue);
  mdlog = new MDLog(this);
  balancer = new MDBalancer(this, messenger, monc);

  scrubstack = new ScrubStack(mdcache, clog, finisher);

  inotable = new InoTable(this);
  snapserver = new SnapServer(this, monc);
  snapclient = new SnapClient(this);

  server = new Server(this, &metrics_handler);
  locker = new Locker(this, mdcache);

  // Op-tracker thresholds come straight from config at construction;
  // later config changes are handled elsewhere.
  heartbeat_grace = g_conf().get_val<double>("mds_heartbeat_grace");
  op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                         cct->_conf->mds_op_log_threshold);
  op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
                                           cct->_conf->mds_op_history_duration);

  schedule_update_timer_task();
}
548
549 MDSRank::~MDSRank()
550 {
551 if (hb) {
552 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
553 }
554
555 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
556 if (mdcache) { delete mdcache; mdcache = NULL; }
557 if (mdlog) { delete mdlog; mdlog = NULL; }
558 if (balancer) { delete balancer; balancer = NULL; }
559 if (inotable) { delete inotable; inotable = NULL; }
560 if (snapserver) { delete snapserver; snapserver = NULL; }
561 if (snapclient) { delete snapclient; snapclient = NULL; }
562
563 if (server) { delete server; server = 0; }
564 if (locker) { delete locker; locker = 0; }
565
566 if (logger) {
567 g_ceph_context->get_perfcounters_collection()->remove(logger);
568 delete logger;
569 logger = 0;
570 }
571 if (mlogger) {
572 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
573 delete mlogger;
574 mlogger = 0;
575 }
576
577 delete finisher;
578 finisher = NULL;
579
580 delete suicide_hook;
581 suicide_hook = NULL;
582
583 delete respawn_hook;
584 respawn_hook = NULL;
585
586 delete objecter;
587 objecter = nullptr;
588 }
589
/**
 * Bring up the rank's support machinery: Objecter (registered as a
 * dispatcher), logging, OSDMap exposure, the progress thread, the purge
 * queue and the finisher. Called once during daemon startup.
 */
void MDSRankDispatcher::init()
{
  objecter->init();
  messenger->add_dispatcher_head(objecter);

  objecter->start();

  update_log_config();
  create_logger();

  // Expose the OSDMap (already populated during MDS::init) to anyone
  // who is interested in it.
  handle_osd_map();

  progress_thread.create("mds_rank_progr");

  purge_queue.init();

  finisher->start();
}
610
611 void MDSRank::update_targets()
612 {
613 // get MonMap's idea of my export_targets
614 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
615
616 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
617
618 bool send = false;
619 set<mds_rank_t> new_map_targets;
620
621 auto it = export_targets.begin();
622 while (it != export_targets.end()) {
623 mds_rank_t rank = it->first;
624 auto &counter = it->second;
625 dout(20) << "export target mds." << rank << " is " << counter << dendl;
626
627 double val = counter.get();
628 if (val <= 0.01) {
629 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
630 export_targets.erase(it++);
631 send = true;
632 continue;
633 }
634 if (!map_targets.count(rank)) {
635 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
636 send = true;
637 }
638 new_map_targets.insert(rank);
639 it++;
640 }
641 if (new_map_targets.size() < map_targets.size()) {
642 dout(15) << "export target map holds stale targets, sending update" << dendl;
643 send = true;
644 }
645
646 if (send) {
647 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
648 auto m = make_message<MMDSLoadTargets>(mds_gid_t(monc->get_global_id()), new_map_targets);
649 monc->send_mon_message(m.detach());
650 }
651 }
652
653 void MDSRank::hit_export_target(mds_rank_t rank, double amount)
654 {
655 double rate = g_conf()->mds_bal_target_decay;
656 if (amount < 0.0) {
657 amount = 100.0/g_conf()->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
658 }
659 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(DecayRate(rate)));
660 auto &counter = em.first->second;
661 counter.hit(amount);
662 if (em.second) {
663 dout(15) << "hit export target (new) is " << counter << dendl;
664 } else {
665 dout(15) << "hit export target is " << counter << dendl;
666 }
667 }
668
/**
 * Completion context for an asynchronous mon command: remembers the
 * command string and, when the command finishes, forwards the result
 * code and captured error output to MDSRank::_mon_command_finish().
 */
class C_MDS_MonCommand : public MDSInternalContext {
  std::string cmd;
public:
  // Filled in by MonClient with the command's error/output text.
  std::string outs;
  C_MDS_MonCommand(MDSRank *m, std::string_view c)
    : MDSInternalContext(m), cmd(c) {}
  void finish(int r) override {
    mds->_mon_command_finish(r, cmd, outs);
  }
};
679
680 void MDSRank::_mon_command_finish(int r, std::string_view cmd, std::string_view outs)
681 {
682 if (r < 0) {
683 dout(0) << __func__ << ": mon command " << cmd << " failed with errno " << r
684 << " (" << outs << ")" << dendl;
685 } else {
686 dout(1) << __func__ << ": mon command " << cmd << " succeed" << dendl;
687 }
688 }
689
/**
 * Ask the monitors to set allow_multimds_snaps on our filesystem.
 * Sent at most once per process lifetime (guarded by a function-local
 * static), since repeated ticks would otherwise re-send the same
 * command every cycle.
 */
void MDSRank::set_mdsmap_multimds_snaps_allowed()
{
  static bool already_sent = false;
  if (already_sent)
    return;

  // Build the JSON mon-command payload by hand; the --yes-i-am-really-a-mds
  // confirmation marks this as an MDS-originated change.
  CachedStackStringStream css;
  *css << "{\"prefix\":\"fs set\", \"fs_name\":\"" << mdsmap->get_fs_name() << "\", ";
  *css << "\"var\":\"allow_multimds_snaps\", \"val\":\"true\", ";
  *css << "\"confirm\":\"--yes-i-am-really-a-mds\"}";
  std::vector<std::string> cmd = {css->str()};

  dout(0) << __func__ << ": sending mon command: " << cmd[0] << dendl;

  // fin logs the result in _mon_command_finish(); C_IO_Wrapper re-takes
  // mds_lock before completing it.
  C_MDS_MonCommand *fin = new C_MDS_MonCommand(this, cmd[0]);
  monc->start_mon_command(cmd, {}, nullptr, &fin->outs, new C_IO_Wrapper(this, fin));

  already_sent = true;
}
709
/**
 * Periodic upkeep, driven by the rank's tick timer. Work is gated on the
 * rank's current state (active / stopping / clientreplay / reconnect):
 * log flush+trim, idle-session cleanup, balancer, snap table checks,
 * export-target updates, and the stopping-state shutdown progression.
 * Skipped entirely (after a heartbeat reset) while the mon connection
 * looks laggy.
 */
void MDSRankDispatcher::tick()
{
  heartbeat_reset();

  if (beacon.is_laggy()) {
    dout(1) << "skipping upkeep work because connection to Monitors appears laggy" << dendl;
    return;
  }

  check_ops_in_flight();

  // Wake up thread in case we used to be laggy and have waiting_for_nolaggy
  // messages to progress.
  progress_thread.signal();

  // make sure mds log flushes, trims periodically
  mdlog->flush();

  // update average session uptime
  sessionmap.update_average_session_age();

  if (is_active() || is_stopping()) {
    mdlog->trim();  // NOT during recovery!
  }

  // Session/lock upkeep only once clients can be talking to us.
  if (is_clientreplay() || is_active() || is_stopping()) {
    server->find_idle_sessions();
    server->evict_cap_revoke_non_responders();
    locker->tick();
  }

  // log
  if (logger) {
    logger->set(l_mds_subtrees, mdcache->num_subtrees());
    mdcache->log_stat();
  }

  if (is_reconnect())
    server->reconnect_tick();

  if (is_active()) {
    balancer->tick();
    mdcache->find_stale_fragment_freeze();
    mdcache->migrator->find_stale_export_freeze();

    // Snap-table duties belong to the table server rank only.
    if (mdsmap->get_tableserver() == whoami) {
      snapserver->check_osd_map(false);
      // Filesystem was created by pre-mimic mds. Allow multi-active mds after
      // all old snapshots are deleted.
      if (!mdsmap->allows_multimds_snaps() &&
	  snapserver->can_allow_multimds_snaps()) {
	set_mdsmap_multimds_snaps_allowed();
      }
    }

    // Scrub status is advanced by rank 0 only.
    if (whoami == 0)
      scrubstack->advance_scrub_status();
  }

  if (is_active() || is_stopping()) {
    update_targets();
  }

  // shut down?
  if (is_stopping()) {
    mdlog->trim();
    if (mdcache->shutdown_pass()) {
      uint64_t pq_progress = 0;
      uint64_t pq_total = 0;
      size_t pq_in_flight = 0;
      if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
	dout(7) << "shutdown_pass=true, but still waiting for purge queue"
		<< dendl;
	// This takes unbounded time, so we must indicate progress
	// to the administrator: we do it in a slightly imperfect way
	// by sending periodic (tick frequency) clog messages while
	// in this state.
	clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
	  << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
	  << " files purging" << ")";
      } else {
	dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
		   "down:stopped" << dendl;
	stopping_done();
      }
    }
    else {
      dout(7) << "shutdown_pass=false" << dendl;
    }
  }

  // Expose ourselves to Beacon to update health indicators
  beacon.notify_health(this);
}
805
/**
 * Orderly shutdown of the rank. Called exactly once (asserted via the
 * 'stopping' flag). The ordering below is load-bearing: MDLog before the
 * finisher (its threads block on IOs the finisher must complete), and
 * mds_lock is dropped around finisher->stop() and messenger->shutdown()
 * because their worker threads may need to take it to drain.
 */
void MDSRankDispatcher::shutdown()
{
  // It should never be possible for shutdown to get called twice, because
  // anyone picking up mds_lock checks if stopping is true and drops
  // out if it is.
  ceph_assert(stopping == false);
  stopping = true;

  dout(1) << __func__ << ": shutting down rank " << whoami << dendl;

  g_conf().remove_observer(this);

  timer.shutdown();

  // MDLog has to shut down before the finisher, because some of its
  // threads block on IOs that require finisher to complete.
  mdlog->shutdown();

  // shut down cache
  mdcache->shutdown();

  purge_queue.shutdown();

  // shutdown metrics handler/updater -- this is ok even if it was not
  // inited.
  metrics_handler.shutdown();

  // shutdown metric aggergator
  if (metric_aggregator != nullptr) {
    metric_aggregator->shutdown();
  }

  mds_lock.unlock();
  finisher->stop(); // no flushing
  mds_lock.lock();

  if (objecter->initialized)
    objecter->shutdown();

  monc->shutdown();

  op_tracker.on_shutdown();

  progress_thread.shutdown();

  // release mds_lock for finisher/messenger threads (e.g.
  // MDSDaemon::ms_handle_reset called from Messenger).
  mds_lock.unlock();

  // shut down messenger
  messenger->shutdown();

  mds_lock.lock();

  // Workaround unclean shutdown: HeartbeatMap will assert if
  // worker is not removed (as we do in ~MDS), but ~MDS is not
  // always called after suicide.
  if (hb) {
    g_ceph_context->get_heartbeat_map()->remove_worker(hb);
    hb = NULL;
  }
}
868
/**
 * Helper for simple callbacks that call a void MDSRank member function
 * with no arguments; the completion's return code is ignored.
 */
class C_MDS_VoidFn : public MDSInternalContext
{
  typedef void (MDSRank::*fn_ptr)();
  protected:
   fn_ptr fn;
  public:
  C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
    : MDSInternalContext(mds_), fn(fn_)
  {
    ceph_assert(mds_);
    ceph_assert(fn_);
  }

  void finish(int r) override
  {
    // r is intentionally discarded; the member function takes no result.
    (mds->*fn)();
  }
};
890
891 MDSTableClient *MDSRank::get_table_client(int t)
892 {
893 switch (t) {
894 case TABLE_ANCHOR: return NULL;
895 case TABLE_SNAP: return snapclient;
896 default: ceph_abort();
897 }
898 }
899
900 MDSTableServer *MDSRank::get_table_server(int t)
901 {
902 switch (t) {
903 case TABLE_ANCHOR: return NULL;
904 case TABLE_SNAP: return snapserver;
905 default: ceph_abort();
906 }
907 }
908
909 void MDSRank::suicide()
910 {
911 if (suicide_hook) {
912 suicide_hook->complete(0);
913 suicide_hook = NULL;
914 }
915 }
916
917 void MDSRank::respawn()
918 {
919 if (respawn_hook) {
920 respawn_hook->complete(0);
921 respawn_hook = NULL;
922 }
923 }
924
/**
 * Report this rank as DAMAGED to the monitors and respawn into standby.
 * Flushes any pending clog messages and the latest health status first,
 * then sends the beacon synchronously (bounded by
 * mds_mon_shutdown_timeout) so the mons have the best chance of seeing
 * the DAMAGED state before we go away. Caller must hold mds_lock.
 */
void MDSRank::damaged()
{
  ceph_assert(whoami != MDS_RANK_NONE);
  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));

  beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED);
  monc->flush_log();  // Flush any clog error from before we were called
  beacon.notify_health(this);  // Include latest status in our swan song
  beacon.send_and_wait(g_conf()->mds_mon_shutdown_timeout);

  // It's okay if we timed out and the mon didn't get our beacon, because
  // another daemon (or ourselves after respawn) will eventually take the
  // rank and report DAMAGED again when it hits same problem we did.

  respawn();  // Respawn into standby in case mon has other work for us
}
941
942 void MDSRank::damaged_unlocked()
943 {
944 std::lock_guard l(mds_lock);
945 damaged();
946 }
947
948 void MDSRank::handle_write_error(int err)
949 {
950 if (err == -CEPHFS_EBLOCKLISTED) {
951 derr << "we have been blocklisted (fenced), respawning..." << dendl;
952 respawn();
953 return;
954 }
955
956 if (g_conf()->mds_action_on_write_error >= 2) {
957 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
958 respawn();
959 } else if (g_conf()->mds_action_on_write_error == 1) {
960 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
961 mdcache->force_readonly();
962 } else {
963 // ignore;
964 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
965 }
966 }
967
968 void MDSRank::handle_write_error_with_lock(int err)
969 {
970 std::scoped_lock l(mds_lock);
971 handle_write_error(err);
972 }
973
/**
 * Progress thread main loop: sleeps on the condvar (holding mds_lock)
 * until there are finished contexts to run, or deferred messages to
 * replay once we are no longer laggy, or the rank is stopping; then
 * advances the queues. Exits when mds->stopping is set.
 */
void *MDSRank::ProgressThread::entry()
{
  std::unique_lock l(mds->mds_lock);
  while (true) {
    // Predicate re-checked on every wakeup; mds_lock is held while it runs.
    cond.wait(l, [this] {
      return (mds->stopping ||
	      !mds->finished_queue.empty() ||
	      (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy()));
    });

    if (mds->stopping) {
      break;
    }

    mds->_advance_queues();
  }

  return NULL;
}
993
994
/**
 * Stop the progress thread. Requires mds_lock held and mds->stopping
 * already set. If called from the progress thread itself, the main loop
 * exits naturally; otherwise the thread is woken and joined (dropping
 * mds_lock across the join so the thread can finish its current pass).
 */
void MDSRank::ProgressThread::shutdown()
{
  ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
  ceph_assert(mds->stopping);

  if (am_self()) {
    // Stopping is set, we will fall out of our main loop naturally
  } else {
    // Kick the thread to notice mds->stopping, and join it
    cond.notify_all();
    mds->mds_lock.unlock();
    if (is_started())
      join();
    mds->mds_lock.lock();
  }
}
1011
/**
 * Top-level message entry point for the rank. Validates MDS-originated
 * messages, stamps client sessions with a last-seen time, and forwards
 * to _dispatch() with nesting depth tracked so queue housekeeping only
 * runs at the outermost level.
 */
bool MDSRankDispatcher::ms_dispatch(const cref_t<Message> &m)
{
  if (m->get_source().is_mds()) {
    // Every inter-MDS message must derive from MMDSOp; a failed downcast
    // means a peer sent a type we do not model, which is fatal.
    const Message *msg = m.get();
    const MMDSOp *op = dynamic_cast<const MMDSOp*>(msg);
    if (!op)
      dout(0) << typeid(*msg).name() << " is not an MMDSOp type" << dendl;
    ceph_assert(op);
  }
  else if (m->get_source().is_client()) {
    // NOTE(review): get_priv() returns a ref-counted handle and the raw
    // Session* is used after that temporary dies -- presumably the
    // connection keeps the session alive; confirm before refactoring.
    Session *session = static_cast<Session*>(m->get_connection()->get_priv().get());
    if (session)
      session->last_seen = Session::clock::now();
  }

  inc_dispatch_depth();
  bool ret = _dispatch(m, true);
  dec_dispatch_depth();
  return ret;
}
1032
/**
 * Core dispatch. Drops stale messages, rejects unknown types (returns
 * false so the caller can pass them on), defers messages while laggy
 * (or while a deferral backlog exists, to preserve ordering), and
 * otherwise handles the message. At the outermost dispatch depth it
 * also advances the context queues and runs the optional export/
 * fragment "thrashing" test hooks controlled by mds_thrash_exports and
 * mds_thrash_fragments.
 */
bool MDSRank::_dispatch(const cref_t<Message> &m, bool new_msg)
{
  if (is_stale_message(m)) {
    return true;
  }
  // do not proceed if this message cannot be handled
  if (!is_valid_message(m)) {
    return false;
  }

  if (beacon.is_laggy()) {
    dout(5) << " laggy, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else if (new_msg && !waiting_for_nolaggy.empty()) {
    // Keep ordering: new messages queue behind previously deferred ones.
    dout(5) << " there are deferred messages, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else {
    handle_message(m);
    heartbeat_reset();
  }

  // Nested dispatch: let the outermost frame do the housekeeping below.
  if (dispatch_depth > 1)
    return true;

  // finish any triggered contexts
  _advance_queues();

  if (beacon.is_laggy()) {
    // We've gone laggy during dispatch, don't do any
    // more housekeeping
    return true;
  }

  // hack: thrash exports (test-only behavior, off unless
  // mds_thrash_exports is set)
  static utime_t start;
  utime_t now = ceph_clock_now();
  if (start == utime_t())
    start = now;
  /*double el = now - start;
  if (el > 30.0 &&
    el < 60.0)*/
  for (int i=0; i<g_conf()->mds_thrash_exports; i++) {
    set<mds_rank_t> s;
    if (!is_active()) break;
    mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
    if (s.size() < 2 || CInode::count() < 10)
      break;  // need peers for this to work.
    if (mdcache->migrator->get_num_exporting() > g_conf()->mds_thrash_exports * 5 ||
	mdcache->migrator->get_export_queue_size() > g_conf()->mds_thrash_exports * 10)
      break;

    dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf()->mds_thrash_exports << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    auto&& ls = in->get_dirfrags();
    if (!ls.empty()) {	// must be an open dir.
      const auto& dir = ls[rand() % ls.size()];
      if (!dir->get_parent_dir()) continue;    // must be linked.
      if (!dir->is_auth()) continue;           // must be auth.

      // pick a random other active rank as the destination
      mds_rank_t dest;
      do {
        int k = rand() % s.size();
        set<mds_rank_t>::iterator p = s.begin();
        while (k--) ++p;
        dest = *p;
      } while (dest == whoami);
      mdcache->migrator->export_dir_nicely(dir,dest);
    }
  }
  // hack: thrash fragments (test-only behavior, off unless
  // mds_thrash_fragments is set)
  for (int i=0; i<g_conf()->mds_thrash_fragments; i++) {
    if (!is_active()) break;
    if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf()->mds_thrash_fragments) break;
    dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf()->mds_thrash_fragments << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    auto&& ls = in->get_dirfrags();
    if (ls.empty()) continue;                // must be an open dir.
    CDir *dir = ls.front();
    if (!dir->get_parent_dir()) continue;    // must be linked.
    if (!dir->is_auth()) continue;           // must be auth.
    frag_t fg = dir->get_frag();
    if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
      mdcache->split_dir(dir, 1);
    } else {
      balancer->queue_merge(dir);
    }
  }

  // hack: force hash root?
  /*
  if (false &&
      mdcache->get_root() &&
      mdcache->get_root()->dir &&
      !(mdcache->get_root()->dir->is_hashed() ||
        mdcache->get_root()->dir->is_hashing())) {
    dout(0) << "hashing root" << dendl;
    mdcache->migrator->hash_dir(mdcache->get_root()->dir);
  }
  */

  update_mlogger();
  return true;
}
1142
1143 void MDSRank::update_mlogger()
1144 {
1145 if (mlogger) {
1146 mlogger->set(l_mdm_ino, CInode::count());
1147 mlogger->set(l_mdm_dir, CDir::count());
1148 mlogger->set(l_mdm_dn, CDentry::count());
1149 mlogger->set(l_mdm_cap, Capability::count());
1150 mlogger->set(l_mdm_inoa, CInode::increments());
1151 mlogger->set(l_mdm_inos, CInode::decrements());
1152 mlogger->set(l_mdm_dira, CDir::increments());
1153 mlogger->set(l_mdm_dirs, CDir::decrements());
1154 mlogger->set(l_mdm_dna, CDentry::increments());
1155 mlogger->set(l_mdm_dns, CDentry::decrements());
1156 mlogger->set(l_mdm_capa, Capability::increments());
1157 mlogger->set(l_mdm_caps, Capability::decrements());
1158 }
1159 }
1160
1161 // message types that the mds can handle
1162 bool MDSRank::is_valid_message(const cref_t<Message> &m) {
1163 int port = m->get_type() & 0xff00;
1164 int type = m->get_type();
1165
1166 if (port == MDS_PORT_CACHE ||
1167 port == MDS_PORT_MIGRATOR ||
1168 type == CEPH_MSG_CLIENT_SESSION ||
1169 type == CEPH_MSG_CLIENT_RECONNECT ||
1170 type == CEPH_MSG_CLIENT_RECLAIM ||
1171 type == CEPH_MSG_CLIENT_REQUEST ||
1172 type == MSG_MDS_PEER_REQUEST ||
1173 type == MSG_MDS_HEARTBEAT ||
1174 type == MSG_MDS_TABLE_REQUEST ||
1175 type == MSG_MDS_LOCK ||
1176 type == MSG_MDS_INODEFILECAPS ||
1177 type == MSG_MDS_SCRUB ||
1178 type == MSG_MDS_SCRUB_STATS ||
1179 type == CEPH_MSG_CLIENT_CAPS ||
1180 type == CEPH_MSG_CLIENT_CAPRELEASE ||
1181 type == CEPH_MSG_CLIENT_LEASE) {
1182 return true;
1183 }
1184
1185 return false;
1186 }
1187
/*
 * lower priority messages we defer if we seem laggy
 */

// Filter helper used inside handle_message(): if the message's peer
// entity type is not in the `peers` bitmask, log and bail out.
// NOTE: expands to a bare `return;`, so it is only usable inside a
// void function with a local `m` in scope.
#define ALLOW_MESSAGES_FROM(peers) \
  do { \
    if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
      dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" << m->get_connection()->get_peer_type() \
	      << " allowing=" << #peers << " message=" << *m << dendl; \
      return; \
    } \
  } while (0)
1200
/**
 * Route an incoming message to its owning subsystem (cache, migrator,
 * server, balancer, table client/server, locker, scrubstack), keyed on
 * the message's port (high byte of the type) and then on its exact
 * type.  Messages from an unexpected peer entity type are dropped by
 * ALLOW_MESSAGES_FROM, which returns early from this function.
 */
void MDSRank::handle_message(const cref_t<Message> &m)
{
  int port = m->get_type() & 0xff00;

  switch (port) {
  case MDS_PORT_CACHE:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->dispatch(m);
    break;

  case MDS_PORT_MIGRATOR:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->migrator->dispatch(m);
    break;

  default:
    switch (m->get_type()) {
      // SERVER
    case CEPH_MSG_CLIENT_SESSION:
    case CEPH_MSG_CLIENT_RECONNECT:
    case CEPH_MSG_CLIENT_RECLAIM:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      // fall-thru
    case CEPH_MSG_CLIENT_REQUEST:
      server->dispatch(m);
      break;
    case MSG_MDS_PEER_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      server->dispatch(m);
      break;

    case MSG_MDS_HEARTBEAT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      balancer->proc_message(m);
      break;

    case MSG_MDS_TABLE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      {
	// ops < 0 go to the table client, others to the table server
	const cref_t<MMDSTableRequest> &req = ref_cast<MMDSTableRequest>(m);
	if (req->op < 0) {
	  MDSTableClient *client = get_table_client(req->table);
	  client->handle_request(req);
	} else {
	  MDSTableServer *server = get_table_server(req->table);
	  server->handle_request(req);
	}
      }
      break;

    case MSG_MDS_LOCK:
    case MSG_MDS_INODEFILECAPS:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      locker->dispatch(m);
      break;

    case CEPH_MSG_CLIENT_CAPS:
    case CEPH_MSG_CLIENT_CAPRELEASE:
    case CEPH_MSG_CLIENT_LEASE:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      locker->dispatch(m);
      break;

    case MSG_MDS_SCRUB:
    case MSG_MDS_SCRUB_STATS:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      scrubstack->dispatch(m);
      break;

    default:
      derr << "unrecognized message " << *m << dendl;
    }
  }
}
1275
/**
 * Advance finished_queue and waiting_for_nolaggy.
 *
 * Usually drain both queues, but may not drain waiting_for_nolaggy
 * if beacon is currently laggy.
 */
void MDSRank::_advance_queues()
{
  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));

  if (!finished_queue.empty()) {
    dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
    while (!finished_queue.empty()) {
      auto fin = finished_queue.front();
      finished_queue.pop_front();

      dout(10) << " finish " << fin << dendl;
      fin->complete(0);

      // completing a context may take a while; keep the heartbeat fresh
      heartbeat_reset();
    }
  }

  while (!waiting_for_nolaggy.empty()) {
    // stop if we're laggy now!
    if (beacon.is_laggy())
      break;

    cref_t<Message> old = waiting_for_nolaggy.front();
    waiting_for_nolaggy.pop_front();

    // drop messages from daemons that are no longer the valid peer
    if (!is_stale_message(old)) {
      dout(7) << " processing laggy deferred " << *old << dendl;
      ceph_assert(is_valid_message(old));
      handle_message(old);
    }

    heartbeat_reset();
  }
}
1316
/**
 * Call this when you take mds_lock, or periodically if you're going to
 * hold the lock for a long time (e.g. iterating over clients/inodes)
 */
void MDSRank::heartbeat_reset()
{
  // Any thread might jump into mds_lock and call us immediately
  // after a call to suicide() completes, in which case MDSRank::hb
  // has been freed and we are a no-op.
  if (!hb) {
      ceph_assert(stopping);
      return;
  }

  // NB not enabling suicide grace, because the mon takes care of killing us
  // (by blocklisting us) when we fail to send beacons, and it's simpler to
  // only have one way of dying.
  g_ceph_context->get_heartbeat_map()->reset_timeout(hb,
    ceph::make_timespan(heartbeat_grace),
    ceph::timespan::zero());
}
1338
/**
 * Decide whether a message came from an MDS instance that is no longer
 * valid (down, or an old incarnation talking over a stale Connection).
 *
 * @return true if the message should be dropped.  MDSMap messages and
 *         cache-expire messages from a matching address are let
 *         through (with a log note) even when the sender looks bad.
 */
bool MDSRank::is_stale_message(const cref_t<Message> &m) const
{
  // from bad mds?
  if (m->get_source().is_mds()) {
    mds_rank_t from = mds_rank_t(m->get_source().num());
    bool bad = false;
    if (mdsmap->is_down(from)) {
      bad = true;
    } else {
      // FIXME: this is a convoluted check.  we should be maintaining a nice
      // clean map of current ConnectionRefs for current mdses!!!
      auto c = messenger->connect_to(CEPH_ENTITY_TYPE_MDS,
				     mdsmap->get_addrs(from));
      if (c != m->get_connection()) {
	bad = true;
	dout(5) << " mds." << from << " should be " << c << " "
		<< c->get_peer_addrs() << " but this message is "
		<< m->get_connection() << " " << m->get_source_addrs()
		<< dendl;
      }
    }
    if (bad) {
      // bogus mds?
      if (m->get_type() == CEPH_MSG_MDS_MAP) {
	dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
		<< ", but it's an mdsmap, looking at it" << dendl;
      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
		 mdsmap->get_addrs(from) == m->get_source_addrs()) {
	dout(5) << "got " << *m << " from down mds " << m->get_source()
		<< ", but it's a cache_expire, looking at it" << dendl;
      } else {
	dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
		<< ", dropping" << dendl;
	return true;
      }
    }
  }
  return false;
}
1378
/**
 * Look up the Session attached to a message's Connection (borrowed
 * pointer, no ref taken).  If that connection-bootstrap session is
 * still closed but the SessionMap holds an imported open session for
 * the same entity, splice the connection over to the imported session,
 * flush any messages queued before the connection existed, and return
 * the imported session instead.
 *
 * @return the session, or nullptr if the connection has none.
 */
Session *MDSRank::get_session(const cref_t<Message> &m)
{
  // do not carry ref
  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
  if (session) {
    dout(20) << "get_session have " << session << " " << session->info.inst
	     << " state " << session->get_state_name() << dendl;
    // Check if we've imported an open session since (new sessions start closed)
    if (session->is_closed()) {
      Session *imported_session = sessionmap.get_session(session->info.inst.name);
      if (imported_session && imported_session != session) {
        dout(10) << __func__ << " replacing connection bootstrap session "
		 << session << " with imported session " << imported_session
		 << dendl;
        imported_session->info.auth_name = session->info.auth_name;
        //assert(session->info.auth_name == imported_session->info.auth_name);
        ceph_assert(session->info.inst == imported_session->info.inst);
        imported_session->set_connection(session->get_connection().get());
        // send out any queued messages
        while (!session->preopen_out_queue.empty()) {
          imported_session->get_connection()->send_message2(std::move(session->preopen_out_queue.front()));
          session->preopen_out_queue.pop_front();
        }
        imported_session->auth_caps = session->auth_caps;
        imported_session->last_seen = session->last_seen;
        // the bootstrap session's last ref is the connection's priv;
        // replace it with a ref to the imported session
        ceph_assert(session->get_nref() == 1);
        imported_session->get_connection()->set_priv(imported_session->get());
        session = imported_session;
      }
    }
  } else {
    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
  }
  return session;
}
1414
// Send a message over an explicit Connection; the connection must be
// non-null.
void MDSRank::send_message(const ref_t<Message>& m, const ConnectionRef& c)
{
  ceph_assert(c);
  c->send_message2(m);
}
1420
1421
/**
 * Send a message to another MDS rank, dropping it if that rank is not
 * up.  If we believe the peer has an older mdsmap epoch than ours,
 * send it our mdsmap first and record the epoch we pushed.
 */
void MDSRank::send_message_mds(const ref_t<Message>& m, mds_rank_t mds)
{
  if (!mdsmap->is_up(mds)) {
    dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
    return;
  }

  // send mdsmap first?
  auto addrs = mdsmap->get_addrs(mds);
  if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    auto _m = make_message<MMDSMap>(monc->get_fsid(), *mdsmap,
				    std::string(mdsmap->get_fs_name()));
    send_message_mds(_m, addrs);
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
  }

  // send message
  send_message_mds(m, addrs);
}
1441
// Address-targeted variant: hand a detached (raw, owning) Message
// pointer to the messenger, which takes over the reference.
void MDSRank::send_message_mds(const ref_t<Message>& m, const entity_addrvec_t &addr)
{
  messenger->send_to_mds(ref_t<Message>(m).detach(), addr);
}
1446
/**
 * Tell the client that issued this request to resend it to another
 * rank: we do not forward the request ourselves, we send the client an
 * MClientRequestForward naming the target rank.
 */
void MDSRank::forward_message_mds(const cref_t<MClientRequest>& m, mds_rank_t mds)
{
  ceph_assert(mds != whoami);

  /*
   * don't actually forward if non-idempotent!
   * client has to do it.  although the MDS will ignore duplicate requests,
   * the affected metadata may migrate, in which case the new authority
   * won't have the metareq_id in the completed request map.
   */
  // NEW: always make the client resend!
  bool client_must_resend = true;  //!creq->can_forward();

  // tell the client where it should go
  // NOTE(review): assumes get_session() returns non-null for a live
  // client request -- confirm callers guarantee a session exists.
  auto session = get_session(m);
  auto f = make_message<MClientRequestForward>(m->get_tid(), mds, m->get_num_fwd()+1, client_must_resend);
  send_message_client(f, session);
}
1465
1466 void MDSRank::send_message_client_counted(const ref_t<Message>& m, client_t client)
1467 {
1468 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
1469 if (session) {
1470 send_message_client_counted(m, session);
1471 } else {
1472 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
1473 }
1474 }
1475
/**
 * Sequence-counted send keyed by Connection: borrow the Session from
 * the connection's priv and delegate to the Session* overload.  Drops
 * the message if the connection no longer carries a session.
 */
void MDSRank::send_message_client_counted(const ref_t<Message>& m, const ConnectionRef& connection)
{
  // do not carry ref
  auto session = static_cast<Session *>(connection->get_priv().get());
  if (session) {
    send_message_client_counted(m, session);
  } else {
    dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
    // another Connection took over the Session
  }
}
1487
/**
 * Send a message to a client session, bumping the session's push
 * sequence number.  If the session has no live connection yet, the
 * message is queued on preopen_out_queue and flushed when one appears
 * (see get_session()).
 */
void MDSRank::send_message_client_counted(const ref_t<Message>& m, Session* session)
{
  version_t seq = session->inc_push_seq();
  dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
	   << seq << " " << *m << dendl;
  if (session->get_connection()) {
    session->get_connection()->send_message2(m);
  } else {
    session->preopen_out_queue.push_back(m);
  }
}
1499
/**
 * Send a message to a client session WITHOUT bumping the push
 * sequence number (contrast with send_message_client_counted()).
 * Queues on preopen_out_queue if the session has no connection yet.
 */
void MDSRank::send_message_client(const ref_t<Message>& m, Session* session)
{
  dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
  if (session->get_connection()) {
    session->get_connection()->send_message2(m);
  } else {
    session->preopen_out_queue.push_back(m);
  }
}
1509
/**
 * This is used whenever a RADOS operation has been cancelled
 * or a RADOS client has been blocklisted, to cause the MDS and
 * any clients to wait for this OSD epoch before using any new caps.
 *
 * See doc/cephfs/eviction
 */
void MDSRank::set_osd_epoch_barrier(epoch_t e)
{
  dout(4) << __func__ << ": epoch=" << e << dendl;
  osd_epoch_barrier = e;
}
1522
// Re-dispatch a deferred message; `false` marks it as not newly
// arrived.  Dispatch depth is tracked around the nested call.
void MDSRank::retry_dispatch(const cref_t<Message> &m)
{
  inc_dispatch_depth();
  _dispatch(m, false);
  dec_dispatch_depth();
}
1529
// Age (seconds) of the oldest message in the messenger dispatch queue.
double MDSRank::get_dispatch_queue_max_age(utime_t now) const
{
  return messenger->get_dispatch_queue_max_age(now);
}
1534
// True once suicide/shutdown has begun for this daemon.
bool MDSRank::is_daemon_stopping() const
{
  return stopping;
}
1539
// Ask the monitors (via the beacon) to move this rank to state `s`.
void MDSRank::request_state(MDSMap::DaemonState s)
{
  dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
  beacon.set_want_state(*mdsmap, s);
  beacon.send();
}
1546
1547
// Completion context that advances the boot sequence to the next
// BootStep, forwarding the result code of the step that just finished.
class C_MDS_BootStart : public MDSInternalContext {
  MDSRank::BootStep nextstep;
public:
  C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
    : MDSInternalContext(m), nextstep(n) {}
  void finish(int r) override {
    mds->boot_start(nextstep, r);
  }
};
1557
1558
/**
 * Drive the boot state machine for a starting or replaying rank:
 * INITIAL (load tables/sessionmap/log) -> OPEN_ROOT (base inodes) ->
 * PREPARE_LOG (replay or append) -> REPLAY_DONE.  `r` is the result
 * of the previous step; fatal errors respawn, mark damaged, or
 * suicide depending on the code.
 */
void MDSRank::boot_start(BootStep step, int r)
{
  // Handle errors from previous step
  if (r < 0) {
    if (is_standby_replay() && (r == -CEPHFS_EAGAIN)) {
      dout(0) << "boot_start encountered an error CEPHFS_EAGAIN"
              << ", respawning since we fell behind journal" << dendl;
      respawn();
    } else if (r == -CEPHFS_EINVAL || r == -CEPHFS_ENOENT) {
      // Invalid or absent data, indicates damaged on-disk structures
      clog->error() << "Error loading MDS rank " << whoami << ": "
        << cpp_strerror(r);
      damaged();
      ceph_assert(r == 0);  // Unreachable, damaged() calls respawn()
    } else if (r == -CEPHFS_EROFS) {
      dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
    } else {
      // Completely unexpected error, give up and die
      dout(0) << "boot_start encountered an error, failing" << dendl;
      suicide();
      return;
    }
  }

  ceph_assert(is_starting() || is_any_replay());

  switch(step) {
    case MDS_BOOT_INITIAL:
      {
        mdcache->init_layouts();

        // gather completes into the next step
        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
        dout(2) << "Booting: " << step << ": opening inotable" << dendl;
        inotable->set_rank(whoami);
        inotable->load(gather.new_sub());

        dout(2) << "Booting: " << step << ": opening sessionmap" << dendl;
        sessionmap.set_rank(whoami);
        sessionmap.load(gather.new_sub());

        dout(2) << "Booting: " << step << ": opening mds log" << dendl;
        mdlog->open(gather.new_sub());

        if (is_starting()) {
          dout(2) << "Booting: " << step << ": opening purge queue" << dendl;
          purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
        } else if (!standby_replaying) {
          dout(2) << "Booting: " << step << ": opening purge queue (async)" << dendl;
          purge_queue.open(NULL);
          dout(2) << "Booting: " << step << ": loading open file table (async)" << dendl;
          mdcache->open_file_table.load(nullptr);
        }

        if (mdsmap->get_tableserver() == whoami) {
          dout(2) << "Booting: " << step << ": opening snap table" << dendl;
          snapserver->set_rank(whoami);
          snapserver->load(gather.new_sub());
        }

        gather.activate();
      }
      break;
    case MDS_BOOT_OPEN_ROOT:
      {
        dout(2) << "Booting: " << step << ": loading/discovering base inodes" << dendl;

        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));

        if (is_starting()) {
          // load mydir frag for the first log segment (creating subtree map)
          mdcache->open_mydir_frag(gather.new_sub());
        } else {
          mdcache->open_mydir_inode(gather.new_sub());
        }

        mdcache->create_global_snaprealm();

        if (whoami == mdsmap->get_root()) {  // load root inode off disk if we are auth
          mdcache->open_root_inode(gather.new_sub());
        } else if (is_any_replay()) {
          // replay.  make up fake root inode to start with
          mdcache->create_root_inode();
        }
        gather.activate();
      }
      break;
    case MDS_BOOT_PREPARE_LOG:
      if (is_any_replay()) {
        dout(2) << "Booting: " << step << ": replaying mds log" << dendl;
        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));

        if (!standby_replaying) {
          dout(2) << "Booting: " << step << ": waiting for purge queue recovered" << dendl;
          purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
        }

        mdlog->replay(gather.new_sub());
        gather.activate();
      } else {
        dout(2) << "Booting: " << step << ": positioning at end of old mds log" << dendl;
        mdlog->append();
        starting_done();
      }
      break;
    case MDS_BOOT_REPLAY_DONE:
      ceph_assert(is_any_replay());

      // Sessiontable and inotable should be in sync after replay, validate
      // that they are consistent.
      validate_sessions();

      replay_done();
      break;
  }
}
1677
1678 void MDSRank::validate_sessions()
1679 {
1680 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
1681 bool valid = true;
1682
1683 // Identify any sessions which have state inconsistent with other,
1684 // after they have been loaded from rados during startup.
1685 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1686 for (const auto &i : sessionmap.get_sessions()) {
1687 Session *session = i.second;
1688 ceph_assert(session->info.prealloc_inos == session->free_prealloc_inos);
1689
1690 interval_set<inodeno_t> badones;
1691 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1692 clog->error() << "client " << *session
1693 << "loaded with preallocated inodes that are inconsistent with inotable";
1694 valid = false;
1695 }
1696 }
1697
1698 if (!valid) {
1699 damaged();
1700 ceph_assert(valid);
1701 }
1702 }
1703
// Fresh (non-replay) start finished: request active, open a new log
// segment, and warm the snaptable cache.
void MDSRank::starting_done()
{
  dout(3) << "starting_done" << dendl;
  ceph_assert(is_starting());
  request_state(MDSMap::STATE_ACTIVE);

  mdlog->start_new_segment();

  // sync snaptable cache
  snapclient->sync(new C_MDSInternalNoop);
}
1715
1716
// Compute the set of other ranks we must gather recovery state from
// (everyone in the mdsmap recovery set except ourselves) and hand it
// to the cache.
void MDSRank::calc_recovery_set()
{
  // initialize gather sets
  set<mds_rank_t> rs;
  mdsmap->get_recovery_mds_set(rs);
  rs.erase(whoami);
  mdcache->set_recovery_set(rs);

  dout(1) << " recovery set is " << rs << dendl;
}
1727
/**
 * Enter replay: note standby-replay mode if applicable, then begin
 * boot once our osdmap is at least the epoch that blocklisted the
 * prior instance of this rank (otherwise wait for that map first).
 */
void MDSRank::replay_start()
{
  dout(1) << "replay_start" << dendl;

  if (is_standby_replay())
    standby_replaying = true;

  // Check if we need to wait for a newer OSD map before starting
  bool const ready = objecter->with_osdmap(
    [this](const OSDMap& o) {
      return o.get_epoch() >= mdsmap->get_last_failure_osd_epoch();
    });

  if (ready) {
    boot_start();
  } else {
    dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
	    << " (which blocklists prior instance)" << dendl;
    Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
    objecter->wait_for_map(
      mdsmap->get_last_failure_osd_epoch(),
      lambdafy(fin));
  }
}
1752
1753
// IO completion for a standby-replay journal re-probe: remembers the
// read position from before the probe so the finisher can detect
// whether we fell behind the active MDS's trimming.
class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
  uint64_t old_read_pos;
public:
  C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
    MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
  void finish(int r) override {
    mds->_standby_replay_restart_finish(r, old_read_pos);
  }
  void print(ostream& out) const override {
    out << "standby_replay_restart";
  }
};
1766
// After re-reading the journal head: if the active MDS has trimmed
// past where we were reading, respawn; otherwise trim our segments and
// continue another replay pass.
void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
{
  if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
    dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
    respawn();  /* we're too far back, and this is easier than
		   trying to reset everything in the cache, etc */
  } else {
    mdlog->standby_trim_segments();
    boot_start(MDS_BOOT_PREPARE_LOG, r);
  }
}
1778
// Timer/context hook that kicks off another standby-replay pass.
class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
public:
  explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
  void finish(int r) override {
    ceph_assert(!r);
    mds->standby_replay_restart();
  }
};
1787
/**
 * Start another journal replay pass.  In standby-replay mode we simply
 * re-probe the journal head.  When transitioning out of standby (final
 * takeover pass) we first require an osdmap recent enough to blocklist
 * the prior instance, then probe and asynchronously open the purge
 * queue and open-file table.
 */
void MDSRank::standby_replay_restart()
{
  if (standby_replaying) {
    /* Go around for another pass of replaying in standby */
    dout(5) << "Restarting replay as standby-replay" << dendl;
    mdlog->get_journaler()->reread_head_and_probe(
      new C_MDS_StandbyReplayRestartFinish(
        this,
	mdlog->get_journaler()->get_read_pos()));
  } else {
    /* We are transitioning out of standby: wait for OSD map update
       before making final pass */
    dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
    bool ready = objecter->with_osdmap(
      [this](const OSDMap& o) {
	return o.get_epoch() >= mdsmap->get_last_failure_osd_epoch();
      });
    if (ready) {
      mdlog->get_journaler()->reread_head_and_probe(
        new C_MDS_StandbyReplayRestartFinish(
          this,
	  mdlog->get_journaler()->get_read_pos()));

      dout(1) << " opening purge_queue (async)" << dendl;
      purge_queue.open(NULL);
      dout(1) << " opening open_file_table (async)" << dendl;
      mdcache->open_file_table.load(nullptr);
    } else {
      auto fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
      dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
	      << " (which blocklists prior instance)" << dendl;
      objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(),
			     lambdafy(fin));
    }
  }
}
1824
/**
 * Journal replay finished.  Depending on mode: schedule another
 * standby-replay pass, make the final takeover pass, or (for a real
 * replay) make the journal writeable, perform one-time post-replay
 * maintenance (snaptable upgrade, optional session/ino wipes, ino
 * skip), and move to RECONNECT (alone) or RESOLVE (with peers).
 */
void MDSRank::replay_done()
{
  if (!standby_replaying) {
    dout(1) << "Finished replaying journal" << dendl;
  } else {
    dout(5) << "Finished replaying journal as standby-replay" << dendl;
  }

  if (is_standby_replay()) {
    // The replay was done in standby state, and we are still in that state
    ceph_assert(standby_replaying);
    dout(10) << "setting replay timer" << dendl;
    timer.add_event_after(g_conf()->mds_replay_interval,
                          new C_MDS_StandbyReplayRestart(this));
    return;
  } else if (standby_replaying) {
    // The replay was done in standby state, we have now _left_ that state
    dout(10) << " last replay pass was as a standby; making final pass" << dendl;
    standby_replaying = false;
    standby_replay_restart();
    return;
  } else {
    // Replay is complete, journal read should be up to date
    ceph_assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
    ceph_assert(!is_standby_replay());

    // Reformat and come back here
    if (mdlog->get_journaler()->get_stream_format() < g_conf()->mds_journal_format) {
      dout(4) << "reformatting journal on standby-replay->replay transition" << dendl;
      mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
      return;
    }
  }

  dout(1) << "making mds journal writeable" << dendl;
  mdlog->get_journaler()->set_writeable();
  mdlog->get_journaler()->trim_tail();

  if (mdsmap->get_tableserver() == whoami &&
      snapserver->upgrade_format()) {
    dout(1) << "upgrading snaptable format" << dendl;
    snapserver->save(new C_MDSInternalNoop);
  }

  // debug/recovery knobs: wipe sessions or ino preallocations on request
  if (g_conf()->mds_wipe_sessions) {
    dout(1) << "wiping out client sessions" << dendl;
    sessionmap.wipe();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf()->mds_wipe_ino_prealloc) {
    dout(1) << "wiping out ino prealloc from sessions" << dendl;
    sessionmap.wipe_ino_prealloc();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf()->mds_skip_ino) {
    inodeno_t i = g_conf()->mds_skip_ino;
    dout(1) << "skipping " << i << " inodes" << dendl;
    inotable->skip_inos(i);
    inotable->save(new C_MDSInternalNoop);
  }

  if (mdsmap->get_num_in_mds() == 1 &&
      mdsmap->get_num_failed_mds() == 0) { // just me!
    dout(2) << "i am alone, moving to state reconnect" << dendl;
    request_state(MDSMap::STATE_RECONNECT);
    // sync snaptable cache
    snapclient->sync(new C_MDSInternalNoop);
  } else {
    dout(2) << "i am not alone, moving to state resolve" << dendl;
    request_state(MDSMap::STATE_RESOLVE);
  }
}
1897
// Called when re-entering a writeable log state: roll back any
// fragment operations that never committed.
void MDSRank::reopen_log()
{
  dout(1) << "reopen_log" << dendl;
  mdcache->rollback_uncommitted_fragments();
}
1903
// Enter RESOLVE: reopen the log, compute the recovery set, start cache
// resolution, and kick anything waiting for this state.
void MDSRank::resolve_start()
{
  dout(1) << "resolve_start" << dendl;

  reopen_log();

  calc_recovery_set();

  mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
  finish_contexts(g_ceph_context, waiting_for_resolve);
}
1915
// Resolution complete: move on to RECONNECT and warm the snaptable
// cache.
void MDSRank::resolve_done()
{
  dout(1) << "resolve_done" << dendl;
  request_state(MDSMap::STATE_RECONNECT);
  // sync snaptable cache
  snapclient->sync(new C_MDSInternalNoop);
}
1923
/**
 * Enter RECONNECT: purge sessions of clients already blocklisted in
 * the current osdmap (raising the OSD epoch barrier if any were
 * killed), then wait for surviving clients to reconnect.
 */
void MDSRank::reconnect_start()
{
  dout(1) << "reconnect_start" << dendl;

  if (last_state == MDSMap::STATE_REPLAY) {
    reopen_log();
  }

  // Drop any blocklisted clients from the SessionMap before going
  // into reconnect, so that we don't wait for them.
  objecter->enable_blocklist_events();
  std::set<entity_addr_t> blocklist;
  epoch_t epoch = 0;
  objecter->with_osdmap([&blocklist, &epoch](const OSDMap& o) {
      o.get_blocklist(&blocklist);
      epoch = o.get_epoch();
  });
  auto killed = server->apply_blocklist(blocklist);
  dout(4) << "reconnect_start: killed " << killed << " blocklisted sessions ("
          << blocklist.size() << " blocklist entries, "
          << sessionmap.get_sessions().size() << ")" << dendl;
  if (killed) {
    set_osd_epoch_barrier(epoch);
  }

  server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
  finish_contexts(g_ceph_context, waiting_for_reconnect);
}
// Client reconnects finished: advance to REJOIN.
void MDSRank::reconnect_done()
{
  dout(1) << "reconnect_done" << dendl;
  request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
}
1957
// All ranks reached rejoin together: exchange rejoin messages.
void MDSRank::rejoin_joint_start()
{
  dout(1) << "rejoin_joint_start" << dendl;
  mdcache->rejoin_send_rejoins();
}
// Enter REJOIN: start cache rejoin and kick waiters for this state.
void MDSRank::rejoin_start()
{
  dout(1) << "rejoin_start" << dendl;
  mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
  finish_contexts(g_ceph_context, waiting_for_rejoin);
}
/**
 * Rejoin complete.  Wait for any uncommitted fragments first
 * (re-entering via callback); an empty-subtree cache means damage for
 * rank 0 or a clean STOPPED exit for others; otherwise go ACTIVE, or
 * CLIENTREPLAY if replayed client requests / reclaims are pending.
 */
void MDSRank::rejoin_done()
{
  dout(1) << "rejoin_done" << dendl;
  mdcache->show_subtrees();
  mdcache->show_cache();

  if (mdcache->is_any_uncommitted_fragment()) {
    dout(1) << " waiting for uncommitted fragments" << dendl;
    mdcache->wait_for_uncommitted_fragments(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
    return;
  }

  // funny case: is our cache empty?  no subtrees?
  if (!mdcache->is_subtrees()) {
    if (whoami == 0) {
      // The root should always have a subtree!
      clog->error() << "No subtrees found for root MDS rank!";
      damaged();
      ceph_assert(mdcache->is_subtrees());
    } else {
      dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
      request_state(MDSMap::STATE_STOPPED);
    }
    return;
  }

  if (replay_queue.empty() && !server->get_num_pending_reclaim()) {
    request_state(MDSMap::STATE_ACTIVE);
  } else {
    replaying_requests_done = replay_queue.empty();
    request_state(MDSMap::STATE_CLIENTREPLAY);
  }
}
2002
// Enter CLIENTREPLAY: kick waiters and start replaying the first
// queued client request.
void MDSRank::clientreplay_start()
{
  dout(1) << "clientreplay_start" << dendl;
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  queue_one_replay();
}
2009
/**
 * Queue the next replayed client request, if any.
 *
 * @return true if a request was queued; false if the queue is empty,
 *         in which case the "all requests replayed" flag is set (with
 *         a log flush the first time) and we check whether the
 *         clientreplay phase can finish.
 */
bool MDSRank::queue_one_replay()
{
  if (!replay_queue.empty()) {
    queue_waiter(replay_queue.front());
    replay_queue.pop_front();
    return true;
  }
  if (!replaying_requests_done) {
    replaying_requests_done = true;
    mdlog->flush();
  }
  maybe_clientreplay_done();
  return false;
}
2024
// Finish the clientreplay phase once all replayed requests are done
// and no sessions remain to be reclaimed; otherwise just log progress.
void MDSRank::maybe_clientreplay_done()
{
  if (is_clientreplay() && get_want_state() == MDSMap::STATE_CLIENTREPLAY) {

    // don't go to active if there are session waiting for being reclaimed
    if (replaying_requests_done && !server->get_num_pending_reclaim()) {
      // wait for the journal to be safe before declaring done
      mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
      return;
    }

    dout(1) << " still have " << replay_queue.size() + (int)!replaying_requests_done
	    << " requests need to be replayed, " << server->get_num_pending_reclaim()
	    << " sessions need to be reclaimed" << dendl;
  }
}
2040
// Clientreplay finished: request ACTIVE.
void MDSRank::clientreplay_done()
{
  dout(1) << "clientreplay_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
}
2046
/**
 * Enter ACTIVE: open the root if we just got created/started, bring up
 * the metrics handler (and, on rank 0 only, the metric aggregator),
 * clean up post-recovery cache state, reissue caps, and kick waiters.
 */
void MDSRank::active_start()
{
  dout(1) << "active_start" << dendl;

  if (last_state == MDSMap::STATE_CREATING ||
      last_state == MDSMap::STATE_STARTING) {
    mdcache->open_root();
  }

  dout(10) << __func__ << ": initializing metrics handler" << dendl;
  metrics_handler.init();
  messenger->add_dispatcher_tail(&metrics_handler);

  // metric aggregation is solely done by rank 0
  if (is_rank0()) {
    dout(10) << __func__ << ": initializing metric aggregator" << dendl;
    ceph_assert(metric_aggregator == nullptr);
    metric_aggregator = std::make_unique<MetricAggregator>(cct, this, mgrc);
    metric_aggregator->init();
    messenger->add_dispatcher_tail(metric_aggregator.get());
  }

  mdcache->clean_open_file_lists();
  mdcache->export_remaining_imported_caps();
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters

  mdcache->reissue_all_caps();

  finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters
}
2077
// Recovery (into clientreplay or active) succeeded: restart truncates,
// purges and file recovery, and populate mydir.  Nothing to do when we
// got here straight from creation.
void MDSRank::recovery_done(int oldstate)
{
  dout(1) << "recovery_done -- successful recovery!" << dendl;
  ceph_assert(is_clientreplay() || is_active());

  if (oldstate == MDSMap::STATE_CREATING)
    return;

  mdcache->start_recovered_truncates();
  mdcache->start_purge_inodes();
  mdcache->start_files_to_recover();

  mdcache->populate_mydir();
}
2092
// Creation finished: request ACTIVE and warm the snaptable cache.
void MDSRank::creating_done()
{
  dout(1)<< "creating_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
  // sync snaptable cache
  snapclient->sync(new C_MDSInternalNoop);
}
2100
/**
 * Create this rank's on-disk structures from scratch: fresh journal,
 * base hierarchy (root only on the root rank, plus mydir), global
 * snaprealm, reset inotable, empty sessionmap and purge queue, and --
 * if we are the table server -- a fresh snaptable.  All writes gather
 * into creating_done().
 */
void MDSRank::boot_create()
{
  dout(3) << "boot_create" << dendl;

  MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));

  mdcache->init_layouts();

  inotable->set_rank(whoami);
  sessionmap.set_rank(whoami);

  // start with a fresh journal
  dout(10) << "boot_create creating fresh journal" << dendl;
  mdlog->create(fin.new_sub());

  // open new journal segment, but do not journal subtree map (yet)
  mdlog->prepare_new_segment();

  if (whoami == mdsmap->get_root()) {
    dout(3) << "boot_create creating fresh hierarchy" << dendl;
    mdcache->create_empty_hierarchy(fin.get());
  }

  dout(3) << "boot_create creating mydir hierarchy" << dendl;
  mdcache->create_mydir_hierarchy(fin.get());

  dout(3) << "boot_create creating global snaprealm" << dendl;
  mdcache->create_global_snaprealm();

  // fixme: fake out inotable (reset, pretend loaded)
  dout(10) << "boot_create creating fresh inotable table" << dendl;
  inotable->reset();
  inotable->save(fin.new_sub());

  // write empty sessionmap
  sessionmap.save(fin.new_sub());

  // Create empty purge queue
  purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));

  // initialize tables
  if (mdsmap->get_tableserver() == whoami) {
    dout(10) << "boot_create creating fresh snaptable" << dendl;
    snapserver->set_rank(whoami);
    snapserver->reset();
    snapserver->save(fin.new_sub());
  }

  ceph_assert(g_conf()->mds_kill_create_at != 1);

  // ok now journal it
  mdlog->journal_segment_subtree_map(fin.new_sub());
  mdlog->flush();

  // Usually we do this during reconnect, but creation skips that.
  objecter->enable_blocklist_events();

  fin.activate();
}
2160
2161 void MDSRank::stopping_start()
2162 {
2163 dout(2) << "Stopping..." << dendl;
2164
2165 if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
2166 std::vector<Session*> victims;
2167 const auto& sessions = sessionmap.get_sessions();
2168 for (const auto& p : sessions) {
2169 if (!p.first.is_client()) {
2170 continue;
2171 }
2172
2173 Session *s = p.second;
2174 victims.push_back(s);
2175 }
2176
2177 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2178 ceph_assert(!victims.empty());
2179
2180 C_GatherBuilder gather(g_ceph_context, new C_MDSInternalNoop);
2181 for (const auto &s : victims) {
2182 CachedStackStringStream css;
2183 evict_client(s->get_client().v, false,
2184 g_conf()->mds_session_blocklist_on_evict, *css, gather.new_sub());
2185 }
2186 gather.activate();
2187 }
2188
2189 mdcache->shutdown_start();
2190 }
2191
2192 void MDSRank::stopping_done()
2193 {
2194 dout(2) << "Finished stopping..." << dendl;
2195
2196 // tell monitor we shut down cleanly.
2197 request_state(MDSMap::STATE_STOPPED);
2198 }
2199
/**
 * Process a new MDSMap epoch while this daemon holds a rank.
 *
 * Responsibilities, in order:
 *  - pick up our own (possibly changed) state/incarnation from the map
 *    and validate the state transition (respawning if invalid);
 *  - detect peer ranks that went down, restarted, or changed address,
 *    and tear down / restart our communication with them;
 *  - drive our own state machine (replay/resolve/rejoin/active/...)
 *    when our state changed;
 *  - kick resolve/rejoin/recovery interactions with peers that newly
 *    reached those states;
 *  - wake contexts waiting on this (or any earlier) map epoch.
 *
 * @param m      the incoming map message (only its epoch/source are used;
 *               the decoded map is already installed in this->mdsmap)
 * @param oldmap the previously-installed map, for diffing.  NOTE: maps
 *               may be discontinuous — failovers can be hidden in the
 *               gap, which is what the 'restart' set below compensates
 *               for.
 */
void MDSRankDispatcher::handle_mds_map(
    const cref_t<MMDSMap> &m,
    const MDSMap &oldmap)
{
  // I am only to be passed MDSMaps in which I hold a rank
  ceph_assert(whoami != MDS_RANK_NONE);

  // Refresh my own state from the new map; remember the previous state
  // so the transition handlers below can diff against it.
  MDSMap::DaemonState oldstate = state;
  mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
  state = mdsmap->get_state_gid(mds_gid);
  if (state != oldstate) {
    last_state = oldstate;
    incarnation = mdsmap->get_inc_gid(mds_gid);
  }

  version_t epoch = m->get_epoch();

  // note source's map version
  if (m->get_source().is_mds() &&
      peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
    dout(15) << " peer " << m->get_source()
             << " has mdsmap epoch >= " << epoch
             << dendl;
    peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
  }

  // Validate state transitions while I hold a rank
  if (!MDSMap::state_transition_valid(oldstate, state)) {
    derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
      << "->" << ceph_mds_state_name(state) << dendl;
    respawn();
  }

  if (oldstate != state) {
    // update messenger.
    // Standby-replay daemons advertise themselves by GID since the rank
    // name belongs to the daemon they are shadowing.
    if (state == MDSMap::STATE_STANDBY_REPLAY) {
      dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
              << " replaying mds." << whoami << "." << incarnation << dendl;
      messenger->set_myname(entity_name_t::MDS(mds_gid));
    } else {
      dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
      messenger->set_myname(entity_name_t::MDS(whoami));
    }
  }

  // tell objecter my incarnation
  if (objecter->get_client_incarnation() != incarnation)
    objecter->set_client_incarnation(incarnation);

  if (mdsmap->get_required_client_features() != oldmap.get_required_client_features())
    server->update_required_client_features();

  // for debug
  if (g_conf()->mds_dump_cache_on_map)
    mdcache->dump_cache();

  cluster_degraded = mdsmap->is_degraded();

  // mdsmap and oldmap can be discontinuous. failover might happen in the missing mdsmap.
  // the 'restart' set tracks ranks that have restarted since the old mdsmap
  set<mds_rank_t> restart;
  // replaying mds does not communicate with other ranks
  if (state >= MDSMap::STATE_RESOLVE) {
    // did someone fail?
    //   new down?
    set<mds_rank_t> olddown, down;
    oldmap.get_down_mds_set(&olddown);
    mdsmap->get_down_mds_set(&down);
    for (const auto& r : down) {
      if (oldmap.have_inst(r) && olddown.count(r) == 0) {
        messenger->mark_down_addrs(oldmap.get_addrs(r));
        handle_mds_failure(r);
      }
    }

    // did someone fail?
    //   did their addr/inst change?
    set<mds_rank_t> up;
    mdsmap->get_up_mds_set(up);
    for (const auto& r : up) {
      auto& info = mdsmap->get_info(r);
      if (oldmap.have_inst(r)) {
        auto& oldinfo = oldmap.get_info(r);
        // A changed incarnation means the rank was taken over by a new
        // daemon while we weren't looking.
        if (info.inc != oldinfo.inc) {
          messenger->mark_down_addrs(oldinfo.get_addrs());
          if (info.state == MDSMap::STATE_REPLAY ||
              info.state == MDSMap::STATE_RESOLVE) {
            restart.insert(r);
            handle_mds_failure(r);
          } else {
            ceph_assert(info.state == MDSMap::STATE_STARTING ||
                        info.state == MDSMap::STATE_ACTIVE);
            // -> stopped (missing) -> starting -> active
            restart.insert(r);
            mdcache->migrator->handle_mds_failure_or_stop(r);
            if (mdsmap->get_tableserver() == whoami)
              snapserver->handle_mds_failure_or_stop(r);
          }
        }
      } else {
        if (info.state == MDSMap::STATE_REPLAY ||
            info.state == MDSMap::STATE_RESOLVE) {
          // -> starting/creating (missing) -> active (missing) -> replay -> resolve
          restart.insert(r);
          handle_mds_failure(r);
        } else {
          ceph_assert(info.state == MDSMap::STATE_CREATING ||
                      info.state == MDSMap::STATE_STARTING ||
                      info.state == MDSMap::STATE_ACTIVE);
        }
      }
    }
  }

  // did it change?
  if (oldstate != state) {
    dout(1) << "handle_mds_map state change "
            << ceph_mds_state_name(oldstate) << " --> "
            << ceph_mds_state_name(state) << dendl;
    beacon.set_want_state(*mdsmap, state);

    if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
        dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
        assert (state == MDSMap::STATE_REPLAY);
    } else {
      // did i just recover?
      if ((is_active() || is_clientreplay()) &&
          (oldstate == MDSMap::STATE_CREATING ||
           oldstate == MDSMap::STATE_REJOIN ||
           oldstate == MDSMap::STATE_RECONNECT))
        recovery_done(oldstate);

      // Dispatch into the handler for whatever state we just entered.
      if (is_active()) {
        active_start();
      } else if (is_any_replay()) {
        replay_start();
      } else if (is_resolve()) {
        resolve_start();
      } else if (is_reconnect()) {
        reconnect_start();
      } else if (is_rejoin()) {
        rejoin_start();
      } else if (is_clientreplay()) {
        clientreplay_start();
      } else if (is_creating()) {
        boot_create();
      } else if (is_starting()) {
        boot_start();
      } else if (is_stopping()) {
        ceph_assert(oldstate == MDSMap::STATE_ACTIVE);
        stopping_start();
      }
    }
  }

  // RESOLVE
  // is someone else newly resolving?
  if (state >= MDSMap::STATE_RESOLVE) {
    // recover snaptable
    if (mdsmap->get_tableserver() == whoami) {
      if (oldstate < MDSMap::STATE_RESOLVE) {
        // we are just finishing recovery ourselves: hand the snapserver
        // the full set of ranks at resolve-or-later
        set<mds_rank_t> s;
        mdsmap->get_mds_set_lower_bound(s, MDSMap::STATE_RESOLVE);
        snapserver->finish_recovery(s);
      } else {
        // notify the snapserver of any rank that newly reached resolve
        set<mds_rank_t> old_set, new_set;
        oldmap.get_mds_set_lower_bound(old_set, MDSMap::STATE_RESOLVE);
        mdsmap->get_mds_set_lower_bound(new_set, MDSMap::STATE_RESOLVE);
        for (const auto& r : new_set) {
          if (r == whoami)
            continue; // not me
          if (!old_set.count(r) || restart.count(r)) { // newly so?
            snapserver->handle_mds_recovery(r);
          }
        }
      }
    }

    if ((!oldmap.is_resolving() || !restart.empty()) && mdsmap->is_resolving()) {
      set<mds_rank_t> resolve;
      mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
      dout(10) << " resolve set is " << resolve << dendl;
      calc_recovery_set();
      mdcache->send_resolves();
    }
  }

  // REJOIN
  // is everybody finally rejoining?
  if (state >= MDSMap::STATE_REJOIN) {
    // did we start?
    if (!oldmap.is_rejoining() && mdsmap->is_rejoining())
      rejoin_joint_start();

    // did we finish?
    if (g_conf()->mds_dump_cache_after_rejoin &&
        oldmap.is_rejoining() && !mdsmap->is_rejoining())
      mdcache->dump_cache();      // for DEBUG only

    if (oldstate >= MDSMap::STATE_REJOIN ||
        oldstate == MDSMap::STATE_STARTING) {
      // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
      set<mds_rank_t> olddis, dis;
      oldmap.get_mds_set_lower_bound(olddis, MDSMap::STATE_REJOIN);
      mdsmap->get_mds_set_lower_bound(dis, MDSMap::STATE_REJOIN);
      for (const auto& r : dis) {
        if (r == whoami)
          continue; // not me
        if (!olddis.count(r) || restart.count(r)) { // newly so?
          mdcache->kick_discovers(r);
          mdcache->kick_open_ino_peers(r);
        }
      }
    }
  }

  // Cluster just became healthy: wake waiters parked under the
  // MDS_RANK_NONE ("any peer") key.
  if (oldmap.is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
    dout(1) << "cluster recovered." << dendl;
    auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
    if (it != waiting_for_active_peer.end()) {
      queue_waiters(it->second);
      waiting_for_active_peer.erase(it);
    }
  }

  // did someone go active?
  if (state >= MDSMap::STATE_CLIENTREPLAY &&
      oldstate >= MDSMap::STATE_CLIENTREPLAY) {
    set<mds_rank_t> oldactive, active;
    oldmap.get_mds_set_lower_bound(oldactive, MDSMap::STATE_CLIENTREPLAY);
    mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
    for (const auto& r : active) {
      if (r == whoami)
        continue; // not me
      if (!oldactive.count(r) || restart.count(r)) // newly so?
        handle_mds_recovery(r);
    }
  }

  if (is_clientreplay() || is_active() || is_stopping()) {
    // did anyone stop?
    set<mds_rank_t> oldstopped, stopped;
    oldmap.get_stopped_mds_set(oldstopped);
    mdsmap->get_stopped_mds_set(stopped);
    for (const auto& r : stopped)
      if (oldstopped.count(r) == 0) { // newly so?
        mdcache->migrator->handle_mds_failure_or_stop(r);
        if (mdsmap->get_tableserver() == whoami)
          snapserver->handle_mds_failure_or_stop(r);
      }
  }

  {
    // Wake every context waiting for an epoch we have now reached.
    map<epoch_t,MDSContext::vec >::iterator p = waiting_for_mdsmap.begin();
    while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
      MDSContext::vec ls;
      ls.swap(p->second);
      waiting_for_mdsmap.erase(p++);
      queue_waiters(ls);
    }
  }

  if (is_active()) {
    // Before going active, set OSD epoch barrier to latest (so that
    // we don't risk handing out caps to clients with old OSD maps that
    // might not include barriers from the previous incarnation of this MDS)
    set_osd_epoch_barrier(objecter->with_osdmap(
                            std::mem_fn(&OSDMap::get_epoch)));

    /* Now check if we should hint to the OSD that a read may follow */
    if (mdsmap->has_standby_replay(whoami))
      mdlog->set_write_iohint(0);
    else
      mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
  }

  if (oldmap.get_max_mds() != mdsmap->get_max_mds()) {
    purge_queue.update_op_limit(*mdsmap);
  }

  if (mdsmap->get_inline_data_enabled() && !oldmap.get_inline_data_enabled())
    dout(0) << "WARNING: inline_data support has been deprecated and will be removed in a future release" << dendl;

  mdcache->handle_mdsmap(*mdsmap, oldmap);

  // Propagate the new map to the metrics machinery (aggregator exists
  // only on rank 0, see active_start()).
  if (metric_aggregator != nullptr) {
    metric_aggregator->notify_mdsmap(*mdsmap);
  }
  metrics_handler.notify_mdsmap(*mdsmap);
}
2490
2491 void MDSRank::handle_mds_recovery(mds_rank_t who)
2492 {
2493 dout(5) << "handle_mds_recovery mds." << who << dendl;
2494
2495 mdcache->handle_mds_recovery(who);
2496
2497 queue_waiters(waiting_for_active_peer[who]);
2498 waiting_for_active_peer.erase(who);
2499 }
2500
2501 void MDSRank::handle_mds_failure(mds_rank_t who)
2502 {
2503 if (who == whoami) {
2504 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
2505 return;
2506 }
2507 dout(5) << "handle_mds_failure mds." << who << dendl;
2508
2509 mdcache->handle_mds_failure(who);
2510
2511 if (mdsmap->get_tableserver() == whoami)
2512 snapserver->handle_mds_failure_or_stop(who);
2513
2514 snapclient->handle_mds_failure(who);
2515
2516 scrubstack->handle_mds_failure(who);
2517 }
2518
/**
 * Dispatch an admin-socket command addressed to this rank.
 *
 * Synchronous handlers fall through to the `out:` label, which replies
 * via on_finish with the accumulated return code and message.  Handlers
 * that complete asynchronously (scrub start/abort/pause, cache drop,
 * client eviction) invoke on_finish from their own completion context
 * and `return` before reaching `out:`.
 *
 * Locking: mds_lock is NOT held on entry; each handler that needs it
 * takes it explicitly.
 *
 * @param command   the asok command name
 * @param cmdmap    parsed command arguments
 * @param f         formatter for structured output (never null for
 *                  handlers that dump)
 * @param inbl      input payload (unused by the commands below)
 * @param on_finish completion: (return code, message, output payload)
 */
void MDSRankDispatcher::handle_asok_command(
  std::string_view command,
  const cmdmap_t& cmdmap,
  Formatter *f,
  const bufferlist &inbl,
  std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
  int r = 0;
  CachedStackStringStream css;
  bufferlist outbl;
  if (command == "dump_ops_in_flight" ||
             command == "ops") {
    if (!op_tracker.dump_ops_in_flight(f)) {
      *css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
    }
  } else if (command == "dump_blocked_ops") {
    if (!op_tracker.dump_ops_in_flight(f, true)) {
      *css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
    }
  } else if (command == "dump_historic_ops") {
    if (!op_tracker.dump_historic_ops(f)) {
      *css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
    }
  } else if (command == "dump_historic_ops_by_duration") {
    if (!op_tracker.dump_historic_ops(f, true)) {
      *css << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
    }
  } else if (command == "osdmap barrier") {
    int64_t target_epoch = 0;
    bool got_val = cmd_getval(cmdmap, "target_epoch", target_epoch);

    if (!got_val) {
      *css << "no target epoch given";
      r = -CEPHFS_EINVAL;
      goto out;
    }
    {
      std::lock_guard l(mds_lock);
      set_osd_epoch_barrier(target_epoch);
    }
    // Block (without mds_lock) until the objecter has the target epoch.
    boost::system::error_code ec;
    dout(4) << __func__ << ": possibly waiting for OSD epoch " << target_epoch << dendl;
    objecter->wait_for_map(target_epoch, ceph::async::use_blocked[ec]);
  } else if (command == "session ls" ||
	     command == "client ls") {
    std::lock_guard l(mds_lock);
    bool cap_dump = false;
    std::vector<std::string> filter_args;
    cmd_getval(cmdmap, "cap_dump", cap_dump);
    cmd_getval(cmdmap, "filters", filter_args);

    SessionFilter filter;
    r = filter.parse(filter_args, css.get());
    if (r != 0) {
      goto out;
    }
    dump_sessions(filter, f, cap_dump);
  } else if (command == "session evict" ||
	     command == "client evict") {
    std::lock_guard l(mds_lock);
    std::vector<std::string> filter_args;
    cmd_getval(cmdmap, "filters", filter_args);

    SessionFilter filter;
    r = filter.parse(filter_args, css.get());
    if (r != 0) {
      r = -CEPHFS_EINVAL;
      goto out;
    }
    // evict_clients replies asynchronously via on_finish.
    evict_clients(filter, on_finish);
    return;
  } else if (command == "session kill") {
    std::string client_id;
    if (!cmd_getval(cmdmap, "client_id", client_id)) {
      *css << "Invalid client_id specified";
      r = -CEPHFS_ENOENT;
      goto out;
    }
    std::lock_guard l(mds_lock);
    bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
        g_conf()->mds_session_blocklist_on_evict, *css);
    if (!evicted) {
      dout(15) << css->strv() << dendl;
      r = -CEPHFS_ENOENT;
    }
  } else if (command == "session config" ||
	     command == "client config") {
    int64_t client_id;
    std::string option;
    std::string value;

    cmd_getval(cmdmap, "client_id", client_id);
    cmd_getval(cmdmap, "option", option);
    // A missing "value" means "unset the option" — see config_client.
    bool got_value = cmd_getval(cmdmap, "value", value);

    std::lock_guard l(mds_lock);
    r = config_client(client_id, !got_value, option, value, *css);
  } else if (command == "scrub start" ||
	     command == "scrub_start") {
    // Scrub control is coordinated by rank 0 only.
    if (whoami != 0) {
      *css << "Not rank 0";
      r = -CEPHFS_EXDEV;
      goto out;
    }

    string path;
    string tag;
    vector<string> scrubop_vec;
    cmd_getval(cmdmap, "scrubops", scrubop_vec);
    cmd_getval(cmdmap, "path", path);
    cmd_getval(cmdmap, "tag", tag);

    // Run on the finisher thread; replies via on_finish when enqueued.
    finisher->queue(
      new LambdaContext(
	[this, on_finish, f, path, tag, scrubop_vec](int r) {
	  command_scrub_start(
	    f, path, tag, scrubop_vec,
	    new LambdaContext(
	      [on_finish](int r) {
		bufferlist outbl;
		on_finish(r, {}, outbl);
	      }));
	}));
    return;
  } else if (command == "scrub abort") {
    if (whoami != 0) {
      *css << "Not rank 0";
      r = -CEPHFS_EXDEV;
      goto out;
    }

    finisher->queue(
      new LambdaContext(
        [this, on_finish, f](int r) {
          command_scrub_abort(
            f,
            new LambdaContext(
              [on_finish, f](int r) {
                bufferlist outbl;
                f->open_object_section("result");
                f->dump_int("return_code", r);
                f->close_section();
                on_finish(r, {}, outbl);
              }));
        }));
    return;
  } else if (command == "scrub pause") {
    if (whoami != 0) {
      *css << "Not rank 0";
      r = -CEPHFS_EXDEV;
      goto out;
    }

    finisher->queue(
      new LambdaContext(
        [this, on_finish, f](int r) {
          command_scrub_pause(
            f,
            new LambdaContext(
              [on_finish, f](int r) {
                bufferlist outbl;
                f->open_object_section("result");
                f->dump_int("return_code", r);
                f->close_section();
                on_finish(r, {}, outbl);
              }));
        }));
    return;
  } else if (command == "scrub resume") {
    if (whoami != 0) {
      *css << "Not rank 0";
      r = -CEPHFS_EXDEV;
      goto out;
    }
    command_scrub_resume(f);
  } else if (command == "scrub status") {
    command_scrub_status(f);
  } else if (command == "tag path") {
    if (whoami != 0) {
      *css << "Not rank 0";
      r = -CEPHFS_EXDEV;
      goto out;
    }
    string path;
    cmd_getval(cmdmap, "path", path);
    string tag;
    cmd_getval(cmdmap, "tag", tag);
    command_tag_path(f, path, tag);
  } else if (command == "flush_path") {
    string path;
    cmd_getval(cmdmap, "path", path);
    command_flush_path(f, path);
  } else if (command == "flush journal") {
    command_flush_journal(f);
  } else if (command == "get subtrees") {
    command_get_subtrees(f);
  } else if (command == "export dir") {
    string path;
    if(!cmd_getval(cmdmap, "path", path)) {
      *css << "malformed path";
      r = -CEPHFS_EINVAL;
      goto out;
    }
    int64_t rank;
    if(!cmd_getval(cmdmap, "rank", rank)) {
      *css << "malformed rank";
      r = -CEPHFS_EINVAL;
      goto out;
    }
    command_export_dir(f, path, (mds_rank_t)rank);
  } else if (command == "dump cache") {
    std::lock_guard l(mds_lock);
    // With a path, dump to that file; without, dump to the formatter.
    string path;
    if (!cmd_getval(cmdmap, "path", path)) {
      r = mdcache->dump_cache(f);
    } else {
      r = mdcache->dump_cache(path);
    }
  } else if (command == "cache drop") {
    int64_t timeout = 0;
    cmd_getval(cmdmap, "timeout", timeout);
    finisher->queue(
      new LambdaContext(
        [this, on_finish, f, timeout](int r) {
          command_cache_drop(
            timeout, f,
            new LambdaContext(
              [on_finish](int r) {
                bufferlist outbl;
                on_finish(r, {}, outbl);
              }));
        }));
    return;
  } else if (command == "cache status") {
    std::lock_guard l(mds_lock);
    mdcache->cache_status(f);
  } else if (command == "dump tree") {
    command_dump_tree(cmdmap, *css, f);
  } else if (command == "dump loads") {
    std::lock_guard l(mds_lock);
    r = balancer->dump_loads(f);
  } else if (command == "dump snaps") {
    std::lock_guard l(mds_lock);
    string server;
    cmd_getval(cmdmap, "server", server);
    if (server == "--server") {
      // Dump server-side state; only valid on the tableserver rank.
      if (mdsmap->get_tableserver() == whoami) {
	snapserver->dump(f);
      } else {
	r = -CEPHFS_EXDEV;
	*css << "Not snapserver";
      }
    } else {
      r = snapclient->dump_cache(f);
    }
  } else if (command == "force_readonly") {
    std::lock_guard l(mds_lock);
    mdcache->force_readonly();
  } else if (command == "dirfrag split") {
    command_dirfrag_split(cmdmap, *css);
  } else if (command == "dirfrag merge") {
    command_dirfrag_merge(cmdmap, *css);
  } else if (command == "dirfrag ls") {
    command_dirfrag_ls(cmdmap, *css, f);
  } else if (command == "openfiles ls") {
    command_openfiles_ls(f);
  } else if (command == "dump inode") {
    command_dump_inode(f, cmdmap, *css);
  } else if (command == "damage ls") {
    std::lock_guard l(mds_lock);
    damage_table.dump(f);
  } else if (command == "damage rm") {
    std::lock_guard l(mds_lock);
    damage_entry_id_t id = 0;
    if (!cmd_getval(cmdmap, "damage_id", (int64_t&)id)) {
      r = -CEPHFS_EINVAL;
      goto out;
    }
    damage_table.erase(id);
  } else {
    r = -CEPHFS_ENOSYS;
  }
out:
  // Synchronous reply path: async handlers returned before this point.
  on_finish(r, css->str(), outbl);
}
2804
2805 /**
2806 * This function drops the mds_lock, so don't do anything with
2807 * MDSRank after calling it (we could have gone into shutdown): just
2808 * send your result back to the calling client and finish.
2809 */
2810 void MDSRankDispatcher::evict_clients(
2811 const SessionFilter &filter,
2812 std::function<void(int,const std::string&,bufferlist&)> on_finish)
2813 {
2814 bufferlist outbl;
2815 if (is_any_replay()) {
2816 on_finish(-CEPHFS_EAGAIN, "MDS is replaying log", outbl);
2817 return;
2818 }
2819
2820 std::vector<Session*> victims;
2821 const auto& sessions = sessionmap.get_sessions();
2822 for (const auto& p : sessions) {
2823 if (!p.first.is_client()) {
2824 continue;
2825 }
2826
2827 Session *s = p.second;
2828
2829 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server,
2830 std::placeholders::_1))) {
2831 victims.push_back(s);
2832 }
2833 }
2834
2835 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2836
2837 if (victims.empty()) {
2838 on_finish(0, {}, outbl);
2839 return;
2840 }
2841
2842 C_GatherBuilder gather(g_ceph_context,
2843 new LambdaContext([on_finish](int r) {
2844 bufferlist bl;
2845 on_finish(r, {}, bl);
2846 }));
2847 for (const auto s : victims) {
2848 CachedStackStringStream css;
2849 evict_client(s->get_client().v, false,
2850 g_conf()->mds_session_blocklist_on_evict, *css, gather.new_sub());
2851 }
2852 gather.activate();
2853 }
2854
2855 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f, bool cap_dump) const
2856 {
2857 // Dump sessions, decorated with recovery/replay status
2858 f->open_array_section("sessions");
2859 for (auto& [name, s] : sessionmap.get_sessions()) {
2860 if (!name.is_client()) {
2861 continue;
2862 }
2863
2864 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2865 continue;
2866 }
2867
2868 f->open_object_section("session");
2869 s->dump(f, cap_dump);
2870 f->close_section();
2871 }
2872 f->close_section(); // sessions
2873 }
2874
2875 void MDSRank::command_scrub_start(Formatter *f,
2876 std::string_view path, std::string_view tag,
2877 const vector<string>& scrubop_vec, Context *on_finish)
2878 {
2879 bool force = false;
2880 bool recursive = false;
2881 bool repair = false;
2882 for (auto &op : scrubop_vec) {
2883 if (op == "force")
2884 force = true;
2885 else if (op == "recursive")
2886 recursive = true;
2887 else if (op == "repair")
2888 repair = true;
2889 }
2890
2891 std::lock_guard l(mds_lock);
2892 mdcache->enqueue_scrub(path, tag, force, recursive, repair, f, on_finish);
2893 // scrub_dentry() finishers will dump the data for us; we're done!
2894 }
2895
2896 void MDSRank::command_tag_path(Formatter *f,
2897 std::string_view path, std::string_view tag)
2898 {
2899 C_SaferCond scond;
2900 {
2901 std::lock_guard l(mds_lock);
2902 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2903 }
2904 scond.wait();
2905 }
2906
2907 void MDSRank::command_scrub_abort(Formatter *f, Context *on_finish) {
2908 std::lock_guard l(mds_lock);
2909 scrubstack->scrub_abort(on_finish);
2910 }
2911
2912 void MDSRank::command_scrub_pause(Formatter *f, Context *on_finish) {
2913 std::lock_guard l(mds_lock);
2914 scrubstack->scrub_pause(on_finish);
2915 }
2916
2917 void MDSRank::command_scrub_resume(Formatter *f) {
2918 std::lock_guard l(mds_lock);
2919 int r = scrubstack->scrub_resume();
2920
2921 f->open_object_section("result");
2922 f->dump_int("return_code", r);
2923 f->close_section();
2924 }
2925
2926 void MDSRank::command_scrub_status(Formatter *f) {
2927 std::lock_guard l(mds_lock);
2928 scrubstack->scrub_status(f);
2929 }
2930
2931 void MDSRank::command_flush_path(Formatter *f, std::string_view path)
2932 {
2933 C_SaferCond scond;
2934 {
2935 std::lock_guard l(mds_lock);
2936 mdcache->flush_dentry(path, &scond);
2937 }
2938 int r = scond.wait();
2939 f->open_object_section("results");
2940 f->dump_int("return_code", r);
2941 f->close_section(); // results
2942 }
2943
2944 // synchronous wrapper around "journal flush" asynchronous context
2945 // execution.
2946 void MDSRank::command_flush_journal(Formatter *f) {
2947 ceph_assert(f != NULL);
2948
2949 C_SaferCond cond;
2950 CachedStackStringStream css;
2951 {
2952 std::lock_guard locker(mds_lock);
2953 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, css.get(), &cond);
2954 flush_journal->send();
2955 }
2956 int r = cond.wait();
2957
2958 f->open_object_section("result");
2959 f->dump_string("message", css->strv());
2960 f->dump_int("return_code", r);
2961 f->close_section();
2962 }
2963
2964 void MDSRank::command_get_subtrees(Formatter *f)
2965 {
2966 ceph_assert(f != NULL);
2967 std::lock_guard l(mds_lock);
2968
2969 std::vector<CDir*> subtrees;
2970 mdcache->get_subtrees(subtrees);
2971
2972 f->open_array_section("subtrees");
2973 for (const auto& dir : subtrees) {
2974 f->open_object_section("subtree");
2975 {
2976 f->dump_bool("is_auth", dir->is_auth());
2977 f->dump_int("auth_first", dir->get_dir_auth().first);
2978 f->dump_int("auth_second", dir->get_dir_auth().second); {
2979 mds_rank_t export_pin = dir->inode->get_export_pin(false);
2980 f->dump_int("export_pin", export_pin >= 0 ? export_pin : -1);
2981 f->dump_bool("distributed_ephemeral_pin", export_pin == MDS_RANK_EPHEMERAL_DIST);
2982 f->dump_bool("random_ephemeral_pin", export_pin == MDS_RANK_EPHEMERAL_RAND);
2983 }
2984 f->dump_int("export_pin_target", dir->get_export_pin(false));
2985 f->open_object_section("dir");
2986 dir->dump(f);
2987 f->close_section();
2988 }
2989 f->close_section();
2990 }
2991 f->close_section();
2992 }
2993
2994
2995 void MDSRank::command_export_dir(Formatter *f,
2996 std::string_view path,
2997 mds_rank_t target)
2998 {
2999 int r = _command_export_dir(path, target);
3000 f->open_object_section("results");
3001 f->dump_int("return_code", r);
3002 f->close_section(); // results
3003 }
3004
3005 int MDSRank::_command_export_dir(
3006 std::string_view path,
3007 mds_rank_t target)
3008 {
3009 std::lock_guard l(mds_lock);
3010 filepath fp(path);
3011
3012 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
3013 derr << "bad MDS target " << target << dendl;
3014 return -CEPHFS_ENOENT;
3015 }
3016
3017 CInode *in = mdcache->cache_traverse(fp);
3018 if (!in) {
3019 derr << "Bath path '" << path << "'" << dendl;
3020 return -CEPHFS_ENOENT;
3021 }
3022 CDir *dir = in->get_dirfrag(frag_t());
3023 if (!dir || !(dir->is_auth())) {
3024 derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
3025 return -CEPHFS_EINVAL;
3026 }
3027
3028 mdcache->migrator->export_dir(dir, target);
3029 return 0;
3030 }
3031
3032 void MDSRank::command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f)
3033 {
3034 std::string root;
3035 int64_t depth;
3036 cmd_getval(cmdmap, "root", root);
3037 if (!cmd_getval(cmdmap, "depth", depth))
3038 depth = -1;
3039 std::lock_guard l(mds_lock);
3040 CInode *in = mdcache->cache_traverse(filepath(root.c_str()));
3041 if (!in) {
3042 ss << "root inode is not in cache";
3043 return;
3044 }
3045 f->open_array_section("inodes");
3046 mdcache->dump_tree(in, 0, depth, f);
3047 f->close_section();
3048 }
3049
3050 CDir *MDSRank::_command_dirfrag_get(
3051 const cmdmap_t &cmdmap,
3052 std::ostream &ss)
3053 {
3054 std::string path;
3055 bool got = cmd_getval(cmdmap, "path", path);
3056 if (!got) {
3057 ss << "missing path argument";
3058 return NULL;
3059 }
3060
3061 std::string frag_str;
3062 if (!cmd_getval(cmdmap, "frag", frag_str)) {
3063 ss << "missing frag argument";
3064 return NULL;
3065 }
3066
3067 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
3068 if (!in) {
3069 // TODO really we should load something in if it's not in cache,
3070 // but the infrastructure is harder, and we might still be unable
3071 // to act on it if someone else is auth.
3072 ss << "directory '" << path << "' inode not in cache";
3073 return NULL;
3074 }
3075
3076 frag_t fg;
3077
3078 if (!fg.parse(frag_str.c_str())) {
3079 ss << "frag " << frag_str << " failed to parse";
3080 return NULL;
3081 }
3082
3083 CDir *dir = in->get_dirfrag(fg);
3084 if (!dir) {
3085 ss << "frag " << in->ino() << "/" << fg << " not in cache ("
3086 "use `dirfrag ls` to see if it should exist)";
3087 return NULL;
3088 }
3089
3090 if (!dir->is_auth()) {
3091 ss << "frag " << dir->dirfrag() << " not auth (auth = "
3092 << dir->authority() << ")";
3093 return NULL;
3094 }
3095
3096 return dir;
3097 }
3098
3099 bool MDSRank::command_dirfrag_split(
3100 cmdmap_t cmdmap,
3101 std::ostream &ss)
3102 {
3103 std::lock_guard l(mds_lock);
3104 int64_t by = 0;
3105 if (!cmd_getval(cmdmap, "bits", by)) {
3106 ss << "missing bits argument";
3107 return false;
3108 }
3109
3110 if (by <= 0) {
3111 ss << "must split by >0 bits";
3112 return false;
3113 }
3114
3115 CDir *dir = _command_dirfrag_get(cmdmap, ss);
3116 if (!dir) {
3117 return false;
3118 }
3119
3120 mdcache->split_dir(dir, by);
3121
3122 return true;
3123 }
3124
3125 bool MDSRank::command_dirfrag_merge(
3126 cmdmap_t cmdmap,
3127 std::ostream &ss)
3128 {
3129 std::lock_guard l(mds_lock);
3130 std::string path;
3131 bool got = cmd_getval(cmdmap, "path", path);
3132 if (!got) {
3133 ss << "missing path argument";
3134 return false;
3135 }
3136
3137 std::string frag_str;
3138 if (!cmd_getval(cmdmap, "frag", frag_str)) {
3139 ss << "missing frag argument";
3140 return false;
3141 }
3142
3143 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
3144 if (!in) {
3145 ss << "directory '" << path << "' inode not in cache";
3146 return false;
3147 }
3148
3149 frag_t fg;
3150 if (!fg.parse(frag_str.c_str())) {
3151 ss << "frag " << frag_str << " failed to parse";
3152 return false;
3153 }
3154
3155 mdcache->merge_dir(in, fg);
3156
3157 return true;
3158 }
3159
3160 bool MDSRank::command_dirfrag_ls(
3161 cmdmap_t cmdmap,
3162 std::ostream &ss,
3163 Formatter *f)
3164 {
3165 std::lock_guard l(mds_lock);
3166 std::string path;
3167 bool got = cmd_getval(cmdmap, "path", path);
3168 if (!got) {
3169 ss << "missing path argument";
3170 return false;
3171 }
3172
3173 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
3174 if (!in) {
3175 ss << "directory inode not in cache";
3176 return false;
3177 }
3178
3179 f->open_array_section("frags");
3180 frag_vec_t leaves;
3181 // NB using get_leaves_under instead of get_dirfrags to give
3182 // you the list of what dirfrags may exist, not which are in cache
3183 in->dirfragtree.get_leaves_under(frag_t(), leaves);
3184 for (const auto& leaf : leaves) {
3185 f->open_object_section("frag");
3186 f->dump_int("value", leaf.value());
3187 f->dump_int("bits", leaf.bits());
3188 CachedStackStringStream css;
3189 *css << std::hex << leaf.value() << "/" << std::dec << leaf.bits();
3190 f->dump_string("str", css->strv());
3191 f->close_section();
3192 }
3193 f->close_section();
3194
3195 return true;
3196 }
3197
3198 void MDSRank::command_openfiles_ls(Formatter *f)
3199 {
3200 std::lock_guard l(mds_lock);
3201 mdcache->dump_openfiles(f);
3202 }
3203
3204 void MDSRank::command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss)
3205 {
3206 std::lock_guard l(mds_lock);
3207 int64_t number;
3208 bool got = cmd_getval(cmdmap, "number", number);
3209 if (!got) {
3210 ss << "missing inode number";
3211 return;
3212 }
3213
3214 bool success = mdcache->dump_inode(f, number);
3215 if (!success) {
3216 ss << "dump inode failed, wrong inode number or the inode is not cached";
3217 }
3218 }
3219
3220 void MDSRank::dump_status(Formatter *f) const
3221 {
3222 f->dump_string("fs_name", fs_name);
3223 if (state == MDSMap::STATE_REPLAY ||
3224 state == MDSMap::STATE_STANDBY_REPLAY) {
3225 mdlog->dump_replay_status(f);
3226 } else if (state == MDSMap::STATE_RESOLVE) {
3227 mdcache->dump_resolve_status(f);
3228 } else if (state == MDSMap::STATE_RECONNECT) {
3229 server->dump_reconnect_status(f);
3230 } else if (state == MDSMap::STATE_REJOIN) {
3231 mdcache->dump_rejoin_status(f);
3232 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
3233 dump_clientreplay_status(f);
3234 }
3235 f->dump_float("rank_uptime", get_uptime().count());
3236 }
3237
3238 void MDSRank::dump_clientreplay_status(Formatter *f) const
3239 {
3240 f->open_object_section("clientreplay_status");
3241 f->dump_unsigned("clientreplay_queue", replay_queue.size());
3242 f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
3243 f->close_section();
3244 }
3245
3246 void MDSRankDispatcher::update_log_config()
3247 {
3248 map<string,string> log_to_monitors;
3249 map<string,string> log_to_syslog;
3250 map<string,string> log_channel;
3251 map<string,string> log_prio;
3252 map<string,string> log_to_graylog;
3253 map<string,string> log_to_graylog_host;
3254 map<string,string> log_to_graylog_port;
3255 uuid_d fsid;
3256 string host;
3257
3258 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
3259 log_channel, log_prio, log_to_graylog,
3260 log_to_graylog_host, log_to_graylog_port,
3261 fsid, host) == 0)
3262 clog->update_config(log_to_monitors, log_to_syslog,
3263 log_channel, log_prio, log_to_graylog,
3264 log_to_graylog_host, log_to_graylog_port,
3265 fsid, host);
3266 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
3267 }
3268
3269 void MDSRank::create_logger()
3270 {
3271 dout(10) << "create_logger" << dendl;
3272 {
3273 PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
3274
3275 // super useful (high prio) perf stats
3276 mds_plb.add_u64_counter(l_mds_request, "request", "Requests", "req",
3277 PerfCountersBuilder::PRIO_CRITICAL);
3278 mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
3279 PerfCountersBuilder::PRIO_CRITICAL);
3280 mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
3281 PerfCountersBuilder::PRIO_CRITICAL);
3282 mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request", "fwd",
3283 PerfCountersBuilder::PRIO_INTERESTING);
3284 mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
3285 PerfCountersBuilder::PRIO_INTERESTING);
3286 mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes",
3287 "exi", PerfCountersBuilder::PRIO_INTERESTING);
3288 mds_plb.add_u64_counter(l_mds_imported_inodes, "imported_inodes", "Imported inodes",
3289 "imi", PerfCountersBuilder::PRIO_INTERESTING);
3290
3291 // caps msg stats
3292 mds_plb.add_u64_counter(l_mdss_handle_client_caps, "handle_client_caps",
3293 "Client caps msg", "hcc", PerfCountersBuilder::PRIO_INTERESTING);
3294 mds_plb.add_u64_counter(l_mdss_handle_client_caps_dirty, "handle_client_caps_dirty",
3295 "Client dirty caps msg", "hccd", PerfCountersBuilder::PRIO_INTERESTING);
3296 mds_plb.add_u64_counter(l_mdss_handle_client_cap_release, "handle_client_cap_release",
3297 "Client cap release msg", "hccr", PerfCountersBuilder::PRIO_INTERESTING);
3298 mds_plb.add_u64_counter(l_mdss_process_request_cap_release, "process_request_cap_release",
3299 "Process request cap release", "prcr", PerfCountersBuilder::PRIO_INTERESTING);
3300 mds_plb.add_u64_counter(l_mdss_ceph_cap_op_revoke, "ceph_cap_op_revoke",
3301 "Revoke caps", "crev", PerfCountersBuilder::PRIO_INTERESTING);
3302 mds_plb.add_u64_counter(l_mdss_ceph_cap_op_grant, "ceph_cap_op_grant",
3303 "Grant caps", "cgra", PerfCountersBuilder::PRIO_INTERESTING);
3304 mds_plb.add_u64_counter(l_mdss_ceph_cap_op_trunc, "ceph_cap_op_trunc",
3305 "caps truncate notify", "ctru", PerfCountersBuilder::PRIO_INTERESTING);
3306 mds_plb.add_u64_counter(l_mdss_ceph_cap_op_flushsnap_ack, "ceph_cap_op_flushsnap_ack",
3307 "caps truncate notify", "cfsa", PerfCountersBuilder::PRIO_INTERESTING);
3308 mds_plb.add_u64_counter(l_mdss_ceph_cap_op_flush_ack, "ceph_cap_op_flush_ack",
3309 "caps truncate notify", "cfa", PerfCountersBuilder::PRIO_INTERESTING);
3310 mds_plb.add_u64_counter(l_mdss_handle_inode_file_caps, "handle_inode_file_caps",
3311 "Inter mds caps msg", "hifc", PerfCountersBuilder::PRIO_INTERESTING);
3312
3313 // useful dir/inode/subtree stats
3314 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
3315 mds_plb.add_u64(l_mds_root_rfiles, "root_rfiles", "root inode rfiles");
3316 mds_plb.add_u64(l_mds_root_rbytes, "root_rbytes", "root inode rbytes");
3317 mds_plb.add_u64(l_mds_root_rsnaps, "root_rsnaps", "root inode rsnaps");
3318 mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
3319 mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
3320 mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
3321 mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
3322 mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
3323 mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
3324 mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
3325 "Inodes with capabilities");
3326 mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
3327 mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
3328 mds_plb.add_u64_counter(l_mds_openino_dir_fetch, "openino_dir_fetch",
3329 "OpenIno incomplete directory fetchings");
3330
3331 // low prio stats
3332 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
3333 mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
3334 mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
3335 mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
3336 mds_plb.add_u64(
3337 l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
3338 mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
3339 mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
3340 mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
3341 "Traverse forwards");
3342 mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
3343 "Traverse directory discovers");
3344 mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
3345 "Traverse incomplete directory content fetchings");
3346 mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
3347 "Traverse remote dentries");
3348 mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
3349 "Traverse locks");
3350 mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
3351 mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
3352 mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
3353 mds_plb.add_u64_counter(l_mds_openino_backtrace_fetch, "openino_backtrace_fetch",
3354 "OpenIno backtrace fetchings");
3355 mds_plb.add_u64_counter(l_mds_openino_peer_discover, "openino_peer_discover",
3356 "OpenIno peer inode discovers");
3357
3358 // scrub stats
3359 mds_plb.add_u64(l_mds_scrub_backtrace_fetch, "scrub_backtrace_fetch",
3360 "Scrub backtrace fetchings");
3361 mds_plb.add_u64(l_mds_scrub_set_tag, "scrub_set_tag",
3362 "Scrub set tags");
3363 mds_plb.add_u64(l_mds_scrub_backtrace_repaired, "scrub_backtrace_repaired",
3364 "Scrub backtraces repaired");
3365 mds_plb.add_u64(l_mds_scrub_inotable_repaired, "scrub_inotable_repaired",
3366 "Scrub inotable repaired");
3367 mds_plb.add_u64(l_mds_scrub_dir_inodes, "scrub_dir_inodes",
3368 "Scrub directory inodes");
3369 mds_plb.add_u64(l_mds_scrub_dir_base_inodes, "scrub_dir_base_inodes",
3370 "Scrub directory base inodes");
3371 mds_plb.add_u64(l_mds_scrub_dirfrag_rstats, "scrub_dirfrag_rstats",
3372 "Scrub dirfrags rstates");
3373 mds_plb.add_u64(l_mds_scrub_file_inodes, "scrub_file_inodes",
3374 "Scrub file inodes");
3375
3376 logger = mds_plb.create_perf_counters();
3377 g_ceph_context->get_perfcounters_collection()->add(logger);
3378 }
3379
3380 {
3381 PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
3382 mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
3383 PerfCountersBuilder::PRIO_INTERESTING);
3384 mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
3385 PerfCountersBuilder::PRIO_INTERESTING);
3386
3387 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
3388 mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
3389 mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
3390 mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
3391 mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
3392 mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
3393 mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
3394 mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
3395 mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
3396 mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
3397 mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
3398 mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
3399
3400 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
3401 mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
3402
3403 mlogger = mdm_plb.create_perf_counters();
3404 g_ceph_context->get_perfcounters_collection()->add(mlogger);
3405 }
3406
3407 mdlog->create_logger();
3408 server->create_logger();
3409 purge_queue.create_logger();
3410 sessionmap.register_perfcounters();
3411 mdcache->register_perfcounters();
3412 }
3413
3414 void MDSRank::check_ops_in_flight()
3415 {
3416 string summary;
3417 vector<string> warnings;
3418 int slow = 0;
3419 if (op_tracker.check_ops_in_flight(&summary, warnings, &slow)) {
3420 clog->warn() << summary;
3421 for (const auto& warning : warnings) {
3422 clog->warn() << warning;
3423 }
3424 }
3425
3426 // set mds slow request count
3427 mds_slow_req_count = slow;
3428 return;
3429 }
3430
// React to a new OSDMap epoch received by the objecter: notify interested
// components, apply any new client blocklist entries, and keep requesting
// fresh maps.
void MDSRankDispatcher::handle_osd_map()
{
  // Only the rank acting as snap table server checks the OSD map, and only
  // once active.
  if (is_active() &&
      mdsmap->get_tableserver() == whoami) {
    snapserver->check_osd_map(true);
  }

  server->handle_osd_map();

  // Recompute the purge queue's op limit against the current MDSMap.
  purge_queue.update_op_limit(*mdsmap);

  // Drain blocklist events accumulated by the objecter and kill any
  // sessions whose addresses were blocklisted.
  std::set<entity_addr_t> newly_blocklisted;
  objecter->consume_blocklist_events(&newly_blocklisted);
  auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
  dout(4) << "handle_osd_map epoch " << epoch << ", "
          << newly_blocklisted.size() << " new blocklist entries" << dendl;
  auto victims = server->apply_blocklist(newly_blocklisted);
  if (victims) {
    // NOTE(review): raising the barrier presumably forces clients to reach
    // this epoch (and thus observe the blocklist) before OSD I/O — confirm
    // set_osd_epoch_barrier semantics.
    set_osd_epoch_barrier(epoch);
  }


  // By default the objecter only requests OSDMap updates on use,
  // we would like to always receive the latest maps in order to
  // apply policy based on the FULL flag.
  objecter->maybe_request_map();
}
3458
3459 int MDSRank::config_client(int64_t session_id, bool remove,
3460 const std::string& option, const std::string& value,
3461 std::ostream& ss)
3462 {
3463 Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
3464 if (!session) {
3465 ss << "session " << session_id << " not in sessionmap!";
3466 return -CEPHFS_ENOENT;
3467 }
3468
3469 if (option == "timeout") {
3470 if (remove) {
3471 auto it = session->info.client_metadata.find("timeout");
3472 if (it == session->info.client_metadata.end()) {
3473 ss << "Nonexistent config: " << option;
3474 return -CEPHFS_ENODATA;
3475 }
3476 session->info.client_metadata.erase(it);
3477 } else {
3478 char *end;
3479 strtoul(value.c_str(), &end, 0);
3480 if (*end) {
3481 ss << "Invalid config for timeout: " << value;
3482 return -CEPHFS_EINVAL;
3483 }
3484 session->info.client_metadata[option] = value;
3485 }
3486 //sessionmap._mark_dirty(session, true);
3487 } else {
3488 ss << "Invalid config option: " << option;
3489 return -CEPHFS_EINVAL;
3490 }
3491
3492 return 0;
3493 }
3494
/**
 * Evict a client session, optionally blocklisting its address at the OSDs
 * first via a mon command.
 *
 * Must be called with mds_lock held.  `wait` and `on_killed` are mutually
 * exclusive: either block until the kill completes (the lock is dropped
 * and re-taken while waiting) or get the completion via `on_killed`.
 * Returns false if the MDS is replaying or the session is unknown; true
 * otherwise (including the case where the session disappeared while we
 * waited for the blocklist).
 */
bool MDSRank::evict_client(int64_t session_id,
    bool wait, bool blocklist, std::ostream& err_ss,
    Context *on_killed)
{
  ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));

  // Mutually exclusive args
  ceph_assert(!(wait && on_killed != nullptr));

  if (is_any_replay()) {
    err_ss << "MDS is replaying log";
    return false;
  }

  Session *session = sessionmap.get_session(
      entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
  if (!session) {
    err_ss << "session " << session_id << " not in sessionmap!";
    return false;
  }

  auto& addr = session->info.inst.addr;
  {
    CachedStackStringStream css;
    *css << "Evicting " << (blocklist ? "(and blocklisting) " : "")
         << "client session " << session_id << " (" << addr << ")";
    dout(1) << css->strv() << dendl;
    clog->info() << css->strv();
  }

  // Build the "osd blocklist add" mon command for this client's address.
  dout(4) << "Preparing blocklist command... (wait=" << wait << ")" << dendl;
  CachedStackStringStream css;
  *css << "{\"prefix\":\"osd blocklist\", \"blocklistop\":\"add\",";
  *css << "\"addr\":\"";
  *css << addr;
  *css << "\"}";
  std::vector<std::string> cmd = {css->str()};

  // Kill the session (sync or async).  Re-looks the session up by id
  // because it may have been removed while mds_lock was dropped.
  auto kill_client_session = [this, session_id, wait, on_killed](){
    ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
    Session *session = sessionmap.get_session(
        entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
    if (session) {
      if (on_killed || !wait) {
        server->kill_session(session, on_killed);
      } else {
        // Synchronous path: drop mds_lock while waiting so the kill can
        // make progress.
        C_SaferCond on_safe;
        server->kill_session(session, &on_safe);

        mds_lock.unlock();
        on_safe.wait();
        mds_lock.lock();
      }
    } else {
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blocklist" << dendl;

      // Even though it wasn't us that removed it, kick our completion
      // as the session has been removed.
      if (on_killed) {
        on_killed->complete(0);
      }
    }
  };

  // Send the blocklist command to the mons, then (on the finisher, under
  // mds_lock) raise the OSD epoch barrier to the post-blocklist epoch
  // before invoking `fn`.
  auto apply_blocklist = [this, cmd](std::function<void ()> fn){
    ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));

    Context *on_blocklist_done = new LambdaContext([this, fn](int r) {
      objecter->wait_for_latest_osdmap(
          lambdafy((new C_OnFinisher(
            new LambdaContext([this, fn](int r) {
              std::lock_guard l(mds_lock);
              auto epoch = objecter->with_osdmap([](const OSDMap &o){
                return o.get_epoch();
              });

              set_osd_epoch_barrier(epoch);

              fn();
            }), finisher)
          )));
    });

    dout(4) << "Sending mon blocklist command: " << cmd[0] << dendl;
    monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blocklist_done);
  };

  if (wait) {
    if (blocklist) {
      // Block inline until the blocklist round-trip completes; mds_lock is
      // dropped while waiting.
      C_SaferCond inline_ctx;
      apply_blocklist([&inline_ctx](){inline_ctx.complete(0);});
      mds_lock.unlock();
      inline_ctx.wait();
      mds_lock.lock();
    }

    // We dropped mds_lock, so check that session still exists
    session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
                                                   session_id));
    if (!session) {
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blocklist" << dendl;
      return true;
    }
    kill_client_session();
  } else {
    if (blocklist) {
      apply_blocklist(kill_client_session);
    } else {
      kill_client_session();
    }
  }

  return true;
}
3611
/**
 * MDSRankDispatcher constructor.
 *
 * Forwards all arguments to the MDSRank base and additionally registers
 * this object as a config observer, so handle_conf_change() is invoked
 * when any key listed by get_tracked_conf_keys() changes.
 */
MDSRankDispatcher::MDSRankDispatcher(
    mds_rank_t whoami_,
    std::string fs_name_,
    ceph::mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    std::unique_ptr<MDSMap> &mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    MgrClient *mgrc,
    Context *respawn_hook_,
    Context *suicide_hook_,
    boost::asio::io_context& ioc)
  : MDSRank(whoami_, fs_name_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
            msgr, monc_, mgrc, respawn_hook_, suicide_hook_, ioc)
{
  g_conf().add_observer(this);
}
3631
3632 void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
3633 dout(20) << __func__ << dendl;
3634
3635 std::lock_guard locker(mds_lock);
3636 C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
3637 timeout, f, on_finish);
3638 request->send();
3639 }
3640
3641 epoch_t MDSRank::get_osd_epoch() const
3642 {
3643 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
3644 }
3645
3646 const char** MDSRankDispatcher::get_tracked_conf_keys() const
3647 {
3648 static const char* KEYS[] = {
3649 "clog_to_graylog",
3650 "clog_to_graylog_host",
3651 "clog_to_graylog_port",
3652 "clog_to_monitors",
3653 "clog_to_syslog",
3654 "clog_to_syslog_facility",
3655 "clog_to_syslog_level",
3656 "fsid",
3657 "host",
3658 "mds_bal_fragment_dirs",
3659 "mds_bal_fragment_interval",
3660 "mds_cache_memory_limit",
3661 "mds_cache_mid",
3662 "mds_cache_reservation",
3663 "mds_cache_trim_decay_rate",
3664 "mds_cap_revoke_eviction_timeout",
3665 "mds_dump_cache_threshold_file",
3666 "mds_dump_cache_threshold_formatter",
3667 "mds_enable_op_tracker",
3668 "mds_export_ephemeral_random",
3669 "mds_export_ephemeral_random_max",
3670 "mds_export_ephemeral_distributed",
3671 "mds_health_cache_threshold",
3672 "mds_inject_migrator_session_race",
3673 "mds_log_pause",
3674 "mds_max_export_size",
3675 "mds_max_purge_files",
3676 "mds_forward_all_requests_to_auth",
3677 "mds_max_purge_ops",
3678 "mds_max_purge_ops_per_pg",
3679 "mds_max_snaps_per_dir",
3680 "mds_op_complaint_time",
3681 "mds_op_history_duration",
3682 "mds_op_history_size",
3683 "mds_op_log_threshold",
3684 "mds_recall_max_decay_rate",
3685 "mds_recall_warning_decay_rate",
3686 "mds_request_load_average_decay_rate",
3687 "mds_session_cache_liveness_decay_rate",
3688 "mds_heartbeat_grace",
3689 "mds_session_cap_acquisition_decay_rate",
3690 "mds_max_caps_per_client",
3691 "mds_session_cap_acquisition_throttle",
3692 "mds_session_max_caps_throttle_ratio",
3693 "mds_cap_acquisition_throttle_retry_request_time",
3694 "mds_alternate_name_max",
3695 NULL
3696 };
3697 return KEYS;
3698 }
3699
// Config-observer callback: apply changed options.  Simple scalar options
// are applied inline; component-level handlers that need mds_lock are
// deferred to the finisher.
void MDSRankDispatcher::handle_conf_change(const ConfigProxy& conf, const std::set<std::string>& changed)
{
  // XXX with or without mds_lock!

  if (changed.count("mds_heartbeat_grace")) {
    heartbeat_grace = conf.get_val<double>("mds_heartbeat_grace");
  }
  if (changed.count("mds_op_complaint_time") || changed.count("mds_op_log_threshold")) {
    op_tracker.set_complaint_and_threshold(conf->mds_op_complaint_time, conf->mds_op_log_threshold);
  }
  if (changed.count("mds_op_history_size") || changed.count("mds_op_history_duration")) {
    op_tracker.set_history_size_and_duration(conf->mds_op_history_size, conf->mds_op_history_duration);
  }
  if (changed.count("mds_enable_op_tracker")) {
    op_tracker.set_tracking(conf->mds_enable_op_tracker);
  }
  // Any clog-related option change triggers a full re-read of the log
  // client configuration.
  if (changed.count("clog_to_monitors") ||
      changed.count("clog_to_syslog") ||
      changed.count("clog_to_syslog_level") ||
      changed.count("clog_to_syslog_facility") ||
      changed.count("clog_to_graylog") ||
      changed.count("clog_to_graylog_host") ||
      changed.count("clog_to_graylog_port") ||
      changed.count("host") ||
      changed.count("fsid")) {
    update_log_config();
  }

  // Component handlers run later under mds_lock; `changed` is captured by
  // value because this executes asynchronously.
  finisher->queue(new LambdaContext([this, changed](int) {
    std::scoped_lock lock(mds_lock);

    dout(10) << "flushing conf change to components: " << changed << dendl;

    // Un-pausing the log requires kicking blocked submitters.
    if (changed.count("mds_log_pause") && !g_conf()->mds_log_pause) {
      mdlog->kick_submitter();
    }
    sessionmap.handle_conf_change(changed);
    server->handle_conf_change(changed);
    mdcache->handle_conf_change(changed, *mdsmap);
    purge_queue.handle_conf_change(changed, *mdsmap);
  }));
}
3742
3743 void MDSRank::get_task_status(std::map<std::string, std::string> *status) {
3744 dout(20) << __func__ << dendl;
3745
3746 // scrub summary for now..
3747 std::string_view scrub_summary = scrubstack->scrub_summary();
3748 if (!ScrubStack::is_idle(scrub_summary)) {
3749 send_status = true;
3750 status->emplace(SCRUB_STATUS_KEY, std::move(scrub_summary));
3751 }
3752 }
3753
3754 void MDSRank::schedule_update_timer_task() {
3755 dout(20) << __func__ << dendl;
3756
3757 timer.add_event_after(g_conf().get_val<double>("mds_task_status_update_interval"),
3758 new LambdaContext([this](int) {
3759 send_task_status();
3760 }));
3761 }
3762
3763 void MDSRank::send_task_status() {
3764 std::map<std::string, std::string> status;
3765 get_task_status(&status);
3766
3767 if (send_status) {
3768 if (status.empty()) {
3769 send_status = false;
3770 }
3771
3772 dout(20) << __func__ << ": updating " << status.size() << " status keys" << dendl;
3773 int r = mgrc->service_daemon_update_task_status(std::move(status));
3774 if (r < 0) {
3775 derr << ": failed to update service daemon status: " << cpp_strerror(r) << dendl;
3776 }
3777
3778 }
3779
3780 schedule_update_timer_task();
3781 }