1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/socket.h>
17 #include <netinet/in.h>
18 #include <netinet/ip.h>
19 #include <netinet/tcp.h>
20 #include <sys/uio.h>
21 #include <limits.h>
22 #include <poll.h>
23
24 #include "msg/Message.h"
25 #include "Pipe.h"
26 #include "SimpleMessenger.h"
27
28 #include "common/debug.h"
29 #include "common/errno.h"
30 #include "common/valgrind.h"
31
    32 // Included below to get encode_encrypt(); that should probably live in Crypto.h instead.
33
34 #include "auth/Crypto.h"
35 #include "auth/cephx/CephxProtocol.h"
36 #include "auth/AuthSessionHandler.h"
37
38 #include "include/sock_compat.h"
39
    40 // Mask to keep the starting sequence number below 2^31. Nothing special about the value, just a big number. PLR
41 #define SEQ_MASK 0x7fffffff
42 #define dout_subsys ceph_subsys_ms
43
44 #undef dout_prefix
45 #define dout_prefix *_dout << *this
46 ostream& Pipe::_pipe_prefix(std::ostream &out) const {
47 return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
48 << " sd=" << sd << " :" << port
49 << " s=" << state
50 << " pgs=" << peer_global_seq
51 << " cs=" << connect_seq
52 << " l=" << policy.lossy
53 << " c=" << connection_state
54 << ").";
55 }
56
57 ostream& operator<<(ostream &out, const Pipe &pipe) {
58 return pipe._pipe_prefix(out);
59 }
60
61 /**
    62  * DelayedDelivery injects delays into Message delivery off the socket.
    63  * It is only enabled if delays are requested; when it is, it pulls
    64  * Messages off the DelayQueue and puts them into the in_q
    65  * (SimpleMessenger::dispatch_queue).
    66  * Please note that this probably has issues with Pipe shutdown and
    67  * replacement semantics. I've tried to handle them, but no guarantees.
68 */
69 class Pipe::DelayedDelivery: public Thread {
70 Pipe *pipe;
71 std::deque< pair<utime_t,Message*> > delay_queue;
72 Mutex delay_lock;
73 Cond delay_cond;
74 int flush_count;
75 bool active_flush;
76 bool stop_delayed_delivery;
77 bool delay_dispatching; // we are in fast dispatch now
78 bool stop_fast_dispatching_flag; // we need to stop fast dispatching
79
80 public:
81 explicit DelayedDelivery(Pipe *p)
82 : pipe(p),
83 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
84 active_flush(false),
85 stop_delayed_delivery(false),
86 delay_dispatching(false),
87 stop_fast_dispatching_flag(false) { }
88 ~DelayedDelivery() override {
89 discard();
90 }
91 void *entry() override;
92 void queue(utime_t release, Message *m) {
93 Mutex::Locker l(delay_lock);
94 delay_queue.push_back(make_pair(release, m));
95 delay_cond.Signal();
96 }
97 void discard();
98 void flush();
99 bool is_flushing() {
100 Mutex::Locker l(delay_lock);
101 return flush_count > 0 || active_flush;
102 }
103 void wait_for_flush() {
104 Mutex::Locker l(delay_lock);
105 while (flush_count > 0 || active_flush)
106 delay_cond.Wait(delay_lock);
107 }
108 void stop() {
109 delay_lock.Lock();
110 stop_delayed_delivery = true;
111 delay_cond.Signal();
112 delay_lock.Unlock();
113 }
114 void steal_for_pipe(Pipe *new_owner) {
115 Mutex::Locker l(delay_lock);
116 pipe = new_owner;
117 }
118 /**
   119    * We need to stop fast dispatching before we can stop putting
   120    * normal messages into the DispatchQueue.
121 */
122 void stop_fast_dispatching();
123 };
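// A minimal sketch of how the delay path above is driven (illustrative only;
// it mirrors the logic in reader(), with the ms_inject_delay_probability gate
// omitted for brevity, and assumes a Message `m` just read off the socket on
// a Pipe whose peer type matched ms_inject_delay_type):
//
//   utime_t release = m->get_recv_stamp();
//   release += msgr->cct->_conf->ms_inject_delay_max *
//              (double)(rand() % 10000) / 10000.0;   // random extra delay
//   delay_thread->queue(release, m);   // entry() sleeps until `release`,
//                                      // then fast-dispatches or enqueues m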
124
125 /**************************************
126 * Pipe
127 */
128
129 Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
130 : RefCountedObject(r->cct),
131 reader_thread(this),
132 writer_thread(this),
133 delay_thread(NULL),
134 msgr(r),
135 conn_id(r->dispatch_queue.get_id()),
136 recv_ofs(0),
137 recv_len(0),
138 sd(-1), port(0),
139 peer_type(-1),
140 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
141 state(st),
142 connection_state(NULL),
143 reader_running(false), reader_needs_join(false),
144 reader_dispatching(false), notify_on_dispatch_done(false),
145 writer_running(false),
146 in_q(&(r->dispatch_queue)),
147 send_keepalive(false),
148 send_keepalive_ack(false),
149 connect_seq(0), peer_global_seq(0),
150 out_seq(0), in_seq(0), in_seq_acked(0) {
151 ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
152 ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
153 ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
154 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
155 if (con) {
156 connection_state = con;
157 connection_state->reset_pipe(this);
158 } else {
159 connection_state = new PipeConnection(msgr->cct, msgr);
160 connection_state->pipe = get();
161 }
162
163 if (randomize_out_seq()) {
164 lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
165 }
166
167
168 msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
169 if (msgr->timeout == 0)
170 msgr->timeout = -1;
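   // msgr->timeout is later handed to poll(2) (see tcp_read_wait()), which
   // treats a negative timeout as "block indefinitely"; that is why a
   // configured ms_tcp_read_timeout of 0 is mapped to -1 above. A minimal
   // sketch of the same conversion, using a hypothetical helper:
   //
   //   int to_poll_timeout_ms(double secs) {
   //     int ms = static_cast<int>(secs * 1000);  // seconds -> milliseconds
   //     return ms == 0 ? -1 : ms;                // 0 means "wait forever"
   //   }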
171
172 recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
173 recv_buf = new char[recv_max_prefetch];
174 }
175
176 Pipe::~Pipe()
177 {
178 assert(out_q.empty());
179 assert(sent.empty());
180 delete delay_thread;
181 delete[] recv_buf;
182 }
183
184 void Pipe::handle_ack(uint64_t seq)
185 {
186 lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
187 // trim sent list
188 while (!sent.empty() &&
189 sent.front()->get_seq() <= seq) {
190 Message *m = sent.front();
191 sent.pop_front();
192 lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
193 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
194 m->put();
195 }
196 }
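// Worked example (illustrative): if `sent` holds messages with seqs
// [3, 4, 5, 6] and the peer acks seq 5, the loop above releases 3, 4 and 5,
// leaving only seq 6 to wait for a later ack.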
197
198 void Pipe::start_reader()
199 {
200 assert(pipe_lock.is_locked());
201 assert(!reader_running);
202 if (reader_needs_join) {
203 reader_thread.join();
204 reader_needs_join = false;
205 }
206 reader_running = true;
207 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
208 }
209
210 void Pipe::maybe_start_delay_thread()
211 {
212 if (!delay_thread) {
213 auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
214 if (pos != string::npos) {
215 lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
216 delay_thread = new DelayedDelivery(this);
217 delay_thread->create("ms_pipe_delay");
218 }
219 }
220 }
221
222 void Pipe::start_writer()
223 {
224 assert(pipe_lock.is_locked());
225 assert(!writer_running);
226 writer_running = true;
227 writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
228 }
229
230 void Pipe::join_reader()
231 {
232 if (!reader_running)
233 return;
234 cond.Signal();
235 pipe_lock.Unlock();
236 reader_thread.join();
237 pipe_lock.Lock();
238 reader_needs_join = false;
239 }
240
241 void Pipe::DelayedDelivery::discard()
242 {
243 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
244 Mutex::Locker l(delay_lock);
245 while (!delay_queue.empty()) {
246 Message *m = delay_queue.front().second;
247 pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
248 m->put();
249 delay_queue.pop_front();
250 }
251 }
252
253 void Pipe::DelayedDelivery::flush()
254 {
255 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
256 Mutex::Locker l(delay_lock);
257 flush_count = delay_queue.size();
258 delay_cond.Signal();
259 }
260
261 void *Pipe::DelayedDelivery::entry()
262 {
263 Mutex::Locker locker(delay_lock);
264 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;
265
266 while (!stop_delayed_delivery) {
267 if (delay_queue.empty()) {
268 lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
269 delay_cond.Wait(delay_lock);
270 continue;
271 }
272 utime_t release = delay_queue.front().first;
273 Message *m = delay_queue.front().second;
274 string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
275 if (!flush_count &&
276 (release > ceph_clock_now() &&
277 (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
278 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
279 delay_cond.WaitUntil(delay_lock, release);
280 continue;
281 }
282 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
283 delay_queue.pop_front();
284 if (flush_count > 0) {
285 --flush_count;
286 active_flush = true;
287 }
288 if (pipe->in_q->can_fast_dispatch(m)) {
289 if (!stop_fast_dispatching_flag) {
290 delay_dispatching = true;
291 delay_lock.Unlock();
292 pipe->in_q->fast_dispatch(m);
293 delay_lock.Lock();
294 delay_dispatching = false;
295 if (stop_fast_dispatching_flag) {
296 // we need to let the stopping thread proceed
297 delay_cond.Signal();
298 delay_lock.Unlock();
299 delay_lock.Lock();
300 }
301 }
302 } else {
303 pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
304 }
305 active_flush = false;
306 }
307 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
308 return NULL;
309 }
310
311 void Pipe::DelayedDelivery::stop_fast_dispatching() {
312 Mutex::Locker l(delay_lock);
313 stop_fast_dispatching_flag = true;
314 while (delay_dispatching)
315 delay_cond.Wait(delay_lock);
316 }
317
318
319 int Pipe::accept()
320 {
321 ldout(msgr->cct,10) << "accept" << dendl;
322 assert(pipe_lock.is_locked());
323 assert(state == STATE_ACCEPTING);
324
325 pipe_lock.Unlock();
326
327 // vars
328 bufferlist addrs;
329 entity_addr_t socket_addr;
330 socklen_t len;
331 int r;
332 char banner[strlen(CEPH_BANNER)+1];
333 bufferlist addrbl;
334 ceph_msg_connect connect;
335 ceph_msg_connect_reply reply;
336 Pipe *existing = 0;
337 bufferptr bp;
338 bufferlist authorizer, authorizer_reply;
339 bool authorizer_valid;
340 uint64_t feat_missing;
341 bool replaced = false;
   342   // this variable denotes whether the connection attempt from the peer is
   343   // a hard reset: it is true if there is an existing connection and the
   344   // connection sequence from the peer is zero
345 bool is_reset_from_peer = false;
346 CryptoKey session_key;
347 int removed; // single-use down below
348
349 // this should roughly mirror pseudocode at
350 // http://ceph.com/wiki/Messaging_protocol
351 int reply_tag = 0;
352 uint64_t existing_seq = -1;
353
354 // used for reading in the remote acked seq on connect
355 uint64_t newly_acked_seq = 0;
356
357 recv_reset();
358
359 set_socket_options();
360
361 // announce myself.
362 r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
363 if (r < 0) {
364 ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
365 goto fail_unlocked;
366 }
367
368 // and my addr
369 ::encode(msgr->my_inst.addr, addrs, 0); // legacy
370
371 port = msgr->my_inst.addr.get_port();
372
373 // and peer's socket addr (they might not know their ip)
374 sockaddr_storage ss;
375 len = sizeof(ss);
376 r = ::getpeername(sd, (sockaddr*)&ss, &len);
377 if (r < 0) {
378 ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
379 goto fail_unlocked;
380 }
381 socket_addr.set_sockaddr((sockaddr*)&ss);
382 ::encode(socket_addr, addrs, 0); // legacy
383
384 r = tcp_write(addrs.c_str(), addrs.length());
385 if (r < 0) {
386 ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
387 goto fail_unlocked;
388 }
389
390 ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
391
392 // identify peer
393 if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
394 ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
395 goto fail_unlocked;
396 }
397 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
398 banner[strlen(CEPH_BANNER)] = 0;
399 ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
400 goto fail_unlocked;
401 }
402 {
403 bufferptr tp(sizeof(ceph_entity_addr));
404 addrbl.push_back(std::move(tp));
405 }
406 if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
407 ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
408 goto fail_unlocked;
409 }
410 {
411 bufferlist::iterator ti = addrbl.begin();
412 ::decode(peer_addr, ti);
413 }
414
415 ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
416 if (peer_addr.is_blank_ip()) {
417 // peer apparently doesn't know what ip they have; figure it out for them.
418 int port = peer_addr.get_port();
419 peer_addr.u = socket_addr.u;
420 peer_addr.set_port(port);
421 ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
422 << " (socket is " << socket_addr << ")" << dendl;
423 }
424 set_peer_addr(peer_addr); // so that connection_state gets set up
425
426 while (1) {
427 if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
428 ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
429 goto fail_unlocked;
430 }
431
432 authorizer.clear();
433 if (connect.authorizer_len) {
434 bp = buffer::create(connect.authorizer_len);
435 if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
436 ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
437 goto fail_unlocked;
438 }
439 authorizer.push_back(std::move(bp));
440 authorizer_reply.clear();
441 }
442
443 ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
444 << " global_seq " << connect.global_seq
445 << dendl;
446
447 msgr->lock.Lock(); // FIXME
448 pipe_lock.Lock();
449 if (msgr->dispatch_queue.stop)
450 goto shutting_down;
451 if (state != STATE_ACCEPTING) {
452 goto shutting_down;
453 }
454
455 // note peer's type, flags
456 set_peer_type(connect.host_type);
457 policy = msgr->get_policy(connect.host_type);
458 ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
459 << ", policy.lossy=" << policy.lossy
460 << " policy.server=" << policy.server
461 << " policy.standby=" << policy.standby
462 << " policy.resetcheck=" << policy.resetcheck
463 << dendl;
464
465 memset(&reply, 0, sizeof(reply));
466 reply.protocol_version = msgr->get_proto_version(peer_type, false);
467 msgr->lock.Unlock();
468
469 // mismatch?
470 ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
471 << ", their proto " << connect.protocol_version << dendl;
472 if (connect.protocol_version != reply.protocol_version) {
473 reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
474 goto reply;
475 }
476
477 // require signatures for cephx?
478 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
479 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
480 peer_type == CEPH_ENTITY_TYPE_MDS) {
481 if (msgr->cct->_conf->cephx_require_signatures ||
482 msgr->cct->_conf->cephx_cluster_require_signatures) {
483 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
484 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
485 }
486 } else {
487 if (msgr->cct->_conf->cephx_require_signatures ||
488 msgr->cct->_conf->cephx_service_require_signatures) {
489 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
490 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
491 }
492 }
493 }
494
495 feat_missing = policy.features_required & ~(uint64_t)connect.features;
496 if (feat_missing) {
497 ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
498 reply.tag = CEPH_MSGR_TAG_FEATURES;
499 goto reply;
500 }
501
502 // Check the authorizer. If not good, bail out.
503
504 pipe_lock.Unlock();
505
506 if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
507 authorizer_reply, authorizer_valid, session_key) ||
508 !authorizer_valid) {
509 ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
510 pipe_lock.Lock();
511 if (state != STATE_ACCEPTING)
512 goto shutting_down_msgr_unlocked;
513 reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
514 session_security.reset();
515 goto reply;
516 }
517
518 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
519
520 ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
521
522 retry_existing_lookup:
523 msgr->lock.Lock();
524 pipe_lock.Lock();
525 if (msgr->dispatch_queue.stop)
526 goto shutting_down;
527 if (state != STATE_ACCEPTING)
528 goto shutting_down;
529
530 // existing?
531 existing = msgr->_lookup_pipe(peer_addr);
532 if (existing) {
533 existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
534 if (existing->reader_dispatching) {
535 /** we need to wait, or we can deadlock if downstream
536 * fast_dispatchers are (naughtily!) waiting on resources
537 * held by somebody trying to make use of the SimpleMessenger lock.
538 * So drop locks, wait, and retry. It just looks like a slow network
539 * to everybody else.
540 *
541 * We take a ref to existing here since it might get reaped before we
   542        * wake up (see bug #15870).  We can be confident that it lived until
   543        * we locked it, since we held the msgr lock from _lookup_pipe through to
   544        * locking existing->pipe_lock and checking reader_dispatching.
545 */
546 existing->get();
547 pipe_lock.Unlock();
548 msgr->lock.Unlock();
549 existing->notify_on_dispatch_done = true;
550 while (existing->reader_dispatching)
551 existing->cond.Wait(existing->pipe_lock);
552 existing->pipe_lock.Unlock();
553 existing->put();
554 existing = nullptr;
555 goto retry_existing_lookup;
556 }
557
558 if (connect.global_seq < existing->peer_global_seq) {
559 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
560 << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
561 reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
562 reply.global_seq = existing->peer_global_seq; // so we can send it below..
563 existing->pipe_lock.Unlock();
564 msgr->lock.Unlock();
565 goto reply;
566 } else {
567 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
568 << " <= " << connect.global_seq << ", looks ok" << dendl;
569 }
570
571 if (existing->policy.lossy) {
572 ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
573 << policy.lossy << ")" << dendl;
574 existing->was_session_reset();
575 goto replace;
576 }
577
578 ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
579 << " vs existing " << existing->connect_seq
580 << " state " << existing->get_state_name() << dendl;
581
582 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
583 ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
584 // this is a hard reset from peer
585 is_reset_from_peer = true;
586 if (policy.resetcheck)
587 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
588 goto replace;
589 }
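      // Illustrative scenario for the branch above: our side still holds a
      // session with existing->connect_seq == 3, but the peer restarted and
      // reconnects with connect.connect_seq == 0. We note the hard reset,
      // reset the existing session if policy.resetcheck is set, and fall
      // through to `replace` so the fresh attempt wins.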
590
591 if (connect.connect_seq < existing->connect_seq) {
592 // old attempt, or we sent READY but they didn't get it.
593 ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
594 << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
595 goto retry_session;
596 }
597
598 if (connect.connect_seq == existing->connect_seq) {
599 // if the existing connection successfully opened, and/or
600 // subsequently went to standby, then the peer should bump
601 // their connect_seq and retry: this is not a connection race
602 // we need to resolve here.
603 if (existing->state == STATE_OPEN ||
604 existing->state == STATE_STANDBY) {
605 ldout(msgr->cct,10) << "accept connection race, existing " << existing
606 << ".cseq " << existing->connect_seq
607 << " == " << connect.connect_seq
608 << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
609 goto retry_session;
610 }
611
612 // connection race?
613 if (peer_addr < msgr->my_inst.addr ||
614 existing->policy.server) {
615 // incoming wins
616 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
617 << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
618 if (!(existing->state == STATE_CONNECTING ||
619 existing->state == STATE_WAIT))
620 lderr(msgr->cct) << "accept race bad state, would replace, existing="
621 << existing->get_state_name()
622 << " " << existing << ".cseq=" << existing->connect_seq
623 << " == " << connect.connect_seq
624 << dendl;
625 assert(existing->state == STATE_CONNECTING ||
626 existing->state == STATE_WAIT);
627 goto replace;
628 } else {
629 // our existing outgoing wins
630 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
631 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
632 assert(peer_addr > msgr->my_inst.addr);
633 if (!(existing->state == STATE_CONNECTING))
634 lderr(msgr->cct) << "accept race bad state, would send wait, existing="
635 << existing->get_state_name()
636 << " " << existing << ".cseq=" << existing->connect_seq
637 << " == " << connect.connect_seq
638 << dendl;
639 assert(existing->state == STATE_CONNECTING);
640 // make sure our outgoing connection will follow through
641 existing->_send_keepalive();
642 reply.tag = CEPH_MSGR_TAG_WAIT;
643 existing->pipe_lock.Unlock();
644 msgr->lock.Unlock();
645 goto reply;
646 }
647 }
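      // The race resolution above boils down to this rule (sketch):
      //
      //   if (peer_addr < my_addr || existing->policy.server)
      //     the incoming attempt wins  -> replace our outgoing attempt
      //   else
      //     our outgoing attempt wins  -> send WAIT to the peer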
648
649 assert(connect.connect_seq > existing->connect_seq);
650 assert(connect.global_seq >= existing->peer_global_seq);
651 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
652 existing->connect_seq == 0) {
653 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
654 << ", " << existing << ".cseq = " << existing->connect_seq
655 << "), sending RESETSESSION" << dendl;
656 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
657 msgr->lock.Unlock();
658 existing->pipe_lock.Unlock();
659 goto reply;
660 }
661
662 // reconnect
663 ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
664 << " > " << existing->connect_seq << dendl;
665 goto replace;
666 } // existing
667 else if (connect.connect_seq > 0) {
668 // we reset, and they are opening a new session
669 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
670 msgr->lock.Unlock();
671 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
672 goto reply;
673 } else {
674 // new session
675 ldout(msgr->cct,10) << "accept new session" << dendl;
676 existing = NULL;
677 goto open;
678 }
679 ceph_abort();
680
681 retry_session:
682 assert(existing->pipe_lock.is_locked());
683 assert(pipe_lock.is_locked());
684 reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
685 reply.connect_seq = existing->connect_seq + 1;
686 existing->pipe_lock.Unlock();
687 msgr->lock.Unlock();
688 goto reply;
689
690 reply:
691 assert(pipe_lock.is_locked());
692 reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
693 reply.authorizer_len = authorizer_reply.length();
694 pipe_lock.Unlock();
695 r = tcp_write((char*)&reply, sizeof(reply));
696 if (r < 0)
697 goto fail_unlocked;
698 if (reply.authorizer_len) {
699 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
700 if (r < 0)
701 goto fail_unlocked;
702 }
703 }
704
705 replace:
706 assert(existing->pipe_lock.is_locked());
707 assert(pipe_lock.is_locked());
708 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
709 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
710 reply_tag = CEPH_MSGR_TAG_SEQ;
711 existing_seq = existing->in_seq;
712 }
713 ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
714 existing->stop();
715 existing->unregister_pipe();
716 replaced = true;
717
718 if (existing->policy.lossy) {
719 // disconnect from the Connection
720 assert(existing->connection_state);
721 if (existing->connection_state->clear_pipe(existing))
722 msgr->dispatch_queue.queue_reset(existing->connection_state.get());
723 } else {
724 // queue a reset on the new connection, which we're dumping for the old
725 msgr->dispatch_queue.queue_reset(connection_state.get());
726
727 // drop my Connection, and take a ref to the existing one. do not
728 // clear existing->connection_state, since read_message and
729 // write_message both dereference it without pipe_lock.
730 connection_state = existing->connection_state;
731
732 // make existing Connection reference us
733 connection_state->reset_pipe(this);
734
735 if (existing->delay_thread) {
736 existing->delay_thread->steal_for_pipe(this);
737 delay_thread = existing->delay_thread;
738 existing->delay_thread = NULL;
739 delay_thread->flush();
740 }
741
742 // steal incoming queue
743 uint64_t replaced_conn_id = conn_id;
744 conn_id = existing->conn_id;
745 existing->conn_id = replaced_conn_id;
746
747 // reset the in_seq if this is a hard reset from peer,
748 // otherwise we respect our original connection's value
749 in_seq = is_reset_from_peer ? 0 : existing->in_seq;
750 in_seq_acked = in_seq;
751
752 // steal outgoing queue and out_seq
753 existing->requeue_sent();
754 out_seq = existing->out_seq;
755 ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
756 for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
757 p != existing->out_q.end();
758 ++p)
759 out_q[p->first].splice(out_q[p->first].begin(), p->second);
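    // Illustrative sketch of the splice above: if the replaced pipe still had
    //   existing->out_q = { 63: [A, B], 127: [C] }
    // and we had already queued { 63: [D] } ourselves, the result is
    //   out_q = { 63: [A, B, D], 127: [C] }
    // i.e. stolen messages keep their priority class and go ahead of ours.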
760 }
761 existing->stop_and_wait();
762 existing->pipe_lock.Unlock();
763
764 open:
765 // open
766 assert(pipe_lock.is_locked());
767 connect_seq = connect.connect_seq + 1;
768 peer_global_seq = connect.global_seq;
769 assert(state == STATE_ACCEPTING);
770 state = STATE_OPEN;
771 ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
772
773 // send READY reply
774 reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
775 reply.features = policy.features_supported;
776 reply.global_seq = msgr->get_global_seq();
777 reply.connect_seq = connect_seq;
778 reply.flags = 0;
779 reply.authorizer_len = authorizer_reply.length();
780 if (policy.lossy)
781 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
782
783 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
784 ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
785
786 session_security.reset(
787 get_auth_session_handler(msgr->cct,
788 connect.authorizer_protocol,
789 session_key,
790 connection_state->get_features()));
791
792 // notify
793 msgr->dispatch_queue.queue_accept(connection_state.get());
794 msgr->ms_deliver_handle_fast_accept(connection_state.get());
795
796 // ok!
797 if (msgr->dispatch_queue.stop)
798 goto shutting_down;
799 removed = msgr->accepting_pipes.erase(this);
800 assert(removed == 1);
801 register_pipe();
802 msgr->lock.Unlock();
803 pipe_lock.Unlock();
804
805 r = tcp_write((char*)&reply, sizeof(reply));
806 if (r < 0) {
807 goto fail_registered;
808 }
809
810 if (reply.authorizer_len) {
811 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
812 if (r < 0) {
813 goto fail_registered;
814 }
815 }
816
817 if (reply_tag == CEPH_MSGR_TAG_SEQ) {
818 if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
819 ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
820 goto fail_registered;
821 }
822 if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
823 ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
824 goto fail_registered;
825 }
826 }
827
828 pipe_lock.Lock();
829 discard_requeued_up_to(newly_acked_seq);
830 if (state != STATE_CLOSED) {
831 ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
832 start_writer();
833 }
834 ldout(msgr->cct,20) << "accept done" << dendl;
835
836 maybe_start_delay_thread();
837
838 return 0; // success.
839
840 fail_registered:
841 ldout(msgr->cct, 10) << "accept fault after register" << dendl;
842
843 if (msgr->cct->_conf->ms_inject_internal_delays) {
844 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
845 utime_t t;
846 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
847 t.sleep();
848 }
849
850 fail_unlocked:
851 pipe_lock.Lock();
852 if (state != STATE_CLOSED) {
853 bool queued = is_queued();
854 ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl;
855 if (queued) {
856 state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
857 } else if (replaced) {
858 state = STATE_STANDBY;
859 } else {
860 state = STATE_CLOSED;
861 state_closed = true;
862 }
863 fault();
864 if (queued || replaced)
865 start_writer();
866 }
867 return -1;
868
869 shutting_down:
870 msgr->lock.Unlock();
871 shutting_down_msgr_unlocked:
872 assert(pipe_lock.is_locked());
873
874 if (msgr->cct->_conf->ms_inject_internal_delays) {
875 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
876 utime_t t;
877 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
878 t.sleep();
879 }
880
881 state = STATE_CLOSED;
882 state_closed = true;
883 fault();
884 return -1;
885 }
886
887 void Pipe::set_socket_options()
888 {
889 // disable Nagle algorithm?
890 if (msgr->cct->_conf->ms_tcp_nodelay) {
891 int flag = 1;
892 int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
893 if (r < 0) {
894 r = -errno;
895 ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
896 << cpp_strerror(r) << dendl;
897 }
898 }
899 if (msgr->cct->_conf->ms_tcp_rcvbuf) {
900 int size = msgr->cct->_conf->ms_tcp_rcvbuf;
901 int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
902 if (r < 0) {
903 r = -errno;
904 ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
905 << ": " << cpp_strerror(r) << dendl;
906 }
907 }
908
   909   // block SIGPIPE
910 #if defined(SO_NOSIGPIPE)
911 int val = 1;
912 int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
913 if (r) {
914 r = -errno;
915 ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
916 << cpp_strerror(r) << dendl;
917 }
918 #endif
919
920 #ifdef SO_PRIORITY
921 int prio = msgr->get_socket_priority();
922 if (prio >= 0) {
923 int r = -1;
924 #ifdef IPTOS_CLASS_CS6
925 int iptos = IPTOS_CLASS_CS6;
926 int addr_family = 0;
927 if (!peer_addr.is_blank_ip()) {
928 addr_family = peer_addr.get_family();
929 } else {
930 addr_family = msgr->get_myaddr().get_family();
931 }
932 switch (addr_family) {
933 case AF_INET:
934 r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
935 break;
936 case AF_INET6:
937 r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
938 break;
939 default:
940 lderr(msgr->cct) << "couldn't set ToS of unknown family ("
941 << addr_family << ")"
942 << " to " << iptos << dendl;
943 return;
944 }
945 if (r < 0) {
946 r = -errno;
947 ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
948 << ": " << cpp_strerror(r) << dendl;
949 }
950 #endif
   951     // Setting IP_TOS (here IPTOS_CLASS_CS6) resets the socket priority to 0.
   952     // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
   953     // We therefore need to call setsockopt(SO_PRIORITY) after it.
954 r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
955 if (r < 0) {
956 r = -errno;
957 ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
958 << ": " << cpp_strerror(r) << dendl;
959 }
960 }
961 #endif
962 }
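// All of the options above are configuration-driven. A minimal ceph.conf
// sketch exercising them (values are illustrative, not recommendations):
//
//   [global]
//     ms tcp nodelay = true     # enables the TCP_NODELAY branch above
//     ms tcp rcvbuf = 65536     # sets SO_RCVBUF; 0 leaves the kernel default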
963
964 int Pipe::connect()
965 {
966 bool got_bad_auth = false;
967
968 ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
969 assert(pipe_lock.is_locked());
970
971 __u32 cseq = connect_seq;
972 __u32 gseq = msgr->get_global_seq();
973
974 // stop reader thread
975 join_reader();
976
977 pipe_lock.Unlock();
978
979 char tag = -1;
980 int rc = -1;
981 struct msghdr msg;
982 struct iovec msgvec[2];
983 int msglen;
984 char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy
985 entity_addr_t paddr;
986 entity_addr_t peer_addr_for_me, socket_addr;
987 AuthAuthorizer *authorizer = NULL;
988 bufferlist addrbl, myaddrbl;
989 const md_config_t *conf = msgr->cct->_conf;
990
991 // close old socket. this is safe because we stopped the reader thread above.
992 if (sd >= 0)
993 ::close(sd);
994
995 // create socket?
996 sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
997 if (sd < 0) {
998 rc = -errno;
999 lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl;
1000 goto fail;
1001 }
1002
1003 recv_reset();
1004
1005 set_socket_options();
1006
1007 {
1008 entity_addr_t addr2bind = msgr->get_myaddr();
1009 if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
1010 addr2bind.set_port(0);
1011 int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
1012 if (r < 0) {
  1013         ldout(msgr->cct,2) << "client bind error, " << cpp_strerror(errno) << dendl;
1014 goto fail;
1015 }
1016 }
1017 }
1018
1019 // connect!
1020 ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
1021 rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
1022 if (rc < 0) {
1023 int stored_errno = errno;
1024 ldout(msgr->cct,2) << "connect error " << peer_addr
1025 << ", " << cpp_strerror(stored_errno) << dendl;
1026 if (stored_errno == ECONNREFUSED) {
1027 ldout(msgr->cct, 2) << "connection refused!" << dendl;
1028 msgr->dispatch_queue.queue_refused(connection_state.get());
1029 }
1030 goto fail;
1031 }
1032
1033 // verify banner
1034 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1035 rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
1036 if (rc < 0) {
1037 ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
1038 goto fail;
1039 }
1040 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1041 ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
1042 goto fail;
1043 }
1044
1045 memset(&msg, 0, sizeof(msg));
1046 msgvec[0].iov_base = banner;
1047 msgvec[0].iov_len = strlen(CEPH_BANNER);
1048 msg.msg_iov = msgvec;
1049 msg.msg_iovlen = 1;
1050 msglen = msgvec[0].iov_len;
1051 rc = do_sendmsg(&msg, msglen);
1052 if (rc < 0) {
1053 ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
1054 goto fail;
1055 }
1056
1057 // identify peer
1058 {
1059 #if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1060 bufferptr p(sizeof(ceph_entity_addr) * 2);
1061 #else
1062 int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
1063 bufferptr p(wirelen * 2);
1064 #endif
1065 addrbl.push_back(std::move(p));
1066 }
1067 rc = tcp_read(addrbl.c_str(), addrbl.length());
1068 if (rc < 0) {
1069 ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
1070 goto fail;
1071 }
1072 try {
1073 bufferlist::iterator p = addrbl.begin();
1074 ::decode(paddr, p);
1075 ::decode(peer_addr_for_me, p);
1076 }
1077 catch (buffer::error& e) {
1078 ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
1079 << dendl;
1080 goto fail;
1081 }
1082 port = peer_addr_for_me.get_port();
1083
1084 ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
1085 if (peer_addr != paddr) {
1086 if (paddr.is_blank_ip() &&
1087 peer_addr.get_port() == paddr.get_port() &&
1088 peer_addr.get_nonce() == paddr.get_nonce()) {
1089 ldout(msgr->cct,0) << "connect claims to be "
1090 << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
1091 } else {
1092 ldout(msgr->cct,0) << "connect claims to be "
1093 << paddr << " not " << peer_addr << " - wrong node!" << dendl;
1094 goto fail;
1095 }
1096 }
1097
1098 ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
1099
1100 msgr->learned_addr(peer_addr_for_me);
1101
1102 ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy
1103
1104 memset(&msg, 0, sizeof(msg));
1105 msgvec[0].iov_base = myaddrbl.c_str();
1106 msgvec[0].iov_len = myaddrbl.length();
1107 msg.msg_iov = msgvec;
1108 msg.msg_iovlen = 1;
1109 msglen = msgvec[0].iov_len;
1110 rc = do_sendmsg(&msg, msglen);
1111 if (rc < 0) {
1112 ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
1113 goto fail;
1114 }
1115 ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
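  // The client-side handshake so far, summarized as a sketch (the code above
  // is authoritative; see also the messaging protocol doc cited in accept()):
  //   1. read the server's CEPH_BANNER
  //   2. write our own CEPH_BANNER
  //   3. read the server's addr plus the addr it sees for us (peer_addr_for_me)
  //   4. write our own addr (done just above)
  //   5. loop: write ceph_msg_connect (+ optional authorizer),
  //      read ceph_msg_connect_reply, and react to the reply tag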
1116
1117
1118 while (1) {
1119 delete authorizer;
1120 authorizer = msgr->get_authorizer(peer_type, false);
1121 bufferlist authorizer_reply;
1122
1123 ceph_msg_connect connect;
1124 connect.features = policy.features_supported;
1125 connect.host_type = msgr->get_myinst().name.type();
1126 connect.global_seq = gseq;
1127 connect.connect_seq = cseq;
1128 connect.protocol_version = msgr->get_proto_version(peer_type, true);
1129 connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1130 connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1131 if (authorizer)
1132 ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
1133 << " protocol=" << connect.authorizer_protocol << dendl;
1134 connect.flags = 0;
1135 if (policy.lossy)
1136 connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1137 memset(&msg, 0, sizeof(msg));
1138 msgvec[0].iov_base = (char*)&connect;
1139 msgvec[0].iov_len = sizeof(connect);
1140 msg.msg_iov = msgvec;
1141 msg.msg_iovlen = 1;
1142 msglen = msgvec[0].iov_len;
1143 if (authorizer) {
1144 msgvec[1].iov_base = authorizer->bl.c_str();
1145 msgvec[1].iov_len = authorizer->bl.length();
1146 msg.msg_iovlen++;
1147 msglen += msgvec[1].iov_len;
1148 }
1149
1150 ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
1151 << " proto=" << connect.protocol_version << dendl;
1152 rc = do_sendmsg(&msg, msglen);
1153 if (rc < 0) {
1154 ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
1155 goto fail;
1156 }
1157
1158 ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
1159 ceph_msg_connect_reply reply;
1160 rc = tcp_read((char*)&reply, sizeof(reply));
1161 if (rc < 0) {
1162 ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
1163 goto fail;
1164 }
1165
1166 ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
1167 << " connect_seq " << reply.connect_seq
1168 << " global_seq " << reply.global_seq
1169 << " proto " << reply.protocol_version
1170 << " flags " << (int)reply.flags
1171 << " features " << reply.features
1172 << dendl;
1173
1174 authorizer_reply.clear();
1175
1176 if (reply.authorizer_len) {
1177 ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
1178 bufferptr bp = buffer::create(reply.authorizer_len);
1179 rc = tcp_read(bp.c_str(), reply.authorizer_len);
1180 if (rc < 0) {
  1181         ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply, " << cpp_strerror(rc) << dendl;
1182 goto fail;
1183 }
1184 authorizer_reply.push_back(bp);
1185 }
1186
1187 if (authorizer) {
1188 bufferlist::iterator iter = authorizer_reply.begin();
1189 if (!authorizer->verify_reply(iter)) {
1190 ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
1191 goto fail;
1192 }
1193 }
1194
1195 if (conf->ms_inject_internal_delays) {
1196 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1197 utime_t t;
1198 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1199 t.sleep();
1200 }
1201
1202 pipe_lock.Lock();
1203 if (state != STATE_CONNECTING) {
  1204       ldout(msgr->cct,0) << "connect interrupted; state is no longer connecting" << dendl;
1205 goto stop_locked;
1206 }
1207
1208 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1209 ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
1210 << connect.features << " < peer " << reply.features
1211 << " missing " << (reply.features & ~policy.features_supported)
1212 << std::dec << dendl;
1213 goto fail_locked;
1214 }
1215
1216 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1217 ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
1218 << " != " << reply.protocol_version << dendl;
1219 goto fail_locked;
1220 }
1221
1222 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1223 ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
1224 if (got_bad_auth)
1225 goto stop_locked;
1226 got_bad_auth = true;
1227 pipe_lock.Unlock();
1228 delete authorizer;
1229 authorizer = msgr->get_authorizer(peer_type, true); // try harder
1230 continue;
1231 }
1232 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1233 ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
1234 was_session_reset();
1235 cseq = 0;
1236 pipe_lock.Unlock();
1237 continue;
1238 }
1239 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1240 gseq = msgr->get_global_seq(reply.global_seq);
1241 ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
1242 << " chose new " << gseq << dendl;
1243 pipe_lock.Unlock();
1244 continue;
1245 }
1246 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1247 assert(reply.connect_seq > connect_seq);
1248 ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
1249 << " -> " << reply.connect_seq << dendl;
1250 cseq = connect_seq = reply.connect_seq;
1251 pipe_lock.Unlock();
1252 continue;
1253 }
1254
1255 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1256 ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
1257 state = STATE_WAIT;
1258 goto stop_locked;
1259 }
1260
1261 if (reply.tag == CEPH_MSGR_TAG_READY ||
1262 reply.tag == CEPH_MSGR_TAG_SEQ) {
1263 uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
1264 if (feat_missing) {
1265 ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
1266 goto fail_locked;
1267 }
1268
1269 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1270 ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1271 uint64_t newly_acked_seq = 0;
1272 rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
1273 if (rc < 0) {
  1274           ldout(msgr->cct,2) << "connect read error on newly_acked_seq, " << cpp_strerror(rc) << dendl;
1275 goto fail_locked;
1276 }
1277 ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
1278 << " vs out_seq " << out_seq << dendl;
1279 while (newly_acked_seq > out_seq) {
1280 Message *m = _get_next_outgoing();
1281 assert(m);
1282 ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
1283 << " " << *m << dendl;
1284 assert(m->get_seq() <= newly_acked_seq);
1285 m->put();
1286 ++out_seq;
1287 }
1288 if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
1289 ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
1290 goto fail_locked;
1291 }
1292 }
1293
1294 // hooray!
1295 peer_global_seq = reply.global_seq;
1296 policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
1297 state = STATE_OPEN;
1298 connect_seq = cseq + 1;
1299 assert(connect_seq == reply.connect_seq);
1300 backoff = utime_t();
1301 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
1302 ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
1303 << ", features " << connection_state->get_features() << dendl;
1304
1305
1306 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1307 // connection. PLR
1308
1309 if (authorizer != NULL) {
1310 session_security.reset(
1311 get_auth_session_handler(msgr->cct,
1312 authorizer->protocol,
1313 authorizer->session_key,
1314 connection_state->get_features()));
1315 } else {
1316 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1317 session_security.reset();
1318 }
1319
1320 msgr->dispatch_queue.queue_connect(connection_state.get());
1321 msgr->ms_deliver_handle_fast_connect(connection_state.get());
1322
1323 if (!reader_running) {
1324 ldout(msgr->cct,20) << "connect starting reader" << dendl;
1325 start_reader();
1326 }
1327 maybe_start_delay_thread();
1328 delete authorizer;
1329 return 0;
1330 }
1331
1332 // protocol error
1333 ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
1334 goto fail_locked;
1335 }
1336
1337 fail:
1338 if (conf->ms_inject_internal_delays) {
1339 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1340 utime_t t;
1341 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1342 t.sleep();
1343 }
1344
1345 pipe_lock.Lock();
1346 fail_locked:
1347 if (state == STATE_CONNECTING)
1348 fault();
1349 else
1350 ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
1351 << " != connecting, stopping" << dendl;
1352
1353 stop_locked:
1354 delete authorizer;
1355 return rc;
1356 }
1357
1358 void Pipe::register_pipe()
1359 {
1360 ldout(msgr->cct,10) << "register_pipe" << dendl;
1361 assert(msgr->lock.is_locked());
1362 Pipe *existing = msgr->_lookup_pipe(peer_addr);
1363 assert(existing == NULL);
1364 msgr->rank_pipe[peer_addr] = this;
1365 }
1366
1367 void Pipe::unregister_pipe()
1368 {
1369 assert(msgr->lock.is_locked());
1370 ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1371 if (p != msgr->rank_pipe.end() && p->second == this) {
1372 ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1373 msgr->rank_pipe.erase(p);
1374 } else {
1375 ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1376 msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
1377 }
1378 }
1379
1380 void Pipe::join()
1381 {
1382 ldout(msgr->cct, 20) << "join" << dendl;
1383 if (writer_thread.is_started())
1384 writer_thread.join();
1385 if (reader_thread.is_started())
1386 reader_thread.join();
1387 if (delay_thread) {
1388 ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
1389 delay_thread->stop();
1390 delay_thread->join();
1391 }
1392 }
1393
1394 void Pipe::requeue_sent()
1395 {
1396 if (sent.empty())
1397 return;
1398
1399 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1400 while (!sent.empty()) {
1401 Message *m = sent.back();
1402 sent.pop_back();
1403 ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1404 << " (" << m->get_seq() << ")" << dendl;
1405 rq.push_front(m);
1406 out_seq--;
1407 }
1408 }
1409
1410 void Pipe::discard_requeued_up_to(uint64_t seq)
1411 {
1412 ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1413 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1414 return;
1415 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1416 while (!rq.empty()) {
1417 Message *m = rq.front();
1418 if (m->get_seq() == 0 || m->get_seq() > seq)
1419 break;
1420 ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1421 << " <= " << seq << ", discarding" << dendl;
1422 m->put();
1423 rq.pop_front();
1424 out_seq++;
1425 }
1426 if (rq.empty())
1427 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1428 }
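// Worked example for requeue_sent() and discard_requeued_up_to() above
// (illustrative): suppose out_seq == 7 and sent == [5, 6, 7]. requeue_sent()
// moves them back onto out_q[CEPH_MSG_PRIO_HIGHEST] as [5, 6, 7] and rewinds
// out_seq to 4. If the peer then acks up to 6, discard_requeued_up_to(6)
// drops 5 and 6 and advances out_seq back to 6, leaving only 7 to resend.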
1429
1430 /*
  1431  * Tears down the Pipe's outgoing message queues (out_q and sent), dropping
  1432  * the queued Messages.  Must hold pipe_lock prior to calling.
1433 */
1434 void Pipe::discard_out_queue()
1435 {
1436 ldout(msgr->cct,10) << "discard_queue" << dendl;
1437
1438 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1439 ldout(msgr->cct,20) << " discard " << *p << dendl;
1440 (*p)->put();
1441 }
1442 sent.clear();
1443 for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1444 for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1445 ldout(msgr->cct,20) << " discard " << *r << dendl;
1446 (*r)->put();
1447 }
1448 out_q.clear();
1449 }
1450
1451 void Pipe::fault(bool onread)
1452 {
1453 const md_config_t *conf = msgr->cct->_conf;
1454 assert(pipe_lock.is_locked());
1455 cond.Signal();
1456
1457 if (onread && state == STATE_CONNECTING) {
1458 ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
1459 return;
1460 }
1461
1462 ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;
1463
1464 if (state == STATE_CLOSED ||
1465 state == STATE_CLOSING) {
1466 ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
1467 if (connection_state->clear_pipe(this))
1468 msgr->dispatch_queue.queue_reset(connection_state.get());
1469 return;
1470 }
1471
1472 shutdown_socket();
1473
1474 // lossy channel?
1475 if (policy.lossy && state != STATE_CONNECTING) {
1476 ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;
1477
1478 // disconnect from Connection, and mark it failed. future messages
1479 // will be dropped.
1480 assert(connection_state);
1481 stop();
1482 bool cleared = connection_state->clear_pipe(this);
1483
1484 // crib locks, blech. note that Pipe is now STATE_CLOSED and the
1485 // rank_pipe entry is ignored by others.
1486 pipe_lock.Unlock();
1487
1488 if (conf->ms_inject_internal_delays) {
1489 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1490 utime_t t;
1491 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1492 t.sleep();
1493 }
1494
1495 msgr->lock.Lock();
1496 pipe_lock.Lock();
1497 unregister_pipe();
1498 msgr->lock.Unlock();
1499
1500 if (delay_thread)
1501 delay_thread->discard();
1502 in_q->discard_queue(conn_id);
1503 discard_out_queue();
1504 if (cleared)
1505 msgr->dispatch_queue.queue_reset(connection_state.get());
1506 return;
1507 }
1508
1509 // queue delayed items immediately
1510 if (delay_thread)
1511 delay_thread->flush();
1512
1513 // requeue sent items
1514 requeue_sent();
1515
1516 if (policy.standby && !is_queued()) {
1517 ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
1518 state = STATE_STANDBY;
1519 return;
1520 }
1521
1522 if (state != STATE_CONNECTING) {
1523 if (policy.server) {
1524 ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
1525 state = STATE_STANDBY;
1526 } else {
1527 ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
1528 connect_seq++;
1529 state = STATE_CONNECTING;
1530 }
1531 backoff = utime_t();
1532 } else if (backoff == utime_t()) {
1533 ldout(msgr->cct,0) << "fault" << dendl;
1534 backoff.set_from_double(conf->ms_initial_backoff);
1535 } else {
1536 ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
1537 cond.WaitInterval(pipe_lock, backoff);
1538 backoff += backoff;
1539 if (backoff > conf->ms_max_backoff)
1540 backoff.set_from_double(conf->ms_max_backoff);
1541 ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
1542 }
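  // Illustrative backoff progression for the reconnect path above, assuming
  // (hypothetically) ms_initial_backoff = 0.2 and ms_max_backoff = 15:
  //   0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, then capped at 15s onwards.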
1543 }
1544
1545 int Pipe::randomize_out_seq()
1546 {
1547 if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
  1548     // Set out_seq to a random value, so the CRC won't be predictable.
  1549     // Don't bother checking seq_error here; the caller checks the return value. PLR
1550 int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
1551 out_seq &= SEQ_MASK;
1552 lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1553 return seq_error;
1554 } else {
1555 // previously, seq #'s always started at 0.
1556 out_seq = 0;
1557 return 0;
1558 }
1559 }
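// Example of the masking above (illustrative): if get_random_bytes() yields
// 0x9e3779b97f4a7c15, then after `out_seq &= SEQ_MASK` it becomes 0x7f4a7c15,
// so the initial sequence number always fits in 31 bits regardless of the
// random draw.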
1560
1561 void Pipe::was_session_reset()
1562 {
1563 assert(pipe_lock.is_locked());
1564
1565 ldout(msgr->cct,10) << "was_session_reset" << dendl;
1566 in_q->discard_queue(conn_id);
1567 if (delay_thread)
1568 delay_thread->discard();
1569 discard_out_queue();
1570
1571 msgr->dispatch_queue.queue_remote_reset(connection_state.get());
1572
1573 if (randomize_out_seq()) {
1574 lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
1575 }
1576
1577 in_seq = 0;
1578 connect_seq = 0;
1579 }
1580
1581 void Pipe::stop()
1582 {
1583 ldout(msgr->cct,10) << "stop" << dendl;
1584 assert(pipe_lock.is_locked());
1585 state = STATE_CLOSED;
1586 state_closed = true;
1587 cond.Signal();
1588 shutdown_socket();
1589 }
1590
1591 void Pipe::stop_and_wait()
1592 {
1593 assert(pipe_lock.is_locked_by_me());
1594 if (state != STATE_CLOSED)
1595 stop();
1596
1597 if (msgr->cct->_conf->ms_inject_internal_delays) {
1598 ldout(msgr->cct, 10) << __func__ << " sleep for "
1599 << msgr->cct->_conf->ms_inject_internal_delays
1600 << dendl;
1601 utime_t t;
1602 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1603 t.sleep();
1604 }
1605
1606 if (delay_thread) {
1607 pipe_lock.Unlock();
1608 delay_thread->stop_fast_dispatching();
1609 pipe_lock.Lock();
1610 }
1611 while (reader_running &&
1612 reader_dispatching)
1613 cond.Wait(pipe_lock);
1614 }
1615
1616 /* read msgs from socket.
  1617  * also, handles the server side of the handshake (accept()).
1618 */
1619 void Pipe::reader()
1620 {
1621 pipe_lock.Lock();
1622
1623 if (state == STATE_ACCEPTING) {
1624 accept();
1625 assert(pipe_lock.is_locked());
1626 }
1627
1628 // loop.
1629 while (state != STATE_CLOSED &&
1630 state != STATE_CONNECTING) {
1631 assert(pipe_lock.is_locked());
1632
1633 // sleep if (re)connecting
1634 if (state == STATE_STANDBY) {
1635 ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
1636 cond.Wait(pipe_lock);
1637 continue;
1638 }
1639
1640 // get a reference to the AuthSessionHandler while we have the pipe_lock
1641 ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
1642
1643 pipe_lock.Unlock();
1644
1645 char tag = -1;
1646 ldout(msgr->cct,20) << "reader reading tag..." << dendl;
1647 if (tcp_read((char*)&tag, 1) < 0) {
1648 pipe_lock.Lock();
1649 ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
1650 fault(true);
1651 continue;
1652 }
1653
1654 if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
1655 ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
1656 pipe_lock.Lock();
1657 connection_state->set_last_keepalive(ceph_clock_now());
1658 continue;
1659 }
1660 if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
1661 ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
1662 ceph_timespec t;
1663 int rc = tcp_read((char*)&t, sizeof(t));
1664 pipe_lock.Lock();
1665 if (rc < 0) {
1666 ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
1667 << cpp_strerror(errno) << dendl;
1668 fault(true);
1669 } else {
1670 send_keepalive_ack = true;
1671 keepalive_ack_stamp = utime_t(t);
1672 ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
1673 << dendl;
1674 connection_state->set_last_keepalive(ceph_clock_now());
1675 cond.Signal();
1676 }
1677 continue;
1678 }
1679 if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1680 ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
1681 struct ceph_timespec t;
1682 int rc = tcp_read((char*)&t, sizeof(t));
1683 pipe_lock.Lock();
1684 if (rc < 0) {
1685 ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
1686 fault(true);
1687 } else {
1688 connection_state->set_last_keepalive_ack(utime_t(t));
1689 }
1690 continue;
1691 }
1692
1693 // open ...
1694 if (tag == CEPH_MSGR_TAG_ACK) {
1695 ldout(msgr->cct,20) << "reader got ACK" << dendl;
1696 ceph_le64 seq;
1697 int rc = tcp_read((char*)&seq, sizeof(seq));
1698 pipe_lock.Lock();
1699 if (rc < 0) {
1700 ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
1701 fault(true);
1702 } else if (state != STATE_CLOSED) {
1703 handle_ack(seq);
1704 }
1705 continue;
1706 }
1707
1708 else if (tag == CEPH_MSGR_TAG_MSG) {
1709 ldout(msgr->cct,20) << "reader got MSG" << dendl;
1710 Message *m = 0;
1711 int r = read_message(&m, auth_handler.get());
1712
1713 pipe_lock.Lock();
1714
1715 if (!m) {
1716 if (r < 0)
1717 fault(true);
1718 continue;
1719 }
1720
1721 m->trace.event("pipe read message");
1722
1723 if (state == STATE_CLOSED ||
1724 state == STATE_CONNECTING) {
1725 in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
1726 m->put();
1727 continue;
1728 }
1729
1730 // check received seq#. if it is old, drop the message.
1731 // note that incoming messages may skip ahead. this is convenient for the client
1732 // side queueing because messages can't be renumbered, but the (kernel) client will
1733 // occasionally pull a message out of the sent queue to send elsewhere. in that case
1734 // it doesn't matter if we "got" it or not.
1735 if (m->get_seq() <= in_seq) {
1736 ldout(msgr->cct,0) << "reader got old message "
1737 << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
1738 << ", discarding" << dendl;
1739 in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
1740 m->put();
1741 if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
1742 msgr->cct->_conf->ms_die_on_old_message)
1743 assert(0 == "old msgs despite reconnect_seq feature");
1744 continue;
1745 }
1746 if (m->get_seq() > in_seq + 1) {
1747 ldout(msgr->cct,0) << "reader missed message? skipped from seq "
1748 << in_seq << " to " << m->get_seq() << dendl;
1749 if (msgr->cct->_conf->ms_die_on_skipped_message)
1750 assert(0 == "skipped incoming seq");
1751 }
1752
1753 m->set_connection(connection_state.get());
1754
1755 // note last received message.
1756 in_seq = m->get_seq();
1757
1758 cond.Signal(); // wake up writer, to ack this
1759
1760 ldout(msgr->cct,10) << "reader got message "
1761 << m->get_seq() << " " << m << " " << *m
1762 << dendl;
1763 in_q->fast_preprocess(m);
1764
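// With a delay thread configured (ms_inject_delay_*), hand the message off for
// possibly delayed delivery; otherwise fast-dispatch it inline when the dispatch
// queue allows it, or enqueue it for the regular dispatch thread.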
1765 if (delay_thread) {
1766 utime_t release;
1767 if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
1768 release = m->get_recv_stamp();
1769 release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
1770 lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
1771 }
1772 delay_thread->queue(release, m);
1773 } else {
1774 if (in_q->can_fast_dispatch(m)) {
1775 reader_dispatching = true;
1776 pipe_lock.Unlock();
1777 in_q->fast_dispatch(m);
1778 pipe_lock.Lock();
1779 reader_dispatching = false;
1780 if (state == STATE_CLOSED ||
1781 notify_on_dispatch_done) { // there might be somebody waiting
1782 notify_on_dispatch_done = false;
1783 cond.Signal();
1784 }
1785 } else {
1786 in_q->enqueue(m, m->get_priority(), conn_id);
1787 }
1788 }
1789 }
1790
1791 else if (tag == CEPH_MSGR_TAG_CLOSE) {
1792 ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
1793 pipe_lock.Lock();
1794 if (state == STATE_CLOSING) {
1795 state = STATE_CLOSED;
1796 state_closed = true;
1797 } else {
1798 state = STATE_CLOSING;
1799 }
1800 cond.Signal();
1801 break;
1802 }
1803 else {
1804 ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
1805 pipe_lock.Lock();
1806 fault(true);
1807 }
1808 }
1809
1810
1811 // reap?
1812 reader_running = false;
1813 reader_needs_join = true;
1814 unlock_maybe_reap();
1815 ldout(msgr->cct,10) << "reader done" << dendl;
1816 }
1817
1818 /* write msgs to socket.
1819 * this thread also runs the connecting (client) side of the connection.
1820 */
1821 void Pipe::writer()
1822 {
1823 pipe_lock.Lock();
1824 while (state != STATE_CLOSED) { // && state != STATE_WAIT
1825 ldout(msgr->cct,10) << "writer: state = " << get_state_name()
1826 << " policy.server=" << policy.server << dendl;
1827
1828 // standby?
1829 if (is_queued() && state == STATE_STANDBY && !policy.server)
1830 state = STATE_CONNECTING;
1831
1832 // connect?
1833 if (state == STATE_CONNECTING) {
1834 assert(!policy.server);
1835 connect();
1836 continue;
1837 }
1838
1839 if (state == STATE_CLOSING) {
1840 // write close tag
1841 ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
1842 char tag = CEPH_MSGR_TAG_CLOSE;
1843 state = STATE_CLOSED;
1844 state_closed = true;
1845 pipe_lock.Unlock();
1846 if (sd >= 0) {
1847 // the return value can be ignored; we don't care whether the CLOSE tag gets through.
1848 int r = ::write(sd, &tag, 1);
1849 (void)r;
1850 }
1851 pipe_lock.Lock();
1852 continue;
1853 }
1854
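// Only write while the connection is usable and there is something to do:
// a pending keepalive, an unacked incoming seq, or queued outgoing messages.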
1855 if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
1856 (is_queued() || in_seq > in_seq_acked)) {
1857
1858 // keepalive?
1859 if (send_keepalive) {
1860 int rc;
1861 if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1862 pipe_lock.Unlock();
1863 rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
1864 ceph_clock_now());
1865 } else {
1866 pipe_lock.Unlock();
1867 rc = write_keepalive();
1868 }
1869 pipe_lock.Lock();
1870 if (rc < 0) {
1871 ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
1872 << cpp_strerror(errno) << dendl;
1873 fault();
1874 continue;
1875 }
1876 send_keepalive = false;
1877 }
1878 if (send_keepalive_ack) {
1879 utime_t t = keepalive_ack_stamp;
1880 pipe_lock.Unlock();
1881 int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
1882 pipe_lock.Lock();
1883 if (rc < 0) {
1884 ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
1885 fault();
1886 continue;
1887 }
1888 send_keepalive_ack = false;
1889 }
1890
1891 // send ack?
1892 if (in_seq > in_seq_acked) {
1893 uint64_t send_seq = in_seq;
1894 pipe_lock.Unlock();
1895 int rc = write_ack(send_seq);
1896 pipe_lock.Lock();
1897 if (rc < 0) {
1898 ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
1899 fault();
1900 continue;
1901 }
1902 in_seq_acked = send_seq;
1903 }
1904
1905 // grab outgoing message
1906 Message *m = _get_next_outgoing();
1907 if (m) {
1908 m->set_seq(++out_seq);
1909 if (!policy.lossy) {
1910 // put on sent list
1911 sent.push_back(m);
1912 m->get();
1913 }
1914
1915 // associate message with Connection (for benefit of encode_payload)
1916 m->set_connection(connection_state.get());
1917
1918 uint64_t features = connection_state->get_features();
1919
1920 if (m->empty_payload())
1921 ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
1922 << " " << m << " " << *m << dendl;
1923 else
1924 ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
1925 << " " << m << " " << *m << dendl;
1926
1927 // encode and copy out of *m
1928 m->encode(features, msgr->crcflags);
1929
1930 // prepare everything
1931 const ceph_msg_header& header = m->get_header();
1932 const ceph_msg_footer& footer = m->get_footer();
1933
1934 // Now that we have all the crcs calculated, handle the
1935 // digital signature for the message, if the pipe has session
1936 // security set up. Some session security options do not
1937 // actually calculate and check the signature, but they should
1938 // handle the calls to sign_message and check_signature. PLR
1939 if (session_security.get() == NULL) {
1940 ldout(msgr->cct, 20) << "writer no session security" << dendl;
1941 } else {
1942 if (session_security->sign_message(m)) {
1943 ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
1944 << ", sig = " << footer.sig << dendl;
1945 } else {
1946 ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
1947 << ", sig = " << footer.sig << dendl;
1948 }
1949 }
1950
1951 bufferlist blist = m->get_payload();
1952 blist.append(m->get_middle());
1953 blist.append(m->get_data());
1954
1955 pipe_lock.Unlock();
1956
1957 m->trace.event("pipe writing message");
1958
1959 ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
1960 int rc = write_message(header, footer, blist);
1961
1962 pipe_lock.Lock();
1963 if (rc < 0) {
1964 ldout(msgr->cct,1) << "writer error sending " << m << ", "
1965 << cpp_strerror(errno) << dendl;
1966 fault();
1967 }
1968 m->put();
1969 }
1970 continue;
1971 }
1972
1973 // wait
1974 ldout(msgr->cct,20) << "writer sleeping" << dendl;
1975 cond.Wait(pipe_lock);
1976 }
1977
1978 ldout(msgr->cct,20) << "writer finishing" << dendl;
1979
1980 // reap?
1981 writer_running = false;
1982 unlock_maybe_reap();
1983 ldout(msgr->cct,10) << "writer done" << dendl;
1984 }
1985
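/*
 * Called with pipe_lock held by a reader/writer thread that is exiting.
 * The last thread out shuts the socket down and hands the Pipe to the
 * messenger's reaper; in every case pipe_lock is released before returning.
 */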
1986 void Pipe::unlock_maybe_reap()
1987 {
1988 if (!reader_running && !writer_running) {
1989 shutdown_socket();
1990 pipe_lock.Unlock();
1991 if (delay_thread && delay_thread->is_flushing()) {
1992 delay_thread->wait_for_flush();
1993 }
1994 msgr->queue_reap(this);
1995 } else {
1996 pipe_lock.Unlock();
1997 }
1998 }
1999
2000 static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2001 {
2002 // create a buffer to read into that matches the data alignment
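// Split the allocation into an unaligned head (up to the next page boundary),
// a page-aligned middle, and a short tail, so the bulk of the payload lands in
// page-aligned memory given the sender's data_off.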
2003 unsigned left = len;
2004 if (off & ~CEPH_PAGE_MASK) {
2005 // head
2006 unsigned head = 0;
2007 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2008 data.push_back(buffer::create(head));
2009 left -= head;
2010 }
2011 unsigned middle = left & CEPH_PAGE_MASK;
2012 if (middle > 0) {
2013 data.push_back(buffer::create_page_aligned(middle));
2014 left -= middle;
2015 }
2016 if (left) {
2017 data.push_back(buffer::create(left));
2018 }
2019 }
2020
2021 int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
2022 {
2023 int ret = -1;
2024 // envelope
2025 //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;
2026
2027 ceph_msg_header header;
2028 ceph_msg_footer footer;
2029 __u32 header_crc = 0;
2030
2031 if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
2032 if (tcp_read((char*)&header, sizeof(header)) < 0)
2033 return -1;
2034 if (msgr->crcflags & MSG_CRC_HEADER) {
2035 header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
2036 }
2037 } else {
2038 ceph_msg_header_old oldheader;
2039 if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
2040 return -1;
2041 // this is fugly
2042 memcpy(&header, &oldheader, sizeof(header));
2043 header.src = oldheader.src.name;
2044 header.reserved = oldheader.reserved;
2045 if (msgr->crcflags & MSG_CRC_HEADER) {
2046 header.crc = oldheader.crc;
2047 header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
2048 }
2049 }
2050
2051 ldout(msgr->cct,20) << "reader got envelope type=" << header.type
2052 << " src " << entity_name_t(header.src)
2053 << " front=" << header.front_len
2054 << " data=" << header.data_len
2055 << " off " << header.data_off
2056 << dendl;
2057
2058 // verify header crc
2059 if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
2060 ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
2061 return -1;
2062 }
2063
2064 bufferlist front, middle, data;
2065 int front_len, middle_len;
2066 unsigned data_len, data_off;
2067 int aborted;
2068 Message *message;
2069 utime_t recv_stamp = ceph_clock_now();
2070
2071 if (policy.throttler_messages) {
2072 ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
2073 << policy.throttler_messages->get_current() << "/"
2074 << policy.throttler_messages->get_max() << dendl;
2075 policy.throttler_messages->get();
2076 }
2077
2078 uint64_t message_size = header.front_len + header.middle_len + header.data_len;
2079 if (message_size) {
2080 if (policy.throttler_bytes) {
2081 ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
2082 << policy.throttler_bytes->get_current() << "/"
2083 << policy.throttler_bytes->get_max() << dendl;
2084 policy.throttler_bytes->get(message_size);
2085 }
2086
2087 // throttle total bytes waiting for dispatch. do this _after_ the
2088 // policy throttle, as this one does not deadlock (unless dispatch
2089 // blocks indefinitely, which it shouldn't). in contrast, the
2090 // policy throttle carries for the lifetime of the message.
2091 ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
2092 << in_q->dispatch_throttler.get_current() << "/"
2093 << in_q->dispatch_throttler.get_max() << dendl;
2094 in_q->dispatch_throttler.get(message_size);
2095 }
2096
2097 utime_t throttle_stamp = ceph_clock_now();
2098
2099 // read front
2100 front_len = header.front_len;
2101 if (front_len) {
2102 bufferptr bp = buffer::create(front_len);
2103 if (tcp_read(bp.c_str(), front_len) < 0)
2104 goto out_dethrottle;
2105 front.push_back(std::move(bp));
2106 ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
2107 }
2108
2109 // read middle
2110 middle_len = header.middle_len;
2111 if (middle_len) {
2112 bufferptr bp = buffer::create(middle_len);
2113 if (tcp_read(bp.c_str(), middle_len) < 0)
2114 goto out_dethrottle;
2115 middle.push_back(std::move(bp));
2116 ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
2117 }
2118
2119
2120 // read data
2121 data_len = le32_to_cpu(header.data_len);
2122 data_off = le32_to_cpu(header.data_off);
2123 if (data_len) {
2124 unsigned offset = 0;
2125 unsigned left = data_len;
2126
2127 bufferlist newbuf, rxbuf;
2128 bufferlist::iterator blp;
2129 int rxbuf_version = 0;
2130
2131 while (left > 0) {
2132 // wait for data
2133 if (tcp_read_wait() < 0)
2134 goto out_dethrottle;
2135
2136 // get a buffer
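// If an upper layer registered a receive buffer for this tid (see the
// Connection's rx_buffers map), read directly into it; otherwise fall back to
// a freshly allocated, alignment-matched buffer below.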
2137 connection_state->lock.Lock();
2138 map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
2139 if (p != connection_state->rx_buffers.end()) {
2140 if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
2141 ldout(msgr->cct,10) << "reader selecting rx buffer v " << p->second.second
2142 << " at offset " << offset
2143 << " len " << p->second.first.length() << dendl;
2144 rxbuf = p->second.first;
2145 rxbuf_version = p->second.second;
2146 // make sure it's big enough
2147 if (rxbuf.length() < data_len)
2148 rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
2149 blp = p->second.first.begin();
2150 blp.advance(offset);
2151 }
2152 } else {
2153 if (!newbuf.length()) {
2154 ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
2155 alloc_aligned_buffer(newbuf, data_len, data_off);
2156 blp = newbuf.begin();
2157 blp.advance(offset);
2158 }
2159 }
2160 bufferptr bp = blp.get_current_ptr();
2161 int read = MIN(bp.length(), left);
2162 ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
2163 ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
2164 ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
2165 connection_state->lock.Unlock();
2166 if (got < 0)
2167 goto out_dethrottle;
2168 if (got > 0) {
2169 blp.advance(got);
2170 data.append(bp, 0, got);
2171 offset += got;
2172 left -= got;
2173 } // else we got a signal or something; just loop.
2174 }
2175 }
2176
2177 // footer
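// Peers without CEPH_FEATURE_MSG_AUTH send the old footer format, which carries
// no signature field; sig is zeroed in that case.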
2178 if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
2179 if (tcp_read((char*)&footer, sizeof(footer)) < 0)
2180 goto out_dethrottle;
2181 } else {
2182 ceph_msg_footer_old old_footer;
2183 if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
2184 goto out_dethrottle;
2185 footer.front_crc = old_footer.front_crc;
2186 footer.middle_crc = old_footer.middle_crc;
2187 footer.data_crc = old_footer.data_crc;
2188 footer.sig = 0;
2189 footer.flags = old_footer.flags;
2190 }
2191
2192 aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
2193 ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
2194 if (aborted) {
2195 ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
2196 << " byte message.. ABORTED" << dendl;
2197 ret = 0;
2198 goto out_dethrottle;
2199 }
2200
2201 ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
2202 << " byte message" << dendl;
2203 message = decode_message(msgr->cct, msgr->crcflags, header, footer,
2204 front, middle, data, connection_state.get());
2205 if (!message) {
2206 ret = -EINVAL;
2207 goto out_dethrottle;
2208 }
2209
2210 //
2211 // Check the signature if one should be present. A zero return indicates success. PLR
2212 //
2213
2214 if (auth_handler == NULL) {
2215 ldout(msgr->cct, 10) << "No session security set" << dendl;
2216 } else {
2217 if (auth_handler->check_message_signature(message)) {
2218 ldout(msgr->cct, 0) << "Signature check failed" << dendl;
2219 message->put();
2220 ret = -EINVAL;
2221 goto out_dethrottle;
2222 }
2223 }
2224
2225 message->set_byte_throttler(policy.throttler_bytes);
2226 message->set_message_throttler(policy.throttler_messages);
2227
2228 // store reservation size in message, so we don't get confused
2229 // by messages entering the dispatch queue through other paths.
2230 message->set_dispatch_throttle_size(message_size);
2231
2232 message->set_recv_stamp(recv_stamp);
2233 message->set_throttle_stamp(throttle_stamp);
2234 message->set_recv_complete_stamp(ceph_clock_now());
2235
2236 *pm = message;
2237 return 0;
2238
2239 out_dethrottle:
2240 // release bytes reserved from the throttlers on failure
2241 if (policy.throttler_messages) {
2242 ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
2243 << policy.throttler_messages->get_current() << "/"
2244 << policy.throttler_messages->get_max() << dendl;
2245 policy.throttler_messages->put();
2246 }
2247 if (message_size) {
2248 if (policy.throttler_bytes) {
2249 ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
2250 << policy.throttler_bytes->get_current() << "/"
2251 << policy.throttler_bytes->get_max() << dendl;
2252 policy.throttler_bytes->put(message_size);
2253 }
2254
2255 in_q->dispatch_throttle_release(message_size);
2256 }
2257 return ret;
2258 }
2259
2260 /*
2261 SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
2262 http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
2263 http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
2264 */
2265 void Pipe::suppress_sigpipe()
2266 {
2267 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
2268 /*
2269 We want to ignore possible SIGPIPE that we can generate on write.
2270 SIGPIPE is delivered *synchronously* and *only* to the thread
2271 doing the write. So if it is reported as already pending (which
2272 means the thread blocks it), then we do nothing: if we generate
2273 SIGPIPE, it will be merged with the pending one (there's no
2274 queuing), and that suits us well. If it is not pending, we block
2275 it in this thread (and we avoid changing signal action, because it
2276 is per-process).
2277 */
2278 sigset_t pending;
2279 sigemptyset(&pending);
2280 sigpending(&pending);
2281 sigpipe_pending = sigismember(&pending, SIGPIPE);
2282 if (!sigpipe_pending) {
2283 sigset_t blocked;
2284 sigemptyset(&blocked);
2285 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
2286
2287 /* Maybe it was blocked already? */
2288 sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
2289 }
2290 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
2291 }
2292
2293
2294 void Pipe::restore_sigpipe()
2295 {
2296 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
2297 /*
2298 If SIGPIPE was pending already we do nothing. Otherwise, if it
2299 becomes pending (i.e., we generated it), then we sigwait() it (thus
2300 clearing its pending status). Then we unblock SIGPIPE, but only if it
2301 was us who blocked it.
2302 */
2303 if (!sigpipe_pending) {
2304 sigset_t pending;
2305 sigemptyset(&pending);
2306 sigpending(&pending);
2307 if (sigismember(&pending, SIGPIPE)) {
2308 /*
2309 Protect ourselves from a situation when SIGPIPE was sent
2310 by the user to the whole process, and was delivered to
2311 other thread before we had a chance to wait for it.
2312 */
2313 static const struct timespec nowait = { 0, 0 };
2314 TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
2315 }
2316
2317 if (sigpipe_unblock)
2318 pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
2319 }
2320 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
2321 }
2322
2323
2324 int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
2325 {
2326 suppress_sigpipe();
2327 while (len > 0) {
2328 int r;
2329 #if defined(MSG_NOSIGNAL)
2330 r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
2331 #else
2332 r = ::sendmsg(sd, msg, (more ? MSG_MORE : 0));
2333 #endif
2334 if (r == 0)
2335 ldout(msgr->cct,10) << "do_sendmsg hmm, got r==0!" << dendl;
2336 if (r < 0) {
2337 r = -errno;
2338 ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
2339 restore_sigpipe();
2340 return r;
2341 }
2342 if (state == STATE_CLOSED) {
2343 ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
2344 restore_sigpipe();
2345 return -EINTR; // close enough
2346 }
2347
2348 len -= r;
2349 if (len == 0) break;
2350
2351 // hrmph. trim r bytes off the front of our message.
2352 ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
2353 while (r > 0) {
2354 if (msg->msg_iov[0].iov_len <= (size_t)r) {
2355 // lose this whole item
2356 //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
2357 r -= msg->msg_iov[0].iov_len;
2358 msg->msg_iov++;
2359 msg->msg_iovlen--;
2360 } else {
2361 // partial!
2362 //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
2363 msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
2364 msg->msg_iov[0].iov_len -= r;
2365 break;
2366 }
2367 }
2368 }
2369 restore_sigpipe();
2370 return 0;
2371 }
2372
2373
2374 int Pipe::write_ack(uint64_t seq)
2375 {
2376 ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2377
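// Wire format: a single CEPH_MSGR_TAG_ACK byte followed by the acknowledged
// sequence number as a 64-bit little-endian value.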
2378 char c = CEPH_MSGR_TAG_ACK;
2379 ceph_le64 s;
2380 s = seq;
2381
2382 struct msghdr msg;
2383 memset(&msg, 0, sizeof(msg));
2384 struct iovec msgvec[2];
2385 msgvec[0].iov_base = &c;
2386 msgvec[0].iov_len = 1;
2387 msgvec[1].iov_base = &s;
2388 msgvec[1].iov_len = sizeof(s);
2389 msg.msg_iov = msgvec;
2390 msg.msg_iovlen = 2;
2391
2392 if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2393 return -1;
2394 return 0;
2395 }
2396
2397 int Pipe::write_keepalive()
2398 {
2399 ldout(msgr->cct,10) << "write_keepalive" << dendl;
2400
2401 char c = CEPH_MSGR_TAG_KEEPALIVE;
2402
2403 struct msghdr msg;
2404 memset(&msg, 0, sizeof(msg));
2405 struct iovec msgvec[2];
2406 msgvec[0].iov_base = &c;
2407 msgvec[0].iov_len = 1;
2408 msg.msg_iov = msgvec;
2409 msg.msg_iovlen = 1;
2410
2411 if (do_sendmsg(&msg, 1) < 0)
2412 return -1;
2413 return 0;
2414 }
2415
2416 int Pipe::write_keepalive2(char tag, const utime_t& t)
2417 {
2418 ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2419 struct ceph_timespec ts;
2420 t.encode_timeval(&ts);
2421 struct msghdr msg;
2422 memset(&msg, 0, sizeof(msg));
2423 struct iovec msgvec[2];
2424 msgvec[0].iov_base = &tag;
2425 msgvec[0].iov_len = 1;
2426 msgvec[1].iov_base = &ts;
2427 msgvec[1].iov_len = sizeof(ts);
2428 msg.msg_iov = msgvec;
2429 msg.msg_iovlen = 2;
2430
2431 if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2432 return -1;
2433 return 0;
2434 }
2435
2436
2437 int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
2438 {
2439 int ret;
2440
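// On-wire layout assembled below: [tag CEPH_MSGR_TAG_MSG][header][payload =
// front + middle + data][footer]. Peers lacking NOSRCADDR or MSG_AUTH get the
// old header/footer encodings instead.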
2441 // set up msghdr and iovecs
2442 struct msghdr msg;
2443 memset(&msg, 0, sizeof(msg));
2444 msg.msg_iov = msgvec;
2445 int msglen = 0;
2446
2447 // send tag
2448 char tag = CEPH_MSGR_TAG_MSG;
2449 msgvec[msg.msg_iovlen].iov_base = &tag;
2450 msgvec[msg.msg_iovlen].iov_len = 1;
2451 msglen++;
2452 msg.msg_iovlen++;
2453
2454 // send envelope
2455 ceph_msg_header_old oldheader;
2456 if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
2457 msgvec[msg.msg_iovlen].iov_base = (char*)&header;
2458 msgvec[msg.msg_iovlen].iov_len = sizeof(header);
2459 msglen += sizeof(header);
2460 msg.msg_iovlen++;
2461 } else {
2462 memcpy(&oldheader, &header, sizeof(header));
2463 oldheader.src.name = header.src;
2464 oldheader.src.addr = connection_state->get_peer_addr();
2465 oldheader.orig_src = oldheader.src;
2466 oldheader.reserved = header.reserved;
2467 if (msgr->crcflags & MSG_CRC_HEADER) {
2468 oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
2469 sizeof(oldheader) - sizeof(oldheader.crc));
2470 } else {
2471 oldheader.crc = 0;
2472 }
2473 msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
2474 msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
2475 msglen += sizeof(oldheader);
2476 msg.msg_iovlen++;
2477 }
2478
2479 // payload (front+data)
2480 list<bufferptr>::const_iterator pb = blist.buffers().begin();
2481 unsigned b_off = 0; // carry-over buffer offset, if any
2482 unsigned bl_pos = 0; // blist pos
2483 unsigned left = blist.length();
2484
2485 while (left > 0) {
2486 unsigned donow = MIN(left, pb->length()-b_off);
2487 if (donow == 0) {
2488 ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
2489 << " b_off " << b_off << dendl;
2490 }
2491 assert(donow > 0);
2492 ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
2493 << " leftinchunk " << left
2494 << " buffer len " << pb->length()
2495 << " writing " << donow
2496 << dendl;
2497
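// Running low on iovec slots: flush what has been accumulated so far, then
// restart with a fresh iovec array for the remaining chunks and the footer.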
2498 if (msg.msg_iovlen >= SM_IOV_MAX-2) {
2499 if (do_sendmsg(&msg, msglen, true))
2500 goto fail;
2501
2502 // and restart the iov
2503 msg.msg_iov = msgvec;
2504 msg.msg_iovlen = 0;
2505 msglen = 0;
2506 }
2507
2508 msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
2509 msgvec[msg.msg_iovlen].iov_len = donow;
2510 msglen += donow;
2511 msg.msg_iovlen++;
2512
2513 assert(left >= donow);
2514 left -= donow;
2515 b_off += donow;
2516 bl_pos += donow;
2517 if (left == 0)
2518 break;
2519 while (b_off == pb->length()) {
2520 ++pb;
2521 b_off = 0;
2522 }
2523 }
2524 assert(left == 0);
2525
2526 // send footer; if receiver doesn't support signatures, use the old footer format
2527
2528 ceph_msg_footer_old old_footer;
2529 if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
2530 msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
2531 msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
2532 msglen += sizeof(footer);
2533 msg.msg_iovlen++;
2534 } else {
2535 if (msgr->crcflags & MSG_CRC_HEADER) {
2536 old_footer.front_crc = footer.front_crc;
2537 old_footer.middle_crc = footer.middle_crc;
2538 } else {
2539 old_footer.front_crc = old_footer.middle_crc = 0;
2540 }
2541 old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
2542 old_footer.flags = footer.flags;
2543 msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
2544 msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
2545 msglen += sizeof(old_footer);
2546 msg.msg_iovlen++;
2547 }
2548
2549 // send
2550 if (do_sendmsg(&msg, msglen))
2551 goto fail;
2552
2553 ret = 0;
2554
2555 out:
2556 return ret;
2557
2558 fail:
2559 ret = -1;
2560 goto out;
2561 }
2562
2563
2564 int Pipe::tcp_read(char *buf, unsigned len)
2565 {
2566 if (sd < 0)
2567 return -EINVAL;
2568
2569 while (len > 0) {
2570
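// Test hook: ms_inject_socket_failures randomly shuts the socket down to
// exercise the error handling and reconnect paths.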
2571 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2572 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2573 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2574 ::shutdown(sd, SHUT_RDWR);
2575 }
2576 }
2577
2578 if (tcp_read_wait() < 0)
2579 return -1;
2580
2581 ssize_t got = tcp_read_nonblocking(buf, len);
2582
2583 if (got < 0)
2584 return -1;
2585
2586 len -= got;
2587 buf += got;
2588 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2589 }
2590 return 0;
2591 }
2592
2593 int Pipe::tcp_read_wait()
2594 {
2595 if (sd < 0)
2596 return -EINVAL;
2597 struct pollfd pfd;
2598 short evmask;
2599 pfd.fd = sd;
2600 pfd.events = POLLIN;
2601 #if defined(__linux__)
2602 pfd.events |= POLLRDHUP;
2603 #endif
2604
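// Skip the poll if the prefetch buffer already holds unread data.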
2605 if (has_pending_data())
2606 return 0;
2607
2608 int r = poll(&pfd, 1, msgr->timeout);
2609 if (r < 0)
2610 return -errno;
2611 if (r == 0)
2612 return -EAGAIN;
2613
2614 evmask = POLLERR | POLLHUP | POLLNVAL;
2615 #if defined(__linux__)
2616 evmask |= POLLRDHUP;
2617 #endif
2618 if (pfd.revents & evmask)
2619 return -1;
2620
2621 if (!(pfd.revents & POLLIN))
2622 return -1;
2623
2624 return 0;
2625 }
2626
2627 ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2628 {
2629 again:
2630 ssize_t got = ::recv( sd, buf, len, flags );
2631 if (got < 0) {
2632 if (errno == EINTR) {
2633 goto again;
2634 }
2635 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2636 << got << " " << cpp_strerror(errno) << dendl;
2637 return -1;
2638 }
2639 if (got == 0) {
2640 return -1;
2641 }
2642 return got;
2643 }
2644
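/*
 * Serve reads from the prefetch buffer (recv_buf) when possible. Reads larger
 * than recv_max_prefetch bypass the buffer entirely; smaller reads trigger a
 * prefetch of up to recv_max_prefetch bytes and are satisfied from it.
 */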
2645 ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
2646 {
2647 size_t left = len;
2648 ssize_t total_recv = 0;
2649 if (recv_len > recv_ofs) {
2650 int to_read = MIN(recv_len - recv_ofs, left);
2651 memcpy(buf, &recv_buf[recv_ofs], to_read);
2652 recv_ofs += to_read;
2653 left -= to_read;
2654 if (left == 0) {
2655 return to_read;
2656 }
2657 buf += to_read;
2658 total_recv += to_read;
2659 }
2660
2661 /* nothing left in the prefetch buffer */
2662
2663 if (left > recv_max_prefetch) {
2664 /* this was a large read, we don't prefetch for these */
2665 ssize_t ret = do_recv(buf, left, flags );
2666 if (ret < 0) {
2667 if (total_recv > 0)
2668 return total_recv;
2669 return ret;
2670 }
2671 total_recv += ret;
2672 return total_recv;
2673 }
2674
2675
2676 ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
2677 if (got < 0) {
2678 if (total_recv > 0)
2679 return total_recv;
2680
2681 return got;
2682 }
2683
2684 recv_len = (size_t)got;
2685 got = MIN(left, (size_t)got);
2686 memcpy(buf, recv_buf, got);
2687 recv_ofs = got;
2688 total_recv += got;
2689 return total_recv;
2690 }
2691
2692 ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2693 {
2694 ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2695 if (got < 0) {
2696 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2697 << got << " " << cpp_strerror(errno) << dendl;
2698 return -1;
2699 }
2700 if (got == 0) {
2701 /* poll() said there was data, but we didn't read any - peer
2702 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2703 * standard socket behavior as documented by Stevens.
2704 */
2705 return -1;
2706 }
2707 return got;
2708 }
2709
2710 int Pipe::tcp_write(const char *buf, unsigned len)
2711 {
2712 if (sd < 0)
2713 return -1;
2714 struct pollfd pfd;
2715 pfd.fd = sd;
2716 pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2717 #if defined(__linux__)
2718 pfd.events |= POLLRDHUP;
2719 #endif
2720
2721 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2722 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2723 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2724 ::shutdown(sd, SHUT_RDWR);
2725 }
2726 }
2727
2728 if (poll(&pfd, 1, -1) < 0)
2729 return -1;
2730
2731 if (!(pfd.revents & POLLOUT))
2732 return -1;
2733
2734 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2735 assert(len > 0);
2736 suppress_sigpipe();
2737
2738 while (len > 0) {
2739 int did;
2740 #if defined(MSG_NOSIGNAL)
2741 did = ::send( sd, buf, len, MSG_NOSIGNAL );
2742 #else
2743 did = ::send( sd, buf, len, 0);
2744 #endif
2745 if (did < 0) {
2746 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2747 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2748 return did;
2749 }
2750 len -= did;
2751 buf += did;
2752 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2753 }
2754 restore_sigpipe();
2755
2756 return 0;
2757 }