]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #ifndef CEPH_MSG_ASYNCCONNECTION_H | |
18 | #define CEPH_MSG_ASYNCCONNECTION_H | |
19 | ||
20 | #include <atomic> | |
21 | #include <pthread.h> | |
7c673cae FG |
22 | #include <climits> |
23 | #include <list> | |
24 | #include <mutex> | |
25 | #include <map> | |
26 | using namespace std; | |
27 | ||
28 | #include "auth/AuthSessionHandler.h" | |
29 | #include "common/ceph_time.h" | |
30 | #include "common/perf_counters.h" | |
31 | #include "include/buffer.h" | |
32 | #include "msg/Connection.h" | |
33 | #include "msg/Messenger.h" | |
34 | ||
35 | #include "Event.h" | |
36 | #include "Stack.h" | |
37 | ||
// Forward declarations; only pointers to these types are needed here.
class Worker;
class AsyncMessenger;

// Number of iovec slots used to size AsyncConnection::msgvec.
// When the platform's IOV_MAX is large (>= 1024) only a quarter of it is
// used; otherwise IOV_MAX is used unchanged.
static constexpr int ASYNC_IOV_MAX = IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX;
42 | ||
43 | /* | |
44 | * AsyncConnection maintains a logic session between two endpoints. In other | |
45 | * word, a pair of addresses can find the only AsyncConnection. AsyncConnection | |
46 | * will handle with network fault or read/write transactions. If one file | |
47 | * descriptor broken, AsyncConnection will maintain the message queue and | |
48 | * sequence, try to reconnect peer endpoint. | |
49 | */ | |
50 | class AsyncConnection : public Connection { | |
51 | ||
52 | ssize_t read_bulk(char *buf, unsigned len); | |
53 | ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); | |
54 | ssize_t try_send(bufferlist &bl, bool more=false) { | |
55 | std::lock_guard<std::mutex> l(write_lock); | |
56 | outcoming_bl.claim_append(bl); | |
57 | return _try_send(more); | |
58 | } | |
59 | ssize_t _try_send(bool more=false); | |
60 | ssize_t _send(Message *m); | |
61 | void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); | |
62 | ssize_t read_until(unsigned needed, char *p); | |
63 | ssize_t _process_connection(); | |
64 | void _connect(); | |
65 | void _stop(); | |
66 | int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r); | |
67 | ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl); | |
68 | void was_session_reset(); | |
69 | void fault(); | |
70 | void discard_out_queue(); | |
71 | void discard_requeued_up_to(uint64_t seq); | |
72 | void requeue_sent(); | |
73 | int randomize_out_seq(); | |
74 | void handle_ack(uint64_t seq); | |
75 | void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL); | |
76 | ssize_t write_message(Message *m, bufferlist& bl, bool more); | |
77 | void inject_delay(); | |
78 | ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, | |
79 | bufferlist &authorizer_reply) { | |
80 | bufferlist reply_bl; | |
81 | reply.tag = tag; | |
82 | reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; | |
83 | reply.authorizer_len = authorizer_reply.length(); | |
84 | reply_bl.append((char*)&reply, sizeof(reply)); | |
85 | if (reply.authorizer_len) { | |
86 | reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); | |
87 | } | |
88 | ssize_t r = try_send(reply_bl); | |
89 | if (r < 0) { | |
90 | inject_delay(); | |
91 | return -1; | |
92 | } | |
93 | ||
94 | state = STATE_ACCEPTING_WAIT_CONNECT_MSG; | |
95 | return 0; | |
96 | } | |
97 | bool is_queued() const { | |
98 | return !out_q.empty() || outcoming_bl.length(); | |
99 | } | |
100 | void shutdown_socket() { | |
101 | for (auto &&t : register_time_events) | |
102 | center->delete_time_event(t); | |
103 | register_time_events.clear(); | |
104 | if (last_tick_id) { | |
105 | center->delete_time_event(last_tick_id); | |
106 | last_tick_id = 0; | |
107 | } | |
108 | if (cs) { | |
109 | center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); | |
110 | cs.shutdown(); | |
111 | cs.close(); | |
112 | } | |
113 | } | |
114 | Message *_get_next_outgoing(bufferlist *bl) { | |
115 | Message *m = 0; | |
116 | while (!m && !out_q.empty()) { | |
117 | map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin(); | |
118 | if (!it->second.empty()) { | |
119 | list<pair<bufferlist, Message*> >::iterator p = it->second.begin(); | |
120 | m = p->second; | |
121 | if (bl) | |
122 | bl->swap(p->first); | |
123 | it->second.erase(p); | |
124 | } | |
125 | if (it->second.empty()) | |
126 | out_q.erase(it->first); | |
127 | } | |
128 | return m; | |
129 | } | |
130 | bool _has_next_outgoing() const { | |
131 | return !out_q.empty(); | |
132 | } | |
133 | void reset_recv_state(); | |
134 | ||
135 | /** | |
136 | * The DelayedDelivery is for injecting delays into Message delivery off | |
137 | * the socket. It is only enabled if delays are requested, and if they | |
138 | * are then it pulls Messages off the DelayQueue and puts them into the | |
139 | * AsyncMessenger event queue. | |
140 | */ | |
141 | class DelayedDelivery : public EventCallback { | |
142 | std::set<uint64_t> register_time_events; // need to delete it if stop | |
143 | std::deque<std::pair<utime_t, Message*> > delay_queue; | |
144 | std::mutex delay_lock; | |
145 | AsyncMessenger *msgr; | |
146 | EventCenter *center; | |
147 | DispatchQueue *dispatch_queue; | |
148 | uint64_t conn_id; | |
149 | std::atomic_bool stop_dispatch; | |
150 | ||
151 | public: | |
152 | explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, | |
153 | DispatchQueue *q, uint64_t cid) | |
154 | : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), | |
155 | stop_dispatch(false) { } | |
156 | ~DelayedDelivery() override { | |
157 | assert(register_time_events.empty()); | |
158 | assert(delay_queue.empty()); | |
159 | } | |
160 | void set_center(EventCenter *c) { center = c; } | |
161 | void do_request(int id) override; | |
162 | void queue(double delay_period, utime_t release, Message *m) { | |
163 | std::lock_guard<std::mutex> l(delay_lock); | |
164 | delay_queue.push_back(std::make_pair(release, m)); | |
165 | register_time_events.insert(center->create_time_event(delay_period*1000000, this)); | |
166 | } | |
167 | void discard() { | |
168 | stop_dispatch = true; | |
169 | center->submit_to(center->get_id(), [this] () mutable { | |
170 | std::lock_guard<std::mutex> l(delay_lock); | |
171 | while (!delay_queue.empty()) { | |
172 | Message *m = delay_queue.front().second; | |
173 | dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); | |
174 | m->put(); | |
175 | delay_queue.pop_front(); | |
176 | } | |
177 | for (auto i : register_time_events) | |
178 | center->delete_time_event(i); | |
179 | register_time_events.clear(); | |
180 | stop_dispatch = false; | |
181 | }, true); | |
182 | } | |
183 | bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } | |
184 | void flush(); | |
185 | } *delay_state; | |
186 | ||
187 | public: | |
188 | AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w); | |
189 | ~AsyncConnection() override; | |
190 | void maybe_start_delay_thread(); | |
191 | ||
192 | ostream& _conn_prefix(std::ostream *_dout); | |
193 | ||
194 | bool is_connected() override { | |
195 | return can_write.load() == WriteStatus::CANWRITE; | |
196 | } | |
197 | ||
198 | // Only call when AsyncConnection first construct | |
199 | void connect(const entity_addr_t& addr, int type) { | |
200 | set_peer_type(type); | |
201 | set_peer_addr(addr); | |
202 | policy = msgr->get_policy(type); | |
203 | _connect(); | |
204 | } | |
205 | // Only call when AsyncConnection first construct | |
206 | void accept(ConnectedSocket socket, entity_addr_t &addr); | |
207 | int send_message(Message *m) override; | |
208 | ||
209 | void send_keepalive() override; | |
210 | void mark_down() override; | |
211 | void mark_disposable() override { | |
212 | std::lock_guard<std::mutex> l(lock); | |
213 | policy.lossy = true; | |
214 | } | |
215 | ||
216 | private: | |
217 | enum { | |
218 | STATE_NONE, | |
219 | STATE_OPEN, | |
220 | STATE_OPEN_KEEPALIVE2, | |
221 | STATE_OPEN_KEEPALIVE2_ACK, | |
222 | STATE_OPEN_TAG_ACK, | |
223 | STATE_OPEN_MESSAGE_HEADER, | |
224 | STATE_OPEN_MESSAGE_THROTTLE_MESSAGE, | |
225 | STATE_OPEN_MESSAGE_THROTTLE_BYTES, | |
226 | STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE, | |
227 | STATE_OPEN_MESSAGE_READ_FRONT, | |
228 | STATE_OPEN_MESSAGE_READ_MIDDLE, | |
229 | STATE_OPEN_MESSAGE_READ_DATA_PREPARE, | |
230 | STATE_OPEN_MESSAGE_READ_DATA, | |
231 | STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH, | |
232 | STATE_OPEN_TAG_CLOSE, | |
233 | STATE_WAIT_SEND, | |
234 | STATE_CONNECTING, | |
235 | STATE_CONNECTING_RE, | |
236 | STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY, | |
237 | STATE_CONNECTING_SEND_CONNECT_MSG, | |
238 | STATE_CONNECTING_WAIT_CONNECT_REPLY, | |
239 | STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH, | |
240 | STATE_CONNECTING_WAIT_ACK_SEQ, | |
241 | STATE_CONNECTING_READY, | |
242 | STATE_ACCEPTING, | |
243 | STATE_ACCEPTING_WAIT_BANNER_ADDR, | |
244 | STATE_ACCEPTING_WAIT_CONNECT_MSG, | |
245 | STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH, | |
246 | STATE_ACCEPTING_WAIT_SEQ, | |
247 | STATE_ACCEPTING_READY, | |
248 | STATE_STANDBY, | |
249 | STATE_CLOSED, | |
250 | STATE_WAIT, // just wait for racing connection | |
251 | }; | |
252 | ||
253 | static const int TCP_PREFETCH_MIN_SIZE; | |
254 | static const char *get_state_name(int state) { | |
255 | const char* const statenames[] = {"STATE_NONE", | |
256 | "STATE_OPEN", | |
257 | "STATE_OPEN_KEEPALIVE2", | |
258 | "STATE_OPEN_KEEPALIVE2_ACK", | |
259 | "STATE_OPEN_TAG_ACK", | |
260 | "STATE_OPEN_MESSAGE_HEADER", | |
261 | "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE", | |
262 | "STATE_OPEN_MESSAGE_THROTTLE_BYTES", | |
263 | "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE", | |
264 | "STATE_OPEN_MESSAGE_READ_FRONT", | |
265 | "STATE_OPEN_MESSAGE_READ_MIDDLE", | |
266 | "STATE_OPEN_MESSAGE_READ_DATA_PREPARE", | |
267 | "STATE_OPEN_MESSAGE_READ_DATA", | |
268 | "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH", | |
269 | "STATE_OPEN_TAG_CLOSE", | |
270 | "STATE_WAIT_SEND", | |
271 | "STATE_CONNECTING", | |
272 | "STATE_CONNECTING_RE", | |
273 | "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY", | |
274 | "STATE_CONNECTING_SEND_CONNECT_MSG", | |
275 | "STATE_CONNECTING_WAIT_CONNECT_REPLY", | |
276 | "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH", | |
277 | "STATE_CONNECTING_WAIT_ACK_SEQ", | |
278 | "STATE_CONNECTING_READY", | |
279 | "STATE_ACCEPTING", | |
280 | "STATE_ACCEPTING_WAIT_BANNER_ADDR", | |
281 | "STATE_ACCEPTING_WAIT_CONNECT_MSG", | |
282 | "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH", | |
283 | "STATE_ACCEPTING_WAIT_SEQ", | |
284 | "STATE_ACCEPTING_READY", | |
285 | "STATE_STANDBY", | |
286 | "STATE_CLOSED", | |
287 | "STATE_WAIT"}; | |
288 | return statenames[state]; | |
289 | } | |
290 | ||
291 | AsyncMessenger *async_msgr; | |
292 | uint64_t conn_id; | |
293 | PerfCounters *logger; | |
294 | int global_seq; | |
295 | __u32 connect_seq, peer_global_seq; | |
31f18b77 FG |
296 | std::atomic<uint64_t> out_seq{0}; |
297 | std::atomic<uint64_t> ack_left{0}, in_seq{0}; | |
7c673cae FG |
298 | int state; |
299 | int state_after_send; | |
300 | ConnectedSocket cs; | |
301 | int port; | |
302 | Messenger::Policy policy; | |
303 | ||
304 | DispatchQueue *dispatch_queue; | |
305 | ||
31f18b77 FG |
306 | // lockfree, only used in own thread |
307 | bufferlist outcoming_bl; | |
308 | bool open_write = false; | |
309 | ||
7c673cae FG |
310 | std::mutex write_lock; |
311 | enum class WriteStatus { | |
312 | NOWRITE, | |
313 | REPLACING, | |
314 | CANWRITE, | |
315 | CLOSED | |
316 | }; | |
317 | std::atomic<WriteStatus> can_write; | |
7c673cae | 318 | list<Message*> sent; // the first bufferlist need to inject seq |
31f18b77 | 319 | map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs |
7c673cae FG |
320 | bool keepalive; |
321 | ||
322 | std::mutex lock; | |
323 | utime_t backoff; // backoff time | |
324 | EventCallbackRef read_handler; | |
325 | EventCallbackRef write_handler; | |
326 | EventCallbackRef wakeup_handler; | |
327 | EventCallbackRef tick_handler; | |
328 | struct iovec msgvec[ASYNC_IOV_MAX]; | |
329 | char *recv_buf; | |
330 | uint32_t recv_max_prefetch; | |
331 | uint32_t recv_start; | |
332 | uint32_t recv_end; | |
333 | set<uint64_t> register_time_events; // need to delete it if stop | |
334 | ceph::coarse_mono_clock::time_point last_active; | |
335 | uint64_t last_tick_id = 0; | |
336 | const uint64_t inactive_timeout_us; | |
337 | ||
338 | // Tis section are temp variables used by state transition | |
339 | ||
340 | // Open state | |
341 | utime_t recv_stamp; | |
342 | utime_t throttle_stamp; | |
343 | unsigned msg_left; | |
344 | uint64_t cur_msg_size; | |
345 | ceph_msg_header current_header; | |
346 | bufferlist data_buf; | |
347 | bufferlist::iterator data_blp; | |
348 | bufferlist front, middle, data; | |
349 | ceph_msg_connect connect_msg; | |
350 | // Connecting state | |
351 | bool got_bad_auth; | |
352 | AuthAuthorizer *authorizer; | |
353 | bufferlist authorizer_buf; | |
354 | ceph_msg_connect_reply connect_reply; | |
355 | // Accepting state | |
356 | entity_addr_t socket_addr; | |
357 | CryptoKey session_key; | |
358 | bool replacing; // when replacing process happened, we will reply connect | |
359 | // side with RETRY tag and accept side will clear replaced | |
360 | // connection. So when connect side reissue connect_msg, | |
361 | // there won't exists conflicting connection so we use | |
362 | // "replacing" to skip RESETSESSION to avoid detect wrong | |
363 | // presentation | |
364 | bool is_reset_from_peer; | |
365 | bool once_ready; | |
366 | ||
367 | // used only for local state, it will be overwrite when state transition | |
368 | char *state_buffer; | |
369 | // used only by "read_until" | |
370 | uint64_t state_offset; | |
371 | Worker *worker; | |
372 | EventCenter *center; | |
373 | ceph::shared_ptr<AuthSessionHandler> session_security; | |
28e407b8 | 374 | std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge; // accept side |
7c673cae FG |
375 | |
376 | public: | |
377 | // used by eventcallback | |
378 | void handle_write(); | |
379 | void process(); | |
380 | void wakeup_from(uint64_t id); | |
381 | void tick(uint64_t id); | |
382 | void local_deliver(); | |
383 | void stop(bool queue_reset) { | |
384 | lock.lock(); | |
385 | bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; | |
386 | _stop(); | |
387 | lock.unlock(); | |
388 | if (need_queue_reset) | |
389 | dispatch_queue->queue_reset(this); | |
390 | } | |
391 | void cleanup() { | |
392 | shutdown_socket(); | |
393 | delete read_handler; | |
394 | delete write_handler; | |
395 | delete wakeup_handler; | |
396 | delete tick_handler; | |
397 | if (delay_state) { | |
398 | delete delay_state; | |
399 | delay_state = NULL; | |
400 | } | |
401 | } | |
402 | PerfCounters *get_perf_counter() { | |
403 | return logger; | |
404 | } | |
405 | }; /* AsyncConnection */ | |
406 | ||
407 | typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef; | |
408 | ||
409 | #endif |