1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
24 #include "mgr/MgrContext.h"
26 #include "MgrPyModule.h"
27 #include "DaemonServer.h"
28 #include "messages/MMgrBeacon.h"
29 #include "messages/MMgrDigest.h"
30 #include "messages/MCommand.h"
31 #include "messages/MCommandReply.h"
32 #include "messages/MLog.h"
33 #include "messages/MServiceMap.h"
37 #define dout_context g_ceph_context
38 #define dout_subsys ceph_subsys_mgr
40 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Construct the manager from collaborators supplied by the caller.
//
// Initializers visible here: client_messenger takes clientm_; timer and
// finisher are both built on g_ceph_context (the timer also receives `lock`,
// the finisher is created with the strings "Mgr" and "mgr-fin");
// digest_received starts out false; py_modules, cluster_state and server are
// wired to the shared daemon_state / cluster_state / monc / objecter / client
// members.
// NOTE(review): this listing does not show every initializer (nothing here
// stores monc_, objecter_, client_ or audit_clog_, and the argument lists of
// py_modules and server are cut short) -- confirm against the complete file.
43 Mgr::Mgr(MonClient
*monc_
, const MgrMap
& mgrmap
,
44 Messenger
*clientm_
, Objecter
*objecter_
,
45 Client
* client_
, LogChannelRef clog_
, LogChannelRef audit_clog_
) :
49 client_messenger(clientm_
),
51 timer(g_ceph_context
, lock
),
52 finisher(g_ceph_context
, "Mgr", "mgr-fin"),
// No digest has been received at construction time.
53 digest_received(false),
54 py_modules(daemon_state
, cluster_state
, *monc
, clog_
, *objecter
, *client
,
// ClusterState receives the mon client, a null second argument and the
// MgrMap passed to this constructor.
56 cluster_state(monc
, nullptr, mgrmap
),
57 server(monc
, finisher
, daemon_state
, cluster_state
, py_modules
,
// Hand the objecter to ClusterState.
62 cluster_state
.set_objecter(objecter
);
72 * Context for completion of metadata mon commands: take
73 * the result and stash it in DaemonStateIndex
// Completion Context for a per-daemon metadata mon command.
//
// finish(r) first clears the "updating" mark for `key` in daemon_state. For
// an "mds" key it then parses `outbl` as JSON, fills in any missing fields
// from `defaults`, and either refreshes the existing DaemonState entry or
// creates and inserts a new one. An "osd" branch exists but its body is not
// visible here.
// NOTE(review): the declarations of `key` and `outbl`, the body of
// set_default(), and the conditions guarding the two error logs are not
// part of this listing -- confirm against the complete file.
75 class MetadataUpdate
: public Context
// Index that receives the parsed metadata; held by reference.
77 DaemonStateIndex
&daemon_state
;
// Fallback key/value pairs; finish() applies one only when the parsed
// metadata object has no entry for that key.
80 std::map
<std::string
, std::string
> defaults
;
86 MetadataUpdate(DaemonStateIndex
&daemon_state_
, const DaemonKey
&key_
)
87 : daemon_state(daemon_state_
), key(key_
) {}
// Takes a key/value pair of strings; presumably records it in `defaults`
// (body not visible -- TODO confirm).
89 void set_default(const std::string
&k
, const std::string
&v
)
// Completion callback; `r` is the command result that cpp_strerror()
// formats in the failure log below.
94 void finish(int r
) override
// The request is no longer in flight for this daemon.
96 daemon_state
.clear_updating(key
);
98 if (key
.first
== "mds") {
99 json_spirit::mValue json_result
;
100 bool read_ok
= json_spirit::read(
101 outbl
.to_str(), json_result
);
// Presumably logged only when read_ok is false (guard not visible).
103 dout(1) << "mon returned invalid JSON for "
104 << key
.first
<< "." << key
.second
<< dendl
;
108 json_spirit::mObject daemon_meta
= json_result
.get_obj();
110 // Apply any defaults
111 for (const auto &i
: defaults
) {
112 if (daemon_meta
.find(i
.first
) == daemon_meta
.end()) {
113 daemon_meta
[i
.first
] = i
.second
;
117 DaemonStatePtr state
;
118 if (daemon_state
.exists(key
)) {
// Existing entry: drop "name" and "hostname" from the parsed object, then
// replace the stored metadata wholesale with the remaining fields.
119 state
= daemon_state
.get(key
);
121 daemon_meta
.erase("name");
122 daemon_meta
.erase("hostname");
123 state
->metadata
.clear();
124 for (const auto &i
: daemon_meta
) {
125 state
->metadata
[i
.first
] = i
.second
.get_str();
// New entry: build a fresh DaemonState and insert it into the index.
// NOTE(review): unlike the existing-entry path, no erase of "name" /
// "hostname" is visible before the copy loop here -- confirm intended.
128 state
= std::make_shared
<DaemonState
>(daemon_state
.types
);
130 state
->hostname
= daemon_meta
.at("hostname").get_str();
132 for (const auto &i
: daemon_meta
) {
133 state
->metadata
[i
.first
] = i
.second
.get_str();
136 daemon_state
.insert(state
);
138 } else if (key
.first
== "osd") {
// Failure log including the strerror text for r; presumably the branch
// taken when the mon command did not succeed (guard not visible).
143 dout(1) << "mon failed to return metadata for "
144 << key
.first
<< "." << key
.second
<< ": "
145 << cpp_strerror(r
) << dendl
;
// Start initialization via the finisher and report back through `completion`.
//
// Takes `lock`, asserts that the manager is neither initializing nor
// initialized, then queues a FunctionContext on `finisher` that calls
// completion->complete(0).
// NOTE(review): whatever the queued callback runs before completing
// `completion` is not visible in this listing -- confirm against the full
// file.
151 void Mgr::background_init(Context
*completion
)
153 Mutex::Locker
l(lock
);
154 assert(!initializing
);
155 assert(!initialized
);
// The lambda captures `this` and the completion pointer by value.
160 finisher
.queue(new FunctionContext([this, completion
](int r
){
162 completion
->complete(0);
168 Mutex::Locker
l(lock
);
169 assert(initializing
);
170 assert(!initialized
);
172 // Start communicating with daemons to learn statistics etc
173 int r
= server
.init(monc
->get_global_id(), client_messenger
->get_myaddr());
175 derr
<< "Initialize server fail"<< dendl
;
178 dout(4) << "Initialized server at " << server
.get_myaddr() << dendl
;
180 // Preload all daemon metadata (will subsequently keep this
181 // up to date by watching maps, so do the initial load before
182 // we subscribe to any maps)
183 dout(4) << "Loading daemon metadata..." << dendl
;
186 // subscribe to all the maps
187 monc
->sub_want("log-info", 0, 0);
188 monc
->sub_want("mgrdigest", 0, 0);
189 monc
->sub_want("fsmap", 0, 0);
190 monc
->sub_want("servicemap", 0, 0);
192 dout(4) << "waiting for OSDMap..." << dendl
;
193 // Subscribe to OSDMap update to pass on to ClusterState
194 objecter
->maybe_request_map();
196 // reset the mon session. we get these maps through subscriptions which
197 // are stateful with the connection, so even if *we* don't have them a
198 // previous incarnation sharing the same MonClient may have.
199 monc
->reopen_session();
201 // Start Objecter and wait for OSD map
202 lock
.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
203 objecter
->wait_for_osd_map();
206 // Populate PGs in ClusterState
207 objecter
->with_osdmap([this](const OSDMap
&osd_map
) {
208 cluster_state
.notify_osdmap(osd_map
);
212 dout(4) << "waiting for FSMap..." << dendl
;
213 while (!cluster_state
.have_fsmap()) {
214 fs_map_cond
.Wait(lock
);
217 dout(4) << "waiting for config-keys..." << dendl
;
219 // Preload config keys (`get` for plugins is to be a fast local
220 // operation, and we don't have to synchronize these later because
221 // all sets will come via mgr)
224 // Wait for MgrDigest...
225 dout(4) << "waiting for MgrDigest..." << dendl
;
226 while (!digest_received
) {
227 digest_cond
.Wait(lock
);
230 // assume finisher already initialized in background_init
231 dout(4) << "starting PyModules..." << dendl
;
235 dout(4) << "Complete." << dendl
;
236 initializing
= false;
// Load the metadata of all MDS, mon and OSD daemons into daemon_state.
//
// The caller must hold `lock` (asserted). Three mon commands are issued
// ("mds metadata", "osd metadata", "mon metadata"); each result code is
// asserted to be 0. Each reply is read as a JSON array of objects. For every
// object that has a "hostname" field a DaemonState is created, keyed by
// ("mds"|"mon", name) or ("osd", stringified integer id), with the remaining
// fields copied into its metadata map as strings.
// NOTE(review): the command objects' declarations and any wait between
// run() and the asserts are not visible in this listing.
240 void Mgr::load_all_metadata()
242 assert(lock
.is_locked_by_me());
245 mds_cmd
.run(monc
, "{\"prefix\": \"mds metadata\"}");
247 osd_cmd
.run(monc
, "{\"prefix\": \"osd metadata\"}");
249 mon_cmd
.run(monc
, "{\"prefix\": \"mon metadata\"}");
// Any failed command aborts via assert rather than being handled.
257 assert(mds_cmd
.r
== 0);
258 assert(mon_cmd
.r
== 0);
259 assert(osd_cmd
.r
== 0);
// --- MDS daemons ---
261 for (auto &metadata_val
: mds_cmd
.json_result
.get_array()) {
262 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
// Entries without a hostname are logged as skipped.
263 if (daemon_meta
.count("hostname") == 0) {
264 dout(1) << "Skipping incomplete metadata entry" << dendl
;
268 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
269 dm
->key
= DaemonKey("mds",
270 daemon_meta
.at("name").get_str());
271 dm
->hostname
= daemon_meta
.at("hostname").get_str();
// "name" and "hostname" are already stored on the DaemonState, so they are
// removed before the generic copy below.
273 daemon_meta
.erase("name");
274 daemon_meta
.erase("hostname");
276 for (const auto &i
: daemon_meta
) {
277 dm
->metadata
[i
.first
] = i
.second
.get_str();
280 daemon_state
.insert(dm
);
// --- Monitors: same handling as MDS, keyed under "mon" ---
283 for (auto &metadata_val
: mon_cmd
.json_result
.get_array()) {
284 json_spirit::mObject daemon_meta
= metadata_val
.get_obj();
285 if (daemon_meta
.count("hostname") == 0) {
286 dout(1) << "Skipping incomplete metadata entry" << dendl
;
290 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
291 dm
->key
= DaemonKey("mon",
292 daemon_meta
.at("name").get_str());
293 dm
->hostname
= daemon_meta
.at("hostname").get_str();
295 daemon_meta
.erase("name");
296 daemon_meta
.erase("hostname");
298 for (const auto &i
: daemon_meta
) {
299 dm
->metadata
[i
.first
] = i
.second
.get_str();
302 daemon_state
.insert(dm
);
// --- OSDs: identified by the integer "id" field instead of "name" ---
305 for (auto &osd_metadata_val
: osd_cmd
.json_result
.get_array()) {
306 json_spirit::mObject osd_metadata
= osd_metadata_val
.get_obj();
307 if (osd_metadata
.count("hostname") == 0) {
308 dout(1) << "Skipping incomplete metadata entry" << dendl
;
311 dout(4) << osd_metadata
.at("hostname").get_str() << dendl
;
313 DaemonStatePtr dm
= std::make_shared
<DaemonState
>(daemon_state
.types
);
314 dm
->key
= DaemonKey("osd",
315 stringify(osd_metadata
.at("id").get_int()));
316 dm
->hostname
= osd_metadata
.at("hostname").get_str();
318 osd_metadata
.erase("id");
319 osd_metadata
.erase("hostname");
321 for (const auto &i
: osd_metadata
) {
322 dm
->metadata
[i
.first
] = i
.second
.get_str();
325 daemon_state
.insert(dm
);
// Fetch config-key entries carrying the PyModules prefix and hand them to
// py_modules.
//
// The caller must hold `lock` (asserted). A "config-key list" mon command
// yields a JSON array of key strings; for each key that begins with
// PyModules::config_prefix a "config-key get" command is issued, its result
// code asserted to be 0, and the returned buffer stored as a string in
// `loaded`. The finished map is passed to py_modules.insert_config().
// NOTE(review): declarations of `cmd` / `get_cmd` and any waits on them are
// not visible in this listing.
329 void Mgr::load_config()
331 assert(lock
.is_locked_by_me());
333 dout(10) << "listing keys" << dendl
;
335 cmd
.run(monc
, "{\"prefix\": \"config-key list\"}");
// Key -> raw value, only for keys matching the module config prefix.
341 std::map
<std::string
, std::string
> loaded
;
343 for (auto &key_str
: cmd
.json_result
.get_array()) {
344 std::string
const key
= key_str
.get_str();
345 dout(20) << "saw key '" << key
<< "'" << dendl
;
347 const std::string config_prefix
= PyModules::config_prefix
;
// Prefix test: compare the leading config_prefix.size() characters of the key.
349 if (key
.substr(0, config_prefix
.size()) == config_prefix
) {
350 dout(20) << "fetching '" << key
<< "'" << dendl
;
// NOTE(review): the key is spliced into the JSON command text as-is; no
// escaping of quotes or backslashes is visible here.
352 std::ostringstream cmd_json
;
353 cmd_json
<< "{\"prefix\": \"config-key get\", \"key\": \"" << key
<< "\"}";
354 get_cmd
.run(monc
, cmd_json
.str());
358 assert(get_cmd
.r
== 0);
359 loaded
[key
] = get_cmd
.outbl
.to_str();
363 py_modules
.insert_config(loaded
);
368 finisher
.queue(new FunctionContext([&](int) {
370 Mutex::Locker
l(lock
);
371 monc
->sub_unwant("log-info");
372 monc
->sub_unwant("mgrdigest");
373 monc
->sub_unwant("fsmap");
374 // First stop the server so that we're not taking any more incoming
378 // after the messenger is stopped, signal modules to shutdown via finisher
379 py_modules
.shutdown();
382 // Then stop the finisher to ensure its enqueued contexts aren't going
383 // to touch references to the things we're about to tear down
384 finisher
.wait_for_empty();
// React to a new OSDMap: refresh OSD metadata where needed and cull stale
// OSD entries.
//
// The caller must hold `lock` (asserted). Inside objecter->with_osdmap() the
// function walks osd ids 0 .. get_num_osds()-1, records each existing id in
// names_exist, and for daemons known to daemon_state compares the stored
// "front_addr" metadata with the address in the map. Where a refresh is
// wanted it marks the key as updating and issues an "osd metadata" mon
// command whose completion is a MetadataUpdate. Finally the map is passed
// to cluster_state.notify_osdmap() and daemon_state.cull("osd", ...) is
// called with the ids that exist.
// NOTE(review): the assignments to update_meta and the statements inside
// several branches (e.g. for a non-existent id or an in-flight update) are
// not visible in this listing.
388 void Mgr::handle_osd_map()
390 assert(lock
.is_locked_by_me());
// Stringified ids of OSDs present in the map; used for culling at the end.
392 std::set
<std::string
> names_exist
;
395 * When we see a new OSD map, inspect the entity addrs to
396 * see if they have changed (service restart), and if so
397 * reload the metadata.
399 objecter
->with_osdmap([this, &names_exist
](const OSDMap
&osd_map
) {
400 for (unsigned int osd_id
= 0; osd_id
< osd_map
.get_num_osds(); ++osd_id
) {
401 if (!osd_map
.exists(osd_id
)) {
405 // Remember which OSDs exist so that we can cull any that don't
406 names_exist
.insert(stringify(osd_id
));
408 // Consider whether to update the daemon metadata (new/restarted daemon)
409 bool update_meta
= false;
410 const auto k
= DaemonKey("osd", stringify(osd_id
));
411 if (daemon_state
.is_updating(k
)) {
415 if (daemon_state
.exists(k
)) {
416 auto metadata
= daemon_state
.get(k
);
417 auto addr_iter
= metadata
->metadata
.find("front_addr");
418 if (addr_iter
!= metadata
->metadata
.end()) {
// Compare the stored address string with the stringified map address.
419 const std::string
&metadata_addr
= addr_iter
->second
;
420 const auto &map_addr
= osd_map
.get_addr(osd_id
);
422 if (metadata_addr
!= stringify(map_addr
)) {
423 dout(4) << "OSD[" << osd_id
<< "] addr change " << metadata_addr
424 << " != " << stringify(map_addr
) << dendl
;
427 dout(20) << "OSD[" << osd_id
<< "] addr unchanged: "
428 << metadata_addr
<< dendl
;
431 // Awkward case where daemon went into DaemonState because it
432 // sent us a report but its metadata didn't get loaded yet
// Mark the key as updating and start the asynchronous "osd metadata"
// command; the MetadataUpdate `c` supplies the output buffers and is also
// passed as the completion.
440 daemon_state
.notify_updating(k
);
441 auto c
= new MetadataUpdate(daemon_state
, k
);
442 std::ostringstream cmd
;
443 cmd
<< "{\"prefix\": \"osd metadata\", \"id\": "
445 monc
->start_mon_command(
447 {}, &c
->outbl
, &c
->outs
, c
);
451 cluster_state
.notify_osdmap(osd_map
);
454 // TODO: same culling for MonMap
455 daemon_state
.cull("osd", names_exist
);
// Forward every entry of a log message to the Python modules.
//
// Each element of m->entries is passed to py_modules.notify_all().
// NOTE(review): whether the message reference is released here is not
// visible in this listing -- confirm against the complete file.
458 void Mgr::handle_log(MLog
*m
)
460 for (const auto &e
: m
->entries
) {
461 py_modules
.notify_all(e
);
// Apply a received service map.
//
// Logs the map's epoch at debug level 10, stores the map in cluster_state
// and then tells the server that a service map has arrived.
467 void Mgr::handle_service_map(MServiceMap
*m
)
469 dout(10) << "e" << m
->service_map
.epoch
<< dendl
;
470 cluster_state
.set_service_map(m
->service_map
);
471 server
.got_service_map();
// Messenger dispatch entry point: route an incoming message by type.
//
// Logs the message at debug level 4, takes `lock`, then switches on
// m->get_type(). Branches visible here: MMgrDigest handling, mon map, FS
// map, OSD map, service map and log messages; the map branches also send a
// notification to the Python modules.
// NOTE(review): some `case` labels, the `break` / `return` statements other
// than the FS-map one, and the default branch are not visible in this
// listing, so the return value for most message types cannot be stated
// from here.
474 bool Mgr::ms_dispatch(Message
*m
)
476 dout(4) << *m
<< dendl
;
477 Mutex::Locker
l(lock
);
479 switch (m
->get_type()) {
481 handle_mgr_digest(static_cast<MMgrDigest
*>(m
));
483 case CEPH_MSG_MON_MAP
:
484 py_modules
.notify_all("mon_map", "");
487 case CEPH_MSG_FS_MAP
:
// Order as written: modules are notified before handle_fs_map() runs.
488 py_modules
.notify_all("fs_map", "");
489 handle_fs_map((MFSMap
*)m
);
490 return false; // I shall let this pass through for Client
492 case CEPH_MSG_OSD_MAP
:
495 py_modules
.notify_all("osd_map", "");
497 // Continuous subscribe, so that we can generate notifications
498 // for our MgrPyModules
499 objecter
->maybe_request_map();
502 case MSG_SERVICE_MAP
:
// Here the map is applied first and the modules are notified afterwards.
503 handle_service_map((MServiceMap
*)m
);
504 py_modules
.notify_all("service_map", "");
508 handle_log(static_cast<MLog
*>(m
));
// React to a new FSMap: store it, refresh MDS metadata where needed and cull
// stale MDS entries.
//
// The caller must hold `lock` (asserted). The function signals fs_map_cond,
// stores the map in cluster_state, then walks the map's MDS info. Each MDS
// name is recorded in names_exist; for daemons already in daemon_state the
// stored "addr" metadata is compared with the address in the map. Where a
// refresh is wanted the key is marked as updating and an "mds metadata" mon
// command is issued with a MetadataUpdate completion, which is given the
// map address as the default for "addr". Finally
// daemon_state.cull("mds", ...) is called with the names that exist.
// NOTE(review): the declaration of `update` and the statements inside
// several branches (gid missing, update already in flight, metadata
// empty) are not visible in this listing.
518 void Mgr::handle_fs_map(MFSMap
* m
)
520 assert(lock
.is_locked_by_me());
// Names of MDS daemons present in the map; used for culling at the end.
522 std::set
<std::string
> names_exist
;
524 const FSMap
&new_fsmap
= m
->get_fsmap();
// Signal fs_map_cond; note this happens before set_fsmap() below.
526 fs_map_cond
.Signal();
528 // TODO: callers (e.g. from python land) are potentially going to see
529 // the new fsmap before we've bothered populating all the resulting
530 // daemon_state. Maybe we should block python land while we're making
531 // this kind of update?
533 cluster_state
.set_fsmap(new_fsmap
);
// `auto` without a reference: mds_info is a local copy of whatever
// get_mds_info() returns.
535 auto mds_info
= new_fsmap
.get_mds_info();
536 for (const auto &i
: mds_info
) {
537 const auto &info
= i
.second
;
539 if (!new_fsmap
.gid_exists(i
.first
)){
543 // Remember which MDS exists so that we can cull any that don't
544 names_exist
.insert(info
.name
);
546 const auto k
= DaemonKey("mds", info
.name
);
547 if (daemon_state
.is_updating(k
)) {
552 if (daemon_state
.exists(k
)) {
553 auto metadata
= daemon_state
.get(k
);
554 if (metadata
->metadata
.empty() ||
555 metadata
->metadata
.count("addr") == 0) {
// An update is wanted when the stored address string differs from the
// stringified map address.
558 auto metadata_addr
= metadata
->metadata
.at("addr");
559 const auto map_addr
= info
.addr
;
560 update
= metadata_addr
!= stringify(map_addr
);
562 dout(4) << "MDS[" << info
.name
<< "] addr change " << metadata_addr
563 << " != " << stringify(map_addr
) << dendl
;
571 daemon_state
.notify_updating(k
);
572 auto c
= new MetadataUpdate(daemon_state
, k
);
574 // Older MDS daemons don't have addr in the metadata, so
575 // fake it if the returned metadata doesn't have the field.
576 c
->set_default("addr", stringify(info
.addr
));
// NOTE(review): info.name is spliced into the JSON command text as-is; no
// escaping is visible here.
578 std::ostringstream cmd
;
579 cmd
<< "{\"prefix\": \"mds metadata\", \"who\": \""
580 << info
.name
<< "\"}";
581 monc
->start_mon_command(
583 {}, &c
->outbl
, &c
->outs
, c
);
586 daemon_state
.cull("mds", names_exist
);
// Handle a new MgrMap and detect a change in the module list.
//
// Takes `lock`, logs the new map at debug level 10, copies the module set of
// the MgrMap currently held by cluster_state into old_modules, and compares
// it with m.modules. A difference is logged via derr with the text
// "... respawn"; the map is then stored with cluster_state.set_mgr_map().
// NOTE(review): the return statements are not visible in this listing, so
// the meaning of the bool result cannot be stated from here (the log text
// suggests it signals that a respawn is wanted -- confirm).
589 bool Mgr::got_mgr_map(const MgrMap
& m
)
591 Mutex::Locker
l(lock
);
592 dout(10) << m
<< dendl
;
594 set
<string
> old_modules
;
// The lambda parameter is also named `m`; inside the lambda it refers to
// the map held by cluster_state, not to this function's argument.
595 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
596 old_modules
= m
.modules
;
// From here on `m` is again the function argument (the incoming map).
598 if (m
.modules
!= old_modules
) {
599 derr
<< "mgrmap module list changed to (" << m
.modules
<< "), respawn"
604 cluster_state
.set_mgr_map(m
);
// Apply an MMgrDigest and notify the Python modules.
//
// Logs the lengths of the mon-status and health JSON payloads, loads the
// digest into cluster_state, and sends "mon_status", "health" and
// "pg_summary" notifications. On the first digest it sets digest_received
// and signals digest_cond.
// NOTE(review): whether the message reference is released here is not
// visible in this listing -- confirm against the complete file.
609 void Mgr::handle_mgr_digest(MMgrDigest
* m
)
611 dout(10) << m
->mon_status_json
.length() << dendl
;
612 dout(10) << m
->health_json
.length() << dendl
;
613 cluster_state
.load_digest(m
);
614 py_modules
.notify_all("mon_status", "");
615 py_modules
.notify_all("health", "");
617 // Hack: use this as a tick/opportunity to prompt python-land that
618 // the pgmap might have changed since last time we were here.
619 py_modules
.notify_all("pg_summary", "");
620 dout(10) << "done." << dendl
;
// One-shot: only the first digest flips the flag and signals the condition.
624 if (!digest_received
) {
625 digest_received
= true;
626 digest_cond
.Signal();
633 server
.send_report();