// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

#include <Python.h>

#include "osdc/Objecter.h"
#include "client/Client.h"
#include "common/errno.h"
#include "mon/MonClient.h"
#include "include/stringify.h"
#include "global/global_context.h"
#include "global/signal_handler.h"

#include "mgr/MgrContext.h"

#include "MgrPyModule.h"
#include "DaemonServer.h"
#include "messages/MMgrBeacon.h"
#include "messages/MMgrDigest.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"

#include "Mgr.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "

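/**
 * Wire together the mgr's subsystems (monitor client, objecter, client,
 * python module runner and DaemonServer) around the shared lock and
 * finisher, then register the objecter with ClusterState.
 */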
Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
         Messenger *clientm_, Objecter *objecter_,
         Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
  monc(monc_),
  objecter(objecter_),
  client(client_),
  client_messenger(clientm_),
  lock("Mgr::lock"),
  timer(g_ceph_context, lock),
  finisher(g_ceph_context, "Mgr", "mgr-fin"),
  digest_received(false),
  py_modules(daemon_state, cluster_state, *monc, clog_, *objecter, *client,
             finisher),
  cluster_state(monc, nullptr, mgrmap),
  server(monc, finisher, daemon_state, cluster_state, py_modules,
         clog_, audit_clog_),
  initialized(false),
  initializing(false)
{
  cluster_state.set_objecter(objecter);
}


Mgr::~Mgr()
{
}

/**
 * Context for completion of metadata mon commands: take
 * the result and stash it in DaemonStateIndex
 */
class MetadataUpdate : public Context
{
  DaemonStateIndex &daemon_state;
  DaemonKey key;

  std::map<std::string, std::string> defaults;

public:
  bufferlist outbl;
  std::string outs;

  MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_)
    : daemon_state(daemon_state_), key(key_) {}

  void set_default(const std::string &k, const std::string &v)
  {
    defaults[k] = v;
  }

  void finish(int r) override
  {
    daemon_state.clear_updating(key);
    if (r == 0) {
      if (key.first == "mds") {
        json_spirit::mValue json_result;
        bool read_ok = json_spirit::read(
            outbl.to_str(), json_result);
        if (!read_ok) {
          dout(1) << "mon returned invalid JSON for "
                  << key.first << "." << key.second << dendl;
          return;
        }

        json_spirit::mObject daemon_meta = json_result.get_obj();

        // Apply any defaults
        for (const auto &i : defaults) {
          if (daemon_meta.find(i.first) == daemon_meta.end()) {
            daemon_meta[i.first] = i.second;
          }
        }

        DaemonStatePtr state;
        if (daemon_state.exists(key)) {
          state = daemon_state.get(key);
          // TODO lock state
          daemon_meta.erase("name");
          daemon_meta.erase("hostname");
          state->metadata.clear();
          for (const auto &i : daemon_meta) {
            state->metadata[i.first] = i.second.get_str();
          }
        } else {
          state = std::make_shared<DaemonState>(daemon_state.types);
          state->key = key;
          state->hostname = daemon_meta.at("hostname").get_str();

          for (const auto &i : daemon_meta) {
            state->metadata[i.first] = i.second.get_str();
          }

          daemon_state.insert(state);
        }
      } else if (key.first == "osd") {
      } else {
        ceph_abort();
      }
    } else {
      dout(1) << "mon failed to return metadata for "
              << key.first << "." << key.second << ": "
              << cpp_strerror(r) << dendl;
    }
  }
};

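/**
 * Start the finisher thread and queue init() on it, completing
 * @completion once init() has returned.
 */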
void Mgr::background_init(Context *completion)
{
  Mutex::Locker l(lock);
  assert(!initializing);
  assert(!initialized);
  initializing = true;

  finisher.start();

  finisher.queue(new FunctionContext([this, completion](int r){
    init();
    completion->complete(0);
  }));
}

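/**
 * Blocking initialization, run on the finisher thread: start the
 * DaemonServer, preload daemon metadata, subscribe to the maps we need,
 * wait for the OSDMap and FSMap, preload config keys, wait for the first
 * MgrDigest, and finally start the python modules.
 */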
void Mgr::init()
{
  Mutex::Locker l(lock);
  assert(initializing);
  assert(!initialized);

  // Start communicating with daemons to learn statistics etc
  int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
  if (r < 0) {
    derr << "Initialize server failed" << dendl;
    return;
  }
  dout(4) << "Initialized server at " << server.get_myaddr() << dendl;

  // Preload all daemon metadata (will subsequently keep this
  // up to date by watching maps, so do the initial load before
  // we subscribe to any maps)
  dout(4) << "Loading daemon metadata..." << dendl;
  load_all_metadata();

  // subscribe to all the maps
  monc->sub_want("log-info", 0, 0);
  monc->sub_want("mgrdigest", 0, 0);
  monc->sub_want("fsmap", 0, 0);
  monc->sub_want("servicemap", 0, 0);

  dout(4) << "waiting for OSDMap..." << dendl;
  // Subscribe to OSDMap update to pass on to ClusterState
  objecter->maybe_request_map();

  // reset the mon session. we get these maps through subscriptions which
  // are stateful with the connection, so even if *we* don't have them a
  // previous incarnation sharing the same MonClient may have.
  monc->reopen_session();

  // Start Objecter and wait for OSD map
  lock.Unlock();  // Drop lock because OSDMap dispatch calls into my ms_dispatch
  objecter->wait_for_osd_map();
  lock.Lock();

  // Populate PGs in ClusterState
  objecter->with_osdmap([this](const OSDMap &osd_map) {
    cluster_state.notify_osdmap(osd_map);
  });

  // Wait for FSMap
  dout(4) << "waiting for FSMap..." << dendl;
  while (!cluster_state.have_fsmap()) {
    fs_map_cond.Wait(lock);
  }

  dout(4) << "waiting for config-keys..." << dendl;

  // Preload config keys (`get` for plugins is meant to be a fast local
  // operation, so we don't have to synchronize these later because
  // all sets will come via mgr)
  load_config();

  // Wait for MgrDigest...
  dout(4) << "waiting for MgrDigest..." << dendl;
  while (!digest_received) {
    digest_cond.Wait(lock);
  }

  // assume finisher already initialized in background_init
  dout(4) << "starting PyModules..." << dendl;
  py_modules.init();
  py_modules.start();

  dout(4) << "Complete." << dendl;
  initializing = false;
  initialized = true;
}

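/**
 * Fetch metadata for every MDS, mon and OSD from the monitors in one
 * pass and seed the DaemonStateIndex with it, dropping the lock while
 * waiting for the three mon commands to complete.
 */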
void Mgr::load_all_metadata()
{
  assert(lock.is_locked_by_me());

  JSONCommand mds_cmd;
  mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
  JSONCommand osd_cmd;
  osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
  JSONCommand mon_cmd;
  mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");

  lock.Unlock();
  mds_cmd.wait();
  osd_cmd.wait();
  mon_cmd.wait();
  lock.Lock();

  assert(mds_cmd.r == 0);
  assert(mon_cmd.r == 0);
  assert(osd_cmd.r == 0);

  for (auto &metadata_val : mds_cmd.json_result.get_array()) {
    json_spirit::mObject daemon_meta = metadata_val.get_obj();
    if (daemon_meta.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("mds",
                        daemon_meta.at("name").get_str());
    dm->hostname = daemon_meta.at("hostname").get_str();

    daemon_meta.erase("name");
    daemon_meta.erase("hostname");

    for (const auto &i : daemon_meta) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }

  for (auto &metadata_val : mon_cmd.json_result.get_array()) {
    json_spirit::mObject daemon_meta = metadata_val.get_obj();
    if (daemon_meta.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("mon",
                        daemon_meta.at("name").get_str());
    dm->hostname = daemon_meta.at("hostname").get_str();

    daemon_meta.erase("name");
    daemon_meta.erase("hostname");

    for (const auto &i : daemon_meta) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }

  for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
    json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
    if (osd_metadata.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }
    dout(4) << osd_metadata.at("hostname").get_str() << dendl;

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("osd",
                        stringify(osd_metadata.at("id").get_int()));
    dm->hostname = osd_metadata.at("hostname").get_str();

    osd_metadata.erase("id");
    osd_metadata.erase("hostname");

    for (const auto &i : osd_metadata) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }
}

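/**
 * List all config-key entries, fetch those carrying the module config
 * prefix, and hand the resulting key/value map to the python modules.
 */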
void Mgr::load_config()
{
  assert(lock.is_locked_by_me());

  dout(10) << "listing keys" << dendl;
  JSONCommand cmd;
  cmd.run(monc, "{\"prefix\": \"config-key list\"}");
  lock.Unlock();
  cmd.wait();
  lock.Lock();
  assert(cmd.r == 0);

  std::map<std::string, std::string> loaded;

  for (auto &key_str : cmd.json_result.get_array()) {
    std::string const key = key_str.get_str();
    dout(20) << "saw key '" << key << "'" << dendl;

    const std::string config_prefix = PyModules::config_prefix;

    if (key.substr(0, config_prefix.size()) == config_prefix) {
      dout(20) << "fetching '" << key << "'" << dendl;
      Command get_cmd;
      std::ostringstream cmd_json;
      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
      get_cmd.run(monc, cmd_json.str());
      lock.Unlock();
      get_cmd.wait();
      lock.Lock();
      assert(get_cmd.r == 0);
      loaded[key] = get_cmd.outbl.to_str();
    }
  }

  py_modules.insert_config(loaded);
}

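/**
 * Shut down in dependency order: unsubscribe from the mon, stop the
 * DaemonServer so no new requests arrive, ask the python modules to
 * shut down, then drain and stop the finisher.
 */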
void Mgr::shutdown()
{
  finisher.queue(new FunctionContext([&](int) {
    {
      Mutex::Locker l(lock);
      monc->sub_unwant("log-info");
      monc->sub_unwant("mgrdigest");
      monc->sub_unwant("fsmap");
      // First stop the server so that we're not taking any more incoming
      // requests
      server.shutdown();
    }
    // after the messenger is stopped, signal modules to shutdown via finisher
    py_modules.shutdown();
  }));

  // Then stop the finisher to ensure its enqueued contexts aren't going
  // to touch references to the things we're about to tear down
  finisher.wait_for_empty();
  finisher.stop();
}

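/**
 * React to a new OSDMap: note which OSD ids still exist, request fresh
 * metadata for any OSD whose address changed (daemon restarted) or that
 * we have never seen before, update ClusterState, and cull OSDs that
 * have disappeared from the map.
 */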
void Mgr::handle_osd_map()
{
  assert(lock.is_locked_by_me());

  std::set<std::string> names_exist;

  /**
   * When we see a new OSD map, inspect the entity addrs to
   * see if they have changed (service restart), and if so
   * reload the metadata.
   */
  objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
    for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
      if (!osd_map.exists(osd_id)) {
        continue;
      }

      // Remember which OSDs exist so that we can cull any that don't
      names_exist.insert(stringify(osd_id));

      // Consider whether to update the daemon metadata (new/restarted daemon)
      bool update_meta = false;
      const auto k = DaemonKey("osd", stringify(osd_id));
      if (daemon_state.is_updating(k)) {
        continue;
      }

      if (daemon_state.exists(k)) {
        auto metadata = daemon_state.get(k);
        auto addr_iter = metadata->metadata.find("front_addr");
        if (addr_iter != metadata->metadata.end()) {
          const std::string &metadata_addr = addr_iter->second;
          const auto &map_addr = osd_map.get_addr(osd_id);

          if (metadata_addr != stringify(map_addr)) {
            dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
                    << " != " << stringify(map_addr) << dendl;
            update_meta = true;
          } else {
            dout(20) << "OSD[" << osd_id << "] addr unchanged: "
                     << metadata_addr << dendl;
          }
        } else {
          // Awkward case where daemon went into DaemonState because it
          // sent us a report but its metadata didn't get loaded yet
          update_meta = true;
        }
      } else {
        update_meta = true;
      }

      if (update_meta) {
        daemon_state.notify_updating(k);
        auto c = new MetadataUpdate(daemon_state, k);
        std::ostringstream cmd;
        cmd << "{\"prefix\": \"osd metadata\", \"id\": "
            << osd_id << "}";
        monc->start_mon_command(
            {cmd.str()},
            {}, &c->outbl, &c->outs, c);
      }
    }

    cluster_state.notify_osdmap(osd_map);
  });

  // TODO: same culling for MonMap
  daemon_state.cull("osd", names_exist);
}

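// Fan incoming cluster log entries out to the python modules.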
void Mgr::handle_log(MLog *m)
{
  for (const auto &e : m->entries) {
    py_modules.notify_all(e);
  }

  m->put();
}

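// Record the new ServiceMap and let the DaemonServer know it has arrived.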
void Mgr::handle_service_map(MServiceMap *m)
{
  dout(10) << "e" << m->service_map.epoch << dendl;
  cluster_state.set_service_map(m->service_map);
  server.got_service_map();
}

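/**
 * Main message dispatch: digests, mon/FS/OSD/service maps and cluster
 * log entries are consumed here.  FSMap messages are additionally passed
 * through to the Client by returning false.
 */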
bool Mgr::ms_dispatch(Message *m)
{
  dout(4) << *m << dendl;
  Mutex::Locker l(lock);

  switch (m->get_type()) {
    case MSG_MGR_DIGEST:
      handle_mgr_digest(static_cast<MMgrDigest*>(m));
      break;
    case CEPH_MSG_MON_MAP:
      py_modules.notify_all("mon_map", "");
      m->put();
      break;
    case CEPH_MSG_FS_MAP:
      py_modules.notify_all("fs_map", "");
      handle_fs_map((MFSMap*)m);
      return false; // I shall let this pass through for Client
      break;
    case CEPH_MSG_OSD_MAP:
      handle_osd_map();

      py_modules.notify_all("osd_map", "");

      // Continuous subscribe, so that we can generate notifications
      // for our MgrPyModules
      objecter->maybe_request_map();
      m->put();
      break;
    case MSG_SERVICE_MAP:
      handle_service_map((MServiceMap*)m);
      py_modules.notify_all("service_map", "");
      m->put();
      break;
    case MSG_LOG:
      handle_log(static_cast<MLog *>(m));
      break;

    default:
      return false;
  }
  return true;
}

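/**
 * React to a new FSMap: publish it to ClusterState, wake anyone waiting
 * in init(), refresh metadata for new or restarted MDS daemons, and cull
 * MDS entries that are no longer present in the map.
 */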
void Mgr::handle_fs_map(MFSMap* m)
{
  assert(lock.is_locked_by_me());

  std::set<std::string> names_exist;

  const FSMap &new_fsmap = m->get_fsmap();

  fs_map_cond.Signal();

  // TODO: callers (e.g. from python land) are potentially going to see
  // the new fsmap before we've bothered populating all the resulting
  // daemon_state.  Maybe we should block python land while we're making
  // this kind of update?

  cluster_state.set_fsmap(new_fsmap);

  auto mds_info = new_fsmap.get_mds_info();
  for (const auto &i : mds_info) {
    const auto &info = i.second;

    if (!new_fsmap.gid_exists(i.first)) {
      continue;
    }

    // Remember which MDS daemons exist so that we can cull any that don't
    names_exist.insert(info.name);

    const auto k = DaemonKey("mds", info.name);
    if (daemon_state.is_updating(k)) {
      continue;
    }

    bool update = false;
    if (daemon_state.exists(k)) {
      auto metadata = daemon_state.get(k);
      if (metadata->metadata.empty() ||
          metadata->metadata.count("addr") == 0) {
        update = true;
      } else {
        auto metadata_addr = metadata->metadata.at("addr");
        const auto map_addr = info.addr;
        update = metadata_addr != stringify(map_addr);
        if (update) {
          dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
                  << " != " << stringify(map_addr) << dendl;
        }
      }
    } else {
      update = true;
    }

    if (update) {
      daemon_state.notify_updating(k);
      auto c = new MetadataUpdate(daemon_state, k);

      // Older MDS daemons don't have addr in the metadata, so
      // fake it if the returned metadata doesn't have the field.
      c->set_default("addr", stringify(info.addr));

      std::ostringstream cmd;
      cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
          << info.name << "\"}";
      monc->start_mon_command(
          {cmd.str()},
          {}, &c->outbl, &c->outs, c);
    }
  }
  daemon_state.cull("mds", names_exist);
}

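/**
 * Record a new MgrMap.  Returns true if the enabled module list changed
 * (logged as requiring a respawn); otherwise stores the map and returns
 * false.
 */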
bool Mgr::got_mgr_map(const MgrMap& m)
{
  Mutex::Locker l(lock);
  dout(10) << m << dendl;

  set<string> old_modules;
  cluster_state.with_mgrmap([&](const MgrMap& m) {
    old_modules = m.modules;
  });
  if (m.modules != old_modules) {
    derr << "mgrmap module list changed to (" << m.modules << "), respawn"
         << dendl;
    return true;
  }

  cluster_state.set_mgr_map(m);

  return false;
}

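/**
 * Consume a MgrDigest: load the mon status and health blobs into
 * ClusterState, notify the python modules, and signal init() once the
 * first digest has arrived.
 */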
void Mgr::handle_mgr_digest(MMgrDigest* m)
{
  dout(10) << m->mon_status_json.length() << dendl;
  dout(10) << m->health_json.length() << dendl;
  cluster_state.load_digest(m);
  py_modules.notify_all("mon_status", "");
  py_modules.notify_all("health", "");

  // Hack: use this as a tick/opportunity to prompt python-land that
  // the pgmap might have changed since last time we were here.
  py_modules.notify_all("pg_summary", "");
  dout(10) << "done." << dendl;

  m->put();

  if (!digest_received) {
    digest_received = true;
    digest_cond.Signal();
  }
}

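// Periodic tick: have the DaemonServer send its report.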
void Mgr::tick()
{
  dout(10) << dendl;
  server.send_report();
}