]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "include/compat.h" | |
91327a77 | 16 | #include "include/sock_compat.h" |
7c673cae FG |
17 | #include <sys/socket.h> |
18 | #include <netinet/tcp.h> | |
19 | #include <sys/uio.h> | |
20 | #include <limits.h> | |
21 | #include <poll.h> | |
22 | ||
23 | #include "msg/msg_types.h" | |
24 | #include "msg/Message.h" | |
25 | ||
26 | #include "Accepter.h" | |
27 | #include "Pipe.h" | |
28 | #include "SimpleMessenger.h" | |
29 | ||
30 | #include "common/debug.h" | |
31 | #include "common/errno.h" | |
32 | #include "common/safe_io.h" | |
33 | ||
34 | #define dout_subsys ceph_subsys_ms | |
35 | ||
36 | #undef dout_prefix | |
37 | #define dout_prefix *_dout << "accepter." | |
38 | ||
39 | ||
40 | /******************************************** | |
41 | * Accepter | |
42 | */ | |
43 | ||
7c673cae FG |
44 | int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) { |
45 | int selfpipe[2]; | |
91327a77 AA |
46 | if (pipe_cloexec(selfpipe) < 0) { |
47 | int e = errno; | |
7c673cae | 48 | lderr(msgr->cct) << __func__ << " unable to create the selfpipe: " |
91327a77 AA |
49 | << cpp_strerror(e) << dendl; |
50 | return -e; | |
51 | } | |
52 | for (size_t i = 0; i < 2; i++) { | |
53 | int rc = fcntl(selfpipe[i], F_GETFL); | |
54 | assert(rc != -1); | |
55 | rc = fcntl(selfpipe[i], F_SETFL, rc | O_NONBLOCK); | |
56 | assert(rc != -1); | |
7c673cae FG |
57 | } |
58 | *pipe_rd = selfpipe[0]; | |
59 | *pipe_wr = selfpipe[1]; | |
60 | return 0; | |
61 | } | |
62 | ||
63 | int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports) | |
64 | { | |
65 | const md_config_t *conf = msgr->cct->_conf; | |
66 | // bind to a socket | |
67 | ldout(msgr->cct,10) << __func__ << dendl; | |
68 | ||
69 | int family; | |
70 | switch (bind_addr.get_family()) { | |
71 | case AF_INET: | |
72 | case AF_INET6: | |
73 | family = bind_addr.get_family(); | |
74 | break; | |
75 | ||
76 | default: | |
77 | // bind_addr is empty | |
78 | family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; | |
79 | } | |
80 | ||
81 | /* socket creation */ | |
91327a77 | 82 | listen_sd = socket_cloexec(family, SOCK_STREAM, 0); |
7c673cae | 83 | if (listen_sd < 0) { |
91327a77 | 84 | int e = errno; |
7c673cae | 85 | lderr(msgr->cct) << __func__ << " unable to create socket: " |
91327a77 AA |
86 | << cpp_strerror(e) << dendl; |
87 | return -e; | |
7c673cae | 88 | } |
91327a77 | 89 | ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl; |
7c673cae FG |
90 | |
91 | // use whatever user specified (if anything) | |
92 | entity_addr_t listen_addr = bind_addr; | |
93 | if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) { | |
94 | listen_addr.set_type(entity_addr_t::TYPE_LEGACY); | |
95 | } | |
96 | listen_addr.set_family(family); | |
97 | ||
98 | /* bind to port */ | |
99 | int rc = -1; | |
100 | int r = -1; | |
101 | ||
102 | for (int i = 0; i < conf->ms_bind_retry_count; i++) { | |
103 | ||
104 | if (i > 0) { | |
105 | lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " | |
106 | << conf->ms_bind_retry_delay << " seconds " << dendl; | |
107 | sleep(conf->ms_bind_retry_delay); | |
108 | } | |
109 | ||
110 | if (listen_addr.get_port()) { | |
111 | // specific port | |
112 | ||
113 | // reuse addr+port when possible | |
114 | int on = 1; | |
115 | rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); | |
116 | if (rc < 0) { | |
117 | lderr(msgr->cct) << __func__ << " unable to setsockopt: " | |
118 | << cpp_strerror(errno) << dendl; | |
119 | r = -errno; | |
120 | continue; | |
121 | } | |
122 | ||
123 | rc = ::bind(listen_sd, listen_addr.get_sockaddr(), | |
124 | listen_addr.get_sockaddr_len()); | |
125 | if (rc < 0) { | |
126 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
127 | << ": " << cpp_strerror(errno) << dendl; | |
128 | r = -errno; | |
129 | continue; | |
130 | } | |
131 | } else { | |
132 | // try a range of ports | |
133 | for (int port = msgr->cct->_conf->ms_bind_port_min; | |
134 | port <= msgr->cct->_conf->ms_bind_port_max; port++) { | |
135 | if (avoid_ports.count(port)) | |
136 | continue; | |
137 | ||
138 | listen_addr.set_port(port); | |
139 | rc = ::bind(listen_sd, listen_addr.get_sockaddr(), | |
140 | listen_addr.get_sockaddr_len()); | |
141 | if (rc == 0) | |
142 | break; | |
143 | } | |
144 | if (rc < 0) { | |
145 | lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr | |
146 | << " on any port in range " << msgr->cct->_conf->ms_bind_port_min | |
147 | << "-" << msgr->cct->_conf->ms_bind_port_max | |
148 | << ": " << cpp_strerror(errno) | |
149 | << dendl; | |
150 | r = -errno; | |
151 | // Clear port before retry, otherwise we shall fail again. | |
152 | listen_addr.set_port(0); | |
153 | continue; | |
154 | } | |
155 | ldout(msgr->cct,10) << __func__ << " bound on random port " | |
156 | << listen_addr << dendl; | |
157 | } | |
158 | ||
159 | if (rc == 0) | |
160 | break; | |
161 | } | |
162 | ||
163 | // It seems that binding completely failed, return with that exit status | |
164 | if (rc < 0) { | |
165 | lderr(msgr->cct) << __func__ << " was unable to bind after " | |
166 | << conf->ms_bind_retry_count << " attempts: " | |
167 | << cpp_strerror(errno) << dendl; | |
168 | ::close(listen_sd); | |
169 | listen_sd = -1; | |
170 | return r; | |
171 | } | |
172 | ||
173 | // what port did we get? | |
174 | sockaddr_storage ss; | |
175 | socklen_t llen = sizeof(ss); | |
176 | rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); | |
177 | if (rc < 0) { | |
178 | rc = -errno; | |
179 | lderr(msgr->cct) << __func__ << " failed getsockname: " | |
180 | << cpp_strerror(rc) << dendl; | |
181 | ::close(listen_sd); | |
182 | listen_sd = -1; | |
183 | return rc; | |
184 | } | |
185 | listen_addr.set_sockaddr((sockaddr*)&ss); | |
186 | ||
187 | if (msgr->cct->_conf->ms_tcp_rcvbuf) { | |
188 | int size = msgr->cct->_conf->ms_tcp_rcvbuf; | |
189 | rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, | |
190 | (void*)&size, sizeof(size)); | |
191 | if (rc < 0) { | |
192 | rc = -errno; | |
193 | lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to " | |
194 | << size << ": " << cpp_strerror(rc) << dendl; | |
195 | ::close(listen_sd); | |
196 | listen_sd = -1; | |
197 | return rc; | |
198 | } | |
199 | } | |
200 | ||
201 | ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl; | |
202 | ||
203 | // listen! | |
224ce89b | 204 | rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog); |
7c673cae FG |
205 | if (rc < 0) { |
206 | rc = -errno; | |
207 | lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr | |
208 | << ": " << cpp_strerror(rc) << dendl; | |
209 | ::close(listen_sd); | |
210 | listen_sd = -1; | |
211 | return rc; | |
212 | } | |
213 | ||
214 | msgr->set_myaddr(bind_addr); | |
215 | if (bind_addr != entity_addr_t()) | |
216 | msgr->learned_addr(bind_addr); | |
217 | else | |
218 | assert(msgr->get_need_addr()); // should still be true. | |
219 | ||
220 | if (msgr->get_myaddr().get_port() == 0) { | |
221 | msgr->set_myaddr(listen_addr); | |
222 | } | |
223 | entity_addr_t addr = msgr->get_myaddr(); | |
224 | addr.nonce = nonce; | |
225 | msgr->set_myaddr(addr); | |
226 | ||
227 | msgr->init_local_connection(); | |
228 | ||
229 | rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd); | |
230 | if (rc < 0) { | |
231 | lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr | |
232 | << ": " << cpp_strerror(rc) << dendl; | |
233 | return rc; | |
234 | } | |
235 | ||
236 | ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr() | |
237 | << " need_addr=" << msgr->get_need_addr() << dendl; | |
238 | return 0; | |
239 | } | |
240 | ||
241 | int Accepter::rebind(const set<int>& avoid_ports) | |
242 | { | |
243 | ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl; | |
244 | ||
245 | entity_addr_t addr = msgr->get_myaddr(); | |
246 | set<int> new_avoid = avoid_ports; | |
247 | new_avoid.insert(addr.get_port()); | |
248 | addr.set_port(0); | |
249 | ||
250 | // adjust the nonce; we want our entity_addr_t to be truly unique. | |
251 | nonce += 1000000; | |
252 | msgr->my_inst.addr.nonce = nonce; | |
253 | ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " | |
254 | << msgr->my_inst << dendl; | |
255 | ||
256 | ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; | |
257 | int r = bind(addr, new_avoid); | |
258 | if (r == 0) | |
259 | start(); | |
260 | return r; | |
261 | } | |
262 | ||
263 | int Accepter::start() | |
264 | { | |
265 | ldout(msgr->cct,1) << __func__ << dendl; | |
266 | ||
267 | // start thread | |
268 | create("ms_accepter"); | |
269 | ||
270 | return 0; | |
271 | } | |
272 | ||
273 | void *Accepter::entry() | |
274 | { | |
275 | ldout(msgr->cct,1) << __func__ << " start" << dendl; | |
276 | ||
277 | int errors = 0; | |
278 | int ch; | |
279 | ||
280 | struct pollfd pfd[2]; | |
281 | memset(pfd, 0, sizeof(pfd)); | |
282 | ||
283 | pfd[0].fd = listen_sd; | |
284 | pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
285 | pfd[1].fd = shutdown_rd_fd; | |
286 | pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; | |
287 | while (!done) { | |
288 | ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; | |
289 | int r = poll(pfd, 2, -1); | |
290 | if (r < 0) { | |
291 | if (errno == EINTR) { | |
292 | continue; | |
293 | } | |
294 | ldout(msgr->cct,1) << __func__ << " poll got error" | |
295 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
91327a77 | 296 | ceph_abort(); |
7c673cae FG |
297 | } |
298 | ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl; | |
299 | ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl; | |
300 | ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl; | |
301 | ||
302 | if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { | |
303 | ldout(msgr->cct,1) << __func__ << " poll got errors in revents " | |
304 | << pfd[0].revents << dendl; | |
91327a77 | 305 | ceph_abort(); |
7c673cae FG |
306 | } |
307 | if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { | |
308 | // We got "signaled" to exit the poll | |
309 | // clean the selfpipe | |
310 | if (::read(shutdown_rd_fd, &ch, 1) == -1) { | |
311 | if (errno != EAGAIN) | |
312 | ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " | |
313 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
314 | } | |
315 | break; | |
316 | } | |
317 | if (done) break; | |
318 | ||
319 | // accept | |
320 | sockaddr_storage ss; | |
321 | socklen_t slen = sizeof(ss); | |
91327a77 | 322 | int sd = accept_cloexec(listen_sd, (sockaddr*)&ss, &slen); |
7c673cae | 323 | if (sd >= 0) { |
7c673cae FG |
324 | errors = 0; |
325 | ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; | |
326 | ||
327 | msgr->add_accept_pipe(sd); | |
328 | } else { | |
91327a77 | 329 | int e = errno; |
7c673cae | 330 | ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd |
91327a77 AA |
331 | << " errno " << e << " " << cpp_strerror(e) << dendl; |
332 | if (++errors > msgr->cct->_conf->ms_max_accept_failures) { | |
333 | lderr(msgr->cct) << "accetper has encoutered enough errors, just do ceph_abort()." << dendl; | |
334 | ceph_abort(); | |
335 | } | |
7c673cae FG |
336 | } |
337 | } | |
338 | ||
339 | ldout(msgr->cct,20) << __func__ << " closing" << dendl; | |
340 | // socket is closed right after the thread has joined. | |
341 | // closing it here might race | |
342 | if (shutdown_rd_fd >= 0) { | |
343 | ::close(shutdown_rd_fd); | |
344 | shutdown_rd_fd = -1; | |
345 | } | |
346 | ||
347 | ldout(msgr->cct,10) << __func__ << " stopping" << dendl; | |
348 | return 0; | |
349 | } | |
350 | ||
351 | void Accepter::stop() | |
352 | { | |
353 | done = true; | |
354 | ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl; | |
355 | ||
356 | if (shutdown_wr_fd < 0) | |
357 | return; | |
358 | ||
359 | // Send a byte to the shutdown pipe that the thread is listening to | |
360 | char buf[1] = { 0x0 }; | |
361 | int ret = safe_write(shutdown_wr_fd, buf, 1); | |
362 | if (ret < 0) { | |
363 | ldout(msgr->cct,1) << __func__ << "close failed: " | |
364 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
365 | } else { | |
366 | ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl; | |
367 | } | |
368 | VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd)); | |
369 | shutdown_wr_fd = -1; | |
370 | ||
371 | // wait for thread to stop before closing the socket, to avoid | |
372 | // racing against fd re-use. | |
373 | if (is_started()) { | |
374 | ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl; | |
375 | join(); | |
376 | } | |
377 | ||
378 | if (listen_sd >= 0) { | |
379 | if (::close(listen_sd) < 0) { | |
380 | ldout(msgr->cct,1) << __func__ << "close listen_sd failed: " | |
381 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
382 | } | |
383 | listen_sd = -1; | |
384 | } | |
385 | if (shutdown_rd_fd >= 0) { | |
386 | if (::close(shutdown_rd_fd) < 0) { | |
387 | ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: " | |
388 | << " errno " << errno << " " << cpp_strerror(errno) << dendl; | |
389 | } | |
390 | shutdown_rd_fd = -1; | |
391 | } | |
392 | done = false; | |
393 | } | |
394 | ||
395 | ||
396 | ||
397 |