1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
24 #include "msg/Message.h"
26 #include "SimpleMessenger.h"
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
32 // The headers below are included to get encode_encrypt(); that function should probably live in Crypto.h instead.
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
38 #include "include/sock_compat.h"
40 // Mask that keeps the starting sequence number below 2^31. Nothing special about the value; it is just a big number. PLR
41 #define SEQ_MASK 0x7fffffff
42 #define dout_subsys ceph_subsys_ms
45 #define dout_prefix *_dout << *this
46 ostream
& Pipe::_pipe_prefix(std::ostream
&out
) const {
47 return out
<< "-- " << msgr
->get_myinst().addr
<< " >> " << peer_addr
<< " pipe(" << this
48 << " sd=" << sd
<< " :" << port
50 << " pgs=" << peer_global_seq
51 << " cs=" << connect_seq
52 << " l=" << policy
.lossy
53 << " c=" << connection_state
57 ostream
& operator<<(ostream
&out
, const Pipe
&pipe
) {
58 return pipe
._pipe_prefix(out
);
62 * The DelayedDelivery is for injecting delays into Message delivery off
63 * the socket. It is only enabled if delays are requested, and if they
64 * are then it pulls Messages off the DelayQueue and puts them into the
65 * in_q (SimpleMessenger::dispatch_queue).
66 * Please note that this probably has issues with Pipe shutdown and
67 * replacement semantics. I've tried, but no guarantees.
69 class Pipe::DelayedDelivery
: public Thread
{
71 std::deque
< pair
<utime_t
,Message
*> > delay_queue
;
76 bool stop_delayed_delivery
;
77 bool delay_dispatching
; // we are in fast dispatch now
78 bool stop_fast_dispatching_flag
; // we need to stop fast dispatching
81 explicit DelayedDelivery(Pipe
*p
)
83 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
85 stop_delayed_delivery(false),
86 delay_dispatching(false),
87 stop_fast_dispatching_flag(false) { }
88 ~DelayedDelivery() override
{
91 void *entry() override
;
92 void queue(utime_t release
, Message
*m
) {
93 Mutex::Locker
l(delay_lock
);
94 delay_queue
.push_back(make_pair(release
, m
));
100 Mutex::Locker
l(delay_lock
);
101 return flush_count
> 0 || active_flush
;
103 void wait_for_flush() {
104 Mutex::Locker
l(delay_lock
);
105 while (flush_count
> 0 || active_flush
)
106 delay_cond
.Wait(delay_lock
);
110 stop_delayed_delivery
= true;
114 void steal_for_pipe(Pipe
*new_owner
) {
115 Mutex::Locker
l(delay_lock
);
119 * We need to stop fast dispatching before we need to stop putting
120 * normal messages into the DispatchQueue.
122 void stop_fast_dispatching();
125 /**************************************
129 Pipe::Pipe(SimpleMessenger
*r
, int st
, PipeConnection
*con
)
130 : RefCountedObject(r
->cct
),
135 conn_id(r
->dispatch_queue
.get_id()),
140 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
142 connection_state(NULL
),
143 reader_running(false), reader_needs_join(false),
144 reader_dispatching(false), notify_on_dispatch_done(false),
145 writer_running(false),
146 in_q(&(r
->dispatch_queue
)),
147 send_keepalive(false),
148 send_keepalive_ack(false),
149 connect_seq(0), peer_global_seq(0),
150 out_seq(0), in_seq(0), in_seq_acked(0) {
151 ANNOTATE_BENIGN_RACE_SIZED(&sd
, sizeof(sd
), "Pipe socket");
152 ANNOTATE_BENIGN_RACE_SIZED(&state
, sizeof(state
), "Pipe state");
153 ANNOTATE_BENIGN_RACE_SIZED(&recv_len
, sizeof(recv_len
), "Pipe recv_len");
154 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs
, sizeof(recv_ofs
), "Pipe recv_ofs");
156 connection_state
= con
;
157 connection_state
->reset_pipe(this);
159 connection_state
= new PipeConnection(msgr
->cct
, msgr
);
160 connection_state
->pipe
= get();
163 if (randomize_out_seq()) {
164 lsubdout(msgr
->cct
,ms
,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
168 msgr
->timeout
= msgr
->cct
->_conf
->ms_tcp_read_timeout
* 1000; //convert to ms
169 if (msgr
->timeout
== 0)
172 recv_max_prefetch
= msgr
->cct
->_conf
->ms_tcp_prefetch_max_size
;
173 recv_buf
= new char[recv_max_prefetch
];
178 assert(out_q
.empty());
179 assert(sent
.empty());
184 void Pipe::handle_ack(uint64_t seq
)
186 lsubdout(msgr
->cct
, ms
, 15) << "reader got ack seq " << seq
<< dendl
;
188 while (!sent
.empty() &&
189 sent
.front()->get_seq() <= seq
) {
190 Message
*m
= sent
.front();
192 lsubdout(msgr
->cct
, ms
, 10) << "reader got ack seq "
193 << seq
<< " >= " << m
->get_seq() << " on " << m
<< " " << *m
<< dendl
;
198 void Pipe::start_reader()
200 assert(pipe_lock
.is_locked());
201 assert(!reader_running
);
202 if (reader_needs_join
) {
203 reader_thread
.join();
204 reader_needs_join
= false;
206 reader_running
= true;
207 reader_thread
.create("ms_pipe_read", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
210 void Pipe::maybe_start_delay_thread()
213 auto pos
= msgr
->cct
->_conf
->get_val
<std::string
>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state
->peer_type
));
214 if (pos
!= string::npos
) {
215 lsubdout(msgr
->cct
, ms
, 1) << "setting up a delay queue on Pipe " << this << dendl
;
216 delay_thread
= new DelayedDelivery(this);
217 delay_thread
->create("ms_pipe_delay");
222 void Pipe::start_writer()
224 assert(pipe_lock
.is_locked());
225 assert(!writer_running
);
226 writer_running
= true;
227 writer_thread
.create("ms_pipe_write", msgr
->cct
->_conf
->ms_rwthread_stack_bytes
);
230 void Pipe::join_reader()
236 reader_thread
.join();
238 reader_needs_join
= false;
241 void Pipe::DelayedDelivery::discard()
243 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::discard" << dendl
;
244 Mutex::Locker
l(delay_lock
);
245 while (!delay_queue
.empty()) {
246 Message
*m
= delay_queue
.front().second
;
247 pipe
->in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
249 delay_queue
.pop_front();
253 void Pipe::DelayedDelivery::flush()
255 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::flush" << dendl
;
256 Mutex::Locker
l(delay_lock
);
257 flush_count
= delay_queue
.size();
261 void *Pipe::DelayedDelivery::entry()
263 Mutex::Locker
locker(delay_lock
);
264 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry start" << dendl
;
266 while (!stop_delayed_delivery
) {
267 if (delay_queue
.empty()) {
268 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 30) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl
;
269 delay_cond
.Wait(delay_lock
);
272 utime_t release
= delay_queue
.front().first
;
273 Message
*m
= delay_queue
.front().second
;
274 string delay_msg_type
= pipe
->msgr
->cct
->_conf
->ms_inject_delay_msg_type
;
276 (release
> ceph_clock_now() &&
277 (delay_msg_type
.empty() || m
->get_type_name() == delay_msg_type
))) {
278 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry sleeping on delay_cond until " << release
<< dendl
;
279 delay_cond
.WaitUntil(delay_lock
, release
);
282 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 10) << *pipe
<< "DelayedDelivery::entry dequeuing message " << m
<< " for delivery, past " << release
<< dendl
;
283 delay_queue
.pop_front();
284 if (flush_count
> 0) {
288 if (pipe
->in_q
->can_fast_dispatch(m
)) {
289 if (!stop_fast_dispatching_flag
) {
290 delay_dispatching
= true;
292 pipe
->in_q
->fast_dispatch(m
);
294 delay_dispatching
= false;
295 if (stop_fast_dispatching_flag
) {
296 // we need to let the stopping thread proceed
303 pipe
->in_q
->enqueue(m
, m
->get_priority(), pipe
->conn_id
);
305 active_flush
= false;
307 lgeneric_subdout(pipe
->msgr
->cct
, ms
, 20) << *pipe
<< "DelayedDelivery::entry stop" << dendl
;
311 void Pipe::DelayedDelivery::stop_fast_dispatching() {
312 Mutex::Locker
l(delay_lock
);
313 stop_fast_dispatching_flag
= true;
314 while (delay_dispatching
)
315 delay_cond
.Wait(delay_lock
);
321 ldout(msgr
->cct
,10) << "accept" << dendl
;
322 assert(pipe_lock
.is_locked());
323 assert(state
== STATE_ACCEPTING
);
329 entity_addr_t socket_addr
;
332 char banner
[strlen(CEPH_BANNER
)+1];
334 ceph_msg_connect connect
;
335 ceph_msg_connect_reply reply
;
338 bufferlist authorizer
, authorizer_reply
;
339 bool authorizer_valid
;
340 uint64_t feat_missing
;
341 bool replaced
= false;
342 // this variable denotes if the connection attempt from peer is a hard
343 // reset or not, it is true if there is an existing connection and the
344 // connection sequence from peer is equal to zero
345 bool is_reset_from_peer
= false;
346 CryptoKey session_key
;
347 int removed
; // single-use down below
349 // this should roughly mirror pseudocode at
350 // http://ceph.com/wiki/Messaging_protocol
352 uint64_t existing_seq
= -1;
354 // used for reading in the remote acked seq on connect
355 uint64_t newly_acked_seq
= 0;
359 set_socket_options();
362 r
= tcp_write(CEPH_BANNER
, strlen(CEPH_BANNER
));
364 ldout(msgr
->cct
,10) << "accept couldn't write banner" << dendl
;
369 ::encode(msgr
->my_inst
.addr
, addrs
, 0); // legacy
371 port
= msgr
->my_inst
.addr
.get_port();
373 // and peer's socket addr (they might not know their ip)
376 r
= ::getpeername(sd
, (sockaddr
*)&ss
, &len
);
378 ldout(msgr
->cct
,0) << "accept failed to getpeername " << cpp_strerror(errno
) << dendl
;
381 socket_addr
.set_sockaddr((sockaddr
*)&ss
);
382 ::encode(socket_addr
, addrs
, 0); // legacy
384 r
= tcp_write(addrs
.c_str(), addrs
.length());
386 ldout(msgr
->cct
,10) << "accept couldn't write my+peer addr" << dendl
;
390 ldout(msgr
->cct
,1) << "accept sd=" << sd
<< " " << socket_addr
<< dendl
;
393 if (tcp_read(banner
, strlen(CEPH_BANNER
)) < 0) {
394 ldout(msgr
->cct
,10) << "accept couldn't read banner" << dendl
;
397 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
398 banner
[strlen(CEPH_BANNER
)] = 0;
399 ldout(msgr
->cct
,1) << "accept peer sent bad banner '" << banner
<< "' (should be '" << CEPH_BANNER
<< "')" << dendl
;
403 bufferptr
tp(sizeof(ceph_entity_addr
));
404 addrbl
.push_back(std::move(tp
));
406 if (tcp_read(addrbl
.c_str(), addrbl
.length()) < 0) {
407 ldout(msgr
->cct
,10) << "accept couldn't read peer_addr" << dendl
;
411 bufferlist::iterator ti
= addrbl
.begin();
412 ::decode(peer_addr
, ti
);
415 ldout(msgr
->cct
,10) << "accept peer addr is " << peer_addr
<< dendl
;
416 if (peer_addr
.is_blank_ip()) {
417 // peer apparently doesn't know what ip they have; figure it out for them.
418 int port
= peer_addr
.get_port();
419 peer_addr
.u
= socket_addr
.u
;
420 peer_addr
.set_port(port
);
421 ldout(msgr
->cct
,0) << "accept peer addr is really " << peer_addr
422 << " (socket is " << socket_addr
<< ")" << dendl
;
424 set_peer_addr(peer_addr
); // so that connection_state gets set up
427 if (tcp_read((char*)&connect
, sizeof(connect
)) < 0) {
428 ldout(msgr
->cct
,10) << "accept couldn't read connect" << dendl
;
433 if (connect
.authorizer_len
) {
434 bp
= buffer::create(connect
.authorizer_len
);
435 if (tcp_read(bp
.c_str(), connect
.authorizer_len
) < 0) {
436 ldout(msgr
->cct
,10) << "accept couldn't read connect authorizer" << dendl
;
439 authorizer
.push_back(std::move(bp
));
440 authorizer_reply
.clear();
443 ldout(msgr
->cct
,20) << "accept got peer connect_seq " << connect
.connect_seq
444 << " global_seq " << connect
.global_seq
447 msgr
->lock
.Lock(); // FIXME
449 if (msgr
->dispatch_queue
.stop
)
451 if (state
!= STATE_ACCEPTING
) {
455 // note peer's type, flags
456 set_peer_type(connect
.host_type
);
457 policy
= msgr
->get_policy(connect
.host_type
);
458 ldout(msgr
->cct
,10) << "accept of host_type " << connect
.host_type
459 << ", policy.lossy=" << policy
.lossy
460 << " policy.server=" << policy
.server
461 << " policy.standby=" << policy
.standby
462 << " policy.resetcheck=" << policy
.resetcheck
465 memset(&reply
, 0, sizeof(reply
));
466 reply
.protocol_version
= msgr
->get_proto_version(peer_type
, false);
470 ldout(msgr
->cct
,10) << "accept my proto " << reply
.protocol_version
471 << ", their proto " << connect
.protocol_version
<< dendl
;
472 if (connect
.protocol_version
!= reply
.protocol_version
) {
473 reply
.tag
= CEPH_MSGR_TAG_BADPROTOVER
;
477 // require signatures for cephx?
478 if (connect
.authorizer_protocol
== CEPH_AUTH_CEPHX
) {
479 if (peer_type
== CEPH_ENTITY_TYPE_OSD
||
480 peer_type
== CEPH_ENTITY_TYPE_MDS
) {
481 if (msgr
->cct
->_conf
->cephx_require_signatures
||
482 msgr
->cct
->_conf
->cephx_cluster_require_signatures
) {
483 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl
;
484 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
487 if (msgr
->cct
->_conf
->cephx_require_signatures
||
488 msgr
->cct
->_conf
->cephx_service_require_signatures
) {
489 ldout(msgr
->cct
,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl
;
490 policy
.features_required
|= CEPH_FEATURE_MSG_AUTH
;
495 feat_missing
= policy
.features_required
& ~(uint64_t)connect
.features
;
497 ldout(msgr
->cct
,1) << "peer missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
498 reply
.tag
= CEPH_MSGR_TAG_FEATURES
;
502 // Check the authorizer. If not good, bail out.
506 if (!msgr
->verify_authorizer(connection_state
.get(), peer_type
, connect
.authorizer_protocol
, authorizer
,
507 authorizer_reply
, authorizer_valid
, session_key
) ||
509 ldout(msgr
->cct
,0) << "accept: got bad authorizer" << dendl
;
511 if (state
!= STATE_ACCEPTING
)
512 goto shutting_down_msgr_unlocked
;
513 reply
.tag
= CEPH_MSGR_TAG_BADAUTHORIZER
;
514 session_security
.reset();
518 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
520 ldout(msgr
->cct
,10) << "accept: setting up session_security." << dendl
;
522 retry_existing_lookup
:
525 if (msgr
->dispatch_queue
.stop
)
527 if (state
!= STATE_ACCEPTING
)
531 existing
= msgr
->_lookup_pipe(peer_addr
);
533 existing
->pipe_lock
.Lock(true); // skip lockdep check (we are locking a second Pipe here)
534 if (existing
->reader_dispatching
) {
535 /** we need to wait, or we can deadlock if downstream
536 * fast_dispatchers are (naughtily!) waiting on resources
537 * held by somebody trying to make use of the SimpleMessenger lock.
538 * So drop locks, wait, and retry. It just looks like a slow network
541 * We take a ref to existing here since it might get reaped before we
542 * wake up (see bug #15870). We can be confident that it lived until
543 * locked it since we held the msgr lock from _lookup_pipe through to
544 * locking existing->lock and checking reader_dispatching.
549 existing
->notify_on_dispatch_done
= true;
550 while (existing
->reader_dispatching
)
551 existing
->cond
.Wait(existing
->pipe_lock
);
552 existing
->pipe_lock
.Unlock();
555 goto retry_existing_lookup
;
558 if (connect
.global_seq
< existing
->peer_global_seq
) {
559 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
560 << " > " << connect
.global_seq
<< ", RETRY_GLOBAL" << dendl
;
561 reply
.tag
= CEPH_MSGR_TAG_RETRY_GLOBAL
;
562 reply
.global_seq
= existing
->peer_global_seq
; // so we can send it below..
563 existing
->pipe_lock
.Unlock();
567 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".gseq " << existing
->peer_global_seq
568 << " <= " << connect
.global_seq
<< ", looks ok" << dendl
;
571 if (existing
->policy
.lossy
) {
572 ldout(msgr
->cct
,0) << "accept replacing existing (lossy) channel (new one lossy="
573 << policy
.lossy
<< ")" << dendl
;
574 existing
->was_session_reset();
578 ldout(msgr
->cct
,0) << "accept connect_seq " << connect
.connect_seq
579 << " vs existing " << existing
->connect_seq
580 << " state " << existing
->get_state_name() << dendl
;
582 if (connect
.connect_seq
== 0 && existing
->connect_seq
> 0) {
583 ldout(msgr
->cct
,0) << "accept peer reset, then tried to connect to us, replacing" << dendl
;
584 // this is a hard reset from peer
585 is_reset_from_peer
= true;
586 if (policy
.resetcheck
)
587 existing
->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
591 if (connect
.connect_seq
< existing
->connect_seq
) {
592 // old attempt, or we sent READY but they didn't get it.
593 ldout(msgr
->cct
,10) << "accept existing " << existing
<< ".cseq " << existing
->connect_seq
594 << " > " << connect
.connect_seq
<< ", RETRY_SESSION" << dendl
;
598 if (connect
.connect_seq
== existing
->connect_seq
) {
599 // if the existing connection successfully opened, and/or
600 // subsequently went to standby, then the peer should bump
601 // their connect_seq and retry: this is not a connection race
602 // we need to resolve here.
603 if (existing
->state
== STATE_OPEN
||
604 existing
->state
== STATE_STANDBY
) {
605 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
606 << ".cseq " << existing
->connect_seq
607 << " == " << connect
.connect_seq
608 << ", OPEN|STANDBY, RETRY_SESSION" << dendl
;
613 if (peer_addr
< msgr
->my_inst
.addr
||
614 existing
->policy
.server
) {
616 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
617 << " == " << connect
.connect_seq
<< ", or we are server, replacing my attempt" << dendl
;
618 if (!(existing
->state
== STATE_CONNECTING
||
619 existing
->state
== STATE_WAIT
))
620 lderr(msgr
->cct
) << "accept race bad state, would replace, existing="
621 << existing
->get_state_name()
622 << " " << existing
<< ".cseq=" << existing
->connect_seq
623 << " == " << connect
.connect_seq
625 assert(existing
->state
== STATE_CONNECTING
||
626 existing
->state
== STATE_WAIT
);
629 // our existing outgoing wins
630 ldout(msgr
->cct
,10) << "accept connection race, existing " << existing
<< ".cseq " << existing
->connect_seq
631 << " == " << connect
.connect_seq
<< ", sending WAIT" << dendl
;
632 assert(peer_addr
> msgr
->my_inst
.addr
);
633 if (!(existing
->state
== STATE_CONNECTING
))
634 lderr(msgr
->cct
) << "accept race bad state, would send wait, existing="
635 << existing
->get_state_name()
636 << " " << existing
<< ".cseq=" << existing
->connect_seq
637 << " == " << connect
.connect_seq
639 assert(existing
->state
== STATE_CONNECTING
);
640 // make sure our outgoing connection will follow through
641 existing
->_send_keepalive();
642 reply
.tag
= CEPH_MSGR_TAG_WAIT
;
643 existing
->pipe_lock
.Unlock();
649 assert(connect
.connect_seq
> existing
->connect_seq
);
650 assert(connect
.global_seq
>= existing
->peer_global_seq
);
651 if (policy
.resetcheck
&& // RESETSESSION only used by servers; peers do not reset each other
652 existing
->connect_seq
== 0) {
653 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
654 << ", " << existing
<< ".cseq = " << existing
->connect_seq
655 << "), sending RESETSESSION" << dendl
;
656 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
658 existing
->pipe_lock
.Unlock();
663 ldout(msgr
->cct
,10) << "accept peer sent cseq " << connect
.connect_seq
664 << " > " << existing
->connect_seq
<< dendl
;
667 else if (connect
.connect_seq
> 0) {
668 // we reset, and they are opening a new session
669 ldout(msgr
->cct
,0) << "accept we reset (peer sent cseq " << connect
.connect_seq
<< "), sending RESETSESSION" << dendl
;
671 reply
.tag
= CEPH_MSGR_TAG_RESETSESSION
;
675 ldout(msgr
->cct
,10) << "accept new session" << dendl
;
682 assert(existing
->pipe_lock
.is_locked());
683 assert(pipe_lock
.is_locked());
684 reply
.tag
= CEPH_MSGR_TAG_RETRY_SESSION
;
685 reply
.connect_seq
= existing
->connect_seq
+ 1;
686 existing
->pipe_lock
.Unlock();
691 assert(pipe_lock
.is_locked());
692 reply
.features
= ((uint64_t)connect
.features
& policy
.features_supported
) | policy
.features_required
;
693 reply
.authorizer_len
= authorizer_reply
.length();
695 r
= tcp_write((char*)&reply
, sizeof(reply
));
698 if (reply
.authorizer_len
) {
699 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
706 assert(existing
->pipe_lock
.is_locked());
707 assert(pipe_lock
.is_locked());
708 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
709 if ((connect
.features
& CEPH_FEATURE_RECONNECT_SEQ
) && !is_reset_from_peer
) {
710 reply_tag
= CEPH_MSGR_TAG_SEQ
;
711 existing_seq
= existing
->in_seq
;
713 ldout(msgr
->cct
,10) << "accept replacing " << existing
<< dendl
;
715 existing
->unregister_pipe();
718 if (existing
->policy
.lossy
) {
719 // disconnect from the Connection
720 assert(existing
->connection_state
);
721 if (existing
->connection_state
->clear_pipe(existing
))
722 msgr
->dispatch_queue
.queue_reset(existing
->connection_state
.get());
724 // queue a reset on the new connection, which we're dumping for the old
725 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
727 // drop my Connection, and take a ref to the existing one. do not
728 // clear existing->connection_state, since read_message and
729 // write_message both dereference it without pipe_lock.
730 connection_state
= existing
->connection_state
;
732 // make existing Connection reference us
733 connection_state
->reset_pipe(this);
735 if (existing
->delay_thread
) {
736 existing
->delay_thread
->steal_for_pipe(this);
737 delay_thread
= existing
->delay_thread
;
738 existing
->delay_thread
= NULL
;
739 delay_thread
->flush();
742 // steal incoming queue
743 uint64_t replaced_conn_id
= conn_id
;
744 conn_id
= existing
->conn_id
;
745 existing
->conn_id
= replaced_conn_id
;
747 // reset the in_seq if this is a hard reset from peer,
748 // otherwise we respect our original connection's value
749 in_seq
= is_reset_from_peer
? 0 : existing
->in_seq
;
750 in_seq_acked
= in_seq
;
752 // steal outgoing queue and out_seq
753 existing
->requeue_sent();
754 out_seq
= existing
->out_seq
;
755 ldout(msgr
->cct
,10) << "accept re-queuing on out_seq " << out_seq
<< " in_seq " << in_seq
<< dendl
;
756 for (map
<int, list
<Message
*> >::iterator p
= existing
->out_q
.begin();
757 p
!= existing
->out_q
.end();
759 out_q
[p
->first
].splice(out_q
[p
->first
].begin(), p
->second
);
761 existing
->stop_and_wait();
762 existing
->pipe_lock
.Unlock();
766 assert(pipe_lock
.is_locked());
767 connect_seq
= connect
.connect_seq
+ 1;
768 peer_global_seq
= connect
.global_seq
;
769 assert(state
== STATE_ACCEPTING
);
771 ldout(msgr
->cct
,10) << "accept success, connect_seq = " << connect_seq
<< ", sending READY" << dendl
;
774 reply
.tag
= (reply_tag
? reply_tag
: CEPH_MSGR_TAG_READY
);
775 reply
.features
= policy
.features_supported
;
776 reply
.global_seq
= msgr
->get_global_seq();
777 reply
.connect_seq
= connect_seq
;
779 reply
.authorizer_len
= authorizer_reply
.length();
781 reply
.flags
= reply
.flags
| CEPH_MSG_CONNECT_LOSSY
;
783 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
784 ldout(msgr
->cct
,10) << "accept features " << connection_state
->get_features() << dendl
;
786 session_security
.reset(
787 get_auth_session_handler(msgr
->cct
,
788 connect
.authorizer_protocol
,
790 connection_state
->get_features()));
793 msgr
->dispatch_queue
.queue_accept(connection_state
.get());
794 msgr
->ms_deliver_handle_fast_accept(connection_state
.get());
797 if (msgr
->dispatch_queue
.stop
)
799 removed
= msgr
->accepting_pipes
.erase(this);
800 assert(removed
== 1);
805 r
= tcp_write((char*)&reply
, sizeof(reply
));
807 goto fail_registered
;
810 if (reply
.authorizer_len
) {
811 r
= tcp_write(authorizer_reply
.c_str(), authorizer_reply
.length());
813 goto fail_registered
;
817 if (reply_tag
== CEPH_MSGR_TAG_SEQ
) {
818 if (tcp_write((char*)&existing_seq
, sizeof(existing_seq
)) < 0) {
819 ldout(msgr
->cct
,2) << "accept write error on in_seq" << dendl
;
820 goto fail_registered
;
822 if (tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
)) < 0) {
823 ldout(msgr
->cct
,2) << "accept read error on newly_acked_seq" << dendl
;
824 goto fail_registered
;
829 discard_requeued_up_to(newly_acked_seq
);
830 if (state
!= STATE_CLOSED
) {
831 ldout(msgr
->cct
,10) << "accept starting writer, state " << get_state_name() << dendl
;
834 ldout(msgr
->cct
,20) << "accept done" << dendl
;
836 maybe_start_delay_thread();
838 return 0; // success.
841 ldout(msgr
->cct
, 10) << "accept fault after register" << dendl
;
843 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
844 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
846 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
852 if (state
!= STATE_CLOSED
) {
853 bool queued
= is_queued();
854 ldout(msgr
->cct
, 10) << " queued = " << (int)queued
<< dendl
;
856 state
= policy
.server
? STATE_STANDBY
: STATE_CONNECTING
;
857 } else if (replaced
) {
858 state
= STATE_STANDBY
;
860 state
= STATE_CLOSED
;
864 if (queued
|| replaced
)
871 shutting_down_msgr_unlocked
:
872 assert(pipe_lock
.is_locked());
874 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
875 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
877 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
881 state
= STATE_CLOSED
;
887 void Pipe::set_socket_options()
889 // disable Nagle algorithm?
890 if (msgr
->cct
->_conf
->ms_tcp_nodelay
) {
892 int r
= ::setsockopt(sd
, IPPROTO_TCP
, TCP_NODELAY
, (char*)&flag
, sizeof(flag
));
895 ldout(msgr
->cct
,0) << "couldn't set TCP_NODELAY: "
896 << cpp_strerror(r
) << dendl
;
899 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
900 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
901 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_RCVBUF
, (void*)&size
, sizeof(size
));
904 ldout(msgr
->cct
,0) << "couldn't set SO_RCVBUF to " << size
905 << ": " << cpp_strerror(r
) << dendl
;
910 #if defined(SO_NOSIGPIPE)
912 int r
= ::setsockopt(sd
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&val
, sizeof(val
));
915 ldout(msgr
->cct
,0) << "couldn't set SO_NOSIGPIPE: "
916 << cpp_strerror(r
) << dendl
;
921 int prio
= msgr
->get_socket_priority();
924 #ifdef IPTOS_CLASS_CS6
925 int iptos
= IPTOS_CLASS_CS6
;
927 if (!peer_addr
.is_blank_ip()) {
928 addr_family
= peer_addr
.get_family();
930 addr_family
= msgr
->get_myaddr().get_family();
932 switch (addr_family
) {
934 r
= ::setsockopt(sd
, IPPROTO_IP
, IP_TOS
, &iptos
, sizeof(iptos
));
937 r
= ::setsockopt(sd
, IPPROTO_IPV6
, IPV6_TCLASS
, &iptos
, sizeof(iptos
));
940 lderr(msgr
->cct
) << "couldn't set ToS of unknown family ("
941 << addr_family
<< ")"
942 << " to " << iptos
<< dendl
;
947 ldout(msgr
->cct
,0) << "couldn't set TOS to " << iptos
948 << ": " << cpp_strerror(r
) << dendl
;
951 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
952 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
953 // We need to call setsockopt(SO_PRIORITY) after it.
954 r
= ::setsockopt(sd
, SOL_SOCKET
, SO_PRIORITY
, &prio
, sizeof(prio
));
957 ldout(msgr
->cct
,0) << "couldn't set SO_PRIORITY to " << prio
958 << ": " << cpp_strerror(r
) << dendl
;
966 bool got_bad_auth
= false;
968 ldout(msgr
->cct
,10) << "connect " << connect_seq
<< dendl
;
969 assert(pipe_lock
.is_locked());
971 __u32 cseq
= connect_seq
;
972 __u32 gseq
= msgr
->get_global_seq();
974 // stop reader thread
982 struct iovec msgvec
[2];
984 char banner
[strlen(CEPH_BANNER
) + 1]; // extra byte makes coverity happy
986 entity_addr_t peer_addr_for_me
, socket_addr
;
987 AuthAuthorizer
*authorizer
= NULL
;
988 bufferlist addrbl
, myaddrbl
;
989 const md_config_t
*conf
= msgr
->cct
->_conf
;
991 // close old socket. this is safe because we stopped the reader thread above.
996 sd
= ::socket(peer_addr
.get_family(), SOCK_STREAM
, 0);
999 lderr(msgr
->cct
) << "connect couldn't create socket " << cpp_strerror(rc
) << dendl
;
1005 set_socket_options();
1008 entity_addr_t addr2bind
= msgr
->get_myaddr();
1009 if (msgr
->cct
->_conf
->ms_bind_before_connect
&& (!addr2bind
.is_blank_ip())) {
1010 addr2bind
.set_port(0);
1011 int r
= ::bind(sd
, addr2bind
.get_sockaddr(), addr2bind
.get_sockaddr_len());
1013 ldout(msgr
->cct
,2) << "client bind error " << ", " << cpp_strerror(errno
) << dendl
;
1020 ldout(msgr
->cct
,10) << "connecting to " << peer_addr
<< dendl
;
1021 rc
= ::connect(sd
, peer_addr
.get_sockaddr(), peer_addr
.get_sockaddr_len());
1023 int stored_errno
= errno
;
1024 ldout(msgr
->cct
,2) << "connect error " << peer_addr
1025 << ", " << cpp_strerror(stored_errno
) << dendl
;
1026 if (stored_errno
== ECONNREFUSED
) {
1027 ldout(msgr
->cct
, 2) << "connection refused!" << dendl
;
1028 msgr
->dispatch_queue
.queue_refused(connection_state
.get());
1034 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1035 rc
= tcp_read((char*)&banner
, strlen(CEPH_BANNER
));
1037 ldout(msgr
->cct
,2) << "connect couldn't read banner, " << cpp_strerror(rc
) << dendl
;
1040 if (memcmp(banner
, CEPH_BANNER
, strlen(CEPH_BANNER
))) {
1041 ldout(msgr
->cct
,0) << "connect protocol error (bad banner) on peer " << peer_addr
<< dendl
;
1045 memset(&msg
, 0, sizeof(msg
));
1046 msgvec
[0].iov_base
= banner
;
1047 msgvec
[0].iov_len
= strlen(CEPH_BANNER
);
1048 msg
.msg_iov
= msgvec
;
1050 msglen
= msgvec
[0].iov_len
;
1051 rc
= do_sendmsg(&msg
, msglen
);
1053 ldout(msgr
->cct
,2) << "connect couldn't write my banner, " << cpp_strerror(rc
) << dendl
;
1059 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1060 bufferptr
p(sizeof(ceph_entity_addr
) * 2);
1062 int wirelen
= sizeof(__u32
) * 2 + sizeof(ceph_sockaddr_storage
);
1063 bufferptr
p(wirelen
* 2);
1065 addrbl
.push_back(std::move(p
));
1067 rc
= tcp_read(addrbl
.c_str(), addrbl
.length());
1069 ldout(msgr
->cct
,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc
) << dendl
;
1073 bufferlist::iterator p
= addrbl
.begin();
1075 ::decode(peer_addr_for_me
, p
);
1077 catch (buffer::error
& e
) {
1078 ldout(msgr
->cct
,2) << "connect couldn't decode peer addrs: " << e
.what()
1082 port
= peer_addr_for_me
.get_port();
1084 ldout(msgr
->cct
,20) << "connect read peer addr " << paddr
<< " on socket " << sd
<< dendl
;
1085 if (peer_addr
!= paddr
) {
1086 if (paddr
.is_blank_ip() &&
1087 peer_addr
.get_port() == paddr
.get_port() &&
1088 peer_addr
.get_nonce() == paddr
.get_nonce()) {
1089 ldout(msgr
->cct
,0) << "connect claims to be "
1090 << paddr
<< " not " << peer_addr
<< " - presumably this is the same node!" << dendl
;
1092 ldout(msgr
->cct
,10) << "connect claims to be "
1093 << paddr
<< " not " << peer_addr
1094 << " (peer is possibly using public_bind_addr?) " << dendl
;
1098 ldout(msgr
->cct
,20) << "connect peer addr for me is " << peer_addr_for_me
<< dendl
;
1100 msgr
->learned_addr(peer_addr_for_me
);
1102 ::encode(msgr
->my_inst
.addr
, myaddrbl
, 0); // legacy
1104 memset(&msg
, 0, sizeof(msg
));
1105 msgvec
[0].iov_base
= myaddrbl
.c_str();
1106 msgvec
[0].iov_len
= myaddrbl
.length();
1107 msg
.msg_iov
= msgvec
;
1109 msglen
= msgvec
[0].iov_len
;
1110 rc
= do_sendmsg(&msg
, msglen
);
1112 ldout(msgr
->cct
,2) << "connect couldn't write my addr, " << cpp_strerror(rc
) << dendl
;
1115 ldout(msgr
->cct
,10) << "connect sent my addr " << msgr
->my_inst
.addr
<< dendl
;
1120 authorizer
= msgr
->get_authorizer(peer_type
, false);
1121 bufferlist authorizer_reply
;
1123 ceph_msg_connect connect
;
1124 connect
.features
= policy
.features_supported
;
1125 connect
.host_type
= msgr
->get_myinst().name
.type();
1126 connect
.global_seq
= gseq
;
1127 connect
.connect_seq
= cseq
;
1128 connect
.protocol_version
= msgr
->get_proto_version(peer_type
, true);
1129 connect
.authorizer_protocol
= authorizer
? authorizer
->protocol
: 0;
1130 connect
.authorizer_len
= authorizer
? authorizer
->bl
.length() : 0;
1132 ldout(msgr
->cct
,10) << "connect.authorizer_len=" << connect
.authorizer_len
1133 << " protocol=" << connect
.authorizer_protocol
<< dendl
;
1136 connect
.flags
|= CEPH_MSG_CONNECT_LOSSY
; // this is fyi, actually, server decides!
1137 memset(&msg
, 0, sizeof(msg
));
1138 msgvec
[0].iov_base
= (char*)&connect
;
1139 msgvec
[0].iov_len
= sizeof(connect
);
1140 msg
.msg_iov
= msgvec
;
1142 msglen
= msgvec
[0].iov_len
;
1144 msgvec
[1].iov_base
= authorizer
->bl
.c_str();
1145 msgvec
[1].iov_len
= authorizer
->bl
.length();
1147 msglen
+= msgvec
[1].iov_len
;
1150 ldout(msgr
->cct
,10) << "connect sending gseq=" << gseq
<< " cseq=" << cseq
1151 << " proto=" << connect
.protocol_version
<< dendl
;
1152 rc
= do_sendmsg(&msg
, msglen
);
1154 ldout(msgr
->cct
,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc
) << dendl
;
1158 ldout(msgr
->cct
,20) << "connect wrote (self +) cseq, waiting for reply" << dendl
;
1159 ceph_msg_connect_reply reply
;
1160 rc
= tcp_read((char*)&reply
, sizeof(reply
));
1162 ldout(msgr
->cct
,2) << "connect read reply " << cpp_strerror(rc
) << dendl
;
1166 ldout(msgr
->cct
,20) << "connect got reply tag " << (int)reply
.tag
1167 << " connect_seq " << reply
.connect_seq
1168 << " global_seq " << reply
.global_seq
1169 << " proto " << reply
.protocol_version
1170 << " flags " << (int)reply
.flags
1171 << " features " << reply
.features
1174 authorizer_reply
.clear();
1176 if (reply
.authorizer_len
) {
1177 ldout(msgr
->cct
,10) << "reply.authorizer_len=" << reply
.authorizer_len
<< dendl
;
1178 bufferptr bp
= buffer::create(reply
.authorizer_len
);
1179 rc
= tcp_read(bp
.c_str(), reply
.authorizer_len
);
1181 ldout(msgr
->cct
,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc
) << dendl
;
1184 authorizer_reply
.push_back(bp
);
1188 bufferlist::iterator iter
= authorizer_reply
.begin();
1189 if (!authorizer
->verify_reply(iter
)) {
1190 ldout(msgr
->cct
,0) << "failed verifying authorize reply" << dendl
;
1195 if (conf
->ms_inject_internal_delays
) {
1196 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1198 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1203 if (state
!= STATE_CONNECTING
) {
1204 ldout(msgr
->cct
,0) << "connect got RESETSESSION but no longer connecting" << dendl
;
1208 if (reply
.tag
== CEPH_MSGR_TAG_FEATURES
) {
1209 ldout(msgr
->cct
,0) << "connect protocol feature mismatch, my " << std::hex
1210 << connect
.features
<< " < peer " << reply
.features
1211 << " missing " << (reply
.features
& ~policy
.features_supported
)
1212 << std::dec
<< dendl
;
1216 if (reply
.tag
== CEPH_MSGR_TAG_BADPROTOVER
) {
1217 ldout(msgr
->cct
,0) << "connect protocol version mismatch, my " << connect
.protocol_version
1218 << " != " << reply
.protocol_version
<< dendl
;
1222 if (reply
.tag
== CEPH_MSGR_TAG_BADAUTHORIZER
) {
1223 ldout(msgr
->cct
,0) << "connect got BADAUTHORIZER" << dendl
;
1226 got_bad_auth
= true;
1229 authorizer
= msgr
->get_authorizer(peer_type
, true); // try harder
1232 if (reply
.tag
== CEPH_MSGR_TAG_RESETSESSION
) {
1233 ldout(msgr
->cct
,0) << "connect got RESETSESSION" << dendl
;
1234 was_session_reset();
1239 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_GLOBAL
) {
1240 gseq
= msgr
->get_global_seq(reply
.global_seq
);
1241 ldout(msgr
->cct
,10) << "connect got RETRY_GLOBAL " << reply
.global_seq
1242 << " chose new " << gseq
<< dendl
;
1246 if (reply
.tag
== CEPH_MSGR_TAG_RETRY_SESSION
) {
1247 assert(reply
.connect_seq
> connect_seq
);
1248 ldout(msgr
->cct
,10) << "connect got RETRY_SESSION " << connect_seq
1249 << " -> " << reply
.connect_seq
<< dendl
;
1250 cseq
= connect_seq
= reply
.connect_seq
;
1255 if (reply
.tag
== CEPH_MSGR_TAG_WAIT
) {
1256 ldout(msgr
->cct
,3) << "connect got WAIT (connection race)" << dendl
;
1261 if (reply
.tag
== CEPH_MSGR_TAG_READY
||
1262 reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1263 uint64_t feat_missing
= policy
.features_required
& ~(uint64_t)reply
.features
;
1265 ldout(msgr
->cct
,1) << "missing required features " << std::hex
<< feat_missing
<< std::dec
<< dendl
;
1269 if (reply
.tag
== CEPH_MSGR_TAG_SEQ
) {
1270 ldout(msgr
->cct
,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl
;
1271 uint64_t newly_acked_seq
= 0;
1272 rc
= tcp_read((char*)&newly_acked_seq
, sizeof(newly_acked_seq
));
1274 ldout(msgr
->cct
,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc
) << dendl
;
1277 ldout(msgr
->cct
,2) << " got newly_acked_seq " << newly_acked_seq
1278 << " vs out_seq " << out_seq
<< dendl
;
1279 while (newly_acked_seq
> out_seq
) {
1280 Message
*m
= _get_next_outgoing();
1282 ldout(msgr
->cct
,2) << " discarding previously sent " << m
->get_seq()
1283 << " " << *m
<< dendl
;
1284 assert(m
->get_seq() <= newly_acked_seq
);
1288 if (tcp_write((char*)&in_seq
, sizeof(in_seq
)) < 0) {
1289 ldout(msgr
->cct
,2) << "connect write error on in_seq" << dendl
;
1295 peer_global_seq
= reply
.global_seq
;
1296 policy
.lossy
= reply
.flags
& CEPH_MSG_CONNECT_LOSSY
;
1298 connect_seq
= cseq
+ 1;
1299 assert(connect_seq
== reply
.connect_seq
);
1300 backoff
= utime_t();
1301 connection_state
->set_features((uint64_t)reply
.features
& (uint64_t)connect
.features
);
1302 ldout(msgr
->cct
,10) << "connect success " << connect_seq
<< ", lossy = " << policy
.lossy
1303 << ", features " << connection_state
->get_features() << dendl
;
1306 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1309 if (authorizer
!= NULL
) {
1310 session_security
.reset(
1311 get_auth_session_handler(msgr
->cct
,
1312 authorizer
->protocol
,
1313 authorizer
->session_key
,
1314 connection_state
->get_features()));
1316 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1317 session_security
.reset();
1320 msgr
->dispatch_queue
.queue_connect(connection_state
.get());
1321 msgr
->ms_deliver_handle_fast_connect(connection_state
.get());
1323 if (!reader_running
) {
1324 ldout(msgr
->cct
,20) << "connect starting reader" << dendl
;
1327 maybe_start_delay_thread();
1333 ldout(msgr
->cct
,0) << "connect got bad tag " << (int)tag
<< dendl
;
1338 if (conf
->ms_inject_internal_delays
) {
1339 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1341 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1347 if (state
== STATE_CONNECTING
)
1350 ldout(msgr
->cct
,3) << "connect fault, but state = " << get_state_name()
1351 << " != connecting, stopping" << dendl
;
1358 void Pipe::register_pipe()
1360 ldout(msgr
->cct
,10) << "register_pipe" << dendl
;
1361 assert(msgr
->lock
.is_locked());
1362 Pipe
*existing
= msgr
->_lookup_pipe(peer_addr
);
1363 assert(existing
== NULL
);
1364 msgr
->rank_pipe
[peer_addr
] = this;
1367 void Pipe::unregister_pipe()
1369 assert(msgr
->lock
.is_locked());
1370 ceph::unordered_map
<entity_addr_t
,Pipe
*>::iterator p
= msgr
->rank_pipe
.find(peer_addr
);
1371 if (p
!= msgr
->rank_pipe
.end() && p
->second
== this) {
1372 ldout(msgr
->cct
,10) << "unregister_pipe" << dendl
;
1373 msgr
->rank_pipe
.erase(p
);
1375 ldout(msgr
->cct
,10) << "unregister_pipe - not registered" << dendl
;
1376 msgr
->accepting_pipes
.erase(this); // somewhat overkill, but safe.
1382 ldout(msgr
->cct
, 20) << "join" << dendl
;
1383 if (writer_thread
.is_started())
1384 writer_thread
.join();
1385 if (reader_thread
.is_started())
1386 reader_thread
.join();
1388 ldout(msgr
->cct
, 20) << "joining delay_thread" << dendl
;
1389 delay_thread
->stop();
1390 delay_thread
->join();
// Pipe::requeue_sent: walks the sent list from its back for as long as it is
// non-empty and logs each message together with out_seq and the message seq.
// rq refers to the CEPH_MSG_PRIO_HIGHEST list inside out_q.
// NOTE(review): the statements that actually move messages between the two
// lists are not visible in this listing (inner lines 1395-1398, 1402 and
// 1405 onward are absent). Presumably each message is removed from sent and
// placed on rq - confirm against the complete file.
1394 void Pipe::requeue_sent()
1399 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
// keep going until the sent list has been drained
1400 while (!sent
.empty()) {
1401 Message
*m
= sent
.back();
1403 ldout(msgr
->cct
,10) << "requeue_sent " << *m
<< " for resend seq " << out_seq
1404 << " (" << m
->get_seq() << ")" << dendl
;
// Pipe::discard_requeued_up_to: inspects the CEPH_MSG_PRIO_HIGHEST list of
// out_q from the front and reports messages that are being discarded
// relative to the given seq.
//   seq - sequence number used as the discard threshold.
// NOTE(review): the bodies of the two single-statement ifs and the lines
// that release and pop each message are absent from this listing (inner
// lines 1414, 1419, 1422-1426); presumably the first if returns early and
// the second leaves the loop - confirm against the complete file.
1410 void Pipe::discard_requeued_up_to(uint64_t seq
)
1412 ldout(msgr
->cct
, 10) << "discard_requeued_up_to " << seq
<< dendl
;
// nothing is queued at the highest priority
1413 if (out_q
.count(CEPH_MSG_PRIO_HIGHEST
) == 0)
1415 list
<Message
*>& rq
= out_q
[CEPH_MSG_PRIO_HIGHEST
];
1416 while (!rq
.empty()) {
1417 Message
*m
= rq
.front();
// a message with seq 0, or with a seq beyond the threshold, is not
// eligible for discarding
1418 if (m
->get_seq() == 0 || m
->get_seq() > seq
)
1420 ldout(msgr
->cct
,10) << "discard_requeued_up_to " << *m
<< " for resend seq " << out_seq
1421 << " <= " << seq
<< ", discarding" << dendl
;
// drop the highest-priority entry from out_q
1427 out_q
.erase(CEPH_MSG_PRIO_HIGHEST
);
1431 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1432 * Must hold pipe_lock prior to calling.
// Pipe::discard_out_queue: iterates over every message on the sent list and
// over every per-priority list in out_q, logging each entry at level 20.
// NOTE(review): the statements that release the messages and clear the
// containers are absent from this listing (inner lines 1440-1442 and
// 1446-1448) - confirm against the complete file.
1434 void Pipe::discard_out_queue()
1436 ldout(msgr
->cct
,10) << "discard_queue" << dendl
;
// first pass: messages on the sent list
1438 for (list
<Message
*>::iterator p
= sent
.begin(); p
!= sent
.end(); ++p
) {
1439 ldout(msgr
->cct
,20) << " discard " << *p
<< dendl
;
// second pass: every priority level of out_q, then every message in it
1443 for (map
<int,list
<Message
*> >::iterator p
= out_q
.begin(); p
!= out_q
.end(); ++p
)
1444 for (list
<Message
*>::iterator r
= p
->second
.begin(); r
!= p
->second
.end(); ++r
) {
1445 ldout(msgr
->cct
,20) << " discard " << *r
<< dendl
;
// Pipe::fault: reacts to a failure on this pipe. pipe_lock must be held
// (asserted).
//   onread - true when the caller is the read side; combined with
//            STATE_CONNECTING it selects the reader-shutting-down branch.
// Visible branches, in order:
//   1. read-side fault while already connecting;
//   2. pipe already closed or closing;
//   3. lossy policy and not connecting: the pipe is detached from its
//      Connection, queues are discarded and a reset is queued;
//   4. otherwise delayed items are flushed and the pipe either goes to
//      standby or reconnects, waiting on cond for the current backoff.
// NOTE(review): many lines of this function (returns, closing braces, lock
// calls, the backoff growth step) are absent from this listing, so the
// exact control flow between the branches must be confirmed against the
// complete file.
1451 void Pipe::fault(bool onread
)
1453 const md_config_t
*conf
= msgr
->cct
->_conf
;
1454 assert(pipe_lock
.is_locked());
1457 if (onread
&& state
== STATE_CONNECTING
) {
1458 ldout(msgr
->cct
,10) << "fault already connecting, reader shutting down" << dendl
;
1462 ldout(msgr
->cct
,2) << "fault " << cpp_strerror(errno
) << dendl
;
1464 if (state
== STATE_CLOSED
||
1465 state
== STATE_CLOSING
) {
1466 ldout(msgr
->cct
,10) << "fault already closed|closing" << dendl
;
// only queue a reset when clear_pipe reports success for this pipe
1467 if (connection_state
->clear_pipe(this))
1468 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1475 if (policy
.lossy
&& state
!= STATE_CONNECTING
) {
1476 ldout(msgr
->cct
,10) << "fault on lossy channel, failing" << dendl
;
1478 // disconnect from Connection, and mark it failed. future messages
1480 assert(connection_state
);
1482 bool cleared
= connection_state
->clear_pipe(this);
1484 // crib locks, blech. note that Pipe is now STATE_CLOSED and the
1485 // rank_pipe entry is ignored by others.
// optional artificial delay, driven by ms_inject_internal_delays
1488 if (conf
->ms_inject_internal_delays
) {
1489 ldout(msgr
->cct
, 10) << " sleep for " << msgr
->cct
->_conf
->ms_inject_internal_delays
<< dendl
;
1491 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1498 msgr
->lock
.Unlock();
// throw away everything still pending in either direction
1501 delay_thread
->discard();
1502 in_q
->discard_queue(conn_id
);
1503 discard_out_queue();
1505 msgr
->dispatch_queue
.queue_reset(connection_state
.get());
1509 // queue delayed items immediately
1511 delay_thread
->flush();
1513 // requeue sent items
1516 if (policy
.standby
&& !is_queued()) {
1517 ldout(msgr
->cct
,0) << "fault with nothing to send, going to standby" << dendl
;
1518 state
= STATE_STANDBY
;
1522 if (state
!= STATE_CONNECTING
) {
1523 if (policy
.server
) {
1524 ldout(msgr
->cct
,0) << "fault, server, going to standby" << dendl
;
1525 state
= STATE_STANDBY
;
1527 ldout(msgr
->cct
,0) << "fault, initiating reconnect" << dendl
;
1529 state
= STATE_CONNECTING
;
// a fresh reconnect attempt starts with a zero backoff
1531 backoff
= utime_t();
1532 } else if (backoff
== utime_t()) {
1533 ldout(msgr
->cct
,0) << "fault" << dendl
;
1534 backoff
.set_from_double(conf
->ms_initial_backoff
);
1536 ldout(msgr
->cct
,10) << "fault waiting " << backoff
<< dendl
;
1537 cond
.WaitInterval(pipe_lock
, backoff
);
// never let the backoff exceed ms_max_backoff
1539 if (backoff
> conf
->ms_max_backoff
)
1540 backoff
.set_from_double(conf
->ms_max_backoff
);
1541 ldout(msgr
->cct
,10) << "fault done waiting or woke up" << dendl
;
// Pipe::randomize_out_seq: when the connection has CEPH_FEATURE_MSG_AUTH,
// fills out_seq with random bytes and masks it with SEQ_MASK; the status
// from get_random_bytes is kept in seq_error.
// NOTE(review): the return statements and the non-MSG_AUTH branch body are
// absent from this listing (inner lines 1553-1554, 1556-1557); presumably
// seq_error is returned in the first branch - confirm against the complete
// file.
1545 int Pipe::randomize_out_seq()
1547 if (connection_state
->get_features() & CEPH_FEATURE_MSG_AUTH
) {
1548 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1549 // here. We'll check it on the call. PLR
1550 int seq_error
= get_random_bytes((char *)&out_seq
, sizeof(out_seq
));
// keep the starting sequence number within SEQ_MASK
1551 out_seq
&= SEQ_MASK
;
1552 lsubdout(msgr
->cct
, ms
, 10) << "randomize_out_seq " << out_seq
<< dendl
;
1555 // previously, seq #'s always started at 0.
// Pipe::was_session_reset: handles a session reset. With pipe_lock held
// (asserted) it discards the incoming queue for conn_id, the delayed
// messages and the outgoing queues, queues a remote-reset event for the
// connection, and then re-randomizes out_seq.
// A nonzero result from randomize_out_seq is only logged.
// NOTE(review): some lines are absent from this listing (for example inner
// lines 1567 and 1575 onward); any guard around the delay_thread call and
// any further state resets must be confirmed against the complete file.
1561 void Pipe::was_session_reset()
1563 assert(pipe_lock
.is_locked());
1565 ldout(msgr
->cct
,10) << "was_session_reset" << dendl
;
1566 in_q
->discard_queue(conn_id
);
1568 delay_thread
->discard();
1569 discard_out_queue();
1571 msgr
->dispatch_queue
.queue_remote_reset(connection_state
.get());
1573 if (randomize_out_seq()) {
1574 lsubdout(msgr
->cct
,ms
,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq
<< dendl
;
1583 ldout(msgr
->cct
,10) << "stop" << dendl
;
1584 assert(pipe_lock
.is_locked());
1585 state
= STATE_CLOSED
;
1586 state_closed
= true;
// Pipe::stop_and_wait: requires pipe_lock to be held by the calling thread
// (asserted). It checks whether the pipe is already STATE_CLOSED, optionally
// applies the ms_inject_internal_delays delay, tells the delay thread to
// stop fast dispatching, and then waits on cond while the reader is still
// running.
// NOTE(review): the statement under the first if, the sleep itself and the
// second half of the wait-loop condition are absent from this listing
// (inner lines 1595, 1600-1607, 1612) - confirm against the complete file.
1591 void Pipe::stop_and_wait()
1593 assert(pipe_lock
.is_locked_by_me());
1594 if (state
!= STATE_CLOSED
)
1597 if (msgr
->cct
->_conf
->ms_inject_internal_delays
) {
1598 ldout(msgr
->cct
, 10) << __func__
<< " sleep for "
1599 << msgr
->cct
->_conf
->ms_inject_internal_delays
1602 t
.set_from_double(msgr
->cct
->_conf
->ms_inject_internal_delays
);
1608 delay_thread
->stop_fast_dispatching();
// block on the pipe condition variable until the loop condition clears
1611 while (reader_running
&&
1613 cond
.Wait(pipe_lock
);
1616 /* read msgs from socket.
1623 if (state
== STATE_ACCEPTING
) {
1625 assert(pipe_lock
.is_locked());
1629 while (state
!= STATE_CLOSED
&&
1630 state
!= STATE_CONNECTING
) {
1631 assert(pipe_lock
.is_locked());
1633 // sleep if (re)connecting
1634 if (state
== STATE_STANDBY
) {
1635 ldout(msgr
->cct
,20) << "reader sleeping during reconnect|standby" << dendl
;
1636 cond
.Wait(pipe_lock
);
1640 // get a reference to the AuthSessionHandler while we have the pipe_lock
1641 ceph::shared_ptr
<AuthSessionHandler
> auth_handler
= session_security
;
1646 ldout(msgr
->cct
,20) << "reader reading tag..." << dendl
;
1647 if (tcp_read((char*)&tag
, 1) < 0) {
1649 ldout(msgr
->cct
,2) << "reader couldn't read tag, " << cpp_strerror(errno
) << dendl
;
1654 if (tag
== CEPH_MSGR_TAG_KEEPALIVE
) {
1655 ldout(msgr
->cct
,2) << "reader got KEEPALIVE" << dendl
;
1657 connection_state
->set_last_keepalive(ceph_clock_now());
1660 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2
) {
1661 ldout(msgr
->cct
,30) << "reader got KEEPALIVE2 tag ..." << dendl
;
1663 int rc
= tcp_read((char*)&t
, sizeof(t
));
1666 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp "
1667 << cpp_strerror(errno
) << dendl
;
1670 send_keepalive_ack
= true;
1671 keepalive_ack_stamp
= utime_t(t
);
1672 ldout(msgr
->cct
,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1674 connection_state
->set_last_keepalive(ceph_clock_now());
1679 if (tag
== CEPH_MSGR_TAG_KEEPALIVE2_ACK
) {
1680 ldout(msgr
->cct
,2) << "reader got KEEPALIVE_ACK" << dendl
;
1681 struct ceph_timespec t
;
1682 int rc
= tcp_read((char*)&t
, sizeof(t
));
1685 ldout(msgr
->cct
,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno
) << dendl
;
1688 connection_state
->set_last_keepalive_ack(utime_t(t
));
1694 if (tag
== CEPH_MSGR_TAG_ACK
) {
1695 ldout(msgr
->cct
,20) << "reader got ACK" << dendl
;
1697 int rc
= tcp_read((char*)&seq
, sizeof(seq
));
1700 ldout(msgr
->cct
,2) << "reader couldn't read ack seq, " << cpp_strerror(errno
) << dendl
;
1702 } else if (state
!= STATE_CLOSED
) {
1708 else if (tag
== CEPH_MSGR_TAG_MSG
) {
1709 ldout(msgr
->cct
,20) << "reader got MSG" << dendl
;
1711 int r
= read_message(&m
, auth_handler
.get());
1721 m
->trace
.event("pipe read message");
1723 if (state
== STATE_CLOSED
||
1724 state
== STATE_CONNECTING
) {
1725 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1730 // check received seq#. if it is old, drop the message.
1731 // note that incoming messages may skip ahead. this is convenient for the client
1732 // side queueing because messages can't be renumbered, but the (kernel) client will
1733 // occasionally pull a message out of the sent queue to send elsewhere. in that case
1734 // it doesn't matter if we "got" it or not.
1735 if (m
->get_seq() <= in_seq
) {
1736 ldout(msgr
->cct
,0) << "reader got old message "
1737 << m
->get_seq() << " <= " << in_seq
<< " " << m
<< " " << *m
1738 << ", discarding" << dendl
;
1739 in_q
->dispatch_throttle_release(m
->get_dispatch_throttle_size());
1741 if (connection_state
->has_feature(CEPH_FEATURE_RECONNECT_SEQ
) &&
1742 msgr
->cct
->_conf
->ms_die_on_old_message
)
1743 assert(0 == "old msgs despite reconnect_seq feature");
1746 if (m
->get_seq() > in_seq
+ 1) {
1747 ldout(msgr
->cct
,0) << "reader missed message? skipped from seq "
1748 << in_seq
<< " to " << m
->get_seq() << dendl
;
1749 if (msgr
->cct
->_conf
->ms_die_on_skipped_message
)
1750 assert(0 == "skipped incoming seq");
1753 m
->set_connection(connection_state
.get());
1755 // note last received message.
1756 in_seq
= m
->get_seq();
1758 cond
.Signal(); // wake up writer, to ack this
1760 ldout(msgr
->cct
,10) << "reader got message "
1761 << m
->get_seq() << " " << m
<< " " << *m
1763 in_q
->fast_preprocess(m
);
1767 if (rand() % 10000 < msgr
->cct
->_conf
->ms_inject_delay_probability
* 10000.0) {
1768 release
= m
->get_recv_stamp();
1769 release
+= msgr
->cct
->_conf
->ms_inject_delay_max
* (double)(rand() % 10000) / 10000.0;
1770 lsubdout(msgr
->cct
, ms
, 1) << "queue_received will delay until " << release
<< " on " << m
<< " " << *m
<< dendl
;
1772 delay_thread
->queue(release
, m
);
1774 if (in_q
->can_fast_dispatch(m
)) {
1775 reader_dispatching
= true;
1777 in_q
->fast_dispatch(m
);
1779 reader_dispatching
= false;
1780 if (state
== STATE_CLOSED
||
1781 notify_on_dispatch_done
) { // there might be somebody waiting
1782 notify_on_dispatch_done
= false;
1786 in_q
->enqueue(m
, m
->get_priority(), conn_id
);
1791 else if (tag
== CEPH_MSGR_TAG_CLOSE
) {
1792 ldout(msgr
->cct
,20) << "reader got CLOSE" << dendl
;
1794 if (state
== STATE_CLOSING
) {
1795 state
= STATE_CLOSED
;
1796 state_closed
= true;
1798 state
= STATE_CLOSING
;
1804 ldout(msgr
->cct
,0) << "reader bad tag " << (int)tag
<< dendl
;
1812 reader_running
= false;
1813 reader_needs_join
= true;
1814 unlock_maybe_reap();
1815 ldout(msgr
->cct
,10) << "reader done" << dendl
;
1818 /* write msgs to socket.
1824 while (state
!= STATE_CLOSED
) {// && state != STATE_WAIT) {
1825 ldout(msgr
->cct
,10) << "writer: state = " << get_state_name()
1826 << " policy.server=" << policy
.server
<< dendl
;
1829 if (is_queued() && state
== STATE_STANDBY
&& !policy
.server
)
1830 state
= STATE_CONNECTING
;
1833 if (state
== STATE_CONNECTING
) {
1834 assert(!policy
.server
);
1839 if (state
== STATE_CLOSING
) {
1841 ldout(msgr
->cct
,20) << "writer writing CLOSE tag" << dendl
;
1842 char tag
= CEPH_MSGR_TAG_CLOSE
;
1843 state
= STATE_CLOSED
;
1844 state_closed
= true;
1847 // we can ignore return value, actually; we don't care if this succeeds.
1848 int r
= ::write(sd
, &tag
, 1);
1855 if (state
!= STATE_CONNECTING
&& state
!= STATE_WAIT
&& state
!= STATE_STANDBY
&&
1856 (is_queued() || in_seq
> in_seq_acked
)) {
1859 if (send_keepalive
) {
1861 if (connection_state
->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2
)) {
1863 rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2
,
1867 rc
= write_keepalive();
1871 ldout(msgr
->cct
,2) << "writer couldn't write keepalive[2], "
1872 << cpp_strerror(errno
) << dendl
;
1876 send_keepalive
= false;
1878 if (send_keepalive_ack
) {
1879 utime_t t
= keepalive_ack_stamp
;
1881 int rc
= write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK
, t
);
1884 ldout(msgr
->cct
,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno
) << dendl
;
1888 send_keepalive_ack
= false;
1892 if (in_seq
> in_seq_acked
) {
1893 uint64_t send_seq
= in_seq
;
1895 int rc
= write_ack(send_seq
);
1898 ldout(msgr
->cct
,2) << "writer couldn't write ack, " << cpp_strerror(errno
) << dendl
;
1902 in_seq_acked
= send_seq
;
1905 // grab outgoing message
1906 Message
*m
= _get_next_outgoing();
1908 m
->set_seq(++out_seq
);
1909 if (!policy
.lossy
) {
1915 // associate message with Connection (for benefit of encode_payload)
1916 m
->set_connection(connection_state
.get());
1918 uint64_t features
= connection_state
->get_features();
1920 if (m
->empty_payload())
1921 ldout(msgr
->cct
,20) << "writer encoding " << m
->get_seq() << " features " << features
1922 << " " << m
<< " " << *m
<< dendl
;
1924 ldout(msgr
->cct
,20) << "writer half-reencoding " << m
->get_seq() << " features " << features
1925 << " " << m
<< " " << *m
<< dendl
;
1927 // encode and copy out of *m
1928 m
->encode(features
, msgr
->crcflags
);
1930 // prepare everything
1931 const ceph_msg_header
& header
= m
->get_header();
1932 const ceph_msg_footer
& footer
= m
->get_footer();
1934 // Now that we have all the crcs calculated, handle the
1935 // digital signature for the message, if the pipe has session
1936 // security set up. Some session security options do not
1937 // actually calculate and check the signature, but they should
1938 // handle the calls to sign_message and check_signature. PLR
1939 if (session_security
.get() == NULL
) {
1940 ldout(msgr
->cct
, 20) << "writer no session security" << dendl
;
1942 if (session_security
->sign_message(m
)) {
1943 ldout(msgr
->cct
, 20) << "writer failed to sign seq # " << header
.seq
1944 << "): sig = " << footer
.sig
<< dendl
;
1946 ldout(msgr
->cct
, 20) << "writer signed seq # " << header
.seq
1947 << "): sig = " << footer
.sig
<< dendl
;
1951 bufferlist blist
= m
->get_payload();
1952 blist
.append(m
->get_middle());
1953 blist
.append(m
->get_data());
1957 m
->trace
.event("pipe writing message");
1959 ldout(msgr
->cct
,20) << "writer sending " << m
->get_seq() << " " << m
<< dendl
;
1960 int rc
= write_message(header
, footer
, blist
);
1964 ldout(msgr
->cct
,1) << "writer error sending " << m
<< ", "
1965 << cpp_strerror(errno
) << dendl
;
1974 ldout(msgr
->cct
,20) << "writer sleeping" << dendl
;
1975 cond
.Wait(pipe_lock
);
1978 ldout(msgr
->cct
,20) << "writer finishing" << dendl
;
1981 writer_running
= false;
1982 unlock_maybe_reap();
1983 ldout(msgr
->cct
,10) << "writer done" << dendl
;
// Pipe::unlock_maybe_reap: once neither the reader nor the writer is
// running, waits for an in-progress delay-thread flush (if any) and hands
// this pipe to the messenger via queue_reap.
// NOTE(review): the unlock calls implied by the function name are absent
// from this listing (inner lines 1989-1990 and 1995-1996) - confirm where
// pipe_lock is released against the complete file.
1986 void Pipe::unlock_maybe_reap()
1988 if (!reader_running
&& !writer_running
) {
// delay_thread may be null, hence the pointer check before use
1991 if (delay_thread
&& delay_thread
->is_flushing()) {
1992 delay_thread
->wait_for_flush();
1994 msgr
->queue_reap(this);
// alloc_aligned_buffer: appends buffers to data so that a payload of len
// bytes starting at offset off can be read into page-aligned memory.
//   data - bufferlist that receives the newly created buffers.
//   len  - total number of bytes to provide.
//   off  - data offset whose sub-page part decides the size of the head.
// Layout: an unaligned head up to the next page boundary (only when off is
// not page aligned), a page-aligned middle made of whole pages, and a tail
// for whatever remains.
// NOTE(review): the statements that subtract head and middle from left and
// the guards around the middle and tail pushes are absent from this listing
// (inner lines 2005-2006, 2009, 2012, 2014, 2016) - confirm against the
// complete file.
2000 static void alloc_aligned_buffer(bufferlist
& data
, unsigned len
, unsigned off
)
2002 // create a buffer to read into that matches the data alignment
2003 unsigned left
= len
;
// off has a sub-page component, so an unaligned head is needed
2004 if (off
& ~CEPH_PAGE_MASK
) {
2007 head
= MIN(CEPH_PAGE_SIZE
- (off
& ~CEPH_PAGE_MASK
), left
);
2008 data
.push_back(buffer::create(head
));
// whole pages only
2011 unsigned middle
= left
& CEPH_PAGE_MASK
;
2013 data
.push_back(buffer::create_page_aligned(middle
));
2017 data
.push_back(buffer::create(left
));
// Pipe::read_message: reads one complete message from the socket.
//   pm           - out-parameter of type Message**; NOTE(review): the
//                  assignment to it is not visible in this listing,
//                  presumably it receives the decoded message.
//   auth_handler - session security handler used to check the message
//                  signature; may be NULL, in which case no check is made.
// Visible steps, in order:
//   1. read the header (current layout when the peer has
//      CEPH_FEATURE_NOSRCADDR, otherwise the old layout which is converted)
//      and verify its crc when MSG_CRC_HEADER is enabled;
//   2. take the per-policy message and byte throttles and the dispatch
//      throttle for front + middle + data bytes;
//   3. read front, middle and data sections (data goes either into a
//      caller-registered rx buffer or into a freshly aligned buffer);
//   4. read the footer (current or old layout, chosen by
//      CEPH_FEATURE_MSG_AUTH), detect aborted messages;
//   5. decode, check the signature, record throttlers and timestamps.
// Failure paths jump to out_dethrottle, which releases the throttles taken
// in step 2.
// NOTE(review): return statements, labels and several declarations are
// absent from this listing, so return values must be confirmed against the
// complete file.
2021 int Pipe::read_message(Message
**pm
, AuthSessionHandler
* auth_handler
)
2025 //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;
2027 ceph_msg_header header
;
2028 ceph_msg_footer footer
;
2029 __u32 header_crc
= 0;
// header: current wire layout
2031 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2032 if (tcp_read((char*)&header
, sizeof(header
)) < 0)
2034 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2035 header_crc
= ceph_crc32c(0, (unsigned char *)&header
, sizeof(header
) - sizeof(header
.crc
));
// header: old wire layout, copied into the current struct field by field
// where the two differ
2038 ceph_msg_header_old oldheader
;
2039 if (tcp_read((char*)&oldheader
, sizeof(oldheader
)) < 0)
2042 memcpy(&header
, &oldheader
, sizeof(header
));
2043 header
.src
= oldheader
.src
.name
;
2044 header
.reserved
= oldheader
.reserved
;
2045 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2046 header
.crc
= oldheader
.crc
;
2047 header_crc
= ceph_crc32c(0, (unsigned char *)&oldheader
, sizeof(oldheader
) - sizeof(oldheader
.crc
));
2051 ldout(msgr
->cct
,20) << "reader got envelope type=" << header
.type
2052 << " src " << entity_name_t(header
.src
)
2053 << " front=" << header
.front_len
2054 << " data=" << header
.data_len
2055 << " off " << header
.data_off
2058 // verify header crc
2059 if ((msgr
->crcflags
& MSG_CRC_HEADER
) && header_crc
!= header
.crc
) {
2060 ldout(msgr
->cct
,0) << "reader got bad header crc " << header_crc
<< " != " << header
.crc
<< dendl
;
2064 bufferlist front
, middle
, data
;
2065 int front_len
, middle_len
;
2066 unsigned data_len
, data_off
;
2069 utime_t recv_stamp
= ceph_clock_now();
// per-policy message-count throttle (optional)
2071 if (policy
.throttler_messages
) {
2072 ldout(msgr
->cct
,10) << "reader wants " << 1 << " message from policy throttler "
2073 << policy
.throttler_messages
->get_current() << "/"
2074 << policy
.throttler_messages
->get_max() << dendl
;
2075 policy
.throttler_messages
->get();
// total payload size used for both byte throttles below
2078 uint64_t message_size
= header
.front_len
+ header
.middle_len
+ header
.data_len
;
// per-policy byte throttle (optional)
2080 if (policy
.throttler_bytes
) {
2081 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " bytes from policy throttler "
2082 << policy
.throttler_bytes
->get_current() << "/"
2083 << policy
.throttler_bytes
->get_max() << dendl
;
2084 policy
.throttler_bytes
->get(message_size
);
2087 // throttle total bytes waiting for dispatch. do this _after_ the
2088 // policy throttle, as this one does not deadlock (unless dispatch
2089 // blocks indefinitely, which it shouldn't). in contrast, the
2090 // policy throttle carries for the lifetime of the message.
2091 ldout(msgr
->cct
,10) << "reader wants " << message_size
<< " from dispatch throttler "
2092 << in_q
->dispatch_throttler
.get_current() << "/"
2093 << in_q
->dispatch_throttler
.get_max() << dendl
;
2094 in_q
->dispatch_throttler
.get(message_size
);
2097 utime_t throttle_stamp
= ceph_clock_now();
// front section
2100 front_len
= header
.front_len
;
2102 bufferptr bp
= buffer::create(front_len
);
2103 if (tcp_read(bp
.c_str(), front_len
) < 0)
2104 goto out_dethrottle
;
2105 front
.push_back(std::move(bp
));
2106 ldout(msgr
->cct
,20) << "reader got front " << front
.length() << dendl
;
// middle section
2110 middle_len
= header
.middle_len
;
2112 bufferptr bp
= buffer::create(middle_len
);
2113 if (tcp_read(bp
.c_str(), middle_len
) < 0)
2114 goto out_dethrottle
;
2115 middle
.push_back(std::move(bp
));
2116 ldout(msgr
->cct
,20) << "reader got middle " << middle
.length() << dendl
;
// data section: lengths are converted from little-endian wire order
2121 data_len
= le32_to_cpu(header
.data_len
);
2122 data_off
= le32_to_cpu(header
.data_off
);
2124 unsigned offset
= 0;
2125 unsigned left
= data_len
;
2127 bufferlist newbuf
, rxbuf
;
2128 bufferlist::iterator blp
;
2129 int rxbuf_version
= 0;
2133 if (tcp_read_wait() < 0)
2134 goto out_dethrottle
;
// look for a receive buffer registered for this tid; the lookup is done
// under the connection lock
2137 connection_state
->lock
.Lock();
2138 map
<ceph_tid_t
,pair
<bufferlist
,int> >::iterator p
= connection_state
->rx_buffers
.find(header
.tid
);
2139 if (p
!= connection_state
->rx_buffers
.end()) {
// (re)select the registered buffer when none is in use yet or its version
// changed since the previous pass
2140 if (rxbuf
.length() == 0 || p
->second
.second
!= rxbuf_version
) {
2141 ldout(msgr
->cct
,10) << "reader seleting rx buffer v " << p
->second
.second
2142 << " at offset " << offset
2143 << " len " << p
->second
.first
.length() << dendl
;
2144 rxbuf
= p
->second
.first
;
2145 rxbuf_version
= p
->second
.second
;
2146 // make sure it's big enough
2147 if (rxbuf
.length() < data_len
)
2148 rxbuf
.push_back(buffer::create(data_len
- rxbuf
.length()));
// NOTE(review): rxbuf was extended just above, but the iterator is taken
// from the bufferlist stored in the map rather than from rxbuf; confirm
// that the extension is visible through this iterator.
2149 blp
= p
->second
.first
.begin();
2150 blp
.advance(offset
);
// no registered buffer: allocate an aligned one on first use
2153 if (!newbuf
.length()) {
2154 ldout(msgr
->cct
,20) << "reader allocating new rx buffer at offset " << offset
<< dendl
;
2155 alloc_aligned_buffer(newbuf
, data_len
, data_off
);
2156 blp
= newbuf
.begin();
2157 blp
.advance(offset
);
// read at most the remainder of the current buffer segment
2160 bufferptr bp
= blp
.get_current_ptr();
2161 int read
= MIN(bp
.length(), left
);
2162 ldout(msgr
->cct
,20) << "reader reading nonblocking into " << (void*)bp
.c_str() << " len " << bp
.length() << dendl
;
2163 ssize_t got
= tcp_read_nonblocking(bp
.c_str(), read
);
2164 ldout(msgr
->cct
,30) << "reader read " << got
<< " of " << read
<< dendl
;
2165 connection_state
->lock
.Unlock();
2167 goto out_dethrottle
;
2170 data
.append(bp
, 0, got
);
2173 } // else we got a signal or something; just loop.
// footer: current layout with MSG_AUTH, otherwise the old layout copied
// field by field
2178 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2179 if (tcp_read((char*)&footer
, sizeof(footer
)) < 0)
2180 goto out_dethrottle
;
2182 ceph_msg_footer_old old_footer
;
2183 if (tcp_read((char*)&old_footer
, sizeof(old_footer
)) < 0)
2184 goto out_dethrottle
;
2185 footer
.front_crc
= old_footer
.front_crc
;
2186 footer
.middle_crc
= old_footer
.middle_crc
;
2187 footer
.data_crc
= old_footer
.data_crc
;
2189 footer
.flags
= old_footer
.flags
;
// a footer without the COMPLETE flag marks an aborted message
2192 aborted
= (footer
.flags
& CEPH_MSG_FOOTER_COMPLETE
) == 0;
2193 ldout(msgr
->cct
,10) << "aborted = " << aborted
<< dendl
;
2195 ldout(msgr
->cct
,0) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2196 << " byte message.. ABORTED" << dendl
;
2198 goto out_dethrottle
;
2201 ldout(msgr
->cct
,20) << "reader got " << front
.length() << " + " << middle
.length() << " + " << data
.length()
2202 << " byte message" << dendl
;
2203 message
= decode_message(msgr
->cct
, msgr
->crcflags
, header
, footer
,
2204 front
, middle
, data
, connection_state
.get());
2207 goto out_dethrottle
;
2211 // Check the signature if one should be present. A zero return indicates success. PLR
2214 if (auth_handler
== NULL
) {
2215 ldout(msgr
->cct
, 10) << "No session security set" << dendl
;
2217 if (auth_handler
->check_message_signature(message
)) {
2218 ldout(msgr
->cct
, 0) << "Signature check failed" << dendl
;
2221 goto out_dethrottle
;
// hand the throttlers to the message so they travel with it
2225 message
->set_byte_throttler(policy
.throttler_bytes
);
2226 message
->set_message_throttler(policy
.throttler_messages
);
2228 // store reservation size in message, so we don't get confused
2229 // by messages entering the dispatch queue through other paths.
2230 message
->set_dispatch_throttle_size(message_size
);
2232 message
->set_recv_stamp(recv_stamp
);
2233 message
->set_throttle_stamp(throttle_stamp
);
2234 message
->set_recv_complete_stamp(ceph_clock_now());
2240 // release bytes reserved from the throttlers on failure
2241 if (policy
.throttler_messages
) {
2242 ldout(msgr
->cct
,10) << "reader releasing " << 1 << " message to policy throttler "
2243 << policy
.throttler_messages
->get_current() << "/"
2244 << policy
.throttler_messages
->get_max() << dendl
;
2245 policy
.throttler_messages
->put();
2248 if (policy
.throttler_bytes
) {
2249 ldout(msgr
->cct
,10) << "reader releasing " << message_size
<< " bytes to policy throttler "
2250 << policy
.throttler_bytes
->get_current() << "/"
2251 << policy
.throttler_bytes
->get_max() << dendl
;
2252 policy
.throttler_bytes
->put(message_size
);
2255 in_q
->dispatch_throttle_release(message_size
);
2261 SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
2262 http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
2263 http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
// Pipe::suppress_sigpipe: compiled to real work only when neither
// MSG_NOSIGNAL nor SO_NOSIGPIPE is defined. It records in sigpipe_pending
// whether SIGPIPE is already pending for this thread; if it is not, SIGPIPE
// is blocked via pthread_sigmask and sigpipe_unblock remembers whether the
// signal was unblocked before this call.
// NOTE(review): the declarations of the local signal sets and the setup of
// sigpipe_mask are absent from this listing - confirm against the complete
// file.
2265 void Pipe::suppress_sigpipe()
2267 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
2269 We want to ignore possible SIGPIPE that we can generate on write.
2270 SIGPIPE is delivered *synchronously* and *only* to the thread
2271 doing the write. So if it is reported as already pending (which
2272 means the thread blocks it), then we do nothing: if we generate
2273 SIGPIPE, it will be merged with the pending one (there's no
2274 queuing), and that suits us well. If it is not pending, we block
2275 it in this thread (and we avoid changing signal action, because it
2279 sigemptyset(&pending
);
2280 sigpending(&pending
);
2281 sigpipe_pending
= sigismember(&pending
, SIGPIPE
);
2282 if (!sigpipe_pending
) {
2284 sigemptyset(&blocked
);
2285 pthread_sigmask(SIG_BLOCK
, &sigpipe_mask
, &blocked
);
2287 /* Maybe it was blocked already? */
2288 sigpipe_unblock
= ! sigismember(&blocked
, SIGPIPE
);
2290 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
// Pipe::restore_sigpipe: counterpart of suppress_sigpipe, compiled to real
// work only when neither MSG_NOSIGNAL nor SO_NOSIGPIPE is defined. If
// SIGPIPE was not pending beforehand (sigpipe_pending is false) and is
// pending now, it is consumed with a zero-timeout sigtimedwait; afterwards
// SIGPIPE is unblocked only when sigpipe_unblock is set.
// NOTE(review): the declaration of the local pending set is absent from
// this listing - confirm against the complete file.
2294 void Pipe::restore_sigpipe()
2296 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
2298 If SIGPIPE was pending already we do nothing. Otherwise, if it
2299 become pending (i.e., we generated it), then we sigwait() it (thus
2300 clearing pending status). Then we unblock SIGPIPE, but only if it
2301 were us who blocked it.
2303 if (!sigpipe_pending
) {
2305 sigemptyset(&pending
);
2306 sigpending(&pending
);
2307 if (sigismember(&pending
, SIGPIPE
)) {
2309 Protect ourselves from a situation when SIGPIPE was sent
2310 by the user to the whole process, and was delivered to
2311 other thread before we had a chance to wait for it.
2313 static const struct timespec nowait
= { 0, 0 };
2314 TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask
, NULL
, &nowait
));
2317 if (sigpipe_unblock
)
2318 pthread_sigmask(SIG_UNBLOCK
, &sigpipe_mask
, NULL
);
2320 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
2324 int Pipe::do_sendmsg(struct msghdr
*msg
, unsigned len
, bool more
)
2329 #if defined(MSG_NOSIGNAL)
2330 r
= ::sendmsg(sd
, msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
2332 r
= ::sendmsg(sd
, msg
, (more
? MSG_MORE
: 0));
2335 ldout(msgr
->cct
,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl
;
2338 ldout(msgr
->cct
,1) << "do_sendmsg error " << cpp_strerror(r
) << dendl
;
2342 if (state
== STATE_CLOSED
) {
2343 ldout(msgr
->cct
,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl
;
2345 return -EINTR
; // close enough
2349 if (len
== 0) break;
2351 // hrmph. trim r bytes off the front of our message.
2352 ldout(msgr
->cct
,20) << "do_sendmsg short write did " << r
<< ", still have " << len
<< dendl
;
2354 if (msg
->msg_iov
[0].iov_len
<= (size_t)r
) {
2355 // lose this whole item
2356 //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2357 r
-= msg
->msg_iov
[0].iov_len
;
2362 //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2363 msg
->msg_iov
[0].iov_base
= (char *)msg
->msg_iov
[0].iov_base
+ r
;
2364 msg
->msg_iov
[0].iov_len
-= r
;
2374 int Pipe::write_ack(uint64_t seq
)
2376 ldout(msgr
->cct
,10) << "write_ack " << seq
<< dendl
;
2378 char c
= CEPH_MSGR_TAG_ACK
;
2383 memset(&msg
, 0, sizeof(msg
));
2384 struct iovec msgvec
[2];
2385 msgvec
[0].iov_base
= &c
;
2386 msgvec
[0].iov_len
= 1;
2387 msgvec
[1].iov_base
= &s
;
2388 msgvec
[1].iov_len
= sizeof(s
);
2389 msg
.msg_iov
= msgvec
;
2392 if (do_sendmsg(&msg
, 1 + sizeof(s
), true) < 0)
2397 int Pipe::write_keepalive()
2399 ldout(msgr
->cct
,10) << "write_keepalive" << dendl
;
2401 char c
= CEPH_MSGR_TAG_KEEPALIVE
;
2404 memset(&msg
, 0, sizeof(msg
));
2405 struct iovec msgvec
[2];
2406 msgvec
[0].iov_base
= &c
;
2407 msgvec
[0].iov_len
= 1;
2408 msg
.msg_iov
= msgvec
;
2411 if (do_sendmsg(&msg
, 1) < 0)
2416 int Pipe::write_keepalive2(char tag
, const utime_t
& t
)
2418 ldout(msgr
->cct
,10) << "write_keepalive2 " << (int)tag
<< " " << t
<< dendl
;
2419 struct ceph_timespec ts
;
2420 t
.encode_timeval(&ts
);
2422 memset(&msg
, 0, sizeof(msg
));
2423 struct iovec msgvec
[2];
2424 msgvec
[0].iov_base
= &tag
;
2425 msgvec
[0].iov_len
= 1;
2426 msgvec
[1].iov_base
= &ts
;
2427 msgvec
[1].iov_len
= sizeof(ts
);
2428 msg
.msg_iov
= msgvec
;
2431 if (do_sendmsg(&msg
, 1 + sizeof(ts
)) < 0)
2437 int Pipe::write_message(const ceph_msg_header
& header
, const ceph_msg_footer
& footer
, bufferlist
& blist
)
2441 // set up msghdr and iovecs
2443 memset(&msg
, 0, sizeof(msg
));
2444 msg
.msg_iov
= msgvec
;
2448 char tag
= CEPH_MSGR_TAG_MSG
;
2449 msgvec
[msg
.msg_iovlen
].iov_base
= &tag
;
2450 msgvec
[msg
.msg_iovlen
].iov_len
= 1;
2455 ceph_msg_header_old oldheader
;
2456 if (connection_state
->has_feature(CEPH_FEATURE_NOSRCADDR
)) {
2457 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&header
;
2458 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(header
);
2459 msglen
+= sizeof(header
);
2462 memcpy(&oldheader
, &header
, sizeof(header
));
2463 oldheader
.src
.name
= header
.src
;
2464 oldheader
.src
.addr
= connection_state
->get_peer_addr();
2465 oldheader
.orig_src
= oldheader
.src
;
2466 oldheader
.reserved
= header
.reserved
;
2467 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2468 oldheader
.crc
= ceph_crc32c(0, (unsigned char*)&oldheader
,
2469 sizeof(oldheader
) - sizeof(oldheader
.crc
));
2473 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&oldheader
;
2474 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(oldheader
);
2475 msglen
+= sizeof(oldheader
);
2479 // payload (front+data)
2480 list
<bufferptr
>::const_iterator pb
= blist
.buffers().begin();
2481 unsigned b_off
= 0; // carry-over buffer offset, if any
2482 unsigned bl_pos
= 0; // blist pos
2483 unsigned left
= blist
.length();
2486 unsigned donow
= MIN(left
, pb
->length()-b_off
);
2488 ldout(msgr
->cct
,0) << "donow = " << donow
<< " left " << left
<< " pb->length " << pb
->length()
2489 << " b_off " << b_off
<< dendl
;
2492 ldout(msgr
->cct
,30) << " bl_pos " << bl_pos
<< " b_off " << b_off
2493 << " leftinchunk " << left
2494 << " buffer len " << pb
->length()
2495 << " writing " << donow
2498 if (msg
.msg_iovlen
>= SM_IOV_MAX
-2) {
2499 if (do_sendmsg(&msg
, msglen
, true))
2502 // and restart the iov
2503 msg
.msg_iov
= msgvec
;
2508 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)(pb
->c_str()+b_off
);
2509 msgvec
[msg
.msg_iovlen
].iov_len
= donow
;
2513 assert(left
>= donow
);
2519 while (b_off
== pb
->length()) {
2526 // send footer; if receiver doesn't support signatures, use the old footer format
2528 ceph_msg_footer_old old_footer
;
2529 if (connection_state
->has_feature(CEPH_FEATURE_MSG_AUTH
)) {
2530 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)&footer
;
2531 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(footer
);
2532 msglen
+= sizeof(footer
);
2535 if (msgr
->crcflags
& MSG_CRC_HEADER
) {
2536 old_footer
.front_crc
= footer
.front_crc
;
2537 old_footer
.middle_crc
= footer
.middle_crc
;
2539 old_footer
.front_crc
= old_footer
.middle_crc
= 0;
2541 old_footer
.data_crc
= msgr
->crcflags
& MSG_CRC_DATA
? footer
.data_crc
: 0;
2542 old_footer
.flags
= footer
.flags
;
2543 msgvec
[msg
.msg_iovlen
].iov_base
= (char*)&old_footer
;
2544 msgvec
[msg
.msg_iovlen
].iov_len
= sizeof(old_footer
);
2545 msglen
+= sizeof(old_footer
);
2550 if (do_sendmsg(&msg
, msglen
))
2564 int Pipe::tcp_read(char *buf
, unsigned len
)
2571 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2572 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2573 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2574 ::shutdown(sd
, SHUT_RDWR
);
2578 if (tcp_read_wait() < 0)
2581 ssize_t got
= tcp_read_nonblocking(buf
, len
);
2588 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2593 int Pipe::tcp_read_wait()
2600 pfd
.events
= POLLIN
;
2601 #if defined(__linux__)
2602 pfd
.events
|= POLLRDHUP
;
2605 if (has_pending_data())
2608 int r
= poll(&pfd
, 1, msgr
->timeout
);
2614 evmask
= POLLERR
| POLLHUP
| POLLNVAL
;
2615 #if defined(__linux__)
2616 evmask
|= POLLRDHUP
;
2618 if (pfd
.revents
& evmask
)
2621 if (!(pfd
.revents
& POLLIN
))
2627 ssize_t
Pipe::do_recv(char *buf
, size_t len
, int flags
)
2630 ssize_t got
= ::recv( sd
, buf
, len
, flags
);
2632 if (errno
== EINTR
) {
2635 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2636 << got
<< " " << cpp_strerror(errno
) << dendl
;
2645 ssize_t
Pipe::buffered_recv(char *buf
, size_t len
, int flags
)
2648 ssize_t total_recv
= 0;
2649 if (recv_len
> recv_ofs
) {
2650 int to_read
= MIN(recv_len
- recv_ofs
, left
);
2651 memcpy(buf
, &recv_buf
[recv_ofs
], to_read
);
2652 recv_ofs
+= to_read
;
2658 total_recv
+= to_read
;
2661 /* nothing left in the prefetch buffer */
2663 if (left
> recv_max_prefetch
) {
2664 /* this was a large read, we don't prefetch for these */
2665 ssize_t ret
= do_recv(buf
, left
, flags
);
2676 ssize_t got
= do_recv(recv_buf
, recv_max_prefetch
, flags
);
2684 recv_len
= (size_t)got
;
2685 got
= MIN(left
, (size_t)got
);
2686 memcpy(buf
, recv_buf
, got
);
2692 ssize_t
Pipe::tcp_read_nonblocking(char *buf
, unsigned len
)
2694 ssize_t got
= buffered_recv(buf
, len
, MSG_DONTWAIT
);
2696 ldout(msgr
->cct
, 10) << __func__
<< " socket " << sd
<< " returned "
2697 << got
<< " " << cpp_strerror(errno
) << dendl
;
2701 /* poll() said there was data, but we didn't read any - peer
2702 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2703 * standard socket behavior as documented by Stevens.
2710 int Pipe::tcp_write(const char *buf
, unsigned len
)
2716 pfd
.events
= POLLOUT
| POLLHUP
| POLLNVAL
| POLLERR
;
2717 #if defined(__linux__)
2718 pfd
.events
|= POLLRDHUP
;
2721 if (msgr
->cct
->_conf
->ms_inject_socket_failures
&& sd
>= 0) {
2722 if (rand() % msgr
->cct
->_conf
->ms_inject_socket_failures
== 0) {
2723 ldout(msgr
->cct
, 0) << "injecting socket failure" << dendl
;
2724 ::shutdown(sd
, SHUT_RDWR
);
2728 if (poll(&pfd
, 1, -1) < 0)
2731 if (!(pfd
.revents
& POLLOUT
))
2734 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2740 #if defined(MSG_NOSIGNAL)
2741 did
= ::send( sd
, buf
, len
, MSG_NOSIGNAL
);
2743 did
= ::send( sd
, buf
, len
, 0);
2746 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2747 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2752 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;