1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
// Include this first to get python headers earlier
#include "Python.h"

#include "common/errno.h"
#include "include/stringify.h"

#include "PyFormatter.h"

#include "osd/OSDMap.h"
#include "mon/MonMap.h"

#include "mgr/MgrContext.h"

#include "PyModules.h"
30 #define dout_context g_ceph_context
31 #define dout_subsys ceph_subsys_mgr
33 #define dout_prefix *_dout << "mgr " << __func__ << " "
35 // definition for non-const static member
36 std::string
PyModules::config_prefix
;
38 // constructor/destructor implementations cannot be in .h,
39 // because ServeThread is still an "incomplete" type there
41 PyModules::PyModules(DaemonStateIndex
&ds
, ClusterState
&cs
,
42 MonClient
&mc
, LogChannelRef clog_
, Objecter
&objecter_
,
43 Client
&client_
, Finisher
&f
)
44 : daemon_state(ds
), cluster_state(cs
), monc(mc
), clog(clog_
),
45 objecter(objecter_
), client(client_
), finisher(f
),
// Out-of-line defaulted destructor: destroying the unique_ptr<ServeThread>
// members requires the complete ServeThread type, which only this .cc sees
// (see the comment above the constructor).
49 PyModules::~PyModules() = default;
51 void PyModules::dump_server(const std::string
&hostname
,
52 const DaemonStateCollection
&dmc
,
55 f
->dump_string("hostname", hostname
);
56 f
->open_array_section("services");
57 std::string ceph_version
;
59 for (const auto &i
: dmc
) {
60 Mutex::Locker
l(i
.second
->lock
);
61 const auto &key
= i
.first
;
62 const std::string
&str_type
= key
.first
;
63 const std::string
&svc_name
= key
.second
;
65 // TODO: pick the highest version, and make sure that
66 // somewhere else (during health reporting?) we are
67 // indicating to the user if we see mixed versions
68 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
69 if (ver_iter
!= i
.second
->metadata
.end()) {
70 ceph_version
= i
.second
->metadata
.at("ceph_version");
73 f
->open_object_section("service");
74 f
->dump_string("type", str_type
);
75 f
->dump_string("id", svc_name
);
80 f
->dump_string("ceph_version", ceph_version
);
85 PyObject
*PyModules::get_server_python(const std::string
&hostname
)
87 PyThreadState
*tstate
= PyEval_SaveThread();
88 Mutex::Locker
l(lock
);
89 PyEval_RestoreThread(tstate
);
90 dout(10) << " (" << hostname
<< ")" << dendl
;
92 auto dmc
= daemon_state
.get_by_server(hostname
);
95 dump_server(hostname
, dmc
, &f
);
100 PyObject
*PyModules::list_servers_python()
102 PyThreadState
*tstate
= PyEval_SaveThread();
103 Mutex::Locker
l(lock
);
104 PyEval_RestoreThread(tstate
);
105 dout(10) << " >" << dendl
;
107 PyFormatter
f(false, true);
108 const auto &all
= daemon_state
.get_all_servers();
109 for (const auto &i
: all
) {
110 const auto &hostname
= i
.first
;
112 f
.open_object_section("server");
113 dump_server(hostname
, i
.second
, &f
);
120 PyObject
*PyModules::get_metadata_python(
121 std::string
const &handle
,
122 const std::string
&svc_name
,
123 const std::string
&svc_id
)
125 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
126 Mutex::Locker
l(metadata
->lock
);
128 f
.dump_string("hostname", metadata
->hostname
);
129 for (const auto &i
: metadata
->metadata
) {
130 f
.dump_string(i
.first
.c_str(), i
.second
);
136 PyObject
*PyModules::get_daemon_status_python(
137 std::string
const &handle
,
138 const std::string
&svc_name
,
139 const std::string
&svc_id
)
141 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
142 Mutex::Locker
l(metadata
->lock
);
144 for (const auto &i
: metadata
->service_status
) {
145 f
.dump_string(i
.first
.c_str(), i
.second
);
// Python API entry point: serialise the cluster structure named by `what`
// into a Python object.  Drops the GIL before taking the PyModules lock,
// then re-takes it (same pattern as the other *_python methods).
// NOTE(review): this extract is garbled — the numeric prefixes are the
// original file's line numbers fused into the text, and interleaved lines
// (braces, PyFormatter declarations, close_section()/return statements)
// are missing.  Do not expect this fragment to compile as-is; reconcile
// with the upstream file before changing logic.
150 PyObject
*PyModules::get_python(const std::string
&what
)
152 PyThreadState
*tstate
= PyEval_SaveThread();
153 Mutex::Locker
l(lock
);
154 PyEval_RestoreThread(tstate
);
// Dispatch on `what`:
// fs_map: dump the FSMap via a formatter (lambda body truncated here).
156 if (what
== "fs_map") {
158 cluster_state
.with_fsmap([&f
](const FSMap
&fsmap
) {
// osdmap_crush_map_text: encode the crush map into `rdata` and return it
// as a raw Python string.
162 } else if (what
== "osdmap_crush_map_text") {
164 cluster_state
.with_osdmap([&rdata
](const OSDMap
&osd_map
){
165 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
167 std::string crush_text
= rdata
.to_str();
168 return PyString_FromString(crush_text
.c_str());
// osd_map / osd_map_tree / osd_map_crush: various OSDMap dumps, selected
// by the 7-char prefix match below.
169 } else if (what
.substr(0, 7) == "osd_map") {
171 cluster_state
.with_osdmap([&f
, &what
](const OSDMap
&osd_map
){
172 if (what
== "osd_map") {
174 } else if (what
== "osd_map_tree") {
175 osd_map
.print_tree(&f
, nullptr);
176 } else if (what
== "osd_map_crush") {
177 osd_map
.crush
->dump(&f
);
// config: dump this daemon's configuration.
181 } else if (what
== "config") {
183 g_conf
->show_config(&f
);
// mon_map: dump the MonMap.
185 } else if (what
== "mon_map") {
187 cluster_state
.with_monmap(
188 [&f
](const MonMap
&monmap
) {
// service_map: dump the ServiceMap.
193 } else if (what
== "service_map") {
195 cluster_state
.with_servicemap(
196 [&f
](const ServiceMap
&service_map
) {
197 service_map
.dump(&f
);
// osd_metadata: one object per OSD with its metadata key/values.
201 } else if (what
== "osd_metadata") {
203 auto dmc
= daemon_state
.get_by_service("osd");
204 for (const auto &i
: dmc
) {
205 Mutex::Locker
l(i
.second
->lock
);
206 f
.open_object_section(i
.first
.second
.c_str());
207 f
.dump_string("hostname", i
.second
->hostname
);
208 for (const auto &j
: i
.second
->metadata
) {
209 f
.dump_string(j
.first
.c_str(), j
.second
);
// pg_summary: PG state counts grouped by OSD, by pool, and overall.
214 } else if (what
== "pg_summary") {
216 cluster_state
.with_pgmap(
217 [&f
](const PGMap
&pg_map
) {
218 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
219 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
220 std::map
<std::string
, uint32_t> all
;
221 for (const auto &i
: pg_map
.pg_stat
) {
222 const auto pool
= i
.first
.m_pool
;
223 const std::string state
= pg_state_string(i
.second
.state
);
224 // Insert to per-pool map
225 pools
[stringify(pool
)][state
]++;
// Count this PG's state once per acting OSD.
226 for (const auto &osd_id
: i
.second
.acting
) {
227 osds
[stringify(osd_id
)][state
]++;
231 f
.open_object_section("by_osd");
232 for (const auto &i
: osds
) {
233 f
.open_object_section(i
.first
.c_str());
234 for (const auto &j
: i
.second
) {
235 f
.dump_int(j
.first
.c_str(), j
.second
);
240 f
.open_object_section("by_pool");
241 for (const auto &i
: pools
) {
242 f
.open_object_section(i
.first
.c_str());
243 for (const auto &j
: i
.second
) {
244 f
.dump_int(j
.first
.c_str(), j
.second
);
249 f
.open_object_section("all");
250 for (const auto &i
: all
) {
251 f
.dump_int(i
.first
.c_str(), i
.second
);
// df: filesystem-wide and per-pool usage statistics.
258 } else if (what
== "df") {
261 cluster_state
.with_osdmap([this, &f
](const OSDMap
&osd_map
){
262 cluster_state
.with_pgmap(
263 [&osd_map
, &f
](const PGMap
&pg_map
) {
264 pg_map
.dump_fs_stats(nullptr, &f
, true);
265 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
// osd_stats: per-OSD statistics.
269 } else if (what
== "osd_stats") {
271 cluster_state
.with_pgmap(
272 [&f
](const PGMap
&pg_map
) {
273 pg_map
.dump_osd_stats(&f
);
// health / mon_status: pre-rendered JSON blobs held by ClusterState,
// wrapped as {"json": "..."} for the caller to parse.
276 } else if (what
== "health" || what
== "mon_status") {
279 if (what
== "health") {
280 json
= cluster_state
.get_health();
281 } else if (what
== "mon_status") {
282 json
= cluster_state
.get_mon_status();
286 f
.dump_string("json", json
.to_str());
// mgr_map: dump the MgrMap.
288 } else if (what
== "mgr_map") {
290 cluster_state
.with_mgrmap([&f
](const MgrMap
&mgr_map
) {
// Unknown key: log an error (the fall-through return statement was lost
// in this extract).
295 derr
&lt;&lt; "Python module requested unknown data '" &lt;&lt; what
&lt;&lt; "'" &lt;&lt; dendl
;
// Build a colon-separated string of site-packages directories to splice
// into the embedded interpreter's sys.path.  Tries site.getsitepackages()
// first; where that attribute is absent the code falls back to calling
// site.addsitepackages() and harvesting sys.path instead (the branch
// structure is partially lost in this extract — see NOTE below).
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; braces and several interleaved statements are missing here.
300 std::string
PyModules::get_site_packages()
302 std::stringstream site_packages
;
304 // CPython doesn't auto-add site-packages dirs to sys.path for us,
305 // but it does provide a module that we can ask for them.
306 auto site_module
= PyImport_ImportModule("site");
309 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "getsitepackages");
310 if (site_packages_fn
!= nullptr) {
311 auto site_packages_list
= PyObject_CallObject(site_packages_fn
, nullptr);
312 assert(site_packages_list
);
314 auto n
= PyList_Size(site_packages_list
);
315 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
// ":" separator between entries (the guard skipping the first entry was
// lost in this extract).
317 site_packages
<< ":";
319 site_packages
<< PyString_AsString(PyList_GetItem(site_packages_list
, i
));
322 Py_DECREF(site_packages_list
);
323 Py_DECREF(site_packages_fn
);
325 // Fall back to generating our own site-packages paths by imitating
326 // what the standard site.py does. This is annoying but it lets us
327 // run inside virtualenvs :-/
329 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "addsitepackages");
330 assert(site_packages_fn
);
332 auto known_paths
= PySet_New(nullptr);
333 auto pArgs
= PyTuple_Pack(1, known_paths
);
334 PyObject_CallObject(site_packages_fn
, pArgs
);
336 Py_DECREF(known_paths
);
337 Py_DECREF(site_packages_fn
);
// Read back sys.path, which addsitepackages() has now populated.
339 auto sys_module
= PyImport_ImportModule("sys");
341 auto sys_path
= PyObject_GetAttrString(sys_module
, "path");
344 dout(1) << "sys.path:" << dendl
;
345 auto n
= PyList_Size(sys_path
);
347 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
348 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path
, i
)) << dendl
;
352 site_packages
<< ":";
354 site_packages
<< PyString_AsString(PyList_GetItem(sys_path
, i
));
358 Py_DECREF(sys_module
);
361 Py_DECREF(site_module
);
363 return site_packages
.str();
// One-time start-up: record the global handle, compute config_prefix,
// boot the embedded CPython interpreter, compute sys.path, drop the GIL,
// and load every python module named in the MgrMap, collecting failures.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; braces, the `ls` module-name list, and the load()/error-check
// lines are missing from this extract — reconcile with upstream before
// editing logic.
367 int PyModules::init()
369 Mutex::Locker
locker(lock
);
// Record the singleton pointer used by the C<->Python trampoline code.
371 global_handle
= this;
372 // namespace in config-key prefixed by "mgr/"
373 config_prefix
= std::string(g_conf
->name
.get_type_str()) + "/";
375 // Set up global python interpreter
376 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE
));
379 // Let CPython know that we will be calling it back from other
380 // threads in future.
381 if (! PyEval_ThreadsInitialized()) {
382 PyEval_InitThreads();
385 // Configure sys.path to include mgr_module_path
386 std::string sys_path
= std::string(Py_GetPath()) + ":" + get_site_packages()
387 + ":" + g_conf
->mgr_module_path
;
388 dout(10) << "Computed sys.path '" << sys_path
<< "'" << dendl
;
390 // Drop the GIL and remember the main thread state (current
391 // thread state becomes NULL)
392 pMainThreadState
= PyEval_SaveThread();
394 std::list
<std::string
> failed_modules
;
// The list of modules to load comes from the MgrMap (`ls` — its
// declaration was lost in this extract).
398 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
401 for (const auto& module_name
: ls
) {
402 dout(1) << "Loading python module '" << module_name
<< "'" << dendl
;
403 auto mod
= std::unique_ptr
<MgrPyModule
>(new MgrPyModule(module_name
, sys_path
, pMainThreadState
));
// Error path for a failed module load (the load() call and its result
// check were lost in this extract).
406 // Don't use handle_pyerror() here; we don't have the GIL
407 // or the right thread state (this is deliberate).
408 derr
<< "Error loading module '" << module_name
<< "': "
409 << cpp_strerror(r
) << dendl
;
410 failed_modules
.push_back(module_name
);
411 // Don't drop out here, load the other modules
414 modules
[module_name
] = std::move(mod
);
// Surface a single clog error summarising everything that failed to load.
418 if (!failed_modules
.empty()) {
419 clog
->error() << "Failed to load ceph-mgr modules: " << joinify(
420 failed_modules
.begin(), failed_modules
.end(), std::string(", "));
// Thread wrapper that runs one MgrPyModule's serve() loop.
// NOTE(review): most of this class (members, ctor body, the remainder of
// entry()) is missing from this extract — only the declaration skeleton
// and the entry() log line survive.
426 class ServeThread
: public Thread
433 ServeThread(MgrPyModule
*mod_
)
436 void *entry() override
440 // No need to acquire the GIL here; the module does it.
441 dout(4) << "Entering thread for " << mod
->get_name() << dendl
;
449 void PyModules::start()
451 Mutex::Locker
l(lock
);
453 dout(1) << "Creating threads for " << modules
.size() << " modules" << dendl
;
454 for (auto& i
: modules
) {
455 auto thread
= new ServeThread(i
.second
.get());
456 serve_threads
[i
.first
].reset(thread
);
459 for (auto &i
: serve_threads
) {
460 std::ostringstream thread_name
;
461 thread_name
<< "mgr." << i
.first
;
462 dout(4) << "Starting thread for " << i
.first
<< dendl
;
463 i
.second
->create(thread_name
.str().c_str());
// Tear-down: ask each module to shut down, join the serve() threads,
// restore the main Python thread state, and clear the global handle.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; the lock-drop/module->shutdown()/re-lock lines, the thread join
// lines, and Py_Finalize-era cleanup are missing from this extract.
467 void PyModules::shutdown()
469 Mutex::Locker
locker(lock
);
470 assert(global_handle
);
472 // Signal modules to drop out of serve() and/or tear down resources
473 for (auto &i
: modules
) {
474 auto module
= i
.second
.get();
475 const auto& name
= i
.first
;
476 dout(10) << "waiting for module " << name
<< " to shutdown" << dendl
;
// (the actual module->shutdown() call between these two log lines was
// lost in this extract)
480 dout(10) << "module " << name
<< " shutdown" << dendl
;
483 // For modules implementing serve(), finish the threads where we
484 // were running that.
485 for (auto &i
: serve_threads
) {
490 serve_threads
.clear();
// Re-take the GIL on the main thread state saved in init().
494 PyEval_RestoreThread(pMainThreadState
);
497 // nobody needs me anymore.
498 global_handle
= nullptr;
501 void PyModules::notify_all(const std::string
¬ify_type
,
502 const std::string
¬ify_id
)
504 Mutex::Locker
l(lock
);
506 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
507 for (auto& i
: modules
) {
508 auto module
= i
.second
.get();
509 if (!serve_threads
[i
.first
]->running
)
511 // Send all python calls down a Finisher to avoid blocking
512 // C++ code, and avoid any potential lock cycles.
513 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
514 module
->notify(notify_type
, notify_id
);
519 void PyModules::notify_all(const LogEntry
&log_entry
)
521 Mutex::Locker
l(lock
);
523 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
524 for (auto& i
: modules
) {
525 auto module
= i
.second
.get();
526 if (!serve_threads
[i
.first
]->running
)
528 // Send all python calls down a Finisher to avoid blocking
529 // C++ code, and avoid any potential lock cycles.
531 // Note intentional use of non-reference lambda binding on
532 // log_entry: we take a copy because caller's instance is
533 // probably ephemeral.
534 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
535 module
->notify_clog(log_entry
);
540 bool PyModules::get_config(const std::string
&handle
,
541 const std::string
&key
, std::string
*val
) const
543 PyThreadState
*tstate
= PyEval_SaveThread();
544 Mutex::Locker
l(lock
);
545 PyEval_RestoreThread(tstate
);
547 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
549 dout(4) << __func__
<< "key: " << global_key
<< dendl
;
551 if (config_cache
.count(global_key
)) {
552 *val
= config_cache
.at(global_key
);
559 PyObject
*PyModules::get_config_prefix(const std::string
&handle
,
560 const std::string
&prefix
) const
562 PyThreadState
*tstate
= PyEval_SaveThread();
563 Mutex::Locker
l(lock
);
564 PyEval_RestoreThread(tstate
);
566 const std::string base_prefix
= config_prefix
+ handle
+ "/";
567 const std::string global_prefix
= base_prefix
+ prefix
;
568 dout(4) << __func__
<< "prefix: " << global_prefix
<< dendl
;
571 for (auto p
= config_cache
.lower_bound(global_prefix
);
572 p
!= config_cache
.end() && p
->first
.find(global_prefix
) == 0;
574 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
// Store a module config value: update the local cache, then persist it to
// the mons with a "config-key set" command.  Failures are logged only.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; the JSONFormatter declaration, jf.close_section()/flush, the
// Command declaration and its wait() are missing from this extract.
579 void PyModules::set_config(const std::string
&handle
,
580 const std::string
&key
, const std::string
&val
)
582 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
// Update the local cache under the lock (GIL released first).
586 PyThreadState
*tstate
= PyEval_SaveThread();
587 Mutex::Locker
l(lock
);
588 PyEval_RestoreThread(tstate
);
589 config_cache
[global_key
] = val
;
// Build the mon command JSON and fire it via `set_cmd` (declarations for
// jf/set_cmd were lost in this extract).
591 std::ostringstream cmd_json
;
594 jf
.open_object_section("cmd");
595 jf
.dump_string("prefix", "config-key set");
596 jf
.dump_string("key", global_key
);
597 jf
.dump_string("val", val
);
601 set_cmd
.run(&monc
, cmd_json
.str());
605 if (set_cmd
.r
!= 0) {
606 // config-key set will fail if mgr's auth key has insufficient
607 // permission to set config keys
608 // FIXME: should this somehow raise an exception back into Python land?
609 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
610 << cpp_strerror(set_cmd
.r
) << dendl
;
611 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
615 std::vector
<ModuleCommand
> PyModules::get_py_commands() const
617 Mutex::Locker
l(lock
);
619 std::vector
<ModuleCommand
> result
;
620 for (const auto& i
: modules
) {
621 auto module
= i
.second
.get();
622 auto mod_commands
= module
->get_commands();
623 for (auto j
: mod_commands
) {
631 std::vector
<MonCommand
> PyModules::get_commands() const
633 std::vector
<ModuleCommand
> commands
= get_py_commands();
634 std::vector
<MonCommand
> result
;
635 for (auto &pyc
: commands
) {
636 result
.push_back({pyc
.cmdstring
, pyc
.helpstring
, "mgr",
637 pyc
.perm
, "cli", MonCommand::FLAG_MGR
});
642 void PyModules::insert_config(const std::map
<std::string
,
643 std::string
> &new_config
)
645 Mutex::Locker
l(lock
);
647 dout(4) << "Loaded " << new_config
.size() << " config settings" << dendl
;
648 config_cache
= new_config
;
651 void PyModules::log(const std::string
&handle
,
652 int level
, const std::string
&record
)
655 #define dout_prefix *_dout << "mgr[" << handle << "] "
656 dout(level
) << record
<< dendl
;
658 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Python API: return the recorded datapoints [(t, v), ...] for one perf
// counter `path` of daemon (svc_name, svc_id).  Missing counters or
// daemons produce log messages and (per the visible structure) an empty
// result.  `handle` is the calling module's handle.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; the PyFormatter declaration, the if(metadata)/else skeleton,
// close_section() calls and the return statement are missing from this
// extract.
661 PyObject
* PyModules::get_counter_python(
662 const std::string
&handle
,
663 const std::string
&svc_name
,
664 const std::string
&svc_id
,
665 const std::string
&path
)
667 PyThreadState
*tstate
= PyEval_SaveThread();
668 Mutex::Locker
l(lock
);
669 PyEval_RestoreThread(tstate
);
672 f
.open_array_section(path
.c_str());
674 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
676 Mutex::Locker
l2(metadata
->lock
);
678 if (metadata
->perf_counters
.instances
.count(path
)) {
679 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
680 const auto &data
= counter_instance
.get_data();
// One [t, v] pair per recorded datapoint.
681 for (const auto &datapoint
: data
) {
682 f
.open_array_section("datapoint")
683 f
.dump_unsigned("t", datapoint
.t
.sec());
684 f
.dump_unsigned("v", datapoint
.v
);
// else-branch: the counter path is unknown for this daemon.
689 dout(4) << "Missing counter: '" << path
<< "' ("
690 << svc_name
<< "." << svc_id
<< ")" << dendl
;
691 dout(20) << "Paths are:" << dendl
;
692 for (const auto &i
: metadata
->perf_counters
.instances
) {
693 dout(20) << i
.first
<< dendl
;
// outer else-branch: no daemon state for (svc_name, svc_id) at all.
697 dout(4) << "No daemon state for "
698 << svc_name
<< "." << svc_id
<< ")" << dendl
;
// Python API: return the perf counter schema (description, nick, type)
// for daemons selected by svc_type/svc_id: empty svc_type = all daemons,
// empty svc_id = all daemons of svc_type, otherwise the single daemon.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; the PyFormatter declaration, close_section() calls, the else
// branch's brace structure and the return statement are missing from this
// extract.
704 PyObject
* PyModules::get_perf_schema_python(
705 const std::string
&handle
,
706 const std::string svc_type
,
707 const std::string
&svc_id
)
709 PyThreadState
*tstate
= PyEval_SaveThread();
710 Mutex::Locker
l(lock
);
711 PyEval_RestoreThread(tstate
);
713 DaemonStateCollection states
;
715 if (svc_type
== "") {
716 states
= daemon_state
.get_all();
717 } else if (svc_id
.empty()) {
718 states
= daemon_state
.get_by_service(svc_type
);
// else: a single (svc_type, svc_id) daemon, inserted into `states`
720 auto key
= DaemonKey(svc_type
, svc_id
);
721 // so that the below can be a loop in all cases
722 if (daemon_state
.exists(key
)) {
723 states
[key
] = daemon_state
.get(key
);
728 f
.open_object_section("perf_schema");
730 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
731 // lock or put a lock on individual DaemonStates
732 if (!states
.empty()) {
733 for (auto statepair
: states
) {
734 std::ostringstream daemon_name
;
735 auto key
= statepair
.first
;
736 auto state
= statepair
.second
;
737 Mutex::Locker
l(state
->lock
);
// Section name is "<type>.<id>".
738 daemon_name
<< key
.first
<< "." << key
.second
;
739 f
.open_object_section(daemon_name
.str().c_str());
741 for (auto typestr
: state
->perf_counters
.declared_types
) {
742 f
.open_object_section(typestr
.c_str());
743 auto type
= state
->perf_counters
.types
[typestr
];
744 f
.dump_string("description", type
.description
);
745 if (!type
.nick
.empty()) {
746 f
.dump_string("nick", type
.nick
);
748 f
.dump_unsigned("type", type
.type
);
// else-branch: nothing matched the requested svc_type/svc_id.
754 dout(4) << __func__
<< ": No daemon state found for "
755 << svc_type
<< "." << svc_id
<< ")" << dendl
;
761 PyObject
*PyModules::get_context()
763 PyThreadState
*tstate
= PyEval_SaveThread();
764 Mutex::Locker
l(lock
);
765 PyEval_RestoreThread(tstate
);
767 // Construct a capsule containing ceph context.
768 // Not incrementing/decrementing ref count on the context because
769 // it's the global one and it has process lifetime.
770 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
// Scan `path` for subdirectories that contain a "module.py" entry point
// and insert those subdirectory names into *modules.  An unreadable
// directory or failed stat() silently yields fewer (or no) entries.
static void _list_modules(
  const std::string path,
  std::set<std::string> *modules)
{
  DIR *dir = opendir(path.c_str());
  if (dir == NULL) {
    return;
  }

  struct dirent *entry = NULL;
  while ((entry = readdir(dir)) != NULL) {
    std::string n(entry->d_name);
    std::string fn = path + "/" + n;
    struct stat st;
    int r = ::stat(fn.c_str(), &st);
    if (r == 0 && S_ISDIR(st.st_mode)) {
      // A module is any directory holding a module.py.
      std::string initfn = fn + "/module.py";
      r = ::stat(initfn.c_str(), &st);
      if (r == 0) {
        modules->insert(n);
      }
    }
  }
  closedir(dir);
}
799 void PyModules::list_modules(std::set
<std::string
> *modules
)
801 _list_modules(g_conf
->mgr_module_path
, modules
);
804 void PyModules::set_health_checks(const std::string
& handle
,
805 health_check_map_t
&& checks
)
807 Mutex::Locker
l(lock
);
808 auto p
= modules
.find(handle
);
809 if (p
!= modules
.end()) {
810 p
->second
->set_health_checks(std::move(checks
));
// Aggregate every module's health checks into *checks.
// NOTE(review): numeric prefixes are original line numbers fused into the
// text; the opening/closing braces are missing and the function's end
// falls past this extract.
814 void PyModules::get_health_checks(health_check_map_t
*checks
)
816 Mutex::Locker
l(lock
);
817 for (auto& p
: modules
) {
818 p
.second
->get_health_checks(checks
);