]> git.proxmox.com Git - ceph.git/blame - ceph/src/mgr/PyModules.cc
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / mgr / PyModules.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14// Include this first to get python headers earlier
15#include "PyState.h"
16
17#include <boost/tokenizer.hpp>
18#include "common/errno.h"
19#include "include/stringify.h"
20
21#include "PyFormatter.h"
22
23#include "osd/OSDMap.h"
24#include "mon/MonMap.h"
25
26#include "mgr/MgrContext.h"
27
28#include "PyModules.h"
29
30#define dout_context g_ceph_context
31#define dout_subsys ceph_subsys_mgr
32
33#undef dout_prefix
34#define dout_prefix *_dout << "mgr[py] "
35
36namespace {
37 PyObject* log_write(PyObject*, PyObject* args) {
38 char* m = nullptr;
39 if (PyArg_ParseTuple(args, "s", &m)) {
40 auto len = strlen(m);
41 if (len && m[len-1] == '\n') {
42 m[len-1] = '\0';
43 }
44 dout(4) << m << dendl;
45 }
46 Py_RETURN_NONE;
47 }
48
49 PyObject* log_flush(PyObject*, PyObject*){
50 Py_RETURN_NONE;
51 }
52
53 static PyMethodDef log_methods[] = {
54 {"write", log_write, METH_VARARGS, "write stdout and stderr"},
55 {"flush", log_flush, METH_VARARGS, "flush"},
56 {nullptr, nullptr, 0, nullptr}
57 };
58}
59
60#undef dout_prefix
61#define dout_prefix *_dout << "mgr " << __func__ << " "
62
63PyModules::PyModules(DaemonStateIndex &ds, ClusterState &cs, MonClient &mc,
64 Objecter &objecter_, Client &client_,
65 Finisher &f)
66 : daemon_state(ds), cluster_state(cs), monc(mc),
67 objecter(objecter_), client(client_),
68 finisher(f)
69{}
70
71// we can not have the default destructor in header, because ServeThread is
72// still an "incomplete" type. so we need to define the destructor here.
73PyModules::~PyModules() = default;
74
75void PyModules::dump_server(const std::string &hostname,
76 const DaemonStateCollection &dmc,
77 Formatter *f)
78{
79 f->dump_string("hostname", hostname);
80 f->open_array_section("services");
81 std::string ceph_version;
82
83 for (const auto &i : dmc) {
84 const auto &key = i.first;
85 const std::string str_type = ceph_entity_type_name(key.first);
86 const std::string &svc_name = key.second;
87
88 // TODO: pick the highest version, and make sure that
89 // somewhere else (during health reporting?) we are
90 // indicating to the user if we see mixed versions
91 auto ver_iter = i.second->metadata.find("ceph_version");
92 if (ver_iter != i.second->metadata.end()) {
93 ceph_version = i.second->metadata.at("ceph_version");
94 }
95
96 f->open_object_section("service");
97 f->dump_string("type", str_type);
98 f->dump_string("id", svc_name);
99 f->close_section();
100 }
101 f->close_section();
102
103 f->dump_string("ceph_version", ceph_version);
104}
105
106
107
108PyObject *PyModules::get_server_python(const std::string &hostname)
109{
110 PyThreadState *tstate = PyEval_SaveThread();
111 Mutex::Locker l(lock);
112 PyEval_RestoreThread(tstate);
113 dout(10) << " (" << hostname << ")" << dendl;
114
115 auto dmc = daemon_state.get_by_server(hostname);
116
117 PyFormatter f;
118 dump_server(hostname, dmc, &f);
119 return f.get();
120}
121
122
123PyObject *PyModules::list_servers_python()
124{
125 PyThreadState *tstate = PyEval_SaveThread();
126 Mutex::Locker l(lock);
127 PyEval_RestoreThread(tstate);
128 dout(10) << " >" << dendl;
129
130 PyFormatter f(false, true);
131 const auto &all = daemon_state.get_all_servers();
132 for (const auto &i : all) {
133 const auto &hostname = i.first;
134
135 f.open_object_section("server");
136 dump_server(hostname, i.second, &f);
137 f.close_section();
138 }
139
140 return f.get();
141}
142
143PyObject *PyModules::get_metadata_python(std::string const &handle,
144 entity_type_t svc_type, const std::string &svc_id)
145{
146 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
147 PyFormatter f;
148 f.dump_string("hostname", metadata->hostname);
149 for (const auto &i : metadata->metadata) {
150 f.dump_string(i.first.c_str(), i.second);
151 }
152
153 return f.get();
154}
155
156
157PyObject *PyModules::get_python(const std::string &what)
158{
159 PyThreadState *tstate = PyEval_SaveThread();
160 Mutex::Locker l(lock);
161 PyEval_RestoreThread(tstate);
162
163 if (what == "fs_map") {
164 PyFormatter f;
165 cluster_state.with_fsmap([&f](const FSMap &fsmap) {
166 fsmap.dump(&f);
167 });
168 return f.get();
169 } else if (what == "osdmap_crush_map_text") {
170 bufferlist rdata;
171 cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
172 osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
173 });
174 std::string crush_text = rdata.to_str();
175 return PyString_FromString(crush_text.c_str());
176 } else if (what.substr(0, 7) == "osd_map") {
177 PyFormatter f;
178 cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
179 if (what == "osd_map") {
180 osd_map.dump(&f);
181 } else if (what == "osd_map_tree") {
182 osd_map.print_tree(&f, nullptr);
183 } else if (what == "osd_map_crush") {
184 osd_map.crush->dump(&f);
185 }
186 });
187 return f.get();
188 } else if (what == "config") {
189 PyFormatter f;
190 g_conf->show_config(&f);
191 return f.get();
192 } else if (what == "mon_map") {
193 PyFormatter f;
194 cluster_state.with_monmap(
195 [&f](const MonMap &monmap) {
196 monmap.dump(&f);
197 }
198 );
199 return f.get();
200 } else if (what == "osd_metadata") {
201 PyFormatter f;
202 auto dmc = daemon_state.get_by_type(CEPH_ENTITY_TYPE_OSD);
203 for (const auto &i : dmc) {
204 f.open_object_section(i.first.second.c_str());
205 f.dump_string("hostname", i.second->hostname);
206 for (const auto &j : i.second->metadata) {
207 f.dump_string(j.first.c_str(), j.second);
208 }
209 f.close_section();
210 }
211 return f.get();
212 } else if (what == "pg_summary") {
213 PyFormatter f;
214 cluster_state.with_pgmap(
215 [&f](const PGMap &pg_map) {
216 std::map<std::string, std::map<std::string, uint32_t> > osds;
217 std::map<std::string, std::map<std::string, uint32_t> > pools;
218 std::map<std::string, uint32_t> all;
219 for (const auto &i : pg_map.pg_stat) {
220 const auto pool = i.first.m_pool;
221 const std::string state = pg_state_string(i.second.state);
222 // Insert to per-pool map
223 pools[stringify(pool)][state]++;
224 for (const auto &osd_id : i.second.acting) {
225 osds[stringify(osd_id)][state]++;
226 }
227 all[state]++;
228 }
229 f.open_object_section("by_osd");
230 for (const auto &i : osds) {
231 f.open_object_section(i.first.c_str());
232 for (const auto &j : i.second) {
233 f.dump_int(j.first.c_str(), j.second);
234 }
235 f.close_section();
236 }
237 f.close_section();
238 f.open_object_section("by_pool");
239 for (const auto &i : pools) {
240 f.open_object_section(i.first.c_str());
241 for (const auto &j : i.second) {
242 f.dump_int(j.first.c_str(), j.second);
243 }
244 f.close_section();
245 }
246 f.close_section();
247 f.open_object_section("all");
248 for (const auto &i : all) {
249 f.dump_int(i.first.c_str(), i.second);
250 }
251 f.close_section();
252 }
253 );
254 return f.get();
255
256 } else if (what == "df") {
257 PyFormatter f;
258
259 cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
260 cluster_state.with_pgmap(
261 [&osd_map, &f](const PGMap &pg_map) {
262 pg_map.dump_fs_stats(nullptr, &f, true);
263 pg_map.dump_pool_stats(osd_map, nullptr, &f, true);
264 });
265 });
266 return f.get();
267 } else if (what == "osd_stats") {
268 PyFormatter f;
269 cluster_state.with_pgmap(
270 [&f](const PGMap &pg_map) {
271 pg_map.dump_osd_stats(&f);
272 });
273 return f.get();
274 } else if (what == "health" || what == "mon_status") {
275 PyFormatter f;
276 bufferlist json;
277 if (what == "health") {
278 json = cluster_state.get_health();
279 } else if (what == "mon_status") {
280 json = cluster_state.get_mon_status();
281 } else {
282 assert(false);
283 }
284 f.dump_string("json", json.to_str());
285 return f.get();
286 } else {
287 derr << "Python module requested unknown data '" << what << "'" << dendl;
288 Py_RETURN_NONE;
289 }
290}
291
292std::string PyModules::get_site_packages()
293{
294 std::stringstream site_packages;
295
296 // CPython doesn't auto-add site-packages dirs to sys.path for us,
297 // but it does provide a module that we can ask for them.
298 auto site_module = PyImport_ImportModule("site");
299 assert(site_module);
300
301 auto site_packages_fn = PyObject_GetAttrString(site_module, "getsitepackages");
302 if (site_packages_fn != nullptr) {
303 auto site_packages_list = PyObject_CallObject(site_packages_fn, nullptr);
304 assert(site_packages_list);
305
306 auto n = PyList_Size(site_packages_list);
307 for (Py_ssize_t i = 0; i < n; ++i) {
308 if (i != 0) {
309 site_packages << ":";
310 }
311 site_packages << PyString_AsString(PyList_GetItem(site_packages_list, i));
312 }
313
314 Py_DECREF(site_packages_list);
315 Py_DECREF(site_packages_fn);
316 } else {
317 // Fall back to generating our own site-packages paths by imitating
318 // what the standard site.py does. This is annoying but it lets us
319 // run inside virtualenvs :-/
320
321 auto site_packages_fn = PyObject_GetAttrString(site_module, "addsitepackages");
322 assert(site_packages_fn);
323
324 auto known_paths = PySet_New(nullptr);
325 auto pArgs = PyTuple_Pack(1, known_paths);
326 PyObject_CallObject(site_packages_fn, pArgs);
327 Py_DECREF(pArgs);
328 Py_DECREF(known_paths);
329 Py_DECREF(site_packages_fn);
330
331 auto sys_module = PyImport_ImportModule("sys");
332 assert(sys_module);
333 auto sys_path = PyObject_GetAttrString(sys_module, "path");
334 assert(sys_path);
335
336 dout(1) << "sys.path:" << dendl;
337 auto n = PyList_Size(sys_path);
338 bool first = true;
339 for (Py_ssize_t i = 0; i < n; ++i) {
340 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path, i)) << dendl;
341 if (first) {
342 first = false;
343 } else {
344 site_packages << ":";
345 }
346 site_packages << PyString_AsString(PyList_GetItem(sys_path, i));
347 }
348
349 Py_DECREF(sys_path);
350 Py_DECREF(sys_module);
351 }
352
353 Py_DECREF(site_module);
354
355 return site_packages.str();
356}
357
358
359int PyModules::init()
360{
361 Mutex::Locker locker(lock);
362
363 global_handle = this;
364
365 // Set up global python interpreter
366 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
367 Py_InitializeEx(0);
368
369 // Some python modules do not cope with an unpopulated argv, so lets
370 // fake one. This step also picks up site-packages into sys.path.
371 const char *argv[] = {"ceph-mgr"};
372 PySys_SetArgv(1, (char**)argv);
373
374 if (g_conf->daemonize) {
375 auto py_logger = Py_InitModule("ceph_logger", log_methods);
376#if PY_MAJOR_VERSION >= 3
377 PySys_SetObject("stderr", py_logger);
378 PySys_SetObject("stdout", py_logger);
379#else
380 PySys_SetObject(const_cast<char*>("stderr"), py_logger);
381 PySys_SetObject(const_cast<char*>("stdout"), py_logger);
382#endif
383 }
384 // Populate python namespace with callable hooks
385 Py_InitModule("ceph_state", CephStateMethods);
386
387 // Configure sys.path to include mgr_module_path
388 std::string sys_path = std::string(Py_GetPath()) + ":" + get_site_packages()
389 + ":" + g_conf->mgr_module_path;
390 dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
391 PySys_SetPath((char*)(sys_path.c_str()));
392
393 // Let CPython know that we will be calling it back from other
394 // threads in future.
395 if (! PyEval_ThreadsInitialized()) {
396 PyEval_InitThreads();
397 }
398
399 // Load python code
400 boost::tokenizer<> tok(g_conf->mgr_modules);
401 for(const auto& module_name : tok) {
402 dout(1) << "Loading python module '" << module_name << "'" << dendl;
403 auto mod = std::unique_ptr<MgrPyModule>(new MgrPyModule(module_name));
404 int r = mod->load();
405 if (r != 0) {
406 derr << "Error loading module '" << module_name << "': "
407 << cpp_strerror(r) << dendl;
408 derr << handle_pyerror() << dendl;
409 // Don't drop out here, load the other modules
410 } else {
411 // Success!
412 modules[module_name] = std::move(mod);
413 }
414 }
415
416 // Drop the GIL
417 PyEval_SaveThread();
418
419 return 0;
420}
421
422class ServeThread : public Thread
423{
424 MgrPyModule *mod;
425
426public:
427 ServeThread(MgrPyModule *mod_)
428 : mod(mod_) {}
429
430 void *entry() override
431 {
432 PyGILState_STATE gstate;
433 gstate = PyGILState_Ensure();
434
435 dout(4) << "Entering thread for " << mod->get_name() << dendl;
436 mod->serve();
437
438 PyGILState_Release(gstate);
439
440 return nullptr;
441 }
442};
443
444void PyModules::start()
445{
446 Mutex::Locker l(lock);
447
448 dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
449 for (auto& i : modules) {
450 auto thread = new ServeThread(i.second.get());
451 serve_threads[i.first].reset(thread);
452 }
453
454 for (auto &i : serve_threads) {
455 std::ostringstream thread_name;
456 thread_name << "mgr." << i.first;
457 dout(4) << "Starting thread for " << i.first << dendl;
458 i.second->create(thread_name.str().c_str());
459 }
460}
461
462void PyModules::shutdown()
463{
464 Mutex::Locker locker(lock);
465 assert(global_handle);
466
467 // Signal modules to drop out of serve() and/or tear down resources
468 for (auto &i : modules) {
469 auto module = i.second.get();
470 const auto& name = i.first;
471 dout(10) << "waiting for module " << name << " to shutdown" << dendl;
472 module->shutdown();
473 dout(10) << "module " << name << " shutdown" << dendl;
474 }
475
476 // For modules implementing serve(), finish the threads where we
477 // were running that.
478 for (auto &i : serve_threads) {
479 lock.Unlock();
480 i.second->join();
481 lock.Lock();
482 }
483 serve_threads.clear();
484
485 modules.clear();
486
487 PyGILState_Ensure();
488 Py_Finalize();
489
490 // nobody needs me anymore.
491 global_handle = nullptr;
492}
493
494void PyModules::notify_all(const std::string &notify_type,
495 const std::string &notify_id)
496{
497 Mutex::Locker l(lock);
498
499 dout(10) << __func__ << ": notify_all " << notify_type << dendl;
500 for (auto& i : modules) {
501 auto module = i.second.get();
502 // Send all python calls down a Finisher to avoid blocking
503 // C++ code, and avoid any potential lock cycles.
504 finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
505 module->notify(notify_type, notify_id);
506 }));
507 }
508}
509
510void PyModules::notify_all(const LogEntry &log_entry)
511{
512 Mutex::Locker l(lock);
513
514 dout(10) << __func__ << ": notify_all (clog)" << dendl;
515 for (auto& i : modules) {
516 auto module = i.second.get();
517 // Send all python calls down a Finisher to avoid blocking
518 // C++ code, and avoid any potential lock cycles.
519 //
520 // Note intentional use of non-reference lambda binding on
521 // log_entry: we take a copy because caller's instance is
522 // probably ephemeral.
523 finisher.queue(new FunctionContext([module, log_entry](int r){
524 module->notify_clog(log_entry);
525 }));
526 }
527}
528
529bool PyModules::get_config(const std::string &handle,
530 const std::string &key, std::string *val) const
531{
532 PyThreadState *tstate = PyEval_SaveThread();
533 Mutex::Locker l(lock);
534 PyEval_RestoreThread(tstate);
535
536 const std::string global_key = config_prefix + handle + "." + key;
537
538 if (config_cache.count(global_key)) {
539 *val = config_cache.at(global_key);
540 return true;
541 } else {
542 return false;
543 }
544}
545
546void PyModules::set_config(const std::string &handle,
547 const std::string &key, const std::string &val)
548{
549 const std::string global_key = config_prefix + handle + "." + key;
550
551 Command set_cmd;
552 {
553 PyThreadState *tstate = PyEval_SaveThread();
554 Mutex::Locker l(lock);
555 PyEval_RestoreThread(tstate);
556 config_cache[global_key] = val;
557
558 std::ostringstream cmd_json;
559
560 JSONFormatter jf;
561 jf.open_object_section("cmd");
562 jf.dump_string("prefix", "config-key put");
563 jf.dump_string("key", global_key);
564 jf.dump_string("val", val);
565 jf.close_section();
566 jf.flush(cmd_json);
567
568 set_cmd.run(&monc, cmd_json.str());
569 }
570 set_cmd.wait();
571
572 if (set_cmd.r != 0) {
573 // config-key put will fail if mgr's auth key has insufficient
574 // permission to set config keys
575 // FIXME: should this somehow raise an exception back into Python land?
576 dout(0) << "`config-key put " << global_key << " " << val << "` failed: "
577 << cpp_strerror(set_cmd.r) << dendl;
578 dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
579 }
580}
581
582std::vector<ModuleCommand> PyModules::get_commands()
583{
584 Mutex::Locker l(lock);
585
586 std::vector<ModuleCommand> result;
587 for (auto& i : modules) {
588 auto module = i.second.get();
589 auto mod_commands = module->get_commands();
590 for (auto j : mod_commands) {
591 result.push_back(j);
592 }
593 }
594
595 return result;
596}
597
598void PyModules::insert_config(const std::map<std::string,
599 std::string> &new_config)
600{
601 Mutex::Locker l(lock);
602
603 dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
604 config_cache = new_config;
605}
606
607void PyModules::log(const std::string &handle,
608 int level, const std::string &record)
609{
610#undef dout_prefix
611#define dout_prefix *_dout << "mgr[" << handle << "] "
612 dout(level) << record << dendl;
613#undef dout_prefix
614#define dout_prefix *_dout << "mgr " << __func__ << " "
615}
616
617PyObject* PyModules::get_counter_python(
618 const std::string &handle,
619 entity_type_t svc_type,
620 const std::string &svc_id,
621 const std::string &path)
622{
623 PyThreadState *tstate = PyEval_SaveThread();
624 Mutex::Locker l(lock);
625 PyEval_RestoreThread(tstate);
626
627 PyFormatter f;
628 f.open_array_section(path.c_str());
629
630 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
631
632 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
633 // lock or put a lock on individual DaemonStates
634 if (metadata) {
635 if (metadata->perf_counters.instances.count(path)) {
636 auto counter_instance = metadata->perf_counters.instances.at(path);
637 const auto &data = counter_instance.get_data();
638 for (const auto &datapoint : data) {
639 f.open_array_section("datapoint");
640 f.dump_unsigned("t", datapoint.t.sec());
641 f.dump_unsigned("v", datapoint.v);
642 f.close_section();
643
644 }
645 } else {
646 dout(4) << "Missing counter: '" << path << "' ("
647 << ceph_entity_type_name(svc_type) << "."
648 << svc_id << ")" << dendl;
649 dout(20) << "Paths are:" << dendl;
650 for (const auto &i : metadata->perf_counters.instances) {
651 dout(20) << i.first << dendl;
652 }
653 }
654 } else {
655 dout(4) << "No daemon state for "
656 << ceph_entity_type_name(svc_type) << "."
657 << svc_id << ")" << dendl;
658 }
659 f.close_section();
660 return f.get();
661}
662
663PyObject *PyModules::get_context()
664{
665 PyThreadState *tstate = PyEval_SaveThread();
666 Mutex::Locker l(lock);
667 PyEval_RestoreThread(tstate);
668
669 // Construct a capsule containing ceph context.
670 // Not incrementing/decrementing ref count on the context because
671 // it's the global one and it has process lifetime.
672 auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
673 return capsule;
674}
675