1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
28 #include "auth/AuthSessionHandler.h"
29 #include "common/ceph_time.h"
30 #include "common/perf_counters.h"
31 #include "include/buffer.h"
32 #include "msg/Connection.h"
33 #include "msg/Messenger.h"
// Compile-time cap derived from the platform's IOV_MAX: a quarter of IOV_MAX
// when IOV_MAX is at least 1024, otherwise IOV_MAX itself. It is the length
// of the msgvec iovec array declared in AsyncConnection.
41 static const int ASYNC_IOV_MAX
= (IOV_MAX
>= 1024 ? IOV_MAX
/ 4 : IOV_MAX
);
44 * AsyncConnection maintains a logical session between two endpoints. In other
45 * words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection
46 * handles network faults and read/write transactions. If a file
47 * descriptor breaks, AsyncConnection keeps the message queue and
48 * sequence numbers and tries to reconnect to the peer endpoint.
50 class AsyncConnection
: public Connection
{
52 ssize_t
read_bulk(char *buf
, unsigned len
);
53 ssize_t
do_sendmsg(struct msghdr
&msg
, unsigned len
, bool more
);
// Hand bl over to the pending output buffer and attempt to send.
//   bl   - buffer to send; passed to outcoming_bl.claim_append().
//   more - forwarded unchanged to _try_send() (defaults to false).
// Returns whatever _try_send(more) returns.
54 ssize_t
try_send(bufferlist
&bl
, bool more
=false) {
// write_lock is held for the whole call, covering both the append and the
// send attempt.
55 std::lock_guard
<std::mutex
> l(write_lock
);
56 outcoming_bl
.claim_append(bl
);
57 return _try_send(more
);
59 ssize_t
_try_send(bool more
=false);
60 ssize_t
_send(Message
*m
);
61 void prepare_send_message(uint64_t features
, Message
*m
, bufferlist
&bl
);
62 ssize_t
read_until(unsigned needed
, char *p
);
63 ssize_t
_process_connection();
66 int handle_connect_reply(ceph_msg_connect
&connect
, ceph_msg_connect_reply
&r
);
67 ssize_t
handle_connect_msg(ceph_msg_connect
&m
, bufferlist
&aubl
, bufferlist
&bl
);
68 void was_session_reset();
70 void discard_out_queue();
71 void discard_requeued_up_to(uint64_t seq
);
73 int randomize_out_seq();
74 void handle_ack(uint64_t seq
);
75 void _append_keepalive_or_ack(bool ack
=false, utime_t
*t
=NULL
);
76 ssize_t
write_message(Message
*m
, bufferlist
& bl
, bool more
);
// Build a connect reply and send it with try_send().
//   connect          - the peer's connect message; only its features field is
//                      read in the visible lines.
//   reply            - reply structure; features and authorizer_len are filled
//                      in here before its raw bytes are appended to reply_bl.
//   authorizer_reply - optional payload appended after the reply struct.
// NOTE(review): this listing omits some original lines (the embedded numbering
// jumps 79->82 and 88->94), so the declaration of reply_bl, any use of `tag`,
// the handling of r and the return statement are not visible here.
78 ssize_t
_reply_accept(char tag
, ceph_msg_connect
&connect
, ceph_msg_connect_reply
&reply
,
79 bufferlist
&authorizer_reply
) {
// Advertised features: the peer's features masked by what our policy
// supports, with the policy's required features always set.
82 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
83 reply
.authorizer_len
= authorizer_reply
.length();
// The reply struct is appended as raw bytes.
84 reply_bl
.append((char*)&reply
, sizeof(reply
));
// The authorizer payload follows only when it is non-empty.
85 if (reply
.authorizer_len
) {
86 reply_bl
.append(authorizer_reply
.c_str(), authorizer_reply
.length());
88 ssize_t r
= try_send(reply_bl
);
// NOTE(review): the lines between the send and this assignment are missing,
// so the condition under which the state changes cannot be confirmed here.
94 state
= STATE_ACCEPTING_WAIT_CONNECT_MSG
;
// True when there is still something to write: out_q has at least one entry
// or outcoming_bl has a non-zero length.
97 bool is_queued() const {
98 return !out_q
.empty() || outcoming_bl
.length();
// Unregister this connection's events from the event center.
// NOTE(review): this listing omits some original lines (the embedded numbering
// jumps 103->105 and 105->109), so any guards around the last two calls and
// anything done to the socket itself are not visible here.
100 void shutdown_socket() {
// Delete every time event recorded in register_time_events, then empty the
// set.
101 for (auto &&t
: register_time_events
)
102 center
->delete_time_event(t
);
103 register_time_events
.clear();
// Delete the time event identified by last_tick_id.
105 center
->delete_time_event(last_tick_id
);
// Remove both the readable and writable file events for the socket's fd.
109 center
->delete_file_event(cs
.fd(), EVENT_READABLE
|EVENT_WRITABLE
);
// Walk out_q looking for the next message to send.
// NOTE(review): this listing omits some original lines (embedded numbers 115
// and 120-124 are absent), so the declaration of m, how *bl is filled, how
// the chosen entry is removed and the return statement are not visible here.
114 Message
*_get_next_outgoing(bufferlist
*bl
) {
// Loop until a message has been picked or out_q is exhausted.
116 while (!m
&& !out_q
.empty()) {
// rbegin() on the std::map yields the entry with the largest int key.
117 map
<int, list
<pair
<bufferlist
, Message
*> > >::reverse_iterator it
= out_q
.rbegin();
118 if (!it
->second
.empty()) {
// p refers to the first (bufferlist, Message*) pair queued under that key.
119 list
<pair
<bufferlist
, Message
*> >::iterator p
= it
->second
.begin();
// Once a key's list is empty, the key is erased from out_q.
125 if (it
->second
.empty())
126 out_q
.erase(it
->first
);
// True when out_q holds at least one key. Unlike is_queued(), outcoming_bl is
// not consulted.
130 bool _has_next_outgoing() const {
131 return !out_q
.empty();
133 void reset_recv_state();
136 * The DelayedDelivery is for injecting delays into Message delivery off
137 * the socket. It is only enabled if delays are requested, and if they
138 * are then it pulls Messages off the DelayQueue and puts them into the
139 * AsyncMessenger event queue.
// EventCallback that keeps (release time, Message*) pairs in delay_queue and
// registers one time event per queued message with itself as the callback.
// NOTE(review): this listing omits some original lines; the declarations of
// `center` and `conn_id` (both set in the constructor) and several closing
// braces are not visible here.
141 class DelayedDelivery
: public EventCallback
{
// Ids returned by create_time_event() for events that are still registered.
142 std::set
<uint64_t> register_time_events
; // need to delete it if stop
// Pending messages in insertion order, each with the `release` value passed
// to queue().
143 std::deque
<std::pair
<utime_t
, Message
*> > delay_queue
;
// Taken by queue() and by the lambda submitted below while delay_queue is
// touched.
144 std::mutex delay_lock
;
145 AsyncMessenger
*msgr
;
147 DispatchQueue
*dispatch_queue
;
// Set to true while the queue is being drained by the lambda submitted below;
// ready() reports false for as long as it is set.
149 std::atomic_bool stop_dispatch
;
// Stores the given messenger, event center, dispatch queue and connection id;
// stop_dispatch starts out false.
152 explicit DelayedDelivery(AsyncMessenger
*omsgr
, EventCenter
*c
,
153 DispatchQueue
*q
, uint64_t cid
)
154 : msgr(omsgr
), center(c
), dispatch_queue(q
), conn_id(cid
),
155 stop_dispatch(false) { }
// Destruction requires that both the registered-event set and the delay queue
// are already empty.
156 ~DelayedDelivery() override
{
157 assert(register_time_events
.empty());
158 assert(delay_queue
.empty());
// Replace the event center used for later time events.
160 void set_center(EventCenter
*c
) { center
= c
; }
161 void do_request(int id
) override
;
// Queue m for delayed delivery: under delay_lock, append (release, m) to
// delay_queue and create a time event with this object as its callback,
// remembering the returned id.
// delay_period is multiplied by 1000000 before being passed on; presumably
// seconds converted to microseconds (confirm against create_time_event).
162 void queue(double delay_period
, utime_t release
, Message
*m
) {
163 std::lock_guard
<std::mutex
> l(delay_lock
);
164 delay_queue
.push_back(std::make_pair(release
, m
));
165 register_time_events
.insert(center
->create_time_event(delay_period
*1000000, this));
// NOTE(review): the signature line of the member that starts here (embedded
// number 167) is missing from this listing.
// It raises stop_dispatch, then submits a lambda to the center identified by
// center->get_id().
168 stop_dispatch
= true;
169 center
->submit_to(center
->get_id(), [this] () mutable {
170 std::lock_guard
<std::mutex
> l(delay_lock
);
// Drain the queue: release each message's dispatch-throttle size and pop it.
// Embedded number 174 is missing, so what else is done with m is not visible.
171 while (!delay_queue
.empty()) {
172 Message
*m
= delay_queue
.front().second
;
173 dispatch_queue
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
175 delay_queue
.pop_front();
// Cancel every registered time event and forget the ids.
177 for (auto i
: register_time_events
)
178 center
->delete_time_event(i
);
179 register_time_events
.clear();
// Draining is finished; clear the flag from inside the lambda.
180 stop_dispatch
= false;
// True only when no drain is in progress and both delay_queue and
// register_time_events are empty.
183 bool ready() const { return !stop_dispatch
&& delay_queue
.empty() && register_time_events
.empty(); }
188 AsyncConnection(CephContext
*cct
, AsyncMessenger
*m
, DispatchQueue
*q
, Worker
*w
);
189 ~AsyncConnection() override
;
190 void maybe_start_delay_thread();
192 ostream
& _conn_prefix(std::ostream
*_dout
);
// Reports whether the atomic can_write currently holds WriteStatus::CANWRITE.
194 bool is_connected() override
{
195 return can_write
.load() == WriteStatus::CANWRITE
;
198 // Only call when the AsyncConnection is first constructed
// Sets `policy` to the messenger's policy for the given peer type.
// NOTE(review): this listing omits some original lines (embedded numbers
// 200-201 and 203 are absent), so how `addr` is used and what else happens
// is not visible here.
199 void connect(const entity_addr_t
& addr
, int type
) {
202 policy
= msgr
->get_policy(type
);
205 // Only call when the AsyncConnection is first constructed
206 void accept(ConnectedSocket socket
, entity_addr_t
&addr
);
207 int send_message(Message
*m
) override
;
209 void send_keepalive() override
;
210 void mark_down() override
;
211 void mark_disposable() override
{
212 std::lock_guard
<std::mutex
> l(lock
);
220 STATE_OPEN_KEEPALIVE2
,
221 STATE_OPEN_KEEPALIVE2_ACK
,
223 STATE_OPEN_MESSAGE_HEADER
,
224 STATE_OPEN_MESSAGE_THROTTLE_MESSAGE
,
225 STATE_OPEN_MESSAGE_THROTTLE_BYTES
,
226 STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE
,
227 STATE_OPEN_MESSAGE_READ_FRONT
,
228 STATE_OPEN_MESSAGE_READ_MIDDLE
,
229 STATE_OPEN_MESSAGE_READ_DATA_PREPARE
,
230 STATE_OPEN_MESSAGE_READ_DATA
,
231 STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH
,
232 STATE_OPEN_TAG_CLOSE
,
236 STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
,
237 STATE_CONNECTING_SEND_CONNECT_MSG
,
238 STATE_CONNECTING_WAIT_CONNECT_REPLY
,
239 STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH
,
240 STATE_CONNECTING_WAIT_ACK_SEQ
,
241 STATE_CONNECTING_READY
,
243 STATE_ACCEPTING_WAIT_BANNER_ADDR
,
244 STATE_ACCEPTING_WAIT_CONNECT_MSG
,
245 STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH
,
246 STATE_ACCEPTING_WAIT_SEQ
,
247 STATE_ACCEPTING_READY
,
250 STATE_WAIT
, // just wait for racing connection
253 static const int TCP_PREFETCH_MIN_SIZE
;
// Return the name for a connection state by indexing a local table of string
// literals with `state`.
// NOTE(review): `state` is used as an array index with no visible range
// check, so an out-of-range value reads past the table. The lookup also
// relies on the table order matching the numeric values of the state
// constants; several table entries are missing from this listing (the
// embedded numbering has gaps), so that cannot be confirmed here.
254 static const char *get_state_name(int state
) {
255 const char* const statenames
[] = {"STATE_NONE",
257 "STATE_OPEN_KEEPALIVE2",
258 "STATE_OPEN_KEEPALIVE2_ACK",
259 "STATE_OPEN_TAG_ACK",
260 "STATE_OPEN_MESSAGE_HEADER",
261 "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
262 "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
263 "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
264 "STATE_OPEN_MESSAGE_READ_FRONT",
265 "STATE_OPEN_MESSAGE_READ_MIDDLE",
266 "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
267 "STATE_OPEN_MESSAGE_READ_DATA",
268 "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
269 "STATE_OPEN_TAG_CLOSE",
272 "STATE_CONNECTING_RE",
273 "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
274 "STATE_CONNECTING_SEND_CONNECT_MSG",
275 "STATE_CONNECTING_WAIT_CONNECT_REPLY",
276 "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
277 "STATE_CONNECTING_WAIT_ACK_SEQ",
278 "STATE_CONNECTING_READY",
280 "STATE_ACCEPTING_WAIT_BANNER_ADDR",
281 "STATE_ACCEPTING_WAIT_CONNECT_MSG",
282 "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
283 "STATE_ACCEPTING_WAIT_SEQ",
284 "STATE_ACCEPTING_READY",
288 return statenames
[state
];
291 AsyncMessenger
*async_msgr
;
293 PerfCounters
*logger
;
295 __u32 connect_seq
, peer_global_seq
;
296 std::atomic
<uint64_t> out_seq
{0};
297 std::atomic
<uint64_t> ack_left
{0}, in_seq
{0};
299 int state_after_send
;
302 Messenger::Policy policy
;
304 DispatchQueue
*dispatch_queue
;
306 // lockfree, only used in own thread
307 bufferlist outcoming_bl
;
308 bool open_write
= false;
310 std::mutex write_lock
;
311 enum class WriteStatus
{
317 std::atomic
<WriteStatus
> can_write
;
318 list
<Message
*> sent
; // the first bufferlist need to inject seq
319 map
<int, list
<pair
<bufferlist
, Message
*> > > out_q
; // priority queue for outbound msgs
323 utime_t backoff
; // backoff time
324 EventCallbackRef read_handler
;
325 EventCallbackRef write_handler
;
326 EventCallbackRef wakeup_handler
;
327 EventCallbackRef tick_handler
;
328 struct iovec msgvec
[ASYNC_IOV_MAX
];
330 uint32_t recv_max_prefetch
;
333 set
<uint64_t> register_time_events
; // need to delete it if stop
334 ceph::coarse_mono_clock::time_point last_active
;
335 uint64_t last_tick_id
= 0;
336 const uint64_t inactive_timeout_us
;
338 // This section holds temporary variables used by state transitions
342 utime_t throttle_stamp
;
344 uint64_t cur_msg_size
;
345 ceph_msg_header current_header
;
347 bufferlist::iterator data_blp
;
348 bufferlist front
, middle
, data
;
349 ceph_msg_connect connect_msg
;
352 AuthAuthorizer
*authorizer
;
353 bufferlist authorizer_buf
;
354 ceph_msg_connect_reply connect_reply
;
356 entity_addr_t socket_addr
;
357 CryptoKey session_key
;
358 bool replacing
; // when replacing process happened, we will reply connect
359 // side with RETRY tag and accept side will clear replaced
360 // connection. So when the connect side reissues connect_msg,
361 // there won't exist a conflicting connection, so we use
362 // "replacing" to skip RESETSESSION to avoid detect wrong
364 bool is_reset_from_peer
;
367 // used only for local state; it will be overwritten on state transition
369 // used only by "read_until"
370 uint64_t state_offset
;
373 ceph::shared_ptr
<AuthSessionHandler
> session_security
;
374 std::unique_ptr
<AuthAuthorizerChallenge
> authorizer_challenge
; // accept side
377 // used by eventcallback
380 void wakeup_from(uint64_t id
);
381 void tick(uint64_t id
);
382 void local_deliver();
// Stop the connection and, when requested, queue a reset notification.
//   queue_reset - when true and the connection was not already in
//                 STATE_CLOSED, dispatch_queue->queue_reset(this) is called.
// NOTE(review): this listing omits some original lines (embedded numbers 384
// and 386-387 are absent), so any locking and the actual stop call are not
// visible here.
383 void stop(bool queue_reset
) {
// Decide from the current state, before the lines missing from this listing.
385 bool need_queue_reset
= (state
!= STATE_CLOSED
) && queue_reset
;
388 if (need_queue_reset
)
389 dispatch_queue
->queue_reset(this);
394 delete write_handler
;
395 delete wakeup_handler
;
402 PerfCounters
*get_perf_counter() {
405 }; /* AsyncConnection */
// Intrusive reference-counted smart-pointer alias for AsyncConnection.
407 typedef boost::intrusive_ptr
<AsyncConnection
> AsyncConnectionRef
;