]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/simple/Pipe.cc
update sources to 12.2.7
[ceph.git] / ceph / src / msg / simple / Pipe.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <sys/types.h>
16#include <sys/socket.h>
17#include <netinet/in.h>
18#include <netinet/ip.h>
19#include <netinet/tcp.h>
20#include <sys/uio.h>
21#include <limits.h>
22#include <poll.h>
23
24#include "msg/Message.h"
25#include "Pipe.h"
26#include "SimpleMessenger.h"
27
28#include "common/debug.h"
29#include "common/errno.h"
30#include "common/valgrind.h"
31
32// Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
33
34#include "auth/Crypto.h"
35#include "auth/cephx/CephxProtocol.h"
36#include "auth/AuthSessionHandler.h"
37
38#include "include/sock_compat.h"
39
40// Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
41#define SEQ_MASK 0x7fffffff
42#define dout_subsys ceph_subsys_ms
43
44#undef dout_prefix
45#define dout_prefix *_dout << *this
46ostream& Pipe::_pipe_prefix(std::ostream &out) const {
47 return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
48 << " sd=" << sd << " :" << port
49 << " s=" << state
50 << " pgs=" << peer_global_seq
51 << " cs=" << connect_seq
52 << " l=" << policy.lossy
53 << " c=" << connection_state
54 << ").";
55}
56
// Stream insertion for Pipe: delegates to _pipe_prefix() so log lines
// carry the full connection-state prefix.
57ostream& operator<<(ostream &out, const Pipe &pipe) {
 58 return pipe._pipe_prefix(out);
 59}
60
61/**
62 * The DelayedDelivery is for injecting delays into Message delivery off
63 * the socket. It is only enabled if delays are requested, and if they
64 * are then it pulls Messages off the DelayQueue and puts them into the
65 * in_q (SimpleMessenger::dispatch_queue).
66 * Please note that this probably has issues with Pipe shutdown and
67 * replacement semantics. I've tried, but no guarantees.
68 */
69class Pipe::DelayedDelivery: public Thread {
 70 Pipe *pipe;                                  // owning Pipe; may be swapped via steal_for_pipe()
 71 std::deque< pair<utime_t,Message*> > delay_queue;  // (release time, message) FIFO
 72 Mutex delay_lock;                            // guards all fields below and delay_queue
 73 Cond delay_cond;                             // signaled on queue/flush/stop changes
 74 int flush_count;                             // messages remaining to deliver immediately during a flush
 75 bool active_flush;                           // a flushed message is currently being delivered
 76 bool stop_delayed_delivery;                  // tells entry() to exit its loop
 77 bool delay_dispatching; // we are in fast dispatch now
 78 bool stop_fast_dispatching_flag; // we need to stop fast dispatching
 79
 80public:
 81 explicit DelayedDelivery(Pipe *p)
 82 : pipe(p),
 83 delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
 84 active_flush(false),
 85 stop_delayed_delivery(false),
 86 delay_dispatching(false),
 87 stop_fast_dispatching_flag(false) { }
 88 ~DelayedDelivery() override {
 89 discard();
 90 }
 91 void *entry() override;
   // Queue m for delivery no earlier than `release`; wakes the worker.
 92 void queue(utime_t release, Message *m) {
 93 Mutex::Locker l(delay_lock);
 94 delay_queue.push_back(make_pair(release, m));
 95 delay_cond.Signal();
 96 }
 97 void discard();
 98 void flush();
   // True while a flush is still draining queued messages.
 99 bool is_flushing() {
 100 Mutex::Locker l(delay_lock);
 101 return flush_count > 0 || active_flush;
 102 }
   // Block the caller until any in-progress flush has fully drained.
 103 void wait_for_flush() {
 104 Mutex::Locker l(delay_lock);
 105 while (flush_count > 0 || active_flush)
 106 delay_cond.Wait(delay_lock);
 107 }
   // Ask entry() to exit; does not join the thread.
 108 void stop() {
 109 delay_lock.Lock();
 110 stop_delayed_delivery = true;
 111 delay_cond.Signal();
 112 delay_lock.Unlock();
 113 }
   // Re-point this delivery thread at a replacement Pipe (connection replace).
 114 void steal_for_pipe(Pipe *new_owner) {
 115 Mutex::Locker l(delay_lock);
 116 pipe = new_owner;
 117 }
 118 /**
 119 * We need to stop fast dispatching before we need to stop putting
 120 * normal messages into the DispatchQueue.
 121 */
 122 void stop_fast_dispatching();
 123};
124
125/**************************************
126 * Pipe
127 */
128
// Construct a Pipe in state `st`.  If `con` is supplied the Pipe attaches
// to that existing PipeConnection; otherwise it creates a fresh one that
// holds a ref back to this Pipe.
129Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
 130 : RefCountedObject(r->cct),
 131 reader_thread(this),
 132 writer_thread(this),
 133 delay_thread(NULL),
 134 msgr(r),
 135 conn_id(r->dispatch_queue.get_id()),
 136 recv_ofs(0),
 137 recv_len(0),
 138 sd(-1), port(0),
 139 peer_type(-1),
 140 pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
 141 state(st),
 142 connection_state(NULL),
 143 reader_running(false), reader_needs_join(false),
 144 reader_dispatching(false), notify_on_dispatch_done(false),
 145 writer_running(false),
 146 in_q(&(r->dispatch_queue)),
 147 send_keepalive(false),
 148 send_keepalive_ack(false),
 149 connect_seq(0), peer_global_seq(0),
 150 out_seq(0), in_seq(0), in_seq_acked(0) {
   // These fields are read racily for debug output; tell valgrind/tsan
   // the races are benign.
 151 ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
 152 ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
 153 ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
 154 ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
 155 if (con) {
 156 connection_state = con;
 157 connection_state->reset_pipe(this);
 158 } else {
 159 connection_state = new PipeConnection(msgr->cct, msgr);
 160 connection_state->pipe = get();
 161 }
 162
   // Start out_seq at a random value (bounded by SEQ_MASK elsewhere); a
   // nonzero return means we fell back to a non-random seed.
 163 if (randomize_out_seq()) {
 164 lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
 165 }
 166
 167
 168 msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
 169 if (msgr->timeout == 0)
 170 msgr->timeout = -1;
 171
   // Prefetch buffer used by the buffered recv path.
 172 recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
 173 recv_buf = new char[recv_max_prefetch];
 174}
175
// Destructor: queues must already be drained by shutdown paths; we only
// reclaim the delay thread object and the prefetch buffer here.
176Pipe::~Pipe()
177{
 178 assert(out_q.empty());
 179 assert(sent.empty());
 180 delete delay_thread;
 181 delete[] recv_buf;
 182}
183
184void Pipe::handle_ack(uint64_t seq)
185{
186 lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
187 // trim sent list
188 while (!sent.empty() &&
189 sent.front()->get_seq() <= seq) {
190 Message *m = sent.front();
191 sent.pop_front();
192 lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
193 << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
194 m->put();
195 }
196}
197
// Spawn the reader thread.  Caller must hold pipe_lock; a previously
// stopped reader is joined first so the Thread object can be reused.
198void Pipe::start_reader()
199{
 200 assert(pipe_lock.is_locked());
 201 assert(!reader_running);
 202 if (reader_needs_join) {
 203 reader_thread.join();
 204 reader_needs_join = false;
 205 }
 206 reader_running = true;
 207 reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
 208}
209
// Start a DelayedDelivery thread iff debug config ms_inject_delay_type
// names this peer's entity type and no delay thread exists yet.
210void Pipe::maybe_start_delay_thread()
211{
 212 if (!delay_thread) {
 213 auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
 214 if (pos != string::npos) {
 215 lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
 216 delay_thread = new DelayedDelivery(this);
 217 delay_thread->create("ms_pipe_delay");
 218 }
 219 }
 220}
221
// Spawn the writer thread.  Caller must hold pipe_lock.
222void Pipe::start_writer()
223{
 224 assert(pipe_lock.is_locked());
 225 assert(!writer_running);
 226 writer_running = true;
 227 writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
 228}
229
// Join the reader thread if it is running.  Called with pipe_lock held;
// the lock is dropped for the join (the reader may need it to exit) and
// retaken before returning.
230void Pipe::join_reader()
231{
 232 if (!reader_running)
 233 return;
 234 cond.Signal();
 235 pipe_lock.Unlock();
 236 reader_thread.join();
 237 pipe_lock.Lock();
 238 reader_needs_join = false;
 239}
240
241void Pipe::DelayedDelivery::discard()
242{
243 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
244 Mutex::Locker l(delay_lock);
245 while (!delay_queue.empty()) {
246 Message *m = delay_queue.front().second;
247 pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
248 m->put();
249 delay_queue.pop_front();
250 }
251}
252
// Ask the worker to deliver everything currently queued without further
// delay: snapshot the queue length into flush_count and wake entry().
253void Pipe::DelayedDelivery::flush()
254{
 255 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
 256 Mutex::Locker l(delay_lock);
 257 flush_count = delay_queue.size();
 258 delay_cond.Signal();
 259}
260
// Worker loop: sleep until the head message's release time (or a
// flush/stop wakeup), then hand each message to fast dispatch or the
// normal DispatchQueue.  delay_lock is held throughout except around the
// fast_dispatch() call itself.
261void *Pipe::DelayedDelivery::entry()
262{
 263 Mutex::Locker locker(delay_lock);
 264 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;
 265
 266 while (!stop_delayed_delivery) {
 267 if (delay_queue.empty()) {
 268 lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
 269 delay_cond.Wait(delay_lock);
 270 continue;
 271 }
 272 utime_t release = delay_queue.front().first;
 273 Message *m = delay_queue.front().second;
 274 string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
   // Sleep only if not flushing, the release time is in the future, and
   // the message type matches the configured delay filter (if any).
 275 if (!flush_count &&
 276 (release > ceph_clock_now() &&
 277 (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
 278 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
 279 delay_cond.WaitUntil(delay_lock, release);
 280 continue;
 281 }
 282 lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
 283 delay_queue.pop_front();
 284 if (flush_count > 0) {
 285 --flush_count;
 286 active_flush = true;
 287 }
 288 if (pipe->in_q->can_fast_dispatch(m)) {
   // NOTE(review): if stop_fast_dispatching_flag is already set here, m is
   // neither dispatched nor put() — looks like a possible leak; confirm.
 289 if (!stop_fast_dispatching_flag) {
 290 delay_dispatching = true;
 291 delay_lock.Unlock();
 292 pipe->in_q->fast_dispatch(m);
 293 delay_lock.Lock();
 294 delay_dispatching = false;
 295 if (stop_fast_dispatching_flag) {
 296 // we need to let the stopping thread proceed
 297 delay_cond.Signal();
 298 delay_lock.Unlock();
 299 delay_lock.Lock();
 300 }
 301 }
 302 } else {
 303 pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
 304 }
 305 active_flush = false;
 306 }
 307 lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
 308 return NULL;
 309}
310
// Forbid further fast dispatch and wait out any fast dispatch currently
// in flight on the worker thread.
311void Pipe::DelayedDelivery::stop_fast_dispatching() {
 312 Mutex::Locker l(delay_lock);
 313 stop_fast_dispatching_flag = true;
 314 while (delay_dispatching)
 315 delay_cond.Wait(delay_lock);
 316}
317
318
// Server side of the SimpleMessenger handshake.  Exchanges banners and
// addresses, reads the peer's ceph_msg_connect, verifies its authorizer
// (with a cephx v2 challenge round when both sides support it), resolves
// races against any existing Pipe to the same peer, and either sends an
// error/retry tag, replaces the existing Pipe, or opens a new session.
// Returns 0 on success, -1 on any failure path.  Entered with pipe_lock
// held; the lock (and msgr->lock) are dropped and retaken repeatedly,
// which is why `state` is re-checked after every re-acquisition.
319int Pipe::accept()
320{
 321 ldout(msgr->cct,10) << "accept" << dendl;
 322 assert(pipe_lock.is_locked());
 323 assert(state == STATE_ACCEPTING);
 324
 325 pipe_lock.Unlock();
 326
 327 // vars
 328 bufferlist addrs;
 329 entity_addr_t socket_addr;
 330 socklen_t len;
 331 int r;
 332 char banner[strlen(CEPH_BANNER)+1];
 333 bufferlist addrbl;
 334 ceph_msg_connect connect;
 335 ceph_msg_connect_reply reply;
 336 Pipe *existing = 0;
 337 bufferptr bp;
 338 bufferlist authorizer, authorizer_reply;
 339 bool authorizer_valid;
 340 uint64_t feat_missing;
 341 bool replaced = false;
 342 // this variable denotes if the connection attempt from peer is a hard
 343 // reset or not, it is true if there is an existing connection and the
 344 // connection sequence from peer is equal to zero
 345 bool is_reset_from_peer = false;
 346 CryptoKey session_key;
 347 int removed; // single-use down below
 348
 349 // this should roughly mirror pseudocode at
 350 // http://ceph.com/wiki/Messaging_protocol
 351 int reply_tag = 0;
 352 uint64_t existing_seq = -1;
 353
 354 // used for reading in the remote acked seq on connect
 355 uint64_t newly_acked_seq = 0;
 356
28e407b8
AA
 357 bool need_challenge = false;
 358 bool had_challenge = false;
 359 std::unique_ptr<AuthAuthorizerChallenge> authorizer_challenge;
 360
7c673cae
FG
 361 recv_reset();
 362
 363 set_socket_options();
 364
 365 // announce myself.
 366 r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
 367 if (r < 0) {
 368 ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
 369 goto fail_unlocked;
 370 }
 371
 372 // and my addr
 373 ::encode(msgr->my_inst.addr, addrs, 0); // legacy
 374
 375 port = msgr->my_inst.addr.get_port();
 376
 377 // and peer's socket addr (they might not know their ip)
 378 sockaddr_storage ss;
 379 len = sizeof(ss);
 380 r = ::getpeername(sd, (sockaddr*)&ss, &len);
 381 if (r < 0) {
 382 ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
 383 goto fail_unlocked;
 384 }
 385 socket_addr.set_sockaddr((sockaddr*)&ss);
 386 ::encode(socket_addr, addrs, 0); // legacy
 387
 388 r = tcp_write(addrs.c_str(), addrs.length());
 389 if (r < 0) {
 390 ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
 391 goto fail_unlocked;
 392 }
 393
 394 ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
 395
 396 // identify peer
 397 if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
 398 ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
 399 goto fail_unlocked;
 400 }
 401 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
 402 banner[strlen(CEPH_BANNER)] = 0;
 403 ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
 404 goto fail_unlocked;
 405 }
 406 {
 407 bufferptr tp(sizeof(ceph_entity_addr));
 408 addrbl.push_back(std::move(tp));
 409 }
 410 if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
 411 ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
 412 goto fail_unlocked;
 413 }
 414 {
 415 bufferlist::iterator ti = addrbl.begin();
 416 ::decode(peer_addr, ti);
 417 }
 418
 419 ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
 420 if (peer_addr.is_blank_ip()) {
 421 // peer apparently doesn't know what ip they have; figure it out for them.
 422 int port = peer_addr.get_port();
 423 peer_addr.u = socket_addr.u;
 424 peer_addr.set_port(port);
 425 ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
 426 << " (socket is " << socket_addr << ")" << dendl;
 427 }
 428 set_peer_addr(peer_addr); // so that connection_state gets set up
 429
 // Negotiation loop: each iteration reads one ceph_msg_connect from the
 // peer and either jumps to replace/open/shutting_down or falls through
 // to the `reply:` label at the bottom to send a retry/error tag and
 // read the peer's next attempt.
 430 while (1) {
 431 if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
 432 ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
 433 goto fail_unlocked;
 434 }
 435
 436 authorizer.clear();
 437 if (connect.authorizer_len) {
 438 bp = buffer::create(connect.authorizer_len);
 439 if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
 440 ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
 441 goto fail_unlocked;
 442 }
 443 authorizer.push_back(std::move(bp));
 444 authorizer_reply.clear();
 445 }
 446
 447 ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
 448 << " global_seq " << connect.global_seq
 449 << dendl;
 450
 451 msgr->lock.Lock(); // FIXME
 452 pipe_lock.Lock();
 453 if (msgr->dispatch_queue.stop)
 454 goto shutting_down;
 455 if (state != STATE_ACCEPTING) {
 456 goto shutting_down;
 457 }
 458
 459 // note peer's type, flags
 460 set_peer_type(connect.host_type);
 461 policy = msgr->get_policy(connect.host_type);
 462 ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
 463 << ", policy.lossy=" << policy.lossy
 464 << " policy.server=" << policy.server
 465 << " policy.standby=" << policy.standby
 466 << " policy.resetcheck=" << policy.resetcheck
 467 << dendl;
 468
 469 memset(&reply, 0, sizeof(reply));
 470 reply.protocol_version = msgr->get_proto_version(peer_type, false);
 471 msgr->lock.Unlock();
 472
 473 // mismatch?
 474 ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
 475 << ", their proto " << connect.protocol_version << dendl;
 476 if (connect.protocol_version != reply.protocol_version) {
 477 reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
 478 goto reply;
 479 }
 480
 481 // require signatures for cephx?
 482 if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
 483 if (peer_type == CEPH_ENTITY_TYPE_OSD ||
28e407b8
AA
 484 peer_type == CEPH_ENTITY_TYPE_MDS ||
 485 peer_type == CEPH_ENTITY_TYPE_MGR) {
7c673cae
FG
 486 if (msgr->cct->_conf->cephx_require_signatures ||
 487 msgr->cct->_conf->cephx_cluster_require_signatures) {
 488 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
 489 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
 490 }
28e407b8
AA
 491 if (msgr->cct->_conf->cephx_require_version >= 2 ||
 492 msgr->cct->_conf->cephx_cluster_require_version >= 2) {
 493 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
 494 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
 495 }
7c673cae
FG
 496 } else {
 497 if (msgr->cct->_conf->cephx_require_signatures ||
 498 msgr->cct->_conf->cephx_service_require_signatures) {
 499 ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
 500 policy.features_required |= CEPH_FEATURE_MSG_AUTH;
 501 }
28e407b8
AA
 502 if (msgr->cct->_conf->cephx_require_version >= 2 ||
 503 msgr->cct->_conf->cephx_service_require_version >= 2) {
 504 ldout(msgr->cct,10) << "using cephx, requiring cephx v2 feature bit for cluster" << dendl;
 505 policy.features_required |= CEPH_FEATUREMASK_CEPHX_V2;
 506 }
7c673cae
FG
 507 }
 508 }
 509
 510 feat_missing = policy.features_required & ~(uint64_t)connect.features;
 511 if (feat_missing) {
 512 ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
 513 reply.tag = CEPH_MSGR_TAG_FEATURES;
 514 goto reply;
 515 }
 516
 517 // Check the authorizer. If not good, bail out.
 518
 519 pipe_lock.Unlock();
 520
28e407b8
AA
 521 need_challenge = HAVE_FEATURE(connect.features, CEPHX_V2);
 522 had_challenge = (bool)authorizer_challenge;
 523 authorizer_reply.clear();
 524 if (!msgr->verify_authorizer(
 525 connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
 526 authorizer_reply, authorizer_valid, session_key,
 527 need_challenge ? &authorizer_challenge : nullptr) ||
7c673cae 528 !authorizer_valid) {
7c673cae
FG
 529 pipe_lock.Lock();
 530 if (state != STATE_ACCEPTING)
 531 goto shutting_down_msgr_unlocked;
28e407b8
AA
   // First failed attempt with cephx v2: send a challenge the peer must
   // sign; only a second failure is treated as a bad authorizer.
 532 if (!had_challenge && need_challenge && authorizer_challenge) {
 533 ldout(msgr->cct,0) << "accept: challenging authorizer "
 534 << authorizer_reply.length()
 535 << " bytes" << dendl;
 536 assert(authorizer_reply.length());
 537 reply.tag = CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER;
 538 } else {
 539 ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
 540 reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
 541 }
7c673cae
FG
 542 session_security.reset();
 543 goto reply;
 544 }
 545
 546 // We've verified the authorizer for this pipe, so set up the session security structure. PLR
 547
 548 ldout(msgr->cct,10) << "accept: setting up session_security." << dendl;
 549
 550 retry_existing_lookup:
 551 msgr->lock.Lock();
 552 pipe_lock.Lock();
 553 if (msgr->dispatch_queue.stop)
 554 goto shutting_down;
 555 if (state != STATE_ACCEPTING)
 556 goto shutting_down;
 557
 558 // existing?
 559 existing = msgr->_lookup_pipe(peer_addr);
 560 if (existing) {
 561 existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here)
 562 if (existing->reader_dispatching) {
 563 /** we need to wait, or we can deadlock if downstream
 564 * fast_dispatchers are (naughtily!) waiting on resources
 565 * held by somebody trying to make use of the SimpleMessenger lock.
 566 * So drop locks, wait, and retry. It just looks like a slow network
 567 * to everybody else.
 568 *
 569 * We take a ref to existing here since it might get reaped before we
 570 * wake up (see bug #15870). We can be confident that it lived until
 571 * we locked it since we held the msgr lock from _lookup_pipe through to
 572 * locking existing->lock and checking reader_dispatching.
 573 */
 574 existing->get();
 575 pipe_lock.Unlock();
 576 msgr->lock.Unlock();
 577 existing->notify_on_dispatch_done = true;
 578 while (existing->reader_dispatching)
 579 existing->cond.Wait(existing->pipe_lock);
 580 existing->pipe_lock.Unlock();
 581 existing->put();
 582 existing = nullptr;
 583 goto retry_existing_lookup;
 584 }
 585
 586 if (connect.global_seq < existing->peer_global_seq) {
 587 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
 588 << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
 589 reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
 590 reply.global_seq = existing->peer_global_seq; // so we can send it below..
 591 existing->pipe_lock.Unlock();
 592 msgr->lock.Unlock();
 593 goto reply;
 594 } else {
 595 ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
 596 << " <= " << connect.global_seq << ", looks ok" << dendl;
 597 }
 598
 599 if (existing->policy.lossy) {
 600 ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
 601 << policy.lossy << ")" << dendl;
 602 existing->was_session_reset();
 603 goto replace;
 604 }
 605
 606 ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
 607 << " vs existing " << existing->connect_seq
 608 << " state " << existing->get_state_name() << dendl;
 609
 610 if (connect.connect_seq == 0 && existing->connect_seq > 0) {
 611 ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
 612 // this is a hard reset from peer
 613 is_reset_from_peer = true;
 614 if (policy.resetcheck)
 615 existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
 616 goto replace;
 617 }
 618
 619 if (connect.connect_seq < existing->connect_seq) {
 620 // old attempt, or we sent READY but they didn't get it.
 621 ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
 622 << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
 623 goto retry_session;
 624 }
 625
 626 if (connect.connect_seq == existing->connect_seq) {
 627 // if the existing connection successfully opened, and/or
 628 // subsequently went to standby, then the peer should bump
 629 // their connect_seq and retry: this is not a connection race
 630 // we need to resolve here.
 631 if (existing->state == STATE_OPEN ||
 632 existing->state == STATE_STANDBY) {
 633 ldout(msgr->cct,10) << "accept connection race, existing " << existing
 634 << ".cseq " << existing->connect_seq
 635 << " == " << connect.connect_seq
 636 << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
 637 goto retry_session;
 638 }
 639
 640 // connection race?
 641 if (peer_addr < msgr->my_inst.addr ||
 642 existing->policy.server) {
 643 // incoming wins
 644 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
 645 << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
 646 if (!(existing->state == STATE_CONNECTING ||
 647 existing->state == STATE_WAIT))
 648 lderr(msgr->cct) << "accept race bad state, would replace, existing="
 649 << existing->get_state_name()
 650 << " " << existing << ".cseq=" << existing->connect_seq
 651 << " == " << connect.connect_seq
 652 << dendl;
 653 assert(existing->state == STATE_CONNECTING ||
 654 existing->state == STATE_WAIT);
 655 goto replace;
 656 } else {
 657 // our existing outgoing wins
 658 ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
 659 << " == " << connect.connect_seq << ", sending WAIT" << dendl;
 660 assert(peer_addr > msgr->my_inst.addr);
 661 if (!(existing->state == STATE_CONNECTING))
 662 lderr(msgr->cct) << "accept race bad state, would send wait, existing="
 663 << existing->get_state_name()
 664 << " " << existing << ".cseq=" << existing->connect_seq
 665 << " == " << connect.connect_seq
 666 << dendl;
 667 assert(existing->state == STATE_CONNECTING);
 668 // make sure our outgoing connection will follow through
 669 existing->_send_keepalive();
 670 reply.tag = CEPH_MSGR_TAG_WAIT;
 671 existing->pipe_lock.Unlock();
 672 msgr->lock.Unlock();
 673 goto reply;
 674 }
 675 }
 676
 677 assert(connect.connect_seq > existing->connect_seq);
 678 assert(connect.global_seq >= existing->peer_global_seq);
 679 if (policy.resetcheck && // RESETSESSION only used by servers; peers do not reset each other
 680 existing->connect_seq == 0) {
 681 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq
 682 << ", " << existing << ".cseq = " << existing->connect_seq
 683 << "), sending RESETSESSION" << dendl;
 684 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
 685 msgr->lock.Unlock();
 686 existing->pipe_lock.Unlock();
 687 goto reply;
 688 }
 689
 690 // reconnect
 691 ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
 692 << " > " << existing->connect_seq << dendl;
 693 goto replace;
 694 } // existing
 695 else if (connect.connect_seq > 0) {
 696 // we reset, and they are opening a new session
 697 ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
 698 msgr->lock.Unlock();
 699 reply.tag = CEPH_MSGR_TAG_RESETSESSION;
 700 goto reply;
 701 } else {
 702 // new session
 703 ldout(msgr->cct,10) << "accept new session" << dendl;
 704 existing = NULL;
 705 goto open;
 706 }
 707 ceph_abort();
 708
 709 retry_session:
 710 assert(existing->pipe_lock.is_locked());
 711 assert(pipe_lock.is_locked());
 712 reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
 713 reply.connect_seq = existing->connect_seq + 1;
 714 existing->pipe_lock.Unlock();
 715 msgr->lock.Unlock();
 716 goto reply;
 717
 // Send a non-READY tag and loop back to read the peer's next connect
 // attempt.
 718 reply:
 719 assert(pipe_lock.is_locked());
 720 reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
 721 reply.authorizer_len = authorizer_reply.length();
 722 pipe_lock.Unlock();
 723 r = tcp_write((char*)&reply, sizeof(reply));
 724 if (r < 0)
 725 goto fail_unlocked;
 726 if (reply.authorizer_len) {
 727 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
 728 if (r < 0)
 729 goto fail_unlocked;
 730 }
 731 }
 732
 // Take over from `existing`: stop it, and (unless lossy) inherit its
 // Connection, delay thread, conn_id, sequence numbers and queues.
 733 replace:
 734 assert(existing->pipe_lock.is_locked());
 735 assert(pipe_lock.is_locked());
 736 // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
 737 if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
 738 reply_tag = CEPH_MSGR_TAG_SEQ;
 739 existing_seq = existing->in_seq;
 740 }
 741 ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
 742 existing->stop();
 743 existing->unregister_pipe();
 744 replaced = true;
 745
 746 if (existing->policy.lossy) {
 747 // disconnect from the Connection
 748 assert(existing->connection_state);
 749 if (existing->connection_state->clear_pipe(existing))
 750 msgr->dispatch_queue.queue_reset(existing->connection_state.get());
 751 } else {
 752 // queue a reset on the new connection, which we're dumping for the old
 753 msgr->dispatch_queue.queue_reset(connection_state.get());
 754
 755 // drop my Connection, and take a ref to the existing one. do not
 756 // clear existing->connection_state, since read_message and
 757 // write_message both dereference it without pipe_lock.
 758 connection_state = existing->connection_state;
 759
 760 // make existing Connection reference us
 761 connection_state->reset_pipe(this);
 762
 763 if (existing->delay_thread) {
 764 existing->delay_thread->steal_for_pipe(this);
 765 delay_thread = existing->delay_thread;
 766 existing->delay_thread = NULL;
 767 delay_thread->flush();
 768 }
 769
 770 // steal incoming queue
 771 uint64_t replaced_conn_id = conn_id;
 772 conn_id = existing->conn_id;
 773 existing->conn_id = replaced_conn_id;
 774
 775 // reset the in_seq if this is a hard reset from peer,
 776 // otherwise we respect our original connection's value
 777 in_seq = is_reset_from_peer ? 0 : existing->in_seq;
 778 in_seq_acked = in_seq;
 779
 780 // steal outgoing queue and out_seq
 781 existing->requeue_sent();
 782 out_seq = existing->out_seq;
 783 ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
 784 for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
 785 p != existing->out_q.end();
 786 ++p)
 787 out_q[p->first].splice(out_q[p->first].begin(), p->second);
 788 }
 789 existing->stop_and_wait();
 790 existing->pipe_lock.Unlock();
 791
 // Session accepted: move to STATE_OPEN, send READY (or SEQ), register,
 // and start the writer.
 792 open:
 793 // open
 794 assert(pipe_lock.is_locked());
 795 connect_seq = connect.connect_seq + 1;
 796 peer_global_seq = connect.global_seq;
 797 assert(state == STATE_ACCEPTING);
 798 state = STATE_OPEN;
 799 ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 800
 801 // send READY reply
 802 reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
 803 reply.features = policy.features_supported;
 804 reply.global_seq = msgr->get_global_seq();
 805 reply.connect_seq = connect_seq;
 806 reply.flags = 0;
 807 reply.authorizer_len = authorizer_reply.length();
 808 if (policy.lossy)
 809 reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
 810
 811 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
 812 ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
 813
 814 session_security.reset(
 815 get_auth_session_handler(msgr->cct,
 816 connect.authorizer_protocol,
 817 session_key,
 818 connection_state->get_features()));
 819
 820 // notify
 821 msgr->dispatch_queue.queue_accept(connection_state.get());
 822 msgr->ms_deliver_handle_fast_accept(connection_state.get());
 823
 824 // ok!
 825 if (msgr->dispatch_queue.stop)
 826 goto shutting_down;
 827 removed = msgr->accepting_pipes.erase(this);
 828 assert(removed == 1);
 829 register_pipe();
 830 msgr->lock.Unlock();
 831 pipe_lock.Unlock();
 832
 833 r = tcp_write((char*)&reply, sizeof(reply));
 834 if (r < 0) {
 835 goto fail_registered;
 836 }
 837
 838 if (reply.authorizer_len) {
 839 r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
 840 if (r < 0) {
 841 goto fail_registered;
 842 }
 843 }
 844
 // TAG_SEQ: exchange in-flight sequence numbers so the peer can discard
 // messages we already received, and vice versa.
 845 if (reply_tag == CEPH_MSGR_TAG_SEQ) {
 846 if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
 847 ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
 848 goto fail_registered;
 849 }
 850 if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
 851 ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
 852 goto fail_registered;
 853 }
 854 }
 855
 856 pipe_lock.Lock();
 857 discard_requeued_up_to(newly_acked_seq);
 858 if (state != STATE_CLOSED) {
 859 ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
 860 start_writer();
 861 }
 862 ldout(msgr->cct,20) << "accept done" << dendl;
 863
 864 maybe_start_delay_thread();
 865
 866 return 0; // success.
 867
 // Failure after register_pipe(): same as fail_unlocked but with an
 // optional injected delay for race testing.
 868 fail_registered:
 869 ldout(msgr->cct, 10) << "accept fault after register" << dendl;
 870
 871 if (msgr->cct->_conf->ms_inject_internal_delays) {
 872 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
 873 utime_t t;
 874 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
 875 t.sleep();
 876 }
 877
 878 fail_unlocked:
 879 pipe_lock.Lock();
 880 if (state != STATE_CLOSED) {
 881 bool queued = is_queued();
 882 ldout(msgr->cct, 10) << " queued = " << (int)queued << dendl;
 883 if (queued) {
 884 state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
 885 } else if (replaced) {
 886 state = STATE_STANDBY;
 887 } else {
 888 state = STATE_CLOSED;
31f18b77 889 state_closed = true;
7c673cae
FG
 890 }
 891 fault();
 892 if (queued || replaced)
 893 start_writer();
 894 }
 895 return -1;
 896
 897 shutting_down:
 898 msgr->lock.Unlock();
 899 shutting_down_msgr_unlocked:
 900 assert(pipe_lock.is_locked());
 901
 902 if (msgr->cct->_conf->ms_inject_internal_delays) {
 903 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
 904 utime_t t;
 905 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
 906 t.sleep();
 907 }
 908
 909 state = STATE_CLOSED;
31f18b77 910 state_closed = true;
7c673cae
FG
 911 fault();
 912 return -1;
 913}
914
// Apply configured socket options to sd: optional TCP_NODELAY and
// SO_RCVBUF, SIGPIPE suppression where supported, and DSCP/priority
// marking.  All failures are logged and ignored (best effort).
915void Pipe::set_socket_options()
916{
 917 // disable Nagle algorithm?
 918 if (msgr->cct->_conf->ms_tcp_nodelay) {
 919 int flag = 1;
 920 int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
 921 if (r < 0) {
 922 r = -errno;
 923 ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
 924 << cpp_strerror(r) << dendl;
 925 }
 926 }
 927 if (msgr->cct->_conf->ms_tcp_rcvbuf) {
 928 int size = msgr->cct->_conf->ms_tcp_rcvbuf;
 929 int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
 930 if (r < 0) {
 931 r = -errno;
 932 ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
 933 << ": " << cpp_strerror(r) << dendl;
 934 }
 935 }
 936
 937 // block SIGPIPE
3efd9988 938#ifdef CEPH_USE_SO_NOSIGPIPE
7c673cae
FG
 939 int val = 1;
 940 int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
 941 if (r) {
 942 r = -errno;
 943 ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
 944 << cpp_strerror(r) << dendl;
 945 }
 946#endif
 947
 948#ifdef SO_PRIORITY
 949 int prio = msgr->get_socket_priority();
 950 if (prio >= 0) {
 951 int r = -1;
 952#ifdef IPTOS_CLASS_CS6
 953 int iptos = IPTOS_CLASS_CS6;
 954 int addr_family = 0;
   // Pick the address family from the peer when known, else from our
   // own address, so the right IP-level option is used.
 955 if (!peer_addr.is_blank_ip()) {
 956 addr_family = peer_addr.get_family();
 957 } else {
 958 addr_family = msgr->get_myaddr().get_family();
 959 }
 960 switch (addr_family) {
 961 case AF_INET:
 962 r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
 963 break;
 964 case AF_INET6:
 965 r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
 966 break;
 967 default:
 968 lderr(msgr->cct) << "couldn't set ToS of unknown family ("
 969 << addr_family << ")"
 970 << " to " << iptos << dendl;
 971 return;
 972 }
 973 if (r < 0) {
 974 r = -errno;
 975 ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
 976 << ": " << cpp_strerror(r) << dendl;
 977 }
 978#endif
 979 // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
 980 // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
 981 // We need to call setsockopt(SO_PRIORITY) after it.
 982 r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
 983 if (r < 0) {
 984 r = -errno;
 985 ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
 986 << ": " << cpp_strerror(r) << dendl;
 987 }
 988 }
 989#endif
 990}
991
992int Pipe::connect()
993{
994 bool got_bad_auth = false;
995
996 ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
997 assert(pipe_lock.is_locked());
998
999 __u32 cseq = connect_seq;
1000 __u32 gseq = msgr->get_global_seq();
1001
1002 // stop reader thread
1003 join_reader();
1004
1005 pipe_lock.Unlock();
1006
1007 char tag = -1;
1008 int rc = -1;
1009 struct msghdr msg;
1010 struct iovec msgvec[2];
1011 int msglen;
1012 char banner[strlen(CEPH_BANNER) + 1]; // extra byte makes coverity happy
1013 entity_addr_t paddr;
1014 entity_addr_t peer_addr_for_me, socket_addr;
1015 AuthAuthorizer *authorizer = NULL;
1016 bufferlist addrbl, myaddrbl;
1017 const md_config_t *conf = msgr->cct->_conf;
1018
1019 // close old socket. this is safe because we stopped the reader thread above.
1020 if (sd >= 0)
1021 ::close(sd);
1022
1023 // create socket?
1024 sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
1025 if (sd < 0) {
1026 rc = -errno;
1027 lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl;
1028 goto fail;
1029 }
1030
1031 recv_reset();
1032
1033 set_socket_options();
1034
1035 {
1036 entity_addr_t addr2bind = msgr->get_myaddr();
1037 if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
1038 addr2bind.set_port(0);
1039 int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
1040 if (r < 0) {
1041 ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
1042 goto fail;
1043 }
1044 }
1045 }
1046
1047 // connect!
1048 ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
1049 rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
1050 if (rc < 0) {
1051 int stored_errno = errno;
1052 ldout(msgr->cct,2) << "connect error " << peer_addr
1053 << ", " << cpp_strerror(stored_errno) << dendl;
1054 if (stored_errno == ECONNREFUSED) {
1055 ldout(msgr->cct, 2) << "connection refused!" << dendl;
1056 msgr->dispatch_queue.queue_refused(connection_state.get());
1057 }
1058 goto fail;
1059 }
1060
1061 // verify banner
1062 // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
1063 rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
1064 if (rc < 0) {
1065 ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
1066 goto fail;
1067 }
1068 if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
1069 ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
1070 goto fail;
1071 }
1072
1073 memset(&msg, 0, sizeof(msg));
1074 msgvec[0].iov_base = banner;
1075 msgvec[0].iov_len = strlen(CEPH_BANNER);
1076 msg.msg_iov = msgvec;
1077 msg.msg_iovlen = 1;
1078 msglen = msgvec[0].iov_len;
1079 rc = do_sendmsg(&msg, msglen);
1080 if (rc < 0) {
1081 ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
1082 goto fail;
1083 }
1084
1085 // identify peer
1086 {
1087#if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
1088 bufferptr p(sizeof(ceph_entity_addr) * 2);
1089#else
1090 int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
1091 bufferptr p(wirelen * 2);
1092#endif
1093 addrbl.push_back(std::move(p));
1094 }
1095 rc = tcp_read(addrbl.c_str(), addrbl.length());
1096 if (rc < 0) {
1097 ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
1098 goto fail;
1099 }
1100 try {
1101 bufferlist::iterator p = addrbl.begin();
1102 ::decode(paddr, p);
1103 ::decode(peer_addr_for_me, p);
1104 }
1105 catch (buffer::error& e) {
1106 ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
1107 << dendl;
1108 goto fail;
1109 }
1110 port = peer_addr_for_me.get_port();
1111
1112 ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
1113 if (peer_addr != paddr) {
1114 if (paddr.is_blank_ip() &&
1115 peer_addr.get_port() == paddr.get_port() &&
1116 peer_addr.get_nonce() == paddr.get_nonce()) {
1117 ldout(msgr->cct,0) << "connect claims to be "
1118 << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
1119 } else {
224ce89b 1120 ldout(msgr->cct,10) << "connect claims to be "
c07f9fc5
FG
1121 << paddr << " not " << peer_addr << dendl;
1122 goto fail;
7c673cae
FG
1123 }
1124 }
1125
1126 ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
1127
1128 msgr->learned_addr(peer_addr_for_me);
1129
1130 ::encode(msgr->my_inst.addr, myaddrbl, 0); // legacy
1131
1132 memset(&msg, 0, sizeof(msg));
1133 msgvec[0].iov_base = myaddrbl.c_str();
1134 msgvec[0].iov_len = myaddrbl.length();
1135 msg.msg_iov = msgvec;
1136 msg.msg_iovlen = 1;
1137 msglen = msgvec[0].iov_len;
1138 rc = do_sendmsg(&msg, msglen);
1139 if (rc < 0) {
1140 ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
1141 goto fail;
1142 }
1143 ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
1144
1145
1146 while (1) {
28e407b8
AA
1147 if (!authorizer) {
1148 authorizer = msgr->get_authorizer(peer_type, false);
1149 }
7c673cae
FG
1150 bufferlist authorizer_reply;
1151
1152 ceph_msg_connect connect;
1153 connect.features = policy.features_supported;
1154 connect.host_type = msgr->get_myinst().name.type();
1155 connect.global_seq = gseq;
1156 connect.connect_seq = cseq;
1157 connect.protocol_version = msgr->get_proto_version(peer_type, true);
1158 connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
1159 connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
1160 if (authorizer)
1161 ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
1162 << " protocol=" << connect.authorizer_protocol << dendl;
1163 connect.flags = 0;
1164 if (policy.lossy)
1165 connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
1166 memset(&msg, 0, sizeof(msg));
1167 msgvec[0].iov_base = (char*)&connect;
1168 msgvec[0].iov_len = sizeof(connect);
1169 msg.msg_iov = msgvec;
1170 msg.msg_iovlen = 1;
1171 msglen = msgvec[0].iov_len;
1172 if (authorizer) {
1173 msgvec[1].iov_base = authorizer->bl.c_str();
1174 msgvec[1].iov_len = authorizer->bl.length();
1175 msg.msg_iovlen++;
1176 msglen += msgvec[1].iov_len;
1177 }
1178
1179 ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
1180 << " proto=" << connect.protocol_version << dendl;
1181 rc = do_sendmsg(&msg, msglen);
1182 if (rc < 0) {
1183 ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
1184 goto fail;
1185 }
1186
1187 ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
1188 ceph_msg_connect_reply reply;
1189 rc = tcp_read((char*)&reply, sizeof(reply));
1190 if (rc < 0) {
1191 ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
1192 goto fail;
1193 }
1194
1195 ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
1196 << " connect_seq " << reply.connect_seq
1197 << " global_seq " << reply.global_seq
1198 << " proto " << reply.protocol_version
1199 << " flags " << (int)reply.flags
1200 << " features " << reply.features
1201 << dendl;
1202
1203 authorizer_reply.clear();
1204
1205 if (reply.authorizer_len) {
1206 ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
1207 bufferptr bp = buffer::create(reply.authorizer_len);
1208 rc = tcp_read(bp.c_str(), reply.authorizer_len);
1209 if (rc < 0) {
1210 ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
1211 goto fail;
1212 }
1213 authorizer_reply.push_back(bp);
1214 }
1215
28e407b8
AA
1216 if (reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
1217 authorizer->add_challenge(msgr->cct, authorizer_reply);
1218 ldout(msgr->cct,10) << " got authorizer challenge, " << authorizer_reply.length()
1219 << " bytes" << dendl;
1220 continue;
1221 }
1222
7c673cae
FG
1223 if (authorizer) {
1224 bufferlist::iterator iter = authorizer_reply.begin();
1225 if (!authorizer->verify_reply(iter)) {
1226 ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
1227 goto fail;
1228 }
1229 }
1230
1231 if (conf->ms_inject_internal_delays) {
1232 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1233 utime_t t;
1234 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1235 t.sleep();
1236 }
1237
1238 pipe_lock.Lock();
1239 if (state != STATE_CONNECTING) {
1240 ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
1241 goto stop_locked;
1242 }
1243
1244 if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
1245 ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
1246 << connect.features << " < peer " << reply.features
1247 << " missing " << (reply.features & ~policy.features_supported)
1248 << std::dec << dendl;
1249 goto fail_locked;
1250 }
1251
1252 if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
1253 ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
1254 << " != " << reply.protocol_version << dendl;
1255 goto fail_locked;
1256 }
1257
1258 if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
1259 ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
1260 if (got_bad_auth)
1261 goto stop_locked;
1262 got_bad_auth = true;
1263 pipe_lock.Unlock();
1264 delete authorizer;
1265 authorizer = msgr->get_authorizer(peer_type, true); // try harder
1266 continue;
1267 }
1268 if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
1269 ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
1270 was_session_reset();
1271 cseq = 0;
1272 pipe_lock.Unlock();
1273 continue;
1274 }
1275 if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
1276 gseq = msgr->get_global_seq(reply.global_seq);
1277 ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
1278 << " chose new " << gseq << dendl;
1279 pipe_lock.Unlock();
1280 continue;
1281 }
1282 if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
1283 assert(reply.connect_seq > connect_seq);
1284 ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
1285 << " -> " << reply.connect_seq << dendl;
1286 cseq = connect_seq = reply.connect_seq;
1287 pipe_lock.Unlock();
1288 continue;
1289 }
1290
1291 if (reply.tag == CEPH_MSGR_TAG_WAIT) {
1292 ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
1293 state = STATE_WAIT;
1294 goto stop_locked;
1295 }
1296
1297 if (reply.tag == CEPH_MSGR_TAG_READY ||
1298 reply.tag == CEPH_MSGR_TAG_SEQ) {
1299 uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
1300 if (feat_missing) {
1301 ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
1302 goto fail_locked;
1303 }
1304
1305 if (reply.tag == CEPH_MSGR_TAG_SEQ) {
1306 ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
1307 uint64_t newly_acked_seq = 0;
1308 rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
1309 if (rc < 0) {
1310 ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
1311 goto fail_locked;
1312 }
1313 ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
1314 << " vs out_seq " << out_seq << dendl;
1315 while (newly_acked_seq > out_seq) {
1316 Message *m = _get_next_outgoing();
1317 assert(m);
1318 ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
1319 << " " << *m << dendl;
1320 assert(m->get_seq() <= newly_acked_seq);
1321 m->put();
1322 ++out_seq;
1323 }
1324 if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
1325 ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
1326 goto fail_locked;
1327 }
1328 }
1329
1330 // hooray!
1331 peer_global_seq = reply.global_seq;
1332 policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
1333 state = STATE_OPEN;
1334 connect_seq = cseq + 1;
1335 assert(connect_seq == reply.connect_seq);
1336 backoff = utime_t();
1337 connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
1338 ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
1339 << ", features " << connection_state->get_features() << dendl;
1340
1341
1342 // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
1343 // connection. PLR
1344
1345 if (authorizer != NULL) {
1346 session_security.reset(
1347 get_auth_session_handler(msgr->cct,
1348 authorizer->protocol,
1349 authorizer->session_key,
1350 connection_state->get_features()));
1351 } else {
1352 // We have no authorizer, so we shouldn't be applying security to messages in this pipe. PLR
1353 session_security.reset();
1354 }
1355
1356 msgr->dispatch_queue.queue_connect(connection_state.get());
1357 msgr->ms_deliver_handle_fast_connect(connection_state.get());
1358
1359 if (!reader_running) {
1360 ldout(msgr->cct,20) << "connect starting reader" << dendl;
1361 start_reader();
1362 }
1363 maybe_start_delay_thread();
1364 delete authorizer;
1365 return 0;
1366 }
1367
1368 // protocol error
1369 ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
1370 goto fail_locked;
1371 }
1372
1373 fail:
1374 if (conf->ms_inject_internal_delays) {
1375 ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
1376 utime_t t;
1377 t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
1378 t.sleep();
1379 }
1380
1381 pipe_lock.Lock();
1382 fail_locked:
1383 if (state == STATE_CONNECTING)
1384 fault();
1385 else
1386 ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
1387 << " != connecting, stopping" << dendl;
1388
1389 stop_locked:
1390 delete authorizer;
1391 return rc;
1392}
1393
1394void Pipe::register_pipe()
1395{
1396 ldout(msgr->cct,10) << "register_pipe" << dendl;
1397 assert(msgr->lock.is_locked());
1398 Pipe *existing = msgr->_lookup_pipe(peer_addr);
1399 assert(existing == NULL);
1400 msgr->rank_pipe[peer_addr] = this;
1401}
1402
1403void Pipe::unregister_pipe()
1404{
1405 assert(msgr->lock.is_locked());
1406 ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
1407 if (p != msgr->rank_pipe.end() && p->second == this) {
1408 ldout(msgr->cct,10) << "unregister_pipe" << dendl;
1409 msgr->rank_pipe.erase(p);
1410 } else {
1411 ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
1412 msgr->accepting_pipes.erase(this); // somewhat overkill, but safe.
1413 }
1414}
1415
1416void Pipe::join()
1417{
1418 ldout(msgr->cct, 20) << "join" << dendl;
1419 if (writer_thread.is_started())
1420 writer_thread.join();
1421 if (reader_thread.is_started())
1422 reader_thread.join();
1423 if (delay_thread) {
1424 ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
1425 delay_thread->stop();
1426 delay_thread->join();
1427 }
1428}
1429
1430void Pipe::requeue_sent()
1431{
1432 if (sent.empty())
1433 return;
1434
1435 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1436 while (!sent.empty()) {
1437 Message *m = sent.back();
1438 sent.pop_back();
1439 ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
1440 << " (" << m->get_seq() << ")" << dendl;
1441 rq.push_front(m);
1442 out_seq--;
1443 }
1444}
1445
1446void Pipe::discard_requeued_up_to(uint64_t seq)
1447{
1448 ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
1449 if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
1450 return;
1451 list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
1452 while (!rq.empty()) {
1453 Message *m = rq.front();
1454 if (m->get_seq() == 0 || m->get_seq() > seq)
1455 break;
1456 ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
1457 << " <= " << seq << ", discarding" << dendl;
1458 m->put();
1459 rq.pop_front();
1460 out_seq++;
1461 }
1462 if (rq.empty())
1463 out_q.erase(CEPH_MSG_PRIO_HIGHEST);
1464}
1465
1466/*
1467 * Tears down the Pipe's message queues, and removes them from the DispatchQueue
1468 * Must hold pipe_lock prior to calling.
1469 */
1470void Pipe::discard_out_queue()
1471{
1472 ldout(msgr->cct,10) << "discard_queue" << dendl;
1473
1474 for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
1475 ldout(msgr->cct,20) << " discard " << *p << dendl;
1476 (*p)->put();
1477 }
1478 sent.clear();
1479 for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
1480 for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
1481 ldout(msgr->cct,20) << " discard " << *r << dendl;
1482 (*r)->put();
1483 }
1484 out_q.clear();
1485}
1486
/**
 * Handle a failure on this pipe (read error, write error, or handshake
 * failure).  Depending on policy and current state this either tears the
 * pipe down completely (lossy channels), parks it in STANDBY, or schedules
 * a reconnect with exponential backoff.
 *
 * Precondition: pipe_lock held.  The lock is temporarily dropped on the
 * lossy teardown path and while sleeping on backoff.
 *
 * @param onread  true when invoked from the reader thread; lets a reader
 *                that raced with an in-progress reconnect exit quietly.
 */
void Pipe::fault(bool onread)
{
  const md_config_t *conf = msgr->cct->_conf;
  assert(pipe_lock.is_locked());
  cond.Signal();  // wake any thread blocked on pipe state changes

  if (onread && state == STATE_CONNECTING) {
    // the writer thread owns the reconnect; the reader just shuts down
    ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
    return;
  }
  
  ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;

  if (state == STATE_CLOSED ||
      state == STATE_CLOSING) {
    // already being torn down; just make sure the reset gets delivered once
    ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
    if (connection_state->clear_pipe(this))
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  shutdown_socket();

  // lossy channel?
  if (policy.lossy && state != STATE_CONNECTING) {
    ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;

    // disconnect from Connection, and mark it failed.  future messages
    // will be dropped.
    assert(connection_state);
    stop();
    bool cleared = connection_state->clear_pipe(this);

    // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
    // rank_pipe entry is ignored by others.
    pipe_lock.Unlock();

    if (conf->ms_inject_internal_delays) {
      // test hook: widen the race window around the lock hand-off
      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
      utime_t t;
      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
      t.sleep();
    }

    // acquire msgr->lock before re-taking pipe_lock, then unregister
    msgr->lock.Lock();
    pipe_lock.Lock();
    unregister_pipe();
    msgr->lock.Unlock();

    // throw away everything queued in either direction
    if (delay_thread)
      delay_thread->discard();
    in_q->discard_queue(conn_id);
    discard_out_queue();
    if (cleared)
      msgr->dispatch_queue.queue_reset(connection_state.get());
    return;
  }

  // queue delayed items immediately
  if (delay_thread)
    delay_thread->flush();

  // requeue sent items
  requeue_sent();

  if (policy.standby && !is_queued()) {
    ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
    state = STATE_STANDBY;
    return;
  }

  if (state != STATE_CONNECTING) {
    if (policy.server) {
      // servers wait for the client to re-establish the session
      ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
      state = STATE_STANDBY;
    } else {
      // clients initiate the reconnect themselves
      ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
      connect_seq++;
      state = STATE_CONNECTING;
    }
    backoff = utime_t();
  } else if (backoff == utime_t()) {
    // first failed reconnect attempt: start the backoff clock
    ldout(msgr->cct,0) << "fault" << dendl;
    backoff.set_from_double(conf->ms_initial_backoff);
  } else {
    // repeated failure: sleep for the current backoff, then double it up
    // to the configured ceiling
    ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
    cond.WaitInterval(pipe_lock, backoff);
    backoff += backoff;
    if (backoff > conf->ms_max_backoff)
      backoff.set_from_double(conf->ms_max_backoff);
    ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
  }
}
1580
1581int Pipe::randomize_out_seq()
1582{
1583 if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
1584 // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error
1585 // here. We'll check it on the call. PLR
1586 int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
1587 out_seq &= SEQ_MASK;
1588 lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
1589 return seq_error;
1590 } else {
1591 // previously, seq #'s always started at 0.
1592 out_seq = 0;
1593 return 0;
1594 }
1595}
1596
/**
 * React to the peer declaring our session dead (RESETSESSION reply):
 * drop every queued incoming and outgoing message, tell dispatchers about
 * the remote reset, and restart sequence numbers from scratch.
 *
 * Precondition: pipe_lock held.
 */
void Pipe::was_session_reset()
{
  assert(pipe_lock.is_locked());

  ldout(msgr->cct,10) << "was_session_reset" << dendl;
  // discard both directions: queued-for-dispatch input, delayed input,
  // and all undelivered output
  in_q->discard_queue(conn_id);
  if (delay_thread)
    delay_thread->discard();
  discard_out_queue();

  msgr->dispatch_queue.queue_remote_reset(connection_state.get());

  // re-seed out_seq (random when MSG_AUTH is in use, else 0); a non-zero
  // return here only means the RNG failed and out_seq kept its masked value
  if (randomize_out_seq()) {
    lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
  }

  in_seq = 0;
  connect_seq = 0;
}
1616
/**
 * Mark this pipe closed and shut its socket down.  Signals cond so any
 * thread waiting on pipe state observes the transition.
 *
 * Precondition: pipe_lock held.
 */
void Pipe::stop()
{
  ldout(msgr->cct,10) << "stop" << dendl;
  assert(pipe_lock.is_locked());
  state = STATE_CLOSED;
  state_closed = true;  // kept in sync with state == STATE_CLOSED at every transition
  cond.Signal();
  shutdown_socket();
}
1626
/**
 * Stop the pipe and block until no thread is still delivering messages on
 * its behalf: waits out the delay thread's fast dispatch and any in-flight
 * fast dispatch in the reader.
 *
 * Precondition: pipe_lock held by the calling thread (it is dropped and
 * re-taken around the delay-thread drain).
 */
void Pipe::stop_and_wait()
{
  assert(pipe_lock.is_locked_by_me());
  if (state != STATE_CLOSED)
    stop();
  
  if (msgr->cct->_conf->ms_inject_internal_delays) {
    // test hook: widen race windows around shutdown
    ldout(msgr->cct, 10) << __func__ << " sleep for "
                         << msgr->cct->_conf->ms_inject_internal_delays
                         << dendl;
    utime_t t;
    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
    t.sleep();
  }
  
  if (delay_thread) {
    // must drop pipe_lock while draining the delay thread's dispatching
    pipe_lock.Unlock();
    delay_thread->stop_fast_dispatching();
    pipe_lock.Lock();
  }
  // wait for the reader to finish any fast dispatch it is in the middle of
  while (reader_running &&
         reader_dispatching)
    cond.Wait(pipe_lock);
}
1651
1652/* read msgs from socket.
1653 * also, server.
1654 */
/**
 * Reader thread main loop.  Runs the server-side handshake if this pipe was
 * spawned by an incoming connection, then reads tagged frames off the
 * socket (keepalives, acks, messages, close) until the pipe closes or a
 * reconnect takes over.  pipe_lock is held except during blocking reads
 * and fast dispatch.
 */
void Pipe::reader()
{
  pipe_lock.Lock();

  // spawned by an incoming connection: run the accept handshake first
  if (state == STATE_ACCEPTING) {
    accept();
    assert(pipe_lock.is_locked());
  }

  // loop.
  while (state != STATE_CLOSED &&
         state != STATE_CONNECTING) {
    assert(pipe_lock.is_locked());

    // sleep if (re)connecting
    if (state == STATE_STANDBY) {
      ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
      cond.Wait(pipe_lock);
      continue;
    }

    // get a reference to the AuthSessionHandler while we have the pipe_lock
    ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;

    pipe_lock.Unlock();

    char tag = -1;
    ldout(msgr->cct,20) << "reader reading tag..." << dendl;
    if (tcp_read((char*)&tag, 1) < 0) {
      pipe_lock.Lock();
      ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
      fault(true);
      continue;
    }

    if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
      // legacy keepalive: no payload, just note that the peer is alive
      ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
      pipe_lock.Lock();
      connection_state->set_last_keepalive(ceph_clock_now());
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
      // KEEPALIVE2 carries a timestamp the writer must echo back as an ack
      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
      ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
                           << cpp_strerror(errno) << dendl;
        fault(true);
      } else {
        send_keepalive_ack = true;
        keepalive_ack_stamp = utime_t(t);
        ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
                           << dendl;
        connection_state->set_last_keepalive(ceph_clock_now());
        cond.Signal();  // wake the writer so it sends the ack
      }
      continue;
    }
    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
      // the peer echoed one of our KEEPALIVE2 stamps back
      ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
      struct ceph_timespec t;
      int rc = tcp_read((char*)&t, sizeof(t));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
        fault(true);
      } else {
        connection_state->set_last_keepalive_ack(utime_t(t));
      }
      continue;
    }

    // open ...
    if (tag == CEPH_MSGR_TAG_ACK) {
      ldout(msgr->cct,20) << "reader got ACK" << dendl;
      ceph_le64 seq;
      int rc = tcp_read((char*)&seq, sizeof(seq));
      pipe_lock.Lock();
      if (rc < 0) {
        ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
        fault(true);
      } else if (state != STATE_CLOSED) {
        handle_ack(seq);
      }
      continue;
    }

    else if (tag == CEPH_MSGR_TAG_MSG) {
      ldout(msgr->cct,20) << "reader got MSG" << dendl;
      Message *m = 0;
      int r = read_message(&m, auth_handler.get());

      pipe_lock.Lock();
      
      if (!m) {
        // r < 0 means a real read/decode failure; r >= 0 with no message
        // means the frame was consumed without producing one
        if (r < 0)
          fault(true);
        continue;
      }

      m->trace.event("pipe read message");

      if (state == STATE_CLOSED ||
          state == STATE_CONNECTING) {
        // the pipe was closed/replaced while we were off the lock; drop it
        in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
        m->put();
        continue;
      }

      // check received seq#.  if it is old, drop the message.  
      // note that incoming messages may skip ahead.  this is convenient for the client
      // side queueing because messages can't be renumbered, but the (kernel) client will
      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
      // it doesn't matter if we "got" it or not.
      if (m->get_seq() <= in_seq) {
        ldout(msgr->cct,0) << "reader got old message "
                << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
                << ", discarding" << dendl;
        in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
        m->put();
        if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
            msgr->cct->_conf->ms_die_on_old_message)
          assert(0 == "old msgs despite reconnect_seq feature");
        continue;
      }
      if (m->get_seq() > in_seq + 1) {
        // gap in the sequence: log it (and optionally die, for testing)
        ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
                           << in_seq << " to " << m->get_seq() << dendl;
        if (msgr->cct->_conf->ms_die_on_skipped_message)
          assert(0 == "skipped incoming seq");
      }

      m->set_connection(connection_state.get());

      // note last received message.
      in_seq = m->get_seq();

      cond.Signal();  // wake up writer, to ack this
      
      ldout(msgr->cct,10) << "reader got message "
               << m->get_seq() << " " << m << " " << *m
               << dendl;
      in_q->fast_preprocess(m);

      if (delay_thread) {
        // test hook: optionally hold the message back before delivery
        utime_t release;
        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
          release = m->get_recv_stamp();
          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
        }
        delay_thread->queue(release, m);
      } else {
        if (in_q->can_fast_dispatch(m)) {
          // fast dispatch runs inline in the reader, without pipe_lock
          reader_dispatching = true;
          pipe_lock.Unlock();
          in_q->fast_dispatch(m);
          pipe_lock.Lock();
          reader_dispatching = false;
          if (state == STATE_CLOSED ||
              notify_on_dispatch_done) { // there might be somebody waiting
            notify_on_dispatch_done = false;
            cond.Signal();
          }
        } else {
          in_q->enqueue(m, m->get_priority(), conn_id);
        }
      }
    }
    
    else if (tag == CEPH_MSGR_TAG_CLOSE) {
      // orderly shutdown: CLOSING -> CLOSED once both sides have seen it
      ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
      pipe_lock.Lock();
      if (state == STATE_CLOSING) {
        state = STATE_CLOSED;
        state_closed = true;
      } else {
        state = STATE_CLOSING;
      }
      cond.Signal();
      break;
    }
    else {
      ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
      pipe_lock.Lock();
      fault(true);
    }
  }

 
  // reap?
  reader_running = false;
  reader_needs_join = true;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "reader done" << dendl;
}
1853
1854/* write msgs to socket.
1855 * also, client.
1856 */
1857void Pipe::writer()
1858{
1859 pipe_lock.Lock();
1860 while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
1861 ldout(msgr->cct,10) << "writer: state = " << get_state_name()
1862 << " policy.server=" << policy.server << dendl;
1863
1864 // standby?
1865 if (is_queued() && state == STATE_STANDBY && !policy.server)
1866 state = STATE_CONNECTING;
1867
1868 // connect?
1869 if (state == STATE_CONNECTING) {
1870 assert(!policy.server);
1871 connect();
1872 continue;
1873 }
1874
1875 if (state == STATE_CLOSING) {
1876 // write close tag
1877 ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
1878 char tag = CEPH_MSGR_TAG_CLOSE;
1879 state = STATE_CLOSED;
31f18b77 1880 state_closed = true;
7c673cae
FG
1881 pipe_lock.Unlock();
1882 if (sd >= 0) {
1883 // we can ignore return value, actually; we don't care if this succeeds.
1884 int r = ::write(sd, &tag, 1);
1885 (void)r;
1886 }
1887 pipe_lock.Lock();
1888 continue;
1889 }
1890
1891 if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
1892 (is_queued() || in_seq > in_seq_acked)) {
1893
1894 // keepalive?
1895 if (send_keepalive) {
1896 int rc;
1897 if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
1898 pipe_lock.Unlock();
1899 rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
1900 ceph_clock_now());
1901 } else {
1902 pipe_lock.Unlock();
1903 rc = write_keepalive();
1904 }
1905 pipe_lock.Lock();
1906 if (rc < 0) {
1907 ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
1908 << cpp_strerror(errno) << dendl;
1909 fault();
1910 continue;
1911 }
1912 send_keepalive = false;
1913 }
1914 if (send_keepalive_ack) {
1915 utime_t t = keepalive_ack_stamp;
1916 pipe_lock.Unlock();
1917 int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
1918 pipe_lock.Lock();
1919 if (rc < 0) {
1920 ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
1921 fault();
1922 continue;
1923 }
1924 send_keepalive_ack = false;
1925 }
1926
1927 // send ack?
1928 if (in_seq > in_seq_acked) {
1929 uint64_t send_seq = in_seq;
1930 pipe_lock.Unlock();
1931 int rc = write_ack(send_seq);
1932 pipe_lock.Lock();
1933 if (rc < 0) {
1934 ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
1935 fault();
1936 continue;
1937 }
1938 in_seq_acked = send_seq;
1939 }
1940
1941 // grab outgoing message
1942 Message *m = _get_next_outgoing();
1943 if (m) {
1944 m->set_seq(++out_seq);
1945 if (!policy.lossy) {
1946 // put on sent list
1947 sent.push_back(m);
1948 m->get();
1949 }
1950
1951 // associate message with Connection (for benefit of encode_payload)
1952 m->set_connection(connection_state.get());
1953
1954 uint64_t features = connection_state->get_features();
1955
1956 if (m->empty_payload())
1957 ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
1958 << " " << m << " " << *m << dendl;
1959 else
1960 ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
1961 << " " << m << " " << *m << dendl;
1962
1963 // encode and copy out of *m
1964 m->encode(features, msgr->crcflags);
1965
1966 // prepare everything
1967 const ceph_msg_header& header = m->get_header();
1968 const ceph_msg_footer& footer = m->get_footer();
1969
1970 // Now that we have all the crcs calculated, handle the
1971 // digital signature for the message, if the pipe has session
1972 // security set up. Some session security options do not
1973 // actually calculate and check the signature, but they should
1974 // handle the calls to sign_message and check_signature. PLR
1975 if (session_security.get() == NULL) {
1976 ldout(msgr->cct, 20) << "writer no session security" << dendl;
1977 } else {
1978 if (session_security->sign_message(m)) {
1979 ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
1980 << "): sig = " << footer.sig << dendl;
1981 } else {
1982 ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
1983 << "): sig = " << footer.sig << dendl;
1984 }
1985 }
1986
1987 bufferlist blist = m->get_payload();
1988 blist.append(m->get_middle());
1989 blist.append(m->get_data());
1990
1991 pipe_lock.Unlock();
1992
1993 m->trace.event("pipe writing message");
1994
1995 ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
1996 int rc = write_message(header, footer, blist);
1997
1998 pipe_lock.Lock();
1999 if (rc < 0) {
2000 ldout(msgr->cct,1) << "writer error sending " << m << ", "
2001 << cpp_strerror(errno) << dendl;
2002 fault();
2003 }
2004 m->put();
2005 }
2006 continue;
2007 }
2008
2009 // wait
2010 ldout(msgr->cct,20) << "writer sleeping" << dendl;
2011 cond.Wait(pipe_lock);
2012 }
2013
2014 ldout(msgr->cct,20) << "writer finishing" << dendl;
2015
2016 // reap?
2017 writer_running = false;
2018 unlock_maybe_reap();
2019 ldout(msgr->cct,10) << "writer done" << dendl;
2020}
2021
2022void Pipe::unlock_maybe_reap()
2023{
2024 if (!reader_running && !writer_running) {
2025 shutdown_socket();
2026 pipe_lock.Unlock();
2027 if (delay_thread && delay_thread->is_flushing()) {
2028 delay_thread->wait_for_flush();
2029 }
2030 msgr->queue_reap(this);
2031 } else {
2032 pipe_lock.Unlock();
2033 }
2034}
2035
2036static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
2037{
2038 // create a buffer to read into that matches the data alignment
2039 unsigned left = len;
2040 if (off & ~CEPH_PAGE_MASK) {
2041 // head
2042 unsigned head = 0;
2043 head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
2044 data.push_back(buffer::create(head));
2045 left -= head;
2046 }
2047 unsigned middle = left & CEPH_PAGE_MASK;
2048 if (middle > 0) {
2049 data.push_back(buffer::create_page_aligned(middle));
2050 left -= middle;
2051 }
2052 if (left) {
2053 data.push_back(buffer::create(left));
2054 }
2055}
2056
// Read one complete message off the wire: envelope (header), the
// front/middle/data payload sections, then the footer.  On success *pm
// is set and 0 is returned.  A message the sender aborted midway also
// returns 0 but leaves *pm untouched.  Socket/CRC failures return -1;
// decode or signature-check failures return -EINVAL.  Any throttle
// reservations taken below are released on the error paths via
// out_dethrottle.
int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
{
  int ret = -1;
  // envelope
  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl;

  ceph_msg_header header;
  ceph_msg_footer footer;
  __u32 header_crc = 0;

  // Peers with NOSRCADDR send the compact header; older peers send
  // ceph_msg_header_old, which we convert in place.  Either way the
  // crc covers the header minus its trailing crc field.
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    if (tcp_read((char*)&header, sizeof(header)) < 0)
      return -1;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
    }
  } else {
    ceph_msg_header_old oldheader;
    if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
      return -1;
    // this is fugly
    memcpy(&header, &oldheader, sizeof(header));
    header.src = oldheader.src.name;
    header.reserved = oldheader.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
      header.crc = oldheader.crc;
      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
    }
  }

  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
           << " src " << entity_name_t(header.src)
           << " front=" << header.front_len
	   << " data=" << header.data_len
	   << " off " << header.data_off
           << dendl;

  // verify header crc
  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
    return -1;
  }

  bufferlist front, middle, data;
  int front_len, middle_len;
  unsigned data_len, data_off;
  int aborted;
  Message *message;
  utime_t recv_stamp = ceph_clock_now();

  // Per-peer policy message throttle: may block until the peer's
  // in-flight message count drops.  Held for the message's lifetime.
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->get();
  }

  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->get(message_size);
    }

    // throttle total bytes waiting for dispatch.  do this _after_ the
    // policy throttle, as this one does not deadlock (unless dispatch
    // blocks indefinitely, which it shouldn't).  in contrast, the
    // policy throttle carries for the lifetime of the message.
    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
	     << in_q->dispatch_throttler.get_current() << "/"
	     << in_q->dispatch_throttler.get_max() << dendl;
    in_q->dispatch_throttler.get(message_size);
  }

  utime_t throttle_stamp = ceph_clock_now();

  // read front
  front_len = header.front_len;
  if (front_len) {
    bufferptr bp = buffer::create(front_len);
    if (tcp_read(bp.c_str(), front_len) < 0)
      goto out_dethrottle;
    front.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
  }

  // read middle
  middle_len = header.middle_len;
  if (middle_len) {
    bufferptr bp = buffer::create(middle_len);
    if (tcp_read(bp.c_str(), middle_len) < 0)
      goto out_dethrottle;
    middle.push_back(std::move(bp));
    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
  }


  // read data
  data_len = le32_to_cpu(header.data_len);
  data_off = le32_to_cpu(header.data_off);
  if (data_len) {
    unsigned offset = 0;
    unsigned left = data_len;

    bufferlist newbuf, rxbuf;
    bufferlist::iterator blp;
    int rxbuf_version = 0;

    while (left > 0) {
      // wait for data
      if (tcp_read_wait() < 0)
	goto out_dethrottle;

      // Pick a destination buffer.  If the Connection has a registered
      // rx buffer for this tid, read straight into it; otherwise
      // allocate a buffer aligned to the sender's data_off.
      // connection_state->lock protects rx_buffers; we re-check the map
      // (and its version counter) on every iteration in case the buffer
      // was (re)registered or revoked while we slept in tcp_read_wait.
      connection_state->lock.Lock();
      map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
      if (p != connection_state->rx_buffers.end()) {
	if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
	  ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second
		   << " at offset " << offset
		   << " len " << p->second.first.length() << dendl;
	  rxbuf = p->second.first;
	  rxbuf_version = p->second.second;
	  // make sure it's big enough
	  if (rxbuf.length() < data_len)
	    rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
	  blp = p->second.first.begin();
	  blp.advance(offset);
	}
      } else {
	if (!newbuf.length()) {
	  ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
	  alloc_aligned_buffer(newbuf, data_len, data_off);
	  blp = newbuf.begin();
	  blp.advance(offset);
	}
      }
      // read at most the remainder of the current bufferptr segment
      bufferptr bp = blp.get_current_ptr();
      int read = MIN(bp.length(), left);
      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
      connection_state->lock.Unlock();
      if (got < 0)
	goto out_dethrottle;
      if (got > 0) {
	blp.advance(got);
	data.append(bp, 0, got);
	offset += got;
	left -= got;
      } // else we got a signal or something; just loop.
    }
  }

  // footer: MSG_AUTH peers include the signature field; otherwise read
  // the old footer layout and zero the signature.
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
      goto out_dethrottle;
  } else {
    ceph_msg_footer_old old_footer;
    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
      goto out_dethrottle;
    footer.front_crc = old_footer.front_crc;
    footer.middle_crc = old_footer.middle_crc;
    footer.data_crc = old_footer.data_crc;
    footer.sig = 0;
    footer.flags = old_footer.flags;
  }

  // a cleared COMPLETE flag means the sender gave up mid-message
  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
  if (aborted) {
    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	    << " byte message.. ABORTED" << dendl;
    ret = 0;
    goto out_dethrottle;
  }

  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
	   << " byte message" << dendl;
  message = decode_message(msgr->cct, msgr->crcflags, header, footer,
			   front, middle, data, connection_state.get());
  if (!message) {
    ret = -EINVAL;
    goto out_dethrottle;
  }

  //
  // Check the signature if one should be present.  A zero return indicates success. PLR
  //

  if (auth_handler == NULL) {
    ldout(msgr->cct, 10) << "No session security set" << dendl;
  } else {
    if (auth_handler->check_message_signature(message)) {
      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
      message->put();
      ret = -EINVAL;
      goto out_dethrottle;
    }
  }

  // The message now owns the policy throttle reservations; they are
  // released when the message is destroyed.
  message->set_byte_throttler(policy.throttler_bytes);
  message->set_message_throttler(policy.throttler_messages);

  // store reservation size in message, so we don't get confused
  // by messages entering the dispatch queue through other paths.
  message->set_dispatch_throttle_size(message_size);

  message->set_recv_stamp(recv_stamp);
  message->set_throttle_stamp(throttle_stamp);
  message->set_recv_complete_stamp(ceph_clock_now());

  *pm = message;
  return 0;

 out_dethrottle:
  // release bytes reserved from the throttlers on failure
  if (policy.throttler_messages) {
    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
			<< policy.throttler_messages->get_current() << "/"
			<< policy.throttler_messages->get_max() << dendl;
    policy.throttler_messages->put();
  }
  if (message_size) {
    if (policy.throttler_bytes) {
      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
	       << policy.throttler_bytes->get_current() << "/"
	       << policy.throttler_bytes->get_max() << dendl;
      policy.throttler_bytes->put(message_size);
    }

    in_q->dispatch_throttle_release(message_size);
  }
  return ret;
}
2295
7c673cae
FG
// Send exactly 'len' bytes described by *msg, looping over short
// writes.  The iovec array inside *msg is trimmed in place as data is
// sent, so the caller's msghdr is consumed by this call.  'more' sets
// MSG_MORE to hint that further data follows immediately.  Returns 0 on
// success, -errno on a send error, or -EINTR if the pipe was closed
// underneath us mid-send.
int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
{
  MSGR_SIGPIPE_STOPPER;  // suppress SIGPIPE for the duration of this scope
  while (len > 0) {
    int r;
    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
    if (r == 0)
      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
    if (r < 0) {
      r = -errno;
      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
      return r;
    }
    // another thread may close the pipe while we were in sendmsg();
    // bail out rather than keep writing to a dead connection
    if (state == STATE_CLOSED) {
      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
      return -EINTR; // close enough
    }

    len -= r;
    if (len == 0) break;

    // hrmph. trim r bytes off the front of our message.
    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
    while (r > 0) {
      if (msg->msg_iov[0].iov_len <= (size_t)r) {
	// lose this whole item: the kernel consumed it entirely
	//ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
	r -= msg->msg_iov[0].iov_len;
	msg->msg_iov++;
	msg->msg_iovlen--;
      } else {
	// partial! advance the base pointer within this iovec entry
	//ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
	msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
	msg->msg_iov[0].iov_len -= r;
	break;
      }
    }
  }
  return 0;
}
2337
2338
2339int Pipe::write_ack(uint64_t seq)
2340{
2341 ldout(msgr->cct,10) << "write_ack " << seq << dendl;
2342
2343 char c = CEPH_MSGR_TAG_ACK;
2344 ceph_le64 s;
2345 s = seq;
2346
2347 struct msghdr msg;
2348 memset(&msg, 0, sizeof(msg));
2349 struct iovec msgvec[2];
2350 msgvec[0].iov_base = &c;
2351 msgvec[0].iov_len = 1;
2352 msgvec[1].iov_base = &s;
2353 msgvec[1].iov_len = sizeof(s);
2354 msg.msg_iov = msgvec;
2355 msg.msg_iovlen = 2;
2356
2357 if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
2358 return -1;
2359 return 0;
2360}
2361
2362int Pipe::write_keepalive()
2363{
2364 ldout(msgr->cct,10) << "write_keepalive" << dendl;
2365
2366 char c = CEPH_MSGR_TAG_KEEPALIVE;
2367
2368 struct msghdr msg;
2369 memset(&msg, 0, sizeof(msg));
2370 struct iovec msgvec[2];
2371 msgvec[0].iov_base = &c;
2372 msgvec[0].iov_len = 1;
2373 msg.msg_iov = msgvec;
2374 msg.msg_iovlen = 1;
2375
2376 if (do_sendmsg(&msg, 1) < 0)
2377 return -1;
2378 return 0;
2379}
2380
2381int Pipe::write_keepalive2(char tag, const utime_t& t)
2382{
2383 ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
2384 struct ceph_timespec ts;
2385 t.encode_timeval(&ts);
2386 struct msghdr msg;
2387 memset(&msg, 0, sizeof(msg));
2388 struct iovec msgvec[2];
2389 msgvec[0].iov_base = &tag;
2390 msgvec[0].iov_len = 1;
2391 msgvec[1].iov_base = &ts;
2392 msgvec[1].iov_len = sizeof(ts);
2393 msg.msg_iov = msgvec;
2394 msg.msg_iovlen = 2;
2395
2396 if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
2397 return -1;
2398 return 0;
2399}
2400
2401
// Send one fully-encoded message: tag byte, envelope (header), payload
// (blist = front+middle+data already concatenated by the caller), then
// footer.  Pieces are gathered into the preallocated per-pipe msgvec
// iovec array and flushed via do_sendmsg(), restarting the array
// whenever it nears SM_IOV_MAX.  Returns 0 on success, -1 on any send
// failure.
int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
{
  int ret;

  // set up msghdr and iovecs
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = msgvec;
  int msglen = 0;

  // send tag
  char tag = CEPH_MSGR_TAG_MSG;
  msgvec[msg.msg_iovlen].iov_base = &tag;
  msgvec[msg.msg_iovlen].iov_len = 1;
  msglen++;
  msg.msg_iovlen++;

  // send envelope; peers without NOSRCADDR get the old header layout
  // with the src address filled in and a recomputed crc.  oldheader is
  // declared out here so it outlives the sendmsg below.
  ceph_msg_header_old oldheader;
  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
    msgvec[msg.msg_iovlen].iov_base = (char*)&header;
    msgvec[msg.msg_iovlen].iov_len = sizeof(header);
    msglen += sizeof(header);
    msg.msg_iovlen++;
  } else {
    memcpy(&oldheader, &header, sizeof(header));
    oldheader.src.name = header.src;
    oldheader.src.addr = connection_state->get_peer_addr();
    oldheader.orig_src = oldheader.src;
    oldheader.reserved = header.reserved;
    if (msgr->crcflags & MSG_CRC_HEADER) {
	oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
				    sizeof(oldheader) - sizeof(oldheader.crc));
    } else {
	oldheader.crc = 0;
    }
    msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
    msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
    msglen += sizeof(oldheader);
    msg.msg_iovlen++;
  }

  // payload (front+data)
  list<bufferptr>::const_iterator pb = blist.buffers().begin();
  unsigned b_off = 0;  // carry-over buffer offset, if any
  unsigned bl_pos = 0; // blist pos
  unsigned left = blist.length();

  while (left > 0) {
    unsigned donow = MIN(left, pb->length()-b_off);
    if (donow == 0) {
      ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
	      << " b_off " << b_off << dendl;
    }
    assert(donow > 0);
    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
	     << " leftinchunk " << left
	     << " buffer len " << pb->length()
	     << " writing " << donow
	     << dendl;

    // flush before the iovec array fills; keep two slots spare so the
    // footer (and tag) always fit without another flush check
    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
      if (do_sendmsg(&msg, msglen, true))
	goto fail;

      // and restart the iov
      msg.msg_iov = msgvec;
      msg.msg_iovlen = 0;
      msglen = 0;
    }

    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
    msgvec[msg.msg_iovlen].iov_len = donow;
    msglen += donow;
    msg.msg_iovlen++;

    assert(left >= donow);
    left -= donow;
    b_off += donow;
    bl_pos += donow;
    if (left == 0)
      break;
    // advance to the next buffer in the list once this one is consumed
    while (b_off == pb->length()) {
      ++pb;
      b_off = 0;
    }
  }
  assert(left == 0);

  // send footer; if receiver doesn't support signatures, use the old footer format

  ceph_msg_footer_old old_footer;
  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
    msglen += sizeof(footer);
    msg.msg_iovlen++;
  } else {
    // old footer: crcs only when the corresponding crc flag is enabled
    if (msgr->crcflags & MSG_CRC_HEADER) {
      old_footer.front_crc = footer.front_crc;
      old_footer.middle_crc = footer.middle_crc;
    } else {
	old_footer.front_crc = old_footer.middle_crc = 0;
    }
    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
    old_footer.flags = footer.flags;
    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
    msglen += sizeof(old_footer);
    msg.msg_iovlen++;
  }

  // send
  if (do_sendmsg(&msg, msglen))
    goto fail;

  ret = 0;

 out:
  return ret;

 fail:
  ret = -1;
  goto out;
}
2527
2528
2529int Pipe::tcp_read(char *buf, unsigned len)
2530{
2531 if (sd < 0)
2532 return -EINVAL;
2533
2534 while (len > 0) {
2535
2536 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2537 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2538 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2539 ::shutdown(sd, SHUT_RDWR);
2540 }
2541 }
2542
2543 if (tcp_read_wait() < 0)
2544 return -1;
2545
2546 ssize_t got = tcp_read_nonblocking(buf, len);
2547
2548 if (got < 0)
2549 return -1;
2550
2551 len -= got;
2552 buf += got;
2553 //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
2554 }
2555 return 0;
2556}
2557
2558int Pipe::tcp_read_wait()
2559{
2560 if (sd < 0)
2561 return -EINVAL;
2562 struct pollfd pfd;
2563 short evmask;
2564 pfd.fd = sd;
2565 pfd.events = POLLIN;
2566#if defined(__linux__)
2567 pfd.events |= POLLRDHUP;
2568#endif
2569
2570 if (has_pending_data())
2571 return 0;
2572
2573 int r = poll(&pfd, 1, msgr->timeout);
2574 if (r < 0)
2575 return -errno;
2576 if (r == 0)
2577 return -EAGAIN;
2578
2579 evmask = POLLERR | POLLHUP | POLLNVAL;
2580#if defined(__linux__)
2581 evmask |= POLLRDHUP;
2582#endif
2583 if (pfd.revents & evmask)
2584 return -1;
2585
2586 if (!(pfd.revents & POLLIN))
2587 return -1;
2588
2589 return 0;
2590}
2591
2592ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
2593{
2594again:
2595 ssize_t got = ::recv( sd, buf, len, flags );
2596 if (got < 0) {
2597 if (errno == EINTR) {
2598 goto again;
2599 }
2600 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2601 << got << " " << cpp_strerror(errno) << dendl;
2602 return -1;
2603 }
2604 if (got == 0) {
2605 return -1;
2606 }
2607 return got;
2608}
2609
// recv() through the pipe's prefetch buffer.  Small reads pull up to
// recv_max_prefetch bytes into recv_buf and are then served from there
// (tracked by recv_ofs/recv_len); reads larger than the prefetch size
// bypass the buffer entirely.  Returns the number of bytes copied into
// buf — possibly fewer than len — or a negative value only when no
// progress was made at all.
ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
{
  size_t left = len;
  ssize_t total_recv = 0;
  // first drain whatever is still sitting in the prefetch buffer
  if (recv_len > recv_ofs) {
    int to_read = MIN(recv_len - recv_ofs, left);
    memcpy(buf, &recv_buf[recv_ofs], to_read);
    recv_ofs += to_read;
    left -= to_read;
    if (left == 0) {
      return to_read;
    }
    buf += to_read;
    total_recv += to_read;
  }

  /* nothing left in the prefetch buffer */

  if (left > recv_max_prefetch) {
    /* this was a large read, we don't prefetch for these */
    ssize_t ret = do_recv(buf, left, flags );
    if (ret < 0) {
      // report partial progress in preference to the error
      if (total_recv > 0)
        return total_recv;
      return ret;
    }
    total_recv += ret;
    return total_recv;
  }


  // small read: refill the prefetch buffer, then serve from it
  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
  if (got < 0) {
    if (total_recv > 0)
      return total_recv;

    return got;
  }

  // stash anything beyond what the caller asked for; recv_ofs marks
  // how much of recv_buf has already been handed out
  recv_len = (size_t)got;
  got = MIN(left, (size_t)got);
  memcpy(buf, recv_buf, got);
  recv_ofs = got;
  total_recv += got;
  return total_recv;
}
2656
2657ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
2658{
2659 ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
2660 if (got < 0) {
2661 ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
2662 << got << " " << cpp_strerror(errno) << dendl;
2663 return -1;
2664 }
2665 if (got == 0) {
2666 /* poll() said there was data, but we didn't read any - peer
2667 * sent a FIN. Maybe POLLRDHUP signals this, but this is
2668 * standard socket behavior as documented by Stevens.
2669 */
2670 return -1;
2671 }
2672 return got;
2673}
2674
2675int Pipe::tcp_write(const char *buf, unsigned len)
2676{
2677 if (sd < 0)
2678 return -1;
2679 struct pollfd pfd;
2680 pfd.fd = sd;
2681 pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
2682#if defined(__linux__)
2683 pfd.events |= POLLRDHUP;
2684#endif
2685
2686 if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
2687 if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
2688 ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
2689 ::shutdown(sd, SHUT_RDWR);
2690 }
2691 }
2692
2693 if (poll(&pfd, 1, -1) < 0)
2694 return -1;
2695
2696 if (!(pfd.revents & POLLOUT))
2697 return -1;
2698
2699 //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
2700 assert(len > 0);
7c673cae 2701 while (len > 0) {
3efd9988
FG
2702 MSGR_SIGPIPE_STOPPER;
2703 int did = ::send( sd, buf, len, MSG_NOSIGNAL );
7c673cae
FG
2704 if (did < 0) {
2705 //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2706 //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
2707 return did;
2708 }
2709 len -= did;
2710 buf += did;
2711 //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
2712 }
7c673cae
FG
2713 return 0;
2714}