1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
26 #include "DaemonServer.h"
27 #include "messages/MMgrDigest.h"
28 #include "messages/MCommand.h"
29 #include "messages/MCommandReply.h"
30 #include "messages/MLog.h"
31 #include "messages/MServiceMap.h"
// Logging configuration macros for this file: context and subsystem.
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mgr
// Every log line is prefixed with the literal text mgr, the name of the
// emitting function, and a space.
38 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Constructor. Stores the supplied collaborators and uses them to build the
// member subsystems visible here: the finisher, cluster_state (from monc and
// the initial mgrmap) and the daemon server. The body then passes the
// objecter to cluster_state.
// NOTE(review): several member-initializer lines are not visible in this
// excerpt; confirm the full initializer list against the complete file.
41 Mgr::Mgr(MonClient
*monc_
, const MgrMap
& mgrmap
,
42 PyModuleRegistry
*py_module_registry_
,
43 Messenger
*clientm_
, Objecter
*objecter_
,
44 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
48 client_messenger(clientm_
),
50 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
// No digest has been seen yet; handle_mgr_digest flips this flag.
51 digest_received(false),
52 py_module_registry(py_module_registry_
),
53 cluster_state(monc
, nullptr, mgrmap
),
54 server(monc
, finisher
, daemon_state
, cluster_state
, *py_module_registry
,
57 audit_clog(audit_clog_
),
// cluster_state was constructed with a null second argument above; the
// objecter is supplied to it here instead.
61 cluster_state
.set_objecter(objecter
);
// Completion handler for a metadata request about the daemon identified by
// `key`. It clears the updating marker for `key`, then, for mds/osd/mgr/mon
// keys, parses `outbl` as JSON and either refreshes the metadata of an
// already-known daemon or creates and inserts a new DaemonState.
//
// r: result code; the final log statement reports it through cpp_strerror.
//
// NOTE(review): the conditions guarding the logging branches and the early
// exits are not visible in this excerpt.
69 void MetadataUpdate::finish(int r
)
71 daemon_state
.clear_updating(key
);
// Only these four daemon types are handled below.
73 if (key
.first
== "mds" || key
.first
== "osd" ||
74 key
.first
== "mgr" || key
.first
== "mon") {
75 json_spirit::mValue json_result
;
76 bool read_ok
= json_spirit::read(
77 outbl
.to_str(), json_result
);
79 dout(1) << "mon returned invalid JSON for "
80 << key
.first
<< "." << key
.second
<< dendl
;
// The reply must be a JSON object, not merely valid JSON.
83 if (json_result
.type() != json_spirit::obj_type
) {
84 dout(1) << "mon returned valid JSON "
85 << key
.first
<< "." << key
.second
86 << " but not an object: '" << outbl
.to_str() << "'" << dendl
;
89 dout(4) << "mon returned valid metadata JSON for "
90 << key
.first
<< "." << key
.second
<< dendl
;
92 json_spirit::mObject daemon_meta
= json_result
.get_obj();
94 // Skip daemon who doesn't have hostname yet
95 if (daemon_meta
.count("hostname") == 0) {
96 dout(1) << "Skipping incomplete metadata entry for "
97 << key
.first
<< "." << key
.second
<< dendl
;
101 // Apply any defaults
// A default is applied only when the reply lacks that field.
102 for (const auto &i
: defaults
) {
103 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
104 daemon_meta
[i
.first
] = i
.second
;
108 DaemonStatePtr state
;
// Known daemon: strip the identity fields and update its metadata in place.
109 if (daemon_state
.exists(key
)) {
110 state
= daemon_state
.get(key
);
111 if (key
.first
== "mds" || key
.first
== "mgr" || key
.first
== "mon") {
112 daemon_meta
.erase("name");
113 } else if (key
.first
== "osd") {
114 daemon_meta
.erase("id");
116 daemon_meta
.erase("hostname");
117 map
<string
,string
> m
;
// NOTE(review): get_str() is applied to every remaining value, so this
// presumably assumes all remaining metadata values are JSON strings.
118 for (const auto &i
: daemon_meta
) {
119 m
[i
.first
] = i
.second
.get_str();
122 daemon_state
.update_metadata(state
, m
);
// Unknown daemon: build a fresh DaemonState. The hostname is read before the
// identity fields are erased from daemon_meta.
124 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
126 state
->hostname
= daemon_meta
.at("hostname").get_str();
128 if (key
.first
== "mds" || key
.first
== "mgr" || key
.first
== "mon") {
129 daemon_meta
.erase("name");
130 } else if (key
.first
== "osd") {
131 daemon_meta
.erase("id");
133 daemon_meta
.erase("hostname");
135 map
<string
,string
> m
;
136 for (const auto &i
: daemon_meta
) {
137 m
[i
.first
] = i
.second
.get_str();
139 state
->set_metadata(m
);
141 daemon_state
.insert(state
);
// Failure report: includes the textual form of r.
147 dout(1) << "mon failed to return metadata for "
148 << key
.first
<< "." << key
.second
<< ": "
149 << cpp_strerror(r
) << dendl
;
// Start initialization in the background. Takes `lock`, asserts that
// initialization has neither started nor finished, and queues a context on
// the finisher which completes `completion` with 0.
//
// completion: context completed (with result 0) from the queued callback.
//
// NOTE(review): the statements between the asserts and the queue call, and
// the rest of the callback body, are not visible in this excerpt.
153 void Mgr::background_init(Context
*completion
)
155 std::lock_guard
l(lock
);
156 ceph_assert(!initializing
);
157 ceph_assert(!initialized
);
162 finisher
.queue(new FunctionContext([this, completion
](int r
){
164 completion
->complete(0);
// Load key/value pairs from the mon config-key store. Lists all keys, then
// fetches each key that starts with PyModule::config_prefix or with the
// device/ prefix. Device entries are parsed as a JSON string map and applied
// to the matching device record; the loop also stores fetched values in
// `loaded`.
//
// The caller must hold `lock` (asserted). The listing command must succeed
// (asserted); an individual get is allowed to fail.
//
// NOTE(review): the declarations of cmd, get_cmd and ss, the waits on the
// commands, and the return statement are not visible in this excerpt.
168 std::map
<std::string
, std::string
> Mgr::load_store()
170 ceph_assert(lock
.is_locked_by_me());
172 dout(10) << "listing keys" << dendl
;
174 cmd
.run(monc
, "{\"prefix\": \"config-key ls\"}");
178 ceph_assert(cmd
.r
== 0);
180 std::map
<std::string
, std::string
> loaded
;
182 for (auto &key_str
: cmd
.json_result
.get_array()) {
183 std::string
const key
= key_str
.get_str();
185 dout(20) << "saw key '" << key
<< "'" << dendl
;
187 const std::string config_prefix
= PyModule::config_prefix
;
188 const std::string device_prefix
= "device/";
// Prefix test: compare the leading characters of key with each prefix.
190 if (key
.substr(0, config_prefix
.size()) == config_prefix
||
191 key
.substr(0, device_prefix
.size()) == device_prefix
) {
192 dout(20) << "fetching '" << key
<< "'" << dendl
;
// NOTE(review): key is spliced into the JSON command text without any
// escaping; confirm keys can never contain quote or backslash characters.
194 std::ostringstream cmd_json
;
195 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
196 get_cmd
.run(monc
, cmd_json
.str());
200 if (get_cmd
.r
== 0) { // tolerate racing config-key change
201 if (key
.substr(0, device_prefix
.size()) == device_prefix
) {
// The device id is the part of the key after the device/ prefix.
203 string devid
= key
.substr(device_prefix
.size());
204 map
<string
,string
> meta
;
206 string val
= get_cmd
.outbl
.to_str();
207 int r
= get_json_str_map(val
, ss
, &meta
, false);
209 derr
<< __func__
<< " failed to parse " << val
<< ": " << ss
.str()
// meta is moved into the device record; it must not be read afterwards.
212 daemon_state
.with_device_create(
213 devid
, [&meta
] (DeviceState
& dev
) {
214 dev
.set_metadata(std::move(meta
));
219 loaded
[key
] = get_cmd
.outbl
.to_str();
// Body of the main initialization routine.
// NOTE(review): the function header is not visible in this excerpt;
// presumably this is Mgr::init() - confirm against the full file.
//
// Visible sequence: take `lock` and assert that initialization is in
// progress; start the daemon server; subscribe to mon maps; request the OSD
// map and reopen the mon session; wait for the OSD map with `lock` dropped;
// seed cluster_state with the OSD map; wait for the FSMap and the first
// MgrDigest; load the KV store; let the module registry upgrade config and
// start the python modules; finally clear `initializing`.
230 std::lock_guard
l(lock
);
231 ceph_assert(initializing
);
232 ceph_assert(!initialized
);
234 // Start communicating with daemons to learn statistics etc
235 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddrs());
237 derr
<< "Initialize server fail: " << cpp_strerror(r
) << dendl
;
238 // This is typically due to a bind() failure, so let's let
239 // systemd restart us.
242 dout(4) << "Initialized server at " << server
.get_myaddrs() << dendl
;
244 // Preload all daemon metadata (will subsequently keep this
245 // up to date by watching maps, so do the initial load before
246 // we subscribe to any maps)
247 dout(4) << "Loading daemon metadata..." << dendl
;
250 // subscribe to all the maps
251 monc
->sub_want("log-info", 0, 0);
252 monc
->sub_want("mgrdigest", 0, 0);
253 monc
->sub_want("fsmap", 0, 0);
254 monc
->sub_want("servicemap", 0, 0);
256 dout(4) << "waiting for OSDMap..." << dendl
;
257 // Subscribe to OSDMap update to pass on to ClusterState
258 objecter
->maybe_request_map();
260 // reset the mon session. we get these maps through subscriptions which
261 // are stateful with the connection, so even if *we* don't have them a
262 // previous incarnation sharing the same MonClient may have.
263 monc
->reopen_session();
265 // Start Objecter and wait for OSD map
// NOTE(review): `lock` is held through a std::lock_guard above but released
// manually here; the matching re-lock is not visible in this excerpt.
266 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
267 objecter
->wait_for_osd_map();
270 // Populate PGs in ClusterState
271 cluster_state
.with_osdmap_and_pgmap([this](const OSDMap
&osd_map
,
272 const PGMap
& pg_map
) {
273 cluster_state
.notify_osdmap(osd_map
);
// Block until an FSMap has arrived; handle_fs_map signals fs_map_cond.
277 dout(4) << "waiting for FSMap..." << dendl
;
278 while (!cluster_state
.have_fsmap()) {
279 fs_map_cond
.Wait(lock
);
282 dout(4) << "waiting for config-keys..." << dendl
;
284 // Wait for MgrDigest...
// handle_mgr_digest sets digest_received and signals digest_cond.
285 dout(4) << "waiting for MgrDigest..." << dendl
;
286 while (!digest_received
) {
287 digest_cond
.Wait(lock
);
290 // Load module KV store
291 auto kv_store
= load_store();
293 // Migrate config from KV store on luminous->mimic
294 // drop lock because we do blocking config sets to mon
296 py_module_registry
->upgrade_config(monc
, kv_store
);
299 // assume finisher already initialized in background_init
300 dout(4) << "starting python modules..." << dendl
;
301 py_module_registry
->active_start(daemon_state
, cluster_state
,
302 kv_store
, *monc
, clog
, audit_clog
, *objecter
, *client
,
305 dout(4) << "Complete." << dendl
;
306 initializing
= false;
// Preload metadata for all MDS, mon and OSD daemons. Runs the three
// metadata commands against the mon, asserts each returned 0, then creates
// one DaemonState per returned entry that has a hostname field and inserts
// it into daemon_state. Entries without a hostname are logged as incomplete.
//
// The caller must hold `lock` (asserted).
//
// NOTE(review): the command object declarations, the waits on them, and the
// statements that consume the local map `m` in the mon and OSD loops are not
// visible in this excerpt.
310 void Mgr::load_all_metadata()
312 ceph_assert(lock
.is_locked_by_me());
315 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
317 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
319 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
327 ceph_assert(mds_cmd
.r
== 0);
328 ceph_assert(mon_cmd
.r
== 0);
329 ceph_assert(osd_cmd
.r
== 0);
// MDS entries: keyed by the name field.
331 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
332 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
333 if (daemon_meta
.count("hostname") == 0) {
334 dout(1) << "Skipping incomplete metadata entry" << dendl
;
338 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
339 dm
->key
= DaemonKey("mds",
340 daemon_meta
.at("name").get_str());
341 dm
->hostname
= daemon_meta
.at("hostname").get_str();
// Identity fields are removed so only the remaining attributes are copied.
343 daemon_meta
.erase("name");
344 daemon_meta
.erase("hostname");
// NOTE(review): this loop writes dm->metadata directly, while the mon and
// OSD loops below build a local map first.
346 for (const auto &i
: daemon_meta
) {
347 dm
->metadata
[i
.first
] = i
.second
.get_str();
350 daemon_state
.insert(dm
);
// Mon entries: keyed by the name field.
353 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
354 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
355 if (daemon_meta
.count("hostname") == 0) {
356 dout(1) << "Skipping incomplete metadata entry" << dendl
;
360 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
361 dm
->key
= DaemonKey("mon",
362 daemon_meta
.at("name").get_str());
363 dm
->hostname
= daemon_meta
.at("hostname").get_str();
365 daemon_meta
.erase("name");
366 daemon_meta
.erase("hostname");
368 map
<string
,string
> m
;
369 for (const auto &i
: daemon_meta
) {
370 m
[i
.first
] = i
.second
.get_str();
374 daemon_state
.insert(dm
);
// OSD entries: keyed by the integer id field, converted to a string.
377 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
378 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
379 if (osd_metadata
.count("hostname") == 0) {
380 dout(1) << "Skipping incomplete metadata entry" << dendl
;
383 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
385 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
386 dm
->key
= DaemonKey("osd",
387 stringify(osd_metadata
.at("id").get_int()));
388 dm
->hostname
= osd_metadata
.at("hostname").get_str();
390 osd_metadata
.erase("id");
391 osd_metadata
.erase("hostname");
393 map
<string
,string
> m
;
394 for (const auto &i
: osd_metadata
) {
395 m
[i
.first
] = i
.second
.get_str();
399 daemon_state
.insert(dm
);
// Body of the shutdown routine.
// NOTE(review): the function header is not visible in this excerpt;
// presumably this is Mgr::shutdown() - confirm against the full file.
//
// Queues a context on the finisher that, under `lock`, drops the log-info,
// mgrdigest and fsmap subscriptions, and then asks the module registry to
// shut down. Afterwards it waits for the finisher queue to drain.
// NOTE(review): the servicemap subscription is not dropped in the lines
// visible here; confirm whether that is intentional.
406 finisher
.queue(new FunctionContext([&](int) {
408 std::lock_guard
l(lock
);
409 monc
->sub_unwant("log-info");
410 monc
->sub_unwant("mgrdigest");
411 monc
->sub_unwant("fsmap");
412 // First stop the server so that we're not taking any more incoming
416 // after the messenger is stopped, signal modules to shutdown via finisher
417 py_module_registry
->active_shutdown();
420 // Then stop the finisher to ensure its enqueued contexts aren't going
421 // to touch references to the things we're about to tear down
422 finisher
.wait_for_empty();
// React to a new OSD map. For every OSD id that exists in the map, record it
// in names_exist and compare the front_addr stored in its metadata with the
// addresses in the map. A metadata request with a MetadataUpdate completion
// is issued to the mon. cluster_state is then notified of the map, and osd
// entries not in names_exist are culled from daemon_state.
//
// The caller must hold `lock` (asserted).
//
// NOTE(review): the lines that set update_meta and the condition guarding
// the metadata request are not visible in this excerpt.
426 void Mgr::handle_osd_map()
428 ceph_assert(lock
.is_locked_by_me());
430 std::set
<std::string
> names_exist
;
433 * When we see a new OSD map, inspect the entity addrs to
434 * see if they have changed (service restart), and if so
435 * reload the metadata.
437 cluster_state
.with_osdmap_and_pgmap([this, &names_exist
](const OSDMap
&osd_map
,
438 const PGMap
&pg_map
) {
// Walk every possible id up to the map maximum; ids need not be contiguous.
439 for (int osd_id
= 0; osd_id
< osd_map
.get_max_osd(); ++osd_id
) {
440 if (!osd_map
.exists(osd_id
)) {
444 // Remember which OSDs exist so that we can cull any that don't
445 names_exist
.insert(stringify(osd_id
));
447 // Consider whether to update the daemon metadata (new/restarted daemon)
448 bool update_meta
= false;
449 const auto k
= DaemonKey("osd", stringify(osd_id
));
450 if (daemon_state
.is_updating(k
)) {
454 if (daemon_state
.exists(k
)) {
455 auto metadata
= daemon_state
.get(k
);
// The per-daemon lock is held while its metadata map is read.
456 std::lock_guard
l(metadata
->lock
);
457 auto addr_iter
= metadata
->metadata
.find("front_addr");
458 if (addr_iter
!= metadata
->metadata
.end()) {
459 const std::string
&metadata_addr
= addr_iter
->second
;
460 const auto &map_addrs
= osd_map
.get_addrs(osd_id
);
// Addresses are compared in their stringified form.
462 if (metadata_addr
!= stringify(map_addrs
)) {
463 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
464 << " != " << stringify(map_addrs
) << dendl
;
467 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
468 << metadata_addr
<< dendl
;
471 // Awkward case where daemon went into DaemonState because it
472 // sent us a report but its metadata didn't get loaded yet
// The MetadataUpdate object is passed to start_mon_command as the completion
// and also supplies the output buffers.
480 auto c
= new MetadataUpdate(daemon_state
, k
);
481 std::ostringstream cmd
;
482 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
484 monc
->start_mon_command(
486 {}, &c
->outbl
, &c
->outs
, c
);
490 cluster_state
.notify_osdmap(osd_map
);
493 // TODO: same culling for MonMap
494 daemon_state
.cull("osd", names_exist
);
// Forward every entry of a received log message to the python modules via
// the module registry.
//
// m: the log message whose entries are forwarded.
//
// NOTE(review): the lines after the loop are not visible in this excerpt;
// confirm against the full file whether the message reference is released.
497 void Mgr::handle_log(MLog
*m
)
499 for (const auto &e
: m
->entries
) {
500 py_module_registry
->notify_all(e
);
506 void Mgr::handle_service_map(MServiceMap
*m
)
508 dout(10) << "e" << m
->service_map
.epoch
<< dendl
;
509 cluster_state
.set_service_map(m
->service_map
);
510 server
.got_service_map();
513 void Mgr::handle_mon_map()
515 dout(20) << __func__
<< dendl
;
516 assert(lock
.is_locked_by_me());
517 std::set
<std::string
> names_exist
;
518 cluster_state
.with_monmap([&] (auto &monmap
) {
519 for (unsigned int i
= 0; i
< monmap
.size(); i
++) {
520 names_exist
.insert(monmap
.get_name(i
));
523 daemon_state
.cull("mon", names_exist
);
// Messenger dispatch entry point. Logs the message, takes `lock`, and
// switches on the message type to call the matching handler and notify the
// python modules of the corresponding event.
//
// m: the incoming message.
// Returns false for FS map messages so that they pass through to the Client
// (see the inline comment below).
//
// NOTE(review): several case labels and the break/return statements are not
// visible in this excerpt, so the other return paths cannot be documented
// from here.
526 bool Mgr::ms_dispatch(Message
*m
)
528 dout(4) << *m
<< dendl
;
529 std::lock_guard
l(lock
);
531 switch (m
->get_type()) {
533 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
535 case CEPH_MSG_MON_MAP
:
536 py_module_registry
->notify_all("mon_map", "");
540 case CEPH_MSG_FS_MAP
:
541 py_module_registry
->notify_all("fs_map", "");
// NOTE(review): C-style cast here; the other cases use static_cast.
542 handle_fs_map((MFSMap
*)m
);
543 return false; // I shall let this pass through for Client
545 case CEPH_MSG_OSD_MAP
:
548 py_module_registry
->notify_all("osd_map", "");
550 // Continuous subscribe, so that we can generate notifications
551 // for our MgrPyModules
552 objecter
->maybe_request_map();
// The service map is stored before the modules are notified about it.
555 case MSG_SERVICE_MAP
:
556 handle_service_map(static_cast<MServiceMap
*>(m
));
557 py_module_registry
->notify_all("service_map", "");
561 handle_log(static_cast<MLog
*>(m
));
// React to a new FSMap. Signals fs_map_cond, stores the map in
// cluster_state, and then, for each MDS in the map, records its name and
// compares the addr value in its stored metadata with the addresses in the
// map. A metadata request with a MetadataUpdate completion is issued to the
// mon. Finally, mds entries not in names_exist are culled from daemon_state.
//
// m: the FS map message.
// The caller must hold `lock` (asserted).
//
// NOTE(review): the declaration of `update` and the condition guarding the
// metadata request are not visible in this excerpt.
571 void Mgr::handle_fs_map(MFSMap
* m
)
573 ceph_assert(lock
.is_locked_by_me());
575 std::set
<std::string
> names_exist
;
577 const FSMap
&new_fsmap
= m
->get_fsmap();
// Wakes the FSMap wait loop seen earlier in this file.
579 fs_map_cond
.Signal();
581 // TODO: callers (e.g. from python land) are potentially going to see
582 // the new fsmap before we've bothered populating all the resulting
583 // daemon_state. Maybe we should block python land while we're making
584 // this kind of update?
586 cluster_state
.set_fsmap(new_fsmap
);
// NOTE(review): plain auto makes a copy if get_mds_info() returns a
// reference; confirm whether the copy is intended.
588 auto mds_info
= new_fsmap
.get_mds_info();
589 for (const auto &i
: mds_info
) {
590 const auto &info
= i
.second
;
592 if (!new_fsmap
.gid_exists(i
.first
)){
596 // Remember which MDS exists so that we can cull any that don't
597 names_exist
.insert(info
.name
);
599 const auto k
= DaemonKey("mds", info
.name
);
600 if (daemon_state
.is_updating(k
)) {
605 if (daemon_state
.exists(k
)) {
606 auto metadata
= daemon_state
.get(k
);
// The per-daemon lock is held while its metadata map is read.
607 std::lock_guard
l(metadata
->lock
);
608 if (metadata
->metadata
.empty() ||
609 metadata
->metadata
.count("addr") == 0) {
612 auto metadata_addrs
= metadata
->metadata
.at("addr");
613 const auto map_addrs
= info
.addrs
;
// Addresses are compared in their stringified form.
614 update
= metadata_addrs
!= stringify(map_addrs
);
616 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addrs
617 << " != " << stringify(map_addrs
) << dendl
;
// The MetadataUpdate object is passed to start_mon_command as the completion
// and also supplies the output buffers.
625 auto c
= new MetadataUpdate(daemon_state
, k
);
627 // Older MDS daemons don't have addr in the metadata, so
628 // fake it if the returned metadata doesn't have the field.
629 c
->set_default("addr", stringify(info
.addrs
));
// NOTE(review): info.name is spliced into the JSON command text without
// escaping.
631 std::ostringstream cmd
;
632 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
633 << info
.name
<< "\"}";
634 monc
->start_mon_command(
636 {}, &c
->outbl
, &c
->outs
, c
);
639 daemon_state
.cull("mds", names_exist
);
// Handle a new MgrMap. Under `lock`, reads the module set of the map held
// by cluster_state and compares it with the module set of the incoming map.
// A difference is logged as a respawn condition. The incoming map is then
// stored in cluster_state and the daemon server is notified.
//
// m: the incoming MgrMap.
//
// NOTE(review): the return statements are not visible in this excerpt, so
// the meaning of the bool result cannot be documented from here.
642 bool Mgr::got_mgr_map(const MgrMap
& m
)
644 std::lock_guard
l(lock
);
645 dout(10) << m
<< dendl
;
647 set
<string
> old_modules
;
// The lambda parameter is also named m and shadows the function parameter:
// inside the lambda, m is the map held by cluster_state, not the incoming one.
648 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
649 old_modules
= m
.modules
;
// Here m is the incoming map again.
651 if (m
.modules
!= old_modules
) {
652 derr
<< "mgrmap module list changed to (" << m
.modules
<< "), respawn"
657 cluster_state
.set_mgr_map(m
);
658 server
.got_mgr_map();
// Handle a MgrDigest message. Logs the lengths of the two JSON payloads,
// loads the digest into cluster_state, and notifies the python modules of
// the mon_status, health and pg_summary events. On the first digest it sets
// digest_received and signals digest_cond.
//
// m: the digest message.
//
// NOTE(review): the lines between the final log statement and the
// digest_received check are not visible in this excerpt; confirm against
// the full file whether the message reference is released there.
663 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
665 dout(10) << m
->mon_status_json
.length() << dendl
;
666 dout(10) << m
->health_json
.length() << dendl
;
667 cluster_state
.load_digest(m
);
668 py_module_registry
->notify_all("mon_status", "");
669 py_module_registry
->notify_all("health", "");
671 // Hack: use this as a tick/opportunity to prompt python-land that
672 // the pgmap might have changed since last time we were here.
673 py_module_registry
->notify_all("pg_summary", "");
674 dout(10) << "done." << dendl
;
// Only the first digest needs to wake the MgrDigest wait loop seen earlier
// in this file.
678 if (!digest_received
) {
679 digest_received
= true;
680 digest_cond
.Signal();
684 std::map
<std::string
, std::string
> Mgr::get_services() const
686 std::lock_guard
l(lock
);
688 return py_module_registry
->get_services();