]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncConnection.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / msg / async / AsyncConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <signal.h>
23 #include <climits>
24 #include <list>
25 #include <mutex>
26 #include <map>
27 using namespace std;
28
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
35
36 #include "Event.h"
37 #include "Stack.h"
38
class AsyncMessenger;
class Worker;

// Number of iovec slots in AsyncConnection::msgvec (the scatter/gather
// array handed to the kernel when flushing outgoing data).  When the
// platform IOV_MAX is large (>= 1024), only a quarter of it is used to
// keep the per-connection array a reasonable size.
static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
43
/*
 * AsyncConnection maintains a logical session between two endpoints. In
 * other words, a pair of addresses maps to exactly one AsyncConnection.
 * AsyncConnection handles network faults and read/write transactions. If
 * the underlying file descriptor breaks, AsyncConnection preserves the
 * message queue and sequence numbers and tries to reconnect to the peer
 * endpoint.
 */
class AsyncConnection : public Connection {

  // --- low-level socket helpers (bodies in AsyncConnection.cc) ---

  // Read up to len bytes from the socket into buf.
  ssize_t read_bulk(char *buf, unsigned len);
  // Write a prepared msghdr (len payload bytes) to the socket; `more`
  // hints that further data will follow immediately.
  ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
  // Append bl to the pending output buffer (under write_lock) and attempt
  // to flush it to the socket.  Returns the result of _try_send().
  ssize_t try_send(bufferlist &bl, bool more=false) {
    std::lock_guard<std::mutex> l(write_lock);
    outcoming_bl.claim_append(bl);
    return _try_send(more);
  }
  ssize_t _try_send(bool more=false);
  ssize_t _send(Message *m);
  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
  // Read `needed` bytes into p; presumably accumulates partial reads via
  // state_offset (see below) -- confirm in AsyncConnection.cc.
  ssize_t read_until(unsigned needed, char *p);
  // Drive the connect/accept handshake state machine (STATE_* below).
  ssize_t _process_connection();
  void _connect();
  void _stop();
  // Handshake handlers for the connecting and accepting sides respectively.
  int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
  ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
  void was_session_reset();
  void fault();
  void discard_out_queue();
  void discard_requeued_up_to(uint64_t seq);
  void requeue_sent();
  int randomize_out_seq();
  void handle_ack(uint64_t seq);
  void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
  ssize_t write_message(Message *m, bufferlist& bl, bool more);
  // NOTE(review): presumably a delay/failure-injection test hook -- see
  // AsyncConnection.cc; it is invoked on the send-failure path below.
  void inject_delay();

  // Send a connect reply carrying `tag` (plus any authorizer payload) back
  // to the connecting peer, then return to STATE_ACCEPTING_WAIT_CONNECT_MSG
  // to await the peer's next connect message.
  // Returns 0 on success, -1 if the send failed.
  ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                        bufferlist &authorizer_reply) {
    bufferlist reply_bl;
    reply.tag = tag;
    // Advertise only features both sides support, plus whatever our policy
    // unconditionally requires.
    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
    reply.authorizer_len = authorizer_reply.length();
    reply_bl.append((char*)&reply, sizeof(reply));
    if (reply.authorizer_len) {
      reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
    }
    ssize_t r = try_send(reply_bl);
    if (r < 0) {
      inject_delay();
      return -1;
    }

    state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
    return 0;
  }
  // True if anything is still waiting to go out: queued messages or
  // partially flushed bytes in outcoming_bl.
  bool is_queued() const {
    return !out_q.empty() || outcoming_bl.length();
  }
  // Tear down the socket: cancel all registered time events (including the
  // inactivity tick), deregister the fd from the event center, then shut
  // down and close the socket.
  void shutdown_socket() {
    for (auto &&t : register_time_events)
      center->delete_time_event(t);
    register_time_events.clear();
    if (last_tick_id) {
      center->delete_time_event(last_tick_id);
      last_tick_id = 0;
    }
    if (cs) {
      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
      cs.shutdown();
      cs.close();
    }
  }
  // Pop the next outgoing message, scanning priorities from highest to
  // lowest (out_q is keyed by priority, so rbegin() is the largest key).
  // If bl is non-null the message's pre-encoded bufferlist is swapped into
  // it.  Empty priority buckets are erased along the way.  Returns NULL
  // when nothing is queued.
  Message *_get_next_outgoing(bufferlist *bl) {
    Message *m = 0;
    while (!m && !out_q.empty()) {
      map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
      if (!it->second.empty()) {
        list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
        m = p->second;
        if (bl)
          bl->swap(p->first);
        it->second.erase(p);
      }
      if (it->second.empty())
        out_q.erase(it->first);
    }
    return m;
  }
  bool _has_next_outgoing() const {
    return !out_q.empty();
  }
  void reset_recv_state();

  /**
   * The DelayedDelivery is for injecting delays into Message delivery off
   * the socket. It is only enabled if delays are requested, and if they
   * are then it pulls Messages off the DelayQueue and puts them into the
   * AsyncMessenger event queue.
   */
  class DelayedDelivery : public EventCallback {
    std::set<uint64_t> register_time_events; // need to delete it if stop
    std::deque<std::pair<utime_t, Message*> > delay_queue;
    std::mutex delay_lock;                   // guards delay_queue
    AsyncMessenger *msgr;
    EventCenter *center;
    DispatchQueue *dispatch_queue;
    uint64_t conn_id;
    std::atomic_bool stop_dispatch;          // set while discard() is draining

   public:
    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
                             DispatchQueue *q, uint64_t cid)
      : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
        stop_dispatch(false) { }
    ~DelayedDelivery() override {
      assert(register_time_events.empty());
      assert(delay_queue.empty());
    }
    void set_center(EventCenter *c) { center = c; }
    void do_request(int id) override;
    // Queue m for delivery at `release`, arming a time event after
    // delay_period (seconds, scaled by 1e6 for create_time_event).
    void queue(double delay_period, utime_t release, Message *m) {
      std::lock_guard<std::mutex> l(delay_lock);
      delay_queue.push_back(std::make_pair(release, m));
      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
    }
    // Drop everything: on the owning center's thread, release the dispatch
    // throttle and the ref of every queued message, and cancel all pending
    // time events.  stop_dispatch stays true until the drain completes.
    void discard() {
      stop_dispatch = true;
      center->submit_to(center->get_id(), [this] () mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front().second;
          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
          m->put();
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
    }
    // NOTE(review): reads delay_queue/register_time_events without taking
    // delay_lock -- presumably only safe from the center thread; confirm.
    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
    void flush();
  } *delay_state; // non-NULL only when delivery delays are enabled

 public:
  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
  ~AsyncConnection() override;
  void maybe_start_delay_thread();

  // Log prefix used by debug output for this connection.
  ostream& _conn_prefix(std::ostream *_dout);

  bool is_connected() override {
    return can_write.load() == WriteStatus::CANWRITE;
  }

  // Only call when AsyncConnection first construct
  void connect(const entity_addr_t& addr, int type) {
    set_peer_type(type);
    set_peer_addr(addr);
    policy = msgr->get_policy(type);
    _connect();
  }
  // Only call when AsyncConnection first construct
  void accept(ConnectedSocket socket, entity_addr_t &addr);
  int send_message(Message *m) override;

  void send_keepalive() override;
  void mark_down() override;
  // Mark this connection lossy: on failure it will be dropped rather than
  // reconnected.
  void mark_disposable() override {
    std::lock_guard<std::mutex> l(lock);
    policy.lossy = true;
  }

 private:
  // Handshake / session state machine states; see _process_connection().
  enum {
    STATE_NONE,
    STATE_OPEN,
    STATE_OPEN_KEEPALIVE2,
    STATE_OPEN_KEEPALIVE2_ACK,
    STATE_OPEN_TAG_ACK,
    STATE_OPEN_MESSAGE_HEADER,
    STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
    STATE_OPEN_MESSAGE_THROTTLE_BYTES,
    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
    STATE_OPEN_MESSAGE_READ_FRONT,
    STATE_OPEN_MESSAGE_READ_MIDDLE,
    STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
    STATE_OPEN_MESSAGE_READ_DATA,
    STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
    STATE_OPEN_TAG_CLOSE,
    STATE_WAIT_SEND,
    STATE_CONNECTING,
    STATE_CONNECTING_RE,
    STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
    STATE_CONNECTING_SEND_CONNECT_MSG,
    STATE_CONNECTING_WAIT_CONNECT_REPLY,
    STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
    STATE_CONNECTING_WAIT_ACK_SEQ,
    STATE_CONNECTING_READY,
    STATE_ACCEPTING,
    STATE_ACCEPTING_WAIT_BANNER_ADDR,
    STATE_ACCEPTING_WAIT_CONNECT_MSG,
    STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
    STATE_ACCEPTING_WAIT_SEQ,
    STATE_ACCEPTING_READY,
    STATE_STANDBY,
    STATE_CLOSED,
    STATE_WAIT,       // just wait for racing connection
  };

  static const int TCP_PREFETCH_MIN_SIZE;
  // Human-readable name for a STATE_* value.  No bounds check: `state`
  // must be a valid enum value, and statenames[] must stay in sync with
  // the enum above (same count, same order).
  static const char *get_state_name(int state) {
    const char* const statenames[] = {"STATE_NONE",
                                      "STATE_OPEN",
                                      "STATE_OPEN_KEEPALIVE2",
                                      "STATE_OPEN_KEEPALIVE2_ACK",
                                      "STATE_OPEN_TAG_ACK",
                                      "STATE_OPEN_MESSAGE_HEADER",
                                      "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
                                      "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
                                      "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
                                      "STATE_OPEN_MESSAGE_READ_FRONT",
                                      "STATE_OPEN_MESSAGE_READ_MIDDLE",
                                      "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
                                      "STATE_OPEN_MESSAGE_READ_DATA",
                                      "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
                                      "STATE_OPEN_TAG_CLOSE",
                                      "STATE_WAIT_SEND",
                                      "STATE_CONNECTING",
                                      "STATE_CONNECTING_RE",
                                      "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
                                      "STATE_CONNECTING_SEND_CONNECT_MSG",
                                      "STATE_CONNECTING_WAIT_CONNECT_REPLY",
                                      "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
                                      "STATE_CONNECTING_WAIT_ACK_SEQ",
                                      "STATE_CONNECTING_READY",
                                      "STATE_ACCEPTING",
                                      "STATE_ACCEPTING_WAIT_BANNER_ADDR",
                                      "STATE_ACCEPTING_WAIT_CONNECT_MSG",
                                      "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
                                      "STATE_ACCEPTING_WAIT_SEQ",
                                      "STATE_ACCEPTING_READY",
                                      "STATE_STANDBY",
                                      "STATE_CLOSED",
                                      "STATE_WAIT"};
    return statenames[state];
  }

  AsyncMessenger *async_msgr;
  uint64_t conn_id;
  PerfCounters *logger;
  int global_seq;
  __u32 connect_seq, peer_global_seq;
  atomic64_t out_seq;
  atomic64_t ack_left, in_seq;
  int state;
  int state_after_send;    // state to enter once a pending send completes
  ConnectedSocket cs;
  int port;
  Messenger::Policy policy;

  DispatchQueue *dispatch_queue;

  // guards out_q/outcoming_bl and the other write-side members below
  std::mutex write_lock;
  enum class WriteStatus {
    NOWRITE,
    REPLACING,
    CANWRITE,
    CLOSED
  };
  std::atomic<WriteStatus> can_write;
  bool open_write;
  map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
  list<Message*> sent;     // the first bufferlist needs to have its seq injected
  bufferlist outcoming_bl; // bytes queued but not yet flushed to the socket
  bool keepalive;

  std::mutex lock;         // main connection/state lock
  utime_t backoff;         // backoff time
  EventCallbackRef read_handler;
  EventCallbackRef write_handler;
  EventCallbackRef wakeup_handler;
  EventCallbackRef tick_handler;
  struct iovec msgvec[ASYNC_IOV_MAX];
  char *recv_buf;
  uint32_t recv_max_prefetch;
  uint32_t recv_start;
  uint32_t recv_end;
  set<uint64_t> register_time_events; // need to delete it if stop
  ceph::coarse_mono_clock::time_point last_active;
  uint64_t last_tick_id = 0;          // inactivity-tick time event; 0 = none armed
  const uint64_t inactive_timeout_us;

  // This section holds temporary variables used by state transitions

  // Open state
  utime_t recv_stamp;
  utime_t throttle_stamp;
  unsigned msg_left;
  uint64_t cur_msg_size;
  ceph_msg_header current_header;
  bufferlist data_buf;
  bufferlist::iterator data_blp;
  bufferlist front, middle, data;
  ceph_msg_connect connect_msg;
  // Connecting state
  bool got_bad_auth;
  AuthAuthorizer *authorizer;
  bufferlist authorizer_buf;
  ceph_msg_connect_reply connect_reply;
  // Accepting state
  entity_addr_t socket_addr;
  CryptoKey session_key;
  bool replacing;    // when replacing process happened, we will reply connect
                     // side with RETRY tag and accept side will clear replaced
                     // connection. So when connect side reissues connect_msg,
                     // there won't exist a conflicting connection, so we use
                     // "replacing" to skip RESETSESSION to avoid detecting a
                     // wrong presentation
  bool is_reset_from_peer;
  bool once_ready;

  // used only for local state; it is overwritten on state transitions
  char *state_buffer;
  // used only by "read_until"
  uint64_t state_offset;
  Worker *worker;
  EventCenter *center;
  ceph::shared_ptr<AuthSessionHandler> session_security;

 public:
  // used by eventcallback
  void handle_write();
  void process();
  void wakeup_from(uint64_t id);
  void tick(uint64_t id);
  void local_deliver();
  // Stop the connection; `queue_reset` asks for a reset notification to be
  // queued if the connection was not already closed.  lock is taken and
  // released manually so queue_reset() runs outside the lock.
  void stop(bool queue_reset) {
    lock.lock();
    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
    _stop();
    lock.unlock();
    if (need_queue_reset)
      dispatch_queue->queue_reset(this);
  }
  // Release the socket and the event callbacks / delay state owned by this
  // connection.
  void cleanup() {
    shutdown_socket();
    delete read_handler;
    delete write_handler;
    delete wakeup_handler;
    delete tick_handler;
    if (delay_state) {
      delete delay_state;
      delay_state = NULL;
    }
  }
  PerfCounters *get_perf_counter() {
    return logger;
  }
}; /* AsyncConnection */
404
405 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
406
407 #endif