1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
24 #include "msg/Message.h"
26 #include "SimpleMessenger.h"
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
32 // Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
38 #include "include/sock_compat.h"
40 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
41 #define SEQ_MASK 0x7fffffff
42 #define dout_subsys ceph_subsys_ms
45 #define dout_prefix *_dout << *this
46 ostream
& Pipe::_pipe_prefix(std::ostream
&out
) const {
47 return out
<< "-- " << msgr
->get_myinst().addr
<< " >> " << peer_addr
<< " pipe(" << this
48 << " sd=" << sd
<< " :" << port
50 << " pgs=" << peer_global_seq
51 << " cs=" << connect_seq
52 << " l=" << policy
.lossy
53 << " c=" << connection_state
57 ostream
& operator<<(ostream
&out
, const Pipe
&pipe
) {
58 return pipe
._pipe_prefix(out
);
62 * The DelayedDelivery is for injecting delays into Message delivery off
63 * the socket. It is only enabled if delays are requested, and if they
64 * are then it pulls Messages off the DelayQueue and puts them into the
65 * in_q (SimpleMessenger::dispatch_queue).
66 * Please note that this probably has issues with Pipe shutdown and
67 * replacement semantics. I've tried, but no guarantees.
69 class Pipe::DelayedDelivery
: public Thread
{
71 std::deque
< pair
<utime_t
,Message
*> > delay_queue
;
76 bool stop_delayed_delivery
;
77 bool delay_dispatching
; // we are in fast dispatch now
78 bool stop_fast_dispatching_flag
; // we need to stop fast dispatching
81 explicit DelayedDelivery(Pipe
*p
)
83 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
85 stop_delayed_delivery(false),
86 delay_dispatching(false),
87 stop_fast_dispatching_flag(false) { }
88 ~DelayedDelivery() override
{
91 void *entry() override
;
92 void queue(utime_t release
, Message
*m
) {
93 Mutex::Locker
l(delay_lock
);
94 delay_queue
.push_back(make_pair(release
, m
));
100 Mutex::Locker
l(delay_lock
);
101 return flush_count
> 0 || active_flush
;
103 void wait_for_flush() {
104 Mutex::Locker
l(delay_lock
);
105 while (flush_count
> 0 || active_flush
)
106 delay_cond
.Wait(delay_lock
);
110 stop_delayed_delivery
= true;
114 void steal_for_pipe(Pipe
*new_owner
) {
115 Mutex::Locker
l(delay_lock
);
119 * We need to stop fast dispatching before we need to stop putting
120 * normal messages into the DispatchQueue.
122 void stop_fast_dispatching();
125 /**************************************
129 Pipe::Pipe(SimpleMessenger
*r
, int st
, PipeConnection
*con
)
130 : RefCountedObject(r
->cct
),
135 conn_id(r
->dispatch_queue
.get_id()),
140 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
142 connection_state(NULL
),
143 reader_running(false), reader_needs_join(false),
144 reader_dispatching(false), notify_on_dispatch_done(false),
145 writer_running(false),
146 in_q(&(r
->dispatch_queue
)),
147 send_keepalive(false),
148 send_keepalive_ack(false),
149 connect_seq(0), peer_global_seq(0),
150 out_seq(0), in_seq(0), in_seq_acked(0) {
151 ANNOTATE_BENIGN_RACE_SIZED(&sd
, sizeof(sd
), "Pipe socket");
152 ANNOTATE_BENIGN_RACE_SIZED(&state
, sizeof(state
), "Pipe state");
153 ANNOTATE_BENIGN_RACE_SIZED(&recv_len
, sizeof(recv_len
), "Pipe recv_len");
154 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs
, sizeof(recv_ofs
), "Pipe recv_ofs");
156 connection_state
= con
;
157 connection_state
->reset_pipe(this);
159 connection_state
= new PipeConnection(msgr
->cct
, msgr
);
160 connection_state
->pipe
= get();
163 if (randomize_out_seq()) {
164 lsubdout(msgr
->cct
,ms
,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
168 msgr
->timeout
= msgr
->cct
->_conf
->ms_tcp_read_timeout
* 1000; //convert to ms
169 if (msgr
->timeout
== 0)
172 recv_max_prefetch
= msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
;
173 recv_buf
= new char[recv_max_prefetch
];
178 assert(out_q
.empty());
179 assert(sent
.empty());
184 void Pipe::handle_ack(uint64_t seq
)
186 lsubdout(msgr
->cct
, ms
, 15) << "reader got ack seq " << seq
<< dendl
;
188 while (!sent
.empty() &&
189 sent
.front()->get_seq() <= seq
) {
190 Message
*m
= sent
.front();
192 lsubdout(msgr
->cct
, ms
, 10) << "reader got ack seq "
193 << seq
<< " >= " << m
->get_seq() << " on " << m
<< " " << *m
<< dendl
;
198 void Pipe::start_reader()
200 assert(pipe_lock
.is_locked());
201 assert(!reader_running
);
202 if (reader_needs_join
) {
203 reader_thread
.join();
204 reader_needs_join
= false;
206 reader_running
= true;
207 reader_thread
.create("ms_pipe_read", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
210 void Pipe::maybe_start_delay_thread()
213 auto pos
= msgr
->cct
->_conf
->get_val
<std::string
>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state
->peer_type
));
214 if (pos
!= string::npos
) {
215 lsubdout(msgr
->cct
, ms
, 1) << "setting up a delay queue on Pipe " << this << dendl
;
216 delay_thread
= new DelayedDelivery(this);
217 delay_thread
->create("ms_pipe_delay");
222 void Pipe::start_writer()
224 assert(pipe_lock
.is_locked());
225 assert(!writer_running
);
226 writer_running
= true;
227 writer_thread
.create("ms_pipe_write", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
230 void Pipe::join_reader()
236 reader_thread
.join();
238 reader_needs_join
= false;
241 void Pipe::DelayedDelivery::discard()
243 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::discard" << dendl
;
244 Mutex::Locker
l(delay_lock
);
245 while (!delay_queue
.empty()) {
246 Message
*m
= delay_queue
.front().second
;
247 pipe
->in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
249 delay_queue
.pop_front();
253 void Pipe::DelayedDelivery::flush()
255 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::flush" << dendl
;
256 Mutex::Locker
l(delay_lock
);
257 flush_count
= delay_queue
.size();
261 void *Pipe::DelayedDelivery::entry()
263 Mutex::Locker
locker(delay_lock
);
264 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry start" << dendl
;
266 while (!stop_delayed_delivery
) {
267 if (delay_queue
.empty()) {
268 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 30) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl
;
269 delay_cond
.Wait(delay_lock
);
272 utime_t release
= delay_queue
.front().first
;
273 Message
*m
= delay_queue
.front().second
;
274 string delay_msg_type
= pipe
->msgr
->cct
->_conf
->ms_inject_delay_msg_type
;
276 (release
> ceph_clock_now() &&
277 (delay_msg_type
.empty() || m
->get_type_name() == delay_msg_type
))) {
278 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond until " << release
<< dendl
;
279 delay_cond
.WaitUntil(delay_lock
, release
);
282 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry dequeuing message " << m
<< " for delivery, past " << release
<< dendl
;
283 delay_queue
.pop_front();
284 if (flush_count
> 0) {
288 if (pipe
->in_q
->can_fast_dispatch(m
)) {
289 if (!stop_fast_dispatching_flag
) {
290 delay_dispatching
= true;
292 pipe
->in_q
->fast_dispatch(m
);
294 delay_dispatching
= false;
295 if (stop_fast_dispatching_flag
) {
296 // we need to let the stopping thread proceed
303 pipe
->in_q
->enqueue(m
, m
->get_priority(), pipe
->conn_id
);
305 active_flush
= false;
307 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry stop" << dendl
;
311 void Pipe::DelayedDelivery::stop_fast_dispatching() {
312 Mutex::Locker
l(delay_lock
);
313 stop_fast_dispatching_flag
= true;
314 while (delay_dispatching
)
315 delay_cond
.Wait(delay_lock
);
321 ldout(msgr
->cct
,10) << "accept" << dendl
;
322 assert(pipe_lock
.is_locked());
323 assert(state
== STATE_ACCEPTING
);
329 entity_addr_t socket_addr
;
332 char banner
[strlen(CEPH_BANNER
)+1];
334 ceph_msg_connect connect
;
335 ceph_msg_connect_reply reply
;
338 bufferlist authorizer
, authorizer_reply
;
339 bool authorizer_valid
;
340 uint64_t feat_missing
;
341 bool replaced
= false;
342 // this variable denotes if the connection attempt from peer is a hard
343 // reset or not, it is true if there is an existing connection and the
344 // connection sequence from peer is equal to zero
345 bool is_reset_from_peer
= false;
346 CryptoKey session_key
;
347 int removed
; // single-use down below
349 // this should roughly mirror pseudocode at
350 // http://ceph.com/wiki/Messaging_protocol
352 uint64_t existing_seq
= -1;
354 // used for reading in the remote acked seq on connect
355 uint64_t newly_acked_seq
= 0;
359 set_socket_options();
362 r
= tcp_write(CEPH_BANNER
, strlen(CEPH_BANNER
));
364 ldout(msgr
->cct
,10) << "accept couldn't write banner" << dendl
;
369 ::encode(msgr
->my_inst
.addr
, addrs
, 0); // legacy
371 port
= msgr
->my_inst
.addr
.get_port();
373 // and peer's socket addr (they might not know their ip)
376 r
= ::getpeername(sd
, (sockaddr
*)&ss
, &len
);
378 ldout(msgr
->cct
,0) << "accept failed to getpeername " << cpp_strerror(errno
) << dendl
;
381 socket_addr
.set_sockaddr((sockaddr
*)&ss
);
382 ::encode(socket_addr
, addrs
, 0); // legacy
384 r
= tcp_write(addrs
.c_str(), addrs
.length());
386 ldout(msgr
->cct
,10) << "accept couldn't write my+peer addr" << dendl
;
390 ldout(msgr
->cct
,1) << "accept sd=" << sd
<< " " << socket_addr
<< dendl
;
393 if (tcp_read(banner
, strlen(CEPH_BANNER
)) < 0) {
394 ldout(msgr
->cct
,10) << "accept couldn't read banner" << dendl
;
397 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
398 banner
[strlen(CEPH_BANNER
)] = 0;
399 ldout(msgr
->cct
,1) << "accept peer sent bad banner '" << banner
<< "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
403 bufferptr
tp(sizeof(ceph_entity_addr
));
404 addrbl
.push_back(std::move(tp
));
406 if (tcp_read(addrbl
.c_str(), addrbl
.length()) < 0) {
407 ldout(msgr
->cct
,10) << "accept couldn't read peer_addr" << dendl
;
411 bufferlist::iterator ti
= addrbl
.begin();
412 ::decode(peer_addr
, ti
);
415 ldout(msgr
->cct
,10) << "accept peer addr is " << peer_addr
<< dendl
;
416 if (peer_addr
.is_blank_ip()) {
417 // peer apparently doesn't know what ip they have; figure it out for them.
418 int port
= peer_addr
.get_port();
419 peer_addr
.u
= socket_addr
.u
;
420 peer_addr
.set_port(port
);
421 ldout(msgr
->cct
,0) << "accept peer addr is really " << peer_addr
422 << " (socket is " << socket_addr
<< ")" << dendl
;
424 set_peer_addr(peer_addr
); // so that connection_state gets set up
427 if (tcp_read((char*)&connect
, sizeof(connect
)) < 0) {
428 ldout(msgr
->cct
,10) << "accept couldn't read connect" << dendl
;
433 if (connect
.authorizer_len
) {
434 bp
= buffer::create(connect
.authorizer_len
);
435 if (tcp_read(bp
.c_str(), connect
.authorizer_len
) < 0) {
436 ldout(msgr
->cct
,10) << "accept couldn't read connect authorizer" << dendl
;
439 authorizer
.push_back(std::move(bp
));
440 authorizer_reply
.clear();
443 ldout(msgr
->cct
,20) << "accept got peer connect_seq " << connect
.connect_seq
444 << " global_seq " << connect
.global_seq
447 msgr
->lock
.Lock(); // FIXME
449 if (msgr
->dispatch_queue
.stop
)
451 if (state
!= STATE_ACCEPTING
) {
455 // note peer's type, flags
456 set_peer_type(connect
.host_type
);
457 policy
= msgr
->get_policy(connect
.host_type
);
458 ldout(msgr
->cct
,10) << "accept of host_type " << connect
.host_type
459 << ", policy.lossy=" << policy
.lossy
460 << " policy.server=" << policy
.server
461 << " policy.standby=" << policy
.standby
462 << " policy.resetcheck=" << policy
.resetcheck
465 memset(&reply
, 0, sizeof(reply
));
466 reply
.protocol_version
= msgr
->get_proto_version(peer_type
, false);
470 ldout(msgr
->cct
,10) << "accept my proto " << reply
.protocol_version
471 << ", their proto " << connect
.protocol_version
<< dendl
;
472 if (connect
.protocol_version
!= reply
.protocol_version
) {
473 reply
.tag
= CEPH_MSGR_TAG_BADPROTOVER
;
477 // require signatures for cephx?
478 if (connect
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
479 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
480 peer_type
== CEPH_ENTITY_TYPE_MDS
) {
481 if (msgr
->cct
->_conf
->cephx_require_signatures
||
482 msgr
->cct
->_conf
->cephx_cluster_require_signatures
) {
483 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl
;
484 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
487 if (msgr
->cct
->_conf
->cephx_require_signatures
||
488 msgr
->cct
->_conf
->cephx_service_require_signatures
) {
489 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl
;
490 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
495 feat_missing
= policy
.features_required
& ~(uint64_t)connect
.features
;
497 ldout(msgr
->cct
,1) << "peer missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
498 reply
.tag
= CEPH_MSGR_TAG_FEATURES
;
502 // Check the authorizer. If not good, bail out.
506 if (!msgr
->verify_authorizer(connection_state
.get(), peer_type
, connect
.authorizer_protocol
, authorizer
,
507 authorizer_reply
, authorizer_valid
, session_key
) ||
509 ldout(msgr
->cct
,0) << "accept: got bad authorizer" << dendl
;
511 if (state
!= STATE_ACCEPTING
)
512 goto shutting_down_msgr_unlocked
;
513 reply
.tag
= CEPH_MSGR_TAG_BADAUTHORIZER
;
514 session_security
.reset();
518 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
520 ldout(msgr
->cct
,10) << "accept: setting up session_security." << dendl
;
522 retry_existing_lookup
:
525 if (msgr
->dispatch_queue
.stop
)
527 if (state
!= STATE_ACCEPTING
)
531 existing
= msgr
->_lookup_pipe(peer_addr
);
533 existing
->pipe_lock
.Lock(true); // skip lockdep check (we are locking a second Pipe here)
534 if (existing
->reader_dispatching
) {
535 /** we need to wait, or we can deadlock if downstream
536 * fast_dispatchers are (naughtily!) waiting on resources
537 * held by somebody trying to make use of the SimpleMessenger lock.
538 * So drop locks, wait, and retry. It just looks like a slow network
541 * We take a ref to existing here since it might get reaped before we
542 * wake up (see bug #15870). We can be confident that it lived until
543 * locked it since we held the msgr lock from _lookup_pipe through to
544 * locking existing->lock and checking reader_dispatching.
549 existing
->notify_on_dispatch_done
= true;
550 while (existing
->reader_dispatching
)
551 existing
->cond
.Wait(existing
->pipe_lock
);
552 existing
->pipe_lock
.Unlock();
555 goto retry_existing_lookup
;
558 if (connect
.global_seq
< existing
->peer_global_seq
) {
559 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
560 << " > " << connect
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
561 reply
.tag
= CEPH_MSGR_TAG_RETRY_GLOBAL
;
562 reply
.global_seq
= existing
->peer_global_seq
; // so we can send it below..
563 existing
->pipe_lock
.Unlock();
567 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
568 << " <= " << connect
.global_seq
<< ", looks ok" << dendl
;
571 if (existing
->policy
.lossy
) {
572 ldout(msgr
->cct
,0) << "accept replacing existing (lossy) channel (new one lossy="
573 << policy
.lossy
<< ")" << dendl
;
574 existing
->was_session_reset();
578 ldout(msgr
->cct
,0) << "accept connect_seq " << connect
.connect_seq
579 << " vs existing " << existing
->connect_seq
580 << " state " << existing
->get_state_name() << dendl
;
582 if (connect
.connect_seq
== 0 && existing
->connect_seq
> 0) {
583 ldout(msgr
->cct
,0) << "accept peer reset, then tried to connect to us, replacing" << dendl
;
584 // this is a hard reset from peer
585 is_reset_from_peer
= true;
586 if (policy
.resetcheck
)
587 existing
->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
591 if (connect
.connect_seq
< existing
->connect_seq
) {
592 // old attempt, or we sent READY but they didn't get it.
593 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".cseq " << existing
->connect_seq
594 << " > " << connect
.connect_seq
<< ", RETRY_SESSION" << dendl
;
598 if (connect
.connect_seq
== existing
->connect_seq
) {
599 // if the existing connection successfully opened, and/or
600 // subsequently went to standby, then the peer should bump
601 // their connect_seq and retry: this is not a connection race
602 // we need to resolve here.
603 if (existing
->state
== STATE_OPEN
||
604 existing
->state
== STATE_STANDBY
) {
605 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
606 << ".cseq " << existing
->connect_seq
607 << " == " << connect
.connect_seq
608 << ", OPEN|STANDBY, RETRY_SESSION" << dendl
;
613 if (peer_addr
< msgr
->my_inst
.addr
||
614 existing
->policy
.server
) {
616 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
617 << " == " << connect
.connect_seq
<< ", or we are server, replacing my attempt" << dendl
;
618 if (!(existing
->state
== STATE_CONNECTING
||
619 existing
->state
== STATE_WAIT
))
620 lderr(msgr
->cct
) << "accept race bad state, would replace, existing="
621 << existing
->get_state_name()
622 << " " << existing
<< ".cseq=" << existing
->connect_seq
623 << " == " << connect
.connect_seq
625 assert(existing
->state
== STATE_CONNECTING
||
626 existing
->state
== STATE_WAIT
);
629 // our existing outgoing wins
630 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
631 << " == " << connect
.connect_seq
<< ", sending WAIT" << dendl
;
632 assert(peer_addr
> msgr
->my_inst
.addr
);
633 if (!(existing
->state
== STATE_CONNECTING
))
634 lderr(msgr
->cct
) << "accept race bad state, would send wait, existing="
635 << existing
->get_state_name()
636 << " " << existing
<< ".cseq=" << existing
->connect_seq
637 << " == " << connect
.connect_seq
639 assert(existing
->state
== STATE_CONNECTING
);
640 // make sure our outgoing connection will follow through
641 existing
->_send_keepalive();
642 reply
.tag
= CEPH_MSGR_TAG_WAIT
;
643 existing
->pipe_lock
.Unlock();
649 assert(connect
.connect_seq
> existing
->connect_seq
);
650 assert(connect
.global_seq
>= existing
->peer_global_seq
);
651 if (policy
.resetcheck
&& // RESETSESSION only used by servers; peers do not reset each other
652 existing
->connect_seq
== 0) {
653 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
654 << ", " << existing
<< ".cseq = " << existing
->connect_seq
655 << "), sending RESETSESSION" << dendl
;
656 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
658 existing
->pipe_lock
.Unlock();
663 ldout(msgr
->cct
,10) << "accept peer sent cseq " << connect
.connect_seq
664 << " > " << existing
->connect_seq
<< dendl
;
667 else if (connect
.connect_seq
> 0) {
668 // we reset, and they are opening a new session
669 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
<< "), sending RESETSESSION" << dendl
;
671 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
675 ldout(msgr
->cct
,10) << "accept new session" << dendl
;
682 assert(existing
->pipe_lock
.is_locked());
683 assert(pipe_lock
.is_locked());
684 reply
.tag
= CEPH_MSGR_TAG_RETRY_SESSION
;
685 reply
.connect_seq
= existing
->connect_seq
+ 1;
686 existing
->pipe_lock
.Unlock();
691 assert(pipe_lock
.is_locked());
692 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
693 reply
.authorizer_len
= authorizer_reply
.length();
695 r
= tcp_write((char*)&reply
, sizeof(reply
));
698 if (reply
.authorizer_len
) {
699 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
706 assert(existing
->pipe_lock
.is_locked());
707 assert(pipe_lock
.is_locked());
708 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
709 if ((connect
.features
& CEPH_FEATURE_RECONNECT_SEQ
) && !is_reset_from_peer
) {
710 reply_tag
= CEPH_MSGR_TAG_SEQ
;
711 existing_seq
= existing
->in_seq
;
713 ldout(msgr
->cct
,10) << "accept replacing " << existing
<< dendl
;
715 existing
->unregister_pipe();
718 if (existing
->policy
.lossy
) {
719 // disconnect from the Connection
720 assert(existing
->connection_state
);
721 if (existing
->connection_state
->clear_pipe(existing
))
722 msgr
->dispatch_queue
.queue_reset(existing
->connection_state
.get());
724 // queue a reset on the new connection, which we're dumping for the old
725 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
727 // drop my Connection, and take a ref to the existing one. do not
728 // clear existing->connection_state, since read_message and
729 // write_message both dereference it without pipe_lock.
730 connection_state
= existing
->connection_state
;
732 // make existing Connection reference us
733 connection_state
->reset_pipe(this);
735 if (existing
->delay_thread
) {
736 existing
->delay_thread
->steal_for_pipe(this);
737 delay_thread
= existing
->delay_thread
;
738 existing
->delay_thread
= NULL
;
739 delay_thread
->flush();
742 // steal incoming queue
743 uint64_t replaced_conn_id
= conn_id
;
744 conn_id
= existing
->conn_id
;
745 existing
->conn_id
= replaced_conn_id
;
747 // reset the in_seq if this is a hard reset from peer,
748 // otherwise we respect our original connection's value
749 in_seq
= is_reset_from_peer
? 0 : existing
->in_seq
;
750 in_seq_acked
= in_seq
;
752 // steal outgoing queue and out_seq
753 existing
->requeue_sent();
754 out_seq
= existing
->out_seq
;
755 ldout(msgr
->cct
,10) << "accept re-queuing on out_seq " << out_seq
<< " in_seq " << in_seq
<< dendl
;
756 for (map
<int, list
<Message
*> >::iterator p
= existing
->out_q
.begin();
757 p
!= existing
->out_q
.end();
759 out_q
[p
->first
].splice(out_q
[p
->first
].begin(), p
->second
);
761 existing
->stop_and_wait();
762 existing
->pipe_lock
.Unlock();
766 assert(pipe_lock
.is_locked());
767 connect_seq
= connect
.connect_seq
+ 1;
768 peer_global_seq
= connect
.global_seq
;
769 assert(state
== STATE_ACCEPTING
);
771 ldout(msgr
->cct
,10) << "accept success, connect_seq = " << connect_seq
<< ", sending READY" << dendl
;
774 reply
.tag
= (reply_tag
? reply_tag
: CEPH_MSGR_TAG_READY
);
775 reply
.features
= policy
.features_supported
;
776 reply
.global_seq
= msgr
->get_global_seq();
777 reply
.connect_seq
= connect_seq
;
779 reply
.authorizer_len
= authorizer_reply
.length();
781 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
783 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
784 ldout(msgr
->cct
,10) << "accept features " << connection_state
->get_features() << dendl
;
786 session_security
.reset(
787 get_auth_session_handler(msgr
->cct
,
788 connect
.authorizer_protocol
,
790 connection_state
->get_features()));
793 msgr
->dispatch_queue
.queue_accept(connection_state
.get());
794 msgr
->ms_deliver_handle_fast_accept(connection_state
.get());
797 if (msgr
->dispatch_queue
.stop
)
799 removed
= msgr
->accepting_pipes
.erase(this);
800 assert(removed
== 1);
805 r
= tcp_write((char*)&reply
, sizeof(reply
));
807 goto fail_registered
;
810 if (reply
.authorizer_len
) {
811 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
813 goto fail_registered
;
817 if (reply_tag
== CEPH_MSGR_TAG_SEQ
) {
818 if (tcp_write((char*)&existing_seq
, sizeof(existing_seq
)) < 0) {
819 ldout(msgr
->cct
,2) << "accept write error on in_seq" << dendl
;
820 goto fail_registered
;
822 if (tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
)) < 0) {
823 ldout(msgr
->cct
,2) << "accept read error on newly_acked_seq" << dendl
;
824 goto fail_registered
;
829 discard_requeued_up_to(newly_acked_seq
);
830 if (state
!= STATE_CLOSED
) {
831 ldout(msgr
->cct
,10) << "accept starting writer, state " << get_state_name() << dendl
;
834 ldout(msgr
->cct
,20) << "accept done" << dendl
;
836 maybe_start_delay_thread();
838 return 0; // success.
841 ldout(msgr
->cct
, 10) << "accept fault after register" << dendl
;
843 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
844 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
846 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
852 if (state
!= STATE_CLOSED
) {
853 bool queued
= is_queued();
854 ldout(msgr
->cct
, 10) << " queued = " << (int)queued
<< dendl
;
856 state
= policy
.server
? STATE_STANDBY
: STATE_CONNECTING
;
857 } else if (replaced
) {
858 state
= STATE_STANDBY
;
860 state
= STATE_CLOSED
;
864 if (queued
|| replaced
)
871 shutting_down_msgr_unlocked
:
872 assert(pipe_lock
.is_locked());
874 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
875 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
877 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
881 state
= STATE_CLOSED
;
887 void Pipe::set_socket_options()
889 // disable Nagle algorithm?
890 if (msgr
->cct
->_conf
->ms_tcp_nodelay
) {
892 int r
= ::setsockopt(sd
, IPPROTO_TCP
, TCP_NODELAY
, (char*)&flag
, sizeof(flag
));
895 ldout(msgr
->cct
,0) << "couldn't set TCP_NODELAY: "
896 << cpp_strerror(r
) << dendl
;
899 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
900 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
901 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_RCVBUF
, (void*)&size
, sizeof(size
));
904 ldout(msgr
->cct
,0) << "couldn't set SO_RCVBUF to " << size
905 << ": " << cpp_strerror(r
) << dendl
;
910 #ifdef CEPH_USE_SO_NOSIGPIPE
912 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&val
, sizeof(val
));
915 ldout(msgr
->cct
,0) << "couldn't set SO_NOSIGPIPE: "
916 << cpp_strerror(r
) << dendl
;
921 int prio
= msgr
->get_socket_priority();
924 #ifdef IPTOS_CLASS_CS6
925 int iptos
= IPTOS_CLASS_CS6
;
927 if (!peer_addr
.is_blank_ip()) {
928 addr_family
= peer_addr
.get_family();
930 addr_family
= msgr
->get_myaddr().get_family();
932 switch (addr_family
) {
934 r
= ::setsockopt(sd
, IPPROTO_IP
, IP_TOS
, &iptos
, sizeof(iptos
));
937 r
= ::setsockopt(sd
, IPPROTO_IPV6
, IPV6_TCLASS
, &iptos
, sizeof(iptos
));
940 lderr(msgr
->cct
) << "couldn't set ToS of unknown family ("
941 << addr_family
<< ")"
942 << " to " << iptos
<< dendl
;
947 ldout(msgr
->cct
,0) << "couldn't set TOS to " << iptos
948 << ": " << cpp_strerror(r
) << dendl
;
951 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
952 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
953 // We need to call setsockopt(SO_PRIORITY) after it.
954 r
= ::setsockopt(sd
, SOL_SOCKET
, SO_PRIORITY
, &prio
, sizeof(prio
));
957 ldout(msgr
->cct
,0) << "couldn't set SO_PRIORITY to " << prio
958 << ": " << cpp_strerror(r
) << dendl
;
966 bool got_bad_auth
= false;
968 ldout(msgr
->cct
,10) << "connect " << connect_seq
<< dendl
;
969 assert(pipe_lock
.is_locked());
971 __u32 cseq
= connect_seq
;
972 __u32 gseq
= msgr
->get_global_seq();
974 // stop reader thread
982 struct iovec msgvec
[2];
984 char banner
[strlen(CEPH_BANNER
) + 1]; // extra byte makes coverity happy
986 entity_addr_t peer_addr_for_me
, socket_addr
;
987 AuthAuthorizer
*authorizer
= NULL
;
988 bufferlist addrbl
, myaddrbl
;
989 const md_config_t
*conf
= msgr
->cct
->_conf
;
991 // close old socket. this is safe because we stopped the reader thread above.
996 sd
= ::socket(peer_addr
.get_family(), SOCK_STREAM
, 0);
999 lderr(msgr
->cct
) << "connect couldn't create socket " << cpp_strerror(rc
) << dendl
;
1005 set_socket_options();
1008 entity_addr_t addr2bind
= msgr
->get_myaddr();
1009 if (msgr
->cct
->_conf
->ms_bind_before_connect
&& (!addr2bind
.is_blank_ip())) {
1010 addr2bind
.set_port(0);
1011 int r
= ::bind(sd
, addr2bind
.get_sockaddr(), addr2bind
.get_sockaddr_len());
1013 ldout(msgr
->cct
,2) << "client bind error " << ", " << cpp_strerror(errno
) << dendl
;
1020 ldout(msgr
->cct
,10) << "connecting to " << peer_addr
<< dendl
;
1021 rc
= ::connect(sd
, peer_addr
.get_sockaddr(), peer_addr
.get_sockaddr_len());
1023 int stored_errno
= errno
;
1024 ldout(msgr
->cct
,2) << "connect error " << peer_addr
1025 << ", " << cpp_strerror(stored_errno
) << dendl
;
1026 if (stored_errno
== ECONNREFUSED
) {
1027 ldout(msgr
->cct
, 2) << "connection refused!" << dendl
;
1028 msgr
->dispatch_queue
.queue_refused(connection_state
.get());
1034 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1035 rc
= tcp_read((char*)&banner
, strlen(CEPH_BANNER
));
1037 ldout(msgr
->cct
,2) << "connect couldn't read banner, " << cpp_strerror(rc
) << dendl
;
1040 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1041 ldout(msgr
->cct
,0) << "connect protocol error (bad banner) on peer " << peer_addr
<< dendl
;
1045 memset(&msg
, 0, sizeof(msg
));
1046 msgvec
[0].iov_base
= banner
;
1047 msgvec
[0].iov_len
= strlen(CEPH_BANNER
);
1048 msg
.msg_iov
= msgvec
;
1050 msglen
= msgvec
[0].iov_len
;
1051 rc
= do_sendmsg(&msg
, msglen
);
1053 ldout(msgr
->cct
,2) << "connect couldn't write my banner, " << cpp_strerror(rc
) << dendl
;
1059 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1060 bufferptr
p(sizeof(ceph_entity_addr
) * 2);
1062 int wirelen
= sizeof(__u32
) * 2 + sizeof(ceph_sockaddr_storage
);
1063 bufferptr
p(wirelen
* 2);
1065 addrbl
.push_back(std::move(p
));
1067 rc
= tcp_read(addrbl
.c_str(), addrbl
.length());
1069 ldout(msgr
->cct
,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc
) << dendl
;
1073 bufferlist::iterator p
= addrbl
.begin();
1075 ::decode(peer_addr_for_me
, p
);
1077 catch (buffer::error
& e
) {
1078 ldout(msgr
->cct
,2) << "connect couldn't decode peer addrs: " << e
.what()
1082 port
= peer_addr_for_me
.get_port();
1084 ldout(msgr
->cct
,20) << "connect read peer addr " << paddr
<< " on socket " << sd
<< dendl
;
1085 if (peer_addr
!= paddr
) {
1086 if (paddr
.is_blank_ip() &&
1087 peer_addr
.get_port() == paddr
.get_port() &&
1088 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1089 ldout(msgr
->cct
,0) << "connect claims to be "
1090 << paddr
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl
;
1092 ldout(msgr
->cct
,10) << "connect claims to be "
1093 << paddr
<< " not " << peer_addr
<< dendl
;
1098 ldout(msgr
->cct
,20) << "connect peer addr for me is " << peer_addr_for_me
<< dendl
;
1100 msgr
->learned_addr(peer_addr_for_me
);
1102 ::encode(msgr
->my_inst
.addr
, myaddrbl
, 0); // legacy
1104 memset(&msg
, 0, sizeof(msg
));
1105 msgvec
[0].iov_base
= myaddrbl
.c_str();
1106 msgvec
[0].iov_len
= myaddrbl
.length();
1107 msg
.msg_iov
= msgvec
;
1109 msglen
= msgvec
[0].iov_len
;
1110 rc
= do_sendmsg(&msg
, msglen
);
1112 ldout(msgr
->cct
,2) << "connect couldn't write my addr, " << cpp_strerror(rc
) << dendl
;
1115 ldout(msgr
->cct
,10) << "connect sent my addr " << msgr
->my_inst
.addr
<< dendl
;
1120 authorizer
= msgr
->get_authorizer(peer_type
, false);
1121 bufferlist authorizer_reply
;
1123 ceph_msg_connect connect
;
1124 connect
.features
= policy
.features_supported
;
1125 connect
.host_type
= msgr
->get_myinst().name
.type();
1126 connect
.global_seq
= gseq
;
1127 connect
.connect_seq
= cseq
;
1128 connect
.protocol_version
= msgr
->get_proto_version(peer_type
, true);
1129 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1130 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1132 ldout(msgr
->cct
,10) << "connect.authorizer_len=" << connect
.authorizer_len
1133 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
1136 connect
.flags
|= CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1137 memset(&msg
, 0, sizeof(msg
));
1138 msgvec
[0].iov_base
= (char*)&connect
;
1139 msgvec
[0].iov_len
= sizeof(connect
);
1140 msg
.msg_iov
= msgvec
;
1142 msglen
= msgvec
[0].iov_len
;
1144 msgvec
[1].iov_base
= authorizer
->bl
.c_str();
1145 msgvec
[1].iov_len
= authorizer
->bl
.length();
1147 msglen
+= msgvec
[1].iov_len
;
1150 ldout(msgr
->cct
,10) << "connect sending gseq=" << gseq
<< " cseq=" << cseq
1151 << " proto=" << connect
.protocol_version
<< dendl
;
1152 rc
= do_sendmsg(&msg
, msglen
);
1154 ldout(msgr
->cct
,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc
) << dendl
;
1158 ldout(msgr
->cct
,20) << "connect wrote (self +) cseq, waiting for reply" << dendl
;
1159 ceph_msg_connect_reply reply
;
1160 rc
= tcp_read((char*)&reply
, sizeof(reply
));
1162 ldout(msgr
->cct
,2) << "connect read reply " << cpp_strerror(rc
) << dendl
;
1166 ldout(msgr
->cct
,20) << "connect got reply tag " << (int)reply
.tag
1167 << " connect_seq " << reply
.connect_seq
1168 << " global_seq " << reply
.global_seq
1169 << " proto " << reply
.protocol_version
1170 << " flags " << (int)reply
.flags
1171 << " features " << reply
.features
1174 authorizer_reply
.clear();
1176 if (reply
.authorizer_len
) {
1177 ldout(msgr
->cct
,10) << "reply.authorizer_len=" << reply
.authorizer_len
<< dendl
;
1178 bufferptr bp
= buffer::create(reply
.authorizer_len
);
1179 rc
= tcp_read(bp
.c_str(), reply
.authorizer_len
);
1181 ldout(msgr
->cct
,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc
) << dendl
;
1184 authorizer_reply
.push_back(bp
);
1188 bufferlist::iterator iter
= authorizer_reply
.begin();
1189 if (!authorizer
->verify_reply(iter
)) {
1190 ldout(msgr
->cct
,0) << "failed verifying authorize reply" << dendl
;
1195 if (conf
->ms_inject_internal_delays
) {
1196 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1198 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1203 if (state
!= STATE_CONNECTING
) {
1204 ldout(msgr
->cct
,0) << "connect got RESETSESSION but no longer connecting" << dendl
;
1208 if (reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1209 ldout(msgr
->cct
,0) << "connect protocol feature mismatch, my " << std::hex
1210 << connect
.features
<< " < peer " << reply
.features
1211 << " missing " << (reply
.features
& ~policy
.features_supported
)
1212 << std::dec
<< dendl
;
1216 if (reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1217 ldout(msgr
->cct
,0) << "connect protocol version mismatch, my " << connect
.protocol_version
1218 << " != " << reply
.protocol_version
<< dendl
;
1222 if (reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1223 ldout(msgr
->cct
,0) << "connect got BADAUTHORIZER" << dendl
;
1226 got_bad_auth
= true;
1229 authorizer
= msgr
->get_authorizer(peer_type
, true); // try harder
1232 if (reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1233 ldout(msgr
->cct
,0) << "connect got RESETSESSION" << dendl
;
1234 was_session_reset();
1239 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1240 gseq
= msgr
->get_global_seq(reply
.global_seq
);
1241 ldout(msgr
->cct
,10) << "connect got RETRY_GLOBAL " << reply
.global_seq
1242 << " chose new " << gseq
<< dendl
;
1246 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1247 assert(reply
.connect_seq
> connect_seq
);
1248 ldout(msgr
->cct
,10) << "connect got RETRY_SESSION " << connect_seq
1249 << " -> " << reply
.connect_seq
<< dendl
;
1250 cseq
= connect_seq
= reply
.connect_seq
;
1255 if (reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1256 ldout(msgr
->cct
,3) << "connect got WAIT (connection race)" << dendl
;
1261 if (reply
.tag
== CEPH_MSGR_TAG_READY
||
1262 reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1263 uint64_t feat_missing
= policy
.features_required
& ~(uint64_t)reply
.features
;
1265 ldout(msgr
->cct
,1) << "missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
1269 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1270 ldout(msgr
->cct
,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl
;
1271 uint64_t newly_acked_seq
= 0;
1272 rc
= tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
));
1274 ldout(msgr
->cct
,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc
) << dendl
;
1277 ldout(msgr
->cct
,2) << " got newly_acked_seq " << newly_acked_seq
1278 << " vs out_seq " << out_seq
<< dendl
;
1279 while (newly_acked_seq
> out_seq
) {
1280 Message
*m
= _get_next_outgoing();
1282 ldout(msgr
->cct
,2) << " discarding previously sent " << m
->get_seq()
1283 << " " << *m
<< dendl
;
1284 assert(m
->get_seq() <= newly_acked_seq
);
1288 if (tcp_write((char*)&in_seq
, sizeof(in_seq
)) < 0) {
1289 ldout(msgr
->cct
,2) << "connect write error on in_seq" << dendl
;
1295 peer_global_seq
= reply
.global_seq
;
1296 policy
.lossy
= reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1298 connect_seq
= cseq
+ 1;
1299 assert(connect_seq
== reply
.connect_seq
);
1300 backoff
= utime_t();
1301 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
1302 ldout(msgr
->cct
,10) << "connect success " << connect_seq
<< ", lossy = " << policy
.lossy
1303 << ", features " << connection_state
->get_features() << dendl
;
1306 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1309 if (authorizer
!= NULL
) {
1310 session_security
.reset(
1311 get_auth_session_handler(msgr
->cct
,
1312 authorizer
->protocol
,
1313 authorizer
->session_key
,
1314 connection_state
->get_features()));
1316 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1317 session_security
.reset();
1320 msgr
->dispatch_queue
.queue_connect(connection_state
.get());
1321 msgr
->ms_deliver_handle_fast_connect(connection_state
.get());
1323 if (!reader_running
) {
1324 ldout(msgr
->cct
,20) << "connect starting reader" << dendl
;
1327 maybe_start_delay_thread();
1333 ldout(msgr
->cct
,0) << "connect got bad tag " << (int)tag
<< dendl
;
1338 if (conf
->ms_inject_internal_delays
) {
1339 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1341 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1347 if (state
== STATE_CONNECTING
)
1350 ldout(msgr
->cct
,3) << "connect fault, but state = " << get_state_name()
1351 << " != connecting, stopping" << dendl
;
1358 void Pipe::register_pipe()
1360 ldout(msgr
->cct
,10) << "register_pipe" << dendl
;
1361 assert(msgr
->lock
.is_locked());
1362 Pipe
*existing
= msgr
->_lookup_pipe(peer_addr
);
1363 assert(existing
== NULL
);
1364 msgr
->rank_pipe
[peer_addr
] = this;
1367 void Pipe::unregister_pipe()
1369 assert(msgr
->lock
.is_locked());
1370 ceph::unordered_map
<entity_addr_t
,Pipe
*>::iterator p
= msgr
->rank_pipe
.find(peer_addr
);
1371 if (p
!= msgr
->rank_pipe
.end() && p
->second
== this) {
1372 ldout(msgr
->cct
,10) << "unregister_pipe" << dendl
;
1373 msgr
->rank_pipe
.erase(p
);
1375 ldout(msgr
->cct
,10) << "unregister_pipe - not registered" << dendl
;
1376 msgr
->accepting_pipes
.erase(this); // somewhat overkill, but safe.
1382 ldout(msgr
->cct
, 20) << "join" << dendl
;
1383 if (writer_thread
.is_started())
1384 writer_thread
.join();
1385 if (reader_thread
.is_started())
1386 reader_thread
.join();
1388 ldout(msgr
->cct
, 20) << "joining delay_thread" << dendl
;
1389 delay_thread
->stop();
1390 delay_thread
->join();
1394 void Pipe::requeue_sent()
1399 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1400 while (!sent
.empty()) {
1401 Message
*m
= sent
.back();
1403 ldout(msgr
->cct
,10) << "requeue_sent " << *m
<< " for resend seq " << out_seq
1404 << " (" << m
->get_seq() << ")" << dendl
;
1410 void Pipe::discard_requeued_up_to(uint64_t seq
)
1412 ldout(msgr
->cct
, 10) << "discard_requeued_up_to " << seq
<< dendl
;
1413 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0)
1415 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1416 while (!rq
.empty()) {
1417 Message
*m
= rq
.front();
1418 if (m
->get_seq() == 0 || m
->get_seq() > seq
)
1420 ldout(msgr
->cct
,10) << "discard_requeued_up_to " << *m
<< " for resend seq " << out_seq
1421 << " <= " << seq
<< ", discarding" << dendl
;
1427 out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1431 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1432 * Must hold pipe_lock prior to calling.
1434 void Pipe::discard_out_queue()
1436 ldout(msgr
->cct
,10) << "discard_queue" << dendl
;
1438 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1439 ldout(msgr
->cct
,20) << " discard " << *p
<< dendl
;
1443 for (map
<int,list
<Message
*> >::iterator p
= out_q
.begin(); p
!= out_q
.end(); ++p
)
1444 for (list
<Message
*>::iterator r
= p
->second
.begin(); r
!= p
->second
.end(); ++r
) {
1445 ldout(msgr
->cct
,20) << " discard " << *r
<< dendl
;
1451 void Pipe::fault(bool onread
)
1453 const md_config_t
*conf
= msgr
->cct
->_conf
;
1454 assert(pipe_lock
.is_locked());
1457 if (onread
&& state
== STATE_CONNECTING
) {
1458 ldout(msgr
->cct
,10) << "fault already connecting, reader shutting down" << dendl
;
1462 ldout(msgr
->cct
,2) << "fault " << cpp_strerror(errno
) << dendl
;
1464 if (state
== STATE_CLOSED
||
1465 state
== STATE_CLOSING
) {
1466 ldout(msgr
->cct
,10) << "fault already closed|closing" << dendl
;
1467 if (connection_state
->clear_pipe(this))
1468 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1475 if (policy
.lossy
&& state
!= STATE_CONNECTING
) {
1476 ldout(msgr
->cct
,10) << "fault on lossy channel, failing" << dendl
;
1478 // disconnect from Connection, and mark it failed. future messages
1480 assert(connection_state
);
1482 bool cleared
= connection_state
->clear_pipe(this);
1484 // crib locks, blech. note that Pipe is now STATE_CLOSED and the
1485 // rank_pipe entry is ignored by others.
1488 if (conf
->ms_inject_internal_delays
) {
1489 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1491 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1498 msgr
->lock
.Unlock();
1501 delay_thread
->discard();
1502 in_q
->discard_queue(conn_id
);
1503 discard_out_queue();
1505 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1509 // queue delayed items immediately
1511 delay_thread
->flush();
1513 // requeue sent items
1516 if (policy
.standby
&& !is_queued()) {
1517 ldout(msgr
->cct
,0) << "fault with nothing to send, going to standby" << dendl
;
1518 state
= STATE_STANDBY
;
1522 if (state
!= STATE_CONNECTING
) {
1523 if (policy
.server
) {
1524 ldout(msgr
->cct
,0) << "fault, server, going to standby" << dendl
;
1525 state
= STATE_STANDBY
;
1527 ldout(msgr
->cct
,0) << "fault, initiating reconnect" << dendl
;
1529 state
= STATE_CONNECTING
;
1531 backoff
= utime_t();
1532 } else if (backoff
== utime_t()) {
1533 ldout(msgr
->cct
,0) << "fault" << dendl
;
1534 backoff
.set_from_double(conf
->ms_initial_backoff
);
1536 ldout(msgr
->cct
,10) << "fault waiting " << backoff
<< dendl
;
1537 cond
.WaitInterval(pipe_lock
, backoff
);
1539 if (backoff
> conf
->ms_max_backoff
)
1540 backoff
.set_from_double(conf
->ms_max_backoff
);
1541 ldout(msgr
->cct
,10) << "fault done waiting or woke up" << dendl
;
1545 int Pipe::randomize_out_seq()
1547 if (connection_state
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1548 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1549 // here. We'll check it on the call. PLR
1550 int seq_error
= get_random_bytes((char *)&out_seq
, sizeof(out_seq
));
1551 out_seq
&= SEQ_MASK
;
1552 lsubdout(msgr
->cct
, ms
, 10) << "randomize_out_seq " << out_seq
<< dendl
;
1555 // previously, seq #'s always started at 0.
1561 void Pipe::was_session_reset()
1563 assert(pipe_lock
.is_locked());
1565 ldout(msgr
->cct
,10) << "was_session_reset" << dendl
;
1566 in_q
->discard_queue(conn_id
);
1568 delay_thread
->discard();
1569 discard_out_queue();
1571 msgr
->dispatch_queue
.queue_remote_reset(connection_state
.get());
1573 if (randomize_out_seq()) {
1574 lsubdout(msgr
->cct
,ms
,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
1583 ldout(msgr
->cct
,10) << "stop" << dendl
;
1584 assert(pipe_lock
.is_locked());
1585 state
= STATE_CLOSED
;
1586 state_closed
= true;
1591 void Pipe::stop_and_wait()
1593 assert(pipe_lock
.is_locked_by_me());
1594 if (state
!= STATE_CLOSED
)
1597 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
1598 ldout(msgr
->cct
, 10) << __func__
<< " sleep for "
1599 << msgr
->cct
->_conf
->ms_inject_internal_delays
1602 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1608 delay_thread
->stop_fast_dispatching();
1611 while (reader_running
&&
1613 cond
.Wait(pipe_lock
);
1616 /* read msgs from socket.
1623 if (state
== STATE_ACCEPTING
) {
1625 assert(pipe_lock
.is_locked());
1629 while (state
!= STATE_CLOSED
&&
1630 state
!= STATE_CONNECTING
) {
1631 assert(pipe_lock
.is_locked());
1633 // sleep if (re)connecting
1634 if (state
== STATE_STANDBY
) {
1635 ldout(msgr
->cct
,20) << "reader sleeping during reconnect|standby" << dendl
;
1636 cond
.Wait(pipe_lock
);
1640 // get a reference to the AuthSessionHandler while we have the pipe_lock
1641 ceph::shared_ptr
<AuthSessionHandler
> auth_handler
= session_security
;
1646 ldout(msgr
->cct
,20) << "reader reading tag..." << dendl
;
1647 if (tcp_read((char*)&tag
, 1) < 0) {
1649 ldout(msgr
->cct
,2) << "reader couldn't read tag, " << cpp_strerror(errno
) << dendl
;
1654 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
1655 ldout(msgr
->cct
,2) << "reader got KEEPALIVE" << dendl
;
1657 connection_state
->set_last_keepalive(ceph_clock_now());
1660 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
1661 ldout(msgr
->cct
,30) << "reader got KEEPALIVE2 tag ..." << dendl
;
1663 int rc
= tcp_read((char*)&t
, sizeof(t
));
1666 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp "
1667 << cpp_strerror(errno
) << dendl
;
1670 send_keepalive_ack
= true;
1671 keepalive_ack_stamp
= utime_t(t
);
1672 ldout(msgr
->cct
,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1674 connection_state
->set_last_keepalive(ceph_clock_now());
1679 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
1680 ldout(msgr
->cct
,2) << "reader got KEEPALIVE_ACK" << dendl
;
1681 struct ceph_timespec t
;
1682 int rc
= tcp_read((char*)&t
, sizeof(t
));
1685 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno
) << dendl
;
1688 connection_state
->set_last_keepalive_ack(utime_t(t
));
1694 if (tag
== CEPH_MSGR_TAG_ACK
) {
1695 ldout(msgr
->cct
,20) << "reader got ACK" << dendl
;
1697 int rc
= tcp_read((char*)&seq
, sizeof(seq
));
1700 ldout(msgr
->cct
,2) << "reader couldn't read ack seq, " << cpp_strerror(errno
) << dendl
;
1702 } else if (state
!= STATE_CLOSED
) {
1708 else if (tag
== CEPH_MSGR_TAG_MSG
) {
1709 ldout(msgr
->cct
,20) << "reader got MSG" << dendl
;
1711 int r
= read_message(&m
, auth_handler
.get());
1721 m
->trace
.event("pipe read message");
1723 if (state
== STATE_CLOSED
||
1724 state
== STATE_CONNECTING
) {
1725 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1730 // check received seq#. if it is old, drop the message.
1731 // note that incoming messages may skip ahead. this is convenient for the client
1732 // side queueing because messages can't be renumbered, but the (kernel) client will
1733 // occasionally pull a message out of the sent queue to send elsewhere. in that case
1734 // it doesn't matter if we "got" it or not.
1735 if (m
->get_seq() <= in_seq
) {
1736 ldout(msgr
->cct
,0) << "reader got old message "
1737 << m
->get_seq() << " <= " << in_seq
<< " " << m
<< " " << *m
1738 << ", discarding" << dendl
;
1739 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1741 if (connection_state
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1742 msgr
->cct
->_conf
->ms_die_on_old_message
)
1743 assert(0 == "old msgs despite reconnect_seq feature");
1746 if (m
->get_seq() > in_seq
+ 1) {
1747 ldout(msgr
->cct
,0) << "reader missed message? skipped from seq "
1748 << in_seq
<< " to " << m
->get_seq() << dendl
;
1749 if (msgr
->cct
->_conf
->ms_die_on_skipped_message
)
1750 assert(0 == "skipped incoming seq");
1753 m
->set_connection(connection_state
.get());
1755 // note last received message.
1756 in_seq
= m
->get_seq();
1758 cond
.Signal(); // wake up writer, to ack this
1760 ldout(msgr
->cct
,10) << "reader got message "
1761 << m
->get_seq() << " " << m
<< " " << *m
1763 in_q
->fast_preprocess(m
);
1767 if (rand() % 10000 < msgr
->cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1768 release
= m
->get_recv_stamp();
1769 release
+= msgr
->cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1770 lsubdout(msgr
->cct
, ms
, 1) << "queue_received will delay until " << release
<< " on " << m
<< " " << *m
<< dendl
;
1772 delay_thread
->queue(release
, m
);
1774 if (in_q
->can_fast_dispatch(m
)) {
1775 reader_dispatching
= true;
1777 in_q
->fast_dispatch(m
);
1779 reader_dispatching
= false;
1780 if (state
== STATE_CLOSED
||
1781 notify_on_dispatch_done
) { // there might be somebody waiting
1782 notify_on_dispatch_done
= false;
1786 in_q
->enqueue(m
, m
->get_priority(), conn_id
);
1791 else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
1792 ldout(msgr
->cct
,20) << "reader got CLOSE" << dendl
;
1794 if (state
== STATE_CLOSING
) {
1795 state
= STATE_CLOSED
;
1796 state_closed
= true;
1798 state
= STATE_CLOSING
;
1804 ldout(msgr
->cct
,0) << "reader bad tag " << (int)tag
<< dendl
;
1812 reader_running
= false;
1813 reader_needs_join
= true;
1814 unlock_maybe_reap();
1815 ldout(msgr
->cct
,10) << "reader done" << dendl
;
1818 /* write msgs to socket.
1824 while (state
!= STATE_CLOSED
) {// && state != STATE_WAIT) {
1825 ldout(msgr
->cct
,10) << "writer: state = " << get_state_name()
1826 << " policy.server=" << policy
.server
<< dendl
;
1829 if (is_queued() && state
== STATE_STANDBY
&& !policy
.server
)
1830 state
= STATE_CONNECTING
;
1833 if (state
== STATE_CONNECTING
) {
1834 assert(!policy
.server
);
1839 if (state
== STATE_CLOSING
) {
1841 ldout(msgr
->cct
,20) << "writer writing CLOSE tag" << dendl
;
1842 char tag
= CEPH_MSGR_TAG_CLOSE
;
1843 state
= STATE_CLOSED
;
1844 state_closed
= true;
1847 // we can ignore return value, actually; we don't care if this succeeds.
1848 int r
= ::write(sd
, &tag
, 1);
1855 if (state
!= STATE_CONNECTING
&& state
!= STATE_WAIT
&& state
!= STATE_STANDBY
&&
1856 (is_queued() || in_seq
> in_seq_acked
)) {
1859 if (send_keepalive
) {
1861 if (connection_state
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
1863 rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2
,
1867 rc
= write_keepalive();
1871 ldout(msgr
->cct
,2) << "writer couldn't write keepalive[2], "
1872 << cpp_strerror(errno
) << dendl
;
1876 send_keepalive
= false;
1878 if (send_keepalive_ack
) {
1879 utime_t t
= keepalive_ack_stamp
;
1881 int rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK
, t
);
1884 ldout(msgr
->cct
,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno
) << dendl
;
1888 send_keepalive_ack
= false;
1892 if (in_seq
> in_seq_acked
) {
1893 uint64_t send_seq
= in_seq
;
1895 int rc
= write_ack(send_seq
);
1898 ldout(msgr
->cct
,2) << "writer couldn't write ack, " << cpp_strerror(errno
) << dendl
;
1902 in_seq_acked
= send_seq
;
1905 // grab outgoing message
1906 Message
*m
= _get_next_outgoing();
1908 m
->set_seq(++out_seq
);
1909 if (!policy
.lossy
) {
1915 // associate message with Connection (for benefit of encode_payload)
1916 m
->set_connection(connection_state
.get());
1918 uint64_t features
= connection_state
->get_features();
1920 if (m
->empty_payload())
1921 ldout(msgr
->cct
,20) << "writer encoding " << m
->get_seq() << " features " << features
1922 << " " << m
<< " " << *m
<< dendl
;
1924 ldout(msgr
->cct
,20) << "writer half-reencoding " << m
->get_seq() << " features " << features
1925 << " " << m
<< " " << *m
<< dendl
;
1927 // encode and copy out of *m
1928 m
->encode(features
, msgr
->crcflags
);
1930 // prepare everything
1931 const ceph_msg_header
& header
= m
->get_header();
1932 const ceph_msg_footer
& footer
= m
->get_footer();
1934 // Now that we have all the crcs calculated, handle the
1935 // digital signature for the message, if the pipe has session
1936 // security set up. Some session security options do not
1937 // actually calculate and check the signature, but they should
1938 // handle the calls to sign_message and check_signature. PLR
1939 if (session_security
.get() == NULL
) {
1940 ldout(msgr
->cct
, 20) << "writer no session security" << dendl
;
1942 if (session_security
->sign_message(m
)) {
1943 ldout(msgr
->cct
, 20) << "writer failed to sign seq # " << header
.seq
1944 << "): sig = " << footer
.sig
<< dendl
;
1946 ldout(msgr
->cct
, 20) << "writer signed seq # " << header
.seq
1947 << "): sig = " << footer
.sig
<< dendl
;
1951 bufferlist blist
= m
->get_payload();
1952 blist
.append(m
->get_middle());
1953 blist
.append(m
->get_data());
1957 m
->trace
.event("pipe writing message");
1959 ldout(msgr
->cct
,20) << "writer sending " << m
->get_seq() << " " << m
<< dendl
;
1960 int rc
= write_message(header
, footer
, blist
);
1964 ldout(msgr
->cct
,1) << "writer error sending " << m
<< ", "
1965 << cpp_strerror(errno
) << dendl
;
1974 ldout(msgr
->cct
,20) << "writer sleeping" << dendl
;
1975 cond
.Wait(pipe_lock
);
1978 ldout(msgr
->cct
,20) << "writer finishing" << dendl
;
1981 writer_running
= false;
1982 unlock_maybe_reap();
1983 ldout(msgr
->cct
,10) << "writer done" << dendl
;
1986 void Pipe::unlock_maybe_reap()
1988 if (!reader_running
&& !writer_running
) {
1991 if (delay_thread
&& delay_thread
->is_flushing()) {
1992 delay_thread
->wait_for_flush();
1994 msgr
->queue_reap(this);
2000 static void alloc_aligned_buffer(bufferlist
& data
, unsigned len
, unsigned off
)
2002 // create a buffer to read into that matches the data alignment
2003 unsigned left
= len
;
2004 if (off
& ~CEPH_PAGE_MASK
) {
2007 head
= MIN(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
2008 data
.push_back(buffer::create(head
));
2011 unsigned middle
= left
& CEPH_PAGE_MASK
;
2013 data
.push_back(buffer::create_page_aligned(middle
));
2017 data
.push_back(buffer::create(left
));
2021 int Pipe::read_message(Message
**pm
, AuthSessionHandler
* auth_handler
)
2025 //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;
2027 ceph_msg_header header
;
2028 ceph_msg_footer footer
;
2029 __u32 header_crc
= 0;
2031 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2032 if (tcp_read((char*)&header
, sizeof(header
)) < 0)
2034 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2035 header_crc
= ceph_crc32c(0, (unsigned char *)&header
, sizeof(header
) - sizeof(header
.crc
));
2038 ceph_msg_header_old oldheader
;
2039 if (tcp_read((char*)&oldheader
, sizeof(oldheader
)) < 0)
2042 memcpy(&header
, &oldheader
, sizeof(header
));
2043 header
.src
= oldheader
.src
.name
;
2044 header
.reserved
= oldheader
.reserved
;
2045 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2046 header
.crc
= oldheader
.crc
;
2047 header_crc
= ceph_crc32c(0, (unsigned char *)&oldheader
, sizeof(oldheader
) - sizeof(oldheader
.crc
));
2051 ldout(msgr
->cct
,20) << "reader got envelope type=" << header
.type
2052 << " src " << entity_name_t(header
.src
)
2053 << " front=" << header
.front_len
2054 << " data=" << header
.data_len
2055 << " off " << header
.data_off
2058 // verify header crc
2059 if ((msgr
->crcflags
& MSG_CRC_HEADER
) && header_crc
!= header
.crc
) {
2060 ldout(msgr
->cct
,0) << "reader got bad header crc " << header_crc
<< " != " << header
.crc
<< dendl
;
2064 bufferlist front
, middle
, data
;
2065 int front_len
, middle_len
;
2066 unsigned data_len
, data_off
;
2069 utime_t recv_stamp
= ceph_clock_now();
2071 if (policy
.throttler_messages
) {
2072 ldout(msgr
->cct
,10) << "reader wants " << 1 << " message from policy throttler "
2073 << policy
.throttler_messages
->get_current() << "/"
2074 << policy
.throttler_messages
->get_max() << dendl
;
2075 policy
.throttler_messages
->get();
2078 uint64_t message_size
= header
.front_len
+ header
.middle_len
+ header
.data_len
;
2080 if (policy
.throttler_bytes
) {
2081 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " bytes from policy throttler "
2082 << policy
.throttler_bytes
->get_current() << "/"
2083 << policy
.throttler_bytes
->get_max() << dendl
;
2084 policy
.throttler_bytes
->get(message_size
);
2087 // throttle total bytes waiting for dispatch. do this _after_ the
2088 // policy throttle, as this one does not deadlock (unless dispatch
2089 // blocks indefinitely, which it shouldn't). in contrast, the
2090 // policy throttle carries for the lifetime of the message.
2091 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " from dispatch throttler "
2092 << in_q
->dispatch_throttler
.get_current() << "/"
2093 << in_q
->dispatch_throttler
.get_max() << dendl
;
2094 in_q
->dispatch_throttler
.get(message_size
);
2097 utime_t throttle_stamp
= ceph_clock_now();
2100 front_len
= header
.front_len
;
2102 bufferptr bp
= buffer::create(front_len
);
2103 if (tcp_read(bp
.c_str(), front_len
) < 0)
2104 goto out_dethrottle
;
2105 front
.push_back(std::move(bp
));
2106 ldout(msgr
->cct
,20) << "reader got front " << front
.length() << dendl
;
2110 middle_len
= header
.middle_len
;
2112 bufferptr bp
= buffer::create(middle_len
);
2113 if (tcp_read(bp
.c_str(), middle_len
) < 0)
2114 goto out_dethrottle
;
2115 middle
.push_back(std::move(bp
));
2116 ldout(msgr
->cct
,20) << "reader got middle " << middle
.length() << dendl
;
2121 data_len
= le32_to_cpu(header
.data_len
);
2122 data_off
= le32_to_cpu(header
.data_off
);
2124 unsigned offset
= 0;
2125 unsigned left
= data_len
;
2127 bufferlist newbuf
, rxbuf
;
2128 bufferlist::iterator blp
;
2129 int rxbuf_version
= 0;
2133 if (tcp_read_wait() < 0)
2134 goto out_dethrottle
;
2137 connection_state
->lock
.Lock();
2138 map
<ceph_tid_t
,pair
<bufferlist
,int> >::iterator p
= connection_state
->rx_buffers
.find(header
.tid
);
2139 if (p
!= connection_state
->rx_buffers
.end()) {
2140 if (rxbuf
.length() == 0 || p
->second
.second
!= rxbuf_version
) {
2141 ldout(msgr
->cct
,10) << "reader seleting rx buffer v " << p
->second
.second
2142 << " at offset " << offset
2143 << " len " << p
->second
.first
.length() << dendl
;
2144 rxbuf
= p
->second
.first
;
2145 rxbuf_version
= p
->second
.second
;
2146 // make sure it's big enough
2147 if (rxbuf
.length() < data_len
)
2148 rxbuf
.push_back(buffer::create(data_len
- rxbuf
.length()));
2149 blp
= p
->second
.first
.begin();
2150 blp
.advance(offset
);
2153 if (!newbuf
.length()) {
2154 ldout(msgr
->cct
,20) << "reader allocating new rx buffer at offset " << offset
<< dendl
;
2155 alloc_aligned_buffer(newbuf
, data_len
, data_off
);
2156 blp
= newbuf
.begin();
2157 blp
.advance(offset
);
2160 bufferptr bp
= blp
.get_current_ptr();
2161 int read
= MIN(bp
.length(), left
);
2162 ldout(msgr
->cct
,20) << "reader reading nonblocking into " << (void*)bp
.c_str() << " len " << bp
.length() << dendl
;
2163 ssize_t got
= tcp_read_nonblocking(bp
.c_str(), read
);
2164 ldout(msgr
->cct
,30) << "reader read " << got
<< " of " << read
<< dendl
;
2165 connection_state
->lock
.Unlock();
2167 goto out_dethrottle
;
2170 data
.append(bp
, 0, got
);
2173 } // else we got a signal or something; just loop.
2178 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2179 if (tcp_read((char*)&footer
, sizeof(footer
)) < 0)
2180 goto out_dethrottle
;
2182 ceph_msg_footer_old old_footer
;
2183 if (tcp_read((char*)&old_footer
, sizeof(old_footer
)) < 0)
2184 goto out_dethrottle
;
2185 footer
.front_crc
= old_footer
.front_crc
;
2186 footer
.middle_crc
= old_footer
.middle_crc
;
2187 footer
.data_crc
= old_footer
.data_crc
;
2189 footer
.flags
= old_footer
.flags
;
2192 aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
2193 ldout(msgr
->cct
,10) << "aborted = " << aborted
<< dendl
;
2195 ldout(msgr
->cct
,0) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2196 << " byte message.. ABORTED" << dendl
;
2198 goto out_dethrottle
;
2201 ldout(msgr
->cct
,20) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2202 << " byte message" << dendl
;
2203 message
= decode_message(msgr
->cct
, msgr
->crcflags
, header
, footer
,
2204 front
, middle
, data
, connection_state
.get());
2207 goto out_dethrottle
;
2211 // Check the signature if one should be present. A zero return indicates success. PLR
2214 if (auth_handler
== NULL
) {
2215 ldout(msgr
->cct
, 10) << "No session security set" << dendl
;
2217 if (auth_handler
->check_message_signature(message
)) {
2218 ldout(msgr
->cct
, 0) << "Signature check failed" << dendl
;
2221 goto out_dethrottle
;
2225 message
->set_byte_throttler(policy
.throttler_bytes
);
2226 message
->set_message_throttler(policy
.throttler_messages
);
2228 // store reservation size in message, so we don't get confused
2229 // by messages entering the dispatch queue through other paths.
2230 message
->set_dispatch_throttle_size(message_size
);
2232 message
->set_recv_stamp(recv_stamp
);
2233 message
->set_throttle_stamp(throttle_stamp
);
2234 message
->set_recv_complete_stamp(ceph_clock_now());
2240 // release bytes reserved from the throttlers on failure
2241 if (policy
.throttler_messages
) {
2242 ldout(msgr
->cct
,10) << "reader releasing " << 1 << " message to policy throttler "
2243 << policy
.throttler_messages
->get_current() << "/"
2244 << policy
.throttler_messages
->get_max() << dendl
;
2245 policy
.throttler_messages
->put();
2248 if (policy
.throttler_bytes
) {
2249 ldout(msgr
->cct
,10) << "reader releasing " << message_size
<< " bytes to policy throttler "
2250 << policy
.throttler_bytes
->get_current() << "/"
2251 << policy
.throttler_bytes
->get_max() << dendl
;
2252 policy
.throttler_bytes
->put(message_size
);
2255 in_q
->dispatch_throttle_release(message_size
);
2260 int Pipe::do_sendmsg(struct msghdr
*msg
, unsigned len
, bool more
)
2262 MSGR_SIGPIPE_STOPPER
;
2265 r
= ::sendmsg(sd
, msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
2267 ldout(msgr
->cct
,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl
;
2270 ldout(msgr
->cct
,1) << "do_sendmsg error " << cpp_strerror(r
) << dendl
;
2273 if (state
== STATE_CLOSED
) {
2274 ldout(msgr
->cct
,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl
;
2275 return -EINTR
; // close enough
2279 if (len
== 0) break;
2281 // hrmph. trim r bytes off the front of our message.
2282 ldout(msgr
->cct
,20) << "do_sendmsg short write did " << r
<< ", still have " << len
<< dendl
;
2284 if (msg
->msg_iov
[0].iov_len
<= (size_t)r
) {
2285 // lose this whole item
2286 //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2287 r
-= msg
->msg_iov
[0].iov_len
;
2292 //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2293 msg
->msg_iov
[0].iov_base
= (char *)msg
->msg_iov
[0].iov_base
+ r
;
2294 msg
->msg_iov
[0].iov_len
-= r
;
2303 int Pipe::write_ack(uint64_t seq
)
2305 ldout(msgr
->cct
,10) << "write_ack " << seq
<< dendl
;
2307 char c
= CEPH_MSGR_TAG_ACK
;
2312 memset(&msg
, 0, sizeof(msg
));
2313 struct iovec msgvec
[2];
2314 msgvec
[0].iov_base
= &c
;
2315 msgvec
[0].iov_len
= 1;
2316 msgvec
[1].iov_base
= &s
;
2317 msgvec
[1].iov_len
= sizeof(s
);
2318 msg
.msg_iov
= msgvec
;
2321 if (do_sendmsg(&msg
, 1 + sizeof(s
), true) < 0)
2326 int Pipe::write_keepalive()
2328 ldout(msgr
->cct
,10) << "write_keepalive" << dendl
;
2330 char c
= CEPH_MSGR_TAG_KEEPALIVE
;
2333 memset(&msg
, 0, sizeof(msg
));
2334 struct iovec msgvec
[2];
2335 msgvec
[0].iov_base
= &c
;
2336 msgvec
[0].iov_len
= 1;
2337 msg
.msg_iov
= msgvec
;
2340 if (do_sendmsg(&msg
, 1) < 0)
2345 int Pipe::write_keepalive2(char tag
, const utime_t
& t
)
2347 ldout(msgr
->cct
,10) << "write_keepalive2 " << (int)tag
<< " " << t
<< dendl
;
2348 struct ceph_timespec ts
;
2349 t
.encode_timeval(&ts
);
2351 memset(&msg
, 0, sizeof(msg
));
2352 struct iovec msgvec
[2];
2353 msgvec
[0].iov_base
= &tag
;
2354 msgvec
[0].iov_len
= 1;
2355 msgvec
[1].iov_base
= &ts
;
2356 msgvec
[1].iov_len
= sizeof(ts
);
2357 msg
.msg_iov
= msgvec
;
2360 if (do_sendmsg(&msg
, 1 + sizeof(ts
)) < 0)
2366 int Pipe::write_message(const ceph_msg_header
& header
, const ceph_msg_footer
& footer
, bufferlist
& blist
)
2370 // set up msghdr and iovecs
2372 memset(&msg
, 0, sizeof(msg
));
2373 msg
.msg_iov
= msgvec
;
2377 char tag
= CEPH_MSGR_TAG_MSG
;
2378 msgvec
[msg
.msg_iovlen
].iov_base
= &tag
;
2379 msgvec
[msg
.msg_iovlen
].iov_len
= 1;
2384 ceph_msg_header_old oldheader
;
2385 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2386 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&header
;
2387 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(header
);
2388 msglen
+= sizeof(header
);
2391 memcpy(&oldheader
, &header
, sizeof(header
));
2392 oldheader
.src
.name
= header
.src
;
2393 oldheader
.src
.addr
= connection_state
->get_peer_addr();
2394 oldheader
.orig_src
= oldheader
.src
;
2395 oldheader
.reserved
= header
.reserved
;
2396 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2397 oldheader
.crc
= ceph_crc32c(0, (unsigned char*)&oldheader
,
2398 sizeof(oldheader
) - sizeof(oldheader
.crc
));
2402 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&oldheader
;
2403 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(oldheader
);
2404 msglen
+= sizeof(oldheader
);
2408 // payload (front+data)
2409 list
<bufferptr
>::const_iterator pb
= blist
.buffers().begin();
2410 unsigned b_off
= 0; // carry-over buffer offset, if any
2411 unsigned bl_pos
= 0; // blist pos
2412 unsigned left
= blist
.length();
2415 unsigned donow
= MIN(left
, pb
->length()-b_off
);
2417 ldout(msgr
->cct
,0) << "donow = " << donow
<< " left " << left
<< " pb->length " << pb
->length()
2418 << " b_off " << b_off
<< dendl
;
2421 ldout(msgr
->cct
,30) << " bl_pos " << bl_pos
<< " b_off " << b_off
2422 << " leftinchunk " << left
2423 << " buffer len " << pb
->length()
2424 << " writing " << donow
2427 if (msg
.msg_iovlen
>= SM_IOV_MAX
-2) {
2428 if (do_sendmsg(&msg
, msglen
, true))
2431 // and restart the iov
2432 msg
.msg_iov
= msgvec
;
2437 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)(pb
->c_str()+b_off
);
2438 msgvec
[msg
.msg_iovlen
].iov_len
= donow
;
2442 assert(left
>= donow
);
2448 while (b_off
== pb
->length()) {
2455 // send footer; if receiver doesn't support signatures, use the old footer format
2457 ceph_msg_footer_old old_footer
;
2458 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2459 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)&footer
;
2460 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(footer
);
2461 msglen
+= sizeof(footer
);
2464 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2465 old_footer
.front_crc
= footer
.front_crc
;
2466 old_footer
.middle_crc
= footer
.middle_crc
;
2468 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
2470 old_footer
.data_crc
= msgr
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
2471 old_footer
.flags
= footer
.flags
;
2472 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&old_footer
;
2473 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(old_footer
);
2474 msglen
+= sizeof(old_footer
);
2479 if (do_sendmsg(&msg
, msglen
))
2493 int Pipe::tcp_read(char *buf
, unsigned len
)
2500 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2501 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2502 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2503 ::shutdown(sd
, SHUT_RDWR
);
2507 if (tcp_read_wait() < 0)
2510 ssize_t got
= tcp_read_nonblocking(buf
, len
);
2517 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2522 int Pipe::tcp_read_wait()
2529 pfd
.events
= POLLIN
;
2530 #if defined(__linux__)
2531 pfd
.events
|= POLLRDHUP
;
2534 if (has_pending_data())
2537 int r
= poll(&pfd
, 1, msgr
->timeout
);
2543 evmask
= POLLERR
| POLLHUP
| POLLNVAL
;
2544 #if defined(__linux__)
2545 evmask
|= POLLRDHUP
;
2547 if (pfd
.revents
& evmask
)
2550 if (!(pfd
.revents
& POLLIN
))
2556 ssize_t
Pipe::do_recv(char *buf
, size_t len
, int flags
)
2559 ssize_t got
= ::recv( sd
, buf
, len
, flags
);
2561 if (errno
== EINTR
) {
2564 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2565 << got
<< " " << cpp_strerror(errno
) << dendl
;
2574 ssize_t
Pipe::buffered_recv(char *buf
, size_t len
, int flags
)
2577 ssize_t total_recv
= 0;
2578 if (recv_len
> recv_ofs
) {
2579 int to_read
= MIN(recv_len
- recv_ofs
, left
);
2580 memcpy(buf
, &recv_buf
[recv_ofs
], to_read
);
2581 recv_ofs
+= to_read
;
2587 total_recv
+= to_read
;
2590 /* nothing left in the prefetch buffer */
2592 if (left
> recv_max_prefetch
) {
2593 /* this was a large read, we don't prefetch for these */
2594 ssize_t ret
= do_recv(buf
, left
, flags
);
2605 ssize_t got
= do_recv(recv_buf
, recv_max_prefetch
, flags
);
2613 recv_len
= (size_t)got
;
2614 got
= MIN(left
, (size_t)got
);
2615 memcpy(buf
, recv_buf
, got
);
2621 ssize_t
Pipe::tcp_read_nonblocking(char *buf
, unsigned len
)
2623 ssize_t got
= buffered_recv(buf
, len
, MSG_DONTWAIT
);
2625 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2626 << got
<< " " << cpp_strerror(errno
) << dendl
;
2630 /* poll() said there was data, but we didn't read any - peer
2631 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2632 * standard socket behavior as documented by Stevens.
2639 int Pipe::tcp_write(const char *buf
, unsigned len
)
2645 pfd
.events
= POLLOUT
| POLLHUP
| POLLNVAL
| POLLERR
;
2646 #if defined(__linux__)
2647 pfd
.events
|= POLLRDHUP
;
2650 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2651 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2652 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2653 ::shutdown(sd
, SHUT_RDWR
);
2657 if (poll(&pfd
, 1, -1) < 0)
2660 if (!(pfd
.revents
& POLLOUT
))
2663 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2666 MSGR_SIGPIPE_STOPPER
;
2667 int did
= ::send( sd
, buf
, len
, MSG_NOSIGNAL
);
2669 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2670 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2675 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;