]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/OutputDataSocket.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / OutputDataSocket.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "include/int_types.h"
16
17#include "common/Thread.h"
18#include "common/OutputDataSocket.h"
19#include "common/config.h"
20#include "common/dout.h"
21#include "common/errno.h"
22#include "common/perf_counters.h"
23#include "common/pipe.h"
24#include "common/safe_io.h"
25#include "common/version.h"
26#include "common/Formatter.h"
27
28#include <errno.h>
29#include <fcntl.h>
30#include <map>
31#include <poll.h>
32#include <set>
33#include <sstream>
34#include <stdint.h>
35#include <string.h>
36#include <string>
37#include <sys/socket.h>
38#include <sys/types.h>
39#include <sys/un.h>
40#include <unistd.h>
41
42#include "include/compat.h"
43
44#define dout_subsys ceph_subsys_asok
45#undef dout_prefix
46#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
47
48using std::ostringstream;
49
/*
 * A UNIX domain socket file outlives the process that created it unless it is
 * explicitly unlinked, because the directory entry keeps referring to it.
 *
 * To avoid leaving stale socket files behind on a normal exit, every socket
 * path is recorded here and unlinked either by remove_cleanup_file() or, at
 * process exit, by remove_all_cleanup_files() (registered with atexit()).
 */

// Serializes all access to cleanup_files and cleanup_atexit.
static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;

// Heap copies (strdup) of the registered socket paths; freed on removal.
static std::vector<const char *> cleanup_files;

// True once remove_all_cleanup_files() has been handed to atexit().
static bool cleanup_atexit = false;
61
62static void remove_cleanup_file(const char *file)
63{
64 pthread_mutex_lock(&cleanup_lock);
65 VOID_TEMP_FAILURE_RETRY(unlink(file));
66 for (std::vector <const char*>::iterator i = cleanup_files.begin();
67 i != cleanup_files.end(); ++i) {
68 if (strcmp(file, *i) == 0) {
69 free((void*)*i);
70 cleanup_files.erase(i);
71 break;
72 }
73 }
74 pthread_mutex_unlock(&cleanup_lock);
75}
76
77static void remove_all_cleanup_files()
78{
79 pthread_mutex_lock(&cleanup_lock);
80 for (std::vector <const char*>::iterator i = cleanup_files.begin();
81 i != cleanup_files.end(); ++i) {
82 VOID_TEMP_FAILURE_RETRY(unlink(*i));
83 free((void*)*i);
84 }
85 cleanup_files.clear();
86 pthread_mutex_unlock(&cleanup_lock);
87}
88
89static void add_cleanup_file(const char *file)
90{
91 char *fname = strdup(file);
92 if (!fname)
93 return;
94 pthread_mutex_lock(&cleanup_lock);
95 cleanup_files.push_back(fname);
96 if (!cleanup_atexit) {
97 atexit(remove_all_cleanup_files);
98 cleanup_atexit = true;
99 }
100 pthread_mutex_unlock(&cleanup_lock);
101}
102
103
104OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
105 : m_cct(cct),
106 data_max_backlog(_backlog),
107 m_sock_fd(-1),
108 m_shutdown_rd_fd(-1),
109 m_shutdown_wr_fd(-1),
110 going_down(false),
111 data_size(0),
112 m_lock("OutputDataSocket::m_lock")
113{
114}
115
116OutputDataSocket::~OutputDataSocket()
117{
118 shutdown();
119}
120
/*
 * This thread listens on the UNIX domain socket for incoming connections.
 * It only handles one connection at a time at the moment. All I/O is currently
 * blocking. [TODO: make all I/O nonblocking so that we can implement sensible
 * timeouts.]
 *
 * This thread also listens to m_shutdown_rd_fd. If any data is sent to this
 * pipe, the thread terminates itself gracefully, allowing
 * OutputDataSocket::shutdown() to join() it.
 */
130
// Thread return values for entry(): PFL_SUCCESS when asked to shut down,
// PFL_FAIL after a poll(2) error other than EINTR.
#define PFL_SUCCESS (reinterpret_cast<void*>(static_cast<intptr_t>(0)))
#define PFL_FAIL (reinterpret_cast<void*>(static_cast<intptr_t>(1)))
133
134std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
135{
136 int pipefd[2];
137 int ret = pipe_cloexec(pipefd);
138 if (ret < 0) {
139 ostringstream oss;
140 oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
141 return oss.str();
142 }
143
144 *pipe_rd = pipefd[0];
145 *pipe_wr = pipefd[1];
146 return "";
147}
148
149std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
150{
151 ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
152
153 struct sockaddr_un address;
154 if (sock_path.size() > sizeof(address.sun_path) - 1) {
155 ostringstream oss;
156 oss << "OutputDataSocket::bind_and_listen: "
157 << "The UNIX domain socket path " << sock_path << " is too long! The "
158 << "maximum length on this system is "
159 << (sizeof(address.sun_path) - 1);
160 return oss.str();
161 }
162 int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
163 if (sock_fd < 0) {
164 int err = errno;
165 ostringstream oss;
166 oss << "OutputDataSocket::bind_and_listen: "
167 << "failed to create socket: " << cpp_strerror(err);
168 return oss.str();
169 }
170 int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
171 if (r < 0) {
172 r = errno;
173 VOID_TEMP_FAILURE_RETRY(::close(sock_fd));
174 ostringstream oss;
175 oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
176 return oss.str();
177 }
178 memset(&address, 0, sizeof(struct sockaddr_un));
179 address.sun_family = AF_UNIX;
180 snprintf(address.sun_path, sizeof(address.sun_path),
181 "%s", sock_path.c_str());
182 if (::bind(sock_fd, (struct sockaddr*)&address,
183 sizeof(struct sockaddr_un)) != 0) {
184 int err = errno;
185 if (err == EADDRINUSE) {
186 // The old UNIX domain socket must still be there.
187 // Let's unlink it and try again.
188 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
189 if (::bind(sock_fd, (struct sockaddr*)&address,
190 sizeof(struct sockaddr_un)) == 0) {
191 err = 0;
192 }
193 else {
194 err = errno;
195 }
196 }
197 if (err != 0) {
198 ostringstream oss;
199 oss << "OutputDataSocket::bind_and_listen: "
200 << "failed to bind the UNIX domain socket to '" << sock_path
201 << "': " << cpp_strerror(err);
202 close(sock_fd);
203 return oss.str();
204 }
205 }
206 if (listen(sock_fd, 5) != 0) {
207 int err = errno;
208 ostringstream oss;
209 oss << "OutputDataSocket::bind_and_listen: "
210 << "failed to listen to socket: " << cpp_strerror(err);
211 close(sock_fd);
212 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
213 return oss.str();
214 }
215 *fd = sock_fd;
216 return "";
217}
218
219void* OutputDataSocket::entry()
220{
221 ldout(m_cct, 5) << "entry start" << dendl;
222 while (true) {
223 struct pollfd fds[2];
224 memset(fds, 0, sizeof(fds));
225 fds[0].fd = m_sock_fd;
226 fds[0].events = POLLIN | POLLRDBAND;
227 fds[1].fd = m_shutdown_rd_fd;
228 fds[1].events = POLLIN | POLLRDBAND;
229
230 int ret = poll(fds, 2, -1);
231 if (ret < 0) {
232 int err = errno;
233 if (err == EINTR) {
234 continue;
235 }
236 lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
237 << cpp_strerror(err) << dendl;
238 return PFL_FAIL;
239 }
240
241 if (fds[0].revents & POLLIN) {
242 // Send out some data
243 do_accept();
244 }
245 if (fds[1].revents & POLLIN) {
246 // Parent wants us to shut down
247 return PFL_SUCCESS;
248 }
249 }
250 ldout(m_cct, 5) << "entry exit" << dendl;
251
252 return PFL_SUCCESS; // unreachable
253}
254
255
256bool OutputDataSocket::do_accept()
257{
258 struct sockaddr_un address;
259 socklen_t address_length = sizeof(address);
260 ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
261 int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
262 &address_length);
263 ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
264 if (connection_fd < 0) {
265 int err = errno;
266 lderr(m_cct) << "OutputDataSocket: do_accept error: '"
267 << cpp_strerror(err) << dendl;
268 return false;
269 }
270
271 handle_connection(connection_fd);
272 close_connection(connection_fd);
273
274 return 0;
275}
276
277void OutputDataSocket::handle_connection(int fd)
278{
279 bufferlist bl;
280
281 m_lock.Lock();
282 init_connection(bl);
283 m_lock.Unlock();
284
285 if (bl.length()) {
286 /* need to special case the connection init buffer output, as it needs
287 * to be dumped before any data, including older data that was sent
288 * before the connection was established, or before we identified
289 * older connection was broken
290 */
291 int ret = safe_write(fd, bl.c_str(), bl.length());
292 if (ret < 0) {
293 return;
294 }
295 }
296
297 int ret = dump_data(fd);
298 if (ret < 0)
299 return;
300
301 do {
302 m_lock.Lock();
303 cond.Wait(m_lock);
304
305 if (going_down) {
306 m_lock.Unlock();
307 break;
308 }
309 m_lock.Unlock();
310
311 ret = dump_data(fd);
312 } while (ret >= 0);
313}
314
315int OutputDataSocket::dump_data(int fd)
316{
317 m_lock.Lock();
318 list<bufferlist> l;
319 l = data;
320 data.clear();
321 data_size = 0;
322 m_lock.Unlock();
323
324 for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
325 bufferlist& bl = *iter;
326 int ret = safe_write(fd, bl.c_str(), bl.length());
327 if (ret >= 0) {
328 ret = safe_write(fd, delim.c_str(), delim.length());
329 }
330 if (ret < 0) {
331 for (; iter != l.end(); ++iter) {
332 bufferlist& bl = *iter;
333 data.push_back(bl);
334 data_size += bl.length();
335 }
336 return ret;
337 }
338 }
339
340 return 0;
341}
342
343void OutputDataSocket::close_connection(int fd)
344{
345 VOID_TEMP_FAILURE_RETRY(close(fd));
346}
347
348bool OutputDataSocket::init(const std::string &path)
349{
350 ldout(m_cct, 5) << "init " << path << dendl;
351
352 /* Set up things for the new thread */
353 std::string err;
354 int pipe_rd = -1, pipe_wr = -1;
355 err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
356 if (!err.empty()) {
357 lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
358 return false;
359 }
360 int sock_fd;
361 err = bind_and_listen(path, &sock_fd);
362 if (!err.empty()) {
363 lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
364 close(pipe_rd);
365 close(pipe_wr);
366 return false;
367 }
368
369 /* Create new thread */
370 m_sock_fd = sock_fd;
371 m_shutdown_rd_fd = pipe_rd;
372 m_shutdown_wr_fd = pipe_wr;
373 m_path = path;
374 create("out_data_socket");
375 add_cleanup_file(m_path.c_str());
376 return true;
377}
378
379void OutputDataSocket::shutdown()
380{
381 m_lock.Lock();
382 going_down = true;
383 cond.Signal();
384 m_lock.Unlock();
385
386 if (m_shutdown_wr_fd < 0)
387 return;
388
389 ldout(m_cct, 5) << "shutdown" << dendl;
390
391 // Send a byte to the shutdown pipe that the thread is listening to
392 char buf[1] = { 0x0 };
393 int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
394 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
395 m_shutdown_wr_fd = -1;
396
397 if (ret == 0) {
398 join();
399 } else {
400 lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
401 "to thread shutdown pipe: error " << ret << dendl;
402 }
403
404 remove_cleanup_file(m_path.c_str());
405 m_path.clear();
406}
407
408void OutputDataSocket::append_output(bufferlist& bl)
409{
410 Mutex::Locker l(m_lock);
411
412 if (data_size + bl.length() > data_max_backlog) {
413 ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl;
414 }
415 data.push_back(bl);
416
417 data_size += bl.length();
418
419 cond.Signal();
420}