]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/OutputDataSocket.cc
update sources to 12.2.10
[ceph.git] / ceph / src / common / OutputDataSocket.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
7c673cae 15#include "common/OutputDataSocket.h"
7c673cae 16#include "common/errno.h"
7c673cae 17#include "common/safe_io.h"
31f18b77 18#include "include/compat.h"
91327a77 19#include "include/sock_compat.h"
7c673cae 20
7c673cae 21#include <poll.h>
7c673cae 22#include <sys/un.h>
7c673cae 23
31f18b77
FG
24// re-include our assert to clobber the system one; fix dout:
25#include "include/assert.h"
7c673cae
FG
26
27#define dout_subsys ceph_subsys_asok
28#undef dout_prefix
29#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
30
31using std::ostringstream;
32
33/*
34 * UNIX domain sockets created by an application persist even after that
35 * application closes, unless they're explicitly unlinked. This is because the
36 * directory containing the socket keeps a reference to the socket.
37 *
38 * This code makes things a little nicer by unlinking those dead sockets when
39 * the application exits normally.
40 */
// Guards every access to cleanup_files and cleanup_atexit.
static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
// Socket paths to unlink at exit; entries are strdup()'d copies owned by
// this list and released with free().
static std::vector<const char*> cleanup_files;
// Set once remove_all_cleanup_files has been handed to atexit().
static bool cleanup_atexit = false;
44
45static void remove_cleanup_file(const char *file)
46{
47 pthread_mutex_lock(&cleanup_lock);
48 VOID_TEMP_FAILURE_RETRY(unlink(file));
49 for (std::vector <const char*>::iterator i = cleanup_files.begin();
50 i != cleanup_files.end(); ++i) {
51 if (strcmp(file, *i) == 0) {
52 free((void*)*i);
53 cleanup_files.erase(i);
54 break;
55 }
56 }
57 pthread_mutex_unlock(&cleanup_lock);
58}
59
60static void remove_all_cleanup_files()
61{
62 pthread_mutex_lock(&cleanup_lock);
63 for (std::vector <const char*>::iterator i = cleanup_files.begin();
64 i != cleanup_files.end(); ++i) {
65 VOID_TEMP_FAILURE_RETRY(unlink(*i));
66 free((void*)*i);
67 }
68 cleanup_files.clear();
69 pthread_mutex_unlock(&cleanup_lock);
70}
71
72static void add_cleanup_file(const char *file)
73{
74 char *fname = strdup(file);
75 if (!fname)
76 return;
77 pthread_mutex_lock(&cleanup_lock);
78 cleanup_files.push_back(fname);
79 if (!cleanup_atexit) {
80 atexit(remove_all_cleanup_files);
81 cleanup_atexit = true;
82 }
83 pthread_mutex_unlock(&cleanup_lock);
84}
85
86
87OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
88 : m_cct(cct),
89 data_max_backlog(_backlog),
90 m_sock_fd(-1),
91 m_shutdown_rd_fd(-1),
92 m_shutdown_wr_fd(-1),
93 going_down(false),
94 data_size(0),
95 m_lock("OutputDataSocket::m_lock")
96{
97}
98
99OutputDataSocket::~OutputDataSocket()
100{
101 shutdown();
102}
103
104/*
105 * This thread listens on the UNIX domain socket for incoming connections.
106 * It only handles one connection at a time at the moment. All I/O is nonblocking,
107 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
108 *
109 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
110 * pipe, the thread terminates itself gracefully, allowing the
111 * OutputDataSocketConfigObs class to join() it.
112 */
113
// Values returned by the listener thread's entry(): 0 for a clean exit,
// 1 for a failure.
#define PFL_SUCCESS (reinterpret_cast<void*>(static_cast<intptr_t>(0)))
#define PFL_FAIL (reinterpret_cast<void*>(static_cast<intptr_t>(1)))
116
117std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
118{
119 int pipefd[2];
91327a77
AA
120 if (pipe_cloexec(pipefd) < 0) {
121 int e = errno;
7c673cae 122 ostringstream oss;
91327a77 123 oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(e);
7c673cae
FG
124 return oss.str();
125 }
126
127 *pipe_rd = pipefd[0];
128 *pipe_wr = pipefd[1];
129 return "";
130}
131
132std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
133{
134 ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
135
136 struct sockaddr_un address;
137 if (sock_path.size() > sizeof(address.sun_path) - 1) {
138 ostringstream oss;
139 oss << "OutputDataSocket::bind_and_listen: "
140 << "The UNIX domain socket path " << sock_path << " is too long! The "
141 << "maximum length on this system is "
142 << (sizeof(address.sun_path) - 1);
143 return oss.str();
144 }
91327a77 145 int sock_fd = socket_cloexec(PF_UNIX, SOCK_STREAM, 0);
7c673cae
FG
146 if (sock_fd < 0) {
147 int err = errno;
148 ostringstream oss;
149 oss << "OutputDataSocket::bind_and_listen: "
150 << "failed to create socket: " << cpp_strerror(err);
151 return oss.str();
152 }
7c673cae
FG
153 memset(&address, 0, sizeof(struct sockaddr_un));
154 address.sun_family = AF_UNIX;
155 snprintf(address.sun_path, sizeof(address.sun_path),
156 "%s", sock_path.c_str());
157 if (::bind(sock_fd, (struct sockaddr*)&address,
158 sizeof(struct sockaddr_un)) != 0) {
159 int err = errno;
160 if (err == EADDRINUSE) {
161 // The old UNIX domain socket must still be there.
162 // Let's unlink it and try again.
163 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
164 if (::bind(sock_fd, (struct sockaddr*)&address,
165 sizeof(struct sockaddr_un)) == 0) {
166 err = 0;
167 }
168 else {
169 err = errno;
170 }
171 }
172 if (err != 0) {
173 ostringstream oss;
174 oss << "OutputDataSocket::bind_and_listen: "
175 << "failed to bind the UNIX domain socket to '" << sock_path
176 << "': " << cpp_strerror(err);
177 close(sock_fd);
178 return oss.str();
179 }
180 }
181 if (listen(sock_fd, 5) != 0) {
182 int err = errno;
183 ostringstream oss;
184 oss << "OutputDataSocket::bind_and_listen: "
185 << "failed to listen to socket: " << cpp_strerror(err);
186 close(sock_fd);
187 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
188 return oss.str();
189 }
190 *fd = sock_fd;
191 return "";
192}
193
194void* OutputDataSocket::entry()
195{
196 ldout(m_cct, 5) << "entry start" << dendl;
197 while (true) {
198 struct pollfd fds[2];
199 memset(fds, 0, sizeof(fds));
200 fds[0].fd = m_sock_fd;
201 fds[0].events = POLLIN | POLLRDBAND;
202 fds[1].fd = m_shutdown_rd_fd;
203 fds[1].events = POLLIN | POLLRDBAND;
204
205 int ret = poll(fds, 2, -1);
206 if (ret < 0) {
207 int err = errno;
208 if (err == EINTR) {
209 continue;
210 }
211 lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
212 << cpp_strerror(err) << dendl;
213 return PFL_FAIL;
214 }
215
216 if (fds[0].revents & POLLIN) {
217 // Send out some data
218 do_accept();
219 }
220 if (fds[1].revents & POLLIN) {
221 // Parent wants us to shut down
222 return PFL_SUCCESS;
223 }
224 }
225 ldout(m_cct, 5) << "entry exit" << dendl;
226
227 return PFL_SUCCESS; // unreachable
228}
229
230
231bool OutputDataSocket::do_accept()
232{
233 struct sockaddr_un address;
234 socklen_t address_length = sizeof(address);
235 ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
91327a77 236 int connection_fd = accept_cloexec(m_sock_fd, (struct sockaddr*) &address,
7c673cae 237 &address_length);
7c673cae
FG
238 if (connection_fd < 0) {
239 int err = errno;
240 lderr(m_cct) << "OutputDataSocket: do_accept error: '"
241 << cpp_strerror(err) << dendl;
242 return false;
243 }
91327a77 244 ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
7c673cae
FG
245
246 handle_connection(connection_fd);
247 close_connection(connection_fd);
248
249 return 0;
250}
251
252void OutputDataSocket::handle_connection(int fd)
253{
254 bufferlist bl;
255
256 m_lock.Lock();
257 init_connection(bl);
258 m_lock.Unlock();
259
260 if (bl.length()) {
261 /* need to special case the connection init buffer output, as it needs
262 * to be dumped before any data, including older data that was sent
263 * before the connection was established, or before we identified
264 * older connection was broken
265 */
266 int ret = safe_write(fd, bl.c_str(), bl.length());
267 if (ret < 0) {
268 return;
269 }
270 }
271
272 int ret = dump_data(fd);
273 if (ret < 0)
274 return;
275
276 do {
277 m_lock.Lock();
278 cond.Wait(m_lock);
279
280 if (going_down) {
281 m_lock.Unlock();
282 break;
283 }
284 m_lock.Unlock();
285
286 ret = dump_data(fd);
287 } while (ret >= 0);
288}
289
290int OutputDataSocket::dump_data(int fd)
291{
292 m_lock.Lock();
c07f9fc5 293 list<bufferlist> l = std::move(data);
7c673cae
FG
294 data.clear();
295 data_size = 0;
296 m_lock.Unlock();
297
298 for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
299 bufferlist& bl = *iter;
300 int ret = safe_write(fd, bl.c_str(), bl.length());
301 if (ret >= 0) {
302 ret = safe_write(fd, delim.c_str(), delim.length());
303 }
304 if (ret < 0) {
305 for (; iter != l.end(); ++iter) {
306 bufferlist& bl = *iter;
307 data.push_back(bl);
308 data_size += bl.length();
309 }
310 return ret;
311 }
312 }
313
314 return 0;
315}
316
317void OutputDataSocket::close_connection(int fd)
318{
319 VOID_TEMP_FAILURE_RETRY(close(fd));
320}
321
322bool OutputDataSocket::init(const std::string &path)
323{
324 ldout(m_cct, 5) << "init " << path << dendl;
325
326 /* Set up things for the new thread */
327 std::string err;
328 int pipe_rd = -1, pipe_wr = -1;
329 err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
330 if (!err.empty()) {
331 lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
332 return false;
333 }
334 int sock_fd;
335 err = bind_and_listen(path, &sock_fd);
336 if (!err.empty()) {
337 lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
338 close(pipe_rd);
339 close(pipe_wr);
340 return false;
341 }
342
343 /* Create new thread */
344 m_sock_fd = sock_fd;
345 m_shutdown_rd_fd = pipe_rd;
346 m_shutdown_wr_fd = pipe_wr;
347 m_path = path;
348 create("out_data_socket");
349 add_cleanup_file(m_path.c_str());
350 return true;
351}
352
353void OutputDataSocket::shutdown()
354{
355 m_lock.Lock();
356 going_down = true;
357 cond.Signal();
358 m_lock.Unlock();
359
360 if (m_shutdown_wr_fd < 0)
361 return;
362
363 ldout(m_cct, 5) << "shutdown" << dendl;
364
365 // Send a byte to the shutdown pipe that the thread is listening to
366 char buf[1] = { 0x0 };
367 int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
368 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
369 m_shutdown_wr_fd = -1;
370
371 if (ret == 0) {
372 join();
373 } else {
374 lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
375 "to thread shutdown pipe: error " << ret << dendl;
376 }
377
378 remove_cleanup_file(m_path.c_str());
379 m_path.clear();
380}
381
382void OutputDataSocket::append_output(bufferlist& bl)
383{
384 Mutex::Locker l(m_lock);
385
386 if (data_size + bl.length() > data_max_backlog) {
387 ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl;
388 }
389 data.push_back(bl);
390
391 data_size += bl.length();
392
393 cond.Signal();
394}