]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/SocketMessenger.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / net / SocketMessenger.h
index 535dea3a1403ea8cbf1bd2f0f2d5e0c2264815ae..d1b86e16a2abada7b1640e19b979922313f9bca2 100644 (file)
 #include <seastar/core/gate.hh>
 #include <seastar/core/reactor.hh>
 #include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
 
 #include "Messenger.h"
 #include "SocketConnection.h"
 
-namespace ceph::net {
+namespace crimson::net {
 
-class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
-  const int master_sid;
-  const seastar::shard_id sid;
+class FixedCPUServerSocket;
+
+class SocketMessenger final : public Messenger {
+  const seastar::shard_id master_sid;
   seastar::promise<> shutdown_promise;
 
-  std::optional<seastar::server_socket> listener;
+  FixedCPUServerSocket* listener = nullptr;
   Dispatcher *dispatcher = nullptr;
   std::map<entity_addr_t, SocketConnectionRef> connections;
   std::set<SocketConnectionRef> accepting_conns;
@@ -39,27 +41,18 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   // Distinguish messengers with meaningful names for debugging
   const std::string logic_name;
   const uint32_t nonce;
+  // specifying we haven't learned our addr; set false when we find it.
+  bool need_addr = true;
+  uint32_t global_seq = 0;
+  bool started = false;
 
-  seastar::future<> accept(seastar::connected_socket socket,
-                           seastar::socket_address paddr);
-
-  void do_bind(const entity_addrvec_t& addr);
-  seastar::future<> do_start(Dispatcher *disp);
-  seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
-                                                 const entity_type_t& peer_type);
-  seastar::future<> do_shutdown();
-  // conn sharding options:
-  // 0. Compatible (master_sid >= 0): place all connections to one master shard
-  // 1. Simplest (master_sid < 0): sharded by ip only
-  // 2. Balanced (not implemented): sharded by ip + port + nonce,
-  //        but, need to move SocketConnection between cores.
-  seastar::shard_id locate_shard(const entity_addr_t& addr);
+  seastar::future<> do_bind(const entity_addrvec_t& addr);
 
  public:
   SocketMessenger(const entity_name_t& myname,
                   const std::string& logic_name,
-                  uint32_t nonce,
-                  int master_sid);
+                  uint32_t nonce);
+  ~SocketMessenger() override { ceph_assert(!listener); }
 
   seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
 
@@ -72,25 +65,26 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
 
   seastar::future<> start(Dispatcher *dispatcher) override;
 
-  seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
-                                          const entity_type_t& peer_type) override;
+  ConnectionRef connect(const entity_addr_t& peer_addr,
+                        const entity_type_t& peer_type) override;
   // can only wait once
   seastar::future<> wait() override {
+    assert(seastar::engine().cpu_id() == master_sid);
     return shutdown_promise.get_future();
   }
 
   seastar::future<> shutdown() override;
 
-  Messenger* get_local_shard() override {
-    return &container().local();
-  }
-
   void print(ostream& out) const override {
     out << get_myname()
         << "(" << logic_name
         << ") " << get_myaddr();
   }
 
+  SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+  SocketPolicy get_default_policy() const override;
+
   void set_default_policy(const SocketPolicy& p) override;
 
   void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
@@ -98,22 +92,19 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
   void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
 
  public:
-  seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
+  seastar::future<uint32_t> get_global_seq(uint32_t old=0);
+  seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
+                                 const SocketConnection& conn);
 
   SocketConnectionRef lookup_conn(const entity_addr_t& addr);
   void accept_conn(SocketConnectionRef);
   void unaccept_conn(SocketConnectionRef);
   void register_conn(SocketConnectionRef);
   void unregister_conn(SocketConnectionRef);
-
-  // required by sharded<>
-  seastar::future<> stop() {
-    return seastar::make_ready_future<>();
-  }
-
   seastar::shard_id shard_id() const {
-    return sid;
+    assert(seastar::engine().cpu_id() == master_sid);
+    return master_sid;
   }
 };
 
-} // namespace ceph::net
+} // namespace crimson::net