class ProtocolV2;
class SocketMessenger;
+class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
#ifdef UNIT_TESTS_BUILT
* ConnectionHandler
*
* The interface class to implement Connection, called by SocketConnection.
+ *
+ * The operations must be done in get_shard_id().
*/
class ConnectionHandler {
public:
ConnectionHandler &operator=(const ConnectionHandler &) = delete;
ConnectionHandler &operator=(ConnectionHandler &&) = delete;
+ virtual seastar::shard_id get_shard_id() const = 0;
+
virtual bool is_connected() const = 0;
- virtual seastar::future<> send(MessageURef) = 0;
+ virtual seastar::future<> send(MessageFRef) = 0;
virtual seastar::future<> send_keepalive() = 0;
};
class SocketConnection : public Connection {
- const seastar::shard_id core;
-
- SocketMessenger& messenger;
-
- std::unique_ptr<ConnectionHandler> io_handler;
-
- std::unique_ptr<ProtocolV2> protocol;
-
- SocketRef socket;
-
- entity_name_t peer_name = {0, entity_name_t::NEW};
-
- entity_addr_t peer_addr;
-
- // which of the peer_addrs we're connecting to (as client)
- // or should reconnect to (as peer)
- entity_addr_t target_addr;
-
- uint64_t features = 0;
-
- ceph::net::Policy<crimson::common::Throttle> policy;
-
- uint64_t peer_global_id = 0;
-
- std::unique_ptr<user_private_t> user_private;
-
- // Connection interfaces, public to users
+ /*
+ * Connection interfaces, public to users
+ * Working in ConnectionHandler::get_shard_id()
+ */
public:
SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers);
~SocketConnection() override;
+ const seastar::shard_id get_shard_id() const override {
+ return io_handler->get_shard_id();
+ }
+
const entity_name_t &get_peer_name() const override {
return peer_name;
}
void print(std::ostream& out) const override;
- // public to SocketMessenger
+ /*
+ * Public to SocketMessenger
+ * Working in SocketMessenger::get_shard_id();
+ */
public:
/// start a handshake from the client's perspective,
/// only call when SocketConnection first construct
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
- void start_accept(SocketRef&& socket,
+ void start_accept(SocketFRef&& socket,
const entity_addr_t& peer_addr);
seastar::future<> close_clean_yielded();
seastar::socket_address get_local_address() const;
- SocketMessenger &get_messenger() const {
- return messenger;
- }
+ seastar::shard_id get_messenger_shard_id() const;
+
+ SocketMessenger &get_messenger() const;
ConnectionRef get_local_shared_foreign_from_this();
private:
- seastar::shard_id shard_id() const;
-
- void set_peer_type(entity_type_t peer_type) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_type == 0 &&
- peer_name.type() != 0));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_type != 0 &&
- peer_name.type() != 0 &&
- peer_type != peer_name.type()));
- peer_name._type = peer_type;
- }
+ void set_peer_type(entity_type_t peer_type);
- void set_peer_id(int64_t peer_id) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_id == entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_id != entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW &&
- peer_id != peer_name.num()));
- peer_name._num = peer_id;
- }
+ void set_peer_id(int64_t peer_id);
void set_peer_name(entity_name_t name) {
set_peer_type(name.type());
set_peer_id(name.num());
}
- void set_features(uint64_t f) {
- features = f;
- }
+ void set_features(uint64_t f);
+
+ void set_socket(Socket *s);
#ifdef UNIT_TESTS_BUILT
- bool is_closed_clean() const override;
+ bool is_protocol_ready() const override;
+
+ bool is_protocol_standby() const override;
- bool is_closed() const override;
+ bool is_protocol_closed_clean() const override;
+
+ bool is_protocol_closed() const override;
// peer wins if myaddr > peeraddr
bool peer_wins() const override;
bool peer_wins() const;
#endif
+private:
+ const seastar::shard_id msgr_sid;
+
+ /*
+ * Core owner is messenger core, may allow to access from the I/O core.
+ */
+ SocketMessenger& messenger;
+
+ std::unique_ptr<ProtocolV2> protocol;
+
+ Socket *socket = nullptr;
+
+ entity_name_t peer_name = {0, entity_name_t::NEW};
+
+ entity_addr_t peer_addr;
+
+ // which of the peer_addrs we're connecting to (as client)
+ // or should reconnect to (as peer)
+ entity_addr_t target_addr;
+
+ uint64_t features = 0;
+
+ ceph::net::Policy<crimson::common::Throttle> policy;
+
+ uint64_t peer_global_id = 0;
+
+ /*
+ * Core owner is I/O core (mutable).
+ */
+ std::unique_ptr<ConnectionHandler> io_handler;
+
+ /*
+ * Core owner is up to the connection user.
+ */
+ std::unique_ptr<user_private_t> user_private;
+
friend class IOHandler;
friend class ProtocolV2;
friend class FrameAssemblerV2;