// Source: ceph.git — ceph/src/msg/async/AsyncConnection.h (sources updated to v12.2.7)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <climits>
23 #include <list>
24 #include <mutex>
25 #include <map>
26 using namespace std;
27
28 #include "auth/AuthSessionHandler.h"
29 #include "common/ceph_time.h"
30 #include "common/perf_counters.h"
31 #include "include/buffer.h"
32 #include "msg/Connection.h"
33 #include "msg/Messenger.h"
34
35 #include "Event.h"
36 #include "Stack.h"
37
38 class AsyncMessenger;
39 class Worker;
40
// Cap on the number of iovec entries passed per sendmsg()/writev() call.
// POSIX only guarantees IOV_MAX entries per call; when the platform limit is
// generous (>= 1024) we use a quarter of it to bound per-call work.  Declared
// constexpr: it is a compile-time constant (it sizes the msgvec[] member).
static constexpr int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
42
/*
 * AsyncConnection maintains a logical session between two endpoints.  In
 * other words, a pair of addresses maps to a unique AsyncConnection.  An
 * AsyncConnection handles network faults and read/write transactions.  If
 * the file descriptor breaks, the AsyncConnection preserves the message
 * queue and sequence numbers and tries to reconnect to the peer endpoint.
 */
class AsyncConnection : public Connection {

  // ---- low-level socket I/O helpers (bodies in AsyncConnection.cc) ----

  // Read up to `len` bytes from the socket into `buf`.
  ssize_t read_bulk(char *buf, unsigned len);
  // Write the `len` bytes described by `msg`; `more` hints that further data
  // will follow immediately.
  ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
  // Move `bl`'s buffers onto the pending output buffer (claim_append takes
  // ownership, emptying `bl`) under write_lock, then try to flush at once.
  ssize_t try_send(bufferlist &bl, bool more=false) {
    std::lock_guard<std::mutex> l(write_lock);
    outcoming_bl.claim_append(bl);
    return _try_send(more);
  }
  // Flush outcoming_bl to the socket.  NOTE(review): presumably requires
  // write_lock to be held (try_send() takes it first) — confirm in the .cc.
  ssize_t _try_send(bool more=false);
  ssize_t _send(Message *m);
  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
  // Accumulate exactly `needed` bytes at `p`; may return early and resume on
  // the next readable event (see state_offset below).
  ssize_t read_until(unsigned needed, char *p);
  // Advance the connect/accept handshake state machine (STATE_* below).
  ssize_t _process_connection();
  void _connect();
  void _stop();

  // ---- handshake / session management (bodies in AsyncConnection.cc) ----
  int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
  ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
  void was_session_reset();
  void fault();
  void discard_out_queue();
  void discard_requeued_up_to(uint64_t seq);
  void requeue_sent();
  int randomize_out_seq();
  void handle_ack(uint64_t seq);
  void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
  ssize_t write_message(Message *m, bufferlist& bl, bool more);
  void inject_delay();

  // Accept-side helper: send a connect reply carrying `tag` (plus any
  // authorizer payload) and go back to waiting for the peer's next connect
  // message.  Returns 0 on success, -1 if the send failed.
  ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
                        bufferlist &authorizer_reply) {
    bufferlist reply_bl;
    reply.tag = tag;
    // Advertise the intersection of the peer's and our supported features,
    // plus everything we unconditionally require.
    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
    reply.authorizer_len = authorizer_reply.length();
    reply_bl.append((char*)&reply, sizeof(reply));
    if (reply.authorizer_len) {
      reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
    }
    ssize_t r = try_send(reply_bl);
    if (r < 0) {
      inject_delay();
      return -1;
    }

    state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
    return 0;
  }

  // True if anything is waiting to go out: queued messages or raw bytes.
  bool is_queued() const {
    return !out_q.empty() || outcoming_bl.length();
  }

  // Cancel all registered time events (including the tick timer) and, if the
  // socket is open, deregister its file events and shut it down.
  void shutdown_socket() {
    for (auto &&t : register_time_events)
      center->delete_time_event(t);
    register_time_events.clear();
    if (last_tick_id) {
      center->delete_time_event(last_tick_id);
      last_tick_id = 0;
    }
    if (cs) {
      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
      cs.shutdown();
      cs.close();
    }
  }

  // Pop the next outgoing message from the highest-priority non-empty queue
  // (out_q is keyed by priority; reverse iteration visits the largest key
  // first).  If `bl` is non-null, the message's pre-marshalled payload is
  // swapped into it.  Empty priority buckets are erased along the way.
  // Returns nullptr when nothing is queued.
  Message *_get_next_outgoing(bufferlist *bl) {
    Message *m = 0;
    while (!m && !out_q.empty()) {
      map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
      if (!it->second.empty()) {
        list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
        m = p->second;
        if (bl)
          bl->swap(p->first);
        it->second.erase(p);
      }
      if (it->second.empty())
        out_q.erase(it->first);
    }
    return m;
  }
  bool _has_next_outgoing() const {
    return !out_q.empty();
  }
  void reset_recv_state();

  /**
   * The DelayedDelivery is for injecting delays into Message delivery off
   * the socket. It is only enabled if delays are requested, and if they
   * are then it pulls Messages off the DelayQueue and puts them into the
   * AsyncMessenger event queue.
   */
  class DelayedDelivery : public EventCallback {
    std::set<uint64_t> register_time_events; // need to delete these on stop
    std::deque<std::pair<utime_t, Message*> > delay_queue;
    std::mutex delay_lock; // protects delay_queue and register_time_events
    AsyncMessenger *msgr;
    EventCenter *center;
    DispatchQueue *dispatch_queue;
    uint64_t conn_id;
    std::atomic_bool stop_dispatch;

   public:
    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
                             DispatchQueue *q, uint64_t cid)
      : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
        stop_dispatch(false) { }
    ~DelayedDelivery() override {
      assert(register_time_events.empty());
      assert(delay_queue.empty());
    }
    // Re-home delivery onto another event center (e.g. when the owning
    // connection changes workers).
    void set_center(EventCenter *c) { center = c; }
    void do_request(int id) override;
    // Schedule `m` to be released at `release`.  The *1000000 factor
    // suggests delay_period is seconds and create_time_event() takes
    // microseconds — confirm against EventCenter::create_time_event.
    void queue(double delay_period, utime_t release, Message *m) {
      std::lock_guard<std::mutex> l(delay_lock);
      delay_queue.push_back(std::make_pair(release, m));
      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
    }
    // Drop all pending deliveries: drain the queue (releasing each message's
    // dispatch-throttle budget and its ref) and cancel all scheduled time
    // events.  The drain runs on the event center via submit_to().
    void discard() {
      stop_dispatch = true;
      center->submit_to(center->get_id(), [this] () mutable {
        std::lock_guard<std::mutex> l(delay_lock);
        while (!delay_queue.empty()) {
          Message *m = delay_queue.front().second;
          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
          m->put();
          delay_queue.pop_front();
        }
        for (auto i : register_time_events)
          center->delete_time_event(i);
        register_time_events.clear();
        stop_dispatch = false;
      }, true);
    }
    // True when no discard is in flight and nothing is queued or scheduled.
    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
    void flush();
  } *delay_state;

 public:
  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
  ~AsyncConnection() override;
  void maybe_start_delay_thread();

  // Debug-log prefix identifying this connection.
  ostream& _conn_prefix(std::ostream *_dout);

  bool is_connected() override {
    return can_write.load() == WriteStatus::CANWRITE;
  }

  // Only call when the AsyncConnection is first constructed.
  void connect(const entity_addr_t& addr, int type) {
    set_peer_type(type);
    set_peer_addr(addr);
    policy = msgr->get_policy(type);
    _connect();
  }
  // Only call when the AsyncConnection is first constructed.
  void accept(ConnectedSocket socket, entity_addr_t &addr);
  int send_message(Message *m) override;

  void send_keepalive() override;
  void mark_down() override;
  // Switch the session policy to lossy; behavioral consequences are defined
  // by Messenger::Policy.
  void mark_disposable() override {
    std::lock_guard<std::mutex> l(lock);
    policy.lossy = true;
  }

 private:
  // Handshake/session state machine.  get_state_name() below must be kept
  // in exactly the same order as this enum.
  enum {
    STATE_NONE,
    STATE_OPEN,
    STATE_OPEN_KEEPALIVE2,
    STATE_OPEN_KEEPALIVE2_ACK,
    STATE_OPEN_TAG_ACK,
    STATE_OPEN_MESSAGE_HEADER,
    STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
    STATE_OPEN_MESSAGE_THROTTLE_BYTES,
    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
    STATE_OPEN_MESSAGE_READ_FRONT,
    STATE_OPEN_MESSAGE_READ_MIDDLE,
    STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
    STATE_OPEN_MESSAGE_READ_DATA,
    STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
    STATE_OPEN_TAG_CLOSE,
    STATE_WAIT_SEND,
    STATE_CONNECTING,
    STATE_CONNECTING_RE,
    STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
    STATE_CONNECTING_SEND_CONNECT_MSG,
    STATE_CONNECTING_WAIT_CONNECT_REPLY,
    STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
    STATE_CONNECTING_WAIT_ACK_SEQ,
    STATE_CONNECTING_READY,
    STATE_ACCEPTING,
    STATE_ACCEPTING_WAIT_BANNER_ADDR,
    STATE_ACCEPTING_WAIT_CONNECT_MSG,
    STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
    STATE_ACCEPTING_WAIT_SEQ,
    STATE_ACCEPTING_READY,
    STATE_STANDBY,
    STATE_CLOSED,
    STATE_WAIT, // just wait for racing connection
  };

  static const int TCP_PREFETCH_MIN_SIZE; // defined in AsyncConnection.cc
  // Map a STATE_* value to its printable name.  No bounds checking: callers
  // must pass a valid enumerator.
  static const char *get_state_name(int state) {
    const char* const statenames[] = {"STATE_NONE",
                                      "STATE_OPEN",
                                      "STATE_OPEN_KEEPALIVE2",
                                      "STATE_OPEN_KEEPALIVE2_ACK",
                                      "STATE_OPEN_TAG_ACK",
                                      "STATE_OPEN_MESSAGE_HEADER",
                                      "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
                                      "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
                                      "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
                                      "STATE_OPEN_MESSAGE_READ_FRONT",
                                      "STATE_OPEN_MESSAGE_READ_MIDDLE",
                                      "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
                                      "STATE_OPEN_MESSAGE_READ_DATA",
                                      "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
                                      "STATE_OPEN_TAG_CLOSE",
                                      "STATE_WAIT_SEND",
                                      "STATE_CONNECTING",
                                      "STATE_CONNECTING_RE",
                                      "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
                                      "STATE_CONNECTING_SEND_CONNECT_MSG",
                                      "STATE_CONNECTING_WAIT_CONNECT_REPLY",
                                      "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
                                      "STATE_CONNECTING_WAIT_ACK_SEQ",
                                      "STATE_CONNECTING_READY",
                                      "STATE_ACCEPTING",
                                      "STATE_ACCEPTING_WAIT_BANNER_ADDR",
                                      "STATE_ACCEPTING_WAIT_CONNECT_MSG",
                                      "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
                                      "STATE_ACCEPTING_WAIT_SEQ",
                                      "STATE_ACCEPTING_READY",
                                      "STATE_STANDBY",
                                      "STATE_CLOSED",
                                      "STATE_WAIT"};
    return statenames[state];
  }

  AsyncMessenger *async_msgr;
  uint64_t conn_id;
  PerfCounters *logger;
  int global_seq;
  __u32 connect_seq, peer_global_seq;
  std::atomic<uint64_t> out_seq{0};
  std::atomic<uint64_t> ack_left{0}, in_seq{0};
  int state;             // current STATE_* value
  int state_after_send;  // state to enter once a pending send completes
  ConnectedSocket cs;
  int port;
  Messenger::Policy policy;

  DispatchQueue *dispatch_queue;

  // lock-free; only touched by the owning thread
  bufferlist outcoming_bl;
  bool open_write = false;

  std::mutex write_lock;
  enum class WriteStatus {
    NOWRITE,
    REPLACING,
    CANWRITE,
    CLOSED
  };
  std::atomic<WriteStatus> can_write;
  list<Message*> sent; // the first bufferlist needs the seq injected
  map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs
  bool keepalive;

  std::mutex lock;
  utime_t backoff; // backoff time
  EventCallbackRef read_handler;
  EventCallbackRef write_handler;
  EventCallbackRef wakeup_handler;
  EventCallbackRef tick_handler;
  struct iovec msgvec[ASYNC_IOV_MAX];
  char *recv_buf;
  uint32_t recv_max_prefetch;
  uint32_t recv_start;
  uint32_t recv_end;
  set<uint64_t> register_time_events; // need to delete these on stop
  ceph::coarse_mono_clock::time_point last_active;
  uint64_t last_tick_id = 0;
  const uint64_t inactive_timeout_us;

  // This section contains temporary variables used by state transitions.

  // Open state
  utime_t recv_stamp;
  utime_t throttle_stamp;
  unsigned msg_left;
  uint64_t cur_msg_size;
  ceph_msg_header current_header;
  bufferlist data_buf;
  bufferlist::iterator data_blp;
  bufferlist front, middle, data;
  ceph_msg_connect connect_msg;
  // Connecting state
  bool got_bad_auth;
  AuthAuthorizer *authorizer;
  bufferlist authorizer_buf;
  ceph_msg_connect_reply connect_reply;
  // Accepting state
  entity_addr_t socket_addr;
  CryptoKey session_key;
  bool replacing;    // When a replace happens, we reply to the connect side
                     // with the RETRY tag and the accept side clears the
                     // replaced connection.  So when the connect side
                     // reissues its connect_msg there is no conflicting
                     // connection; "replacing" lets us skip RESETSESSION and
                     // avoid detecting a spurious reset.
  bool is_reset_from_peer;
  bool once_ready;

  // Only valid for local state; overwritten on each state transition.
  char *state_buffer;
  // used only by "read_until"
  uint64_t state_offset;
  Worker *worker;
  EventCenter *center;
  ceph::shared_ptr<AuthSessionHandler> session_security;
  std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge; // accept side

 public:
  // used by eventcallback
  void handle_write();
  void process();
  void wakeup_from(uint64_t id);
  void tick(uint64_t id);
  void local_deliver();
  // Stop the connection; optionally queue a reset notification.  The lock
  // is deliberately released before queue_reset() is invoked.
  void stop(bool queue_reset) {
    lock.lock();
    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
    _stop();
    lock.unlock();
    if (need_queue_reset)
      dispatch_queue->queue_reset(this);
  }
  // Tear down the socket and delete the event callbacks.  delay_state is
  // nulled after deletion; the handler pointers are not.
  void cleanup() {
    shutdown_socket();
    delete read_handler;
    delete write_handler;
    delete wakeup_handler;
    delete tick_handler;
    if (delay_state) {
      delete delay_state;
      delay_state = NULL;
    }
  }
  PerfCounters *get_perf_counter() {
    return logger;
  }
}; /* AsyncConnection */
406
407 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
408
409 #endif