]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/Pipe.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / msg / simple / Pipe.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <sys/uio.h>
21 #include <limits.h>
22 #include <poll.h>
23
24 #include "msg/Message.h"
25 #include "Pipe.h"
26 #include "SimpleMessenger.h"
27
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
31
32 // Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
33
34 #include "auth/cephx/CephxProtocol.h"
35 #include "auth/AuthSessionHandler.h"
36
37 #include "include/compat.h"
38 #include "include/sock_compat.h"
39 #include "include/random.h"
40
41 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
42 #define SEQ_MASK 0x7fffffff
43 #define dout_subsys ceph_subsys_ms
44
45 #undef dout_prefix
46 #define dout_prefix *_dout << *this
47 ostream& Pipe::_pipe_prefix(std::ostream &out) const {
48 return out << "-- " << msgr->get_myaddr_legacy() << " >> " << peer_addr
49 << " pipe(" << this
50 << " sd=" << sd << " :" << port
51 << " s=" << state
52 << " pgs=" << peer_global_seq
53 << " cs=" << connect_seq
54 << " l=" << policy.lossy
55 << " c=" << connection_state
56 << ").";
57 }
58
59 ostream& operator<<(ostream &out, const Pipe &pipe) {
60 return pipe._pipe_prefix(out);
61 }
62
63 /**
64 * The DelayedDelivery is for injecting delays into Message delivery off
65 * the socket. It is only enabled if delays are requested, and if they
66 * are then it pulls Messages off the DelayQueue and puts them into the
67 * in_q (SimpleMessenger::dispatch_queue).
68 * Please note that this probably has issues with Pipe shutdown and
69 * replacement semantics. I've tried, but no guarantees.
70 */
class Pipe::DelayedDelivery: public Thread {
  Pipe *pipe;                      // Pipe we deliver for; may be reassigned via steal_for_pipe()
  std::deque< pair<utime_t,Message*> > delay_queue;  // (release-time, message) pairs, FIFO
  Mutex delay_lock;                // guards every field below plus delay_queue
  Cond delay_cond;                 // signaled on queue/flush/stop state changes
  int flush_count;                 // how many queued messages to deliver without further delay
  bool active_flush;               // a flushed message is being delivered right now
  bool stop_delayed_delivery;      // asks entry() to exit its loop
  bool delay_dispatching; // we are in fast dispatch now
  bool stop_fast_dispatching_flag; // we need to stop fast dispatching

 public:
  explicit DelayedDelivery(Pipe *p)
    : pipe(p),
      delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
      active_flush(false),
      stop_delayed_delivery(false),
      delay_dispatching(false),
      stop_fast_dispatching_flag(false) { }
  ~DelayedDelivery() override {
    // Drop anything still queued so we don't leak Message refs.
    discard();
  }
  // Thread body: sleeps until the head message's release time, then delivers.
  void *entry() override;
  // Enqueue m for delivery no earlier than 'release' and wake the thread.
  void queue(utime_t release, Message *m) {
    Mutex::Locker l(delay_lock);
    delay_queue.push_back(make_pair(release, m));
    delay_cond.Signal();
  }
  // Drop all queued messages, returning their dispatch-throttle budget.
  void discard();
  // Mark everything currently queued for immediate (undelayed) delivery.
  void flush();
  // True while a flush() has not fully drained yet.
  bool is_flushing() {
    Mutex::Locker l(delay_lock);
    return flush_count > 0 || active_flush;
  }
  // Block the caller until a pending flush() has fully drained.
  void wait_for_flush() {
    Mutex::Locker l(delay_lock);
    while (flush_count > 0 || active_flush)
      delay_cond.Wait(delay_lock);
  }
  // Ask the delivery thread to exit; does not join it.
  void stop() {
    delay_lock.Lock();
    stop_delayed_delivery = true;
    delay_cond.Signal();
    delay_lock.Unlock();
  }
  // Re-point delivery at new_owner (used when a Pipe is replaced on accept).
  void steal_for_pipe(Pipe *new_owner) {
    Mutex::Locker l(delay_lock);
    pipe = new_owner;
  }
  /**
   * We need to stop fast dispatching before we need to stop putting
   * normal messages into the DispatchQueue.
   */
  void stop_fast_dispatching();
};
126
127 /**************************************
128 * Pipe
129 */
130
Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
  : RefCountedObject(r->cct),
    reader_thread(this),
    writer_thread(this),
    delay_thread(NULL),
    msgr(r),
    conn_id(r->dispatch_queue.get_id()),
    recv_ofs(0),
    recv_len(0),
    sd(-1), port(0),
    peer_type(-1),
    pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
    state(st),
    connection_state(NULL),
    reader_running(false), reader_needs_join(false),
    reader_dispatching(false), notify_on_dispatch_done(false),
    writer_running(false),
    in_q(&(r->dispatch_queue)),
    send_keepalive(false),
    send_keepalive_ack(false),
    connect_seq(0), peer_global_seq(0),
    out_seq(0), in_seq(0), in_seq_acked(0) {
  // These fields are read cross-thread in ways the race detector flags but
  // the code treats as benign; suppress the warnings.
  ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
  ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
  if (con) {
    // Adopt the caller-supplied Connection and make it reference us.
    connection_state = con;
    connection_state->reset_pipe(this);
  } else {
    // No Connection given: create one that holds a ref to this Pipe.
    connection_state = new PipeConnection(msgr->cct, msgr);
    connection_state->pipe = get();
  }

  // Start out_seq at a random value (see randomize_out_seq for rationale).
  randomize_out_seq();

  msgr->timeout = msgr->cct->_conf->ms_connection_idle_timeout * 1000; //convert to ms
  if (msgr->timeout == 0)
    msgr->timeout = -1;  // 0 (disabled) becomes -1, i.e. no idle timeout

  // Prefetch buffer for socket reads, sized by config.
  recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
  recv_buf = new char[recv_max_prefetch];
}
174
Pipe::~Pipe()
{
  // All outgoing and in-flight messages must be drained before destruction.
  ceph_assert(out_q.empty());
  ceph_assert(sent.empty());
  delete delay_thread;
  delete[] recv_buf;
}
182
183 void Pipe::handle_ack(uint64_t seq)
184 {
185 lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
186 // trim sent list
187 while (!sent.empty() &&
188 sent.front()->get_seq() <= seq) {
189 Message *m = sent.front();
190 sent.pop_front();
191 lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
192 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
193 m->put();
194 }
195 }
196
197 void Pipe::start_reader()
198 {
199 ceph_assert(pipe_lock.is_locked());
200 ceph_assert(!reader_running);
201 if (reader_needs_join) {
202 reader_thread.join();
203 reader_needs_join = false;
204 }
205 reader_running = true;
206 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
207 }
208
209 void Pipe::maybe_start_delay_thread()
210 {
211 if (!delay_thread) {
212 auto pos = msgr->cct->_conf.get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
213 if (pos != string::npos) {
214 lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
215 delay_thread = new DelayedDelivery(this);
216 delay_thread->create("ms_pipe_delay");
217 }
218 }
219 }
220
// Spawn the writer thread. Caller holds pipe_lock; a writer must not
// already be running.
void Pipe::start_writer()
{
  ceph_assert(pipe_lock.is_locked());
  ceph_assert(!writer_running);
  writer_running = true;
  writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
228
// Wake the reader thread and join it. Caller holds pipe_lock; we must
// drop it across the join so the reader can acquire it on its way out.
void Pipe::join_reader()
{
  if (!reader_running)
    return;
  cond.Signal();
  pipe_lock.Unlock();
  reader_thread.join();
  pipe_lock.Lock();
  reader_needs_join = false;
}
239
240 void Pipe::DelayedDelivery::discard()
241 {
242 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
243 Mutex::Locker l(delay_lock);
244 while (!delay_queue.empty()) {
245 Message *m = delay_queue.front().second;
246 pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
247 m->put();
248 delay_queue.pop_front();
249 }
250 }
251
// Request that everything currently queued be delivered without further
// delay. entry() decrements flush_count as it hands each message off.
void Pipe::DelayedDelivery::flush()
{
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
  Mutex::Locker l(delay_lock);
  flush_count = delay_queue.size();
  delay_cond.Signal();
}
259
// Delivery-thread main loop: hold delay_lock except while fast-dispatching,
// sleep until the head message is due (or a flush/stop is requested), then
// hand each message to the dispatch queue.
void *Pipe::DelayedDelivery::entry()
{
  Mutex::Locker locker(delay_lock);
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;

  while (!stop_delayed_delivery) {
    if (delay_queue.empty()) {
      lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
      delay_cond.Wait(delay_lock);
      continue;
    }
    utime_t release = delay_queue.front().first;
    Message *m = delay_queue.front().second;
    string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
    // Sleep until the release time unless a flush is pending (flush_count)
    // or this message type is exempt from injected delay.
    if (!flush_count &&
        (release > ceph_clock_now() &&
         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
      lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
      delay_cond.WaitUntil(delay_lock, release);
      continue;
    }
    lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
    delay_queue.pop_front();
    if (flush_count > 0) {
      --flush_count;
      active_flush = true;
    }
    if (pipe->in_q->can_fast_dispatch(m)) {
      if (!stop_fast_dispatching_flag) {
        delay_dispatching = true;
        // Must drop delay_lock across fast_dispatch to avoid lock cycles.
        delay_lock.Unlock();
        pipe->in_q->fast_dispatch(m);
        delay_lock.Lock();
        delay_dispatching = false;
        if (stop_fast_dispatching_flag) {
          // we need to let the stopping thread proceed
          delay_cond.Signal();
          delay_lock.Unlock();
          delay_lock.Lock();
        }
      }
    } else {
      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
    }
    active_flush = false;
  }
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
  return NULL;
}
309
// Forbid further fast dispatch and wait out any fast dispatch that is
// already in flight (entry() signals delay_cond when it finishes one).
void Pipe::DelayedDelivery::stop_fast_dispatching() {
  Mutex::Locker l(delay_lock);
  stop_fast_dispatching_flag = true;
  while (delay_dispatching)
    delay_cond.Wait(delay_lock);
}
316
317
318 int Pipe::accept()
319 {
320 ldout(msgr->cct,10) << "accept" << dendl;
321 ceph_assert(pipe_lock.is_locked());
322 ceph_assert(state == STATE_ACCEPTING);
323
324 pipe_lock.Unlock();
325
326 // vars
327 bufferlist addrs;
328 entity_addr_t socket_addr;
329 socklen_t len;
330 int r;
331 char banner[strlen(CEPH_BANNER)+1];
332 bufferlist addrbl;
333 ceph_msg_connect connect;
334 ceph_msg_connect_reply reply;
335 Pipe *existing = 0;
336 bufferptr bp;
337 bufferlist authorizer, authorizer_reply;
338 bool authorizer_valid;
339 uint64_t feat_missing;
340 bool replaced = false;
341 // this variable denotes if the connection attempt from peer is a hard
342 // reset or not, it is true if there is an existing connection and the
343 // connection sequence from peer is equal to zero
344 bool is_reset_from_peer = false;
345 CryptoKey session_key;
346 int removed; // single-use down below
347
348 // this should roughly mirror pseudocode at
349 // http://ceph.com/wiki/Messaging_protocol
350 int reply_tag = 0;
351 uint64_t existing_seq = -1;
352
353 // used for reading in the remote acked seq on connect
354 uint64_t newly_acked_seq = 0;
355
356 bool need_challenge = false;
357 bool had_challenge = false;
358 std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
359
360 recv_reset();
361
362 set_socket_options();
363
364 // announce myself.
365 r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
366 if (r < 0) {
367 ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
368 goto fail_unlocked;
369 }
370
371 // and my addr
372 encode(msgr->my_addr, addrs, 0); // legacy
373
374 port = msgr->my_addr.get_port();
375
376 // and peer's socket addr (they might not know their ip)
377 sockaddr_storage ss;
378 len = sizeof(ss);
379 r = ::getpeername(sd, (sockaddr*)&ss, &len);
380 if (r < 0) {
381 ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
382 goto fail_unlocked;
383 }
384 socket_addr.set_sockaddr((sockaddr*)&ss);
385 encode(socket_addr, addrs, 0); // legacy
386
387 r = tcp_write(addrs.c_str(), addrs.length());
388 if (r < 0) {
389 ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
390 goto fail_unlocked;
391 }
392
393 ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
394
395 // identify peer
396 if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
397 ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
398 goto fail_unlocked;
399 }
400 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
401 banner[strlen(CEPH_BANNER)] = 0;
402 ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
403 goto fail_unlocked;
404 }
405 {
406 bufferptr tp(sizeof(ceph_entity_addr));
407 addrbl.push_back(std::move(tp));
408 }
409 if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
410 ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
411 goto fail_unlocked;
412 }
413 try {
414 auto ti = addrbl.cbegin();
415 decode(peer_addr, ti);
416 } catch (const buffer::error& e) {
417 ldout(msgr->cct,2) << __func__ << " decode peer_addr failed: " << e.what()
418 << dendl;
419 goto fail_unlocked;
420 }
421
422 ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
423 if (peer_addr.is_blank_ip()) {
424 // peer apparently doesn't know what ip they have; figure it out for them.
425 int port = peer_addr.get_port();
426 peer_addr.u = socket_addr.u;
427 peer_addr.set_port(port);
428 ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
429 << " (socket is " << socket_addr << ")" << dendl;
430 }
431 set_peer_addr(peer_addr); // so that connection_state gets set up
432
433 while (1) {
434 if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
435 ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
436 goto fail_unlocked;
437 }
438
439 authorizer.clear();
440 if (connect.authorizer_len) {
441 bp = buffer::create(connect.authorizer_len);
442 if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
443 ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
444 goto fail_unlocked;
445 }
446 authorizer.push_back(std::move(bp));
447 authorizer_reply.clear();
448 }
449
450 ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
451 << " global_seq " << connect.global_seq
452 << dendl;
453
454 msgr->lock.Lock(); // FIXME
455 pipe_lock.Lock();
456 if (msgr->dispatch_queue.stop)
457 goto shutting_down;
458 if (state != STATE_ACCEPTING) {
459 goto shutting_down;
460 }
461
462 // note peer's type, flags
463 set_peer_type(connect.host_type);
464 policy = msgr->get_policy(connect.host_type);
465 ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
466 << ", policy.lossy=" << policy.lossy
467 << " policy.server=" << policy.server
468 << " policy.standby=" << policy.standby
469 << " policy.resetcheck=" << policy.resetcheck
470 << dendl;
471
472 memset(&reply, 0, sizeof(reply));
473 reply.protocol_version = msgr->get_proto_version(peer_type, false);
474 msgr->lock.Unlock();
475
476 // mismatch?
477 ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
478 << ", their proto " << connect.protocol_version << dendl;
479 if (connect.protocol_version != reply.protocol_version) {
480 reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
481 goto reply;
482 }
483
484 // require signatures for cephx?
485 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
486 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
487 peer_type == CEPH_ENTITY_TYPE_MDS ||
488 peer_type == CEPH_ENTITY_TYPE_MGR) {
489 if (msgr->cct->_conf->cephx_require_signatures ||
490 msgr->cct->_conf->cephx_cluster_require_signatures) {
491 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
492 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
493 }
494 if (msgr->cct->_conf->cephx_require_version >= 2 ||
495 msgr->cct->_conf->cephx_cluster_require_version >= 2) {
496 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
497 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
498 }
499 } else {
500 if (msgr->cct->_conf->cephx_require_signatures ||
501 msgr->cct->_conf->cephx_service_require_signatures) {
502 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
503 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
504 }
505 if (msgr->cct->_conf->cephx_require_version >= 2 ||
506 msgr->cct->_conf->cephx_service_require_version >= 2) {
507 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
508 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
509 }
510 }
511 }
512
513 feat_missing = policy.features_required & ~(uint64_t)connect.features;
514 if (feat_missing) {
515 ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
516 reply.tag = CEPH_MSGR_TAG_FEATURES;
517 goto reply;
518 }
519
520 // Check the authorizer. If not good, bail out.
521
522 pipe_lock.Unlock();
523
524 need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
525 had_challenge = (bool)authorizer_challenge;
526 authorizer_reply.clear();
527 if (!msgr->ms_deliver_verify_authorizer(
528 connection_state.get(), peer_type, connect.authorizer_protocol,
529 authorizer,
530 authorizer_reply, authorizer_valid, session_key,
531 nullptr /* connection_secret */,
532 need_challenge ? &authorizer_challenge : nullptr) ||
533 !authorizer_valid) {
534 pipe_lock.Lock();
535 if (state != STATE_ACCEPTING)
536 goto shutting_down_msgr_unlocked;
537 if (!had_challenge && need_challenge && authorizer_challenge) {
538 ldout(msgr->cct,10) << "accept: challenging authorizer "
539 << authorizer_reply.length()
540 << " bytes" << dendl;
541 ceph_assert(authorizer_reply.length());
542 reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
543 } else {
544 ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
545 reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
546 }
547 session_security.reset();
548 goto reply;
549 }
550
551 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
552
553 ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
554
555 retry_existing_lookup:
556 msgr->lock.Lock();
557 pipe_lock.Lock();
558 if (msgr->dispatch_queue.stop)
559 goto shutting_down;
560 if (state != STATE_ACCEPTING)
561 goto shutting_down;
562
563 // existing?
564 existing = msgr->_lookup_pipe(peer_addr);
565 if (existing) {
566 existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
567 if (existing->reader_dispatching) {
568 /** we need to wait, or we can deadlock if downstream
569 * fast_dispatchers are (naughtily!) waiting on resources
570 * held by somebody trying to make use of the SimpleMessenger lock.
571 * So drop locks, wait, and retry. It just looks like a slow network
572 * to everybody else.
573 *
574 * We take a ref to existing here since it might get reaped before we
575 * wake up (see bug #15870). We can be confident that it lived until
576 * locked it since we held the msgr lock from _lookup_pipe through to
577 * locking existing->lock and checking reader_dispatching.
578 */
579 existing->get();
580 pipe_lock.Unlock();
581 msgr->lock.Unlock();
582 existing->notify_on_dispatch_done = true;
583 while (existing->reader_dispatching)
584 existing->cond.Wait(existing->pipe_lock);
585 existing->pipe_lock.Unlock();
586 existing->put();
587 existing = nullptr;
588 goto retry_existing_lookup;
589 }
590
591 if (connect.global_seq < existing->peer_global_seq) {
592 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
593 << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
594 reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
595 reply.global_seq = existing->peer_global_seq; // so we can send it below..
596 existing->pipe_lock.Unlock();
597 msgr->lock.Unlock();
598 goto reply;
599 } else {
600 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
601 << " <= " << connect.global_seq << ", looks ok" << dendl;
602 }
603
604 if (existing->policy.lossy) {
605 ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
606 << policy.lossy << ")" << dendl;
607 existing->was_session_reset();
608 goto replace;
609 }
610
611 ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
612 << " vs existing " << existing->connect_seq
613 << " state " << existing->get_state_name() << dendl;
614
615 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
616 ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
617 // this is a hard reset from peer
618 is_reset_from_peer = true;
619 if (policy.resetcheck)
620 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
621 goto replace;
622 }
623
624 if (connect.connect_seq < existing->connect_seq) {
625 // old attempt, or we sent READY but they didn't get it.
626 ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
627 << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
628 goto retry_session;
629 }
630
631 if (connect.connect_seq == existing->connect_seq) {
632 // if the existing connection successfully opened, and/or
633 // subsequently went to standby, then the peer should bump
634 // their connect_seq and retry: this is not a connection race
635 // we need to resolve here.
636 if (existing->state == STATE_OPEN ||
637 existing->state == STATE_STANDBY) {
638 ldout(msgr->cct,10) << "accept connection race, existing " << existing
639 << ".cseq " << existing->connect_seq
640 << " == " << connect.connect_seq
641 << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
642 goto retry_session;
643 }
644
645 // connection race?
646 if (peer_addr < msgr->my_addr ||
647 existing->policy.server) {
648 // incoming wins
649 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
650 << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
651 if (!(existing->state == STATE_CONNECTING ||
652 existing->state == STATE_WAIT))
653 lderr(msgr->cct) << "accept race bad state, would replace, existing="
654 << existing->get_state_name()
655 << " " << existing << ".cseq=" << existing->connect_seq
656 << " == " << connect.connect_seq
657 << dendl;
658 ceph_assert(existing->state == STATE_CONNECTING ||
659 existing->state == STATE_WAIT);
660 goto replace;
661 } else {
662 // our existing outgoing wins
663 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
664 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
665 ceph_assert(peer_addr > msgr->my_addr);
666 if (!(existing->state == STATE_CONNECTING))
667 lderr(msgr->cct) << "accept race bad state, would send wait, existing="
668 << existing->get_state_name()
669 << " " << existing << ".cseq=" << existing->connect_seq
670 << " == " << connect.connect_seq
671 << dendl;
672 ceph_assert(existing->state == STATE_CONNECTING);
673 // make sure our outgoing connection will follow through
674 existing->_send_keepalive();
675 reply.tag = CEPH_MSGR_TAG_WAIT;
676 existing->pipe_lock.Unlock();
677 msgr->lock.Unlock();
678 goto reply;
679 }
680 }
681
682 ceph_assert(connect.connect_seq > existing->connect_seq);
683 ceph_assert(connect.global_seq >= existing->peer_global_seq);
684 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
685 existing->connect_seq == 0) {
686 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
687 << ", " << existing << ".cseq = " << existing->connect_seq
688 << "), sending RESETSESSION" << dendl;
689 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
690 msgr->lock.Unlock();
691 existing->pipe_lock.Unlock();
692 goto reply;
693 }
694
695 // reconnect
696 ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
697 << " > " << existing->connect_seq << dendl;
698 goto replace;
699 } // existing
700 else if (connect.connect_seq > 0) {
701 // we reset, and they are opening a new session
702 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
703 msgr->lock.Unlock();
704 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
705 goto reply;
706 } else {
707 // new session
708 ldout(msgr->cct,10) << "accept new session" << dendl;
709 existing = NULL;
710 goto open;
711 }
712 ceph_abort();
713
714 retry_session:
715 ceph_assert(existing->pipe_lock.is_locked());
716 ceph_assert(pipe_lock.is_locked());
717 reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
718 reply.connect_seq = existing->connect_seq + 1;
719 existing->pipe_lock.Unlock();
720 msgr->lock.Unlock();
721 goto reply;
722
723 reply:
724 ceph_assert(pipe_lock.is_locked());
725 reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
726 reply.authorizer_len = authorizer_reply.length();
727 pipe_lock.Unlock();
728 r = tcp_write((char*)&reply, sizeof(reply));
729 if (r < 0)
730 goto fail_unlocked;
731 if (reply.authorizer_len) {
732 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
733 if (r < 0)
734 goto fail_unlocked;
735 }
736 }
737
738 replace:
739 ceph_assert(existing->pipe_lock.is_locked());
740 ceph_assert(pipe_lock.is_locked());
741 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
742 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
743 reply_tag = CEPH_MSGR_TAG_SEQ;
744 existing_seq = existing->in_seq;
745 }
746 ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
747 existing->stop();
748 existing->unregister_pipe();
749 replaced = true;
750
751 if (existing->policy.lossy) {
752 // disconnect from the Connection
753 ceph_assert(existing->connection_state);
754 if (existing->connection_state->clear_pipe(existing))
755 msgr->dispatch_queue.queue_reset(existing->connection_state.get());
756 } else {
757 // queue a reset on the new connection, which we're dumping for the old
758 msgr->dispatch_queue.queue_reset(connection_state.get());
759
760 // drop my Connection, and take a ref to the existing one. do not
761 // clear existing->connection_state, since read_message and
762 // write_message both dereference it without pipe_lock.
763 connection_state = existing->connection_state;
764
765 // make existing Connection reference us
766 connection_state->reset_pipe(this);
767
768 if (existing->delay_thread) {
769 existing->delay_thread->steal_for_pipe(this);
770 delay_thread = existing->delay_thread;
771 existing->delay_thread = NULL;
772 delay_thread->flush();
773 }
774
775 // steal incoming queue
776 uint64_t replaced_conn_id = conn_id;
777 conn_id = existing->conn_id;
778 existing->conn_id = replaced_conn_id;
779
780 // reset the in_seq if this is a hard reset from peer,
781 // otherwise we respect our original connection's value
782 in_seq = is_reset_from_peer ? 0 : existing->in_seq;
783 in_seq_acked = in_seq;
784
785 // steal outgoing queue and out_seq
786 existing->requeue_sent();
787 out_seq = existing->out_seq;
788 ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
789 for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
790 p != existing->out_q.end();
791 ++p)
792 out_q[p->first].splice(out_q[p->first].begin(), p->second);
793 }
794 existing->stop_and_wait();
795 existing->pipe_lock.Unlock();
796
797 open:
798 // open
799 ceph_assert(pipe_lock.is_locked());
800 connect_seq = connect.connect_seq + 1;
801 peer_global_seq = connect.global_seq;
802 ceph_assert(state == STATE_ACCEPTING);
803 state = STATE_OPEN;
804 ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
805
806 // send READY reply
807 reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
808 reply.features = policy.features_supported;
809 reply.global_seq = msgr->get_global_seq();
810 reply.connect_seq = connect_seq;
811 reply.flags = 0;
812 reply.authorizer_len = authorizer_reply.length();
813 if (policy.lossy)
814 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
815
816 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
817 ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
818
819 session_security.reset(
820 get_auth_session_handler(msgr->cct,
821 connect.authorizer_protocol,
822 session_key,
823 connection_state->get_features()));
824
825 // notify
826 msgr->dispatch_queue.queue_accept(connection_state.get());
827 msgr->ms_deliver_handle_fast_accept(connection_state.get());
828
829 // ok!
830 if (msgr->dispatch_queue.stop)
831 goto shutting_down;
832 removed = msgr->accepting_pipes.erase(this);
833 ceph_assert(removed == 1);
834 register_pipe();
835 msgr->lock.Unlock();
836 pipe_lock.Unlock();
837
838 r = tcp_write((char*)&reply, sizeof(reply));
839 if (r < 0) {
840 goto fail_registered;
841 }
842
843 if (reply.authorizer_len) {
844 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
845 if (r < 0) {
846 goto fail_registered;
847 }
848 }
849
850 if (reply_tag == CEPH_MSGR_TAG_SEQ) {
851 if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
852 ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
853 goto fail_registered;
854 }
855 if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
856 ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
857 goto fail_registered;
858 }
859 }
860
861 pipe_lock.Lock();
862 discard_requeued_up_to(newly_acked_seq);
863 if (state != STATE_CLOSED) {
864 ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
865 start_writer();
866 }
867 ldout(msgr->cct,20) << "accept done" << dendl;
868
869 maybe_start_delay_thread();
870
871 return 0; // success.
872
873 fail_registered:
874 ldout(msgr->cct, 10) << "accept fault after register" << dendl;
875
876 if (msgr->cct->_conf->ms_inject_internal_delays) {
877 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
878 utime_t t;
879 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
880 t.sleep();
881 }
882
883 fail_unlocked:
884 pipe_lock.Lock();
885 if (state != STATE_CLOSED) {
886 bool queued = is_queued();
887 ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl;
888 if (queued) {
889 state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
890 } else if (replaced) {
891 state = STATE_STANDBY;
892 } else {
893 state = STATE_CLOSED;
894 state_closed = true;
895 }
896 fault();
897 if (queued || replaced)
898 start_writer();
899 }
900 return -1;
901
902 shutting_down:
903 msgr->lock.Unlock();
904 shutting_down_msgr_unlocked:
905 ceph_assert(pipe_lock.is_locked());
906
907 if (msgr->cct->_conf->ms_inject_internal_delays) {
908 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
909 utime_t t;
910 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
911 t.sleep();
912 }
913
914 state = STATE_CLOSED;
915 state_closed = true;
916 fault();
917 return -1;
918 }
919
// Apply tuning options to the socket fd 'sd' (called for both outgoing
// and accepted connections).  Every setsockopt failure here is logged
// and otherwise ignored: these options are optimizations, not
// requirements, so the connection proceeds regardless.
void Pipe::set_socket_options()
{
  // disable Nagle algorithm?
  if (msgr->cct->_conf->ms_tcp_nodelay) {
    int flag = 1;
    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
                         << cpp_strerror(r) << dendl;
    }
  }
  // explicit receive buffer size; 0 (the default) leaves the system value
  if (msgr->cct->_conf->ms_tcp_rcvbuf) {
    int size = msgr->cct->_conf->ms_tcp_rcvbuf;
    int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
                         << ": " << cpp_strerror(r) << dendl;
    }
  }

  // block ESIGPIPE
#ifdef CEPH_USE_SO_NOSIGPIPE
  int val = 1;
  int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
  if (r) {
    r = -errno;
    ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
                       << cpp_strerror(r) << dendl;
  }
#endif

#ifdef SO_PRIORITY
  int prio = msgr->get_socket_priority();
  if (prio >= 0) {
    int r = -1;
#ifdef IPTOS_CLASS_CS6
    int iptos = IPTOS_CLASS_CS6;
    int addr_family = 0;
    // pick the address family from the peer if known, else from our own
    // legacy address, so we issue the matching IP_TOS/IPV6_TCLASS call
    if (!peer_addr.is_blank_ip()) {
      addr_family = peer_addr.get_family();
    } else {
      addr_family = msgr->get_myaddr_legacy().get_family();
    }
    switch (addr_family) {
    case AF_INET:
      r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
      break;
    case AF_INET6:
      r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
      break;
    default:
      lderr(msgr->cct) << "couldn't set ToS of unknown family ("
                       << addr_family << ")"
                       << " to " << iptos << dendl;
      return;
    }
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
                         << ": " << cpp_strerror(r) << dendl;
    }
#endif
    // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
    // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
    // We need to call setsockopt(SO_PRIORITY) after it.
    r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
                         << ": " << cpp_strerror(r) << dendl;
    }
  }
#endif
}
996
/**
 * Client side of the msgr v1 handshake: (re)establish a session to peer_addr.
 *
 * Called with pipe_lock held (state normally STATE_CONNECTING).  The lock is
 * dropped for all blocking network I/O and reacquired before connection state
 * is examined or modified.  On success the pipe is STATE_OPEN, the reader
 * thread is (re)started, and 0 is returned.  On failure a negative value is
 * returned and fault() decides whether to retry, stand by, or close.
 *
 * Fixes vs. previous revision:
 *  - the "connect got bad tag" message now logs reply.tag (the tag actually
 *    received); it previously logged a local 'tag' variable that was never
 *    assigned and therefore always printed -1.
 *  - the bind-failure log no longer emits the stray "error " << ", " pair.
 *  - unused locals 'tag' and 'socket_addr' removed.
 */
int Pipe::connect()
{
  ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
  ceph_assert(pipe_lock.is_locked());

  __u32 cseq = connect_seq;
  __u32 gseq = msgr->get_global_seq();

  // stop reader thread
  join_reader();

  pipe_lock.Unlock();

  int rc = -1;
  struct msghdr msg;
  struct iovec msgvec[2];
  int msglen;
  char banner[strlen(CEPH_BANNER) + 1];  // extra byte makes coverity happy
  entity_addr_t paddr;
  entity_addr_t peer_addr_for_me;
  AuthAuthorizer *authorizer = NULL;
  bufferlist addrbl, myaddrbl;
  const auto& conf = msgr->cct->_conf;

  // close old socket.  this is safe because we stopped the reader thread above.
  if (sd >= 0)
    ::close(sd);

  // create socket?
  sd = socket_cloexec(peer_addr.get_family(), SOCK_STREAM, 0);
  if (sd < 0) {
    int e = errno;
    lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(e) << dendl;
    rc = -e;
    goto fail;
  }

  recv_reset();

  set_socket_options();

  {
    // optionally bind to our own IP (any port) before connecting, so the
    // peer sees a predictable source address
    entity_addr_t addr2bind = msgr->get_myaddr_legacy();
    if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
      addr2bind.set_port(0);
      int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
      if (r < 0) {
        ldout(msgr->cct,2) << "client bind error, " << cpp_strerror(errno) << dendl;
        goto fail;
      }
    }
  }

  // connect!
  ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
  rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
  if (rc < 0) {
    int stored_errno = errno;
    ldout(msgr->cct,2) << "connect error " << peer_addr
                       << ", " << cpp_strerror(stored_errno) << dendl;
    if (stored_errno == ECONNREFUSED) {
      ldout(msgr->cct, 2) << "connection refused!" << dendl;
      msgr->dispatch_queue.queue_refused(connection_state.get());
    }
    goto fail;
  }

  // verify banner
  // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
  rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
  if (rc < 0) {
    ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
    goto fail;
  }
  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
    ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
    goto fail;
  }

  // echo the banner back to the peer
  memset(&msg, 0, sizeof(msg));
  msgvec[0].iov_base = banner;
  msgvec[0].iov_len = strlen(CEPH_BANNER);
  msg.msg_iov = msgvec;
  msg.msg_iovlen = 1;
  msglen = msgvec[0].iov_len;
  rc = do_sendmsg(&msg, msglen);
  if (rc < 0) {
    ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
    goto fail;
  }

  // identify peer: it sends its own address followed by the address it
  // sees us as (which we may learn from, below)
  {
#if defined(__linux__) || defined(__APPLE__) || defined(__FreeBSD__)
    bufferptr p(sizeof(ceph_entity_addr) * 2);
#else
    int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
    bufferptr p(wirelen * 2);
#endif
    addrbl.push_back(std::move(p));
  }
  rc = tcp_read(addrbl.c_str(), addrbl.length());
  if (rc < 0) {
    ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
    goto fail;
  }
  try {
    auto p = addrbl.cbegin();
    decode(paddr, p);
    decode(peer_addr_for_me, p);
  }
  catch (buffer::error& e) {
    ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
                       << dendl;
    goto fail;
  }
  port = peer_addr_for_me.get_port();

  ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
  if (peer_addr != paddr) {
    // a blank IP with matching port+nonce means the peer hasn't learned
    // its own address yet; anything else is the wrong peer
    if (paddr.is_blank_ip() &&
        peer_addr.get_port() == paddr.get_port() &&
        peer_addr.get_nonce() == paddr.get_nonce()) {
      ldout(msgr->cct,0) << "connect claims to be "
                         << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
    } else {
      ldout(msgr->cct,10) << "connect claims to be "
                          << paddr << " not " << peer_addr << dendl;
      goto fail;
    }
  }

  ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;

  msgr->learned_addr(peer_addr_for_me);

  encode(msgr->my_addr, myaddrbl, 0);  // legacy

  memset(&msg, 0, sizeof(msg));
  msgvec[0].iov_base = myaddrbl.c_str();
  msgvec[0].iov_len = myaddrbl.length();
  msg.msg_iov = msgvec;
  msg.msg_iovlen = 1;
  msglen = msgvec[0].iov_len;
  rc = do_sendmsg(&msg, msglen);
  if (rc < 0) {
    ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
    goto fail;
  }
  ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_addr << dendl;


  // handshake loop: send a ceph_msg_connect (plus authorizer payload) and
  // act on the reply tag until we get READY/SEQ, a fatal tag, or an error.
  while (1) {
    if (!authorizer) {
      authorizer = msgr->ms_deliver_get_authorizer(peer_type);
    }
    bufferlist authorizer_reply;

    ceph_msg_connect connect;
    connect.features = policy.features_supported;
    connect.host_type = msgr->get_myname().type();
    connect.global_seq = gseq;
    connect.connect_seq = cseq;
    connect.protocol_version = msgr->get_proto_version(peer_type, true);
    connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
    connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
    if (authorizer)
      ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
                          << " protocol=" << connect.authorizer_protocol << dendl;
    connect.flags = 0;
    if (policy.lossy)
      connect.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
    memset(&msg, 0, sizeof(msg));
    msgvec[0].iov_base = (char*)&connect;
    msgvec[0].iov_len = sizeof(connect);
    msg.msg_iov = msgvec;
    msg.msg_iovlen = 1;
    msglen = msgvec[0].iov_len;
    if (authorizer) {
      msgvec[1].iov_base = authorizer->bl.c_str();
      msgvec[1].iov_len = authorizer->bl.length();
      msg.msg_iovlen++;
      msglen += msgvec[1].iov_len;
    }

    ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
                        << " proto=" << connect.protocol_version << dendl;
    rc = do_sendmsg(&msg, msglen);
    if (rc < 0) {
      ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
      goto fail;
    }

    ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
    ceph_msg_connect_reply reply;
    rc = tcp_read((char*)&reply, sizeof(reply));
    if (rc < 0) {
      ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
      goto fail;
    }

    ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
                        << " connect_seq " << reply.connect_seq
                        << " global_seq " << reply.global_seq
                        << " proto " << reply.protocol_version
                        << " flags " << (int)reply.flags
                        << " features " << reply.features
                        << dendl;

    authorizer_reply.clear();

    if (reply.authorizer_len) {
      ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
      bufferptr bp = buffer::create(reply.authorizer_len);
      rc = tcp_read(bp.c_str(), reply.authorizer_len);
      if (rc < 0) {
        ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
        goto fail;
      }
      authorizer_reply.push_back(bp);
    }

    if (reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
      // NOTE(review): assumes the peer only sends a challenge when we sent
      // an authorizer (authorizer != NULL here) — matches the protocol flow.
      authorizer->add_challenge(msgr->cct, authorizer_reply);
      ldout(msgr->cct,10) << " got authorizer challenge, " << authorizer_reply.length()
                          << " bytes" << dendl;
      continue;
    }

    if (authorizer) {
      auto iter = authorizer_reply.cbegin();
      if (!authorizer->verify_reply(iter, nullptr /* connection_secret */)) {
        ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
        goto fail;
      }
    }

    if (conf->ms_inject_internal_delays) {
      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }

    // reacquire the lock before acting on the reply; someone may have
    // closed or replaced us while we were doing I/O
    pipe_lock.Lock();
    if (state != STATE_CONNECTING) {
      ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
      goto stop_locked;
    }

    if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
      ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
                         << connect.features << " < peer " << reply.features
                         << " missing " << (reply.features & ~policy.features_supported)
                         << std::dec << dendl;
      goto fail_locked;
    }

    if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
      ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
                         << " != " << reply.protocol_version << dendl;
      goto fail_locked;
    }

    if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
      ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
      goto fail_locked;
    }
    if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
      // peer dropped our old session; discard local state and start over
      ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
      was_session_reset();
      cseq = 0;
      pipe_lock.Unlock();
      continue;
    }
    if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
      gseq = msgr->get_global_seq(reply.global_seq);
      ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
                          << " chose new " << gseq << dendl;
      pipe_lock.Unlock();
      continue;
    }
    if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
      ceph_assert(reply.connect_seq > connect_seq);
      ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
                          << " -> " << reply.connect_seq << dendl;
      cseq = connect_seq = reply.connect_seq;
      pipe_lock.Unlock();
      continue;
    }

    if (reply.tag == CEPH_MSGR_TAG_WAIT) {
      // connection race: the peer's outgoing attempt wins, we wait for it
      ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
      state = STATE_WAIT;
      goto stop_locked;
    }

    if (reply.tag == CEPH_MSGR_TAG_READY ||
        reply.tag == CEPH_MSGR_TAG_SEQ) {
      uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
      if (feat_missing) {
        ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
        goto fail_locked;
      }

      if (reply.tag == CEPH_MSGR_TAG_SEQ) {
        ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
        uint64_t newly_acked_seq = 0;
        rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
        if (rc < 0) {
          ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
          goto fail_locked;
        }
        ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
                           << " vs out_seq " << out_seq << dendl;
        // drop messages the peer already received in the previous session
        while (newly_acked_seq > out_seq) {
          Message *m = _get_next_outgoing();
          ceph_assert(m);
          ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
                             << " " << *m << dendl;
          ceph_assert(m->get_seq() <= newly_acked_seq);
          m->put();
          ++out_seq;
        }
        if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
          ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
          goto fail_locked;
        }
      }

      // hooray!
      peer_global_seq = reply.global_seq;
      policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
      state = STATE_OPEN;
      connect_seq = cseq + 1;
      ceph_assert(connect_seq == reply.connect_seq);
      backoff = utime_t();
      connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
      ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
                          << ", features " << connection_state->get_features() << dendl;


      // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
      // connection.  PLR

      if (authorizer != NULL) {
        session_security.reset(
            get_auth_session_handler(
              msgr->cct,
              authorizer->protocol,
              authorizer->session_key,
              connection_state->get_features()));
      } else {
        // We have no authorizer, so we shouldn't be applying security to messages in this pipe.  PLR
        session_security.reset();
      }

      msgr->dispatch_queue.queue_connect(connection_state.get());
      msgr->ms_deliver_handle_fast_connect(connection_state.get());

      if (!reader_running) {
        ldout(msgr->cct,20) << "connect starting reader" << dendl;
        start_reader();
      }
      maybe_start_delay_thread();
      delete authorizer;
      return 0;
    }

    // protocol error: log the tag we actually received
    ldout(msgr->cct,0) << "connect got bad tag " << (int)reply.tag << dendl;
    goto fail_locked;
  }

 fail:
  if (conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }

  pipe_lock.Lock();
 fail_locked:
  if (state == STATE_CONNECTING)
    fault();
  else
    ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
                       << " != connecting, stopping" << dendl;

 stop_locked:
  delete authorizer;
  return rc;
}
1392
1393 void Pipe::register_pipe()
1394 {
1395 ldout(msgr->cct,10) << "register_pipe" << dendl;
1396 ceph_assert(msgr->lock.is_locked());
1397 Pipe *existing = msgr->_lookup_pipe(peer_addr);
1398 ceph_assert(existing == NULL);
1399 msgr->rank_pipe[peer_addr] = this;
1400 }
1401
1402 void Pipe::unregister_pipe()
1403 {
1404 ceph_assert(msgr->lock.is_locked());
1405 ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1406 if (p != msgr->rank_pipe.end() && p->second == this) {
1407 ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1408 msgr->rank_pipe.erase(p);
1409 } else {
1410 ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1411 msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
1412 }
1413 }
1414
// Wait for all of this Pipe's helper threads to exit: the writer and
// reader threads (if they were ever started), then the optional delay
// thread, which is asked to stop before being joined.
void Pipe::join()
{
  ldout(msgr->cct, 20) << "join" << dendl;
  if (writer_thread.is_started())
    writer_thread.join();
  if (reader_thread.is_started())
    reader_thread.join();
  if (delay_thread) {
    ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
    delay_thread->stop();
    delay_thread->join();
  }
}
1428
1429 void Pipe::requeue_sent()
1430 {
1431 if (sent.empty())
1432 return;
1433
1434 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1435 while (!sent.empty()) {
1436 Message *m = sent.back();
1437 sent.pop_back();
1438 ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1439 << " (" << m->get_seq() << ")" << dendl;
1440 rq.push_front(m);
1441 out_seq--;
1442 }
1443 }
1444
1445 void Pipe::discard_requeued_up_to(uint64_t seq)
1446 {
1447 ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1448 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
1449 out_seq = seq;
1450 return;
1451 }
1452 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1453 while (!rq.empty()) {
1454 Message *m = rq.front();
1455 if (m->get_seq() == 0 || m->get_seq() > seq)
1456 break;
1457 ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1458 << " <= " << seq << ", discarding" << dendl;
1459 m->put();
1460 rq.pop_front();
1461 out_seq++;
1462 }
1463 if (rq.empty())
1464 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1465 }
1466
1467 /*
1468 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1469 * Must hold pipe_lock prior to calling.
1470 */
1471 void Pipe::discard_out_queue()
1472 {
1473 ldout(msgr->cct,10) << "discard_queue" << dendl;
1474
1475 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1476 ldout(msgr->cct,20) << " discard " << *p << dendl;
1477 (*p)->put();
1478 }
1479 sent.clear();
1480 for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1481 for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1482 ldout(msgr->cct,20) << " discard " << *r << dendl;
1483 (*r)->put();
1484 }
1485 out_q.clear();
1486 }
1487
// Handle a socket/protocol failure on this pipe.  Caller must hold
// pipe_lock.  'onread' is set when the reader thread noticed the error.
// Depending on policy and state this either: does nothing (already
// closing), tears the pipe down (lossy), parks it in STANDBY, or queues
// a reconnect with exponential backoff.
void Pipe::fault(bool onread)
{
  const auto& conf = msgr->cct->_conf;
  ceph_assert(pipe_lock.is_locked());
  cond.Signal();

  // a read error while the writer is already reconnecting just means the
  // reader should exit; the writer owns recovery.
  if (onread && state == STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
    return;
  }

  ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;

  if (state == STATE_CLOSED ||
      state == STATE_CLOSING) {
    ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
    if (connection_state->clear_pipe(this))
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  shutdown_socket();

  // lossy channel?
  if (policy.lossy && state != STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;

    // disconnect from Connection, and mark it failed.  future messages
    // will be dropped.
    ceph_assert(connection_state);
    stop();
    bool cleared = connection_state->clear_pipe(this);

    // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
    // rank_pipe entry is ignored by others.
    pipe_lock.Unlock();

    if (conf->ms_inject_internal_delays) {
      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }

    // msgr->lock must be taken before pipe_lock, so drop and reacquire
    msgr->lock.Lock();
    pipe_lock.Lock();
    unregister_pipe();
    msgr->lock.Unlock();

    if (delay_thread)
      delay_thread->discard();
    in_q->discard_queue(conn_id);
    discard_out_queue();
    if (cleared)
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  // queue delayed items immediately
  if (delay_thread)
    delay_thread->flush();

  // requeue sent items
  requeue_sent();

  if (policy.standby && !is_queued()) {
    ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
    state = STATE_STANDBY;
    return;
  }

  if (state != STATE_CONNECTING) {
    // first fault since the pipe was open: servers wait for the peer to
    // reconnect, clients initiate a reconnect themselves.
    if (policy.server) {
      ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
      state = STATE_STANDBY;
    } else {
      ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
      connect_seq++;
      state = STATE_CONNECTING;
    }
    backoff = utime_t();
  } else if (backoff == utime_t()) {
    ldout(msgr->cct,0) << "fault" << dendl;
    backoff.set_from_double(conf->ms_initial_backoff);
  } else {
    // repeated connect failures: sleep, then double the backoff up to
    // ms_max_backoff.  The wait can be cut short by cond.Signal().
    ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
    cond.WaitInterval(pipe_lock, backoff);
    backoff += backoff;
    if (backoff > conf->ms_max_backoff)
      backoff.set_from_double(conf->ms_max_backoff);
    ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
  }
}
1581
1582 void Pipe::randomize_out_seq()
1583 {
1584 if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
1585 // Set out_seq to a random value, so CRC won't be predictable.
1586 out_seq = ceph::util::generate_random_number<uint64_t>(0, SEQ_MASK);
1587 lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1588 } else {
1589 // previously, seq #'s always started at 0.
1590 out_seq = 0;
1591 }
1592 }
1593
// The peer told us our old session is gone (RESETSESSION): discard all
// queued incoming and outgoing messages, notify dispatchers of the
// remote reset, and restart the sequence counters for a fresh session.
// Caller must hold pipe_lock.
void Pipe::was_session_reset()
{
  ceph_assert(pipe_lock.is_locked());

  ldout(msgr->cct,10) << "was_session_reset" << dendl;
  in_q->discard_queue(conn_id);
  if (delay_thread)
    delay_thread->discard();
  discard_out_queue();

  msgr->dispatch_queue.queue_remote_reset(connection_state.get());

  randomize_out_seq();

  in_seq = 0;
  connect_seq = 0;
}
1611
// Mark the pipe closed and shut down its socket.  Caller must hold
// pipe_lock.  Waiters on cond are woken so the reader/writer threads
// notice the state change; this does NOT wait for them to exit — see
// stop_and_wait() for that.
void Pipe::stop()
{
  ldout(msgr->cct,10) << "stop" << dendl;
  ceph_assert(pipe_lock.is_locked());
  state = STATE_CLOSED;
  state_closed = true;
  cond.Signal();
  shutdown_socket();
}
1621
// stop() the pipe and then wait until the reader thread has finished any
// in-flight fast dispatch, so the caller can safely tear the pipe down.
// Caller must hold pipe_lock; it is dropped temporarily while the delay
// thread drains its fast-dispatch work.
void Pipe::stop_and_wait()
{
  ceph_assert(pipe_lock.is_locked_by_me());
  if (state != STATE_CLOSED)
    stop();

  // debugging hook: optionally widen the race window
  if (msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << __func__ << " sleep for "
                         << msgr->cct->_conf->ms_inject_internal_delays
                         << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }

  if (delay_thread) {
    pipe_lock.Unlock();
    delay_thread->stop_fast_dispatching();
    pipe_lock.Lock();
  }
  // wait for the reader to leave its fast-dispatch critical section;
  // it signals cond when reader_dispatching drops (see reader()).
  while (reader_running &&
         reader_dispatching)
    cond.Wait(pipe_lock);
}
1646
1647 /* read msgs from socket.
1648 * also, server.
1649 */
// Reader thread entry point: runs the server-side accept() if needed,
// then loops reading one protocol tag at a time off the socket and
// handling it (keepalives, acks, messages, close).  pipe_lock is held
// whenever pipe state is touched and dropped around blocking reads.
void Pipe::reader()
{
  pipe_lock.Lock();

  if (state == STATE_ACCEPTING) {
    accept();
    ceph_assert(pipe_lock.is_locked());
  }

  // loop.
  while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {
    ceph_assert(pipe_lock.is_locked());

    // sleep if (re)connecting
    if (state == STATE_STANDBY) {
      ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
      cond.Wait(pipe_lock);
      continue;
    }

    // get a reference to the AuthSessionHandler while we have the pipe_lock
    std::shared_ptr<AuthSessionHandler> auth_handler = session_security;

    pipe_lock.Unlock();

    char tag = -1;
    ldout(msgr->cct,20) << "reader reading tag..." << dendl;
    if (tcp_read((char*)&tag, 1) < 0) {
      pipe_lock.Lock();
      ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
      fault(true);
      continue;
    }

    if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
      ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
      pipe_lock.Lock();
      connection_state->set_last_keepalive(ceph_clock_now());
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
      // KEEPALIVE2 carries a timestamp we must echo back (writer sends
      // the ack once send_keepalive_ack is set and cond is signaled)
      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
      ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
                           << cpp_strerror(errno) << dendl;
        fault(true);
      } else {
        send_keepalive_ack = true;
        keepalive_ack_stamp = utime_t(t);
        ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
                           << dendl;
        connection_state->set_last_keepalive(ceph_clock_now());
        cond.Signal();
      }
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
      ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
      struct ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
        fault(true);
      } else {
        connection_state->set_last_keepalive_ack(utime_t(t));
      }
      continue;
    }

    // open ...
    if (tag == CEPH_MSGR_TAG_ACK) {
      ldout(msgr->cct,20) << "reader got ACK" << dendl;
      ceph_le64 seq;
      int rc = tcp_read((char*)&seq, sizeof(seq));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
        fault(true);
      } else if (state != STATE_CLOSED) {
        handle_ack(seq);
      }
      continue;
    }

    else if (tag == CEPH_MSGR_TAG_MSG) {
      ldout(msgr->cct,20) << "reader got MSG" << dendl;
      Message *m = 0;
      int r = read_message(&m, auth_handler.get());

      pipe_lock.Lock();

      if (!m) {
        if (r < 0)
          fault(true);
        continue;
      }

      m->trace.event("pipe read message");

      if (state == STATE_CLOSED ||
          state == STATE_CONNECTING) {
        in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
        m->put();
        continue;
      }

      // check received seq#.  if it is old, drop the message.
      // note that incoming messages may skip ahead.  this is convenient for the client
      // side queueing because messages can't be renumbered, but the (kernel) client will
      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
      // it doesn't matter if we "got" it or not.
      if (m->get_seq() <= in_seq) {
        ldout(msgr->cct,0) << "reader got old message "
                           << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
                           << ", discarding" << dendl;
        in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
        m->put();
        if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
            msgr->cct->_conf->ms_die_on_old_message)
          ceph_abort_msg("old msgs despite reconnect_seq feature");
        continue;
      }
      if (m->get_seq() > in_seq + 1) {
        ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
                           << in_seq << " to " << m->get_seq() << dendl;
        if (msgr->cct->_conf->ms_die_on_skipped_message)
          ceph_abort_msg("skipped incoming seq");
      }

      m->set_connection(connection_state.get());

      // note last received message.
      in_seq = m->get_seq();

      cond.Signal();  // wake up writer, to ack this

      ldout(msgr->cct,10) << "reader got message "
                          << m->get_seq() << " " << m << " " << *m
                          << dendl;
      in_q->fast_preprocess(m);

      if (delay_thread) {
        // fault injection: optionally hold the message in the delay
        // thread until a randomized release time
        utime_t release;
        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
          release = m->get_recv_stamp();
          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
        }
        delay_thread->queue(release, m);
      } else {
        if (in_q->can_fast_dispatch(m)) {
          // fast dispatch runs inline in the reader; drop pipe_lock while
          // the dispatcher runs, and signal anyone (stop_and_wait) who is
          // waiting for us to leave this critical section
          reader_dispatching = true;
          pipe_lock.Unlock();
          in_q->fast_dispatch(m);
          pipe_lock.Lock();
          reader_dispatching = false;
          if (state == STATE_CLOSED ||
              notify_on_dispatch_done) {  // there might be somebody waiting
            notify_on_dispatch_done = false;
            cond.Signal();
          }
        } else {
          in_q->enqueue(m, m->get_priority(), conn_id);
        }
      }
    }

    else if (tag == CEPH_MSGR_TAG_CLOSE) {
      ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
      pipe_lock.Lock();
      if (state == STATE_CLOSING) {
        state = STATE_CLOSED;
        state_closed = true;
      } else {
        state = STATE_CLOSING;
      }
      cond.Signal();
      break;
    }
    else {
      ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
      pipe_lock.Lock();
      fault(true);
    }
  }


  // reap?
  reader_running = false;
  reader_needs_join = true;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "reader done" << dendl;
}
1848
1849 /* write msgs to socket.
1850 * also, client.
1851 */
// Writer thread entry point: drives outgoing work for the pipe — runs
// connect() when (re)connecting, emits the CLOSE tag when closing, and
// otherwise drains keepalives, acks and queued messages.  pipe_lock is
// held except around blocking socket writes; cond.Wait parks the thread
// when there is nothing to send.
void Pipe::writer()
{
  pipe_lock.Lock();
  while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
    ldout(msgr->cct,10) << "writer: state = " << get_state_name()
                        << " policy.server=" << policy.server << dendl;

    // standby?
    if (is_queued() && state == STATE_STANDBY && !policy.server)
      state = STATE_CONNECTING;

    // connect?
    if (state == STATE_CONNECTING) {
      ceph_assert(!policy.server);
      connect();
      continue;
    }

    if (state == STATE_CLOSING) {
      // write close tag
      ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
      char tag = CEPH_MSGR_TAG_CLOSE;
      state = STATE_CLOSED;
      state_closed = true;
      pipe_lock.Unlock();
      if (sd >= 0) {
        // we can ignore return value, actually; we don't care if this succeeds.
        int r = ::write(sd, &tag, 1);
        (void)r;
      }
      pipe_lock.Lock();
      continue;
    }

    // anything to actually send?  (messages queued, or acks owed)
    if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
        (is_queued() || in_seq > in_seq_acked)) {

      // keepalive?
      if (send_keepalive) {
        int rc;
        if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
          pipe_lock.Unlock();
          rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
                                ceph_clock_now());
        } else {
          pipe_lock.Unlock();
          rc = write_keepalive();
        }
        pipe_lock.Lock();
        if (rc < 0) {
          ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
                             << cpp_strerror(errno) << dendl;
          fault();
          continue;
        }
        send_keepalive = false;
      }
      // echo back the timestamp the reader saved from KEEPALIVE2
      if (send_keepalive_ack) {
        utime_t t = keepalive_ack_stamp;
        pipe_lock.Unlock();
        int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
        pipe_lock.Lock();
        if (rc < 0) {
          ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
          fault();
          continue;
        }
        send_keepalive_ack = false;
      }

      // send ack?
      if (in_seq > in_seq_acked) {
        // snapshot in_seq before dropping the lock; the reader may
        // advance it while we write
        uint64_t send_seq = in_seq;
        pipe_lock.Unlock();
        int rc = write_ack(send_seq);
        pipe_lock.Lock();
        if (rc < 0) {
          ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
          fault();
          continue;
        }
        in_seq_acked = send_seq;
      }

      // grab outgoing message
      Message *m = _get_next_outgoing();
      if (m) {
        m->set_seq(++out_seq);
        if (!policy.lossy) {
          // put on sent list
          sent.push_back(m);
          m->get();
        }

        // associate message with Connection (for benefit of encode_payload)
        m->set_connection(connection_state.get());

        uint64_t features = connection_state->get_features();

        if (m->empty_payload())
          ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
                              << " " << m << " " << *m << dendl;
        else
          ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
                              << " " << m << " " << *m << dendl;

        // encode and copy out of *m
        m->encode(features, msgr->crcflags);

        // prepare everything
        const ceph_msg_header& header = m->get_header();
        const ceph_msg_footer& footer = m->get_footer();

        // Now that we have all the crcs calculated, handle the
        // digital signature for the message, if the pipe has session
        // security set up.  Some session security options do not
        // actually calculate and check the signature, but they should
        // handle the calls to sign_message and check_signature.  PLR
        if (session_security.get() == NULL) {
          ldout(msgr->cct, 20) << "writer no session security" << dendl;
        } else {
          if (session_security->sign_message(m)) {
            ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
                                 << "): sig = " << footer.sig << dendl;
          } else {
            ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
                                 << "): sig = " << footer.sig << dendl;
          }
        }

        bufferlist blist = m->get_payload();
        blist.append(m->get_middle());
        blist.append(m->get_data());

        pipe_lock.Unlock();

        m->trace.event("pipe writing message");

        ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
        int rc = write_message(header, footer, blist);

        pipe_lock.Lock();
        if (rc < 0) {
          ldout(msgr->cct,1) << "writer error sending " << m << ", "
                             << cpp_strerror(errno) << dendl;
          fault();
        }
        m->put();
      }
      continue;
    }

    // wait
    ldout(msgr->cct,20) << "writer sleeping" << dendl;
    cond.Wait(pipe_lock);
  }

  ldout(msgr->cct,20) << "writer finishing" << dendl;

  // reap?
  writer_running = false;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "writer done" << dendl;
}
2016
void Pipe::unlock_maybe_reap()
{
  // Called with pipe_lock held; always releases it before returning.
  // Once both the reader and writer threads have stopped, shut down the
  // socket and hand this Pipe to the messenger's reap queue for cleanup.
  if (!reader_running && !writer_running) {
    shutdown_socket();
    pipe_lock.Unlock();
    // If the delayed-delivery thread is mid-flush, wait for it to finish
    // before queueing ourselves for reaping.  This must happen after the
    // unlock, since the delay thread may need pipe_lock to make progress.
    if (delay_thread && delay_thread->is_flushing()) {
      delay_thread->wait_for_flush();
    }
    msgr->queue_reap(this);
  } else {
    pipe_lock.Unlock();
  }
}
2030
2031 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2032 {
2033 // create a buffer to read into that matches the data alignment
2034 unsigned left = len;
2035 if (off & ~CEPH_PAGE_MASK) {
2036 // head
2037 unsigned head = 0;
2038 head = std::min<uint64_t>(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2039 data.push_back(buffer::create(head));
2040 left -= head;
2041 }
2042 unsigned middle = left & CEPH_PAGE_MASK;
2043 if (middle > 0) {
2044 data.push_back(buffer::create_small_page_aligned(middle));
2045 left -= middle;
2046 }
2047 if (left) {
2048 data.push_back(buffer::create(left));
2049 }
2050 }
2051
int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
{
  // Read one wire message from the socket: header, front/middle/data
  // payloads, footer; then decode it and (if auth_handler is set) verify
  // its signature.  On success stores the new Message in *pm and returns
  // 0; on failure returns <0, releasing any throttle reservations taken
  // here via the out_dethrottle path.
  int ret = -1;
  // envelope
  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;

  ceph_msg_header header;
  ceph_msg_footer footer;
  __u32 header_crc = 0;

  if (tcp_read((char*)&header, sizeof(header)) < 0)
    return -1;
  if (msgr->crcflags & MSG_CRC_HEADER) {
    // crc covers the header minus its own trailing crc field
    header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
  }

  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
                      << " src " << entity_name_t(header.src)
                      << " front=" << header.front_len
                      << " data=" << header.data_len
                      << " off " << header.data_off
                      << dendl;

  // verify header crc
  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
    return -1;
  }

  bufferlist front, middle, data;
  int front_len, middle_len;
  unsigned data_len, data_off;
  int aborted;
  Message *message;
  utime_t recv_stamp = ceph_clock_now();

  // reserve a slot from the per-connection message-count throttler
  // (may block until dispatch frees one)
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
                        << policy.throttler_messages->get_current() << "/"
                        << policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->get();
  }

  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
  if (message_size) {
    // reserve bytes from the per-connection policy throttler (may block)
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
                          << policy.throttler_bytes->get_current() << "/"
                          << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->get(message_size);
    }

    // throttle total bytes waiting for dispatch.  do this _after_ the
    // policy throttle, as this one does not deadlock (unless dispatch
    // blocks indefinitely, which it shouldn't).  in contrast, the
    // policy throttle carries for the lifetime of the message.
    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
                        << in_q->dispatch_throttler.get_current() << "/"
                        << in_q->dispatch_throttler.get_max() << dendl;
    in_q->dispatch_throttler.get(message_size);
  }

  utime_t throttle_stamp = ceph_clock_now();

  // read front
  front_len = header.front_len;
  if (front_len) {
    bufferptr bp = buffer::create(front_len);
    if (tcp_read(bp.c_str(), front_len) < 0)
      goto out_dethrottle;
    front.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
  }

  // read middle
  middle_len = header.middle_len;
  if (middle_len) {
    bufferptr bp = buffer::create(middle_len);
    if (tcp_read(bp.c_str(), middle_len) < 0)
      goto out_dethrottle;
    middle.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
  }


  // read data; allocated in page-aligned chunks (alloc_aligned_buffer)
  // and filled with nonblocking reads interleaved with poll waits.
  data_len = le32_to_cpu(header.data_len);
  data_off = le32_to_cpu(header.data_off);
  if (data_len) {
    unsigned offset = 0;
    unsigned left = data_len;

    bufferlist newbuf, rxbuf;
    bufferlist::iterator blp;
    // int rxbuf_version = 0;

    while (left > 0) {
      // wait for data
      if (tcp_read_wait() < 0)
	goto out_dethrottle;

      // get a buffer
#if 0
      // The rx_buffers implementation is buggy:
      // - see http://tracker.ceph.com/issues/22480
      //
      // - From inspection, I think that we have problems if we read *part*
      // of the message into an rx_buffer, then drop the lock, someone revokes,
      // and then later try to read the rest.  In that case our final bufferlist
      // will have part of the original static_buffer from the first chunk and
      // partly a piece that we allocated.  I think that to make this correct,
      // we need to keep the bufferlist we are reading into in Connection under
      // the lock, and on revoke, if the data is partly read, rebuild() to copy
      // into fresh buffers so that all references to our static buffer are
      // cleared up.
      //
      // - Also... what happens if we fully read into the static
      // buffer, then revoke?  We still have some bufferlist out there
      // in the process of getting dispatched back to objecter or
      // librados that references the static buffer.
      connection_state->lock.Lock();
      map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
      if (p != connection_state->rx_buffers.end()) {
	if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
	  ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second
			      << " at offset " << offset
			      << " len " << p->second.first.length() << dendl;
	  rxbuf = p->second.first;
	  rxbuf_version = p->second.second;
	  // make sure it's big enough
	  if (rxbuf.length() < data_len)
	    rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
	  blp = p->second.first.begin();
	  blp.advance(offset);
	}
      } else {
	if (!newbuf.length()) {
	  ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
	  alloc_aligned_buffer(newbuf, data_len, data_off);
	  blp = newbuf.begin();
	  blp.advance(offset);
	}
      }
      bufferptr bp = blp.get_current_ptr();
      int read = std::min(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
      connection_state->lock.Unlock();
#else
      // rx_buffer-less implementation
      if (!newbuf.length()) {
	ldout(msgr->cct,20) << "reader allocating new rx buffer at offset "
			    << offset << dendl;
	alloc_aligned_buffer(newbuf, data_len, data_off);
	blp = newbuf.begin();
	blp.advance(offset);
      }
      bufferptr bp = blp.get_current_ptr();
      int read = std::min(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into "
			  << (void*)bp.c_str() << " len " << bp.length()
			  << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
#endif
      if (got < 0)
	goto out_dethrottle;
      if (got > 0) {
	blp.advance(static_cast<size_t>(got));
	data.append(bp, 0, got);
	offset += got;
	left -= got;
      } // else we got a signal or something; just loop.
    }
  }

  // footer: the new (signed) layout if the peer supports MSG_AUTH,
  // otherwise read the old layout and up-convert it (sig = 0).
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
      goto out_dethrottle;
  } else {
    ceph_msg_footer_old old_footer;
    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
      goto out_dethrottle;
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  // a sender that aborted mid-message clears CEPH_MSG_FOOTER_COMPLETE;
  // we drop the partial message but report success (ret = 0, *pm unset).
  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
  if (aborted) {
    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
		       << " byte message.. ABORTED" << dendl;
    ret = 0;
    goto out_dethrottle;
  }

  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
		      << " byte message" << dendl;
  message = decode_message(msgr->cct, msgr->crcflags, header, footer,
			   front, middle, data, connection_state.get());
  if (!message) {
    ret = -EINVAL;
    goto out_dethrottle;
  }

  //
  // Check the signature if one should be present.  A zero return indicates success. PLR
  //

  if (auth_handler == NULL) {
    ldout(msgr->cct, 10) << "No session security set" << dendl;
  } else {
    if (auth_handler->check_message_signature(message)) {
      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
      message->put();
      ret = -EINVAL;
      goto out_dethrottle;
    }
  }

  // hand our throttle reservations over to the message; they are
  // released when the message is freed / dispatched.
  message->set_byte_throttler(policy.throttler_bytes);
  message->set_message_throttler(policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(message_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  *pm = message;
  return 0;

 out_dethrottle:
  // release bytes reserved from the throttlers on failure
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
			  << policy.throttler_bytes->get_current() << "/"
			  << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(message_size);
    }

    in_q->dispatch_throttle_release(message_size);
  }
  return ret;
}
2311
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
  // Send all `len` bytes described by msg's iovec array, looping over
  // short writes.  `more` sets MSG_MORE so the kernel may coalesce this
  // send with the next one.  Returns 0 on success, -errno on a socket
  // error, or -EINTR if the pipe was closed under us.
  // NOTE: consumes msg's iovec array in place on short writes.
  MSGR_SIGPIPE_STOPPER;
  while (len > 0) {
    int r;
    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r == 0)
      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
      return r;
    }
    if (state == STATE_CLOSED) {
      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
      return -EINTR; // close enough
    }

    len -= r;
    if (len == 0) break;

    // hrmph.  trim r bytes off the front of our message.
    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
    while (r > 0) {
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
	// lose this whole item: the kernel consumed it entirely
	//ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
	r -= msg->msg_iov[0].iov_len;
	msg->msg_iov++;
	msg->msg_iovlen--;
      } else {
	// partial!  shift the first iovec past the bytes already sent
	//ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
	msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
	msg->msg_iov[0].iov_len -= r;
	break;
      }
    }
  }
  return 0;
}
2353
2354
2355 int Pipe::write_ack(uint64_t seq)
2356 {
2357 ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2358
2359 char c = CEPH_MSGR_TAG_ACK;
2360 ceph_le64 s;
2361 s = seq;
2362
2363 struct msghdr msg;
2364 memset(&msg, 0, sizeof(msg));
2365 struct iovec msgvec[2];
2366 msgvec[0].iov_base = &c;
2367 msgvec[0].iov_len = 1;
2368 msgvec[1].iov_base = &s;
2369 msgvec[1].iov_len = sizeof(s);
2370 msg.msg_iov = msgvec;
2371 msg.msg_iovlen = 2;
2372
2373 if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2374 return -1;
2375 return 0;
2376 }
2377
2378 int Pipe::write_keepalive()
2379 {
2380 ldout(msgr->cct,10) << "write_keepalive" << dendl;
2381
2382 char c = CEPH_MSGR_TAG_KEEPALIVE;
2383
2384 struct msghdr msg;
2385 memset(&msg, 0, sizeof(msg));
2386 struct iovec msgvec[2];
2387 msgvec[0].iov_base = &c;
2388 msgvec[0].iov_len = 1;
2389 msg.msg_iov = msgvec;
2390 msg.msg_iovlen = 1;
2391
2392 if (do_sendmsg(&msg, 1) < 0)
2393 return -1;
2394 return 0;
2395 }
2396
2397 int Pipe::write_keepalive2(char tag, const utime_t& t)
2398 {
2399 ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2400 struct ceph_timespec ts;
2401 t.encode_timeval(&ts);
2402 struct msghdr msg;
2403 memset(&msg, 0, sizeof(msg));
2404 struct iovec msgvec[2];
2405 msgvec[0].iov_base = &tag;
2406 msgvec[0].iov_len = 1;
2407 msgvec[1].iov_base = &ts;
2408 msgvec[1].iov_len = sizeof(ts);
2409 msg.msg_iov = msgvec;
2410 msg.msg_iovlen = 2;
2411
2412 if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2413 return -1;
2414 return 0;
2415 }
2416
2417
int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
{
  // Write one full message to the socket: tag byte, header, payload
  // buffers (front+middle+data concatenated in blist), then footer.
  // Iovecs accumulate in the shared msgvec array and are flushed in
  // batches so we never exceed SM_IOV_MAX entries.  Returns 0 on
  // success, -1 on failure.
  int ret;

  // set up msghdr and iovecs
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = msgvec;
  int msglen = 0;

  // send tag
  char tag = CEPH_MSGR_TAG_MSG;
  msgvec[msg.msg_iovlen].iov_base = &tag;
  msgvec[msg.msg_iovlen].iov_len = 1;
  msglen++;
  msg.msg_iovlen++;

  // send envelope
  msgvec[msg.msg_iovlen].iov_base = (char*)&header;
  msgvec[msg.msg_iovlen].iov_len = sizeof(header);
  msglen += sizeof(header);
  msg.msg_iovlen++;

  // payload (front+data)
  auto pb = std::cbegin(blist.buffers());
  unsigned b_off = 0;   // carry-over buffer offset, if any
  unsigned bl_pos = 0;  // blist pos
  unsigned left = blist.length();

  while (left > 0) {
    unsigned donow = std::min(left, pb->length()-b_off);
    if (donow == 0) {
      ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
			 << " b_off " << b_off << dendl;
    }
    ceph_assert(donow > 0);
    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
			<< " leftinchunk " << left
			<< " buffer len " << pb->length()
			<< " writing " << donow
			<< dendl;

    // flush accumulated iovecs before we run out of slots (keep spares
    // so the footer always fits)
    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
      if (do_sendmsg(&msg, msglen, true))
	goto fail;

      // and restart the iov
      msg.msg_iov = msgvec;
      msg.msg_iovlen = 0;
      msglen = 0;
    }

    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
    msgvec[msg.msg_iovlen].iov_len = donow;
    msglen += donow;
    msg.msg_iovlen++;

    ceph_assert(left >= donow);
    left -= donow;
    b_off += donow;
    bl_pos += donow;
    if (left == 0)
      break;
    // advance past any exhausted (or empty) buffers in the list
    while (b_off == pb->length()) {
      ++pb;
      b_off = 0;
    }
  }
  ceph_assert(left == 0);

  // send footer; if receiver doesn't support signatures, use the old footer format

  ceph_msg_footer_old old_footer;
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
    msglen += sizeof(footer);
    msg.msg_iovlen++;
  } else {
    // downgrade to the pre-MSG_AUTH footer layout; crc fields are zeroed
    // when the corresponding crc flag is not enabled
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
	old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
    msglen += sizeof(old_footer);
    msg.msg_iovlen++;
  }

  // send
  if (do_sendmsg(&msg, msglen))
    goto fail;

  ret = 0;

 out:
  return ret;

 fail:
  ret = -1;
  goto out;
}
2524
2525
2526 int Pipe::tcp_read(char *buf, unsigned len)
2527 {
2528 if (sd < 0)
2529 return -EINVAL;
2530
2531 while (len > 0) {
2532
2533 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2534 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2535 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2536 ::shutdown(sd, SHUT_RDWR);
2537 }
2538 }
2539
2540 if (tcp_read_wait() < 0)
2541 return -1;
2542
2543 ssize_t got = tcp_read_nonblocking(buf, len);
2544
2545 if (got < 0)
2546 return -1;
2547
2548 len -= got;
2549 buf += got;
2550 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2551 }
2552 return 0;
2553 }
2554
2555 int Pipe::tcp_read_wait()
2556 {
2557 if (sd < 0)
2558 return -EINVAL;
2559 struct pollfd pfd;
2560 short evmask;
2561 pfd.fd = sd;
2562 pfd.events = POLLIN;
2563 #if defined(__linux__)
2564 pfd.events |= POLLRDHUP;
2565 #endif
2566
2567 if (has_pending_data())
2568 return 0;
2569
2570 int r = poll(&pfd, 1, msgr->timeout);
2571 if (r < 0)
2572 return -errno;
2573 if (r == 0)
2574 return -EAGAIN;
2575
2576 evmask = POLLERR | POLLHUP | POLLNVAL;
2577 #if defined(__linux__)
2578 evmask |= POLLRDHUP;
2579 #endif
2580 if (pfd.revents & evmask)
2581 return -1;
2582
2583 if (!(pfd.revents & POLLIN))
2584 return -1;
2585
2586 return 0;
2587 }
2588
2589 ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2590 {
2591 again:
2592 ssize_t got = ::recv( sd, buf, len, flags );
2593 if (got < 0) {
2594 if (errno == EINTR) {
2595 goto again;
2596 }
2597 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2598 << got << " " << cpp_strerror(errno) << dendl;
2599 return -1;
2600 }
2601 if (got == 0) {
2602 return -1;
2603 }
2604 return got;
2605 }
2606
ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
{
  // recv() with a small prefetch buffer (recv_buf/recv_len/recv_ofs
  // members): serve already-buffered bytes first; large requests bypass
  // the buffer entirely, small ones trigger a prefetch of up to
  // recv_max_prefetch bytes.  Returns the number of bytes copied into
  // buf (may be less than len), or a negative value on error.
  size_t left = len;
  ssize_t total_recv = 0;
  if (recv_len > recv_ofs) {
    // drain what is left in the prefetch buffer
    int to_read = std::min(recv_len - recv_ofs, left);
    memcpy(buf, &recv_buf[recv_ofs], to_read);
    recv_ofs += to_read;
    left -= to_read;
    if (left == 0) {
      return to_read;
    }
    buf += to_read;
    total_recv += to_read;
  }

  /* nothing left in the prefetch buffer */

  if (left > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    ssize_t ret = do_recv(buf, left, flags );
    if (ret < 0) {
      // report what we already delivered from the buffer, if anything
      if (total_recv > 0)
        return total_recv;
      return ret;
    }
    total_recv += ret;
    return total_recv;
  }


  // small read: refill the prefetch buffer, then copy out what was asked
  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
  if (got < 0) {
    if (total_recv > 0)
      return total_recv;

    return got;
  }

  recv_len = (size_t)got;
  got = std::min(left, (size_t)got);
  memcpy(buf, recv_buf, got);
  recv_ofs = got;
  total_recv += got;
  return total_recv;
}
2653
2654 ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2655 {
2656 ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2657 if (got < 0) {
2658 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2659 << got << " " << cpp_strerror(errno) << dendl;
2660 return -1;
2661 }
2662 if (got == 0) {
2663 /* poll() said there was data, but we didn't read any - peer
2664 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2665 * standard socket behavior as documented by Stevens.
2666 */
2667 return -1;
2668 }
2669 return got;
2670 }
2671
2672 int Pipe::tcp_write(const char *buf, unsigned len)
2673 {
2674 if (sd < 0)
2675 return -1;
2676 struct pollfd pfd;
2677 pfd.fd = sd;
2678 pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2679 #if defined(__linux__)
2680 pfd.events |= POLLRDHUP;
2681 #endif
2682
2683 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2684 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2685 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2686 ::shutdown(sd, SHUT_RDWR);
2687 }
2688 }
2689
2690 if (poll(&pfd, 1, -1) < 0)
2691 return -1;
2692
2693 if (!(pfd.revents & POLLOUT))
2694 return -1;
2695
2696 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2697 ceph_assert(len > 0);
2698 while (len > 0) {
2699 MSGR_SIGPIPE_STOPPER;
2700 int did = ::send( sd, buf, len, MSG_NOSIGNAL );
2701 if (did < 0) {
2702 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2703 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2704 return did;
2705 }
2706 len -= did;
2707 buf += did;
2708 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2709 }
2710 return 0;
2711 }