1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include <Python.h>
15
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
23
24 #ifdef WITH_LIBCEPHSQLITE
25 # include <sqlite3.h>
26 # include "include/libcephsqlite.h"
27 #endif
28
29 #include "mgr/MgrContext.h"
30
31 #include "DaemonServer.h"
32 #include "messages/MMgrDigest.h"
33 #include "messages/MCommand.h"
34 #include "messages/MCommandReply.h"
35 #include "messages/MLog.h"
36 #include "messages/MServiceMap.h"
37 #include "messages/MKVData.h"
38 #include "PyModule.h"
39 #include "Mgr.h"
40
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_mgr
43 #undef dout_prefix
44 #define dout_prefix *_dout << "mgr " << __func__ << " "
45
46 using namespace std::literals;
47
48 using std::map;
49 using std::ostringstream;
50 using std::string;
51
52 Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
53 PyModuleRegistry *py_module_registry_,
54 Messenger *clientm_, Objecter *objecter_,
55 Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
56 monc(monc_),
57 objecter(objecter_),
58 client(client_),
59 client_messenger(clientm_),
60 finisher(g_ceph_context, "Mgr", "mgr-fin"),
61 digest_received(false),
62 py_module_registry(py_module_registry_),
63 cluster_state(monc, nullptr, mgrmap),
64 server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
65 clog_, audit_clog_),
66 clog(clog_),
67 audit_clog(audit_clog_),
68 initialized(false),
69 initializing(false)
70 {
71 cluster_state.set_objecter(objecter);
72 }
73
74
75 Mgr::~Mgr()
76 {
77 }
78
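// Completion callback for the async "<type> metadata" mon commands issued
// from handle_osd_map(), handle_mon_map() and handle_fs_map() below: parse
// the JSON reply, apply any defaults registered via set_default(), and fold
// the result into daemon_state.  Illustrative reply shape (the field set
// varies by daemon type and version; only "hostname" plus the "name"/"id"
// key are relied upon here):
//   {"name": "a", "hostname": "host-1", "ceph_version": "...", ...}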
79 void MetadataUpdate::finish(int r)
80 {
81 daemon_state.clear_updating(key);
82 if (r == 0) {
83 if (key.type == "mds" || key.type == "osd" ||
84 key.type == "mgr" || key.type == "mon") {
85 json_spirit::mValue json_result;
86 bool read_ok = json_spirit::read(
87 outbl.to_str(), json_result);
88 if (!read_ok) {
89 dout(1) << "mon returned invalid JSON for " << key << dendl;
90 return;
91 }
92 if (json_result.type() != json_spirit::obj_type) {
93 dout(1) << "mon returned valid JSON " << key
94 << " but not an object: '" << outbl.to_str() << "'" << dendl;
95 return;
96 }
97 dout(4) << "mon returned valid metadata JSON for " << key << dendl;
98
99 json_spirit::mObject daemon_meta = json_result.get_obj();
100
101 // Skip daemons that don't have a hostname yet
102 if (daemon_meta.count("hostname") == 0) {
103 dout(1) << "Skipping incomplete metadata entry for " << key << dendl;
104 return;
105 }
106
107 // Apply any defaults
108 for (const auto &i : defaults) {
109 if (daemon_meta.find(i.first) == daemon_meta.end()) {
110 daemon_meta[i.first] = i.second;
111 }
112 }
113
114 if (daemon_state.exists(key)) {
115 DaemonStatePtr state = daemon_state.get(key);
116 map<string,string> m;
117 {
118 std::lock_guard l(state->lock);
119 state->hostname = daemon_meta.at("hostname").get_str();
120
121 if (key.type == "mds" || key.type == "mgr" || key.type == "mon") {
122 daemon_meta.erase("name");
123 } else if (key.type == "osd") {
124 daemon_meta.erase("id");
125 }
126 daemon_meta.erase("hostname");
127 for (const auto &[key, val] : daemon_meta) {
128 m.emplace(key, val.get_str());
129 }
130 }
131 daemon_state.update_metadata(state, m);
132 } else {
133 auto state = std::make_shared<DaemonState>(daemon_state.types);
134 state->key = key;
135 state->hostname = daemon_meta.at("hostname").get_str();
136
137 if (key.type == "mds" || key.type == "mgr" || key.type == "mon") {
138 daemon_meta.erase("name");
139 } else if (key.type == "osd") {
140 daemon_meta.erase("id");
141 }
142 daemon_meta.erase("hostname");
143
144 map<string,string> m;
145 for (const auto &[key, val] : daemon_meta) {
146 m.emplace(key, val.get_str());
147 }
148 state->set_metadata(m);
149
150 daemon_state.insert(state);
151 }
152 } else {
153 ceph_abort();
154 }
155 } else {
156 dout(1) << "mon failed to return metadata for " << key
157 << ": " << cpp_strerror(r) << dendl;
158 }
159 }
160
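// Kick off initialization asynchronously: start the finisher thread and run
// init() on it, so the caller is not blocked while init() waits for maps,
// the first digest and the daemon server to come up.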
161 void Mgr::background_init(Context *completion)
162 {
163 std::lock_guard l(lock);
164 ceph_assert(!initializing);
165 ceph_assert(!initialized);
166 initializing = true;
167
168 finisher.start();
169
170 finisher.queue(new LambdaContext([this, completion](int r){
171 init();
172 completion->complete(0);
173 }));
174 }
175
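// Synchronously pull the persistent mgr store out of the mons' config-key
// database (only used when the mons are too old to support kv
// subscriptions): list all keys, then fetch those under the module store
// prefix and "device/".  The lock is dropped around each blocking wait, and
// a racing config-key removal is tolerated.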
176 std::map<std::string, std::string> Mgr::load_store()
177 {
178 ceph_assert(ceph_mutex_is_locked_by_me(lock));
179
180 dout(10) << "listing keys" << dendl;
181 JSONCommand cmd;
182 cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
183 lock.unlock();
184 cmd.wait();
185 lock.lock();
186 ceph_assert(cmd.r == 0);
187
188 std::map<std::string, std::string> loaded;
189
190 for (auto &key_str : cmd.json_result.get_array()) {
191 std::string const key = key_str.get_str();
192
193 dout(20) << "saw key '" << key << "'" << dendl;
194
195 const std::string store_prefix = PyModule::mgr_store_prefix;
196 const std::string device_prefix = "device/";
197
198 if (key.substr(0, device_prefix.size()) == device_prefix ||
199 key.substr(0, store_prefix.size()) == store_prefix) {
200 dout(20) << "fetching '" << key << "'" << dendl;
201 Command get_cmd;
202 std::ostringstream cmd_json;
203 cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
204 get_cmd.run(monc, cmd_json.str());
205 lock.unlock();
206 get_cmd.wait();
207 lock.lock();
208 if (get_cmd.r == 0) { // tolerate racing config-key change
209 loaded[key] = get_cmd.outbl.to_str();
210 }
211 }
212 }
213
214 return loaded;
215 }
216
217 void Mgr::handle_signal(int signum)
218 {
219 ceph_assert(signum == SIGINT || signum == SIGTERM);
220 shutdown();
221 }
222
223 static void handle_mgr_signal(int signum)
224 {
225 derr << " *** Got signal " << sig_str(signum) << " ***" << dendl;
226
227 // The python modules don't reliably shut down, so don't even
228 // try. The mon will blocklist us (and all of our rados/cephfs
229 // clients) anyway. Just exit!
230
231 _exit(0); // exit with 0 result code, as if we had done an orderly shutdown
232 }
233
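// Bring the active mgr up.  In order: install signal handlers, decide
// whether the mons are new enough for kv subscriptions, subscribe to the
// maps we need and reopen the mon session so the subscriptions take effect,
// wait for an OSDMap new enough that any prior active mgr is blocklisted,
// start the DaemonServer, preload daemon metadata, wait for the FSMap and
// the first MgrDigest, and finally start the python modules and register
// the "mgr_status" admin socket command.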
234 void Mgr::init()
235 {
236 std::unique_lock l(lock);
237 ceph_assert(initializing);
238 ceph_assert(!initialized);
239
240 // Enable signal handlers
241 register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
242 register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
243
244 // Only pacific+ monitors support subscribe to kv updates
245 bool mon_allows_kv_sub = false;
246 monc->with_monmap(
247 [&](const MonMap &monmap) {
248 if (monmap.get_required_features().contains_all(
249 ceph::features::mon::FEATURE_PACIFIC)) {
250 mon_allows_kv_sub = true;
251 }
252 });
253 if (!mon_allows_kv_sub) {
254 // mons are still pre-pacific. wait long enough to ensure our
255 // next beacon is processed so that our module options are
256 // propagated. See https://tracker.ceph.com/issues/49778
257 lock.unlock();
258 dout(10) << "waiting a bit for the pre-pacific mon to process our beacon" << dendl;
259 sleep(g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count() * 3);
260 lock.lock();
261 }
262
263 // subscribe to all the maps
264 monc->sub_want("log-info", 0, 0);
265 monc->sub_want("mgrdigest", 0, 0);
266 monc->sub_want("fsmap", 0, 0);
267 monc->sub_want("servicemap", 0, 0);
268 if (mon_allows_kv_sub) {
269 monc->sub_want("kv:config/", 0, 0);
270 monc->sub_want("kv:mgr/", 0, 0);
271 monc->sub_want("kv:device/", 0, 0);
272 }
273
274 dout(4) << "waiting for OSDMap..." << dendl;
275 // Subscribe to OSDMap update to pass on to ClusterState
276 objecter->maybe_request_map();
277
278 // reset the mon session. we get these maps through subscriptions which
279 // are stateful with the connection, so even if *we* don't have them a
280 // previous incarnation sharing the same MonClient may have.
281 monc->reopen_session();
282
283 // Start Objecter and wait for OSD map
284 lock.unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
285 epoch_t e;
286 cluster_state.with_mgrmap([&e](const MgrMap& m) {
287 e = m.last_failure_osd_epoch;
288 });
289 /* wait for any blocklists to be applied to previous mgr instance */
290 dout(4) << "Waiting for new OSDMap (e=" << e
291 << ") that may blocklist prior active." << dendl;
292 objecter->wait_for_osd_map(e);
293 lock.lock();
294
295 // Start communicating with daemons to learn statistics etc
296 int r = server.init(monc->get_global_id(), client_messenger->get_myaddrs());
297 if (r < 0) {
298 derr << "Initialize server fail: " << cpp_strerror(r) << dendl;
299 // This is typically due to a bind() failure, so let's let
300 // systemd restart us.
301 exit(1);
302 }
303 dout(4) << "Initialized server at " << server.get_myaddrs() << dendl;
304
305 // Preload all daemon metadata (will subsequently keep this
306 // up to date by watching maps, so do the initial load before
307 // we subscribe to any maps)
308 dout(4) << "Loading daemon metadata..." << dendl;
309 load_all_metadata();
310
311 // Populate PGs in ClusterState
312 cluster_state.with_osdmap_and_pgmap([this](const OSDMap &osd_map,
313 const PGMap& pg_map) {
314 cluster_state.notify_osdmap(osd_map);
315 });
316
317 // Wait for FSMap
318 dout(4) << "waiting for FSMap..." << dendl;
319 fs_map_cond.wait(l, [this] { return cluster_state.have_fsmap();});
320
321 // Wait for MgrDigest...
322 dout(4) << "waiting for MgrDigest..." << dendl;
323 digest_cond.wait(l, [this] { return digest_received; });
324
325 if (!mon_allows_kv_sub) {
326 dout(4) << "loading config-key data from pre-pacific mon cluster..." << dendl;
327 pre_init_store = load_store();
328 }
329
330 dout(4) << "initializing device state..." << dendl;
331 // Note: we only have to do this during startup because once we are
332 // active the only changes to this state will originate from one of our
333 // own modules.
334 for (auto p = pre_init_store.lower_bound("device/");
335 p != pre_init_store.end() && p->first.find("device/") == 0;
336 ++p) {
337 string devid = p->first.substr(7);
338 dout(10) << " updating " << devid << dendl;
339 map<string,string> meta;
340 ostringstream ss;
341 int r = get_json_str_map(p->second, ss, &meta, false);
342 if (r < 0) {
343 derr << __func__ << " failed to parse " << p->second << ": " << ss.str()
344 << dendl;
345 } else {
346 daemon_state.with_device_create(
347 devid, [&meta] (DeviceState& dev) {
348 dev.set_metadata(std::move(meta));
349 });
350 }
351 }
352
353 // assume finisher already initialized in background_init
354 dout(4) << "starting python modules..." << dendl;
355 py_module_registry->active_start(
356 daemon_state, cluster_state,
357 pre_init_store, mon_allows_kv_sub,
358 *monc, clog, audit_clog, *objecter, *client,
359 finisher, server);
360
361 cluster_state.final_init();
362
363 AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
364 r = admin_socket->register_command(
365 "mgr_status", this,
366 "Dump mgr status");
367 ceph_assert(r == 0);
368
369 #ifdef WITH_LIBCEPHSQLITE
370 dout(4) << "Using sqlite3 version: " << sqlite3_libversion() << dendl;
371 /* See libcephsqlite.h for rationale of this code. */
372 sqlite3_auto_extension((void (*)())sqlite3_cephsqlite_init);
373 {
374 sqlite3* db = nullptr;
375 if (int rc = sqlite3_open_v2(":memory:", &db, SQLITE_OPEN_READWRITE, nullptr); rc == SQLITE_OK) {
376 sqlite3_close(db);
377 } else {
378 derr << "could not open sqlite3: " << rc << dendl;
379 ceph_abort();
380 }
381 }
382 {
383 char *ident = nullptr;
384 if (int rc = cephsqlite_setcct(g_ceph_context, &ident); rc < 0) {
385 derr << "could not set libcephsqlite cct: " << rc << dendl;
386 ceph_abort();
387 }
388 entity_addrvec_t addrv;
389 addrv.parse(ident);
390 ident = (char*)realloc(ident, 0);
391 py_module_registry->register_client("libcephsqlite", addrv);
392 }
393 #endif
394
395 dout(4) << "Complete." << dendl;
396 initializing = false;
397 initialized = true;
398 }
399
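// Seed daemon_state with metadata for every mds, osd and mon in one pass by
// issuing the three "<type> metadata" mon commands in parallel and waiting
// for them with the lock dropped.  Entries without a "hostname" field are
// skipped; later refreshes arrive through MetadataUpdate.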
400 void Mgr::load_all_metadata()
401 {
402 ceph_assert(ceph_mutex_is_locked_by_me(lock));
403
404 JSONCommand mds_cmd;
405 mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
406 JSONCommand osd_cmd;
407 osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
408 JSONCommand mon_cmd;
409 mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
410
411 lock.unlock();
412 mds_cmd.wait();
413 osd_cmd.wait();
414 mon_cmd.wait();
415 lock.lock();
416
417 ceph_assert(mds_cmd.r == 0);
418 ceph_assert(mon_cmd.r == 0);
419 ceph_assert(osd_cmd.r == 0);
420
421 for (auto &metadata_val : mds_cmd.json_result.get_array()) {
422 json_spirit::mObject daemon_meta = metadata_val.get_obj();
423 if (daemon_meta.count("hostname") == 0) {
424 dout(1) << "Skipping incomplete metadata entry" << dendl;
425 continue;
426 }
427
428 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
429 dm->key = DaemonKey{"mds",
430 daemon_meta.at("name").get_str()};
431 dm->hostname = daemon_meta.at("hostname").get_str();
432
433 daemon_meta.erase("name");
434 daemon_meta.erase("hostname");
435
436 for (const auto &[key, val] : daemon_meta) {
437 dm->metadata.emplace(key, val.get_str());
438 }
439
440 daemon_state.insert(dm);
441 }
442
443 for (auto &metadata_val : mon_cmd.json_result.get_array()) {
444 json_spirit::mObject daemon_meta = metadata_val.get_obj();
445 if (daemon_meta.count("hostname") == 0) {
446 dout(1) << "Skipping incomplete metadata entry" << dendl;
447 continue;
448 }
449
450 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
451 dm->key = DaemonKey{"mon",
452 daemon_meta.at("name").get_str()};
453 dm->hostname = daemon_meta.at("hostname").get_str();
454
455 daemon_meta.erase("name");
456 daemon_meta.erase("hostname");
457
458 map<string,string> m;
459 for (const auto &[key, val] : daemon_meta) {
460 m.emplace(key, val.get_str());
461 }
462 dm->set_metadata(m);
463
464 daemon_state.insert(dm);
465 }
466
467 for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
468 json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
469 if (osd_metadata.count("hostname") == 0) {
470 dout(1) << "Skipping incomplete metadata entry" << dendl;
471 continue;
472 }
473 dout(4) << osd_metadata.at("hostname").get_str() << dendl;
474
475 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
476 dm->key = DaemonKey{"osd",
477 stringify(osd_metadata.at("id").get_int())};
478 dm->hostname = osd_metadata.at("hostname").get_str();
479
480 osd_metadata.erase("id");
481 osd_metadata.erase("hostname");
482
483 map<string,string> m;
484 for (const auto &i : osd_metadata) {
485 m[i.first] = i.second.get_str();
486 }
487 dm->set_metadata(m);
488
489 daemon_state.insert(dm);
490 }
491 }
492
493
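// Tear down in dependency order: stop the DaemonServer (no more incoming
// requests), ask the python modules to shut down, then drain and stop the
// finisher so no queued context touches state we are about to destroy.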
494 void Mgr::shutdown()
495 {
496 dout(10) << "mgr shutdown init" << dendl;
497 finisher.queue(new LambdaContext([&](int) {
498 {
499 std::lock_guard l(lock);
500 // First stop the server so that we're not taking any more incoming
501 // requests
502 server.shutdown();
503 }
504 // after the messenger is stopped, signal modules to shut down via the finisher
505 py_module_registry->active_shutdown();
506 }));
507
508 // Then stop the finisher to ensure its enqueued contexts aren't going
509 // to touch references to the things we're about to tear down
510 finisher.wait_for_empty();
511 finisher.stop();
512 }
513
514 void Mgr::handle_osd_map()
515 {
516 ceph_assert(ceph_mutex_is_locked_by_me(lock));
517
518 std::set<std::string> names_exist;
519
520 /**
521 * When we see a new OSD map, refresh the metadata of any OSD that is
522 * new to us or that appears to have just (re)joined the cluster
523 * (its up_from equals the current epoch), and cull OSDs that no longer exist.
524 */
525 cluster_state.with_osdmap_and_pgmap([this, &names_exist](const OSDMap &osd_map,
526 const PGMap &pg_map) {
527 for (int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
528 if (!osd_map.exists(osd_id)) {
529 continue;
530 }
531
532 // Remember which OSDs exist so that we can cull any that don't
533 names_exist.insert(stringify(osd_id));
534
535 // Consider whether to update the daemon metadata (new/restarted daemon)
536 const auto k = DaemonKey{"osd", std::to_string(osd_id)};
537 if (daemon_state.is_updating(k)) {
538 continue;
539 }
540
541 bool update_meta = false;
542 if (daemon_state.exists(k)) {
543 if (osd_map.get_up_from(osd_id) == osd_map.get_epoch()) {
544 dout(4) << "Mgr::handle_osd_map: osd." << osd_id
545 << " joined cluster at " << "e" << osd_map.get_epoch()
546 << dendl;
547 update_meta = true;
548 }
549 } else {
550 update_meta = true;
551 }
552 if (update_meta) {
553 auto c = new MetadataUpdate(daemon_state, k);
554 std::ostringstream cmd;
555 cmd << "{\"prefix\": \"osd metadata\", \"id\": "
556 << osd_id << "}";
557 monc->start_mon_command(
558 {cmd.str()},
559 {}, &c->outbl, &c->outs, c);
560 }
561 }
562
563 cluster_state.notify_osdmap(osd_map);
564 });
565
566 // TODO: same culling for MonMap
567 daemon_state.cull("osd", names_exist);
568 }
569
570 void Mgr::handle_log(ref_t<MLog> m)
571 {
572 for (const auto &e : m->entries) {
573 py_module_registry->notify_all(e);
574 }
575 }
576
577 void Mgr::handle_service_map(ref_t<MServiceMap> m)
578 {
579 dout(10) << "e" << m->service_map.epoch << dendl;
580 monc->sub_got("servicemap", m->service_map.epoch);
581 cluster_state.set_service_map(m->service_map);
582 server.got_service_map();
583 }
584
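// On a new MonMap, request fresh metadata for every named mon (unless an
// update for it is already in flight) and cull entries for mons that have
// been removed from the map.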
585 void Mgr::handle_mon_map()
586 {
587 dout(20) << __func__ << dendl;
588 assert(ceph_mutex_is_locked_by_me(lock));
589 std::set<std::string> names_exist;
590 cluster_state.with_monmap([&] (auto &monmap) {
591 for (unsigned int i = 0; i < monmap.size(); i++) {
592 names_exist.insert(monmap.get_name(i));
593 }
594 });
595 for (const auto& name : names_exist) {
596 const auto k = DaemonKey{"mon", name};
597 if (daemon_state.is_updating(k)) {
598 continue;
599 }
600 auto c = new MetadataUpdate(daemon_state, k);
601 const char* cmd = R"({{"prefix": "mon metadata", "id": "{}"}})";
602 monc->start_mon_command({fmt::format(cmd, name)}, {},
603 &c->outbl, &c->outs, c);
604 }
605 daemon_state.cull("mon", names_exist);
606 }
607
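// Main message dispatch for the active mgr.  Digest, mon/fs/osd/service
// maps, log entries and kv updates are handled here under the mgr lock;
// FSMap messages are deliberately not consumed so the Client can also see
// them (hence the "return false" below).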
608 bool Mgr::ms_dispatch2(const ref_t<Message>& m)
609 {
610 dout(10) << *m << dendl;
611 std::lock_guard l(lock);
612
613 switch (m->get_type()) {
614 case MSG_MGR_DIGEST:
615 handle_mgr_digest(ref_cast<MMgrDigest>(m));
616 break;
617 case CEPH_MSG_MON_MAP:
618 py_module_registry->notify_all("mon_map", "");
619 handle_mon_map();
620 break;
621 case CEPH_MSG_FS_MAP:
622 py_module_registry->notify_all("fs_map", "");
623 handle_fs_map(ref_cast<MFSMap>(m));
624 return false; // I shall let this pass through for Client
625 case CEPH_MSG_OSD_MAP:
626 handle_osd_map();
627
628 py_module_registry->notify_all("osd_map", "");
629
630 // Continuous subscribe, so that we can generate notifications
631 // for our MgrPyModules
632 objecter->maybe_request_map();
633 break;
634 case MSG_SERVICE_MAP:
635 handle_service_map(ref_cast<MServiceMap>(m));
636 //no users: py_module_registry->notify_all("service_map", "");
637 break;
638 case MSG_LOG:
639 handle_log(ref_cast<MLog>(m));
640 break;
641 case MSG_KV_DATA:
642 {
643 auto msg = ref_cast<MKVData>(m);
644 monc->sub_got("kv:"s + msg->prefix, msg->version);
645 if (!msg->data.empty()) {
646 if (initialized) {
647 py_module_registry->update_kv_data(
648 msg->prefix,
649 msg->incremental,
650 msg->data
651 );
652 } else {
653 // before we have created the ActivePyModules, we need to
654 // track the store regions we're monitoring
655 if (!msg->incremental) {
656 dout(10) << "full update on " << msg->prefix << dendl;
657 auto p = pre_init_store.lower_bound(msg->prefix);
658 while (p != pre_init_store.end() && p->first.find(msg->prefix) == 0) {
659 dout(20) << " rm prior " << p->first << dendl;
660 p = pre_init_store.erase(p);
661 }
662 } else {
663 dout(10) << "incremental update on " << msg->prefix << dendl;
664 }
665 for (auto& i : msg->data) {
666 if (i.second) {
667 dout(20) << " set " << i.first << " = " << i.second->to_str() << dendl;
668 pre_init_store[i.first] = i.second->to_str();
669 } else {
670 dout(20) << " rm " << i.first << dendl;
671 pre_init_store.erase(i.first);
672 }
673 }
674 }
675 }
676 }
677 break;
678
679 default:
680 return false;
681 }
682 return true;
683 }
684
685
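// On a new FSMap, refresh the metadata of any MDS whose address in the map
// no longer matches what we have cached (it probably restarted), or for
// which we have no metadata at all, and cull entries for MDS names that
// have disappeared from the map.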
686 void Mgr::handle_fs_map(ref_t<MFSMap> m)
687 {
688 ceph_assert(ceph_mutex_is_locked_by_me(lock));
689
690 std::set<std::string> names_exist;
691 const FSMap &new_fsmap = m->get_fsmap();
692
693 monc->sub_got("fsmap", m->epoch);
694
695 fs_map_cond.notify_all();
696
697 // TODO: callers (e.g. from python land) are potentially going to see
698 // the new fsmap before we've bothered populating all the resulting
699 // daemon_state. Maybe we should block python land while we're making
700 // this kind of update?
701
702 cluster_state.set_fsmap(new_fsmap);
703
704 auto mds_info = new_fsmap.get_mds_info();
705 for (const auto &i : mds_info) {
706 const auto &info = i.second;
707
708 if (!new_fsmap.gid_exists(i.first)){
709 continue;
710 }
711
712 // Remember which MDS daemons exist so that we can cull any that don't
713 names_exist.insert(info.name);
714
715 const auto k = DaemonKey{"mds", info.name};
716 if (daemon_state.is_updating(k)) {
717 continue;
718 }
719
720 bool update = false;
721 if (daemon_state.exists(k)) {
722 auto metadata = daemon_state.get(k);
723 std::lock_guard l(metadata->lock);
724 if (metadata->metadata.empty() ||
725 metadata->metadata.count("addr") == 0) {
726 update = true;
727 } else {
728 auto metadata_addrs = metadata->metadata.at("addr");
729 const auto map_addrs = info.addrs;
730 update = metadata_addrs != stringify(map_addrs);
731 if (update) {
732 dout(4) << "MDS[" << info.name << "] addr change " << metadata_addrs
733 << " != " << stringify(map_addrs) << dendl;
734 }
735 }
736 } else {
737 update = true;
738 }
739
740 if (update) {
741 auto c = new MetadataUpdate(daemon_state, k);
742
743 // Older MDS daemons don't have addr in the metadata, so
744 // fake it if the returned metadata doesn't have the field.
745 c->set_default("addr", stringify(info.addrs));
746
747 std::ostringstream cmd;
748 cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
749 << info.name << "\"}";
750 monc->start_mon_command(
751 {cmd.str()},
752 {}, &c->outbl, &c->outs, c);
753 }
754 }
755 daemon_state.cull("mds", names_exist);
756 }
757
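// Apply a new MgrMap.  Returns true when the set of enabled modules has
// changed, signalling (per the derr message below) that a respawn is
// needed; otherwise the new map is handed to ClusterState and the server.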
758 bool Mgr::got_mgr_map(const MgrMap& m)
759 {
760 std::lock_guard l(lock);
761 dout(10) << m << dendl;
762
763 set<string> old_modules;
764 cluster_state.with_mgrmap([&](const MgrMap& m) {
765 old_modules = m.modules;
766 });
767 if (m.modules != old_modules) {
768 derr << "mgrmap module list changed to (" << m.modules << "), respawn"
769 << dendl;
770 return true;
771 }
772
773 cluster_state.set_mgr_map(m);
774 server.got_mgr_map();
775
776 return false;
777 }
778
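// A MgrDigest carries the latest health and mon status JSON.  Feed it to
// ClusterState, poke the python modules, and on the first digest wake up
// init(), which blocks on digest_cond until digest_received is set.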
779 void Mgr::handle_mgr_digest(ref_t<MMgrDigest> m)
780 {
781 dout(10) << m->mon_status_json.length() << dendl;
782 dout(10) << m->health_json.length() << dendl;
783 cluster_state.load_digest(m.get());
784 //no users: py_module_registry->notify_all("mon_status", "");
785 py_module_registry->notify_all("health", "");
786
787 // Hack: use this as a tick/opportunity to prompt python-land that
788 // the pgmap might have changed since last time we were here.
789 py_module_registry->notify_all("pg_summary", "");
790 dout(10) << "done." << dendl;
791 m.reset();
792
793 if (!digest_received) {
794 digest_received = true;
795 digest_cond.notify_all();
796 }
797 }
798
799 std::map<std::string, std::string> Mgr::get_services() const
800 {
801 std::lock_guard l(lock);
802
803 return py_module_registry->get_services();
804 }
805
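// Admin socket handler for the "mgr_status" command registered in init():
// dumps the current mgrmap epoch and whether initialization has completed.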
806 int Mgr::call(
807 std::string_view admin_command,
808 const cmdmap_t& cmdmap,
809 Formatter *f,
810 std::ostream& errss,
811 bufferlist& out)
812 {
813 try {
814 if (admin_command == "mgr_status") {
815 f->open_object_section("mgr_status");
816 cluster_state.with_mgrmap(
817 [f](const MgrMap& mm) {
818 f->dump_unsigned("mgrmap_epoch", mm.get_epoch());
819 });
820 f->dump_bool("initialized", initialized);
821 f->close_section();
822 return 0;
823 } else {
824 return -ENOSYS;
825 }
826 } catch (const TOPNSPC::common::bad_cmd_get& e) {
827 errss << e.what();
828 return -EINVAL;
829 }
830 return 0;
831 }