1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
// Upper bound on iovec entries for the async messenger: IOV_MAX itself when
// the platform limit is below 1024, otherwise one quarter of IOV_MAX.
// NOTE(review): presumably bounds the iovec arrays handed to vectored socket
// I/O -- confirm at the use sites.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
47 * AsyncConnection maintains a logical session between two endpoints. In other
48 * words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection
49 * handles network faults and read/write transactions. If the underlying file
50 * descriptor breaks, AsyncConnection keeps the message queue and sequence
51 * state and tries to reconnect to the peer endpoint.
class AsyncConnection
: public Connection
{
// Reads `len` bytes into `buffer`; `callback` is handed the buffer and an
// ssize_t result. NOTE(review): presumably the callback is parked in
// readCallback/pendingReadLen when the data is not yet available -- confirm
// in the .cpp.
55 ssize_t
read(unsigned len
, char *buffer
,
56 std::function
<void(char *, ssize_t
)> callback
);
// Reads toward `needed` bytes into `p`; the state_offset member is reserved
// for this function. NOTE(review): return-value convention not shown here.
57 ssize_t
read_until(unsigned needed
, char *p
);
// Reads up to `len` bytes into `buf`. NOTE(review): presumably a single raw
// socket read -- confirm.
58 ssize_t
read_bulk(char *buf
, unsigned len
);
// Writes `bl`; `callback` receives an ssize_t result. NOTE(review): presumably
// kept in writeCallback until the write completes -- confirm. This declaration
// is truncated after `callback,` in this file.
60 ssize_t
write(bufferlist
&bl
, std::function
<void(ssize_t
)> callback
,
// Tries to push pending output to the socket; `more` defaults to false.
// NOTE(review): `more` presumably signals that further data will follow -- confirm.
62 ssize_t
_try_send(bool more
=false);
// NOTE(review): presumably reports whether outgoing data is still queued -- confirm.
69 bool is_queued() const;
70 void shutdown_socket();
73 * The DelayedDelivery is for injecting delays into Message delivery off
74 * the socket. It is only enabled if delays are requested, and if they
75 * are then it pulls Messages off the DelayQueue and puts them into the
76 * AsyncMessenger event queue.
78 class DelayedDelivery
: public EventCallback
{
79 std::set
<uint64_t> register_time_events
; // ids returned by center->create_time_event(); must be deleted on stop
80 std::deque
<Message
*> delay_queue
; // messages awaiting delivery; appended under delay_lock in queue()
81 std::mutex delay_lock
;
84 DispatchQueue
*dispatch_queue
;
86 std::atomic_bool stop_dispatch
; // while set, ready() reports false
// Captures the messenger, event center, dispatch queue and connection id;
// dispatch starts in the not-stopped state.
89 explicit DelayedDelivery(AsyncMessenger
*omsgr
, EventCenter
*c
,
90 DispatchQueue
*q
, uint64_t cid
)
91 : msgr(omsgr
), center(c
), dispatch_queue(q
), conn_id(cid
),
92 stop_dispatch(false) { }
// Every scheduled time event and every delayed message must be gone by the
// time this object is destroyed.
93 ~DelayedDelivery() override
{
94 ceph_assert(register_time_events
.empty());
95 ceph_assert(delay_queue
.empty());
// Rebinds the EventCenter that queue() uses to create time events.
97 void set_center(EventCenter
*c
) { center
= c
; }
// EventCallback hook. NOTE(review): presumably invoked when the time event
// `id` fires -- confirm in the .cpp.
98 void do_request(uint64_t id
) override
;
// Appends `m` under delay_lock and schedules a time event after
// delay_period * 1000000; the event id is remembered in register_time_events.
// NOTE(review): presumably delay_period is in seconds and create_time_event
// takes microseconds -- confirm.
99 void queue(double delay_period
, Message
*m
) {
100 std::lock_guard
<std::mutex
> l(delay_lock
);
101 delay_queue
.push_back(m
);
102 register_time_events
.insert(center
->create_time_event(delay_period
*1000000, this));
// True only when dispatch is not stopped and no messages or time events are pending.
105 bool ready() const { return !stop_dispatch
&& delay_queue
.empty() && register_time_events
.empty(); }
// `w` is the Worker this connection is bound to. NOTE(review): `is_msgr2`
// presumably selects ProtocolV2 over ProtocolV1 and `local` presumably marks a
// loopback connection -- confirm in the .cpp.
110 AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
,
111 Worker
*w
, bool is_msgr2
, bool local
);
112 ~AsyncConnection() override
;
// NOTE(review): presumably creates the DelayedDelivery helper when delay
// injection is configured -- confirm.
113 void maybe_start_delay_thread();
// Log-prefix helper taking the debug stream `_dout`.
115 ostream
& _conn_prefix(std::ostream
*_dout
);
117 bool is_connected() override
;
119 // Only call right after the AsyncConnection is constructed.
120 void connect(const entity_addrvec_t
& addrs
, int type
, entity_addr_t
& target
);
122 // Only call right after the AsyncConnection is constructed.
123 void accept(ConnectedSocket socket
,
124 const entity_addr_t
&listen_addr
,
125 const entity_addr_t
&peer_addr
);
126 int send_message(Message
*m
) override
;
128 void send_keepalive() override
;
129 void mark_down() override
;
// Takes the `lock` mutex before updating state.
130 void mark_disposable() override
{
131 std::lock_guard
<std::mutex
> l(lock
);
135 entity_addr_t
get_peer_socket_addr() const override
{
139 int get_con_mode() const override
;
147 STATE_CONNECTION_ESTABLISHED
,
// Declared without an initializer here, so its value is defined out of line.
151 static const uint32_t TCP_PREFETCH_MIN_SIZE
;
// Maps a state index to its printable name.
// NOTE(review): statenames[state] is indexed with no bounds check; callers
// must pass a valid state value.
152 static const char *get_state_name(int state
) {
153 const char* const statenames
[] = {"STATE_NONE",
155 "STATE_CONNECTING_RE",
157 "STATE_CONNECTION_ESTABLISHED",
159 return statenames
[state
];
162 AsyncMessenger
*async_msgr
;
164 PerfCounters
*logger
;
168 Messenger::Policy policy
;
170 DispatchQueue
*dispatch_queue
;
172 // lock-free: only used from the connection's own thread
173 bufferlist outcoming_bl
;
174 bool open_write
= false;
176 std::mutex write_lock
;
// Event callbacks. NOTE(review): presumably registered with the worker's
// EventCenter -- confirm.
179 EventCallbackRef read_handler
;
180 EventCallbackRef write_handler
;
181 EventCallbackRef write_callback_handler
;
182 EventCallbackRef wakeup_handler
;
183 EventCallbackRef tick_handler
;
185 uint32_t recv_max_prefetch
;
188 set
<uint64_t> register_time_events
; // pending time-event ids; must be deleted on stop
// NOTE(review): last_active and inactive_timeout_us (microseconds, per the
// name) presumably drive the idle check in tick() -- confirm.
189 ceph::coarse_mono_clock::time_point last_active
;
190 ceph::mono_clock::time_point recv_start_time
;
191 uint64_t last_tick_id
= 0;
192 const uint64_t inactive_timeout_us
;
194 // This section holds temporary variables used by state transitions
198 entity_addr_t socket_addr
; ///< local socket addr
199 entity_addr_t target_addr
; ///< which of the peer_addrs we're connecting to (as client) or should reconnect to (as peer)
201 entity_addr_t
_infer_target_addr(const entity_addrvec_t
& av
);
203 // used only by "read_until"
204 uint64_t state_offset
;
// Protocol implementation for this connection, owned via unique_ptr.
208 std::unique_ptr
<Protocol
> protocol
;
// Stored completion state for write()/read(); the callback types match the
// `callback` parameters of those functions.
210 std::optional
<std::function
<void(ssize_t
)>> writeCallback
;
211 std::function
<void(char *, ssize_t
)> readCallback
;
212 std::optional
<unsigned> pendingReadLen
;
216 // used by eventcallback
218 void handle_write_callback();
220 void wakeup_from(uint64_t id
);
221 void tick(uint64_t id
);
222 void local_deliver();
223 void stop(bool queue_reset
);
225 PerfCounters
*get_perf_counter() {
// The protocol classes operate directly on this connection's internals.
229 friend class Protocol
;
230 friend class ProtocolV1
;
231 friend class ProtocolV2
;
232 }; /* AsyncConnection */
234 typedef boost::intrusive_ptr
<AsyncConnection
> AsyncConnectionRef
;