]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/Mgr.cc
update sources to 12.2.2
[ceph.git] / ceph / src / mgr / Mgr.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include <Python.h>
15
16#include "osdc/Objecter.h"
17#include "client/Client.h"
18#include "common/errno.h"
19#include "mon/MonClient.h"
20#include "include/stringify.h"
21#include "global/global_context.h"
22#include "global/signal_handler.h"
23
24#include "mgr/MgrContext.h"
c07f9fc5 25#include "mgr/mgr_commands.h"
7c673cae 26
3efd9988 27//#include "MgrPyModule.h"
7c673cae
FG
28#include "DaemonServer.h"
29#include "messages/MMgrBeacon.h"
30#include "messages/MMgrDigest.h"
31#include "messages/MCommand.h"
32#include "messages/MCommandReply.h"
33#include "messages/MLog.h"
224ce89b 34#include "messages/MServiceMap.h"
7c673cae
FG
35
36#include "Mgr.h"
37
38#define dout_context g_ceph_context
39#define dout_subsys ceph_subsys_mgr
40#undef dout_prefix
41#define dout_prefix *_dout << "mgr " << __func__ << " "
42
43
224ce89b 44Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
3efd9988 45 PyModuleRegistry *py_module_registry_,
224ce89b 46 Messenger *clientm_, Objecter *objecter_,
7c673cae
FG
47 Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
48 monc(monc_),
49 objecter(objecter_),
50 client(client_),
51 client_messenger(clientm_),
52 lock("Mgr::lock"),
53 timer(g_ceph_context, lock),
54 finisher(g_ceph_context, "Mgr", "mgr-fin"),
224ce89b 55 digest_received(false),
3efd9988 56 py_module_registry(py_module_registry_),
224ce89b 57 cluster_state(monc, nullptr, mgrmap),
3efd9988 58 server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
7c673cae 59 clog_, audit_clog_),
3efd9988
FG
60 clog(clog_),
61 audit_clog(audit_clog_),
7c673cae
FG
62 initialized(false),
63 initializing(false)
64{
65 cluster_state.set_objecter(objecter);
66}
67
68
69Mgr::~Mgr()
70{
71}
72
73
74/**
75 * Context for completion of metadata mon commands: take
76 * the result and stash it in DaemonStateIndex
77 */
78class MetadataUpdate : public Context
79{
80 DaemonStateIndex &daemon_state;
81 DaemonKey key;
82
83 std::map<std::string, std::string> defaults;
84
85public:
86 bufferlist outbl;
87 std::string outs;
88
89 MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_)
90 : daemon_state(daemon_state_), key(key_) {}
91
92 void set_default(const std::string &k, const std::string &v)
93 {
94 defaults[k] = v;
95 }
96
97 void finish(int r) override
98 {
99 daemon_state.clear_updating(key);
100 if (r == 0) {
224ce89b 101 if (key.first == "mds") {
7c673cae
FG
102 json_spirit::mValue json_result;
103 bool read_ok = json_spirit::read(
104 outbl.to_str(), json_result);
105 if (!read_ok) {
106 dout(1) << "mon returned invalid JSON for "
224ce89b 107 << key.first << "." << key.second << dendl;
7c673cae
FG
108 return;
109 }
110
111 json_spirit::mObject daemon_meta = json_result.get_obj();
112
113 // Apply any defaults
114 for (const auto &i : defaults) {
115 if (daemon_meta.find(i.first) == daemon_meta.end()) {
116 daemon_meta[i.first] = i.second;
117 }
118 }
119
120 DaemonStatePtr state;
121 if (daemon_state.exists(key)) {
122 state = daemon_state.get(key);
c07f9fc5 123 Mutex::Locker l(state->lock);
7c673cae
FG
124 daemon_meta.erase("name");
125 daemon_meta.erase("hostname");
126 state->metadata.clear();
127 for (const auto &i : daemon_meta) {
128 state->metadata[i.first] = i.second.get_str();
129 }
130 } else {
131 state = std::make_shared<DaemonState>(daemon_state.types);
132 state->key = key;
133 state->hostname = daemon_meta.at("hostname").get_str();
134
135 for (const auto &i : daemon_meta) {
136 state->metadata[i.first] = i.second.get_str();
137 }
138
139 daemon_state.insert(state);
140 }
224ce89b 141 } else if (key.first == "osd") {
7c673cae
FG
142 } else {
143 ceph_abort();
144 }
145 } else {
146 dout(1) << "mon failed to return metadata for "
224ce89b
WB
147 << key.first << "." << key.second << ": "
148 << cpp_strerror(r) << dendl;
7c673cae
FG
149 }
150 }
151};
152
153
224ce89b 154void Mgr::background_init(Context *completion)
7c673cae
FG
155{
156 Mutex::Locker l(lock);
157 assert(!initializing);
158 assert(!initialized);
159 initializing = true;
160
161 finisher.start();
162
224ce89b 163 finisher.queue(new FunctionContext([this, completion](int r){
7c673cae 164 init();
224ce89b 165 completion->complete(0);
7c673cae
FG
166 }));
167}
168
169void Mgr::init()
170{
171 Mutex::Locker l(lock);
172 assert(initializing);
173 assert(!initialized);
174
175 // Start communicating with daemons to learn statistics etc
176 int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
177 if (r < 0) {
178 derr << "Initialize server fail"<< dendl;
179 return;
180 }
181 dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
182
183 // Preload all daemon metadata (will subsequently keep this
184 // up to date by watching maps, so do the initial load before
185 // we subscribe to any maps)
186 dout(4) << "Loading daemon metadata..." << dendl;
187 load_all_metadata();
188
189 // subscribe to all the maps
190 monc->sub_want("log-info", 0, 0);
191 monc->sub_want("mgrdigest", 0, 0);
192 monc->sub_want("fsmap", 0, 0);
224ce89b 193 monc->sub_want("servicemap", 0, 0);
7c673cae
FG
194
195 dout(4) << "waiting for OSDMap..." << dendl;
196 // Subscribe to OSDMap update to pass on to ClusterState
197 objecter->maybe_request_map();
198
199 // reset the mon session. we get these maps through subscriptions which
200 // are stateful with the connection, so even if *we* don't have them a
201 // previous incarnation sharing the same MonClient may have.
202 monc->reopen_session();
203
204 // Start Objecter and wait for OSD map
205 lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
206 objecter->wait_for_osd_map();
207 lock.Lock();
208
209 // Populate PGs in ClusterState
210 objecter->with_osdmap([this](const OSDMap &osd_map) {
211 cluster_state.notify_osdmap(osd_map);
212 });
213
214 // Wait for FSMap
215 dout(4) << "waiting for FSMap..." << dendl;
216 while (!cluster_state.have_fsmap()) {
217 fs_map_cond.Wait(lock);
218 }
219
220 dout(4) << "waiting for config-keys..." << dendl;
221
222 // Preload config keys (`get` for plugins is to be a fast local
223 // operation, we we don't have to synchronize these later because
224 // all sets will come via mgr)
3efd9988 225 auto loaded_config = load_config();
7c673cae 226
224ce89b
WB
227 // Wait for MgrDigest...
228 dout(4) << "waiting for MgrDigest..." << dendl;
229 while (!digest_received) {
230 digest_cond.Wait(lock);
231 }
7c673cae
FG
232
233 // assume finisher already initialized in background_init
3efd9988
FG
234 dout(4) << "starting python modules..." << dendl;
235 py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
236 clog, *objecter, *client, finisher);
7c673cae
FG
237
238 dout(4) << "Complete." << dendl;
239 initializing = false;
240 initialized = true;
241}
242
243void Mgr::load_all_metadata()
244{
245 assert(lock.is_locked_by_me());
246
247 JSONCommand mds_cmd;
248 mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
249 JSONCommand osd_cmd;
250 osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
251 JSONCommand mon_cmd;
252 mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
253
254 lock.Unlock();
255 mds_cmd.wait();
256 osd_cmd.wait();
257 mon_cmd.wait();
258 lock.Lock();
259
260 assert(mds_cmd.r == 0);
261 assert(mon_cmd.r == 0);
262 assert(osd_cmd.r == 0);
263
264 for (auto &metadata_val : mds_cmd.json_result.get_array()) {
265 json_spirit::mObject daemon_meta = metadata_val.get_obj();
266 if (daemon_meta.count("hostname") == 0) {
267 dout(1) << "Skipping incomplete metadata entry" << dendl;
268 continue;
269 }
270
271 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 272 dm->key = DaemonKey("mds",
7c673cae
FG
273 daemon_meta.at("name").get_str());
274 dm->hostname = daemon_meta.at("hostname").get_str();
275
276 daemon_meta.erase("name");
277 daemon_meta.erase("hostname");
278
279 for (const auto &i : daemon_meta) {
280 dm->metadata[i.first] = i.second.get_str();
281 }
282
283 daemon_state.insert(dm);
284 }
285
286 for (auto &metadata_val : mon_cmd.json_result.get_array()) {
287 json_spirit::mObject daemon_meta = metadata_val.get_obj();
288 if (daemon_meta.count("hostname") == 0) {
289 dout(1) << "Skipping incomplete metadata entry" << dendl;
290 continue;
291 }
292
293 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 294 dm->key = DaemonKey("mon",
7c673cae
FG
295 daemon_meta.at("name").get_str());
296 dm->hostname = daemon_meta.at("hostname").get_str();
297
298 daemon_meta.erase("name");
299 daemon_meta.erase("hostname");
300
301 for (const auto &i : daemon_meta) {
302 dm->metadata[i.first] = i.second.get_str();
303 }
304
305 daemon_state.insert(dm);
306 }
307
308 for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
309 json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
310 if (osd_metadata.count("hostname") == 0) {
311 dout(1) << "Skipping incomplete metadata entry" << dendl;
312 continue;
313 }
314 dout(4) << osd_metadata.at("hostname").get_str() << dendl;
315
316 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
224ce89b 317 dm->key = DaemonKey("osd",
7c673cae
FG
318 stringify(osd_metadata.at("id").get_int()));
319 dm->hostname = osd_metadata.at("hostname").get_str();
320
321 osd_metadata.erase("id");
322 osd_metadata.erase("hostname");
323
324 for (const auto &i : osd_metadata) {
325 dm->metadata[i.first] = i.second.get_str();
326 }
327
328 daemon_state.insert(dm);
329 }
330}
331
3efd9988 332std::map<std::string, std::string> Mgr::load_config()
7c673cae
FG
333{
334 assert(lock.is_locked_by_me());
335
336 dout(10) << "listing keys" << dendl;
337 JSONCommand cmd;
c07f9fc5 338 cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
7c673cae
FG
339 lock.Unlock();
340 cmd.wait();
341 lock.Lock();
342 assert(cmd.r == 0);
343
344 std::map<std::string, std::string> loaded;
345
346 for (auto &key_str : cmd.json_result.get_array()) {
347 std::string const key = key_str.get_str();
348 dout(20) << "saw key '" << key << "'" << dendl;
349
3efd9988 350 const std::string config_prefix = PyModuleRegistry::config_prefix;
7c673cae
FG
351
352 if (key.substr(0, config_prefix.size()) == config_prefix) {
353 dout(20) << "fetching '" << key << "'" << dendl;
354 Command get_cmd;
355 std::ostringstream cmd_json;
356 cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
357 get_cmd.run(monc, cmd_json.str());
31f18b77 358 lock.Unlock();
7c673cae 359 get_cmd.wait();
31f18b77 360 lock.Lock();
7c673cae 361 assert(get_cmd.r == 0);
7c673cae
FG
362 loaded[key] = get_cmd.outbl.to_str();
363 }
364 }
365
3efd9988 366 return loaded;
7c673cae
FG
367}
368
369void Mgr::shutdown()
370{
371 finisher.queue(new FunctionContext([&](int) {
372 {
373 Mutex::Locker l(lock);
374 monc->sub_unwant("log-info");
375 monc->sub_unwant("mgrdigest");
376 monc->sub_unwant("fsmap");
377 // First stop the server so that we're not taking any more incoming
378 // requests
379 server.shutdown();
380 }
381 // after the messenger is stopped, signal modules to shutdown via finisher
3efd9988 382 py_module_registry->active_shutdown();
7c673cae
FG
383 }));
384
385 // Then stop the finisher to ensure its enqueued contexts aren't going
386 // to touch references to the things we're about to tear down
387 finisher.wait_for_empty();
388 finisher.stop();
389}
390
391void Mgr::handle_osd_map()
392{
393 assert(lock.is_locked_by_me());
394
395 std::set<std::string> names_exist;
396
397 /**
398 * When we see a new OSD map, inspect the entity addrs to
399 * see if they have changed (service restart), and if so
400 * reload the metadata.
401 */
402 objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
403 for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
404 if (!osd_map.exists(osd_id)) {
405 continue;
406 }
407
408 // Remember which OSDs exist so that we can cull any that don't
409 names_exist.insert(stringify(osd_id));
410
411 // Consider whether to update the daemon metadata (new/restarted daemon)
412 bool update_meta = false;
224ce89b 413 const auto k = DaemonKey("osd", stringify(osd_id));
7c673cae
FG
414 if (daemon_state.is_updating(k)) {
415 continue;
416 }
417
418 if (daemon_state.exists(k)) {
419 auto metadata = daemon_state.get(k);
c07f9fc5 420 Mutex::Locker l(metadata->lock);
7c673cae
FG
421 auto addr_iter = metadata->metadata.find("front_addr");
422 if (addr_iter != metadata->metadata.end()) {
423 const std::string &metadata_addr = addr_iter->second;
424 const auto &map_addr = osd_map.get_addr(osd_id);
425
426 if (metadata_addr != stringify(map_addr)) {
427 dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
428 << " != " << stringify(map_addr) << dendl;
429 update_meta = true;
430 } else {
431 dout(20) << "OSD[" << osd_id << "] addr unchanged: "
432 << metadata_addr << dendl;
433 }
434 } else {
435 // Awkward case where daemon went into DaemonState because it
436 // sent us a report but its metadata didn't get loaded yet
437 update_meta = true;
438 }
439 } else {
440 update_meta = true;
441 }
442
443 if (update_meta) {
444 daemon_state.notify_updating(k);
445 auto c = new MetadataUpdate(daemon_state, k);
446 std::ostringstream cmd;
447 cmd << "{\"prefix\": \"osd metadata\", \"id\": "
448 << osd_id << "}";
449 monc->start_mon_command(
450 {cmd.str()},
451 {}, &c->outbl, &c->outs, c);
452 }
453 }
454
455 cluster_state.notify_osdmap(osd_map);
456 });
457
458 // TODO: same culling for MonMap
224ce89b 459 daemon_state.cull("osd", names_exist);
7c673cae
FG
460}
461
462void Mgr::handle_log(MLog *m)
463{
464 for (const auto &e : m->entries) {
3efd9988 465 py_module_registry->notify_all(e);
7c673cae
FG
466 }
467
468 m->put();
469}
470
224ce89b
WB
471void Mgr::handle_service_map(MServiceMap *m)
472{
473 dout(10) << "e" << m->service_map.epoch << dendl;
474 cluster_state.set_service_map(m->service_map);
475 server.got_service_map();
476}
477
7c673cae
FG
478bool Mgr::ms_dispatch(Message *m)
479{
480 dout(4) << *m << dendl;
481 Mutex::Locker l(lock);
482
483 switch (m->get_type()) {
484 case MSG_MGR_DIGEST:
485 handle_mgr_digest(static_cast<MMgrDigest*>(m));
486 break;
487 case CEPH_MSG_MON_MAP:
3efd9988 488 py_module_registry->notify_all("mon_map", "");
7c673cae
FG
489 m->put();
490 break;
491 case CEPH_MSG_FS_MAP:
3efd9988 492 py_module_registry->notify_all("fs_map", "");
7c673cae
FG
493 handle_fs_map((MFSMap*)m);
494 return false; // I shall let this pass through for Client
495 break;
496 case CEPH_MSG_OSD_MAP:
497 handle_osd_map();
498
3efd9988 499 py_module_registry->notify_all("osd_map", "");
7c673cae
FG
500
501 // Continuous subscribe, so that we can generate notifications
502 // for our MgrPyModules
503 objecter->maybe_request_map();
504 m->put();
505 break;
224ce89b
WB
506 case MSG_SERVICE_MAP:
507 handle_service_map((MServiceMap*)m);
3efd9988 508 py_module_registry->notify_all("service_map", "");
224ce89b
WB
509 m->put();
510 break;
7c673cae
FG
511 case MSG_LOG:
512 handle_log(static_cast<MLog *>(m));
513 break;
514
515 default:
516 return false;
517 }
518 return true;
519}
520
521
522void Mgr::handle_fs_map(MFSMap* m)
523{
524 assert(lock.is_locked_by_me());
525
526 std::set<std::string> names_exist;
527
528 const FSMap &new_fsmap = m->get_fsmap();
529
530 fs_map_cond.Signal();
531
532 // TODO: callers (e.g. from python land) are potentially going to see
533 // the new fsmap before we've bothered populating all the resulting
534 // daemon_state. Maybe we should block python land while we're making
535 // this kind of update?
536
537 cluster_state.set_fsmap(new_fsmap);
538
539 auto mds_info = new_fsmap.get_mds_info();
540 for (const auto &i : mds_info) {
541 const auto &info = i.second;
542
543 if (!new_fsmap.gid_exists(i.first)){
544 continue;
545 }
546
547 // Remember which MDS exists so that we can cull any that don't
548 names_exist.insert(info.name);
549
224ce89b 550 const auto k = DaemonKey("mds", info.name);
7c673cae
FG
551 if (daemon_state.is_updating(k)) {
552 continue;
553 }
554
555 bool update = false;
556 if (daemon_state.exists(k)) {
557 auto metadata = daemon_state.get(k);
c07f9fc5 558 Mutex::Locker l(metadata->lock);
7c673cae
FG
559 if (metadata->metadata.empty() ||
560 metadata->metadata.count("addr") == 0) {
561 update = true;
562 } else {
563 auto metadata_addr = metadata->metadata.at("addr");
564 const auto map_addr = info.addr;
565 update = metadata_addr != stringify(map_addr);
566 if (update) {
567 dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
568 << " != " << stringify(map_addr) << dendl;
569 }
570 }
571 } else {
572 update = true;
573 }
574
575 if (update) {
576 daemon_state.notify_updating(k);
577 auto c = new MetadataUpdate(daemon_state, k);
578
579 // Older MDS daemons don't have addr in the metadata, so
580 // fake it if the returned metadata doesn't have the field.
581 c->set_default("addr", stringify(info.addr));
582
583 std::ostringstream cmd;
584 cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
585 << info.name << "\"}";
586 monc->start_mon_command(
587 {cmd.str()},
588 {}, &c->outbl, &c->outs, c);
589 }
590 }
224ce89b 591 daemon_state.cull("mds", names_exist);
7c673cae
FG
592}
593
224ce89b
WB
594bool Mgr::got_mgr_map(const MgrMap& m)
595{
596 Mutex::Locker l(lock);
597 dout(10) << m << dendl;
598
599 set<string> old_modules;
600 cluster_state.with_mgrmap([&](const MgrMap& m) {
601 old_modules = m.modules;
602 });
603 if (m.modules != old_modules) {
604 derr << "mgrmap module list changed to (" << m.modules << "), respawn"
605 << dendl;
606 return true;
607 }
608
609 cluster_state.set_mgr_map(m);
610
611 return false;
612}
7c673cae
FG
613
614void Mgr::handle_mgr_digest(MMgrDigest* m)
615{
616 dout(10) << m->mon_status_json.length() << dendl;
617 dout(10) << m->health_json.length() << dendl;
618 cluster_state.load_digest(m);
3efd9988
FG
619 py_module_registry->notify_all("mon_status", "");
620 py_module_registry->notify_all("health", "");
7c673cae
FG
621
622 // Hack: use this as a tick/opportunity to prompt python-land that
623 // the pgmap might have changed since last time we were here.
3efd9988 624 py_module_registry->notify_all("pg_summary", "");
7c673cae
FG
625 dout(10) << "done." << dendl;
626
627 m->put();
224ce89b
WB
628
629 if (!digest_received) {
630 digest_received = true;
631 digest_cond.Signal();
632 }
7c673cae
FG
633}
634
31f18b77
FG
635void Mgr::tick()
636{
224ce89b 637 dout(10) << dendl;
31f18b77
FG
638 server.send_report();
639}
c07f9fc5
FG
640
641std::vector<MonCommand> Mgr::get_command_set() const
642{
643 Mutex::Locker l(lock);
644
645 std::vector<MonCommand> commands = mgr_commands;
3efd9988 646 std::vector<MonCommand> py_commands = py_module_registry->get_commands();
c07f9fc5
FG
647 commands.insert(commands.end(), py_commands.begin(), py_commands.end());
648 return commands;
649}
650
3efd9988
FG
651std::map<std::string, std::string> Mgr::get_services() const
652{
653 Mutex::Locker l(lock);
654
655 return py_module_registry->get_services();
656}
657