]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "include/compat.h" | |
16 | #include <sys/socket.h> | |
17 | #include <netinet/tcp.h> | |
18 | #include <sys/uio.h> | |
19 | #include <limits.h> | |
20 | #include <poll.h> | |
21 | ||
22 | #include "msg/msg_types.h" | |
23 | #include "msg/Message.h" | |
24 | ||
25 | #include "Accepter.h" | |
26 | #include "Pipe.h" | |
27 | #include "SimpleMessenger.h" | |
28 | ||
29 | #include "common/debug.h" | |
30 | #include "common/errno.h" | |
31 | #include "common/safe_io.h" | |
32 | ||
33 | #define dout_subsys ceph_subsys_ms | |
34 | ||
35 | #undef dout_prefix | |
36 | #define dout_prefix *_dout << "accepter." | |
37 | ||
38 | ||
39 | /******************************************** | |
40 | * Accepter | |
41 | */ | |
42 | ||
/*
 * Add FD_CLOEXEC to the descriptor flags of fd, keeping any flags that are
 * already set.
 *
 * Returns 0 on success, or the (positive) errno value reported by the
 * failing fcntl() call.
 */
static int set_close_on_exec(int fd)
{
  const int current_flags = fcntl(fd, F_GETFD, 0);
  if (current_flags < 0)
    return errno;

  // fcntl(F_SETFD) returns -1 on failure, 0 on success.
  return fcntl(fd, F_SETFD, current_flags | FD_CLOEXEC) ? errno : 0;
}
54 | ||
55 | int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) { | |
56 | int selfpipe[2]; | |
57 | int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK)); | |
58 | if (ret < 0 ) { | |
59 | lderr(msgr->cct) << __func__ << " unable to create the selfpipe: " | |
60 | << cpp_strerror(errno) << dendl; | |
61 | return -errno; | |
62 | } | |
63 | *pipe_rd = selfpipe[0]; | |
64 | *pipe_wr = selfpipe[1]; | |
65 | return 0; | |
66 | } | |
67 | ||
68 | int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports) | |
69 | { | |
70 | const md_config_t *conf = msgr->cct->_conf; | |
71 | // bind to a socket | |
72 | ldout(msgr->cct,10) << __func__ << dendl; | |
73 | ||
74 | int family; | |
75 | switch (bind_addr.get_family()) { | |
76 | case AF_INET: | |
77 | case AF_INET6: | |
78 | family = bind_addr.get_family(); | |
79 | break; | |
80 | ||
81 | default: | |
82 | // bind_addr is empty | |
83 | family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; | |
84 | } | |
85 | ||
86 | /* socket creation */ | |
87 | listen_sd = ::socket(family, SOCK_STREAM, 0); | |
88 | ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl; | |
89 | if (listen_sd < 0) { | |
90 | lderr(msgr->cct) << __func__ << " unable to create socket: " | |
91 | << cpp_strerror(errno) << dendl; | |
92 | return -errno; | |
93 | } | |
94 | ||
95 | if (set_close_on_exec(listen_sd)) { | |
96 | lderr(msgr->cct) << __func__ << " unable to set_close_exec(): " | |
97 | << cpp_strerror(errno) << dendl; | |
98 | } | |
99 | ||
100 | ||
101 | // use whatever user specified (if anything) | |
102 | entity_addr_t listen_addr = bind_addr; | |
103 | if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) { | |
104 | listen_addr.set_type(entity_addr_t::TYPE_LEGACY); | |
105 | } | |
106 | listen_addr.set_family(family); | |
107 | ||
108 | /* bind to port */ | |
109 | int rc = -1; | |
110 | int r = -1; | |
111 | ||
112 | for (int i = 0; i < conf->ms_bind_retry_count; i++) { | |
113 | ||
114 | if (i > 0) { | |
115 | lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " | |
116 | << conf->ms_bind_retry_delay << " seconds " << dendl; | |
117 | sleep(conf->ms_bind_retry_delay); | |
118 | } | |
119 | ||
120 | if (listen_addr.get_port()) { | |
121 | // specific port | |
122 | ||
123 | // reuse addr+port when possible | |
124 | int on = 1; | |
125 | rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); | |
126 | if (rc < 0) { | |
127 | lderr(msgr->cct) << __func__ << " unable to setsockopt: " | |
128 | << cpp_strerror(errno) << dendl; | |
129 | r = -errno; | |
130 | continue; | |
131 | } | |
132 | ||
133 | rc = ::bind(listen_sd, listen_addr.get_sockaddr(), | |
134 | listen_addr.get_sockaddr_len()); | |
135 | if (rc < 0) { | |
136 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
137 | << ": " << cpp_strerror(errno) << dendl; | |
138 | r = -errno; | |
139 | continue; | |
140 | } | |
141 | } else { | |
142 | // try a range of ports | |
143 | for (int port = msgr->cct->_conf->ms_bind_port_min; | |
144 | port <= msgr->cct->_conf->ms_bind_port_max; port++) { | |
145 | if (avoid_ports.count(port)) | |
146 | continue; | |
147 | ||
148 | listen_addr.set_port(port); | |
149 | rc = ::bind(listen_sd, listen_addr.get_sockaddr(), | |
150 | listen_addr.get_sockaddr_len()); | |
151 | if (rc == 0) | |
152 | break; | |
153 | } | |
154 | if (rc < 0) { | |
155 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
156 | << " on any port in range " << msgr->cct->_conf->ms_bind_port_min | |
157 | << "-" << msgr->cct->_conf->ms_bind_port_max | |
158 | << ": " << cpp_strerror(errno) | |
159 | << dendl; | |
160 | r = -errno; | |
161 | // Clear port before retry, otherwise we shall fail again. | |
162 | listen_addr.set_port(0); | |
163 | continue; | |
164 | } | |
165 | ldout(msgr->cct,10) << __func__ << " bound on random port " | |
166 | << listen_addr << dendl; | |
167 | } | |
168 | ||
169 | if (rc == 0) | |
170 | break; | |
171 | } | |
172 | ||
173 | // It seems that binding completely failed, return with that exit status | |
174 | if (rc < 0) { | |
175 | lderr(msgr->cct) << __func__ << " was unable to bind after " | |
176 | << conf->ms_bind_retry_count << " attempts: " | |
177 | << cpp_strerror(errno) << dendl; | |
178 | ::close(listen_sd); | |
179 | listen_sd = -1; | |
180 | return r; | |
181 | } | |
182 | ||
183 | // what port did we get? | |
184 | sockaddr_storage ss; | |
185 | socklen_t llen = sizeof(ss); | |
186 | rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); | |
187 | if (rc < 0) { | |
188 | rc = -errno; | |
189 | lderr(msgr->cct) << __func__ << " failed getsockname: " | |
190 | << cpp_strerror(rc) << dendl; | |
191 | ::close(listen_sd); | |
192 | listen_sd = -1; | |
193 | return rc; | |
194 | } | |
195 | listen_addr.set_sockaddr((sockaddr*)&ss); | |
196 | ||
197 | if (msgr->cct->_conf->ms_tcp_rcvbuf) { | |
198 | int size = msgr->cct->_conf->ms_tcp_rcvbuf; | |
199 | rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, | |
200 | (void*)&size, sizeof(size)); | |
201 | if (rc < 0) { | |
202 | rc = -errno; | |
203 | lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to " | |
204 | << size << ": " << cpp_strerror(rc) << dendl; | |
205 | ::close(listen_sd); | |
206 | listen_sd = -1; | |
207 | return rc; | |
208 | } | |
209 | } | |
210 | ||
211 | ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl; | |
212 | ||
213 | // listen! | |
214 | rc = ::listen(listen_sd, 128); | |
215 | if (rc < 0) { | |
216 | rc = -errno; | |
217 | lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr | |
218 | << ": " << cpp_strerror(rc) << dendl; | |
219 | ::close(listen_sd); | |
220 | listen_sd = -1; | |
221 | return rc; | |
222 | } | |
223 | ||
224 | msgr->set_myaddr(bind_addr); | |
225 | if (bind_addr != entity_addr_t()) | |
226 | msgr->learned_addr(bind_addr); | |
227 | else | |
228 | assert(msgr->get_need_addr()); // should still be true. | |
229 | ||
230 | if (msgr->get_myaddr().get_port() == 0) { | |
231 | msgr->set_myaddr(listen_addr); | |
232 | } | |
233 | entity_addr_t addr = msgr->get_myaddr(); | |
234 | addr.nonce = nonce; | |
235 | msgr->set_myaddr(addr); | |
236 | ||
237 | msgr->init_local_connection(); | |
238 | ||
239 | rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd); | |
240 | if (rc < 0) { | |
241 | lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr | |
242 | << ": " << cpp_strerror(rc) << dendl; | |
243 | return rc; | |
244 | } | |
245 | ||
246 | ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr() | |
247 | << " need_addr=" << msgr->get_need_addr() << dendl; | |
248 | return 0; | |
249 | } | |
250 | ||
251 | int Accepter::rebind(const set<int>& avoid_ports) | |
252 | { | |
253 | ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl; | |
254 | ||
255 | entity_addr_t addr = msgr->get_myaddr(); | |
256 | set<int> new_avoid = avoid_ports; | |
257 | new_avoid.insert(addr.get_port()); | |
258 | addr.set_port(0); | |
259 | ||
260 | // adjust the nonce; we want our entity_addr_t to be truly unique. | |
261 | nonce += 1000000; | |
262 | msgr->my_inst.addr.nonce = nonce; | |
263 | ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " | |
264 | << msgr->my_inst << dendl; | |
265 | ||
266 | ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; | |
267 | int r = bind(addr, new_avoid); | |
268 | if (r == 0) | |
269 | start(); | |
270 | return r; | |
271 | } | |
272 | ||
273 | int Accepter::start() | |
274 | { | |
275 | ldout(msgr->cct,1) << __func__ << dendl; | |
276 | ||
277 | // start thread | |
278 | create("ms_accepter"); | |
279 | ||
280 | return 0; | |
281 | } | |
282 | ||
283 | void *Accepter::entry() | |
284 | { | |
285 | ldout(msgr->cct,1) << __func__ << " start" << dendl; | |
286 | ||
287 | int errors = 0; | |
288 | int ch; | |
289 | ||
290 | struct pollfd pfd[2]; | |
291 | memset(pfd, 0, sizeof(pfd)); | |
292 | ||
293 | pfd[0].fd = listen_sd; | |
294 | pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
295 | pfd[1].fd = shutdown_rd_fd; | |
296 | pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
297 | while (!done) { | |
298 | ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; | |
299 | int r = poll(pfd, 2, -1); | |
300 | if (r < 0) { | |
301 | if (errno == EINTR) { | |
302 | continue; | |
303 | } | |
304 | ldout(msgr->cct,1) << __func__ << " poll got error" | |
305 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
306 | break; | |
307 | } | |
308 | ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl; | |
309 | ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl; | |
310 | ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl; | |
311 | ||
312 | if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { | |
313 | ldout(msgr->cct,1) << __func__ << " poll got errors in revents " | |
314 | << pfd[0].revents << dendl; | |
315 | break; | |
316 | } | |
317 | if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { | |
318 | // We got "signaled" to exit the poll | |
319 | // clean the selfpipe | |
320 | if (::read(shutdown_rd_fd, &ch, 1) == -1) { | |
321 | if (errno != EAGAIN) | |
322 | ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " | |
323 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
324 | } | |
325 | break; | |
326 | } | |
327 | if (done) break; | |
328 | ||
329 | // accept | |
330 | sockaddr_storage ss; | |
331 | socklen_t slen = sizeof(ss); | |
332 | int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); | |
333 | if (sd >= 0) { | |
334 | int r = set_close_on_exec(sd); | |
335 | if (r) { | |
336 | ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed " | |
337 | << cpp_strerror(r) << dendl; | |
338 | } | |
339 | errors = 0; | |
340 | ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; | |
341 | ||
342 | msgr->add_accept_pipe(sd); | |
343 | } else { | |
344 | ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd | |
345 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
346 | if (++errors > 4) | |
347 | break; | |
348 | } | |
349 | } | |
350 | ||
351 | ldout(msgr->cct,20) << __func__ << " closing" << dendl; | |
352 | // socket is closed right after the thread has joined. | |
353 | // closing it here might race | |
354 | if (shutdown_rd_fd >= 0) { | |
355 | ::close(shutdown_rd_fd); | |
356 | shutdown_rd_fd = -1; | |
357 | } | |
358 | ||
359 | ldout(msgr->cct,10) << __func__ << " stopping" << dendl; | |
360 | return 0; | |
361 | } | |
362 | ||
363 | void Accepter::stop() | |
364 | { | |
365 | done = true; | |
366 | ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl; | |
367 | ||
368 | if (shutdown_wr_fd < 0) | |
369 | return; | |
370 | ||
371 | // Send a byte to the shutdown pipe that the thread is listening to | |
372 | char buf[1] = { 0x0 }; | |
373 | int ret = safe_write(shutdown_wr_fd, buf, 1); | |
374 | if (ret < 0) { | |
375 | ldout(msgr->cct,1) << __func__ << "close failed: " | |
376 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
377 | } else { | |
378 | ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl; | |
379 | } | |
380 | VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd)); | |
381 | shutdown_wr_fd = -1; | |
382 | ||
383 | // wait for thread to stop before closing the socket, to avoid | |
384 | // racing against fd re-use. | |
385 | if (is_started()) { | |
386 | ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl; | |
387 | join(); | |
388 | } | |
389 | ||
390 | if (listen_sd >= 0) { | |
391 | if (::close(listen_sd) < 0) { | |
392 | ldout(msgr->cct,1) << __func__ << "close listen_sd failed: " | |
393 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
394 | } | |
395 | listen_sd = -1; | |
396 | } | |
397 | if (shutdown_rd_fd >= 0) { | |
398 | if (::close(shutdown_rd_fd) < 0) { | |
399 | ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: " | |
400 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
401 | } | |
402 | shutdown_rd_fd = -1; | |
403 | } | |
404 | done = false; | |
405 | } | |
406 | ||
407 | ||
408 | ||
409 |