// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2011 New Dream Network
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sstream>
#include <string>
#include <vector>

#include "common/OutputDataSocket.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/safe_io.h"
#include "include/compat.h"
#include "include/sock_compat.h"

// re-include our assert to clobber the system one; fix dout:
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_asok
#undef dout_prefix
#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "

using std::ostringstream;
/*
 * UNIX domain sockets created by an application persist even after that
 * application closes, unless they're explicitly unlinked. This is because the
 * directory containing the socket keeps a reference to the socket.
 *
 * This code makes things a little nicer by unlinking those dead sockets when
 * the application exits normally.
 */
// Process-wide registry of socket paths to unlink at normal exit.
// All three variables are only touched while cleanup_lock is held.
static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
static std::vector<const char*> cleanup_files;  // strdup()ed path copies
static bool cleanup_atexit = false;             // atexit() handler installed?
47 static void remove_cleanup_file(const char *file
)
49 pthread_mutex_lock(&cleanup_lock
);
50 VOID_TEMP_FAILURE_RETRY(unlink(file
));
51 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
52 i
!= cleanup_files
.end(); ++i
) {
53 if (strcmp(file
, *i
) == 0) {
55 cleanup_files
.erase(i
);
59 pthread_mutex_unlock(&cleanup_lock
);
62 static void remove_all_cleanup_files()
64 pthread_mutex_lock(&cleanup_lock
);
65 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
66 i
!= cleanup_files
.end(); ++i
) {
67 VOID_TEMP_FAILURE_RETRY(unlink(*i
));
70 cleanup_files
.clear();
71 pthread_mutex_unlock(&cleanup_lock
);
74 static void add_cleanup_file(const char *file
)
76 char *fname
= strdup(file
);
79 pthread_mutex_lock(&cleanup_lock
);
80 cleanup_files
.push_back(fname
);
81 if (!cleanup_atexit
) {
82 atexit(remove_all_cleanup_files
);
83 cleanup_atexit
= true;
85 pthread_mutex_unlock(&cleanup_lock
);
89 OutputDataSocket::OutputDataSocket(CephContext
*cct
, uint64_t _backlog
)
91 data_max_backlog(_backlog
),
101 OutputDataSocket::~OutputDataSocket()
/*
 * This thread listens on the UNIX domain socket for incoming connections.
 * It only handles one connection at a time at the moment. All I/O is meant
 * to be nonblocking, so that we can implement sensible timeouts.
 * [TODO: make all I/O nonblocking]
 *
 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
 * pipe, the thread terminates itself gracefully, allowing the
 * OutputDataSocketConfigObs class to join() it.
 */
// Thread exit values returned from OutputDataSocket::entry().
#define PFL_SUCCESS (reinterpret_cast<void*>(static_cast<intptr_t>(0)))
#define PFL_FAIL (reinterpret_cast<void*>(static_cast<intptr_t>(1)))
119 std::string
OutputDataSocket::create_shutdown_pipe(int *pipe_rd
, int *pipe_wr
)
122 if (pipe_cloexec(pipefd
, 0) < 0) {
125 oss
<< "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e
);
129 *pipe_rd
= pipefd
[0];
130 *pipe_wr
= pipefd
[1];
134 std::string
OutputDataSocket::bind_and_listen(const std::string
&sock_path
, int *fd
)
136 ldout(m_cct
, 5) << "bind_and_listen " << sock_path
<< dendl
;
138 struct sockaddr_un address
;
139 if (sock_path
.size() > sizeof(address
.sun_path
) - 1) {
141 oss
<< "OutputDataSocket::bind_and_listen: "
142 << "The UNIX domain socket path " << sock_path
<< " is too long! The "
143 << "maximum length on this system is "
144 << (sizeof(address
.sun_path
) - 1);
147 int sock_fd
= socket_cloexec(PF_UNIX
, SOCK_STREAM
, 0);
151 oss
<< "OutputDataSocket::bind_and_listen: "
152 << "failed to create socket: " << cpp_strerror(err
);
155 // FIPS zeroization audit 20191115: this memset is not security related.
156 memset(&address
, 0, sizeof(struct sockaddr_un
));
157 address
.sun_family
= AF_UNIX
;
158 snprintf(address
.sun_path
, sizeof(address
.sun_path
),
159 "%s", sock_path
.c_str());
160 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
161 sizeof(struct sockaddr_un
)) != 0) {
163 if (err
== EADDRINUSE
) {
164 // The old UNIX domain socket must still be there.
165 // Let's unlink it and try again.
166 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
167 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
168 sizeof(struct sockaddr_un
)) == 0) {
177 oss
<< "OutputDataSocket::bind_and_listen: "
178 << "failed to bind the UNIX domain socket to '" << sock_path
179 << "': " << cpp_strerror(err
);
184 if (listen(sock_fd
, 5) != 0) {
187 oss
<< "OutputDataSocket::bind_and_listen: "
188 << "failed to listen to socket: " << cpp_strerror(err
);
190 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
197 void* OutputDataSocket::entry()
199 ldout(m_cct
, 5) << "entry start" << dendl
;
201 struct pollfd fds
[2];
202 // FIPS zeroization audit 20191115: this memset is not security related.
203 memset(fds
, 0, sizeof(fds
));
204 fds
[0].fd
= m_sock_fd
;
205 fds
[0].events
= POLLIN
| POLLRDBAND
;
206 fds
[1].fd
= m_shutdown_rd_fd
;
207 fds
[1].events
= POLLIN
| POLLRDBAND
;
209 int ret
= poll(fds
, 2, -1);
215 lderr(m_cct
) << "OutputDataSocket: poll(2) error: '"
216 << cpp_strerror(err
) << dendl
;
220 if (fds
[0].revents
& POLLIN
) {
221 // Send out some data
224 if (fds
[1].revents
& POLLIN
) {
225 // Parent wants us to shut down
229 ldout(m_cct
, 5) << "entry exit" << dendl
;
231 return PFL_SUCCESS
; // unreachable
235 bool OutputDataSocket::do_accept()
237 struct sockaddr_un address
;
238 socklen_t address_length
= sizeof(address
);
239 ldout(m_cct
, 30) << "OutputDataSocket: calling accept" << dendl
;
240 int connection_fd
= accept_cloexec(m_sock_fd
, (struct sockaddr
*) &address
,
242 if (connection_fd
< 0) {
244 lderr(m_cct
) << "OutputDataSocket: do_accept error: '"
245 << cpp_strerror(err
) << dendl
;
248 ldout(m_cct
, 30) << "OutputDataSocket: finished accept" << dendl
;
250 handle_connection(connection_fd
);
251 close_connection(connection_fd
);
256 void OutputDataSocket::handle_connection(int fd
)
258 ceph::buffer::list bl
;
265 /* need to special case the connection init buffer output, as it needs
266 * to be dumped before any data, including older data that was sent
267 * before the connection was established, or before we identified
268 * older connection was broken
270 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
276 int ret
= dump_data(fd
);
282 std::unique_lock
l(m_lock
);
294 int OutputDataSocket::dump_data(int fd
)
297 auto l
= std::move(data
);
302 for (auto iter
= l
.begin(); iter
!= l
.end(); ++iter
) {
303 ceph::buffer::list
& bl
= *iter
;
304 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
306 ret
= safe_write(fd
, delim
.c_str(), delim
.length());
309 std::scoped_lock
lock(m_lock
);
310 for (; iter
!= l
.end(); ++iter
) {
311 ceph::buffer::list
& bl
= *iter
;
313 data_size
+= bl
.length();
322 void OutputDataSocket::close_connection(int fd
)
324 VOID_TEMP_FAILURE_RETRY(close(fd
));
327 bool OutputDataSocket::init(const std::string
&path
)
329 ldout(m_cct
, 5) << "init " << path
<< dendl
;
331 /* Set up things for the new thread */
333 int pipe_rd
= -1, pipe_wr
= -1;
334 err
= create_shutdown_pipe(&pipe_rd
, &pipe_wr
);
336 lderr(m_cct
) << "OutputDataSocketConfigObs::init: error: " << err
<< dendl
;
340 err
= bind_and_listen(path
, &sock_fd
);
342 lderr(m_cct
) << "OutputDataSocketConfigObs::init: failed: " << err
<< dendl
;
348 /* Create new thread */
350 m_shutdown_rd_fd
= pipe_rd
;
351 m_shutdown_wr_fd
= pipe_wr
;
353 create("out_data_socket");
354 add_cleanup_file(m_path
.c_str());
358 void OutputDataSocket::shutdown()
365 if (m_shutdown_wr_fd
< 0)
368 ldout(m_cct
, 5) << "shutdown" << dendl
;
370 // Send a byte to the shutdown pipe that the thread is listening to
371 char buf
[1] = { 0x0 };
372 int ret
= safe_write(m_shutdown_wr_fd
, buf
, sizeof(buf
));
373 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd
));
374 m_shutdown_wr_fd
= -1;
379 lderr(m_cct
) << "OutputDataSocket::shutdown: failed to write "
380 "to thread shutdown pipe: error " << ret
<< dendl
;
383 remove_cleanup_file(m_path
.c_str());
387 void OutputDataSocket::append_output(ceph::buffer::list
& bl
)
389 std::lock_guard
l(m_lock
);
391 if (data_size
+ bl
.length() > data_max_backlog
) {
392 if (skipped
% 100 == 0) {
393 ldout(m_cct
, 0) << "dropping data output, max backlog reached (skipped=="
403 data_size
+= bl
.length();