1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
29 #include "auth/AuthSessionHandler.h"
30 #include "common/ceph_time.h"
31 #include "common/perf_counters.h"
32 #include "include/buffer.h"
33 #include "msg/Connection.h"
34 #include "msg/Messenger.h"
// Upper bound on the number of iovec entries used per send. When the platform
// limit IOV_MAX is large (>= 1024) only a quarter of it is used; otherwise the
// full platform limit applies.
static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024) ? (IOV_MAX / 4) : IOV_MAX;
45 * AsyncConnection maintains a logical session between two endpoints. In other
46 * words, a pair of addresses identifies exactly one AsyncConnection. AsyncConnection
47 * handles network faults and read/write transactions. If the underlying file
48 * descriptor breaks, AsyncConnection preserves the message queue and
49 * sequence state and tries to reconnect to the peer endpoint.
51 class AsyncConnection
: public Connection
{
53 ssize_t
read_bulk(char *buf
, unsigned len
);
54 ssize_t
do_sendmsg(struct msghdr
&msg
, unsigned len
, bool more
);
55 ssize_t
try_send(bufferlist
&bl
, bool more
=false) {
56 std::lock_guard
<std::mutex
> l(write_lock
);
57 outcoming_bl
.claim_append(bl
);
58 return _try_send(more
);
60 ssize_t
_try_send(bool more
=false);
61 ssize_t
_send(Message
*m
);
62 void prepare_send_message(uint64_t features
, Message
*m
, bufferlist
&bl
);
63 ssize_t
read_until(unsigned needed
, char *p
);
64 ssize_t
_process_connection();
67 int handle_connect_reply(ceph_msg_connect
&connect
, ceph_msg_connect_reply
&r
);
68 ssize_t
handle_connect_msg(ceph_msg_connect
&m
, bufferlist
&aubl
, bufferlist
&bl
);
69 void was_session_reset();
71 void discard_out_queue();
72 void discard_requeued_up_to(uint64_t seq
);
74 int randomize_out_seq();
75 void handle_ack(uint64_t seq
);
76 void _append_keepalive_or_ack(bool ack
=false, utime_t
*t
=NULL
);
77 ssize_t
write_message(Message
*m
, bufferlist
& bl
, bool more
);
79 ssize_t
_reply_accept(char tag
, ceph_msg_connect
&connect
, ceph_msg_connect_reply
&reply
,
80 bufferlist
&authorizer_reply
) {
83 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
84 reply
.authorizer_len
= authorizer_reply
.length();
85 reply_bl
.append((char*)&reply
, sizeof(reply
));
86 if (reply
.authorizer_len
) {
87 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
89 ssize_t r
= try_send(reply_bl
);
95 state
= STATE_ACCEPTING_WAIT_CONNECT_MSG
;
98 bool is_queued() const {
99 return !out_q
.empty() || outcoming_bl
.length();
101 void shutdown_socket() {
102 for (auto &&t
: register_time_events
)
103 center
->delete_time_event(t
);
104 register_time_events
.clear();
106 center
->delete_time_event(last_tick_id
);
110 center
->delete_file_event(cs
.fd(), EVENT_READABLE
|EVENT_WRITABLE
);
115 Message
*_get_next_outgoing(bufferlist
*bl
) {
117 while (!m
&& !out_q
.empty()) {
118 map
<int, list
<pair
<bufferlist
, Message
*> > >::reverse_iterator it
= out_q
.rbegin();
119 if (!it
->second
.empty()) {
120 list
<pair
<bufferlist
, Message
*> >::iterator p
= it
->second
.begin();
126 if (it
->second
.empty())
127 out_q
.erase(it
->first
);
131 bool _has_next_outgoing() const {
132 return !out_q
.empty();
134 void reset_recv_state();
137 * The DelayedDelivery is for injecting delays into Message delivery off
138 * the socket. It is only enabled if delays are requested, and if they
139 * are then it pulls Messages off the DelayQueue and puts them into the
140 * AsyncMessenger event queue.
142 class DelayedDelivery
: public EventCallback
{
143 std::set
<uint64_t> register_time_events
; // need to delete it if stop
144 std::deque
<std::pair
<utime_t
, Message
*> > delay_queue
;
145 std::mutex delay_lock
;
146 AsyncMessenger
*msgr
;
148 DispatchQueue
*dispatch_queue
;
150 std::atomic_bool stop_dispatch
;
153 explicit DelayedDelivery(AsyncMessenger
*omsgr
, EventCenter
*c
,
154 DispatchQueue
*q
, uint64_t cid
)
155 : msgr(omsgr
), center(c
), dispatch_queue(q
), conn_id(cid
),
156 stop_dispatch(false) { }
157 ~DelayedDelivery() override
{
158 assert(register_time_events
.empty());
159 assert(delay_queue
.empty());
161 void set_center(EventCenter
*c
) { center
= c
; }
162 void do_request(int id
) override
;
163 void queue(double delay_period
, utime_t release
, Message
*m
) {
164 std::lock_guard
<std::mutex
> l(delay_lock
);
165 delay_queue
.push_back(std::make_pair(release
, m
));
166 register_time_events
.insert(center
->create_time_event(delay_period
*1000000, this));
169 stop_dispatch
= true;
170 center
->submit_to(center
->get_id(), [this] () mutable {
171 std::lock_guard
<std::mutex
> l(delay_lock
);
172 while (!delay_queue
.empty()) {
173 Message
*m
= delay_queue
.front().second
;
174 dispatch_queue
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
176 delay_queue
.pop_front();
178 for (auto i
: register_time_events
)
179 center
->delete_time_event(i
);
180 register_time_events
.clear();
181 stop_dispatch
= false;
184 bool ready() const { return !stop_dispatch
&& delay_queue
.empty() && register_time_events
.empty(); }
189 AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
, Worker
*w
);
190 ~AsyncConnection() override
;
191 void maybe_start_delay_thread();
193 ostream
& _conn_prefix(std::ostream
*_dout
);
195 bool is_connected() override
{
196 return can_write
.load() == WriteStatus::CANWRITE
;
199 // Call only when the AsyncConnection is first constructed
200 void connect(const entity_addr_t
& addr
, int type
) {
203 policy
= msgr
->get_policy(type
);
206 // Call only when the AsyncConnection is first constructed
207 void accept(ConnectedSocket socket
, entity_addr_t
&addr
);
208 int send_message(Message
*m
) override
;
210 void send_keepalive() override
;
211 void mark_down() override
;
212 void mark_disposable() override
{
213 std::lock_guard
<std::mutex
> l(lock
);
221 STATE_OPEN_KEEPALIVE2
,
222 STATE_OPEN_KEEPALIVE2_ACK
,
224 STATE_OPEN_MESSAGE_HEADER
,
225 STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
,
226 STATE_OPEN_MESSAGE_THROTTLE_BYTES
,
227 STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
,
228 STATE_OPEN_MESSAGE_READ_FRONT
,
229 STATE_OPEN_MESSAGE_READ_MIDDLE
,
230 STATE_OPEN_MESSAGE_READ_DATA_PREPARE
,
231 STATE_OPEN_MESSAGE_READ_DATA
,
232 STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
,
233 STATE_OPEN_TAG_CLOSE
,
237 STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
,
238 STATE_CONNECTING_SEND_CONNECT_MSG
,
239 STATE_CONNECTING_WAIT_CONNECT_REPLY
,
240 STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
,
241 STATE_CONNECTING_WAIT_ACK_SEQ
,
242 STATE_CONNECTING_READY
,
244 STATE_ACCEPTING_WAIT_BANNER_ADDR
,
245 STATE_ACCEPTING_WAIT_CONNECT_MSG
,
246 STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
,
247 STATE_ACCEPTING_WAIT_SEQ
,
248 STATE_ACCEPTING_READY
,
251 STATE_WAIT
, // just wait for racing connection
254 static const int TCP_PREFETCH_MIN_SIZE
;
255 static const char *get_state_name(int state
) {
256 const char* const statenames
[] = {"STATE_NONE",
258 "STATE_OPEN_KEEPALIVE2",
259 "STATE_OPEN_KEEPALIVE2_ACK",
260 "STATE_OPEN_TAG_ACK",
261 "STATE_OPEN_MESSAGE_HEADER",
262 "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
263 "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
264 "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
265 "STATE_OPEN_MESSAGE_READ_FRONT",
266 "STATE_OPEN_MESSAGE_READ_MIDDLE",
267 "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
268 "STATE_OPEN_MESSAGE_READ_DATA",
269 "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
270 "STATE_OPEN_TAG_CLOSE",
273 "STATE_CONNECTING_RE",
274 "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
275 "STATE_CONNECTING_SEND_CONNECT_MSG",
276 "STATE_CONNECTING_WAIT_CONNECT_REPLY",
277 "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
278 "STATE_CONNECTING_WAIT_ACK_SEQ",
279 "STATE_CONNECTING_READY",
281 "STATE_ACCEPTING_WAIT_BANNER_ADDR",
282 "STATE_ACCEPTING_WAIT_CONNECT_MSG",
283 "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
284 "STATE_ACCEPTING_WAIT_SEQ",
285 "STATE_ACCEPTING_READY",
289 return statenames
[state
];
292 AsyncMessenger
*async_msgr
;
294 PerfCounters
*logger
;
296 __u32 connect_seq
, peer_global_seq
;
298 atomic64_t ack_left
, in_seq
;
300 int state_after_send
;
303 Messenger::Policy policy
;
305 DispatchQueue
*dispatch_queue
;
307 std::mutex write_lock
;
308 enum class WriteStatus
{
314 std::atomic
<WriteStatus
> can_write
;
316 map
<int, list
<pair
<bufferlist
, Message
*> > > out_q
; // priority queue for outbound msgs
317 list
<Message
*> sent
; // the first bufferlist need to inject seq
318 bufferlist outcoming_bl
;
322 utime_t backoff
; // backoff time
323 EventCallbackRef read_handler
;
324 EventCallbackRef write_handler
;
325 EventCallbackRef wakeup_handler
;
326 EventCallbackRef tick_handler
;
327 struct iovec msgvec
[ASYNC_IOV_MAX
];
329 uint32_t recv_max_prefetch
;
332 set
<uint64_t> register_time_events
; // need to delete it if stop
333 ceph::coarse_mono_clock::time_point last_active
;
334 uint64_t last_tick_id
= 0;
335 const uint64_t inactive_timeout_us
;
337 // This section holds temporary variables used during state transitions
341 utime_t throttle_stamp
;
343 uint64_t cur_msg_size
;
344 ceph_msg_header current_header
;
346 bufferlist::iterator data_blp
;
347 bufferlist front
, middle
, data
;
348 ceph_msg_connect connect_msg
;
351 AuthAuthorizer
*authorizer
;
352 bufferlist authorizer_buf
;
353 ceph_msg_connect_reply connect_reply
;
355 entity_addr_t socket_addr
;
356 CryptoKey session_key
;
357 bool replacing
; // when replacing process happened, we will reply connect
358 // side with the RETRY tag and the accept side will clear the replaced
359 // connection. So when the connect side reissues connect_msg,
360 // there won't exist a conflicting connection, so we use
361 // "replacing" to skip RESETSESSION to avoid detecting wrong
363 bool is_reset_from_peer
;
366 // used only for local state; it is overwritten on each state transition
368 // used only by "read_until"
369 uint64_t state_offset
;
372 ceph::shared_ptr
<AuthSessionHandler
> session_security
;
375 // used by eventcallback
378 void wakeup_from(uint64_t id
);
379 void tick(uint64_t id
);
380 void local_deliver();
381 void stop(bool queue_reset
) {
383 bool need_queue_reset
= (state
!= STATE_CLOSED
) && queue_reset
;
386 if (need_queue_reset
)
387 dispatch_queue
->queue_reset(this);
392 delete write_handler
;
393 delete wakeup_handler
;
400 PerfCounters
*get_perf_counter() {
403 }; /* AsyncConnection */
405 typedef boost::intrusive_ptr
<AsyncConnection
> AsyncConnectionRef
;