// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
25 #include "mgr/mgr_commands.h"
27 //#include "MgrPyModule.h"
28 #include "DaemonServer.h"
29 #include "messages/MMgrBeacon.h"
30 #include "messages/MMgrDigest.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MLog.h"
34 #include "messages/MServiceMap.h"
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_mgr
41 #define dout_prefix *_dout << "mgr " << __func__ << " "
44 Mgr::Mgr(MonClient
*monc_
, const MgrMap
& mgrmap
,
45 PyModuleRegistry
*py_module_registry_
,
46 Messenger
*clientm_
, Objecter
*objecter_
,
47 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
51 client_messenger(clientm_
),
53 timer(g_ceph_context
, lock
),
54 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
55 digest_received(false),
56 py_module_registry(py_module_registry_
),
57 cluster_state(monc
, nullptr, mgrmap
),
58 server(monc
, finisher
, daemon_state
, cluster_state
, *py_module_registry
,
61 audit_clog(audit_clog_
),
65 cluster_state
.set_objecter(objecter
);
75 * Context for completion of metadata mon commands: take
76 * the result and stash it in DaemonStateIndex
78 class MetadataUpdate
: public Context
80 DaemonStateIndex
&daemon_state
;
83 std::map
<std::string
, std::string
> defaults
;
89 MetadataUpdate(DaemonStateIndex
&daemon_state_
, const DaemonKey
&key_
)
90 : daemon_state(daemon_state_
), key(key_
) {}
92 void set_default(const std::string
&k
, const std::string
&v
)
97 void finish(int r
) override
99 daemon_state
.clear_updating(key
);
101 if (key
.first
== "mds") {
102 json_spirit::mValue json_result
;
103 bool read_ok
= json_spirit::read(
104 outbl
.to_str(), json_result
);
106 dout(1) << "mon returned invalid JSON for "
107 << key
.first
<< "." << key
.second
<< dendl
;
111 json_spirit::mObject daemon_meta
= json_result
.get_obj();
113 // Apply any defaults
114 for (const auto &i
: defaults
) {
115 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
116 daemon_meta
[i
.first
] = i
.second
;
120 DaemonStatePtr state
;
121 if (daemon_state
.exists(key
)) {
122 state
= daemon_state
.get(key
);
123 Mutex::Locker
l(state
->lock
);
124 daemon_meta
.erase("name");
125 daemon_meta
.erase("hostname");
126 state
->metadata
.clear();
127 for (const auto &i
: daemon_meta
) {
128 state
->metadata
[i
.first
] = i
.second
.get_str();
131 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
133 state
->hostname
= daemon_meta
.at("hostname").get_str();
135 for (const auto &i
: daemon_meta
) {
136 state
->metadata
[i
.first
] = i
.second
.get_str();
139 daemon_state
.insert(state
);
141 } else if (key
.first
== "osd") {
146 dout(1) << "mon failed to return metadata for "
147 << key
.first
<< "." << key
.second
<< ": "
148 << cpp_strerror(r
) << dendl
;
154 void Mgr::background_init(Context
*completion
)
156 Mutex::Locker
l(lock
);
157 assert(!initializing
);
158 assert(!initialized
);
163 finisher
.queue(new FunctionContext([this, completion
](int r
){
165 completion
->complete(0);
171 Mutex::Locker
l(lock
);
172 assert(initializing
);
173 assert(!initialized
);
175 // Start communicating with daemons to learn statistics etc
176 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddr());
178 derr
<< "Initialize server fail"<< dendl
;
181 dout(4) << "Initialized server at " << server
.get_myaddr() << dendl
;
183 // Preload all daemon metadata (will subsequently keep this
184 // up to date by watching maps, so do the initial load before
185 // we subscribe to any maps)
186 dout(4) << "Loading daemon metadata..." << dendl
;
189 // subscribe to all the maps
190 monc
->sub_want("log-info", 0, 0);
191 monc
->sub_want("mgrdigest", 0, 0);
192 monc
->sub_want("fsmap", 0, 0);
193 monc
->sub_want("servicemap", 0, 0);
195 dout(4) << "waiting for OSDMap..." << dendl
;
196 // Subscribe to OSDMap update to pass on to ClusterState
197 objecter
->maybe_request_map();
199 // reset the mon session. we get these maps through subscriptions which
200 // are stateful with the connection, so even if *we* don't have them a
201 // previous incarnation sharing the same MonClient may have.
202 monc
->reopen_session();
204 // Start Objecter and wait for OSD map
205 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
206 objecter
->wait_for_osd_map();
209 // Populate PGs in ClusterState
210 objecter
->with_osdmap([this](const OSDMap
&osd_map
) {
211 cluster_state
.notify_osdmap(osd_map
);
215 dout(4) << "waiting for FSMap..." << dendl
;
216 while (!cluster_state
.have_fsmap()) {
217 fs_map_cond
.Wait(lock
);
220 dout(4) << "waiting for config-keys..." << dendl
;
222 // Preload config keys (`get` for plugins is to be a fast local
223 // operation, we we don't have to synchronize these later because
224 // all sets will come via mgr)
225 auto loaded_config
= load_config();
227 // Wait for MgrDigest...
228 dout(4) << "waiting for MgrDigest..." << dendl
;
229 while (!digest_received
) {
230 digest_cond
.Wait(lock
);
233 // assume finisher already initialized in background_init
234 dout(4) << "starting python modules..." << dendl
;
235 py_module_registry
->active_start(loaded_config
, daemon_state
, cluster_state
, *monc
,
236 clog
, *objecter
, *client
, finisher
);
238 dout(4) << "Complete." << dendl
;
239 initializing
= false;
243 void Mgr::load_all_metadata()
245 assert(lock
.is_locked_by_me());
248 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
250 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
252 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
260 assert(mds_cmd
.r
== 0);
261 assert(mon_cmd
.r
== 0);
262 assert(osd_cmd
.r
== 0);
264 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
265 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
266 if (daemon_meta
.count("hostname") == 0) {
267 dout(1) << "Skipping incomplete metadata entry" << dendl
;
271 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
272 dm
->key
= DaemonKey("mds",
273 daemon_meta
.at("name").get_str());
274 dm
->hostname
= daemon_meta
.at("hostname").get_str();
276 daemon_meta
.erase("name");
277 daemon_meta
.erase("hostname");
279 for (const auto &i
: daemon_meta
) {
280 dm
->metadata
[i
.first
] = i
.second
.get_str();
283 daemon_state
.insert(dm
);
286 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
287 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
288 if (daemon_meta
.count("hostname") == 0) {
289 dout(1) << "Skipping incomplete metadata entry" << dendl
;
293 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
294 dm
->key
= DaemonKey("mon",
295 daemon_meta
.at("name").get_str());
296 dm
->hostname
= daemon_meta
.at("hostname").get_str();
298 daemon_meta
.erase("name");
299 daemon_meta
.erase("hostname");
301 for (const auto &i
: daemon_meta
) {
302 dm
->metadata
[i
.first
] = i
.second
.get_str();
305 daemon_state
.insert(dm
);
308 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
309 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
310 if (osd_metadata
.count("hostname") == 0) {
311 dout(1) << "Skipping incomplete metadata entry" << dendl
;
314 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
316 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
317 dm
->key
= DaemonKey("osd",
318 stringify(osd_metadata
.at("id").get_int()));
319 dm
->hostname
= osd_metadata
.at("hostname").get_str();
321 osd_metadata
.erase("id");
322 osd_metadata
.erase("hostname");
324 for (const auto &i
: osd_metadata
) {
325 dm
->metadata
[i
.first
] = i
.second
.get_str();
328 daemon_state
.insert(dm
);
332 std::map
<std::string
, std::string
> Mgr::load_config()
334 assert(lock
.is_locked_by_me());
336 dout(10) << "listing keys" << dendl
;
338 cmd
.run(monc
, "{\"prefix\": \"config-key ls\"}");
344 std::map
<std::string
, std::string
> loaded
;
346 for (auto &key_str
: cmd
.json_result
.get_array()) {
347 std::string
const key
= key_str
.get_str();
348 dout(20) << "saw key '" << key
<< "'" << dendl
;
350 const std::string config_prefix
= PyModuleRegistry::config_prefix
;
352 if (key
.substr(0, config_prefix
.size()) == config_prefix
) {
353 dout(20) << "fetching '" << key
<< "'" << dendl
;
355 std::ostringstream cmd_json
;
356 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
357 get_cmd
.run(monc
, cmd_json
.str());
361 assert(get_cmd
.r
== 0);
362 loaded
[key
] = get_cmd
.outbl
.to_str();
371 finisher
.queue(new FunctionContext([&](int) {
373 Mutex::Locker
l(lock
);
374 monc
->sub_unwant("log-info");
375 monc
->sub_unwant("mgrdigest");
376 monc
->sub_unwant("fsmap");
377 // First stop the server so that we're not taking any more incoming
381 // after the messenger is stopped, signal modules to shutdown via finisher
382 py_module_registry
->active_shutdown();
385 // Then stop the finisher to ensure its enqueued contexts aren't going
386 // to touch references to the things we're about to tear down
387 finisher
.wait_for_empty();
391 void Mgr::handle_osd_map()
393 assert(lock
.is_locked_by_me());
395 std::set
<std::string
> names_exist
;
398 * When we see a new OSD map, inspect the entity addrs to
399 * see if they have changed (service restart), and if so
400 * reload the metadata.
402 objecter
->with_osdmap([this, &names_exist
](const OSDMap
&osd_map
) {
403 for (unsigned int osd_id
= 0; osd_id
< osd_map
.get_num_osds(); ++osd_id
) {
404 if (!osd_map
.exists(osd_id
)) {
408 // Remember which OSDs exist so that we can cull any that don't
409 names_exist
.insert(stringify(osd_id
));
411 // Consider whether to update the daemon metadata (new/restarted daemon)
412 bool update_meta
= false;
413 const auto k
= DaemonKey("osd", stringify(osd_id
));
414 if (daemon_state
.is_updating(k
)) {
418 if (daemon_state
.exists(k
)) {
419 auto metadata
= daemon_state
.get(k
);
420 Mutex::Locker
l(metadata
->lock
);
421 auto addr_iter
= metadata
->metadata
.find("front_addr");
422 if (addr_iter
!= metadata
->metadata
.end()) {
423 const std::string
&metadata_addr
= addr_iter
->second
;
424 const auto &map_addr
= osd_map
.get_addr(osd_id
);
426 if (metadata_addr
!= stringify(map_addr
)) {
427 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
428 << " != " << stringify(map_addr
) << dendl
;
431 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
432 << metadata_addr
<< dendl
;
435 // Awkward case where daemon went into DaemonState because it
436 // sent us a report but its metadata didn't get loaded yet
444 daemon_state
.notify_updating(k
);
445 auto c
= new MetadataUpdate(daemon_state
, k
);
446 std::ostringstream cmd
;
447 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
449 monc
->start_mon_command(
451 {}, &c
->outbl
, &c
->outs
, c
);
455 cluster_state
.notify_osdmap(osd_map
);
458 // TODO: same culling for MonMap
459 daemon_state
.cull("osd", names_exist
);
462 void Mgr::handle_log(MLog
*m
)
464 for (const auto &e
: m
->entries
) {
465 py_module_registry
->notify_all(e
);
471 void Mgr::handle_service_map(MServiceMap
*m
)
473 dout(10) << "e" << m
->service_map
.epoch
<< dendl
;
474 cluster_state
.set_service_map(m
->service_map
);
475 server
.got_service_map();
478 bool Mgr::ms_dispatch(Message
*m
)
480 dout(4) << *m
<< dendl
;
481 Mutex::Locker
l(lock
);
483 switch (m
->get_type()) {
485 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
487 case CEPH_MSG_MON_MAP
:
488 py_module_registry
->notify_all("mon_map", "");
491 case CEPH_MSG_FS_MAP
:
492 py_module_registry
->notify_all("fs_map", "");
493 handle_fs_map((MFSMap
*)m
);
494 return false; // I shall let this pass through for Client
496 case CEPH_MSG_OSD_MAP
:
499 py_module_registry
->notify_all("osd_map", "");
501 // Continuous subscribe, so that we can generate notifications
502 // for our MgrPyModules
503 objecter
->maybe_request_map();
506 case MSG_SERVICE_MAP
:
507 handle_service_map((MServiceMap
*)m
);
508 py_module_registry
->notify_all("service_map", "");
512 handle_log(static_cast<MLog
*>(m
));
522 void Mgr::handle_fs_map(MFSMap
* m
)
524 assert(lock
.is_locked_by_me());
526 std::set
<std::string
> names_exist
;
528 const FSMap
&new_fsmap
= m
->get_fsmap();
530 fs_map_cond
.Signal();
532 // TODO: callers (e.g. from python land) are potentially going to see
533 // the new fsmap before we've bothered populating all the resulting
534 // daemon_state. Maybe we should block python land while we're making
535 // this kind of update?
537 cluster_state
.set_fsmap(new_fsmap
);
539 auto mds_info
= new_fsmap
.get_mds_info();
540 for (const auto &i
: mds_info
) {
541 const auto &info
= i
.second
;
543 if (!new_fsmap
.gid_exists(i
.first
)){
547 // Remember which MDS exists so that we can cull any that don't
548 names_exist
.insert(info
.name
);
550 const auto k
= DaemonKey("mds", info
.name
);
551 if (daemon_state
.is_updating(k
)) {
556 if (daemon_state
.exists(k
)) {
557 auto metadata
= daemon_state
.get(k
);
558 Mutex::Locker
l(metadata
->lock
);
559 if (metadata
->metadata
.empty() ||
560 metadata
->metadata
.count("addr") == 0) {
563 auto metadata_addr
= metadata
->metadata
.at("addr");
564 const auto map_addr
= info
.addr
;
565 update
= metadata_addr
!= stringify(map_addr
);
567 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addr
568 << " != " << stringify(map_addr
) << dendl
;
576 daemon_state
.notify_updating(k
);
577 auto c
= new MetadataUpdate(daemon_state
, k
);
579 // Older MDS daemons don't have addr in the metadata, so
580 // fake it if the returned metadata doesn't have the field.
581 c
->set_default("addr", stringify(info
.addr
));
583 std::ostringstream cmd
;
584 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
585 << info
.name
<< "\"}";
586 monc
->start_mon_command(
588 {}, &c
->outbl
, &c
->outs
, c
);
591 daemon_state
.cull("mds", names_exist
);
594 bool Mgr::got_mgr_map(const MgrMap
& m
)
596 Mutex::Locker
l(lock
);
597 dout(10) << m
<< dendl
;
599 set
<string
> old_modules
;
600 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
601 old_modules
= m
.modules
;
603 if (m
.modules
!= old_modules
) {
604 derr
<< "mgrmap module list changed to (" << m
.modules
<< "), respawn"
609 cluster_state
.set_mgr_map(m
);
614 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
616 dout(10) << m
->mon_status_json
.length() << dendl
;
617 dout(10) << m
->health_json
.length() << dendl
;
618 cluster_state
.load_digest(m
);
619 py_module_registry
->notify_all("mon_status", "");
620 py_module_registry
->notify_all("health", "");
622 // Hack: use this as a tick/opportunity to prompt python-land that
623 // the pgmap might have changed since last time we were here.
624 py_module_registry
->notify_all("pg_summary", "");
625 dout(10) << "done." << dendl
;
629 if (!digest_received
) {
630 digest_received
= true;
631 digest_cond
.Signal();
638 server
.send_report();
641 std::vector
<MonCommand
> Mgr::get_command_set() const
643 Mutex::Locker
l(lock
);
645 std::vector
<MonCommand
> commands
= mgr_commands
;
646 std::vector
<MonCommand
> py_commands
= py_module_registry
->get_commands();
647 commands
.insert(commands
.end(), py_commands
.begin(), py_commands
.end());
651 std::map
<std::string
, std::string
> Mgr::get_services() const
653 Mutex::Locker
l(lock
);
655 return py_module_registry
->get_services();