]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/ProtocolV1.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / ProtocolV1.h
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef _MSG_ASYNC_PROTOCOL_V1_
5#define _MSG_ASYNC_PROTOCOL_V1_
6
7#include "Protocol.h"
8
9class ProtocolV1;
10using CtPtr = Ct<ProtocolV1>*;
11
12class ProtocolV1 : public Protocol {
13/*
14 * ProtocolV1 State Machine
15 *
16
17 send_server_banner send_client_banner
18 | |
19 v v
20 wait_client_banner wait_server_banner
21 | |
22 | v
23 v handle_server_banner_and_identify
24 wait_connect_message <---------\ |
25 | | | v
26 | wait_connect_message_auth | send_connect_message <----------\
27 | | | | |
28 v v | | |
29handle_connect_message_2 | v |
30 | | | wait_connect_reply |
31 v v | | | |
32 replace -> send_connect_message_reply | V |
33 | | wait_connect_reply_auth |
34 | | | |
35 v v v |
36 open ---\ handle_connect_reply_2 --------/
37 | | |
38 | v v
39 | wait_seq wait_ack_seq
40 | | |
41 v v v
42 server_ready client_ready
43 | |
44 \------------------> wait_message <------------/
45 | ^ | ^
46 /------------------------/ | | |
47 | | | \----------------- ------------\
48 v /----------/ v |
49handle_keepalive2 | handle_message_header read_message_footer
50handle_keepalive2_ack | | ^
51handle_tag_ack | v |
52 | | throttle_message read_message_data
53 \----------------/ | ^
54 v |
55 read_message_front --> read_message_middle --/
56*/
57
58protected:
59
// States of the ProtocolV1 state machine.
//
// The enumerators double as indices into the name table inside
// get_state_name(), so both lists must stay in the same order and have
// the same length (enforced there by a static_assert).
enum State {
  NONE = 0,
  START_CONNECT,
  CONNECTING,
  CONNECTING_WAIT_BANNER_AND_IDENTIFY,
  CONNECTING_SEND_CONNECT_MSG,
  START_ACCEPT,
  ACCEPTING,
  ACCEPTING_WAIT_CONNECT_MSG_AUTH,
  ACCEPTING_HANDLED_CONNECT_MSG,
  OPENED,
  THROTTLE_MESSAGE,
  THROTTLE_BYTES,
  THROTTLE_DISPATCH_QUEUE,
  READ_MESSAGE_FRONT,
  READ_FOOTER_AND_DISPATCH,
  CLOSED,
  WAIT,
  STANDBY
};

// Returns the printable name of a State value.
//
// state: numeric value of a State enumerator.
// Returns a pointer to a string literal (never null). Values outside
// [NONE, STANDBY] yield "UNKNOWN" instead of indexing past the ends of
// the name table.
static const char *get_state_name(int state) {
  // Constant table, one entry per State enumerator, in declaration order.
  static const char *const statenames[] = {"NONE",
                                           "START_CONNECT",
                                           "CONNECTING",
                                           "CONNECTING_WAIT_BANNER_AND_IDENTIFY",
                                           "CONNECTING_SEND_CONNECT_MSG",
                                           "START_ACCEPT",
                                           "ACCEPTING",
                                           "ACCEPTING_WAIT_CONNECT_MSG_AUTH",
                                           "ACCEPTING_HANDLED_CONNECT_MSG",
                                           "OPENED",
                                           "THROTTLE_MESSAGE",
                                           "THROTTLE_BYTES",
                                           "THROTTLE_DISPATCH_QUEUE",
                                           "READ_MESSAGE_FRONT",
                                           "READ_FOOTER_AND_DISPATCH",
                                           "CLOSED",
                                           "WAIT",
                                           "STANDBY"};
  constexpr int num_states = sizeof(statenames) / sizeof(statenames[0]);
  // Fail the build if a state is added or removed without updating the table.
  static_assert(num_states == STANDBY + 1,
                "statenames must have exactly one entry per State enumerator");

  if (state < 0 || state >= num_states) {
    return "UNKNOWN";
  }
  return statenames[state];
}
102
103 char *temp_buffer;
104
105 enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
106 std::atomic<WriteStatus> can_write;
107 std::list<Message *> sent; // the first bufferlist need to inject seq
108 // priority queue for outbound msgs
109 std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q;
110 bool keepalive;
494da23a 111 bool write_in_progress = false;
11fdf7f2
TL
112
113 __u32 connect_seq, peer_global_seq;
114 std::atomic<uint64_t> in_seq{0};
115 std::atomic<uint64_t> out_seq{0};
116 std::atomic<uint64_t> ack_left{0};
117
11fdf7f2 118 std::shared_ptr<AuthSessionHandler> session_security;
11fdf7f2
TL
119
120 // Open state
121 ceph_msg_connect connect_msg;
122 ceph_msg_connect_reply connect_reply;
9f95a23c
TL
123 bufferlist authorizer_buf; // auth(orizer) payload read off the wire
124 bufferlist authorizer_more; // connect-side auth retry (we added challenge)
11fdf7f2
TL
125
126 utime_t backoff; // backoff time
127 utime_t recv_stamp;
128 utime_t throttle_stamp;
129 unsigned msg_left;
130 uint64_t cur_msg_size;
131 ceph_msg_header current_header;
132 bufferlist data_buf;
133 bufferlist::iterator data_blp;
134 bufferlist front, middle, data;
135
136 bool replacing; // when a replacement has happened, we reply to the connect
137 // side with a RETRY tag and the accept side clears the replaced
138 // connection. So when the connect side reissues connect_msg,
139 // there won't exist a conflicting connection, so we use
140 // "replacing" to skip RESETSESSION and avoid detecting a wrong
141 // presentation
142 bool is_reset_from_peer;
143 bool once_ready;
144
145 State state;
146
147 void run_continuation(CtPtr pcontinuation);
148 CtPtr read(CONTINUATION_RX_TYPE<ProtocolV1> &next, int len,
149 char *buffer = nullptr);
150 CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,bufferlist &bl);
151 inline CtPtr _fault() { // helper fault method that stops continuation
152 fault();
153 return nullptr;
154 }
155
156 CONTINUATION_DECL(ProtocolV1, wait_message);
157 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message);
158 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2);
159 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2_ack);
160 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_tag_ack);
161 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_header);
162 CONTINUATION_DECL(ProtocolV1, throttle_message);
163 CONTINUATION_DECL(ProtocolV1, throttle_bytes);
164 CONTINUATION_DECL(ProtocolV1, throttle_dispatch_queue);
165 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_front);
166 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_middle);
167 CONTINUATION_DECL(ProtocolV1, read_message_data);
168 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_data);
169 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_footer);
170
171 CtPtr ready();
172 CtPtr wait_message();
173 CtPtr handle_message(char *buffer, int r);
174
175 CtPtr handle_keepalive2(char *buffer, int r);
176 void append_keepalive_or_ack(bool ack = false, utime_t *t = nullptr);
177 CtPtr handle_keepalive2_ack(char *buffer, int r);
178 CtPtr handle_tag_ack(char *buffer, int r);
179
180 CtPtr handle_message_header(char *buffer, int r);
181 CtPtr throttle_message();
182 CtPtr throttle_bytes();
183 CtPtr throttle_dispatch_queue();
184 CtPtr read_message_front();
185 CtPtr handle_message_front(char *buffer, int r);
186 CtPtr read_message_middle();
187 CtPtr handle_message_middle(char *buffer, int r);
188 CtPtr read_message_data_prepare();
189 CtPtr read_message_data();
190 CtPtr handle_message_data(char *buffer, int r);
191 CtPtr read_message_footer();
192 CtPtr handle_message_footer(char *buffer, int r);
193
194 void session_reset();
195 void randomize_out_seq();
196
197 Message *_get_next_outgoing(bufferlist *bl);
198
199 void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
200 ssize_t write_message(Message *m, bufferlist &bl, bool more);
201
202 void requeue_sent();
203 uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
204 void discard_out_queue();
205
206 void reset_recv_state();
9f95a23c 207 void reset_security();
11fdf7f2
TL
208
209 ostream &_conn_prefix(std::ostream *_dout);
210
211public:
212 ProtocolV1(AsyncConnection *connection);
213 virtual ~ProtocolV1();
214
215 virtual void connect() override;
216 virtual void accept() override;
217 virtual bool is_connected() override;
218 virtual void stop() override;
219 virtual void fault() override;
220 virtual void send_message(Message *m) override;
221 virtual void send_keepalive() override;
222
223 virtual void read_event() override;
224 virtual void write_event() override;
225 virtual bool is_queued() override;
226
227 // Client Protocol
228private:
229 int global_seq;
11fdf7f2
TL
230
231 CONTINUATION_DECL(ProtocolV1, send_client_banner);
232 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner_write);
233 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_and_identify);
234 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_my_addr_write);
235 CONTINUATION_DECL(ProtocolV1, send_connect_message);
236 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_write);
237 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_1);
238 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_auth);
239 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_ack_seq);
240 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_in_seq_write);
241
242 CtPtr send_client_banner();
243 CtPtr handle_client_banner_write(int r);
244 CtPtr wait_server_banner();
245 CtPtr handle_server_banner_and_identify(char *buffer, int r);
246 CtPtr handle_my_addr_write(int r);
247 CtPtr send_connect_message();
248 CtPtr handle_connect_message_write(int r);
249 CtPtr wait_connect_reply();
250 CtPtr handle_connect_reply_1(char *buffer, int r);
251 CtPtr wait_connect_reply_auth();
252 CtPtr handle_connect_reply_auth(char *buffer, int r);
253 CtPtr handle_connect_reply_2();
254 CtPtr wait_ack_seq();
255 CtPtr handle_ack_seq(char *buffer, int r);
256 CtPtr handle_in_seq_write(int r);
257 CtPtr client_ready();
258
259 // Server Protocol
260protected:
261 bool wait_for_seq;
262
263 CONTINUATION_DECL(ProtocolV1, send_server_banner);
264 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_write);
265 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner);
266 CONTINUATION_DECL(ProtocolV1, wait_connect_message);
267 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_1);
268 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_auth);
269 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
270 handle_connect_message_reply_write);
271 WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1,
272 handle_ready_connect_message_reply_write);
273 READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_seq);
274
275 CtPtr send_server_banner();
276 CtPtr handle_server_banner_write(int r);
277 CtPtr wait_client_banner();
278 CtPtr handle_client_banner(char *buffer, int r);
279 CtPtr wait_connect_message();
280 CtPtr handle_connect_message_1(char *buffer, int r);
281 CtPtr wait_connect_message_auth();
282 CtPtr handle_connect_message_auth(char *buffer, int r);
283 CtPtr handle_connect_message_2();
284 CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply,
285 bufferlist &authorizer_reply);
286 CtPtr handle_connect_message_reply_write(int r);
9f95a23c 287 CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply,
11fdf7f2
TL
288 bufferlist &authorizer_reply);
289 CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply);
290 CtPtr handle_ready_connect_message_reply_write(int r);
291 CtPtr wait_seq();
292 CtPtr handle_seq(char *buffer, int r);
293 CtPtr server_ready();
294};
295
296class LoopbackProtocolV1 : public ProtocolV1 {
297public:
298 LoopbackProtocolV1(AsyncConnection *connection) : ProtocolV1(connection) {
299 this->can_write = WriteStatus::CANWRITE;
300 }
301};
302
303#endif /* _MSG_ASYNC_PROTOCOL_V1_ */