// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

// Include this first to get python headers earlier
#include "Gil.h"

#include "common/errno.h"
#include "include/stringify.h"

#include "PyFormatter.h"

#include "osd/OSDMap.h"
#include "mon/MonMap.h"

#include "mgr/MgrContext.h"

// For ::config_prefix
#include "PyModule.h"
#include "PyModuleRegistry.h"

#include "ActivePyModules.h"
#include "DaemonServer.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "

ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
          std::map<std::string, std::string> store_data,
          DaemonStateIndex &ds, ClusterState &cs,
          MonClient &mc, LogChannelRef clog_,
          LogChannelRef audit_clog_, Objecter &objecter_,
          Client &client_, Finisher &f, DaemonServer &server,
          PyModuleRegistry &pmr)
  : module_config(module_config_), daemon_state(ds), cluster_state(cs),
    monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
    client(client_), finisher(f),
    server(server), py_module_registry(pmr), lock("ActivePyModules")
{
  store_cache = std::move(store_data);
}

ActivePyModules::~ActivePyModules() = default;

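// dump_server() emits one host's record for the server listing calls
// below. Illustrative shape only (values are hypothetical):
//   { "hostname": "host-a",
//     "services": [ {"type": "osd", "id": "0"}, ... ],
//     "ceph_version": "ceph version 14.2.x (...)" }
// Note that ceph_version currently reflects the last matching daemon
// seen on the host; see the TODO inside about surfacing mixed versions.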
void ActivePyModules::dump_server(const std::string &hostname,
                                  const DaemonStateCollection &dmc,
                                  Formatter *f)
{
  f->dump_string("hostname", hostname);
  f->open_array_section("services");
  std::string ceph_version;

  for (const auto &i : dmc) {
    std::lock_guard l(i.second->lock);
    const auto &key = i.first;
    const std::string &str_type = key.first;
    const std::string &svc_name = key.second;

    // TODO: pick the highest version, and make sure that
    // somewhere else (during health reporting?) we are
    // indicating to the user if we see mixed versions
    auto ver_iter = i.second->metadata.find("ceph_version");
    if (ver_iter != i.second->metadata.end()) {
      ceph_version = i.second->metadata.at("ceph_version");
    }

    f->open_object_section("service");
    f->dump_string("type", str_type);
    f->dump_string("id", svc_name);
    f->close_section();
  }
  f->close_section();

  f->dump_string("ceph_version", ceph_version);
}


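// Accessors called from Python follow a common pattern: drop the GIL
// before taking any internal lock (so blocking on the lock cannot
// deadlock against a thread that holds the lock and needs the GIL), then
// re-take the GIL before touching the PyFormatter or returning to Python.
// A minimal sketch of the convention used throughout this file:
//
//   PyThreadState *tstate = PyEval_SaveThread();  // drop the GIL
//   std::lock_guard l(lock);                      // safe to block here
//   PyEval_RestoreThread(tstate);                 // re-take the GIL
//   // ... build and return Python objects via PyFormatter ...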
PyObject *ActivePyModules::get_server_python(const std::string &hostname)
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);
  dout(10) << " (" << hostname << ")" << dendl;

  auto dmc = daemon_state.get_by_server(hostname);

  PyFormatter f;
  dump_server(hostname, dmc, &f);
  return f.get();
}


PyObject *ActivePyModules::list_servers_python()
{
  PyFormatter f(false, true);
  PyThreadState *tstate = PyEval_SaveThread();
  dout(10) << " >" << dendl;

  daemon_state.with_daemons_by_server([this, &f, &tstate]
      (const std::map<std::string, DaemonStateCollection> &all) {
    PyEval_RestoreThread(tstate);

    for (const auto &i : all) {
      const auto &hostname = i.first;

      f.open_object_section("server");
      dump_server(hostname, i.second, &f);
      f.close_section();
    }
  });

  return f.get();
}

PyObject *ActivePyModules::get_metadata_python(
  const std::string &svc_type,
  const std::string &svc_id)
{
  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
  if (metadata == nullptr) {
    derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
    Py_RETURN_NONE;
  }

  std::lock_guard l(metadata->lock);
  PyFormatter f;
  f.dump_string("hostname", metadata->hostname);
  for (const auto &i : metadata->metadata) {
    f.dump_string(i.first.c_str(), i.second);
  }

  return f.get();
}

PyObject *ActivePyModules::get_daemon_status_python(
  const std::string &svc_type,
  const std::string &svc_id)
{
  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
  if (metadata == nullptr) {
    derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
    Py_RETURN_NONE;
  }

  std::lock_guard l(metadata->lock);
  PyFormatter f;
  for (const auto &i : metadata->service_status) {
    f.dump_string(i.first.c_str(), i.second);
  }
  return f.get();
}

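// get_python() backs the module-facing get() call: `what` selects which
// cluster map or stat structure to serialize. Every branch runs with the
// GIL dropped and is responsible for calling PyEval_RestoreThread(tstate)
// exactly once before it touches `f` or returns. A new branch would follow
// the same shape; "foo_map", FooMap and with_foomap below are hypothetical
// names, shown only as a sketch of the pattern:
//
//   } else if (what == "foo_map") {
//     cluster_state.with_foomap([&f, &tstate](const FooMap &foo_map) {
//       PyEval_RestoreThread(tstate);
//       foo_map.dump(&f);
//     });
//     return f.get();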
PyObject *ActivePyModules::get_python(const std::string &what)
{
  PyFormatter f;

  // Drop the GIL, as most of the following blocks will block on
  // a mutex -- they are all responsible for re-taking the GIL before
  // touching the PyFormatter instance or returning from the function.
  PyThreadState *tstate = PyEval_SaveThread();

  if (what == "fs_map") {
    cluster_state.with_fsmap([&f, &tstate](const FSMap &fsmap) {
      PyEval_RestoreThread(tstate);
      fsmap.dump(&f);
    });
    return f.get();
  } else if (what == "osdmap_crush_map_text") {
    bufferlist rdata;
    cluster_state.with_osdmap([&rdata, &tstate](const OSDMap &osd_map){
      PyEval_RestoreThread(tstate);
      osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
    });
    std::string crush_text = rdata.to_str();
    return PyString_FromString(crush_text.c_str());
  } else if (what.substr(0, 7) == "osd_map") {
    cluster_state.with_osdmap([&f, &what, &tstate](const OSDMap &osd_map){
      PyEval_RestoreThread(tstate);
      if (what == "osd_map") {
        osd_map.dump(&f);
      } else if (what == "osd_map_tree") {
        osd_map.print_tree(&f, nullptr);
      } else if (what == "osd_map_crush") {
        osd_map.crush->dump(&f);
      }
    });
    return f.get();
  } else if (what.substr(0, 6) == "config") {
    PyEval_RestoreThread(tstate);
    if (what == "config_options") {
      g_conf().config_options(&f);
    } else if (what == "config") {
      g_conf().show_config(&f);
    }
    return f.get();
  } else if (what == "mon_map") {
    cluster_state.with_monmap(
      [&f, &tstate](const MonMap &monmap) {
        PyEval_RestoreThread(tstate);
        monmap.dump(&f);
      }
    );
    return f.get();
  } else if (what == "service_map") {
    cluster_state.with_servicemap(
      [&f, &tstate](const ServiceMap &service_map) {
        PyEval_RestoreThread(tstate);
        service_map.dump(&f);
      }
    );
    return f.get();
  } else if (what == "osd_metadata") {
    auto dmc = daemon_state.get_by_service("osd");
    PyEval_RestoreThread(tstate);

    for (const auto &i : dmc) {
      std::lock_guard l(i.second->lock);
      f.open_object_section(i.first.second.c_str());
      f.dump_string("hostname", i.second->hostname);
      for (const auto &j : i.second->metadata) {
        f.dump_string(j.first.c_str(), j.second);
      }
      f.close_section();
    }
    return f.get();
  } else if (what == "pg_summary") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);

        std::map<std::string, std::map<std::string, uint32_t> > osds;
        std::map<std::string, std::map<std::string, uint32_t> > pools;
        std::map<std::string, uint32_t> all;
        for (const auto &i : pg_map.pg_stat) {
          const auto pool = i.first.m_pool;
          const std::string state = pg_state_string(i.second.state);
          // Insert to per-pool map
          pools[stringify(pool)][state]++;
          for (const auto &osd_id : i.second.acting) {
            osds[stringify(osd_id)][state]++;
          }
          all[state]++;
        }
        f.open_object_section("by_osd");
        for (const auto &i : osds) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
          }
          f.close_section();
        }
        f.close_section();
        f.open_object_section("by_pool");
        for (const auto &i : pools) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
          }
          f.close_section();
        }
        f.close_section();
        f.open_object_section("all");
        for (const auto &i : all) {
          f.dump_int(i.first.c_str(), i.second);
        }
        f.close_section();
        f.open_object_section("pg_stats_sum");
        pg_map.pg_sum.dump(&f);
        f.close_section();
      }
    );
    return f.get();
  } else if (what == "pg_status") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.print_summary(&f, nullptr);
      }
    );
    return f.get();
  } else if (what == "pg_dump") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump(&f);
      }
    );
    return f.get();
  } else if (what == "devices") {
    daemon_state.with_devices2(
      [&tstate, &f]() {
        PyEval_RestoreThread(tstate);
        f.open_array_section("devices");
      },
      [&f] (const DeviceState& dev) {
        f.dump_object("device", dev);
      });
    f.close_section();
    return f.get();
  } else if (what.size() > 7 &&
             what.substr(0, 7) == "device ") {
    string devid = what.substr(7);
    daemon_state.with_device(devid, [&f, &tstate] (const DeviceState& dev) {
      PyEval_RestoreThread(tstate);
      f.dump_object("device", dev);
    });
    return f.get();
  } else if (what == "io_rate") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump_delta(&f);
      }
    );
    return f.get();
  } else if (what == "df") {
    cluster_state.with_osdmap_and_pgmap(
        [this, &f, &tstate](
          const OSDMap& osd_map,
          const PGMap &pg_map) {
      PyEval_RestoreThread(tstate);
      pg_map.dump_cluster_stats(nullptr, &f, true);
      pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
    });
    return f.get();
  } else if (what == "osd_stats") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump_osd_stats(&f);
      });
    return f.get();
  } else if (what == "osd_pool_stats") {
    int64_t poolid = -ENOENT;
    string pool_name;
    cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap,
                                            const PGMap& pg_map) {
      PyEval_RestoreThread(tstate);
      f.open_array_section("pool_stats");
      for (auto &p : osdmap.get_pools()) {
        poolid = p.first;
        pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, &f, nullptr);
      }
      f.close_section();
    });
    return f.get();
  } else if (what == "health" || what == "mon_status") {
    bufferlist json;
    if (what == "health") {
      json = cluster_state.get_health();
    } else if (what == "mon_status") {
      json = cluster_state.get_mon_status();
    } else {
      ceph_abort();
    }

    PyEval_RestoreThread(tstate);
    f.dump_string("json", json.to_str());
    return f.get();
  } else if (what == "mgr_map") {
    cluster_state.with_mgrmap([&f, &tstate](const MgrMap &mgr_map) {
      PyEval_RestoreThread(tstate);
      mgr_map.dump(&f);
    });
    return f.get();
  } else {
    derr << "Python module requested unknown data '" << what << "'" << dendl;
    PyEval_RestoreThread(tstate);
    Py_RETURN_NONE;
  }
}

void ActivePyModules::start_one(PyModuleRef py_module)
{
  std::lock_guard l(lock);

  ceph_assert(modules.count(py_module->get_name()) == 0);

  const auto name = py_module->get_name();
  modules[name].reset(new ActivePyModule(py_module, clog));
  auto active_module = modules.at(name).get();

  // Send all python calls down a Finisher to avoid blocking
  // C++ code, and avoid any potential lock cycles.
  finisher.queue(new FunctionContext([this, active_module, name](int) {
    int r = active_module->load(this);
    if (r != 0) {
      derr << "Failed to run module in active mode ('" << name << "')"
           << dendl;
      std::lock_guard l(lock);
      modules.erase(name);
    } else {
      dout(4) << "Starting thread for " << name << dendl;
      active_module->thread.create(active_module->get_thread_name());
    }
  }));
}

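// shutdown() below drops and re-takes ActivePyModules::lock around each
// call into Python: a module's shutdown() or serve() implementation may
// call back into methods on this class (get_config, set_uri, ...), and
// holding the lock across those calls could produce a lock cycle.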
void ActivePyModules::shutdown()
{
  std::lock_guard locker(lock);

  // Signal modules to drop out of serve() and/or tear down resources
  for (auto &i : modules) {
    auto module = i.second.get();
    const auto& name = i.first;

    lock.Unlock();
    dout(10) << "calling module " << name << " shutdown()" << dendl;
    module->shutdown();
    dout(10) << "module " << name << " shutdown() returned" << dendl;
    lock.Lock();
  }

  // For modules implementing serve(), join the threads we were
  // running them in.
  for (auto &i : modules) {
    lock.Unlock();
    dout(10) << "joining module " << i.first << dendl;
    i.second->thread.join();
    dout(10) << "joined module " << i.first << dendl;
    lock.Lock();
  }

  modules.clear();
}

void ActivePyModules::notify_all(const std::string &notify_type,
                                 const std::string &notify_id)
{
  std::lock_guard l(lock);

  dout(10) << __func__ << ": notify_all " << notify_type << dendl;
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
      module->notify(notify_type, notify_id);
    }));
  }
}

void ActivePyModules::notify_all(const LogEntry &log_entry)
{
  std::lock_guard l(lock);

  dout(10) << __func__ << ": notify_all (clog)" << dendl;
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    //
    // Note intentional use of non-reference lambda binding on
    // log_entry: we take a copy because caller's instance is
    // probably ephemeral.
    finisher.queue(new FunctionContext([module, log_entry](int r){
      module->notify_clog(log_entry);
    }));
  }
}

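// Module KV-store accessors. Keys are namespaced per module as
// PyModule::config_prefix + "<module>/<key>"; with the usual "mgr/"
// prefix, a hypothetical key would look like "mgr/telemetry/interval".
// get_store() reads from store_cache, the snapshot of persisted
// config-key data handed to the constructor, so lookups do not require
// a round trip to the monitors.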
bool ActivePyModules::get_store(const std::string &module_name,
    const std::string &key, std::string *val) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  dout(4) << __func__ << " key: " << global_key << dendl;

  auto i = store_cache.find(global_key);
  if (i != store_cache.end()) {
    *val = i->second;
    return true;
  } else {
    return false;
  }
}

PyObject *ActivePyModules::dispatch_remote(
    const std::string &other_module,
    const std::string &method,
    PyObject *args,
    PyObject *kwargs,
    std::string *err)
{
  auto mod_iter = modules.find(other_module);
  ceph_assert(mod_iter != modules.end());

  return mod_iter->second->dispatch_remote(method, args, kwargs, err);
}

bool ActivePyModules::get_config(const std::string &module_name,
    const std::string &key, std::string *val) const
{
  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  dout(4) << __func__ << " key: " << global_key << dendl;

  std::lock_guard lock(module_config.lock);

  auto i = module_config.config.find(global_key);
  if (i != module_config.config.end()) {
    *val = i->second;
    return true;
  } else {
    return false;
  }
}

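// get_typed_config() resolves a module config option and converts the raw
// string to the option's declared type via the module itself. Lookup
// order, assuming the usual "mgr/" config_prefix and a hypothetical
// module "foo" with prefix "bar" and key "baz":
//   1. mgr/foo/bar/baz   (only when a prefix is supplied)
//   2. mgr/foo/baz
//   3. neither set -> Py_RETURN_NONE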
PyObject *ActivePyModules::get_typed_config(
  const std::string &module_name,
  const std::string &key,
  const std::string &prefix) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::string value;
  std::string final_key;
  bool found = false;
  if (prefix.size()) {
    final_key = prefix + "/" + key;
    found = get_config(module_name, final_key, &value);
  }
  if (!found) {
    final_key = key;
    found = get_config(module_name, final_key, &value);
  }
  if (found) {
    PyModuleRef module = py_module_registry.get_module(module_name);
    PyEval_RestoreThread(tstate);
    if (!module) {
      derr << "Module '" << module_name << "' is not available" << dendl;
      Py_RETURN_NONE;
    }
    dout(10) << __func__ << " " << final_key << " found: " << value << dendl;
    return module->get_typed_option_value(key, value);
  }
  PyEval_RestoreThread(tstate);
  if (prefix.size()) {
    dout(4) << __func__ << " [" << prefix << "/]" << key << " not found "
            << dendl;
  } else {
    dout(4) << __func__ << " " << key << " not found " << dendl;
  }
  Py_RETURN_NONE;
}

PyObject *ActivePyModules::get_store_prefix(const std::string &module_name,
    const std::string &prefix) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  std::lock_guard lock(module_config.lock);
  PyEval_RestoreThread(tstate);

  const std::string base_prefix = PyModule::config_prefix
    + module_name + "/";
  const std::string global_prefix = base_prefix + prefix;
  dout(4) << __func__ << " prefix: " << global_prefix << dendl;

  PyFormatter f;

  for (auto p = store_cache.lower_bound(global_prefix);
       p != store_cache.end() && p->first.find(global_prefix) == 0;
       ++p) {
    f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
  }
  return f.get();
}

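// set_store() updates the local store_cache immediately, then persists
// the change by sending a `config-key set` (or `config-key del`) command
// to the monitors. The payload built below is ordinary mon-command JSON;
// an illustrative "set" payload looks roughly like:
//   {"prefix": "config-key set", "key": "mgr/<module>/<key>", "val": "..."}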
void ActivePyModules::set_store(const std::string &module_name,
    const std::string &key, const boost::optional<std::string>& val)
{
  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  Command set_cmd;
  {
    PyThreadState *tstate = PyEval_SaveThread();
    std::lock_guard l(lock);
    PyEval_RestoreThread(tstate);

    if (val) {
      store_cache[global_key] = *val;
    } else {
      store_cache.erase(global_key);
    }

    std::ostringstream cmd_json;
    JSONFormatter jf;
    jf.open_object_section("cmd");
    if (val) {
      jf.dump_string("prefix", "config-key set");
      jf.dump_string("key", global_key);
      jf.dump_string("val", *val);
    } else {
      jf.dump_string("prefix", "config-key del");
      jf.dump_string("key", global_key);
    }
    jf.close_section();
    jf.flush(cmd_json);
    set_cmd.run(&monc, cmd_json.str());
  }
  set_cmd.wait();

  if (set_cmd.r != 0) {
    // config-key set will fail if mgr's auth key has insufficient
    // permission to set config keys
    // FIXME: should this somehow raise an exception back into Python land?
    dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
            << cpp_strerror(set_cmd.r) << dendl;
    dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
  }
}

void ActivePyModules::set_config(const std::string &module_name,
    const std::string &key, const boost::optional<std::string>& val)
{
  module_config.set_config(&monc, module_name, key, val);
}

std::map<std::string, std::string> ActivePyModules::get_services() const
{
  std::map<std::string, std::string> result;
  std::lock_guard l(lock);
  for (const auto& i : modules) {
    const auto &module = i.second.get();
    std::string svc_str = module->get_uri();
    if (!svc_str.empty()) {
      result[module->get_name()] = svc_str;
    }
  }

  return result;
}

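// with_perf_counters() is the shared plumbing for get_counter_python()
// and get_latest_counter_python() below: it handles the GIL/lock dance
// and the daemon/path lookup, then hands the matching PerfCounterInstance
// and PerfCounterType to the supplied callback to format the datapoints.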
PyObject* ActivePyModules::with_perf_counters(
    std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  PyFormatter f;
  f.open_array_section(path.c_str());

  auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
  if (metadata) {
    std::lock_guard l2(metadata->lock);
    if (metadata->perf_counters.instances.count(path)) {
      auto counter_instance = metadata->perf_counters.instances.at(path);
      auto counter_type = metadata->perf_counters.types.at(path);
      fct(counter_instance, counter_type, f);
    } else {
      dout(4) << "Missing counter: '" << path << "' ("
              << svc_name << "." << svc_id << ")" << dendl;
      dout(20) << "Paths are:" << dendl;
      for (const auto &i : metadata->perf_counters.instances) {
        dout(20) << i.first << dendl;
      }
    }
  } else {
    dout(4) << "No daemon state for "
            << svc_name << "." << svc_id << dendl;
  }
  f.close_section();
  return f.get();
}

PyObject* ActivePyModules::get_counter_python(
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path)
{
  auto extract_counters = [](
      PerfCounterInstance& counter_instance,
      PerfCounterType& counter_type,
      PyFormatter& f)
  {
    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
      const auto &avg_data = counter_instance.get_data_avg();
      for (const auto &datapoint : avg_data) {
        f.open_array_section("datapoint");
        f.dump_unsigned("t", datapoint.t.sec());
        f.dump_unsigned("s", datapoint.s);
        f.dump_unsigned("c", datapoint.c);
        f.close_section();
      }
    } else {
      const auto &data = counter_instance.get_data();
      for (const auto &datapoint : data) {
        f.open_array_section("datapoint");
        f.dump_unsigned("t", datapoint.t.sec());
        f.dump_unsigned("v", datapoint.v);
        f.close_section();
      }
    }
  };
  return with_perf_counters(extract_counters, svc_name, svc_id, path);
}

PyObject* ActivePyModules::get_latest_counter_python(
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path)
{
  auto extract_latest_counters = [](
      PerfCounterInstance& counter_instance,
      PerfCounterType& counter_type,
      PyFormatter& f)
  {
    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
      const auto &datapoint = counter_instance.get_latest_data_avg();
      f.dump_unsigned("t", datapoint.t.sec());
      f.dump_unsigned("s", datapoint.s);
      f.dump_unsigned("c", datapoint.c);
    } else {
      const auto &datapoint = counter_instance.get_latest_data();
      f.dump_unsigned("t", datapoint.t.sec());
      f.dump_unsigned("v", datapoint.v);
    }
  };
  return with_perf_counters(extract_latest_counters, svc_name, svc_id, path);
}

PyObject* ActivePyModules::get_perf_schema_python(
    const std::string &svc_type,
    const std::string &svc_id)
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  DaemonStateCollection daemons;

  if (svc_type == "") {
    daemons = daemon_state.get_all();
  } else if (svc_id.empty()) {
    daemons = daemon_state.get_by_service(svc_type);
  } else {
    auto key = DaemonKey(svc_type, svc_id);
    // so that the below can be a loop in all cases
    auto got = daemon_state.get(key);
    if (got != nullptr) {
      daemons[key] = got;
    }
  }

  PyFormatter f;
  if (!daemons.empty()) {
    for (auto statepair : daemons) {
      auto key = statepair.first;
      auto state = statepair.second;

      std::ostringstream daemon_name;
      daemon_name << key.first << "." << key.second;
      f.open_object_section(daemon_name.str().c_str());

      std::lock_guard l(state->lock);
      for (auto ctr_inst_iter : state->perf_counters.instances) {
        const auto &counter_name = ctr_inst_iter.first;
        f.open_object_section(counter_name.c_str());
        auto type = state->perf_counters.types[counter_name];
        f.dump_string("description", type.description);
        if (!type.nick.empty()) {
          f.dump_string("nick", type.nick);
        }
        f.dump_unsigned("type", type.type);
        f.dump_unsigned("priority", type.priority);
        f.dump_unsigned("units", type.unit);
        f.close_section();
      }
      f.close_section();
    }
  } else {
    dout(4) << __func__ << ": No daemon state found for "
            << svc_type << "." << svc_id << dendl;
  }
  return f.get();
}

PyObject *ActivePyModules::get_context()
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  // Construct a capsule containing ceph context.
  // Not incrementing/decrementing ref count on the context because
  // it's the global one and it has process lifetime.
  auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
  return capsule;
}

/**
 * Helper for our wrapped types that take a capsule in their constructor.
 */
PyObject *construct_with_capsule(
    const std::string &module_name,
    const std::string &clsname,
    void *wrapped)
{
  // Look up the OSDMap type which we will construct
  PyObject *module = PyImport_ImportModule(module_name.c_str());
  if (!module) {
    derr << "Failed to import python module:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(module);

  PyObject *wrapper_type = PyObject_GetAttrString(
      module, (const char*)clsname.c_str());
  if (!wrapper_type) {
    derr << "Failed to get python type:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(wrapper_type);

  // Construct a capsule containing an OSDMap.
  auto wrapped_capsule = PyCapsule_New(wrapped, nullptr, nullptr);
  ceph_assert(wrapped_capsule);

  // Construct the python OSDMap
  auto pArgs = PyTuple_Pack(1, wrapped_capsule);
  auto wrapper_instance = PyObject_CallObject(wrapper_type, pArgs);
  if (wrapper_instance == nullptr) {
    derr << "Failed to construct python OSDMap:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(wrapper_instance != nullptr);
  Py_DECREF(pArgs);
  Py_DECREF(wrapped_capsule);

  Py_DECREF(wrapper_type);
  Py_DECREF(module);

  return wrapper_instance;
}

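// get_osdmap() hands Python a snapshot: the OSDMap is deep-copied while
// the GIL is dropped, then wrapped via construct_with_capsule(). Ownership
// of the copy is assumed to pass to the Python-side OSDMap wrapper, which
// is expected to free it when the wrapper object is destroyed.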
PyObject *ActivePyModules::get_osdmap()
{
  OSDMap *newmap = new OSDMap;

  PyThreadState *tstate = PyEval_SaveThread();
  {
    std::lock_guard l(lock);
    cluster_state.with_osdmap([&](const OSDMap& o) {
      newmap->deepish_copy_from(o);
    });
  }
  PyEval_RestoreThread(tstate);

  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
}

void ActivePyModules::set_health_checks(const std::string& module_name,
                                        health_check_map_t&& checks)
{
  bool changed = false;

  lock.Lock();
  auto p = modules.find(module_name);
  if (p != modules.end()) {
    changed = p->second->set_health_checks(std::move(checks));
  }
  lock.Unlock();

  // Immediately schedule a report to be sent to the monitors with the new
  // health checks that have changed. This is done asynchronously to avoid
  // blocking python land. ActivePyModules::lock needs to be dropped to make
  // lockdep happy:
  //
  //   send_report callers: DaemonServer::lock -> PyModuleRegistry::lock
  //   active_start: PyModuleRegistry::lock -> ActivePyModules::lock
  //
  // If we don't release this->lock before calling schedule_tick, a cycle is
  // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
  // This is still correct as send_report is run asynchronously under
  // DaemonServer::lock.
  if (changed)
    server.schedule_tick(0);
}

int ActivePyModules::handle_command(
  std::string const &module_name,
  const cmdmap_t &cmdmap,
  const bufferlist &inbuf,
  std::stringstream *ds,
  std::stringstream *ss)
{
  lock.Lock();
  auto mod_iter = modules.find(module_name);
  if (mod_iter == modules.end()) {
    *ss << "Module '" << module_name << "' is not available";
    lock.Unlock();
    return -ENOENT;
  }

  lock.Unlock();
  return mod_iter->second->handle_command(cmdmap, inbuf, ds, ss);
}

void ActivePyModules::get_health_checks(health_check_map_t *checks)
{
  std::lock_guard l(lock);
  for (auto& p : modules) {
    p.second->get_health_checks(checks);
  }
}

void ActivePyModules::update_progress_event(
  const std::string& evid,
  const std::string& desc,
  float progress)
{
  std::lock_guard l(lock);
  auto& pe = progress_events[evid];
  pe.message = desc;
  pe.progress = progress;
}

void ActivePyModules::complete_progress_event(const std::string& evid)
{
  std::lock_guard l(lock);
  progress_events.erase(evid);
}

void ActivePyModules::clear_all_progress_events()
{
  std::lock_guard l(lock);
  progress_events.clear();
}

void ActivePyModules::get_progress_events(std::map<std::string,ProgressEvent> *events)
{
  std::lock_guard l(lock);
  *events = progress_events;
}

void ActivePyModules::config_notify()
{
  std::lock_guard l(lock);
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    finisher.queue(new FunctionContext([module](int r){
      module->config_notify();
    }));
  }
}

void ActivePyModules::set_uri(const std::string& module_name,
                              const std::string &uri)
{
  std::lock_guard l(lock);

  dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;

  modules[module_name]->set_uri(uri);
}

OSDPerfMetricQueryID ActivePyModules::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  return server.add_osd_perf_query(query, limit);
}

void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
{
  int r = server.remove_osd_perf_query(query_id);
  if (r < 0) {
    dout(0) << "remove_osd_perf_query for query_id=" << query_id << " failed: "
            << cpp_strerror(r) << dendl;
  }
}

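// get_osd_perf_counters() serializes the query results as nested arrays:
// "counters" holds one entry per OSDPerfMetricKey, "k" carries the key's
// sub-key strings, and "c" carries the two raw values of each
// PerformanceCounter pair in the order they are dumped below.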
PyObject *ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id)
{
  std::map<OSDPerfMetricKey, PerformanceCounters> counters;

  int r = server.get_osd_perf_counters(query_id, &counters);
  if (r < 0) {
    dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: "
            << cpp_strerror(r) << dendl;
    Py_RETURN_NONE;
  }

  PyFormatter f;

  f.open_array_section("counters");
  for (auto &it : counters) {
    auto &key = it.first;
    auto &instance_counters = it.second;
    f.open_object_section("i");
    f.open_array_section("k");
    for (auto &sub_key : key) {
      f.open_array_section("s");
      for (size_t i = 0; i < sub_key.size(); i++) {
        f.dump_string(stringify(i).c_str(), sub_key[i]);
      }
      f.close_section(); // s
    }
    f.close_section(); // k
    f.open_array_section("c");
    for (auto &c : instance_counters) {
      f.open_array_section("p");
      f.dump_unsigned("0", c.first);
      f.dump_unsigned("1", c.second);
      f.close_section(); // p
    }
    f.close_section(); // c
    f.close_section(); // i
  }
  f.close_section(); // counters

  return f.get();
}

void ActivePyModules::cluster_log(const std::string &channel, clog_type prio,
                                  const std::string &message)
{
  std::lock_guard l(lock);

  if (channel == "audit") {
    audit_clog->do_log(prio, message);
  } else {
    clog->do_log(prio, message);
  }
}