]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/ActivePyModules.cc
import 15.2.5
[ceph.git] / ceph / src / mgr / ActivePyModules.cc
index 82f845091cf92c3a5717fe7c0e5168e1bbe2f390..210f7755090857e448761c9d11170dd7dad2452a 100644 (file)
@@ -12,7 +12,6 @@
  */
 
 // Include this first to get python headers earlier
-#include "BaseMgrModule.h"
 #include "Gil.h"
 
 #include "common/errno.h"
 #include "mgr/MgrContext.h"
 
 // For ::config_prefix
+#include "PyModule.h"
 #include "PyModuleRegistry.h"
 
 #include "ActivePyModules.h"
+#include "DaemonKey.h"
+#include "DaemonServer.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mgr
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr " << __func__ << " "
 
-
-ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
+ActivePyModules::ActivePyModules(PyModuleConfig &module_config_,
+          std::map<std::string, std::string> store_data,
           DaemonStateIndex &ds, ClusterState &cs,
-         MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
-          Client &client_, Finisher &f)
-  : config_cache(config_), daemon_state(ds), cluster_state(cs),
-    monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
-    lock("ActivePyModules")
-{}
+          MonClient &mc, LogChannelRef clog_,
+          LogChannelRef audit_clog_, Objecter &objecter_,
+          Client &client_, Finisher &f, DaemonServer &server,
+          PyModuleRegistry &pmr)
+  : module_config(module_config_), daemon_state(ds), cluster_state(cs),
+    monc(mc), clog(clog_), audit_clog(audit_clog_), objecter(objecter_),
+    client(client_), finisher(f),
+    cmd_finisher(g_ceph_context, "cmd_finisher", "cmdfin"),
+    server(server), py_module_registry(pmr)
+{
+  store_cache = std::move(store_data);
+  cmd_finisher.start();
+}
 
 ActivePyModules::~ActivePyModules() = default;
 
@@ -55,23 +64,19 @@ void ActivePyModules::dump_server(const std::string &hostname,
   f->open_array_section("services");
   std::string ceph_version;
 
-  for (const auto &i : dmc) {
-    Mutex::Locker l(i.second->lock);
-    const auto &key = i.first;
-    const std::string &str_type = key.first;
-    const std::string &svc_name = key.second;
-
+  for (const auto &[key, state] : dmc) {
+    std::lock_guard l(state->lock);
     // TODO: pick the highest version, and make sure that
     // somewhere else (during health reporting?) we are
     // indicating to the user if we see mixed versions
-    auto ver_iter = i.second->metadata.find("ceph_version");
-    if (ver_iter != i.second->metadata.end()) {
-      ceph_version = i.second->metadata.at("ceph_version");
+    auto ver_iter = state->metadata.find("ceph_version");
+    if (ver_iter != state->metadata.end()) {
+      ceph_version = state->metadata.at("ceph_version");
     }
 
     f->open_object_section("service");
-    f->dump_string("type", str_type);
-    f->dump_string("id", svc_name);
+    f->dump_string("type", key.type);
+    f->dump_string("id", key.name);
     f->close_section();
   }
   f->close_section();
@@ -84,7 +89,7 @@ void ActivePyModules::dump_server(const std::string &hostname,
 PyObject *ActivePyModules::get_server_python(const std::string &hostname)
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   PyEval_RestoreThread(tstate);
   dout(10) << " (" << hostname << ")" << dendl;
 
@@ -98,14 +103,14 @@ PyObject *ActivePyModules::get_server_python(const std::string &hostname)
 
 PyObject *ActivePyModules::list_servers_python()
 {
+  PyFormatter f(false, true);
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
-  PyEval_RestoreThread(tstate);
   dout(10) << " >" << dendl;
 
-  PyFormatter f(false, true);
-  daemon_state.with_daemons_by_server([this, &f]
+  daemon_state.with_daemons_by_server([this, &f, &tstate]
       (const std::map<std::string, DaemonStateCollection> &all) {
+    PyEval_RestoreThread(tstate);
+
     for (const auto &i : all) {
       const auto &hostname = i.first;
 
@@ -122,13 +127,13 @@ PyObject *ActivePyModules::get_metadata_python(
   const std::string &svc_type,
   const std::string &svc_id)
 {
-  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+  auto metadata = daemon_state.get(DaemonKey{svc_type, svc_id});
   if (metadata == nullptr) {
     derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
     Py_RETURN_NONE;
   }
 
-  Mutex::Locker l(metadata->lock);
+  std::lock_guard l(metadata->lock);
   PyFormatter f;
   f.dump_string("hostname", metadata->hostname);
   for (const auto &i : metadata->metadata) {
@@ -142,13 +147,13 @@ PyObject *ActivePyModules::get_daemon_status_python(
   const std::string &svc_type,
   const std::string &svc_id)
 {
-  auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
+  auto metadata = daemon_state.get(DaemonKey{svc_type, svc_id});
   if (metadata == nullptr) {
     derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
     Py_RETURN_NONE;
   }
 
-  Mutex::Locker l(metadata->lock);
+  std::lock_guard l(metadata->lock);
   PyFormatter f;
   for (const auto &i : metadata->service_status) {
     f.dump_string(i.first.c_str(), i.second);
@@ -158,26 +163,30 @@ PyObject *ActivePyModules::get_daemon_status_python(
 
 PyObject *ActivePyModules::get_python(const std::string &what)
 {
+  PyFormatter f;
+
+  // Drop the GIL, as most of the following blocks will block on
+  // a mutex -- they are all responsible for re-taking the GIL before
+  // touching the PyFormatter instance or returning from the function.
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
-  PyEval_RestoreThread(tstate);
 
   if (what == "fs_map") {
-    PyFormatter f;
-    cluster_state.with_fsmap([&f](const FSMap &fsmap) {
+    cluster_state.with_fsmap([&f, &tstate](const FSMap &fsmap) {
+      PyEval_RestoreThread(tstate);
       fsmap.dump(&f);
     });
     return f.get();
   } else if (what == "osdmap_crush_map_text") {
     bufferlist rdata;
-    cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
-       osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
+    cluster_state.with_osdmap([&rdata, &tstate](const OSDMap &osd_map){
+      PyEval_RestoreThread(tstate);
+      osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
     });
     std::string crush_text = rdata.to_str();
-    return PyString_FromString(crush_text.c_str());
+    return PyUnicode_FromString(crush_text.c_str());
   } else if (what.substr(0, 7) == "osd_map") {
-    PyFormatter f;
-    cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
+    cluster_state.with_osdmap([&f, &what, &tstate](const OSDMap &osd_map){
+      PyEval_RestoreThread(tstate);
       if (what == "osd_map") {
         osd_map.dump(&f);
       } else if (what == "osd_map_tree") {
@@ -187,43 +196,79 @@ PyObject *ActivePyModules::get_python(const std::string &what)
       }
     });
     return f.get();
-  } else if (what == "config") {
-    PyFormatter f;
-    g_conf->show_config(&f);
+  } else if (what == "modified_config_options") {
+    PyEval_RestoreThread(tstate);
+    auto all_daemons = daemon_state.get_all();
+    set<string> names;
+    for (auto& [key, daemon] : all_daemons) {
+      std::lock_guard l(daemon->lock);
+      for (auto& [name, valmap] : daemon->config) {
+       names.insert(name);
+      }
+    }
+    f.open_array_section("options");
+    for (auto& name : names) {
+      f.dump_string("name", name);
+    }
+    f.close_section();
+    return f.get();
+  } else if (what.substr(0, 6) == "config") {
+    PyEval_RestoreThread(tstate);
+    if (what == "config_options") {
+      g_conf().config_options(&f);
+    } else if (what == "config") {
+      g_conf().show_config(&f);
+    }
     return f.get();
   } else if (what == "mon_map") {
-    PyFormatter f;
     cluster_state.with_monmap(
-      [&f](const MonMap &monmap) {
+      [&f, &tstate](const MonMap &monmap) {
+        PyEval_RestoreThread(tstate);
         monmap.dump(&f);
       }
     );
     return f.get();
   } else if (what == "service_map") {
-    PyFormatter f;
     cluster_state.with_servicemap(
-      [&f](const ServiceMap &service_map) {
+      [&f, &tstate](const ServiceMap &service_map) {
+        PyEval_RestoreThread(tstate);
         service_map.dump(&f);
       }
     );
     return f.get();
   } else if (what == "osd_metadata") {
-    PyFormatter f;
     auto dmc = daemon_state.get_by_service("osd");
-    for (const auto &i : dmc) {
-      Mutex::Locker l(i.second->lock);
-      f.open_object_section(i.first.second.c_str());
-      f.dump_string("hostname", i.second->hostname);
-      for (const auto &j : i.second->metadata) {
-        f.dump_string(j.first.c_str(), j.second);
+    PyEval_RestoreThread(tstate);
+
+    for (const auto &[key, state] : dmc) {
+      std::lock_guard l(state->lock);
+      f.open_object_section(key.name.c_str());
+      f.dump_string("hostname", state->hostname);
+      for (const auto &[name, val] : state->metadata) {
+        f.dump_string(name.c_str(), val);
+      }
+      f.close_section();
+    }
+    return f.get();
+  } else if (what == "mds_metadata") {
+    auto dmc = daemon_state.get_by_service("mds");
+    PyEval_RestoreThread(tstate);
+
+    for (const auto &[key, state] : dmc) {
+      std::lock_guard l(state->lock);
+      f.open_object_section(key.name.c_str());
+      f.dump_string("hostname", state->hostname);
+      for (const auto &[name, val] : state->metadata) {
+        f.dump_string(name.c_str(), val);
       }
       f.close_section();
     }
     return f.get();
   } else if (what == "pg_summary") {
-    PyFormatter f;
     cluster_state.with_pgmap(
-        [&f](const PGMap &pg_map) {
+        [&f, &tstate](const PGMap &pg_map) {
+          PyEval_RestoreThread(tstate);
+
           std::map<std::string, std::map<std::string, uint32_t> > osds;
           std::map<std::string, std::map<std::string, uint32_t> > pools;
           std::map<std::string, uint32_t> all;
@@ -260,212 +305,362 @@ PyObject *ActivePyModules::get_python(const std::string &what)
             f.dump_int(i.first.c_str(), i.second);
           }
           f.close_section();
+          f.open_object_section("pg_stats_sum");
+          pg_map.pg_sum.dump(&f);
+          f.close_section();
         }
     );
     return f.get();
   } else if (what == "pg_status") {
-    PyFormatter f;
     cluster_state.with_pgmap(
-        [&f](const PGMap &pg_map) {
+        [&f, &tstate](const PGMap &pg_map) {
+          PyEval_RestoreThread(tstate);
          pg_map.print_summary(&f, nullptr);
         }
     );
     return f.get();
   } else if (what == "pg_dump") {
-    PyFormatter f;
-        cluster_state.with_pgmap(
-        [&f](const PGMap &pg_map) {
-         pg_map.dump(&f);
-        }
+    cluster_state.with_pgmap(
+      [&f, &tstate](const PGMap &pg_map) {
+        PyEval_RestoreThread(tstate);
+       pg_map.dump(&f, false);
+      }
+    );
+    return f.get();
+  } else if (what == "devices") {
+    daemon_state.with_devices2(
+      [&tstate, &f]() {
+       PyEval_RestoreThread(tstate);
+       f.open_array_section("devices");
+      },
+      [&f] (const DeviceState& dev) {
+       f.dump_object("device", dev);
+      });
+    f.close_section();
+    return f.get();
+  } else if (what.size() > 7 &&
+            what.substr(0, 7) == "device ") {
+    string devid = what.substr(7);
+    if (!daemon_state.with_device(
+         devid,
+         [&f, &tstate] (const DeviceState& dev) {
+           PyEval_RestoreThread(tstate);
+           f.dump_object("device", dev);
+         })) {
+      // device not found
+      PyEval_RestoreThread(tstate);
+    }
+    return f.get();
+  } else if (what == "io_rate") {
+    cluster_state.with_pgmap(
+      [&f, &tstate](const PGMap &pg_map) {
+        PyEval_RestoreThread(tstate);
+        pg_map.dump_delta(&f);
+      }
     );
     return f.get();
   } else if (what == "df") {
-    PyFormatter f;
-
-    cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
-      cluster_state.with_pgmap(
-          [&osd_map, &f](const PGMap &pg_map) {
-        pg_map.dump_fs_stats(nullptr, &f, true);
+    cluster_state.with_osdmap_and_pgmap(
+      [&f, &tstate](
+       const OSDMap& osd_map,
+       const PGMap &pg_map) {
+       PyEval_RestoreThread(tstate);
+        pg_map.dump_cluster_stats(nullptr, &f, true);
         pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
       });
+    return f.get();
+  } else if (what == "pg_stats") {
+    cluster_state.with_pgmap(
+        [&f, &tstate](const PGMap &pg_map) {
+      PyEval_RestoreThread(tstate);
+      pg_map.dump_pg_stats(&f, false);
+    });
+    return f.get();
+  } else if (what == "pool_stats") {
+    cluster_state.with_pgmap(
+        [&f, &tstate](const PGMap &pg_map) {
+      PyEval_RestoreThread(tstate);
+      pg_map.dump_pool_stats(&f);
     });
     return f.get();
+  } else if (what == "pg_ready") {
+    PyEval_RestoreThread(tstate);
+    server.dump_pg_ready(&f);
+    return f.get();
   } else if (what == "osd_stats") {
-    PyFormatter f;
     cluster_state.with_pgmap(
-        [&f](const PGMap &pg_map) {
-      pg_map.dump_osd_stats(&f);
+        [&f, &tstate](const PGMap &pg_map) {
+      PyEval_RestoreThread(tstate);
+      pg_map.dump_osd_stats(&f, false);
     });
     return f.get();
-  } else if (what == "health" || what == "mon_status") {
-    PyFormatter f;
-    bufferlist json;
-    if (what == "health") {
-      json = cluster_state.get_health();
-    } else if (what == "mon_status") {
-      json = cluster_state.get_mon_status();
-    } else {
-      assert(false);
-    }
-    f.dump_string("json", json.to_str());
+  } else if (what == "osd_ping_times") {
+    cluster_state.with_pgmap(
+        [&f, &tstate](const PGMap &pg_map) {
+      PyEval_RestoreThread(tstate);
+      pg_map.dump_osd_ping_times(&f);
+    });
+    return f.get();
+  } else if (what == "osd_pool_stats") {
+    int64_t poolid = -ENOENT;
+    cluster_state.with_osdmap_and_pgmap([&](const OSDMap& osdmap,
+                                           const PGMap& pg_map) {
+        PyEval_RestoreThread(tstate);
+        f.open_array_section("pool_stats");
+        for (auto &p : osdmap.get_pools()) {
+          poolid = p.first;
+          pg_map.dump_pool_stats_and_io_rate(poolid, osdmap, &f, nullptr);
+        }
+        f.close_section();
+    });
+    return f.get();
+  } else if (what == "health") {
+    cluster_state.with_health(
+        [&f, &tstate](const ceph::bufferlist &health_json) {
+      PyEval_RestoreThread(tstate);
+      f.dump_string("json", health_json.to_str());
+    });
+    return f.get();
+  } else if (what == "mon_status") {
+    cluster_state.with_mon_status(
+        [&f, &tstate](const ceph::bufferlist &mon_status_json) {
+      PyEval_RestoreThread(tstate);
+      f.dump_string("json", mon_status_json.to_str());
+    });
     return f.get();
   } else if (what == "mgr_map") {
-    PyFormatter f;
-    cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
+    cluster_state.with_mgrmap([&f, &tstate](const MgrMap &mgr_map) {
+      PyEval_RestoreThread(tstate);
       mgr_map.dump(&f);
     });
     return f.get();
   } else {
     derr << "Python module requested unknown data '" << what << "'" << dendl;
+    PyEval_RestoreThread(tstate);
     Py_RETURN_NONE;
   }
 }
 
-int ActivePyModules::start_one(std::string const &module_name,
-    PyObject *pClass, const SafeThreadState &pMyThreadState)
+void ActivePyModules::start_one(PyModuleRef py_module)
 {
-  Mutex::Locker l(lock);
-
-  assert(modules.count(module_name) == 0);
-
-  modules[module_name].reset(new ActivePyModule(
-      module_name, pClass,
-      pMyThreadState, clog));
-
-  int r = modules[module_name]->load(this);
-  if (r != 0) {
-    return r;
-  } else {
-    dout(4) << "Starting thread for " << module_name << dendl;
-    // Giving Thread the module's module_name member as its
-    // char* thread name: thread must not outlive module class lifetime.
-    modules[module_name]->thread.create(
-        modules[module_name]->get_name().c_str());
+  std::lock_guard l(lock);
+
+  const auto name = py_module->get_name();
+  auto active_module = std::make_shared<ActivePyModule>(py_module, clog);
+
+  pending_modules.insert(name);
+  // Send all python calls down a Finisher to avoid blocking
+  // C++ code, and avoid any potential lock cycles.
+  finisher.queue(new LambdaContext([this, active_module, name](int) {
+    int r = active_module->load(this);
+    std::lock_guard l(lock);
+    pending_modules.erase(name);
+    if (r != 0) {
+      derr << "Failed to run module in active mode ('" << name << "')"
+           << dendl;
+    } else {
+      auto em = modules.emplace(name, active_module);
+      ceph_assert(em.second); // actually inserted
 
-    return 0;
-  }
+      dout(4) << "Starting thread for " << name << dendl;
+      active_module->thread.create(active_module->get_thread_name());
+    }
+  }));
 }
 
 void ActivePyModules::shutdown()
 {
-  Mutex::Locker locker(lock);
+  std::lock_guard locker(lock);
 
   // Signal modules to drop out of serve() and/or tear down resources
-  for (auto &i : modules) {
-    auto module = i.second.get();
-    const auto& name = i.first;
-
-    lock.Unlock();
+  for (auto& [name, module] : modules) {
+    lock.unlock();
     dout(10) << "calling module " << name << " shutdown()" << dendl;
     module->shutdown();
     dout(10) << "module " << name << " shutdown() returned" << dendl;
-    lock.Lock();
+    lock.lock();
   }
 
   // For modules implementing serve(), finish the threads where we
   // were running that.
-  for (auto &i : modules) {
-    lock.Unlock();
-    dout(10) << "joining module " << i.first << dendl;
-    i.second->thread.join();
-    dout(10) << "joined module " << i.first << dendl;
-    lock.Lock();
+  for (auto& [name, module] : modules) {
+    lock.unlock();
+    dout(10) << "joining module " << name << dendl;
+    module->thread.join();
+    dout(10) << "joined module " << name << dendl;
+    lock.lock();
   }
 
+  cmd_finisher.wait_for_empty();
+  cmd_finisher.stop();
+
   modules.clear();
 }
 
 void ActivePyModules::notify_all(const std::string &notify_type,
                      const std::string &notify_id)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   dout(10) << __func__ << ": notify_all " << notify_type << dendl;
-  for (auto& i : modules) {
-    auto module = i.second.get();
+  for (auto& [name, module] : modules) {
     // Send all python calls down a Finisher to avoid blocking
     // C++ code, and avoid any potential lock cycles.
-    finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
-      module->notify(notify_type, notify_id);
+    dout(15) << "queuing notify to " << name << dendl;
+    // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
+    finisher.queue(new LambdaContext([module=module, notify_type, notify_id]
+      (int r){ 
+        module->notify(notify_type, notify_id); 
     }));
   }
 }
 
 void ActivePyModules::notify_all(const LogEntry &log_entry)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   dout(10) << __func__ << ": notify_all (clog)" << dendl;
-  for (auto& i : modules) {
-    auto module = i.second.get();
+  for (auto& [name, module] : modules) {
     // Send all python calls down a Finisher to avoid blocking
     // C++ code, and avoid any potential lock cycles.
     //
     // Note intentional use of non-reference lambda binding on
     // log_entry: we take a copy because caller's instance is
     // probably ephemeral.
-    finisher.queue(new FunctionContext([module, log_entry](int r){
+    dout(15) << "queuing notify (clog) to " << name << dendl;
+    // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
+    finisher.queue(new LambdaContext([module=module, log_entry](int r){
       module->notify_clog(log_entry);
     }));
   }
 }
 
-bool ActivePyModules::get_config(const std::string &module_name,
+bool ActivePyModules::get_store(const std::string &module_name,
     const std::string &key, std::string *val) const
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   PyEval_RestoreThread(tstate);
 
-  const std::string global_key = PyModuleRegistry::config_prefix
+  const std::string global_key = PyModule::config_prefix
     + module_name + "/" + key;
 
-  dout(4) << __func__ << "key: " << global_key << dendl;
+  dout(4) << __func__ << " key: " << global_key << dendl;
 
-  if (config_cache.count(global_key)) {
-    *val = config_cache.at(global_key);
+  auto i = store_cache.find(global_key);
+  if (i != store_cache.end()) {
+    *val = i->second;
     return true;
   } else {
     return false;
   }
 }
 
-PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
+PyObject *ActivePyModules::dispatch_remote(
+    const std::string &other_module,
+    const std::string &method,
+    PyObject *args,
+    PyObject *kwargs,
+    std::string *err)
+{
+  auto mod_iter = modules.find(other_module);
+  ceph_assert(mod_iter != modules.end());
+
+  return mod_iter->second->dispatch_remote(method, args, kwargs, err);
+}
+
+bool ActivePyModules::get_config(const std::string &module_name,
+    const std::string &key, std::string *val) const
+{
+  const std::string global_key = PyModule::config_prefix
+    + module_name + "/" + key;
+
+  dout(20) << " key: " << global_key << dendl;
+
+  std::lock_guard lock(module_config.lock);
+
+  auto i = module_config.config.find(global_key);
+  if (i != module_config.config.end()) {
+    *val = i->second;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+PyObject *ActivePyModules::get_typed_config(
+  const std::string &module_name,
+  const std::string &key,
+  const std::string &prefix) const
+{
+  PyThreadState *tstate = PyEval_SaveThread();
+  std::string value;
+  std::string final_key;
+  bool found = false;
+  if (prefix.size()) {
+    final_key = prefix + "/" + key;
+    found = get_config(module_name, final_key, &value);
+  }
+  if (!found) {
+    final_key = key;
+    found = get_config(module_name, final_key, &value);
+  }
+  if (found) {
+    PyModuleRef module = py_module_registry.get_module(module_name);
+    PyEval_RestoreThread(tstate);
+    if (!module) {
+        derr << "Module '" << module_name << "' is not available" << dendl;
+        Py_RETURN_NONE;
+    }
+    dout(10) << __func__ << " " << final_key << " found: " << value << dendl;
+    return module->get_typed_option_value(key, value);
+  }
+  PyEval_RestoreThread(tstate);
+  if (prefix.size()) {
+    dout(10) << " [" << prefix << "/]" << key << " not found "
+           << dendl;
+  } else {
+    dout(10) << " " << key << " not found " << dendl;
+  }
+  Py_RETURN_NONE;
+}
+
+PyObject *ActivePyModules::get_store_prefix(const std::string &module_name,
     const std::string &prefix) const
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
+  std::lock_guard lock(module_config.lock);
   PyEval_RestoreThread(tstate);
 
-  const std::string base_prefix = PyModuleRegistry::config_prefix
+  const std::string base_prefix = PyModule::config_prefix
                                     + module_name + "/";
   const std::string global_prefix = base_prefix + prefix;
-  dout(4) << __func__ << "prefix: " << global_prefix << dendl;
+  dout(4) << __func__ << " prefix: " << global_prefix << dendl;
 
   PyFormatter f;
-  for (auto p = config_cache.lower_bound(global_prefix);
-       p != config_cache.end() && p->first.find(global_prefix) == 0;
+
+  for (auto p = store_cache.lower_bound(global_prefix);
+       p != store_cache.end() && p->first.find(global_prefix) == 0;
        ++p) {
     f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
   }
   return f.get();
 }
 
-void ActivePyModules::set_config(const std::string &module_name,
+void ActivePyModules::set_store(const std::string &module_name,
     const std::string &key, const boost::optional<std::string>& val)
 {
-  const std::string global_key = PyModuleRegistry::config_prefix
+  const std::string global_key = PyModule::config_prefix
                                    + module_name + "/" + key;
 
   Command set_cmd;
   {
-    PyThreadState *tstate = PyEval_SaveThread();
-    Mutex::Locker l(lock);
-    PyEval_RestoreThread(tstate);
+    std::lock_guard l(lock);
     if (val) {
-      config_cache[global_key] = *val;
+      store_cache[global_key] = *val;
     } else {
-      config_cache.erase(global_key);
+      store_cache.erase(global_key);
     }
 
     std::ostringstream cmd_json;
@@ -495,77 +690,49 @@ void ActivePyModules::set_config(const std::string &module_name,
   }
 }
 
-std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
-{
-  Mutex::Locker l(lock);
-
-  std::vector<ModuleCommand> result;
-  for (const auto& i : modules) {
-    auto module = i.second.get();
-    auto mod_commands = module->get_commands();
-    for (auto j : mod_commands) {
-      result.push_back(j);
-    }
-  }
-
-  return result;
-}
-
-std::vector<MonCommand> ActivePyModules::get_commands() const
+void ActivePyModules::set_config(const std::string &module_name,
+    const std::string &key, const boost::optional<std::string>& val)
 {
-  std::vector<ModuleCommand> commands = get_py_commands();
-  std::vector<MonCommand> result;
-  for (auto &pyc: commands) {
-    result.push_back({pyc.cmdstring, pyc.helpstring, "mgr",
-                        pyc.perm, "cli", MonCommand::FLAG_MGR});
-  }
-  return result;
+  module_config.set_config(&monc, module_name, key, val);
 }
 
-
 std::map<std::string, std::string> ActivePyModules::get_services() const
 {
   std::map<std::string, std::string> result;
-  Mutex::Locker l(lock);
-  for (const auto& i : modules) {
-    const auto &module = i.second.get();
+  std::lock_guard l(lock);
+  for (const auto& [name, module] : modules) {
     std::string svc_str = module->get_uri();
     if (!svc_str.empty()) {
-      result[module->get_name()] = svc_str;
+      result[name] = svc_str;
     }
   }
 
   return result;
 }
 
-PyObject* ActivePyModules::get_counter_python(
+PyObject* ActivePyModules::with_perf_counters(
+    std::function<void(PerfCounterInstance& counter_instance, PerfCounterType& counter_type, PyFormatter& f)> fct,
     const std::string &svc_name,
     const std::string &svc_id,
-    const std::string &path)
+    const std::string &path) const
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   PyEval_RestoreThread(tstate);
 
   PyFormatter f;
   f.open_array_section(path.c_str());
 
-  auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
+  auto metadata = daemon_state.get(DaemonKey{svc_name, svc_id});
   if (metadata) {
-    Mutex::Locker l2(metadata->lock);
+    std::lock_guard l2(metadata->lock);
     if (metadata->perf_counters.instances.count(path)) {
       auto counter_instance = metadata->perf_counters.instances.at(path);
-      const auto &data = counter_instance.get_data();
-      for (const auto &datapoint : data) {
-        f.open_array_section("datapoint");
-        f.dump_unsigned("t", datapoint.t.sec());
-        f.dump_unsigned("v", datapoint.v);
-        f.close_section();
-
-      }
+      auto counter_type = metadata->perf_counters.types.at(path);
+      fct(counter_instance, counter_type, f);
     } else {
       dout(4) << "Missing counter: '" << path << "' ("
-              << svc_name << "." << svc_id << ")" << dendl;
+        << svc_name << "." << svc_id << ")" << dendl;
       dout(20) << "Paths are:" << dendl;
       for (const auto &i : metadata->perf_counters.instances) {
         dout(20) << i.first << dendl;
@@ -573,28 +740,84 @@ PyObject* ActivePyModules::get_counter_python(
     }
   } else {
     dout(4) << "No daemon state for "
-              << svc_name << "." << svc_id << ")" << dendl;
+      << svc_name << "." << svc_id << ")" << dendl;
   }
   f.close_section();
   return f.get();
 }
 
+PyObject* ActivePyModules::get_counter_python(
+    const std::string &svc_name,
+    const std::string &svc_id,
+    const std::string &path)
+{
+  auto extract_counters = [](
+      PerfCounterInstance& counter_instance,
+      PerfCounterType& counter_type,
+      PyFormatter& f)
+  {
+    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
+      const auto &avg_data = counter_instance.get_data_avg();
+      for (const auto &datapoint : avg_data) {
+        f.open_array_section("datapoint");
+        f.dump_float("t", datapoint.t);
+        f.dump_unsigned("s", datapoint.s);
+        f.dump_unsigned("c", datapoint.c);
+        f.close_section();
+      }
+    } else {
+      const auto &data = counter_instance.get_data();
+      for (const auto &datapoint : data) {
+        f.open_array_section("datapoint");
+        f.dump_float("t", datapoint.t);
+        f.dump_unsigned("v", datapoint.v);
+        f.close_section();
+      }
+    }
+  };
+  return with_perf_counters(extract_counters, svc_name, svc_id, path);
+}
+
+PyObject* ActivePyModules::get_latest_counter_python(
+    const std::string &svc_name,
+    const std::string &svc_id,
+    const std::string &path)
+{
+  auto extract_latest_counters = [](
+      PerfCounterInstance& counter_instance,
+      PerfCounterType& counter_type,
+      PyFormatter& f)
+  {
+    if (counter_type.type & PERFCOUNTER_LONGRUNAVG) {
+      const auto &datapoint = counter_instance.get_latest_data_avg();
+      f.dump_float("t", datapoint.t);
+      f.dump_unsigned("s", datapoint.s);
+      f.dump_unsigned("c", datapoint.c);
+    } else {
+      const auto &datapoint = counter_instance.get_latest_data();
+      f.dump_float("t", datapoint.t);
+      f.dump_unsigned("v", datapoint.v);
+    }
+  };
+  return with_perf_counters(extract_latest_counters, svc_name, svc_id, path);
+}
+
 PyObject* ActivePyModules::get_perf_schema_python(
-    const std::string svc_type,
+    const std::string &svc_type,
     const std::string &svc_id)
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   PyEval_RestoreThread(tstate);
 
   DaemonStateCollection daemons;
 
   if (svc_type == "") {
-    daemons = std::move(daemon_state.get_all());
+    daemons = daemon_state.get_all();
   } else if (svc_id.empty()) {
-    daemons = std::move(daemon_state.get_by_service(svc_type));
+    daemons = daemon_state.get_by_service(svc_type);
   } else {
-    auto key = DaemonKey(svc_type, svc_id);
+    auto key = DaemonKey{svc_type, svc_id};
     // so that the below can be a loop in all cases
     auto got = daemon_state.get(key);
     if (got != nullptr) {
@@ -604,15 +827,10 @@ PyObject* ActivePyModules::get_perf_schema_python(
 
   PyFormatter f;
   if (!daemons.empty()) {
-    for (auto statepair : daemons) {
-      auto key = statepair.first;
-      auto state = statepair.second;
+    for (auto& [key, state] : daemons) {
+      f.open_object_section(ceph::to_string(key).c_str());
 
-      std::ostringstream daemon_name;
-      daemon_name << key.first << "." << key.second;
-      f.open_object_section(daemon_name.str().c_str());
-
-      Mutex::Locker l(state->lock);
+      std::lock_guard l(state->lock);
       for (auto ctr_inst_iter : state->perf_counters.instances) {
         const auto &counter_name = ctr_inst_iter.first;
        f.open_object_section(counter_name.c_str());
@@ -623,6 +841,7 @@ PyObject* ActivePyModules::get_perf_schema_python(
        }
        f.dump_unsigned("type", type.type);
        f.dump_unsigned("priority", type.priority);
+       f.dump_unsigned("units", type.unit);
        f.close_section();
       }
       f.close_section();
@@ -637,7 +856,7 @@ PyObject* ActivePyModules::get_perf_schema_python(
 PyObject *ActivePyModules::get_context()
 {
   PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
   PyEval_RestoreThread(tstate);
 
   // Construct a capsule containing ceph context.
@@ -661,7 +880,7 @@ PyObject *construct_with_capsule(
     derr << "Failed to import python module:" << dendl;
     derr << handle_pyerror() << dendl;
   }
-  assert(module);
+  ceph_assert(module);
 
   PyObject *wrapper_type = PyObject_GetAttrString(
       module, (const char*)clsname.c_str());
@@ -669,11 +888,11 @@ PyObject *construct_with_capsule(
     derr << "Failed to get python type:" << dendl;
     derr << handle_pyerror() << dendl;
   }
-  assert(wrapper_type);
+  ceph_assert(wrapper_type);
 
   // Construct a capsule containing an OSDMap.
   auto wrapped_capsule = PyCapsule_New(wrapped, nullptr, nullptr);
-  assert(wrapped_capsule);
+  ceph_assert(wrapped_capsule);
 
   // Construct the python OSDMap
   auto pArgs = PyTuple_Pack(1, wrapped_capsule);
@@ -682,7 +901,7 @@ PyObject *construct_with_capsule(
     derr << "Failed to construct python OSDMap:" << dendl;
     derr << handle_pyerror() << dendl;
   }
-  assert(wrapper_instance != nullptr);
+  ceph_assert(wrapper_instance != nullptr);
   Py_DECREF(pArgs);
   Py_DECREF(wrapped_capsule);
 
@@ -694,15 +913,16 @@ PyObject *construct_with_capsule(
 
 PyObject *ActivePyModules::get_osdmap()
 {
-  PyThreadState *tstate = PyEval_SaveThread();
-  Mutex::Locker l(lock);
-  PyEval_RestoreThread(tstate);
-
   OSDMap *newmap = new OSDMap;
 
-  cluster_state.with_osdmap([&](const OSDMap& o) {
-      newmap->deepish_copy_from(o);
-    });
+  PyThreadState *tstate = PyEval_SaveThread();
+  {
+    std::lock_guard l(lock);
+    cluster_state.with_osdmap([&](const OSDMap& o) {
+        newmap->deepish_copy_from(o);
+      });
+  }
+  PyEval_RestoreThread(tstate);
 
   return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
 }
@@ -710,28 +930,216 @@ PyObject *ActivePyModules::get_osdmap()
 void ActivePyModules::set_health_checks(const std::string& module_name,
                                  health_check_map_t&& checks)
 {
-  Mutex::Locker l(lock);
+  bool changed = false;
+
+  lock.lock();
   auto p = modules.find(module_name);
   if (p != modules.end()) {
-    p->second->set_health_checks(std::move(checks));
+    changed = p->second->set_health_checks(std::move(checks));
   }
+  lock.unlock();
+
+  // immediately schedule a report to be sent to the monitors with the new
+  // health checks that have changed. This is done asynchronusly to avoid
+  // blocking python land. ActivePyModules::lock needs to be dropped to make
+  // lockdep happy:
+  //
+  //   send_report callers: DaemonServer::lock -> PyModuleRegistery::lock
+  //   active_start: PyModuleRegistry::lock -> ActivePyModules::lock
+  //
+  // if we don't release this->lock before calling schedule_tick a cycle is
+  // formed with the addition of ActivePyModules::lock -> DaemonServer::lock.
+  // This is still correct as send_report is run asynchronously under
+  // DaemonServer::lock.
+  if (changed)
+    server.schedule_tick(0);
+}
+
+int ActivePyModules::handle_command(
+  const ModuleCommand& module_command,
+  const MgrSession& session,
+  const cmdmap_t &cmdmap,
+  const bufferlist &inbuf,
+  std::stringstream *ds,
+  std::stringstream *ss)
+{
+  lock.lock();
+  auto mod_iter = modules.find(module_command.module_name);
+  if (mod_iter == modules.end()) {
+    *ss << "Module '" << module_command.module_name << "' is not available";
+    lock.unlock();
+    return -ENOENT;
+  }
+
+  lock.unlock();
+  return mod_iter->second->handle_command(module_command, session, cmdmap,
+                                          inbuf, ds, ss);
 }
 
 void ActivePyModules::get_health_checks(health_check_map_t *checks)
 {
-  Mutex::Locker l(lock);
-  for (auto& p : modules) {
-    p.second->get_health_checks(checks);
+  std::lock_guard l(lock);
+  for (auto& [name, module] : modules) {
+    dout(15) << "getting health checks for " << name << dendl;
+    module->get_health_checks(checks);
+  }
+}
+
+void ActivePyModules::update_progress_event(
+  const std::string& evid,
+  const std::string& desc,
+  float progress)
+{
+  std::lock_guard l(lock);
+  auto& pe = progress_events[evid];
+  pe.message = desc;
+  pe.progress = progress;
+}
+
+void ActivePyModules::complete_progress_event(const std::string& evid)
+{
+  std::lock_guard l(lock);
+  progress_events.erase(evid);
+}
+
+void ActivePyModules::clear_all_progress_events()
+{
+  std::lock_guard l(lock);
+  progress_events.clear();
+}
+
+void ActivePyModules::get_progress_events(std::map<std::string,ProgressEvent> *events)
+{
+  std::lock_guard l(lock);
+  *events = progress_events;
+}
+
+void ActivePyModules::config_notify()
+{
+  std::lock_guard l(lock);
+  for (auto& [name, module] : modules) {
+    // Send all python calls down a Finisher to avoid blocking
+    // C++ code, and avoid any potential lock cycles.
+    dout(15) << "notify (config) " << name << dendl;
+    // workaround for https://bugs.llvm.org/show_bug.cgi?id=35984
+    finisher.queue(new LambdaContext([module=module](int r){ 
+      module->config_notify();
+    }));
   }
 }
 
 void ActivePyModules::set_uri(const std::string& module_name,
                         const std::string &uri)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l(lock);
 
   dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;
 
-  modules[module_name]->set_uri(uri);
+  modules.at(module_name)->set_uri(uri);
+}
+
+MetricQueryID ActivePyModules::add_osd_perf_query(
+    const OSDPerfMetricQuery &query,
+    const std::optional<OSDPerfMetricLimit> &limit)
+{
+  return server.add_osd_perf_query(query, limit);
+}
+
+void ActivePyModules::remove_osd_perf_query(MetricQueryID query_id)
+{
+  int r = server.remove_osd_perf_query(query_id);
+  if (r < 0) {
+    dout(0) << "remove_osd_perf_query for query_id=" << query_id << " failed: "
+            << cpp_strerror(r) << dendl;
+  }
 }
 
+PyObject *ActivePyModules::get_osd_perf_counters(MetricQueryID query_id)
+{
+  std::map<OSDPerfMetricKey, PerformanceCounters> counters;
+
+  int r = server.get_osd_perf_counters(query_id, &counters);
+  if (r < 0) {
+    dout(0) << "get_osd_perf_counters for query_id=" << query_id << " failed: "
+            << cpp_strerror(r) << dendl;
+    Py_RETURN_NONE;
+  }
+
+  PyFormatter f;
+
+  f.open_array_section("counters");
+  for (auto &it : counters) {
+    auto &key = it.first;
+    auto  &instance_counters = it.second;
+    f.open_object_section("i");
+    f.open_array_section("k");
+    for (auto &sub_key : key) {
+      f.open_array_section("s");
+      for (size_t i = 0; i < sub_key.size(); i++) {
+        f.dump_string(stringify(i).c_str(), sub_key[i]);
+      }
+      f.close_section(); // s
+    }
+    f.close_section(); // k
+    f.open_array_section("c");
+    for (auto &c : instance_counters) {
+      f.open_array_section("p");
+      f.dump_unsigned("0", c.first);
+      f.dump_unsigned("1", c.second);
+      f.close_section(); // p
+    }
+    f.close_section(); // c
+    f.close_section(); // i
+  }
+  f.close_section(); // counters
+
+  return f.get();
+}
+
+void ActivePyModules::cluster_log(const std::string &channel, clog_type prio,
+  const std::string &message)
+{
+  std::lock_guard l(lock);
+
+  auto cl = monc.get_log_client()->create_channel(channel);
+  map<string,string> log_to_monitors;
+  map<string,string> log_to_syslog;
+  map<string,string> log_channel;
+  map<string,string> log_prio;
+  map<string,string> log_to_graylog;
+  map<string,string> log_to_graylog_host;
+  map<string,string> log_to_graylog_port;
+  uuid_d fsid;
+  string host;
+  if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
+                              log_channel, log_prio, log_to_graylog,
+                              log_to_graylog_host, log_to_graylog_port,
+                              fsid, host) == 0)
+    cl->update_config(log_to_monitors, log_to_syslog,
+                     log_channel, log_prio, log_to_graylog,
+                     log_to_graylog_host, log_to_graylog_port,
+                     fsid, host);
+  cl->do_log(prio, message);
+}
+
+void ActivePyModules::register_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  addrv.parse(addrs.data());
+
+  dout(7) << "registering msgr client handle " << addrv << dendl;
+  py_module_registry.register_client(name, std::move(addrv));
+}
+
+void ActivePyModules::unregister_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  addrv.parse(addrs.data());
+
+  dout(7) << "unregistering msgr client handle " << addrv << dendl;
+  py_module_registry.unregister_client(name, addrv);
+}