]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/simple/Accepter.cc
update sources to 12.2.10
[ceph.git] / ceph / src / msg / simple / Accepter.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "include/compat.h"
91327a77 16#include "include/sock_compat.h"
7c673cae
FG
17#include <sys/socket.h>
18#include <netinet/tcp.h>
19#include <sys/uio.h>
20#include <limits.h>
21#include <poll.h>
22
23#include "msg/msg_types.h"
24#include "msg/Message.h"
25
26#include "Accepter.h"
27#include "Pipe.h"
28#include "SimpleMessenger.h"
29
30#include "common/debug.h"
31#include "common/errno.h"
32#include "common/safe_io.h"
33
34#define dout_subsys ceph_subsys_ms
35
36#undef dout_prefix
37#define dout_prefix *_dout << "accepter."
38
39
40/********************************************
41 * Accepter
42 */
43
7c673cae
FG
44int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
45 int selfpipe[2];
91327a77
AA
46 if (pipe_cloexec(selfpipe) < 0) {
47 int e = errno;
7c673cae 48 lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
91327a77
AA
49 << cpp_strerror(e) << dendl;
50 return -e;
51 }
52 for (size_t i = 0; i < 2; i++) {
53 int rc = fcntl(selfpipe[i], F_GETFL);
54 assert(rc != -1);
55 rc = fcntl(selfpipe[i], F_SETFL, rc | O_NONBLOCK);
56 assert(rc != -1);
7c673cae
FG
57 }
58 *pipe_rd = selfpipe[0];
59 *pipe_wr = selfpipe[1];
60 return 0;
61}
62
63int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
64{
65 const md_config_t *conf = msgr->cct->_conf;
66 // bind to a socket
67 ldout(msgr->cct,10) << __func__ << dendl;
68
69 int family;
70 switch (bind_addr.get_family()) {
71 case AF_INET:
72 case AF_INET6:
73 family = bind_addr.get_family();
74 break;
75
76 default:
77 // bind_addr is empty
78 family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
79 }
80
81 /* socket creation */
91327a77 82 listen_sd = socket_cloexec(family, SOCK_STREAM, 0);
7c673cae 83 if (listen_sd < 0) {
91327a77 84 int e = errno;
7c673cae 85 lderr(msgr->cct) << __func__ << " unable to create socket: "
91327a77
AA
86 << cpp_strerror(e) << dendl;
87 return -e;
7c673cae 88 }
91327a77 89 ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl;
7c673cae
FG
90
91 // use whatever user specified (if anything)
92 entity_addr_t listen_addr = bind_addr;
93 if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
94 listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
95 }
96 listen_addr.set_family(family);
97
98 /* bind to port */
99 int rc = -1;
100 int r = -1;
101
102 for (int i = 0; i < conf->ms_bind_retry_count; i++) {
103
104 if (i > 0) {
105 lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in "
106 << conf->ms_bind_retry_delay << " seconds " << dendl;
107 sleep(conf->ms_bind_retry_delay);
108 }
109
110 if (listen_addr.get_port()) {
111 // specific port
112
113 // reuse addr+port when possible
114 int on = 1;
115 rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
116 if (rc < 0) {
117 lderr(msgr->cct) << __func__ << " unable to setsockopt: "
118 << cpp_strerror(errno) << dendl;
119 r = -errno;
120 continue;
121 }
122
123 rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
124 listen_addr.get_sockaddr_len());
125 if (rc < 0) {
126 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
127 << ": " << cpp_strerror(errno) << dendl;
128 r = -errno;
129 continue;
130 }
131 } else {
132 // try a range of ports
133 for (int port = msgr->cct->_conf->ms_bind_port_min;
134 port <= msgr->cct->_conf->ms_bind_port_max; port++) {
135 if (avoid_ports.count(port))
136 continue;
137
138 listen_addr.set_port(port);
139 rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
140 listen_addr.get_sockaddr_len());
141 if (rc == 0)
142 break;
143 }
144 if (rc < 0) {
145 lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
146 << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
147 << "-" << msgr->cct->_conf->ms_bind_port_max
148 << ": " << cpp_strerror(errno)
149 << dendl;
150 r = -errno;
151 // Clear port before retry, otherwise we shall fail again.
152 listen_addr.set_port(0);
153 continue;
154 }
155 ldout(msgr->cct,10) << __func__ << " bound on random port "
156 << listen_addr << dendl;
157 }
158
159 if (rc == 0)
160 break;
161 }
162
163 // It seems that binding completely failed, return with that exit status
164 if (rc < 0) {
165 lderr(msgr->cct) << __func__ << " was unable to bind after "
166 << conf->ms_bind_retry_count << " attempts: "
167 << cpp_strerror(errno) << dendl;
168 ::close(listen_sd);
169 listen_sd = -1;
170 return r;
171 }
172
173 // what port did we get?
174 sockaddr_storage ss;
175 socklen_t llen = sizeof(ss);
176 rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
177 if (rc < 0) {
178 rc = -errno;
179 lderr(msgr->cct) << __func__ << " failed getsockname: "
180 << cpp_strerror(rc) << dendl;
181 ::close(listen_sd);
182 listen_sd = -1;
183 return rc;
184 }
185 listen_addr.set_sockaddr((sockaddr*)&ss);
186
187 if (msgr->cct->_conf->ms_tcp_rcvbuf) {
188 int size = msgr->cct->_conf->ms_tcp_rcvbuf;
189 rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF,
190 (void*)&size, sizeof(size));
191 if (rc < 0) {
192 rc = -errno;
193 lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to "
194 << size << ": " << cpp_strerror(rc) << dendl;
195 ::close(listen_sd);
196 listen_sd = -1;
197 return rc;
198 }
199 }
200
201 ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl;
202
203 // listen!
224ce89b 204 rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog);
7c673cae
FG
205 if (rc < 0) {
206 rc = -errno;
207 lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
208 << ": " << cpp_strerror(rc) << dendl;
209 ::close(listen_sd);
210 listen_sd = -1;
211 return rc;
212 }
213
214 msgr->set_myaddr(bind_addr);
215 if (bind_addr != entity_addr_t())
216 msgr->learned_addr(bind_addr);
217 else
218 assert(msgr->get_need_addr()); // should still be true.
219
220 if (msgr->get_myaddr().get_port() == 0) {
221 msgr->set_myaddr(listen_addr);
222 }
223 entity_addr_t addr = msgr->get_myaddr();
224 addr.nonce = nonce;
225 msgr->set_myaddr(addr);
226
227 msgr->init_local_connection();
228
229 rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
230 if (rc < 0) {
231 lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr
232 << ": " << cpp_strerror(rc) << dendl;
233 return rc;
234 }
235
236 ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr()
237 << " need_addr=" << msgr->get_need_addr() << dendl;
238 return 0;
239}
240
241int Accepter::rebind(const set<int>& avoid_ports)
242{
243 ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl;
244
245 entity_addr_t addr = msgr->get_myaddr();
246 set<int> new_avoid = avoid_ports;
247 new_avoid.insert(addr.get_port());
248 addr.set_port(0);
249
250 // adjust the nonce; we want our entity_addr_t to be truly unique.
251 nonce += 1000000;
252 msgr->my_inst.addr.nonce = nonce;
253 ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst "
254 << msgr->my_inst << dendl;
255
256 ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
257 int r = bind(addr, new_avoid);
258 if (r == 0)
259 start();
260 return r;
261}
262
263int Accepter::start()
264{
265 ldout(msgr->cct,1) << __func__ << dendl;
266
267 // start thread
268 create("ms_accepter");
269
270 return 0;
271}
272
273void *Accepter::entry()
274{
275 ldout(msgr->cct,1) << __func__ << " start" << dendl;
276
277 int errors = 0;
278 int ch;
279
280 struct pollfd pfd[2];
281 memset(pfd, 0, sizeof(pfd));
282
283 pfd[0].fd = listen_sd;
284 pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
285 pfd[1].fd = shutdown_rd_fd;
286 pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
287 while (!done) {
288 ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
289 int r = poll(pfd, 2, -1);
290 if (r < 0) {
291 if (errno == EINTR) {
292 continue;
293 }
294 ldout(msgr->cct,1) << __func__ << " poll got error"
295 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
91327a77 296 ceph_abort();
7c673cae
FG
297 }
298 ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
299 ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl;
300 ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl;
301
302 if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
303 ldout(msgr->cct,1) << __func__ << " poll got errors in revents "
304 << pfd[0].revents << dendl;
91327a77 305 ceph_abort();
7c673cae
FG
306 }
307 if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
308 // We got "signaled" to exit the poll
309 // clean the selfpipe
310 if (::read(shutdown_rd_fd, &ch, 1) == -1) {
311 if (errno != EAGAIN)
312 ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
313 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
314 }
315 break;
316 }
317 if (done) break;
318
319 // accept
320 sockaddr_storage ss;
321 socklen_t slen = sizeof(ss);
91327a77 322 int sd = accept_cloexec(listen_sd, (sockaddr*)&ss, &slen);
7c673cae 323 if (sd >= 0) {
7c673cae
FG
324 errors = 0;
325 ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
326
327 msgr->add_accept_pipe(sd);
328 } else {
91327a77 329 int e = errno;
7c673cae 330 ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd
91327a77
AA
331 << " errno " << e << " " << cpp_strerror(e) << dendl;
332 if (++errors > msgr->cct->_conf->ms_max_accept_failures) {
333 lderr(msgr->cct) << "accetper has encoutered enough errors, just do ceph_abort()." << dendl;
334 ceph_abort();
335 }
7c673cae
FG
336 }
337 }
338
339 ldout(msgr->cct,20) << __func__ << " closing" << dendl;
340 // socket is closed right after the thread has joined.
341 // closing it here might race
342 if (shutdown_rd_fd >= 0) {
343 ::close(shutdown_rd_fd);
344 shutdown_rd_fd = -1;
345 }
346
347 ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
348 return 0;
349}
350
351void Accepter::stop()
352{
353 done = true;
354 ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl;
355
356 if (shutdown_wr_fd < 0)
357 return;
358
359 // Send a byte to the shutdown pipe that the thread is listening to
360 char buf[1] = { 0x0 };
361 int ret = safe_write(shutdown_wr_fd, buf, 1);
362 if (ret < 0) {
363 ldout(msgr->cct,1) << __func__ << "close failed: "
364 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
365 } else {
366 ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
367 }
368 VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
369 shutdown_wr_fd = -1;
370
371 // wait for thread to stop before closing the socket, to avoid
372 // racing against fd re-use.
373 if (is_started()) {
374 ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
375 join();
376 }
377
378 if (listen_sd >= 0) {
379 if (::close(listen_sd) < 0) {
380 ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
381 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
382 }
383 listen_sd = -1;
384 }
385 if (shutdown_rd_fd >= 0) {
386 if (::close(shutdown_rd_fd) < 0) {
387 ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
388 << " errno " << errno << " " << cpp_strerror(errno) << dendl;
389 }
390 shutdown_rd_fd = -1;
391 }
392 done = false;
393}
394
395
396
397