1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
17 #include "ActivePyModules.h"
19 #include <rocksdb/version.h>
21 #include "common/errno.h"
22 #include "include/stringify.h"
24 #include "mon/MonMap.h"
25 #include "osd/OSDMap.h"
26 #include "osd/osd_types.h"
27 #include "mgr/MgrContext.h"
28 #include "mgr/TTLCache.h"
29 #include "mgr/mgr_perf_counters.h"
31 #include "DaemonKey.h"
32 #include "DaemonServer.h"
33 #include "mgr/MgrContext.h"
34 #include "PyFormatter.h"
35 // For ::mgr_store_prefix
37 #include "PyModuleRegistry.h"
40 #define dout_context g_ceph_context
41 #define dout_subsys ceph_subsys_mgr
43 #define dout_prefix *_dout << "mgr " << __func__ << " "
47 using namespace std::literals
;
49 ActivePyModules::ActivePyModules(
50 PyModuleConfig
&module_config_
,
51 std::map
<std::string
, std::string
> store_data
,
52 bool mon_provides_kv_sub
,
53 DaemonStateIndex
&ds
, ClusterState
&cs
,
54 MonClient
&mc
, LogChannelRef clog_
,
55 LogChannelRef audit_clog_
, Objecter
&objecter_
,
56 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
57 PyModuleRegistry
&pmr
)
58 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
59 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
60 client(client_
), finisher(f
),
61 cmd_finisher(g_ceph_context
, "cmd_finisher", "cmdfin"),
62 server(server
), py_module_registry(pmr
)
64 store_cache
= std::move(store_data
);
65 // we can only trust our ConfigMap if the mon cluster has provided
66 // kv sub since our startup.
67 have_local_config_map
= mon_provides_kv_sub
;
68 _refresh_config_map();
// Defaulted destructor: all members (finishers, maps, refs) clean up via RAII.
ActivePyModules::~ActivePyModules() = default;
74 void ActivePyModules::dump_server(const std::string
&hostname
,
75 const DaemonStateCollection
&dmc
,
78 f
->dump_string("hostname", hostname
);
79 f
->open_array_section("services");
80 std::string ceph_version
;
82 for (const auto &[key
, state
] : dmc
) {
84 without_gil([&ceph_version
, &id
, state
=state
] {
85 std::lock_guard
l(state
->lock
);
86 // TODO: pick the highest version, and make sure that
87 // somewhere else (during health reporting?) we are
88 // indicating to the user if we see mixed versions
89 auto ver_iter
= state
->metadata
.find("ceph_version");
90 if (ver_iter
!= state
->metadata
.end()) {
91 ceph_version
= state
->metadata
.at("ceph_version");
93 if (state
->metadata
.find("id") != state
->metadata
.end()) {
94 id
= state
->metadata
.at("id");
97 f
->open_object_section("service");
98 f
->dump_string("type", key
.type
);
99 f
->dump_string("id", key
.name
);
101 f
->dump_string("name", id
);
107 f
->dump_string("ceph_version", ceph_version
);
110 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
112 const auto dmc
= without_gil([&]{
113 std::lock_guard
l(lock
);
114 dout(10) << " (" << hostname
<< ")" << dendl
;
115 return daemon_state
.get_by_server(hostname
);
118 dump_server(hostname
, dmc
, &f
);
123 PyObject
*ActivePyModules::list_servers_python()
125 dout(10) << " >" << dendl
;
127 without_gil_t no_gil
;
128 return daemon_state
.with_daemons_by_server([this, &no_gil
]
129 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
130 no_gil
.acquire_gil();
131 PyFormatter
f(false, true);
132 for (const auto &[hostname
, daemon_state
] : all
) {
133 f
.open_object_section("server");
134 dump_server(hostname
, daemon_state
, &f
);
141 PyObject
*ActivePyModules::get_metadata_python(
142 const std::string
&svc_type
,
143 const std::string
&svc_id
)
145 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
146 if (metadata
== nullptr) {
147 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
150 auto l
= without_gil([&] {
151 return std::lock_guard(lock
);
154 f
.dump_string("hostname", metadata
->hostname
);
155 for (const auto &[key
, val
] : metadata
->metadata
) {
156 f
.dump_string(key
, val
);
162 PyObject
*ActivePyModules::get_daemon_status_python(
163 const std::string
&svc_type
,
164 const std::string
&svc_id
)
166 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
167 if (metadata
== nullptr) {
168 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
171 auto l
= without_gil([&] {
172 return std::lock_guard(lock
);
175 for (const auto &[daemon
, status
] : metadata
->service_status
) {
176 f
.dump_string(daemon
, status
);
181 void ActivePyModules::update_cache_metrics() {
182 auto hit_miss_ratio
= ttl_cache
.get_hit_miss_ratio();
183 perfcounter
->set(l_mgr_cache_hit
, hit_miss_ratio
.first
);
184 perfcounter
->set(l_mgr_cache_miss
, hit_miss_ratio
.second
);
187 PyObject
*ActivePyModules::cacheable_get_python(const std::string
&what
)
189 uint64_t ttl_seconds
= g_conf().get_val
<uint64_t>("mgr_ttl_cache_expire_seconds");
190 if(ttl_seconds
> 0) {
191 ttl_cache
.set_ttl(ttl_seconds
);
193 PyObject
* cached
= ttl_cache
.get(what
);
194 update_cache_metrics();
196 } catch (std::out_of_range
& e
) {}
199 PyObject
*obj
= get_python(what
);
200 if(ttl_seconds
&& ttl_cache
.is_cacheable(what
)) {
201 ttl_cache
.insert(what
, obj
);
204 update_cache_metrics();
208 PyObject
*ActivePyModules::get_python(const std::string
&what
)
210 uint64_t ttl_seconds
= g_conf().get_val
<uint64_t>("mgr_ttl_cache_expire_seconds");
214 // Use PyJSONFormatter if TTL cache is enabled.
215 Formatter
&f
= ttl_seconds
? (Formatter
&)jf
: (Formatter
&)pf
;
217 if (what
== "fs_map") {
218 without_gil_t no_gil
;
219 cluster_state
.with_fsmap([&](const FSMap
&fsmap
) {
220 no_gil
.acquire_gil();
223 } else if (what
== "osdmap_crush_map_text") {
224 without_gil_t no_gil
;
226 cluster_state
.with_osdmap([&](const OSDMap
&osd_map
){
227 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
229 std::string crush_text
= rdata
.to_str();
230 no_gil
.acquire_gil();
231 return PyUnicode_FromString(crush_text
.c_str());
232 } else if (what
.substr(0, 7) == "osd_map") {
233 without_gil_t no_gil
;
234 cluster_state
.with_osdmap([&](const OSDMap
&osd_map
){
235 no_gil
.acquire_gil();
236 if (what
== "osd_map") {
238 } else if (what
== "osd_map_tree") {
239 osd_map
.print_tree(&f
, nullptr);
240 } else if (what
== "osd_map_crush") {
241 osd_map
.crush
->dump(&f
);
244 } else if (what
== "modified_config_options") {
245 without_gil_t no_gil
;
246 auto all_daemons
= daemon_state
.get_all();
248 for (auto& [key
, daemon
] : all_daemons
) {
249 std::lock_guard
l(daemon
->lock
);
250 for (auto& [name
, valmap
] : daemon
->config
) {
254 no_gil
.acquire_gil();
255 f
.open_array_section("options");
256 for (auto& name
: names
) {
257 f
.dump_string("name", name
);
260 } else if (what
.substr(0, 6) == "config") {
261 if (what
== "config_options") {
262 g_conf().config_options(&f
);
263 } else if (what
== "config") {
264 g_conf().show_config(&f
);
266 } else if (what
== "mon_map") {
267 without_gil_t no_gil
;
268 cluster_state
.with_monmap([&](const MonMap
&monmap
) {
269 no_gil
.acquire_gil();
272 } else if (what
== "service_map") {
273 without_gil_t no_gil
;
274 cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
275 no_gil
.acquire_gil();
276 service_map
.dump(&f
);
278 } else if (what
== "osd_metadata") {
279 without_gil_t no_gil
;
280 auto dmc
= daemon_state
.get_by_service("osd");
281 for (const auto &[key
, state
] : dmc
) {
282 std::lock_guard
l(state
->lock
);
283 with_gil(no_gil
, [&f
, &name
=key
.name
, state
=state
] {
284 f
.open_object_section(name
.c_str());
285 f
.dump_string("hostname", state
->hostname
);
286 for (const auto &[name
, val
] : state
->metadata
) {
287 f
.dump_string(name
.c_str(), val
);
292 } else if (what
== "mds_metadata") {
293 without_gil_t no_gil
;
294 auto dmc
= daemon_state
.get_by_service("mds");
295 for (const auto &[key
, state
] : dmc
) {
296 std::lock_guard
l(state
->lock
);
297 with_gil(no_gil
, [&f
, &name
=key
.name
, state
=state
] {
298 f
.open_object_section(name
.c_str());
299 f
.dump_string("hostname", state
->hostname
);
300 for (const auto &[name
, val
] : state
->metadata
) {
301 f
.dump_string(name
.c_str(), val
);
306 } else if (what
== "pg_summary") {
307 without_gil_t no_gil
;
308 cluster_state
.with_pgmap(
309 [&f
, &no_gil
](const PGMap
&pg_map
) {
310 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
311 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
312 std::map
<std::string
, uint32_t> all
;
313 for (const auto &i
: pg_map
.pg_stat
) {
314 const auto pool
= i
.first
.m_pool
;
315 const std::string state
= pg_state_string(i
.second
.state
);
316 // Insert to per-pool map
317 pools
[stringify(pool
)][state
]++;
318 for (const auto &osd_id
: i
.second
.acting
) {
319 osds
[stringify(osd_id
)][state
]++;
323 no_gil
.acquire_gil();
324 f
.open_object_section("by_osd");
325 for (const auto &i
: osds
) {
326 f
.open_object_section(i
.first
.c_str());
327 for (const auto &j
: i
.second
) {
328 f
.dump_int(j
.first
.c_str(), j
.second
);
333 f
.open_object_section("by_pool");
334 for (const auto &i
: pools
) {
335 f
.open_object_section(i
.first
.c_str());
336 for (const auto &j
: i
.second
) {
337 f
.dump_int(j
.first
.c_str(), j
.second
);
342 f
.open_object_section("all");
343 for (const auto &i
: all
) {
344 f
.dump_int(i
.first
.c_str(), i
.second
);
347 f
.open_object_section("pg_stats_sum");
348 pg_map
.pg_sum
.dump(&f
);
352 } else if (what
== "pg_status") {
353 without_gil_t no_gil
;
354 cluster_state
.with_pgmap(
355 [&](const PGMap
&pg_map
) {
356 no_gil
.acquire_gil();
357 pg_map
.print_summary(&f
, nullptr);
360 } else if (what
== "pg_dump") {
361 without_gil_t no_gil
;
362 cluster_state
.with_pgmap(
363 [&](const PGMap
&pg_map
) {
364 no_gil
.acquire_gil();
365 pg_map
.dump(&f
, false);
368 } else if (what
== "devices") {
369 without_gil_t no_gil
;
370 daemon_state
.with_devices2(
372 with_gil(no_gil
, [&] { f
.open_array_section("devices"); });
374 [&](const DeviceState
&dev
) {
375 with_gil(no_gil
, [&] { f
.dump_object("device", dev
); });
377 with_gil(no_gil
, [&] {
380 } else if (what
.size() > 7 &&
381 what
.substr(0, 7) == "device ") {
382 without_gil_t no_gil
;
383 string devid
= what
.substr(7);
384 if (!daemon_state
.with_device(devid
,
385 [&] (const DeviceState
& dev
) {
386 with_gil_t with_gil
{no_gil
};
387 f
.dump_object("device", dev
);
391 } else if (what
== "io_rate") {
392 without_gil_t no_gil
;
393 cluster_state
.with_pgmap(
394 [&](const PGMap
&pg_map
) {
395 no_gil
.acquire_gil();
396 pg_map
.dump_delta(&f
);
399 } else if (what
== "df") {
400 without_gil_t no_gil
;
401 cluster_state
.with_osdmap_and_pgmap(
403 const OSDMap
& osd_map
,
404 const PGMap
&pg_map
) {
405 no_gil
.acquire_gil();
406 pg_map
.dump_cluster_stats(nullptr, &f
, true);
407 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
409 } else if (what
== "pg_stats") {
410 without_gil_t no_gil
;
411 cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
412 no_gil
.acquire_gil();
413 pg_map
.dump_pg_stats(&f
, false);
415 } else if (what
== "pool_stats") {
416 without_gil_t no_gil
;
417 cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
418 no_gil
.acquire_gil();
419 pg_map
.dump_pool_stats(&f
);
421 } else if (what
== "pg_ready") {
422 server
.dump_pg_ready(&f
);
423 } else if (what
== "pg_progress") {
424 without_gil_t no_gil
;
425 cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
426 no_gil
.acquire_gil();
427 pg_map
.dump_pg_progress(&f
);
428 server
.dump_pg_ready(&f
);
430 } else if (what
== "osd_stats") {
431 without_gil_t no_gil
;
432 cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
433 no_gil
.acquire_gil();
434 pg_map
.dump_osd_stats(&f
, false);
436 } else if (what
== "osd_ping_times") {
437 without_gil_t no_gil
;
438 cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
439 no_gil
.acquire_gil();
440 pg_map
.dump_osd_ping_times(&f
);
442 } else if (what
== "osd_pool_stats") {
443 without_gil_t no_gil
;
444 int64_t poolid
= -ENOENT
;
445 cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
446 const PGMap
& pg_map
) {
447 no_gil
.acquire_gil();
448 f
.open_array_section("pool_stats");
449 for (auto &p
: osdmap
.get_pools()) {
451 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
455 } else if (what
== "health") {
456 without_gil_t no_gil
;
457 cluster_state
.with_health([&](const ceph::bufferlist
&health_json
) {
458 no_gil
.acquire_gil();
459 f
.dump_string("json", health_json
.to_str());
461 } else if (what
== "mon_status") {
462 without_gil_t no_gil
;
463 cluster_state
.with_mon_status(
464 [&](const ceph::bufferlist
&mon_status_json
) {
465 no_gil
.acquire_gil();
466 f
.dump_string("json", mon_status_json
.to_str());
468 } else if (what
== "mgr_map") {
469 without_gil_t no_gil
;
470 cluster_state
.with_mgrmap([&](const MgrMap
&mgr_map
) {
471 no_gil
.acquire_gil();
474 } else if (what
== "mgr_ips") {
475 entity_addrvec_t myaddrs
= server
.get_myaddrs();
476 f
.open_array_section("ips");
477 std::set
<std::string
> did
;
478 for (auto& i
: myaddrs
.v
) {
479 std::string ip
= i
.ip_only_to_str();
480 if (auto [where
, inserted
] = did
.insert(ip
); inserted
) {
481 f
.dump_string("ip", ip
);
485 } else if (what
== "have_local_config_map") {
486 f
.dump_bool("have_local_config_map", have_local_config_map
);
487 } else if (what
== "active_clean_pgs"){
488 without_gil_t no_gil
;
489 cluster_state
.with_pgmap(
490 [&](const PGMap
&pg_map
) {
491 no_gil
.acquire_gil();
492 f
.open_array_section("pg_stats");
493 for (auto &i
: pg_map
.pg_stat
) {
494 const auto state
= i
.second
.state
;
495 const auto pgid_raw
= i
.first
;
496 const auto pgid
= stringify(pgid_raw
.m_pool
) + "." + stringify(pgid_raw
.m_seed
);
497 const auto reported_epoch
= i
.second
.reported_epoch
;
498 if (state
& PG_STATE_ACTIVE
&& state
& PG_STATE_CLEAN
) {
499 f
.open_object_section("pg_stat");
500 f
.dump_string("pgid", pgid
);
501 f
.dump_string("state", pg_state_string(state
));
502 f
.dump_unsigned("reported_epoch", reported_epoch
);
507 const auto num_pg
= pg_map
.num_pg
;
508 f
.dump_unsigned("total_num_pgs", num_pg
);
511 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
514 without_gil_t no_gil
;
515 no_gil
.acquire_gil();
523 void ActivePyModules::start_one(PyModuleRef py_module
)
525 std::lock_guard
l(lock
);
527 const auto name
= py_module
->get_name();
528 auto active_module
= std::make_shared
<ActivePyModule
>(py_module
, clog
);
530 pending_modules
.insert(name
);
531 // Send all python calls down a Finisher to avoid blocking
532 // C++ code, and avoid any potential lock cycles.
533 finisher
.queue(new LambdaContext([this, active_module
, name
](int) {
534 int r
= active_module
->load(this);
535 std::lock_guard
l(lock
);
536 pending_modules
.erase(name
);
538 derr
<< "Failed to run module in active mode ('" << name
<< "')"
541 auto em
= modules
.emplace(name
, active_module
);
542 ceph_assert(em
.second
); // actually inserted
544 dout(4) << "Starting thread for " << name
<< dendl
;
545 active_module
->thread
.create(active_module
->get_thread_name());
550 void ActivePyModules::shutdown()
552 std::lock_guard
locker(lock
);
554 // Signal modules to drop out of serve() and/or tear down resources
555 for (auto& [name
, module
] : modules
) {
557 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
559 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
563 // For modules implementing serve(), finish the threads where we
564 // were running that.
565 for (auto& [name
, module
] : modules
) {
567 dout(10) << "joining module " << name
<< dendl
;
568 module
->thread
.join();
569 dout(10) << "joined module " << name
<< dendl
;
573 cmd_finisher
.wait_for_empty();
579 void ActivePyModules::notify_all(const std::string
¬ify_type
,
580 const std::string
¬ify_id
)
582 std::lock_guard
l(lock
);
584 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
585 for (auto& [name
, module
] : modules
) {
586 if (!py_module_registry
.should_notify(name
, notify_type
)) {
589 // Send all python calls down a Finisher to avoid blocking
590 // C++ code, and avoid any potential lock cycles.
591 dout(15) << "queuing notify (" << notify_type
<< ") to " << name
<< dendl
;
592 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
593 finisher
.queue(new LambdaContext([module
=module
, notify_type
, notify_id
]
595 module
->notify(notify_type
, notify_id
);
600 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
602 std::lock_guard
l(lock
);
604 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
605 for (auto& [name
, module
] : modules
) {
606 if (!py_module_registry
.should_notify(name
, "clog")) {
609 // Send all python calls down a Finisher to avoid blocking
610 // C++ code, and avoid any potential lock cycles.
612 // Note intentional use of non-reference lambda binding on
613 // log_entry: we take a copy because caller's instance is
614 // probably ephemeral.
615 dout(15) << "queuing notify (clog) to " << name
<< dendl
;
616 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
617 finisher
.queue(new LambdaContext([module
=module
, log_entry
](int r
){
618 module
->notify_clog(log_entry
);
623 bool ActivePyModules::get_store(const std::string
&module_name
,
624 const std::string
&key
, std::string
*val
) const
626 without_gil_t no_gil
;
627 std::lock_guard
l(lock
);
629 const std::string global_key
= PyModule::mgr_store_prefix
630 + module_name
+ "/" + key
;
632 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
634 auto i
= store_cache
.find(global_key
);
635 if (i
!= store_cache
.end()) {
643 PyObject
*ActivePyModules::dispatch_remote(
644 const std::string
&other_module
,
645 const std::string
&method
,
650 auto mod_iter
= modules
.find(other_module
);
651 ceph_assert(mod_iter
!= modules
.end());
653 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
656 bool ActivePyModules::get_config(const std::string
&module_name
,
657 const std::string
&key
, std::string
*val
) const
659 const std::string global_key
= "mgr/" + module_name
+ "/" + key
;
661 dout(20) << " key: " << global_key
<< dendl
;
663 std::lock_guard
lock(module_config
.lock
);
665 auto i
= module_config
.config
.find(global_key
);
666 if (i
!= module_config
.config
.end()) {
674 PyObject
*ActivePyModules::get_typed_config(
675 const std::string
&module_name
,
676 const std::string
&key
,
677 const std::string
&prefix
) const
679 without_gil_t no_gil
;
681 std::string final_key
;
684 final_key
= prefix
+ "/" + key
;
685 found
= get_config(module_name
, final_key
, &value
);
689 found
= get_config(module_name
, final_key
, &value
);
692 PyModuleRef module
= py_module_registry
.get_module(module_name
);
693 no_gil
.acquire_gil();
695 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
698 // removing value to hide sensitive data going into mgr logs
699 // leaving this for debugging purposes
700 // dout(10) << __func__ << " " << final_key << " found: " << value << dendl;
701 dout(10) << __func__
<< " " << final_key
<< " found" << dendl
;
702 return module
->get_typed_option_value(key
, value
);
705 dout(10) << " [" << prefix
<< "/]" << key
<< " not found "
708 dout(10) << " " << key
<< " not found " << dendl
;
713 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
714 const std::string
&prefix
) const
716 without_gil_t no_gil
;
717 std::lock_guard
l(lock
);
718 std::lock_guard
lock(module_config
.lock
);
719 no_gil
.acquire_gil();
721 const std::string base_prefix
= PyModule::mgr_store_prefix
723 const std::string global_prefix
= base_prefix
+ prefix
;
724 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
727 for (auto p
= store_cache
.lower_bound(global_prefix
);
728 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0; ++p
) {
729 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
734 void ActivePyModules::set_store(const std::string
&module_name
,
735 const std::string
&key
, const std::optional
<std::string
>& val
)
737 const std::string global_key
= PyModule::mgr_store_prefix
738 + module_name
+ "/" + key
;
742 std::lock_guard
l(lock
);
744 // NOTE: this isn't strictly necessary since we'll also get an MKVData
745 // update from the mon due to our subscription *before* our command is acked.
747 store_cache
[global_key
] = *val
;
749 store_cache
.erase(global_key
);
752 std::ostringstream cmd_json
;
754 jf
.open_object_section("cmd");
756 jf
.dump_string("prefix", "config-key set");
757 jf
.dump_string("key", global_key
);
758 jf
.dump_string("val", *val
);
760 jf
.dump_string("prefix", "config-key del");
761 jf
.dump_string("key", global_key
);
765 set_cmd
.run(&monc
, cmd_json
.str());
769 if (set_cmd
.r
!= 0) {
770 // config-key set will fail if mgr's auth key has insufficient
771 // permission to set config keys
772 // FIXME: should this somehow raise an exception back into Python land?
773 dout(0) << "`config-key set " << global_key
<< " " << val
<< "` failed: "
774 << cpp_strerror(set_cmd
.r
) << dendl
;
775 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
779 std::pair
<int, std::string
> ActivePyModules::set_config(
780 const std::string
&module_name
,
781 const std::string
&key
,
782 const std::optional
<std::string
>& val
)
784 return module_config
.set_config(&monc
, module_name
, key
, val
);
787 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
789 std::map
<std::string
, std::string
> result
;
790 std::lock_guard
l(lock
);
791 for (const auto& [name
, module
] : modules
) {
792 std::string svc_str
= module
->get_uri();
793 if (!svc_str
.empty()) {
794 result
[name
] = svc_str
;
801 void ActivePyModules::update_kv_data(
802 const std::string prefix
,
804 const map
<std::string
, std::optional
<bufferlist
>, std::less
<>>& data
)
806 std::lock_guard
l(lock
);
807 bool do_config
= false;
809 dout(10) << "full update on " << prefix
<< dendl
;
810 auto p
= store_cache
.lower_bound(prefix
);
811 while (p
!= store_cache
.end() && p
->first
.find(prefix
) == 0) {
812 dout(20) << " rm prior " << p
->first
<< dendl
;
813 p
= store_cache
.erase(p
);
816 dout(10) << "incremental update on " << prefix
<< dendl
;
818 for (auto& i
: data
) {
820 dout(20) << " set " << i
.first
<< " = " << i
.second
->to_str() << dendl
;
821 store_cache
[i
.first
] = i
.second
->to_str();
823 dout(20) << " rm " << i
.first
<< dendl
;
824 store_cache
.erase(i
.first
);
826 if (i
.first
.find("config/") == 0) {
831 _refresh_config_map();
835 void ActivePyModules::_refresh_config_map()
839 for (auto p
= store_cache
.lower_bound("config/");
840 p
!= store_cache
.end() && p
->first
.find("config/") == 0;
842 string key
= p
->first
.substr(7);
843 if (key
.find("mgr/") == 0) {
844 // NOTE: for now, we ignore module options. see also ceph_foreign_option_get().
847 string value
= p
->second
;
850 config_map
.parse_key(key
, &name
, &who
);
852 const Option
*opt
= g_conf().find_option(name
);
854 config_map
.stray_options
.push_back(
855 std::unique_ptr
<Option
>(
856 new Option(name
, Option::TYPE_STR
, Option::LEVEL_UNKNOWN
)));
857 opt
= config_map
.stray_options
.back().get();
861 int r
= opt
->pre_validate(&value
, &err
);
863 dout(10) << __func__
<< " pre-validate failed on '" << name
<< "' = '"
864 << value
<< "' for " << name
<< dendl
;
867 MaskedOption
mopt(opt
);
868 mopt
.raw_value
= value
;
871 !ConfigMap::parse_mask(who
, §ion_name
, &mopt
.mask
)) {
872 derr
<< __func__
<< " invalid mask for key " << key
<< dendl
;
873 } else if (opt
->has_flag(Option::FLAG_NO_MON_UPDATE
)) {
874 dout(10) << __func__
<< " NO_MON_UPDATE option '"
875 << name
<< "' = '" << value
<< "' for " << name
878 Section
*section
= &config_map
.global
;;
879 if (section_name
.size() && section_name
!= "global") {
880 if (section_name
.find('.') != std::string::npos
) {
881 section
= &config_map
.by_id
[section_name
];
883 section
= &config_map
.by_type
[section_name
];
886 section
->options
.insert(make_pair(name
, std::move(mopt
)));
891 PyObject
* ActivePyModules::with_perf_counters(
892 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
893 const std::string
&svc_name
,
894 const std::string
&svc_id
,
895 const std::string
&path
) const
898 f
.open_array_section(path
);
900 without_gil_t no_gil
;
901 std::lock_guard
l(lock
);
902 auto metadata
= daemon_state
.get(DaemonKey
{svc_name
, svc_id
});
904 std::lock_guard
l2(metadata
->lock
);
905 if (metadata
->perf_counters
.instances
.count(path
)) {
906 auto counter_instance
= metadata
->perf_counters
.instances
.at(path
);
907 auto counter_type
= metadata
->perf_counters
.types
.at(path
);
908 with_gil(no_gil
, [&] {
909 fct(counter_instance
, counter_type
, f
);
912 dout(4) << "Missing counter: '" << path
<< "' ("
913 << svc_name
<< "." << svc_id
<< ")" << dendl
;
914 dout(20) << "Paths are:" << dendl
;
915 for (const auto &i
: metadata
->perf_counters
.instances
) {
916 dout(20) << i
.first
<< dendl
;
920 dout(4) << "No daemon state for " << svc_name
<< "." << svc_id
<< ")"
928 PyObject
* ActivePyModules::get_counter_python(
929 const std::string
&svc_name
,
930 const std::string
&svc_id
,
931 const std::string
&path
)
933 auto extract_counters
= [](
934 PerfCounterInstance
& counter_instance
,
935 PerfCounterType
& counter_type
,
938 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
939 const auto &avg_data
= counter_instance
.get_data_avg();
940 for (const auto &datapoint
: avg_data
) {
941 f
.open_array_section("datapoint");
942 f
.dump_float("t", datapoint
.t
);
943 f
.dump_unsigned("s", datapoint
.s
);
944 f
.dump_unsigned("c", datapoint
.c
);
948 const auto &data
= counter_instance
.get_data();
949 for (const auto &datapoint
: data
) {
950 f
.open_array_section("datapoint");
951 f
.dump_float("t", datapoint
.t
);
952 f
.dump_unsigned("v", datapoint
.v
);
957 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
960 PyObject
* ActivePyModules::get_latest_counter_python(
961 const std::string
&svc_name
,
962 const std::string
&svc_id
,
963 const std::string
&path
)
965 auto extract_latest_counters
= [](
966 PerfCounterInstance
& counter_instance
,
967 PerfCounterType
& counter_type
,
970 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
971 const auto &datapoint
= counter_instance
.get_latest_data_avg();
972 f
.dump_float("t", datapoint
.t
);
973 f
.dump_unsigned("s", datapoint
.s
);
974 f
.dump_unsigned("c", datapoint
.c
);
976 const auto &datapoint
= counter_instance
.get_latest_data();
977 f
.dump_float("t", datapoint
.t
);
978 f
.dump_unsigned("v", datapoint
.v
);
981 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
984 PyObject
* ActivePyModules::get_perf_schema_python(
985 const std::string
&svc_type
,
986 const std::string
&svc_id
)
988 without_gil_t no_gil
;
989 std::lock_guard
l(lock
);
991 DaemonStateCollection daemons
;
993 if (svc_type
== "") {
994 daemons
= daemon_state
.get_all();
995 } else if (svc_id
.empty()) {
996 daemons
= daemon_state
.get_by_service(svc_type
);
998 auto key
= DaemonKey
{svc_type
, svc_id
};
999 // so that the below can be a loop in all cases
1000 auto got
= daemon_state
.get(key
);
1001 if (got
!= nullptr) {
1006 auto f
= with_gil(no_gil
, [&] {
1007 return PyFormatter();
1009 if (!daemons
.empty()) {
1010 for (auto& [key
, state
] : daemons
) {
1011 std::lock_guard
l(state
->lock
);
1012 with_gil(no_gil
, [&, key
=ceph::to_string(key
), state
=state
] {
1013 f
.open_object_section(key
.c_str());
1014 for (auto ctr_inst_iter
: state
->perf_counters
.instances
) {
1015 const auto &counter_name
= ctr_inst_iter
.first
;
1016 f
.open_object_section(counter_name
.c_str());
1017 auto type
= state
->perf_counters
.types
[counter_name
];
1018 f
.dump_string("description", type
.description
);
1019 if (!type
.nick
.empty()) {
1020 f
.dump_string("nick", type
.nick
);
1022 f
.dump_unsigned("type", type
.type
);
1023 f
.dump_unsigned("priority", type
.priority
);
1024 f
.dump_unsigned("units", type
.unit
);
1031 dout(4) << __func__
<< ": No daemon state found for "
1032 << svc_type
<< "." << svc_id
<< ")" << dendl
;
1037 PyObject
* ActivePyModules::get_rocksdb_version()
1039 std::string version
= std::to_string(ROCKSDB_MAJOR
) + "." +
1040 std::to_string(ROCKSDB_MINOR
) + "." +
1041 std::to_string(ROCKSDB_PATCH
);
1043 return PyUnicode_FromString(version
.c_str());
1046 PyObject
*ActivePyModules::get_context()
1048 auto l
= without_gil([&] {
1049 return std::lock_guard(lock
);
1051 // Construct a capsule containing ceph context.
1052 // Not incrementing/decrementing ref count on the context because
1053 // it's the global one and it has process lifetime.
1054 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
1059 * Helper for our wrapped types that take a capsule in their constructor.
1061 PyObject
*construct_with_capsule(
1062 const std::string
&module_name
,
1063 const std::string
&clsname
,
1066 // Look up the OSDMap type which we will construct
1067 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
1069 derr
<< "Failed to import python module:" << dendl
;
1070 derr
<< handle_pyerror(true, module_name
,
1071 "construct_with_capsule "s
+ module_name
+ " " + clsname
) << dendl
;
1073 ceph_assert(module
);
1075 PyObject
*wrapper_type
= PyObject_GetAttrString(
1076 module
, (const char*)clsname
.c_str());
1077 if (!wrapper_type
) {
1078 derr
<< "Failed to get python type:" << dendl
;
1079 derr
<< handle_pyerror(true, module_name
,
1080 "construct_with_capsule "s
+ module_name
+ " " + clsname
) << dendl
;
1082 ceph_assert(wrapper_type
);
1084 // Construct a capsule containing an OSDMap.
1085 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
1086 ceph_assert(wrapped_capsule
);
1088 // Construct the python OSDMap
1089 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
1090 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
1091 if (wrapper_instance
== nullptr) {
1092 derr
<< "Failed to construct python OSDMap:" << dendl
;
1093 derr
<< handle_pyerror(true, module_name
,
1094 "construct_with_capsule "s
+ module_name
+ " " + clsname
) << dendl
;
1096 ceph_assert(wrapper_instance
!= nullptr);
1098 Py_DECREF(wrapped_capsule
);
1100 Py_DECREF(wrapper_type
);
1103 return wrapper_instance
;
1106 PyObject
*ActivePyModules::get_osdmap()
1108 auto newmap
= without_gil([&] {
1109 OSDMap
*newmap
= new OSDMap
;
1110 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
1111 newmap
->deepish_copy_from(o
);
1115 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
1118 PyObject
*ActivePyModules::get_foreign_config(
1119 const std::string
& who
,
1120 const std::string
& name
)
1122 dout(10) << "ceph_foreign_option_get " << who
<< " " << name
<< dendl
;
1124 // NOTE: for now this will only work with build-in options, not module options.
1125 const Option
*opt
= g_conf().find_option(name
);
1127 dout(4) << "ceph_foreign_option_get " << name
<< " not found " << dendl
;
1128 PyErr_Format(PyExc_KeyError
, "option not found: %s", name
.c_str());
1132 // If the monitors are not yet running pacific, we cannot rely on our local
1134 if (!have_local_config_map
) {
1135 dout(20) << "mon cluster wasn't pacific when we started: falling back to 'config get'"
1137 without_gil_t no_gil
;
1140 std::lock_guard
l(lock
);
1143 "{\"prefix\": \"config get\","s
+
1144 "\"who\": \""s
+ who
+ "\","s
+
1145 "\"key\": \""s
+ name
+ "\"}");
1148 dout(10) << "ceph_foreign_option_get (mon command) " << who
<< " " << name
<< " = "
1149 << cmd
.outbl
.to_str() << dendl
;
1150 no_gil
.acquire_gil();
1151 return get_python_typed_option_value(opt
->type
, cmd
.outbl
.to_str());
1154 // mimic the behavor of mon/ConfigMonitor's 'config get' command
1156 if (!entity
.from_str(who
) &&
1157 !entity
.from_str(who
+ ".")) {
1158 dout(5) << "unrecognized entity '" << who
<< "'" << dendl
;
1159 PyErr_Format(PyExc_KeyError
, "invalid entity: %s", who
.c_str());
1163 without_gil_t no_gil
;
1166 // FIXME: this is super inefficient, since we generate the entire daemon
1167 // config just to extract one value from it!
1169 std::map
<std::string
,std::string
,std::less
<>> config
;
1170 cluster_state
.with_osdmap([&](const OSDMap
&osdmap
) {
1171 map
<string
,string
> crush_location
;
1172 string device_class
;
1173 if (entity
.is_osd()) {
1174 osdmap
.crush
->get_full_location(who
, &crush_location
);
1175 int id
= atoi(entity
.get_id().c_str());
1176 const char *c
= osdmap
.crush
->get_item_class(id
);
1180 dout(10) << __func__
<< " crush_location " << crush_location
1181 << " class " << device_class
<< dendl
;
1184 std::map
<std::string
,pair
<std::string
,const MaskedOption
*>> src
;
1185 config
= config_map
.generate_entity_map(
1193 // get a single value
1195 auto p
= config
.find(name
);
1196 if (p
!= config
.end()) {
1199 if (!entity
.is_client() &&
1200 opt
->daemon_value
!= Option::value_t
{}) {
1201 value
= Option::to_str(opt
->daemon_value
);
1203 value
= Option::to_str(opt
->value
);
1207 dout(10) << "ceph_foreign_option_get (configmap) " << who
<< " " << name
<< " = "
1210 no_gil
.acquire_gil();
1211 return get_python_typed_option_value(opt
->type
, value
);
1214 void ActivePyModules::set_health_checks(const std::string
& module_name
,
1215 health_check_map_t
&& checks
)
1217 bool changed
= false;
1220 auto p
= modules
.find(module_name
);
1221 if (p
!= modules
.end()) {
1222 changed
= p
->second
->set_health_checks(std::move(checks
));
1226 // immediately schedule a report to be sent to the monitors with the new
1227 // health checks that have changed. This is done asynchronusly to avoid
1228 // blocking python land. ActivePyModules::lock needs to be dropped to make
1231 // send_report callers: DaemonServer::lock -> PyModuleRegistery::lock
1232 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
1234 // if we don't release this->lock before calling schedule_tick a cycle is
1235 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
1236 // This is still correct as send_report is run asynchronously under
1237 // DaemonServer::lock.
1239 server
.schedule_tick(0);
1242 int ActivePyModules::handle_command(
1243 const ModuleCommand
& module_command
,
1244 const MgrSession
& session
,
1245 const cmdmap_t
&cmdmap
,
1246 const bufferlist
&inbuf
,
1247 std::stringstream
*ds
,
1248 std::stringstream
*ss
)
1251 auto mod_iter
= modules
.find(module_command
.module_name
);
1252 if (mod_iter
== modules
.end()) {
1253 *ss
<< "Module '" << module_command
.module_name
<< "' is not available";
1259 return mod_iter
->second
->handle_command(module_command
, session
, cmdmap
,
1263 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
1265 std::lock_guard
l(lock
);
1266 for (auto& [name
, module
] : modules
) {
1267 dout(15) << "getting health checks for " << name
<< dendl
;
1268 module
->get_health_checks(checks
);
1272 void ActivePyModules::update_progress_event(
1273 const std::string
& evid
,
1274 const std::string
& desc
,
1278 std::lock_guard
l(lock
);
1279 auto& pe
= progress_events
[evid
];
1281 pe
.progress
= progress
;
1282 pe
.add_to_ceph_s
= add_to_ceph_s
;
1285 void ActivePyModules::complete_progress_event(const std::string
& evid
)
1287 std::lock_guard
l(lock
);
1288 progress_events
.erase(evid
);
1291 void ActivePyModules::clear_all_progress_events()
1293 std::lock_guard
l(lock
);
1294 progress_events
.clear();
1297 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
1299 std::lock_guard
l(lock
);
1300 *events
= progress_events
;
1303 void ActivePyModules::config_notify()
1305 std::lock_guard
l(lock
);
1306 for (auto& [name
, module
] : modules
) {
1307 // Send all python calls down a Finisher to avoid blocking
1308 // C++ code, and avoid any potential lock cycles.
1309 dout(15) << "notify (config) " << name
<< dendl
;
1310 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
1311 finisher
.queue(new LambdaContext([module
=module
](int r
){
1312 module
->config_notify();
1317 void ActivePyModules::set_uri(const std::string
& module_name
,
1318 const std::string
&uri
)
1320 std::lock_guard
l(lock
);
1322 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
1324 modules
.at(module_name
)->set_uri(uri
);
1327 void ActivePyModules::set_device_wear_level(const std::string
& devid
,
1331 map
<string
,string
> meta
;
1332 daemon_state
.with_device(
1334 [wear_level
, &meta
] (DeviceState
& dev
) {
1335 dev
.set_wear_level(wear_level
);
1336 meta
= dev
.metadata
;
1340 json_spirit::Object json_object
;
1341 for (auto& i
: meta
) {
1342 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
1345 json
.append(json_spirit::write(json_object
));
1348 "\"prefix\": \"config-key set\", "
1349 "\"key\": \"device/" + devid
+ "\""
1353 set_cmd
.run(&monc
, cmd
, json
);
1357 MetricQueryID
ActivePyModules::add_osd_perf_query(
1358 const OSDPerfMetricQuery
&query
,
1359 const std::optional
<OSDPerfMetricLimit
> &limit
)
1361 return server
.add_osd_perf_query(query
, limit
);
1364 void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id
)
1366 int r
= server
.remove_osd_perf_query(query_id
);
1368 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
1369 << cpp_strerror(r
) << dendl
;
1373 PyObject
*ActivePyModules::get_osd_perf_counters(MetricQueryID query_id
)
1375 OSDPerfCollector
collector(query_id
);
1376 int r
= server
.get_osd_perf_counters(&collector
);
1378 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1379 << cpp_strerror(r
) << dendl
;
1384 const std::map
<OSDPerfMetricKey
, PerformanceCounters
> &counters
= collector
.counters
;
1386 f
.open_array_section("counters");
1387 for (auto &[key
, instance_counters
] : counters
) {
1388 f
.open_object_section("i");
1389 f
.open_array_section("k");
1390 for (auto &sub_key
: key
) {
1391 f
.open_array_section("s");
1392 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1393 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1395 f
.close_section(); // s
1397 f
.close_section(); // k
1398 f
.open_array_section("c");
1399 for (auto &c
: instance_counters
) {
1400 f
.open_array_section("p");
1401 f
.dump_unsigned("0", c
.first
);
1402 f
.dump_unsigned("1", c
.second
);
1403 f
.close_section(); // p
1405 f
.close_section(); // c
1406 f
.close_section(); // i
1408 f
.close_section(); // counters
1413 MetricQueryID
ActivePyModules::add_mds_perf_query(
1414 const MDSPerfMetricQuery
&query
,
1415 const std::optional
<MDSPerfMetricLimit
> &limit
)
1417 return server
.add_mds_perf_query(query
, limit
);
1420 void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id
)
1422 int r
= server
.remove_mds_perf_query(query_id
);
1424 dout(0) << "remove_mds_perf_query for query_id=" << query_id
<< " failed: "
1425 << cpp_strerror(r
) << dendl
;
1429 PyObject
*ActivePyModules::get_mds_perf_counters(MetricQueryID query_id
)
1431 MDSPerfCollector
collector(query_id
);
1432 int r
= server
.get_mds_perf_counters(&collector
);
1434 dout(0) << "get_mds_perf_counters for query_id=" << query_id
<< " failed: "
1435 << cpp_strerror(r
) << dendl
;
1440 const std::map
<MDSPerfMetricKey
, PerformanceCounters
> &counters
= collector
.counters
;
1442 f
.open_array_section("metrics");
1444 f
.open_array_section("delayed_ranks");
1445 f
.dump_string("ranks", stringify(collector
.delayed_ranks
).c_str());
1446 f
.close_section(); // delayed_ranks
1448 f
.open_array_section("counters");
1449 for (auto &[key
, instance_counters
] : counters
) {
1450 f
.open_object_section("i");
1451 f
.open_array_section("k");
1452 for (auto &sub_key
: key
) {
1453 f
.open_array_section("s");
1454 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1455 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1457 f
.close_section(); // s
1459 f
.close_section(); // k
1460 f
.open_array_section("c");
1461 for (auto &c
: instance_counters
) {
1462 f
.open_array_section("p");
1463 f
.dump_unsigned("0", c
.first
);
1464 f
.dump_unsigned("1", c
.second
);
1465 f
.close_section(); // p
1467 f
.close_section(); // c
1468 f
.close_section(); // i
1470 f
.close_section(); // counters
1471 f
.close_section(); // metrics
1476 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1477 const std::string
&message
)
1479 std::lock_guard
l(lock
);
1481 auto cl
= monc
.get_log_client()->create_channel(channel
);
1482 cl
->parse_client_options(g_ceph_context
);
1483 cl
->do_log(prio
, message
);
1486 void ActivePyModules::register_client(std::string_view name
, std::string addrs
)
1488 std::lock_guard
l(lock
);
1490 entity_addrvec_t addrv
;
1491 addrv
.parse(addrs
.data());
1493 dout(7) << "registering msgr client handle " << addrv
<< dendl
;
1494 py_module_registry
.register_client(name
, std::move(addrv
));
1497 void ActivePyModules::unregister_client(std::string_view name
, std::string addrs
)
1499 std::lock_guard
l(lock
);
1501 entity_addrvec_t addrv
;
1502 addrv
.parse(addrs
.data());
1504 dout(7) << "unregistering msgr client handle " << addrv
<< dendl
;
1505 py_module_registry
.unregister_client(name
, addrv
);