]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
de6bba574ff1a296fd539e6ccee2290da4414ad2
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
16
17 #include <functional>
18 #include <list>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <vector>
24
25 #include "msg/Messenger.h"
26
27 #include "MonMap.h"
28 #include "MonSub.h"
29
30 #include "common/async/completion.h"
31 #include "common/Timer.h"
32 #include "common/config.h"
33 #include "messages/MMonGetVersion.h"
34
35 #include "auth/AuthClient.h"
36 #include "auth/AuthServer.h"
37
38 class MMonMap;
39 class MConfig;
40 class MMonGetVersionReply;
41 class MMonCommandAck;
42 class LogClient;
43 class AuthClientHandler;
44 class AuthRegistry;
45 class KeyRing;
46 class RotatingKeyRing;
47
48 class MonConnection {
49 public:
50 MonConnection(CephContext *cct,
51 ConnectionRef conn,
52 uint64_t global_id,
53 AuthRegistry *auth_registry);
54 ~MonConnection();
55 MonConnection(MonConnection&& rhs) = default;
56 MonConnection& operator=(MonConnection&&) = default;
57 MonConnection(const MonConnection& rhs) = delete;
58 MonConnection& operator=(const MonConnection&) = delete;
59 int handle_auth(MAuthReply *m,
60 const EntityName& entity_name,
61 uint32_t want_keys,
62 RotatingKeyRing* keyring);
63 int authenticate(MAuthReply *m);
64 void start(epoch_t epoch,
65 const EntityName& entity_name);
66 bool have_session() const;
67 uint64_t get_global_id() const {
68 return global_id;
69 }
70 ConnectionRef get_con() {
71 return con;
72 }
73 std::unique_ptr<AuthClientHandler>& get_auth() {
74 return auth;
75 }
76
77 int get_auth_request(
78 uint32_t *method,
79 std::vector<uint32_t> *preferred_modes,
80 ceph::buffer::list *out,
81 const EntityName& entity_name,
82 uint32_t want_keys,
83 RotatingKeyRing* keyring);
84 int handle_auth_reply_more(
85 AuthConnectionMeta *auth_meta,
86 const ceph::buffer::list& bl,
87 ceph::buffer::list *reply);
88 int handle_auth_done(
89 AuthConnectionMeta *auth_meta,
90 uint64_t global_id,
91 const ceph::buffer::list& bl,
92 CryptoKey *session_key,
93 std::string *connection_secret);
94 int handle_auth_bad_method(
95 uint32_t old_auth_method,
96 int result,
97 const std::vector<uint32_t>& allowed_methods,
98 const std::vector<uint32_t>& allowed_modes);
99
100 bool is_con(Connection *c) const {
101 return con.get() == c;
102 }
103 void queue_command(Message *m) {
104 pending_tell_command = m;
105 }
106
107 private:
108 int _negotiate(MAuthReply *m,
109 const EntityName& entity_name,
110 uint32_t want_keys,
111 RotatingKeyRing* keyring);
112 int _init_auth(uint32_t method,
113 const EntityName& entity_name,
114 uint32_t want_keys,
115 RotatingKeyRing* keyring,
116 bool msgr2);
117
118 private:
119 CephContext *cct;
120 enum class State {
121 NONE,
122 NEGOTIATING, // v1 only
123 AUTHENTICATING, // v1 and v2
124 HAVE_SESSION,
125 };
126 State state = State::NONE;
127 ConnectionRef con;
128 int auth_method = -1;
129 utime_t auth_start;
130
131 std::unique_ptr<AuthClientHandler> auth;
132 uint64_t global_id;
133
134 MessageRef pending_tell_command;
135
136 AuthRegistry *auth_registry;
137 };
138
139
140 struct MonClientPinger : public Dispatcher,
141 public AuthClient {
142 ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
143 ceph::condition_variable ping_recvd_cond;
144 std::string *result;
145 bool done;
146 RotatingKeyRing *keyring;
147 std::unique_ptr<MonConnection> mc;
148
149 MonClientPinger(CephContext *cct_,
150 RotatingKeyRing *keyring,
151 std::string *res_) :
152 Dispatcher(cct_),
153 result(res_),
154 done(false),
155 keyring(keyring)
156 { }
157
158 int wait_for_reply(double timeout = 0.0) {
159 std::unique_lock locker{lock};
160 if (timeout <= 0) {
161 timeout = std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count();
162 }
163 done = false;
164 if (ping_recvd_cond.wait_for(locker,
165 ceph::make_timespan(timeout),
166 [this] { return done; })) {
167 return 0;
168 } else {
169 return ETIMEDOUT;
170 }
171 }
172
173 bool ms_dispatch(Message *m) override {
174 using ceph::decode;
175 std::lock_guard l(lock);
176 if (m->get_type() != CEPH_MSG_PING)
177 return false;
178
179 ceph::buffer::list &payload = m->get_payload();
180 if (result && payload.length() > 0) {
181 auto p = std::cbegin(payload);
182 decode(*result, p);
183 }
184 done = true;
185 ping_recvd_cond.notify_all();
186 m->put();
187 return true;
188 }
189 bool ms_handle_reset(Connection *con) override {
190 std::lock_guard l(lock);
191 done = true;
192 ping_recvd_cond.notify_all();
193 return true;
194 }
195 void ms_handle_remote_reset(Connection *con) override {}
196 bool ms_handle_refused(Connection *con) override {
197 return false;
198 }
199
200 // AuthClient
201 int get_auth_request(
202 Connection *con,
203 AuthConnectionMeta *auth_meta,
204 uint32_t *auth_method,
205 std::vector<uint32_t> *preferred_modes,
206 ceph::buffer::list *bl) override {
207 return mc->get_auth_request(auth_method, preferred_modes, bl,
208 cct->_conf->name, 0, keyring);
209 }
210 int handle_auth_reply_more(
211 Connection *con,
212 AuthConnectionMeta *auth_meta,
213 const ceph::buffer::list& bl,
214 ceph::buffer::list *reply) override {
215 return mc->handle_auth_reply_more(auth_meta, bl, reply);
216 }
217 int handle_auth_done(
218 Connection *con,
219 AuthConnectionMeta *auth_meta,
220 uint64_t global_id,
221 uint32_t con_mode,
222 const ceph::buffer::list& bl,
223 CryptoKey *session_key,
224 std::string *connection_secret) override {
225 return mc->handle_auth_done(auth_meta, global_id, bl,
226 session_key, connection_secret);
227 }
228 int handle_auth_bad_method(
229 Connection *con,
230 AuthConnectionMeta *auth_meta,
231 uint32_t old_auth_method,
232 int result,
233 const std::vector<uint32_t>& allowed_methods,
234 const std::vector<uint32_t>& allowed_modes) override {
235 return mc->handle_auth_bad_method(old_auth_method, result,
236 allowed_methods, allowed_modes);
237 }
238 };
239
240 const boost::system::error_category& monc_category() noexcept;
241
242 enum class monc_errc {
243 shutting_down = 1, // Command failed due to MonClient shutting down
244 session_reset, // Monitor session was reset
245 rank_dne, // Requested monitor rank does not exist
246 mon_dne, // Requested monitor does not exist
247 timed_out, // Monitor operation timed out
248 mon_unavailable // Monitor unavailable
249 };
250
251 namespace boost::system {
252 template<>
253 struct is_error_code_enum<::monc_errc> {
254 static const bool value = true;
255 };
256 }
257
258 // implicit conversion:
259 inline boost::system::error_code make_error_code(monc_errc e) noexcept {
260 return { static_cast<int>(e), monc_category() };
261 }
262
263 // explicit conversion:
264 inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
265 return { static_cast<int>(e), monc_category() };
266 }
267
268 const boost::system::error_category& monc_category() noexcept;
269
270 class MonClient : public Dispatcher,
271 public AuthClient,
272 public AuthServer /* for mgr, osd, mds */ {
273 static constexpr auto dout_subsys = ceph_subsys_monc;
274 public:
275 // Error, Newest, Oldest
276 using VersionSig = void(boost::system::error_code, version_t, version_t);
277 using VersionCompletion = ceph::async::Completion<VersionSig>;
278
279 using CommandSig = void(boost::system::error_code, std::string,
280 ceph::buffer::list);
281 using CommandCompletion = ceph::async::Completion<CommandSig>;
282
283 MonMap monmap;
284 std::map<std::string,std::string> config_mgr;
285
286 private:
287 Messenger *messenger;
288
289 std::unique_ptr<MonConnection> active_con;
290 std::map<entity_addrvec_t, MonConnection> pending_cons;
291 std::set<unsigned> tried;
292
293 EntityName entity_name;
294
295 mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
296 SafeTimer timer;
297 boost::asio::io_context& service;
298 boost::asio::io_context::strand finish_strand{service};
299
300 bool initialized;
301 bool stopping = false;
302
303 LogClient *log_client;
304 bool more_log_pending;
305
306 void send_log(bool flush = false);
307
308 bool ms_dispatch(Message *m) override;
309 bool ms_handle_reset(Connection *con) override;
310 void ms_handle_remote_reset(Connection *con) override {}
311 bool ms_handle_refused(Connection *con) override { return false; }
312
313 void handle_monmap(MMonMap *m);
314 void handle_config(MConfig *m);
315
316 void handle_auth(MAuthReply *m);
317
318 // monitor session
319 utime_t last_keepalive;
320 utime_t last_send_log;
321
322 void tick();
323 void schedule_tick();
324
325 // monclient
326 bool want_monmap;
327 ceph::condition_variable map_cond;
328 bool passthrough_monmap = false;
329
330 bool want_bootstrap_config = false;
331 ceph::ref_t<MConfig> bootstrap_config;
332
333 // authenticate
334 std::unique_ptr<AuthClientHandler> auth;
335 uint32_t want_keys = 0;
336 uint64_t global_id = 0;
337 ceph::condition_variable auth_cond;
338 int authenticate_err = 0;
339 bool authenticated = false;
340
341 std::list<MessageRef> waiting_for_session;
342 utime_t last_rotating_renew_sent;
343 bool had_a_connection;
344 double reopen_interval_multiplier;
345
346 Dispatcher *handle_authentication_dispatcher = nullptr;
347 bool _opened() const;
348 bool _hunting() const;
349 void _start_hunting();
350 void _finish_hunting(int auth_err);
351 void _finish_auth(int auth_err);
352 void _reopen_session(int rank = -1);
353 void _add_conn(unsigned rank);
354 void _add_conns();
355 void _un_backoff();
356 void _send_mon_message(MessageRef m);
357
358 std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
359 const ConnectionRef& con) {
360 for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) {
361 if (i->second.get_con() == con) {
362 return i;
363 }
364 }
365 return pending_cons.end();
366 }
367
368 public:
369 // AuthClient
370 int get_auth_request(
371 Connection *con,
372 AuthConnectionMeta *auth_meta,
373 uint32_t *method,
374 std::vector<uint32_t> *preferred_modes,
375 ceph::buffer::list *bl) override;
376 int handle_auth_reply_more(
377 Connection *con,
378 AuthConnectionMeta *auth_meta,
379 const ceph::buffer::list& bl,
380 ceph::buffer::list *reply) override;
381 int handle_auth_done(
382 Connection *con,
383 AuthConnectionMeta *auth_meta,
384 uint64_t global_id,
385 uint32_t con_mode,
386 const ceph::buffer::list& bl,
387 CryptoKey *session_key,
388 std::string *connection_secret) override;
389 int handle_auth_bad_method(
390 Connection *con,
391 AuthConnectionMeta *auth_meta,
392 uint32_t old_auth_method,
393 int result,
394 const std::vector<uint32_t>& allowed_methods,
395 const std::vector<uint32_t>& allowed_modes) override;
396 // AuthServer
397 int handle_auth_request(
398 Connection *con,
399 AuthConnectionMeta *auth_meta,
400 bool more,
401 uint32_t auth_method,
402 const ceph::buffer::list& bl,
403 ceph::buffer::list *reply) override;
404
405 void set_entity_name(EntityName name) { entity_name = name; }
406 void set_handle_authentication_dispatcher(Dispatcher *d) {
407 handle_authentication_dispatcher = d;
408 }
409 int _check_auth_tickets();
410 int _check_auth_rotating();
411 int wait_auth_rotating(double timeout);
412
413 int authenticate(double timeout=0.0);
414 bool is_authenticated() const {return authenticated;}
415
416 bool is_connected() const { return active_con != nullptr; }
417
418 /**
419 * Try to flush as many log messages as we can in a single
420 * message. Use this before shutting down to transmit your
421 * last message.
422 */
423 void flush_log();
424
425 private:
426 // mon subscriptions
427 MonSub sub;
428 void _renew_subs();
429 void handle_subscribe_ack(MMonSubscribeAck* m);
430
431 public:
432 void renew_subs() {
433 std::lock_guard l(monc_lock);
434 _renew_subs();
435 }
436 bool sub_want(std::string what, version_t start, unsigned flags) {
437 std::lock_guard l(monc_lock);
438 return sub.want(what, start, flags);
439 }
440 void sub_got(std::string what, version_t have) {
441 std::lock_guard l(monc_lock);
442 sub.got(what, have);
443 }
444 void sub_unwant(std::string what) {
445 std::lock_guard l(monc_lock);
446 sub.unwant(what);
447 }
448 bool sub_want_increment(std::string what, version_t start, unsigned flags) {
449 std::lock_guard l(monc_lock);
450 return sub.inc_want(what, start, flags);
451 }
452
453 std::unique_ptr<KeyRing> keyring;
454 std::unique_ptr<RotatingKeyRing> rotating_secrets;
455
456 public:
457 MonClient(CephContext *cct_, boost::asio::io_context& service);
458 MonClient(const MonClient &) = delete;
459 MonClient& operator=(const MonClient &) = delete;
460 ~MonClient() override;
461
462 int init();
463 void shutdown();
464
465 void set_log_client(LogClient *clog) {
466 log_client = clog;
467 }
468 LogClient *get_log_client() {
469 return log_client;
470 }
471
472 int build_initial_monmap();
473 int get_monmap();
474 int get_monmap_and_config();
475 /**
476 * If you want to see MonMap messages, set this and
477 * the MonClient will tell the Messenger it hasn't
478 * dealt with it.
479 * Note that if you do this, *you* are of course responsible for
480 * putting the message reference!
481 */
482 void set_passthrough_monmap() {
483 std::lock_guard l(monc_lock);
484 passthrough_monmap = true;
485 }
486 void unset_passthrough_monmap() {
487 std::lock_guard l(monc_lock);
488 passthrough_monmap = false;
489 }
490 /**
491 * Ping monitor with ID @p mon_id and record the resulting
492 * reply in @p result_reply.
493 *
494 * @param[in] mon_id Target monitor's ID
495 * @param[out] result_reply reply from mon.ID, if param != NULL
496 * @returns 0 in case of success; < 0 in case of error,
497 * -ETIMEDOUT if monitor didn't reply before timeout
498 * expired (default: conf->client_mount_timeout).
499 */
500 int ping_monitor(const std::string &mon_id, std::string *result_reply);
501
502 void send_mon_message(Message *m) {
503 send_mon_message(MessageRef{m, false});
504 }
505 void send_mon_message(MessageRef m);
506
507 void reopen_session() {
508 std::lock_guard l(monc_lock);
509 _reopen_session();
510 }
511
512 const uuid_d& get_fsid() const {
513 return monmap.fsid;
514 }
515
516 entity_addrvec_t get_mon_addrs(unsigned i) const {
517 std::lock_guard l(monc_lock);
518 if (i < monmap.size())
519 return monmap.get_addrs(i);
520 return entity_addrvec_t();
521 }
522 int get_num_mon() const {
523 std::lock_guard l(monc_lock);
524 return monmap.size();
525 }
526
527 uint64_t get_global_id() const {
528 std::lock_guard l(monc_lock);
529 return global_id;
530 }
531
532 void set_messenger(Messenger *m) { messenger = m; }
533 entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); }
534 AuthAuthorizer* build_authorizer(int service_id) const;
535
536 void set_want_keys(uint32_t want) {
537 want_keys = want;
538 }
539
540 // admin commands
541 private:
542 uint64_t last_mon_command_tid;
543
544 struct MonCommand {
545 // for tell only
546 std::string target_name;
547 int target_rank = -1;
548 ConnectionRef target_con;
549 std::unique_ptr<MonConnection> target_session;
550 unsigned send_attempts = 0; ///< attempt count for legacy mons
551 utime_t last_send_attempt;
552 uint64_t tid;
553 std::vector<std::string> cmd;
554 ceph::buffer::list inbl;
555 std::unique_ptr<CommandCompletion> onfinish;
556 std::optional<boost::asio::steady_timer> cancel_timer;
557
558 MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
559 : tid(t), onfinish(std::move(onfinish)) {
560 auto timeout =
561 monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
562 if (timeout.count() > 0) {
563 cancel_timer.emplace(monc.service, timeout);
564 cancel_timer->async_wait(
565 [this, &monc](boost::system::error_code ec) {
566 if (ec)
567 return;
568 std::scoped_lock l(monc.monc_lock);
569 monc._cancel_mon_command(tid);
570 });
571 }
572 }
573
574 bool is_tell() const {
575 return target_name.size() || target_rank >= 0;
576 }
577 };
578 friend MonCommand;
579 std::map<uint64_t,MonCommand*> mon_commands;
580
581 void _send_command(MonCommand *r);
582 void _check_tell_commands();
583 void _resend_mon_commands();
584 int _cancel_mon_command(uint64_t tid);
585 void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
586 bufferlist&& bl);
587 void _finish_auth();
588 void handle_mon_command_ack(MMonCommandAck *ack);
589 void handle_command_reply(MCommandReply *reply);
590
591 public:
592 template<typename CompletionToken>
593 auto start_mon_command(const std::vector<std::string>& cmd,
594 const ceph::buffer::list& inbl,
595 CompletionToken&& token) {
596 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
597 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
598 {
599 std::scoped_lock l(monc_lock);
600 auto h = CommandCompletion::create(service.get_executor(),
601 std::move(init.completion_handler));
602 if (!initialized || stopping) {
603 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
604 bufferlist{});
605 } else {
606 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
607 r->cmd = cmd;
608 r->inbl = inbl;
609 mon_commands.emplace(r->tid, r);
610 _send_command(r);
611 }
612 }
613 return init.result.get();
614 }
615
616 template<typename CompletionToken>
617 auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
618 const ceph::buffer::list& inbl, CompletionToken&& token) {
619 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
620 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
621 {
622 std::scoped_lock l(monc_lock);
623 auto h = CommandCompletion::create(service.get_executor(),
624 std::move(init.completion_handler));
625 if (!initialized || stopping) {
626 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
627 bufferlist{});
628 } else {
629 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
630 r->target_rank = mon_rank;
631 r->cmd = cmd;
632 r->inbl = inbl;
633 mon_commands.emplace(r->tid, r);
634 _send_command(r);
635 }
636 }
637 return init.result.get();
638 }
639
640 template<typename CompletionToken>
641 auto start_mon_command(const std::string& mon_name,
642 const std::vector<std::string>& cmd,
643 const ceph::buffer::list& inbl,
644 CompletionToken&& token) {
645 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
646 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
647 {
648 std::scoped_lock l(monc_lock);
649 auto h = CommandCompletion::create(service.get_executor(),
650 std::move(init.completion_handler));
651 if (!initialized || stopping) {
652 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
653 bufferlist{});
654 } else {
655 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
656 // detect/tolerate mon *rank* passed as a string
657 std::string err;
658 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
659 if (err.size() == 0 && rank >= 0) {
660 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
661 << "' as rank " << rank << dendl;
662 r->target_rank = rank;
663 } else {
664 r->target_name = mon_name;
665 }
666 r->cmd = cmd;
667 r->inbl = inbl;
668 mon_commands.emplace(r->tid, r);
669 _send_command(r);
670 }
671 }
672 return init.result.get();
673 }
674
675 class ContextVerter {
676 std::string* outs;
677 ceph::bufferlist* outbl;
678 Context* onfinish;
679
680 public:
681 ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
682 : outs(outs), outbl(outbl), onfinish(onfinish) {}
683 ~ContextVerter() = default;
684 ContextVerter(const ContextVerter&) = default;
685 ContextVerter& operator =(const ContextVerter&) = default;
686 ContextVerter(ContextVerter&&) = default;
687 ContextVerter& operator =(ContextVerter&&) = default;
688
689 void operator()(boost::system::error_code e,
690 std::string s,
691 ceph::bufferlist bl) {
692 if (outs)
693 *outs = std::move(s);
694 if (outbl)
695 *outbl = std::move(bl);
696 if (onfinish)
697 onfinish->complete(ceph::from_error_code(e));
698 }
699 };
700
701 void start_mon_command(const std::vector<std::string>& cmd, const bufferlist& inbl,
702 bufferlist *outbl, std::string *outs,
703 Context *onfinish) {
704 start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
705 }
706 void start_mon_command(int mon_rank,
707 const std::vector<std::string>& cmd, const bufferlist& inbl,
708 bufferlist *outbl, std::string *outs,
709 Context *onfinish) {
710 start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
711 }
712 void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
713 const std::vector<std::string>& cmd, const bufferlist& inbl,
714 bufferlist *outbl, std::string *outs,
715 Context *onfinish) {
716 start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
717 }
718
719
720 // version requests
721 public:
722 /**
723 * get latest known version(s) of cluster map
724 *
725 * @param map string name of map (e.g., 'osdmap')
726 * @param token context that will be triggered on completion
727 * @return (via Completion) {} on success,
728 * boost::system::errc::resource_unavailable_try_again if we need to
729 * resubmit our request
730 */
731 template<typename CompletionToken>
732 auto get_version(std::string&& map, CompletionToken&& token) {
733 boost::asio::async_completion<CompletionToken, VersionSig> init(token);
734 {
735 std::scoped_lock l(monc_lock);
736 auto m = ceph::make_message<MMonGetVersion>();
737 m->what = std::move(map);
738 m->handle = ++version_req_id;
739 version_requests.emplace(m->handle,
740 VersionCompletion::create(
741 service.get_executor(),
742 std::move(init.completion_handler)));
743 _send_mon_message(m);
744 }
745 return init.result.get();
746 }
747
748 /**
749 * Run a callback within our lock, with a reference
750 * to the MonMap
751 */
752 template<typename Callback, typename...Args>
753 auto with_monmap(Callback&& cb, Args&&...args) const ->
754 decltype(cb(monmap, std::forward<Args>(args)...)) {
755 std::lock_guard l(monc_lock);
756 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
757 }
758
759 void register_config_callback(md_config_t::config_callback fn);
760 void register_config_notify_callback(std::function<void(void)> f) {
761 config_notify_cb = f;
762 }
763 md_config_t::config_callback get_config_callback();
764
765 private:
766
767 std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
768 ceph_tid_t version_req_id;
769 void handle_get_version_reply(MMonGetVersionReply* m);
770 md_config_t::config_callback config_cb;
771 std::function<void(void)> config_notify_cb;
772 };
773
774 #endif