// ceph/src/mds/MDSRank.cc (ceph 12.2.12)
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <boost/utility/string_view.hpp>
16
17#include "common/debug.h"
18#include "common/errno.h"
19
20#include "messages/MClientRequestForward.h"
21#include "messages/MMDSLoadTargets.h"
22#include "messages/MMDSMap.h"
23#include "messages/MMDSTableRequest.h"
24#include "messages/MCommand.h"
25#include "messages/MCommandReply.h"
26
27#include "MDSDaemon.h"
28#include "MDSMap.h"
29#include "SnapClient.h"
30#include "SnapServer.h"
31#include "MDBalancer.h"
32#include "Migrator.h"
33#include "Locker.h"
34#include "InoTable.h"
35#include "mon/MonClient.h"
36#include "common/HeartbeatMap.h"
37#include "ScrubStack.h"
38
39
40#include "MDSRank.h"
41
42#define dout_context g_ceph_context
43#define dout_subsys ceph_subsys_mds
44#undef dout_prefix
45#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
46
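// C_Flush_Journal flushes and trims the whole MDS journal as a chain of
// asynchronous steps: seal the current log segment, flush and wait for
// the log to be safe (twice -- see clear_mdlog()), trim and expire the
// old segments, and finally rewrite the journal head so readers start
// after the flushed region. Each step schedules the next one via a
// FunctionContext, and errors are reported through *ss and on_finish.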
47class C_Flush_Journal : public MDSInternalContext {
48public:
49 C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
50 std::ostream *ss, Context *on_finish)
51 : MDSInternalContext(mds),
52 mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
53 whoami(mds->whoami), incarnation(mds->incarnation) {
54 }
55
56 void send() {
57 assert(mds->mds_lock.is_locked());
58
59 dout(20) << __func__ << dendl;
60
61 if (mdcache->is_readonly()) {
62 dout(5) << __func__ << ": read-only FS" << dendl;
63 complete(-EROFS);
64 return;
65 }
66
67 if (!mds->is_active()) {
68 dout(5) << __func__ << ": MDS not active, no-op" << dendl;
69 complete(0);
70 return;
71 }
72
73 flush_mdlog();
74 }
75
76private:
77
78 void flush_mdlog() {
79 dout(20) << __func__ << dendl;
80
81 // I need to seal off the current segment, and then mark all
82 // previous segments for expiry
83 mdlog->start_new_segment();
84
85 Context *ctx = new FunctionContext([this](int r) {
86 handle_flush_mdlog(r);
87 });
88
89 // Flush initially so that all the segments older than our new one
90 // will be eligible for expiry
91 mdlog->flush();
92 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
93 }
94
95 void handle_flush_mdlog(int r) {
96 dout(20) << __func__ << ": r=" << r << dendl;
97
98 if (r != 0) {
99 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
100 complete(r);
101 return;
102 }
103
104 clear_mdlog();
105 }
106
107 void clear_mdlog() {
108 dout(20) << __func__ << dendl;
109
110 Context *ctx = new FunctionContext([this](int r) {
111 handle_clear_mdlog(r);
112 });
113
114 // Because we may not be the last wait_for_safe context on MDLog,
115 // and subsequent contexts might wake up in the middle of our
116 // later trim_all and interfere with expiry (by e.g. marking
117 // dirs/dentries dirty on previous log segments), we run a second
118 // wait_for_safe here. See #10368
119 mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
120 }
121
122 void handle_clear_mdlog(int r) {
123 dout(20) << __func__ << ": r=" << r << dendl;
124
125 if (r != 0) {
126 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
127 complete(r);
128 return;
129 }
130
131 trim_mdlog();
132 }
133
134 void trim_mdlog() {
135 // Put all the old log segments into expiring or expired state
136 dout(5) << __func__ << ": beginning segment expiry" << dendl;
137
138 int ret = mdlog->trim_all();
139 if (ret != 0) {
140 *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
141 complete(ret);
142 return;
143 }
144
145 expire_segments();
146 }
147
148 void expire_segments() {
149 dout(20) << __func__ << dendl;
150
151 // Attach contexts to wait for all expiring segments to expire
152 MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);
153
154 const auto &expiring_segments = mdlog->get_expiring_segments();
155 for (auto p : expiring_segments) {
156 p->wait_for_expiry(expiry_gather->new_sub());
157 }
158 dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
159 << " segments to expire" << dendl;
160
161 if (!expiry_gather->has_subs()) {
162 trim_segments();
163 delete expiry_gather;
164 return;
165 }
166
167 Context *ctx = new FunctionContext([this](int r) {
168 handle_expire_segments(r);
169 });
170 expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
171 expiry_gather->activate();
172 }
173
174 void handle_expire_segments(int r) {
175 dout(20) << __func__ << ": r=" << r << dendl;
176
177 ceph_assert(r == 0); // MDLog is not allowed to raise errors via
178 // wait_for_expiry
179 trim_segments();
180 }
181
182 void trim_segments() {
183 dout(20) << __func__ << dendl;
184
185 Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
186 Mutex::Locker locker(mds->mds_lock);
187 trim_expired_segments();
188 }), mds->finisher);
189 ctx->complete(0);
190 }
191
192 void trim_expired_segments() {
193 dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
194 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
195 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
196
197 // Now everyone I'm interested in is expired
198 mdlog->trim_expired_segments();
199
200 dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
201 << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
202 << mdlog->get_journaler()->get_trimmed_pos() << dendl;
203
204 write_journal_head();
205 }
206
207 void write_journal_head() {
208 dout(20) << __func__ << dendl;
209
210 Context *ctx = new FunctionContext([this](int r) {
211 Mutex::Locker locker(mds->mds_lock);
212 handle_write_head(r);
213 });
214 // Flush the journal header so that readers will start from after
215 // the flushed region
216 mdlog->get_journaler()->write_head(ctx);
217 }
218
219 void handle_write_head(int r) {
220 if (r != 0) {
221 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
222 } else {
223 dout(5) << __func__ << ": write_head complete, all done!" << dendl;
224 }
225
226 complete(r);
227 }
228
229 void finish(int r) override {
230 dout(20) << __func__ << ": r=" << r << dendl;
231 on_finish->complete(r);
232 }
233
234 MDCache *mdcache;
235 MDLog *mdlog;
236 std::ostream *ss;
237 Context *on_finish;
238
239 // so as to use dout
240 mds_rank_t whoami;
241 int incarnation;
242};
243
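// C_Drop_Cache ties several of the steps above together: it repeatedly
// asks the Server to recall client caps (bounded by recall_timeout),
// then flushes the journal via C_Flush_Journal, trims the MDCache as
// far as it will go, and finally dumps the resulting cache status into
// the supplied Formatter before completing on_finish.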
244class C_Drop_Cache : public MDSInternalContext {
245public:
246 C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
247 MDSRank *mds, uint64_t recall_timeout,
248 Formatter *f, Context *on_finish)
249 : MDSInternalContext(mds),
250 server(server), mdcache(mdcache), mdlog(mdlog),
251 recall_timeout(recall_timeout), recall_start(mono_clock::now()),
252 f(f), on_finish(on_finish),
253 whoami(mds->whoami), incarnation(mds->incarnation) {
254 }
255
256 void send() {
257 // not really a hard requirement here, but let's ensure this in
258 // case we change the logic here.
259 assert(mds->mds_lock.is_locked());
260
261 dout(20) << __func__ << dendl;
262 f->open_object_section("result");
263 recall_client_state();
264 }
265
266private:
267 // context which completes itself (with -ETIMEDOUT) after a specified
268 // timeout or when explicitly completed, whichever comes first. Note
269 // that the context does not destroy itself after completion -- it
270 // needs to be explicitly freed.
271 class C_ContextTimeout : public MDSInternalContext {
272 public:
273 C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
274 : MDSInternalContext(mds),
275 timeout(timeout),
276 lock("mds::context::timeout", false, true),
277 on_finish(on_finish) {
278 }
279 ~C_ContextTimeout() {
280 ceph_assert(timer_task == nullptr);
281 }
282
283 void start_timer() {
284 if (!timeout) {
285 return;
286 }
287
288 timer_task = new FunctionContext([this](int _) {
289 timer_task = nullptr;
290 complete(-ETIMEDOUT);
291 });
292 mds->timer.add_event_after(timeout, timer_task);
293 }
294
295 void finish(int r) override {
296 Context *ctx = nullptr;
297 {
298 Mutex::Locker locker(lock);
299 std::swap(on_finish, ctx);
300 }
301 if (ctx != nullptr) {
302 ctx->complete(r);
303 }
304 }
305 void complete(int r) override {
306 if (timer_task != nullptr) {
307 mds->timer.cancel_event(timer_task);
308 }
309
310 finish(r);
311 }
312
313 uint64_t timeout;
314 Mutex lock;
315 Context *on_finish = nullptr;
316 Context *timer_task = nullptr;
317 };
318
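  // Ask the Server to recall caps in STEADY mode. While the recall is
  // throttled or still making progress (and the timeout has not been
  // hit) this re-arms itself on the MDS timer once a second; otherwise
  // it waits on the gather for the outstanding recalls, wrapped in a
  // C_ContextTimeout so a stuck client cannot stall the operation past
  // recall_timeout.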
319 void recall_client_state() {
320 dout(20) << __func__ << dendl;
321 auto now = mono_clock::now();
322 auto duration = std::chrono::duration<double>(now-recall_start).count();
323
324 MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
325 auto result = server->recall_client_state(gather, Server::RecallFlags::STEADY);
326 auto& throttled = result.first;
327 auto& count = result.second;
328 dout(10) << __func__
329 << (throttled ? " (throttled)" : "")
330 << " recalled " << count << " caps" << dendl;
331
332 caps_recalled += count;
333 if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
334 auto timer = new FunctionContext([this](int _) {
335 recall_client_state();
336 });
337 mds->timer.add_event_after(1.0, timer);
338 } else {
339 if (!gather->has_subs()) {
340 delete gather;
341 return handle_recall_client_state(0);
342 } else if (recall_timeout > 0 && duration > recall_timeout) {
343 delete gather;
344 return handle_recall_client_state(-ETIMEDOUT);
345 } else {
346 uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
347 C_ContextTimeout *ctx = new C_ContextTimeout(
348 mds, remaining, new FunctionContext([this](int r) {
349 handle_recall_client_state(r);
350 }));
351
352 ctx->start_timer();
353 gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
354 gather->activate();
355 }
356 }
357 }
358
359 void handle_recall_client_state(int r) {
360 dout(20) << __func__ << ": r=" << r << dendl;
361
362 // client recall section
363 f->open_object_section("client_recall");
364 f->dump_int("return_code", r);
365 f->dump_string("message", cpp_strerror(r));
366 f->dump_int("recalled", caps_recalled);
367 f->close_section();
368
369 // we can still continue after recall timeout
370 flush_journal();
371 }
372
373 void flush_journal() {
374 dout(20) << __func__ << dendl;
375
376 Context *ctx = new FunctionContext([this](int r) {
377 handle_flush_journal(r);
378 });
379
380 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
381 flush_journal->send();
382 }
383
384 void handle_flush_journal(int r) {
385 dout(20) << __func__ << ": r=" << r << dendl;
386
387 if (r != 0) {
388 cmd_err(f, ss.str());
389 complete(r);
390 return;
391 }
392
393 // journal flush section
394 f->open_object_section("flush_journal");
395 f->dump_int("return_code", r);
396 f->dump_string("message", ss.str());
397 f->close_section();
398
399 trim_cache();
400 }
401
402 void trim_cache() {
403 dout(20) << __func__ << dendl;
404
405 auto p = mdcache->trim(UINT64_MAX);
406 auto& throttled = p.first;
407 auto& count = p.second;
408 dout(10) << __func__
409 << (throttled ? " (throttled)" : "")
410 << " trimmed " << count << " dentries" << dendl;
411 dentries_trimmed += count;
412 if (throttled && count > 0) {
413 auto timer = new FunctionContext([this](int _) {
414 trim_cache();
415 });
416 mds->timer.add_event_after(1.0, timer);
417 } else {
418 cache_status();
419 }
420 }
421
422 void cache_status() {
423 dout(20) << __func__ << dendl;
424
425 f->open_object_section("trim_cache");
426 f->dump_int("trimmed", dentries_trimmed);
427 f->close_section();
428
429 // cache status section
430 mdcache->cache_status(f);
431
432 complete(0);
433 }
434
435 void finish(int r) override {
436 dout(20) << __func__ << ": r=" << r << dendl;
437
438 auto d = std::chrono::duration<double>(mono_clock::now()-recall_start);
439 f->dump_float("duration", d.count());
440
441 f->close_section();
442 on_finish->complete(r);
443 }
444
445 Server *server;
446 MDCache *mdcache;
447 MDLog *mdlog;
448 uint64_t recall_timeout;
449 mono_time recall_start;
450 Formatter *f;
451 Context *on_finish;
452
453 int retval = 0;
454 std::stringstream ss;
455 uint64_t caps_recalled = 0;
456 uint64_t dentries_trimmed = 0;
457
458 // so as to use dout
459 mds_rank_t whoami;
460 int incarnation;
461
462 void cmd_err(Formatter *f, boost::string_view err) {
463 f->reset();
464 f->open_object_section("result");
465 f->dump_string("error", err);
466 f->close_section();
467 }
468};
469
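// MDSRank owns the per-rank subsystems (MDCache, MDLog, MDBalancer,
// ScrubStack, InoTable, SnapServer/SnapClient, Server, Locker, the
// purge queue and the op tracker). The constructor only wires them up;
// actual startup happens in MDSRankDispatcher::init() and the
// boot/replay paths further down in this file.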
470MDSRank::MDSRank(
471 mds_rank_t whoami_,
472 Mutex &mds_lock_,
473 LogChannelRef &clog_,
474 SafeTimer &timer_,
475 Beacon &beacon_,
476 MDSMap *& mdsmap_,
477 Messenger *msgr,
478 MonClient *monc_,
479 Context *respawn_hook_,
480 Context *suicide_hook_)
481 :
482 whoami(whoami_), incarnation(0),
483 mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
484 mdsmap(mdsmap_),
485 objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
486 server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
487 balancer(NULL), scrubstack(NULL),
488 damage_table(whoami_),
489 inotable(NULL), snapserver(NULL), snapclient(NULL),
490 sessionmap(this), logger(NULL), mlogger(NULL),
491 op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
492 g_conf->osd_num_op_tracker_shard),
493 last_state(MDSMap::STATE_BOOT),
494 state(MDSMap::STATE_BOOT),
495 cluster_degraded(false), stopping(false),
496 purge_queue(g_ceph_context, whoami_,
497 mdsmap_->get_metadata_pool(), objecter,
498 new FunctionContext(
499 [this](int r){
500 // Purge Queue operates inside mds_lock when we're calling into
501 // it, and outside when in background, so must handle both cases.
502 if (mds_lock.is_locked_by_me()) {
503 handle_write_error(r);
504 } else {
505 Mutex::Locker l(mds_lock);
506 handle_write_error(r);
507 }
508 }
509 )
510 ),
511 progress_thread(this), dispatch_depth(0),
512 hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
513 mds_slow_req_count(0),
514 last_client_mdsmap_bcast(0),
515 messenger(msgr), monc(monc_),
516 respawn_hook(respawn_hook_),
517 suicide_hook(suicide_hook_),
518 standby_replaying(false),
519 starttime(mono_clock::now())
520{
521 hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
522
523 purge_queue.update_op_limit(*mdsmap);
524
525 objecter->unset_honor_osdmap_full();
526
527 finisher = new Finisher(cct);
528
529 mdcache = new MDCache(this, purge_queue);
530 mdlog = new MDLog(this);
531 balancer = new MDBalancer(this, messenger, monc);
532
533 scrubstack = new ScrubStack(mdcache, finisher);
534
535 inotable = new InoTable(this);
536 snapserver = new SnapServer(this, monc);
537 snapclient = new SnapClient(this);
538
539 server = new Server(this);
540 locker = new Locker(this, mdcache);
541
542 op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
543 cct->_conf->mds_op_log_threshold);
544 op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
545 cct->_conf->mds_op_history_duration);
546}
547
548MDSRank::~MDSRank()
549{
550 if (hb) {
551 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
552 }
553
554 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
555 if (mdcache) { delete mdcache; mdcache = NULL; }
556 if (mdlog) { delete mdlog; mdlog = NULL; }
557 if (balancer) { delete balancer; balancer = NULL; }
558 if (inotable) { delete inotable; inotable = NULL; }
559 if (snapserver) { delete snapserver; snapserver = NULL; }
560 if (snapclient) { delete snapclient; snapclient = NULL; }
561 if (mdsmap) { delete mdsmap; mdsmap = 0; }
562
563 if (server) { delete server; server = 0; }
564 if (locker) { delete locker; locker = 0; }
565
566 if (logger) {
567 g_ceph_context->get_perfcounters_collection()->remove(logger);
568 delete logger;
569 logger = 0;
570 }
571 if (mlogger) {
572 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
573 delete mlogger;
574 mlogger = 0;
575 }
576
577 delete finisher;
578 finisher = NULL;
579
580 delete suicide_hook;
581 suicide_hook = NULL;
582
583 delete respawn_hook;
584 respawn_hook = NULL;
585
586 delete objecter;
587 objecter = nullptr;
588}
589
590void MDSRankDispatcher::init()
591{
592 objecter->init();
593 messenger->add_dispatcher_head(objecter);
594
595 objecter->start();
596
597 update_log_config();
598 create_logger();
599
600 // Expose the OSDMap (already populated during MDS::init) to anyone
601 // who is interested in it.
602 handle_osd_map();
603
604 progress_thread.create("mds_rank_progr");
605
606 purge_queue.init();
607
608 finisher->start();
609}
610
611void MDSRank::update_targets(utime_t now)
612{
613 // get MonMap's idea of my export_targets
614 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
615
616 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
617
618 bool send = false;
619 set<mds_rank_t> new_map_targets;
620
621 auto it = export_targets.begin();
622 while (it != export_targets.end()) {
623 mds_rank_t rank = it->first;
624 double val = it->second.get(now);
625 dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
626
627 if (val <= 0.01) {
628 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
629 export_targets.erase(it++);
630 send = true;
631 continue;
632 }
633 if (!map_targets.count(rank)) {
634 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
635 send = true;
636 }
637 new_map_targets.insert(rank);
638 it++;
639 }
640 if (new_map_targets.size() < map_targets.size()) {
641 dout(15) << "export target map holds stale targets, sending update" << dendl;
642 send = true;
643 }
644
645 if (send) {
646 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
647 MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
648 monc->send_mon_message(m);
649 }
650}
651
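// Register activity against an export target: each target rank gets a
// decaying counter, so targets that stop being hit fall below the 0.01
// threshold checked in update_targets() above and are eventually
// dropped from the export_targets we report to the monitors.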
652void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
653{
654 double rate = g_conf->mds_bal_target_decay;
655 if (amount < 0.0) {
656 amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
657 }
658 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
659 if (em.second) {
660 dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
661 } else {
662 dout(15) << "hit export target " << amount << " @ " << now << dendl;
663 }
664 em.first->second.hit(now, amount);
665}
666
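// Periodic upkeep for the rank, called from the daemon's tick timer:
// reset the heartbeat, kick the progress thread, flush/trim the log,
// recall client state and trim the cache while active, and drive the
// shutdown passes (including purge queue draining) while stopping.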
667void MDSRankDispatcher::tick()
668{
669 heartbeat_reset();
670
671 if (beacon.is_laggy()) {
672 dout(1) << "skipping upkeep work because connection to Monitors appears laggy" << dendl;
673 return;
674 }
675
676 check_ops_in_flight();
677
678 // Wake up thread in case we used to be laggy and have waiting_for_nolaggy
679 // messages to progress.
680 progress_thread.signal();
681
682 // make sure mds log flushes, trims periodically
683 mdlog->flush();
684
685 // update average session uptime
686 sessionmap.update_average_session_age();
687
688 if (is_active() || is_stopping()) {
689 server->recall_client_state(nullptr, Server::RecallFlags::ENFORCE_MAX);
690 mdcache->trim();
691 mdcache->trim_client_leases();
692 mdcache->check_memory_usage();
693 mdlog->trim(); // NOT during recovery!
694 }
695
696 // log
697 if (logger) {
698 logger->set(l_mds_subtrees, mdcache->num_subtrees());
699
700 mdcache->log_stat();
701 }
702
703 // ...
704 if (is_clientreplay() || is_active() || is_stopping()) {
705 server->find_idle_sessions();
706 server->evict_cap_revoke_non_responders();
707 locker->tick();
708 }
709
710 if (is_reconnect())
711 server->reconnect_tick();
712
713 if (is_active()) {
714 balancer->tick();
715 mdcache->find_stale_fragment_freeze();
716 mdcache->migrator->find_stale_export_freeze();
717 if (snapserver)
718 snapserver->check_osd_map(false);
719 }
720
721 if (is_active() || is_stopping()) {
722 update_targets(ceph_clock_now());
723 }
724
725 // shut down?
726 if (is_stopping()) {
727 mdlog->trim();
728 if (mdcache->shutdown_pass()) {
729 uint64_t pq_progress = 0 ;
730 uint64_t pq_total = 0;
731 size_t pq_in_flight = 0;
732 if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
733 dout(7) << "shutdown_pass=true, but still waiting for purge queue"
734 << dendl;
735 // This takes unbounded time, so we must indicate progress
736 // to the administrator: we do it in a slightly imperfect way
737 // by sending periodic (tick frequency) clog messages while
738 // in this state.
739 clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
740 << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
741 << " files purging" << ")";
742 } else {
743 dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
744 "down:stopped" << dendl;
745 stopping_done();
746 }
747 }
748 else {
749 dout(7) << "shutdown_pass=false" << dendl;
750 }
751 }
752
753 // Expose ourselves to Beacon to update health indicators
754 beacon.notify_health(this);
755}
756
757void MDSRankDispatcher::shutdown()
758{
759 // It should never be possible for shutdown to get called twice, because
760 // anyone picking up mds_lock checks if stopping is true and drops
761 // out if it is.
762 assert(stopping == false);
763 stopping = true;
764
765 dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
766
767 timer.shutdown();
768
769 // MDLog has to shut down before the finisher, because some of its
770 // threads block on IOs that require finisher to complete.
771 mdlog->shutdown();
772
773 // shut down cache
774 mdcache->shutdown();
775
776 purge_queue.shutdown();
777
778 mds_lock.Unlock();
779 finisher->stop(); // no flushing
780 mds_lock.Lock();
781
782 if (objecter->initialized)
783 objecter->shutdown();
784
785 monc->shutdown();
786
787 op_tracker.on_shutdown();
788
789 progress_thread.shutdown();
790
791 // release mds_lock for finisher/messenger threads (e.g.
792 // MDSDaemon::ms_handle_reset called from Messenger).
793 mds_lock.Unlock();
794
795 // shut down messenger
796 messenger->shutdown();
797
798 mds_lock.Lock();
799
800 // Workaround unclean shutdown: HeartbeatMap will assert if
801 // worker is not removed (as we do in ~MDS), but ~MDS is not
802 // always called after suicide.
803 if (hb) {
804 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
805 hb = NULL;
806 }
807}
808
809/**
810 * Helper for simple callbacks that call a void fn with no args.
811 */
812class C_MDS_VoidFn : public MDSInternalContext
813{
814 typedef void (MDSRank::*fn_ptr)();
815 protected:
816 fn_ptr fn;
817 public:
818 C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
819 : MDSInternalContext(mds_), fn(fn_)
820 {
821 assert(mds_);
822 assert(fn_);
823 }
824
825 void finish(int r) override
826 {
827 (mds->*fn)();
828 }
829};
830
831int64_t MDSRank::get_metadata_pool()
832{
833 return mdsmap->get_metadata_pool();
834}
835
836MDSTableClient *MDSRank::get_table_client(int t)
837{
838 switch (t) {
839 case TABLE_ANCHOR: return NULL;
840 case TABLE_SNAP: return snapclient;
841 default: ceph_abort();
842 }
843}
844
845MDSTableServer *MDSRank::get_table_server(int t)
846{
847 switch (t) {
848 case TABLE_ANCHOR: return NULL;
849 case TABLE_SNAP: return snapserver;
850 default: ceph_abort();
851 }
852}
853
854void MDSRank::suicide()
855{
856 if (suicide_hook) {
857 suicide_hook->complete(0);
858 suicide_hook = NULL;
859 }
860}
861
862void MDSRank::respawn()
863{
864 if (respawn_hook) {
865 respawn_hook->complete(0);
866 respawn_hook = NULL;
867 }
868}
869
870void MDSRank::damaged()
871{
872 assert(whoami != MDS_RANK_NONE);
873 assert(mds_lock.is_locked_by_me());
874
875 beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
876 monc->flush_log(); // Flush any clog error from before we were called
877 beacon.notify_health(this); // Include latest status in our swan song
878 beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);
879
880 // It's okay if we timed out and the mon didn't get our beacon, because
881 // another daemon (or ourselves after respawn) will eventually take the
882 // rank and report DAMAGED again when it hits same problem we did.
883
884 respawn(); // Respawn into standby in case mon has other work for us
885}
886
887void MDSRank::damaged_unlocked()
888{
889 Mutex::Locker l(mds_lock);
890 damaged();
891}
892
893void MDSRank::handle_write_error(int err)
894{
895 if (err == -EBLACKLISTED) {
896 derr << "we have been blacklisted (fenced), respawning..." << dendl;
897 respawn();
898 return;
899 }
900
901 if (g_conf->mds_action_on_write_error >= 2) {
902 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
903 respawn();
904 } else if (g_conf->mds_action_on_write_error == 1) {
905 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
906 mdcache->force_readonly();
907 } else {
908 // ignore;
909 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
910 }
911}
912
913void *MDSRank::ProgressThread::entry()
914{
915 Mutex::Locker l(mds->mds_lock);
916 while (true) {
917 while (!mds->stopping &&
918 mds->finished_queue.empty() &&
919 (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
920 cond.Wait(mds->mds_lock);
921 }
922
923 if (mds->stopping) {
924 break;
925 }
926
927 mds->_advance_queues();
928 }
929
930 return NULL;
931}
932
933
934void MDSRank::ProgressThread::shutdown()
935{
936 assert(mds->mds_lock.is_locked_by_me());
937 assert(mds->stopping);
938
939 if (am_self()) {
940 // Stopping is set, we will fall out of our main loop naturally
941 } else {
942 // Kick the thread to notice mds->stopping, and join it
943 cond.Signal();
944 mds->mds_lock.Unlock();
945 if (is_started())
946 join();
947 mds->mds_lock.Lock();
948 }
949}
950
951bool MDSRankDispatcher::ms_dispatch(Message *m)
952{
953 if (m->get_source().is_client()) {
954 Session *session = static_cast<Session*>(m->get_connection()->get_priv());
955 if (session)
956 session->last_seen = Session::clock::now();
957 }
958
959 inc_dispatch_depth();
960 bool ret = _dispatch(m, true);
961 dec_dispatch_depth();
962 return ret;
963}
964
965/* If this function returns true, it recognizes the message and has taken the
966 * reference. If it returns false, it has done neither. */
967bool MDSRank::_dispatch(Message *m, bool new_msg)
968{
969 if (is_stale_message(m)) {
970 m->put();
971 return true;
972 }
973
974 if (beacon.is_laggy()) {
975 dout(5) << " laggy, deferring " << *m << dendl;
976 waiting_for_nolaggy.push_back(m);
977 } else if (new_msg && !waiting_for_nolaggy.empty()) {
978 dout(5) << " there are deferred messages, deferring " << *m << dendl;
979 waiting_for_nolaggy.push_back(m);
980 } else {
981 if (!handle_deferrable_message(m)) {
982 dout(0) << "unrecognized message " << *m << dendl;
983 return false;
984 }
985
986 heartbeat_reset();
987 }
988
989 if (dispatch_depth > 1)
990 return true;
991
992 // finish any triggered contexts
993 _advance_queues();
994
995 if (beacon.is_laggy()) {
996 // We've gone laggy during dispatch, don't do any
997 // more housekeeping
998 return true;
999 }
1000
1001 // done with all client replayed requests?
1002 if (is_clientreplay() &&
1003 mdcache->is_open() &&
1004 replay_queue.empty() &&
1005 beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
1006 int num_requests = mdcache->get_num_client_requests();
1007 dout(10) << " still have " << num_requests << " active replay requests" << dendl;
1008 if (num_requests == 0)
1009 clientreplay_done();
1010 }
1011
1012 // hack: thrash exports
1013 static utime_t start;
1014 utime_t now = ceph_clock_now();
1015 if (start == utime_t())
1016 start = now;
1017 /*double el = now - start;
1018 if (el > 30.0 &&
1019 el < 60.0)*/
1020 for (int i=0; i<g_conf->mds_thrash_exports; i++) {
1021 set<mds_rank_t> s;
1022 if (!is_active()) break;
1023 mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
1024 if (s.size() < 2 || CInode::count() < 10)
1025 break; // need peers for this to work.
1026 if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
1027 mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
1028 break;
1029
1030 dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;
1031
1032 // pick a random dir inode
1033 CInode *in = mdcache->hack_pick_random_inode();
1034
1035 list<CDir*> ls;
1036 in->get_dirfrags(ls);
1037 if (!ls.empty()) { // must be an open dir.
1038 list<CDir*>::iterator p = ls.begin();
1039 int n = rand() % ls.size();
1040 while (n--)
1041 ++p;
1042 CDir *dir = *p;
1043 if (!dir->get_parent_dir()) continue; // must be linked.
1044 if (!dir->is_auth()) continue; // must be auth.
1045
1046 mds_rank_t dest;
1047 do {
1048 int k = rand() % s.size();
1049 set<mds_rank_t>::iterator p = s.begin();
1050 while (k--) ++p;
1051 dest = *p;
1052 } while (dest == whoami);
1053 mdcache->migrator->export_dir_nicely(dir,dest);
1054 }
1055 }
1056 // hack: thrash fragments
1057 for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
1058 if (!is_active()) break;
1059 if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
1060 dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;
1061
1062 // pick a random dir inode
1063 CInode *in = mdcache->hack_pick_random_inode();
1064
1065 list<CDir*> ls;
1066 in->get_dirfrags(ls);
1067 if (ls.empty()) continue; // must be an open dir.
1068 CDir *dir = ls.front();
1069 if (!dir->get_parent_dir()) continue; // must be linked.
1070 if (!dir->is_auth()) continue; // must be auth.
1071 frag_t fg = dir->get_frag();
1072 if (mdsmap->allows_dirfrags()) {
1073 if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
1074 mdcache->split_dir(dir, 1);
1075 } else {
1076 balancer->queue_merge(dir);
1077 }
1078 }
1079 }
1080
1081 // hack: force hash root?
1082 /*
1083 if (false &&
1084 mdcache->get_root() &&
1085 mdcache->get_root()->dir &&
1086 !(mdcache->get_root()->dir->is_hashed() ||
1087 mdcache->get_root()->dir->is_hashing())) {
1088 dout(0) << "hashing root" << dendl;
1089 mdcache->migrator->hash_dir(mdcache->get_root()->dir);
1090 }
1091 */
1092
1093 update_mlogger();
1094 return true;
1095}
1096
1097void MDSRank::update_mlogger()
1098{
1099 if (mlogger) {
1100 mlogger->set(l_mdm_ino, CInode::count());
1101 mlogger->set(l_mdm_dir, CDir::count());
1102 mlogger->set(l_mdm_dn, CDentry::count());
1103 mlogger->set(l_mdm_cap, Capability::count());
1104 mlogger->set(l_mdm_inoa, CInode::increments());
1105 mlogger->set(l_mdm_inos, CInode::decrements());
1106 mlogger->set(l_mdm_dira, CDir::increments());
1107 mlogger->set(l_mdm_dirs, CDir::decrements());
1108 mlogger->set(l_mdm_dna, CDentry::increments());
1109 mlogger->set(l_mdm_dns, CDentry::decrements());
1110 mlogger->set(l_mdm_capa, Capability::increments());
1111 mlogger->set(l_mdm_caps, Capability::decrements());
1112 mlogger->set(l_mdm_buf, buffer::get_total_alloc());
1113 }
1114}
1115
1116/*
1117 * lower priority messages we defer if we seem laggy
1118 */
1119bool MDSRank::handle_deferrable_message(Message *m)
1120{
1121 int port = m->get_type() & 0xff00;
1122
1123 switch (port) {
1124 case MDS_PORT_CACHE:
1125 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1126 mdcache->dispatch(m);
1127 break;
1128
1129 case MDS_PORT_MIGRATOR:
1130 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1131 mdcache->migrator->dispatch(m);
1132 break;
1133
1134 default:
1135 switch (m->get_type()) {
1136 // SERVER
1137 case CEPH_MSG_CLIENT_SESSION:
1138 case CEPH_MSG_CLIENT_RECONNECT:
1139 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
1140 // fall-thru
1141 case CEPH_MSG_CLIENT_REQUEST:
1142 server->dispatch(m);
1143 break;
1144 case MSG_MDS_SLAVE_REQUEST:
1145 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1146 server->dispatch(m);
1147 break;
1148
1149 case MSG_MDS_HEARTBEAT:
1150 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1151 balancer->proc_message(m);
1152 break;
1153
1154 case MSG_MDS_TABLE_REQUEST:
1155 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1156 {
1157 MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
1158 if (req->op < 0) {
1159 MDSTableClient *client = get_table_client(req->table);
1160 client->handle_request(req);
1161 } else {
1162 MDSTableServer *server = get_table_server(req->table);
1163 server->handle_request(req);
1164 }
1165 }
1166 break;
1167
1168 case MSG_MDS_LOCK:
1169 case MSG_MDS_INODEFILECAPS:
1170 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
1171 locker->dispatch(m);
1172 break;
1173
1174 case CEPH_MSG_CLIENT_CAPS:
1175 case CEPH_MSG_CLIENT_CAPRELEASE:
1176 case CEPH_MSG_CLIENT_LEASE:
1177 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
1178 locker->dispatch(m);
1179 break;
1180
1181 default:
1182 return false;
1183 }
1184 }
1185
1186 return true;
1187}
1188
1189/**
1190 * Advance finished_queue and waiting_for_nolaggy.
1191 *
1192 * Usually drain both queues, but may not drain waiting_for_nolaggy
1193 * if beacon is currently laggy.
1194 */
1195void MDSRank::_advance_queues()
1196{
1197 assert(mds_lock.is_locked_by_me());
1198
1199 while (!finished_queue.empty()) {
1200 dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
1201 dout(10) << finished_queue << dendl;
1202 list<MDSInternalContextBase*> ls;
1203 ls.swap(finished_queue);
1204 while (!ls.empty()) {
1205 dout(10) << " finish " << ls.front() << dendl;
1206 ls.front()->complete(0);
1207 ls.pop_front();
1208
1209 heartbeat_reset();
1210 }
1211 }
1212
1213 while (!waiting_for_nolaggy.empty()) {
1214 // stop if we're laggy now!
1215 if (beacon.is_laggy())
1216 break;
1217
1218 Message *old = waiting_for_nolaggy.front();
1219 waiting_for_nolaggy.pop_front();
1220
1221 if (is_stale_message(old)) {
1222 old->put();
1223 } else {
1224 dout(7) << " processing laggy deferred " << *old << dendl;
1225 if (!handle_deferrable_message(old)) {
1226 dout(0) << "unrecognized message " << *old << dendl;
1227 old->put();
1228 }
1229 }
1230
1231 heartbeat_reset();
1232 }
1233}
1234
1235/**
1236 * Call this when you take mds_lock, or periodically if you're going to
1237 * hold the lock for a long time (e.g. iterating over clients/inodes)
1238 */
1239void MDSRank::heartbeat_reset()
1240{
1241 // Any thread might jump into mds_lock and call us immediately
1242 // after a call to suicide() completes, in which case MDSRank::hb
1243 // has been freed and we are a no-op.
1244 if (!hb) {
1245 assert(stopping);
1246 return;
1247 }
1248
1249 // NB not enabling suicide grace, because the mon takes care of killing us
1250 // (by blacklisting us) when we fail to send beacons, and it's simpler to
1251 // only have one way of dying.
1252 auto grace = g_conf->get_val<double>("mds_heartbeat_grace");
1253 g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
1254}
1255
1256bool MDSRank::is_stale_message(Message *m) const
1257{
1258 // from bad mds?
1259 if (m->get_source().is_mds()) {
1260 mds_rank_t from = mds_rank_t(m->get_source().num());
1261 if (!mdsmap->have_inst(from) ||
1262 mdsmap->get_inst(from) != m->get_source_inst() ||
1263 mdsmap->is_down(from)) {
1264 // bogus mds?
1265 if (m->get_type() == CEPH_MSG_MDS_MAP) {
1266 dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
1267 << ", but it's an mdsmap, looking at it" << dendl;
1268 } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
1269 mdsmap->get_inst(from) == m->get_source_inst()) {
1270 dout(5) << "got " << *m << " from down mds " << m->get_source()
1271 << ", but it's a cache_expire, looking at it" << dendl;
1272 } else {
1273 dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
1274 << ", dropping" << dendl;
1275 return true;
1276 }
1277 }
1278 }
1279 return false;
1280}
1281
1282Session *MDSRank::get_session(Message *m)
1283{
1284 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
1285 if (session) {
1286 session->put(); // do not carry ref
1287 dout(20) << "get_session have " << session << " " << session->info.inst
1288 << " state " << session->get_state_name() << dendl;
1289 // Check if we've imported an open session since (new sessions start closed)
1290 if (session->is_closed()) {
1291 Session *imported_session = sessionmap.get_session(session->info.inst.name);
1292 if (imported_session && imported_session != session) {
1293 dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
1294 imported_session->info.auth_name = session->info.auth_name;
1295 //assert(session->info.auth_name == imported_session->info.auth_name);
1296 assert(session->info.inst == imported_session->info.inst);
1297 imported_session->connection = session->connection;
1298 // send out any queued messages
1299 while (!session->preopen_out_queue.empty()) {
1300 imported_session->connection->send_message(session->preopen_out_queue.front());
1301 session->preopen_out_queue.pop_front();
1302 }
1303 imported_session->auth_caps = session->auth_caps;
1304 assert(session->get_nref() == 1);
1305 imported_session->connection->set_priv(imported_session->get());
1306 imported_session->last_seen = session->last_seen;
1307 session = imported_session;
1308 }
1309 }
1310 } else {
1311 dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
1312 }
1313 return session;
1314}
1315
1316void MDSRank::send_message(Message *m, Connection *c)
1317{
1318 assert(c);
1319 c->send_message(m);
1320}
1321
1322
1323void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
1324{
1325 if (!mdsmap->is_up(mds)) {
1326 dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
1327 m->put();
1328 return;
1329 }
1330
1331 // send mdsmap first?
1332 if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
1333 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
1334 mdsmap->get_inst(mds));
1335 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
1336 }
1337
1338 // send message
1339 messenger->send_message(m, mdsmap->get_inst(mds));
1340}
1341
1342void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
1343{
1344 assert(mds != whoami);
1345
1346 // client request?
1347 if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
1348 (static_cast<MClientRequest*>(m))->get_source().is_client()) {
1349 MClientRequest *creq = static_cast<MClientRequest*>(m);
1350 creq->inc_num_fwd(); // inc forward counter
1351
1352 /*
1353 * don't actually forward if non-idempotent!
1354 * client has to do it. although the MDS will ignore duplicate requests,
1355 * the affected metadata may migrate, in which case the new authority
1356 * won't have the metareq_id in the completed request map.
1357 */
1358 // NEW: always make the client resend!
1359 bool client_must_resend = true; //!creq->can_forward();
1360
1361 // tell the client where it should go
1362 messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
1363 client_must_resend),
1364 creq->get_source_inst());
1365
1366 if (client_must_resend) {
1367 m->put();
1368 return;
1369 }
1370 }
1371
1372 // these are the only types of messages we should be 'forwarding'; they
1373 // explicitly encode their source mds, which gets clobbered when we resend
1374 // them here.
1375 assert(m->get_type() == MSG_MDS_DIRUPDATE ||
1376 m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);
1377
1378 // send mdsmap first?
1379 if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
1380 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
1381 mdsmap->get_inst(mds));
1382 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
1383 }
1384
1385 messenger->send_message(m, mdsmap->get_inst(mds));
1386}
1387
1388
1389
1390void MDSRank::send_message_client_counted(Message *m, client_t client)
1391{
1392 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
1393 if (session) {
1394 send_message_client_counted(m, session);
1395 } else {
1396 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
1397 }
1398}
1399
1400void MDSRank::send_message_client_counted(Message *m, Connection *connection)
1401{
1402 Session *session = static_cast<Session *>(connection->get_priv());
1403 if (session) {
1404 session->put(); // do not carry ref
1405 send_message_client_counted(m, session);
1406 } else {
1407 dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
1408 // another Connection took over the Session
1409 }
1410}
1411
1412void MDSRank::send_message_client_counted(Message *m, Session *session)
1413{
1414 version_t seq = session->inc_push_seq();
1415 dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
1416 << seq << " " << *m << dendl;
1417 if (session->connection) {
1418 session->connection->send_message(m);
1419 } else {
1420 session->preopen_out_queue.push_back(m);
1421 }
1422}
1423
1424void MDSRank::send_message_client(Message *m, Session *session)
1425{
1426 dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
1427 if (session->connection) {
1428 session->connection->send_message(m);
1429 } else {
1430 session->preopen_out_queue.push_back(m);
1431 }
1432}
1433
1434/**
1435 * This is used whenever a RADOS operation has been cancelled
1436 * or a RADOS client has been blacklisted, to cause the MDS and
1437 * any clients to wait for this OSD epoch before using any new caps.
1438 *
1439 * See doc/cephfs/eviction
1440 */
1441void MDSRank::set_osd_epoch_barrier(epoch_t e)
1442{
1443 dout(4) << __func__ << ": epoch=" << e << dendl;
1444 osd_epoch_barrier = e;
1445}
1446
1447void MDSRank::retry_dispatch(Message *m)
1448{
1449 inc_dispatch_depth();
1450 _dispatch(m, false);
1451 dec_dispatch_depth();
1452}
1453
91327a77 1454double MDSRank::get_dispatch_queue_max_age(utime_t now) const
7c673cae 1455{
91327a77 1456 return messenger->get_dispatch_queue_max_age(now);
7c673cae
FG
1457}
1458
1459bool MDSRank::is_daemon_stopping() const
1460{
1461 return stopping;
1462}
1463
1464void MDSRank::request_state(MDSMap::DaemonState s)
1465{
1466 dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
1467 beacon.set_want_state(mdsmap, s);
1468 beacon.send();
1469}
1470
1471
1472class C_MDS_BootStart : public MDSInternalContext {
1473 MDSRank::BootStep nextstep;
1474public:
1475 C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
1476 : MDSInternalContext(m), nextstep(n) {}
1477 void finish(int r) override {
1478 mds->boot_start(nextstep, r);
1479 }
1480};
1481
1482
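// boot_start() walks the BootStep sequence MDS_BOOT_INITIAL ->
// MDS_BOOT_OPEN_ROOT -> MDS_BOOT_PREPARE_LOG -> MDS_BOOT_REPLAY_DONE,
// re-entering itself through C_MDS_BootStart as each gather completes.
// Errors from the previous step are classified first: EAGAIN during
// standby-replay means we fell behind the journal and respawn,
// EINVAL/ENOENT mark the rank damaged, EROFS forces read-only mode,
// and anything else is fatal.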
1483void MDSRank::boot_start(BootStep step, int r)
1484{
1485 // Handle errors from previous step
1486 if (r < 0) {
1487 if (is_standby_replay() && (r == -EAGAIN)) {
1488 dout(0) << "boot_start encountered an error EAGAIN"
1489 << ", respawning since we fell behind journal" << dendl;
1490 respawn();
1491 } else if (r == -EINVAL || r == -ENOENT) {
1492 // Invalid or absent data, indicates damaged on-disk structures
1493 clog->error() << "Error loading MDS rank " << whoami << ": "
1494 << cpp_strerror(r);
1495 damaged();
1496 assert(r == 0); // Unreachable, damaged() calls respawn()
1497 } else if (r == -EROFS) {
1498 dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
1499 } else {
1500 // Completely unexpected error, give up and die
1501 dout(0) << "boot_start encountered an error, failing" << dendl;
1502 suicide();
1503 return;
1504 }
1505 }
1506
1507 assert(is_starting() || is_any_replay());
1508
1509 switch(step) {
1510 case MDS_BOOT_INITIAL:
1511 {
1512 mdcache->init_layouts();
1513
1514 MDSGatherBuilder gather(g_ceph_context,
1515 new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
1516 dout(2) << "Booting: " << step << ": opening inotable" << dendl;
1517 inotable->set_rank(whoami);
1518 inotable->load(gather.new_sub());
1519
1520 dout(2) << "Booting: " << step << ": opening sessionmap" << dendl;
1521 sessionmap.set_rank(whoami);
1522 sessionmap.load(gather.new_sub());
1523
1524 dout(2) << "Booting: " << step << ": opening mds log" << dendl;
1525 mdlog->open(gather.new_sub());
1526
1527 if (is_starting()) {
1528 dout(2) << "Booting: " << step << ": opening purge queue" << dendl;
1529 purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
1530 } else if (!standby_replaying) {
1531 dout(2) << "Booting: " << step << ": opening purge queue (async)" << dendl;
1532 purge_queue.open(NULL);
1533 }
1534
1535 if (mdsmap->get_tableserver() == whoami) {
1536 dout(2) << "Booting: " << step << ": opening snap table" << dendl;
1537 snapserver->set_rank(whoami);
1538 snapserver->load(gather.new_sub());
1539 }
1540
1541 gather.activate();
1542 }
1543 break;
1544 case MDS_BOOT_OPEN_ROOT:
1545 {
1546 dout(2) << "Booting: " << step << ": loading/discovering base inodes" << dendl;
1547
1548 MDSGatherBuilder gather(g_ceph_context,
1549 new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
1550
1551 if (is_starting()) {
1552 // load mydir frag for the first log segment (creating subtree map)
1553 mdcache->open_mydir_frag(gather.new_sub());
1554 } else {
1555 mdcache->open_mydir_inode(gather.new_sub());
1556 }
1557
1558 if (whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
1559 mdcache->open_root_inode(gather.new_sub());
1560 } else if (is_any_replay()) {
1561 // replay. make up fake root inode to start with
1562 mdcache->create_root_inode();
1563 }
1564 gather.activate();
1565 }
1566 break;
1567 case MDS_BOOT_PREPARE_LOG:
1568 if (is_any_replay()) {
1569 dout(2) << "Booting: " << step << ": replaying mds log" << dendl;
1570 MDSGatherBuilder gather(g_ceph_context,
1571 new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1572
1573 if (!standby_replaying) {
1574 dout(2) << "Booting: " << step << ": waiting for purge queue recovered" << dendl;
1575 purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
1576 }
1577
1578 mdlog->replay(gather.new_sub());
1579 gather.activate();
1580 } else {
1581 dout(2) << "Booting: " << step << ": positioning at end of old mds log" << dendl;
1582 mdlog->append();
1583 starting_done();
1584 }
1585 break;
1586 case MDS_BOOT_REPLAY_DONE:
1587 assert(is_any_replay());
1588
1589 // Sessiontable and inotable should be in sync after replay, validate
1590 // that they are consistent.
1591 validate_sessions();
1592
1593 replay_done();
1594 break;
1595 }
1596}
1597
1598void MDSRank::validate_sessions()
1599{
1600 assert(mds_lock.is_locked_by_me());
1601 bool valid = true;
1602
1603 // Identify any sessions whose state is inconsistent (e.g. with the inotable),
1604 // after they have been loaded from rados during startup.
1605 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1606 const auto &sessions = sessionmap.get_sessions();
1607 for (const auto &i : sessions) {
1608 Session *session = i.second;
1609 interval_set<inodeno_t> badones;
1610 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1611 clog->error() << "client " << *session
1612 << " loaded with preallocated inodes that are inconsistent with inotable";
1613 valid = false;
1614 }
1615 }
1616
1617 if (!valid) {
1618 damaged();
1619 assert(valid);
1620 }
1621}
1622
1623void MDSRank::starting_done()
1624{
1625 dout(3) << "starting_done" << dendl;
1626 assert(is_starting());
1627 request_state(MDSMap::STATE_ACTIVE);
1628
1629 mdlog->start_new_segment();
1630}
1631
1632
1633void MDSRank::calc_recovery_set()
1634{
1635 // initialize gather sets
1636 set<mds_rank_t> rs;
1637 mdsmap->get_recovery_mds_set(rs);
1638 rs.erase(whoami);
1639 mdcache->set_recovery_set(rs);
1640
1641 dout(1) << " recovery set is " << rs << dendl;
1642}
1643
1644
1645void MDSRank::replay_start()
1646{
1647 dout(1) << "replay_start" << dendl;
1648
1649 if (is_standby_replay())
1650 standby_replaying = true;
1651
1652 calc_recovery_set();
1653
1654 // Check if we need to wait for a newer OSD map before starting
1655 Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
1656 bool const ready = objecter->wait_for_map(
1657 mdsmap->get_last_failure_osd_epoch(),
1658 fin);
1659
1660 if (ready) {
1661 delete fin;
1662 boot_start();
1663 } else {
1664 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1665 << " (which blacklists prior instance)" << dendl;
1666 }
1667}
1668
1669
1670class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
1671 uint64_t old_read_pos;
1672public:
1673 C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
1674 MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
1675 void finish(int r) override {
1676 mds->_standby_replay_restart_finish(r, old_read_pos);
1677 }
1678 void print(ostream& out) const override {
1679 out << "standby_replay_restart";
1680 }
1681};
1682
1683void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
1684{
1685 if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
1686 dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
1687 respawn(); /* we're too far back, and this is easier than
1688 trying to reset everything in the cache, etc */
1689 } else {
1690 mdlog->standby_trim_segments();
1691 boot_start(MDS_BOOT_PREPARE_LOG, r);
1692 }
1693}
1694
1695class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
1696public:
1697 explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
1698 void finish(int r) override {
1699 assert(!r);
1700 mds->standby_replay_restart();
1701 }
1702};
1703
1704void MDSRank::standby_replay_restart()
1705{
1706 if (standby_replaying) {
1707 /* Go around for another pass of replaying in standby */
1708 dout(5) << "Restarting replay as standby-replay" << dendl;
1709 mdlog->get_journaler()->reread_head_and_probe(
1710 new C_MDS_StandbyReplayRestartFinish(
1711 this,
1712 mdlog->get_journaler()->get_read_pos()));
1713 } else {
1714 /* We are transitioning out of standby: wait for OSD map update
1715 before making final pass */
1716 dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
1717 Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
1718 bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
1719 if (ready) {
1720 delete fin;
1721 mdlog->get_journaler()->reread_head_and_probe(
1722 new C_MDS_StandbyReplayRestartFinish(
1723 this,
1724 mdlog->get_journaler()->get_read_pos()));
1725
1726 dout(1) << " opening purge queue (async)" << dendl;
1727 purge_queue.open(NULL);
1728 } else {
1729 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1730 << " (which blacklists prior instance)" << dendl;
1731 }
1732 }
1733}
1734
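// replay_done() is reached both after a standby-replay pass and after
// the final takeover replay: in standby it simply schedules another
// pass, on takeover it makes the journal writeable (reformatting it
// first if the on-disk format is old) and then requests the reconnect
// or resolve state depending on whether other ranks exist.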
1735void MDSRank::replay_done()
1736{
1737 if (!standby_replaying) {
1738 dout(1) << "Finished replaying journal" << dendl;
1739 } else {
1740 dout(5) << "Finished replaying journal as standby-replay" << dendl;
1741 }
1742
1743 if (is_standby_replay()) {
1744 // The replay was done in standby state, and we are still in that state
1745 assert(standby_replaying);
1746 dout(10) << "setting replay timer" << dendl;
1747 timer.add_event_after(g_conf->mds_replay_interval,
1748 new C_MDS_StandbyReplayRestart(this));
1749 return;
1750 } else if (standby_replaying) {
1751 // The replay was done in standby state, we have now _left_ that state
1752 dout(10) << " last replay pass was as a standby; making final pass" << dendl;
1753 standby_replaying = false;
1754 standby_replay_restart();
1755 return;
1756 } else {
1757 // Replay is complete, journal read should be up to date
1758 assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
1759 assert(!is_standby_replay());
1760
1761 // Reformat and come back here
1762 if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
1763 dout(4) << "reformatting journal on standby-replay->replay transition" << dendl;
1764 mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1765 return;
1766 }
1767 }
1768
1769 dout(1) << "making mds journal writeable" << dendl;
1770 mdlog->get_journaler()->set_writeable();
1771 mdlog->get_journaler()->trim_tail();
1772
1773 if (g_conf->mds_wipe_sessions) {
1774 dout(1) << "wiping out client sessions" << dendl;
1775 sessionmap.wipe();
1776 sessionmap.save(new C_MDSInternalNoop);
1777 }
1778 if (g_conf->mds_wipe_ino_prealloc) {
1779 dout(1) << "wiping out ino prealloc from sessions" << dendl;
1780 sessionmap.wipe_ino_prealloc();
1781 sessionmap.save(new C_MDSInternalNoop);
1782 }
1783 if (g_conf->mds_skip_ino) {
1784 inodeno_t i = g_conf->mds_skip_ino;
1785 dout(1) << "skipping " << i << " inodes" << dendl;
1786 inotable->skip_inos(i);
1787 inotable->save(new C_MDSInternalNoop);
1788 }
1789
1790 if (mdsmap->get_num_in_mds() == 1 &&
1791 mdsmap->get_num_failed_mds() == 0) { // just me!
1792 dout(2) << "i am alone, moving to state reconnect" << dendl;
1793 request_state(MDSMap::STATE_RECONNECT);
1794 } else {
1795 dout(2) << "i am not alone, moving to state resolve" << dendl;
1796 request_state(MDSMap::STATE_RESOLVE);
1797 }
1798}
1799
1800void MDSRank::reopen_log()
1801{
1802 dout(1) << "reopen_log" << dendl;
1803 mdcache->rollback_uncommitted_fragments();
1804}
1805
1806
1807void MDSRank::resolve_start()
1808{
1809 dout(1) << "resolve_start" << dendl;
1810
1811 reopen_log();
1812
1813 mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
1814 finish_contexts(g_ceph_context, waiting_for_resolve);
1815}
1816void MDSRank::resolve_done()
1817{
1818 dout(1) << "resolve_done" << dendl;
1819 request_state(MDSMap::STATE_RECONNECT);
1820}
1821
1822void MDSRank::reconnect_start()
1823{
1824 dout(1) << "reconnect_start" << dendl;
1825
1826 if (last_state == MDSMap::STATE_REPLAY) {
1827 reopen_log();
1828 }
1829
1830 // Drop any blacklisted clients from the SessionMap before going
1831 // into reconnect, so that we don't wait for them.
1832 objecter->enable_blacklist_events();
1833 std::set<entity_addr_t> blacklist;
1834 epoch_t epoch = 0;
1835 objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
1836 o.get_blacklist(&blacklist);
1837 epoch = o.get_epoch();
1838 });
1839 auto killed = server->apply_blacklist(blacklist);
1840 dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
1841 << blacklist.size() << " blacklist entries, "
1842 << sessionmap.get_sessions().size() << ")" << dendl;
1843 if (killed) {
1844 set_osd_epoch_barrier(epoch);
1845 }
1846
1847 server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
1848 finish_contexts(g_ceph_context, waiting_for_reconnect);
1849}
1850void MDSRank::reconnect_done()
1851{
1852 dout(1) << "reconnect_done" << dendl;
1853 request_state(MDSMap::STATE_REJOIN); // move to rejoin state
1854}
1855
1856void MDSRank::rejoin_joint_start()
1857{
1858 dout(1) << "rejoin_joint_start" << dendl;
1859 mdcache->rejoin_send_rejoins();
1860}
1861void MDSRank::rejoin_start()
1862{
1863 dout(1) << "rejoin_start" << dendl;
1864 mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
a8e16298 1865 finish_contexts(g_ceph_context, waiting_for_rejoin);
7c673cae
FG
1866}
1867void MDSRank::rejoin_done()
1868{
1869 dout(1) << "rejoin_done" << dendl;
1870 mdcache->show_subtrees();
1871 mdcache->show_cache();
1872
1873 // funny case: is our cache empty? no subtrees?
1874 if (!mdcache->is_subtrees()) {
1875 if (whoami == 0) {
1876 // The root should always have a subtree!
1877 clog->error() << "No subtrees found for root MDS rank!";
1878 damaged();
1879 assert(mdcache->is_subtrees());
1880 } else {
1881 dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
1882 request_state(MDSMap::STATE_STOPPED);
1883 }
1884 return;
1885 }
1886
1887 if (replay_queue.empty())
1888 request_state(MDSMap::STATE_ACTIVE);
1889 else
1890 request_state(MDSMap::STATE_CLIENTREPLAY);
1891}
1892
1893void MDSRank::clientreplay_start()
1894{
1895 dout(1) << "clientreplay_start" << dendl;
1896 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1897 mdcache->start_files_to_recover();
1898 queue_one_replay();
1899}
1900
1901bool MDSRank::queue_one_replay()
1902{
1903 if (replay_queue.empty()) {
1904 mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1905 return false;
1906 }
1907 queue_waiter(replay_queue.front());
1908 replay_queue.pop_front();
1909 return true;
1910}
1911
1912void MDSRank::clientreplay_done()
1913{
1914 dout(1) << "clientreplay_done" << dendl;
1915 request_state(MDSMap::STATE_ACTIVE);
1916}
1917
1918void MDSRank::active_start()
1919{
1920 dout(1) << "active_start" << dendl;
1921
28e407b8
AA
1922 if (last_state == MDSMap::STATE_CREATING ||
1923 last_state == MDSMap::STATE_STARTING) {
7c673cae
FG
1924 mdcache->open_root();
1925 }
1926
1927 mdcache->clean_open_file_lists();
1928 mdcache->export_remaining_imported_caps();
1929 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1930 mdcache->start_files_to_recover();
1931
1932 mdcache->reissue_all_caps();
1933 mdcache->activate_stray_manager();
1934
1935 finish_contexts(g_ceph_context, waiting_for_active); // kick waiters
1936}
1937
1938void MDSRank::recovery_done(int oldstate)
1939{
1940 dout(1) << "recovery_done -- successful recovery!" << dendl;
1941 assert(is_clientreplay() || is_active());
1942
1943 // kick snaptable (resent AGREEs)
1944 if (mdsmap->get_tableserver() == whoami) {
1945 set<mds_rank_t> active;
1adf2230 1946 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
7c673cae
FG
1947 snapserver->finish_recovery(active);
1948 }
1949
1950 if (oldstate == MDSMap::STATE_CREATING)
1951 return;
1952
1953 mdcache->start_recovered_truncates();
1954 mdcache->do_file_recover();
1955
1956 // tell connected clients
1957 //bcast_mds_map(); // not anymore, they get this from the monitor
1958
1959 mdcache->populate_mydir();
1960}
1961
1962void MDSRank::creating_done()
1963{
 1964 dout(1) << "creating_done" << dendl;
1965 request_state(MDSMap::STATE_ACTIVE);
1966}
1967
1968void MDSRank::boot_create()
1969{
1970 dout(3) << "boot_create" << dendl;
1971
1972 MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));
1973
1974 mdcache->init_layouts();
1975
1976 snapserver->set_rank(whoami);
1977 inotable->set_rank(whoami);
1978 sessionmap.set_rank(whoami);
1979
1980 // start with a fresh journal
1981 dout(10) << "boot_create creating fresh journal" << dendl;
1982 mdlog->create(fin.new_sub());
1983
1984 // open new journal segment, but do not journal subtree map (yet)
1985 mdlog->prepare_new_segment();
1986
1987 if (whoami == mdsmap->get_root()) {
1988 dout(3) << "boot_create creating fresh hierarchy" << dendl;
1989 mdcache->create_empty_hierarchy(fin.get());
1990 }
1991
1992 dout(3) << "boot_create creating mydir hierarchy" << dendl;
1993 mdcache->create_mydir_hierarchy(fin.get());
1994
1995 // fixme: fake out inotable (reset, pretend loaded)
1996 dout(10) << "boot_create creating fresh inotable table" << dendl;
1997 inotable->reset();
1998 inotable->save(fin.new_sub());
1999
2000 // write empty sessionmap
2001 sessionmap.save(fin.new_sub());
2002
2003 // Create empty purge queue
2004 purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));
2005
2006 // initialize tables
2007 if (mdsmap->get_tableserver() == whoami) {
2008 dout(10) << "boot_create creating fresh snaptable" << dendl;
2009 snapserver->reset();
2010 snapserver->save(fin.new_sub());
2011 }
2012
2013 assert(g_conf->mds_kill_create_at != 1);
2014
2015 // ok now journal it
2016 mdlog->journal_segment_subtree_map(fin.new_sub());
2017 mdlog->flush();
2018
31f18b77
FG
2019 // Usually we do this during reconnect, but creation skips that.
2020 objecter->enable_blacklist_events();
2021
7c673cae
FG
2022 fin.activate();
2023}
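// boot_create() above fans its work out through an MDSGatherBuilder: every
// sub-context handed out by fin.new_sub() must complete before the single
// finisher (creating_done) runs, and activate() arms the final callback once
// all subs have been created. A minimal standalone sketch of that gather
// pattern follows; GatherSketch is illustrative only, not Ceph's
// C_GatherBuilder, and sub error codes are ignored for brevity.

#include <functional>
#include <memory>

class GatherSketch {
  struct State {
    int pending = 0;
    bool activated = false;
    std::function<void(int)> on_finish;
    void maybe_finish() {
      if (activated && pending == 0 && on_finish)
        on_finish(0);
    }
  };
  std::shared_ptr<State> state = std::make_shared<State>();

public:
  explicit GatherSketch(std::function<void(int)> fin) {
    state->on_finish = std::move(fin);
  }

  // Hand out one sub-completion; the finisher waits for all of them.
  std::function<void(int)> new_sub() {
    ++state->pending;
    auto s = state;
    return [s](int) {
      --s->pending;
      s->maybe_finish();
    };
  }

  // No more subs will be created; finish as soon as the count reaches zero.
  void activate() {
    state->activated = true;
    state->maybe_finish();
  }
};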
2024
2025void MDSRank::stopping_start()
2026{
a8e16298 2027 dout(2) << "Stopping..." << dendl;
7c673cae
FG
2028
2029 if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
2030 // we're the only mds up!
2031 dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal. suicide!" << dendl;
2032 suicide();
2033 }
2034
2035 mdcache->shutdown_start();
2036}
2037
2038void MDSRank::stopping_done()
2039{
a8e16298 2040 dout(2) << "Finished stopping..." << dendl;
7c673cae
FG
2041
2042 // tell monitor we shut down cleanly.
2043 request_state(MDSMap::STATE_STOPPED);
2044}
2045
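// handle_mds_map(): validate the state transition for this rank, run the
// per-state *_start() hook when our state changes, then compare the old and
// new maps to notify the cache/migrator about peers that failed, restarted,
// recovered or stopped, and finally wake any waiters gated on the map epoch.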
2046void MDSRankDispatcher::handle_mds_map(
2047 MMDSMap *m,
2048 MDSMap *oldmap)
2049{
2050 // I am only to be passed MDSMaps in which I hold a rank
2051 assert(whoami != MDS_RANK_NONE);
2052
2053 MDSMap::DaemonState oldstate = state;
2054 mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
2055 state = mdsmap->get_state_gid(mds_gid);
2056 if (state != oldstate) {
2057 last_state = oldstate;
2058 incarnation = mdsmap->get_inc_gid(mds_gid);
2059 }
2060
2061 version_t epoch = m->get_epoch();
2062
2063 // note source's map version
2064 if (m->get_source().is_mds() &&
2065 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
2066 dout(15) << " peer " << m->get_source()
2067 << " has mdsmap epoch >= " << epoch
2068 << dendl;
2069 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
2070 }
2071
2072 // Validate state transitions while I hold a rank
2073 if (!MDSMap::state_transition_valid(oldstate, state)) {
2074 derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
2075 << "->" << ceph_mds_state_name(state) << dendl;
2076 respawn();
2077 }
2078
2079 if (oldstate != state) {
2080 // update messenger.
2081 if (state == MDSMap::STATE_STANDBY_REPLAY) {
2082 dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
2083 << " replaying mds." << whoami << "." << incarnation << dendl;
2084 messenger->set_myname(entity_name_t::MDS(mds_gid));
2085 } else {
2086 dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
2087 messenger->set_myname(entity_name_t::MDS(whoami));
2088 }
2089 }
2090
2091 // tell objecter my incarnation
2092 if (objecter->get_client_incarnation() != incarnation)
2093 objecter->set_client_incarnation(incarnation);
2094
2095 // for debug
2096 if (g_conf->mds_dump_cache_on_map)
2097 mdcache->dump_cache();
2098
1adf2230
AA
2099 cluster_degraded = mdsmap->is_degraded();
2100
2101 // mdsmap and oldmap can be discontinuous. failover might happen in the missing mdsmap.
2102 // the 'restart' set tracks ranks that have restarted since the old mdsmap
2103 set<mds_rank_t> restart;
2104 // replaying mds does not communicate with other ranks
2105 if (state >= MDSMap::STATE_RESOLVE) {
2106 // did someone fail?
2107 // new down?
2108 set<mds_rank_t> olddown, down;
2109 oldmap->get_down_mds_set(&olddown);
2110 mdsmap->get_down_mds_set(&down);
2111 for (const auto& r : down) {
2112 if (oldmap->have_inst(r) && olddown.count(r) == 0) {
2113 messenger->mark_down(oldmap->get_inst(r).addr);
2114 handle_mds_failure(r);
2115 }
2116 }
2117
2118 // did someone fail?
2119 // did their addr/inst change?
2120 set<mds_rank_t> up;
2121 mdsmap->get_up_mds_set(up);
2122 for (const auto& r : up) {
2123 auto& info = mdsmap->get_info(r);
2124 if (oldmap->have_inst(r)) {
2125 auto& oldinfo = oldmap->get_info(r);
2126 if (info.inc != oldinfo.inc) {
2127 messenger->mark_down(oldinfo.addr);
2128 if (info.state == MDSMap::STATE_REPLAY ||
2129 info.state == MDSMap::STATE_RESOLVE) {
2130 restart.insert(r);
2131 handle_mds_failure(r);
2132 } else {
2133 assert(info.state == MDSMap::STATE_STARTING ||
2134 info.state == MDSMap::STATE_ACTIVE);
2135 // -> stopped (missing) -> starting -> active
2136 restart.insert(r);
2137 mdcache->migrator->handle_mds_failure_or_stop(r);
2138 }
2139 }
2140 } else {
2141 if (info.state == MDSMap::STATE_REPLAY ||
2142 info.state == MDSMap::STATE_RESOLVE) {
2143 // -> starting/creating (missing) -> active (missing) -> replay -> resolve
2144 restart.insert(r);
2145 handle_mds_failure(r);
2146 } else {
2147 assert(info.state == MDSMap::STATE_CREATING ||
2148 info.state == MDSMap::STATE_STARTING ||
2149 info.state == MDSMap::STATE_ACTIVE);
2150 }
2151 }
2152 }
2153 }
2154
7c673cae
FG
2155 // did it change?
2156 if (oldstate != state) {
2157 dout(1) << "handle_mds_map state change "
2158 << ceph_mds_state_name(oldstate) << " --> "
2159 << ceph_mds_state_name(state) << dendl;
2160 beacon.set_want_state(mdsmap, state);
2161
2162 if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
2163 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
2164 assert (state == MDSMap::STATE_REPLAY);
2165 } else {
2166 // did i just recover?
2167 if ((is_active() || is_clientreplay()) &&
2168 (oldstate == MDSMap::STATE_CREATING ||
2169 oldstate == MDSMap::STATE_REJOIN ||
2170 oldstate == MDSMap::STATE_RECONNECT))
2171 recovery_done(oldstate);
2172
2173 if (is_active()) {
2174 active_start();
2175 } else if (is_any_replay()) {
2176 replay_start();
2177 } else if (is_resolve()) {
2178 resolve_start();
2179 } else if (is_reconnect()) {
2180 reconnect_start();
2181 } else if (is_rejoin()) {
2182 rejoin_start();
2183 } else if (is_clientreplay()) {
2184 clientreplay_start();
2185 } else if (is_creating()) {
2186 boot_create();
2187 } else if (is_starting()) {
2188 boot_start();
2189 } else if (is_stopping()) {
2190 assert(oldstate == MDSMap::STATE_ACTIVE);
2191 stopping_start();
2192 }
2193 }
2194 }
2195
2196 // RESOLVE
2197 // is someone else newly resolving?
1adf2230
AA
2198 if (state >= MDSMap::STATE_RESOLVE) {
2199 if ((!oldmap->is_resolving() || !restart.empty()) && mdsmap->is_resolving()) {
7c673cae
FG
2200 set<mds_rank_t> resolve;
2201 mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
2202 dout(10) << " resolve set is " << resolve << dendl;
2203 calc_recovery_set();
2204 mdcache->send_resolves();
2205 }
2206 }
2207
2208 // REJOIN
2209 // is everybody finally rejoining?
1adf2230 2210 if (state >= MDSMap::STATE_REJOIN) {
7c673cae
FG
2211 // did we start?
2212 if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
2213 rejoin_joint_start();
2214
2215 // did we finish?
2216 if (g_conf->mds_dump_cache_after_rejoin &&
2217 oldmap->is_rejoining() && !mdsmap->is_rejoining())
2218 mdcache->dump_cache(); // for DEBUG only
2219
d2e6a577
FG
2220 if (oldstate >= MDSMap::STATE_REJOIN ||
2221 oldstate == MDSMap::STATE_STARTING) {
7c673cae
FG
2222 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
2223 set<mds_rank_t> olddis, dis;
1adf2230
AA
2224 oldmap->get_mds_set_lower_bound(olddis, MDSMap::STATE_REJOIN);
2225 mdsmap->get_mds_set_lower_bound(dis, MDSMap::STATE_REJOIN);
2226 for (const auto& r : dis) {
2227 if (r == whoami)
2228 continue; // not me
2229 if (!olddis.count(r) || restart.count(r)) { // newly so?
2230 mdcache->kick_discovers(r);
2231 mdcache->kick_open_ino_peers(r);
7c673cae 2232 }
1adf2230 2233 }
7c673cae
FG
2234 }
2235 }
2236
7c673cae
FG
2237 if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
2238 dout(1) << "cluster recovered." << dendl;
2239 auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
2240 if (it != waiting_for_active_peer.end()) {
2241 queue_waiters(it->second);
2242 waiting_for_active_peer.erase(it);
2243 }
2244 }
2245
2246 // did someone go active?
1adf2230
AA
2247 if (state >= MDSMap::STATE_CLIENTREPLAY &&
2248 oldstate >= MDSMap::STATE_CLIENTREPLAY) {
7c673cae 2249 set<mds_rank_t> oldactive, active;
1adf2230
AA
2250 oldmap->get_mds_set_lower_bound(oldactive, MDSMap::STATE_CLIENTREPLAY);
2251 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
2252 for (const auto& r : active) {
2253 if (r == whoami)
2254 continue; // not me
2255 if (!oldactive.count(r) || restart.count(r)) // newly so?
2256 handle_mds_recovery(r);
7c673cae
FG
2257 }
2258 }
2259
1adf2230 2260 if (state >= MDSMap::STATE_CLIENTREPLAY) {
7c673cae
FG
2261 // did anyone stop?
2262 set<mds_rank_t> oldstopped, stopped;
2263 oldmap->get_stopped_mds_set(oldstopped);
2264 mdsmap->get_stopped_mds_set(stopped);
1adf2230
AA
2265 for (const auto& r : stopped)
2266 if (oldstopped.count(r) == 0) // newly so?
2267 mdcache->migrator->handle_mds_failure_or_stop(r);
7c673cae
FG
2268 }
2269
2270 {
2271 map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
2272 while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
2273 list<MDSInternalContextBase*> ls;
2274 ls.swap(p->second);
2275 waiting_for_mdsmap.erase(p++);
91327a77 2276 queue_waiters(ls);
7c673cae
FG
2277 }
2278 }
2279
2280 if (is_active()) {
2281 // Before going active, set OSD epoch barrier to latest (so that
2282 // we don't risk handing out caps to clients with old OSD maps that
2283 // might not include barriers from the previous incarnation of this MDS)
2284 set_osd_epoch_barrier(objecter->with_osdmap(
2285 std::mem_fn(&OSDMap::get_epoch)));
2286 }
2287
2288 if (is_active()) {
2289 bool found = false;
2290 MDSMap::mds_info_t info = mdsmap->get_info(whoami);
2291
 2292 for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
 2293 p != mdsmap->get_mds_info().end();
 2294 ++p) {
 2295 if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
 2296 (p->second.standby_for_rank == whoami || (info.name.length() && p->second.standby_for_name == info.name))) {
 2297 found = true;
 2298 break;
 2299 }
 2300 }
 // keep journal data cached only while a standby-replay daemon follows this rank
 2301 if (found)
 2302 mdlog->set_write_iohint(0);
 2303 else
 2304 mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
 2305 }
2306
2307 if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
2308 purge_queue.update_op_limit(*mdsmap);
2309 }
2310}
2311
2312void MDSRank::handle_mds_recovery(mds_rank_t who)
2313{
2314 dout(5) << "handle_mds_recovery mds." << who << dendl;
2315
2316 mdcache->handle_mds_recovery(who);
2317
2318 if (mdsmap->get_tableserver() == whoami) {
2319 snapserver->handle_mds_recovery(who);
2320 }
2321
2322 queue_waiters(waiting_for_active_peer[who]);
2323 waiting_for_active_peer.erase(who);
2324}
2325
2326void MDSRank::handle_mds_failure(mds_rank_t who)
2327{
2328 if (who == whoami) {
2329 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
2330 return;
2331 }
2332 dout(5) << "handle_mds_failure mds." << who << dendl;
2333
2334 mdcache->handle_mds_failure(who);
2335
2336 snapclient->handle_mds_failure(who);
2337}
2338
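// The commands below are dispatched from the MDS admin socket. Example
// invocations (CLI form assumed; substitute the daemon id):
//   ceph daemon mds.<id> session ls
//   ceph daemon mds.<id> flush journal
//   ceph daemon mds.<id> dump_ops_in_flight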
2339bool MDSRankDispatcher::handle_asok_command(
2340 std::string command, cmdmap_t& cmdmap, Formatter *f,
2341 std::ostream& ss)
2342{
2343 if (command == "dump_ops_in_flight" ||
2344 command == "ops") {
2345 if (!op_tracker.dump_ops_in_flight(f)) {
 2346 ss << "op_tracker tracking is not enabled, so no ops are currently tracked (not even stuck ones). \
 2347 Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2348 }
2349 } else if (command == "dump_blocked_ops") {
2350 if (!op_tracker.dump_ops_in_flight(f, true)) {
 2351 ss << "op_tracker tracking is not enabled, so no ops are currently tracked (not even stuck ones). \
 2352 Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2353 }
2354 } else if (command == "dump_historic_ops") {
2355 if (!op_tracker.dump_historic_ops(f)) {
 2356 ss << "op_tracker tracking is not enabled, so no ops are currently tracked (not even stuck ones). \
 2357 Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2358 }
2359 } else if (command == "dump_historic_ops_by_duration") {
2360 if (!op_tracker.dump_historic_ops(f, true)) {
 2361 ss << "op_tracker tracking is not enabled, so no ops are currently tracked (not even stuck ones). \
 2362 Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
2363 }
2364 } else if (command == "osdmap barrier") {
2365 int64_t target_epoch = 0;
2366 bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
2367
2368 if (!got_val) {
2369 ss << "no target epoch given";
2370 return true;
2371 }
2372
2373 mds_lock.Lock();
2374 set_osd_epoch_barrier(target_epoch);
2375 mds_lock.Unlock();
2376
2377 C_SaferCond cond;
2378 bool already_got = objecter->wait_for_map(target_epoch, &cond);
2379 if (!already_got) {
2380 dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
2381 cond.wait();
2382 }
2383 } else if (command == "session ls") {
2384 Mutex::Locker l(mds_lock);
2385
2386 heartbeat_reset();
2387
2388 dump_sessions(SessionFilter(), f);
2389 } else if (command == "session evict") {
2390 std::string client_id;
2391 const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
2392 if(!got_arg) {
2393 ss << "Invalid client_id specified";
2394 return true;
2395 }
2396
2397 mds_lock.Lock();
31f18b77
FG
2398 std::stringstream dss;
2399 bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
2400 g_conf->mds_session_blacklist_on_evict, dss);
2401 if (!evicted) {
7c673cae
FG
2402 dout(15) << dss.str() << dendl;
2403 ss << dss.str();
2404 }
2405 mds_lock.Unlock();
2406 } else if (command == "scrub_path") {
2407 string path;
2408 vector<string> scrubop_vec;
2409 cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
2410 cmd_getval(g_ceph_context, cmdmap, "path", path);
2411 command_scrub_path(f, path, scrubop_vec);
2412 } else if (command == "tag path") {
2413 string path;
2414 cmd_getval(g_ceph_context, cmdmap, "path", path);
2415 string tag;
2416 cmd_getval(g_ceph_context, cmdmap, "tag", tag);
2417 command_tag_path(f, path, tag);
2418 } else if (command == "flush_path") {
2419 string path;
2420 cmd_getval(g_ceph_context, cmdmap, "path", path);
2421 command_flush_path(f, path);
2422 } else if (command == "flush journal") {
2423 command_flush_journal(f);
2424 } else if (command == "get subtrees") {
2425 command_get_subtrees(f);
2426 } else if (command == "export dir") {
2427 string path;
2428 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
2429 ss << "malformed path";
2430 return true;
2431 }
2432 int64_t rank;
2433 if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
2434 ss << "malformed rank";
2435 return true;
2436 }
2437 command_export_dir(f, path, (mds_rank_t)rank);
2438 } else if (command == "dump cache") {
2439 Mutex::Locker l(mds_lock);
2440 string path;
31f18b77 2441 int r;
7c673cae 2442 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
31f18b77 2443 r = mdcache->dump_cache(f);
7c673cae 2444 } else {
31f18b77
FG
2445 r = mdcache->dump_cache(path);
2446 }
2447
2448 if (r != 0) {
2449 ss << "Failed to dump cache: " << cpp_strerror(r);
181888fb
FG
2450 f->reset();
2451 }
2452 } else if (command == "cache status") {
2453 Mutex::Locker l(mds_lock);
f64942e4
AA
2454 mdcache->cache_status(f);
2455 } else if (command == "cache drop") {
2456 int64_t timeout;
2457 if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
2458 timeout = 0;
2459 }
2460
2461 C_SaferCond cond;
2462 command_cache_drop((uint64_t)timeout, f, &cond);
2463 int r = cond.wait();
181888fb 2464 if (r != 0) {
f64942e4 2465 f->flush(ss);
7c673cae
FG
2466 }
2467 } else if (command == "dump tree") {
2468 string root;
2469 int64_t depth;
2470 cmd_getval(g_ceph_context, cmdmap, "root", root);
2471 if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
2472 depth = -1;
2473 {
2474 Mutex::Locker l(mds_lock);
31f18b77
FG
2475 int r = mdcache->dump_cache(root, depth, f);
2476 if (r != 0) {
2477 ss << "Failed to dump tree: " << cpp_strerror(r);
181888fb 2478 f->reset();
31f18b77 2479 }
7c673cae 2480 }
28e407b8
AA
2481 } else if (command == "dump loads") {
2482 Mutex::Locker l(mds_lock);
2483 int r = balancer->dump_loads(f);
2484 if (r != 0) {
2485 ss << "Failed to dump loads: " << cpp_strerror(r);
2486 f->reset();
2487 }
7c673cae
FG
2488 } else if (command == "force_readonly") {
2489 Mutex::Locker l(mds_lock);
2490 mdcache->force_readonly();
2491 } else if (command == "dirfrag split") {
2492 command_dirfrag_split(cmdmap, ss);
2493 } else if (command == "dirfrag merge") {
2494 command_dirfrag_merge(cmdmap, ss);
2495 } else if (command == "dirfrag ls") {
2496 command_dirfrag_ls(cmdmap, ss, f);
2497 } else {
2498 return false;
2499 }
2500
2501 return true;
2502}
2503
f64942e4 2504class C_MDS_Send_Command_Reply : public MDSInternalContext {
7c673cae
FG
2505protected:
2506 MCommand *m;
2507public:
2508 C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
2509 MDSInternalContext(_mds), m(_m) { m->get(); }
f64942e4
AA
2510
2511 void send(int r, boost::string_view ss) {
2512 std::stringstream ds;
2513 send(r, ss, ds);
2514 }
2515
2516 void send(int r, boost::string_view ss, std::stringstream &ds) {
7c673cae 2517 bufferlist bl;
f64942e4
AA
2518 bl.append(ds);
2519 MDSDaemon::send_command_reply(m, mds, r, bl, ss);
7c673cae
FG
2520 m->put();
2521 }
f64942e4
AA
2522
2523 void finish(int r) override {
7c673cae
FG
2524 send(r, "");
2525 }
2526};
2527
2528/**
2529 * This function drops the mds_lock, so don't do anything with
2530 * MDSRank after calling it (we could have gone into shutdown): just
2531 * send your result back to the calling client and finish.
2532 */
31f18b77 2533void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
7c673cae
FG
2534{
2535 C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
2536
2537 if (is_any_replay()) {
2538 reply->send(-EAGAIN, "MDS is replaying log");
2539 delete reply;
2540 return;
2541 }
2542
2543 std::list<Session*> victims;
2544 const auto sessions = sessionmap.get_sessions();
 2545 for (const auto &p : sessions) {
2546 if (!p.first.is_client()) {
2547 continue;
2548 }
2549
2550 Session *s = p.second;
2551
2552 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2553 victims.push_back(s);
2554 }
2555 }
2556
2557 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2558
2559 if (victims.empty()) {
2560 reply->send(0, "");
2561 delete reply;
2562 return;
2563 }
2564
2565 C_GatherBuilder gather(g_ceph_context, reply);
2566 for (const auto s : victims) {
31f18b77
FG
2567 std::stringstream ss;
2568 evict_client(s->info.inst.name.num(), false,
2569 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
7c673cae
FG
2570 }
2571 gather.activate();
2572}
2573
2574void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2575{
2576 // Dump sessions, decorated with recovery/replay status
2577 f->open_array_section("sessions");
2578 const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
2579 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
2580 p != session_map.end();
2581 ++p) {
2582 if (!p->first.is_client()) {
2583 continue;
2584 }
2585
2586 Session *s = p->second;
2587
2588 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2589 continue;
2590 }
2591
2592 f->open_object_section("session");
2593 f->dump_int("id", p->first.num());
2594
2595 f->dump_int("num_leases", s->leases.size());
2596 f->dump_int("num_caps", s->caps.size());
2597
2598 f->dump_string("state", s->get_state_name());
91327a77
AA
2599 if (s->is_open() || s->is_stale()) {
2600 f->dump_unsigned("request_load_avg", s->get_load_avg());
2601 }
2602 f->dump_float("uptime", s->get_session_uptime());
7c673cae
FG
2603 f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
2604 f->dump_unsigned("completed_requests", s->get_num_completed_requests());
2605 f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
2606 f->dump_stream("inst") << s->info.inst;
2607 f->open_object_section("client_metadata");
2608 for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
2609 i != s->info.client_metadata.end(); ++i) {
2610 f->dump_string(i->first.c_str(), i->second);
2611 }
2612 f->close_section(); // client_metadata
2613 f->close_section(); //session
2614 }
2615 f->close_section(); //sessions
2616}
2617
94b18763 2618void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
7c673cae
FG
2619{
2620 bool force = false;
2621 bool recursive = false;
2622 bool repair = false;
2623 for (vector<string>::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) {
2624 if (*i == "force")
2625 force = true;
2626 else if (*i == "recursive")
2627 recursive = true;
2628 else if (*i == "repair")
2629 repair = true;
2630 }
2631 C_SaferCond scond;
2632 {
2633 Mutex::Locker l(mds_lock);
2634 mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
2635 }
2636 scond.wait();
2637 // scrub_dentry() finishers will dump the data for us; we're done!
2638}
2639
2640void MDSRank::command_tag_path(Formatter *f,
94b18763 2641 boost::string_view path, boost::string_view tag)
7c673cae
FG
2642{
2643 C_SaferCond scond;
2644 {
2645 Mutex::Locker l(mds_lock);
2646 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2647 }
2648 scond.wait();
2649}
2650
94b18763 2651void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
7c673cae
FG
2652{
2653 C_SaferCond scond;
2654 {
2655 Mutex::Locker l(mds_lock);
2656 mdcache->flush_dentry(path, &scond);
2657 }
2658 int r = scond.wait();
2659 f->open_object_section("results");
2660 f->dump_int("return_code", r);
2661 f->close_section(); // results
2662}
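// command_flush_path() above (like command_scrub_path and command_tag_path
// above, and command_flush_journal below) turns an asynchronous MDS operation
// into a synchronous admin command: it enqueues the work while holding
// mds_lock and then blocks on a C_SaferCond after releasing it. A standalone
// sketch of that wait primitive using only the standard library follows;
// SaferCondSketch is illustrative, not Ceph's C_SaferCond.

#include <condition_variable>
#include <mutex>

struct SaferCondSketch {
  std::mutex m;
  std::condition_variable cv;
  bool done = false;
  int result = 0;

  // Called by the asynchronous work when it finishes.
  void complete(int r) {
    std::lock_guard<std::mutex> l(m);
    result = r;
    done = true;
    cv.notify_all();
  }

  // Caller blocks (outside any service lock) until complete() has fired.
  int wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return done; });
    return result;
  }
};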
2663
f64942e4
AA
2664// synchronous wrapper around "journal flush" asynchronous context
2665// execution.
2666void MDSRank::command_flush_journal(Formatter *f) {
2667 ceph_assert(f != NULL);
7c673cae 2668
f64942e4 2669 C_SaferCond cond;
7c673cae 2670 std::stringstream ss;
7c673cae 2671
7c673cae 2672 {
f64942e4
AA
2673 Mutex::Locker locker(mds_lock);
2674 C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
2675 flush_journal->send();
7c673cae 2676 }
f64942e4 2677 int r = cond.wait();
7c673cae 2678
f64942e4
AA
2679 f->open_object_section("result");
2680 f->dump_string("message", ss.str());
2681 f->dump_int("return_code", r);
2682 f->close_section();
7c673cae
FG
2683}
2684
7c673cae
FG
2685void MDSRank::command_get_subtrees(Formatter *f)
2686{
2687 assert(f != NULL);
2688 Mutex::Locker l(mds_lock);
2689
2690 std::list<CDir*> subtrees;
2691 mdcache->list_subtrees(subtrees);
2692
2693 f->open_array_section("subtrees");
2694 for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
2695 const CDir *dir = *i;
2696
2697 f->open_object_section("subtree");
2698 {
2699 f->dump_bool("is_auth", dir->is_auth());
2700 f->dump_int("auth_first", dir->get_dir_auth().first);
2701 f->dump_int("auth_second", dir->get_dir_auth().second);
d2e6a577 2702 f->dump_int("export_pin", dir->inode->get_export_pin());
7c673cae
FG
2703 f->open_object_section("dir");
2704 dir->dump(f);
2705 f->close_section();
2706 }
2707 f->close_section();
2708 }
2709 f->close_section();
2710}
2711
2712
2713void MDSRank::command_export_dir(Formatter *f,
94b18763 2714 boost::string_view path,
7c673cae
FG
2715 mds_rank_t target)
2716{
2717 int r = _command_export_dir(path, target);
2718 f->open_object_section("results");
2719 f->dump_int("return_code", r);
2720 f->close_section(); // results
2721}
2722
2723int MDSRank::_command_export_dir(
94b18763 2724 boost::string_view path,
7c673cae
FG
2725 mds_rank_t target)
2726{
2727 Mutex::Locker l(mds_lock);
94b18763 2728 filepath fp(path);
7c673cae
FG
2729
2730 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2731 derr << "bad MDS target " << target << dendl;
2732 return -ENOENT;
2733 }
2734
2735 CInode *in = mdcache->cache_traverse(fp);
2736 if (!in) {
 2737 derr << "Bad path '" << path << "'" << dendl;
2738 return -ENOENT;
2739 }
2740 CDir *dir = in->get_dirfrag(frag_t());
2741 if (!dir || !(dir->is_auth())) {
 2742 derr << "bad export_dir path: root dirfrag frag_t() not in cache, or not auth for it" << dendl;
2743 return -EINVAL;
2744 }
2745
2746 mdcache->migrator->export_dir(dir, target);
2747 return 0;
2748}
2749
2750CDir *MDSRank::_command_dirfrag_get(
2751 const cmdmap_t &cmdmap,
2752 std::ostream &ss)
2753{
2754 std::string path;
2755 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2756 if (!got) {
2757 ss << "missing path argument";
2758 return NULL;
2759 }
2760
2761 std::string frag_str;
2762 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2763 ss << "missing frag argument";
2764 return NULL;
2765 }
2766
2767 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2768 if (!in) {
2769 // TODO really we should load something in if it's not in cache,
2770 // but the infrastructure is harder, and we might still be unable
2771 // to act on it if someone else is auth.
2772 ss << "directory '" << path << "' inode not in cache";
2773 return NULL;
2774 }
2775
2776 frag_t fg;
2777
2778 if (!fg.parse(frag_str.c_str())) {
2779 ss << "frag " << frag_str << " failed to parse";
2780 return NULL;
2781 }
2782
2783 CDir *dir = in->get_dirfrag(fg);
2784 if (!dir) {
2785 ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
2786 "use `dirfrag ls` to see if it should exist)";
2787 return NULL;
2788 }
2789
2790 if (!dir->is_auth()) {
2791 ss << "frag " << dir->dirfrag() << " not auth (auth = "
2792 << dir->authority() << ")";
2793 return NULL;
2794 }
2795
2796 return dir;
2797}
2798
2799bool MDSRank::command_dirfrag_split(
2800 cmdmap_t cmdmap,
2801 std::ostream &ss)
2802{
2803 Mutex::Locker l(mds_lock);
2804 if (!mdsmap->allows_dirfrags()) {
2805 ss << "dirfrags are disallowed by the mds map!";
2806 return false;
2807 }
2808
2809 int64_t by = 0;
2810 if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
2811 ss << "missing bits argument";
2812 return false;
2813 }
2814
2815 if (by <= 0) {
2816 ss << "must split by >0 bits";
2817 return false;
2818 }
2819
2820 CDir *dir = _command_dirfrag_get(cmdmap, ss);
2821 if (!dir) {
2822 return false;
2823 }
2824
2825 mdcache->split_dir(dir, by);
2826
2827 return true;
2828}
2829
2830bool MDSRank::command_dirfrag_merge(
2831 cmdmap_t cmdmap,
2832 std::ostream &ss)
2833{
2834 Mutex::Locker l(mds_lock);
2835 std::string path;
2836 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2837 if (!got) {
2838 ss << "missing path argument";
2839 return false;
2840 }
2841
2842 std::string frag_str;
2843 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2844 ss << "missing frag argument";
2845 return false;
2846 }
2847
2848 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2849 if (!in) {
2850 ss << "directory '" << path << "' inode not in cache";
2851 return false;
2852 }
2853
2854 frag_t fg;
2855 if (!fg.parse(frag_str.c_str())) {
2856 ss << "frag " << frag_str << " failed to parse";
2857 return false;
2858 }
2859
2860 mdcache->merge_dir(in, fg);
2861
2862 return true;
2863}
2864
2865bool MDSRank::command_dirfrag_ls(
2866 cmdmap_t cmdmap,
2867 std::ostream &ss,
2868 Formatter *f)
2869{
2870 Mutex::Locker l(mds_lock);
2871 std::string path;
2872 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2873 if (!got) {
2874 ss << "missing path argument";
2875 return false;
2876 }
2877
2878 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2879 if (!in) {
2880 ss << "directory inode not in cache";
2881 return false;
2882 }
2883
2884 f->open_array_section("frags");
2885 std::list<frag_t> frags;
2886 // NB using get_leaves_under instead of get_dirfrags to give
2887 // you the list of what dirfrags may exist, not which are in cache
2888 in->dirfragtree.get_leaves_under(frag_t(), frags);
2889 for (std::list<frag_t>::iterator i = frags.begin();
2890 i != frags.end(); ++i) {
2891 f->open_object_section("frag");
2892 f->dump_int("value", i->value());
2893 f->dump_int("bits", i->bits());
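      // String form is "<value in hex>/<bits in decimal>"; the root frag
      // (an unsplit directory) prints as "0/0".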
2894 std::ostringstream frag_str;
2895 frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
2896 f->dump_string("str", frag_str.str());
2897 f->close_section();
2898 }
2899 f->close_section();
2900
2901 return true;
2902}
2903
2904void MDSRank::dump_status(Formatter *f) const
2905{
2906 if (state == MDSMap::STATE_REPLAY ||
2907 state == MDSMap::STATE_STANDBY_REPLAY) {
2908 mdlog->dump_replay_status(f);
2909 } else if (state == MDSMap::STATE_RESOLVE) {
2910 mdcache->dump_resolve_status(f);
2911 } else if (state == MDSMap::STATE_RECONNECT) {
2912 server->dump_reconnect_status(f);
2913 } else if (state == MDSMap::STATE_REJOIN) {
2914 mdcache->dump_rejoin_status(f);
2915 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
2916 dump_clientreplay_status(f);
2917 }
94b18763 2918 f->dump_float("rank_uptime", get_uptime().count());
7c673cae
FG
2919}
2920
2921void MDSRank::dump_clientreplay_status(Formatter *f) const
2922{
2923 f->open_object_section("clientreplay_status");
2924 f->dump_unsigned("clientreplay_queue", replay_queue.size());
2925 f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
2926 f->close_section();
2927}
2928
2929void MDSRankDispatcher::update_log_config()
2930{
2931 map<string,string> log_to_monitors;
2932 map<string,string> log_to_syslog;
2933 map<string,string> log_channel;
2934 map<string,string> log_prio;
2935 map<string,string> log_to_graylog;
2936 map<string,string> log_to_graylog_host;
2937 map<string,string> log_to_graylog_port;
2938 uuid_d fsid;
2939 string host;
2940
2941 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
2942 log_channel, log_prio, log_to_graylog,
2943 log_to_graylog_host, log_to_graylog_port,
2944 fsid, host) == 0)
2945 clog->update_config(log_to_monitors, log_to_syslog,
2946 log_channel, log_prio, log_to_graylog,
2947 log_to_graylog_host, log_to_graylog_port,
2948 fsid, host);
2949 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
2950}
2951
2952void MDSRank::create_logger()
2953{
2954 dout(10) << "create_logger" << dendl;
2955 {
2956 PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
2957
91327a77
AA
2958 // super useful (high prio) perf stats
2959 mds_plb.add_u64_counter(l_mds_request, "request", "Requests", "req",
2960 PerfCountersBuilder::PRIO_CRITICAL);
2961 mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
2962 PerfCountersBuilder::PRIO_CRITICAL);
2963 mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
2964 PerfCountersBuilder::PRIO_CRITICAL);
2965 mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request", "fwd",
2966 PerfCountersBuilder::PRIO_INTERESTING);
2967 mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
2968 PerfCountersBuilder::PRIO_INTERESTING);
2969 mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes",
2970 "exi", PerfCountersBuilder::PRIO_INTERESTING);
2971 mds_plb.add_u64_counter(l_mds_imported_inodes, "imported_inodes", "Imported inodes",
2972 "imi", PerfCountersBuilder::PRIO_INTERESTING);
2973
2974 // useful dir/inode/subtree stats
2975 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
2976 mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
2977 mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
2978 mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
2979 mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
7c673cae 2980 mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
7c673cae
FG
2981 mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
2982 mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
91327a77
AA
2983 mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
2984 "Inodes with capabilities");
7c673cae 2985 mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
91327a77 2986 mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
7c673cae 2987
91327a77
AA
2988 // low prio stats
2989 mds_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
2990 mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
2991 mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
2992 mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
2993 mds_plb.add_u64(
2994 l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
7c673cae
FG
2995 mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
2996 mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
2997 mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
91327a77 2998 "Traverse forwards");
7c673cae 2999 mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
91327a77 3000 "Traverse directory discovers");
7c673cae 3001 mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
91327a77 3002 "Traverse incomplete directory content fetchings");
7c673cae 3003 mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
91327a77 3004 "Traverse remote dentries");
7c673cae 3005 mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
91327a77 3006 "Traverse locks");
7c673cae 3007 mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
7c673cae 3008 mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
7c673cae 3009 mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
91327a77 3010
7c673cae
FG
3011 logger = mds_plb.create_perf_counters();
3012 g_ceph_context->get_perfcounters_collection()->add(logger);
3013 }
3014
3015 {
3016 PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
b32b8144
FG
3017 mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
3018 PerfCountersBuilder::PRIO_INTERESTING);
91327a77
AA
3019 mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
3020 PerfCountersBuilder::PRIO_INTERESTING);
3021
3022 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
3023 mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
3024 mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
3025 mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
3026 mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
3027 mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
7c673cae
FG
3028 mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
3029 mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
3030 mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
3031 mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
3032 mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
7c673cae
FG
3033 mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
3034 mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");
91327a77
AA
3035
3036 mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
3037 mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
3038
7c673cae
FG
3039 mlogger = mdm_plb.create_perf_counters();
3040 g_ceph_context->get_perfcounters_collection()->add(mlogger);
3041 }
3042
3043 mdlog->create_logger();
3044 server->create_logger();
3045 purge_queue.create_logger();
3046 sessionmap.register_perfcounters();
3047 mdcache->register_perfcounters();
3048}
3049
3050void MDSRank::check_ops_in_flight()
3051{
3052 vector<string> warnings;
3053 int slow = 0;
3054 if (op_tracker.check_ops_in_flight(warnings, &slow)) {
3055 for (vector<string>::iterator i = warnings.begin();
3056 i != warnings.end();
3057 ++i) {
3058 clog->warn() << *i;
3059 }
3060 }
3061
3062 // set mds slow request count
3063 mds_slow_req_count = slow;
3064 return;
3065}
3066
3067void MDSRankDispatcher::handle_osd_map()
3068{
3069 if (is_active() && snapserver) {
3070 snapserver->check_osd_map(true);
3071 }
3072
3073 server->handle_osd_map();
3074
3075 purge_queue.update_op_limit(*mdsmap);
3076
31f18b77
FG
3077 std::set<entity_addr_t> newly_blacklisted;
3078 objecter->consume_blacklist_events(&newly_blacklisted);
3079 auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
3080 dout(4) << "handle_osd_map epoch " << epoch << ", "
3081 << newly_blacklisted.size() << " new blacklist entries" << dendl;
3082 auto victims = server->apply_blacklist(newly_blacklisted);
3083 if (victims) {
3084 set_osd_epoch_barrier(epoch);
3085 }
3086
3087
7c673cae
FG
3088 // By default the objecter only requests OSDMap updates on use,
3089 // we would like to always receive the latest maps in order to
3090 // apply policy based on the FULL flag.
3091 objecter->maybe_request_map();
3092}
3093
31f18b77
FG
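// evict_client(): 'wait' and 'on_killed' are mutually exclusive. With
// 'blacklist' set, the client is first added to the OSD blacklist via a mon
// command and the OSD epoch barrier is raised before the session is killed,
// so an evicted client cannot keep issuing I/O with stale capabilities.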
3094bool MDSRank::evict_client(int64_t session_id,
3095 bool wait, bool blacklist, std::stringstream& err_ss,
3096 Context *on_killed)
7c673cae 3097{
31f18b77
FG
3098 assert(mds_lock.is_locked_by_me());
3099
3100 // Mutually exclusive args
3101 assert(!(wait && on_killed != nullptr));
3102
7c673cae
FG
3103 if (is_any_replay()) {
3104 err_ss << "MDS is replaying log";
3105 return false;
3106 }
3107
31f18b77
FG
3108 Session *session = sessionmap.get_session(
3109 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
7c673cae
FG
3110 if (!session) {
3111 err_ss << "session " << session_id << " not in sessionmap!";
3112 return false;
3113 }
7c673cae 3114
a8e16298
TL
3115 auto& addr = session->info.inst.addr;
3116 {
3117 std::stringstream ss;
3118 ss << "Evicting " << (blacklist ? "(and blacklisting) " : "")
3119 << "client session " << session_id << " (" << addr << ")";
3120 dout(1) << ss.str() << dendl;
3121 clog->info() << ss.str();
3122 }
3123
31f18b77
FG
3124 dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
3125 stringstream ss;
3126 ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
3127 ss << "\"addr\":\"";
a8e16298 3128 ss << addr;
31f18b77
FG
3129 ss << "\"}";
3130 std::string tmp = ss.str();
3131 std::vector<std::string> cmd = {tmp};
3132
91327a77 3133 auto kill_client_session = [this, session_id, wait, on_killed](){
31f18b77
FG
3134 assert(mds_lock.is_locked_by_me());
3135 Session *session = sessionmap.get_session(
3136 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
3137 if (session) {
91327a77 3138 if (on_killed || !wait) {
31f18b77
FG
3139 server->kill_session(session, on_killed);
3140 } else {
3141 C_SaferCond on_safe;
3142 server->kill_session(session, &on_safe);
3143
3144 mds_lock.Unlock();
3145 on_safe.wait();
3146 mds_lock.Lock();
3147 }
3148 } else {
3149 dout(1) << "session " << session_id << " was removed while we waited "
3150 "for blacklist" << dendl;
3151
3152 // Even though it wasn't us that removed it, kick our completion
3153 // as the session has been removed.
3154 if (on_killed) {
3155 on_killed->complete(0);
3156 }
3157 }
3158 };
3159
91327a77 3160 auto apply_blacklist = [this, cmd](std::function<void ()> fn){
31f18b77
FG
3161 assert(mds_lock.is_locked_by_me());
3162
91327a77 3163 Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
31f18b77
FG
3164 objecter->wait_for_latest_osdmap(
3165 new C_OnFinisher(
91327a77 3166 new FunctionContext([this, fn](int r) {
31f18b77
FG
3167 Mutex::Locker l(mds_lock);
3168 auto epoch = objecter->with_osdmap([](const OSDMap &o){
3169 return o.get_epoch();
3170 });
3171
3172 set_osd_epoch_barrier(epoch);
3173
3174 fn();
3175 }), finisher)
3176 );
3177 });
3178
3179 dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
3180 monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
3181 };
3182
31f18b77
FG
3183 if (wait) {
3184 if (blacklist) {
91327a77
AA
3185 C_SaferCond inline_ctx;
3186 apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});
3187 mds_lock.Unlock();
3188 inline_ctx.wait();
3189 mds_lock.Lock();
31f18b77
FG
3190 }
3191
3192 // We dropped mds_lock, so check that session still exists
3193 session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
91327a77 3194 session_id));
31f18b77
FG
3195 if (!session) {
3196 dout(1) << "session " << session_id << " was removed while we waited "
3197 "for blacklist" << dendl;
3198 return true;
3199 }
91327a77 3200 kill_client_session();
7c673cae 3201 } else {
31f18b77 3202 if (blacklist) {
91327a77 3203 apply_blacklist(kill_client_session);
31f18b77 3204 } else {
91327a77 3205 kill_client_session();
31f18b77 3206 }
7c673cae 3207 }
31f18b77 3208
7c673cae
FG
3209 return true;
3210}
3211
3212void MDSRank::bcast_mds_map()
3213{
3214 dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
3215
3216 // share the map with mounted clients
3217 set<Session*> clients;
3218 sessionmap.get_client_session_set(clients);
3219 for (set<Session*>::const_iterator p = clients.begin();
3220 p != clients.end();
3221 ++p)
3222 (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
3223 last_client_mdsmap_bcast = mdsmap->get_epoch();
3224}
3225
3226MDSRankDispatcher::MDSRankDispatcher(
3227 mds_rank_t whoami_,
3228 Mutex &mds_lock_,
3229 LogChannelRef &clog_,
3230 SafeTimer &timer_,
3231 Beacon &beacon_,
3232 MDSMap *& mdsmap_,
3233 Messenger *msgr,
3234 MonClient *monc_,
3235 Context *respawn_hook_,
3236 Context *suicide_hook_)
3237 : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
3238 msgr, monc_, respawn_hook_, suicide_hook_)
3239{}
3240
3241bool MDSRankDispatcher::handle_command(
3242 const cmdmap_t &cmdmap,
3243 MCommand *m,
3244 int *r,
3245 std::stringstream *ds,
3246 std::stringstream *ss,
f64942e4 3247 Context **run_later,
7c673cae
FG
3248 bool *need_reply)
3249{
3250 assert(r != nullptr);
3251 assert(ds != nullptr);
3252 assert(ss != nullptr);
3253
3254 *need_reply = true;
3255
3256 std::string prefix;
3257 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
3258
31f18b77 3259 if (prefix == "session ls" || prefix == "client ls") {
7c673cae
FG
3260 std::vector<std::string> filter_args;
3261 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
3262
3263 SessionFilter filter;
3264 *r = filter.parse(filter_args, ss);
3265 if (*r != 0) {
3266 return true;
3267 }
3268
f64942e4
AA
3269 JSONFormatter f(true);
3270 dump_sessions(filter, &f);
3271 f.flush(*ds);
7c673cae 3272 return true;
31f18b77 3273 } else if (prefix == "session evict" || prefix == "client evict") {
7c673cae
FG
3274 std::vector<std::string> filter_args;
3275 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
3276
3277 SessionFilter filter;
3278 *r = filter.parse(filter_args, ss);
3279 if (*r != 0) {
3280 return true;
3281 }
3282
31f18b77 3283 evict_clients(filter, m);
7c673cae
FG
3284
3285 *need_reply = false;
3286 return true;
3287 } else if (prefix == "damage ls") {
f64942e4
AA
3288 JSONFormatter f(true);
3289 damage_table.dump(&f);
3290 f.flush(*ds);
7c673cae
FG
3291 return true;
3292 } else if (prefix == "damage rm") {
3293 damage_entry_id_t id = 0;
3294 bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
3295 if (!got) {
3296 *r = -EINVAL;
3297 return true;
3298 }
3299
3300 damage_table.erase(id);
f64942e4
AA
3301 return true;
3302 } else if (prefix == "cache drop") {
3303 int64_t timeout;
3304 if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
3305 timeout = 0;
3306 }
3307
3308 JSONFormatter *f = new JSONFormatter(true);
3309 C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
3310 Context *on_finish = new FunctionContext([this, f, reply](int r) {
3311 cache_drop_send_reply(f, reply, r);
3312 delete f;
3313 delete reply;
3314 });
3315
3316 *need_reply = false;
3317 *run_later = new C_OnFinisher(
3318 new FunctionContext([this, timeout, f, on_finish](int _) {
3319 command_cache_drop((uint64_t)timeout, f, on_finish);
3320 }), finisher);
3321
7c673cae
FG
3322 return true;
3323 } else {
3324 return false;
3325 }
3326}
3327
f64942e4
AA
3328void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
3329 dout(20) << __func__ << ": r=" << r << dendl;
3330
3331 std::stringstream ds;
3332 std::stringstream ss;
3333 if (r != 0) {
3334 f->flush(ss);
3335 } else {
3336 f->flush(ds);
3337 }
3338
3339 reply->send(r, ss.str(), ds);
3340}
3341
3342void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
3343 dout(20) << __func__ << dendl;
3344
3345 Mutex::Locker locker(mds_lock);
3346 C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
3347 timeout, f, on_finish);
3348 request->send();
3349}
3350
7c673cae
FG
3351epoch_t MDSRank::get_osd_epoch() const
3352{
3353 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
3354}
3355