]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #ifndef CEPH_MSG_ASYNCCONNECTION_H | |
18 | #define CEPH_MSG_ASYNCCONNECTION_H | |
19 | ||
#include <pthread.h>

#include <atomic>
#include <climits>
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>

#include "auth/AuthSessionHandler.h"
#include "common/ceph_time.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"

#include "Event.h"
#include "Stack.h"
38 | ||
// Forward declarations for types referenced only by pointer/reference here.
class AsyncMessenger;
class DispatchQueue;
class Protocol;
class Worker;

// iovec count limit derived from the platform's IOV_MAX: platforms that allow
// at least 1024 entries get a quarter of IOV_MAX, otherwise IOV_MAX is used
// unchanged.
static constexpr int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : IOV_MAX / 4;
45 | ||
46 | /* | |
47 | * AsyncConnection maintains a logic session between two endpoints. In other | |
48 | * word, a pair of addresses can find the only AsyncConnection. AsyncConnection | |
49 | * will handle with network fault or read/write transactions. If one file | |
50 | * descriptor broken, AsyncConnection will maintain the message queue and | |
51 | * sequence, try to reconnect peer endpoint. | |
52 | */ | |
53 | class AsyncConnection : public Connection { | |
54 | ||
11fdf7f2 TL |
55 | ssize_t read(unsigned len, char *buffer, |
56 | std::function<void(char *, ssize_t)> callback); | |
57 | ssize_t read_until(unsigned needed, char *p); | |
7c673cae | 58 | ssize_t read_bulk(char *buf, unsigned len); |
11fdf7f2 TL |
59 | |
60 | ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback, | |
61 | bool more=false); | |
7c673cae | 62 | ssize_t _try_send(bool more=false); |
11fdf7f2 | 63 | |
7c673cae FG |
64 | void _connect(); |
65 | void _stop(); | |
7c673cae | 66 | void fault(); |
7c673cae | 67 | void inject_delay(); |
7c673cae | 68 | |
11fdf7f2 TL |
69 | bool is_queued() const; |
70 | void shutdown_socket(); | |
7c673cae FG |
71 | |
72 | /** | |
73 | * The DelayedDelivery is for injecting delays into Message delivery off | |
74 | * the socket. It is only enabled if delays are requested, and if they | |
75 | * are then it pulls Messages off the DelayQueue and puts them into the | |
76 | * AsyncMessenger event queue. | |
77 | */ | |
78 | class DelayedDelivery : public EventCallback { | |
79 | std::set<uint64_t> register_time_events; // need to delete it if stop | |
11fdf7f2 | 80 | std::deque<Message*> delay_queue; |
7c673cae FG |
81 | std::mutex delay_lock; |
82 | AsyncMessenger *msgr; | |
83 | EventCenter *center; | |
84 | DispatchQueue *dispatch_queue; | |
85 | uint64_t conn_id; | |
86 | std::atomic_bool stop_dispatch; | |
87 | ||
88 | public: | |
89 | explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, | |
90 | DispatchQueue *q, uint64_t cid) | |
91 | : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), | |
92 | stop_dispatch(false) { } | |
93 | ~DelayedDelivery() override { | |
11fdf7f2 TL |
94 | ceph_assert(register_time_events.empty()); |
95 | ceph_assert(delay_queue.empty()); | |
7c673cae FG |
96 | } |
97 | void set_center(EventCenter *c) { center = c; } | |
11fdf7f2 TL |
98 | void do_request(uint64_t id) override; |
99 | void queue(double delay_period, Message *m) { | |
7c673cae | 100 | std::lock_guard<std::mutex> l(delay_lock); |
11fdf7f2 | 101 | delay_queue.push_back(m); |
7c673cae FG |
102 | register_time_events.insert(center->create_time_event(delay_period*1000000, this)); |
103 | } | |
11fdf7f2 | 104 | void discard(); |
7c673cae FG |
105 | bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } |
106 | void flush(); | |
107 | } *delay_state; | |
108 | ||
109 | public: | |
11fdf7f2 TL |
110 | AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, |
111 | Worker *w, bool is_msgr2, bool local); | |
7c673cae FG |
112 | ~AsyncConnection() override; |
113 | void maybe_start_delay_thread(); | |
114 | ||
115 | ostream& _conn_prefix(std::ostream *_dout); | |
116 | ||
11fdf7f2 | 117 | bool is_connected() override; |
7c673cae FG |
118 | |
119 | // Only call when AsyncConnection first construct | |
11fdf7f2 TL |
120 | void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target); |
121 | ||
7c673cae | 122 | // Only call when AsyncConnection first construct |
11fdf7f2 TL |
123 | void accept(ConnectedSocket socket, |
124 | const entity_addr_t &listen_addr, | |
125 | const entity_addr_t &peer_addr); | |
7c673cae FG |
126 | int send_message(Message *m) override; |
127 | ||
128 | void send_keepalive() override; | |
129 | void mark_down() override; | |
130 | void mark_disposable() override { | |
131 | std::lock_guard<std::mutex> l(lock); | |
132 | policy.lossy = true; | |
133 | } | |
11fdf7f2 TL |
134 | |
135 | entity_addr_t get_peer_socket_addr() const override { | |
136 | return target_addr; | |
137 | } | |
138 | ||
139 | int get_con_mode() const override; | |
140 | ||
7c673cae FG |
141 | private: |
142 | enum { | |
143 | STATE_NONE, | |
7c673cae FG |
144 | STATE_CONNECTING, |
145 | STATE_CONNECTING_RE, | |
7c673cae | 146 | STATE_ACCEPTING, |
11fdf7f2 TL |
147 | STATE_CONNECTION_ESTABLISHED, |
148 | STATE_CLOSED | |
7c673cae FG |
149 | }; |
150 | ||
11fdf7f2 | 151 | static const uint32_t TCP_PREFETCH_MIN_SIZE; |
7c673cae FG |
152 | static const char *get_state_name(int state) { |
153 | const char* const statenames[] = {"STATE_NONE", | |
7c673cae FG |
154 | "STATE_CONNECTING", |
155 | "STATE_CONNECTING_RE", | |
7c673cae | 156 | "STATE_ACCEPTING", |
11fdf7f2 TL |
157 | "STATE_CONNECTION_ESTABLISHED", |
158 | "STATE_CLOSED"}; | |
7c673cae FG |
159 | return statenames[state]; |
160 | } | |
161 | ||
162 | AsyncMessenger *async_msgr; | |
163 | uint64_t conn_id; | |
164 | PerfCounters *logger; | |
7c673cae | 165 | int state; |
7c673cae FG |
166 | ConnectedSocket cs; |
167 | int port; | |
168 | Messenger::Policy policy; | |
169 | ||
170 | DispatchQueue *dispatch_queue; | |
171 | ||
31f18b77 FG |
172 | // lockfree, only used in own thread |
173 | bufferlist outcoming_bl; | |
174 | bool open_write = false; | |
175 | ||
7c673cae | 176 | std::mutex write_lock; |
7c673cae FG |
177 | |
178 | std::mutex lock; | |
7c673cae FG |
179 | EventCallbackRef read_handler; |
180 | EventCallbackRef write_handler; | |
11fdf7f2 | 181 | EventCallbackRef write_callback_handler; |
7c673cae FG |
182 | EventCallbackRef wakeup_handler; |
183 | EventCallbackRef tick_handler; | |
7c673cae FG |
184 | char *recv_buf; |
185 | uint32_t recv_max_prefetch; | |
186 | uint32_t recv_start; | |
187 | uint32_t recv_end; | |
188 | set<uint64_t> register_time_events; // need to delete it if stop | |
189 | ceph::coarse_mono_clock::time_point last_active; | |
11fdf7f2 | 190 | ceph::mono_clock::time_point recv_start_time; |
7c673cae FG |
191 | uint64_t last_tick_id = 0; |
192 | const uint64_t inactive_timeout_us; | |
193 | ||
194 | // Tis section are temp variables used by state transition | |
195 | ||
7c673cae | 196 | // Accepting state |
11fdf7f2 TL |
197 | bool msgr2 = false; |
198 | entity_addr_t socket_addr; ///< local socket addr | |
199 | entity_addr_t target_addr; ///< which of the peer_addrs we're connecting to (as clienet) or should reconnect to (as peer) | |
200 | ||
201 | entity_addr_t _infer_target_addr(const entity_addrvec_t& av); | |
202 | ||
7c673cae FG |
203 | // used only by "read_until" |
204 | uint64_t state_offset; | |
205 | Worker *worker; | |
206 | EventCenter *center; | |
11fdf7f2 TL |
207 | |
208 | std::unique_ptr<Protocol> protocol; | |
209 | ||
210 | std::optional<std::function<void(ssize_t)>> writeCallback; | |
211 | std::function<void(char *, ssize_t)> readCallback; | |
212 | std::optional<unsigned> pendingReadLen; | |
213 | char *read_buffer; | |
7c673cae FG |
214 | |
215 | public: | |
216 | // used by eventcallback | |
217 | void handle_write(); | |
11fdf7f2 | 218 | void handle_write_callback(); |
7c673cae FG |
219 | void process(); |
220 | void wakeup_from(uint64_t id); | |
221 | void tick(uint64_t id); | |
222 | void local_deliver(); | |
11fdf7f2 TL |
223 | void stop(bool queue_reset); |
224 | void cleanup(); | |
7c673cae FG |
225 | PerfCounters *get_perf_counter() { |
226 | return logger; | |
227 | } | |
11fdf7f2 TL |
228 | |
229 | friend class Protocol; | |
230 | friend class ProtocolV1; | |
231 | friend class ProtocolV2; | |
7c673cae FG |
232 | }; /* AsyncConnection */ |
233 | ||
234 | typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef; | |
235 | ||
236 | #endif |