]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSRank.cc
update source to 12.2.11
[ceph.git] / ceph / src / mds / MDSRank.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "common/debug.h"
18 #include "common/errno.h"
19
20 #include "messages/MClientRequestForward.h"
21 #include "messages/MMDSLoadTargets.h"
22 #include "messages/MMDSMap.h"
23 #include "messages/MMDSTableRequest.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26
27 #include "MDSDaemon.h"
28 #include "MDSMap.h"
29 #include "SnapClient.h"
30 #include "SnapServer.h"
31 #include "MDBalancer.h"
32 #include "Migrator.h"
33 #include "Locker.h"
34 #include "InoTable.h"
35 #include "mon/MonClient.h"
36 #include "common/HeartbeatMap.h"
37 #include "ScrubStack.h"
38
39
40 #include "MDSRank.h"
41
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
44 #undef dout_prefix
45 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
46
47 class C_Flush_Journal : public MDSInternalContext {
48 public:
49 C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
50 std::ostream *ss, Context *on_finish)
51 : MDSInternalContext(mds),
52 mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
53 whoami(mds->whoami), incarnation(mds->incarnation) {
54 }
55
56 void send() {
57 assert(mds->mds_lock.is_locked());
58
59 dout(20) << __func__ << dendl;
60
61 if (mdcache->is_readonly()) {
62 dout(5) << __func__ << ": read-only FS" << dendl;
63 complete(-EROFS);
64 return;
65 }
66
67 if (!mds->is_active()) {
68 dout(5) << __func__ << ": MDS not active, no-op" << dendl;
69 complete(0);
70 return;
71 }
72
73 flush_mdlog();
74 }
75
76 private:
77
78 void flush_mdlog() {
79 dout(20) << __func__ << dendl;
80
81 // I need to seal off the current segment, and then mark all
82 // previous segments for expiry
83 mdlog->start_new_segment();
84
85 Context *ctx = new FunctionContext([this](int r) {
86 handle_flush_mdlog(r);
87 });
88
89 // Flush initially so that all the segments older than our new one
90 // will be elegible for expiry
91 mdlog->flush();
92 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
93 }
94
95 void handle_flush_mdlog(int r) {
96 dout(20) << __func__ << ": r=" << r << dendl;
97
98 if (r != 0) {
99 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
100 complete(r);
101 return;
102 }
103
104 clear_mdlog();
105 }
106
107 void clear_mdlog() {
108 dout(20) << __func__ << dendl;
109
110 Context *ctx = new FunctionContext([this](int r) {
111 handle_clear_mdlog(r);
112 });
113
114 // Because we may not be the last wait_for_safe context on MDLog,
115 // and subsequent contexts might wake up in the middle of our
116 // later trim_all and interfere with expiry (by e.g. marking
117 // dirs/dentries dirty on previous log segments), we run a second
118 // wait_for_safe here. See #10368
119 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
120 }
121
122 void handle_clear_mdlog(int r) {
123 dout(20) << __func__ << ": r=" << r << dendl;
124
125 if (r != 0) {
126 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
127 complete(r);
128 return;
129 }
130
131 trim_mdlog();
132 }
133
134 void trim_mdlog() {
135 // Put all the old log segments into expiring or expired state
136 dout(5) << __func__ << ": beginning segment expiry" << dendl;
137
138 int ret = mdlog->trim_all();
139 if (ret != 0) {
140 *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
141 complete(ret);
142 return;
143 }
144
145 expire_segments();
146 }
147
148 void expire_segments() {
149 dout(20) << __func__ << dendl;
150
151 // Attach contexts to wait for all expiring segments to expire
152 MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);
153
154 const auto &expiring_segments = mdlog->get_expiring_segments();
155 for (auto p : expiring_segments) {
156 p->wait_for_expiry(expiry_gather->new_sub());
157 }
158 dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
159 << " segments to expire" << dendl;
160
161 if (!expiry_gather->has_subs()) {
162 trim_segments();
163 delete expiry_gather;
164 return;
165 }
166
167 Context *ctx = new FunctionContext([this](int r) {
168 handle_expire_segments(r);
169 });
170 expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
171 expiry_gather->activate();
172 }
173
174 void handle_expire_segments(int r) {
175 dout(20) << __func__ << ": r=" << r << dendl;
176
177 ceph_assert(r == 0); // MDLog is not allowed to raise errors via
178 // wait_for_expiry
179 trim_segments();
180 }
181
182 void trim_segments() {
183 dout(20) << __func__ << dendl;
184
185 Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
186 Mutex::Locker locker(mds->mds_lock);
187 trim_expired_segments();
188 }), mds->finisher);
189 ctx->complete(0);
190 }
191
192 void trim_expired_segments() {
193 dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
194 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
195 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
196
197 // Now everyone I'm interested in is expired
198 mdlog->trim_expired_segments();
199
200 dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
201 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
202 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
203
204 write_journal_head();
205 }
206
207 void write_journal_head() {
208 dout(20) << __func__ << dendl;
209
210 Context *ctx = new FunctionContext([this](int r) {
211 Mutex::Locker locker(mds->mds_lock);
212 handle_write_head(r);
213 });
214 // Flush the journal header so that readers will start from after
215 // the flushed region
216 mdlog->get_journaler()->write_head(ctx);
217 }
218
219 void handle_write_head(int r) {
220 if (r != 0) {
221 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
222 } else {
223 dout(5) << __func__ << ": write_head complete, all done!" << dendl;
224 }
225
226 complete(r);
227 }
228
229 void finish(int r) override {
230 dout(20) << __func__ << ": r=" << r << dendl;
231 on_finish->complete(r);
232 }
233
234 MDCache *mdcache;
235 MDLog *mdlog;
236 std::ostream *ss;
237 Context *on_finish;
238
239 // so as to use dout
240 mds_rank_t whoami;
241 int incarnation;
242 };
243
244 class C_Drop_Cache : public MDSInternalContext {
245 public:
246 C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
247 MDSRank *mds, uint64_t recall_timeout,
248 Formatter *f, Context *on_finish)
249 : MDSInternalContext(mds),
250 server(server), mdcache(mdcache), mdlog(mdlog),
251 recall_timeout(recall_timeout), f(f), on_finish(on_finish),
252 whoami(mds->whoami), incarnation(mds->incarnation) {
253 }
254
255 void send() {
256 // not really a hard requirement here, but lets ensure this in
257 // case we change the logic here.
258 assert(mds->mds_lock.is_locked());
259
260 dout(20) << __func__ << dendl;
261 recall_client_state();
262 }
263
264 private:
265 // context which completes itself (with -ETIMEDOUT) after a specified
266 // timeout or when explicitly completed, whichever comes first. Note
267 // that the context does not detroy itself after completion -- it
268 // needs to be explicitly freed.
269 class C_ContextTimeout : public MDSInternalContext {
270 public:
271 C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
272 : MDSInternalContext(mds),
273 timeout(timeout),
274 lock("mds::context::timeout", false, true),
275 on_finish(on_finish) {
276 }
277 ~C_ContextTimeout() {
278 ceph_assert(timer_task == nullptr);
279 }
280
281 void start_timer() {
282 if (!timeout) {
283 return;
284 }
285
286 timer_task = new FunctionContext([this](int _) {
287 timer_task = nullptr;
288 complete(-ETIMEDOUT);
289 });
290 mds->timer.add_event_after(timeout, timer_task);
291 }
292
293 void finish(int r) override {
294 Context *ctx = nullptr;
295 {
296 Mutex::Locker locker(lock);
297 std::swap(on_finish, ctx);
298 }
299 if (ctx != nullptr) {
300 ctx->complete(r);
301 }
302 }
303 void complete(int r) override {
304 if (timer_task != nullptr) {
305 mds->timer.cancel_event(timer_task);
306 }
307
308 finish(r);
309 }
310
311 uint64_t timeout;
312 Mutex lock;
313 Context *on_finish = nullptr;
314 Context *timer_task = nullptr;
315 };
316
317 void recall_client_state() {
318 dout(20) << __func__ << dendl;
319
320 f->open_object_section("result");
321
322 MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
323 server->recall_client_state(1.0, true, gather);
324 if (!gather->has_subs()) {
325 handle_recall_client_state(0);
326 delete gather;
327 return;
328 }
329
330 C_ContextTimeout *ctx = new C_ContextTimeout(
331 mds, recall_timeout, new FunctionContext([this](int r) {
332 handle_recall_client_state(r);
333 }));
334
335 ctx->start_timer();
336 gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
337 gather->activate();
338 }
339
340 void handle_recall_client_state(int r) {
341 dout(20) << __func__ << ": r=" << r << dendl;
342
343 // client recall section
344 f->open_object_section("client_recall");
345 f->dump_int("return_code", r);
346 f->dump_string("message", cpp_strerror(r));
347 f->close_section();
348
349 // we can still continue after recall timeout
350 trim_cache();
351 }
352
353 void trim_cache() {
354 dout(20) << __func__ << dendl;
355
356 if (!mdcache->trim(UINT64_MAX)) {
357 cmd_err(f, "failed to trim cache");
358 complete(-EINVAL);
359 return;
360 }
361
362 flush_journal();
363 }
364
365 void flush_journal() {
366 dout(20) << __func__ << dendl;
367
368 Context *ctx = new FunctionContext([this](int r) {
369 handle_flush_journal(r);
370 });
371
372 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
373 flush_journal->send();
374 }
375
376 void handle_flush_journal(int r) {
377 dout(20) << __func__ << ": r=" << r << dendl;
378
379 if (r != 0) {
380 cmd_err(f, ss.str());
381 complete(r);
382 return;
383 }
384
385 // journal flush section
386 f->open_object_section("flush_journal");
387 f->dump_int("return_code", r);
388 f->dump_string("message", ss.str());
389 f->close_section();
390
391 cache_status();
392 }
393
394 void cache_status() {
395 dout(20) << __func__ << dendl;
396
397 // cache status section
398 mdcache->cache_status(f);
399 f->close_section();
400
401 complete(0);
402 }
403
404 void finish(int r) override {
405 dout(20) << __func__ << ": r=" << r << dendl;
406
407 on_finish->complete(r);
408 }
409
410 Server *server;
411 MDCache *mdcache;
412 MDLog *mdlog;
413 uint64_t recall_timeout;
414 Formatter *f;
415 Context *on_finish;
416
417 int retval = 0;
418 std::stringstream ss;
419
420 // so as to use dout
421 mds_rank_t whoami;
422 int incarnation;
423
424 void cmd_err(Formatter *f, boost::string_view err) {
425 f->reset();
426 f->open_object_section("result");
427 f->dump_string("error", err);
428 f->close_section();
429 }
430 };
431
// Construct an MDS rank: captures references to daemon-owned state
// (lock, timer, beacon, mdsmap pointer) and builds all the rank-owned
// subsystems (objecter, cache, log, balancer, tables, server, locker).
MDSRank::MDSRank(
    mds_rank_t whoami_,
    Mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    MDSMap *& mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  :
    whoami(whoami_), incarnation(0),
    mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
    mdsmap(mdsmap_),
    objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
    server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
    balancer(NULL), scrubstack(NULL),
    damage_table(whoami_),
    inotable(NULL), snapserver(NULL), snapclient(NULL),
    sessionmap(this), logger(NULL), mlogger(NULL),
    op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
               g_conf->osd_num_op_tracker_shard),
    last_state(MDSMap::STATE_BOOT),
    state(MDSMap::STATE_BOOT),
    cluster_degraded(false), stopping(false),
    purge_queue(g_ceph_context, whoami_,
        mdsmap_->get_metadata_pool(), objecter,
        new FunctionContext(
            [this](int r){
              // Purge Queue operates inside mds_lock when we're calling into
              // it, and outside when in background, so must handle both cases.
              if (mds_lock.is_locked_by_me()) {
                handle_write_error(r);
              } else {
                Mutex::Locker l(mds_lock);
                handle_write_error(r);
              }
            }
          )
        ),
    progress_thread(this), dispatch_depth(0),
    hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
    mds_slow_req_count(0),
    last_client_mdsmap_bcast(0),
    messenger(msgr), monc(monc_),
    respawn_hook(respawn_hook_),
    suicide_hook(suicide_hook_),
    standby_replaying(false),
    starttime(mono_clock::now())
{
  // Register with the heartbeat map so a stuck rank is detectable.
  hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());

  purge_queue.update_op_limit(*mdsmap);

  // NOTE(review): submit ops even when the OSDMap reports full --
  // presumably the MDS handles full conditions itself; confirm.
  objecter->unset_honor_osdmap_full();

  finisher = new Finisher(cct);

  // Subsystems are created here but started later (see init()).
  mdcache = new MDCache(this, purge_queue);
  mdlog = new MDLog(this);
  balancer = new MDBalancer(this, messenger, monc);

  scrubstack = new ScrubStack(mdcache, finisher);

  inotable = new InoTable(this);
  snapserver = new SnapServer(this, monc);
  snapclient = new SnapClient(this);

  server = new Server(this);
  locker = new Locker(this, mdcache);

  op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                         cct->_conf->mds_op_log_threshold);
  op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
                                           cct->_conf->mds_op_history_duration);
}
509
510 MDSRank::~MDSRank()
511 {
512 if (hb) {
513 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
514 }
515
516 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
517 if (mdcache) { delete mdcache; mdcache = NULL; }
518 if (mdlog) { delete mdlog; mdlog = NULL; }
519 if (balancer) { delete balancer; balancer = NULL; }
520 if (inotable) { delete inotable; inotable = NULL; }
521 if (snapserver) { delete snapserver; snapserver = NULL; }
522 if (snapclient) { delete snapclient; snapclient = NULL; }
523 if (mdsmap) { delete mdsmap; mdsmap = 0; }
524
525 if (server) { delete server; server = 0; }
526 if (locker) { delete locker; locker = 0; }
527
528 if (logger) {
529 g_ceph_context->get_perfcounters_collection()->remove(logger);
530 delete logger;
531 logger = 0;
532 }
533 if (mlogger) {
534 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
535 delete mlogger;
536 mlogger = 0;
537 }
538
539 delete finisher;
540 finisher = NULL;
541
542 delete suicide_hook;
543 suicide_hook = NULL;
544
545 delete respawn_hook;
546 respawn_hook = NULL;
547
548 delete objecter;
549 objecter = nullptr;
550 }
551
// Start the rank's background machinery.  Order matters: the objecter
// is initialized and registered with the messenger before anything
// that might issue RADOS operations.
void MDSRankDispatcher::init()
{
  objecter->init();
  messenger->add_dispatcher_head(objecter);

  objecter->start();

  update_log_config();
  create_logger();

  // Expose the OSDMap (already populated during MDS::init) to anyone
  // who is interested in it.
  handle_osd_map();

  progress_thread.create("mds_rank_progr");

  purge_queue.init();

  finisher->start();
}
572
573 void MDSRank::update_targets(utime_t now)
574 {
575 // get MonMap's idea of my export_targets
576 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
577
578 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
579
580 bool send = false;
581 set<mds_rank_t> new_map_targets;
582
583 auto it = export_targets.begin();
584 while (it != export_targets.end()) {
585 mds_rank_t rank = it->first;
586 double val = it->second.get(now);
587 dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
588
589 if (val <= 0.01) {
590 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
591 export_targets.erase(it++);
592 send = true;
593 continue;
594 }
595 if (!map_targets.count(rank)) {
596 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
597 send = true;
598 }
599 new_map_targets.insert(rank);
600 it++;
601 }
602 if (new_map_targets.size() < map_targets.size()) {
603 dout(15) << "export target map holds stale targets, sending update" << dendl;
604 send = true;
605 }
606
607 if (send) {
608 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
609 MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
610 monc->send_mon_message(m);
611 }
612 }
613
614 void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
615 {
616 double rate = g_conf->mds_bal_target_decay;
617 if (amount < 0.0) {
618 amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
619 }
620 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
621 if (em.second) {
622 dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
623 } else {
624 dout(15) << "hit export target " << amount << " @ " << now << dendl;
625 }
626 em.first->second.hit(now, amount);
627 }
628
// Periodic upkeep for the rank: heartbeat, log/cache trimming, session
// maintenance, balancing, and (when stopping) shutdown progression.
void MDSRankDispatcher::tick()
{
  heartbeat_reset();

  // If the beacon connection to the monitors is laggy, skip all upkeep
  // this tick rather than act on possibly-stale cluster state.
  if (beacon.is_laggy()) {
    dout(1) << "skipping upkeep work because connection to Monitors appears laggy" << dendl;
    return;
  }

  check_ops_in_flight();

  // Wake up thread in case we use to be laggy and have waiting_for_nolaggy
  // messages to progress.
  progress_thread.signal();

  // make sure mds log flushes, trims periodically
  mdlog->flush();

  // update average session uptime
  sessionmap.update_average_session_age();

  if (is_active() || is_stopping()) {
    mdcache->trim();
    mdcache->trim_client_leases();
    mdcache->check_memory_usage();
    mdlog->trim();  // NOT during recovery!
  }

  // log
  if (logger) {
    logger->set(l_mds_subtrees, mdcache->num_subtrees());

    mdcache->log_stat();
  }

  // session/lock upkeep runs in clientreplay as well as active/stopping
  if (is_clientreplay() || is_active() || is_stopping()) {
    server->find_idle_sessions();
    server->evict_cap_revoke_non_responders();
    locker->tick();
  }

  if (is_reconnect())
    server->reconnect_tick();

  if (is_active()) {
    balancer->tick();
    mdcache->find_stale_fragment_freeze();
    mdcache->migrator->find_stale_export_freeze();
    if (snapserver)
      snapserver->check_osd_map(false);
  }

  if (is_active() || is_stopping()) {
    update_targets(ceph_clock_now());
  }

  // shut down?
  if (is_stopping()) {
    mdlog->trim();
    if (mdcache->shutdown_pass()) {
      uint64_t pq_progress = 0;
      uint64_t pq_total = 0;
      size_t pq_in_flight = 0;
      if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
        dout(7) << "shutdown_pass=true, but still waiting for purge queue"
                << dendl;
        // This takes unbounded time, so we must indicate progress
        // to the administrator: we do it in a slightly imperfect way
        // by sending periodic (tick frequency) clog messages while
        // in this state.
        clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
          << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
          << " files purging" << ")";
      } else {
        dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
                   "down:stopped" << dendl;
        stopping_done();
      }
    }
    else {
      dout(7) << "shutdown_pass=false" << dendl;
    }
  }

  // Expose ourselves to Beacon to update health indicators
  beacon.notify_health(this);
}
717
// Orderly teardown of the rank.  The unlock/lock dances around
// finisher->stop() and messenger->shutdown() are deliberate: those
// calls wait on threads that may themselves need mds_lock.
void MDSRankDispatcher::shutdown()
{
  // It should never be possible for shutdown to get called twice, because
  // anyone picking up mds_lock checks if stopping is true and drops
  // out if it is.
  assert(stopping == false);
  stopping = true;

  dout(1) << __func__ << ": shutting down rank " << whoami << dendl;

  timer.shutdown();

  // MDLog has to shut down before the finisher, because some of its
  // threads block on IOs that require finisher to complete.
  mdlog->shutdown();

  // shut down cache
  mdcache->shutdown();

  purge_queue.shutdown();

  // Drop mds_lock while stopping the finisher so its threads can drain.
  mds_lock.Unlock();
  finisher->stop(); // no flushing
  mds_lock.Lock();

  if (objecter->initialized)
    objecter->shutdown();

  monc->shutdown();

  op_tracker.on_shutdown();

  progress_thread.shutdown();

  // release mds_lock for finisher/messenger threads (e.g.
  // MDSDaemon::ms_handle_reset called from Messenger).
  mds_lock.Unlock();

  // shut down messenger
  messenger->shutdown();

  mds_lock.Lock();

  // Workaround unclean shutdown: HeartbeatMap will assert if
  // worker is not removed (as we do in ~MDS), but ~MDS is not
  // always called after suicide.
  if (hb) {
    g_ceph_context->get_heartbeat_map()->remove_worker(hb);
    hb = NULL;
  }
}
769
770 /**
771 * Helper for simple callbacks that call a void fn with no args.
772 */
773 class C_MDS_VoidFn : public MDSInternalContext
774 {
775 typedef void (MDSRank::*fn_ptr)();
776 protected:
777 fn_ptr fn;
778 public:
779 C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
780 : MDSInternalContext(mds_), fn(fn_)
781 {
782 assert(mds_);
783 assert(fn_);
784 }
785
786 void finish(int r) override
787 {
788 (mds->*fn)();
789 }
790 };
791
// Return the metadata pool id recorded in the current MDSMap.
int64_t MDSRank::get_metadata_pool()
{
  return mdsmap->get_metadata_pool();
}
796
797 MDSTableClient *MDSRank::get_table_client(int t)
798 {
799 switch (t) {
800 case TABLE_ANCHOR: return NULL;
801 case TABLE_SNAP: return snapclient;
802 default: ceph_abort();
803 }
804 }
805
806 MDSTableServer *MDSRank::get_table_server(int t)
807 {
808 switch (t) {
809 case TABLE_ANCHOR: return NULL;
810 case TABLE_SNAP: return snapserver;
811 default: ceph_abort();
812 }
813 }
814
815 void MDSRank::suicide()
816 {
817 if (suicide_hook) {
818 suicide_hook->complete(0);
819 suicide_hook = NULL;
820 }
821 }
822
823 void MDSRank::respawn()
824 {
825 if (respawn_hook) {
826 respawn_hook->complete(0);
827 respawn_hook = NULL;
828 }
829 }
830
// Report this rank as damaged to the monitors, then respawn.  The
// beacon/log flush ordering is deliberate: get the DAMAGED state and
// any pending clog errors out before we go away.
void MDSRank::damaged()
{
  assert(whoami != MDS_RANK_NONE);
  assert(mds_lock.is_locked_by_me());

  beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
  monc->flush_log();  // Flush any clog error from before we were called
  beacon.notify_health(this);  // Include latest status in our swan song
  beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);

  // It's okay if we timed out and the mon didn't get our beacon, because
  // another daemon (or ourselves after respawn) will eventually take the
  // rank and report DAMAGED again when it hits same problem we did.

  respawn(); // Respawn into standby in case mon has other work for us
}
847
848 void MDSRank::damaged_unlocked()
849 {
850 Mutex::Locker l(mds_lock);
851 damaged();
852 }
853
854 void MDSRank::handle_write_error(int err)
855 {
856 if (err == -EBLACKLISTED) {
857 derr << "we have been blacklisted (fenced), respawning..." << dendl;
858 respawn();
859 return;
860 }
861
862 if (g_conf->mds_action_on_write_error >= 2) {
863 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
864 respawn();
865 } else if (g_conf->mds_action_on_write_error == 1) {
866 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
867 mdcache->force_readonly();
868 } else {
869 // ignore;
870 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
871 }
872 }
873
// Progress thread main loop: sleeps on the condvar until there are
// finished contexts to run, deferred messages that can now be handled
// (only when the beacon is no longer laggy), or a shutdown request.
void *MDSRank::ProgressThread::entry()
{
  Mutex::Locker l(mds->mds_lock);
  while (true) {
    while (!mds->stopping &&
           mds->finished_queue.empty() &&
           (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
      cond.Wait(mds->mds_lock);
    }

    if (mds->stopping) {
      break;
    }

    mds->_advance_queues();
  }

  return NULL;
}
893
894
// Stop the progress thread.  Requires mds->stopping to already be set
// so the thread's main loop will exit once woken.
void MDSRank::ProgressThread::shutdown()
{
  assert(mds->mds_lock.is_locked_by_me());
  assert(mds->stopping);

  if (am_self()) {
    // Stopping is set, we will fall out of our main loop naturally
  } else {
    // Kick the thread to notice mds->stopping, and join it
    cond.Signal();
    // Drop mds_lock so the thread can wake from cond.Wait() and exit
    // before we join it.
    mds->mds_lock.Unlock();
    if (is_started())
      join();
    mds->mds_lock.Lock();
  }
}
911
// Messenger entry point: stamp the sending client's session activity
// time, then hand the message to _dispatch.
bool MDSRankDispatcher::ms_dispatch(Message *m)
{
  if (m->get_source().is_client()) {
    // NOTE(review): Connection::get_priv() typically takes a reference
    // and there is no matching put() here -- confirm whether this
    // leaks a Session ref.
    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
    if (session)
      session->last_seen = Session::clock::now();
  }

  // Track nesting so _dispatch() only runs queue housekeeping at the
  // outermost dispatch level (it checks dispatch_depth > 1).
  inc_dispatch_depth();
  bool ret = _dispatch(m, true);
  dec_dispatch_depth();
  return ret;
}
925
/* If this function returns true, it recognizes the message and has taken the
 * reference. If it returns false, it has done neither. */
bool MDSRank::_dispatch(Message *m, bool new_msg)
{
  if (is_stale_message(m)) {
    m->put();
    return true;
  }

  if (beacon.is_laggy()) {
    // Defer while laggy; the progress thread replays these later.
    dout(5) << " laggy, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else if (new_msg && !waiting_for_nolaggy.empty()) {
    // Preserve ordering: new messages queue behind already-deferred ones.
    dout(5) << " there are deferred messages, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else {
    if (!handle_deferrable_message(m)) {
      dout(0) << "unrecognized message " << *m << dendl;
      return false;
    }

    heartbeat_reset();
  }

  // Only the outermost dispatch performs the housekeeping below.
  if (dispatch_depth > 1)
    return true;

  // finish any triggered contexts
  _advance_queues();

  if (beacon.is_laggy()) {
    // We've gone laggy during dispatch, don't do any
    // more housekeeping
    return true;
  }

  // done with all client replayed requests?
  if (is_clientreplay() &&
      mdcache->is_open() &&
      replay_queue.empty() &&
      beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
    int num_requests = mdcache->get_num_client_requests();
    dout(10) << " still have " << num_requests << " active replay requests" << dendl;
    if (num_requests == 0)
      clientreplay_done();
  }

  // hack: thrash exports (testing aid, enabled via mds_thrash_exports)
  static utime_t start;
  utime_t now = ceph_clock_now();
  if (start == utime_t())
    start = now;
  /*double el = now - start;
  if (el > 30.0 &&
    el < 60.0)*/
  for (int i=0; i<g_conf->mds_thrash_exports; i++) {
    set<mds_rank_t> s;
    if (!is_active()) break;
    mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
    if (s.size() < 2 || CInode::count() < 10)
      break;  // need peers for this to work.
    if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
        mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
      break;  // already thrashing hard enough

    dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    list<CDir*> ls;
    in->get_dirfrags(ls);
    if (!ls.empty()) {   // must be an open dir.
      // pick a random dirfrag of that inode
      list<CDir*>::iterator p = ls.begin();
      int n = rand() % ls.size();
      while (n--)
        ++p;
      CDir *dir = *p;
      if (!dir->get_parent_dir()) continue;    // must be linked.
      if (!dir->is_auth()) continue;           // must be auth.

      // pick a random destination rank other than ourselves
      mds_rank_t dest;
      do {
        int k = rand() % s.size();
        set<mds_rank_t>::iterator p = s.begin();
        while (k--) ++p;
        dest = *p;
      } while (dest == whoami);
      mdcache->migrator->export_dir_nicely(dir,dest);
    }
  }
  // hack: thrash fragments (testing aid, enabled via mds_thrash_fragments)
  for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
    if (!is_active()) break;
    if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
    dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    list<CDir*> ls;
    in->get_dirfrags(ls);
    if (ls.empty()) continue;                // must be an open dir.
    CDir *dir = ls.front();
    if (!dir->get_parent_dir()) continue;    // must be linked.
    if (!dir->is_auth()) continue;           // must be auth.
    frag_t fg = dir->get_frag();
    if (mdsmap->allows_dirfrags()) {
      // usually split; occasionally (more often for deeper frags) merge
      if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
        mdcache->split_dir(dir, 1);
      } else {
        balancer->queue_merge(dir);
      }
    }
  }

  // hack: force hash root?
  /*
  if (false &&
      mdcache->get_root() &&
      mdcache->get_root()->dir &&
      !(mdcache->get_root()->dir->is_hashed() ||
        mdcache->get_root()->dir->is_hashing())) {
    dout(0) << "hashing root" << dendl;
    mdcache->migrator->hash_dir(mdcache->get_root()->dir);
  }
  */

  update_mlogger();
  return true;
}
1057
1058 void MDSRank::update_mlogger()
1059 {
1060 if (mlogger) {
1061 mlogger->set(l_mdm_ino, CInode::count());
1062 mlogger->set(l_mdm_dir, CDir::count());
1063 mlogger->set(l_mdm_dn, CDentry::count());
1064 mlogger->set(l_mdm_cap, Capability::count());
1065 mlogger->set(l_mdm_inoa, CInode::increments());
1066 mlogger->set(l_mdm_inos, CInode::decrements());
1067 mlogger->set(l_mdm_dira, CDir::increments());
1068 mlogger->set(l_mdm_dirs, CDir::decrements());
1069 mlogger->set(l_mdm_dna, CDentry::increments());
1070 mlogger->set(l_mdm_dns, CDentry::decrements());
1071 mlogger->set(l_mdm_capa, Capability::increments());
1072 mlogger->set(l_mdm_caps, Capability::decrements());
1073 mlogger->set(l_mdm_buf, buffer::get_total_alloc());
1074 }
1075 }
1076
/*
 * lower priority messages we defer if we seem laggy
 *
 * Routes a message to the owning subsystem based on its type; returns
 * false for unrecognized types (caller keeps the reference).
 */
bool MDSRank::handle_deferrable_message(Message *m)
{
  // High bits of the message type select the destination subsystem.
  int port = m->get_type() & 0xff00;

  switch (port) {
  case MDS_PORT_CACHE:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->dispatch(m);
    break;

  case MDS_PORT_MIGRATOR:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->migrator->dispatch(m);
    break;

  default:
    switch (m->get_type()) {
      // SERVER
    case CEPH_MSG_CLIENT_SESSION:
    case CEPH_MSG_CLIENT_RECONNECT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      // fall-thru
    case CEPH_MSG_CLIENT_REQUEST:
      server->dispatch(m);
      break;
    case MSG_MDS_SLAVE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      server->dispatch(m);
      break;

    case MSG_MDS_HEARTBEAT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      balancer->proc_message(m);
      break;

    case MSG_MDS_TABLE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      {
        MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
        // ops < 0 are handled by the table client side; others by the
        // table server side.
        if (req->op < 0) {
          MDSTableClient *client = get_table_client(req->table);
          client->handle_request(req);
        } else {
          MDSTableServer *server = get_table_server(req->table);
          server->handle_request(req);
        }
      }
      break;

    case MSG_MDS_LOCK:
    case MSG_MDS_INODEFILECAPS:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      locker->dispatch(m);
      break;

    case CEPH_MSG_CLIENT_CAPS:
    case CEPH_MSG_CLIENT_CAPRELEASE:
    case CEPH_MSG_CLIENT_LEASE:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      locker->dispatch(m);
      break;

    default:
      // not a message type we know how to route
      return false;
    }
  }

  return true;
}
1149
/**
 * Advance finished_queue and waiting_for_nolaggy.
 *
 * Usually drain both queues, but may not drain waiting_for_nolaggy
 * if beacon is currently laggy.
 */
void MDSRank::_advance_queues()
{
  assert(mds_lock.is_locked_by_me());

  while (!finished_queue.empty()) {
    dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
    dout(10) << finished_queue << dendl;
    // Swap the queue out before completing: completed contexts may
    // enqueue further contexts, which are picked up on the next pass
    // of the outer loop.
    list<MDSInternalContextBase*> ls;
    ls.swap(finished_queue);
    while (!ls.empty()) {
      dout(10) << " finish " << ls.front() << dendl;
      ls.front()->complete(0);
      ls.pop_front();

      // Completions can take a while; keep the heartbeat fresh.
      heartbeat_reset();
    }
  }

  while (!waiting_for_nolaggy.empty()) {
    // stop if we're laggy now!
    if (beacon.is_laggy())
      break;

    Message *old = waiting_for_nolaggy.front();
    waiting_for_nolaggy.pop_front();

    if (is_stale_message(old)) {
      old->put();
    } else {
      dout(7) << " processing laggy deferred " << *old << dendl;
      if (!handle_deferrable_message(old)) {
        // unrecognized: drop our reference (see _dispatch contract)
        dout(0) << "unrecognized message " << *old << dendl;
        old->put();
      }
    }

    heartbeat_reset();
  }
}
1195
1196 /**
1197 * Call this when you take mds_lock, or periodically if you're going to
1198 * hold the lock for a long time (e.g. iterating over clients/inodes)
1199 */
1200 void MDSRank::heartbeat_reset()
1201 {
1202 // Any thread might jump into mds_lock and call us immediately
1203 // after a call to suicide() completes, in which case MDSRank::hb
1204 // has been freed and we are a no-op.
1205 if (!hb) {
1206 assert(stopping);
1207 return;
1208 }
1209
1210 // NB not enabling suicide grace, because the mon takes care of killing us
1211 // (by blacklisting us) when we fail to send beacons, and it's simpler to
1212 // only have one way of dying.
1213 auto grace = g_conf->get_val<double>("mds_heartbeat_grace");
1214 g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
1215 }
1216
/*
 * Decide whether a message comes from a stale/bogus MDS instance and
 * should be dropped.  Returns true only for MDS-sourced messages whose
 * sender is unknown, has a mismatched addr/inst, or is down -- except
 * mdsmaps (always inspected) and cache_expires from a correctly-addressed
 * down rank (still processed so cache state can be released).
 */
bool MDSRank::is_stale_message(Message *m) const
{
  // from bad mds?
  if (m->get_source().is_mds()) {
    mds_rank_t from = mds_rank_t(m->get_source().num());
    if (!mdsmap->have_inst(from) ||
	mdsmap->get_inst(from) != m->get_source_inst() ||
	mdsmap->is_down(from)) {
      // bogus mds?
      if (m->get_type() == CEPH_MSG_MDS_MAP) {
	dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
		<< ", but it's an mdsmap, looking at it" << dendl;
      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
		 mdsmap->get_inst(from) == m->get_source_inst()) {
	dout(5) << "got " << *m << " from down mds " << m->get_source()
		<< ", but it's a cache_expire, looking at it" << dendl;
      } else {
	dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
		<< ", dropping" << dendl;
	return true;
      }
    }
  }
  return false;
}
1242
/*
 * Look up the Session attached to a message's connection.  If the
 * connection still carries a closed bootstrap session but the sessionmap
 * holds an imported (open) session for the same client, migrate the
 * connection over to the imported session and return that instead.
 * Returns NULL when the connection has no session.
 */
Session *MDSRank::get_session(Message *m)
{
  // The connection's priv pointer carries the session (with a ref).
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session) {
    session->put(); // do not carry ref
    dout(20) << "get_session have " << session << " " << session->info.inst
	     << " state " << session->get_state_name() << dendl;
    // Check if we've imported an open session since (new sessions start closed)
    if (session->is_closed()) {
      Session *imported_session = sessionmap.get_session(session->info.inst.name);
      if (imported_session && imported_session != session) {
	dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
	imported_session->info.auth_name = session->info.auth_name;
	//assert(session->info.auth_name == imported_session->info.auth_name);
	assert(session->info.inst == imported_session->info.inst);
	imported_session->connection = session->connection;
	// send out any queued messages
	while (!session->preopen_out_queue.empty()) {
	  imported_session->connection->send_message(session->preopen_out_queue.front());
	  session->preopen_out_queue.pop_front();
	}
	imported_session->auth_caps = session->auth_caps;
	// The bootstrap session's only remaining ref is the connection's
	// priv; repoint that priv at the imported session.
	assert(session->get_nref() == 1);
	imported_session->connection->set_priv(imported_session->get());
	imported_session->last_seen = session->last_seen;
	session = imported_session;
      }
    }
  } else {
    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
  }
  return session;
}
1276
// Send a message on an explicit connection; a null connection is a bug in
// the caller.
void MDSRank::send_message(Message *m, Connection *c)
{
  assert(c);
  c->send_message(m);
}
1282
1283
1284 void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
1285 {
1286 if (!mdsmap->is_up(mds)) {
1287 dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
1288 m->put();
1289 return;
1290 }
1291
1292 // send mdsmap first?
1293 if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
1294 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
1295 mdsmap->get_inst(mds));
1296 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
1297 }
1298
1299 // send message
1300 messenger->send_message(m, mdsmap->get_inst(mds));
1301 }
1302
/*
 * Forward a message to another rank.  Client requests are not actually
 * forwarded: the client is told (via MClientRequestForward) to resend to
 * the target rank itself.  Only DIRUPDATE/EXPORTDIRDISCOVER are resent
 * directly, since they encode their source mds explicitly.
 */
void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
{
  assert(mds != whoami);

  // client request?
  if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
      (static_cast<MClientRequest*>(m))->get_source().is_client()) {
    MClientRequest *creq = static_cast<MClientRequest*>(m);
    creq->inc_num_fwd();    // inc forward counter

    /*
     * don't actually forward if non-idempotent!
     * client has to do it.  although the MDS will ignore duplicate requests,
     * the affected metadata may migrate, in which case the new authority
     * won't have the metareq_id in the completed request map.
     */
    // NEW: always make the client resend!
    bool client_must_resend = true;  //!creq->can_forward();

    // tell the client where it should go
    messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
						      client_must_resend),
			    creq->get_source_inst());

    if (client_must_resend) {
      // Done: the client will reissue the request to `mds` itself.
      m->put();
      return;
    }
  }

  // these are the only types of messages we should be 'forwarding'; they
  // explicitly encode their source mds, which gets clobbered when we resend
  // them here.
  assert(m->get_type() == MSG_MDS_DIRUPDATE ||
	 m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);

  // send mdsmap first?
  if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
			    mdsmap->get_inst(mds));
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
  }

  messenger->send_message(m, mdsmap->get_inst(mds));
}
1348
1349
1350
1351 void MDSRank::send_message_client_counted(Message *m, client_t client)
1352 {
1353 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
1354 if (session) {
1355 send_message_client_counted(m, session);
1356 } else {
1357 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
1358 }
1359 }
1360
1361 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
1362 {
1363 Session *session = static_cast<Session *>(connection->get_priv());
1364 if (session) {
1365 session->put(); // do not carry ref
1366 send_message_client_counted(m, session);
1367 } else {
1368 dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
1369 // another Connection took over the Session
1370 }
1371 }
1372
1373 void MDSRank::send_message_client_counted(Message *m, Session *session)
1374 {
1375 version_t seq = session->inc_push_seq();
1376 dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
1377 << seq << " " << *m << dendl;
1378 if (session->connection) {
1379 session->connection->send_message(m);
1380 } else {
1381 session->preopen_out_queue.push_back(m);
1382 }
1383 }
1384
1385 void MDSRank::send_message_client(Message *m, Session *session)
1386 {
1387 dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
1388 if (session->connection) {
1389 session->connection->send_message(m);
1390 } else {
1391 session->preopen_out_queue.push_back(m);
1392 }
1393 }
1394
/**
 * This is used whenever a RADOS operation has been cancelled
 * or a RADOS client has been blacklisted, to cause the MDS and
 * any clients to wait for this OSD epoch before using any new caps.
 *
 * See doc/cephfs/eviction
 *
 * @param e the OSD epoch everyone must observe before trusting new caps.
 */
void MDSRank::set_osd_epoch_barrier(epoch_t e)
{
  dout(4) << __func__ << ": epoch=" << e << dendl;
  osd_epoch_barrier = e;
}
1407
// Re-run _dispatch for a deferred message (new_msg=false), tracking the
// dispatch recursion depth around the call.
void MDSRank::retry_dispatch(Message *m)
{
  inc_dispatch_depth();
  _dispatch(m, false);
  dec_dispatch_depth();
}
1414
// Age (relative to `now`) of the oldest message in the messenger's
// dispatch queue; delegates entirely to the messenger.
double MDSRank::get_dispatch_queue_max_age(utime_t now) const
{
  return messenger->get_dispatch_queue_max_age(now);
}
1419
// True once the daemon has begun tearing itself down (see `stopping`).
bool MDSRank::is_daemon_stopping() const
{
  return stopping;
}
1424
// Ask the mon (via the beacon) to transition this rank to state `s`.
void MDSRank::request_state(MDSMap::DaemonState s)
{
  dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
  beacon.set_want_state(mdsmap, s);
  // Send immediately rather than waiting for the next beacon tick.
  beacon.send();
}
1431
1432
// Completion context that resumes the boot sequence at a given step,
// forwarding the completion code of the step that just finished.
class C_MDS_BootStart : public MDSInternalContext {
  MDSRank::BootStep nextstep;  // boot step to run when this context fires
public:
  C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
    : MDSInternalContext(m), nextstep(n) {}
  void finish(int r) override {
    mds->boot_start(nextstep, r);
  }
};
1442
1443
/**
 * Drive the multi-step boot sequence: load tables/log, open base inodes,
 * then replay (or append to) the journal.  Each step schedules the next
 * via C_MDS_BootStart.
 *
 * @param step the boot stage to execute now.
 * @param r    completion code of the previous stage (0 on first call).
 */
void MDSRank::boot_start(BootStep step, int r)
{
  // Handle errors from previous step
  if (r < 0) {
    if (is_standby_replay() && (r == -EAGAIN)) {
      // Standby-replay fell behind the active's journal; restart cleanly.
      dout(0) << "boot_start encountered an error EAGAIN"
              << ", respawning since we fell behind journal" << dendl;
      respawn();
    } else if (r == -EINVAL || r == -ENOENT) {
      // Invalid or absent data, indicates damaged on-disk structures
      clog->error() << "Error loading MDS rank " << whoami << ": "
                    << cpp_strerror(r);
      damaged();
      assert(r == 0);  // Unreachable, damaged() calls respawn()
    } else if (r == -EROFS) {
      dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
    } else {
      // Completely unexpected error, give up and die
      dout(0) << "boot_start encountered an error, failing" << dendl;
      suicide();
      return;
    }
  }

  assert(is_starting() || is_any_replay());

  switch(step) {
    case MDS_BOOT_INITIAL:
      {
        mdcache->init_layouts();

        // Everything loaded below completes into this gather, which then
        // advances to MDS_BOOT_OPEN_ROOT.
        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
        dout(2) << "boot_start " << step << ": opening inotable" << dendl;
        inotable->set_rank(whoami);
        inotable->load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
        sessionmap.set_rank(whoami);
        sessionmap.load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening mds log" << dendl;
        mdlog->open(gather.new_sub());

        if (is_starting()) {
          dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
          purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
        } else if (!standby_replaying) {
          // Non-standby replay: open the purge queue in the background;
          // MDS_BOOT_PREPARE_LOG waits for its recovery later.
          dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
          purge_queue.open(NULL);
        }

        if (mdsmap->get_tableserver() == whoami) {
          dout(2) << "boot_start " << step << ": opening snap table" << dendl;
          snapserver->set_rank(whoami);
          snapserver->load(gather.new_sub());
        }

        gather.activate();
      }
      break;
    case MDS_BOOT_OPEN_ROOT:
      {
        dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;

        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));

        if (is_starting()) {
          // load mydir frag for the first log segment (creating subtree map)
          mdcache->open_mydir_frag(gather.new_sub());
        } else {
          mdcache->open_mydir_inode(gather.new_sub());
        }

        if (whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
          mdcache->open_root_inode(gather.new_sub());
        } else if (is_any_replay()) {
          // replay.  make up fake root inode to start with
          mdcache->create_root_inode();
        }
        gather.activate();
      }
      break;
    case MDS_BOOT_PREPARE_LOG:
      if (is_any_replay()) {
        dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));

        if (!standby_replaying) {
          dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
          purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
        }

        mdlog->replay(gather.new_sub());
        gather.activate();
      } else {
        dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
        mdlog->append();
        starting_done();
      }
      break;
    case MDS_BOOT_REPLAY_DONE:
      assert(is_any_replay());

      // Sessiontable and inotable should be in sync after replay, validate
      // that they are consistent.
      validate_sessions();

      replay_done();
      break;
  }
}
1558
1559 void MDSRank::validate_sessions()
1560 {
1561 assert(mds_lock.is_locked_by_me());
1562 bool valid = true;
1563
1564 // Identify any sessions which have state inconsistent with other,
1565 // after they have been loaded from rados during startup.
1566 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1567 const auto &sessions = sessionmap.get_sessions();
1568 for (const auto &i : sessions) {
1569 Session *session = i.second;
1570 interval_set<inodeno_t> badones;
1571 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1572 clog->error() << "client " << *session
1573 << "loaded with preallocated inodes that are inconsistent with inotable";
1574 valid = false;
1575 }
1576 }
1577
1578 if (!valid) {
1579 damaged();
1580 assert(valid);
1581 }
1582 }
1583
// STARTING stage is complete: request ACTIVE from the mon and open a
// fresh log segment.
void MDSRank::starting_done()
{
  dout(3) << "starting_done" << dendl;
  assert(is_starting());
  request_state(MDSMap::STATE_ACTIVE);

  mdlog->start_new_segment();
}
1592
1593
1594 void MDSRank::calc_recovery_set()
1595 {
1596 // initialize gather sets
1597 set<mds_rank_t> rs;
1598 mdsmap->get_recovery_mds_set(rs);
1599 rs.erase(whoami);
1600 mdcache->set_recovery_set(rs);
1601
1602 dout(1) << " recovery set is " << rs << dendl;
1603 }
1604
1605
/*
 * Enter replay: note standby-replay mode, compute the recovery set, and
 * start booting once the osdmap that blacklists our prior instance has
 * been observed.
 */
void MDSRank::replay_start()
{
  dout(1) << "replay_start" << dendl;

  if (is_standby_replay())
    standby_replaying = true;

  calc_recovery_set();

  // Check if we need to wait for a newer OSD map before starting
  Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
  bool const ready = objecter->wait_for_map(
      mdsmap->get_last_failure_osd_epoch(),
      fin);

  if (ready) {
    // Map already new enough: the queued context won't fire; delete it and
    // start booting immediately.
    delete fin;
    boot_start();
  } else {
    dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
	    << " (which blacklists prior instance)" << dendl;
  }
}
1629
1630
// IO completion for a standby-replay journal head re-read; carries the
// read position from before the re-read so we can detect falling behind.
class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
  uint64_t old_read_pos;  // journal read position before the head re-read
public:
  C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
    MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
  void finish(int r) override {
    mds->_standby_replay_restart_finish(r, old_read_pos);
  }
  void print(ostream& out) const override {
    out << "standby_replay_restart";
  }
};
1643
/*
 * Completion of the journal head re-read: if the active MDS has trimmed
 * past where we had read to, our cache may reference expired entries --
 * respawn rather than try to unwind.  Otherwise trim standby segments and
 * continue the boot sequence at MDS_BOOT_PREPARE_LOG.
 */
void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
{
  if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
    dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
    respawn(); /* we're too far back, and this is easier than
		  trying to reset everything in the cache, etc */
  } else {
    mdlog->standby_trim_segments();
    boot_start(MDS_BOOT_PREPARE_LOG, r);
  }
}
1655
// Timer/context callback that kicks off another standby-replay pass.
class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
public:
  explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
  void finish(int r) override {
    assert(!r);
    mds->standby_replay_restart();
  }
};
1664
/*
 * Start the next replay pass.  In standby-replay mode this simply
 * re-reads the journal head and probes for new entries.  When leaving
 * standby (final takeover pass), first wait for the osdmap that
 * blacklists the prior active instance.
 */
void MDSRank::standby_replay_restart()
{
  if (standby_replaying) {
    /* Go around for another pass of replaying in standby */
    dout(5) << "Restarting replay as standby-replay" << dendl;
    mdlog->get_journaler()->reread_head_and_probe(
      new C_MDS_StandbyReplayRestartFinish(
        this,
	mdlog->get_journaler()->get_read_pos()));
  } else {
    /* We are transitioning out of standby: wait for OSD map update
       before making final pass */
    dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
    Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
    bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
    if (ready) {
      // Map already new enough: discard the queued context and proceed.
      delete fin;
      mdlog->get_journaler()->reread_head_and_probe(
        new C_MDS_StandbyReplayRestartFinish(
          this,
	  mdlog->get_journaler()->get_read_pos()));

      dout(1) << " opening purge queue (async)" << dendl;
      purge_queue.open(NULL);
    } else {
      dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
              << " (which blacklists prior instance)" << dendl;
    }
  }
}
1695
/**
 * Called when a journal replay pass completes.  Depending on state this
 * either schedules another standby-replay pass, makes the final takeover
 * pass, or makes the journal writeable and moves on to RECONNECT/RESOLVE.
 */
void MDSRank::replay_done()
{
  if (!standby_replaying) {
    dout(1) << "Finished replaying journal" << dendl;
  } else {
    dout(5) << "Finished replaying journal as standby-replay" << dendl;
  }

  if (is_standby_replay()) {
    // The replay was done in standby state, and we are still in that state
    assert(standby_replaying);
    dout(10) << "setting replay timer" << dendl;
    // Periodically re-run replay to follow the active MDS's journal.
    timer.add_event_after(g_conf->mds_replay_interval,
                          new C_MDS_StandbyReplayRestart(this));
    return;
  } else if (standby_replaying) {
    // The replay was done in standby state, we have now _left_ that state
    dout(10) << " last replay pass was as a standby; making final pass" << dendl;
    standby_replaying = false;
    standby_replay_restart();
    return;
  } else {
    // Replay is complete, journal read should be up to date
    assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
    assert(!is_standby_replay());

    // Reformat and come back here
    if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
      dout(4) << "reformatting journal on standby-replay->replay transition" << dendl;
      mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
      return;
    }
  }

  dout(1) << "making mds journal writeable" << dendl;
  mdlog->get_journaler()->set_writeable();
  mdlog->get_journaler()->trim_tail();

  // Debug/testing knobs: optionally discard session or ino-prealloc state.
  if (g_conf->mds_wipe_sessions) {
    dout(1) << "wiping out client sessions" << dendl;
    sessionmap.wipe();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf->mds_wipe_ino_prealloc) {
    dout(1) << "wiping out ino prealloc from sessions" << dendl;
    sessionmap.wipe_ino_prealloc();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf->mds_skip_ino) {
    inodeno_t i = g_conf->mds_skip_ino;
    dout(1) << "skipping " << i << " inodes" << dendl;
    inotable->skip_inos(i);
    inotable->save(new C_MDSInternalNoop);
  }

  // Single-MDS clusters skip RESOLVE (nobody to resolve with).
  if (mdsmap->get_num_in_mds() == 1 &&
      mdsmap->get_num_failed_mds() == 0) { // just me!
    dout(2) << "i am alone, moving to state reconnect" << dendl;
    request_state(MDSMap::STATE_RECONNECT);
  } else {
    dout(2) << "i am not alone, moving to state resolve" << dendl;
    request_state(MDSMap::STATE_RESOLVE);
  }
}
1760
// Reopen the log for writing after replay; roll back any fragment
// operations that never committed.
void MDSRank::reopen_log()
{
  dout(1) << "reopen_log" << dendl;
  mdcache->rollback_uncommitted_fragments();
}
1766
1767
// Enter RESOLVE: reopen the log, start the cache's resolve phase (which
// calls resolve_done() on completion), and wake anything waiting for it.
void MDSRank::resolve_start()
{
  dout(1) << "resolve_start" << dendl;

  reopen_log();

  mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
  finish_contexts(g_ceph_context, waiting_for_resolve);
}
// Resolve phase complete: move on to RECONNECT.
void MDSRank::resolve_done()
{
  dout(1) << "resolve_done" << dendl;
  request_state(MDSMap::STATE_RECONNECT);
}
1782
/*
 * Enter RECONNECT: purge blacklisted clients from the SessionMap (so we
 * don't wait for them), then ask the Server to run the client reconnect
 * phase, finishing into reconnect_done().
 */
void MDSRank::reconnect_start()
{
  dout(1) << "reconnect_start" << dendl;

  if (last_state == MDSMap::STATE_REPLAY) {
    // Came straight from replay (resolve was skipped): reopen the log now.
    reopen_log();
  }

  // Drop any blacklisted clients from the SessionMap before going
  // into reconnect, so that we don't wait for them.
  objecter->enable_blacklist_events();
  std::set<entity_addr_t> blacklist;
  epoch_t epoch = 0;
  objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
      o.get_blacklist(&blacklist);
      epoch = o.get_epoch();
  });
  auto killed = server->apply_blacklist(blacklist);
  dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
          << blacklist.size() << " blacklist entries, "
          << sessionmap.get_sessions().size() << ")" << dendl;
  if (killed) {
    // Make sure everyone observes the epoch that blacklisted those clients
    // before trusting any new caps.
    set_osd_epoch_barrier(epoch);
  }

  server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
  finish_contexts(g_ceph_context, waiting_for_reconnect);
}
// Client reconnect phase complete: move on to REJOIN.
void MDSRank::reconnect_done()
{
  dout(1) << "reconnect_done" << dendl;
  request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
}
1816
// Called when the whole cluster starts rejoining: send our rejoin
// messages to the other ranks.
void MDSRank::rejoin_joint_start()
{
  dout(1) << "rejoin_joint_start" << dendl;
  mdcache->rejoin_send_rejoins();
}
// Enter REJOIN: start the cache's rejoin phase, finishing into
// rejoin_done().
void MDSRank::rejoin_start()
{
  dout(1) << "rejoin_start" << dendl;
  mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
}
/*
 * Rejoin complete.  A rank with no subtrees either leaves the cluster
 * (non-root) or is damaged (rank 0 must always own a subtree).  Otherwise
 * go to CLIENTREPLAY if there are queued client requests, else ACTIVE.
 */
void MDSRank::rejoin_done()
{
  dout(1) << "rejoin_done" << dendl;
  mdcache->show_subtrees();
  mdcache->show_cache();

  // funny case: is our cache empty? no subtrees?
  if (!mdcache->is_subtrees()) {
    if (whoami == 0) {
      // The root should always have a subtree!
      clog->error() << "No subtrees found for root MDS rank!";
      damaged();
      assert(mdcache->is_subtrees());
    } else {
      dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
      request_state(MDSMap::STATE_STOPPED);
    }
    return;
  }

  if (replay_queue.empty())
    request_state(MDSMap::STATE_ACTIVE);
  else
    request_state(MDSMap::STATE_CLIENTREPLAY);
}
1852
// Enter CLIENTREPLAY: wake replay waiters, start file recovery, and begin
// draining the queued client replay requests one at a time.
void MDSRank::clientreplay_start()
{
  dout(1) << "clientreplay_start" << dendl;
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();
  queue_one_replay();
}
1860
1861 bool MDSRank::queue_one_replay()
1862 {
1863 if (replay_queue.empty()) {
1864 mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1865 return false;
1866 }
1867 queue_waiter(replay_queue.front());
1868 replay_queue.pop_front();
1869 return true;
1870 }
1871
// Client replay drained and journal safe: request ACTIVE.
void MDSRank::clientreplay_done()
{
  dout(1) << "clientreplay_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
}
1877
// Enter ACTIVE: open the root if we skipped recovery (fresh create/start),
// tidy up post-recovery state, reissue caps, and wake waiters.
void MDSRank::active_start()
{
  dout(1) << "active_start" << dendl;

  // Coming straight from creating/starting means the root was never
  // opened during a recovery phase; do it now.
  if (last_state == MDSMap::STATE_CREATING ||
      last_state == MDSMap::STATE_STARTING) {
    mdcache->open_root();
  }

  mdcache->clean_open_file_lists();
  mdcache->export_remaining_imported_caps();
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();

  mdcache->reissue_all_caps();
  mdcache->activate_stray_manager();

  finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters
}
1897
/*
 * Called once recovery succeeds (transition into clientreplay/active).
 * Kicks the snap table server, restarts truncates/file recovery, and
 * populates mydir.  Skipped almost entirely when coming from CREATING.
 */
void MDSRank::recovery_done(int oldstate)
{
  dout(1) << "recovery_done -- successful recovery!" << dendl;
  assert(is_clientreplay() || is_active());

  // kick snaptable (resend AGREEs)
  if (mdsmap->get_tableserver() == whoami) {
    set<mds_rank_t> active;
    mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
    snapserver->finish_recovery(active);
  }

  if (oldstate == MDSMap::STATE_CREATING)
    return;

  mdcache->start_recovered_truncates();
  mdcache->do_file_recover();

  // tell connected clients
  //bcast_mds_map(); // not anymore, they get this from the monitor

  mdcache->populate_mydir();
}
1921
// Filesystem creation complete: request ACTIVE.
void MDSRank::creating_done()
{
  dout(1)<< "creating_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
}
1927
/*
 * Create a brand-new rank: fresh journal, base hierarchy, inotable,
 * sessionmap, purge queue and (if we are the table server) snaptable.
 * All steps gather into `fin`, which ends in creating_done().
 */
void MDSRank::boot_create()
{
  dout(3) << "boot_create" << dendl;

  MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));

  mdcache->init_layouts();

  snapserver->set_rank(whoami);
  inotable->set_rank(whoami);
  sessionmap.set_rank(whoami);

  // start with a fresh journal
  dout(10) << "boot_create creating fresh journal" << dendl;
  mdlog->create(fin.new_sub());

  // open new journal segment, but do not journal subtree map (yet)
  mdlog->prepare_new_segment();

  if (whoami == mdsmap->get_root()) {
    dout(3) << "boot_create creating fresh hierarchy" << dendl;
    mdcache->create_empty_hierarchy(fin.get());
  }

  dout(3) << "boot_create creating mydir hierarchy" << dendl;
  mdcache->create_mydir_hierarchy(fin.get());

  // fixme: fake out inotable (reset, pretend loaded)
  dout(10) << "boot_create creating fresh inotable table" << dendl;
  inotable->reset();
  inotable->save(fin.new_sub());

  // write empty sessionmap
  sessionmap.save(fin.new_sub());

  // Create empty purge queue
  purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));

  // initialize tables
  if (mdsmap->get_tableserver() == whoami) {
    dout(10) << "boot_create creating fresh snaptable" << dendl;
    snapserver->reset();
    snapserver->save(fin.new_sub());
  }

  // Injected-failure test hook.
  assert(g_conf->mds_kill_create_at != 1);

  // ok now journal it
  mdlog->journal_segment_subtree_map(fin.new_sub());
  mdlog->flush();

  // Usually we do this during reconnect, but creation skips that.
  objecter->enable_blacklist_events();

  fin.activate();
}
1984
/*
 * Enter STOPPING: start shutting the cache down.  If we are the only
 * in-map MDS but still have mounted clients, we cannot flush the journal
 * to anyone and commit suicide instead.
 */
void MDSRank::stopping_start()
{
  dout(2) << "stopping_start" << dendl;

  if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
    // we're the only mds up!
    dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal. suicide!" << dendl;
    suicide();
  }

  mdcache->shutdown_start();
}
1997
// Cache shutdown complete: report a clean stop to the monitor.
void MDSRank::stopping_done()
{
  dout(2) << "stopping_done" << dendl;

  // tell monitor we shut down cleanly.
  request_state(MDSMap::STATE_STOPPED);
}
2005
2006 void MDSRankDispatcher::handle_mds_map(
2007 MMDSMap *m,
2008 MDSMap *oldmap)
2009 {
2010 // I am only to be passed MDSMaps in which I hold a rank
2011 assert(whoami != MDS_RANK_NONE);
2012
2013 MDSMap::DaemonState oldstate = state;
2014 mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
2015 state = mdsmap->get_state_gid(mds_gid);
2016 if (state != oldstate) {
2017 last_state = oldstate;
2018 incarnation = mdsmap->get_inc_gid(mds_gid);
2019 }
2020
2021 version_t epoch = m->get_epoch();
2022
2023 // note source's map version
2024 if (m->get_source().is_mds() &&
2025 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
2026 dout(15) << " peer " << m->get_source()
2027 << " has mdsmap epoch >= " << epoch
2028 << dendl;
2029 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
2030 }
2031
2032 // Validate state transitions while I hold a rank
2033 if (!MDSMap::state_transition_valid(oldstate, state)) {
2034 derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
2035 << "->" << ceph_mds_state_name(state) << dendl;
2036 respawn();
2037 }
2038
2039 if (oldstate != state) {
2040 // update messenger.
2041 if (state == MDSMap::STATE_STANDBY_REPLAY) {
2042 dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
2043 << " replaying mds." << whoami << "." << incarnation << dendl;
2044 messenger->set_myname(entity_name_t::MDS(mds_gid));
2045 } else {
2046 dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
2047 messenger->set_myname(entity_name_t::MDS(whoami));
2048 }
2049 }
2050
2051 // tell objecter my incarnation
2052 if (objecter->get_client_incarnation() != incarnation)
2053 objecter->set_client_incarnation(incarnation);
2054
2055 // for debug
2056 if (g_conf->mds_dump_cache_on_map)
2057 mdcache->dump_cache();
2058
2059 cluster_degraded = mdsmap->is_degraded();
2060
2061 // mdsmap and oldmap can be discontinuous. failover might happen in the missing mdsmap.
2062 // the 'restart' set tracks ranks that have restarted since the old mdsmap
2063 set<mds_rank_t> restart;
2064 // replaying mds does not communicate with other ranks
2065 if (state >= MDSMap::STATE_RESOLVE) {
2066 // did someone fail?
2067 // new down?
2068 set<mds_rank_t> olddown, down;
2069 oldmap->get_down_mds_set(&olddown);
2070 mdsmap->get_down_mds_set(&down);
2071 for (const auto& r : down) {
2072 if (oldmap->have_inst(r) && olddown.count(r) == 0) {
2073 messenger->mark_down(oldmap->get_inst(r).addr);
2074 handle_mds_failure(r);
2075 }
2076 }
2077
2078 // did someone fail?
2079 // did their addr/inst change?
2080 set<mds_rank_t> up;
2081 mdsmap->get_up_mds_set(up);
2082 for (const auto& r : up) {
2083 auto& info = mdsmap->get_info(r);
2084 if (oldmap->have_inst(r)) {
2085 auto& oldinfo = oldmap->get_info(r);
2086 if (info.inc != oldinfo.inc) {
2087 messenger->mark_down(oldinfo.addr);
2088 if (info.state == MDSMap::STATE_REPLAY ||
2089 info.state == MDSMap::STATE_RESOLVE) {
2090 restart.insert(r);
2091 handle_mds_failure(r);
2092 } else {
2093 assert(info.state == MDSMap::STATE_STARTING ||
2094 info.state == MDSMap::STATE_ACTIVE);
2095 // -> stopped (missing) -> starting -> active
2096 restart.insert(r);
2097 mdcache->migrator->handle_mds_failure_or_stop(r);
2098 }
2099 }
2100 } else {
2101 if (info.state == MDSMap::STATE_REPLAY ||
2102 info.state == MDSMap::STATE_RESOLVE) {
2103 // -> starting/creating (missing) -> active (missing) -> replay -> resolve
2104 restart.insert(r);
2105 handle_mds_failure(r);
2106 } else {
2107 assert(info.state == MDSMap::STATE_CREATING ||
2108 info.state == MDSMap::STATE_STARTING ||
2109 info.state == MDSMap::STATE_ACTIVE);
2110 }
2111 }
2112 }
2113 }
2114
2115 // did it change?
2116 if (oldstate != state) {
2117 dout(1) << "handle_mds_map state change "
2118 << ceph_mds_state_name(oldstate) << " --> "
2119 << ceph_mds_state_name(state) << dendl;
2120 beacon.set_want_state(mdsmap, state);
2121
2122 if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
2123 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
2124 assert (state == MDSMap::STATE_REPLAY);
2125 } else {
2126 // did i just recover?
2127 if ((is_active() || is_clientreplay()) &&
2128 (oldstate == MDSMap::STATE_CREATING ||
2129 oldstate == MDSMap::STATE_REJOIN ||
2130 oldstate == MDSMap::STATE_RECONNECT))
2131 recovery_done(oldstate);
2132
2133 if (is_active()) {
2134 active_start();
2135 } else if (is_any_replay()) {
2136 replay_start();
2137 } else if (is_resolve()) {
2138 resolve_start();
2139 } else if (is_reconnect()) {
2140 reconnect_start();
2141 } else if (is_rejoin()) {
2142 rejoin_start();
2143 } else if (is_clientreplay()) {
2144 clientreplay_start();
2145 } else if (is_creating()) {
2146 boot_create();
2147 } else if (is_starting()) {
2148 boot_start();
2149 } else if (is_stopping()) {
2150 assert(oldstate == MDSMap::STATE_ACTIVE);
2151 stopping_start();
2152 }
2153 }
2154 }
2155
2156 // RESOLVE
2157 // is someone else newly resolving?
2158 if (state >= MDSMap::STATE_RESOLVE) {
2159 if ((!oldmap->is_resolving() || !restart.empty()) && mdsmap->is_resolving()) {
2160 set<mds_rank_t> resolve;
2161 mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
2162 dout(10) << " resolve set is " << resolve << dendl;
2163 calc_recovery_set();
2164 mdcache->send_resolves();
2165 }
2166 }
2167
2168 // REJOIN
2169 // is everybody finally rejoining?
2170 if (state >= MDSMap::STATE_REJOIN) {
2171 // did we start?
2172 if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
2173 rejoin_joint_start();
2174
2175 // did we finish?
2176 if (g_conf->mds_dump_cache_after_rejoin &&
2177 oldmap->is_rejoining() && !mdsmap->is_rejoining())
2178 mdcache->dump_cache(); // for DEBUG only
2179
2180 if (oldstate >= MDSMap::STATE_REJOIN ||
2181 oldstate == MDSMap::STATE_STARTING) {
2182 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
2183 set<mds_rank_t> olddis, dis;
2184 oldmap->get_mds_set_lower_bound(olddis, MDSMap::STATE_REJOIN);
2185 mdsmap->get_mds_set_lower_bound(dis, MDSMap::STATE_REJOIN);
2186 for (const auto& r : dis) {
2187 if (r == whoami)
2188 continue; // not me
2189 if (!olddis.count(r) || restart.count(r)) { // newly so?
2190 mdcache->kick_discovers(r);
2191 mdcache->kick_open_ino_peers(r);
2192 }
2193 }
2194 }
2195 }
2196
2197 if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
2198 dout(1) << "cluster recovered." << dendl;
2199 auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
2200 if (it != waiting_for_active_peer.end()) {
2201 queue_waiters(it->second);
2202 waiting_for_active_peer.erase(it);
2203 }
2204 }
2205
2206 // did someone go active?
2207 if (state >= MDSMap::STATE_CLIENTREPLAY &&
2208 oldstate >= MDSMap::STATE_CLIENTREPLAY) {
2209 set<mds_rank_t> oldactive, active;
2210 oldmap->get_mds_set_lower_bound(oldactive, MDSMap::STATE_CLIENTREPLAY);
2211 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
2212 for (const auto& r : active) {
2213 if (r == whoami)
2214 continue; // not me
2215 if (!oldactive.count(r) || restart.count(r)) // newly so?
2216 handle_mds_recovery(r);
2217 }
2218 }
2219
2220 if (state >= MDSMap::STATE_CLIENTREPLAY) {
2221 // did anyone stop?
2222 set<mds_rank_t> oldstopped, stopped;
2223 oldmap->get_stopped_mds_set(oldstopped);
2224 mdsmap->get_stopped_mds_set(stopped);
2225 for (const auto& r : stopped)
2226 if (oldstopped.count(r) == 0) // newly so?
2227 mdcache->migrator->handle_mds_failure_or_stop(r);
2228 }
2229
2230 {
2231 map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
2232 while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
2233 list<MDSInternalContextBase*> ls;
2234 ls.swap(p->second);
2235 waiting_for_mdsmap.erase(p++);
2236 queue_waiters(ls);
2237 }
2238 }
2239
2240 if (is_active()) {
2241 // Before going active, set OSD epoch barrier to latest (so that
2242 // we don't risk handing out caps to clients with old OSD maps that
2243 // might not include barriers from the previous incarnation of this MDS)
2244 set_osd_epoch_barrier(objecter->with_osdmap(
2245 std::mem_fn(&OSDMap::get_epoch)));
2246 }
2247
2248 if (is_active()) {
2249 bool found = false;
2250 MDSMap::mds_info_t info = mdsmap->get_info(whoami);
2251
2252 for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
2253 p != mdsmap->get_mds_info().end();
2254 ++p) {
2255 if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
2256 (p->second.standby_for_rank == whoami ||(info.name.length() && p->second.standby_for_name == info.name))) {
2257 found = true;
2258 break;
2259 }
2260 if (found)
2261 mdlog->set_write_iohint(0);
2262 else
2263 mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
2264 }
2265 }
2266
2267 if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
2268 purge_queue.update_op_limit(*mdsmap);
2269 }
2270 }
2271
2272 void MDSRank::handle_mds_recovery(mds_rank_t who)
2273 {
2274 dout(5) << "handle_mds_recovery mds." << who << dendl;
2275
2276 mdcache->handle_mds_recovery(who);
2277
2278 if (mdsmap->get_tableserver() == whoami) {
2279 snapserver->handle_mds_recovery(who);
2280 }
2281
2282 queue_waiters(waiting_for_active_peer[who]);
2283 waiting_for_active_peer.erase(who);
2284 }
2285
2286 void MDSRank::handle_mds_failure(mds_rank_t who)
2287 {
2288 if (who == whoami) {
2289 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
2290 return;
2291 }
2292 dout(5) << "handle_mds_failure mds." << who << dendl;
2293
2294 mdcache->handle_mds_failure(who);
2295
2296 snapclient->handle_mds_failure(who);
2297 }
2298
2299 bool MDSRankDispatcher::handle_asok_command(
2300 std::string command, cmdmap_t& cmdmap, Formatter *f,
2301 std::ostream& ss)
2302 {
2303 if (command == "dump_ops_in_flight" ||
2304 command == "ops") {
2305 if (!op_tracker.dump_ops_in_flight(f)) {
2306 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
2307 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2308 }
2309 } else if (command == "dump_blocked_ops") {
2310 if (!op_tracker.dump_ops_in_flight(f, true)) {
2311 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
2312 Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2313 }
2314 } else if (command == "dump_historic_ops") {
2315 if (!op_tracker.dump_historic_ops(f)) {
2316 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
2317 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2318 }
2319 } else if (command == "dump_historic_ops_by_duration") {
2320 if (!op_tracker.dump_historic_ops(f, true)) {
2321 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
2322 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2323 }
2324 } else if (command == "osdmap barrier") {
2325 int64_t target_epoch = 0;
2326 bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
2327
2328 if (!got_val) {
2329 ss << "no target epoch given";
2330 return true;
2331 }
2332
2333 mds_lock.Lock();
2334 set_osd_epoch_barrier(target_epoch);
2335 mds_lock.Unlock();
2336
2337 C_SaferCond cond;
2338 bool already_got = objecter->wait_for_map(target_epoch, &cond);
2339 if (!already_got) {
2340 dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
2341 cond.wait();
2342 }
2343 } else if (command == "session ls") {
2344 Mutex::Locker l(mds_lock);
2345
2346 heartbeat_reset();
2347
2348 dump_sessions(SessionFilter(), f);
2349 } else if (command == "session evict") {
2350 std::string client_id;
2351 const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
2352 if(!got_arg) {
2353 ss << "Invalid client_id specified";
2354 return true;
2355 }
2356
2357 mds_lock.Lock();
2358 std::stringstream dss;
2359 bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
2360 g_conf->mds_session_blacklist_on_evict, dss);
2361 if (!evicted) {
2362 dout(15) << dss.str() << dendl;
2363 ss << dss.str();
2364 }
2365 mds_lock.Unlock();
2366 } else if (command == "scrub_path") {
2367 string path;
2368 vector<string> scrubop_vec;
2369 cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
2370 cmd_getval(g_ceph_context, cmdmap, "path", path);
2371 command_scrub_path(f, path, scrubop_vec);
2372 } else if (command == "tag path") {
2373 string path;
2374 cmd_getval(g_ceph_context, cmdmap, "path", path);
2375 string tag;
2376 cmd_getval(g_ceph_context, cmdmap, "tag", tag);
2377 command_tag_path(f, path, tag);
2378 } else if (command == "flush_path") {
2379 string path;
2380 cmd_getval(g_ceph_context, cmdmap, "path", path);
2381 command_flush_path(f, path);
2382 } else if (command == "flush journal") {
2383 command_flush_journal(f);
2384 } else if (command == "get subtrees") {
2385 command_get_subtrees(f);
2386 } else if (command == "export dir") {
2387 string path;
2388 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
2389 ss << "malformed path";
2390 return true;
2391 }
2392 int64_t rank;
2393 if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
2394 ss << "malformed rank";
2395 return true;
2396 }
2397 command_export_dir(f, path, (mds_rank_t)rank);
2398 } else if (command == "dump cache") {
2399 Mutex::Locker l(mds_lock);
2400 string path;
2401 int r;
2402 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
2403 r = mdcache->dump_cache(f);
2404 } else {
2405 r = mdcache->dump_cache(path);
2406 }
2407
2408 if (r != 0) {
2409 ss << "Failed to dump cache: " << cpp_strerror(r);
2410 f->reset();
2411 }
2412 } else if (command == "cache status") {
2413 Mutex::Locker l(mds_lock);
2414 mdcache->cache_status(f);
2415 } else if (command == "cache drop") {
2416 int64_t timeout;
2417 if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
2418 timeout = 0;
2419 }
2420
2421 C_SaferCond cond;
2422 command_cache_drop((uint64_t)timeout, f, &cond);
2423 int r = cond.wait();
2424 if (r != 0) {
2425 f->flush(ss);
2426 }
2427 } else if (command == "dump tree") {
2428 string root;
2429 int64_t depth;
2430 cmd_getval(g_ceph_context, cmdmap, "root", root);
2431 if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
2432 depth = -1;
2433 {
2434 Mutex::Locker l(mds_lock);
2435 int r = mdcache->dump_cache(root, depth, f);
2436 if (r != 0) {
2437 ss << "Failed to dump tree: " << cpp_strerror(r);
2438 f->reset();
2439 }
2440 }
2441 } else if (command == "dump loads") {
2442 Mutex::Locker l(mds_lock);
2443 int r = balancer->dump_loads(f);
2444 if (r != 0) {
2445 ss << "Failed to dump loads: " << cpp_strerror(r);
2446 f->reset();
2447 }
2448 } else if (command == "force_readonly") {
2449 Mutex::Locker l(mds_lock);
2450 mdcache->force_readonly();
2451 } else if (command == "dirfrag split") {
2452 command_dirfrag_split(cmdmap, ss);
2453 } else if (command == "dirfrag merge") {
2454 command_dirfrag_merge(cmdmap, ss);
2455 } else if (command == "dirfrag ls") {
2456 command_dirfrag_ls(cmdmap, ss, f);
2457 } else {
2458 return false;
2459 }
2460
2461 return true;
2462 }
2463
// Completion context that replies to an MCommand when it fires.
// Holds a reference on the message (get() in the constructor) until
// the reply has been sent (put() in send()).
class C_MDS_Send_Command_Reply : public MDSInternalContext {
protected:
  MCommand *m;  // the command message we will reply to
public:
  C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
    MDSInternalContext(_mds), m(_m) { m->get(); }

  // Reply with status `r` and text `ss`, and an empty data payload.
  void send(int r, boost::string_view ss) {
    std::stringstream ds;
    send(r, ss, ds);
  }

  // Reply with status `r`, text `ss` and data payload `ds`; this also
  // drops our reference to the message, so call it exactly once.
  void send(int r, boost::string_view ss, std::stringstream &ds) {
    bufferlist bl;
    bl.append(ds);
    MDSDaemon::send_command_reply(m, mds, r, bl, ss);
    m->put();
  }

  // Context entry point: reply with whatever status we completed with.
  void finish(int r) override {
    send(r, "");
  }
};
2487
2488 /**
2489 * This function drops the mds_lock, so don't do anything with
2490 * MDSRank after calling it (we could have gone into shutdown): just
2491 * send your result back to the calling client and finish.
2492 */
2493 void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
2494 {
2495 C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
2496
2497 if (is_any_replay()) {
2498 reply->send(-EAGAIN, "MDS is replaying log");
2499 delete reply;
2500 return;
2501 }
2502
2503 std::list<Session*> victims;
2504 const auto sessions = sessionmap.get_sessions();
2505 for (const auto p : sessions) {
2506 if (!p.first.is_client()) {
2507 continue;
2508 }
2509
2510 Session *s = p.second;
2511
2512 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2513 victims.push_back(s);
2514 }
2515 }
2516
2517 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2518
2519 if (victims.empty()) {
2520 reply->send(0, "");
2521 delete reply;
2522 return;
2523 }
2524
2525 C_GatherBuilder gather(g_ceph_context, reply);
2526 for (const auto s : victims) {
2527 std::stringstream ss;
2528 evict_client(s->info.inst.name.num(), false,
2529 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
2530 }
2531 gather.activate();
2532 }
2533
2534 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2535 {
2536 // Dump sessions, decorated with recovery/replay status
2537 f->open_array_section("sessions");
2538 const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
2539 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
2540 p != session_map.end();
2541 ++p) {
2542 if (!p->first.is_client()) {
2543 continue;
2544 }
2545
2546 Session *s = p->second;
2547
2548 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2549 continue;
2550 }
2551
2552 f->open_object_section("session");
2553 f->dump_int("id", p->first.num());
2554
2555 f->dump_int("num_leases", s->leases.size());
2556 f->dump_int("num_caps", s->caps.size());
2557
2558 f->dump_string("state", s->get_state_name());
2559 if (s->is_open() || s->is_stale()) {
2560 f->dump_unsigned("request_load_avg", s->get_load_avg());
2561 }
2562 f->dump_float("uptime", s->get_session_uptime());
2563 f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
2564 f->dump_unsigned("completed_requests", s->get_num_completed_requests());
2565 f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
2566 f->dump_stream("inst") << s->info.inst;
2567 f->open_object_section("client_metadata");
2568 for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
2569 i != s->info.client_metadata.end(); ++i) {
2570 f->dump_string(i->first.c_str(), i->second);
2571 }
2572 f->close_section(); // client_metadata
2573 f->close_section(); //session
2574 }
2575 f->close_section(); //sessions
2576 }
2577
2578 void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
2579 {
2580 bool force = false;
2581 bool recursive = false;
2582 bool repair = false;
2583 for (vector<string>::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) {
2584 if (*i == "force")
2585 force = true;
2586 else if (*i == "recursive")
2587 recursive = true;
2588 else if (*i == "repair")
2589 repair = true;
2590 }
2591 C_SaferCond scond;
2592 {
2593 Mutex::Locker l(mds_lock);
2594 mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
2595 }
2596 scond.wait();
2597 // scrub_dentry() finishers will dump the data for us; we're done!
2598 }
2599
2600 void MDSRank::command_tag_path(Formatter *f,
2601 boost::string_view path, boost::string_view tag)
2602 {
2603 C_SaferCond scond;
2604 {
2605 Mutex::Locker l(mds_lock);
2606 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2607 }
2608 scond.wait();
2609 }
2610
2611 void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
2612 {
2613 C_SaferCond scond;
2614 {
2615 Mutex::Locker l(mds_lock);
2616 mdcache->flush_dentry(path, &scond);
2617 }
2618 int r = scond.wait();
2619 f->open_object_section("results");
2620 f->dump_int("return_code", r);
2621 f->close_section(); // results
2622 }
2623
2624 // synchronous wrapper around "journal flush" asynchronous context
2625 // execution.
2626 void MDSRank::command_flush_journal(Formatter *f) {
2627 ceph_assert(f != NULL);
2628
2629 C_SaferCond cond;
2630 std::stringstream ss;
2631
2632 {
2633 Mutex::Locker locker(mds_lock);
2634 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
2635 flush_journal->send();
2636 }
2637 int r = cond.wait();
2638
2639 f->open_object_section("result");
2640 f->dump_string("message", ss.str());
2641 f->dump_int("return_code", r);
2642 f->close_section();
2643 }
2644
2645 void MDSRank::command_get_subtrees(Formatter *f)
2646 {
2647 assert(f != NULL);
2648 Mutex::Locker l(mds_lock);
2649
2650 std::list<CDir*> subtrees;
2651 mdcache->list_subtrees(subtrees);
2652
2653 f->open_array_section("subtrees");
2654 for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
2655 const CDir *dir = *i;
2656
2657 f->open_object_section("subtree");
2658 {
2659 f->dump_bool("is_auth", dir->is_auth());
2660 f->dump_int("auth_first", dir->get_dir_auth().first);
2661 f->dump_int("auth_second", dir->get_dir_auth().second);
2662 f->dump_int("export_pin", dir->inode->get_export_pin());
2663 f->open_object_section("dir");
2664 dir->dump(f);
2665 f->close_section();
2666 }
2667 f->close_section();
2668 }
2669 f->close_section();
2670 }
2671
2672
2673 void MDSRank::command_export_dir(Formatter *f,
2674 boost::string_view path,
2675 mds_rank_t target)
2676 {
2677 int r = _command_export_dir(path, target);
2678 f->open_object_section("results");
2679 f->dump_int("return_code", r);
2680 f->close_section(); // results
2681 }
2682
2683 int MDSRank::_command_export_dir(
2684 boost::string_view path,
2685 mds_rank_t target)
2686 {
2687 Mutex::Locker l(mds_lock);
2688 filepath fp(path);
2689
2690 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2691 derr << "bad MDS target " << target << dendl;
2692 return -ENOENT;
2693 }
2694
2695 CInode *in = mdcache->cache_traverse(fp);
2696 if (!in) {
2697 derr << "Bath path '" << path << "'" << dendl;
2698 return -ENOENT;
2699 }
2700 CDir *dir = in->get_dirfrag(frag_t());
2701 if (!dir || !(dir->is_auth())) {
2702 derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
2703 return -EINVAL;
2704 }
2705
2706 mdcache->migrator->export_dir(dir, target);
2707 return 0;
2708 }
2709
2710 CDir *MDSRank::_command_dirfrag_get(
2711 const cmdmap_t &cmdmap,
2712 std::ostream &ss)
2713 {
2714 std::string path;
2715 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2716 if (!got) {
2717 ss << "missing path argument";
2718 return NULL;
2719 }
2720
2721 std::string frag_str;
2722 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2723 ss << "missing frag argument";
2724 return NULL;
2725 }
2726
2727 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2728 if (!in) {
2729 // TODO really we should load something in if it's not in cache,
2730 // but the infrastructure is harder, and we might still be unable
2731 // to act on it if someone else is auth.
2732 ss << "directory '" << path << "' inode not in cache";
2733 return NULL;
2734 }
2735
2736 frag_t fg;
2737
2738 if (!fg.parse(frag_str.c_str())) {
2739 ss << "frag " << frag_str << " failed to parse";
2740 return NULL;
2741 }
2742
2743 CDir *dir = in->get_dirfrag(fg);
2744 if (!dir) {
2745 ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
2746 "use `dirfrag ls` to see if it should exist)";
2747 return NULL;
2748 }
2749
2750 if (!dir->is_auth()) {
2751 ss << "frag " << dir->dirfrag() << " not auth (auth = "
2752 << dir->authority() << ")";
2753 return NULL;
2754 }
2755
2756 return dir;
2757 }
2758
2759 bool MDSRank::command_dirfrag_split(
2760 cmdmap_t cmdmap,
2761 std::ostream &ss)
2762 {
2763 Mutex::Locker l(mds_lock);
2764 if (!mdsmap->allows_dirfrags()) {
2765 ss << "dirfrags are disallowed by the mds map!";
2766 return false;
2767 }
2768
2769 int64_t by = 0;
2770 if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
2771 ss << "missing bits argument";
2772 return false;
2773 }
2774
2775 if (by <= 0) {
2776 ss << "must split by >0 bits";
2777 return false;
2778 }
2779
2780 CDir *dir = _command_dirfrag_get(cmdmap, ss);
2781 if (!dir) {
2782 return false;
2783 }
2784
2785 mdcache->split_dir(dir, by);
2786
2787 return true;
2788 }
2789
2790 bool MDSRank::command_dirfrag_merge(
2791 cmdmap_t cmdmap,
2792 std::ostream &ss)
2793 {
2794 Mutex::Locker l(mds_lock);
2795 std::string path;
2796 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2797 if (!got) {
2798 ss << "missing path argument";
2799 return false;
2800 }
2801
2802 std::string frag_str;
2803 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2804 ss << "missing frag argument";
2805 return false;
2806 }
2807
2808 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2809 if (!in) {
2810 ss << "directory '" << path << "' inode not in cache";
2811 return false;
2812 }
2813
2814 frag_t fg;
2815 if (!fg.parse(frag_str.c_str())) {
2816 ss << "frag " << frag_str << " failed to parse";
2817 return false;
2818 }
2819
2820 mdcache->merge_dir(in, fg);
2821
2822 return true;
2823 }
2824
2825 bool MDSRank::command_dirfrag_ls(
2826 cmdmap_t cmdmap,
2827 std::ostream &ss,
2828 Formatter *f)
2829 {
2830 Mutex::Locker l(mds_lock);
2831 std::string path;
2832 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2833 if (!got) {
2834 ss << "missing path argument";
2835 return false;
2836 }
2837
2838 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2839 if (!in) {
2840 ss << "directory inode not in cache";
2841 return false;
2842 }
2843
2844 f->open_array_section("frags");
2845 std::list<frag_t> frags;
2846 // NB using get_leaves_under instead of get_dirfrags to give
2847 // you the list of what dirfrags may exist, not which are in cache
2848 in->dirfragtree.get_leaves_under(frag_t(), frags);
2849 for (std::list<frag_t>::iterator i = frags.begin();
2850 i != frags.end(); ++i) {
2851 f->open_object_section("frag");
2852 f->dump_int("value", i->value());
2853 f->dump_int("bits", i->bits());
2854 std::ostringstream frag_str;
2855 frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
2856 f->dump_string("str", frag_str.str());
2857 f->close_section();
2858 }
2859 f->close_section();
2860
2861 return true;
2862 }
2863
2864 void MDSRank::dump_status(Formatter *f) const
2865 {
2866 if (state == MDSMap::STATE_REPLAY ||
2867 state == MDSMap::STATE_STANDBY_REPLAY) {
2868 mdlog->dump_replay_status(f);
2869 } else if (state == MDSMap::STATE_RESOLVE) {
2870 mdcache->dump_resolve_status(f);
2871 } else if (state == MDSMap::STATE_RECONNECT) {
2872 server->dump_reconnect_status(f);
2873 } else if (state == MDSMap::STATE_REJOIN) {
2874 mdcache->dump_rejoin_status(f);
2875 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
2876 dump_clientreplay_status(f);
2877 }
2878 f->dump_float("rank_uptime", get_uptime().count());
2879 }
2880
// Dump client-replay progress: how many replay ops are still queued and
// how many client requests the cache is currently processing.
void MDSRank::dump_clientreplay_status(Formatter *f) const
{
  f->open_object_section("clientreplay_status");
  f->dump_unsigned("clientreplay_queue", replay_queue.size());
  f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
  f->close_section();
}
2888
2889 void MDSRankDispatcher::update_log_config()
2890 {
2891 map<string,string> log_to_monitors;
2892 map<string,string> log_to_syslog;
2893 map<string,string> log_channel;
2894 map<string,string> log_prio;
2895 map<string,string> log_to_graylog;
2896 map<string,string> log_to_graylog_host;
2897 map<string,string> log_to_graylog_port;
2898 uuid_d fsid;
2899 string host;
2900
2901 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
2902 log_channel, log_prio, log_to_graylog,
2903 log_to_graylog_host, log_to_graylog_port,
2904 fsid, host) == 0)
2905 clog->update_config(log_to_monitors, log_to_syslog,
2906 log_channel, log_prio, log_to_graylog,
2907 log_to_graylog_host, log_to_graylog_port,
2908 fsid, host);
2909 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
2910 }
2911
// Register the "mds" and "mds_mem" perf counter groups with the global
// perf counters collection, then let subsystems register their own.
void MDSRank::create_logger()
{
  dout(10) << "create_logger" << dendl;
  {
    // "mds" counter group: request/session/cache activity.
    PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);

    // super useful (high prio) perf stats
    mds_plb.add_u64_counter(l_mds_request, "request", "Requests", "req",
                            PerfCountersBuilder::PRIO_CRITICAL);
    mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
                         PerfCountersBuilder::PRIO_CRITICAL);
    mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
                    PerfCountersBuilder::PRIO_CRITICAL);
    mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request", "fwd",
                            PerfCountersBuilder::PRIO_INTERESTING);
    mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
                    PerfCountersBuilder::PRIO_INTERESTING);
    mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes",
                            "exi", PerfCountersBuilder::PRIO_INTERESTING);
    mds_plb.add_u64_counter(l_mds_imported_inodes, "imported_inodes", "Imported inodes",
                            "imi", PerfCountersBuilder::PRIO_INTERESTING);

    // useful dir/inode/subtree stats
    mds_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
    mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
    mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
    mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
    mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
    mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
    mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
    mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
    mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
                    "Inodes with capabilities");
    mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
    mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");

    // low prio stats
    mds_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
    mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
    mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
    mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
    mds_plb.add_u64(
      l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
    mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
    mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
    mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
                            "Traverse forwards");
    mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
                            "Traverse directory discovers");
    mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
                            "Traverse incomplete directory content fetchings");
    mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
                            "Traverse remote dentries");
    mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
                            "Traverse locks");
    mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
    mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
    mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");

    logger = mds_plb.create_perf_counters();
    g_ceph_context->get_perfcounters_collection()->add(logger);
  }

  {
    // "mds_mem" counter group: object counts and memory footprint.
    PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
    mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
                    PerfCountersBuilder::PRIO_INTERESTING);
    mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
                    PerfCountersBuilder::PRIO_INTERESTING);

    mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
    mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
    mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
    mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
    mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
    mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
    mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
    mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
    mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
    mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
    mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
    mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
    mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");

    mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
    mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");

    mlogger = mdm_plb.create_perf_counters();
    g_ceph_context->get_perfcounters_collection()->add(mlogger);
  }

  // Subsystems register their own counter groups.
  mdlog->create_logger();
  server->create_logger();
  purge_queue.create_logger();
  sessionmap.register_perfcounters();
  mdcache->register_perfcounters();
}
3009
3010 void MDSRank::check_ops_in_flight()
3011 {
3012 vector<string> warnings;
3013 int slow = 0;
3014 if (op_tracker.check_ops_in_flight(warnings, &slow)) {
3015 for (vector<string>::iterator i = warnings.begin();
3016 i != warnings.end();
3017 ++i) {
3018 clog->warn() << *i;
3019 }
3020 }
3021
3022 // set mds slow request count
3023 mds_slow_req_count = slow;
3024 return;
3025 }
3026
// React to a new OSDMap: notify subsystems, process newly blacklisted
// addresses, and keep requesting fresh maps.
void MDSRankDispatcher::handle_osd_map()
{
  if (is_active() && snapserver) {
    snapserver->check_osd_map(true);
  }

  server->handle_osd_map();

  // Recompute the purge queue's op limit from the current MDSMap.
  purge_queue.update_op_limit(*mdsmap);

  std::set<entity_addr_t> newly_blacklisted;
  objecter->consume_blacklist_events(&newly_blacklisted);
  auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
  dout(4) << "handle_osd_map epoch " << epoch << ", "
          << newly_blacklisted.size() << " new blacklist entries" << dendl;
  auto victims = server->apply_blacklist(newly_blacklisted);
  if (victims) {
    // We evicted sessions for blacklisted addrs: raise the epoch
    // barrier so clients must see this map (and its blacklist).
    set_osd_epoch_barrier(epoch);
  }


  // By default the objecter only requests OSDMap updates on use,
  // we would like to always receive the latest maps in order to
  // apply policy based on the FULL flag.
  objecter->maybe_request_map();
}
3053
// Evict the client session `session_id`, optionally blacklisting its
// address via a mon command first.
//
// If `wait` is true the call blocks (dropping and re-taking mds_lock)
// until both the blacklist and the session kill are done; otherwise the
// work proceeds asynchronously and `on_killed` (if any) is completed
// when the session is gone.  `wait` and `on_killed` are mutually
// exclusive.  Returns false (with a reason in err_ss) if we are
// replaying or the session does not exist.
bool MDSRank::evict_client(int64_t session_id,
    bool wait, bool blacklist, std::stringstream& err_ss,
    Context *on_killed)
{
  assert(mds_lock.is_locked_by_me());

  // Mutually exclusive args
  assert(!(wait && on_killed != nullptr));

  if (is_any_replay()) {
    err_ss << "MDS is replaying log";
    return false;
  }

  Session *session = sessionmap.get_session(
      entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
  if (!session) {
    err_ss << "session " << session_id << " not in sessionmap!";
    return false;
  }

  // Build the "osd blacklist add" mon command for this client's addr.
  dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
  stringstream ss;
  ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
  ss << "\"addr\":\"";
  ss << session->info.inst.addr;
  ss << "\"}";
  std::string tmp = ss.str();
  std::vector<std::string> cmd = {tmp};

  // Kill the session (must be called with mds_lock held).  Re-looks up
  // the session because the original pointer may be stale by the time
  // this runs.
  auto kill_client_session = [this, session_id, wait, on_killed](){
    assert(mds_lock.is_locked_by_me());
    Session *session = sessionmap.get_session(
        entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
    if (session) {
      if (on_killed || !wait) {
        server->kill_session(session, on_killed);
      } else {
        // Synchronous path: drop mds_lock while waiting for the kill.
        C_SaferCond on_safe;
        server->kill_session(session, &on_safe);

        mds_lock.Unlock();
        on_safe.wait();
        mds_lock.Lock();
      }
    } else {
      dout(1) << "session " << session_id << " was removed while we waited "
      "for blacklist" << dendl;

      // Even though it wasn't us that removed it, kick our completion
      // as the session has been removed.
      if (on_killed) {
        on_killed->complete(0);
      }
    }
  };

  // Send the blacklist mon command; once it completes, wait for the
  // latest OSDMap, raise the epoch barrier, then run `fn`.
  auto apply_blacklist = [this, cmd](std::function<void ()> fn){
    assert(mds_lock.is_locked_by_me());

    Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
      objecter->wait_for_latest_osdmap(
       new C_OnFinisher(
         new FunctionContext([this, fn](int r) {
              Mutex::Locker l(mds_lock);
              auto epoch = objecter->with_osdmap([](const OSDMap &o){
                  return o.get_epoch();
              });

              set_osd_epoch_barrier(epoch);

              fn();
            }), finisher)
       );
    });

    dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
    monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
  };

  if (wait) {
    if (blacklist) {
      // Blacklist synchronously (mds_lock dropped while waiting).
      C_SaferCond inline_ctx;
      apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
      mds_lock.Unlock();
      inline_ctx.wait();
      mds_lock.Lock();
    }

    // We dropped mds_lock, so check that session still exists
    session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
          session_id));
    if (!session) {
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blacklist" << dendl;
      return true;
    }
    kill_client_session();
  } else {
    if (blacklist) {
      apply_blacklist(kill_client_session);
    } else {
      kill_client_session();
    }
  }

  return true;
}
3162
3163 void MDSRank::bcast_mds_map()
3164 {
3165 dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
3166
3167 // share the map with mounted clients
3168 set<Session*> clients;
3169 sessionmap.get_client_session_set(clients);
3170 for (set<Session*>::const_iterator p = clients.begin();
3171 p != clients.end();
3172 ++p)
3173 (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
3174 last_client_mdsmap_bcast = mdsmap->get_epoch();
3175 }
3176
// MDSRankDispatcher is a thin construction-time wrapper: it simply forwards
// every dependency (identity, locking, logging, timers, beacon, maps,
// messenger, mon client, and lifecycle hooks) to the MDSRank base class.
MDSRankDispatcher::MDSRankDispatcher(
    mds_rank_t whoami_,
    Mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    MDSMap *& mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
            msgr, monc_, respawn_hook_, suicide_hook_)
{}
3191
/**
 * Handle an MCommand addressed to this rank.
 *
 * @param cmdmap     parsed command arguments
 * @param m          the originating command message
 * @param r          [out] command result code
 * @param ds         [out] data stream (command output payload)
 * @param ss         [out] status stream (human-readable error text)
 * @param run_later  [out] optional continuation to run outside this call
 * @param need_reply [out] set false when the command replies on its own
 *                   (e.g. asynchronously) and the caller must not reply
 * @return true if the command prefix was recognized and handled here;
 *         false to let another handler try it
 */
bool MDSRankDispatcher::handle_command(
  const cmdmap_t &cmdmap,
  MCommand *m,
  int *r,
  std::stringstream *ds,
  std::stringstream *ss,
  Context **run_later,
  bool *need_reply)
{
  assert(r != nullptr);
  assert(ds != nullptr);
  assert(ss != nullptr);

  // Default: caller sends the reply once we return.
  *need_reply = true;

  std::string prefix;
  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);

  if (prefix == "session ls" || prefix == "client ls") {
    // Dump sessions matching the optional filter as JSON into the data stream.
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);
    if (*r != 0) {
      return true;
    }

    JSONFormatter f(true);
    dump_sessions(filter, &f);
    f.flush(*ds);
    return true;
  } else if (prefix == "session evict" || prefix == "client evict") {
    // Evict all sessions matching the filter; evict_clients takes ownership
    // of replying to `m`, hence need_reply = false.
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);
    if (*r != 0) {
      return true;
    }

    evict_clients(filter, m);

    *need_reply = false;
    return true;
  } else if (prefix == "damage ls") {
    // Dump the damage table as JSON.
    JSONFormatter f(true);
    damage_table.dump(&f);
    f.flush(*ds);
    return true;
  } else if (prefix == "damage rm") {
    // Remove a single damage table entry by id.  The cast reinterprets the
    // (unsigned) damage_entry_id_t storage as int64_t for cmd_getval.
    damage_entry_id_t id = 0;
    bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
    if (!got) {
      *r = -EINVAL;
      return true;
    }

    damage_table.erase(id);
    return true;
  } else if (prefix == "cache drop") {
    // Drop the cache asynchronously: the reply is sent from the completion
    // path (cache_drop_send_reply), which also owns freeing `f` and `reply`.
    int64_t timeout;
    if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
      timeout = 0;  // 0 => no timeout
    }

    JSONFormatter *f = new JSONFormatter(true);
    C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
    Context *on_finish = new FunctionContext([this, f, reply](int r) {
      cache_drop_send_reply(f, reply, r);
      delete f;
      delete reply;
    });

    // Kick off the drop on the finisher so it runs outside the caller's
    // context; the deferred reply means the caller must not reply itself.
    *need_reply = false;
    *run_later = new C_OnFinisher(
      new FunctionContext([this, timeout, f, on_finish](int _) {
          command_cache_drop((uint64_t)timeout, f, on_finish);
        }), finisher);

    return true;
  } else {
    // Unrecognized prefix; let another handler try.
    return false;
  }
}
3278
3279 void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
3280 dout(20) << __func__ << ": r=" << r << dendl;
3281
3282 std::stringstream ds;
3283 std::stringstream ss;
3284 if (r != 0) {
3285 f->flush(ss);
3286 } else {
3287 f->flush(ds);
3288 }
3289
3290 reply->send(r, ss.str(), ds);
3291 }
3292
3293 void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
3294 dout(20) << __func__ << dendl;
3295
3296 Mutex::Locker locker(mds_lock);
3297 C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
3298 timeout, f, on_finish);
3299 request->send();
3300 }
3301
3302 epoch_t MDSRank::get_osd_epoch() const
3303 {
3304 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
3305 }
3306