]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/OutputDataSocket.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / common / OutputDataSocket.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2
TL
15#include <poll.h>
16#include <sys/un.h>
17#include <unistd.h>
18
7c673cae 19#include "common/OutputDataSocket.h"
7c673cae 20#include "common/errno.h"
11fdf7f2 21#include "common/debug.h"
7c673cae 22#include "common/safe_io.h"
31f18b77 23#include "include/compat.h"
91327a77 24#include "include/sock_compat.h"
7c673cae 25
31f18b77 26// re-include our assert to clobber the system one; fix dout:
11fdf7f2 27#include "include/ceph_assert.h"
7c673cae
FG
28
29#define dout_subsys ceph_subsys_asok
30#undef dout_prefix
31#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
32
33using std::ostringstream;
34
35/*
36 * UNIX domain sockets created by an application persist even after that
37 * application closes, unless they're explicitly unlinked. This is because the
38 * directory containing the socket keeps a reference to the socket.
39 *
40 * This code makes things a little nicer by unlinking those dead sockets when
41 * the application exits normally.
42 */
// Guards cleanup_files and cleanup_atexit.
static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
// strdup()'d socket paths that still have to be unlinked.
static std::vector<const char*> cleanup_files;
// Set once remove_all_cleanup_files() has been registered with atexit().
static bool cleanup_atexit = false;
46
47static void remove_cleanup_file(const char *file)
48{
49 pthread_mutex_lock(&cleanup_lock);
50 VOID_TEMP_FAILURE_RETRY(unlink(file));
51 for (std::vector <const char*>::iterator i = cleanup_files.begin();
52 i != cleanup_files.end(); ++i) {
53 if (strcmp(file, *i) == 0) {
54 free((void*)*i);
55 cleanup_files.erase(i);
56 break;
57 }
58 }
59 pthread_mutex_unlock(&cleanup_lock);
60}
61
62static void remove_all_cleanup_files()
63{
64 pthread_mutex_lock(&cleanup_lock);
65 for (std::vector <const char*>::iterator i = cleanup_files.begin();
66 i != cleanup_files.end(); ++i) {
67 VOID_TEMP_FAILURE_RETRY(unlink(*i));
68 free((void*)*i);
69 }
70 cleanup_files.clear();
71 pthread_mutex_unlock(&cleanup_lock);
72}
73
74static void add_cleanup_file(const char *file)
75{
76 char *fname = strdup(file);
77 if (!fname)
78 return;
79 pthread_mutex_lock(&cleanup_lock);
80 cleanup_files.push_back(fname);
81 if (!cleanup_atexit) {
82 atexit(remove_all_cleanup_files);
83 cleanup_atexit = true;
84 }
85 pthread_mutex_unlock(&cleanup_lock);
86}
87
88
89OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
90 : m_cct(cct),
91 data_max_backlog(_backlog),
92 m_sock_fd(-1),
93 m_shutdown_rd_fd(-1),
94 m_shutdown_wr_fd(-1),
95 going_down(false),
494da23a
TL
96 data_size(0),
97 skipped(0)
7c673cae
FG
98{
99}
100
101OutputDataSocket::~OutputDataSocket()
102{
103 shutdown();
104}
105
/*
 * This thread listens on the UNIX domain socket for incoming connections.
 * It only handles one connection at a time at the moment. All I/O is nonblocking,
 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
 *
 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
 * pipe, the thread terminates itself gracefully, allowing
 * OutputDataSocket::shutdown() to join() it.
 */
115
// Thread exit codes returned by OutputDataSocket::entry(), encoded as void*.
#define PFL_SUCCESS (reinterpret_cast<void*>(static_cast<intptr_t>(0)))
#define PFL_FAIL (reinterpret_cast<void*>(static_cast<intptr_t>(1)))
118
119std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
120{
121 int pipefd[2];
9f95a23c 122 if (pipe_cloexec(pipefd, 0) < 0) {
91327a77 123 int e = errno;
7c673cae 124 ostringstream oss;
91327a77 125 oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e);
7c673cae
FG
126 return oss.str();
127 }
128
129 *pipe_rd = pipefd[0];
130 *pipe_wr = pipefd[1];
131 return "";
132}
133
134std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
135{
136 ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
137
138 struct sockaddr_un address;
139 if (sock_path.size() > sizeof(address.sun_path) - 1) {
140 ostringstream oss;
141 oss << "OutputDataSocket::bind_and_listen: "
142 << "The UNIX domain socket path " << sock_path << " is too long! The "
143 << "maximum length on this system is "
144 << (sizeof(address.sun_path) - 1);
145 return oss.str();
146 }
91327a77 147 int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0);
7c673cae
FG
148 if (sock_fd < 0) {
149 int err = errno;
150 ostringstream oss;
151 oss << "OutputDataSocket::bind_and_listen: "
152 << "failed to create socket: " << cpp_strerror(err);
153 return oss.str();
154 }
92f5a8d4 155 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
156 memset(&address, 0, sizeof(struct sockaddr_un));
157 address.sun_family = AF_UNIX;
158 snprintf(address.sun_path, sizeof(address.sun_path),
159 "%s", sock_path.c_str());
160 if (::bind(sock_fd, (struct sockaddr*)&address,
161 sizeof(struct sockaddr_un)) != 0) {
162 int err = errno;
163 if (err == EADDRINUSE) {
164 // The old UNIX domain socket must still be there.
165 // Let's unlink it and try again.
166 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
167 if (::bind(sock_fd, (struct sockaddr*)&address,
168 sizeof(struct sockaddr_un)) == 0) {
169 err = 0;
170 }
171 else {
172 err = errno;
173 }
174 }
175 if (err != 0) {
176 ostringstream oss;
177 oss << "OutputDataSocket::bind_and_listen: "
178 << "failed to bind the UNIX domain socket to '" << sock_path
179 << "': " << cpp_strerror(err);
180 close(sock_fd);
181 return oss.str();
182 }
183 }
184 if (listen(sock_fd, 5) != 0) {
185 int err = errno;
186 ostringstream oss;
187 oss << "OutputDataSocket::bind_and_listen: "
188 << "failed to listen to socket: " << cpp_strerror(err);
189 close(sock_fd);
190 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
191 return oss.str();
192 }
193 *fd = sock_fd;
194 return "";
195}
196
197void* OutputDataSocket::entry()
198{
199 ldout(m_cct, 5) << "entry start" << dendl;
200 while (true) {
201 struct pollfd fds[2];
92f5a8d4 202 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
203 memset(fds, 0, sizeof(fds));
204 fds[0].fd = m_sock_fd;
205 fds[0].events = POLLIN | POLLRDBAND;
206 fds[1].fd = m_shutdown_rd_fd;
207 fds[1].events = POLLIN | POLLRDBAND;
208
209 int ret = poll(fds, 2, -1);
210 if (ret < 0) {
211 int err = errno;
212 if (err == EINTR) {
213 continue;
214 }
215 lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
216 << cpp_strerror(err) << dendl;
217 return PFL_FAIL;
218 }
219
220 if (fds[0].revents & POLLIN) {
221 // Send out some data
222 do_accept();
223 }
224 if (fds[1].revents & POLLIN) {
225 // Parent wants us to shut down
226 return PFL_SUCCESS;
227 }
228 }
229 ldout(m_cct, 5) << "entry exit" << dendl;
230
231 return PFL_SUCCESS; // unreachable
232}
233
234
235bool OutputDataSocket::do_accept()
236{
237 struct sockaddr_un address;
238 socklen_t address_length = sizeof(address);
239 ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
91327a77 240 int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address,
7c673cae 241 &address_length);
7c673cae
FG
242 if (connection_fd < 0) {
243 int err = errno;
244 lderr(m_cct) << "OutputDataSocket: do_accept error: '"
245 << cpp_strerror(err) << dendl;
246 return false;
247 }
91327a77 248 ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
7c673cae
FG
249
250 handle_connection(connection_fd);
251 close_connection(connection_fd);
252
253 return 0;
254}
255
256void OutputDataSocket::handle_connection(int fd)
257{
f67539c2 258 ceph::buffer::list bl;
7c673cae 259
11fdf7f2 260 m_lock.lock();
7c673cae 261 init_connection(bl);
11fdf7f2 262 m_lock.unlock();
7c673cae
FG
263
264 if (bl.length()) {
265 /* need to special case the connection init buffer output, as it needs
266 * to be dumped before any data, including older data that was sent
267 * before the connection was established, or before we identified
268 * older connection was broken
269 */
270 int ret = safe_write(fd, bl.c_str(), bl.length());
271 if (ret < 0) {
272 return;
273 }
274 }
275
276 int ret = dump_data(fd);
277 if (ret < 0)
278 return;
279
280 do {
11fdf7f2
TL
281 {
282 std::unique_lock l(m_lock);
283 if (!going_down) {
284 cond.wait(l);
285 }
286 if (going_down) {
287 break;
288 }
7c673cae 289 }
7c673cae
FG
290 ret = dump_data(fd);
291 } while (ret >= 0);
292}
293
294int OutputDataSocket::dump_data(int fd)
295{
11fdf7f2 296 m_lock.lock();
f67539c2 297 auto l = std::move(data);
7c673cae
FG
298 data.clear();
299 data_size = 0;
11fdf7f2 300 m_lock.unlock();
7c673cae 301
494da23a 302 for (auto iter = l.begin(); iter != l.end(); ++iter) {
f67539c2 303 ceph::buffer::list& bl = *iter;
7c673cae
FG
304 int ret = safe_write(fd, bl.c_str(), bl.length());
305 if (ret >= 0) {
306 ret = safe_write(fd, delim.c_str(), delim.length());
307 }
308 if (ret < 0) {
494da23a 309 std::scoped_lock lock(m_lock);
7c673cae 310 for (; iter != l.end(); ++iter) {
f67539c2 311 ceph::buffer::list& bl = *iter;
7c673cae
FG
312 data.push_back(bl);
313 data_size += bl.length();
314 }
315 return ret;
316 }
317 }
318
319 return 0;
320}
321
322void OutputDataSocket::close_connection(int fd)
323{
324 VOID_TEMP_FAILURE_RETRY(close(fd));
325}
326
327bool OutputDataSocket::init(const std::string &path)
328{
329 ldout(m_cct, 5) << "init " << path << dendl;
330
331 /* Set up things for the new thread */
332 std::string err;
333 int pipe_rd = -1, pipe_wr = -1;
334 err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
335 if (!err.empty()) {
336 lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
337 return false;
338 }
339 int sock_fd;
340 err = bind_and_listen(path, &sock_fd);
341 if (!err.empty()) {
342 lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
343 close(pipe_rd);
344 close(pipe_wr);
345 return false;
346 }
347
348 /* Create new thread */
349 m_sock_fd = sock_fd;
350 m_shutdown_rd_fd = pipe_rd;
351 m_shutdown_wr_fd = pipe_wr;
352 m_path = path;
353 create("out_data_socket");
354 add_cleanup_file(m_path.c_str());
355 return true;
356}
357
358void OutputDataSocket::shutdown()
359{
11fdf7f2 360 m_lock.lock();
7c673cae 361 going_down = true;
11fdf7f2
TL
362 cond.notify_all();
363 m_lock.unlock();
7c673cae
FG
364
365 if (m_shutdown_wr_fd < 0)
366 return;
367
368 ldout(m_cct, 5) << "shutdown" << dendl;
369
370 // Send a byte to the shutdown pipe that the thread is listening to
371 char buf[1] = { 0x0 };
372 int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
373 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
374 m_shutdown_wr_fd = -1;
375
376 if (ret == 0) {
377 join();
378 } else {
379 lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
380 "to thread shutdown pipe: error " << ret << dendl;
381 }
382
383 remove_cleanup_file(m_path.c_str());
384 m_path.clear();
385}
386
f67539c2 387void OutputDataSocket::append_output(ceph::buffer::list& bl)
7c673cae 388{
11fdf7f2 389 std::lock_guard l(m_lock);
7c673cae
FG
390
391 if (data_size + bl.length() > data_max_backlog) {
494da23a
TL
392 if (skipped % 100 == 0) {
393 ldout(m_cct, 0) << "dropping data output, max backlog reached (skipped=="
394 << skipped << ")"
395 << dendl;
396 skipped = 1;
397 } else
398 ++skipped;
39ae355f
TL
399
400 cond.notify_all();
494da23a 401 return;
7c673cae 402 }
7c673cae 403
494da23a 404 data.push_back(bl);
7c673cae 405 data_size += bl.length();
11fdf7f2 406 cond.notify_all();
7c673cae 407}