1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
16 #include "include/sock_compat.h"
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
23 #include "msg/msg_types.h"
24 #include "msg/Message.h"
28 #include "SimpleMessenger.h"
30 #include "common/debug.h"
31 #include "common/errno.h"
32 #include "common/safe_io.h"
34 #define dout_subsys ceph_subsys_ms
37 #define dout_prefix *_dout << "accepter."
40 /********************************************
// Accepter::create_selfpipe
// Creates a pipe with pipe_cloexec() and makes both of its ends non-blocking.
// For each end it reads the current flags with fcntl(F_GETFL) and writes them
// back with O_NONBLOCK added. On success selfpipe[0] is stored in *pipe_rd
// and selfpipe[1] in *pipe_wr. bind() calls this to create
// shutdown_rd_fd/shutdown_wr_fd.
// If pipe_cloexec() fails, the error is reported through lderr using e.
// NOTE(review): this extract omits several lines of the function: the
// declarations of selfpipe and e, any check of the fcntl results, and the
// return statements. The return value is therefore not documented here;
// confirm it against the full source.
44 int Accepter::create_selfpipe(int *pipe_rd
, int *pipe_wr
) {
// Create the pipe (selfpipe is presumably a 2-element int array declared
// on a line not visible here).
46 if (pipe_cloexec(selfpipe
) < 0) {
48 lderr(msgr
->cct
) << __func__
<< " unable to create the selfpipe: "
49 << cpp_strerror(e
) << dendl
;
// Switch both pipe ends to non-blocking mode: fetch the flags, then OR in
// O_NONBLOCK.
52 for (size_t i
= 0; i
< 2; i
++) {
53 int rc
= fcntl(selfpipe
[i
], F_GETFL
);
// NOTE(review): the line between F_GETFL and F_SETFL is not visible. If a
// failed F_GETFL (rc == -1) is not rejected there, then rc | O_NONBLOCK
// passes an all-bits-set flag word to F_SETFL. Confirm rc is checked.
55 rc
= fcntl(selfpipe
[i
], F_SETFL
, rc
| O_NONBLOCK
);
// Hand selfpipe[0] out as the read end and selfpipe[1] as the write end.
58 *pipe_rd
= selfpipe
[0];
59 *pipe_wr
= selfpipe
[1];
// Accepter::bind
// Creates the listening socket (listen_sd), binds it, starts listening, and
// publishes the resulting address to the messenger.
//
// Parameters:
//   bind_addr   - requested address. If its type is TYPE_NONE it is bound as
//                 TYPE_LEGACY. It may carry a specific port, or none (0).
//   avoid_ports - ports to skip while scanning the configured port range.
//
// Flow visible in this extract:
//   1. Choose the socket family. It comes either from bind_addr's family
//      (depending on the switch, whose case labels are not visible) or from
//      conf->ms_bind_ipv6 (AF_INET6 vs AF_INET). Then create the socket with
//      socket_cloexec().
//   2. Up to conf->ms_bind_retry_count times:
//      - If listen_addr has a port, set SO_REUSEADDR and bind exactly that
//        address.
//      - Otherwise try each port from ms_bind_port_min through
//        ms_bind_port_max, skipping avoid_ports.
//      Between attempts a retry message is logged and the thread sleeps
//      ms_bind_retry_delay seconds.
//   3. Read back the bound address with getsockname(). If ms_tcp_rcvbuf is
//      non-zero, set SO_RCVBUF. Then call listen() with
//      ms_tcp_listen_backlog.
//   4. Update the messenger's address: set_myaddr(), learned_addr() when
//      bind_addr is not default-constructed, and set_myaddr(listen_addr)
//      when no port was known. Then initialise the local connection.
//   5. Create the shutdown self-pipe (shutdown_rd_fd / shutdown_wr_fd) that
//      stop() writes to and entry() polls on.
// NOTE(review): the error-path guards and return statements are on lines
// missing from this extract. The return value is not documented here.
63 int Accepter::bind(const entity_addr_t
&bind_addr
, const set
<int>& avoid_ports
)
65 const md_config_t
*conf
= msgr
->cct
->_conf
;
67 ldout(msgr
->cct
,10) << __func__
<< dendl
;
// Select the address family for the listening socket.
70 switch (bind_addr
.get_family()) {
73 family
= bind_addr
.get_family();
78 family
= conf
->ms_bind_ipv6
? AF_INET6
: AF_INET
;
// Create the TCP listening socket.
82 listen_sd
= socket_cloexec(family
, SOCK_STREAM
, 0);
85 lderr(msgr
->cct
) << __func__
<< " unable to create socket: "
86 << cpp_strerror(e
) << dendl
;
89 ldout(msgr
->cct
,10) << __func__
<< " socket sd: " << listen_sd
<< dendl
;
91 // use whatever user specified (if anything)
92 entity_addr_t listen_addr
= bind_addr
;
// An unset address type is bound as the legacy type.
93 if (listen_addr
.get_type() == entity_addr_t::TYPE_NONE
) {
94 listen_addr
.set_type(entity_addr_t::TYPE_LEGACY
);
96 listen_addr
.set_family(family
);
// Retry loop: at most conf->ms_bind_retry_count bind attempts.
102 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
// NOTE(review): the condition guarding this retry message and sleep is not
// visible. Presumably it skips the first iteration; confirm.
105 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
106 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
107 sleep(conf
->ms_bind_retry_delay
);
// A specific port was requested: bind exactly listen_addr.
110 if (listen_addr
.get_port()) {
113 // reuse addr+port when possible
115 rc
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
));
117 lderr(msgr
->cct
) << __func__
<< " unable to setsockopt: "
118 << cpp_strerror(errno
) << dendl
;
123 rc
= ::bind(listen_sd
, listen_addr
.get_sockaddr(),
124 listen_addr
.get_sockaddr_len());
126 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
127 << ": " << cpp_strerror(errno
) << dendl
;
132 // try a range of ports
// Ports listed in avoid_ports are skipped.
133 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
;
134 port
<= msgr
->cct
->_conf
->ms_bind_port_max
; port
++) {
135 if (avoid_ports
.count(port
))
138 listen_addr
.set_port(port
);
139 rc
= ::bind(listen_sd
, listen_addr
.get_sockaddr(),
140 listen_addr
.get_sockaddr_len());
// Presumably reached when no port in the range could be bound (the
// surrounding condition is not visible).
145 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
146 << " on any port in range " << msgr
->cct
->_conf
->ms_bind_port_min
147 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
148 << ": " << cpp_strerror(errno
)
151 // Clear port before retry, otherwise we shall fail again.
152 listen_addr
.set_port(0);
155 ldout(msgr
->cct
,10) << __func__
<< " bound on random port "
156 << listen_addr
<< dendl
;
163 // It seems that binding completely failed, return with that exit status
// NOTE(review): errno is read here after other calls (e.g. the lderr
// logging above) may have run since the failing ::bind. It may not hold
// bind's error any more; consider saving errno right after ::bind fails.
165 lderr(msgr
->cct
) << __func__
<< " was unable to bind after "
166 << conf
->ms_bind_retry_count
<< " attempts: "
167 << cpp_strerror(errno
) << dendl
;
173 // what port did we get?
// Read back the address actually bound so listen_addr reflects the real port.
175 socklen_t llen
= sizeof(ss
);
176 rc
= getsockname(listen_sd
, (sockaddr
*)&ss
, &llen
);
// NOTE(review): the message is built from rc, not errno. getsockname()
// returns -1 on failure, so unless rc is reassigned (e.g. to -errno) on a
// line not visible here, this reports a fixed error, not the real cause.
179 lderr(msgr
->cct
) << __func__
<< " failed getsockname: "
180 << cpp_strerror(rc
) << dendl
;
185 listen_addr
.set_sockaddr((sockaddr
*)&ss
);
// Set the receive buffer size only when ms_tcp_rcvbuf is non-zero.
187 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
188 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
189 rc
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_RCVBUF
,
190 (void*)&size
, sizeof(size
));
// NOTE(review): same rc-vs-errno concern as for getsockname above.
193 lderr(msgr
->cct
) << __func__
<< " failed to set SO_RCVBUF to "
194 << size
<< ": " << cpp_strerror(rc
) << dendl
;
201 ldout(msgr
->cct
,10) << __func__
<< " bound to " << listen_addr
<< dendl
;
// Start listening with the configured backlog.
204 rc
= ::listen(listen_sd
, msgr
->cct
->_conf
->ms_tcp_listen_backlog
);
// NOTE(review): same rc-vs-errno concern; listen() returns -1 on failure.
207 lderr(msgr
->cct
) << __func__
<< " unable to listen on " << listen_addr
208 << ": " << cpp_strerror(rc
) << dendl
;
// Record bind_addr as the messenger's address. Report it via learned_addr()
// only when it differs from a default-constructed entity_addr_t.
214 msgr
->set_myaddr(bind_addr
);
215 if (bind_addr
!= entity_addr_t())
216 msgr
->learned_addr(bind_addr
);
218 assert(msgr
->get_need_addr()); // should still be true.
// If the messenger's address has no port yet, adopt the bound listen_addr.
220 if (msgr
->get_myaddr().get_port() == 0) {
221 msgr
->set_myaddr(listen_addr
);
// Re-read the messenger's address and store it back. Any adjustment made in
// between is on a line not visible here.
223 entity_addr_t addr
= msgr
->get_myaddr();
225 msgr
->set_myaddr(addr
);
227 msgr
->init_local_connection();
// Create the self-pipe: stop() writes to shutdown_wr_fd to wake entry(),
// which polls shutdown_rd_fd.
229 rc
= create_selfpipe(&shutdown_rd_fd
, &shutdown_wr_fd
);
231 lderr(msgr
->cct
) << __func__
<< " unable to create signalling pipe " << listen_addr
232 << ": " << cpp_strerror(rc
) << dendl
;
236 ldout(msgr
->cct
,1) << __func__
<< " my_inst.addr is " << msgr
->get_myaddr()
237 << " need_addr=" << msgr
->get_need_addr() << dendl
;
// Accepter::rebind
// Binds again, avoiding the port currently in use.
// It starts from the messenger's current address and builds new_avoid as
// avoid_ports plus that address's port. It assigns `nonce` to
// msgr->my_inst.addr.nonce so the new entity_addr_t differs from the old
// one, then calls bind(addr, new_avoid).
// Parameter:
//   avoid_ports - additional ports that must not be chosen.
// NOTE(review): the lines that update `nonce`, and the handling and return
// of bind()'s result r, are not visible in this extract.
241 int Accepter::rebind(const set
<int>& avoid_ports
)
243 ldout(msgr
->cct
,1) << __func__
<< " avoid " << avoid_ports
<< dendl
;
// Snapshot of the current address. It is taken before the nonce update
// below, so it still carries the previous nonce. bind() is presumably
// expected to re-apply the accepter's nonce; confirm.
245 entity_addr_t addr
= msgr
->get_myaddr();
// Never re-pick the port we are moving away from.
246 set
<int> new_avoid
= avoid_ports
;
247 new_avoid
.insert(addr
.get_port());
250 // adjust the nonce; we want our entity_addr_t to be truly unique.
252 msgr
->my_inst
.addr
.nonce
= nonce
;
253 ldout(msgr
->cct
,10) << __func__
<< " new nonce " << nonce
<< " and inst "
254 << msgr
->my_inst
<< dendl
;
256 ldout(msgr
->cct
,10) << " will try " << addr
<< " and avoid ports " << new_avoid
<< dendl
;
257 int r
= bind(addr
, new_avoid
);
// Accepter::start
// Logs the call, then calls create("ms_accepter").
// Presumably create() is Ceph's Thread::create, which launches a thread
// named "ms_accepter" running entry(). Confirm that Accepter derives from
// Thread. The return statement is not visible in this extract.
263 int Accepter::start()
265 ldout(msgr
->cct
,1) << __func__
<< dendl
;
268 create("ms_accepter");
// Accepter::entry
// Accept loop, presumably the body of the thread started by start().
// It polls two descriptors with no timeout:
//   pfd[0] = listen_sd       - incoming connections,
//   pfd[1] = shutdown_rd_fd  - the self-pipe that stop() writes one byte to.
// Any POLLIN/POLLERR/POLLNVAL/POLLHUP on the self-pipe is treated as a
// request to exit, and the pipe is drained by one read(). Otherwise
// accept_cloexec() takes the pending connection and passes the new sd to
// msgr->add_accept_pipe(). Failed accepts increment `errors`. When the count
// exceeds ms_max_accept_failures, an lderr message announces ceph_abort().
// On exit only shutdown_rd_fd is closed here; listen_sd is closed by stop()
// after the thread has joined.
// NOTE(review): the enclosing loop, its exit condition, break/continue
// statements, several guards and the return are not visible in this extract.
273 void *Accepter::entry()
275 ldout(msgr
->cct
,1) << __func__
<< " start" << dendl
;
// Slot 0 watches the listening socket; slot 1 watches the shutdown self-pipe.
// Per POSIX, POLLERR/POLLHUP/POLLNVAL are always reported in revents, so
// listing them in events is harmless but has no effect.
280 struct pollfd pfd
[2];
281 memset(pfd
, 0, sizeof(pfd
));
283 pfd
[0].fd
= listen_sd
;
284 pfd
[0].events
= POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
;
285 pfd
[1].fd
= shutdown_rd_fd
;
286 pfd
[1].events
= POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
;
288 ldout(msgr
->cct
,20) << __func__
<< " calling poll for sd:" << listen_sd
<< dendl
;
// Block indefinitely (timeout -1) until one of the two fds is ready.
289 int r
= poll(pfd
, 2, -1);
// Presumably inside an `r < 0` check on a line not visible here: EINTR is
// handled separately from other poll failures, which are logged below.
291 if (errno
== EINTR
) {
294 ldout(msgr
->cct
,1) << __func__
<< " poll got error"
295 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
// NOTE(review): "oke" in this log text looks like a typo for "ok". It is
// left unchanged because it is runtime output.
298 ldout(msgr
->cct
,10) << __func__
<< " poll returned oke: " << r
<< dendl
;
299 ldout(msgr
->cct
,20) << __func__
<< " pfd.revents[0]=" << pfd
[0].revents
<< dendl
;
300 ldout(msgr
->cct
,20) << __func__
<< " pfd.revents[1]=" << pfd
[1].revents
<< dendl
;
// Error conditions reported on the listening socket are logged.
302 if (pfd
[0].revents
& (POLLERR
| POLLNVAL
| POLLHUP
)) {
303 ldout(msgr
->cct
,1) << __func__
<< " poll got errors in revents "
304 << pfd
[0].revents
<< dendl
;
// Any event on the self-pipe means stop() asked us to exit.
307 if (pfd
[1].revents
& (POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
)) {
308 // We got "signaled" to exit the poll
309 // clean the selfpipe
310 if (::read(shutdown_rd_fd
, &ch
, 1) == -1) {
312 ldout(msgr
->cct
,1) << __func__
<< " Cannot read selfpipe: "
313 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
// Accept the pending connection. The peer address is written into ss.
321 socklen_t slen
= sizeof(ss
);
322 int sd
= accept_cloexec(listen_sd
, (sockaddr
*)&ss
, &slen
);
325 ldout(msgr
->cct
,10) << __func__
<< " incoming on sd " << sd
<< dendl
;
// Hand the accepted socket to the messenger.
327 msgr
->add_accept_pipe(sd
);
// Accept failure path. e presumably holds errno saved right after
// accept_cloexec(); its assignment is not visible here.
330 ldout(msgr
->cct
,0) << __func__
<< " no incoming connection? sd = " << sd
331 << " errno " << e
<< " " << cpp_strerror(e
) << dendl
;
332 if (++errors
> msgr
->cct
->_conf
->ms_max_accept_failures
) {
// NOTE(review): the log text misspells "accepter" and "encountered". It is
// left unchanged because it is runtime output.
333 lderr(msgr
->cct
) << "accetper has encoutered enough errors, just do ceph_abort()." << dendl
;
339 ldout(msgr
->cct
,20) << __func__
<< " closing" << dendl
;
340 // socket is closed right after the thread has joined.
341 // closing it here might race
// (The comment above refers to listen_sd, which stop() closes after join.
// Only the self-pipe read end is closed here.)
// NOTE(review): stop() also closes shutdown_rd_fd when it is >= 0. Unless
// shutdown_rd_fd is reset to -1 after this close (on a line not visible
// here), the same fd number could be closed twice, possibly after reuse.
342 if (shutdown_rd_fd
>= 0) {
343 ::close(shutdown_rd_fd
);
347 ldout(msgr
->cct
,10) << __func__
<< " stopping" << dendl
;
351 void Accepter::stop()
354 ldout(msgr
->cct
,10) << __func__
<< " accept listening on: " << listen_sd
<< dendl
;
356 if (shutdown_wr_fd
< 0)
359 // Send a byte to the shutdown pipe that the thread is listening to
360 char buf
[1] = { 0x0 };
361 int ret
= safe_write(shutdown_wr_fd
, buf
, 1);
363 ldout(msgr
->cct
,1) << __func__
<< "close failed: "
364 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
366 ldout(msgr
->cct
,15) << __func__
<< " signaled poll" << dendl
;
368 VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd
));
371 // wait for thread to stop before closing the socket, to avoid
372 // racing against fd re-use.
374 ldout(msgr
->cct
,5) << __func__
<< " wait for thread to join." << dendl
;
378 if (listen_sd
>= 0) {
379 if (::close(listen_sd
) < 0) {
380 ldout(msgr
->cct
,1) << __func__
<< "close listen_sd failed: "
381 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
385 if (shutdown_rd_fd
>= 0) {
386 if (::close(shutdown_rd_fd
) < 0) {
387 ldout(msgr
->cct
,1) << __func__
<< "close shutdown_rd_fd failed: "
388 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;