]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/Mgr.cc
update sources to v12.2.5
[ceph.git] / ceph / src / mgr / Mgr.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include <Python.h>
15
16#include "osdc/Objecter.h"
17#include "client/Client.h"
18#include "common/errno.h"
19#include "mon/MonClient.h"
20#include "include/stringify.h"
21#include "global/global_context.h"
22#include "global/signal_handler.h"
23
24#include "mgr/MgrContext.h"
c07f9fc5 25#include "mgr/mgr_commands.h"
7c673cae 26
3efd9988 27//#include "MgrPyModule.h"
7c673cae
FG
28#include "DaemonServer.h"
29#include "messages/MMgrBeacon.h"
30#include "messages/MMgrDigest.h"
31#include "messages/MCommand.h"
32#include "messages/MCommandReply.h"
33#include "messages/MLog.h"
224ce89b 34#include "messages/MServiceMap.h"
7c673cae
FG
35
36#include "Mgr.h"
37
38#define dout_context g_ceph_context
39#define dout_subsys ceph_subsys_mgr
40#undef dout_prefix
41#define dout_prefix *_dout << "mgr " << __func__ << " "
42
43
224ce89b 44Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
3efd9988 45 PyModuleRegistry *py_module_registry_,
224ce89b 46 Messenger *clientm_, Objecter *objecter_,
7c673cae
FG
47 Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
48 monc(monc_),
49 objecter(objecter_),
50 client(client_),
51 client_messenger(clientm_),
52 lock("Mgr::lock"),
53 timer(g_ceph_context, lock),
54 finisher(g_ceph_context, "Mgr", "mgr-fin"),
224ce89b 55 digest_received(false),
3efd9988 56 py_module_registry(py_module_registry_),
224ce89b 57 cluster_state(monc, nullptr, mgrmap),
3efd9988 58 server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
7c673cae 59 clog_, audit_clog_),
3efd9988
FG
60 clog(clog_),
61 audit_clog(audit_clog_),
7c673cae
FG
62 initialized(false),
63 initializing(false)
64{
65 cluster_state.set_objecter(objecter);
66}
67
68
69Mgr::~Mgr()
70{
71}
72
b32b8144 73void MetadataUpdate::finish(int r)
7c673cae 74{
b32b8144
FG
75 daemon_state.clear_updating(key);
76 if (r == 0) {
77 if (key.first == "mds" || key.first == "osd") {
78 json_spirit::mValue json_result;
79 bool read_ok = json_spirit::read(
80 outbl.to_str(), json_result);
81 if (!read_ok) {
82 dout(1) << "mon returned invalid JSON for "
83 << key.first << "." << key.second << dendl;
84 return;
85 }
86 dout(4) << "mon returned valid metadata JSON for "
87 << key.first << "." << key.second << dendl;
7c673cae 88
b32b8144 89 json_spirit::mObject daemon_meta = json_result.get_obj();
7c673cae 90
b32b8144
FG
91 // Apply any defaults
92 for (const auto &i : defaults) {
93 if (daemon_meta.find(i.first) == daemon_meta.end()) {
94 daemon_meta[i.first] = i.second;
7c673cae 95 }
b32b8144 96 }
7c673cae 97
b32b8144
FG
98 DaemonStatePtr state;
99 if (daemon_state.exists(key)) {
100 state = daemon_state.get(key);
101 Mutex::Locker l(state->lock);
102 if (key.first == "mds") {
103 daemon_meta.erase("name");
104 } else if (key.first == "osd") {
105 daemon_meta.erase("id");
106 }
107 daemon_meta.erase("hostname");
108 state->metadata.clear();
109 for (const auto &i : daemon_meta) {
110 state->metadata[i.first] = i.second.get_str();
7c673cae 111 }
b32b8144
FG
112 } else {
113 state = std::make_shared<DaemonState>(daemon_state.types);
114 state->key = key;
115 state->hostname = daemon_meta.at("hostname").get_str();
7c673cae 116
b32b8144 117 if (key.first == "mds") {
7c673cae 118 daemon_meta.erase("name");
b32b8144
FG
119 } else if (key.first == "osd") {
120 daemon_meta.erase("id");
7c673cae 121 }
b32b8144
FG
122 daemon_meta.erase("hostname");
123
124 for (const auto &i : daemon_meta) {
125 state->metadata[i.first] = i.second.get_str();
126 }
127
128 daemon_state.insert(state);
7c673cae
FG
129 }
130 } else {
b32b8144 131 ceph_abort();
7c673cae 132 }
b32b8144
FG
133 } else {
134 dout(1) << "mon failed to return metadata for "
135 << key.first << "." << key.second << ": "
136 << cpp_strerror(r) << dendl;
7c673cae 137 }
b32b8144 138}
7c673cae 139
224ce89b 140void Mgr::background_init(Context *completion)
7c673cae
FG
141{
142 Mutex::Locker l(lock);
143 assert(!initializing);
144 assert(!initialized);
145 initializing = true;
146
147 finisher.start();
148
224ce89b 149 finisher.queue(new FunctionContext([this, completion](int r){
7c673cae 150 init();
224ce89b 151 completion->complete(0);
7c673cae
FG
152 }));
153}
154
155void Mgr::init()
156{
157 Mutex::Locker l(lock);
158 assert(initializing);
159 assert(!initialized);
160
161 // Start communicating with daemons to learn statistics etc
162 int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
163 if (r < 0) {
94b18763
FG
164 derr << "Initialize server fail: " << cpp_strerror(r) << dendl;
165 // This is typically due to a bind() failure, so let's let
166 // systemd restart us.
167 exit(1);
7c673cae
FG
168 }
169 dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
170
171 // Preload all daemon metadata (will subsequently keep this
172 // up to date by watching maps, so do the initial load before
173 // we subscribe to any maps)
174 dout(4) << "Loading daemon metadata..." << dendl;
175 load_all_metadata();
176
177 // subscribe to all the maps
178 monc->sub_want("log-info", 0, 0);
179 monc->sub_want("mgrdigest", 0, 0);
180 monc->sub_want("fsmap", 0, 0);
224ce89b 181 monc->sub_want("servicemap", 0, 0);
7c673cae
FG
182
183 dout(4) << "waiting for OSDMap..." << dendl;
184 // Subscribe to OSDMap update to pass on to ClusterState
185 objecter->maybe_request_map();
186
187 // reset the mon session. we get these maps through subscriptions which
188 // are stateful with the connection, so even if *we* don't have them a
189 // previous incarnation sharing the same MonClient may have.
190 monc->reopen_session();
191
192 // Start Objecter and wait for OSD map
193 lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
194 objecter->wait_for_osd_map();
195 lock.Lock();
196
197 // Populate PGs in ClusterState
198 objecter->with_osdmap([this](const OSDMap &osd_map) {
199 cluster_state.notify_osdmap(osd_map);
200 });
201
202 // Wait for FSMap
203 dout(4) << "waiting for FSMap..." << dendl;
204 while (!cluster_state.have_fsmap()) {
205 fs_map_cond.Wait(lock);
206 }
207
208 dout(4) << "waiting for config-keys..." << dendl;
209
210 // Preload config keys (`get` for plugins is to be a fast local
211 // operation, we we don't have to synchronize these later because
212 // all sets will come via mgr)
3efd9988 213 auto loaded_config = load_config();
7c673cae 214
224ce89b
WB
215 // Wait for MgrDigest...
216 dout(4) << "waiting for MgrDigest..." << dendl;
217 while (!digest_received) {
218 digest_cond.Wait(lock);
219 }
7c673cae
FG
220
221 // assume finisher already initialized in background_init
3efd9988
FG
222 dout(4) << "starting python modules..." << dendl;
223 py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
224 clog, *objecter, *client, finisher);
7c673cae
FG
225
226 dout(4) << "Complete." << dendl;
227 initializing = false;
228 initialized = true;
229}
230
231void Mgr::load_all_metadata()
232{
233 assert(lock.is_locked_by_me());
234
235 JSONCommand mds_cmd;
236 mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
237 JSONCommand osd_cmd;
238 osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
239 JSONCommand mon_cmd;
240 mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
241
242 lock.Unlock();
243 mds_cmd.wait();
244 osd_cmd.wait();
245 mon_cmd.wait();
246 lock.Lock();
247
248 assert(mds_cmd.r == 0);
249 assert(mon_cmd.r == 0);
250 assert(osd_cmd.r == 0);
251
252 for (auto &metadata_val : mds_cmd.json_result.get_array()) {
253 json_spirit::mObject daemon_meta = metadata_val.get_obj();
254 if (daemon_meta.count("hostname") == 0) {
255 dout(1) << "Skipping incomplete metadata entry" << dendl;
256 continue;
257 }
258
259 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 260 dm->key = DaemonKey("mds",
7c673cae
FG
261 daemon_meta.at("name").get_str());
262 dm->hostname = daemon_meta.at("hostname").get_str();
263
264 daemon_meta.erase("name");
265 daemon_meta.erase("hostname");
266
267 for (const auto &i : daemon_meta) {
268 dm->metadata[i.first] = i.second.get_str();
269 }
270
271 daemon_state.insert(dm);
272 }
273
274 for (auto &metadata_val : mon_cmd.json_result.get_array()) {
275 json_spirit::mObject daemon_meta = metadata_val.get_obj();
276 if (daemon_meta.count("hostname") == 0) {
277 dout(1) << "Skipping incomplete metadata entry" << dendl;
278 continue;
279 }
280
281 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 282 dm->key = DaemonKey("mon",
7c673cae
FG
283 daemon_meta.at("name").get_str());
284 dm->hostname = daemon_meta.at("hostname").get_str();
285
286 daemon_meta.erase("name");
287 daemon_meta.erase("hostname");
288
289 for (const auto &i : daemon_meta) {
290 dm->metadata[i.first] = i.second.get_str();
291 }
292
293 daemon_state.insert(dm);
294 }
295
296 for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
297 json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
298 if (osd_metadata.count("hostname") == 0) {
299 dout(1) << "Skipping incomplete metadata entry" << dendl;
300 continue;
301 }
302 dout(4) << osd_metadata.at("hostname").get_str() << dendl;
303
304 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 305 dm->key = DaemonKey("osd",
7c673cae
FG
306 stringify(osd_metadata.at("id").get_int()));
307 dm->hostname = osd_metadata.at("hostname").get_str();
308
309 osd_metadata.erase("id");
310 osd_metadata.erase("hostname");
311
312 for (const auto &i : osd_metadata) {
313 dm->metadata[i.first] = i.second.get_str();
314 }
315
316 daemon_state.insert(dm);
317 }
318}
319
3efd9988 320std::map<std::string, std::string> Mgr::load_config()
7c673cae
FG
321{
322 assert(lock.is_locked_by_me());
323
324 dout(10) << "listing keys" << dendl;
325 JSONCommand cmd;
c07f9fc5 326 cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
7c673cae
FG
327 lock.Unlock();
328 cmd.wait();
329 lock.Lock();
330 assert(cmd.r == 0);
331
332 std::map<std::string, std::string> loaded;
333
334 for (auto &key_str : cmd.json_result.get_array()) {
335 std::string const key = key_str.get_str();
336 dout(20) << "saw key '" << key << "'" << dendl;
337
3efd9988 338 const std::string config_prefix = PyModuleRegistry::config_prefix;
7c673cae
FG
339
340 if (key.substr(0, config_prefix.size()) == config_prefix) {
341 dout(20) << "fetching '" << key << "'" << dendl;
342 Command get_cmd;
343 std::ostringstream cmd_json;
344 cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
345 get_cmd.run(monc, cmd_json.str());
31f18b77 346 lock.Unlock();
7c673cae 347 get_cmd.wait();
31f18b77 348 lock.Lock();
7c673cae 349 assert(get_cmd.r == 0);
7c673cae
FG
350 loaded[key] = get_cmd.outbl.to_str();
351 }
352 }
353
3efd9988 354 return loaded;
7c673cae
FG
355}
356
357void Mgr::shutdown()
358{
359 finisher.queue(new FunctionContext([&](int) {
360 {
361 Mutex::Locker l(lock);
362 monc->sub_unwant("log-info");
363 monc->sub_unwant("mgrdigest");
364 monc->sub_unwant("fsmap");
365 // First stop the server so that we're not taking any more incoming
366 // requests
367 server.shutdown();
368 }
369 // after the messenger is stopped, signal modules to shutdown via finisher
3efd9988 370 py_module_registry->active_shutdown();
7c673cae
FG
371 }));
372
373 // Then stop the finisher to ensure its enqueued contexts aren't going
374 // to touch references to the things we're about to tear down
375 finisher.wait_for_empty();
376 finisher.stop();
377}
378
379void Mgr::handle_osd_map()
380{
381 assert(lock.is_locked_by_me());
382
383 std::set<std::string> names_exist;
384
385 /**
386 * When we see a new OSD map, inspect the entity addrs to
387 * see if they have changed (service restart), and if so
388 * reload the metadata.
389 */
390 objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
b32b8144 391 for (unsigned int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
7c673cae
FG
392 if (!osd_map.exists(osd_id)) {
393 continue;
394 }
395
396 // Remember which OSDs exist so that we can cull any that don't
397 names_exist.insert(stringify(osd_id));
398
399 // Consider whether to update the daemon metadata (new/restarted daemon)
400 bool update_meta = false;
224ce89b 401 const auto k = DaemonKey("osd", stringify(osd_id));
7c673cae
FG
402 if (daemon_state.is_updating(k)) {
403 continue;
404 }
405
406 if (daemon_state.exists(k)) {
407 auto metadata = daemon_state.get(k);
c07f9fc5 408 Mutex::Locker l(metadata->lock);
7c673cae
FG
409 auto addr_iter = metadata->metadata.find("front_addr");
410 if (addr_iter != metadata->metadata.end()) {
411 const std::string &metadata_addr = addr_iter->second;
412 const auto &map_addr = osd_map.get_addr(osd_id);
413
414 if (metadata_addr != stringify(map_addr)) {
415 dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
416 << " != " << stringify(map_addr) << dendl;
417 update_meta = true;
418 } else {
419 dout(20) << "OSD[" << osd_id << "] addr unchanged: "
420 << metadata_addr << dendl;
421 }
422 } else {
423 // Awkward case where daemon went into DaemonState because it
424 // sent us a report but its metadata didn't get loaded yet
425 update_meta = true;
426 }
427 } else {
428 update_meta = true;
429 }
430
431 if (update_meta) {
432 daemon_state.notify_updating(k);
433 auto c = new MetadataUpdate(daemon_state, k);
434 std::ostringstream cmd;
435 cmd << "{\"prefix\": \"osd metadata\", \"id\": "
436 << osd_id << "}";
437 monc->start_mon_command(
438 {cmd.str()},
439 {}, &c->outbl, &c->outs, c);
440 }
441 }
442
443 cluster_state.notify_osdmap(osd_map);
444 });
445
446 // TODO: same culling for MonMap
224ce89b 447 daemon_state.cull("osd", names_exist);
7c673cae
FG
448}
449
450void Mgr::handle_log(MLog *m)
451{
452 for (const auto &e : m->entries) {
3efd9988 453 py_module_registry->notify_all(e);
7c673cae
FG
454 }
455
456 m->put();
457}
458
224ce89b
WB
459void Mgr::handle_service_map(MServiceMap *m)
460{
461 dout(10) << "e" << m->service_map.epoch << dendl;
462 cluster_state.set_service_map(m->service_map);
463 server.got_service_map();
464}
465
7c673cae
FG
466bool Mgr::ms_dispatch(Message *m)
467{
468 dout(4) << *m << dendl;
469 Mutex::Locker l(lock);
470
471 switch (m->get_type()) {
472 case MSG_MGR_DIGEST:
473 handle_mgr_digest(static_cast<MMgrDigest*>(m));
474 break;
475 case CEPH_MSG_MON_MAP:
3efd9988 476 py_module_registry->notify_all("mon_map", "");
7c673cae
FG
477 m->put();
478 break;
479 case CEPH_MSG_FS_MAP:
3efd9988 480 py_module_registry->notify_all("fs_map", "");
7c673cae
FG
481 handle_fs_map((MFSMap*)m);
482 return false; // I shall let this pass through for Client
483 break;
484 case CEPH_MSG_OSD_MAP:
485 handle_osd_map();
486
3efd9988 487 py_module_registry->notify_all("osd_map", "");
7c673cae
FG
488
489 // Continuous subscribe, so that we can generate notifications
490 // for our MgrPyModules
491 objecter->maybe_request_map();
492 m->put();
493 break;
224ce89b
WB
494 case MSG_SERVICE_MAP:
495 handle_service_map((MServiceMap*)m);
3efd9988 496 py_module_registry->notify_all("service_map", "");
224ce89b
WB
497 m->put();
498 break;
7c673cae
FG
499 case MSG_LOG:
500 handle_log(static_cast<MLog *>(m));
501 break;
502
503 default:
504 return false;
505 }
506 return true;
507}
508
509
510void Mgr::handle_fs_map(MFSMap* m)
511{
512 assert(lock.is_locked_by_me());
513
514 std::set<std::string> names_exist;
515
516 const FSMap &new_fsmap = m->get_fsmap();
517
518 fs_map_cond.Signal();
519
520 // TODO: callers (e.g. from python land) are potentially going to see
521 // the new fsmap before we've bothered populating all the resulting
522 // daemon_state. Maybe we should block python land while we're making
523 // this kind of update?
524
525 cluster_state.set_fsmap(new_fsmap);
526
527 auto mds_info = new_fsmap.get_mds_info();
528 for (const auto &i : mds_info) {
529 const auto &info = i.second;
530
531 if (!new_fsmap.gid_exists(i.first)){
532 continue;
533 }
534
535 // Remember which MDS exists so that we can cull any that don't
536 names_exist.insert(info.name);
537
224ce89b 538 const auto k = DaemonKey("mds", info.name);
7c673cae
FG
539 if (daemon_state.is_updating(k)) {
540 continue;
541 }
542
543 bool update = false;
544 if (daemon_state.exists(k)) {
545 auto metadata = daemon_state.get(k);
c07f9fc5 546 Mutex::Locker l(metadata->lock);
7c673cae
FG
547 if (metadata->metadata.empty() ||
548 metadata->metadata.count("addr") == 0) {
549 update = true;
550 } else {
551 auto metadata_addr = metadata->metadata.at("addr");
552 const auto map_addr = info.addr;
553 update = metadata_addr != stringify(map_addr);
554 if (update) {
555 dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
556 << " != " << stringify(map_addr) << dendl;
557 }
558 }
559 } else {
560 update = true;
561 }
562
563 if (update) {
564 daemon_state.notify_updating(k);
565 auto c = new MetadataUpdate(daemon_state, k);
566
567 // Older MDS daemons don't have addr in the metadata, so
568 // fake it if the returned metadata doesn't have the field.
569 c->set_default("addr", stringify(info.addr));
570
571 std::ostringstream cmd;
572 cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
573 << info.name << "\"}";
574 monc->start_mon_command(
575 {cmd.str()},
576 {}, &c->outbl, &c->outs, c);
577 }
578 }
224ce89b 579 daemon_state.cull("mds", names_exist);
7c673cae
FG
580}
581
224ce89b
WB
582bool Mgr::got_mgr_map(const MgrMap& m)
583{
584 Mutex::Locker l(lock);
585 dout(10) << m << dendl;
586
587 set<string> old_modules;
588 cluster_state.with_mgrmap([&](const MgrMap& m) {
589 old_modules = m.modules;
590 });
591 if (m.modules != old_modules) {
592 derr << "mgrmap module list changed to (" << m.modules << "), respawn"
593 << dendl;
594 return true;
595 }
596
597 cluster_state.set_mgr_map(m);
598
599 return false;
600}
7c673cae
FG
601
602void Mgr::handle_mgr_digest(MMgrDigest* m)
603{
604 dout(10) << m->mon_status_json.length() << dendl;
605 dout(10) << m->health_json.length() << dendl;
606 cluster_state.load_digest(m);
3efd9988
FG
607 py_module_registry->notify_all("mon_status", "");
608 py_module_registry->notify_all("health", "");
7c673cae
FG
609
610 // Hack: use this as a tick/opportunity to prompt python-land that
611 // the pgmap might have changed since last time we were here.
3efd9988 612 py_module_registry->notify_all("pg_summary", "");
7c673cae
FG
613 dout(10) << "done." << dendl;
614
615 m->put();
224ce89b
WB
616
617 if (!digest_received) {
618 digest_received = true;
619 digest_cond.Signal();
620 }
7c673cae
FG
621}
622
31f18b77
FG
623void Mgr::tick()
624{
224ce89b 625 dout(10) << dendl;
31f18b77
FG
626 server.send_report();
627}
c07f9fc5
FG
628
629std::vector<MonCommand> Mgr::get_command_set() const
630{
631 Mutex::Locker l(lock);
632
633 std::vector<MonCommand> commands = mgr_commands;
3efd9988 634 std::vector<MonCommand> py_commands = py_module_registry->get_commands();
c07f9fc5
FG
635 commands.insert(commands.end(), py_commands.begin(), py_commands.end());
636 return commands;
637}
638
3efd9988
FG
639std::map<std::string, std::string> Mgr::get_services() const
640{
641 Mutex::Locker l(lock);
642
643 return py_module_registry->get_services();
644}
645