1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
25 #include "mgr/mgr_commands.h"
27 #include "MgrPyModule.h"
28 #include "DaemonServer.h"
29 #include "messages/MMgrBeacon.h"
30 #include "messages/MMgrDigest.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MLog.h"
34 #include "messages/MServiceMap.h"
// Logging configuration for this file: dout/derr use g_ceph_context as
// their context and ceph_subsys_mgr as their subsystem, and every message
// is prefixed with "mgr <calling function name> ".
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_mgr
41 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Mgr constructor: keeps the client-side messenger, builds the timer and
// finisher, and wires py_modules, cluster_state and server to the shared
// daemon_state / cluster_state objects.
// NOTE(review): several initializer-list entries and the opening of the
// body (original lines 47-49, 51, 56, 58-62) are not visible in this view.
// py_modules below dereferences monc/objecter/client members, which are
// presumably initialized from monc_/objecter_/client_ on those lines.
44 Mgr::Mgr(MonClient
*monc_
, const MgrMap
& mgrmap
,
45 Messenger
*clientm_
, Objecter
*objecter_
,
46 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
50 client_messenger(clientm_
),
// The timer is bound to the global CephContext and to this object's `lock`.
52 timer(g_ceph_context
, lock
),
53 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
// Set to true by handle_mgr_digest() when the first MMgrDigest arrives.
54 digest_received(false),
55 py_modules(daemon_state
, cluster_state
, *monc
, clog_
, *objecter
, *client
,
57 cluster_state(monc
, nullptr, mgrmap
),
58 server(monc
, finisher
, daemon_state
, cluster_state
, py_modules
,
// Hand the Objecter to ClusterState once construction of members is done.
63 cluster_state
.set_objecter(objecter
);
73 * Context for completion of metadata mon commands: take
74 * the result and stash it in DaemonStateIndex
// MetadataUpdate is handed to MonClient::start_mon_command() by
// handle_osd_map() and handle_fs_map(), which pass &outbl / &outs as the
// reply buffers. finish(r) then parses the reply and stores it in
// daemon_state under `key`.
76 class MetadataUpdate
: public Context
// Index that receives the parsed metadata; finish() also clears the
// per-key "updating" flag on it.
78 DaemonStateIndex
&daemon_state
;
// Fallback values for fields missing from the mon's reply; filled by
// set_default() and applied in finish() only where the reply lacks the key.
81 std::map
<std::string
, std::string
> defaults
;
// Binds the index to update and the daemon key (type, name/id) this
// command was issued for.
87 MetadataUpdate(DaemonStateIndex
&daemon_state_
, const DaemonKey
&key_
)
88 : daemon_state(daemon_state_
), key(key_
) {}
// Record fallback value `v` for metadata field `k` (body not visible here).
90 void set_default(const std::string
&k
, const std::string
&v
)
// Completion callback; `r` is the mon command result. The key's
// "updating" flag is cleared unconditionally before anything else.
95 void finish(int r
) override
97 daemon_state
.clear_updating(key
);
// MDS reply: parse outbl as JSON. On a parse failure an error is logged
// (the surrounding `if` and presumed early return are not visible here).
99 if (key
.first
== "mds") {
100 json_spirit::mValue json_result
;
101 bool read_ok
= json_spirit::read(
102 outbl
.to_str(), json_result
);
104 dout(1) << "mon returned invalid JSON for "
105 << key
.first
<< "." << key
.second
<< dendl
;
// NOTE(review): get_obj() assumes the top level of the reply is an object.
109 json_spirit::mObject daemon_meta
= json_result
.get_obj();
111 // Apply any defaults
112 for (const auto &i
: defaults
) {
113 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
114 daemon_meta
[i
.first
] = i
.second
;
// Existing daemon: replace its metadata in place under its own lock,
// dropping the "name" and "hostname" fields first.
// NOTE(review): state->hostname is not refreshed on this path in the
// visible lines, so a hostname change would not be picked up here.
// New daemon: build a DaemonState with hostname from the reply, copy the
// fields into metadata and insert it into the index.
// NOTE(review): get_str() presumably throws if any metadata value is not
// a JSON string — confirm the mon only returns string values.
118 DaemonStatePtr state
;
119 if (daemon_state
.exists(key
)) {
120 state
= daemon_state
.get(key
);
121 Mutex::Locker
l(state
->lock
);
122 daemon_meta
.erase("name");
123 daemon_meta
.erase("hostname");
124 state
->metadata
.clear();
125 for (const auto &i
: daemon_meta
) {
126 state
->metadata
[i
.first
] = i
.second
.get_str();
129 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
131 state
->hostname
= daemon_meta
.at("hostname").get_str();
133 for (const auto &i
: daemon_meta
) {
134 state
->metadata
[i
.first
] = i
.second
.get_str();
137 daemon_state
.insert(state
);
// OSD reply: the handling (original lines 140-143) is not visible here.
139 } else if (key
.first
== "osd") {
// Presumably the r != 0 path: log the failure with the daemon key and
// the errno text for r.
144 dout(1) << "mon failed to return metadata for "
145 << key
.first
<< "." << key
.second
<< ": "
146 << cpp_strerror(r
) << dendl
;
// Start manager initialization without blocking the caller. Must not be
// called while initialization is in progress or already done (both
// asserted under `lock`). The work is queued on the finisher, and
// `completion` is then completed.
// NOTE(review): the lines that set `initializing` and run the init step
// inside the lambda (original lines 157-160, 162) are not visible here.
// complete(0) is hard-coded, so in the visible code an init failure is
// not reported through `completion` — confirm this is intended.
152 void Mgr::background_init(Context
*completion
)
154 Mutex::Locker
l(lock
);
155 assert(!initializing
);
156 assert(!initialized
);
161 finisher
.queue(new FunctionContext([this, completion
](int r
){
163 completion
->complete(0);
// Synchronous initialization body.
// NOTE(review): the function signature (original lines 166-168) is not
// visible; presumably this is Mgr::init(), run from background_init via
// the finisher. It takes `lock` and expects `initializing` to be set and
// `initialized` still clear.
169 Mutex::Locker
l(lock
);
170 assert(initializing
);
171 assert(!initialized
);
173 // Start communicating with daemons to learn statistics etc
174 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddr());
// server.init() failure path; the check on `r` and any early return
// (original lines 175, 177-178) are not visible here.
176 derr
<< "Initialize server fail"<< dendl
;
179 dout(4) << "Initialized server at " << server
.get_myaddr() << dendl
;
181 // Preload all daemon metadata (will subsequently keep this
182 // up to date by watching maps, so do the initial load before
183 // we subscribe to any maps)
184 dout(4) << "Loading daemon metadata..." << dendl
;
187 // subscribe to all the maps
188 monc
->sub_want("log-info", 0, 0);
189 monc
->sub_want("mgrdigest", 0, 0);
190 monc
->sub_want("fsmap", 0, 0);
191 monc
->sub_want("servicemap", 0, 0);
193 dout(4) << "waiting for OSDMap..." << dendl
;
194 // Subscribe to OSDMap update to pass on to ClusterState
195 objecter
->maybe_request_map();
197 // reset the mon session. we get these maps through subscriptions which
198 // are stateful with the connection, so even if *we* don't have them a
199 // previous incarnation sharing the same MonClient may have.
200 monc
->reopen_session();
202 // Start Objecter and wait for OSD map
// NOTE(review): `lock` is released for the OSD map wait; the matching
// re-acquire (original lines 205-206) is not visible in this view.
203 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
204 objecter
->wait_for_osd_map();
207 // Populate PGs in ClusterState
208 objecter
->with_osdmap([this](const OSDMap
&osd_map
) {
209 cluster_state
.notify_osdmap(osd_map
);
213 dout(4) << "waiting for FSMap..." << dendl
;
// Wait on fs_map_cond (with `lock`) until handle_fs_map() has stored an
// FSMap in cluster_state.
214 while (!cluster_state
.have_fsmap()) {
215 fs_map_cond
.Wait(lock
);
218 dout(4) << "waiting for config-keys..." << dendl
;
220 // Preload config keys (`get` for plugins is to be a fast local
221 // operation, and we don't have to synchronize these later because
222 // all sets will come via mgr)
225 // Wait for MgrDigest...
226 dout(4) << "waiting for MgrDigest..." << dendl
;
// handle_mgr_digest() sets digest_received and signals digest_cond on the
// first digest it sees.
227 while (!digest_received
) {
228 digest_cond
.Wait(lock
);
231 // assume finisher already initialized in background_init
232 dout(4) << "starting PyModules..." << dendl
;
236 dout(4) << "Complete." << dendl
;
// Initialization is no longer in progress. NOTE(review): setting
// `initialized` (original lines 238-239) is not visible here.
237 initializing
= false;
// Load metadata for every MDS, OSD and mon from the mons and insert one
// DaemonState per daemon into daemon_state. Caller must hold `lock`
// (asserted). Entries without a "hostname" field are skipped with a log
// message. Every remaining field is stored as a string in ->metadata.
// NOTE(review): the mds and mon loops differ only in the daemon type
// string and could share a helper.
241 void Mgr::load_all_metadata()
243 assert(lock
.is_locked_by_me());
// Issue the three metadata commands; the waits for their results
// (original lines 251-257) are not visible in this view.
246 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
248 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
250 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
// A non-zero result from any of the commands trips an assert.
258 assert(mds_cmd
.r
== 0);
259 assert(mon_cmd
.r
== 0);
260 assert(osd_cmd
.r
== 0);
// MDS entries: keyed by "name"; "name" and "hostname" are removed before
// the remaining fields are copied.
262 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
263 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
264 if (daemon_meta
.count("hostname") == 0) {
265 dout(1) << "Skipping incomplete metadata entry" << dendl
;
269 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
270 dm
->key
= DaemonKey("mds",
271 daemon_meta
.at("name").get_str());
272 dm
->hostname
= daemon_meta
.at("hostname").get_str();
274 daemon_meta
.erase("name");
275 daemon_meta
.erase("hostname");
277 for (const auto &i
: daemon_meta
) {
278 dm
->metadata
[i
.first
] = i
.second
.get_str();
281 daemon_state
.insert(dm
);
// Mon entries: handled exactly like MDS entries, keyed by "name".
284 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
285 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
286 if (daemon_meta
.count("hostname") == 0) {
287 dout(1) << "Skipping incomplete metadata entry" << dendl
;
291 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
292 dm
->key
= DaemonKey("mon",
293 daemon_meta
.at("name").get_str());
294 dm
->hostname
= daemon_meta
.at("hostname").get_str();
296 daemon_meta
.erase("name");
297 daemon_meta
.erase("hostname");
299 for (const auto &i
: daemon_meta
) {
300 dm
->metadata
[i
.first
] = i
.second
.get_str();
303 daemon_state
.insert(dm
);
// OSD entries: keyed by the stringified integer "id"; "id" and "hostname"
// are removed before the remaining fields are copied.
306 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
307 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
308 if (osd_metadata
.count("hostname") == 0) {
309 dout(1) << "Skipping incomplete metadata entry" << dendl
;
312 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
314 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
315 dm
->key
= DaemonKey("osd",
316 stringify(osd_metadata
.at("id").get_int()));
317 dm
->hostname
= osd_metadata
.at("hostname").get_str();
319 osd_metadata
.erase("id");
320 osd_metadata
.erase("hostname");
322 for (const auto &i
: osd_metadata
) {
323 dm
->metadata
[i
.first
] = i
.second
.get_str();
326 daemon_state
.insert(dm
);
// Fetch every config-key whose name starts with PyModules::config_prefix
// and pass the resulting key -> value map to py_modules.insert_config().
// Caller must hold `lock` (asserted).
330 void Mgr::load_config()
332 assert(lock
.is_locked_by_me());
334 dout(10) << "listing keys" << dendl
;
// List all config keys; waiting for the result (original lines 337-341)
// is not visible here.
336 cmd
.run(monc
, "{\"prefix\": \"config-key ls\"}");
342 std::map
<std::string
, std::string
> loaded
;
344 for (auto &key_str
: cmd
.json_result
.get_array()) {
345 std::string
const key
= key_str
.get_str();
346 dout(20) << "saw key '" << key
<< "'" << dendl
;
// NOTE(review): config_prefix does not change between iterations and
// could be hoisted out of the loop.
348 const std::string config_prefix
= PyModules::config_prefix
;
350 if (key
.substr(0, config_prefix
.size()) == config_prefix
) {
351 dout(20) << "fetching '" << key
<< "'" << dendl
;
// NOTE(review): `key` is spliced into the JSON command without escaping;
// a key containing a double quote or a backslash would produce malformed
// JSON.
353 std::ostringstream cmd_json
;
354 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
355 get_cmd
.run(monc
, cmd_json
.str());
// Failing to read any matching key trips an assert.
359 assert(get_cmd
.r
== 0);
360 loaded
[key
] = get_cmd
.outbl
.to_str();
364 py_modules
.insert_config(loaded
);
// Shutdown sequence.
// NOTE(review): the enclosing function signature is not visible here
// (presumably Mgr::shutdown()). Teardown is queued on the finisher, and
// the calling thread then waits for the finisher to drain.
369 finisher
.queue(new FunctionContext([&](int) {
371 Mutex::Locker
l(lock
);
// Drop the map subscriptions taken during initialization.
// NOTE(review): initialization also calls sub_want("servicemap", 0, 0),
// but "servicemap" is not unwanted here — confirm whether that is
// intentional.
372 monc
->sub_unwant("log-info");
373 monc
->sub_unwant("mgrdigest");
374 monc
->sub_unwant("fsmap");
375 // First stop the server so that we're not taking any more incoming
379 // after the messenger is stopped, signal modules to shutdown via finisher
380 py_modules
.shutdown();
383 // Then stop the finisher to ensure its enqueued contexts aren't going
384 // to touch references to the things we're about to tear down
385 finisher
.wait_for_empty();
// Handle a new OSDMap. Caller must hold `lock` (asserted). For each OSD
// present in the map, decide whether its metadata needs reloading: the
// stored "front_addr" is compared with the map's address, and on update an
// async "osd metadata" command is issued. The map is then passed to
// cluster_state, and DaemonState entries for OSDs not in the map are culled.
// NOTE(review): several lines that set `update_meta` and skip entries
// (original lines 403-405, 413-415, 427-428, 435-441) are not visible here.
389 void Mgr::handle_osd_map()
391 assert(lock
.is_locked_by_me());
393 std::set
<std::string
> names_exist
;
396 * When we see a new OSD map, inspect the entity addrs to
397 * see if they have changed (service restart), and if so
398 * reload the metadata.
400 objecter
->with_osdmap([this, &names_exist
](const OSDMap
&osd_map
) {
401 for (unsigned int osd_id
= 0; osd_id
< osd_map
.get_num_osds(); ++osd_id
) {
// Ids not present in the map are presumably skipped (body not visible).
402 if (!osd_map
.exists(osd_id
)) {
406 // Remember which OSDs exist so that we can cull any that don't
407 names_exist
.insert(stringify(osd_id
));
409 // Consider whether to update the daemon metadata (new/restarted daemon)
410 bool update_meta
= false;
411 const auto k
= DaemonKey("osd", stringify(osd_id
));
// A metadata fetch is already in flight for this OSD (handling not visible).
412 if (daemon_state
.is_updating(k
)) {
// Compare the stored front_addr with the address in the new map, under the
// DaemonState's own lock.
416 if (daemon_state
.exists(k
)) {
417 auto metadata
= daemon_state
.get(k
);
418 Mutex::Locker
l(metadata
->lock
);
419 auto addr_iter
= metadata
->metadata
.find("front_addr");
420 if (addr_iter
!= metadata
->metadata
.end()) {
421 const std::string
&metadata_addr
= addr_iter
->second
;
422 const auto &map_addr
= osd_map
.get_addr(osd_id
);
424 if (metadata_addr
!= stringify(map_addr
)) {
425 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
426 << " != " << stringify(map_addr
) << dendl
;
429 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
430 << metadata_addr
<< dendl
;
433 // Awkward case where daemon went into DaemonState because it
434 // sent us a report but its metadata didn't get loaded yet
// Mark the key as updating and issue an async "osd metadata" command;
// MetadataUpdate::finish() clears the updating flag when it completes.
// NOTE(review): the id and closing brace of the command string (original
// line 446) are not visible here.
442 daemon_state
.notify_updating(k
);
443 auto c
= new MetadataUpdate(daemon_state
, k
);
444 std::ostringstream cmd
;
445 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
447 monc
->start_mon_command(
449 {}, &c
->outbl
, &c
->outs
, c
);
// Still inside the with_osdmap callback: forward the map to ClusterState.
453 cluster_state
.notify_osdmap(osd_map
);
456 // TODO: same culling for MonMap
457 daemon_state
.cull("osd", names_exist
);
// Forward each entry of an incoming MLog message to the Python modules via
// py_modules.notify_all().
// NOTE(review): the rest of the function (original lines 464-467) is not
// visible here.
460 void Mgr::handle_log(MLog
*m
)
462 for (const auto &e
: m
->entries
) {
463 py_modules
.notify_all(e
);
// Record a new ServiceMap: log its epoch, store it in cluster_state and
// tell the DaemonServer that a service map has arrived.
469 void Mgr::handle_service_map(MServiceMap
*m
)
471 dout(10) << "e" << m
->service_map
.epoch
<< dendl
;
472 cluster_state
.set_service_map(m
->service_map
);
473 server
.got_service_map();
// Messenger dispatch hook. Logs the message, takes `lock` for the rest of
// the call, and routes by message type. The FSMap case returns false so the
// message is also passed on to the Client (per its inline comment).
// NOTE(review): the MMgrDigest/MLog case labels, the OSDMap handling and
// the return values of the other cases are on lines not visible here.
476 bool Mgr::ms_dispatch(Message
*m
)
478 dout(4) << *m
<< dendl
;
479 Mutex::Locker
l(lock
);
481 switch (m
->get_type()) {
483 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
485 case CEPH_MSG_MON_MAP
:
486 py_modules
.notify_all("mon_map", "");
489 case CEPH_MSG_FS_MAP
:
// NOTE(review): modules are notified before handle_fs_map() stores the new
// map in cluster_state; confirm they cannot observe the previous FSMap.
// This case and MSG_SERVICE_MAP use C-style casts, while the others use
// static_cast.
490 py_modules
.notify_all("fs_map", "");
491 handle_fs_map((MFSMap
*)m
);
492 return false; // I shall let this pass through for Client
494 case CEPH_MSG_OSD_MAP
:
497 py_modules
.notify_all("osd_map", "");
499 // Continuous subscribe, so that we can generate notifications
500 // for our MgrPyModules
501 objecter
->maybe_request_map();
504 case MSG_SERVICE_MAP
:
505 handle_service_map((MServiceMap
*)m
);
506 py_modules
.notify_all("service_map", "");
510 handle_log(static_cast<MLog
*>(m
));
// Handle a new FSMap. Caller must hold `lock` (asserted). The function:
//  - wakes any waiter on fs_map_cond;
//  - stores the map in cluster_state;
//  - issues an async "mds metadata" command for MDS daemons whose stored
//    "addr" is missing or differs from the map (other update triggers sit
//    on lines not visible here);
//  - culls DaemonState entries for MDS names no longer present.
520 void Mgr::handle_fs_map(MFSMap
* m
)
522 assert(lock
.is_locked_by_me());
524 std::set
<std::string
> names_exist
;
526 const FSMap
&new_fsmap
= m
->get_fsmap();
528 fs_map_cond
.Signal();
530 // TODO: callers (e.g. from python land) are potentially going to see
531 // the new fsmap before we've bothered populating all the resulting
532 // daemon_state. Maybe we should block python land while we're making
533 // this kind of update?
535 cluster_state
.set_fsmap(new_fsmap
);
// NOTE(review): `auto` copies whatever get_mds_info() returns; if it
// returns a reference, `const auto &` would avoid copying the map.
537 auto mds_info
= new_fsmap
.get_mds_info();
538 for (const auto &i
: mds_info
) {
539 const auto &info
= i
.second
;
// Guard on gid membership; its body (original lines 542-544) is not visible.
541 if (!new_fsmap
.gid_exists(i
.first
)){
545 // Remember which MDS exists so that we can cull any that don't
546 names_exist
.insert(info
.name
);
548 const auto k
= DaemonKey("mds", info
.name
);
// A metadata fetch is already in flight for this MDS (handling not visible).
549 if (daemon_state
.is_updating(k
)) {
// Existing entry: when it has no stored "addr" an update is presumably
// forced (original lines 559-560 not visible); otherwise the stored addr is
// compared with the map's addr under the DaemonState's lock.
// NOTE(review): metadata_addr and map_addr are copies; const references
// would suffice.
554 if (daemon_state
.exists(k
)) {
555 auto metadata
= daemon_state
.get(k
);
556 Mutex::Locker
l(metadata
->lock
);
557 if (metadata
->metadata
.empty() ||
558 metadata
->metadata
.count("addr") == 0) {
561 auto metadata_addr
= metadata
->metadata
.at("addr");
562 const auto map_addr
= info
.addr
;
563 update
= metadata_addr
!= stringify(map_addr
);
565 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addr
566 << " != " << stringify(map_addr
) << dendl
;
// Mark the key as updating and issue an async "mds metadata" command;
// MetadataUpdate::finish() clears the updating flag when it completes.
574 daemon_state
.notify_updating(k
);
575 auto c
= new MetadataUpdate(daemon_state
, k
);
577 // Older MDS daemons don't have addr in the metadata, so
578 // fake it if the returned metadata doesn't have the field.
579 c
->set_default("addr", stringify(info
.addr
));
// NOTE(review): info.name is spliced into the JSON command without escaping.
581 std::ostringstream cmd
;
582 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
583 << info
.name
<< "\"}";
584 monc
->start_mon_command(
586 {}, &c
->outbl
, &c
->outs
, c
);
// Remove DaemonState entries for MDS names not seen in this map.
589 daemon_state
.cull("mds", names_exist
);
// Handle an updated MgrMap under `lock`. The module set currently held by
// cluster_state is compared with the new map's; on a difference a respawn
// is logged (the respawn itself and the return values, original lines
// 603-606 and 608-610, are not visible). The visible tail stores the new
// map via cluster_state.set_mgr_map().
592 bool Mgr::got_mgr_map(const MgrMap
& m
)
594 Mutex::Locker
l(lock
);
595 dout(10) << m
<< dendl
;
597 set
<string
> old_modules
;
// NOTE(review): the lambda parameter `m` shadows the function's `m`; inside
// the lambda it is the MgrMap currently held by cluster_state, not the new
// one.
598 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
599 old_modules
= m
.modules
;
601 if (m
.modules
!= old_modules
) {
602 derr
<< "mgrmap module list changed to (" << m
.modules
<< "), respawn"
607 cluster_state
.set_mgr_map(m
);
// Handle an MMgrDigest from the mons (ms_dispatch calls this with `lock`
// held). The function:
//  - loads the digest into cluster_state;
//  - notifies the Python modules of mon_status, health and pg_summary;
//  - wakes the initialization wait on digest_cond the first time;
//  - sends a report via the DaemonServer.
612 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
614 dout(10) << m
->mon_status_json
.length() << dendl
;
615 dout(10) << m
->health_json
.length() << dendl
;
616 cluster_state
.load_digest(m
);
617 py_modules
.notify_all("mon_status", "");
618 py_modules
.notify_all("health", "");
620 // Hack: use this as a tick/opportunity to prompt python-land that
621 // the pgmap might have changed since last time we were here.
622 py_modules
.notify_all("pg_summary", "");
623 dout(10) << "done." << dendl
;
// First digest only: set the flag and wake the waiter on digest_cond.
627 if (!digest_received
) {
628 digest_received
= true;
629 digest_cond
.Signal();
// NOTE(review): original lines 624-626 and 630-635 are not visible;
// presumably send_report() runs on every digest.
636 server
.send_report();
639 std::vector
<MonCommand
> Mgr::get_command_set() const
641 Mutex::Locker
l(lock
);
643 std::vector
<MonCommand
> commands
= mgr_commands
;
644 std::vector
<MonCommand
> py_commands
= py_modules
.get_commands();
645 commands
.insert(commands
.end(), py_commands
.begin(), py_commands
.end());