1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2011 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/OutputDataSocket.h"
16 #include "common/errno.h"
17 #include "common/safe_io.h"
18 #include "include/compat.h"
19 #include "include/sock_compat.h"
24 // re-include our assert to clobber the system one; fix dout:
25 #include "include/assert.h"
// Logging configuration for this file: the subsystem is ceph_subsys_asok
// (presumably picked up by the ldout/lderr macros used below).
27 #define dout_subsys ceph_subsys_asok
// Every log line is prefixed with "asok(" + the m_cct pointer value + ") ".
29 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
31 using std::ostringstream
;
34 * UNIX domain sockets created by an application persist even after that
35 * application closes, unless they're explicitly unlinked. This is because the
36 * directory containing the socket keeps a reference to the socket.
38 * This code makes things a little nicer by unlinking those dead sockets when
39 * the application exits normally.
// File-local bookkeeping for socket paths that should be unlinked at exit.
// cleanup_lock is taken by remove_cleanup_file(), remove_all_cleanup_files()
// and add_cleanup_file() around every access to the two variables below.
41 static pthread_mutex_t cleanup_lock
= PTHREAD_MUTEX_INITIALIZER
;
// Registered paths; add_cleanup_file() stores strdup()'d copies here.
42 static std::vector
<const char*> cleanup_files
;
// Set to true once remove_all_cleanup_files has been registered with atexit().
43 static bool cleanup_atexit
= false;
// Unlink `file` and drop the matching entry from cleanup_files.
// The unlink and the list update both happen while holding cleanup_lock.
45 static void remove_cleanup_file(const char *file
)
47 pthread_mutex_lock(&cleanup_lock
);
// The path is unlinked whether or not it was ever registered.
// (VOID_TEMP_FAILURE_RETRY presumably retries on EINTR and discards the result.)
48 VOID_TEMP_FAILURE_RETRY(unlink(file
));
// Entries are matched by string content (strcmp), not by pointer identity.
49 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
50 i
!= cleanup_files
.end(); ++i
) {
51 if (strcmp(file
, *i
) == 0) {
// NOTE(review): erase() invalidates `i`; the loop must not advance it
// afterwards -- confirm the loop is left immediately after this call.
// NOTE(review): entries are strdup()'d in add_cleanup_file(); confirm the
// string is freed before it is erased.
53 cleanup_files
.erase(i
);
57 pthread_mutex_unlock(&cleanup_lock
);
// Unlink every registered path and empty cleanup_files, under cleanup_lock.
// add_cleanup_file() registers this function with atexit().
60 static void remove_all_cleanup_files()
62 pthread_mutex_lock(&cleanup_lock
);
63 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
64 i
!= cleanup_files
.end(); ++i
) {
// NOTE(review): entries are strdup()'d copies; confirm each one is freed
// in this loop before the vector is cleared.
65 VOID_TEMP_FAILURE_RETRY(unlink(*i
));
68 cleanup_files
.clear();
69 pthread_mutex_unlock(&cleanup_lock
);
// Register `file` for removal at process exit.
// A private copy of the string is stored, so the caller's buffer need not
// outlive this call. The first registration also installs
// remove_all_cleanup_files as an atexit() handler.
72 static void add_cleanup_file(const char *file
)
// The copy is made before the lock is taken.
// NOTE(review): strdup() can return NULL; confirm that case is handled
// before the pointer is pushed onto cleanup_files.
74 char *fname
= strdup(file
);
77 pthread_mutex_lock(&cleanup_lock
);
78 cleanup_files
.push_back(fname
);
// cleanup_atexit ensures the handler is registered only once.
79 if (!cleanup_atexit
) {
80 atexit(remove_all_cleanup_files
);
81 cleanup_atexit
= true;
83 pthread_mutex_unlock(&cleanup_lock
);
// Constructor. Of the member initializers, the visible ones set
// data_max_backlog from `_backlog` (compared against the queued byte count in
// append_output()) and give m_lock the name "OutputDataSocket::m_lock".
// NOTE(review): how `cct` is stored is not visible here -- presumably in m_cct.
87 OutputDataSocket::OutputDataSocket(CephContext
*cct
, uint64_t _backlog
)
89 data_max_backlog(_backlog
),
95 m_lock("OutputDataSocket::m_lock")
// Destructor.
99 OutputDataSocket::~OutputDataSocket()
105 * This thread listens on the UNIX domain socket for incoming connections.
106 * It only handles one connection at a time at the moment. All I/O is nonblocking,
107 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
109 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
110 * pipe, the thread terminates itself gracefully, allowing the
111 * OutputDataSocketConfigObs class to join() it.
// Thread result codes encoded as void*: 0 for success, 1 for failure.
// entry() returns PFL_SUCCESS.
114 #define PFL_SUCCESS ((void*)(intptr_t)0)
115 #define PFL_FAIL ((void*)(intptr_t)1)
// Create the pipe used to tell the listener thread to shut down.
// On success the read end is stored in *pipe_rd and the write end in *pipe_wr.
// Returns a std::string that init() logs as an error message; presumably it
// is empty on success -- confirm.
117 std::string
OutputDataSocket::create_shutdown_pipe(int *pipe_rd
, int *pipe_wr
)
// pipe_cloexec is a project helper; by its name it presumably creates the
// pipe with close-on-exec set on both ends.
120 if (pipe_cloexec(pipefd
) < 0) {
123 oss
<< "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e
);
// pipefd[0] is the read end, pipefd[1] the write end.
127 *pipe_rd
= pipefd
[0];
128 *pipe_wr
= pipefd
[1];
// Create a UNIX domain stream socket, bind it to `sock_path` and start
// listening on it.
// Returns a std::string that init() logs as an error message; presumably it
// is empty on success -- confirm.
// NOTE(review): the assignment to *fd is not visible here; presumably it
// receives the listening socket on success.
132 std::string
OutputDataSocket::bind_and_listen(const std::string
&sock_path
, int *fd
)
134 ldout(m_cct
, 5) << "bind_and_listen " << sock_path
<< dendl
;
136 struct sockaddr_un address
;
// Reject paths that would not fit in sun_path together with the trailing NUL.
137 if (sock_path
.size() > sizeof(address
.sun_path
) - 1) {
139 oss
<< "OutputDataSocket::bind_and_listen: "
140 << "The UNIX domain socket path " << sock_path
<< " is too long! The "
141 << "maximum length on this system is "
142 << (sizeof(address
.sun_path
) - 1);
// socket_cloexec is a project helper; by its name it presumably sets
// close-on-exec on the new descriptor.
145 int sock_fd
= socket_cloexec(PF_UNIX
, SOCK_STREAM
, 0);
149 oss
<< "OutputDataSocket::bind_and_listen: "
150 << "failed to create socket: " << cpp_strerror(err
);
// Zero the whole address, then fill in the family and the path.
153 memset(&address
, 0, sizeof(struct sockaddr_un
));
154 address
.sun_family
= AF_UNIX
;
155 snprintf(address
.sun_path
, sizeof(address
.sun_path
),
156 "%s", sock_path
.c_str());
// First bind attempt; EADDRINUSE gets one retry after unlinking the path.
157 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
158 sizeof(struct sockaddr_un
)) != 0) {
160 if (err
== EADDRINUSE
) {
161 // The old UNIX domain socket must still be there.
162 // Let's unlink it and try again.
163 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
164 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
165 sizeof(struct sockaddr_un
)) == 0) {
174 oss
<< "OutputDataSocket::bind_and_listen: "
175 << "failed to bind the UNIX domain socket to '" << sock_path
176 << "': " << cpp_strerror(err
);
// The listen backlog is hard-coded to 5 pending connections.
181 if (listen(sock_fd
, 5) != 0) {
184 oss
<< "OutputDataSocket::bind_and_listen: "
185 << "failed to listen to socket: " << cpp_strerror(err
);
// If listen() fails, the socket file created by bind() is removed again.
187 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
// Body of the listener thread.
// Polls two descriptors: m_sock_fd (the listening socket) and
// m_shutdown_rd_fd (the read end of the shutdown pipe).
194 void* OutputDataSocket::entry()
196 ldout(m_cct
, 5) << "entry start" << dendl
;
// fds[0] is the listening socket, fds[1] is the shutdown pipe.
198 struct pollfd fds
[2];
199 memset(fds
, 0, sizeof(fds
));
200 fds
[0].fd
= m_sock_fd
;
201 fds
[0].events
= POLLIN
| POLLRDBAND
;
202 fds
[1].fd
= m_shutdown_rd_fd
;
203 fds
[1].events
= POLLIN
| POLLRDBAND
;
// A timeout of -1 blocks until one of the two descriptors is ready.
205 int ret
= poll(fds
, 2, -1);
// NOTE(review): the message opens a single quote before the error text but
// no closing quote is appended in the visible expression.
211 lderr(m_cct
) << "OutputDataSocket: poll(2) error: '"
212 << cpp_strerror(err
) << dendl
;
216 if (fds
[0].revents
& POLLIN
) {
217 // Send out some data
220 if (fds
[1].revents
& POLLIN
) {
221 // Parent wants us to shut down
225 ldout(m_cct
, 5) << "entry exit" << dendl
;
227 return PFL_SUCCESS
; // unreachable
// Accept one connection on m_sock_fd, serve it with handle_connection() and
// then close it with close_connection(). The connection is served in the
// calling thread.
// NOTE(review): the bool return values are not visible here; presumably
// false on accept failure and true otherwise -- confirm.
231 bool OutputDataSocket::do_accept()
233 struct sockaddr_un address
;
234 socklen_t address_length
= sizeof(address
);
235 ldout(m_cct
, 30) << "OutputDataSocket: calling accept" << dendl
;
// accept_cloexec is a project helper; by its name it presumably sets
// close-on-exec on the accepted descriptor.
236 int connection_fd
= accept_cloexec(m_sock_fd
, (struct sockaddr
*) &address
,
238 if (connection_fd
< 0) {
// NOTE(review): the message opens a single quote before the error text but
// no closing quote is appended in the visible expression.
240 lderr(m_cct
) << "OutputDataSocket: do_accept error: '"
241 << cpp_strerror(err
) << dendl
;
244 ldout(m_cct
, 30) << "OutputDataSocket: finished accept" << dendl
;
246 handle_connection(connection_fd
);
247 close_connection(connection_fd
);
// Serve one accepted connection on descriptor `fd`.
// The visible code first writes a buffer `bl` to the client with safe_write()
// (the connection-init output described in the comment below) and then calls
// dump_data(fd) to send queued data.
// NOTE(review): the handling of the two `ret` values and any looping or
// waiting between dumps is not visible here.
252 void OutputDataSocket::handle_connection(int fd
)
261 /* need to special case the connection init buffer output, as it needs
262 * to be dumped before any data, including older data that was sent
263 * before the connection was established, or before we identified
264 * older connection was broken
266 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
272 int ret
= dump_data(fd
);
// Write the queued buffers to `fd`.
// NOTE(review): the return values are not visible here; handle_connection()
// stores the result in an int named `ret` -- confirm the error convention.
290 int OutputDataSocket::dump_data(int fd
)
// Move the pending queue into a local list before writing.
// NOTE(review): presumably done under m_lock, as in append_output(); the
// locking is not visible here -- confirm.
293 list
<bufferlist
> l
= std::move(data
);
// Each buffer is written in queue order.
298 for (list
<bufferlist
>::iterator iter
= l
.begin(); iter
!= l
.end(); ++iter
) {
299 bufferlist
& bl
= *iter
;
300 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
// A write of the `delim` string follows each buffer write.
// NOTE(review): presumably conditional on the buffer write succeeding;
// the condition is not visible here.
302 ret
= safe_write(fd
, delim
.c_str(), delim
.length());
// Continues from the current position (no re-initialisation of `iter`) and
// adds each remaining buffer's length back into data_size.
// NOTE(review): presumably the write-failure path that re-queues unsent
// buffers; the condition and the re-queue are not visible here.
305 for (; iter
!= l
.end(); ++iter
) {
306 bufferlist
& bl
= *iter
;
308 data_size
+= bl
.length();
// Close the per-connection descriptor `fd`; the result of close() is ignored.
317 void OutputDataSocket::close_connection(int fd
)
319 VOID_TEMP_FAILURE_RETRY(close(fd
));
// Set up the output socket at `path` and start the listener thread.
// Visible steps: create the shutdown pipe, bind and listen on the socket,
// store the pipe descriptors, start the thread, and register the socket path
// for removal at exit.
// NOTE(review): the bool return values and the cleanup on failure are not
// visible here.
322 bool OutputDataSocket::init(const std::string
&path
)
324 ldout(m_cct
, 5) << "init " << path
<< dendl
;
326 /* Set up things for the new thread */
328 int pipe_rd
= -1, pipe_wr
= -1;
// `err` receives the string returned by the helper; it is logged below.
329 err
= create_shutdown_pipe(&pipe_rd
, &pipe_wr
);
// NOTE(review): the log prefix says "OutputDataSocketConfigObs::init" although
// this is OutputDataSocket::init.
331 lderr(m_cct
) << "OutputDataSocketConfigObs::init: error: " << err
<< dendl
;
335 err
= bind_and_listen(path
, &sock_fd
);
337 lderr(m_cct
) << "OutputDataSocketConfigObs::init: failed: " << err
<< dendl
;
343 /* Create new thread */
// entry() polls m_shutdown_rd_fd; shutdown() writes to m_shutdown_wr_fd.
345 m_shutdown_rd_fd
= pipe_rd
;
346 m_shutdown_wr_fd
= pipe_wr
;
348 create("out_data_socket");
// NOTE(review): the assignment of m_path is not visible here; presumably it
// is set from `path` before this call.
349 add_cleanup_file(m_path
.c_str());
// Ask the listener thread to stop and remove the socket file.
// The request is a single byte written to the shutdown pipe that entry()
// polls.
353 void OutputDataSocket::shutdown()
// A negative write descriptor means there is no open shutdown pipe:
// init() has not stored one, or a previous shutdown() already reset it to -1.
// NOTE(review): presumably an early return follows; it is not visible here.
360 if (m_shutdown_wr_fd
< 0)
363 ldout(m_cct
, 5) << "shutdown" << dendl
;
365 // Send a byte to the shutdown pipe that the thread is listening to
366 char buf
[1] = { 0x0 };
367 int ret
= safe_write(m_shutdown_wr_fd
, buf
, sizeof(buf
));
// The write end is closed and reset to -1 whatever the write returned.
368 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd
));
369 m_shutdown_wr_fd
= -1;
// NOTE(review): presumably logged only when the write failed; the branch and
// any thread join are not visible here.
374 lderr(m_cct
) << "OutputDataSocket::shutdown: failed to write "
375 "to thread shutdown pipe: error " << ret
<< dendl
;
// Unlinks the socket path and drops it from the at-exit cleanup list.
378 remove_cleanup_file(m_path
.c_str());
382 void OutputDataSocket::append_output(bufferlist
& bl
)
384 Mutex::Locker
l(m_lock
);
386 if (data_size
+ bl
.length() > data_max_backlog
) {
387 ldout(m_cct
, 20) << "dropping data output, max backlog reached" << dendl
;
391 data_size
+= bl
.length();