1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/sleep.hh>
9 #include "msg/async/frames_v2.h"
10 #include "msg/async/crypto_onwire.h"
12 namespace crimson::net
{
// Protocol implementation in crimson::net. Declared final and publicly
// derived from Protocol. Its members use ceph::msgr::v2 frame types and
// ceph::crypto::onwire stream handlers (see the msg/async includes above).
14 class ProtocolV2 final
: public Protocol
{
// Constructor. Takes references to a ChainedDispatchers, a SocketConnection
// and a SocketMessenger; defined out of line.
16 ProtocolV2(ChainedDispatchers
& dispatchers
,
17 SocketConnection
& conn
,
18 SocketMessenger
& messenger
);
// Destructor, overriding the base-class one; defined out of line.
19 ~ProtocolV2() override
;
// Const, final member taking an output stream.
// NOTE(review): presumably writes a description of this object to the
// stream -- confirm in the definition.
20 void print(std::ostream
&) const final
;
// Override of a Protocol virtual; takes no arguments and returns nothing.
// NOTE(review): presumably a hook run once the connection is closed --
// confirm against Protocol.
22 void on_closed() override
;
// Override of a Protocol virtual; const query returning bool. The criterion
// it applies is in the out-of-line definition.
23 bool is_connected() const override
;
// Override of a Protocol virtual.
//   peer_addr - address of the peer (const reference)
//   peer_name - entity name of the peer (const reference)
// NOTE(review): presumably starts the client-side handshake (see the
// CONNECTING declarations further down) -- confirm in the definition.
25 void start_connect(const entity_addr_t
& peer_addr
,
26 const entity_name_t
& peer_name
) override
;
// Override of a Protocol virtual.
//   socket    - passed as an rvalue reference, so the callee may move from it
//   peer_addr - address of the peer (const reference)
// NOTE(review): presumably starts the server-side handshake (see the
// ACCEPTING declarations further down) -- confirm in the definition.
28 void start_accept(SocketRef
&& socket
,
29 const entity_addr_t
& peer_addr
) override
;
// Override of a Protocol virtual; takes no arguments and returns nothing.
// Defined out of line.
31 void trigger_close() override
;
// Override of a Protocol virtual that returns a ceph::bufferlist.
//   msgs              - queue of messages, taken by const reference
//   require_keepalive - boolean flag
//   keepalive_ack     - optional timestamp
//   require_ack       - boolean flag
// NOTE(review): presumably encodes msgs, plus keepalive / ack frames when
// requested, into the returned buffer -- confirm in the definition.
33 ceph::bufferlist
do_sweep_messages(
34 const std::deque
<MessageRef
>& msgs
,
36 bool require_keepalive
,
37 std::optional
<utime_t
> keepalive_ack
,
38 bool require_ack
) override
;
// Override of a Protocol virtual; takes no arguments and returns nothing.
// Defined out of line.
40 void notify_write() override
;
// Reference to a SocketMessenger (the constructor has a parameter of the
// same type and name).
43 SocketMessenger
&messenger
;
// Current state; initialised to state_t::NONE. state_t is declared on lines
// not shown here.
57 state_t state
= state_t::NONE
;
// Returns the name of a state by indexing a local table of string literals
// with the enum value cast to int, so the table has to list the names in
// enumerator order (it starts with "NONE").
// NOTE(review): no bounds check on the index is visible here.
59 static const char *get_state_name(state_t state
) {
60 const char *const statenames
[] = {"NONE",
70 return statenames
[static_cast<int>(state
)];
// Takes a state_t, a write_state_t and a `reentrant` flag; defined out of
// line.
// NOTE(review): presumably the entry point for changing `state` -- confirm.
73 void trigger_state(state_t state
, write_state_t write_state
, bool reentrant
);
// Per-connection bookkeeping. Every field below is a uint64_t initialised
// to 0.
// Feature values (presumably bit masks agreed during the handshake -- TODO
// confirm).
75 uint64_t connection_features
= 0;
76 uint64_t peer_required_features
= 0;
// Cookies for the client and server side.
78 uint64_t client_cookie
= 0;
79 uint64_t server_cookie
= 0;
// Sequence counters: local and peer global sequence, and connect sequence.
80 uint64_t global_seq
= 0;
81 uint64_t peer_global_seq
= 0;
82 uint64_t connect_seq
= 0;
// Shared future initialised from seastar::now(). gated_execute() overwrites
// it with the future produced by the function it runs.
84 seastar::shared_future
<> execution_done
= seastar::now();
// Hands a lambda to gate.dispatch_in_background(), together with the label
// `what` and *this. The lambda invokes func through
// seastar::futurize_invoke, stores the resulting future in execution_done,
// and returns a future obtained from execution_done.get_future().
//   what - label passed through to dispatch_in_background()
//   func - callable to run; perfectly forwarded into futurize_invoke
// NOTE(review): func is captured by reference, so this is only safe if
// dispatch_in_background() runs the lambda before gated_execute() returns
// -- confirm against the gate implementation.
86 template <typename Func
>
87 void gated_execute(const char* what
, Func
&& func
) {
88 gate
.dispatch_in_background(what
, *this, [this, &func
] {
89 execution_done
= seastar::futurize_invoke(std::forward
<Func
>(func
));
90 return execution_done
.get_future();
// Value reported by last_dur(); starts at 0.0.
// NOTE(review): presumably the length in seconds of the most recent
// backoff() -- confirm in the definition.
95 double last_dur_
= 0.0;
// Connection this timer was constructed with (const reference).
96 const SocketConnection
& conn
;
// Optional abort source; empty by default.
// NOTE(review): presumably used to cancel a pending backoff -- confirm.
97 std::optional
<seastar::abort_source
> as
;
// Binds the timer to `conn`; the other members keep their defaults.
99 Timer(SocketConnection
& conn
) : conn(conn
) {}
// Accessor for last_dur_.
100 double last_dur() const { return last_dur_
; }
// Takes a duration in seconds and returns a future; defined out of line.
101 seastar::future
<> backoff(double seconds
);
// Timer instance held by value as a member of the protocol.
110 Timer protocol_timer
;
112 // TODO: Frame-related implementations below should probably move to a separate class.
// Recording flag, off by default, and two buffer lists named for the
// receive and transmit directions.
// NOTE(review): presumably, while record_io is set, bytes read / written
// are also appended to rxbuf / txbuf -- confirm in the definitions.
114 bool record_io
= false;
115 ceph::bufferlist rxbuf
;
116 ceph::bufferlist txbuf
;
// Socket I/O helpers; all are defined out of line.
// Takes no arguments (presumably sets record_io -- TODO confirm).
118 void enable_recording();
// Takes a byte count; the future resolves to a Socket::tmp_buf.
119 seastar::future
<Socket::tmp_buf
> read_exactly(size_t bytes
);
// Takes a byte count; the future resolves to a bufferlist.
120 seastar::future
<bufferlist
> read(size_t bytes
);
// Take a bufferlist by rvalue reference; the futures carry no value.
// NOTE(review): write_flush presumably also flushes the socket -- confirm.
121 seastar::future
<> write(bufferlist
&& buf
);
122 seastar::future
<> write_flush(bufferlist
&& buf
);
// On-wire rx/tx stream handler pair. Declared before the two frame
// assemblers below, which are constructed with its address.
124 ceph::crypto::onwire::rxtx_t session_stream_handlers
;
// Frame assemblers for the transmit and receive directions. Each is built
// from a pointer to session_stream_handlers and `false`.
// NOTE(review): the meaning of the boolean is defined by FrameAssembler's
// constructor in frames_v2.h -- confirm there.
125 ceph::msgr::v2::FrameAssembler tx_frame_asm
{&session_stream_handlers
, false};
126 ceph::msgr::v2::FrameAssembler rx_frame_asm
{&session_stream_handlers
, false};
// Receive-side storage: a buffer list for the preamble and a
// segment_bls_t for segment data.
127 ceph::bufferlist rx_preamble
;
128 ceph::msgr::v2::segment_bls_t rx_segments_data
;
// Frame helpers; all are defined out of line.
// Const query returning a size_t.
130 size_t get_current_msg_size() const;
// The future resolves to a ceph::msgr::v2::Tag.
131 seastar::future
<ceph::msgr::v2::Tag
> read_main_preamble();
// The future carries no value.
132 seastar::future
<> read_frame_payload();
// Takes the frame by non-const reference; `flush` defaults to true.
// NOTE(review): F is presumably a template parameter introduced on a line
// not shown here.
134 seastar::future
<> write_frame(F
&frame
, bool flush
=true);
// Takes a `backoff` flag, the name of the calling function and an exception
// pointer; defined out of line.
137 void fault(bool backoff
, const char* func_name
, std::exception_ptr eptr
);
// Takes a `full` flag; defined out of line.
138 void reset_session(bool full
);
// Takes an `is_connect` flag; the future resolves to a tuple of
// (entity_type_t, entity_addr_t). Defined out of line.
139 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
140 banner_exchange(bool is_connect
);
// Value type carried by the futures that many of the handshake helpers
// declared below return (seastar::future<next_step_t>).
142 enum class next_step_t
{
145 none
, // protocol should have been aborted or failed
148 // CONNECTING (client)
// Takes no arguments; the future carries no value. Defined out of line.
149 seastar::future
<> handle_auth_reply();
// Convenience overload: forwards to client_auth(allowed_methods) with an
// empty vector.
150 inline seastar::future
<> client_auth() {
151 std::vector
<uint32_t> empty
;
152 return client_auth(empty
);
// Takes the vector of allowed methods by non-const reference; defined out
// of line.
154 seastar::future
<> client_auth(std::vector
<uint32_t> &allowed_methods
);
// Client-side helpers. Each takes no arguments and returns a future that
// resolves to a next_step_t; all are defined out of line.
156 seastar::future
<next_step_t
> process_wait();
157 seastar::future
<next_step_t
> client_connect();
158 seastar::future
<next_step_t
> client_reconnect();
// Takes no arguments and returns nothing; defined out of line.
159 void execute_connecting();
161 // ACCEPTING (server)
// Server-side authentication helpers; the futures carry no value and all
// are defined out of line.
// Takes an int `r` (presumably an error code -- TODO confirm).
162 seastar::future
<> _auth_bad_method(int r
);
// Takes the auth payload by non-const reference and a `more` flag.
163 seastar::future
<> _handle_auth_request(bufferlist
& auth_payload
, bool more
);
// Takes no arguments.
164 seastar::future
<> server_auth();
// Const check on a peer's entity name, returning bool; the rule it applies
// is in the out-of-line definition.
166 bool validate_peer_name(const entity_name_t
& peer_name
) const;
// Takes no arguments; the future resolves to a next_step_t.
167 seastar::future
<next_step_t
> send_wait();
// Takes a pointer to an existing ProtocolV2 followed by further parameters,
// among them `reconnect`, which defaults to false. The future resolves to a
// next_step_t.
168 seastar::future
<next_step_t
> reuse_connection(ProtocolV2
* existing_proto
,
170 bool reconnect
=false,
// Takes a reference-counted handle to an existing connection, by value; the
// future resolves to a next_step_t. Defined out of line.
174 seastar::future
<next_step_t
> handle_existing_connection(SocketConnectionRef existing_conn
);
// Takes no arguments; the future resolves to a next_step_t.
175 seastar::future
<next_step_t
> server_connect();
// Server-side helpers whose futures resolve to a next_step_t; all are
// defined out of line.
177 seastar::future
<next_step_t
> read_reconnect();
// Takes a connect sequence value.
178 seastar::future
<next_step_t
> send_retry(uint64_t connect_seq
);
// Takes a global sequence value.
179 seastar::future
<next_step_t
> send_retry_global(uint64_t global_seq
);
// Takes a `full` flag.
180 seastar::future
<next_step_t
> send_reset(bool full
);
181 seastar::future
<next_step_t
> server_reconnect();
// Takes no arguments and returns nothing; defined out of line.
183 void execute_accepting();
185 // CONNECTING/ACCEPTING
// Takes no arguments; the future carries no value. Defined out of line.
186 seastar::future
<> finish_auth();
// Takes a handle to an existing connection and a `dispatch_reset` flag;
// defined out of line.
189 void execute_establishing(SocketConnectionRef existing_conn
, bool dispatch_reset
);
191 // ESTABLISHING/REPLACING (server)
// Takes no arguments; the future carries no value. Defined out of line.
192 seastar::future
<> send_server_ident();
194 // REPLACING (server)
// Takes a `reconnect` flag followed by a set of new_* values: the socket
// and auth metadata (both rvalue references, so the callee may move from
// them), on-wire stream handlers, peer global sequence, client cookie, peer
// name, connection features, connect sequence and message sequence.
// Defined out of line.
// NOTE(review): presumably these replace the corresponding members of this
// object -- confirm in the definition.
195 void trigger_replacing(bool reconnect
,
197 SocketRef
&& new_socket
,
198 AuthConnectionMetaRef
&& new_auth_meta
,
199 ceph::crypto::onwire::rxtx_t new_rxtx
,
200 uint64_t new_peer_global_seq
,
202 uint64_t new_client_cookie
,
203 entity_name_t new_peer_name
,
204 uint64_t new_conn_features
,
208 uint64_t new_connect_seq
,
209 uint64_t new_msg_seq
);
// Takes a timestamp named throttle_stamp; the future carries no value.
// Defined out of line.
212 seastar::future
<> read_message(utime_t throttle_stamp
);
// The execute_* members below return nothing and are defined out of line.
// NOTE(review): each presumably runs the logic of the state it is named
// after -- confirm in the definitions.
// Takes a `dispatch_connect` flag.
213 void execute_ready(bool dispatch_connect
);
216 void execute_standby();
// Takes a `max_backoff` flag.
219 void execute_wait(bool max_backoff
);
222 void execute_server_wait();
225 } // namespace crimson::net