]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mon/MonClient.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mon / MonClient.h
index cd5394f5e285ce84bd06ebabd5cf18ba9a670ee6..cb16965450bf27f237828fccb99bf4da778ad309 100644 (file)
 #include "MonMap.h"
 #include "MonSub.h"
 
+#include "common/async/completion.h"
 #include "common/Timer.h"
-#include "common/Finisher.h"
 #include "common/config.h"
+#include "messages/MMonGetVersion.h"
 
 #include "auth/AuthClient.h"
 #include "auth/AuthServer.h"
 class MMonMap;
 class MConfig;
 class MMonGetVersionReply;
-struct MMonSubscribeAck;
 class MMonCommandAck;
-struct MAuthReply;
 class LogClient;
-class AuthAuthorizer;
 class AuthClientHandler;
 class AuthRegistry;
 class KeyRing;
@@ -141,7 +139,6 @@ private:
 
 struct MonClientPinger : public Dispatcher,
                         public AuthClient {
-
   ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
   ceph::condition_variable ping_recvd_cond;
   std::string *result;
@@ -240,11 +237,49 @@ struct MonClientPinger : public Dispatcher,
   }
 };
 
+const boost::system::error_category& monc_category() noexcept;
+
+enum class monc_errc {
+  shutting_down = 1, // Command failed due to MonClient shutting down
+  session_reset, // Monitor session was reset
+  rank_dne, // Requested monitor rank does not exist
+  mon_dne, // Requested monitor does not exist
+  timed_out, // Monitor operation timed out
+  mon_unavailable // Monitor unavailable
+};
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::monc_errc> {
+  static const bool value = true;
+};
+}
+
+//  implicit conversion:
+inline boost::system::error_code make_error_code(monc_errc e) noexcept {
+  return { static_cast<int>(e), monc_category() };
+}
+
+// explicit conversion:
+inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
+  return { static_cast<int>(e), monc_category() };
+}
+
+const boost::system::error_category& monc_category() noexcept;
 
 class MonClient : public Dispatcher,
                  public AuthClient,
                  public AuthServer /* for mgr, osd, mds */ {
+  static constexpr auto dout_subsys = ceph_subsys_monc;
 public:
+  // Error, Newest, Oldest
+  using VersionSig = void(boost::system::error_code, version_t, version_t);
+  using VersionCompletion = ceph::async::Completion<VersionSig>;
+
+  using CommandSig = void(boost::system::error_code, std::string,
+                         ceph::buffer::list);
+  using CommandCompletion = ceph::async::Completion<CommandSig>;
+
   MonMap monmap;
   std::map<std::string,std::string> config_mgr;
 
@@ -259,7 +294,8 @@ private:
 
   mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
   SafeTimer timer;
-  Finisher finisher;
+  boost::asio::io_context& service;
+  boost::asio::io_context::strand finish_strand{service};
 
   bool initialized;
   bool stopping = false;
@@ -290,7 +326,9 @@ private:
   bool want_monmap;
   ceph::condition_variable map_cond;
   bool passthrough_monmap = false;
-  bool got_config = false;
+
+  bool want_bootstrap_config = false;
+  ceph::ref_t<MConfig> bootstrap_config;
 
   // authenticate
   std::unique_ptr<AuthClientHandler> auth;
@@ -307,14 +345,13 @@ private:
   double reopen_interval_multiplier;
 
   Dispatcher *handle_authentication_dispatcher = nullptr;
-  
   bool _opened() const;
   bool _hunting() const;
   void _start_hunting();
   void _finish_hunting(int auth_err);
   void _finish_auth(int auth_err);
   void _reopen_session(int rank = -1);
-  MonConnection& _add_conn(unsigned rank);
+  void _add_conn(unsigned rank);
   void _add_conns();
   void _un_backoff();
   void _send_mon_message(MessageRef m);
@@ -364,7 +401,7 @@ public:
     bool more,
     uint32_t auth_method,
     const ceph::buffer::list& bl,
-    ceph::buffer::list *reply);
+    ceph::buffer::list *reply) override;
 
   void set_entity_name(EntityName name) { entity_name = name; }
   void set_handle_authentication_dispatcher(Dispatcher *d) {
@@ -413,12 +450,12 @@ public:
     std::lock_guard l(monc_lock);
     return sub.inc_want(what, start, flags);
   }
-  
+
   std::unique_ptr<KeyRing> keyring;
   std::unique_ptr<RotatingKeyRing> rotating_secrets;
 
  public:
-  explicit MonClient(CephContext *cct_);
+  MonClient(CephContext *cct_, boost::asio::io_context& service);
   MonClient(const MonClient &) = delete;
   MonClient& operator=(const MonClient &) = delete;
   ~MonClient() override;
@@ -517,66 +554,207 @@ private:
   struct MonCommand {
     // for tell only
     std::string target_name;
-    int target_rank;
+    int target_rank = -1;
     ConnectionRef target_con;
     std::unique_ptr<MonConnection> target_session;
     unsigned send_attempts = 0;  ///< attempt count for legacy mons
     utime_t last_send_attempt;
-
     uint64_t tid;
     std::vector<std::string> cmd;
     ceph::buffer::list inbl;
-    ceph::buffer::list *poutbl;
-    std::string *prs;
-    int *prval;
-    Context *onfinish, *ontimeout;
-
-    explicit MonCommand(uint64_t t)
-      : target_rank(-1),
-       tid(t),
-       poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
-    {}
+    std::unique_ptr<CommandCompletion> onfinish;
+    std::optional<boost::asio::steady_timer> cancel_timer;
+
+    MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+      : tid(t), onfinish(std::move(onfinish)) {
+      auto timeout =
+          monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+      if (timeout.count() > 0) {
+       cancel_timer.emplace(monc.service, timeout);
+       cancel_timer->async_wait(
+          [this, &monc](boost::system::error_code ec) {
+           if (ec)
+             return;
+           std::scoped_lock l(monc.monc_lock);
+           monc._cancel_mon_command(tid);
+         });
+      }
+    }
 
     bool is_tell() const {
       return target_name.size() || target_rank >= 0;
     }
   };
+  friend MonCommand;
   std::map<uint64_t,MonCommand*> mon_commands;
 
   void _send_command(MonCommand *r);
   void _check_tell_commands();
   void _resend_mon_commands();
   int _cancel_mon_command(uint64_t tid);
-  void _finish_command(MonCommand *r, int ret, std::string rs);
+  void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
+                      bufferlist&& bl);
   void _finish_auth();
   void handle_mon_command_ack(MMonCommandAck *ack);
   void handle_command_reply(MCommandReply *reply);
 
 public:
-  void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                       ceph::buffer::list *outbl, std::string *outs,
-                       Context *onfinish);
+  template<typename CompletionToken>
+  auto start_mon_command(const std::vector<std::string>& cmd,
+                         const ceph::buffer::list& inbl,
+                        CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
+                        const ceph::buffer::list& inbl, CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       r->target_rank = mon_rank;
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  template<typename CompletionToken>
+  auto start_mon_command(const std::string& mon_name,
+                         const std::vector<std::string>& cmd,
+                        const ceph::buffer::list& inbl,
+                        CompletionToken&& token) {
+    ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+    boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto h = CommandCompletion::create(service.get_executor(),
+                                        std::move(init.completion_handler));
+      if (!initialized || stopping) {
+       ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+                         bufferlist{});
+      } else {
+       auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+       // detect/tolerate mon *rank* passed as a string
+       std::string err;
+       int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+       if (err.size() == 0 && rank >= 0) {
+         ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+                       << "' as rank " << rank << dendl;
+         r->target_rank = rank;
+       } else {
+         r->target_name = mon_name;
+       }
+       r->cmd = cmd;
+       r->inbl = inbl;
+       mon_commands.emplace(r->tid, r);
+       _send_command(r);
+      }
+    }
+    return init.result.get();
+  }
+
+  class ContextVerter {
+    std::string* outs;
+    ceph::bufferlist* outbl;
+    Context* onfinish;
+
+  public:
+    ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
+      : outs(outs), outbl(outbl), onfinish(onfinish) {}
+    ~ContextVerter() = default;
+    ContextVerter(const ContextVerter&) = default;
+    ContextVerter& operator =(const ContextVerter&) = default;
+    ContextVerter(ContextVerter&&) = default;
+    ContextVerter& operator =(ContextVerter&&) = default;
+
+    void operator()(boost::system::error_code e,
+                   std::string s,
+                   ceph::bufferlist bl) {
+      if (outs)
+       *outs = std::move(s);
+      if (outbl)
+       *outbl = std::move(bl);
+      if (onfinish)
+       onfinish->complete(ceph::from_error_code(e));
+    }
+  };
+
+  void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
   void start_mon_command(int mon_rank,
-                         const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                         ceph::buffer::list *outbl, std::string *outs,
-                         Context *onfinish);
-  void start_mon_command(const std::string &mon_name,  ///< mon name, with mon. prefix
-                         const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
-                         ceph::buffer::list *outbl, std::string *outs,
-                         Context *onfinish);
+                        const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
+  void start_mon_command(const string &mon_name,  ///< mon name, with mon. prefix
+                        const vector<string>& cmd, const bufferlist& inbl,
+                        bufferlist *outbl, string *outs,
+                        Context *onfinish) {
+    start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+  }
+
 
   // version requests
 public:
   /**
    * get latest known version(s) of cluster map
    *
-   * @param map std::string name of map (e.g., 'osdmap')
-   * @param newest pointer where newest map version will be stored
-   * @param oldest pointer where oldest map version will be stored
-   * @param onfinish context that will be triggered on completion
-   * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
+   * @param map string name of map (e.g., 'osdmap')
+   * @param token context that will be triggered on completion
+   * @return (via Completion) {} on success,
+   *         boost::system::errc::resource_unavailable_try_again if we need to
+   *         resubmit our request
    */
-  void get_version(std::string map, version_t *newest, version_t *oldest, Context *onfinish);
+  template<typename CompletionToken>
+  auto get_version(std::string&& map, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, VersionSig> init(token);
+    {
+      std::scoped_lock l(monc_lock);
+      auto m = ceph::make_message<MMonGetVersion>();
+      m->what = std::move(map);
+      m->handle = ++version_req_id;
+      version_requests.emplace(m->handle,
+                              VersionCompletion::create(
+                                service.get_executor(),
+                                std::move(init.completion_handler)));
+      _send_mon_message(m);
+    }
+    return init.result.get();
+  }
+
   /**
    * Run a callback within our lock, with a reference
    * to the MonMap
@@ -595,16 +773,10 @@ public:
   md_config_t::config_callback get_config_callback();
 
 private:
-  struct version_req_d {
-    Context *context;
-    version_t *newest, *oldest;
-    version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
-  };
 
-  std::map<ceph_tid_t, version_req_d*> version_requests;
+  std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
   ceph_tid_t version_req_id;
   void handle_get_version_reply(MMonGetVersionReply* m);
-
   md_config_t::config_callback config_cb;
   std::function<void(void)> config_notify_cb;
 };