1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
24 #include "msg/Message.h"
26 #include "SimpleMessenger.h"
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
32 // The include below is needed for encode_encrypt(); that function probably belongs in Crypto.h instead.
34 #include "auth/cephx/CephxProtocol.h"
35 #include "auth/AuthSessionHandler.h"
37 #include "include/compat.h"
38 #include "include/sock_compat.h"
39 #include "include/random.h"
41 // Mask that keeps the starting sequence number below 2^31. Nothing special about the value; it is just a big number. PLR
42 #define SEQ_MASK 0x7fffffff
43 #define dout_subsys ceph_subsys_ms
46 #define dout_prefix *_dout << *this
// Writes this pipe's log prefix to `out` and returns the stream.
// The prefix contains the messenger's legacy address, the peer address, and
// then the socket descriptor (sd), port, peer_global_seq (pgs),
// connect_seq (cs), policy.lossy (l) and connection_state (c).
// operator<< for Pipe forwards here, and dout_prefix streams *this, so this
// text is what prefixes the pipe's log lines.
47 ostream
& Pipe::_pipe_prefix(std::ostream
&out
) const {
48 return out
<< "-- " << msgr
->get_myaddr_legacy() << " >> " << peer_addr
50 << " sd=" << sd
<< " :" << port
52 << " pgs=" << peer_global_seq
53 << " cs=" << connect_seq
54 << " l=" << policy
.lossy
55 << " c=" << connection_state
// Stream insertion for Pipe: delegates to Pipe::_pipe_prefix() and returns
// whatever stream that call returns.
59 ostream
& operator<<(ostream
&out
, const Pipe
&pipe
) {
60 return pipe
._pipe_prefix(out
);
64 * The DelayedDelivery is for injecting delays into Message delivery off
65 * the socket. It is only enabled if delays are requested, and if they
66 * are then it pulls Messages off the DelayQueue and puts them into the
67 * in_q (SimpleMessenger::dispatch_queue).
68 * Please note that this probably has issues with Pipe shutdown and
69 * replacement semantics. I've tried, but no guarantees.
// Thread subclass that holds (utime_t, Message*) pairs in delay_queue.
// The thread body is entry(), which is defined out of line; all of the
// inline members below take delay_lock before touching shared state.
71 class Pipe::DelayedDelivery
: public Thread
{
// Pending messages, each paired with the utime_t handed to queue().
73 std::deque
< pair
<utime_t
,Message
*> > delay_queue
;
// entry() keeps looping while this is false.
78 bool stop_delayed_delivery
;
79 bool delay_dispatching
; // we are in fast dispatch now
80 bool stop_fast_dispatching_flag
; // we need to stop fast dispatching
// Construct with an empty flush request and all stop/dispatch flags cleared.
83 explicit DelayedDelivery(Pipe
*p
)
85 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
87 stop_delayed_delivery(false),
88 delay_dispatching(false),
89 stop_fast_dispatching_flag(false) { }
90 ~DelayedDelivery() override
{
93 void *entry() override
;
// Append (release, m) to the back of delay_queue while holding delay_lock.
94 void queue(utime_t release
, Message
*m
) {
95 Mutex::Locker
l(delay_lock
);
96 delay_queue
.push_back(make_pair(release
, m
));
// Under delay_lock: true while a flush is outstanding, i.e. flush_count is
// still positive or active_flush is set.
102 Mutex::Locker
l(delay_lock
);
103 return flush_count
> 0 || active_flush
;
// Block on delay_cond until no flush is outstanding (flush_count == 0 and
// active_flush is false).
105 void wait_for_flush() {
106 Mutex::Locker
l(delay_lock
);
107 while (flush_count
> 0 || active_flush
)
108 delay_cond
.Wait(delay_lock
);
112 stop_delayed_delivery
= true;
// Runs under delay_lock.
116 void steal_for_pipe(Pipe
*new_owner
) {
117 Mutex::Locker
l(delay_lock
);
121 * We need to stop fast dispatching before we need to stop putting
122 * normal messages into the DispatchQueue.
124 void stop_fast_dispatching();
127 /**************************************
// Constructs a Pipe for messenger `r`.
// The reader/writer flags and keepalive flags start false and every sequence
// counter starts at 0. conn_id is obtained from r->dispatch_queue.get_id()
// and in_q points at r->dispatch_queue.
// `con` is an optional existing PipeConnection to attach to (see the
// review note in the body).
131 Pipe::Pipe(SimpleMessenger
*r
, int st
, PipeConnection
*con
)
132 : RefCountedObject(r
->cct
),
137 conn_id(r
->dispatch_queue
.get_id()),
142 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
144 connection_state(NULL
),
145 reader_running(false), reader_needs_join(false),
146 reader_dispatching(false), notify_on_dispatch_done(false),
147 writer_running(false),
148 in_q(&(r
->dispatch_queue
)),
149 send_keepalive(false),
150 send_keepalive_ack(false),
151 connect_seq(0), peer_global_seq(0),
152 out_seq(0), in_seq(0), in_seq_acked(0) {
// Annotate sd, state, recv_len and recv_ofs as benign races for the
// race-detection tooling.
153 ANNOTATE_BENIGN_RACE_SIZED(&sd
, sizeof(sd
), "Pipe socket");
154 ANNOTATE_BENIGN_RACE_SIZED(&state
, sizeof(state
), "Pipe state");
155 ANNOTATE_BENIGN_RACE_SIZED(&recv_len
, sizeof(recv_len
), "Pipe recv_len");
156 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs
, sizeof(recv_ofs
), "Pipe recv_ofs");
// NOTE(review): two alternative set-ups of connection_state follow: adopt
// `con` and point it back at this pipe, or allocate a fresh PipeConnection
// whose pipe is a new reference to this object. Presumably the first is
// taken when `con` is non-null - confirm against the branch condition.
158 connection_state
= con
;
159 connection_state
->reset_pipe(this);
161 connection_state
= new PipeConnection(msgr
->cct
, msgr
);
162 connection_state
->pipe
= get();
// The idle timeout is configured in seconds and stored in milliseconds.
167 msgr
->timeout
= msgr
->cct
->_conf
->ms_connection_idle_timeout
* 1000; //convert to ms
168 if (msgr
->timeout
== 0)
// Receive prefetch buffer, sized from ms_tcp_prefetch_max_size.
171 recv_max_prefetch
= msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
;
172 recv_buf
= new char[recv_max_prefetch
];
177 ceph_assert(out_q
.empty());
178 ceph_assert(sent
.empty());
// Processes an acknowledgement for sequence number `seq`.
// Walks the front of the `sent` list for as long as the front message's
// sequence number is <= seq, logging each acknowledged message.
183 void Pipe::handle_ack(uint64_t seq
)
185 lsubdout(msgr
->cct
, ms
, 15) << "reader got ack seq " << seq
<< dendl
;
// Only the head of `sent` is ever examined; the loop stops at the first
// message whose sequence number is newer than the ack.
187 while (!sent
.empty() &&
188 sent
.front()->get_seq() <= seq
) {
189 Message
*m
= sent
.front();
191 lsubdout(msgr
->cct
, ms
, 10) << "reader got ack seq "
192 << seq
<< " >= " << m
->get_seq() << " on " << m
<< " " << *m
<< dendl
;
// Starts the reader thread.
// Preconditions (asserted): pipe_lock is held and no reader is running.
// If an earlier reader thread has not been joined yet, it is joined first.
197 void Pipe::start_reader()
199 ceph_assert(pipe_lock
.is_locked());
200 ceph_assert(!reader_running
);
// Reap the previous reader thread before reusing reader_thread.
201 if (reader_needs_join
) {
202 reader_thread
.join();
203 reader_needs_join
= false;
205 reader_running
= true;
// Thread is named "ms_pipe_read"; stack size comes from ms_rwthread_stack_bytes.
206 reader_thread
.create("ms_pipe_read", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
// Creates and starts a DelayedDelivery thread for this pipe when the peer's
// entity type name occurs as a substring of the ms_inject_delay_type
// configuration string.
209 void Pipe::maybe_start_delay_thread()
// Substring search of the configured type list for the peer's type name.
212 auto pos
= msgr
->cct
->_conf
.get_val
<std::string
>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state
->peer_type
));
213 if (pos
!= string::npos
) {
214 lsubdout(msgr
->cct
, ms
, 1) << "setting up a delay queue on Pipe " << this << dendl
;
215 delay_thread
= new DelayedDelivery(this);
216 delay_thread
->create("ms_pipe_delay");
// Starts the writer thread.
// Preconditions (asserted): pipe_lock is held and no writer is running.
221 void Pipe::start_writer()
223 ceph_assert(pipe_lock
.is_locked());
224 ceph_assert(!writer_running
);
225 writer_running
= true;
// Thread is named "ms_pipe_write"; stack size comes from ms_rwthread_stack_bytes.
226 writer_thread
.create("ms_pipe_write", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
// Joins the reader thread and clears reader_needs_join so that it is not
// joined a second time.
229 void Pipe::join_reader()
235 reader_thread
.join();
237 reader_needs_join
= false;
// Drops every message still waiting in delay_queue.
// Runs under delay_lock. For each queued message, the dispatch-throttle
// budget it holds is released back to pipe->in_q before the entry is popped.
240 void Pipe::DelayedDelivery::discard()
242 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::discard" << dendl
;
243 Mutex::Locker
l(delay_lock
);
244 while (!delay_queue
.empty()) {
245 Message
*m
= delay_queue
.front().second
;
// Give back the throttle reservation taken for this message.
246 pipe
->in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
248 delay_queue
.pop_front();
// Requests a flush of everything currently queued: under delay_lock,
// flush_count is set to the present size of delay_queue.
252 void Pipe::DelayedDelivery::flush()
254 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::flush" << dendl
;
255 Mutex::Locker
l(delay_lock
);
256 flush_count
= delay_queue
.size();
// Thread body of the delayed-delivery thread.
// Holding delay_lock, it loops until stop_delayed_delivery is set:
//  - with an empty queue it sleeps on delay_cond;
//  - otherwise it looks at the oldest (release, message) pair and either
//    sleeps until `release` or pops the message and hands it to pipe->in_q,
//    via fast_dispatch() when permitted and enqueue() otherwise.
260 void *Pipe::DelayedDelivery::entry()
262 Mutex::Locker
locker(delay_lock
);
263 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry start" << dendl
;
265 while (!stop_delayed_delivery
) {
266 if (delay_queue
.empty()) {
267 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 30) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl
;
268 delay_cond
.Wait(delay_lock
);
// Peek at the oldest entry; it is only popped once it is due for delivery.
271 utime_t release
= delay_queue
.front().first
;
272 Message
*m
= delay_queue
.front().second
;
273 string delay_msg_type
= pipe
->msgr
->cct
->_conf
->ms_inject_delay_msg_type
;
// Not yet due, and either no message-type filter is configured or this
// message's type name matches it: sleep until the release time.
275 (release
> ceph_clock_now() &&
276 (delay_msg_type
.empty() || m
->get_type_name() == delay_msg_type
))) {
277 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond until " << release
<< dendl
;
278 delay_cond
.WaitUntil(delay_lock
, release
);
281 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry dequeuing message " << m
<< " for delivery, past " << release
<< dendl
;
282 delay_queue
.pop_front();
283 if (flush_count
> 0) {
// Fast-dispatch path: taken only when in_q accepts this message for fast
// dispatch and stop_fast_dispatching() has not been requested.
287 if (pipe
->in_q
->can_fast_dispatch(m
)) {
288 if (!stop_fast_dispatching_flag
) {
// delay_dispatching brackets the fast_dispatch() call;
// stop_fast_dispatching() waits for it to become false again.
289 delay_dispatching
= true;
291 pipe
->in_q
->fast_dispatch(m
);
293 delay_dispatching
= false;
294 if (stop_fast_dispatching_flag
) {
295 // we need to let the stopping thread proceed
// Regular path: queue on in_q with the message's priority and this pipe's
// conn_id.
302 pipe
->in_q
->enqueue(m
, m
->get_priority(), pipe
->conn_id
);
304 active_flush
= false;
306 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry stop" << dendl
;
// Prevents further fast dispatching from the delay thread.
// Sets stop_fast_dispatching_flag under delay_lock, then waits on delay_cond
// until delay_dispatching is false, i.e. until any fast dispatch that
// entry() has in progress is finished.
310 void Pipe::DelayedDelivery::stop_fast_dispatching() {
311 Mutex::Locker
l(delay_lock
);
312 stop_fast_dispatching_flag
= true;
313 while (delay_dispatching
)
314 delay_cond
.Wait(delay_lock
);
320 ldout(msgr
->cct
,10) << "accept" << dendl
;
321 ceph_assert(pipe_lock
.is_locked());
322 ceph_assert(state
== STATE_ACCEPTING
);
328 entity_addr_t socket_addr
;
331 char banner
[strlen(CEPH_BANNER
)+1];
333 ceph_msg_connect connect
;
334 ceph_msg_connect_reply reply
;
337 bufferlist authorizer
, authorizer_reply
;
338 bool authorizer_valid
;
339 uint64_t feat_missing
;
340 bool replaced
= false;
341 // this variable denotes whether the connection attempt from the peer is a
342 // hard reset: it is true if there is an existing connection and the
343 // connection sequence from the peer is equal to zero
344 bool is_reset_from_peer
= false;
345 CryptoKey session_key
;
346 int removed
; // single-use down below
348 // this should roughly mirror pseudocode at
349 // http://ceph.com/wiki/Messaging_protocol
351 uint64_t existing_seq
= -1;
353 // used for reading in the remote acked seq on connect
354 uint64_t newly_acked_seq
= 0;
356 bool need_challenge
= false;
357 bool had_challenge
= false;
358 std::unique_ptr
<AuthAuthorizerChallenge
> authorizer_challenge
;
362 set_socket_options();
365 r
= tcp_write(CEPH_BANNER
, strlen(CEPH_BANNER
));
367 ldout(msgr
->cct
,10) << "accept couldn't write banner" << dendl
;
372 encode(msgr
->my_addr
, addrs
, 0); // legacy
374 port
= msgr
->my_addr
.get_port();
376 // and peer's socket addr (they might not know their ip)
379 r
= ::getpeername(sd
, (sockaddr
*)&ss
, &len
);
381 ldout(msgr
->cct
,0) << "accept failed to getpeername " << cpp_strerror(errno
) << dendl
;
384 socket_addr
.set_sockaddr((sockaddr
*)&ss
);
385 encode(socket_addr
, addrs
, 0); // legacy
387 r
= tcp_write(addrs
.c_str(), addrs
.length());
389 ldout(msgr
->cct
,10) << "accept couldn't write my+peer addr" << dendl
;
393 ldout(msgr
->cct
,1) << "accept sd=" << sd
<< " " << socket_addr
<< dendl
;
396 if (tcp_read(banner
, strlen(CEPH_BANNER
)) < 0) {
397 ldout(msgr
->cct
,10) << "accept couldn't read banner" << dendl
;
400 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
401 banner
[strlen(CEPH_BANNER
)] = 0;
402 ldout(msgr
->cct
,1) << "accept peer sent bad banner '" << banner
<< "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
406 bufferptr
tp(sizeof(ceph_entity_addr
));
407 addrbl
.push_back(std::move(tp
));
409 if (tcp_read(addrbl
.c_str(), addrbl
.length()) < 0) {
410 ldout(msgr
->cct
,10) << "accept couldn't read peer_addr" << dendl
;
414 auto ti
= addrbl
.cbegin();
415 decode(peer_addr
, ti
);
416 } catch (const buffer::error
& e
) {
417 ldout(msgr
->cct
,2) << __func__
<< " decode peer_addr failed: " << e
.what()
422 ldout(msgr
->cct
,10) << "accept peer addr is " << peer_addr
<< dendl
;
423 if (peer_addr
.is_blank_ip()) {
424 // peer apparently doesn't know what ip they have; figure it out for them.
425 int port
= peer_addr
.get_port();
426 peer_addr
.u
= socket_addr
.u
;
427 peer_addr
.set_port(port
);
428 ldout(msgr
->cct
,0) << "accept peer addr is really " << peer_addr
429 << " (socket is " << socket_addr
<< ")" << dendl
;
431 set_peer_addr(peer_addr
); // so that connection_state gets set up
434 if (tcp_read((char*)&connect
, sizeof(connect
)) < 0) {
435 ldout(msgr
->cct
,10) << "accept couldn't read connect" << dendl
;
440 if (connect
.authorizer_len
) {
441 bp
= buffer::create(connect
.authorizer_len
);
442 if (tcp_read(bp
.c_str(), connect
.authorizer_len
) < 0) {
443 ldout(msgr
->cct
,10) << "accept couldn't read connect authorizer" << dendl
;
446 authorizer
.push_back(std::move(bp
));
447 authorizer_reply
.clear();
450 ldout(msgr
->cct
,20) << "accept got peer connect_seq " << connect
.connect_seq
451 << " global_seq " << connect
.global_seq
454 msgr
->lock
.Lock(); // FIXME
456 if (msgr
->dispatch_queue
.stop
)
458 if (state
!= STATE_ACCEPTING
) {
462 // note peer's type, flags
463 set_peer_type(connect
.host_type
);
464 policy
= msgr
->get_policy(connect
.host_type
);
465 ldout(msgr
->cct
,10) << "accept of host_type " << connect
.host_type
466 << ", policy.lossy=" << policy
.lossy
467 << " policy.server=" << policy
.server
468 << " policy.standby=" << policy
.standby
469 << " policy.resetcheck=" << policy
.resetcheck
472 memset(&reply
, 0, sizeof(reply
));
473 reply
.protocol_version
= msgr
->get_proto_version(peer_type
, false);
477 ldout(msgr
->cct
,10) << "accept my proto " << reply
.protocol_version
478 << ", their proto " << connect
.protocol_version
<< dendl
;
479 if (connect
.protocol_version
!= reply
.protocol_version
) {
480 reply
.tag
= CEPH_MSGR_TAG_BADPROTOVER
;
484 // require signatures for cephx?
485 if (connect
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
486 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
487 peer_type
== CEPH_ENTITY_TYPE_MDS
||
488 peer_type
== CEPH_ENTITY_TYPE_MGR
) {
489 if (msgr
->cct
->_conf
->cephx_require_signatures
||
490 msgr
->cct
->_conf
->cephx_cluster_require_signatures
) {
491 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl
;
492 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
494 if (msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
495 msgr
->cct
->_conf
->cephx_cluster_require_version
>= 2) {
496 ldout(msgr
->cct
,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl
;
497 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
500 if (msgr
->cct
->_conf
->cephx_require_signatures
||
501 msgr
->cct
->_conf
->cephx_service_require_signatures
) {
502 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl
;
503 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
505 if (msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
506 msgr
->cct
->_conf
->cephx_service_require_version
>= 2) {
507 ldout(msgr
->cct
,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl
;
508 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
513 feat_missing
= policy
.features_required
& ~(uint64_t)connect
.features
;
515 ldout(msgr
->cct
,1) << "peer missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
516 reply
.tag
= CEPH_MSGR_TAG_FEATURES
;
520 // Check the authorizer. If not good, bail out.
524 need_challenge
= HAVE_FEATURE(connect
.features
, CEPHX_V2
);
525 had_challenge
= (bool)authorizer_challenge
;
526 authorizer_reply
.clear();
527 if (!msgr
->ms_deliver_verify_authorizer(
528 connection_state
.get(), peer_type
, connect
.authorizer_protocol
,
530 authorizer_reply
, authorizer_valid
, session_key
,
531 nullptr /* connection_secret */,
532 need_challenge
? &authorizer_challenge
: nullptr) ||
535 if (state
!= STATE_ACCEPTING
)
536 goto shutting_down_msgr_unlocked
;
537 if (!had_challenge
&& need_challenge
&& authorizer_challenge
) {
538 ldout(msgr
->cct
,10) << "accept: challenging authorizer "
539 << authorizer_reply
.length()
540 << " bytes" << dendl
;
541 ceph_assert(authorizer_reply
.length());
542 reply
.tag
= CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
;
544 ldout(msgr
->cct
,0) << "accept: got bad authorizer" << dendl
;
545 reply
.tag
= CEPH_MSGR_TAG_BADAUTHORIZER
;
547 session_security
.reset();
551 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
553 ldout(msgr
->cct
,10) << "accept: setting up session_security." << dendl
;
555 retry_existing_lookup
:
558 if (msgr
->dispatch_queue
.stop
)
560 if (state
!= STATE_ACCEPTING
)
564 existing
= msgr
->_lookup_pipe(peer_addr
);
566 existing
->pipe_lock
.Lock(true); // skip lockdep check (we are locking a second Pipe here)
567 if (existing
->reader_dispatching
) {
568 /** we need to wait, or we can deadlock if downstream
569 * fast_dispatchers are (naughtily!) waiting on resources
570 * held by somebody trying to make use of the SimpleMessenger lock.
571 * So drop locks, wait, and retry. It just looks like a slow network
574 * We take a ref to existing here since it might get reaped before we
575 * wake up (see bug #15870). We can be confident that it lived until
576 * we locked it, since we held the msgr lock from _lookup_pipe through to
577 * locking existing->pipe_lock and checking reader_dispatching.
582 existing
->notify_on_dispatch_done
= true;
583 while (existing
->reader_dispatching
)
584 existing
->cond
.Wait(existing
->pipe_lock
);
585 existing
->pipe_lock
.Unlock();
588 goto retry_existing_lookup
;
591 if (connect
.global_seq
< existing
->peer_global_seq
) {
592 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
593 << " > " << connect
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
594 reply
.tag
= CEPH_MSGR_TAG_RETRY_GLOBAL
;
595 reply
.global_seq
= existing
->peer_global_seq
; // so we can send it below..
596 existing
->pipe_lock
.Unlock();
600 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
601 << " <= " << connect
.global_seq
<< ", looks ok" << dendl
;
604 if (existing
->policy
.lossy
) {
605 ldout(msgr
->cct
,0) << "accept replacing existing (lossy) channel (new one lossy="
606 << policy
.lossy
<< ")" << dendl
;
607 existing
->was_session_reset();
611 ldout(msgr
->cct
,0) << "accept connect_seq " << connect
.connect_seq
612 << " vs existing " << existing
->connect_seq
613 << " state " << existing
->get_state_name() << dendl
;
615 if (connect
.connect_seq
== 0 && existing
->connect_seq
> 0) {
616 ldout(msgr
->cct
,0) << "accept peer reset, then tried to connect to us, replacing" << dendl
;
617 // this is a hard reset from peer
618 is_reset_from_peer
= true;
619 if (policy
.resetcheck
)
620 existing
->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
624 if (connect
.connect_seq
< existing
->connect_seq
) {
625 // old attempt, or we sent READY but they didn't get it.
626 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".cseq " << existing
->connect_seq
627 << " > " << connect
.connect_seq
<< ", RETRY_SESSION" << dendl
;
631 if (connect
.connect_seq
== existing
->connect_seq
) {
632 // if the existing connection successfully opened, and/or
633 // subsequently went to standby, then the peer should bump
634 // their connect_seq and retry: this is not a connection race
635 // we need to resolve here.
636 if (existing
->state
== STATE_OPEN
||
637 existing
->state
== STATE_STANDBY
) {
638 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
639 << ".cseq " << existing
->connect_seq
640 << " == " << connect
.connect_seq
641 << ", OPEN|STANDBY, RETRY_SESSION" << dendl
;
646 if (peer_addr
< msgr
->my_addr
||
647 existing
->policy
.server
) {
649 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
650 << " == " << connect
.connect_seq
<< ", or we are server, replacing my attempt" << dendl
;
651 if (!(existing
->state
== STATE_CONNECTING
||
652 existing
->state
== STATE_WAIT
))
653 lderr(msgr
->cct
) << "accept race bad state, would replace, existing="
654 << existing
->get_state_name()
655 << " " << existing
<< ".cseq=" << existing
->connect_seq
656 << " == " << connect
.connect_seq
658 ceph_assert(existing
->state
== STATE_CONNECTING
||
659 existing
->state
== STATE_WAIT
);
662 // our existing outgoing wins
663 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
664 << " == " << connect
.connect_seq
<< ", sending WAIT" << dendl
;
665 ceph_assert(peer_addr
> msgr
->my_addr
);
666 if (!(existing
->state
== STATE_CONNECTING
))
667 lderr(msgr
->cct
) << "accept race bad state, would send wait, existing="
668 << existing
->get_state_name()
669 << " " << existing
<< ".cseq=" << existing
->connect_seq
670 << " == " << connect
.connect_seq
672 ceph_assert(existing
->state
== STATE_CONNECTING
);
673 // make sure our outgoing connection will follow through
674 existing
->_send_keepalive();
675 reply
.tag
= CEPH_MSGR_TAG_WAIT
;
676 existing
->pipe_lock
.Unlock();
682 ceph_assert(connect
.connect_seq
> existing
->connect_seq
);
683 ceph_assert(connect
.global_seq
>= existing
->peer_global_seq
);
684 if (policy
.resetcheck
&& // RESETSESSION only used by servers; peers do not reset each other
685 existing
->connect_seq
== 0) {
686 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
687 << ", " << existing
<< ".cseq = " << existing
->connect_seq
688 << "), sending RESETSESSION" << dendl
;
689 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
691 existing
->pipe_lock
.Unlock();
696 ldout(msgr
->cct
,10) << "accept peer sent cseq " << connect
.connect_seq
697 << " > " << existing
->connect_seq
<< dendl
;
700 else if (connect
.connect_seq
> 0) {
701 // we reset, and they are opening a new session
702 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
<< "), sending RESETSESSION" << dendl
;
704 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
708 ldout(msgr
->cct
,10) << "accept new session" << dendl
;
715 ceph_assert(existing
->pipe_lock
.is_locked());
716 ceph_assert(pipe_lock
.is_locked());
717 reply
.tag
= CEPH_MSGR_TAG_RETRY_SESSION
;
718 reply
.connect_seq
= existing
->connect_seq
+ 1;
719 existing
->pipe_lock
.Unlock();
724 ceph_assert(pipe_lock
.is_locked());
725 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
726 reply
.authorizer_len
= authorizer_reply
.length();
728 r
= tcp_write((char*)&reply
, sizeof(reply
));
731 if (reply
.authorizer_len
) {
732 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
739 ceph_assert(existing
->pipe_lock
.is_locked());
740 ceph_assert(pipe_lock
.is_locked());
741 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
742 if ((connect
.features
& CEPH_FEATURE_RECONNECT_SEQ
) && !is_reset_from_peer
) {
743 reply_tag
= CEPH_MSGR_TAG_SEQ
;
744 existing_seq
= existing
->in_seq
;
746 ldout(msgr
->cct
,10) << "accept replacing " << existing
<< dendl
;
748 existing
->unregister_pipe();
751 if (existing
->policy
.lossy
) {
752 // disconnect from the Connection
753 ceph_assert(existing
->connection_state
);
754 if (existing
->connection_state
->clear_pipe(existing
))
755 msgr
->dispatch_queue
.queue_reset(existing
->connection_state
.get());
757 // queue a reset on the new connection, which we're dumping for the old
758 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
760 // drop my Connection, and take a ref to the existing one. do not
761 // clear existing->connection_state, since read_message and
762 // write_message both dereference it without pipe_lock.
763 connection_state
= existing
->connection_state
;
765 // make existing Connection reference us
766 connection_state
->reset_pipe(this);
768 if (existing
->delay_thread
) {
769 existing
->delay_thread
->steal_for_pipe(this);
770 delay_thread
= existing
->delay_thread
;
771 existing
->delay_thread
= NULL
;
772 delay_thread
->flush();
775 // steal incoming queue
776 uint64_t replaced_conn_id
= conn_id
;
777 conn_id
= existing
->conn_id
;
778 existing
->conn_id
= replaced_conn_id
;
780 // reset the in_seq if this is a hard reset from peer,
781 // otherwise we respect our original connection's value
782 in_seq
= is_reset_from_peer
? 0 : existing
->in_seq
;
783 in_seq_acked
= in_seq
;
785 // steal outgoing queue and out_seq
786 existing
->requeue_sent();
787 out_seq
= existing
->out_seq
;
788 ldout(msgr
->cct
,10) << "accept re-queuing on out_seq " << out_seq
<< " in_seq " << in_seq
<< dendl
;
789 for (map
<int, list
<Message
*> >::iterator p
= existing
->out_q
.begin();
790 p
!= existing
->out_q
.end();
792 out_q
[p
->first
].splice(out_q
[p
->first
].begin(), p
->second
);
794 existing
->stop_and_wait();
795 existing
->pipe_lock
.Unlock();
799 ceph_assert(pipe_lock
.is_locked());
800 connect_seq
= connect
.connect_seq
+ 1;
801 peer_global_seq
= connect
.global_seq
;
802 ceph_assert(state
== STATE_ACCEPTING
);
804 ldout(msgr
->cct
,10) << "accept success, connect_seq = " << connect_seq
<< ", sending READY" << dendl
;
807 reply
.tag
= (reply_tag
? reply_tag
: CEPH_MSGR_TAG_READY
);
808 reply
.features
= policy
.features_supported
;
809 reply
.global_seq
= msgr
->get_global_seq();
810 reply
.connect_seq
= connect_seq
;
812 reply
.authorizer_len
= authorizer_reply
.length();
814 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
816 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
817 ldout(msgr
->cct
,10) << "accept features " << connection_state
->get_features() << dendl
;
819 session_security
.reset(
820 get_auth_session_handler(msgr
->cct
,
821 connect
.authorizer_protocol
,
823 connection_state
->get_features()));
826 msgr
->dispatch_queue
.queue_accept(connection_state
.get());
827 msgr
->ms_deliver_handle_fast_accept(connection_state
.get());
830 if (msgr
->dispatch_queue
.stop
)
832 removed
= msgr
->accepting_pipes
.erase(this);
833 ceph_assert(removed
== 1);
838 r
= tcp_write((char*)&reply
, sizeof(reply
));
840 goto fail_registered
;
843 if (reply
.authorizer_len
) {
844 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
846 goto fail_registered
;
850 if (reply_tag
== CEPH_MSGR_TAG_SEQ
) {
851 if (tcp_write((char*)&existing_seq
, sizeof(existing_seq
)) < 0) {
852 ldout(msgr
->cct
,2) << "accept write error on in_seq" << dendl
;
853 goto fail_registered
;
855 if (tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
)) < 0) {
856 ldout(msgr
->cct
,2) << "accept read error on newly_acked_seq" << dendl
;
857 goto fail_registered
;
862 discard_requeued_up_to(newly_acked_seq
);
863 if (state
!= STATE_CLOSED
) {
864 ldout(msgr
->cct
,10) << "accept starting writer, state " << get_state_name() << dendl
;
867 ldout(msgr
->cct
,20) << "accept done" << dendl
;
869 maybe_start_delay_thread();
871 return 0; // success.
874 ldout(msgr
->cct
, 10) << "accept fault after register" << dendl
;
876 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
877 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
879 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
885 if (state
!= STATE_CLOSED
) {
886 bool queued
= is_queued();
887 ldout(msgr
->cct
, 10) << " queued = " << (int)queued
<< dendl
;
889 state
= policy
.server
? STATE_STANDBY
: STATE_CONNECTING
;
890 } else if (replaced
) {
891 state
= STATE_STANDBY
;
893 state
= STATE_CLOSED
;
897 if (queued
|| replaced
)
904 shutting_down_msgr_unlocked
:
905 ceph_assert(pipe_lock
.is_locked());
907 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
908 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
910 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
914 state
= STATE_CLOSED
;
// Applies socket options to `sd` according to the messenger configuration:
//  - TCP_NODELAY when ms_tcp_nodelay is set;
//  - SO_RCVBUF when ms_tcp_rcvbuf is non-zero;
//  - SO_NOSIGPIPE where CEPH_USE_SO_NOSIGPIPE is defined;
//  - where IPTOS_CLASS_CS6 is defined, the IP ToS / IPv6 traffic class,
//    followed by SO_PRIORITY from msgr->get_socket_priority().
// The log statements below report options that could not be set.
920 void Pipe::set_socket_options()
922 // disable Nagle algorithm?
923 if (msgr
->cct
->_conf
->ms_tcp_nodelay
) {
925 int r
= ::setsockopt(sd
, IPPROTO_TCP
, TCP_NODELAY
, (char*)&flag
, sizeof(flag
));
928 ldout(msgr
->cct
,0) << "couldn't set TCP_NODELAY: "
929 << cpp_strerror(r
) << dendl
;
// Optional receive-buffer size override.
932 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
933 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
934 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_RCVBUF
, (void*)&size
, sizeof(size
));
937 ldout(msgr
->cct
,0) << "couldn't set SO_RCVBUF to " << size
938 << ": " << cpp_strerror(r
) << dendl
;
// Block SIGPIPE at the socket level on platforms that define this option.
943 #ifdef CEPH_USE_SO_NOSIGPIPE
945 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&val
, sizeof(val
));
948 ldout(msgr
->cct
,0) << "couldn't set SO_NOSIGPIPE: "
949 << cpp_strerror(r
) << dendl
;
954 int prio
= msgr
->get_socket_priority();
957 #ifdef IPTOS_CLASS_CS6
958 int iptos
= IPTOS_CLASS_CS6
;
// Use the peer's address family when the peer address is known, otherwise
// fall back to the family of our own legacy address.
960 if (!peer_addr
.is_blank_ip()) {
961 addr_family
= peer_addr
.get_family();
963 addr_family
= msgr
->get_myaddr_legacy().get_family();
// The option used depends on the family: IP_TOS at the IPPROTO_IP level,
// IPV6_TCLASS at the IPPROTO_IPV6 level.
965 switch (addr_family
) {
967 r
= ::setsockopt(sd
, IPPROTO_IP
, IP_TOS
, &iptos
, sizeof(iptos
));
970 r
= ::setsockopt(sd
, IPPROTO_IPV6
, IPV6_TCLASS
, &iptos
, sizeof(iptos
));
973 lderr(msgr
->cct
) << "couldn't set ToS of unknown family ("
974 << addr_family
<< ")"
975 << " to " << iptos
<< dendl
;
980 ldout(msgr
->cct
,0) << "couldn't set TOS to " << iptos
981 << ": " << cpp_strerror(r
) << dendl
;
984 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
985 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
986 // We need to call setsockopt(SO_PRIORITY) after it.
987 r
= ::setsockopt(sd
, SOL_SOCKET
, SO_PRIORITY
, &prio
, sizeof(prio
));
990 ldout(msgr
->cct
,0) << "couldn't set SO_PRIORITY to " << prio
991 << ": " << cpp_strerror(r
) << dendl
;
999 ldout(msgr
->cct
,10) << "connect " << connect_seq
<< dendl
;
1000 ceph_assert(pipe_lock
.is_locked());
1002 __u32 cseq
= connect_seq
;
1003 __u32 gseq
= msgr
->get_global_seq();
1005 // stop reader thread
1013 struct iovec msgvec
[2];
1015 char banner
[strlen(CEPH_BANNER
) + 1]; // extra byte makes coverity happy
1016 entity_addr_t paddr
;
1017 entity_addr_t peer_addr_for_me
, socket_addr
;
1018 AuthAuthorizer
*authorizer
= NULL
;
1019 bufferlist addrbl
, myaddrbl
;
1020 const auto& conf
= msgr
->cct
->_conf
;
1022 // close old socket. this is safe because we stopped the reader thread above.
1027 sd
= socket_cloexec(peer_addr
.get_family(), SOCK_STREAM
, 0);
1030 lderr(msgr
->cct
) << "connect couldn't create socket " << cpp_strerror(e
) << dendl
;
1037 set_socket_options();
1040 entity_addr_t addr2bind
= msgr
->get_myaddr_legacy();
1041 if (msgr
->cct
->_conf
->ms_bind_before_connect
&& (!addr2bind
.is_blank_ip())) {
1042 addr2bind
.set_port(0);
1043 int r
= ::bind(sd
, addr2bind
.get_sockaddr(), addr2bind
.get_sockaddr_len());
1045 ldout(msgr
->cct
,2) << "client bind error " << ", " << cpp_strerror(errno
) << dendl
;
1052 ldout(msgr
->cct
,10) << "connecting to " << peer_addr
<< dendl
;
1053 rc
= ::connect(sd
, peer_addr
.get_sockaddr(), peer_addr
.get_sockaddr_len());
1055 int stored_errno
= errno
;
1056 ldout(msgr
->cct
,2) << "connect error " << peer_addr
1057 << ", " << cpp_strerror(stored_errno
) << dendl
;
1058 if (stored_errno
== ECONNREFUSED
) {
1059 ldout(msgr
->cct
, 2) << "connection refused!" << dendl
;
1060 msgr
->dispatch_queue
.queue_refused(connection_state
.get());
1066 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1067 rc
= tcp_read((char*)&banner
, strlen(CEPH_BANNER
));
1069 ldout(msgr
->cct
,2) << "connect couldn't read banner, " << cpp_strerror(rc
) << dendl
;
1072 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1073 ldout(msgr
->cct
,0) << "connect protocol error (bad banner) on peer " << peer_addr
<< dendl
;
1077 memset(&msg
, 0, sizeof(msg
));
1078 msgvec
[0].iov_base
= banner
;
1079 msgvec
[0].iov_len
= strlen(CEPH_BANNER
);
1080 msg
.msg_iov
= msgvec
;
1082 msglen
= msgvec
[0].iov_len
;
1083 rc
= do_sendmsg(&msg
, msglen
);
1085 ldout(msgr
->cct
,2) << "connect couldn't write my banner, " << cpp_strerror(rc
) << dendl
;
1091 #if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)
1092 bufferptr
p(sizeof(ceph_entity_addr
) * 2);
1094 int wirelen
= sizeof(__u32
) * 2 + sizeof(ceph_sockaddr_storage
);
1095 bufferptr
p(wirelen
* 2);
1097 addrbl
.push_back(std::move(p
));
1099 rc
= tcp_read(addrbl
.c_str(), addrbl
.length());
1101 ldout(msgr
->cct
,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc
) << dendl
;
1105 auto p
= addrbl
.cbegin();
1107 decode(peer_addr_for_me
, p
);
1109 catch (buffer::error
& e
) {
1110 ldout(msgr
->cct
,2) << "connect couldn't decode peer addrs: " << e
.what()
1114 port
= peer_addr_for_me
.get_port();
1116 ldout(msgr
->cct
,20) << "connect read peer addr " << paddr
<< " on socket " << sd
<< dendl
;
1117 if (peer_addr
!= paddr
) {
1118 if (paddr
.is_blank_ip() &&
1119 peer_addr
.get_port() == paddr
.get_port() &&
1120 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1121 ldout(msgr
->cct
,0) << "connect claims to be "
1122 << paddr
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl
;
1124 ldout(msgr
->cct
,10) << "connect claims to be "
1125 << paddr
<< " not " << peer_addr
<< dendl
;
1130 ldout(msgr
->cct
,20) << "connect peer addr for me is " << peer_addr_for_me
<< dendl
;
1132 msgr
->learned_addr(peer_addr_for_me
);
1134 encode(msgr
->my_addr
, myaddrbl
, 0); // legacy
1136 memset(&msg
, 0, sizeof(msg
));
1137 msgvec
[0].iov_base
= myaddrbl
.c_str();
1138 msgvec
[0].iov_len
= myaddrbl
.length();
1139 msg
.msg_iov
= msgvec
;
1141 msglen
= msgvec
[0].iov_len
;
1142 rc
= do_sendmsg(&msg
, msglen
);
1144 ldout(msgr
->cct
,2) << "connect couldn't write my addr, " << cpp_strerror(rc
) << dendl
;
1147 ldout(msgr
->cct
,10) << "connect sent my addr " << msgr
->my_addr
<< dendl
;
1152 authorizer
= msgr
->ms_deliver_get_authorizer(peer_type
);
1154 bufferlist authorizer_reply
;
1156 ceph_msg_connect connect
;
1157 connect
.features
= policy
.features_supported
;
1158 connect
.host_type
= msgr
->get_myname().type();
1159 connect
.global_seq
= gseq
;
1160 connect
.connect_seq
= cseq
;
1161 connect
.protocol_version
= msgr
->get_proto_version(peer_type
, true);
1162 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1163 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1165 ldout(msgr
->cct
,10) << "connect.authorizer_len=" << connect
.authorizer_len
1166 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
1169 connect
.flags
|= CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1170 memset(&msg
, 0, sizeof(msg
));
1171 msgvec
[0].iov_base
= (char*)&connect
;
1172 msgvec
[0].iov_len
= sizeof(connect
);
1173 msg
.msg_iov
= msgvec
;
1175 msglen
= msgvec
[0].iov_len
;
1177 msgvec
[1].iov_base
= authorizer
->bl
.c_str();
1178 msgvec
[1].iov_len
= authorizer
->bl
.length();
1180 msglen
+= msgvec
[1].iov_len
;
1183 ldout(msgr
->cct
,10) << "connect sending gseq=" << gseq
<< " cseq=" << cseq
1184 << " proto=" << connect
.protocol_version
<< dendl
;
1185 rc
= do_sendmsg(&msg
, msglen
);
1187 ldout(msgr
->cct
,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc
) << dendl
;
1191 ldout(msgr
->cct
,20) << "connect wrote (self +) cseq, waiting for reply" << dendl
;
1192 ceph_msg_connect_reply reply
;
1193 rc
= tcp_read((char*)&reply
, sizeof(reply
));
1195 ldout(msgr
->cct
,2) << "connect read reply " << cpp_strerror(rc
) << dendl
;
1199 ldout(msgr
->cct
,20) << "connect got reply tag " << (int)reply
.tag
1200 << " connect_seq " << reply
.connect_seq
1201 << " global_seq " << reply
.global_seq
1202 << " proto " << reply
.protocol_version
1203 << " flags " << (int)reply
.flags
1204 << " features " << reply
.features
1207 authorizer_reply
.clear();
1209 if (reply
.authorizer_len
) {
1210 ldout(msgr
->cct
,10) << "reply.authorizer_len=" << reply
.authorizer_len
<< dendl
;
1211 bufferptr bp
= buffer::create(reply
.authorizer_len
);
1212 rc
= tcp_read(bp
.c_str(), reply
.authorizer_len
);
1214 ldout(msgr
->cct
,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc
) << dendl
;
1217 authorizer_reply
.push_back(bp
);
1220 if (reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) {
1221 authorizer
->add_challenge(msgr
->cct
, authorizer_reply
);
1222 ldout(msgr
->cct
,10) << " got authorizer challenge, " << authorizer_reply
.length()
1223 << " bytes" << dendl
;
1228 auto iter
= authorizer_reply
.cbegin();
1229 if (!authorizer
->verify_reply(iter
, nullptr /* connection_secret */)) {
1230 ldout(msgr
->cct
,0) << "failed verifying authorize reply" << dendl
;
1235 if (conf
->ms_inject_internal_delays
) {
1236 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1238 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1243 if (state
!= STATE_CONNECTING
) {
1244 ldout(msgr
->cct
,0) << "connect got RESETSESSION but no longer connecting" << dendl
;
1248 if (reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1249 ldout(msgr
->cct
,0) << "connect protocol feature mismatch, my " << std::hex
1250 << connect
.features
<< " < peer " << reply
.features
1251 << " missing " << (reply
.features
& ~policy
.features_supported
)
1252 << std::dec
<< dendl
;
1256 if (reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1257 ldout(msgr
->cct
,0) << "connect protocol version mismatch, my " << connect
.protocol_version
1258 << " != " << reply
.protocol_version
<< dendl
;
1262 if (reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1263 ldout(msgr
->cct
,0) << "connect got BADAUTHORIZER" << dendl
;
1266 if (reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1267 ldout(msgr
->cct
,0) << "connect got RESETSESSION" << dendl
;
1268 was_session_reset();
1273 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1274 gseq
= msgr
->get_global_seq(reply
.global_seq
);
1275 ldout(msgr
->cct
,10) << "connect got RETRY_GLOBAL " << reply
.global_seq
1276 << " chose new " << gseq
<< dendl
;
1280 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1281 ceph_assert(reply
.connect_seq
> connect_seq
);
1282 ldout(msgr
->cct
,10) << "connect got RETRY_SESSION " << connect_seq
1283 << " -> " << reply
.connect_seq
<< dendl
;
1284 cseq
= connect_seq
= reply
.connect_seq
;
1289 if (reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1290 ldout(msgr
->cct
,3) << "connect got WAIT (connection race)" << dendl
;
1295 if (reply
.tag
== CEPH_MSGR_TAG_READY
||
1296 reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1297 uint64_t feat_missing
= policy
.features_required
& ~(uint64_t)reply
.features
;
1299 ldout(msgr
->cct
,1) << "missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
1303 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1304 ldout(msgr
->cct
,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl
;
1305 uint64_t newly_acked_seq
= 0;
1306 rc
= tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
));
1308 ldout(msgr
->cct
,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc
) << dendl
;
1311 ldout(msgr
->cct
,2) << " got newly_acked_seq " << newly_acked_seq
1312 << " vs out_seq " << out_seq
<< dendl
;
1313 while (newly_acked_seq
> out_seq
) {
1314 Message
*m
= _get_next_outgoing();
1316 ldout(msgr
->cct
,2) << " discarding previously sent " << m
->get_seq()
1317 << " " << *m
<< dendl
;
1318 ceph_assert(m
->get_seq() <= newly_acked_seq
);
1322 if (tcp_write((char*)&in_seq
, sizeof(in_seq
)) < 0) {
1323 ldout(msgr
->cct
,2) << "connect write error on in_seq" << dendl
;
1329 peer_global_seq
= reply
.global_seq
;
1330 policy
.lossy
= reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1332 connect_seq
= cseq
+ 1;
1333 ceph_assert(connect_seq
== reply
.connect_seq
);
1334 backoff
= utime_t();
1335 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
1336 ldout(msgr
->cct
,10) << "connect success " << connect_seq
<< ", lossy = " << policy
.lossy
1337 << ", features " << connection_state
->get_features() << dendl
;
1340 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1343 if (authorizer
!= NULL
) {
1344 session_security
.reset(
1345 get_auth_session_handler(
1347 authorizer
->protocol
,
1348 authorizer
->session_key
,
1349 connection_state
->get_features()));
1351 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1352 session_security
.reset();
1355 msgr
->dispatch_queue
.queue_connect(connection_state
.get());
1356 msgr
->ms_deliver_handle_fast_connect(connection_state
.get());
1358 if (!reader_running
) {
1359 ldout(msgr
->cct
,20) << "connect starting reader" << dendl
;
1362 maybe_start_delay_thread();
1368 ldout(msgr
->cct
,0) << "connect got bad tag " << (int)tag
<< dendl
;
1373 if (conf
->ms_inject_internal_delays
) {
1374 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1376 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1382 if (state
== STATE_CONNECTING
)
1385 ldout(msgr
->cct
,3) << "connect fault, but state = " << get_state_name()
1386 << " != connecting, stopping" << dendl
;
1393 void Pipe::register_pipe()
1395 ldout(msgr
->cct
,10) << "register_pipe" << dendl
;
1396 ceph_assert(msgr
->lock
.is_locked());
1397 Pipe
*existing
= msgr
->_lookup_pipe(peer_addr
);
1398 ceph_assert(existing
== NULL
);
1399 msgr
->rank_pipe
[peer_addr
] = this;
1402 void Pipe::unregister_pipe()
1404 ceph_assert(msgr
->lock
.is_locked());
1405 ceph::unordered_map
<entity_addr_t
,Pipe
*>::iterator p
= msgr
->rank_pipe
.find(peer_addr
);
1406 if (p
!= msgr
->rank_pipe
.end() && p
->second
== this) {
1407 ldout(msgr
->cct
,10) << "unregister_pipe" << dendl
;
1408 msgr
->rank_pipe
.erase(p
);
1410 ldout(msgr
->cct
,10) << "unregister_pipe - not registered" << dendl
;
1411 msgr
->accepting_pipes
.erase(this); // somewhat overkill, but safe.
1417 ldout(msgr
->cct
, 20) << "join" << dendl
;
1418 if (writer_thread
.is_started())
1419 writer_thread
.join();
1420 if (reader_thread
.is_started())
1421 reader_thread
.join();
1423 ldout(msgr
->cct
, 20) << "joining delay_thread" << dendl
;
1424 delay_thread
->stop();
1425 delay_thread
->join();
1429 void Pipe::requeue_sent()
1434 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1435 while (!sent
.empty()) {
1436 Message
*m
= sent
.back();
1438 ldout(msgr
->cct
,10) << "requeue_sent " << *m
<< " for resend seq " << out_seq
1439 << " (" << m
->get_seq() << ")" << dendl
;
1445 void Pipe::discard_requeued_up_to(uint64_t seq
)
1447 ldout(msgr
->cct
, 10) << "discard_requeued_up_to " << seq
<< dendl
;
1448 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0) {
1452 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1453 while (!rq
.empty()) {
1454 Message
*m
= rq
.front();
1455 if (m
->get_seq() == 0 || m
->get_seq() > seq
)
1457 ldout(msgr
->cct
,10) << "discard_requeued_up_to " << *m
<< " for resend seq " << out_seq
1458 << " <= " << seq
<< ", discarding" << dendl
;
1464 out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1468 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1469 * Must hold pipe_lock prior to calling.
1471 void Pipe::discard_out_queue()
1473 ldout(msgr
->cct
,10) << "discard_queue" << dendl
;
1475 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1476 ldout(msgr
->cct
,20) << " discard " << *p
<< dendl
;
1480 for (map
<int,list
<Message
*> >::iterator p
= out_q
.begin(); p
!= out_q
.end(); ++p
)
1481 for (list
<Message
*>::iterator r
= p
->second
.begin(); r
!= p
->second
.end(); ++r
) {
1482 ldout(msgr
->cct
,20) << " discard " << *r
<< dendl
;
1488 void Pipe::fault(bool onread
)
1490 const auto& conf
= msgr
->cct
->_conf
;
1491 ceph_assert(pipe_lock
.is_locked());
1494 if (onread
&& state
== STATE_CONNECTING
) {
1495 ldout(msgr
->cct
,10) << "fault already connecting, reader shutting down" << dendl
;
1499 ldout(msgr
->cct
,2) << "fault " << cpp_strerror(errno
) << dendl
;
1501 if (state
== STATE_CLOSED
||
1502 state
== STATE_CLOSING
) {
1503 ldout(msgr
->cct
,10) << "fault already closed|closing" << dendl
;
1504 if (connection_state
->clear_pipe(this))
1505 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1512 if (policy
.lossy
&& state
!= STATE_CONNECTING
) {
1513 ldout(msgr
->cct
,10) << "fault on lossy channel, failing" << dendl
;
1515 // disconnect from Connection, and mark it failed. future messages
1517 ceph_assert(connection_state
);
1519 bool cleared
= connection_state
->clear_pipe(this);
1521 // crib locks, blech. note that Pipe is now STATE_CLOSED and the
1522 // rank_pipe entry is ignored by others.
1525 if (conf
->ms_inject_internal_delays
) {
1526 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1528 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1535 msgr
->lock
.Unlock();
1538 delay_thread
->discard();
1539 in_q
->discard_queue(conn_id
);
1540 discard_out_queue();
1542 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1546 // queue delayed items immediately
1548 delay_thread
->flush();
1550 // requeue sent items
1553 if (policy
.standby
&& !is_queued()) {
1554 ldout(msgr
->cct
,0) << "fault with nothing to send, going to standby" << dendl
;
1555 state
= STATE_STANDBY
;
1559 if (state
!= STATE_CONNECTING
) {
1560 if (policy
.server
) {
1561 ldout(msgr
->cct
,0) << "fault, server, going to standby" << dendl
;
1562 state
= STATE_STANDBY
;
1564 ldout(msgr
->cct
,0) << "fault, initiating reconnect" << dendl
;
1566 state
= STATE_CONNECTING
;
1568 backoff
= utime_t();
1569 } else if (backoff
== utime_t()) {
1570 ldout(msgr
->cct
,0) << "fault" << dendl
;
1571 backoff
.set_from_double(conf
->ms_initial_backoff
);
1573 ldout(msgr
->cct
,10) << "fault waiting " << backoff
<< dendl
;
1574 cond
.WaitInterval(pipe_lock
, backoff
);
1576 if (backoff
> conf
->ms_max_backoff
)
1577 backoff
.set_from_double(conf
->ms_max_backoff
);
1578 ldout(msgr
->cct
,10) << "fault done waiting or woke up" << dendl
;
1582 void Pipe::randomize_out_seq()
1584 if (connection_state
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1585 // Set out_seq to a random value, so CRC won't be predictable.
1586 out_seq
= ceph::util::generate_random_number
<uint64_t>(0, SEQ_MASK
);
1587 lsubdout(msgr
->cct
, ms
, 10) << "randomize_out_seq " << out_seq
<< dendl
;
1589 // previously, seq #'s always started at 0.
1594 void Pipe::was_session_reset()
1596 ceph_assert(pipe_lock
.is_locked());
1598 ldout(msgr
->cct
,10) << "was_session_reset" << dendl
;
1599 in_q
->discard_queue(conn_id
);
1601 delay_thread
->discard();
1602 discard_out_queue();
1604 msgr
->dispatch_queue
.queue_remote_reset(connection_state
.get());
1606 randomize_out_seq();
1614 ldout(msgr
->cct
,10) << "stop" << dendl
;
1615 ceph_assert(pipe_lock
.is_locked());
1616 state
= STATE_CLOSED
;
1617 state_closed
= true;
1622 void Pipe::stop_and_wait()
1624 ceph_assert(pipe_lock
.is_locked_by_me());
1625 if (state
!= STATE_CLOSED
)
1628 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
1629 ldout(msgr
->cct
, 10) << __func__
<< " sleep for "
1630 << msgr
->cct
->_conf
->ms_inject_internal_delays
1633 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1639 delay_thread
->stop_fast_dispatching();
1642 while (reader_running
&&
1644 cond
.Wait(pipe_lock
);
1647 /* read msgs from socket.
1654 if (state
== STATE_ACCEPTING
) {
1656 ceph_assert(pipe_lock
.is_locked());
1660 while (state
!= STATE_CLOSED
&&
1661 state
!= STATE_CONNECTING
) {
1662 ceph_assert(pipe_lock
.is_locked());
1664 // sleep if (re)connecting
1665 if (state
== STATE_STANDBY
) {
1666 ldout(msgr
->cct
,20) << "reader sleeping during reconnect|standby" << dendl
;
1667 cond
.Wait(pipe_lock
);
1671 // get a reference to the AuthSessionHandler while we have the pipe_lock
1672 std::shared_ptr
<AuthSessionHandler
> auth_handler
= session_security
;
1677 ldout(msgr
->cct
,20) << "reader reading tag..." << dendl
;
1678 if (tcp_read((char*)&tag
, 1) < 0) {
1680 ldout(msgr
->cct
,2) << "reader couldn't read tag, " << cpp_strerror(errno
) << dendl
;
1685 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
1686 ldout(msgr
->cct
,2) << "reader got KEEPALIVE" << dendl
;
1688 connection_state
->set_last_keepalive(ceph_clock_now());
1691 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
1692 ldout(msgr
->cct
,30) << "reader got KEEPALIVE2 tag ..." << dendl
;
1694 int rc
= tcp_read((char*)&t
, sizeof(t
));
1697 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp "
1698 << cpp_strerror(errno
) << dendl
;
1701 send_keepalive_ack
= true;
1702 keepalive_ack_stamp
= utime_t(t
);
1703 ldout(msgr
->cct
,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1705 connection_state
->set_last_keepalive(ceph_clock_now());
1710 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
1711 ldout(msgr
->cct
,2) << "reader got KEEPALIVE_ACK" << dendl
;
1712 struct ceph_timespec t
;
1713 int rc
= tcp_read((char*)&t
, sizeof(t
));
1716 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno
) << dendl
;
1719 connection_state
->set_last_keepalive_ack(utime_t(t
));
1725 if (tag
== CEPH_MSGR_TAG_ACK
) {
1726 ldout(msgr
->cct
,20) << "reader got ACK" << dendl
;
1728 int rc
= tcp_read((char*)&seq
, sizeof(seq
));
1731 ldout(msgr
->cct
,2) << "reader couldn't read ack seq, " << cpp_strerror(errno
) << dendl
;
1733 } else if (state
!= STATE_CLOSED
) {
1739 else if (tag
== CEPH_MSGR_TAG_MSG
) {
1740 ldout(msgr
->cct
,20) << "reader got MSG" << dendl
;
1742 int r
= read_message(&m
, auth_handler
.get());
1752 m
->trace
.event("pipe read message");
1754 if (state
== STATE_CLOSED
||
1755 state
== STATE_CONNECTING
) {
1756 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1761 // check received seq#. if it is old, drop the message.
1762 // note that incoming messages may skip ahead. this is convenient for the client
1763 // side queueing because messages can't be renumbered, but the (kernel) client will
1764 // occasionally pull a message out of the sent queue to send elsewhere. in that case
1765 // it doesn't matter if we "got" it or not.
1766 if (m
->get_seq() <= in_seq
) {
1767 ldout(msgr
->cct
,0) << "reader got old message "
1768 << m
->get_seq() << " <= " << in_seq
<< " " << m
<< " " << *m
1769 << ", discarding" << dendl
;
1770 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1772 if (connection_state
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1773 msgr
->cct
->_conf
->ms_die_on_old_message
)
1774 ceph_abort_msg("old msgs despite reconnect_seq feature");
1777 if (m
->get_seq() > in_seq
+ 1) {
1778 ldout(msgr
->cct
,0) << "reader missed message? skipped from seq "
1779 << in_seq
<< " to " << m
->get_seq() << dendl
;
1780 if (msgr
->cct
->_conf
->ms_die_on_skipped_message
)
1781 ceph_abort_msg("skipped incoming seq");
1784 m
->set_connection(connection_state
.get());
1786 // note last received message.
1787 in_seq
= m
->get_seq();
1789 cond
.Signal(); // wake up writer, to ack this
1791 ldout(msgr
->cct
,10) << "reader got message "
1792 << m
->get_seq() << " " << m
<< " " << *m
1794 in_q
->fast_preprocess(m
);
1798 if (rand() % 10000 < msgr
->cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1799 release
= m
->get_recv_stamp();
1800 release
+= msgr
->cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1801 lsubdout(msgr
->cct
, ms
, 1) << "queue_received will delay until " << release
<< " on " << m
<< " " << *m
<< dendl
;
1803 delay_thread
->queue(release
, m
);
1805 if (in_q
->can_fast_dispatch(m
)) {
1806 reader_dispatching
= true;
1808 in_q
->fast_dispatch(m
);
1810 reader_dispatching
= false;
1811 if (state
== STATE_CLOSED
||
1812 notify_on_dispatch_done
) { // there might be somebody waiting
1813 notify_on_dispatch_done
= false;
1817 in_q
->enqueue(m
, m
->get_priority(), conn_id
);
1822 else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
1823 ldout(msgr
->cct
,20) << "reader got CLOSE" << dendl
;
1825 if (state
== STATE_CLOSING
) {
1826 state
= STATE_CLOSED
;
1827 state_closed
= true;
1829 state
= STATE_CLOSING
;
1835 ldout(msgr
->cct
,0) << "reader bad tag " << (int)tag
<< dendl
;
1843 reader_running
= false;
1844 reader_needs_join
= true;
1845 unlock_maybe_reap();
1846 ldout(msgr
->cct
,10) << "reader done" << dendl
;
1849 /* write msgs to socket.
1855 while (state
!= STATE_CLOSED
) {// && state != STATE_WAIT) {
1856 ldout(msgr
->cct
,10) << "writer: state = " << get_state_name()
1857 << " policy.server=" << policy
.server
<< dendl
;
1860 if (is_queued() && state
== STATE_STANDBY
&& !policy
.server
)
1861 state
= STATE_CONNECTING
;
1864 if (state
== STATE_CONNECTING
) {
1865 ceph_assert(!policy
.server
);
1870 if (state
== STATE_CLOSING
) {
1872 ldout(msgr
->cct
,20) << "writer writing CLOSE tag" << dendl
;
1873 char tag
= CEPH_MSGR_TAG_CLOSE
;
1874 state
= STATE_CLOSED
;
1875 state_closed
= true;
1878 // we can ignore return value, actually; we don't care if this succeeds.
1879 int r
= ::write(sd
, &tag
, 1);
1886 if (state
!= STATE_CONNECTING
&& state
!= STATE_WAIT
&& state
!= STATE_STANDBY
&&
1887 (is_queued() || in_seq
> in_seq_acked
)) {
1890 if (send_keepalive
) {
1892 if (connection_state
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
1894 rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2
,
1898 rc
= write_keepalive();
1902 ldout(msgr
->cct
,2) << "writer couldn't write keepalive[2], "
1903 << cpp_strerror(errno
) << dendl
;
1907 send_keepalive
= false;
1909 if (send_keepalive_ack
) {
1910 utime_t t
= keepalive_ack_stamp
;
1912 int rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK
, t
);
1915 ldout(msgr
->cct
,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno
) << dendl
;
1919 send_keepalive_ack
= false;
1923 if (in_seq
> in_seq_acked
) {
1924 uint64_t send_seq
= in_seq
;
1926 int rc
= write_ack(send_seq
);
1929 ldout(msgr
->cct
,2) << "writer couldn't write ack, " << cpp_strerror(errno
) << dendl
;
1933 in_seq_acked
= send_seq
;
1936 // grab outgoing message
1937 Message
*m
= _get_next_outgoing();
1939 m
->set_seq(++out_seq
);
1940 if (!policy
.lossy
) {
1946 // associate message with Connection (for benefit of encode_payload)
1947 m
->set_connection(connection_state
.get());
1949 uint64_t features
= connection_state
->get_features();
1951 if (m
->empty_payload())
1952 ldout(msgr
->cct
,20) << "writer encoding " << m
->get_seq() << " features " << features
1953 << " " << m
<< " " << *m
<< dendl
;
1955 ldout(msgr
->cct
,20) << "writer half-reencoding " << m
->get_seq() << " features " << features
1956 << " " << m
<< " " << *m
<< dendl
;
1958 // encode and copy out of *m
1959 m
->encode(features
, msgr
->crcflags
);
1961 // prepare everything
1962 const ceph_msg_header
& header
= m
->get_header();
1963 const ceph_msg_footer
& footer
= m
->get_footer();
1965 // Now that we have all the crcs calculated, handle the
1966 // digital signature for the message, if the pipe has session
1967 // security set up. Some session security options do not
1968 // actually calculate and check the signature, but they should
1969 // handle the calls to sign_message and check_signature. PLR
1970 if (session_security
.get() == NULL
) {
1971 ldout(msgr
->cct
, 20) << "writer no session security" << dendl
;
1973 if (session_security
->sign_message(m
)) {
1974 ldout(msgr
->cct
, 20) << "writer failed to sign seq # " << header
.seq
1975 << "): sig = " << footer
.sig
<< dendl
;
1977 ldout(msgr
->cct
, 20) << "writer signed seq # " << header
.seq
1978 << "): sig = " << footer
.sig
<< dendl
;
1982 bufferlist blist
= m
->get_payload();
1983 blist
.append(m
->get_middle());
1984 blist
.append(m
->get_data());
1988 m
->trace
.event("pipe writing message");
1990 ldout(msgr
->cct
,20) << "writer sending " << m
->get_seq() << " " << m
<< dendl
;
1991 int rc
= write_message(header
, footer
, blist
);
1995 ldout(msgr
->cct
,1) << "writer error sending " << m
<< ", "
1996 << cpp_strerror(errno
) << dendl
;
2005 ldout(msgr
->cct
,20) << "writer sleeping" << dendl
;
2006 cond
.Wait(pipe_lock
);
2009 ldout(msgr
->cct
,20) << "writer finishing" << dendl
;
2012 writer_running
= false;
2013 unlock_maybe_reap();
2014 ldout(msgr
->cct
,10) << "writer done" << dendl
;
2017 void Pipe::unlock_maybe_reap()
2019 if (!reader_running
&& !writer_running
) {
2022 if (delay_thread
&& delay_thread
->is_flushing()) {
2023 delay_thread
->wait_for_flush();
2025 msgr
->queue_reap(this);
2031 static void alloc_aligned_buffer(bufferlist
& data
, unsigned len
, unsigned off
)
2033 // create a buffer to read into that matches the data alignment
2034 unsigned left
= len
;
2035 if (off
& ~CEPH_PAGE_MASK
) {
2038 head
= std::min
<uint64_t>(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
2039 data
.push_back(buffer::create(head
));
2042 unsigned middle
= left
& CEPH_PAGE_MASK
;
2044 data
.push_back(buffer::create_small_page_aligned(middle
));
2048 data
.push_back(buffer::create(left
));
2052 int Pipe::read_message(Message
**pm
, AuthSessionHandler
* auth_handler
)
2056 //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;
2058 ceph_msg_header header
;
2059 ceph_msg_footer footer
;
2060 __u32 header_crc
= 0;
2062 if (tcp_read((char*)&header
, sizeof(header
)) < 0)
2064 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2065 header_crc
= ceph_crc32c(0, (unsigned char *)&header
, sizeof(header
) - sizeof(header
.crc
));
2068 ldout(msgr
->cct
,20) << "reader got envelope type=" << header
.type
2069 << " src " << entity_name_t(header
.src
)
2070 << " front=" << header
.front_len
2071 << " data=" << header
.data_len
2072 << " off " << header
.data_off
2075 // verify header crc
2076 if ((msgr
->crcflags
& MSG_CRC_HEADER
) && header_crc
!= header
.crc
) {
2077 ldout(msgr
->cct
,0) << "reader got bad header crc " << header_crc
<< " != " << header
.crc
<< dendl
;
2081 bufferlist front
, middle
, data
;
2082 int front_len
, middle_len
;
2083 unsigned data_len
, data_off
;
2086 utime_t recv_stamp
= ceph_clock_now();
2088 if (policy
.throttler_messages
) {
2089 ldout(msgr
->cct
,10) << "reader wants " << 1 << " message from policy throttler "
2090 << policy
.throttler_messages
->get_current() << "/"
2091 << policy
.throttler_messages
->get_max() << dendl
;
2092 policy
.throttler_messages
->get();
2095 uint64_t message_size
= header
.front_len
+ header
.middle_len
+ header
.data_len
;
2097 if (policy
.throttler_bytes
) {
2098 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " bytes from policy throttler "
2099 << policy
.throttler_bytes
->get_current() << "/"
2100 << policy
.throttler_bytes
->get_max() << dendl
;
2101 policy
.throttler_bytes
->get(message_size
);
2104 // throttle total bytes waiting for dispatch. do this _after_ the
2105 // policy throttle, as this one does not deadlock (unless dispatch
2106 // blocks indefinitely, which it shouldn't). in contrast, the
2107 // policy throttle carries for the lifetime of the message.
2108 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " from dispatch throttler "
2109 << in_q
->dispatch_throttler
.get_current() << "/"
2110 << in_q
->dispatch_throttler
.get_max() << dendl
;
2111 in_q
->dispatch_throttler
.get(message_size
);
2114 utime_t throttle_stamp
= ceph_clock_now();
2117 front_len
= header
.front_len
;
2119 bufferptr bp
= buffer::create(front_len
);
2120 if (tcp_read(bp
.c_str(), front_len
) < 0)
2121 goto out_dethrottle
;
2122 front
.push_back(std::move(bp
));
2123 ldout(msgr
->cct
,20) << "reader got front " << front
.length() << dendl
;
2127 middle_len
= header
.middle_len
;
2129 bufferptr bp
= buffer::create(middle_len
);
2130 if (tcp_read(bp
.c_str(), middle_len
) < 0)
2131 goto out_dethrottle
;
2132 middle
.push_back(std::move(bp
));
2133 ldout(msgr
->cct
,20) << "reader got middle " << middle
.length() << dendl
;
2138 data_len
= le32_to_cpu(header
.data_len
);
2139 data_off
= le32_to_cpu(header
.data_off
);
2141 unsigned offset
= 0;
2142 unsigned left
= data_len
;
2144 bufferlist newbuf
, rxbuf
;
2145 bufferlist::iterator blp
;
2146 // int rxbuf_version = 0;
2150 if (tcp_read_wait() < 0)
2151 goto out_dethrottle
;
2155 // The rx_buffers implementation is buggy:
2156 // - see http://tracker.ceph.com/issues/22480
2158 // - From inspection, I think that we have problems if we read *part*
2159 // of the message into an rx_buffer, then drop the lock, someone revokes,
2160 // and then later try to read the rest. In that case our final bufferlist
2161 // will have part of the original static_buffer from the first chunk and
2162 // partly a piece that we allocated. I think that to make this correct,
2163 // we need to keep the bufferlist we are reading into in Connection under
2164 // the lock, and on revoke, if the data is partly read, rebuild() to copy
2165 // into fresh buffers so that all references to our static buffer are
2168 // - Also... what happens if we fully read into the static
2169 // buffer, then revoke? We still have some bufferlist out there
2170 // in the process of getting dispatched back to objecter or
2171 // librados that references the static buffer.
2172 connection_state
->lock
.Lock();
2173 map
<ceph_tid_t
,pair
<bufferlist
,int> >::iterator p
= connection_state
->rx_buffers
.find(header
.tid
);
2174 if (p
!= connection_state
->rx_buffers
.end()) {
2175 if (rxbuf
.length() == 0 || p
->second
.second
!= rxbuf_version
) {
2176 ldout(msgr
->cct
,10) << "reader seleting rx buffer v " << p
->second
.second
2177 << " at offset " << offset
2178 << " len " << p
->second
.first
.length() << dendl
;
2179 rxbuf
= p
->second
.first
;
2180 rxbuf_version
= p
->second
.second
;
2181 // make sure it's big enough
2182 if (rxbuf
.length() < data_len
)
2183 rxbuf
.push_back(buffer::create(data_len
- rxbuf
.length()));
2184 blp
= p
->second
.first
.begin();
2185 blp
.advance(offset
);
2188 if (!newbuf
.length()) {
2189 ldout(msgr
->cct
,20) << "reader allocating new rx buffer at offset " << offset
<< dendl
;
2190 alloc_aligned_buffer(newbuf
, data_len
, data_off
);
2191 blp
= newbuf
.begin();
2192 blp
.advance(offset
);
2195 bufferptr bp
= blp
.get_current_ptr();
2196 int read
= std::min(bp
.length(), left
);
2197 ldout(msgr
->cct
,20) << "reader reading nonblocking into " << (void*)bp
.c_str() << " len " << bp
.length() << dendl
;
2198 ssize_t got
= tcp_read_nonblocking(bp
.c_str(), read
);
2199 ldout(msgr
->cct
,30) << "reader read " << got
<< " of " << read
<< dendl
;
2200 connection_state
->lock
.Unlock();
2202 // rx_buffer-less implementation
2203 if (!newbuf
.length()) {
2204 ldout(msgr
->cct
,20) << "reader allocating new rx buffer at offset "
2206 alloc_aligned_buffer(newbuf
, data_len
, data_off
);
2207 blp
= newbuf
.begin();
2208 blp
.advance(offset
);
2210 bufferptr bp
= blp
.get_current_ptr();
2211 int read
= std::min(bp
.length(), left
);
2212 ldout(msgr
->cct
,20) << "reader reading nonblocking into "
2213 << (void*)bp
.c_str() << " len " << bp
.length()
2215 ssize_t got
= tcp_read_nonblocking(bp
.c_str(), read
);
2216 ldout(msgr
->cct
,30) << "reader read " << got
<< " of " << read
<< dendl
;
2219 goto out_dethrottle
;
2221 blp
.advance(static_cast<size_t>(got
));
2222 data
.append(bp
, 0, got
);
2225 } // else we got a signal or something; just loop.
2230 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2231 if (tcp_read((char*)&footer
, sizeof(footer
)) < 0)
2232 goto out_dethrottle
;
2234 ceph_msg_footer_old old_footer
;
2235 if (tcp_read((char*)&old_footer
, sizeof(old_footer
)) < 0)
2236 goto out_dethrottle
;
2237 footer
.front_crc
= old_footer
.front_crc
;
2238 footer
.middle_crc
= old_footer
.middle_crc
;
2239 footer
.data_crc
= old_footer
.data_crc
;
2241 footer
.flags
= old_footer
.flags
;
2244 aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
2245 ldout(msgr
->cct
,10) << "aborted = " << aborted
<< dendl
;
2247 ldout(msgr
->cct
,0) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2248 << " byte message.. ABORTED" << dendl
;
2250 goto out_dethrottle
;
2253 ldout(msgr
->cct
,20) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2254 << " byte message" << dendl
;
2255 message
= decode_message(msgr
->cct
, msgr
->crcflags
, header
, footer
,
2256 front
, middle
, data
, connection_state
.get());
2259 goto out_dethrottle
;
2263 // Check the signature if one should be present. A zero return indicates success. PLR
2266 if (auth_handler
== NULL
) {
2267 ldout(msgr
->cct
, 10) << "No session security set" << dendl
;
2269 if (auth_handler
->check_message_signature(message
)) {
2270 ldout(msgr
->cct
, 0) << "Signature check failed" << dendl
;
2273 goto out_dethrottle
;
2277 message
->set_byte_throttler(policy
.throttler_bytes
);
2278 message
->set_message_throttler(policy
.throttler_messages
);
2280 // store reservation size in message, so we don't get confused
2281 // by messages entering the dispatch queue through other paths.
2282 message
->set_dispatch_throttle_size(message_size
);
2284 message
->set_recv_stamp(recv_stamp
);
2285 message
->set_throttle_stamp(throttle_stamp
);
2286 message
->set_recv_complete_stamp(ceph_clock_now());
2292 // release bytes reserved from the throttlers on failure
2293 if (policy
.throttler_messages
) {
2294 ldout(msgr
->cct
,10) << "reader releasing " << 1 << " message to policy throttler "
2295 << policy
.throttler_messages
->get_current() << "/"
2296 << policy
.throttler_messages
->get_max() << dendl
;
2297 policy
.throttler_messages
->put();
2300 if (policy
.throttler_bytes
) {
2301 ldout(msgr
->cct
,10) << "reader releasing " << message_size
<< " bytes to policy throttler "
2302 << policy
.throttler_bytes
->get_current() << "/"
2303 << policy
.throttler_bytes
->get_max() << dendl
;
2304 policy
.throttler_bytes
->put(message_size
);
2307 in_q
->dispatch_throttle_release(message_size
);
2312 int Pipe::do_sendmsg(struct msghdr
*msg
, unsigned len
, bool more
)
2314 MSGR_SIGPIPE_STOPPER
;
2317 r
= ::sendmsg(sd
, msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
2319 ldout(msgr
->cct
,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl
;
2322 ldout(msgr
->cct
,1) << "do_sendmsg error " << cpp_strerror(r
) << dendl
;
2325 if (state
== STATE_CLOSED
) {
2326 ldout(msgr
->cct
,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl
;
2327 return -EINTR
; // close enough
2331 if (len
== 0) break;
2333 // hrmph. trim r bytes off the front of our message.
2334 ldout(msgr
->cct
,20) << "do_sendmsg short write did " << r
<< ", still have " << len
<< dendl
;
2336 if (msg
->msg_iov
[0].iov_len
<= (size_t)r
) {
2337 // lose this whole item
2338 //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2339 r
-= msg
->msg_iov
[0].iov_len
;
2344 //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2345 msg
->msg_iov
[0].iov_base
= (char *)msg
->msg_iov
[0].iov_base
+ r
;
2346 msg
->msg_iov
[0].iov_len
-= r
;
2355 int Pipe::write_ack(uint64_t seq
)
2357 ldout(msgr
->cct
,10) << "write_ack " << seq
<< dendl
;
2359 char c
= CEPH_MSGR_TAG_ACK
;
2364 memset(&msg
, 0, sizeof(msg
));
2365 struct iovec msgvec
[2];
2366 msgvec
[0].iov_base
= &c
;
2367 msgvec
[0].iov_len
= 1;
2368 msgvec
[1].iov_base
= &s
;
2369 msgvec
[1].iov_len
= sizeof(s
);
2370 msg
.msg_iov
= msgvec
;
2373 if (do_sendmsg(&msg
, 1 + sizeof(s
), true) < 0)
2378 int Pipe::write_keepalive()
2380 ldout(msgr
->cct
,10) << "write_keepalive" << dendl
;
2382 char c
= CEPH_MSGR_TAG_KEEPALIVE
;
2385 memset(&msg
, 0, sizeof(msg
));
2386 struct iovec msgvec
[2];
2387 msgvec
[0].iov_base
= &c
;
2388 msgvec
[0].iov_len
= 1;
2389 msg
.msg_iov
= msgvec
;
2392 if (do_sendmsg(&msg
, 1) < 0)
2397 int Pipe::write_keepalive2(char tag
, const utime_t
& t
)
2399 ldout(msgr
->cct
,10) << "write_keepalive2 " << (int)tag
<< " " << t
<< dendl
;
2400 struct ceph_timespec ts
;
2401 t
.encode_timeval(&ts
);
2403 memset(&msg
, 0, sizeof(msg
));
2404 struct iovec msgvec
[2];
2405 msgvec
[0].iov_base
= &tag
;
2406 msgvec
[0].iov_len
= 1;
2407 msgvec
[1].iov_base
= &ts
;
2408 msgvec
[1].iov_len
= sizeof(ts
);
2409 msg
.msg_iov
= msgvec
;
2412 if (do_sendmsg(&msg
, 1 + sizeof(ts
)) < 0)
2418 int Pipe::write_message(const ceph_msg_header
& header
, const ceph_msg_footer
& footer
, bufferlist
& blist
)
2422 // set up msghdr and iovecs
2424 memset(&msg
, 0, sizeof(msg
));
2425 msg
.msg_iov
= msgvec
;
2429 char tag
= CEPH_MSGR_TAG_MSG
;
2430 msgvec
[msg
.msg_iovlen
].iov_base
= &tag
;
2431 msgvec
[msg
.msg_iovlen
].iov_len
= 1;
2436 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&header
;
2437 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(header
);
2438 msglen
+= sizeof(header
);
2441 // payload (front+data)
2442 auto pb
= std::cbegin(blist
.buffers());
2443 unsigned b_off
= 0; // carry-over buffer offset, if any
2444 unsigned bl_pos
= 0; // blist pos
2445 unsigned left
= blist
.length();
2448 unsigned donow
= std::min(left
, pb
->length()-b_off
);
2450 ldout(msgr
->cct
,0) << "donow = " << donow
<< " left " << left
<< " pb->length " << pb
->length()
2451 << " b_off " << b_off
<< dendl
;
2453 ceph_assert(donow
> 0);
2454 ldout(msgr
->cct
,30) << " bl_pos " << bl_pos
<< " b_off " << b_off
2455 << " leftinchunk " << left
2456 << " buffer len " << pb
->length()
2457 << " writing " << donow
2460 if (msg
.msg_iovlen
>= SM_IOV_MAX
-2) {
2461 if (do_sendmsg(&msg
, msglen
, true))
2464 // and restart the iov
2465 msg
.msg_iov
= msgvec
;
2470 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)(pb
->c_str()+b_off
);
2471 msgvec
[msg
.msg_iovlen
].iov_len
= donow
;
2475 ceph_assert(left
>= donow
);
2481 while (b_off
== pb
->length()) {
2486 ceph_assert(left
== 0);
2488 // send footer; if receiver doesn't support signatures, use the old footer format
2490 ceph_msg_footer_old old_footer
;
2491 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2492 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)&footer
;
2493 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(footer
);
2494 msglen
+= sizeof(footer
);
2497 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2498 old_footer
.front_crc
= footer
.front_crc
;
2499 old_footer
.middle_crc
= footer
.middle_crc
;
2501 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
2503 old_footer
.data_crc
= msgr
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
2504 old_footer
.flags
= footer
.flags
;
2505 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&old_footer
;
2506 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(old_footer
);
2507 msglen
+= sizeof(old_footer
);
2512 if (do_sendmsg(&msg
, msglen
))
2526 int Pipe::tcp_read(char *buf
, unsigned len
)
2533 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2534 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2535 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2536 ::shutdown(sd
, SHUT_RDWR
);
2540 if (tcp_read_wait() < 0)
2543 ssize_t got
= tcp_read_nonblocking(buf
, len
);
2550 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2555 int Pipe::tcp_read_wait()
2562 pfd
.events
= POLLIN
;
2563 #if defined(__linux__)
2564 pfd
.events
|= POLLRDHUP
;
2567 if (has_pending_data())
2570 int r
= poll(&pfd
, 1, msgr
->timeout
);
2576 evmask
= POLLERR
| POLLHUP
| POLLNVAL
;
2577 #if defined(__linux__)
2578 evmask
|= POLLRDHUP
;
2580 if (pfd
.revents
& evmask
)
2583 if (!(pfd
.revents
& POLLIN
))
2589 ssize_t
Pipe::do_recv(char *buf
, size_t len
, int flags
)
2592 ssize_t got
= ::recv( sd
, buf
, len
, flags
);
2594 if (errno
== EINTR
) {
2597 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2598 << got
<< " " << cpp_strerror(errno
) << dendl
;
2607 ssize_t
Pipe::buffered_recv(char *buf
, size_t len
, int flags
)
2610 ssize_t total_recv
= 0;
2611 if (recv_len
> recv_ofs
) {
2612 int to_read
= std::min(recv_len
- recv_ofs
, left
);
2613 memcpy(buf
, &recv_buf
[recv_ofs
], to_read
);
2614 recv_ofs
+= to_read
;
2620 total_recv
+= to_read
;
2623 /* nothing left in the prefetch buffer */
2625 if (left
> recv_max_prefetch
) {
2626 /* this was a large read, we don't prefetch for these */
2627 ssize_t ret
= do_recv(buf
, left
, flags
);
2638 ssize_t got
= do_recv(recv_buf
, recv_max_prefetch
, flags
);
2646 recv_len
= (size_t)got
;
2647 got
= std::min(left
, (size_t)got
);
2648 memcpy(buf
, recv_buf
, got
);
2654 ssize_t
Pipe::tcp_read_nonblocking(char *buf
, unsigned len
)
2656 ssize_t got
= buffered_recv(buf
, len
, MSG_DONTWAIT
);
2658 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2659 << got
<< " " << cpp_strerror(errno
) << dendl
;
2663 /* poll() said there was data, but we didn't read any - peer
2664 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2665 * standard socket behavior as documented by Stevens.
2672 int Pipe::tcp_write(const char *buf
, unsigned len
)
2678 pfd
.events
= POLLOUT
| POLLHUP
| POLLNVAL
| POLLERR
;
2679 #if defined(__linux__)
2680 pfd
.events
|= POLLRDHUP
;
2683 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2684 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2685 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2686 ::shutdown(sd
, SHUT_RDWR
);
2690 if (poll(&pfd
, 1, -1) < 0)
2693 if (!(pfd
.revents
& POLLOUT
))
2696 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2697 ceph_assert(len
> 0);
2699 MSGR_SIGPIPE_STOPPER
;
2700 int did
= ::send( sd
, buf
, len
, MSG_NOSIGNAL
);
2702 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2703 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2708 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;