1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
17 #include "common/errno.h"
18 #include "include/stringify.h"
20 #include "PyFormatter.h"
22 #include "osd/OSDMap.h"
23 #include "mon/MonMap.h"
25 #include "mgr/MgrContext.h"
27 // For ::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
32 #include "DaemonKey.h"
33 #include "DaemonServer.h"
35 #define dout_context g_ceph_context
36 #define dout_subsys ceph_subsys_mgr
38 #define dout_prefix *_dout << "mgr " << __func__ << " "
40 ActivePyModules::ActivePyModules(PyModuleConfig
&module_config_
,
41 std::map
<std::string
, std::string
> store_data
,
42 DaemonStateIndex
&ds
, ClusterState
&cs
,
43 MonClient
&mc
, LogChannelRef clog_
,
44 LogChannelRef audit_clog_
, Objecter
&objecter_
,
45 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
46 PyModuleRegistry
&pmr
)
47 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
48 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
49 client(client_
), finisher(f
),
50 cmd_finisher(g_ceph_context
, "cmd_finisher", "cmdfin"),
51 server(server
), py_module_registry(pmr
)
53 store_cache
= std::move(store_data
);
// Destructor: explicitly defaulted, defined out of line in this file.
57 ActivePyModules::~ActivePyModules() = default;
59 void ActivePyModules::dump_server(const std::string
&hostname
,
60 const DaemonStateCollection
&dmc
,
63 f
->dump_string("hostname", hostname
);
64 f
->open_array_section("services");
65 std::string ceph_version
;
67 for (const auto &[key
, state
] : dmc
) {
68 std::lock_guard
l(state
->lock
);
69 // TODO: pick the highest version, and make sure that
70 // somewhere else (during health reporting?) we are
71 // indicating to the user if we see mixed versions
72 auto ver_iter
= state
->metadata
.find("ceph_version");
73 if (ver_iter
!= state
->metadata
.end()) {
74 ceph_version
= state
->metadata
.at("ceph_version");
77 f
->open_object_section("service");
78 f
->dump_string("type", key
.type
);
79 f
->dump_string("id", key
.name
);
84 f
->dump_string("ceph_version", ceph_version
);
89 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
91 PyThreadState
*tstate
= PyEval_SaveThread();
92 std::lock_guard
l(lock
);
93 PyEval_RestoreThread(tstate
);
94 dout(10) << " (" << hostname
<< ")" << dendl
;
96 auto dmc
= daemon_state
.get_by_server(hostname
);
99 dump_server(hostname
, dmc
, &f
);
104 PyObject
*ActivePyModules::list_servers_python()
106 PyFormatter
f(false, true);
107 PyThreadState
*tstate
= PyEval_SaveThread();
108 dout(10) << " >" << dendl
;
110 daemon_state
.with_daemons_by_server([this, &f
, &tstate
]
111 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
112 PyEval_RestoreThread(tstate
);
114 for (const auto &i
: all
) {
115 const auto &hostname
= i
.first
;
117 f
.open_object_section("server");
118 dump_server(hostname
, i
.second
, &f
);
126 PyObject
*ActivePyModules::get_metadata_python(
127 const std::string
&svc_type
,
128 const std::string
&svc_id
)
130 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
131 if (metadata
== nullptr) {
132 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
136 std::lock_guard
l(metadata
->lock
);
138 f
.dump_string("hostname", metadata
->hostname
);
139 for (const auto &i
: metadata
->metadata
) {
140 f
.dump_string(i
.first
.c_str(), i
.second
);
146 PyObject
*ActivePyModules::get_daemon_status_python(
147 const std::string
&svc_type
,
148 const std::string
&svc_id
)
150 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
151 if (metadata
== nullptr) {
152 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
156 std::lock_guard
l(metadata
->lock
);
158 for (const auto &i
: metadata
->service_status
) {
159 f
.dump_string(i
.first
.c_str(), i
.second
);
164 PyObject
*ActivePyModules::get_python(const std::string
&what
)
168 // Drop the GIL, as most of the following blocks will block on
169 // a mutex -- they are all responsible for re-taking the GIL before
170 // touching the PyFormatter instance or returning from the function.
171 PyThreadState
*tstate
= PyEval_SaveThread();
173 if (what
== "fs_map") {
174 cluster_state
.with_fsmap([&f
, &tstate
](const FSMap
&fsmap
) {
175 PyEval_RestoreThread(tstate
);
179 } else if (what
== "osdmap_crush_map_text") {
181 cluster_state
.with_osdmap([&rdata
, &tstate
](const OSDMap
&osd_map
){
182 PyEval_RestoreThread(tstate
);
183 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
185 std::string crush_text
= rdata
.to_str();
186 return PyUnicode_FromString(crush_text
.c_str());
187 } else if (what
.substr(0, 7) == "osd_map") {
188 cluster_state
.with_osdmap([&f
, &what
, &tstate
](const OSDMap
&osd_map
){
189 PyEval_RestoreThread(tstate
);
190 if (what
== "osd_map") {
192 } else if (what
== "osd_map_tree") {
193 osd_map
.print_tree(&f
, nullptr);
194 } else if (what
== "osd_map_crush") {
195 osd_map
.crush
->dump(&f
);
199 } else if (what
== "modified_config_options") {
200 PyEval_RestoreThread(tstate
);
201 auto all_daemons
= daemon_state
.get_all();
203 for (auto& [key
, daemon
] : all_daemons
) {
204 std::lock_guard
l(daemon
->lock
);
205 for (auto& [name
, valmap
] : daemon
->config
) {
209 f
.open_array_section("options");
210 for (auto& name
: names
) {
211 f
.dump_string("name", name
);
215 } else if (what
.substr(0, 6) == "config") {
216 PyEval_RestoreThread(tstate
);
217 if (what
== "config_options") {
218 g_conf().config_options(&f
);
219 } else if (what
== "config") {
220 g_conf().show_config(&f
);
223 } else if (what
== "mon_map") {
224 cluster_state
.with_monmap(
225 [&f
, &tstate
](const MonMap
&monmap
) {
226 PyEval_RestoreThread(tstate
);
231 } else if (what
== "service_map") {
232 cluster_state
.with_servicemap(
233 [&f
, &tstate
](const ServiceMap
&service_map
) {
234 PyEval_RestoreThread(tstate
);
235 service_map
.dump(&f
);
239 } else if (what
== "osd_metadata") {
240 auto dmc
= daemon_state
.get_by_service("osd");
241 PyEval_RestoreThread(tstate
);
243 for (const auto &[key
, state
] : dmc
) {
244 std::lock_guard
l(state
->lock
);
245 f
.open_object_section(key
.name
.c_str());
246 f
.dump_string("hostname", state
->hostname
);
247 for (const auto &[name
, val
] : state
->metadata
) {
248 f
.dump_string(name
.c_str(), val
);
253 } else if (what
== "mds_metadata") {
254 auto dmc
= daemon_state
.get_by_service("mds");
255 PyEval_RestoreThread(tstate
);
257 for (const auto &[key
, state
] : dmc
) {
258 std::lock_guard
l(state
->lock
);
259 f
.open_object_section(key
.name
.c_str());
260 f
.dump_string("hostname", state
->hostname
);
261 for (const auto &[name
, val
] : state
->metadata
) {
262 f
.dump_string(name
.c_str(), val
);
267 } else if (what
== "pg_summary") {
268 cluster_state
.with_pgmap(
269 [&f
, &tstate
](const PGMap
&pg_map
) {
270 PyEval_RestoreThread(tstate
);
272 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
273 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
274 std::map
<std::string
, uint32_t> all
;
275 for (const auto &i
: pg_map
.pg_stat
) {
276 const auto pool
= i
.first
.m_pool
;
277 const std::string state
= pg_state_string(i
.second
.state
);
278 // Insert to per-pool map
279 pools
[stringify(pool
)][state
]++;
280 for (const auto &osd_id
: i
.second
.acting
) {
281 osds
[stringify(osd_id
)][state
]++;
285 f
.open_object_section("by_osd");
286 for (const auto &i
: osds
) {
287 f
.open_object_section(i
.first
.c_str());
288 for (const auto &j
: i
.second
) {
289 f
.dump_int(j
.first
.c_str(), j
.second
);
294 f
.open_object_section("by_pool");
295 for (const auto &i
: pools
) {
296 f
.open_object_section(i
.first
.c_str());
297 for (const auto &j
: i
.second
) {
298 f
.dump_int(j
.first
.c_str(), j
.second
);
303 f
.open_object_section("all");
304 for (const auto &i
: all
) {
305 f
.dump_int(i
.first
.c_str(), i
.second
);
308 f
.open_object_section("pg_stats_sum");
309 pg_map
.pg_sum
.dump(&f
);
314 } else if (what
== "pg_status") {
315 cluster_state
.with_pgmap(
316 [&f
, &tstate
](const PGMap
&pg_map
) {
317 PyEval_RestoreThread(tstate
);
318 pg_map
.print_summary(&f
, nullptr);
322 } else if (what
== "pg_dump") {
323 cluster_state
.with_pgmap(
324 [&f
, &tstate
](const PGMap
&pg_map
) {
325 PyEval_RestoreThread(tstate
);
326 pg_map
.dump(&f
, false);
330 } else if (what
== "devices") {
331 daemon_state
.with_devices2(
333 PyEval_RestoreThread(tstate
);
334 f
.open_array_section("devices");
336 [&f
] (const DeviceState
& dev
) {
337 f
.dump_object("device", dev
);
341 } else if (what
.size() > 7 &&
342 what
.substr(0, 7) == "device ") {
343 string devid
= what
.substr(7);
344 if (!daemon_state
.with_device(
346 [&f
, &tstate
] (const DeviceState
& dev
) {
347 PyEval_RestoreThread(tstate
);
348 f
.dump_object("device", dev
);
351 PyEval_RestoreThread(tstate
);
354 } else if (what
== "io_rate") {
355 cluster_state
.with_pgmap(
356 [&f
, &tstate
](const PGMap
&pg_map
) {
357 PyEval_RestoreThread(tstate
);
358 pg_map
.dump_delta(&f
);
362 } else if (what
== "df") {
363 cluster_state
.with_osdmap_and_pgmap(
365 const OSDMap
& osd_map
,
366 const PGMap
&pg_map
) {
367 PyEval_RestoreThread(tstate
);
368 pg_map
.dump_cluster_stats(nullptr, &f
, true);
369 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
372 } else if (what
== "pg_stats") {
373 cluster_state
.with_pgmap(
374 [&f
, &tstate
](const PGMap
&pg_map
) {
375 PyEval_RestoreThread(tstate
);
376 pg_map
.dump_pg_stats(&f
, false);
379 } else if (what
== "pool_stats") {
380 cluster_state
.with_pgmap(
381 [&f
, &tstate
](const PGMap
&pg_map
) {
382 PyEval_RestoreThread(tstate
);
383 pg_map
.dump_pool_stats(&f
);
386 } else if (what
== "pg_ready") {
387 PyEval_RestoreThread(tstate
);
388 server
.dump_pg_ready(&f
);
390 } else if (what
== "osd_stats") {
391 cluster_state
.with_pgmap(
392 [&f
, &tstate
](const PGMap
&pg_map
) {
393 PyEval_RestoreThread(tstate
);
394 pg_map
.dump_osd_stats(&f
, false);
397 } else if (what
== "osd_ping_times") {
398 cluster_state
.with_pgmap(
399 [&f
, &tstate
](const PGMap
&pg_map
) {
400 PyEval_RestoreThread(tstate
);
401 pg_map
.dump_osd_ping_times(&f
);
404 } else if (what
== "osd_pool_stats") {
405 int64_t poolid
= -ENOENT
;
406 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
407 const PGMap
& pg_map
) {
408 PyEval_RestoreThread(tstate
);
409 f
.open_array_section("pool_stats");
410 for (auto &p
: osdmap
.get_pools()) {
412 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
417 } else if (what
== "health" || what
== "mon_status") {
419 if (what
== "health") {
420 json
= cluster_state
.get_health();
421 } else if (what
== "mon_status") {
422 json
= cluster_state
.get_mon_status();
427 PyEval_RestoreThread(tstate
);
428 f
.dump_string("json", json
.to_str());
430 } else if (what
== "mgr_map") {
431 cluster_state
.with_mgrmap([&f
, &tstate
](const MgrMap
&mgr_map
) {
432 PyEval_RestoreThread(tstate
);
437 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
438 PyEval_RestoreThread(tstate
);
443 void ActivePyModules::start_one(PyModuleRef py_module
)
445 std::lock_guard
l(lock
);
447 const auto name
= py_module
->get_name();
448 auto em
= modules
.emplace(name
,
449 std::make_shared
<ActivePyModule
>(py_module
, clog
));
450 ceph_assert(em
.second
); // actually inserted
451 auto& active_module
= em
.first
->second
;
453 // Send all python calls down a Finisher to avoid blocking
454 // C++ code, and avoid any potential lock cycles.
455 finisher
.queue(new LambdaContext([this, active_module
, name
](int) {
456 int r
= active_module
->load(this);
458 derr
<< "Failed to run module in active mode ('" << name
<< "')"
460 std::lock_guard
l(lock
);
463 dout(4) << "Starting thread for " << name
<< dendl
;
464 active_module
->thread
.create(active_module
->get_thread_name());
469 void ActivePyModules::shutdown()
471 std::lock_guard
locker(lock
);
473 // Signal modules to drop out of serve() and/or tear down resources
474 for (auto& [name
, module
] : modules
) {
476 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
478 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
482 // For modules implementing serve(), finish the threads where we
483 // were running that.
484 for (auto& [name
, module
] : modules
) {
486 dout(10) << "joining module " << name
<< dendl
;
487 module
->thread
.join();
488 dout(10) << "joined module " << name
<< dendl
;
492 cmd_finisher
.wait_for_empty();
498 void ActivePyModules::notify_all(const std::string
¬ify_type
,
499 const std::string
¬ify_id
)
501 std::lock_guard
l(lock
);
503 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
504 for (auto& [name
, module
] : modules
) {
505 // Send all python calls down a Finisher to avoid blocking
506 // C++ code, and avoid any potential lock cycles.
507 dout(15) << "queuing notify to " << name
<< dendl
;
508 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
509 finisher
.queue(new LambdaContext([module
=module
, notify_type
, notify_id
]
511 module
->notify(notify_type
, notify_id
);
516 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
518 std::lock_guard
l(lock
);
520 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
521 for (auto& [name
, module
] : modules
) {
522 // Send all python calls down a Finisher to avoid blocking
523 // C++ code, and avoid any potential lock cycles.
525 // Note intentional use of non-reference lambda binding on
526 // log_entry: we take a copy because caller's instance is
527 // probably ephemeral.
528 dout(15) << "queuing notify (clog) to " << name
<< dendl
;
529 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
530 finisher
.queue(new LambdaContext([module
=module
, log_entry
](int r
){
531 module
->notify_clog(log_entry
);
536 bool ActivePyModules::get_store(const std::string
&module_name
,
537 const std::string
&key
, std::string
*val
) const
539 PyThreadState
*tstate
= PyEval_SaveThread();
540 std::lock_guard
l(lock
);
541 PyEval_RestoreThread(tstate
);
543 const std::string global_key
= PyModule::config_prefix
544 + module_name
+ "/" + key
;
546 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
548 auto i
= store_cache
.find(global_key
);
549 if (i
!= store_cache
.end()) {
557 PyObject
*ActivePyModules::dispatch_remote(
558 const std::string
&other_module
,
559 const std::string
&method
,
564 auto mod_iter
= modules
.find(other_module
);
565 ceph_assert(mod_iter
!= modules
.end());
567 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
570 bool ActivePyModules::get_config(const std::string
&module_name
,
571 const std::string
&key
, std::string
*val
) const
573 const std::string global_key
= PyModule::config_prefix
574 + module_name
+ "/" + key
;
576 dout(20) << " key: " << global_key
<< dendl
;
578 std::lock_guard
lock(module_config
.lock
);
580 auto i
= module_config
.config
.find(global_key
);
581 if (i
!= module_config
.config
.end()) {
589 PyObject
*ActivePyModules::get_typed_config(
590 const std::string
&module_name
,
591 const std::string
&key
,
592 const std::string
&prefix
) const
594 PyThreadState
*tstate
= PyEval_SaveThread();
596 std::string final_key
;
599 final_key
= prefix
+ "/" + key
;
600 found
= get_config(module_name
, final_key
, &value
);
604 found
= get_config(module_name
, final_key
, &value
);
607 PyModuleRef module
= py_module_registry
.get_module(module_name
);
608 PyEval_RestoreThread(tstate
);
610 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
613 dout(10) << __func__
<< " " << final_key
<< " found: " << value
<< dendl
;
614 return module
->get_typed_option_value(key
, value
);
616 PyEval_RestoreThread(tstate
);
618 dout(10) << " [" << prefix
<< "/]" << key
<< " not found "
621 dout(10) << " " << key
<< " not found " << dendl
;
626 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
627 const std::string
&prefix
) const
629 PyThreadState
*tstate
= PyEval_SaveThread();
630 std::lock_guard
l(lock
);
631 std::lock_guard
lock(module_config
.lock
);
632 PyEval_RestoreThread(tstate
);
634 const std::string base_prefix
= PyModule::config_prefix
636 const std::string global_prefix
= base_prefix
+ prefix
;
637 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
641 for (auto p
= store_cache
.lower_bound(global_prefix
);
642 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0;
644 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
649 void ActivePyModules::set_store(const std::string
&module_name
,
650 const std::string
&key
, const boost::optional
<std::string
>& val
)
652 const std::string global_key
= PyModule::config_prefix
653 + module_name
+ "/" + key
;
657 std::lock_guard
l(lock
);
659 store_cache
[global_key
] = *val
;
661 store_cache
.erase(global_key
);
664 std::ostringstream cmd_json
;
666 jf
.open_object_section("cmd");
668 jf
.dump_string("prefix", "config-key set");
669 jf
.dump_string("key", global_key
);
670 jf
.dump_string("val", *val
);
672 jf
.dump_string("prefix", "config-key del");
673 jf
.dump_string("key", global_key
);
677 set_cmd
.run(&monc
, cmd_json
.str());
681 if (set_cmd
.r
!= 0) {
682 // config-key set will fail if mgr's auth key has insufficient
683 // permission to set config keys
684 // FIXME: should this somehow raise an exception back into Python land?
685 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
686 << cpp_strerror(set_cmd
.r
) << dendl
;
687 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
691 void ActivePyModules::set_config(const std::string
&module_name
,
692 const std::string
&key
, const boost::optional
<std::string
>& val
)
694 module_config
.set_config(&monc
, module_name
, key
, val
);
697 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
699 std::map
<std::string
, std::string
> result
;
700 std::lock_guard
l(lock
);
701 for (const auto& [name
, module
] : modules
) {
702 std::string svc_str
= module
->get_uri();
703 if (!svc_str
.empty()) {
704 result
[name
] = svc_str
;
711 PyObject
* ActivePyModules::with_perf_counters(
712 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
713 const std::string
&svc_name
,
714 const std::string
&svc_id
,
715 const std::string
&path
) const
717 PyThreadState
*tstate
= PyEval_SaveThread();
718 std::lock_guard
l(lock
);
719 PyEval_RestoreThread(tstate
);
722 f
.open_array_section(path
.c_str());
724 auto metadata
= daemon_state
.get(DaemonKey
{svc_name
, svc_id
});
726 std::lock_guard
l2(metadata
->lock
);
727 if (metadata
->perf_counters
.instances
.count(path
)) {
728 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
729 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
730 fct(counter_instance
, counter_type
, f
);
732 dout(4) << "Missing counter: '" << path
<< "' ("
733 << svc_name
<< "." << svc_id
<< ")" << dendl
;
734 dout(20) << "Paths are:" << dendl
;
735 for (const auto &i
: metadata
->perf_counters
.instances
) {
736 dout(20) << i
.first
<< dendl
;
740 dout(4) << "No daemon state for "
741 << svc_name
<< "." << svc_id
<< ")" << dendl
;
747 PyObject
* ActivePyModules::get_counter_python(
748 const std::string
&svc_name
,
749 const std::string
&svc_id
,
750 const std::string
&path
)
752 auto extract_counters
= [](
753 PerfCounterInstance
& counter_instance
,
754 PerfCounterType
& counter_type
,
757 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
758 const auto &avg_data
= counter_instance
.get_data_avg();
759 for (const auto &datapoint
: avg_data
) {
760 f
.open_array_section("datapoint");
761 f
.dump_float("t", datapoint
.t
);
762 f
.dump_unsigned("s", datapoint
.s
);
763 f
.dump_unsigned("c", datapoint
.c
);
767 const auto &data
= counter_instance
.get_data();
768 for (const auto &datapoint
: data
) {
769 f
.open_array_section("datapoint");
770 f
.dump_float("t", datapoint
.t
);
771 f
.dump_unsigned("v", datapoint
.v
);
776 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
779 PyObject
* ActivePyModules::get_latest_counter_python(
780 const std::string
&svc_name
,
781 const std::string
&svc_id
,
782 const std::string
&path
)
784 auto extract_latest_counters
= [](
785 PerfCounterInstance
& counter_instance
,
786 PerfCounterType
& counter_type
,
789 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
790 const auto &datapoint
= counter_instance
.get_latest_data_avg();
791 f
.dump_float("t", datapoint
.t
);
792 f
.dump_unsigned("s", datapoint
.s
);
793 f
.dump_unsigned("c", datapoint
.c
);
795 const auto &datapoint
= counter_instance
.get_latest_data();
796 f
.dump_float("t", datapoint
.t
);
797 f
.dump_unsigned("v", datapoint
.v
);
800 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
803 PyObject
* ActivePyModules::get_perf_schema_python(
804 const std::string
&svc_type
,
805 const std::string
&svc_id
)
807 PyThreadState
*tstate
= PyEval_SaveThread();
808 std::lock_guard
l(lock
);
809 PyEval_RestoreThread(tstate
);
811 DaemonStateCollection daemons
;
813 if (svc_type
== "") {
814 daemons
= daemon_state
.get_all();
815 } else if (svc_id
.empty()) {
816 daemons
= daemon_state
.get_by_service(svc_type
);
818 auto key
= DaemonKey
{svc_type
, svc_id
};
819 // so that the below can be a loop in all cases
820 auto got
= daemon_state
.get(key
);
821 if (got
!= nullptr) {
827 if (!daemons
.empty()) {
828 for (auto& [key
, state
] : daemons
) {
829 f
.open_object_section(ceph::to_string(key
).c_str());
831 std::lock_guard
l(state
->lock
);
832 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
833 const auto &counter_name
= ctr_inst_iter
.first
;
834 f
.open_object_section(counter_name
.c_str());
835 auto type
= state
->perf_counters
.types
[counter_name
];
836 f
.dump_string("description", type
.description
);
837 if (!type
.nick
.empty()) {
838 f
.dump_string("nick", type
.nick
);
840 f
.dump_unsigned("type", type
.type
);
841 f
.dump_unsigned("priority", type
.priority
);
842 f
.dump_unsigned("units", type
.unit
);
848 dout(4) << __func__
<< ": No daemon state found for "
849 << svc_type
<< "." << svc_id
<< ")" << dendl
;
854 PyObject
*ActivePyModules::get_context()
856 PyThreadState
*tstate
= PyEval_SaveThread();
857 std::lock_guard
l(lock
);
858 PyEval_RestoreThread(tstate
);
860 // Construct a capsule containing ceph context.
861 // Not incrementing/decrementing ref count on the context because
862 // it's the global one and it has process lifetime.
863 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
868 * Helper for our wrapped types that take a capsule in their constructor.
870 PyObject
*construct_with_capsule(
871 const std::string
&module_name
,
872 const std::string
&clsname
,
875 // Look up the OSDMap type which we will construct
876 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
878 derr
<< "Failed to import python module:" << dendl
;
879 derr
<< handle_pyerror() << dendl
;
883 PyObject
*wrapper_type
= PyObject_GetAttrString(
884 module
, (const char*)clsname
.c_str());
886 derr
<< "Failed to get python type:" << dendl
;
887 derr
<< handle_pyerror() << dendl
;
889 ceph_assert(wrapper_type
);
891 // Construct a capsule containing an OSDMap.
892 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
893 ceph_assert(wrapped_capsule
);
895 // Construct the python OSDMap
896 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
897 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
898 if (wrapper_instance
== nullptr) {
899 derr
<< "Failed to construct python OSDMap:" << dendl
;
900 derr
<< handle_pyerror() << dendl
;
902 ceph_assert(wrapper_instance
!= nullptr);
904 Py_DECREF(wrapped_capsule
);
906 Py_DECREF(wrapper_type
);
909 return wrapper_instance
;
912 PyObject
*ActivePyModules::get_osdmap()
914 OSDMap
*newmap
= new OSDMap
;
916 PyThreadState
*tstate
= PyEval_SaveThread();
918 std::lock_guard
l(lock
);
919 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
920 newmap
->deepish_copy_from(o
);
923 PyEval_RestoreThread(tstate
);
925 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
928 void ActivePyModules::set_health_checks(const std::string
& module_name
,
929 health_check_map_t
&& checks
)
931 bool changed
= false;
934 auto p
= modules
.find(module_name
);
935 if (p
!= modules
.end()) {
936 changed
= p
->second
->set_health_checks(std::move(checks
));
940 // immediately schedule a report to be sent to the monitors with the new
941 // health checks that have changed. This is done asynchronously to avoid
942 // blocking python land. ActivePyModules::lock needs to be dropped to make
945 // send_report callers: DaemonServer::lock -> PyModuleRegistry::lock
946 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
948 // if we don't release this->lock before calling schedule_tick a cycle is
949 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
950 // This is still correct as send_report is run asynchronously under
951 // DaemonServer::lock.
953 server
.schedule_tick(0);
956 int ActivePyModules::handle_command(
957 const ModuleCommand
& module_command
,
958 const MgrSession
& session
,
959 const cmdmap_t
&cmdmap
,
960 const bufferlist
&inbuf
,
961 std::stringstream
*ds
,
962 std::stringstream
*ss
)
965 auto mod_iter
= modules
.find(module_command
.module_name
);
966 if (mod_iter
== modules
.end()) {
967 *ss
<< "Module '" << module_command
.module_name
<< "' is not available";
973 return mod_iter
->second
->handle_command(module_command
, session
, cmdmap
,
977 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
979 std::lock_guard
l(lock
);
980 for (auto& [name
, module
] : modules
) {
981 dout(15) << "getting health checks for " << name
<< dendl
;
982 module
->get_health_checks(checks
);
986 void ActivePyModules::update_progress_event(
987 const std::string
& evid
,
988 const std::string
& desc
,
991 std::lock_guard
l(lock
);
992 auto& pe
= progress_events
[evid
];
994 pe
.progress
= progress
;
997 void ActivePyModules::complete_progress_event(const std::string
& evid
)
999 std::lock_guard
l(lock
);
1000 progress_events
.erase(evid
);
1003 void ActivePyModules::clear_all_progress_events()
1005 std::lock_guard
l(lock
);
1006 progress_events
.clear();
1009 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
1011 std::lock_guard
l(lock
);
1012 *events
= progress_events
;
1015 void ActivePyModules::config_notify()
1017 std::lock_guard
l(lock
);
1018 for (auto& [name
, module
] : modules
) {
1019 // Send all python calls down a Finisher to avoid blocking
1020 // C++ code, and avoid any potential lock cycles.
1021 dout(15) << "notify (config) " << name
<< dendl
;
1022 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
1023 finisher
.queue(new LambdaContext([module
=module
](int r
){
1024 module
->config_notify();
1029 void ActivePyModules::set_uri(const std::string
& module_name
,
1030 const std::string
&uri
)
1032 std::lock_guard
l(lock
);
1034 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
1036 modules
.at(module_name
)->set_uri(uri
);
1039 MetricQueryID
ActivePyModules::add_osd_perf_query(
1040 const OSDPerfMetricQuery
&query
,
1041 const std::optional
<OSDPerfMetricLimit
> &limit
)
1043 return server
.add_osd_perf_query(query
, limit
);
1046 void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id
)
1048 int r
= server
.remove_osd_perf_query(query_id
);
1050 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
1051 << cpp_strerror(r
) << dendl
;
1055 PyObject
*ActivePyModules::get_osd_perf_counters(MetricQueryID query_id
)
1057 std::map
<OSDPerfMetricKey
, PerformanceCounters
> counters
;
1059 int r
= server
.get_osd_perf_counters(query_id
, &counters
);
1061 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1062 << cpp_strerror(r
) << dendl
;
1068 f
.open_array_section("counters");
1069 for (auto &it
: counters
) {
1070 auto &key
= it
.first
;
1071 auto &instance_counters
= it
.second
;
1072 f
.open_object_section("i");
1073 f
.open_array_section("k");
1074 for (auto &sub_key
: key
) {
1075 f
.open_array_section("s");
1076 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1077 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1079 f
.close_section(); // s
1081 f
.close_section(); // k
1082 f
.open_array_section("c");
1083 for (auto &c
: instance_counters
) {
1084 f
.open_array_section("p");
1085 f
.dump_unsigned("0", c
.first
);
1086 f
.dump_unsigned("1", c
.second
);
1087 f
.close_section(); // p
1089 f
.close_section(); // c
1090 f
.close_section(); // i
1092 f
.close_section(); // counters
1097 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1098 const std::string
&message
)
1100 std::lock_guard
l(lock
);
1102 auto cl
= monc
.get_log_client()->create_channel(channel
);
1103 map
<string
,string
> log_to_monitors
;
1104 map
<string
,string
> log_to_syslog
;
1105 map
<string
,string
> log_channel
;
1106 map
<string
,string
> log_prio
;
1107 map
<string
,string
> log_to_graylog
;
1108 map
<string
,string
> log_to_graylog_host
;
1109 map
<string
,string
> log_to_graylog_port
;
1112 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
1113 log_channel
, log_prio
, log_to_graylog
,
1114 log_to_graylog_host
, log_to_graylog_port
,
1116 cl
->update_config(log_to_monitors
, log_to_syslog
,
1117 log_channel
, log_prio
, log_to_graylog
,
1118 log_to_graylog_host
, log_to_graylog_port
,
1120 cl
->do_log(prio
, message
);
1123 void ActivePyModules::register_client(std::string_view name
, std::string addrs
)
1125 std::lock_guard
l(lock
);
1127 entity_addrvec_t addrv
;
1128 addrv
.parse(addrs
.data());
1130 dout(7) << "registering msgr client handle " << addrv
<< dendl
;
1131 py_module_registry
.register_client(name
, std::move(addrv
));
1134 void ActivePyModules::unregister_client(std::string_view name
, std::string addrs
)
1136 std::lock_guard
l(lock
);
1138 entity_addrvec_t addrv
;
1139 addrv
.parse(addrs
.data());
1141 dout(7) << "unregistering msgr client handle " << addrv
<< dendl
;
1142 py_module_registry
.unregister_client(name
, addrv
);