// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
// Include this first to get python headers earlier
#include <Python.h>

#include "common/errno.h"
#include "include/stringify.h"

#include "PyFormatter.h"

#include "osd/OSDMap.h"
#include "mon/MonMap.h"

#include "mgr/MgrContext.h"

#include "PyModules.h"
// Debug-output plumbing for this translation unit.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mgr
#define dout_prefix *_dout << "mgr " << __func__ << " "
35 // definition for non-const static member
36 std::string
PyModules::config_prefix
;
38 // constructor/destructor implementations cannot be in .h,
39 // because ServeThread is still an "incomplete" type there
41 PyModules::PyModules(DaemonStateIndex
&ds
, ClusterState
&cs
,
42 MonClient
&mc
, LogChannelRef clog_
, Objecter
&objecter_
,
43 Client
&client_
, Finisher
&f
)
44 : daemon_state(ds
), cluster_state(cs
), monc(mc
), clog(clog_
),
45 objecter(objecter_
), client(client_
), finisher(f
),
49 PyModules::~PyModules() = default;
51 void PyModules::dump_server(const std::string
&hostname
,
52 const DaemonStateCollection
&dmc
,
55 f
->dump_string("hostname", hostname
);
56 f
->open_array_section("services");
57 std::string ceph_version
;
59 for (const auto &i
: dmc
) {
60 const auto &key
= i
.first
;
61 const std::string
&str_type
= key
.first
;
62 const std::string
&svc_name
= key
.second
;
64 // TODO: pick the highest version, and make sure that
65 // somewhere else (during health reporting?) we are
66 // indicating to the user if we see mixed versions
67 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
68 if (ver_iter
!= i
.second
->metadata
.end()) {
69 ceph_version
= i
.second
->metadata
.at("ceph_version");
72 f
->open_object_section("service");
73 f
->dump_string("type", str_type
);
74 f
->dump_string("id", svc_name
);
79 f
->dump_string("ceph_version", ceph_version
);
84 PyObject
*PyModules::get_server_python(const std::string
&hostname
)
86 PyThreadState
*tstate
= PyEval_SaveThread();
87 Mutex::Locker
l(lock
);
88 PyEval_RestoreThread(tstate
);
89 dout(10) << " (" << hostname
<< ")" << dendl
;
91 auto dmc
= daemon_state
.get_by_server(hostname
);
94 dump_server(hostname
, dmc
, &f
);
99 PyObject
*PyModules::list_servers_python()
101 PyThreadState
*tstate
= PyEval_SaveThread();
102 Mutex::Locker
l(lock
);
103 PyEval_RestoreThread(tstate
);
104 dout(10) << " >" << dendl
;
106 PyFormatter
f(false, true);
107 const auto &all
= daemon_state
.get_all_servers();
108 for (const auto &i
: all
) {
109 const auto &hostname
= i
.first
;
111 f
.open_object_section("server");
112 dump_server(hostname
, i
.second
, &f
);
119 PyObject
*PyModules::get_metadata_python(
120 std::string
const &handle
,
121 const std::string
&svc_name
,
122 const std::string
&svc_id
)
124 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
126 f
.dump_string("hostname", metadata
->hostname
);
127 for (const auto &i
: metadata
->metadata
) {
128 f
.dump_string(i
.first
.c_str(), i
.second
);
134 PyObject
*PyModules::get_daemon_status_python(
135 std::string
const &handle
,
136 const std::string
&svc_name
,
137 const std::string
&svc_id
)
139 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
141 for (const auto &i
: metadata
->service_status
) {
142 f
.dump_string(i
.first
.c_str(), i
.second
);
147 PyObject
*PyModules::get_python(const std::string
&what
)
149 PyThreadState
*tstate
= PyEval_SaveThread();
150 Mutex::Locker
l(lock
);
151 PyEval_RestoreThread(tstate
);
153 if (what
== "fs_map") {
155 cluster_state
.with_fsmap([&f
](const FSMap
&fsmap
) {
159 } else if (what
== "osdmap_crush_map_text") {
161 cluster_state
.with_osdmap([&rdata
](const OSDMap
&osd_map
){
162 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
164 std::string crush_text
= rdata
.to_str();
165 return PyString_FromString(crush_text
.c_str());
166 } else if (what
.substr(0, 7) == "osd_map") {
168 cluster_state
.with_osdmap([&f
, &what
](const OSDMap
&osd_map
){
169 if (what
== "osd_map") {
171 } else if (what
== "osd_map_tree") {
172 osd_map
.print_tree(&f
, nullptr);
173 } else if (what
== "osd_map_crush") {
174 osd_map
.crush
->dump(&f
);
178 } else if (what
== "config") {
180 g_conf
->show_config(&f
);
182 } else if (what
== "mon_map") {
184 cluster_state
.with_monmap(
185 [&f
](const MonMap
&monmap
) {
190 } else if (what
== "service_map") {
192 cluster_state
.with_servicemap(
193 [&f
](const ServiceMap
&service_map
) {
194 service_map
.dump(&f
);
198 } else if (what
== "osd_metadata") {
200 auto dmc
= daemon_state
.get_by_service("osd");
201 for (const auto &i
: dmc
) {
202 f
.open_object_section(i
.first
.second
.c_str());
203 f
.dump_string("hostname", i
.second
->hostname
);
204 for (const auto &j
: i
.second
->metadata
) {
205 f
.dump_string(j
.first
.c_str(), j
.second
);
210 } else if (what
== "pg_summary") {
212 cluster_state
.with_pgmap(
213 [&f
](const PGMap
&pg_map
) {
214 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
215 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
216 std::map
<std::string
, uint32_t> all
;
217 for (const auto &i
: pg_map
.pg_stat
) {
218 const auto pool
= i
.first
.m_pool
;
219 const std::string state
= pg_state_string(i
.second
.state
);
220 // Insert to per-pool map
221 pools
[stringify(pool
)][state
]++;
222 for (const auto &osd_id
: i
.second
.acting
) {
223 osds
[stringify(osd_id
)][state
]++;
227 f
.open_object_section("by_osd");
228 for (const auto &i
: osds
) {
229 f
.open_object_section(i
.first
.c_str());
230 for (const auto &j
: i
.second
) {
231 f
.dump_int(j
.first
.c_str(), j
.second
);
236 f
.open_object_section("by_pool");
237 for (const auto &i
: pools
) {
238 f
.open_object_section(i
.first
.c_str());
239 for (const auto &j
: i
.second
) {
240 f
.dump_int(j
.first
.c_str(), j
.second
);
245 f
.open_object_section("all");
246 for (const auto &i
: all
) {
247 f
.dump_int(i
.first
.c_str(), i
.second
);
254 } else if (what
== "df") {
257 cluster_state
.with_osdmap([this, &f
](const OSDMap
&osd_map
){
258 cluster_state
.with_pgmap(
259 [&osd_map
, &f
](const PGMap
&pg_map
) {
260 pg_map
.dump_fs_stats(nullptr, &f
, true);
261 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
265 } else if (what
== "osd_stats") {
267 cluster_state
.with_pgmap(
268 [&f
](const PGMap
&pg_map
) {
269 pg_map
.dump_osd_stats(&f
);
272 } else if (what
== "health" || what
== "mon_status") {
275 if (what
== "health") {
276 json
= cluster_state
.get_health();
277 } else if (what
== "mon_status") {
278 json
= cluster_state
.get_mon_status();
282 f
.dump_string("json", json
.to_str());
284 } else if (what
== "mgr_map") {
286 cluster_state
.with_mgrmap([&f
](const MgrMap
&mgr_map
) {
291 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
296 std::string
PyModules::get_site_packages()
298 std::stringstream site_packages
;
300 // CPython doesn't auto-add site-packages dirs to sys.path for us,
301 // but it does provide a module that we can ask for them.
302 auto site_module
= PyImport_ImportModule("site");
305 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "getsitepackages");
306 if (site_packages_fn
!= nullptr) {
307 auto site_packages_list
= PyObject_CallObject(site_packages_fn
, nullptr);
308 assert(site_packages_list
);
310 auto n
= PyList_Size(site_packages_list
);
311 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
313 site_packages
<< ":";
315 site_packages
<< PyString_AsString(PyList_GetItem(site_packages_list
, i
));
318 Py_DECREF(site_packages_list
);
319 Py_DECREF(site_packages_fn
);
321 // Fall back to generating our own site-packages paths by imitating
322 // what the standard site.py does. This is annoying but it lets us
323 // run inside virtualenvs :-/
325 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "addsitepackages");
326 assert(site_packages_fn
);
328 auto known_paths
= PySet_New(nullptr);
329 auto pArgs
= PyTuple_Pack(1, known_paths
);
330 PyObject_CallObject(site_packages_fn
, pArgs
);
332 Py_DECREF(known_paths
);
333 Py_DECREF(site_packages_fn
);
335 auto sys_module
= PyImport_ImportModule("sys");
337 auto sys_path
= PyObject_GetAttrString(sys_module
, "path");
340 dout(1) << "sys.path:" << dendl
;
341 auto n
= PyList_Size(sys_path
);
343 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
344 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path
, i
)) << dendl
;
348 site_packages
<< ":";
350 site_packages
<< PyString_AsString(PyList_GetItem(sys_path
, i
));
354 Py_DECREF(sys_module
);
357 Py_DECREF(site_module
);
359 return site_packages
.str();
363 int PyModules::init()
365 Mutex::Locker
locker(lock
);
367 global_handle
= this;
368 // namespace in config-key prefixed by "mgr/"
369 config_prefix
= std::string(g_conf
->name
.get_type_str()) + "/";
371 // Set up global python interpreter
372 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE
));
375 // Let CPython know that we will be calling it back from other
376 // threads in future.
377 if (! PyEval_ThreadsInitialized()) {
378 PyEval_InitThreads();
381 // Configure sys.path to include mgr_module_path
382 std::string sys_path
= std::string(Py_GetPath()) + ":" + get_site_packages()
383 + ":" + g_conf
->mgr_module_path
;
384 dout(10) << "Computed sys.path '" << sys_path
<< "'" << dendl
;
386 // Drop the GIL and remember the main thread state (current
387 // thread state becomes NULL)
388 pMainThreadState
= PyEval_SaveThread();
390 std::list
<std::string
> failed_modules
;
394 cluster_state
.with_mgrmap([&](const MgrMap
& m
) {
397 for (const auto& module_name
: ls
) {
398 dout(1) << "Loading python module '" << module_name
<< "'" << dendl
;
399 auto mod
= std::unique_ptr
<MgrPyModule
>(new MgrPyModule(module_name
, sys_path
, pMainThreadState
));
402 // Don't use handle_pyerror() here; we don't have the GIL
403 // or the right thread state (this is deliberate).
404 derr
<< "Error loading module '" << module_name
<< "': "
405 << cpp_strerror(r
) << dendl
;
406 failed_modules
.push_back(module_name
);
407 // Don't drop out here, load the other modules
410 modules
[module_name
] = std::move(mod
);
414 if (!failed_modules
.empty()) {
415 clog
->error() << "Failed to load ceph-mgr modules: " << joinify(
416 failed_modules
.begin(), failed_modules
.end(), std::string(", "));
422 class ServeThread
: public Thread
429 ServeThread(MgrPyModule
*mod_
)
432 void *entry() override
436 // No need to acquire the GIL here; the module does it.
437 dout(4) << "Entering thread for " << mod
->get_name() << dendl
;
445 void PyModules::start()
447 Mutex::Locker
l(lock
);
449 dout(1) << "Creating threads for " << modules
.size() << " modules" << dendl
;
450 for (auto& i
: modules
) {
451 auto thread
= new ServeThread(i
.second
.get());
452 serve_threads
[i
.first
].reset(thread
);
455 for (auto &i
: serve_threads
) {
456 std::ostringstream thread_name
;
457 thread_name
<< "mgr." << i
.first
;
458 dout(4) << "Starting thread for " << i
.first
<< dendl
;
459 i
.second
->create(thread_name
.str().c_str());
463 void PyModules::shutdown()
465 Mutex::Locker
locker(lock
);
466 assert(global_handle
);
468 // Signal modules to drop out of serve() and/or tear down resources
469 for (auto &i
: modules
) {
470 auto module
= i
.second
.get();
471 const auto& name
= i
.first
;
472 dout(10) << "waiting for module " << name
<< " to shutdown" << dendl
;
476 dout(10) << "module " << name
<< " shutdown" << dendl
;
479 // For modules implementing serve(), finish the threads where we
480 // were running that.
481 for (auto &i
: serve_threads
) {
486 serve_threads
.clear();
490 PyEval_RestoreThread(pMainThreadState
);
493 // nobody needs me anymore.
494 global_handle
= nullptr;
497 void PyModules::notify_all(const std::string
¬ify_type
,
498 const std::string
¬ify_id
)
500 Mutex::Locker
l(lock
);
502 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
503 for (auto& i
: modules
) {
504 auto module
= i
.second
.get();
505 if (!serve_threads
[i
.first
]->running
)
507 // Send all python calls down a Finisher to avoid blocking
508 // C++ code, and avoid any potential lock cycles.
509 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
510 module
->notify(notify_type
, notify_id
);
515 void PyModules::notify_all(const LogEntry
&log_entry
)
517 Mutex::Locker
l(lock
);
519 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
520 for (auto& i
: modules
) {
521 auto module
= i
.second
.get();
522 if (!serve_threads
[i
.first
]->running
)
524 // Send all python calls down a Finisher to avoid blocking
525 // C++ code, and avoid any potential lock cycles.
527 // Note intentional use of non-reference lambda binding on
528 // log_entry: we take a copy because caller's instance is
529 // probably ephemeral.
530 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
531 module
->notify_clog(log_entry
);
536 bool PyModules::get_config(const std::string
&handle
,
537 const std::string
&key
, std::string
*val
) const
539 PyThreadState
*tstate
= PyEval_SaveThread();
540 Mutex::Locker
l(lock
);
541 PyEval_RestoreThread(tstate
);
543 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
545 dout(4) << __func__
<< "key: " << global_key
<< dendl
;
547 if (config_cache
.count(global_key
)) {
548 *val
= config_cache
.at(global_key
);
555 PyObject
*PyModules::get_config_prefix(const std::string
&handle
,
556 const std::string
&prefix
) const
558 PyThreadState
*tstate
= PyEval_SaveThread();
559 Mutex::Locker
l(lock
);
560 PyEval_RestoreThread(tstate
);
562 const std::string base_prefix
= config_prefix
+ handle
+ "/";
563 const std::string global_prefix
= base_prefix
+ prefix
;
564 dout(4) << __func__
<< "prefix: " << global_prefix
<< dendl
;
567 for (auto p
= config_cache
.lower_bound(global_prefix
);
568 p
!= config_cache
.end() && p
->first
.find(global_prefix
) == 0;
570 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
575 void PyModules::set_config(const std::string
&handle
,
576 const std::string
&key
, const std::string
&val
)
578 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
582 PyThreadState
*tstate
= PyEval_SaveThread();
583 Mutex::Locker
l(lock
);
584 PyEval_RestoreThread(tstate
);
585 config_cache
[global_key
] = val
;
587 std::ostringstream cmd_json
;
590 jf
.open_object_section("cmd");
591 jf
.dump_string("prefix", "config-key put");
592 jf
.dump_string("key", global_key
);
593 jf
.dump_string("val", val
);
597 set_cmd
.run(&monc
, cmd_json
.str());
601 if (set_cmd
.r
!= 0) {
602 // config-key put will fail if mgr's auth key has insufficient
603 // permission to set config keys
604 // FIXME: should this somehow raise an exception back into Python land?
605 dout(0) << "`config-key put " << global_key
<< " " << val
<< "` failed: "
606 << cpp_strerror(set_cmd
.r
) << dendl
;
607 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
611 std::vector
<ModuleCommand
> PyModules::get_commands()
613 Mutex::Locker
l(lock
);
615 std::vector
<ModuleCommand
> result
;
616 for (auto& i
: modules
) {
617 auto module
= i
.second
.get();
618 auto mod_commands
= module
->get_commands();
619 for (auto j
: mod_commands
) {
627 void PyModules::insert_config(const std::map
<std::string
,
628 std::string
> &new_config
)
630 Mutex::Locker
l(lock
);
632 dout(4) << "Loaded " << new_config
.size() << " config settings" << dendl
;
633 config_cache
= new_config
;
636 void PyModules::log(const std::string
&handle
,
637 int level
, const std::string
&record
)
640 #define dout_prefix *_dout << "mgr[" << handle << "] "
641 dout(level
) << record
<< dendl
;
643 #define dout_prefix *_dout << "mgr " << __func__ << " "
646 PyObject
* PyModules::get_counter_python(
647 const std::string
&handle
,
648 const std::string
&svc_name
,
649 const std::string
&svc_id
,
650 const std::string
&path
)
652 PyThreadState
*tstate
= PyEval_SaveThread();
653 Mutex::Locker
l(lock
);
654 PyEval_RestoreThread(tstate
);
657 f
.open_array_section(path
.c_str());
659 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
661 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
662 // lock or put a lock on individual DaemonStates
664 if (metadata
->perf_counters
.instances
.count(path
)) {
665 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
666 const auto &data
= counter_instance
.get_data();
667 for (const auto &datapoint
: data
) {
668 f
.open_array_section("datapoint");
669 f
.dump_unsigned("t", datapoint
.t
.sec());
670 f
.dump_unsigned("v", datapoint
.v
);
675 dout(4) << "Missing counter: '" << path
<< "' ("
676 << svc_name
<< "." << svc_id
<< ")" << dendl
;
677 dout(20) << "Paths are:" << dendl
;
678 for (const auto &i
: metadata
->perf_counters
.instances
) {
679 dout(20) << i
.first
<< dendl
;
683 dout(4) << "No daemon state for "
684 << svc_name
<< "." << svc_id
<< ")" << dendl
;
690 PyObject
*PyModules::get_context()
692 PyThreadState
*tstate
= PyEval_SaveThread();
693 Mutex::Locker
l(lock
);
694 PyEval_RestoreThread(tstate
);
696 // Construct a capsule containing ceph context.
697 // Not incrementing/decrementing ref count on the context because
698 // it's the global one and it has process lifetime.
699 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
/**
 * Scan @p path for subdirectories containing a "module.py" and insert
 * each such subdirectory name into @p modules.  A missing/unreadable
 * directory yields an empty result rather than an error.
 */
static void _list_modules(
  const std::string path,
  std::set<std::string> *modules)
{
  DIR *dir = opendir(path.c_str());
  if (!dir) {
    return;
  }

  struct dirent *entry = NULL;
  while ((entry = readdir(dir)) != NULL) {
    std::string n(entry->d_name);
    std::string fn = path + "/" + n;
    struct stat st;
    int r = ::stat(fn.c_str(), &st);
    if (r == 0 && S_ISDIR(st.st_mode)) {
      // Only directories that contain a module.py count as modules.
      std::string initfn = fn + "/module.py";
      r = ::stat(initfn.c_str(), &st);
      if (r == 0) {
        modules->insert(n);
      }
    }
  }
  closedir(dir);
}
728 void PyModules::list_modules(std::set
<std::string
> *modules
)
730 _list_modules(g_conf
->mgr_module_path
, modules
);