1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
14 // Include this first to get python headers earlier
18 #include <boost/tokenizer.hpp>
19 #include "common/errno.h"
20 #include "include/stringify.h"
22 #include "PyFormatter.h"
24 #include "osd/OSDMap.h"
25 #include "mon/MonMap.h"
27 #include "mgr/MgrContext.h"
29 #include "PyModules.h"
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mgr
34 #define dout_prefix *_dout << "mgr " << __func__ << " "
36 // definition for non-const static member
// NOTE(review): out-of-line definition of the static member
// PyModules::config_prefix (assigned in PyModules::init() below).
// The stray numeric prefixes on lines throughout this file look like
// pasted-in listing numbers -- the file cannot compile until removed.
37 std::string
PyModules::config_prefix
;
39 // constructor/destructor implementations cannot be in .h,
40 // because ServeThread is still an "incomplete" type there
// Constructor: stores references to the daemon state index, cluster state,
// MonClient, Objecter, Client and (per the init list) a Finisher `f`.
// NOTE(review): the parameter line declaring the Finisher `f` and the
// constructor body appear to be missing from this capture (gap in the
// embedded numbering between 43/45 and after 46) -- TODO recover from VCS.
42 PyModules::PyModules(DaemonStateIndex
&ds
, ClusterState
&cs
,
43 MonClient
&mc
, Objecter
&objecter_
, Client
&client_
,
45 : daemon_state(ds
), cluster_state(cs
), monc(mc
),
46 objecter(objecter_
), client(client_
), finisher(f
),
// Defaulted out-of-line (see comment above the constructor): ServeThread is
// incomplete in the header, so the destructor must live in this .cc file.
50 PyModules::~PyModules() = default;
// Emit one server's hostname plus a "services" array (type/id per daemon)
// into the supplied Formatter, and a best-effort "ceph_version" taken from
// daemon metadata.
// NOTE(review): the trailing `Formatter *f)` parameter line, the opening
// brace, and several closing braces / section-close calls are missing from
// this capture (gaps at 54-55, 59, 64, 71-72, 76-79, 81+) -- TODO recover.
52 void PyModules::dump_server(const std::string
&hostname
,
53 const DaemonStateCollection
&dmc
,
56 f
->dump_string("hostname", hostname
);
57 f
->open_array_section("services");
58 std::string ceph_version
;
60 for (const auto &i
: dmc
) {
// DaemonStateCollection is keyed by (entity type, service id) pairs.
61 const auto &key
= i
.first
;
62 const std::string str_type
= ceph_entity_type_name(key
.first
);
63 const std::string
&svc_name
= key
.second
;
65 // TODO: pick the highest version, and make sure that
66 // somewhere else (during health reporting?) we are
67 // indicating to the user if we see mixed versions
68 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
69 if (ver_iter
!= i
.second
->metadata
.end()) {
// Last daemon seen wins; see TODO above about mixed versions.
70 ceph_version
= i
.second
->metadata
.at("ceph_version");
73 f
->open_object_section("service");
74 f
->dump_string("type", str_type
);
75 f
->dump_string("id", svc_name
);
80 f
->dump_string("ceph_version", ceph_version
);
// Return a Python object describing one server (by hostname), built via
// dump_server() over the daemons daemon_state knows for that host.
// NOTE(review): opening brace, the PyFormatter declaration, the return
// statement and closing brace are missing from this capture -- TODO recover.
85 PyObject
*PyModules::get_server_python(const std::string
&hostname
)
// Drop the GIL before taking our lock (avoids GIL/lock ordering deadlock),
// then re-acquire it once the lock is held.
87 PyThreadState
*tstate
= PyEval_SaveThread();
88 Mutex::Locker
l(lock
);
89 PyEval_RestoreThread(tstate
);
90 dout(10) << " (" << hostname
<< ")" << dendl
;
92 auto dmc
= daemon_state
.get_by_server(hostname
);
95 dump_server(hostname
, dmc
, &f
);
// Return a Python list of all known servers, one object per host, each
// populated via dump_server().
// NOTE(review): opening brace, loop-closing braces and the return
// statement are missing from this capture (gaps at 101, 106, 111, 114+).
100 PyObject
*PyModules::list_servers_python()
// GIL-drop / lock / GIL-reacquire dance, same as get_server_python().
102 PyThreadState
*tstate
= PyEval_SaveThread();
103 Mutex::Locker
l(lock
);
104 PyEval_RestoreThread(tstate
);
105 dout(10) << " >" << dendl
;
// PyFormatter(false, true): presumably (pretty, array) -- TODO confirm
// against PyFormatter's constructor signature.
107 PyFormatter
f(false, true);
108 const auto &all
= daemon_state
.get_all_servers();
109 for (const auto &i
: all
) {
110 const auto &hostname
= i
.first
;
112 f
.open_object_section("server");
113 dump_server(hostname
, i
.second
, &f
);
// Return a Python dict of a single daemon's metadata (hostname plus every
// key/value in its metadata map), looked up by (svc_type, svc_id).
// NOTE(review): opening brace, PyFormatter declaration, null-check of
// `metadata`, and the return are missing from this capture -- TODO recover.
120 PyObject
*PyModules::get_metadata_python(std::string
const &handle
,
121 entity_type_t svc_type
, const std::string
&svc_id
)
123 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
125 f
.dump_string("hostname", metadata
->hostname
);
126 for (const auto &i
: metadata
->metadata
) {
127 f
.dump_string(i
.first
.c_str(), i
.second
);
// Central data-fetch entry point for Python modules: dispatch on `what`
// ("fs_map", "osdmap_crush_map_text", "osd_map*", "config", "mon_map",
// "osd_metadata", "pg_summary", "df", "osd_stats", "health"/"mon_status")
// and serialize the requested cluster structure into a Python object.
// Unknown keys fall through to the derr log at the bottom.
// NOTE(review): many lines are missing from this capture (PyFormatter /
// bufferlist declarations, braces, get()/return calls -- see gaps in the
// embedded numbering, e.g. 135, 139, 141, 143-145, 147, 150, 154, 157,
// 162-164, 185-188, 203-205, 227-232, 241-243, 249-250, 252-253, 258-263,
// 265-266) -- TODO recover full body from version control.
134 PyObject
*PyModules::get_python(const std::string
&what
)
// Drop GIL, take lock, re-take GIL -- same ordering as the other accessors.
136 PyThreadState
*tstate
= PyEval_SaveThread();
137 Mutex::Locker
l(lock
);
138 PyEval_RestoreThread(tstate
);
140 if (what
== "fs_map") {
142 cluster_state
.with_fsmap([&f
](const FSMap
&fsmap
) {
146 } else if (what
== "osdmap_crush_map_text") {
// Encode the CRUSH map into a bufferlist and hand it back as a plain
// Python string (Python 2 API: PyString_FromString).
148 cluster_state
.with_osdmap([&rdata
](const OSDMap
&osd_map
){
149 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
151 std::string crush_text
= rdata
.to_str();
152 return PyString_FromString(crush_text
.c_str());
// Prefix match: covers "osd_map", "osd_map_tree", "osd_map_crush".
153 } else if (what
.substr(0, 7) == "osd_map") {
155 cluster_state
.with_osdmap([&f
, &what
](const OSDMap
&osd_map
){
156 if (what
== "osd_map") {
158 } else if (what
== "osd_map_tree") {
159 osd_map
.print_tree(&f
, nullptr);
160 } else if (what
== "osd_map_crush") {
161 osd_map
.crush
->dump(&f
);
165 } else if (what
== "config") {
167 g_conf
->show_config(&f
);
169 } else if (what
== "mon_map") {
171 cluster_state
.with_monmap(
172 [&f
](const MonMap
&monmap
) {
177 } else if (what
== "osd_metadata") {
// One object per OSD, keyed by its service id, listing hostname and
// every metadata key/value.
179 auto dmc
= daemon_state
.get_by_type(CEPH_ENTITY_TYPE_OSD
);
180 for (const auto &i
: dmc
) {
181 f
.open_object_section(i
.first
.second
.c_str());
182 f
.dump_string("hostname", i
.second
->hostname
);
183 for (const auto &j
: i
.second
->metadata
) {
184 f
.dump_string(j
.first
.c_str(), j
.second
);
189 } else if (what
== "pg_summary") {
// Aggregate PG state counts three ways: per OSD, per pool, and overall.
191 cluster_state
.with_pgmap(
192 [&f
](const PGMap
&pg_map
) {
193 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
194 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
195 std::map
<std::string
, uint32_t> all
;
196 for (const auto &i
: pg_map
.pg_stat
) {
197 const auto pool
= i
.first
.m_pool
;
198 const std::string state
= pg_state_string(i
.second
.state
);
199 // Insert to per-pool map
200 pools
[stringify(pool
)][state
]++;
// Count this PG's state once per acting OSD.
201 for (const auto &osd_id
: i
.second
.acting
) {
202 osds
[stringify(osd_id
)][state
]++;
206 f
.open_object_section("by_osd");
207 for (const auto &i
: osds
) {
208 f
.open_object_section(i
.first
.c_str());
209 for (const auto &j
: i
.second
) {
210 f
.dump_int(j
.first
.c_str(), j
.second
);
215 f
.open_object_section("by_pool");
216 for (const auto &i
: pools
) {
217 f
.open_object_section(i
.first
.c_str());
218 for (const auto &j
: i
.second
) {
219 f
.dump_int(j
.first
.c_str(), j
.second
);
224 f
.open_object_section("all");
225 for (const auto &i
: all
) {
226 f
.dump_int(i
.first
.c_str(), i
.second
);
233 } else if (what
== "df") {
// Needs both maps: fs stats from the PGMap, pool stats against the OSDMap.
236 cluster_state
.with_osdmap([this, &f
](const OSDMap
&osd_map
){
237 cluster_state
.with_pgmap(
238 [&osd_map
, &f
](const PGMap
&pg_map
) {
239 pg_map
.dump_fs_stats(nullptr, &f
, true);
240 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
244 } else if (what
== "osd_stats") {
246 cluster_state
.with_pgmap(
247 [&f
](const PGMap
&pg_map
) {
248 pg_map
.dump_osd_stats(&f
);
251 } else if (what
== "health" || what
== "mon_status") {
// These are already-serialized JSON blobs; wrap them in a "json" field
// rather than re-formatting.
254 if (what
== "health") {
255 json
= cluster_state
.get_health();
256 } else if (what
== "mon_status") {
257 json
= cluster_state
.get_mon_status();
261 f
.dump_string("json", json
.to_str());
264 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
// Compute a colon-separated list of site-packages directories to splice
// into the embedded interpreter's sys.path. Prefers site.getsitepackages();
// falls back to calling site.addsitepackages() and reading back sys.path
// (which is what makes this work inside virtualenvs).
// NOTE(review): opening brace, the else-branch brace for the fallback, the
// conditional guarding the ":" separators (gaps at 285, 315, 318-320) and
// several closing braces are missing from this capture -- TODO recover.
269 std::string
PyModules::get_site_packages()
271 std::stringstream site_packages
;
273 // CPython doesn't auto-add site-packages dirs to sys.path for us,
274 // but it does provide a module that we can ask for them.
275 auto site_module
= PyImport_ImportModule("site");
// getsitepackages() is absent in some environments (notably virtualenv's
// patched site module), hence the nullptr check and fallback below.
278 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "getsitepackages");
279 if (site_packages_fn
!= nullptr) {
280 auto site_packages_list
= PyObject_CallObject(site_packages_fn
, nullptr);
281 assert(site_packages_list
);
283 auto n
= PyList_Size(site_packages_list
);
284 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
286 site_packages
<< ":";
288 site_packages
<< PyString_AsString(PyList_GetItem(site_packages_list
, i
));
291 Py_DECREF(site_packages_list
);
292 Py_DECREF(site_packages_fn
);
294 // Fall back to generating our own site-packages paths by imitating
295 // what the standard site.py does. This is annoying but it lets us
296 // run inside virtualenvs :-/
298 auto site_packages_fn
= PyObject_GetAttrString(site_module
, "addsitepackages");
299 assert(site_packages_fn
);
// addsitepackages() mutates sys.path as a side effect; we only pass it an
// empty "known paths" set and then read sys.path back below.
301 auto known_paths
= PySet_New(nullptr);
302 auto pArgs
= PyTuple_Pack(1, known_paths
);
303 PyObject_CallObject(site_packages_fn
, pArgs
);
305 Py_DECREF(known_paths
);
306 Py_DECREF(site_packages_fn
);
308 auto sys_module
= PyImport_ImportModule("sys");
310 auto sys_path
= PyObject_GetAttrString(sys_module
, "path");
313 dout(1) << "sys.path:" << dendl
;
314 auto n
= PyList_Size(sys_path
);
316 for (Py_ssize_t i
= 0; i
< n
; ++i
) {
317 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path
, i
)) << dendl
;
321 site_packages
<< ":";
323 site_packages
<< PyString_AsString(PyList_GetItem(sys_path
, i
));
327 Py_DECREF(sys_module
);
330 Py_DECREF(site_module
);
332 return site_packages
.str();
// One-time setup: publish the global handle, derive the config-key prefix
// from our entity name, boot the embedded Python interpreter, build
// sys.path, drop the GIL (remembering the main thread state), then load
// each module named in mgr_modules, tolerating individual load failures.
// NOTE(review): opening brace, Py_Initialize-style calls (gaps at 346-347),
// the load-error check that defines `r` (gaps at 368-369, 375-376) and the
// return statement are missing from this capture -- TODO recover.
336 int PyModules::init()
338 Mutex::Locker
locker(lock
);
340 global_handle
= this;
341 // namespace in config-key prefixed by "mgr/"
342 config_prefix
= std::string(g_conf
->name
.get_type_str()) + "/";
344 // Set up global python interpreter
345 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE
));
348 // Let CPython know that we will be calling it back from other
349 // threads in future.
350 if (! PyEval_ThreadsInitialized()) {
351 PyEval_InitThreads();
354 // Configure sys.path to include mgr_module_path
355 std::string sys_path
= std::string(Py_GetPath()) + ":" + get_site_packages()
356 + ":" + g_conf
->mgr_module_path
;
357 dout(10) << "Computed sys.path '" << sys_path
<< "'" << dendl
;
359 // Drop the GIL and remember the main thread state (current
360 // thread state becomes NULL)
361 pMainThreadState
= PyEval_SaveThread();
// mgr_modules is a delimiter-separated list; boost::tokenizer's default
// separators split it into individual module names.
364 boost::tokenizer
<> tok(g_conf
->mgr_modules
);
365 for(const auto& module_name
: tok
) {
366 dout(1) << "Loading python module '" << module_name
<< "'" << dendl
;
367 auto mod
= std::unique_ptr
<MgrPyModule
>(new MgrPyModule(module_name
, sys_path
, pMainThreadState
));
370 // Don't use handle_pyerror() here; we don't have the GIL
371 // or the right thread state (this is deliberate).
372 derr
<< "Error loading module '" << module_name
<< "': "
373 << cpp_strerror(r
) << dendl
;
374 // Don't drop out here, load the other modules
377 modules
[module_name
] = std::move(mod
);
// Per-module worker thread: entry() runs the module's serve() loop (the
// module handles GIL acquisition itself, per the comment below).
// NOTE(review): most of this class body is missing from this capture
// (members at 385-390, ctor body 392-393, entry() body 395-397, 400-406,
// including the `running` flag referenced by notify_all) -- TODO recover.
384 class ServeThread
: public Thread
391 ServeThread(MgrPyModule
*mod_
)
394 void *entry() override
398 // No need to acquire the GIL here; the module does it.
399 dout(4) << "Entering thread for " << mod
->get_name() << dendl
;
// Create one ServeThread per loaded module, then start them all, naming
// each OS thread "mgr.<module>".
// NOTE(review): opening brace and loop/function closing braces are missing
// from this capture (gaps at 408, 410, 415-416, 422-423) -- TODO recover.
407 void PyModules::start()
409 Mutex::Locker
l(lock
);
411 dout(1) << "Creating threads for " << modules
.size() << " modules" << dendl
;
// First pass: construct the thread objects (ownership goes to
// serve_threads); second pass below actually starts them.
412 for (auto& i
: modules
) {
413 auto thread
= new ServeThread(i
.second
.get());
414 serve_threads
[i
.first
].reset(thread
);
417 for (auto &i
: serve_threads
) {
418 std::ostringstream thread_name
;
419 thread_name
<< "mgr." << i
.first
;
420 dout(4) << "Starting thread for " << i
.first
<< dendl
;
// Thread::create takes the OS-visible thread name.
421 i
.second
->create(thread_name
.str().c_str());
// Tear down: ask every module to shut down, join/clear the serve threads,
// restore the main Python thread state saved in init(), and clear the
// global handle.
// NOTE(review): opening brace, the per-module shutdown call (gaps at
// 435-437), thread join calls (444-447), module/interpreter teardown
// (449-451, 453-454) and the closing brace are missing -- TODO recover.
425 void PyModules::shutdown()
427 Mutex::Locker
locker(lock
);
428 assert(global_handle
);
430 // Signal modules to drop out of serve() and/or tear down resources
431 for (auto &i
: modules
) {
432 auto module
= i
.second
.get();
433 const auto& name
= i
.first
;
434 dout(10) << "waiting for module " << name
<< " to shutdown" << dendl
;
438 dout(10) << "module " << name
<< " shutdown" << dendl
;
441 // For modules implementing serve(), finish the threads where we
442 // were running that.
443 for (auto &i
: serve_threads
) {
448 serve_threads
.clear();
// Re-adopt the main thread state saved by PyEval_SaveThread() in init().
452 PyEval_RestoreThread(pMainThreadState
);
455 // nobody needs me anymore.
456 global_handle
= nullptr;
// Fan a (notify_type, notify_id) event out to every running module,
// dispatching each call through the Finisher so Python code never blocks
// the caller and no lock cycles can form.
// NOTE(review): the parameter names below appear as mojibake "¬ify_type"/
// "¬ify_id" -- almost certainly HTML-entity corruption of "&notify_type"/
// "&notify_id" ("&not" -> U+00AC); must be restored before this compiles.
// Also missing: opening brace, the `continue` under the running check
// (gap at 468), lambda/loop closers (473-476) -- TODO recover.
459 void PyModules::notify_all(const std::string
&notify_type
,
460 const std::string
&notify_id
)
462 Mutex::Locker
l(lock
);
464 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
465 for (auto& i
: modules
) {
466 auto module
= i
.second
.get();
// Skip modules whose serve thread has already exited.
467 if (!serve_threads
[i
.first
]->running
)
469 // Send all python calls down a Finisher to avoid blocking
470 // C++ code, and avoid any potential lock cycles.
471 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
472 module
->notify(notify_type
, notify_id
);
// Cluster-log variant of notify_all: forward a LogEntry to every running
// module via the Finisher. The entry is captured by copy on purpose (see
// the comment at 489-491 below).
// NOTE(review): opening brace, the `continue` under the running check
// (gap at 485), and lambda/loop/function closers (494-497) are missing
// from this capture -- TODO recover.
477 void PyModules::notify_all(const LogEntry
&log_entry
)
479 Mutex::Locker
l(lock
);
481 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
482 for (auto& i
: modules
) {
483 auto module
= i
.second
.get();
484 if (!serve_threads
[i
.first
]->running
)
486 // Send all python calls down a Finisher to avoid blocking
487 // C++ code, and avoid any potential lock cycles.
489 // Note intentional use of non-reference lambda binding on
490 // log_entry: we take a copy because caller's instance is
491 // probably ephemeral.
492 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
493 module
->notify_clog(log_entry
);
// Look up a module-scoped config value from the in-memory config_cache
// under the key "<config_prefix><handle>/<key>"; writes the value through
// *val on a hit. Returns bool (presumably hit/miss -- the return
// statements are missing from this capture, gaps at 511-516).
498 bool PyModules::get_config(const std::string
&handle
,
499 const std::string
&key
, std::string
*val
) const
// GIL-drop / lock / GIL-reacquire, consistent with the other accessors.
501 PyThreadState
*tstate
= PyEval_SaveThread();
502 Mutex::Locker
l(lock
);
503 PyEval_RestoreThread(tstate
);
505 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
507 dout(4) << __func__
<< "key: " << global_key
<< dendl
;
509 if (config_cache
.count(global_key
)) {
510 *val
= config_cache
.at(global_key
);
// Return a Python dict of every cached config entry whose key starts with
// "<config_prefix><handle>/<prefix>", with the per-module base prefix
// stripped from the emitted key names.
// NOTE(review): opening brace, PyFormatter declaration, the ++p increment
// line of the for statement (gap at 531), and the return are missing from
// this capture -- TODO recover.
517 PyObject
*PyModules::get_config_prefix(const std::string
&handle
,
518 const std::string
&prefix
) const
520 PyThreadState
*tstate
= PyEval_SaveThread();
521 Mutex::Locker
l(lock
);
522 PyEval_RestoreThread(tstate
);
524 const std::string base_prefix
= config_prefix
+ handle
+ "/";
525 const std::string global_prefix
= base_prefix
+ prefix
;
526 dout(4) << __func__
<< "prefix: " << global_prefix
<< dendl
;
// std::map is ordered, so lower_bound + starts-with scan enumerates
// exactly the keys under global_prefix.
529 for (auto p
= config_cache
.lower_bound(global_prefix
);
530 p
!= config_cache
.end() && p
->first
.find(global_prefix
) == 0;
// Emit the key with the "<config_prefix><handle>/" portion stripped.
532 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
// Store a module-scoped config value: update the local config_cache under
// lock, then persist it to the monitors with a "config-key put" command,
// logging (but not propagating) any failure.
// NOTE(review): braces, the JSONFormatter/Command declarations, the
// cmd_json flush, and the command-wait call are missing from this capture
// (gaps at 539, 541-543, 548, 550-551, 556-558, 560-562, 570-571) --
// TODO recover.
537 void PyModules::set_config(const std::string
&handle
,
538 const std::string
&key
, const std::string
&val
)
540 const std::string global_key
= config_prefix
+ handle
+ "/" + key
;
544 PyThreadState
*tstate
= PyEval_SaveThread();
545 Mutex::Locker
l(lock
);
546 PyEval_RestoreThread(tstate
);
// Update our cache first so subsequent get_config() calls see the value
// even before the monitor round-trip completes.
547 config_cache
[global_key
] = val
;
549 std::ostringstream cmd_json
;
552 jf
.open_object_section("cmd");
553 jf
.dump_string("prefix", "config-key put");
554 jf
.dump_string("key", global_key
);
555 jf
.dump_string("val", val
);
559 set_cmd
.run(&monc
, cmd_json
.str());
563 if (set_cmd
.r
!= 0) {
564 // config-key put will fail if mgr's auth key has insufficient
565 // permission to set config keys
566 // FIXME: should this somehow raise an exception back into Python land?
567 dout(0) << "`config-key put " << global_key
<< " " << val
<< "` failed: "
568 << cpp_strerror(set_cmd
.r
) << dendl
;
569 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
// Collect the command descriptors exported by every loaded module into a
// single flat vector.
// NOTE(review): opening brace, the push_back of each command (gaps at
// 582-585), and the return statement are missing from this capture.
// Also note `for (auto j : ...)` copies each ModuleCommand -- presumably
// cheap, but worth revisiting once the full body is recovered.
573 std::vector
<ModuleCommand
> PyModules::get_commands()
575 Mutex::Locker
l(lock
);
577 std::vector
<ModuleCommand
> result
;
578 for (auto& i
: modules
) {
579 auto module
= i
.second
.get();
580 auto mod_commands
= module
->get_commands();
581 for (auto j
: mod_commands
) {
// Replace the entire config cache with a freshly loaded key/value map
// (wholesale assignment, under lock).
// NOTE(review): opening and closing braces are missing from this capture
// (gaps at 591, 593, 596) -- TODO recover.
589 void PyModules::insert_config(const std::map
<std::string
,
590 std::string
> &new_config
)
592 Mutex::Locker
l(lock
);
594 dout(4) << "Loaded " << new_config
.size() << " config settings" << dendl
;
595 config_cache
= new_config
;
// Emit a log record on behalf of a Python module. dout_prefix is
// temporarily redefined so the line is tagged "mgr[<handle>]" instead of
// the file-wide "mgr <func>" prefix, then restored afterwards.
// NOTE(review): opening/closing braces and the #undef lines that normally
// bracket the redefinition (gaps at 600-601, 604, 606) are missing from
// this capture -- TODO recover.
598 void PyModules::log(const std::string
&handle
,
599 int level
, const std::string
&record
)
602 #define dout_prefix *_dout << "mgr[" << handle << "] "
603 dout(level
) << record
<< dendl
;
605 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Return a Python array of (t, v) datapoints for one perf counter `path`
// of the daemon identified by (svc_type, svc_id). Missing counters and
// missing daemon state are logged and presumably yield an empty array --
// the return path is not visible in this capture.
// NOTE(review): opening brace, PyFormatter declaration, null-check of
// `metadata`, section closers and the return are missing (gaps at 613,
// 617-618, 620, 622, 625, 633-636, 643-645, 649-653) -- TODO recover.
608 PyObject
* PyModules::get_counter_python(
609 const std::string
&handle
,
610 entity_type_t svc_type
,
611 const std::string
&svc_id
,
612 const std::string
&path
)
614 PyThreadState
*tstate
= PyEval_SaveThread();
615 Mutex::Locker
l(lock
);
616 PyEval_RestoreThread(tstate
);
619 f
.open_array_section(path
.c_str());
621 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
623 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
624 // lock or put a lock on individual DaemonStates
626 if (metadata
->perf_counters
.instances
.count(path
)) {
// Note: `auto counter_instance = ...at(path)` takes a copy of the
// instance; harmless only if PerfCounterInstance is cheap -- verify.
627 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
628 const auto &data
= counter_instance
.get_data();
629 for (const auto &datapoint
: data
) {
630 f
.open_array_section("datapoint");
631 f
.dump_unsigned("t", datapoint
.t
.sec());
632 f
.dump_unsigned("v", datapoint
.v
);
637 dout(4) << "Missing counter: '" << path
<< "' ("
638 << ceph_entity_type_name(svc_type
) << "."
639 << svc_id
<< ")" << dendl
;
640 dout(20) << "Paths are:" << dendl
;
641 for (const auto &i
: metadata
->perf_counters
.instances
) {
642 dout(20) << i
.first
<< dendl
;
646 dout(4) << "No daemon state for "
647 << ceph_entity_type_name(svc_type
) << "."
648 << svc_id
<< ")" << dendl
;
654 PyObject
*PyModules::get_context()
656 PyThreadState
*tstate
= PyEval_SaveThread();
657 Mutex::Locker
l(lock
);
658 PyEval_RestoreThread(tstate
);
660 // Construct a capsule containing ceph context.
661 // Not incrementing/decrementing ref count on the context because
662 // it's the global one and it has process lifetime.
663 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);