1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
// Derived iovec-count limit: IOV_MAX itself when the platform limit is below
// 1024, otherwise a quarter of IOV_MAX.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
47 * AsyncConnection maintains a logical session between two endpoints. In other
48 * words, a pair of addresses identifies exactly one AsyncConnection. An
49 * AsyncConnection handles network faults and read/write transactions. If its
50 * file descriptor breaks, the AsyncConnection keeps the message queue and
51 * sequence state and tries to reconnect to the peer endpoint.
53 class AsyncConnection
: public Connection
{
// Socket I/O helpers (declared here, defined out of line). read() reports
// completion through a callback taking (char *, ssize_t); write() takes a
// ceph::buffer::list and a callback taking ssize_t.
54 ssize_t
read(unsigned len
, char *buffer
,
55 std::function
<void(char *, ssize_t
)> callback
);
// Lower-level read helpers. The state_offset member declared below is
// documented as being used only by read_until().
56 ssize_t
read_until(unsigned needed
, char *p
);
57 ssize_t
read_bulk(char *buf
, unsigned len
);
59 ssize_t
write(ceph::buffer::list
&bl
, std::function
<void(ssize_t
)> callback
,
// `more` defaults to false. NOTE(review): presumably signals that further
// data follows; confirm against the out-of-line definition.
61 ssize_t
_try_send(bool more
=false);
// NOTE(review): presumably a fault-injection hook for testing; confirm in the .cc.
67 bool inject_network_congestion() const;
69 bool is_queued() const;
70 void shutdown_socket();
73 * The DelayedDelivery is for injecting delays into Message delivery off
74 * the socket. It is only enabled if delays are requested, and if they
75 * are then it pulls Messages off the DelayQueue and puts them into the
76 * AsyncMessenger event queue.
78 class DelayedDelivery
: public EventCallback
{
// Ids of time events registered by queue(); the destructor asserts this is empty.
79 std::set
<uint64_t> register_time_events
; // ids must be deleted on stop
// Messages waiting for their delay to elapse; queue() appends under delay_lock.
80 std::deque
<Message
*> delay_queue
;
81 std::mutex delay_lock
;
84 DispatchQueue
*dispatch_queue
;
// Set to false by the constructor; ready() returns false while it is true.
86 std::atomic_bool stop_dispatch
;
// Stores the messenger, event center, dispatch queue and connection id.
89 explicit DelayedDelivery(AsyncMessenger
*omsgr
, EventCenter
*c
,
90 DispatchQueue
*q
, uint64_t cid
)
91 : msgr(omsgr
), center(c
), dispatch_queue(q
), conn_id(cid
),
92 stop_dispatch(false) { }
// Asserts that no time events or delayed messages remain pending.
93 ~DelayedDelivery() override
{
94 ceph_assert(register_time_events
.empty());
95 ceph_assert(delay_queue
.empty());
// Rebinds the EventCenter used by later create_time_event() calls in queue().
97 void set_center(EventCenter
*c
) { center
= c
; }
// EventCallback entry point; defined out of line.
98 void do_request(uint64_t id
) override
;
// Appends m to delay_queue and registers a time event delay_period * 1000000
// ahead, recording its id in register_time_events. Both happen after
// delay_lock is acquired. NOTE(review): the scale factor suggests seconds ->
// microseconds; confirm against EventCenter::create_time_event.
99 void queue(double delay_period
, Message
*m
) {
100 std::lock_guard
<std::mutex
> l(delay_lock
);
101 delay_queue
.push_back(m
);
102 register_time_events
.insert(center
->create_time_event(delay_period
*1000000, this));
// True only when dispatch is not stopped and nothing is queued or scheduled.
105 bool ready() const { return !stop_dispatch
&& delay_queue
.empty() && register_time_events
.empty(); }
// NOTE(review): presumably grants the ref_t factory access to the constructor.
110 FRIEND_MAKE_REF(AsyncConnection
);
111 AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
,
112 Worker
*w
, bool is_msgr2
, bool local
);
113 ~AsyncConnection() override
;
114 bool unregistered
= false;
116 void maybe_start_delay_thread();
118 std::ostream
& _conn_prefix(std::ostream
*_dout
);
120 bool is_connected() override
;
122 // Only call when the AsyncConnection is first constructed
123 void connect(const entity_addrvec_t
& addrs
, int type
, entity_addr_t
& target
);
125 // Only call when the AsyncConnection is first constructed
126 void accept(ConnectedSocket socket
,
127 const entity_addr_t
&listen_addr
,
128 const entity_addr_t
&peer_addr
);
129 int send_message(Message
*m
) override
;
131 void send_keepalive() override
;
132 void mark_down() override
;
// Serialized by the connection's `lock` mutex.
133 void mark_disposable() override
{
134 std::lock_guard
<std::mutex
> l(lock
);
138 entity_addr_t
get_peer_socket_addr() const override
{
142 int get_con_mode() const override
;
144 bool is_unregistered() const {
158 STATE_CONNECTION_ESTABLISHED
,
162 static const uint32_t TCP_PREFETCH_MIN_SIZE
;
// Maps a state value to its printable name. NOTE(review): statenames is
// indexed with `state` without a bounds check, so an out-of-range value is
// undefined behavior.
163 static const char *get_state_name(int state
) {
164 const char* const statenames
[] = {"STATE_NONE",
166 "STATE_CONNECTING_RE",
168 "STATE_CONNECTION_ESTABLISHED",
170 return statenames
[state
];
173 AsyncMessenger
*async_msgr
;
175 PerfCounters
*logger
;
180 Messenger::Policy policy
;
183 DispatchQueue
*dispatch_queue
;
185 // lockfree, only used in own thread
186 ceph::buffer::list outgoing_bl
;
187 bool open_write
= false;
189 std::mutex write_lock
;
192 EventCallbackRef read_handler
;
193 EventCallbackRef write_handler
;
194 EventCallbackRef write_callback_handler
;
195 EventCallbackRef wakeup_handler
;
196 EventCallbackRef tick_handler
;
198 uint32_t recv_max_prefetch
;
201 std::set
<uint64_t> register_time_events
; // ids must be deleted on stop
202 ceph::coarse_mono_clock::time_point last_connect_started
;
203 ceph::coarse_mono_clock::time_point last_active
;
204 ceph::mono_clock::time_point recv_start_time
;
205 uint64_t last_tick_id
= 0;
// Timeouts; presumably in microseconds, per the _us suffix.
206 const uint64_t connect_timeout_us
;
207 const uint64_t inactive_timeout_us
;
209 // This section holds temporary variables used by state transitions
213 entity_addr_t socket_addr
; ///< local socket addr
214 entity_addr_t target_addr
; ///< which of the peer_addrs we're connecting to (as client) or should reconnect to (as peer)
216 entity_addr_t
_infer_target_addr(const entity_addrvec_t
& av
);
218 // used only by "read_until"
219 uint64_t state_offset
;
223 std::unique_ptr
<Protocol
> protocol
;
// NOTE(review): these types match the callback parameters of write() and
// read() above; presumably they hold the pending callbacks and requested
// read length between event-loop wakeups. Confirm in the .cc.
225 std::optional
<std::function
<void(ssize_t
)>> writeCallback
;
226 std::function
<void(char *, ssize_t
)> readCallback
;
227 std::optional
<unsigned> pendingReadLen
;
231 // used by eventcallback
233 void handle_write_callback();
235 void wakeup_from(uint64_t id
);
236 void tick(uint64_t id
);
237 void stop(bool queue_reset
);
239 PerfCounters
*get_perf_counter() {
243 bool is_msgr2() const override
;
// The protocol implementations are friends and may use the private members above.
245 friend class Protocol
;
246 friend class ProtocolV1
;
247 friend class ProtocolV2
;
248 }; /* AsyncConnection */
250 using AsyncConnectionRef
= ceph::ref_t
<AsyncConnection
>;