// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */

// Include this first to get python headers earlier
#include "Gil.h"

#include "common/errno.h"
#include "include/stringify.h"

#include "PyFormatter.h"

#include "osd/OSDMap.h"
#include "mon/MonMap.h"

#include "mgr/MgrContext.h"

// For ::config_prefix
#include "PyModule.h"
#include "PyModuleRegistry.h"

#include "ActivePyModules.h"
#include "DaemonServer.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "

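// Note on locking: most entry points below are called from Python with the
// GIL held.  They drop the GIL (PyEval_SaveThread) before taking
// ActivePyModules::lock or blocking on cluster state, and re-acquire it
// (PyEval_RestoreThread) before touching any Python objects again; see the
// comment in get_python() below.  This keeps other Python threads runnable
// while we wait on C++ mutexes.
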
ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
          std::map<std::string, std::string> store_data,
          DaemonStateIndex &ds, ClusterState &cs,
          MonClient &mc, LogChannelRef clog_,
          LogChannelRef audit_clog_, Objecter &objecter_,
          Client &client_, Finisher &f, DaemonServer &server,
          PyModuleRegistry &pmr)
  : module_config(module_config_), daemon_state(ds), cluster_state(cs),
    monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
    client(client_), finisher(f),
    cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
    server(server), py_module_registry(pmr), lock("ActivePyModules")
{
  store_cache = std::move(store_data);
  cmd_finisher.start();
}

ActivePyModules::~ActivePyModules() = default;

void ActivePyModules::dump_server(const std::string &hostname,
                                  const DaemonStateCollection &dmc,
                                  Formatter *f)
{
  f->dump_string("hostname", hostname);
  f->open_array_section("services");
  std::string ceph_version;

  for (const auto &i : dmc) {
    std::lock_guard l(i.second->lock);
    const auto &key = i.first;
    const std::string &str_type = key.first;
    const std::string &svc_name = key.second;

    // TODO: pick the highest version, and make sure that
    // somewhere else (during health reporting?) we are
    // indicating to the user if we see mixed versions
    auto ver_iter = i.second->metadata.find("ceph_version");
    if (ver_iter != i.second->metadata.end()) {
      ceph_version = i.second->metadata.at("ceph_version");
    }

    f->open_object_section("service");
    f->dump_string("type", str_type);
    f->dump_string("id", svc_name);
    f->close_section();
  }
  f->close_section();

  f->dump_string("ceph_version", ceph_version);
}



PyObject *ActivePyModules::get_server_python(const std::string &hostname)
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);
  dout(10) << " (" << hostname << ")" << dendl;

  auto dmc = daemon_state.get_by_server(hostname);

  PyFormatter f;
  dump_server(hostname, dmc, &f);
  return f.get();
}


PyObject *ActivePyModules::list_servers_python()
{
  PyFormatter f(false, true);
  PyThreadState *tstate = PyEval_SaveThread();
  dout(10) << " >" << dendl;

  daemon_state.with_daemons_by_server([this, &f, &tstate]
      (const std::map<std::string, DaemonStateCollection> &all) {
    PyEval_RestoreThread(tstate);

    for (const auto &i : all) {
      const auto &hostname = i.first;

      f.open_object_section("server");
      dump_server(hostname, i.second, &f);
      f.close_section();
    }
  });

  return f.get();
}

PyObject *ActivePyModules::get_metadata_python(
  const std::string &svc_type,
  const std::string &svc_id)
{
  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
  if (metadata == nullptr) {
    derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
    Py_RETURN_NONE;
  }

  std::lock_guard l(metadata->lock);
  PyFormatter f;
  f.dump_string("hostname", metadata->hostname);
  for (const auto &i : metadata->metadata) {
    f.dump_string(i.first.c_str(), i.second);
  }

  return f.get();
}

PyObject *ActivePyModules::get_daemon_status_python(
  const std::string &svc_type,
  const std::string &svc_id)
{
  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
  if (metadata == nullptr) {
    derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
    Py_RETURN_NONE;
  }

  std::lock_guard l(metadata->lock);
  PyFormatter f;
  for (const auto &i : metadata->service_status) {
    f.dump_string(i.first.c_str(), i.second);
  }
  return f.get();
}

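// Python modules reach get_python() below via MgrModule.get(); for example
// (illustrative, not exhaustive):
//   osd_map = self.get('osd_map')
//   df = self.get('df')
// Unknown names fall through to the final branch and return None.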
PyObject *ActivePyModules::get_python(const std::string &what)
{
  PyFormatter f;

  // Drop the GIL, as most of the following blocks will block on
  // a mutex -- they are all responsible for re-taking the GIL before
  // touching the PyFormatter instance or returning from the function.
  PyThreadState *tstate = PyEval_SaveThread();

  if (what == "fs_map") {
    cluster_state.with_fsmap([&f, &tstate](const FSMap &fsmap) {
      PyEval_RestoreThread(tstate);
      fsmap.dump(&f);
    });
    return f.get();
  } else if (what == "osdmap_crush_map_text") {
    bufferlist rdata;
    cluster_state.with_osdmap([&rdata, &tstate](const OSDMap &osd_map){
      PyEval_RestoreThread(tstate);
      osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
    });
    std::string crush_text = rdata.to_str();
    return PyString_FromString(crush_text.c_str());
  } else if (what.substr(0, 7) == "osd_map") {
    cluster_state.with_osdmap([&f, &what, &tstate](const OSDMap &osd_map){
      PyEval_RestoreThread(tstate);
      if (what == "osd_map") {
        osd_map.dump(&f);
      } else if (what == "osd_map_tree") {
        osd_map.print_tree(&f, nullptr);
      } else if (what == "osd_map_crush") {
        osd_map.crush->dump(&f);
      }
    });
    return f.get();
  } else if (what.substr(0, 6) == "config") {
    PyEval_RestoreThread(tstate);
    if (what == "config_options") {
      g_conf().config_options(&f);
    } else if (what == "config") {
      g_conf().show_config(&f);
    }
    return f.get();
  } else if (what == "mon_map") {
    cluster_state.with_monmap(
      [&f, &tstate](const MonMap &monmap) {
        PyEval_RestoreThread(tstate);
        monmap.dump(&f);
      }
    );
    return f.get();
  } else if (what == "service_map") {
    cluster_state.with_servicemap(
      [&f, &tstate](const ServiceMap &service_map) {
        PyEval_RestoreThread(tstate);
        service_map.dump(&f);
      }
    );
    return f.get();
  } else if (what == "osd_metadata") {
    auto dmc = daemon_state.get_by_service("osd");
    PyEval_RestoreThread(tstate);

    for (const auto &i : dmc) {
      std::lock_guard l(i.second->lock);
      f.open_object_section(i.first.second.c_str());
      f.dump_string("hostname", i.second->hostname);
      for (const auto &j : i.second->metadata) {
        f.dump_string(j.first.c_str(), j.second);
      }
      f.close_section();
    }
    return f.get();
  } else if (what == "pg_summary") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);

        std::map<std::string, std::map<std::string, uint32_t> > osds;
        std::map<std::string, std::map<std::string, uint32_t> > pools;
        std::map<std::string, uint32_t> all;
        for (const auto &i : pg_map.pg_stat) {
          const auto pool = i.first.m_pool;
          const std::string state = pg_state_string(i.second.state);
          // Insert to per-pool map
          pools[stringify(pool)][state]++;
          for (const auto &osd_id : i.second.acting) {
            osds[stringify(osd_id)][state]++;
          }
          all[state]++;
        }
        f.open_object_section("by_osd");
        for (const auto &i : osds) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
          }
          f.close_section();
        }
        f.close_section();
        f.open_object_section("by_pool");
        for (const auto &i : pools) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
          }
          f.close_section();
        }
        f.close_section();
        f.open_object_section("all");
        for (const auto &i : all) {
          f.dump_int(i.first.c_str(), i.second);
        }
        f.close_section();
        f.open_object_section("pg_stats_sum");
        pg_map.pg_sum.dump(&f);
        f.close_section();
      }
    );
    return f.get();
  } else if (what == "pg_status") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.print_summary(&f, nullptr);
      }
    );
    return f.get();
  } else if (what == "pg_dump") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump(&f);
      }
    );
    return f.get();
  } else if (what == "devices") {
    daemon_state.with_devices2(
      [&tstate, &f]() {
        PyEval_RestoreThread(tstate);
        f.open_array_section("devices");
      },
      [&f] (const DeviceState& dev) {
        f.dump_object("device", dev);
      });
    f.close_section();
    return f.get();
  } else if (what.size() > 7 &&
             what.substr(0, 7) == "device ") {
    string devid = what.substr(7);
    daemon_state.with_device(devid, [&f, &tstate] (const DeviceState& dev) {
      PyEval_RestoreThread(tstate);
      f.dump_object("device", dev);
    });
    return f.get();
  } else if (what == "io_rate") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump_delta(&f);
      }
    );
    return f.get();
  } else if (what == "df") {
    cluster_state.with_osdmap_and_pgmap(
      [this, &f, &tstate](
        const OSDMap& osd_map,
        const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump_cluster_stats(nullptr, &f, true);
        pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
      });
    return f.get();
  } else if (what == "osd_stats") {
    cluster_state.with_pgmap(
      [&f, &tstate](const PGMap &pg_map) {
        PyEval_RestoreThread(tstate);
        pg_map.dump_osd_stats(&f);
      });
    return f.get();
  } else if (what == "osd_pool_stats") {
    int64_t poolid = -ENOENT;
    string pool_name;
    cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap,
                                            const PGMap& pg_map) {
      PyEval_RestoreThread(tstate);
      f.open_array_section("pool_stats");
      for (auto &p : osdmap.get_pools()) {
        poolid = p.first;
        pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, &f, nullptr);
      }
      f.close_section();
    });
    return f.get();
  } else if (what == "health" || what == "mon_status") {
    bufferlist json;
    if (what == "health") {
      json = cluster_state.get_health();
    } else if (what == "mon_status") {
      json = cluster_state.get_mon_status();
    } else {
      ceph_abort();
    }

    PyEval_RestoreThread(tstate);
    f.dump_string("json", json.to_str());
    return f.get();
  } else if (what == "mgr_map") {
    cluster_state.with_mgrmap([&f, &tstate](const MgrMap &mgr_map) {
      PyEval_RestoreThread(tstate);
      mgr_map.dump(&f);
    });
    return f.get();
  } else {
    derr << "Python module requested unknown data '" << what << "'" << dendl;
    PyEval_RestoreThread(tstate);
    Py_RETURN_NONE;
  }
}

void ActivePyModules::start_one(PyModuleRef py_module)
{
  std::lock_guard l(lock);

  ceph_assert(modules.count(py_module->get_name()) == 0);

  const auto name = py_module->get_name();
  modules[name].reset(new ActivePyModule(py_module, clog));
  auto active_module = modules.at(name).get();

  // Send all python calls down a Finisher to avoid blocking
  // C++ code, and avoid any potential lock cycles.
  finisher.queue(new FunctionContext([this, active_module, name](int) {
    int r = active_module->load(this);
    if (r != 0) {
      derr << "Failed to run module in active mode ('" << name << "')"
           << dendl;
      std::lock_guard l(lock);
      modules.erase(name);
    } else {
      dout(4) << "Starting thread for " << name << dendl;
      active_module->thread.create(active_module->get_thread_name());
    }
  }));
}

void ActivePyModules::shutdown()
{
  std::lock_guard locker(lock);

  // Signal modules to drop out of serve() and/or tear down resources
  for (auto &i : modules) {
    auto module = i.second.get();
    const auto& name = i.first;
    lock.Unlock();
    dout(10) << "calling module " << name << " shutdown()" << dendl;
    module->shutdown();
    dout(10) << "module " << name << " shutdown() returned" << dendl;
    lock.Lock();
  }

  // For modules implementing serve(), join the threads we were
  // running them in.
  for (auto &i : modules) {
    lock.Unlock();
    dout(10) << "joining module " << i.first << dendl;
    i.second->thread.join();
    dout(10) << "joined module " << i.first << dendl;
    lock.Lock();
  }

  cmd_finisher.wait_for_empty();
  cmd_finisher.stop();

  modules.clear();
}

void ActivePyModules::notify_all(const std::string &notify_type,
                                 const std::string &notify_id)
{
  std::lock_guard l(lock);

  dout(10) << __func__ << ": notify_all " << notify_type << dendl;
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
      module->notify(notify_type, notify_id);
    }));
  }
}

void ActivePyModules::notify_all(const LogEntry &log_entry)
{
  std::lock_guard l(lock);

  dout(10) << __func__ << ": notify_all (clog)" << dendl;
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    //
    // Note intentional use of non-reference lambda binding on
    // log_entry: we take a copy because caller's instance is
    // probably ephemeral.
    finisher.queue(new FunctionContext([module, log_entry](int r){
      module->notify_clog(log_entry);
    }));
  }
}

bool ActivePyModules::get_store(const std::string &module_name,
    const std::string &key, std::string *val) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  dout(4) << __func__ << " key: " << global_key << dendl;

  auto i = store_cache.find(global_key);
  if (i != store_cache.end()) {
    *val = i->second;
    return true;
  } else {
    return false;
  }
}

PyObject *ActivePyModules::dispatch_remote(
  const std::string &other_module,
  const std::string &method,
  PyObject *args,
  PyObject *kwargs,
  std::string *err)
{
  auto mod_iter = modules.find(other_module);
  ceph_assert(mod_iter != modules.end());

  return mod_iter->second->dispatch_remote(method, args, kwargs, err);
}

bool ActivePyModules::get_config(const std::string &module_name,
    const std::string &key, std::string *val) const
{
  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  dout(4) << __func__ << " key: " << global_key << dendl;

  std::lock_guard lock(module_config.lock);

  auto i = module_config.config.find(global_key);
  if (i != module_config.config.end()) {
    *val = i->second;
    return true;
  } else {
    return false;
  }
}

PyObject *ActivePyModules::get_typed_config(
  const std::string &module_name,
  const std::string &key,
  const std::string &prefix) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::string value;
  std::string final_key;
  bool found = false;
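  // Try the prefixed form of the key first ("<prefix>/<key>"), then fall
  // back to the bare key if no prefixed value is set.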
  if (prefix.size()) {
    final_key = prefix + "/" + key;
    found = get_config(module_name, final_key, &value);
  }
  if (!found) {
    final_key = key;
    found = get_config(module_name, final_key, &value);
  }
  if (found) {
    PyModuleRef module = py_module_registry.get_module(module_name);
    PyEval_RestoreThread(tstate);
    if (!module) {
      derr << "Module '" << module_name << "' is not available" << dendl;
      Py_RETURN_NONE;
    }
    dout(10) << __func__ << " " << final_key << " found: " << value << dendl;
    return module->get_typed_option_value(key, value);
  }
  PyEval_RestoreThread(tstate);
  if (prefix.size()) {
    dout(4) << __func__ << " [" << prefix << "/]" << key << " not found "
            << dendl;
  } else {
    dout(4) << __func__ << " " << key << " not found " << dendl;
  }
  Py_RETURN_NONE;
}

PyObject *ActivePyModules::get_store_prefix(const std::string &module_name,
    const std::string &prefix) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  std::lock_guard lock(module_config.lock);
  PyEval_RestoreThread(tstate);

  const std::string base_prefix = PyModule::config_prefix
    + module_name + "/";
  const std::string global_prefix = base_prefix + prefix;
  dout(4) << __func__ << " prefix: " << global_prefix << dendl;

  PyFormatter f;

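  // Walk the cached keys sharing the requested prefix and strip the
  // "<config_prefix><module_name>/" part, so callers get keys relative to
  // their own namespace.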
  for (auto p = store_cache.lower_bound(global_prefix);
       p != store_cache.end() && p->first.find(global_prefix) == 0;
       ++p) {
    f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
  }
  return f.get();
}

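// Persists (or deletes) a module's key in the mon config-key store and keeps
// the local cache in sync.  From the Python side this is reached via
// MgrModule.set_store(); roughly (illustrative):
//   self.set_store('mykey', 'myvalue')   # set
//   self.set_store('mykey', None)        # delete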
void ActivePyModules::set_store(const std::string &module_name,
    const std::string &key, const boost::optional<std::string>& val)
{
  const std::string global_key = PyModule::config_prefix
    + module_name + "/" + key;

  Command set_cmd;
  {
    std::lock_guard l(lock);
    if (val) {
      store_cache[global_key] = *val;
    } else {
      store_cache.erase(global_key);
    }

    std::ostringstream cmd_json;
    JSONFormatter jf;
    jf.open_object_section("cmd");
    if (val) {
      jf.dump_string("prefix", "config-key set");
      jf.dump_string("key", global_key);
      jf.dump_string("val", *val);
    } else {
      jf.dump_string("prefix", "config-key del");
      jf.dump_string("key", global_key);
    }
    jf.close_section();
    jf.flush(cmd_json);
    set_cmd.run(&monc, cmd_json.str());
  }
  set_cmd.wait();

  if (set_cmd.r != 0) {
    // config-key set will fail if mgr's auth key has insufficient
    // permission to set config keys
    // FIXME: should this somehow raise an exception back into Python land?
    dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
            << cpp_strerror(set_cmd.r) << dendl;
    dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
  }
}

void ActivePyModules::set_config(const std::string &module_name,
    const std::string &key, const boost::optional<std::string>& val)
{
  module_config.set_config(&monc, module_name, key, val);
}

std::map<std::string, std::string> ActivePyModules::get_services() const
{
  std::map<std::string, std::string> result;
  std::lock_guard l(lock);
  for (const auto& i : modules) {
    const auto &module = i.second.get();
    std::string svc_str = module->get_uri();
    if (!svc_str.empty()) {
      result[module->get_name()] = svc_str;
    }
  }

  return result;
}

PyObject* ActivePyModules::with_perf_counters(
    std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path) const
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  PyFormatter f;
  f.open_array_section(path.c_str());

  auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
  if (metadata) {
    std::lock_guard l2(metadata->lock);
    if (metadata->perf_counters.instances.count(path)) {
      auto counter_instance = metadata->perf_counters.instances.at(path);
      auto counter_type = metadata->perf_counters.types.at(path);
      fct(counter_instance, counter_type, f);
    } else {
      dout(4) << "Missing counter: '" << path << "' ("
              << svc_name << "." << svc_id << ")" << dendl;
      dout(20) << "Paths are:" << dendl;
      for (const auto &i : metadata->perf_counters.instances) {
        dout(20) << i.first << dendl;
      }
    }
  } else {
    dout(4) << "No daemon state for "
            << svc_name << "." << svc_id << dendl;
  }
  f.close_section();
  return f.get();
}

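// Python callers reach this via MgrModule.get_counter(); for example
// (illustrative):
//   data = self.get_counter('osd', '0', 'osd.op_w')
// The result is keyed by the counter path and holds the datapoints gathered
// by with_perf_counters() above.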
PyObject* ActivePyModules::get_counter_python(
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path)
{
  auto extract_counters = [](
      PerfCounterInstance& counter_instance,
      PerfCounterType& counter_type,
      PyFormatter& f)
  {
    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
      const auto &avg_data = counter_instance.get_data_avg();
      for (const auto &datapoint : avg_data) {
        f.open_array_section("datapoint");
        f.dump_unsigned("t", datapoint.t.sec());
        f.dump_unsigned("s", datapoint.s);
        f.dump_unsigned("c", datapoint.c);
        f.close_section();
      }
    } else {
      const auto &data = counter_instance.get_data();
      for (const auto &datapoint : data) {
        f.open_array_section("datapoint");
        f.dump_unsigned("t", datapoint.t.sec());
        f.dump_unsigned("v", datapoint.v);
        f.close_section();
      }
    }
  };
  return with_perf_counters(extract_counters, svc_name, svc_id, path);
}

PyObject* ActivePyModules::get_latest_counter_python(
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path)
{
  auto extract_latest_counters = [](
      PerfCounterInstance& counter_instance,
      PerfCounterType& counter_type,
      PyFormatter& f)
  {
    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
      const auto &datapoint = counter_instance.get_latest_data_avg();
      f.dump_unsigned("t", datapoint.t.sec());
      f.dump_unsigned("s", datapoint.s);
      f.dump_unsigned("c", datapoint.c);
    } else {
      const auto &datapoint = counter_instance.get_latest_data();
      f.dump_unsigned("t", datapoint.t.sec());
      f.dump_unsigned("v", datapoint.v);
    }
  };
  return with_perf_counters(extract_latest_counters, svc_name, svc_id, path);
}

PyObject* ActivePyModules::get_perf_schema_python(
    const std::string &svc_type,
    const std::string &svc_id)
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  DaemonStateCollection daemons;

  if (svc_type == "") {
    daemons = daemon_state.get_all();
  } else if (svc_id.empty()) {
    daemons = daemon_state.get_by_service(svc_type);
  } else {
    auto key = DaemonKey(svc_type, svc_id);
    // so that the below can be a loop in all cases
    auto got = daemon_state.get(key);
    if (got != nullptr) {
      daemons[key] = got;
    }
  }

  PyFormatter f;
  if (!daemons.empty()) {
    for (auto statepair : daemons) {
      auto key = statepair.first;
      auto state = statepair.second;

      std::ostringstream daemon_name;
      daemon_name << key.first << "." << key.second;
      f.open_object_section(daemon_name.str().c_str());

      std::lock_guard l(state->lock);
      for (auto ctr_inst_iter : state->perf_counters.instances) {
        const auto &counter_name = ctr_inst_iter.first;
        f.open_object_section(counter_name.c_str());
        auto type = state->perf_counters.types[counter_name];
        f.dump_string("description", type.description);
        if (!type.nick.empty()) {
          f.dump_string("nick", type.nick);
        }
        f.dump_unsigned("type", type.type);
        f.dump_unsigned("priority", type.priority);
        f.dump_unsigned("units", type.unit);
        f.close_section();
      }
      f.close_section();
    }
  } else {
    dout(4) << __func__ << ": No daemon state found for "
            << svc_type << "." << svc_id << dendl;
  }
  return f.get();
}

PyObject *ActivePyModules::get_context()
{
  PyThreadState *tstate = PyEval_SaveThread();
  std::lock_guard l(lock);
  PyEval_RestoreThread(tstate);

  // Construct a capsule containing ceph context.
  // Not incrementing/decrementing ref count on the context because
  // it's the global one and it has process lifetime.
  auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
  return capsule;
}

/**
 * Helper for our wrapped types that take a capsule in their constructor.
 */
PyObject *construct_with_capsule(
    const std::string &module_name,
    const std::string &clsname,
    void *wrapped)
{
  // Look up the OSDMap type which we will construct
  PyObject *module = PyImport_ImportModule(module_name.c_str());
  if (!module) {
    derr << "Failed to import python module:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(module);

  PyObject *wrapper_type = PyObject_GetAttrString(
      module, (const char*)clsname.c_str());
  if (!wrapper_type) {
    derr << "Failed to get python type:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(wrapper_type);

  // Construct a capsule containing an OSDMap.
  auto wrapped_capsule = PyCapsule_New(wrapped, nullptr, nullptr);
  ceph_assert(wrapped_capsule);

  // Construct the python OSDMap
  auto pArgs = PyTuple_Pack(1, wrapped_capsule);
  auto wrapper_instance = PyObject_CallObject(wrapper_type, pArgs);
  if (wrapper_instance == nullptr) {
    derr << "Failed to construct python OSDMap:" << dendl;
    derr << handle_pyerror() << dendl;
  }
  ceph_assert(wrapper_instance != nullptr);
  Py_DECREF(pArgs);
  Py_DECREF(wrapped_capsule);

  Py_DECREF(wrapper_type);
  Py_DECREF(module);

  return wrapper_instance;
}

PyObject *ActivePyModules::get_osdmap()
{
  OSDMap *newmap = new OSDMap;

  PyThreadState *tstate = PyEval_SaveThread();
  {
    std::lock_guard l(lock);
    cluster_state.with_osdmap([&](const OSDMap& o) {
      newmap->deepish_copy_from(o);
    });
  }
  PyEval_RestoreThread(tstate);

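  // Hand the copy to the python-side wrapper (mgr_module.OSDMap); the
  // wrapper owns it from here on and is expected to free it, not us.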
  return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
}

void ActivePyModules::set_health_checks(const std::string& module_name,
    health_check_map_t&& checks)
{
  bool changed = false;

  lock.Lock();
  auto p = modules.find(module_name);
  if (p != modules.end()) {
    changed = p->second->set_health_checks(std::move(checks));
  }
  lock.Unlock();

  // Immediately schedule a report to be sent to the monitors with the
  // health checks that have changed. This is done asynchronously to avoid
  // blocking python land. ActivePyModules::lock needs to be dropped to make
  // lockdep happy:
  //
  //   send_report callers: DaemonServer::lock -> PyModuleRegistry::lock
  //   active_start: PyModuleRegistry::lock -> ActivePyModules::lock
  //
  // If we don't release this->lock before calling schedule_tick, a cycle is
  // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
  // This is still correct as send_report is run asynchronously under
  // DaemonServer::lock.
  if (changed)
    server.schedule_tick(0);
}

int ActivePyModules::handle_command(
  std::string const &module_name,
  const cmdmap_t &cmdmap,
  const bufferlist &inbuf,
  std::stringstream *ds,
  std::stringstream *ss)
{
  lock.Lock();
  auto mod_iter = modules.find(module_name);
  if (mod_iter == modules.end()) {
    *ss << "Module '" << module_name << "' is not available";
    lock.Unlock();
    return -ENOENT;
  }

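  // The lock only guards the modules map lookup; drop it before dispatching
  // so a long-running module command handler is not serialized behind it.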
  lock.Unlock();
  return mod_iter->second->handle_command(cmdmap, inbuf, ds, ss);
}

void ActivePyModules::get_health_checks(health_check_map_t *checks)
{
  std::lock_guard l(lock);
  for (auto& p : modules) {
    p.second->get_health_checks(checks);
  }
}

void ActivePyModules::update_progress_event(
  const std::string& evid,
  const std::string& desc,
  float progress)
{
  std::lock_guard l(lock);
  auto& pe = progress_events[evid];
  pe.message = desc;
  pe.progress = progress;
}

void ActivePyModules::complete_progress_event(const std::string& evid)
{
  std::lock_guard l(lock);
  progress_events.erase(evid);
}

void ActivePyModules::clear_all_progress_events()
{
  std::lock_guard l(lock);
  progress_events.clear();
}

void ActivePyModules::get_progress_events(std::map<std::string,ProgressEvent> *events)
{
  std::lock_guard l(lock);
  *events = progress_events;
}

void ActivePyModules::config_notify()
{
  std::lock_guard l(lock);
  for (auto& i : modules) {
    auto module = i.second.get();
    // Send all python calls down a Finisher to avoid blocking
    // C++ code, and avoid any potential lock cycles.
    finisher.queue(new FunctionContext([module](int r){
      module->config_notify();
    }));
  }
}

void ActivePyModules::set_uri(const std::string& module_name,
    const std::string &uri)
{
  std::lock_guard l(lock);

  dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;

  modules[module_name]->set_uri(uri);
}

OSDPerfMetricQueryID ActivePyModules::add_osd_perf_query(
    const OSDPerfMetricQuery &query,
    const std::optional<OSDPerfMetricLimit> &limit)
{
  return server.add_osd_perf_query(query, limit);
}

void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id)
{
  int r = server.remove_osd_perf_query(query_id);
  if (r < 0) {
    dout(0) << "remove_osd_perf_query for query_id=" << query_id << " failed: "
            << cpp_strerror(r) << dendl;
  }
}

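// Returns the collected results for an OSD perf metric query in a compact
// form: one entry per OSDPerfMetricKey, with the key encoded as a list of
// sub-key string lists ("k"/"s") and the counter values as pairs ("c"/"p"),
// matching the section names used below.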
PyObject *ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id)
{
  std::map<OSDPerfMetricKey, PerformanceCounters> counters;

  int r = server.get_osd_perf_counters(query_id, &counters);
  if (r < 0) {
    dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: "
            << cpp_strerror(r) << dendl;
    Py_RETURN_NONE;
  }

  PyFormatter f;

  f.open_array_section("counters");
  for (auto &it : counters) {
    auto &key = it.first;
    auto &instance_counters = it.second;
    f.open_object_section("i");
    f.open_array_section("k");
    for (auto &sub_key : key) {
      f.open_array_section("s");
      for (size_t i = 0; i < sub_key.size(); i++) {
        f.dump_string(stringify(i).c_str(), sub_key[i]);
      }
      f.close_section(); // s
    }
    f.close_section(); // k
    f.open_array_section("c");
    for (auto &c : instance_counters) {
      f.open_array_section("p");
      f.dump_unsigned("0", c.first);
      f.dump_unsigned("1", c.second);
      f.close_section(); // p
    }
    f.close_section(); // c
    f.close_section(); // i
  }
  f.close_section(); // counters

  return f.get();
}

void ActivePyModules::cluster_log(const std::string &channel, clog_type prio,
                                  const std::string &message)
{
  std::lock_guard l(lock);

  if (channel == "audit") {
    audit_clog->do_log(prio, message);
  } else {
    clog->do_log(prio, message);
  }
}