]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSRank.cc
6c18068893d36138df16c657ece919811fec7123
[ceph.git] / ceph / src / mds / MDSRank.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <string_view>
16
17 #include "common/debug.h"
18 #include "common/errno.h"
19
20 #include "messages/MClientRequestForward.h"
21 #include "messages/MMDSLoadTargets.h"
22 #include "messages/MMDSTableRequest.h"
23
24 #include "mgr/MgrClient.h"
25
26 #include "MDSDaemon.h"
27 #include "MDSMap.h"
28 #include "SnapClient.h"
29 #include "SnapServer.h"
30 #include "MDBalancer.h"
31 #include "Migrator.h"
32 #include "Locker.h"
33 #include "InoTable.h"
34 #include "mon/MonClient.h"
35 #include "common/HeartbeatMap.h"
36 #include "ScrubStack.h"
37
38
39 #include "MDSRank.h"
40
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_mds
43 #undef dout_prefix
44 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
45 using TOPNSPC::common::cmd_getval;
46 class C_Flush_Journal : public MDSInternalContext {
47 public:
48 C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
49 std::ostream *ss, Context *on_finish)
50 : MDSInternalContext(mds),
51 mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
52 whoami(mds->whoami), incarnation(mds->incarnation) {
53 }
54
55 void send() {
56 assert(ceph_mutex_is_locked(mds->mds_lock));
57
58 dout(20) << __func__ << dendl;
59
60 if (mdcache->is_readonly()) {
61 dout(5) << __func__ << ": read-only FS" << dendl;
62 complete(-EROFS);
63 return;
64 }
65
66 if (!mds->is_active()) {
67 dout(5) << __func__ << ": MDS not active, no-op" << dendl;
68 complete(0);
69 return;
70 }
71
72 flush_mdlog();
73 }
74
75 private:
76
77 void flush_mdlog() {
78 dout(20) << __func__ << dendl;
79
80 // I need to seal off the current segment, and then mark all
81 // previous segments for expiry
82 mdlog->start_new_segment();
83
84 Context *ctx = new LambdaContext([this](int r) {
85 handle_flush_mdlog(r);
86 });
87
88 // Flush initially so that all the segments older than our new one
89 // will be elegible for expiry
90 mdlog->flush();
91 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
92 }
93
94 void handle_flush_mdlog(int r) {
95 dout(20) << __func__ << ": r=" << r << dendl;
96
97 if (r != 0) {
98 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
99 complete(r);
100 return;
101 }
102
103 clear_mdlog();
104 }
105
106 void clear_mdlog() {
107 dout(20) << __func__ << dendl;
108
109 Context *ctx = new LambdaContext([this](int r) {
110 handle_clear_mdlog(r);
111 });
112
113 // Because we may not be the last wait_for_safe context on MDLog,
114 // and subsequent contexts might wake up in the middle of our
115 // later trim_all and interfere with expiry (by e.g. marking
116 // dirs/dentries dirty on previous log segments), we run a second
117 // wait_for_safe here. See #10368
118 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
119 }
120
121 void handle_clear_mdlog(int r) {
122 dout(20) << __func__ << ": r=" << r << dendl;
123
124 if (r != 0) {
125 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
126 complete(r);
127 return;
128 }
129
130 trim_mdlog();
131 }
132
133 void trim_mdlog() {
134 // Put all the old log segments into expiring or expired state
135 dout(5) << __func__ << ": beginning segment expiry" << dendl;
136
137 int ret = mdlog->trim_all();
138 if (ret != 0) {
139 *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
140 complete(ret);
141 return;
142 }
143
144 expire_segments();
145 }
146
147 void expire_segments() {
148 dout(20) << __func__ << dendl;
149
150 // Attach contexts to wait for all expiring segments to expire
151 MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);
152
153 const auto &expiring_segments = mdlog->get_expiring_segments();
154 for (auto p : expiring_segments) {
155 p->wait_for_expiry(expiry_gather->new_sub());
156 }
157 dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
158 << " segments to expire" << dendl;
159
160 if (!expiry_gather->has_subs()) {
161 trim_segments();
162 delete expiry_gather;
163 return;
164 }
165
166 Context *ctx = new LambdaContext([this](int r) {
167 handle_expire_segments(r);
168 });
169 expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
170 expiry_gather->activate();
171 }
172
173 void handle_expire_segments(int r) {
174 dout(20) << __func__ << ": r=" << r << dendl;
175
176 ceph_assert(r == 0); // MDLog is not allowed to raise errors via
177 // wait_for_expiry
178 trim_segments();
179 }
180
181 void trim_segments() {
182 dout(20) << __func__ << dendl;
183
184 Context *ctx = new C_OnFinisher(new LambdaContext([this](int) {
185 std::lock_guard locker(mds->mds_lock);
186 trim_expired_segments();
187 }), mds->finisher);
188 ctx->complete(0);
189 }
190
191 void trim_expired_segments() {
192 dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
193 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
194 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
195
196 // Now everyone I'm interested in is expired
197 mdlog->trim_expired_segments();
198
199 dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
200 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
201 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
202
203 write_journal_head();
204 }
205
206 void write_journal_head() {
207 dout(20) << __func__ << dendl;
208
209 Context *ctx = new LambdaContext([this](int r) {
210 std::lock_guard locker(mds->mds_lock);
211 handle_write_head(r);
212 });
213 // Flush the journal header so that readers will start from after
214 // the flushed region
215 mdlog->get_journaler()->write_head(ctx);
216 }
217
218 void handle_write_head(int r) {
219 if (r != 0) {
220 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
221 } else {
222 dout(5) << __func__ << ": write_head complete, all done!" << dendl;
223 }
224
225 complete(r);
226 }
227
228 void finish(int r) override {
229 dout(20) << __func__ << ": r=" << r << dendl;
230 on_finish->complete(r);
231 }
232
233 MDCache *mdcache;
234 MDLog *mdlog;
235 std::ostream *ss;
236 Context *on_finish;
237
238 // so as to use dout
239 mds_rank_t whoami;
240 int incarnation;
241 };
242
243 class C_Drop_Cache : public MDSInternalContext {
244 public:
245 C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
246 MDSRank *mds, uint64_t recall_timeout,
247 Formatter *f, Context *on_finish)
248 : MDSInternalContext(mds),
249 server(server), mdcache(mdcache), mdlog(mdlog),
250 recall_timeout(recall_timeout), recall_start(mono_clock::now()),
251 f(f), on_finish(on_finish),
252 whoami(mds->whoami), incarnation(mds->incarnation) {
253 }
254
255 void send() {
256 // not really a hard requirement here, but lets ensure this in
257 // case we change the logic here.
258 assert(ceph_mutex_is_locked(mds->mds_lock));
259
260 dout(20) << __func__ << dendl;
261 f->open_object_section("result");
262 recall_client_state();
263 }
264
265 private:
266 // context which completes itself (with -ETIMEDOUT) after a specified
267 // timeout or when explicitly completed, whichever comes first. Note
268 // that the context does not detroy itself after completion -- it
269 // needs to be explicitly freed.
270 class C_ContextTimeout : public MDSInternalContext {
271 public:
272 C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
273 : MDSInternalContext(mds),
274 timeout(timeout),
275 on_finish(on_finish) {
276 }
277 ~C_ContextTimeout() {
278 ceph_assert(timer_task == nullptr);
279 }
280
281 void start_timer() {
282 if (!timeout) {
283 return;
284 }
285
286 timer_task = new LambdaContext([this](int) {
287 timer_task = nullptr;
288 complete(-ETIMEDOUT);
289 });
290 mds->timer.add_event_after(timeout, timer_task);
291 }
292
293 void finish(int r) override {
294 Context *ctx = nullptr;
295 {
296 std::lock_guard locker(lock);
297 std::swap(on_finish, ctx);
298 }
299 if (ctx != nullptr) {
300 ctx->complete(r);
301 }
302 }
303 void complete(int r) override {
304 if (timer_task != nullptr) {
305 mds->timer.cancel_event(timer_task);
306 }
307
308 finish(r);
309 }
310
311 uint64_t timeout;
312 ceph::mutex lock = ceph::make_mutex("mds::context::timeout");
313 Context *on_finish = nullptr;
314 Context *timer_task = nullptr;
315 };
316
317 auto do_trim() {
318 auto [throttled, count] = mdcache->trim(UINT64_MAX);
319 dout(10) << __func__
320 << (throttled ? " (throttled)" : "")
321 << " trimmed " << count << " caps" << dendl;
322 dentries_trimmed += count;
323 return std::make_pair(throttled, count);
324 }
325
326 void recall_client_state() {
327 dout(20) << __func__ << dendl;
328 auto now = mono_clock::now();
329 auto duration = std::chrono::duration<double>(now-recall_start).count();
330
331 MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
332 auto flags = Server::RecallFlags::STEADY|Server::RecallFlags::TRIM;
333 auto [throttled, count] = server->recall_client_state(gather, flags);
334 dout(10) << __func__
335 << (throttled ? " (throttled)" : "")
336 << " recalled " << count << " caps" << dendl;
337
338 caps_recalled += count;
339 if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
340 C_ContextTimeout *ctx = new C_ContextTimeout(
341 mds, 1, new LambdaContext([this](int r) {
342 recall_client_state();
343 }));
344 ctx->start_timer();
345 gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
346 gather->activate();
347 mdlog->flush(); /* use down-time to incrementally flush log */
348 do_trim(); /* use down-time to incrementally trim cache */
349 } else {
350 if (!gather->has_subs()) {
351 delete gather;
352 return handle_recall_client_state(0);
353 } else if (recall_timeout > 0 && duration > recall_timeout) {
354 gather->set_finisher(new C_MDSInternalNoop);
355 gather->activate();
356 return handle_recall_client_state(-ETIMEDOUT);
357 } else {
358 uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
359 C_ContextTimeout *ctx = new C_ContextTimeout(
360 mds, remaining, new LambdaContext([this](int r) {
361 handle_recall_client_state(r);
362 }));
363
364 ctx->start_timer();
365 gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
366 gather->activate();
367 }
368 }
369 }
370
371 void handle_recall_client_state(int r) {
372 dout(20) << __func__ << ": r=" << r << dendl;
373
374 // client recall section
375 f->open_object_section("client_recall");
376 f->dump_int("return_code", r);
377 f->dump_string("message", cpp_strerror(r));
378 f->dump_int("recalled", caps_recalled);
379 f->close_section();
380
381 // we can still continue after recall timeout
382 flush_journal();
383 }
384
385 void flush_journal() {
386 dout(20) << __func__ << dendl;
387
388 Context *ctx = new LambdaContext([this](int r) {
389 handle_flush_journal(r);
390 });
391
392 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
393 flush_journal->send();
394 }
395
396 void handle_flush_journal(int r) {
397 dout(20) << __func__ << ": r=" << r << dendl;
398
399 if (r != 0) {
400 cmd_err(f, ss.str());
401 complete(r);
402 return;
403 }
404
405 // journal flush section
406 f->open_object_section("flush_journal");
407 f->dump_int("return_code", r);
408 f->dump_string("message", ss.str());
409 f->close_section();
410
411 trim_cache();
412 }
413
414 void trim_cache() {
415 dout(20) << __func__ << dendl;
416
417 auto [throttled, count] = do_trim();
418 if (throttled && count > 0) {
419 auto timer = new LambdaContext([this](int) {
420 trim_cache();
421 });
422 mds->timer.add_event_after(1.0, timer);
423 } else {
424 cache_status();
425 }
426 }
427
428 void cache_status() {
429 dout(20) << __func__ << dendl;
430
431 f->open_object_section("trim_cache");
432 f->dump_int("trimmed", dentries_trimmed);
433 f->close_section();
434
435 // cache status section
436 mdcache->cache_status(f);
437
438 complete(0);
439 }
440
441 void finish(int r) override {
442 dout(20) << __func__ << ": r=" << r << dendl;
443
444 auto d = std::chrono::duration<double>(mono_clock::now()-recall_start);
445 f->dump_float("duration", d.count());
446
447 f->close_section();
448 on_finish->complete(r);
449 }
450
451 Server *server;
452 MDCache *mdcache;
453 MDLog *mdlog;
454 uint64_t recall_timeout;
455 mono_time recall_start;
456 Formatter *f;
457 Context *on_finish;
458
459 int retval = 0;
460 std::stringstream ss;
461 uint64_t caps_recalled = 0;
462 uint64_t dentries_trimmed = 0;
463
464 // so as to use dout
465 mds_rank_t whoami;
466 int incarnation;
467
468 void cmd_err(Formatter *f, std::string_view err) {
469 f->reset();
470 f->open_object_section("result");
471 f->dump_string("error", err);
472 f->close_section();
473 }
474 };
475
476 MDSRank::MDSRank(
477 mds_rank_t whoami_,
478 ceph::mutex &mds_lock_,
479 LogChannelRef &clog_,
480 SafeTimer &timer_,
481 Beacon &beacon_,
482 std::unique_ptr<MDSMap>& mdsmap_,
483 Messenger *msgr,
484 MonClient *monc_,
485 MgrClient *mgrc,
486 Context *respawn_hook_,
487 Context *suicide_hook_) :
488 cct(msgr->cct), mds_lock(mds_lock_), clog(clog_),
489 timer(timer_), mdsmap(mdsmap_),
490 objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
491 damage_table(whoami_), sessionmap(this),
492 op_tracker(g_ceph_context, g_conf()->mds_enable_op_tracker,
493 g_conf()->osd_num_op_tracker_shard),
494 progress_thread(this), whoami(whoami_),
495 purge_queue(g_ceph_context, whoami_,
496 mdsmap_->get_metadata_pool(), objecter,
497 new LambdaContext([this](int r) {
498 std::lock_guard l(mds_lock);
499 handle_write_error(r);
500 }
501 )
502 ),
503 beacon(beacon_),
504 messenger(msgr), monc(monc_), mgrc(mgrc),
505 respawn_hook(respawn_hook_),
506 suicide_hook(suicide_hook_),
507 starttime(mono_clock::now())
508 {
509 hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
510
511 purge_queue.update_op_limit(*mdsmap);
512
513 objecter->unset_honor_pool_full();
514
515 finisher = new Finisher(cct, "MDSRank", "MR_Finisher");
516
517 mdcache = new MDCache(this, purge_queue);
518 mdlog = new MDLog(this);
519 balancer = new MDBalancer(this, messenger, monc);
520
521 scrubstack = new ScrubStack(mdcache, clog, finisher);
522
523 inotable = new InoTable(this);
524 snapserver = new SnapServer(this, monc);
525 snapclient = new SnapClient(this);
526
527 server = new Server(this);
528 locker = new Locker(this, mdcache);
529
530 op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
531 cct->_conf->mds_op_log_threshold);
532 op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
533 cct->_conf->mds_op_history_duration);
534
535 schedule_update_timer_task();
536 }
537
538 MDSRank::~MDSRank()
539 {
540 if (hb) {
541 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
542 }
543
544 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
545 if (mdcache) { delete mdcache; mdcache = NULL; }
546 if (mdlog) { delete mdlog; mdlog = NULL; }
547 if (balancer) { delete balancer; balancer = NULL; }
548 if (inotable) { delete inotable; inotable = NULL; }
549 if (snapserver) { delete snapserver; snapserver = NULL; }
550 if (snapclient) { delete snapclient; snapclient = NULL; }
551
552 if (server) { delete server; server = 0; }
553 if (locker) { delete locker; locker = 0; }
554
555 if (logger) {
556 g_ceph_context->get_perfcounters_collection()->remove(logger);
557 delete logger;
558 logger = 0;
559 }
560 if (mlogger) {
561 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
562 delete mlogger;
563 mlogger = 0;
564 }
565
566 delete finisher;
567 finisher = NULL;
568
569 delete suicide_hook;
570 suicide_hook = NULL;
571
572 delete respawn_hook;
573 respawn_hook = NULL;
574
575 delete objecter;
576 objecter = nullptr;
577 }
578
579 void MDSRankDispatcher::init()
580 {
581 objecter->init();
582 messenger->add_dispatcher_head(objecter);
583
584 objecter->start();
585
586 update_log_config();
587 create_logger();
588
589 // Expose the OSDMap (already populated during MDS::init) to anyone
590 // who is interested in it.
591 handle_osd_map();
592
593 progress_thread.create("mds_rank_progr");
594
595 purge_queue.init();
596
597 finisher->start();
598 }
599
600 void MDSRank::update_targets()
601 {
602 // get MonMap's idea of my export_targets
603 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
604
605 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
606
607 bool send = false;
608 set<mds_rank_t> new_map_targets;
609
610 auto it = export_targets.begin();
611 while (it != export_targets.end()) {
612 mds_rank_t rank = it->first;
613 auto &counter = it->second;
614 dout(20) << "export target mds." << rank << " is " << counter << dendl;
615
616 double val = counter.get();
617 if (val <= 0.01) {
618 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
619 export_targets.erase(it++);
620 send = true;
621 continue;
622 }
623 if (!map_targets.count(rank)) {
624 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
625 send = true;
626 }
627 new_map_targets.insert(rank);
628 it++;
629 }
630 if (new_map_targets.size() < map_targets.size()) {
631 dout(15) << "export target map holds stale targets, sending update" << dendl;
632 send = true;
633 }
634
635 if (send) {
636 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
637 auto m = make_message<MMDSLoadTargets>(mds_gid_t(monc->get_global_id()), new_map_targets);
638 monc->send_mon_message(m.detach());
639 }
640 }
641
642 void MDSRank::hit_export_target(mds_rank_t rank, double amount)
643 {
644 double rate = g_conf()->mds_bal_target_decay;
645 if (amount < 0.0) {
646 amount = 100.0/g_conf()->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
647 }
648 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(DecayRate(rate)));
649 auto &counter = em.first->second;
650 counter.hit(amount);
651 if (em.second) {
652 dout(15) << "hit export target (new) is " << counter << dendl;
653 } else {
654 dout(15) << "hit export target is " << counter << dendl;
655 }
656 }
657
658 class C_MDS_MonCommand : public MDSInternalContext {
659 std::string cmd;
660 public:
661 std::string outs;
662 C_MDS_MonCommand(MDSRank *m, std::string_view c)
663 : MDSInternalContext(m), cmd(c) {}
664 void finish(int r) override {
665 mds->_mon_command_finish(r, cmd, outs);
666 }
667 };
668
669 void MDSRank::_mon_command_finish(int r, std::string_view cmd, std::string_view outs)
670 {
671 if (r < 0) {
672 dout(0) << __func__ << ": mon command " << cmd << " failed with errno " << r
673 << " (" << outs << ")" << dendl;
674 } else {
675 dout(1) << __func__ << ": mon command " << cmd << " succeed" << dendl;
676 }
677 }
678
679 void MDSRank::set_mdsmap_multimds_snaps_allowed()
680 {
681 static bool already_sent = false;
682 if (already_sent)
683 return;
684
685 stringstream ss;
686 ss << "{\"prefix\":\"fs set\", \"fs_name\":\"" << mdsmap->get_fs_name() << "\", ";
687 ss << "\"var\":\"allow_multimds_snaps\", \"val\":\"true\", ";
688 ss << "\"confirm\":\"--yes-i-am-really-a-mds\"}";
689 std::vector<std::string> cmd = {ss.str()};
690
691 dout(0) << __func__ << ": sending mon command: " << cmd[0] << dendl;
692
693 C_MDS_MonCommand *fin = new C_MDS_MonCommand(this, cmd[0]);
694 monc->start_mon_command(cmd, {}, nullptr, &fin->outs, new C_IO_Wrapper(this, fin));
695
696 already_sent = true;
697 }
698
699 void MDSRank::mark_base_recursively_scrubbed(inodeno_t ino)
700 {
701 if (mdsmap->get_tableserver() == whoami)
702 snapserver->mark_base_recursively_scrubbed(ino);
703 }
704
705 void MDSRankDispatcher::tick()
706 {
707 heartbeat_reset();
708
709 if (beacon.is_laggy()) {
710 dout(1) << "skipping upkeep work because connection to Monitors appears laggy" << dendl;
711 return;
712 }
713
714 check_ops_in_flight();
715
716 // Wake up thread in case we use to be laggy and have waiting_for_nolaggy
717 // messages to progress.
718 progress_thread.signal();
719
720 // make sure mds log flushes, trims periodically
721 mdlog->flush();
722
723 // update average session uptime
724 sessionmap.update_average_session_age();
725
726 if (is_active() || is_stopping()) {
727 mdlog->trim(); // NOT during recovery!
728 }
729
730 // ...
731 if (is_cache_trimmable()) {
732 server->find_idle_sessions();
733 server->evict_cap_revoke_non_responders();
734 locker->tick();
735 }
736
737 // log
738 if (logger) {
739 logger->set(l_mds_subtrees, mdcache->num_subtrees());
740 mdcache->log_stat();
741 }
742
743 if (is_reconnect())
744 server->reconnect_tick();
745
746 if (is_active()) {
747 balancer->tick();
748 mdcache->find_stale_fragment_freeze();
749 mdcache->migrator->find_stale_export_freeze();
750
751 if (mdsmap->get_tableserver() == whoami) {
752 snapserver->check_osd_map(false);
753 // Filesystem was created by pre-mimic mds. Allow multi-active mds after
754 // all old snapshots are deleted.
755 if (!mdsmap->allows_multimds_snaps() &&
756 snapserver->can_allow_multimds_snaps()) {
757 set_mdsmap_multimds_snaps_allowed();
758 }
759 }
760 }
761
762 if (is_active() || is_stopping()) {
763 update_targets();
764 }
765
766 // shut down?
767 if (is_stopping()) {
768 mdlog->trim();
769 if (mdcache->shutdown_pass()) {
770 uint64_t pq_progress = 0 ;
771 uint64_t pq_total = 0;
772 size_t pq_in_flight = 0;
773 if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
774 dout(7) << "shutdown_pass=true, but still waiting for purge queue"
775 << dendl;
776 // This takes unbounded time, so we must indicate progress
777 // to the administrator: we do it in a slightly imperfect way
778 // by sending periodic (tick frequency) clog messages while
779 // in this state.
780 clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
781 << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
782 << " files purging" << ")";
783 } else {
784 dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
785 "down:stopped" << dendl;
786 stopping_done();
787 }
788 }
789 else {
790 dout(7) << "shutdown_pass=false" << dendl;
791 }
792 }
793
794 // Expose ourselves to Beacon to update health indicators
795 beacon.notify_health(this);
796 }
797
798 void MDSRankDispatcher::shutdown()
799 {
800 // It should never be possible for shutdown to get called twice, because
801 // anyone picking up mds_lock checks if stopping is true and drops
802 // out if it is.
803 ceph_assert(stopping == false);
804 stopping = true;
805
806 dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
807
808 g_conf().remove_observer(this);
809
810 timer.shutdown();
811
812 // MDLog has to shut down before the finisher, because some of its
813 // threads block on IOs that require finisher to complete.
814 mdlog->shutdown();
815
816 // shut down cache
817 mdcache->shutdown();
818
819 purge_queue.shutdown();
820
821 mds_lock.unlock();
822 finisher->stop(); // no flushing
823 mds_lock.lock();
824
825 if (objecter->initialized)
826 objecter->shutdown();
827
828 monc->shutdown();
829
830 op_tracker.on_shutdown();
831
832 progress_thread.shutdown();
833
834 // release mds_lock for finisher/messenger threads (e.g.
835 // MDSDaemon::ms_handle_reset called from Messenger).
836 mds_lock.unlock();
837
838 // shut down messenger
839 messenger->shutdown();
840
841 mds_lock.lock();
842
843 // Workaround unclean shutdown: HeartbeatMap will assert if
844 // worker is not removed (as we do in ~MDS), but ~MDS is not
845 // always called after suicide.
846 if (hb) {
847 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
848 hb = NULL;
849 }
850 }
851
852 /**
853 * Helper for simple callbacks that call a void fn with no args.
854 */
855 class C_MDS_VoidFn : public MDSInternalContext
856 {
857 typedef void (MDSRank::*fn_ptr)();
858 protected:
859 fn_ptr fn;
860 public:
861 C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
862 : MDSInternalContext(mds_), fn(fn_)
863 {
864 ceph_assert(mds_);
865 ceph_assert(fn_);
866 }
867
868 void finish(int r) override
869 {
870 (mds->*fn)();
871 }
872 };
873
874 int64_t MDSRank::get_metadata_pool()
875 {
876 return mdsmap->get_metadata_pool();
877 }
878
879 MDSTableClient *MDSRank::get_table_client(int t)
880 {
881 switch (t) {
882 case TABLE_ANCHOR: return NULL;
883 case TABLE_SNAP: return snapclient;
884 default: ceph_abort();
885 }
886 }
887
888 MDSTableServer *MDSRank::get_table_server(int t)
889 {
890 switch (t) {
891 case TABLE_ANCHOR: return NULL;
892 case TABLE_SNAP: return snapserver;
893 default: ceph_abort();
894 }
895 }
896
897 void MDSRank::suicide()
898 {
899 if (suicide_hook) {
900 suicide_hook->complete(0);
901 suicide_hook = NULL;
902 }
903 }
904
905 void MDSRank::respawn()
906 {
907 if (respawn_hook) {
908 respawn_hook->complete(0);
909 respawn_hook = NULL;
910 }
911 }
912
913 void MDSRank::damaged()
914 {
915 ceph_assert(whoami != MDS_RANK_NONE);
916 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
917
918 beacon.set_want_state(*mdsmap, MDSMap::STATE_DAMAGED);
919 monc->flush_log(); // Flush any clog error from before we were called
920 beacon.notify_health(this); // Include latest status in our swan song
921 beacon.send_and_wait(g_conf()->mds_mon_shutdown_timeout);
922
923 // It's okay if we timed out and the mon didn't get our beacon, because
924 // another daemon (or ourselves after respawn) will eventually take the
925 // rank and report DAMAGED again when it hits same problem we did.
926
927 respawn(); // Respawn into standby in case mon has other work for us
928 }
929
930 void MDSRank::damaged_unlocked()
931 {
932 std::lock_guard l(mds_lock);
933 damaged();
934 }
935
936 void MDSRank::handle_write_error(int err)
937 {
938 if (err == -EBLACKLISTED) {
939 derr << "we have been blacklisted (fenced), respawning..." << dendl;
940 respawn();
941 return;
942 }
943
944 if (g_conf()->mds_action_on_write_error >= 2) {
945 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
946 respawn();
947 } else if (g_conf()->mds_action_on_write_error == 1) {
948 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
949 mdcache->force_readonly();
950 } else {
951 // ignore;
952 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
953 }
954 }
955
956 void *MDSRank::ProgressThread::entry()
957 {
958 std::unique_lock l(mds->mds_lock);
959 while (true) {
960 cond.wait(l, [this] {
961 return (mds->stopping ||
962 !mds->finished_queue.empty() ||
963 (!mds->waiting_for_nolaggy.empty() && !mds->beacon.is_laggy()));
964 });
965
966 if (mds->stopping) {
967 break;
968 }
969
970 mds->_advance_queues();
971 }
972
973 return NULL;
974 }
975
976
977 void MDSRank::ProgressThread::shutdown()
978 {
979 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
980 ceph_assert(mds->stopping);
981
982 if (am_self()) {
983 // Stopping is set, we will fall out of our main loop naturally
984 } else {
985 // Kick the thread to notice mds->stopping, and join it
986 cond.notify_all();
987 mds->mds_lock.unlock();
988 if (is_started())
989 join();
990 mds->mds_lock.lock();
991 }
992 }
993
994 bool MDSRankDispatcher::ms_dispatch(const cref_t<Message> &m)
995 {
996 if (m->get_source().is_client()) {
997 Session *session = static_cast<Session*>(m->get_connection()->get_priv().get());
998 if (session)
999 session->last_seen = Session::clock::now();
1000 }
1001
1002 inc_dispatch_depth();
1003 bool ret = _dispatch(m, true);
1004 dec_dispatch_depth();
1005 return ret;
1006 }
1007
1008 bool MDSRank::_dispatch(const cref_t<Message> &m, bool new_msg)
1009 {
1010 if (is_stale_message(m)) {
1011 return true;
1012 }
1013
1014 if (beacon.is_laggy()) {
1015 dout(5) << " laggy, deferring " << *m << dendl;
1016 waiting_for_nolaggy.push_back(m);
1017 } else if (new_msg && !waiting_for_nolaggy.empty()) {
1018 dout(5) << " there are deferred messages, deferring " << *m << dendl;
1019 waiting_for_nolaggy.push_back(m);
1020 } else {
1021 if (!handle_deferrable_message(m)) {
1022 return false;
1023 }
1024
1025 heartbeat_reset();
1026 }
1027
1028 if (dispatch_depth > 1)
1029 return true;
1030
1031 // finish any triggered contexts
1032 _advance_queues();
1033
1034 if (beacon.is_laggy()) {
1035 // We've gone laggy during dispatch, don't do any
1036 // more housekeeping
1037 return true;
1038 }
1039
1040 // hack: thrash exports
1041 static utime_t start;
1042 utime_t now = ceph_clock_now();
1043 if (start == utime_t())
1044 start = now;
1045 /*double el = now - start;
1046 if (el > 30.0 &&
1047 el < 60.0)*/
1048 for (int i=0; i<g_conf()->mds_thrash_exports; i++) {
1049 set<mds_rank_t> s;
1050 if (!is_active()) break;
1051 mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
1052 if (s.size() < 2 || CInode::count() < 10)
1053 break; // need peers for this to work.
1054 if (mdcache->migrator->get_num_exporting() > g_conf()->mds_thrash_exports * 5 ||
1055 mdcache->migrator->get_export_queue_size() > g_conf()->mds_thrash_exports * 10)
1056 break;
1057
1058 dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf()->mds_thrash_exports << dendl;
1059
1060 // pick a random dir inode
1061 CInode *in = mdcache->hack_pick_random_inode();
1062
1063 auto&& ls = in->get_dirfrags();
1064 if (!ls.empty()) { // must be an open dir.
1065 const auto& dir = ls[rand() % ls.size()];
1066 if (!dir->get_parent_dir()) continue; // must be linked.
1067 if (!dir->is_auth()) continue; // must be auth.
1068
1069 mds_rank_t dest;
1070 do {
1071 int k = rand() % s.size();
1072 set<mds_rank_t>::iterator p = s.begin();
1073 while (k--) ++p;
1074 dest = *p;
1075 } while (dest == whoami);
1076 mdcache->migrator->export_dir_nicely(dir,dest);
1077 }
1078 }
1079 // hack: thrash fragments
1080 for (int i=0; i<g_conf()->mds_thrash_fragments; i++) {
1081 if (!is_active()) break;
1082 if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf()->mds_thrash_fragments) break;
1083 dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf()->mds_thrash_fragments << dendl;
1084
1085 // pick a random dir inode
1086 CInode *in = mdcache->hack_pick_random_inode();
1087
1088 auto&& ls = in->get_dirfrags();
1089 if (ls.empty()) continue; // must be an open dir.
1090 CDir *dir = ls.front();
1091 if (!dir->get_parent_dir()) continue; // must be linked.
1092 if (!dir->is_auth()) continue; // must be auth.
1093 frag_t fg = dir->get_frag();
1094 if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
1095 mdcache->split_dir(dir, 1);
1096 } else {
1097 balancer->queue_merge(dir);
1098 }
1099 }
1100
1101 // hack: force hash root?
1102 /*
1103 if (false &&
1104 mdcache->get_root() &&
1105 mdcache->get_root()->dir &&
1106 !(mdcache->get_root()->dir->is_hashed() ||
1107 mdcache->get_root()->dir->is_hashing())) {
1108 dout(0) << "hashing root" << dendl;
1109 mdcache->migrator->hash_dir(mdcache->get_root()->dir);
1110 }
1111 */
1112
1113 update_mlogger();
1114 return true;
1115 }
1116
1117 void MDSRank::update_mlogger()
1118 {
1119 if (mlogger) {
1120 mlogger->set(l_mdm_ino, CInode::count());
1121 mlogger->set(l_mdm_dir, CDir::count());
1122 mlogger->set(l_mdm_dn, CDentry::count());
1123 mlogger->set(l_mdm_cap, Capability::count());
1124 mlogger->set(l_mdm_inoa, CInode::increments());
1125 mlogger->set(l_mdm_inos, CInode::decrements());
1126 mlogger->set(l_mdm_dira, CDir::increments());
1127 mlogger->set(l_mdm_dirs, CDir::decrements());
1128 mlogger->set(l_mdm_dna, CDentry::increments());
1129 mlogger->set(l_mdm_dns, CDentry::decrements());
1130 mlogger->set(l_mdm_capa, Capability::increments());
1131 mlogger->set(l_mdm_caps, Capability::decrements());
1132 }
1133 }
1134
1135 /*
1136 * lower priority messages we defer if we seem laggy
1137 */
1138 bool MDSRank::handle_deferrable_message(const cref_t<Message> &m)
1139 {
1140 int port = m->get_type() & 0xff00;
1141
1142 switch (port) {
1143 case MDS_PORT_CACHE:
1144 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1145 mdcache->dispatch(m);
1146 break;
1147
1148 case MDS_PORT_MIGRATOR:
1149 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1150 mdcache->migrator->dispatch(m);
1151 break;
1152
1153 default:
1154 switch (m->get_type()) {
1155 // SERVER
1156 case CEPH_MSG_CLIENT_SESSION:
1157 case CEPH_MSG_CLIENT_RECONNECT:
1158 case CEPH_MSG_CLIENT_RECLAIM:
1159 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
1160 // fall-thru
1161 case CEPH_MSG_CLIENT_REQUEST:
1162 server->dispatch(m);
1163 break;
1164 case MSG_MDS_SLAVE_REQUEST:
1165 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1166 server->dispatch(m);
1167 break;
1168
1169 case MSG_MDS_HEARTBEAT:
1170 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1171 balancer->proc_message(m);
1172 break;
1173
1174 case MSG_MDS_TABLE_REQUEST:
1175 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1176 {
1177 const cref_t<MMDSTableRequest> &req = ref_cast<MMDSTableRequest>(m);
1178 if (req->op < 0) {
1179 MDSTableClient *client = get_table_client(req->table);
1180 client->handle_request(req);
1181 } else {
1182 MDSTableServer *server = get_table_server(req->table);
1183 server->handle_request(req);
1184 }
1185 }
1186 break;
1187
1188 case MSG_MDS_LOCK:
1189 case MSG_MDS_INODEFILECAPS:
1190 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1191 locker->dispatch(m);
1192 break;
1193
1194 case CEPH_MSG_CLIENT_CAPS:
1195 case CEPH_MSG_CLIENT_CAPRELEASE:
1196 case CEPH_MSG_CLIENT_LEASE:
1197 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
1198 locker->dispatch(m);
1199 break;
1200
1201 default:
1202 return false;
1203 }
1204 }
1205
1206 return true;
1207 }
1208
1209 /**
1210 * Advance finished_queue and waiting_for_nolaggy.
1211 *
1212 * Usually drain both queues, but may not drain waiting_for_nolaggy
1213 * if beacon is currently laggy.
1214 */
1215 void MDSRank::_advance_queues()
1216 {
1217 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
1218
1219 if (!finished_queue.empty()) {
1220 dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
1221 while (!finished_queue.empty()) {
1222 auto fin = finished_queue.front();
1223 finished_queue.pop_front();
1224
1225 dout(10) << " finish " << fin << dendl;
1226 fin->complete(0);
1227
1228 heartbeat_reset();
1229 }
1230 }
1231
1232 while (!waiting_for_nolaggy.empty()) {
1233 // stop if we're laggy now!
1234 if (beacon.is_laggy())
1235 break;
1236
1237 cref_t<Message> old = waiting_for_nolaggy.front();
1238 waiting_for_nolaggy.pop_front();
1239
1240 if (!is_stale_message(old)) {
1241 dout(7) << " processing laggy deferred " << *old << dendl;
1242 if (!handle_deferrable_message(old)) {
1243 dout(0) << "unrecognized message " << *old << dendl;
1244 }
1245 }
1246
1247 heartbeat_reset();
1248 }
1249 }
1250
1251 /**
1252 * Call this when you take mds_lock, or periodically if you're going to
1253 * hold the lock for a long time (e.g. iterating over clients/inodes)
1254 */
1255 void MDSRank::heartbeat_reset()
1256 {
1257 // Any thread might jump into mds_lock and call us immediately
1258 // after a call to suicide() completes, in which case MDSRank::hb
1259 // has been freed and we are a no-op.
1260 if (!hb) {
1261 ceph_assert(stopping);
1262 return;
1263 }
1264
1265 // NB not enabling suicide grace, because the mon takes care of killing us
1266 // (by blacklisting us) when we fail to send beacons, and it's simpler to
1267 // only have one way of dying.
1268 auto grace = g_conf().get_val<double>("mds_heartbeat_grace");
1269 g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
1270 }
1271
1272 bool MDSRank::is_stale_message(const cref_t<Message> &m) const
1273 {
1274 // from bad mds?
1275 if (m->get_source().is_mds()) {
1276 mds_rank_t from = mds_rank_t(m->get_source().num());
1277 bool bad = false;
1278 if (mdsmap->is_down(from)) {
1279 bad = true;
1280 } else {
1281 // FIXME: this is a convoluted check. we should be maintaining a nice
1282 // clean map of current ConnectionRefs for current mdses!!!
1283 auto c = messenger->connect_to(CEPH_ENTITY_TYPE_MDS,
1284 mdsmap->get_addrs(from));
1285 if (c != m->get_connection()) {
1286 bad = true;
1287 dout(5) << " mds." << from << " should be " << c << " "
1288 << c->get_peer_addrs() << " but this message is "
1289 << m->get_connection() << " " << m->get_source_addrs()
1290 << dendl;
1291 }
1292 }
1293 if (bad) {
1294 // bogus mds?
1295 if (m->get_type() == CEPH_MSG_MDS_MAP) {
1296 dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
1297 << ", but it's an mdsmap, looking at it" << dendl;
1298 } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
1299 mdsmap->get_addrs(from) == m->get_source_addrs()) {
1300 dout(5) << "got " << *m << " from down mds " << m->get_source()
1301 << ", but it's a cache_expire, looking at it" << dendl;
1302 } else {
1303 dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
1304 << ", dropping" << dendl;
1305 return true;
1306 }
1307 }
1308 }
1309 return false;
1310 }
1311
1312 Session *MDSRank::get_session(const cref_t<Message> &m)
1313 {
1314 // do not carry ref
1315 auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
1316 if (session) {
1317 dout(20) << "get_session have " << session << " " << session->info.inst
1318 << " state " << session->get_state_name() << dendl;
1319 // Check if we've imported an open session since (new sessions start closed)
1320 if (session->is_closed()) {
1321 Session *imported_session = sessionmap.get_session(session->info.inst.name);
1322 if (imported_session && imported_session != session) {
1323 dout(10) << __func__ << " replacing connection bootstrap session "
1324 << session << " with imported session " << imported_session
1325 << dendl;
1326 imported_session->info.auth_name = session->info.auth_name;
1327 //assert(session->info.auth_name == imported_session->info.auth_name);
1328 ceph_assert(session->info.inst == imported_session->info.inst);
1329 imported_session->set_connection(session->get_connection().get());
1330 // send out any queued messages
1331 while (!session->preopen_out_queue.empty()) {
1332 imported_session->get_connection()->send_message2(std::move(session->preopen_out_queue.front()));
1333 session->preopen_out_queue.pop_front();
1334 }
1335 imported_session->auth_caps = session->auth_caps;
1336 imported_session->last_seen = session->last_seen;
1337 ceph_assert(session->get_nref() == 1);
1338 imported_session->get_connection()->set_priv(imported_session->get());
1339 session = imported_session;
1340 }
1341 }
1342 } else {
1343 dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
1344 }
1345 return session;
1346 }
1347
1348 void MDSRank::send_message(const ref_t<Message>& m, const ConnectionRef& c)
1349 {
1350 ceph_assert(c);
1351 c->send_message2(m);
1352 }
1353
1354
1355 void MDSRank::send_message_mds(const ref_t<Message>& m, mds_rank_t mds)
1356 {
1357 if (!mdsmap->is_up(mds)) {
1358 dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
1359 return;
1360 }
1361
1362 // send mdsmap first?
1363 if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
1364 auto _m = make_message<MMDSMap>(monc->get_fsid(), *mdsmap);
1365 messenger->send_to_mds(_m.detach(), mdsmap->get_addrs(mds));
1366 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
1367 }
1368
1369 // send message
1370 messenger->send_to_mds(ref_t<Message>(m).detach(), mdsmap->get_addrs(mds));
1371 }
1372
1373 void MDSRank::forward_message_mds(const cref_t<MClientRequest>& m, mds_rank_t mds)
1374 {
1375 ceph_assert(mds != whoami);
1376
1377 /*
1378 * don't actually forward if non-idempotent!
1379 * client has to do it. although the MDS will ignore duplicate requests,
1380 * the affected metadata may migrate, in which case the new authority
1381 * won't have the metareq_id in the completed request map.
1382 */
1383 // NEW: always make the client resend!
1384 bool client_must_resend = true; //!creq->can_forward();
1385
1386 // tell the client where it should go
1387 auto session = get_session(m);
1388 auto f = make_message<MClientRequestForward>(m->get_tid(), mds, m->get_num_fwd()+1, client_must_resend);
1389 send_message_client(f, session);
1390 }
1391
1392 void MDSRank::send_message_client_counted(const ref_t<Message>& m, client_t client)
1393 {
1394 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
1395 if (session) {
1396 send_message_client_counted(m, session);
1397 } else {
1398 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
1399 }
1400 }
1401
1402 void MDSRank::send_message_client_counted(const ref_t<Message>& m, const ConnectionRef& connection)
1403 {
1404 // do not carry ref
1405 auto session = static_cast<Session *>(connection->get_priv().get());
1406 if (session) {
1407 send_message_client_counted(m, session);
1408 } else {
1409 dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
1410 // another Connection took over the Session
1411 }
1412 }
1413
1414 void MDSRank::send_message_client_counted(const ref_t<Message>& m, Session* session)
1415 {
1416 version_t seq = session->inc_push_seq();
1417 dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
1418 << seq << " " << *m << dendl;
1419 if (session->get_connection()) {
1420 session->get_connection()->send_message2(m);
1421 } else {
1422 session->preopen_out_queue.push_back(m);
1423 }
1424 }
1425
1426 void MDSRank::send_message_client(const ref_t<Message>& m, Session* session)
1427 {
1428 dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
1429 if (session->get_connection()) {
1430 session->get_connection()->send_message2(m);
1431 } else {
1432 session->preopen_out_queue.push_back(m);
1433 }
1434 }
1435
1436 /**
1437 * This is used whenever a RADOS operation has been cancelled
1438 * or a RADOS client has been blacklisted, to cause the MDS and
1439 * any clients to wait for this OSD epoch before using any new caps.
1440 *
1441 * See doc/cephfs/eviction
1442 */
1443 void MDSRank::set_osd_epoch_barrier(epoch_t e)
1444 {
1445 dout(4) << __func__ << ": epoch=" << e << dendl;
1446 osd_epoch_barrier = e;
1447 }
1448
1449 void MDSRank::retry_dispatch(const cref_t<Message> &m)
1450 {
1451 inc_dispatch_depth();
1452 _dispatch(m, false);
1453 dec_dispatch_depth();
1454 }
1455
1456 double MDSRank::get_dispatch_queue_max_age(utime_t now) const
1457 {
1458 return messenger->get_dispatch_queue_max_age(now);
1459 }
1460
1461 bool MDSRank::is_daemon_stopping() const
1462 {
1463 return stopping;
1464 }
1465
1466 void MDSRank::request_state(MDSMap::DaemonState s)
1467 {
1468 dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
1469 beacon.set_want_state(*mdsmap, s);
1470 beacon.send();
1471 }
1472
1473
1474 class C_MDS_BootStart : public MDSInternalContext {
1475 MDSRank::BootStep nextstep;
1476 public:
1477 C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
1478 : MDSInternalContext(m), nextstep(n) {}
1479 void finish(int r) override {
1480 mds->boot_start(nextstep, r);
1481 }
1482 };
1483
1484
1485 void MDSRank::boot_start(BootStep step, int r)
1486 {
1487 // Handle errors from previous step
1488 if (r < 0) {
1489 if (is_standby_replay() && (r == -EAGAIN)) {
1490 dout(0) << "boot_start encountered an error EAGAIN"
1491 << ", respawning since we fell behind journal" << dendl;
1492 respawn();
1493 } else if (r == -EINVAL || r == -ENOENT) {
1494 // Invalid or absent data, indicates damaged on-disk structures
1495 clog->error() << "Error loading MDS rank " << whoami << ": "
1496 << cpp_strerror(r);
1497 damaged();
1498 ceph_assert(r == 0); // Unreachable, damaged() calls respawn()
1499 } else if (r == -EROFS) {
1500 dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
1501 } else {
1502 // Completely unexpected error, give up and die
1503 dout(0) << "boot_start encountered an error, failing" << dendl;
1504 suicide();
1505 return;
1506 }
1507 }
1508
1509 ceph_assert(is_starting() || is_any_replay());
1510
1511 switch(step) {
1512 case MDS_BOOT_INITIAL:
1513 {
1514 mdcache->init_layouts();
1515
1516 MDSGatherBuilder gather(g_ceph_context,
1517 new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
1518 dout(2) << "Booting: " << step << ": opening inotable" << dendl;
1519 inotable->set_rank(whoami);
1520 inotable->load(gather.new_sub());
1521
1522 dout(2) << "Booting: " << step << ": opening sessionmap" << dendl;
1523 sessionmap.set_rank(whoami);
1524 sessionmap.load(gather.new_sub());
1525
1526 dout(2) << "Booting: " << step << ": opening mds log" << dendl;
1527 mdlog->open(gather.new_sub());
1528
1529 if (is_starting()) {
1530 dout(2) << "Booting: " << step << ": opening purge queue" << dendl;
1531 purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
1532 } else if (!standby_replaying) {
1533 dout(2) << "Booting: " << step << ": opening purge queue (async)" << dendl;
1534 purge_queue.open(NULL);
1535 dout(2) << "Booting: " << step << ": loading open file table (async)" << dendl;
1536 mdcache->open_file_table.load(nullptr);
1537 }
1538
1539 if (mdsmap->get_tableserver() == whoami) {
1540 dout(2) << "Booting: " << step << ": opening snap table" << dendl;
1541 snapserver->set_rank(whoami);
1542 snapserver->load(gather.new_sub());
1543 }
1544
1545 gather.activate();
1546 }
1547 break;
1548 case MDS_BOOT_OPEN_ROOT:
1549 {
1550 dout(2) << "Booting: " << step << ": loading/discovering base inodes" << dendl;
1551
1552 MDSGatherBuilder gather(g_ceph_context,
1553 new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
1554
1555 if (is_starting()) {
1556 // load mydir frag for the first log segment (creating subtree map)
1557 mdcache->open_mydir_frag(gather.new_sub());
1558 } else {
1559 mdcache->open_mydir_inode(gather.new_sub());
1560 }
1561
1562 mdcache->create_global_snaprealm();
1563
1564 if (whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
1565 mdcache->open_root_inode(gather.new_sub());
1566 } else if (is_any_replay()) {
1567 // replay. make up fake root inode to start with
1568 mdcache->create_root_inode();
1569 }
1570 gather.activate();
1571 }
1572 break;
1573 case MDS_BOOT_PREPARE_LOG:
1574 if (is_any_replay()) {
1575 dout(2) << "Booting: " << step << ": replaying mds log" << dendl;
1576 MDSGatherBuilder gather(g_ceph_context,
1577 new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1578
1579 if (!standby_replaying) {
1580 dout(2) << "Booting: " << step << ": waiting for purge queue recovered" << dendl;
1581 purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
1582 }
1583
1584 mdlog->replay(gather.new_sub());
1585 gather.activate();
1586 } else {
1587 dout(2) << "Booting: " << step << ": positioning at end of old mds log" << dendl;
1588 mdlog->append();
1589 starting_done();
1590 }
1591 break;
1592 case MDS_BOOT_REPLAY_DONE:
1593 ceph_assert(is_any_replay());
1594
1595 // Sessiontable and inotable should be in sync after replay, validate
1596 // that they are consistent.
1597 validate_sessions();
1598
1599 replay_done();
1600 break;
1601 }
1602 }
1603
1604 void MDSRank::validate_sessions()
1605 {
1606 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
1607 bool valid = true;
1608
1609 // Identify any sessions which have state inconsistent with other,
1610 // after they have been loaded from rados during startup.
1611 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1612 for (const auto &i : sessionmap.get_sessions()) {
1613 Session *session = i.second;
1614 interval_set<inodeno_t> badones;
1615 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1616 clog->error() << "client " << *session
1617 << "loaded with preallocated inodes that are inconsistent with inotable";
1618 valid = false;
1619 }
1620 }
1621
1622 if (!valid) {
1623 damaged();
1624 ceph_assert(valid);
1625 }
1626 }
1627
1628 void MDSRank::starting_done()
1629 {
1630 dout(3) << "starting_done" << dendl;
1631 ceph_assert(is_starting());
1632 request_state(MDSMap::STATE_ACTIVE);
1633
1634 mdlog->start_new_segment();
1635
1636 // sync snaptable cache
1637 snapclient->sync(new C_MDSInternalNoop);
1638 }
1639
1640
1641 void MDSRank::calc_recovery_set()
1642 {
1643 // initialize gather sets
1644 set<mds_rank_t> rs;
1645 mdsmap->get_recovery_mds_set(rs);
1646 rs.erase(whoami);
1647 mdcache->set_recovery_set(rs);
1648
1649 dout(1) << " recovery set is " << rs << dendl;
1650 }
1651
1652
1653 void MDSRank::replay_start()
1654 {
1655 dout(1) << "replay_start" << dendl;
1656
1657 if (is_standby_replay())
1658 standby_replaying = true;
1659
1660 calc_recovery_set();
1661
1662 // Check if we need to wait for a newer OSD map before starting
1663 Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
1664 bool const ready = objecter->wait_for_map(
1665 mdsmap->get_last_failure_osd_epoch(),
1666 fin);
1667
1668 if (ready) {
1669 delete fin;
1670 boot_start();
1671 } else {
1672 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1673 << " (which blacklists prior instance)" << dendl;
1674 }
1675 }
1676
1677
1678 class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
1679 uint64_t old_read_pos;
1680 public:
1681 C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
1682 MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
1683 void finish(int r) override {
1684 mds->_standby_replay_restart_finish(r, old_read_pos);
1685 }
1686 void print(ostream& out) const override {
1687 out << "standby_replay_restart";
1688 }
1689 };
1690
1691 void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
1692 {
1693 if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
1694 dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
1695 respawn(); /* we're too far back, and this is easier than
1696 trying to reset everything in the cache, etc */
1697 } else {
1698 mdlog->standby_trim_segments();
1699 boot_start(MDS_BOOT_PREPARE_LOG, r);
1700 }
1701 }
1702
1703 class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
1704 public:
1705 explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
1706 void finish(int r) override {
1707 ceph_assert(!r);
1708 mds->standby_replay_restart();
1709 }
1710 };
1711
1712 void MDSRank::standby_replay_restart()
1713 {
1714 if (standby_replaying) {
1715 /* Go around for another pass of replaying in standby */
1716 dout(5) << "Restarting replay as standby-replay" << dendl;
1717 mdlog->get_journaler()->reread_head_and_probe(
1718 new C_MDS_StandbyReplayRestartFinish(
1719 this,
1720 mdlog->get_journaler()->get_read_pos()));
1721 } else {
1722 /* We are transitioning out of standby: wait for OSD map update
1723 before making final pass */
1724 dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
1725 Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
1726 bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
1727 if (ready) {
1728 delete fin;
1729 mdlog->get_journaler()->reread_head_and_probe(
1730 new C_MDS_StandbyReplayRestartFinish(
1731 this,
1732 mdlog->get_journaler()->get_read_pos()));
1733
1734 dout(1) << " opening purge_queue (async)" << dendl;
1735 purge_queue.open(NULL);
1736 dout(1) << " opening open_file_table (async)" << dendl;
1737 mdcache->open_file_table.load(nullptr);
1738 } else {
1739 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1740 << " (which blacklists prior instance)" << dendl;
1741 }
1742 }
1743 }
1744
1745 void MDSRank::replay_done()
1746 {
1747 if (!standby_replaying) {
1748 dout(1) << "Finished replaying journal" << dendl;
1749 } else {
1750 dout(5) << "Finished replaying journal as standby-replay" << dendl;
1751 }
1752
1753 if (is_standby_replay()) {
1754 // The replay was done in standby state, and we are still in that state
1755 ceph_assert(standby_replaying);
1756 dout(10) << "setting replay timer" << dendl;
1757 timer.add_event_after(g_conf()->mds_replay_interval,
1758 new C_MDS_StandbyReplayRestart(this));
1759 return;
1760 } else if (standby_replaying) {
1761 // The replay was done in standby state, we have now _left_ that state
1762 dout(10) << " last replay pass was as a standby; making final pass" << dendl;
1763 standby_replaying = false;
1764 standby_replay_restart();
1765 return;
1766 } else {
1767 // Replay is complete, journal read should be up to date
1768 ceph_assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
1769 ceph_assert(!is_standby_replay());
1770
1771 // Reformat and come back here
1772 if (mdlog->get_journaler()->get_stream_format() < g_conf()->mds_journal_format) {
1773 dout(4) << "reformatting journal on standby-replay->replay transition" << dendl;
1774 mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1775 return;
1776 }
1777 }
1778
1779 dout(1) << "making mds journal writeable" << dendl;
1780 mdlog->get_journaler()->set_writeable();
1781 mdlog->get_journaler()->trim_tail();
1782
1783 if (mdsmap->get_tableserver() == whoami &&
1784 snapserver->upgrade_format()) {
1785 dout(1) << "upgrading snaptable format" << dendl;
1786 snapserver->save(new C_MDSInternalNoop);
1787 }
1788
1789 if (g_conf()->mds_wipe_sessions) {
1790 dout(1) << "wiping out client sessions" << dendl;
1791 sessionmap.wipe();
1792 sessionmap.save(new C_MDSInternalNoop);
1793 }
1794 if (g_conf()->mds_wipe_ino_prealloc) {
1795 dout(1) << "wiping out ino prealloc from sessions" << dendl;
1796 sessionmap.wipe_ino_prealloc();
1797 sessionmap.save(new C_MDSInternalNoop);
1798 }
1799 if (g_conf()->mds_skip_ino) {
1800 inodeno_t i = g_conf()->mds_skip_ino;
1801 dout(1) << "skipping " << i << " inodes" << dendl;
1802 inotable->skip_inos(i);
1803 inotable->save(new C_MDSInternalNoop);
1804 }
1805
1806 if (mdsmap->get_num_in_mds() == 1 &&
1807 mdsmap->get_num_failed_mds() == 0) { // just me!
1808 dout(2) << "i am alone, moving to state reconnect" << dendl;
1809 request_state(MDSMap::STATE_RECONNECT);
1810 // sync snaptable cache
1811 snapclient->sync(new C_MDSInternalNoop);
1812 } else {
1813 dout(2) << "i am not alone, moving to state resolve" << dendl;
1814 request_state(MDSMap::STATE_RESOLVE);
1815 }
1816 }
1817
1818 void MDSRank::reopen_log()
1819 {
1820 dout(1) << "reopen_log" << dendl;
1821 mdcache->rollback_uncommitted_fragments();
1822 }
1823
1824 void MDSRank::resolve_start()
1825 {
1826 dout(1) << "resolve_start" << dendl;
1827
1828 reopen_log();
1829
1830 mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
1831 finish_contexts(g_ceph_context, waiting_for_resolve);
1832 }
1833
1834 void MDSRank::resolve_done()
1835 {
1836 dout(1) << "resolve_done" << dendl;
1837 request_state(MDSMap::STATE_RECONNECT);
1838 // sync snaptable cache
1839 snapclient->sync(new C_MDSInternalNoop);
1840 }
1841
1842 void MDSRank::reconnect_start()
1843 {
1844 dout(1) << "reconnect_start" << dendl;
1845
1846 if (last_state == MDSMap::STATE_REPLAY) {
1847 reopen_log();
1848 }
1849
1850 // Drop any blacklisted clients from the SessionMap before going
1851 // into reconnect, so that we don't wait for them.
1852 objecter->enable_blacklist_events();
1853 std::set<entity_addr_t> blacklist;
1854 epoch_t epoch = 0;
1855 objecter->with_osdmap([&blacklist, &epoch](const OSDMap& o) {
1856 o.get_blacklist(&blacklist);
1857 epoch = o.get_epoch();
1858 });
1859 auto killed = server->apply_blacklist(blacklist);
1860 dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
1861 << blacklist.size() << " blacklist entries, "
1862 << sessionmap.get_sessions().size() << ")" << dendl;
1863 if (killed) {
1864 set_osd_epoch_barrier(epoch);
1865 }
1866
1867 server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
1868 finish_contexts(g_ceph_context, waiting_for_reconnect);
1869 }
1870 void MDSRank::reconnect_done()
1871 {
1872 dout(1) << "reconnect_done" << dendl;
1873 request_state(MDSMap::STATE_REJOIN); // move to rejoin state
1874 }
1875
1876 void MDSRank::rejoin_joint_start()
1877 {
1878 dout(1) << "rejoin_joint_start" << dendl;
1879 mdcache->rejoin_send_rejoins();
1880 }
1881 void MDSRank::rejoin_start()
1882 {
1883 dout(1) << "rejoin_start" << dendl;
1884 mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
1885 finish_contexts(g_ceph_context, waiting_for_rejoin);
1886 }
1887 void MDSRank::rejoin_done()
1888 {
1889 dout(1) << "rejoin_done" << dendl;
1890 mdcache->show_subtrees();
1891 mdcache->show_cache();
1892
1893 // funny case: is our cache empty? no subtrees?
1894 if (!mdcache->is_subtrees()) {
1895 if (whoami == 0) {
1896 // The root should always have a subtree!
1897 clog->error() << "No subtrees found for root MDS rank!";
1898 damaged();
1899 ceph_assert(mdcache->is_subtrees());
1900 } else {
1901 dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
1902 request_state(MDSMap::STATE_STOPPED);
1903 }
1904 return;
1905 }
1906
1907 if (replay_queue.empty() && !server->get_num_pending_reclaim()) {
1908 request_state(MDSMap::STATE_ACTIVE);
1909 } else {
1910 replaying_requests_done = replay_queue.empty();
1911 request_state(MDSMap::STATE_CLIENTREPLAY);
1912 }
1913 }
1914
1915 void MDSRank::clientreplay_start()
1916 {
1917 dout(1) << "clientreplay_start" << dendl;
1918 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1919 mdcache->start_files_to_recover();
1920 queue_one_replay();
1921 }
1922
1923 bool MDSRank::queue_one_replay()
1924 {
1925 if (!replay_queue.empty()) {
1926 queue_waiter(replay_queue.front());
1927 replay_queue.pop_front();
1928 return true;
1929 }
1930 if (!replaying_requests_done) {
1931 replaying_requests_done = true;
1932 mdlog->flush();
1933 }
1934 maybe_clientreplay_done();
1935 return false;
1936 }
1937
1938 void MDSRank::maybe_clientreplay_done()
1939 {
1940 if (is_clientreplay() && get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
1941
1942 // don't go to active if there are session waiting for being reclaimed
1943 if (replaying_requests_done && !server->get_num_pending_reclaim()) {
1944 mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1945 return;
1946 }
1947
1948 dout(1) << " still have " << replay_queue.size() + (int)!replaying_requests_done
1949 << " requests need to be replayed, " << server->get_num_pending_reclaim()
1950 << " sessions need to be reclaimed" << dendl;
1951 }
1952 }
1953
1954 void MDSRank::clientreplay_done()
1955 {
1956 dout(1) << "clientreplay_done" << dendl;
1957 request_state(MDSMap::STATE_ACTIVE);
1958 }
1959
1960 void MDSRank::active_start()
1961 {
1962 dout(1) << "active_start" << dendl;
1963
1964 if (last_state == MDSMap::STATE_CREATING ||
1965 last_state == MDSMap::STATE_STARTING) {
1966 mdcache->open_root();
1967 }
1968
1969 mdcache->clean_open_file_lists();
1970 mdcache->export_remaining_imported_caps();
1971 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1972 mdcache->start_files_to_recover();
1973
1974 mdcache->reissue_all_caps();
1975
1976 finish_contexts(g_ceph_context, waiting_for_active); // kick waiters
1977 }
1978
1979 void MDSRank::recovery_done(int oldstate)
1980 {
1981 dout(1) << "recovery_done -- successful recovery!" << dendl;
1982 ceph_assert(is_clientreplay() || is_active());
1983
1984 if (oldstate == MDSMap::STATE_CREATING)
1985 return;
1986
1987 mdcache->start_recovered_truncates();
1988 mdcache->start_purge_inodes();
1989 mdcache->do_file_recover();
1990
1991 // tell connected clients
1992 //bcast_mds_map(); // not anymore, they get this from the monitor
1993
1994 mdcache->populate_mydir();
1995 }
1996
1997 void MDSRank::creating_done()
1998 {
1999 dout(1)<< "creating_done" << dendl;
2000 request_state(MDSMap::STATE_ACTIVE);
2001 // sync snaptable cache
2002 snapclient->sync(new C_MDSInternalNoop);
2003 }
2004
2005 void MDSRank::boot_create()
2006 {
2007 dout(3) << "boot_create" << dendl;
2008
2009 MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));
2010
2011 mdcache->init_layouts();
2012
2013 inotable->set_rank(whoami);
2014 sessionmap.set_rank(whoami);
2015
2016 // start with a fresh journal
2017 dout(10) << "boot_create creating fresh journal" << dendl;
2018 mdlog->create(fin.new_sub());
2019
2020 // open new journal segment, but do not journal subtree map (yet)
2021 mdlog->prepare_new_segment();
2022
2023 if (whoami == mdsmap->get_root()) {
2024 dout(3) << "boot_create creating fresh hierarchy" << dendl;
2025 mdcache->create_empty_hierarchy(fin.get());
2026 }
2027
2028 dout(3) << "boot_create creating mydir hierarchy" << dendl;
2029 mdcache->create_mydir_hierarchy(fin.get());
2030
2031 dout(3) << "boot_create creating global snaprealm" << dendl;
2032 mdcache->create_global_snaprealm();
2033
2034 // fixme: fake out inotable (reset, pretend loaded)
2035 dout(10) << "boot_create creating fresh inotable table" << dendl;
2036 inotable->reset();
2037 inotable->save(fin.new_sub());
2038
2039 // write empty sessionmap
2040 sessionmap.save(fin.new_sub());
2041
2042 // Create empty purge queue
2043 purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));
2044
2045 // initialize tables
2046 if (mdsmap->get_tableserver() == whoami) {
2047 dout(10) << "boot_create creating fresh snaptable" << dendl;
2048 snapserver->set_rank(whoami);
2049 snapserver->reset();
2050 snapserver->save(fin.new_sub());
2051 }
2052
2053 ceph_assert(g_conf()->mds_kill_create_at != 1);
2054
2055 // ok now journal it
2056 mdlog->journal_segment_subtree_map(fin.new_sub());
2057 mdlog->flush();
2058
2059 // Usually we do this during reconnect, but creation skips that.
2060 objecter->enable_blacklist_events();
2061
2062 fin.activate();
2063 }
2064
2065 void MDSRank::stopping_start()
2066 {
2067 dout(2) << "Stopping..." << dendl;
2068
2069 if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
2070 std::vector<Session*> victims;
2071 const auto& sessions = sessionmap.get_sessions();
2072 for (const auto& p : sessions) {
2073 if (!p.first.is_client()) {
2074 continue;
2075 }
2076
2077 Session *s = p.second;
2078 victims.push_back(s);
2079 }
2080
2081 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2082 ceph_assert(!victims.empty());
2083
2084 C_GatherBuilder gather(g_ceph_context, new C_MDSInternalNoop);
2085 for (const auto &s : victims) {
2086 std::stringstream ss;
2087 evict_client(s->get_client().v, false,
2088 g_conf()->mds_session_blacklist_on_evict, ss, gather.new_sub());
2089 }
2090 gather.activate();
2091 }
2092
2093 mdcache->shutdown_start();
2094 }
2095
2096 void MDSRank::stopping_done()
2097 {
2098 dout(2) << "Finished stopping..." << dendl;
2099
2100 // tell monitor we shut down cleanly.
2101 request_state(MDSMap::STATE_STOPPED);
2102 }
2103
// Process an MDSMap update while this daemon holds a rank.
//
// In order, this function:
//  - refreshes `state` (and, on a state change, `last_state` and
//    `incarnation`) from `mdsmap` using this daemon's global id;
//  - records the sender's map epoch when the message came from an MDS;
//  - respawns on an invalid state transition;
//  - compares `oldmap` with `mdsmap` to detect peers that went down,
//    restarted (incarnation change), recovered, or stopped, and notifies the
//    relevant subsystems;
//  - runs the entry hook for our own new state when it changed;
//  - wakes contexts waiting on map epochs and on cluster recovery.
//
// m      - the MMDSMap message; only its epoch and source are read here.
// oldmap - the previously held map, used as the baseline for comparisons.
//
// NOTE(review): `mdsmap` is presumably already pointing at the new map when
// this is called (the code reads the new state from it) — confirm in caller.
2104 void MDSRankDispatcher::handle_mds_map(
2105 const cref_t<MMDSMap> &m,
2106 const MDSMap &oldmap)
2107 {
2108 // I am only to be passed MDSMaps in which I hold a rank
2109 ceph_assert(whoami != MDS_RANK_NONE);
2110
2111 MDSMap::DaemonState oldstate = state;
2112 mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
2113 state = mdsmap->get_state_gid(mds_gid);
2114 if (state != oldstate) {
2115 last_state = oldstate;
2116 incarnation = mdsmap->get_inc_gid(mds_gid);
2117 }
2118
2119 version_t epoch = m->get_epoch();
2120
2121 // note source's map version
2122 if (m->get_source().is_mds() &&
2123 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
2124 dout(15) << " peer " << m->get_source()
2125 << " has mdsmap epoch >= " << epoch
2126 << dendl;
2127 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
2128 }
2129
2130 // Validate state transitions while I hold a rank
2131 if (!MDSMap::state_transition_valid(oldstate, state)) {
2132 derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
2133 << "->" << ceph_mds_state_name(state) << dendl;
2134 respawn();
2135 }
2136
2137 if (oldstate != state) {
2138 // update messenger.
// In standby-replay the messenger name is built from our gid; in every
// other state it is built from the rank we hold.
2139 if (state == MDSMap::STATE_STANDBY_REPLAY) {
2140 dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
2141 << " replaying mds." << whoami << "." << incarnation << dendl;
2142 messenger->set_myname(entity_name_t::MDS(mds_gid));
2143 } else {
2144 dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
2145 messenger->set_myname(entity_name_t::MDS(whoami));
2146 }
2147 }
2148
2149 // tell objecter my incarnation
2150 if (objecter->get_client_incarnation() != incarnation)
2151 objecter->set_client_incarnation(incarnation);
2152
2153 if (mdsmap->get_min_compat_client() < ceph_release_t::max &&
2154 oldmap.get_min_compat_client() != mdsmap->get_min_compat_client())
2155 server->update_required_client_features();
2156
2157 // for debug
2158 if (g_conf()->mds_dump_cache_on_map)
2159 mdcache->dump_cache();
2160
2161 cluster_degraded = mdsmap->is_degraded();
2162
2163 // mdsmap and oldmap can be discontinuous. failover might happen in the missing mdsmap.
2164 // the 'restart' set tracks ranks that have restarted since the old mdsmap
2165 set<mds_rank_t> restart;
2166 // replaying mds does not communicate with other ranks
2167 if (state >= MDSMap::STATE_RESOLVE) {
2168 // did someone fail?
2169 // new down?
2170 set<mds_rank_t> olddown, down;
2171 oldmap.get_down_mds_set(&olddown);
2172 mdsmap->get_down_mds_set(&down);
2173 for (const auto& r : down) {
2174 if (oldmap.have_inst(r) && olddown.count(r) == 0) {
2175 messenger->mark_down_addrs(oldmap.get_addrs(r));
2176 handle_mds_failure(r);
2177 }
2178 }
2179
2180 // did someone fail?
2181 // did their addr/inst change?
2182 set<mds_rank_t> up;
2183 mdsmap->get_up_mds_set(up);
2184 for (const auto& r : up) {
2185 auto& info = mdsmap->get_info(r);
2186 if (oldmap.have_inst(r)) {
2187 auto& oldinfo = oldmap.get_info(r);
// A different incarnation number for the same rank is treated as a restart.
2188 if (info.inc != oldinfo.inc) {
2189 messenger->mark_down_addrs(oldinfo.get_addrs());
2190 if (info.state == MDSMap::STATE_REPLAY ||
2191 info.state == MDSMap::STATE_RESOLVE) {
2192 restart.insert(r);
2193 handle_mds_failure(r);
2194 } else {
2195 ceph_assert(info.state == MDSMap::STATE_STARTING ||
2196 info.state == MDSMap::STATE_ACTIVE);
2197 // -> stopped (missing) -> starting -> active
2198 restart.insert(r);
2199 mdcache->migrator->handle_mds_failure_or_stop(r);
2200 if (mdsmap->get_tableserver() == whoami)
2201 snapserver->handle_mds_failure_or_stop(r);
2202 }
2203 }
2204 } else {
2205 if (info.state == MDSMap::STATE_REPLAY ||
2206 info.state == MDSMap::STATE_RESOLVE) {
2207 // -> starting/creating (missing) -> active (missing) -> replay -> resolve
2208 restart.insert(r);
2209 handle_mds_failure(r);
2210 } else {
2211 ceph_assert(info.state == MDSMap::STATE_CREATING ||
2212 info.state == MDSMap::STATE_STARTING ||
2213 info.state == MDSMap::STATE_ACTIVE);
2214 }
2215 }
2216 }
2217 }
2218
2219 // did it change?
2220 if (oldstate != state) {
2221 dout(1) << "handle_mds_map state change "
2222 << ceph_mds_state_name(oldstate) << " --> "
2223 << ceph_mds_state_name(state) << dendl;
2224 beacon.set_want_state(*mdsmap, state);
2225
2226 if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
2227 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
2228 assert (state == MDSMap::STATE_REPLAY);
2229 } else {
2230 // did i just recover?
2231 if ((is_active() || is_clientreplay()) &&
2232 (oldstate == MDSMap::STATE_CREATING ||
2233 oldstate == MDSMap::STATE_REJOIN ||
2234 oldstate == MDSMap::STATE_RECONNECT))
2235 recovery_done(oldstate);
2236
// Run the entry hook matching the state we just moved into.
2237 if (is_active()) {
2238 active_start();
2239 } else if (is_any_replay()) {
2240 replay_start();
2241 } else if (is_resolve()) {
2242 resolve_start();
2243 } else if (is_reconnect()) {
2244 reconnect_start();
2245 } else if (is_rejoin()) {
2246 rejoin_start();
2247 } else if (is_clientreplay()) {
2248 clientreplay_start();
2249 } else if (is_creating()) {
2250 boot_create();
2251 } else if (is_starting()) {
2252 boot_start();
2253 } else if (is_stopping()) {
2254 ceph_assert(oldstate == MDSMap::STATE_ACTIVE);
2255 stopping_start();
2256 }
2257 }
2258 }
2259
2260 // RESOLVE
2261 // is someone else newly resolving?
2262 if (state >= MDSMap::STATE_RESOLVE) {
2263 // recover snaptable
2264 if (mdsmap->get_tableserver() == whoami) {
2265 if (oldstate < MDSMap::STATE_RESOLVE) {
2266 set<mds_rank_t> s;
2267 mdsmap->get_mds_set_lower_bound(s, MDSMap::STATE_RESOLVE);
2268 snapserver->finish_recovery(s);
2269 } else {
2270 set<mds_rank_t> old_set, new_set;
2271 oldmap.get_mds_set_lower_bound(old_set, MDSMap::STATE_RESOLVE);
2272 mdsmap->get_mds_set_lower_bound(new_set, MDSMap::STATE_RESOLVE);
2273 for (const auto& r : new_set) {
2274 if (r == whoami)
2275 continue; // not me
2276 if (!old_set.count(r) || restart.count(r)) { // newly so?
2277 snapserver->handle_mds_recovery(r);
2278 }
2279 }
2280 }
2281 }
2282
2283 if ((!oldmap.is_resolving() || !restart.empty()) && mdsmap->is_resolving()) {
2284 set<mds_rank_t> resolve;
2285 mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
2286 dout(10) << " resolve set is " << resolve << dendl;
2287 calc_recovery_set();
2288 mdcache->send_resolves();
2289 }
2290 }
2291
2292 // REJOIN
2293 // is everybody finally rejoining?
2294 if (state >= MDSMap::STATE_REJOIN) {
2295 // did we start?
2296 if (!oldmap.is_rejoining() && mdsmap->is_rejoining())
2297 rejoin_joint_start();
2298
2299 // did we finish?
2300 if (g_conf()->mds_dump_cache_after_rejoin &&
2301 oldmap.is_rejoining() && !mdsmap->is_rejoining())
2302 mdcache->dump_cache(); // for DEBUG only
2303
2304 if (oldstate >= MDSMap::STATE_REJOIN ||
2305 oldstate == MDSMap::STATE_STARTING) {
2306 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
2307 set<mds_rank_t> olddis, dis;
2308 oldmap.get_mds_set_lower_bound(olddis, MDSMap::STATE_REJOIN);
2309 mdsmap->get_mds_set_lower_bound(dis, MDSMap::STATE_REJOIN);
2310 for (const auto& r : dis) {
2311 if (r == whoami)
2312 continue; // not me
2313 if (!olddis.count(r) || restart.count(r)) { // newly so?
2314 mdcache->kick_discovers(r);
2315 mdcache->kick_open_ino_peers(r);
2316 }
2317 }
2318 }
2319 }
2320
// Waiters filed under MDS_RANK_NONE are released once the map is no longer
// degraded and we are at least ACTIVE.
2321 if (oldmap.is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
2322 dout(1) << "cluster recovered." << dendl;
2323 auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
2324 if (it != waiting_for_active_peer.end()) {
2325 queue_waiters(it->second);
2326 waiting_for_active_peer.erase(it);
2327 }
2328 }
2329
2330 // did someone go active?
2331 if (state >= MDSMap::STATE_CLIENTREPLAY &&
2332 oldstate >= MDSMap::STATE_CLIENTREPLAY) {
2333 set<mds_rank_t> oldactive, active;
2334 oldmap.get_mds_set_lower_bound(oldactive, MDSMap::STATE_CLIENTREPLAY);
2335 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
2336 for (const auto& r : active) {
2337 if (r == whoami)
2338 continue; // not me
2339 if (!oldactive.count(r) || restart.count(r)) // newly so?
2340 handle_mds_recovery(r);
2341 }
2342 }
2343
2344 if (is_clientreplay() || is_active() || is_stopping()) {
2345 // did anyone stop?
2346 set<mds_rank_t> oldstopped, stopped;
2347 oldmap.get_stopped_mds_set(oldstopped);
2348 mdsmap->get_stopped_mds_set(stopped);
2349 for (const auto& r : stopped)
2350 if (oldstopped.count(r) == 0) { // newly so?
2351 mdcache->migrator->handle_mds_failure_or_stop(r);
2352 if (mdsmap->get_tableserver() == whoami)
2353 snapserver->handle_mds_failure_or_stop(r);
2354 }
2355 }
2356
// Wake every context waiting for an epoch up to and including the current
// map's epoch; each entry is erased before its waiters are queued.
2357 {
2358 map<epoch_t,MDSContext::vec >::iterator p = waiting_for_mdsmap.begin();
2359 while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
2360 MDSContext::vec ls;
2361 ls.swap(p->second);
2362 waiting_for_mdsmap.erase(p++);
2363 queue_waiters(ls);
2364 }
2365 }
2366
2367 if (is_active()) {
2368 // Before going active, set OSD epoch barrier to latest (so that
2369 // we don't risk handing out caps to clients with old OSD maps that
2370 // might not include barriers from the previous incarnation of this MDS)
2371 set_osd_epoch_barrier(objecter->with_osdmap(
2372 std::mem_fn(&OSDMap::get_epoch)));
2373
2374 /* Now check if we should hint to the OSD that a read may follow */
2375 if (mdsmap->has_standby_replay(whoami))
2376 mdlog->set_write_iohint(0);
2377 else
2378 mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
2379 }
2380
2381 if (oldmap.get_max_mds() != mdsmap->get_max_mds()) {
2382 purge_queue.update_op_limit(*mdsmap);
2383 }
2384
2385 if (mdsmap->get_inline_data_enabled() && !oldmap.get_inline_data_enabled())
2386 dout(0) << "WARNING: inline_data support has been deprecated and will be removed in a future release" << dendl;
2387
// A scrub that is in progress is aborted whenever the map's max_mds is
// greater than 1; the abort completion is a no-op context.
2388 if (scrubstack->is_scrubbing()) {
2389 if (mdsmap->get_max_mds() > 1) {
2390 auto c = new C_MDSInternalNoop;
2391 scrubstack->scrub_abort(c);
2392 }
2393 }
2394 mdcache->handle_mdsmap(*mdsmap);
2395 }
2396
2397 void MDSRank::handle_mds_recovery(mds_rank_t who)
2398 {
2399 dout(5) << "handle_mds_recovery mds." << who << dendl;
2400
2401 mdcache->handle_mds_recovery(who);
2402
2403 queue_waiters(waiting_for_active_peer[who]);
2404 waiting_for_active_peer.erase(who);
2405 }
2406
2407 void MDSRank::handle_mds_failure(mds_rank_t who)
2408 {
2409 if (who == whoami) {
2410 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
2411 return;
2412 }
2413 dout(5) << "handle_mds_failure mds." << who << dendl;
2414
2415 mdcache->handle_mds_failure(who);
2416
2417 if (mdsmap->get_tableserver() == whoami)
2418 snapserver->handle_mds_failure_or_stop(who);
2419
2420 snapclient->handle_mds_failure(who);
2421 }
2422
2423 void MDSRankDispatcher::handle_asok_command(
2424 std::string_view command,
2425 const cmdmap_t& cmdmap,
2426 Formatter *f,
2427 const bufferlist &inbl,
2428 std::function<void(int,const std::string&,bufferlist&)> on_finish)
2429 {
2430 int r = 0;
2431 stringstream ss;
2432 bufferlist outbl;
2433 if (command == "dump_ops_in_flight" ||
2434 command == "ops") {
2435 if (!op_tracker.dump_ops_in_flight(f)) {
2436 ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
2437 }
2438 } else if (command == "dump_blocked_ops") {
2439 if (!op_tracker.dump_ops_in_flight(f, true)) {
2440 ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
2441 }
2442 } else if (command == "dump_historic_ops") {
2443 if (!op_tracker.dump_historic_ops(f)) {
2444 ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
2445 }
2446 } else if (command == "dump_historic_ops_by_duration") {
2447 if (!op_tracker.dump_historic_ops(f, true)) {
2448 ss << "op_tracker disabled; set mds_enable_op_tracker=true to enable";
2449 }
2450 } else if (command == "osdmap barrier") {
2451 int64_t target_epoch = 0;
2452 bool got_val = cmd_getval(cmdmap, "target_epoch", target_epoch);
2453
2454 if (!got_val) {
2455 ss << "no target epoch given";
2456 r = -EINVAL;
2457 goto out;
2458 }
2459 {
2460 std::lock_guard l(mds_lock);
2461 set_osd_epoch_barrier(target_epoch);
2462 }
2463 C_SaferCond cond;
2464 bool already_got = objecter->wait_for_map(target_epoch, &cond);
2465 if (!already_got) {
2466 dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
2467 cond.wait();
2468 }
2469 } else if (command == "session ls" ||
2470 command == "client ls") {
2471 std::lock_guard l(mds_lock);
2472 std::vector<std::string> filter_args;
2473 cmd_getval(cmdmap, "filters", filter_args);
2474 SessionFilter filter;
2475 r = filter.parse(filter_args, &ss);
2476 if (r != 0) {
2477 goto out;
2478 }
2479 dump_sessions(filter, f);
2480 } else if (command == "session evict" ||
2481 command == "client evict") {
2482 std::lock_guard l(mds_lock);
2483 std::vector<std::string> filter_args;
2484 cmd_getval(cmdmap, "filters", filter_args);
2485
2486 SessionFilter filter;
2487 r = filter.parse(filter_args, &ss);
2488 if (r != 0) {
2489 r = -EINVAL;
2490 goto out;
2491 }
2492 evict_clients(filter, on_finish);
2493 return;
2494 } else if (command == "session kill") {
2495 std::string client_id;
2496 if (!cmd_getval(cmdmap, "client_id", client_id)) {
2497 ss << "Invalid client_id specified";
2498 r = -ENOENT;
2499 goto out;
2500 }
2501 std::lock_guard l(mds_lock);
2502 bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
2503 g_conf()->mds_session_blacklist_on_evict, ss);
2504 if (!evicted) {
2505 dout(15) << ss.str() << dendl;
2506 r = -ENOENT;
2507 }
2508 } else if (command == "session config" ||
2509 command == "client config") {
2510 int64_t client_id;
2511 std::string option;
2512 std::string value;
2513
2514 cmd_getval(cmdmap, "client_id", client_id);
2515 cmd_getval(cmdmap, "option", option);
2516 bool got_value = cmd_getval(cmdmap, "value", value);
2517
2518 std::lock_guard l(mds_lock);
2519 r = config_client(client_id, !got_value, option, value, ss);
2520 } else if (command == "scrub start" ||
2521 command == "scrub_start") {
2522 string path;
2523 string tag;
2524 vector<string> scrubop_vec;
2525 cmd_getval(cmdmap, "scrubops", scrubop_vec);
2526 cmd_getval(cmdmap, "path", path);
2527 cmd_getval(cmdmap, "tag", tag);
2528
2529 /* Multiple MDS scrub is not currently supported. See also: https://tracker.ceph.com/issues/12274 */
2530 if (mdsmap->get_max_mds() > 1) {
2531 ss << "Scrub is not currently supported for multiple active MDS. Please reduce max_mds to 1 and then scrub.";
2532 r = -EINVAL;
2533 goto out;
2534 }
2535
2536 finisher->queue(
2537 new LambdaContext(
2538 [this, on_finish, f, path, tag, scrubop_vec](int r) {
2539 command_scrub_start(
2540 f, path, tag, scrubop_vec,
2541 new LambdaContext(
2542 [on_finish](int r) {
2543 bufferlist outbl;
2544 on_finish(r, {}, outbl);
2545 }));
2546 }));
2547 return;
2548 } else if (command == "scrub abort") {
2549 finisher->queue(
2550 new LambdaContext(
2551 [this, on_finish, f](int r) {
2552 command_scrub_abort(
2553 f,
2554 new LambdaContext(
2555 [on_finish, f](int r) {
2556 bufferlist outbl;
2557 f->open_object_section("result");
2558 f->dump_int("return_code", r);
2559 f->close_section();
2560 on_finish(r, {}, outbl);
2561 }));
2562 }));
2563 return;
2564 } else if (command == "scrub pause") {
2565 finisher->queue(
2566 new LambdaContext(
2567 [this, on_finish, f](int r) {
2568 command_scrub_pause(
2569 f,
2570 new LambdaContext(
2571 [on_finish, f](int r) {
2572 bufferlist outbl;
2573 f->open_object_section("result");
2574 f->dump_int("return_code", r);
2575 f->close_section();
2576 on_finish(r, {}, outbl);
2577 }));
2578 }));
2579 return;
2580 } else if (command == "scrub resume") {
2581 command_scrub_resume(f);
2582 } else if (command == "scrub status") {
2583 command_scrub_status(f);
2584 } else if (command == "tag path") {
2585 string path;
2586 cmd_getval(cmdmap, "path", path);
2587 string tag;
2588 cmd_getval(cmdmap, "tag", tag);
2589 command_tag_path(f, path, tag);
2590 } else if (command == "flush_path") {
2591 string path;
2592 cmd_getval(cmdmap, "path", path);
2593 command_flush_path(f, path);
2594 } else if (command == "flush journal") {
2595 command_flush_journal(f);
2596 } else if (command == "get subtrees") {
2597 command_get_subtrees(f);
2598 } else if (command == "export dir") {
2599 string path;
2600 if(!cmd_getval(cmdmap, "path", path)) {
2601 ss << "malformed path";
2602 r = -EINVAL;
2603 goto out;
2604 }
2605 int64_t rank;
2606 if(!cmd_getval(cmdmap, "rank", rank)) {
2607 ss << "malformed rank";
2608 r = -EINVAL;
2609 goto out;
2610 }
2611 command_export_dir(f, path, (mds_rank_t)rank);
2612 } else if (command == "dump cache") {
2613 std::lock_guard l(mds_lock);
2614 string path;
2615 if (!cmd_getval(cmdmap, "path", path)) {
2616 r = mdcache->dump_cache(f);
2617 } else {
2618 r = mdcache->dump_cache(path);
2619 }
2620 } else if (command == "cache drop") {
2621 int64_t timeout = 0;
2622 cmd_getval(cmdmap, "timeout", timeout);
2623 finisher->queue(
2624 new LambdaContext(
2625 [this, on_finish, f, timeout](int r) {
2626 command_cache_drop(
2627 timeout, f,
2628 new LambdaContext(
2629 [on_finish](int r) {
2630 bufferlist outbl;
2631 on_finish(r, {}, outbl);
2632 }));
2633 }));
2634 return;
2635 } else if (command == "cache status") {
2636 std::lock_guard l(mds_lock);
2637 mdcache->cache_status(f);
2638 } else if (command == "dump tree") {
2639 command_dump_tree(cmdmap, ss, f);
2640 } else if (command == "dump loads") {
2641 std::lock_guard l(mds_lock);
2642 r = balancer->dump_loads(f);
2643 } else if (command == "dump snaps") {
2644 std::lock_guard l(mds_lock);
2645 string server;
2646 cmd_getval(cmdmap, "server", server);
2647 if (server == "--server") {
2648 if (mdsmap->get_tableserver() == whoami) {
2649 snapserver->dump(f);
2650 } else {
2651 r = -EXDEV;
2652 ss << "Not snapserver";
2653 }
2654 } else {
2655 r = snapclient->dump_cache(f);
2656 }
2657 } else if (command == "force_readonly") {
2658 std::lock_guard l(mds_lock);
2659 mdcache->force_readonly();
2660 } else if (command == "dirfrag split") {
2661 command_dirfrag_split(cmdmap, ss);
2662 } else if (command == "dirfrag merge") {
2663 command_dirfrag_merge(cmdmap, ss);
2664 } else if (command == "dirfrag ls") {
2665 command_dirfrag_ls(cmdmap, ss, f);
2666 } else if (command == "openfiles ls") {
2667 command_openfiles_ls(f);
2668 } else if (command == "dump inode") {
2669 command_dump_inode(f, cmdmap, ss);
2670 } else if (command == "damage ls") {
2671 std::lock_guard l(mds_lock);
2672 damage_table.dump(f);
2673 } else if (command == "damage rm") {
2674 std::lock_guard l(mds_lock);
2675 damage_entry_id_t id = 0;
2676 if (!cmd_getval(cmdmap, "damage_id", (int64_t&)id)) {
2677 r = -EINVAL;
2678 goto out;
2679 }
2680 damage_table.erase(id);
2681 } else {
2682 r = -ENOSYS;
2683 }
2684 out:
2685 on_finish(r, ss.str(), outbl);
2686 }
2687
2688 /**
2689 * This function drops the mds_lock, so don't do anything with
2690 * MDSRank after calling it (we could have gone into shutdown): just
2691 * send your result back to the calling client and finish.
2692 */
2693 void MDSRankDispatcher::evict_clients(
2694 const SessionFilter &filter,
2695 std::function<void(int,const std::string&,bufferlist&)> on_finish)
2696 {
2697 bufferlist outbl;
2698 if (is_any_replay()) {
2699 on_finish(-EAGAIN, "MDS is replaying log", outbl);
2700 return;
2701 }
2702
2703 std::vector<Session*> victims;
2704 const auto& sessions = sessionmap.get_sessions();
2705 for (const auto& p : sessions) {
2706 if (!p.first.is_client()) {
2707 continue;
2708 }
2709
2710 Session *s = p.second;
2711
2712 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server,
2713 std::placeholders::_1))) {
2714 victims.push_back(s);
2715 }
2716 }
2717
2718 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2719
2720 if (victims.empty()) {
2721 on_finish(0, {}, outbl);
2722 return;
2723 }
2724
2725 C_GatherBuilder gather(g_ceph_context,
2726 new LambdaContext([on_finish](int r) {
2727 bufferlist bl;
2728 on_finish(r, {}, bl);
2729 }));
2730 for (const auto s : victims) {
2731 std::stringstream ss;
2732 evict_client(s->get_client().v, false,
2733 g_conf()->mds_session_blacklist_on_evict, ss, gather.new_sub());
2734 }
2735 gather.activate();
2736 }
2737
2738 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2739 {
2740 // Dump sessions, decorated with recovery/replay status
2741 f->open_array_section("sessions");
2742 for (auto& [name, s] : sessionmap.get_sessions()) {
2743 if (!name.is_client()) {
2744 continue;
2745 }
2746
2747 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2748 continue;
2749 }
2750
2751 f->dump_object("session", *s);
2752 }
2753 f->close_section(); // sessions
2754 }
2755
2756 void MDSRank::command_scrub_start(Formatter *f,
2757 std::string_view path, std::string_view tag,
2758 const vector<string>& scrubop_vec, Context *on_finish)
2759 {
2760 bool force = false;
2761 bool recursive = false;
2762 bool repair = false;
2763 for (auto &op : scrubop_vec) {
2764 if (op == "force")
2765 force = true;
2766 else if (op == "recursive")
2767 recursive = true;
2768 else if (op == "repair")
2769 repair = true;
2770 }
2771
2772 std::lock_guard l(mds_lock);
2773 mdcache->enqueue_scrub(path, tag, force, recursive, repair, f, on_finish);
2774 // scrub_dentry() finishers will dump the data for us; we're done!
2775 }
2776
2777 void MDSRank::command_tag_path(Formatter *f,
2778 std::string_view path, std::string_view tag)
2779 {
2780 C_SaferCond scond;
2781 {
2782 std::lock_guard l(mds_lock);
2783 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2784 }
2785 scond.wait();
2786 }
2787
2788 void MDSRank::command_scrub_abort(Formatter *f, Context *on_finish) {
2789 std::lock_guard l(mds_lock);
2790 scrubstack->scrub_abort(on_finish);
2791 }
2792
2793 void MDSRank::command_scrub_pause(Formatter *f, Context *on_finish) {
2794 std::lock_guard l(mds_lock);
2795 scrubstack->scrub_pause(on_finish);
2796 }
2797
2798 void MDSRank::command_scrub_resume(Formatter *f) {
2799 std::lock_guard l(mds_lock);
2800 int r = scrubstack->scrub_resume();
2801
2802 f->open_object_section("result");
2803 f->dump_int("return_code", r);
2804 f->close_section();
2805 }
2806
2807 void MDSRank::command_scrub_status(Formatter *f) {
2808 std::lock_guard l(mds_lock);
2809 scrubstack->scrub_status(f);
2810 }
2811
2812 void MDSRank::command_flush_path(Formatter *f, std::string_view path)
2813 {
2814 C_SaferCond scond;
2815 {
2816 std::lock_guard l(mds_lock);
2817 mdcache->flush_dentry(path, &scond);
2818 }
2819 int r = scond.wait();
2820 f->open_object_section("results");
2821 f->dump_int("return_code", r);
2822 f->close_section(); // results
2823 }
2824
2825 // synchronous wrapper around "journal flush" asynchronous context
2826 // execution.
2827 void MDSRank::command_flush_journal(Formatter *f) {
2828 ceph_assert(f != NULL);
2829
2830 C_SaferCond cond;
2831 std::stringstream ss;
2832 {
2833 std::lock_guard locker(mds_lock);
2834 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
2835 flush_journal->send();
2836 }
2837 int r = cond.wait();
2838
2839 f->open_object_section("result");
2840 f->dump_string("message", ss.str());
2841 f->dump_int("return_code", r);
2842 f->close_section();
2843 }
2844
2845 void MDSRank::command_get_subtrees(Formatter *f)
2846 {
2847 ceph_assert(f != NULL);
2848 std::lock_guard l(mds_lock);
2849
2850 std::vector<CDir*> subtrees;
2851 mdcache->get_subtrees(subtrees);
2852
2853 f->open_array_section("subtrees");
2854 for (const auto& dir : subtrees) {
2855 f->open_object_section("subtree");
2856 {
2857 f->dump_bool("is_auth", dir->is_auth());
2858 f->dump_int("auth_first", dir->get_dir_auth().first);
2859 f->dump_int("auth_second", dir->get_dir_auth().second);
2860 f->dump_int("export_pin", dir->inode->get_export_pin());
2861 f->open_object_section("dir");
2862 dir->dump(f);
2863 f->close_section();
2864 }
2865 f->close_section();
2866 }
2867 f->close_section();
2868 }
2869
2870
2871 void MDSRank::command_export_dir(Formatter *f,
2872 std::string_view path,
2873 mds_rank_t target)
2874 {
2875 int r = _command_export_dir(path, target);
2876 f->open_object_section("results");
2877 f->dump_int("return_code", r);
2878 f->close_section(); // results
2879 }
2880
2881 int MDSRank::_command_export_dir(
2882 std::string_view path,
2883 mds_rank_t target)
2884 {
2885 std::lock_guard l(mds_lock);
2886 filepath fp(path);
2887
2888 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2889 derr << "bad MDS target " << target << dendl;
2890 return -ENOENT;
2891 }
2892
2893 CInode *in = mdcache->cache_traverse(fp);
2894 if (!in) {
2895 derr << "Bath path '" << path << "'" << dendl;
2896 return -ENOENT;
2897 }
2898 CDir *dir = in->get_dirfrag(frag_t());
2899 if (!dir || !(dir->is_auth())) {
2900 derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
2901 return -EINVAL;
2902 }
2903
2904 mdcache->migrator->export_dir(dir, target);
2905 return 0;
2906 }
2907
2908 void MDSRank::command_dump_tree(const cmdmap_t &cmdmap, std::ostream &ss, Formatter *f)
2909 {
2910 std::string root;
2911 int64_t depth;
2912 cmd_getval(cmdmap, "root", root);
2913 if (!cmd_getval(cmdmap, "depth", depth))
2914 depth = -1;
2915 std::lock_guard l(mds_lock);
2916 CInode *in = mdcache->cache_traverse(filepath(root.c_str()));
2917 if (!in) {
2918 ss << "root inode is not in cache";
2919 return;
2920 }
2921 f->open_array_section("inodes");
2922 mdcache->dump_tree(in, 0, depth, f);
2923 f->close_section();
2924 }
2925
2926 CDir *MDSRank::_command_dirfrag_get(
2927 const cmdmap_t &cmdmap,
2928 std::ostream &ss)
2929 {
2930 std::string path;
2931 bool got = cmd_getval(cmdmap, "path", path);
2932 if (!got) {
2933 ss << "missing path argument";
2934 return NULL;
2935 }
2936
2937 std::string frag_str;
2938 if (!cmd_getval(cmdmap, "frag", frag_str)) {
2939 ss << "missing frag argument";
2940 return NULL;
2941 }
2942
2943 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2944 if (!in) {
2945 // TODO really we should load something in if it's not in cache,
2946 // but the infrastructure is harder, and we might still be unable
2947 // to act on it if someone else is auth.
2948 ss << "directory '" << path << "' inode not in cache";
2949 return NULL;
2950 }
2951
2952 frag_t fg;
2953
2954 if (!fg.parse(frag_str.c_str())) {
2955 ss << "frag " << frag_str << " failed to parse";
2956 return NULL;
2957 }
2958
2959 CDir *dir = in->get_dirfrag(fg);
2960 if (!dir) {
2961 ss << "frag " << in->ino() << "/" << fg << " not in cache ("
2962 "use `dirfrag ls` to see if it should exist)";
2963 return NULL;
2964 }
2965
2966 if (!dir->is_auth()) {
2967 ss << "frag " << dir->dirfrag() << " not auth (auth = "
2968 << dir->authority() << ")";
2969 return NULL;
2970 }
2971
2972 return dir;
2973 }
2974
2975 bool MDSRank::command_dirfrag_split(
2976 cmdmap_t cmdmap,
2977 std::ostream &ss)
2978 {
2979 std::lock_guard l(mds_lock);
2980 int64_t by = 0;
2981 if (!cmd_getval(cmdmap, "bits", by)) {
2982 ss << "missing bits argument";
2983 return false;
2984 }
2985
2986 if (by <= 0) {
2987 ss << "must split by >0 bits";
2988 return false;
2989 }
2990
2991 CDir *dir = _command_dirfrag_get(cmdmap, ss);
2992 if (!dir) {
2993 return false;
2994 }
2995
2996 mdcache->split_dir(dir, by);
2997
2998 return true;
2999 }
3000
3001 bool MDSRank::command_dirfrag_merge(
3002 cmdmap_t cmdmap,
3003 std::ostream &ss)
3004 {
3005 std::lock_guard l(mds_lock);
3006 std::string path;
3007 bool got = cmd_getval(cmdmap, "path", path);
3008 if (!got) {
3009 ss << "missing path argument";
3010 return false;
3011 }
3012
3013 std::string frag_str;
3014 if (!cmd_getval(cmdmap, "frag", frag_str)) {
3015 ss << "missing frag argument";
3016 return false;
3017 }
3018
3019 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
3020 if (!in) {
3021 ss << "directory '" << path << "' inode not in cache";
3022 return false;
3023 }
3024
3025 frag_t fg;
3026 if (!fg.parse(frag_str.c_str())) {
3027 ss << "frag " << frag_str << " failed to parse";
3028 return false;
3029 }
3030
3031 mdcache->merge_dir(in, fg);
3032
3033 return true;
3034 }
3035
3036 bool MDSRank::command_dirfrag_ls(
3037 cmdmap_t cmdmap,
3038 std::ostream &ss,
3039 Formatter *f)
3040 {
3041 std::lock_guard l(mds_lock);
3042 std::string path;
3043 bool got = cmd_getval(cmdmap, "path", path);
3044 if (!got) {
3045 ss << "missing path argument";
3046 return false;
3047 }
3048
3049 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
3050 if (!in) {
3051 ss << "directory inode not in cache";
3052 return false;
3053 }
3054
3055 f->open_array_section("frags");
3056 frag_vec_t leaves;
3057 // NB using get_leaves_under instead of get_dirfrags to give
3058 // you the list of what dirfrags may exist, not which are in cache
3059 in->dirfragtree.get_leaves_under(frag_t(), leaves);
3060 for (const auto& leaf : leaves) {
3061 f->open_object_section("frag");
3062 f->dump_int("value", leaf.value());
3063 f->dump_int("bits", leaf.bits());
3064 CachedStackStringStream css;
3065 *css << std::hex << leaf.value() << "/" << std::dec << leaf.bits();
3066 f->dump_string("str", css->strv());
3067 f->close_section();
3068 }
3069 f->close_section();
3070
3071 return true;
3072 }
3073
3074 void MDSRank::command_openfiles_ls(Formatter *f)
3075 {
3076 std::lock_guard l(mds_lock);
3077 mdcache->dump_openfiles(f);
3078 }
3079
3080 void MDSRank::command_dump_inode(Formatter *f, const cmdmap_t &cmdmap, std::ostream &ss)
3081 {
3082 std::lock_guard l(mds_lock);
3083 int64_t number;
3084 bool got = cmd_getval(cmdmap, "number", number);
3085 if (!got) {
3086 ss << "missing inode number";
3087 return;
3088 }
3089
3090 bool success = mdcache->dump_inode(f, number);
3091 if (!success) {
3092 ss << "dump inode failed, wrong inode number or the inode is not cached";
3093 }
3094 }
3095
3096 void MDSRank::dump_status(Formatter *f) const
3097 {
3098 if (state == MDSMap::STATE_REPLAY ||
3099 state == MDSMap::STATE_STANDBY_REPLAY) {
3100 mdlog->dump_replay_status(f);
3101 } else if (state == MDSMap::STATE_RESOLVE) {
3102 mdcache->dump_resolve_status(f);
3103 } else if (state == MDSMap::STATE_RECONNECT) {
3104 server->dump_reconnect_status(f);
3105 } else if (state == MDSMap::STATE_REJOIN) {
3106 mdcache->dump_rejoin_status(f);
3107 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
3108 dump_clientreplay_status(f);
3109 }
3110 f->dump_float("rank_uptime", get_uptime().count());
3111 }
3112
3113 void MDSRank::dump_clientreplay_status(Formatter *f) const
3114 {
3115 f->open_object_section("clientreplay_status");
3116 f->dump_unsigned("clientreplay_queue", replay_queue.size());
3117 f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
3118 f->close_section();
3119 }
3120
3121 void MDSRankDispatcher::update_log_config()
3122 {
3123 map<string,string> log_to_monitors;
3124 map<string,string> log_to_syslog;
3125 map<string,string> log_channel;
3126 map<string,string> log_prio;
3127 map<string,string> log_to_graylog;
3128 map<string,string> log_to_graylog_host;
3129 map<string,string> log_to_graylog_port;
3130 uuid_d fsid;
3131 string host;
3132
3133 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
3134 log_channel, log_prio, log_to_graylog,
3135 log_to_graylog_host, log_to_graylog_port,
3136 fsid, host) == 0)
3137 clog->update_config(log_to_monitors, log_to_syslog,
3138 log_channel, log_prio, log_to_graylog,
3139 log_to_graylog_host, log_to_graylog_port,
3140 fsid, host);
3141 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
3142 }
3143
3144 void MDSRank::create_logger()
3145 {
3146 dout(10) << "create_logger" << dendl;
3147 {
3148 PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
3149
3150 // super useful (high prio) perf stats
3151 mds_plb.add_u64_counter(l_mds_request, "request", "Requests", "req",
3152 PerfCountersBuilder::PRIO_CRITICAL);
3153 mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
3154 PerfCountersBuilder::PRIO_CRITICAL);
3155 mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
3156 PerfCountersBuilder::PRIO_CRITICAL);
3157 mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request", "fwd",
3158 PerfCountersBuilder::PRIO_INTERESTING);
3159 mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
3160 PerfCountersBuilder::PRIO_INTERESTING);
3161 mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes",
3162 "exi", PerfCountersBuilder::PRIO_INTERESTING);
3163 mds_plb.add_u64_counter(l_mds_imported_inodes, "imported_inodes", "Imported inodes",
3164 "imi", PerfCountersBuilder::PRIO_INTERESTING);
3165
3166 // useful dir/inode/subtree stats
3167 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
3168 mds_plb.add_u64(l_mds_root_rfiles, "root_rfiles", "root inode rfiles");
3169 mds_plb.add_u64(l_mds_root_rbytes, "root_rbytes", "root inode rbytes");
3170 mds_plb.add_u64(l_mds_root_rsnaps, "root_rsnaps", "root inode rsnaps");
3171 mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
3172 mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
3173 mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
3174 mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
3175 mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
3176 mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
3177 mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
3178 "Inodes with capabilities");
3179 mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
3180 mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
3181 mds_plb.add_u64_counter(l_mds_openino_dir_fetch, "openino_dir_fetch",
3182 "OpenIno incomplete directory fetchings");
3183
3184 // low prio stats
3185 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
3186 mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
3187 mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
3188 mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
3189 mds_plb.add_u64(
3190 l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
3191 mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
3192 mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
3193 mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
3194 "Traverse forwards");
3195 mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
3196 "Traverse directory discovers");
3197 mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
3198 "Traverse incomplete directory content fetchings");
3199 mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
3200 "Traverse remote dentries");
3201 mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
3202 "Traverse locks");
3203 mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
3204 mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
3205 mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
3206 mds_plb.add_u64_counter(l_mds_openino_backtrace_fetch, "openino_backtrace_fetch",
3207 "OpenIno backtrace fetchings");
3208 mds_plb.add_u64_counter(l_mds_openino_peer_discover, "openino_peer_discover",
3209 "OpenIno peer inode discovers");
3210
3211 logger = mds_plb.create_perf_counters();
3212 g_ceph_context->get_perfcounters_collection()->add(logger);
3213 }
3214
3215 {
3216 PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
3217 mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
3218 PerfCountersBuilder::PRIO_INTERESTING);
3219 mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
3220 PerfCountersBuilder::PRIO_INTERESTING);
3221
3222 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
3223 mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
3224 mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
3225 mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
3226 mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
3227 mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
3228 mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
3229 mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
3230 mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
3231 mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
3232 mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
3233 mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
3234
3235 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
3236 mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
3237
3238 mlogger = mdm_plb.create_perf_counters();
3239 g_ceph_context->get_perfcounters_collection()->add(mlogger);
3240 }
3241
3242 mdlog->create_logger();
3243 server->create_logger();
3244 purge_queue.create_logger();
3245 sessionmap.register_perfcounters();
3246 mdcache->register_perfcounters();
3247 }
3248
3249 void MDSRank::check_ops_in_flight()
3250 {
3251 string summary;
3252 vector<string> warnings;
3253 int slow = 0;
3254 if (op_tracker.check_ops_in_flight(&summary, warnings, &slow)) {
3255 clog->warn() << summary;
3256 for (const auto& warning : warnings) {
3257 clog->warn() << warning;
3258 }
3259 }
3260
3261 // set mds slow request count
3262 mds_slow_req_count = slow;
3263 return;
3264 }
3265
3266 void MDSRankDispatcher::handle_osd_map()
3267 {
3268 if (is_active() &&
3269 mdsmap->get_tableserver() == whoami) {
3270 snapserver->check_osd_map(true);
3271 }
3272
3273 server->handle_osd_map();
3274
3275 purge_queue.update_op_limit(*mdsmap);
3276
3277 std::set<entity_addr_t> newly_blacklisted;
3278 objecter->consume_blacklist_events(&newly_blacklisted);
3279 auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
3280 dout(4) << "handle_osd_map epoch " << epoch << ", "
3281 << newly_blacklisted.size() << " new blacklist entries" << dendl;
3282 auto victims = server->apply_blacklist(newly_blacklisted);
3283 if (victims) {
3284 set_osd_epoch_barrier(epoch);
3285 }
3286
3287
3288 // By default the objecter only requests OSDMap updates on use,
3289 // we would like to always receive the latest maps in order to
3290 // apply policy based on the FULL flag.
3291 objecter->maybe_request_map();
3292 }
3293
3294 int MDSRank::config_client(int64_t session_id, bool remove,
3295 const std::string& option, const std::string& value,
3296 std::ostream& ss)
3297 {
3298 Session *session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
3299 if (!session) {
3300 ss << "session " << session_id << " not in sessionmap!";
3301 return -ENOENT;
3302 }
3303
3304 if (option == "timeout") {
3305 if (remove) {
3306 auto it = session->info.client_metadata.find("timeout");
3307 if (it == session->info.client_metadata.end()) {
3308 ss << "Nonexistent config: " << option;
3309 return -ENODATA;
3310 }
3311 session->info.client_metadata.erase(it);
3312 } else {
3313 char *end;
3314 strtoul(value.c_str(), &end, 0);
3315 if (*end) {
3316 ss << "Invalid config for timeout: " << value;
3317 return -EINVAL;
3318 }
3319 session->info.client_metadata[option] = value;
3320 }
3321 //sessionmap._mark_dirty(session, true);
3322 } else {
3323 ss << "Invalid config option: " << option;
3324 return -EINVAL;
3325 }
3326
3327 return 0;
3328 }
3329
3330 bool MDSRank::evict_client(int64_t session_id,
3331 bool wait, bool blacklist, std::ostream& err_ss,
3332 Context *on_killed)
3333 {
3334 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
3335
3336 // Mutually exclusive args
3337 ceph_assert(!(wait && on_killed != nullptr));
3338
3339 if (is_any_replay()) {
3340 err_ss << "MDS is replaying log";
3341 return false;
3342 }
3343
3344 Session *session = sessionmap.get_session(
3345 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
3346 if (!session) {
3347 err_ss << "session " << session_id << " not in sessionmap!";
3348 return false;
3349 }
3350
3351 auto& addr = session->info.inst.addr;
3352 {
3353 CachedStackStringStream css;
3354 *css << "Evicting " << (blacklist ? "(and blacklisting) " : "")
3355 << "client session " << session_id << " (" << addr << ")";
3356 dout(1) << css->strv() << dendl;
3357 clog->info() << css->strv();
3358 }
3359
3360 dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
3361 stringstream ss;
3362 ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
3363 ss << "\"addr\":\"";
3364 ss << addr;
3365 ss << "\"}";
3366 std::string tmp = ss.str();
3367 std::vector<std::string> cmd = {tmp};
3368
3369 auto kill_client_session = [this, session_id, wait, on_killed](){
3370 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
3371 Session *session = sessionmap.get_session(
3372 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
3373 if (session) {
3374 if (on_killed || !wait) {
3375 server->kill_session(session, on_killed);
3376 } else {
3377 C_SaferCond on_safe;
3378 server->kill_session(session, &on_safe);
3379
3380 mds_lock.unlock();
3381 on_safe.wait();
3382 mds_lock.lock();
3383 }
3384 } else {
3385 dout(1) << "session " << session_id << " was removed while we waited "
3386 "for blacklist" << dendl;
3387
3388 // Even though it wasn't us that removed it, kick our completion
3389 // as the session has been removed.
3390 if (on_killed) {
3391 on_killed->complete(0);
3392 }
3393 }
3394 };
3395
3396 auto apply_blacklist = [this, cmd](std::function<void ()> fn){
3397 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
3398
3399 Context *on_blacklist_done = new LambdaContext([this, fn](int r) {
3400 objecter->wait_for_latest_osdmap(
3401 new C_OnFinisher(
3402 new LambdaContext([this, fn](int r) {
3403 std::lock_guard l(mds_lock);
3404 auto epoch = objecter->with_osdmap([](const OSDMap &o){
3405 return o.get_epoch();
3406 });
3407
3408 set_osd_epoch_barrier(epoch);
3409
3410 fn();
3411 }), finisher)
3412 );
3413 });
3414
3415 dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
3416 monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
3417 };
3418
3419 if (wait) {
3420 if (blacklist) {
3421 C_SaferCond inline_ctx;
3422 apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
3423 mds_lock.unlock();
3424 inline_ctx.wait();
3425 mds_lock.lock();
3426 }
3427
3428 // We dropped mds_lock, so check that session still exists
3429 session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
3430 session_id));
3431 if (!session) {
3432 dout(1) << "session " << session_id << " was removed while we waited "
3433 "for blacklist" << dendl;
3434 return true;
3435 }
3436 kill_client_session();
3437 } else {
3438 if (blacklist) {
3439 apply_blacklist(kill_client_session);
3440 } else {
3441 kill_client_session();
3442 }
3443 }
3444
3445 return true;
3446 }
3447
3448 void MDSRank::bcast_mds_map()
3449 {
3450 dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
3451
3452 // share the map with mounted clients
3453 set<Session*> clients;
3454 sessionmap.get_client_session_set(clients);
3455 for (const auto &session : clients) {
3456 auto m = make_message<MMDSMap>(monc->get_fsid(), *mdsmap);
3457 session->get_connection()->send_message2(std::move(m));
3458 }
3459 last_client_mdsmap_bcast = mdsmap->get_epoch();
3460 }
3461
3462 MDSRankDispatcher::MDSRankDispatcher(
3463 mds_rank_t whoami_,
3464 ceph::mutex &mds_lock_,
3465 LogChannelRef &clog_,
3466 SafeTimer &timer_,
3467 Beacon &beacon_,
3468 std::unique_ptr<MDSMap> &mdsmap_,
3469 Messenger *msgr,
3470 MonClient *monc_,
3471 MgrClient *mgrc,
3472 Context *respawn_hook_,
3473 Context *suicide_hook_)
3474 : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
3475 msgr, monc_, mgrc, respawn_hook_, suicide_hook_)
3476 {
3477 g_conf().add_observer(this);
3478 }
3479
3480 void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
3481 dout(20) << __func__ << dendl;
3482
3483 std::lock_guard locker(mds_lock);
3484 C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
3485 timeout, f, on_finish);
3486 request->send();
3487 }
3488
3489 epoch_t MDSRank::get_osd_epoch() const
3490 {
3491 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
3492 }
3493
3494 const char** MDSRankDispatcher::get_tracked_conf_keys() const
3495 {
3496 static const char* KEYS[] = {
3497 "clog_to_graylog",
3498 "clog_to_graylog_host",
3499 "clog_to_graylog_port",
3500 "clog_to_monitors",
3501 "clog_to_syslog",
3502 "clog_to_syslog_facility",
3503 "clog_to_syslog_level",
3504 "fsid",
3505 "host",
3506 "mds_bal_fragment_dirs",
3507 "mds_bal_fragment_interval",
3508 "mds_cache_memory_limit",
3509 "mds_cache_mid",
3510 "mds_cache_reservation",
3511 "mds_cache_trim_decay_rate",
3512 "mds_cap_revoke_eviction_timeout",
3513 "mds_dump_cache_threshold_file",
3514 "mds_dump_cache_threshold_formatter",
3515 "mds_enable_op_tracker",
3516 "mds_health_cache_threshold",
3517 "mds_inject_migrator_session_race",
3518 "mds_log_pause",
3519 "mds_max_export_size",
3520 "mds_max_purge_files",
3521 "mds_forward_all_requests_to_auth",
3522 "mds_max_purge_ops",
3523 "mds_max_purge_ops_per_pg",
3524 "mds_max_snaps_per_dir",
3525 "mds_op_complaint_time",
3526 "mds_op_history_duration",
3527 "mds_op_history_size",
3528 "mds_op_log_threshold",
3529 "mds_recall_max_decay_rate",
3530 "mds_recall_warning_decay_rate",
3531 "mds_request_load_average_decay_rate",
3532 "mds_session_cache_liveness_decay_rate",
3533 "mds_replay_unsafe_with_closed_session",
3534 NULL
3535 };
3536 return KEYS;
3537 }
3538
3539 void MDSRankDispatcher::handle_conf_change(const ConfigProxy& conf, const std::set<std::string>& changed)
3540 {
3541 // XXX with or without mds_lock!
3542
3543 if (changed.count("mds_op_complaint_time") || changed.count("mds_op_log_threshold")) {
3544 op_tracker.set_complaint_and_threshold(conf->mds_op_complaint_time, conf->mds_op_log_threshold);
3545 }
3546 if (changed.count("mds_op_history_size") || changed.count("mds_op_history_duration")) {
3547 op_tracker.set_history_size_and_duration(conf->mds_op_history_size, conf->mds_op_history_duration);
3548 }
3549 if (changed.count("mds_enable_op_tracker")) {
3550 op_tracker.set_tracking(conf->mds_enable_op_tracker);
3551 }
3552 if (changed.count("clog_to_monitors") ||
3553 changed.count("clog_to_syslog") ||
3554 changed.count("clog_to_syslog_level") ||
3555 changed.count("clog_to_syslog_facility") ||
3556 changed.count("clog_to_graylog") ||
3557 changed.count("clog_to_graylog_host") ||
3558 changed.count("clog_to_graylog_port") ||
3559 changed.count("host") ||
3560 changed.count("fsid")) {
3561 update_log_config();
3562 }
3563
3564 finisher->queue(new LambdaContext([this, changed](int) {
3565 std::scoped_lock lock(mds_lock);
3566
3567 if (changed.count("mds_log_pause") && !g_conf()->mds_log_pause) {
3568 mdlog->kick_submitter();
3569 }
3570 sessionmap.handle_conf_change(changed);
3571 server->handle_conf_change(changed);
3572 mdcache->handle_conf_change(changed, *mdsmap);
3573 purge_queue.handle_conf_change(changed, *mdsmap);
3574 }));
3575 }
3576
3577 void MDSRank::get_task_status(std::map<std::string, std::string> *status) {
3578 dout(20) << __func__ << dendl;
3579
3580 // scrub summary for now..
3581 std::string_view scrub_summary = scrubstack->scrub_summary();
3582 status->emplace(SCRUB_STATUS_KEY, std::move(scrub_summary));
3583 }
3584
3585 void MDSRank::schedule_update_timer_task() {
3586 dout(20) << __func__ << dendl;
3587
3588 timer.add_event_after(g_conf().get_val<double>("mds_task_status_update_interval"),
3589 new LambdaContext([this](int) {
3590 send_task_status();
3591 }));
3592 }
3593
3594 void MDSRank::send_task_status() {
3595 std::map<std::string, std::string> status;
3596 get_task_status(&status);
3597
3598 if (!status.empty()) {
3599 dout(20) << __func__ << ": updating " << status.size() << " status keys" << dendl;
3600
3601 int r = mgrc->service_daemon_update_task_status(std::move(status));
3602 if (r < 0) {
3603 derr << ": failed to update service daemon status: " << cpp_strerror(r) << dendl;
3604 }
3605 }
3606
3607 schedule_update_timer_task();
3608 }