]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV2.h
import 15.2.5
[ceph.git] / ceph / src / msg / async / ProtocolV2.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef _MSG_ASYNC_PROTOCOL_V2_
5#define _MSG_ASYNC_PROTOCOL_V2_
6
11fdf7f2
TL
7#include "Protocol.h"
8#include "crypto_onwire.h"
9#include "frames_v2.h"
10
11class ProtocolV2 : public Protocol {
12private:
13 enum State {
14 NONE,
15 START_CONNECT,
16 BANNER_CONNECTING,
17 HELLO_CONNECTING,
18 AUTH_CONNECTING,
19 AUTH_CONNECTING_SIGN,
20 SESSION_CONNECTING,
21 SESSION_RECONNECTING,
22 START_ACCEPT,
23 BANNER_ACCEPTING,
24 HELLO_ACCEPTING,
25 AUTH_ACCEPTING,
26 AUTH_ACCEPTING_MORE,
27 AUTH_ACCEPTING_SIGN,
28 SESSION_ACCEPTING,
29 READY,
30 THROTTLE_MESSAGE,
31 THROTTLE_BYTES,
32 THROTTLE_DISPATCH_QUEUE,
33 THROTTLE_DONE,
34 READ_MESSAGE_COMPLETE,
35 STANDBY,
36 WAIT,
37 CLOSED
38 };
39
40 static const char *get_state_name(int state) {
41 const char *const statenames[] = {"NONE",
42 "START_CONNECT",
43 "BANNER_CONNECTING",
44 "HELLO_CONNECTING",
45 "AUTH_CONNECTING",
46 "AUTH_CONNECTING_SIGN",
47 "SESSION_CONNECTING",
48 "SESSION_RECONNECTING",
49 "START_ACCEPT",
50 "BANNER_ACCEPTING",
51 "HELLO_ACCEPTING",
52 "AUTH_ACCEPTING",
53 "AUTH_ACCEPTING_MORE",
54 "AUTH_ACCEPTING_SIGN",
55 "SESSION_ACCEPTING",
56 "READY",
57 "THROTTLE_MESSAGE",
58 "THROTTLE_BYTES",
59 "THROTTLE_DISPATCH_QUEUE",
60 "THROTTLE_DONE",
61 "READ_MESSAGE_COMPLETE",
62 "STANDBY",
63 "WAIT",
64 "CLOSED"};
65 return statenames[state];
66 }
67
11fdf7f2
TL
68 // TODO: move into auth_meta?
69 ceph::crypto::onwire::rxtx_t session_stream_handlers;
f6b5b4d7 70
11fdf7f2
TL
71 entity_name_t peer_name;
72 State state;
f6b5b4d7 73 uint64_t peer_supported_features; // CEPH_MSGR2_FEATURE_*
11fdf7f2
TL
74
75 uint64_t client_cookie;
76 uint64_t server_cookie;
77 uint64_t global_seq;
78 uint64_t connect_seq;
79 uint64_t peer_global_seq;
80 uint64_t message_seq;
81 bool reconnecting;
82 bool replacing;
83 bool can_write;
84 struct out_queue_entry_t {
85 bool is_prepared {false};
86 Message* m {nullptr};
87 };
88 std::map<int, std::list<out_queue_entry_t>> out_queue;
89 std::list<Message *> sent;
90 std::atomic<uint64_t> out_seq{0};
91 std::atomic<uint64_t> in_seq{0};
92 std::atomic<uint64_t> ack_left{0};
93
94 using ProtFuncPtr = void (ProtocolV2::*)();
95 Ct<ProtocolV2> *bannerExchangeCallback;
96
f6b5b4d7
TL
97 ceph::msgr::v2::FrameAssembler tx_frame_asm;
98 ceph::msgr::v2::FrameAssembler rx_frame_asm;
99
100 ceph::bufferlist rx_preamble;
101 ceph::bufferlist rx_epilogue;
102 ceph::msgr::v2::segment_bls_t rx_segments_data;
11fdf7f2
TL
103 ceph::msgr::v2::Tag next_tag;
104 utime_t backoff; // backoff time
105 utime_t recv_stamp;
106 utime_t throttle_stamp;
107
108 struct {
109 ceph::bufferlist rxbuf;
110 ceph::bufferlist txbuf;
111 bool enabled {true};
112 } pre_auth;
113
114 bool keepalive;
494da23a 115 bool write_in_progress = false;
11fdf7f2
TL
116
117 ostream &_conn_prefix(std::ostream *_dout);
118 void run_continuation(Ct<ProtocolV2> *pcontinuation);
119 void run_continuation(Ct<ProtocolV2> &continuation);
120
121 Ct<ProtocolV2> *read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
122 rx_buffer_t&& buffer);
123 template <class F>
124 Ct<ProtocolV2> *write(const std::string &desc,
125 CONTINUATION_TYPE<ProtocolV2> &next,
126 F &frame);
127 Ct<ProtocolV2> *write(const std::string &desc,
128 CONTINUATION_TYPE<ProtocolV2> &next,
129 bufferlist &buffer);
130
801d1391
TL
131 template <class F>
132 bool append_frame(F& frame);
133
11fdf7f2
TL
134 void requeue_sent();
135 uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
136 void reset_recv_state();
9f95a23c 137 void reset_security();
11fdf7f2
TL
138 void reset_throttle();
139 Ct<ProtocolV2> *_fault();
140 void discard_out_queue();
141 void reset_session();
142 void prepare_send_message(uint64_t features, Message *m);
143 out_queue_entry_t _get_next_outgoing();
144 ssize_t write_message(Message *m, bool more);
11fdf7f2
TL
145 void handle_message_ack(uint64_t seq);
146
147 CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
148 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner);
149 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, _handle_peer_banner_payload);
150
151 Ct<ProtocolV2> *_banner_exchange(Ct<ProtocolV2> &callback);
152 Ct<ProtocolV2> *_wait_for_peer_banner();
153 Ct<ProtocolV2> *_handle_peer_banner(rx_buffer_t &&buffer, int r);
154 Ct<ProtocolV2> *_handle_peer_banner_payload(rx_buffer_t &&buffer, int r);
155 Ct<ProtocolV2> *handle_hello(ceph::bufferlist &payload);
156
157 CONTINUATION_DECL(ProtocolV2, read_frame);
158 CONTINUATION_DECL(ProtocolV2, finish_auth);
159 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
160 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment);
161 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_epilogue_main);
162 CONTINUATION_DECL(ProtocolV2, throttle_message);
163 CONTINUATION_DECL(ProtocolV2, throttle_bytes);
164 CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue);
165
166 Ct<ProtocolV2> *read_frame();
167 Ct<ProtocolV2> *finish_auth();
168 Ct<ProtocolV2> *finish_client_auth();
169 Ct<ProtocolV2> *handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r);
170 Ct<ProtocolV2> *read_frame_segment();
171 Ct<ProtocolV2> *handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r);
f6b5b4d7 172 Ct<ProtocolV2> *_handle_read_frame_segment();
11fdf7f2 173 Ct<ProtocolV2> *handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r);
f6b5b4d7 174 Ct<ProtocolV2> *_handle_read_frame_epilogue_main();
11fdf7f2
TL
175 Ct<ProtocolV2> *handle_read_frame_dispatch();
176 Ct<ProtocolV2> *handle_frame_payload();
177
178 Ct<ProtocolV2> *ready();
179
180 Ct<ProtocolV2> *handle_message();
181 Ct<ProtocolV2> *throttle_message();
182 Ct<ProtocolV2> *throttle_bytes();
183 Ct<ProtocolV2> *throttle_dispatch_queue();
184 Ct<ProtocolV2> *read_message_data_prepare();
185
186 Ct<ProtocolV2> *handle_keepalive2(ceph::bufferlist &payload);
187 Ct<ProtocolV2> *handle_keepalive2_ack(ceph::bufferlist &payload);
188
189 Ct<ProtocolV2> *handle_message_ack(ceph::bufferlist &payload);
190
191public:
192 uint64_t connection_features;
193
194 ProtocolV2(AsyncConnection *connection);
195 virtual ~ProtocolV2();
196
197 virtual void connect() override;
198 virtual void accept() override;
199 virtual bool is_connected() override;
200 virtual void stop() override;
201 virtual void fault() override;
202 virtual void send_message(Message *m) override;
203 virtual void send_keepalive() override;
204
205 virtual void read_event() override;
206 virtual void write_event() override;
207 virtual bool is_queued() override;
208
209private:
210 // Client Protocol
211 CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange);
212 CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange);
213
214 Ct<ProtocolV2> *start_client_banner_exchange();
215 Ct<ProtocolV2> *post_client_banner_exchange();
216 inline Ct<ProtocolV2> *send_auth_request() {
217 std::vector<uint32_t> empty;
218 return send_auth_request(empty);
219 }
220 Ct<ProtocolV2> *send_auth_request(std::vector<uint32_t> &allowed_methods);
221 Ct<ProtocolV2> *handle_auth_bad_method(ceph::bufferlist &payload);
222 Ct<ProtocolV2> *handle_auth_reply_more(ceph::bufferlist &payload);
223 Ct<ProtocolV2> *handle_auth_done(ceph::bufferlist &payload);
224 Ct<ProtocolV2> *handle_auth_signature(ceph::bufferlist &payload);
225 Ct<ProtocolV2> *send_client_ident();
226 Ct<ProtocolV2> *send_reconnect();
227 Ct<ProtocolV2> *handle_ident_missing_features(ceph::bufferlist &payload);
228 Ct<ProtocolV2> *handle_session_reset(ceph::bufferlist &payload);
229 Ct<ProtocolV2> *handle_session_retry(ceph::bufferlist &payload);
230 Ct<ProtocolV2> *handle_session_retry_global(ceph::bufferlist &payload);
231 Ct<ProtocolV2> *handle_wait(ceph::bufferlist &payload);
232 Ct<ProtocolV2> *handle_reconnect_ok(ceph::bufferlist &payload);
233 Ct<ProtocolV2> *handle_server_ident(ceph::bufferlist &payload);
234
235 // Server Protocol
236 CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange);
237 CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange);
238 CONTINUATION_DECL(ProtocolV2, server_ready);
239
240 Ct<ProtocolV2> *start_server_banner_exchange();
241 Ct<ProtocolV2> *post_server_banner_exchange();
242 Ct<ProtocolV2> *handle_auth_request(ceph::bufferlist &payload);
243 Ct<ProtocolV2> *handle_auth_request_more(ceph::bufferlist &payload);
244 Ct<ProtocolV2> *_handle_auth_request(bufferlist& auth_payload, bool more);
245 Ct<ProtocolV2> *_auth_bad_method(int r);
246 Ct<ProtocolV2> *handle_client_ident(ceph::bufferlist &payload);
247 Ct<ProtocolV2> *handle_ident_missing_features_write(int r);
248 Ct<ProtocolV2> *handle_reconnect(ceph::bufferlist &payload);
9f95a23c
TL
249 Ct<ProtocolV2> *handle_existing_connection(const AsyncConnectionRef& existing);
250 Ct<ProtocolV2> *reuse_connection(const AsyncConnectionRef& existing,
11fdf7f2
TL
251 ProtocolV2 *exproto);
252 Ct<ProtocolV2> *send_server_ident();
253 Ct<ProtocolV2> *send_reconnect_ok();
254 Ct<ProtocolV2> *server_ready();
255
11fdf7f2
TL
256 size_t get_current_msg_size() const;
257};
258
259#endif /* _MSG_ASYNC_PROTOCOL_V2_ */