]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDSDaemon.cc
fef8a18e002b5312de6a25b99ae517ad817a087c
[ceph.git] / ceph / src / mds / MDSDaemon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <unistd.h>
16
17 #include "include/compat.h"
18 #include "include/types.h"
19 #include "include/str_list.h"
20
21 #include "common/Clock.h"
22 #include "common/HeartbeatMap.h"
23 #include "common/Timer.h"
24 #include "common/ceph_argparse.h"
25 #include "common/config.h"
26 #include "common/entity_name.h"
27 #include "common/errno.h"
28 #include "common/perf_counters.h"
29 #include "common/signal.h"
30 #include "common/version.h"
31
32 #include "global/signal_handler.h"
33
34 #include "msg/Messenger.h"
35 #include "mon/MonClient.h"
36
37 #include "osdc/Objecter.h"
38
39 #include "MDSMap.h"
40
41 #include "MDSDaemon.h"
42 #include "Server.h"
43 #include "Locker.h"
44
45 #include "SnapServer.h"
46 #include "SnapClient.h"
47
48 #include "events/ESession.h"
49 #include "events/ESubtreeMap.h"
50
51 #include "auth/AuthAuthorizeHandler.h"
52 #include "auth/RotatingKeyRing.h"
53 #include "auth/KeyRing.h"
54
55 #include "perfglue/cpu_profiler.h"
56 #include "perfglue/heap_profiler.h"
57
58 #define dout_context g_ceph_context
59 #define dout_subsys ceph_subsys_mds
60 #undef dout_prefix
61 #define dout_prefix *_dout << "mds." << name << ' '
62
63 using std::string;
64 using std::vector;
65 using TOPNSPC::common::cmd_getval;
66
67 // cons/des
// Construct the daemon shell.  No rank exists yet: mds_rank stays null until
// handle_mds_map() assigns this daemon a rank in the MDSMap.
MDSDaemon::MDSDaemon(std::string_view n, Messenger *m, MonClient *mc,
		     boost::asio::io_context& ioctx) :
  Dispatcher(m->cct),
  timer(m->cct, mds_lock),
  gss_ktfile_client(m->cct->_conf.get_val<std::string>("gss_ktab_client_file")),
  beacon(m->cct, mc, n),
  name(n),
  messenger(m),
  monc(mc),
  ioctx(ioctx),
  mgrc(m->cct, m, &mc->monmap),
  log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
  starttime(mono_clock::now())
{
  // Filled in later by the startup code so respawn() can re-exec us.
  orig_argc = 0;
  orig_argv = NULL;

  clog = log_client.create_channel();
  if (!gss_ktfile_client.empty()) {
    // Assert we can export environment variable
    /*
      The default client keytab is used, if it is present and readable,
      to automatically obtain initial credentials for GSSAPI client
      applications. The principal name of the first entry in the client
      keytab is used by default when obtaining initial credentials.
      1. The KRB5_CLIENT_KTNAME environment variable.
      2. The default_client_keytab_name profile variable in [libdefaults].
      3. The hardcoded default, DEFCKTNAME.
    */
    // Point libkrb5 at the configured client keytab, overwriting any
    // pre-existing value (third setenv argument == 1).
    const int32_t set_result(setenv("KRB5_CLIENT_KTNAME",
				    gss_ktfile_client.c_str(), 1));
    ceph_assert(set_result == 0);
  }

  // Start from an empty map; the real one arrives via MMDSMap from the mons.
  mdsmap.reset(new MDSMap);
}
104
105 MDSDaemon::~MDSDaemon() {
106 std::lock_guard lock(mds_lock);
107
108 delete mds_rank;
109 mds_rank = NULL;
110 }
111
112 class MDSSocketHook : public AdminSocketHook {
113 MDSDaemon *mds;
114 public:
115 explicit MDSSocketHook(MDSDaemon *m) : mds(m) {}
116 int call(
117 std::string_view command,
118 const cmdmap_t& cmdmap,
119 Formatter *f,
120 std::ostream& errss,
121 ceph::buffer::list& out) override {
122 ceph_abort("should go to call_async");
123 }
124 void call_async(
125 std::string_view command,
126 const cmdmap_t& cmdmap,
127 Formatter *f,
128 const bufferlist& inbl,
129 std::function<void(int,const std::string&,bufferlist&)> on_finish) override {
130 mds->asok_command(command, cmdmap, f, inbl, on_finish);
131 }
132 };
133
/**
 * Execute an admin-socket command for this daemon.
 *
 * Daemon-scope commands (status, exit, respawn, heap, cpu_profiler) are
 * serviced directly here; anything else is forwarded to the active MDSRank,
 * which completes it asynchronously via on_finish.  In every path the result
 * is reported exactly once through on_finish(r, message, output).
 */
void MDSDaemon::asok_command(
  std::string_view command,
  const cmdmap_t& cmdmap,
  Formatter *f,
  const bufferlist& inbl,
  std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
  dout(1) << "asok_command: " << command << " " << cmdmap
	  << " (starting...)" << dendl;

  int r = -CEPHFS_ENOSYS;  // default: command not recognized
  bufferlist outbl;
  CachedStackStringStream css;
  auto& ss = *css;
  if (command == "status") {
    dump_status(f);
    r = 0;
  } else if (command == "exit") {
    outbl.append("Exiting...\n");
    r = 0;
    // Terminate from a detached thread after a short delay so the caller
    // likely receives our response first.
    std::thread t([this](){
      // Wait a little to improve chances of caller getting
      // our response before seeing us disappear from mdsmap
      sleep(1);
      std::lock_guard l(mds_lock);
      suicide();
    });
    t.detach();
  } else if (command == "respawn") {
    outbl.append("Respawning...\n");
    r = 0;
    std::thread t([this](){
      // Wait a little to improve chances of caller getting
      // our response before seeing us disappear from mdsmap
      sleep(1);
      std::lock_guard l(mds_lock);
      respawn();
    });
    t.detach();
  } else if (command == "heap") {
    if (!ceph_using_tcmalloc()) {
      ss << "not using tcmalloc";
      r = -CEPHFS_EOPNOTSUPP;
    } else {
      // Build the argument vector expected by the heap profiler; an
      // optional "value" argument is appended if supplied.
      string heapcmd;
      cmd_getval(cmdmap, "heapcmd", heapcmd);
      vector<string> heapcmd_vec;
      get_str_vec(heapcmd, heapcmd_vec);
      string value;
      if (cmd_getval(cmdmap, "value", value)) {
	heapcmd_vec.push_back(value);
      }
      std::stringstream outss;
      ceph_heap_profiler_handle_command(heapcmd_vec, outss);
      outbl.append(outss);
      r = 0;
    }
  } else if (command == "cpu_profiler") {
    string arg;
    cmd_getval(cmdmap, "arg", arg);
    vector<string> argvec;
    get_str_vec(arg, argvec);
    cpu_profiler_handle_command(argvec, ss);
    r = 0;
  } else {
    if (mds_rank == NULL) {
      dout(1) << "Can't run that command on an inactive MDS!" << dendl;
      f->dump_string("error", "mds_not_active");
    } else {
      try {
	// The rank takes ownership of completion: it will invoke on_finish
	// itself, so we must not call it again below.
	mds_rank->handle_asok_command(command, cmdmap, f, inbl, on_finish);
	return;
      } catch (const TOPNSPC::common::bad_cmd_get& e) {
	ss << e.what();
	r = -CEPHFS_EINVAL;
      }
    }
  }
  on_finish(r, ss.str(), outbl);
}
214
/**
 * Dump high-level daemon status into the given formatter.
 *
 * Rank-specific fields fall back to placeholders (MDS_RANK_NONE, 0) when
 * this daemon does not currently hold a rank.
 */
void MDSDaemon::dump_status(Formatter *f)
{
  f->open_object_section("status");
  f->dump_stream("cluster_fsid") << monc->get_fsid();
  if (mds_rank) {
    f->dump_int("whoami", mds_rank->get_nodeid());
  } else {
    f->dump_int("whoami", MDS_RANK_NONE);
  }

  f->dump_int("id", monc->get_global_id());
  f->dump_string("want_state", ceph_mds_state_name(beacon.get_want_state()));
  f->dump_string("state", ceph_mds_state_name(mdsmap->get_state_gid(mds_gid_t(
	  monc->get_global_id()))));
  if (mds_rank) {
    // Rank internals are only consistent while holding mds_lock.
    std::lock_guard l(mds_lock);
    mds_rank->dump_status(f);
  }

  f->dump_unsigned("mdsmap_epoch", mdsmap->get_epoch());
  if (mds_rank) {
    f->dump_unsigned("osdmap_epoch", mds_rank->get_osd_epoch());
    f->dump_unsigned("osdmap_epoch_barrier", mds_rank->get_osd_epoch_barrier());
  } else {
    f->dump_unsigned("osdmap_epoch", 0);
    f->dump_unsigned("osdmap_epoch_barrier", 0);
  }

  f->dump_float("uptime", get_uptime().count());

  f->close_section(); // status
}
247
248 void MDSDaemon::set_up_admin_socket()
249 {
250 int r;
251 AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
252 ceph_assert(asok_hook == nullptr);
253 asok_hook = new MDSSocketHook(this);
254 r = admin_socket->register_command("status", asok_hook,
255 "high-level status of MDS");
256 ceph_assert(r == 0);
257 r = admin_socket->register_command("dump_ops_in_flight", asok_hook,
258 "show the ops currently in flight");
259 ceph_assert(r == 0);
260 r = admin_socket->register_command("ops", asok_hook,
261 "show the ops currently in flight");
262 ceph_assert(r == 0);
263 r = admin_socket->register_command("dump_blocked_ops",
264 asok_hook,
265 "show the blocked ops currently in flight");
266 ceph_assert(r == 0);
267 r = admin_socket->register_command("dump_historic_ops",
268 asok_hook,
269 "show recent ops");
270 ceph_assert(r == 0);
271 r = admin_socket->register_command("dump_historic_ops_by_duration",
272 asok_hook,
273 "show recent ops, sorted by op duration");
274 ceph_assert(r == 0);
275 r = admin_socket->register_command("scrub_path name=path,type=CephString "
276 "name=scrubops,type=CephChoices,"
277 "strings=force|recursive|repair,n=N,req=false "
278 "name=tag,type=CephString,req=false",
279 asok_hook,
280 "scrub an inode and output results");
281 ceph_assert(r == 0);
282 r = admin_socket->register_command("scrub start "
283 "name=path,type=CephString "
284 "name=scrubops,type=CephChoices,strings=force|recursive|repair,n=N,req=false "
285 "name=tag,type=CephString,req=false",
286 asok_hook,
287 "scrub and inode and output results");
288 ceph_assert(r == 0);
289 r = admin_socket->register_command("scrub abort",
290 asok_hook,
291 "Abort in progress scrub operations(s)");
292 ceph_assert(r == 0);
293 r = admin_socket->register_command("scrub pause",
294 asok_hook,
295 "Pause in progress scrub operations(s)");
296 ceph_assert(r == 0);
297 r = admin_socket->register_command("scrub resume",
298 asok_hook,
299 "Resume paused scrub operations(s)");
300 ceph_assert(r == 0);
301 r = admin_socket->register_command("scrub status",
302 asok_hook,
303 "Status of scrub operations(s)");
304 ceph_assert(r == 0);
305 r = admin_socket->register_command("tag path name=path,type=CephString"
306 " name=tag,type=CephString",
307 asok_hook,
308 "Apply scrub tag recursively");
309 ceph_assert(r == 0);
310 r = admin_socket->register_command("flush_path name=path,type=CephString",
311 asok_hook,
312 "flush an inode (and its dirfrags)");
313 ceph_assert(r == 0);
314 r = admin_socket->register_command("export dir "
315 "name=path,type=CephString "
316 "name=rank,type=CephInt",
317 asok_hook,
318 "migrate a subtree to named MDS");
319 ceph_assert(r == 0);
320 r = admin_socket->register_command("dump cache "
321 "name=path,type=CephString,req=false "
322 "name=timeout,type=CephInt,range=0,req=false",
323 asok_hook,
324 "dump metadata cache (optionally to a file)");
325 ceph_assert(r == 0);
326 r = admin_socket->register_command("cache drop "
327 "name=timeout,type=CephInt,range=0,req=false",
328 asok_hook,
329 "trim cache and optionally request client to release all caps and flush the journal");
330 ceph_assert(r == 0);
331 r = admin_socket->register_command("cache status",
332 asok_hook,
333 "show cache status");
334 ceph_assert(r == 0);
335 r = admin_socket->register_command("dump tree "
336 "name=root,type=CephString,req=true "
337 "name=depth,type=CephInt,req=false ",
338 asok_hook,
339 "dump metadata cache for subtree");
340 ceph_assert(r == 0);
341 r = admin_socket->register_command("dump loads",
342 asok_hook,
343 "dump metadata loads");
344 ceph_assert(r == 0);
345 r = admin_socket->register_command("dump snaps name=server,type=CephChoices,strings=--server,req=false",
346 asok_hook,
347 "dump snapshots");
348 ceph_assert(r == 0);
349 r = admin_socket->register_command("session ls "
350 "name=cap_dump,type=CephBool,req=false "
351 "name=filters,type=CephString,n=N,req=false ",
352 asok_hook,
353 "List client sessions based on a filter");
354 ceph_assert(r == 0);
355 r = admin_socket->register_command("client ls "
356 "name=cap_dump,type=CephBool,req=false "
357 "name=filters,type=CephString,n=N,req=false ",
358 asok_hook,
359 "List client sessions based on a filter");
360 ceph_assert(r == 0);
361 r = admin_socket->register_command("session evict name=filters,type=CephString,n=N,req=false",
362 asok_hook,
363 "Evict client session(s) based on a filter");
364 ceph_assert(r == 0);
365 r = admin_socket->register_command("client evict name=filters,type=CephString,n=N,req=false",
366 asok_hook,
367 "Evict client session(s) based on a filter");
368 ceph_assert(r == 0);
369 r = admin_socket->register_command("session kill name=client_id,type=CephString",
370 asok_hook,
371 "Evict a client session by id");
372 ceph_assert(r == 0);
373 r = admin_socket->register_command("session ls name=cap_dump,type=CephBool,req=false",
374 asok_hook,
375 "Enumerate connected CephFS clients");
376 ceph_assert(r == 0);
377 r = admin_socket->register_command("session config "
378 "name=client_id,type=CephInt,req=true "
379 "name=option,type=CephString,req=true "
380 "name=value,type=CephString,req=false ",
381 asok_hook,
382 "Config a CephFS client session");
383 ceph_assert(r == 0);
384 r = admin_socket->register_command("client config "
385 "name=client_id,type=CephInt,req=true "
386 "name=option,type=CephString,req=true "
387 "name=value,type=CephString,req=false ",
388 asok_hook,
389 "Config a CephFS client session");
390 ceph_assert(r == 0);
391 r = admin_socket->register_command("damage ls",
392 asok_hook,
393 "List detected metadata damage");
394 ceph_assert(r == 0);
395 r = admin_socket->register_command("damage rm "
396 "name=damage_id,type=CephInt",
397 asok_hook,
398 "Remove a damage table entry");
399 ceph_assert(r == 0);
400 r = admin_socket->register_command("osdmap barrier name=target_epoch,type=CephInt",
401 asok_hook,
402 "Wait until the MDS has this OSD map epoch");
403 ceph_assert(r == 0);
404 r = admin_socket->register_command("flush journal",
405 asok_hook,
406 "Flush the journal to the backing store");
407 ceph_assert(r == 0);
408 r = admin_socket->register_command("force_readonly",
409 asok_hook,
410 "Force MDS to read-only mode");
411 ceph_assert(r == 0);
412 r = admin_socket->register_command("get subtrees",
413 asok_hook,
414 "Return the subtree map");
415 ceph_assert(r == 0);
416 r = admin_socket->register_command("dirfrag split "
417 "name=path,type=CephString,req=true "
418 "name=frag,type=CephString,req=true "
419 "name=bits,type=CephInt,req=true ",
420 asok_hook,
421 "Fragment directory by path");
422 ceph_assert(r == 0);
423 r = admin_socket->register_command("dirfrag merge "
424 "name=path,type=CephString,req=true "
425 "name=frag,type=CephString,req=true",
426 asok_hook,
427 "De-fragment directory by path");
428 ceph_assert(r == 0);
429 r = admin_socket->register_command("dirfrag ls "
430 "name=path,type=CephString,req=true",
431 asok_hook,
432 "List fragments in directory");
433 ceph_assert(r == 0);
434 r = admin_socket->register_command("openfiles ls",
435 asok_hook,
436 "List the opening files and their caps");
437 ceph_assert(r == 0);
438 r = admin_socket->register_command("dump inode "
439 "name=number,type=CephInt,req=true",
440 asok_hook,
441 "dump inode by inode number");
442 ceph_assert(r == 0);
443 r = admin_socket->register_command("exit",
444 asok_hook,
445 "Terminate this MDS");
446 r = admin_socket->register_command("respawn",
447 asok_hook,
448 "Respawn this MDS");
449 ceph_assert(r == 0);
450 r = admin_socket->register_command(
451 "heap " \
452 "name=heapcmd,type=CephChoices,strings=" \
453 "dump|start_profiler|stop_profiler|release|get_release_rate|set_release_rate|stats " \
454 "name=value,type=CephString,req=false",
455 asok_hook,
456 "show heap usage info (available only if compiled with tcmalloc)");
457 ceph_assert(r == 0);
458 r = admin_socket->register_command(
459 "cpu_profiler " \
460 "name=arg,type=CephChoices,strings=status|flush",
461 asok_hook,
462 "run cpu profiling on daemon");
463 ceph_assert(r == 0);
464 }
465
466 void MDSDaemon::clean_up_admin_socket()
467 {
468 g_ceph_context->get_admin_socket()->unregister_commands(asok_hook);
469 delete asok_hook;
470 asok_hook = NULL;
471 }
472
/**
 * Bring the daemon up: wire dispatchers, connect and authenticate to the
 * monitors, obtain rotating service keys, subscribe to the mdsmap, set up
 * the admin socket, and start the tick timer.
 *
 * @return 0 on success (including the benign "terminated before startup
 *         completed" cases), negative CEPHFS_* error code on failure.
 */
int MDSDaemon::init()
{
#ifdef _WIN32
  // Some file related flags and types are stubbed on Windows. In order to avoid
  // incorrect behavior, we're going to prevent the MDS from running on Windows
  // until those limitations are addressed. MDS clients, however, are allowed
  // to run on Windows.
  derr << "The Ceph MDS does not support running on Windows at the moment."
       << dendl;
  return -CEPHFS_ENOSYS;
#endif // _WIN32

  dout(10) << "Dumping misc struct sizes:" << dendl;
  dout(10) << sizeof(MDSCacheObject) << "\tMDSCacheObject" << dendl;
  dout(10) << sizeof(CInode) << "\tCInode" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(CInode::mempool_inode) << "\tinode" << dendl;
  dout(10) << sizeof(CInode::mempool_old_inode) << "\told_inode" << dendl;
  dout(10) << sizeof(nest_info_t) << "\tnest_info_t" << dendl;
  dout(10) << sizeof(frag_info_t) << "\tfrag_info_t" << dendl;
  dout(10) << sizeof(SimpleLock) << "\tSimpleLock" << dendl;
  dout(10) << sizeof(ScatterLock) << "\tScatterLock" << dendl;
  dout(10) << sizeof(CDentry) << "\tCDentry" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(SimpleLock) << "\tSimpleLock" << dendl;
  dout(10) << sizeof(CDir) << "\tCDir" << dendl;
  dout(10) << sizeof(elist<void*>::item) << "\telist<>::item" << dendl;
  dout(10) << sizeof(fnode_t) << "\tfnode_t" << dendl;
  dout(10) << sizeof(nest_info_t) << "\tnest_info_t" << dendl;
  dout(10) << sizeof(frag_info_t) << "\tfrag_info_t" << dendl;
  dout(10) << sizeof(Capability) << "\tCapability" << dendl;
  dout(10) << sizeof(xlist<void*>::item) << "\txlist<>::item" << dendl;

  // The beacon gets first look at incoming messages, then the daemon.
  messenger->add_dispatcher_tail(&beacon);
  messenger->add_dispatcher_tail(this);

  // init monc
  monc->set_messenger(messenger);

  monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD |
		      CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_MGR);
  int r = 0;
  r = monc->init();
  if (r < 0) {
    derr << "ERROR: failed to init monc: " << cpp_strerror(-r) << dendl;
    // suicide() requires mds_lock to be held.
    mds_lock.lock();
    suicide();
    mds_lock.unlock();
    return r;
  }

  messenger->set_auth_client(monc);
  messenger->set_auth_server(monc);
  monc->set_handle_authentication_dispatcher(this);

  // tell monc about log_client so it will know about mon session resets
  monc->set_log_client(&log_client);

  r = monc->authenticate();
  if (r < 0) {
    derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
    mds_lock.lock();
    suicide();
    mds_lock.unlock();
    return r;
  }

  // Retry fetching rotating service keys up to the configured attempt
  // count before giving up and terminating.
  int rotating_auth_attempts = 0;
  auto rotating_auth_timeout =
    g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
  while (monc->wait_auth_rotating(rotating_auth_timeout) < 0) {
    if (++rotating_auth_attempts <= g_conf()->max_rotating_auth_attempts) {
      derr << "unable to obtain rotating service keys; retrying" << dendl;
      continue;
    }
    derr << "ERROR: failed to refresh rotating keys, "
	 << "maximum retry time reached." << dendl;
    std::lock_guard locker{mds_lock};
    suicide();
    return -CEPHFS_ETIMEDOUT;
  }

  mds_lock.lock();
  if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
    // Someone (e.g. a signal) already asked us to die while we were
    // authenticating; bail out cleanly.
    dout(4) << __func__ << ": terminated already, dropping out" << dendl;
    mds_lock.unlock();
    return 0;
  }

  monc->sub_want("mdsmap", 0, 0);
  monc->renew_subs();

  mds_lock.unlock();

  // Set up admin socket before taking mds_lock, so that ordering
  // is consistent (later we take mds_lock within asok callbacks)
  set_up_admin_socket();
  std::lock_guard locker{mds_lock};
  if (beacon.get_want_state() == MDSMap::STATE_DNE) {
    suicide(); // we could do something more graceful here
    dout(4) << __func__ << ": terminated already, dropping out" << dendl;
    return 0;
  }

  timer.init();

  beacon.init(*mdsmap);
  // No rank yet; advertise ourselves as an anonymous MDS entity.
  messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));

  // schedule tick
  reset_tick();
  return 0;
}
586
587 void MDSDaemon::reset_tick()
588 {
589 // cancel old
590 if (tick_event) timer.cancel_event(tick_event);
591
592 // schedule
593 tick_event = timer.add_event_after(
594 g_conf()->mds_tick_interval,
595 new LambdaContext([this](int) {
596 ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
597 tick();
598 }));
599 }
600
601 void MDSDaemon::tick()
602 {
603 // reschedule
604 reset_tick();
605
606 // Call through to subsystems' tick functions
607 if (mds_rank) {
608 mds_rank->tick();
609 }
610 }
611
/**
 * Handle an MCommand ("tell") arriving over a client connection.
 *
 * Requires the sender's session to carry the allow-all (`tell`) capability.
 * Well-formed commands are queued to the admin socket, which replies
 * asynchronously; all error paths reply immediately with an MCommandReply.
 */
void MDSDaemon::handle_command(const cref_t<MCommand> &m)
{
  auto priv = m->get_connection()->get_priv();
  auto session = static_cast<Session *>(priv.get());
  ceph_assert(session != NULL);

  int r = 0;
  cmdmap_t cmdmap;
  CachedStackStringStream css;
  auto& ss = *css;
  bufferlist outbl;

  // If someone is using a closed session for sending commands (e.g.
  // the ceph CLI) then we should feel free to clean up this connection
  // as soon as we've sent them a response.
  const bool live_session =
    session->get_state_seq() > 0 &&
    mds_rank &&
    mds_rank->sessionmap.get_session(session->info.inst.name);

  if (!live_session) {
    // This session only existed to issue commands, so terminate it
    // as soon as we can.
    ceph_assert(session->is_closed());
    session->get_connection()->mark_disposable();
  }
  priv.reset();

  if (!session->auth_caps.allow_all()) {
    dout(1) << __func__
	    << ": received command from client without `tell` capability: "
	    << *m->get_connection()->peer_addrs << dendl;

    ss << "permission denied";
    r = -CEPHFS_EACCES;
  } else if (m->cmd.empty()) {
    r = -CEPHFS_EINVAL;
    ss << "no command given";
  } else if (!TOPNSPC::common::cmdmap_from_json(m->cmd, &cmdmap, ss)) {
    r = -CEPHFS_EINVAL;
  } else {
    // Valid command: hand it to the admin-socket machinery, which owns the
    // reply from here on.
    cct->get_admin_socket()->queue_tell_command(m);
    return;
  }

  // Error path: reply immediately with the accumulated status/message.
  auto reply = make_message<MCommandReply>(r, ss.str());
  reply->set_tid(m->get_tid());
  reply->set_data(outbl);
  m->get_connection()->send_message2(reply);
}
662
/**
 * Process an incoming MDSMap update from the monitors.
 *
 * Discards stale epochs, verifies compatibility, then reacts to this
 * daemon's own state transition in the new map: removal triggers respawn,
 * first appearance initializes the MgrClient, gaining a rank creates and
 * initializes the MDSRankDispatcher, and an already-active rank gets the
 * map forwarded to it.  Finally the beacon is told about the new map.
 */
void MDSDaemon::handle_mds_map(const cref_t<MMDSMap> &m)
{
  version_t epoch = m->get_epoch();

  // is it new?
  if (epoch <= mdsmap->get_epoch()) {
    dout(5) << "handle_mds_map old map epoch " << epoch << " <= "
	    << mdsmap->get_epoch() << ", discarding" << dendl;
    return;
  }

  dout(1) << "Updating MDS map to version " << epoch << " from " << m->get_source() << dendl;

  // keep old map, for a moment
  std::unique_ptr<MDSMap> oldmap;
  oldmap.swap(mdsmap);

  // decode and process
  mdsmap.reset(new MDSMap);
  mdsmap->decode(m->get_encoded());

  monc->sub_got("mdsmap", mdsmap->get_epoch());

  // verify compatset
  CompatSet mdsmap_compat(MDSMap::get_compat_set_all());
  dout(10) << " my compat " << mdsmap_compat << dendl;
  dout(10) << " mdsmap compat " << mdsmap->compat << dendl;
  if (!mdsmap_compat.writeable(mdsmap->compat)) {
    dout(0) << "handle_mds_map mdsmap compatset " << mdsmap->compat
	    << " not writeable with daemon features " << mdsmap_compat
	    << ", killing myself" << dendl;
    suicide();
    return;
  }

  // Calculate my effective rank (either my owned rank or the rank I'm following if STATE_STANDBY_REPLAY
  const auto addrs = messenger->get_myaddrs();
  const auto myid = monc->get_global_id();
  const auto mygid = mds_gid_t(myid);
  const auto whoami = mdsmap->get_rank_gid(mygid);
  const auto old_state = oldmap->get_state_gid(mygid);
  const auto new_state = mdsmap->get_state_gid(mygid);
  const auto incarnation = mdsmap->get_inc_gid(mygid);
  dout(10) << "my gid is " << myid << dendl;
  dout(10) << "map says I am mds." << whoami << "." << incarnation
	   << " state " << ceph_mds_state_name(new_state) << dendl;
  dout(10) << "msgr says I am " << addrs << dendl;

  // If we're removed from the MDSMap, stop all processing.
  using DS = MDSMap::DaemonState;
  if (old_state != DS::STATE_NULL && new_state == DS::STATE_NULL) {
    const auto& oldinfo = oldmap->get_info_gid(mygid);
    dout(1) << "Map removed me " << oldinfo
	    << " from cluster; respawning! See cluster/monitor logs for details." << dendl;
    // respawn() does not return.
    respawn();
  }

  if (old_state == DS::STATE_NULL && new_state != DS::STATE_NULL) {
    /* The MDS has been added to the FSMap, now we can init the MgrClient */
    mgrc.init();
    messenger->add_dispatcher_tail(&mgrc);
    monc->sub_want("mgrmap", 0, 0);
    monc->renew_subs(); /* MgrMap receipt drives connection to ceph-mgr */
  }

  // mark down any failed peers
  for (const auto& [gid, info] : oldmap->get_mds_info()) {
    if (mdsmap->get_mds_info().count(gid) == 0) {
      dout(10) << " peer mds gid " << gid << " removed from map" << dendl;
      messenger->mark_down_addrs(info.addrs);
    }
  }

  if (whoami == MDS_RANK_NONE) {
    // We do not hold a rank:
    dout(10) << __func__ << ": handling map in rankless mode" << dendl;

    if (new_state == DS::STATE_STANDBY) {
      /* Note: STATE_BOOT is never an actual state in the FSMap. The Monitors
       * generally mark a new MDS as STANDBY (although it's possible to
       * immediately be assigned a rank).
       */
      if (old_state == DS::STATE_NULL) {
	dout(1) << "Monitors have assigned me to become a standby." << dendl;
	beacon.set_want_state(*mdsmap, new_state);
      } else if (old_state == DS::STATE_STANDBY) {
	dout(5) << "I am still standby" << dendl;
      }
    } else if (new_state == DS::STATE_NULL) {
      /* We are not in the MDSMap yet! Keep waiting: */
      ceph_assert(beacon.get_want_state() == DS::STATE_BOOT);
      dout(10) << "not in map yet" << dendl;
    } else {
      /* We moved to standby somehow from another state */
      ceph_abort("invalid transition to standby");
    }
  } else {
    // Did we already hold a different rank? MDSMonitor shouldn't try
    // to change that out from under me!
    if (mds_rank && whoami != mds_rank->get_nodeid()) {
      derr << "Invalid rank transition " << mds_rank->get_nodeid() << "->"
	   << whoami << dendl;
      respawn();
    }

    // Did I previously not hold a rank? Initialize!
    if (mds_rank == NULL) {
      // The rank gets respawn/suicide callbacks so it can terminate the
      // whole daemon when its own logic requires it.
      mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
	  timer, beacon, mdsmap, messenger, monc, &mgrc,
	  new LambdaContext([this](int r){respawn();}),
	  new LambdaContext([this](int r){suicide();}),
	  ioctx);
      dout(10) << __func__ << ": initializing MDS rank "
	       << mds_rank->get_nodeid() << dendl;
      mds_rank->init();
    }

    // MDSRank is active: let him process the map, we have no say.
    dout(10) << __func__ << ": handling map as rank "
	     << mds_rank->get_nodeid() << dendl;
    mds_rank->handle_mds_map(m, *oldmap);
  }

  beacon.notify_mdsmap(*mdsmap);
}
788
789 void MDSDaemon::handle_signal(int signum)
790 {
791 ceph_assert(signum == SIGINT || signum == SIGTERM);
792 derr << "*** got signal " << sig_str(signum) << " ***" << dendl;
793 {
794 std::lock_guard l(mds_lock);
795 if (stopping) {
796 return;
797 }
798 suicide();
799 }
800 }
801
/**
 * Terminate this daemon cleanly.  Must be called with mds_lock held, and
 * at most once per process lifetime (asserted via `stopping`).
 *
 * Shutdown order matters: stop the tick, drop the admin socket, notify the
 * monitors via a final beacon, then shut down the beacon, MgrClient, and
 * finally either the rank (which owns timer/monc/messenger shutdown) or,
 * if we never held a rank, those services directly.
 */
void MDSDaemon::suicide()
{
  ceph_assert(ceph_mutex_is_locked(mds_lock));

  // make sure we don't suicide twice
  ceph_assert(stopping == false);
  stopping = true;

  dout(1) << "suicide! Wanted state "
	  << ceph_mds_state_name(beacon.get_want_state()) << dendl;

  if (tick_event) {
    timer.cancel_event(tick_event);
    tick_event = 0;
  }

  clean_up_admin_socket();

  // Notify the Monitors (MDSMonitor) that we're dying, so that it doesn't have
  // to wait for us to go laggy. Only do this if we're actually in the MDSMap,
  // because otherwise the MDSMonitor will drop our message.
  beacon.set_want_state(*mdsmap, MDSMap::STATE_DNE);
  if (!mdsmap->is_dne_gid(mds_gid_t(monc->get_global_id()))) {
    beacon.send_and_wait(1);
  }
  beacon.shutdown();

  if (mgrc.is_initialized())
    mgrc.shutdown();

  if (mds_rank) {
    // The rank's shutdown path tears down timer/monc/messenger itself.
    mds_rank->shutdown();
  } else {
    timer.shutdown();

    monc->shutdown();
    messenger->shutdown();
  }
}
841
/**
 * Replace this process with a fresh copy of itself via execv().
 *
 * Prefers /proc/self/exe (so re-exec works even if the binary was
 * unlinked), falling back to argv[0] relative to the CWD.  This function
 * never returns: on execv failure it aborts.
 */
void MDSDaemon::respawn()
{
  // --- WARNING TO FUTURE COPY/PASTERS ---
  // You must also add a call like
  //
  //   ceph_pthread_setname(pthread_self(), "ceph-mds");
  //
  // to main() so that /proc/$pid/stat field 2 contains "(ceph-mds)"
  // instead of "(exe)", so that killall (and log rotation) will work.

  dout(1) << "respawn!" << dendl;

  /* Dump recent in case the MDS was stuck doing something which caused it to
   * be removed from the MDSMap leading to respawn. */
  g_ceph_context->_log->dump_recent();

  /* valgrind can't handle execve; just exit and let QA infra restart */
  if (g_conf().get_val<bool>("mds_valgrind_exit")) {
    _exit(0);
  }

  // Rebuild the original argv (NULL-terminated) for execv.
  char *new_argv[orig_argc+1];
  dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
  for (int i=0; i<orig_argc; i++) {
    new_argv[i] = (char *)orig_argv[i];
    dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
  }
  new_argv[orig_argc] = NULL;

  /* Determine the path to our executable, test if Linux /proc/self/exe exists.
   * This allows us to exec the same executable even if it has since been
   * unlinked.
   */
  char exe_path[PATH_MAX] = "";
#ifdef PROCPREFIX
  if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) != -1) {
    dout(1) << "respawning with exe " << exe_path << dendl;
    // Deliberately exec the /proc path itself (not the readlink target):
    // it stays valid even if the binary was replaced or unlinked.
    strcpy(exe_path, PROCPREFIX "/proc/self/exe");
  } else {
#else
  {
#endif
    /* Print CWD for the user's interest */
    char buf[PATH_MAX];
    char *cwd = getcwd(buf, sizeof(buf));
    ceph_assert(cwd);
    dout(1) << " cwd " << cwd << dendl;

    /* Fall back to a best-effort: just running in our CWD */
    strncpy(exe_path, orig_argv[0], PATH_MAX-1);
  }

  dout(1) << " exe_path " << exe_path << dendl;

  // Restore default signal handling before exec'ing the replacement.
  unblock_all_signals(NULL);
  execv(exe_path, new_argv);

  dout(0) << "respawn execv " << orig_argv[0]
	  << " failed with " << cpp_strerror(errno) << dendl;

  // We have to assert out here, because suicide() returns, and callers
  // to respawn expect it never to return.
  ceph_abort();
}
906
907
908
909 bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
910 {
911 std::lock_guard l(mds_lock);
912 if (stopping) {
913 return false;
914 }
915
916 // Drop out early if shutting down
917 if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
918 dout(10) << " stopping, discarding " << *m << dendl;
919 return true;
920 }
921
922 // First see if it's a daemon message
923 const bool handled_core = handle_core_message(m);
924 if (handled_core) {
925 return true;
926 }
927
928 // Not core, try it as a rank message
929 if (mds_rank) {
930 return mds_rank->ms_dispatch(m);
931 } else {
932 return false;
933 }
934 }
935
/*
 * high priority messages we always process
 */

// Peer-type filter used inside message handlers: if the sending connection's
// peer type is not in `peers`, log and consume the message by expanding to an
// early `return true` from the enclosing handler.  Relies on a local `m`
// (the message) being in scope.
#define ALLOW_MESSAGES_FROM(peers) \
  do { \
    if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
      dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" \
	      << m->get_connection()->get_peer_type() << " allowing=" \
	      << #peers << " message=" << *m << dendl; \
      return true; \
    } \
  } while (0)
949
950 bool MDSDaemon::handle_core_message(const cref_t<Message> &m)
951 {
952 switch (m->get_type()) {
953 case CEPH_MSG_MON_MAP:
954 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
955 break;
956
957 // MDS
958 case CEPH_MSG_MDS_MAP:
959 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
960 handle_mds_map(ref_cast<MMDSMap>(m));
961 break;
962
963 case MSG_REMOVE_SNAPS:
964 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
965 mds_rank->snapserver->handle_remove_snaps(ref_cast<MRemoveSnaps>(m));
966 break;
967
968 // OSD
969 case MSG_COMMAND:
970 handle_command(ref_cast<MCommand>(m));
971 break;
972 case CEPH_MSG_OSD_MAP:
973 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
974
975 if (mds_rank) {
976 mds_rank->handle_osd_map();
977 }
978 break;
979
980 case MSG_MON_COMMAND:
981 ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
982 clog->warn() << "dropping `mds tell` command from legacy monitor";
983 break;
984
985 default:
986 return false;
987 }
988 return true;
989 }
990
// Messenger callback for a newly established connection.  Intentionally a
// no-op: the MDS takes no action on connect.
void MDSDaemon::ms_handle_connect(Connection *con)
{
}
994
995 bool MDSDaemon::ms_handle_reset(Connection *con)
996 {
997 if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
998 return false;
999
1000 std::lock_guard l(mds_lock);
1001 if (stopping) {
1002 return false;
1003 }
1004 dout(5) << "ms_handle_reset on " << con->get_peer_socket_addr() << dendl;
1005 if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
1006 return false;
1007
1008 auto priv = con->get_priv();
1009 if (auto session = static_cast<Session *>(priv.get()); session) {
1010 if (session->is_closed()) {
1011 dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
1012 con->mark_down();
1013 con->set_priv(nullptr);
1014 }
1015 } else {
1016 con->mark_down();
1017 }
1018 return false;
1019 }
1020
1021
1022 void MDSDaemon::ms_handle_remote_reset(Connection *con)
1023 {
1024 if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
1025 return;
1026
1027 std::lock_guard l(mds_lock);
1028 if (stopping) {
1029 return;
1030 }
1031
1032 dout(5) << "ms_handle_remote_reset on " << con->get_peer_socket_addr() << dendl;
1033 if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
1034 return;
1035
1036 auto priv = con->get_priv();
1037 if (auto session = static_cast<Session *>(priv.get()); session) {
1038 if (session->is_closed()) {
1039 dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
1040 con->mark_down();
1041 con->set_priv(nullptr);
1042 }
1043 }
1044 }
1045
// Messenger callback: the peer actively refused our connection attempt.
// Returning false tells the messenger we took no action.
bool MDSDaemon::ms_handle_refused(Connection *con)
{
  // do nothing for now
  return false;
}
1051
1052 bool MDSDaemon::parse_caps(const AuthCapsInfo& info, MDSAuthCaps& caps)
1053 {
1054 caps.clear();
1055 if (info.allow_all) {
1056 caps.set_allow_all();
1057 return true;
1058 } else {
1059 auto it = info.caps.begin();
1060 string auth_cap_str;
1061 try {
1062 decode(auth_cap_str, it);
1063 } catch (const buffer::error& e) {
1064 dout(1) << __func__ << ": cannot decode auth caps buffer of length " << info.caps.length() << dendl;
1065 return false;
1066 }
1067
1068 dout(10) << __func__ << ": parsing auth_cap_str='" << auth_cap_str << "'" << dendl;
1069 CachedStackStringStream cs;
1070 if (caps.parse(g_ceph_context, auth_cap_str, cs.get())) {
1071 return true;
1072 } else {
1073 dout(1) << __func__ << ": auth cap parse error: " << cs->strv() << " parsing '" << auth_cap_str << "'" << dendl;
1074 return false;
1075 }
1076 }
1077 }
1078
1079 int MDSDaemon::ms_handle_authentication(Connection *con)
1080 {
1081 /* N.B. without mds_lock! */
1082 MDSAuthCaps caps;
1083 return parse_caps(con->get_peer_caps_info(), caps) ? 0 : -1;
1084 }
1085
1086 void MDSDaemon::ms_handle_accept(Connection *con)
1087 {
1088 entity_name_t n(con->get_peer_type(), con->get_peer_global_id());
1089 std::lock_guard l(mds_lock);
1090 if (stopping) {
1091 return;
1092 }
1093
1094 // We allow connections and assign Session instances to connections
1095 // even if we have not been assigned a rank, because clients with
1096 // "allow *" are allowed to connect and do 'tell' operations before
1097 // we have a rank.
1098 Session *s = NULL;
1099 if (mds_rank) {
1100 // If we do hold a rank, see if this is an existing client establishing
1101 // a new connection, rather than a new client
1102 s = mds_rank->sessionmap.get_session(n);
1103 }
1104
1105 // Wire up a Session* to this connection
1106 // It doesn't go into a SessionMap instance until it sends an explicit
1107 // request to open a session (initial state of Session is `closed`)
1108 if (!s) {
1109 s = new Session(con);
1110 dout(10) << " new session " << s << " for " << s->info.inst
1111 << " con " << con << dendl;
1112 con->set_priv(RefCountedPtr{s, false});
1113 if (mds_rank) {
1114 mds_rank->kick_waiters_for_any_client_connection();
1115 }
1116 } else {
1117 dout(10) << " existing session " << s << " for " << s->info.inst
1118 << " existing con " << s->get_connection()
1119 << ", new/authorizing con " << con << dendl;
1120 con->set_priv(RefCountedPtr{s});
1121 }
1122
1123 parse_caps(con->get_peer_caps_info(), s->auth_caps);
1124
1125 dout(10) << "ms_handle_accept " << con->get_peer_socket_addr() << " con " << con << " session " << s << dendl;
1126 if (s) {
1127 if (s->get_connection() != con) {
1128 dout(10) << " session connection " << s->get_connection()
1129 << " -> " << con << dendl;
1130 s->set_connection(con);
1131
1132 // send out any queued messages
1133 while (!s->preopen_out_queue.empty()) {
1134 con->send_message2(s->preopen_out_queue.front());
1135 s->preopen_out_queue.pop_front();
1136 }
1137 }
1138 }
1139 }
1140
1141 bool MDSDaemon::is_clean_shutdown()
1142 {
1143 if (mds_rank) {
1144 return mds_rank->is_stopped();
1145 } else {
1146 return true;
1147 }
1148 }