// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include <boost/utility/string_view.hpp>

#include "common/debug.h"
#include "common/errno.h"

#include "messages/MClientRequestForward.h"
#include "messages/MMDSLoadTargets.h"
#include "messages/MMDSMap.h"
#include "messages/MMDSTableRequest.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "MDSDaemon.h"
#include "SnapClient.h"
#include "SnapServer.h"
#include "MDBalancer.h"
#include "mon/MonClient.h"
#include "common/HeartbeatMap.h"
#include "ScrubStack.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
MDSRank::MDSRank(
    mds_rank_t whoami_,
    Mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    MDSMap *& mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  :
    whoami(whoami_), incarnation(0),
    mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
    objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
    server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
    balancer(NULL), scrubstack(NULL),
    damage_table(whoami_),
    inotable(NULL), snapserver(NULL), snapclient(NULL),
    sessionmap(this), logger(NULL), mlogger(NULL),
    op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
               g_conf->osd_num_op_tracker_shard),
    last_state(MDSMap::STATE_BOOT),
    state(MDSMap::STATE_BOOT),
    cluster_degraded(false), stopping(false),
    purge_queue(g_ceph_context, whoami_,
                mdsmap_->get_metadata_pool(), objecter,
                new FunctionContext([this](int r) {
                  // Purge Queue operates inside mds_lock when we're calling into
                  // it, and outside when in background, so must handle both cases.
                  if (mds_lock.is_locked_by_me()) {
                    handle_write_error(r);
                  } else {
                    Mutex::Locker l(mds_lock);
                    handle_write_error(r);
                  }
                })),
    progress_thread(this), dispatch_depth(0),
    hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
    mds_slow_req_count(0),
    last_client_mdsmap_bcast(0),
    messenger(msgr), monc(monc_),
    respawn_hook(respawn_hook_),
    suicide_hook(suicide_hook_),
    standby_replaying(false),
    starttime(mono_clock::now())
{
  hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());

  purge_queue.update_op_limit(*mdsmap);

  objecter->unset_honor_osdmap_full();

  finisher = new Finisher(cct);

  mdcache = new MDCache(this, purge_queue);
  mdlog = new MDLog(this);
  balancer = new MDBalancer(this, messenger, monc);

  scrubstack = new ScrubStack(mdcache, finisher);

  inotable = new InoTable(this);
  snapserver = new SnapServer(this, monc);
  snapclient = new SnapClient(this);

  server = new Server(this);
  locker = new Locker(this, mdcache);

  op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                         cct->_conf->mds_op_log_threshold);
  op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
                                           cct->_conf->mds_op_history_duration);
}
MDSRank::~MDSRank()
{
  g_ceph_context->get_heartbeat_map()->remove_worker(hb);

  if (scrubstack) { delete scrubstack; scrubstack = NULL; }
  if (mdcache) { delete mdcache; mdcache = NULL; }
  if (mdlog) { delete mdlog; mdlog = NULL; }
  if (balancer) { delete balancer; balancer = NULL; }
  if (inotable) { delete inotable; inotable = NULL; }
  if (snapserver) { delete snapserver; snapserver = NULL; }
  if (snapclient) { delete snapclient; snapclient = NULL; }
  if (mdsmap) { delete mdsmap; mdsmap = 0; }

  if (server) { delete server; server = 0; }
  if (locker) { delete locker; locker = 0; }

  g_ceph_context->get_perfcounters_collection()->remove(logger);

  g_ceph_context->get_perfcounters_collection()->remove(mlogger);
void MDSRankDispatcher::init()
{
  messenger->add_dispatcher_head(objecter);

  // Expose the OSDMap (already populated during MDS::init) to anyone
  // who is interested in it.

  progress_thread.create("mds_rank_progr");
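// Recompute the decayed export-target set and, when it has drifted from the
// set recorded in the MDSMap, send an MMDSLoadTargets update to the monitors.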
void MDSRank::update_targets(utime_t now)
{
  // get MonMap's idea of my export_targets
  const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;

  dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;

  set<mds_rank_t> new_map_targets;

  auto it = export_targets.begin();
  while (it != export_targets.end()) {
    mds_rank_t rank = it->first;
    double val = it->second.get(now);
    dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;

      dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
      export_targets.erase(it++);

    if (!map_targets.count(rank)) {
      dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;

    new_map_targets.insert(rank);

  if (new_map_targets.size() < map_targets.size()) {
    dout(15) << "export target map holds stale targets, sending update" << dendl;

    dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
    MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
    monc->send_mon_message(m);
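// Record activity against an export target: each target rank carries a
// DecayCounter, so recently-hit targets stay in export_targets while idle
// ones decay away and are dropped by update_targets().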
void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
{
  double rate = g_conf->mds_bal_target_decay;

    amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */

  auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));

    dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;

    dout(15) << "hit export target " << amount << " @ " << now << dendl;

  em.first->second.hit(now, amount);
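// Periodic maintenance, driven by the MDS tick timer: trim the log and cache,
// scan for idle/stale sessions, refresh export targets, and report
// purge-queue drain progress while stopping.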
void MDSRankDispatcher::tick()
{
  if (beacon.is_laggy()) {
    dout(5) << "tick bailing out since we seem laggy" << dendl;

  check_ops_in_flight();

  // Wake up thread in case we used to be laggy and have waiting_for_nolaggy
  // messages to progress.
  progress_thread.signal();

  // make sure mds log flushes, trims periodically

  if (is_active() || is_stopping()) {
    mdcache->trim_client_leases();
    mdcache->check_memory_usage();
    mdlog->trim();  // NOT during recovery!

    logger->set(l_mds_subtrees, mdcache->num_subtrees());

  if (is_clientreplay() || is_active() || is_stopping()) {
    server->find_idle_sessions();

    server->reconnect_tick();

    mdcache->find_stale_fragment_freeze();
    mdcache->migrator->find_stale_export_freeze();

    snapserver->check_osd_map(false);

  if (is_active() || is_stopping()) {
    update_targets(ceph_clock_now());

    if (mdcache->shutdown_pass()) {
      uint64_t pq_progress = 0;
      uint64_t pq_total = 0;
      size_t pq_in_flight = 0;
      if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
        dout(7) << "shutdown_pass=true, but still waiting for purge queue"

        // This takes unbounded time, so we must indicate progress
        // to the administrator: we do it in a slightly imperfect way
        // by sending periodic (tick frequency) clog messages while
        clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
                     << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
                     << " files purging" << ")";

        dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
                   "down:stopped" << dendl;

      dout(7) << "shutdown_pass=false" << dendl;

  // Expose ourselves to Beacon to update health indicators
  beacon.notify_health(this);
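// Tear the rank down after suicide(): stop the purge queue, finisher,
// objecter, op tracker and progress thread, then shut down the messenger and
// drop our heartbeat worker.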
void MDSRankDispatcher::shutdown()
{
  // It should never be possible for shutdown to get called twice, because
  // anyone picking up mds_lock checks if stopping is true and drops
  assert(stopping == false);

  dout(1) << __func__ << ": shutting down rank " << whoami << dendl;

  // MDLog has to shut down before the finisher, because some of its
  // threads block on IOs that require finisher to complete.

  purge_queue.shutdown();

  finisher->stop(); // no flushing

  if (objecter->initialized)
    objecter->shutdown();

  op_tracker.on_shutdown();

  progress_thread.shutdown();

  // release mds_lock for finisher/messenger threads (e.g.
  // MDSDaemon::ms_handle_reset called from Messenger).

  // shut down messenger
  messenger->shutdown();

  // Workaround unclean shutdown: HeartbeatMap will assert if
  // worker is not removed (as we do in ~MDS), but ~MDS is not
  // always called after suicide.
  g_ceph_context->get_heartbeat_map()->remove_worker(hb);
/**
 * Helper for simple callbacks that call a void fn with no args.
 */
class C_MDS_VoidFn : public MDSInternalContext
{
  typedef void (MDSRank::*fn_ptr)();

  C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
    : MDSInternalContext(mds_), fn(fn_)

  void finish(int r) override
int64_t MDSRank::get_metadata_pool()
{
  return mdsmap->get_metadata_pool();
}

MDSTableClient *MDSRank::get_table_client(int t)
{
  switch (t) {
  case TABLE_ANCHOR: return NULL;
  case TABLE_SNAP: return snapclient;
  default: ceph_abort();
  }
}

MDSTableServer *MDSRank::get_table_server(int t)
{
  switch (t) {
  case TABLE_ANCHOR: return NULL;
  case TABLE_SNAP: return snapserver;
  default: ceph_abort();
  }
}
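// suicide() and respawn() hand control back to MDSDaemon via the hooks passed
// into the constructor; damaged() additionally reports STATE_DAMAGED to the
// monitors before respawning.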
void MDSRank::suicide()
{
  suicide_hook->complete(0);

void MDSRank::respawn()
{
  respawn_hook->complete(0);

void MDSRank::damaged()
{
  assert(whoami != MDS_RANK_NONE);
  assert(mds_lock.is_locked_by_me());

  beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
  monc->flush_log();  // Flush any clog error from before we were called
  beacon.notify_health(this);  // Include latest status in our swan song
  beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);

  // It's okay if we timed out and the mon didn't get our beacon, because
  // another daemon (or ourselves after respawn) will eventually take the
  // rank and report DAMAGED again when it hits same problem we did.

  respawn(); // Respawn into standby in case mon has other work for us
void MDSRank::damaged_unlocked()
{
  Mutex::Locker l(mds_lock);

void MDSRank::handle_write_error(int err)
{
  if (err == -EBLACKLISTED) {
    derr << "we have been blacklisted (fenced), respawning..." << dendl;

  if (g_conf->mds_action_on_write_error >= 2) {
    derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
  } else if (g_conf->mds_action_on_write_error == 1) {
    derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
    mdcache->force_readonly();

    derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
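// ProgressThread runs alongside the dispatcher: it sleeps on mds_lock until
// there are finished contexts or deferred (waiting_for_nolaggy) messages to
// process, then drains them via _advance_queues().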
void *MDSRank::ProgressThread::entry()
{
  Mutex::Locker l(mds->mds_lock);

    while (!mds->stopping &&
           mds->finished_queue.empty() &&
           (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
      cond.Wait(mds->mds_lock);

    mds->_advance_queues();

void MDSRank::ProgressThread::shutdown()
{
  assert(mds->mds_lock.is_locked_by_me());
  assert(mds->stopping);

  // Stopping is set, we will fall out of our main loop naturally
  // Kick the thread to notice mds->stopping, and join it
  mds->mds_lock.Unlock();

  mds->mds_lock.Lock();
bool MDSRankDispatcher::ms_dispatch(Message *m)
{
  inc_dispatch_depth();
  ret = _dispatch(m, true);
  dec_dispatch_depth();

/* If this function returns true, it recognizes the message and has taken the
 * reference. If it returns false, it has done neither. */
bool MDSRank::_dispatch(Message *m, bool new_msg)
{
  if (is_stale_message(m)) {

  if (beacon.is_laggy()) {
    dout(10) << " laggy, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else if (new_msg && !waiting_for_nolaggy.empty()) {
    dout(10) << " there are deferred messages, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);

    if (!handle_deferrable_message(m)) {
      dout(0) << "unrecognized message " << *m << dendl;

  if (dispatch_depth > 1)

  // finish any triggered contexts

  if (beacon.is_laggy()) {
    // We've gone laggy during dispatch, don't do any

  // done with all client replayed requests?
  if (is_clientreplay() &&
      mdcache->is_open() &&
      replay_queue.empty() &&
      beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
    int num_requests = mdcache->get_num_client_requests();
    dout(10) << " still have " << num_requests << " active replay requests" << dendl;
    if (num_requests == 0)

  // hack: thrash exports
  static utime_t start;
  utime_t now = ceph_clock_now();
  if (start == utime_t())
  /*double el = now - start;

  for (int i=0; i<g_conf->mds_thrash_exports; i++) {
    if (!is_active()) break;
    mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
    if (s.size() < 2 || CInode::count() < 10)
      break;  // need peers for this to work.
    if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
        mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)

    dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    in->get_dirfrags(ls);
    if (!ls.empty()) {  // must be an open dir.
      list<CDir*>::iterator p = ls.begin();
      int n = rand() % ls.size();

      if (!dir->get_parent_dir()) continue;    // must be linked.
      if (!dir->is_auth()) continue;           // must be auth.

      int k = rand() % s.size();
      set<mds_rank_t>::iterator p = s.begin();

      } while (dest == whoami);
      mdcache->migrator->export_dir_nicely(dir,dest);

  // hack: thrash fragments
  for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
    if (!is_active()) break;
    if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
    dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    in->get_dirfrags(ls);
    if (ls.empty()) continue;  // must be an open dir.
    CDir *dir = ls.front();
    if (!dir->get_parent_dir()) continue;    // must be linked.
    if (!dir->is_auth()) continue;           // must be auth.
    frag_t fg = dir->get_frag();
    if (mdsmap->allows_dirfrags()) {
      if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
        mdcache->split_dir(dir, 1);

        balancer->queue_merge(dir);

  // hack: force hash root?
      mdcache->get_root() &&
      mdcache->get_root()->dir &&
      !(mdcache->get_root()->dir->is_hashed() ||
        mdcache->get_root()->dir->is_hashing())) {
    dout(0) << "hashing root" << dendl;
    mdcache->migrator->hash_dir(mdcache->get_root()->dir);
void MDSRank::update_mlogger()
{
    mlogger->set(l_mdm_ino, CInode::count());
    mlogger->set(l_mdm_dir, CDir::count());
    mlogger->set(l_mdm_dn, CDentry::count());
    mlogger->set(l_mdm_cap, Capability::count());
    mlogger->set(l_mdm_inoa, CInode::increments());
    mlogger->set(l_mdm_inos, CInode::decrements());
    mlogger->set(l_mdm_dira, CDir::increments());
    mlogger->set(l_mdm_dirs, CDir::decrements());
    mlogger->set(l_mdm_dna, CDentry::increments());
    mlogger->set(l_mdm_dns, CDentry::decrements());
    mlogger->set(l_mdm_capa, Capability::increments());
    mlogger->set(l_mdm_caps, Capability::decrements());
    mlogger->set(l_mdm_buf, buffer::get_total_alloc());
/*
 * lower priority messages we defer if we seem laggy
 */
bool MDSRank::handle_deferrable_message(Message *m)
{
  int port = m->get_type() & 0xff00;

    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->dispatch(m);

  case MDS_PORT_MIGRATOR:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->migrator->dispatch(m);

    switch (m->get_type()) {

    case CEPH_MSG_CLIENT_SESSION:
    case CEPH_MSG_CLIENT_RECONNECT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);

    case CEPH_MSG_CLIENT_REQUEST:

    case MSG_MDS_SLAVE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);

    case MSG_MDS_HEARTBEAT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      balancer->proc_message(m);

    case MSG_MDS_TABLE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);

        MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);

          MDSTableClient *client = get_table_client(req->table);
          client->handle_request(req);

          MDSTableServer *server = get_table_server(req->table);
          server->handle_request(req);

    case MSG_MDS_INODEFILECAPS:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);

    case CEPH_MSG_CLIENT_CAPS:
    case CEPH_MSG_CLIENT_CAPRELEASE:
    case CEPH_MSG_CLIENT_LEASE:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
/**
 * Advance finished_queue and waiting_for_nolaggy.
 *
 * Usually drain both queues, but may not drain waiting_for_nolaggy
 * if beacon is currently laggy.
 */
void MDSRank::_advance_queues()
{
  assert(mds_lock.is_locked_by_me());

  while (!finished_queue.empty()) {
    dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
    dout(10) << finished_queue << dendl;
    list<MDSInternalContextBase*> ls;
    ls.swap(finished_queue);
    while (!ls.empty()) {
      dout(10) << " finish " << ls.front() << dendl;
      ls.front()->complete(0);

  while (!waiting_for_nolaggy.empty()) {
    // stop if we're laggy now!
    if (beacon.is_laggy())

    Message *old = waiting_for_nolaggy.front();
    waiting_for_nolaggy.pop_front();

    if (is_stale_message(old)) {

      dout(7) << " processing laggy deferred " << *old << dendl;
      if (!handle_deferrable_message(old)) {
        dout(0) << "unrecognized message " << *old << dendl;
/**
 * Call this when you take mds_lock, or periodically if you're going to
 * hold the lock for a long time (e.g. iterating over clients/inodes)
 */
void MDSRank::heartbeat_reset()
{
  // Any thread might jump into mds_lock and call us immediately
  // after a call to suicide() completes, in which case MDSRank::hb
  // has been freed and we are a no-op.

  // NB not enabling suicide grace, because the mon takes care of killing us
  // (by blacklisting us) when we fail to send beacons, and it's simpler to
  // only have one way of dying.
  g_ceph_context->get_heartbeat_map()->reset_timeout(hb, g_conf->mds_beacon_grace, 0);
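// A message is "stale" when it claims to come from an MDS rank that the
// current MDSMap says is down, gone, or bound to a different instance.
// MDSMap and (in one case) cache-expire messages are still examined.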
bool MDSRank::is_stale_message(Message *m) const
{
  if (m->get_source().is_mds()) {
    mds_rank_t from = mds_rank_t(m->get_source().num());
    if (!mdsmap->have_inst(from) ||
        mdsmap->get_inst(from) != m->get_source_inst() ||
        mdsmap->is_down(from)) {
      if (m->get_type() == CEPH_MSG_MDS_MAP) {
        dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
                << ", but it's an mdsmap, looking at it" << dendl;
      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
                 mdsmap->get_inst(from) == m->get_source_inst()) {
        dout(5) << "got " << *m << " from down mds " << m->get_source()
                << ", but it's a cache_expire, looking at it" << dendl;

        dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
                << ", dropping" << dendl;
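// Look up the Session attached to a message's Connection.  If the connection
// still carries a bootstrap (closed) session but the SessionMap holds an
// imported session for the same client, switch the connection over to the
// imported session and flush any messages queued before it was open.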
Session *MDSRank::get_session(Message *m)
{
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
    session->put(); // do not carry ref
    dout(20) << "get_session have " << session << " " << session->info.inst
             << " state " << session->get_state_name() << dendl;
    // Check if we've imported an open session since (new sessions start closed)
    if (session->is_closed()) {
      Session *imported_session = sessionmap.get_session(session->info.inst.name);
      if (imported_session && imported_session != session) {
        dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
        imported_session->info.auth_name = session->info.auth_name;
        //assert(session->info.auth_name == imported_session->info.auth_name);
        assert(session->info.inst == imported_session->info.inst);
        imported_session->connection = session->connection;
        // send out any queued messages
        while (!session->preopen_out_queue.empty()) {
          imported_session->connection->send_message(session->preopen_out_queue.front());
          session->preopen_out_queue.pop_front();

        imported_session->auth_caps = session->auth_caps;
        assert(session->get_nref() == 1);
        imported_session->connection->set_priv(imported_session->get());
        session = imported_session;

    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
void MDSRank::send_message(Message *m, Connection *c)

void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
{
  if (!mdsmap->is_up(mds)) {
    dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;

  // send mdsmap first?
  if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
                            mdsmap->get_inst(mds));
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();

  messenger->send_message(m, mdsmap->get_inst(mds));

void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
{
  assert(mds != whoami);

  if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
      (static_cast<MClientRequest*>(m))->get_source().is_client()) {
    MClientRequest *creq = static_cast<MClientRequest*>(m);
    creq->inc_num_fwd();    // inc forward counter

    /*
     * don't actually forward if non-idempotent!
     * client has to do it. although the MDS will ignore duplicate requests,
     * the affected metadata may migrate, in which case the new authority
     * won't have the metareq_id in the completed request map.
     */
    // NEW: always make the client resend!
    bool client_must_resend = true;  //!creq->can_forward();

    // tell the client where it should go
    messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
                            creq->get_source_inst());

    if (client_must_resend) {

  // these are the only types of messages we should be 'forwarding'; they
  // explicitly encode their source mds, which gets clobbered when we resend
  assert(m->get_type() == MSG_MDS_DIRUPDATE ||
         m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);

  // send mdsmap first?
  if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
                            mdsmap->get_inst(mds));
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();

  messenger->send_message(m, mdsmap->get_inst(mds));
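// "Counted" client messages bump the session's push sequence number before
// sending, so delivery can be tracked per session; if the session has no live
// Connection yet the message waits on preopen_out_queue.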
void MDSRank::send_message_client_counted(Message *m, client_t client)
{
  Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
    send_message_client_counted(m, session);

    dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;

void MDSRank::send_message_client_counted(Message *m, Connection *connection)
{
  Session *session = static_cast<Session *>(connection->get_priv());
    session->put();  // do not carry ref
    send_message_client_counted(m, session);

    dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
    // another Connection took over the Session

void MDSRank::send_message_client_counted(Message *m, Session *session)
{
  version_t seq = session->inc_push_seq();
  dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
           << seq << " " << *m << dendl;
  if (session->connection) {
    session->connection->send_message(m);

    session->preopen_out_queue.push_back(m);

void MDSRank::send_message_client(Message *m, Session *session)
{
  dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
  if (session->connection) {
    session->connection->send_message(m);

    session->preopen_out_queue.push_back(m);
/**
 * This is used whenever a RADOS operation has been cancelled
 * or a RADOS client has been blacklisted, to cause the MDS and
 * any clients to wait for this OSD epoch before using any new caps.
 *
 * See doc/cephfs/eviction
 */
void MDSRank::set_osd_epoch_barrier(epoch_t e)
{
  dout(4) << __func__ << ": epoch=" << e << dendl;
  osd_epoch_barrier = e;

void MDSRank::retry_dispatch(Message *m)
{
  inc_dispatch_depth();
  _dispatch(m, false);
  dec_dispatch_depth();

utime_t MDSRank::get_laggy_until() const
{
  return beacon.get_laggy_until();

bool MDSRank::is_daemon_stopping() const

void MDSRank::request_state(MDSMap::DaemonState s)
{
  dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
  beacon.set_want_state(mdsmap, s);

class C_MDS_BootStart : public MDSInternalContext {
  MDSRank::BootStep nextstep;

  C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
    : MDSInternalContext(m), nextstep(n) {}
  void finish(int r) override {
    mds->boot_start(nextstep, r);
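// boot_start() drives startup as a chain of steps (MDS_BOOT_INITIAL ->
// OPEN_ROOT -> PREPARE_LOG -> REPLAY_DONE); each step schedules the next via
// a C_MDS_BootStart once its gathered I/O completes.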
void MDSRank::boot_start(BootStep step, int r)
{
  // Handle errors from previous step
    if (is_standby_replay() && (r == -EAGAIN)) {
      dout(0) << "boot_start encountered an error EAGAIN"
              << ", respawning since we fell behind journal" << dendl;
    } else if (r == -EINVAL || r == -ENOENT) {
      // Invalid or absent data, indicates damaged on-disk structures
      clog->error() << "Error loading MDS rank " << whoami << ": "
      assert(r == 0);  // Unreachable, damaged() calls respawn()

      // Completely unexpected error, give up and die
      dout(0) << "boot_start encountered an error, failing" << dendl;

  assert(is_starting() || is_any_replay());

    case MDS_BOOT_INITIAL:
        mdcache->init_layouts();

        MDSGatherBuilder gather(g_ceph_context,
                                new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
        dout(2) << "boot_start " << step << ": opening inotable" << dendl;
        inotable->set_rank(whoami);
        inotable->load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
        sessionmap.set_rank(whoami);
        sessionmap.load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening mds log" << dendl;
        mdlog->open(gather.new_sub());

        if (is_starting()) {
          dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
          purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
        } else if (!standby_replaying) {
          dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
          purge_queue.open(NULL);

        if (mdsmap->get_tableserver() == whoami) {
          dout(2) << "boot_start " << step << ": opening snap table" << dendl;
          snapserver->set_rank(whoami);
          snapserver->load(gather.new_sub());

    case MDS_BOOT_OPEN_ROOT:
        dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;

        MDSGatherBuilder gather(g_ceph_context,
                                new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));

        if (is_starting()) {
          // load mydir frag for the first log segment (creating subtree map)
          mdcache->open_mydir_frag(gather.new_sub());

          mdcache->open_mydir_inode(gather.new_sub());

        if (whoami == mdsmap->get_root()) {  // load root inode off disk if we are auth
          mdcache->open_root_inode(gather.new_sub());
        } else if (is_any_replay()) {
          // replay. make up fake root inode to start with
          mdcache->create_root_inode();

    case MDS_BOOT_PREPARE_LOG:
      if (is_any_replay()) {
        dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
        MDSGatherBuilder gather(g_ceph_context,
                                new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));

        if (!standby_replaying) {
          dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
          purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));

        mdlog->replay(gather.new_sub());

        dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;

    case MDS_BOOT_REPLAY_DONE:
      assert(is_any_replay());

      // Sessiontable and inotable should be in sync after replay, validate
      // that they are consistent.
      validate_sessions();
void MDSRank::validate_sessions()
{
  assert(mds_lock.is_locked_by_me());

  // Identify any sessions which have state inconsistent with other,
  // after they have been loaded from rados during startup.
  // Mitigate bugs like: http://tracker.ceph.com/issues/16842
  const auto &sessions = sessionmap.get_sessions();
  for (const auto &i : sessions) {
    Session *session = i.second;
    interval_set<inodeno_t> badones;
    if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
      clog->error() << "client " << *session
          << "loaded with preallocated inodes that are inconsistent with inotable";

void MDSRank::starting_done()
{
  dout(3) << "starting_done" << dendl;
  assert(is_starting());
  request_state(MDSMap::STATE_ACTIVE);

  mdlog->start_new_segment();

void MDSRank::calc_recovery_set()
{
  // initialize gather sets
  mdsmap->get_recovery_mds_set(rs);

  mdcache->set_recovery_set(rs);

  dout(1) << " recovery set is " << rs << dendl;
void MDSRank::replay_start()
{
  dout(1) << "replay_start" << dendl;

  if (is_standby_replay())
    standby_replaying = true;

  calc_recovery_set();

  // Check if we need to wait for a newer OSD map before starting
  Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
  bool const ready = objecter->wait_for_map(
      mdsmap->get_last_failure_osd_epoch(),

    dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
            << " (which blacklists prior instance)" << dendl;

class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
  uint64_t old_read_pos;

  C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
    MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
  void finish(int r) override {
    mds->_standby_replay_restart_finish(r, old_read_pos);

void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
{
  if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
    dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
    respawn(); /* we're too far back, and this is easier than
                  trying to reset everything in the cache, etc */

    mdlog->standby_trim_segments();
    boot_start(MDS_BOOT_PREPARE_LOG, r);

class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
  explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
  void finish(int r) override {
    mds->standby_replay_restart();

void MDSRank::standby_replay_restart()
{
  if (standby_replaying) {
    /* Go around for another pass of replaying in standby */
    dout(4) << "standby_replay_restart (as standby)" << dendl;
    mdlog->get_journaler()->reread_head_and_probe(
      new C_MDS_StandbyReplayRestartFinish(
        mdlog->get_journaler()->get_read_pos()));

    /* We are transitioning out of standby: wait for OSD map update
       before making final pass */
    dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
    Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
    bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);

      mdlog->get_journaler()->reread_head_and_probe(
        new C_MDS_StandbyReplayRestartFinish(
          mdlog->get_journaler()->get_read_pos()));

      dout(1) << " opening purge queue (async)" << dendl;
      purge_queue.open(NULL);

      dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
              << " (which blacklists prior instance)" << dendl;
void MDSRank::replay_done()
{
  dout(1) << "replay_done" << (standby_replaying ? " (as standby)" : "") << dendl;

  if (is_standby_replay()) {
    // The replay was done in standby state, and we are still in that state
    assert(standby_replaying);
    dout(10) << "setting replay timer" << dendl;
    timer.add_event_after(g_conf->mds_replay_interval,
                          new C_MDS_StandbyReplayRestart(this));

  } else if (standby_replaying) {
    // The replay was done in standby state, we have now _left_ that state
    dout(10) << " last replay pass was as a standby; making final pass" << dendl;
    standby_replaying = false;
    standby_replay_restart();

    // Replay is complete, journal read should be up to date
    assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
    assert(!is_standby_replay());

    // Reformat and come back here
    if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
      dout(4) << "reformatting journal on standbyreplay->replay transition" << dendl;
      mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));

  dout(1) << "making mds journal writeable" << dendl;
  mdlog->get_journaler()->set_writeable();
  mdlog->get_journaler()->trim_tail();

  if (g_conf->mds_wipe_sessions) {
    dout(1) << "wiping out client sessions" << dendl;
    sessionmap.save(new C_MDSInternalNoop);

  if (g_conf->mds_wipe_ino_prealloc) {
    dout(1) << "wiping out ino prealloc from sessions" << dendl;
    sessionmap.wipe_ino_prealloc();
    sessionmap.save(new C_MDSInternalNoop);

  if (g_conf->mds_skip_ino) {
    inodeno_t i = g_conf->mds_skip_ino;
    dout(1) << "skipping " << i << " inodes" << dendl;
    inotable->skip_inos(i);
    inotable->save(new C_MDSInternalNoop);

  if (mdsmap->get_num_in_mds() == 1 &&
      mdsmap->get_num_failed_mds() == 0) { // just me!
    dout(2) << "i am alone, moving to state reconnect" << dendl;
    request_state(MDSMap::STATE_RECONNECT);

    dout(2) << "i am not alone, moving to state resolve" << dendl;
    request_state(MDSMap::STATE_RESOLVE);
void MDSRank::reopen_log()
{
  dout(1) << "reopen_log" << dendl;
  mdcache->rollback_uncommitted_fragments();

void MDSRank::resolve_start()
{
  dout(1) << "resolve_start" << dendl;

  mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
  finish_contexts(g_ceph_context, waiting_for_resolve);

void MDSRank::resolve_done()
{
  dout(1) << "resolve_done" << dendl;
  request_state(MDSMap::STATE_RECONNECT);

void MDSRank::reconnect_start()
{
  dout(1) << "reconnect_start" << dendl;

  if (last_state == MDSMap::STATE_REPLAY) {

  // Drop any blacklisted clients from the SessionMap before going
  // into reconnect, so that we don't wait for them.
  objecter->enable_blacklist_events();
  std::set<entity_addr_t> blacklist;
  objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
      o.get_blacklist(&blacklist);
      epoch = o.get_epoch();

  auto killed = server->apply_blacklist(blacklist);
  dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
          << blacklist.size() << " blacklist entries, "
          << sessionmap.get_sessions().size() << ")" << dendl;

    set_osd_epoch_barrier(epoch);

  server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
  finish_contexts(g_ceph_context, waiting_for_reconnect);

void MDSRank::reconnect_done()
{
  dout(1) << "reconnect_done" << dendl;
  request_state(MDSMap::STATE_REJOIN);    // move to rejoin state

void MDSRank::rejoin_joint_start()
{
  dout(1) << "rejoin_joint_start" << dendl;
  mdcache->rejoin_send_rejoins();

void MDSRank::rejoin_start()
{
  dout(1) << "rejoin_start" << dendl;
  mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));

void MDSRank::rejoin_done()
{
  dout(1) << "rejoin_done" << dendl;
  mdcache->show_subtrees();
  mdcache->show_cache();

  // funny case: is our cache empty? no subtrees?
  if (!mdcache->is_subtrees()) {
      // The root should always have a subtree!
      clog->error() << "No subtrees found for root MDS rank!";
      assert(mdcache->is_subtrees());

      dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
      request_state(MDSMap::STATE_STOPPED);

  if (replay_queue.empty())
    request_state(MDSMap::STATE_ACTIVE);

    request_state(MDSMap::STATE_CLIENTREPLAY);

void MDSRank::clientreplay_start()
{
  dout(1) << "clientreplay_start" << dendl;
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();

bool MDSRank::queue_one_replay()
{
  if (replay_queue.empty()) {
    mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));

  queue_waiter(replay_queue.front());
  replay_queue.pop_front();

void MDSRank::clientreplay_done()
{
  dout(1) << "clientreplay_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);

void MDSRank::active_start()
{
  dout(1) << "active_start" << dendl;

  if (last_state == MDSMap::STATE_CREATING ||
      last_state == MDSMap::STATE_STARTING) {
    mdcache->open_root();

  mdcache->clean_open_file_lists();
  mdcache->export_remaining_imported_caps();
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();

  mdcache->reissue_all_caps();
  mdcache->activate_stray_manager();

  finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters

void MDSRank::recovery_done(int oldstate)
{
  dout(1) << "recovery_done -- successful recovery!" << dendl;
  assert(is_clientreplay() || is_active());

  // kick snaptable (resent AGREEs)
  if (mdsmap->get_tableserver() == whoami) {
    set<mds_rank_t> active;
    mdsmap->get_clientreplay_or_active_or_stopping_mds_set(active);
    snapserver->finish_recovery(active);

  if (oldstate == MDSMap::STATE_CREATING)

  mdcache->start_recovered_truncates();
  mdcache->do_file_recover();

  // tell connected clients
  //bcast_mds_map();  // not anymore, they get this from the monitor

  mdcache->populate_mydir();

void MDSRank::creating_done()
{
  dout(1) << "creating_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
void MDSRank::boot_create()
{
  dout(3) << "boot_create" << dendl;

  MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));

  mdcache->init_layouts();

  snapserver->set_rank(whoami);
  inotable->set_rank(whoami);
  sessionmap.set_rank(whoami);

  // start with a fresh journal
  dout(10) << "boot_create creating fresh journal" << dendl;
  mdlog->create(fin.new_sub());

  // open new journal segment, but do not journal subtree map (yet)
  mdlog->prepare_new_segment();

  if (whoami == mdsmap->get_root()) {
    dout(3) << "boot_create creating fresh hierarchy" << dendl;
    mdcache->create_empty_hierarchy(fin.get());

  dout(3) << "boot_create creating mydir hierarchy" << dendl;
  mdcache->create_mydir_hierarchy(fin.get());

  // fixme: fake out inotable (reset, pretend loaded)
  dout(10) << "boot_create creating fresh inotable table" << dendl;

  inotable->save(fin.new_sub());

  // write empty sessionmap
  sessionmap.save(fin.new_sub());

  // Create empty purge queue
  purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));

  // initialize tables
  if (mdsmap->get_tableserver() == whoami) {
    dout(10) << "boot_create creating fresh snaptable" << dendl;
    snapserver->reset();
    snapserver->save(fin.new_sub());

  assert(g_conf->mds_kill_create_at != 1);

  // ok now journal it
  mdlog->journal_segment_subtree_map(fin.new_sub());

  // Usually we do this during reconnect, but creation skips that.
  objecter->enable_blacklist_events();

void MDSRank::stopping_start()
{
  dout(2) << "stopping_start" << dendl;

  if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
    // we're the only mds up!
    dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal. suicide!" << dendl;

  mdcache->shutdown_start();

void MDSRank::stopping_done()
{
  dout(2) << "stopping_done" << dendl;

  // tell monitor we shut down cleanly.
  request_state(MDSMap::STATE_STOPPED);
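// Apply a newly received MDSMap while we hold a rank: validate the state
// transition, run the appropriate *_start() handler, and compare the old and
// new maps to kick recovery, failure and discovery handling for peer ranks.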
void MDSRankDispatcher::handle_mds_map(
  // I am only to be passed MDSMaps in which I hold a rank
  assert(whoami != MDS_RANK_NONE);

  MDSMap::DaemonState oldstate = state;
  mds_gid_t mds_gid = mds_gid_t(monc->get_global_id());
  state = mdsmap->get_state_gid(mds_gid);
  if (state != oldstate) {
    last_state = oldstate;
    incarnation = mdsmap->get_inc_gid(mds_gid);

  version_t epoch = m->get_epoch();

  // note source's map version
  if (m->get_source().is_mds() &&
      peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] < epoch) {
    dout(15) << " peer " << m->get_source()
             << " has mdsmap epoch >= " << epoch
    peer_mdsmap_epoch[mds_rank_t(m->get_source().num())] = epoch;

  // Validate state transitions while I hold a rank
  if (!MDSMap::state_transition_valid(oldstate, state)) {
    derr << "Invalid state transition " << ceph_mds_state_name(oldstate)
         << "->" << ceph_mds_state_name(state) << dendl;

  if (oldstate != state) {
    // update messenger.
    if (state == MDSMap::STATE_STANDBY_REPLAY) {
      dout(1) << "handle_mds_map i am now mds." << mds_gid << "." << incarnation
              << " replaying mds." << whoami << "." << incarnation << dendl;
      messenger->set_myname(entity_name_t::MDS(mds_gid));

      dout(1) << "handle_mds_map i am now mds." << whoami << "." << incarnation << dendl;
      messenger->set_myname(entity_name_t::MDS(whoami));

  // tell objecter my incarnation
  if (objecter->get_client_incarnation() != incarnation)
    objecter->set_client_incarnation(incarnation);

  if (g_conf->mds_dump_cache_on_map)
    mdcache->dump_cache();

  if (oldstate != state) {
    dout(1) << "handle_mds_map state change "
            << ceph_mds_state_name(oldstate) << " --> "
            << ceph_mds_state_name(state) << dendl;
    beacon.set_want_state(mdsmap, state);

    if (oldstate == MDSMap::STATE_STANDBY_REPLAY) {
      dout(10) << "Monitor activated us! Deactivating replay loop" << dendl;
      assert (state == MDSMap::STATE_REPLAY);

    // did i just recover?
    if ((is_active() || is_clientreplay()) &&
        (oldstate == MDSMap::STATE_CREATING ||
         oldstate == MDSMap::STATE_REJOIN ||
         oldstate == MDSMap::STATE_RECONNECT))
      recovery_done(oldstate);

    } else if (is_any_replay()) {
    } else if (is_resolve()) {
    } else if (is_reconnect()) {
    } else if (is_rejoin()) {
    } else if (is_clientreplay()) {
      clientreplay_start();
    } else if (is_creating()) {
    } else if (is_starting()) {
    } else if (is_stopping()) {
      assert(oldstate == MDSMap::STATE_ACTIVE);

  // is someone else newly resolving?
  if (is_resolve() || is_reconnect() || is_rejoin() ||
      is_clientreplay() || is_active() || is_stopping()) {
    if (!oldmap->is_resolving() && mdsmap->is_resolving()) {
      set<mds_rank_t> resolve;
      mdsmap->get_mds_set(resolve, MDSMap::STATE_RESOLVE);
      dout(10) << " resolve set is " << resolve << dendl;
      calc_recovery_set();
      mdcache->send_resolves();

  // is everybody finally rejoining?
  if (is_rejoin() || is_clientreplay() || is_active() || is_stopping()) {
    if (!oldmap->is_rejoining() && mdsmap->is_rejoining())
      rejoin_joint_start();

    if (g_conf->mds_dump_cache_after_rejoin &&
        oldmap->is_rejoining() && !mdsmap->is_rejoining())
      mdcache->dump_cache();      // for DEBUG only

    if (oldstate >= MDSMap::STATE_REJOIN ||
        oldstate == MDSMap::STATE_STARTING) {
      // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
      set<mds_rank_t> olddis, dis;
      oldmap->get_mds_set(olddis, MDSMap::STATE_ACTIVE);
      oldmap->get_mds_set(olddis, MDSMap::STATE_CLIENTREPLAY);
      oldmap->get_mds_set(olddis, MDSMap::STATE_REJOIN);
      mdsmap->get_mds_set(dis, MDSMap::STATE_ACTIVE);
      mdsmap->get_mds_set(dis, MDSMap::STATE_CLIENTREPLAY);
      mdsmap->get_mds_set(dis, MDSMap::STATE_REJOIN);
      for (set<mds_rank_t>::iterator p = dis.begin(); p != dis.end(); ++p)
        if (*p != whoami &&            // not me
            olddis.count(*p) == 0) {   // newly so?
          mdcache->kick_discovers(*p);
          mdcache->kick_open_ino_peers(*p);

  cluster_degraded = mdsmap->is_degraded();
  if (oldmap->is_degraded() && !cluster_degraded && state >= MDSMap::STATE_ACTIVE) {
    dout(1) << "cluster recovered." << dendl;
    auto it = waiting_for_active_peer.find(MDS_RANK_NONE);
    if (it != waiting_for_active_peer.end()) {
      queue_waiters(it->second);
      waiting_for_active_peer.erase(it);

  // did someone go active?
  if (oldstate >= MDSMap::STATE_CLIENTREPLAY &&
      (is_clientreplay() || is_active() || is_stopping())) {
    set<mds_rank_t> oldactive, active;
    oldmap->get_mds_set(oldactive, MDSMap::STATE_ACTIVE);
    oldmap->get_mds_set(oldactive, MDSMap::STATE_CLIENTREPLAY);
    mdsmap->get_mds_set(active, MDSMap::STATE_ACTIVE);
    mdsmap->get_mds_set(active, MDSMap::STATE_CLIENTREPLAY);
    for (set<mds_rank_t>::iterator p = active.begin(); p != active.end(); ++p)
      if (*p != whoami &&             // not me
          oldactive.count(*p) == 0)   // newly so?
        handle_mds_recovery(*p);

  // did someone fail?
    set<mds_rank_t> olddown, down;
    oldmap->get_down_mds_set(&olddown);
    mdsmap->get_down_mds_set(&down);
    for (set<mds_rank_t>::iterator p = down.begin(); p != down.end(); ++p) {
      if (oldmap->have_inst(*p) && olddown.count(*p) == 0) {
        messenger->mark_down(oldmap->get_inst(*p).addr);
        handle_mds_failure(*p);

  // did someone fail?
  //   did their addr/inst change?
    mdsmap->get_up_mds_set(up);
    for (set<mds_rank_t>::iterator p = up.begin(); p != up.end(); ++p) {
      if (oldmap->have_inst(*p) &&
          oldmap->get_inst(*p) != mdsmap->get_inst(*p)) {
        messenger->mark_down(oldmap->get_inst(*p).addr);
        handle_mds_failure(*p);

  if (is_clientreplay() || is_active() || is_stopping()) {
    set<mds_rank_t> oldstopped, stopped;
    oldmap->get_stopped_mds_set(oldstopped);
    mdsmap->get_stopped_mds_set(stopped);
    for (set<mds_rank_t>::iterator p = stopped.begin(); p != stopped.end(); ++p)
      if (oldstopped.count(*p) == 0)      // newly so?
        mdcache->migrator->handle_mds_failure_or_stop(*p);

    map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
    while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
      list<MDSInternalContextBase*> ls;
      waiting_for_mdsmap.erase(p++);
      finish_contexts(g_ceph_context, ls);

    // Before going active, set OSD epoch barrier to latest (so that
    // we don't risk handing out caps to clients with old OSD maps that
    // might not include barriers from the previous incarnation of this MDS)
    set_osd_epoch_barrier(objecter->with_osdmap(
                            std::mem_fn(&OSDMap::get_epoch)));

    MDSMap::mds_info_t info = mdsmap->get_info(whoami);
    for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
         p != mdsmap->get_mds_info().end();
      if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
          (p->second.standby_for_rank == whoami ||
           (info.name.length() && p->second.standby_for_name == info.name))) {

    mdlog->set_write_iohint(0);

    mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);

  if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
    purge_queue.update_op_limit(*mdsmap);
void MDSRank::handle_mds_recovery(mds_rank_t who)
{
  dout(5) << "handle_mds_recovery mds." << who << dendl;

  mdcache->handle_mds_recovery(who);

  if (mdsmap->get_tableserver() == whoami) {
    snapserver->handle_mds_recovery(who);

  queue_waiters(waiting_for_active_peer[who]);
  waiting_for_active_peer.erase(who);

void MDSRank::handle_mds_failure(mds_rank_t who)
{
  if (who == whoami) {
    dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;

  dout(5) << "handle_mds_failure mds." << who << dendl;

  mdcache->handle_mds_failure(who);

  snapclient->handle_mds_failure(who);
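// Admin-socket command dispatch.  Commands arrive through the daemon's asok
// interface (illustratively, "ceph daemon mds.<id> flush journal" or
// "ceph daemon mds.<id> session ls") and are routed by name below.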
bool MDSRankDispatcher::handle_asok_command(
    std::string command, cmdmap_t& cmdmap, Formatter *f,
  if (command == "dump_ops_in_flight" ||
    if (!op_tracker.dump_ops_in_flight(f)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_blocked_ops") {
    if (!op_tracker.dump_ops_in_flight(f, true)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
Please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_historic_ops") {
    if (!op_tracker.dump_historic_ops(f)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_historic_ops_by_duration") {
    if (!op_tracker.dump_historic_ops(f, true)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those get stuck. \
please enable \"osd_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "osdmap barrier") {
    int64_t target_epoch = 0;
    bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);

      ss << "no target epoch given";

      set_osd_epoch_barrier(target_epoch);

    bool already_got = objecter->wait_for_map(target_epoch, &cond);

      dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;

  } else if (command == "session ls") {
    Mutex::Locker l(mds_lock);

    dump_sessions(SessionFilter(), f);
  } else if (command == "session evict") {
    std::string client_id;
    const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);

      ss << "Invalid client_id specified";

    std::stringstream dss;
    bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
        g_conf->mds_session_blacklist_on_evict, dss);

      dout(15) << dss.str() << dendl;

  } else if (command == "scrub_path") {
    vector<string> scrubop_vec;
    cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    command_scrub_path(f, path, scrubop_vec);
  } else if (command == "tag path") {
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    cmd_getval(g_ceph_context, cmdmap, "tag", tag);
    command_tag_path(f, path, tag);
  } else if (command == "flush_path") {
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    command_flush_path(f, path);
  } else if (command == "flush journal") {
    command_flush_journal(f);
  } else if (command == "get subtrees") {
    command_get_subtrees(f);
  } else if (command == "export dir") {
    if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
      ss << "malformed path";

    if(!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
      ss << "malformed rank";

    command_export_dir(f, path, (mds_rank_t)rank);
  } else if (command == "dump cache") {
    Mutex::Locker l(mds_lock);

    if(!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
      r = mdcache->dump_cache(f);

      r = mdcache->dump_cache(path);

      ss << "Failed to dump cache: " << cpp_strerror(r);

  } else if (command == "cache status") {
    Mutex::Locker l(mds_lock);
    int r = mdcache->cache_status(f);

      ss << "Failed to get cache status: " << cpp_strerror(r);

  } else if (command == "dump tree") {
    cmd_getval(g_ceph_context, cmdmap, "root", root);
    if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))

    Mutex::Locker l(mds_lock);
    int r = mdcache->dump_cache(root, depth, f);

      ss << "Failed to dump tree: " << cpp_strerror(r);

  } else if (command == "dump loads") {
    Mutex::Locker l(mds_lock);
    int r = balancer->dump_loads(f);

      ss << "Failed to dump loads: " << cpp_strerror(r);

  } else if (command == "force_readonly") {
    Mutex::Locker l(mds_lock);
    mdcache->force_readonly();
  } else if (command == "dirfrag split") {
    command_dirfrag_split(cmdmap, ss);
  } else if (command == "dirfrag merge") {
    command_dirfrag_merge(cmdmap, ss);
  } else if (command == "dirfrag ls") {
    command_dirfrag_ls(cmdmap, ss, f);
class C_MDS_Send_Command_Reply : public MDSInternalContext
{
  C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
    MDSInternalContext(_mds), m(_m) { m->get(); }
  void send (int r, boost::string_view out_str) {
    MDSDaemon::send_command_reply(m, mds, r, bl, out_str);

  void finish (int r) override {

/**
 * This function drops the mds_lock, so don't do anything with
 * MDSRank after calling it (we could have gone into shutdown): just
 * send your result back to the calling client and finish.
 */
void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
{
  C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);

  if (is_any_replay()) {
    reply->send(-EAGAIN, "MDS is replaying log");

  std::list<Session*> victims;
  const auto sessions = sessionmap.get_sessions();
  for (const auto p : sessions) {
    if (!p.first.is_client()) {

    Session *s = p.second;

    if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server, std::placeholders::_1))) {
      victims.push_back(s);

  dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;

  if (victims.empty()) {

  C_GatherBuilder gather(g_ceph_context, reply);
  for (const auto s : victims) {
    std::stringstream ss;
    evict_client(s->info.inst.name.num(), false,
                 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
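// Emit one JSON object per client session (id, state, cap/lease counts,
// request counters and client metadata), applying the supplied SessionFilter.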
void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
{
  // Dump sessions, decorated with recovery/replay status
  f->open_array_section("sessions");
  const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
  for (ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.begin();
       p != session_map.end();
       ++p) {
    if (!p->first.is_client()) {
      continue;
    }

    Session *s = p->second;

    if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect,
                                    server, std::placeholders::_1))) {
      continue;
    }

    f->open_object_section("session");
    f->dump_int("id", p->first.num());

    f->dump_int("num_leases", s->leases.size());
    f->dump_int("num_caps", s->caps.size());

    f->dump_string("state", s->get_state_name());
    f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
    f->dump_unsigned("completed_requests", s->get_num_completed_requests());
    f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
    f->dump_stream("inst") << s->info.inst;

    f->open_object_section("client_metadata");
    for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
         i != s->info.client_metadata.end(); ++i) {
      f->dump_string(i->first.c_str(), i->second);
    }
    f->close_section(); // client_metadata

    f->close_section(); // session
  }
  f->close_section(); // sessions
}
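
// The scrub/tag/flush commands below all enqueue work on MDCache under
// mds_lock and block on a C_SaferCond until it completes. command_scrub_path
// accepts the scrub options "force", "recursive" and "repair".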
void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
{
  bool force = false;
  bool recursive = false;
  bool repair = false;
  for (vector<string>::iterator i = scrubop_vec.begin(); i != scrubop_vec.end(); ++i) {
    if (*i == "force")
      force = true;
    else if (*i == "recursive")
      recursive = true;
    else if (*i == "repair")
      repair = true;
  }
  C_SaferCond scond;
  {
    Mutex::Locker l(mds_lock);
    mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);
  }
  scond.wait();
  // scrub_dentry() finishers will dump the data for us; we're done!
}
void MDSRank::command_tag_path(Formatter *f,
    boost::string_view path, boost::string_view tag)
{
  C_SaferCond scond;
  {
    Mutex::Locker l(mds_lock);
    mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
  }
  scond.wait();
}
void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
{
  C_SaferCond scond;
  {
    Mutex::Locker l(mds_lock);
    mdcache->flush_dentry(path, &scond);
  }
  int r = scond.wait();
  f->open_object_section("results");
  f->dump_int("return_code", r);
  f->close_section(); // results
}
/**
 * Wrapper around _command_flush_journal that
 * handles serialization of result
 */
void MDSRank::command_flush_journal(Formatter *f)
{
  std::stringstream ss;
  const int r = _command_flush_journal(&ss);
  f->open_object_section("result");
  f->dump_string("message", ss.str());
  f->dump_int("return_code", r);
  f->close_section();
}
/**
 * Implementation of "flush journal" asok command.
 *
 * Optionally populate ss with a human readable string describing the
 * reason for any unexpected return status.
 */
int MDSRank::_command_flush_journal(std::stringstream *ss)
{
  Mutex::Locker l(mds_lock);

  if (mdcache->is_readonly()) {
    dout(5) << __func__ << ": read-only FS" << dendl;
    return -EROFS;
  }

  if (!is_active()) {
    dout(5) << __func__ << ": MDS not active, no-op" << dendl;
    return 0;
  }

  // I need to seal off the current segment, and then mark all previous segments
  // for expiry
  mdlog->start_new_segment();
  int r = 0;

  // Flush initially so that all the segments older than our new one
  // will be eligible for expiry
  {
    C_SaferCond mdlog_flushed;
    mdlog->flush();
    mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_flushed));
    mds_lock.Unlock();
    r = mdlog_flushed.wait();
    mds_lock.Lock();
    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      return r;
    }
  }

  // Because we may not be the last wait_for_safe context on MDLog, and
  // subsequent contexts might wake up in the middle of our later trim_all
  // and interfere with expiry (by e.g. marking dirs/dentries dirty
  // on previous log segments), we run a second wait_for_safe here.
  {
    C_SaferCond mdlog_cleared;
    mdlog->wait_for_safe(new MDSInternalContextWrapper(this, &mdlog_cleared));
    mds_lock.Unlock();
    r = mdlog_cleared.wait();
    mds_lock.Lock();
    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      return r;
    }
  }

  // Put all the old log segments into expiring or expired state
  dout(5) << __func__ << ": beginning segment expiry" << dendl;
  r = mdlog->trim_all();
  if (r != 0) {
    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while trimming log";
    return r;
  }

  // Attach contexts to wait for all expiring segments to expire
  MDSGatherBuilder expiry_gather(g_ceph_context);

  const std::set<LogSegment*> &expiring_segments = mdlog->get_expiring_segments();
  for (std::set<LogSegment*>::const_iterator i = expiring_segments.begin();
       i != expiring_segments.end(); ++i) {
    (*i)->wait_for_expiry(expiry_gather.new_sub());
  }
  dout(5) << __func__ << ": waiting for " << expiry_gather.num_subs_created()
          << " segments to expire" << dendl;

  if (expiry_gather.has_subs()) {
    C_SaferCond cond;
    expiry_gather.set_finisher(new MDSInternalContextWrapper(this, &cond));
    expiry_gather.activate();

    // Drop mds_lock to allow progress until expiry is complete
    mds_lock.Unlock();
    int r = cond.wait();
    mds_lock.Lock();

    assert(r == 0);  // MDLog is not allowed to raise errors via wait_for_expiry
  }

  dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now " << std::hex <<
    mdlog->get_journaler()->get_expire_pos() << "/" <<
    mdlog->get_journaler()->get_trimmed_pos() << dendl;

  // Now everyone I'm interested in is expired
  mdlog->trim_expired_segments();

  dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now " << std::hex <<
    mdlog->get_journaler()->get_expire_pos() << "/" <<
    mdlog->get_journaler()->get_trimmed_pos() << dendl;

  // Flush the journal header so that readers will start from after the flushed region
  C_SaferCond wrote_head;
  mdlog->get_journaler()->write_head(&wrote_head);
  mds_lock.Unlock();  // Drop lock to allow messenger dispatch progress
  r = wrote_head.wait();
  mds_lock.Lock();
  if (r != 0) {
    *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
    return r;
  }

  dout(5) << __func__ << ": write_head complete, all done!" << dendl;

  return 0;
}
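
// Dump the subtree map: one entry per subtree root, recording whether this
// rank is auth, the dir_auth pair, the export pin and the dirfrag itself.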
void MDSRank::command_get_subtrees(Formatter *f)
{
  Mutex::Locker l(mds_lock);

  std::list<CDir*> subtrees;
  mdcache->list_subtrees(subtrees);

  f->open_array_section("subtrees");
  for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
    const CDir *dir = *i;

    f->open_object_section("subtree");
    f->dump_bool("is_auth", dir->is_auth());
    f->dump_int("auth_first", dir->get_dir_auth().first);
    f->dump_int("auth_second", dir->get_dir_auth().second);
    f->dump_int("export_pin", dir->inode->get_export_pin());
    f->open_object_section("dir");
    dir->dump(f);
    f->close_section(); // dir
    f->close_section(); // subtree
  }
  f->close_section(); // subtrees
}
void MDSRank::command_export_dir(Formatter *f,
    boost::string_view path,
    mds_rank_t target)
{
  int r = _command_export_dir(path, target);
  f->open_object_section("results");
  f->dump_int("return_code", r);
  f->close_section(); // results
}
int MDSRank::_command_export_dir(
    boost::string_view path,
    mds_rank_t target)
{
  Mutex::Locker l(mds_lock);
  filepath fp(path);

  if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
    derr << "bad MDS target " << target << dendl;
    return -ENOENT;
  }

  CInode *in = mdcache->cache_traverse(fp);
  if (!in) {
    derr << "Bad path '" << path << "'" << dendl;
    return -ENOENT;
  }
  CDir *dir = in->get_dirfrag(frag_t());
  if (!dir || !(dir->is_auth())) {
    derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;
    return -EINVAL;
  }

  mdcache->migrator->export_dir(dir, target);
  return 0;
}
CDir *MDSRank::_command_dirfrag_get(
    const cmdmap_t &cmdmap,
    std::ostream &ss)
{
  std::string path;
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
  if (!got) {
    ss << "missing path argument";
    return NULL;
  }

  std::string frag_str;
  if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
    ss << "missing frag argument";
    return NULL;
  }

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
  if (!in) {
    // TODO really we should load something in if it's not in cache,
    // but the infrastructure is harder, and we might still be unable
    // to act on it if someone else is auth.
    ss << "directory '" << path << "' inode not in cache";
    return NULL;
  }

  frag_t fg;
  if (!fg.parse(frag_str.c_str())) {
    ss << "frag " << frag_str << " failed to parse";
    return NULL;
  }

  CDir *dir = in->get_dirfrag(fg);
  if (!dir) {
    ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
          "use `dirfrag ls` to see if it should exist)";
    return NULL;
  }

  if (!dir->is_auth()) {
    ss << "frag " << dir->dirfrag() << " not auth (auth = "
       << dir->authority() << ")";
    return NULL;
  }

  return dir;
}
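
// command_dirfrag_split resolves its target dirfrag through
// _command_dirfrag_get() above; the merge and ls commands parse the
// path/frag arguments themselves.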
bool MDSRank::command_dirfrag_split(
    cmdmap_t cmdmap,
    std::ostream &ss)
{
  Mutex::Locker l(mds_lock);
  if (!mdsmap->allows_dirfrags()) {
    ss << "dirfrags are disallowed by the mds map!";
    return false;
  }

  int64_t by = 0;
  if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
    ss << "missing bits argument";
    return false;
  }

  if (by <= 0) {
    ss << "must split by >0 bits";
    return false;
  }

  CDir *dir = _command_dirfrag_get(cmdmap, ss);
  if (!dir) {
    return false;
  }

  mdcache->split_dir(dir, by);

  return true;
}
bool MDSRank::command_dirfrag_merge(
    cmdmap_t cmdmap,
    std::ostream &ss)
{
  Mutex::Locker l(mds_lock);
  std::string path;
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
  if (!got) {
    ss << "missing path argument";
    return false;
  }

  std::string frag_str;
  if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
    ss << "missing frag argument";
    return false;
  }

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
  if (!in) {
    ss << "directory '" << path << "' inode not in cache";
    return false;
  }

  frag_t fg;
  if (!fg.parse(frag_str.c_str())) {
    ss << "frag " << frag_str << " failed to parse";
    return false;
  }

  mdcache->merge_dir(in, fg);

  return true;
}
bool MDSRank::command_dirfrag_ls(
    cmdmap_t cmdmap,
    std::ostream &ss,
    Formatter *f)
{
  Mutex::Locker l(mds_lock);
  std::string path;
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
  if (!got) {
    ss << "missing path argument";
    return false;
  }

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
  if (!in) {
    ss << "directory inode not in cache";
    return false;
  }

  f->open_array_section("frags");
  std::list<frag_t> frags;
  // NB using get_leaves_under instead of get_dirfrags to give
  // you the list of what dirfrags may exist, not which are in cache
  in->dirfragtree.get_leaves_under(frag_t(), frags);
  for (std::list<frag_t>::iterator i = frags.begin();
       i != frags.end(); ++i) {
    f->open_object_section("frag");
    f->dump_int("value", i->value());
    f->dump_int("bits", i->bits());
    std::ostringstream frag_str;
    frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
    f->dump_string("str", frag_str.str());
    f->close_section();
  }
  f->close_section();

  return true;
}
void MDSRank::dump_status(Formatter *f) const
{
  if (state == MDSMap::STATE_REPLAY ||
      state == MDSMap::STATE_STANDBY_REPLAY) {
    mdlog->dump_replay_status(f);
  } else if (state == MDSMap::STATE_RESOLVE) {
    mdcache->dump_resolve_status(f);
  } else if (state == MDSMap::STATE_RECONNECT) {
    server->dump_reconnect_status(f);
  } else if (state == MDSMap::STATE_REJOIN) {
    mdcache->dump_rejoin_status(f);
  } else if (state == MDSMap::STATE_CLIENTREPLAY) {
    dump_clientreplay_status(f);
  }
  f->dump_float("rank_uptime", get_uptime().count());
}
void MDSRank::dump_clientreplay_status(Formatter *f) const
{
  f->open_object_section("clientreplay_status");
  f->dump_unsigned("clientreplay_queue", replay_queue.size());
  f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
  f->close_section();
}
void MDSRankDispatcher::update_log_config()
{
  map<string,string> log_to_monitors;
  map<string,string> log_to_syslog;
  map<string,string> log_channel;
  map<string,string> log_prio;
  map<string,string> log_to_graylog;
  map<string,string> log_to_graylog_host;
  map<string,string> log_to_graylog_port;
  uuid_d fsid;
  string host;

  if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
                               log_channel, log_prio, log_to_graylog,
                               log_to_graylog_host, log_to_graylog_port,
                               fsid, host) == 0)
    clog->update_config(log_to_monitors, log_to_syslog,
                        log_channel, log_prio, log_to_graylog,
                        log_to_graylog_host, log_to_graylog_port,
                        fsid, host);
  dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
}
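
// Build and register the "mds" and "mds_mem" perf counter groups, then let
// the subsystems (MDLog, Server, PurgeQueue, SessionMap, MDCache) register
// their own counters.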
void MDSRank::create_logger()
{
  dout(10) << "create_logger" << dendl;

  PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);

  mds_plb.add_u64_counter(
    l_mds_request, "request", "Requests", "req",
    PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
  mds_plb.add_time_avg(
    l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
    PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_u64_counter(
    l_mds_forward, "forward", "Forwarding request", "fwd",
    PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
  mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
  mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
  mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");

  mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
  mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
                  PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
  mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
  mds_plb.add_u64(
    l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
  mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
  mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
  mds_plb.add_u64(
    l_mds_inodes_with_caps, "inodes_with_caps", "Inodes with capabilities");
  mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
                  PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");

  mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
  mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
  mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
                          "Traverse forwards");
  mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
                          "Traverse directory discovers");
  mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
                          "Traverse incomplete directory content fetchings");
  mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
                          "Traverse remote dentries");
  mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
                          "Traverse locks");

  mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");
  mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");

  mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
  mds_plb.add_u64_counter(
    l_mds_exported_inodes, "exported_inodes", "Exported inodes", "exi",
    PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");
  mds_plb.add_u64_counter(
    l_mds_imported_inodes, "imported_inodes", "Imported inodes", "imi",
    PerfCountersBuilder::PRIO_INTERESTING);
  logger = mds_plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(logger);

  PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
  mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
                  PerfCountersBuilder::PRIO_INTERESTING);
  mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
  mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
  mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
  mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
  mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
  mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
                  PerfCountersBuilder::PRIO_INTERESTING);
  mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
  mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
  mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
  mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
  mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
  mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");
  mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
  mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");
  mlogger = mdm_plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(mlogger);

  mdlog->create_logger();
  server->create_logger();
  purge_queue.create_logger();
  sessionmap.register_perfcounters();
  mdcache->register_perfcounters();
}
void MDSRank::check_ops_in_flight()
{
  vector<string> warnings;
  int slow = 0;
  if (op_tracker.check_ops_in_flight(warnings, &slow)) {
    for (vector<string>::iterator i = warnings.begin();
         i != warnings.end();
         ++i) {
      clog->warn() << *i;
    }
  }

  // set mds slow request count
  mds_slow_req_count = slow;
}
void MDSRankDispatcher::handle_osd_map()
{
  if (is_active() && snapserver) {
    snapserver->check_osd_map(true);
  }

  server->handle_osd_map();

  purge_queue.update_op_limit(*mdsmap);

  std::set<entity_addr_t> newly_blacklisted;
  objecter->consume_blacklist_events(&newly_blacklisted);
  auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
  dout(4) << "handle_osd_map epoch " << epoch << ", "
          << newly_blacklisted.size() << " new blacklist entries" << dendl;
  auto victims = server->apply_blacklist(newly_blacklisted);
  if (victims) {
    set_osd_epoch_barrier(epoch);
  }

  // By default the objecter only requests OSDMap updates on use,
  // we would like to always receive the latest maps in order to
  // apply policy based on the FULL flag.
  objecter->maybe_request_map();
}
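
// Evict a client session: optionally blacklist its address by sending an
// "osd blacklist add" command to the monitors (of the form
// {"prefix":"osd blacklist", "blacklistop":"add", "addr":"<client addr>"}),
// then kill the session. With wait=true the caller blocks (mds_lock is
// dropped and retaken) until the blacklist and kill are done; otherwise the
// work runs in the background and on_killed is completed afterwards.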
bool MDSRank::evict_client(int64_t session_id,
    bool wait, bool blacklist, std::stringstream& err_ss,
    Context *on_killed)
{
  assert(mds_lock.is_locked_by_me());

  // Mutually exclusive args
  assert(!(wait && on_killed != nullptr));

  if (is_any_replay()) {
    err_ss << "MDS is replaying log";
    return false;
  }

  Session *session = sessionmap.get_session(
      entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
  if (!session) {
    err_ss << "session " << session_id << " not in sessionmap!";
    return false;
  }

  dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
  std::stringstream ss;
  ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
  ss << "\"addr\":\"";
  ss << session->info.inst.addr;
  ss << "\"}";
  std::string tmp = ss.str();
  std::vector<std::string> cmd = {tmp};

  auto kill_mds_session = [this, session_id, on_killed](){
    assert(mds_lock.is_locked_by_me());
    Session *session = sessionmap.get_session(
        entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
    if (session) {
      if (on_killed) {
        server->kill_session(session, on_killed);
      } else {
        C_SaferCond on_safe;
        server->kill_session(session, &on_safe);
        mds_lock.Unlock();
        on_safe.wait();
        mds_lock.Lock();
      }
    } else {
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blacklist" << dendl;

      // Even though it wasn't us that removed it, kick our completion
      // as the session has been removed.
      if (on_killed) {
        on_killed->complete(0);
      }
    }
  };

  auto background_blacklist = [this, session_id, cmd](std::function<void ()> fn){
    assert(mds_lock.is_locked_by_me());

    Context *on_blacklist_done = new FunctionContext([this, session_id, fn](int r) {
      objecter->wait_for_latest_osdmap(
        new FunctionContext([this, session_id, fn](int r) {
          Mutex::Locker l(mds_lock);
          auto epoch = objecter->with_osdmap([](const OSDMap &o){
              return o.get_epoch();
          });

          set_osd_epoch_barrier(epoch);

          fn();
        }));
    });

    dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
    monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
  };

  auto blocking_blacklist = [this, cmd, &err_ss, background_blacklist](){
    C_SaferCond inline_ctx;
    background_blacklist([&inline_ctx](){inline_ctx.complete(0);});
    mds_lock.Unlock();
    inline_ctx.wait();
    mds_lock.Lock();
  };

  if (wait) {
    if (blacklist) {
      blocking_blacklist();
    }

    // We dropped mds_lock, so check that session still exists
    session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
          session_id));
    if (!session) {
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blacklist" << dendl;
      return true;
    }

    kill_mds_session();
  } else {
    if (blacklist) {
      background_blacklist(kill_mds_session);
    } else {
      kill_mds_session();
    }
  }

  return true;
}
void MDSRank::bcast_mds_map()
{
  dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;

  // share the map with mounted clients
  set<Session*> clients;
  sessionmap.get_client_session_set(clients);
  for (set<Session*>::const_iterator p = clients.begin();
       p != clients.end();
       ++p)
    (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
  last_client_mdsmap_bcast = mdsmap->get_epoch();
}
MDSRankDispatcher::MDSRankDispatcher(
    mds_rank_t whoami_,
    Mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    MDSMap *& mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
            msgr, monc_, respawn_hook_, suicide_hook_)
{}
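
// Rank-level command dispatch: handles "session ls"/"client ls",
// "session evict"/"client evict", "damage ls" and "damage rm"; returns false
// for any prefix it does not recognize so the caller can try other handlers.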
bool MDSRankDispatcher::handle_command(
  const cmdmap_t &cmdmap,
  MCommand *m,
  int *r,
  std::stringstream *ds,
  std::stringstream *ss,
  bool *need_reply)
{
  assert(r != nullptr);
  assert(ds != nullptr);
  assert(ss != nullptr);

  std::string prefix;
  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);

  if (prefix == "session ls" || prefix == "client ls") {
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);
    if (*r != 0) {
      return true;
    }

    Formatter *f = new JSONFormatter(true);
    dump_sessions(filter, f);
    f->flush(*ds);
    delete f;
    return true;
  } else if (prefix == "session evict" || prefix == "client evict") {
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);
    if (*r != 0) {
      return true;
    }

    evict_clients(filter, m);

    *need_reply = false;
    return true;
  } else if (prefix == "damage ls") {
    Formatter *f = new JSONFormatter(true);
    damage_table.dump(f);
    f->flush(*ds);
    delete f;
    return true;
  } else if (prefix == "damage rm") {
    damage_entry_id_t id = 0;
    bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);
    if (!got) {
      *r = -EINVAL;
      return true;
    }

    damage_table.erase(id);
    return true;
  } else {
    return false;
  }
}
epoch_t MDSRank::get_osd_epoch() const
{
  return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));
}