]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/PyModules.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mgr / PyModules.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 // Include this first to get python headers earlier
15 #include "PyState.h"
16 #include "Gil.h"
17
18 #include "common/errno.h"
19 #include "include/stringify.h"
20
21 #include "PyFormatter.h"
22
23 #include "osd/OSDMap.h"
24 #include "mon/MonMap.h"
25
26 #include "mgr/MgrContext.h"
27
28 #include "PyModules.h"
29
30 #define dout_context g_ceph_context
31 #define dout_subsys ceph_subsys_mgr
32 #undef dout_prefix
33 #define dout_prefix *_dout << "mgr " << __func__ << " "
34
35 // definition for non-const static member
36 std::string PyModules::config_prefix;
37
38 // constructor/destructor implementations cannot be in .h,
39 // because ServeThread is still an "incomplete" type there
40
41 PyModules::PyModules(DaemonStateIndex &ds, ClusterState &cs,
42 MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
43 Client &client_, Finisher &f)
44 : daemon_state(ds), cluster_state(cs), monc(mc), clog(clog_),
45 objecter(objecter_), client(client_), finisher(f),
46 lock("PyModules")
47 {}
48
49 PyModules::~PyModules() = default;
50
51 void PyModules::dump_server(const std::string &hostname,
52 const DaemonStateCollection &dmc,
53 Formatter *f)
54 {
55 f->dump_string("hostname", hostname);
56 f->open_array_section("services");
57 std::string ceph_version;
58
59 for (const auto &i : dmc) {
60 const auto &key = i.first;
61 const std::string &str_type = key.first;
62 const std::string &svc_name = key.second;
63
64 // TODO: pick the highest version, and make sure that
65 // somewhere else (during health reporting?) we are
66 // indicating to the user if we see mixed versions
67 auto ver_iter = i.second->metadata.find("ceph_version");
68 if (ver_iter != i.second->metadata.end()) {
69 ceph_version = i.second->metadata.at("ceph_version");
70 }
71
72 f->open_object_section("service");
73 f->dump_string("type", str_type);
74 f->dump_string("id", svc_name);
75 f->close_section();
76 }
77 f->close_section();
78
79 f->dump_string("ceph_version", ceph_version);
80 }
81
82
83
84 PyObject *PyModules::get_server_python(const std::string &hostname)
85 {
86 PyThreadState *tstate = PyEval_SaveThread();
87 Mutex::Locker l(lock);
88 PyEval_RestoreThread(tstate);
89 dout(10) << " (" << hostname << ")" << dendl;
90
91 auto dmc = daemon_state.get_by_server(hostname);
92
93 PyFormatter f;
94 dump_server(hostname, dmc, &f);
95 return f.get();
96 }
97
98
99 PyObject *PyModules::list_servers_python()
100 {
101 PyThreadState *tstate = PyEval_SaveThread();
102 Mutex::Locker l(lock);
103 PyEval_RestoreThread(tstate);
104 dout(10) << " >" << dendl;
105
106 PyFormatter f(false, true);
107 const auto &all = daemon_state.get_all_servers();
108 for (const auto &i : all) {
109 const auto &hostname = i.first;
110
111 f.open_object_section("server");
112 dump_server(hostname, i.second, &f);
113 f.close_section();
114 }
115
116 return f.get();
117 }
118
119 PyObject *PyModules::get_metadata_python(
120 std::string const &handle,
121 const std::string &svc_name,
122 const std::string &svc_id)
123 {
124 auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
125 PyFormatter f;
126 f.dump_string("hostname", metadata->hostname);
127 for (const auto &i : metadata->metadata) {
128 f.dump_string(i.first.c_str(), i.second);
129 }
130
131 return f.get();
132 }
133
134 PyObject *PyModules::get_daemon_status_python(
135 std::string const &handle,
136 const std::string &svc_name,
137 const std::string &svc_id)
138 {
139 auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
140 PyFormatter f;
141 for (const auto &i : metadata->service_status) {
142 f.dump_string(i.first.c_str(), i.second);
143 }
144 return f.get();
145 }
146
147 PyObject *PyModules::get_python(const std::string &what)
148 {
149 PyThreadState *tstate = PyEval_SaveThread();
150 Mutex::Locker l(lock);
151 PyEval_RestoreThread(tstate);
152
153 if (what == "fs_map") {
154 PyFormatter f;
155 cluster_state.with_fsmap([&f](const FSMap &fsmap) {
156 fsmap.dump(&f);
157 });
158 return f.get();
159 } else if (what == "osdmap_crush_map_text") {
160 bufferlist rdata;
161 cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
162 osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
163 });
164 std::string crush_text = rdata.to_str();
165 return PyString_FromString(crush_text.c_str());
166 } else if (what.substr(0, 7) == "osd_map") {
167 PyFormatter f;
168 cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
169 if (what == "osd_map") {
170 osd_map.dump(&f);
171 } else if (what == "osd_map_tree") {
172 osd_map.print_tree(&f, nullptr);
173 } else if (what == "osd_map_crush") {
174 osd_map.crush->dump(&f);
175 }
176 });
177 return f.get();
178 } else if (what == "config") {
179 PyFormatter f;
180 g_conf->show_config(&f);
181 return f.get();
182 } else if (what == "mon_map") {
183 PyFormatter f;
184 cluster_state.with_monmap(
185 [&f](const MonMap &monmap) {
186 monmap.dump(&f);
187 }
188 );
189 return f.get();
190 } else if (what == "service_map") {
191 PyFormatter f;
192 cluster_state.with_servicemap(
193 [&f](const ServiceMap &service_map) {
194 service_map.dump(&f);
195 }
196 );
197 return f.get();
198 } else if (what == "osd_metadata") {
199 PyFormatter f;
200 auto dmc = daemon_state.get_by_service("osd");
201 for (const auto &i : dmc) {
202 f.open_object_section(i.first.second.c_str());
203 f.dump_string("hostname", i.second->hostname);
204 for (const auto &j : i.second->metadata) {
205 f.dump_string(j.first.c_str(), j.second);
206 }
207 f.close_section();
208 }
209 return f.get();
210 } else if (what == "pg_summary") {
211 PyFormatter f;
212 cluster_state.with_pgmap(
213 [&f](const PGMap &pg_map) {
214 std::map<std::string, std::map<std::string, uint32_t> > osds;
215 std::map<std::string, std::map<std::string, uint32_t> > pools;
216 std::map<std::string, uint32_t> all;
217 for (const auto &i : pg_map.pg_stat) {
218 const auto pool = i.first.m_pool;
219 const std::string state = pg_state_string(i.second.state);
220 // Insert to per-pool map
221 pools[stringify(pool)][state]++;
222 for (const auto &osd_id : i.second.acting) {
223 osds[stringify(osd_id)][state]++;
224 }
225 all[state]++;
226 }
227 f.open_object_section("by_osd");
228 for (const auto &i : osds) {
229 f.open_object_section(i.first.c_str());
230 for (const auto &j : i.second) {
231 f.dump_int(j.first.c_str(), j.second);
232 }
233 f.close_section();
234 }
235 f.close_section();
236 f.open_object_section("by_pool");
237 for (const auto &i : pools) {
238 f.open_object_section(i.first.c_str());
239 for (const auto &j : i.second) {
240 f.dump_int(j.first.c_str(), j.second);
241 }
242 f.close_section();
243 }
244 f.close_section();
245 f.open_object_section("all");
246 for (const auto &i : all) {
247 f.dump_int(i.first.c_str(), i.second);
248 }
249 f.close_section();
250 }
251 );
252 return f.get();
253
254 } else if (what == "df") {
255 PyFormatter f;
256
257 cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
258 cluster_state.with_pgmap(
259 [&osd_map, &f](const PGMap &pg_map) {
260 pg_map.dump_fs_stats(nullptr, &f, true);
261 pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
262 });
263 });
264 return f.get();
265 } else if (what == "osd_stats") {
266 PyFormatter f;
267 cluster_state.with_pgmap(
268 [&f](const PGMap &pg_map) {
269 pg_map.dump_osd_stats(&f);
270 });
271 return f.get();
272 } else if (what == "health" || what == "mon_status") {
273 PyFormatter f;
274 bufferlist json;
275 if (what == "health") {
276 json = cluster_state.get_health();
277 } else if (what == "mon_status") {
278 json = cluster_state.get_mon_status();
279 } else {
280 assert(false);
281 }
282 f.dump_string("json", json.to_str());
283 return f.get();
284 } else if (what == "mgr_map") {
285 PyFormatter f;
286 cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
287 mgr_map.dump(&f);
288 });
289 return f.get();
290 } else {
291 derr << "Python module requested unknown data '" << what << "'" << dendl;
292 Py_RETURN_NONE;
293 }
294 }
295
296 std::string PyModules::get_site_packages()
297 {
298 std::stringstream site_packages;
299
300 // CPython doesn't auto-add site-packages dirs to sys.path for us,
301 // but it does provide a module that we can ask for them.
302 auto site_module = PyImport_ImportModule("site");
303 assert(site_module);
304
305 auto site_packages_fn = PyObject_GetAttrString(site_module, "getsitepackages");
306 if (site_packages_fn != nullptr) {
307 auto site_packages_list = PyObject_CallObject(site_packages_fn, nullptr);
308 assert(site_packages_list);
309
310 auto n = PyList_Size(site_packages_list);
311 for (Py_ssize_t i = 0; i < n; ++i) {
312 if (i != 0) {
313 site_packages << ":";
314 }
315 site_packages << PyString_AsString(PyList_GetItem(site_packages_list, i));
316 }
317
318 Py_DECREF(site_packages_list);
319 Py_DECREF(site_packages_fn);
320 } else {
321 // Fall back to generating our own site-packages paths by imitating
322 // what the standard site.py does. This is annoying but it lets us
323 // run inside virtualenvs :-/
324
325 auto site_packages_fn = PyObject_GetAttrString(site_module, "addsitepackages");
326 assert(site_packages_fn);
327
328 auto known_paths = PySet_New(nullptr);
329 auto pArgs = PyTuple_Pack(1, known_paths);
330 PyObject_CallObject(site_packages_fn, pArgs);
331 Py_DECREF(pArgs);
332 Py_DECREF(known_paths);
333 Py_DECREF(site_packages_fn);
334
335 auto sys_module = PyImport_ImportModule("sys");
336 assert(sys_module);
337 auto sys_path = PyObject_GetAttrString(sys_module, "path");
338 assert(sys_path);
339
340 dout(1) << "sys.path:" << dendl;
341 auto n = PyList_Size(sys_path);
342 bool first = true;
343 for (Py_ssize_t i = 0; i < n; ++i) {
344 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path, i)) << dendl;
345 if (first) {
346 first = false;
347 } else {
348 site_packages << ":";
349 }
350 site_packages << PyString_AsString(PyList_GetItem(sys_path, i));
351 }
352
353 Py_DECREF(sys_path);
354 Py_DECREF(sys_module);
355 }
356
357 Py_DECREF(site_module);
358
359 return site_packages.str();
360 }
361
362
363 int PyModules::init()
364 {
365 Mutex::Locker locker(lock);
366
367 global_handle = this;
368 // namespace in config-key prefixed by "mgr/"
369 config_prefix = std::string(g_conf->name.get_type_str()) + "/";
370
371 // Set up global python interpreter
372 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
373 Py_InitializeEx(0);
374
375 // Let CPython know that we will be calling it back from other
376 // threads in future.
377 if (! PyEval_ThreadsInitialized()) {
378 PyEval_InitThreads();
379 }
380
381 // Configure sys.path to include mgr_module_path
382 std::string sys_path = std::string(Py_GetPath()) + ":" + get_site_packages()
383 + ":" + g_conf->mgr_module_path;
384 dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
385
386 // Drop the GIL and remember the main thread state (current
387 // thread state becomes NULL)
388 pMainThreadState = PyEval_SaveThread();
389
390 std::list<std::string> failed_modules;
391
392 // Load python code
393 set<string> ls;
394 cluster_state.with_mgrmap([&](const MgrMap& m) {
395 ls = m.modules;
396 });
397 for (const auto& module_name : ls) {
398 dout(1) << "Loading python module '" << module_name << "'" << dendl;
399 auto mod = std::unique_ptr<MgrPyModule>(new MgrPyModule(module_name, sys_path, pMainThreadState));
400 int r = mod->load();
401 if (r != 0) {
402 // Don't use handle_pyerror() here; we don't have the GIL
403 // or the right thread state (this is deliberate).
404 derr << "Error loading module '" << module_name << "': "
405 << cpp_strerror(r) << dendl;
406 failed_modules.push_back(module_name);
407 // Don't drop out here, load the other modules
408 } else {
409 // Success!
410 modules[module_name] = std::move(mod);
411 }
412 }
413
414 if (!failed_modules.empty()) {
415 clog->error() << "Failed to load ceph-mgr modules: " << joinify(
416 failed_modules.begin(), failed_modules.end(), std::string(", "));
417 }
418
419 return 0;
420 }
421
422 class ServeThread : public Thread
423 {
424 MgrPyModule *mod;
425
426 public:
427 bool running;
428
429 ServeThread(MgrPyModule *mod_)
430 : mod(mod_) {}
431
432 void *entry() override
433 {
434 running = true;
435
436 // No need to acquire the GIL here; the module does it.
437 dout(4) << "Entering thread for " << mod->get_name() << dendl;
438 mod->serve();
439
440 running = false;
441 return nullptr;
442 }
443 };
444
445 void PyModules::start()
446 {
447 Mutex::Locker l(lock);
448
449 dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
450 for (auto& i : modules) {
451 auto thread = new ServeThread(i.second.get());
452 serve_threads[i.first].reset(thread);
453 }
454
455 for (auto &i : serve_threads) {
456 std::ostringstream thread_name;
457 thread_name << "mgr." << i.first;
458 dout(4) << "Starting thread for " << i.first << dendl;
459 i.second->create(thread_name.str().c_str());
460 }
461 }
462
463 void PyModules::shutdown()
464 {
465 Mutex::Locker locker(lock);
466 assert(global_handle);
467
468 // Signal modules to drop out of serve() and/or tear down resources
469 for (auto &i : modules) {
470 auto module = i.second.get();
471 const auto& name = i.first;
472 dout(10) << "waiting for module " << name << " to shutdown" << dendl;
473 lock.Unlock();
474 module->shutdown();
475 lock.Lock();
476 dout(10) << "module " << name << " shutdown" << dendl;
477 }
478
479 // For modules implementing serve(), finish the threads where we
480 // were running that.
481 for (auto &i : serve_threads) {
482 lock.Unlock();
483 i.second->join();
484 lock.Lock();
485 }
486 serve_threads.clear();
487
488 modules.clear();
489
490 PyEval_RestoreThread(pMainThreadState);
491 Py_Finalize();
492
493 // nobody needs me anymore.
494 global_handle = nullptr;
495 }
496
497 void PyModules::notify_all(const std::string &notify_type,
498 const std::string &notify_id)
499 {
500 Mutex::Locker l(lock);
501
502 dout(10) << __func__ << ": notify_all " << notify_type << dendl;
503 for (auto& i : modules) {
504 auto module = i.second.get();
505 if (!serve_threads[i.first]->running)
506 continue;
507 // Send all python calls down a Finisher to avoid blocking
508 // C++ code, and avoid any potential lock cycles.
509 finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
510 module->notify(notify_type, notify_id);
511 }));
512 }
513 }
514
515 void PyModules::notify_all(const LogEntry &log_entry)
516 {
517 Mutex::Locker l(lock);
518
519 dout(10) << __func__ << ": notify_all (clog)" << dendl;
520 for (auto& i : modules) {
521 auto module = i.second.get();
522 if (!serve_threads[i.first]->running)
523 continue;
524 // Send all python calls down a Finisher to avoid blocking
525 // C++ code, and avoid any potential lock cycles.
526 //
527 // Note intentional use of non-reference lambda binding on
528 // log_entry: we take a copy because caller's instance is
529 // probably ephemeral.
530 finisher.queue(new FunctionContext([module, log_entry](int r){
531 module->notify_clog(log_entry);
532 }));
533 }
534 }
535
536 bool PyModules::get_config(const std::string &handle,
537 const std::string &key, std::string *val) const
538 {
539 PyThreadState *tstate = PyEval_SaveThread();
540 Mutex::Locker l(lock);
541 PyEval_RestoreThread(tstate);
542
543 const std::string global_key = config_prefix + handle + "/" + key;
544
545 dout(4) << __func__ << "key: " << global_key << dendl;
546
547 if (config_cache.count(global_key)) {
548 *val = config_cache.at(global_key);
549 return true;
550 } else {
551 return false;
552 }
553 }
554
555 PyObject *PyModules::get_config_prefix(const std::string &handle,
556 const std::string &prefix) const
557 {
558 PyThreadState *tstate = PyEval_SaveThread();
559 Mutex::Locker l(lock);
560 PyEval_RestoreThread(tstate);
561
562 const std::string base_prefix = config_prefix + handle + "/";
563 const std::string global_prefix = base_prefix + prefix;
564 dout(4) << __func__ << "prefix: " << global_prefix << dendl;
565
566 PyFormatter f;
567 for (auto p = config_cache.lower_bound(global_prefix);
568 p != config_cache.end() && p->first.find(global_prefix) == 0;
569 ++p) {
570 f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
571 }
572 return f.get();
573 }
574
575 void PyModules::set_config(const std::string &handle,
576 const std::string &key, const std::string &val)
577 {
578 const std::string global_key = config_prefix + handle + "/" + key;
579
580 Command set_cmd;
581 {
582 PyThreadState *tstate = PyEval_SaveThread();
583 Mutex::Locker l(lock);
584 PyEval_RestoreThread(tstate);
585 config_cache[global_key] = val;
586
587 std::ostringstream cmd_json;
588
589 JSONFormatter jf;
590 jf.open_object_section("cmd");
591 jf.dump_string("prefix", "config-key put");
592 jf.dump_string("key", global_key);
593 jf.dump_string("val", val);
594 jf.close_section();
595 jf.flush(cmd_json);
596
597 set_cmd.run(&monc, cmd_json.str());
598 }
599 set_cmd.wait();
600
601 if (set_cmd.r != 0) {
602 // config-key put will fail if mgr's auth key has insufficient
603 // permission to set config keys
604 // FIXME: should this somehow raise an exception back into Python land?
605 dout(0) << "`config-key put " << global_key << " " << val << "` failed: "
606 << cpp_strerror(set_cmd.r) << dendl;
607 dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
608 }
609 }
610
611 std::vector<ModuleCommand> PyModules::get_commands()
612 {
613 Mutex::Locker l(lock);
614
615 std::vector<ModuleCommand> result;
616 for (auto& i : modules) {
617 auto module = i.second.get();
618 auto mod_commands = module->get_commands();
619 for (auto j : mod_commands) {
620 result.push_back(j);
621 }
622 }
623
624 return result;
625 }
626
627 void PyModules::insert_config(const std::map<std::string,
628 std::string> &new_config)
629 {
630 Mutex::Locker l(lock);
631
632 dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
633 config_cache = new_config;
634 }
635
636 void PyModules::log(const std::string &handle,
637 int level, const std::string &record)
638 {
639 #undef dout_prefix
640 #define dout_prefix *_dout << "mgr[" << handle << "] "
641 dout(level) << record << dendl;
642 #undef dout_prefix
643 #define dout_prefix *_dout << "mgr " << __func__ << " "
644 }
645
646 PyObject* PyModules::get_counter_python(
647 const std::string &handle,
648 const std::string &svc_name,
649 const std::string &svc_id,
650 const std::string &path)
651 {
652 PyThreadState *tstate = PyEval_SaveThread();
653 Mutex::Locker l(lock);
654 PyEval_RestoreThread(tstate);
655
656 PyFormatter f;
657 f.open_array_section(path.c_str());
658
659 auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
660
661 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
662 // lock or put a lock on individual DaemonStates
663 if (metadata) {
664 if (metadata->perf_counters.instances.count(path)) {
665 auto counter_instance = metadata->perf_counters.instances.at(path);
666 const auto &data = counter_instance.get_data();
667 for (const auto &datapoint : data) {
668 f.open_array_section("datapoint");
669 f.dump_unsigned("t", datapoint.t.sec());
670 f.dump_unsigned("v", datapoint.v);
671 f.close_section();
672
673 }
674 } else {
675 dout(4) << "Missing counter: '" << path << "' ("
676 << svc_name << "." << svc_id << ")" << dendl;
677 dout(20) << "Paths are:" << dendl;
678 for (const auto &i : metadata->perf_counters.instances) {
679 dout(20) << i.first << dendl;
680 }
681 }
682 } else {
683 dout(4) << "No daemon state for "
684 << svc_name << "." << svc_id << ")" << dendl;
685 }
686 f.close_section();
687 return f.get();
688 }
689
690 PyObject *PyModules::get_context()
691 {
692 PyThreadState *tstate = PyEval_SaveThread();
693 Mutex::Locker l(lock);
694 PyEval_RestoreThread(tstate);
695
696 // Construct a capsule containing ceph context.
697 // Not incrementing/decrementing ref count on the context because
698 // it's the global one and it has process lifetime.
699 auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
700 return capsule;
701 }
702
703 static void _list_modules(
704 const std::string path,
705 std::set<std::string> *modules)
706 {
707 DIR *dir = opendir(path.c_str());
708 if (!dir) {
709 return;
710 }
711 struct dirent *entry = NULL;
712 while ((entry = readdir(dir)) != NULL) {
713 string n(entry->d_name);
714 string fn = path + "/" + n;
715 struct stat st;
716 int r = ::stat(fn.c_str(), &st);
717 if (r == 0 && S_ISDIR(st.st_mode)) {
718 string initfn = fn + "/module.py";
719 r = ::stat(initfn.c_str(), &st);
720 if (r == 0) {
721 modules->insert(n);
722 }
723 }
724 }
725 closedir(dir);
726 }
727
728 void PyModules::list_modules(std::set<std::string> *modules)
729 {
730 _list_modules(g_conf->mgr_module_path, modules);
731 }