]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/admin_socket.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / common / admin_socket.cc
index 022f35609eab370aa4b8fa51a9004625ad0ad939..d7a7ee5550c89afa72f1070a24267ff82ea8507a 100644 (file)
 #include "common/safe_io.h"
 #include "common/Thread.h"
 #include "common/version.h"
+#include "common/ceph_mutex.h"
 
+#ifndef WITH_SEASTAR
+#include "common/Cond.h"
+#endif
+
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
 
 // re-include our assert to clobber the system one; fix dout:
 #include "include/ceph_assert.h"
@@ -34,6 +43,7 @@
 
 
 using std::ostringstream;
+using namespace TOPNSPC::common;
 
 /*
  * UNIX domain sockets created by an application persist even after that
@@ -99,18 +109,18 @@ AdminSocket::~AdminSocket()
  * It only handles one connection at a time at the moment. All I/O is nonblocking,
  * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
  *
- * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
- * pipe, the thread terminates itself gracefully, allowing the
- * AdminSocketConfigObs class to join() it.
+ * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this
+ * pipe, the thread wakes up.  If m_shutdown is set, the thread terminates
+ * itself gracefully, allowing the AdminSocketConfigObs class to join() it.
  */
 
-std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
+std::string AdminSocket::create_wakeup_pipe(int *pipe_rd, int *pipe_wr)
 {
   int pipefd[2];
-  if (pipe_cloexec(pipefd) < 0) {
+  if (pipe_cloexec(pipefd, O_NONBLOCK) < 0) {
     int e = errno;
     ostringstream oss;
-    oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e);
+    oss << "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e);
     return oss.str();
   }
   
@@ -119,15 +129,15 @@ std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
   return "";
 }
 
-std::string AdminSocket::destroy_shutdown_pipe()
+std::string AdminSocket::destroy_wakeup_pipe()
 {
-  // Send a byte to the shutdown pipe that the thread is listening to
+  // Send a byte to the wakeup pipe that the thread is listening to
   char buf[1] = { 0x0 };
-  int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+  int ret = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
 
   // Close write end
-  retry_sys_call(::close, m_shutdown_wr_fd);
-  m_shutdown_wr_fd = -1;
+  retry_sys_call(::close, m_wakeup_wr_fd);
+  m_wakeup_wr_fd = -1;
 
   if (ret != 0) {
     ostringstream oss;
@@ -140,8 +150,8 @@ std::string AdminSocket::destroy_shutdown_pipe()
 
   // Close read end. Doing this before join() blocks the listenter and prevents
   // joining.
-  retry_sys_call(::close, m_shutdown_rd_fd);
-  m_shutdown_rd_fd = -1;
+  retry_sys_call(::close, m_wakeup_rd_fd);
+  m_wakeup_rd_fd = -1;
 
   return "";
 }
@@ -224,9 +234,10 @@ void AdminSocket::entry() noexcept
     memset(fds, 0, sizeof(fds));
     fds[0].fd = m_sock_fd;
     fds[0].events = POLLIN | POLLRDBAND;
-    fds[1].fd = m_shutdown_rd_fd;
+    fds[1].fd = m_wakeup_rd_fd;
     fds[1].events = POLLIN | POLLRDBAND;
 
+    ldout(m_cct,20) << __func__ << " waiting" << dendl;
     int ret = poll(fds, 2, -1);
     if (ret < 0) {
       int err = errno;
@@ -237,12 +248,24 @@ void AdminSocket::entry() noexcept
                   << cpp_strerror(err) << dendl;
       return;
     }
+    ldout(m_cct,20) << __func__ << " awake" << dendl;
 
     if (fds[0].revents & POLLIN) {
       // Send out some data
       do_accept();
     }
     if (fds[1].revents & POLLIN) {
+      // read off one byte
+      char buf;
+      auto s = ::read(m_wakeup_rd_fd, &buf, 1);
+      if (s == -1) {
+        int e = errno;
+        ldout(m_cct, 5) << "AdminSocket: (ignoring) read(2) error: '"
+                       << cpp_strerror(e) << dendl;
+      }
+      do_tell_queue();
+    }
+    if (m_shutdown) {
       // Parent wants us to shut down
       return;
     }
@@ -274,7 +297,7 @@ void AdminSocket::chmod(mode_t mode)
   }
 }
 
-bool AdminSocket::do_accept()
+void AdminSocket::do_accept()
 {
   struct sockaddr_un address;
   socklen_t address_length = sizeof(address);
@@ -285,7 +308,7 @@ bool AdminSocket::do_accept()
     int err = errno;
     lderr(m_cct) << "AdminSocket: do_accept error: '"
                           << cpp_strerror(err) << dendl;
-    return false;
+    return;
   }
   ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
 
@@ -300,7 +323,7 @@ bool AdminSocket::do_accept()
                     << cpp_strerror(ret) << dendl;
       }
       retry_sys_call(::close, connection_fd);
-      return false;
+      return;
     }
     if (cmd[0] == '\0') {
       // old protocol: __be32
@@ -334,165 +357,236 @@ bool AdminSocket::do_accept()
     if (++pos >= sizeof(cmd)) {
       lderr(m_cct) << "AdminSocket: error reading request too long" << dendl;
       retry_sys_call(::close, connection_fd);
-      return false;
+      return;
     }
   }
 
-  bool rval;
-  bufferlist out;
-  rval = execute_command(c, out);
-  if (rval) {
-    uint32_t len = htonl(out.length());
-    int ret = safe_write(connection_fd, &len, sizeof(len));
-    if (ret < 0) {
-      lderr(m_cct) << "AdminSocket: error writing response length "
-          << cpp_strerror(ret) << dendl;
-      rval = false;
-    } else {
-      if (out.write_fd(connection_fd) >= 0)
-        rval = true;
+  std::vector<std::string> cmdvec = { c };
+  bufferlist empty, out;
+  ostringstream err;
+  int rval = execute_command(cmdvec, empty /* inbl */, err, &out);
+
+  // Unfortunately, the asok wire protocol does not let us pass an error code,
+  // and many asok command implementations return helpful error strings.  So,
+  // let's prepend an error string to the output if there is an error code.
+  if (rval < 0) {
+    ostringstream ss;
+    ss << "ERROR: " << cpp_strerror(rval) << "\n";
+    ss << err.str() << "\n";
+    bufferlist o;
+    o.append(ss.str());
+    o.claim_append(out);
+    out.claim_append(o);
+  }
+  uint32_t len = htonl(out.length());
+  int ret = safe_write(connection_fd, &len, sizeof(len));
+  if (ret < 0) {
+    lderr(m_cct) << "AdminSocket: error writing response length "
+                << cpp_strerror(ret) << dendl;
+  } else {
+    if (out.write_fd(connection_fd) < 0) {
+      lderr(m_cct) << "AdminSocket: error writing response payload "
+                  << cpp_strerror(ret) << dendl;
     }
   }
-
   retry_sys_call(::close, connection_fd);
+}
+
+void AdminSocket::do_tell_queue()
+{
+  ldout(m_cct,10) << __func__ << dendl;
+  std::list<cref_t<MCommand>> q;
+  std::list<cref_t<MMonCommand>> lq;
+  {
+    std::lock_guard l(tell_lock);
+    q.swap(tell_queue);
+    lq.swap(tell_legacy_queue);
+  }
+  for (auto& m : q) {
+    bufferlist outbl;
+    execute_command(
+      m->cmd,
+      m->get_data(),
+      [m](int r, const std::string& err, bufferlist& outbl) {
+       auto reply = new MCommandReply(r, err);
+       reply->set_tid(m->get_tid());
+       reply->set_data(outbl);
+#ifdef WITH_SEASTAR
+#warning "fix message send with crimson"
+#else
+       m->get_connection()->send_message(reply);
+#endif
+      });
+  }
+  for (auto& m : lq) {
+    bufferlist outbl;
+    execute_command(
+      m->cmd,
+      m->get_data(),
+      [m](int r, const std::string& err, bufferlist& outbl) {
+       auto reply = new MMonCommandAck(m->cmd, r, err, 0);
+       reply->set_tid(m->get_tid());
+       reply->set_data(outbl);
+#ifdef WITH_SEASTAR
+#warning "fix message send with crimson"
+#else
+       m->get_connection()->send_message(reply);
+#endif
+      });
+  }
+}
+
+int AdminSocket::execute_command(
+  const std::vector<std::string>& cmd,
+  const bufferlist& inbl,
+  std::ostream& errss,
+  bufferlist *outbl)
+{
+#ifdef WITH_SEASTAR
+#warning "must implement admin socket blocking execute_command() for crimson"
+  return -ENOSYS;
+#else
+  bool done = false;
+  int rval = 0;
+  ceph::mutex mylock = ceph::make_mutex("admin_socket::excute_command::mylock");
+  ceph::condition_variable mycond;
+  C_SafeCond fin(mylock, mycond, &done, &rval);
+  execute_command(
+    cmd,
+    inbl,
+    [&errss, outbl, &fin](int r, const std::string& err, bufferlist& out) {
+      errss << err;
+      outbl->claim(out);
+      fin.finish(r);
+    });
+  {
+    std::unique_lock l{mylock};
+    mycond.wait(l, [&done] { return done;});
+  }
   return rval;
+#endif
 }
 
-int AdminSocket::execute_command(const std::string& cmd, ceph::bufferlist& out)
+void AdminSocket::execute_command(
+  const std::vector<std::string>& cmdvec,
+  const bufferlist& inbl,
+  std::function<void(int,const std::string&,bufferlist&)> on_finish)
 {
   cmdmap_t cmdmap;
   string format;
-  vector<string> cmdvec;
   stringstream errss;
-  cmdvec.push_back(cmd);
+  bufferlist empty;
+  ldout(m_cct,10) << __func__ << " cmdvec='" << cmdvec << "'" << dendl;
   if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
     ldout(m_cct, 0) << "AdminSocket: " << errss.str() << dendl;
-    return false;
+    return on_finish(-EINVAL, "invalid json", empty);
   }
-  string match;
+  string prefix;
   try {
-    cmd_getval(m_cct, cmdmap, "format", format);
-    cmd_getval(m_cct, cmdmap, "prefix", match);
+    cmd_getval(cmdmap, "format", format);
+    cmd_getval(cmdmap, "prefix", prefix);
   } catch (const bad_cmd_get& e) {
-    return false;
+    return on_finish(-EINVAL, "invalid json, missing format and/or prefix",
+                    empty);
   }
-  if (format != "json" && format != "json-pretty" &&
-      format != "xml" && format != "xml-pretty")
-    format = "json-pretty";
 
-  std::unique_lock l(lock);
-  decltype(hooks)::iterator p;
-  while (match.size()) {
-    p = hooks.find(match);
-    if (p != hooks.cend())
-      break;
-
-    // drop right-most word
-    size_t pos = match.rfind(' ');
-    if (pos == std::string::npos) {
-      match.clear();  // we fail
-      break;
-    } else {
-      match.resize(pos);
-    }
+  Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
+
+  auto [retval, hook] = find_matched_hook(prefix, cmdmap);
+  switch (retval) {
+  case ENOENT:
+    lderr(m_cct) << "AdminSocket: request '" << cmdvec
+                << "' not defined" << dendl;
+    delete f;
+    return on_finish(-EINVAL, "unknown command prefix "s + prefix, empty);
+  case EINVAL:
+    delete f;
+    return on_finish(-EINVAL, "invalid command json", empty);
+  default:
+    assert(retval == 0);
   }
 
-  if (p == hooks.cend()) {
-    lderr(m_cct) << "AdminSocket: request '" << cmd << "' not defined" << dendl;
-    return false;
-  }
-  string args;
-  if (match != cmd) {
-    args = cmd.substr(match.length() + 1);
-  }
-
-  // Drop lock to avoid cycles in cases where the hook takes
-  // the same lock that was held during calls to register/unregister,
-  // and set in_hook to allow unregister to wait for us before
-  // removing this hook.
-  in_hook = true;
-  auto match_hook = p->second.hook;
-  l.unlock();
-  bool success = (validate(match, cmdmap, out) &&
-      match_hook->call(match, cmdmap, format, out));
-  l.lock();
+  hook->call_async(
+    prefix, cmdmap, f, inbl,
+    [f, on_finish](int r, const std::string& err, bufferlist& out) {
+      // handle either existing output in bufferlist *or* via formatter
+      if (r >= 0 && out.length() == 0) {
+       f->flush(out);
+      }
+      delete f;
+      on_finish(r, err, out);
+    });
+
+  std::unique_lock l(lock);
   in_hook = false;
   in_hook_cond.notify_all();
-  if (!success) {
-    ldout(m_cct, 0) << "AdminSocket: request '" << match << "' args '" << args
-        << "' to " << match_hook << " failed" << dendl;
-    out.append("failed");
-  } else {
-    ldout(m_cct, 5) << "AdminSocket: request '" << match << "' '" << args
-        << "' to " << match_hook
-        << " returned " << out.length() << " bytes" << dendl;
-  }
-  return true;
 }
 
-
-
-bool AdminSocket::validate(const std::string& command,
-                          const cmdmap_t& cmdmap,
-                          bufferlist& out) const
+std::pair<int, AdminSocketHook*>
+AdminSocket::find_matched_hook(std::string& prefix,
+                              const cmdmap_t& cmdmap)
 {
-  stringstream os;
-  if (validate_cmd(m_cct, hooks.at(command).desc, cmdmap, os)) {
-    return true;
-  } else {
-    out.append(os);
-    return false;
+  std::unique_lock l(lock);
+  // Drop lock after done with the lookup to avoid cycles in cases where the
+  // hook takes the same lock that was held during calls to
+  // register/unregister, and set in_hook to allow unregister to wait for us
+  // before removing this hook.
+  auto [hooks_begin, hooks_end] = hooks.equal_range(prefix);
+  if (hooks_begin == hooks_end) {
+    return {ENOENT, nullptr};
   }
+  // make sure one of the registered commands with this prefix validates.
+  stringstream errss;
+  for (auto hook = hooks_begin; hook != hooks_end; ++hook) {
+    if (validate_cmd(m_cct, hook->second.desc, cmdmap, errss)) {
+      in_hook = true;
+      return {0, hook->second.hook};
+    }
+  }
+  return {EINVAL, nullptr};
+}
+
+void AdminSocket::queue_tell_command(cref_t<MCommand> m)
+{
+  ldout(m_cct,10) << __func__ << " " << *m << dendl;
+  std::lock_guard l(tell_lock);
+  tell_queue.push_back(std::move(m));
+  wakeup();
+}
+void AdminSocket::queue_tell_command(cref_t<MMonCommand> m)
+{
+  ldout(m_cct,10) << __func__ << " " << *m << dendl;
+  std::lock_guard l(tell_lock);
+  tell_legacy_queue.push_back(std::move(m));
+  wakeup();
 }
 
-int AdminSocket::register_command(std::string_view command,
-                                 std::string_view cmddesc,
+int AdminSocket::register_command(std::string_view cmddesc,
                                  AdminSocketHook *hook,
                                  std::string_view help)
 {
   int ret;
   std::unique_lock l(lock);
-  auto i = hooks.find(command);
-  if (i != hooks.cend()) {
-    ldout(m_cct, 5) << "register_command " << command << " hook " << hook
+  string prefix = cmddesc_get_prefix(cmddesc);
+  auto i = hooks.find(prefix);
+  if (i != hooks.cend() &&
+      i->second.desc == cmddesc) {
+    ldout(m_cct, 5) << "register_command " << prefix
+                   << " cmddesc " << cmddesc << " hook " << hook
                    << " EEXIST" << dendl;
     ret = -EEXIST;
   } else {
-    ldout(m_cct, 5) << "register_command " << command << " hook " << hook
+    ldout(m_cct, 5) << "register_command " << prefix << " hook " << hook
                    << dendl;
     hooks.emplace_hint(i,
                       std::piecewise_construct,
-                      std::forward_as_tuple(command),
+                      std::forward_as_tuple(prefix),
                       std::forward_as_tuple(hook, cmddesc, help));
     ret = 0;
   }
   return ret;
 }
 
-int AdminSocket::unregister_command(std::string_view command)
-{
-  int ret;
-  std::unique_lock l(lock);
-  auto i = hooks.find(command);
-  if (i != hooks.cend()) {
-    ldout(m_cct, 5) << "unregister_command " << command << dendl;
-
-    // If we are currently processing a command, wait for it to
-    // complete in case it referenced the hook that we are
-    // unregistering.
-    in_hook_cond.wait(l, [this]() { return !in_hook; });
-
-    hooks.erase(i);
-
-
-    ret = 0;
-  } else {
-    ldout(m_cct, 5) << "unregister_command " << command << " ENOENT" << dendl;
-    ret = -ENOENT;
-  }
-  return ret;
-}
-
 void AdminSocket::unregister_commands(const AdminSocketHook *hook)
 {
   std::unique_lock l(lock);
@@ -514,27 +608,25 @@ void AdminSocket::unregister_commands(const AdminSocketHook *hook)
 
 class VersionHook : public AdminSocketHook {
 public:
-  bool call(std::string_view command, const cmdmap_t& cmdmap,
-           std::string_view format, bufferlist& out) override {
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          Formatter *f,
+          std::ostream& errss,
+          bufferlist& out) override {
     if (command == "0"sv) {
       out.append(CEPH_ADMIN_SOCK_VERSION);
     } else {
-      JSONFormatter jf;
-      jf.open_object_section("version");
+      f->open_object_section("version");
       if (command == "version") {
-       jf.dump_string("version", ceph_version_to_str());
-       jf.dump_string("release", ceph_release_name(ceph_release()));
-       jf.dump_string("release_type", ceph_release_type());
+       f->dump_string("version", ceph_version_to_str());
+       f->dump_string("release", ceph_release_to_str());
+       f->dump_string("release_type", ceph_release_type());
       } else if (command == "git_version") {
-       jf.dump_string("git_version", git_version_to_str());
+       f->dump_string("git_version", git_version_to_str());
       }
       ostringstream ss;
-      jf.close_section();
-      jf.enable_line_break();
-      jf.flush(ss);
-      out.append(ss.str());
+      f->close_section();
     }
-    return true;
+    return 0;
   }
 };
 
@@ -542,21 +634,17 @@ class HelpHook : public AdminSocketHook {
   AdminSocket *m_as;
 public:
   explicit HelpHook(AdminSocket *as) : m_as(as) {}
-  bool call(std::string_view command, const cmdmap_t& cmdmap,
-           std::string_view format,
-           bufferlist& out) override {
-    std::unique_ptr<Formatter> f(Formatter::create(format, "json-pretty"sv,
-                                                  "json-pretty"sv));
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          Formatter *f,
+          std::ostream& errss,
+          bufferlist& out) override {
     f->open_object_section("help");
     for (const auto& [command, info] : m_as->hooks) {
       if (info.help.length())
        f->dump_string(command.c_str(), info.help);
     }
     f->close_section();
-    ostringstream ss;
-    f->flush(ss);
-    out.append(ss.str());
-    return true;
+    return 0;
   }
 };
 
@@ -564,30 +652,27 @@ class GetdescsHook : public AdminSocketHook {
   AdminSocket *m_as;
 public:
   explicit GetdescsHook(AdminSocket *as) : m_as(as) {}
-  bool call(std::string_view command, const cmdmap_t& cmdmap,
-           std::string_view format, bufferlist& out) override {
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+          Formatter *f,
+          std::ostream& errss,
+          bufferlist& out) override {
     int cmdnum = 0;
-    JSONFormatter jf;
-    jf.open_object_section("command_descriptions");
+    f->open_object_section("command_descriptions");
     for (const auto& [command, info] : m_as->hooks) {
       // GCC 8 actually has [[maybe_unused]] on a structured binding
       // do what you'd expect. GCC 7 does not.
       (void)command;
       ostringstream secname;
       secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
-      dump_cmd_and_help_to_json(&jf,
+      dump_cmd_and_help_to_json(f,
                                 CEPH_FEATURES_ALL,
                                secname.str().c_str(),
                                info.desc,
                                info.help);
       cmdnum++;
     }
-    jf.close_section(); // command_descriptions
-    jf.enable_line_break();
-    ostringstream ss;
-    jf.flush(ss);
-    out.append(ss.str());
-    return true;
+    f->close_section(); // command_descriptions
+    return 0;
   }
 };
 
@@ -598,7 +683,7 @@ bool AdminSocket::init(const std::string& path)
   /* Set up things for the new thread */
   std::string err;
   int pipe_rd = -1, pipe_wr = -1;
-  err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
+  err = create_wakeup_pipe(&pipe_rd, &pipe_wr);
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
     return false;
@@ -614,20 +699,20 @@ bool AdminSocket::init(const std::string& path)
 
   /* Create new thread */
   m_sock_fd = sock_fd;
-  m_shutdown_rd_fd = pipe_rd;
-  m_shutdown_wr_fd = pipe_wr;
+  m_wakeup_rd_fd = pipe_rd;
+  m_wakeup_wr_fd = pipe_wr;
   m_path = path;
 
   version_hook = std::make_unique<VersionHook>();
-  register_command("0", "0", version_hook.get(), "");
-  register_command("version", "version", version_hook.get(), "get ceph version");
-  register_command("git_version", "git_version", version_hook.get(),
+  register_command("0", version_hook.get(), "");
+  register_command("version", version_hook.get(), "get ceph version");
+  register_command("git_version", version_hook.get(),
                   "get git sha1");
   help_hook = std::make_unique<HelpHook>(this);
-  register_command("help", "help", help_hook.get(),
+  register_command("help", help_hook.get(),
                   "list available commands");
   getdescs_hook = std::make_unique<GetdescsHook>(this);
-  register_command("get_command_descriptions", "get_command_descriptions",
+  register_command("get_command_descriptions",
                   getdescs_hook.get(), "list available commands");
 
   th = make_named_thread("admin_socket", &AdminSocket::entry, this);
@@ -640,12 +725,13 @@ void AdminSocket::shutdown()
   // Under normal operation this is unlikely to occur.  However for some unit
   // tests, some object members are not initialized and so cannot be deleted
   // without fault.
-  if (m_shutdown_wr_fd < 0)
+  if (m_wakeup_wr_fd < 0)
     return;
 
   ldout(m_cct, 5) << "shutdown" << dendl;
+  m_shutdown = true;
 
-  auto err = destroy_shutdown_pipe();
+  auto err = destroy_wakeup_pipe();
   if (!err.empty()) {
     lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl;
   }
@@ -655,12 +741,20 @@ void AdminSocket::shutdown()
   unregister_commands(version_hook.get());
   version_hook.reset();
 
-  unregister_command("help");
+  unregister_commands(help_hook.get());
   help_hook.reset();
 
-  unregister_command("get_command_descriptions");
+  unregister_commands(getdescs_hook.get());
   getdescs_hook.reset();
 
   remove_cleanup_file(m_path);
   m_path.clear();
 }
+
+void AdminSocket::wakeup()
+{
+  // Send a byte to the wakeup pipe that the thread is listening to
+  char buf[1] = { 0x0 };
+  int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
+  (void)r;
+}