]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/Mgr.cc
update sources to v12.1.2
[ceph.git] / ceph / src / mgr / Mgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include <Python.h>
15
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
23
24 #include "mgr/MgrContext.h"
25 #include "mgr/mgr_commands.h"
26
27 #include "MgrPyModule.h"
28 #include "DaemonServer.h"
29 #include "messages/MMgrBeacon.h"
30 #include "messages/MMgrDigest.h"
31 #include "messages/MCommand.h"
32 #include "messages/MCommandReply.h"
33 #include "messages/MLog.h"
34 #include "messages/MServiceMap.h"
35
36 #include "Mgr.h"
37
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_mgr
40 #undef dout_prefix
41 #define dout_prefix *_dout << "mgr " << __func__ << " "
42
43
// Construct the manager around externally-owned plumbing (mon session,
// messenger, Objecter, Client).  Only wiring happens here; the heavyweight
// startup work is deferred to background_init()/init().
//
// NOTE(review): several member initializers pass references between sibling
// members (py_modules receives daemon_state/cluster_state, server receives
// py_modules).  Members are constructed in *declaration* order (Mgr.h), not
// the order written below, so this is only safe if the receiving
// constructors merely bind the references without using them — confirm
// against Mgr.h and the PyModules/DaemonServer constructors.
Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap,
         Messenger *clientm_, Objecter *objecter_,
         Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
  monc(monc_),
  objecter(objecter_),
  client(client_),
  client_messenger(clientm_),
  lock("Mgr::lock"),
  timer(g_ceph_context, lock),
  finisher(g_ceph_context, "Mgr", "mgr-fin"),
  digest_received(false),
  py_modules(daemon_state, cluster_state, *monc, clog_, *objecter, *client,
             finisher),
  cluster_state(monc, nullptr, mgrmap),
  server(monc, finisher, daemon_state, cluster_state, py_modules,
         clog_, audit_clog_),
  initialized(false),
  initializing(false)
{
  // ClusterState is constructed with a null Objecter above; attach the
  // real one now that all members exist.
  cluster_state.set_objecter(objecter);
}
65
66
67 Mgr::~Mgr()
68 {
69 }
70
71
72 /**
73 * Context for completion of metadata mon commands: take
74 * the result and stash it in DaemonStateIndex
75 */
76 class MetadataUpdate : public Context
77 {
78 DaemonStateIndex &daemon_state;
79 DaemonKey key;
80
81 std::map<std::string, std::string> defaults;
82
83 public:
84 bufferlist outbl;
85 std::string outs;
86
87 MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_)
88 : daemon_state(daemon_state_), key(key_) {}
89
90 void set_default(const std::string &k, const std::string &v)
91 {
92 defaults[k] = v;
93 }
94
95 void finish(int r) override
96 {
97 daemon_state.clear_updating(key);
98 if (r == 0) {
99 if (key.first == "mds") {
100 json_spirit::mValue json_result;
101 bool read_ok = json_spirit::read(
102 outbl.to_str(), json_result);
103 if (!read_ok) {
104 dout(1) << "mon returned invalid JSON for "
105 << key.first << "." << key.second << dendl;
106 return;
107 }
108
109 json_spirit::mObject daemon_meta = json_result.get_obj();
110
111 // Apply any defaults
112 for (const auto &i : defaults) {
113 if (daemon_meta.find(i.first) == daemon_meta.end()) {
114 daemon_meta[i.first] = i.second;
115 }
116 }
117
118 DaemonStatePtr state;
119 if (daemon_state.exists(key)) {
120 state = daemon_state.get(key);
121 Mutex::Locker l(state->lock);
122 daemon_meta.erase("name");
123 daemon_meta.erase("hostname");
124 state->metadata.clear();
125 for (const auto &i : daemon_meta) {
126 state->metadata[i.first] = i.second.get_str();
127 }
128 } else {
129 state = std::make_shared<DaemonState>(daemon_state.types);
130 state->key = key;
131 state->hostname = daemon_meta.at("hostname").get_str();
132
133 for (const auto &i : daemon_meta) {
134 state->metadata[i.first] = i.second.get_str();
135 }
136
137 daemon_state.insert(state);
138 }
139 } else if (key.first == "osd") {
140 } else {
141 ceph_abort();
142 }
143 } else {
144 dout(1) << "mon failed to return metadata for "
145 << key.first << "." << key.second << ": "
146 << cpp_strerror(r) << dendl;
147 }
148 }
149 };
150
151
152 void Mgr::background_init(Context *completion)
153 {
154 Mutex::Locker l(lock);
155 assert(!initializing);
156 assert(!initialized);
157 initializing = true;
158
159 finisher.start();
160
161 finisher.queue(new FunctionContext([this, completion](int r){
162 init();
163 completion->complete(0);
164 }));
165 }
166
// Blocking startup sequence, run on the finisher thread from
// background_init().  Brings up the daemon server, preloads metadata and
// config, subscribes to the mon maps, and waits (on condvars, with `lock`
// held except where noted) for the first OSDMap, FSMap and MgrDigest before
// starting the python modules.
void Mgr::init()
{
  Mutex::Locker l(lock);
  assert(initializing);
  assert(!initialized);

  // Start communicating with daemons to learn statistics etc
  int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
  if (r < 0) {
    derr << "Initialize server fail"<< dendl;
    return;
  }
  dout(4) << "Initialized server at " << server.get_myaddr() << dendl;

  // Preload all daemon metadata (will subsequently keep this
  // up to date by watching maps, so do the initial load before
  // we subscribe to any maps)
  dout(4) << "Loading daemon metadata..." << dendl;
  load_all_metadata();

  // subscribe to all the maps
  monc->sub_want("log-info", 0, 0);
  monc->sub_want("mgrdigest", 0, 0);
  monc->sub_want("fsmap", 0, 0);
  monc->sub_want("servicemap", 0, 0);

  dout(4) << "waiting for OSDMap..." << dendl;
  // Subscribe to OSDMap update to pass on to ClusterState
  objecter->maybe_request_map();

  // reset the mon session. we get these maps through subscriptions which
  // are stateful with the connection, so even if *we* don't have them a
  // previous incarnation sharing the same MonClient may have.
  monc->reopen_session();

  // Start Objecter and wait for OSD map
  lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
  objecter->wait_for_osd_map();
  lock.Lock();

  // Populate PGs in ClusterState
  objecter->with_osdmap([this](const OSDMap &osd_map) {
    cluster_state.notify_osdmap(osd_map);
  });

  // Wait for FSMap (handle_fs_map signals fs_map_cond; Wait() releases
  // `lock` while blocked, so ms_dispatch can run)
  dout(4) << "waiting for FSMap..." << dendl;
  while (!cluster_state.have_fsmap()) {
    fs_map_cond.Wait(lock);
  }

  dout(4) << "waiting for config-keys..." << dendl;

  // Preload config keys (`get` for plugins is to be a fast local
  // operation, so we don't have to synchronize these later because
  // all sets will come via mgr)
  load_config();

  // Wait for MgrDigest... (handle_mgr_digest signals digest_cond)
  dout(4) << "waiting for MgrDigest..." << dendl;
  while (!digest_received) {
    digest_cond.Wait(lock);
  }

  // assume finisher already initialized in background_init
  dout(4) << "starting PyModules..." << dendl;
  py_modules.init();
  py_modules.start();

  dout(4) << "Complete." << dendl;
  initializing = false;
  initialized = true;
}
240
241 void Mgr::load_all_metadata()
242 {
243 assert(lock.is_locked_by_me());
244
245 JSONCommand mds_cmd;
246 mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
247 JSONCommand osd_cmd;
248 osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
249 JSONCommand mon_cmd;
250 mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
251
252 lock.Unlock();
253 mds_cmd.wait();
254 osd_cmd.wait();
255 mon_cmd.wait();
256 lock.Lock();
257
258 assert(mds_cmd.r == 0);
259 assert(mon_cmd.r == 0);
260 assert(osd_cmd.r == 0);
261
262 for (auto &metadata_val : mds_cmd.json_result.get_array()) {
263 json_spirit::mObject daemon_meta = metadata_val.get_obj();
264 if (daemon_meta.count("hostname") == 0) {
265 dout(1) << "Skipping incomplete metadata entry" << dendl;
266 continue;
267 }
268
269 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
270 dm->key = DaemonKey("mds",
271 daemon_meta.at("name").get_str());
272 dm->hostname = daemon_meta.at("hostname").get_str();
273
274 daemon_meta.erase("name");
275 daemon_meta.erase("hostname");
276
277 for (const auto &i : daemon_meta) {
278 dm->metadata[i.first] = i.second.get_str();
279 }
280
281 daemon_state.insert(dm);
282 }
283
284 for (auto &metadata_val : mon_cmd.json_result.get_array()) {
285 json_spirit::mObject daemon_meta = metadata_val.get_obj();
286 if (daemon_meta.count("hostname") == 0) {
287 dout(1) << "Skipping incomplete metadata entry" << dendl;
288 continue;
289 }
290
291 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
292 dm->key = DaemonKey("mon",
293 daemon_meta.at("name").get_str());
294 dm->hostname = daemon_meta.at("hostname").get_str();
295
296 daemon_meta.erase("name");
297 daemon_meta.erase("hostname");
298
299 for (const auto &i : daemon_meta) {
300 dm->metadata[i.first] = i.second.get_str();
301 }
302
303 daemon_state.insert(dm);
304 }
305
306 for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
307 json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
308 if (osd_metadata.count("hostname") == 0) {
309 dout(1) << "Skipping incomplete metadata entry" << dendl;
310 continue;
311 }
312 dout(4) << osd_metadata.at("hostname").get_str() << dendl;
313
314 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
315 dm->key = DaemonKey("osd",
316 stringify(osd_metadata.at("id").get_int()));
317 dm->hostname = osd_metadata.at("hostname").get_str();
318
319 osd_metadata.erase("id");
320 osd_metadata.erase("hostname");
321
322 for (const auto &i : osd_metadata) {
323 dm->metadata[i.first] = i.second.get_str();
324 }
325
326 daemon_state.insert(dm);
327 }
328 }
329
// Preload all config-key entries under the mgr config prefix into
// python-land, so that plugin `get` calls are fast local lookups.
// Called from init() with `lock` held; drops the lock around each
// blocking mon round-trip.
void Mgr::load_config()
{
  assert(lock.is_locked_by_me());

  dout(10) << "listing keys" << dendl;
  JSONCommand cmd;
  cmd.run(monc, "{\"prefix\": \"config-key ls\"}");
  // Drop the lock while blocking on the mon reply; ms_dispatch needs it.
  lock.Unlock();
  cmd.wait();
  lock.Lock();
  assert(cmd.r == 0);

  std::map<std::string, std::string> loaded;

  // NOTE(review): one synchronous mon round-trip per matching key — fine
  // for small key counts but O(keys) commands; confirm this is acceptable
  // for large config stores.
  for (auto &key_str : cmd.json_result.get_array()) {
    std::string const key = key_str.get_str();
    dout(20) << "saw key '" << key << "'" << dendl;

    const std::string config_prefix = PyModules::config_prefix;

    // substr clamps to the string length, so short keys compare unequal
    // rather than throwing
    if (key.substr(0, config_prefix.size()) == config_prefix) {
      dout(20) << "fetching '" << key << "'" << dendl;
      Command get_cmd;
      std::ostringstream cmd_json;
      cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
      get_cmd.run(monc, cmd_json.str());
      lock.Unlock();
      get_cmd.wait();
      lock.Lock();
      assert(get_cmd.r == 0);
      loaded[key] = get_cmd.outbl.to_str();
    }
  }

  // Hand the whole snapshot to python-land in one call
  py_modules.insert_config(loaded);
}
366
367 void Mgr::shutdown()
368 {
369 finisher.queue(new FunctionContext([&](int) {
370 {
371 Mutex::Locker l(lock);
372 monc->sub_unwant("log-info");
373 monc->sub_unwant("mgrdigest");
374 monc->sub_unwant("fsmap");
375 // First stop the server so that we're not taking any more incoming
376 // requests
377 server.shutdown();
378 }
379 // after the messenger is stopped, signal modules to shutdown via finisher
380 py_modules.shutdown();
381 }));
382
383 // Then stop the finisher to ensure its enqueued contexts aren't going
384 // to touch references to the things we're about to tear down
385 finisher.wait_for_empty();
386 finisher.stop();
387 }
388
389 void Mgr::handle_osd_map()
390 {
391 assert(lock.is_locked_by_me());
392
393 std::set<std::string> names_exist;
394
395 /**
396 * When we see a new OSD map, inspect the entity addrs to
397 * see if they have changed (service restart), and if so
398 * reload the metadata.
399 */
400 objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
401 for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
402 if (!osd_map.exists(osd_id)) {
403 continue;
404 }
405
406 // Remember which OSDs exist so that we can cull any that don't
407 names_exist.insert(stringify(osd_id));
408
409 // Consider whether to update the daemon metadata (new/restarted daemon)
410 bool update_meta = false;
411 const auto k = DaemonKey("osd", stringify(osd_id));
412 if (daemon_state.is_updating(k)) {
413 continue;
414 }
415
416 if (daemon_state.exists(k)) {
417 auto metadata = daemon_state.get(k);
418 Mutex::Locker l(metadata->lock);
419 auto addr_iter = metadata->metadata.find("front_addr");
420 if (addr_iter != metadata->metadata.end()) {
421 const std::string &metadata_addr = addr_iter->second;
422 const auto &map_addr = osd_map.get_addr(osd_id);
423
424 if (metadata_addr != stringify(map_addr)) {
425 dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
426 << " != " << stringify(map_addr) << dendl;
427 update_meta = true;
428 } else {
429 dout(20) << "OSD[" << osd_id << "] addr unchanged: "
430 << metadata_addr << dendl;
431 }
432 } else {
433 // Awkward case where daemon went into DaemonState because it
434 // sent us a report but its metadata didn't get loaded yet
435 update_meta = true;
436 }
437 } else {
438 update_meta = true;
439 }
440
441 if (update_meta) {
442 daemon_state.notify_updating(k);
443 auto c = new MetadataUpdate(daemon_state, k);
444 std::ostringstream cmd;
445 cmd << "{\"prefix\": \"osd metadata\", \"id\": "
446 << osd_id << "}";
447 monc->start_mon_command(
448 {cmd.str()},
449 {}, &c->outbl, &c->outs, c);
450 }
451 }
452
453 cluster_state.notify_osdmap(osd_map);
454 });
455
456 // TODO: same culling for MonMap
457 daemon_state.cull("osd", names_exist);
458 }
459
460 void Mgr::handle_log(MLog *m)
461 {
462 for (const auto &e : m->entries) {
463 py_modules.notify_all(e);
464 }
465
466 m->put();
467 }
468
469 void Mgr::handle_service_map(MServiceMap *m)
470 {
471 dout(10) << "e" << m->service_map.epoch << dendl;
472 cluster_state.set_service_map(m->service_map);
473 server.got_service_map();
474 }
475
476 bool Mgr::ms_dispatch(Message *m)
477 {
478 dout(4) << *m << dendl;
479 Mutex::Locker l(lock);
480
481 switch (m->get_type()) {
482 case MSG_MGR_DIGEST:
483 handle_mgr_digest(static_cast<MMgrDigest*>(m));
484 break;
485 case CEPH_MSG_MON_MAP:
486 py_modules.notify_all("mon_map", "");
487 m->put();
488 break;
489 case CEPH_MSG_FS_MAP:
490 py_modules.notify_all("fs_map", "");
491 handle_fs_map((MFSMap*)m);
492 return false; // I shall let this pass through for Client
493 break;
494 case CEPH_MSG_OSD_MAP:
495 handle_osd_map();
496
497 py_modules.notify_all("osd_map", "");
498
499 // Continuous subscribe, so that we can generate notifications
500 // for our MgrPyModules
501 objecter->maybe_request_map();
502 m->put();
503 break;
504 case MSG_SERVICE_MAP:
505 handle_service_map((MServiceMap*)m);
506 py_modules.notify_all("service_map", "");
507 m->put();
508 break;
509 case MSG_LOG:
510 handle_log(static_cast<MLog *>(m));
511 break;
512
513 default:
514 return false;
515 }
516 return true;
517 }
518
519
// Handle an incoming FSMap: wake init() if it is blocked waiting for the
// first map, record the map in ClusterState, and refresh daemon metadata
// for any MDS whose address changed (i.e. the daemon restarted).  MDS
// entries that disappeared from the map are culled from daemon_state.
void Mgr::handle_fs_map(MFSMap* m)
{
  assert(lock.is_locked_by_me());

  std::set<std::string> names_exist;

  const FSMap &new_fsmap = m->get_fsmap();

  // Wake init(), which waits on fs_map_cond for the first FSMap
  fs_map_cond.Signal();

  // TODO: callers (e.g. from python land) are potentially going to see
  // the new fsmap before we've bothered populating all the resulting
  // daemon_state. Maybe we should block python land while we're making
  // this kind of update?

  cluster_state.set_fsmap(new_fsmap);

  auto mds_info = new_fsmap.get_mds_info();
  for (const auto &i : mds_info) {
    const auto &info = i.second;

    if (!new_fsmap.gid_exists(i.first)){
      continue;
    }

    // Remember which MDS exists so that we can cull any that don't
    names_exist.insert(info.name);

    // Skip if a metadata fetch for this MDS is already in flight
    const auto k = DaemonKey("mds", info.name);
    if (daemon_state.is_updating(k)) {
      continue;
    }

    // Refresh metadata if we have none (or no recorded address), or if
    // the address in the new map differs from the recorded one.
    bool update = false;
    if (daemon_state.exists(k)) {
      auto metadata = daemon_state.get(k);
      Mutex::Locker l(metadata->lock);
      if (metadata->metadata.empty() ||
          metadata->metadata.count("addr") == 0) {
        update = true;
      } else {
        auto metadata_addr = metadata->metadata.at("addr");
        const auto map_addr = info.addr;
        update = metadata_addr != stringify(map_addr);
        if (update) {
          dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
                  << " != " << stringify(map_addr) << dendl;
        }
      }
    } else {
      update = true;
    }

    if (update) {
      // Mark in flight (MetadataUpdate clears this) and fetch from the mon
      daemon_state.notify_updating(k);
      auto c = new MetadataUpdate(daemon_state, k);

      // Older MDS daemons don't have addr in the metadata, so
      // fake it if the returned metadata doesn't have the field.
      c->set_default("addr", stringify(info.addr));

      std::ostringstream cmd;
      cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
          << info.name << "\"}";
      monc->start_mon_command(
          {cmd.str()},
          {}, &c->outbl, &c->outs, c);
    }
  }
  daemon_state.cull("mds", names_exist);
}
591
592 bool Mgr::got_mgr_map(const MgrMap& m)
593 {
594 Mutex::Locker l(lock);
595 dout(10) << m << dendl;
596
597 set<string> old_modules;
598 cluster_state.with_mgrmap([&](const MgrMap& m) {
599 old_modules = m.modules;
600 });
601 if (m.modules != old_modules) {
602 derr << "mgrmap module list changed to (" << m.modules << "), respawn"
603 << dendl;
604 return true;
605 }
606
607 cluster_state.set_mgr_map(m);
608
609 return false;
610 }
611
612 void Mgr::handle_mgr_digest(MMgrDigest* m)
613 {
614 dout(10) << m->mon_status_json.length() << dendl;
615 dout(10) << m->health_json.length() << dendl;
616 cluster_state.load_digest(m);
617 py_modules.notify_all("mon_status", "");
618 py_modules.notify_all("health", "");
619
620 // Hack: use this as a tick/opportunity to prompt python-land that
621 // the pgmap might have changed since last time we were here.
622 py_modules.notify_all("pg_summary", "");
623 dout(10) << "done." << dendl;
624
625 m->put();
626
627 if (!digest_received) {
628 digest_received = true;
629 digest_cond.Signal();
630 }
631 }
632
633 void Mgr::tick()
634 {
635 dout(10) << dendl;
636 server.send_report();
637 }
638
639 std::vector<MonCommand> Mgr::get_command_set() const
640 {
641 Mutex::Locker l(lock);
642
643 std::vector<MonCommand> commands = mgr_commands;
644 std::vector<MonCommand> py_commands = py_modules.get_commands();
645 commands.insert(commands.end(), py_commands.begin(), py_commands.end());
646 return commands;
647 }
648