1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "common/debug.h"
18 #include "common/errno.h"
19
20 #include "messages/MClientRequestForward.h"
21 #include "messages/MMDSLoadTargets.h"
22 #include "messages/MMDSMap.h"
23 #include "messages/MMDSTableRequest.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26
27 #include "MDSDaemon.h"
28 #include "MDSMap.h"
29 #include "SnapClient.h"
30 #include "SnapServer.h"
31 #include "MDBalancer.h"
32 #include "Locker.h"
33 #include "Server.h"
34 #include "InoTable.h"
35 #include "mon/MonClient.h"
36 #include "common/HeartbeatMap.h"
37 #include "ScrubStack.h"
38
39
40 #include "MDSRank.h"
41
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
44 #undef dout_prefix
45 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
46
47 MDSRank::MDSRank(
48 mds_rank_t whoami_,
49 Mutex &mds_lock_,
50 LogChannelRef &clog_,
51 SafeTimer &timer_,
52 Beacon &beacon_,
53 MDSMap *& mdsmap_,
54 Messenger *msgr,
55 MonClient *monc_,
56 Context *respawn_hook_,
57 Context *suicide_hook_)
58 :
59 whoami(whoami_), incarnation(0),
60 mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
61 mdsmap(mdsmap_),
62 objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
63 server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
64 balancer(NULL), scrubstack(NULL),
65 damage_table(whoami_),
66 inotable(NULL), snapserver(NULL), snapclient(NULL),
67 sessionmap(this), logger(NULL), mlogger(NULL),
68 op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
69 g_conf->osd_num_op_tracker_shard),
70 last_state(MDSMap::STATE_BOOT),
71 state(MDSMap::STATE_BOOT),
72 cluster_degraded(false), stopping(false),
73 purge_queue(g_ceph_context, whoami_,
74 mdsmap_->get_metadata_pool(), objecter,
75 new FunctionContext(
76 [this](int r){
77 // The Purge Queue operates inside mds_lock when we're calling into
78 // it, and outside of it when in the background, so we must handle both cases.
79 if (mds_lock.is_locked_by_me()) {
80 damaged();
81 } else {
82 damaged_unlocked();
83 }
84 }
85 )
86 ),
87 progress_thread(this), dispatch_depth(0),
88 hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
89 mds_slow_req_count(0),
90 last_client_mdsmap_bcast(0),
91 messenger(msgr), monc(monc_),
92 respawn_hook(respawn_hook_),
93 suicide_hook(suicide_hook_),
94 standby_replaying(false),
95 starttime(mono_clock::now())
96 {
97 hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
98
99 purge_queue.update_op_limit(*mdsmap);
100
101 objecter->unset_honor_osdmap_full();
102
103 finisher = new Finisher(cct);
104
105 mdcache = new MDCache(this, purge_queue);
106 mdlog = new MDLog(this);
107 balancer = new MDBalancer(this, messenger, monc);
108
109 scrubstack = new ScrubStack(mdcache, finisher);
110
111 inotable = new InoTable(this);
112 snapserver = new SnapServer(this, monc);
113 snapclient = new SnapClient(this);
114
115 server = new Server(this);
116 locker = new Locker(this, mdcache);
117
118 op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
119 cct->_conf->mds_op_log_threshold);
120 op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
121 cct->_conf->mds_op_history_duration);
122 }
123
124 MDSRank::~MDSRank()
125 {
126 if (hb) {
127 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
128 }
129
130 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
131 if (mdcache) { delete mdcache; mdcache = NULL; }
132 if (mdlog) { delete mdlog; mdlog = NULL; }
133 if (balancer) { delete balancer; balancer = NULL; }
134 if (inotable) { delete inotable; inotable = NULL; }
135 if (snapserver) { delete snapserver; snapserver = NULL; }
136 if (snapclient) { delete snapclient; snapclient = NULL; }
137 if (mdsmap) { delete mdsmap; mdsmap = 0; }
138
139 if (server) { delete server; server = 0; }
140 if (locker) { delete locker; locker = 0; }
141
142 if (logger) {
143 g_ceph_context->get_perfcounters_collection()->remove(logger);
144 delete logger;
145 logger = 0;
146 }
147 if (mlogger) {
148 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
149 delete mlogger;
150 mlogger = 0;
151 }
152
153 delete finisher;
154 finisher = NULL;
155
156 delete suicide_hook;
157 suicide_hook = NULL;
158
159 delete respawn_hook;
160 respawn_hook = NULL;
161
162 delete objecter;
163 objecter = nullptr;
164 }
165
166 void MDSRankDispatcher::init()
167 {
168 objecter->init();
169 messenger->add_dispatcher_head(objecter);
170
171 objecter->start();
172
173 update_log_config();
174 create_logger();
175
176 // Expose the OSDMap (already populated during MDS::init) to anyone
177 // who is interested in it.
178 handle_osd_map();
179
180 progress_thread.create("mds_rank_progr");
181
182 purge_queue.init();
183
184 finisher->start();
185 }
186
187 void MDSRank::update_targets(utime_t now)
188 {
189 // get MonMap's idea of my export_targets
190 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
191
192 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
193
194 bool send = false;
195 set<mds_rank_t> new_map_targets;
196
197 auto it = export_targets.begin();
198 while (it != export_targets.end()) {
199 mds_rank_t rank = it->first;
200 double val = it->second.get(now);
201 dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
202
203 if (val <= 0.01) {
204 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
205 export_targets.erase(it++);
206 send = true;
207 continue;
208 }
209 if (!map_targets.count(rank)) {
210 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
211 send = true;
212 }
213 new_map_targets.insert(rank);
214 it++;
215 }
216 if (new_map_targets.size() < map_targets.size()) {
217 dout(15) << "export target map holds stale targets, sending update" << dendl;
218 send = true;
219 }
220
221 if (send) {
222 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
223 MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
224 monc->send_mon_message(m);
225 }
226 }
227
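// Record export activity towards `rank` in a decaying counter; a negative
// `amount` is replaced with a default that keeps the target marked active.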
228 void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
229 {
230 double rate = g_conf->mds_bal_target_decay;
231 if (amount < 0.0) {
232 amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
233 }
234 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
235 if (em.second) {
236 dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
237 } else {
238 dout(15) << "hit export target " << amount << " @ " << now << dendl;
239 }
240 em.first->second.hit(now, amount);
241 }
242
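// Periodic housekeeping: reset the heartbeat, flush and trim the journal and
// cache, drive the balancer, session and lock timers, progress a clean
// shutdown if one is underway, and report health via the beacon.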
243 void MDSRankDispatcher::tick()
244 {
245 heartbeat_reset();
246
247 if (beacon.is_laggy()) {
248 dout(5) << "tick bailing out since we seem laggy" << dendl;
249 return;
250 }
251
252 check_ops_in_flight();
253
254 // Wake up the thread in case we used to be laggy and have waiting_for_nolaggy
255 // messages to progress.
256 progress_thread.signal();
257
258 // make sure mds log flushes, trims periodically
259 mdlog->flush();
260
261 if (is_active() || is_stopping()) {
262 mdcache->trim();
263 mdcache->trim_client_leases();
264 mdcache->check_memory_usage();
265 mdlog->trim(); // NOT during recovery!
266 }
267
268 // log
269 mds_load_t load = balancer->get_load(ceph_clock_now());
270
271 if (logger) {
272 logger->set(l_mds_load_cent, 100 * load.mds_load());
273 logger->set(l_mds_dispatch_queue_len, messenger->get_dispatch_queue_len());
274 logger->set(l_mds_subtrees, mdcache->num_subtrees());
275
276 mdcache->log_stat();
277 }
278
279 // ...
280 if (is_clientreplay() || is_active() || is_stopping()) {
281 server->find_idle_sessions();
282 locker->tick();
283 }
284
285 if (is_reconnect())
286 server->reconnect_tick();
287
288 if (is_active()) {
289 balancer->tick();
290 mdcache->find_stale_fragment_freeze();
291 mdcache->migrator->find_stale_export_freeze();
292 if (snapserver)
293 snapserver->check_osd_map(false);
294 }
295
296 if (is_active() || is_stopping()) {
297 update_targets(ceph_clock_now());
298 }
299
300 // shut down?
301 if (is_stopping()) {
302 mdlog->trim();
303 if (mdcache->shutdown_pass()) {
304 uint64_t pq_progress = 0;
305 uint64_t pq_total = 0;
306 size_t pq_in_flight = 0;
307 if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
308 dout(7) << "shutdown_pass=true, but still waiting for purge queue"
309 << dendl;
310 // This takes unbounded time, so we must indicate progress
311 // to the administrator: we do it in a slightly imperfect way
312 // by sending periodic (tick frequency) clog messages while
313 // in this state.
314 clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
315 << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
316 << " files purging" << ")";
317 } else {
318 dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
319 "down:stopped" << dendl;
320 stopping_done();
321 }
322 }
323 else {
324 dout(7) << "shutdown_pass=false" << dendl;
325 }
326 }
327
328 // Expose ourselves to Beacon to update health indicators
329 beacon.notify_health(this);
330 }
331
332 void MDSRankDispatcher::shutdown()
333 {
334 // It should never be possible for shutdown to get called twice, because
335 // anyone picking up mds_lock checks if stopping is true and drops
336 // out if it is.
337 assert(stopping == false);
338 stopping = true;
339
340 dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
341
342 timer.shutdown();
343
344 // MDLog has to shut down before the finisher, because some of its
345 // threads block on IOs that require finisher to complete.
346 mdlog->shutdown();
347
348 // shut down cache
349 mdcache->shutdown();
350
351 purge_queue.shutdown();
352
353 mds_lock.Unlock();
354 finisher->stop(); // no flushing
355 mds_lock.Lock();
356
357 if (objecter->initialized)
358 objecter->shutdown();
359
360 monc->shutdown();
361
362 op_tracker.on_shutdown();
363
364 progress_thread.shutdown();
365
366 // release mds_lock for finisher/messenger threads (e.g.
367 // MDSDaemon::ms_handle_reset called from Messenger).
368 mds_lock.Unlock();
369
370 // shut down messenger
371 messenger->shutdown();
372
373 mds_lock.Lock();
374
375 // Workaround unclean shutdown: HeartbeatMap will assert if
376 // worker is not removed (as we do in ~MDS), but ~MDS is not
377 // always called after suicide.
378 if (hb) {
379 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
380 hb = NULL;
381 }
382 }
383
384 /**
385 * Helper for simple callbacks that call a void fn with no args.
386 */
387 class C_MDS_VoidFn : public MDSInternalContext
388 {
389 typedef void (MDSRank::*fn_ptr)();
390 protected:
391 fn_ptr fn;
392 public:
393 C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
394 : MDSInternalContext(mds_), fn(fn_)
395 {
396 assert(mds_);
397 assert(fn_);
398 }
399
400 void finish(int r) override
401 {
402 (mds->*fn)();
403 }
404 };
405
406 int64_t MDSRank::get_metadata_pool()
407 {
408 return mdsmap->get_metadata_pool();
409 }
410
411 MDSTableClient *MDSRank::get_table_client(int t)
412 {
413 switch (t) {
414 case TABLE_ANCHOR: return NULL;
415 case TABLE_SNAP: return snapclient;
416 default: ceph_abort();
417 }
418 }
419
420 MDSTableServer *MDSRank::get_table_server(int t)
421 {
422 switch (t) {
423 case TABLE_ANCHOR: return NULL;
424 case TABLE_SNAP: return snapserver;
425 default: ceph_abort();
426 }
427 }
428
429 void MDSRank::suicide()
430 {
431 if (suicide_hook) {
432 suicide_hook->complete(0);
433 suicide_hook = NULL;
434 }
435 }
436
437 void MDSRank::respawn()
438 {
439 if (respawn_hook) {
440 respawn_hook->complete(0);
441 respawn_hook = NULL;
442 }
443 }
444
445 void MDSRank::damaged()
446 {
447 assert(whoami != MDS_RANK_NONE);
448 assert(mds_lock.is_locked_by_me());
449
450 beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
451 monc->flush_log(); // Flush any clog error from before we were called
452 beacon.notify_health(this); // Include latest status in our swan song
453 beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);
454
455 // It's okay if we timed out and the mon didn't get our beacon, because
456 // another daemon (or ourselves after respawn) will eventually take the
457 // rank and report DAMAGED again when it hits the same problem we did.
458
459 respawn(); // Respawn into standby in case mon has other work for us
460 }
461
462 void MDSRank::damaged_unlocked()
463 {
464 Mutex::Locker l(mds_lock);
465 damaged();
466 }
467
468 void MDSRank::handle_write_error(int err)
469 {
470 if (err == -EBLACKLISTED) {
471 derr << "we have been blacklisted (fenced), respawning..." << dendl;
472 respawn();
473 return;
474 }
475
476 if (g_conf->mds_action_on_write_error >= 2) {
477 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
478 respawn();
479 } else if (g_conf->mds_action_on_write_error == 1) {
480 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
481 mdcache->force_readonly();
482 } else {
483 // ignore;
484 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
485 }
486 }
487
488 void *MDSRank::ProgressThread::entry()
489 {
490 Mutex::Locker l(mds->mds_lock);
491 while (true) {
492 while (!mds->stopping &&
493 mds->finished_queue.empty() &&
494 (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
495 cond.Wait(mds->mds_lock);
496 }
497
498 if (mds->stopping) {
499 break;
500 }
501
502 mds->_advance_queues();
503 }
504
505 return NULL;
506 }
507
508
509 void MDSRank::ProgressThread::shutdown()
510 {
511 assert(mds->mds_lock.is_locked_by_me());
512 assert(mds->stopping);
513
514 if (am_self()) {
515 // Stopping is set, we will fall out of our main loop naturally
516 } else {
517 // Kick the thread to notice mds->stopping, and join it
518 cond.Signal();
519 mds->mds_lock.Unlock();
520 if (is_started())
521 join();
522 mds->mds_lock.Lock();
523 }
524 }
525
526 bool MDSRankDispatcher::ms_dispatch(Message *m)
527 {
528 bool ret;
529 inc_dispatch_depth();
530 ret = _dispatch(m, true);
531 dec_dispatch_depth();
532 return ret;
533 }
534
535 /* If this function returns true, it recognizes the message and has taken the
536 * reference. If it returns false, it has done neither. */
537 bool MDSRank::_dispatch(Message *m, bool new_msg)
538 {
539 if (is_stale_message(m)) {
540 m->put();
541 return true;
542 }
543
544 if (beacon.is_laggy()) {
545 dout(10) << " laggy, deferring " << *m << dendl;
546 waiting_for_nolaggy.push_back(m);
547 } else if (new_msg && !waiting_for_nolaggy.empty()) {
548 dout(10) << " there are deferred messages, deferring " << *m << dendl;
549 waiting_for_nolaggy.push_back(m);
550 } else {
551 if (!handle_deferrable_message(m)) {
552 dout(0) << "unrecognized message " << *m << dendl;
553 return false;
554 }
555
556 heartbeat_reset();
557 }
558
559 if (dispatch_depth > 1)
560 return true;
561
562 // finish any triggered contexts
563 _advance_queues();
564
565 if (beacon.is_laggy()) {
566 // We've gone laggy during dispatch, don't do any
567 // more housekeeping
568 return true;
569 }
570
571 // done with all client replayed requests?
572 if (is_clientreplay() &&
573 mdcache->is_open() &&
574 replay_queue.empty() &&
575 beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
576 int num_requests = mdcache->get_num_client_requests();
577 dout(10) << " still have " << num_requests << " active replay requests" << dendl;
578 if (num_requests == 0)
579 clientreplay_done();
580 }
581
582 // hack: thrash exports
583 static utime_t start;
584 utime_t now = ceph_clock_now();
585 if (start == utime_t())
586 start = now;
587 /*double el = now - start;
588 if (el > 30.0 &&
589 el < 60.0)*/
590 for (int i=0; i<g_conf->mds_thrash_exports; i++) {
591 set<mds_rank_t> s;
592 if (!is_active()) break;
593 mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
594 if (s.size() < 2 || CInode::count() < 10)
595 break; // need peers for this to work.
596 if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
597 mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
598 break;
599
600 dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;
601
602 // pick a random dir inode
603 CInode *in = mdcache->hack_pick_random_inode();
604
605 list<CDir*> ls;
606 in->get_dirfrags(ls);
607 if (!ls.empty()) { // must be an open dir.
608 list<CDir*>::iterator p = ls.begin();
609 int n = rand() % ls.size();
610 while (n--)
611 ++p;
612 CDir *dir = *p;
613 if (!dir->get_parent_dir()) continue; // must be linked.
614 if (!dir->is_auth()) continue; // must be auth.
615
616 mds_rank_t dest;
617 do {
618 int k = rand() % s.size();
619 set<mds_rank_t>::iterator p = s.begin();
620 while (k--) ++p;
621 dest = *p;
622 } while (dest == whoami);
623 mdcache->migrator->export_dir_nicely(dir,dest);
624 }
625 }
626 // hack: thrash fragments
627 for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
628 if (!is_active()) break;
629 if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
630 dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;
631
632 // pick a random dir inode
633 CInode *in = mdcache->hack_pick_random_inode();
634
635 list<CDir*> ls;
636 in->get_dirfrags(ls);
637 if (ls.empty()) continue; // must be an open dir.
638 CDir *dir = ls.front();
639 if (!dir->get_parent_dir()) continue; // must be linked.
640 if (!dir->is_auth()) continue; // must be auth.
641 frag_t fg = dir->get_frag();
642 if (mdsmap->allows_dirfrags()) {
643 if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
644 mdcache->split_dir(dir, 1);
645 } else {
646 balancer->queue_merge(dir);
647 }
648 }
649 }
650
651 // hack: force hash root?
652 /*
653 if (false &&
654 mdcache->get_root() &&
655 mdcache->get_root()->dir &&
656 !(mdcache->get_root()->dir->is_hashed() ||
657 mdcache->get_root()->dir->is_hashing())) {
658 dout(0) << "hashing root" << dendl;
659 mdcache->migrator->hash_dir(mdcache->get_root()->dir);
660 }
661 */
662
663 update_mlogger();
664 return true;
665 }
666
667 void MDSRank::update_mlogger()
668 {
669 if (mlogger) {
670 mlogger->set(l_mdm_ino, CInode::count());
671 mlogger->set(l_mdm_dir, CDir::count());
672 mlogger->set(l_mdm_dn, CDentry::count());
673 mlogger->set(l_mdm_cap, Capability::count());
674 mlogger->set(l_mdm_inoa, CInode::increments());
675 mlogger->set(l_mdm_inos, CInode::decrements());
676 mlogger->set(l_mdm_dira, CDir::increments());
677 mlogger->set(l_mdm_dirs, CDir::decrements());
678 mlogger->set(l_mdm_dna, CDentry::increments());
679 mlogger->set(l_mdm_dns, CDentry::decrements());
680 mlogger->set(l_mdm_capa, Capability::increments());
681 mlogger->set(l_mdm_caps, Capability::decrements());
682 mlogger->set(l_mdm_buf, buffer::get_total_alloc());
683 }
684 }
685
686 /*
687 * lower-priority messages that we defer if we seem laggy
688 */
689 bool MDSRank::handle_deferrable_message(Message *m)
690 {
691 int port = m->get_type() & 0xff00;
692
693 switch (port) {
694 case MDS_PORT_CACHE:
695 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
696 mdcache->dispatch(m);
697 break;
698
699 case MDS_PORT_MIGRATOR:
700 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
701 mdcache->migrator->dispatch(m);
702 break;
703
704 default:
705 switch (m->get_type()) {
706 // SERVER
707 case CEPH_MSG_CLIENT_SESSION:
708 case CEPH_MSG_CLIENT_RECONNECT:
709 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
710 // fall-thru
711 case CEPH_MSG_CLIENT_REQUEST:
712 server->dispatch(m);
713 break;
714 case MSG_MDS_SLAVE_REQUEST:
715 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
716 server->dispatch(m);
717 break;
718
719 case MSG_MDS_HEARTBEAT:
720 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
721 balancer->proc_message(m);
722 break;
723
724 case MSG_MDS_TABLE_REQUEST:
725 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
726 {
727 MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
728 if (req->op < 0) {
729 MDSTableClient *client = get_table_client(req->table);
730 client->handle_request(req);
731 } else {
732 MDSTableServer *server = get_table_server(req->table);
733 server->handle_request(req);
734 }
735 }
736 break;
737
738 case MSG_MDS_LOCK:
739 case MSG_MDS_INODEFILECAPS:
740 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
741 locker->dispatch(m);
742 break;
743
744 case CEPH_MSG_CLIENT_CAPS:
745 case CEPH_MSG_CLIENT_CAPRELEASE:
746 case CEPH_MSG_CLIENT_LEASE:
747 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
748 locker->dispatch(m);
749 break;
750
751 default:
752 return false;
753 }
754 }
755
756 return true;
757 }
758
759 /**
760 * Advance finished_queue and waiting_for_nolaggy.
761 *
762 * Usually drain both queues, but may not drain waiting_for_nolaggy
763 * if beacon is currently laggy.
764 */
765 void MDSRank::_advance_queues()
766 {
767 assert(mds_lock.is_locked_by_me());
768
769 while (!finished_queue.empty()) {
770 dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
771 dout(10) << finished_queue << dendl;
772 list<MDSInternalContextBase*> ls;
773 ls.swap(finished_queue);
774 while (!ls.empty()) {
775 dout(10) << " finish " << ls.front() << dendl;
776 ls.front()->complete(0);
777 ls.pop_front();
778
779 heartbeat_reset();
780 }
781 }
782
783 while (!waiting_for_nolaggy.empty()) {
784 // stop if we're laggy now!
785 if (beacon.is_laggy())
786 break;
787
788 Message *old = waiting_for_nolaggy.front();
789 waiting_for_nolaggy.pop_front();
790
791 if (is_stale_message(old)) {
792 old->put();
793 } else {
794 dout(7) << " processing laggy deferred " << *old << dendl;
795 if (!handle_deferrable_message(old)) {
796 dout(0) << "unrecognized message " << *old << dendl;
797 old->put();
798 }
799 }
800
801 heartbeat_reset();
802 }
803 }
804
805 /**
806 * Call this when you take mds_lock, or periodically if you're going to
807 * hold the lock for a long time (e.g. iterating over clients/inodes)
808 */
809 void MDSRank::heartbeat_reset()
810 {
811 // Any thread might jump into mds_lock and call us immediately
812 // after a call to suicide() completes, in which case MDSRank::hb
813 // has been freed and we are a no-op.
814 if (!hb) {
815 assert(stopping);
816 return;
817 }
818
819 // NB not enabling suicide grace, because the mon takes care of killing us
820 // (by blacklisting us) when we fail to send beacons, and it's simpler to
821 // only have one way of dying.
822 g_ceph_context->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
823 }
824
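// A message is stale if it claims to come from an MDS rank whose current
// instance in the MDSMap no longer matches the sender; mdsmaps and
// cache_expire messages from a matching-but-down instance are still examined.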
825 bool MDSRank::is_stale_message(Message *m) const
826 {
827 // from bad mds?
828 if (m->get_source().is_mds()) {
829 mds_rank_t from = mds_rank_t(m->get_source().num());
830 if (!mdsmap->have_inst(from) ||
831 mdsmap->get_inst(from) != m->get_source_inst() ||
832 mdsmap->is_down(from)) {
833 // bogus mds?
834 if (m->get_type() == CEPH_MSG_MDS_MAP) {
835 dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
836 << ", but it's an mdsmap, looking at it" << dendl;
837 } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
838 mdsmap->get_inst(from) == m->get_source_inst()) {
839 dout(5) << "got " << *m << " from down mds " << m->get_source()
840 << ", but it's a cache_expire, looking at it" << dendl;
841 } else {
842 dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
843 << ", dropping" << dendl;
844 return true;
845 }
846 }
847 }
848 return false;
849 }
850
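// Look up the Session attached to the message's Connection, if any; the
// extra reference taken by get_priv() is dropped before returning.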
851 Session *MDSRank::get_session(Message *m)
852 {
853 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
854 if (session) {
855 dout(20) << "get_session have " << session << " " << session->info.inst
856 << " state " << session->get_state_name() << dendl;
857 session->put(); // do not carry ref
858 } else {
859 dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
860 }
861 return session;
862 }
863
864 void MDSRank::send_message(Message *m, Connection *c)
865 {
866 assert(c);
867 c->send_message(m);
868 }
869
870
871 void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
872 {
873 if (!mdsmap->is_up(mds)) {
874 dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
875 m->put();
876 return;
877 }
878
879 // send mdsmap first?
880 if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
881 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
882 mdsmap->get_inst(mds));
883 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
884 }
885
886 // send message
887 messenger->send_message(m, mdsmap->get_inst(mds));
888 }
889
890 void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
891 {
892 assert(mds != whoami);
893
894 // client request?
895 if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
896 (static_cast<MClientRequest*>(m))->get_source().is_client()) {
897 MClientRequest *creq = static_cast<MClientRequest*>(m);
898 creq->inc_num_fwd(); // inc forward counter
899
900 /*
901 * don't actually forward if non-idempotent!
902 * client has to do it. although the MDS will ignore duplicate requests,
903 * the affected metadata may migrate, in which case the new authority
904 * won't have the metareq_id in the completed request map.
905 */
906 // NEW: always make the client resend!
907 bool client_must_resend = true; //!creq->can_forward();
908
909 // tell the client where it should go
910 messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
911 client_must_resend),
912 creq->get_source_inst());
913
914 if (client_must_resend) {
915 m->put();
916 return;
917 }
918 }
919
920 // these are the only types of messages we should be 'forwarding'; they
921 // explicitly encode their source mds, which gets clobbered when we resend
922 // them here.
923 assert(m->get_type() == MSG_MDS_DIRUPDATE ||
924 m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);
925
926 // send mdsmap first?
927 if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
928 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
929 mdsmap->get_inst(mds));
930 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
931 }
932
933 messenger->send_message(m, mdsmap->get_inst(mds));
934 }
935
936
937
938 void MDSRank::send_message_client_counted(Message *m, client_t client)
939 {
940 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
941 if (session) {
942 send_message_client_counted(m, session);
943 } else {
944 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
945 }
946 }
947
948 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
949 {
950 Session *session = static_cast<Session *>(connection->get_priv());
951 if (session) {
952 session->put(); // do not carry ref
953 send_message_client_counted(m, session);
954 } else {
955 dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
956 // another Connection took over the Session
957 }
958 }
959
960 void MDSRank::send_message_client_counted(Message *m, Session *session)
961 {
962 version_t seq = session->inc_push_seq();
963 dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
964 << seq << " " << *m << dendl;
965 if (session->connection) {
966 session->connection->send_message(m);
967 } else {
968 session->preopen_out_queue.push_back(m);
969 }
970 }
971
972 void MDSRank::send_message_client(Message *m, Session *session)
973 {
974 dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
975 if (session->connection) {
976 session->connection->send_message(m);
977 } else {
978 session->preopen_out_queue.push_back(m);
979 }
980 }
981
982 /**
983 * This is used whenever a RADOS operation has been cancelled
984 * or a RADOS client has been blacklisted, to cause the MDS and
985 * any clients to wait for this OSD epoch before using any new caps.
986 *
987 * See doc/cephfs/eviction
988 */
989 void MDSRank::set_osd_epoch_barrier(epoch_t e)
990 {
991 dout(4) << __func__ << ": epoch=" << e << dendl;
992 osd_epoch_barrier = e;
993 }
994
995 void MDSRank::retry_dispatch(Message *m)
996 {
997 inc_dispatch_depth();
998 _dispatch(m, false);
999 dec_dispatch_depth();
1000 }
1001
1002 utime_t MDSRank::get_laggy_until() const
1003 {
1004 return beacon.get_laggy_until();
1005 }
1006
1007 bool MDSRank::is_daemon_stopping() const
1008 {
1009 return stopping;
1010 }
1011
1012 void MDSRank::request_state(MDSMap::DaemonState s)
1013 {
1014 dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
1015 beacon.set_want_state(mdsmap, s);
1016 beacon.send();
1017 }
1018
1019
1020 class C_MDS_BootStart : public MDSInternalContext {
1021 MDSRank::BootStep nextstep;
1022 public:
1023 C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
1024 : MDSInternalContext(m), nextstep(n) {}
1025 void finish(int r) override {
1026 mds->boot_start(nextstep, r);
1027 }
1028 };
1029
1030
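// Drive the boot/replay sequence: open the tables and journal
// (MDS_BOOT_INITIAL), load the base inodes (MDS_BOOT_OPEN_ROOT), replay or
// append to the journal (MDS_BOOT_PREPARE_LOG), then finish replay
// (MDS_BOOT_REPLAY_DONE). Each step re-enters here via C_MDS_BootStart.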
1031 void MDSRank::boot_start(BootStep step, int r)
1032 {
1033 // Handle errors from previous step
1034 if (r < 0) {
1035 if (is_standby_replay() && (r == -EAGAIN)) {
1036 dout(0) << "boot_start encountered an error EAGAIN"
1037 << ", respawning since we fell behind journal" << dendl;
1038 respawn();
1039 } else if (r == -EINVAL || r == -ENOENT) {
1040 // Invalid or absent data, indicates damaged on-disk structures
1041 clog->error() << "Error loading MDS rank " << whoami << ": "
1042 << cpp_strerror(r);
1043 damaged();
1044 assert(r == 0); // Unreachable, damaged() calls respawn()
1045 } else {
1046 // Completely unexpected error, give up and die
1047 dout(0) << "boot_start encountered an error, failing" << dendl;
1048 suicide();
1049 return;
1050 }
1051 }
1052
1053 assert(is_starting() || is_any_replay());
1054
1055 switch(step) {
1056 case MDS_BOOT_INITIAL:
1057 {
1058 mdcache->init_layouts();
1059
1060 MDSGatherBuilder gather(g_ceph_context,
1061 new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
1062 dout(2) << "boot_start " << step << ": opening inotable" << dendl;
1063 inotable->set_rank(whoami);
1064 inotable->load(gather.new_sub());
1065
1066 dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
1067 sessionmap.set_rank(whoami);
1068 sessionmap.load(gather.new_sub());
1069
1070 dout(2) << "boot_start " << step << ": opening mds log" << dendl;
1071 mdlog->open(gather.new_sub());
1072
1073 if (is_starting()) {
1074 dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
1075 purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
1076 } else if (!standby_replaying) {
1077 dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
1078 purge_queue.open(NULL);
1079 }
1080
1081 if (mdsmap->get_tableserver() == whoami) {
1082 dout(2) << "boot_start " << step << ": opening snap table" << dendl;
1083 snapserver->set_rank(whoami);
1084 snapserver->load(gather.new_sub());
1085 }
1086
1087 gather.activate();
1088 }
1089 break;
1090 case MDS_BOOT_OPEN_ROOT:
1091 {
1092 dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;
1093
1094 MDSGatherBuilder gather(g_ceph_context,
1095 new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
1096
1097 mdcache->open_mydir_inode(gather.new_sub());
1098
1099 if (is_starting() ||
1100 whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
1101 mdcache->open_root_inode(gather.new_sub());
1102 } else {
1103 // replay. make up fake root inode to start with
1104 (void)mdcache->create_root_inode();
1105 }
1106 gather.activate();
1107 }
1108 break;
1109 case MDS_BOOT_PREPARE_LOG:
1110 if (is_any_replay()) {
1111 dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
1112 MDSGatherBuilder gather(g_ceph_context,
1113 new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1114
1115 if (!standby_replaying) {
1116 dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
1117 purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
1118 }
1119
1120 mdlog->replay(gather.new_sub());
1121 gather.activate();
1122 } else {
1123 dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
1124 mdlog->append();
1125 starting_done();
1126 }
1127 break;
1128 case MDS_BOOT_REPLAY_DONE:
1129 assert(is_any_replay());
1130
1131 // The SessionMap and InoTable should be in sync after replay; validate
1132 // that they are consistent.
1133 validate_sessions();
1134
1135 replay_done();
1136 break;
1137 }
1138 }
1139
1140 void MDSRank::validate_sessions()
1141 {
1142 assert(mds_lock.is_locked_by_me());
1143 std::vector<Session*> victims;
1144
1145 // Identify any sessions whose state is inconsistent with other structures
1146 // (such as the InoTable) after they have been loaded from RADOS during startup.
1147 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1148 const auto &sessions = sessionmap.get_sessions();
1149 for (const auto &i : sessions) {
1150 Session *session = i.second;
1151 interval_set<inodeno_t> badones;
1152 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1153 clog->error() << "Client session loaded with invalid preallocated "
1154 "inodes, evicting session " << *session;
1155
1156 // Make the session consistent with inotable so that it can
1157 // be cleanly torn down
1158 session->info.prealloc_inos.subtract(badones);
1159
1160 victims.push_back(session);
1161 }
1162 }
1163
1164 for (const auto &session: victims) {
1165 server->kill_session(session, nullptr);
1166 }
1167 }
1168
1169 void MDSRank::starting_done()
1170 {
1171 dout(3) << "starting_done" << dendl;
1172 assert(is_starting());
1173 request_state(MDSMap::STATE_ACTIVE);
1174
1175 mdcache->open_root();
1176
1177 if (mdcache->is_open()) {
1178 mdlog->start_new_segment();
1179 } else {
1180 mdcache->wait_for_open(new MDSInternalContextWrapper(this,
1181 new FunctionContext([this] (int r) {
1182 mdlog->start_new_segment();
1183 })));
1184 }
1185 }
1186
1187
1188 void MDSRank::calc_recovery_set()
1189 {
1190 // initialize gather sets
1191 set<mds_rank_t> rs;
1192 mdsmap->get_recovery_mds_set(rs);
1193 rs.erase(whoami);
1194 mdcache->set_recovery_set(rs);
1195
1196 dout(1) << " recovery set is " << rs << dendl;
1197 }
1198
1199
1200 void MDSRank::replay_start()
1201 {
1202 dout(1) << "replay_start" << dendl;
1203
1204 if (is_standby_replay())
1205 standby_replaying = true;
1206
1207 calc_recovery_set();
1208
1209 // Check if we need to wait for a newer OSD map before starting
1210 Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
1211 bool const ready = objecter->wait_for_map(
1212 mdsmap->get_last_failure_osd_epoch(),
1213 fin);
1214
1215 if (ready) {
1216 delete fin;
1217 boot_start();
1218 } else {
1219 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1220 << " (which blacklists prior instance)" << dendl;
1221 }
1222 }
1223
1224
1225 class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
1226 uint64_t old_read_pos;
1227 public:
1228 C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
1229 MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
1230 void finish(int r) override {
1231 mds->_standby_replay_restart_finish(r, old_read_pos);
1232 }
1233 };
1234
1235 void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
1236 {
1237 if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
1238 dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
1239 respawn(); /* we're too far back, and this is easier than
1240 trying to reset everything in the cache, etc */
1241 } else {
1242 mdlog->standby_trim_segments();
1243 boot_start(MDS_BOOT_PREPARE_LOG, r);
1244 }
1245 }
1246
1247 class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
1248 public:
1249 explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
1250 void finish(int r) override {
1251 assert(!r);
1252 mds->standby_replay_restart();
1253 }
1254 };
1255
1256 void MDSRank::standby_replay_restart()
1257 {
1258 if (standby_replaying) {
1259 /* Go around for another pass of replaying in standby */
1260 dout(4) << "standby_replay_restart (as standby)" << dendl;
1261 mdlog->get_journaler()->reread_head_and_probe(
1262 new C_MDS_StandbyReplayRestartFinish(
1263 this,
1264 mdlog->get_journaler()->get_read_pos()));
1265 } else {
1266 /* We are transitioning out of standby: wait for OSD map update
1267 before making final pass */
1268 dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
1269 Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
1270 bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
1271 if (ready) {
1272 delete fin;
1273 mdlog->get_journaler()->reread_head_and_probe(
1274 new C_MDS_StandbyReplayRestartFinish(
1275 this,
1276 mdlog->get_journaler()->get_read_pos()));
1277
1278 dout(1) << " opening purge queue (async)" << dendl;
1279 purge_queue.open(NULL);
1280 } else {
1281 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1282 << " (which blacklists prior instance)" << dendl;
1283 }
1284 }
1285 }
1286
1287 void MDSRank::replay_done()
1288 {
1289 dout(1) << "replay_done" << (standby_replaying ? " (as standby)" : "") << dendl;
1290
1291 if (is_standby_replay()) {
1292 // The replay was done in standby state, and we are still in that state
1293 assert(standby_replaying);
1294 dout(10) << "setting replay timer" << dendl;
1295 timer.add_event_after(g_conf->mds_replay_interval,
1296 new C_MDS_StandbyReplayRestart(this));
1297 return;
1298 } else if (standby_replaying) {
1299 // The replay was done in standby state, we have now _left_ that state
1300 dout(10) << " last replay pass was as a standby; making final pass" << dendl;
1301 standby_replaying = false;
1302 standby_replay_restart();
1303 return;
1304 } else {
1305 // Replay is complete, journal read should be up to date
1306 assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
1307 assert(!is_standby_replay());
1308
1309 // Reformat and come back here
1310 if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
1311 dout(4) << "reformatting journal on standbyreplay->replay transition" << dendl;
1312 mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1313 return;
1314 }
1315 }
1316
1317 dout(1) << "making mds journal writeable" << dendl;
1318 mdlog->get_journaler()->set_writeable();
1319 mdlog->get_journaler()->trim_tail();
1320
1321 if (g_conf->mds_wipe_sessions) {
1322 dout(1) << "wiping out client sessions" << dendl;
1323 sessionmap.wipe();
1324 sessionmap.save(new C_MDSInternalNoop);
1325 }
1326 if (g_conf->mds_wipe_ino_prealloc) {
1327 dout(1) << "wiping out ino prealloc from sessions" << dendl;
1328 sessionmap.wipe_ino_prealloc();
1329 sessionmap.save(new C_MDSInternalNoop);
1330 }
1331 if (g_conf->mds_skip_ino) {
1332 inodeno_t i = g_conf->mds_skip_ino;
1333 dout(1) << "skipping " << i << " inodes" << dendl;
1334 inotable->skip_inos(i);
1335 inotable->save(new C_MDSInternalNoop);
1336 }
1337
1338 if (mdsmap->get_num_in_mds() == 1 &&
1339 mdsmap->get_num_failed_mds() == 0) { // just me!
1340 dout(2) << "i am alone, moving to state reconnect" << dendl;
1341 request_state(MDSMap::STATE_RECONNECT);
1342 } else {
1343 dout(2) << "i am not alone, moving to state resolve" << dendl;
1344 request_state(MDSMap::STATE_RESOLVE);
1345 }
1346 }
1347
1348 void MDSRank::reopen_log()
1349 {
1350 dout(1) << "reopen_log" << dendl;
1351 mdcache->rollback_uncommitted_fragments();
1352 }
1353
1354
1355 void MDSRank::resolve_start()
1356 {
1357 dout(1) << "resolve_start" << dendl;
1358
1359 reopen_log();
1360
1361 mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
1362 finish_contexts(g_ceph_context, waiting_for_resolve);
1363 }
1364 void MDSRank::resolve_done()
1365 {
1366 dout(1) << "resolve_done" << dendl;
1367 request_state(MDSMap::STATE_RECONNECT);
1368 }
1369
1370 void MDSRank::reconnect_start()
1371 {
1372 dout(1) << "reconnect_start" << dendl;
1373
1374 if (last_state == MDSMap::STATE_REPLAY) {
1375 reopen_log();
1376 }
1377
1378 // Drop any blacklisted clients from the SessionMap before going
1379 // into reconnect, so that we don't wait for them.
1380 objecter->enable_blacklist_events();
1381 std::set<entity_addr_t> blacklist;
1382 epoch_t epoch = 0;
1383 objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
1384 o.get_blacklist(&blacklist);
1385 epoch = o.get_epoch();
1386 });
1387 auto killed = server->apply_blacklist(blacklist);
1388 dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
1389 << blacklist.size() << " blacklist entries, "
1390 << sessionmap.get_sessions().size() << ")" << dendl;
1391 if (killed) {
1392 set_osd_epoch_barrier(epoch);
1393 }
1394
1395 server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
1396 finish_contexts(g_ceph_context, waiting_for_reconnect);
1397 }
1398 void MDSRank::reconnect_done()
1399 {
1400 dout(1) << "reconnect_done" << dendl;
1401 request_state(MDSMap::STATE_REJOIN); // move to rejoin state
1402 }
1403
1404 void MDSRank::rejoin_joint_start()
1405 {
1406 dout(1) << "rejoin_joint_start" << dendl;
1407 mdcache->rejoin_send_rejoins();
1408 }
1409 void MDSRank::rejoin_start()
1410 {
1411 dout(1) << "rejoin_start" << dendl;
1412 mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
1413 }
1414 void MDSRank::rejoin_done()
1415 {
1416 dout(1) << "rejoin_done" << dendl;
1417 mdcache->show_subtrees();
1418 mdcache->show_cache();
1419
1420 // funny case: is our cache empty? no subtrees?
1421 if (!mdcache->is_subtrees()) {
1422 if (whoami == 0) {
1423 // The root should always have a subtree!
1424 clog->error() << "No subtrees found for root MDS rank!";
1425 damaged();
1426 assert(mdcache->is_subtrees());
1427 } else {
1428 dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
1429 request_state(MDSMap::STATE_STOPPED);
1430 }
1431 return;
1432 }
1433
1434 if (replay_queue.empty())
1435 request_state(MDSMap::STATE_ACTIVE);
1436 else
1437 request_state(MDSMap::STATE_CLIENTREPLAY);
1438 }
1439
1440 void MDSRank::clientreplay_start()
1441 {
1442 dout(1) << "clientreplay_start" << dendl;
1443 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1444 mdcache->start_files_to_recover();
1445 queue_one_replay();
1446 }
1447
1448 bool MDSRank::queue_one_replay()
1449 {
1450 if (replay_queue.empty()) {
1451 mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1452 return false;
1453 }
1454 queue_waiter(replay_queue.front());
1455 replay_queue.pop_front();
1456 return true;
1457 }
1458
1459 void MDSRank::clientreplay_done()
1460 {
1461 dout(1) << "clientreplay_done" << dendl;
1462 request_state(MDSMap::STATE_ACTIVE);
1463 }
1464
1465 void MDSRank::active_start()
1466 {
1467 dout(1) << "active_start" << dendl;
1468
1469 if (last_state == MDSMap::STATE_CREATING) {
1470 mdcache->open_root();
1471 }
1472
1473 mdcache->clean_open_file_lists();
1474 mdcache->export_remaining_imported_caps();
1475 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1476 mdcache->start_files_to_recover();
1477
1478 mdcache->reissue_all_caps();
1479 mdcache->activate_stray_manager();
1480
1481 finish_contexts(g_ceph_context, waiting_for_active); // kick waiters
1482 }
1483
1484 void MDSRank::recovery_done(int oldstate)
1485 {
1486 dout(1) << "recovery_done -- successful recovery!" << dendl;
1487 assert(is_clientreplay() || is_active());
1488
1489 // kick snaptable (resent AGREEs)
1490 if (mdsmap->get_tableserver() == whoami) {
1491 set<mds_rank_t> active;
1492 mdsmap->get_clientreplay_or_active_or_stopping_mds_set(active);
1493 snapserver->finish_recovery(active);
1494 }
1495
1496 if (oldstate == MDSMap::STATE_CREATING)
1497 return;
1498
1499 mdcache->start_recovered_truncates();
1500 mdcache->do_file_recover();
1501
1502 // tell connected clients
1503 //bcast_mds_map(); // not anymore, they get this from the monitor
1504
1505 mdcache->populate_mydir();
1506 }
1507
1508 void MDSRank::creating_done()
1509 {
1510 dout(1)<< "creating_done" << dendl;
1511 request_state(MDSMap::STATE_ACTIVE);
1512 }
1513
1514 void MDSRank::boot_create()
1515 {
1516 dout(3) << "boot_create" << dendl;
1517
1518 MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));
1519
1520 mdcache->init_layouts();
1521
1522 snapserver->set_rank(whoami);
1523 inotable->set_rank(whoami);
1524 sessionmap.set_rank(whoami);
1525
1526 // start with a fresh journal
1527 dout(10) << "boot_create creating fresh journal" << dendl;
1528 mdlog->create(fin.new_sub());
1529
1530 // open new journal segment, but do not journal subtree map (yet)
1531 mdlog->prepare_new_segment();
1532
1533 if (whoami == mdsmap->get_root()) {
1534 dout(3) << "boot_create creating fresh hierarchy" << dendl;
1535 mdcache->create_empty_hierarchy(fin.get());
1536 }
1537
1538 dout(3) << "boot_create creating mydir hierarchy" << dendl;
1539 mdcache->create_mydir_hierarchy(fin.get());
1540
1541 // fixme: fake out inotable (reset, pretend loaded)
1542 dout(10) << "boot_create creating fresh inotable table" << dendl;
1543 inotable->reset();
1544 inotable->save(fin.new_sub());
1545
1546 // write empty sessionmap
1547 sessionmap.save(fin.new_sub());
1548
1549 // Create empty purge queue
1550 purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));
1551
1552 // initialize tables
1553 if (mdsmap->get_tableserver() == whoami) {
1554 dout(10) << "boot_create creating fresh snaptable" << dendl;
1555 snapserver->reset();
1556 snapserver->save(fin.new_sub());
1557 }
1558
1559 assert(g_conf->mds_kill_create_at != 1);
1560
1561 // ok now journal it
1562 mdlog->journal_segment_subtree_map(fin.new_sub());
1563 mdlog->flush();
1564
1565 // Usually we do this during reconnect, but creation skips that.
1566 objecter->enable_blacklist_events();
1567
1568 fin.activate();
1569 }
1570
1571 void MDSRank::stopping_start()
1572 {
1573 dout(2) << "stopping_start" << dendl;
1574
1575 if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
1576 // we're the only mds up!
1577 dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal. suicide!" << dendl;
1578 suicide();
1579 }
1580
1581 mdcache->shutdown_start();
1582 }
1583
1584 void MDSRank::stopping_done()
1585 {
1586 dout(2) << "stopping_done" << dendl;
1587
1588 // tell monitor we shut down cleanly.
1589 request_state(MDSMap::STATE_STOPPED);
1590 }
1591
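// React to a new MDSMap: apply any state transition for this rank, then
// respond to peers that are newly resolving, rejoining, active, failed or
// stopped, and wake any waiters keyed on the new map epoch.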
1592 void MDSRankDispatcher::handle_mds_map(
1593 MMDSMap *m,
1594 MDSMap *oldmap)
1595 {
1596 // I am only to be passed MDSMaps in which I hold a rank
1597 assert(whoami != MDS_RANK_NONE);
1598
1599 MDSMap::DaemonState oldstate = state;
1600 mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
1601 state = mdsmap->get_state_gid(mds_gid);
1602 if (state != oldstate) {
1603 last_state = oldstate;
1604 incarnation = mdsmap->get_inc_gid(mds_gid);
1605 }
1606
1607 version_t epoch = m->get_epoch();
1608
1609 // note source's map version
1610 if (m->get_source().is_mds() &&
1611 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
1612 dout(15) << " peer " << m->get_source()
1613 << " has mdsmap epoch >= " << epoch
1614 << dendl;
1615 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
1616 }
1617
1618 // Validate state transitions while I hold a rank
1619 if (!MDSMap::state_transition_valid(oldstate, state)) {
1620 derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
1621 << "->" << ceph_mds_state_name(state) << dendl;
1622 respawn();
1623 }
1624
1625 if (oldstate != state) {
1626 // update messenger.
1627 if (state == MDSMap::STATE_STANDBY_REPLAY) {
1628 dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
1629 << " replaying mds." << whoami << "." << incarnation << dendl;
1630 messenger->set_myname(entity_name_t::MDS(mds_gid));
1631 } else {
1632 dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
1633 messenger->set_myname(entity_name_t::MDS(whoami));
1634 }
1635 }
1636
1637 // tell objecter my incarnation
1638 if (objecter->get_client_incarnation() != incarnation)
1639 objecter->set_client_incarnation(incarnation);
1640
1641 // for debug
1642 if (g_conf->mds_dump_cache_on_map)
1643 mdcache->dump_cache();
1644
1645 // did it change?
1646 if (oldstate != state) {
1647 dout(1) << "handle_mds_map state change "
1648 << ceph_mds_state_name(oldstate) << " --> "
1649 << ceph_mds_state_name(state) << dendl;
1650 beacon.set_want_state(mdsmap, state);
1651
1652 if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
1653 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
1654 assert (state == MDSMap::STATE_REPLAY);
1655 } else {
1656 // did i just recover?
1657 if ((is_active() || is_clientreplay()) &&
1658 (oldstate == MDSMap::STATE_CREATING ||
1659 oldstate == MDSMap::STATE_REJOIN ||
1660 oldstate == MDSMap::STATE_RECONNECT))
1661 recovery_done(oldstate);
1662
1663 if (is_active()) {
1664 active_start();
1665 } else if (is_any_replay()) {
1666 replay_start();
1667 } else if (is_resolve()) {
1668 resolve_start();
1669 } else if (is_reconnect()) {
1670 reconnect_start();
1671 } else if (is_rejoin()) {
1672 rejoin_start();
1673 } else if (is_clientreplay()) {
1674 clientreplay_start();
1675 } else if (is_creating()) {
1676 boot_create();
1677 } else if (is_starting()) {
1678 boot_start();
1679 } else if (is_stopping()) {
1680 assert(oldstate == MDSMap::STATE_ACTIVE);
1681 stopping_start();
1682 }
1683 }
1684 }
1685
1686 // RESOLVE
1687 // is someone else newly resolving?
1688 if (is_resolve() || is_reconnect() || is_rejoin() ||
1689 is_clientreplay() || is_active() || is_stopping()) {
1690 if (!oldmap->is_resolving() && mdsmap->is_resolving()) {
1691 set<mds_rank_t> resolve;
1692 mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
1693 dout(10) << " resolve set is " << resolve << dendl;
1694 calc_recovery_set();
1695 mdcache->send_resolves();
1696 }
1697 }
1698
1699 // REJOIN
1700 // is everybody finally rejoining?
1701 if (is_starting() || is_rejoin() || is_clientreplay() || is_active() || is_stopping()) {
1702 // did we start?
1703 if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
1704 rejoin_joint_start();
1705
1706 // did we finish?
1707 if (g_conf->mds_dump_cache_after_rejoin &&
1708 oldmap->is_rejoining() && !mdsmap->is_rejoining())
1709 mdcache->dump_cache(); // for DEBUG only
1710
1711 if (oldstate >= MDSMap::STATE_REJOIN ||
1712 oldstate == MDSMap::STATE_STARTING) {
1713 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
1714 set<mds_rank_t> olddis, dis;
1715 oldmap->get_mds_set(olddis, MDSMap::STATE_ACTIVE);
1716 oldmap->get_mds_set(olddis, MDSMap::STATE_CLIENTREPLAY);
1717 oldmap->get_mds_set(olddis, MDSMap::STATE_REJOIN);
1718 mdsmap->get_mds_set(dis, MDSMap::STATE_ACTIVE);
1719 mdsmap->get_mds_set(dis, MDSMap::STATE_CLIENTREPLAY);
1720 mdsmap->get_mds_set(dis, MDSMap::STATE_REJOIN);
1721 for (set<mds_rank_t>::iterator p = dis.begin(); p != dis.end(); ++p)
1722 if (*p != whoami && // not me
1723 olddis.count(*p) == 0) { // newly so?
1724 mdcache->kick_discovers(*p);
1725 mdcache->kick_open_ino_peers(*p);
1726 }
1727 }
1728 }
1729
1730 cluster_degraded = mdsmap->is_degraded();
1731 if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
1732 dout(1) << "cluster recovered." << dendl;
1733 auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
1734 if (it != waiting_for_active_peer.end()) {
1735 queue_waiters(it->second);
1736 waiting_for_active_peer.erase(it);
1737 }
1738 }
1739
1740 // did someone go active?
1741 if (oldstate >= MDSMap::STATE_CLIENTREPLAY &&
1742 (is_clientreplay() || is_active() || is_stopping())) {
1743 set<mds_rank_t> oldactive, active;
1744 oldmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE);
1745 oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY);
1746 mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
1747 mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
1748 for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
1749 if (*p != whoami && // not me
1750 oldactive.count(*p) == 0) // newly so?
1751 handle_mds_recovery(*p);
1752 }
1753
1754 // did someone fail?
1755 // new down?
1756 {
1757 set<mds_rank_t> olddown, down;
1758 oldmap->get_down_mds_set(&olddown);
1759 mdsmap->get_down_mds_set(&down);
1760 for (set<mds_rank_t>::iterator p = down.begin(); p != down.end(); ++p) {
1761 if (oldmap->have_inst(*p) && olddown.count(*p) == 0) {
1762 messenger->mark_down(oldmap->get_inst(*p).addr);
1763 handle_mds_failure(*p);
1764 }
1765 }
1766 }
1767
1768 // did someone fail?
1769 // did their addr/inst change?
1770 {
1771 set<mds_rank_t> up;
1772 mdsmap->get_up_mds_set(up);
1773 for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
1774 if (oldmap->have_inst(*p) &&
1775 oldmap->get_inst(*p) != mdsmap->get_inst(*p)) {
1776 messenger->mark_down(oldmap->get_inst(*p).addr);
1777 handle_mds_failure(*p);
1778 }
1779 }
1780 }
1781
1782 if (is_clientreplay() || is_active() || is_stopping()) {
1783 // did anyone stop?
1784 set<mds_rank_t> oldstopped, stopped;
1785 oldmap->get_stopped_mds_set(oldstopped);
1786 mdsmap->get_stopped_mds_set(stopped);
1787 for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
1788 if (oldstopped.count(*p) == 0) // newly so?
1789 mdcache->migrator->handle_mds_failure_or_stop(*p);
1790 }
1791
1792 {
1793 map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
1794 while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
1795 list<MDSInternalContextBase*> ls;
1796 ls.swap(p->second);
1797 waiting_for_mdsmap.erase(p++);
1798 finish_contexts(g_ceph_context, ls);
1799 }
1800 }
1801
1802 if (is_active()) {
1803 // Before going active, set OSD epoch barrier to latest (so that
1804 // we don't risk handing out caps to clients with old OSD maps that
1805 // might not include barriers from the previous incarnation of this MDS)
1806 set_osd_epoch_barrier(objecter->with_osdmap(
1807 std::mem_fn(&OSDMap::get_epoch)));
1808 }
1809
1810 if (is_active()) {
1811 bool found = false;
1812 MDSMap::mds_info_t info = mdsmap->get_info(whoami);
1813
1814 for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
1815 p != mdsmap->get_mds_info().end();
1816 ++p) {
1817 if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
1818 (p->second.standby_for_rank == whoami || (info.name.length() && p->second.standby_for_name == info.name))) {
1819 found = true;
1820 break;
1821 }
1822 }
1823 if (found)
1824 mdlog->set_write_iohint(0);
1825 else
1826 mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1827 }
1828
1829 if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
1830 purge_queue.update_op_limit(*mdsmap);
1831 }
1832 }
1833
1834 void MDSRank::handle_mds_recovery(mds_rank_t who)
1835 {
1836 dout(5) << "handle_mds_recovery mds." << who << dendl;
1837
1838 mdcache->handle_mds_recovery(who);
1839
1840 if (mdsmap->get_tableserver() == whoami) {
1841 snapserver->handle_mds_recovery(who);
1842 }
1843
1844 queue_waiters(waiting_for_active_peer[who]);
1845 waiting_for_active_peer.erase(who);
1846 }
1847
1848 void MDSRank::handle_mds_failure(mds_rank_t who)
1849 {
1850 if (who == whoami) {
1851 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
1852 return;
1853 }
1854 dout(5) << "handle_mds_failure mds." << who << dendl;
1855
1856 mdcache->handle_mds_failure(who);
1857
1858 snapclient->handle_mds_failure(who);
1859 }
1860
1861 bool MDSRankDispatcher::handle_asok_command(
1862 std::string command, cmdmap_t& cmdmap, Formatter *f,
1863 std::ostream& ss)
1864 {
1865 if (command == "dump_ops_in_flight" ||
1866 command == "ops") {
1867 if (!op_tracker.dump_ops_in_flight(f)) {
1868 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1869 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1870 }
1871 } else if (command == "dump_blocked_ops") {
1872 if (!op_tracker.dump_ops_in_flight(f, true)) {
1873       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
1874     Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1875 }
1876 } else if (command == "dump_historic_ops") {
1877 if (!op_tracker.dump_historic_ops(f)) {
1878       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
1879     Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1880 }
1881 } else if (command == "dump_historic_ops_by_duration") {
1882 if (!op_tracker.dump_historic_ops(f, true)) {
1883       ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
1884     Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1885 }
1886 } else if (command == "osdmap barrier") {
1887 int64_t target_epoch = 0;
1888 bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
1889
1890 if (!got_val) {
1891 ss << "no target epoch given";
1892 return true;
1893 }
1894
1895 mds_lock.Lock();
1896 set_osd_epoch_barrier(target_epoch);
1897 mds_lock.Unlock();
1898
1899 C_SaferCond cond;
1900 bool already_got = objecter->wait_for_map(target_epoch, &cond);
1901 if (!already_got) {
1902 dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
1903 cond.wait();
1904 }
1905 } else if (command == "session ls") {
1906 Mutex::Locker l(mds_lock);
1907
1908 heartbeat_reset();
1909
1910 dump_sessions(SessionFilter(), f);
1911 } else if (command == "session evict") {
1912 std::string client_id;
1913 const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
1914 if(!got_arg) {
1915 ss << "Invalid client_id specified";
1916 return true;
1917 }
1918
1919 mds_lock.Lock();
1920 std::stringstream dss;
1921 bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
1922 g_conf->mds_session_blacklist_on_evict, dss);
1923 if (!evicted) {
1924 dout(15) << dss.str() << dendl;
1925 ss << dss.str();
1926 }
1927 mds_lock.Unlock();
1928 } else if (command == "scrub_path") {
1929 string path;
1930 vector<string> scrubop_vec;
1931 cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
1932 cmd_getval(g_ceph_context, cmdmap, "path", path);
1933 command_scrub_path(f, path, scrubop_vec);
1934 } else if (command == "tag path") {
1935 string path;
1936 cmd_getval(g_ceph_context, cmdmap, "path", path);
1937 string tag;
1938 cmd_getval(g_ceph_context, cmdmap, "tag", tag);
1939 command_tag_path(f, path, tag);
1940 } else if (command == "flush_path") {
1941 string path;
1942 cmd_getval(g_ceph_context, cmdmap, "path", path);
1943 command_flush_path(f, path);
1944 } else if (command == "flush journal") {
1945 command_flush_journal(f);
1946 } else if (command == "get subtrees") {
1947 command_get_subtrees(f);
1948 } else if (command == "export dir") {
1949 string path;
1950 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1951 ss << "malformed path";
1952 return true;
1953 }
1954 int64_t rank;
1955 if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
1956 ss << "malformed rank";
1957 return true;
1958 }
1959 command_export_dir(f, path, (mds_rank_t)rank);
1960 } else if (command == "dump cache") {
1961 Mutex::Locker l(mds_lock);
1962 string path;
1963 int r;
1964 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1965 r = mdcache->dump_cache(f);
1966 } else {
1967 r = mdcache->dump_cache(path);
1968 }
1969
1970 if (r != 0) {
1971 ss << "Failed to dump cache: " << cpp_strerror(r);
1972 f->reset();
1973 }
1974 } else if (command == "cache status") {
1975 Mutex::Locker l(mds_lock);
1976 int r = mdcache->cache_status(f);
1977 if (r != 0) {
1978 ss << "Failed to get cache status: " << cpp_strerror(r);
1979 }
1980 } else if (command == "dump tree") {
1981 string root;
1982 int64_t depth;
1983 cmd_getval(g_ceph_context, cmdmap, "root", root);
1984 if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
1985 depth = -1;
1986 {
1987 Mutex::Locker l(mds_lock);
1988 int r = mdcache->dump_cache(root, depth, f);
1989 if (r != 0) {
1990 ss << "Failed to dump tree: " << cpp_strerror(r);
1991 f->reset();
1992 }
1993 }
1994 } else if (command == "force_readonly") {
1995 Mutex::Locker l(mds_lock);
1996 mdcache->force_readonly();
1997 } else if (command == "dirfrag split") {
1998 command_dirfrag_split(cmdmap, ss);
1999 } else if (command == "dirfrag merge") {
2000 command_dirfrag_merge(cmdmap, ss);
2001 } else if (command == "dirfrag ls") {
2002 command_dirfrag_ls(cmdmap, ss, f);
2003 } else {
2004 return false;
2005 }
2006
2007 return true;
2008 }
2009
2010 class C_MDS_Send_Command_Reply : public MDSInternalContext
2011 {
2012 protected:
2013 MCommand *m;
2014 public:
2015 C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
2016 MDSInternalContext(_mds), m(_m) { m->get(); }
2017 void send (int r, boost::string_view out_str) {
2018 bufferlist bl;
2019 MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
2020 m->put();
2021 }
2022 void finish (int r) override {
2023 send(r, "");
2024 }
2025 };
2026
2027 /**
2028 * This function drops the mds_lock, so don't do anything with
2029 * MDSRank after calling it (we could have gone into shutdown): just
2030 * send your result back to the calling client and finish.
2031 */
2032 void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
2033 {
2034 C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
2035
2036 if (is_any_replay()) {
2037 reply->send(-EAGAIN, "MDS is replaying log");
2038 delete reply;
2039 return;
2040 }
2041
2042 std::list<Session*> victims;
2043 const auto sessions = sessionmap.get_sessions();
2044 for (const auto p : sessions) {
2045 if (!p.first.is_client()) {
2046 continue;
2047 }
2048
2049 Session *s = p.second;
2050
2051 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2052 victims.push_back(s);
2053 }
2054 }
2055
2056 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2057
2058 if (victims.empty()) {
2059 reply->send(0, "");
2060 delete reply;
2061 return;
2062 }
2063
2064 C_GatherBuilder gather(g_ceph_context, reply);
2065 for (const auto s : victims) {
2066 std::stringstream ss;
2067 evict_client(s->info.inst.name.num(), false,
2068 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
2069 }
2070 gather.activate();
2071 }
2072
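// Emit one JSON object per client session, filtered by SessionFilter.  Each
// entry carries the session id, lease/cap counts, state name, replay and
// completed request counters, reconnect status, instance address, and the
// client-supplied metadata map.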
2073 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2074 {
2075 // Dump sessions, decorated with recovery/replay status
2076 f->open_array_section("sessions");
2077 const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
2078 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
2079 p != session_map.end();
2080 ++p) {
2081 if (!p->first.is_client()) {
2082 continue;
2083 }
2084
2085 Session *s = p->second;
2086
2087 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2088 continue;
2089 }
2090
2091 f->open_object_section("session");
2092 f->dump_int("id", p->first.num());
2093
2094 f->dump_int("num_leases", s->leases.size());
2095 f->dump_int("num_caps", s->caps.size());
2096
2097 f->dump_string("state", s->get_state_name());
2098 f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
2099 f->dump_unsigned("completed_requests", s->get_num_completed_requests());
2100 f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
2101 f->dump_stream("inst") << s->info.inst;
2102 f->open_object_section("client_metadata");
2103 for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
2104 i != s->info.client_metadata.end(); ++i) {
2105 f->dump_string(i->first.c_str(), i->second);
2106 }
2107 f->close_section(); // client_metadata
2108 f->close_section(); //session
2109 }
2110 f->close_section(); //sessions
2111 }
2112
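// Implementation of the "scrub_path" asok command: recognised scrubops are
// "force", "recursive" and "repair"; anything else is ignored.
// Illustrative usage (assuming the standard admin socket wrapper):
//   ceph daemon mds.<name> scrub_path /some/dir recursive repair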
2113 void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
2114 {
2115 bool force = false;
2116 bool recursive = false;
2117 bool repair = false;
2118 for (vector<string>::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) {
2119 if (*i == "force")
2120 force = true;
2121 else if (*i == "recursive")
2122 recursive = true;
2123 else if (*i == "repair")
2124 repair = true;
2125 }
2126 C_SaferCond scond;
2127 {
2128 Mutex::Locker l(mds_lock);
2129 mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
2130 }
2131 scond.wait();
2132     // enqueue_scrub() finishers will dump the data for us; we're done!
2133 }
2134
2135 void MDSRank::command_tag_path(Formatter *f,
2136 boost::string_view path, boost::string_view tag)
2137 {
2138 C_SaferCond scond;
2139 {
2140 Mutex::Locker l(mds_lock);
2141 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2142 }
2143 scond.wait();
2144 }
2145
2146 void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
2147 {
2148 C_SaferCond scond;
2149 {
2150 Mutex::Locker l(mds_lock);
2151 mdcache->flush_dentry(path, &scond);
2152 }
2153 int r = scond.wait();
2154 f->open_object_section("results");
2155 f->dump_int("return_code", r);
2156 f->close_section(); // results
2157 }
2158
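// "flush journal" asok command: seal the current log segment, expire and trim
// all older segments, then rewrite the journal header.  Illustrative usage:
//   ceph daemon mds.<name> flush journal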
2159 /**
2160 * Wrapper around _command_flush_journal that
2161 * handles serialization of result
2162 */
2163 void MDSRank::command_flush_journal(Formatter *f)
2164 {
2165 assert(f != NULL);
2166
2167 std::stringstream ss;
2168 const int r = _command_flush_journal(&ss);
2169 f->open_object_section("result");
2170 f->dump_string("message", ss.str());
2171 f->dump_int("return_code", r);
2172 f->close_section();
2173 }
2174
2175 /**
2176 * Implementation of "flush journal" asok command.
2177 *
2178 * @param ss
2179 * Optionally populate with a human readable string describing the
2180 * reason for any unexpected return status.
2181 */
2182 int MDSRank::_command_flush_journal(std::stringstream *ss)
2183 {
2184 assert(ss != NULL);
2185
2186 Mutex::Locker l(mds_lock);
2187
2188 if (mdcache->is_readonly()) {
2189 dout(5) << __func__ << ": read-only FS" << dendl;
2190 return -EROFS;
2191 }
2192
2193 if (!is_active()) {
2194 dout(5) << __func__ << ": MDS not active, no-op" << dendl;
2195 return 0;
2196 }
2197
2198 // I need to seal off the current segment, and then mark all previous segments
2199 // for expiry
2200 mdlog->start_new_segment();
2201 int r = 0;
2202
2203 // Flush initially so that all the segments older than our new one
2204     // will be eligible for expiry
2205 {
2206 C_SaferCond mdlog_flushed;
2207 mdlog->flush();
2208 mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
2209 mds_lock.Unlock();
2210 r = mdlog_flushed.wait();
2211 mds_lock.Lock();
2212 if (r != 0) {
2213 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2214 return r;
2215 }
2216 }
2217
2218 // Because we may not be the last wait_for_safe context on MDLog, and
2219 // subsequent contexts might wake up in the middle of our later trim_all
2220 // and interfere with expiry (by e.g. marking dirs/dentries dirty
2221 // on previous log segments), we run a second wait_for_safe here.
2222 // See #10368
2223 {
2224 C_SaferCond mdlog_cleared;
2225 mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
2226 mds_lock.Unlock();
2227 r = mdlog_cleared.wait();
2228 mds_lock.Lock();
2229 if (r != 0) {
2230 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2231 return r;
2232 }
2233 }
2234
2235 // Put all the old log segments into expiring or expired state
2236 dout(5) << __func__ << ": beginning segment expiry" << dendl;
2237 r = mdlog->trim_all();
2238 if (r != 0) {
2239 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
2240 return r;
2241 }
2242
2243 // Attach contexts to wait for all expiring segments to expire
2244 MDSGatherBuilder expiry_gather(g_ceph_context);
2245
2246 const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
2247 for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
2248 i != expiring_segments.end(); ++i) {
2249 (*i)->wait_for_expiry(expiry_gather.new_sub());
2250 }
2251 dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
2252 << " segments to expire" << dendl;
2253
2254 if (expiry_gather.has_subs()) {
2255 C_SaferCond cond;
2256 expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
2257 expiry_gather.activate();
2258
2259 // Drop mds_lock to allow progress until expiry is complete
2260 mds_lock.Unlock();
2261 int r = cond.wait();
2262 mds_lock.Lock();
2263
2264 assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry
2265 }
2266
2267 dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
2268 mdlog->get_journaler()->get_expire_pos() << "/" <<
2269 mdlog->get_journaler()->get_trimmed_pos() << dendl;
2270
2271 // Now everyone I'm interested in is expired
2272 mdlog->trim_expired_segments();
2273
2274 dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
2275 mdlog->get_journaler()->get_expire_pos() << "/" <<
2276 mdlog->get_journaler()->get_trimmed_pos() << dendl;
2277
2278 // Flush the journal header so that readers will start from after the flushed region
2279 C_SaferCond wrote_head;
2280 mdlog->get_journaler()->write_head(&wrote_head);
2281 mds_lock.Unlock(); // Drop lock to allow messenger dispatch progress
2282 r = wrote_head.wait();
2283 mds_lock.Lock();
2284 if (r != 0) {
2285 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
2286 return r;
2287 }
2288
2289 dout(5) << __func__ << ": write_head complete, all done!" << dendl;
2290
2291 return 0;
2292 }
2293
2294
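// "get subtrees" asok command: list the subtree roots known to this rank,
// with their authority (auth_first/auth_second), export pin and a full dump
// of the root dirfrag.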
2295 void MDSRank::command_get_subtrees(Formatter *f)
2296 {
2297 assert(f != NULL);
2298 Mutex::Locker l(mds_lock);
2299
2300 std::list<CDir*> subtrees;
2301 mdcache->list_subtrees(subtrees);
2302
2303 f->open_array_section("subtrees");
2304 for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
2305 const CDir *dir = *i;
2306
2307 f->open_object_section("subtree");
2308 {
2309 f->dump_bool("is_auth", dir->is_auth());
2310 f->dump_int("auth_first", dir->get_dir_auth().first);
2311 f->dump_int("auth_second", dir->get_dir_auth().second);
2312 f->dump_int("export_pin", dir->inode->get_export_pin());
2313 f->open_object_section("dir");
2314 dir->dump(f);
2315 f->close_section();
2316 }
2317 f->close_section();
2318 }
2319 f->close_section();
2320 }
2321
2322
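// "export dir" asok command: migrate the subtree rooted at <path> to another
// active rank.  Illustrative usage:
//   ceph daemon mds.<name> export dir /some/dir 1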
2323 void MDSRank::command_export_dir(Formatter *f,
2324 boost::string_view path,
2325 mds_rank_t target)
2326 {
2327 int r = _command_export_dir(path, target);
2328 f->open_object_section("results");
2329 f->dump_int("return_code", r);
2330 f->close_section(); // results
2331 }
2332
2333 int MDSRank::_command_export_dir(
2334 boost::string_view path,
2335 mds_rank_t target)
2336 {
2337 Mutex::Locker l(mds_lock);
2338 filepath fp(path);
2339
2340 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2341 derr << "bad MDS target " << target << dendl;
2342 return -ENOENT;
2343 }
2344
2345 CInode *in = mdcache->cache_traverse(fp);
2346 if (!in) {
2347     derr << "Bad path '" << path << "'" << dendl;
2348 return -ENOENT;
2349 }
2350 CDir *dir = in->get_dirfrag(frag_t());
2351 if (!dir || !(dir->is_auth())) {
2352     derr << "bad export_dir path: root dirfrag not in cache or not auth" << dendl;
2353 return -EINVAL;
2354 }
2355
2356 mdcache->migrator->export_dir(dir, target);
2357 return 0;
2358 }
2359
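// Resolve the "path" and "frag" arguments of a dirfrag command to a CDir.
// Returns NULL (and explains why in ss) if the inode or frag is not in cache,
// the frag string does not parse, or the dirfrag is not auth here.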
2360 CDir *MDSRank::_command_dirfrag_get(
2361 const cmdmap_t &cmdmap,
2362 std::ostream &ss)
2363 {
2364 std::string path;
2365 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2366 if (!got) {
2367 ss << "missing path argument";
2368 return NULL;
2369 }
2370
2371 std::string frag_str;
2372 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2373 ss << "missing frag argument";
2374 return NULL;
2375 }
2376
2377 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2378 if (!in) {
2379 // TODO really we should load something in if it's not in cache,
2380 // but the infrastructure is harder, and we might still be unable
2381 // to act on it if someone else is auth.
2382 ss << "directory '" << path << "' inode not in cache";
2383 return NULL;
2384 }
2385
2386 frag_t fg;
2387
2388 if (!fg.parse(frag_str.c_str())) {
2389 ss << "frag " << frag_str << " failed to parse";
2390 return NULL;
2391 }
2392
2393 CDir *dir = in->get_dirfrag(fg);
2394 if (!dir) {
2395 ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
2396 "use `dirfrag ls` to see if it should exist)";
2397 return NULL;
2398 }
2399
2400 if (!dir->is_auth()) {
2401 ss << "frag " << dir->dirfrag() << " not auth (auth = "
2402 << dir->authority() << ")";
2403 return NULL;
2404 }
2405
2406 return dir;
2407 }
2408
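// "dirfrag split" asok command: split an in-cache, auth dirfrag by the given
// number of bits ("dirfrag merge" is the inverse).  Requires directory
// fragmentation to be allowed by the MDSMap.  Frags are written as
// <hex value>/<bits>, e.g. the root frag is "0/0".  Illustrative usage:
//   ceph daemon mds.<name> dirfrag split /some/dir 0/0 2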
2409 bool MDSRank::command_dirfrag_split(
2410 cmdmap_t cmdmap,
2411 std::ostream &ss)
2412 {
2413 Mutex::Locker l(mds_lock);
2414 if (!mdsmap->allows_dirfrags()) {
2415 ss << "dirfrags are disallowed by the mds map!";
2416 return false;
2417 }
2418
2419 int64_t by = 0;
2420 if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
2421 ss << "missing bits argument";
2422 return false;
2423 }
2424
2425 if (by <= 0) {
2426 ss << "must split by >0 bits";
2427 return false;
2428 }
2429
2430 CDir *dir = _command_dirfrag_get(cmdmap, ss);
2431 if (!dir) {
2432 return false;
2433 }
2434
2435 mdcache->split_dir(dir, by);
2436
2437 return true;
2438 }
2439
2440 bool MDSRank::command_dirfrag_merge(
2441 cmdmap_t cmdmap,
2442 std::ostream &ss)
2443 {
2444 Mutex::Locker l(mds_lock);
2445 std::string path;
2446 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2447 if (!got) {
2448 ss << "missing path argument";
2449 return false;
2450 }
2451
2452 std::string frag_str;
2453 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2454 ss << "missing frag argument";
2455 return false;
2456 }
2457
2458 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2459 if (!in) {
2460 ss << "directory '" << path << "' inode not in cache";
2461 return false;
2462 }
2463
2464 frag_t fg;
2465 if (!fg.parse(frag_str.c_str())) {
2466 ss << "frag " << frag_str << " failed to parse";
2467 return false;
2468 }
2469
2470 mdcache->merge_dir(in, fg);
2471
2472 return true;
2473 }
2474
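// "dirfrag ls" asok command: list the leaf frags recorded in the inode's
// fragtree (i.e. the frags that should exist), not just those currently in
// cache.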
2475 bool MDSRank::command_dirfrag_ls(
2476 cmdmap_t cmdmap,
2477 std::ostream &ss,
2478 Formatter *f)
2479 {
2480 Mutex::Locker l(mds_lock);
2481 std::string path;
2482 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2483 if (!got) {
2484 ss << "missing path argument";
2485 return false;
2486 }
2487
2488 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2489 if (!in) {
2490 ss << "directory inode not in cache";
2491 return false;
2492 }
2493
2494 f->open_array_section("frags");
2495 std::list<frag_t> frags;
2496 // NB using get_leaves_under instead of get_dirfrags to give
2497 // you the list of what dirfrags may exist, not which are in cache
2498 in->dirfragtree.get_leaves_under(frag_t(), frags);
2499 for (std::list<frag_t>::iterator i = frags.begin();
2500 i != frags.end(); ++i) {
2501 f->open_object_section("frag");
2502 f->dump_int("value", i->value());
2503 f->dump_int("bits", i->bits());
2504 std::ostringstream frag_str;
2505 frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
2506 f->dump_string("str", frag_str.str());
2507 f->close_section();
2508 }
2509 f->close_section();
2510
2511 return true;
2512 }
2513
2514 void MDSRank::dump_status(Formatter *f) const
2515 {
2516 if (state == MDSMap::STATE_REPLAY ||
2517 state == MDSMap::STATE_STANDBY_REPLAY) {
2518 mdlog->dump_replay_status(f);
2519 } else if (state == MDSMap::STATE_RESOLVE) {
2520 mdcache->dump_resolve_status(f);
2521 } else if (state == MDSMap::STATE_RECONNECT) {
2522 server->dump_reconnect_status(f);
2523 } else if (state == MDSMap::STATE_REJOIN) {
2524 mdcache->dump_rejoin_status(f);
2525 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
2526 dump_clientreplay_status(f);
2527 }
2528 f->dump_float("rank_uptime", get_uptime().count());
2529 }
2530
2531 void MDSRank::dump_clientreplay_status(Formatter *f) const
2532 {
2533 f->open_object_section("clientreplay_status");
2534 f->dump_unsigned("clientreplay_queue", replay_queue.size());
2535 f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
2536 f->close_section();
2537 }
2538
2539 void MDSRankDispatcher::update_log_config()
2540 {
2541 map<string,string> log_to_monitors;
2542 map<string,string> log_to_syslog;
2543 map<string,string> log_channel;
2544 map<string,string> log_prio;
2545 map<string,string> log_to_graylog;
2546 map<string,string> log_to_graylog_host;
2547 map<string,string> log_to_graylog_port;
2548 uuid_d fsid;
2549 string host;
2550
2551 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
2552 log_channel, log_prio, log_to_graylog,
2553 log_to_graylog_host, log_to_graylog_port,
2554 fsid, host) == 0)
2555 clog->update_config(log_to_monitors, log_to_syslog,
2556 log_channel, log_prio, log_to_graylog,
2557 log_to_graylog_host, log_to_graylog_port,
2558 fsid, host);
2559 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
2560 }
2561
2562 void MDSRank::create_logger()
2563 {
2564 dout(10) << "create_logger" << dendl;
2565 {
2566 PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
2567
2568 mds_plb.add_u64_counter(
2569 l_mds_request, "request", "Requests", "req",
2570 PerfCountersBuilder::PRIO_CRITICAL);
2571 mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
2572 mds_plb.add_time_avg(
2573 l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
2574 PerfCountersBuilder::PRIO_CRITICAL);
2575 mds_plb.add_u64_counter(
2576 l_mds_forward, "forward", "Forwarding request", "fwd",
2577 PerfCountersBuilder::PRIO_INTERESTING);
2578 mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
2579 mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
2580 mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
2581 mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
2582
2583 mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
2584 mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
2585 PerfCountersBuilder::PRIO_CRITICAL);
2586 mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
2587 mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
2588 mds_plb.add_u64(
2589 l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
2590 mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
2591 mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
2592 mds_plb.add_u64(
2593 l_mds_inodes_with_caps, "inodes_with_caps", "Inodes with capabilities");
2594 mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
2595 PerfCountersBuilder::PRIO_INTERESTING);
2596 mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
2597
2598 mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
2599 mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
2600 mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
2601 "Traverse forwards");
2602 mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
2603 "Traverse directory discovers");
2604 mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
2605     "Traverse incomplete directory content fetches");
2606 mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
2607 "Traverse remote dentries");
2608 mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
2609 "Traverse locks");
2610
2611 mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
2612 mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
2613
2614 mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
2615 mds_plb.add_u64_counter(
2616 l_mds_exported_inodes, "exported_inodes", "Exported inodes", "exi",
2617 PerfCountersBuilder::PRIO_INTERESTING);
2618 mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
2619 mds_plb.add_u64_counter(
2620 l_mds_imported_inodes, "imported_inodes", "Imported inodes", "imi",
2621 PerfCountersBuilder::PRIO_INTERESTING);
2622 logger = mds_plb.create_perf_counters();
2623 g_ceph_context->get_perfcounters_collection()->add(logger);
2624 }
2625
2626 {
2627 PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
2628 mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
2629 PerfCountersBuilder::PRIO_INTERESTING);
2630 mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
2631 mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
2632 mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
2633 mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
2634 mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
2635 mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
2636 PerfCountersBuilder::PRIO_INTERESTING);
2637 mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
2638 mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
2639 mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
2640 mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
2641 mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
2642 mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
2643 mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
2644 mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");
2645 mlogger = mdm_plb.create_perf_counters();
2646 g_ceph_context->get_perfcounters_collection()->add(mlogger);
2647 }
2648
2649 mdlog->create_logger();
2650 server->create_logger();
2651 purge_queue.create_logger();
2652 sessionmap.register_perfcounters();
2653 mdcache->register_perfcounters();
2654 }
2655
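// Forward any slow-request warnings from the op tracker to the cluster log
// and record the current slow-request count.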
2656 void MDSRank::check_ops_in_flight()
2657 {
2658 vector<string> warnings;
2659 int slow = 0;
2660 if (op_tracker.check_ops_in_flight(warnings, &slow)) {
2661 for (vector<string>::iterator i = warnings.begin();
2662 i != warnings.end();
2663 ++i) {
2664 clog->warn() << *i;
2665 }
2666 }
2667
2668 // set mds slow request count
2669 mds_slow_req_count = slow;
2670 return;
2671 }
2672
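// React to a new OSDMap: let the snap server and Server see it, refresh the
// purge queue's op limit, apply any new blacklist entries to client sessions
// (raising the OSD epoch barrier if anyone was evicted), and keep requesting
// maps so FULL-flag changes are seen promptly.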
2673 void MDSRankDispatcher::handle_osd_map()
2674 {
2675 if (is_active() && snapserver) {
2676 snapserver->check_osd_map(true);
2677 }
2678
2679 server->handle_osd_map();
2680
2681 purge_queue.update_op_limit(*mdsmap);
2682
2683 std::set<entity_addr_t> newly_blacklisted;
2684 objecter->consume_blacklist_events(&newly_blacklisted);
2685 auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
2686 dout(4) << "handle_osd_map epoch " << epoch << ", "
2687 << newly_blacklisted.size() << " new blacklist entries" << dendl;
2688 auto victims = server->apply_blacklist(newly_blacklisted);
2689 if (victims) {
2690 set_osd_epoch_barrier(epoch);
2691 }
2692
2693
2694 // By default the objecter only requests OSDMap updates on use,
2695 // we would like to always receive the latest maps in order to
2696 // apply policy based on the FULL flag.
2697 objecter->maybe_request_map();
2698 }
2699
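// Evict a single client session.  If `blacklist` is set we first ask the
// monitors to add the client's address to the OSD blacklist (via an
// "osd blacklist add" mon command), wait for the resulting OSDMap and raise
// the OSD epoch barrier, and only then kill the MDS session.  With
// `wait == true` the call blocks (dropping and retaking mds_lock); otherwise
// the blacklist/kill sequence runs in the background and `on_killed`, if
// provided, fires once the session is gone.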
2700 bool MDSRank::evict_client(int64_t session_id,
2701 bool wait, bool blacklist, std::stringstream& err_ss,
2702 Context *on_killed)
2703 {
2704 assert(mds_lock.is_locked_by_me());
2705
2706 // Mutually exclusive args
2707 assert(!(wait && on_killed != nullptr));
2708
2709 if (is_any_replay()) {
2710 err_ss << "MDS is replaying log";
2711 return false;
2712 }
2713
2714 Session *session = sessionmap.get_session(
2715 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2716 if (!session) {
2717 err_ss << "session " << session_id << " not in sessionmap!";
2718 return false;
2719 }
2720
2721 dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
2722 stringstream ss;
2723 ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
2724 ss << "\"addr\":\"";
2725 ss << session->info.inst.addr;
2726 ss << "\"}";
2727 std::string tmp = ss.str();
2728 std::vector<std::string> cmd = {tmp};
2729
2730 auto kill_mds_session = [this, session_id, on_killed](){
2731 assert(mds_lock.is_locked_by_me());
2732 Session *session = sessionmap.get_session(
2733 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2734 if (session) {
2735 if (on_killed) {
2736 server->kill_session(session, on_killed);
2737 } else {
2738 C_SaferCond on_safe;
2739 server->kill_session(session, &on_safe);
2740
2741 mds_lock.Unlock();
2742 on_safe.wait();
2743 mds_lock.Lock();
2744 }
2745 } else {
2746 dout(1) << "session " << session_id << " was removed while we waited "
2747 "for blacklist" << dendl;
2748
2749 // Even though it wasn't us that removed it, kick our completion
2750 // as the session has been removed.
2751 if (on_killed) {
2752 on_killed->complete(0);
2753 }
2754 }
2755 };
2756
2757 auto background_blacklist = [this, session_id, cmd](std::function<void ()> fn){
2758 assert(mds_lock.is_locked_by_me());
2759
2760 Context *on_blacklist_done = new FunctionContext([this, session_id, fn](int r) {
2761 objecter->wait_for_latest_osdmap(
2762 new C_OnFinisher(
2763 new FunctionContext([this, session_id, fn](int r) {
2764 Mutex::Locker l(mds_lock);
2765 auto epoch = objecter->with_osdmap([](const OSDMap &o){
2766 return o.get_epoch();
2767 });
2768
2769 set_osd_epoch_barrier(epoch);
2770
2771 fn();
2772 }), finisher)
2773 );
2774 });
2775
2776 dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
2777 monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
2778 };
2779
2780 auto blocking_blacklist = [this, cmd, &err_ss, background_blacklist](){
2781 C_SaferCond inline_ctx;
2782 background_blacklist([&inline_ctx](){inline_ctx.complete(0);});
2783 mds_lock.Unlock();
2784 inline_ctx.wait();
2785 mds_lock.Lock();
2786 };
2787
2788 if (wait) {
2789 if (blacklist) {
2790 blocking_blacklist();
2791 }
2792
2793 // We dropped mds_lock, so check that session still exists
2794 session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
2795 session_id));
2796 if (!session) {
2797 dout(1) << "session " << session_id << " was removed while we waited "
2798 "for blacklist" << dendl;
2799 return true;
2800 }
2801 kill_mds_session();
2802 } else {
2803 if (blacklist) {
2804 background_blacklist(kill_mds_session);
2805 } else {
2806 kill_mds_session();
2807 }
2808 }
2809
2810 return true;
2811 }
2812
2813 void MDSRank::bcast_mds_map()
2814 {
2815 dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
2816
2817 // share the map with mounted clients
2818 set<Session*> clients;
2819 sessionmap.get_client_session_set(clients);
2820 for (set<Session*>::const_iterator p = clients.begin();
2821 p != clients.end();
2822 ++p)
2823 (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
2824 last_client_mdsmap_bcast = mdsmap->get_epoch();
2825 }
2826
2827 MDSRankDispatcher::MDSRankDispatcher(
2828 mds_rank_t whoami_,
2829 Mutex &mds_lock_,
2830 LogChannelRef &clog_,
2831 SafeTimer &timer_,
2832 Beacon &beacon_,
2833 MDSMap *& mdsmap_,
2834 Messenger *msgr,
2835 MonClient *monc_,
2836 Context *respawn_hook_,
2837 Context *suicide_hook_)
2838 : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
2839 msgr, monc_, respawn_hook_, suicide_hook_)
2840 {}
2841
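/**
 * Handle "tell"-style commands addressed to this rank: session/client ls,
 * session/client evict, damage ls and damage rm.  Returns false for commands
 * it does not recognise.
 *
 * Illustrative usage (exact CLI syntax may vary by release):
 *   ceph tell mds.<name> session ls
 *   ceph tell mds.<name> damage ls
 */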
2842 bool MDSRankDispatcher::handle_command(
2843 const cmdmap_t &cmdmap,
2844 MCommand *m,
2845 int *r,
2846 std::stringstream *ds,
2847 std::stringstream *ss,
2848 bool *need_reply)
2849 {
2850 assert(r != nullptr);
2851 assert(ds != nullptr);
2852 assert(ss != nullptr);
2853
2854 *need_reply = true;
2855
2856 std::string prefix;
2857 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
2858
2859 if (prefix == "session ls" || prefix == "client ls") {
2860 std::vector<std::string> filter_args;
2861 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2862
2863 SessionFilter filter;
2864 *r = filter.parse(filter_args, ss);
2865 if (*r != 0) {
2866 return true;
2867 }
2868
2869 Formatter *f = new JSONFormatter(true);
2870 dump_sessions(filter, f);
2871 f->flush(*ds);
2872 delete f;
2873 return true;
2874 } else if (prefix == "session evict" || prefix == "client evict") {
2875 std::vector<std::string> filter_args;
2876 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2877
2878 SessionFilter filter;
2879 *r = filter.parse(filter_args, ss);
2880 if (*r != 0) {
2881 return true;
2882 }
2883
2884 evict_clients(filter, m);
2885
2886 *need_reply = false;
2887 return true;
2888 } else if (prefix == "damage ls") {
2889 Formatter *f = new JSONFormatter(true);
2890 damage_table.dump(f);
2891 f->flush(*ds);
2892 delete f;
2893 return true;
2894 } else if (prefix == "damage rm") {
2895 damage_entry_id_t id = 0;
2896 bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
2897 if (!got) {
2898 *r = -EINVAL;
2899 return true;
2900 }
2901
2902 damage_table.erase(id);
2903 return true;
2904 } else {
2905 return false;
2906 }
2907 }
2908
2909 epoch_t MDSRank::get_osd_epoch() const
2910 {
2911 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
2912 }
2913