1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/utility/string_view.hpp>
16
17 #include "common/debug.h"
18 #include "common/errno.h"
19
20 #include "messages/MClientRequestForward.h"
21 #include "messages/MMDSLoadTargets.h"
22 #include "messages/MMDSMap.h"
23 #include "messages/MMDSTableRequest.h"
24 #include "messages/MCommand.h"
25 #include "messages/MCommandReply.h"
26
27 #include "MDSDaemon.h"
28 #include "MDSMap.h"
29 #include "SnapClient.h"
30 #include "SnapServer.h"
31 #include "MDBalancer.h"
32 #include "Locker.h"
33 #include "Server.h"
34 #include "InoTable.h"
35 #include "mon/MonClient.h"
36 #include "common/HeartbeatMap.h"
37 #include "ScrubStack.h"
38
39
40 #include "MDSRank.h"
41
42 #define dout_context g_ceph_context
43 #define dout_subsys ceph_subsys_mds
44 #undef dout_prefix
45 #define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
46
47 MDSRank::MDSRank(
48 mds_rank_t whoami_,
49 Mutex &mds_lock_,
50 LogChannelRef &clog_,
51 SafeTimer &timer_,
52 Beacon &beacon_,
53 MDSMap *& mdsmap_,
54 Messenger *msgr,
55 MonClient *monc_,
56 Context *respawn_hook_,
57 Context *suicide_hook_)
58 :
59 whoami(whoami_), incarnation(0),
60 mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
61 mdsmap(mdsmap_),
62 objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
63 server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
64 balancer(NULL), scrubstack(NULL),
65 damage_table(whoami_),
66 inotable(NULL), snapserver(NULL), snapclient(NULL),
67 sessionmap(this), logger(NULL), mlogger(NULL),
68 op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
69 g_conf->osd_num_op_tracker_shard),
70 last_state(MDSMap::STATE_BOOT),
71 state(MDSMap::STATE_BOOT),
72 cluster_degraded(false), stopping(false),
73 purge_queue(g_ceph_context, whoami_,
74 mdsmap_->get_metadata_pool(), objecter,
75 new FunctionContext(
76 [this](int r){
77 // Purge Queue operates inside mds_lock when we're calling into
78 // it, and outside when in background, so must handle both cases.
79 if (mds_lock.is_locked_by_me()) {
80 damaged();
81 } else {
82 damaged_unlocked();
83 }
84 }
85 )
86 ),
87 progress_thread(this), dispatch_depth(0),
88 hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
89 mds_slow_req_count(0),
90 last_client_mdsmap_bcast(0),
91 messenger(msgr), monc(monc_),
92 respawn_hook(respawn_hook_),
93 suicide_hook(suicide_hook_),
94 standby_replaying(false),
95 starttime(mono_clock::now())
96 {
97 hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());
98
99 purge_queue.update_op_limit(*mdsmap);
100
101 objecter->unset_honor_osdmap_full();
102
103 finisher = new Finisher(cct);
104
105 mdcache = new MDCache(this, purge_queue);
106 mdlog = new MDLog(this);
107 balancer = new MDBalancer(this, messenger, monc);
108
109 scrubstack = new ScrubStack(mdcache, finisher);
110
111 inotable = new InoTable(this);
112 snapserver = new SnapServer(this, monc);
113 snapclient = new SnapClient(this);
114
115 server = new Server(this);
116 locker = new Locker(this, mdcache);
117
118 op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
119 cct->_conf->mds_op_log_threshold);
120 op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
121 cct->_conf->mds_op_history_duration);
122 }
123
124 MDSRank::~MDSRank()
125 {
126 if (hb) {
127 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
128 }
129
130 if (scrubstack) { delete scrubstack; scrubstack = NULL; }
131 if (mdcache) { delete mdcache; mdcache = NULL; }
132 if (mdlog) { delete mdlog; mdlog = NULL; }
133 if (balancer) { delete balancer; balancer = NULL; }
134 if (inotable) { delete inotable; inotable = NULL; }
135 if (snapserver) { delete snapserver; snapserver = NULL; }
136 if (snapclient) { delete snapclient; snapclient = NULL; }
137 if (mdsmap) { delete mdsmap; mdsmap = 0; }
138
139 if (server) { delete server; server = 0; }
140 if (locker) { delete locker; locker = 0; }
141
142 if (logger) {
143 g_ceph_context->get_perfcounters_collection()->remove(logger);
144 delete logger;
145 logger = 0;
146 }
147 if (mlogger) {
148 g_ceph_context->get_perfcounters_collection()->remove(mlogger);
149 delete mlogger;
150 mlogger = 0;
151 }
152
153 delete finisher;
154 finisher = NULL;
155
156 delete suicide_hook;
157 suicide_hook = NULL;
158
159 delete respawn_hook;
160 respawn_hook = NULL;
161
162 delete objecter;
163 objecter = nullptr;
164 }
165
166 void MDSRankDispatcher::init()
167 {
168 objecter->init();
169 messenger->add_dispatcher_head(objecter);
170
171 objecter->start();
172
173 update_log_config();
174 create_logger();
175
176 // Expose the OSDMap (already populated during MDS::init) to anyone
177 // who is interested in it.
178 handle_osd_map();
179
180 progress_thread.create("mds_rank_progr");
181
182 purge_queue.init();
183
184 finisher->start();
185 }
186
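// Reconcile our locally-tracked export targets (decaying counters) with the
// MDSMap's view of them; drop targets whose counters have decayed to ~zero
// and, if anything changed, send an MMDSLoadTargets update to the monitors.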
187 void MDSRank::update_targets(utime_t now)
188 {
189 // get MonMap's idea of my export_targets
190 const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;
191
192 dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;
193
194 bool send = false;
195 set<mds_rank_t> new_map_targets;
196
197 auto it = export_targets.begin();
198 while (it != export_targets.end()) {
199 mds_rank_t rank = it->first;
200 double val = it->second.get(now);
201 dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;
202
203 if (val <= 0.01) {
204 dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
205 export_targets.erase(it++);
206 send = true;
207 continue;
208 }
209 if (!map_targets.count(rank)) {
210 dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
211 send = true;
212 }
213 new_map_targets.insert(rank);
214 it++;
215 }
216 if (new_map_targets.size() < map_targets.size()) {
217 dout(15) << "export target map holds stale targets, sending update" << dendl;
218 send = true;
219 }
220
221 if (send) {
222 dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
223 MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
224 monc->send_mon_message(m);
225 }
226 }
227
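// Bump the decaying counter for an export target. A negative `amount`
// means "keep this target alive" and is replaced with a default increment
// derived from mds_bal_target_decay.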
228 void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
229 {
230 double rate = g_conf->mds_bal_target_decay;
231 if (amount < 0.0) {
232 amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
233 }
234 auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
235 if (em.second) {
236 dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
237 } else {
238 dout(15) << "hit export target " << amount << " @ " << now << dendl;
239 }
240 em.first->second.hit(now, amount);
241 }
242
243 void MDSRankDispatcher::tick()
244 {
245 heartbeat_reset();
246
247 if (beacon.is_laggy()) {
248 dout(5) << "tick bailing out since we seem laggy" << dendl;
249 return;
250 }
251
252 check_ops_in_flight();
253
254 // Wake up thread in case we used to be laggy and have waiting_for_nolaggy
255 // messages to progress.
256 progress_thread.signal();
257
258 // make sure mds log flushes, trims periodically
259 mdlog->flush();
260
261 if (is_active() || is_stopping()) {
262 mdcache->trim();
263 mdcache->trim_client_leases();
264 mdcache->check_memory_usage();
265 mdlog->trim(); // NOT during recovery!
266 }
267
268 // log
269 if (logger) {
270 logger->set(l_mds_subtrees, mdcache->num_subtrees());
271
272 mdcache->log_stat();
273 }
274
275 // ...
276 if (is_clientreplay() || is_active() || is_stopping()) {
277 server->find_idle_sessions();
278 locker->tick();
279 }
280
281 if (is_reconnect())
282 server->reconnect_tick();
283
284 if (is_active()) {
285 balancer->tick();
286 mdcache->find_stale_fragment_freeze();
287 mdcache->migrator->find_stale_export_freeze();
288 if (snapserver)
289 snapserver->check_osd_map(false);
290 }
291
292 if (is_active() || is_stopping()) {
293 update_targets(ceph_clock_now());
294 }
295
296 // shut down?
297 if (is_stopping()) {
298 mdlog->trim();
299 if (mdcache->shutdown_pass()) {
300 uint64_t pq_progress = 0;
301 uint64_t pq_total = 0;
302 size_t pq_in_flight = 0;
303 if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
304 dout(7) << "shutdown_pass=true, but still waiting for purge queue"
305 << dendl;
306 // This takes unbounded time, so we must indicate progress
307 // to the administrator: we do it in a slightly imperfect way
308 // by sending periodic (tick frequency) clog messages while
309 // in this state.
310 clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
311 << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
312 << " files purging" << ")";
313 } else {
314 dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
315 "down:stopped" << dendl;
316 stopping_done();
317 }
318 }
319 else {
320 dout(7) << "shutdown_pass=false" << dendl;
321 }
322 }
323
324 // Expose ourselves to Beacon to update health indicators
325 beacon.notify_health(this);
326 }
327
328 void MDSRankDispatcher::shutdown()
329 {
330 // It should never be possible for shutdown to get called twice, because
331 // anyone picking up mds_lock checks if stopping is true and drops
332 // out if it is.
333 assert(stopping == false);
334 stopping = true;
335
336 dout(1) << __func__ << ": shutting down rank " << whoami << dendl;
337
338 timer.shutdown();
339
340 // MDLog has to shut down before the finisher, because some of its
341 // threads block on IOs that require finisher to complete.
342 mdlog->shutdown();
343
344 // shut down cache
345 mdcache->shutdown();
346
347 purge_queue.shutdown();
348
349 mds_lock.Unlock();
350 finisher->stop(); // no flushing
351 mds_lock.Lock();
352
353 if (objecter->initialized)
354 objecter->shutdown();
355
356 monc->shutdown();
357
358 op_tracker.on_shutdown();
359
360 progress_thread.shutdown();
361
362 // release mds_lock for finisher/messenger threads (e.g.
363 // MDSDaemon::ms_handle_reset called from Messenger).
364 mds_lock.Unlock();
365
366 // shut down messenger
367 messenger->shutdown();
368
369 mds_lock.Lock();
370
371 // Workaround unclean shutdown: HeartbeatMap will assert if
372 // worker is not removed (as we do in ~MDS), but ~MDS is not
373 // always called after suicide.
374 if (hb) {
375 g_ceph_context->get_heartbeat_map()->remove_worker(hb);
376 hb = NULL;
377 }
378 }
379
380 /**
381 * Helper for simple callbacks that call a void fn with no args.
382 */
383 class C_MDS_VoidFn : public MDSInternalContext
384 {
385 typedef void (MDSRank::*fn_ptr)();
386 protected:
387 fn_ptr fn;
388 public:
389 C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
390 : MDSInternalContext(mds_), fn(fn_)
391 {
392 assert(mds_);
393 assert(fn_);
394 }
395
396 void finish(int r) override
397 {
398 (mds->*fn)();
399 }
400 };
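// Typical usage (as seen later in this file): resolve_start() passes
// new C_MDS_VoidFn(this, &MDSRank::resolve_done) into MDCache so that
// resolve_done() runs once the asynchronous resolve step completes.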
401
402 int64_t MDSRank::get_metadata_pool()
403 {
404 return mdsmap->get_metadata_pool();
405 }
406
407 MDSTableClient *MDSRank::get_table_client(int t)
408 {
409 switch (t) {
410 case TABLE_ANCHOR: return NULL;
411 case TABLE_SNAP: return snapclient;
412 default: ceph_abort();
413 }
414 }
415
416 MDSTableServer *MDSRank::get_table_server(int t)
417 {
418 switch (t) {
419 case TABLE_ANCHOR: return NULL;
420 case TABLE_SNAP: return snapserver;
421 default: ceph_abort();
422 }
423 }
424
425 void MDSRank::suicide()
426 {
427 if (suicide_hook) {
428 suicide_hook->complete(0);
429 suicide_hook = NULL;
430 }
431 }
432
433 void MDSRank::respawn()
434 {
435 if (respawn_hook) {
436 respawn_hook->complete(0);
437 respawn_hook = NULL;
438 }
439 }
440
441 void MDSRank::damaged()
442 {
443 assert(whoami != MDS_RANK_NONE);
444 assert(mds_lock.is_locked_by_me());
445
446 beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
447 monc->flush_log(); // Flush any clog error from before we were called
448 beacon.notify_health(this); // Include latest status in our swan song
449 beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);
450
451 // It's okay if we timed out and the mon didn't get our beacon, because
452 // another daemon (or ourselves after respawn) will eventually take the
453 // rank and report DAMAGED again when it hits the same problem we did.
454
455 respawn(); // Respawn into standby in case mon has other work for us
456 }
457
458 void MDSRank::damaged_unlocked()
459 {
460 Mutex::Locker l(mds_lock);
461 damaged();
462 }
463
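// React to a metadata write error according to mds_action_on_write_error:
// being blacklisted always respawns; otherwise >=2 respawns, ==1 forces the
// cache read-only, and 0 just logs the error and carries on.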
464 void MDSRank::handle_write_error(int err)
465 {
466 if (err == -EBLACKLISTED) {
467 derr << "we have been blacklisted (fenced), respawning..." << dendl;
468 respawn();
469 return;
470 }
471
472 if (g_conf->mds_action_on_write_error >= 2) {
473 derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
474 respawn();
475 } else if (g_conf->mds_action_on_write_error == 1) {
476 derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
477 mdcache->force_readonly();
478 } else {
479 // ignore;
480 derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
481 }
482 }
483
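// Progress thread main loop: under mds_lock, sleep until there are finished
// contexts to run or deferred (waiting_for_nolaggy) messages that can now be
// processed, then drain them via _advance_queues().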
484 void *MDSRank::ProgressThread::entry()
485 {
486 Mutex::Locker l(mds->mds_lock);
487 while (true) {
488 while (!mds->stopping &&
489 mds->finished_queue.empty() &&
490 (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
491 cond.Wait(mds->mds_lock);
492 }
493
494 if (mds->stopping) {
495 break;
496 }
497
498 mds->_advance_queues();
499 }
500
501 return NULL;
502 }
503
504
505 void MDSRank::ProgressThread::shutdown()
506 {
507 assert(mds->mds_lock.is_locked_by_me());
508 assert(mds->stopping);
509
510 if (am_self()) {
511 // Stopping is set, we will fall out of our main loop naturally
512 } else {
513 // Kick the thread to notice mds->stopping, and join it
514 cond.Signal();
515 mds->mds_lock.Unlock();
516 if (is_started())
517 join();
518 mds->mds_lock.Lock();
519 }
520 }
521
522 bool MDSRankDispatcher::ms_dispatch(Message *m)
523 {
524 bool ret;
525 inc_dispatch_depth();
526 ret = _dispatch(m, true);
527 dec_dispatch_depth();
528 return ret;
529 }
530
531 /* If this function returns true, it recognizes the message and has taken the
532 * reference. If it returns false, it has done neither. */
533 bool MDSRank::_dispatch(Message *m, bool new_msg)
534 {
535 if (is_stale_message(m)) {
536 m->put();
537 return true;
538 }
539
540 if (beacon.is_laggy()) {
541 dout(10) << " laggy, deferring " << *m << dendl;
542 waiting_for_nolaggy.push_back(m);
543 } else if (new_msg && !waiting_for_nolaggy.empty()) {
544 dout(10) << " there are deferred messages, deferring " << *m << dendl;
545 waiting_for_nolaggy.push_back(m);
546 } else {
547 if (!handle_deferrable_message(m)) {
548 dout(0) << "unrecognized message " << *m << dendl;
549 return false;
550 }
551
552 heartbeat_reset();
553 }
554
555 if (dispatch_depth > 1)
556 return true;
557
558 // finish any triggered contexts
559 _advance_queues();
560
561 if (beacon.is_laggy()) {
562 // We've gone laggy during dispatch, don't do any
563 // more housekeeping
564 return true;
565 }
566
567 // done with all client replayed requests?
568 if (is_clientreplay() &&
569 mdcache->is_open() &&
570 replay_queue.empty() &&
571 beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
572 int num_requests = mdcache->get_num_client_requests();
573 dout(10) << " still have " << num_requests << " active replay requests" << dendl;
574 if (num_requests == 0)
575 clientreplay_done();
576 }
577
578 // hack: thrash exports
579 static utime_t start;
580 utime_t now = ceph_clock_now();
581 if (start == utime_t())
582 start = now;
583 /*double el = now - start;
584 if (el > 30.0 &&
585 el < 60.0)*/
586 for (int i=0; i<g_conf->mds_thrash_exports; i++) {
587 set<mds_rank_t> s;
588 if (!is_active()) break;
589 mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
590 if (s.size() < 2 || CInode::count() < 10)
591 break; // need peers for this to work.
592 if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
593 mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
594 break;
595
596 dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;
597
598 // pick a random dir inode
599 CInode *in = mdcache->hack_pick_random_inode();
600
601 list<CDir*> ls;
602 in->get_dirfrags(ls);
603 if (!ls.empty()) { // must be an open dir.
604 list<CDir*>::iterator p = ls.begin();
605 int n = rand() % ls.size();
606 while (n--)
607 ++p;
608 CDir *dir = *p;
609 if (!dir->get_parent_dir()) continue; // must be linked.
610 if (!dir->is_auth()) continue; // must be auth.
611
612 mds_rank_t dest;
613 do {
614 int k = rand() % s.size();
615 set<mds_rank_t>::iterator p = s.begin();
616 while (k--) ++p;
617 dest = *p;
618 } while (dest == whoami);
619 mdcache->migrator->export_dir_nicely(dir,dest);
620 }
621 }
622 // hack: thrash fragments
623 for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
624 if (!is_active()) break;
625 if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
626 dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;
627
628 // pick a random dir inode
629 CInode *in = mdcache->hack_pick_random_inode();
630
631 list<CDir*> ls;
632 in->get_dirfrags(ls);
633 if (ls.empty()) continue; // must be an open dir.
634 CDir *dir = ls.front();
635 if (!dir->get_parent_dir()) continue; // must be linked.
636 if (!dir->is_auth()) continue; // must be auth.
637 frag_t fg = dir->get_frag();
638 if (mdsmap->allows_dirfrags()) {
639 if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
640 mdcache->split_dir(dir, 1);
641 } else {
642 balancer->queue_merge(dir);
643 }
644 }
645 }
646
647 // hack: force hash root?
648 /*
649 if (false &&
650 mdcache->get_root() &&
651 mdcache->get_root()->dir &&
652 !(mdcache->get_root()->dir->is_hashed() ||
653 mdcache->get_root()->dir->is_hashing())) {
654 dout(0) << "hashing root" << dendl;
655 mdcache->migrator->hash_dir(mdcache->get_root()->dir);
656 }
657 */
658
659 update_mlogger();
660 return true;
661 }
662
663 void MDSRank::update_mlogger()
664 {
665 if (mlogger) {
666 mlogger->set(l_mdm_ino, CInode::count());
667 mlogger->set(l_mdm_dir, CDir::count());
668 mlogger->set(l_mdm_dn, CDentry::count());
669 mlogger->set(l_mdm_cap, Capability::count());
670 mlogger->set(l_mdm_inoa, CInode::increments());
671 mlogger->set(l_mdm_inos, CInode::decrements());
672 mlogger->set(l_mdm_dira, CDir::increments());
673 mlogger->set(l_mdm_dirs, CDir::decrements());
674 mlogger->set(l_mdm_dna, CDentry::increments());
675 mlogger->set(l_mdm_dns, CDentry::decrements());
676 mlogger->set(l_mdm_capa, Capability::increments());
677 mlogger->set(l_mdm_caps, Capability::decrements());
678 mlogger->set(l_mdm_buf, buffer::get_total_alloc());
679 }
680 }
681
682 /*
683 * lower priority messages we defer if we seem laggy
684 */
685 bool MDSRank::handle_deferrable_message(Message *m)
686 {
687 int port = m->get_type() & 0xff00;
688
689 switch (port) {
690 case MDS_PORT_CACHE:
691 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
692 mdcache->dispatch(m);
693 break;
694
695 case MDS_PORT_MIGRATOR:
696 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
697 mdcache->migrator->dispatch(m);
698 break;
699
700 default:
701 switch (m->get_type()) {
702 // SERVER
703 case CEPH_MSG_CLIENT_SESSION:
704 case CEPH_MSG_CLIENT_RECONNECT:
705 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
706 // fall-thru
707 case CEPH_MSG_CLIENT_REQUEST:
708 server->dispatch(m);
709 break;
710 case MSG_MDS_SLAVE_REQUEST:
711 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
712 server->dispatch(m);
713 break;
714
715 case MSG_MDS_HEARTBEAT:
716 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
717 balancer->proc_message(m);
718 break;
719
720 case MSG_MDS_TABLE_REQUEST:
721 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
722 {
723 MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
724 if (req->op < 0) {
725 MDSTableClient *client = get_table_client(req->table);
726 client->handle_request(req);
727 } else {
728 MDSTableServer *server = get_table_server(req->table);
729 server->handle_request(req);
730 }
731 }
732 break;
733
734 case MSG_MDS_LOCK:
735 case MSG_MDS_INODEFILECAPS:
736 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
737 locker->dispatch(m);
738 break;
739
740 case CEPH_MSG_CLIENT_CAPS:
741 case CEPH_MSG_CLIENT_CAPRELEASE:
742 case CEPH_MSG_CLIENT_LEASE:
743 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
744 locker->dispatch(m);
745 break;
746
747 default:
748 return false;
749 }
750 }
751
752 return true;
753 }
754
755 /**
756 * Advance finished_queue and waiting_for_nolaggy.
757 *
758 * Usually drain both queues, but may not drain waiting_for_nolaggy
759 * if beacon is currently laggy.
760 */
761 void MDSRank::_advance_queues()
762 {
763 assert(mds_lock.is_locked_by_me());
764
765 while (!finished_queue.empty()) {
766 dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
767 dout(10) << finished_queue << dendl;
768 list<MDSInternalContextBase*> ls;
769 ls.swap(finished_queue);
770 while (!ls.empty()) {
771 dout(10) << " finish " << ls.front() << dendl;
772 ls.front()->complete(0);
773 ls.pop_front();
774
775 heartbeat_reset();
776 }
777 }
778
779 while (!waiting_for_nolaggy.empty()) {
780 // stop if we're laggy now!
781 if (beacon.is_laggy())
782 break;
783
784 Message *old = waiting_for_nolaggy.front();
785 waiting_for_nolaggy.pop_front();
786
787 if (is_stale_message(old)) {
788 old->put();
789 } else {
790 dout(7) << " processing laggy deferred " << *old << dendl;
791 if (!handle_deferrable_message(old)) {
792 dout(0) << "unrecognized message " << *old << dendl;
793 old->put();
794 }
795 }
796
797 heartbeat_reset();
798 }
799 }
800
801 /**
802 * Call this when you take mds_lock, or periodically if you're going to
803 * hold the lock for a long time (e.g. iterating over clients/inodes)
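*
* For example (a sketch, not an actual caller in this file): a long loop
* over sessions might call mds->heartbeat_reset() every few hundred
* iterations so the HeartbeatMap timeout keeps being pushed out while
* mds_lock is held.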
804 */
805 void MDSRank::heartbeat_reset()
806 {
807 // Any thread might jump into mds_lock and call us immediately
808 // after a call to suicide() completes, in which case MDSRank::hb
809 // has been freed and we are a no-op.
810 if (!hb) {
811 assert(stopping);
812 return;
813 }
814
815 // NB not enabling suicide grace, because the mon takes care of killing us
816 // (by blacklisting us) when we fail to send beacons, and it's simpler to
817 // only have one way of dying.
818 g_ceph_context->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
819 }
820
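// Returns true if a message came from an MDS instance that the current map
// considers down, missing or replaced; MDSMaps and cache_expire messages
// from such senders are still examined rather than dropped.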
821 bool MDSRank::is_stale_message(Message *m) const
822 {
823 // from bad mds?
824 if (m->get_source().is_mds()) {
825 mds_rank_t from = mds_rank_t(m->get_source().num());
826 if (!mdsmap->have_inst(from) ||
827 mdsmap->get_inst(from) != m->get_source_inst() ||
828 mdsmap->is_down(from)) {
829 // bogus mds?
830 if (m->get_type() == CEPH_MSG_MDS_MAP) {
831 dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
832 << ", but it's an mdsmap, looking at it" << dendl;
833 } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
834 mdsmap->get_inst(from) == m->get_source_inst()) {
835 dout(5) << "got " << *m << " from down mds " << m->get_source()
836 << ", but it's a cache_expire, looking at it" << dendl;
837 } else {
838 dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
839 << ", dropping" << dendl;
840 return true;
841 }
842 }
843 }
844 return false;
845 }
846
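// Resolve the Session attached to a message's Connection. If the connection
// still carries a closed bootstrap session but an open session was imported
// for the same client, switch the connection over to the imported session
// and flush any messages queued while it was pre-open.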
847 Session *MDSRank::get_session(Message *m)
848 {
849 Session *session = static_cast<Session *>(m->get_connection()->get_priv());
850 if (session) {
851 session->put(); // do not carry ref
852 dout(20) << "get_session have " << session << " " << session->info.inst
853 << " state " << session->get_state_name() << dendl;
854 // Check if we've imported an open session since (new sessions start closed)
855 if (session->is_closed()) {
856 Session *imported_session = sessionmap.get_session(session->info.inst.name);
857 if (imported_session && imported_session != session) {
858 dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
859 imported_session->info.auth_name = session->info.auth_name;
860 //assert(session->info.auth_name == imported_session->info.auth_name);
861 assert(session->info.inst == imported_session->info.inst);
862 imported_session->connection = session->connection;
863 // send out any queued messages
864 while (!session->preopen_out_queue.empty()) {
865 imported_session->connection->send_message(session->preopen_out_queue.front());
866 session->preopen_out_queue.pop_front();
867 }
868 imported_session->auth_caps = session->auth_caps;
869 assert(session->get_nref() == 1);
870 imported_session->connection->set_priv(imported_session->get());
871 session = imported_session;
872 }
873 }
874 } else {
875 dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
876 }
877 return session;
878 }
879
880 void MDSRank::send_message(Message *m, Connection *c)
881 {
882 assert(c);
883 c->send_message(m);
884 }
885
886
887 void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
888 {
889 if (!mdsmap->is_up(mds)) {
890 dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
891 m->put();
892 return;
893 }
894
895 // send mdsmap first?
896 if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
897 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
898 mdsmap->get_inst(mds));
899 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
900 }
901
902 // send message
903 messenger->send_message(m, mdsmap->get_inst(mds));
904 }
905
906 void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
907 {
908 assert(mds != whoami);
909
910 // client request?
911 if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
912 (static_cast<MClientRequest*>(m))->get_source().is_client()) {
913 MClientRequest *creq = static_cast<MClientRequest*>(m);
914 creq->inc_num_fwd(); // inc forward counter
915
916 /*
917 * don't actually forward if non-idempotent!
918 * client has to do it. although the MDS will ignore duplicate requests,
919 * the affected metadata may migrate, in which case the new authority
920 * won't have the metareq_id in the completed request map.
921 */
922 // NEW: always make the client resend!
923 bool client_must_resend = true; //!creq->can_forward();
924
925 // tell the client where it should go
926 messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
927 client_must_resend),
928 creq->get_source_inst());
929
930 if (client_must_resend) {
931 m->put();
932 return;
933 }
934 }
935
936 // these are the only types of messages we should be 'forwarding'; they
937 // explicitly encode their source mds, which gets clobbered when we resend
938 // them here.
939 assert(m->get_type() == MSG_MDS_DIRUPDATE ||
940 m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);
941
942 // send mdsmap first?
943 if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
944 messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
945 mdsmap->get_inst(mds));
946 peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
947 }
948
949 messenger->send_message(m, mdsmap->get_inst(mds));
950 }
951
952
953
954 void MDSRank::send_message_client_counted(Message *m, client_t client)
955 {
956 Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
957 if (session) {
958 send_message_client_counted(m, session);
959 } else {
960 dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
961 }
962 }
963
964 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
965 {
966 Session *session = static_cast<Session *>(connection->get_priv());
967 if (session) {
968 session->put(); // do not carry ref
969 send_message_client_counted(m, session);
970 } else {
971 dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
972 // another Connection took over the Session
973 }
974 }
975
976 void MDSRank::send_message_client_counted(Message *m, Session *session)
977 {
978 version_t seq = session->inc_push_seq();
979 dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
980 << seq << " " << *m << dendl;
981 if (session->connection) {
982 session->connection->send_message(m);
983 } else {
984 session->preopen_out_queue.push_back(m);
985 }
986 }
987
988 void MDSRank::send_message_client(Message *m, Session *session)
989 {
990 dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
991 if (session->connection) {
992 session->connection->send_message(m);
993 } else {
994 session->preopen_out_queue.push_back(m);
995 }
996 }
997
998 /**
999 * This is used whenever a RADOS operation has been cancelled
1000 * or a RADOS client has been blacklisted, to cause the MDS and
1001 * any clients to wait for this OSD epoch before using any new caps.
1002 *
1003 * See doc/cephfs/eviction
1004 */
1005 void MDSRank::set_osd_epoch_barrier(epoch_t e)
1006 {
1007 dout(4) << __func__ << ": epoch=" << e << dendl;
1008 osd_epoch_barrier = e;
1009 }
1010
1011 void MDSRank::retry_dispatch(Message *m)
1012 {
1013 inc_dispatch_depth();
1014 _dispatch(m, false);
1015 dec_dispatch_depth();
1016 }
1017
1018 utime_t MDSRank::get_laggy_until() const
1019 {
1020 return beacon.get_laggy_until();
1021 }
1022
1023 bool MDSRank::is_daemon_stopping() const
1024 {
1025 return stopping;
1026 }
1027
1028 void MDSRank::request_state(MDSMap::DaemonState s)
1029 {
1030 dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
1031 beacon.set_want_state(mdsmap, s);
1032 beacon.send();
1033 }
1034
1035
1036 class C_MDS_BootStart : public MDSInternalContext {
1037 MDSRank::BootStep nextstep;
1038 public:
1039 C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
1040 : MDSInternalContext(m), nextstep(n) {}
1041 void finish(int r) override {
1042 mds->boot_start(nextstep, r);
1043 }
1044 };
1045
1046
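// Multi-stage boot/replay sequence. Each BootStep kicks off asynchronous
// loads (inotable, sessionmap, mds log, purge queue, snap table, base
// inodes) and re-enters boot_start() with the next step once the gather
// completes; errors are routed to respawn(), damaged() or suicide() above.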
1047 void MDSRank::boot_start(BootStep step, int r)
1048 {
1049 // Handle errors from previous step
1050 if (r < 0) {
1051 if (is_standby_replay() && (r == -EAGAIN)) {
1052 dout(0) << "boot_start encountered an error EAGAIN"
1053 << ", respawning since we fell behind journal" << dendl;
1054 respawn();
1055 } else if (r == -EINVAL || r == -ENOENT) {
1056 // Invalid or absent data, indicates damaged on-disk structures
1057 clog->error() << "Error loading MDS rank " << whoami << ": "
1058 << cpp_strerror(r);
1059 damaged();
1060 assert(r == 0); // Unreachable, damaged() calls respawn()
1061 } else {
1062 // Completely unexpected error, give up and die
1063 dout(0) << "boot_start encountered an error, failing" << dendl;
1064 suicide();
1065 return;
1066 }
1067 }
1068
1069 assert(is_starting() || is_any_replay());
1070
1071 switch(step) {
1072 case MDS_BOOT_INITIAL:
1073 {
1074 mdcache->init_layouts();
1075
1076 MDSGatherBuilder gather(g_ceph_context,
1077 new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
1078 dout(2) << "boot_start " << step << ": opening inotable" << dendl;
1079 inotable->set_rank(whoami);
1080 inotable->load(gather.new_sub());
1081
1082 dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
1083 sessionmap.set_rank(whoami);
1084 sessionmap.load(gather.new_sub());
1085
1086 dout(2) << "boot_start " << step << ": opening mds log" << dendl;
1087 mdlog->open(gather.new_sub());
1088
1089 if (is_starting()) {
1090 dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
1091 purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
1092 } else if (!standby_replaying) {
1093 dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
1094 purge_queue.open(NULL);
1095 }
1096
1097 if (mdsmap->get_tableserver() == whoami) {
1098 dout(2) << "boot_start " << step << ": opening snap table" << dendl;
1099 snapserver->set_rank(whoami);
1100 snapserver->load(gather.new_sub());
1101 }
1102
1103 gather.activate();
1104 }
1105 break;
1106 case MDS_BOOT_OPEN_ROOT:
1107 {
1108 dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;
1109
1110 MDSGatherBuilder gather(g_ceph_context,
1111 new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
1112
1113 if (is_starting()) {
1114 // load mydir frag for the first log segment (creating subtree map)
1115 mdcache->open_mydir_frag(gather.new_sub());
1116 } else {
1117 mdcache->open_mydir_inode(gather.new_sub());
1118 }
1119
1120 if (whoami == mdsmap->get_root()) { // load root inode off disk if we are auth
1121 mdcache->open_root_inode(gather.new_sub());
1122 } else if (is_any_replay()) {
1123 // replay. make up fake root inode to start with
1124 mdcache->create_root_inode();
1125 }
1126 gather.activate();
1127 }
1128 break;
1129 case MDS_BOOT_PREPARE_LOG:
1130 if (is_any_replay()) {
1131 dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
1132 MDSGatherBuilder gather(g_ceph_context,
1133 new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1134
1135 if (!standby_replaying) {
1136 dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
1137 purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
1138 }
1139
1140 mdlog->replay(gather.new_sub());
1141 gather.activate();
1142 } else {
1143 dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
1144 mdlog->append();
1145 starting_done();
1146 }
1147 break;
1148 case MDS_BOOT_REPLAY_DONE:
1149 assert(is_any_replay());
1150
1151 // Sessiontable and inotable should be in sync after replay, validate
1152 // that they are consistent.
1153 validate_sessions();
1154
1155 replay_done();
1156 break;
1157 }
1158 }
1159
1160 void MDSRank::validate_sessions()
1161 {
1162 assert(mds_lock.is_locked_by_me());
1163 bool valid = true;
1164
1165 // Identify any sessions whose state is inconsistent with other metadata,
1166 // after they have been loaded from rados during startup.
1167 // Mitigate bugs like: http://tracker.ceph.com/issues/16842
1168 const auto &sessions = sessionmap.get_sessions();
1169 for (const auto &i : sessions) {
1170 Session *session = i.second;
1171 interval_set<inodeno_t> badones;
1172 if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
1173 clog->error() << "client " << *session
1174 << "loaded with preallocated inodes that are inconsistent with inotable";
1175 valid = false;
1176 }
1177 }
1178
1179 if (!valid) {
1180 damaged();
1181 assert(valid);
1182 }
1183 }
1184
1185 void MDSRank::starting_done()
1186 {
1187 dout(3) << "starting_done" << dendl;
1188 assert(is_starting());
1189 request_state(MDSMap::STATE_ACTIVE);
1190
1191 mdlog->start_new_segment();
1192 }
1193
1194
1195 void MDSRank::calc_recovery_set()
1196 {
1197 // initialize gather sets
1198 set<mds_rank_t> rs;
1199 mdsmap->get_recovery_mds_set(rs);
1200 rs.erase(whoami);
1201 mdcache->set_recovery_set(rs);
1202
1203 dout(1) << " recovery set is " << rs << dendl;
1204 }
1205
1206
1207 void MDSRank::replay_start()
1208 {
1209 dout(1) << "replay_start" << dendl;
1210
1211 if (is_standby_replay())
1212 standby_replaying = true;
1213
1214 calc_recovery_set();
1215
1216 // Check if we need to wait for a newer OSD map before starting
1217 Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
1218 bool const ready = objecter->wait_for_map(
1219 mdsmap->get_last_failure_osd_epoch(),
1220 fin);
1221
1222 if (ready) {
1223 delete fin;
1224 boot_start();
1225 } else {
1226 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1227 << " (which blacklists prior instance)" << dendl;
1228 }
1229 }
1230
1231
1232 class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
1233 uint64_t old_read_pos;
1234 public:
1235 C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
1236 MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
1237 void finish(int r) override {
1238 mds->_standby_replay_restart_finish(r, old_read_pos);
1239 }
1240 };
1241
1242 void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
1243 {
1244 if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
1245 dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
1246 respawn(); /* we're too far back, and this is easier than
1247 trying to reset everything in the cache, etc */
1248 } else {
1249 mdlog->standby_trim_segments();
1250 boot_start(MDS_BOOT_PREPARE_LOG, r);
1251 }
1252 }
1253
1254 class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
1255 public:
1256 explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
1257 void finish(int r) override {
1258 assert(!r);
1259 mds->standby_replay_restart();
1260 }
1261 };
1262
1263 void MDSRank::standby_replay_restart()
1264 {
1265 if (standby_replaying) {
1266 /* Go around for another pass of replaying in standby */
1267 dout(4) << "standby_replay_restart (as standby)" << dendl;
1268 mdlog->get_journaler()->reread_head_and_probe(
1269 new C_MDS_StandbyReplayRestartFinish(
1270 this,
1271 mdlog->get_journaler()->get_read_pos()));
1272 } else {
1273 /* We are transitioning out of standby: wait for OSD map update
1274 before making final pass */
1275 dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
1276 Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
1277 bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
1278 if (ready) {
1279 delete fin;
1280 mdlog->get_journaler()->reread_head_and_probe(
1281 new C_MDS_StandbyReplayRestartFinish(
1282 this,
1283 mdlog->get_journaler()->get_read_pos()));
1284
1285 dout(1) << " opening purge queue (async)" << dendl;
1286 purge_queue.open(NULL);
1287 } else {
1288 dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
1289 << " (which blacklists prior instance)" << dendl;
1290 }
1291 }
1292 }
1293
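// Called when journal replay finishes. In standby-replay we just schedule
// another replay pass; on a real takeover we make the journal writeable
// (reformatting it first if its on-disk format is old) and then move to
// reconnect or resolve depending on whether other ranks are in the map.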
1294 void MDSRank::replay_done()
1295 {
1296 dout(1) << "replay_done" << (standby_replaying ? " (as standby)" : "") << dendl;
1297
1298 if (is_standby_replay()) {
1299 // The replay was done in standby state, and we are still in that state
1300 assert(standby_replaying);
1301 dout(10) << "setting replay timer" << dendl;
1302 timer.add_event_after(g_conf->mds_replay_interval,
1303 new C_MDS_StandbyReplayRestart(this));
1304 return;
1305 } else if (standby_replaying) {
1306 // The replay was done in standby state, we have now _left_ that state
1307 dout(10) << " last replay pass was as a standby; making final pass" << dendl;
1308 standby_replaying = false;
1309 standby_replay_restart();
1310 return;
1311 } else {
1312 // Replay is complete, journal read should be up to date
1313 assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
1314 assert(!is_standby_replay());
1315
1316 // Reformat and come back here
1317 if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
1318 dout(4) << "reformatting journal on standbyreplay->replay transition" << dendl;
1319 mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
1320 return;
1321 }
1322 }
1323
1324 dout(1) << "making mds journal writeable" << dendl;
1325 mdlog->get_journaler()->set_writeable();
1326 mdlog->get_journaler()->trim_tail();
1327
1328 if (g_conf->mds_wipe_sessions) {
1329 dout(1) << "wiping out client sessions" << dendl;
1330 sessionmap.wipe();
1331 sessionmap.save(new C_MDSInternalNoop);
1332 }
1333 if (g_conf->mds_wipe_ino_prealloc) {
1334 dout(1) << "wiping out ino prealloc from sessions" << dendl;
1335 sessionmap.wipe_ino_prealloc();
1336 sessionmap.save(new C_MDSInternalNoop);
1337 }
1338 if (g_conf->mds_skip_ino) {
1339 inodeno_t i = g_conf->mds_skip_ino;
1340 dout(1) << "skipping " << i << " inodes" << dendl;
1341 inotable->skip_inos(i);
1342 inotable->save(new C_MDSInternalNoop);
1343 }
1344
1345 if (mdsmap->get_num_in_mds() == 1 &&
1346 mdsmap->get_num_failed_mds() == 0) { // just me!
1347 dout(2) << "i am alone, moving to state reconnect" << dendl;
1348 request_state(MDSMap::STATE_RECONNECT);
1349 } else {
1350 dout(2) << "i am not alone, moving to state resolve" << dendl;
1351 request_state(MDSMap::STATE_RESOLVE);
1352 }
1353 }
1354
1355 void MDSRank::reopen_log()
1356 {
1357 dout(1) << "reopen_log" << dendl;
1358 mdcache->rollback_uncommitted_fragments();
1359 }
1360
1361
1362 void MDSRank::resolve_start()
1363 {
1364 dout(1) << "resolve_start" << dendl;
1365
1366 reopen_log();
1367
1368 mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
1369 finish_contexts(g_ceph_context, waiting_for_resolve);
1370 }
1371 void MDSRank::resolve_done()
1372 {
1373 dout(1) << "resolve_done" << dendl;
1374 request_state(MDSMap::STATE_RECONNECT);
1375 }
1376
1377 void MDSRank::reconnect_start()
1378 {
1379 dout(1) << "reconnect_start" << dendl;
1380
1381 if (last_state == MDSMap::STATE_REPLAY) {
1382 reopen_log();
1383 }
1384
1385 // Drop any blacklisted clients from the SessionMap before going
1386 // into reconnect, so that we don't wait for them.
1387 objecter->enable_blacklist_events();
1388 std::set<entity_addr_t> blacklist;
1389 epoch_t epoch = 0;
1390 objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
1391 o.get_blacklist(&blacklist);
1392 epoch = o.get_epoch();
1393 });
1394 auto killed = server->apply_blacklist(blacklist);
1395 dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
1396 << blacklist.size() << " blacklist entries, "
1397 << sessionmap.get_sessions().size() << ")" << dendl;
1398 if (killed) {
1399 set_osd_epoch_barrier(epoch);
1400 }
1401
1402 server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
1403 finish_contexts(g_ceph_context, waiting_for_reconnect);
1404 }
1405 void MDSRank::reconnect_done()
1406 {
1407 dout(1) << "reconnect_done" << dendl;
1408 request_state(MDSMap::STATE_REJOIN); // move to rejoin state
1409 }
1410
1411 void MDSRank::rejoin_joint_start()
1412 {
1413 dout(1) << "rejoin_joint_start" << dendl;
1414 mdcache->rejoin_send_rejoins();
1415 }
1416 void MDSRank::rejoin_start()
1417 {
1418 dout(1) << "rejoin_start" << dendl;
1419 mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
1420 }
1421 void MDSRank::rejoin_done()
1422 {
1423 dout(1) << "rejoin_done" << dendl;
1424 mdcache->show_subtrees();
1425 mdcache->show_cache();
1426
1427 // funny case: is our cache empty? no subtrees?
1428 if (!mdcache->is_subtrees()) {
1429 if (whoami == 0) {
1430 // The root should always have a subtree!
1431 clog->error() << "No subtrees found for root MDS rank!";
1432 damaged();
1433 assert(mdcache->is_subtrees());
1434 } else {
1435 dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
1436 request_state(MDSMap::STATE_STOPPED);
1437 }
1438 return;
1439 }
1440
1441 if (replay_queue.empty())
1442 request_state(MDSMap::STATE_ACTIVE);
1443 else
1444 request_state(MDSMap::STATE_CLIENTREPLAY);
1445 }
1446
1447 void MDSRank::clientreplay_start()
1448 {
1449 dout(1) << "clientreplay_start" << dendl;
1450 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1451 mdcache->start_files_to_recover();
1452 queue_one_replay();
1453 }
1454
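// Kick the next queued client replay request, if any. Once the queue is
// empty, wait for the journal to be safe and then finish via
// clientreplay_done(); returns false when nothing was queued.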
1455 bool MDSRank::queue_one_replay()
1456 {
1457 if (replay_queue.empty()) {
1458 mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
1459 return false;
1460 }
1461 queue_waiter(replay_queue.front());
1462 replay_queue.pop_front();
1463 return true;
1464 }
1465
1466 void MDSRank::clientreplay_done()
1467 {
1468 dout(1) << "clientreplay_done" << dendl;
1469 request_state(MDSMap::STATE_ACTIVE);
1470 }
1471
1472 void MDSRank::active_start()
1473 {
1474 dout(1) << "active_start" << dendl;
1475
1476 if (last_state == MDSMap::STATE_CREATING ||
1477 last_state == MDSMap::STATE_STARTING) {
1478 mdcache->open_root();
1479 }
1480
1481 mdcache->clean_open_file_lists();
1482 mdcache->export_remaining_imported_caps();
1483 finish_contexts(g_ceph_context, waiting_for_replay); // kick waiters
1484 mdcache->start_files_to_recover();
1485
1486 mdcache->reissue_all_caps();
1487 mdcache->activate_stray_manager();
1488
1489 finish_contexts(g_ceph_context, waiting_for_active); // kick waiters
1490 }
1491
1492 void MDSRank::recovery_done(int oldstate)
1493 {
1494 dout(1) << "recovery_done -- successful recovery!" << dendl;
1495 assert(is_clientreplay() || is_active());
1496
1497 // kick snaptable (resent AGREEs)
1498 if (mdsmap->get_tableserver() == whoami) {
1499 set<mds_rank_t> active;
1500 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
1501 snapserver->finish_recovery(active);
1502 }
1503
1504 if (oldstate == MDSMap::STATE_CREATING)
1505 return;
1506
1507 mdcache->start_recovered_truncates();
1508 mdcache->do_file_recover();
1509
1510 // tell connected clients
1511 //bcast_mds_map(); // not anymore, they get this from the monitor
1512
1513 mdcache->populate_mydir();
1514 }
1515
1516 void MDSRank::creating_done()
1517 {
1518 dout(1) << "creating_done" << dendl;
1519 request_state(MDSMap::STATE_ACTIVE);
1520 }
1521
1522 void MDSRank::boot_create()
1523 {
1524 dout(3) << "boot_create" << dendl;
1525
1526 MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));
1527
1528 mdcache->init_layouts();
1529
1530 snapserver->set_rank(whoami);
1531 inotable->set_rank(whoami);
1532 sessionmap.set_rank(whoami);
1533
1534 // start with a fresh journal
1535 dout(10) << "boot_create creating fresh journal" << dendl;
1536 mdlog->create(fin.new_sub());
1537
1538 // open new journal segment, but do not journal subtree map (yet)
1539 mdlog->prepare_new_segment();
1540
1541 if (whoami == mdsmap->get_root()) {
1542 dout(3) << "boot_create creating fresh hierarchy" << dendl;
1543 mdcache->create_empty_hierarchy(fin.get());
1544 }
1545
1546 dout(3) << "boot_create creating mydir hierarchy" << dendl;
1547 mdcache->create_mydir_hierarchy(fin.get());
1548
1549 // fixme: fake out inotable (reset, pretend loaded)
1550 dout(10) << "boot_create creating fresh inotable table" << dendl;
1551 inotable->reset();
1552 inotable->save(fin.new_sub());
1553
1554 // write empty sessionmap
1555 sessionmap.save(fin.new_sub());
1556
1557 // Create empty purge queue
1558 purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));
1559
1560 // initialize tables
1561 if (mdsmap->get_tableserver() == whoami) {
1562 dout(10) << "boot_create creating fresh snaptable" << dendl;
1563 snapserver->reset();
1564 snapserver->save(fin.new_sub());
1565 }
1566
1567 assert(g_conf->mds_kill_create_at != 1);
1568
1569 // ok now journal it
1570 mdlog->journal_segment_subtree_map(fin.new_sub());
1571 mdlog->flush();
1572
1573 // Usually we do this during reconnect, but creation skips that.
1574 objecter->enable_blacklist_events();
1575
1576 fin.activate();
1577 }
1578
1579 void MDSRank::stopping_start()
1580 {
1581 dout(2) << "stopping_start" << dendl;
1582
1583 if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
1584 // we're the only mds up!
1585 dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal. suicide!" << dendl;
1586 suicide();
1587 }
1588
1589 mdcache->shutdown_start();
1590 }
1591
1592 void MDSRank::stopping_done()
1593 {
1594 dout(2) << "stopping_done" << dendl;
1595
1596 // tell monitor we shut down cleanly.
1597 request_state(MDSMap::STATE_STOPPED);
1598 }
1599
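// Process a new MDSMap while we hold a rank: record our new state and
// incarnation, mark down/restarted peers and handle their failure, drive
// the per-state entry points (replay_start, active_start, ...), and wake
// any waiters whose target epoch or peer has now arrived.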
1600 void MDSRankDispatcher::handle_mds_map(
1601 MMDSMap *m,
1602 MDSMap *oldmap)
1603 {
1604 // I am only to be passed MDSMaps in which I hold a rank
1605 assert(whoami != MDS_RANK_NONE);
1606
1607 MDSMap::DaemonState oldstate = state;
1608 mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
1609 state = mdsmap->get_state_gid(mds_gid);
1610 if (state != oldstate) {
1611 last_state = oldstate;
1612 incarnation = mdsmap->get_inc_gid(mds_gid);
1613 }
1614
1615 version_t epoch = m->get_epoch();
1616
1617 // note source's map version
1618 if (m->get_source().is_mds() &&
1619 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
1620 dout(15) << " peer " << m->get_source()
1621 << " has mdsmap epoch >= " << epoch
1622 << dendl;
1623 peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;
1624 }
1625
1626 // Validate state transitions while I hold a rank
1627 if (!MDSMap::state_transition_valid(oldstate, state)) {
1628 derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
1629 << "->" << ceph_mds_state_name(state) << dendl;
1630 respawn();
1631 }
1632
1633 if (oldstate != state) {
1634 // update messenger.
1635 if (state == MDSMap::STATE_STANDBY_REPLAY) {
1636 dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
1637 << " replaying mds." << whoami << "." << incarnation << dendl;
1638 messenger->set_myname(entity_name_t::MDS(mds_gid));
1639 } else {
1640 dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
1641 messenger->set_myname(entity_name_t::MDS(whoami));
1642 }
1643 }
1644
1645 // tell objecter my incarnation
1646 if (objecter->get_client_incarnation() != incarnation)
1647 objecter->set_client_incarnation(incarnation);
1648
1649 // for debug
1650 if (g_conf->mds_dump_cache_on_map)
1651 mdcache->dump_cache();
1652
1653 cluster_degraded = mdsmap->is_degraded();
1654
1655 // mdsmap and oldmap can be discontinuous; a failover might have happened during the missing epochs.
1656 // the 'restart' set tracks ranks that have restarted since the old mdsmap
1657 set<mds_rank_t> restart;
1658 // replaying mds does not communicate with other ranks
1659 if (state >= MDSMap::STATE_RESOLVE) {
1660 // did someone fail?
1661 // new down?
1662 set<mds_rank_t> olddown, down;
1663 oldmap->get_down_mds_set(&olddown);
1664 mdsmap->get_down_mds_set(&down);
1665 for (const auto& r : down) {
1666 if (oldmap->have_inst(r) && olddown.count(r) == 0) {
1667 messenger->mark_down(oldmap->get_inst(r).addr);
1668 handle_mds_failure(r);
1669 }
1670 }
1671
1672 // did someone fail?
1673 // did their addr/inst change?
1674 set<mds_rank_t> up;
1675 mdsmap->get_up_mds_set(up);
1676 for (const auto& r : up) {
1677 auto& info = mdsmap->get_info(r);
1678 if (oldmap->have_inst(r)) {
1679 auto& oldinfo = oldmap->get_info(r);
1680 if (info.inc != oldinfo.inc) {
1681 messenger->mark_down(oldinfo.addr);
1682 if (info.state == MDSMap::STATE_REPLAY ||
1683 info.state == MDSMap::STATE_RESOLVE) {
1684 restart.insert(r);
1685 handle_mds_failure(r);
1686 } else {
1687 assert(info.state == MDSMap::STATE_STARTING ||
1688 info.state == MDSMap::STATE_ACTIVE);
1689 // -> stopped (missing) -> starting -> active
1690 restart.insert(r);
1691 mdcache->migrator->handle_mds_failure_or_stop(r);
1692 }
1693 }
1694 } else {
1695 if (info.state == MDSMap::STATE_REPLAY ||
1696 info.state == MDSMap::STATE_RESOLVE) {
1697 // -> starting/creating (missing) -> active (missing) -> replay -> resolve
1698 restart.insert(r);
1699 handle_mds_failure(r);
1700 } else {
1701 assert(info.state == MDSMap::STATE_CREATING ||
1702 info.state == MDSMap::STATE_STARTING ||
1703 info.state == MDSMap::STATE_ACTIVE);
1704 }
1705 }
1706 }
1707 }
1708
1709 // did it change?
1710 if (oldstate != state) {
1711 dout(1) << "handle_mds_map state change "
1712 << ceph_mds_state_name(oldstate) << " --> "
1713 << ceph_mds_state_name(state) << dendl;
1714 beacon.set_want_state(mdsmap, state);
1715
1716 if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
1717 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
1718 assert (state == MDSMap::STATE_REPLAY);
1719 } else {
1720 // did i just recover?
1721 if ((is_active() || is_clientreplay()) &&
1722 (oldstate == MDSMap::STATE_CREATING ||
1723 oldstate == MDSMap::STATE_REJOIN ||
1724 oldstate == MDSMap::STATE_RECONNECT))
1725 recovery_done(oldstate);
1726
1727 if (is_active()) {
1728 active_start();
1729 } else if (is_any_replay()) {
1730 replay_start();
1731 } else if (is_resolve()) {
1732 resolve_start();
1733 } else if (is_reconnect()) {
1734 reconnect_start();
1735 } else if (is_rejoin()) {
1736 rejoin_start();
1737 } else if (is_clientreplay()) {
1738 clientreplay_start();
1739 } else if (is_creating()) {
1740 boot_create();
1741 } else if (is_starting()) {
1742 boot_start();
1743 } else if (is_stopping()) {
1744 assert(oldstate == MDSMap::STATE_ACTIVE);
1745 stopping_start();
1746 }
1747 }
1748 }
1749
1750 // RESOLVE
1751 // is someone else newly resolving?
1752 if (state >= MDSMap::STATE_RESOLVE) {
1753 if ((!oldmap->is_resolving() || !restart.empty()) && mdsmap->is_resolving()) {
1754 set<mds_rank_t> resolve;
1755 mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
1756 dout(10) << " resolve set is " << resolve << dendl;
1757 calc_recovery_set();
1758 mdcache->send_resolves();
1759 }
1760 }
1761
1762 // REJOIN
1763 // is everybody finally rejoining?
1764 if (state >= MDSMap::STATE_REJOIN) {
1765 // did we start?
1766 if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
1767 rejoin_joint_start();
1768
1769 // did we finish?
1770 if (g_conf->mds_dump_cache_after_rejoin &&
1771 oldmap->is_rejoining() && !mdsmap->is_rejoining())
1772 mdcache->dump_cache(); // for DEBUG only
1773
1774 if (oldstate >= MDSMap::STATE_REJOIN ||
1775 oldstate == MDSMap::STATE_STARTING) {
1776 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
1777 set<mds_rank_t> olddis, dis;
1778 oldmap->get_mds_set_lower_bound(olddis, MDSMap::STATE_REJOIN);
1779 mdsmap->get_mds_set_lower_bound(dis, MDSMap::STATE_REJOIN);
1780 for (const auto& r : dis) {
1781 if (r == whoami)
1782 continue; // not me
1783 if (!olddis.count(r) || restart.count(r)) { // newly so?
1784 mdcache->kick_discovers(r);
1785 mdcache->kick_open_ino_peers(r);
1786 }
1787 }
1788 }
1789 }
1790
1791 if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
1792 dout(1) << "cluster recovered." << dendl;
1793 auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
1794 if (it != waiting_for_active_peer.end()) {
1795 queue_waiters(it->second);
1796 waiting_for_active_peer.erase(it);
1797 }
1798 }
1799
1800 // did someone go active?
1801 if (state >= MDSMap::STATE_CLIENTREPLAY &&
1802 oldstate >= MDSMap::STATE_CLIENTREPLAY) {
1803 set<mds_rank_t> oldactive, active;
1804 oldmap->get_mds_set_lower_bound(oldactive, MDSMap::STATE_CLIENTREPLAY);
1805 mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
1806 for (const auto& r : active) {
1807 if (r == whoami)
1808 continue; // not me
1809 if (!oldactive.count(r) || restart.count(r)) // newly so?
1810 handle_mds_recovery(r);
1811 }
1812 }
1813
1814 if (state >= MDSMap::STATE_CLIENTREPLAY) {
1815 // did anyone stop?
1816 set<mds_rank_t> oldstopped, stopped;
1817 oldmap->get_stopped_mds_set(oldstopped);
1818 mdsmap->get_stopped_mds_set(stopped);
1819 for (const auto& r : stopped)
1820 if (oldstopped.count(r) == 0) // newly so?
1821 mdcache->migrator->handle_mds_failure_or_stop(r);
1822 }
1823
1824 {
1825 map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
1826 while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
1827 list<MDSInternalContextBase*> ls;
1828 ls.swap(p->second);
1829 waiting_for_mdsmap.erase(p++);
1830 finish_contexts(g_ceph_context, ls);
1831 }
1832 }
1833
1834 if (is_active()) {
1835 // Before going active, set OSD epoch barrier to latest (so that
1836 // we don't risk handing out caps to clients with old OSD maps that
1837 // might not include barriers from the previous incarnation of this MDS)
1838 set_osd_epoch_barrier(objecter->with_osdmap(
1839 std::mem_fn(&OSDMap::get_epoch)));
1840 }
1841
1842 if (is_active()) {
1843 bool found = false;
1844 MDSMap::mds_info_t info = mdsmap->get_info(whoami);
1845
1846 for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
1847 p != mdsmap->get_mds_info().end();
1848 ++p) {
1849 if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
1850 (p->second.standby_for_rank == whoami ||(info.name.length() && p->second.standby_for_name == info.name))) {
1851 found = true;
1852 break;
1853 }
1854 if (found)
1855 mdlog->set_write_iohint(0);
1856 else
1857 mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1858 }
1859 }
1860
1861 if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
1862 purge_queue.update_op_limit(*mdsmap);
1863 }
1864 }
1865
1866 void MDSRank::handle_mds_recovery(mds_rank_t who)
1867 {
1868 dout(5) << "handle_mds_recovery mds." << who << dendl;
1869
1870 mdcache->handle_mds_recovery(who);
1871
1872 if (mdsmap->get_tableserver() == whoami) {
1873 snapserver->handle_mds_recovery(who);
1874 }
1875
1876 queue_waiters(waiting_for_active_peer[who]);
1877 waiting_for_active_peer.erase(who);
1878 }
1879
1880 void MDSRank::handle_mds_failure(mds_rank_t who)
1881 {
1882 if (who == whoami) {
1883 dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;
1884 return;
1885 }
1886 dout(5) << "handle_mds_failure mds." << who << dendl;
1887
1888 mdcache->handle_mds_failure(who);
1889
1890 snapclient->handle_mds_failure(who);
1891 }
1892
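// Admin socket ("asok") command dispatcher. These commands are normally
// issued through the MDS admin socket, e.g. (assuming the standard
// `ceph daemon` front end):
//   ceph daemon mds.<name> ops
//   ceph daemon mds.<name> session ls
//   ceph daemon mds.<name> flush journal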
1893 bool MDSRankDispatcher::handle_asok_command(
1894 std::string command, cmdmap_t& cmdmap, Formatter *f,
1895 std::ostream& ss)
1896 {
1897 if (command == "dump_ops_in_flight" ||
1898 command == "ops") {
1899 if (!op_tracker.dump_ops_in_flight(f)) {
1900 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1901 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1902 }
1903 } else if (command == "dump_blocked_ops") {
1904 if (!op_tracker.dump_ops_in_flight(f, true)) {
1905 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1906 Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1907 }
1908 } else if (command == "dump_historic_ops") {
1909 if (!op_tracker.dump_historic_ops(f)) {
1910 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1911 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1912 }
1913 } else if (command == "dump_historic_ops_by_duration") {
1914 if (!op_tracker.dump_historic_ops(f, true)) {
1915 ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
1916 please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
1917 }
1918 } else if (command == "osdmap barrier") {
1919 int64_t target_epoch = 0;
1920 bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
1921
1922 if (!got_val) {
1923 ss << "no target epoch given";
1924 return true;
1925 }
1926
1927 mds_lock.Lock();
1928 set_osd_epoch_barrier(target_epoch);
1929 mds_lock.Unlock();
1930
1931 C_SaferCond cond;
1932 bool already_got = objecter->wait_for_map(target_epoch, &cond);
1933 if (!already_got) {
1934 dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
1935 cond.wait();
1936 }
1937 } else if (command == "session ls") {
1938 Mutex::Locker l(mds_lock);
1939
1940 heartbeat_reset();
1941
1942 dump_sessions(SessionFilter(), f);
1943 } else if (command == "session evict") {
1944 std::string client_id;
1945 const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
1946 if(!got_arg) {
1947 ss << "Invalid client_id specified";
1948 return true;
1949 }
1950
1951 mds_lock.Lock();
1952 std::stringstream dss;
1953 bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
1954 g_conf->mds_session_blacklist_on_evict, dss);
1955 if (!evicted) {
1956 dout(15) << dss.str() << dendl;
1957 ss << dss.str();
1958 }
1959 mds_lock.Unlock();
1960 } else if (command == "scrub_path") {
1961 string path;
1962 vector<string> scrubop_vec;
1963 cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
1964 cmd_getval(g_ceph_context, cmdmap, "path", path);
1965 command_scrub_path(f, path, scrubop_vec);
1966 } else if (command == "tag path") {
1967 string path;
1968 cmd_getval(g_ceph_context, cmdmap, "path", path);
1969 string tag;
1970 cmd_getval(g_ceph_context, cmdmap, "tag", tag);
1971 command_tag_path(f, path, tag);
1972 } else if (command == "flush_path") {
1973 string path;
1974 cmd_getval(g_ceph_context, cmdmap, "path", path);
1975 command_flush_path(f, path);
1976 } else if (command == "flush journal") {
1977 command_flush_journal(f);
1978 } else if (command == "get subtrees") {
1979 command_get_subtrees(f);
1980 } else if (command == "export dir") {
1981 string path;
1982 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1983 ss << "malformed path";
1984 return true;
1985 }
1986 int64_t rank;
1987 if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
1988 ss << "malformed rank";
1989 return true;
1990 }
1991 command_export_dir(f, path, (mds_rank_t)rank);
1992 } else if (command == "dump cache") {
1993 Mutex::Locker l(mds_lock);
1994 string path;
1995 int r;
1996 if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
1997 r = mdcache->dump_cache(f);
1998 } else {
1999 r = mdcache->dump_cache(path);
2000 }
2001
2002 if (r != 0) {
2003 ss << "Failed to dump cache: " << cpp_strerror(r);
2004 f->reset();
2005 }
2006 } else if (command == "cache status") {
2007 Mutex::Locker l(mds_lock);
2008 int r = mdcache->cache_status(f);
2009 if (r != 0) {
2010 ss << "Failed to get cache status: " << cpp_strerror(r);
2011 }
2012 } else if (command == "dump tree") {
2013 string root;
2014 int64_t depth;
2015 cmd_getval(g_ceph_context, cmdmap, "root", root);
2016 if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
2017 depth = -1;
2018 {
2019 Mutex::Locker l(mds_lock);
2020 int r = mdcache->dump_cache(root, depth, f);
2021 if (r != 0) {
2022 ss << "Failed to dump tree: " << cpp_strerror(r);
2023 f->reset();
2024 }
2025 }
2026 } else if (command == "dump loads") {
2027 Mutex::Locker l(mds_lock);
2028 int r = balancer->dump_loads(f);
2029 if (r != 0) {
2030 ss << "Failed to dump loads: " << cpp_strerror(r);
2031 f->reset();
2032 }
2033 } else if (command == "force_readonly") {
2034 Mutex::Locker l(mds_lock);
2035 mdcache->force_readonly();
2036 } else if (command == "dirfrag split") {
2037 command_dirfrag_split(cmdmap, ss);
2038 } else if (command == "dirfrag merge") {
2039 command_dirfrag_merge(cmdmap, ss);
2040 } else if (command == "dirfrag ls") {
2041 command_dirfrag_ls(cmdmap, ss, f);
2042 } else {
2043 return false;
2044 }
2045
2046 return true;
2047 }
2048
2049 class C_MDS_Send_Command_Reply : public MDSInternalContext
2050 {
2051 protected:
2052 MCommand *m;
2053 public:
2054 C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
2055 MDSInternalContext(_mds), m(_m) { m->get(); }
2056 void send (int r, boost::string_view out_str) {
2057 bufferlist bl;
2058 MDSDaemon::send_command_reply(m, mds, r, bl, out_str);
2059 m->put();
2060 }
2061 void finish (int r) override {
2062 send(r, "");
2063 }
2064 };
2065
2066 /**
2067 * This function drops the mds_lock, so don't do anything with
2068 * MDSRank after calling it (we could have gone into shutdown): just
2069 * send your result back to the calling client and finish.
2070 */
2071 void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
2072 {
2073 C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);
2074
2075 if (is_any_replay()) {
2076 reply->send(-EAGAIN, "MDS is replaying log");
2077 delete reply;
2078 return;
2079 }
2080
2081 std::list<Session*> victims;
2082 const auto sessions = sessionmap.get_sessions();
2083 for (const auto p : sessions) {
2084 if (!p.first.is_client()) {
2085 continue;
2086 }
2087
2088 Session *s = p.second;
2089
2090 if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2091 victims.push_back(s);
2092 }
2093 }
2094
2095 dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;
2096
2097 if (victims.empty()) {
2098 reply->send(0, "");
2099 delete reply;
2100 return;
2101 }
2102
2103 C_GatherBuilder gather(g_ceph_context, reply);
2104 for (const auto s : victims) {
2105 std::stringstream ss;
2106 evict_client(s->info.inst.name.num(), false,
2107 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
2108 }
2109 gather.activate();
2110 }
2111
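// Serializes the client session list. The output has roughly this shape
// (values below are illustrative only):
//   {"sessions": [{"id": 4123, "num_leases": 0, "num_caps": 5,
//                  "state": "open", "replay_requests": 0,
//                  "completed_requests": 1, "reconnecting": false,
//                  "inst": "client.4123 10.0.0.1:0/2876554",
//                  "client_metadata": {"hostname": "client-host"}}]}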
2112 void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
2113 {
2114 // Dump sessions, decorated with recovery/replay status
2115 f->open_array_section("sessions");
2116 const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
2117 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
2118 p != session_map.end();
2119 ++p) {
2120 if (!p->first.is_client()) {
2121 continue;
2122 }
2123
2124 Session *s = p->second;
2125
2126 if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
2127 continue;
2128 }
2129
2130 f->open_object_section("session");
2131 f->dump_int("id", p->first.num());
2132
2133 f->dump_int("num_leases", s->leases.size());
2134 f->dump_int("num_caps", s->caps.size());
2135
2136 f->dump_string("state", s->get_state_name());
2137 f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
2138 f->dump_unsigned("completed_requests", s->get_num_completed_requests());
2139 f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
2140 f->dump_stream("inst") << s->info.inst;
2141 f->open_object_section("client_metadata");
2142 for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
2143 i != s->info.client_metadata.end(); ++i) {
2144 f->dump_string(i->first.c_str(), i->second);
2145 }
2146 f->close_section(); // client_metadata
2147 f->close_section(); //session
2148 }
2149 f->close_section(); //sessions
2150 }
2151
2152 void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
2153 {
2154 bool force = false;
2155 bool recursive = false;
2156 bool repair = false;
2157 for (vector<string>::iterator i = scrubop_vec.begin() ; i != scrubop_vec.end(); ++i) {
2158 if (*i == "force")
2159 force = true;
2160 else if (*i == "recursive")
2161 recursive = true;
2162 else if (*i == "repair")
2163 repair = true;
2164 }
2165 C_SaferCond scond;
2166 {
2167 Mutex::Locker l(mds_lock);
2168 mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
2169 }
2170 scond.wait();
2171 // enqueue_scrub() finishers will dump the data for us; we're done!
2172 }
2173
2174 void MDSRank::command_tag_path(Formatter *f,
2175 boost::string_view path, boost::string_view tag)
2176 {
2177 C_SaferCond scond;
2178 {
2179 Mutex::Locker l(mds_lock);
2180 mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
2181 }
2182 scond.wait();
2183 }
2184
2185 void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
2186 {
2187 C_SaferCond scond;
2188 {
2189 Mutex::Locker l(mds_lock);
2190 mdcache->flush_dentry(path, &scond);
2191 }
2192 int r = scond.wait();
2193 f->open_object_section("results");
2194 f->dump_int("return_code", r);
2195 f->close_section(); // results
2196 }
2197
2198 /**
2199 * Wrapper around _command_flush_journal that
2200 * handles serialization of result
2201 */
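// The serialized result takes the form
//   {"message": "<human readable status>", "return_code": <int>}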
2202 void MDSRank::command_flush_journal(Formatter *f)
2203 {
2204 assert(f != NULL);
2205
2206 std::stringstream ss;
2207 const int r = _command_flush_journal(&ss);
2208 f->open_object_section("result");
2209 f->dump_string("message", ss.str());
2210 f->dump_int("return_code", r);
2211 f->close_section();
2212 }
2213
2214 /**
2215 * Implementation of "flush journal" asok command.
2216 *
2217 * @param ss
2218 * Optionally populate with a human readable string describing the
2219 * reason for any unexpected return status.
2220 */
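// Rough sequence: start a new log segment, flush and wait_for_safe (twice,
// see #10368 below), trim_all() to mark older segments expiring, wait for
// them to expire, trim_expired_segments(), then write_head() to persist the
// new expire/trim positions.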
2221 int MDSRank::_command_flush_journal(std::stringstream *ss)
2222 {
2223 assert(ss != NULL);
2224
2225 Mutex::Locker l(mds_lock);
2226
2227 if (mdcache->is_readonly()) {
2228 dout(5) << __func__ << ": read-only FS" << dendl;
2229 return -EROFS;
2230 }
2231
2232 if (!is_active()) {
2233 dout(5) << __func__ << ": MDS not active, no-op" << dendl;
2234 return 0;
2235 }
2236
2237 // I need to seal off the current segment, and then mark all previous segments
2238 // for expiry
2239 mdlog->start_new_segment();
2240 int r = 0;
2241
2242 // Flush initially so that all the segments older than our new one
2243 // will be eligible for expiry
2244 {
2245 C_SaferCond mdlog_flushed;
2246 mdlog->flush();
2247 mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
2248 mds_lock.Unlock();
2249 r = mdlog_flushed.wait();
2250 mds_lock.Lock();
2251 if (r != 0) {
2252 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2253 return r;
2254 }
2255 }
2256
2257 // Because we may not be the last wait_for_safe context on MDLog, and
2258 // subsequent contexts might wake up in the middle of our later trim_all
2259 // and interfere with expiry (by e.g. marking dirs/dentries dirty
2260 // on previous log segments), we run a second wait_for_safe here.
2261 // See #10368
2262 {
2263 C_SaferCond mdlog_cleared;
2264 mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
2265 mds_lock.Unlock();
2266 r = mdlog_cleared.wait();
2267 mds_lock.Lock();
2268 if (r != 0) {
2269 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
2270 return r;
2271 }
2272 }
2273
2274 // Put all the old log segments into expiring or expired state
2275 dout(5) << __func__ << ": beginning segment expiry" << dendl;
2276 r = mdlog->trim_all();
2277 if (r != 0) {
2278 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
2279 return r;
2280 }
2281
2282 // Attach contexts to wait for all expiring segments to expire
2283 MDSGatherBuilder expiry_gather(g_ceph_context);
2284
2285 const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
2286 for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
2287 i != expiring_segments.end(); ++i) {
2288 (*i)->wait_for_expiry(expiry_gather.new_sub());
2289 }
2290 dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
2291 << " segments to expire" << dendl;
2292
2293 if (expiry_gather.has_subs()) {
2294 C_SaferCond cond;
2295 expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
2296 expiry_gather.activate();
2297
2298 // Drop mds_lock to allow progress until expiry is complete
2299 mds_lock.Unlock();
2300 int r = cond.wait();
2301 mds_lock.Lock();
2302
2303 assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry
2304 }
2305
2306 dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
2307 mdlog->get_journaler()->get_expire_pos() << "/" <<
2308 mdlog->get_journaler()->get_trimmed_pos() << dendl;
2309
2310 // Now everyone I'm interested in is expired
2311 mdlog->trim_expired_segments();
2312
2313 dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
2314 mdlog->get_journaler()->get_expire_pos() << "/" <<
2315 mdlog->get_journaler()->get_trimmed_pos() << dendl;
2316
2317 // Flush the journal header so that readers will start from after the flushed region
2318 C_SaferCond wrote_head;
2319 mdlog->get_journaler()->write_head(&wrote_head);
2320 mds_lock.Unlock(); // Drop lock to allow messenger dispatch progress
2321 r = wrote_head.wait();
2322 mds_lock.Lock();
2323 if (r != 0) {
2324 *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
2325 return r;
2326 }
2327
2328 dout(5) << __func__ << ": write_head complete, all done!" << dendl;
2329
2330 return 0;
2331 }
2332
2333
2334 void MDSRank::command_get_subtrees(Formatter *f)
2335 {
2336 assert(f != NULL);
2337 Mutex::Locker l(mds_lock);
2338
2339 std::list<CDir*> subtrees;
2340 mdcache->list_subtrees(subtrees);
2341
2342 f->open_array_section("subtrees");
2343 for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
2344 const CDir *dir = *i;
2345
2346 f->open_object_section("subtree");
2347 {
2348 f->dump_bool("is_auth", dir->is_auth());
2349 f->dump_int("auth_first", dir->get_dir_auth().first);
2350 f->dump_int("auth_second", dir->get_dir_auth().second);
2351 f->dump_int("export_pin", dir->inode->get_export_pin());
2352 f->open_object_section("dir");
2353 dir->dump(f);
2354 f->close_section();
2355 }
2356 f->close_section();
2357 }
2358 f->close_section();
2359 }
2360
2361
2362 void MDSRank::command_export_dir(Formatter *f,
2363 boost::string_view path,
2364 mds_rank_t target)
2365 {
2366 int r = _command_export_dir(path, target);
2367 f->open_object_section("results");
2368 f->dump_int("return_code", r);
2369 f->close_section(); // results
2370 }
2371
2372 int MDSRank::_command_export_dir(
2373 boost::string_view path,
2374 mds_rank_t target)
2375 {
2376 Mutex::Locker l(mds_lock);
2377 filepath fp(path);
2378
2379 if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
2380 derr << "bad MDS target " << target << dendl;
2381 return -ENOENT;
2382 }
2383
2384 CInode *in = mdcache->cache_traverse(fp);
2385 if (!in) {
2386 derr << "Bath path '" << path << "'" << dendl;
2387 return -ENOENT;
2388 }
2389 CDir *dir = in->get_dirfrag(frag_t());
2390 if (!dir || !(dir->is_auth())) {
2391 derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
2392 return -EINVAL;
2393 }
2394
2395 mdcache->migrator->export_dir(dir, target);
2396 return 0;
2397 }
2398
2399 CDir *MDSRank::_command_dirfrag_get(
2400 const cmdmap_t &cmdmap,
2401 std::ostream &ss)
2402 {
2403 std::string path;
2404 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2405 if (!got) {
2406 ss << "missing path argument";
2407 return NULL;
2408 }
2409
2410 std::string frag_str;
2411 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2412 ss << "missing frag argument";
2413 return NULL;
2414 }
2415
2416 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2417 if (!in) {
2418 // TODO really we should load something in if it's not in cache,
2419 // but the infrastructure is harder, and we might still be unable
2420 // to act on it if someone else is auth.
2421 ss << "directory '" << path << "' inode not in cache";
2422 return NULL;
2423 }
2424
2425 frag_t fg;
2426
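// Note: fg.parse() expects the same "value/bits" form that `dirfrag ls`
// prints for each frag, e.g. "0/0" for an unsplit directory.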
2427 if (!fg.parse(frag_str.c_str())) {
2428 ss << "frag " << frag_str << " failed to parse";
2429 return NULL;
2430 }
2431
2432 CDir *dir = in->get_dirfrag(fg);
2433 if (!dir) {
2434 ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
2435 "use `dirfrag ls` to see if it should exist)";
2436 return NULL;
2437 }
2438
2439 if (!dir->is_auth()) {
2440 ss << "frag " << dir->dirfrag() << " not auth (auth = "
2441 << dir->authority() << ")";
2442 return NULL;
2443 }
2444
2445 return dir;
2446 }
2447
2448 bool MDSRank::command_dirfrag_split(
2449 cmdmap_t cmdmap,
2450 std::ostream &ss)
2451 {
2452 Mutex::Locker l(mds_lock);
2453 if (!mdsmap->allows_dirfrags()) {
2454 ss << "dirfrags are disallowed by the mds map!";
2455 return false;
2456 }
2457
2458 int64_t by = 0;
2459 if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
2460 ss << "missing bits argument";
2461 return false;
2462 }
2463
2464 if (by <= 0) {
2465 ss << "must split by >0 bits";
2466 return false;
2467 }
2468
2469 CDir *dir = _command_dirfrag_get(cmdmap, ss);
2470 if (!dir) {
2471 return false;
2472 }
2473
2474 mdcache->split_dir(dir, by);
2475
2476 return true;
2477 }
2478
2479 bool MDSRank::command_dirfrag_merge(
2480 cmdmap_t cmdmap,
2481 std::ostream &ss)
2482 {
2483 Mutex::Locker l(mds_lock);
2484 std::string path;
2485 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2486 if (!got) {
2487 ss << "missing path argument";
2488 return false;
2489 }
2490
2491 std::string frag_str;
2492 if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
2493 ss << "missing frag argument";
2494 return false;
2495 }
2496
2497 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2498 if (!in) {
2499 ss << "directory '" << path << "' inode not in cache";
2500 return false;
2501 }
2502
2503 frag_t fg;
2504 if (!fg.parse(frag_str.c_str())) {
2505 ss << "frag " << frag_str << " failed to parse";
2506 return false;
2507 }
2508
2509 mdcache->merge_dir(in, fg);
2510
2511 return true;
2512 }
2513
2514 bool MDSRank::command_dirfrag_ls(
2515 cmdmap_t cmdmap,
2516 std::ostream &ss,
2517 Formatter *f)
2518 {
2519 Mutex::Locker l(mds_lock);
2520 std::string path;
2521 bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
2522 if (!got) {
2523 ss << "missing path argument";
2524 return false;
2525 }
2526
2527 CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
2528 if (!in) {
2529 ss << "directory inode not in cache";
2530 return false;
2531 }
2532
2533 f->open_array_section("frags");
2534 std::list<frag_t> frags;
2535 // NB using get_leaves_under instead of get_dirfrags to give
2536 // you the list of what dirfrags may exist, not which are in cache
2537 in->dirfragtree.get_leaves_under(frag_t(), frags);
2538 for (std::list<frag_t>::iterator i = frags.begin();
2539 i != frags.end(); ++i) {
2540 f->open_object_section("frag");
2541 f->dump_int("value", i->value());
2542 f->dump_int("bits", i->bits());
2543 std::ostringstream frag_str;
2544 frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
2545 f->dump_string("str", frag_str.str());
2546 f->close_section();
2547 }
2548 f->close_section();
2549
2550 return true;
2551 }
2552
2553 void MDSRank::dump_status(Formatter *f) const
2554 {
2555 if (state == MDSMap::STATE_REPLAY ||
2556 state == MDSMap::STATE_STANDBY_REPLAY) {
2557 mdlog->dump_replay_status(f);
2558 } else if (state == MDSMap::STATE_RESOLVE) {
2559 mdcache->dump_resolve_status(f);
2560 } else if (state == MDSMap::STATE_RECONNECT) {
2561 server->dump_reconnect_status(f);
2562 } else if (state == MDSMap::STATE_REJOIN) {
2563 mdcache->dump_rejoin_status(f);
2564 } else if (state == MDSMap::STATE_CLIENTREPLAY) {
2565 dump_clientreplay_status(f);
2566 }
2567 f->dump_float("rank_uptime", get_uptime().count());
2568 }
2569
2570 void MDSRank::dump_clientreplay_status(Formatter *f) const
2571 {
2572 f->open_object_section("clientreplay_status");
2573 f->dump_unsigned("clientreplay_queue", replay_queue.size());
2574 f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
2575 f->close_section();
2576 }
2577
2578 void MDSRankDispatcher::update_log_config()
2579 {
2580 map<string,string> log_to_monitors;
2581 map<string,string> log_to_syslog;
2582 map<string,string> log_channel;
2583 map<string,string> log_prio;
2584 map<string,string> log_to_graylog;
2585 map<string,string> log_to_graylog_host;
2586 map<string,string> log_to_graylog_port;
2587 uuid_d fsid;
2588 string host;
2589
2590 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
2591 log_channel, log_prio, log_to_graylog,
2592 log_to_graylog_host, log_to_graylog_port,
2593 fsid, host) == 0)
2594 clog->update_config(log_to_monitors, log_to_syslog,
2595 log_channel, log_prio, log_to_graylog,
2596 log_to_graylog_host, log_to_graylog_port,
2597 fsid, host);
2598 dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
2599 }
2600
2601 void MDSRank::create_logger()
2602 {
2603 dout(10) << "create_logger" << dendl;
2604 {
2605 PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);
2606
2607 mds_plb.add_u64_counter(
2608 l_mds_request, "request", "Requests", "req",
2609 PerfCountersBuilder::PRIO_CRITICAL);
2610 mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
2611 mds_plb.add_time_avg(
2612 l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
2613 PerfCountersBuilder::PRIO_CRITICAL);
2614 mds_plb.add_u64_counter(
2615 l_mds_forward, "forward", "Forwarding request", "fwd",
2616 PerfCountersBuilder::PRIO_INTERESTING);
2617 mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
2618 mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
2619 mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
2620 mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
2621
2622 mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
2623 mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
2624 PerfCountersBuilder::PRIO_CRITICAL);
2625 mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
2626 mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
2627 mds_plb.add_u64(
2628 l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
2629 mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
2630 mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
2631 mds_plb.add_u64(
2632 l_mds_inodes_with_caps, "inodes_with_caps", "Inodes with capabilities");
2633 mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
2634 PerfCountersBuilder::PRIO_INTERESTING);
2635 mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
2636
2637 mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
2638 mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
2639 mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
2640 "Traverse forwards");
2641 mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
2642 "Traverse directory discovers");
2643 mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
2644 "Traverse incomplete directory content fetchings");
2645 mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
2646 "Traverse remote dentries");
2647 mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
2648 "Traverse locks");
2649
2650 mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
2651 mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
2652
2653 mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
2654 mds_plb.add_u64_counter(
2655 l_mds_exported_inodes, "exported_inodes", "Exported inodes", "exi",
2656 PerfCountersBuilder::PRIO_INTERESTING);
2657 mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
2658 mds_plb.add_u64_counter(
2659 l_mds_imported_inodes, "imported_inodes", "Imported inodes", "imi",
2660 PerfCountersBuilder::PRIO_INTERESTING);
2661 logger = mds_plb.create_perf_counters();
2662 g_ceph_context->get_perfcounters_collection()->add(logger);
2663 }
2664
2665 {
2666 PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
2667 mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
2668 PerfCountersBuilder::PRIO_INTERESTING);
2669 mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
2670 mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
2671 mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
2672 mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
2673 mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
2674 mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
2675 PerfCountersBuilder::PRIO_INTERESTING);
2676 mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
2677 mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
2678 mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
2679 mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
2680 mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
2681 mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
2682 mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
2683 mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");
2684 mlogger = mdm_plb.create_perf_counters();
2685 g_ceph_context->get_perfcounters_collection()->add(mlogger);
2686 }
2687
2688 mdlog->create_logger();
2689 server->create_logger();
2690 purge_queue.create_logger();
2691 sessionmap.register_perfcounters();
2692 mdcache->register_perfcounters();
2693 }
2694
2695 void MDSRank::check_ops_in_flight()
2696 {
2697 vector<string> warnings;
2698 int slow = 0;
2699 if (op_tracker.check_ops_in_flight(warnings, &slow)) {
2700 for (vector<string>::iterator i = warnings.begin();
2701 i != warnings.end();
2702 ++i) {
2703 clog->warn() << *i;
2704 }
2705 }
2706
2707 // set mds slow request count
2708 mds_slow_req_count = slow;
2709 return;
2710 }
2711
2712 void MDSRankDispatcher::handle_osd_map()
2713 {
2714 if (is_active() && snapserver) {
2715 snapserver->check_osd_map(true);
2716 }
2717
2718 server->handle_osd_map();
2719
2720 purge_queue.update_op_limit(*mdsmap);
2721
2722 std::set<entity_addr_t> newly_blacklisted;
2723 objecter->consume_blacklist_events(&newly_blacklisted);
2724 auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
2725 dout(4) << "handle_osd_map epoch " << epoch << ", "
2726 << newly_blacklisted.size() << " new blacklist entries" << dendl;
2727 auto victims = server->apply_blacklist(newly_blacklisted);
2728 if (victims) {
2729 set_osd_epoch_barrier(epoch);
2730 }
2731
2732
2733 // By default the objecter only requests OSDMap updates on use,
2734 // we would like to always receive the latest maps in order to
2735 // apply policy based on the FULL flag.
2736 objecter->maybe_request_map();
2737 }
2738
2739 bool MDSRank::evict_client(int64_t session_id,
2740 bool wait, bool blacklist, std::stringstream& err_ss,
2741 Context *on_killed)
2742 {
2743 assert(mds_lock.is_locked_by_me());
2744
2745 // Mutually exclusive args
2746 assert(!(wait && on_killed != nullptr));
2747
2748 if (is_any_replay()) {
2749 err_ss << "MDS is replaying log";
2750 return false;
2751 }
2752
2753 Session *session = sessionmap.get_session(
2754 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2755 if (!session) {
2756 err_ss << "session " << session_id << " not in sessionmap!";
2757 return false;
2758 }
2759
2760 dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
2761 stringstream ss;
2762 ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
2763 ss << "\"addr\":\"";
2764 ss << session->info.inst.addr;
2765 ss << "\"}";
2766 std::string tmp = ss.str();
2767 std::vector<std::string> cmd = {tmp};
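// cmd[0] now holds a mon command such as (address is illustrative):
//   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.1:0/3134093222"}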
2768
2769 auto kill_mds_session = [this, session_id, on_killed](){
2770 assert(mds_lock.is_locked_by_me());
2771 Session *session = sessionmap.get_session(
2772 entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
2773 if (session) {
2774 if (on_killed) {
2775 server->kill_session(session, on_killed);
2776 } else {
2777 C_SaferCond on_safe;
2778 server->kill_session(session, &on_safe);
2779
2780 mds_lock.Unlock();
2781 on_safe.wait();
2782 mds_lock.Lock();
2783 }
2784 } else {
2785 dout(1) << "session " << session_id << " was removed while we waited "
2786 "for blacklist" << dendl;
2787
2788 // Even though it wasn't us that removed it, kick our completion
2789 // as the session has been removed.
2790 if (on_killed) {
2791 on_killed->complete(0);
2792 }
2793 }
2794 };
2795
2796 auto background_blacklist = [this, session_id, cmd](std::function<void ()> fn){
2797 assert(mds_lock.is_locked_by_me());
2798
2799 Context *on_blacklist_done = new FunctionContext([this, session_id, fn](int r) {
2800 objecter->wait_for_latest_osdmap(
2801 new C_OnFinisher(
2802 new FunctionContext([this, session_id, fn](int r) {
2803 Mutex::Locker l(mds_lock);
2804 auto epoch = objecter->with_osdmap([](const OSDMap &o){
2805 return o.get_epoch();
2806 });
2807
2808 set_osd_epoch_barrier(epoch);
2809
2810 fn();
2811 }), finisher)
2812 );
2813 });
2814
2815 dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
2816 monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
2817 };
2818
2819 auto blocking_blacklist = [this, cmd, &err_ss, background_blacklist](){
2820 C_SaferCond inline_ctx;
2821 background_blacklist([&inline_ctx](){inline_ctx.complete(0);});
2822 mds_lock.Unlock();
2823 inline_ctx.wait();
2824 mds_lock.Lock();
2825 };
2826
2827 if (wait) {
2828 if (blacklist) {
2829 blocking_blacklist();
2830 }
2831
2832 // We dropped mds_lock, so check that session still exists
2833 session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
2834 session_id));
2835 if (!session) {
2836 dout(1) << "session " << session_id << " was removed while we waited "
2837 "for blacklist" << dendl;
2838 return true;
2839 }
2840 kill_mds_session();
2841 } else {
2842 if (blacklist) {
2843 background_blacklist(kill_mds_session);
2844 } else {
2845 kill_mds_session();
2846 }
2847 }
2848
2849 return true;
2850 }
2851
2852 void MDSRank::bcast_mds_map()
2853 {
2854 dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
2855
2856 // share the map with mounted clients
2857 set<Session*> clients;
2858 sessionmap.get_client_session_set(clients);
2859 for (set<Session*>::const_iterator p = clients.begin();
2860 p != clients.end();
2861 ++p)
2862 (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
2863 last_client_mdsmap_bcast = mdsmap->get_epoch();
2864 }
2865
2866 MDSRankDispatcher::MDSRankDispatcher(
2867 mds_rank_t whoami_,
2868 Mutex &mds_lock_,
2869 LogChannelRef &clog_,
2870 SafeTimer &timer_,
2871 Beacon &beacon_,
2872 MDSMap *& mdsmap_,
2873 Messenger *msgr,
2874 MonClient *monc_,
2875 Context *respawn_hook_,
2876 Context *suicide_hook_)
2877 : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
2878 msgr, monc_, respawn_hook_, suicide_hook_)
2879 {}
2880
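// Handler for MCommand-based commands, i.e. those issued as
// `ceph tell mds.<name> ...`, e.g. `ceph tell mds.<name> session ls` or
// `ceph tell mds.<name> damage ls` (the exact CLI spelling is defined by
// the registered command table, not here).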
2881 bool MDSRankDispatcher::handle_command(
2882 const cmdmap_t &cmdmap,
2883 MCommand *m,
2884 int *r,
2885 std::stringstream *ds,
2886 std::stringstream *ss,
2887 bool *need_reply)
2888 {
2889 assert(r != nullptr);
2890 assert(ds != nullptr);
2891 assert(ss != nullptr);
2892
2893 *need_reply = true;
2894
2895 std::string prefix;
2896 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
2897
2898 if (prefix == "session ls" || prefix == "client ls") {
2899 std::vector<std::string> filter_args;
2900 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2901
2902 SessionFilter filter;
2903 *r = filter.parse(filter_args, ss);
2904 if (*r != 0) {
2905 return true;
2906 }
2907
2908 Formatter *f = new JSONFormatter(true);
2909 dump_sessions(filter, f);
2910 f->flush(*ds);
2911 delete f;
2912 return true;
2913 } else if (prefix == "session evict" || prefix == "client evict") {
2914 std::vector<std::string> filter_args;
2915 cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);
2916
2917 SessionFilter filter;
2918 *r = filter.parse(filter_args, ss);
2919 if (*r != 0) {
2920 return true;
2921 }
2922
2923 evict_clients(filter, m);
2924
2925 *need_reply = false;
2926 return true;
2927 } else if (prefix == "damage ls") {
2928 Formatter *f = new JSONFormatter(true);
2929 damage_table.dump(f);
2930 f->flush(*ds);
2931 delete f;
2932 return true;
2933 } else if (prefix == "damage rm") {
2934 damage_entry_id_t id = 0;
2935 bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
2936 if (!got) {
2937 *r = -EINVAL;
2938 return true;
2939 }
2940
2941 damage_table.erase(id);
2942 return true;
2943 } else {
2944 return false;
2945 }
2946 }
2947
2948 epoch_t MDSRank::get_osd_epoch() const
2949 {
2950 return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
2951 }
2952