]> git.proxmox.com Git - ceph.git/blob - ceph/src/mgr/PyModules.cc
988e16ba608039f1ec15cbed311da129f40e5f1b
[ceph.git] / ceph / src / mgr / PyModules.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 // Include this first to get python headers earlier
15 #include "PyState.h"
16 #include "Gil.h"
17
18 #include <boost/tokenizer.hpp>
19 #include "common/errno.h"
20 #include "include/stringify.h"
21
22 #include "PyFormatter.h"
23
24 #include "osd/OSDMap.h"
25 #include "mon/MonMap.h"
26
27 #include "mgr/MgrContext.h"
28
29 #include "PyModules.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_mgr
33 #undef dout_prefix
34 #define dout_prefix *_dout << "mgr " << __func__ << " "
35
36 // definition for non-const static member
37 std::string PyModules::config_prefix;
38
39 // constructor/destructor implementations cannot be in .h,
40 // because ServeThread is still an "incomplete" type there
41
42 PyModules::PyModules(DaemonStateIndex &ds, ClusterState &cs,
43 MonClient &mc, Objecter &objecter_, Client &client_,
44 Finisher &f)
45 : daemon_state(ds), cluster_state(cs), monc(mc),
46 objecter(objecter_), client(client_), finisher(f),
47 lock("PyModules")
48 {}
49
50 PyModules::~PyModules() = default;
51
52 void PyModules::dump_server(const std::string &hostname,
53 const DaemonStateCollection &dmc,
54 Formatter *f)
55 {
56 f->dump_string("hostname", hostname);
57 f->open_array_section("services");
58 std::string ceph_version;
59
60 for (const auto &i : dmc) {
61 const auto &key = i.first;
62 const std::string str_type = ceph_entity_type_name(key.first);
63 const std::string &svc_name = key.second;
64
65 // TODO: pick the highest version, and make sure that
66 // somewhere else (during health reporting?) we are
67 // indicating to the user if we see mixed versions
68 auto ver_iter = i.second->metadata.find("ceph_version");
69 if (ver_iter != i.second->metadata.end()) {
70 ceph_version = i.second->metadata.at("ceph_version");
71 }
72
73 f->open_object_section("service");
74 f->dump_string("type", str_type);
75 f->dump_string("id", svc_name);
76 f->close_section();
77 }
78 f->close_section();
79
80 f->dump_string("ceph_version", ceph_version);
81 }
82
83
84
85 PyObject *PyModules::get_server_python(const std::string &hostname)
86 {
87 PyThreadState *tstate = PyEval_SaveThread();
88 Mutex::Locker l(lock);
89 PyEval_RestoreThread(tstate);
90 dout(10) << " (" << hostname << ")" << dendl;
91
92 auto dmc = daemon_state.get_by_server(hostname);
93
94 PyFormatter f;
95 dump_server(hostname, dmc, &f);
96 return f.get();
97 }
98
99
100 PyObject *PyModules::list_servers_python()
101 {
102 PyThreadState *tstate = PyEval_SaveThread();
103 Mutex::Locker l(lock);
104 PyEval_RestoreThread(tstate);
105 dout(10) << " >" << dendl;
106
107 PyFormatter f(false, true);
108 const auto &all = daemon_state.get_all_servers();
109 for (const auto &i : all) {
110 const auto &hostname = i.first;
111
112 f.open_object_section("server");
113 dump_server(hostname, i.second, &f);
114 f.close_section();
115 }
116
117 return f.get();
118 }
119
120 PyObject *PyModules::get_metadata_python(std::string const &handle,
121 entity_type_t svc_type, const std::string &svc_id)
122 {
123 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
124 PyFormatter f;
125 f.dump_string("hostname", metadata->hostname);
126 for (const auto &i : metadata->metadata) {
127 f.dump_string(i.first.c_str(), i.second);
128 }
129
130 return f.get();
131 }
132
133
134 PyObject *PyModules::get_python(const std::string &what)
135 {
136 PyThreadState *tstate = PyEval_SaveThread();
137 Mutex::Locker l(lock);
138 PyEval_RestoreThread(tstate);
139
140 if (what == "fs_map") {
141 PyFormatter f;
142 cluster_state.with_fsmap([&f](const FSMap &fsmap) {
143 fsmap.dump(&f);
144 });
145 return f.get();
146 } else if (what == "osdmap_crush_map_text") {
147 bufferlist rdata;
148 cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
149 osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
150 });
151 std::string crush_text = rdata.to_str();
152 return PyString_FromString(crush_text.c_str());
153 } else if (what.substr(0, 7) == "osd_map") {
154 PyFormatter f;
155 cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
156 if (what == "osd_map") {
157 osd_map.dump(&f);
158 } else if (what == "osd_map_tree") {
159 osd_map.print_tree(&f, nullptr);
160 } else if (what == "osd_map_crush") {
161 osd_map.crush->dump(&f);
162 }
163 });
164 return f.get();
165 } else if (what == "config") {
166 PyFormatter f;
167 g_conf->show_config(&f);
168 return f.get();
169 } else if (what == "mon_map") {
170 PyFormatter f;
171 cluster_state.with_monmap(
172 [&f](const MonMap &monmap) {
173 monmap.dump(&f);
174 }
175 );
176 return f.get();
177 } else if (what == "osd_metadata") {
178 PyFormatter f;
179 auto dmc = daemon_state.get_by_type(CEPH_ENTITY_TYPE_OSD);
180 for (const auto &i : dmc) {
181 f.open_object_section(i.first.second.c_str());
182 f.dump_string("hostname", i.second->hostname);
183 for (const auto &j : i.second->metadata) {
184 f.dump_string(j.first.c_str(), j.second);
185 }
186 f.close_section();
187 }
188 return f.get();
189 } else if (what == "pg_summary") {
190 PyFormatter f;
191 cluster_state.with_pgmap(
192 [&f](const PGMap &pg_map) {
193 std::map<std::string, std::map<std::string, uint32_t> > osds;
194 std::map<std::string, std::map<std::string, uint32_t> > pools;
195 std::map<std::string, uint32_t> all;
196 for (const auto &i : pg_map.pg_stat) {
197 const auto pool = i.first.m_pool;
198 const std::string state = pg_state_string(i.second.state);
199 // Insert to per-pool map
200 pools[stringify(pool)][state]++;
201 for (const auto &osd_id : i.second.acting) {
202 osds[stringify(osd_id)][state]++;
203 }
204 all[state]++;
205 }
206 f.open_object_section("by_osd");
207 for (const auto &i : osds) {
208 f.open_object_section(i.first.c_str());
209 for (const auto &j : i.second) {
210 f.dump_int(j.first.c_str(), j.second);
211 }
212 f.close_section();
213 }
214 f.close_section();
215 f.open_object_section("by_pool");
216 for (const auto &i : pools) {
217 f.open_object_section(i.first.c_str());
218 for (const auto &j : i.second) {
219 f.dump_int(j.first.c_str(), j.second);
220 }
221 f.close_section();
222 }
223 f.close_section();
224 f.open_object_section("all");
225 for (const auto &i : all) {
226 f.dump_int(i.first.c_str(), i.second);
227 }
228 f.close_section();
229 }
230 );
231 return f.get();
232
233 } else if (what == "df") {
234 PyFormatter f;
235
236 cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
237 cluster_state.with_pgmap(
238 [&osd_map, &f](const PGMap &pg_map) {
239 pg_map.dump_fs_stats(nullptr, &f, true);
240 pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
241 });
242 });
243 return f.get();
244 } else if (what == "osd_stats") {
245 PyFormatter f;
246 cluster_state.with_pgmap(
247 [&f](const PGMap &pg_map) {
248 pg_map.dump_osd_stats(&f);
249 });
250 return f.get();
251 } else if (what == "health" || what == "mon_status") {
252 PyFormatter f;
253 bufferlist json;
254 if (what == "health") {
255 json = cluster_state.get_health();
256 } else if (what == "mon_status") {
257 json = cluster_state.get_mon_status();
258 } else {
259 assert(false);
260 }
261 f.dump_string("json", json.to_str());
262 return f.get();
263 } else {
264 derr << "Python module requested unknown data '" << what << "'" << dendl;
265 Py_RETURN_NONE;
266 }
267 }
268
269 std::string PyModules::get_site_packages()
270 {
271 std::stringstream site_packages;
272
273 // CPython doesn't auto-add site-packages dirs to sys.path for us,
274 // but it does provide a module that we can ask for them.
275 auto site_module = PyImport_ImportModule("site");
276 assert(site_module);
277
278 auto site_packages_fn = PyObject_GetAttrString(site_module, "getsitepackages");
279 if (site_packages_fn != nullptr) {
280 auto site_packages_list = PyObject_CallObject(site_packages_fn, nullptr);
281 assert(site_packages_list);
282
283 auto n = PyList_Size(site_packages_list);
284 for (Py_ssize_t i = 0; i < n; ++i) {
285 if (i != 0) {
286 site_packages << ":";
287 }
288 site_packages << PyString_AsString(PyList_GetItem(site_packages_list, i));
289 }
290
291 Py_DECREF(site_packages_list);
292 Py_DECREF(site_packages_fn);
293 } else {
294 // Fall back to generating our own site-packages paths by imitating
295 // what the standard site.py does. This is annoying but it lets us
296 // run inside virtualenvs :-/
297
298 auto site_packages_fn = PyObject_GetAttrString(site_module, "addsitepackages");
299 assert(site_packages_fn);
300
301 auto known_paths = PySet_New(nullptr);
302 auto pArgs = PyTuple_Pack(1, known_paths);
303 PyObject_CallObject(site_packages_fn, pArgs);
304 Py_DECREF(pArgs);
305 Py_DECREF(known_paths);
306 Py_DECREF(site_packages_fn);
307
308 auto sys_module = PyImport_ImportModule("sys");
309 assert(sys_module);
310 auto sys_path = PyObject_GetAttrString(sys_module, "path");
311 assert(sys_path);
312
313 dout(1) << "sys.path:" << dendl;
314 auto n = PyList_Size(sys_path);
315 bool first = true;
316 for (Py_ssize_t i = 0; i < n; ++i) {
317 dout(1) << " " << PyString_AsString(PyList_GetItem(sys_path, i)) << dendl;
318 if (first) {
319 first = false;
320 } else {
321 site_packages << ":";
322 }
323 site_packages << PyString_AsString(PyList_GetItem(sys_path, i));
324 }
325
326 Py_DECREF(sys_path);
327 Py_DECREF(sys_module);
328 }
329
330 Py_DECREF(site_module);
331
332 return site_packages.str();
333 }
334
335
336 int PyModules::init()
337 {
338 Mutex::Locker locker(lock);
339
340 global_handle = this;
341 // namespace in config-key prefixed by "mgr/"
342 config_prefix = std::string(g_conf->name.get_type_str()) + "/";
343
344 // Set up global python interpreter
345 Py_SetProgramName(const_cast<char*>(PYTHON_EXECUTABLE));
346 Py_InitializeEx(0);
347
348 // Let CPython know that we will be calling it back from other
349 // threads in future.
350 if (! PyEval_ThreadsInitialized()) {
351 PyEval_InitThreads();
352 }
353
354 // Configure sys.path to include mgr_module_path
355 std::string sys_path = std::string(Py_GetPath()) + ":" + get_site_packages()
356 + ":" + g_conf->mgr_module_path;
357 dout(10) << "Computed sys.path '" << sys_path << "'" << dendl;
358
359 // Drop the GIL and remember the main thread state (current
360 // thread state becomes NULL)
361 pMainThreadState = PyEval_SaveThread();
362
363 // Load python code
364 boost::tokenizer<> tok(g_conf->mgr_modules);
365 for(const auto& module_name : tok) {
366 dout(1) << "Loading python module '" << module_name << "'" << dendl;
367 auto mod = std::unique_ptr<MgrPyModule>(new MgrPyModule(module_name, sys_path, pMainThreadState));
368 int r = mod->load();
369 if (r != 0) {
370 // Don't use handle_pyerror() here; we don't have the GIL
371 // or the right thread state (this is deliberate).
372 derr << "Error loading module '" << module_name << "': "
373 << cpp_strerror(r) << dendl;
374 // Don't drop out here, load the other modules
375 } else {
376 // Success!
377 modules[module_name] = std::move(mod);
378 }
379 }
380
381 return 0;
382 }
383
384 class ServeThread : public Thread
385 {
386 MgrPyModule *mod;
387
388 public:
389 bool running;
390
391 ServeThread(MgrPyModule *mod_)
392 : mod(mod_) {}
393
394 void *entry() override
395 {
396 running = true;
397
398 // No need to acquire the GIL here; the module does it.
399 dout(4) << "Entering thread for " << mod->get_name() << dendl;
400 mod->serve();
401
402 running = false;
403 return nullptr;
404 }
405 };
406
407 void PyModules::start()
408 {
409 Mutex::Locker l(lock);
410
411 dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
412 for (auto& i : modules) {
413 auto thread = new ServeThread(i.second.get());
414 serve_threads[i.first].reset(thread);
415 }
416
417 for (auto &i : serve_threads) {
418 std::ostringstream thread_name;
419 thread_name << "mgr." << i.first;
420 dout(4) << "Starting thread for " << i.first << dendl;
421 i.second->create(thread_name.str().c_str());
422 }
423 }
424
425 void PyModules::shutdown()
426 {
427 Mutex::Locker locker(lock);
428 assert(global_handle);
429
430 // Signal modules to drop out of serve() and/or tear down resources
431 for (auto &i : modules) {
432 auto module = i.second.get();
433 const auto& name = i.first;
434 dout(10) << "waiting for module " << name << " to shutdown" << dendl;
435 lock.Unlock();
436 module->shutdown();
437 lock.Lock();
438 dout(10) << "module " << name << " shutdown" << dendl;
439 }
440
441 // For modules implementing serve(), finish the threads where we
442 // were running that.
443 for (auto &i : serve_threads) {
444 lock.Unlock();
445 i.second->join();
446 lock.Lock();
447 }
448 serve_threads.clear();
449
450 modules.clear();
451
452 PyEval_RestoreThread(pMainThreadState);
453 Py_Finalize();
454
455 // nobody needs me anymore.
456 global_handle = nullptr;
457 }
458
459 void PyModules::notify_all(const std::string &notify_type,
460 const std::string &notify_id)
461 {
462 Mutex::Locker l(lock);
463
464 dout(10) << __func__ << ": notify_all " << notify_type << dendl;
465 for (auto& i : modules) {
466 auto module = i.second.get();
467 if (!serve_threads[i.first]->running)
468 continue;
469 // Send all python calls down a Finisher to avoid blocking
470 // C++ code, and avoid any potential lock cycles.
471 finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
472 module->notify(notify_type, notify_id);
473 }));
474 }
475 }
476
477 void PyModules::notify_all(const LogEntry &log_entry)
478 {
479 Mutex::Locker l(lock);
480
481 dout(10) << __func__ << ": notify_all (clog)" << dendl;
482 for (auto& i : modules) {
483 auto module = i.second.get();
484 if (!serve_threads[i.first]->running)
485 continue;
486 // Send all python calls down a Finisher to avoid blocking
487 // C++ code, and avoid any potential lock cycles.
488 //
489 // Note intentional use of non-reference lambda binding on
490 // log_entry: we take a copy because caller's instance is
491 // probably ephemeral.
492 finisher.queue(new FunctionContext([module, log_entry](int r){
493 module->notify_clog(log_entry);
494 }));
495 }
496 }
497
498 bool PyModules::get_config(const std::string &handle,
499 const std::string &key, std::string *val) const
500 {
501 PyThreadState *tstate = PyEval_SaveThread();
502 Mutex::Locker l(lock);
503 PyEval_RestoreThread(tstate);
504
505 const std::string global_key = config_prefix + handle + "/" + key;
506
507 dout(4) << __func__ << "key: " << global_key << dendl;
508
509 if (config_cache.count(global_key)) {
510 *val = config_cache.at(global_key);
511 return true;
512 } else {
513 return false;
514 }
515 }
516
517 PyObject *PyModules::get_config_prefix(const std::string &handle,
518 const std::string &prefix) const
519 {
520 PyThreadState *tstate = PyEval_SaveThread();
521 Mutex::Locker l(lock);
522 PyEval_RestoreThread(tstate);
523
524 const std::string base_prefix = config_prefix + handle + "/";
525 const std::string global_prefix = base_prefix + prefix;
526 dout(4) << __func__ << "prefix: " << global_prefix << dendl;
527
528 PyFormatter f;
529 for (auto p = config_cache.lower_bound(global_prefix);
530 p != config_cache.end() && p->first.find(global_prefix) == 0;
531 ++p) {
532 f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
533 }
534 return f.get();
535 }
536
537 void PyModules::set_config(const std::string &handle,
538 const std::string &key, const std::string &val)
539 {
540 const std::string global_key = config_prefix + handle + "/" + key;
541
542 Command set_cmd;
543 {
544 PyThreadState *tstate = PyEval_SaveThread();
545 Mutex::Locker l(lock);
546 PyEval_RestoreThread(tstate);
547 config_cache[global_key] = val;
548
549 std::ostringstream cmd_json;
550
551 JSONFormatter jf;
552 jf.open_object_section("cmd");
553 jf.dump_string("prefix", "config-key put");
554 jf.dump_string("key", global_key);
555 jf.dump_string("val", val);
556 jf.close_section();
557 jf.flush(cmd_json);
558
559 set_cmd.run(&monc, cmd_json.str());
560 }
561 set_cmd.wait();
562
563 if (set_cmd.r != 0) {
564 // config-key put will fail if mgr's auth key has insufficient
565 // permission to set config keys
566 // FIXME: should this somehow raise an exception back into Python land?
567 dout(0) << "`config-key put " << global_key << " " << val << "` failed: "
568 << cpp_strerror(set_cmd.r) << dendl;
569 dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
570 }
571 }
572
573 std::vector<ModuleCommand> PyModules::get_commands()
574 {
575 Mutex::Locker l(lock);
576
577 std::vector<ModuleCommand> result;
578 for (auto& i : modules) {
579 auto module = i.second.get();
580 auto mod_commands = module->get_commands();
581 for (auto j : mod_commands) {
582 result.push_back(j);
583 }
584 }
585
586 return result;
587 }
588
589 void PyModules::insert_config(const std::map<std::string,
590 std::string> &new_config)
591 {
592 Mutex::Locker l(lock);
593
594 dout(4) << "Loaded " << new_config.size() << " config settings" << dendl;
595 config_cache = new_config;
596 }
597
/**
 * Write a log record on behalf of a module.
 *
 * @param handle  name of the module; appears in the log prefix
 * @param level   debug level passed to dout
 * @param record  text to log
 */
void PyModules::log(const std::string &handle,
    int level, const std::string &record)
{
  // Swap the log prefix so the line is tagged "mgr[<handle>] " instead
  // of the file-wide "mgr <function> " prefix.
#undef dout_prefix
#define dout_prefix *_dout << "mgr[" << handle << "] "
  dout(level) << record << dendl;
  // Put the file-wide prefix back for the code that follows.
#undef dout_prefix
#define dout_prefix *_dout << "mgr " << __func__ << " "
}
607
608 PyObject* PyModules::get_counter_python(
609 const std::string &handle,
610 entity_type_t svc_type,
611 const std::string &svc_id,
612 const std::string &path)
613 {
614 PyThreadState *tstate = PyEval_SaveThread();
615 Mutex::Locker l(lock);
616 PyEval_RestoreThread(tstate);
617
618 PyFormatter f;
619 f.open_array_section(path.c_str());
620
621 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
622
623 // FIXME: this is unsafe, I need to either be inside DaemonStateIndex's
624 // lock or put a lock on individual DaemonStates
625 if (metadata) {
626 if (metadata->perf_counters.instances.count(path)) {
627 auto counter_instance = metadata->perf_counters.instances.at(path);
628 const auto &data = counter_instance.get_data();
629 for (const auto &datapoint : data) {
630 f.open_array_section("datapoint");
631 f.dump_unsigned("t", datapoint.t.sec());
632 f.dump_unsigned("v", datapoint.v);
633 f.close_section();
634
635 }
636 } else {
637 dout(4) << "Missing counter: '" << path << "' ("
638 << ceph_entity_type_name(svc_type) << "."
639 << svc_id << ")" << dendl;
640 dout(20) << "Paths are:" << dendl;
641 for (const auto &i : metadata->perf_counters.instances) {
642 dout(20) << i.first << dendl;
643 }
644 }
645 } else {
646 dout(4) << "No daemon state for "
647 << ceph_entity_type_name(svc_type) << "."
648 << svc_id << ")" << dendl;
649 }
650 f.close_section();
651 return f.get();
652 }
653
654 PyObject *PyModules::get_context()
655 {
656 PyThreadState *tstate = PyEval_SaveThread();
657 Mutex::Locker l(lock);
658 PyEval_RestoreThread(tstate);
659
660 // Construct a capsule containing ceph context.
661 // Not incrementing/decrementing ref count on the context because
662 // it's the global one and it has process lifetime.
663 auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
664 return capsule;
665 }
666