git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / ProtocolV2.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/shared_future.hh>
7 #include <seastar/core/sleep.hh>
8
9 #include "io_handler.h"
10
11 namespace crimson::net {
12
13 class ProtocolV2 final : public HandshakeListener {
14 using AuthConnectionMetaRef = seastar::lw_shared_ptr<AuthConnectionMeta>;
15
16 public:
17 ProtocolV2(SocketConnection &,
18 IOHandler &);
19
20 ~ProtocolV2() final;
21
22 ProtocolV2(const ProtocolV2 &) = delete;
23 ProtocolV2(ProtocolV2 &&) = delete;
24 ProtocolV2 &operator=(const ProtocolV2 &) = delete;
25 ProtocolV2 &operator=(ProtocolV2 &&) = delete;
26
27 /**
28 * as HandshakeListener
29 */
30 private:
31 seastar::future<> notify_out(
32 crosscore_t::seq_t cc_seq) final;
33
34 seastar::future<> notify_out_fault(
35 crosscore_t::seq_t cc_seq,
36 const char *where,
37 std::exception_ptr,
38 io_handler_state) final;
39
40 seastar::future<> notify_mark_down(
41 crosscore_t::seq_t cc_seq) final;
42
43 /*
44 * as ProtocolV2 to be called by SocketConnection
45 */
46 public:
47 void start_connect(const entity_addr_t& peer_addr,
48 const entity_name_t& peer_name);
49
50 void start_accept(SocketFRef&& socket,
51 const entity_addr_t& peer_addr);
52
53 seastar::future<> close_clean_yielded();
54
55 #ifdef UNIT_TESTS_BUILT
56 bool is_ready() const {
57 return state == state_t::READY;
58 }
59
60 bool is_standby() const {
61 return state == state_t::STANDBY;
62 }
63
64 bool is_closed_clean() const {
65 return closed_clean;
66 }
67
68 bool is_closed() const {
69 return state == state_t::CLOSING;
70 }
71
72 #endif
73 private:
74 using io_state_t = IOHandler::io_state_t;
75
76 seastar::future<> wait_switch_io_shard() {
77 if (pr_switch_io_shard.has_value()) {
78 return pr_switch_io_shard->get_shared_future();
79 } else {
80 return seastar::now();
81 }
82 }
83
84 seastar::future<> wait_exit_io() {
85 if (pr_exit_io.has_value()) {
86 return pr_exit_io->get_shared_future();
87 } else {
88 assert(!need_exit_io);
89 return seastar::now();
90 }
91 }
92
93 enum class state_t {
94 NONE = 0,
95 ACCEPTING,
96 SERVER_WAIT,
97 ESTABLISHING,
98 CONNECTING,
99 READY,
100 STANDBY,
101 WAIT,
102 REPLACING,
103 CLOSING
104 };
105
106 static const char *get_state_name(state_t state) {
107 const char *const statenames[] = {"NONE",
108 "ACCEPTING",
109 "SERVER_WAIT",
110 "ESTABLISHING",
111 "CONNECTING",
112 "READY",
113 "STANDBY",
114 "WAIT",
115 "REPLACING",
116 "CLOSING"};
117 return statenames[static_cast<int>(state)];
118 }
119
120 void trigger_state_phase1(state_t new_state);
121
122 void trigger_state_phase2(state_t new_state, io_state_t new_io_state);
123
124 void trigger_state(state_t new_state, io_state_t new_io_state) {
125 ceph_assert_always(!pr_switch_io_shard.has_value());
126 trigger_state_phase1(new_state);
127 trigger_state_phase2(new_state, new_io_state);
128 }
129
130 template <typename Func, typename T>
131 void gated_execute(const char *what, T &who, Func &&func) {
132 gate.dispatch_in_background(what, who, [this, &who, &func] {
133 if (!execution_done.available()) {
134 // discard the unready future
135 gate.dispatch_in_background(
136 "gated_execute_abandon",
137 who,
138 [fut=std::move(execution_done)]() mutable {
139 return std::move(fut);
140 }
141 );
142 }
143 seastar::promise<> pr;
144 execution_done = pr.get_future();
145 return seastar::futurize_invoke(std::forward<Func>(func)
146 ).finally([pr=std::move(pr)]() mutable {
147 pr.set_value();
148 });
149 });
150 }
151
152 void fault(state_t expected_state,
153 const char *where,
154 std::exception_ptr eptr);
155
156 void reset_session(bool is_full);
157 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
158 banner_exchange(bool is_connect);
159
160 enum class next_step_t {
161 ready,
162 wait,
163 none, // protocol should have been aborted or failed
164 };
165
166 // CONNECTING (client)
167 seastar::future<> handle_auth_reply();
168 inline seastar::future<> client_auth() {
169 std::vector<uint32_t> empty;
170 return client_auth(empty);
171 }
172 seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
173
174 seastar::future<next_step_t> process_wait();
175 seastar::future<next_step_t> client_connect();
176 seastar::future<next_step_t> client_reconnect();
177 void execute_connecting();
178
179 // ACCEPTING (server)
180 seastar::future<> _auth_bad_method(int r);
181 seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
182 seastar::future<> server_auth();
183
184 bool validate_peer_name(const entity_name_t& peer_name) const;
185 seastar::future<next_step_t> send_wait();
186 seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
187 bool do_reset=false,
188 bool reconnect=false,
189 uint64_t conn_seq=0,
190 uint64_t msg_seq=0);
191
192 seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
193 seastar::future<next_step_t> server_connect();
194
195 seastar::future<next_step_t> read_reconnect();
196 seastar::future<next_step_t> send_retry(uint64_t connect_seq);
197 seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
198 seastar::future<next_step_t> send_reset(bool full);
199 seastar::future<next_step_t> server_reconnect();
200
201 void execute_accepting();
202
203 // CONNECTING/ACCEPTING
204 seastar::future<> finish_auth();
205
206 // ESTABLISHING
207 void execute_establishing(SocketConnectionRef existing_conn);
208
209 // ESTABLISHING/REPLACING (server)
210 seastar::future<> send_server_ident();
211
212 // REPLACING (server)
213 void trigger_replacing(bool reconnect,
214 bool do_reset,
215 FrameAssemblerV2::mover_t &&mover,
216 AuthConnectionMetaRef&& new_auth_meta,
217 uint64_t new_peer_global_seq,
218 // !reconnect
219 uint64_t new_client_cookie,
220 entity_name_t new_peer_name,
221 uint64_t new_conn_features,
222 uint64_t new_peer_supported_features,
223 // reconnect
224 uint64_t new_connect_seq,
225 uint64_t new_msg_seq);
226
227 // READY
228 void execute_ready();
229
230 // STANDBY
231 void execute_standby();
232
233 // WAIT
234 void execute_wait(bool max_backoff);
235
236 // SERVER_WAIT
237 void execute_server_wait();
238
239 // CLOSING
240 // reentrant
241 void do_close(bool is_dispatch_reset,
242 std::optional<std::function<void()>> f_accept_new=std::nullopt);
243
244 private:
245 SocketConnection &conn;
246
247 SocketMessenger &messenger;
248
249 IOHandler &io_handler;
250
251 // asynchronously populated from io_handler
252 io_handler_state io_states;
253
254 crosscore_t crosscore;
255
256 bool has_socket = false;
257
258 // the socket exists and it is not shutdown
259 bool is_socket_valid = false;
260
261 FrameAssemblerV2Ref frame_assembler;
262
263 bool need_notify_out = false;
264
265 std::optional<seastar::shared_promise<>> pr_switch_io_shard;
266
267 bool need_exit_io = false;
268
269 std::optional<seastar::shared_promise<>> pr_exit_io;
270
271 AuthConnectionMetaRef auth_meta;
272
273 crimson::common::Gated gate;
274
275 seastar::shared_promise<> pr_closed_clean;
276
277 #ifdef UNIT_TESTS_BUILT
278 bool closed_clean = false;
279
280 #endif
281 state_t state = state_t::NONE;
282
283 uint64_t peer_supported_features = 0;
284
285 uint64_t client_cookie = 0;
286 uint64_t server_cookie = 0;
287 uint64_t global_seq = 0;
288 uint64_t peer_global_seq = 0;
289 uint64_t connect_seq = 0;
290
291 seastar::future<> execution_done = seastar::now();
292
293 class Timer {
294 double last_dur_ = 0.0;
295 const SocketConnection& conn;
296 std::optional<seastar::abort_source> as;
297 public:
298 Timer(SocketConnection& conn) : conn(conn) {}
299 double last_dur() const { return last_dur_; }
300 seastar::future<> backoff(double seconds);
301 void cancel() {
302 last_dur_ = 0.0;
303 if (as) {
304 as->request_abort();
305 as = std::nullopt;
306 }
307 }
308 };
309 Timer protocol_timer;
310 };
311
312 struct create_handlers_ret {
313 std::unique_ptr<ConnectionHandler> io_handler;
314 std::unique_ptr<ProtocolV2> protocol;
315 };
316 inline create_handlers_ret create_handlers(ChainedDispatchers &dispatchers, SocketConnection &conn) {
317 std::unique_ptr<ConnectionHandler> io_handler = std::make_unique<IOHandler>(dispatchers, conn);
318 IOHandler &io_handler_concrete = static_cast<IOHandler&>(*io_handler);
319 auto protocol = std::make_unique<ProtocolV2>(conn, io_handler_concrete);
320 io_handler_concrete.set_handshake_listener(*protocol);
321 return {std::move(io_handler), std::move(protocol)};
322 }
323
324 } // namespace crimson::net
325
#if FMT_VERSION >= 90000
// With fmt 9 or newer, format ProtocolV2 through fmt::ostream_formatter.
template <>
struct fmt::formatter<crimson::net::ProtocolV2>
  : fmt::ostream_formatter {};
#endif