#include "common/safe_io.h"
#include "common/Thread.h"
#include "common/version.h"
+#include "common/ceph_mutex.h"
+#ifndef WITH_SEASTAR
+#include "common/Cond.h"
+#endif
+
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
// re-include our assert to clobber the system one; fix dout:
#include "include/ceph_assert.h"
using std::ostringstream;
+using namespace TOPNSPC::common;
/*
* UNIX domain sockets created by an application persist even after that
* It only handles one connection at a time at the moment. All I/O is nonblocking,
* so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
*
- * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
- * pipe, the thread terminates itself gracefully, allowing the
- * AdminSocketConfigObs class to join() it.
+ * This thread also listens to m_wakeup_rd_fd. If there is any data sent to this
+ * pipe, the thread wakes up. If m_shutdown is set, the thread terminates
+ * itself gracefully, allowing the AdminSocketConfigObs class to join() it.
*/
-std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
+std::string AdminSocket::create_wakeup_pipe(int *pipe_rd, int *pipe_wr)
{
int pipefd[2];
- if (pipe_cloexec(pipefd) < 0) {
+ if (pipe_cloexec(pipefd, O_NONBLOCK) < 0) {
int e = errno;
ostringstream oss;
- oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e);
+ oss << "AdminSocket::create_wakeup_pipe error: " << cpp_strerror(e);
return oss.str();
}
return "";
}
-std::string AdminSocket::destroy_shutdown_pipe()
+std::string AdminSocket::destroy_wakeup_pipe()
{
- // Send a byte to the shutdown pipe that the thread is listening to
+ // Send a byte to the wakeup pipe that the thread is listening to
char buf[1] = { 0x0 };
- int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
+ int ret = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
// Close write end
- retry_sys_call(::close, m_shutdown_wr_fd);
- m_shutdown_wr_fd = -1;
+ retry_sys_call(::close, m_wakeup_wr_fd);
+ m_wakeup_wr_fd = -1;
if (ret != 0) {
ostringstream oss;
// Close read end. Doing this before join() blocks the listenter and prevents
// joining.
- retry_sys_call(::close, m_shutdown_rd_fd);
- m_shutdown_rd_fd = -1;
+ retry_sys_call(::close, m_wakeup_rd_fd);
+ m_wakeup_rd_fd = -1;
return "";
}
memset(fds, 0, sizeof(fds));
fds[0].fd = m_sock_fd;
fds[0].events = POLLIN | POLLRDBAND;
- fds[1].fd = m_shutdown_rd_fd;
+ fds[1].fd = m_wakeup_rd_fd;
fds[1].events = POLLIN | POLLRDBAND;
+ ldout(m_cct,20) << __func__ << " waiting" << dendl;
int ret = poll(fds, 2, -1);
if (ret < 0) {
int err = errno;
<< cpp_strerror(err) << dendl;
return;
}
+ ldout(m_cct,20) << __func__ << " awake" << dendl;
if (fds[0].revents & POLLIN) {
// Send out some data
do_accept();
}
if (fds[1].revents & POLLIN) {
+ // read off one byte
+ char buf;
+ auto s = ::read(m_wakeup_rd_fd, &buf, 1);
+ if (s == -1) {
+ int e = errno;
+ ldout(m_cct, 5) << "AdminSocket: (ignoring) read(2) error: '"
+ << cpp_strerror(e) << dendl;
+ }
+ do_tell_queue();
+ }
+ if (m_shutdown) {
// Parent wants us to shut down
return;
}
}
}
-bool AdminSocket::do_accept()
+void AdminSocket::do_accept()
{
struct sockaddr_un address;
socklen_t address_length = sizeof(address);
int err = errno;
lderr(m_cct) << "AdminSocket: do_accept error: '"
<< cpp_strerror(err) << dendl;
- return false;
+ return;
}
ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
<< cpp_strerror(ret) << dendl;
}
retry_sys_call(::close, connection_fd);
- return false;
+ return;
}
if (cmd[0] == '\0') {
// old protocol: __be32
if (++pos >= sizeof(cmd)) {
lderr(m_cct) << "AdminSocket: error reading request too long" << dendl;
retry_sys_call(::close, connection_fd);
- return false;
+ return;
}
}
- bool rval;
- bufferlist out;
- rval = execute_command(c, out);
- if (rval) {
- uint32_t len = htonl(out.length());
- int ret = safe_write(connection_fd, &len, sizeof(len));
- if (ret < 0) {
- lderr(m_cct) << "AdminSocket: error writing response length "
- << cpp_strerror(ret) << dendl;
- rval = false;
- } else {
- if (out.write_fd(connection_fd) >= 0)
- rval = true;
+ std::vector<std::string> cmdvec = { c };
+ bufferlist empty, out;
+ ostringstream err;
+ int rval = execute_command(cmdvec, empty /* inbl */, err, &out);
+
+ // Unfortunately, the asok wire protocol does not let us pass an error code,
+ // and many asok command implementations return helpful error strings. So,
+ // let's prepend an error string to the output if there is an error code.
+ if (rval < 0) {
+ ostringstream ss;
+ ss << "ERROR: " << cpp_strerror(rval) << "\n";
+ ss << err.str() << "\n";
+ bufferlist o;
+ o.append(ss.str());
+ o.claim_append(out);
+ out.claim_append(o);
+ }
+ uint32_t len = htonl(out.length());
+ int ret = safe_write(connection_fd, &len, sizeof(len));
+ if (ret < 0) {
+ lderr(m_cct) << "AdminSocket: error writing response length "
+ << cpp_strerror(ret) << dendl;
+ } else {
+ if (out.write_fd(connection_fd) < 0) {
+ lderr(m_cct) << "AdminSocket: error writing response payload "
+ << cpp_strerror(ret) << dendl;
}
}
-
retry_sys_call(::close, connection_fd);
+}
+
+void AdminSocket::do_tell_queue()
+{
+ ldout(m_cct,10) << __func__ << dendl;
+ std::list<cref_t<MCommand>> q;
+ std::list<cref_t<MMonCommand>> lq;
+ {
+ std::lock_guard l(tell_lock);
+ q.swap(tell_queue);
+ lq.swap(tell_legacy_queue);
+ }
+ for (auto& m : q) {
+ bufferlist outbl;
+ execute_command(
+ m->cmd,
+ m->get_data(),
+ [m](int r, const std::string& err, bufferlist& outbl) {
+ auto reply = new MCommandReply(r, err);
+ reply->set_tid(m->get_tid());
+ reply->set_data(outbl);
+#ifdef WITH_SEASTAR
+#warning "fix message send with crimson"
+#else
+ m->get_connection()->send_message(reply);
+#endif
+ });
+ }
+ for (auto& m : lq) {
+ bufferlist outbl;
+ execute_command(
+ m->cmd,
+ m->get_data(),
+ [m](int r, const std::string& err, bufferlist& outbl) {
+ auto reply = new MMonCommandAck(m->cmd, r, err, 0);
+ reply->set_tid(m->get_tid());
+ reply->set_data(outbl);
+#ifdef WITH_SEASTAR
+#warning "fix message send with crimson"
+#else
+ m->get_connection()->send_message(reply);
+#endif
+ });
+ }
+}
+
+int AdminSocket::execute_command(
+ const std::vector<std::string>& cmd,
+ const bufferlist& inbl,
+ std::ostream& errss,
+ bufferlist *outbl)
+{
+#ifdef WITH_SEASTAR
+#warning "must implement admin socket blocking execute_command() for crimson"
+ return -ENOSYS;
+#else
+ bool done = false;
+ int rval = 0;
+ ceph::mutex mylock = ceph::make_mutex("admin_socket::excute_command::mylock");
+ ceph::condition_variable mycond;
+ C_SafeCond fin(mylock, mycond, &done, &rval);
+ execute_command(
+ cmd,
+ inbl,
+ [&errss, outbl, &fin](int r, const std::string& err, bufferlist& out) {
+ errss << err;
+ outbl->claim(out);
+ fin.finish(r);
+ });
+ {
+ std::unique_lock l{mylock};
+ mycond.wait(l, [&done] { return done;});
+ }
return rval;
+#endif
}
-int AdminSocket::execute_command(const std::string& cmd, ceph::bufferlist& out)
+void AdminSocket::execute_command(
+ const std::vector<std::string>& cmdvec,
+ const bufferlist& inbl,
+ std::function<void(int,const std::string&,bufferlist&)> on_finish)
{
cmdmap_t cmdmap;
string format;
- vector<string> cmdvec;
stringstream errss;
- cmdvec.push_back(cmd);
+ bufferlist empty;
+ ldout(m_cct,10) << __func__ << " cmdvec='" << cmdvec << "'" << dendl;
if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
ldout(m_cct, 0) << "AdminSocket: " << errss.str() << dendl;
- return false;
+ return on_finish(-EINVAL, "invalid json", empty);
}
- string match;
+ string prefix;
try {
- cmd_getval(m_cct, cmdmap, "format", format);
- cmd_getval(m_cct, cmdmap, "prefix", match);
+ cmd_getval(cmdmap, "format", format);
+ cmd_getval(cmdmap, "prefix", prefix);
} catch (const bad_cmd_get& e) {
- return false;
+ return on_finish(-EINVAL, "invalid json, missing format and/or prefix",
+ empty);
}
- if (format != "json" && format != "json-pretty" &&
- format != "xml" && format != "xml-pretty")
- format = "json-pretty";
- std::unique_lock l(lock);
- decltype(hooks)::iterator p;
- while (match.size()) {
- p = hooks.find(match);
- if (p != hooks.cend())
- break;
-
- // drop right-most word
- size_t pos = match.rfind(' ');
- if (pos == std::string::npos) {
- match.clear(); // we fail
- break;
- } else {
- match.resize(pos);
- }
+ Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
+
+ auto [retval, hook] = find_matched_hook(prefix, cmdmap);
+ switch (retval) {
+ case ENOENT:
+ lderr(m_cct) << "AdminSocket: request '" << cmdvec
+ << "' not defined" << dendl;
+ delete f;
+ return on_finish(-EINVAL, "unknown command prefix "s + prefix, empty);
+ case EINVAL:
+ delete f;
+ return on_finish(-EINVAL, "invalid command json", empty);
+ default:
+ assert(retval == 0);
}
- if (p == hooks.cend()) {
- lderr(m_cct) << "AdminSocket: request '" << cmd << "' not defined" << dendl;
- return false;
- }
- string args;
- if (match != cmd) {
- args = cmd.substr(match.length() + 1);
- }
-
- // Drop lock to avoid cycles in cases where the hook takes
- // the same lock that was held during calls to register/unregister,
- // and set in_hook to allow unregister to wait for us before
- // removing this hook.
- in_hook = true;
- auto match_hook = p->second.hook;
- l.unlock();
- bool success = (validate(match, cmdmap, out) &&
- match_hook->call(match, cmdmap, format, out));
- l.lock();
+ hook->call_async(
+ prefix, cmdmap, f, inbl,
+ [f, on_finish](int r, const std::string& err, bufferlist& out) {
+ // handle either existing output in bufferlist *or* via formatter
+ if (r >= 0 && out.length() == 0) {
+ f->flush(out);
+ }
+ delete f;
+ on_finish(r, err, out);
+ });
+
+ std::unique_lock l(lock);
in_hook = false;
in_hook_cond.notify_all();
- if (!success) {
- ldout(m_cct, 0) << "AdminSocket: request '" << match << "' args '" << args
- << "' to " << match_hook << " failed" << dendl;
- out.append("failed");
- } else {
- ldout(m_cct, 5) << "AdminSocket: request '" << match << "' '" << args
- << "' to " << match_hook
- << " returned " << out.length() << " bytes" << dendl;
- }
- return true;
}
-
-
-bool AdminSocket::validate(const std::string& command,
- const cmdmap_t& cmdmap,
- bufferlist& out) const
+std::pair<int, AdminSocketHook*>
+AdminSocket::find_matched_hook(std::string& prefix,
+ const cmdmap_t& cmdmap)
{
- stringstream os;
- if (validate_cmd(m_cct, hooks.at(command).desc, cmdmap, os)) {
- return true;
- } else {
- out.append(os);
- return false;
+ std::unique_lock l(lock);
+ // Drop lock after done with the lookup to avoid cycles in cases where the
+ // hook takes the same lock that was held during calls to
+ // register/unregister, and set in_hook to allow unregister to wait for us
+ // before removing this hook.
+ auto [hooks_begin, hooks_end] = hooks.equal_range(prefix);
+ if (hooks_begin == hooks_end) {
+ return {ENOENT, nullptr};
}
+ // make sure one of the registered commands with this prefix validates.
+ stringstream errss;
+ for (auto hook = hooks_begin; hook != hooks_end; ++hook) {
+ if (validate_cmd(m_cct, hook->second.desc, cmdmap, errss)) {
+ in_hook = true;
+ return {0, hook->second.hook};
+ }
+ }
+ return {EINVAL, nullptr};
+}
+
+void AdminSocket::queue_tell_command(cref_t<MCommand> m)
+{
+ ldout(m_cct,10) << __func__ << " " << *m << dendl;
+ std::lock_guard l(tell_lock);
+ tell_queue.push_back(std::move(m));
+ wakeup();
+}
+void AdminSocket::queue_tell_command(cref_t<MMonCommand> m)
+{
+ ldout(m_cct,10) << __func__ << " " << *m << dendl;
+ std::lock_guard l(tell_lock);
+ tell_legacy_queue.push_back(std::move(m));
+ wakeup();
}
-int AdminSocket::register_command(std::string_view command,
- std::string_view cmddesc,
+int AdminSocket::register_command(std::string_view cmddesc,
AdminSocketHook *hook,
std::string_view help)
{
int ret;
std::unique_lock l(lock);
- auto i = hooks.find(command);
- if (i != hooks.cend()) {
- ldout(m_cct, 5) << "register_command " << command << " hook " << hook
+ string prefix = cmddesc_get_prefix(cmddesc);
+ auto i = hooks.find(prefix);
+ if (i != hooks.cend() &&
+ i->second.desc == cmddesc) {
+ ldout(m_cct, 5) << "register_command " << prefix
+ << " cmddesc " << cmddesc << " hook " << hook
<< " EEXIST" << dendl;
ret = -EEXIST;
} else {
- ldout(m_cct, 5) << "register_command " << command << " hook " << hook
+ ldout(m_cct, 5) << "register_command " << prefix << " hook " << hook
<< dendl;
hooks.emplace_hint(i,
std::piecewise_construct,
- std::forward_as_tuple(command),
+ std::forward_as_tuple(prefix),
std::forward_as_tuple(hook, cmddesc, help));
ret = 0;
}
return ret;
}
-int AdminSocket::unregister_command(std::string_view command)
-{
- int ret;
- std::unique_lock l(lock);
- auto i = hooks.find(command);
- if (i != hooks.cend()) {
- ldout(m_cct, 5) << "unregister_command " << command << dendl;
-
- // If we are currently processing a command, wait for it to
- // complete in case it referenced the hook that we are
- // unregistering.
- in_hook_cond.wait(l, [this]() { return !in_hook; });
-
- hooks.erase(i);
-
-
- ret = 0;
- } else {
- ldout(m_cct, 5) << "unregister_command " << command << " ENOENT" << dendl;
- ret = -ENOENT;
- }
- return ret;
-}
-
void AdminSocket::unregister_commands(const AdminSocketHook *hook)
{
std::unique_lock l(lock);
class VersionHook : public AdminSocketHook {
public:
- bool call(std::string_view command, const cmdmap_t& cmdmap,
- std::string_view format, bufferlist& out) override {
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
if (command == "0"sv) {
out.append(CEPH_ADMIN_SOCK_VERSION);
} else {
- JSONFormatter jf;
- jf.open_object_section("version");
+ f->open_object_section("version");
if (command == "version") {
- jf.dump_string("version", ceph_version_to_str());
- jf.dump_string("release", ceph_release_name(ceph_release()));
- jf.dump_string("release_type", ceph_release_type());
+ f->dump_string("version", ceph_version_to_str());
+ f->dump_string("release", ceph_release_to_str());
+ f->dump_string("release_type", ceph_release_type());
} else if (command == "git_version") {
- jf.dump_string("git_version", git_version_to_str());
+ f->dump_string("git_version", git_version_to_str());
}
ostringstream ss;
- jf.close_section();
- jf.enable_line_break();
- jf.flush(ss);
- out.append(ss.str());
+ f->close_section();
}
- return true;
+ return 0;
}
};
AdminSocket *m_as;
public:
explicit HelpHook(AdminSocket *as) : m_as(as) {}
- bool call(std::string_view command, const cmdmap_t& cmdmap,
- std::string_view format,
- bufferlist& out) override {
- std::unique_ptr<Formatter> f(Formatter::create(format, "json-pretty"sv,
- "json-pretty"sv));
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
f->open_object_section("help");
for (const auto& [command, info] : m_as->hooks) {
if (info.help.length())
f->dump_string(command.c_str(), info.help);
}
f->close_section();
- ostringstream ss;
- f->flush(ss);
- out.append(ss.str());
- return true;
+ return 0;
}
};
AdminSocket *m_as;
public:
explicit GetdescsHook(AdminSocket *as) : m_as(as) {}
- bool call(std::string_view command, const cmdmap_t& cmdmap,
- std::string_view format, bufferlist& out) override {
+ int call(std::string_view command, const cmdmap_t& cmdmap,
+ Formatter *f,
+ std::ostream& errss,
+ bufferlist& out) override {
int cmdnum = 0;
- JSONFormatter jf;
- jf.open_object_section("command_descriptions");
+ f->open_object_section("command_descriptions");
for (const auto& [command, info] : m_as->hooks) {
// GCC 8 actually has [[maybe_unused]] on a structured binding
// do what you'd expect. GCC 7 does not.
(void)command;
ostringstream secname;
secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
- dump_cmd_and_help_to_json(&jf,
+ dump_cmd_and_help_to_json(f,
CEPH_FEATURES_ALL,
secname.str().c_str(),
info.desc,
info.help);
cmdnum++;
}
- jf.close_section(); // command_descriptions
- jf.enable_line_break();
- ostringstream ss;
- jf.flush(ss);
- out.append(ss.str());
- return true;
+ f->close_section(); // command_descriptions
+ return 0;
}
};
/* Set up things for the new thread */
std::string err;
int pipe_rd = -1, pipe_wr = -1;
- err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
+ err = create_wakeup_pipe(&pipe_rd, &pipe_wr);
if (!err.empty()) {
lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
return false;
/* Create new thread */
m_sock_fd = sock_fd;
- m_shutdown_rd_fd = pipe_rd;
- m_shutdown_wr_fd = pipe_wr;
+ m_wakeup_rd_fd = pipe_rd;
+ m_wakeup_wr_fd = pipe_wr;
m_path = path;
version_hook = std::make_unique<VersionHook>();
- register_command("0", "0", version_hook.get(), "");
- register_command("version", "version", version_hook.get(), "get ceph version");
- register_command("git_version", "git_version", version_hook.get(),
+ register_command("0", version_hook.get(), "");
+ register_command("version", version_hook.get(), "get ceph version");
+ register_command("git_version", version_hook.get(),
"get git sha1");
help_hook = std::make_unique<HelpHook>(this);
- register_command("help", "help", help_hook.get(),
+ register_command("help", help_hook.get(),
"list available commands");
getdescs_hook = std::make_unique<GetdescsHook>(this);
- register_command("get_command_descriptions", "get_command_descriptions",
+ register_command("get_command_descriptions",
getdescs_hook.get(), "list available commands");
th = make_named_thread("admin_socket", &AdminSocket::entry, this);
// Under normal operation this is unlikely to occur. However for some unit
// tests, some object members are not initialized and so cannot be deleted
// without fault.
- if (m_shutdown_wr_fd < 0)
+ if (m_wakeup_wr_fd < 0)
return;
ldout(m_cct, 5) << "shutdown" << dendl;
+ m_shutdown = true;
- auto err = destroy_shutdown_pipe();
+ auto err = destroy_wakeup_pipe();
if (!err.empty()) {
lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl;
}
unregister_commands(version_hook.get());
version_hook.reset();
- unregister_command("help");
+ unregister_commands(help_hook.get());
help_hook.reset();
- unregister_command("get_command_descriptions");
+ unregister_commands(getdescs_hook.get());
getdescs_hook.reset();
remove_cleanup_file(m_path);
m_path.clear();
}
+
+void AdminSocket::wakeup()
+{
+ // Send a byte to the wakeup pipe that the thread is listening to
+ char buf[1] = { 0x0 };
+ int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
+ (void)r;
+}