]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonClient.h
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mon / MonClient.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
7c673cae
FG
14#ifndef CEPH_MONCLIENT_H
15#define CEPH_MONCLIENT_H
16
9f95a23c
TL
17#include <functional>
18#include <list>
19#include <map>
7c673cae 20#include <memory>
9f95a23c
TL
21#include <set>
22#include <string>
23#include <vector>
7c673cae
FG
24
25#include "msg/Messenger.h"
26
27#include "MonMap.h"
11fdf7f2 28#include "MonSub.h"
7c673cae 29
39ae355f 30#include "common/admin_socket.h"
f67539c2 31#include "common/async/completion.h"
7c673cae 32#include "common/Timer.h"
7c673cae 33#include "common/config.h"
f67539c2 34#include "messages/MMonGetVersion.h"
7c673cae 35
11fdf7f2
TL
36#include "auth/AuthClient.h"
37#include "auth/AuthServer.h"
7c673cae
FG
38
39class MMonMap;
11fdf7f2 40class MConfig;
7c673cae 41class MMonGetVersionReply;
7c673cae 42class MMonCommandAck;
7c673cae 43class LogClient;
7c673cae 44class AuthClientHandler;
11fdf7f2 45class AuthRegistry;
7c673cae
FG
46class KeyRing;
47class RotatingKeyRing;
48
11fdf7f2
TL
49class MonConnection {
50public:
51 MonConnection(CephContext *cct,
52 ConnectionRef conn,
53 uint64_t global_id,
54 AuthRegistry *auth_registry);
55 ~MonConnection();
56 MonConnection(MonConnection&& rhs) = default;
57 MonConnection& operator=(MonConnection&&) = default;
58 MonConnection(const MonConnection& rhs) = delete;
59 MonConnection& operator=(const MonConnection&) = delete;
60 int handle_auth(MAuthReply *m,
61 const EntityName& entity_name,
62 uint32_t want_keys,
63 RotatingKeyRing* keyring);
64 int authenticate(MAuthReply *m);
65 void start(epoch_t epoch,
66 const EntityName& entity_name);
67 bool have_session() const;
68 uint64_t get_global_id() const {
69 return global_id;
70 }
71 ConnectionRef get_con() {
72 return con;
73 }
74 std::unique_ptr<AuthClientHandler>& get_auth() {
75 return auth;
76 }
77
78 int get_auth_request(
79 uint32_t *method,
80 std::vector<uint32_t> *preferred_modes,
9f95a23c 81 ceph::buffer::list *out,
11fdf7f2
TL
82 const EntityName& entity_name,
83 uint32_t want_keys,
84 RotatingKeyRing* keyring);
85 int handle_auth_reply_more(
86 AuthConnectionMeta *auth_meta,
9f95a23c
TL
87 const ceph::buffer::list& bl,
88 ceph::buffer::list *reply);
11fdf7f2
TL
89 int handle_auth_done(
90 AuthConnectionMeta *auth_meta,
91 uint64_t global_id,
9f95a23c 92 const ceph::buffer::list& bl,
11fdf7f2
TL
93 CryptoKey *session_key,
94 std::string *connection_secret);
95 int handle_auth_bad_method(
96 uint32_t old_auth_method,
97 int result,
98 const std::vector<uint32_t>& allowed_methods,
99 const std::vector<uint32_t>& allowed_modes);
100
101 bool is_con(Connection *c) const {
102 return con.get() == c;
103 }
9f95a23c
TL
104 void queue_command(Message *m) {
105 pending_tell_command = m;
106 }
11fdf7f2
TL
107
108private:
109 int _negotiate(MAuthReply *m,
110 const EntityName& entity_name,
111 uint32_t want_keys,
112 RotatingKeyRing* keyring);
113 int _init_auth(uint32_t method,
114 const EntityName& entity_name,
115 uint32_t want_keys,
116 RotatingKeyRing* keyring,
117 bool msgr2);
118
119private:
120 CephContext *cct;
121 enum class State {
122 NONE,
123 NEGOTIATING, // v1 only
124 AUTHENTICATING, // v1 and v2
125 HAVE_SESSION,
126 };
127 State state = State::NONE;
128 ConnectionRef con;
129 int auth_method = -1;
130 utime_t auth_start;
131
132 std::unique_ptr<AuthClientHandler> auth;
133 uint64_t global_id;
134
9f95a23c
TL
135 MessageRef pending_tell_command;
136
11fdf7f2
TL
137 AuthRegistry *auth_registry;
138};
139
140
141struct MonClientPinger : public Dispatcher,
142 public AuthClient {
9f95a23c
TL
143 ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
144 ceph::condition_variable ping_recvd_cond;
145 std::string *result;
7c673cae 146 bool done;
11fdf7f2
TL
147 RotatingKeyRing *keyring;
148 std::unique_ptr<MonConnection> mc;
7c673cae 149
11fdf7f2
TL
150 MonClientPinger(CephContext *cct_,
151 RotatingKeyRing *keyring,
9f95a23c 152 std::string *res_) :
7c673cae 153 Dispatcher(cct_),
7c673cae 154 result(res_),
11fdf7f2
TL
155 done(false),
156 keyring(keyring)
7c673cae
FG
157 { }
158
159 int wait_for_reply(double timeout = 0.0) {
9f95a23c
TL
160 std::unique_lock locker{lock};
161 if (timeout <= 0) {
2a845540 162 timeout = std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count();
9f95a23c 163 }
7c673cae 164 done = false;
9f95a23c
TL
165 if (ping_recvd_cond.wait_for(locker,
166 ceph::make_timespan(timeout),
167 [this] { return done; })) {
168 return 0;
169 } else {
170 return ETIMEDOUT;
7c673cae 171 }
7c673cae
FG
172 }
173
174 bool ms_dispatch(Message *m) override {
9f95a23c 175 using ceph::decode;
11fdf7f2 176 std::lock_guard l(lock);
7c673cae
FG
177 if (m->get_type() != CEPH_MSG_PING)
178 return false;
179
9f95a23c 180 ceph::buffer::list &payload = m->get_payload();
7c673cae 181 if (result && payload.length() > 0) {
11fdf7f2
TL
182 auto p = std::cbegin(payload);
183 decode(*result, p);
7c673cae
FG
184 }
185 done = true;
9f95a23c 186 ping_recvd_cond.notify_all();
7c673cae
FG
187 m->put();
188 return true;
189 }
190 bool ms_handle_reset(Connection *con) override {
11fdf7f2 191 std::lock_guard l(lock);
7c673cae 192 done = true;
9f95a23c 193 ping_recvd_cond.notify_all();
7c673cae
FG
194 return true;
195 }
196 void ms_handle_remote_reset(Connection *con) override {}
197 bool ms_handle_refused(Connection *con) override {
198 return false;
199 }
7c673cae 200
11fdf7f2
TL
201 // AuthClient
202 int get_auth_request(
203 Connection *con,
204 AuthConnectionMeta *auth_meta,
205 uint32_t *auth_method,
206 std::vector<uint32_t> *preferred_modes,
9f95a23c 207 ceph::buffer::list *bl) override {
11fdf7f2
TL
208 return mc->get_auth_request(auth_method, preferred_modes, bl,
209 cct->_conf->name, 0, keyring);
7c673cae 210 }
11fdf7f2
TL
211 int handle_auth_reply_more(
212 Connection *con,
213 AuthConnectionMeta *auth_meta,
9f95a23c
TL
214 const ceph::buffer::list& bl,
215 ceph::buffer::list *reply) override {
11fdf7f2 216 return mc->handle_auth_reply_more(auth_meta, bl, reply);
7c673cae 217 }
11fdf7f2
TL
218 int handle_auth_done(
219 Connection *con,
220 AuthConnectionMeta *auth_meta,
221 uint64_t global_id,
222 uint32_t con_mode,
9f95a23c 223 const ceph::buffer::list& bl,
11fdf7f2
TL
224 CryptoKey *session_key,
225 std::string *connection_secret) override {
226 return mc->handle_auth_done(auth_meta, global_id, bl,
227 session_key, connection_secret);
228 }
229 int handle_auth_bad_method(
230 Connection *con,
231 AuthConnectionMeta *auth_meta,
232 uint32_t old_auth_method,
233 int result,
234 const std::vector<uint32_t>& allowed_methods,
235 const std::vector<uint32_t>& allowed_modes) override {
236 return mc->handle_auth_bad_method(old_auth_method, result,
237 allowed_methods, allowed_modes);
7c673cae 238 }
7c673cae
FG
239};
240
f67539c2
TL
241const boost::system::error_category& monc_category() noexcept;
242
243enum class monc_errc {
244 shutting_down = 1, // Command failed due to MonClient shutting down
245 session_reset, // Monitor session was reset
246 rank_dne, // Requested monitor rank does not exist
247 mon_dne, // Requested monitor does not exist
248 timed_out, // Monitor operation timed out
249 mon_unavailable // Monitor unavailable
250};
251
252namespace boost::system {
253template<>
254struct is_error_code_enum<::monc_errc> {
255 static const bool value = true;
256};
257}
258
259// implicit conversion:
260inline boost::system::error_code make_error_code(monc_errc e) noexcept {
261 return { static_cast<int>(e), monc_category() };
262}
263
264// explicit conversion:
265inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
266 return { static_cast<int>(e), monc_category() };
267}
268
269const boost::system::error_category& monc_category() noexcept;
11fdf7f2
TL
270
271class MonClient : public Dispatcher,
272 public AuthClient,
39ae355f
TL
273 public AuthServer, /* for mgr, osd, mds */
274 public AdminSocketHook {
f67539c2 275 static constexpr auto dout_subsys = ceph_subsys_monc;
7c673cae 276public:
f67539c2
TL
277 // Error, Newest, Oldest
278 using VersionSig = void(boost::system::error_code, version_t, version_t);
279 using VersionCompletion = ceph::async::Completion<VersionSig>;
280
281 using CommandSig = void(boost::system::error_code, std::string,
282 ceph::buffer::list);
283 using CommandCompletion = ceph::async::Completion<CommandSig>;
284
7c673cae 285 MonMap monmap;
9f95a23c 286 std::map<std::string,std::string> config_mgr;
11fdf7f2 287
7c673cae
FG
288private:
289 Messenger *messenger;
290
291 std::unique_ptr<MonConnection> active_con;
11fdf7f2 292 std::map<entity_addrvec_t, MonConnection> pending_cons;
9f95a23c 293 std::set<unsigned> tried;
7c673cae
FG
294
295 EntityName entity_name;
296
9f95a23c 297 mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
7c673cae 298 SafeTimer timer;
f67539c2
TL
299 boost::asio::io_context& service;
300 boost::asio::io_context::strand finish_strand{service};
7c673cae
FG
301
302 bool initialized;
11fdf7f2 303 bool stopping = false;
7c673cae
FG
304
305 LogClient *log_client;
306 bool more_log_pending;
307
308 void send_log(bool flush = false);
309
7c673cae
FG
310 bool ms_dispatch(Message *m) override;
311 bool ms_handle_reset(Connection *con) override;
312 void ms_handle_remote_reset(Connection *con) override {}
313 bool ms_handle_refused(Connection *con) override { return false; }
314
315 void handle_monmap(MMonMap *m);
11fdf7f2 316 void handle_config(MConfig *m);
7c673cae
FG
317
318 void handle_auth(MAuthReply *m);
319
39ae355f
TL
320 int call(
321 std::string_view command,
322 const cmdmap_t& cmdmap,
323 const ceph::buffer::list &inbl,
324 ceph::Formatter *f,
325 std::ostream& errss,
326 ceph::buffer::list& out) override;
327
7c673cae 328 // monitor session
9f95a23c
TL
329 utime_t last_keepalive;
330 utime_t last_send_log;
331
7c673cae
FG
332 void tick();
333 void schedule_tick();
334
335 // monclient
336 bool want_monmap;
9f95a23c 337 ceph::condition_variable map_cond;
31f18b77 338 bool passthrough_monmap = false;
f67539c2
TL
339
340 bool want_bootstrap_config = false;
341 ceph::ref_t<MConfig> bootstrap_config;
31f18b77 342
7c673cae
FG
343 // authenticate
344 std::unique_ptr<AuthClientHandler> auth;
345 uint32_t want_keys = 0;
346 uint64_t global_id = 0;
9f95a23c 347 ceph::condition_variable auth_cond;
7c673cae
FG
348 int authenticate_err = 0;
349 bool authenticated = false;
350
9f95a23c 351 std::list<MessageRef> waiting_for_session;
7c673cae 352 utime_t last_rotating_renew_sent;
7c673cae
FG
353 bool had_a_connection;
354 double reopen_interval_multiplier;
355
11fdf7f2 356 Dispatcher *handle_authentication_dispatcher = nullptr;
7c673cae
FG
357 bool _opened() const;
358 bool _hunting() const;
359 void _start_hunting();
11fdf7f2 360 void _finish_hunting(int auth_err);
7c673cae
FG
361 void _finish_auth(int auth_err);
362 void _reopen_session(int rank = -1);
f67539c2 363 void _add_conn(unsigned rank);
c5c27e9a 364 void _add_conns();
c07f9fc5 365 void _un_backoff();
9f95a23c 366 void _send_mon_message(MessageRef m);
7c673cae 367
11fdf7f2
TL
368 std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
369 const ConnectionRef& con) {
370 for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) {
371 if (i->second.get_con() == con) {
372 return i;
373 }
374 }
375 return pending_cons.end();
376 }
377
7c673cae 378public:
11fdf7f2
TL
379 // AuthClient
380 int get_auth_request(
381 Connection *con,
382 AuthConnectionMeta *auth_meta,
383 uint32_t *method,
384 std::vector<uint32_t> *preferred_modes,
9f95a23c 385 ceph::buffer::list *bl) override;
11fdf7f2
TL
386 int handle_auth_reply_more(
387 Connection *con,
388 AuthConnectionMeta *auth_meta,
9f95a23c
TL
389 const ceph::buffer::list& bl,
390 ceph::buffer::list *reply) override;
11fdf7f2
TL
391 int handle_auth_done(
392 Connection *con,
393 AuthConnectionMeta *auth_meta,
394 uint64_t global_id,
395 uint32_t con_mode,
9f95a23c 396 const ceph::buffer::list& bl,
11fdf7f2
TL
397 CryptoKey *session_key,
398 std::string *connection_secret) override;
399 int handle_auth_bad_method(
400 Connection *con,
401 AuthConnectionMeta *auth_meta,
402 uint32_t old_auth_method,
403 int result,
404 const std::vector<uint32_t>& allowed_methods,
405 const std::vector<uint32_t>& allowed_modes) override;
406 // AuthServer
407 int handle_auth_request(
408 Connection *con,
409 AuthConnectionMeta *auth_meta,
410 bool more,
411 uint32_t auth_method,
9f95a23c 412 const ceph::buffer::list& bl,
f67539c2 413 ceph::buffer::list *reply) override;
7c673cae 414
11fdf7f2
TL
415 void set_entity_name(EntityName name) { entity_name = name; }
416 void set_handle_authentication_dispatcher(Dispatcher *d) {
417 handle_authentication_dispatcher = d;
418 }
7c673cae
FG
419 int _check_auth_tickets();
420 int _check_auth_rotating();
421 int wait_auth_rotating(double timeout);
422
423 int authenticate(double timeout=0.0);
424 bool is_authenticated() const {return authenticated;}
425
94b18763
FG
426 bool is_connected() const { return active_con != nullptr; }
427
7c673cae
FG
428 /**
429 * Try to flush as many log messages as we can in a single
430 * message. Use this before shutting down to transmit your
431 * last message.
432 */
433 void flush_log();
434
7c673cae 435private:
11fdf7f2
TL
436 // mon subscriptions
437 MonSub sub;
7c673cae
FG
438 void _renew_subs();
439 void handle_subscribe_ack(MMonSubscribeAck* m);
440
7c673cae
FG
441public:
442 void renew_subs() {
11fdf7f2 443 std::lock_guard l(monc_lock);
7c673cae
FG
444 _renew_subs();
445 }
9f95a23c 446 bool sub_want(std::string what, version_t start, unsigned flags) {
11fdf7f2
TL
447 std::lock_guard l(monc_lock);
448 return sub.want(what, start, flags);
7c673cae 449 }
9f95a23c 450 void sub_got(std::string what, version_t have) {
11fdf7f2
TL
451 std::lock_guard l(monc_lock);
452 sub.got(what, have);
7c673cae 453 }
9f95a23c 454 void sub_unwant(std::string what) {
11fdf7f2
TL
455 std::lock_guard l(monc_lock);
456 sub.unwant(what);
7c673cae 457 }
9f95a23c 458 bool sub_want_increment(std::string what, version_t start, unsigned flags) {
11fdf7f2
TL
459 std::lock_guard l(monc_lock);
460 return sub.inc_want(what, start, flags);
7c673cae 461 }
f67539c2 462
7c673cae
FG
463 std::unique_ptr<KeyRing> keyring;
464 std::unique_ptr<RotatingKeyRing> rotating_secrets;
465
466 public:
f67539c2 467 MonClient(CephContext *cct_, boost::asio::io_context& service);
7c673cae
FG
468 MonClient(const MonClient &) = delete;
469 MonClient& operator=(const MonClient &) = delete;
470 ~MonClient() override;
471
472 int init();
473 void shutdown();
474
475 void set_log_client(LogClient *clog) {
476 log_client = clog;
477 }
9f95a23c
TL
478 LogClient *get_log_client() {
479 return log_client;
480 }
7c673cae
FG
481
482 int build_initial_monmap();
483 int get_monmap();
11fdf7f2 484 int get_monmap_and_config();
31f18b77
FG
485 /**
486 * If you want to see MonMap messages, set this and
487 * the MonClient will tell the Messenger it hasn't
488 * dealt with it.
489 * Note that if you do this, *you* are of course responsible for
490 * putting the message reference!
491 */
492 void set_passthrough_monmap() {
11fdf7f2 493 std::lock_guard l(monc_lock);
31f18b77
FG
494 passthrough_monmap = true;
495 }
496 void unset_passthrough_monmap() {
11fdf7f2 497 std::lock_guard l(monc_lock);
31f18b77
FG
498 passthrough_monmap = false;
499 }
7c673cae
FG
500 /**
501 * Ping monitor with ID @p mon_id and record the resulting
502 * reply in @p result_reply.
503 *
504 * @param[in] mon_id Target monitor's ID
505 * @param[out] result_reply reply from mon.ID, if param != NULL
506 * @returns 0 in case of success; < 0 in case of error,
507 * -ETIMEDOUT if monitor didn't reply before timeout
508 * expired (default: conf->client_mount_timeout).
509 */
9f95a23c 510 int ping_monitor(const std::string &mon_id, std::string *result_reply);
7c673cae
FG
511
512 void send_mon_message(Message *m) {
9f95a23c 513 send_mon_message(MessageRef{m, false});
7c673cae 514 }
9f95a23c 515 void send_mon_message(MessageRef m);
b3b6e05e
TL
516
517 void reopen_session() {
11fdf7f2 518 std::lock_guard l(monc_lock);
7c673cae
FG
519 _reopen_session();
520 }
521
7c673cae
FG
522 const uuid_d& get_fsid() const {
523 return monmap.fsid;
524 }
525
11fdf7f2
TL
526 entity_addrvec_t get_mon_addrs(unsigned i) const {
527 std::lock_guard l(monc_lock);
7c673cae 528 if (i < monmap.size())
11fdf7f2
TL
529 return monmap.get_addrs(i);
530 return entity_addrvec_t();
7c673cae
FG
531 }
532 int get_num_mon() const {
11fdf7f2 533 std::lock_guard l(monc_lock);
7c673cae
FG
534 return monmap.size();
535 }
536
537 uint64_t get_global_id() const {
11fdf7f2 538 std::lock_guard l(monc_lock);
7c673cae
FG
539 return global_id;
540 }
541
542 void set_messenger(Messenger *m) { messenger = m; }
11fdf7f2 543 entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); }
7c673cae
FG
544 AuthAuthorizer* build_authorizer(int service_id) const;
545
546 void set_want_keys(uint32_t want) {
547 want_keys = want;
548 }
549
550 // admin commands
551private:
552 uint64_t last_mon_command_tid;
11fdf7f2 553
7c673cae 554 struct MonCommand {
9f95a23c
TL
555 // for tell only
556 std::string target_name;
f67539c2 557 int target_rank = -1;
9f95a23c
TL
558 ConnectionRef target_con;
559 std::unique_ptr<MonConnection> target_session;
560 unsigned send_attempts = 0; ///< attempt count for legacy mons
561 utime_t last_send_attempt;
7c673cae 562 uint64_t tid;
9f95a23c
TL
563 std::vector<std::string> cmd;
564 ceph::buffer::list inbl;
f67539c2
TL
565 std::unique_ptr<CommandCompletion> onfinish;
566 std::optional<boost::asio::steady_timer> cancel_timer;
567
568 MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
569 : tid(t), onfinish(std::move(onfinish)) {
570 auto timeout =
571 monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
572 if (timeout.count() > 0) {
573 cancel_timer.emplace(monc.service, timeout);
574 cancel_timer->async_wait(
575 [this, &monc](boost::system::error_code ec) {
576 if (ec)
577 return;
578 std::scoped_lock l(monc.monc_lock);
579 monc._cancel_mon_command(tid);
580 });
581 }
582 }
9f95a23c
TL
583
584 bool is_tell() const {
585 return target_name.size() || target_rank >= 0;
586 }
7c673cae 587 };
f67539c2 588 friend MonCommand;
9f95a23c 589 std::map<uint64_t,MonCommand*> mon_commands;
7c673cae
FG
590
591 void _send_command(MonCommand *r);
9f95a23c 592 void _check_tell_commands();
7c673cae 593 void _resend_mon_commands();
31f18b77 594 int _cancel_mon_command(uint64_t tid);
f67539c2
TL
595 void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
596 bufferlist&& bl);
7c673cae
FG
597 void _finish_auth();
598 void handle_mon_command_ack(MMonCommandAck *ack);
9f95a23c 599 void handle_command_reply(MCommandReply *reply);
7c673cae
FG
600
601public:
f67539c2
TL
602 template<typename CompletionToken>
603 auto start_mon_command(const std::vector<std::string>& cmd,
604 const ceph::buffer::list& inbl,
605 CompletionToken&& token) {
606 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
607 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
608 {
609 std::scoped_lock l(monc_lock);
610 auto h = CommandCompletion::create(service.get_executor(),
611 std::move(init.completion_handler));
612 if (!initialized || stopping) {
613 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
614 bufferlist{});
615 } else {
616 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
617 r->cmd = cmd;
618 r->inbl = inbl;
619 mon_commands.emplace(r->tid, r);
620 _send_command(r);
621 }
622 }
623 return init.result.get();
624 }
625
626 template<typename CompletionToken>
627 auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
628 const ceph::buffer::list& inbl, CompletionToken&& token) {
629 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
630 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
631 {
632 std::scoped_lock l(monc_lock);
633 auto h = CommandCompletion::create(service.get_executor(),
634 std::move(init.completion_handler));
635 if (!initialized || stopping) {
636 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
637 bufferlist{});
638 } else {
639 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
640 r->target_rank = mon_rank;
641 r->cmd = cmd;
642 r->inbl = inbl;
643 mon_commands.emplace(r->tid, r);
644 _send_command(r);
645 }
646 }
647 return init.result.get();
648 }
649
650 template<typename CompletionToken>
651 auto start_mon_command(const std::string& mon_name,
652 const std::vector<std::string>& cmd,
653 const ceph::buffer::list& inbl,
654 CompletionToken&& token) {
655 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
656 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
657 {
658 std::scoped_lock l(monc_lock);
659 auto h = CommandCompletion::create(service.get_executor(),
660 std::move(init.completion_handler));
661 if (!initialized || stopping) {
662 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
663 bufferlist{});
664 } else {
665 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
666 // detect/tolerate mon *rank* passed as a string
667 std::string err;
668 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
669 if (err.size() == 0 && rank >= 0) {
670 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
671 << "' as rank " << rank << dendl;
672 r->target_rank = rank;
673 } else {
674 r->target_name = mon_name;
675 }
676 r->cmd = cmd;
677 r->inbl = inbl;
678 mon_commands.emplace(r->tid, r);
679 _send_command(r);
680 }
681 }
682 return init.result.get();
683 }
684
685 class ContextVerter {
686 std::string* outs;
687 ceph::bufferlist* outbl;
688 Context* onfinish;
689
690 public:
691 ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
692 : outs(outs), outbl(outbl), onfinish(onfinish) {}
693 ~ContextVerter() = default;
694 ContextVerter(const ContextVerter&) = default;
695 ContextVerter& operator =(const ContextVerter&) = default;
696 ContextVerter(ContextVerter&&) = default;
697 ContextVerter& operator =(ContextVerter&&) = default;
698
699 void operator()(boost::system::error_code e,
700 std::string s,
701 ceph::bufferlist bl) {
702 if (outs)
703 *outs = std::move(s);
704 if (outbl)
705 *outbl = std::move(bl);
706 if (onfinish)
707 onfinish->complete(ceph::from_error_code(e));
708 }
709 };
710
20effc67
TL
711 void start_mon_command(const std::vector<std::string>& cmd, const bufferlist& inbl,
712 bufferlist *outbl, std::string *outs,
f67539c2
TL
713 Context *onfinish) {
714 start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
715 }
7c673cae 716 void start_mon_command(int mon_rank,
20effc67
TL
717 const std::vector<std::string>& cmd, const bufferlist& inbl,
718 bufferlist *outbl, std::string *outs,
f67539c2
TL
719 Context *onfinish) {
720 start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
721 }
20effc67
TL
722 void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
723 const std::vector<std::string>& cmd, const bufferlist& inbl,
724 bufferlist *outbl, std::string *outs,
f67539c2
TL
725 Context *onfinish) {
726 start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
727 }
728
7c673cae
FG
729
730 // version requests
731public:
732 /**
733 * get latest known version(s) of cluster map
734 *
f67539c2
TL
735 * @param map string name of map (e.g., 'osdmap')
736 * @param token context that will be triggered on completion
737 * @return (via Completion) {} on success,
738 * boost::system::errc::resource_unavailable_try_again if we need to
739 * resubmit our request
7c673cae 740 */
f67539c2
TL
741 template<typename CompletionToken>
742 auto get_version(std::string&& map, CompletionToken&& token) {
743 boost::asio::async_completion<CompletionToken, VersionSig> init(token);
744 {
745 std::scoped_lock l(monc_lock);
746 auto m = ceph::make_message<MMonGetVersion>();
747 m->what = std::move(map);
748 m->handle = ++version_req_id;
749 version_requests.emplace(m->handle,
750 VersionCompletion::create(
751 service.get_executor(),
752 std::move(init.completion_handler)));
753 _send_mon_message(m);
754 }
755 return init.result.get();
756 }
757
7c673cae
FG
758 /**
759 * Run a callback within our lock, with a reference
760 * to the MonMap
761 */
762 template<typename Callback, typename...Args>
763 auto with_monmap(Callback&& cb, Args&&...args) const ->
764 decltype(cb(monmap, std::forward<Args>(args)...)) {
11fdf7f2 765 std::lock_guard l(monc_lock);
7c673cae
FG
766 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
767 }
768
11fdf7f2
TL
769 void register_config_callback(md_config_t::config_callback fn);
770 void register_config_notify_callback(std::function<void(void)> f) {
771 config_notify_cb = f;
772 }
773 md_config_t::config_callback get_config_callback();
774
7c673cae 775private:
7c673cae 776
f67539c2 777 std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
7c673cae
FG
778 ceph_tid_t version_req_id;
779 void handle_get_version_reply(MMonGetVersionReply* m);
11fdf7f2
TL
780 md_config_t::config_callback config_cb;
781 std::function<void(void)> config_notify_cb;
7c673cae
FG
782};
783
784#endif