1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
15 #include "BaseMgrModule.h"
18 #include "common/errno.h"
19 #include "include/stringify.h"
21 #include "PyFormatter.h"
23 #include "osd/OSDMap.h"
24 #include "mon/MonMap.h"
26 #include "mgr/MgrContext.h"
28 // For PyModuleRegistry::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
36 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Constructs the registry of running (active) Python mgr modules.
// Each argument initialises the member of the same role in the
// initializer list below; `lock` is a Mutex named "ActivePyModules".
// NOTE(review): whether the reference arguments are kept by reference or
// copied depends on the member declarations in ActivePyModules.h --
// confirm there before relying on their lifetime.
39 ActivePyModules::ActivePyModules(PyModuleConfig
const &config_
,
40 DaemonStateIndex
&ds
, ClusterState
&cs
,
41 MonClient
&mc
, LogChannelRef clog_
, Objecter
&objecter_
,
42 Client
&client_
, Finisher
&f
)
// config_ seeds config_cache, which get_config(), get_config_prefix()
// and set_config() later read and update.
43 : config_cache(config_
), daemon_state(ds
), cluster_state(cs
),
44 monc(mc
), clog(clog_
), objecter(objecter_
), client(client_
), finisher(f
),
45 lock("ActivePyModules")
// Defaulted destructor: no custom teardown here. Stopping the modules and
// joining their threads is done explicitly by shutdown().
48 ActivePyModules::~ActivePyModules() = default;
// Dumps one host and the daemons on it into the formatter `f`: a
// "hostname" string, then a "services" array holding one "service" object
// per entry of `dmc` with its "type", "id" and "ceph_version".
// `f` is used through `->`, so it is a pointer (callers pass `&f`); its
// parameter line is not visible here.
// Each daemon's own `lock` is held while that daemon's fields are read.
50 void ActivePyModules::dump_server(const std::string
&hostname
,
51 const DaemonStateCollection
&dmc
,
54 f
->dump_string("hostname", hostname
);
55 f
->open_array_section("services");
// NOTE(review): declared outside the loop, so unless it is reset in lines
// not shown, a daemon without "ceph_version" metadata reports the value
// left over from the previous daemon (or "" for the first one).
56 std::string ceph_version
;
58 for (const auto &i
: dmc
) {
59 Mutex::Locker
l(i
.second
->lock
);
// The map key is a (type, name) pair.
60 const auto &key
= i
.first
;
61 const std::string
&str_type
= key
.first
;
62 const std::string
&svc_name
= key
.second
;
64 // TODO: pick the highest version, and make sure that
65 // somewhere else (during health reporting?) we are
66 // indicating to the user if we see mixed versions
// find() followed by at() searches the map twice; ver_iter->second would
// reuse the first lookup.
67 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
68 if (ver_iter
!= i
.second
->metadata
.end()) {
69 ceph_version
= i
.second
->metadata
.at("ceph_version");
72 f
->open_object_section("service");
73 f
->dump_string("type", str_type
);
74 f
->dump_string("id", svc_name
);
79 f
->dump_string("ceph_version", ceph_version
);
// Builds a Python object describing host `hostname` and the daemons
// daemon_state knows on it, filled in by dump_server() through a
// formatter `f`. `f` is declared in a line not shown here; presumably it
// is a PyFormatter, as in list_servers_python().
84 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
// Release the GIL only while blocking on `lock`, then re-take it.
// Presumably this stops a thread that holds `lock` and wants the GIL from
// deadlocking against this one.
86 PyThreadState
*tstate
= PyEval_SaveThread();
87 Mutex::Locker
l(lock
);
88 PyEval_RestoreThread(tstate
);
89 dout(10) << " (" << hostname
<< ")" << dendl
;
91 auto dmc
= daemon_state
.get_by_server(hostname
);
94 dump_server(hostname
, dmc
, &f
);
// Builds a Python object with one "server" section per host reported by
// daemon_state.with_daemons_by_server(), each filled in by dump_server().
99 PyObject
*ActivePyModules::list_servers_python()
// The GIL is released only while waiting for `lock`, then re-acquired.
101 PyThreadState
*tstate
= PyEval_SaveThread();
102 Mutex::Locker
l(lock
);
103 PyEval_RestoreThread(tstate
);
104 dout(10) << " >" << dendl
;
// NOTE(review): the meaning of the (false, true) flags is defined in
// PyFormatter.h -- presumably they make the top level a list; confirm.
106 PyFormatter
f(false, true);
107 daemon_state
.with_daemons_by_server([this, &f
]
108 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
109 for (const auto &i
: all
) {
110 const auto &hostname
= i
.first
;
112 f
.open_object_section("server");
113 dump_server(hostname
, i
.second
, &f
);
// Builds a Python object with the metadata of daemon `svc_type`.`svc_id`:
// its "hostname" plus every key/value pair of the daemon's `metadata` map.
// If daemon_state has no such daemon, the miss is logged with derr. What
// is returned in that case is in lines not shown here.
121 PyObject
*ActivePyModules::get_metadata_python(
122 const std::string
&svc_type
,
123 const std::string
&svc_id
)
125 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
126 if (metadata
== nullptr) {
127 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
// NOTE(review): unlike get_python() and similar methods, the visible code
// takes this per-daemon lock without first releasing the GIL. Confirm
// that no holder of metadata->lock ever waits for the GIL.
131 Mutex::Locker
l(metadata
->lock
);
133 f
.dump_string("hostname", metadata
->hostname
);
134 for (const auto &i
: metadata
->metadata
) {
135 f
.dump_string(i
.first
.c_str(), i
.second
);
// Builds a Python object with every key/value pair of the
// `service_status` map of daemon `svc_type`.`svc_id`.
// If daemon_state has no such daemon, the miss is logged with derr. What
// is returned in that case is in lines not shown here.
141 PyObject
*ActivePyModules::get_daemon_status_python(
142 const std::string
&svc_type
,
143 const std::string
&svc_id
)
145 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
146 if (metadata
== nullptr) {
147 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
// NOTE(review): the visible code takes this per-daemon lock without first
// releasing the GIL. Confirm that no holder of metadata->lock ever waits
// for the GIL.
151 Mutex::Locker
l(metadata
->lock
);
153 for (const auto &i
: metadata
->service_status
) {
154 f
.dump_string(i
.first
.c_str(), i
.second
);
// Returns a Python object for the cluster data set named by `what`.
// Names handled in the visible code: "fs_map", "osdmap_crush_map_text",
// "osd_map", "osd_map_tree", "osd_map_crush", "config_options", "config",
// "mon_map", "service_map", "osd_metadata", "pg_summary", "pg_status",
// "pg_dump", "df", "osd_stats", "health", "mon_status" and "mgr_map".
// Any other name is logged with derr as unknown.
// Most branches fill a formatter `f` (declared in lines not shown).
// "osdmap_crush_map_text" returns a Python string directly.
159 PyObject
*ActivePyModules::get_python(const std::string
&what
)
// Release the GIL only while blocking on `lock`, then re-take it.
161 PyThreadState
*tstate
= PyEval_SaveThread();
162 Mutex::Locker
l(lock
);
163 PyEval_RestoreThread(tstate
);
165 if (what
== "fs_map") {
167 cluster_state
.with_fsmap([&f
](const FSMap
&fsmap
) {
171 } else if (what
== "osdmap_crush_map_text") {
173 cluster_state
.with_osdmap([&rdata
](const OSDMap
&osd_map
){
174 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
// NOTE(review): encode() produces the binary CRUSH encoding, despite the
// "_text" in the key name. PyString_FromString() stops at the first NUL
// byte, and a binary encoding of integer fields normally contains NULs,
// so Python may receive a truncated or even empty string.
// PyString_FromStringAndSize(crush_text.data(), crush_text.size()) would
// keep the whole payload. Confirm what callers expect.
176 std::string crush_text
= rdata
.to_str();
177 return PyString_FromString(crush_text
.c_str());
178 } else if (what
.substr(0, 7) == "osd_map") {
// Prefix match; the exact "osd_map*" names are dispatched inside.
180 cluster_state
.with_osdmap([&f
, &what
](const OSDMap
&osd_map
){
181 if (what
== "osd_map") {
183 } else if (what
== "osd_map_tree") {
184 osd_map
.print_tree(&f
, nullptr);
185 } else if (what
== "osd_map_crush") {
186 osd_map
.crush
->dump(&f
);
190 } else if (what
.substr(0, 6) == "config") {
192 if (what
== "config_options") {
193 g_conf
->config_options(&f
);
194 } else if (what
== "config") {
195 g_conf
->show_config(&f
);
198 } else if (what
== "mon_map") {
200 cluster_state
.with_monmap(
201 [&f
](const MonMap
&monmap
) {
206 } else if (what
== "service_map") {
208 cluster_state
.with_servicemap(
209 [&f
](const ServiceMap
&service_map
) {
210 service_map
.dump(&f
);
214 } else if (what
== "osd_metadata") {
// One section per OSD id holding its hostname and metadata key/values;
// each daemon's lock is held while that daemon is read.
216 auto dmc
= daemon_state
.get_by_service("osd");
217 for (const auto &i
: dmc
) {
218 Mutex::Locker
l(i
.second
->lock
);
219 f
.open_object_section(i
.first
.second
.c_str());
220 f
.dump_string("hostname", i
.second
->hostname
);
221 for (const auto &j
: i
.second
->metadata
) {
222 f
.dump_string(j
.first
.c_str(), j
.second
);
227 } else if (what
== "pg_summary") {
// Emits "by_osd", "by_pool", "all" and "pg_stats_sum" sections. In the
// visible loop, `pools` counts PGs per pool and PG-state string, and
// `osds` counts them per OSD in each PG's acting set, so a PG is counted
// once for every acting OSD. `all` is filled in lines not shown.
229 cluster_state
.with_pgmap(
230 [&f
](const PGMap
&pg_map
) {
231 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
232 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
233 std::map
<std::string
, uint32_t> all
;
234 for (const auto &i
: pg_map
.pg_stat
) {
235 const auto pool
= i
.first
.m_pool
;
236 const std::string state
= pg_state_string(i
.second
.state
);
237 // Insert to per-pool map
238 pools
[stringify(pool
)][state
]++;
239 for (const auto &osd_id
: i
.second
.acting
) {
240 osds
[stringify(osd_id
)][state
]++;
244 f
.open_object_section("by_osd");
245 for (const auto &i
: osds
) {
246 f
.open_object_section(i
.first
.c_str());
247 for (const auto &j
: i
.second
) {
248 f
.dump_int(j
.first
.c_str(), j
.second
);
253 f
.open_object_section("by_pool");
254 for (const auto &i
: pools
) {
255 f
.open_object_section(i
.first
.c_str());
256 for (const auto &j
: i
.second
) {
257 f
.dump_int(j
.first
.c_str(), j
.second
);
262 f
.open_object_section("all");
263 for (const auto &i
: all
) {
264 f
.dump_int(i
.first
.c_str(), i
.second
);
267 f
.open_object_section("pg_stats_sum");
268 pg_map
.pg_sum
.dump(&f
);
273 } else if (what
== "pg_status") {
275 cluster_state
.with_pgmap(
276 [&f
](const PGMap
&pg_map
) {
277 pg_map
.print_summary(&f
, nullptr);
281 } else if (what
== "pg_dump") {
283 cluster_state
.with_pgmap(
284 [&f
](const PGMap
&pg_map
) {
289 } else if (what
== "df") {
// Needs both the PGMap and the OSDMap, so the OSDMap accessor is nested
// inside the PGMap one.
292 cluster_state
.with_pgmap([this, &f
](const PGMap
&pg_map
) {
293 cluster_state
.with_osdmap(
294 [&pg_map
, &f
](const OSDMap
&osd_map
) {
295 pg_map
.dump_fs_stats(nullptr, &f
, true);
296 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
300 } else if (what
== "osd_stats") {
302 cluster_state
.with_pgmap(
303 [&f
](const PGMap
&pg_map
) {
304 pg_map
.dump_osd_stats(&f
);
307 } else if (what
== "health" || what
== "mon_status") {
// Both are fetched from ClusterState into `json` (declared in a line not
// shown) and passed through as a single "json" string field; presumably
// the payload is already JSON text.
310 if (what
== "health") {
311 json
= cluster_state
.get_health();
312 } else if (what
== "mon_status") {
313 json
= cluster_state
.get_mon_status();
317 f
.dump_string("json", json
.to_str());
319 } else if (what
== "mgr_map") {
321 cluster_state
.with_mgrmap([&f
](const MgrMap
&mgr_map
) {
326 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
// Registers, loads and starts the Python module `module_name`.
// Under `lock`, it asserts the module is not already registered,
// constructs an ActivePyModule for it (pMyThreadState and clog are among
// the constructor arguments; the rest are on a line not shown), calls
// load(this) and then starts the module's thread. The int result is
// returned in lines not shown; presumably it reflects `r` from load().
331 int ActivePyModules::start_one(std::string
const &module_name
,
332 PyObject
*pClass
, const SafeThreadState
&pMyThreadState
)
334 Mutex::Locker
l(lock
);
336 assert(modules
.count(module_name
) == 0);
338 modules
[module_name
].reset(new ActivePyModule(
340 pMyThreadState
, clog
));
342 int r
= modules
[module_name
]->load(this);
346 dout(4) << "Starting thread for " << module_name
<< dendl
;
347 // Giving Thread the module's module_name member as its
348 // char* thread name: thread must not outlive module class lifetime.
349 modules
[module_name
]->thread
.create(
350 modules
[module_name
]->get_name().c_str());
// Stops all active modules in two passes. The first pass (per its log
// messages) calls each module's shutdown(); the second joins each
// module's thread. Every module is therefore signalled before any join
// can block.
// NOTE(review): in the visible lines `locker` holds `lock` for the whole
// function. If a module thread needs `lock` to leave serve(), join() would
// deadlock unless the lock is dropped in lines not shown -- confirm.
356 void ActivePyModules::shutdown()
358 Mutex::Locker
locker(lock
);
360 // Signal modules to drop out of serve() and/or tear down resources
361 for (auto &i
: modules
) {
362 auto module
= i
.second
.get();
363 const auto& name
= i
.first
;
366 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
368 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
372 // For modules implementing serve(), join the threads that were
373 // running it.
374 for (auto &i
: modules
) {
376 dout(10) << "joining module " << i
.first
<< dendl
;
377 i
.second
->thread
.join();
378 dout(10) << "joined module " << i
.first
<< dendl
;
// Delivers a (notify_type, notify_id) event to every active module.
// The Python calls are not made here: each is queued on `finisher`, and
// the callback captures the module pointer and copies of both strings.
// NOTE(review): the parameter declarators below read "¬ify_type" and
// "¬ify_id". This looks like "&notify_type" / "&notify_id" mangled by
// HTML-entity decoding ("&not" -> "¬"), and it will not compile as-is.
385 void ActivePyModules::notify_all(const std::string
¬ify_type
,
386 const std::string
¬ify_id
)
388 Mutex::Locker
l(lock
);
390 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
391 for (auto& i
: modules
) {
392 auto module
= i
.second
.get();
393 // Send all python calls down a Finisher to avoid blocking
394 // C++ code, and avoid any potential lock cycles.
// NOTE(review): `module` is a raw pointer, so the queued callback relies
// on the ActivePyModule outliving it.
395 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
396 module
->notify(notify_type
, notify_id
);
// Delivers a cluster-log entry to every active module. Each module's
// notify_clog() call is queued on `finisher` with its own copy of
// `log_entry` rather than being called directly.
401 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
403 Mutex::Locker
l(lock
);
405 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
406 for (auto& i
: modules
) {
407 auto module
= i
.second
.get();
408 // Send all python calls down a Finisher to avoid blocking
409 // C++ code, and avoid any potential lock cycles.
411 // Note intentional use of non-reference lambda binding on
412 // log_entry: we take a copy because caller's instance is
413 // probably ephemeral.
// NOTE(review): `module` is captured as a raw pointer, so the module must
// outlive the queued callback.
414 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
415 module
->notify_clog(log_entry
);
// Looks up `key` for module `module_name` in config_cache, using the
// global key PyModuleRegistry::config_prefix + module_name + "/" + key.
// On a hit the value is copied into *val. The bool result and the miss
// path are in lines not shown (presumably true on a hit, false otherwise).
// Unlike get_config_prefix() and set_config(), the visible code does not
// release the GIL before taking `lock`.
420 bool ActivePyModules::get_config(const std::string
&module_name
,
421 const std::string
&key
, std::string
*val
) const
423 const std::string global_key
= PyModuleRegistry::config_prefix
424 + module_name
+ "/" + key
;
// NOTE(review): dout_prefix already prints __func__, so the function name
// appears twice in this message, with "key:" glued to the second copy.
426 dout(4) << __func__
<< "key: " << global_key
<< dendl
;
428 Mutex::Locker
l(lock
);
// count() followed by at() searches the map twice; a single find() would
// do.
430 if (config_cache
.count(global_key
)) {
431 *val
= config_cache
.at(global_key
);
// Builds a Python object mapping every config_cache entry whose key
// starts with base_prefix + `prefix` to its value. Keys are reported with
// base_prefix stripped. base_prefix begins with
// PyModuleRegistry::config_prefix; its continuation line is not shown
// (presumably it appends module_name + "/").
438 PyObject
*ActivePyModules::get_config_prefix(const std::string
&module_name
,
439 const std::string
&prefix
) const
// Release the GIL only while blocking on `lock`, then re-take it.
441 PyThreadState
*tstate
= PyEval_SaveThread();
442 Mutex::Locker
l(lock
);
443 PyEval_RestoreThread(tstate
);
445 const std::string base_prefix
= PyModuleRegistry::config_prefix
447 const std::string global_prefix
= base_prefix
+ prefix
;
448 dout(4) << __func__
<< "prefix: " << global_prefix
<< dendl
;
// config_cache is ordered (it supports lower_bound), so every key that
// starts with global_prefix lies in one contiguous run beginning at
// lower_bound(global_prefix). Stop at the first key outside that run.
// (find(...) == 0 scans a non-matching key to its end; compare(0, n, ...)
// would stop earlier.)
451 for (auto p
= config_cache
.lower_bound(global_prefix
);
452 p
!= config_cache
.end() && p
->first
.find(global_prefix
) == 0;
454 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
// Sets or deletes config key `key` of module `module_name`. Under `lock`
// it stores *val in config_cache or erases the entry, then sends a
// "config-key set" or "config-key del" command to the monitors via `monc`.
// A non-zero result from that command is logged. The set/delete choice is
// presumably made on whether `val` holds a value; those branch lines are
// not shown.
459 void ActivePyModules::set_config(const std::string
&module_name
,
460 const std::string
&key
, const boost::optional
<std::string
>& val
)
462 const std::string global_key
= PyModuleRegistry::config_prefix
463 + module_name
+ "/" + key
;
// Release the GIL only while blocking on `lock`, then re-take it.
467 PyThreadState
*tstate
= PyEval_SaveThread();
468 Mutex::Locker
l(lock
);
469 PyEval_RestoreThread(tstate
);
471 config_cache
[global_key
] = *val
;
473 config_cache
.erase(global_key
);
476 std::ostringstream cmd_json
;
478 jf
.open_object_section("cmd");
480 jf
.dump_string("prefix", "config-key set");
481 jf
.dump_string("key", global_key
);
482 jf
.dump_string("val", *val
);
484 jf
.dump_string("prefix", "config-key del");
485 jf
.dump_string("key", global_key
);
489 set_cmd
.run(&monc
, cmd_json
.str());
493 if (set_cmd
.r
!= 0) {
494 // config-key set will fail if mgr's auth key has insufficient
495 // permission to set config keys
496 // FIXME: should this somehow raise an exception back into Python land?
// NOTE(review): this message always says "config-key set", even after a
// "config-key del". It also streams the boost::optional `val` itself
// rather than *val, which needs boost's optional I/O support and, as far
// as I know, prints a placeholder ("--") for an empty optional -- confirm.
497 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
498 << cpp_strerror(set_cmd
.r
) << dendl
;
499 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
// Collects the commands declared by every active module (each module's
// get_commands()) into one vector, under `lock`.
503 std::vector
<ModuleCommand
> ActivePyModules::get_py_commands() const
505 Mutex::Locker
l(lock
);
507 std::vector
<ModuleCommand
> result
;
508 for (const auto& i
: modules
) {
509 auto module
= i
.second
.get();
510 auto mod_commands
= module
->get_commands();
// NOTE(review): `auto j` copies each ModuleCommand. `const auto&` would
// avoid that copy, unless the copy is moved into `result` in the lines
// not shown.
511 for (auto j
: mod_commands
) {
// Converts the Python modules' commands (from get_py_commands()) into
// MonCommand entries. Each entry keeps the command string, help string
// and permission, sets the other string fields to "mgr" and "cli"
// (presumably module and availability), and uses MonCommand::FLAG_MGR as
// its flags.
519 std::vector
<MonCommand
> ActivePyModules::get_commands() const
521 std::vector
<ModuleCommand
> commands
= get_py_commands();
522 std::vector
<MonCommand
> result
;
523 for (auto &pyc
: commands
) {
524 result
.push_back({pyc
.cmdstring
, pyc
.helpstring
, "mgr",
525 pyc
.perm
, "cli", MonCommand::FLAG_MGR
});
// Returns a map from module name to that module's URI (as reported by
// get_uri()), taken under `lock`. Modules with an empty URI are omitted.
531 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
533 std::map
<std::string
, std::string
> result
;
534 Mutex::Locker
l(lock
);
535 for (const auto& i
: modules
) {
536 const auto &module
= i
.second
.get();
537 std::string svc_str
= module
->get_uri();
538 if (!svc_str
.empty()) {
539 result
[module
->get_name()] = svc_str
;
// Builds a Python object holding the recorded samples of perf counter
// `path` on daemon `svc_name`.`svc_id`, as an array section named `path`.
// Each sample is a "datapoint" array. For PERFCOUNTER_LONGRUNAVG counters
// it holds "t" (seconds), "s" and "c" (presumably sum and count) from
// get_data_avg(); for other counters it holds "t" and "v" from get_data().
// A counter that is not found is logged at level 4, with every known path
// listed at level 20.
546 PyObject
* ActivePyModules::get_counter_python(
547 const std::string
&svc_name
,
548 const std::string
&svc_id
,
549 const std::string
&path
)
// Release the GIL only while blocking on `lock`, then re-take it.
551 PyThreadState
*tstate
= PyEval_SaveThread();
552 Mutex::Locker
l(lock
);
553 PyEval_RestoreThread(tstate
);
556 f
.open_array_section(path
.c_str());
558 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
560 Mutex::Locker
l2(metadata
->lock
);
561 if (metadata
->perf_counters
.instances
.count(path
)) {
// NOTE(review): plain `auto` copies the counter instance (which appears
// to hold its sample history) and its type descriptor. `const auto&`
// would avoid both copies.
562 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
563 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
564 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
565 const auto &avg_data
= counter_instance
.get_data_avg();
566 for (const auto &datapoint
: avg_data
) {
567 f
.open_array_section("datapoint");
568 f
.dump_unsigned("t", datapoint
.t
.sec());
569 f
.dump_unsigned("s", datapoint
.s
);
570 f
.dump_unsigned("c", datapoint
.c
);
574 const auto &data
= counter_instance
.get_data();
575 for (const auto &datapoint
: data
) {
576 f
.open_array_section("datapoint");
577 f
.dump_unsigned("t", datapoint
.t
.sec());
578 f
.dump_unsigned("v", datapoint
.v
);
583 dout(4) << "Missing counter: '" << path
<< "' ("
584 << svc_name
<< "." << svc_id
<< ")" << dendl
;
585 dout(20) << "Paths are:" << dendl
;
586 for (const auto &i
: metadata
->perf_counters
.instances
) {
587 dout(20) << i
.first
<< dendl
;
// NOTE(review): this message closes a ")" that it never opens.
591 dout(4) << "No daemon state for "
592 << svc_name
<< "." << svc_id
<< ")" << dendl
;
// Builds a Python object describing the perf-counter schema of a set of
// daemons. There is one section per daemon, named "<type>.<id>", and
// inside it one section per counter with "description", an optional
// "nick", "type", "priority" and "units".
// Daemon selection:
//   - empty svc_type: every daemon;
//   - empty svc_id: every daemon of svc_type;
//   - otherwise: only svc_type.svc_id, if it exists.
598 PyObject
* ActivePyModules::get_perf_schema_python(
599 const std::string svc_type
,
600 const std::string
&svc_id
)
// Release the GIL only while blocking on `lock`, then re-take it.
602 PyThreadState
*tstate
= PyEval_SaveThread();
603 Mutex::Locker
l(lock
);
604 PyEval_RestoreThread(tstate
);
606 DaemonStateCollection daemons
;
// NOTE(review): if get_all()/get_by_service() return by value, the
// std::move calls below are redundant. If they returned references, they
// would move state out of daemon_state -- confirm the signatures.
608 if (svc_type
== "") {
609 daemons
= std::move(daemon_state
.get_all());
610 } else if (svc_id
.empty()) {
611 daemons
= std::move(daemon_state
.get_by_service(svc_type
));
613 auto key
= DaemonKey(svc_type
, svc_id
);
614 // so that the below can be a loop in all cases
615 auto got
= daemon_state
.get(key
);
616 if (got
!= nullptr) {
622 if (!daemons
.empty()) {
// NOTE(review): `auto statepair` and `auto ctr_inst_iter` copy each map
// entry; the latter copies the whole counter instance. `const auto&`
// would avoid that.
623 for (auto statepair
: daemons
) {
624 auto key
= statepair
.first
;
625 auto state
= statepair
.second
;
627 std::ostringstream daemon_name
;
628 daemon_name
<< key
.first
<< "." << key
.second
;
629 f
.open_object_section(daemon_name
.str().c_str());
631 Mutex::Locker
l(state
->lock
);
632 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
633 const auto &counter_name
= ctr_inst_iter
.first
;
634 f
.open_object_section(counter_name
.c_str());
// NOTE(review): types[...] default-inserts an entry when a counter has no
// type, and `auto` copies it. find()/at() with `const auto&` would
// neither mutate the map nor copy.
635 auto type
= state
->perf_counters
.types
[counter_name
];
636 f
.dump_string("description", type
.description
);
637 if (!type
.nick
.empty()) {
638 f
.dump_string("nick", type
.nick
);
640 f
.dump_unsigned("type", type
.type
);
641 f
.dump_unsigned("priority", type
.priority
);
642 f
.dump_unsigned("units", type
.unit
);
// NOTE(review): this message closes a ")" that it never opens.
648 dout(4) << __func__
<< ": No daemon state found for "
649 << svc_type
<< "." << svc_id
<< ")" << dendl
;
// Creates an unnamed PyCapsule around g_ceph_context with no capsule
// destructor. The return statement is not shown; presumably the capsule
// is returned.
// NOTE(review): the visible code takes `lock` but touches nothing else in
// this object -- confirm whether the lock is needed here.
654 PyObject
*ActivePyModules::get_context()
656 PyThreadState
*tstate
= PyEval_SaveThread();
657 Mutex::Locker
l(lock
);
658 PyEval_RestoreThread(tstate
);
660 // Construct a capsule containing ceph context.
661 // Not incrementing/decrementing ref count on the context because
662 // it's the global one and it has process lifetime.
663 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
668 * Helper for our wrapped types that take a capsule in their constructor.
// Imports Python module `module_name`, looks up its attribute `clsname`,
// and calls it with one argument: an unnamed PyCapsule wrapping the raw
// pointer `wrapped`. `wrapped` is the third parameter, declared on a line
// not shown. Returns the new instance.
// Import, lookup and construction failures are logged via
// handle_pyerror(); lookup and construction failures also hit an assert.
// The capsule is created without a destructor, so it never frees
// `wrapped`. Presumably the wrapper type takes ownership -- confirm.
670 PyObject
*construct_with_capsule(
671 const std::string
&module_name
,
672 const std::string
&clsname
,
675 // Import the module that defines the wrapper type we will construct
676 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
678 derr
<< "Failed to import python module:" << dendl
;
679 derr
<< handle_pyerror() << dendl
;
// The (const char*) cast is redundant: c_str() already yields const char*.
683 PyObject
*wrapper_type
= PyObject_GetAttrString(
684 module
, (const char*)clsname
.c_str());
686 derr
<< "Failed to get python type:" << dendl
;
687 derr
<< handle_pyerror() << dendl
;
689 assert(wrapper_type
);
691 // Wrap the raw `wrapped` pointer in an unnamed, destructor-less capsule.
692 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
693 assert(wrapped_capsule
);
695 // Construct the Python wrapper instance: wrapper_type(capsule)
696 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
697 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
698 if (wrapper_instance
== nullptr) {
// NOTE(review): the log text below still says "OSDMap", although this
// helper works for any `clsname`.
699 derr
<< "Failed to construct python OSDMap:" << dendl
;
700 derr
<< handle_pyerror() << dendl
;
702 assert(wrapper_instance
!= nullptr);
// NOTE(review): only the capsule and wrapper_type are released in the
// visible lines. Confirm that `pArgs` and `module` are also Py_DECREF'd
// (possibly in lines not shown).
704 Py_DECREF(wrapped_capsule
);
706 Py_DECREF(wrapper_type
);
709 return wrapper_instance
;
// Returns a Python mgr_module.OSDMap object that wraps a fresh heap copy
// of the cluster's current OSDMap, made with deepish_copy_from().
// NOTE(review): `newmap` is a raw `new`. construct_with_capsule() passes
// it to Python through a capsule with no destructor, so it must be freed
// on the Python wrapper side -- confirm, otherwise each call leaks an
// OSDMap.
712 PyObject
*ActivePyModules::get_osdmap()
// Release the GIL only while blocking on `lock`, then re-take it.
714 PyThreadState
*tstate
= PyEval_SaveThread();
715 Mutex::Locker
l(lock
);
716 PyEval_RestoreThread(tstate
);
718 OSDMap
*newmap
= new OSDMap
;
720 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
721 newmap
->deepish_copy_from(o
);
724 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
// Hands `checks` (moved) to the set_health_checks() of module
// `module_name`, under `lock`. In the visible code nothing happens when
// no module of that name is active.
727 void ActivePyModules::set_health_checks(const std::string
& module_name
,
728 health_check_map_t
&& checks
)
730 Mutex::Locker
l(lock
);
731 auto p
= modules
.find(module_name
);
732 if (p
!= modules
.end()) {
733 p
->second
->set_health_checks(std::move(checks
));
// Passes `checks` to every active module's get_health_checks() under
// `lock`, presumably so each module adds its own checks to the map.
737 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
739 Mutex::Locker
l(lock
);
740 for (auto& p
: modules
) {
741 p
.second
->get_health_checks(checks
);
// Records `uri` as the URI of module `module_name` (get_services() reads
// it back via get_uri()) and logs the change at level 4.
// NOTE(review): if `module_name` is not active, modules[module_name]
// default-inserts an empty entry and the `->` then dereferences it. That
// is a null pointer, since the mapped type is a smart pointer (start_one()
// calls .reset() on it). set_health_checks() uses find() and skips
// unknown names; doing the same here would avoid the crash.
745 void ActivePyModules::set_uri(const std::string
& module_name
,
746 const std::string
&uri
)
748 Mutex::Locker
l(lock
);
750 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
752 modules
[module_name
]->set_uri(uri
);