]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/AsyncConnection.h
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / msg / async / AsyncConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <climits>
23 #include <list>
24 #include <mutex>
25 #include <map>
26 #include <functional>
27 #include <optional>
28
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
35
36 #include "Event.h"
37 #include "Stack.h"
38
// Forward declarations of project types that this header only refers to
// through pointers, smart pointers or friend declarations.
class AsyncMessenger;
class DispatchQueue;
class Protocol;
class Worker;

// Derived from the platform's IOV_MAX: a quarter of it when IOV_MAX is at
// least 1024, otherwise IOV_MAX unchanged.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
45
/*
 * AsyncConnection maintains a logical session between two endpoints. In other
 * words, a pair of addresses maps to exactly one AsyncConnection.
 * AsyncConnection handles network faults and read/write transactions. If a
 * file descriptor breaks, AsyncConnection keeps the message queue and
 * sequence and tries to reconnect to the peer endpoint.
 */
53 class AsyncConnection : public Connection {
54 ssize_t read(unsigned len, char *buffer,
55 std::function<void(char *, ssize_t)> callback);
56 ssize_t read_until(unsigned needed, char *p);
57 ssize_t read_bulk(char *buf, unsigned len);
58
59 ssize_t write(ceph::buffer::list &bl, std::function<void(ssize_t)> callback,
60 bool more=false);
61 ssize_t _try_send(bool more=false);
62
63 void _connect();
64 void _stop();
65 void fault();
66 void inject_delay();
67 bool inject_network_congestion() const;
68
69 bool is_queued() const;
70 void shutdown_socket();
71
72 /**
73 * The DelayedDelivery is for injecting delays into Message delivery off
74 * the socket. It is only enabled if delays are requested, and if they
75 * are then it pulls Messages off the DelayQueue and puts them into the
76 * AsyncMessenger event queue.
77 */
78 class DelayedDelivery : public EventCallback {
79 std::set<uint64_t> register_time_events; // need to delete it if stop
80 std::deque<Message*> delay_queue;
81 std::mutex delay_lock;
82 AsyncMessenger *msgr;
83 EventCenter *center;
84 DispatchQueue *dispatch_queue;
85 uint64_t conn_id;
86 std::atomic_bool stop_dispatch;
87
88 public:
89 explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
90 DispatchQueue *q, uint64_t cid)
91 : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
92 stop_dispatch(false) { }
93 ~DelayedDelivery() override {
94 ceph_assert(register_time_events.empty());
95 ceph_assert(delay_queue.empty());
96 }
97 void set_center(EventCenter *c) { center = c; }
98 void do_request(uint64_t id) override;
99 void queue(double delay_period, Message *m) {
100 std::lock_guard<std::mutex> l(delay_lock);
101 delay_queue.push_back(m);
102 register_time_events.insert(center->create_time_event(delay_period*1000000, this));
103 }
104 void discard();
105 bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
106 void flush();
107 } *delay_state;
108
109 private:
110 FRIEND_MAKE_REF(AsyncConnection);
111 AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
112 Worker *w, bool is_msgr2, bool local);
113 ~AsyncConnection() override;
114 bool unregistered = false;
115 public:
116 void maybe_start_delay_thread();
117
118 std::ostream& _conn_prefix(std::ostream *_dout);
119
120 bool is_connected() override;
121
122 // Only call when AsyncConnection first construct
123 void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
124
125 // Only call when AsyncConnection first construct
126 void accept(ConnectedSocket socket,
127 const entity_addr_t &listen_addr,
128 const entity_addr_t &peer_addr);
129 int send_message(Message *m) override;
130
131 void send_keepalive() override;
132 void mark_down() override;
133 void mark_disposable() override {
134 std::lock_guard<std::mutex> l(lock);
135 policy.lossy = true;
136 }
137
138 entity_addr_t get_peer_socket_addr() const override {
139 return target_addr;
140 }
141
142 int get_con_mode() const override;
143
144 bool is_unregistered() const {
145 return unregistered;
146 }
147
148 void unregister() {
149 unregistered = true;
150 }
151
152 private:
153 enum {
154 STATE_NONE,
155 STATE_CONNECTING,
156 STATE_CONNECTING_RE,
157 STATE_ACCEPTING,
158 STATE_CONNECTION_ESTABLISHED,
159 STATE_CLOSED
160 };
161
162 static const uint32_t TCP_PREFETCH_MIN_SIZE;
163 static const char *get_state_name(int state) {
164 const char* const statenames[] = {"STATE_NONE",
165 "STATE_CONNECTING",
166 "STATE_CONNECTING_RE",
167 "STATE_ACCEPTING",
168 "STATE_CONNECTION_ESTABLISHED",
169 "STATE_CLOSED"};
170 return statenames[state];
171 }
172
173 AsyncMessenger *async_msgr;
174 uint64_t conn_id;
175 PerfCounters *logger;
176 int state;
177 ConnectedSocket cs;
178 int port;
179 public:
180 Messenger::Policy policy;
181 private:
182
183 DispatchQueue *dispatch_queue;
184
185 // lockfree, only used in own thread
186 ceph::buffer::list outgoing_bl;
187 bool open_write = false;
188
189 std::mutex write_lock;
190
191 std::mutex lock;
192 EventCallbackRef read_handler;
193 EventCallbackRef write_handler;
194 EventCallbackRef write_callback_handler;
195 EventCallbackRef wakeup_handler;
196 EventCallbackRef tick_handler;
197 char *recv_buf;
198 uint32_t recv_max_prefetch;
199 uint32_t recv_start;
200 uint32_t recv_end;
201 std::set<uint64_t> register_time_events; // need to delete it if stop
202 ceph::coarse_mono_clock::time_point last_connect_started;
203 ceph::coarse_mono_clock::time_point last_active;
204 ceph::mono_clock::time_point recv_start_time;
205 uint64_t last_tick_id = 0;
206 const uint64_t connect_timeout_us;
207 const uint64_t inactive_timeout_us;
208
209 // Tis section are temp variables used by state transition
210
211 // Accepting state
212 bool msgr2 = false;
213 entity_addr_t socket_addr; ///< local socket addr
214 entity_addr_t target_addr; ///< which of the peer_addrs we're connecting to (as clienet) or should reconnect to (as peer)
215
216 entity_addr_t _infer_target_addr(const entity_addrvec_t& av);
217
218 // used only by "read_until"
219 uint64_t state_offset;
220 Worker *worker;
221 EventCenter *center;
222
223 std::unique_ptr<Protocol> protocol;
224
225 std::optional<std::function<void(ssize_t)>> writeCallback;
226 std::function<void(char *, ssize_t)> readCallback;
227 std::optional<unsigned> pendingReadLen;
228 char *read_buffer;
229
230 public:
231 // used by eventcallback
232 void handle_write();
233 void handle_write_callback();
234 void process();
235 void wakeup_from(uint64_t id);
236 void tick(uint64_t id);
237 void stop(bool queue_reset);
238 void cleanup();
239 PerfCounters *get_perf_counter() {
240 return logger;
241 }
242
243 bool is_msgr2() const override;
244
245 friend class Protocol;
246 friend class ProtocolV1;
247 friend class ProtocolV2;
248 }; /* AsyncConnection */
249
250 using AsyncConnectionRef = ceph::ref_t<AsyncConnection>;
251
252 #endif