]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncConnection.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / AsyncConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <climits>
23 #include <list>
24 #include <mutex>
25 #include <map>
26 #include <functional>
27 #include <optional>
28
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
35
36 #include "Event.h"
37 #include "Stack.h"
38
// Forward declarations. This header only refers to these types through
// pointers (and, for Protocol, a std::unique_ptr member), so their full
// definitions are not needed here.
class AsyncMessenger;
class DispatchQueue;
class Protocol;
class Worker;
43
// Derived from the platform's IOV_MAX: when IOV_MAX is 1024 or larger the
// value is one quarter of it, otherwise it is IOV_MAX unchanged.
// NOTE(review): presumably the cap on iovec entries per vectored send --
// confirm at the use sites.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
45
/*
 * AsyncConnection maintains a logical session between two endpoints. In other
 * words, a pair of addresses identifies exactly one AsyncConnection.
 * AsyncConnection handles network faults and read/write transactions. If a
 * file descriptor breaks, AsyncConnection keeps the message queue and
 * sequence and tries to reconnect to the peer endpoint.
 */
53 class AsyncConnection : public Connection {
54
55 ssize_t read(unsigned len, char *buffer,
56 std::function<void(char *, ssize_t)> callback);
57 ssize_t read_until(unsigned needed, char *p);
58 ssize_t read_bulk(char *buf, unsigned len);
59
60 ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
61 bool more=false);
62 ssize_t _try_send(bool more=false);
63
64 void _connect();
65 void _stop();
66 void fault();
67 void inject_delay();
68
69 bool is_queued() const;
70 void shutdown_socket();
71
72 /**
73 * The DelayedDelivery is for injecting delays into Message delivery off
74 * the socket. It is only enabled if delays are requested, and if they
75 * are then it pulls Messages off the DelayQueue and puts them into the
76 * AsyncMessenger event queue.
77 */
78 class DelayedDelivery : public EventCallback {
79 std::set<uint64_t> register_time_events; // need to delete it if stop
80 std::deque<Message*> delay_queue;
81 std::mutex delay_lock;
82 AsyncMessenger *msgr;
83 EventCenter *center;
84 DispatchQueue *dispatch_queue;
85 uint64_t conn_id;
86 std::atomic_bool stop_dispatch;
87
88 public:
89 explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
90 DispatchQueue *q, uint64_t cid)
91 : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
92 stop_dispatch(false) { }
93 ~DelayedDelivery() override {
94 ceph_assert(register_time_events.empty());
95 ceph_assert(delay_queue.empty());
96 }
97 void set_center(EventCenter *c) { center = c; }
98 void do_request(uint64_t id) override;
99 void queue(double delay_period, Message *m) {
100 std::lock_guard<std::mutex> l(delay_lock);
101 delay_queue.push_back(m);
102 register_time_events.insert(center->create_time_event(delay_period*1000000, this));
103 }
104 void discard();
105 bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
106 void flush();
107 } *delay_state;
108
109 public:
110 AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
111 Worker *w, bool is_msgr2, bool local);
112 ~AsyncConnection() override;
113 void maybe_start_delay_thread();
114
115 ostream& _conn_prefix(std::ostream *_dout);
116
117 bool is_connected() override;
118
119 // Only call when AsyncConnection first construct
120 void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
121
122 // Only call when AsyncConnection first construct
123 void accept(ConnectedSocket socket,
124 const entity_addr_t &listen_addr,
125 const entity_addr_t &peer_addr);
126 int send_message(Message *m) override;
127
128 void send_keepalive() override;
129 void mark_down() override;
130 void mark_disposable() override {
131 std::lock_guard<std::mutex> l(lock);
132 policy.lossy = true;
133 }
134
135 entity_addr_t get_peer_socket_addr() const override {
136 return target_addr;
137 }
138
139 int get_con_mode() const override;
140
141 private:
142 enum {
143 STATE_NONE,
144 STATE_CONNECTING,
145 STATE_CONNECTING_RE,
146 STATE_ACCEPTING,
147 STATE_CONNECTION_ESTABLISHED,
148 STATE_CLOSED
149 };
150
151 static const uint32_t TCP_PREFETCH_MIN_SIZE;
152 static const char *get_state_name(int state) {
153 const char* const statenames[] = {"STATE_NONE",
154 "STATE_CONNECTING",
155 "STATE_CONNECTING_RE",
156 "STATE_ACCEPTING",
157 "STATE_CONNECTION_ESTABLISHED",
158 "STATE_CLOSED"};
159 return statenames[state];
160 }
161
162 AsyncMessenger *async_msgr;
163 uint64_t conn_id;
164 PerfCounters *logger;
165 int state;
166 ConnectedSocket cs;
167 int port;
168 Messenger::Policy policy;
169
170 DispatchQueue *dispatch_queue;
171
172 // lockfree, only used in own thread
173 bufferlist outcoming_bl;
174 bool open_write = false;
175
176 std::mutex write_lock;
177
178 std::mutex lock;
179 EventCallbackRef read_handler;
180 EventCallbackRef write_handler;
181 EventCallbackRef write_callback_handler;
182 EventCallbackRef wakeup_handler;
183 EventCallbackRef tick_handler;
184 char *recv_buf;
185 uint32_t recv_max_prefetch;
186 uint32_t recv_start;
187 uint32_t recv_end;
188 set<uint64_t> register_time_events; // need to delete it if stop
189 ceph::coarse_mono_clock::time_point last_active;
190 ceph::mono_clock::time_point recv_start_time;
191 uint64_t last_tick_id = 0;
192 const uint64_t inactive_timeout_us;
193
194 // Tis section are temp variables used by state transition
195
196 // Accepting state
197 bool msgr2 = false;
198 entity_addr_t socket_addr; ///< local socket addr
199 entity_addr_t target_addr; ///< which of the peer_addrs we're connecting to (as clienet) or should reconnect to (as peer)
200
201 entity_addr_t _infer_target_addr(const entity_addrvec_t& av);
202
203 // used only by "read_until"
204 uint64_t state_offset;
205 Worker *worker;
206 EventCenter *center;
207
208 std::unique_ptr<Protocol> protocol;
209
210 std::optional<std::function<void(ssize_t)>> writeCallback;
211 std::function<void(char *, ssize_t)> readCallback;
212 std::optional<unsigned> pendingReadLen;
213 char *read_buffer;
214
215 public:
216 // used by eventcallback
217 void handle_write();
218 void handle_write_callback();
219 void process();
220 void wakeup_from(uint64_t id);
221 void tick(uint64_t id);
222 void local_deliver();
223 void stop(bool queue_reset);
224 void cleanup();
225 PerfCounters *get_perf_counter() {
226 return logger;
227 }
228
229 friend class Protocol;
230 friend class ProtocolV1;
231 friend class ProtocolV2;
232 }; /* AsyncConnection */
233
234 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
235
236 #endif