]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/Pipe.cc
bump version to 12.2.5-pve1
[ceph.git] / ceph / src / msg / simple / Pipe.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <sys/uio.h>
21 #include <limits.h>
22 #include <poll.h>
23
24 #include "msg/Message.h"
25 #include "Pipe.h"
26 #include "SimpleMessenger.h"
27
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
31
32 // Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
33
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
37
38 #include "include/sock_compat.h"
39
40 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
41 #define SEQ_MASK 0x7fffffff
42 #define dout_subsys ceph_subsys_ms
43
44 #undef dout_prefix
45 #define dout_prefix *_dout << *this
// Build the standard log prefix for this pipe: local addr, peer addr,
// then the pipe's identity and live protocol state (socket fd, bound
// port, state enum, peer global seq, connect seq, lossy policy flag,
// and the associated Connection pointer). Used by dout_prefix above.
ostream& Pipe::_pipe_prefix(std::ostream &out) const {
  return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
	     << " sd=" << sd << " :" << port
	     << " s=" << state
	     << " pgs=" << peer_global_seq
	     << " cs=" << connect_seq
	     << " l=" << policy.lossy
	     << " c=" << connection_state
	     << ").";
}
56
57 ostream& operator<<(ostream &out, const Pipe &pipe) {
58 return pipe._pipe_prefix(out);
59 }
60
/**
 * The DelayedDelivery is for injecting delays into Message delivery off
 * the socket. It is only enabled if delays are requested, and if they
 * are then it pulls Messages off the DelayQueue and puts them into the
 * in_q (SimpleMessenger::dispatch_queue).
 * Please note that this probably has issues with Pipe shutdown and
 * replacement semantics. I've tried, but no guarantees.
 */
class Pipe::DelayedDelivery: public Thread {
  Pipe *pipe;   // owning Pipe; can be swapped via steal_for_pipe() on replace
  std::deque< pair<utime_t,Message*> > delay_queue;  // (release time, message), FIFO
  Mutex delay_lock;   // protects every member below and delay_queue
  Cond delay_cond;    // signalled on enqueue, stop(), flush(), and dispatch done
  int flush_count;    // how many queued messages flush() asked to deliver immediately
  bool active_flush;  // a flush-triggered delivery is currently in progress
  bool stop_delayed_delivery;      // tells entry() to exit its loop
  bool delay_dispatching; // we are in fast dispatch now
  bool stop_fast_dispatching_flag; // we need to stop fast dispatching

public:
  explicit DelayedDelivery(Pipe *p)
    : pipe(p),
      delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
      active_flush(false),
      stop_delayed_delivery(false),
      delay_dispatching(false),
      stop_fast_dispatching_flag(false) { }
  ~DelayedDelivery() override {
    discard();
  }
  void *entry() override;
  // Queue a message to be delivered no earlier than 'release'.
  void queue(utime_t release, Message *m) {
    Mutex::Locker l(delay_lock);
    delay_queue.push_back(make_pair(release, m));
    delay_cond.Signal();
  }
  void discard();
  void flush();
  // True while flush() deliveries are still pending or in flight.
  bool is_flushing() {
    Mutex::Locker l(delay_lock);
    return flush_count > 0 || active_flush;
  }
  // Block until all flush() deliveries have completed.
  void wait_for_flush() {
    Mutex::Locker l(delay_lock);
    while (flush_count > 0 || active_flush)
      delay_cond.Wait(delay_lock);
  }
  // Ask entry() to exit; does not join the thread.
  void stop() {
    delay_lock.Lock();
    stop_delayed_delivery = true;
    delay_cond.Signal();
    delay_lock.Unlock();
  }
  // Re-point this delivery thread at the replacing Pipe (connection race).
  void steal_for_pipe(Pipe *new_owner) {
    Mutex::Locker l(delay_lock);
    pipe = new_owner;
  }
  /**
   * We need to stop fast dispatching before we need to stop putting
   * normal messages into the DispatchQueue.
   */
  void stop_fast_dispatching();
};
124
125 /**************************************
126 * Pipe
127 */
128
// Construct a Pipe in initial state 'st', optionally adopting an existing
// PipeConnection 'con' (outgoing connect path); otherwise a fresh
// PipeConnection is created and given a ref to this Pipe (accept path).
Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
  : RefCountedObject(r->cct),
    reader_thread(this),
    writer_thread(this),
    delay_thread(NULL),
    msgr(r),
    conn_id(r->dispatch_queue.get_id()),
    recv_ofs(0),
    recv_len(0),
    sd(-1), port(0),
    peer_type(-1),
    pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
    state(st),
    connection_state(NULL),
    reader_running(false), reader_needs_join(false),
    reader_dispatching(false), notify_on_dispatch_done(false),
    writer_running(false),
    in_q(&(r->dispatch_queue)),
    send_keepalive(false),
    send_keepalive_ack(false),
    connect_seq(0), peer_global_seq(0),
    out_seq(0), in_seq(0), in_seq_acked(0) {
  // These fields are read racily (e.g. for logging); tell valgrind/helgrind
  // the races are benign.
  ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
  ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
  ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
  if (con) {
    connection_state = con;
    connection_state->reset_pipe(this);
  } else {
    connection_state = new PipeConnection(msgr->cct, msgr);
    connection_state->pipe = get();
  }

  // Randomize the starting out_seq so a peer can detect a session reset;
  // falls back to whatever out_seq already holds if no randomness available.
  if (randomize_out_seq()) {
    lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }


  msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
  if (msgr->timeout == 0)
    msgr->timeout = -1;  // poll(2) convention: -1 means wait forever

  // Prefetch buffer for socket reads, sized by config.
  recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
  recv_buf = new char[recv_max_prefetch];
}
175
176 Pipe::~Pipe()
177 {
178 assert(out_q.empty());
179 assert(sent.empty());
180 delete delay_thread;
181 delete[] recv_buf;
182 }
183
184 void Pipe::handle_ack(uint64_t seq)
185 {
186 lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
187 // trim sent list
188 while (!sent.empty() &&
189 sent.front()->get_seq() <= seq) {
190 Message *m = sent.front();
191 sent.pop_front();
192 lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
193 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
194 m->put();
195 }
196 }
197
// Launch the reader thread. Caller must hold pipe_lock. If a previous
// reader exited but was never joined, join it first so the Thread object
// can be reused.
void Pipe::start_reader()
{
  assert(pipe_lock.is_locked());
  assert(!reader_running);
  if (reader_needs_join) {
    reader_thread.join();
    reader_needs_join = false;
  }
  reader_running = true;
  reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
209
// Spawn the DelayedDelivery thread iff debug config ms_inject_delay_type
// mentions this peer's entity type (substring match) and no delay thread
// exists yet. No-op otherwise.
void Pipe::maybe_start_delay_thread()
{
  if (!delay_thread) {
    auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
    if (pos != string::npos) {
      lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
      delay_thread = new DelayedDelivery(this);
      delay_thread->create("ms_pipe_delay");
    }
  }
}
221
// Launch the writer thread. Caller must hold pipe_lock.
void Pipe::start_writer()
{
  assert(pipe_lock.is_locked());
  assert(!writer_running);
  writer_running = true;
  writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
229
// Wait for the reader thread to exit. Caller holds pipe_lock; we must
// drop it across join() since the reader takes pipe_lock on its way out
// (holding it here would deadlock).
void Pipe::join_reader()
{
  if (!reader_running)
    return;
  cond.Signal();
  pipe_lock.Unlock();
  reader_thread.join();
  pipe_lock.Lock();
  reader_needs_join = false;
}
240
241 void Pipe::DelayedDelivery::discard()
242 {
243 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
244 Mutex::Locker l(delay_lock);
245 while (!delay_queue.empty()) {
246 Message *m = delay_queue.front().second;
247 pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
248 m->put();
249 delay_queue.pop_front();
250 }
251 }
252
// Request immediate delivery of everything currently queued: mark the
// current queue depth as the flush budget and wake entry(), which will
// deliver that many messages without waiting for their release times.
void Pipe::DelayedDelivery::flush()
{
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
  Mutex::Locker l(delay_lock);
  flush_count = delay_queue.size();
  delay_cond.Signal();
}
260
// Delay-thread main loop: sleep until the head message's release time (or
// until woken by queue/flush/stop), then deliver it either via fast
// dispatch (dropping delay_lock for the call) or the normal dispatch queue.
void *Pipe::DelayedDelivery::entry()
{
  Mutex::Locker locker(delay_lock);
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;

  while (!stop_delayed_delivery) {
    if (delay_queue.empty()) {
      lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
      delay_cond.Wait(delay_lock);
      continue;
    }
    utime_t release = delay_queue.front().first;
    Message *m = delay_queue.front().second;
    string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
    // Wait out the delay only if no flush is pending and the message is
    // not yet due (and matches the configured type filter, if any).
    if (!flush_count &&
        (release > ceph_clock_now() &&
         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
      lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
      delay_cond.WaitUntil(delay_lock, release);
      continue;
    }
    lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
    delay_queue.pop_front();
    if (flush_count > 0) {
      --flush_count;
      active_flush = true;  // lets is_flushing()/wait_for_flush() see us mid-delivery
    }
    if (pipe->in_q->can_fast_dispatch(m)) {
      if (!stop_fast_dispatching_flag) {
        delay_dispatching = true;
        // fast_dispatch must run without delay_lock held.
        delay_lock.Unlock();
        pipe->in_q->fast_dispatch(m);
        delay_lock.Lock();
        delay_dispatching = false;
        if (stop_fast_dispatching_flag) {
          // we need to let the stopping thread proceed
          delay_cond.Signal();
          delay_lock.Unlock();
          delay_lock.Lock();
        }
      }
    } else {
      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
    }
    active_flush = false;
  }
  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
  return NULL;
}
310
// Forbid further fast dispatching and block until any in-flight
// fast_dispatch call in entry() has finished.
void Pipe::DelayedDelivery::stop_fast_dispatching() {
  Mutex::Locker l(delay_lock);
  stop_fast_dispatching_flag = true;
  while (delay_dispatching)
    delay_cond.Wait(delay_lock);
}
317
318
// Server side of the messenger handshake. Entered with pipe_lock held in
// STATE_ACCEPTING; exchanges banners/addresses, verifies the authorizer,
// resolves races against any existing Pipe to the same peer (retry /
// replace / wait / reset), then registers the pipe and sends READY.
// Returns 0 on success, -1 on any failure (fault() has been invoked).
int Pipe::accept()
{
  ldout(msgr->cct,10) << "accept" << dendl;
  assert(pipe_lock.is_locked());
  assert(state == STATE_ACCEPTING);

  pipe_lock.Unlock();

  // vars
  bufferlist addrs;
  entity_addr_t socket_addr;
  socklen_t len;
  int r;
  char banner[strlen(CEPH_BANNER)+1];
  bufferlist addrbl;
  ceph_msg_connect connect;
  ceph_msg_connect_reply reply;
  Pipe *existing = 0;
  bufferptr bp;
  bufferlist authorizer, authorizer_reply;
  bool authorizer_valid;
  uint64_t feat_missing;
  bool replaced = false;
  // this variable denotes if the connection attempt from peer is a hard
  // reset or not, it is true if there is an existing connection and the
  // connection sequence from peer is equal to zero
  bool is_reset_from_peer = false;
  CryptoKey session_key;
  int removed; // single-use down below

  // this should roughly mirror pseudocode at
  //  http://ceph.com/wiki/Messaging_protocol
  int reply_tag = 0;
  uint64_t existing_seq = -1;

  // used for reading in the remote acked seq on connect
  uint64_t newly_acked_seq = 0;

  recv_reset();

  set_socket_options();

  // announce myself.
  r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
  if (r < 0) {
    ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
    goto fail_unlocked;
  }

  // and my addr
  ::encode(msgr->my_inst.addr, addrs, 0);  // legacy

  port = msgr->my_inst.addr.get_port();

  // and peer's socket addr (they might not know their ip)
  sockaddr_storage ss;
  len = sizeof(ss);
  r = ::getpeername(sd, (sockaddr*)&ss, &len);
  if (r < 0) {
    ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
    goto fail_unlocked;
  }
  socket_addr.set_sockaddr((sockaddr*)&ss);
  ::encode(socket_addr, addrs, 0);  // legacy

  r = tcp_write(addrs.c_str(), addrs.length());
  if (r < 0) {
    ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
    goto fail_unlocked;
  }

  ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;

  // identify peer
  if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
    ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
    goto fail_unlocked;
  }
  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
    banner[strlen(CEPH_BANNER)] = 0;
    ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
    goto fail_unlocked;
  }
  {
    bufferptr tp(sizeof(ceph_entity_addr));
    addrbl.push_back(std::move(tp));
  }
  if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
    ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
    goto fail_unlocked;
  }
  {
    bufferlist::iterator ti = addrbl.begin();
    ::decode(peer_addr, ti);
  }

  ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
  if (peer_addr.is_blank_ip()) {
    // peer apparently doesn't know what ip they have; figure it out for them.
    int port = peer_addr.get_port();
    peer_addr.u = socket_addr.u;
    peer_addr.set_port(port);
    ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
		       << " (socket is " << socket_addr << ")" << dendl;
  }
  set_peer_addr(peer_addr);  // so that connection_state gets set up

  // Handshake negotiation loop: each iteration reads one ceph_msg_connect
  // and either replies with a retry/reset/wait tag (looping again) or
  // jumps to replace/open on success.
  while (1) {
    if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
      ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
      goto fail_unlocked;
    }

    authorizer.clear();
    if (connect.authorizer_len) {
      bp = buffer::create(connect.authorizer_len);
      if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
        ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
        goto fail_unlocked;
      }
      authorizer.push_back(std::move(bp));
      authorizer_reply.clear();
    }

    ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
	     << " global_seq " << connect.global_seq
	     << dendl;

    msgr->lock.Lock();   // FIXME
    pipe_lock.Lock();
    if (msgr->dispatch_queue.stop)
      goto shutting_down;
    if (state != STATE_ACCEPTING) {
      goto shutting_down;
    }

    // note peer's type, flags
    set_peer_type(connect.host_type);
    policy = msgr->get_policy(connect.host_type);
    ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
			<< ", policy.lossy=" << policy.lossy
			<< " policy.server=" << policy.server
			<< " policy.standby=" << policy.standby
			<< " policy.resetcheck=" << policy.resetcheck
			<< dendl;

    memset(&reply, 0, sizeof(reply));
    reply.protocol_version = msgr->get_proto_version(peer_type, false);
    msgr->lock.Unlock();

    // mismatch?
    ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
	     << ", their proto " << connect.protocol_version << dendl;
    if (connect.protocol_version != reply.protocol_version) {
      reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
      goto reply;
    }

    // require signatures for cephx?
    if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
      if (peer_type == CEPH_ENTITY_TYPE_OSD ||
	  peer_type == CEPH_ENTITY_TYPE_MDS) {
	if (msgr->cct->_conf->cephx_require_signatures ||
	    msgr->cct->_conf->cephx_cluster_require_signatures) {
	  ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
	  policy.features_required |= CEPH_FEATURE_MSG_AUTH;
	}
      } else {
	if (msgr->cct->_conf->cephx_require_signatures ||
	    msgr->cct->_conf->cephx_service_require_signatures) {
	  ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
	  policy.features_required |= CEPH_FEATURE_MSG_AUTH;
	}
      }
    }

    feat_missing = policy.features_required & ~(uint64_t)connect.features;
    if (feat_missing) {
      ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
      reply.tag = CEPH_MSGR_TAG_FEATURES;
      goto reply;
    }

    // Check the authorizer.  If not good, bail out.

    pipe_lock.Unlock();

    if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
				 authorizer_reply, authorizer_valid, session_key) ||
	!authorizer_valid) {
      ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
      pipe_lock.Lock();
      if (state != STATE_ACCEPTING)
	goto shutting_down_msgr_unlocked;
      reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
      session_security.reset();
      goto reply;
    }

    // We've verified the authorizer for this pipe, so set up the session security structure.  PLR

    ldout(msgr->cct,10) << "accept:  setting up session_security." << dendl;

  retry_existing_lookup:
    msgr->lock.Lock();
    pipe_lock.Lock();
    if (msgr->dispatch_queue.stop)
      goto shutting_down;
    if (state != STATE_ACCEPTING)
      goto shutting_down;

    // existing?
    existing = msgr->_lookup_pipe(peer_addr);
    if (existing) {
      existing->pipe_lock.Lock(true);  // skip lockdep check (we are locking a second Pipe here)
      if (existing->reader_dispatching) {
	/** we need to wait, or we can deadlock if downstream
	 *  fast_dispatchers are (naughtily!) waiting on resources
	 *  held by somebody trying to make use of the SimpleMessenger lock.
	 *  So drop locks, wait, and retry. It just looks like a slow network
	 *  to everybody else.
	 *
	 *  We take a ref to existing here since it might get reaped before we
	 *  wake up (see bug #15870).  We can be confident that it lived until
	 *  locked it since we held the msgr lock from _lookup_pipe through to
	 *  locking existing->lock and checking reader_dispatching.
	 */
	existing->get();
	pipe_lock.Unlock();
	msgr->lock.Unlock();
	existing->notify_on_dispatch_done = true;
	while (existing->reader_dispatching)
	  existing->cond.Wait(existing->pipe_lock);
	existing->pipe_lock.Unlock();
	existing->put();
	existing = nullptr;
	goto retry_existing_lookup;
      }

      // Stale global_seq: peer restarted since opening this attempt.
      if (connect.global_seq < existing->peer_global_seq) {
	ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
			    << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
	reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
	reply.global_seq = existing->peer_global_seq;  // so we can send it below..
	existing->pipe_lock.Unlock();
	msgr->lock.Unlock();
	goto reply;
      } else {
	ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
			    << " <= " << connect.global_seq << ", looks ok" << dendl;
      }

      if (existing->policy.lossy) {
	ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
			   << policy.lossy << ")" << dendl;
	existing->was_session_reset();
	goto replace;
      }

      ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
			 << " vs existing " << existing->connect_seq
			 << " state " << existing->get_state_name() << dendl;

      if (connect.connect_seq == 0 && existing->connect_seq > 0) {
	ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
	// this is a hard reset from peer
	is_reset_from_peer = true;
	if (policy.resetcheck)
	  existing->was_session_reset();  // this resets out_queue, msg_ and connect_seq #'s
	goto replace;
      }

      if (connect.connect_seq < existing->connect_seq) {
	// old attempt, or we sent READY but they didn't get it.
	ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
			    << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
	goto retry_session;
      }

      if (connect.connect_seq == existing->connect_seq) {
	// if the existing connection successfully opened, and/or
	// subsequently went to standby, then the peer should bump
	// their connect_seq and retry: this is not a connection race
	// we need to resolve here.
	if (existing->state == STATE_OPEN ||
	    existing->state == STATE_STANDBY) {
	  ldout(msgr->cct,10) << "accept connection race, existing " << existing
			      << ".cseq " << existing->connect_seq
			      << " == " << connect.connect_seq
			      << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
	  goto retry_session;
	}

	// connection race?  tie-break on address ordering / server policy.
	if (peer_addr < msgr->my_inst.addr ||
	    existing->policy.server) {
	  // incoming wins
	  ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
			      << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
	  if (!(existing->state == STATE_CONNECTING ||
		existing->state == STATE_WAIT))
	    lderr(msgr->cct) << "accept race bad state, would replace, existing="
			     << existing->get_state_name()
			     << " " << existing << ".cseq=" << existing->connect_seq
			     << " == " << connect.connect_seq
			     << dendl;
	  assert(existing->state == STATE_CONNECTING ||
		 existing->state == STATE_WAIT);
	  goto replace;
	} else {
	  // our existing outgoing wins
	  ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
			      << " == " << connect.connect_seq << ", sending WAIT" << dendl;
	  assert(peer_addr > msgr->my_inst.addr);
	  if (!(existing->state == STATE_CONNECTING))
	    lderr(msgr->cct) << "accept race bad state, would send wait, existing="
			     << existing->get_state_name()
			     << " " << existing << ".cseq=" << existing->connect_seq
			     << " == " << connect.connect_seq
			     << dendl;
	  assert(existing->state == STATE_CONNECTING);
	  // make sure our outgoing connection will follow through
	  existing->_send_keepalive();
	  reply.tag = CEPH_MSGR_TAG_WAIT;
	  existing->pipe_lock.Unlock();
	  msgr->lock.Unlock();
	  goto reply;
	}
      }

      assert(connect.connect_seq > existing->connect_seq);
      assert(connect.global_seq >= existing->peer_global_seq);
      if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
	  existing->connect_seq == 0) {
	ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
			   << ", " << existing << ".cseq = " << existing->connect_seq
			   << "), sending RESETSESSION" << dendl;
	reply.tag = CEPH_MSGR_TAG_RESETSESSION;
	msgr->lock.Unlock();
	existing->pipe_lock.Unlock();
	goto reply;
      }

      // reconnect
      ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
			  << " > " << existing->connect_seq << dendl;
      goto replace;
    } // existing
    else if (connect.connect_seq > 0) {
      // we reset, and they are opening a new session
      ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
      msgr->lock.Unlock();
      reply.tag = CEPH_MSGR_TAG_RESETSESSION;
      goto reply;
    } else {
      // new session
      ldout(msgr->cct,10) << "accept new session" << dendl;
      existing = NULL;
      goto open;
    }
    ceph_abort();

  retry_session:
    // Ask the peer to retry with a bumped connect_seq.
    assert(existing->pipe_lock.is_locked());
    assert(pipe_lock.is_locked());
    reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
    reply.connect_seq = existing->connect_seq + 1;
    existing->pipe_lock.Unlock();
    msgr->lock.Unlock();
    goto reply;

  reply:
    // Send a non-READY reply and loop back for the peer's next attempt.
    assert(pipe_lock.is_locked());
    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
    reply.authorizer_len = authorizer_reply.length();
    pipe_lock.Unlock();
    r = tcp_write((char*)&reply, sizeof(reply));
    if (r < 0)
      goto fail_unlocked;
    if (reply.authorizer_len) {
      r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
      if (r < 0)
	goto fail_unlocked;
    }
  }

 replace:
  // Take over from 'existing': stop it, and (unless lossy) inherit its
  // Connection, delay thread, sequence numbers and queued messages.
  assert(existing->pipe_lock.is_locked());
  assert(pipe_lock.is_locked());
  // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
    reply_tag = CEPH_MSGR_TAG_SEQ;
    existing_seq = existing->in_seq;
  }
  ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
  existing->stop();
  existing->unregister_pipe();
  replaced = true;

  if (existing->policy.lossy) {
    // disconnect from the Connection
    assert(existing->connection_state);
    if (existing->connection_state->clear_pipe(existing))
      msgr->dispatch_queue.queue_reset(existing->connection_state.get());
  } else {
    // queue a reset on the new connection, which we're dumping for the old
    msgr->dispatch_queue.queue_reset(connection_state.get());

    // drop my Connection, and take a ref to the existing one. do not
    // clear existing->connection_state, since read_message and
    // write_message both dereference it without pipe_lock.
    connection_state = existing->connection_state;

    // make existing Connection reference us
    connection_state->reset_pipe(this);

    if (existing->delay_thread) {
      existing->delay_thread->steal_for_pipe(this);
      delay_thread = existing->delay_thread;
      existing->delay_thread = NULL;
      delay_thread->flush();
    }

    // steal incoming queue
    uint64_t replaced_conn_id = conn_id;
    conn_id = existing->conn_id;
    existing->conn_id = replaced_conn_id;

    // reset the in_seq if this is a hard reset from peer,
    // otherwise we respect our original connection's value
    in_seq = is_reset_from_peer ? 0 : existing->in_seq;
    in_seq_acked = in_seq;

    // steal outgoing queue and out_seq
    existing->requeue_sent();
    out_seq = existing->out_seq;
    ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
    for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
         p != existing->out_q.end();
         ++p)
      out_q[p->first].splice(out_q[p->first].begin(), p->second);
  }
  existing->stop_and_wait();
  existing->pipe_lock.Unlock();

 open:
  // open: handshake resolved in our favor; go STATE_OPEN and send READY.
  assert(pipe_lock.is_locked());
  connect_seq = connect.connect_seq + 1;
  peer_global_seq = connect.global_seq;
  assert(state == STATE_ACCEPTING);
  state = STATE_OPEN;
  ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;

  // send READY reply
  reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
  reply.features = policy.features_supported;
  reply.global_seq = msgr->get_global_seq();
  reply.connect_seq = connect_seq;
  reply.flags = 0;
  reply.authorizer_len = authorizer_reply.length();
  if (policy.lossy)
    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;

  connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
  ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;

  session_security.reset(
      get_auth_session_handler(msgr->cct,
			       connect.authorizer_protocol,
			       session_key,
			       connection_state->get_features()));

  // notify
  msgr->dispatch_queue.queue_accept(connection_state.get());
  msgr->ms_deliver_handle_fast_accept(connection_state.get());

  // ok!
  if (msgr->dispatch_queue.stop)
    goto shutting_down;
  removed = msgr->accepting_pipes.erase(this);
  assert(removed == 1);
  register_pipe();
  msgr->lock.Unlock();
  pipe_lock.Unlock();

  r = tcp_write((char*)&reply, sizeof(reply));
  if (r < 0) {
    goto fail_registered;
  }

  if (reply.authorizer_len) {
    r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
    if (r < 0) {
      goto fail_registered;
    }
  }

  // TAG_SEQ: tell the peer our in_seq and learn which of our messages
  // they already received, so we can drop those from the requeue.
  if (reply_tag == CEPH_MSGR_TAG_SEQ) {
    if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
      ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
      goto fail_registered;
    }
    if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
      ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
      goto fail_registered;
    }
  }

  pipe_lock.Lock();
  discard_requeued_up_to(newly_acked_seq);
  if (state != STATE_CLOSED) {
    ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
    start_writer();
  }
  ldout(msgr->cct,20) << "accept done" << dendl;

  maybe_start_delay_thread();

  return 0;   // success.

 fail_registered:
  // Failure after register_pipe(): optionally inject a delay for testing,
  // then fall through to the common failure path.
  ldout(msgr->cct, 10) << "accept fault after register" << dendl;

  if (msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }

 fail_unlocked:
  pipe_lock.Lock();
  if (state != STATE_CLOSED) {
    bool queued = is_queued();
    ldout(msgr->cct, 10) << "  queued = " << (int)queued << dendl;
    if (queued) {
      state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
    } else if (replaced) {
      state = STATE_STANDBY;
    } else {
      state = STATE_CLOSED;
      state_closed = true;
    }
    fault();
    if (queued || replaced)
      start_writer();
  }
  return -1;

 shutting_down:
  msgr->lock.Unlock();
 shutting_down_msgr_unlocked:
  // Messenger is shutting down (or this pipe left STATE_ACCEPTING):
  // close out without retrying.
  assert(pipe_lock.is_locked());

  if (msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }

  state = STATE_CLOSED;
  state_closed = true;
  fault();
  return -1;
}
886
// Apply configured socket options to sd: TCP_NODELAY, SO_RCVBUF,
// SO_NOSIGPIPE (where available), and IP_TOS + SO_PRIORITY for socket
// prioritization. All failures are logged but non-fatal.
void Pipe::set_socket_options()
{
  // disable Nagle algorithm?
  if (msgr->cct->_conf->ms_tcp_nodelay) {
    int flag = 1;
    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
			 << cpp_strerror(r) << dendl;
    }
  }
  if (msgr->cct->_conf->ms_tcp_rcvbuf) {
    int size = msgr->cct->_conf->ms_tcp_rcvbuf;
    int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
			 << ": " << cpp_strerror(r) << dendl;
    }
  }

  // block ESIGPIPE
#ifdef CEPH_USE_SO_NOSIGPIPE
  int val = 1;
  int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
  if (r) {
    r = -errno;
    ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
		       << cpp_strerror(r) << dendl;
  }
#endif

#ifdef SO_PRIORITY
  int prio = msgr->get_socket_priority();
  if (prio >= 0) {
    int r = -1;
#ifdef IPTOS_CLASS_CS6
    int iptos = IPTOS_CLASS_CS6;
    // Pick the address family from the peer when known, else our own
    // bind address, so we use the right socket level below.
    int addr_family = 0;
    if (!peer_addr.is_blank_ip()) {
      addr_family = peer_addr.get_family();
    } else {
      addr_family = msgr->get_myaddr().get_family();
    }
    switch (addr_family) {
    case AF_INET:
      r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
      break;
    case AF_INET6:
      r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
      break;
    default:
      lderr(msgr->cct) << "couldn't set ToS of unknown family ("
		       << addr_family << ")"
		       << " to " << iptos << dendl;
      return;
    }
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
			 << ": " << cpp_strerror(r) << dendl;
    }
#endif
    // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
    // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
    // We need to call setsockopt(SO_PRIORITY) after it.
    r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
			 << ": " << cpp_strerror(r) << dendl;
    }
  }
#endif
}
963
964 int Pipe::connect()
965 {
966 bool got_bad_auth = false;
967
968 ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
969 assert(pipe_lock.is_locked());
970
971 __u32 cseq = connect_seq;
972 __u32 gseq = msgr->get_global_seq();
973
974 // stop reader thread
975 join_reader();
976
977 pipe_lock.Unlock();
978
979 char tag = -1;
980 int rc = -1;
981 struct msghdr msg;
982 struct iovec msgvec[2];
983 int msglen;
984 char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy
985 entity_addr_t paddr;
986 entity_addr_t peer_addr_for_me, socket_addr;
987 AuthAuthorizer *authorizer = NULL;
988 bufferlist addrbl, myaddrbl;
989 const md_config_t *conf = msgr->cct->_conf;
990
991 // close old socket. this is safe because we stopped the reader thread above.
992 if (sd >= 0)
993 ::close(sd);
994
995 // create socket?
996 sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
997 if (sd < 0) {
998 rc = -errno;
999 lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl;
1000 goto fail;
1001 }
1002
1003 recv_reset();
1004
1005 set_socket_options();
1006
1007 {
1008 entity_addr_t addr2bind = msgr->get_myaddr();
1009 if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
1010 addr2bind.set_port(0);
1011 int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
1012 if (r < 0) {
1013 ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
1014 goto fail;
1015 }
1016 }
1017 }
1018
1019 // connect!
1020 ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
1021 rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
1022 if (rc < 0) {
1023 int stored_errno = errno;
1024 ldout(msgr->cct,2) << "connect error " << peer_addr
1025 << ", " << cpp_strerror(stored_errno) << dendl;
1026 if (stored_errno == ECONNREFUSED) {
1027 ldout(msgr->cct, 2) << "connection refused!" << dendl;
1028 msgr->dispatch_queue.queue_refused(connection_state.get());
1029 }
1030 goto fail;
1031 }
1032
1033 // verify banner
1034 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1035 rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
1036 if (rc < 0) {
1037 ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
1038 goto fail;
1039 }
1040 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1041 ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
1042 goto fail;
1043 }
1044
1045 memset(&msg, 0, sizeof(msg));
1046 msgvec[0].iov_base = banner;
1047 msgvec[0].iov_len = strlen(CEPH_BANNER);
1048 msg.msg_iov = msgvec;
1049 msg.msg_iovlen = 1;
1050 msglen = msgvec[0].iov_len;
1051 rc = do_sendmsg(&msg, msglen);
1052 if (rc < 0) {
1053 ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
1054 goto fail;
1055 }
1056
1057 // identify peer
1058 {
1059 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1060 bufferptr p(sizeof(ceph_entity_addr) * 2);
1061 #else
1062 int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
1063 bufferptr p(wirelen * 2);
1064 #endif
1065 addrbl.push_back(std::move(p));
1066 }
1067 rc = tcp_read(addrbl.c_str(), addrbl.length());
1068 if (rc < 0) {
1069 ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
1070 goto fail;
1071 }
1072 try {
1073 bufferlist::iterator p = addrbl.begin();
1074 ::decode(paddr, p);
1075 ::decode(peer_addr_for_me, p);
1076 }
1077 catch (buffer::error& e) {
1078 ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
1079 << dendl;
1080 goto fail;
1081 }
1082 port = peer_addr_for_me.get_port();
1083
1084 ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
1085 if (peer_addr != paddr) {
1086 if (paddr.is_blank_ip() &&
1087 peer_addr.get_port() == paddr.get_port() &&
1088 peer_addr.get_nonce() == paddr.get_nonce()) {
1089 ldout(msgr->cct,0) << "connect claims to be "
1090 << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
1091 } else {
1092 ldout(msgr->cct,10) << "connect claims to be "
1093 << paddr << " not " << peer_addr << dendl;
1094 goto fail;
1095 }
1096 }
1097
1098 ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
1099
1100 msgr->learned_addr(peer_addr_for_me);
1101
1102 ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy
1103
1104 memset(&msg, 0, sizeof(msg));
1105 msgvec[0].iov_base = myaddrbl.c_str();
1106 msgvec[0].iov_len = myaddrbl.length();
1107 msg.msg_iov = msgvec;
1108 msg.msg_iovlen = 1;
1109 msglen = msgvec[0].iov_len;
1110 rc = do_sendmsg(&msg, msglen);
1111 if (rc < 0) {
1112 ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
1113 goto fail;
1114 }
1115 ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
1116
1117
1118 while (1) {
1119 delete authorizer;
1120 authorizer = msgr->get_authorizer(peer_type, false);
1121 bufferlist authorizer_reply;
1122
1123 ceph_msg_connect connect;
1124 connect.features = policy.features_supported;
1125 connect.host_type = msgr->get_myinst().name.type();
1126 connect.global_seq = gseq;
1127 connect.connect_seq = cseq;
1128 connect.protocol_version = msgr->get_proto_version(peer_type, true);
1129 connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1130 connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1131 if (authorizer)
1132 ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
1133 << " protocol=" << connect.authorizer_protocol << dendl;
1134 connect.flags = 0;
1135 if (policy.lossy)
1136 connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1137 memset(&msg, 0, sizeof(msg));
1138 msgvec[0].iov_base = (char*)&connect;
1139 msgvec[0].iov_len = sizeof(connect);
1140 msg.msg_iov = msgvec;
1141 msg.msg_iovlen = 1;
1142 msglen = msgvec[0].iov_len;
1143 if (authorizer) {
1144 msgvec[1].iov_base = authorizer->bl.c_str();
1145 msgvec[1].iov_len = authorizer->bl.length();
1146 msg.msg_iovlen++;
1147 msglen += msgvec[1].iov_len;
1148 }
1149
1150 ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
1151 << " proto=" << connect.protocol_version << dendl;
1152 rc = do_sendmsg(&msg, msglen);
1153 if (rc < 0) {
1154 ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
1155 goto fail;
1156 }
1157
1158 ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
1159 ceph_msg_connect_reply reply;
1160 rc = tcp_read((char*)&reply, sizeof(reply));
1161 if (rc < 0) {
1162 ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
1163 goto fail;
1164 }
1165
1166 ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
1167 << " connect_seq " << reply.connect_seq
1168 << " global_seq " << reply.global_seq
1169 << " proto " << reply.protocol_version
1170 << " flags " << (int)reply.flags
1171 << " features " << reply.features
1172 << dendl;
1173
1174 authorizer_reply.clear();
1175
1176 if (reply.authorizer_len) {
1177 ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
1178 bufferptr bp = buffer::create(reply.authorizer_len);
1179 rc = tcp_read(bp.c_str(), reply.authorizer_len);
1180 if (rc < 0) {
1181 ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
1182 goto fail;
1183 }
1184 authorizer_reply.push_back(bp);
1185 }
1186
1187 if (authorizer) {
1188 bufferlist::iterator iter = authorizer_reply.begin();
1189 if (!authorizer->verify_reply(iter)) {
1190 ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
1191 goto fail;
1192 }
1193 }
1194
1195 if (conf->ms_inject_internal_delays) {
1196 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1197 utime_t t;
1198 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1199 t.sleep();
1200 }
1201
1202 pipe_lock.Lock();
1203 if (state != STATE_CONNECTING) {
1204 ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
1205 goto stop_locked;
1206 }
1207
1208 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1209 ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
1210 << connect.features << " < peer " << reply.features
1211 << " missing " << (reply.features & ~policy.features_supported)
1212 << std::dec << dendl;
1213 goto fail_locked;
1214 }
1215
1216 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1217 ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
1218 << " != " << reply.protocol_version << dendl;
1219 goto fail_locked;
1220 }
1221
1222 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1223 ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
1224 if (got_bad_auth)
1225 goto stop_locked;
1226 got_bad_auth = true;
1227 pipe_lock.Unlock();
1228 delete authorizer;
1229 authorizer = msgr->get_authorizer(peer_type, true); // try harder
1230 continue;
1231 }
1232 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1233 ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
1234 was_session_reset();
1235 cseq = 0;
1236 pipe_lock.Unlock();
1237 continue;
1238 }
1239 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1240 gseq = msgr->get_global_seq(reply.global_seq);
1241 ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
1242 << " chose new " << gseq << dendl;
1243 pipe_lock.Unlock();
1244 continue;
1245 }
1246 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1247 assert(reply.connect_seq > connect_seq);
1248 ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
1249 << " -> " << reply.connect_seq << dendl;
1250 cseq = connect_seq = reply.connect_seq;
1251 pipe_lock.Unlock();
1252 continue;
1253 }
1254
1255 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1256 ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
1257 state = STATE_WAIT;
1258 goto stop_locked;
1259 }
1260
1261 if (reply.tag == CEPH_MSGR_TAG_READY ||
1262 reply.tag == CEPH_MSGR_TAG_SEQ) {
1263 uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
1264 if (feat_missing) {
1265 ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
1266 goto fail_locked;
1267 }
1268
1269 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1270 ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1271 uint64_t newly_acked_seq = 0;
1272 rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
1273 if (rc < 0) {
1274 ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
1275 goto fail_locked;
1276 }
1277 ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
1278 << " vs out_seq " << out_seq << dendl;
1279 while (newly_acked_seq > out_seq) {
1280 Message *m = _get_next_outgoing();
1281 assert(m);
1282 ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
1283 << " " << *m << dendl;
1284 assert(m->get_seq() <= newly_acked_seq);
1285 m->put();
1286 ++out_seq;
1287 }
1288 if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
1289 ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
1290 goto fail_locked;
1291 }
1292 }
1293
1294 // hooray!
1295 peer_global_seq = reply.global_seq;
1296 policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
1297 state = STATE_OPEN;
1298 connect_seq = cseq + 1;
1299 assert(connect_seq == reply.connect_seq);
1300 backoff = utime_t();
1301 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
1302 ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
1303 << ", features " << connection_state->get_features() << dendl;
1304
1305
1306 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1307 // connection. PLR
1308
1309 if (authorizer != NULL) {
1310 session_security.reset(
1311 get_auth_session_handler(msgr->cct,
1312 authorizer->protocol,
1313 authorizer->session_key,
1314 connection_state->get_features()));
1315 } else {
1316 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1317 session_security.reset();
1318 }
1319
1320 msgr->dispatch_queue.queue_connect(connection_state.get());
1321 msgr->ms_deliver_handle_fast_connect(connection_state.get());
1322
1323 if (!reader_running) {
1324 ldout(msgr->cct,20) << "connect starting reader" << dendl;
1325 start_reader();
1326 }
1327 maybe_start_delay_thread();
1328 delete authorizer;
1329 return 0;
1330 }
1331
1332 // protocol error
1333 ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
1334 goto fail_locked;
1335 }
1336
1337 fail:
1338 if (conf->ms_inject_internal_delays) {
1339 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1340 utime_t t;
1341 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1342 t.sleep();
1343 }
1344
1345 pipe_lock.Lock();
1346 fail_locked:
1347 if (state == STATE_CONNECTING)
1348 fault();
1349 else
1350 ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
1351 << " != connecting, stopping" << dendl;
1352
1353 stop_locked:
1354 delete authorizer;
1355 return rc;
1356 }
1357
1358 void Pipe::register_pipe()
1359 {
1360 ldout(msgr->cct,10) << "register_pipe" << dendl;
1361 assert(msgr->lock.is_locked());
1362 Pipe *existing = msgr->_lookup_pipe(peer_addr);
1363 assert(existing == NULL);
1364 msgr->rank_pipe[peer_addr] = this;
1365 }
1366
1367 void Pipe::unregister_pipe()
1368 {
1369 assert(msgr->lock.is_locked());
1370 ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1371 if (p != msgr->rank_pipe.end() && p->second == this) {
1372 ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1373 msgr->rank_pipe.erase(p);
1374 } else {
1375 ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1376 msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
1377 }
1378 }
1379
1380 void Pipe::join()
1381 {
1382 ldout(msgr->cct, 20) << "join" << dendl;
1383 if (writer_thread.is_started())
1384 writer_thread.join();
1385 if (reader_thread.is_started())
1386 reader_thread.join();
1387 if (delay_thread) {
1388 ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
1389 delay_thread->stop();
1390 delay_thread->join();
1391 }
1392 }
1393
1394 void Pipe::requeue_sent()
1395 {
1396 if (sent.empty())
1397 return;
1398
1399 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1400 while (!sent.empty()) {
1401 Message *m = sent.back();
1402 sent.pop_back();
1403 ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1404 << " (" << m->get_seq() << ")" << dendl;
1405 rq.push_front(m);
1406 out_seq--;
1407 }
1408 }
1409
1410 void Pipe::discard_requeued_up_to(uint64_t seq)
1411 {
1412 ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1413 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1414 return;
1415 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1416 while (!rq.empty()) {
1417 Message *m = rq.front();
1418 if (m->get_seq() == 0 || m->get_seq() > seq)
1419 break;
1420 ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1421 << " <= " << seq << ", discarding" << dendl;
1422 m->put();
1423 rq.pop_front();
1424 out_seq++;
1425 }
1426 if (rq.empty())
1427 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1428 }
1429
1430 /*
1431 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1432 * Must hold pipe_lock prior to calling.
1433 */
1434 void Pipe::discard_out_queue()
1435 {
1436 ldout(msgr->cct,10) << "discard_queue" << dendl;
1437
1438 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1439 ldout(msgr->cct,20) << " discard " << *p << dendl;
1440 (*p)->put();
1441 }
1442 sent.clear();
1443 for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1444 for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1445 ldout(msgr->cct,20) << " discard " << *r << dendl;
1446 (*r)->put();
1447 }
1448 out_q.clear();
1449 }
1450
/**
 * React to a socket/protocol failure on this pipe.
 *
 * Must be called with pipe_lock held.  Outcome depends on policy/state:
 *  - lossy pipe: stop, unregister and discard everything;
 *  - standby/server policy with nothing queued: park in STATE_STANDBY;
 *  - otherwise: flip to STATE_CONNECTING so the writer thread reconnects,
 *    applying exponential backoff between successive attempts.
 *
 * @param onread  true when invoked from the reader thread; a reader-side
 *                fault while already reconnecting is simply ignored.
 */
void Pipe::fault(bool onread)
{
  const md_config_t *conf = msgr->cct->_conf;
  assert(pipe_lock.is_locked());
  cond.Signal();

  if (onread && state == STATE_CONNECTING) {
    // the writer owns reconnection; the reader just exits quietly
    ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
    return;
  }
  
  ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;

  if (state == STATE_CLOSED ||
      state == STATE_CLOSING) {
    ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
    if (connection_state->clear_pipe(this))
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  shutdown_socket();

  // lossy channel?
  if (policy.lossy && state != STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;

    // disconnect from Connection, and mark it failed.  future messages
    // will be dropped.
    assert(connection_state);
    stop();
    bool cleared = connection_state->clear_pipe(this);

    // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
    // rank_pipe entry is ignored by others.
    pipe_lock.Unlock();

    if (conf->ms_inject_internal_delays) {
      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }

    // msgr->lock must be taken before pipe_lock (lock ordering), hence
    // the drop-and-retake dance above
    msgr->lock.Lock();
    pipe_lock.Lock();
    unregister_pipe();
    msgr->lock.Unlock();

    if (delay_thread)
      delay_thread->discard();
    in_q->discard_queue(conn_id);
    discard_out_queue();
    if (cleared)
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  // queue delayed items immediately
  if (delay_thread)
    delay_thread->flush();

  // requeue sent items
  requeue_sent();

  if (policy.standby && !is_queued()) {
    ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
    state = STATE_STANDBY;
    return;
  }

  if (state != STATE_CONNECTING) {
    if (policy.server) {
      // servers wait for the client to reconnect to us
      ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
      state = STATE_STANDBY;
    } else {
      // clients initiate the reconnect (writer thread will see this state)
      ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
      connect_seq++;
      state = STATE_CONNECTING;
    }
    backoff = utime_t();
  } else if (backoff == utime_t()) {
    // first failed reconnect attempt: start the backoff clock
    ldout(msgr->cct,0) << "fault" << dendl;
    backoff.set_from_double(conf->ms_initial_backoff);
  } else {
    // subsequent failures: wait, then double the backoff up to the cap
    ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
    cond.WaitInterval(pipe_lock, backoff);
    backoff += backoff;
    if (backoff > conf->ms_max_backoff)
      backoff.set_from_double(conf->ms_max_backoff);
    ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
  }
}
1544
1545 int Pipe::randomize_out_seq()
1546 {
1547 if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
1548 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1549 // here. We'll check it on the call. PLR
1550 int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
1551 out_seq &= SEQ_MASK;
1552 lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1553 return seq_error;
1554 } else {
1555 // previously, seq #'s always started at 0.
1556 out_seq = 0;
1557 return 0;
1558 }
1559 }
1560
/**
 * Handle the peer telling us our session was reset (RESETSESSION):
 * discard all queued in/out messages, notify dispatch of the remote
 * reset, re-randomize out_seq, and zero in_seq/connect_seq so the next
 * handshake starts a fresh session.  Must hold pipe_lock.
 */
void Pipe::was_session_reset()
{
  assert(pipe_lock.is_locked());

  ldout(msgr->cct,10) << "was_session_reset" << dendl;
  // drop everything queued for delivery or transmission
  in_q->discard_queue(conn_id);
  if (delay_thread)
    delay_thread->discard();
  discard_out_queue();

  msgr->dispatch_queue.queue_remote_reset(connection_state.get());

  if (randomize_out_seq()) {
    lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }

  in_seq = 0;
  connect_seq = 0;
}
1580
/**
 * Mark the pipe closed and shut down its socket.  Must hold pipe_lock.
 * Wakes any thread waiting on cond so it can observe the new state.
 */
void Pipe::stop()
{
  ldout(msgr->cct,10) << "stop" << dendl;
  assert(pipe_lock.is_locked());
  state = STATE_CLOSED;
  state_closed = true;  // atomic flag readable without pipe_lock
  cond.Signal();
  shutdown_socket();
}
1590
/**
 * Stop the pipe and block until the reader thread is no longer fast
 * dispatching, so the caller can safely tear down connection state.
 * Must be called with pipe_lock held by this thread; the lock is
 * temporarily dropped while draining the delay thread.
 */
void Pipe::stop_and_wait()
{
  assert(pipe_lock.is_locked_by_me());
  if (state != STATE_CLOSED)
    stop();

  // test hook: widen the race window between stop and drain
  if (msgr->cct->_conf->ms_inject_internal_delays) {
    ldout(msgr->cct, 10) << __func__ << " sleep for "
			 << msgr->cct->_conf->ms_inject_internal_delays
			 << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
  
  // the delay thread dispatches with pipe_lock released, so drop it here
  // too to avoid deadlocking against in-flight dispatch
  if (delay_thread) {
    pipe_lock.Unlock();
    delay_thread->stop_fast_dispatching();
    pipe_lock.Lock();
  }
  // reader signals cond when it finishes a fast dispatch (see reader())
  while (reader_running &&
	 reader_dispatching)
    cond.Wait(pipe_lock);
}
1615
/* read msgs from socket.
 * also, server.
 *
 * Reader thread entry point.  If this pipe was accept()ed, it first runs
 * the server side of the handshake.  It then loops reading one wire tag
 * at a time and handling it: keepalives, acks of our outgoing messages,
 * incoming messages, and close.  All socket reads happen with pipe_lock
 * dropped; the lock is re-taken before any state is touched.  The loop
 * exits when the pipe is CLOSED, or when it flips to CONNECTING (the
 * writer thread owns reconnection).
 */
void Pipe::reader()
{
  pipe_lock.Lock();

  // server side: run the accept handshake before reading traffic
  if (state == STATE_ACCEPTING) {
    accept();
    assert(pipe_lock.is_locked());
  }

  // loop.
  while (state != STATE_CLOSED &&
	 state != STATE_CONNECTING) {
    assert(pipe_lock.is_locked());

    // sleep if (re)connecting
    if (state == STATE_STANDBY) {
      ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
      cond.Wait(pipe_lock);
      continue;
    }

    // get a reference to the AuthSessionHandler while we have the pipe_lock
    ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;

    pipe_lock.Unlock();

    char tag = -1;
    ldout(msgr->cct,20) << "reader reading tag..." << dendl;
    if (tcp_read((char*)&tag, 1) < 0) {
      pipe_lock.Lock();
      ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
      fault(true);
      continue;
    }

    // legacy keepalive: no payload, just note the time
    if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
      ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
      pipe_lock.Lock();
      connection_state->set_last_keepalive(ceph_clock_now());
      continue;
    }
    // keepalive2: carries a timestamp we must echo back in an ack
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
      ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
			   << cpp_strerror(errno) << dendl;
	fault(true);
      } else {
	// ask the writer (via cond) to send the ack with this stamp
	send_keepalive_ack = true;
	keepalive_ack_stamp = utime_t(t);
	ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
			   << dendl;
	connection_state->set_last_keepalive(ceph_clock_now());
	cond.Signal();
      }
      continue;
    }
    // ack of our keepalive2: record the echoed timestamp
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
      ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
      struct ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
	fault(true);
      } else {
	connection_state->set_last_keepalive_ack(utime_t(t));
      }
      continue;
    }

    // open ...
    if (tag == CEPH_MSGR_TAG_ACK) {
      ldout(msgr->cct,20) << "reader got ACK" << dendl;
      ceph_le64 seq;
      int rc = tcp_read((char*)&seq, sizeof(seq));
      pipe_lock.Lock();
      if (rc < 0) {
	ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
	fault(true);
      } else if (state != STATE_CLOSED) {
	handle_ack(seq);
      }
      continue;
    }

    else if (tag == CEPH_MSGR_TAG_MSG) {
      ldout(msgr->cct,20) << "reader got MSG" << dendl;
      Message *m = 0;
      int r = read_message(&m, auth_handler.get());

      pipe_lock.Lock();
      
      if (!m) {
	if (r < 0)
	  fault(true);
	continue;
      }

      m->trace.event("pipe read message");

      // pipe may have been closed/reconnected while we were reading
      if (state == STATE_CLOSED ||
	  state == STATE_CONNECTING) {
	in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
	m->put();
	continue;
      }

      // check received seq#.  if it is old, drop the message.  
      // note that incoming messages may skip ahead.  this is convenient for the client
      // side queueing because messages can't be renumbered, but the (kernel) client will
      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
      // it doesn't matter if we "got" it or not.
      if (m->get_seq() <= in_seq) {
	ldout(msgr->cct,0) << "reader got old message "
		<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
		<< ", discarding" << dendl;
	in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
	m->put();
	if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
	    msgr->cct->_conf->ms_die_on_old_message)
	  assert(0 == "old msgs despite reconnect_seq feature");
	continue;
      }
      if (m->get_seq() > in_seq + 1) {
	ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
			   << in_seq << " to " << m->get_seq() << dendl;
	if (msgr->cct->_conf->ms_die_on_skipped_message)
	  assert(0 == "skipped incoming seq");
      }

      m->set_connection(connection_state.get());

      // note last received message.
      in_seq = m->get_seq();

      cond.Signal();  // wake up writer, to ack this
      
      ldout(msgr->cct,10) << "reader got message "
	       << m->get_seq() << " " << m << " " << *m
	       << dendl;
      in_q->fast_preprocess(m);

      if (delay_thread) {
	// delay injection testing: maybe hold the message for a while
        utime_t release;
        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
          release = m->get_recv_stamp();
          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
        }
        delay_thread->queue(release, m);
      } else {
	if (in_q->can_fast_dispatch(m)) {
	  // deliver inline, with pipe_lock dropped; stop_and_wait() waits
	  // on reader_dispatching, so signal when we are done
	  reader_dispatching = true;
	  pipe_lock.Unlock();
	  in_q->fast_dispatch(m);
	  pipe_lock.Lock();
	  reader_dispatching = false;
	  if (state == STATE_CLOSED ||
	      notify_on_dispatch_done) { // there might be somebody waiting
	    notify_on_dispatch_done = false;
	    cond.Signal();
	  }
	} else {
	  in_q->enqueue(m, m->get_priority(), conn_id);
	}
      }
    }
    
    else if (tag == CEPH_MSGR_TAG_CLOSE) {
      ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
      pipe_lock.Lock();
      if (state == STATE_CLOSING) {
	state = STATE_CLOSED;
	state_closed = true;
      } else {
	state = STATE_CLOSING;
      }
      cond.Signal();
      break;
    }
    else {
      ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
      pipe_lock.Lock();
      fault(true);
    }
  }

 
  // reap?
  reader_running = false;
  reader_needs_join = true;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "reader done" << dendl;
}
1817
/* write msgs to socket.
 * also, client.
 *
 * Writer thread entry point.  Drives the outgoing half of the pipe:
 * runs connect() when the state asks for it, emits CLOSE on shutdown,
 * and otherwise drains keepalives, acks, and queued messages, sleeping
 * on cond when there is nothing to send.  Socket writes happen with
 * pipe_lock dropped and the lock re-taken to examine the result.
 */
void Pipe::writer()
{
  pipe_lock.Lock();
  while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
    ldout(msgr->cct,10) << "writer: state = " << get_state_name()
			<< " policy.server=" << policy.server << dendl;

    // standby?
    if (is_queued() && state == STATE_STANDBY && !policy.server)
      state = STATE_CONNECTING;

    // connect?
    if (state == STATE_CONNECTING) {
      assert(!policy.server);
      connect();
      continue;
    }
    
    if (state == STATE_CLOSING) {
      // write close tag
      ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
      char tag = CEPH_MSGR_TAG_CLOSE;
      state = STATE_CLOSED;
      state_closed = true;
      pipe_lock.Unlock();
      if (sd >= 0) {
	// we can ignore return value, actually; we don't care if this succeeds.
	int r = ::write(sd, &tag, 1);
	(void)r;
      }
      pipe_lock.Lock();
      continue;
    }

    if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
	(is_queued() || in_seq > in_seq_acked)) {

      // keepalive?
      if (send_keepalive) {
	int rc;
	if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
	  pipe_lock.Unlock();
	  rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
				ceph_clock_now());
	} else {
	  pipe_lock.Unlock();
	  rc = write_keepalive();
	}
	pipe_lock.Lock();
	if (rc < 0) {
	  ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
			     << cpp_strerror(errno) << dendl;
	  fault();
 	  continue;
	}
	send_keepalive = false;
      }
      // keepalive2 ack requested by the reader (echo peer's timestamp)
      if (send_keepalive_ack) {
	utime_t t = keepalive_ack_stamp;
	pipe_lock.Unlock();
	int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
	pipe_lock.Lock();
	if (rc < 0) {
	  ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
	  fault();
	  continue;
	}
	send_keepalive_ack = false;
      }

      // send ack?
      if (in_seq > in_seq_acked) {
	// capture the seq before dropping the lock; reader may advance it
	uint64_t send_seq = in_seq;
	pipe_lock.Unlock();
	int rc = write_ack(send_seq);
	pipe_lock.Lock();
	if (rc < 0) {
	  ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
	  fault();
 	  continue;
	}
	in_seq_acked = send_seq;
      }

      // grab outgoing message
      Message *m = _get_next_outgoing();
      if (m) {
	m->set_seq(++out_seq);
	if (!policy.lossy) {
	  // put on sent list
	  sent.push_back(m); 
	  m->get();
	}

	// associate message with Connection (for benefit of encode_payload)
	m->set_connection(connection_state.get());

	uint64_t features = connection_state->get_features();

	if (m->empty_payload())
	  ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
			      << " " << m << " " << *m << dendl;
	else
	  ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
			      << " " << m << " " << *m << dendl;

	// encode and copy out of *m
	m->encode(features, msgr->crcflags);

	// prepare everything
	const ceph_msg_header& header = m->get_header();
	const ceph_msg_footer& footer = m->get_footer();

	// Now that we have all the crcs calculated, handle the
	// digital signature for the message, if the pipe has session
	// security set up.  Some session security options do not
	// actually calculate and check the signature, but they should
	// handle the calls to sign_message and check_signature.  PLR
	if (session_security.get() == NULL) {
	  ldout(msgr->cct, 20) << "writer no session security" << dendl;
	} else {
	  if (session_security->sign_message(m)) {
	    ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
				 << "): sig = " << footer.sig << dendl;
	  } else {
	    ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
				 << "): sig = " << footer.sig << dendl;
	  }
	}

	bufferlist blist = m->get_payload();
	blist.append(m->get_middle());
	blist.append(m->get_data());

	pipe_lock.Unlock();

        m->trace.event("pipe writing message");

        ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
	int rc = write_message(header, footer, blist);

	pipe_lock.Lock();
	if (rc < 0) {
          ldout(msgr->cct,1) << "writer error sending " << m << ", "
		  << cpp_strerror(errno) << dendl;
	  fault();
        }
	m->put();
      }
      continue;
    }
    
    // wait
    ldout(msgr->cct,20) << "writer sleeping" << dendl;
    cond.Wait(pipe_lock);
  }
  
  ldout(msgr->cct,20) << "writer finishing" << dendl;

  // reap?
  writer_running = false;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "writer done" << dendl;
}
1985
1986 void Pipe::unlock_maybe_reap()
1987 {
1988 if (!reader_running && !writer_running) {
1989 shutdown_socket();
1990 pipe_lock.Unlock();
1991 if (delay_thread && delay_thread->is_flushing()) {
1992 delay_thread->wait_for_flush();
1993 }
1994 msgr->queue_reap(this);
1995 } else {
1996 pipe_lock.Unlock();
1997 }
1998 }
1999
2000 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2001 {
2002 // create a buffer to read into that matches the data alignment
2003 unsigned left = len;
2004 if (off & ~CEPH_PAGE_MASK) {
2005 // head
2006 unsigned head = 0;
2007 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2008 data.push_back(buffer::create(head));
2009 left -= head;
2010 }
2011 unsigned middle = left & CEPH_PAGE_MASK;
2012 if (middle > 0) {
2013 data.push_back(buffer::create_page_aligned(middle));
2014 left -= middle;
2015 }
2016 if (left) {
2017 data.push_back(buffer::create(left));
2018 }
2019 }
2020
/**
 * Read one wire message (envelope, front, middle, data, footer) off the
 * socket, verify its crcs and (optionally) its signature, and return the
 * decoded Message in *pm.
 *
 * @param pm out: the decoded message on success (ret == 0 and not aborted)
 * @param auth_handler session security handler used to check the message
 *        signature; NULL means no signature checking
 * @return 0 on success, or 0 for a message the sender aborted mid-send
 *         (footer COMPLETE flag clear; *pm is not set in that case —
 *         NOTE(review): callers presumably pre-initialize *pm, verify);
 *         -1 on read/crc failure, -EINVAL on decode/signature failure.
 *
 * Throttle budget (policy throttlers + dispatch throttler) acquired up
 * front is released on every failure path via out_dethrottle; on success
 * it stays accounted to the message (see set_dispatch_throttle_size).
 */
int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
{
  int ret = -1;
  // envelope
  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;

  ceph_msg_header header;
  ceph_msg_footer footer;
  __u32 header_crc = 0;

  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    // modern peer: header is sent as-is
    if (tcp_read((char*)&header, sizeof(header)) < 0)
      return -1;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      // crc covers everything but the trailing crc field itself
      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
    }
  } else {
    // legacy peer: wider header with an embedded src address
    ceph_msg_header_old oldheader;
    if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
      return -1;
    // this is fugly
    memcpy(&header, &oldheader, sizeof(header));
    header.src = oldheader.src.name;
    header.reserved = oldheader.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      // crc was computed by the sender over the OLD layout
      header.crc = oldheader.crc;
      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
    }
  }

  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
	   << " src " << entity_name_t(header.src)
	   << " front=" << header.front_len
	   << " data=" << header.data_len
	   << " off " << header.data_off
	   << dendl;

  // verify header crc
  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
    return -1;
  }

  bufferlist front, middle, data;
  int front_len, middle_len;
  unsigned data_len, data_off;
  int aborted;
  Message *message;
  utime_t recv_stamp = ceph_clock_now();

  // reserve one message's worth of policy-throttler budget; held for the
  // lifetime of the message (may block here)
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->get();
  }

  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->get(message_size);
    }

    // throttle total bytes waiting for dispatch.  do this _after_ the
    // policy throttle, as this one does not deadlock (unless dispatch
    // blocks indefinitely, which it shouldn't).  in contrast, the
    // policy throttle carries for the lifetime of the message.
    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
	     << in_q->dispatch_throttler.get_current() << "/"
	     << in_q->dispatch_throttler.get_max() << dendl;
    in_q->dispatch_throttler.get(message_size);
  }

  utime_t throttle_stamp = ceph_clock_now();

  // read front
  front_len = header.front_len;
  if (front_len) {
    bufferptr bp = buffer::create(front_len);
    if (tcp_read(bp.c_str(), front_len) < 0)
      goto out_dethrottle;
    front.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
  }

  // read middle
  middle_len = header.middle_len;
  if (middle_len) {
    bufferptr bp = buffer::create(middle_len);
    if (tcp_read(bp.c_str(), middle_len) < 0)
      goto out_dethrottle;
    middle.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
  }


  // read data
  data_len = le32_to_cpu(header.data_len);
  data_off = le32_to_cpu(header.data_off);
  if (data_len) {
    unsigned offset = 0;
    unsigned left = data_len;

    bufferlist newbuf, rxbuf;
    bufferlist::iterator blp;
    int rxbuf_version = 0;

    while (left > 0) {
      // wait for data
      if (tcp_read_wait() < 0)
	goto out_dethrottle;

      // get a buffer: prefer a caller-registered rx buffer for this tid
      // (zero-copy into the caller's memory); otherwise allocate an
      // aligned buffer once and keep filling it.
      // connection lock held across the nonblocking read so the rx
      // buffer can't be revoked mid-copy.
      connection_state->lock.Lock();
      map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
      if (p != connection_state->rx_buffers.end()) {
	// re-select the buffer if it changed underneath us (version bump)
	if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
	  ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second
		   << " at offset " << offset
		   << " len " << p->second.first.length() << dendl;
	  rxbuf = p->second.first;
	  rxbuf_version = p->second.second;
	  // make sure it's big enough
	  if (rxbuf.length() < data_len)
	    rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
	  blp = p->second.first.begin();
	  blp.advance(offset);
	}
      } else {
	if (!newbuf.length()) {
	  ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
	  alloc_aligned_buffer(newbuf, data_len, data_off);
	  blp = newbuf.begin();
	  blp.advance(offset);
	}
      }
      bufferptr bp = blp.get_current_ptr();
      int read = MIN(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
      connection_state->lock.Unlock();
      if (got < 0)
	goto out_dethrottle;
      if (got > 0) {
	blp.advance(got);
	data.append(bp, 0, got);
	offset += got;
	left -= got;
      } // else we got a signal or something; just loop.
    }
  }

  // footer: old peers use the shorter footer without a signature field
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
      goto out_dethrottle;
  } else {
    ceph_msg_footer_old old_footer;
    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
      goto out_dethrottle;
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  // sender cleared COMPLETE => it gave up mid-send; we consumed the bytes
  // but there is no message to deliver
  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
  if (aborted) {
    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	     << " byte message.. ABORTED" << dendl;
    ret = 0;
    goto out_dethrottle;
  }

  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	   << " byte message" << dendl;
  message = decode_message(msgr->cct, msgr->crcflags, header, footer,
			   front, middle, data, connection_state.get());
  if (!message) {
    ret = -EINVAL;
    goto out_dethrottle;
  }

  //
  //  Check the signature if one should be present.  A zero return indicates success. PLR
  //

  if (auth_handler == NULL) {
    ldout(msgr->cct, 10) << "No session security set" << dendl;
  } else {
    if (auth_handler->check_message_signature(message)) {
      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
      message->put();
      ret = -EINVAL;
      goto out_dethrottle;
    }
  }

  // hand the throttlers to the message so the budget is released when it
  // is finally consumed/destroyed
  message->set_byte_throttler(policy.throttler_bytes);
  message->set_message_throttler(policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(message_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  *pm = message;
  return 0;

 out_dethrottle:
  // release bytes reserved from the throttlers on failure
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(message_size);
    }

    in_q->dispatch_throttle_release(message_size);
  }
  return ret;
}
2259
/**
 * Send the full len bytes described by *msg, retrying across short
 * writes.  Mutates *msg's iovec array in place to trim off bytes that
 * have already gone out.
 *
 * @param msg  scatter/gather description of the data (modified!)
 * @param len  total byte count across all iovecs
 * @param more true if the caller will immediately send more data
 *             (sets MSG_MORE so the kernel can batch segments)
 * @return 0 on success, negative errno on send failure, -EINTR if the
 *         pipe was closed underneath us.
 */
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
  // suppress SIGPIPE for the duration (platforms without MSG_NOSIGNAL)
  MSGR_SIGPIPE_STOPPER;
  while (len > 0) {
    int r;
    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r == 0)
      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
      return r;
    }
    // another thread may have closed the pipe while we were blocked
    if (state == STATE_CLOSED) {
      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
      return -EINTR; // close enough
    }

    len -= r;
    if (len == 0) break;

    // hrmph.  trim r bytes off the front of our message.
    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
    while (r > 0) {
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
	// lose this whole item: advance the iov array itself
	//ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
	r -= msg->msg_iov[0].iov_len;
	msg->msg_iov++;
	msg->msg_iovlen--;
      } else {
	// partial! shift the first iovec's base/len past the sent bytes
	//ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
	msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
	msg->msg_iov[0].iov_len -= r;
	break;
      }
    }
  }
  return 0;
}
2301
2302
2303 int Pipe::write_ack(uint64_t seq)
2304 {
2305 ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2306
2307 char c = CEPH_MSGR_TAG_ACK;
2308 ceph_le64 s;
2309 s = seq;
2310
2311 struct msghdr msg;
2312 memset(&msg, 0, sizeof(msg));
2313 struct iovec msgvec[2];
2314 msgvec[0].iov_base = &c;
2315 msgvec[0].iov_len = 1;
2316 msgvec[1].iov_base = &s;
2317 msgvec[1].iov_len = sizeof(s);
2318 msg.msg_iov = msgvec;
2319 msg.msg_iovlen = 2;
2320
2321 if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2322 return -1;
2323 return 0;
2324 }
2325
2326 int Pipe::write_keepalive()
2327 {
2328 ldout(msgr->cct,10) << "write_keepalive" << dendl;
2329
2330 char c = CEPH_MSGR_TAG_KEEPALIVE;
2331
2332 struct msghdr msg;
2333 memset(&msg, 0, sizeof(msg));
2334 struct iovec msgvec[2];
2335 msgvec[0].iov_base = &c;
2336 msgvec[0].iov_len = 1;
2337 msg.msg_iov = msgvec;
2338 msg.msg_iovlen = 1;
2339
2340 if (do_sendmsg(&msg, 1) < 0)
2341 return -1;
2342 return 0;
2343 }
2344
2345 int Pipe::write_keepalive2(char tag, const utime_t& t)
2346 {
2347 ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2348 struct ceph_timespec ts;
2349 t.encode_timeval(&ts);
2350 struct msghdr msg;
2351 memset(&msg, 0, sizeof(msg));
2352 struct iovec msgvec[2];
2353 msgvec[0].iov_base = &tag;
2354 msgvec[0].iov_len = 1;
2355 msgvec[1].iov_base = &ts;
2356 msgvec[1].iov_len = sizeof(ts);
2357 msg.msg_iov = msgvec;
2358 msg.msg_iovlen = 2;
2359
2360 if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2361 return -1;
2362 return 0;
2363 }
2364
2365
/**
 * Write a fully-encoded message to the socket as
 *   [tag][header][payload bufferlist][footer]
 * using the pipe's persistent msgvec iovec array, flushing whenever the
 * iovec fills up (SM_IOV_MAX).  Legacy peers (without NOSRCADDR /
 * MSG_AUTH) get the old header/footer layouts instead.
 *
 * @param header wire header (already crc'd by the caller)
 * @param footer wire footer
 * @param blist  front+middle+data payload
 * @return 0 on success, -1 on any send failure.
 */
int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
{
  int ret;

  // set up msghdr and iovecs
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = msgvec;
  int msglen = 0;

  // send tag
  char tag = CEPH_MSGR_TAG_MSG;
  msgvec[msg.msg_iovlen].iov_base = &tag;
  msgvec[msg.msg_iovlen].iov_len = 1;
  msglen++;
  msg.msg_iovlen++;

  // send envelope
  ceph_msg_header_old oldheader;
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    msgvec[msg.msg_iovlen].iov_base = (char*)&header;
    msgvec[msg.msg_iovlen].iov_len = sizeof(header);
    msglen += sizeof(header);
    msg.msg_iovlen++;
  } else {
    // legacy peer: rebuild the old wider header (embedded src addr) and
    // re-crc it over the old layout
    memcpy(&oldheader, &header, sizeof(header));
    oldheader.src.name = header.src;
    oldheader.src.addr = connection_state->get_peer_addr();
    oldheader.orig_src = oldheader.src;
    oldheader.reserved = header.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
	oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
				    sizeof(oldheader) - sizeof(oldheader.crc));
    } else {
	oldheader.crc = 0;
    }
    msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
    msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
    msglen += sizeof(oldheader);
    msg.msg_iovlen++;
  }

  // payload (front+data)
  // walk the bufferlist's fragments, appending each (or the unsent part
  // of it) to the iovec; b_off tracks progress within the current fragment
  list<bufferptr>::const_iterator pb = blist.buffers().begin();
  unsigned b_off = 0;  // carry-over buffer offset, if any
  unsigned bl_pos = 0; // blist pos
  unsigned left = blist.length();

  while (left > 0) {
    unsigned donow = MIN(left, pb->length()-b_off);
    if (donow == 0) {
      ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
	       << " b_off " << b_off << dendl;
    }
    assert(donow > 0);
    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
	     << " leftinchunk " << left
	     << " buffer len " << pb->length()
	     << " writing " << donow
	     << dendl;

    // iovec nearly full (leave room for the footer entries): flush now
    // and start refilling from the front of msgvec
    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
      if (do_sendmsg(&msg, msglen, true))
	goto fail;

      // and restart the iov
      msg.msg_iov = msgvec;
      msg.msg_iovlen = 0;
      msglen = 0;
    }

    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
    msgvec[msg.msg_iovlen].iov_len = donow;
    msglen += donow;
    msg.msg_iovlen++;

    assert(left >= donow);
    left -= donow;
    b_off += donow;
    bl_pos += donow;
    if (left == 0)
      break;
    // advance to the next fragment once this one is exhausted
    while (b_off == pb->length()) {
      ++pb;
      b_off = 0;
    }
  }
  assert(left == 0);

  // send footer; if receiver doesn't support signatures, use the old footer format

  ceph_msg_footer_old old_footer;
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
    msglen += sizeof(footer);
    msg.msg_iovlen++;
  } else {
    // old footer has no signature field; zero crcs when crc is disabled
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
	old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
    msglen += sizeof(old_footer);
    msg.msg_iovlen++;
  }

  // send
  if (do_sendmsg(&msg, msglen))
    goto fail;

  ret = 0;

 out:
  return ret;

 fail:
  ret = -1;
  goto out;
}
2491
2492
2493 int Pipe::tcp_read(char *buf, unsigned len)
2494 {
2495 if (sd < 0)
2496 return -EINVAL;
2497
2498 while (len > 0) {
2499
2500 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2501 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2502 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2503 ::shutdown(sd, SHUT_RDWR);
2504 }
2505 }
2506
2507 if (tcp_read_wait() < 0)
2508 return -1;
2509
2510 ssize_t got = tcp_read_nonblocking(buf, len);
2511
2512 if (got < 0)
2513 return -1;
2514
2515 len -= got;
2516 buf += got;
2517 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2518 }
2519 return 0;
2520 }
2521
/**
 * Wait (up to msgr->timeout ms) until the socket is readable.
 *
 * @return 0 when data is available (or already buffered locally),
 *         -EINVAL if the socket is gone, -EAGAIN on poll timeout,
 *         -errno on poll error, -1 on hangup/error conditions.
 */
int Pipe::tcp_read_wait()
{
  if (sd < 0)
    return -EINVAL;
  struct pollfd pfd;
  short evmask;
  pfd.fd = sd;
  pfd.events = POLLIN;
#if defined(__linux__)
  pfd.events |= POLLRDHUP;  // notice peer shutdown(SHUT_WR) promptly
#endif

  // bytes already sitting in the prefetch buffer? no need to poll.
  if (has_pending_data())
    return 0;

  int r = poll(&pfd, 1, msgr->timeout);
  if (r < 0)
    return -errno;
  if (r == 0)
    return -EAGAIN;

  // treat any error/hangup condition as a dead connection
  evmask = POLLERR | POLLHUP | POLLNVAL;
#if defined(__linux__)
  evmask |= POLLRDHUP;
#endif
  if (pfd.revents & evmask)
    return -1;

  if (!(pfd.revents & POLLIN))
    return -1;

  return 0;
}
2555
2556 ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2557 {
2558 again:
2559 ssize_t got = ::recv( sd, buf, len, flags );
2560 if (got < 0) {
2561 if (errno == EINTR) {
2562 goto again;
2563 }
2564 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2565 << got << " " << cpp_strerror(errno) << dendl;
2566 return -1;
2567 }
2568 if (got == 0) {
2569 return -1;
2570 }
2571 return got;
2572 }
2573
/**
 * Receive up to len bytes, serving from the prefetch buffer first.
 * Small requests (<= recv_max_prefetch) trigger a larger recv() into
 * recv_buf so subsequent small reads avoid syscalls entirely.
 *
 * @return number of bytes copied into buf (may be short of len), or a
 *         negative value on error when nothing was copied.  If some
 *         bytes were already delivered before an error, the short byte
 *         count is returned and the error is left for the next call.
 */
ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
{
  size_t left = len;
  ssize_t total_recv = 0;
  // first drain whatever remains in the prefetch buffer
  if (recv_len > recv_ofs) {
    int to_read = MIN(recv_len - recv_ofs, left);
    memcpy(buf, &recv_buf[recv_ofs], to_read);
    recv_ofs += to_read;
    left -= to_read;
    if (left == 0) {
      return to_read;
    }
    buf += to_read;
    total_recv += to_read;
  }

  /* nothing left in the prefetch buffer */

  if (left > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    ssize_t ret = do_recv(buf, left, flags );
    if (ret < 0) {
      // don't lose bytes we already handed over; surface them now
      if (total_recv > 0)
        return total_recv;
      return ret;
    }
    total_recv += ret;
    return total_recv;
  }


  // small read: refill the prefetch buffer with one large recv, then
  // serve the caller from it
  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
  if (got < 0) {
    if (total_recv > 0)
      return total_recv;

    return got;
  }

  // recv_len/recv_ofs now describe the freshly-filled prefetch window
  recv_len = (size_t)got;
  got = MIN(left, (size_t)got);
  memcpy(buf, recv_buf, got);
  recv_ofs = got;
  total_recv += got;
  return total_recv;
}
2620
2621 ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2622 {
2623 ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2624 if (got < 0) {
2625 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2626 << got << " " << cpp_strerror(errno) << dendl;
2627 return -1;
2628 }
2629 if (got == 0) {
2630 /* poll() said there was data, but we didn't read any - peer
2631 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2632 * standard socket behavior as documented by Stevens.
2633 */
2634 return -1;
2635 }
2636 return got;
2637 }
2638
2639 int Pipe::tcp_write(const char *buf, unsigned len)
2640 {
2641 if (sd < 0)
2642 return -1;
2643 struct pollfd pfd;
2644 pfd.fd = sd;
2645 pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2646 #if defined(__linux__)
2647 pfd.events |= POLLRDHUP;
2648 #endif
2649
2650 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2651 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2652 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2653 ::shutdown(sd, SHUT_RDWR);
2654 }
2655 }
2656
2657 if (poll(&pfd, 1, -1) < 0)
2658 return -1;
2659
2660 if (!(pfd.revents & POLLOUT))
2661 return -1;
2662
2663 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2664 assert(len > 0);
2665 while (len > 0) {
2666 MSGR_SIGPIPE_STOPPER;
2667 int did = ::send( sd, buf, len, MSG_NOSIGNAL );
2668 if (did < 0) {
2669 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2670 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2671 return did;
2672 }
2673 len -= did;
2674 buf += did;
2675 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2676 }
2677 return 0;
2678 }