1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "include/compat.h"
18 #include "include/types.h"
19 #include "include/str_list.h"
21 #include "common/Clock.h"
22 #include "common/HeartbeatMap.h"
23 #include "common/Timer.h"
24 #include "common/backport14.h"
25 #include "common/ceph_argparse.h"
26 #include "common/config.h"
27 #include "common/entity_name.h"
28 #include "common/errno.h"
29 #include "common/perf_counters.h"
30 #include "common/signal.h"
31 #include "common/version.h"
33 #include "global/signal_handler.h"
35 #include "msg/Messenger.h"
36 #include "mon/MonClient.h"
38 #include "osdc/Objecter.h"
42 #include "MDSDaemon.h"
46 #include "SnapServer.h"
47 #include "SnapClient.h"
49 #include "events/ESession.h"
50 #include "events/ESubtreeMap.h"
52 #include "messages/MMDSMap.h"
54 #include "messages/MGenericMessage.h"
56 #include "messages/MMonCommand.h"
57 #include "messages/MCommand.h"
58 #include "messages/MCommandReply.h"
60 #include "auth/AuthAuthorizeHandler.h"
61 #include "auth/RotatingKeyRing.h"
62 #include "auth/KeyRing.h"
64 #include "perfglue/cpu_profiler.h"
65 #include "perfglue/heap_profiler.h"
67 #define dout_context g_ceph_context
68 #define dout_subsys ceph_subsys_mds
70 #define dout_prefix *_dout << "mds." << name << ' '
// Constructor. From the visible initializers: sets up mds_lock, the timer,
// the beacon, the cluster and service authorizer registries, the log client
// and the start timestamp; the visible body statements then create the
// cluster log channel and hand the messenger to the MonClient.
// NOTE(review): several initializers and body lines of this definition are
// not visible in this excerpt.
73 MDSDaemon::MDSDaemon(boost::string_view n
, Messenger
*m
, MonClient
*mc
) :
75 mds_lock("MDSDaemon::mds_lock"),
77 timer(m
->cct
, mds_lock
),
78 beacon(m
->cct
, mc
, n
),
// Cluster registry: uses auth_cluster_required when auth_supported is empty,
// otherwise auth_supported.
79 authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(m
->cct
,
80 m
->cct
->_conf
->auth_supported
.empty() ?
81 m
->cct
->_conf
->auth_cluster_required
:
82 m
->cct
->_conf
->auth_supported
)),
// Service registry: uses auth_service_required when auth_supported is empty,
// otherwise auth_supported.
83 authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(m
->cct
,
84 m
->cct
->_conf
->auth_supported
.empty() ?
85 m
->cct
->_conf
->auth_service_required
:
86 m
->cct
->_conf
->auth_supported
)),
91 log_client(m
->cct
, messenger
, &mc
->monmap
, LogClient::NO_FLAGS
),
94 starttime(mono_clock::now())
99 clog
= log_client
.create_channel();
101 monc
->set_messenger(messenger
);
// Destructor. Takes mds_lock for the duration of the teardown and frees the
// two authorizer registries allocated with new in the constructor.
// NOTE(review): other teardown statements are not visible in this excerpt.
106 MDSDaemon::~MDSDaemon() {
107 Mutex::Locker
lock(mds_lock
);
114 delete authorize_handler_service_registry
;
115 delete authorize_handler_cluster_registry
;
// Admin socket hook bound to one MDSDaemon. call() forwards the invoked
// command, its argument map and the requested format to
// MDSDaemon::asok_command().
// NOTE(review): the member declaration and the handling of `out` / the
// return value are not visible in this excerpt.
118 class MDSSocketHook
: public AdminSocketHook
{
121 explicit MDSSocketHook(MDSDaemon
*m
) : mds(m
) {}
122 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
123 bufferlist
& out
) override
{
125 bool r
= mds
->asok_command(command
, cmdmap
, format
, ss
);
// Handles one admin socket command.
// Logs the start and completion of the command at debug level 1, creates a
// Formatter for the requested format (passing "json-pretty" for the two
// remaining Formatter::create arguments), and special-cases "status".
// When mds_rank is NULL the error string "mds_not_active" is dumped;
// the visible delegation path is mds_rank->handle_asok_command().
// NOTE(review): the remaining parameters, branch structure and return
// statement are not visible in this excerpt.
131 bool MDSDaemon::asok_command(string command
, cmdmap_t
& cmdmap
, string format
,
134 dout(1) << "asok_command: " << command
<< " (starting...)" << dendl
;
136 Formatter
*f
= Formatter::create(format
, "json-pretty", "json-pretty");
137 bool handled
= false;
138 if (command
== "status") {
142 if (mds_rank
== NULL
) {
143 dout(1) << "Can't run that command on an inactive MDS!" << dendl
;
144 f
->dump_string("error", "mds_not_active");
146 handled
= mds_rank
->handle_asok_command(command
, cmdmap
, f
, ss
);
152 dout(1) << "asok_command: " << command
<< " (complete)" << dendl
;
// Writes a "status" object section to the formatter.
// Fields emitted by the visible lines: cluster_fsid, whoami (the rank's node
// id or MDS_RANK_NONE), id (the MonClient global id), want_state, state,
// the rank's own status (dumped while holding mds_lock), mdsmap_epoch,
// osdmap_epoch / osdmap_epoch_barrier (from the rank, or 0) and uptime.
// NOTE(review): the conditions that select between the rank and non-rank
// alternatives are not visible in this excerpt.
157 void MDSDaemon::dump_status(Formatter
*f
)
159 f
->open_object_section("status");
160 f
->dump_stream("cluster_fsid") << monc
->get_fsid();
162 f
->dump_int("whoami", mds_rank
->get_nodeid());
164 f
->dump_int("whoami", MDS_RANK_NONE
);
167 f
->dump_int("id", monc
->get_global_id());
168 f
->dump_string("want_state", ceph_mds_state_name(beacon
.get_want_state()));
// Current state is looked up in the mdsmap by this daemon's global id.
169 f
->dump_string("state", ceph_mds_state_name(mdsmap
->get_state_gid(mds_gid_t(
170 monc
->get_global_id()))));
172 Mutex::Locker
l(mds_lock
);
173 mds_rank
->dump_status(f
);
176 f
->dump_unsigned("mdsmap_epoch", mdsmap
->get_epoch());
178 f
->dump_unsigned("osdmap_epoch", mds_rank
->get_osd_epoch());
179 f
->dump_unsigned("osdmap_epoch_barrier", mds_rank
->get_osd_epoch_barrier());
181 f
->dump_unsigned("osdmap_epoch", 0);
182 f
->dump_unsigned("osdmap_epoch_barrier", 0);
185 f
->dump_float("uptime", get_uptime().count());
187 f
->close_section(); // status
// Registers the MDS admin socket commands.
// Asserts that no hook exists yet, allocates a new MDSSocketHook for this
// daemon, then registers each command on the context's AdminSocket, storing
// the result of every registration in r.
// NOTE(review): the declaration of r, the checks performed on it and some
// argument lines of the registrations are not visible in this excerpt.
190 void MDSDaemon::set_up_admin_socket()
193 AdminSocket
*admin_socket
= g_ceph_context
->get_admin_socket();
194 assert(asok_hook
== nullptr);
195 asok_hook
= new MDSSocketHook(this);
// Status and op-tracker commands.
196 r
= admin_socket
->register_command("status", "status", asok_hook
,
197 "high-level status of MDS");
199 r
= admin_socket
->register_command("dump_ops_in_flight",
200 "dump_ops_in_flight", asok_hook
,
201 "show the ops currently in flight");
203 r
= admin_socket
->register_command("ops",
205 "show the ops currently in flight");
207 r
= admin_socket
->register_command("dump_blocked_ops", "dump_blocked_ops",
209 "show the blocked ops currently in flight");
211 r
= admin_socket
->register_command("dump_historic_ops", "dump_historic_ops",
213 "show slowest recent ops");
215 r
= admin_socket
->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
217 "show slowest recent ops, sorted by op duration");
// Scrub, tag and flush commands that take a path argument.
219 r
= admin_socket
->register_command("scrub_path",
220 "scrub_path name=path,type=CephString "
221 "name=scrubops,type=CephChoices,"
222 "strings=force|recursive|repair,n=N,req=false",
224 "scrub an inode and output results");
226 r
= admin_socket
->register_command("tag path",
227 "tag path name=path,type=CephString"
228 " name=tag,type=CephString",
230 "Apply scrub tag recursively");
232 r
= admin_socket
->register_command("flush_path",
233 "flush_path name=path,type=CephString",
235 "flush an inode (and its dirfrags)");
237 r
= admin_socket
->register_command("export dir",
239 "name=path,type=CephString "
240 "name=rank,type=CephInt",
242 "migrate a subtree to named MDS");
// Cache inspection and maintenance commands.
244 r
= admin_socket
->register_command("dump cache",
245 "dump cache name=path,type=CephString,req=false",
247 "dump metadata cache (optionally to a file)");
249 r
= admin_socket
->register_command("cache status",
252 "show cache status");
254 r
= admin_socket
->register_command("cache drop",
255 "cache drop name=timeout,type=CephInt,range=0,req=false",
259 r
= admin_socket
->register_command("dump tree",
261 "name=root,type=CephString,req=true "
262 "name=depth,type=CephInt,req=false ",
264 "dump metadata cache for subtree");
266 r
= admin_socket
->register_command("dump loads",
269 "dump metadata loads");
// Session, osdmap and journal commands.
271 r
= admin_socket
->register_command("session evict",
272 "session evict name=client_id,type=CephString",
274 "Evict a CephFS client");
276 r
= admin_socket
->register_command("osdmap barrier",
277 "osdmap barrier name=target_epoch,type=CephInt",
279 "Wait until the MDS has this OSD map epoch");
281 r
= admin_socket
->register_command("session ls",
284 "Enumerate connected CephFS clients");
286 r
= admin_socket
->register_command("flush journal",
289 "Flush the journal to the backing store");
291 r
= admin_socket
->register_command("force_readonly",
294 "Force MDS to read-only mode");
296 r
= admin_socket
->register_command("get subtrees",
299 "Return the subtree map");
// Directory fragment commands.
301 r
= admin_socket
->register_command("dirfrag split",
303 "name=path,type=CephString,req=true "
304 "name=frag,type=CephString,req=true "
305 "name=bits,type=CephInt,req=true ",
307 "Fragment directory by path");
309 r
= admin_socket
->register_command("dirfrag merge",
311 "name=path,type=CephString,req=true "
312 "name=frag,type=CephString,req=true",
314 "De-fragment directory by path");
316 r
= admin_socket
->register_command("dirfrag ls",
318 "name=path,type=CephString,req=true",
320 "List fragments in directory");
324 void MDSDaemon::clean_up_admin_socket()
326 AdminSocket
*admin_socket
= g_ceph_context
->get_admin_socket();
327 admin_socket
->unregister_command("status");
328 admin_socket
->unregister_command("dump_ops_in_flight");
329 admin_socket
->unregister_command("ops");
330 admin_socket
->unregister_command("dump_blocked_ops");
331 admin_socket
->unregister_command("dump_historic_ops");
332 admin_socket
->unregister_command("dump_historic_ops_by_duration");
333 admin_socket
->unregister_command("scrub_path");
334 admin_socket
->unregister_command("tag path");
335 admin_socket
->unregister_command("flush_path");
336 admin_socket
->unregister_command("export dir");
337 admin_socket
->unregister_command("dump cache");
338 admin_socket
->unregister_command("cache status");
339 admin_socket
->unregister_command("dump tree");
340 admin_socket
->unregister_command("dump loads");
341 admin_socket
->unregister_command("session evict");
342 admin_socket
->unregister_command("osdmap barrier");
343 admin_socket
->unregister_command("session ls");
344 admin_socket
->unregister_command("flush journal");
345 admin_socket
->unregister_command("force_readonly");
346 admin_socket
->unregister_command("get subtrees");
347 admin_socket
->unregister_command("dirfrag split");
348 admin_socket
->unregister_command("dirfrag merge");
349 admin_socket
->unregister_command("dirfrag ls");
// Defines the function-local static KEYS table holding the names of the
// configuration options this daemon tracks.
// NOTE(review): some table entries, the terminator and the return statement
// are not visible in this excerpt; presumably KEYS is what gets returned.
354 const char** MDSDaemon::get_tracked_conf_keys() const
356 static const char* KEYS
[] = {
357 "mds_op_complaint_time", "mds_op_log_threshold",
358 "mds_op_history_size", "mds_op_history_duration",
359 "mds_enable_op_tracker",
364 "clog_to_syslog_facility",
365 "clog_to_syslog_level",
367 "clog_to_graylog_host",
368 "clog_to_graylog_port",
371 "mds_cache_memory_limit",
372 "mds_cache_reservation",
373 "mds_health_cache_threshold",
375 "mds_dump_cache_threshold_formatter",
376 "mds_cache_trim_decay_rate",
377 "mds_dump_cache_threshold_file",
379 "mds_bal_fragment_interval",
382 "mds_max_purge_ops_per_pg",
383 "mds_max_purge_files",
385 "mds_max_export_size",
386 "mds_inject_migrator_session_race",
387 "mds_inject_migrator_message_loss",
390 "mds_cap_revoke_eviction_timeout",
392 "mds_request_load_average_decay_rate",
393 "mds_recall_max_decay_rate",
// Reacts to changed configuration options.
// conf:    the configuration holding the new values.
// changed: the set of option names that changed.
// Visible reactions: op tracker complaint/threshold, history size/duration
// and tracking enable; cluster log reconfiguration; kicking the journal
// submitter when mds_log_pause changed and is now off; and finally
// forwarding the change to mds_rank.
// NOTE(review): the bodies of the two `!initially_locked` branches and any
// guards around the mds_rank accesses are not visible in this excerpt.
399 void MDSDaemon::handle_conf_change(const struct md_config_t
*conf
,
400 const std::set
<std::string
> &changed
)
402 // We may be called within mds_lock (via `tell`) or outwith the
403 // lock (via admin socket `config set`), so handle either case.
404 const bool initially_locked
= mds_lock
.is_locked_by_me();
405 if (!initially_locked
) {
409 if (changed
.count("mds_op_complaint_time") ||
410 changed
.count("mds_op_log_threshold")) {
412 mds_rank
->op_tracker
.set_complaint_and_threshold(conf
->mds_op_complaint_time
,
413 conf
->mds_op_log_threshold
);
416 if (changed
.count("mds_op_history_size") ||
417 changed
.count("mds_op_history_duration")) {
419 mds_rank
->op_tracker
.set_history_size_and_duration(conf
->mds_op_history_size
,
420 conf
->mds_op_history_duration
);
423 if (changed
.count("mds_enable_op_tracker")) {
425 mds_rank
->op_tracker
.set_tracking(conf
->mds_enable_op_tracker
);
// Any cluster-log related option (or host/fsid) triggers a log reconfiguration.
428 if (changed
.count("clog_to_monitors") ||
429 changed
.count("clog_to_syslog") ||
430 changed
.count("clog_to_syslog_level") ||
431 changed
.count("clog_to_syslog_facility") ||
432 changed
.count("clog_to_graylog") ||
433 changed
.count("clog_to_graylog_host") ||
434 changed
.count("clog_to_graylog_port") ||
435 changed
.count("host") ||
436 changed
.count("fsid")) {
438 mds_rank
->update_log_config();
// Only when mds_log_pause changed and is currently false.
442 if (!g_conf
->mds_log_pause
&& changed
.count("mds_log_pause")) {
444 mds_rank
->mdlog
->kick_submitter();
449 mds_rank
->handle_conf_change(conf
, changed
);
452 if (!initially_locked
) {
// Daemon initialization.
// Visible sequence: log the sizes of the main cache structures at debug
// level 10; register the beacon and this daemon as dispatchers; configure
// the MonClient (messenger, wanted key types, log client); authenticate;
// wait for rotating service keys, retrying while the attempt count is within
// max_rotating_auth_attempts; add mgrc as a head dispatcher; subscribe to
// "mdsmap" and "mgrmap"; set up the admin socket and register as a config
// observer; and set the messenger's entity name to MDS_RANK_NONE.
// The beacon's wanted state is checked for DNE twice along the way.
// NOTE(review): error returns, lock handling and several statements are not
// visible in this excerpt.
458 int MDSDaemon::init()
460 dout(10) << sizeof(MDSCacheObject
) << "\tMDSCacheObject" << dendl
;
461 dout(10) << sizeof(CInode
) << "\tCInode" << dendl
;
462 dout(10) << sizeof(elist
<void*>::item
) << "\t elist<>::item *7=" << 7*sizeof(elist
<void*>::item
) << dendl
;
463 dout(10) << sizeof(CInode::mempool_inode
) << "\t inode " << dendl
;
464 dout(10) << sizeof(CInode::mempool_old_inode
) << "\t old_inode " << dendl
;
465 dout(10) << sizeof(nest_info_t
) << "\t nest_info_t " << dendl
;
466 dout(10) << sizeof(frag_info_t
) << "\t frag_info_t " << dendl
;
467 dout(10) << sizeof(SimpleLock
) << "\t SimpleLock *5=" << 5*sizeof(SimpleLock
) << dendl
;
468 dout(10) << sizeof(ScatterLock
) << "\t ScatterLock *3=" << 3*sizeof(ScatterLock
) << dendl
;
469 dout(10) << sizeof(CDentry
) << "\tCDentry" << dendl
;
470 dout(10) << sizeof(elist
<void*>::item
) << "\t elist<>::item" << dendl
;
471 dout(10) << sizeof(SimpleLock
) << "\t SimpleLock" << dendl
;
472 dout(10) << sizeof(CDir
) << "\tCDir " << dendl
;
473 dout(10) << sizeof(elist
<void*>::item
) << "\t elist<>::item *2=" << 2*sizeof(elist
<void*>::item
) << dendl
;
474 dout(10) << sizeof(fnode_t
) << "\t fnode_t " << dendl
;
475 dout(10) << sizeof(nest_info_t
) << "\t nest_info_t *2" << dendl
;
476 dout(10) << sizeof(frag_info_t
) << "\t frag_info_t *2" << dendl
;
477 dout(10) << sizeof(Capability
) << "\tCapability " << dendl
;
478 dout(10) << sizeof(xlist
<void*>::item
) << "\t xlist<>::item *2=" << 2*sizeof(xlist
<void*>::item
) << dendl
;
// Dispatcher registration: beacon first, then this daemon, both at the tail.
480 messenger
->add_dispatcher_tail(&beacon
);
481 messenger
->add_dispatcher_tail(this);
484 monc
->set_messenger(messenger
);
486 monc
->set_want_keys(CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
|
487 CEPH_ENTITY_TYPE_MDS
| CEPH_ENTITY_TYPE_MGR
);
491 derr
<< "ERROR: failed to get monmap: " << cpp_strerror(-r
) << dendl
;
498 // tell monc about log_client so it will know about mon session resets
499 monc
->set_log_client(&log_client
);
501 r
= monc
->authenticate();
503 derr
<< "ERROR: failed to authenticate: " << cpp_strerror(-r
) << dendl
;
// Retry loop: each wait_auth_rotating() call is given 30.0 as its argument.
510 int rotating_auth_attempts
= 0;
511 while (monc
->wait_auth_rotating(30.0) < 0) {
512 if (++rotating_auth_attempts
<= g_conf
->max_rotating_auth_attempts
) {
513 derr
<< "unable to obtain rotating service keys; retrying" << dendl
;
516 derr
<< "ERROR: failed to refresh rotating keys, "
517 << "maximum retry time reached." << dendl
;
525 messenger
->add_dispatcher_head(&mgrc
);
528 if (beacon
.get_want_state() == CEPH_MDS_STATE_DNE
) {
529 dout(4) << __func__
<< ": terminated already, dropping out" << dendl
;
534 monc
->sub_want("mdsmap", 0, 0);
535 monc
->sub_want("mgrmap", 0, 0);
540 // Set up admin socket before taking mds_lock, so that ordering
541 // is consistent (later we take mds_lock within asok callbacks)
542 set_up_admin_socket();
543 g_conf
->add_observer(this);
545 if (beacon
.get_want_state() == MDSMap::STATE_DNE
) {
546 suicide(); // we could do something more graceful here
547 dout(4) << __func__
<< ": terminated already, dropping out" << dendl
;
555 messenger
->set_myname(entity_name_t::MDS(MDS_RANK_NONE
));
// Re-arms the periodic tick: cancels the pending tick_event, if any, and
// schedules a new timer event after mds_tick_interval. The callback asserts
// that mds_lock is held by the calling thread.
// NOTE(review): the rest of the callback body is not visible in this excerpt.
564 void MDSDaemon::reset_tick()
567 if (tick_event
) timer
.cancel_event(tick_event
);
570 tick_event
= timer
.add_event_after(
571 g_conf
->mds_tick_interval
,
572 new FunctionContext([this](int) {
573 assert(mds_lock
.is_locked_by_me());
// Periodic tick handler.
// NOTE(review): the body is not visible in this excerpt; the one surviving
// comment indicates it calls the subsystems' tick functions.
578 void MDSDaemon::tick()
583 // Call through to subsystems' tick functions
// Sends the reply for a command message.
// m:        the command being answered; its connection and tid are reused.
// mds_rank: rank whose sessionmap is consulted for the sender's session.
// r:        result code placed in the reply.
// outbl:    payload attached to the reply as data.
// outs:     status string placed in the reply.
// The session attached to the connection must exist (asserted). On the
// visible non-live path the session is asserted closed and its connection is
// marked disposable.
// NOTE(review): part of the live_session expression and the branch around
// the disposal are not visible in this excerpt.
589 void MDSDaemon::send_command_reply(MCommand
*m
, MDSRank
*mds_rank
,
590 int r
, bufferlist outbl
,
591 boost::string_view outs
)
593 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
594 assert(session
!= NULL
);
595 // If someone is using a closed session for sending commands (e.g.
596 // the ceph CLI) then we should feel free to clean up this connection
597 // as soon as we've sent them a response.
598 const bool live_session
=
599 session
->get_state_seq() > 0 &&
601 mds_rank
->sessionmap
.get_session(session
->info
.inst
.name
);
604 // This session only existed to issue commands, so terminate it
605 // as soon as we can.
606 assert(session
->is_closed());
607 session
->connection
->mark_disposable();
// Build the reply with the command's tid and payload, then send it back on
// the originating connection.
611 MCommandReply
*reply
= new MCommandReply(r
, outs
);
612 reply
->set_tid(m
->get_tid());
613 reply
->set_data(outbl
);
614 m
->get_connection()->send_message(reply
);
// Entry point for MCommand messages.
// Visible flow: fetch the sender's session (asserted non-null); reject
// callers whose caps are not allow_all() with "permission denied"; reject an
// empty command with "no command given"; parse the command JSON into cmdmap;
// otherwise run _handle_command(). The reply is sent through
// send_command_reply() and the optional run_after context is completed
// with 0.
// NOTE(review): result codes, the guards around the reply and run_after,
// and the message release are not visible in this excerpt.
617 /* This function DOES put the passed message before returning*/
618 void MDSDaemon::handle_command(MCommand
*m
)
620 Session
*session
= static_cast<Session
*>(m
->get_connection()->get_priv());
621 assert(session
!= NULL
);
625 std::stringstream ss
;
628 Context
*run_after
= NULL
;
629 bool need_reply
= true;
631 if (!session
->auth_caps
.allow_all()) {
633 << ": received command from client without `tell` capability: "
634 << m
->get_connection()->peer_addr
<< dendl
;
636 ss
<< "permission denied";
638 } else if (m
->cmd
.empty()) {
640 ss
<< "no command given";
642 } else if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
646 r
= _handle_command(cmdmap
, m
, &outbl
, &outs
, &run_after
, &need_reply
);
651 send_command_reply(m
, mds_rank
, r
, outbl
, outs
);
655 run_after
->complete(0);
// Command description table. Each COMMAND(parsesig, helptext, module, perm,
// availability) entry expands to one brace-initialized record followed by a
// comma.
// NOTE(review): the array declaration and several entry lines are not
// visible in this excerpt.
669 #define COMMAND(parsesig, helptext, module, perm, availability) \
670 {parsesig, helptext, module, perm, availability},
672 COMMAND("injectargs " \
673 "name=injected_args,type=CephString,n=N",
674 "inject configuration arguments into running MDS",
675 "mds", "*", "cli,rest")
676 COMMAND("config set " \
677 "name=key,type=CephString name=value,type=CephString",
678 "Set a configuration option at runtime (not persistent)",
679 "mds", "*", "cli,rest")
681 "Terminate this MDS",
682 "mds", "*", "cli,rest")
685 "mds", "*", "cli,rest")
686 COMMAND("session kill " \
687 "name=session_id,type=CephInt",
688 "End a client session",
689 "mds", "*", "cli,rest")
690 COMMAND("cpu_profiler " \
691 "name=arg,type=CephChoices,strings=status|flush",
692 "run cpu profiling on daemon", "mds", "rw", "cli,rest")
693 COMMAND("session ls " \
694 "name=filters,type=CephString,n=N,req=false",
695 "List client sessions", "mds", "r", "cli,rest")
696 COMMAND("client ls " \
697 "name=filters,type=CephString,n=N,req=false",
698 "List client sessions", "mds", "r", "cli,rest")
699 COMMAND("session evict " \
700 "name=filters,type=CephString,n=N,req=false",
701 "Evict client session(s)", "mds", "rw", "cli,rest")
702 COMMAND("client evict " \
703 "name=filters,type=CephString,n=N,req=false",
704 "Evict client session(s)", "mds", "rw", "cli,rest")
706 "List detected metadata damage", "mds", "r", "cli,rest")
707 COMMAND("damage rm name=damage_id,type=CephInt",
708 "Remove a damage table entry", "mds", "rw", "cli,rest")
709 COMMAND("version", "report version of MDS", "mds", "r", "cli,rest")
711 "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
712 "show heap usage info (available only if compiled with tcmalloc)", \
713 "mds", "*", "cli,rest")
714 COMMAND("cache drop name=timeout,type=CephInt,range=0,req=false", "trim cache and optionally "
715 "request client to release all caps and flush the journal", "mds",
// Dispatches a parsed `tell` command by its "prefix" value.
// Prefixes handled by the visible lines: get_command_descriptions, version,
// injectargs, config set, exit, respawn, session kill, heap, cpu_profiler;
// anything else is offered to mds_rank->handle_command(), and an unhandled
// command yields "unrecognized command! <prefix>".
// "exit" and "respawn" do not act immediately: they store a SuicideLater /
// RespawnLater context in *run_later.
// outbl and outs must be non-null (asserted).
// NOTE(review): the remaining parameters, result codes, several branch
// bodies and the return statement are not visible in this excerpt.
720 int MDSDaemon::_handle_command(
721 const cmdmap_t
&cmdmap
,
728 assert(outbl
!= NULL
);
729 assert(outs
!= NULL
);
// Deferred-action context created for the "exit" command.
731 class SuicideLater
: public Context
736 explicit SuicideLater(MDSDaemon
*mds_
) : mds(mds_
) {}
737 void finish(int r
) override
{
738 // Wait a little to improve chances of caller getting
739 // our response before seeing us disappear from mdsmap
// Deferred-action context created for the "respawn" command.
747 class RespawnLater
: public Context
753 explicit RespawnLater(MDSDaemon
*mds_
) : mds(mds_
) {}
754 void finish(int r
) override
{
755 // Wait a little to improve chances of caller getting
756 // our response before seeing us disappear from mdsmap
763 std::stringstream ds
;
764 std::stringstream ss
;
767 std::unique_ptr
<Formatter
> f(Formatter::create(format
));
768 cmd_getval(cct
, cmdmap
, "prefix", prefix
);
// Dump every mds_commands entry as a JSON section named "cmdNNN" (the
// number is zero-padded to three digits).
772 if (prefix
== "get_command_descriptions") {
774 std::unique_ptr
<JSONFormatter
> f(ceph::make_unique
<JSONFormatter
>());
775 f
->open_object_section("command_descriptions");
776 for (MDSCommand
*cp
= mds_commands
;
777 cp
< &mds_commands
[ARRAY_SIZE(mds_commands
)]; cp
++) {
779 ostringstream secname
;
780 secname
<< "cmd" << setfill('0') << std::setw(3) << cmdnum
;
781 dump_cmddesc_to_json(f
.get(), secname
.str(), cp
->cmdstring
, cp
->helpstring
,
782 cp
->module
, cp
->perm
, cp
->availability
, 0);
785 f
->close_section(); // command_descriptions
791 cmd_getval(cct
, cmdmap
, "format", format
);
792 if (prefix
== "version") {
794 f
->open_object_section("version");
795 f
->dump_string("version", pretty_version_to_str());
799 ds
<< pretty_version_to_str();
801 } else if (prefix
== "injectargs") {
802 vector
<string
> argsvec
;
803 cmd_getval(cct
, cmdmap
, "injected_args", argsvec
);
805 if (argsvec
.empty()) {
807 ss
<< "ignoring empty injectargs";
// Start from the first argument; the loop visits the remaining ones.
810 string args
= argsvec
.front();
811 for (vector
<string
>::iterator a
= ++argsvec
.begin(); a
!= argsvec
.end(); ++a
)
813 r
= cct
->_conf
->injectargs(args
, &ss
);
814 } else if (prefix
== "config set") {
816 cmd_getval(cct
, cmdmap
, "key", key
);
818 cmd_getval(cct
, cmdmap
, "value", val
);
819 r
= cct
->_conf
->set_val(key
, val
, true, &ss
);
821 cct
->_conf
->apply_changes(nullptr);
823 } else if (prefix
== "exit") {
824 // We will send response before executing
826 *run_later
= new SuicideLater(this);
827 } else if (prefix
== "respawn") {
828 // We will send response before executing
829 ss
<< "Respawning...";
830 *run_later
= new RespawnLater(this);
831 } else if (prefix
== "session kill") {
832 if (mds_rank
== NULL
) {
834 ss
<< "MDS not active";
837 // FIXME harmonize `session kill` with admin socket session evict
838 int64_t session_id
= 0;
839 bool got
= cmd_getval(cct
, cmdmap
, "session_id", session_id
);
841 bool killed
= mds_rank
->evict_client(session_id
, false,
842 g_conf
->mds_session_blacklist_on_evict
,
846 } else if (prefix
== "heap") {
847 if (!ceph_using_tcmalloc()) {
849 ss
<< "could not issue heap profiler command -- not using tcmalloc!";
852 cmd_getval(cct
, cmdmap
, "heapcmd", heapcmd
);
853 vector
<string
> heapcmd_vec
;
854 get_str_vec(heapcmd
, heapcmd_vec
);
855 ceph_heap_profiler_handle_command(heapcmd_vec
, ds
);
857 } else if (prefix
== "cpu_profiler") {
859 cmd_getval(cct
, cmdmap
, "arg", arg
);
860 vector
<string
> argvec
;
861 get_str_vec(arg
, argvec
);
862 cpu_profiler_handle_command(argvec
, ds
);
864 // Give MDSRank a shot at the command
866 ss
<< "MDS not active";
870 bool handled
= mds_rank
->handle_command(cmdmap
, m
, &r
, &ds
, &ss
,
871 run_later
, need_reply
);
873 // MDSDaemon doesn't know this command
874 ss
<< "unrecognized command! " << prefix
;
// Processes a new MDSMap message.
// Visible flow: discard maps whose epoch is not newer than the current one;
// keep the old map and decode the new one; look up this daemon's state,
// incarnation and rank by its global id; check that the map's compat set is
// writeable by this daemon; mark down peers present in the old map but
// absent from the new one; in standby-replay with no owned rank, use
// standby_for_rank as the effective rank. Without a rank the map is handled
// in rankless mode via _handle_mds_map(); with a rank, an MDSRankDispatcher
// is created on first assignment and the map is passed on to it. The beacon
// is notified of the new map at the end.
// NOTE(review): the suicide/respawn calls, early returns and cleanup of the
// old map and message are not visible in this excerpt.
886 /* This function deletes the passed message before returning. */
888 void MDSDaemon::handle_mds_map(MMDSMap
*m
)
890 version_t epoch
= m
->get_epoch();
893 if (epoch
<= mdsmap
->get_epoch()) {
894 dout(5) << "handle_mds_map old map epoch " << epoch
<< " <= "
895 << mdsmap
->get_epoch() << ", discarding" << dendl
;
900 dout(1) << "Updating MDS map to version " << epoch
<< " from " << m
->get_source() << dendl
;
904 // keep old map, for a moment
905 MDSMap
*oldmap
= mdsmap
;
907 // decode and process
909 mdsmap
->decode(m
->get_encoded());
910 const MDSMap::DaemonState new_state
= mdsmap
->get_state_gid(mds_gid_t(monc
->get_global_id()));
911 const int incarnation
= mdsmap
->get_inc_gid(mds_gid_t(monc
->get_global_id()));
913 monc
->sub_got("mdsmap", mdsmap
->get_epoch());
915 // Calculate my effective rank (either my owned rank or my
916 // standby_for_rank if in standby replay)
917 mds_rank_t whoami
= mdsmap
->get_rank_gid(mds_gid_t(monc
->get_global_id()));
// Compare this daemon's full compat set against the one in the map.
920 CompatSet
mdsmap_compat(MDSMap::get_compat_set_all());
921 dout(10) << " my compat " << mdsmap_compat
<< dendl
;
922 dout(10) << " mdsmap compat " << mdsmap
->compat
<< dendl
;
923 if (!mdsmap_compat
.writeable(mdsmap
->compat
)) {
924 dout(0) << "handle_mds_map mdsmap compatset " << mdsmap
->compat
925 << " not writeable with daemon features " << mdsmap_compat
926 << ", killing myself" << dendl
;
931 // mark down any failed peers
932 for (map
<mds_gid_t
,MDSMap::mds_info_t
>::const_iterator p
= oldmap
->get_mds_info().begin();
933 p
!= oldmap
->get_mds_info().end();
935 if (mdsmap
->get_mds_info().count(p
->first
) == 0) {
936 dout(10) << " peer mds gid " << p
->first
<< " removed from map" << dendl
;
937 messenger
->mark_down(p
->second
.addr
);
941 if (whoami
== MDS_RANK_NONE
&&
942 new_state
== MDSMap::STATE_STANDBY_REPLAY
) {
943 whoami
= mdsmap
->get_mds_info_gid(mds_gid_t(monc
->get_global_id())).standby_for_rank
;
947 addr
= messenger
->get_myaddr();
948 dout(10) << "map says I am " << addr
<< " mds." << whoami
<< "." << incarnation
949 << " state " << ceph_mds_state_name(new_state
) << dendl
;
951 if (whoami
== MDS_RANK_NONE
) {
952 if (mds_rank
!= NULL
) {
953 const auto myid
= monc
->get_global_id();
954 // We have entered a rank-holding state, we shouldn't be back
956 if (g_conf
->mds_enforce_unique_name
) {
957 if (mds_gid_t existing
= mdsmap
->find_mds_gid_by_name(name
)) {
958 const MDSMap::mds_info_t
& i
= mdsmap
->get_info_gid(existing
);
959 if (i
.global_id
> myid
) {
960 dout(1) << "Map replaced me with another mds." << whoami
961 << " with gid (" << i
.global_id
<< ") larger than myself ("
962 << myid
<< "); quitting!" << dendl
;
963 // Call suicide() rather than respawn() because if someone else
964 // has taken our ID, we don't want to keep restarting and
965 // fighting them for the ID.
973 dout(1) << "Map removed me (mds." << whoami
<< " gid:"
974 << myid
<< ") from cluster due to lost contact; respawning" << dendl
;
977 // MDSRank not active: process the map here to see if we have
978 // been assigned a rank.
979 dout(10) << __func__
<< ": handling map in rankless mode" << dendl
;
980 _handle_mds_map(oldmap
);
983 // Did we already hold a different rank? MDSMonitor shouldn't try
984 // to change that out from under me!
985 if (mds_rank
&& whoami
!= mds_rank
->get_nodeid()) {
986 derr
<< "Invalid rank transition " << mds_rank
->get_nodeid() << "->"
991 // Did I previously not hold a rank? Initialize!
992 if (mds_rank
== NULL
) {
// The two FunctionContext callbacks invoke respawn() and suicide() on this
// daemon.
993 mds_rank
= new MDSRankDispatcher(whoami
, mds_lock
, clog
,
994 timer
, beacon
, mdsmap
, messenger
, monc
,
995 new FunctionContext([this](int r
){respawn();}),
996 new FunctionContext([this](int r
){suicide();}));
997 dout(10) << __func__
<< ": initializing MDS rank "
998 << mds_rank
->get_nodeid() << dendl
;
1002 // MDSRank is active: let him process the map, we have no say.
1003 dout(10) << __func__
<< ": handling map as rank "
1004 << mds_rank
->get_nodeid() << dendl
;
1005 mds_rank
->handle_mds_map(m
, oldmap
);
1009 beacon
.notify_mdsmap(mdsmap
);
// Handles a new MDSMap while this daemon holds no rank.
// Looks up this daemon's state in the new map by global id, then covers
// three cases: the map marks us standby (adopt that as the wanted state);
// we wanted standby but the map disagrees (switch the wanted state to BOOT);
// we already want BOOT and the map has not caught up yet (log only).
// NOTE(review): the returns between the cases are not visible in this
// excerpt.
1014 void MDSDaemon::_handle_mds_map(MDSMap
*oldmap
)
1016 MDSMap::DaemonState new_state
= mdsmap
->get_state_gid(mds_gid_t(monc
->get_global_id()));
1018 // Normal rankless case, we're marked as standby
1019 if (new_state
== MDSMap::STATE_STANDBY
) {
1020 beacon
.set_want_state(mdsmap
, new_state
);
1021 dout(1) << "Map has assigned me to become a standby" << dendl
;
1026 // Case where we thought we were standby, but MDSMap disagrees
1027 if (beacon
.get_want_state() == MDSMap::STATE_STANDBY
) {
1028 dout(10) << "dropped out of mdsmap, try to re-add myself" << dendl
;
1029 new_state
= MDSMap::STATE_BOOT
;
1030 beacon
.set_want_state(mdsmap
, new_state
);
1034 // Case where we have sent a boot beacon that isn't reflected yet
1035 if (beacon
.get_want_state() == MDSMap::STATE_BOOT
) {
1036 dout(10) << "not in map yet" << dendl
;
// Signal handler entry point. Only SIGINT and SIGTERM are accepted
// (asserted); the signal is logged and mds_lock is taken.
// NOTE(review): the action performed under the lock is not visible in this
// excerpt.
1040 void MDSDaemon::handle_signal(int signum
)
1042 assert(signum
== SIGINT
|| signum
== SIGTERM
);
1043 derr
<< "*** got signal " << sig_str(signum
) << " ***" << dendl
;
1045 Mutex::Locker
l(mds_lock
);
// Shuts the daemon down.
// Preconditions (asserted): mds_lock is locked and the daemon is not
// already stopping.
// Visible sequence: log the wanted state; cancel the tick event; remove the
// config observer when asok_hook is set; clean up the admin socket; set the
// wanted state to DNE and, if this daemon is still in the mdsmap, send a
// final beacon and wait; shut down the rank and then the messenger.
// NOTE(review): guards around some of these steps are not visible in this
// excerpt.
1053 void MDSDaemon::suicide()
1055 assert(mds_lock
.is_locked());
1057 // make sure we don't suicide twice
1058 assert(stopping
== false);
1061 dout(1) << "suicide! Wanted state "
1062 << ceph_mds_state_name(beacon
.get_want_state()) << dendl
;
1065 timer
.cancel_event(tick_event
);
1069 //because add_observer is called after set_up_admin_socket
1070 //so we can use asok_hook to avoid assert in the remove_observer
1071 if (asok_hook
!= NULL
) {
1073 g_conf
->remove_observer(this);
1077 clean_up_admin_socket();
1079 // Inform MDS we are going away, then shut down beacon
1080 beacon
.set_want_state(mdsmap
, MDSMap::STATE_DNE
);
1081 if (!mdsmap
->is_dne_gid(mds_gid_t(monc
->get_global_id()))) {
1082 // Notify the MDSMonitor that we're dying, so that it doesn't have to
1083 // wait for us to go laggy. Only do this if we're actually in the
1084 // MDSMap, because otherwise the MDSMonitor will drop our message.
1085 beacon
.send_and_wait(1);
1092 mds_rank
->shutdown();
1097 messenger
->shutdown();
// Re-executes the daemon with its original arguments.
// Visible sequence: log and dump recent log entries; copy orig_argv into a
// NULL-terminated new_argv; resolve the executable through
// PROCPREFIX "/proc/self/exe", falling back to orig_argv[0] when readlink()
// fails; unblock all signals; execv(). The log line after execv() reports
// the failure.
// NOTE(review): lines are missing from this excerpt, including the end of a
// block comment and the final abort path.
1101 void MDSDaemon::respawn()
1103 dout(1) << "respawn!" << dendl
;
1105 /* Dump recent in case the MDS was stuck doing something which caused it to
1106 * be removed from the MDSMap leading to respawn. */
1107 g_ceph_context
->_log
->dump_recent();
// Build the argument vector handed to execv(): same entries as orig_argv,
// terminated with NULL.
1109 char *new_argv
[orig_argc
+1];
1110 dout(1) << " e: '" << orig_argv
[0] << "'" << dendl
;
1111 for (int i
=0; i
<orig_argc
; i
++) {
1112 new_argv
[i
] = (char *)orig_argv
[i
];
1113 dout(1) << " " << i
<< ": '" << orig_argv
[i
] << "'" << dendl
;
1115 new_argv
[orig_argc
] = NULL
;
1117 /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1118 * This allows us to exec the same executable even if it has since been
1121 char exe_path
[PATH_MAX
] = "";
1122 if (readlink(PROCPREFIX
"/proc/self/exe", exe_path
, PATH_MAX
-1) == -1) {
1123 /* Print CWD for the user's interest */
1125 char *cwd
= getcwd(buf
, sizeof(buf
));
1127 dout(1) << " cwd " << cwd
<< dendl
;
1129 /* Fall back to a best-effort: just running in our CWD */
1130 strncpy(exe_path
, orig_argv
[0], PATH_MAX
-1);
1132 dout(1) << "respawning with exe " << exe_path
<< dendl
;
1133 strcpy(exe_path
, PROCPREFIX
"/proc/self/exe");
1136 dout(1) << " exe_path " << exe_path
<< dendl
;
1138 unblock_all_signals(NULL
);
1139 execv(exe_path
, new_argv
);
1141 dout(0) << "respawn execv " << orig_argv
[0]
1142 << " failed with " << cpp_strerror(errno
) << dendl
;
1144 // We have to assert out here, because suicide() returns, and callers
1145 // to respawn expect it never to return.
// Messenger dispatch entry point; runs under mds_lock.
// Messages arriving while the wanted state is DNE are logged as discarded.
// Otherwise the message is first tried as a daemon-level ("core") message
// and then handed to mds_rank->ms_dispatch().
// NOTE(review): the returns and the guard around the rank dispatch are not
// visible in this excerpt.
1151 bool MDSDaemon::ms_dispatch(Message
*m
)
1153 Mutex::Locker
l(mds_lock
);
1158 // Drop out early if shutting down
1159 if (beacon
.get_want_state() == CEPH_MDS_STATE_DNE
) {
1160 dout(10) << " stopping, discarding " << *m
<< dendl
;
1165 // First see if it's a daemon message
1166 const bool handled_core
= handle_core_message(m
);
1171 // Not core, try it as a rank message
1173 return mds_rank
->ms_dispatch(m
);
// Builds an authorizer for an outgoing connection to dest_type.
// Monitor destinations are special-cased (handled on a different layer).
// For the others the code waits for rotating keys (argument 10), asks the
// MonClient to build the authorizer into *authorizer, and returns whether
// that produced a non-null pointer.
// NOTE(review): the return values of the early-exit branches are not
// visible in this excerpt.
1179 bool MDSDaemon::ms_get_authorizer(int dest_type
, AuthAuthorizer
**authorizer
, bool force_new
)
1181 dout(10) << "MDSDaemon::ms_get_authorizer type="
1182 << ceph_entity_type_name(dest_type
) << dendl
;
1184 /* monitor authorization is being handled on different layer */
1185 if (dest_type
== CEPH_ENTITY_TYPE_MON
)
1189 if (monc
->wait_auth_rotating(10) < 0)
1193 *authorizer
= monc
->build_authorizer(dest_type
);
1194 return *authorizer
!= NULL
;
1199 * high priority messages we always process
// Handles daemon-level messages, switching on the message type.
// Visible cases and the peer types ALLOW_MESSAGES_FROM names for each:
//   CEPH_MSG_MON_MAP  - MON
//   CEPH_MSG_MDS_MAP  - MON or MDS; passed to handle_mds_map()
//   (command message) - passed to handle_command()
//   CEPH_MSG_OSD_MAP  - MON or OSD; passed to mds_rank->handle_osd_map()
//   MSG_MON_COMMAND   - MON; dropped with a cluster log warning
// NOTE(review): some case labels, breaks and the return value are not
// visible in this excerpt.
1201 bool MDSDaemon::handle_core_message(Message
*m
)
1203 switch (m
->get_type()) {
1204 case CEPH_MSG_MON_MAP
:
1205 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON
);
1210 case CEPH_MSG_MDS_MAP
:
1211 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_MDS
);
1212 handle_mds_map(static_cast<MMDSMap
*>(m
));
1217 handle_command(static_cast<MCommand
*>(m
));
1219 case CEPH_MSG_OSD_MAP
:
1220 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON
| CEPH_ENTITY_TYPE_OSD
);
1223 mds_rank
->handle_osd_map();
1228 case MSG_MON_COMMAND
:
1229 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON
);
1230 clog
->warn() << "dropping `mds tell` command from legacy monitor";
// Messenger callback for a newly established connection.
// NOTE(review): the body is not visible in this excerpt.
1240 void MDSDaemon::ms_handle_connect(Connection
*con
)
// Messenger callback for a connection reset.
// Peers that are not clients are special-cased before the lock is taken.
// Under mds_lock the reset is logged, the wanted state is checked for DNE,
// and the session attached to the connection is fetched. If that session is
// closed, it is logged and the connection's private pointer is cleared.
// NOTE(review): the return values, the null check on the session and the
// release of the session reference are not visible in this excerpt.
1244 bool MDSDaemon::ms_handle_reset(Connection
*con
)
1246 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT
)
1249 Mutex::Locker
l(mds_lock
);
1253 dout(5) << "ms_handle_reset on " << con
->get_peer_addr() << dendl
;
1254 if (beacon
.get_want_state() == CEPH_MDS_STATE_DNE
)
1257 Session
*session
= static_cast<Session
*>(con
->get_priv());
1259 if (session
->is_closed()) {
1260 dout(3) << "ms_handle_reset closing connection for session " << session
->info
.inst
<< dendl
;
1262 con
->set_priv(NULL
);
// Remote-reset callback. The visible statements mirror ms_handle_reset:
// look up the Session stored as the connection private data and, when that
// session reports is_closed(), log and clear the private data.
// NOTE(review): this excerpt omits some original lines (braces, the
// statements guarded by the early checks and any null check on session);
// confirm against the full file.
1272 void MDSDaemon::ms_handle_remote_reset(Connection
*con
)
// Peer-type filter: the statement guarded by this non-client check is not
// visible here.
1274 if (con
->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT
)
// Locker object constructed on mds_lock; presumably holds the lock until
// scope exit (Mutex::Locker is declared outside this excerpt).
1277 Mutex::Locker
l(mds_lock
);
1282 dout(5) << "ms_handle_remote_reset on " << con
->get_peer_addr() << dendl
;
// DNE want-state test; the guarded statement is not visible here.
1283 if (beacon
.get_want_state() == CEPH_MDS_STATE_DNE
)
// The connection private pointer is interpreted as a Session.
1286 Session
*session
= static_cast<Session
*>(con
->get_priv());
// Closed session: log it and detach the private pointer from the connection.
1288 if (session
->is_closed()) {
1289 dout(3) << "ms_handle_remote_reset closing connection for session " << session
->info
.inst
<< dendl
;
1291 con
->set_priv(NULL
);
// Connection-refused callback taking the affected connection. The only
// visible body content is the comment below stating that nothing is done.
// NOTE(review): the return statement is not visible in this excerpt.
1297 bool MDSDaemon::ms_handle_refused(Connection
*con
)
1299 // do nothing for now
// Validate the authorizer presented on an incoming connection and attach a
// Session to that connection.
//
// Visible outputs: is_valid receives the result of verify_authorizer, and
// the final visible statement returns true, meaning a decision was made and
// the caller should consult is_valid. authorizer_reply, session_key and
// challenge are forwarded to verify_authorizer.
//
// NOTE(review): this excerpt omits many original lines (braces, else
// branches, several declarations such as name, global_id and s, and some
// guards), so nesting cannot be read off the text below. Confirm control
// flow against the full file before relying on these notes.
1303 bool MDSDaemon::ms_verify_authorizer(Connection
*con
, int peer_type
,
1304 int protocol
, bufferlist
& authorizer_data
, bufferlist
& authorizer_reply
,
1305 bool& is_valid
, CryptoKey
& session_key
,
1306 std::unique_ptr
<AuthAuthorizerChallenge
> *challenge
)
// Locker object constructed on mds_lock; presumably holds the lock until
// scope exit (Mutex::Locker is declared outside this excerpt).
1308 Mutex::Locker
l(mds_lock
);
// DNE want-state test; the guarded statement is not visible here.
1312 if (beacon
.get_want_state() == CEPH_MDS_STATE_DNE
)
// Pick the handler registry by peer type. The MDS case uses the cluster
// registry; the service registry lookup below belongs to another case whose
// label is not visible in this excerpt.
1315 AuthAuthorizeHandler
*authorize_handler
= 0;
1316 switch (peer_type
) {
1317 case CEPH_ENTITY_TYPE_MDS
:
1318 authorize_handler
= authorize_handler_cluster_registry
->get_handler(protocol
);
1321 authorize_handler
= authorize_handler_service_registry
->get_handler(protocol
);
// No handler registered for this protocol: logged at level 0. The
// statements that follow the log in this branch are not visible here.
1323 if (!authorize_handler
) {
1324 dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol
<< dendl
;
// Filled in by verify_authorizer and consumed by the capability setup
// further down.
1329 AuthCapsInfo caps_info
;
// Rotating key ring obtained from the monitor client. The leading
// arguments of the verify_authorizer call and the guard on keys sit on
// lines not visible in this excerpt.
1333 RotatingKeyRing
*keys
= monc
->rotating_secrets
.get();
1335 is_valid
= authorize_handler
->verify_authorizer(
1337 authorizer_data
, authorizer_reply
, name
, global_id
, caps_info
,
1338 session_key
, nullptr, challenge
);
// Path taken when no rotating keys are available, per the log text.
1340 dout(10) << __func__
<< " no rotating_keys (yet), denied" << dendl
;
// Entity name for the peer, built from the connection peer type and the
// global_id produced by verification.
1345 entity_name_t
n(con
->get_peer_type(), global_id
);
1347 // We allow connections and assign Session instances to connections
1348 // even if we have not been assigned a rank, because clients with
1349 // "allow *" are allowed to connect and do 'tell' operations before
1353 // If we do hold a rank, see if this is an existing client establishing
1354 // a new connection, rather than a new client
1355 s
= mds_rank
->sessionmap
.get_session(n
);
1358 // Wire up a Session* to this connection
1359 // It doesn't go into a SessionMap instance until it sends an explicit
1360 // request to open a session (initial state of Session is `closed`)
// New-session path: construct a Session with a nullptr argument and fill in
// its auth name, peer address and entity name.
1362 s
= new Session(nullptr);
1363 s
->info
.auth_name
= name
;
1364 s
->info
.inst
.addr
= con
->get_peer_addr();
1365 s
->info
.inst
.name
= n
;
1366 dout(10) << " new session " << s
<< " for " << s
->info
.inst
<< " con " << con
<< dendl
;
// The new session records this connection directly.
1368 s
->connection
= con
;
1370 mds_rank
->kick_waiters_for_any_client_connection();
// Existing-session path: the connection private pointer is set to the
// value returned by s->get(); s->connection is not assigned in the visible
// statements of this path, for the reason given in the comment that follows.
1373 dout(10) << " existing session " << s
<< " for " << s
->info
.inst
<< " existing con " << s
->connection
1374 << ", new/authorizing con " << con
<< dendl
;
1375 con
->set_priv(s
->get());
1379 // Wait until we fully accept the connection before setting
1380 // s->connection. In particular, if there are multiple incoming
1381 // connection attempts, they will all get their authorizer
1382 // validated, but some of them may "lose the race" and get
1383 // dropped. We only want to consider the winner(s). See
1384 // ms_handle_accept(). This is important for Sessions we replay
1385 // from the journal on recovery that don't have established
1386 // messenger state; we want the con from only the winning
1387 // connect attempt(s). (Normal reconnects that don't follow MDS
1388 // recovery are reconnected to the existing con by the
// Capability setup: allow_all sets the blanket flag on the session caps;
// otherwise the caps buffer is decoded into a string and parsed.
1392 if (caps_info
.allow_all
) {
1393 // Flag for auth providers that don't provide cap strings
1394 s
->auth_caps
.set_allow_all();
1396 bufferlist::iterator p
= caps_info
.caps
.begin();
1397 string auth_cap_str
;
1399 ::decode(auth_cap_str
, p
);
1401 dout(10) << __func__
<< ": parsing auth_cap_str='" << auth_cap_str
<< "'" << dendl
;
// Parse failures are reported both to the debug log and to the cluster log.
// What happens to is_valid on this path is not visible in this excerpt.
1402 std::ostringstream errstr
;
1403 if (!s
->auth_caps
.parse(g_ceph_context
, auth_cap_str
, &errstr
)) {
1404 dout(1) << __func__
<< ": auth cap parse error: " << errstr
.str()
1405 << " parsing '" << auth_cap_str
<< "'" << dendl
;
1406 clog
->warn() << name
<< " mds cap '" << auth_cap_str
1407 << "' does not parse: " << errstr
.str();
// Decode failure of the caps buffer is caught and logged, not propagated.
1410 } catch (buffer::error
& e
) {
1411 // Assume legacy auth, defaults to:
1412 // * permit all filesystem ops
1413 // * permit no `tell` ops
1414 dout(1) << __func__
<< ": cannot decode auth caps bl of length " << caps_info
.caps
.length() << dendl
;
1420 return true; // we made a decision (see is_valid)
// Connection-accepted callback. It binds the Session stored as the
// connection private data to this connection and then sends every message
// held in the session preopen_out_queue over it.
// NOTE(review): this excerpt omits some original lines (braces and any
// null check on s); confirm against the full file.
1424 void MDSDaemon::ms_handle_accept(Connection
*con
)
// Locker object constructed on mds_lock; presumably holds the lock until
// scope exit (Mutex::Locker is declared outside this excerpt).
1426 Mutex::Locker
l(mds_lock
);
// The connection private pointer is interpreted as a Session.
1431 Session
*s
= static_cast<Session
*>(con
->get_priv());
1432 dout(10) << "ms_handle_accept " << con
->get_peer_addr() << " con " << con
<< " session " << s
<< dendl
;
// Only when the session points at a different connection: log the change
// and repoint the session at the accepted connection.
1434 if (s
->connection
!= con
) {
1435 dout(10) << " session connection " << s
->connection
<< " -> " << con
<< dendl
;
1436 s
->connection
= con
;
1438 // send out any queued messages
// Drain in order: send the front element, then pop it, until empty.
1439 while (!s
->preopen_out_queue
.empty()) {
1440 con
->send_message(s
->preopen_out_queue
.front());
1441 s
->preopen_out_queue
.pop_front();
1448 bool MDSDaemon::is_clean_shutdown()
1451 return mds_rank
->is_stopped();