]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/AsyncConnection.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / AsyncConnection.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
7c673cae
FG
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#ifndef CEPH_MSG_ASYNCCONNECTION_H
18#define CEPH_MSG_ASYNCCONNECTION_H
19
20#include <atomic>
21#include <pthread.h>
7c673cae
FG
22#include <climits>
23#include <list>
24#include <mutex>
25#include <map>
11fdf7f2
TL
26#include <functional>
27#include <optional>
7c673cae
FG
28
29#include "auth/AuthSessionHandler.h"
30#include "common/ceph_time.h"
31#include "common/perf_counters.h"
32#include "include/buffer.h"
33#include "msg/Connection.h"
34#include "msg/Messenger.h"
35
36#include "Event.h"
37#include "Stack.h"
38
39class AsyncMessenger;
11fdf7f2 40class DispatchQueue;
7c673cae 41class Worker;
11fdf7f2 42class Protocol;
7c673cae
FG
43
// Cap derived from the platform's IOV_MAX: when IOV_MAX is below 1024 it is
// used unchanged, otherwise only a quarter of it is used.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024 ? IOV_MAX : IOV_MAX / 4);
45
46/*
47 * AsyncConnection maintains a logical session between two endpoints. In other
48 * words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection
49 * handles network faults and read/write transactions. If a file
50 * descriptor breaks, AsyncConnection preserves the message queue and
51 * sequence, and tries to reconnect to the peer endpoint.
52 */
53class AsyncConnection : public Connection {
54
11fdf7f2
TL
55 ssize_t read(unsigned len, char *buffer,
56 std::function<void(char *, ssize_t)> callback);
57 ssize_t read_until(unsigned needed, char *p);
7c673cae 58 ssize_t read_bulk(char *buf, unsigned len);
11fdf7f2
TL
59
60 ssize_t write(bufferlist &bl, std::function<void(ssize_t)> callback,
61 bool more=false);
7c673cae 62 ssize_t _try_send(bool more=false);
11fdf7f2 63
7c673cae
FG
64 void _connect();
65 void _stop();
7c673cae 66 void fault();
7c673cae 67 void inject_delay();
7c673cae 68
11fdf7f2
TL
69 bool is_queued() const;
70 void shutdown_socket();
7c673cae
FG
71
72 /**
73 * The DelayedDelivery is for injecting delays into Message delivery off
74 * the socket. It is only enabled if delays are requested, and if they
75 * are then it pulls Messages off the DelayQueue and puts them into the
76 * AsyncMessenger event queue.
77 */
78 class DelayedDelivery : public EventCallback {
79 std::set<uint64_t> register_time_events; // need to delete it if stop
11fdf7f2 80 std::deque<Message*> delay_queue;
7c673cae
FG
81 std::mutex delay_lock;
82 AsyncMessenger *msgr;
83 EventCenter *center;
84 DispatchQueue *dispatch_queue;
85 uint64_t conn_id;
86 std::atomic_bool stop_dispatch;
87
88 public:
89 explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
90 DispatchQueue *q, uint64_t cid)
91 : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
92 stop_dispatch(false) { }
93 ~DelayedDelivery() override {
11fdf7f2
TL
94 ceph_assert(register_time_events.empty());
95 ceph_assert(delay_queue.empty());
7c673cae
FG
96 }
97 void set_center(EventCenter *c) { center = c; }
11fdf7f2
TL
98 void do_request(uint64_t id) override;
99 void queue(double delay_period, Message *m) {
7c673cae 100 std::lock_guard<std::mutex> l(delay_lock);
11fdf7f2 101 delay_queue.push_back(m);
7c673cae
FG
102 register_time_events.insert(center->create_time_event(delay_period*1000000, this));
103 }
11fdf7f2 104 void discard();
7c673cae
FG
105 bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
106 void flush();
107 } *delay_state;
108
109 public:
11fdf7f2
TL
110 AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
111 Worker *w, bool is_msgr2, bool local);
7c673cae
FG
112 ~AsyncConnection() override;
113 void maybe_start_delay_thread();
114
// Presumably emits this connection's log prefix onto *_dout (defined out of
// line -- confirm there). The return type is spelled std::ostream, matching
// the parameter, so this header does not depend on a using-directive leaking
// in from another include.
std::ostream& _conn_prefix(std::ostream *_dout);
116
11fdf7f2 117 bool is_connected() override;
7c673cae
FG
118
119 // Only call when the AsyncConnection is first constructed
11fdf7f2
TL
120 void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
121
7c673cae 122 // Only call when the AsyncConnection is first constructed
11fdf7f2
TL
123 void accept(ConnectedSocket socket,
124 const entity_addr_t &listen_addr,
125 const entity_addr_t &peer_addr);
7c673cae
FG
126 int send_message(Message *m) override;
127
128 void send_keepalive() override;
129 void mark_down() override;
130 void mark_disposable() override {
131 std::lock_guard<std::mutex> l(lock);
132 policy.lossy = true;
133 }
11fdf7f2
TL
134
135 entity_addr_t get_peer_socket_addr() const override {
136 return target_addr;
137 }
138
139 int get_con_mode() const override;
140
7c673cae
FG
141 private:
// Connection states. The numeric values are used as indices into the
// statenames[] table in get_state_name(), so both lists must keep the
// same order.
enum {
  STATE_NONE = 0,
  STATE_CONNECTING = 1,
  STATE_CONNECTING_RE = 2,
  STATE_ACCEPTING = 3,
  STATE_CONNECTION_ESTABLISHED = 4,
  STATE_CLOSED = 5
};
150
11fdf7f2 151 static const uint32_t TCP_PREFETCH_MIN_SIZE;
7c673cae
FG
// Maps a STATE_* value to its printable name.
//
// @param state  one of the STATE_* enumerators
// @return the matching name, or "UNKNOWN" when state lies outside the
//         table (previously such a value indexed past the array bounds).
static const char *get_state_name(int state) {
  const char* const statenames[] = {"STATE_NONE",
                                    "STATE_CONNECTING",
                                    "STATE_CONNECTING_RE",
                                    "STATE_ACCEPTING",
                                    "STATE_CONNECTION_ESTABLISHED",
                                    "STATE_CLOSED"};
  // Derive the bound from the table itself so it cannot drift from it.
  const int count =
    static_cast<int>(sizeof(statenames) / sizeof(statenames[0]));
  if (state < 0 || state >= count)
    return "UNKNOWN";
  return statenames[state];
}
161
162 AsyncMessenger *async_msgr;
163 uint64_t conn_id;
164 PerfCounters *logger;
7c673cae 165 int state;
7c673cae
FG
166 ConnectedSocket cs;
167 int port;
168 Messenger::Policy policy;
169
170 DispatchQueue *dispatch_queue;
171
31f18b77
FG
172 // lockfree, only used in own thread
173 bufferlist outcoming_bl;
174 bool open_write = false;
175
7c673cae 176 std::mutex write_lock;
7c673cae
FG
177
178 std::mutex lock;
7c673cae
FG
179 EventCallbackRef read_handler;
180 EventCallbackRef write_handler;
11fdf7f2 181 EventCallbackRef write_callback_handler;
7c673cae
FG
182 EventCallbackRef wakeup_handler;
183 EventCallbackRef tick_handler;
7c673cae
FG
184 char *recv_buf;
185 uint32_t recv_max_prefetch;
186 uint32_t recv_start;
187 uint32_t recv_end;
// Registered time-event ids; they need to be deleted on stop. Spelled
// std::set so the header does not depend on a using-directive leaking in
// from another include.
std::set<uint64_t> register_time_events;
189 ceph::coarse_mono_clock::time_point last_active;
11fdf7f2 190 ceph::mono_clock::time_point recv_start_time;
7c673cae
FG
191 uint64_t last_tick_id = 0;
192 const uint64_t inactive_timeout_us;
193
194 // This section holds temporary variables used by state transitions
195
7c673cae 196 // Accepting state
11fdf7f2
TL
197 bool msgr2 = false;
198 entity_addr_t socket_addr; ///< local socket addr
199 entity_addr_t target_addr; ///< which of the peer_addrs we're connecting to (as client) or should reconnect to (as peer)
200
201 entity_addr_t _infer_target_addr(const entity_addrvec_t& av);
202
7c673cae
FG
203 // used only by "read_until"
204 uint64_t state_offset;
205 Worker *worker;
206 EventCenter *center;
11fdf7f2
TL
207
208 std::unique_ptr<Protocol> protocol;
209
210 std::optional<std::function<void(ssize_t)>> writeCallback;
211 std::function<void(char *, ssize_t)> readCallback;
212 std::optional<unsigned> pendingReadLen;
213 char *read_buffer;
7c673cae
FG
214
215 public:
216 // used by eventcallback
217 void handle_write();
11fdf7f2 218 void handle_write_callback();
7c673cae
FG
219 void process();
220 void wakeup_from(uint64_t id);
221 void tick(uint64_t id);
222 void local_deliver();
11fdf7f2
TL
223 void stop(bool queue_reset);
224 void cleanup();
7c673cae
FG
225 PerfCounters *get_perf_counter() {
226 return logger;
227 }
11fdf7f2
TL
228
229 friend class Protocol;
230 friend class ProtocolV1;
231 friend class ProtocolV2;
7c673cae
FG
232}; /* AsyncConnection */
233
234typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
235
236#endif