// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include <Python.h>

#include "osdc/Objecter.h"
#include "client/Client.h"
#include "common/errno.h"
#include "mon/MonClient.h"
#include "include/stringify.h"
#include "global/global_context.h"
#include "global/signal_handler.h"

#include "mgr/MgrContext.h"
#include "mgr/mgr_commands.h"

//#include "MgrPyModule.h"
#include "DaemonServer.h"
#include "messages/MMgrBeacon.h"
#include "messages/MMgrDigest.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MLog.h"
#include "messages/MServiceMap.h"

#include "Mgr.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "


Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
         PyModuleRegistry *py_module_registry_,
         Messenger *clientm_, Objecter *objecter_,
         Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
  monc(monc_),
  objecter(objecter_),
  client(client_),
  client_messenger(clientm_),
  lock("Mgr::lock"),
  timer(g_ceph_context, lock),
  finisher(g_ceph_context, "Mgr", "mgr-fin"),
  digest_received(false),
  py_module_registry(py_module_registry_),
  cluster_state(monc, nullptr, mgrmap),
  server(monc, finisher, daemon_state, cluster_state, *py_module_registry,
         clog_, audit_clog_),
  clog(clog_),
  audit_clog(audit_clog_),
  initialized(false),
  initializing(false)
{
  cluster_state.set_objecter(objecter);
}


Mgr::~Mgr()
{
}

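// Completion context for the async "mds metadata"/"osd metadata" mon commands
// issued by handle_fs_map()/handle_osd_map() below (the class itself is
// declared outside this file, in the corresponding header).  On success it
// parses the JSON reply, fills in any registered defaults, strips the
// redundant name/id/hostname fields, and stores the result in daemon_state.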
void MetadataUpdate::finish(int r)
{
  daemon_state.clear_updating(key);
  if (r == 0) {
    if (key.first == "mds" || key.first == "osd") {
      json_spirit::mValue json_result;
      bool read_ok = json_spirit::read(
          outbl.to_str(), json_result);
      if (!read_ok) {
        dout(1) << "mon returned invalid JSON for "
                << key.first << "." << key.second << dendl;
        return;
      }
      dout(4) << "mon returned valid metadata JSON for "
              << key.first << "." << key.second << dendl;

      json_spirit::mObject daemon_meta = json_result.get_obj();

      // Apply any defaults
      for (const auto &i : defaults) {
        if (daemon_meta.find(i.first) == daemon_meta.end()) {
          daemon_meta[i.first] = i.second;
        }
      }

      DaemonStatePtr state;
      if (daemon_state.exists(key)) {
        state = daemon_state.get(key);
        Mutex::Locker l(state->lock);
        if (key.first == "mds") {
          daemon_meta.erase("name");
        } else if (key.first == "osd") {
          daemon_meta.erase("id");
        }
        daemon_meta.erase("hostname");
        state->metadata.clear();
        for (const auto &i : daemon_meta) {
          state->metadata[i.first] = i.second.get_str();
        }
      } else {
        state = std::make_shared<DaemonState>(daemon_state.types);
        state->key = key;
        state->hostname = daemon_meta.at("hostname").get_str();

        if (key.first == "mds") {
          daemon_meta.erase("name");
        } else if (key.first == "osd") {
          daemon_meta.erase("id");
        }
        daemon_meta.erase("hostname");

        for (const auto &i : daemon_meta) {
          state->metadata[i.first] = i.second.get_str();
        }

        daemon_state.insert(state);
      }
    } else {
      ceph_abort();
    }
  } else {
    dout(1) << "mon failed to return metadata for "
            << key.first << "." << key.second << ": "
            << cpp_strerror(r) << dendl;
  }
}

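// Kick off asynchronous initialization: start the finisher thread and queue
// init() on it, invoking the supplied completion once init() has returned.
// Note that the queued callback ignores its int argument and always
// completes with 0.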
void Mgr::background_init(Context *completion)
{
  Mutex::Locker l(lock);
  assert(!initializing);
  assert(!initialized);
  initializing = true;

  finisher.start();

  finisher.queue(new FunctionContext([this, completion](int r){
    init();
    completion->complete(0);
  }));
}

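// Main (blocking) initialization sequence, run on the finisher thread from
// background_init(): bring up the DaemonServer, preload daemon metadata and
// config keys, subscribe to the maps we care about, then wait in turn for an
// OSDMap, an FSMap and the first MgrDigest before starting the active python
// modules.  The mgr lock is dropped around the blocking waits noted below.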
void Mgr::init()
{
  Mutex::Locker l(lock);
  assert(initializing);
  assert(!initialized);

  // Start communicating with daemons to learn statistics etc
  int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
  if (r < 0) {
    derr << "Failed to initialize server: " << cpp_strerror(r) << dendl;
    return;
  }
  dout(4) << "Initialized server at " << server.get_myaddr() << dendl;

  // Preload all daemon metadata (we will subsequently keep this
  // up to date by watching maps, so do the initial load before
  // we subscribe to any maps)
  dout(4) << "Loading daemon metadata..." << dendl;
  load_all_metadata();

  // Subscribe to all the maps
  monc->sub_want("log-info", 0, 0);
  monc->sub_want("mgrdigest", 0, 0);
  monc->sub_want("fsmap", 0, 0);
  monc->sub_want("servicemap", 0, 0);

  dout(4) << "waiting for OSDMap..." << dendl;
  // Subscribe to OSDMap updates to pass on to ClusterState
  objecter->maybe_request_map();

  // Reset the mon session.  We get these maps through subscriptions, which
  // are stateful with the connection, so even if *we* don't have them a
  // previous incarnation sharing the same MonClient may have.
  monc->reopen_session();

  // Start Objecter and wait for OSD map
  lock.Unlock();  // Drop lock because OSDMap dispatch calls into my ms_dispatch
  objecter->wait_for_osd_map();
  lock.Lock();

  // Populate PGs in ClusterState
  objecter->with_osdmap([this](const OSDMap &osd_map) {
    cluster_state.notify_osdmap(osd_map);
  });

  // Wait for FSMap
  dout(4) << "waiting for FSMap..." << dendl;
  while (!cluster_state.have_fsmap()) {
    fs_map_cond.Wait(lock);
  }

  dout(4) << "waiting for config-keys..." << dendl;

  // Preload config keys (`get` for plugins should be a fast local
  // operation; we don't have to synchronize these later because
  // all sets will come via the mgr)
  auto loaded_config = load_config();

  // Wait for MgrDigest...
  dout(4) << "waiting for MgrDigest..." << dendl;
  while (!digest_received) {
    digest_cond.Wait(lock);
  }

  // Assume the finisher was already started in background_init
  dout(4) << "starting python modules..." << dendl;
  py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc,
                                   clog, *objecter, *client, finisher);

  dout(4) << "Complete." << dendl;
  initializing = false;
  initialized = true;
}

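// Synchronously fetch "mds metadata", "osd metadata" and "mon metadata" from
// the mon and populate daemon_state from the results.  Entries without a
// "hostname" field are skipped, and the mgr lock is dropped while waiting
// for the three commands to complete.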
void Mgr::load_all_metadata()
{
  assert(lock.is_locked_by_me());

  JSONCommand mds_cmd;
  mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
  JSONCommand osd_cmd;
  osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
  JSONCommand mon_cmd;
  mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");

  lock.Unlock();
  mds_cmd.wait();
  osd_cmd.wait();
  mon_cmd.wait();
  lock.Lock();

  assert(mds_cmd.r == 0);
  assert(mon_cmd.r == 0);
  assert(osd_cmd.r == 0);

  for (auto &metadata_val : mds_cmd.json_result.get_array()) {
    json_spirit::mObject daemon_meta = metadata_val.get_obj();
    if (daemon_meta.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("mds",
                        daemon_meta.at("name").get_str());
    dm->hostname = daemon_meta.at("hostname").get_str();

    daemon_meta.erase("name");
    daemon_meta.erase("hostname");

    for (const auto &i : daemon_meta) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }

  for (auto &metadata_val : mon_cmd.json_result.get_array()) {
    json_spirit::mObject daemon_meta = metadata_val.get_obj();
    if (daemon_meta.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("mon",
                        daemon_meta.at("name").get_str());
    dm->hostname = daemon_meta.at("hostname").get_str();

    daemon_meta.erase("name");
    daemon_meta.erase("hostname");

    for (const auto &i : daemon_meta) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }

  for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
    json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
    if (osd_metadata.count("hostname") == 0) {
      dout(1) << "Skipping incomplete metadata entry" << dendl;
      continue;
    }
    dout(4) << osd_metadata.at("hostname").get_str() << dendl;

    DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
    dm->key = DaemonKey("osd",
                        stringify(osd_metadata.at("id").get_int()));
    dm->hostname = osd_metadata.at("hostname").get_str();

    osd_metadata.erase("id");
    osd_metadata.erase("hostname");

    for (const auto &i : osd_metadata) {
      dm->metadata[i.first] = i.second.get_str();
    }

    daemon_state.insert(dm);
  }
}

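// Fetch the mon's config-key entries that carry mgr module configuration:
// list all keys, then "config-key get" each key that starts with
// PyModuleRegistry::config_prefix.  The resulting map is handed to the
// python module registry in init() so that module config reads can be
// served locally (see the comment there).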
std::map<std::string, std::string> Mgr::load_config()
{
  assert(lock.is_locked_by_me());

  dout(10) << "listing keys" << dendl;
  JSONCommand cmd;
  cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
  lock.Unlock();
  cmd.wait();
  lock.Lock();
  assert(cmd.r == 0);

  std::map<std::string, std::string> loaded;

  for (auto &key_str : cmd.json_result.get_array()) {
    std::string const key = key_str.get_str();
    dout(20) << "saw key '" << key << "'" << dendl;

    const std::string config_prefix = PyModuleRegistry::config_prefix;

    if (key.substr(0, config_prefix.size()) == config_prefix) {
      dout(20) << "fetching '" << key << "'" << dendl;
      Command get_cmd;
      std::ostringstream cmd_json;
      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
      get_cmd.run(monc, cmd_json.str());
      lock.Unlock();
      get_cmd.wait();
      lock.Lock();
      assert(get_cmd.r == 0);
      loaded[key] = get_cmd.outbl.to_str();
    }
  }

  return loaded;
}

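// Orderly shutdown: via the finisher, cancel our mon subscriptions, stop the
// DaemonServer so no new requests arrive, then shut down the python modules;
// finally drain and stop the finisher itself so that no queued context
// outlives the objects it references.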
void Mgr::shutdown()
{
  finisher.queue(new FunctionContext([&](int) {
    {
      Mutex::Locker l(lock);
      monc->sub_unwant("log-info");
      monc->sub_unwant("mgrdigest");
      monc->sub_unwant("fsmap");
      // First stop the server so that we're not taking any more incoming
      // requests
      server.shutdown();
    }
    // After the messenger is stopped, signal the modules to shut down
    // via the finisher
    py_module_registry->active_shutdown();
  }));

  // Then stop the finisher to ensure its enqueued contexts aren't going
  // to touch references to the things we're about to tear down
  finisher.wait_for_empty();
  finisher.stop();
}

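// Called (with the mgr lock held) whenever a new OSDMap arrives.  For every
// OSD in the map we compare the map's address against the "front_addr"
// cached in daemon_state; a mismatch or missing metadata triggers an async
// "osd metadata" command whose completion is a MetadataUpdate.  OSDs that no
// longer exist in the map are culled from daemon_state.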
void Mgr::handle_osd_map()
{
  assert(lock.is_locked_by_me());

  std::set<std::string> names_exist;

  /**
   * When we see a new OSD map, inspect the entity addrs to
   * see if they have changed (service restart), and if so
   * reload the metadata.
   */
  objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
    for (unsigned int osd_id = 0; osd_id < osd_map.get_max_osd(); ++osd_id) {
      if (!osd_map.exists(osd_id)) {
        continue;
      }

      // Remember which OSDs exist so that we can cull any that don't
      names_exist.insert(stringify(osd_id));

      // Consider whether to update the daemon metadata (new/restarted daemon)
      bool update_meta = false;
      const auto k = DaemonKey("osd", stringify(osd_id));
      if (daemon_state.is_updating(k)) {
        continue;
      }

      if (daemon_state.exists(k)) {
        auto metadata = daemon_state.get(k);
        Mutex::Locker l(metadata->lock);
        auto addr_iter = metadata->metadata.find("front_addr");
        if (addr_iter != metadata->metadata.end()) {
          const std::string &metadata_addr = addr_iter->second;
          const auto &map_addr = osd_map.get_addr(osd_id);

          if (metadata_addr != stringify(map_addr)) {
            dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
                    << " != " << stringify(map_addr) << dendl;
            update_meta = true;
          } else {
            dout(20) << "OSD[" << osd_id << "] addr unchanged: "
                     << metadata_addr << dendl;
          }
        } else {
          // Awkward case where daemon went into DaemonState because it
          // sent us a report but its metadata didn't get loaded yet
          update_meta = true;
        }
      } else {
        update_meta = true;
      }

      if (update_meta) {
        daemon_state.notify_updating(k);
        auto c = new MetadataUpdate(daemon_state, k);
        std::ostringstream cmd;
        cmd << "{\"prefix\": \"osd metadata\", \"id\": "
            << osd_id << "}";
        monc->start_mon_command(
            {cmd.str()},
            {}, &c->outbl, &c->outs, c);
      }
    }

    cluster_state.notify_osdmap(osd_map);
  });

  // TODO: same culling for MonMap
  daemon_state.cull("osd", names_exist);
}

void Mgr::handle_log(MLog *m)
{
  for (const auto &e : m->entries) {
    py_module_registry->notify_all(e);
  }

  m->put();
}

void Mgr::handle_service_map(MServiceMap *m)
{
  dout(10) << "e" << m->service_map.epoch << dendl;
  cluster_state.set_service_map(m->service_map);
  server.got_service_map();
}

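// Messenger dispatch hook.  Returns true if the message was consumed here;
// CEPH_MSG_FS_MAP deliberately returns false so the same message continues
// on to the Client instance, and unhandled types fall through to the default
// case for other dispatchers.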
bool Mgr::ms_dispatch(Message *m)
{
  dout(4) << *m << dendl;
  Mutex::Locker l(lock);

  switch (m->get_type()) {
    case MSG_MGR_DIGEST:
      handle_mgr_digest(static_cast<MMgrDigest*>(m));
      break;
    case CEPH_MSG_MON_MAP:
      py_module_registry->notify_all("mon_map", "");
      m->put();
      break;
    case CEPH_MSG_FS_MAP:
      py_module_registry->notify_all("fs_map", "");
      handle_fs_map(static_cast<MFSMap*>(m));
      return false; // I shall let this pass through for Client
      break;
    case CEPH_MSG_OSD_MAP:
      handle_osd_map();

      py_module_registry->notify_all("osd_map", "");

      // Continuous subscribe, so that we can generate notifications
      // for our MgrPyModules
      objecter->maybe_request_map();
      m->put();
      break;
    case MSG_SERVICE_MAP:
      handle_service_map(static_cast<MServiceMap*>(m));
      py_module_registry->notify_all("service_map", "");
      m->put();
      break;
    case MSG_LOG:
      handle_log(static_cast<MLog *>(m));
      break;

    default:
      return false;
  }
  return true;
}


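// FSMap handler (mgr lock held): wake up init() waiting on fs_map_cond, store
// the new map in cluster_state, and refresh MDS daemon metadata in the same
// way handle_osd_map() does for OSDs, culling MDS entries that have left the
// map.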
void Mgr::handle_fs_map(MFSMap* m)
{
  assert(lock.is_locked_by_me());

  std::set<std::string> names_exist;

  const FSMap &new_fsmap = m->get_fsmap();

  fs_map_cond.Signal();

  // TODO: callers (e.g. from python land) are potentially going to see
  // the new fsmap before we've bothered populating all the resulting
  // daemon_state.  Maybe we should block python land while we're making
  // this kind of update?

  cluster_state.set_fsmap(new_fsmap);

  auto mds_info = new_fsmap.get_mds_info();
  for (const auto &i : mds_info) {
    const auto &info = i.second;

    if (!new_fsmap.gid_exists(i.first)) {
      continue;
    }

    // Remember which MDS daemons exist so that we can cull any that don't
    names_exist.insert(info.name);

    const auto k = DaemonKey("mds", info.name);
    if (daemon_state.is_updating(k)) {
      continue;
    }

    bool update = false;
    if (daemon_state.exists(k)) {
      auto metadata = daemon_state.get(k);
      Mutex::Locker l(metadata->lock);
      if (metadata->metadata.empty() ||
          metadata->metadata.count("addr") == 0) {
        update = true;
      } else {
        auto metadata_addr = metadata->metadata.at("addr");
        const auto map_addr = info.addr;
        update = metadata_addr != stringify(map_addr);
        if (update) {
          dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
                  << " != " << stringify(map_addr) << dendl;
        }
      }
    } else {
      update = true;
    }

    if (update) {
      daemon_state.notify_updating(k);
      auto c = new MetadataUpdate(daemon_state, k);

      // Older MDS daemons don't have addr in the metadata, so
      // fake it if the returned metadata doesn't have the field.
      c->set_default("addr", stringify(info.addr));

      std::ostringstream cmd;
      cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
          << info.name << "\"}";
      monc->start_mon_command(
          {cmd.str()},
          {}, &c->outbl, &c->outs, c);
    }
  }
  daemon_state.cull("mds", names_exist);
}

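// Process a new MgrMap.  Returns true if the list of enabled modules has
// changed (per the log message, the caller is expected to respawn in that
// case); otherwise stores the map in cluster_state and returns false.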
bool Mgr::got_mgr_map(const MgrMap& m)
{
  Mutex::Locker l(lock);
  dout(10) << m << dendl;

  set<string> old_modules;
  cluster_state.with_mgrmap([&](const MgrMap& m) {
    old_modules = m.modules;
  });
  if (m.modules != old_modules) {
    derr << "mgrmap module list changed to (" << m.modules << "), respawn"
         << dendl;
    return true;
  }

  cluster_state.set_mgr_map(m);

  return false;
}

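// Handle the periodic MgrDigest from the mon: load the mon_status/health
// blobs into cluster_state, notify the python modules, and signal
// digest_cond the first time so init() can stop waiting.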
void Mgr::handle_mgr_digest(MMgrDigest* m)
{
  dout(10) << m->mon_status_json.length() << dendl;
  dout(10) << m->health_json.length() << dendl;
  cluster_state.load_digest(m);
  py_module_registry->notify_all("mon_status", "");
  py_module_registry->notify_all("health", "");

  // Hack: use this as a tick/opportunity to prompt python-land that
  // the pgmap might have changed since last time we were here.
  py_module_registry->notify_all("pg_summary", "");
  dout(10) << "done." << dendl;

  m->put();

  if (!digest_received) {
    digest_received = true;
    digest_cond.Signal();
  }
}

void Mgr::tick()
{
  dout(10) << dendl;
  server.send_report();
}

std::vector<MonCommand> Mgr::get_command_set() const
{
  Mutex::Locker l(lock);

  std::vector<MonCommand> commands = mgr_commands;
  std::vector<MonCommand> py_commands = py_module_registry->get_commands();
  commands.insert(commands.end(), py_commands.begin(), py_commands.end());
  return commands;
}

std::map<std::string, std::string> Mgr::get_services() const
{
  Mutex::Locker l(lock);

  return py_module_registry->get_services();
}