1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
14 // Include this first to get python headers earlier
17 #include "common/errno.h"
18 #include "include/stringify.h"
20 #include "PyFormatter.h"
22 #include "osd/OSDMap.h"
23 #include "mon/MonMap.h"
25 #include "mgr/MgrContext.h"
27 // For ::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
32 #include "DaemonServer.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mgr
37 #define dout_prefix *_dout << "mgr " << __func__ << " "
39 ActivePyModules::ActivePyModules(PyModuleConfig
&module_config_
,
40 std::map
<std::string
, std::string
> store_data
,
41 DaemonStateIndex
&ds
, ClusterState
&cs
,
42 MonClient
&mc
, LogChannelRef clog_
,
43 LogChannelRef audit_clog_
, Objecter
&objecter_
,
44 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
45 PyModuleRegistry
&pmr
)
46 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
47 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
48 client(client_
), finisher(f
),
49 cmd_finisher(g_ceph_context
, "cmd_finisher", "cmdfin"),
50 server(server
), py_module_registry(pmr
), lock("ActivePyModules")
52 store_cache
= std::move(store_data
);
// Defaulted out-of-line destructor.  NOTE(review): presumably kept
// out-of-line so member types need only be complete in this TU — confirm.
ActivePyModules::~ActivePyModules() = default;
58 void ActivePyModules::dump_server(const std::string
&hostname
,
59 const DaemonStateCollection
&dmc
,
62 f
->dump_string("hostname", hostname
);
63 f
->open_array_section("services");
64 std::string ceph_version
;
66 for (const auto &i
: dmc
) {
67 std::lock_guard
l(i
.second
->lock
);
68 const auto &key
= i
.first
;
69 const std::string
&str_type
= key
.first
;
70 const std::string
&svc_name
= key
.second
;
72 // TODO: pick the highest version, and make sure that
73 // somewhere else (during health reporting?) we are
74 // indicating to the user if we see mixed versions
75 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
76 if (ver_iter
!= i
.second
->metadata
.end()) {
77 ceph_version
= i
.second
->metadata
.at("ceph_version");
80 f
->open_object_section("service");
81 f
->dump_string("type", str_type
);
82 f
->dump_string("id", svc_name
);
87 f
->dump_string("ceph_version", ceph_version
);
92 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
94 PyThreadState
*tstate
= PyEval_SaveThread();
95 std::lock_guard
l(lock
);
96 PyEval_RestoreThread(tstate
);
97 dout(10) << " (" << hostname
<< ")" << dendl
;
99 auto dmc
= daemon_state
.get_by_server(hostname
);
102 dump_server(hostname
, dmc
, &f
);
107 PyObject
*ActivePyModules::list_servers_python()
109 PyFormatter
f(false, true);
110 PyThreadState
*tstate
= PyEval_SaveThread();
111 dout(10) << " >" << dendl
;
113 daemon_state
.with_daemons_by_server([this, &f
, &tstate
]
114 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
115 PyEval_RestoreThread(tstate
);
117 for (const auto &i
: all
) {
118 const auto &hostname
= i
.first
;
120 f
.open_object_section("server");
121 dump_server(hostname
, i
.second
, &f
);
129 PyObject
*ActivePyModules::get_metadata_python(
130 const std::string
&svc_type
,
131 const std::string
&svc_id
)
133 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
134 if (metadata
== nullptr) {
135 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
139 std::lock_guard
l(metadata
->lock
);
141 f
.dump_string("hostname", metadata
->hostname
);
142 for (const auto &i
: metadata
->metadata
) {
143 f
.dump_string(i
.first
.c_str(), i
.second
);
149 PyObject
*ActivePyModules::get_daemon_status_python(
150 const std::string
&svc_type
,
151 const std::string
&svc_id
)
153 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
154 if (metadata
== nullptr) {
155 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
159 std::lock_guard
l(metadata
->lock
);
161 for (const auto &i
: metadata
->service_status
) {
162 f
.dump_string(i
.first
.c_str(), i
.second
);
167 PyObject
*ActivePyModules::get_python(const std::string
&what
)
171 // Drop the GIL, as most of the following blocks will block on
172 // a mutex -- they are all responsible for re-taking the GIL before
173 // touching the PyFormatter instance or returning from the function.
174 PyThreadState
*tstate
= PyEval_SaveThread();
176 if (what
== "fs_map") {
177 cluster_state
.with_fsmap([&f
, &tstate
](const FSMap
&fsmap
) {
178 PyEval_RestoreThread(tstate
);
182 } else if (what
== "osdmap_crush_map_text") {
184 cluster_state
.with_osdmap([&rdata
, &tstate
](const OSDMap
&osd_map
){
185 PyEval_RestoreThread(tstate
);
186 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
188 std::string crush_text
= rdata
.to_str();
189 return PyString_FromString(crush_text
.c_str());
190 } else if (what
.substr(0, 7) == "osd_map") {
191 cluster_state
.with_osdmap([&f
, &what
, &tstate
](const OSDMap
&osd_map
){
192 PyEval_RestoreThread(tstate
);
193 if (what
== "osd_map") {
195 } else if (what
== "osd_map_tree") {
196 osd_map
.print_tree(&f
, nullptr);
197 } else if (what
== "osd_map_crush") {
198 osd_map
.crush
->dump(&f
);
202 } else if (what
.substr(0, 6) == "config") {
203 PyEval_RestoreThread(tstate
);
204 if (what
== "config_options") {
205 g_conf().config_options(&f
);
206 } else if (what
== "config") {
207 g_conf().show_config(&f
);
210 } else if (what
== "mon_map") {
211 cluster_state
.with_monmap(
212 [&f
, &tstate
](const MonMap
&monmap
) {
213 PyEval_RestoreThread(tstate
);
218 } else if (what
== "service_map") {
219 cluster_state
.with_servicemap(
220 [&f
, &tstate
](const ServiceMap
&service_map
) {
221 PyEval_RestoreThread(tstate
);
222 service_map
.dump(&f
);
226 } else if (what
== "osd_metadata") {
227 auto dmc
= daemon_state
.get_by_service("osd");
228 PyEval_RestoreThread(tstate
);
230 for (const auto &i
: dmc
) {
231 std::lock_guard
l(i
.second
->lock
);
232 f
.open_object_section(i
.first
.second
.c_str());
233 f
.dump_string("hostname", i
.second
->hostname
);
234 for (const auto &j
: i
.second
->metadata
) {
235 f
.dump_string(j
.first
.c_str(), j
.second
);
240 } else if (what
== "pg_summary") {
241 cluster_state
.with_pgmap(
242 [&f
, &tstate
](const PGMap
&pg_map
) {
243 PyEval_RestoreThread(tstate
);
245 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
246 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
247 std::map
<std::string
, uint32_t> all
;
248 for (const auto &i
: pg_map
.pg_stat
) {
249 const auto pool
= i
.first
.m_pool
;
250 const std::string state
= pg_state_string(i
.second
.state
);
251 // Insert to per-pool map
252 pools
[stringify(pool
)][state
]++;
253 for (const auto &osd_id
: i
.second
.acting
) {
254 osds
[stringify(osd_id
)][state
]++;
258 f
.open_object_section("by_osd");
259 for (const auto &i
: osds
) {
260 f
.open_object_section(i
.first
.c_str());
261 for (const auto &j
: i
.second
) {
262 f
.dump_int(j
.first
.c_str(), j
.second
);
267 f
.open_object_section("by_pool");
268 for (const auto &i
: pools
) {
269 f
.open_object_section(i
.first
.c_str());
270 for (const auto &j
: i
.second
) {
271 f
.dump_int(j
.first
.c_str(), j
.second
);
276 f
.open_object_section("all");
277 for (const auto &i
: all
) {
278 f
.dump_int(i
.first
.c_str(), i
.second
);
281 f
.open_object_section("pg_stats_sum");
282 pg_map
.pg_sum
.dump(&f
);
287 } else if (what
== "pg_status") {
288 cluster_state
.with_pgmap(
289 [&f
, &tstate
](const PGMap
&pg_map
) {
290 PyEval_RestoreThread(tstate
);
291 pg_map
.print_summary(&f
, nullptr);
295 } else if (what
== "pg_dump") {
296 cluster_state
.with_pgmap(
297 [&f
, &tstate
](const PGMap
&pg_map
) {
298 PyEval_RestoreThread(tstate
);
303 } else if (what
== "devices") {
304 daemon_state
.with_devices2(
306 PyEval_RestoreThread(tstate
);
307 f
.open_array_section("devices");
309 [&f
] (const DeviceState
& dev
) {
310 f
.dump_object("device", dev
);
314 } else if (what
.size() > 7 &&
315 what
.substr(0, 7) == "device ") {
316 string devid
= what
.substr(7);
317 daemon_state
.with_device(devid
, [&f
, &tstate
] (const DeviceState
& dev
) {
318 PyEval_RestoreThread(tstate
);
319 f
.dump_object("device", dev
);
322 } else if (what
== "io_rate") {
323 cluster_state
.with_pgmap(
324 [&f
, &tstate
](const PGMap
&pg_map
) {
325 PyEval_RestoreThread(tstate
);
326 pg_map
.dump_delta(&f
);
330 } else if (what
== "df") {
331 cluster_state
.with_osdmap_and_pgmap(
333 const OSDMap
& osd_map
,
334 const PGMap
&pg_map
) {
335 PyEval_RestoreThread(tstate
);
336 pg_map
.dump_cluster_stats(nullptr, &f
, true);
337 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
340 } else if (what
== "osd_stats") {
341 cluster_state
.with_pgmap(
342 [&f
, &tstate
](const PGMap
&pg_map
) {
343 PyEval_RestoreThread(tstate
);
344 pg_map
.dump_osd_stats(&f
);
347 } else if (what
== "osd_pool_stats") {
348 int64_t poolid
= -ENOENT
;
350 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
351 const PGMap
& pg_map
) {
352 PyEval_RestoreThread(tstate
);
353 f
.open_array_section("pool_stats");
354 for (auto &p
: osdmap
.get_pools()) {
356 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
361 } else if (what
== "health" || what
== "mon_status") {
363 if (what
== "health") {
364 json
= cluster_state
.get_health();
365 } else if (what
== "mon_status") {
366 json
= cluster_state
.get_mon_status();
371 PyEval_RestoreThread(tstate
);
372 f
.dump_string("json", json
.to_str());
374 } else if (what
== "mgr_map") {
375 cluster_state
.with_mgrmap([&f
, &tstate
](const MgrMap
&mgr_map
) {
376 PyEval_RestoreThread(tstate
);
381 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
382 PyEval_RestoreThread(tstate
);
387 void ActivePyModules::start_one(PyModuleRef py_module
)
389 std::lock_guard
l(lock
);
391 ceph_assert(modules
.count(py_module
->get_name()) == 0);
393 const auto name
= py_module
->get_name();
394 modules
[name
].reset(new ActivePyModule(py_module
, clog
));
395 auto active_module
= modules
.at(name
).get();
397 // Send all python calls down a Finisher to avoid blocking
398 // C++ code, and avoid any potential lock cycles.
399 finisher
.queue(new FunctionContext([this, active_module
, name
](int) {
400 int r
= active_module
->load(this);
402 derr
<< "Failed to run module in active mode ('" << name
<< "')"
404 std::lock_guard
l(lock
);
407 dout(4) << "Starting thread for " << name
<< dendl
;
408 active_module
->thread
.create(active_module
->get_thread_name());
413 void ActivePyModules::shutdown()
415 std::lock_guard
locker(lock
);
417 // Signal modules to drop out of serve() and/or tear down resources
418 for (auto &i
: modules
) {
419 auto module
= i
.second
.get();
420 const auto& name
= i
.first
;
423 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
425 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
429 // For modules implementing serve(), finish the threads where we
430 // were running that.
431 for (auto &i
: modules
) {
433 dout(10) << "joining module " << i
.first
<< dendl
;
434 i
.second
->thread
.join();
435 dout(10) << "joined module " << i
.first
<< dendl
;
439 cmd_finisher
.wait_for_empty();
445 void ActivePyModules::notify_all(const std::string
¬ify_type
,
446 const std::string
¬ify_id
)
448 std::lock_guard
l(lock
);
450 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
451 for (auto& i
: modules
) {
452 auto module
= i
.second
.get();
453 // Send all python calls down a Finisher to avoid blocking
454 // C++ code, and avoid any potential lock cycles.
455 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
456 module
->notify(notify_type
, notify_id
);
461 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
463 std::lock_guard
l(lock
);
465 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
466 for (auto& i
: modules
) {
467 auto module
= i
.second
.get();
468 // Send all python calls down a Finisher to avoid blocking
469 // C++ code, and avoid any potential lock cycles.
471 // Note intentional use of non-reference lambda binding on
472 // log_entry: we take a copy because caller's instance is
473 // probably ephemeral.
474 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
475 module
->notify_clog(log_entry
);
480 bool ActivePyModules::get_store(const std::string
&module_name
,
481 const std::string
&key
, std::string
*val
) const
483 PyThreadState
*tstate
= PyEval_SaveThread();
484 std::lock_guard
l(lock
);
485 PyEval_RestoreThread(tstate
);
487 const std::string global_key
= PyModule::config_prefix
488 + module_name
+ "/" + key
;
490 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
492 auto i
= store_cache
.find(global_key
);
493 if (i
!= store_cache
.end()) {
501 PyObject
*ActivePyModules::dispatch_remote(
502 const std::string
&other_module
,
503 const std::string
&method
,
508 auto mod_iter
= modules
.find(other_module
);
509 ceph_assert(mod_iter
!= modules
.end());
511 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
514 bool ActivePyModules::get_config(const std::string
&module_name
,
515 const std::string
&key
, std::string
*val
) const
517 const std::string global_key
= PyModule::config_prefix
518 + module_name
+ "/" + key
;
520 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
522 std::lock_guard
lock(module_config
.lock
);
524 auto i
= module_config
.config
.find(global_key
);
525 if (i
!= module_config
.config
.end()) {
533 PyObject
*ActivePyModules::get_typed_config(
534 const std::string
&module_name
,
535 const std::string
&key
,
536 const std::string
&prefix
) const
538 PyThreadState
*tstate
= PyEval_SaveThread();
540 std::string final_key
;
543 final_key
= prefix
+ "/" + key
;
544 found
= get_config(module_name
, final_key
, &value
);
548 found
= get_config(module_name
, final_key
, &value
);
551 PyModuleRef module
= py_module_registry
.get_module(module_name
);
552 PyEval_RestoreThread(tstate
);
554 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
557 dout(10) << __func__
<< " " << final_key
<< " found: " << value
<< dendl
;
558 return module
->get_typed_option_value(key
, value
);
560 PyEval_RestoreThread(tstate
);
562 dout(4) << __func__
<< " [" << prefix
<< "/]" << key
<< " not found "
565 dout(4) << __func__
<< " " << key
<< " not found " << dendl
;
570 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
571 const std::string
&prefix
) const
573 PyThreadState
*tstate
= PyEval_SaveThread();
574 std::lock_guard
l(lock
);
575 std::lock_guard
lock(module_config
.lock
);
576 PyEval_RestoreThread(tstate
);
578 const std::string base_prefix
= PyModule::config_prefix
580 const std::string global_prefix
= base_prefix
+ prefix
;
581 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
585 for (auto p
= store_cache
.lower_bound(global_prefix
);
586 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0;
588 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
593 void ActivePyModules::set_store(const std::string
&module_name
,
594 const std::string
&key
, const boost::optional
<std::string
>& val
)
596 const std::string global_key
= PyModule::config_prefix
597 + module_name
+ "/" + key
;
601 std::lock_guard
l(lock
);
603 store_cache
[global_key
] = *val
;
605 store_cache
.erase(global_key
);
608 std::ostringstream cmd_json
;
610 jf
.open_object_section("cmd");
612 jf
.dump_string("prefix", "config-key set");
613 jf
.dump_string("key", global_key
);
614 jf
.dump_string("val", *val
);
616 jf
.dump_string("prefix", "config-key del");
617 jf
.dump_string("key", global_key
);
621 set_cmd
.run(&monc
, cmd_json
.str());
625 if (set_cmd
.r
!= 0) {
626 // config-key set will fail if mgr's auth key has insufficient
627 // permission to set config keys
628 // FIXME: should this somehow raise an exception back into Python land?
629 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
630 << cpp_strerror(set_cmd
.r
) << dendl
;
631 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
635 void ActivePyModules::set_config(const std::string
&module_name
,
636 const std::string
&key
, const boost::optional
<std::string
>& val
)
638 module_config
.set_config(&monc
, module_name
, key
, val
);
641 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
643 std::map
<std::string
, std::string
> result
;
644 std::lock_guard
l(lock
);
645 for (const auto& i
: modules
) {
646 const auto &module
= i
.second
.get();
647 std::string svc_str
= module
->get_uri();
648 if (!svc_str
.empty()) {
649 result
[module
->get_name()] = svc_str
;
656 PyObject
* ActivePyModules::with_perf_counters(
657 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
658 const std::string
&svc_name
,
659 const std::string
&svc_id
,
660 const std::string
&path
) const
662 PyThreadState
*tstate
= PyEval_SaveThread();
663 std::lock_guard
l(lock
);
664 PyEval_RestoreThread(tstate
);
667 f
.open_array_section(path
.c_str());
669 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
671 std::lock_guard
l2(metadata
->lock
);
672 if (metadata
->perf_counters
.instances
.count(path
)) {
673 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
674 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
675 fct(counter_instance
, counter_type
, f
);
677 dout(4) << "Missing counter: '" << path
<< "' ("
678 << svc_name
<< "." << svc_id
<< ")" << dendl
;
679 dout(20) << "Paths are:" << dendl
;
680 for (const auto &i
: metadata
->perf_counters
.instances
) {
681 dout(20) << i
.first
<< dendl
;
685 dout(4) << "No daemon state for "
686 << svc_name
<< "." << svc_id
<< ")" << dendl
;
692 PyObject
* ActivePyModules::get_counter_python(
693 const std::string
&svc_name
,
694 const std::string
&svc_id
,
695 const std::string
&path
)
697 auto extract_counters
= [](
698 PerfCounterInstance
& counter_instance
,
699 PerfCounterType
& counter_type
,
702 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
703 const auto &avg_data
= counter_instance
.get_data_avg();
704 for (const auto &datapoint
: avg_data
) {
705 f
.open_array_section("datapoint");
706 f
.dump_unsigned("t", datapoint
.t
.sec());
707 f
.dump_unsigned("s", datapoint
.s
);
708 f
.dump_unsigned("c", datapoint
.c
);
712 const auto &data
= counter_instance
.get_data();
713 for (const auto &datapoint
: data
) {
714 f
.open_array_section("datapoint");
715 f
.dump_unsigned("t", datapoint
.t
.sec());
716 f
.dump_unsigned("v", datapoint
.v
);
721 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
724 PyObject
* ActivePyModules::get_latest_counter_python(
725 const std::string
&svc_name
,
726 const std::string
&svc_id
,
727 const std::string
&path
)
729 auto extract_latest_counters
= [](
730 PerfCounterInstance
& counter_instance
,
731 PerfCounterType
& counter_type
,
734 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
735 const auto &datapoint
= counter_instance
.get_latest_data_avg();
736 f
.dump_unsigned("t", datapoint
.t
.sec());
737 f
.dump_unsigned("s", datapoint
.s
);
738 f
.dump_unsigned("c", datapoint
.c
);
740 const auto &datapoint
= counter_instance
.get_latest_data();
741 f
.dump_unsigned("t", datapoint
.t
.sec());
742 f
.dump_unsigned("v", datapoint
.v
);
745 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
748 PyObject
* ActivePyModules::get_perf_schema_python(
749 const std::string
&svc_type
,
750 const std::string
&svc_id
)
752 PyThreadState
*tstate
= PyEval_SaveThread();
753 std::lock_guard
l(lock
);
754 PyEval_RestoreThread(tstate
);
756 DaemonStateCollection daemons
;
758 if (svc_type
== "") {
759 daemons
= daemon_state
.get_all();
760 } else if (svc_id
.empty()) {
761 daemons
= daemon_state
.get_by_service(svc_type
);
763 auto key
= DaemonKey(svc_type
, svc_id
);
764 // so that the below can be a loop in all cases
765 auto got
= daemon_state
.get(key
);
766 if (got
!= nullptr) {
772 if (!daemons
.empty()) {
773 for (auto statepair
: daemons
) {
774 auto key
= statepair
.first
;
775 auto state
= statepair
.second
;
777 std::ostringstream daemon_name
;
778 daemon_name
<< key
.first
<< "." << key
.second
;
779 f
.open_object_section(daemon_name
.str().c_str());
781 std::lock_guard
l(state
->lock
);
782 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
783 const auto &counter_name
= ctr_inst_iter
.first
;
784 f
.open_object_section(counter_name
.c_str());
785 auto type
= state
->perf_counters
.types
[counter_name
];
786 f
.dump_string("description", type
.description
);
787 if (!type
.nick
.empty()) {
788 f
.dump_string("nick", type
.nick
);
790 f
.dump_unsigned("type", type
.type
);
791 f
.dump_unsigned("priority", type
.priority
);
792 f
.dump_unsigned("units", type
.unit
);
798 dout(4) << __func__
<< ": No daemon state found for "
799 << svc_type
<< "." << svc_id
<< ")" << dendl
;
804 PyObject
*ActivePyModules::get_context()
806 PyThreadState
*tstate
= PyEval_SaveThread();
807 std::lock_guard
l(lock
);
808 PyEval_RestoreThread(tstate
);
810 // Construct a capsule containing ceph context.
811 // Not incrementing/decrementing ref count on the context because
812 // it's the global one and it has process lifetime.
813 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
818 * Helper for our wrapped types that take a capsule in their constructor.
820 PyObject
*construct_with_capsule(
821 const std::string
&module_name
,
822 const std::string
&clsname
,
825 // Look up the OSDMap type which we will construct
826 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
828 derr
<< "Failed to import python module:" << dendl
;
829 derr
<< handle_pyerror() << dendl
;
833 PyObject
*wrapper_type
= PyObject_GetAttrString(
834 module
, (const char*)clsname
.c_str());
836 derr
<< "Failed to get python type:" << dendl
;
837 derr
<< handle_pyerror() << dendl
;
839 ceph_assert(wrapper_type
);
841 // Construct a capsule containing an OSDMap.
842 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
843 ceph_assert(wrapped_capsule
);
845 // Construct the python OSDMap
846 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
847 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
848 if (wrapper_instance
== nullptr) {
849 derr
<< "Failed to construct python OSDMap:" << dendl
;
850 derr
<< handle_pyerror() << dendl
;
852 ceph_assert(wrapper_instance
!= nullptr);
854 Py_DECREF(wrapped_capsule
);
856 Py_DECREF(wrapper_type
);
859 return wrapper_instance
;
862 PyObject
*ActivePyModules::get_osdmap()
864 OSDMap
*newmap
= new OSDMap
;
866 PyThreadState
*tstate
= PyEval_SaveThread();
868 std::lock_guard
l(lock
);
869 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
870 newmap
->deepish_copy_from(o
);
873 PyEval_RestoreThread(tstate
);
875 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
878 void ActivePyModules::set_health_checks(const std::string
& module_name
,
879 health_check_map_t
&& checks
)
881 bool changed
= false;
884 auto p
= modules
.find(module_name
);
885 if (p
!= modules
.end()) {
886 changed
= p
->second
->set_health_checks(std::move(checks
));
890 // immediately schedule a report to be sent to the monitors with the new
891 // health checks that have changed. This is done asynchronusly to avoid
892 // blocking python land. ActivePyModules::lock needs to be dropped to make
895 // send_report callers: DaemonServer::lock -> PyModuleRegistery::lock
896 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
898 // if we don't release this->lock before calling schedule_tick a cycle is
899 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
900 // This is still correct as send_report is run asynchronously under
901 // DaemonServer::lock.
903 server
.schedule_tick(0);
906 int ActivePyModules::handle_command(
907 std::string
const &module_name
,
908 const cmdmap_t
&cmdmap
,
909 const bufferlist
&inbuf
,
910 std::stringstream
*ds
,
911 std::stringstream
*ss
)
914 auto mod_iter
= modules
.find(module_name
);
915 if (mod_iter
== modules
.end()) {
916 *ss
<< "Module '" << module_name
<< "' is not available";
922 return mod_iter
->second
->handle_command(cmdmap
, inbuf
, ds
, ss
);
925 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
927 std::lock_guard
l(lock
);
928 for (auto& p
: modules
) {
929 p
.second
->get_health_checks(checks
);
933 void ActivePyModules::update_progress_event(
934 const std::string
& evid
,
935 const std::string
& desc
,
938 std::lock_guard
l(lock
);
939 auto& pe
= progress_events
[evid
];
941 pe
.progress
= progress
;
944 void ActivePyModules::complete_progress_event(const std::string
& evid
)
946 std::lock_guard
l(lock
);
947 progress_events
.erase(evid
);
950 void ActivePyModules::clear_all_progress_events()
952 std::lock_guard
l(lock
);
953 progress_events
.clear();
956 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
958 std::lock_guard
l(lock
);
959 *events
= progress_events
;
962 void ActivePyModules::config_notify()
964 std::lock_guard
l(lock
);
965 for (auto& i
: modules
) {
966 auto module
= i
.second
.get();
967 // Send all python calls down a Finisher to avoid blocking
968 // C++ code, and avoid any potential lock cycles.
969 finisher
.queue(new FunctionContext([module
](int r
){
970 module
->config_notify();
975 void ActivePyModules::set_uri(const std::string
& module_name
,
976 const std::string
&uri
)
978 std::lock_guard
l(lock
);
980 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
982 modules
[module_name
]->set_uri(uri
);
985 OSDPerfMetricQueryID
ActivePyModules::add_osd_perf_query(
986 const OSDPerfMetricQuery
&query
,
987 const std::optional
<OSDPerfMetricLimit
> &limit
)
989 return server
.add_osd_perf_query(query
, limit
);
992 void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id
)
994 int r
= server
.remove_osd_perf_query(query_id
);
996 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
997 << cpp_strerror(r
) << dendl
;
1001 PyObject
*ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id
)
1003 std::map
<OSDPerfMetricKey
, PerformanceCounters
> counters
;
1005 int r
= server
.get_osd_perf_counters(query_id
, &counters
);
1007 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1008 << cpp_strerror(r
) << dendl
;
1014 f
.open_array_section("counters");
1015 for (auto &it
: counters
) {
1016 auto &key
= it
.first
;
1017 auto &instance_counters
= it
.second
;
1018 f
.open_object_section("i");
1019 f
.open_array_section("k");
1020 for (auto &sub_key
: key
) {
1021 f
.open_array_section("s");
1022 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1023 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1025 f
.close_section(); // s
1027 f
.close_section(); // k
1028 f
.open_array_section("c");
1029 for (auto &c
: instance_counters
) {
1030 f
.open_array_section("p");
1031 f
.dump_unsigned("0", c
.first
);
1032 f
.dump_unsigned("1", c
.second
);
1033 f
.close_section(); // p
1035 f
.close_section(); // c
1036 f
.close_section(); // i
1038 f
.close_section(); // counters
1043 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1044 const std::string
&message
)
1046 std::lock_guard
l(lock
);
1048 if (channel
== "audit") {
1049 audit_clog
->do_log(prio
, message
);
1051 clog
->do_log(prio
, message
);