]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/simple/Accepter.cc
16e6dfbfa31b00cf24906c2bd9b88614044269e4
[ceph.git] / ceph / src / msg / simple / Accepter.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "include/compat.h"
16 #include <sys/socket.h>
17 #include <netinet/tcp.h>
18 #include <sys/uio.h>
19 #include <limits.h>
20 #include <poll.h>
21
22 #include "msg/msg_types.h"
23 #include "msg/Message.h"
24
25 #include "Accepter.h"
26 #include "Pipe.h"
27 #include "SimpleMessenger.h"
28
29 #include "common/debug.h"
30 #include "common/errno.h"
31 #include "common/safe_io.h"
32
33 #define dout_subsys ceph_subsys_ms
34
35 #undef dout_prefix
36 #define dout_prefix *_dout << "accepter."
37
38
39 /********************************************
40 * Accepter
41 */
42
/*
 * Mark a descriptor close-on-exec while preserving its other descriptor
 * flags.
 *
 * Returns 0 on success, or the positive errno value left by the failing
 * fcntl() call.
 */
static int set_close_on_exec(int fd)
{
  const int current = fcntl(fd, F_GETFD, 0);
  if (current < 0)
    return errno;
  // fcntl(F_SETFD) yields -1 on failure, so any non-zero result is an error.
  return fcntl(fd, F_SETFD, current | FD_CLOEXEC) ? errno : 0;
}
54
55 int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
56 int selfpipe[2];
57 int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK));
58 if (ret < 0 ) {
59 lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
60 << cpp_strerror(errno) << dendl;
61 return -errno;
62 }
63 *pipe_rd = selfpipe[0];
64 *pipe_wr = selfpipe[1];
65 return 0;
66 }
67
68 int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
69 {
70 const md_config_t *conf = msgr->cct->_conf;
71 // bind to a socket
72 ldout(msgr->cct,10) << __func__ << dendl;
73
74 int family;
75 switch (bind_addr.get_family()) {
76 case AF_INET:
77 case AF_INET6:
78 family = bind_addr.get_family();
79 break;
80
81 default:
82 // bind_addr is empty
83 family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
84 }
85
86 /* socket creation */
87 listen_sd = ::socket(family, SOCK_STREAM, 0);
88 ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl;
89 if (listen_sd < 0) {
90 lderr(msgr->cct) << __func__ << " unable to create socket: "
91 << cpp_strerror(errno) << dendl;
92 return -errno;
93 }
94
95 if (set_close_on_exec(listen_sd)) {
96 lderr(msgr->cct) << __func__ << " unable to set_close_exec(): "
97 << cpp_strerror(errno) << dendl;
98 }
99
100
101 // use whatever user specified (if anything)
102 entity_addr_t listen_addr = bind_addr;
103 if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
104 listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
105 }
106 listen_addr.set_family(family);
107
108 /* bind to port */
109 int rc = -1;
110 int r = -1;
111
112 for (int i = 0; i < conf->ms_bind_retry_count; i++) {
113
114 if (i > 0) {
115 lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
116 << conf->ms_bind_retry_delay << " seconds " << dendl;
117 sleep(conf->ms_bind_retry_delay);
118 }
119
120 if (listen_addr.get_port()) {
121 // specific port
122
123 // reuse addr+port when possible
124 int on = 1;
125 rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
126 if (rc < 0) {
127 lderr(msgr->cct) << __func__ << " unable to setsockopt: "
128 << cpp_strerror(errno) << dendl;
129 r = -errno;
130 continue;
131 }
132
133 rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
134 listen_addr.get_sockaddr_len());
135 if (rc < 0) {
136 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
137 << ": " << cpp_strerror(errno) << dendl;
138 r = -errno;
139 continue;
140 }
141 } else {
142 // try a range of ports
143 for (int port = msgr->cct->_conf->ms_bind_port_min;
144 port <= msgr->cct->_conf->ms_bind_port_max; port++) {
145 if (avoid_ports.count(port))
146 continue;
147
148 listen_addr.set_port(port);
149 rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
150 listen_addr.get_sockaddr_len());
151 if (rc == 0)
152 break;
153 }
154 if (rc < 0) {
155 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
156 << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
157 << "-" << msgr->cct->_conf->ms_bind_port_max
158 << ": " << cpp_strerror(errno)
159 << dendl;
160 r = -errno;
161 // Clear port before retry, otherwise we shall fail again.
162 listen_addr.set_port(0);
163 continue;
164 }
165 ldout(msgr->cct,10) << __func__ << " bound on random port "
166 << listen_addr << dendl;
167 }
168
169 if (rc == 0)
170 break;
171 }
172
173 // It seems that binding completely failed, return with that exit status
174 if (rc < 0) {
175 lderr(msgr->cct) << __func__ << " was unable to bind after "
176 << conf->ms_bind_retry_count << " attempts: "
177 << cpp_strerror(errno) << dendl;
178 ::close(listen_sd);
179 listen_sd = -1;
180 return r;
181 }
182
183 // what port did we get?
184 sockaddr_storage ss;
185 socklen_t llen = sizeof(ss);
186 rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
187 if (rc < 0) {
188 rc = -errno;
189 lderr(msgr->cct) << __func__ << " failed getsockname: "
190 << cpp_strerror(rc) << dendl;
191 ::close(listen_sd);
192 listen_sd = -1;
193 return rc;
194 }
195 listen_addr.set_sockaddr((sockaddr*)&ss);
196
197 if (msgr->cct->_conf->ms_tcp_rcvbuf) {
198 int size = msgr->cct->_conf->ms_tcp_rcvbuf;
199 rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF,
200 (void*)&size, sizeof(size));
201 if (rc < 0) {
202 rc = -errno;
203 lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to "
204 << size << ": " << cpp_strerror(rc) << dendl;
205 ::close(listen_sd);
206 listen_sd = -1;
207 return rc;
208 }
209 }
210
211 ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl;
212
213 // listen!
214 rc = ::listen(listen_sd, 128);
215 if (rc < 0) {
216 rc = -errno;
217 lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
218 << ": " << cpp_strerror(rc) << dendl;
219 ::close(listen_sd);
220 listen_sd = -1;
221 return rc;
222 }
223
224 msgr->set_myaddr(bind_addr);
225 if (bind_addr != entity_addr_t())
226 msgr->learned_addr(bind_addr);
227 else
228 assert(msgr->get_need_addr()); // should still be true.
229
230 if (msgr->get_myaddr().get_port() == 0) {
231 msgr->set_myaddr(listen_addr);
232 }
233 entity_addr_t addr = msgr->get_myaddr();
234 addr.nonce = nonce;
235 msgr->set_myaddr(addr);
236
237 msgr->init_local_connection();
238
239 rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
240 if (rc < 0) {
241 lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr
242 << ": " << cpp_strerror(rc) << dendl;
243 return rc;
244 }
245
246 ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr()
247 << " need_addr=" << msgr->get_need_addr() << dendl;
248 return 0;
249 }
250
251 int Accepter::rebind(const set<int>& avoid_ports)
252 {
253 ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl;
254
255 entity_addr_t addr = msgr->get_myaddr();
256 set<int> new_avoid = avoid_ports;
257 new_avoid.insert(addr.get_port());
258 addr.set_port(0);
259
260 // adjust the nonce; we want our entity_addr_t to be truly unique.
261 nonce += 1000000;
262 msgr->my_inst.addr.nonce = nonce;
263 ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst "
264 << msgr->my_inst << dendl;
265
266 ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
267 int r = bind(addr, new_avoid);
268 if (r == 0)
269 start();
270 return r;
271 }
272
273 int Accepter::start()
274 {
275 ldout(msgr->cct,1) << __func__ << dendl;
276
277 // start thread
278 create("ms_accepter");
279
280 return 0;
281 }
282
283 void *Accepter::entry()
284 {
285 ldout(msgr->cct,1) << __func__ << " start" << dendl;
286
287 int errors = 0;
288 int ch;
289
290 struct pollfd pfd[2];
291 memset(pfd, 0, sizeof(pfd));
292
293 pfd[0].fd = listen_sd;
294 pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
295 pfd[1].fd = shutdown_rd_fd;
296 pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
297 while (!done) {
298 ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
299 int r = poll(pfd, 2, -1);
300 if (r < 0) {
301 if (errno == EINTR) {
302 continue;
303 }
304 ldout(msgr->cct,1) << __func__ << " poll got error"
305 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
306 break;
307 }
308 ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
309 ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl;
310 ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl;
311
312 if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
313 ldout(msgr->cct,1) << __func__ << " poll got errors in revents "
314 << pfd[0].revents << dendl;
315 break;
316 }
317 if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
318 // We got "signaled" to exit the poll
319 // clean the selfpipe
320 if (::read(shutdown_rd_fd, &ch, 1) == -1) {
321 if (errno != EAGAIN)
322 ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
323 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
324 }
325 break;
326 }
327 if (done) break;
328
329 // accept
330 sockaddr_storage ss;
331 socklen_t slen = sizeof(ss);
332 int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
333 if (sd >= 0) {
334 int r = set_close_on_exec(sd);
335 if (r) {
336 ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed "
337 << cpp_strerror(r) << dendl;
338 }
339 errors = 0;
340 ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
341
342 msgr->add_accept_pipe(sd);
343 } else {
344 ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd
345 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
346 if (++errors > 4)
347 break;
348 }
349 }
350
351 ldout(msgr->cct,20) << __func__ << " closing" << dendl;
352 // socket is closed right after the thread has joined.
353 // closing it here might race
354 if (shutdown_rd_fd >= 0) {
355 ::close(shutdown_rd_fd);
356 shutdown_rd_fd = -1;
357 }
358
359 ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
360 return 0;
361 }
362
363 void Accepter::stop()
364 {
365 done = true;
366 ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl;
367
368 if (shutdown_wr_fd < 0)
369 return;
370
371 // Send a byte to the shutdown pipe that the thread is listening to
372 char buf[1] = { 0x0 };
373 int ret = safe_write(shutdown_wr_fd, buf, 1);
374 if (ret < 0) {
375 ldout(msgr->cct,1) << __func__ << "close failed: "
376 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
377 } else {
378 ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
379 }
380 VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
381 shutdown_wr_fd = -1;
382
383 // wait for thread to stop before closing the socket, to avoid
384 // racing against fd re-use.
385 if (is_started()) {
386 ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
387 join();
388 }
389
390 if (listen_sd >= 0) {
391 if (::close(listen_sd) < 0) {
392 ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
393 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
394 }
395 listen_sd = -1;
396 }
397 if (shutdown_rd_fd >= 0) {
398 if (::close(shutdown_rd_fd) < 0) {
399 ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
400 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
401 }
402 shutdown_rd_fd = -1;
403 }
404 done = false;
405 }
406
407
408
409