]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/ProtocolV2.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / ProtocolV2.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/sleep.hh>
7
8 #include "Protocol.h"
9 #include "msg/async/frames_v2.h"
10 #include "msg/async/crypto_onwire.h"
11 #include "msg/async/compression_onwire.h"
12
13 namespace crimson::net {
14
15 class ProtocolV2 final : public Protocol {
16 public:
17 ProtocolV2(ChainedDispatchers& dispatchers,
18 SocketConnection& conn,
19 SocketMessenger& messenger);
20 ~ProtocolV2() override;
21 void print(std::ostream&) const final;
22 private:
23 void on_closed() override;
24 bool is_connected() const override;
25
26 void start_connect(const entity_addr_t& peer_addr,
27 const entity_name_t& peer_name) override;
28
29 void start_accept(SocketRef&& socket,
30 const entity_addr_t& peer_addr) override;
31
32 void trigger_close() override;
33
34 ceph::bufferlist do_sweep_messages(
35 const std::deque<MessageURef>& msgs,
36 size_t num_msgs,
37 bool require_keepalive,
38 std::optional<utime_t> keepalive_ack,
39 bool require_ack) override;
40
41 void notify_write() override;
42
43 private:
44 SocketMessenger &messenger;
45
46 enum class state_t {
47 NONE = 0,
48 ACCEPTING,
49 SERVER_WAIT,
50 ESTABLISHING,
51 CONNECTING,
52 READY,
53 STANDBY,
54 WAIT,
55 REPLACING,
56 CLOSING
57 };
58 state_t state = state_t::NONE;
59
60 static const char *get_state_name(state_t state) {
61 const char *const statenames[] = {"NONE",
62 "ACCEPTING",
63 "SERVER_WAIT",
64 "ESTABLISHING",
65 "CONNECTING",
66 "READY",
67 "STANDBY",
68 "WAIT",
69 "REPLACING",
70 "CLOSING"};
71 return statenames[static_cast<int>(state)];
72 }
73
74 void trigger_state(state_t state, write_state_t write_state, bool reentrant);
75
76 uint64_t connection_features = 0;
77 uint64_t peer_required_features = 0;
78
79 uint64_t client_cookie = 0;
80 uint64_t server_cookie = 0;
81 uint64_t global_seq = 0;
82 uint64_t peer_global_seq = 0;
83 uint64_t connect_seq = 0;
84
85 seastar::shared_future<> execution_done = seastar::now();
86
87 template <typename Func>
88 void gated_execute(const char* what, Func&& func) {
89 gate.dispatch_in_background(what, *this, [this, &func] {
90 execution_done = seastar::futurize_invoke(std::forward<Func>(func));
91 return execution_done.get_future();
92 });
93 }
94
95 class Timer {
96 double last_dur_ = 0.0;
97 const SocketConnection& conn;
98 std::optional<seastar::abort_source> as;
99 public:
100 Timer(SocketConnection& conn) : conn(conn) {}
101 double last_dur() const { return last_dur_; }
102 seastar::future<> backoff(double seconds);
103 void cancel() {
104 last_dur_ = 0.0;
105 if (as) {
106 as->request_abort();
107 as = std::nullopt;
108 }
109 }
110 };
111 Timer protocol_timer;
112
113 // TODO: Frame related implementations, probably to a separate class.
114 private:
115 bool record_io = false;
116 ceph::bufferlist rxbuf;
117 ceph::bufferlist txbuf;
118
119 void enable_recording();
120 seastar::future<Socket::tmp_buf> read_exactly(size_t bytes);
121 seastar::future<bufferlist> read(size_t bytes);
122 seastar::future<> write(bufferlist&& buf);
123 seastar::future<> write_flush(bufferlist&& buf);
124
125 ceph::crypto::onwire::rxtx_t session_stream_handlers;
126 ceph::compression::onwire::rxtx_t session_comp_handlers;
127 ceph::msgr::v2::FrameAssembler tx_frame_asm{
128 &session_stream_handlers, false, common::local_conf()->ms_crc_data,
129 &session_comp_handlers};
130 ceph::msgr::v2::FrameAssembler rx_frame_asm{
131 &session_stream_handlers, false, common::local_conf()->ms_crc_data,
132 &session_comp_handlers};
133 ceph::bufferlist rx_preamble;
134 ceph::msgr::v2::segment_bls_t rx_segments_data;
135
136 size_t get_current_msg_size() const;
137 seastar::future<ceph::msgr::v2::Tag> read_main_preamble();
138 seastar::future<> read_frame_payload();
139 template <class F>
140 seastar::future<> write_frame(F &frame, bool flush=true);
141
142 private:
143 void fault(bool backoff, const char* func_name, std::exception_ptr eptr);
144 void reset_session(bool full);
145 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
146 banner_exchange(bool is_connect);
147
148 enum class next_step_t {
149 ready,
150 wait,
151 none, // protocol should have been aborted or failed
152 };
153
154 // CONNECTING (client)
155 seastar::future<> handle_auth_reply();
156 inline seastar::future<> client_auth() {
157 std::vector<uint32_t> empty;
158 return client_auth(empty);
159 }
160 seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
161
162 seastar::future<next_step_t> process_wait();
163 seastar::future<next_step_t> client_connect();
164 seastar::future<next_step_t> client_reconnect();
165 void execute_connecting();
166
167 // ACCEPTING (server)
168 seastar::future<> _auth_bad_method(int r);
169 seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
170 seastar::future<> server_auth();
171
172 bool validate_peer_name(const entity_name_t& peer_name) const;
173 seastar::future<next_step_t> send_wait();
174 seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
175 bool do_reset=false,
176 bool reconnect=false,
177 uint64_t conn_seq=0,
178 uint64_t msg_seq=0);
179
180 seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing_conn);
181 seastar::future<next_step_t> server_connect();
182
183 seastar::future<next_step_t> read_reconnect();
184 seastar::future<next_step_t> send_retry(uint64_t connect_seq);
185 seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
186 seastar::future<next_step_t> send_reset(bool full);
187 seastar::future<next_step_t> server_reconnect();
188
189 void execute_accepting();
190
191 // CONNECTING/ACCEPTING
192 seastar::future<> finish_auth();
193
194 // ESTABLISHING
195 void execute_establishing(SocketConnectionRef existing_conn, bool dispatch_reset);
196
197 // ESTABLISHING/REPLACING (server)
198 seastar::future<> send_server_ident();
199
200 // REPLACING (server)
201 void trigger_replacing(bool reconnect,
202 bool do_reset,
203 SocketRef&& new_socket,
204 AuthConnectionMetaRef&& new_auth_meta,
205 ceph::crypto::onwire::rxtx_t new_rxtx,
206 uint64_t new_peer_global_seq,
207 // !reconnect
208 uint64_t new_client_cookie,
209 entity_name_t new_peer_name,
210 uint64_t new_conn_features,
211 bool tx_is_rev1,
212 bool rx_is_rev1,
213 // reconnect
214 uint64_t new_connect_seq,
215 uint64_t new_msg_seq);
216
217 // READY
218 seastar::future<> read_message(utime_t throttle_stamp);
219 void execute_ready(bool dispatch_connect);
220
221 // STANDBY
222 void execute_standby();
223
224 // WAIT
225 void execute_wait(bool max_backoff);
226
227 // SERVER_WAIT
228 void execute_server_wait();
229 };
230
231 } // namespace crimson::net