// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
25 #include "mgr/mgr_commands.h"
27 //#include "MgrPyModule.h"
28 #include "DaemonServer.h"
29 #include "messages/MMgrBeacon.h"
30 #include "messages/MMgrDigest.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MLog.h"
34 #include "messages/MServiceMap.h"
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_mgr
41 #define dout_prefix *_dout << "mgr " << __func__ << " "
44 Mgr::Mgr(MonClient
*monc_
, const MgrMap
& mgrmap
,
45 PyModuleRegistry
*py_module_registry_
,
46 Messenger
*clientm_
, Objecter
*objecter_
,
47 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
51 client_messenger(clientm_
),
53 timer(g_ceph_context
, lock
),
54 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
55 digest_received(false),
56 py_module_registry(py_module_registry_
),
57 cluster_state(monc
, nullptr, mgrmap
),
58 server(monc
, finisher
, daemon_state
, cluster_state
, *py_module_registry
,
61 audit_clog(audit_clog_
),
65 cluster_state
.set_objecter(objecter
);
73 void MetadataUpdate::finish(int r
)
75 daemon_state
.clear_updating(key
);
77 if (key
.first
== "mds" || key
.first
== "osd") {
78 json_spirit::mValue json_result
;
79 bool read_ok
= json_spirit::read(
80 outbl
.to_str(), json_result
);
82 dout(1) << "mon returned invalid JSON for "
83 << key
.first
<< "." << key
.second
<< dendl
;
86 dout(4) << "mon returned valid metadata JSON for "
87 << key
.first
<< "." << key
.second
<< dendl
;
89 json_spirit::mObject daemon_meta
= json_result
.get_obj();
92 for (const auto &i
: defaults
) {
93 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
94 daemon_meta
[i
.first
] = i
.second
;
99 if (daemon_state
.exists(key
)) {
100 state
= daemon_state
.get(key
);
101 Mutex::Locker
l(state
->lock
);
102 if (key
.first
== "mds") {
103 daemon_meta
.erase("name");
104 } else if (key
.first
== "osd") {
105 daemon_meta
.erase("id");
107 daemon_meta
.erase("hostname");
108 state
->metadata
.clear();
109 for (const auto &i
: daemon_meta
) {
110 state
->metadata
[i
.first
] = i
.second
.get_str();
113 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
115 state
->hostname
= daemon_meta
.at("hostname").get_str();
117 if (key
.first
== "mds") {
118 daemon_meta
.erase("name");
119 } else if (key
.first
== "osd") {
120 daemon_meta
.erase("id");
122 daemon_meta
.erase("hostname");
124 for (const auto &i
: daemon_meta
) {
125 state
->metadata
[i
.first
] = i
.second
.get_str();
128 daemon_state
.insert(state
);
134 dout(1) << "mon failed to return metadata for "
135 << key
.first
<< "." << key
.second
<< ": "
136 << cpp_strerror(r
) << dendl
;
140 void Mgr::background_init(Context
*completion
)
142 Mutex::Locker
l(lock
);
143 assert(!initializing
);
144 assert(!initialized
);
149 finisher
.queue(new FunctionContext([this, completion
](int r
){
151 completion
->complete(0);
157 Mutex::Locker
l(lock
);
158 assert(initializing
);
159 assert(!initialized
);
161 // Start communicating with daemons to learn statistics etc
162 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddr());
164 derr
<< "Initialize server fail"<< dendl
;
167 dout(4) << "Initialized server at " << server
.get_myaddr() << dendl
;
169 // Preload all daemon metadata (will subsequently keep this
170 // up to date by watching maps, so do the initial load before
171 // we subscribe to any maps)
172 dout(4) << "Loading daemon metadata..." << dendl
;
175 // subscribe to all the maps
176 monc
->sub_want("log-info", 0, 0);
177 monc
->sub_want("mgrdigest", 0, 0);
178 monc
->sub_want("fsmap", 0, 0);
179 monc
->sub_want("servicemap", 0, 0);
181 dout(4) << "waiting for OSDMap..." << dendl
;
182 // Subscribe to OSDMap update to pass on to ClusterState
183 objecter
->maybe_request_map();
185 // reset the mon session. we get these maps through subscriptions which
186 // are stateful with the connection, so even if *we* don't have them a
187 // previous incarnation sharing the same MonClient may have.
188 monc
->reopen_session();
190 // Start Objecter and wait for OSD map
191 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
192 objecter
->wait_for_osd_map();
195 // Populate PGs in ClusterState
196 objecter
->with_osdmap([this](const OSDMap
&osd_map
) {
197 cluster_state
.notify_osdmap(osd_map
);
201 dout(4) << "waiting for FSMap..." << dendl
;
202 while (!cluster_state
.have_fsmap()) {
203 fs_map_cond
.Wait(lock
);
206 dout(4) << "waiting for config-keys..." << dendl
;
208 // Preload config keys (`get` for plugins is to be a fast local
209 // operation, we we don't have to synchronize these later because
210 // all sets will come via mgr)
211 auto loaded_config
= load_config();
213 // Wait for MgrDigest...
214 dout(4) << "waiting for MgrDigest..." << dendl
;
215 while (!digest_received
) {
216 digest_cond
.Wait(lock
);
219 // assume finisher already initialized in background_init
220 dout(4) << "starting python modules..." << dendl
;
221 py_module_registry
->active_start(loaded_config
, daemon_state
, cluster_state
, *monc
,
222 clog
, *objecter
, *client
, finisher
);
224 dout(4) << "Complete." << dendl
;
225 initializing
= false;
229 void Mgr::load_all_metadata()
231 assert(lock
.is_locked_by_me());
234 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
236 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
238 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
246 assert(mds_cmd
.r
== 0);
247 assert(mon_cmd
.r
== 0);
248 assert(osd_cmd
.r
== 0);
250 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
251 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
252 if (daemon_meta
.count("hostname") == 0) {
253 dout(1) << "Skipping incomplete metadata entry" << dendl
;
257 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
258 dm
->key
= DaemonKey("mds",
259 daemon_meta
.at("name").get_str());
260 dm
->hostname
= daemon_meta
.at("hostname").get_str();
262 daemon_meta
.erase("name");
263 daemon_meta
.erase("hostname");
265 for (const auto &i
: daemon_meta
) {
266 dm
->metadata
[i
.first
] = i
.second
.get_str();
269 daemon_state
.insert(dm
);
272 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
273 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
274 if (daemon_meta
.count("hostname") == 0) {
275 dout(1) << "Skipping incomplete metadata entry" << dendl
;
279 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
280 dm
->key
= DaemonKey("mon",
281 daemon_meta
.at("name").get_str());
282 dm
->hostname
= daemon_meta
.at("hostname").get_str();
284 daemon_meta
.erase("name");
285 daemon_meta
.erase("hostname");
287 for (const auto &i
: daemon_meta
) {
288 dm
->metadata
[i
.first
] = i
.second
.get_str();
291 daemon_state
.insert(dm
);
294 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
295 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
296 if (osd_metadata
.count("hostname") == 0) {
297 dout(1) << "Skipping incomplete metadata entry" << dendl
;
300 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
302 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
303 dm
->key
= DaemonKey("osd",
304 stringify(osd_metadata
.at("id").get_int()));
305 dm
->hostname
= osd_metadata
.at("hostname").get_str();
307 osd_metadata
.erase("id");
308 osd_metadata
.erase("hostname");
310 for (const auto &i
: osd_metadata
) {
311 dm
->metadata
[i
.first
] = i
.second
.get_str();
314 daemon_state
.insert(dm
);
318 std::map
<std::string
, std::string
> Mgr::load_config()
320 assert(lock
.is_locked_by_me());
322 dout(10) << "listing keys" << dendl
;
324 cmd
.run(monc
, "{\"prefix\": \"config-key ls\"}");
330 std::map
<std::string
, std::string
> loaded
;
332 for (auto &key_str
: cmd
.json_result
.get_array()) {
333 std::string
const key
= key_str
.get_str();
334 dout(20) << "saw key '" << key
<< "'" << dendl
;
336 const std::string config_prefix
= PyModuleRegistry::config_prefix
;
338 if (key
.substr(0, config_prefix
.size()) == config_prefix
) {
339 dout(20) << "fetching '" << key
<< "'" << dendl
;
341 std::ostringstream cmd_json
;
342 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
343 get_cmd
.run(monc
, cmd_json
.str());
347 assert(get_cmd
.r
== 0);
348 loaded
[key
] = get_cmd
.outbl
.to_str();
357 finisher
.queue(new FunctionContext([&](int) {
359 Mutex::Locker
l(lock
);
360 monc
->sub_unwant("log-info");
361 monc
->sub_unwant("mgrdigest");
362 monc
->sub_unwant("fsmap");
363 // First stop the server so that we're not taking any more incoming
367 // after the messenger is stopped, signal modules to shutdown via finisher
368 py_module_registry
->active_shutdown();
371 // Then stop the finisher to ensure its enqueued contexts aren't going
372 // to touch references to the things we're about to tear down
373 finisher
.wait_for_empty();
377 void Mgr::handle_osd_map()
379 assert(lock
.is_locked_by_me());
381 std::set
<std::string
> names_exist
;
384 * When we see a new OSD map, inspect the entity addrs to
385 * see if they have changed (service restart), and if so
386 * reload the metadata.
388 objecter
->with_osdmap([this, &names_exist
](const OSDMap
&osd_map
) {
389 for (unsigned int osd_id
= 0; osd_id
< osd_map
.get_max_osd(); ++osd_id
) {
390 if (!osd_map
.exists(osd_id
)) {
394 // Remember which OSDs exist so that we can cull any that don't
395 names_exist
.insert(stringify(osd_id
));
397 // Consider whether to update the daemon metadata (new/restarted daemon)
398 bool update_meta
= false;
399 const auto k
= DaemonKey("osd", stringify(osd_id
));
400 if (daemon_state
.is_updating(k
)) {
404 if (daemon_state
.exists(k
)) {
405 auto metadata
= daemon_state
.get(k
);
406 Mutex::Locker
l(metadata
->lock
);
407 auto addr_iter
= metadata
->metadata
.find("front_addr");
408 if (addr_iter
!= metadata
->metadata
.end()) {
409 const std::string
&metadata_addr
= addr_iter
->second
;
410 const auto &map_addr
= osd_map
.get_addr(osd_id
);
412 if (metadata_addr
!= stringify(map_addr
)) {
413 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
414 << " != " << stringify(map_addr
) << dendl
;
417 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
418 << metadata_addr
<< dendl
;
421 // Awkward case where daemon went into DaemonState because it
422 // sent us a report but its metadata didn't get loaded yet
430 daemon_state
.notify_updating(k
);
431 auto c
= new MetadataUpdate(daemon_state
, k
);
432 std::ostringstream cmd
;
433 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
435 monc
->start_mon_command(
437 {}, &c
->outbl
, &c
->outs
, c
);
441 cluster_state
.notify_osdmap(osd_map
);
444 // TODO: same culling for MonMap
445 daemon_state
.cull("osd", names_exist
);
448 void Mgr::handle_log(MLog
*m
)
450 for (const auto &e
: m
->entries
) {
451 py_module_registry
->notify_all(e
);
457 void Mgr::handle_service_map(MServiceMap
*m
)
459 dout(10) << "e" << m
->service_map
.epoch
<< dendl
;
460 cluster_state
.set_service_map(m
->service_map
);
461 server
.got_service_map();
464 bool Mgr::ms_dispatch(Message
*m
)
466 dout(4) << *m
<< dendl
;
467 Mutex::Locker
l(lock
);
469 switch (m
->get_type()) {
471 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
473 case CEPH_MSG_MON_MAP
:
474 py_module_registry
->notify_all("mon_map", "");
477 case CEPH_MSG_FS_MAP
:
478 py_module_registry
->notify_all("fs_map", "");
479 handle_fs_map((MFSMap
*)m
);
480 return false; // I shall let this pass through for Client
482 case CEPH_MSG_OSD_MAP
:
485 py_module_registry
->notify_all("osd_map", "");
487 // Continuous subscribe, so that we can generate notifications
488 // for our MgrPyModules
489 objecter
->maybe_request_map();
492 case MSG_SERVICE_MAP
:
493 handle_service_map((MServiceMap
*)m
);
494 py_module_registry
->notify_all("service_map", "");
498 handle_log(static_cast<MLog
*>(m
));
508 void Mgr::handle_fs_map(MFSMap
* m
)
510 assert(lock
.is_locked_by_me());
512 std::set
<std::string
> names_exist
;
514 const FSMap
&new_fsmap
= m
->get_fsmap();
516 fs_map_cond
.Signal();
518 // TODO: callers (e.g. from python land) are potentially going to see
519 // the new fsmap before we've bothered populating all the resulting
520 // daemon_state. Maybe we should block python land while we're making
521 // this kind of update?
523 cluster_state
.set_fsmap(new_fsmap
);
525 auto mds_info
= new_fsmap
.get_mds_info();
526 for (const auto &i
: mds_info
) {
527 const auto &info
= i
.second
;
529 if (!new_fsmap
.gid_exists(i
.first
)){
533 // Remember which MDS exists so that we can cull any that don't
534 names_exist
.insert(info
.name
);
536 const auto k
= DaemonKey("mds", info
.name
);
537 if (daemon_state
.is_updating(k
)) {
542 if (daemon_state
.exists(k
)) {
543 auto metadata
= daemon_state
.get(k
);
544 Mutex::Locker
l(metadata
->lock
);
545 if (metadata
->metadata
.empty() ||
546 metadata
->metadata
.count("addr") == 0) {
549 auto metadata_addr
= metadata
->metadata
.at("addr");
550 const auto map_addr
= info
.addr
;
551 update
= metadata_addr
!= stringify(map_addr
);
553 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addr
554 << " != " << stringify(map_addr
) << dendl
;
562 daemon_state
.notify_updating(k
);
563 auto c
= new MetadataUpdate(daemon_state
, k
);
565 // Older MDS daemons don't have addr in the metadata, so
566 // fake it if the returned metadata doesn't have the field.
567 c
->set_default("addr", stringify(info
.addr
));
569 std::ostringstream cmd
;
570 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
571 << info
.name
<< "\"}";
572 monc
->start_mon_command(
574 {}, &c
->outbl
, &c
->outs
, c
);
577 daemon_state
.cull("mds", names_exist
);
580 bool Mgr::got_mgr_map(const MgrMap
& m
)
582 Mutex::Locker
l(lock
);
583 dout(10) << m
<< dendl
;
585 set
<string
> old_modules
;
586 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
587 old_modules
= m
.modules
;
589 if (m
.modules
!= old_modules
) {
590 derr
<< "mgrmap module list changed to (" << m
.modules
<< "), respawn"
595 cluster_state
.set_mgr_map(m
);
600 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
602 dout(10) << m
->mon_status_json
.length() << dendl
;
603 dout(10) << m
->health_json
.length() << dendl
;
604 cluster_state
.load_digest(m
);
605 py_module_registry
->notify_all("mon_status", "");
606 py_module_registry
->notify_all("health", "");
608 // Hack: use this as a tick/opportunity to prompt python-land that
609 // the pgmap might have changed since last time we were here.
610 py_module_registry
->notify_all("pg_summary", "");
611 dout(10) << "done." << dendl
;
615 if (!digest_received
) {
616 digest_received
= true;
617 digest_cond
.Signal();
624 server
.send_report();
627 std::vector
<MonCommand
> Mgr::get_command_set() const
629 Mutex::Locker
l(lock
);
631 std::vector
<MonCommand
> commands
= mgr_commands
;
632 std::vector
<MonCommand
> py_commands
= py_module_registry
->get_commands();
633 commands
.insert(commands
.end(), py_commands
.begin(), py_commands
.end());
637 std::map
<std::string
, std::string
> Mgr::get_services() const
639 Mutex::Locker
l(lock
);
641 return py_module_registry
->get_services();