1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "include/compat.h"
18 #include "include/types.h"
19 #include "include/str_list.h"
21 #include "common/Clock.h"
22 #include "common/HeartbeatMap.h"
23 #include "common/Timer.h"
24 #include "common/ceph_argparse.h"
25 #include "common/config.h"
26 #include "common/entity_name.h"
27 #include "common/errno.h"
28 #include "common/perf_counters.h"
29 #include "common/signal.h"
30 #include "common/version.h"
32 #include "global/signal_handler.h"
34 #include "msg/Messenger.h"
35 #include "mon/MonClient.h"
37 #include "osdc/Objecter.h"
41 #include "MDSDaemon.h"
45 #include "SnapServer.h"
46 #include "SnapClient.h"
48 #include "events/ESession.h"
49 #include "events/ESubtreeMap.h"
51 #include "auth/AuthAuthorizeHandler.h"
52 #include "auth/RotatingKeyRing.h"
53 #include "auth/KeyRing.h"
55 #include "perfglue/cpu_profiler.h"
56 #include "perfglue/heap_profiler.h"
// Debug-logging boilerplate for this translation unit: the dout()/derr macros
// pick up their CephContext and subsystem from these defines.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
// Prefix every log line with "mds.<name> " so multi-daemon logs are readable.
#define dout_prefix *_dout << "mds." << name << ' '
// Pull cmd_getval into scope for the asok/tell command parsing below.
using TOPNSPC::common::cmd_getval;
// Construct the MDS daemon shell. Wires the timer, beacon, mgr client and log
// client to the Messenger's CephContext and the MonClient's monmap; records
// the daemon start time for uptime reporting.
// NOTE(review): this excerpt is incomplete — several member initializers and
// the opening brace of the constructor body are missing from the visible text.
MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
		     boost::asio::io_context& ioctx) :
  timer(m->cct, mds_lock),
  // GSS/Kerberos client keytab path comes straight from configuration.
  gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
  beacon(m->cct, mc, n),
  mgrc(m->cct, m, &mc->monmap),
  log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
  starttime(mono_clock::now())
  clog = log_client.create_channel();
  if (!gss_ktfile_client.empty()) {
    // Assert we can export environment variable
    // (original explanatory comment, restored as comments below)
    // The default client keytab is used, if it is present and readable,
    // to automatically obtain initial credentials for GSSAPI client
    // applications. The principal name of the first entry in the client
    // keytab is used by default when obtaining initial credentials.
    // 1. The KRB5_CLIENT_KTNAME environment variable.
    // 2. The default_client_keytab_name profile variable in [libdefaults].
    // 3. The hardcoded default, DEFCKTNAME.
    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
                                    gss_ktfile_client.c_str(), 1));
    ceph_assert(set_result == 0);
  // NOTE(review): closing brace of the if-block is missing in this excerpt.
  // Start with an empty MDSMap; it is replaced on the first map from the mons.
  mdsmap.reset(new MDSMap);
// Destructor: takes mds_lock so teardown cannot race with message dispatch.
// NOTE(review): the remainder of the destructor body (member cleanup) is
// missing from this excerpt.
MDSDaemon::~MDSDaemon() {
  std::lock_guard lock(mds_lock);
// Admin-socket hook that forwards "ceph daemon mds.x ..." commands to the
// owning MDSDaemon. The synchronous call() entry point is intentionally dead
// (ceph_abort) — all commands go through the async path into asok_command().
// NOTE(review): this excerpt is incomplete — the method names (call /
// call_async), their leading parameters, and the `MDSDaemon *mds;` member
// declaration are missing from the visible text.
class MDSSocketHook : public AdminSocketHook {
  explicit MDSSocketHook(MDSDaemon *m) : mds(m) {}
  std::string_view command,
  const cmdmap_t& cmdmap,
  ceph::buffer::list& out) override {
    // Synchronous path must never be used for MDS commands.
    ceph_abort("should go to call_async");
  std::string_view command,
  const cmdmap_t& cmdmap,
  const bufferlist& inbl,
  std::function<void(int,const std::string&,bufferlist&)> on_finish) override {
    // Forward to the daemon; on_finish delivers (retcode, status, output).
    mds->asok_command(command, cmdmap, f, inbl, on_finish);
// Handle an admin-socket command. Daemon-level commands (status / exit /
// respawn / heap / cpu_profiler) are handled here; anything else is delegated
// to the active MDSRank, or rejected if this daemon holds no rank.
// NOTE(review): excerpt is incomplete — the `Formatter *f` parameter line,
// several closing braces, try/catch framing and thread detach/join calls are
// missing from the visible text.
void MDSDaemon::asok_command(
  std::string_view command,
  const cmdmap_t& cmdmap,
  const bufferlist& inbl,
  std::function<void(int,const std::string&,bufferlist&)> on_finish)
  dout(1) << "asok_command: " << command << " " << cmdmap
	  << " (starting...)" << dendl;
  // Default result: command not recognized at the daemon level.
  int r = -CEPHFS_ENOSYS;
  CachedStackStringStream css;
  if (command == "status") {
  } else if (command == "exit") {
    outbl.append("Exiting...\n");
    // Shut down from a detached thread so the asok response can be sent first.
    std::thread t([this](){
      // Wait a little to improve chances of caller getting
      // our response before seeing us disappear from mdsmap
      std::lock_guard l(mds_lock);
  } else if (command == "respawn") {
    outbl.append("Respawning...\n");
    std::thread t([this](){
      // Wait a little to improve chances of caller getting
      // our response before seeing us disappear from mdsmap
      std::lock_guard l(mds_lock);
  } else if (command == "heap") {
    if (!ceph_using_tcmalloc()) {
      // Heap commands only make sense when linked against tcmalloc.
      ss << "not using tcmalloc";
      r = -CEPHFS_EOPNOTSUPP;
    cmd_getval(cmdmap, "heapcmd", heapcmd);
    vector<string> heapcmd_vec;
    get_str_vec(heapcmd, heapcmd_vec);
    if (cmd_getval(cmdmap, "value", value)) {
      heapcmd_vec.push_back(value);
    std::stringstream outss;
    ceph_heap_profiler_handle_command(heapcmd_vec, outss);
  } else if (command == "cpu_profiler") {
    cmd_getval(cmdmap, "arg", arg);
    vector<string> argvec;
    get_str_vec(arg, argvec);
    cpu_profiler_handle_command(argvec, ss);
    // Rank-level command: refuse when this daemon has no active rank.
    if (mds_rank == NULL) {
      dout(1) << "Can't run that command on an inactive MDS!" << dendl;
      f->dump_string("error", "mds_not_active");
    // Delegate to the rank; it will invoke on_finish itself.
    mds_rank->handle_asok_command(command, cmdmap, f, inbl, on_finish);
  } catch (const TOPNSPC::common::bad_cmd_get& e) {
  // Deliver the result for the daemon-level commands handled above.
  on_finish(r, ss.str(), outbl);
// Dump a high-level status object (for the "status" asok command): cluster
// fsid, rank, want/actual state, mdsmap/osdmap epochs and uptime.
// NOTE(review): excerpt is incomplete — the if/else framing that selects
// between the rank-holding and rankless branches is missing, which is why
// "whoami" and "osdmap_epoch" each appear twice below (one per branch,
// presumably — confirm against the full source).
void MDSDaemon::dump_status(Formatter *f)
  f->open_object_section("status");
  f->dump_stream("cluster_fsid") << monc->get_fsid();
  f->dump_int("whoami", mds_rank->get_nodeid());
  f->dump_int("whoami", MDS_RANK_NONE);
  f->dump_int("id", monc->get_global_id());
  f->dump_string("want_state", ceph_mds_state_name(beacon.get_want_state()));
  f->dump_string("state", ceph_mds_state_name(mdsmap->get_state_gid(mds_gid_t(
	  monc->get_global_id()))));
  // Rank status is read under mds_lock.
  std::lock_guard l(mds_lock);
  mds_rank->dump_status(f);
  f->dump_unsigned("mdsmap_epoch", mdsmap->get_epoch());
  f->dump_unsigned("osdmap_epoch", mds_rank->get_osd_epoch());
  f->dump_unsigned("osdmap_epoch_barrier", mds_rank->get_osd_epoch_barrier());
  f->dump_unsigned("osdmap_epoch", 0);
  f->dump_unsigned("osdmap_epoch_barrier", 0);
  f->dump_float("uptime", get_uptime().count());
  f->close_section(); // status
// Register every MDS admin-socket ("ceph daemon mds.x ...") command with the
// global AdminSocket, all routed through a single MDSSocketHook.
// NOTE(review): excerpt is incomplete — the `int r;` declaration, the
// `ceph_assert(r == 0)` checks after each registration, and the asok_hook
// argument on some calls are missing from the visible text.
void MDSDaemon::set_up_admin_socket()
  AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
  // Must not double-register the hook.
  ceph_assert(asok_hook == nullptr);
  asok_hook = new MDSSocketHook(this);
  r = admin_socket->register_command("status", asok_hook,
				     "high-level status of MDS");
  r = admin_socket->register_command("dump_ops_in_flight", asok_hook,
				     "show the ops currently in flight");
  r = admin_socket->register_command("ops", asok_hook,
				     "show the ops currently in flight");
  r = admin_socket->register_command("dump_blocked_ops",
				     "show the blocked ops currently in flight");
  r = admin_socket->register_command("dump_historic_ops",
  r = admin_socket->register_command("dump_historic_ops_by_duration",
				     "show recent ops, sorted by op duration");
  r = admin_socket->register_command("scrub_path name=path,type=CephString "
				     "name=scrubops,type=CephChoices,"
				     "strings=force|recursive|repair,n=N,req=false "
				     "name=tag,type=CephString,req=false",
				     "scrub an inode and output results");
  // NOTE(review): "scrub and inode" below is a typo in the (runtime) help
  // string; left untouched here since changing it alters behavior.
  r = admin_socket->register_command("scrub start "
				     "name=path,type=CephString "
				     "name=scrubops,type=CephChoices,strings=force|recursive|repair,n=N,req=false "
				     "name=tag,type=CephString,req=false",
				     "scrub and inode and output results");
  r = admin_socket->register_command("scrub abort",
				     "Abort in progress scrub operations(s)");
  r = admin_socket->register_command("scrub pause",
				     "Pause in progress scrub operations(s)");
  r = admin_socket->register_command("scrub resume",
				     "Resume paused scrub operations(s)");
  r = admin_socket->register_command("scrub status",
				     "Status of scrub operations(s)");
  r = admin_socket->register_command("tag path name=path,type=CephString"
				     " name=tag,type=CephString",
				     "Apply scrub tag recursively");
  r = admin_socket->register_command("flush_path name=path,type=CephString",
				     "flush an inode (and its dirfrags)");
  r = admin_socket->register_command("export dir "
				     "name=path,type=CephString "
				     "name=rank,type=CephInt",
				     "migrate a subtree to named MDS");
  r = admin_socket->register_command("dump cache "
				     "name=path,type=CephString,req=false "
				     "name=timeout,type=CephInt,range=0,req=false",
				     "dump metadata cache (optionally to a file)");
  r = admin_socket->register_command("cache drop "
				     "name=timeout,type=CephInt,range=0,req=false",
				     "trim cache and optionally request client to release all caps and flush the journal");
  r = admin_socket->register_command("cache status",
				     "show cache status");
  r = admin_socket->register_command("dump tree "
				     "name=root,type=CephString,req=true "
				     "name=depth,type=CephInt,req=false ",
				     "dump metadata cache for subtree");
  r = admin_socket->register_command("dump loads",
				     "dump metadata loads");
  r = admin_socket->register_command("dump snaps name=server,type=CephChoices,strings=--server,req=false",
  r = admin_socket->register_command("session ls "
				     "name=cap_dump,type=CephBool,req=false "
				     "name=filters,type=CephString,n=N,req=false ",
				     "List client sessions based on a filter");
  // "client ls" is an alias for "session ls".
  r = admin_socket->register_command("client ls "
				     "name=cap_dump,type=CephBool,req=false "
				     "name=filters,type=CephString,n=N,req=false ",
				     "List client sessions based on a filter");
  r = admin_socket->register_command("session evict name=filters,type=CephString,n=N,req=false",
				     "Evict client session(s) based on a filter");
  // "client evict" is an alias for "session evict".
  r = admin_socket->register_command("client evict name=filters,type=CephString,n=N,req=false",
				     "Evict client session(s) based on a filter");
  r = admin_socket->register_command("session kill name=client_id,type=CephString",
				     "Evict a client session by id");
  r = admin_socket->register_command("session ls name=cap_dump,type=CephBool,req=false",
				     "Enumerate connected CephFS clients");
  r = admin_socket->register_command("session config "
				     "name=client_id,type=CephInt,req=true "
				     "name=option,type=CephString,req=true "
				     "name=value,type=CephString,req=false ",
				     "Config a CephFS client session");
  // "client config" is an alias for "session config".
  r = admin_socket->register_command("client config "
				     "name=client_id,type=CephInt,req=true "
				     "name=option,type=CephString,req=true "
				     "name=value,type=CephString,req=false ",
				     "Config a CephFS client session");
  r = admin_socket->register_command("damage ls",
				     "List detected metadata damage");
  r = admin_socket->register_command("damage rm "
				     "name=damage_id,type=CephInt",
				     "Remove a damage table entry");
  r = admin_socket->register_command("osdmap barrier name=target_epoch,type=CephInt",
				     "Wait until the MDS has this OSD map epoch");
  r = admin_socket->register_command("flush journal",
				     "Flush the journal to the backing store");
  r = admin_socket->register_command("force_readonly",
				     "Force MDS to read-only mode");
  r = admin_socket->register_command("get subtrees",
				     "Return the subtree map");
  r = admin_socket->register_command("dirfrag split "
				     "name=path,type=CephString,req=true "
				     "name=frag,type=CephString,req=true "
				     "name=bits,type=CephInt,req=true ",
				     "Fragment directory by path");
  r = admin_socket->register_command("dirfrag merge "
				     "name=path,type=CephString,req=true "
				     "name=frag,type=CephString,req=true",
				     "De-fragment directory by path");
  r = admin_socket->register_command("dirfrag ls "
				     "name=path,type=CephString,req=true",
				     "List fragments in directory");
  r = admin_socket->register_command("openfiles ls",
				     "List the opening files and their caps");
  r = admin_socket->register_command("dump inode "
				     "name=number,type=CephInt,req=true",
				     "dump inode by inode number");
  r = admin_socket->register_command("exit",
				     "Terminate this MDS");
  r = admin_socket->register_command("respawn",
  r = admin_socket->register_command(
    "name=heapcmd,type=CephChoices,strings=" \
    "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
    "name=value,type=CephString,req=false",
    "show heap usage info (available only if compiled with tcmalloc)");
  r = admin_socket->register_command(
    "name=arg,type=CephChoices,strings=status|flush",
    "run cpu profiling on daemon");
// Unregister all asok commands routed through asok_hook.
// NOTE(review): excerpt is incomplete — deletion/reset of asok_hook is
// missing from the visible text.
void MDSDaemon::clean_up_admin_socket()
  g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
// Daemon startup: register dispatchers, initialize and authenticate the
// MonClient, obtain rotating service keys, subscribe to the mdsmap, set up
// the admin socket, and start the beacon. Returns 0 on success or a negative
// CEPHFS_* error.
// NOTE(review): excerpt is incomplete — platform #ifdef framing, monc->init(),
// several error-path returns and closing braces are missing from the visible
// text.
int MDSDaemon::init()
  // Some file related flags and types are stubbed on Windows. In order to avoid
  // incorrect behavior, we're going to prevent the MDS from running on Windows
  // until those limitations are addressed. MDS clients, however, are allowed
  // to run on Windows.
  derr << "The Ceph MDS does not support running on Windows at the moment."
  return -CEPHFS_ENOSYS;
  // Debug aid: struct sizes for the main metadata-cache types.
  dout(10) << "Dumping misc struct sizes:" << dendl;
  dout(10) << sizeof(MDSCacheObject) << "\tMDSCacheObject" << dendl;
  dout(10) << sizeof(CInode) << "\tCInode" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(CInode::mempool_inode) << "\tinode" << dendl;
  dout(10) << sizeof(CInode::mempool_old_inode) << "\told_inode" << dendl;
  dout(10) << sizeof(nest_info_t) << "\tnest_info_t" << dendl;
  dout(10) << sizeof(frag_info_t) << "\tfrag_info_t" << dendl;
  dout(10) << sizeof(SimpleLock) << "\tSimpleLock" << dendl;
  dout(10) << sizeof(ScatterLock) << "\tScatterLock" << dendl;
  dout(10) << sizeof(CDentry) << "\tCDentry" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(SimpleLock) << "\tSimpleLock" << dendl;
  dout(10) << sizeof(CDir) << "\tCDir" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(fnode_t) << "\tfnode_t" << dendl;
  dout(10) << sizeof(nest_info_t) << "\tnest_info_t" << dendl;
  dout(10) << sizeof(frag_info_t) << "\tfrag_info_t" << dendl;
  dout(10) << sizeof(Capability) << "\tCapability" << dendl;
  dout(10) << sizeof(xlist<void*>::item) << "\txlist<>::item" << dendl;
  // Dispatch order: beacon first, then the daemon itself.
  messenger->add_dispatcher_tail(&beacon);
  messenger->add_dispatcher_tail(this);
  monc->set_messenger(messenger);
  monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD |
                      CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_MGR);
  derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl;
  // The MonClient is both our auth client and server-side verifier.
  messenger->set_auth_client(monc);
  messenger->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);
  // tell monc about log_client so it will know about mon session resets
  monc->set_log_client(&log_client);
  r = monc->authenticate();
  derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
  // Retry obtaining rotating service keys until the configured attempt limit.
  int rotating_auth_attempts = 0;
  auto rotating_auth_timeout =
    g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
  while (monc->wait_auth_rotating(rotating_auth_timeout) < 0) {
    if (++rotating_auth_attempts <= g_conf()->max_rotating_auth_attempts) {
      derr << "unable to obtain rotating service keys; retrying" << dendl;
    derr << "ERROR: failed to refresh rotating keys, "
         << "maximum retry time reached." << dendl;
    std::lock_guard locker{mds_lock};
    return -CEPHFS_ETIMEDOUT;
  // Bail out if something already asked us to shut down while bootstrapping.
  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
    dout(4) << __func__ << ": terminated already, dropping out" << dendl;
  monc->sub_want("mdsmap", 0, 0);
  // Set up admin socket before taking mds_lock, so that ordering
  // is consistent (later we take mds_lock within asok callbacks)
  set_up_admin_socket();
  std::lock_guard locker{mds_lock};
  if (beacon.get_want_state() == MDSMap::STATE_DNE) {
    suicide(); // we could do something more graceful here
    dout(4) << __func__ << ": terminated already, dropping out" << dendl;
  // Start beaconing to the monitors; we have no rank yet.
  beacon.init(*mdsmap);
  messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));
// (Re)arm the periodic tick timer: cancel any pending event and schedule a
// new one mds_tick_interval seconds out.
// NOTE(review): excerpt is incomplete — the lambda body (the tick() call) and
// closing parentheses are missing from the visible text.
void MDSDaemon::reset_tick()
  // cancel event if it is already scheduled
  if (tick_event) timer.cancel_event(tick_event);
  tick_event = timer.add_event_after(
    g_conf()->mds_tick_interval,
    new LambdaContext([this](int) {
      // Timer callbacks run with mds_lock held.
      ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
// Periodic tick, driven by reset_tick()'s timer event.
// NOTE(review): the body is almost entirely missing from this excerpt.
void MDSDaemon::tick()
  // Call through to subsystems' tick functions
// Handle an MCommand ("ceph tell mds.x ...") arriving over the wire: validate
// the sending session and its `tell` capability, parse the JSON command, and
// either queue it on the admin socket or reply immediately with an error.
// NOTE(review): excerpt is incomplete — `int r` / `cmdmap` declarations,
// if/else framing and several closing braces are missing from the visible
// text.
void MDSDaemon::handle_command(const cref_t<MCommand> &m)
  // The Session (if any) rides on the connection's priv pointer.
  auto priv = m->get_connection()->get_priv();
  auto session = static_cast<Session *>(priv.get());
  ceph_assert(session != NULL);
  CachedStackStringStream css;
  // If someone is using a closed session for sending commands (e.g.
  // the ceph CLI) then we should feel free to clean up this connection
  // as soon as we've sent them a response.
  const bool live_session =
    session->get_state_seq() > 0 &&
    mds_rank->sessionmap.get_session(session->info.inst.name);
  // This session only existed to issue commands, so terminate it
  // as soon as we can.
  ceph_assert(session->is_closed());
  session->get_connection()->mark_disposable();
  if (!session->auth_caps.allow_all()) {
    << ": received command from client without `tell` capability: "
    << *m->get_connection()->peer_addrs << dendl;
    ss << "permission denied";
  } else if (m->cmd.empty()) {
    ss << "no command given";
  } else if (!TOPNSPC::common::cmdmap_from_json(m->cmd, &cmdmap, ss)) {
  // Valid command: let the admin socket machinery execute it asynchronously.
  cct->get_admin_socket()->queue_tell_command(m);
  // Error path: send an immediate MCommandReply.
  auto reply = make_message<MCommandReply>(r, ss.str());
  reply->set_tid(m->get_tid());
  reply->set_data(outbl);
  m->get_connection()->send_message2(reply);
// Process a new MDSMap from the monitors: discard stale epochs, decode the
// map, verify compatset, detect our own state transition (added, removed,
// standby, assigned a rank), mark down vanished peers, and create or notify
// the MDSRankDispatcher as appropriate.
// NOTE(review): excerpt is incomplete — returns after the early-discard and
// respawn paths, the oldmap swap, and many closing braces are missing from
// the visible text.
void MDSDaemon::handle_mds_map(const cref_t<MMDSMap> &m)
  version_t epoch = m->get_epoch();
  // Ignore maps we have already seen.
  if (epoch <= mdsmap->get_epoch()) {
    dout(5) << "handle_mds_map old map epoch " << epoch << " <= "
	    << mdsmap->get_epoch() << ", discarding" << dendl;
  dout(1) << "Updating MDS map to version " << epoch << " from " << m->get_source() << dendl;
  // keep old map, for a moment
  std::unique_ptr<MDSMap> oldmap;
  // decode and process
  mdsmap.reset(new MDSMap);
  mdsmap->decode(m->get_encoded());
  monc->sub_got("mdsmap", mdsmap->get_epoch());
  // Refuse to operate under a map our feature set can't write.
  CompatSet mdsmap_compat(MDSMap::get_compat_set_all());
  dout(10) << " my compat " << mdsmap_compat << dendl;
  dout(10) << " mdsmap compat " << mdsmap->compat << dendl;
  if (!mdsmap_compat.writeable(mdsmap->compat)) {
    dout(0) << "handle_mds_map mdsmap compatset " << mdsmap->compat
	    << " not writeable with daemon features " << mdsmap_compat
	    << ", killing myself" << dendl;
  // Calculate my effective rank (either my owned rank or the rank I'm following if STATE_STANDBY_REPLAY
  const auto addrs = messenger->get_myaddrs();
  const auto myid = monc->get_global_id();
  const auto mygid = mds_gid_t(myid);
  const auto whoami = mdsmap->get_rank_gid(mygid);
  const auto old_state = oldmap->get_state_gid(mygid);
  const auto new_state = mdsmap->get_state_gid(mygid);
  const auto incarnation = mdsmap->get_inc_gid(mygid);
  dout(10) << "my gid is " << myid << dendl;
  dout(10) << "map says I am mds." << whoami << "." << incarnation
	   << " state " << ceph_mds_state_name(new_state) << dendl;
  dout(10) << "msgr says I am " << addrs << dendl;
  // If we're removed from the MDSMap, stop all processing.
  using DS = MDSMap::DaemonState;
  if (old_state != DS::STATE_NULL && new_state == DS::STATE_NULL) {
    const auto& oldinfo = oldmap->get_info_gid(mygid);
    dout(1) << "Map removed me " << oldinfo
	    << " from cluster; respawning! See cluster/monitor logs for details." << dendl;
  if (old_state == DS::STATE_NULL && new_state != DS::STATE_NULL) {
    /* The MDS has been added to the FSMap, now we can init the MgrClient */
    messenger->add_dispatcher_tail(&mgrc);
    monc->sub_want("mgrmap", 0, 0);
    monc->renew_subs(); /* MgrMap receipt drives connection to ceph-mgr */
  // mark down any failed peers
  for (const auto& [gid, info] : oldmap->get_mds_info()) {
    if (mdsmap->get_mds_info().count(gid) == 0) {
      dout(10) << " peer mds gid " << gid << " removed from map" << dendl;
      messenger->mark_down_addrs(info.addrs);
  if (whoami == MDS_RANK_NONE) {
    // We do not hold a rank:
    dout(10) << __func__ << ": handling map in rankless mode" << dendl;
    if (new_state == DS::STATE_STANDBY) {
      /* Note: STATE_BOOT is never an actual state in the FSMap. The Monitors
       * generally mark a new MDS as STANDBY (although it's possible to
       * immediately be assigned a rank).
       */
      if (old_state == DS::STATE_NULL) {
	dout(1) << "Monitors have assigned me to become a standby." << dendl;
	beacon.set_want_state(*mdsmap, new_state);
      } else if (old_state == DS::STATE_STANDBY) {
	dout(5) << "I am still standby" << dendl;
    } else if (new_state == DS::STATE_NULL) {
      /* We are not in the MDSMap yet! Keep waiting: */
      ceph_assert(beacon.get_want_state() == DS::STATE_BOOT);
      dout(10) << "not in map yet" << dendl;
      /* We moved to standby somehow from another state */
      ceph_abort("invalid transition to standby");
    // Did we already hold a different rank? MDSMonitor shouldn't try
    // to change that out from under me!
    if (mds_rank && whoami != mds_rank->get_nodeid()) {
      derr << "Invalid rank transition " << mds_rank->get_nodeid() << "->"
    // Did I previously not hold a rank? Initialize!
    if (mds_rank == NULL) {
      mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
				       timer, beacon, mdsmap, messenger, monc, &mgrc,
				       new LambdaContext([this](int r){respawn();}),
				       new LambdaContext([this](int r){suicide();}),
      dout(10) << __func__ << ": initializing MDS rank "
	       << mds_rank->get_nodeid() << dendl;
    // MDSRank is active: let him process the map, we have no say.
    dout(10) << __func__ << ": handling map as rank "
	     << mds_rank->get_nodeid() << dendl;
    mds_rank->handle_mds_map(m, *oldmap);
  // Let the beacon know the new map so want-state stays coherent.
  beacon.notify_mdsmap(*mdsmap);
// SIGINT/SIGTERM handler: log the signal and shut down under mds_lock.
// NOTE(review): excerpt is incomplete — the shutdown call after taking the
// lock is missing from the visible text.
void MDSDaemon::handle_signal(int signum)
  ceph_assert(signum == SIGINT || signum == SIGTERM);
  derr << "*** got signal " << sig_str(signum) << " ***" << dendl;
  std::lock_guard l(mds_lock);
// Orderly self-termination: must be called with mds_lock held and at most
// once. Cancels the tick timer, tears down the admin socket, tells the
// monitors we are going DNE, shuts down mgr client, rank and messenger.
// NOTE(review): excerpt is incomplete — the `stopping = true` assignment,
// mgrc.shutdown(), and several conditional braces are missing from the
// visible text.
void MDSDaemon::suicide()
  ceph_assert(ceph_mutex_is_locked(mds_lock));
  // make sure we don't suicide twice
  ceph_assert(stopping == false);
  dout(1) << "suicide! Wanted state "
	  << ceph_mds_state_name(beacon.get_want_state()) << dendl;
  timer.cancel_event(tick_event);
  clean_up_admin_socket();
  // Notify the Monitors (MDSMonitor) that we're dying, so that it doesn't have
  // to wait for us to go laggy. Only do this if we're actually in the MDSMap,
  // because otherwise the MDSMonitor will drop our message.
  beacon.set_want_state(*mdsmap, MDSMap::STATE_DNE);
  if (!mdsmap->is_dne_gid(mds_gid_t(monc->get_global_id()))) {
    beacon.send_and_wait(1);
  if (mgrc.is_initialized())
  mds_rank->shutdown();
  messenger->shutdown();
// Replace this process with a fresh copy of itself via execv(), preferring
// /proc/self/exe so the original binary is re-run even if the file was
// replaced on disk. Never returns on success; callers rely on that.
// NOTE(review): excerpt is incomplete — the valgrind-exit body, the getcwd
// buffer declaration and several braces are missing from the visible text.
void MDSDaemon::respawn()
  // --- WARNING TO FUTURE COPY/PASTERS ---
  // You must also add a call like
  //
  //   ceph_pthread_setname(pthread_self(), "ceph-mds");
  //
  // to main() so that /proc/$pid/stat field 2 contains "(ceph-mds)"
  // instead of "(exe)", so that killall (and log rotation) will work.
  dout(1) << "respawn!" << dendl;
  /* Dump recent in case the MDS was stuck doing something which caused it to
   * be removed from the MDSMap leading to respawn. */
  g_ceph_context->_log->dump_recent();
  /* valgrind can't handle execve; just exit and let QA infra restart */
  if (g_conf().get_val<bool>("mds_valgrind_exit")) {
  // Rebuild a NULL-terminated argv from the original one.
  char *new_argv[orig_argc+1];
  dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
  for (int i=0; i<orig_argc; i++) {
    new_argv[i] = (char *)orig_argv[i];
    dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
  new_argv[orig_argc] = NULL;
  /* Determine the path to our executable, test if Linux /proc/self/exe exists.
   * This allows us to exec the same executable even if it has since been
   */
  char exe_path[PATH_MAX] = "";
  if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
    dout(1) << "respawning with exe " << exe_path << dendl;
    // exec the symlink itself, not its (possibly deleted) target path
    strcpy(exe_path, PROCPREFIX "/proc/self/exe");
  /* Print CWD for the user's interest */
  char *cwd = getcwd(buf, sizeof(buf));
  dout(1) << " cwd " << cwd << dendl;
  /* Fall back to a best-effort: just running in our CWD */
  strncpy(exe_path, orig_argv[0], PATH_MAX-1);
  dout(1) << " exe_path " << exe_path << dendl;
  // Restore default signal dispositions before exec'ing the replacement.
  unblock_all_signals(NULL);
  execv(exe_path, new_argv);
  // Only reached if execv failed.
  dout(0) << "respawn execv " << orig_argv[0]
	  << " failed with " << cpp_strerror(errno) << dendl;
  // We have to assert out here, because suicide() returns, and callers
  // to respawn expect it never to return.
// Messenger dispatch entry: under mds_lock, drop messages while shutting
// down, try daemon-level ("core") handling first, then hand the message to
// the rank dispatcher.
// NOTE(review): excerpt is incomplete — the returns for the stopping and
// handled_core paths are missing from the visible text.
bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
  std::lock_guard l(mds_lock);
  // Drop out early if shutting down
  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
    dout(10) << " stopping, discarding " << *m << dendl;
  // First see if it's a daemon message
  const bool handled_core = handle_core_message(m);
  // Not core, try it as a rank message
  return mds_rank->ms_dispatch(m);
// high priority messages we always process
// Guard macro: drop (with a log line) any message whose peer entity type is
// not in the allowed `peers` bitmask. Expects `m` in scope at the use site.
// NOTE(review): excerpt is incomplete — the macro's closing lines (return and
// brace) are missing from the visible text.
#define ALLOW_MESSAGES_FROM(peers) \
  if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
    dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" \
	    << m->get_connection()->get_peer_type() << " allowing=" \
	    << #peers << " message=" << *m << dendl; \
// Handle daemon-level messages (maps, snaps, commands) regardless of rank
// state; each case first filters by allowed peer type via
// ALLOW_MESSAGES_FROM.
// NOTE(review): excerpt is incomplete — `break`/`return` statements, the
// MSG_COMMAND case label, and the default case are missing from the visible
// text.
bool MDSDaemon::handle_core_message(const cref_t<Message> &m)
  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
  // MDS maps may come from either the monitors or peer MDSs.
  case CEPH_MSG_MDS_MAP:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
    handle_mds_map(ref_cast<MMDSMap>(m));
  case MSG_REMOVE_SNAPS:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
    mds_rank->snapserver->handle_remove_snaps(ref_cast<MRemoveSnaps>(m));
    handle_command(ref_cast<MCommand>(m));
  case CEPH_MSG_OSD_MAP:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
    mds_rank->handle_osd_map();
  case MSG_MON_COMMAND:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
    // Legacy pre-asok `mds tell` path is no longer supported.
    clog->warn() << "dropping `mds tell` command from legacy monitor";
// Messenger connect callback.
// NOTE(review): the body is missing from this excerpt — presumably empty or
// trivial; confirm against the full source.
void MDSDaemon::ms_handle_connect(Connection *con)
// Messenger reset callback: only client connections are of interest. If the
// connection carries a closed Session, detach it from the connection.
// NOTE(review): excerpt is incomplete — return statements, the mark_down
// call, and closing braces are missing from the visible text.
bool MDSDaemon::ms_handle_reset(Connection *con)
  // Non-client peers are handled elsewhere (monc/objecter reconnect).
  if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
  std::lock_guard l(mds_lock);
  dout(5) << "ms_handle_reset on " << con->get_peer_socket_addr() << dendl;
  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
  auto priv = con->get_priv();
  if (auto session = static_cast<Session *>(priv.get()); session) {
    if (session->is_closed()) {
      dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
      // Drop the Session ref held by the connection.
      con->set_priv(nullptr);
// Messenger remote-reset callback; mirrors ms_handle_reset for the case where
// the peer reset the session.
// NOTE(review): excerpt is incomplete — return statements, the mark_down
// call, and closing braces are missing from the visible text.
void MDSDaemon::ms_handle_remote_reset(Connection *con)
  // Non-client peers are handled elsewhere.
  if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
  std::lock_guard l(mds_lock);
  dout(5) << "ms_handle_remote_reset on " << con->get_peer_socket_addr() << dendl;
  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
  auto priv = con->get_priv();
  if (auto session = static_cast<Session *>(priv.get()); session) {
    if (session->is_closed()) {
      dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
      // Drop the Session ref held by the connection.
      con->set_priv(nullptr);
// Messenger connection-refused callback; intentionally a no-op.
// NOTE(review): the return statement is missing from this excerpt.
bool MDSDaemon::ms_handle_refused(Connection *con)
  // do nothing for now
// Translate the authenticated peer's AuthCapsInfo into MDSAuthCaps: honor
// allow_all directly, otherwise decode the cap string from the buffer and
// parse it. Returns true on success (visible paths), false on decode/parse
// failure.
// NOTE(review): excerpt is incomplete — `caps.clear()`, the `try` keyword,
// returns and closing braces are missing from the visible text.
bool MDSDaemon::parse_caps(const AuthCapsInfo& info, MDSAuthCaps& caps)
  if (info.allow_all) {
    caps.set_allow_all();
  auto it = info.caps.begin();
  string auth_cap_str;
  decode(auth_cap_str, it);
  } catch (const buffer::error& e) {
    // Buffer did not contain a decodable string.
    dout(1) << __func__ << ": cannot decode auth caps buffer of length " << info.caps.length() << dendl;
  dout(10) << __func__ << ": parsing auth_cap_str='" << auth_cap_str << "'" << dendl;
  CachedStackStringStream cs;
  if (caps.parse(g_ceph_context, auth_cap_str, cs.get())) {
  dout(1) << __func__ << ": auth cap parse error: " << cs->strv() << " parsing '" << auth_cap_str << "'" << dendl;
// Messenger authentication callback: parse the peer's caps; 0 = accept,
// -1 = reject. Runs without mds_lock (see original note).
// NOTE(review): the declaration of the local `caps` variable is missing from
// this excerpt.
int MDSDaemon::ms_handle_authentication(Connection *con)
  /* N.B. without mds_lock! */
  return parse_caps(con->get_peer_caps_info(), caps) ? 0 : -1;
// Accept callback: attach a Session to the new connection — reuse an existing
// session for a reconnecting client (when we hold a rank), otherwise create a
// fresh one. Also re-parses caps and flushes any messages queued before the
// session was open.
// NOTE(review): excerpt is incomplete — the `Session *s` declaration,
// several if/else frames and closing braces are missing from the visible
// text.
void MDSDaemon::ms_handle_accept(Connection *con)
  entity_name_t n(con->get_peer_type(), con->get_peer_global_id());
  std::lock_guard l(mds_lock);
  // We allow connections and assign Session instances to connections
  // even if we have not been assigned a rank, because clients with
  // "allow *" are allowed to connect and do 'tell' operations before
  // If we do hold a rank, see if this is an existing client establishing
  // a new connection, rather than a new client
  s = mds_rank->sessionmap.get_session(n);
  // Wire up a Session* to this connection
  // It doesn't go into a SessionMap instance until it sends an explicit
  // request to open a session (initial state of Session is `closed`)
  s = new Session(con);
  dout(10) << " new session " << s << " for " << s->info.inst
	   << " con " << con << dendl;
  // `false` — connection adopts the new Session's initial ref.
  con->set_priv(RefCountedPtr{s, false});
  mds_rank->kick_waiters_for_any_client_connection();
  dout(10) << " existing session " << s << " for " << s->info.inst
	   << " existing con " << s->get_connection()
	   << ", new/authorizing con " << con << dendl;
  // Existing session: connection takes an additional reference.
  con->set_priv(RefCountedPtr{s});
  // Refresh the session's caps from the (re)authenticated connection.
  parse_caps(con->get_peer_caps_info(), s->auth_caps);
  dout(10) << "ms_handle_accept " << con->get_peer_socket_addr() << " con " << con << " session " << s << dendl;
  if (s->get_connection() != con) {
    dout(10) << " session connection " << s->get_connection()
	     << " -> " << con << dendl;
    s->set_connection(con);
    // send out any queued messages
    while (!s->preopen_out_queue.empty()) {
      con->send_message2(s->preopen_out_queue.front());
      s->preopen_out_queue.pop_front();
1141 bool MDSDaemon::is_clean_shutdown()
1144 return mds_rank
->is_stopped();