class ProtocolV2 final : public Protocol {
public:
- ProtocolV2(Dispatcher& dispatcher,
+ ProtocolV2(ChainedDispatchers& dispatchers,
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV2() override;
-
+ void print(std::ostream&) const final;
private:
+ void on_closed() override;
+ bool is_connected() const override;
+
void start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ const entity_name_t& peer_name) override;
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
seastar::shared_future<> execution_done = seastar::now();
+ template <typename Func>
+ void gated_execute(const char* what, Func&& func) {
+ gate.dispatch_in_background(what, *this, [this, &func] {
+ execution_done = seastar::futurize_invoke(std::forward<Func>(func));
+ return execution_done.get_future();
+ });
+ }
+
class Timer {
double last_dur_ = 0.0;
const SocketConnection& conn;
seastar::future<> write_flush(bufferlist&& buf);
ceph::crypto::onwire::rxtx_t session_stream_handlers;
- boost::container::static_vector<ceph::msgr::v2::segment_t,
- ceph::msgr::v2::MAX_NUM_SEGMENTS> rx_segments_desc;
- ceph::msgr::v2::FrameAssembler tx_frame_asm;
+ ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
+ ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
+ ceph::bufferlist rx_preamble;
ceph::msgr::v2::segment_bls_t rx_segments_data;
size_t get_current_msg_size() const;
private:
void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
- void dispatch_reset();
void reset_session(bool full);
- seastar::future<entity_type_t, entity_addr_t> banner_exchange();
+ seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ banner_exchange(bool is_connect);
enum class next_step_t {
ready,
seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
seastar::future<> server_auth();
+ bool validate_peer_name(const entity_name_t& peer_name) const;
seastar::future<next_step_t> send_wait();
seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
bool do_reset=false,
seastar::future<> finish_auth();
// ESTABLISHING
- void execute_establishing();
+ void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
// ESTABLISHING/REPLACING (server)
seastar::future<> send_server_ident();
uint64_t new_client_cookie,
entity_name_t new_peer_name,
uint64_t new_conn_features,
+ bool tx_is_rev1,
+ bool rx_is_rev1,
// reconnect
uint64_t new_connect_seq,
uint64_t new_msg_seq);
// READY
seastar::future<> read_message(utime_t throttle_stamp);
- void execute_ready();
+ void execute_ready(bool dispatch_connect);
// STANDBY
void execute_standby();