]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef _MSG_ASYNC_PROTOCOL_V1_ | |
5 | #define _MSG_ASYNC_PROTOCOL_V1_ | |
6 | ||
7 | #include "Protocol.h" | |
8 | ||
9 | class ProtocolV1; | |
10 | using CtPtr = Ct<ProtocolV1>*; | |
11 | ||
12 | class ProtocolV1 : public Protocol { | |
13 | /* | |
14 | * ProtocolV1 State Machine | |
15 | * | |
16 | ||
17 | send_server_banner send_client_banner | |
18 | | | | |
19 | v v | |
20 | wait_client_banner wait_server_banner | |
21 | | | | |
22 | | v | |
23 | v handle_server_banner_and_identify | |
24 | wait_connect_message <---------\ | | |
25 | | | | v | |
26 | | wait_connect_message_auth | send_connect_message <----------\ | |
27 | | | | | | | |
28 | v v | | | | |
29 | handle_connect_message_2 | v | | |
30 | | | | wait_connect_reply | | |
31 | v v | | | | | |
32 | replace -> send_connect_message_reply | V | | |
33 | | | wait_connect_reply_auth | | |
34 | | | | | | |
35 | v v v | | |
36 | open ---\ handle_connect_reply_2 --------/ | |
37 | | | | | |
38 | | v v | |
39 | | wait_seq wait_ack_seq | |
40 | | | | | |
41 | v v v | |
42 | server_ready client_ready | |
43 | | | | |
44 | \------------------> wait_message <------------/ | |
45 | | ^ | ^ | |
46 | /------------------------/ | | | | |
47 | | | | \----------------- ------------\ | |
48 | v /----------/ v | | |
49 | handle_keepalive2 | handle_message_header read_message_footer | |
50 | handle_keepalive2_ack | | ^ | |
51 | handle_tag_ack | v | | |
52 | | | throttle_message read_message_data | |
53 | \----------------/ | ^ | |
54 | v | | |
55 | read_message_front --> read_message_middle --/ | |
56 | */ | |
57 | ||
58 | protected: | |
59 | ||
// Connection states.
//
// The numeric value of each enumerator is used directly as an index into
// the name table inside get_state_name(), so the values are spelled out
// here to make that correspondence visible. Keep both lists in step.
enum State {
  NONE = 0,

  // connecting side
  START_CONNECT = 1,
  CONNECTING = 2,
  CONNECTING_WAIT_BANNER_AND_IDENTIFY = 3,
  CONNECTING_SEND_CONNECT_MSG = 4,

  // accepting side
  START_ACCEPT = 5,
  ACCEPTING = 6,
  ACCEPTING_WAIT_CONNECT_MSG_AUTH = 7,
  ACCEPTING_HANDLED_CONNECT_MSG = 8,

  // established session and message intake
  OPENED = 9,
  THROTTLE_MESSAGE = 10,
  THROTTLE_BYTES = 11,
  THROTTLE_DISPATCH_QUEUE = 12,
  READ_MESSAGE_FRONT = 13,
  READ_FOOTER_AND_DISPATCH = 14,

  // terminal / idle
  CLOSED = 15,
  WAIT = 16,
  STANDBY = 17
};
80 | ||
// Return the printable name of a connection state.
//
// @param state  numeric value of a State enumerator; it is used as an
//               index into the table below, which therefore has to list
//               the names in the enumerators' declaration order.
// @return       the matching name, or "UNKNOWN" when `state` is negative
//               or beyond the last table entry (previously such a value
//               indexed past the array).
static const char *get_state_name(int state) {
  // static: build the table once instead of on every call
  static const char *const statenames[] = {
      "NONE",
      "START_CONNECT",
      "CONNECTING",
      "CONNECTING_WAIT_BANNER_AND_IDENTIFY",
      "CONNECTING_SEND_CONNECT_MSG",
      "START_ACCEPT",
      "ACCEPTING",
      "ACCEPTING_WAIT_CONNECT_MSG_AUTH",
      "ACCEPTING_HANDLED_CONNECT_MSG",
      "OPENED",
      "THROTTLE_MESSAGE",
      "THROTTLE_BYTES",
      "THROTTLE_DISPATCH_QUEUE",
      "READ_MESSAGE_FRONT",
      "READ_FOOTER_AND_DISPATCH",
      "CLOSED",
      "WAIT",
      "STANDBY"};
  constexpr int num_states =
      static_cast<int>(sizeof(statenames) / sizeof(statenames[0]));

  if (state < 0 || state >= num_states) {
    return "UNKNOWN";
  }
  return statenames[state];
}
102 | ||
103 | char *temp_buffer; | |
104 | ||
105 | enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED }; | |
106 | std::atomic<WriteStatus> can_write; | |
107 | std::list<Message *> sent; // the first bufferlist need to inject seq | |
108 | // priority queue for outbound msgs | |
109 | std::map<int, std::list<std::pair<bufferlist, Message *>>> out_q; | |
110 | bool keepalive; | |
494da23a | 111 | bool write_in_progress = false; |
11fdf7f2 TL |
112 | |
113 | __u32 connect_seq, peer_global_seq; | |
114 | std::atomic<uint64_t> in_seq{0}; | |
115 | std::atomic<uint64_t> out_seq{0}; | |
116 | std::atomic<uint64_t> ack_left{0}; | |
117 | ||
11fdf7f2 | 118 | std::shared_ptr<AuthSessionHandler> session_security; |
11fdf7f2 TL |
119 | |
120 | // Open state | |
121 | ceph_msg_connect connect_msg; | |
122 | ceph_msg_connect_reply connect_reply; | |
9f95a23c TL |
123 | bufferlist authorizer_buf; // auth(orizer) payload read off the wire |
124 | bufferlist authorizer_more; // connect-side auth retry (we added challenge) | |
11fdf7f2 TL |
125 | |
126 | utime_t backoff; // backoff time | |
127 | utime_t recv_stamp; | |
128 | utime_t throttle_stamp; | |
129 | unsigned msg_left; | |
130 | uint64_t cur_msg_size; | |
131 | ceph_msg_header current_header; | |
132 | bufferlist data_buf; | |
133 | bufferlist::iterator data_blp; | |
134 | bufferlist front, middle, data; | |
135 | ||
136 | bool replacing; // when replacing process happened, we will reply connect | |
137 | // side with RETRY tag and accept side will clear replaced | |
138 | // connection. So when connect side reissue connect_msg, | |
139 | // there won't exists conflicting connection so we use | |
140 | // "replacing" to skip RESETSESSION to avoid detect wrong | |
141 | // presentation | |
142 | bool is_reset_from_peer; | |
143 | bool once_ready; | |
144 | ||
145 | State state; | |
146 | ||
147 | void run_continuation(CtPtr pcontinuation); | |
148 | CtPtr read(CONTINUATION_RX_TYPE<ProtocolV1> &next, int len, | |
149 | char *buffer = nullptr); | |
150 | CtPtr write(CONTINUATION_TX_TYPE<ProtocolV1> &next,bufferlist &bl); | |
151 | inline CtPtr _fault() { // helper fault method that stops continuation | |
152 | fault(); | |
153 | return nullptr; | |
154 | } | |
155 | ||
156 | CONTINUATION_DECL(ProtocolV1, wait_message); | |
157 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message); | |
158 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2); | |
159 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_keepalive2_ack); | |
160 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_tag_ack); | |
161 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_header); | |
162 | CONTINUATION_DECL(ProtocolV1, throttle_message); | |
163 | CONTINUATION_DECL(ProtocolV1, throttle_bytes); | |
164 | CONTINUATION_DECL(ProtocolV1, throttle_dispatch_queue); | |
165 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_front); | |
166 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_middle); | |
167 | CONTINUATION_DECL(ProtocolV1, read_message_data); | |
168 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_data); | |
169 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_message_footer); | |
170 | ||
171 | CtPtr ready(); | |
172 | CtPtr wait_message(); | |
173 | CtPtr handle_message(char *buffer, int r); | |
174 | ||
175 | CtPtr handle_keepalive2(char *buffer, int r); | |
176 | void append_keepalive_or_ack(bool ack = false, utime_t *t = nullptr); | |
177 | CtPtr handle_keepalive2_ack(char *buffer, int r); | |
178 | CtPtr handle_tag_ack(char *buffer, int r); | |
179 | ||
180 | CtPtr handle_message_header(char *buffer, int r); | |
181 | CtPtr throttle_message(); | |
182 | CtPtr throttle_bytes(); | |
183 | CtPtr throttle_dispatch_queue(); | |
184 | CtPtr read_message_front(); | |
185 | CtPtr handle_message_front(char *buffer, int r); | |
186 | CtPtr read_message_middle(); | |
187 | CtPtr handle_message_middle(char *buffer, int r); | |
188 | CtPtr read_message_data_prepare(); | |
189 | CtPtr read_message_data(); | |
190 | CtPtr handle_message_data(char *buffer, int r); | |
191 | CtPtr read_message_footer(); | |
192 | CtPtr handle_message_footer(char *buffer, int r); | |
193 | ||
194 | void session_reset(); | |
195 | void randomize_out_seq(); | |
196 | ||
197 | Message *_get_next_outgoing(bufferlist *bl); | |
198 | ||
199 | void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); | |
200 | ssize_t write_message(Message *m, bufferlist &bl, bool more); | |
201 | ||
202 | void requeue_sent(); | |
203 | uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); | |
204 | void discard_out_queue(); | |
205 | ||
206 | void reset_recv_state(); | |
9f95a23c | 207 | void reset_security(); |
11fdf7f2 TL |
208 | |
// Takes the debug output stream by pointer and returns a stream reference.
// The return type is spelled std::ostream (matching the parameter) so this
// header does not depend on a using-directive leaking in from elsewhere.
std::ostream &_conn_prefix(std::ostream *_dout);
210 | ||
211 | public: | |
212 | ProtocolV1(AsyncConnection *connection); | |
213 | virtual ~ProtocolV1(); | |
214 | ||
215 | virtual void connect() override; | |
216 | virtual void accept() override; | |
217 | virtual bool is_connected() override; | |
218 | virtual void stop() override; | |
219 | virtual void fault() override; | |
220 | virtual void send_message(Message *m) override; | |
221 | virtual void send_keepalive() override; | |
222 | ||
223 | virtual void read_event() override; | |
224 | virtual void write_event() override; | |
225 | virtual bool is_queued() override; | |
226 | ||
227 | // Client Protocol | |
228 | private: | |
229 | int global_seq; | |
11fdf7f2 TL |
230 | |
231 | CONTINUATION_DECL(ProtocolV1, send_client_banner); | |
232 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner_write); | |
233 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_and_identify); | |
234 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_my_addr_write); | |
235 | CONTINUATION_DECL(ProtocolV1, send_connect_message); | |
236 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_write); | |
237 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_1); | |
238 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_reply_auth); | |
239 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_ack_seq); | |
240 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_in_seq_write); | |
241 | ||
242 | CtPtr send_client_banner(); | |
243 | CtPtr handle_client_banner_write(int r); | |
244 | CtPtr wait_server_banner(); | |
245 | CtPtr handle_server_banner_and_identify(char *buffer, int r); | |
246 | CtPtr handle_my_addr_write(int r); | |
247 | CtPtr send_connect_message(); | |
248 | CtPtr handle_connect_message_write(int r); | |
249 | CtPtr wait_connect_reply(); | |
250 | CtPtr handle_connect_reply_1(char *buffer, int r); | |
251 | CtPtr wait_connect_reply_auth(); | |
252 | CtPtr handle_connect_reply_auth(char *buffer, int r); | |
253 | CtPtr handle_connect_reply_2(); | |
254 | CtPtr wait_ack_seq(); | |
255 | CtPtr handle_ack_seq(char *buffer, int r); | |
256 | CtPtr handle_in_seq_write(int r); | |
257 | CtPtr client_ready(); | |
258 | ||
259 | // Server Protocol | |
260 | protected: | |
261 | bool wait_for_seq; | |
262 | ||
263 | CONTINUATION_DECL(ProtocolV1, send_server_banner); | |
264 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_server_banner_write); | |
265 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_client_banner); | |
266 | CONTINUATION_DECL(ProtocolV1, wait_connect_message); | |
267 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_1); | |
268 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_connect_message_auth); | |
269 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, | |
270 | handle_connect_message_reply_write); | |
271 | WRITE_HANDLER_CONTINUATION_DECL(ProtocolV1, | |
272 | handle_ready_connect_message_reply_write); | |
273 | READ_HANDLER_CONTINUATION_DECL(ProtocolV1, handle_seq); | |
274 | ||
275 | CtPtr send_server_banner(); | |
276 | CtPtr handle_server_banner_write(int r); | |
277 | CtPtr wait_client_banner(); | |
278 | CtPtr handle_client_banner(char *buffer, int r); | |
279 | CtPtr wait_connect_message(); | |
280 | CtPtr handle_connect_message_1(char *buffer, int r); | |
281 | CtPtr wait_connect_message_auth(); | |
282 | CtPtr handle_connect_message_auth(char *buffer, int r); | |
283 | CtPtr handle_connect_message_2(); | |
284 | CtPtr send_connect_message_reply(char tag, ceph_msg_connect_reply &reply, | |
285 | bufferlist &authorizer_reply); | |
286 | CtPtr handle_connect_message_reply_write(int r); | |
9f95a23c | 287 | CtPtr replace(const AsyncConnectionRef& existing, ceph_msg_connect_reply &reply, |
11fdf7f2 TL |
288 | bufferlist &authorizer_reply); |
289 | CtPtr open(ceph_msg_connect_reply &reply, bufferlist &authorizer_reply); | |
290 | CtPtr handle_ready_connect_message_reply_write(int r); | |
291 | CtPtr wait_seq(); | |
292 | CtPtr handle_seq(char *buffer, int r); | |
293 | CtPtr server_ready(); | |
294 | }; | |
295 | ||
296 | class LoopbackProtocolV1 : public ProtocolV1 { | |
297 | public: | |
298 | LoopbackProtocolV1(AsyncConnection *connection) : ProtocolV1(connection) { | |
299 | this->can_write = WriteStatus::CANWRITE; | |
300 | } | |
301 | }; | |
302 | ||
303 | #endif /* _MSG_ASYNC_PROTOCOL_V1_ */ |