]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/Messenger.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / Messenger.h
index 5975c583d3a4ca5c6112fe173f71116dc9bd298f..2602765cf4457813dbeb6d30abf103c88c11aeaa 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 /*
  * Ceph - scalable distributed file system
@@ -7,9 +7,9 @@
  *
  * This is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
+ * License version 2.1, as published by the Free Software
  * Foundation.  See file COPYING.
- * 
+ *
  */
 
 
 #define CEPH_MESSENGER_H
 
 #include <map>
-using namespace std;
+#include <deque>
+
+#include <errno.h>
+#include <sstream>
+#include <memory>
 
 #include "Message.h"
 #include "Dispatcher.h"
-#include "common/Mutex.h"
+#include "Policy.h"
 #include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Throttle.h"
 #include "include/Context.h"
 #include "include/types.h"
 #include "include/ceph_features.h"
 #include "auth/Crypto.h"
+#include "common/item_history.h"
+#include "auth/AuthRegistry.h"
+#include "include/ceph_assert.h"
 
 #include <errno.h>
 #include <sstream>
@@ -37,19 +46,44 @@ using namespace std;
 
 class Timer;
 
+class AuthClient;
+class AuthServer;
+
+#ifdef UNIT_TESTS_BUILT
+
+struct Interceptor {
+  std::mutex lock;
+  std::condition_variable cond_var;
+
+  enum ACTION : uint32_t {
+    CONTINUE = 0,
+    FAIL,
+    STOP
+  };
+
+  virtual ~Interceptor() {}
+  virtual ACTION intercept(Connection *conn, uint32_t step) = 0;
+};
+
+#endif
 
 class Messenger {
 private:
-  list<Dispatcher*> dispatchers;
-  list <Dispatcher*> fast_dispatchers;
+  std::deque<Dispatcher*> dispatchers;
+  std::deque<Dispatcher*> fast_dispatchers;
   ZTracer::Endpoint trace_endpoint;
 
+protected:
   void set_endpoint_addr(const entity_addr_t& a,
                          const entity_name_t &name);
 
 protected:
   /// the "name" of the local daemon. eg client.99
-  entity_inst_t my_inst;
+  entity_name_t my_name;
+
+  /// my addr
+  safe_item_history<entity_addrvec_t> my_addrs;
+
   int default_send_priority;
   /// set to true once the Messenger has started, and set to false on shutdown
   bool started;
@@ -57,6 +91,13 @@ protected:
   int socket_priority;
 
 public:
+  AuthClient *auth_client = 0;
+  AuthServer *auth_server = 0;
+
+#ifdef UNIT_TESTS_BUILT
+  Interceptor *interceptor = nullptr;
+#endif
+
   /**
    * Various Messenger conditional config/type flags to allow
    * different "transport" Messengers to tune themselves
@@ -72,85 +113,19 @@ public:
   CephContext *cct;
   int crcflags;
 
-  /**
-   * A Policy describes the rules of a Connection. Is there a limit on how
-   * much data this Connection can have locally? When the underlying connection
-   * experiences an error, does the Connection disappear? Can this Messenger
-   * re-establish the underlying connection?
-   */
-  struct Policy {
-    /// If true, the Connection is tossed out on errors.
-    bool lossy;
-    /// If true, the underlying connection can't be re-established from this end.
-    bool server;
-    /// If true, we will standby when idle
-    bool standby;
-    /// If true, we will try to detect session resets
-    bool resetcheck;
-    /**
-     *  The throttler is used to limit how much data is held by Messages from
-     *  the associated Connection(s). When reading in a new Message, the Messenger
-     *  will call throttler->throttle() for the size of the new Message.
-     */
-    Throttle *throttler_bytes;
-    Throttle *throttler_messages;
-
-    /// Specify features supported locally by the endpoint.
-    uint64_t features_supported;
-    /// Specify features any remotes must have to talk to this endpoint.
-    uint64_t features_required;
-
-    Policy()
-      : lossy(false), server(false), standby(false), resetcheck(true),
-       throttler_bytes(NULL),
-       throttler_messages(NULL),
-       features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
-       features_required(0) {}
-  private:
-    Policy(bool l, bool s, bool st, bool r, uint64_t req)
-      : lossy(l), server(s), standby(st), resetcheck(r),
-       throttler_bytes(NULL),
-       throttler_messages(NULL),
-       features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
-       features_required(req) {}
-
-  public:
-    static Policy stateful_server(uint64_t req) {
-      return Policy(false, true, true, true, req);
-    }
-    static Policy stateless_server(uint64_t req) {
-      return Policy(true, true, false, false, req);
-    }
-    static Policy lossless_peer(uint64_t req) {
-      return Policy(false, false, true, false, req);
-    }
-    static Policy lossless_peer_reuse(uint64_t req) {
-      return Policy(false, false, true, true, req);
-    }
-    static Policy lossy_client(uint64_t req) {
-      return Policy(true, false, false, false, req);
-    }
-    static Policy lossless_client(uint64_t req) {
-      return Policy(false, false, false, true, req);
-    }
-  };
+  using Policy = ceph::net::Policy<Throttle>;
 
+protected:
+  // for authentication
+  AuthRegistry auth_registry;
+
+public:
   /**
    * Messenger constructor. Call this from your implementation.
    * Messenger users should construct full implementations directly,
    * or use the create() function.
    */
-  Messenger(CephContext *cct_, entity_name_t w)
-    : trace_endpoint("0.0.0.0", 0, "Messenger"),
-      my_inst(),
-      default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
-      magic(0),
-      socket_priority(-1),
-      cct(cct_),
-      crcflags(get_default_crc_flags(cct->_conf))
-  {
-    my_inst.name = w;
-  }
+  Messenger(CephContext *cct_, entity_name_t w);
   virtual ~Messenger() {}
 
   /**
@@ -193,20 +168,15 @@ public:
    * @defgroup Accessors
    * @{
    */
+  int get_mytype() const { return my_name.type(); }
+
   /**
-   * Retrieve the Messenger's instance.
+   * Retrieve the Messenger's name
    *
-   * @return A const reference to the instance this Messenger
+   * @return A const reference to the name this Messenger
    * currently believes to be its own.
    */
-  const entity_inst_t& get_myinst() { return my_inst; }
-  /**
-   * set messenger's instance
-   */
-  void set_myinst(entity_inst_t i) { my_inst = i; }
-
-  uint32_t get_magic() { return magic; }
-  void set_magic(int _magic) { magic = _magic; }
+  const entity_name_t& get_myname() { return my_name; }
 
   /**
    * Retrieve the Messenger's address.
@@ -214,14 +184,41 @@ public:
    * @return A const reference to the address this Messenger
    * currently believes to be its own.
    */
-  const entity_addr_t& get_myaddr() { return my_inst.addr; }
+  const entity_addrvec_t& get_myaddrs() {
+    return *my_addrs;
+  }
+
+  /**
+   * get legacy addr for myself, suitable for protocol v1
+   *
+   * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an
+   * ANY addr (if i am a pure client).
+   */
+  entity_addr_t get_myaddr_legacy() {
+    return my_addrs->as_legacy_addr();
+  }
+
+
+  /**
+   * set messenger's instance
+   */
+  uint32_t get_magic() { return magic; }
+  void set_magic(int _magic) { magic = _magic; }
+
+  void set_auth_client(AuthClient *ac) {
+    auth_client = ac;
+  }
+  void set_auth_server(AuthServer *as) {
+    auth_server = as;
+  }
+
 protected:
   /**
    * set messenger's address
    */
-  virtual void set_myaddr(const entity_addr_t& a) {
-    my_inst.addr = a;
-    set_endpoint_addr(a, my_inst.name);
+  virtual void set_myaddrs(const entity_addrvec_t& a) {
+    my_addrs = a;
+    set_endpoint_addr(a.front(), my_name);
   }
 public:
   /**
@@ -231,13 +228,6 @@ public:
     return &trace_endpoint;
   }
 
-  /**
-   * Retrieve the Messenger's name.
-   *
-   * @return A const reference to the name this Messenger
-   * currently believes to be its own.
-   */
-  const entity_name_t& get_myname() { return my_inst.name; }
   /**
    * Set the name of the local entity. The name is reported to others and
    * can be changed while the system is running, but doing so at incorrect
@@ -245,7 +235,8 @@ public:
    *
    * @param m The name to set.
    */
-  void set_myname(const entity_name_t& m) { my_inst.name = m; }
+  void set_myname(const entity_name_t& m) { my_name = m; }
+
   /**
    * Set the unknown address components for this Messenger.
    * This is useful if the Messenger doesn't know its full address just by
@@ -255,7 +246,7 @@ public:
    *
    * @param addr The address to use as a template.
    */
-  virtual void set_addr_unknowns(const entity_addr_t &addr) = 0;
+  virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
   /**
    * Set the address for this Messenger. This is useful if the Messenger
    * binds to a specific address but advertises a different address on the
@@ -263,7 +254,7 @@ public:
    *
    * @param addr The address to use.
    */
-  virtual void set_addr(const entity_addr_t &addr) = 0;
+  virtual void set_addrs(const entity_addrvec_t &addr) = 0;
   /// Get the default send priority.
   int get_default_send_priority() { return default_send_priority; }
   /**
@@ -277,11 +268,6 @@ public:
    * (0 if the queue is empty)
    */
   virtual double get_dispatch_queue_max_age(utime_t now) = 0;
-  /**
-   * Get the default crc flags for this messenger.
-   * but not yet dispatched.
-   */
-  static int get_default_crc_flags(md_config_t *);
 
   /**
    * @} // Accessors
@@ -357,7 +343,7 @@ public:
    * @param p The cluster protocol to use. Defined externally.
    */
   void set_default_send_priority(int p) {
-    assert(!started);
+    ceph_assert(!started);
     default_send_priority = p;
   }
   /**
@@ -388,7 +374,7 @@ public:
    *
    * @param d The Dispatcher to insert into the list.
    */
-  void add_dispatcher_head(Dispatcher *d) { 
+  void add_dispatcher_head(Dispatcher *d) {
     bool first = dispatchers.empty();
     dispatchers.push_front(d);
     if (d->ms_can_fast_dispatch_any())
@@ -403,7 +389,7 @@ public:
    *
    * @param d The Dispatcher to insert into the list.
    */
-  void add_dispatcher_tail(Dispatcher *d) { 
+  void add_dispatcher_tail(Dispatcher *d) {
     bool first = dispatchers.empty();
     dispatchers.push_back(d);
     if (d->ms_can_fast_dispatch_any())
@@ -422,6 +408,7 @@ public:
    * we can be more specific about the failure.
    */
   virtual int bind(const entity_addr_t& bind_addr) = 0;
+
   /**
    * This function performs a full restart of the Messenger component,
    * whatever that means.  Other entities who connect to this
@@ -440,6 +427,14 @@ public:
    * @return 0 on success, or -1 on error, or -errno if
    */
   virtual int client_bind(const entity_addr_t& bind_addr) = 0;
+
+  virtual int bindv(const entity_addrvec_t& addrs);
+
+
+  virtual bool should_use_msgr2() {
+    return false;
+  }
+
   /**
    * @} // Configuration
    */
@@ -495,7 +490,26 @@ public:
    *
    * @return 0 on success, or -errno on failure.
    */
-  virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
+  virtual int send_to(
+    Message *m,
+    int type,
+    const entity_addrvec_t& addr) = 0;
+  int send_to_mon(
+    Message *m, const entity_addrvec_t& addrs) {
+    return send_to(m, CEPH_ENTITY_TYPE_MON, addrs);
+  }
+  int send_to_mds(
+    Message *m, const entity_addrvec_t& addrs) {
+    return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs);
+  }
+  int send_to_osd(
+    Message *m, const entity_addrvec_t& addrs) {
+    return send_to(m, CEPH_ENTITY_TYPE_OSD, addrs);
+  }
+  int send_to_mgr(
+    Message *m, const entity_addrvec_t& addrs) {
+    return send_to(m, CEPH_ENTITY_TYPE_MGR, addrs);
+  }
 
   /**
    * @} // Messaging
@@ -512,7 +526,21 @@ public:
    *
    * @param dest The entity to get a connection for.
    */
-  virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0;
+  virtual ConnectionRef connect_to(
+    int type, const entity_addrvec_t& dest) = 0;
+  ConnectionRef connect_to_mon(const entity_addrvec_t& dest) {
+    return connect_to(CEPH_ENTITY_TYPE_MON, dest);
+  }
+  ConnectionRef connect_to_mds(const entity_addrvec_t& dest) {
+    return connect_to(CEPH_ENTITY_TYPE_MDS, dest);
+  }
+  ConnectionRef connect_to_osd(const entity_addrvec_t& dest) {
+    return connect_to(CEPH_ENTITY_TYPE_OSD, dest);
+  }
+  ConnectionRef connect_to_mgr(const entity_addrvec_t& dest) {
+    return connect_to(CEPH_ENTITY_TYPE_MGR, dest);
+  }
+
   /**
    * Get the Connection object associated with ourselves.
    */
@@ -535,6 +563,9 @@ public:
    * @param a The address to mark down.
    */
   virtual void mark_down(const entity_addr_t& a) = 0;
+  virtual void mark_down_addrs(const entity_addrvec_t& a) {
+    mark_down(a.legacy_addr());
+  }
   /**
    * Mark all the existing Connections down. This is equivalent
    * to iterating over all Connections and calling mark_down()
@@ -585,16 +616,16 @@ public:
       } else {
        blocked = true;
        int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
-       assert(r == 0);
+       ceph_assert(r == 0);
       }
     }
     ~sigpipe_stopper() {
       if (blocked) {
        struct timespec nowait{0};
        int r = sigtimedwait(&pipe_mask, 0, &nowait);
-       assert(r == EAGAIN || r == 0);
+       ceph_assert(r == EAGAIN || r == 0);
        r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
-       assert(r == 0);
+       ceph_assert(r == 0);
       }
     }
   };
@@ -613,11 +644,9 @@ public:
    *
    * @param m The Message we are testing.
    */
-  bool ms_can_fast_dispatch(const Message *m) {
-    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
-        p != fast_dispatchers.end();
-        ++p) {
-      if ((*p)->ms_can_fast_dispatch(m))
+  bool ms_can_fast_dispatch(const Message::const_ref& m) {
+    for (const auto &dispatcher : fast_dispatchers) {
+      if (dispatcher->ms_can_fast_dispatch2(m))
        return true;
     }
     return false;
@@ -626,52 +655,49 @@ public:
   /**
    * Deliver a single Message via "fast dispatch".
    *
-   * @param m The Message we are fast dispatching. We take ownership
-   * of one reference to it.
+   * @param m The Message we are fast dispatching.
    * If none of our Dispatchers can handle it, ceph_abort().
    */
-  void ms_fast_dispatch(Message *m) {
+  void ms_fast_dispatch(const Message::ref &m) {
     m->set_dispatch_stamp(ceph_clock_now());
-    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
-        p != fast_dispatchers.end();
-        ++p) {
-      if ((*p)->ms_can_fast_dispatch(m)) {
-       (*p)->ms_fast_dispatch(m);
+    for (const auto &dispatcher : fast_dispatchers) {
+      if (dispatcher->ms_can_fast_dispatch2(m)) {
+       dispatcher->ms_fast_dispatch2(m);
        return;
       }
     }
     ceph_abort();
   }
+  void ms_fast_dispatch(Message *m) {
+    return ms_fast_dispatch(Message::ref(m, false)); /* consume ref */
+  }
   /**
    *
    */
-  void ms_fast_preprocess(Message *m) {
-    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
-        p != fast_dispatchers.end();
-        ++p) {
-      (*p)->ms_fast_preprocess(m);
+  void ms_fast_preprocess(const Message::ref &m) {
+    for (const auto &dispatcher : fast_dispatchers) {
+      dispatcher->ms_fast_preprocess2(m);
     }
   }
   /**
    *  Deliver a single Message. Send it to each Dispatcher
    *  in sequence until one of them handles it.
-   *  If none of our Dispatchers can handle it, assert(0).
+   *  If none of our Dispatchers can handle it, ceph_abort().
    *
-   *  @param m The Message to deliver. We take ownership of
-   *  one reference to it.
+   *  @param m The Message to deliver.
    */
-  void ms_deliver_dispatch(Message *m) {
+  void ms_deliver_dispatch(const Message::ref &m) {
     m->set_dispatch_stamp(ceph_clock_now());
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p) {
-      if ((*p)->ms_dispatch(m))
+    for (const auto &dispatcher : dispatchers) {
+      if (dispatcher->ms_dispatch2(m))
        return;
     }
     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                         << m->get_source_inst() << dendl;
-    assert(!cct->_conf->ms_die_on_unhandled_msg);
-    m->put();
+    ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
+  }
+  void ms_deliver_dispatch(Message *m) {
+    return ms_deliver_dispatch(Message::ref(m, false)); /* consume ref */
   }
   /**
    * Notify each Dispatcher of a new Connection. Call
@@ -681,10 +707,9 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_connect(Connection *con) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p)
-      (*p)->ms_handle_connect(con);
+    for (const auto& dispatcher : dispatchers) {
+      dispatcher->ms_handle_connect(con);
+    }
   }
 
   /**
@@ -695,23 +720,21 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_fast_connect(Connection *con) {
-    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
-         p != fast_dispatchers.end();
-         ++p)
-      (*p)->ms_handle_fast_connect(con);
+    for (const auto& dispatcher : fast_dispatchers) {
+      dispatcher->ms_handle_fast_connect(con);
+    }
   }
 
   /**
-   * Notify each Dispatcher of a new incomming Connection. Call
+   * Notify each Dispatcher of a new incoming Connection. Call
    * this function whenever a new Connection is accepted.
    *
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_accept(Connection *con) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p)
-      (*p)->ms_handle_accept(con);
+    for (const auto& dispatcher : dispatchers) {
+      dispatcher->ms_handle_accept(con);
+    }
   }
 
   /**
@@ -721,10 +744,9 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_fast_accept(Connection *con) {
-    for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
-         p != fast_dispatchers.end();
-         ++p)
-      (*p)->ms_handle_fast_accept(con);
+    for (const auto& dispatcher : fast_dispatchers) {
+      dispatcher->ms_handle_fast_accept(con);
+    }
   }
 
   /**
@@ -735,10 +757,8 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_reset(Connection *con) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p) {
-      if ((*p)->ms_handle_reset(con))
+    for (const auto& dispatcher : dispatchers) {
+      if (dispatcher->ms_handle_reset(con))
        return;
     }
   }
@@ -750,10 +770,9 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_remote_reset(Connection *con) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p)
-      (*p)->ms_handle_remote_reset(con);
+    for (const auto& dispatcher : dispatchers) {
+      dispatcher->ms_handle_remote_reset(con);
+    }
   }
 
   /**
@@ -765,10 +784,8 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_refused(Connection *con) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-         p != dispatchers.end();
-         ++p) {
-      if ((*p)->ms_handle_refused(con))
+    for (const auto& dispatcher : dispatchers) {
+      if (dispatcher->ms_handle_refused(con))
         return;
     }
   }
@@ -780,12 +797,10 @@ public:
    * @param force_new True if we want to wait for new keys, false otherwise.
    * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise
    */
-  AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
+  AuthAuthorizer *ms_deliver_get_authorizer(int peer_type) {
     AuthAuthorizer *a = 0;
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p) {
-      if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
+    for (const auto& dispatcher : dispatchers) {
+      if (dispatcher->ms_get_authorizer(peer_type, &a))
        return a;
     }
     return NULL;
@@ -804,19 +819,13 @@ public:
    * @return True if we were able to prove or disprove correctness of
    * authorizer, false otherwise.
    */
-  bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
-                                   int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
-                                   bool& isvalid, CryptoKey& session_key,
-                                   std::unique_ptr<AuthAuthorizerChallenge> *challenge) {
-    for (list<Dispatcher*>::iterator p = dispatchers.begin();
-        p != dispatchers.end();
-        ++p) {
-      if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply,
-                                    isvalid, session_key, challenge))
-       return true;
-    }
-    return false;
-  }
+  bool ms_deliver_verify_authorizer(
+    Connection *con, int peer_type,
+    int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
+    bool& isvalid,
+    CryptoKey& session_key,
+    std::string *connection_secret,
+    std::unique_ptr<AuthAuthorizerChallenge> *challenge);
 
   /**
    * @} // Dispatcher Interfacing