#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
#include "Messenger.h"
#include "SocketConnection.h"
-namespace ceph::net {
+namespace crimson::net {
-class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
- const int master_sid;
- const seastar::shard_id sid;
+class FixedCPUServerSocket;
+
+class SocketMessenger final : public Messenger {
+ const seastar::shard_id master_sid;
seastar::promise<> shutdown_promise;
- std::optional<seastar::server_socket> listener;
+ FixedCPUServerSocket* listener = nullptr;
Dispatcher *dispatcher = nullptr;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
const uint32_t nonce;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
- seastar::future<> accept(seastar::connected_socket socket,
- seastar::socket_address paddr);
-
- void do_bind(const entity_addrvec_t& addr);
- seastar::future<> do_start(Dispatcher *disp);
- seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type);
- seastar::future<> do_shutdown();
- // conn sharding options:
- // 0. Compatible (master_sid >= 0): place all connections to one master shard
- // 1. Simplest (master_sid < 0): sharded by ip only
- // 2. Balanced (not implemented): sharded by ip + port + nonce,
- // but, need to move SocketConnection between cores.
- seastar::shard_id locate_shard(const entity_addr_t& addr);
+ seastar::future<> do_bind(const entity_addrvec_t& addr);
public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
- uint32_t nonce,
- int master_sid);
+ uint32_t nonce);
+ ~SocketMessenger() override { ceph_assert(!listener); }
seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
seastar::future<> start(Dispatcher *dispatcher) override;
- seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ ConnectionRef connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) override;
// can only wait once
seastar::future<> wait() override {
+ assert(seastar::engine().cpu_id() == master_sid);
return shutdown_promise.get_future();
}
seastar::future<> shutdown() override;
- Messenger* get_local_shard() override {
- return &container().local();
- }
-
void print(ostream& out) const override {
out << get_myname()
<< "(" << logic_name
<< ") " << get_myaddr();
}
+ SocketPolicy get_policy(entity_type_t peer_type) const override;
+
+ SocketPolicy get_default_policy() const override;
+
void set_default_policy(const SocketPolicy& p) override;
void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
- seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
+ seastar::future<uint32_t> get_global_seq(uint32_t old=0);
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn);
SocketConnectionRef lookup_conn(const entity_addr_t& addr);
void accept_conn(SocketConnectionRef);
void unaccept_conn(SocketConnectionRef);
void register_conn(SocketConnectionRef);
void unregister_conn(SocketConnectionRef);
-
- // required by sharded<>
- seastar::future<> stop() {
- return seastar::make_ready_future<>();
- }
-
seastar::shard_id shard_id() const {
- return sid;
+ assert(seastar::engine().cpu_id() == master_sid);
+ return master_sid;
}
};
-} // namespace ceph::net
+} // namespace crimson::net