1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
17 #include "common/errno.h"
18 #include "include/stringify.h"
20 #include "PyFormatter.h"
22 #include "osd/OSDMap.h"
23 #include "mon/MonMap.h"
24 #include "osd/osd_types.h"
25 #include "mgr/MgrContext.h"
27 // For ::mgr_store_prefix
29 #include "PyModuleRegistry.h"
32 #include "ActivePyModules.h"
33 #include "DaemonKey.h"
34 #include "DaemonServer.h"
36 #define dout_context g_ceph_context
37 #define dout_subsys ceph_subsys_mgr
39 #define dout_prefix *_dout << "mgr " << __func__ << " "
// Constructor: initialises members from the collaborators passed in (module
// config, daemon/cluster state, MonClient, log channels, Objecter, Client,
// Finisher, DaemonServer, PyModuleRegistry), takes store_data as the initial
// store_cache, records whether the mon provides kv subscriptions (which is
// what makes the local ConfigMap trustworthy) and builds that ConfigMap with
// _refresh_config_map().
// NOTE(review): this copy of the file has lost lines (the embedded original
// numbering jumps 54 -> 56 and 60 -> 64), so the body shown here is
// incomplete; restore it from upstream rather than editing it in place.
41 ActivePyModules::ActivePyModules(
42 PyModuleConfig
&module_config_
,
43 std::map
<std::string
, std::string
> store_data
,
44 bool mon_provides_kv_sub
,
45 DaemonStateIndex
&ds
, ClusterState
&cs
,
46 MonClient
&mc
, LogChannelRef clog_
,
47 LogChannelRef audit_clog_
, Objecter
&objecter_
,
48 Client
&client_
, Finisher
&f
, DaemonServer
&server
,
49 PyModuleRegistry
&pmr
)
50 : module_config(module_config_
), daemon_state(ds
), cluster_state(cs
),
51 monc(mc
), clog(clog_
), audit_clog(audit_clog_
), objecter(objecter_
),
52 client(client_
), finisher(f
),
53 cmd_finisher(g_ceph_context
, "cmd_finisher", "cmdfin"),
54 server(server
), py_module_registry(pmr
)
56 store_cache
= std::move(store_data
);
57 // we can only trust our ConfigMap if the mon cluster has provided
58 // kv sub since our startup.
59 have_local_config_map
= mon_provides_kv_sub
;
60 _refresh_config_map();
// Defaulted destructor.
64 ActivePyModules::~ActivePyModules() = default;
// Emit one host's entry:
// - "hostname";
// - a "services" array with a {"type", "id"} object per daemon in `dmc`;
// - a "ceph_version" string.
// Each daemon's metadata is read under that DaemonState's own `lock`, with
// the GIL released (without_gil).
// NOTE(review): ceph_version is only assigned when a daemon reports
// "ceph_version", so it ends up holding the last value seen in the loop (see
// the TODO). The find() + at() pair also looks the key up twice;
// ver_iter->second would reuse the first lookup.
// NOTE(review): lines are missing from this copy (numbering skips 67 -> 70,
// 82 -> 85, 87 -> 92), including the formatter parameter and the closing
// braces/sections.
66 void ActivePyModules::dump_server(const std::string
&hostname
,
67 const DaemonStateCollection
&dmc
,
70 f
->dump_string("hostname", hostname
);
71 f
->open_array_section("services");
72 std::string ceph_version
;
74 for (const auto &[key
, state
] : dmc
) {
75 without_gil([&ceph_version
, state
=state
] {
76 std::lock_guard
l(state
->lock
);
77 // TODO: pick the highest version, and make sure that
78 // somewhere else (during health reporting?) we are
79 // indicating to the user if we see mixed versions
80 auto ver_iter
= state
->metadata
.find("ceph_version");
81 if (ver_iter
!= state
->metadata
.end()) {
82 ceph_version
= state
->metadata
.at("ceph_version");
85 f
->open_object_section("service");
86 f
->dump_string("type", key
.type
);
87 f
->dump_string("id", key
.name
);
92 f
->dump_string("ceph_version", ceph_version
);
// Describe the daemons running on `hostname` as a Python object. The daemon
// collection is fetched under ActivePyModules::lock with the GIL released,
// then rendered with dump_server().
// NOTE(review): lines are missing from this copy (numbering skips 95 -> 97
// and 100 -> 103), e.g. the declaration of `f` and the return statement.
95 PyObject
*ActivePyModules::get_server_python(const std::string
&hostname
)
97 const auto dmc
= without_gil([&]{
98 std::lock_guard
l(lock
);
99 dout(10) << " (" << hostname
<< ")" << dendl
;
100 return daemon_state
.get_by_server(hostname
);
103 dump_server(hostname
, dmc
, &f
);
// List every server as a Python object. With the GIL released, the function:
// - iterates daemon_state's per-host daemon collections;
// - re-takes the GIL inside the callback;
// - emits a "server" object per host via dump_server().
// NOTE(review): the structured binding `daemon_state` below shadows the
// member of the same name that starts the iteration; a distinct name (e.g.
// `daemons`) would be clearer.
// NOTE(review): lines are missing from this copy (numbering skips
// 108 -> 110, 110 -> 112, 119 -> end), e.g. the close_section()/return.
108 PyObject
*ActivePyModules::list_servers_python()
110 dout(10) << " >" << dendl
;
112 without_gil_t no_gil
;
113 return daemon_state
.with_daemons_by_server([this, &no_gil
]
114 (const std::map
<std::string
, DaemonStateCollection
> &all
) {
115 with_gil_t with_gil
{no_gil
};
116 PyFormatter
f(false, true);
117 for (const auto &[hostname
, daemon_state
] : all
) {
118 f
.open_object_section("server");
119 dump_server(hostname
, daemon_state
, &f
);
// Describe daemon <svc_type>.<svc_id> as a Python object holding its
// "hostname" plus every metadata key/value. A missing daemon is logged with
// derr.
// NOTE(review): metadata->hostname / metadata->metadata are read under
// ActivePyModules::lock, whereas dump_server() and the "osd_metadata" branch
// of get_python() take the DaemonState's own `lock` for the same fields.
// Confirm that this is enough to avoid a data race.
// NOTE(review): lines are missing from this copy (numbering skips
// 128 -> 130, 132 -> 135, 136 -> 139), including the missing-daemon return
// and the formatter declaration.
126 PyObject
*ActivePyModules::get_metadata_python(
127 const std::string
&svc_type
,
128 const std::string
&svc_id
)
130 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
131 if (metadata
== nullptr) {
132 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
135 auto l
= without_gil([&] {
136 return std::lock_guard(lock
);
139 f
.dump_string("hostname", metadata
->hostname
);
140 for (const auto &[key
, val
] : metadata
->metadata
) {
141 f
.dump_string(key
, val
);
// Describe the service_status entries of daemon <svc_type>.<svc_id> as a
// Python object. A missing daemon is logged with derr.
// NOTE(review): metadata->service_status is read under
// ActivePyModules::lock rather than the DaemonState's own `lock` (compare
// dump_server()). Confirm that this is enough to avoid a data race.
// NOTE(review): lines are missing from this copy (numbering skips
// 149 -> 151, 153 -> 156, 157 -> 160).
147 PyObject
*ActivePyModules::get_daemon_status_python(
148 const std::string
&svc_type
,
149 const std::string
&svc_id
)
151 auto metadata
= daemon_state
.get(DaemonKey
{svc_type
, svc_id
});
152 if (metadata
== nullptr) {
153 derr
<< "Requested missing service " << svc_type
<< "." << svc_id
<< dendl
;
156 auto l
= without_gil([&] {
157 return std::lock_guard(lock
);
160 for (const auto &[daemon
, status
] : metadata
->service_status
) {
161 f
.dump_string(daemon
, status
);
// Return a Python object for the cluster/mgr data named by `what` (e.g.
// "fs_map", "osd_map*", "config*", "pg_summary", "health", "mgr_ips",
// "active_clean_pgs", ...).
// The GIL is released up front (no_gil). Each branch re-acquires it
// (with_gil_t / with_gil) before touching the PyFormatter `f` or building a
// Python object. Unknown keys are logged with derr.
// NOTE(review): lines are missing from this copy (numbering skips
// 166 -> 170, 177 -> 181, 181 -> 183, 192 -> 194, ...). These include the
// declarations of `f`, `rdata` and `names` and most closing braces/returns.
// Restore from upstream before making code changes here.
166 PyObject
*ActivePyModules::get_python(const std::string
&what
)
170 // Drop the GIL, as most of the following blocks will block on
171 // a mutex -- they are all responsible for re-taking the GIL before
172 // touching the PyFormatter instance or returning from the function.
173 without_gil_t no_gil
;
175 if (what
== "fs_map") {
176 return cluster_state
.with_fsmap([&](const FSMap
&fsmap
) {
177 with_gil_t with_gil
{no_gil
};
181 } else if (what
== "osdmap_crush_map_text") {
183 cluster_state
.with_osdmap([&](const OSDMap
&osd_map
){
184 osd_map
.crush
->encode(rdata
, CEPH_FEATURES_SUPPORTED_DEFAULT
);
// NOTE(review): crush->encode() is given a bufferlist and a feature mask,
// which suggests a binary encoding rather than text. Passing it through
// c_str() stops at the first NUL byte, and PyUnicode_FromString() requires
// valid UTF-8. Confirm what callers of "osdmap_crush_map_text" expect
// (PyBytes_FromStringAndSize() may be what is intended).
186 std::string crush_text
= rdata
.to_str();
187 with_gil_t with_gil
{no_gil
};
188 return PyUnicode_FromString(crush_text
.c_str());
189 } else if (what
.substr(0, 7) == "osd_map") {
190 return cluster_state
.with_osdmap([&](const OSDMap
&osd_map
){
191 with_gil_t with_gil
{no_gil
};
192 if (what
== "osd_map") {
194 } else if (what
== "osd_map_tree") {
195 osd_map
.print_tree(&f
, nullptr);
196 } else if (what
== "osd_map_crush") {
197 osd_map
.crush
->dump(&f
);
// Names of config options that some daemon reports in its config; each
// daemon's map is read under that daemon's own lock.
201 } else if (what
== "modified_config_options") {
202 auto all_daemons
= daemon_state
.get_all();
204 for (auto& [key
, daemon
] : all_daemons
) {
205 std::lock_guard
l(daemon
->lock
);
206 for (auto& [name
, valmap
] : daemon
->config
) {
210 with_gil_t with_gil
{no_gil
};
211 f
.open_array_section("options");
212 for (auto& name
: names
) {
213 f
.dump_string("name", name
);
217 } else if (what
.substr(0, 6) == "config") {
218 with_gil_t with_gil
{no_gil
};
219 if (what
== "config_options") {
220 g_conf().config_options(&f
);
221 } else if (what
== "config") {
222 g_conf().show_config(&f
);
225 } else if (what
== "mon_map") {
226 return cluster_state
.with_monmap([&](const MonMap
&monmap
) {
227 with_gil_t with_gil
{no_gil
};
231 } else if (what
== "service_map") {
232 return cluster_state
.with_servicemap([&](const ServiceMap
&service_map
) {
233 with_gil_t with_gil
{no_gil
};
234 service_map
.dump(&f
);
237 } else if (what
== "osd_metadata") {
238 auto dmc
= daemon_state
.get_by_service("osd");
239 for (const auto &[key
, state
] : dmc
) {
240 std::lock_guard
l(state
->lock
);
// NOTE(review): the inner structured binding `name` shadows the captured
// daemon name; a distinct name would be clearer.
241 with_gil(no_gil
, [&f
, &name
=key
.name
, state
=state
] {
242 f
.open_object_section(name
.c_str());
243 f
.dump_string("hostname", state
->hostname
);
244 for (const auto &[name
, val
] : state
->metadata
) {
245 f
.dump_string(name
.c_str(), val
);
250 return with_gil(no_gil
, [&] { return f
.get(); });
251 } else if (what
== "mds_metadata") {
252 auto dmc
= daemon_state
.get_by_service("mds");
253 for (const auto &[key
, state
] : dmc
) {
254 std::lock_guard
l(state
->lock
);
255 with_gil(no_gil
, [&f
, &name
=key
.name
, state
=state
] {
256 f
.open_object_section(name
.c_str());
257 f
.dump_string("hostname", state
->hostname
);
258 for (const auto &[name
, val
] : state
->metadata
) {
259 f
.dump_string(name
.c_str(), val
);
264 return with_gil(no_gil
, [&] { return f
.get(); });
// PG state counts by OSD (acting set), by pool and overall, plus pg_sum. The
// counting is done before the GIL is re-taken.
265 } else if (what
== "pg_summary") {
266 return cluster_state
.with_pgmap(
267 [&f
, &no_gil
](const PGMap
&pg_map
) {
268 std::map
<std::string
, std::map
<std::string
, uint32_t> > osds
;
269 std::map
<std::string
, std::map
<std::string
, uint32_t> > pools
;
270 std::map
<std::string
, uint32_t> all
;
271 for (const auto &i
: pg_map
.pg_stat
) {
272 const auto pool
= i
.first
.m_pool
;
273 const std::string state
= pg_state_string(i
.second
.state
);
274 // Insert to per-pool map
275 pools
[stringify(pool
)][state
]++;
276 for (const auto &osd_id
: i
.second
.acting
) {
277 osds
[stringify(osd_id
)][state
]++;
281 with_gil_t with_gil
{no_gil
};
282 f
.open_object_section("by_osd");
283 for (const auto &i
: osds
) {
284 f
.open_object_section(i
.first
.c_str());
285 for (const auto &j
: i
.second
) {
286 f
.dump_int(j
.first
.c_str(), j
.second
);
291 f
.open_object_section("by_pool");
292 for (const auto &i
: pools
) {
293 f
.open_object_section(i
.first
.c_str());
294 for (const auto &j
: i
.second
) {
295 f
.dump_int(j
.first
.c_str(), j
.second
);
300 f
.open_object_section("all");
301 for (const auto &i
: all
) {
302 f
.dump_int(i
.first
.c_str(), i
.second
);
305 f
.open_object_section("pg_stats_sum");
306 pg_map
.pg_sum
.dump(&f
);
311 } else if (what
== "pg_status") {
312 return cluster_state
.with_pgmap(
313 [&](const PGMap
&pg_map
) {
314 with_gil_t with_gil
{no_gil
};
315 pg_map
.print_summary(&f
, nullptr);
319 } else if (what
== "pg_dump") {
320 return cluster_state
.with_pgmap(
321 [&](const PGMap
&pg_map
) {
322 with_gil_t with_gil
{no_gil
};
323 pg_map
.dump(&f
, false);
// with_devices2 takes a "before" callback and a per-device callback; each
// takes the GIL only for its formatter call.
327 } else if (what
== "devices") {
328 daemon_state
.with_devices2(
330 with_gil(no_gil
, [&] { f
.open_array_section("devices"); });
332 [&](const DeviceState
&dev
) {
333 with_gil(no_gil
, [&] { f
.dump_object("device", dev
); });
335 return with_gil(no_gil
, [&] {
339 } else if (what
.size() > 7 &&
340 what
.substr(0, 7) == "device ") {
341 string devid
= what
.substr(7);
342 if (!daemon_state
.with_device(devid
,
343 [&] (const DeviceState
& dev
) {
344 with_gil_t with_gil
{no_gil
};
345 f
.dump_object("device", dev
);
349 return with_gil(no_gil
, [&] { return f
.get(); });
350 } else if (what
== "io_rate") {
351 return cluster_state
.with_pgmap(
352 [&](const PGMap
&pg_map
) {
353 with_gil_t with_gil
{no_gil
};
354 pg_map
.dump_delta(&f
);
358 } else if (what
== "df") {
359 return cluster_state
.with_osdmap_and_pgmap(
361 const OSDMap
& osd_map
,
362 const PGMap
&pg_map
) {
363 with_gil_t with_gil
{no_gil
};
364 pg_map
.dump_cluster_stats(nullptr, &f
, true);
365 pg_map
.dump_pool_stats_full(osd_map
, nullptr, &f
, true);
368 } else if (what
== "pg_stats") {
369 return cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
370 with_gil_t with_gil
{no_gil
};
371 pg_map
.dump_pg_stats(&f
, false);
374 } else if (what
== "pool_stats") {
375 return cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
376 with_gil_t with_gil
{no_gil
};
377 pg_map
.dump_pool_stats(&f
);
380 } else if (what
== "pg_ready") {
381 with_gil_t with_gil
{no_gil
};
382 server
.dump_pg_ready(&f
);
384 } else if (what
== "osd_stats") {
385 return cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
386 with_gil_t with_gil
{no_gil
};
387 pg_map
.dump_osd_stats(&f
, false);
390 } else if (what
== "osd_ping_times") {
391 return cluster_state
.with_pgmap([&](const PGMap
&pg_map
) {
392 with_gil_t with_gil
{no_gil
};
393 pg_map
.dump_osd_ping_times(&f
);
// NOTE(review): poolid starts as -ENOENT. It is presumably set from `p` in
// original line 403, which is missing from this copy. Confirm it is set
// before dump_pool_stats_and_io_rate() is called.
396 } else if (what
== "osd_pool_stats") {
397 int64_t poolid
= -ENOENT
;
398 return cluster_state
.with_osdmap_and_pgmap([&](const OSDMap
& osdmap
,
399 const PGMap
& pg_map
) {
400 with_gil_t with_gil
{no_gil
};
401 f
.open_array_section("pool_stats");
402 for (auto &p
: osdmap
.get_pools()) {
404 pg_map
.dump_pool_stats_and_io_rate(poolid
, osdmap
, &f
, nullptr);
409 } else if (what
== "health") {
410 return cluster_state
.with_health([&](const ceph::bufferlist
&health_json
) {
411 with_gil_t with_gil
{no_gil
};
412 f
.dump_string("json", health_json
.to_str());
415 } else if (what
== "mon_status") {
416 return cluster_state
.with_mon_status(
417 [&](const ceph::bufferlist
&mon_status_json
) {
418 with_gil_t with_gil
{no_gil
};
419 f
.dump_string("json", mon_status_json
.to_str());
422 } else if (what
== "mgr_map") {
423 return cluster_state
.with_mgrmap([&](const MgrMap
&mgr_map
) {
424 with_gil_t with_gil
{no_gil
};
// Emit each distinct IP from the mgr's own address vector once (`did` dedups).
428 } else if (what
== "mgr_ips") {
429 entity_addrvec_t myaddrs
= server
.get_myaddrs();
430 with_gil_t with_gil
{no_gil
};
431 f
.open_array_section("ips");
432 std::set
<std::string
> did
;
433 for (auto& i
: myaddrs
.v
) {
434 std::string ip
= i
.ip_only_to_str();
435 if (auto [where
, inserted
] = did
.insert(ip
); inserted
) {
436 f
.dump_string("ip", ip
);
441 } else if (what
== "have_local_config_map") {
442 with_gil_t with_gil
{no_gil
};
443 f
.dump_bool("have_local_config_map", have_local_config_map
);
// List PGs that are both active and clean (pgid, state, reported_epoch),
// followed by the total PG count.
445 } else if (what
== "active_clean_pgs"){
446 cluster_state
.with_pgmap(
447 [&](const PGMap
&pg_map
) {
448 with_gil_t with_gil
{no_gil
};
449 f
.open_array_section("pg_stats");
450 for (auto &i
: pg_map
.pg_stat
) {
451 const auto state
= i
.second
.state
;
452 const auto pgid_raw
= i
.first
;
// NOTE(review): this builds "<pool>.<seed>" from stringify() of the raw
// integer seed. If the rest of Ceph prints pg ids with a hex seed, ids with
// seed >= 10 will not match. Confirm whether stringify(pgid_raw) was
// intended.
453 const auto pgid
= stringify(pgid_raw
.m_pool
) + "." + stringify(pgid_raw
.m_seed
);
454 const auto reported_epoch
= i
.second
.reported_epoch
;
455 if (state
& PG_STATE_ACTIVE
&& state
& PG_STATE_CLEAN
) {
456 f
.open_object_section("pg_stat");
457 f
.dump_string("pgid", pgid
);
458 f
.dump_string("state", pg_state_string(state
));
459 f
.dump_unsigned("reported_epoch", reported_epoch
);
464 const auto num_pg
= pg_map
.num_pg
;
465 f
.dump_unsigned("total_num_pgs", num_pg
);
469 derr
<< "Python module requested unknown data '" << what
<< "'" << dendl
;
470 with_gil_t with_gil
{no_gil
};
// Start one module in active mode. The module name is recorded in
// pending_modules under `lock`. A finisher task then:
// - calls ActivePyModule::load(this);
// - re-takes `lock` and removes the name from pending_modules;
// - either logs a failure or inserts the module into `modules` (asserting
//   it was not already there) and starts its thread.
// NOTE(review): lines are missing from this copy (numbering skips
// 477 -> 479, 488 -> 490, 490 -> 493). The failure log presumably runs when
// `r` is non-zero, but that test is not visible here; confirm against
// upstream.
475 void ActivePyModules::start_one(PyModuleRef py_module
)
477 std::lock_guard
l(lock
);
479 const auto name
= py_module
->get_name();
480 auto active_module
= std::make_shared
<ActivePyModule
>(py_module
, clog
);
482 pending_modules
.insert(name
);
483 // Send all python calls down a Finisher to avoid blocking
484 // C++ code, and avoid any potential lock cycles.
485 finisher
.queue(new LambdaContext([this, active_module
, name
](int) {
486 int r
= active_module
->load(this);
487 std::lock_guard
l(lock
);
488 pending_modules
.erase(name
);
490 derr
<< "Failed to run module in active mode ('" << name
<< "')"
493 auto em
= modules
.emplace(name
, active_module
);
494 ceph_assert(em
.second
); // actually inserted
496 dout(4) << "Starting thread for " << name
<< dendl
;
497 active_module
->thread
.create(active_module
->get_thread_name());
// Shut down all active modules:
// - log around a per-module shutdown step (the call itself is not present
//   in this copy);
// - join each module's thread;
// - wait for cmd_finisher to drain.
// NOTE(review): `locker` is taken at the top. The lines that would call
// each module's shutdown (and possibly release the lock) are missing from
// this copy (numbering skips 507 -> 509 -> 511 -> 515 and 521 -> 525).
// Verify against upstream that no module callback or thread join runs while
// `lock` is held.
502 void ActivePyModules::shutdown()
504 std::lock_guard
locker(lock
);
506 // Signal modules to drop out of serve() and/or tear down resources
507 for (auto& [name
, module
] : modules
) {
509 dout(10) << "calling module " << name
<< " shutdown()" << dendl
;
511 dout(10) << "module " << name
<< " shutdown() returned" << dendl
;
515 // For modules implementing serve(), finish the threads where we
516 // were running that.
517 for (auto& [name
, module
] : modules
) {
519 dout(10) << "joining module " << name
<< dendl
;
520 module
->thread
.join();
521 dout(10) << "joined module " << name
<< dendl
;
525 cmd_finisher
.wait_for_empty();
// Queue a notify(notify_type, notify_id) call on the finisher for every
// active module, so the Python code runs off the caller's thread; the
// module list is walked under `lock`.
// FIX: both parameter declarations had been corrupted. The leading "&not"
// of "&notify_type" / "&notify_id" had been mis-decoded as the HTML entity
// for the NOT SIGN. They are restored to `&notify_type` / `&notify_id`, the
// names used in the body below.
// NOTE(review): other lines are missing from this copy (numbering skips
// 532 -> 534 and 542 -> 544), e.g. the lambda's parameter list and braces.
531 void ActivePyModules::notify_all(const std::string
&notify_type
,
532 const std::string
&notify_id
)
534 std::lock_guard
l(lock
);
536 dout(10) << __func__
<< ": notify_all " << notify_type
<< dendl
;
537 for (auto& [name
, module
] : modules
) {
538 // Send all python calls down a Finisher to avoid blocking
539 // C++ code, and avoid any potential lock cycles.
540 dout(15) << "queuing notify to " << name
<< dendl
;
541 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
542 finisher
.queue(new LambdaContext([module
=module
, notify_type
, notify_id
]
544 module
->notify(notify_type
, notify_id
);
// Queue a notify_clog(log_entry) call on the finisher for every active
// module (the module list is walked under `lock`). Each task captures
// log_entry by value.
// NOTE(review): lines are missing from this copy (numbering skips
// 549 -> 551, 556 -> 558, 564 -> end).
549 void ActivePyModules::notify_all(const LogEntry
&log_entry
)
551 std::lock_guard
l(lock
);
553 dout(10) << __func__
<< ": notify_all (clog)" << dendl
;
554 for (auto& [name
, module
] : modules
) {
555 // Send all python calls down a Finisher to avoid blocking
556 // C++ code, and avoid any potential lock cycles.
558 // Note intentional use of non-reference lambda binding on
559 // log_entry: we take a copy because caller's instance is
560 // probably ephemeral.
561 dout(15) << "queuing notify (clog) to " << name
<< dendl
;
562 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
563 finisher
.queue(new LambdaContext([module
=module
, log_entry
](int r
){
564 module
->notify_clog(log_entry
);
// Look up <PyModule::mgr_store_prefix><module_name>/<key> in store_cache
// while holding `lock` with the GIL released.
// NOTE(review): the hit/miss handling and the write to *val are missing
// from this copy (numbering stops at 581).
569 bool ActivePyModules::get_store(const std::string
&module_name
,
570 const std::string
&key
, std::string
*val
) const
572 without_gil_t no_gil
;
573 std::lock_guard
l(lock
);
575 const std::string global_key
= PyModule::mgr_store_prefix
576 + module_name
+ "/" + key
;
578 dout(4) << __func__
<< " key: " << global_key
<< dendl
;
580 auto i
= store_cache
.find(global_key
);
581 if (i
!= store_cache
.end()) {
// Forward a remote method call to the active module `other_module`.
// NOTE(review): ceph_assert() aborts the process if `other_module` is not in
// `modules`. Presumably callers check availability first; confirm.
// NOTE(review): lines are missing from this copy (numbering skips
// 591 -> 596), including the args/kwargs/err parameters.
589 PyObject
*ActivePyModules::dispatch_remote(
590 const std::string
&other_module
,
591 const std::string
&method
,
596 auto mod_iter
= modules
.find(other_module
);
597 ceph_assert(mod_iter
!= modules
.end());
599 return mod_iter
->second
->dispatch_remote(method
, args
, kwargs
, err
);
// Look up "mgr/<module_name>/<key>" in module_config.config under
// module_config.lock.
// NOTE(review): the local lock_guard is named `lock`, which shadows the
// ActivePyModules::lock member. The result handling (writing *val) is
// missing from this copy (numbering stops at 612).
602 bool ActivePyModules::get_config(const std::string
&module_name
,
603 const std::string
&key
, std::string
*val
) const
605 const std::string global_key
= "mgr/" + module_name
+ "/" + key
;
607 dout(20) << " key: " << global_key
<< dendl
;
609 std::lock_guard
lock(module_config
.lock
);
611 auto i
= module_config
.config
.find(global_key
);
612 if (i
!= module_config
.config
.end()) {
// Fetch a module option as a typed Python value. With the GIL released, the
// function:
// - tries "<prefix>/<key>" through get_config();
// - presumably falls back to the bare key (the assignment is missing here);
// - re-takes the GIL and converts the string with the module's
//   get_typed_option_value().
// A module missing from py_module_registry is logged with derr. The value
// itself is deliberately not logged.
// NOTE(review): lines are missing from this copy (numbering skips
// 627 -> 630, 631 -> 635, 635 -> 638, 648 -> 651, 651 -> 654), including
// the `found`/`value` declarations and the branch structure.
620 PyObject
*ActivePyModules::get_typed_config(
621 const std::string
&module_name
,
622 const std::string
&key
,
623 const std::string
&prefix
) const
625 without_gil_t no_gil
;
627 std::string final_key
;
630 final_key
= prefix
+ "/" + key
;
631 found
= get_config(module_name
, final_key
, &value
);
635 found
= get_config(module_name
, final_key
, &value
);
638 PyModuleRef module
= py_module_registry
.get_module(module_name
);
639 with_gil_t with_gil
{no_gil
};
641 derr
<< "Module '" << module_name
<< "' is not available" << dendl
;
644 // removing value to hide sensitive data going into mgr logs
645 // leaving this for debugging purposes
646 // dout(10) << __func__ << " " << final_key << " found: " << value << dendl;
647 dout(10) << __func__
<< " " << final_key
<< " found" << dendl
;
648 return module
->get_typed_option_value(key
, value
);
651 dout(10) << " [" << prefix
<< "/]" << key
<< " not found "
654 dout(10) << " " << key
<< " not found " << dendl
;
656 with_gil_t with_gil
{no_gil
};
// Return a Python object with every store_cache entry whose key starts with
// base_prefix + `prefix`. Each key is dumped with base_prefix stripped.
// base_prefix is PyModule::mgr_store_prefix plus a continuation on original
// line 668, which is missing from this copy.
// The function takes ActivePyModules::lock and then module_config.lock with
// the GIL released, and re-takes the GIL only to fill the formatter.
// NOTE(review): only store_cache is read in the surviving lines, so confirm
// that module_config.lock is actually needed here. The local guard named
// `lock` also shadows the member.
660 PyObject
*ActivePyModules::get_store_prefix(const std::string
&module_name
,
661 const std::string
&prefix
) const
663 without_gil_t no_gil
;
664 std::lock_guard
l(lock
);
665 std::lock_guard
lock(module_config
.lock
);
667 const std::string base_prefix
= PyModule::mgr_store_prefix
669 const std::string global_prefix
= base_prefix
+ prefix
;
670 dout(4) << __func__
<< " prefix: " << global_prefix
<< dendl
;
672 return with_gil(no_gil
, [&] {
674 for (auto p
= store_cache
.lower_bound(global_prefix
);
675 p
!= store_cache
.end() && p
->first
.find(global_prefix
) == 0; ++p
) {
676 f
.dump_string(p
->first
.c_str() + base_prefix
.size(), p
->second
);
// Set or delete the store key <PyModule::mgr_store_prefix><module_name>/<key>.
// - store_cache is updated immediately under `lock`: presumably assigned
//   when `val` holds a value and erased otherwise.
// - A "config-key set" / "config-key del" command is then sent to the mon.
// - A failure is logged at level 0.
// FIX: the failure log used to print "config-key set" even for deletes. It
// also wrote the stored value itself into the log. Module store values can
// be secrets, and get_typed_config() in this file already refuses to log
// option values for that reason. The log now names the operation that was
// attempted and omits the value.
// NOTE(review): lines are missing from this copy (numbering skips
// 686 -> 690, 693 -> 695, 700 -> 702, 709 -> 713, 713 -> 717), including
// the if/else on `val` and the declarations of jf/set_cmd.
682 void ActivePyModules::set_store(const std::string
&module_name
,
683 const std::string
&key
, const boost::optional
<std::string
>& val
)
685 const std::string global_key
= PyModule::mgr_store_prefix
686 + module_name
+ "/" + key
;
690 std::lock_guard
l(lock
);
692 // NOTE: this isn't strictly necessary since we'll also get an MKVData
693 // update from the mon due to our subscription *before* our command is acked.
695 store_cache
[global_key
] = *val
;
697 store_cache
.erase(global_key
);
700 std::ostringstream cmd_json
;
702 jf
.open_object_section("cmd");
704 jf
.dump_string("prefix", "config-key set");
705 jf
.dump_string("key", global_key
);
706 jf
.dump_string("val", *val
);
708 jf
.dump_string("prefix", "config-key del");
709 jf
.dump_string("key", global_key
);
713 set_cmd
.run(&monc
, cmd_json
.str());
717 if (set_cmd
.r
!= 0) {
718 // config-key set will fail if mgr's auth key has insufficient
719 // permission to set config keys
720 // FIXME: should this somehow raise an exception back into Python land?
721 dout(0) << "`config-key " << (val ? "set " : "del ") << global_key
<< "` failed: "
722 << cpp_strerror(set_cmd
.r
) << dendl
;
723 dout(0) << "mon returned " << set_cmd
.r
<< ": " << set_cmd
.outs
<< dendl
;
// Forward a module option update (or removal, when `val` is empty) to
// module_config.set_config(), passing the MonClient.
727 void ActivePyModules::set_config(const std::string
&module_name
,
728 const std::string
&key
, const boost::optional
<std::string
>& val
)
730 module_config
.set_config(&monc
, module_name
, key
, val
);
// Map each active module's name to its get_uri() result, skipping empty
// URIs; `modules` is read under `lock`.
733 std::map
<std::string
, std::string
> ActivePyModules::get_services() const
735 std::map
<std::string
, std::string
> result
;
736 std::lock_guard
l(lock
);
737 for (const auto& [name
, module
] : modules
) {
738 std::string svc_str
= module
->get_uri();
739 if (!svc_str
.empty()) {
740 result
[name
] = svc_str
;
// Apply a kv update from the mon to store_cache, under `lock`.
// - A full update presumably first erases every cached key starting with
//   `prefix`.
// - Each entry in `data` is then set (value present) or erased (value
//   empty).
// - A change under "config/" presumably sets do_config, which leads to
//   _refresh_config_map().
// NOTE(review): the dout(20) " set " line writes stored values into the
// log. Consider omitting them, as get_typed_config() does for option
// values.
// NOTE(review): lines are missing from this copy (numbering skips
// 748 -> 750, 753 -> 755, 759 -> 762, 764 -> 766, 767 -> 769, 772 -> 777),
// including the `incremental` parameter and the branch tests.
747 void ActivePyModules::update_kv_data(
748 const std::string prefix
,
750 const map
<std::string
, boost::optional
<bufferlist
>, std::less
<>>& data
)
752 std::lock_guard
l(lock
);
753 bool do_config
= false;
755 dout(10) << "full update on " << prefix
<< dendl
;
756 auto p
= store_cache
.lower_bound(prefix
);
757 while (p
!= store_cache
.end() && p
->first
.find(prefix
) == 0) {
758 dout(20) << " rm prior " << p
->first
<< dendl
;
759 p
= store_cache
.erase(p
);
762 dout(10) << "incremental update on " << prefix
<< dendl
;
764 for (auto& i
: data
) {
766 dout(20) << " set " << i
.first
<< " = " << i
.second
->to_str() << dendl
;
767 store_cache
[i
.first
] = i
.second
->to_str();
769 dout(20) << " rm " << i
.first
<< dendl
;
770 store_cache
.erase(i
.first
);
772 if (i
.first
.find("config/") == 0) {
777 _refresh_config_map();
// Rebuild config_map from the "config/..." entries in store_cache.
// - Keys under "mgr/" (module options) are skipped.
// - Option names unknown to g_conf() get a stray TYPE_STR Option.
// - Each value is pre-validated (failures are logged).
// - Entries with an invalid mask, or options flagged NO_MON_UPDATE, are
//   presumably not inserted.
// - The rest go into the global, by_id or by_type section selected by the
//   mask's section name.
// FIX:
// - `&section_name` had been corrupted (the "&sect" prefix was mis-decoded
//   as the HTML entity for the SECTION SIGN) and is restored.
// - A stray ";;" is reduced to ";".
// - The two debug logs now print `who` (the target parsed from the key)
//   after "for", instead of printing `name` a second time.
// NOTE(review): lines are missing from this copy (numbering skips
// 781 -> 785, 786 -> 788, 790 -> 793, 793 -> 796, 798 -> 800, 803 -> 807,
// 807 -> 809, 810 -> 813, 814 -> 817, 821 -> 824, 827 -> 829, 829 -> 832).
781 void ActivePyModules::_refresh_config_map()
785 for (auto p
= store_cache
.lower_bound("config/");
786 p
!= store_cache
.end() && p
->first
.find("config/") == 0;
788 string key
= p
->first
.substr(7);
789 if (key
.find("mgr/") == 0) {
790 // NOTE: for now, we ignore module options. see also ceph_foreign_option_get().
793 string value
= p
->second
;
796 config_map
.parse_key(key
, &name
, &who
);
798 const Option
*opt
= g_conf().find_option(name
);
800 config_map
.stray_options
.push_back(
801 std::unique_ptr
<Option
>(
802 new Option(name
, Option::TYPE_STR
, Option::LEVEL_UNKNOWN
)));
803 opt
= config_map
.stray_options
.back().get();
807 int r
= opt
->pre_validate(&value
, &err
);
809 dout(10) << __func__
<< " pre-validate failed on '" << name
<< "' = '"
810 << value
<< "' for " << who
<< dendl
;
813 MaskedOption
mopt(opt
);
814 mopt
.raw_value
= value
;
817 !ConfigMap::parse_mask(who
, &section_name
, &mopt
.mask
)) {
818 derr
<< __func__
<< " invalid mask for key " << key
<< dendl
;
819 } else if (opt
->has_flag(Option::FLAG_NO_MON_UPDATE
)) {
820 dout(10) << __func__
<< " NO_MON_UPDATE option '"
821 << name
<< "' = '" << value
<< "' for " << who
824 Section
*section
= &config_map
.global
;
825 if (section_name
.size() && section_name
!= "global") {
826 if (section_name
.find('.') != std::string::npos
) {
827 section
= &config_map
.by_id
[section_name
];
829 section
= &config_map
.by_type
[section_name
];
832 section
->options
.insert(make_pair(name
, std::move(mopt
)));
// Look up perf counter `path` on daemon <svc_name>.<svc_id>. If it exists,
// `fct` is called on its instance and type, with the GIL re-taken, to fill
// an array section named `path`. Missing counters (with the known paths at
// level 20) and missing daemons are logged.
// The daemon lookup runs under ActivePyModules::lock with the GIL released.
// The DaemonState's own lock (l2) is held for the whole time `fct` runs.
// FIX: counter_instance and counter_type are now bound by reference instead
// of being copied out of the maps. The copies duplicated the counter's data
// (exposed via get_data()/get_data_avg()) on every call just to read it.
// References are safe because l2 stays held while `fct` uses them.
// NOTE(review): the "No daemon state for" log ends with an unmatched ")".
// NOTE(review): lines are missing from this copy (numbering skips
// 841 -> 844, 844 -> 846, 848 -> 850, 855 -> 858, 862 -> 866).
837 PyObject
* ActivePyModules::with_perf_counters(
838 std::function
<void(PerfCounterInstance
& counter_instance
, PerfCounterType
& counter_type
, PyFormatter
& f
)> fct
,
839 const std::string
&svc_name
,
840 const std::string
&svc_id
,
841 const std::string
&path
) const
844 f
.open_array_section(path
);
846 without_gil_t no_gil
;
847 std::lock_guard
l(lock
);
848 auto metadata
= daemon_state
.get(DaemonKey
{svc_name
, svc_id
});
850 std::lock_guard
l2(metadata
->lock
);
851 if (metadata
->perf_counters
.instances
.count(path
)) {
852 auto &counter_instance
= metadata
->perf_counters
.instances
.at(path
);
853 auto &counter_type
= metadata
->perf_counters
.types
.at(path
);
854 with_gil(no_gil
, [&] {
855 fct(counter_instance
, counter_type
, f
);
858 dout(4) << "Missing counter: '" << path
<< "' ("
859 << svc_name
<< "." << svc_id
<< ")" << dendl
;
860 dout(20) << "Paths are:" << dendl
;
861 for (const auto &i
: metadata
->perf_counters
.instances
) {
862 dout(20) << i
.first
<< dendl
;
866 dout(4) << "No daemon state for " << svc_name
<< "." << svc_id
<< ")"
// Return the stored history of perf counter `path` on
// <svc_name>.<svc_id>. Each sample is written as a "datapoint" array:
// t/s/c for long-running averages (PERFCOUNTER_LONGRUNAVG), t/v otherwise.
// Lookup and locking are delegated to with_perf_counters().
// NOTE(review): lines are missing from this copy (numbering skips
// 881 -> 884, 890 -> 894, 898 -> 903), e.g. the PyFormatter parameter and
// the close_section() calls.
874 PyObject
* ActivePyModules::get_counter_python(
875 const std::string
&svc_name
,
876 const std::string
&svc_id
,
877 const std::string
&path
)
879 auto extract_counters
= [](
880 PerfCounterInstance
& counter_instance
,
881 PerfCounterType
& counter_type
,
884 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
885 const auto &avg_data
= counter_instance
.get_data_avg();
886 for (const auto &datapoint
: avg_data
) {
887 f
.open_array_section("datapoint");
888 f
.dump_float("t", datapoint
.t
);
889 f
.dump_unsigned("s", datapoint
.s
);
890 f
.dump_unsigned("c", datapoint
.c
);
894 const auto &data
= counter_instance
.get_data();
895 for (const auto &datapoint
: data
) {
896 f
.open_array_section("datapoint");
897 f
.dump_float("t", datapoint
.t
);
898 f
.dump_unsigned("v", datapoint
.v
);
903 return with_perf_counters(extract_counters
, svc_name
, svc_id
, path
);
// Return only the latest sample of perf counter `path` on
// <svc_name>.<svc_id>: t/s/c for long-running averages
// (PERFCOUNTER_LONGRUNAVG), t/v otherwise. Lookup and locking are delegated
// to with_perf_counters().
// NOTE(review): lines are missing from this copy (numbering skips
// 913 -> 916, 920 -> 922, 924 -> 927).
906 PyObject
* ActivePyModules::get_latest_counter_python(
907 const std::string
&svc_name
,
908 const std::string
&svc_id
,
909 const std::string
&path
)
911 auto extract_latest_counters
= [](
912 PerfCounterInstance
& counter_instance
,
913 PerfCounterType
& counter_type
,
916 if (counter_type
.type
& PERFCOUNTER_LONGRUNAVG
) {
917 const auto &datapoint
= counter_instance
.get_latest_data_avg();
918 f
.dump_float("t", datapoint
.t
);
919 f
.dump_unsigned("s", datapoint
.s
);
920 f
.dump_unsigned("c", datapoint
.c
);
922 const auto &datapoint
= counter_instance
.get_latest_data();
923 f
.dump_float("t", datapoint
.t
);
924 f
.dump_unsigned("v", datapoint
.v
);
927 return with_perf_counters(extract_latest_counters
, svc_name
, svc_id
, path
);
// Return the perf counter schema (description, nick, type, priority, units)
// of each daemon's counters. The daemons covered are:
// - all daemons when svc_type is empty;
// - all daemons of svc_type when svc_id is empty;
// - otherwise just <svc_type>.<svc_id>.
// The function holds ActivePyModules::lock, plus each DaemonState's lock
// while that daemon is dumped, and re-takes the GIL only to fill the
// formatter.
// FIX: perf_counters.instances is now iterated by const reference, and the
// schema entry is bound by reference. Both were copied on every iteration
// before, which duplicated each PerfCounterInstance (including its stored
// data) just to read its name.
// NOTE(review): types[counter_name] still default-inserts a schema entry
// when one is missing; this is kept as in the original.
// NOTE(review): lines are missing from this copy (numbering skips
// 942 -> 944, 947 -> 952, 953 -> 955, 966 -> 968, 970 -> 977).
930 PyObject
* ActivePyModules::get_perf_schema_python(
931 const std::string
&svc_type
,
932 const std::string
&svc_id
)
934 without_gil_t no_gil
;
935 std::lock_guard
l(lock
);
937 DaemonStateCollection daemons
;
939 if (svc_type
== "") {
940 daemons
= daemon_state
.get_all();
941 } else if (svc_id
.empty()) {
942 daemons
= daemon_state
.get_by_service(svc_type
);
944 auto key
= DaemonKey
{svc_type
, svc_id
};
945 // so that the below can be a loop in all cases
946 auto got
= daemon_state
.get(key
);
947 if (got
!= nullptr) {
952 auto f
= with_gil(no_gil
, [&] {
953 return PyFormatter();
955 if (!daemons
.empty()) {
956 for (auto& [key
, state
] : daemons
) {
957 std::lock_guard
l(state
->lock
);
958 with_gil(no_gil
, [&, key
=ceph::to_string(key
), state
=state
] {
959 f
.open_object_section(key
.c_str());
960 for (const auto &ctr_inst_iter
: state
->perf_counters
.instances
) {
961 const auto &counter_name
= ctr_inst_iter
.first
;
962 f
.open_object_section(counter_name
.c_str());
963 const auto &type
= state
->perf_counters
.types
[counter_name
];
964 f
.dump_string("description", type
.description
);
965 if (!type
.nick
.empty()) {
966 f
.dump_string("nick", type
.nick
);
968 f
.dump_unsigned("type", type
.type
);
969 f
.dump_unsigned("priority", type
.priority
);
970 f
.dump_unsigned("units", type
.unit
);
977 dout(4) << __func__
<< ": No daemon state found for "
978 << svc_type
<< "." << svc_id
<< ")" << dendl
;
// Return a PyCapsule wrapping the global g_ceph_context, with no name and
// no destructor. ActivePyModules::lock is taken first, with the GIL
// released.
// NOTE(review): nothing protected by `lock` is read in the surviving lines.
// The return statement is missing from this copy (numbering stops at 991).
983 PyObject
*ActivePyModules::get_context()
985 auto l
= without_gil([&] {
986 return std::lock_guard(lock
);
988 // Construct a capsule containing ceph context.
989 // Not incrementing/decrementing ref count on the context because
990 // it's the global one and it has process lifetime.
991 auto capsule
= PyCapsule_New(g_ceph_context
, nullptr, nullptr);
// Import Python module `module_name`, look up class `clsname` in it and
// instantiate it with one argument: a PyCapsule wrapping the caller's
// pointer `wrapped`. Each failure is logged together with the Python error
// and then aborts via ceph_assert.
// The capsule has no name and no destructor, so it never frees `wrapped`
// itself.
// NOTE(review): the construction failure message says "OSDMap" even though
// this helper is generic.
// NOTE(review): decrefs of `pArgs` and `module` are not visible here
// (original lines 1031, 1033 and 1035-1036 are missing). Confirm they exist
// upstream; otherwise the argument tuple and the module reference leak on
// every call.
996 * Helper for our wrapped types that take a capsule in their constructor.
998 PyObject
*construct_with_capsule(
999 const std::string
&module_name
,
1000 const std::string
&clsname
,
1003 // Look up the Python type `clsname` in `module_name`; it will be constructed below
1004 PyObject
*module
= PyImport_ImportModule(module_name
.c_str());
1006 derr
<< "Failed to import python module:" << dendl
;
1007 derr
<< handle_pyerror() << dendl
;
1009 ceph_assert(module
);
1011 PyObject
*wrapper_type
= PyObject_GetAttrString(
1012 module
, (const char*)clsname
.c_str());
1013 if (!wrapper_type
) {
1014 derr
<< "Failed to get python type:" << dendl
;
1015 derr
<< handle_pyerror() << dendl
;
1017 ceph_assert(wrapper_type
);
1019 // Wrap the caller's pointer in an unnamed capsule with no destructor.
1020 auto wrapped_capsule
= PyCapsule_New(wrapped
, nullptr, nullptr);
1021 ceph_assert(wrapped_capsule
);
1023 // Instantiate the Python type with the capsule as its only argument
1024 auto pArgs
= PyTuple_Pack(1, wrapped_capsule
);
1025 auto wrapper_instance
= PyObject_CallObject(wrapper_type
, pArgs
);
1026 if (wrapper_instance
== nullptr) {
1027 derr
<< "Failed to construct python OSDMap:" << dendl
;
1028 derr
<< handle_pyerror() << dendl
;
1030 ceph_assert(wrapper_instance
!= nullptr);
1032 Py_DECREF(wrapped_capsule
);
1034 Py_DECREF(wrapper_type
);
1037 return wrapper_instance
;
// With the GIL released, copy the current OSDMap (deepish_copy_from) into a
// heap-allocated OSDMap. The copy is then wrapped as a Python
// mgr_module.OSDMap via construct_with_capsule().
// NOTE(review): the raw `new OSDMap` is handed over as a void*, and
// construct_with_capsule() creates its capsule with a nullptr destructor.
// Freeing the map is therefore up to the Python-side wrapper; confirm that
// path exists, otherwise every call leaks a map copy.
1040 PyObject
*ActivePyModules::get_osdmap()
1042 auto newmap
= without_gil([&] {
1043 OSDMap
*newmap
= new OSDMap
;
1044 cluster_state
.with_osdmap([&](const OSDMap
& o
) {
1045 newmap
->deepish_copy_from(o
);
1049 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap
);
// Return the effective value of option `name` for entity `who`, as a typed
// Python value.
// - Unknown options and unrecognised entities raise KeyError.
// - If the local ConfigMap cannot be trusted (have_local_config_map is
//   false), the value comes from a "config get" mon command.
// - Otherwise it is computed from config_map the way the mon's 'config get'
//   does. The entity's CRUSH location/class is used for OSDs. The value
//   presumably falls back to the option's daemon or client default when
//   config_map has no entry.
// NOTE(review): the mon command JSON is built by string concatenation.
// `name` has been validated by find_option(), but `who` is inserted
// unescaped before any entity validation, so a `"` in `who` would corrupt
// the command. Building it with a JSON formatter would be safer.
// NOTE(review): lines are missing from this copy (numbering skips
// 1059 -> 1061, 1062 -> 1066, 1066 -> 1068, 1069 -> 1071, 1074 -> 1077, ...).
1052 PyObject
*ActivePyModules::get_foreign_config(
1053 const std::string
& who
,
1054 const std::string
& name
)
1056 dout(10) << "ceph_foreign_option_get " << who
<< " " << name
<< dendl
;
1058 // NOTE: for now this will only work with built-in options, not module options.
1059 const Option
*opt
= g_conf().find_option(name
);
1061 dout(4) << "ceph_foreign_option_get " << name
<< " not found " << dendl
;
1062 PyErr_Format(PyExc_KeyError
, "option not found: %s", name
.c_str());
1066 // If the monitors are not yet running pacific, we cannot rely on our local
1068 if (!have_local_config_map
) {
1069 dout(20) << "mon cluster wasn't pacific when we started: falling back to 'config get'"
1071 without_gil_t no_gil
;
1074 std::lock_guard
l(lock
);
1077 "{\"prefix\": \"config get\","s
+
1078 "\"who\": \""s
+ who
+ "\","s
+
1079 "\"key\": \""s
+ name
+ "\"}");
1082 dout(10) << "ceph_foreign_option_get (mon command) " << who
<< " " << name
<< " = "
1083 << cmd
.outbl
.to_str() << dendl
;
1084 with_gil_t
gil(no_gil
);
1085 return get_python_typed_option_value(opt
->type
, cmd
.outbl
.to_str());
1088 // mimic the behavior of mon/ConfigMonitor's 'config get' command
1090 if (!entity
.from_str(who
) &&
1091 !entity
.from_str(who
+ ".")) {
1092 dout(5) << "unrecognized entity '" << who
<< "'" << dendl
;
1093 PyErr_Format(PyExc_KeyError
, "invalid entity: %s", who
.c_str());
1097 without_gil_t no_gil
;
1100 // FIXME: this is super inefficient, since we generate the entire daemon
1101 // config just to extract one value from it!
1103 std::map
<std::string
,std::string
,std::less
<>> config
;
1104 cluster_state
.with_osdmap([&](const OSDMap
&osdmap
) {
1105 map
<string
,string
> crush_location
;
1106 string device_class
;
1107 if (entity
.is_osd()) {
1108 osdmap
.crush
->get_full_location(who
, &crush_location
);
1109 int id
= atoi(entity
.get_id().c_str());
1110 const char *c
= osdmap
.crush
->get_item_class(id
);
1114 dout(10) << __func__
<< " crush_location " << crush_location
1115 << " class " << device_class
<< dendl
;
1118 std::map
<std::string
,pair
<std::string
,const MaskedOption
*>> src
;
1119 config
= config_map
.generate_entity_map(
1127 // get a single value
1129 auto p
= config
.find(name
);
1130 if (p
!= config
.end()) {
1133 if (!entity
.is_client() &&
1134 !boost::get
<boost::blank
>(&opt
->daemon_value
)) {
1135 value
= Option::to_str(opt
->daemon_value
);
1137 value
= Option::to_str(opt
->value
);
1141 dout(10) << "ceph_foreign_option_get (configmap) " << who
<< " " << name
<< " = "
1144 with_gil_t
with_gil(no_gil
);
1145 return get_python_typed_option_value(opt
->type
, value
);
// Hand a module its new health checks. The mgr then asks DaemonServer to
// send a report promptly via schedule_tick(0); this is presumably only done
// when `changed` is true, but that test is not visible in this copy.
// NOTE(review): lines are missing from this copy (numbering skips
// 1151 -> 1154, 1156 -> 1160, 1162 -> 1165, 1171 -> 1173), including the
// locking and the `changed` test around schedule_tick.
1148 void ActivePyModules::set_health_checks(const std::string
& module_name
,
1149 health_check_map_t
&& checks
)
1151 bool changed
= false;
1154 auto p
= modules
.find(module_name
);
1155 if (p
!= modules
.end()) {
1156 changed
= p
->second
->set_health_checks(std::move(checks
));
1160 // immediately schedule a report to be sent to the monitors with the new
1161 // health checks that have changed. This is done asynchronously to avoid
1162 // blocking python land. ActivePyModules::lock needs to be dropped to make
1165 // send_report callers: DaemonServer::lock -> PyModuleRegistry::lock
1166 // active_start: PyModuleRegistry::lock -> ActivePyModules::lock
1168 // if we don't release this->lock before calling schedule_tick a cycle is
1169 // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
1170 // This is still correct as send_report is run asynchronously under
1171 // DaemonServer::lock.
1173 server
.schedule_tick(0);
// ActivePyModules::handle_command
// Dispatches a mgr command to the Python module named by
// module_command.module_name.
// If `modules` has no such entry, an "is not available" message is written to
// `*ss`. Otherwise the command, session and cmdmap are forwarded to that
// module's handle_command() and its int result is returned.
// NOTE(review): the error-path return statement and the remaining arguments of
// the forwarded call (presumably inbuf, ds, ss) are not visible in this
// listing. TODO confirm against upstream.
1176 int ActivePyModules::handle_command(
1177 const ModuleCommand
& module_command
,
1178 const MgrSession
& session
,
1179 const cmdmap_t
&cmdmap
,
1180 const bufferlist
&inbuf
,
1181 std::stringstream
*ds
,
1182 std::stringstream
*ss
)
1185 auto mod_iter
= modules
.find(module_command
.module_name
);
1186 if (mod_iter
== modules
.end()) {
1187 *ss
<< "Module '" << module_command
.module_name
<< "' is not available";
1193 return mod_iter
->second
->handle_command(module_command
, session
, cmdmap
,
// ActivePyModules::get_health_checks
// While holding `lock`, passes `checks` to every entry in `modules` through
// its get_health_checks(), logging each module name at debug level 15.
// Presumably each module merges its own checks into the shared map.
// TODO confirm.
1197 void ActivePyModules::get_health_checks(health_check_map_t
*checks
)
1199 std::lock_guard
l(lock
);
1200 for (auto& [name
, module
] : modules
) {
1201 dout(15) << "getting health checks for " << name
<< dendl
;
1202 module
->get_health_checks(checks
);
// ActivePyModules::update_progress_event
// While holding `lock`, looks up the progress event keyed by `evid` and sets
// its `progress` and `add_to_ceph_s` fields. The lookup uses operator[], which
// default-constructs a missing entry, assuming progress_events is a
// std::map-style container.
// NOTE(review): the declarations of `progress` and `add_to_ceph_s`, and any
// use of `desc`, are not visible in this listing. Presumably these are further
// parameters plus a message assignment. TODO confirm.
1206 void ActivePyModules::update_progress_event(
1207 const std::string
& evid
,
1208 const std::string
& desc
,
1212 std::lock_guard
l(lock
);
1213 auto& pe
= progress_events
[evid
];
1215 pe
.progress
= progress
;
1216 pe
.add_to_ceph_s
= add_to_ceph_s
;
// ActivePyModules::complete_progress_event
// While holding `lock`, erases the progress event keyed by `evid`.
// For a std::map-style container, erasing an absent key is a no-op.
1219 void ActivePyModules::complete_progress_event(const std::string
& evid
)
1221 std::lock_guard
l(lock
);
1222 progress_events
.erase(evid
);
// ActivePyModules::clear_all_progress_events
// While holding `lock`, removes every tracked progress event.
1225 void ActivePyModules::clear_all_progress_events()
1227 std::lock_guard
l(lock
);
1228 progress_events
.clear();
// ActivePyModules::get_progress_events
// While holding `lock`, copies the whole progress_events collection into
// `*events`, replacing whatever the caller's map held. Presumably this is a
// copy so callers can read the events without holding `lock`.
1231 void ActivePyModules::get_progress_events(std::map
<std::string
,ProgressEvent
> *events
)
1233 std::lock_guard
l(lock
);
1234 *events
= progress_events
;
// ActivePyModules::config_notify
// While holding `lock`, queues one LambdaContext per entry in `modules` on
// `finisher`. Each lambda calls that module's config_notify() later, off the
// caller's path.
// The module handle is copied into the lambda with an init-capture
// (module=module), so the lambda does not capture the loop's structured
// binding directly.
1237 void ActivePyModules::config_notify()
1239 std::lock_guard
l(lock
);
1240 for (auto& [name
, module
] : modules
) {
1241 // Send all python calls down a Finisher to avoid blocking
1242 // C++ code, and avoid any potential lock cycles.
1243 dout(15) << "notify (config) " << name
<< dendl
;
1244 // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
1245 finisher
.queue(new LambdaContext([module
=module
](int r
){
1246 module
->config_notify();
// ActivePyModules::set_uri
// While holding `lock`, logs the new URI at level 4 and records `uri` on the
// module named `module_name`.
// NOTE(review): modules.at() is used, so an unknown module name presumably
// throws std::out_of_range (if `modules` is a std::map) rather than being
// ignored. Confirm that callers only pass names of loaded modules.
1251 void ActivePyModules::set_uri(const std::string
& module_name
,
1252 const std::string
&uri
)
1254 std::lock_guard
l(lock
);
1256 dout(4) << " module " << module_name
<< " set URI '" << uri
<< "'" << dendl
;
1258 modules
.at(module_name
)->set_uri(uri
);
// ActivePyModules::set_device_wear_level
// Sets the wear level on the device's DeviceState via daemon_state.with_device
// and copies that device's metadata map. It then converts the copy to a JSON
// object and runs a "config-key set" mon command whose key is
// "device/<devid>" and whose input buffer is that JSON.
// NOTE(review): not visible in this listing:
//   - the wear_level parameter declaration;
//   - the with_device lookup argument;
//   - the declarations of `json`, `cmd` and `set_cmd`;
//   - any wait on the command.
// TODO confirm against upstream.
1261 void ActivePyModules::set_device_wear_level(const std::string
& devid
,
1265 map
<string
,string
> meta
;
1266 daemon_state
.with_device(
1268 [wear_level
, &meta
] (DeviceState
& dev
) {
1269 dev
.set_wear_level(wear_level
);
1270 meta
= dev
.metadata
;
// Serialize the copied metadata as a flat JSON object of string key/value pairs.
1274 json_spirit::Object json_object
;
1275 for (auto& i
: meta
) {
1276 json_spirit::Config::add(json_object
, i
.first
, i
.second
);
1279 json
.append(json_spirit::write(json_object
));
// Mon command payload: store the JSON under config-key "device/<devid>".
1282 "\"prefix\": \"config-key set\", "
1283 "\"key\": \"device/" + devid
+ "\""
1287 set_cmd
.run(&monc
, cmd
, json
);
// ActivePyModules::add_osd_perf_query
// Forwards an OSD perf metric query, with its optional limit, to
// DaemonServer::add_osd_perf_query and returns the MetricQueryID it yields.
1291 MetricQueryID
ActivePyModules::add_osd_perf_query(
1292 const OSDPerfMetricQuery
&query
,
1293 const std::optional
<OSDPerfMetricLimit
> &limit
)
1295 return server
.add_osd_perf_query(query
, limit
);
// ActivePyModules::remove_osd_perf_query
// Asks the DaemonServer to drop OSD perf query `query_id`. A failure is logged
// at level 0 with cpp_strerror(r); nothing is reported to the caller, since
// the function returns void.
// NOTE(review): the condition guarding the log (presumably `r < 0`) is not
// visible in this listing.
1298 void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id
)
1300 int r
= server
.remove_osd_perf_query(query_id
);
1302 dout(0) << "remove_osd_perf_query for query_id=" << query_id
<< " failed: "
1303 << cpp_strerror(r
) << dendl
;
// ActivePyModules::get_osd_perf_counters
// Fetches the counters collected for OSD perf query `query_id` from the
// DaemonServer, logging at level 0 on failure. It then dumps them through the
// formatter `f` with this layout:
//   "counters": array of "i" objects, one per metric key, each holding
//     "k": array of "s" arrays, one per sub-key, whose elements are dumped
//          as strings named "0", "1", ... in index order
//     "c": array of "p" arrays, one per counter, holding c.first as "0"
//          and c.second as "1" (both dump_unsigned)
// NOTE(review): the failure-path return, the declaration of `f` and the final
// return of the built PyObject are not visible in this listing.
1307 PyObject
*ActivePyModules::get_osd_perf_counters(MetricQueryID query_id
)
1309 OSDPerfCollector
collector(query_id
);
1310 int r
= server
.get_osd_perf_counters(&collector
);
1312 dout(0) << "get_osd_perf_counters for query_id=" << query_id
<< " failed: "
1313 << cpp_strerror(r
) << dendl
;
1318 const std::map
<OSDPerfMetricKey
, PerformanceCounters
> &counters
= collector
.counters
;
1320 f
.open_array_section("counters");
1321 for (auto &[key
, instance_counters
] : counters
) {
1322 f
.open_object_section("i");
1323 f
.open_array_section("k");
1324 for (auto &sub_key
: key
) {
1325 f
.open_array_section("s");
1326 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1327 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1329 f
.close_section(); // s
1331 f
.close_section(); // k
1332 f
.open_array_section("c");
1333 for (auto &c
: instance_counters
) {
1334 f
.open_array_section("p");
1335 f
.dump_unsigned("0", c
.first
);
1336 f
.dump_unsigned("1", c
.second
);
1337 f
.close_section(); // p
1339 f
.close_section(); // c
1340 f
.close_section(); // i
1342 f
.close_section(); // counters
// ActivePyModules::add_mds_perf_query
// Forwards an MDS perf metric query, with its optional limit, to
// DaemonServer::add_mds_perf_query and returns the MetricQueryID it yields.
1347 MetricQueryID
ActivePyModules::add_mds_perf_query(
1348 const MDSPerfMetricQuery
&query
,
1349 const std::optional
<MDSPerfMetricLimit
> &limit
)
1351 return server
.add_mds_perf_query(query
, limit
);
// ActivePyModules::remove_mds_perf_query
// Asks the DaemonServer to drop MDS perf query `query_id`. A failure is logged
// at level 0 with cpp_strerror(r); nothing is reported to the caller, since
// the function returns void.
// NOTE(review): the condition guarding the log (presumably `r < 0`) is not
// visible in this listing.
1354 void ActivePyModules::remove_mds_perf_query(MetricQueryID query_id
)
1356 int r
= server
.remove_mds_perf_query(query_id
);
1358 dout(0) << "remove_mds_perf_query for query_id=" << query_id
<< " failed: "
1359 << cpp_strerror(r
) << dendl
;
// ActivePyModules::get_mds_perf_counters
// Fetches the counters collected for MDS perf query `query_id` from the
// DaemonServer, logging at level 0 on failure. It then dumps them through the
// formatter `f` inside a "metrics" array containing:
//   "delayed_ranks": array holding stringify(collector.delayed_ranks) as "ranks"
//   "counters": array of "i" objects, one per metric key, each holding
//     "k": array of "s" arrays, one per sub-key, whose elements are dumped
//          as strings named "0", "1", ... in index order
//     "c": array of "p" arrays, one per counter, holding c.first as "0"
//          and c.second as "1" (both dump_unsigned)
// NOTE(review): the failure-path return, the declaration of `f` and the final
// return of the built PyObject are not visible in this listing.
1363 PyObject
*ActivePyModules::get_mds_perf_counters(MetricQueryID query_id
)
1365 MDSPerfCollector
collector(query_id
);
1366 int r
= server
.get_mds_perf_counters(&collector
);
1368 dout(0) << "get_mds_perf_counters for query_id=" << query_id
<< " failed: "
1369 << cpp_strerror(r
) << dendl
;
1374 const std::map
<MDSPerfMetricKey
, PerformanceCounters
> &counters
= collector
.counters
;
1376 f
.open_array_section("metrics");
1378 f
.open_array_section("delayed_ranks");
1379 f
.dump_string("ranks", stringify(collector
.delayed_ranks
).c_str());
1380 f
.close_section(); // delayed_ranks
1382 f
.open_array_section("counters");
1383 for (auto &[key
, instance_counters
] : counters
) {
1384 f
.open_object_section("i");
1385 f
.open_array_section("k");
1386 for (auto &sub_key
: key
) {
1387 f
.open_array_section("s");
1388 for (size_t i
= 0; i
< sub_key
.size(); i
++) {
1389 f
.dump_string(stringify(i
).c_str(), sub_key
[i
]);
1391 f
.close_section(); // s
1393 f
.close_section(); // k
1394 f
.open_array_section("c");
1395 for (auto &c
: instance_counters
) {
1396 f
.open_array_section("p");
1397 f
.dump_unsigned("0", c
.first
);
1398 f
.dump_unsigned("1", c
.second
);
1399 f
.close_section(); // p
1401 f
.close_section(); // c
1402 f
.close_section(); // i
1404 f
.close_section(); // counters
1405 f
.close_section(); // metrics
// ActivePyModules::cluster_log
// While holding `lock`:
//   - creates a log channel named `channel` through monc's log client;
//   - parses the log-client settings from g_ceph_context into per-setting
//     string maps;
//   - applies those settings to the channel with update_config();
//   - logs `message` at priority `prio` on that channel.
// NOTE(review): the tail of the parse_log_client_options() call and its result
// check, and the remaining update_config() arguments, are not visible in this
// listing. Presumably update_config only runs when parsing succeeds.
// TODO confirm.
1410 void ActivePyModules::cluster_log(const std::string
&channel
, clog_type prio
,
1411 const std::string
&message
)
1413 std::lock_guard
l(lock
);
1415 auto cl
= monc
.get_log_client()->create_channel(channel
);
// Per-setting maps filled by parse_log_client_options() below.
1416 map
<string
,string
> log_to_monitors
;
1417 map
<string
,string
> log_to_syslog
;
1418 map
<string
,string
> log_channel
;
1419 map
<string
,string
> log_prio
;
1420 map
<string
,string
> log_to_graylog
;
1421 map
<string
,string
> log_to_graylog_host
;
1422 map
<string
,string
> log_to_graylog_port
;
1425 if (parse_log_client_options(g_ceph_context
, log_to_monitors
, log_to_syslog
,
1426 log_channel
, log_prio
, log_to_graylog
,
1427 log_to_graylog_host
, log_to_graylog_port
,
1429 cl
->update_config(log_to_monitors
, log_to_syslog
,
1430 log_channel
, log_prio
, log_to_graylog
,
1431 log_to_graylog_host
, log_to_graylog_port
,
1433 cl
->do_log(prio
, message
);
// ActivePyModules::register_client
// While holding `lock`, parses `addrs` into an entity_addrvec_t, logs it at
// level 7, and registers it (moved) with the PyModuleRegistry under `name`.
// NOTE(review): the return value of addrv.parse() is not checked in the
// visible lines. A malformed address string would be registered as whatever
// parse() leaves in `addrv`.
1436 void ActivePyModules::register_client(std::string_view name
, std::string addrs
)
1438 std::lock_guard
l(lock
);
1440 entity_addrvec_t addrv
;
1441 addrv
.parse(addrs
.data());
1443 dout(7) << "registering msgr client handle " << addrv
<< dendl
;
1444 py_module_registry
.register_client(name
, std::move(addrv
));
// ActivePyModules::unregister_client
// While holding `lock`, parses `addrs` into an entity_addrvec_t, logs it at
// level 7, and asks the PyModuleRegistry to unregister it under `name`.
// NOTE(review): the return value of addrv.parse() is not checked in the
// visible lines.
1447 void ActivePyModules::unregister_client(std::string_view name
, std::string addrs
)
1449 std::lock_guard
l(lock
);
1451 entity_addrvec_t addrv
;
1452 addrv
.parse(addrs
.data());
1454 dout(7) << "unregistering msgr client handle " << addrv
<< dendl
;
1455 py_module_registry
.unregister_client(name
, addrv
);