]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2011 New Dream Network
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14 | ||
11fdf7f2 TL |
15 | #include <poll.h> |
16 | #include <sys/un.h> | |
17 | #include <unistd.h> | |
18 | ||
7c673cae | 19 | #include "common/OutputDataSocket.h" |
7c673cae | 20 | #include "common/errno.h" |
11fdf7f2 | 21 | #include "common/debug.h" |
7c673cae | 22 | #include "common/safe_io.h" |
31f18b77 | 23 | #include "include/compat.h" |
91327a77 | 24 | #include "include/sock_compat.h" |
7c673cae | 25 | |
31f18b77 | 26 | // re-include our assert to clobber the system one; fix dout: |
11fdf7f2 | 27 | #include "include/ceph_assert.h" |
7c673cae FG |
28 | |
29 | #define dout_subsys ceph_subsys_asok | |
30 | #undef dout_prefix | |
31 | #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") " | |
32 | ||
33 | using std::ostringstream; | |
34 | ||
/*
 * UNIX domain sockets created by an application persist even after that
 * application closes, unless they're explicitly unlinked. This is because the
 * directory containing the socket keeps a reference to the socket.
 *
 * This code makes things a little nicer by unlinking those dead sockets when
 * the application exits normally.
 */
// Guards every access to cleanup_files and cleanup_atexit below.
static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
// strdup()'d socket paths to unlink at exit; each entry is free()'d when removed.
static std::vector<const char*> cleanup_files;
// Becomes true once the atexit() handler is registered, so it is registered only once.
static bool cleanup_atexit{false};
46 | ||
47 | static void remove_cleanup_file(const char *file) | |
48 | { | |
49 | pthread_mutex_lock(&cleanup_lock); | |
50 | VOID_TEMP_FAILURE_RETRY(unlink(file)); | |
51 | for (std::vector <const char*>::iterator i = cleanup_files.begin(); | |
52 | i != cleanup_files.end(); ++i) { | |
53 | if (strcmp(file, *i) == 0) { | |
54 | free((void*)*i); | |
55 | cleanup_files.erase(i); | |
56 | break; | |
57 | } | |
58 | } | |
59 | pthread_mutex_unlock(&cleanup_lock); | |
60 | } | |
61 | ||
62 | static void remove_all_cleanup_files() | |
63 | { | |
64 | pthread_mutex_lock(&cleanup_lock); | |
65 | for (std::vector <const char*>::iterator i = cleanup_files.begin(); | |
66 | i != cleanup_files.end(); ++i) { | |
67 | VOID_TEMP_FAILURE_RETRY(unlink(*i)); | |
68 | free((void*)*i); | |
69 | } | |
70 | cleanup_files.clear(); | |
71 | pthread_mutex_unlock(&cleanup_lock); | |
72 | } | |
73 | ||
74 | static void add_cleanup_file(const char *file) | |
75 | { | |
76 | char *fname = strdup(file); | |
77 | if (!fname) | |
78 | return; | |
79 | pthread_mutex_lock(&cleanup_lock); | |
80 | cleanup_files.push_back(fname); | |
81 | if (!cleanup_atexit) { | |
82 | atexit(remove_all_cleanup_files); | |
83 | cleanup_atexit = true; | |
84 | } | |
85 | pthread_mutex_unlock(&cleanup_lock); | |
86 | } | |
87 | ||
88 | ||
89 | OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog) | |
90 | : m_cct(cct), | |
91 | data_max_backlog(_backlog), | |
92 | m_sock_fd(-1), | |
93 | m_shutdown_rd_fd(-1), | |
94 | m_shutdown_wr_fd(-1), | |
95 | going_down(false), | |
494da23a TL |
96 | data_size(0), |
97 | skipped(0) | |
7c673cae FG |
98 | { |
99 | } | |
100 | ||
101 | OutputDataSocket::~OutputDataSocket() | |
102 | { | |
103 | shutdown(); | |
104 | } | |
105 | ||
/*
 * This thread listens on the UNIX domain socket for incoming connections.
 * It only handles one connection at a time at the moment. Not all I/O is
 * nonblocking yet. [TODO: make all I/O nonblocking so that we can implement
 * sensible timeouts.]
 *
 * This thread also listens to m_shutdown_rd_fd. If any data is sent to this
 * pipe, the thread terminates itself gracefully, allowing
 * OutputDataSocket::shutdown() to join() it.
 */
115 | ||
// Thread return values of OutputDataSocket::entry(): a null pointer means a
// clean exit, a non-null pointer means the listener loop failed.
#define PFL_SUCCESS (reinterpret_cast<void*>(static_cast<intptr_t>(0)))
#define PFL_FAIL (reinterpret_cast<void*>(static_cast<intptr_t>(1)))
118 | ||
119 | std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr) | |
120 | { | |
121 | int pipefd[2]; | |
9f95a23c | 122 | if (pipe_cloexec(pipefd, 0) < 0) { |
91327a77 | 123 | int e = errno; |
7c673cae | 124 | ostringstream oss; |
91327a77 | 125 | oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e); |
7c673cae FG |
126 | return oss.str(); |
127 | } | |
128 | ||
129 | *pipe_rd = pipefd[0]; | |
130 | *pipe_wr = pipefd[1]; | |
131 | return ""; | |
132 | } | |
133 | ||
134 | std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd) | |
135 | { | |
136 | ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl; | |
137 | ||
138 | struct sockaddr_un address; | |
139 | if (sock_path.size() > sizeof(address.sun_path) - 1) { | |
140 | ostringstream oss; | |
141 | oss << "OutputDataSocket::bind_and_listen: " | |
142 | << "The UNIX domain socket path " << sock_path << " is too long! The " | |
143 | << "maximum length on this system is " | |
144 | << (sizeof(address.sun_path) - 1); | |
145 | return oss.str(); | |
146 | } | |
91327a77 | 147 | int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0); |
7c673cae FG |
148 | if (sock_fd < 0) { |
149 | int err = errno; | |
150 | ostringstream oss; | |
151 | oss << "OutputDataSocket::bind_and_listen: " | |
152 | << "failed to create socket: " << cpp_strerror(err); | |
153 | return oss.str(); | |
154 | } | |
92f5a8d4 | 155 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae FG |
156 | memset(&address, 0, sizeof(struct sockaddr_un)); |
157 | address.sun_family = AF_UNIX; | |
158 | snprintf(address.sun_path, sizeof(address.sun_path), | |
159 | "%s", sock_path.c_str()); | |
160 | if (::bind(sock_fd, (struct sockaddr*)&address, | |
161 | sizeof(struct sockaddr_un)) != 0) { | |
162 | int err = errno; | |
163 | if (err == EADDRINUSE) { | |
164 | // The old UNIX domain socket must still be there. | |
165 | // Let's unlink it and try again. | |
166 | VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); | |
167 | if (::bind(sock_fd, (struct sockaddr*)&address, | |
168 | sizeof(struct sockaddr_un)) == 0) { | |
169 | err = 0; | |
170 | } | |
171 | else { | |
172 | err = errno; | |
173 | } | |
174 | } | |
175 | if (err != 0) { | |
176 | ostringstream oss; | |
177 | oss << "OutputDataSocket::bind_and_listen: " | |
178 | << "failed to bind the UNIX domain socket to '" << sock_path | |
179 | << "': " << cpp_strerror(err); | |
180 | close(sock_fd); | |
181 | return oss.str(); | |
182 | } | |
183 | } | |
184 | if (listen(sock_fd, 5) != 0) { | |
185 | int err = errno; | |
186 | ostringstream oss; | |
187 | oss << "OutputDataSocket::bind_and_listen: " | |
188 | << "failed to listen to socket: " << cpp_strerror(err); | |
189 | close(sock_fd); | |
190 | VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); | |
191 | return oss.str(); | |
192 | } | |
193 | *fd = sock_fd; | |
194 | return ""; | |
195 | } | |
196 | ||
197 | void* OutputDataSocket::entry() | |
198 | { | |
199 | ldout(m_cct, 5) << "entry start" << dendl; | |
200 | while (true) { | |
201 | struct pollfd fds[2]; | |
92f5a8d4 | 202 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae FG |
203 | memset(fds, 0, sizeof(fds)); |
204 | fds[0].fd = m_sock_fd; | |
205 | fds[0].events = POLLIN | POLLRDBAND; | |
206 | fds[1].fd = m_shutdown_rd_fd; | |
207 | fds[1].events = POLLIN | POLLRDBAND; | |
208 | ||
209 | int ret = poll(fds, 2, -1); | |
210 | if (ret < 0) { | |
211 | int err = errno; | |
212 | if (err == EINTR) { | |
213 | continue; | |
214 | } | |
215 | lderr(m_cct) << "OutputDataSocket: poll(2) error: '" | |
216 | << cpp_strerror(err) << dendl; | |
217 | return PFL_FAIL; | |
218 | } | |
219 | ||
220 | if (fds[0].revents & POLLIN) { | |
221 | // Send out some data | |
222 | do_accept(); | |
223 | } | |
224 | if (fds[1].revents & POLLIN) { | |
225 | // Parent wants us to shut down | |
226 | return PFL_SUCCESS; | |
227 | } | |
228 | } | |
229 | ldout(m_cct, 5) << "entry exit" << dendl; | |
230 | ||
231 | return PFL_SUCCESS; // unreachable | |
232 | } | |
233 | ||
234 | ||
235 | bool OutputDataSocket::do_accept() | |
236 | { | |
237 | struct sockaddr_un address; | |
238 | socklen_t address_length = sizeof(address); | |
239 | ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl; | |
91327a77 | 240 | int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address, |
7c673cae | 241 | &address_length); |
7c673cae FG |
242 | if (connection_fd < 0) { |
243 | int err = errno; | |
244 | lderr(m_cct) << "OutputDataSocket: do_accept error: '" | |
245 | << cpp_strerror(err) << dendl; | |
246 | return false; | |
247 | } | |
91327a77 | 248 | ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl; |
7c673cae FG |
249 | |
250 | handle_connection(connection_fd); | |
251 | close_connection(connection_fd); | |
252 | ||
253 | return 0; | |
254 | } | |
255 | ||
256 | void OutputDataSocket::handle_connection(int fd) | |
257 | { | |
f67539c2 | 258 | ceph::buffer::list bl; |
7c673cae | 259 | |
11fdf7f2 | 260 | m_lock.lock(); |
7c673cae | 261 | init_connection(bl); |
11fdf7f2 | 262 | m_lock.unlock(); |
7c673cae FG |
263 | |
264 | if (bl.length()) { | |
265 | /* need to special case the connection init buffer output, as it needs | |
266 | * to be dumped before any data, including older data that was sent | |
267 | * before the connection was established, or before we identified | |
268 | * older connection was broken | |
269 | */ | |
270 | int ret = safe_write(fd, bl.c_str(), bl.length()); | |
271 | if (ret < 0) { | |
272 | return; | |
273 | } | |
274 | } | |
275 | ||
276 | int ret = dump_data(fd); | |
277 | if (ret < 0) | |
278 | return; | |
279 | ||
280 | do { | |
11fdf7f2 TL |
281 | { |
282 | std::unique_lock l(m_lock); | |
283 | if (!going_down) { | |
284 | cond.wait(l); | |
285 | } | |
286 | if (going_down) { | |
287 | break; | |
288 | } | |
7c673cae | 289 | } |
7c673cae FG |
290 | ret = dump_data(fd); |
291 | } while (ret >= 0); | |
292 | } | |
293 | ||
294 | int OutputDataSocket::dump_data(int fd) | |
295 | { | |
11fdf7f2 | 296 | m_lock.lock(); |
f67539c2 | 297 | auto l = std::move(data); |
7c673cae FG |
298 | data.clear(); |
299 | data_size = 0; | |
11fdf7f2 | 300 | m_lock.unlock(); |
7c673cae | 301 | |
494da23a | 302 | for (auto iter = l.begin(); iter != l.end(); ++iter) { |
f67539c2 | 303 | ceph::buffer::list& bl = *iter; |
7c673cae FG |
304 | int ret = safe_write(fd, bl.c_str(), bl.length()); |
305 | if (ret >= 0) { | |
306 | ret = safe_write(fd, delim.c_str(), delim.length()); | |
307 | } | |
308 | if (ret < 0) { | |
494da23a | 309 | std::scoped_lock lock(m_lock); |
7c673cae | 310 | for (; iter != l.end(); ++iter) { |
f67539c2 | 311 | ceph::buffer::list& bl = *iter; |
7c673cae FG |
312 | data.push_back(bl); |
313 | data_size += bl.length(); | |
314 | } | |
315 | return ret; | |
316 | } | |
317 | } | |
318 | ||
319 | return 0; | |
320 | } | |
321 | ||
322 | void OutputDataSocket::close_connection(int fd) | |
323 | { | |
324 | VOID_TEMP_FAILURE_RETRY(close(fd)); | |
325 | } | |
326 | ||
327 | bool OutputDataSocket::init(const std::string &path) | |
328 | { | |
329 | ldout(m_cct, 5) << "init " << path << dendl; | |
330 | ||
331 | /* Set up things for the new thread */ | |
332 | std::string err; | |
333 | int pipe_rd = -1, pipe_wr = -1; | |
334 | err = create_shutdown_pipe(&pipe_rd, &pipe_wr); | |
335 | if (!err.empty()) { | |
336 | lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl; | |
337 | return false; | |
338 | } | |
339 | int sock_fd; | |
340 | err = bind_and_listen(path, &sock_fd); | |
341 | if (!err.empty()) { | |
342 | lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl; | |
343 | close(pipe_rd); | |
344 | close(pipe_wr); | |
345 | return false; | |
346 | } | |
347 | ||
348 | /* Create new thread */ | |
349 | m_sock_fd = sock_fd; | |
350 | m_shutdown_rd_fd = pipe_rd; | |
351 | m_shutdown_wr_fd = pipe_wr; | |
352 | m_path = path; | |
353 | create("out_data_socket"); | |
354 | add_cleanup_file(m_path.c_str()); | |
355 | return true; | |
356 | } | |
357 | ||
358 | void OutputDataSocket::shutdown() | |
359 | { | |
11fdf7f2 | 360 | m_lock.lock(); |
7c673cae | 361 | going_down = true; |
11fdf7f2 TL |
362 | cond.notify_all(); |
363 | m_lock.unlock(); | |
7c673cae FG |
364 | |
365 | if (m_shutdown_wr_fd < 0) | |
366 | return; | |
367 | ||
368 | ldout(m_cct, 5) << "shutdown" << dendl; | |
369 | ||
370 | // Send a byte to the shutdown pipe that the thread is listening to | |
371 | char buf[1] = { 0x0 }; | |
372 | int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf)); | |
373 | VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd)); | |
374 | m_shutdown_wr_fd = -1; | |
375 | ||
376 | if (ret == 0) { | |
377 | join(); | |
378 | } else { | |
379 | lderr(m_cct) << "OutputDataSocket::shutdown: failed to write " | |
380 | "to thread shutdown pipe: error " << ret << dendl; | |
381 | } | |
382 | ||
383 | remove_cleanup_file(m_path.c_str()); | |
384 | m_path.clear(); | |
385 | } | |
386 | ||
f67539c2 | 387 | void OutputDataSocket::append_output(ceph::buffer::list& bl) |
7c673cae | 388 | { |
11fdf7f2 | 389 | std::lock_guard l(m_lock); |
7c673cae FG |
390 | |
391 | if (data_size + bl.length() > data_max_backlog) { | |
494da23a TL |
392 | if (skipped % 100 == 0) { |
393 | ldout(m_cct, 0) << "dropping data output, max backlog reached (skipped==" | |
394 | << skipped << ")" | |
395 | << dendl; | |
396 | skipped = 1; | |
397 | } else | |
398 | ++skipped; | |
39ae355f TL |
399 | |
400 | cond.notify_all(); | |
494da23a | 401 | return; |
7c673cae | 402 | } |
7c673cae | 403 | |
494da23a | 404 | data.push_back(bl); |
7c673cae | 405 | data_size += bl.length(); |
11fdf7f2 | 406 | cond.notify_all(); |
7c673cae | 407 | } |