1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2011 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/OutputDataSocket.h"
16 #include "common/errno.h"
17 #include "common/pipe.h"
18 #include "common/safe_io.h"
19 #include "include/compat.h"
24 // re-include our assert to clobber the system one; fix dout:
25 #include "include/assert.h"
// Logging configuration for this file. dout_subsys names ceph_subsys_asok;
// dout_prefix streams "asok(<m_cct pointer>) " into *_dout.
// NOTE(review): presumably consumed by the ldout/lderr macros used below --
// their definitions are not in this file. Embedded line 28 is missing from
// this listing; confirm against the real file.
27 #define dout_subsys ceph_subsys_asok
29 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
31 using std::ostringstream
;
34 * UNIX domain sockets created by an application persist even after that
35 * application closes, unless they're explicitly unlinked. This is because the
36 * directory containing the socket keeps a reference to the socket.
38 * This code makes things a little nicer by unlinking those dead sockets when
39 * the application exits normally.
// Mutex locked by remove_cleanup_file(), remove_all_cleanup_files() and
// add_cleanup_file() around every access to the two variables below.
41 static pthread_mutex_t cleanup_lock
= PTHREAD_MUTEX_INITIALIZER
;
// Paths to unlink at exit. Entries are the strdup()'d copies pushed by
// add_cleanup_file().
42 static std::vector
<const char*> cleanup_files
;
// Becomes true once remove_all_cleanup_files() has been passed to atexit(),
// so the handler is registered only once.
43 static bool cleanup_atexit
= false;
// Unlink `file` and remove the matching entry from cleanup_files, holding
// cleanup_lock for the whole operation. Entries are matched by string
// contents (strcmp), not by pointer identity.
// NOTE(review): the embedded line numbers jump 51 -> 53 -> 57, so this
// listing is missing lines around the erase(). If no `break` really follows
// erase(), the loop goes on to increment an invalidated iterator. The entries
// come from strdup() in add_cleanup_file(), so a free() of the erased string
// would also be expected here. Confirm both against the real file.
45 static void remove_cleanup_file(const char *file
)
47 pthread_mutex_lock(&cleanup_lock
);
// The unlink() is attempted before the list is searched, i.e. whether or not
// `file` was ever registered.
48 VOID_TEMP_FAILURE_RETRY(unlink(file
));
49 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
50 i
!= cleanup_files
.end(); ++i
) {
51 if (strcmp(file
, *i
) == 0) {
53 cleanup_files
.erase(i
);
57 pthread_mutex_unlock(&cleanup_lock
);
// Unlink every path recorded in cleanup_files and then empty the list, all
// under cleanup_lock. add_cleanup_file() registers this function with
// atexit().
// NOTE(review): embedded lines 66-67 are missing from this listing. Since
// entries are strdup()'d, a per-entry free() may belong inside the loop --
// confirm against the real file.
60 static void remove_all_cleanup_files()
62 pthread_mutex_lock(&cleanup_lock
);
63 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
64 i
!= cleanup_files
.end(); ++i
) {
65 VOID_TEMP_FAILURE_RETRY(unlink(*i
));
68 cleanup_files
.clear();
69 pthread_mutex_unlock(&cleanup_lock
);
// Record a private copy of `file` in cleanup_files so it gets unlinked later.
// The first call also registers remove_all_cleanup_files() with atexit().
72 static void add_cleanup_file(const char *file
)
// The list stores its own strdup()'d copy, so the caller's buffer need not
// outlive this call.
// NOTE(review): embedded lines 75-76 are missing from this listing; a check
// for strdup() returning NULL would be expected there -- confirm.
74 char *fname
= strdup(file
);
77 pthread_mutex_lock(&cleanup_lock
);
78 cleanup_files
.push_back(fname
);
// Register the exit handler only once; cleanup_atexit is read and written
// while cleanup_lock is held.
79 if (!cleanup_atexit
) {
80 atexit(remove_all_cleanup_files
);
81 cleanup_atexit
= true;
83 pthread_mutex_unlock(&cleanup_lock
);
// Constructor. The initializers visible here set data_max_backlog from
// `_backlog` and give m_lock the name "OutputDataSocket::m_lock".
// NOTE(review): embedded lines 88 and 90-94 are missing from this listing, so
// the remaining member initializers (including whatever stores `cct`) cannot
// be seen here.
87 OutputDataSocket::OutputDataSocket(CephContext
*cct
, uint64_t _backlog
)
89 data_max_backlog(_backlog
),
95 m_lock("OutputDataSocket::m_lock")
// Destructor.
// NOTE(review): its body (embedded lines 100-102) is not present in this
// listing, so what it releases cannot be stated from here.
99 OutputDataSocket::~OutputDataSocket()
105 * This thread listens on the UNIX domain socket for incoming connections.
106 * It only handles one connection at a time at the moment. All I/O is nonblocking,
107 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
109 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
110 * pipe, the thread terminates itself gracefully, allowing the
111 * OutputDataSocketConfigObs class to join() it.
// void* status values: PFL_SUCCESS is 0 and PFL_FAIL is 1, each cast through
// intptr_t. entry() returns PFL_SUCCESS; no use of PFL_FAIL is visible in
// this listing.
114 #define PFL_SUCCESS ((void*)(intptr_t)0)
115 #define PFL_FAIL ((void*)(intptr_t)1)
// Create the pipe used to tell the listening thread to stop, and hand its two
// ends back through the out-parameters.
//   pipe_rd: receives pipefd[0]
//   pipe_wr: receives pipefd[1]
// Returns a std::string; init() logs it as an error description.
// NOTE(review): the `pipefd` and `oss` declarations, the error condition and
// the return statements are on lines missing from this listing. The result is
// presumably empty on success -- confirm.
117 std::string
OutputDataSocket::create_shutdown_pipe(int *pipe_rd
, int *pipe_wr
)
120 int ret
= pipe_cloexec(pipefd
);
// Error text built from pipe_cloexec()'s return value.
123 oss
<< "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret
);
127 *pipe_rd
= pipefd
[0];
128 *pipe_wr
= pipefd
[1];
// Create a UNIX-domain stream socket, bind it to `sock_path` and start
// listening on it.
//   sock_path: filesystem path for the socket; must fit in sun_path with room
//              for the terminating NUL.
//   fd:        out-parameter. NOTE(review): the store to *fd is not visible
//              in this listing -- presumably the listening descriptor.
// Returns a std::string; init() logs it as a failure description.
// NOTE(review): many lines (declarations of `oss`/`err`, several conditions,
// all return statements) are missing from this listing, as the gaps in the
// embedded numbering show. The comments below describe only what is visible.
132 std::string
OutputDataSocket::bind_and_listen(const std::string
&sock_path
, int *fd
)
134 ldout(m_cct
, 5) << "bind_and_listen " << sock_path
<< dendl
;
136 struct sockaddr_un address
;
// Reject paths that would not fit in address.sun_path, leaving one byte spare.
137 if (sock_path
.size() > sizeof(address
.sun_path
) - 1) {
139 oss
<< "OutputDataSocket::bind_and_listen: "
140 << "The UNIX domain socket path " << sock_path
<< " is too long! The "
141 << "maximum length on this system is "
142 << (sizeof(address
.sun_path
) - 1);
145 int sock_fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
149 oss
<< "OutputDataSocket::bind_and_listen: "
150 << "failed to create socket: " << cpp_strerror(err
);
// Mark the socket close-on-exec.
153 int r
= fcntl(sock_fd
, F_SETFD
, FD_CLOEXEC
);
// NOTE(review): this close() sits between the fcntl() call and the fcntl error
// message, so it is presumably on the fcntl failure path -- the condition
// itself is not visible.
156 VOID_TEMP_FAILURE_RETRY(::close(sock_fd
));
158 oss
<< "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r
);
// Zero the address, then fill in the family and a NUL-terminated copy of the
// path.
161 memset(&address
, 0, sizeof(struct sockaddr_un
));
162 address
.sun_family
= AF_UNIX
;
163 snprintf(address
.sun_path
, sizeof(address
.sun_path
),
164 "%s", sock_path
.c_str());
165 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
166 sizeof(struct sockaddr_un
)) != 0) {
168 if (err
== EADDRINUSE
) {
169 // The old UNIX domain socket must still be there.
170 // Let's unlink it and try again.
171 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
// Second and last bind attempt, made after removing the existing path.
172 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
173 sizeof(struct sockaddr_un
)) == 0) {
182 oss
<< "OutputDataSocket::bind_and_listen: "
183 << "failed to bind the UNIX domain socket to '" << sock_path
184 << "': " << cpp_strerror(err
);
// Listen with a backlog of 5 pending connections.
189 if (listen(sock_fd
, 5) != 0) {
192 oss
<< "OutputDataSocket::bind_and_listen: "
193 << "failed to listen to socket: " << cpp_strerror(err
);
// The socket file is removed again after the listen() error message is built.
195 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
// Thread body: waits in poll() on the listening socket (m_sock_fd) and on the
// read end of the shutdown pipe (m_shutdown_rd_fd).
// Returns a PFL_* status value as void*.
// NOTE(review): the loop header, the poll() error check and the statements
// inside both `revents` branches are on lines missing from this listing. The
// final "unreachable" return suggests an enclosing infinite loop -- confirm.
202 void* OutputDataSocket::entry()
204 ldout(m_cct
, 5) << "entry start" << dendl
;
// fds[0] is the listening socket, fds[1] the shutdown pipe.
206 struct pollfd fds
[2];
207 memset(fds
, 0, sizeof(fds
));
208 fds
[0].fd
= m_sock_fd
;
209 fds
[0].events
= POLLIN
| POLLRDBAND
;
210 fds
[1].fd
= m_shutdown_rd_fd
;
211 fds
[1].events
= POLLIN
| POLLRDBAND
;
// A timeout of -1 makes poll() block with no time limit.
213 int ret
= poll(fds
, 2, -1);
219 lderr(m_cct
) << "OutputDataSocket: poll(2) error: '"
220 << cpp_strerror(err
) << dendl
;
224 if (fds
[0].revents
& POLLIN
) {
225 // Send out some data
228 if (fds
[1].revents
& POLLIN
) {
229 // Parent wants us to shut down
233 ldout(m_cct
, 5) << "entry exit" << dendl
;
235 return PFL_SUCCESS
; // unreachable
// Accept one connection on m_sock_fd, pass it to handle_connection(), then
// close it with close_connection(). A negative descriptor from accept() is
// logged as an error.
// NOTE(review): the tail of the accept() call (embedded line 245), the
// declaration of `err` and every return statement are missing from this
// listing, so the meaning of the bool result cannot be stated from here.
239 bool OutputDataSocket::do_accept()
241 struct sockaddr_un address
;
242 socklen_t address_length
= sizeof(address
);
243 ldout(m_cct
, 30) << "OutputDataSocket: calling accept" << dendl
;
244 int connection_fd
= accept(m_sock_fd
, (struct sockaddr
*) &address
,
246 ldout(m_cct
, 30) << "OutputDataSocket: finished accept" << dendl
;
247 if (connection_fd
< 0) {
249 lderr(m_cct
) << "OutputDataSocket: do_accept error: '"
250 << cpp_strerror(err
) << dendl
;
// The connection is handled and closed on this same thread.
254 handle_connection(connection_fd
);
255 close_connection(connection_fd
);
// Serve one accepted connection on `fd`. Two writes are visible: a
// safe_write() of a buffer `bl` (described by the in-code comment as the
// connection-init output that must precede any queued data), and a
// dump_data(fd) call that writes the queued data.
// NOTE(review): embedded lines 261-268, 273 and 275-279 and everything after
// 280 are missing from this listing. That includes the declaration of `bl`,
// the end of the block comment, any loop or wait logic and the handling of
// both `ret` values. Confirm against the real file.
260 void OutputDataSocket::handle_connection(int fd
)
269 /* need to special case the connection init buffer output, as it needs
270 * to be dumped before any data, including older data that was sent
271 * before the connection was established, or before we identified
272 * older connection was broken
274 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
280 int ret
= dump_data(fd
);
// Write the queued buffers to `fd`. The queue `data` is moved into a local
// list, then each buffer is written with safe_write(), followed by a
// safe_write() of `delim`.
// Returns int; handle_connection() stores the result in `ret`.
// NOTE(review): the locking, the checks on `ret`, the statement that re-queues
// buffers in the second loop and the return statements are on lines missing
// from this listing (the embedded numbering skips 299-300, 302-305, 309,
// 311-312, 315 and 317 onward). Confirm against the real file.
298 int OutputDataSocket::dump_data(int fd
)
// Take the whole queue in one move so the writes below work on a local list.
301 list
<bufferlist
> l
= std::move(data
);
306 for (list
<bufferlist
>::iterator iter
= l
.begin(); iter
!= l
.end(); ++iter
) {
307 bufferlist
& bl
= *iter
;
308 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
310 ret
= safe_write(fd
, delim
.c_str(), delim
.length());
// This inner loop continues from the current position of the same iterator
// and adds each remaining buffer's length back to data_size.
// NOTE(review): presumably the write-failure path that puts unsent buffers
// back on `data` -- the condition and the push are not visible.
313 for (; iter
!= l
.end(); ++iter
) {
314 bufferlist
& bl
= *iter
;
316 data_size
+= bl
.length();
// Close the connection descriptor `fd`. The result of close() is discarded by
// VOID_TEMP_FAILURE_RETRY.
325 void OutputDataSocket::close_connection(int fd
)
327 VOID_TEMP_FAILURE_RETRY(close(fd
));
// Set up the socket at `path` and start the worker: create the shutdown pipe,
// bind and listen on the path, store the pipe ends in m_shutdown_rd_fd and
// m_shutdown_wr_fd, call create("out_data_socket"), and register m_path for
// unlinking at exit.
// NOTE(review): the declarations of `err` and `sock_fd`, the error checks,
// the assignments to m_path and m_sock_fd and every return statement are on
// lines missing from this listing, so the bool result is not documented here.
330 bool OutputDataSocket::init(const std::string
&path
)
332 ldout(m_cct
, 5) << "init " << path
<< dendl
;
334 /* Set up things for the new thread */
336 int pipe_rd
= -1, pipe_wr
= -1;
337 err
= create_shutdown_pipe(&pipe_rd
, &pipe_wr
);
339 lderr(m_cct
) << "OutputDataSocketConfigObs::init: error: " << err
<< dendl
;
343 err
= bind_and_listen(path
, &sock_fd
);
345 lderr(m_cct
) << "OutputDataSocketConfigObs::init: failed: " << err
<< dendl
;
351 /* Create new thread */
353 m_shutdown_rd_fd
= pipe_rd
;
354 m_shutdown_wr_fd
= pipe_wr
;
// NOTE(review): create() is not defined in this file; going by the comment
// above it starts the thread -- confirm.
356 create("out_data_socket");
357 add_cleanup_file(m_path
.c_str());
// Ask the listening thread to stop by writing one byte to the shutdown pipe,
// then close the write end and unlink the socket path through
// remove_cleanup_file().
// NOTE(review): embedded lines 362-367, 369-370, 378-381, 384-385 and 387
// onward are missing from this listing. That covers the statement guarded by
// the `< 0` test (presumably an early return), the check on `ret` and any
// thread join -- confirm against the real file.
361 void OutputDataSocket::shutdown()
// A negative write descriptor means there is no open shutdown pipe to write
// to.
368 if (m_shutdown_wr_fd
< 0)
371 ldout(m_cct
, 5) << "shutdown" << dendl
;
373 // Send a byte to the shutdown pipe that the thread is listening to
374 char buf
[1] = { 0x0 };
375 int ret
= safe_write(m_shutdown_wr_fd
, buf
, sizeof(buf
));
// The write end is closed and reset to -1 right after the write.
376 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd
));
377 m_shutdown_wr_fd
= -1;
382 lderr(m_cct
) << "OutputDataSocket::shutdown: failed to write "
383 "to thread shutdown pipe: error " << ret
<< dendl
;
386 remove_cleanup_file(m_path
.c_str());
// Account for a new output buffer `bl` while holding m_lock. When data_size
// plus the new buffer's length would exceed data_max_backlog, a "dropping"
// message is logged; otherwise the visible code adds bl.length() to
// data_size.
// NOTE(review): the statements after the drop message (presumably an early
// return), the push of `bl` onto the queue and the end of the function are
// not visible in this listing -- confirm against the real file.
390 void OutputDataSocket::append_output(bufferlist
& bl
)
// Scoped lock: m_lock is held for the lifetime of `l`.
392 Mutex::Locker
l(m_lock
);
394 if (data_size
+ bl
.length() > data_max_backlog
) {
395 ldout(m_cct
, 20) << "dropping data output, max backlog reached" << dendl
;
399 data_size
+= bl
.length();