1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2011 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "include/int_types.h"
17 #include "common/Thread.h"
18 #include "common/OutputDataSocket.h"
19 #include "common/config.h"
20 #include "common/dout.h"
21 #include "common/errno.h"
22 #include "common/perf_counters.h"
23 #include "common/pipe.h"
24 #include "common/safe_io.h"
25 #include "common/version.h"
26 #include "common/Formatter.h"
37 #include <sys/socket.h>
38 #include <sys/types.h>
42 #include "include/compat.h"
// Logging setup for this file.  NOTE(review): dout_subsys presumably selects
// the subsystem used by the ldout()/lderr() calls below — confirm in
// common/dout.h.
44 #define dout_subsys ceph_subsys_asok
// Every log line from this file starts with "asok(<address of m_cct>) ".
46 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
// Lets the error-message builders below name ostringstream unqualified.
48 using std::ostringstream
;
51 * UNIX domain sockets created by an application persist even after that
52 * application closes, unless they're explicitly unlinked. This is because the
53 * directory containing the socket keeps a reference to the socket.
55 * This code makes things a little nicer by unlinking those dead sockets when
56 * the application exits normally.
// File-local bookkeeping for socket files that must be unlinked at exit.
namespace {

// Held around every access to cleanup_files and cleanup_atexit.
pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;

// Paths of the socket files that are still to be unlinked.
std::vector<const char*> cleanup_files;

// Set once the atexit() hook has been registered.
bool cleanup_atexit = false;

} // anonymous namespace
62 static void remove_cleanup_file(const char *file
)
64 pthread_mutex_lock(&cleanup_lock
);
65 VOID_TEMP_FAILURE_RETRY(unlink(file
));
66 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
67 i
!= cleanup_files
.end(); ++i
) {
68 if (strcmp(file
, *i
) == 0) {
70 cleanup_files
.erase(i
);
74 pthread_mutex_unlock(&cleanup_lock
);
77 static void remove_all_cleanup_files()
79 pthread_mutex_lock(&cleanup_lock
);
80 for (std::vector
<const char*>::iterator i
= cleanup_files
.begin();
81 i
!= cleanup_files
.end(); ++i
) {
82 VOID_TEMP_FAILURE_RETRY(unlink(*i
));
85 cleanup_files
.clear();
86 pthread_mutex_unlock(&cleanup_lock
);
89 static void add_cleanup_file(const char *file
)
91 char *fname
= strdup(file
);
94 pthread_mutex_lock(&cleanup_lock
);
95 cleanup_files
.push_back(fname
);
96 if (!cleanup_atexit
) {
97 atexit(remove_all_cleanup_files
);
98 cleanup_atexit
= true;
100 pthread_mutex_unlock(&cleanup_lock
);
// Constructor.  `_backlog` initializes data_max_backlog, the limit that
// append_output() compares the buffered size against; both shutdown-pipe
// descriptors start at -1 and m_lock is given a descriptive name.
// NOTE(review): this extract appears to omit some member initializers and the
// constructor body — confirm against the full file.
104 OutputDataSocket::OutputDataSocket(CephContext
*cct
, uint64_t _backlog
)
106 data_max_backlog(_backlog
),
108 m_shutdown_rd_fd(-1),
109 m_shutdown_wr_fd(-1),
112 m_lock("OutputDataSocket::m_lock")
// Destructor.  NOTE(review): the body is not present in this extract.
116 OutputDataSocket::~OutputDataSocket()
122 * This thread listens on the UNIX domain socket for incoming connections.
123 * It only handles one connection at a time at the moment. All I/O is nonblocking,
124 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
126 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
127 * pipe, the thread terminates itself gracefully, allowing the
128 * OutputDataSocketConfigObs class to join() it.
// Result values for the listener thread's entry(), encoded as void*:
// 0 is returned on a clean exit; 1 is, judging by its name, the failure value.
131 #define PFL_SUCCESS ((void*)(intptr_t)0)
132 #define PFL_FAIL ((void*)(intptr_t)1)
134 std::string
OutputDataSocket::create_shutdown_pipe(int *pipe_rd
, int *pipe_wr
)
137 int ret
= pipe_cloexec(pipefd
);
140 oss
<< "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret
);
144 *pipe_rd
= pipefd
[0];
145 *pipe_wr
= pipefd
[1];
149 std::string
OutputDataSocket::bind_and_listen(const std::string
&sock_path
, int *fd
)
151 ldout(m_cct
, 5) << "bind_and_listen " << sock_path
<< dendl
;
153 struct sockaddr_un address
;
154 if (sock_path
.size() > sizeof(address
.sun_path
) - 1) {
156 oss
<< "OutputDataSocket::bind_and_listen: "
157 << "The UNIX domain socket path " << sock_path
<< " is too long! The "
158 << "maximum length on this system is "
159 << (sizeof(address
.sun_path
) - 1);
162 int sock_fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
166 oss
<< "OutputDataSocket::bind_and_listen: "
167 << "failed to create socket: " << cpp_strerror(err
);
170 int r
= fcntl(sock_fd
, F_SETFD
, FD_CLOEXEC
);
173 VOID_TEMP_FAILURE_RETRY(::close(sock_fd
));
175 oss
<< "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r
);
178 memset(&address
, 0, sizeof(struct sockaddr_un
));
179 address
.sun_family
= AF_UNIX
;
180 snprintf(address
.sun_path
, sizeof(address
.sun_path
),
181 "%s", sock_path
.c_str());
182 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
183 sizeof(struct sockaddr_un
)) != 0) {
185 if (err
== EADDRINUSE
) {
186 // The old UNIX domain socket must still be there.
187 // Let's unlink it and try again.
188 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
189 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
190 sizeof(struct sockaddr_un
)) == 0) {
199 oss
<< "OutputDataSocket::bind_and_listen: "
200 << "failed to bind the UNIX domain socket to '" << sock_path
201 << "': " << cpp_strerror(err
);
206 if (listen(sock_fd
, 5) != 0) {
209 oss
<< "OutputDataSocket::bind_and_listen: "
210 << "failed to listen to socket: " << cpp_strerror(err
);
212 VOID_TEMP_FAILURE_RETRY(unlink(sock_path
.c_str()));
219 void* OutputDataSocket::entry()
221 ldout(m_cct
, 5) << "entry start" << dendl
;
223 struct pollfd fds
[2];
224 memset(fds
, 0, sizeof(fds
));
225 fds
[0].fd
= m_sock_fd
;
226 fds
[0].events
= POLLIN
| POLLRDBAND
;
227 fds
[1].fd
= m_shutdown_rd_fd
;
228 fds
[1].events
= POLLIN
| POLLRDBAND
;
230 int ret
= poll(fds
, 2, -1);
236 lderr(m_cct
) << "OutputDataSocket: poll(2) error: '"
237 << cpp_strerror(err
) << dendl
;
241 if (fds
[0].revents
& POLLIN
) {
242 // Send out some data
245 if (fds
[1].revents
& POLLIN
) {
246 // Parent wants us to shut down
250 ldout(m_cct
, 5) << "entry exit" << dendl
;
252 return PFL_SUCCESS
; // unreachable
256 bool OutputDataSocket::do_accept()
258 struct sockaddr_un address
;
259 socklen_t address_length
= sizeof(address
);
260 ldout(m_cct
, 30) << "OutputDataSocket: calling accept" << dendl
;
261 int connection_fd
= accept(m_sock_fd
, (struct sockaddr
*) &address
,
263 ldout(m_cct
, 30) << "OutputDataSocket: finished accept" << dendl
;
264 if (connection_fd
< 0) {
266 lderr(m_cct
) << "OutputDataSocket: do_accept error: '"
267 << cpp_strerror(err
) << dendl
;
271 handle_connection(connection_fd
);
272 close_connection(connection_fd
);
// Serve one accepted connection on `fd`: the buffer `bl` is written with
// safe_write() first, then the descriptor is handed to dump_data().
// NOTE(review): lines are missing from this extract (where `bl` is filled,
// and how the two `ret` results are acted on) — confirm against the full
// file before changing this function.
277 void OutputDataSocket::handle_connection(int fd
)
286 /* need to special case the connection init buffer output, as it needs
287 * to be dumped before any data, including older data that was sent
288 * before the connection was established, or before we identified
289 * older connection was broken
291 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
297 int ret
= dump_data(fd
);
// Write every buffer in the local list `l` to `fd` with safe_write(), each
// followed by `delim`.  The second loop walks the buffers from the current
// position to the end of `l` and adds their lengths back to data_size.
// NOTE(review): lines are missing from this extract (how `l` is populated,
// the checks on `ret`, what else the second loop does, and the return
// value) — confirm against the full file.
315 int OutputDataSocket::dump_data(int fd
)
324 for (list
<bufferlist
>::iterator iter
= l
.begin(); iter
!= l
.end(); ++iter
) {
325 bufferlist
& bl
= *iter
;
326 int ret
= safe_write(fd
, bl
.c_str(), bl
.length());
328 ret
= safe_write(fd
, delim
.c_str(), delim
.length());
331 for (; iter
!= l
.end(); ++iter
) {
332 bufferlist
& bl
= *iter
;
334 data_size
+= bl
.length();
343 void OutputDataSocket::close_connection(int fd
)
345 VOID_TEMP_FAILURE_RETRY(close(fd
));
348 bool OutputDataSocket::init(const std::string
&path
)
350 ldout(m_cct
, 5) << "init " << path
<< dendl
;
352 /* Set up things for the new thread */
354 int pipe_rd
= -1, pipe_wr
= -1;
355 err
= create_shutdown_pipe(&pipe_rd
, &pipe_wr
);
357 lderr(m_cct
) << "OutputDataSocketConfigObs::init: error: " << err
<< dendl
;
361 err
= bind_and_listen(path
, &sock_fd
);
363 lderr(m_cct
) << "OutputDataSocketConfigObs::init: failed: " << err
<< dendl
;
369 /* Create new thread */
371 m_shutdown_rd_fd
= pipe_rd
;
372 m_shutdown_wr_fd
= pipe_wr
;
374 create("out_data_socket");
375 add_cleanup_file(m_path
.c_str());
// Ask the listener thread to stop: one zero byte is written to the write end
// of the shutdown pipe, that descriptor is closed and reset to -1, and the
// socket path is passed to remove_cleanup_file().  A failed write is logged
// together with the value returned by safe_write().
// NOTE(review): lines are missing from this extract (the statements before
// the m_shutdown_wr_fd check, what that check does, and the branch taken
// when the write succeeds) — confirm against the full file.
379 void OutputDataSocket::shutdown()
386 if (m_shutdown_wr_fd
< 0)
389 ldout(m_cct
, 5) << "shutdown" << dendl
;
391 // Send a byte to the shutdown pipe that the thread is listening to
392 char buf
[1] = { 0x0 };
393 int ret
= safe_write(m_shutdown_wr_fd
, buf
, sizeof(buf
));
394 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd
));
395 m_shutdown_wr_fd
= -1;
400 lderr(m_cct
) << "OutputDataSocket::shutdown: failed to write "
401 "to thread shutdown pipe: error " << ret
<< dendl
;
404 remove_cleanup_file(m_path
.c_str());
// Account for the buffer `bl` while holding m_lock: a "dropping data output"
// message is logged when data_size plus the buffer's length would exceed
// data_max_backlog, and the buffer's length is added to data_size.
// NOTE(review): this extract does not show whether the function returns
// after logging the drop, nor where `bl` itself is stored — confirm against
// the full file.
408 void OutputDataSocket::append_output(bufferlist
& bl
)
410 Mutex::Locker
l(m_lock
);
412 if (data_size
+ bl
.length() > data_max_backlog
) {
413 ldout(m_cct
, 20) << "dropping data output, max backlog reached" << dendl
;
417 data_size
+= bl
.length();