1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2011 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "common/admin_socket.h"
18 #include "common/admin_socket_client.h"
19 #include "common/dout.h"
20 #include "common/errno.h"
21 #include "common/safe_io.h"
22 #include "common/Thread.h"
23 #include "common/version.h"
24 #include "common/ceph_mutex.h"
27 #include "common/Cond.h"
30 #include "messages/MCommand.h"
31 #include "messages/MCommandReply.h"
32 #include "messages/MMonCommand.h"
33 #include "messages/MMonCommandAck.h"
35 // re-include our assert to clobber the system one; fix dout:
36 #include "include/ceph_assert.h"
37 #include "include/compat.h"
38 #include "include/sock_compat.h"
40 #define dout_subsys ceph_subsys_asok
42 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
44 using namespace std::literals
;
46 using std::ostringstream
;
48 using std::stringstream
;
50 using namespace TOPNSPC::common
;
52 using ceph::bufferlist
;
54 using ceph::Formatter
;
58 * UNIX domain sockets created by an application persist even after that
59 * application closes, unless they're explicitly unlinked. This is because the
60 * directory containing the socket keeps a reference to the socket.
62 * This code makes things a little nicer by unlinking those dead sockets when
63 * the application exits normally.
/**
 * Call f(args...) and keep calling it for as long as it fails with EINTR.
 *
 * A call counts as interrupted when it returns a negative value while errno
 * is EINTR; any other outcome ends the loop.
 *
 * @param f     callable returning an int status
 * @param args  arguments passed unchanged to every invocation of f
 * @return the value produced by the final invocation of f
 */
template<typename F, typename... Args>
inline int retry_sys_call(F f, Args... args) {
  for (;;) {
    const int rc = f(args...);
    // Anything other than "negative result with errno == EINTR" is final.
    if (rc >= 0 || errno != EINTR) {
      return rc;
    }
  }
}
// Taken by every function below before it touches cleanup_files or
// cleanup_atexit.
static std::mutex cleanup_lock;
// Paths that remove_cleanup_file() / remove_all_cleanup_files() will unlink.
static std::vector<std::string> cleanup_files;
// Set once remove_all_cleanup_files() has been registered with atexit().
static bool cleanup_atexit = false;
80 static void remove_cleanup_file(std::string_view file
) {
81 std::unique_lock
l(cleanup_lock
);
83 if (auto i
= std::find(cleanup_files
.cbegin(), cleanup_files
.cend(), file
);
84 i
!= cleanup_files
.cend()) {
85 retry_sys_call(::unlink
, i
->c_str());
86 cleanup_files
.erase(i
);
90 void remove_all_cleanup_files() {
91 std::unique_lock
l(cleanup_lock
);
92 for (const auto& s
: cleanup_files
) {
93 retry_sys_call(::unlink
, s
.c_str());
95 cleanup_files
.clear();
98 static void add_cleanup_file(std::string file
) {
99 std::unique_lock
l(cleanup_lock
);
100 cleanup_files
.push_back(std::move(file
));
101 if (!cleanup_atexit
) {
102 atexit(remove_all_cleanup_files
);
103 cleanup_atexit
= true;
107 AdminSocket::AdminSocket(CephContext
*cct
)
111 AdminSocket::~AdminSocket()
117 * This thread listens on the UNIX domain socket for incoming connections.
118 * It only handles one connection at a time at the moment. All I/O is nonblocking,
119 * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
121 * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this
122 * pipe, the thread wakes up. If m_shutdown is set, the thread terminates
123 * itself gracefully, allowing the AdminSocketConfigObs class to join() it.
126 std::string
AdminSocket::create_wakeup_pipe(int *pipe_rd
, int *pipe_wr
)
130 if (win_socketpair(pipefd
) < 0) {
132 if (pipe_cloexec(pipefd
, O_NONBLOCK
) < 0) {
134 int e
= ceph_sock_errno();
136 oss
<< "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e
);
140 *pipe_rd
= pipefd
[0];
141 *pipe_wr
= pipefd
[1];
145 std::string
AdminSocket::destroy_wakeup_pipe()
147 // Send a byte to the wakeup pipe that the thread is listening to
148 char buf
[1] = { 0x0 };
149 int ret
= safe_send(m_wakeup_wr_fd
, buf
, sizeof(buf
));
152 retry_sys_call(::compat_closesocket
, m_wakeup_wr_fd
);
157 oss
<< "AdminSocket::destroy_shutdown_pipe error: failed to write"
158 "to thread shutdown pipe: error " << ret
;
164 // Close read end. Doing this before join() blocks the listener and prevents
166 retry_sys_call(::compat_closesocket
, m_wakeup_rd_fd
);
172 std::string
AdminSocket::bind_and_listen(const std::string
&sock_path
, int *fd
)
174 ldout(m_cct
, 5) << "bind_and_listen " << sock_path
<< dendl
;
176 struct sockaddr_un address
;
177 if (sock_path
.size() > sizeof(address
.sun_path
) - 1) {
179 oss
<< "AdminSocket::bind_and_listen: "
180 << "The UNIX domain socket path " << sock_path
<< " is too long! The "
181 << "maximum length on this system is "
182 << (sizeof(address
.sun_path
) - 1);
185 int sock_fd
= socket_cloexec(PF_UNIX
, SOCK_STREAM
, 0);
187 int err
= ceph_sock_errno();
189 oss
<< "AdminSocket::bind_and_listen: "
190 << "failed to create socket: " << cpp_strerror(err
);
193 // FIPS zeroization audit 20191115: this memset is fine.
194 memset(&address
, 0, sizeof(struct sockaddr_un
));
195 address
.sun_family
= AF_UNIX
;
196 snprintf(address
.sun_path
, sizeof(address
.sun_path
),
197 "%s", sock_path
.c_str());
198 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
199 sizeof(struct sockaddr_un
)) != 0) {
200 int err
= ceph_sock_errno();
201 if (err
== EADDRINUSE
) {
202 AdminSocketClient
client(sock_path
);
206 ldout(m_cct
, 20) << "socket " << sock_path
<< " is in use" << dendl
;
209 ldout(m_cct
, 20) << "unlink stale file " << sock_path
<< dendl
;
210 retry_sys_call(::unlink
, sock_path
.c_str());
211 if (::bind(sock_fd
, (struct sockaddr
*)&address
,
212 sizeof(struct sockaddr_un
)) == 0) {
215 err
= ceph_sock_errno();
221 oss
<< "AdminSocket::bind_and_listen: "
222 << "failed to bind the UNIX domain socket to '" << sock_path
223 << "': " << cpp_strerror(err
);
228 if (listen(sock_fd
, 5) != 0) {
229 int err
= ceph_sock_errno();
231 oss
<< "AdminSocket::bind_and_listen: "
232 << "failed to listen to socket: " << cpp_strerror(err
);
234 retry_sys_call(::unlink
, sock_path
.c_str());
241 void AdminSocket::entry() noexcept
243 ldout(m_cct
, 5) << "entry start" << dendl
;
245 struct pollfd fds
[2];
246 // FIPS zeroization audit 20191115: this memset is fine.
247 memset(fds
, 0, sizeof(fds
));
248 fds
[0].fd
= m_sock_fd
;
249 fds
[0].events
= POLLIN
| POLLRDBAND
;
250 fds
[1].fd
= m_wakeup_rd_fd
;
251 fds
[1].events
= POLLIN
| POLLRDBAND
;
253 ldout(m_cct
,20) << __func__
<< " waiting" << dendl
;
254 int ret
= poll(fds
, 2, -1);
256 int err
= ceph_sock_errno();
260 lderr(m_cct
) << "AdminSocket: poll(2) error: '"
261 << cpp_strerror(err
) << dendl
;
264 ldout(m_cct
,20) << __func__
<< " awake" << dendl
;
266 if (fds
[0].revents
& POLLIN
) {
267 // Send out some data
270 if (fds
[1].revents
& POLLIN
) {
273 auto s
= safe_recv(m_wakeup_rd_fd
, &buf
, 1);
275 int e
= ceph_sock_errno();
276 ldout(m_cct
, 5) << "AdminSocket: (ignoring) read(2) error: '"
277 << cpp_strerror(e
) << dendl
;
282 // Parent wants us to shut down
286 ldout(m_cct
, 5) << "entry exit" << dendl
;
289 void AdminSocket::chown(uid_t uid
, gid_t gid
)
291 if (m_sock_fd
>= 0) {
292 int r
= ::chown(m_path
.c_str(), uid
, gid
);
295 lderr(m_cct
) << "AdminSocket: failed to chown socket: "
296 << cpp_strerror(r
) << dendl
;
301 void AdminSocket::chmod(mode_t mode
)
303 if (m_sock_fd
>= 0) {
304 int r
= ::chmod(m_path
.c_str(), mode
);
307 lderr(m_cct
) << "AdminSocket: failed to chmod socket: "
308 << cpp_strerror(r
) << dendl
;
313 void AdminSocket::do_accept()
315 struct sockaddr_un address
;
316 socklen_t address_length
= sizeof(address
);
317 ldout(m_cct
, 30) << "AdminSocket: calling accept" << dendl
;
318 int connection_fd
= accept_cloexec(m_sock_fd
, (struct sockaddr
*) &address
,
320 if (connection_fd
< 0) {
321 int err
= ceph_sock_errno();
322 lderr(m_cct
) << "AdminSocket: do_accept error: '"
323 << cpp_strerror(err
) << dendl
;
326 ldout(m_cct
, 30) << "AdminSocket: finished accept" << dendl
;
332 int ret
= safe_recv(connection_fd
, &cmd
[pos
], 1);
335 lderr(m_cct
) << "AdminSocket: error reading request code: "
336 << cpp_strerror(ret
) << dendl
;
338 retry_sys_call(::compat_closesocket
, connection_fd
);
341 if (cmd
[0] == '\0') {
342 // old protocol: __be32
343 if (pos
== 3 && cmd
[0] == '\0') {
349 c
= "perfcounters_dump";
352 c
= "perfcounters_schema";
358 //wrap command with new protocol
359 c
= "{\"prefix\": \"" + c
+ "\"}";
363 // new protocol: null or \n terminated string
364 if (cmd
[pos
] == '\n' || cmd
[pos
] == '\0') {
370 if (++pos
>= sizeof(cmd
)) {
371 lderr(m_cct
) << "AdminSocket: error reading request too long" << dendl
;
372 retry_sys_call(::compat_closesocket
, connection_fd
);
377 std::vector
<std::string
> cmdvec
= { c
};
378 bufferlist empty
, out
;
380 int rval
= execute_command(cmdvec
, empty
/* inbl */, err
, &out
);
382 // Unfortunately, the asok wire protocol does not let us pass an error code,
383 // and many asok command implementations return helpful error strings. So,
384 // let's prepend an error string to the output if there is an error code.
387 ss
<< "ERROR: " << cpp_strerror(rval
) << "\n";
388 ss
<< err
.str() << "\n";
394 uint32_t len
= htonl(out
.length());
395 int ret
= safe_send(connection_fd
, &len
, sizeof(len
));
397 lderr(m_cct
) << "AdminSocket: error writing response length "
398 << cpp_strerror(ret
) << dendl
;
400 int r
= out
.send_fd(connection_fd
);
402 lderr(m_cct
) << "AdminSocket: error writing response payload "
403 << cpp_strerror(ret
) << dendl
;
406 retry_sys_call(::compat_closesocket
, connection_fd
);
409 void AdminSocket::do_tell_queue()
411 ldout(m_cct
,10) << __func__
<< dendl
;
412 std::list
<cref_t
<MCommand
>> q
;
413 std::list
<cref_t
<MMonCommand
>> lq
;
415 std::lock_guard
l(tell_lock
);
417 lq
.swap(tell_legacy_queue
);
424 [m
](int r
, const std::string
& err
, bufferlist
& outbl
) {
425 auto reply
= new MCommandReply(r
, err
);
426 reply
->set_tid(m
->get_tid());
427 reply
->set_data(outbl
);
429 // TODO: crimson: handle asok command from alien thread
431 m
->get_connection()->send_message(reply
);
440 [m
](int r
, const std::string
& err
, bufferlist
& outbl
) {
441 auto reply
= new MMonCommandAck(m
->cmd
, r
, err
, 0);
442 reply
->set_tid(m
->get_tid());
443 reply
->set_data(outbl
);
445 // TODO: crimson: handle asok command from alien thread
447 m
->get_connection()->send_message(reply
);
453 int AdminSocket::execute_command(
454 const std::vector
<std::string
>& cmd
,
455 const bufferlist
& inbl
,
460 // TODO: crimson: blocking execute_command() in alien thread
465 ceph::mutex mylock
= ceph::make_mutex("admin_socket::excute_command::mylock");
466 ceph::condition_variable mycond
;
467 C_SafeCond
fin(mylock
, mycond
, &done
, &rval
);
471 [&errss
, outbl
, &fin
](int r
, const std::string
& err
, bufferlist
& out
) {
473 *outbl
= std::move(out
);
477 std::unique_lock l
{mylock
};
478 mycond
.wait(l
, [&done
] { return done
;});
484 void AdminSocket::execute_command(
485 const std::vector
<std::string
>& cmdvec
,
486 const bufferlist
& inbl
,
487 std::function
<void(int,const std::string
&,bufferlist
&)> on_finish
)
493 ldout(m_cct
,10) << __func__
<< " cmdvec='" << cmdvec
<< "'" << dendl
;
494 if (!cmdmap_from_json(cmdvec
, &cmdmap
, errss
)) {
495 ldout(m_cct
, 0) << "AdminSocket: " << errss
.str() << dendl
;
496 return on_finish(-EINVAL
, "invalid json", empty
);
500 cmd_getval(cmdmap
, "format", format
);
501 cmd_getval(cmdmap
, "prefix", prefix
);
502 } catch (const bad_cmd_get
& e
) {
503 return on_finish(-EINVAL
, "invalid json, missing format and/or prefix",
507 auto f
= Formatter::create(format
, "json-pretty", "json-pretty");
509 auto [retval
, hook
] = find_matched_hook(prefix
, cmdmap
);
512 lderr(m_cct
) << "AdminSocket: request '" << cmdvec
513 << "' not defined" << dendl
;
515 return on_finish(-EINVAL
, "unknown command prefix "s
+ prefix
, empty
);
518 return on_finish(-EINVAL
, "invalid command json", empty
);
524 prefix
, cmdmap
, f
, inbl
,
525 [f
, on_finish
](int r
, const std::string
& err
, bufferlist
& out
) {
526 // handle either existing output in bufferlist *or* via formatter
527 if (r
>= 0 && out
.length() == 0) {
531 on_finish(r
, err
, out
);
534 std::unique_lock
l(lock
);
536 in_hook_cond
.notify_all();
539 std::pair
<int, AdminSocketHook
*>
540 AdminSocket::find_matched_hook(std::string
& prefix
,
541 const cmdmap_t
& cmdmap
)
543 std::unique_lock
l(lock
);
544 // Drop lock after done with the lookup to avoid cycles in cases where the
545 // hook takes the same lock that was held during calls to
546 // register/unregister, and set in_hook to allow unregister to wait for us
547 // before removing this hook.
548 auto [hooks_begin
, hooks_end
] = hooks
.equal_range(prefix
);
549 if (hooks_begin
== hooks_end
) {
550 return {ENOENT
, nullptr};
552 // make sure one of the registered commands with this prefix validates.
554 for (auto hook
= hooks_begin
; hook
!= hooks_end
; ++hook
) {
555 if (validate_cmd(m_cct
, hook
->second
.desc
, cmdmap
, errss
)) {
557 return {0, hook
->second
.hook
};
560 return {EINVAL
, nullptr};
563 void AdminSocket::queue_tell_command(cref_t
<MCommand
> m
)
565 ldout(m_cct
,10) << __func__
<< " " << *m
<< dendl
;
566 std::lock_guard
l(tell_lock
);
567 tell_queue
.push_back(std::move(m
));
570 void AdminSocket::queue_tell_command(cref_t
<MMonCommand
> m
)
572 ldout(m_cct
,10) << __func__
<< " " << *m
<< dendl
;
573 std::lock_guard
l(tell_lock
);
574 tell_legacy_queue
.push_back(std::move(m
));
578 int AdminSocket::register_command(std::string_view cmddesc
,
579 AdminSocketHook
*hook
,
580 std::string_view help
)
583 std::unique_lock
l(lock
);
584 string prefix
= cmddesc_get_prefix(cmddesc
);
585 auto i
= hooks
.find(prefix
);
586 if (i
!= hooks
.cend() &&
587 i
->second
.desc
== cmddesc
) {
588 ldout(m_cct
, 5) << "register_command " << prefix
589 << " cmddesc " << cmddesc
<< " hook " << hook
590 << " EEXIST" << dendl
;
593 ldout(m_cct
, 5) << "register_command " << prefix
<< " hook " << hook
595 hooks
.emplace_hint(i
,
596 std::piecewise_construct
,
597 std::forward_as_tuple(prefix
),
598 std::forward_as_tuple(hook
, cmddesc
, help
));
604 void AdminSocket::unregister_commands(const AdminSocketHook
*hook
)
606 std::unique_lock
l(lock
);
607 auto i
= hooks
.begin();
608 while (i
!= hooks
.end()) {
609 if (i
->second
.hook
== hook
) {
610 ldout(m_cct
, 5) << __func__
<< " " << i
->first
<< dendl
;
612 // If we are currently processing a command, wait for it to
613 // complete in case it referenced the hook that we are
615 in_hook_cond
.wait(l
, [this]() { return !in_hook
; });
623 class VersionHook
: public AdminSocketHook
{
625 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
628 bufferlist
& out
) override
{
629 if (command
== "0"sv
) {
630 out
.append(CEPH_ADMIN_SOCK_VERSION
);
632 f
->open_object_section("version");
633 if (command
== "version") {
634 f
->dump_string("version", ceph_version_to_str());
635 f
->dump_string("release", ceph_release_to_str());
636 f
->dump_string("release_type", ceph_release_type());
637 } else if (command
== "git_version") {
638 f
->dump_string("git_version", git_version_to_str());
647 class HelpHook
: public AdminSocketHook
{
650 explicit HelpHook(AdminSocket
*as
) : m_as(as
) {}
651 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
654 bufferlist
& out
) override
{
655 f
->open_object_section("help");
656 for (const auto& [command
, info
] : m_as
->hooks
) {
657 if (info
.help
.length())
658 f
->dump_string(command
.c_str(), info
.help
);
665 class GetdescsHook
: public AdminSocketHook
{
668 explicit GetdescsHook(AdminSocket
*as
) : m_as(as
) {}
669 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
672 bufferlist
& out
) override
{
674 f
->open_object_section("command_descriptions");
675 for (const auto& [command
, info
] : m_as
->hooks
) {
676 // GCC 8 actually has [[maybe_unused]] on a structured binding
677 // do what you'd expect. GCC 7 does not.
679 ostringstream secname
;
680 secname
<< "cmd" << std::setfill('0') << std::setw(3) << cmdnum
;
681 dump_cmd_and_help_to_json(f
,
683 secname
.str().c_str(),
688 f
->close_section(); // command_descriptions
693 bool AdminSocket::init(const std::string
& path
)
695 ldout(m_cct
, 5) << "init " << path
<< dendl
;
698 OSVERSIONINFOEXW ver
= {0};
699 ver
.dwOSVersionInfoSize
= sizeof(ver
);
700 get_windows_version(&ver
);
702 if (std::tie(ver
.dwMajorVersion
, ver
.dwMinorVersion
, ver
.dwBuildNumber
) <
703 std::make_tuple(10, 0, 17063)) {
704 ldout(m_cct
, 5) << "Unix sockets require Windows 10.0.17063 or later. "
705 << "The admin socket will not be available." << dendl
;
710 /* Set up things for the new thread */
712 int pipe_rd
= -1, pipe_wr
= -1;
713 err
= create_wakeup_pipe(&pipe_rd
, &pipe_wr
);
715 lderr(m_cct
) << "AdminSocketConfigObs::init: error: " << err
<< dendl
;
719 err
= bind_and_listen(path
, &sock_fd
);
721 lderr(m_cct
) << "AdminSocketConfigObs::init: failed: " << err
<< dendl
;
727 /* Create new thread */
729 m_wakeup_rd_fd
= pipe_rd
;
730 m_wakeup_wr_fd
= pipe_wr
;
733 version_hook
= std::make_unique
<VersionHook
>();
734 register_command("0", version_hook
.get(), "");
735 register_command("version", version_hook
.get(), "get ceph version");
736 register_command("git_version", version_hook
.get(),
738 help_hook
= std::make_unique
<HelpHook
>(this);
739 register_command("help", help_hook
.get(),
740 "list available commands");
741 getdescs_hook
= std::make_unique
<GetdescsHook
>(this);
742 register_command("get_command_descriptions",
743 getdescs_hook
.get(), "list available commands");
745 th
= make_named_thread("admin_socket", &AdminSocket::entry
, this);
746 add_cleanup_file(m_path
.c_str());
750 void AdminSocket::shutdown()
752 // Under normal operation this is unlikely to occur. However for some unit
753 // tests, some object members are not initialized and so cannot be deleted
755 if (m_wakeup_wr_fd
< 0)
758 ldout(m_cct
, 5) << "shutdown" << dendl
;
761 auto err
= destroy_wakeup_pipe();
763 lderr(m_cct
) << "AdminSocket::shutdown: error: " << err
<< dendl
;
766 retry_sys_call(::compat_closesocket
, m_sock_fd
);
768 unregister_commands(version_hook
.get());
769 version_hook
.reset();
771 unregister_commands(help_hook
.get());
774 unregister_commands(getdescs_hook
.get());
775 getdescs_hook
.reset();
777 remove_cleanup_file(m_path
);
781 void AdminSocket::wakeup()
783 // Send a byte to the wakeup pipe that the thread is listening to
784 char buf
[1] = { 0x0 };
785 int r
= safe_send(m_wakeup_wr_fd
, buf
, sizeof(buf
));