1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
17 #include "common/errno.h"
18 #include "include/stringify.h"
20 #include "PyFormatter.h"
22 #include "osd/OSDMap.h"
23 #include "mon/MonMap.h"
25 #include "mgr/MgrContext.h"
27 // For ::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
32 #include "DaemonServer.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mgr
37 #define dout_prefix *_dout << "mgr " << __func__ << " "
39 ActivePyModules::ActivePyModules(PyModuleConfig
&module_config_
,
40 std::map
<std::string
, std::string
> store_data
,
41 DaemonStateIndex
&ds
, ClusterState
&cs
,
42 MonClient
&mc
, LogChannelRef clog_
,
43 LogChannelRef audit_clog_
, Objecter
&objecter_
,
44 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
45 PyModuleRegistry
&pmr
)
46 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
47 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
48 client(client_
), finisher(f
),
49 server(server
), py_module_registry(pmr
), lock("ActivePyModules")
51 store_cache
= std::move(store_data
);
// Defaulted out-of-line destructor — presumably defined here (rather than
// in the header) so member types are complete at destruction; TODO confirm.
54 ActivePyModules::~ActivePyModules() = default;
56 void ActivePyModules::dump_server(const std::string
&hostname
,
57 const DaemonStateCollection
&dmc
,
60 f
->dump_string("hostname", hostname
);
61 f
->open_array_section("services");
62 std::string ceph_version
;
64 for (const auto &i
: dmc
) {
65 std::lock_guard
l(i
.second
->lock
);
66 const auto &key
= i
.first
;
67 const std::string
&str_type
= key
.first
;
68 const std::string
&svc_name
= key
.second
;
70 // TODO: pick the highest version, and make sure that
71 // somewhere else (during health reporting?) we are
72 // indicating to the user if we see mixed versions
73 auto ver_iter
= i
.second
->metadata
.find("ceph_version");
74 if (ver_iter
!= i
.second
->metadata
.end()) {
75 ceph_version
= i
.second
->metadata
.at("ceph_version");
78 f
->open_object_section("service");
79 f
->dump_string("type", str_type
);
80 f
->dump_string("id", svc_name
);
85 f
->dump_string("ceph_version", ceph_version
);
90 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
92 PyThreadState
*tstate
= PyEval_SaveThread();
93 std::lock_guard
l(lock
);
94 PyEval_RestoreThread(tstate
);
95 dout(10) << " (" << hostname
<< ")" << dendl
;
97 auto dmc
= daemon_state
.get_by_server(hostname
);
100 dump_server(hostname
, dmc
, &f
);
105 PyObject
*ActivePyModules::list_servers_python()
107 PyFormatter
f(false, true);
108 PyThreadState
*tstate
= PyEval_SaveThread();
109 dout(10) << " >" << dendl
;
111 daemon_state
.with_daemons_by_server([this, &f
, &tstate
]
112 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
113 PyEval_RestoreThread(tstate
);
115 for (const auto &i
: all
) {
116 const auto &hostname
= i
.first
;
118 f
.open_object_section("server");
119 dump_server(hostname
, i
.second
, &f
);
127 PyObject
*ActivePyModules::get_metadata_python(
128 const std::string
&svc_type
,
129 const std::string
&svc_id
)
131 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
132 if (metadata
== nullptr) {
133 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
137 std::lock_guard
l(metadata
->lock
);
139 f
.dump_string("hostname", metadata
->hostname
);
140 for (const auto &i
: metadata
->metadata
) {
141 f
.dump_string(i
.first
.c_str(), i
.second
);
147 PyObject
*ActivePyModules::get_daemon_status_python(
148 const std::string
&svc_type
,
149 const std::string
&svc_id
)
151 auto metadata
= daemon_state
.get(DaemonKey(svc_type
, svc_id
));
152 if (metadata
== nullptr) {
153 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
157 std::lock_guard
l(metadata
->lock
);
159 for (const auto &i
: metadata
->service_status
) {
160 f
.dump_string(i
.first
.c_str(), i
.second
);
165 PyObject
*ActivePyModules::get_python(const std::string
&what
)
169 // Drop the GIL, as most of the following blocks will block on
170 // a mutex -- they are all responsible for re-taking the GIL before
171 // touching the PyFormatter instance or returning from the function.
172 PyThreadState
*tstate
= PyEval_SaveThread();
174 if (what
== "fs_map") {
175 cluster_state
.with_fsmap([&f
, &tstate
](const FSMap
&fsmap
) {
176 PyEval_RestoreThread(tstate
);
180 } else if (what
== "osdmap_crush_map_text") {
182 cluster_state
.with_osdmap([&rdata
, &tstate
](const OSDMap
&osd_map
){
183 PyEval_RestoreThread(tstate
);
184 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
186 std::string crush_text
= rdata
.to_str();
187 return PyString_FromString(crush_text
.c_str());
188 } else if (what
.substr(0, 7) == "osd_map") {
189 cluster_state
.with_osdmap([&f
, &what
, &tstate
](const OSDMap
&osd_map
){
190 PyEval_RestoreThread(tstate
);
191 if (what
== "osd_map") {
193 } else if (what
== "osd_map_tree") {
194 osd_map
.print_tree(&f
, nullptr);
195 } else if (what
== "osd_map_crush") {
196 osd_map
.crush
->dump(&f
);
200 } else if (what
.substr(0, 6) == "config") {
201 PyEval_RestoreThread(tstate
);
202 if (what
== "config_options") {
203 g_conf().config_options(&f
);
204 } else if (what
== "config") {
205 g_conf().show_config(&f
);
208 } else if (what
== "mon_map") {
209 cluster_state
.with_monmap(
210 [&f
, &tstate
](const MonMap
&monmap
) {
211 PyEval_RestoreThread(tstate
);
216 } else if (what
== "service_map") {
217 cluster_state
.with_servicemap(
218 [&f
, &tstate
](const ServiceMap
&service_map
) {
219 PyEval_RestoreThread(tstate
);
220 service_map
.dump(&f
);
224 } else if (what
== "osd_metadata") {
225 auto dmc
= daemon_state
.get_by_service("osd");
226 PyEval_RestoreThread(tstate
);
228 for (const auto &i
: dmc
) {
229 std::lock_guard
l(i
.second
->lock
);
230 f
.open_object_section(i
.first
.second
.c_str());
231 f
.dump_string("hostname", i
.second
->hostname
);
232 for (const auto &j
: i
.second
->metadata
) {
233 f
.dump_string(j
.first
.c_str(), j
.second
);
238 } else if (what
== "pg_summary") {
239 cluster_state
.with_pgmap(
240 [&f
, &tstate
](const PGMap
&pg_map
) {
241 PyEval_RestoreThread(tstate
);
243 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
244 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
245 std::map
<std::string
, uint32_t> all
;
246 for (const auto &i
: pg_map
.pg_stat
) {
247 const auto pool
= i
.first
.m_pool
;
248 const std::string state
= pg_state_string(i
.second
.state
);
249 // Insert to per-pool map
250 pools
[stringify(pool
)][state
]++;
251 for (const auto &osd_id
: i
.second
.acting
) {
252 osds
[stringify(osd_id
)][state
]++;
256 f
.open_object_section("by_osd");
257 for (const auto &i
: osds
) {
258 f
.open_object_section(i
.first
.c_str());
259 for (const auto &j
: i
.second
) {
260 f
.dump_int(j
.first
.c_str(), j
.second
);
265 f
.open_object_section("by_pool");
266 for (const auto &i
: pools
) {
267 f
.open_object_section(i
.first
.c_str());
268 for (const auto &j
: i
.second
) {
269 f
.dump_int(j
.first
.c_str(), j
.second
);
274 f
.open_object_section("all");
275 for (const auto &i
: all
) {
276 f
.dump_int(i
.first
.c_str(), i
.second
);
279 f
.open_object_section("pg_stats_sum");
280 pg_map
.pg_sum
.dump(&f
);
285 } else if (what
== "pg_status") {
286 cluster_state
.with_pgmap(
287 [&f
, &tstate
](const PGMap
&pg_map
) {
288 PyEval_RestoreThread(tstate
);
289 pg_map
.print_summary(&f
, nullptr);
293 } else if (what
== "pg_dump") {
294 cluster_state
.with_pgmap(
295 [&f
, &tstate
](const PGMap
&pg_map
) {
296 PyEval_RestoreThread(tstate
);
301 } else if (what
== "devices") {
302 daemon_state
.with_devices2(
304 PyEval_RestoreThread(tstate
);
305 f
.open_array_section("devices");
307 [&f
] (const DeviceState
& dev
) {
308 f
.dump_object("device", dev
);
312 } else if (what
.size() > 7 &&
313 what
.substr(0, 7) == "device ") {
314 string devid
= what
.substr(7);
315 daemon_state
.with_device(devid
, [&f
, &tstate
] (const DeviceState
& dev
) {
316 PyEval_RestoreThread(tstate
);
317 f
.dump_object("device", dev
);
320 } else if (what
== "io_rate") {
321 cluster_state
.with_pgmap(
322 [&f
, &tstate
](const PGMap
&pg_map
) {
323 PyEval_RestoreThread(tstate
);
324 pg_map
.dump_delta(&f
);
328 } else if (what
== "df") {
329 cluster_state
.with_osdmap_and_pgmap(
331 const OSDMap
& osd_map
,
332 const PGMap
&pg_map
) {
333 PyEval_RestoreThread(tstate
);
334 pg_map
.dump_cluster_stats(nullptr, &f
, true);
335 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
338 } else if (what
== "osd_stats") {
339 cluster_state
.with_pgmap(
340 [&f
, &tstate
](const PGMap
&pg_map
) {
341 PyEval_RestoreThread(tstate
);
342 pg_map
.dump_osd_stats(&f
);
345 } else if (what
== "osd_pool_stats") {
346 int64_t poolid
= -ENOENT
;
348 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
349 const PGMap
& pg_map
) {
350 PyEval_RestoreThread(tstate
);
351 f
.open_array_section("pool_stats");
352 for (auto &p
: osdmap
.get_pools()) {
354 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
359 } else if (what
== "health" || what
== "mon_status") {
361 if (what
== "health") {
362 json
= cluster_state
.get_health();
363 } else if (what
== "mon_status") {
364 json
= cluster_state
.get_mon_status();
369 PyEval_RestoreThread(tstate
);
370 f
.dump_string("json", json
.to_str());
372 } else if (what
== "mgr_map") {
373 cluster_state
.with_mgrmap([&f
, &tstate
](const MgrMap
&mgr_map
) {
374 PyEval_RestoreThread(tstate
);
379 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
380 PyEval_RestoreThread(tstate
);
385 void ActivePyModules::start_one(PyModuleRef py_module
)
387 std::lock_guard
l(lock
);
389 ceph_assert(modules
.count(py_module
->get_name()) == 0);
391 const auto name
= py_module
->get_name();
392 modules
[name
].reset(new ActivePyModule(py_module
, clog
));
393 auto active_module
= modules
.at(name
).get();
395 // Send all python calls down a Finisher to avoid blocking
396 // C++ code, and avoid any potential lock cycles.
397 finisher
.queue(new FunctionContext([this, active_module
, name
](int) {
398 int r
= active_module
->load(this);
400 derr
<< "Failed to run module in active mode ('" << name
<< "')"
402 std::lock_guard
l(lock
);
405 dout(4) << "Starting thread for " << name
<< dendl
;
406 active_module
->thread
.create(active_module
->get_thread_name());
// Shut down all active modules: first ask each module to stop
// (drop out of serve() / tear down), then join each module's thread.
// NOTE(review): this paste is missing several lines of the original
// (the actual module->shutdown() call and, presumably, lock release /
// reacquire around the blocking calls — TODO confirm against upstream);
// only the visible lines are annotated here.
411 void ActivePyModules::shutdown()
413 std::lock_guard
locker(lock
);
415 // Signal modules to drop out of serve() and/or tear down resources
416 for (auto &i
: modules
) {
417 auto module
= i
.second
.get();
418 const auto& name
= i
.first
;
// The shutdown call itself sits between these two log lines in the
// original (not visible in this paste).
421 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
423 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
427 // For modules implementing serve(), finish the threads where we
428 // were running that.
429 for (auto &i
: modules
) {
431 dout(10) << "joining module " << i
.first
<< dendl
;
// thread.join() blocks until the module's serve() thread exits.
432 i
.second
->thread
.join();
433 dout(10) << "joined module " << i
.first
<< dendl
;
440 void ActivePyModules::notify_all(const std::string
¬ify_type
,
441 const std::string
¬ify_id
)
443 std::lock_guard
l(lock
);
445 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
446 for (auto& i
: modules
) {
447 auto module
= i
.second
.get();
448 // Send all python calls down a Finisher to avoid blocking
449 // C++ code, and avoid any potential lock cycles.
450 finisher
.queue(new FunctionContext([module
, notify_type
, notify_id
](int r
){
451 module
->notify(notify_type
, notify_id
);
456 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
458 std::lock_guard
l(lock
);
460 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
461 for (auto& i
: modules
) {
462 auto module
= i
.second
.get();
463 // Send all python calls down a Finisher to avoid blocking
464 // C++ code, and avoid any potential lock cycles.
466 // Note intentional use of non-reference lambda binding on
467 // log_entry: we take a copy because caller's instance is
468 // probably ephemeral.
469 finisher
.queue(new FunctionContext([module
, log_entry
](int r
){
470 module
->notify_clog(log_entry
);
475 bool ActivePyModules::get_store(const std::string
&module_name
,
476 const std::string
&key
, std::string
*val
) const
478 PyThreadState
*tstate
= PyEval_SaveThread();
479 std::lock_guard
l(lock
);
480 PyEval_RestoreThread(tstate
);
482 const std::string global_key
= PyModule::config_prefix
483 + module_name
+ "/" + key
;
485 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
487 auto i
= store_cache
.find(global_key
);
488 if (i
!= store_cache
.end()) {
496 PyObject
*ActivePyModules::dispatch_remote(
497 const std::string
&other_module
,
498 const std::string
&method
,
503 auto mod_iter
= modules
.find(other_module
);
504 ceph_assert(mod_iter
!= modules
.end());
506 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
509 bool ActivePyModules::get_config(const std::string
&module_name
,
510 const std::string
&key
, std::string
*val
) const
512 const std::string global_key
= PyModule::config_prefix
513 + module_name
+ "/" + key
;
515 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
517 std::lock_guard
lock(module_config
.lock
);
519 auto i
= module_config
.config
.find(global_key
);
520 if (i
!= module_config
.config
.end()) {
528 PyObject
*ActivePyModules::get_typed_config(
529 const std::string
&module_name
,
530 const std::string
&key
,
531 const std::string
&prefix
) const
533 PyThreadState
*tstate
= PyEval_SaveThread();
535 std::string final_key
;
538 final_key
= prefix
+ "/" + key
;
539 found
= get_config(module_name
, final_key
, &value
);
543 found
= get_config(module_name
, final_key
, &value
);
546 PyModuleRef module
= py_module_registry
.get_module(module_name
);
547 PyEval_RestoreThread(tstate
);
549 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
552 dout(10) << __func__
<< " " << final_key
<< " found: " << value
<< dendl
;
553 return module
->get_typed_option_value(key
, value
);
555 PyEval_RestoreThread(tstate
);
557 dout(4) << __func__
<< " [" << prefix
<< "/]" << key
<< " not found "
560 dout(4) << __func__
<< " " << key
<< " not found " << dendl
;
565 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
566 const std::string
&prefix
) const
568 PyThreadState
*tstate
= PyEval_SaveThread();
569 std::lock_guard
l(lock
);
570 std::lock_guard
lock(module_config
.lock
);
571 PyEval_RestoreThread(tstate
);
573 const std::string base_prefix
= PyModule::config_prefix
575 const std::string global_prefix
= base_prefix
+ prefix
;
576 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
580 for (auto p
= store_cache
.lower_bound(global_prefix
);
581 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0;
583 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
588 void ActivePyModules::set_store(const std::string
&module_name
,
589 const std::string
&key
, const boost::optional
<std::string
>& val
)
591 const std::string global_key
= PyModule::config_prefix
592 + module_name
+ "/" + key
;
596 PyThreadState
*tstate
= PyEval_SaveThread();
597 std::lock_guard
l(lock
);
598 PyEval_RestoreThread(tstate
);
601 store_cache
[global_key
] = *val
;
603 store_cache
.erase(global_key
);
606 std::ostringstream cmd_json
;
608 jf
.open_object_section("cmd");
610 jf
.dump_string("prefix", "config-key set");
611 jf
.dump_string("key", global_key
);
612 jf
.dump_string("val", *val
);
614 jf
.dump_string("prefix", "config-key del");
615 jf
.dump_string("key", global_key
);
619 set_cmd
.run(&monc
, cmd_json
.str());
623 if (set_cmd
.r
!= 0) {
624 // config-key set will fail if mgr's auth key has insufficient
625 // permission to set config keys
626 // FIXME: should this somehow raise an exception back into Python land?
627 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
628 << cpp_strerror(set_cmd
.r
) << dendl
;
629 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
633 void ActivePyModules::set_config(const std::string
&module_name
,
634 const std::string
&key
, const boost::optional
<std::string
>& val
)
636 module_config
.set_config(&monc
, module_name
, key
, val
);
639 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
641 std::map
<std::string
, std::string
> result
;
642 std::lock_guard
l(lock
);
643 for (const auto& i
: modules
) {
644 const auto &module
= i
.second
.get();
645 std::string svc_str
= module
->get_uri();
646 if (!svc_str
.empty()) {
647 result
[module
->get_name()] = svc_str
;
654 PyObject
* ActivePyModules::with_perf_counters(
655 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
656 const std::string
&svc_name
,
657 const std::string
&svc_id
,
658 const std::string
&path
) const
660 PyThreadState
*tstate
= PyEval_SaveThread();
661 std::lock_guard
l(lock
);
662 PyEval_RestoreThread(tstate
);
665 f
.open_array_section(path
.c_str());
667 auto metadata
= daemon_state
.get(DaemonKey(svc_name
, svc_id
));
669 std::lock_guard
l2(metadata
->lock
);
670 if (metadata
->perf_counters
.instances
.count(path
)) {
671 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
672 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
673 fct(counter_instance
, counter_type
, f
);
675 dout(4) << "Missing counter: '" << path
<< "' ("
676 << svc_name
<< "." << svc_id
<< ")" << dendl
;
677 dout(20) << "Paths are:" << dendl
;
678 for (const auto &i
: metadata
->perf_counters
.instances
) {
679 dout(20) << i
.first
<< dendl
;
683 dout(4) << "No daemon state for "
684 << svc_name
<< "." << svc_id
<< ")" << dendl
;
690 PyObject
* ActivePyModules::get_counter_python(
691 const std::string
&svc_name
,
692 const std::string
&svc_id
,
693 const std::string
&path
)
695 auto extract_counters
= [](
696 PerfCounterInstance
& counter_instance
,
697 PerfCounterType
& counter_type
,
700 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
701 const auto &avg_data
= counter_instance
.get_data_avg();
702 for (const auto &datapoint
: avg_data
) {
703 f
.open_array_section("datapoint");
704 f
.dump_unsigned("t", datapoint
.t
.sec());
705 f
.dump_unsigned("s", datapoint
.s
);
706 f
.dump_unsigned("c", datapoint
.c
);
710 const auto &data
= counter_instance
.get_data();
711 for (const auto &datapoint
: data
) {
712 f
.open_array_section("datapoint");
713 f
.dump_unsigned("t", datapoint
.t
.sec());
714 f
.dump_unsigned("v", datapoint
.v
);
719 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
722 PyObject
* ActivePyModules::get_latest_counter_python(
723 const std::string
&svc_name
,
724 const std::string
&svc_id
,
725 const std::string
&path
)
727 auto extract_latest_counters
= [](
728 PerfCounterInstance
& counter_instance
,
729 PerfCounterType
& counter_type
,
732 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
733 const auto &datapoint
= counter_instance
.get_latest_data_avg();
734 f
.dump_unsigned("t", datapoint
.t
.sec());
735 f
.dump_unsigned("s", datapoint
.s
);
736 f
.dump_unsigned("c", datapoint
.c
);
738 const auto &datapoint
= counter_instance
.get_latest_data();
739 f
.dump_unsigned("t", datapoint
.t
.sec());
740 f
.dump_unsigned("v", datapoint
.v
);
743 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
746 PyObject
* ActivePyModules::get_perf_schema_python(
747 const std::string
&svc_type
,
748 const std::string
&svc_id
)
750 PyThreadState
*tstate
= PyEval_SaveThread();
751 std::lock_guard
l(lock
);
752 PyEval_RestoreThread(tstate
);
754 DaemonStateCollection daemons
;
756 if (svc_type
== "") {
757 daemons
= daemon_state
.get_all();
758 } else if (svc_id
.empty()) {
759 daemons
= daemon_state
.get_by_service(svc_type
);
761 auto key
= DaemonKey(svc_type
, svc_id
);
762 // so that the below can be a loop in all cases
763 auto got
= daemon_state
.get(key
);
764 if (got
!= nullptr) {
770 if (!daemons
.empty()) {
771 for (auto statepair
: daemons
) {
772 auto key
= statepair
.first
;
773 auto state
= statepair
.second
;
775 std::ostringstream daemon_name
;
776 daemon_name
<< key
.first
<< "." << key
.second
;
777 f
.open_object_section(daemon_name
.str().c_str());
779 std::lock_guard
l(state
->lock
);
780 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
781 const auto &counter_name
= ctr_inst_iter
.first
;
782 f
.open_object_section(counter_name
.c_str());
783 auto type
= state
->perf_counters
.types
[counter_name
];
784 f
.dump_string("description", type
.description
);
785 if (!type
.nick
.empty()) {
786 f
.dump_string("nick", type
.nick
);
788 f
.dump_unsigned("type", type
.type
);
789 f
.dump_unsigned("priority", type
.priority
);
790 f
.dump_unsigned("units", type
.unit
);
796 dout(4) << __func__
<< ": No daemon state found for "
797 << svc_type
<< "." << svc_id
<< ")" << dendl
;
802 PyObject
*ActivePyModules::get_context()
804 PyThreadState
*tstate
= PyEval_SaveThread();
805 std::lock_guard
l(lock
);
806 PyEval_RestoreThread(tstate
);
808 // Construct a capsule containing ceph context.
809 // Not incrementing/decrementing ref count on the context because
810 // it's the global one and it has process lifetime.
811 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
816 * Helper for our wrapped types that take a capsule in their constructor.
818 PyObject
*construct_with_capsule(
819 const std::string
&module_name
,
820 const std::string
&clsname
,
823 // Look up the OSDMap type which we will construct
824 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
826 derr
<< "Failed to import python module:" << dendl
;
827 derr
<< handle_pyerror() << dendl
;
831 PyObject
*wrapper_type
= PyObject_GetAttrString(
832 module
, (const char*)clsname
.c_str());
834 derr
<< "Failed to get python type:" << dendl
;
835 derr
<< handle_pyerror() << dendl
;
837 ceph_assert(wrapper_type
);
839 // Construct a capsule containing an OSDMap.
840 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
841 ceph_assert(wrapped_capsule
);
843 // Construct the python OSDMap
844 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
845 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
846 if (wrapper_instance
== nullptr) {
847 derr
<< "Failed to construct python OSDMap:" << dendl
;
848 derr
<< handle_pyerror() << dendl
;
850 ceph_assert(wrapper_instance
!= nullptr);
852 Py_DECREF(wrapped_capsule
);
854 Py_DECREF(wrapper_type
);
857 return wrapper_instance
;
860 PyObject
*ActivePyModules::get_osdmap()
862 OSDMap
*newmap
= new OSDMap
;
864 PyThreadState
*tstate
= PyEval_SaveThread();
866 std::lock_guard
l(lock
);
867 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
868 newmap
->deepish_copy_from(o
);
871 PyEval_RestoreThread(tstate
);
873 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
// Install a module's updated health checks and, if anything changed,
// nudge the server to send a fresh report to the monitors.
// NOTE(review): this paste is missing the lines that acquire/release the
// ActivePyModules lock around the modules lookup and the `if (changed)`
// guard before schedule_tick — TODO confirm against upstream; only the
// visible lines are annotated here.
876 void ActivePyModules::set_health_checks(const std::string
& module_name
,
877 health_check_map_t
&& checks
)
879 bool changed
= false;
882 auto p
= modules
.find(module_name
);
883 if (p
!= modules
.end()) {
// The module consumes the check map; returns whether its set changed.
884 changed
= p
->second
->set_health_checks(std::move(checks
));
888 // immediately schedule a report to be sent to the monitors with the new
889 // health checks that have changed. This is done asynchronusly to avoid
890 // blocking python land. ActivePyModules::lock needs to be dropped to make
893 // send_report callers: DaemonServer::lock -> PyModuleRegistery::lock
894 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
896 // if we don't release this->lock before calling schedule_tick a cycle is
897 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
898 // This is still correct as send_report is run asynchronously under
899 // DaemonServer::lock.
901 server
.schedule_tick(0);
904 int ActivePyModules::handle_command(
905 std::string
const &module_name
,
906 const cmdmap_t
&cmdmap
,
907 const bufferlist
&inbuf
,
908 std::stringstream
*ds
,
909 std::stringstream
*ss
)
912 auto mod_iter
= modules
.find(module_name
);
913 if (mod_iter
== modules
.end()) {
914 *ss
<< "Module '" << module_name
<< "' is not available";
919 return mod_iter
->second
->handle_command(cmdmap
, inbuf
, ds
, ss
);
922 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
924 std::lock_guard
l(lock
);
925 for (auto& p
: modules
) {
926 p
.second
->get_health_checks(checks
);
930 void ActivePyModules::update_progress_event(
931 const std::string
& evid
,
932 const std::string
& desc
,
935 std::lock_guard
l(lock
);
936 auto& pe
= progress_events
[evid
];
938 pe
.progress
= progress
;
941 void ActivePyModules::complete_progress_event(const std::string
& evid
)
943 std::lock_guard
l(lock
);
944 progress_events
.erase(evid
);
947 void ActivePyModules::clear_all_progress_events()
949 std::lock_guard
l(lock
);
950 progress_events
.clear();
953 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
955 std::lock_guard
l(lock
);
956 *events
= progress_events
;
959 void ActivePyModules::config_notify()
961 std::lock_guard
l(lock
);
962 for (auto& i
: modules
) {
963 auto module
= i
.second
.get();
964 // Send all python calls down a Finisher to avoid blocking
965 // C++ code, and avoid any potential lock cycles.
966 finisher
.queue(new FunctionContext([module
](int r
){
967 module
->config_notify();
972 void ActivePyModules::set_uri(const std::string
& module_name
,
973 const std::string
&uri
)
975 std::lock_guard
l(lock
);
977 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
979 modules
[module_name
]->set_uri(uri
);
982 OSDPerfMetricQueryID
ActivePyModules::add_osd_perf_query(
983 const OSDPerfMetricQuery
&query
,
984 const std::optional
<OSDPerfMetricLimit
> &limit
)
986 return server
.add_osd_perf_query(query
, limit
);
989 void ActivePyModules::remove_osd_perf_query(OSDPerfMetricQueryID query_id
)
991 int r
= server
.remove_osd_perf_query(query_id
);
993 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
994 << cpp_strerror(r
) << dendl
;
998 PyObject
*ActivePyModules::get_osd_perf_counters(OSDPerfMetricQueryID query_id
)
1000 std::map
<OSDPerfMetricKey
, PerformanceCounters
> counters
;
1002 int r
= server
.get_osd_perf_counters(query_id
, &counters
);
1004 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1005 << cpp_strerror(r
) << dendl
;
1011 f
.open_array_section("counters");
1012 for (auto &it
: counters
) {
1013 auto &key
= it
.first
;
1014 auto &instance_counters
= it
.second
;
1015 f
.open_object_section("i");
1016 f
.open_array_section("k");
1017 for (auto &sub_key
: key
) {
1018 f
.open_array_section("s");
1019 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1020 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1022 f
.close_section(); // s
1024 f
.close_section(); // k
1025 f
.open_array_section("c");
1026 for (auto &c
: instance_counters
) {
1027 f
.open_array_section("p");
1028 f
.dump_unsigned("0", c
.first
);
1029 f
.dump_unsigned("1", c
.second
);
1030 f
.close_section(); // p
1032 f
.close_section(); // c
1033 f
.close_section(); // i
1035 f
.close_section(); // counters
1040 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1041 const std::string
&message
)
1043 std::lock_guard
l(lock
);
1045 if (channel
== "audit") {
1046 audit_clog
->do_log(prio
, message
);
1048 clog
->do_log(prio
, message
);