]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/Mgr.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / mgr / Mgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include <Python.h>
15
16 #include "osdc/Objecter.h"
17 #include "client/Client.h"
18 #include "common/errno.h"
19 #include "mon/MonClient.h"
20 #include "include/stringify.h"
21 #include "global/global_context.h"
22 #include "global/signal_handler.h"
23
24 #include "mgr/MgrContext.h"
25
26 #include "MgrPyModule.h"
27 #include "DaemonServer.h"
28 #include "messages/MMgrBeacon.h"
29 #include "messages/MMgrDigest.h"
30 #include "messages/MCommand.h"
31 #include "messages/MCommandReply.h"
32 #include "messages/MLog.h"
33
34 #include "Mgr.h"
35
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_mgr
38 #undef dout_prefix
39 #define dout_prefix *_dout << "mgr " << __func__ << " "
40
41
42 Mgr::Mgr(MonClient *monc_, Messenger *clientm_, Objecter *objecter_,
43 Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) :
44 monc(monc_),
45 objecter(objecter_),
46 client(client_),
47 client_messenger(clientm_),
48 lock("Mgr::lock"),
49 timer(g_ceph_context, lock),
50 finisher(g_ceph_context, "Mgr", "mgr-fin"),
51 py_modules(daemon_state, cluster_state, *monc, *objecter, *client,
52 finisher),
53 cluster_state(monc, nullptr),
54 server(monc, finisher, daemon_state, cluster_state, py_modules,
55 clog_, audit_clog_),
56 initialized(false),
57 initializing(false)
58 {
59 cluster_state.set_objecter(objecter);
60 }
61
62
63 Mgr::~Mgr()
64 {
65 }
66
67
68 /**
69 * Context for completion of metadata mon commands: take
70 * the result and stash it in DaemonStateIndex
71 */
72 class MetadataUpdate : public Context
73 {
74 DaemonStateIndex &daemon_state;
75 DaemonKey key;
76
77 std::map<std::string, std::string> defaults;
78
79 public:
80 bufferlist outbl;
81 std::string outs;
82
83 MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_)
84 : daemon_state(daemon_state_), key(key_) {}
85
86 void set_default(const std::string &k, const std::string &v)
87 {
88 defaults[k] = v;
89 }
90
91 void finish(int r) override
92 {
93 daemon_state.clear_updating(key);
94 if (r == 0) {
95 if (key.first == CEPH_ENTITY_TYPE_MDS) {
96 json_spirit::mValue json_result;
97 bool read_ok = json_spirit::read(
98 outbl.to_str(), json_result);
99 if (!read_ok) {
100 dout(1) << "mon returned invalid JSON for "
101 << ceph_entity_type_name(key.first)
102 << "." << key.second << dendl;
103 return;
104 }
105
106 json_spirit::mObject daemon_meta = json_result.get_obj();
107
108 // Apply any defaults
109 for (const auto &i : defaults) {
110 if (daemon_meta.find(i.first) == daemon_meta.end()) {
111 daemon_meta[i.first] = i.second;
112 }
113 }
114
115 DaemonStatePtr state;
116 if (daemon_state.exists(key)) {
117 state = daemon_state.get(key);
118 // TODO lock state
119 daemon_meta.erase("name");
120 daemon_meta.erase("hostname");
121 state->metadata.clear();
122 for (const auto &i : daemon_meta) {
123 state->metadata[i.first] = i.second.get_str();
124 }
125 } else {
126 state = std::make_shared<DaemonState>(daemon_state.types);
127 state->key = key;
128 state->hostname = daemon_meta.at("hostname").get_str();
129
130 for (const auto &i : daemon_meta) {
131 state->metadata[i.first] = i.second.get_str();
132 }
133
134 daemon_state.insert(state);
135 }
136 } else if (key.first == CEPH_ENTITY_TYPE_OSD) {
137 } else {
138 ceph_abort();
139 }
140 } else {
141 dout(1) << "mon failed to return metadata for "
142 << ceph_entity_type_name(key.first)
143 << "." << key.second << ": " << cpp_strerror(r) << dendl;
144 }
145 }
146 };
147
148
149 void Mgr::background_init()
150 {
151 Mutex::Locker l(lock);
152 assert(!initializing);
153 assert(!initialized);
154 initializing = true;
155
156 finisher.start();
157
158 finisher.queue(new FunctionContext([this](int r){
159 init();
160 }));
161 }
162
163 void Mgr::init()
164 {
165 Mutex::Locker l(lock);
166 assert(initializing);
167 assert(!initialized);
168
169 // Start communicating with daemons to learn statistics etc
170 int r = server.init(monc->get_global_id(), client_messenger->get_myaddr());
171 if (r < 0) {
172 derr << "Initialize server fail"<< dendl;
173 return;
174 }
175 dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
176
177 // Preload all daemon metadata (will subsequently keep this
178 // up to date by watching maps, so do the initial load before
179 // we subscribe to any maps)
180 dout(4) << "Loading daemon metadata..." << dendl;
181 load_all_metadata();
182
183 // subscribe to all the maps
184 monc->sub_want("log-info", 0, 0);
185 monc->sub_want("mgrdigest", 0, 0);
186 monc->sub_want("fsmap", 0, 0);
187
188 dout(4) << "waiting for OSDMap..." << dendl;
189 // Subscribe to OSDMap update to pass on to ClusterState
190 objecter->maybe_request_map();
191
192 // reset the mon session. we get these maps through subscriptions which
193 // are stateful with the connection, so even if *we* don't have them a
194 // previous incarnation sharing the same MonClient may have.
195 monc->reopen_session();
196
197 // Start Objecter and wait for OSD map
198 lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
199 objecter->wait_for_osd_map();
200 lock.Lock();
201
202 // Populate PGs in ClusterState
203 objecter->with_osdmap([this](const OSDMap &osd_map) {
204 cluster_state.notify_osdmap(osd_map);
205 });
206
207 // Wait for FSMap
208 dout(4) << "waiting for FSMap..." << dendl;
209 while (!cluster_state.have_fsmap()) {
210 fs_map_cond.Wait(lock);
211 }
212
213 dout(4) << "waiting for config-keys..." << dendl;
214
215 // Preload config keys (`get` for plugins is to be a fast local
216 // operation, we we don't have to synchronize these later because
217 // all sets will come via mgr)
218 load_config();
219
220 // Wait for MgrDigest...?
221 // TODO
222
223 // assume finisher already initialized in background_init
224
225 py_modules.init();
226 py_modules.start();
227
228 dout(4) << "Complete." << dendl;
229 initializing = false;
230 initialized = true;
231 }
232
233 void Mgr::load_all_metadata()
234 {
235 assert(lock.is_locked_by_me());
236
237 JSONCommand mds_cmd;
238 mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}");
239 JSONCommand osd_cmd;
240 osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}");
241 JSONCommand mon_cmd;
242 mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}");
243
244 lock.Unlock();
245 mds_cmd.wait();
246 osd_cmd.wait();
247 mon_cmd.wait();
248 lock.Lock();
249
250 assert(mds_cmd.r == 0);
251 assert(mon_cmd.r == 0);
252 assert(osd_cmd.r == 0);
253
254 for (auto &metadata_val : mds_cmd.json_result.get_array()) {
255 json_spirit::mObject daemon_meta = metadata_val.get_obj();
256 if (daemon_meta.count("hostname") == 0) {
257 dout(1) << "Skipping incomplete metadata entry" << dendl;
258 continue;
259 }
260
261 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
262 dm->key = DaemonKey(CEPH_ENTITY_TYPE_MDS,
263 daemon_meta.at("name").get_str());
264 dm->hostname = daemon_meta.at("hostname").get_str();
265
266 daemon_meta.erase("name");
267 daemon_meta.erase("hostname");
268
269 for (const auto &i : daemon_meta) {
270 dm->metadata[i.first] = i.second.get_str();
271 }
272
273 daemon_state.insert(dm);
274 }
275
276 for (auto &metadata_val : mon_cmd.json_result.get_array()) {
277 json_spirit::mObject daemon_meta = metadata_val.get_obj();
278 if (daemon_meta.count("hostname") == 0) {
279 dout(1) << "Skipping incomplete metadata entry" << dendl;
280 continue;
281 }
282
283 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
284 dm->key = DaemonKey(CEPH_ENTITY_TYPE_MON,
285 daemon_meta.at("name").get_str());
286 dm->hostname = daemon_meta.at("hostname").get_str();
287
288 daemon_meta.erase("name");
289 daemon_meta.erase("hostname");
290
291 for (const auto &i : daemon_meta) {
292 dm->metadata[i.first] = i.second.get_str();
293 }
294
295 daemon_state.insert(dm);
296 }
297
298 for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) {
299 json_spirit::mObject osd_metadata = osd_metadata_val.get_obj();
300 if (osd_metadata.count("hostname") == 0) {
301 dout(1) << "Skipping incomplete metadata entry" << dendl;
302 continue;
303 }
304 dout(4) << osd_metadata.at("hostname").get_str() << dendl;
305
306 DaemonStatePtr dm = std::make_shared<DaemonState>(daemon_state.types);
307 dm->key = DaemonKey(CEPH_ENTITY_TYPE_OSD,
308 stringify(osd_metadata.at("id").get_int()));
309 dm->hostname = osd_metadata.at("hostname").get_str();
310
311 osd_metadata.erase("id");
312 osd_metadata.erase("hostname");
313
314 for (const auto &i : osd_metadata) {
315 dm->metadata[i.first] = i.second.get_str();
316 }
317
318 daemon_state.insert(dm);
319 }
320 }
321
322 void Mgr::load_config()
323 {
324 assert(lock.is_locked_by_me());
325
326 dout(10) << "listing keys" << dendl;
327 JSONCommand cmd;
328 cmd.run(monc, "{\"prefix\": \"config-key list\"}");
329 lock.Unlock();
330 cmd.wait();
331 lock.Lock();
332 assert(cmd.r == 0);
333
334 std::map<std::string, std::string> loaded;
335
336 for (auto &key_str : cmd.json_result.get_array()) {
337 std::string const key = key_str.get_str();
338 dout(20) << "saw key '" << key << "'" << dendl;
339
340 const std::string config_prefix = PyModules::config_prefix;
341
342 if (key.substr(0, config_prefix.size()) == config_prefix) {
343 dout(20) << "fetching '" << key << "'" << dendl;
344 Command get_cmd;
345 std::ostringstream cmd_json;
346 cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}";
347 get_cmd.run(monc, cmd_json.str());
348 get_cmd.wait();
349 assert(get_cmd.r == 0);
350
351 loaded[key] = get_cmd.outbl.to_str();
352 }
353 }
354
355 py_modules.insert_config(loaded);
356 }
357
358 void Mgr::shutdown()
359 {
360 finisher.queue(new FunctionContext([&](int) {
361 {
362 Mutex::Locker l(lock);
363 monc->sub_unwant("log-info");
364 monc->sub_unwant("mgrdigest");
365 monc->sub_unwant("fsmap");
366 // First stop the server so that we're not taking any more incoming
367 // requests
368 server.shutdown();
369 }
370 // after the messenger is stopped, signal modules to shutdown via finisher
371 py_modules.shutdown();
372 }));
373
374 // Then stop the finisher to ensure its enqueued contexts aren't going
375 // to touch references to the things we're about to tear down
376 finisher.wait_for_empty();
377 finisher.stop();
378 }
379
380 void Mgr::handle_osd_map()
381 {
382 assert(lock.is_locked_by_me());
383
384 std::set<std::string> names_exist;
385
386 /**
387 * When we see a new OSD map, inspect the entity addrs to
388 * see if they have changed (service restart), and if so
389 * reload the metadata.
390 */
391 objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) {
392 for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) {
393 if (!osd_map.exists(osd_id)) {
394 continue;
395 }
396
397 // Remember which OSDs exist so that we can cull any that don't
398 names_exist.insert(stringify(osd_id));
399
400 // Consider whether to update the daemon metadata (new/restarted daemon)
401 bool update_meta = false;
402 const auto k = DaemonKey(CEPH_ENTITY_TYPE_OSD, stringify(osd_id));
403 if (daemon_state.is_updating(k)) {
404 continue;
405 }
406
407 if (daemon_state.exists(k)) {
408 auto metadata = daemon_state.get(k);
409 auto addr_iter = metadata->metadata.find("front_addr");
410 if (addr_iter != metadata->metadata.end()) {
411 const std::string &metadata_addr = addr_iter->second;
412 const auto &map_addr = osd_map.get_addr(osd_id);
413
414 if (metadata_addr != stringify(map_addr)) {
415 dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr
416 << " != " << stringify(map_addr) << dendl;
417 update_meta = true;
418 } else {
419 dout(20) << "OSD[" << osd_id << "] addr unchanged: "
420 << metadata_addr << dendl;
421 }
422 } else {
423 // Awkward case where daemon went into DaemonState because it
424 // sent us a report but its metadata didn't get loaded yet
425 update_meta = true;
426 }
427 } else {
428 update_meta = true;
429 }
430
431 if (update_meta) {
432 daemon_state.notify_updating(k);
433 auto c = new MetadataUpdate(daemon_state, k);
434 std::ostringstream cmd;
435 cmd << "{\"prefix\": \"osd metadata\", \"id\": "
436 << osd_id << "}";
437 monc->start_mon_command(
438 {cmd.str()},
439 {}, &c->outbl, &c->outs, c);
440 }
441 }
442
443 cluster_state.notify_osdmap(osd_map);
444 });
445
446 // TODO: same culling for MonMap
447 daemon_state.cull(CEPH_ENTITY_TYPE_OSD, names_exist);
448 }
449
450 void Mgr::handle_log(MLog *m)
451 {
452 for (const auto &e : m->entries) {
453 py_modules.notify_all(e);
454 }
455
456 m->put();
457 }
458
459 bool Mgr::ms_dispatch(Message *m)
460 {
461 dout(4) << *m << dendl;
462 Mutex::Locker l(lock);
463
464 switch (m->get_type()) {
465 case MSG_MGR_DIGEST:
466 handle_mgr_digest(static_cast<MMgrDigest*>(m));
467 break;
468 case CEPH_MSG_MON_MAP:
469 // FIXME: we probably never get called here because MonClient
470 // has consumed the message. For consuming OSDMap we need
471 // to be the tail dispatcher, but to see MonMap we would
472 // need to be at the head.
473 // Result is that ClusterState has access to monmap (it reads
474 // from monclient anyway), but we don't see notifications. Hook
475 // into MonClient to get notifications instead of messing
476 // with message delivery to achieve it?
477 ceph_abort();
478
479 py_modules.notify_all("mon_map", "");
480 m->put();
481 break;
482 case CEPH_MSG_FS_MAP:
483 py_modules.notify_all("fs_map", "");
484 handle_fs_map((MFSMap*)m);
485 return false; // I shall let this pass through for Client
486 break;
487 case CEPH_MSG_OSD_MAP:
488 handle_osd_map();
489
490 py_modules.notify_all("osd_map", "");
491
492 // Continuous subscribe, so that we can generate notifications
493 // for our MgrPyModules
494 objecter->maybe_request_map();
495 m->put();
496 break;
497 case MSG_LOG:
498 handle_log(static_cast<MLog *>(m));
499 break;
500
501 default:
502 return false;
503 }
504 return true;
505 }
506
507
508 void Mgr::handle_fs_map(MFSMap* m)
509 {
510 assert(lock.is_locked_by_me());
511
512 std::set<std::string> names_exist;
513
514 const FSMap &new_fsmap = m->get_fsmap();
515
516 fs_map_cond.Signal();
517
518 // TODO: callers (e.g. from python land) are potentially going to see
519 // the new fsmap before we've bothered populating all the resulting
520 // daemon_state. Maybe we should block python land while we're making
521 // this kind of update?
522
523 cluster_state.set_fsmap(new_fsmap);
524
525 auto mds_info = new_fsmap.get_mds_info();
526 for (const auto &i : mds_info) {
527 const auto &info = i.second;
528
529 if (!new_fsmap.gid_exists(i.first)){
530 continue;
531 }
532
533 // Remember which MDS exists so that we can cull any that don't
534 names_exist.insert(info.name);
535
536 const auto k = DaemonKey(CEPH_ENTITY_TYPE_MDS, info.name);
537 if (daemon_state.is_updating(k)) {
538 continue;
539 }
540
541 bool update = false;
542 if (daemon_state.exists(k)) {
543 auto metadata = daemon_state.get(k);
544 if (metadata->metadata.empty() ||
545 metadata->metadata.count("addr") == 0) {
546 update = true;
547 } else {
548 auto metadata_addr = metadata->metadata.at("addr");
549 const auto map_addr = info.addr;
550 update = metadata_addr != stringify(map_addr);
551 if (update) {
552 dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr
553 << " != " << stringify(map_addr) << dendl;
554 }
555 }
556 } else {
557 update = true;
558 }
559
560 if (update) {
561 daemon_state.notify_updating(k);
562 auto c = new MetadataUpdate(daemon_state, k);
563
564 // Older MDS daemons don't have addr in the metadata, so
565 // fake it if the returned metadata doesn't have the field.
566 c->set_default("addr", stringify(info.addr));
567
568 std::ostringstream cmd;
569 cmd << "{\"prefix\": \"mds metadata\", \"who\": \""
570 << info.name << "\"}";
571 monc->start_mon_command(
572 {cmd.str()},
573 {}, &c->outbl, &c->outs, c);
574 }
575 }
576 daemon_state.cull(CEPH_ENTITY_TYPE_MDS, names_exist);
577 }
578
579
580 void Mgr::handle_mgr_digest(MMgrDigest* m)
581 {
582 dout(10) << m->mon_status_json.length() << dendl;
583 dout(10) << m->health_json.length() << dendl;
584 cluster_state.load_digest(m);
585 py_modules.notify_all("mon_status", "");
586 py_modules.notify_all("health", "");
587
588 // Hack: use this as a tick/opportunity to prompt python-land that
589 // the pgmap might have changed since last time we were here.
590 py_modules.notify_all("pg_summary", "");
591 dout(10) << "done." << dendl;
592
593 m->put();
594 }
595