]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mgr/DaemonServer.cc
update sources to v12.1.2
[ceph.git] / ceph / src / mgr / DaemonServer.cc
index 6454c8da306a52b96bd631527f2b61c2dc09e5b9..a58675ed24c071b5e38ad48db43958f59fda8e7e 100644 (file)
@@ -17,6 +17,9 @@
 #include "auth/RotatingKeyRing.h"
 #include "json_spirit/json_spirit_writer.h"
 
+#include "mgr/mgr_commands.h"
+#include "mon/MonCommand.h"
+
 #include "messages/MMgrOpen.h"
 #include "messages/MMgrConfigure.h"
 #include "messages/MMonMgrReport.h"
@@ -24,6 +27,7 @@
 #include "messages/MCommandReply.h"
 #include "messages/MPGStats.h"
 #include "messages/MOSDScrub.h"
+#include "messages/MOSDForceRecovery.h"
 #include "common/errno.h"
 
 #define dout_context g_ceph_context
@@ -31,6 +35,8 @@
 #undef dout_prefix
 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
 
+
+
 DaemonServer::DaemonServer(MonClient *monc_,
                            Finisher &finisher_,
                           DaemonStateIndex &daemon_state_,
@@ -143,12 +149,18 @@ bool DaemonServer::ms_verify_authorizer(Connection *con,
   s->inst.addr = con->get_peer_addr();
   AuthCapsInfo caps_info;
 
-  is_valid = handler->verify_authorizer(
-    cct, monc->rotating_secrets.get(),
-    authorizer_data,
-    authorizer_reply, s->entity_name,
-    s->global_id, caps_info,
-    session_key);
+  RotatingKeyRing *keys = monc->rotating_secrets.get();
+  if (keys) {
+    is_valid = handler->verify_authorizer(
+      cct, keys,
+      authorizer_data,
+      authorizer_reply, s->entity_name,
+      s->global_id, caps_info,
+      session_key);
+  } else {
+    dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
+    is_valid = false;
+  }
 
   if (is_valid) {
     if (caps_info.allow_all) {
@@ -307,16 +319,18 @@ bool DaemonServer::handle_open(MMgrOpen *m)
   configure->stats_period = g_conf->mgr_stats_period;
   m->get_connection()->send_message(configure);
 
+  DaemonStatePtr daemon;
   if (daemon_state.exists(key)) {
+    daemon = daemon_state.get(key);
+  }
+  if (daemon) {
     dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
+    Mutex::Locker l(daemon->lock);
     daemon_state.get(key)->perf_counters.clear();
   }
 
   if (m->service_daemon) {
-    DaemonStatePtr daemon;
-    if (daemon_state.exists(key)) {
-      daemon = daemon_state.get(key);
-    } else {
+    if (!daemon) {
       dout(4) << "constructing new DaemonState for " << key << dendl;
       daemon = std::make_shared<DaemonState>(daemon_state.types);
       daemon->key = key;
@@ -325,6 +339,7 @@ bool DaemonServer::handle_open(MMgrOpen *m)
       }
       daemon_state.insert(daemon);
     }
+    Mutex::Locker l(daemon->lock);
     daemon->service_daemon = true;
     daemon->metadata = m->daemon_metadata;
     daemon->service_status = m->daemon_status;
@@ -385,7 +400,16 @@ bool DaemonServer::handle_report(MMgrReport *m)
   }
   assert(daemon != nullptr);
   auto &daemon_counters = daemon->perf_counters;
-  daemon_counters.update(m);
+  {
+    Mutex::Locker l(daemon->lock);
+    daemon_counters.update(m);
+  }
+  // if there are any schema updates, notify the python modules
+  if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
+    ostringstream oss;
+    oss << key.first << '.' << key.second;
+    py_modules.notify_all("perf_schema_update", oss.str());
+  }
 
   if (daemon->service_daemon) {
     utime_t now = ceph_clock_now();
@@ -402,24 +426,6 @@ bool DaemonServer::handle_report(MMgrReport *m)
   return true;
 }
 
-struct MgrCommand {
-  string cmdstring;
-  string helpstring;
-  string module;
-  string perm;
-  string availability;
-
-  bool requires_perm(char p) const {
-    return (perm.find(p) != string::npos);
-  }
-
-} mgr_commands[] = {
-
-#define COMMAND(parsesig, helptext, module, perm, availability) \
-  {parsesig, helptext, module, perm, availability},
-#include "MgrCommands.h"
-#undef COMMAND
-};
 
 void DaemonServer::_generate_command_map(
   map<string,cmd_vartype>& cmdmap,
@@ -444,16 +450,14 @@ void DaemonServer::_generate_command_map(
   }
 }
 
-const MgrCommand *DaemonServer::_get_mgrcommand(
+const MonCommand *DaemonServer::_get_mgrcommand(
   const string &cmd_prefix,
-  MgrCommand *cmds,
-  int cmds_size)
+  const std::vector<MonCommand> &cmds)
 {
-  MgrCommand *this_cmd = NULL;
-  for (MgrCommand *cp = cmds;
-       cp < &cmds[cmds_size]; cp++) {
-    if (cp->cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
-      this_cmd = cp;
+  const MonCommand *this_cmd = nullptr;
+  for (const auto &cmd : cmds) {
+    if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
+      this_cmd = &cmd;
       break;
     }
   }
@@ -466,7 +470,7 @@ bool DaemonServer::_allowed_command(
   const string &prefix,
   const map<string,cmd_vartype>& cmdmap,
   const map<string,string>& param_str_map,
-  const MgrCommand *this_cmd) {
+  const MonCommand *this_cmd) {
 
   if (s->entity_name.is_mon()) {
     // mon is all-powerful.  even when it is forwarding commands on behalf of
@@ -593,30 +597,29 @@ bool DaemonServer::handle_command(MCommand *m)
   dout(4) << "prefix=" << prefix << dendl;
 
   if (prefix == "get_command_descriptions") {
-    int cmdnum = 0;
-
     dout(10) << "reading commands from python modules" << dendl;
-    auto py_commands = py_modules.get_commands();
+    const auto py_commands = py_modules.get_commands();
 
+    int cmdnum = 0;
     JSONFormatter f;
     f.open_object_section("command_descriptions");
-    for (const auto &pyc : py_commands) {
+
+    auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
       ostringstream secname;
       secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
-      dout(20) << "Dumping " << pyc.cmdstring << " (" << pyc.helpstring
-               << ")" << dendl;
-      dump_cmddesc_to_json(&f, secname.str(), pyc.cmdstring, pyc.helpstring,
-                          "mgr", pyc.perm, "cli", 0);
+      dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
+                           mc.module, mc.req_perms, mc.availability, 0);
       cmdnum++;
+    };
+
+    for (const auto &pyc : py_commands) {
+      dump_cmd(pyc);
     }
 
-    for (const auto &cp : mgr_commands) {
-      ostringstream secname;
-      secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
-      dump_cmddesc_to_json(&f, secname.str(), cp.cmdstring, cp.helpstring,
-                          cp.module, cp.perm, cp.availability, 0);
-      cmdnum++;
+    for (const auto &mgr_cmd : mgr_commands) {
+      dump_cmd(mgr_cmd);
     }
+
     f.close_section(); // command_descriptions
     f.flush(cmdctx->odata);
     cmdctx->reply(0, ss);
@@ -624,11 +627,10 @@ bool DaemonServer::handle_command(MCommand *m)
   }
 
   // lookup command
-  const MgrCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands,
-                                              ARRAY_SIZE(mgr_commands));
+  const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
   _generate_command_map(cmdctx->cmdmap, param_str_map);
   if (!mgr_cmd) {
-    MgrCommand py_command = {"", "", "py", "rw", "cli"};
+    MonCommand py_command = {"", "", "py", "rw", "cli"};
     if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
                           param_str_map, &py_command)) {
       dout(1) << " access denied" << dendl;
@@ -685,6 +687,7 @@ bool DaemonServer::handle_command(MCommand *m)
        DaemonKey key(p.first, q.first);
        assert(daemon_state.exists(key));
        auto daemon = daemon_state.get(key);
+       Mutex::Locker l(daemon->lock);
        f->dump_stream("status_stamp") << daemon->service_status_stamp;
        f->dump_stream("last_beacon") << daemon->last_service_beacon;
        f->open_object_section("status");
@@ -921,6 +924,148 @@ bool DaemonServer::handle_command(MCommand *m)
       });
     cmdctx->reply(r, "");
     return true;
+  } else if (prefix == "pg force-recovery" ||
+              prefix == "pg force-backfill" ||
+              prefix == "pg cancel-force-recovery" ||
+              prefix == "pg cancel-force-backfill") {
+    string forceop = prefix.substr(3, string::npos);
+    list<pg_t> parsed_pgs;
+    map<int, list<pg_t> > osdpgs;
+
+    // figure out actual op just once
+    int actual_op = 0;
+    if (forceop == "force-recovery") {
+      actual_op = OFR_RECOVERY;
+    } else if (forceop == "force-backfill") {
+      actual_op = OFR_BACKFILL;
+    } else if (forceop == "cancel-force-backfill") {
+      actual_op = OFR_BACKFILL | OFR_CANCEL;
+    } else if (forceop == "cancel-force-recovery") {
+      actual_op = OFR_RECOVERY | OFR_CANCEL;
+    }
+
+    // covnert pg names to pgs, discard any invalid ones while at it
+    {
+      // we don't want to keep pgidstr and pgidstr_nodup forever
+      vector<string> pgidstr;
+      // get pgids to process and prune duplicates
+      cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
+      set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
+      if (pgidstr.size() != pgidstr_nodup.size()) {
+       // move elements only when there were duplicates, as this
+       // reorders them
+       pgidstr.resize(pgidstr_nodup.size());
+       auto it = pgidstr_nodup.begin();
+       for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
+         pgidstr[i] = std::move(*it++);
+       }
+      }
+
+      cluster_state.with_pgmap([&](const PGMap& pg_map) {
+       for (auto& pstr : pgidstr) {
+         pg_t parsed_pg;
+         if (!parsed_pg.parse(pstr.c_str())) {
+           ss << "invalid pgid '" << pstr << "'; ";
+           r = -EINVAL;
+         } else {
+           auto workit = pg_map.pg_stat.find(parsed_pg);
+           if (workit == pg_map.pg_stat.end()) {
+             ss << "pg " << pstr << " not exists; ";
+             r = -ENOENT;
+           } else {
+             pg_stat_t workpg = workit->second;
+
+             // discard pgs for which user requests are pointless
+             switch (actual_op)
+             {
+               case OFR_RECOVERY:
+                 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
+                   // don't return error, user script may be racing with cluster. not fatal.
+                   ss << "pg " << pstr << " doesn't require recovery; ";
+                   continue;
+                 } else  if (workpg.state & PG_STATE_FORCED_RECOVERY) {
+                   ss << "pg " << pstr << " recovery already forced; ";
+                   // return error, as it may be a bug in user script
+                   r = -EINVAL;
+                   continue;
+                 }
+                 break;
+               case OFR_BACKFILL:
+                 if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILL)) == 0) {
+                   ss << "pg " << pstr << " doesn't require backfilling; ";
+                   continue;
+                 } else  if (workpg.state & PG_STATE_FORCED_BACKFILL) {
+                   ss << "pg " << pstr << " backfill already forced; ";
+                   r = -EINVAL;
+                   continue;
+                 }
+                 break;
+               case OFR_BACKFILL | OFR_CANCEL:
+                 if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
+                   ss << "pg " << pstr << " backfill not forced; ";
+                   continue;
+                 }
+                 break;
+               case OFR_RECOVERY | OFR_CANCEL:
+                 if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
+                   ss << "pg " << pstr << " recovery not forced; ";
+                   continue;
+                 }
+                 break;
+               default:
+                 assert(0 == "actual_op value is not supported");
+             }
+
+             parsed_pgs.push_back(std::move(parsed_pg));
+           }
+         }
+       }
+
+       // group pgs to process by osd
+       for (auto& pgid : parsed_pgs) {
+         auto workit = pg_map.pg_stat.find(pgid);
+         if (workit != pg_map.pg_stat.end()) {
+           pg_stat_t workpg = workit->second;
+           set<int32_t> osds(workpg.up.begin(), workpg.up.end());
+           osds.insert(workpg.acting.begin(), workpg.acting.end());
+           for (auto i : osds) {
+             osdpgs[i].push_back(pgid);
+           }
+         }
+       }
+
+      });
+    }
+
+    // respond with error only when no pgs are correct
+    // yes, in case of mixed errors, only the last one will be emitted,
+    // but the message presented will be fine
+    if (parsed_pgs.size() != 0) {
+      // clear error to not confuse users/scripts
+      r = 0;
+    }
+
+    // optimize the command -> messages conversion, use only one message per distinct OSD
+    cluster_state.with_osdmap([&](const OSDMap& osdmap) {
+      for (auto& i : osdpgs) {
+       if (osdmap.is_up(i.first)) {
+         vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
+         auto p = osd_cons.find(i.first);
+         if (p == osd_cons.end()) {
+           ss << "osd." << i.first << " is not currently connected";
+           r = -EAGAIN;
+           continue;
+         }
+         for (auto& con : p->second) {
+           con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
+         }
+         ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
+       }
+      }
+    });
+    ss << std::endl;
+    cmdctx->reply(r, ss);
+    return true;
   } else {
     r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
        return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
@@ -937,7 +1082,7 @@ bool DaemonServer::handle_command(MCommand *m)
 
   // None of the special native commands, 
   MgrPyModule *handler = nullptr;
-  auto py_commands = py_modules.get_commands();
+  auto py_commands = py_modules.get_py_commands();
   for (const auto &pyc : py_commands) {
     auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
     dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
@@ -983,6 +1128,7 @@ void DaemonServer::_prune_pending_service_map()
        continue;
       }
       auto daemon = daemon_state.get(key);
+      Mutex::Locker l(daemon->lock);
       if (daemon->last_service_beacon == utime_t()) {
        // we must have just restarted; assume they are alive now.
        daemon->last_service_beacon = ceph_clock_now();
@@ -1023,6 +1169,8 @@ void DaemonServer::send_report()
   }
 
   auto m = new MMonMgrReport();
+  py_modules.get_health_checks(&m->health_checks);
+
   cluster_state.with_pgmap([&](const PGMap& pg_map) {
       cluster_state.update_delta_stats();
 
@@ -1045,6 +1193,7 @@ void DaemonServer::send_report()
 
          pg_map.get_health_checks(g_ceph_context, osdmap,
                                   &m->health_checks);
+
          dout(10) << m->health_checks.checks.size() << " health checks"
                   << dendl;
          dout(20) << "health checks:\n";