]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.h
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mon / MonClient.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #ifndef CEPH_MONCLIENT_H
15 #define CEPH_MONCLIENT_H
16
17 #include <functional>
18 #include <list>
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <vector>
24
25 #include "msg/Messenger.h"
26
27 #include "MonMap.h"
28 #include "MonSub.h"
29
30 #include "common/admin_socket.h"
31 #include "common/async/completion.h"
32 #include "common/Timer.h"
33 #include "common/config.h"
34 #include "messages/MMonGetVersion.h"
35
36 #include "auth/AuthClient.h"
37 #include "auth/AuthServer.h"
38
39 class MMonMap;
40 class MConfig;
41 class MMonGetVersionReply;
42 class MMonCommandAck;
43 class LogClient;
44 class AuthClientHandler;
45 class AuthRegistry;
46 class KeyRing;
47 class RotatingKeyRing;
48
49 class MonConnection {
50 public:
51 MonConnection(CephContext *cct,
52 ConnectionRef conn,
53 uint64_t global_id,
54 AuthRegistry *auth_registry);
55 ~MonConnection();
56 MonConnection(MonConnection&& rhs) = default;
57 MonConnection& operator=(MonConnection&&) = default;
58 MonConnection(const MonConnection& rhs) = delete;
59 MonConnection& operator=(const MonConnection&) = delete;
60 int handle_auth(MAuthReply *m,
61 const EntityName& entity_name,
62 uint32_t want_keys,
63 RotatingKeyRing* keyring);
64 int authenticate(MAuthReply *m);
65 void start(epoch_t epoch,
66 const EntityName& entity_name);
67 bool have_session() const;
68 uint64_t get_global_id() const {
69 return global_id;
70 }
71 ConnectionRef get_con() {
72 return con;
73 }
74 std::unique_ptr<AuthClientHandler>& get_auth() {
75 return auth;
76 }
77
78 int get_auth_request(
79 uint32_t *method,
80 std::vector<uint32_t> *preferred_modes,
81 ceph::buffer::list *out,
82 const EntityName& entity_name,
83 uint32_t want_keys,
84 RotatingKeyRing* keyring);
85 int handle_auth_reply_more(
86 AuthConnectionMeta *auth_meta,
87 const ceph::buffer::list& bl,
88 ceph::buffer::list *reply);
89 int handle_auth_done(
90 AuthConnectionMeta *auth_meta,
91 uint64_t global_id,
92 const ceph::buffer::list& bl,
93 CryptoKey *session_key,
94 std::string *connection_secret);
95 int handle_auth_bad_method(
96 uint32_t old_auth_method,
97 int result,
98 const std::vector<uint32_t>& allowed_methods,
99 const std::vector<uint32_t>& allowed_modes);
100
101 bool is_con(Connection *c) const {
102 return con.get() == c;
103 }
104 void queue_command(Message *m) {
105 pending_tell_command = m;
106 }
107
108 private:
109 int _negotiate(MAuthReply *m,
110 const EntityName& entity_name,
111 uint32_t want_keys,
112 RotatingKeyRing* keyring);
113 int _init_auth(uint32_t method,
114 const EntityName& entity_name,
115 uint32_t want_keys,
116 RotatingKeyRing* keyring,
117 bool msgr2);
118
119 private:
120 CephContext *cct;
121 enum class State {
122 NONE,
123 NEGOTIATING, // v1 only
124 AUTHENTICATING, // v1 and v2
125 HAVE_SESSION,
126 };
127 State state = State::NONE;
128 ConnectionRef con;
129 int auth_method = -1;
130 utime_t auth_start;
131
132 std::unique_ptr<AuthClientHandler> auth;
133 uint64_t global_id;
134
135 MessageRef pending_tell_command;
136
137 AuthRegistry *auth_registry;
138 };
139
140
141 struct MonClientPinger : public Dispatcher,
142 public AuthClient {
143 ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock");
144 ceph::condition_variable ping_recvd_cond;
145 std::string *result;
146 bool done;
147 RotatingKeyRing *keyring;
148 std::unique_ptr<MonConnection> mc;
149
150 MonClientPinger(CephContext *cct_,
151 RotatingKeyRing *keyring,
152 std::string *res_) :
153 Dispatcher(cct_),
154 result(res_),
155 done(false),
156 keyring(keyring)
157 { }
158
159 int wait_for_reply(double timeout = 0.0) {
160 std::unique_lock locker{lock};
161 if (timeout <= 0) {
162 timeout = std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count();
163 }
164 done = false;
165 if (ping_recvd_cond.wait_for(locker,
166 ceph::make_timespan(timeout),
167 [this] { return done; })) {
168 return 0;
169 } else {
170 return ETIMEDOUT;
171 }
172 }
173
174 bool ms_dispatch(Message *m) override {
175 using ceph::decode;
176 std::lock_guard l(lock);
177 if (m->get_type() != CEPH_MSG_PING)
178 return false;
179
180 ceph::buffer::list &payload = m->get_payload();
181 if (result && payload.length() > 0) {
182 auto p = std::cbegin(payload);
183 decode(*result, p);
184 }
185 done = true;
186 ping_recvd_cond.notify_all();
187 m->put();
188 return true;
189 }
190 bool ms_handle_reset(Connection *con) override {
191 std::lock_guard l(lock);
192 done = true;
193 ping_recvd_cond.notify_all();
194 return true;
195 }
196 void ms_handle_remote_reset(Connection *con) override {}
197 bool ms_handle_refused(Connection *con) override {
198 return false;
199 }
200
201 // AuthClient
202 int get_auth_request(
203 Connection *con,
204 AuthConnectionMeta *auth_meta,
205 uint32_t *auth_method,
206 std::vector<uint32_t> *preferred_modes,
207 ceph::buffer::list *bl) override {
208 return mc->get_auth_request(auth_method, preferred_modes, bl,
209 cct->_conf->name, 0, keyring);
210 }
211 int handle_auth_reply_more(
212 Connection *con,
213 AuthConnectionMeta *auth_meta,
214 const ceph::buffer::list& bl,
215 ceph::buffer::list *reply) override {
216 return mc->handle_auth_reply_more(auth_meta, bl, reply);
217 }
218 int handle_auth_done(
219 Connection *con,
220 AuthConnectionMeta *auth_meta,
221 uint64_t global_id,
222 uint32_t con_mode,
223 const ceph::buffer::list& bl,
224 CryptoKey *session_key,
225 std::string *connection_secret) override {
226 return mc->handle_auth_done(auth_meta, global_id, bl,
227 session_key, connection_secret);
228 }
229 int handle_auth_bad_method(
230 Connection *con,
231 AuthConnectionMeta *auth_meta,
232 uint32_t old_auth_method,
233 int result,
234 const std::vector<uint32_t>& allowed_methods,
235 const std::vector<uint32_t>& allowed_modes) override {
236 return mc->handle_auth_bad_method(old_auth_method, result,
237 allowed_methods, allowed_modes);
238 }
239 };
240
241 const boost::system::error_category& monc_category() noexcept;
242
243 enum class monc_errc {
244 shutting_down = 1, // Command failed due to MonClient shutting down
245 session_reset, // Monitor session was reset
246 rank_dne, // Requested monitor rank does not exist
247 mon_dne, // Requested monitor does not exist
248 timed_out, // Monitor operation timed out
249 mon_unavailable // Monitor unavailable
250 };
251
252 namespace boost::system {
253 template<>
254 struct is_error_code_enum<::monc_errc> {
255 static const bool value = true;
256 };
257 }
258
259 // implicit conversion:
260 inline boost::system::error_code make_error_code(monc_errc e) noexcept {
261 return { static_cast<int>(e), monc_category() };
262 }
263
264 // explicit conversion:
265 inline boost::system::error_condition make_error_condition(monc_errc e) noexcept {
266 return { static_cast<int>(e), monc_category() };
267 }
268
269 const boost::system::error_category& monc_category() noexcept;
270
271 class MonClient : public Dispatcher,
272 public AuthClient,
273 public AuthServer, /* for mgr, osd, mds */
274 public AdminSocketHook {
275 static constexpr auto dout_subsys = ceph_subsys_monc;
276 public:
277 // Error, Newest, Oldest
278 using VersionSig = void(boost::system::error_code, version_t, version_t);
279 using VersionCompletion = ceph::async::Completion<VersionSig>;
280
281 using CommandSig = void(boost::system::error_code, std::string,
282 ceph::buffer::list);
283 using CommandCompletion = ceph::async::Completion<CommandSig>;
284
285 MonMap monmap;
286 std::map<std::string,std::string> config_mgr;
287
288 private:
289 Messenger *messenger;
290
291 std::unique_ptr<MonConnection> active_con;
292 std::map<entity_addrvec_t, MonConnection> pending_cons;
293 std::set<unsigned> tried;
294
295 EntityName entity_name;
296
297 mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock");
298 SafeTimer timer;
299 boost::asio::io_context& service;
300 boost::asio::io_context::strand finish_strand{service};
301
302 bool initialized;
303 bool stopping = false;
304
305 LogClient *log_client;
306 bool more_log_pending;
307
308 void send_log(bool flush = false);
309
310 bool ms_dispatch(Message *m) override;
311 bool ms_handle_reset(Connection *con) override;
312 void ms_handle_remote_reset(Connection *con) override {}
313 bool ms_handle_refused(Connection *con) override { return false; }
314
315 void handle_monmap(MMonMap *m);
316 void handle_config(MConfig *m);
317
318 void handle_auth(MAuthReply *m);
319
320 int call(
321 std::string_view command,
322 const cmdmap_t& cmdmap,
323 const ceph::buffer::list &inbl,
324 ceph::Formatter *f,
325 std::ostream& errss,
326 ceph::buffer::list& out) override;
327
328 // monitor session
329 utime_t last_keepalive;
330 utime_t last_send_log;
331
332 void tick();
333 void schedule_tick();
334
335 // monclient
336 bool want_monmap;
337 ceph::condition_variable map_cond;
338 bool passthrough_monmap = false;
339
340 bool want_bootstrap_config = false;
341 ceph::ref_t<MConfig> bootstrap_config;
342
343 // authenticate
344 std::unique_ptr<AuthClientHandler> auth;
345 uint32_t want_keys = 0;
346 uint64_t global_id = 0;
347 ceph::condition_variable auth_cond;
348 int authenticate_err = 0;
349 bool authenticated = false;
350
351 std::list<MessageRef> waiting_for_session;
352 utime_t last_rotating_renew_sent;
353 bool had_a_connection;
354 double reopen_interval_multiplier;
355
356 Dispatcher *handle_authentication_dispatcher = nullptr;
357 bool _opened() const;
358 bool _hunting() const;
359 void _start_hunting();
360 void _finish_hunting(int auth_err);
361 void _finish_auth(int auth_err);
362 void _reopen_session(int rank = -1);
363 void _add_conn(unsigned rank);
364 void _add_conns();
365 void _un_backoff();
366 void _send_mon_message(MessageRef m);
367
368 std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
369 const ConnectionRef& con) {
370 for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) {
371 if (i->second.get_con() == con) {
372 return i;
373 }
374 }
375 return pending_cons.end();
376 }
377
378 public:
379 // AuthClient
380 int get_auth_request(
381 Connection *con,
382 AuthConnectionMeta *auth_meta,
383 uint32_t *method,
384 std::vector<uint32_t> *preferred_modes,
385 ceph::buffer::list *bl) override;
386 int handle_auth_reply_more(
387 Connection *con,
388 AuthConnectionMeta *auth_meta,
389 const ceph::buffer::list& bl,
390 ceph::buffer::list *reply) override;
391 int handle_auth_done(
392 Connection *con,
393 AuthConnectionMeta *auth_meta,
394 uint64_t global_id,
395 uint32_t con_mode,
396 const ceph::buffer::list& bl,
397 CryptoKey *session_key,
398 std::string *connection_secret) override;
399 int handle_auth_bad_method(
400 Connection *con,
401 AuthConnectionMeta *auth_meta,
402 uint32_t old_auth_method,
403 int result,
404 const std::vector<uint32_t>& allowed_methods,
405 const std::vector<uint32_t>& allowed_modes) override;
406 // AuthServer
407 int handle_auth_request(
408 Connection *con,
409 AuthConnectionMeta *auth_meta,
410 bool more,
411 uint32_t auth_method,
412 const ceph::buffer::list& bl,
413 ceph::buffer::list *reply) override;
414
415 void set_entity_name(EntityName name) { entity_name = name; }
416 void set_handle_authentication_dispatcher(Dispatcher *d) {
417 handle_authentication_dispatcher = d;
418 }
419 int _check_auth_tickets();
420 int _check_auth_rotating();
421 int wait_auth_rotating(double timeout);
422
423 int authenticate(double timeout=0.0);
424 bool is_authenticated() const {return authenticated;}
425
426 bool is_connected() const { return active_con != nullptr; }
427
428 /**
429 * Try to flush as many log messages as we can in a single
430 * message. Use this before shutting down to transmit your
431 * last message.
432 */
433 void flush_log();
434
435 private:
436 // mon subscriptions
437 MonSub sub;
438 void _renew_subs();
439 void handle_subscribe_ack(MMonSubscribeAck* m);
440
441 public:
442 void renew_subs() {
443 std::lock_guard l(monc_lock);
444 _renew_subs();
445 }
446 bool sub_want(std::string what, version_t start, unsigned flags) {
447 std::lock_guard l(monc_lock);
448 return sub.want(what, start, flags);
449 }
450 void sub_got(std::string what, version_t have) {
451 std::lock_guard l(monc_lock);
452 sub.got(what, have);
453 }
454 void sub_unwant(std::string what) {
455 std::lock_guard l(monc_lock);
456 sub.unwant(what);
457 }
458 bool sub_want_increment(std::string what, version_t start, unsigned flags) {
459 std::lock_guard l(monc_lock);
460 return sub.inc_want(what, start, flags);
461 }
462
463 std::unique_ptr<KeyRing> keyring;
464 std::unique_ptr<RotatingKeyRing> rotating_secrets;
465
466 public:
467 MonClient(CephContext *cct_, boost::asio::io_context& service);
468 MonClient(const MonClient &) = delete;
469 MonClient& operator=(const MonClient &) = delete;
470 ~MonClient() override;
471
472 int init();
473 void shutdown();
474
475 void set_log_client(LogClient *clog) {
476 log_client = clog;
477 }
478 LogClient *get_log_client() {
479 return log_client;
480 }
481
482 int build_initial_monmap();
483 int get_monmap();
484 int get_monmap_and_config();
485 /**
486 * If you want to see MonMap messages, set this and
487 * the MonClient will tell the Messenger it hasn't
488 * dealt with it.
489 * Note that if you do this, *you* are of course responsible for
490 * putting the message reference!
491 */
492 void set_passthrough_monmap() {
493 std::lock_guard l(monc_lock);
494 passthrough_monmap = true;
495 }
496 void unset_passthrough_monmap() {
497 std::lock_guard l(monc_lock);
498 passthrough_monmap = false;
499 }
500 /**
501 * Ping monitor with ID @p mon_id and record the resulting
502 * reply in @p result_reply.
503 *
504 * @param[in] mon_id Target monitor's ID
505 * @param[out] result_reply reply from mon.ID, if param != NULL
506 * @returns 0 in case of success; < 0 in case of error,
507 * -ETIMEDOUT if monitor didn't reply before timeout
508 * expired (default: conf->client_mount_timeout).
509 */
510 int ping_monitor(const std::string &mon_id, std::string *result_reply);
511
512 void send_mon_message(Message *m) {
513 send_mon_message(MessageRef{m, false});
514 }
515 void send_mon_message(MessageRef m);
516
517 void reopen_session() {
518 std::lock_guard l(monc_lock);
519 _reopen_session();
520 }
521
522 const uuid_d& get_fsid() const {
523 return monmap.fsid;
524 }
525
526 entity_addrvec_t get_mon_addrs(unsigned i) const {
527 std::lock_guard l(monc_lock);
528 if (i < monmap.size())
529 return monmap.get_addrs(i);
530 return entity_addrvec_t();
531 }
532 int get_num_mon() const {
533 std::lock_guard l(monc_lock);
534 return monmap.size();
535 }
536
537 uint64_t get_global_id() const {
538 std::lock_guard l(monc_lock);
539 return global_id;
540 }
541
542 void set_messenger(Messenger *m) { messenger = m; }
543 entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); }
544 AuthAuthorizer* build_authorizer(int service_id) const;
545
546 void set_want_keys(uint32_t want) {
547 want_keys = want;
548 }
549
550 // admin commands
551 private:
552 uint64_t last_mon_command_tid;
553
554 struct MonCommand {
555 // for tell only
556 std::string target_name;
557 int target_rank = -1;
558 ConnectionRef target_con;
559 std::unique_ptr<MonConnection> target_session;
560 unsigned send_attempts = 0; ///< attempt count for legacy mons
561 utime_t last_send_attempt;
562 uint64_t tid;
563 std::vector<std::string> cmd;
564 ceph::buffer::list inbl;
565 std::unique_ptr<CommandCompletion> onfinish;
566 std::optional<boost::asio::steady_timer> cancel_timer;
567
568 MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish)
569 : tid(t), onfinish(std::move(onfinish)) {
570 auto timeout =
571 monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
572 if (timeout.count() > 0) {
573 cancel_timer.emplace(monc.service, timeout);
574 cancel_timer->async_wait(
575 [this, &monc](boost::system::error_code ec) {
576 if (ec)
577 return;
578 std::scoped_lock l(monc.monc_lock);
579 monc._cancel_mon_command(tid);
580 });
581 }
582 }
583
584 bool is_tell() const {
585 return target_name.size() || target_rank >= 0;
586 }
587 };
588 friend MonCommand;
589 std::map<uint64_t,MonCommand*> mon_commands;
590
591 void _send_command(MonCommand *r);
592 void _check_tell_commands();
593 void _resend_mon_commands();
594 int _cancel_mon_command(uint64_t tid);
595 void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs,
596 bufferlist&& bl);
597 void _finish_auth();
598 void handle_mon_command_ack(MMonCommandAck *ack);
599 void handle_command_reply(MCommandReply *reply);
600
601 public:
602 template<typename CompletionToken>
603 auto start_mon_command(const std::vector<std::string>& cmd,
604 const ceph::buffer::list& inbl,
605 CompletionToken&& token) {
606 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
607 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
608 {
609 std::scoped_lock l(monc_lock);
610 auto h = CommandCompletion::create(service.get_executor(),
611 std::move(init.completion_handler));
612 if (!initialized || stopping) {
613 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
614 bufferlist{});
615 } else {
616 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
617 r->cmd = cmd;
618 r->inbl = inbl;
619 mon_commands.emplace(r->tid, r);
620 _send_command(r);
621 }
622 }
623 return init.result.get();
624 }
625
626 template<typename CompletionToken>
627 auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd,
628 const ceph::buffer::list& inbl, CompletionToken&& token) {
629 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
630 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
631 {
632 std::scoped_lock l(monc_lock);
633 auto h = CommandCompletion::create(service.get_executor(),
634 std::move(init.completion_handler));
635 if (!initialized || stopping) {
636 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
637 bufferlist{});
638 } else {
639 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
640 r->target_rank = mon_rank;
641 r->cmd = cmd;
642 r->inbl = inbl;
643 mon_commands.emplace(r->tid, r);
644 _send_command(r);
645 }
646 }
647 return init.result.get();
648 }
649
650 template<typename CompletionToken>
651 auto start_mon_command(const std::string& mon_name,
652 const std::vector<std::string>& cmd,
653 const ceph::buffer::list& inbl,
654 CompletionToken&& token) {
655 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
656 boost::asio::async_completion<CompletionToken, CommandSig> init(token);
657 {
658 std::scoped_lock l(monc_lock);
659 auto h = CommandCompletion::create(service.get_executor(),
660 std::move(init.completion_handler));
661 if (!initialized || stopping) {
662 ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{},
663 bufferlist{});
664 } else {
665 auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h));
666 // detect/tolerate mon *rank* passed as a string
667 std::string err;
668 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
669 if (err.size() == 0 && rank >= 0) {
670 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
671 << "' as rank " << rank << dendl;
672 r->target_rank = rank;
673 } else {
674 r->target_name = mon_name;
675 }
676 r->cmd = cmd;
677 r->inbl = inbl;
678 mon_commands.emplace(r->tid, r);
679 _send_command(r);
680 }
681 }
682 return init.result.get();
683 }
684
685 class ContextVerter {
686 std::string* outs;
687 ceph::bufferlist* outbl;
688 Context* onfinish;
689
690 public:
691 ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish)
692 : outs(outs), outbl(outbl), onfinish(onfinish) {}
693 ~ContextVerter() = default;
694 ContextVerter(const ContextVerter&) = default;
695 ContextVerter& operator =(const ContextVerter&) = default;
696 ContextVerter(ContextVerter&&) = default;
697 ContextVerter& operator =(ContextVerter&&) = default;
698
699 void operator()(boost::system::error_code e,
700 std::string s,
701 ceph::bufferlist bl) {
702 if (outs)
703 *outs = std::move(s);
704 if (outbl)
705 *outbl = std::move(bl);
706 if (onfinish)
707 onfinish->complete(ceph::from_error_code(e));
708 }
709 };
710
711 void start_mon_command(const std::vector<std::string>& cmd, const bufferlist& inbl,
712 bufferlist *outbl, std::string *outs,
713 Context *onfinish) {
714 start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish));
715 }
716 void start_mon_command(int mon_rank,
717 const std::vector<std::string>& cmd, const bufferlist& inbl,
718 bufferlist *outbl, std::string *outs,
719 Context *onfinish) {
720 start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish));
721 }
722 void start_mon_command(const std::string &mon_name, ///< mon name, with mon. prefix
723 const std::vector<std::string>& cmd, const bufferlist& inbl,
724 bufferlist *outbl, std::string *outs,
725 Context *onfinish) {
726 start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish));
727 }
728
729
730 // version requests
731 public:
732 /**
733 * get latest known version(s) of cluster map
734 *
735 * @param map string name of map (e.g., 'osdmap')
736 * @param token context that will be triggered on completion
737 * @return (via Completion) {} on success,
738 * boost::system::errc::resource_unavailable_try_again if we need to
739 * resubmit our request
740 */
741 template<typename CompletionToken>
742 auto get_version(std::string&& map, CompletionToken&& token) {
743 boost::asio::async_completion<CompletionToken, VersionSig> init(token);
744 {
745 std::scoped_lock l(monc_lock);
746 auto m = ceph::make_message<MMonGetVersion>();
747 m->what = std::move(map);
748 m->handle = ++version_req_id;
749 version_requests.emplace(m->handle,
750 VersionCompletion::create(
751 service.get_executor(),
752 std::move(init.completion_handler)));
753 _send_mon_message(m);
754 }
755 return init.result.get();
756 }
757
758 /**
759 * Run a callback within our lock, with a reference
760 * to the MonMap
761 */
762 template<typename Callback, typename...Args>
763 auto with_monmap(Callback&& cb, Args&&...args) const ->
764 decltype(cb(monmap, std::forward<Args>(args)...)) {
765 std::lock_guard l(monc_lock);
766 return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...);
767 }
768
769 void register_config_callback(md_config_t::config_callback fn);
770 void register_config_notify_callback(std::function<void(void)> f) {
771 config_notify_cb = f;
772 }
773 md_config_t::config_callback get_config_callback();
774
775 private:
776
777 std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests;
778 ceph_tid_t version_req_id;
779 void handle_get_version_reply(MMonGetVersionReply* m);
780 md_config_t::config_callback config_cb;
781 std::function<void(void)> config_notify_cb;
782 };
783
784 #endif