1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/sleep.hh>
9 #include "msg/async/frames_v2.h"
10 #include "msg/async/crypto_onwire.h"
11 #include "msg/async/compression_onwire.h"
13 namespace crimson::net
{
15 class ProtocolV2 final
: public Protocol
{
17 ProtocolV2(ChainedDispatchers
& dispatchers
,
18 SocketConnection
& conn
,
19 SocketMessenger
& messenger
);
20 ~ProtocolV2() override
;
21 void print(std::ostream
&) const final
;
23 void on_closed() override
;
24 bool is_connected() const override
;
26 void start_connect(const entity_addr_t
& peer_addr
,
27 const entity_name_t
& peer_name
) override
;
29 void start_accept(SocketRef
&& socket
,
30 const entity_addr_t
& peer_addr
) override
;
32 void trigger_close() override
;
34 ceph::bufferlist
do_sweep_messages(
35 const std::deque
<MessageURef
>& msgs
,
37 bool require_keepalive
,
38 std::optional
<utime_t
> keepalive_ack
,
39 bool require_ack
) override
;
41 void notify_write() override
;
44 SocketMessenger
&messenger
;
58 state_t state
= state_t::NONE
;
60 static const char *get_state_name(state_t state
) {
61 const char *const statenames
[] = {"NONE",
71 return statenames
[static_cast<int>(state
)];
74 void trigger_state(state_t state
, write_state_t write_state
, bool reentrant
);
76 uint64_t connection_features
= 0;
77 uint64_t peer_required_features
= 0;
79 uint64_t client_cookie
= 0;
80 uint64_t server_cookie
= 0;
81 uint64_t global_seq
= 0;
82 uint64_t peer_global_seq
= 0;
83 uint64_t connect_seq
= 0;
85 seastar::shared_future
<> execution_done
= seastar::now();
87 template <typename Func
>
88 void gated_execute(const char* what
, Func
&& func
) {
89 gate
.dispatch_in_background(what
, *this, [this, &func
] {
90 execution_done
= seastar::futurize_invoke(std::forward
<Func
>(func
));
91 return execution_done
.get_future();
96 double last_dur_
= 0.0;
97 const SocketConnection
& conn
;
98 std::optional
<seastar::abort_source
> as
;
100 Timer(SocketConnection
& conn
) : conn(conn
) {}
101 double last_dur() const { return last_dur_
; }
102 seastar::future
<> backoff(double seconds
);
111 Timer protocol_timer
;
113 // TODO: Frame related implementations, probably to a separate class.
115 bool record_io
= false;
116 ceph::bufferlist rxbuf
;
117 ceph::bufferlist txbuf
;
119 void enable_recording();
120 seastar::future
<Socket::tmp_buf
> read_exactly(size_t bytes
);
121 seastar::future
<bufferlist
> read(size_t bytes
);
122 seastar::future
<> write(bufferlist
&& buf
);
123 seastar::future
<> write_flush(bufferlist
&& buf
);
125 ceph::crypto::onwire::rxtx_t session_stream_handlers
;
126 ceph::compression::onwire::rxtx_t session_comp_handlers
;
127 ceph::msgr::v2::FrameAssembler tx_frame_asm
{
128 &session_stream_handlers
, false, common::local_conf()->ms_crc_data
,
129 &session_comp_handlers
};
130 ceph::msgr::v2::FrameAssembler rx_frame_asm
{
131 &session_stream_handlers
, false, common::local_conf()->ms_crc_data
,
132 &session_comp_handlers
};
133 ceph::bufferlist rx_preamble
;
134 ceph::msgr::v2::segment_bls_t rx_segments_data
;
136 size_t get_current_msg_size() const;
137 seastar::future
<ceph::msgr::v2::Tag
> read_main_preamble();
138 seastar::future
<> read_frame_payload();
140 seastar::future
<> write_frame(F
&frame
, bool flush
=true);
143 void fault(bool backoff
, const char* func_name
, std::exception_ptr eptr
);
144 void reset_session(bool full
);
145 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
146 banner_exchange(bool is_connect
);
148 enum class next_step_t
{
151 none
, // protocol should have been aborted or failed
154 // CONNECTING (client)
155 seastar::future
<> handle_auth_reply();
156 inline seastar::future
<> client_auth() {
157 std::vector
<uint32_t> empty
;
158 return client_auth(empty
);
160 seastar::future
<> client_auth(std::vector
<uint32_t> &allowed_methods
);
162 seastar::future
<next_step_t
> process_wait();
163 seastar::future
<next_step_t
> client_connect();
164 seastar::future
<next_step_t
> client_reconnect();
165 void execute_connecting();
167 // ACCEPTING (server)
168 seastar::future
<> _auth_bad_method(int r
);
169 seastar::future
<> _handle_auth_request(bufferlist
& auth_payload
, bool more
);
170 seastar::future
<> server_auth();
172 bool validate_peer_name(const entity_name_t
& peer_name
) const;
173 seastar::future
<next_step_t
> send_wait();
174 seastar::future
<next_step_t
> reuse_connection(ProtocolV2
* existing_proto
,
176 bool reconnect
=false,
180 seastar::future
<next_step_t
> handle_existing_connection(SocketConnectionRef existing_conn
);
181 seastar::future
<next_step_t
> server_connect();
183 seastar::future
<next_step_t
> read_reconnect();
184 seastar::future
<next_step_t
> send_retry(uint64_t connect_seq
);
185 seastar::future
<next_step_t
> send_retry_global(uint64_t global_seq
);
186 seastar::future
<next_step_t
> send_reset(bool full
);
187 seastar::future
<next_step_t
> server_reconnect();
189 void execute_accepting();
191 // CONNECTING/ACCEPTING
192 seastar::future
<> finish_auth();
195 void execute_establishing(SocketConnectionRef existing_conn
, bool dispatch_reset
);
197 // ESTABLISHING/REPLACING (server)
198 seastar::future
<> send_server_ident();
200 // REPLACING (server)
201 void trigger_replacing(bool reconnect
,
203 SocketRef
&& new_socket
,
204 AuthConnectionMetaRef
&& new_auth_meta
,
205 ceph::crypto::onwire::rxtx_t new_rxtx
,
206 uint64_t new_peer_global_seq
,
208 uint64_t new_client_cookie
,
209 entity_name_t new_peer_name
,
210 uint64_t new_conn_features
,
214 uint64_t new_connect_seq
,
215 uint64_t new_msg_seq
);
218 seastar::future
<> read_message(utime_t throttle_stamp
);
219 void execute_ready(bool dispatch_connect
);
222 void execute_standby();
225 void execute_wait(bool max_backoff
);
228 void execute_server_wait();
231 } // namespace crimson::net