1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
17 #include "common/errno.h"
18 #include "include/stringify.h"
20 #include "PyFormatter.h"
22 #include "osd/OSDMap.h"
23 #include "mon/MonMap.h"
25 #include "mgr/MgrContext.h"
27 // For ::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
32 #include "DaemonKey.h"
33 #include "DaemonServer.h"
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mgr
38 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Constructor: bind references to the mgr's shared services (module config,
// daemon/cluster state, mon client, log channels, objecter, client, finisher,
// daemon server, module registry) and take ownership of the config-key store
// snapshot by moving it into store_cache.
40 ActivePyModules::ActivePyModules(PyModuleConfig
&module_config_
,
41 std::map
<std::string
, std::string
> store_data
,
42 DaemonStateIndex
&ds
, ClusterState
&cs
,
43 MonClient
&mc
, LogChannelRef clog_
,
44 LogChannelRef audit_clog_
, Objecter
&objecter_
,
45 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
46 PyModuleRegistry
&pmr
)
47 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
48 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
49 client(client_
), finisher(f
),
// Dedicated finisher for command completions (thread name "cmdfin").
50 cmd_finisher(g_ceph_context
, "cmd_finisher", "cmdfin"),
51 server(server
), py_module_registry(pmr
)
// store_data is taken by value precisely so it can be moved here cheaply.
53 store_cache
= std::move(store_data
);
57 ActivePyModules::~ActivePyModules() = default;
// Dump one server (host) into the Formatter: the hostname plus a "services"
// array of each daemon's type/id, and finally the ceph_version observed in
// the daemons' metadata.
59 void ActivePyModules::dump_server(const std::string
&hostname
,
60 const DaemonStateCollection
&dmc
,
63 f
->dump_string("hostname", hostname
);
64 f
->open_array_section("services");
65 std::string ceph_version
;
67 for (const auto &[key
, state
] : dmc
) {
// Hold the per-daemon state lock while reading its metadata map.
68 std::lock_guard
l(state
->lock
);
69 // TODO: pick the highest version, and make sure that
70 // somewhere else (during health reporting?) we are
71 // indicating to the user if we see mixed versions
72 auto ver_iter
= state
->metadata
.find("ceph_version");
73 if (ver_iter
!= state
->metadata
.end()) {
74 ceph_version
= state
->metadata
.at("ceph_version");
77 f
->open_object_section("service");
78 f
->dump_string("type", key
.type
);
79 f
->dump_string("id", key
.name
);
// NOTE: this is simply the last ceph_version seen while iterating; mixed
// versions across daemons are not surfaced here (see TODO above).
84 f
->dump_string("ceph_version", ceph_version
);
// Build a Python object describing a single server (by hostname).
// The GIL is dropped (PyEval_SaveThread) before taking ActivePyModules::lock
// and reacquired afterwards, so Python threads cannot deadlock against us.
89 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
91 PyThreadState
*tstate
= PyEval_SaveThread();
92 std::lock_guard
l(lock
);
93 PyEval_RestoreThread(tstate
);
94 dout(10) << " (" << hostname
<< ")" << dendl
;
96 auto dmc
= daemon_state
.get_by_server(hostname
);
99 dump_server(hostname
, dmc
, &f
);
// Build a Python list of all known servers, one "server" section per host,
// each filled in via dump_server().  The GIL is released before the
// daemon_state callback blocks, and restored inside the callback before the
// PyFormatter is touched.
104 PyObject
*ActivePyModules::list_servers_python()
106 PyFormatter
f(false, true);
107 PyThreadState
*tstate
= PyEval_SaveThread();
108 dout(10) << " >" << dendl
;
110 daemon_state
.with_daemons_by_server([this, &f
, &tstate
]
111 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
112 PyEval_RestoreThread(tstate
);
114 for (const auto &i
: all
) {
115 const auto &hostname
= i
.first
;
117 f
.open_object_section("server");
118 dump_server(hostname
, i
.second
, &f
);
// Return a Python dict of a daemon's metadata (hostname plus every key/value
// in its metadata map), looked up by (svc_type, svc_id).  Logs and bails on
// an unknown daemon.
126 PyObject
*ActivePyModules::get_metadata_python(
127 const std::string
&svc_type
,
128 const std::string
&svc_id
)
130 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
131 if (metadata
== nullptr) {
132 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
// Hold the daemon state lock while copying its metadata into the formatter.
136 std::lock_guard
l(metadata
->lock
);
138 f
.dump_string("hostname", metadata
->hostname
);
139 for (const auto &i
: metadata
->metadata
) {
140 f
.dump_string(i
.first
.c_str(), i
.second
);
// Return a Python dict of a daemon's service_status key/value pairs, looked
// up by (svc_type, svc_id).  Same shape and error handling as
// get_metadata_python, but reads service_status instead of metadata.
146 PyObject
*ActivePyModules::get_daemon_status_python(
147 const std::string
&svc_type
,
148 const std::string
&svc_id
)
150 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
151 if (metadata
== nullptr) {
152 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
// Hold the daemon state lock while reading its service_status map.
156 std::lock_guard
l(metadata
->lock
);
158 for (const auto &i
: metadata
->service_status
) {
159 f
.dump_string(i
.first
.c_str(), i
.second
);
164 PyObject
*ActivePyModules::get_python(const std::string
&what
)
168 // Drop the GIL, as most of the following blocks will block on
169 // a mutex -- they are all responsible for re-taking the GIL before
170 // touching the PyFormatter instance or returning from the function.
171 PyThreadState
*tstate
= PyEval_SaveThread();
173 if (what
== "fs_map") {
174 cluster_state
.with_fsmap([&f
, &tstate
](const FSMap
&fsmap
) {
175 PyEval_RestoreThread(tstate
);
179 } else if (what
== "osdmap_crush_map_text") {
181 cluster_state
.with_osdmap([&rdata
, &tstate
](const OSDMap
&osd_map
){
182 PyEval_RestoreThread(tstate
);
183 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
185 std::string crush_text
= rdata
.to_str();
186 return PyUnicode_FromString(crush_text
.c_str());
187 } else if (what
.substr(0, 7) == "osd_map") {
188 cluster_state
.with_osdmap([&f
, &what
, &tstate
](const OSDMap
&osd_map
){
189 PyEval_RestoreThread(tstate
);
190 if (what
== "osd_map") {
192 } else if (what
== "osd_map_tree") {
193 osd_map
.print_tree(&f
, nullptr);
194 } else if (what
== "osd_map_crush") {
195 osd_map
.crush
->dump(&f
);
199 } else if (what
== "modified_config_options") {
200 PyEval_RestoreThread(tstate
);
201 auto all_daemons
= daemon_state
.get_all();
203 for (auto& [key
, daemon
] : all_daemons
) {
204 std::lock_guard
l(daemon
->lock
);
205 for (auto& [name
, valmap
] : daemon
->config
) {
209 f
.open_array_section("options");
210 for (auto& name
: names
) {
211 f
.dump_string("name", name
);
215 } else if (what
.substr(0, 6) == "config") {
216 PyEval_RestoreThread(tstate
);
217 if (what
== "config_options") {
218 g_conf().config_options(&f
);
219 } else if (what
== "config") {
220 g_conf().show_config(&f
);
223 } else if (what
== "mon_map") {
224 cluster_state
.with_monmap(
225 [&f
, &tstate
](const MonMap
&monmap
) {
226 PyEval_RestoreThread(tstate
);
231 } else if (what
== "service_map") {
232 cluster_state
.with_servicemap(
233 [&f
, &tstate
](const ServiceMap
&service_map
) {
234 PyEval_RestoreThread(tstate
);
235 service_map
.dump(&f
);
239 } else if (what
== "osd_metadata") {
240 auto dmc
= daemon_state
.get_by_service("osd");
241 PyEval_RestoreThread(tstate
);
243 for (const auto &[key
, state
] : dmc
) {
244 std::lock_guard
l(state
->lock
);
245 f
.open_object_section(key
.name
.c_str());
246 f
.dump_string("hostname", state
->hostname
);
247 for (const auto &[name
, val
] : state
->metadata
) {
248 f
.dump_string(name
.c_str(), val
);
253 } else if (what
== "mds_metadata") {
254 auto dmc
= daemon_state
.get_by_service("mds");
255 PyEval_RestoreThread(tstate
);
257 for (const auto &[key
, state
] : dmc
) {
258 std::lock_guard
l(state
->lock
);
259 f
.open_object_section(key
.name
.c_str());
260 f
.dump_string("hostname", state
->hostname
);
261 for (const auto &[name
, val
] : state
->metadata
) {
262 f
.dump_string(name
.c_str(), val
);
267 } else if (what
== "pg_summary") {
268 cluster_state
.with_pgmap(
269 [&f
, &tstate
](const PGMap
&pg_map
) {
270 PyEval_RestoreThread(tstate
);
272 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
273 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
274 std::map
<std::string
, uint32_t> all
;
275 for (const auto &i
: pg_map
.pg_stat
) {
276 const auto pool
= i
.first
.m_pool
;
277 const std::string state
= pg_state_string(i
.second
.state
);
278 // Insert to per-pool map
279 pools
[stringify(pool
)][state
]++;
280 for (const auto &osd_id
: i
.second
.acting
) {
281 osds
[stringify(osd_id
)][state
]++;
285 f
.open_object_section("by_osd");
286 for (const auto &i
: osds
) {
287 f
.open_object_section(i
.first
.c_str());
288 for (const auto &j
: i
.second
) {
289 f
.dump_int(j
.first
.c_str(), j
.second
);
294 f
.open_object_section("by_pool");
295 for (const auto &i
: pools
) {
296 f
.open_object_section(i
.first
.c_str());
297 for (const auto &j
: i
.second
) {
298 f
.dump_int(j
.first
.c_str(), j
.second
);
303 f
.open_object_section("all");
304 for (const auto &i
: all
) {
305 f
.dump_int(i
.first
.c_str(), i
.second
);
308 f
.open_object_section("pg_stats_sum");
309 pg_map
.pg_sum
.dump(&f
);
314 } else if (what
== "pg_status") {
315 cluster_state
.with_pgmap(
316 [&f
, &tstate
](const PGMap
&pg_map
) {
317 PyEval_RestoreThread(tstate
);
318 pg_map
.print_summary(&f
, nullptr);
322 } else if (what
== "pg_dump") {
323 cluster_state
.with_pgmap(
324 [&f
, &tstate
](const PGMap
&pg_map
) {
325 PyEval_RestoreThread(tstate
);
326 pg_map
.dump(&f
, false);
330 } else if (what
== "devices") {
331 daemon_state
.with_devices2(
333 PyEval_RestoreThread(tstate
);
334 f
.open_array_section("devices");
336 [&f
] (const DeviceState
& dev
) {
337 f
.dump_object("device", dev
);
341 } else if (what
.size() > 7 &&
342 what
.substr(0, 7) == "device ") {
343 string devid
= what
.substr(7);
344 if (!daemon_state
.with_device(
346 [&f
, &tstate
] (const DeviceState
& dev
) {
347 PyEval_RestoreThread(tstate
);
348 f
.dump_object("device", dev
);
351 PyEval_RestoreThread(tstate
);
354 } else if (what
== "io_rate") {
355 cluster_state
.with_pgmap(
356 [&f
, &tstate
](const PGMap
&pg_map
) {
357 PyEval_RestoreThread(tstate
);
358 pg_map
.dump_delta(&f
);
362 } else if (what
== "df") {
363 cluster_state
.with_osdmap_and_pgmap(
365 const OSDMap
& osd_map
,
366 const PGMap
&pg_map
) {
367 PyEval_RestoreThread(tstate
);
368 pg_map
.dump_cluster_stats(nullptr, &f
, true);
369 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
372 } else if (what
== "pg_stats") {
373 cluster_state
.with_pgmap(
374 [&f
, &tstate
](const PGMap
&pg_map
) {
375 PyEval_RestoreThread(tstate
);
376 pg_map
.dump_pg_stats(&f
, false);
379 } else if (what
== "pool_stats") {
380 cluster_state
.with_pgmap(
381 [&f
, &tstate
](const PGMap
&pg_map
) {
382 PyEval_RestoreThread(tstate
);
383 pg_map
.dump_pool_stats(&f
);
386 } else if (what
== "pg_ready") {
387 PyEval_RestoreThread(tstate
);
388 server
.dump_pg_ready(&f
);
390 } else if (what
== "osd_stats") {
391 cluster_state
.with_pgmap(
392 [&f
, &tstate
](const PGMap
&pg_map
) {
393 PyEval_RestoreThread(tstate
);
394 pg_map
.dump_osd_stats(&f
, false);
397 } else if (what
== "osd_ping_times") {
398 cluster_state
.with_pgmap(
399 [&f
, &tstate
](const PGMap
&pg_map
) {
400 PyEval_RestoreThread(tstate
);
401 pg_map
.dump_osd_ping_times(&f
);
404 } else if (what
== "osd_pool_stats") {
405 int64_t poolid
= -ENOENT
;
406 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
407 const PGMap
& pg_map
) {
408 PyEval_RestoreThread(tstate
);
409 f
.open_array_section("pool_stats");
410 for (auto &p
: osdmap
.get_pools()) {
412 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
417 } else if (what
== "health") {
418 cluster_state
.with_health(
419 [&f
, &tstate
](const ceph::bufferlist
&health_json
) {
420 PyEval_RestoreThread(tstate
);
421 f
.dump_string("json", health_json
.to_str());
424 } else if (what
== "mon_status") {
425 cluster_state
.with_mon_status(
426 [&f
, &tstate
](const ceph::bufferlist
&mon_status_json
) {
427 PyEval_RestoreThread(tstate
);
428 f
.dump_string("json", mon_status_json
.to_str());
431 } else if (what
== "mgr_map") {
432 cluster_state
.with_mgrmap([&f
, &tstate
](const MgrMap
&mgr_map
) {
433 PyEval_RestoreThread(tstate
);
438 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
439 PyEval_RestoreThread(tstate
);
// Activate a single module: create its ActivePyModule wrapper, record it in
// pending_modules, then load it asynchronously on the finisher thread; on
// success the module is inserted into `modules` and its serve() thread is
// started.
444 void ActivePyModules::start_one(PyModuleRef py_module
)
446 std::lock_guard
l(lock
);
448 const auto name
= py_module
->get_name();
449 auto active_module
= std::make_shared
<ActivePyModule
>(py_module
, clog
);
// Track the module as pending until the async load below completes.
451 pending_modules
.insert(name
);
452 // Send all python calls down a Finisher to avoid blocking
453 // C++ code, and avoid any potential lock cycles.
454 finisher
.queue(new LambdaContext([this, active_module
, name
](int) {
455 int r
= active_module
->load(this);
// Re-take the lock on the finisher thread before mutating shared state.
456 std::lock_guard
l(lock
);
457 pending_modules
.erase(name
);
459 derr
<< "Failed to run module in active mode ('" << name
<< "')"
462 auto em
= modules
.emplace(name
, active_module
);
463 ceph_assert(em
.second
); // actually inserted
465 dout(4) << "Starting thread for " << name
<< dendl
;
466 active_module
->thread
.create(active_module
->get_thread_name());
// Two-phase shutdown: first ask every module to stop (shutdown()), then join
// each module's serve() thread, and finally drain the command finisher.
471 void ActivePyModules::shutdown()
473 std::lock_guard
locker(lock
);
475 // Signal modules to drop out of serve() and/or tear down resources
476 for (auto& [name
, module
] : modules
) {
478 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
480 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
484 // For modules implementing serve(), finish the threads where we
485 // were running that.
486 for (auto& [name
, module
] : modules
) {
488 dout(10) << "joining module " << name
<< dendl
;
489 module
->thread
.join();
490 dout(10) << "joined module " << name
<< dendl
;
// Wait for any queued command completions to finish before returning.
494 cmd_finisher
.wait_for_empty();
// Fan a (notify_type, notify_id) notification out to every active module.
// Each notify is queued on the finisher rather than called inline.
// NOTE(review): the parameter tokens below read "¬ify_type"/"¬ify_id" —
// this looks like "&not" HTML-entity mangling of "&notify_type"/"&notify_id"
// introduced by extraction; confirm against the upstream file.
500 void ActivePyModules::notify_all(const std::string
&notify_type
,
501 const std::string
&notify_id
)
503 std::lock_guard
l(lock
);
505 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
506 for (auto& [name
, module
] : modules
) {
507 // Send all python calls down a Finisher to avoid blocking
508 // C++ code, and avoid any potential lock cycles.
509 dout(15) << "queuing notify to " << name
<< dendl
;
510 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
511 finisher
.queue(new LambdaContext([module
=module
, notify_type
, notify_id
]
513 module
->notify(notify_type
, notify_id
);
// Fan a cluster-log entry out to every active module via notify_clog().
// The LogEntry is captured by copy into the queued lambda (see comment
// below) because the caller's instance may not outlive the queue.
518 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
520 std::lock_guard
l(lock
);
522 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
523 for (auto& [name
, module
] : modules
) {
524 // Send all python calls down a Finisher to avoid blocking
525 // C++ code, and avoid any potential lock cycles.
527 // Note intentional use of non-reference lambda binding on
528 // log_entry: we take a copy because caller's instance is
529 // probably ephemeral.
530 dout(15) << "queuing notify (clog) to " << name
<< dendl
;
531 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
532 finisher
.queue(new LambdaContext([module
=module
, log_entry
](int r
){
533 module
->notify_clog(log_entry
);
// Look up a module's config-key store value from the in-memory store_cache.
// Keys are namespaced as <config_prefix><module_name>/<key>.  Drops the GIL
// around the lock acquisition (standard pattern in this file).
538 bool ActivePyModules::get_store(const std::string
&module_name
,
539 const std::string
&key
, std::string
*val
) const
541 PyThreadState
*tstate
= PyEval_SaveThread();
542 std::lock_guard
l(lock
);
543 PyEval_RestoreThread(tstate
);
545 const std::string global_key
= PyModule::config_prefix
546 + module_name
+ "/" + key
;
548 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
550 auto i
= store_cache
.find(global_key
);
551 if (i
!= store_cache
.end()) {
// Invoke `method` on another active module (inter-module "remote" call),
// forwarding args/kwargs and the error out-parameter.  The target module is
// asserted to exist — callers are expected to have validated availability.
559 PyObject
*ActivePyModules::dispatch_remote(
560 const std::string
&other_module
,
561 const std::string
&method
,
566 auto mod_iter
= modules
.find(other_module
);
567 ceph_assert(mod_iter
!= modules
.end());
569 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
// Look up a module's configuration value from module_config.config under
// the namespaced key <config_prefix><module_name>/<key>, holding
// module_config.lock for the read.
572 bool ActivePyModules::get_config(const std::string
&module_name
,
573 const std::string
&key
, std::string
*val
) const
575 const std::string global_key
= PyModule::config_prefix
576 + module_name
+ "/" + key
;
578 dout(20) << " key: " << global_key
<< dendl
;
580 std::lock_guard
lock(module_config
.lock
);
582 auto i
= module_config
.config
.find(global_key
);
583 if (i
!= module_config
.config
.end()) {
// Fetch a module config value and convert it to its declared Python type via
// the module's get_typed_option_value().  Tries the prefixed key
// ("<prefix>/<key>") first, then falls back to the bare key; returns to
// Python-land with the GIL restored on every path.
591 PyObject
*ActivePyModules::get_typed_config(
592 const std::string
&module_name
,
593 const std::string
&key
,
594 const std::string
&prefix
) const
596 PyThreadState
*tstate
= PyEval_SaveThread();
598 std::string final_key
;
// First attempt: prefixed key.
601 final_key
= prefix
+ "/" + key
;
602 found
= get_config(module_name
, final_key
, &value
);
// Fallback attempt with final_key reassigned (reassignment not visible in
// this excerpt).
606 found
= get_config(module_name
, final_key
, &value
);
609 PyModuleRef module
= py_module_registry
.get_module(module_name
);
610 PyEval_RestoreThread(tstate
);
612 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
615 dout(10) << __func__
<< " " << final_key
<< " found: " << value
<< dendl
;
616 return module
->get_typed_option_value(key
, value
);
// Not-found path: restore the GIL before logging and returning.
618 PyEval_RestoreThread(tstate
);
620 dout(10) << " [" << prefix
<< "/]" << key
<< " not found "
623 dout(10) << " " << key
<< " not found " << dendl
;
// Return all store_cache entries whose key starts with
// <config_prefix><module_name-derived base><prefix>, as a Python dict whose
// keys have the base prefix stripped.  Takes both ActivePyModules::lock and
// module_config.lock with the GIL dropped.
628 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
629 const std::string
&prefix
) const
631 PyThreadState
*tstate
= PyEval_SaveThread();
632 std::lock_guard
l(lock
);
633 std::lock_guard
lock(module_config
.lock
);
634 PyEval_RestoreThread(tstate
);
636 const std::string base_prefix
= PyModule::config_prefix
638 const std::string global_prefix
= base_prefix
+ prefix
;
639 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
// Ordered-map range scan: start at lower_bound and stop when keys no longer
// begin with the global prefix.
643 for (auto p
= store_cache
.lower_bound(global_prefix
);
644 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0;
// Strip base_prefix from the emitted key via pointer arithmetic on c_str().
646 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
// Set (val engaged) or delete (val empty) a module's config-key store entry:
// update the local store_cache under the lock, then persist the change to the
// monitors with a "config-key set"/"config-key del" command.
651 void ActivePyModules::set_store(const std::string
&module_name
,
652 const std::string
&key
, const boost::optional
<std::string
>& val
)
654 const std::string global_key
= PyModule::config_prefix
655 + module_name
+ "/" + key
;
659 std::lock_guard
l(lock
);
// Engaged optional -> store; empty optional -> erase.
661 store_cache
[global_key
] = *val
;
663 store_cache
.erase(global_key
);
// Build the mon command JSON mirroring the cache update above.
666 std::ostringstream cmd_json
;
668 jf
.open_object_section("cmd");
670 jf
.dump_string("prefix", "config-key set");
671 jf
.dump_string("key", global_key
);
672 jf
.dump_string("val", *val
);
674 jf
.dump_string("prefix", "config-key del");
675 jf
.dump_string("key", global_key
);
679 set_cmd
.run(&monc
, cmd_json
.str());
683 if (set_cmd
.r
!= 0) {
684 // config-key set will fail if mgr's auth key has insufficient
685 // permission to set config keys
686 // FIXME: should this somehow raise an exception back into Python land?
687 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
688 << cpp_strerror(set_cmd
.r
) << dendl
;
689 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
// Set or clear (empty optional) a module configuration option; persistence
// is delegated to module_config.set_config() via the mon client.
693 void ActivePyModules::set_config(const std::string
&module_name
,
694 const std::string
&key
, const boost::optional
<std::string
>& val
)
696 module_config
.set_config(&monc
, module_name
, key
, val
);
// Collect the URI advertised by each active module (via get_uri()), keyed by
// module name; modules with an empty URI are omitted.
699 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
701 std::map
<std::string
, std::string
> result
;
702 std::lock_guard
l(lock
);
703 for (const auto& [name
, module
] : modules
) {
704 std::string svc_str
= module
->get_uri();
705 if (!svc_str
.empty()) {
706 result
[name
] = svc_str
;
// Common driver for the perf-counter getters: open an array section named
// after `path`, look up the daemon (svc_name, svc_id), and if the counter
// path exists invoke `fct` with the counter instance/type and formatter.
// Logs (but does not fail hard) on a missing counter or missing daemon.
713 PyObject
* ActivePyModules::with_perf_counters(
714 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
715 const std::string
&svc_name
,
716 const std::string
&svc_id
,
717 const std::string
&path
) const
719 PyThreadState
*tstate
= PyEval_SaveThread();
720 std::lock_guard
l(lock
);
721 PyEval_RestoreThread(tstate
);
724 f
.open_array_section(path
.c_str());
726 auto metadata
= daemon_state
.get(DaemonKey
{svc_name
, svc_id
});
// Per-daemon lock guards access to its perf_counters maps.
728 std::lock_guard
l2(metadata
->lock
);
729 if (metadata
->perf_counters
.instances
.count(path
)) {
730 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
731 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
732 fct(counter_instance
, counter_type
, f
);
// Counter path not present on this daemon: log it and list known paths at
// a higher debug level.
734 dout(4) << "Missing counter: '" << path
<< "' ("
735 << svc_name
<< "." << svc_id
<< ")" << dendl
;
736 dout(20) << "Paths are:" << dendl
;
737 for (const auto &i
: metadata
->perf_counters
.instances
) {
738 dout(20) << i
.first
<< dendl
;
742 dout(4) << "No daemon state for "
743 << svc_name
<< "." << svc_id
<< ")" << dendl
;
// Return the full datapoint history for one counter as Python data.
// LONGRUNAVG counters emit (t, s, c) triples from get_data_avg(); plain
// counters emit (t, v) pairs from get_data().  Delegates daemon/counter
// lookup to with_perf_counters().
749 PyObject
* ActivePyModules::get_counter_python(
750 const std::string
&svc_name
,
751 const std::string
&svc_id
,
752 const std::string
&path
)
754 auto extract_counters
= [](
755 PerfCounterInstance
& counter_instance
,
756 PerfCounterType
& counter_type
,
759 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
760 const auto &avg_data
= counter_instance
.get_data_avg();
761 for (const auto &datapoint
: avg_data
) {
762 f
.open_array_section("datapoint");
763 f
.dump_float("t", datapoint
.t
);
764 f
.dump_unsigned("s", datapoint
.s
);
765 f
.dump_unsigned("c", datapoint
.c
);
769 const auto &data
= counter_instance
.get_data();
770 for (const auto &datapoint
: data
) {
771 f
.open_array_section("datapoint");
772 f
.dump_float("t", datapoint
.t
);
773 f
.dump_unsigned("v", datapoint
.v
);
778 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
// Return only the most recent datapoint for one counter: (t, s, c) for
// LONGRUNAVG counters via get_latest_data_avg(), otherwise (t, v) via
// get_latest_data().  Same lookup plumbing as get_counter_python.
781 PyObject
* ActivePyModules::get_latest_counter_python(
782 const std::string
&svc_name
,
783 const std::string
&svc_id
,
784 const std::string
&path
)
786 auto extract_latest_counters
= [](
787 PerfCounterInstance
& counter_instance
,
788 PerfCounterType
& counter_type
,
791 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
792 const auto &datapoint
= counter_instance
.get_latest_data_avg();
793 f
.dump_float("t", datapoint
.t
);
794 f
.dump_unsigned("s", datapoint
.s
);
795 f
.dump_unsigned("c", datapoint
.c
);
797 const auto &datapoint
= counter_instance
.get_latest_data();
798 f
.dump_float("t", datapoint
.t
);
799 f
.dump_unsigned("v", datapoint
.v
);
802 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
// Return the perf-counter schema (description, nick, type bits, priority,
// units) for a set of daemons selected by svc_type/svc_id: empty type means
// all daemons, empty id means all daemons of that type, otherwise one
// specific daemon.
805 PyObject
* ActivePyModules::get_perf_schema_python(
806 const std::string
&svc_type
,
807 const std::string
&svc_id
)
809 PyThreadState
*tstate
= PyEval_SaveThread();
810 std::lock_guard
l(lock
);
811 PyEval_RestoreThread(tstate
);
813 DaemonStateCollection daemons
;
815 if (svc_type
== "") {
816 daemons
= daemon_state
.get_all();
817 } else if (svc_id
.empty()) {
818 daemons
= daemon_state
.get_by_service(svc_type
);
820 auto key
= DaemonKey
{svc_type
, svc_id
};
821 // so that the below can be a loop in all cases
822 auto got
= daemon_state
.get(key
);
823 if (got
!= nullptr) {
829 if (!daemons
.empty()) {
830 for (auto& [key
, state
] : daemons
) {
831 f
.open_object_section(ceph::to_string(key
).c_str());
// Hold the daemon lock while walking its perf_counters schema.
833 std::lock_guard
l(state
->lock
);
834 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
835 const auto &counter_name
= ctr_inst_iter
.first
;
836 f
.open_object_section(counter_name
.c_str());
837 auto type
= state
->perf_counters
.types
[counter_name
];
838 f
.dump_string("description", type
.description
);
839 if (!type
.nick
.empty()) {
840 f
.dump_string("nick", type
.nick
);
842 f
.dump_unsigned("type", type
.type
);
843 f
.dump_unsigned("priority", type
.priority
);
844 f
.dump_unsigned("units", type
.unit
);
850 dout(4) << __func__
<< ": No daemon state found for "
851 << svc_type
<< "." << svc_id
<< ")" << dendl
;
// Expose the global CephContext to Python as an opaque PyCapsule so wrapped
// native types can retrieve it later.
856 PyObject
*ActivePyModules::get_context()
858 PyThreadState
*tstate
= PyEval_SaveThread();
859 std::lock_guard
l(lock
);
860 PyEval_RestoreThread(tstate
);
862 // Construct a capsule containing ceph context.
863 // Not incrementing/decrementing ref count on the context because
864 // it's the global one and it has process lifetime.
865 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
// Free helper: import `module_name`, look up class `clsname`, wrap the raw
// `wrapped` pointer in a PyCapsule, and construct an instance of the class
// with that capsule as its single constructor argument.  Used below for the
// mgr_module OSDMap wrapper.  Python errors are logged via handle_pyerror().
870 * Helper for our wrapped types that take a capsule in their constructor.
872 PyObject
*construct_with_capsule(
873 const std::string
&module_name
,
874 const std::string
&clsname
,
877 // Look up the OSDMap type which we will construct
878 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
880 derr
<< "Failed to import python module:" << dendl
;
881 derr
<< handle_pyerror() << dendl
;
885 PyObject
*wrapper_type
= PyObject_GetAttrString(
886 module
, (const char*)clsname
.c_str());
888 derr
<< "Failed to get python type:" << dendl
;
889 derr
<< handle_pyerror() << dendl
;
891 ceph_assert(wrapper_type
);
893 // Construct a capsule containing an OSDMap.
894 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
895 ceph_assert(wrapped_capsule
);
897 // Construct the python OSDMap
898 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
899 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
900 if (wrapper_instance
== nullptr) {
901 derr
<< "Failed to construct python OSDMap:" << dendl
;
902 derr
<< handle_pyerror() << dendl
;
904 ceph_assert(wrapper_instance
!= nullptr);
// Drop our temporary references; the instance keeps what it needs.
906 Py_DECREF(wrapped_capsule
);
908 Py_DECREF(wrapper_type
);
911 return wrapper_instance
;
// Hand Python its own deep copy of the current OSDMap, wrapped in the
// mgr_module.OSDMap capsule type.  The copy is heap-allocated and ownership
// passes to the Python wrapper via construct_with_capsule.
914 PyObject
*ActivePyModules::get_osdmap()
916 OSDMap
*newmap
= new OSDMap
;
918 PyThreadState
*tstate
= PyEval_SaveThread();
920 std::lock_guard
l(lock
);
// Copy under the cluster_state osdmap accessor so the snapshot is coherent.
921 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
922 newmap
->deepish_copy_from(o
);
925 PyEval_RestoreThread(tstate
);
927 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
// Install a module's health checks (moved in) and, if they changed, schedule
// an immediate mon report tick.  See the lock-ordering comment below for why
// schedule_tick must run after this->lock is released.
930 void ActivePyModules::set_health_checks(const std::string
& module_name
,
931 health_check_map_t
&& checks
)
933 bool changed
= false;
936 auto p
= modules
.find(module_name
);
937 if (p
!= modules
.end()) {
938 changed
= p
->second
->set_health_checks(std::move(checks
));
942 // immediately schedule a report to be sent to the monitors with the new
943 // health checks that have changed. This is done asynchronusly to avoid
944 // blocking python land. ActivePyModules::lock needs to be dropped to make
947 // send_report callers: DaemonServer::lock -> PyModuleRegistery::lock
948 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
950 // if we don't release this->lock before calling schedule_tick a cycle is
951 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
952 // This is still correct as send_report is run asynchronously under
953 // DaemonServer::lock.
955 server
.schedule_tick(0);
// Route a mgr command to the module that registered it.  If the module is
// not active, report that via *ss; otherwise forward everything to the
// module's own handle_command().
958 int ActivePyModules::handle_command(
959 const ModuleCommand
& module_command
,
960 const MgrSession
& session
,
961 const cmdmap_t
&cmdmap
,
962 const bufferlist
&inbuf
,
963 std::stringstream
*ds
,
964 std::stringstream
*ss
)
967 auto mod_iter
= modules
.find(module_command
.module_name
);
968 if (mod_iter
== modules
.end()) {
969 *ss
<< "Module '" << module_command
.module_name
<< "' is not available";
975 return mod_iter
->second
->handle_command(module_command
, session
, cmdmap
,
// Aggregate health checks from every active module into *checks.
979 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
981 std::lock_guard
l(lock
);
982 for (auto& [name
, module
] : modules
) {
983 dout(15) << "getting health checks for " << name
<< dendl
;
984 module
->get_health_checks(checks
);
// Create or update a progress event by id; operator[] inserts a fresh
// ProgressEvent if evid is new, then the fields are assigned.
988 void ActivePyModules::update_progress_event(
989 const std::string
& evid
,
990 const std::string
& desc
,
993 std::lock_guard
l(lock
);
994 auto& pe
= progress_events
[evid
];
996 pe
.progress
= progress
;
// Remove a finished progress event by id.
999 void ActivePyModules::complete_progress_event(const std::string
& evid
)
1001 std::lock_guard
l(lock
);
1002 progress_events
.erase(evid
);
// Drop every tracked progress event.
1005 void ActivePyModules::clear_all_progress_events()
1007 std::lock_guard
l(lock
);
1008 progress_events
.clear();
// Copy the current progress-event map out to the caller under the lock.
1011 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
1013 std::lock_guard
l(lock
);
1014 *events
= progress_events
;
// Notify every active module that configuration changed; each callback is
// queued on the finisher rather than invoked inline.
1017 void ActivePyModules::config_notify()
1019 std::lock_guard
l(lock
);
1020 for (auto& [name
, module
] : modules
) {
1021 // Send all python calls down a Finisher to avoid blocking
1022 // C++ code, and avoid any potential lock cycles.
1023 dout(15) << "notify (config) " << name
<< dendl
;
1024 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
1025 finisher
.queue(new LambdaContext([module
=module
](int r
){
1026 module
->config_notify();
// Record the URI a module advertises (e.g. for `ceph mgr services`).
// modules.at() will throw if the module is not active.
1031 void ActivePyModules::set_uri(const std::string
& module_name
,
1032 const std::string
&uri
)
1034 std::lock_guard
l(lock
);
1036 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
1038 modules
.at(module_name
)->set_uri(uri
);
// Register an OSD perf metric query (with optional limit) with the daemon
// server; returns the server-assigned query id.
1041 MetricQueryID
ActivePyModules::add_osd_perf_query(
1042 const OSDPerfMetricQuery
&query
,
1043 const std::optional
<OSDPerfMetricLimit
> &limit
)
1045 return server
.add_osd_perf_query(query
, limit
)
;
// Remove a previously registered OSD perf metric query; failure is logged
// but not propagated to the caller.
1048 void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id
)
1050 int r
= server
.remove_osd_perf_query(query_id
);
1052 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
1053 << cpp_strerror(r
) << dendl
;
// Fetch the accumulated counters for an OSD perf query and serialize them
// into a compact nested structure: "counters" -> per-instance objects with
// "k" (list of sub-key strings) and "c" (list of [first, second] pairs).
1057 PyObject
*ActivePyModules::get_osd_perf_counters(MetricQueryID query_id
)
1059 std::map
<OSDPerfMetricKey
, PerformanceCounters
> counters
;
1061 int r
= server
.get_osd_perf_counters(query_id
, &counters
);
// Failure is logged; see below for what is returned in this excerpt.
1063 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1064 << cpp_strerror(r
) << dendl
;
1070 f
.open_array_section("counters");
1071 for (auto &it
: counters
) {
1072 auto &key
= it
.first
;
1073 auto &instance_counters
= it
.second
;
1074 f
.open_object_section("i");
1075 f
.open_array_section("k");
1076 for (auto &sub_key
: key
) {
1077 f
.open_array_section("s");
1078 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1079 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1081 f
.close_section(); // s
1083 f
.close_section(); // k
1084 f
.open_array_section("c");
1085 for (auto &c
: instance_counters
) {
1086 f
.open_array_section("p");
1087 f
.dump_unsigned("0", c
.first
);
1088 f
.dump_unsigned("1", c
.second
);
1089 f
.close_section(); // p
1091 f
.close_section(); // c
1092 f
.close_section(); // i
1094 f
.close_section(); // counters
// Emit a message on a named cluster-log channel at the given priority.
// The channel is (re)created via the mon client's log client, its routing
// options (monitors/syslog/graylog) parsed from config and applied, then the
// message is logged.
1099 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1100 const std::string
&message
)
1102 std::lock_guard
l(lock
);
1104 auto cl
= monc
.get_log_client()->create_channel(channel
);
// Per-channel routing option maps filled in by parse_log_client_options.
1105 map
<string
,string
> log_to_monitors
;
1106 map
<string
,string
> log_to_syslog
;
1107 map
<string
,string
> log_channel
;
1108 map
<string
,string
> log_prio
;
1109 map
<string
,string
> log_to_graylog
;
1110 map
<string
,string
> log_to_graylog_host
;
1111 map
<string
,string
> log_to_graylog_port
;
1114 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
1115 log_channel
, log_prio
, log_to_graylog
,
1116 log_to_graylog_host
, log_to_graylog_port
,
1118 cl
->update_config(log_to_monitors
, log_to_syslog
,
1119 log_channel
, log_prio
, log_to_graylog
,
1120 log_to_graylog_host
, log_to_graylog_port
,
1122 cl
->do_log(prio
, message
);
// Register a module's messenger client address vector (parsed from the
// `addrs` string) with the module registry under `name`.
1125 void ActivePyModules::register_client(std::string_view name
, std::string addrs
)
1127 std::lock_guard
l(lock
);
1129 entity_addrvec_t addrv
;
1130 addrv
.parse(addrs
.data());
1132 dout(7) << "registering msgr client handle " << addrv
<< dendl
;
1133 py_module_registry
.register_client(name
, std::move(addrv
));
// Mirror of register_client: parse the address vector and remove the
// matching client registration from the module registry.
1136 void ActivePyModules::unregister_client(std::string_view name
, std::string addrs
)
1138 std::lock_guard
l(lock
);
1140 entity_addrvec_t addrv
;
1141 addrv
.parse(addrs
.data());
1143 dout(7) << "unregistering msgr client handle " << addrv
<< dendl
;
1144 py_module_registry
.unregister_client(name
, addrv
);