1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
26 #include "MgrPyModule.h"
27 #include "DaemonServer.h"
28 #include "messages/MMgrBeacon.h"
29 #include "messages/MMgrDigest.h"
30 #include "messages/MCommand.h"
31 #include "messages/MCommandReply.h"
32 #include "messages/MLog.h"
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_mgr
39 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Construct the manager from the passed-in mon client, client messenger,
// objecter, client and the two log channels.
// NOTE(review): several initializer lines are not visible in this extract;
// the comments below describe only what is shown.
42 Mgr::Mgr(MonClient
*monc_
, Messenger
*clientm_
, Objecter
*objecter_
,
43 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
47 client_messenger(clientm_
),
// The timer is built on the global CephContext and this object's `lock`.
49 timer(g_ceph_context
, lock
),
50 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
// py_modules receives references to the daemon/cluster state plus the
// dereferenced monc, objecter and client.
51 py_modules(daemon_state
, cluster_state
, *monc
, *objecter
, *client
,
// cluster_state is constructed with nullptr as its second argument; the
// objecter is handed over afterwards via set_objecter() (see the end of this
// constructor).
53 cluster_state(monc
, nullptr),
54 server(monc
, finisher
, daemon_state
, cluster_state
, py_modules
,
59 cluster_state
.set_objecter(objecter
);
69 * Context for completion of metadata mon commands: take
70 * the result and stash it in DaemonStateIndex
// Completion context for a metadata mon command. finish() receives the
// command's result code and folds the returned metadata into the
// DaemonStateIndex entry identified by `key`.
// NOTE(review): braces, some member declarations and several guard
// conditions are not visible in this extract; comments describe only the
// lines that are shown.
72 class MetadataUpdate
: public Context
74 DaemonStateIndex
&daemon_state
;
// Fallback metadata values; finish() applies one only when the returned
// object has no entry for that key.
77 std::map
<std::string
, std::string
> defaults
;
// Bind the context to the index it will update and to the daemon's key.
83 MetadataUpdate(DaemonStateIndex
&daemon_state_
, const DaemonKey
&key_
)
84 : daemon_state(daemon_state_
), key(key_
) {}
// Register fallback value `v` for metadata key `k`.
// NOTE(review): the body is not visible here; presumably it stores the pair
// into `defaults` - confirm.
86 void set_default(const std::string
&k
, const std::string
&v
)
// Completion callback. `r` is the result code of the mon command; it is
// reported through cpp_strerror() on the failure path at the end.
91 void finish(int r
) override
// The request is no longer in flight, so clear the "updating" marker first.
93 daemon_state
.clear_updating(key
);
// MDS results: parse the output buffer as JSON.
95 if (key
.first
== CEPH_ENTITY_TYPE_MDS
) {
96 json_spirit::mValue json_result
;
97 bool read_ok
= json_spirit::read(
98 outbl
.to_str(), json_result
);
// Logged for unparseable output (the check on read_ok is not visible in
// this extract).
100 dout(1) << "mon returned invalid JSON for "
101 << ceph_entity_type_name(key
.first
)
102 << "." << key
.second
<< dendl
;
// Work on a copy of the top-level JSON object.
106 json_spirit::mObject daemon_meta
= json_result
.get_obj();
108 // Apply any defaults
109 for (const auto &i
: defaults
) {
110 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
111 daemon_meta
[i
.first
] = i
.second
;
115 DaemonStatePtr state
;
// Known daemon: drop "name" and "hostname" from the returned object, then
// replace the stored metadata wholesale with what remains.
116 if (daemon_state
.exists(key
)) {
117 state
= daemon_state
.get(key
);
119 daemon_meta
.erase("name");
120 daemon_meta
.erase("hostname");
121 state
->metadata
.clear();
// get_str() is called on every value - assumes all metadata values are JSON
// strings; TODO confirm.
122 for (const auto &i
: daemon_meta
) {
123 state
->metadata
[i
.first
] = i
.second
.get_str();
// Unknown daemon: build a fresh DaemonState and insert it into the index.
// NOTE(review): in the visible lines this path does not erase
// "name"/"hostname" before copying, unlike the path above - confirm whether
// that difference is intentional.
126 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
128 state
->hostname
= daemon_meta
.at("hostname").get_str();
130 for (const auto &i
: daemon_meta
) {
131 state
->metadata
[i
.first
] = i
.second
.get_str();
134 daemon_state
.insert(state
);
// OSD results: the body of this branch is not visible in this extract.
136 } else if (key
.first
== CEPH_ENTITY_TYPE_OSD
) {
// Failure report, including the textual form of `r` (the condition guarding
// it is not visible in this extract).
141 dout(1) << "mon failed to return metadata for "
142 << ceph_entity_type_name(key
.first
)
143 << "." << key
.second
<< ": " << cpp_strerror(r
) << dendl
;
// Kick off initialization in the background: under `lock`, check that
// initialization has neither started nor finished, then queue a
// FunctionContext on the finisher.
// NOTE(review): the lambda body and any statements between the asserts and
// the queue() call are not visible in this extract.
149 void Mgr::background_init()
151 Mutex::Locker
l(lock
);
152 assert(!initializing
);
153 assert(!initialized
);
158 finisher
.queue(new FunctionContext([this](int r
){
// NOTE(review): the header of the function these lines belong to is not
// visible in this extract. The asserts below require `initializing` to be
// set already, and a later comment refers to background_init as a separate
// step, so this is presumably the initialization routine queued from there.
165 Mutex::Locker
l(lock
);
166 assert(initializing
);
167 assert(!initialized
);
169 // Start communicating with daemons to learn statistics etc
170 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddr());
// Failure log; the condition guarding it is not visible in this extract.
172 derr
<< "Initialize server fail"<< dendl
;
175 dout(4) << "Initialized server at " << server
.get_myaddr() << dendl
;
177 // Preload all daemon metadata (will subsequently keep this
178 // up to date by watching maps, so do the initial load before
179 // we subscribe to any maps)
180 dout(4) << "Loading daemon metadata..." << dendl
;
183 // subscribe to all the maps
184 monc
->sub_want("log-info", 0, 0);
185 monc
->sub_want("mgrdigest", 0, 0);
186 monc
->sub_want("fsmap", 0, 0);
188 dout(4) << "waiting for OSDMap..." << dendl
;
189 // Subscribe to OSDMap update to pass on to ClusterState
190 objecter
->maybe_request_map();
192 // reset the mon session. we get these maps through subscriptions which
193 // are stateful with the connection, so even if *we* don't have them a
194 // previous incarnation sharing the same MonClient may have.
195 monc
->reopen_session();
197 // Start Objecter and wait for OSD map
// NOTE(review): the matching re-lock after the wait is not visible in this
// extract; the fs_map_cond.Wait(lock) further down presumably needs the lock
// held again - confirm.
198 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
199 objecter
->wait_for_osd_map();
202 // Populate PGs in ClusterState
203 objecter
->with_osdmap([this](const OSDMap
&osd_map
) {
204 cluster_state
.notify_osdmap(osd_map
);
// Block until cluster_state reports an FSMap; handle_fs_map() signals
// fs_map_cond when one arrives.
208 dout(4) << "waiting for FSMap..." << dendl
;
209 while (!cluster_state
.have_fsmap()) {
210 fs_map_cond
.Wait(lock
);
213 dout(4) << "waiting for config-keys..." << dendl
;
215 // Preload config keys (`get` for plugins is to be a fast local
216 // operation, and we don't have to synchronize these later because
217 // all sets will come via mgr)
220 // Wait for MgrDigest...?
223 // assume finisher already initialized in background_init
// Initialization is over: clear the in-progress flag (any assignment to
// `initialized` is not visible in this extract).
228 dout(4) << "Complete." << dendl
;
229 initializing
= false;
// Query the mon for MDS, OSD and mon metadata and insert one DaemonState per
// returned entry into daemon_state. Must be called with `lock` held.
// NOTE(review): the declarations of the command objects, any wait for their
// completion and the loop `continue` statements are not visible in this
// extract.
233 void Mgr::load_all_metadata()
235 assert(lock
.is_locked_by_me());
// Issue the three metadata queries.
238 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
240 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
242 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
// A non-zero result from any of the three commands trips an assert.
250 assert(mds_cmd
.r
== 0);
251 assert(mon_cmd
.r
== 0);
252 assert(osd_cmd
.r
== 0);
// MDS entries: keyed by the entry's "name" field.
254 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
255 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
// Entries without a "hostname" are reported as incomplete.
256 if (daemon_meta
.count("hostname") == 0) {
257 dout(1) << "Skipping incomplete metadata entry" << dendl
;
261 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
262 dm
->key
= DaemonKey(CEPH_ENTITY_TYPE_MDS
,
263 daemon_meta
.at("name").get_str());
264 dm
->hostname
= daemon_meta
.at("hostname").get_str();
// "name" and "hostname" are already captured in key/hostname, so they are
// removed before the remaining fields are copied into dm->metadata.
266 daemon_meta
.erase("name");
267 daemon_meta
.erase("hostname");
// get_str() on every value - assumes all remaining values are JSON strings;
// TODO confirm.
269 for (const auto &i
: daemon_meta
) {
270 dm
->metadata
[i
.first
] = i
.second
.get_str();
273 daemon_state
.insert(dm
);
// Mon entries: same handling as the MDS entries, with the MON entity type.
276 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
277 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
278 if (daemon_meta
.count("hostname") == 0) {
279 dout(1) << "Skipping incomplete metadata entry" << dendl
;
283 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
284 dm
->key
= DaemonKey(CEPH_ENTITY_TYPE_MON
,
285 daemon_meta
.at("name").get_str());
286 dm
->hostname
= daemon_meta
.at("hostname").get_str();
288 daemon_meta
.erase("name");
289 daemon_meta
.erase("hostname");
291 for (const auto &i
: daemon_meta
) {
292 dm
->metadata
[i
.first
] = i
.second
.get_str();
295 daemon_state
.insert(dm
);
// OSD entries: keyed by the stringified integer "id" field rather than a
// "name" field.
298 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
299 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
300 if (osd_metadata
.count("hostname") == 0) {
301 dout(1) << "Skipping incomplete metadata entry" << dendl
;
304 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
306 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
307 dm
->key
= DaemonKey(CEPH_ENTITY_TYPE_OSD
,
308 stringify(osd_metadata
.at("id").get_int()));
309 dm
->hostname
= osd_metadata
.at("hostname").get_str();
311 osd_metadata
.erase("id");
312 osd_metadata
.erase("hostname");
314 for (const auto &i
: osd_metadata
) {
315 dm
->metadata
[i
.first
] = i
.second
.get_str();
318 daemon_state
.insert(dm
);
// List the mon's config keys, fetch the value of every key that starts with
// PyModules::config_prefix, and hand the resulting key/value map to
// py_modules. Must be called with `lock` held.
// NOTE(review): the declarations of `cmd`/`get_cmd` and any wait for their
// completion are not visible in this extract.
322 void Mgr::load_config()
324 assert(lock
.is_locked_by_me());
326 dout(10) << "listing keys" << dendl
;
328 cmd
.run(monc
, "{\"prefix\": \"config-key list\"}");
// Values collected for the matching keys.
334 std::map
<std::string
, std::string
> loaded
;
336 for (auto &key_str
: cmd
.json_result
.get_array()) {
337 std::string
const key
= key_str
.get_str();
338 dout(20) << "saw key '" << key
<< "'" << dendl
;
340 const std::string config_prefix
= PyModules::config_prefix
;
// Prefix test: only keys beginning with config_prefix are fetched.
342 if (key
.substr(0, config_prefix
.size()) == config_prefix
) {
343 dout(20) << "fetching '" << key
<< "'" << dendl
;
// NOTE(review): `key` is spliced into the JSON command text without
// escaping; a key containing a double quote or backslash would yield a
// malformed command - confirm that keys are constrained.
345 std::ostringstream cmd_json
;
346 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
347 get_cmd
.run(monc
, cmd_json
.str());
// A failed fetch trips an assert; otherwise the raw output is stored.
351 assert(get_cmd
.r
== 0);
352 loaded
[key
] = get_cmd
.outbl
.to_str();
356 py_modules
.insert_config(loaded
);
// NOTE(review): the header of the function these lines belong to is not
// visible in this extract; judging by the comments below it is presumably
// the manager's shutdown routine.
// Teardown work is queued on the finisher: under `lock`, the three mon
// subscriptions requested during initialization ("log-info", "mgrdigest",
// "fsmap") are dropped. Where the queued lambda ends is not visible here.
361 finisher
.queue(new FunctionContext([&](int) {
363 Mutex::Locker
l(lock
);
364 monc
->sub_unwant("log-info");
365 monc
->sub_unwant("mgrdigest");
366 monc
->sub_unwant("fsmap");
367 // First stop the server so that we're not taking any more incoming
371 // after the messenger is stopped, signal modules to shutdown via finisher
372 py_modules
.shutdown();
375 // Then stop the finisher to ensure its enqueued contexts aren't going
376 // to touch references to the things we're about to tear down
377 finisher
.wait_for_empty();
// React to a new OSDMap: for each existing OSD decide whether its metadata
// needs (re)loading and, if so, start an "osd metadata" mon command; then
// pass the map to cluster_state and cull OSD entries that no longer exist.
// Must be called with `lock` held.
// NOTE(review): the assignments to `update_meta`, the guard around the
// metadata request and the `continue` statements are not visible in this
// extract.
381 void Mgr::handle_osd_map()
383 assert(lock
.is_locked_by_me());
// Stringified ids of the OSDs present in the map; consumed by cull() at the
// end.
385 std::set
<std::string
> names_exist
;
388 * When we see a new OSD map, inspect the entity addrs to
389 * see if they have changed (service restart), and if so
390 * reload the metadata.
392 objecter
->with_osdmap([this, &names_exist
](const OSDMap
&osd_map
) {
393 for (unsigned int osd_id
= 0; osd_id
< osd_map
.get_num_osds(); ++osd_id
) {
// Ids that the map says do not exist are checked for here.
394 if (!osd_map
.exists(osd_id
)) {
398 // Remember which OSDs exist so that we can cull any that don't
399 names_exist
.insert(stringify(osd_id
));
401 // Consider whether to update the daemon metadata (new/restarted daemon)
402 bool update_meta
= false;
403 const auto k
= DaemonKey(CEPH_ENTITY_TYPE_OSD
, stringify(osd_id
));
// A metadata request for this daemon is already in flight.
404 if (daemon_state
.is_updating(k
)) {
// Known daemon: compare the stored "front_addr" metadata with the address
// the map reports for this OSD.
408 if (daemon_state
.exists(k
)) {
409 auto metadata
= daemon_state
.get(k
);
410 auto addr_iter
= metadata
->metadata
.find("front_addr");
411 if (addr_iter
!= metadata
->metadata
.end()) {
412 const std::string
&metadata_addr
= addr_iter
->second
;
413 const auto &map_addr
= osd_map
.get_addr(osd_id
);
415 if (metadata_addr
!= stringify(map_addr
)) {
416 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
417 << " != " << stringify(map_addr
) << dendl
;
420 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
421 << metadata_addr
<< dendl
;
424 // Awkward case where daemon went into DaemonState because it
425 // sent us a report but its metadata didn't get loaded yet
// Mark the daemon as updating and start the metadata query; `c` is passed as
// the completion and also supplies the output buffers.
// NOTE(review): `c` is a raw `new`; presumably the completion machinery
// takes ownership - confirm.
433 daemon_state
.notify_updating(k
);
434 auto c
= new MetadataUpdate(daemon_state
, k
);
435 std::ostringstream cmd
;
436 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
438 monc
->start_mon_command(
440 {}, &c
->outbl
, &c
->outs
, c
);
444 cluster_state
.notify_osdmap(osd_map
);
447 // TODO: same culling for MonMap
448 daemon_state
.cull(CEPH_ENTITY_TYPE_OSD
, names_exist
);
// Forward every entry of the log message `m` to the python modules, one
// notify_all() call per entry.
451 void Mgr::handle_log(MLog
*m
)
453 for (const auto &e
: m
->entries
) {
454 py_modules
.notify_all(e
);
// Message entry point: log the message, take `lock`, and dispatch on the
// message type to the matching handler and/or python-module notification.
// NOTE(review): several `case` labels and the break/return statements are
// not visible in this extract, so the full return contract cannot be read
// from here.
460 bool Mgr::ms_dispatch(Message
*m
)
462 dout(4) << *m
<< dendl
;
463 Mutex::Locker
l(lock
);
465 switch (m
->get_type()) {
467 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
469 case CEPH_MSG_MON_MAP
:
470 py_modules
.notify_all("mon_map", "");
473 case CEPH_MSG_FS_MAP
:
474 py_modules
.notify_all("fs_map", "");
// NOTE(review): C-style cast here, unlike the static_cast used for the other
// message types in this function.
475 handle_fs_map((MFSMap
*)m
);
476 return false; // I shall let this pass through for Client
478 case CEPH_MSG_OSD_MAP
:
481 py_modules
.notify_all("osd_map", "");
483 // Continuous subscribe, so that we can generate notifications
484 // for our MgrPyModules
485 objecter
->maybe_request_map();
489 handle_log(static_cast<MLog
*>(m
));
// React to a new FSMap: store it in cluster_state, wake anyone waiting on
// fs_map_cond, request fresh "mds metadata" for MDS daemons that need it,
// and cull MDS entries that are no longer in the map. Must be called with
// `lock` held.
// NOTE(review): the declaration of `update`, the guard around the metadata
// request and the `continue` statements are not visible in this extract.
499 void Mgr::handle_fs_map(MFSMap
* m
)
501 assert(lock
.is_locked_by_me());
// Names of the MDS daemons present in the map; consumed by cull() at the
// end.
503 std::set
<std::string
> names_exist
;
505 const FSMap
&new_fsmap
= m
->get_fsmap();
// Wake any waiter on fs_map_cond (the initialization code waits on it until
// cluster_state.have_fsmap() is true).
507 fs_map_cond
.Signal();
509 // TODO: callers (e.g. from python land) are potentially going to see
510 // the new fsmap before we've bothered populating all the resulting
511 // daemon_state. Maybe we should block python land while we're making
512 // this kind of update?
514 cluster_state
.set_fsmap(new_fsmap
);
516 auto mds_info
= new_fsmap
.get_mds_info();
517 for (const auto &i
: mds_info
) {
518 const auto &info
= i
.second
;
// Entries whose gid the map does not recognise are checked for here.
520 if (!new_fsmap
.gid_exists(i
.first
)){
524 // Remember which MDS exists so that we can cull any that don't
525 names_exist
.insert(info
.name
);
527 const auto k
= DaemonKey(CEPH_ENTITY_TYPE_MDS
, info
.name
);
// A metadata request for this daemon is already in flight.
528 if (daemon_state
.is_updating(k
)) {
// Known daemon: check for empty metadata or a missing "addr"; otherwise
// compare the stored "addr" with the address in the map.
533 if (daemon_state
.exists(k
)) {
534 auto metadata
= daemon_state
.get(k
);
535 if (metadata
->metadata
.empty() ||
536 metadata
->metadata
.count("addr") == 0) {
539 auto metadata_addr
= metadata
->metadata
.at("addr");
540 const auto map_addr
= info
.addr
;
541 update
= metadata_addr
!= stringify(map_addr
);
543 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addr
544 << " != " << stringify(map_addr
) << dendl
;
// Mark the daemon as updating and prepare the completion context.
// NOTE(review): `c` is a raw `new`; presumably the completion machinery
// takes ownership - confirm.
552 daemon_state
.notify_updating(k
);
553 auto c
= new MetadataUpdate(daemon_state
, k
);
555 // Older MDS daemons don't have addr in the metadata, so
556 // fake it if the returned metadata doesn't have the field.
557 c
->set_default("addr", stringify(info
.addr
));
// NOTE(review): info.name is spliced into the JSON command text without
// escaping - confirm that MDS names cannot contain quotes or backslashes.
559 std::ostringstream cmd
;
560 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
561 << info
.name
<< "\"}";
562 monc
->start_mon_command(
564 {}, &c
->outbl
, &c
->outs
, c
);
567 daemon_state
.cull(CEPH_ENTITY_TYPE_MDS
, names_exist
);
// Handle a digest message: log the lengths of its mon-status and health JSON
// payloads, load it into cluster_state, then notify the python modules.
571 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
573 dout(10) << m
->mon_status_json
.length() << dendl
;
574 dout(10) << m
->health_json
.length() << dendl
;
575 cluster_state
.load_digest(m
);
576 py_modules
.notify_all("mon_status", "");
577 py_modules
.notify_all("health", "");
579 // Hack: use this as a tick/opportunity to prompt python-land that
580 // the pgmap might have changed since last time we were here.
581 py_modules
.notify_all("pg_summary", "");
582 dout(10) << "done." << dendl
;
// NOTE(review): the gap in the embedded line numbers suggests the statement
// below belongs to a later function whose header is not visible in this
// extract. It asks the DaemonServer to send its report.
590 server
.send_report();