1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/compat.h"
16 #include <sys/socket.h>
17 #include <netinet/tcp.h>
22 #include "msg/msg_types.h"
23 #include "msg/Message.h"
27 #include "SimpleMessenger.h"
29 #include "common/debug.h"
30 #include "common/errno.h"
31 #include "common/safe_io.h"
33 #define dout_subsys ceph_subsys_ms
36 #define dout_prefix *_dout << "accepter."
39 /********************************************
43 static int set_close_on_exec(int fd
)
45 int flags
= fcntl(fd
, F_GETFD
, 0);
49 if (fcntl(fd
, F_SETFD
, flags
| FD_CLOEXEC
)) {
55 int Accepter::create_selfpipe(int *pipe_rd
, int *pipe_wr
) {
57 int ret
= ::pipe2(selfpipe
, (O_CLOEXEC
|O_NONBLOCK
));
59 lderr(msgr
->cct
) << __func__
<< " unable to create the selfpipe: "
60 << cpp_strerror(errno
) << dendl
;
63 *pipe_rd
= selfpipe
[0];
64 *pipe_wr
= selfpipe
[1];
68 int Accepter::bind(const entity_addr_t
&bind_addr
, const set
<int>& avoid_ports
)
70 const md_config_t
*conf
= msgr
->cct
->_conf
;
72 ldout(msgr
->cct
,10) << __func__
<< dendl
;
75 switch (bind_addr
.get_family()) {
78 family
= bind_addr
.get_family();
83 family
= conf
->ms_bind_ipv6
? AF_INET6
: AF_INET
;
87 listen_sd
= ::socket(family
, SOCK_STREAM
, 0);
88 ldout(msgr
->cct
,10) << __func__
<< " socket sd: " << listen_sd
<< dendl
;
90 lderr(msgr
->cct
) << __func__
<< " unable to create socket: "
91 << cpp_strerror(errno
) << dendl
;
95 if (set_close_on_exec(listen_sd
)) {
96 lderr(msgr
->cct
) << __func__
<< " unable to set_close_exec(): "
97 << cpp_strerror(errno
) << dendl
;
101 // use whatever user specified (if anything)
102 entity_addr_t listen_addr
= bind_addr
;
103 if (listen_addr
.get_type() == entity_addr_t::TYPE_NONE
) {
104 listen_addr
.set_type(entity_addr_t::TYPE_LEGACY
);
106 listen_addr
.set_family(family
);
112 for (int i
= 0; i
< conf
->ms_bind_retry_count
; i
++) {
115 lderr(msgr
->cct
) << __func__
<< " was unable to bind. Trying again in "
116 << conf
->ms_bind_retry_delay
<< " seconds " << dendl
;
117 sleep(conf
->ms_bind_retry_delay
);
120 if (listen_addr
.get_port()) {
123 // reuse addr+port when possible
125 rc
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_REUSEADDR
, &on
, sizeof(on
));
127 lderr(msgr
->cct
) << __func__
<< " unable to setsockopt: "
128 << cpp_strerror(errno
) << dendl
;
133 rc
= ::bind(listen_sd
, listen_addr
.get_sockaddr(),
134 listen_addr
.get_sockaddr_len());
136 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
137 << ": " << cpp_strerror(errno
) << dendl
;
142 // try a range of ports
143 for (int port
= msgr
->cct
->_conf
->ms_bind_port_min
;
144 port
<= msgr
->cct
->_conf
->ms_bind_port_max
; port
++) {
145 if (avoid_ports
.count(port
))
148 listen_addr
.set_port(port
);
149 rc
= ::bind(listen_sd
, listen_addr
.get_sockaddr(),
150 listen_addr
.get_sockaddr_len());
155 lderr(msgr
->cct
) << __func__
<< " unable to bind to " << listen_addr
156 << " on any port in range " << msgr
->cct
->_conf
->ms_bind_port_min
157 << "-" << msgr
->cct
->_conf
->ms_bind_port_max
158 << ": " << cpp_strerror(errno
)
161 // Clear port before retry, otherwise we shall fail again.
162 listen_addr
.set_port(0);
165 ldout(msgr
->cct
,10) << __func__
<< " bound on random port "
166 << listen_addr
<< dendl
;
173 // It seems that binding completely failed, return with that exit status
175 lderr(msgr
->cct
) << __func__
<< " was unable to bind after "
176 << conf
->ms_bind_retry_count
<< " attempts: "
177 << cpp_strerror(errno
) << dendl
;
183 // what port did we get?
185 socklen_t llen
= sizeof(ss
);
186 rc
= getsockname(listen_sd
, (sockaddr
*)&ss
, &llen
);
189 lderr(msgr
->cct
) << __func__
<< " failed getsockname: "
190 << cpp_strerror(rc
) << dendl
;
195 listen_addr
.set_sockaddr((sockaddr
*)&ss
);
197 if (msgr
->cct
->_conf
->ms_tcp_rcvbuf
) {
198 int size
= msgr
->cct
->_conf
->ms_tcp_rcvbuf
;
199 rc
= ::setsockopt(listen_sd
, SOL_SOCKET
, SO_RCVBUF
,
200 (void*)&size
, sizeof(size
));
203 lderr(msgr
->cct
) << __func__
<< " failed to set SO_RCVBUF to "
204 << size
<< ": " << cpp_strerror(rc
) << dendl
;
211 ldout(msgr
->cct
,10) << __func__
<< " bound to " << listen_addr
<< dendl
;
214 rc
= ::listen(listen_sd
, 128);
217 lderr(msgr
->cct
) << __func__
<< " unable to listen on " << listen_addr
218 << ": " << cpp_strerror(rc
) << dendl
;
224 msgr
->set_myaddr(bind_addr
);
225 if (bind_addr
!= entity_addr_t())
226 msgr
->learned_addr(bind_addr
);
228 assert(msgr
->get_need_addr()); // should still be true.
230 if (msgr
->get_myaddr().get_port() == 0) {
231 msgr
->set_myaddr(listen_addr
);
233 entity_addr_t addr
= msgr
->get_myaddr();
235 msgr
->set_myaddr(addr
);
237 msgr
->init_local_connection();
239 rc
= create_selfpipe(&shutdown_rd_fd
, &shutdown_wr_fd
);
241 lderr(msgr
->cct
) << __func__
<< " unable to create signalling pipe " << listen_addr
242 << ": " << cpp_strerror(rc
) << dendl
;
246 ldout(msgr
->cct
,1) << __func__
<< " my_inst.addr is " << msgr
->get_myaddr()
247 << " need_addr=" << msgr
->get_need_addr() << dendl
;
251 int Accepter::rebind(const set
<int>& avoid_ports
)
253 ldout(msgr
->cct
,1) << __func__
<< " avoid " << avoid_ports
<< dendl
;
255 entity_addr_t addr
= msgr
->get_myaddr();
256 set
<int> new_avoid
= avoid_ports
;
257 new_avoid
.insert(addr
.get_port());
260 // adjust the nonce; we want our entity_addr_t to be truly unique.
262 msgr
->my_inst
.addr
.nonce
= nonce
;
263 ldout(msgr
->cct
,10) << __func__
<< " new nonce " << nonce
<< " and inst "
264 << msgr
->my_inst
<< dendl
;
266 ldout(msgr
->cct
,10) << " will try " << addr
<< " and avoid ports " << new_avoid
<< dendl
;
267 int r
= bind(addr
, new_avoid
);
273 int Accepter::start()
275 ldout(msgr
->cct
,1) << __func__
<< dendl
;
278 create("ms_accepter");
283 void *Accepter::entry()
285 ldout(msgr
->cct
,1) << __func__
<< " start" << dendl
;
290 struct pollfd pfd
[2];
291 memset(pfd
, 0, sizeof(pfd
));
293 pfd
[0].fd
= listen_sd
;
294 pfd
[0].events
= POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
;
295 pfd
[1].fd
= shutdown_rd_fd
;
296 pfd
[1].events
= POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
;
298 ldout(msgr
->cct
,20) << __func__
<< " calling poll for sd:" << listen_sd
<< dendl
;
299 int r
= poll(pfd
, 2, -1);
301 if (errno
== EINTR
) {
304 ldout(msgr
->cct
,1) << __func__
<< " poll got error"
305 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
308 ldout(msgr
->cct
,10) << __func__
<< " poll returned oke: " << r
<< dendl
;
309 ldout(msgr
->cct
,20) << __func__
<< " pfd.revents[0]=" << pfd
[0].revents
<< dendl
;
310 ldout(msgr
->cct
,20) << __func__
<< " pfd.revents[1]=" << pfd
[1].revents
<< dendl
;
312 if (pfd
[0].revents
& (POLLERR
| POLLNVAL
| POLLHUP
)) {
313 ldout(msgr
->cct
,1) << __func__
<< " poll got errors in revents "
314 << pfd
[0].revents
<< dendl
;
317 if (pfd
[1].revents
& (POLLIN
| POLLERR
| POLLNVAL
| POLLHUP
)) {
318 // We got "signaled" to exit the poll
319 // clean the selfpipe
320 if (::read(shutdown_rd_fd
, &ch
, 1) == -1) {
322 ldout(msgr
->cct
,1) << __func__
<< " Cannot read selfpipe: "
323 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
331 socklen_t slen
= sizeof(ss
);
332 int sd
= ::accept(listen_sd
, (sockaddr
*)&ss
, &slen
);
334 int r
= set_close_on_exec(sd
);
336 ldout(msgr
->cct
,1) << __func__
<< " set_close_on_exec() failed "
337 << cpp_strerror(r
) << dendl
;
340 ldout(msgr
->cct
,10) << __func__
<< " incoming on sd " << sd
<< dendl
;
342 msgr
->add_accept_pipe(sd
);
344 ldout(msgr
->cct
,0) << __func__
<< " no incoming connection? sd = " << sd
345 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
351 ldout(msgr
->cct
,20) << __func__
<< " closing" << dendl
;
352 // socket is closed right after the thread has joined.
353 // closing it here might race
354 if (shutdown_rd_fd
>= 0) {
355 ::close(shutdown_rd_fd
);
359 ldout(msgr
->cct
,10) << __func__
<< " stopping" << dendl
;
363 void Accepter::stop()
366 ldout(msgr
->cct
,10) << __func__
<< " accept listening on: " << listen_sd
<< dendl
;
368 if (shutdown_wr_fd
< 0)
371 // Send a byte to the shutdown pipe that the thread is listening to
372 char buf
[1] = { 0x0 };
373 int ret
= safe_write(shutdown_wr_fd
, buf
, 1);
375 ldout(msgr
->cct
,1) << __func__
<< "close failed: "
376 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
378 ldout(msgr
->cct
,15) << __func__
<< " signaled poll" << dendl
;
380 VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd
));
383 // wait for thread to stop before closing the socket, to avoid
384 // racing against fd re-use.
386 ldout(msgr
->cct
,5) << __func__
<< " wait for thread to join." << dendl
;
390 if (listen_sd
>= 0) {
391 if (::close(listen_sd
) < 0) {
392 ldout(msgr
->cct
,1) << __func__
<< "close listen_sd failed: "
393 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;
397 if (shutdown_rd_fd
>= 0) {
398 if (::close(shutdown_rd_fd
) < 0) {
399 ldout(msgr
->cct
,1) << __func__
<< "close shutdown_rd_fd failed: "
400 << " errno " << errno
<< " " << cpp_strerror(errno
) << dendl
;