1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/shared_future.hh>
7 #include <seastar/core/sleep.hh>
9 #include "io_handler.h"
11 namespace crimson::net
{
13 class ProtocolV2 final
: public HandshakeListener
{
// Alias: a seastar::lw_shared_ptr that refers to an AuthConnectionMeta.
14 using AuthConnectionMetaRef
= seastar::lw_shared_ptr
<AuthConnectionMeta
>;
17 ProtocolV2(SocketConnection
&,
// ProtocolV2 objects can be neither copied nor moved: the copy/move
// constructors and the copy/move assignment operators are explicitly deleted.
22 ProtocolV2(const ProtocolV2
&) = delete;
23 ProtocolV2(ProtocolV2
&&) = delete;
24 ProtocolV2
&operator=(const ProtocolV2
&) = delete;
25 ProtocolV2
&operator=(ProtocolV2
&&) = delete;
28 * as HandshakeListener
// Overrides of the HandshakeListener base-class interface; each one is
// declared `final`.
//
// notify_out(): takes no arguments and returns nothing.
31 void notify_out() final
;
// notify_out_fault(): receives a C string and a std::exception_ptr. The
// parameters are unnamed in this declaration -- presumably a description of
// the fault site and the triggering exception; confirm against the definition.
33 void notify_out_fault(const char *, std::exception_ptr
) final
;
// notify_mark_down(): takes no arguments and returns nothing.
35 void notify_mark_down() final
;
38 * as ProtocolV2 to be called by SocketConnection
// start_connect(): takes the peer's address and entity name, both by const
// reference. Returns void.
41 void start_connect(const entity_addr_t
& peer_addr
,
42 const entity_name_t
& peer_name
);
// start_accept(): takes a SocketRef by rvalue reference together with the
// peer's address. Presumably takes over the socket -- confirm against the
// definition. Returns void.
44 void start_accept(SocketRef
&& socket
,
45 const entity_addr_t
& peer_addr
);
// close_clean_yielded(): returns a seastar::future<> with no value.
// NOTE(review): what exactly the future waits for is not visible in this
// header; see the definition.
47 seastar::future
<> close_clean_yielded();
49 #ifdef UNIT_TESTS_BUILT
50 bool is_closed_clean() const {
54 bool is_closed() const {
60 seastar::future
<> wait_exit_io() {
61 if (exit_io
.has_value()) {
62 return exit_io
->get_shared_future();
64 return seastar::now();
81 static const char *get_state_name(state_t state
) {
82 const char *const statenames
[] = {"NONE",
92 return statenames
[static_cast<int>(state
)];
// trigger_state(): takes the protocol state, an IOHandler::io_state_t and a
// `reentrant` flag. Returns void.
// NOTE(review): the transition rules and the exact meaning of `reentrant`
// are defined in the implementation, not in this header.
95 void trigger_state(state_t state
, IOHandler::io_state_t io_state
, bool reentrant
);
97 template <typename Func
, typename T
>
98 void gated_execute(const char *what
, T
&who
, Func
&&func
) {
99 gate
.dispatch_in_background(what
, who
, [this, &who
, &func
] {
100 if (!execution_done
.available()) {
101 // discard the unready future
102 gate
.dispatch_in_background(
103 "gated_execute_abandon",
105 [fut
=std::move(execution_done
)]() mutable {
106 return std::move(fut
);
110 seastar::promise
<> pr
;
111 execution_done
= pr
.get_future();
112 return seastar::futurize_invoke(std::forward
<Func
>(func
)
113 ).finally([pr
=std::move(pr
)]() mutable {
119 void fault(state_t expected_state
,
121 std::exception_ptr eptr
);
// reset_session(): `is_full` presumably selects a full versus partial
// session reset -- confirm against the definition. Returns void.
123 void reset_session(bool is_full
);
// banner_exchange(): returns a future resolving to an
// (entity_type_t, entity_addr_t) tuple. `is_connect` presumably tells the
// connecting side from the accepting side -- confirm against the definition.
124 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
125 banner_exchange(bool is_connect
);
127 enum class next_step_t
{
130 none
, // protocol should have been aborted or failed
133 // CONNECTING (client)
// handle_auth_reply(): takes no arguments and returns a seastar::future<>
// with no value. Listed under the "CONNECTING (client)" section.
134 seastar::future
<> handle_auth_reply();
135 inline seastar::future
<> client_auth() {
136 std::vector
<uint32_t> empty
;
137 return client_auth(empty
);
// Overload taking the list of allowed auth methods by non-const reference; the
// zero-argument client_auth() forwards an empty vector to it.
139 seastar::future
<> client_auth(std::vector
<uint32_t> &allowed_methods
);
// Each of the three steps below returns a future that resolves to a
// next_step_t.
141 seastar::future
<next_step_t
> process_wait();
142 seastar::future
<next_step_t
> client_connect();
143 seastar::future
<next_step_t
> client_reconnect();
// Returns void. Presumably the entry point of the CONNECTING state --
// inferred from the name and the section comment; confirm in the definition.
144 void execute_connecting();
146 // ACCEPTING (server)
// _auth_bad_method(): takes an int `r` (presumably an error/result code --
// confirm) and returns a seastar::future<> with no value.
147 seastar::future
<> _auth_bad_method(int r
);
// _handle_auth_request(): takes the auth payload by non-const reference plus
// a `more` flag; returns a seastar::future<> with no value.
148 seastar::future
<> _handle_auth_request(bufferlist
& auth_payload
, bool more
);
// server_auth(): takes no arguments; returns a seastar::future<> with no value.
149 seastar::future
<> server_auth();
// validate_peer_name(): const member returning bool for the given peer name.
// NOTE(review): the validation criteria are in the definition.
151 bool validate_peer_name(const entity_name_t
& peer_name
) const;
// send_wait(): returns a future resolving to a next_step_t.
152 seastar::future
<next_step_t
> send_wait();
153 seastar::future
<next_step_t
> reuse_connection(ProtocolV2
* existing_proto
,
155 bool reconnect
=false,
// Every step below returns a future that resolves to a next_step_t.
//
// handle_existing_connection(): takes the existing connection by value as a
// SocketConnectionRef.
159 seastar::future
<next_step_t
> handle_existing_connection(SocketConnectionRef existing_conn
);
160 seastar::future
<next_step_t
> server_connect();
162 seastar::future
<next_step_t
> read_reconnect();
// send_retry() / send_retry_global(): each takes a 64-bit sequence number
// (connect_seq and global_seq respectively).
163 seastar::future
<next_step_t
> send_retry(uint64_t connect_seq
);
164 seastar::future
<next_step_t
> send_retry_global(uint64_t global_seq
);
// send_reset(): `full` presumably requests a full reset -- confirm against
// the definition.
165 seastar::future
<next_step_t
> send_reset(bool full
);
166 seastar::future
<next_step_t
> server_reconnect();
// Returns void. Presumably the entry point of the ACCEPTING state --
// inferred from the name; confirm in the definition.
168 void execute_accepting();
170 // CONNECTING/ACCEPTING
// finish_auth(): takes no arguments; returns a seastar::future<> with no
// value. Listed under the "CONNECTING/ACCEPTING" section.
171 seastar::future
<> finish_auth();
// execute_establishing(): takes a SocketConnectionRef by value; returns void.
// NOTE(review): whether `existing_conn` may be null is not visible here.
174 void execute_establishing(SocketConnectionRef existing_conn
);
176 // ESTABLISHING/REPLACING (server)
// send_server_ident(): takes no arguments; returns a seastar::future<> with
// no value.
177 seastar::future
<> send_server_ident();
179 // REPLACING (server)
180 void trigger_replacing(bool reconnect
,
182 FrameAssemblerV2::mover_t
&&mover
,
183 AuthConnectionMetaRef
&& new_auth_meta
,
184 uint64_t new_peer_global_seq
,
186 uint64_t new_client_cookie
,
187 entity_name_t new_peer_name
,
188 uint64_t new_conn_features
,
189 uint64_t new_peer_supported_features
,
191 uint64_t new_connect_seq
,
192 uint64_t new_msg_seq
);
// Per-state entry points; all return void. The state each one serves is
// inferred from its name -- confirm in the definitions.
195 void execute_ready();
198 void execute_standby();
// `max_backoff` is a flag; its exact effect is defined in the implementation.
201 void execute_wait(bool max_backoff
);
204 void execute_server_wait();
// do_close(): takes an `is_dispatch_reset` flag and an optional callback
// `f_accept_new`, which defaults to std::nullopt (no callback supplied).
// NOTE(review): when the callback is invoked is not visible in this header.
208 void do_close(bool is_dispatch_reset
,
209 std::optional
<std::function
<void()>> f_accept_new
=std::nullopt
);
// Data members. The first three are references, so they must be bound at
// construction and cannot be reseated.
212 SocketConnection
&conn
;
214 SocketMessenger
&messenger
;
216 IOHandler
&io_handler
;
// Starts out false.
218 bool has_socket
= false;
220 // the socket exists and it is not shutdown
221 bool is_socket_valid
= false;
223 FrameAssemblerV2Ref frame_assembler
;
// Optional shared promise; wait_exit_io() returns its shared future when the
// optional holds a value and a ready future otherwise.
225 std::optional
<seastar::shared_promise
<>> exit_io
;
227 AuthConnectionMetaRef auth_meta
;
// Gate used by gated_execute() to dispatch work in the background.
229 crimson::common::Gated gate
;
233 // become valid only after closed == true
234 seastar::shared_future
<> closed_clean_fut
;
// Only present when UNIT_TESTS_BUILT is defined.
236 #ifdef UNIT_TESTS_BUILT
237 bool closed_clean
= false;
// The state starts as state_t::NONE.
240 state_t state
= state_t::NONE
;
// Features, cookies and sequence numbers all start at 0.
242 uint64_t peer_supported_features
= 0;
244 uint64_t client_cookie
= 0;
245 uint64_t server_cookie
= 0;
246 uint64_t global_seq
= 0;
247 uint64_t peer_global_seq
= 0;
248 uint64_t connect_seq
= 0;
// Starts as an already-resolved future; gated_execute() replaces it with the
// future of a fresh promise for each run.
250 seastar::future
<> execution_done
= seastar::now();
253 double last_dur_
= 0.0;
254 const SocketConnection
& conn
;
255 std::optional
<seastar::abort_source
> as
;
257 Timer(SocketConnection
& conn
) : conn(conn
) {}
258 double last_dur() const { return last_dur_
; }
259 seastar::future
<> backoff(double seconds
);
// Timer instance held as a member.
// NOTE(review): how it is constructed is not visible in this view.
268 Timer protocol_timer
;
271 struct create_handlers_ret
{
272 std::unique_ptr
<ConnectionHandler
> io_handler
;
273 std::unique_ptr
<ProtocolV2
> protocol
;
275 inline create_handlers_ret
create_handlers(ChainedDispatchers
&dispatchers
, SocketConnection
&conn
) {
276 std::unique_ptr
<ConnectionHandler
> io_handler
= std::make_unique
<IOHandler
>(dispatchers
, conn
);
277 IOHandler
&io_handler_concrete
= static_cast<IOHandler
&>(*io_handler
);
278 auto protocol
= std::make_unique
<ProtocolV2
>(conn
, io_handler_concrete
);
279 io_handler_concrete
.set_handshake_listener(*protocol
);
280 return {std::move(io_handler
), std::move(protocol
)};
283 } // namespace crimson::net
285 #if FMT_VERSION >= 90000
// fmt::formatter specialization for crimson::net::ProtocolV2 that inherits
// everything from fmt::ostream_formatter. The surrounding preprocessor guard
// enables it only when FMT_VERSION >= 90000.
// NOTE(review): this relies on a stream insertion operator for ProtocolV2
// declared elsewhere -- not visible in this view.
286 template <> struct fmt::formatter
<crimson::net::ProtocolV2
> : fmt::ostream_formatter
{};