]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.h
be9a2281668757ff6149399242ba628b393328a1
[ceph.git] / ceph / src / crimson / net / ProtocolV2.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/sleep.hh>
7
8 #include "Protocol.h"
9 #include "msg/async/frames_v2.h"
10 #include "msg/async/crypto_onwire.h"
11
12 namespace crimson::net {
13
14 class ProtocolV2 final : public Protocol {
15 public:
16 ProtocolV2(ChainedDispatchers& dispatchers,
17 SocketConnection& conn,
18 SocketMessenger& messenger);
19 ~ProtocolV2() override;
20 void print(std::ostream&) const final;
21 private:
22 void on_closed() override;
23 bool is_connected() const override;
24
25 void start_connect(const entity_addr_t& peer_addr,
26 const entity_name_t& peer_name) override;
27
28 void start_accept(SocketRef&& socket,
29 const entity_addr_t& peer_addr) override;
30
31 void trigger_close() override;
32
33 ceph::bufferlist do_sweep_messages(
34 const std::deque<MessageRef>& msgs,
35 size_t num_msgs,
36 bool require_keepalive,
37 std::optional<utime_t> keepalive_ack,
38 bool require_ack) override;
39
40 void notify_write() override;
41
42 private:
43 SocketMessenger &messenger;
44
45 enum class state_t {
46 NONE = 0,
47 ACCEPTING,
48 SERVER_WAIT,
49 ESTABLISHING,
50 CONNECTING,
51 READY,
52 STANDBY,
53 WAIT,
54 REPLACING,
55 CLOSING
56 };
57 state_t state = state_t::NONE;
58
59 static const char *get_state_name(state_t state) {
60 const char *const statenames[] = {"NONE",
61 "ACCEPTING",
62 "SERVER_WAIT",
63 "ESTABLISHING",
64 "CONNECTING",
65 "READY",
66 "STANDBY",
67 "WAIT",
68 "REPLACING",
69 "CLOSING"};
70 return statenames[static_cast<int>(state)];
71 }
72
73 void trigger_state(state_t state, write_state_t write_state, bool reentrant);
74
75 uint64_t connection_features = 0;
76 uint64_t peer_required_features = 0;
77
78 uint64_t client_cookie = 0;
79 uint64_t server_cookie = 0;
80 uint64_t global_seq = 0;
81 uint64_t peer_global_seq = 0;
82 uint64_t connect_seq = 0;
83
84 seastar::shared_future<> execution_done = seastar::now();
85
86 template <typename Func>
87 void gated_execute(const char* what, Func&& func) {
88 gate.dispatch_in_background(what, *this, [this, &func] {
89 execution_done = seastar::futurize_invoke(std::forward<Func>(func));
90 return execution_done.get_future();
91 });
92 }
93
94 class Timer {
95 double last_dur_ = 0.0;
96 const SocketConnection& conn;
97 std::optional<seastar::abort_source> as;
98 public:
99 Timer(SocketConnection& conn) : conn(conn) {}
100 double last_dur() const { return last_dur_; }
101 seastar::future<> backoff(double seconds);
102 void cancel() {
103 last_dur_ = 0.0;
104 if (as) {
105 as->request_abort();
106 as = std::nullopt;
107 }
108 }
109 };
110 Timer protocol_timer;
111
112 // TODO: Frame related implementations, probably to a separate class.
113 private:
114 bool record_io = false;
115 ceph::bufferlist rxbuf;
116 ceph::bufferlist txbuf;
117
118 void enable_recording();
119 seastar::future<Socket::tmp_buf> read_exactly(size_t bytes);
120 seastar::future<bufferlist> read(size_t bytes);
121 seastar::future<> write(bufferlist&& buf);
122 seastar::future<> write_flush(bufferlist&& buf);
123
124 ceph::crypto::onwire::rxtx_t session_stream_handlers;
125 ceph::msgr::v2::FrameAssembler tx_frame_asm{&session_stream_handlers, false};
126 ceph::msgr::v2::FrameAssembler rx_frame_asm{&session_stream_handlers, false};
127 ceph::bufferlist rx_preamble;
128 ceph::msgr::v2::segment_bls_t rx_segments_data;
129
130 size_t get_current_msg_size() const;
131 seastar::future<ceph::msgr::v2::Tag> read_main_preamble();
132 seastar::future<> read_frame_payload();
133 template <class F>
134 seastar::future<> write_frame(F &frame, bool flush=true);
135
136 private:
137 void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
138 void reset_session(bool full);
139 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
140 banner_exchange(bool is_connect);
141
142 enum class next_step_t {
143 ready,
144 wait,
145 none, // protocol should have been aborted or failed
146 };
147
148 // CONNECTING (client)
149 seastar::future<> handle_auth_reply();
150 inline seastar::future<> client_auth() {
151 std::vector<uint32_t> empty;
152 return client_auth(empty);
153 }
154 seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
155
156 seastar::future<next_step_t> process_wait();
157 seastar::future<next_step_t> client_connect();
158 seastar::future<next_step_t> client_reconnect();
159 void execute_connecting();
160
161 // ACCEPTING (server)
162 seastar::future<> _auth_bad_method(int r);
163 seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
164 seastar::future<> server_auth();
165
166 bool validate_peer_name(const entity_name_t& peer_name) const;
167 seastar::future<next_step_t> send_wait();
168 seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
169 bool do_reset=false,
170 bool reconnect=false,
171 uint64_t conn_seq=0,
172 uint64_t msg_seq=0);
173
174 seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
175 seastar::future<next_step_t> server_connect();
176
177 seastar::future<next_step_t> read_reconnect();
178 seastar::future<next_step_t> send_retry(uint64_t connect_seq);
179 seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
180 seastar::future<next_step_t> send_reset(bool full);
181 seastar::future<next_step_t> server_reconnect();
182
183 void execute_accepting();
184
185 // CONNECTING/ACCEPTING
186 seastar::future<> finish_auth();
187
188 // ESTABLISHING
189 void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
190
191 // ESTABLISHING/REPLACING (server)
192 seastar::future<> send_server_ident();
193
194 // REPLACING (server)
195 void trigger_replacing(bool reconnect,
196 bool do_reset,
197 SocketRef&& new_socket,
198 AuthConnectionMetaRef&& new_auth_meta,
199 ceph::crypto::onwire::rxtx_t new_rxtx,
200 uint64_t new_peer_global_seq,
201 // !reconnect
202 uint64_t new_client_cookie,
203 entity_name_t new_peer_name,
204 uint64_t new_conn_features,
205 bool tx_is_rev1,
206 bool rx_is_rev1,
207 // reconnect
208 uint64_t new_connect_seq,
209 uint64_t new_msg_seq);
210
211 // READY
212 seastar::future<> read_message(utime_t throttle_stamp);
213 void execute_ready(bool dispatch_connect);
214
215 // STANDBY
216 void execute_standby();
217
218 // WAIT
219 void execute_wait(bool max_backoff);
220
221 // SERVER_WAIT
222 void execute_server_wait();
223 };
224
225 } // namespace crimson::net