]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/msgr/test_msgr.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / msgr / test_msgr.cc
index 6c56600787e16137530ae0c34462983ff6618709..cbeee9bb1ac49b521f96b99e86a4dabb1d191596 100644 (file)
@@ -1,4 +1,4 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 /*
  * Ceph - scalable distributed file system
@@ -19,6 +19,8 @@
 #include <unistd.h>
 #include <stdlib.h>
 #include <time.h>
+#include <set>
+#include <list>
 #include "common/Mutex.h"
 #include "common/Cond.h"
 #include "common/ceph_argparse.h"
@@ -39,7 +41,9 @@
 typedef boost::mt11213b gen_type;
 
 #include "common/dout.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
+
+#include "auth/DummyAuth.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -59,16 +63,24 @@ typedef boost::mt11213b gen_type;
 
 class MessengerTest : public ::testing::TestWithParam<const char*> {
  public:
+  DummyAuthClientServer dummy_auth;
   Messenger *server_msgr;
   Messenger *client_msgr;
 
-  MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
+  MessengerTest() : dummy_auth(g_ceph_context),
+                   server_msgr(NULL), client_msgr(NULL) {
+    dummy_auth.auth_registry.refresh_config();
+  }
   void SetUp() override {
     lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
     server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
     client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
     server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
     client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
+    server_msgr->set_auth_client(&dummy_auth);
+    server_msgr->set_auth_server(&dummy_auth);
+    client_msgr->set_auth_client(&dummy_auth);
+    client_msgr->set_auth_server(&dummy_auth);
   }
   void TearDown() override {
     ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
@@ -98,10 +110,14 @@ class FakeDispatcher : public Dispatcher {
   bool got_remote_reset;
   bool got_connect;
   bool loopback;
+  entity_addrvec_t last_accept;
 
   explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
                           is_server(s), got_new(false), got_remote_reset(false),
-                          got_connect(false), loopback(false) {}
+                          got_connect(false), loopback(false) {
+    // don't need authorizers
+    ms_set_require_authorizer(false);
+  }
   bool ms_can_fast_dispatch_any() const override { return true; }
   bool ms_can_fast_dispatch(const Message *m) const override {
     switch (m->get_type()) {
@@ -115,32 +131,31 @@ class FakeDispatcher : public Dispatcher {
   void ms_handle_fast_connect(Connection *con) override {
     lock.Lock();
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto s = con->get_priv();
     if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
-      lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
+      auto session = new Session(con);
+      con->set_priv(RefCountedPtr{session, false});
+      lderr(g_ceph_context) << __func__ << " con: " << con
+                           << " count: " << session->count << dendl;
     }
-    s->put();
     got_connect = true;
     cond.Signal();
     lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
+    last_accept = con->get_peer_addrs();
+    if (!con->get_priv()) {
+      con->set_priv(RefCountedPtr{new Session(con), false});
     }
-    s->put();
   }
   bool ms_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
@@ -155,22 +170,20 @@ class FakeDispatcher : public Dispatcher {
   bool ms_handle_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     got_remote_reset = true;
     cond.Signal();
@@ -179,21 +192,22 @@ class FakeDispatcher : public Dispatcher {
     return false;
   }
   void ms_fast_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
       if (loopback)
-        assert(m->get_source().is_osd());
+        ceph_assert(m->get_source().is_osd());
       else
         reply_message(m);
     } else if (loopback) {
-      assert(m->get_source().is_client());
+      ceph_assert(m->get_source().is_client());
     }
     m->put();
     Mutex::Locker l(lock);
@@ -201,12 +215,8 @@ class FakeDispatcher : public Dispatcher {
     cond.Signal();
   }
 
-  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
-                            bufferlist& authorizer, bufferlist& authorizer_reply,
-                            bool& isvalid, CryptoKey& session_key,
-                           std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
-    isvalid = true;
-    return true;
+  int ms_handle_authentication(Connection *con) override {
+    return 1;
   }
 
   void reply_message(Message *m) {
@@ -217,10 +227,600 @@ class FakeDispatcher : public Dispatcher {
 
 typedef FakeDispatcher::Session Session;
 
+struct TestInterceptor : public Interceptor {
+
+  bool step_waiting = false;
+  bool waiting = true;
+  std::map<Connection *, uint32_t> current_step;
+  std::map<Connection *, std::list<uint32_t>> step_history;
+  std::map<uint32_t, std::optional<ACTION>> decisions;
+  std::set<uint32_t> breakpoints;
+
+  uint32_t count_step(Connection *conn, uint32_t step) {
+    uint32_t count = 0;
+    for (auto s : step_history[conn]) {
+      if (s == step) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  void breakpoint(uint32_t step) {
+    breakpoints.insert(step);
+  }
+
+  void remove_bp(uint32_t step) {
+    breakpoints.erase(step);
+  }
+
+  Connection *wait(uint32_t step, Connection *conn=nullptr) {
+    std::unique_lock<std::mutex> l(lock);
+    while(true) {
+      if (conn) {
+        auto it = current_step.find(conn);
+        if (it != current_step.end()) {
+          if (it->second == step) {
+            break;
+          }
+        }
+      } else {
+        for (auto it : current_step) {
+          if (it.second == step) {
+            conn = it.first;
+            break; 
+          }
+        }
+        if (conn) {
+          break;
+        }
+      }
+      step_waiting = true;
+      cond_var.wait(l);
+    }
+    step_waiting = false;
+    return conn;
+  }
+
+  ACTION wait_for_decision(uint32_t step, std::unique_lock<std::mutex> &l) {
+    if (decisions[step]) {
+      return *(decisions[step]);
+    }
+    waiting = true;
+    while(waiting) {
+      cond_var.wait(l);
+    }
+    return *(decisions[step]);
+  }
+
+  void proceed(uint32_t step, ACTION decision) {
+    std::unique_lock<std::mutex> l(lock);
+    decisions[step] = decision;
+    if (waiting) {
+      waiting = false;
+      cond_var.notify_one();
+    }
+  }
+
+  ACTION intercept(Connection *conn, uint32_t step) override {
+    lderr(g_ceph_context) << __func__ << " conn(" << conn 
+                          << ") intercept called on step=" << step << dendl;
+
+    {
+      std::unique_lock<std::mutex> l(lock);
+      step_history[conn].push_back(step);
+      current_step[conn] = step;  
+      if (step_waiting) {
+        cond_var.notify_one();
+      }
+    }
+
+    std::unique_lock<std::mutex> l(lock);
+    ACTION decision = ACTION::CONTINUE;
+    if (breakpoints.find(step) != breakpoints.end()) {
+      lderr(g_ceph_context) << __func__ << " conn(" << conn 
+                            << ") pausing on step=" << step << dendl;
+      decision = wait_for_decision(step, l);
+    } else {
+      if (decisions[step]) {
+        decision = *(decisions[step]);
+      }
+    }
+    lderr(g_ceph_context) << __func__ << " conn(" << conn 
+                          << ") resuming step=" << step << " with decision=" 
+                          << decision << dendl;
+    decisions[step].reset();
+    return decision;
+  }
+
+};
+
+/**
+ * Scenario: A connects to B, and B connects to A at the same time.
+ */ 
+TEST_P(MessengerTest, ConnectionRaceTest) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer_reuse(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer_reuse(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  cli_interceptor->breakpoint(11);
+  // pause before sending client_ident message
+  srv_interceptor->breakpoint(11);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  ConnectionRef s2c = server_msgr->connect_to(client_msgr->get_mytype(),
+                                                                           client_msgr->get_myaddrs());
+  MPing *m2 = new MPing();
+  ASSERT_EQ(s2c->send_message(m2), 0);
+
+  cli_interceptor->wait(11, c2s.get());
+  srv_interceptor->wait(11, s2c.get());
+
+  // at this point both connections (A->B, B->A) are paused just before sending
+  // the client_ident message.
+
+  cli_interceptor->remove_bp(11);
+  srv_interceptor->remove_bp(11);
+
+  cli_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+  srv_interceptor->proceed(11, Interceptor::ACTION::CONTINUE);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  {
+    Mutex::Locker l(srv_dispatcher.lock);
+    while (!srv_dispatcher.got_new)
+      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    srv_dispatcher.got_new = false;
+  }
+  
+  ASSERT_TRUE(s2c->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(s2c->get_priv().get())->get_count());
+  ASSERT_TRUE(s2c->peer_is_client());
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A sends client_ident to B
+ *    - B fails before sending server_ident to A
+ *    - A reconnects
+ */ 
+TEST_P(MessengerTest, MissingServerIdenTest) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossy_client(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  srv_interceptor->breakpoint(12);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  Connection *c2s_accepter = srv_interceptor->wait(12);
+  srv_interceptor->remove_bp(12);
+
+  // We inject a message from this side of the connection to force it to be
+  // in standby when we inject the failure below
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+  srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+  {
+    Mutex::Locker l(srv_dispatcher.lock);
+    while (!srv_dispatcher.got_new)
+      srv_dispatcher.cond.Wait(srv_dispatcher.lock);
+    srv_dispatcher.got_new = false;
+  }
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  ASSERT_TRUE(c2s_accepter->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A sends client_ident to B
+ *    - B fails before sending server_ident to A
+ *    - A goes to standby
+ *    - B reconnects to A
+ */ 
+TEST_P(MessengerTest, MissingServerIdenTest2) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(false);
+
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // pause before sending client_ident message
+  srv_interceptor->breakpoint(12);
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  Connection *c2s_accepter = srv_interceptor->wait(12);
+  srv_interceptor->remove_bp(12);
+
+  // We inject a message from this side of the connection to force it to be
+  // in standby when we inject the failure below
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s_accepter->send_message(m2), 0);
+
+  srv_interceptor->proceed(12, Interceptor::ACTION::FAIL);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  ASSERT_TRUE(c2s_accepter->is_connected());
+  ASSERT_EQ(0u, static_cast<Session*>(c2s_accepter->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A and B exchange messages
+ *    - A fails
+ *    - B goes into standby
+ *    - A reconnects
+ */ 
+TEST_P(MessengerTest, ReconnectTest) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateful_server(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  cli_interceptor->breakpoint(16);
+  
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(16, c2s.get());
+  cli_interceptor->remove_bp(16);
+
+  // at this point client and server are connected together
+
+  srv_interceptor->breakpoint(15);
+
+  // failing client
+  cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+  MPing *m3 = new MPing();
+  ASSERT_EQ(c2s->send_message(m3), 0);
+
+  Connection *c2s_accepter = srv_interceptor->wait(15);
+  // the srv end of theconnection is now paused at ready
+  // this means that the reconnect was successful
+  srv_interceptor->remove_bp(15);
+
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+  // c2s_accepter sent 0 reconnect messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+  // c2s_accepter sent 1 reconnect_ok messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 1u);
+  // c2s sent 1 reconnect messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+  // c2s sent 0 reconnect_ok messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 0u);
+
+  srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
+/**
+ * Scenario:
+ *    - A connects to B
+ *    - A and B exchange messages
+ *    - A fails
+ *    - A reconnects // B reconnects
+ */ 
+TEST_P(MessengerTest, ReconnectRaceTest) {
+  if (string(GetParam()) == "simple") {
+    return;
+  }
+
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+
+  TestInterceptor *cli_interceptor = new TestInterceptor();
+  TestInterceptor *srv_interceptor = new TestInterceptor();
+
+  server_msgr->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::lossless_peer(0));
+  server_msgr->interceptor = srv_interceptor;
+
+  client_msgr->set_policy(entity_name_t::TYPE_OSD, Messenger::Policy::lossless_peer(0));
+  client_msgr->interceptor = cli_interceptor;
+
+  entity_addr_t bind_addr;
+  bind_addr.parse("v2:127.0.0.1:3300");
+  server_msgr->bind(bind_addr);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  bind_addr.parse("v2:127.0.0.1:3301");
+  client_msgr->bind(bind_addr);
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  ConnectionRef c2s = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                                           server_msgr->get_myaddrs());
+
+  MPing *m1 = new MPing();
+  ASSERT_EQ(c2s->send_message(m1), 0);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  ASSERT_TRUE(c2s->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(c2s->get_priv().get())->get_count());
+  ASSERT_TRUE(c2s->peer_is_osd());
+
+  cli_interceptor->breakpoint(16);
+  
+  MPing *m2 = new MPing();
+  ASSERT_EQ(c2s->send_message(m2), 0);
+
+  cli_interceptor->wait(16, c2s.get());
+  cli_interceptor->remove_bp(16);
+
+  // at this point client and server are connected together
+
+  // force both client and server to race on reconnect
+  cli_interceptor->breakpoint(13);
+  srv_interceptor->breakpoint(13);
+
+  // failing client
+  // this will cause both client and server to reconnect at the same time
+  cli_interceptor->proceed(16, Interceptor::ACTION::FAIL);
+
+  MPing *m3 = new MPing();
+  ASSERT_EQ(c2s->send_message(m3), 0);
+
+  cli_interceptor->wait(13, c2s.get());
+  srv_interceptor->wait(13);
+
+  cli_interceptor->remove_bp(13);
+  srv_interceptor->remove_bp(13);
+
+  // pause on "ready"
+  srv_interceptor->breakpoint(15);
+
+  cli_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+  srv_interceptor->proceed(13, Interceptor::ACTION::CONTINUE);
+
+  Connection *c2s_accepter = srv_interceptor->wait(15);
+
+  // the server has reconnected and is "ready"
+  srv_interceptor->remove_bp(15);
+
+  ASSERT_TRUE(c2s_accepter->peer_is_client());
+  ASSERT_TRUE(c2s->peer_is_osd());
+  
+  // the server should win the reconnect race
+
+  // c2s_accepter sent 1 or 2 reconnect messages
+  ASSERT_LT(srv_interceptor->count_step(c2s_accepter, 13), 3u);
+  ASSERT_GT(srv_interceptor->count_step(c2s_accepter, 13), 0u);
+  // c2s_accepter sent 0 reconnect_ok messages
+  ASSERT_EQ(srv_interceptor->count_step(c2s_accepter, 14), 0u);
+  // c2s sent 1 reconnect messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 13), 1u);
+  // c2s sent 1 reconnect_ok messages
+  ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 14), 1u);
+
+  if (srv_interceptor->count_step(c2s_accepter, 13) == 2) {
+    // if the server send the reconnect message two times then
+    // the client must have sent a session retry message to the server
+    ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 1u);
+  } else {
+    ASSERT_EQ(cli_interceptor->count_step(c2s.get(), 18), 0u);
+  }
+
+  srv_interceptor->proceed(15, Interceptor::ACTION::CONTINUE);
+
+  {
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  delete cli_interceptor;
+  delete srv_interceptor;
+}
+
 TEST_P(MessengerTest, SimpleTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   server_msgr->bind(bind_addr);
   server_msgr->add_dispatcher_head(&srv_dispatcher);
   server_msgr->start();
@@ -230,7 +830,8 @@ TEST_P(MessengerTest, SimpleTest) {
 
   // 1. simple round trip
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -239,17 +840,23 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. test rebind port
   set<int> avoid_ports;
-  for (int i = 0; i < 10 ; i++)
-    avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
+  for (int i = 0; i < 10 ; i++) {
+    for (auto a : server_msgr->get_myaddrs().v) {
+      avoid_ports.insert(a.get_port() + i);
+    }
+  }
   server_msgr->rebind(avoid_ports);
-  ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
+  for (auto a : server_msgr->get_myaddrs().v) {
+    ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+  }
 
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -258,7 +865,7 @@ TEST_P(MessengerTest, SimpleTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 3. test markdown connection
   conn->mark_down();
@@ -285,7 +892,96 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  client_msgr->shutdown();
+  client_msgr->wait();
+  server_msgr->shutdown();
+  server_msgr->wait();
+}
+
+TEST_P(MessengerTest, SimpleMsgr2Test) {
+  FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  entity_addr_t legacy_addr;
+  legacy_addr.parse("v1:127.0.0.1");
+  entity_addr_t msgr2_addr;
+  msgr2_addr.parse("v2:127.0.0.1");
+  entity_addrvec_t bind_addrs;
+  bind_addrs.v.push_back(legacy_addr);
+  bind_addrs.v.push_back(msgr2_addr);
+  server_msgr->bindv(bind_addrs);
+  server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->start();
+
+  client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->start();
+
+  // 1. simple round trip
+  MPing *m = new MPing();
+  ConnectionRef conn = client_msgr->connect_to(
+    server_msgr->get_mytype(),
+    server_msgr->get_myaddrs());
+  {
+    ASSERT_EQ(conn->send_message(m), 0);
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  ASSERT_TRUE(conn->is_connected());
+  ASSERT_EQ(1u, static_cast<Session*>(conn->get_priv().get())->get_count());
+  ASSERT_TRUE(conn->peer_is_osd());
+
+  // 2. test rebind port
+  set<int> avoid_ports;
+  for (int i = 0; i < 10 ; i++) {
+    for (auto a : server_msgr->get_myaddrs().v) {
+      avoid_ports.insert(a.get_port() + i);
+    }
+  }
+  server_msgr->rebind(avoid_ports);
+  for (auto a : server_msgr->get_myaddrs().v) {
+    ASSERT_TRUE(avoid_ports.count(a.get_port()) == 0);
+  }
+
+  conn = client_msgr->connect_to(
+       server_msgr->get_mytype(),
+       server_msgr->get_myaddrs());
+  {
+    m = new MPing();
+    ASSERT_EQ(conn->send_message(m), 0);
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+
+  // 3. test markdown connection
+  conn->mark_down();
+  ASSERT_FALSE(conn->is_connected());
+
+  // 4. test failed connection
+  server_msgr->shutdown();
+  server_msgr->wait();
+
+  m = new MPing();
+  conn->send_message(m);
+  CHECK_AND_WAIT_TRUE(!conn->is_connected());
+  ASSERT_FALSE(conn->is_connected());
+
+  // 5. loopback connection
+  srv_dispatcher.loopback = true;
+  conn = client_msgr->get_loopback_connection();
+  {
+    m = new MPing();
+    ASSERT_EQ(conn->send_message(m), 0);
+    Mutex::Locker l(cli_dispatcher.lock);
+    while (!cli_dispatcher.got_new)
+      cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+    cli_dispatcher.got_new = false;
+  }
+  srv_dispatcher.loopback = false;
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   client_msgr->shutdown();
   client_msgr->wait();
   server_msgr->shutdown();
@@ -295,7 +991,10 @@ TEST_P(MessengerTest, SimpleTest) {
 TEST_P(MessengerTest, NameAddrTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   server_msgr->bind(bind_addr);
   server_msgr->add_dispatcher_head(&srv_dispatcher);
   server_msgr->start();
@@ -304,7 +1003,8 @@ TEST_P(MessengerTest, NameAddrTest) {
   client_msgr->start();
 
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -312,12 +1012,13 @@ TEST_P(MessengerTest, NameAddrTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
-  ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  // Make should server_conn is the one we already accepted from client,
-  // so it means client_msgr has the same addr when server connection has
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(), srv_dispatcher.last_accept);
+  // Verify that server_conn is the one we already accepted from client,
+  // so it means the session counter in server_conn is also incremented.
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
@@ -327,10 +1028,15 @@ TEST_P(MessengerTest, NameAddrTest) {
 TEST_P(MessengerTest, FeatureTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   uint64_t all_feature_supported, feature_required, feature_supported = 0;
   for (int i = 0; i < 10; i++)
     feature_supported |= 1ULL << i;
+  feature_supported |= CEPH_FEATUREMASK_MSG_ADDR2;
+  feature_supported |= CEPH_FEATUREMASK_SERVER_NAUTILUS;
   feature_required = feature_supported | 1ULL << 13;
   all_feature_supported = feature_required | 1ULL << 14;
 
@@ -349,7 +1055,8 @@ TEST_P(MessengerTest, FeatureTest) {
   client_msgr->start();
 
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   conn->send_message(m);
   CHECK_AND_WAIT_TRUE(!conn->is_connected());
   // should failed build a connection
@@ -364,7 +1071,8 @@ TEST_P(MessengerTest, FeatureTest) {
   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
   client_msgr->start();
 
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -373,7 +1081,7 @@ TEST_P(MessengerTest, FeatureTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -382,10 +1090,13 @@ TEST_P(MessengerTest, FeatureTest) {
 }
 
 TEST_P(MessengerTest, TimeoutTest) {
-  g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
+  g_ceph_context->_conf.set_val("ms_tcp_read_timeout", "1");
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   server_msgr->bind(bind_addr);
   server_msgr->add_dispatcher_head(&srv_dispatcher);
   server_msgr->start();
@@ -395,7 +1106,8 @@ TEST_P(MessengerTest, TimeoutTest) {
 
   // 1. build the connection
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -404,7 +1116,7 @@ TEST_P(MessengerTest, TimeoutTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. wait for idle
@@ -416,14 +1128,17 @@ TEST_P(MessengerTest, TimeoutTest) {
 
   client_msgr->shutdown();
   client_msgr->wait();
-  g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
+  g_ceph_context->_conf.set_val("ms_tcp_read_timeout", "900");
 }
 
 TEST_P(MessengerTest, StatefulTest) {
   Message *m;
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   Messenger::Policy p = Messenger::Policy::stateful_server(0);
   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
   p = Messenger::Policy::lossless_client(0);
@@ -436,7 +1151,8 @@ TEST_P(MessengerTest, StatefulTest) {
   client_msgr->start();
 
   // 1. test for server standby
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                              server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -445,15 +1161,17 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(), srv_dispatcher.last_accept);
   // don't lose state
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   srv_dispatcher.got_new = false;
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -462,8 +1180,9 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       srv_dispatcher.last_accept);
   {
     Mutex::Locker l(srv_dispatcher.lock);
     while (!srv_dispatcher.got_remote_reset)
@@ -503,9 +1222,10 @@ TEST_P(MessengerTest, StatefulTest) {
     cli_dispatcher.got_new = false;
   }
   // resetcheck happen
-  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       srv_dispatcher.last_accept);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   cli_dispatcher.got_remote_reset = false;
 
   server_msgr->shutdown();
@@ -518,7 +1238,10 @@ TEST_P(MessengerTest, StatelessTest) {
   Message *m;
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   Messenger::Policy p = Messenger::Policy::stateless_server(0);
   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
   p = Messenger::Policy::lossy_client(0);
@@ -531,7 +1254,8 @@ TEST_P(MessengerTest, StatelessTest) {
   client_msgr->start();
 
   // 1. test for server lose state
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -540,12 +1264,13 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
 
   srv_dispatcher.got_new = false;
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                     server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -554,15 +1279,16 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                                     srv_dispatcher.last_accept);
   // server lose state
   {
     Mutex::Locker l(srv_dispatcher.lock);
     while (!srv_dispatcher.got_new)
       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
   }
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   // 2. test for client lossy
   server_conn->mark_down();
@@ -570,7 +1296,8 @@ TEST_P(MessengerTest, StatelessTest) {
   conn->send_keepalive();
   CHECK_AND_WAIT_TRUE(!conn->is_connected());
   ASSERT_FALSE(conn->is_connected());
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -579,7 +1306,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -591,7 +1318,10 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   Message *m;
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   Messenger::Policy p = Messenger::Policy::stateful_server(0);
   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
   p = Messenger::Policy::lossless_peer(0);
@@ -604,7 +1334,8 @@ TEST_P(MessengerTest, ClientStandbyTest) {
   client_msgr->start();
 
   // 1. test for client standby, resetcheck
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -613,8 +1344,10 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
-  ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  ConnectionRef server_conn = server_msgr->connect_to(
+    client_msgr->get_mytype(),
+    srv_dispatcher.last_accept);
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   cli_dispatcher.got_connect = false;
   server_conn->mark_down();
@@ -644,9 +1377,10 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
-  server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
+  server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
+                                       srv_dispatcher.last_accept);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -655,12 +1389,15 @@ TEST_P(MessengerTest, ClientStandbyTest) {
 }
 
 TEST_P(MessengerTest, AuthTest) {
-  g_ceph_context->_conf->set_val("auth_cluster_required", "cephx");
-  g_ceph_context->_conf->set_val("auth_service_required", "cephx");
-  g_ceph_context->_conf->set_val("auth_client_required", "cephx");
+  g_ceph_context->_conf.set_val("auth_cluster_required", "cephx");
+  g_ceph_context->_conf.set_val("auth_service_required", "cephx");
+  g_ceph_context->_conf.set_val("auth_client_required", "cephx");
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   server_msgr->bind(bind_addr);
   server_msgr->add_dispatcher_head(&srv_dispatcher);
   server_msgr->start();
@@ -670,7 +1407,8 @@ TEST_P(MessengerTest, AuthTest) {
 
   // 1. simple auth round trip
   MPing *m = new MPing();
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     ASSERT_EQ(conn->send_message(m), 0);
     Mutex::Locker l(cli_dispatcher.lock);
@@ -679,15 +1417,16 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 2. mix auth
-  g_ceph_context->_conf->set_val("auth_cluster_required", "none");
-  g_ceph_context->_conf->set_val("auth_service_required", "none");
-  g_ceph_context->_conf->set_val("auth_client_required", "none");
+  g_ceph_context->_conf.set_val("auth_cluster_required", "none");
+  g_ceph_context->_conf.set_val("auth_service_required", "none");
+  g_ceph_context->_conf.set_val("auth_client_required", "none");
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
-  conn = client_msgr->get_connection(server_msgr->get_myinst());
+  conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     MPing *m = new MPing();
     ASSERT_EQ(conn->send_message(m), 0);
@@ -697,8 +1436,7 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
-
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
@@ -708,7 +1446,10 @@ TEST_P(MessengerTest, AuthTest) {
 TEST_P(MessengerTest, MessageTest) {
   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1");
+  else
+    bind_addr.parse("v2:127.0.0.1");
   Messenger::Policy p = Messenger::Policy::stateful_server(0);
   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
   p = Messenger::Policy::lossless_peer(0);
@@ -724,7 +1465,8 @@ TEST_P(MessengerTest, MessageTest) {
   // 1. A very large "front"(as well as "payload")
   // Because a external message need to invade Messenger::decode_message,
   // here we only use existing message class(MCommand)
-  ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
+  ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                   server_msgr->get_myaddrs());
   {
     uuid_d uuid;
     uuid.generate_random();
@@ -775,8 +1517,8 @@ struct Payload {
     PING = 0,
     PONG = 1,
   };
-  uint8_t who;
-  uint64_t seq;
+  uint8_t who = 0;
+  uint64_t seq = 0;
   bufferlist data;
 
   Payload(Who who, uint64_t seq, const bufferlist& data)
@@ -813,7 +1555,10 @@ class SyntheticDispatcher : public Dispatcher {
 
   SyntheticDispatcher(bool s, SyntheticWorkload *wl):
       Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
-      got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
+      got_remote_reset(false), got_connect(false), index(0), workload(wl) {
+    // don't need authorizers
+    ms_set_require_authorizer(false);
+  }
   bool ms_can_fast_dispatch_any() const override { return true; }
   bool ms_can_fast_dispatch(const Message *m) const override {
     switch (m->get_type()) {
@@ -868,8 +1613,8 @@ class SyntheticDispatcher : public Dispatcher {
     }
 
     Payload pl;
-    auto p = m->get_data().begin();
-    ::decode(pl, p);
+    auto p = m->get_data().cbegin();
+    decode(pl, p);
     if (pl.who == Payload::PING) {
       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
       reply_message(m, pl);
@@ -892,18 +1637,14 @@ class SyntheticDispatcher : public Dispatcher {
     }
   }
 
-  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
-                            bufferlist& authorizer, bufferlist& authorizer_reply,
-                            bool& isvalid, CryptoKey& session_key,
-                           std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
-    isvalid = true;
-    return true;
+  int ms_handle_authentication(Connection *con) override {
+    return 1;
   }
 
   void reply_message(const Message *m, Payload& pl) {
     pl.who = Payload::PONG;
     bufferlist bl;
-    ::encode(pl, bl);
+    encode(pl, bl);
     MPing *rm = new MPing();
     rm->set_data(bl);
     m->get_connection()->send_message(rm);
@@ -914,7 +1655,7 @@ class SyntheticDispatcher : public Dispatcher {
     Message *m = new MPing();
     Payload pl{Payload::PING, index++, data};
     bufferlist bl;
-    ::encode(pl, bl);
+    encode(pl, bl);
     m->set_data(bl);
     if (!con->get_messenger()->get_default_policy().lossy) {
       Mutex::Locker l(lock);
@@ -954,10 +1695,12 @@ class SyntheticWorkload {
   Cond cond;
   set<Messenger*> available_servers;
   set<Messenger*> available_clients;
+  Messenger::Policy client_policy;
   map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
   SyntheticDispatcher dispatcher;
   gen_type rng;
   vector<bufferlist> rand_data;
+  DummyAuthClientServer dummy_auth;
 
  public:
   static const unsigned max_in_flight = 64;
@@ -965,8 +1708,13 @@ class SyntheticWorkload {
   static const unsigned max_message_len = 1024 * 1024 * 4;
 
   SyntheticWorkload(int servers, int clients, string type, int random_num,
-                    Messenger::Policy srv_policy, Messenger::Policy cli_policy):
-      lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
+                    Messenger::Policy srv_policy, Messenger::Policy cli_policy)
+    : lock("SyntheticWorkload::lock"),
+      client_policy(cli_policy),
+      dispatcher(false, this),
+      rng(time(NULL)),
+      dummy_auth(g_ceph_context) {
+    dummy_auth.auth_registry.refresh_config();
     Messenger *msgr;
     int base_port = 16800;
     entity_addr_t bind_addr;
@@ -974,12 +1722,16 @@ class SyntheticWorkload {
     for (int i = 0; i < servers; ++i) {
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
                                "server", getpid()+i, 0);
-      snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
+      snprintf(addr, sizeof(addr), "%s127.0.0.1:%d",
+              (type == "simple") ? "v1:":"v2:",
+              base_port+i);
       bind_addr.parse(addr);
       msgr->bind(bind_addr);
       msgr->add_dispatcher_head(&dispatcher);
+      msgr->set_auth_client(&dummy_auth);
+      msgr->set_auth_server(&dummy_auth);
 
-      assert(msgr);
+      ceph_assert(msgr);
       msgr->set_default_policy(srv_policy);
       available_servers.insert(msgr);
       msgr->start();
@@ -989,13 +1741,17 @@ class SyntheticWorkload {
       msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
                                "client", getpid()+i+servers, 0);
       if (cli_policy.standby) {
-        snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
+        snprintf(addr, sizeof(addr), "%s127.0.0.1:%d",
+                (type == "simple") ? "v1:":"v2:",
+                base_port+i+servers);
         bind_addr.parse(addr);
         msgr->bind(bind_addr);
       }
       msgr->add_dispatcher_head(&dispatcher);
+      msgr->set_auth_client(&dummy_auth);
+      msgr->set_auth_server(&dummy_auth);
 
-      assert(msgr);
+      ceph_assert(msgr);
       msgr->set_default_policy(cli_policy);
       available_clients.insert(msgr);
       msgr->start();
@@ -1023,7 +1779,7 @@ class SyntheticWorkload {
       usleep(500);
       lock.Lock();
     }
-    assert(lock.is_locked());
+    ceph_assert(lock.is_locked());
     boost::uniform_int<> choose(0, available_connections.size() - 1);
     int index = choose(rng);
     map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
@@ -1061,16 +1817,16 @@ class SyntheticWorkload {
       boost::uniform_int<> choose(0, available_servers.size() - 1);
       if (server->get_default_policy().server) {
         p = make_pair(client, server);
+       ConnectionRef conn = client->connect_to(server->get_mytype(),
+                                               server->get_myaddrs());
+       available_connections[conn] = p;
       } else {
-        ConnectionRef conn = client->get_connection(server->get_myinst());
-        if (available_connections.count(conn) || choose(rng) % 2)
-          p = make_pair(client, server);
-        else
-          p = make_pair(server, client);
+        ConnectionRef conn = client->connect_to(server->get_mytype(),
+                                               server->get_myaddrs());
+       p = make_pair(client, server);
+       available_connections[conn] = p;
       }
     }
-    ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
-    available_connections[conn] = p;
   }
 
   void send_message() {
@@ -1100,14 +1856,19 @@ class SyntheticWorkload {
     ConnectionRef conn = _get_random_connection();
     dispatcher.clear_pending(conn);
     conn->mark_down();
-    pair<Messenger*, Messenger*> &p = available_connections[conn];
-    // it's a lossless policy, so we need to mark down each side
-    if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
-      ASSERT_EQ(conn->get_messenger(), p.first);
-      ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
-      peer->mark_down();
-      dispatcher.clear_pending(peer);
-      available_connections.erase(peer);
+    if (!client_policy.server &&
+       !client_policy.lossy &&
+       client_policy.standby) {
+      // it's a lossless policy, so we need to mark down each side
+      pair<Messenger*, Messenger*> &p = available_connections[conn];
+      if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
+       ASSERT_EQ(conn->get_messenger(), p.first);
+       ConnectionRef peer = p.second->connect_to(p.first->get_mytype(),
+                                                 p.first->get_myaddrs());
+       peer->mark_down();
+       dispatcher.clear_pending(peer);
+       available_connections.erase(peer);
+      }
     }
     ASSERT_EQ(available_connections.erase(conn), 1U);
   }
@@ -1131,7 +1892,7 @@ class SyntheticWorkload {
       if (i++ % 50 == 0)
         print_internal_state(true);
       if (timeout_us < 0)
-        assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
+        ceph_abort_msg(" loop time exceed 5 mins, it looks we stuck into some problems!");
     }
     for (set<Messenger*>::iterator it = available_servers.begin();
          it != available_servers.end(); ++it) {
@@ -1225,9 +1986,9 @@ TEST_P(MessengerTest, SyntheticStressTest1) {
 
 TEST_P(MessengerTest, SyntheticInjectTest) {
   uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
-  g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+  g_ceph_context->_conf.set_val("ms_dispatch_throttle_bytes", "16777216");
   SyntheticWorkload test_msg(8, 32, GetParam(), 100,
                              Messenger::Policy::stateful_server(0),
                              Messenger::Policy::lossless_client(0));
@@ -1254,15 +2015,15 @@ TEST_P(MessengerTest, SyntheticInjectTest) {
     }
   }
   test_msg.wait_for_done();
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
-  g_ceph_context->_conf->set_val(
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+  g_ceph_context->_conf.set_val(
       "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
 }
 
 TEST_P(MessengerTest, SyntheticInjectTest2) {
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
   SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                              Messenger::Policy::lossless_peer_reuse(0),
                              Messenger::Policy::lossless_peer_reuse(0));
@@ -1289,13 +2050,13 @@ TEST_P(MessengerTest, SyntheticInjectTest2) {
     }
   }
   test_msg.wait_for_done();
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
 }
 
 TEST_P(MessengerTest, SyntheticInjectTest3) {
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "600");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
   SyntheticWorkload test_msg(8, 16, GetParam(), 100,
                              Messenger::Policy::stateless_server(0),
                              Messenger::Policy::lossy_client(0));
@@ -1322,17 +2083,17 @@ TEST_P(MessengerTest, SyntheticInjectTest3) {
     }
   }
   test_msg.wait_for_done();
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
 }
 
 
 TEST_P(MessengerTest, SyntheticInjectTest4) {
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
-  g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
-  g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
-  g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "30");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0.1");
+  g_ceph_context->_conf.set_val("ms_inject_delay_probability", "1");
+  g_ceph_context->_conf.set_val("ms_inject_delay_type", "client osd");
+  g_ceph_context->_conf.set_val("ms_inject_delay_max", "5");
   SyntheticWorkload test_msg(16, 32, GetParam(), 100,
                              Messenger::Policy::lossless_peer(0),
                              Messenger::Policy::lossless_peer(0));
@@ -1359,11 +2120,11 @@ TEST_P(MessengerTest, SyntheticInjectTest4) {
     }
   }
   test_msg.wait_for_done();
-  g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
-  g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
-  g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
-  g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
-  g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
+  g_ceph_context->_conf.set_val("ms_inject_socket_failures", "0");
+  g_ceph_context->_conf.set_val("ms_inject_internal_delays", "0");
+  g_ceph_context->_conf.set_val("ms_inject_delay_probability", "0");
+  g_ceph_context->_conf.set_val("ms_inject_delay_type", "");
+  g_ceph_context->_conf.set_val("ms_inject_delay_max", "0");
 }
 
 
@@ -1374,7 +2135,10 @@ class MarkdownDispatcher : public Dispatcher {
  public:
   std::atomic<uint64_t> count = { 0 };
   explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
-                              last_mark(false) {}
+                              last_mark(false) {
+    // don't need authorizers
+    ms_set_require_authorizer(false);
+  }
   bool ms_can_fast_dispatch_any() const override { return false; }
   bool ms_can_fast_dispatch(const Message *m) const override {
     switch (m->get_type()) {
@@ -1436,12 +2200,8 @@ class MarkdownDispatcher : public Dispatcher {
   void ms_fast_dispatch(Message *m) override {
     ceph_abort();
   }
-  bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
-                            bufferlist& authorizer, bufferlist& authorizer_reply,
-                            bool& isvalid, CryptoKey& session_key,
-                           std::unique_ptr<AuthAuthorizerChallenge> *challenge) override {
-    isvalid = true;
-    return true;
+  int ms_handle_authentication(Connection *con) override {
+    return 1;
   }
 };
 
@@ -1450,17 +2210,31 @@ class MarkdownDispatcher : public Dispatcher {
 TEST_P(MessengerTest, MarkdownTest) {
   Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
   MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
+  DummyAuthClientServer dummy_auth(g_ceph_context);
+  dummy_auth.auth_registry.refresh_config();
   entity_addr_t bind_addr;
-  bind_addr.parse("127.0.0.1:16800");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1:16800");
+  else
+    bind_addr.parse("v2:127.0.0.1:16800");
   server_msgr->bind(bind_addr);
   server_msgr->add_dispatcher_head(&srv_dispatcher);
+  server_msgr->set_auth_client(&dummy_auth);
+  server_msgr->set_auth_server(&dummy_auth);
   server_msgr->start();
-  bind_addr.parse("127.0.0.1:16801");
+  if (string(GetParam()) == "simple")
+    bind_addr.parse("v1:127.0.0.1:16801");
+  else
+    bind_addr.parse("v2:127.0.0.1:16801");
   server_msgr2->bind(bind_addr);
   server_msgr2->add_dispatcher_head(&srv_dispatcher);
+  server_msgr2->set_auth_client(&dummy_auth);
+  server_msgr2->set_auth_server(&dummy_auth);
   server_msgr2->start();
 
   client_msgr->add_dispatcher_head(&cli_dispatcher);
+  client_msgr->set_auth_client(&dummy_auth);
+  client_msgr->set_auth_server(&dummy_auth);
   client_msgr->start();
 
   int i = 1000;
@@ -1468,8 +2242,10 @@ TEST_P(MessengerTest, MarkdownTest) {
   bool equal = false;
   uint64_t equal_count = 0;
   while (i--) {
-    ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
-    ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
+    ConnectionRef conn1 = client_msgr->connect_to(server_msgr->get_mytype(),
+                                                      server_msgr->get_myaddrs());
+    ConnectionRef conn2 = client_msgr->connect_to(server_msgr2->get_mytype(),
+                                                      server_msgr2->get_myaddrs());
     MPing *m = new MPing();
     ASSERT_EQ(conn1->send_message(m), 0);
     m = new MPing();
@@ -1522,16 +2298,18 @@ TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
 int main(int argc, char **argv) {
   vector<const char*> args;
   argv_to_vec(argc, (const char **)argv, args);
-  env_to_vec(args);
-
-  auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
-  g_ceph_context->_conf->set_val("auth_cluster_required", "none");
-  g_ceph_context->_conf->set_val("auth_service_required", "none");
-  g_ceph_context->_conf->set_val("auth_client_required", "none");
-  g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
-  g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
-  g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
-  g_ceph_context->_conf->set_val("ms_max_backoff", "1");
+
+  auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
+                        CODE_ENVIRONMENT_UTILITY,
+                        CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
+  g_ceph_context->_conf.set_val("auth_cluster_required", "none");
+  g_ceph_context->_conf.set_val("auth_service_required", "none");
+  g_ceph_context->_conf.set_val("auth_client_required", "none");
+  g_ceph_context->_conf.set_val("keyring", "/dev/null");
+  g_ceph_context->_conf.set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
+  g_ceph_context->_conf.set_val("ms_die_on_bad_msg", "true");
+  g_ceph_context->_conf.set_val("ms_die_on_old_message", "true");
+  g_ceph_context->_conf.set_val("ms_max_backoff", "1");
   common_init_finish(g_ceph_context);
 
   ::testing::InitGoogleTest(&argc, argv);