#include "MonMap.h"
#include "MonSub.h"
+#include "common/async/completion.h"
#include "common/Timer.h"
-#include "common/Finisher.h"
#include "common/config.h"
+#include "messages/MMonGetVersion.h"
#include "auth/AuthClient.h"
#include "auth/AuthServer.h"
class MMonMap;
class MConfig;
class MMonGetVersionReply;
-struct MMonSubscribeAck;
class MMonCommandAck;
-struct MAuthReply;
class LogClient;
-class AuthAuthorizer;
class AuthClientHandler;
class AuthRegistry;
class KeyRing;
struct MonClientPinger : public Dispatcher,
public AuthClient {
-
ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
ceph::condition_variable ping_recvd_cond;
std::string *result;
}
};
+const boost::system::error_category& monc_category() noexcept;
+
+enum class monc_errc {
+ shutting_down = 1, // Command failed due to MonClient shutting down
+ session_reset, // Monitor session was reset
+ rank_dne, // Requested monitor rank does not exist
+ mon_dne, // Requested monitor does not exist
+ timed_out, // Monitor operation timed out
+ mon_unavailable // Monitor unavailable
+};
+
+namespace boost::system {
+template<>
+struct is_error_code_enum<::monc_errc> {
+ static const bool value = true;
+};
+}
+
+// implicit conversion:
+inline boost::system::error_code make_error_code(monc_errc e) noexcept {
+ return { static_cast<int>(e), monc_category() };
+}
+
+// explicit conversion:
+inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
+ return { static_cast<int>(e), monc_category() };
+}
+
+const boost::system::error_category& monc_category() noexcept;
class MonClient : public Dispatcher,
public AuthClient,
public AuthServer /* for mgr, osd, mds */ {
+ static constexpr auto dout_subsys = ceph_subsys_monc;
public:
+ // Error, Newest, Oldest
+ using VersionSig = void(boost::system::error_code, version_t, version_t);
+ using VersionCompletion = ceph::async::Completion<VersionSig>;
+
+ using CommandSig = void(boost::system::error_code, std::string,
+ ceph::buffer::list);
+ using CommandCompletion = ceph::async::Completion<CommandSig>;
+
MonMap monmap;
std::map<std::string,std::string> config_mgr;
mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
SafeTimer timer;
- Finisher finisher;
+ boost::asio::io_context& service;
+ boost::asio::io_context::strand finish_strand{service};
bool initialized;
bool stopping = false;
bool want_monmap;
ceph::condition_variable map_cond;
bool passthrough_monmap = false;
- bool got_config = false;
+
+ bool want_bootstrap_config = false;
+ ceph::ref_t<MConfig> bootstrap_config;
// authenticate
std::unique_ptr<AuthClientHandler> auth;
double reopen_interval_multiplier;
Dispatcher *handle_authentication_dispatcher = nullptr;
-
bool _opened() const;
bool _hunting() const;
void _start_hunting();
void _finish_hunting(int auth_err);
void _finish_auth(int auth_err);
void _reopen_session(int rank = -1);
- MonConnection& _add_conn(unsigned rank);
+ void _add_conn(unsigned rank);
void _add_conns();
void _un_backoff();
void _send_mon_message(MessageRef m);
bool more,
uint32_t auth_method,
const ceph::buffer::list& bl,
- ceph::buffer::list *reply);
+ ceph::buffer::list *reply) override;
void set_entity_name(EntityName name) { entity_name = name; }
void set_handle_authentication_dispatcher(Dispatcher *d) {
std::lock_guard l(monc_lock);
return sub.inc_want(what, start, flags);
}
-
+
std::unique_ptr<KeyRing> keyring;
std::unique_ptr<RotatingKeyRing> rotating_secrets;
public:
- explicit MonClient(CephContext *cct_);
+ MonClient(CephContext *cct_, boost::asio::io_context& service);
MonClient(const MonClient &) = delete;
MonClient& operator=(const MonClient &) = delete;
~MonClient() override;
struct MonCommand {
// for tell only
std::string target_name;
- int target_rank;
+ int target_rank = -1;
ConnectionRef target_con;
std::unique_ptr<MonConnection> target_session;
unsigned send_attempts = 0; ///< attempt count for legacy mons
utime_t last_send_attempt;
-
uint64_t tid;
std::vector<std::string> cmd;
ceph::buffer::list inbl;
- ceph::buffer::list *poutbl;
- std::string *prs;
- int *prval;
- Context *onfinish, *ontimeout;
-
- explicit MonCommand(uint64_t t)
- : target_rank(-1),
- tid(t),
- poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL)
- {}
+ std::unique_ptr<CommandCompletion> onfinish;
+ std::optional<boost::asio::steady_timer> cancel_timer;
+
+ MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
+ : tid(t), onfinish(std::move(onfinish)) {
+ auto timeout =
+ monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
+ if (timeout.count() > 0) {
+ cancel_timer.emplace(monc.service, timeout);
+ cancel_timer->async_wait(
+ [this, &monc](boost::system::error_code ec) {
+ if (ec)
+ return;
+ std::scoped_lock l(monc.monc_lock);
+ monc._cancel_mon_command(tid);
+ });
+ }
+ }
bool is_tell() const {
return target_name.size() || target_rank >= 0;
}
};
+ friend MonCommand;
std::map<uint64_t,MonCommand*> mon_commands;
void _send_command(MonCommand *r);
void _check_tell_commands();
void _resend_mon_commands();
int _cancel_mon_command(uint64_t tid);
- void _finish_command(MonCommand *r, int ret, std::string rs);
+ void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
+ bufferlist&& bl);
void _finish_auth();
void handle_mon_command_ack(MMonCommandAck *ack);
void handle_command_reply(MCommandReply *reply);
public:
- void start_mon_command(const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
+ template<typename CompletionToken>
+ auto start_mon_command(const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl,
+ CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ template<typename CompletionToken>
+ auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl, CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ r->target_rank = mon_rank;
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ template<typename CompletionToken>
+ auto start_mon_command(const std::string& mon_name,
+ const std::vector<std::string>& cmd,
+ const ceph::buffer::list& inbl,
+ CompletionToken&& token) {
+ ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
+ boost::asio::async_completion<CompletionToken, CommandSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto h = CommandCompletion::create(service.get_executor(),
+ std::move(init.completion_handler));
+ if (!initialized || stopping) {
+ ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
+ bufferlist{});
+ } else {
+ auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
+ // detect/tolerate mon *rank* passed as a string
+ std::string err;
+ int rank = strict_strtoll(mon_name.c_str(), 10, &err);
+ if (err.size() == 0 && rank >= 0) {
+ ldout(cct,10) << __func__ << " interpreting name '" << mon_name
+ << "' as rank " << rank << dendl;
+ r->target_rank = rank;
+ } else {
+ r->target_name = mon_name;
+ }
+ r->cmd = cmd;
+ r->inbl = inbl;
+ mon_commands.emplace(r->tid, r);
+ _send_command(r);
+ }
+ }
+ return init.result.get();
+ }
+
+ class ContextVerter {
+ std::string* outs;
+ ceph::bufferlist* outbl;
+ Context* onfinish;
+
+ public:
+ ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
+ : outs(outs), outbl(outbl), onfinish(onfinish) {}
+ ~ContextVerter() = default;
+ ContextVerter(const ContextVerter&) = default;
+ ContextVerter& operator =(const ContextVerter&) = default;
+ ContextVerter(ContextVerter&&) = default;
+ ContextVerter& operator =(ContextVerter&&) = default;
+
+ void operator()(boost::system::error_code e,
+ std::string s,
+ ceph::bufferlist bl) {
+ if (outs)
+ *outs = std::move(s);
+ if (outbl)
+ *outbl = std::move(bl);
+ if (onfinish)
+ onfinish->complete(ceph::from_error_code(e));
+ }
+ };
+
+ void start_mon_command(const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
void start_mon_command(int mon_rank,
- const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
- void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
- const std::vector<std::string>& cmd, const ceph::buffer::list& inbl,
- ceph::buffer::list *outbl, std::string *outs,
- Context *onfinish);
+ const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
+ void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix
+ const vector<string>& cmd, const bufferlist& inbl,
+ bufferlist *outbl, string *outs,
+ Context *onfinish) {
+ start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
+ }
+
// version requests
public:
/**
* get latest known version(s) of cluster map
*
- * @param map std::string name of map (e.g., 'osdmap')
- * @param newest pointer where newest map version will be stored
- * @param oldest pointer where oldest map version will be stored
- * @param onfinish context that will be triggered on completion
- * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request
+ * @param map string name of map (e.g., 'osdmap')
+ * @param token context that will be triggered on completion
+ * @return (via Completion) {} on success,
+ * boost::system::errc::resource_unavailable_try_again if we need to
+ * resubmit our request
*/
- void get_version(std::string map, version_t *newest, version_t *oldest, Context *onfinish);
+ template<typename CompletionToken>
+ auto get_version(std::string&& map, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, VersionSig> init(token);
+ {
+ std::scoped_lock l(monc_lock);
+ auto m = ceph::make_message<MMonGetVersion>();
+ m->what = std::move(map);
+ m->handle = ++version_req_id;
+ version_requests.emplace(m->handle,
+ VersionCompletion::create(
+ service.get_executor(),
+ std::move(init.completion_handler)));
+ _send_mon_message(m);
+ }
+ return init.result.get();
+ }
+
/**
* Run a callback within our lock, with a reference
* to the MonMap
md_config_t::config_callback get_config_callback();
private:
- struct version_req_d {
- Context *context;
- version_t *newest, *oldest;
- version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {}
- };
- std::map<ceph_tid_t, version_req_d*> version_requests;
+ std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
ceph_tid_t version_req_id;
void handle_get_version_reply(MMonGetVersionReply* m);
-
md_config_t::config_callback config_cb;
std::function<void(void)> config_notify_cb;
};