1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
24 #include "msg/Message.h"
26 #include "SimpleMessenger.h"
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
32 // Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
38 #include "include/compat.h"
39 #include "include/sock_compat.h"
41 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
42 #define SEQ_MASK 0x7fffffff
43 #define dout_subsys ceph_subsys_ms
46 #define dout_prefix *_dout << *this
47 ostream
& Pipe::_pipe_prefix(std::ostream
&out
) const {
48 return out
<< "-- " << msgr
->get_myinst().addr
<< " >> " << peer_addr
<< " pipe(" << this
49 << " sd=" << sd
<< " :" << port
51 << " pgs=" << peer_global_seq
52 << " cs=" << connect_seq
53 << " l=" << policy
.lossy
54 << " c=" << connection_state
58 ostream
& operator<<(ostream
&out
, const Pipe
&pipe
) {
59 return pipe
._pipe_prefix(out
);
63 * The DelayedDelivery is for injecting delays into Message delivery off
64 * the socket. It is only enabled if delays are requested, and if they
65 * are then it pulls Messages off the DelayQueue and puts them into the
66 * in_q (SimpleMessenger::dispatch_queue).
67 * Please note that this probably has issues with Pipe shutdown and
68 * replacement semantics. I've tried, but no guarantees.
70 class Pipe::DelayedDelivery
: public Thread
{
72 std::deque
< pair
<utime_t
,Message
*> > delay_queue
;
77 bool stop_delayed_delivery
;
78 bool delay_dispatching
; // we are in fast dispatch now
79 bool stop_fast_dispatching_flag
; // we need to stop fast dispatching
82 explicit DelayedDelivery(Pipe
*p
)
84 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
86 stop_delayed_delivery(false),
87 delay_dispatching(false),
88 stop_fast_dispatching_flag(false) { }
89 ~DelayedDelivery() override
{
92 void *entry() override
;
93 void queue(utime_t release
, Message
*m
) {
94 Mutex::Locker
l(delay_lock
);
95 delay_queue
.push_back(make_pair(release
, m
));
101 Mutex::Locker
l(delay_lock
);
102 return flush_count
> 0 || active_flush
;
104 void wait_for_flush() {
105 Mutex::Locker
l(delay_lock
);
106 while (flush_count
> 0 || active_flush
)
107 delay_cond
.Wait(delay_lock
);
111 stop_delayed_delivery
= true;
115 void steal_for_pipe(Pipe
*new_owner
) {
116 Mutex::Locker
l(delay_lock
);
120 * We need to stop fast dispatching before we need to stop putting
121 * normal messages into the DispatchQueue.
123 void stop_fast_dispatching();
126 /**************************************
130 Pipe::Pipe(SimpleMessenger
*r
, int st
, PipeConnection
*con
)
131 : RefCountedObject(r
->cct
),
136 conn_id(r
->dispatch_queue
.get_id()),
141 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
143 connection_state(NULL
),
144 reader_running(false), reader_needs_join(false),
145 reader_dispatching(false), notify_on_dispatch_done(false),
146 writer_running(false),
147 in_q(&(r
->dispatch_queue
)),
148 send_keepalive(false),
149 send_keepalive_ack(false),
150 connect_seq(0), peer_global_seq(0),
151 out_seq(0), in_seq(0), in_seq_acked(0) {
152 ANNOTATE_BENIGN_RACE_SIZED(&sd
, sizeof(sd
), "Pipe socket");
153 ANNOTATE_BENIGN_RACE_SIZED(&state
, sizeof(state
), "Pipe state");
154 ANNOTATE_BENIGN_RACE_SIZED(&recv_len
, sizeof(recv_len
), "Pipe recv_len");
155 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs
, sizeof(recv_ofs
), "Pipe recv_ofs");
157 connection_state
= con
;
158 connection_state
->reset_pipe(this);
160 connection_state
= new PipeConnection(msgr
->cct
, msgr
);
161 connection_state
->pipe
= get();
164 if (randomize_out_seq()) {
165 lsubdout(msgr
->cct
,ms
,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
169 msgr
->timeout
= msgr
->cct
->_conf
->ms_tcp_read_timeout
* 1000; //convert to ms
170 if (msgr
->timeout
== 0)
173 recv_max_prefetch
= msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
;
174 recv_buf
= new char[recv_max_prefetch
];
179 assert(out_q
.empty());
180 assert(sent
.empty());
185 void Pipe::handle_ack(uint64_t seq
)
187 lsubdout(msgr
->cct
, ms
, 15) << "reader got ack seq " << seq
<< dendl
;
189 while (!sent
.empty() &&
190 sent
.front()->get_seq() <= seq
) {
191 Message
*m
= sent
.front();
193 lsubdout(msgr
->cct
, ms
, 10) << "reader got ack seq "
194 << seq
<< " >= " << m
->get_seq() << " on " << m
<< " " << *m
<< dendl
;
199 void Pipe::start_reader()
201 assert(pipe_lock
.is_locked());
202 assert(!reader_running
);
203 if (reader_needs_join
) {
204 reader_thread
.join();
205 reader_needs_join
= false;
207 reader_running
= true;
208 reader_thread
.create("ms_pipe_read", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
211 void Pipe::maybe_start_delay_thread()
214 auto pos
= msgr
->cct
->_conf
->get_val
<std::string
>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state
->peer_type
));
215 if (pos
!= string::npos
) {
216 lsubdout(msgr
->cct
, ms
, 1) << "setting up a delay queue on Pipe " << this << dendl
;
217 delay_thread
= new DelayedDelivery(this);
218 delay_thread
->create("ms_pipe_delay");
223 void Pipe::start_writer()
225 assert(pipe_lock
.is_locked());
226 assert(!writer_running
);
227 writer_running
= true;
228 writer_thread
.create("ms_pipe_write", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
231 void Pipe::join_reader()
237 reader_thread
.join();
239 reader_needs_join
= false;
242 void Pipe::DelayedDelivery::discard()
244 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::discard" << dendl
;
245 Mutex::Locker
l(delay_lock
);
246 while (!delay_queue
.empty()) {
247 Message
*m
= delay_queue
.front().second
;
248 pipe
->in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
250 delay_queue
.pop_front();
254 void Pipe::DelayedDelivery::flush()
256 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::flush" << dendl
;
257 Mutex::Locker
l(delay_lock
);
258 flush_count
= delay_queue
.size();
262 void *Pipe::DelayedDelivery::entry()
264 Mutex::Locker
locker(delay_lock
);
265 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry start" << dendl
;
267 while (!stop_delayed_delivery
) {
268 if (delay_queue
.empty()) {
269 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 30) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl
;
270 delay_cond
.Wait(delay_lock
);
273 utime_t release
= delay_queue
.front().first
;
274 Message
*m
= delay_queue
.front().second
;
275 string delay_msg_type
= pipe
->msgr
->cct
->_conf
->ms_inject_delay_msg_type
;
277 (release
> ceph_clock_now() &&
278 (delay_msg_type
.empty() || m
->get_type_name() == delay_msg_type
))) {
279 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond until " << release
<< dendl
;
280 delay_cond
.WaitUntil(delay_lock
, release
);
283 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry dequeuing message " << m
<< " for delivery, past " << release
<< dendl
;
284 delay_queue
.pop_front();
285 if (flush_count
> 0) {
289 if (pipe
->in_q
->can_fast_dispatch(m
)) {
290 if (!stop_fast_dispatching_flag
) {
291 delay_dispatching
= true;
293 pipe
->in_q
->fast_dispatch(m
);
295 delay_dispatching
= false;
296 if (stop_fast_dispatching_flag
) {
297 // we need to let the stopping thread proceed
304 pipe
->in_q
->enqueue(m
, m
->get_priority(), pipe
->conn_id
);
306 active_flush
= false;
308 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry stop" << dendl
;
312 void Pipe::DelayedDelivery::stop_fast_dispatching() {
313 Mutex::Locker
l(delay_lock
);
314 stop_fast_dispatching_flag
= true;
315 while (delay_dispatching
)
316 delay_cond
.Wait(delay_lock
);
322 ldout(msgr
->cct
,10) << "accept" << dendl
;
323 assert(pipe_lock
.is_locked());
324 assert(state
== STATE_ACCEPTING
);
330 entity_addr_t socket_addr
;
333 char banner
[strlen(CEPH_BANNER
)+1];
335 ceph_msg_connect connect
;
336 ceph_msg_connect_reply reply
;
339 bufferlist authorizer
, authorizer_reply
;
340 bool authorizer_valid
;
341 uint64_t feat_missing
;
342 bool replaced
= false;
343 // this variable denotes if the connection attempt from peer is a hard
344 // reset or not, it is true if there is an existing connection and the
345 // connection sequence from peer is equal to zero
346 bool is_reset_from_peer
= false;
347 CryptoKey session_key
;
348 int removed
; // single-use down below
350 // this should roughly mirror pseudocode at
351 // http://ceph.com/wiki/Messaging_protocol
353 uint64_t existing_seq
= -1;
355 // used for reading in the remote acked seq on connect
356 uint64_t newly_acked_seq
= 0;
358 bool need_challenge
= false;
359 bool had_challenge
= false;
360 std::unique_ptr
<AuthAuthorizerChallenge
> authorizer_challenge
;
364 set_socket_options();
367 r
= tcp_write(CEPH_BANNER
, strlen(CEPH_BANNER
));
369 ldout(msgr
->cct
,10) << "accept couldn't write banner" << dendl
;
374 ::encode(msgr
->my_inst
.addr
, addrs
, 0); // legacy
376 port
= msgr
->my_inst
.addr
.get_port();
378 // and peer's socket addr (they might not know their ip)
381 r
= ::getpeername(sd
, (sockaddr
*)&ss
, &len
);
383 ldout(msgr
->cct
,0) << "accept failed to getpeername " << cpp_strerror(errno
) << dendl
;
386 socket_addr
.set_sockaddr((sockaddr
*)&ss
);
387 ::encode(socket_addr
, addrs
, 0); // legacy
389 r
= tcp_write(addrs
.c_str(), addrs
.length());
391 ldout(msgr
->cct
,10) << "accept couldn't write my+peer addr" << dendl
;
395 ldout(msgr
->cct
,1) << "accept sd=" << sd
<< " " << socket_addr
<< dendl
;
398 if (tcp_read(banner
, strlen(CEPH_BANNER
)) < 0) {
399 ldout(msgr
->cct
,10) << "accept couldn't read banner" << dendl
;
402 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
403 banner
[strlen(CEPH_BANNER
)] = 0;
404 ldout(msgr
->cct
,1) << "accept peer sent bad banner '" << banner
<< "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
408 bufferptr
tp(sizeof(ceph_entity_addr
));
409 addrbl
.push_back(std::move(tp
));
411 if (tcp_read(addrbl
.c_str(), addrbl
.length()) < 0) {
412 ldout(msgr
->cct
,10) << "accept couldn't read peer_addr" << dendl
;
416 bufferlist::iterator ti
= addrbl
.begin();
417 ::decode(peer_addr
, ti
);
418 } catch (const buffer::error
& e
) {
419 ldout(msgr
->cct
,2) << __func__
<< " decode peer_addr failed: " << e
.what()
424 ldout(msgr
->cct
,10) << "accept peer addr is " << peer_addr
<< dendl
;
425 if (peer_addr
.is_blank_ip()) {
426 // peer apparently doesn't know what ip they have; figure it out for them.
427 int port
= peer_addr
.get_port();
428 peer_addr
.u
= socket_addr
.u
;
429 peer_addr
.set_port(port
);
430 ldout(msgr
->cct
,0) << "accept peer addr is really " << peer_addr
431 << " (socket is " << socket_addr
<< ")" << dendl
;
433 set_peer_addr(peer_addr
); // so that connection_state gets set up
436 if (tcp_read((char*)&connect
, sizeof(connect
)) < 0) {
437 ldout(msgr
->cct
,10) << "accept couldn't read connect" << dendl
;
442 if (connect
.authorizer_len
) {
443 bp
= buffer::create(connect
.authorizer_len
);
444 if (tcp_read(bp
.c_str(), connect
.authorizer_len
) < 0) {
445 ldout(msgr
->cct
,10) << "accept couldn't read connect authorizer" << dendl
;
448 authorizer
.push_back(std::move(bp
));
449 authorizer_reply
.clear();
452 ldout(msgr
->cct
,20) << "accept got peer connect_seq " << connect
.connect_seq
453 << " global_seq " << connect
.global_seq
456 msgr
->lock
.Lock(); // FIXME
458 if (msgr
->dispatch_queue
.stop
)
460 if (state
!= STATE_ACCEPTING
) {
464 // note peer's type, flags
465 set_peer_type(connect
.host_type
);
466 policy
= msgr
->get_policy(connect
.host_type
);
467 ldout(msgr
->cct
,10) << "accept of host_type " << connect
.host_type
468 << ", policy.lossy=" << policy
.lossy
469 << " policy.server=" << policy
.server
470 << " policy.standby=" << policy
.standby
471 << " policy.resetcheck=" << policy
.resetcheck
474 memset(&reply
, 0, sizeof(reply
));
475 reply
.protocol_version
= msgr
->get_proto_version(peer_type
, false);
479 ldout(msgr
->cct
,10) << "accept my proto " << reply
.protocol_version
480 << ", their proto " << connect
.protocol_version
<< dendl
;
481 if (connect
.protocol_version
!= reply
.protocol_version
) {
482 reply
.tag
= CEPH_MSGR_TAG_BADPROTOVER
;
486 // require signatures for cephx?
487 if (connect
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
488 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
489 peer_type
== CEPH_ENTITY_TYPE_MDS
||
490 peer_type
== CEPH_ENTITY_TYPE_MGR
) {
491 if (msgr
->cct
->_conf
->cephx_require_signatures
||
492 msgr
->cct
->_conf
->cephx_cluster_require_signatures
) {
493 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl
;
494 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
496 if (msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
497 msgr
->cct
->_conf
->cephx_cluster_require_version
>= 2) {
498 ldout(msgr
->cct
,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl
;
499 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
502 if (msgr
->cct
->_conf
->cephx_require_signatures
||
503 msgr
->cct
->_conf
->cephx_service_require_signatures
) {
504 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl
;
505 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
507 if (msgr
->cct
->_conf
->cephx_require_version
>= 2 ||
508 msgr
->cct
->_conf
->cephx_service_require_version
>= 2) {
509 ldout(msgr
->cct
,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl
;
510 policy
.features_required
|= CEPH_FEATUREMASK_CEPHX_V2
;
515 feat_missing
= policy
.features_required
& ~(uint64_t)connect
.features
;
517 ldout(msgr
->cct
,1) << "peer missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
518 reply
.tag
= CEPH_MSGR_TAG_FEATURES
;
522 // Check the authorizer. If not good, bail out.
526 need_challenge
= HAVE_FEATURE(connect
.features
, CEPHX_V2
);
527 had_challenge
= (bool)authorizer_challenge
;
528 authorizer_reply
.clear();
529 if (!msgr
->verify_authorizer(
530 connection_state
.get(), peer_type
, connect
.authorizer_protocol
, authorizer
,
531 authorizer_reply
, authorizer_valid
, session_key
,
532 need_challenge
? &authorizer_challenge
: nullptr) ||
535 if (state
!= STATE_ACCEPTING
)
536 goto shutting_down_msgr_unlocked
;
537 if (!had_challenge
&& need_challenge
&& authorizer_challenge
) {
538 ldout(msgr
->cct
,10) << "accept: challenging authorizer "
539 << authorizer_reply
.length()
540 << " bytes" << dendl
;
541 assert(authorizer_reply
.length());
542 reply
.tag
= CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
;
544 ldout(msgr
->cct
,0) << "accept: got bad authorizer" << dendl
;
545 reply
.tag
= CEPH_MSGR_TAG_BADAUTHORIZER
;
547 session_security
.reset();
551 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
553 ldout(msgr
->cct
,10) << "accept: setting up session_security." << dendl
;
555 retry_existing_lookup
:
558 if (msgr
->dispatch_queue
.stop
)
560 if (state
!= STATE_ACCEPTING
)
564 existing
= msgr
->_lookup_pipe(peer_addr
);
566 existing
->pipe_lock
.Lock(true); // skip lockdep check (we are locking a second Pipe here)
567 if (existing
->reader_dispatching
) {
568 /** we need to wait, or we can deadlock if downstream
569 * fast_dispatchers are (naughtily!) waiting on resources
570 * held by somebody trying to make use of the SimpleMessenger lock.
571 * So drop locks, wait, and retry. It just looks like a slow network
574 * We take a ref to existing here since it might get reaped before we
575 * wake up (see bug #15870). We can be confident that it lived until
576 * locked it since we held the msgr lock from _lookup_pipe through to
577 * locking existing->lock and checking reader_dispatching.
582 existing
->notify_on_dispatch_done
= true;
583 while (existing
->reader_dispatching
)
584 existing
->cond
.Wait(existing
->pipe_lock
);
585 existing
->pipe_lock
.Unlock();
588 goto retry_existing_lookup
;
591 if (connect
.global_seq
< existing
->peer_global_seq
) {
592 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
593 << " > " << connect
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
594 reply
.tag
= CEPH_MSGR_TAG_RETRY_GLOBAL
;
595 reply
.global_seq
= existing
->peer_global_seq
; // so we can send it below..
596 existing
->pipe_lock
.Unlock();
600 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
601 << " <= " << connect
.global_seq
<< ", looks ok" << dendl
;
604 if (existing
->policy
.lossy
) {
605 ldout(msgr
->cct
,0) << "accept replacing existing (lossy) channel (new one lossy="
606 << policy
.lossy
<< ")" << dendl
;
607 existing
->was_session_reset();
611 ldout(msgr
->cct
,0) << "accept connect_seq " << connect
.connect_seq
612 << " vs existing " << existing
->connect_seq
613 << " state " << existing
->get_state_name() << dendl
;
615 if (connect
.connect_seq
== 0 && existing
->connect_seq
> 0) {
616 ldout(msgr
->cct
,0) << "accept peer reset, then tried to connect to us, replacing" << dendl
;
617 // this is a hard reset from peer
618 is_reset_from_peer
= true;
619 if (policy
.resetcheck
)
620 existing
->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
624 if (connect
.connect_seq
< existing
->connect_seq
) {
625 // old attempt, or we sent READY but they didn't get it.
626 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".cseq " << existing
->connect_seq
627 << " > " << connect
.connect_seq
<< ", RETRY_SESSION" << dendl
;
631 if (connect
.connect_seq
== existing
->connect_seq
) {
632 // if the existing connection successfully opened, and/or
633 // subsequently went to standby, then the peer should bump
634 // their connect_seq and retry: this is not a connection race
635 // we need to resolve here.
636 if (existing
->state
== STATE_OPEN
||
637 existing
->state
== STATE_STANDBY
) {
638 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
639 << ".cseq " << existing
->connect_seq
640 << " == " << connect
.connect_seq
641 << ", OPEN|STANDBY, RETRY_SESSION" << dendl
;
646 if (peer_addr
< msgr
->my_inst
.addr
||
647 existing
->policy
.server
) {
649 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
650 << " == " << connect
.connect_seq
<< ", or we are server, replacing my attempt" << dendl
;
651 if (!(existing
->state
== STATE_CONNECTING
||
652 existing
->state
== STATE_WAIT
))
653 lderr(msgr
->cct
) << "accept race bad state, would replace, existing="
654 << existing
->get_state_name()
655 << " " << existing
<< ".cseq=" << existing
->connect_seq
656 << " == " << connect
.connect_seq
658 assert(existing
->state
== STATE_CONNECTING
||
659 existing
->state
== STATE_WAIT
);
662 // our existing outgoing wins
663 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
664 << " == " << connect
.connect_seq
<< ", sending WAIT" << dendl
;
665 assert(peer_addr
> msgr
->my_inst
.addr
);
666 if (!(existing
->state
== STATE_CONNECTING
))
667 lderr(msgr
->cct
) << "accept race bad state, would send wait, existing="
668 << existing
->get_state_name()
669 << " " << existing
<< ".cseq=" << existing
->connect_seq
670 << " == " << connect
.connect_seq
672 assert(existing
->state
== STATE_CONNECTING
);
673 // make sure our outgoing connection will follow through
674 existing
->_send_keepalive();
675 reply
.tag
= CEPH_MSGR_TAG_WAIT
;
676 existing
->pipe_lock
.Unlock();
682 assert(connect
.connect_seq
> existing
->connect_seq
);
683 assert(connect
.global_seq
>= existing
->peer_global_seq
);
684 if (policy
.resetcheck
&& // RESETSESSION only used by servers; peers do not reset each other
685 existing
->connect_seq
== 0) {
686 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
687 << ", " << existing
<< ".cseq = " << existing
->connect_seq
688 << "), sending RESETSESSION" << dendl
;
689 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
691 existing
->pipe_lock
.Unlock();
696 ldout(msgr
->cct
,10) << "accept peer sent cseq " << connect
.connect_seq
697 << " > " << existing
->connect_seq
<< dendl
;
700 else if (connect
.connect_seq
> 0) {
701 // we reset, and they are opening a new session
702 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
<< "), sending RESETSESSION" << dendl
;
704 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
708 ldout(msgr
->cct
,10) << "accept new session" << dendl
;
715 assert(existing
->pipe_lock
.is_locked());
716 assert(pipe_lock
.is_locked());
717 reply
.tag
= CEPH_MSGR_TAG_RETRY_SESSION
;
718 reply
.connect_seq
= existing
->connect_seq
+ 1;
719 existing
->pipe_lock
.Unlock();
724 assert(pipe_lock
.is_locked());
725 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
726 reply
.authorizer_len
= authorizer_reply
.length();
728 r
= tcp_write((char*)&reply
, sizeof(reply
));
731 if (reply
.authorizer_len
) {
732 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
739 assert(existing
->pipe_lock
.is_locked());
740 assert(pipe_lock
.is_locked());
741 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
742 if ((connect
.features
& CEPH_FEATURE_RECONNECT_SEQ
) && !is_reset_from_peer
) {
743 reply_tag
= CEPH_MSGR_TAG_SEQ
;
744 existing_seq
= existing
->in_seq
;
746 ldout(msgr
->cct
,10) << "accept replacing " << existing
<< dendl
;
748 existing
->unregister_pipe();
751 if (existing
->policy
.lossy
) {
752 // disconnect from the Connection
753 assert(existing
->connection_state
);
754 if (existing
->connection_state
->clear_pipe(existing
))
755 msgr
->dispatch_queue
.queue_reset(existing
->connection_state
.get());
757 // queue a reset on the new connection, which we're dumping for the old
758 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
760 // drop my Connection, and take a ref to the existing one. do not
761 // clear existing->connection_state, since read_message and
762 // write_message both dereference it without pipe_lock.
763 connection_state
= existing
->connection_state
;
765 // make existing Connection reference us
766 connection_state
->reset_pipe(this);
768 if (existing
->delay_thread
) {
769 existing
->delay_thread
->steal_for_pipe(this);
770 delay_thread
= existing
->delay_thread
;
771 existing
->delay_thread
= NULL
;
772 delay_thread
->flush();
775 // steal incoming queue
776 uint64_t replaced_conn_id
= conn_id
;
777 conn_id
= existing
->conn_id
;
778 existing
->conn_id
= replaced_conn_id
;
780 // reset the in_seq if this is a hard reset from peer,
781 // otherwise we respect our original connection's value
782 in_seq
= is_reset_from_peer
? 0 : existing
->in_seq
;
783 in_seq_acked
= in_seq
;
785 // steal outgoing queue and out_seq
786 existing
->requeue_sent();
787 out_seq
= existing
->out_seq
;
788 ldout(msgr
->cct
,10) << "accept re-queuing on out_seq " << out_seq
<< " in_seq " << in_seq
<< dendl
;
789 for (map
<int, list
<Message
*> >::iterator p
= existing
->out_q
.begin();
790 p
!= existing
->out_q
.end();
792 out_q
[p
->first
].splice(out_q
[p
->first
].begin(), p
->second
);
794 existing
->stop_and_wait();
795 existing
->pipe_lock
.Unlock();
799 assert(pipe_lock
.is_locked());
800 connect_seq
= connect
.connect_seq
+ 1;
801 peer_global_seq
= connect
.global_seq
;
802 assert(state
== STATE_ACCEPTING
);
804 ldout(msgr
->cct
,10) << "accept success, connect_seq = " << connect_seq
<< ", sending READY" << dendl
;
807 reply
.tag
= (reply_tag
? reply_tag
: CEPH_MSGR_TAG_READY
);
808 reply
.features
= policy
.features_supported
;
809 reply
.global_seq
= msgr
->get_global_seq();
810 reply
.connect_seq
= connect_seq
;
812 reply
.authorizer_len
= authorizer_reply
.length();
814 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
816 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
817 ldout(msgr
->cct
,10) << "accept features " << connection_state
->get_features() << dendl
;
819 session_security
.reset(
820 get_auth_session_handler(msgr
->cct
,
821 connect
.authorizer_protocol
,
823 connection_state
->get_features()));
826 msgr
->dispatch_queue
.queue_accept(connection_state
.get());
827 msgr
->ms_deliver_handle_fast_accept(connection_state
.get());
830 if (msgr
->dispatch_queue
.stop
)
832 removed
= msgr
->accepting_pipes
.erase(this);
833 assert(removed
== 1);
838 r
= tcp_write((char*)&reply
, sizeof(reply
));
840 goto fail_registered
;
843 if (reply
.authorizer_len
) {
844 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
846 goto fail_registered
;
850 if (reply_tag
== CEPH_MSGR_TAG_SEQ
) {
851 if (tcp_write((char*)&existing_seq
, sizeof(existing_seq
)) < 0) {
852 ldout(msgr
->cct
,2) << "accept write error on in_seq" << dendl
;
853 goto fail_registered
;
855 if (tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
)) < 0) {
856 ldout(msgr
->cct
,2) << "accept read error on newly_acked_seq" << dendl
;
857 goto fail_registered
;
862 discard_requeued_up_to(newly_acked_seq
);
863 if (state
!= STATE_CLOSED
) {
864 ldout(msgr
->cct
,10) << "accept starting writer, state " << get_state_name() << dendl
;
867 ldout(msgr
->cct
,20) << "accept done" << dendl
;
869 maybe_start_delay_thread();
871 return 0; // success.
874 ldout(msgr
->cct
, 10) << "accept fault after register" << dendl
;
876 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
877 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
879 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
885 if (state
!= STATE_CLOSED
) {
886 bool queued
= is_queued();
887 ldout(msgr
->cct
, 10) << " queued = " << (int)queued
<< dendl
;
889 state
= policy
.server
? STATE_STANDBY
: STATE_CONNECTING
;
890 } else if (replaced
) {
891 state
= STATE_STANDBY
;
893 state
= STATE_CLOSED
;
897 if (queued
|| replaced
)
904 shutting_down_msgr_unlocked
:
905 assert(pipe_lock
.is_locked());
907 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
908 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
910 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
914 state
= STATE_CLOSED
;
920 void Pipe::set_socket_options()
922 // disable Nagle algorithm?
923 if (msgr
->cct
->_conf
->ms_tcp_nodelay
) {
925 int r
= ::setsockopt(sd
, IPPROTO_TCP
, TCP_NODELAY
, (char*)&flag
, sizeof(flag
));
928 ldout(msgr
->cct
,0) << "couldn't set TCP_NODELAY: "
929 << cpp_strerror(r
) << dendl
;
932 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
933 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
934 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_RCVBUF
, (void*)&size
, sizeof(size
));
937 ldout(msgr
->cct
,0) << "couldn't set SO_RCVBUF to " << size
938 << ": " << cpp_strerror(r
) << dendl
;
943 #ifdef CEPH_USE_SO_NOSIGPIPE
945 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&val
, sizeof(val
));
948 ldout(msgr
->cct
,0) << "couldn't set SO_NOSIGPIPE: "
949 << cpp_strerror(r
) << dendl
;
954 int prio
= msgr
->get_socket_priority();
957 #ifdef IPTOS_CLASS_CS6
958 int iptos
= IPTOS_CLASS_CS6
;
960 if (!peer_addr
.is_blank_ip()) {
961 addr_family
= peer_addr
.get_family();
963 addr_family
= msgr
->get_myaddr().get_family();
965 switch (addr_family
) {
967 r
= ::setsockopt(sd
, IPPROTO_IP
, IP_TOS
, &iptos
, sizeof(iptos
));
970 r
= ::setsockopt(sd
, IPPROTO_IPV6
, IPV6_TCLASS
, &iptos
, sizeof(iptos
));
973 lderr(msgr
->cct
) << "couldn't set ToS of unknown family ("
974 << addr_family
<< ")"
975 << " to " << iptos
<< dendl
;
980 ldout(msgr
->cct
,0) << "couldn't set TOS to " << iptos
981 << ": " << cpp_strerror(r
) << dendl
;
984 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
985 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
986 // We need to call setsockopt(SO_PRIORITY) after it.
987 r
= ::setsockopt(sd
, SOL_SOCKET
, SO_PRIORITY
, &prio
, sizeof(prio
));
990 ldout(msgr
->cct
,0) << "couldn't set SO_PRIORITY to " << prio
991 << ": " << cpp_strerror(r
) << dendl
;
999 bool got_bad_auth
= false;
1001 ldout(msgr
->cct
,10) << "connect " << connect_seq
<< dendl
;
1002 assert(pipe_lock
.is_locked());
1004 __u32 cseq
= connect_seq
;
1005 __u32 gseq
= msgr
->get_global_seq();
1007 // stop reader thread
1015 struct iovec msgvec
[2];
1017 char banner
[strlen(CEPH_BANNER
) + 1]; // extra byte makes coverity happy
1018 entity_addr_t paddr
;
1019 entity_addr_t peer_addr_for_me
, socket_addr
;
1020 AuthAuthorizer
*authorizer
= NULL
;
1021 bufferlist addrbl
, myaddrbl
;
1022 const md_config_t
*conf
= msgr
->cct
->_conf
;
1024 // close old socket. this is safe because we stopped the reader thread above.
1029 sd
= socket_cloexec(peer_addr
.get_family(), SOCK_STREAM
, 0);
1032 lderr(msgr
->cct
) << "connect couldn't create socket " << cpp_strerror(e
) << dendl
;
1039 set_socket_options();
1042 entity_addr_t addr2bind
= msgr
->get_myaddr();
1043 if (msgr
->cct
->_conf
->ms_bind_before_connect
&& (!addr2bind
.is_blank_ip())) {
1044 addr2bind
.set_port(0);
1045 int r
= ::bind(sd
, addr2bind
.get_sockaddr(), addr2bind
.get_sockaddr_len());
1047 ldout(msgr
->cct
,2) << "client bind error " << ", " << cpp_strerror(errno
) << dendl
;
1054 ldout(msgr
->cct
,10) << "connecting to " << peer_addr
<< dendl
;
1055 rc
= ::connect(sd
, peer_addr
.get_sockaddr(), peer_addr
.get_sockaddr_len());
1057 int stored_errno
= errno
;
1058 ldout(msgr
->cct
,2) << "connect error " << peer_addr
1059 << ", " << cpp_strerror(stored_errno
) << dendl
;
1060 if (stored_errno
== ECONNREFUSED
) {
1061 ldout(msgr
->cct
, 2) << "connection refused!" << dendl
;
1062 msgr
->dispatch_queue
.queue_refused(connection_state
.get());
1068 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1069 rc
= tcp_read((char*)&banner
, strlen(CEPH_BANNER
));
1071 ldout(msgr
->cct
,2) << "connect couldn't read banner, " << cpp_strerror(rc
) << dendl
;
1074 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1075 ldout(msgr
->cct
,0) << "connect protocol error (bad banner) on peer " << peer_addr
<< dendl
;
1079 memset(&msg
, 0, sizeof(msg
));
1080 msgvec
[0].iov_base
= banner
;
1081 msgvec
[0].iov_len
= strlen(CEPH_BANNER
);
1082 msg
.msg_iov
= msgvec
;
1084 msglen
= msgvec
[0].iov_len
;
1085 rc
= do_sendmsg(&msg
, msglen
);
1087 ldout(msgr
->cct
,2) << "connect couldn't write my banner, " << cpp_strerror(rc
) << dendl
;
1093 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1094 bufferptr
p(sizeof(ceph_entity_addr
) * 2);
1096 int wirelen
= sizeof(__u32
) * 2 + sizeof(ceph_sockaddr_storage
);
1097 bufferptr
p(wirelen
* 2);
1099 addrbl
.push_back(std::move(p
));
1101 rc
= tcp_read(addrbl
.c_str(), addrbl
.length());
1103 ldout(msgr
->cct
,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc
) << dendl
;
1107 bufferlist::iterator p
= addrbl
.begin();
1109 ::decode(peer_addr_for_me
, p
);
1111 catch (buffer::error
& e
) {
1112 ldout(msgr
->cct
,2) << "connect couldn't decode peer addrs: " << e
.what()
1116 port
= peer_addr_for_me
.get_port();
1118 ldout(msgr
->cct
,20) << "connect read peer addr " << paddr
<< " on socket " << sd
<< dendl
;
1119 if (peer_addr
!= paddr
) {
1120 if (paddr
.is_blank_ip() &&
1121 peer_addr
.get_port() == paddr
.get_port() &&
1122 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1123 ldout(msgr
->cct
,0) << "connect claims to be "
1124 << paddr
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl
;
1126 ldout(msgr
->cct
,10) << "connect claims to be "
1127 << paddr
<< " not " << peer_addr
<< dendl
;
1132 ldout(msgr
->cct
,20) << "connect peer addr for me is " << peer_addr_for_me
<< dendl
;
1134 msgr
->learned_addr(peer_addr_for_me
);
1136 ::encode(msgr
->my_inst
.addr
, myaddrbl
, 0); // legacy
1138 memset(&msg
, 0, sizeof(msg
));
1139 msgvec
[0].iov_base
= myaddrbl
.c_str();
1140 msgvec
[0].iov_len
= myaddrbl
.length();
1141 msg
.msg_iov
= msgvec
;
1143 msglen
= msgvec
[0].iov_len
;
1144 rc
= do_sendmsg(&msg
, msglen
);
1146 ldout(msgr
->cct
,2) << "connect couldn't write my addr, " << cpp_strerror(rc
) << dendl
;
1149 ldout(msgr
->cct
,10) << "connect sent my addr " << msgr
->my_inst
.addr
<< dendl
;
1154 authorizer
= msgr
->get_authorizer(peer_type
, false);
1156 bufferlist authorizer_reply
;
1158 ceph_msg_connect connect
;
1159 connect
.features
= policy
.features_supported
;
1160 connect
.host_type
= msgr
->get_myinst().name
.type();
1161 connect
.global_seq
= gseq
;
1162 connect
.connect_seq
= cseq
;
1163 connect
.protocol_version
= msgr
->get_proto_version(peer_type
, true);
1164 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1165 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1167 ldout(msgr
->cct
,10) << "connect.authorizer_len=" << connect
.authorizer_len
1168 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
1171 connect
.flags
|= CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1172 memset(&msg
, 0, sizeof(msg
));
1173 msgvec
[0].iov_base
= (char*)&connect
;
1174 msgvec
[0].iov_len
= sizeof(connect
);
1175 msg
.msg_iov
= msgvec
;
1177 msglen
= msgvec
[0].iov_len
;
1179 msgvec
[1].iov_base
= authorizer
->bl
.c_str();
1180 msgvec
[1].iov_len
= authorizer
->bl
.length();
1182 msglen
+= msgvec
[1].iov_len
;
1185 ldout(msgr
->cct
,10) << "connect sending gseq=" << gseq
<< " cseq=" << cseq
1186 << " proto=" << connect
.protocol_version
<< dendl
;
1187 rc
= do_sendmsg(&msg
, msglen
);
1189 ldout(msgr
->cct
,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc
) << dendl
;
1193 ldout(msgr
->cct
,20) << "connect wrote (self +) cseq, waiting for reply" << dendl
;
1194 ceph_msg_connect_reply reply
;
1195 rc
= tcp_read((char*)&reply
, sizeof(reply
));
1197 ldout(msgr
->cct
,2) << "connect read reply " << cpp_strerror(rc
) << dendl
;
1201 ldout(msgr
->cct
,20) << "connect got reply tag " << (int)reply
.tag
1202 << " connect_seq " << reply
.connect_seq
1203 << " global_seq " << reply
.global_seq
1204 << " proto " << reply
.protocol_version
1205 << " flags " << (int)reply
.flags
1206 << " features " << reply
.features
1209 authorizer_reply
.clear();
1211 if (reply
.authorizer_len
) {
1212 ldout(msgr
->cct
,10) << "reply.authorizer_len=" << reply
.authorizer_len
<< dendl
;
1213 bufferptr bp
= buffer::create(reply
.authorizer_len
);
1214 rc
= tcp_read(bp
.c_str(), reply
.authorizer_len
);
1216 ldout(msgr
->cct
,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc
) << dendl
;
1219 authorizer_reply
.push_back(bp
);
1222 if (reply
.tag
== CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER
) {
1223 authorizer
->add_challenge(msgr
->cct
, authorizer_reply
);
1224 ldout(msgr
->cct
,10) << " got authorizer challenge, " << authorizer_reply
.length()
1225 << " bytes" << dendl
;
1230 bufferlist::iterator iter
= authorizer_reply
.begin();
1231 if (!authorizer
->verify_reply(iter
)) {
1232 ldout(msgr
->cct
,0) << "failed verifying authorize reply" << dendl
;
1237 if (conf
->ms_inject_internal_delays
) {
1238 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1240 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1245 if (state
!= STATE_CONNECTING
) {
1246 ldout(msgr
->cct
,0) << "connect got RESETSESSION but no longer connecting" << dendl
;
1250 if (reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1251 ldout(msgr
->cct
,0) << "connect protocol feature mismatch, my " << std::hex
1252 << connect
.features
<< " < peer " << reply
.features
1253 << " missing " << (reply
.features
& ~policy
.features_supported
)
1254 << std::dec
<< dendl
;
1258 if (reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1259 ldout(msgr
->cct
,0) << "connect protocol version mismatch, my " << connect
.protocol_version
1260 << " != " << reply
.protocol_version
<< dendl
;
1264 if (reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1265 ldout(msgr
->cct
,0) << "connect got BADAUTHORIZER" << dendl
;
1268 got_bad_auth
= true;
1271 authorizer
= msgr
->get_authorizer(peer_type
, true); // try harder
1274 if (reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1275 ldout(msgr
->cct
,0) << "connect got RESETSESSION" << dendl
;
1276 was_session_reset();
1281 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1282 gseq
= msgr
->get_global_seq(reply
.global_seq
);
1283 ldout(msgr
->cct
,10) << "connect got RETRY_GLOBAL " << reply
.global_seq
1284 << " chose new " << gseq
<< dendl
;
1288 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1289 assert(reply
.connect_seq
> connect_seq
);
1290 ldout(msgr
->cct
,10) << "connect got RETRY_SESSION " << connect_seq
1291 << " -> " << reply
.connect_seq
<< dendl
;
1292 cseq
= connect_seq
= reply
.connect_seq
;
1297 if (reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1298 ldout(msgr
->cct
,3) << "connect got WAIT (connection race)" << dendl
;
1303 if (reply
.tag
== CEPH_MSGR_TAG_READY
||
1304 reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1305 uint64_t feat_missing
= policy
.features_required
& ~(uint64_t)reply
.features
;
1307 ldout(msgr
->cct
,1) << "missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
1311 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1312 ldout(msgr
->cct
,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl
;
1313 uint64_t newly_acked_seq
= 0;
1314 rc
= tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
));
1316 ldout(msgr
->cct
,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc
) << dendl
;
1319 ldout(msgr
->cct
,2) << " got newly_acked_seq " << newly_acked_seq
1320 << " vs out_seq " << out_seq
<< dendl
;
1321 while (newly_acked_seq
> out_seq
) {
1322 Message
*m
= _get_next_outgoing();
1324 ldout(msgr
->cct
,2) << " discarding previously sent " << m
->get_seq()
1325 << " " << *m
<< dendl
;
1326 assert(m
->get_seq() <= newly_acked_seq
);
1330 if (tcp_write((char*)&in_seq
, sizeof(in_seq
)) < 0) {
1331 ldout(msgr
->cct
,2) << "connect write error on in_seq" << dendl
;
1337 peer_global_seq
= reply
.global_seq
;
1338 policy
.lossy
= reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1340 connect_seq
= cseq
+ 1;
1341 assert(connect_seq
== reply
.connect_seq
);
1342 backoff
= utime_t();
1343 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
1344 ldout(msgr
->cct
,10) << "connect success " << connect_seq
<< ", lossy = " << policy
.lossy
1345 << ", features " << connection_state
->get_features() << dendl
;
1348 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1351 if (authorizer
!= NULL
) {
1352 session_security
.reset(
1353 get_auth_session_handler(msgr
->cct
,
1354 authorizer
->protocol
,
1355 authorizer
->session_key
,
1356 connection_state
->get_features()));
1358 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1359 session_security
.reset();
1362 msgr
->dispatch_queue
.queue_connect(connection_state
.get());
1363 msgr
->ms_deliver_handle_fast_connect(connection_state
.get());
1365 if (!reader_running
) {
1366 ldout(msgr
->cct
,20) << "connect starting reader" << dendl
;
1369 maybe_start_delay_thread();
1375 ldout(msgr
->cct
,0) << "connect got bad tag " << (int)tag
<< dendl
;
1380 if (conf
->ms_inject_internal_delays
) {
1381 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1383 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1389 if (state
== STATE_CONNECTING
)
1392 ldout(msgr
->cct
,3) << "connect fault, but state = " << get_state_name()
1393 << " != connecting, stopping" << dendl
;
1400 void Pipe::register_pipe()
1402 ldout(msgr
->cct
,10) << "register_pipe" << dendl
;
1403 assert(msgr
->lock
.is_locked());
1404 Pipe
*existing
= msgr
->_lookup_pipe(peer_addr
);
1405 assert(existing
== NULL
);
1406 msgr
->rank_pipe
[peer_addr
] = this;
1409 void Pipe::unregister_pipe()
1411 assert(msgr
->lock
.is_locked());
1412 ceph::unordered_map
<entity_addr_t
,Pipe
*>::iterator p
= msgr
->rank_pipe
.find(peer_addr
);
1413 if (p
!= msgr
->rank_pipe
.end() && p
->second
== this) {
1414 ldout(msgr
->cct
,10) << "unregister_pipe" << dendl
;
1415 msgr
->rank_pipe
.erase(p
);
1417 ldout(msgr
->cct
,10) << "unregister_pipe - not registered" << dendl
;
1418 msgr
->accepting_pipes
.erase(this); // somewhat overkill, but safe.
1424 ldout(msgr
->cct
, 20) << "join" << dendl
;
1425 if (writer_thread
.is_started())
1426 writer_thread
.join();
1427 if (reader_thread
.is_started())
1428 reader_thread
.join();
1430 ldout(msgr
->cct
, 20) << "joining delay_thread" << dendl
;
1431 delay_thread
->stop();
1432 delay_thread
->join();
1436 void Pipe::requeue_sent()
1441 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1442 while (!sent
.empty()) {
1443 Message
*m
= sent
.back();
1445 ldout(msgr
->cct
,10) << "requeue_sent " << *m
<< " for resend seq " << out_seq
1446 << " (" << m
->get_seq() << ")" << dendl
;
1452 void Pipe::discard_requeued_up_to(uint64_t seq
)
1454 ldout(msgr
->cct
, 10) << "discard_requeued_up_to " << seq
<< dendl
;
1455 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0)
1457 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1458 while (!rq
.empty()) {
1459 Message
*m
= rq
.front();
1460 if (m
->get_seq() == 0 || m
->get_seq() > seq
)
1462 ldout(msgr
->cct
,10) << "discard_requeued_up_to " << *m
<< " for resend seq " << out_seq
1463 << " <= " << seq
<< ", discarding" << dendl
;
1469 out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1473 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1474 * Must hold pipe_lock prior to calling.
1476 void Pipe::discard_out_queue()
1478 ldout(msgr
->cct
,10) << "discard_queue" << dendl
;
1480 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1481 ldout(msgr
->cct
,20) << " discard " << *p
<< dendl
;
1485 for (map
<int,list
<Message
*> >::iterator p
= out_q
.begin(); p
!= out_q
.end(); ++p
)
1486 for (list
<Message
*>::iterator r
= p
->second
.begin(); r
!= p
->second
.end(); ++r
) {
1487 ldout(msgr
->cct
,20) << " discard " << *r
<< dendl
;
1493 void Pipe::fault(bool onread
)
1495 const md_config_t
*conf
= msgr
->cct
->_conf
;
1496 assert(pipe_lock
.is_locked());
1499 if (onread
&& state
== STATE_CONNECTING
) {
1500 ldout(msgr
->cct
,10) << "fault already connecting, reader shutting down" << dendl
;
1504 ldout(msgr
->cct
,2) << "fault " << cpp_strerror(errno
) << dendl
;
1506 if (state
== STATE_CLOSED
||
1507 state
== STATE_CLOSING
) {
1508 ldout(msgr
->cct
,10) << "fault already closed|closing" << dendl
;
1509 if (connection_state
->clear_pipe(this))
1510 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1517 if (policy
.lossy
&& state
!= STATE_CONNECTING
) {
1518 ldout(msgr
->cct
,10) << "fault on lossy channel, failing" << dendl
;
1520 // disconnect from Connection, and mark it failed. future messages
1522 assert(connection_state
);
1524 bool cleared
= connection_state
->clear_pipe(this);
1526 // crib locks, blech. note that Pipe is now STATE_CLOSED and the
1527 // rank_pipe entry is ignored by others.
1530 if (conf
->ms_inject_internal_delays
) {
1531 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1533 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1540 msgr
->lock
.Unlock();
1543 delay_thread
->discard();
1544 in_q
->discard_queue(conn_id
);
1545 discard_out_queue();
1547 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1551 // queue delayed items immediately
1553 delay_thread
->flush();
1555 // requeue sent items
1558 if (policy
.standby
&& !is_queued()) {
1559 ldout(msgr
->cct
,0) << "fault with nothing to send, going to standby" << dendl
;
1560 state
= STATE_STANDBY
;
1564 if (state
!= STATE_CONNECTING
) {
1565 if (policy
.server
) {
1566 ldout(msgr
->cct
,0) << "fault, server, going to standby" << dendl
;
1567 state
= STATE_STANDBY
;
1569 ldout(msgr
->cct
,0) << "fault, initiating reconnect" << dendl
;
1571 state
= STATE_CONNECTING
;
1573 backoff
= utime_t();
1574 } else if (backoff
== utime_t()) {
1575 ldout(msgr
->cct
,0) << "fault" << dendl
;
1576 backoff
.set_from_double(conf
->ms_initial_backoff
);
1578 ldout(msgr
->cct
,10) << "fault waiting " << backoff
<< dendl
;
1579 cond
.WaitInterval(pipe_lock
, backoff
);
1581 if (backoff
> conf
->ms_max_backoff
)
1582 backoff
.set_from_double(conf
->ms_max_backoff
);
1583 ldout(msgr
->cct
,10) << "fault done waiting or woke up" << dendl
;
1587 int Pipe::randomize_out_seq()
1589 if (connection_state
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1590 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1591 // here. We'll check it on the call. PLR
1592 int seq_error
= get_random_bytes((char *)&out_seq
, sizeof(out_seq
));
1593 out_seq
&= SEQ_MASK
;
1594 lsubdout(msgr
->cct
, ms
, 10) << "randomize_out_seq " << out_seq
<< dendl
;
1597 // previously, seq #'s always started at 0.
1603 void Pipe::was_session_reset()
1605 assert(pipe_lock
.is_locked());
1607 ldout(msgr
->cct
,10) << "was_session_reset" << dendl
;
1608 in_q
->discard_queue(conn_id
);
1610 delay_thread
->discard();
1611 discard_out_queue();
1613 msgr
->dispatch_queue
.queue_remote_reset(connection_state
.get());
1615 if (randomize_out_seq()) {
1616 lsubdout(msgr
->cct
,ms
,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
1625 ldout(msgr
->cct
,10) << "stop" << dendl
;
1626 assert(pipe_lock
.is_locked());
1627 state
= STATE_CLOSED
;
1628 state_closed
= true;
1633 void Pipe::stop_and_wait()
1635 assert(pipe_lock
.is_locked_by_me());
1636 if (state
!= STATE_CLOSED
)
1639 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
1640 ldout(msgr
->cct
, 10) << __func__
<< " sleep for "
1641 << msgr
->cct
->_conf
->ms_inject_internal_delays
1644 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1650 delay_thread
->stop_fast_dispatching();
1653 while (reader_running
&&
1655 cond
.Wait(pipe_lock
);
1658 /* read msgs from socket.
1665 if (state
== STATE_ACCEPTING
) {
1667 assert(pipe_lock
.is_locked());
1671 while (state
!= STATE_CLOSED
&&
1672 state
!= STATE_CONNECTING
) {
1673 assert(pipe_lock
.is_locked());
1675 // sleep if (re)connecting
1676 if (state
== STATE_STANDBY
) {
1677 ldout(msgr
->cct
,20) << "reader sleeping during reconnect|standby" << dendl
;
1678 cond
.Wait(pipe_lock
);
1682 // get a reference to the AuthSessionHandler while we have the pipe_lock
1683 ceph::shared_ptr
<AuthSessionHandler
> auth_handler
= session_security
;
1688 ldout(msgr
->cct
,20) << "reader reading tag..." << dendl
;
1689 if (tcp_read((char*)&tag
, 1) < 0) {
1691 ldout(msgr
->cct
,2) << "reader couldn't read tag, " << cpp_strerror(errno
) << dendl
;
1696 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
1697 ldout(msgr
->cct
,2) << "reader got KEEPALIVE" << dendl
;
1699 connection_state
->set_last_keepalive(ceph_clock_now());
1702 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
1703 ldout(msgr
->cct
,30) << "reader got KEEPALIVE2 tag ..." << dendl
;
1705 int rc
= tcp_read((char*)&t
, sizeof(t
));
1708 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp "
1709 << cpp_strerror(errno
) << dendl
;
1712 send_keepalive_ack
= true;
1713 keepalive_ack_stamp
= utime_t(t
);
1714 ldout(msgr
->cct
,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1716 connection_state
->set_last_keepalive(ceph_clock_now());
1721 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
1722 ldout(msgr
->cct
,2) << "reader got KEEPALIVE_ACK" << dendl
;
1723 struct ceph_timespec t
;
1724 int rc
= tcp_read((char*)&t
, sizeof(t
));
1727 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno
) << dendl
;
1730 connection_state
->set_last_keepalive_ack(utime_t(t
));
1736 if (tag
== CEPH_MSGR_TAG_ACK
) {
1737 ldout(msgr
->cct
,20) << "reader got ACK" << dendl
;
1739 int rc
= tcp_read((char*)&seq
, sizeof(seq
));
1742 ldout(msgr
->cct
,2) << "reader couldn't read ack seq, " << cpp_strerror(errno
) << dendl
;
1744 } else if (state
!= STATE_CLOSED
) {
1750 else if (tag
== CEPH_MSGR_TAG_MSG
) {
1751 ldout(msgr
->cct
,20) << "reader got MSG" << dendl
;
1753 int r
= read_message(&m
, auth_handler
.get());
1763 m
->trace
.event("pipe read message");
1765 if (state
== STATE_CLOSED
||
1766 state
== STATE_CONNECTING
) {
1767 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1772 // check received seq#. if it is old, drop the message.
1773 // note that incoming messages may skip ahead. this is convenient for the client
1774 // side queueing because messages can't be renumbered, but the (kernel) client will
1775 // occasionally pull a message out of the sent queue to send elsewhere. in that case
1776 // it doesn't matter if we "got" it or not.
1777 if (m
->get_seq() <= in_seq
) {
1778 ldout(msgr
->cct
,0) << "reader got old message "
1779 << m
->get_seq() << " <= " << in_seq
<< " " << m
<< " " << *m
1780 << ", discarding" << dendl
;
1781 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1783 if (connection_state
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1784 msgr
->cct
->_conf
->ms_die_on_old_message
)
1785 assert(0 == "old msgs despite reconnect_seq feature");
1788 if (m
->get_seq() > in_seq
+ 1) {
1789 ldout(msgr
->cct
,0) << "reader missed message? skipped from seq "
1790 << in_seq
<< " to " << m
->get_seq() << dendl
;
1791 if (msgr
->cct
->_conf
->ms_die_on_skipped_message
)
1792 assert(0 == "skipped incoming seq");
1795 m
->set_connection(connection_state
.get());
1797 // note last received message.
1798 in_seq
= m
->get_seq();
1800 cond
.Signal(); // wake up writer, to ack this
1802 ldout(msgr
->cct
,10) << "reader got message "
1803 << m
->get_seq() << " " << m
<< " " << *m
1805 in_q
->fast_preprocess(m
);
1809 if (rand() % 10000 < msgr
->cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1810 release
= m
->get_recv_stamp();
1811 release
+= msgr
->cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1812 lsubdout(msgr
->cct
, ms
, 1) << "queue_received will delay until " << release
<< " on " << m
<< " " << *m
<< dendl
;
1814 delay_thread
->queue(release
, m
);
1816 if (in_q
->can_fast_dispatch(m
)) {
1817 reader_dispatching
= true;
1819 in_q
->fast_dispatch(m
);
1821 reader_dispatching
= false;
1822 if (state
== STATE_CLOSED
||
1823 notify_on_dispatch_done
) { // there might be somebody waiting
1824 notify_on_dispatch_done
= false;
1828 in_q
->enqueue(m
, m
->get_priority(), conn_id
);
1833 else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
1834 ldout(msgr
->cct
,20) << "reader got CLOSE" << dendl
;
1836 if (state
== STATE_CLOSING
) {
1837 state
= STATE_CLOSED
;
1838 state_closed
= true;
1840 state
= STATE_CLOSING
;
1846 ldout(msgr
->cct
,0) << "reader bad tag " << (int)tag
<< dendl
;
1854 reader_running
= false;
1855 reader_needs_join
= true;
1856 unlock_maybe_reap();
1857 ldout(msgr
->cct
,10) << "reader done" << dendl
;
1860 /* write msgs to socket.
1866 while (state
!= STATE_CLOSED
) {// && state != STATE_WAIT) {
1867 ldout(msgr
->cct
,10) << "writer: state = " << get_state_name()
1868 << " policy.server=" << policy
.server
<< dendl
;
1871 if (is_queued() && state
== STATE_STANDBY
&& !policy
.server
)
1872 state
= STATE_CONNECTING
;
1875 if (state
== STATE_CONNECTING
) {
1876 assert(!policy
.server
);
1881 if (state
== STATE_CLOSING
) {
1883 ldout(msgr
->cct
,20) << "writer writing CLOSE tag" << dendl
;
1884 char tag
= CEPH_MSGR_TAG_CLOSE
;
1885 state
= STATE_CLOSED
;
1886 state_closed
= true;
1889 // we can ignore return value, actually; we don't care if this succeeds.
1890 int r
= ::write(sd
, &tag
, 1);
1897 if (state
!= STATE_CONNECTING
&& state
!= STATE_WAIT
&& state
!= STATE_STANDBY
&&
1898 (is_queued() || in_seq
> in_seq_acked
)) {
1901 if (send_keepalive
) {
1903 if (connection_state
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
1905 rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2
,
1909 rc
= write_keepalive();
1913 ldout(msgr
->cct
,2) << "writer couldn't write keepalive[2], "
1914 << cpp_strerror(errno
) << dendl
;
1918 send_keepalive
= false;
1920 if (send_keepalive_ack
) {
1921 utime_t t
= keepalive_ack_stamp
;
1923 int rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK
, t
);
1926 ldout(msgr
->cct
,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno
) << dendl
;
1930 send_keepalive_ack
= false;
1934 if (in_seq
> in_seq_acked
) {
1935 uint64_t send_seq
= in_seq
;
1937 int rc
= write_ack(send_seq
);
1940 ldout(msgr
->cct
,2) << "writer couldn't write ack, " << cpp_strerror(errno
) << dendl
;
1944 in_seq_acked
= send_seq
;
1947 // grab outgoing message
1948 Message
*m
= _get_next_outgoing();
1950 m
->set_seq(++out_seq
);
1951 if (!policy
.lossy
) {
1957 // associate message with Connection (for benefit of encode_payload)
1958 m
->set_connection(connection_state
.get());
1960 uint64_t features
= connection_state
->get_features();
1962 if (m
->empty_payload())
1963 ldout(msgr
->cct
,20) << "writer encoding " << m
->get_seq() << " features " << features
1964 << " " << m
<< " " << *m
<< dendl
;
1966 ldout(msgr
->cct
,20) << "writer half-reencoding " << m
->get_seq() << " features " << features
1967 << " " << m
<< " " << *m
<< dendl
;
1969 // encode and copy out of *m
1970 m
->encode(features
, msgr
->crcflags
);
1972 // prepare everything
1973 const ceph_msg_header
& header
= m
->get_header();
1974 const ceph_msg_footer
& footer
= m
->get_footer();
1976 // Now that we have all the crcs calculated, handle the
1977 // digital signature for the message, if the pipe has session
1978 // security set up. Some session security options do not
1979 // actually calculate and check the signature, but they should
1980 // handle the calls to sign_message and check_signature. PLR
1981 if (session_security
.get() == NULL
) {
1982 ldout(msgr
->cct
, 20) << "writer no session security" << dendl
;
1984 if (session_security
->sign_message(m
)) {
1985 ldout(msgr
->cct
, 20) << "writer failed to sign seq # " << header
.seq
1986 << "): sig = " << footer
.sig
<< dendl
;
1988 ldout(msgr
->cct
, 20) << "writer signed seq # " << header
.seq
1989 << "): sig = " << footer
.sig
<< dendl
;
1993 bufferlist blist
= m
->get_payload();
1994 blist
.append(m
->get_middle());
1995 blist
.append(m
->get_data());
1999 m
->trace
.event("pipe writing message");
2001 ldout(msgr
->cct
,20) << "writer sending " << m
->get_seq() << " " << m
<< dendl
;
2002 int rc
= write_message(header
, footer
, blist
);
2006 ldout(msgr
->cct
,1) << "writer error sending " << m
<< ", "
2007 << cpp_strerror(errno
) << dendl
;
2016 ldout(msgr
->cct
,20) << "writer sleeping" << dendl
;
2017 cond
.Wait(pipe_lock
);
2020 ldout(msgr
->cct
,20) << "writer finishing" << dendl
;
2023 writer_running
= false;
2024 unlock_maybe_reap();
2025 ldout(msgr
->cct
,10) << "writer done" << dendl
;
2028 void Pipe::unlock_maybe_reap()
2030 if (!reader_running
&& !writer_running
) {
2033 if (delay_thread
&& delay_thread
->is_flushing()) {
2034 delay_thread
->wait_for_flush();
2036 msgr
->queue_reap(this);
2042 static void alloc_aligned_buffer(bufferlist
& data
, unsigned len
, unsigned off
)
2044 // create a buffer to read into that matches the data alignment
2045 unsigned left
= len
;
2046 if (off
& ~CEPH_PAGE_MASK
) {
2049 head
= MIN(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
2050 data
.push_back(buffer::create(head
));
2053 unsigned middle
= left
& CEPH_PAGE_MASK
;
2055 data
.push_back(buffer::create_page_aligned(middle
));
2059 data
.push_back(buffer::create(left
));
2063 int Pipe::read_message(Message
**pm
, AuthSessionHandler
* auth_handler
)
2067 //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;
2069 ceph_msg_header header
;
2070 ceph_msg_footer footer
;
2071 __u32 header_crc
= 0;
2073 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2074 if (tcp_read((char*)&header
, sizeof(header
)) < 0)
2076 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2077 header_crc
= ceph_crc32c(0, (unsigned char *)&header
, sizeof(header
) - sizeof(header
.crc
));
2080 ceph_msg_header_old oldheader
;
2081 if (tcp_read((char*)&oldheader
, sizeof(oldheader
)) < 0)
2084 memcpy(&header
, &oldheader
, sizeof(header
));
2085 header
.src
= oldheader
.src
.name
;
2086 header
.reserved
= oldheader
.reserved
;
2087 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2088 header
.crc
= oldheader
.crc
;
2089 header_crc
= ceph_crc32c(0, (unsigned char *)&oldheader
, sizeof(oldheader
) - sizeof(oldheader
.crc
));
2093 ldout(msgr
->cct
,20) << "reader got envelope type=" << header
.type
2094 << " src " << entity_name_t(header
.src
)
2095 << " front=" << header
.front_len
2096 << " data=" << header
.data_len
2097 << " off " << header
.data_off
2100 // verify header crc
2101 if ((msgr
->crcflags
& MSG_CRC_HEADER
) && header_crc
!= header
.crc
) {
2102 ldout(msgr
->cct
,0) << "reader got bad header crc " << header_crc
<< " != " << header
.crc
<< dendl
;
2106 bufferlist front
, middle
, data
;
2107 int front_len
, middle_len
;
2108 unsigned data_len
, data_off
;
2111 utime_t recv_stamp
= ceph_clock_now();
2113 if (policy
.throttler_messages
) {
2114 ldout(msgr
->cct
,10) << "reader wants " << 1 << " message from policy throttler "
2115 << policy
.throttler_messages
->get_current() << "/"
2116 << policy
.throttler_messages
->get_max() << dendl
;
2117 policy
.throttler_messages
->get();
2120 uint64_t message_size
= header
.front_len
+ header
.middle_len
+ header
.data_len
;
2122 if (policy
.throttler_bytes
) {
2123 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " bytes from policy throttler "
2124 << policy
.throttler_bytes
->get_current() << "/"
2125 << policy
.throttler_bytes
->get_max() << dendl
;
2126 policy
.throttler_bytes
->get(message_size
);
2129 // throttle total bytes waiting for dispatch. do this _after_ the
2130 // policy throttle, as this one does not deadlock (unless dispatch
2131 // blocks indefinitely, which it shouldn't). in contrast, the
2132 // policy throttle carries for the lifetime of the message.
2133 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " from dispatch throttler "
2134 << in_q
->dispatch_throttler
.get_current() << "/"
2135 << in_q
->dispatch_throttler
.get_max() << dendl
;
2136 in_q
->dispatch_throttler
.get(message_size
);
2139 utime_t throttle_stamp
= ceph_clock_now();
2142 front_len
= header
.front_len
;
2144 bufferptr bp
= buffer::create(front_len
);
2145 if (tcp_read(bp
.c_str(), front_len
) < 0)
2146 goto out_dethrottle
;
2147 front
.push_back(std::move(bp
));
2148 ldout(msgr
->cct
,20) << "reader got front " << front
.length() << dendl
;
2152 middle_len
= header
.middle_len
;
2154 bufferptr bp
= buffer::create(middle_len
);
2155 if (tcp_read(bp
.c_str(), middle_len
) < 0)
2156 goto out_dethrottle
;
2157 middle
.push_back(std::move(bp
));
2158 ldout(msgr
->cct
,20) << "reader got middle " << middle
.length() << dendl
;
2163 data_len
= le32_to_cpu(header
.data_len
);
2164 data_off
= le32_to_cpu(header
.data_off
);
2166 unsigned offset
= 0;
2167 unsigned left
= data_len
;
2169 bufferlist newbuf
, rxbuf
;
2170 bufferlist::iterator blp
;
2171 int rxbuf_version
= 0;
2175 if (tcp_read_wait() < 0)
2176 goto out_dethrottle
;
2179 connection_state
->lock
.Lock();
2180 map
<ceph_tid_t
,pair
<bufferlist
,int> >::iterator p
= connection_state
->rx_buffers
.find(header
.tid
);
2181 if (p
!= connection_state
->rx_buffers
.end()) {
2182 if (rxbuf
.length() == 0 || p
->second
.second
!= rxbuf_version
) {
2183 ldout(msgr
->cct
,10) << "reader seleting rx buffer v " << p
->second
.second
2184 << " at offset " << offset
2185 << " len " << p
->second
.first
.length() << dendl
;
2186 rxbuf
= p
->second
.first
;
2187 rxbuf_version
= p
->second
.second
;
2188 // make sure it's big enough
2189 if (rxbuf
.length() < data_len
)
2190 rxbuf
.push_back(buffer::create(data_len
- rxbuf
.length()));
2191 blp
= p
->second
.first
.begin();
2192 blp
.advance(offset
);
2195 if (!newbuf
.length()) {
2196 ldout(msgr
->cct
,20) << "reader allocating new rx buffer at offset " << offset
<< dendl
;
2197 alloc_aligned_buffer(newbuf
, data_len
, data_off
);
2198 blp
= newbuf
.begin();
2199 blp
.advance(offset
);
2202 bufferptr bp
= blp
.get_current_ptr();
2203 int read
= MIN(bp
.length(), left
);
2204 ldout(msgr
->cct
,20) << "reader reading nonblocking into " << (void*)bp
.c_str() << " len " << bp
.length() << dendl
;
2205 ssize_t got
= tcp_read_nonblocking(bp
.c_str(), read
);
2206 ldout(msgr
->cct
,30) << "reader read " << got
<< " of " << read
<< dendl
;
2207 connection_state
->lock
.Unlock();
2209 goto out_dethrottle
;
2212 data
.append(bp
, 0, got
);
2215 } // else we got a signal or something; just loop.
2220 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2221 if (tcp_read((char*)&footer
, sizeof(footer
)) < 0)
2222 goto out_dethrottle
;
2224 ceph_msg_footer_old old_footer
;
2225 if (tcp_read((char*)&old_footer
, sizeof(old_footer
)) < 0)
2226 goto out_dethrottle
;
2227 footer
.front_crc
= old_footer
.front_crc
;
2228 footer
.middle_crc
= old_footer
.middle_crc
;
2229 footer
.data_crc
= old_footer
.data_crc
;
2231 footer
.flags
= old_footer
.flags
;
2234 aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
2235 ldout(msgr
->cct
,10) << "aborted = " << aborted
<< dendl
;
2237 ldout(msgr
->cct
,0) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2238 << " byte message.. ABORTED" << dendl
;
2240 goto out_dethrottle
;
2243 ldout(msgr
->cct
,20) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2244 << " byte message" << dendl
;
2245 message
= decode_message(msgr
->cct
, msgr
->crcflags
, header
, footer
,
2246 front
, middle
, data
, connection_state
.get());
2249 goto out_dethrottle
;
2253 // Check the signature if one should be present. A zero return indicates success. PLR
2256 if (auth_handler
== NULL
) {
2257 ldout(msgr
->cct
, 10) << "No session security set" << dendl
;
2259 if (auth_handler
->check_message_signature(message
)) {
2260 ldout(msgr
->cct
, 0) << "Signature check failed" << dendl
;
2263 goto out_dethrottle
;
2267 message
->set_byte_throttler(policy
.throttler_bytes
);
2268 message
->set_message_throttler(policy
.throttler_messages
);
2270 // store reservation size in message, so we don't get confused
2271 // by messages entering the dispatch queue through other paths.
2272 message
->set_dispatch_throttle_size(message_size
);
2274 message
->set_recv_stamp(recv_stamp
);
2275 message
->set_throttle_stamp(throttle_stamp
);
2276 message
->set_recv_complete_stamp(ceph_clock_now());
2282 // release bytes reserved from the throttlers on failure
2283 if (policy
.throttler_messages
) {
2284 ldout(msgr
->cct
,10) << "reader releasing " << 1 << " message to policy throttler "
2285 << policy
.throttler_messages
->get_current() << "/"
2286 << policy
.throttler_messages
->get_max() << dendl
;
2287 policy
.throttler_messages
->put();
2290 if (policy
.throttler_bytes
) {
2291 ldout(msgr
->cct
,10) << "reader releasing " << message_size
<< " bytes to policy throttler "
2292 << policy
.throttler_bytes
->get_current() << "/"
2293 << policy
.throttler_bytes
->get_max() << dendl
;
2294 policy
.throttler_bytes
->put(message_size
);
2297 in_q
->dispatch_throttle_release(message_size
);
2302 int Pipe::do_sendmsg(struct msghdr
*msg
, unsigned len
, bool more
)
2304 MSGR_SIGPIPE_STOPPER
;
2307 r
= ::sendmsg(sd
, msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
2309 ldout(msgr
->cct
,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl
;
2312 ldout(msgr
->cct
,1) << "do_sendmsg error " << cpp_strerror(r
) << dendl
;
2315 if (state
== STATE_CLOSED
) {
2316 ldout(msgr
->cct
,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl
;
2317 return -EINTR
; // close enough
2321 if (len
== 0) break;
2323 // hrmph. trim r bytes off the front of our message.
2324 ldout(msgr
->cct
,20) << "do_sendmsg short write did " << r
<< ", still have " << len
<< dendl
;
2326 if (msg
->msg_iov
[0].iov_len
<= (size_t)r
) {
2327 // lose this whole item
2328 //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2329 r
-= msg
->msg_iov
[0].iov_len
;
2334 //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2335 msg
->msg_iov
[0].iov_base
= (char *)msg
->msg_iov
[0].iov_base
+ r
;
2336 msg
->msg_iov
[0].iov_len
-= r
;
2345 int Pipe::write_ack(uint64_t seq
)
2347 ldout(msgr
->cct
,10) << "write_ack " << seq
<< dendl
;
2349 char c
= CEPH_MSGR_TAG_ACK
;
2354 memset(&msg
, 0, sizeof(msg
));
2355 struct iovec msgvec
[2];
2356 msgvec
[0].iov_base
= &c
;
2357 msgvec
[0].iov_len
= 1;
2358 msgvec
[1].iov_base
= &s
;
2359 msgvec
[1].iov_len
= sizeof(s
);
2360 msg
.msg_iov
= msgvec
;
2363 if (do_sendmsg(&msg
, 1 + sizeof(s
), true) < 0)
2368 int Pipe::write_keepalive()
2370 ldout(msgr
->cct
,10) << "write_keepalive" << dendl
;
2372 char c
= CEPH_MSGR_TAG_KEEPALIVE
;
2375 memset(&msg
, 0, sizeof(msg
));
2376 struct iovec msgvec
[2];
2377 msgvec
[0].iov_base
= &c
;
2378 msgvec
[0].iov_len
= 1;
2379 msg
.msg_iov
= msgvec
;
2382 if (do_sendmsg(&msg
, 1) < 0)
2387 int Pipe::write_keepalive2(char tag
, const utime_t
& t
)
2389 ldout(msgr
->cct
,10) << "write_keepalive2 " << (int)tag
<< " " << t
<< dendl
;
2390 struct ceph_timespec ts
;
2391 t
.encode_timeval(&ts
);
2393 memset(&msg
, 0, sizeof(msg
));
2394 struct iovec msgvec
[2];
2395 msgvec
[0].iov_base
= &tag
;
2396 msgvec
[0].iov_len
= 1;
2397 msgvec
[1].iov_base
= &ts
;
2398 msgvec
[1].iov_len
= sizeof(ts
);
2399 msg
.msg_iov
= msgvec
;
2402 if (do_sendmsg(&msg
, 1 + sizeof(ts
)) < 0)
2408 int Pipe::write_message(const ceph_msg_header
& header
, const ceph_msg_footer
& footer
, bufferlist
& blist
)
2412 // set up msghdr and iovecs
2414 memset(&msg
, 0, sizeof(msg
));
2415 msg
.msg_iov
= msgvec
;
2419 char tag
= CEPH_MSGR_TAG_MSG
;
2420 msgvec
[msg
.msg_iovlen
].iov_base
= &tag
;
2421 msgvec
[msg
.msg_iovlen
].iov_len
= 1;
2426 ceph_msg_header_old oldheader
;
2427 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2428 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&header
;
2429 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(header
);
2430 msglen
+= sizeof(header
);
2433 memcpy(&oldheader
, &header
, sizeof(header
));
2434 oldheader
.src
.name
= header
.src
;
2435 oldheader
.src
.addr
= connection_state
->get_peer_addr();
2436 oldheader
.orig_src
= oldheader
.src
;
2437 oldheader
.reserved
= header
.reserved
;
2438 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2439 oldheader
.crc
= ceph_crc32c(0, (unsigned char*)&oldheader
,
2440 sizeof(oldheader
) - sizeof(oldheader
.crc
));
2444 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&oldheader
;
2445 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(oldheader
);
2446 msglen
+= sizeof(oldheader
);
2450 // payload (front+data)
2451 list
<bufferptr
>::const_iterator pb
= blist
.buffers().begin();
2452 unsigned b_off
= 0; // carry-over buffer offset, if any
2453 unsigned bl_pos
= 0; // blist pos
2454 unsigned left
= blist
.length();
2457 unsigned donow
= MIN(left
, pb
->length()-b_off
);
2459 ldout(msgr
->cct
,0) << "donow = " << donow
<< " left " << left
<< " pb->length " << pb
->length()
2460 << " b_off " << b_off
<< dendl
;
2463 ldout(msgr
->cct
,30) << " bl_pos " << bl_pos
<< " b_off " << b_off
2464 << " leftinchunk " << left
2465 << " buffer len " << pb
->length()
2466 << " writing " << donow
2469 if (msg
.msg_iovlen
>= SM_IOV_MAX
-2) {
2470 if (do_sendmsg(&msg
, msglen
, true))
2473 // and restart the iov
2474 msg
.msg_iov
= msgvec
;
2479 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)(pb
->c_str()+b_off
);
2480 msgvec
[msg
.msg_iovlen
].iov_len
= donow
;
2484 assert(left
>= donow
);
2490 while (b_off
== pb
->length()) {
2497 // send footer; if receiver doesn't support signatures, use the old footer format
2499 ceph_msg_footer_old old_footer
;
2500 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2501 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)&footer
;
2502 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(footer
);
2503 msglen
+= sizeof(footer
);
2506 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2507 old_footer
.front_crc
= footer
.front_crc
;
2508 old_footer
.middle_crc
= footer
.middle_crc
;
2510 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
2512 old_footer
.data_crc
= msgr
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
2513 old_footer
.flags
= footer
.flags
;
2514 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&old_footer
;
2515 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(old_footer
);
2516 msglen
+= sizeof(old_footer
);
2521 if (do_sendmsg(&msg
, msglen
))
2535 int Pipe::tcp_read(char *buf
, unsigned len
)
2542 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2543 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2544 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2545 ::shutdown(sd
, SHUT_RDWR
);
2549 if (tcp_read_wait() < 0)
2552 ssize_t got
= tcp_read_nonblocking(buf
, len
);
2559 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2564 int Pipe::tcp_read_wait()
2571 pfd
.events
= POLLIN
;
2572 #if defined(__linux__)
2573 pfd
.events
|= POLLRDHUP
;
2576 if (has_pending_data())
2579 int r
= poll(&pfd
, 1, msgr
->timeout
);
2585 evmask
= POLLERR
| POLLHUP
| POLLNVAL
;
2586 #if defined(__linux__)
2587 evmask
|= POLLRDHUP
;
2589 if (pfd
.revents
& evmask
)
2592 if (!(pfd
.revents
& POLLIN
))
2598 ssize_t
Pipe::do_recv(char *buf
, size_t len
, int flags
)
2601 ssize_t got
= ::recv( sd
, buf
, len
, flags
);
2603 if (errno
== EINTR
) {
2606 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2607 << got
<< " " << cpp_strerror(errno
) << dendl
;
2616 ssize_t
Pipe::buffered_recv(char *buf
, size_t len
, int flags
)
2619 ssize_t total_recv
= 0;
2620 if (recv_len
> recv_ofs
) {
2621 int to_read
= MIN(recv_len
- recv_ofs
, left
);
2622 memcpy(buf
, &recv_buf
[recv_ofs
], to_read
);
2623 recv_ofs
+= to_read
;
2629 total_recv
+= to_read
;
2632 /* nothing left in the prefetch buffer */
2634 if (left
> recv_max_prefetch
) {
2635 /* this was a large read, we don't prefetch for these */
2636 ssize_t ret
= do_recv(buf
, left
, flags
);
2647 ssize_t got
= do_recv(recv_buf
, recv_max_prefetch
, flags
);
2655 recv_len
= (size_t)got
;
2656 got
= MIN(left
, (size_t)got
);
2657 memcpy(buf
, recv_buf
, got
);
2663 ssize_t
Pipe::tcp_read_nonblocking(char *buf
, unsigned len
)
2665 ssize_t got
= buffered_recv(buf
, len
, MSG_DONTWAIT
);
2667 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2668 << got
<< " " << cpp_strerror(errno
) << dendl
;
2672 /* poll() said there was data, but we didn't read any - peer
2673 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2674 * standard socket behavior as documented by Stevens.
2681 int Pipe::tcp_write(const char *buf
, unsigned len
)
2687 pfd
.events
= POLLOUT
| POLLHUP
| POLLNVAL
| POLLERR
;
2688 #if defined(__linux__)
2689 pfd
.events
|= POLLRDHUP
;
2692 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2693 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2694 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2695 ::shutdown(sd
, SHUT_RDWR
);
2699 if (poll(&pfd
, 1, -1) < 0)
2702 if (!(pfd
.revents
& POLLOUT
))
2705 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2708 MSGR_SIGPIPE_STOPPER
;
2709 int did
= ::send( sd
, buf
, len
, MSG_NOSIGNAL
);
2711 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2712 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2717 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;