// ceph/src/msg/simple/Pipe.cc (ceph 12.2.12)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <sys/uio.h>
21 #include <limits.h>
22 #include <poll.h>
23
24 #include "msg/Message.h"
25 #include "Pipe.h"
26 #include "SimpleMessenger.h"
27
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
31
32 // Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
33
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
37
38 #include "include/compat.h"
39 #include "include/sock_compat.h"
40
41 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
42 #define SEQ_MASK 0x7fffffff
43 #define dout_subsys ceph_subsys_ms
44
45 #undef dout_prefix
46 #define dout_prefix *_dout << *this
47 ostream& Pipe::_pipe_prefix(std::ostream &out) const {
48 return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
49 << " sd=" << sd << " :" << port
50 << " s=" << state
51 << " pgs=" << peer_global_seq
52 << " cs=" << connect_seq
53 << " l=" << policy.lossy
54 << " c=" << connection_state
55 << ").";
56 }
57
58 ostream& operator<<(ostream &out, const Pipe &pipe) {
59 return pipe._pipe_prefix(out);
60 }
61
62 /**
63 * The DelayedDelivery is for injecting delays into Message delivery off
64 * the socket. It is only enabled if delays are requested, and if they
65 * are then it pulls Messages off the DelayQueue and puts them into the
66 * in_q (SimpleMessenger::dispatch_queue).
67 * Please note that this probably has issues with Pipe shutdown and
68 * replacement semantics. I've tried, but no guarantees.
69 */
70 class Pipe::DelayedDelivery: public Thread {
71 Pipe *pipe;
72 std::deque< pair<utime_t,Message*> > delay_queue;
73 Mutex delay_lock;
74 Cond delay_cond;
75 int flush_count;
76 bool active_flush;
77 bool stop_delayed_delivery;
78 bool delay_dispatching; // we are in fast dispatch now
79 bool stop_fast_dispatching_flag; // we need to stop fast dispatching
80
81 public:
82 explicit DelayedDelivery(Pipe *p)
83 : pipe(p),
84 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
85 active_flush(false),
86 stop_delayed_delivery(false),
87 delay_dispatching(false),
88 stop_fast_dispatching_flag(false) { }
89 ~DelayedDelivery() override {
90 discard();
91 }
92 void *entry() override;
93 void queue(utime_t release, Message *m) {
94 Mutex::Locker l(delay_lock);
95 delay_queue.push_back(make_pair(release, m));
96 delay_cond.Signal();
97 }
98 void discard();
99 void flush();
100 bool is_flushing() {
101 Mutex::Locker l(delay_lock);
102 return flush_count > 0 || active_flush;
103 }
104 void wait_for_flush() {
105 Mutex::Locker l(delay_lock);
106 while (flush_count > 0 || active_flush)
107 delay_cond.Wait(delay_lock);
108 }
109 void stop() {
110 delay_lock.Lock();
111 stop_delayed_delivery = true;
112 delay_cond.Signal();
113 delay_lock.Unlock();
114 }
115 void steal_for_pipe(Pipe *new_owner) {
116 Mutex::Locker l(delay_lock);
117 pipe = new_owner;
118 }
119 /**
120 * We need to stop fast dispatching before we need to stop putting
121 * normal messages into the DispatchQueue.
122 */
123 void stop_fast_dispatching();
124 };
125
126 /**************************************
127 * Pipe
128 */
129
130 Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
131 : RefCountedObject(r->cct),
132 reader_thread(this),
133 writer_thread(this),
134 delay_thread(NULL),
135 msgr(r),
136 conn_id(r->dispatch_queue.get_id()),
137 recv_ofs(0),
138 recv_len(0),
139 sd(-1), port(0),
140 peer_type(-1),
141 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
142 state(st),
143 connection_state(NULL),
144 reader_running(false), reader_needs_join(false),
145 reader_dispatching(false), notify_on_dispatch_done(false),
146 writer_running(false),
147 in_q(&(r->dispatch_queue)),
148 send_keepalive(false),
149 send_keepalive_ack(false),
150 connect_seq(0), peer_global_seq(0),
151 out_seq(0), in_seq(0), in_seq_acked(0) {
152 ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
153 ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
154 ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
155 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
156 if (con) {
157 connection_state = con;
158 connection_state->reset_pipe(this);
159 } else {
160 connection_state = new PipeConnection(msgr->cct, msgr);
161 connection_state->pipe = get();
162 }
163
164 if (randomize_out_seq()) {
165 lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
166 }
167
168
169 msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
170 if (msgr->timeout == 0)
171 msgr->timeout = -1;
172
173 recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
174 recv_buf = new char[recv_max_prefetch];
175 }
176
177 Pipe::~Pipe()
178 {
179 assert(out_q.empty());
180 assert(sent.empty());
181 delete delay_thread;
182 delete[] recv_buf;
183 }
184
185 void Pipe::handle_ack(uint64_t seq)
186 {
187 lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
188 // trim sent list
189 while (!sent.empty() &&
190 sent.front()->get_seq() <= seq) {
191 Message *m = sent.front();
192 sent.pop_front();
193 lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
194 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
195 m->put();
196 }
197 }
198
199 void Pipe::start_reader()
200 {
201 assert(pipe_lock.is_locked());
202 assert(!reader_running);
203 if (reader_needs_join) {
204 reader_thread.join();
205 reader_needs_join = false;
206 }
207 reader_running = true;
208 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
209 }
210
211 void Pipe::maybe_start_delay_thread()
212 {
213 if (!delay_thread) {
214 auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
215 if (pos != string::npos) {
216 lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
217 delay_thread = new DelayedDelivery(this);
218 delay_thread->create("ms_pipe_delay");
219 }
220 }
221 }
222
223 void Pipe::start_writer()
224 {
225 assert(pipe_lock.is_locked());
226 assert(!writer_running);
227 writer_running = true;
228 writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
229 }
230
231 void Pipe::join_reader()
232 {
233 if (!reader_running)
234 return;
235 cond.Signal();
236 pipe_lock.Unlock();
237 reader_thread.join();
238 pipe_lock.Lock();
239 reader_needs_join = false;
240 }
241
242 void Pipe::DelayedDelivery::discard()
243 {
244 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
245 Mutex::Locker l(delay_lock);
246 while (!delay_queue.empty()) {
247 Message *m = delay_queue.front().second;
248 pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
249 m->put();
250 delay_queue.pop_front();
251 }
252 }
253
254 void Pipe::DelayedDelivery::flush()
255 {
256 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
257 Mutex::Locker l(delay_lock);
258 flush_count = delay_queue.size();
259 delay_cond.Signal();
260 }
261
262 void *Pipe::DelayedDelivery::entry()
263 {
264 Mutex::Locker locker(delay_lock);
265 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;
266
267 while (!stop_delayed_delivery) {
268 if (delay_queue.empty()) {
269 lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
270 delay_cond.Wait(delay_lock);
271 continue;
272 }
273 utime_t release = delay_queue.front().first;
274 Message *m = delay_queue.front().second;
275 string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
276 if (!flush_count &&
277 (release > ceph_clock_now() &&
278 (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
279 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
280 delay_cond.WaitUntil(delay_lock, release);
281 continue;
282 }
283 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
284 delay_queue.pop_front();
285 if (flush_count > 0) {
286 --flush_count;
287 active_flush = true;
288 }
289 if (pipe->in_q->can_fast_dispatch(m)) {
290 if (!stop_fast_dispatching_flag) {
291 delay_dispatching = true;
292 delay_lock.Unlock();
293 pipe->in_q->fast_dispatch(m);
294 delay_lock.Lock();
295 delay_dispatching = false;
296 if (stop_fast_dispatching_flag) {
297 // we need to let the stopping thread proceed
298 delay_cond.Signal();
299 delay_lock.Unlock();
300 delay_lock.Lock();
301 }
302 }
303 } else {
304 pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
305 }
306 active_flush = false;
307 }
308 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
309 return NULL;
310 }
311
312 void Pipe::DelayedDelivery::stop_fast_dispatching() {
313 Mutex::Locker l(delay_lock);
314 stop_fast_dispatching_flag = true;
315 while (delay_dispatching)
316 delay_cond.Wait(delay_lock);
317 }
318
319
320 int Pipe::accept()
321 {
322 ldout(msgr->cct,10) << "accept" << dendl;
323 assert(pipe_lock.is_locked());
324 assert(state == STATE_ACCEPTING);
325
326 pipe_lock.Unlock();
327
328 // vars
329 bufferlist addrs;
330 entity_addr_t socket_addr;
331 socklen_t len;
332 int r;
333 char banner[strlen(CEPH_BANNER)+1];
334 bufferlist addrbl;
335 ceph_msg_connect connect;
336 ceph_msg_connect_reply reply;
337 Pipe *existing = 0;
338 bufferptr bp;
339 bufferlist authorizer, authorizer_reply;
340 bool authorizer_valid;
341 uint64_t feat_missing;
342 bool replaced = false;
343 // this variable denotes if the connection attempt from peer is a hard
344 // reset or not, it is true if there is an existing connection and the
345 // connection sequence from peer is equal to zero
346 bool is_reset_from_peer = false;
347 CryptoKey session_key;
348 int removed; // single-use down below
349
350 // this should roughly mirror pseudocode at
351 // http://ceph.com/wiki/Messaging_protocol
352 int reply_tag = 0;
353 uint64_t existing_seq = -1;
354
355 // used for reading in the remote acked seq on connect
356 uint64_t newly_acked_seq = 0;
357
358 bool need_challenge = false;
359 bool had_challenge = false;
360 std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
361
362 recv_reset();
363
364 set_socket_options();
365
366 // announce myself.
367 r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
368 if (r < 0) {
369 ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
370 goto fail_unlocked;
371 }
372
373 // and my addr
374 ::encode(msgr->my_inst.addr, addrs, 0); // legacy
375
376 port = msgr->my_inst.addr.get_port();
377
378 // and peer's socket addr (they might not know their ip)
379 sockaddr_storage ss;
380 len = sizeof(ss);
381 r = ::getpeername(sd, (sockaddr*)&ss, &len);
382 if (r < 0) {
383 ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
384 goto fail_unlocked;
385 }
386 socket_addr.set_sockaddr((sockaddr*)&ss);
387 ::encode(socket_addr, addrs, 0); // legacy
388
389 r = tcp_write(addrs.c_str(), addrs.length());
390 if (r < 0) {
391 ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
392 goto fail_unlocked;
393 }
394
395 ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
396
397 // identify peer
398 if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
399 ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
400 goto fail_unlocked;
401 }
402 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
403 banner[strlen(CEPH_BANNER)] = 0;
404 ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
405 goto fail_unlocked;
406 }
407 {
408 bufferptr tp(sizeof(ceph_entity_addr));
409 addrbl.push_back(std::move(tp));
410 }
411 if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
412 ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
413 goto fail_unlocked;
414 }
415 try {
416 bufferlist::iterator ti = addrbl.begin();
417 ::decode(peer_addr, ti);
418 } catch (const buffer::error& e) {
419 ldout(msgr->cct,2) << __func__ << " decode peer_addr failed: " << e.what()
420 << dendl;
421 goto fail_unlocked;
422 }
423
424 ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
425 if (peer_addr.is_blank_ip()) {
426 // peer apparently doesn't know what ip they have; figure it out for them.
427 int port = peer_addr.get_port();
428 peer_addr.u = socket_addr.u;
429 peer_addr.set_port(port);
430 ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
431 << " (socket is " << socket_addr << ")" << dendl;
432 }
433 set_peer_addr(peer_addr); // so that connection_state gets set up
434
435 while (1) {
436 if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
437 ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
438 goto fail_unlocked;
439 }
440
441 authorizer.clear();
442 if (connect.authorizer_len) {
443 bp = buffer::create(connect.authorizer_len);
444 if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
445 ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
446 goto fail_unlocked;
447 }
448 authorizer.push_back(std::move(bp));
449 authorizer_reply.clear();
450 }
451
452 ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
453 << " global_seq " << connect.global_seq
454 << dendl;
455
456 msgr->lock.Lock(); // FIXME
457 pipe_lock.Lock();
458 if (msgr->dispatch_queue.stop)
459 goto shutting_down;
460 if (state != STATE_ACCEPTING) {
461 goto shutting_down;
462 }
463
464 // note peer's type, flags
465 set_peer_type(connect.host_type);
466 policy = msgr->get_policy(connect.host_type);
467 ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
468 << ", policy.lossy=" << policy.lossy
469 << " policy.server=" << policy.server
470 << " policy.standby=" << policy.standby
471 << " policy.resetcheck=" << policy.resetcheck
472 << dendl;
473
474 memset(&reply, 0, sizeof(reply));
475 reply.protocol_version = msgr->get_proto_version(peer_type, false);
476 msgr->lock.Unlock();
477
478 // mismatch?
479 ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
480 << ", their proto " << connect.protocol_version << dendl;
481 if (connect.protocol_version != reply.protocol_version) {
482 reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
483 goto reply;
484 }
485
486 // require signatures for cephx?
487 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
488 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
489 peer_type == CEPH_ENTITY_TYPE_MDS ||
490 peer_type == CEPH_ENTITY_TYPE_MGR) {
491 if (msgr->cct->_conf->cephx_require_signatures ||
492 msgr->cct->_conf->cephx_cluster_require_signatures) {
493 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
494 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
495 }
496 if (msgr->cct->_conf->cephx_require_version >= 2 ||
497 msgr->cct->_conf->cephx_cluster_require_version >= 2) {
498 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
499 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
500 }
501 } else {
502 if (msgr->cct->_conf->cephx_require_signatures ||
503 msgr->cct->_conf->cephx_service_require_signatures) {
504 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
505 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
506 }
507 if (msgr->cct->_conf->cephx_require_version >= 2 ||
508 msgr->cct->_conf->cephx_service_require_version >= 2) {
509 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
510 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
511 }
512 }
513 }
514
515 feat_missing = policy.features_required & ~(uint64_t)connect.features;
516 if (feat_missing) {
517 ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
518 reply.tag = CEPH_MSGR_TAG_FEATURES;
519 goto reply;
520 }
521
522 // Check the authorizer. If not good, bail out.
523
524 pipe_lock.Unlock();
525
526 need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
527 had_challenge = (bool)authorizer_challenge;
528 authorizer_reply.clear();
529 if (!msgr->verify_authorizer(
530 connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
531 authorizer_reply, authorizer_valid, session_key,
532 need_challenge ? &authorizer_challenge : nullptr) ||
533 !authorizer_valid) {
534 pipe_lock.Lock();
535 if (state != STATE_ACCEPTING)
536 goto shutting_down_msgr_unlocked;
537 if (!had_challenge && need_challenge && authorizer_challenge) {
538 ldout(msgr->cct,10) << "accept: challenging authorizer "
539 << authorizer_reply.length()
540 << " bytes" << dendl;
541 assert(authorizer_reply.length());
542 reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
543 } else {
544 ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
545 reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
546 }
547 session_security.reset();
548 goto reply;
549 }
550
551 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
552
553 ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
554
555 retry_existing_lookup:
556 msgr->lock.Lock();
557 pipe_lock.Lock();
558 if (msgr->dispatch_queue.stop)
559 goto shutting_down;
560 if (state != STATE_ACCEPTING)
561 goto shutting_down;
562
563 // existing?
564 existing = msgr->_lookup_pipe(peer_addr);
565 if (existing) {
566 existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
567 if (existing->reader_dispatching) {
568 /** we need to wait, or we can deadlock if downstream
569 * fast_dispatchers are (naughtily!) waiting on resources
570 * held by somebody trying to make use of the SimpleMessenger lock.
571 * So drop locks, wait, and retry. It just looks like a slow network
572 * to everybody else.
573 *
574 * We take a ref to existing here since it might get reaped before we
575 * wake up (see bug #15870). We can be confident that it lived until
576 * locked it since we held the msgr lock from _lookup_pipe through to
577 * locking existing->lock and checking reader_dispatching.
578 */
579 existing->get();
580 pipe_lock.Unlock();
581 msgr->lock.Unlock();
582 existing->notify_on_dispatch_done = true;
583 while (existing->reader_dispatching)
584 existing->cond.Wait(existing->pipe_lock);
585 existing->pipe_lock.Unlock();
586 existing->put();
587 existing = nullptr;
588 goto retry_existing_lookup;
589 }
590
591 if (connect.global_seq < existing->peer_global_seq) {
592 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
593 << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
594 reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
595 reply.global_seq = existing->peer_global_seq; // so we can send it below..
596 existing->pipe_lock.Unlock();
597 msgr->lock.Unlock();
598 goto reply;
599 } else {
600 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
601 << " <= " << connect.global_seq << ", looks ok" << dendl;
602 }
603
604 if (existing->policy.lossy) {
605 ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
606 << policy.lossy << ")" << dendl;
607 existing->was_session_reset();
608 goto replace;
609 }
610
611 ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
612 << " vs existing " << existing->connect_seq
613 << " state " << existing->get_state_name() << dendl;
614
615 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
616 ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
617 // this is a hard reset from peer
618 is_reset_from_peer = true;
619 if (policy.resetcheck)
620 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
621 goto replace;
622 }
623
624 if (connect.connect_seq < existing->connect_seq) {
625 // old attempt, or we sent READY but they didn't get it.
626 ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
627 << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
628 goto retry_session;
629 }
630
631 if (connect.connect_seq == existing->connect_seq) {
632 // if the existing connection successfully opened, and/or
633 // subsequently went to standby, then the peer should bump
634 // their connect_seq and retry: this is not a connection race
635 // we need to resolve here.
636 if (existing->state == STATE_OPEN ||
637 existing->state == STATE_STANDBY) {
638 ldout(msgr->cct,10) << "accept connection race, existing " << existing
639 << ".cseq " << existing->connect_seq
640 << " == " << connect.connect_seq
641 << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
642 goto retry_session;
643 }
644
645 // connection race?
646 if (peer_addr < msgr->my_inst.addr ||
647 existing->policy.server) {
648 // incoming wins
649 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
650 << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
651 if (!(existing->state == STATE_CONNECTING ||
652 existing->state == STATE_WAIT))
653 lderr(msgr->cct) << "accept race bad state, would replace, existing="
654 << existing->get_state_name()
655 << " " << existing << ".cseq=" << existing->connect_seq
656 << " == " << connect.connect_seq
657 << dendl;
658 assert(existing->state == STATE_CONNECTING ||
659 existing->state == STATE_WAIT);
660 goto replace;
661 } else {
662 // our existing outgoing wins
663 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
664 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
665 assert(peer_addr > msgr->my_inst.addr);
666 if (!(existing->state == STATE_CONNECTING))
667 lderr(msgr->cct) << "accept race bad state, would send wait, existing="
668 << existing->get_state_name()
669 << " " << existing << ".cseq=" << existing->connect_seq
670 << " == " << connect.connect_seq
671 << dendl;
672 assert(existing->state == STATE_CONNECTING);
673 // make sure our outgoing connection will follow through
674 existing->_send_keepalive();
675 reply.tag = CEPH_MSGR_TAG_WAIT;
676 existing->pipe_lock.Unlock();
677 msgr->lock.Unlock();
678 goto reply;
679 }
680 }
681
682 assert(connect.connect_seq > existing->connect_seq);
683 assert(connect.global_seq >= existing->peer_global_seq);
684 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
685 existing->connect_seq == 0) {
686 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
687 << ", " << existing << ".cseq = " << existing->connect_seq
688 << "), sending RESETSESSION" << dendl;
689 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
690 msgr->lock.Unlock();
691 existing->pipe_lock.Unlock();
692 goto reply;
693 }
694
695 // reconnect
696 ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
697 << " > " << existing->connect_seq << dendl;
698 goto replace;
699 } // existing
700 else if (connect.connect_seq > 0) {
701 // we reset, and they are opening a new session
702 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
703 msgr->lock.Unlock();
704 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
705 goto reply;
706 } else {
707 // new session
708 ldout(msgr->cct,10) << "accept new session" << dendl;
709 existing = NULL;
710 goto open;
711 }
712 ceph_abort();
713
714 retry_session:
715 assert(existing->pipe_lock.is_locked());
716 assert(pipe_lock.is_locked());
717 reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
718 reply.connect_seq = existing->connect_seq + 1;
719 existing->pipe_lock.Unlock();
720 msgr->lock.Unlock();
721 goto reply;
722
723 reply:
724 assert(pipe_lock.is_locked());
725 reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
726 reply.authorizer_len = authorizer_reply.length();
727 pipe_lock.Unlock();
728 r = tcp_write((char*)&reply, sizeof(reply));
729 if (r < 0)
730 goto fail_unlocked;
731 if (reply.authorizer_len) {
732 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
733 if (r < 0)
734 goto fail_unlocked;
735 }
736 }
737
738 replace:
739 assert(existing->pipe_lock.is_locked());
740 assert(pipe_lock.is_locked());
741 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
742 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
743 reply_tag = CEPH_MSGR_TAG_SEQ;
744 existing_seq = existing->in_seq;
745 }
746 ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
747 existing->stop();
748 existing->unregister_pipe();
749 replaced = true;
750
751 if (existing->policy.lossy) {
752 // disconnect from the Connection
753 assert(existing->connection_state);
754 if (existing->connection_state->clear_pipe(existing))
755 msgr->dispatch_queue.queue_reset(existing->connection_state.get());
756 } else {
757 // queue a reset on the new connection, which we're dumping for the old
758 msgr->dispatch_queue.queue_reset(connection_state.get());
759
760 // drop my Connection, and take a ref to the existing one. do not
761 // clear existing->connection_state, since read_message and
762 // write_message both dereference it without pipe_lock.
763 connection_state = existing->connection_state;
764
765 // make existing Connection reference us
766 connection_state->reset_pipe(this);
767
768 if (existing->delay_thread) {
769 existing->delay_thread->steal_for_pipe(this);
770 delay_thread = existing->delay_thread;
771 existing->delay_thread = NULL;
772 delay_thread->flush();
773 }
774
775 // steal incoming queue
776 uint64_t replaced_conn_id = conn_id;
777 conn_id = existing->conn_id;
778 existing->conn_id = replaced_conn_id;
779
780 // reset the in_seq if this is a hard reset from peer,
781 // otherwise we respect our original connection's value
782 in_seq = is_reset_from_peer ? 0 : existing->in_seq;
783 in_seq_acked = in_seq;
784
785 // steal outgoing queue and out_seq
786 existing->requeue_sent();
787 out_seq = existing->out_seq;
788 ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
789 for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
790 p != existing->out_q.end();
791 ++p)
792 out_q[p->first].splice(out_q[p->first].begin(), p->second);
793 }
794 existing->stop_and_wait();
795 existing->pipe_lock.Unlock();
796
797 open:
798 // open
799 assert(pipe_lock.is_locked());
800 connect_seq = connect.connect_seq + 1;
801 peer_global_seq = connect.global_seq;
802 assert(state == STATE_ACCEPTING);
803 state = STATE_OPEN;
804 ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
805
806 // send READY reply
807 reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
808 reply.features = policy.features_supported;
809 reply.global_seq = msgr->get_global_seq();
810 reply.connect_seq = connect_seq;
811 reply.flags = 0;
812 reply.authorizer_len = authorizer_reply.length();
813 if (policy.lossy)
814 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
815
816 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
817 ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
818
819 session_security.reset(
820 get_auth_session_handler(msgr->cct,
821 connect.authorizer_protocol,
822 session_key,
823 connection_state->get_features()));
824
825 // notify
826 msgr->dispatch_queue.queue_accept(connection_state.get());
827 msgr->ms_deliver_handle_fast_accept(connection_state.get());
828
829 // ok!
830 if (msgr->dispatch_queue.stop)
831 goto shutting_down;
832 removed = msgr->accepting_pipes.erase(this);
833 assert(removed == 1);
834 register_pipe();
835 msgr->lock.Unlock();
836 pipe_lock.Unlock();
837
838 r = tcp_write((char*)&reply, sizeof(reply));
839 if (r < 0) {
840 goto fail_registered;
841 }
842
843 if (reply.authorizer_len) {
844 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
845 if (r < 0) {
846 goto fail_registered;
847 }
848 }
849
850 if (reply_tag == CEPH_MSGR_TAG_SEQ) {
851 if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
852 ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
853 goto fail_registered;
854 }
855 if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
856 ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
857 goto fail_registered;
858 }
859 }
860
861 pipe_lock.Lock();
862 discard_requeued_up_to(newly_acked_seq);
863 if (state != STATE_CLOSED) {
864 ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
865 start_writer();
866 }
867 ldout(msgr->cct,20) << "accept done" << dendl;
868
869 maybe_start_delay_thread();
870
871 return 0; // success.
872
873 fail_registered:
874 ldout(msgr->cct, 10) << "accept fault after register" << dendl;
875
876 if (msgr->cct->_conf->ms_inject_internal_delays) {
877 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
878 utime_t t;
879 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
880 t.sleep();
881 }
882
883 fail_unlocked:
884 pipe_lock.Lock();
885 if (state != STATE_CLOSED) {
886 bool queued = is_queued();
887 ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl;
888 if (queued) {
889 state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
890 } else if (replaced) {
891 state = STATE_STANDBY;
892 } else {
893 state = STATE_CLOSED;
894 state_closed = true;
895 }
896 fault();
897 if (queued || replaced)
898 start_writer();
899 }
900 return -1;
901
902 shutting_down:
903 msgr->lock.Unlock();
904 shutting_down_msgr_unlocked:
905 assert(pipe_lock.is_locked());
906
907 if (msgr->cct->_conf->ms_inject_internal_delays) {
908 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
909 utime_t t;
910 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
911 t.sleep();
912 }
913
914 state = STATE_CLOSED;
915 state_closed = true;
916 fault();
917 return -1;
918 }
919
920 void Pipe::set_socket_options()
921 {
922 // disable Nagle algorithm?
923 if (msgr->cct->_conf->ms_tcp_nodelay) {
924 int flag = 1;
925 int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
926 if (r < 0) {
927 r = -errno;
928 ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
929 << cpp_strerror(r) << dendl;
930 }
931 }
932 if (msgr->cct->_conf->ms_tcp_rcvbuf) {
933 int size = msgr->cct->_conf->ms_tcp_rcvbuf;
934 int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
935 if (r < 0) {
936 r = -errno;
937 ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
938 << ": " << cpp_strerror(r) << dendl;
939 }
940 }
941
942 // block ESIGPIPE
943 #ifdef CEPH_USE_SO_NOSIGPIPE
944 int val = 1;
945 int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
946 if (r) {
947 r = -errno;
948 ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
949 << cpp_strerror(r) << dendl;
950 }
951 #endif
952
953 #ifdef SO_PRIORITY
954 int prio = msgr->get_socket_priority();
955 if (prio >= 0) {
956 int r = -1;
957 #ifdef IPTOS_CLASS_CS6
958 int iptos = IPTOS_CLASS_CS6;
959 int addr_family = 0;
960 if (!peer_addr.is_blank_ip()) {
961 addr_family = peer_addr.get_family();
962 } else {
963 addr_family = msgr->get_myaddr().get_family();
964 }
965 switch (addr_family) {
966 case AF_INET:
967 r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
968 break;
969 case AF_INET6:
970 r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
971 break;
972 default:
973 lderr(msgr->cct) << "couldn't set ToS of unknown family ("
974 << addr_family << ")"
975 << " to " << iptos << dendl;
976 return;
977 }
978 if (r < 0) {
979 r = -errno;
980 ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
981 << ": " << cpp_strerror(r) << dendl;
982 }
983 #endif
984 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
985 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
986 // We need to call setsockopt(SO_PRIORITY) after it.
987 r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
988 if (r < 0) {
989 r = -errno;
990 ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
991 << ": " << cpp_strerror(r) << dendl;
992 }
993 }
994 #endif
995 }
996
997 int Pipe::connect()
998 {
999 bool got_bad_auth = false;
1000
1001 ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
1002 assert(pipe_lock.is_locked());
1003
1004 __u32 cseq = connect_seq;
1005 __u32 gseq = msgr->get_global_seq();
1006
1007 // stop reader thread
1008 join_reader();
1009
1010 pipe_lock.Unlock();
1011
1012 char tag = -1;
1013 int rc = -1;
1014 struct msghdr msg;
1015 struct iovec msgvec[2];
1016 int msglen;
1017 char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy
1018 entity_addr_t paddr;
1019 entity_addr_t peer_addr_for_me, socket_addr;
1020 AuthAuthorizer *authorizer = NULL;
1021 bufferlist addrbl, myaddrbl;
1022 const md_config_t *conf = msgr->cct->_conf;
1023
1024 // close old socket. this is safe because we stopped the reader thread above.
1025 if (sd >= 0)
1026 ::close(sd);
1027
1028 // create socket?
1029 sd = socket_cloexec(peer_addr.get_family(), SOCK_STREAM, 0);
1030 if (sd < 0) {
1031 int e = errno;
1032 lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(e) << dendl;
1033 rc = -e;
1034 goto fail;
1035 }
1036
1037 recv_reset();
1038
1039 set_socket_options();
1040
1041 {
1042 entity_addr_t addr2bind = msgr->get_myaddr();
1043 if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
1044 addr2bind.set_port(0);
1045 int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
1046 if (r < 0) {
1047 ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
1048 goto fail;
1049 }
1050 }
1051 }
1052
1053 // connect!
1054 ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
1055 rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
1056 if (rc < 0) {
1057 int stored_errno = errno;
1058 ldout(msgr->cct,2) << "connect error " << peer_addr
1059 << ", " << cpp_strerror(stored_errno) << dendl;
1060 if (stored_errno == ECONNREFUSED) {
1061 ldout(msgr->cct, 2) << "connection refused!" << dendl;
1062 msgr->dispatch_queue.queue_refused(connection_state.get());
1063 }
1064 goto fail;
1065 }
1066
1067 // verify banner
1068 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1069 rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
1070 if (rc < 0) {
1071 ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
1072 goto fail;
1073 }
1074 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1075 ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
1076 goto fail;
1077 }
1078
1079 memset(&msg, 0, sizeof(msg));
1080 msgvec[0].iov_base = banner;
1081 msgvec[0].iov_len = strlen(CEPH_BANNER);
1082 msg.msg_iov = msgvec;
1083 msg.msg_iovlen = 1;
1084 msglen = msgvec[0].iov_len;
1085 rc = do_sendmsg(&msg, msglen);
1086 if (rc < 0) {
1087 ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
1088 goto fail;
1089 }
1090
1091 // identify peer
1092 {
1093 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1094 bufferptr p(sizeof(ceph_entity_addr) * 2);
1095 #else
1096 int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
1097 bufferptr p(wirelen * 2);
1098 #endif
1099 addrbl.push_back(std::move(p));
1100 }
1101 rc = tcp_read(addrbl.c_str(), addrbl.length());
1102 if (rc < 0) {
1103 ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
1104 goto fail;
1105 }
1106 try {
1107 bufferlist::iterator p = addrbl.begin();
1108 ::decode(paddr, p);
1109 ::decode(peer_addr_for_me, p);
1110 }
1111 catch (buffer::error& e) {
1112 ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
1113 << dendl;
1114 goto fail;
1115 }
1116 port = peer_addr_for_me.get_port();
1117
1118 ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
1119 if (peer_addr != paddr) {
1120 if (paddr.is_blank_ip() &&
1121 peer_addr.get_port() == paddr.get_port() &&
1122 peer_addr.get_nonce() == paddr.get_nonce()) {
1123 ldout(msgr->cct,0) << "connect claims to be "
1124 << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
1125 } else {
1126 ldout(msgr->cct,10) << "connect claims to be "
1127 << paddr << " not " << peer_addr << dendl;
1128 goto fail;
1129 }
1130 }
1131
1132 ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
1133
1134 msgr->learned_addr(peer_addr_for_me);
1135
1136 ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy
1137
1138 memset(&msg, 0, sizeof(msg));
1139 msgvec[0].iov_base = myaddrbl.c_str();
1140 msgvec[0].iov_len = myaddrbl.length();
1141 msg.msg_iov = msgvec;
1142 msg.msg_iovlen = 1;
1143 msglen = msgvec[0].iov_len;
1144 rc = do_sendmsg(&msg, msglen);
1145 if (rc < 0) {
1146 ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
1147 goto fail;
1148 }
1149 ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
1150
1151
1152 while (1) {
1153 if (!authorizer) {
1154 authorizer = msgr->get_authorizer(peer_type, false);
1155 }
1156 bufferlist authorizer_reply;
1157
1158 ceph_msg_connect connect;
1159 connect.features = policy.features_supported;
1160 connect.host_type = msgr->get_myinst().name.type();
1161 connect.global_seq = gseq;
1162 connect.connect_seq = cseq;
1163 connect.protocol_version = msgr->get_proto_version(peer_type, true);
1164 connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1165 connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1166 if (authorizer)
1167 ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
1168 << " protocol=" << connect.authorizer_protocol << dendl;
1169 connect.flags = 0;
1170 if (policy.lossy)
1171 connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1172 memset(&msg, 0, sizeof(msg));
1173 msgvec[0].iov_base = (char*)&connect;
1174 msgvec[0].iov_len = sizeof(connect);
1175 msg.msg_iov = msgvec;
1176 msg.msg_iovlen = 1;
1177 msglen = msgvec[0].iov_len;
1178 if (authorizer) {
1179 msgvec[1].iov_base = authorizer->bl.c_str();
1180 msgvec[1].iov_len = authorizer->bl.length();
1181 msg.msg_iovlen++;
1182 msglen += msgvec[1].iov_len;
1183 }
1184
1185 ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
1186 << " proto=" << connect.protocol_version << dendl;
1187 rc = do_sendmsg(&msg, msglen);
1188 if (rc < 0) {
1189 ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
1190 goto fail;
1191 }
1192
1193 ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
1194 ceph_msg_connect_reply reply;
1195 rc = tcp_read((char*)&reply, sizeof(reply));
1196 if (rc < 0) {
1197 ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
1198 goto fail;
1199 }
1200
1201 ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
1202 << " connect_seq " << reply.connect_seq
1203 << " global_seq " << reply.global_seq
1204 << " proto " << reply.protocol_version
1205 << " flags " << (int)reply.flags
1206 << " features " << reply.features
1207 << dendl;
1208
1209 authorizer_reply.clear();
1210
1211 if (reply.authorizer_len) {
1212 ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
1213 bufferptr bp = buffer::create(reply.authorizer_len);
1214 rc = tcp_read(bp.c_str(), reply.authorizer_len);
1215 if (rc < 0) {
1216 ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
1217 goto fail;
1218 }
1219 authorizer_reply.push_back(bp);
1220 }
1221
1222 if (reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
1223 authorizer->add_challenge(msgr->cct, authorizer_reply);
1224 ldout(msgr->cct,10) << " got authorizer challenge, " << authorizer_reply.length()
1225 << " bytes" << dendl;
1226 continue;
1227 }
1228
1229 if (authorizer) {
1230 bufferlist::iterator iter = authorizer_reply.begin();
1231 if (!authorizer->verify_reply(iter)) {
1232 ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
1233 goto fail;
1234 }
1235 }
1236
1237 if (conf->ms_inject_internal_delays) {
1238 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1239 utime_t t;
1240 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1241 t.sleep();
1242 }
1243
1244 pipe_lock.Lock();
1245 if (state != STATE_CONNECTING) {
1246 ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
1247 goto stop_locked;
1248 }
1249
1250 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1251 ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
1252 << connect.features << " < peer " << reply.features
1253 << " missing " << (reply.features & ~policy.features_supported)
1254 << std::dec << dendl;
1255 goto fail_locked;
1256 }
1257
1258 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1259 ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
1260 << " != " << reply.protocol_version << dendl;
1261 goto fail_locked;
1262 }
1263
1264 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1265 ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
1266 if (got_bad_auth)
1267 goto stop_locked;
1268 got_bad_auth = true;
1269 pipe_lock.Unlock();
1270 delete authorizer;
1271 authorizer = msgr->get_authorizer(peer_type, true); // try harder
1272 continue;
1273 }
1274 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1275 ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
1276 was_session_reset();
1277 cseq = 0;
1278 pipe_lock.Unlock();
1279 continue;
1280 }
1281 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1282 gseq = msgr->get_global_seq(reply.global_seq);
1283 ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
1284 << " chose new " << gseq << dendl;
1285 pipe_lock.Unlock();
1286 continue;
1287 }
1288 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1289 assert(reply.connect_seq > connect_seq);
1290 ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
1291 << " -> " << reply.connect_seq << dendl;
1292 cseq = connect_seq = reply.connect_seq;
1293 pipe_lock.Unlock();
1294 continue;
1295 }
1296
1297 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1298 ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
1299 state = STATE_WAIT;
1300 goto stop_locked;
1301 }
1302
1303 if (reply.tag == CEPH_MSGR_TAG_READY ||
1304 reply.tag == CEPH_MSGR_TAG_SEQ) {
1305 uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
1306 if (feat_missing) {
1307 ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
1308 goto fail_locked;
1309 }
1310
1311 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1312 ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1313 uint64_t newly_acked_seq = 0;
1314 rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
1315 if (rc < 0) {
1316 ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
1317 goto fail_locked;
1318 }
1319 ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
1320 << " vs out_seq " << out_seq << dendl;
1321 while (newly_acked_seq > out_seq) {
1322 Message *m = _get_next_outgoing();
1323 assert(m);
1324 ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
1325 << " " << *m << dendl;
1326 assert(m->get_seq() <= newly_acked_seq);
1327 m->put();
1328 ++out_seq;
1329 }
1330 if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
1331 ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
1332 goto fail_locked;
1333 }
1334 }
1335
1336 // hooray!
1337 peer_global_seq = reply.global_seq;
1338 policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
1339 state = STATE_OPEN;
1340 connect_seq = cseq + 1;
1341 assert(connect_seq == reply.connect_seq);
1342 backoff = utime_t();
1343 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
1344 ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
1345 << ", features " << connection_state->get_features() << dendl;
1346
1347
1348 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1349 // connection. PLR
1350
1351 if (authorizer != NULL) {
1352 session_security.reset(
1353 get_auth_session_handler(msgr->cct,
1354 authorizer->protocol,
1355 authorizer->session_key,
1356 connection_state->get_features()));
1357 } else {
1358 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1359 session_security.reset();
1360 }
1361
1362 msgr->dispatch_queue.queue_connect(connection_state.get());
1363 msgr->ms_deliver_handle_fast_connect(connection_state.get());
1364
1365 if (!reader_running) {
1366 ldout(msgr->cct,20) << "connect starting reader" << dendl;
1367 start_reader();
1368 }
1369 maybe_start_delay_thread();
1370 delete authorizer;
1371 return 0;
1372 }
1373
1374 // protocol error
1375 ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
1376 goto fail_locked;
1377 }
1378
1379 fail:
1380 if (conf->ms_inject_internal_delays) {
1381 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1382 utime_t t;
1383 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1384 t.sleep();
1385 }
1386
1387 pipe_lock.Lock();
1388 fail_locked:
1389 if (state == STATE_CONNECTING)
1390 fault();
1391 else
1392 ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
1393 << " != connecting, stopping" << dendl;
1394
1395 stop_locked:
1396 delete authorizer;
1397 return rc;
1398 }
1399
1400 void Pipe::register_pipe()
1401 {
1402 ldout(msgr->cct,10) << "register_pipe" << dendl;
1403 assert(msgr->lock.is_locked());
1404 Pipe *existing = msgr->_lookup_pipe(peer_addr);
1405 assert(existing == NULL);
1406 msgr->rank_pipe[peer_addr] = this;
1407 }
1408
1409 void Pipe::unregister_pipe()
1410 {
1411 assert(msgr->lock.is_locked());
1412 ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1413 if (p != msgr->rank_pipe.end() && p->second == this) {
1414 ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1415 msgr->rank_pipe.erase(p);
1416 } else {
1417 ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1418 msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
1419 }
1420 }
1421
// Reap the pipe's worker threads: join the writer and reader threads if
// they were ever started, then stop and join the optional delayed-delivery
// thread.  Thread::join() on an unstarted thread is skipped via
// is_started().
void Pipe::join()
{
  ldout(msgr->cct, 20) << "join" << dendl;
  if (writer_thread.is_started())
    writer_thread.join();
  if (reader_thread.is_started())
    reader_thread.join();
  if (delay_thread) {
    ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
    delay_thread->stop();
    delay_thread->join();
  }
}
1435
1436 void Pipe::requeue_sent()
1437 {
1438 if (sent.empty())
1439 return;
1440
1441 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1442 while (!sent.empty()) {
1443 Message *m = sent.back();
1444 sent.pop_back();
1445 ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1446 << " (" << m->get_seq() << ")" << dendl;
1447 rq.push_front(m);
1448 out_seq--;
1449 }
1450 }
1451
1452 void Pipe::discard_requeued_up_to(uint64_t seq)
1453 {
1454 ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1455 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1456 return;
1457 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1458 while (!rq.empty()) {
1459 Message *m = rq.front();
1460 if (m->get_seq() == 0 || m->get_seq() > seq)
1461 break;
1462 ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1463 << " <= " << seq << ", discarding" << dendl;
1464 m->put();
1465 rq.pop_front();
1466 out_seq++;
1467 }
1468 if (rq.empty())
1469 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1470 }
1471
1472 /*
1473 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1474 * Must hold pipe_lock prior to calling.
1475 */
1476 void Pipe::discard_out_queue()
1477 {
1478 ldout(msgr->cct,10) << "discard_queue" << dendl;
1479
1480 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1481 ldout(msgr->cct,20) << " discard " << *p << dendl;
1482 (*p)->put();
1483 }
1484 sent.clear();
1485 for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1486 for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1487 ldout(msgr->cct,20) << " discard " << *r << dendl;
1488 (*r)->put();
1489 }
1490 out_q.clear();
1491 }
1492
// Handle a connection fault (read/write error) on this pipe.  Caller
// holds pipe_lock.  Depending on policy and current state this either
// tears the pipe down (lossy), parks it in STANDBY, kicks off a
// reconnect, or backs off exponentially before the next attempt.
// 'onread' is set when the reader hit the error; in that case a fault
// during CONNECTING just means the reader should quietly exit.
void Pipe::fault(bool onread)
{
  const md_config_t *conf = msgr->cct->_conf;
  assert(pipe_lock.is_locked());
  cond.Signal();

  if (onread && state == STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
    return;
  }

  ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;

  // already on the way out: just make sure the Connection is detached
  // and a reset event is queued if we were still attached
  if (state == STATE_CLOSED ||
      state == STATE_CLOSING) {
    ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
    if (connection_state->clear_pipe(this))
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  shutdown_socket();

  // lossy channel?
  if (policy.lossy && state != STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;

    // disconnect from Connection, and mark it failed.  future messages
    // will be dropped.
    assert(connection_state);
    stop();
    bool cleared = connection_state->clear_pipe(this);

    // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
    // rank_pipe entry is ignored by others.
    pipe_lock.Unlock();

    if (conf->ms_inject_internal_delays) {
      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }

    // msgr->lock must be taken before pipe_lock (lock ordering), hence
    // the drop-and-retake dance above
    msgr->lock.Lock();
    pipe_lock.Lock();
    unregister_pipe();
    msgr->lock.Unlock();

    // throw away everything that was queued or in flight
    if (delay_thread)
      delay_thread->discard();
    in_q->discard_queue(conn_id);
    discard_out_queue();
    if (cleared)
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  // queue delayed items immediately
  if (delay_thread)
    delay_thread->flush();

  // requeue sent items
  requeue_sent();

  if (policy.standby && !is_queued()) {
    ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
    state = STATE_STANDBY;
    return;
  }

  if (state != STATE_CONNECTING) {
    // first fault on an open pipe: servers wait for the client to
    // reconnect; clients bump connect_seq and reconnect themselves
    if (policy.server) {
      ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
      state = STATE_STANDBY;
    } else {
      ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
      connect_seq++;
      state = STATE_CONNECTING;
    }
    backoff = utime_t();
  } else if (backoff == utime_t()) {
    // first failed reconnect attempt: start the backoff clock
    ldout(msgr->cct,0) << "fault" << dendl;
    backoff.set_from_double(conf->ms_initial_backoff);
  } else {
    // repeated reconnect failure: wait, then double the backoff up to
    // ms_max_backoff.  cond may be signalled early (e.g. new messages).
    ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
    cond.WaitInterval(pipe_lock, backoff);
    backoff += backoff;
    if (backoff > conf->ms_max_backoff)
      backoff.set_from_double(conf->ms_max_backoff);
    ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
  }
}
1586
1587 int Pipe::randomize_out_seq()
1588 {
1589 if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
1590 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1591 // here. We'll check it on the call. PLR
1592 int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
1593 out_seq &= SEQ_MASK;
1594 lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1595 return seq_error;
1596 } else {
1597 // previously, seq #'s always started at 0.
1598 out_seq = 0;
1599 return 0;
1600 }
1601 }
1602
// The peer told us our session no longer exists (RESETSESSION).  Drop all
// queued, delayed, and outgoing state, notify dispatch of the remote
// reset, and re-initialize sequence numbers for a fresh session.  Caller
// holds pipe_lock.
void Pipe::was_session_reset()
{
  assert(pipe_lock.is_locked());

  ldout(msgr->cct,10) << "was_session_reset" << dendl;
  in_q->discard_queue(conn_id);
  if (delay_thread)
    delay_thread->discard();
  discard_out_queue();

  msgr->dispatch_queue.queue_remote_reset(connection_state.get());

  // best effort: if randomness is unavailable we just log and proceed
  // with whatever value is left in out_seq
  if (randomize_out_seq()) {
    lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }

  in_seq = 0;
  connect_seq = 0;
}
1622
// Mark the pipe closed and shut the socket down so the reader and writer
// threads see the state change and unblock.  Caller holds pipe_lock.
void Pipe::stop()
{
  ldout(msgr->cct,10) << "stop" << dendl;
  assert(pipe_lock.is_locked());
  state = STATE_CLOSED;
  state_closed = true;
  cond.Signal();       // wake anyone waiting on state
  shutdown_socket();   // unblocks threads stuck in socket I/O
}
1632
// Stop the pipe and wait until the reader thread is no longer dispatching
// a message, so the caller can safely tear the pipe down.  Caller holds
// pipe_lock; it is dropped and retaken around the delay-thread drain.
void Pipe::stop_and_wait()
{
  assert(pipe_lock.is_locked_by_me());
  if (state != STATE_CLOSED)
    stop();

  // optional test-only delay injection
  if (msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << __func__ << " sleep for "
			 << msgr->cct->_conf->ms_inject_internal_delays
			 << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }

  // drain the delayed-delivery thread without holding pipe_lock (its
  // fast-dispatch callbacks may need the lock themselves)
  if (delay_thread) {
    pipe_lock.Unlock();
    delay_thread->stop_fast_dispatching();
    pipe_lock.Lock();
  }
  // the reader signals cond when it finishes fast-dispatching
  while (reader_running &&
	 reader_dispatching)
    cond.Wait(pipe_lock);
}
1657
/* read msgs from socket.
 * also, server.
 *
 * Reader-thread entry point.  Runs the server-side accept handshake if
 * needed, then loops reading a one-byte tag from the socket and handling
 * it (keepalives, acks, messages, close) until the pipe leaves the
 * readable states.  pipe_lock is held except while blocked in socket I/O
 * and while fast-dispatching.
 */
void Pipe::reader()
{
  pipe_lock.Lock();

  // an accepting pipe completes the server side of the handshake first
  if (state == STATE_ACCEPTING) {
    accept();
    assert(pipe_lock.is_locked());
  }

  // loop.
  while (state != STATE_CLOSED &&
	 state != STATE_CONNECTING) {
    assert(pipe_lock.is_locked());

    // sleep if (re)connecting
    if (state == STATE_STANDBY) {
      ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
      cond.Wait(pipe_lock);
      continue;
    }

    // get a reference to the AuthSessionHandler while we have the pipe_lock
    ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;

    pipe_lock.Unlock();

    char tag = -1;
    ldout(msgr->cct,20) << "reader reading tag..." << dendl;
    if (tcp_read((char*)&tag, 1) < 0) {
      pipe_lock.Lock();
      ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
      fault(true);
      continue;
    }

    if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
      ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
      pipe_lock.Lock();
      connection_state->set_last_keepalive(ceph_clock_now());
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
      // KEEPALIVE2 carries a timestamp to echo back; we record it and
      // wake the writer, which sends the KEEPALIVE2_ACK
      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
      ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
			   << cpp_strerror(errno) << dendl;
	fault(true);
      } else {
	send_keepalive_ack = true;
	keepalive_ack_stamp = utime_t(t);
	ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
			   << dendl;
	connection_state->set_last_keepalive(ceph_clock_now());
	cond.Signal();
      }
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
      // peer echoed back the timestamp from our own KEEPALIVE2
      ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
      struct ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
	fault(true);
      } else {
	connection_state->set_last_keepalive_ack(utime_t(t));
      }
      continue;
    }

    // open ...
    if (tag == CEPH_MSGR_TAG_ACK) {
      // peer acknowledges receipt of our messages up to 'seq'
      ldout(msgr->cct,20) << "reader got ACK" << dendl;
      ceph_le64 seq;
      int rc = tcp_read((char*)&seq, sizeof(seq));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
	fault(true);
      } else if (state != STATE_CLOSED) {
	handle_ack(seq);
      }
      continue;
    }

    else if (tag == CEPH_MSGR_TAG_MSG) {
      ldout(msgr->cct,20) << "reader got MSG" << dendl;
      Message *m = 0;
      int r = read_message(&m, auth_handler.get());

      pipe_lock.Lock();

      if (!m) {
	// m == 0 with r >= 0 means the message was dropped (e.g. bad crc)
	// rather than a socket fault
	if (r < 0)
	  fault(true);
	continue;
      }

      m->trace.event("pipe read message");

      // pipe was closed or reset while we were off the lock; drop it
      if (state == STATE_CLOSED ||
	  state == STATE_CONNECTING) {
	in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
	m->put();
	continue;
      }

      // check received seq#.  if it is old, drop the message.
      // note that incoming messages may skip ahead.  this is convenient for the client
      // side queueing because messages can't be renumbered, but the (kernel) client will
      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
      // it doesn't matter if we "got" it or not.
      if (m->get_seq() <= in_seq) {
	ldout(msgr->cct,0) << "reader got old message "
		<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
		<< ", discarding" << dendl;
	in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
	m->put();
	if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
	    msgr->cct->_conf->ms_die_on_old_message)
	  assert(0 == "old msgs despite reconnect_seq feature");
	continue;
      }
      if (m->get_seq() > in_seq + 1) {
	ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
			   << in_seq << " to " << m->get_seq() << dendl;
	if (msgr->cct->_conf->ms_die_on_skipped_message)
	  assert(0 == "skipped incoming seq");
      }

      m->set_connection(connection_state.get());

      // note last received message.
      in_seq = m->get_seq();

      cond.Signal();  // wake up writer, to ack this

      ldout(msgr->cct,10) << "reader got message "
	       << m->get_seq() << " " << m << " " << *m
	       << dendl;
      in_q->fast_preprocess(m);

      if (delay_thread) {
	// delayed-delivery testing mode: maybe compute an artificial
	// release time, then hand the message to the delay thread
	utime_t release;
	if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
	  release = m->get_recv_stamp();
	  release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
	  lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
	}
	delay_thread->queue(release, m);
      } else {
	if (in_q->can_fast_dispatch(m)) {
	  // fast dispatch runs in this thread, without pipe_lock;
	  // reader_dispatching lets stop_and_wait() wait for us
	  reader_dispatching = true;
	  pipe_lock.Unlock();
	  in_q->fast_dispatch(m);
	  pipe_lock.Lock();
	  reader_dispatching = false;
	  if (state == STATE_CLOSED ||
	      notify_on_dispatch_done) { // there might be somebody waiting
	    notify_on_dispatch_done = false;
	    cond.Signal();
	  }
	} else {
	  in_q->enqueue(m, m->get_priority(), conn_id);
	}
      }
    }

    else if (tag == CEPH_MSGR_TAG_CLOSE) {
      // orderly shutdown: second CLOSE (or one while closing) finishes it
      ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
      pipe_lock.Lock();
      if (state == STATE_CLOSING) {
	state = STATE_CLOSED;
	state_closed = true;
      } else {
	state = STATE_CLOSING;
      }
      cond.Signal();
      break;
    }
    else {
      ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
      pipe_lock.Lock();
      fault(true);
    }
  }


  // reap?
  reader_running = false;
  reader_needs_join = true;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "reader done" << dendl;
}
1859
1860 /* write msgs to socket.
1861 * also, client.
1862 */
1863 void Pipe::writer()
1864 {
1865 pipe_lock.Lock();
1866 while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
1867 ldout(msgr->cct,10) << "writer: state = " << get_state_name()
1868 << " policy.server=" << policy.server << dendl;
1869
1870 // standby?
1871 if (is_queued() && state == STATE_STANDBY && !policy.server)
1872 state = STATE_CONNECTING;
1873
1874 // connect?
1875 if (state == STATE_CONNECTING) {
1876 assert(!policy.server);
1877 connect();
1878 continue;
1879 }
1880
1881 if (state == STATE_CLOSING) {
1882 // write close tag
1883 ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
1884 char tag = CEPH_MSGR_TAG_CLOSE;
1885 state = STATE_CLOSED;
1886 state_closed = true;
1887 pipe_lock.Unlock();
1888 if (sd >= 0) {
1889 // we can ignore return value, actually; we don't care if this succeeds.
1890 int r = ::write(sd, &tag, 1);
1891 (void)r;
1892 }
1893 pipe_lock.Lock();
1894 continue;
1895 }
1896
1897 if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
1898 (is_queued() || in_seq > in_seq_acked)) {
1899
1900 // keepalive?
1901 if (send_keepalive) {
1902 int rc;
1903 if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1904 pipe_lock.Unlock();
1905 rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
1906 ceph_clock_now());
1907 } else {
1908 pipe_lock.Unlock();
1909 rc = write_keepalive();
1910 }
1911 pipe_lock.Lock();
1912 if (rc < 0) {
1913 ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
1914 << cpp_strerror(errno) << dendl;
1915 fault();
1916 continue;
1917 }
1918 send_keepalive = false;
1919 }
1920 if (send_keepalive_ack) {
1921 utime_t t = keepalive_ack_stamp;
1922 pipe_lock.Unlock();
1923 int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
1924 pipe_lock.Lock();
1925 if (rc < 0) {
1926 ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
1927 fault();
1928 continue;
1929 }
1930 send_keepalive_ack = false;
1931 }
1932
1933 // send ack?
1934 if (in_seq > in_seq_acked) {
1935 uint64_t send_seq = in_seq;
1936 pipe_lock.Unlock();
1937 int rc = write_ack(send_seq);
1938 pipe_lock.Lock();
1939 if (rc < 0) {
1940 ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
1941 fault();
1942 continue;
1943 }
1944 in_seq_acked = send_seq;
1945 }
1946
1947 // grab outgoing message
1948 Message *m = _get_next_outgoing();
1949 if (m) {
1950 m->set_seq(++out_seq);
1951 if (!policy.lossy) {
1952 // put on sent list
1953 sent.push_back(m);
1954 m->get();
1955 }
1956
1957 // associate message with Connection (for benefit of encode_payload)
1958 m->set_connection(connection_state.get());
1959
1960 uint64_t features = connection_state->get_features();
1961
1962 if (m->empty_payload())
1963 ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
1964 << " " << m << " " << *m << dendl;
1965 else
1966 ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
1967 << " " << m << " " << *m << dendl;
1968
1969 // encode and copy out of *m
1970 m->encode(features, msgr->crcflags);
1971
1972 // prepare everything
1973 const ceph_msg_header& header = m->get_header();
1974 const ceph_msg_footer& footer = m->get_footer();
1975
1976 // Now that we have all the crcs calculated, handle the
1977 // digital signature for the message, if the pipe has session
1978 // security set up. Some session security options do not
1979 // actually calculate and check the signature, but they should
1980 // handle the calls to sign_message and check_signature. PLR
1981 if (session_security.get() == NULL) {
1982 ldout(msgr->cct, 20) << "writer no session security" << dendl;
1983 } else {
1984 if (session_security->sign_message(m)) {
1985 ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
1986 << "): sig = " << footer.sig << dendl;
1987 } else {
1988 ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
1989 << "): sig = " << footer.sig << dendl;
1990 }
1991 }
1992
1993 bufferlist blist = m->get_payload();
1994 blist.append(m->get_middle());
1995 blist.append(m->get_data());
1996
1997 pipe_lock.Unlock();
1998
1999 m->trace.event("pipe writing message");
2000
2001 ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
2002 int rc = write_message(header, footer, blist);
2003
2004 pipe_lock.Lock();
2005 if (rc < 0) {
2006 ldout(msgr->cct,1) << "writer error sending " << m << ", "
2007 << cpp_strerror(errno) << dendl;
2008 fault();
2009 }
2010 m->put();
2011 }
2012 continue;
2013 }
2014
2015 // wait
2016 ldout(msgr->cct,20) << "writer sleeping" << dendl;
2017 cond.Wait(pipe_lock);
2018 }
2019
2020 ldout(msgr->cct,20) << "writer finishing" << dendl;
2021
2022 // reap?
2023 writer_running = false;
2024 unlock_maybe_reap();
2025 ldout(msgr->cct,10) << "writer done" << dendl;
2026 }
2027
// Called with pipe_lock held; always releases it before returning.
// If both the reader and writer threads have exited, the Pipe is dead:
// close the socket and hand ourselves to the messenger's reaper.
void Pipe::unlock_maybe_reap()
{
  if (!reader_running && !writer_running) {
    shutdown_socket();
    pipe_lock.Unlock();
    // let the delay thread drain any in-flight delayed messages before
    // the reaper tears this Pipe down
    if (delay_thread && delay_thread->is_flushing()) {
      delay_thread->wait_for_flush();
    }
    msgr->queue_reap(this);
  } else {
    pipe_lock.Unlock();
  }
}
2041
2042 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2043 {
2044 // create a buffer to read into that matches the data alignment
2045 unsigned left = len;
2046 if (off & ~CEPH_PAGE_MASK) {
2047 // head
2048 unsigned head = 0;
2049 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2050 data.push_back(buffer::create(head));
2051 left -= head;
2052 }
2053 unsigned middle = left & CEPH_PAGE_MASK;
2054 if (middle > 0) {
2055 data.push_back(buffer::create_page_aligned(middle));
2056 left -= middle;
2057 }
2058 if (left) {
2059 data.push_back(buffer::create(left));
2060 }
2061 }
2062
// Read one complete wire message (header, front/middle/data payloads,
// footer) from the socket, decode it, and return it via *pm.
//
// Returns 0 on success (or on a cleanly-aborted message, with *pm left
// untouched), -1 on socket/protocol errors, -EINVAL on decode or
// signature failure.  Acquires policy and dispatch throttle budget up
// front; every failure path after that goes through out_dethrottle so
// the budget is returned.  Called without pipe_lock held; briefly takes
// connection_state->lock while picking an rx buffer for the data
// portion.
int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
{
  int ret = -1;
  // envelope
  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;

  ceph_msg_header header;
  ceph_msg_footer footer;
  __u32 header_crc = 0;

  // read the header: new fixed-size form, or the legacy form (which
  // embeds full src/orig_src entities) when the peer lacks NOSRCADDR
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    if (tcp_read((char*)&header, sizeof(header)) < 0)
      return -1;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      // crc covers everything except the trailing crc field itself
      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
    }
  } else {
    ceph_msg_header_old oldheader;
    if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
      return -1;
    // this is fugly
    memcpy(&header, &oldheader, sizeof(header));
    header.src = oldheader.src.name;
    header.reserved = oldheader.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      header.crc = oldheader.crc;
      // note: crc is computed over the *old* layout, as the peer sent it
      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
    }
  }

  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
           << " src " << entity_name_t(header.src)
           << " front=" << header.front_len
           << " data=" << header.data_len
           << " off " << header.data_off
           << dendl;

  // verify header crc
  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
    return -1;
  }

  bufferlist front, middle, data;
  int front_len, middle_len;
  unsigned data_len, data_off;
  int aborted;
  Message *message;
  utime_t recv_stamp = ceph_clock_now();

  // reserve one message's worth of policy throttle (held for the
  // lifetime of the message; released by the Message dtor path)
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->get();
  }

  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->get(message_size);
    }

    // throttle total bytes waiting for dispatch.  do this _after_ the
    // policy throttle, as this one does not deadlock (unless dispatch
    // blocks indefinitely, which it shouldn't).  in contrast, the
    // policy throttle carries for the lifetime of the message.
    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
	     << in_q->dispatch_throttler.get_current() << "/"
	     << in_q->dispatch_throttler.get_max() << dendl;
    in_q->dispatch_throttler.get(message_size);
  }

  utime_t throttle_stamp = ceph_clock_now();

  // read front
  front_len = header.front_len;
  if (front_len) {
    bufferptr bp = buffer::create(front_len);
    if (tcp_read(bp.c_str(), front_len) < 0)
      goto out_dethrottle;
    front.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
  }

  // read middle
  middle_len = header.middle_len;
  if (middle_len) {
    bufferptr bp = buffer::create(middle_len);
    if (tcp_read(bp.c_str(), middle_len) < 0)
      goto out_dethrottle;
    middle.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
  }


  // read data
  data_len = le32_to_cpu(header.data_len);
  data_off = le32_to_cpu(header.data_off);
  if (data_len) {
    unsigned offset = 0;
    unsigned left = data_len;

    bufferlist newbuf, rxbuf;
    bufferlist::iterator blp;
    int rxbuf_version = 0;

    while (left > 0) {
      // wait for data
      if (tcp_read_wait() < 0)
	goto out_dethrottle;

      // get a buffer: prefer a caller-posted rx buffer for this tid
      // (re-checked every pass, since it can be revoked/replaced under
      // connection_state->lock), else our own aligned allocation
      connection_state->lock.Lock();
      map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
      if (p != connection_state->rx_buffers.end()) {
	if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
	  // first time, or the posted buffer was swapped: re-seat blp
	  ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second
		   << " at offset " << offset
		   << " len " << p->second.first.length() << dendl;
	  rxbuf = p->second.first;
	  rxbuf_version = p->second.second;
	  // make sure it's big enough
	  if (rxbuf.length() < data_len)
	    rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
	  blp = p->second.first.begin();
	  blp.advance(offset);
	}
      } else {
	if (!newbuf.length()) {
	  ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
	  alloc_aligned_buffer(newbuf, data_len, data_off);
	  blp = newbuf.begin();
	  blp.advance(offset);
	}
      }
      // read into the current contiguous chunk, at most to its end
      bufferptr bp = blp.get_current_ptr();
      int read = MIN(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
      connection_state->lock.Unlock();
      if (got < 0)
	goto out_dethrottle;
      if (got > 0) {
	blp.advance(got);
	data.append(bp, 0, got);
	offset += got;
	left -= got;
      } // else we got a signal or something; just loop.
    }
  }

  // footer: new form carries a signature; old form does not
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
      goto out_dethrottle;
  } else {
    ceph_msg_footer_old old_footer;
    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
      goto out_dethrottle;
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  // sender clears COMPLETE if it gave up mid-message; that's not an error
  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
  if (aborted) {
    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	    << " byte message.. ABORTED" << dendl;
    ret = 0;
    goto out_dethrottle;
  }

  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	   << " byte message" << dendl;
  message = decode_message(msgr->cct, msgr->crcflags, header, footer,
			   front, middle, data, connection_state.get());
  if (!message) {
    ret = -EINVAL;
    goto out_dethrottle;
  }

  //
  //  Check the signature if one should be present.  A zero return indicates success. PLR
  //

  if (auth_handler == NULL) {
    ldout(msgr->cct, 10) << "No session security set" << dendl;
  } else {
    if (auth_handler->check_message_signature(message)) {
      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
      message->put();
      ret = -EINVAL;
      goto out_dethrottle;
    }
  }

  // hand the throttle reservations to the message; they are released
  // when the message is freed/dispatched
  message->set_byte_throttler(policy.throttler_bytes);
  message->set_message_throttler(policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(message_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  *pm = message;
  return 0;

 out_dethrottle:
  // release bytes reserved from the throttlers on failure
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(message_size);
    }

    in_q->dispatch_throttle_release(message_size);
  }
  return ret;
}
2301
// Send exactly 'len' bytes described by the iovecs in *msg, looping on
// short writes.  The iovec array inside *msg is consumed/adjusted in
// place, so the caller's msghdr is clobbered on return.  'more' sets
// MSG_MORE to hint the kernel that further data follows immediately.
// Returns 0 on success, -errno on a socket error, or -EINTR if the pipe
// was closed underneath us mid-send.
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
  MSGR_SIGPIPE_STOPPER;
  while (len > 0) {
    int r;
    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r == 0)
      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
      return r;
    }
    if (state == STATE_CLOSED) {
      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
      return -EINTR; // close enough
    }

    len -= r;
    if (len == 0) break;

    // hrmph.  trim r bytes off the front of our message.
    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
    while (r > 0) {
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
	// lose this whole item: advance past it entirely
	//ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
	r -= msg->msg_iov[0].iov_len;
	msg->msg_iov++;
	msg->msg_iovlen--;
      } else {
	// partial! bump base/len of the first iovec and stop
	//ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
	msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
	msg->msg_iov[0].iov_len -= r;
	break;
      }
    }
  }
  return 0;
}
2343
2344
2345 int Pipe::write_ack(uint64_t seq)
2346 {
2347 ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2348
2349 char c = CEPH_MSGR_TAG_ACK;
2350 ceph_le64 s;
2351 s = seq;
2352
2353 struct msghdr msg;
2354 memset(&msg, 0, sizeof(msg));
2355 struct iovec msgvec[2];
2356 msgvec[0].iov_base = &c;
2357 msgvec[0].iov_len = 1;
2358 msgvec[1].iov_base = &s;
2359 msgvec[1].iov_len = sizeof(s);
2360 msg.msg_iov = msgvec;
2361 msg.msg_iovlen = 2;
2362
2363 if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2364 return -1;
2365 return 0;
2366 }
2367
2368 int Pipe::write_keepalive()
2369 {
2370 ldout(msgr->cct,10) << "write_keepalive" << dendl;
2371
2372 char c = CEPH_MSGR_TAG_KEEPALIVE;
2373
2374 struct msghdr msg;
2375 memset(&msg, 0, sizeof(msg));
2376 struct iovec msgvec[2];
2377 msgvec[0].iov_base = &c;
2378 msgvec[0].iov_len = 1;
2379 msg.msg_iov = msgvec;
2380 msg.msg_iovlen = 1;
2381
2382 if (do_sendmsg(&msg, 1) < 0)
2383 return -1;
2384 return 0;
2385 }
2386
2387 int Pipe::write_keepalive2(char tag, const utime_t& t)
2388 {
2389 ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2390 struct ceph_timespec ts;
2391 t.encode_timeval(&ts);
2392 struct msghdr msg;
2393 memset(&msg, 0, sizeof(msg));
2394 struct iovec msgvec[2];
2395 msgvec[0].iov_base = &tag;
2396 msgvec[0].iov_len = 1;
2397 msgvec[1].iov_base = &ts;
2398 msgvec[1].iov_len = sizeof(ts);
2399 msg.msg_iov = msgvec;
2400 msg.msg_iovlen = 2;
2401
2402 if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2403 return -1;
2404 return 0;
2405 }
2406
2407
// Write one complete message to the socket: tag byte, header (new or
// legacy layout depending on peer features), the payload bufferlist
// (front+middle+data, pre-flattened by the caller), and the footer
// (new signed form or the old one).  Batches everything into the
// member 'msgvec' iovec array, flushing with do_sendmsg() whenever the
// array approaches SM_IOV_MAX.  Returns 0 on success, -1 on failure.
// Called without pipe_lock held (the writer drops it before calling).
int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
{
  int ret;

  // set up msghdr and iovecs
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = msgvec;
  int msglen = 0;

  // send tag
  char tag = CEPH_MSGR_TAG_MSG;
  msgvec[msg.msg_iovlen].iov_base = &tag;
  msgvec[msg.msg_iovlen].iov_len = 1;
  msglen++;
  msg.msg_iovlen++;

  // send envelope
  ceph_msg_header_old oldheader;
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    msgvec[msg.msg_iovlen].iov_base = (char*)&header;
    msgvec[msg.msg_iovlen].iov_len = sizeof(header);
    msglen += sizeof(header);
    msg.msg_iovlen++;
  } else {
    // peer speaks the old protocol: rebuild the legacy header, which
    // carries full src/orig_src entity_insts, and re-crc it over the
    // old layout
    memcpy(&oldheader, &header, sizeof(header));
    oldheader.src.name = header.src;
    oldheader.src.addr = connection_state->get_peer_addr();
    oldheader.orig_src = oldheader.src;
    oldheader.reserved = header.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
	oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
				    sizeof(oldheader) - sizeof(oldheader.crc));
    } else {
	oldheader.crc = 0;
    }
    msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
    msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
    msglen += sizeof(oldheader);
    msg.msg_iovlen++;
  }

  // payload (front+data)
  list<bufferptr>::const_iterator pb = blist.buffers().begin();
  unsigned b_off = 0;  // carry-over buffer offset, if any
  unsigned bl_pos = 0; // blist pos
  unsigned left = blist.length();

  // walk the bufferlist chunk by chunk, appending an iovec per chunk
  while (left > 0) {
    unsigned donow = MIN(left, pb->length()-b_off);
    if (donow == 0) {
      ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
	      << " b_off " << b_off << dendl;
    }
    assert(donow > 0);
    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
	     << " leftinchunk " << left
	     << " buffer len " << pb->length()
	     << " writing " << donow
	     << dendl;

    // flush early if the iovec array is nearly full (leave room for
    // this chunk plus the footer)
    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
      if (do_sendmsg(&msg, msglen, true))
	goto fail;

      // and restart the iov
      msg.msg_iov = msgvec;
      msg.msg_iovlen = 0;
      msglen = 0;
    }

    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
    msgvec[msg.msg_iovlen].iov_len = donow;
    msglen += donow;
    msg.msg_iovlen++;

    assert(left >= donow);
    left -= donow;
    b_off += donow;
    bl_pos += donow;
    if (left == 0)
      break;
    // advance to the next non-empty buffer in the list
    while (b_off == pb->length()) {
      ++pb;
      b_off = 0;
    }
  }
  assert(left == 0);

  // send footer; if receiver doesn't support signatures, use the old footer format

  ceph_msg_footer_old old_footer;
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
    msglen += sizeof(footer);
    msg.msg_iovlen++;
  } else {
    // old footer has no signature field; zero crcs we aren't computing
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
	old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
    msglen += sizeof(old_footer);
    msg.msg_iovlen++;
  }

  // send
  if (do_sendmsg(&msg, msglen))
    goto fail;

  ret = 0;

 out:
  return ret;

 fail:
  ret = -1;
  goto out;
}
2533
2534
2535 int Pipe::tcp_read(char *buf, unsigned len)
2536 {
2537 if (sd < 0)
2538 return -EINVAL;
2539
2540 while (len > 0) {
2541
2542 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2543 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2544 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2545 ::shutdown(sd, SHUT_RDWR);
2546 }
2547 }
2548
2549 if (tcp_read_wait() < 0)
2550 return -1;
2551
2552 ssize_t got = tcp_read_nonblocking(buf, len);
2553
2554 if (got < 0)
2555 return -1;
2556
2557 len -= got;
2558 buf += got;
2559 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2560 }
2561 return 0;
2562 }
2563
2564 int Pipe::tcp_read_wait()
2565 {
2566 if (sd < 0)
2567 return -EINVAL;
2568 struct pollfd pfd;
2569 short evmask;
2570 pfd.fd = sd;
2571 pfd.events = POLLIN;
2572 #if defined(__linux__)
2573 pfd.events |= POLLRDHUP;
2574 #endif
2575
2576 if (has_pending_data())
2577 return 0;
2578
2579 int r = poll(&pfd, 1, msgr->timeout);
2580 if (r < 0)
2581 return -errno;
2582 if (r == 0)
2583 return -EAGAIN;
2584
2585 evmask = POLLERR | POLLHUP | POLLNVAL;
2586 #if defined(__linux__)
2587 evmask |= POLLRDHUP;
2588 #endif
2589 if (pfd.revents & evmask)
2590 return -1;
2591
2592 if (!(pfd.revents & POLLIN))
2593 return -1;
2594
2595 return 0;
2596 }
2597
2598 ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2599 {
2600 again:
2601 ssize_t got = ::recv( sd, buf, len, flags );
2602 if (got < 0) {
2603 if (errno == EINTR) {
2604 goto again;
2605 }
2606 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2607 << got << " " << cpp_strerror(errno) << dendl;
2608 return -1;
2609 }
2610 if (got == 0) {
2611 return -1;
2612 }
2613 return got;
2614 }
2615
// recv() with read-ahead: small reads are satisfied from (and refill) the
// recv_buf prefetch buffer; reads larger than recv_max_prefetch bypass it.
// recv_ofs/recv_len track the consumed/valid portion of recv_buf.
// Returns the number of bytes copied into 'buf' (may be a short read),
// or a negative value from do_recv() when nothing was transferred.
ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
{
  size_t left = len;
  ssize_t total_recv = 0;
  // first drain whatever is still sitting in the prefetch buffer
  if (recv_len > recv_ofs) {
    int to_read = MIN(recv_len - recv_ofs, left);
    memcpy(buf, &recv_buf[recv_ofs], to_read);
    recv_ofs += to_read;
    left -= to_read;
    if (left == 0) {
      return to_read;
    }
    buf += to_read;
    total_recv += to_read;
  }

  /* nothing left in the prefetch buffer */

  if (left > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    ssize_t ret = do_recv(buf, left, flags );
    if (ret < 0) {
      // if we already copied some prefetched bytes, report those rather
      // than the error
      if (total_recv > 0)
        return total_recv;
      return ret;
    }
    total_recv += ret;
    return total_recv;
  }

  // small read: refill the prefetch buffer in one big recv, then copy
  // out just the bytes the caller asked for
  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
  if (got < 0) {
    if (total_recv > 0)
      return total_recv;

    return got;
  }

  recv_len = (size_t)got;
  got = MIN(left, (size_t)got);
  memcpy(buf, recv_buf, got);
  recv_ofs = got;
  total_recv += got;
  return total_recv;
}
2662
2663 ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2664 {
2665 ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2666 if (got < 0) {
2667 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2668 << got << " " << cpp_strerror(errno) << dendl;
2669 return -1;
2670 }
2671 if (got == 0) {
2672 /* poll() said there was data, but we didn't read any - peer
2673 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2674 * standard socket behavior as documented by Stevens.
2675 */
2676 return -1;
2677 }
2678 return got;
2679 }
2680
2681 int Pipe::tcp_write(const char *buf, unsigned len)
2682 {
2683 if (sd < 0)
2684 return -1;
2685 struct pollfd pfd;
2686 pfd.fd = sd;
2687 pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2688 #if defined(__linux__)
2689 pfd.events |= POLLRDHUP;
2690 #endif
2691
2692 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2693 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2694 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2695 ::shutdown(sd, SHUT_RDWR);
2696 }
2697 }
2698
2699 if (poll(&pfd, 1, -1) < 0)
2700 return -1;
2701
2702 if (!(pfd.revents & POLLOUT))
2703 return -1;
2704
2705 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2706 assert(len > 0);
2707 while (len > 0) {
2708 MSGR_SIGPIPE_STOPPER;
2709 int did = ::send( sd, buf, len, MSG_NOSIGNAL );
2710 if (did < 0) {
2711 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2712 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2713 return did;
2714 }
2715 len -= did;
2716 buf += did;
2717 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2718 }
2719 return 0;
2720 }