]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #ifndef CEPH_MONCLIENT_H | |
15 | #define CEPH_MONCLIENT_H | |
16 | ||
17 | #include <functional> | |
18 | #include <list> | |
19 | #include <map> | |
20 | #include <memory> | |
21 | #include <set> | |
22 | #include <string> | |
23 | #include <vector> | |
24 | ||
25 | #include "msg/Messenger.h" | |
26 | ||
27 | #include "MonMap.h" | |
28 | #include "MonSub.h" | |
29 | ||
30 | #include "common/async/completion.h" | |
31 | #include "common/Timer.h" | |
32 | #include "common/config.h" | |
33 | #include "messages/MMonGetVersion.h" | |
34 | ||
35 | #include "auth/AuthClient.h" | |
36 | #include "auth/AuthServer.h" | |
37 | ||
38 | class MMonMap; | |
39 | class MConfig; | |
40 | class MMonGetVersionReply; | |
41 | class MMonCommandAck; | |
42 | class LogClient; | |
43 | class AuthClientHandler; | |
44 | class AuthRegistry; | |
45 | class KeyRing; | |
46 | class RotatingKeyRing; | |
47 | ||
48 | class MonConnection { | |
49 | public: | |
50 | MonConnection(CephContext *cct, | |
51 | ConnectionRef conn, | |
52 | uint64_t global_id, | |
53 | AuthRegistry *auth_registry); | |
54 | ~MonConnection(); | |
55 | MonConnection(MonConnection&& rhs) = default; | |
56 | MonConnection& operator=(MonConnection&&) = default; | |
57 | MonConnection(const MonConnection& rhs) = delete; | |
58 | MonConnection& operator=(const MonConnection&) = delete; | |
59 | int handle_auth(MAuthReply *m, | |
60 | const EntityName& entity_name, | |
61 | uint32_t want_keys, | |
62 | RotatingKeyRing* keyring); | |
63 | int authenticate(MAuthReply *m); | |
64 | void start(epoch_t epoch, | |
65 | const EntityName& entity_name); | |
66 | bool have_session() const; | |
67 | uint64_t get_global_id() const { | |
68 | return global_id; | |
69 | } | |
70 | ConnectionRef get_con() { | |
71 | return con; | |
72 | } | |
73 | std::unique_ptr<AuthClientHandler>& get_auth() { | |
74 | return auth; | |
75 | } | |
76 | ||
77 | int get_auth_request( | |
78 | uint32_t *method, | |
79 | std::vector<uint32_t> *preferred_modes, | |
80 | ceph::buffer::list *out, | |
81 | const EntityName& entity_name, | |
82 | uint32_t want_keys, | |
83 | RotatingKeyRing* keyring); | |
84 | int handle_auth_reply_more( | |
85 | AuthConnectionMeta *auth_meta, | |
86 | const ceph::buffer::list& bl, | |
87 | ceph::buffer::list *reply); | |
88 | int handle_auth_done( | |
89 | AuthConnectionMeta *auth_meta, | |
90 | uint64_t global_id, | |
91 | const ceph::buffer::list& bl, | |
92 | CryptoKey *session_key, | |
93 | std::string *connection_secret); | |
94 | int handle_auth_bad_method( | |
95 | uint32_t old_auth_method, | |
96 | int result, | |
97 | const std::vector<uint32_t>& allowed_methods, | |
98 | const std::vector<uint32_t>& allowed_modes); | |
99 | ||
100 | bool is_con(Connection *c) const { | |
101 | return con.get() == c; | |
102 | } | |
103 | void queue_command(Message *m) { | |
104 | pending_tell_command = m; | |
105 | } | |
106 | ||
107 | private: | |
108 | int _negotiate(MAuthReply *m, | |
109 | const EntityName& entity_name, | |
110 | uint32_t want_keys, | |
111 | RotatingKeyRing* keyring); | |
112 | int _init_auth(uint32_t method, | |
113 | const EntityName& entity_name, | |
114 | uint32_t want_keys, | |
115 | RotatingKeyRing* keyring, | |
116 | bool msgr2); | |
117 | ||
118 | private: | |
119 | CephContext *cct; | |
120 | enum class State { | |
121 | NONE, | |
122 | NEGOTIATING, // v1 only | |
123 | AUTHENTICATING, // v1 and v2 | |
124 | HAVE_SESSION, | |
125 | }; | |
126 | State state = State::NONE; | |
127 | ConnectionRef con; | |
128 | int auth_method = -1; | |
129 | utime_t auth_start; | |
130 | ||
131 | std::unique_ptr<AuthClientHandler> auth; | |
132 | uint64_t global_id; | |
133 | ||
134 | MessageRef pending_tell_command; | |
135 | ||
136 | AuthRegistry *auth_registry; | |
137 | }; | |
138 | ||
139 | ||
140 | struct MonClientPinger : public Dispatcher, | |
141 | public AuthClient { | |
142 | ceph::mutex lock = ceph::make_mutex("MonClientPinger::lock"); | |
143 | ceph::condition_variable ping_recvd_cond; | |
144 | std::string *result; | |
145 | bool done; | |
146 | RotatingKeyRing *keyring; | |
147 | std::unique_ptr<MonConnection> mc; | |
148 | ||
149 | MonClientPinger(CephContext *cct_, | |
150 | RotatingKeyRing *keyring, | |
151 | std::string *res_) : | |
152 | Dispatcher(cct_), | |
153 | result(res_), | |
154 | done(false), | |
155 | keyring(keyring) | |
156 | { } | |
157 | ||
158 | int wait_for_reply(double timeout = 0.0) { | |
159 | std::unique_lock locker{lock}; | |
160 | if (timeout <= 0) { | |
161 | timeout = cct->_conf->client_mount_timeout; | |
162 | } | |
163 | done = false; | |
164 | if (ping_recvd_cond.wait_for(locker, | |
165 | ceph::make_timespan(timeout), | |
166 | [this] { return done; })) { | |
167 | return 0; | |
168 | } else { | |
169 | return ETIMEDOUT; | |
170 | } | |
171 | } | |
172 | ||
173 | bool ms_dispatch(Message *m) override { | |
174 | using ceph::decode; | |
175 | std::lock_guard l(lock); | |
176 | if (m->get_type() != CEPH_MSG_PING) | |
177 | return false; | |
178 | ||
179 | ceph::buffer::list &payload = m->get_payload(); | |
180 | if (result && payload.length() > 0) { | |
181 | auto p = std::cbegin(payload); | |
182 | decode(*result, p); | |
183 | } | |
184 | done = true; | |
185 | ping_recvd_cond.notify_all(); | |
186 | m->put(); | |
187 | return true; | |
188 | } | |
189 | bool ms_handle_reset(Connection *con) override { | |
190 | std::lock_guard l(lock); | |
191 | done = true; | |
192 | ping_recvd_cond.notify_all(); | |
193 | return true; | |
194 | } | |
195 | void ms_handle_remote_reset(Connection *con) override {} | |
196 | bool ms_handle_refused(Connection *con) override { | |
197 | return false; | |
198 | } | |
199 | ||
200 | // AuthClient | |
201 | int get_auth_request( | |
202 | Connection *con, | |
203 | AuthConnectionMeta *auth_meta, | |
204 | uint32_t *auth_method, | |
205 | std::vector<uint32_t> *preferred_modes, | |
206 | ceph::buffer::list *bl) override { | |
207 | return mc->get_auth_request(auth_method, preferred_modes, bl, | |
208 | cct->_conf->name, 0, keyring); | |
209 | } | |
210 | int handle_auth_reply_more( | |
211 | Connection *con, | |
212 | AuthConnectionMeta *auth_meta, | |
213 | const ceph::buffer::list& bl, | |
214 | ceph::buffer::list *reply) override { | |
215 | return mc->handle_auth_reply_more(auth_meta, bl, reply); | |
216 | } | |
217 | int handle_auth_done( | |
218 | Connection *con, | |
219 | AuthConnectionMeta *auth_meta, | |
220 | uint64_t global_id, | |
221 | uint32_t con_mode, | |
222 | const ceph::buffer::list& bl, | |
223 | CryptoKey *session_key, | |
224 | std::string *connection_secret) override { | |
225 | return mc->handle_auth_done(auth_meta, global_id, bl, | |
226 | session_key, connection_secret); | |
227 | } | |
228 | int handle_auth_bad_method( | |
229 | Connection *con, | |
230 | AuthConnectionMeta *auth_meta, | |
231 | uint32_t old_auth_method, | |
232 | int result, | |
233 | const std::vector<uint32_t>& allowed_methods, | |
234 | const std::vector<uint32_t>& allowed_modes) override { | |
235 | return mc->handle_auth_bad_method(old_auth_method, result, | |
236 | allowed_methods, allowed_modes); | |
237 | } | |
238 | }; | |
239 | ||
240 | const boost::system::error_category& monc_category() noexcept; | |
241 | ||
242 | enum class monc_errc { | |
243 | shutting_down = 1, // Command failed due to MonClient shutting down | |
244 | session_reset, // Monitor session was reset | |
245 | rank_dne, // Requested monitor rank does not exist | |
246 | mon_dne, // Requested monitor does not exist | |
247 | timed_out, // Monitor operation timed out | |
248 | mon_unavailable // Monitor unavailable | |
249 | }; | |
250 | ||
251 | namespace boost::system { | |
252 | template<> | |
253 | struct is_error_code_enum<::monc_errc> { | |
254 | static const bool value = true; | |
255 | }; | |
256 | } | |
257 | ||
258 | // implicit conversion: | |
259 | inline boost::system::error_code make_error_code(monc_errc e) noexcept { | |
260 | return { static_cast<int>(e), monc_category() }; | |
261 | } | |
262 | ||
263 | // explicit conversion: | |
264 | inline boost::system::error_condition make_error_condition(monc_errc e) noexcept { | |
265 | return { static_cast<int>(e), monc_category() }; | |
266 | } | |
267 | ||
268 | const boost::system::error_category& monc_category() noexcept; | |
269 | ||
270 | class MonClient : public Dispatcher, | |
271 | public AuthClient, | |
272 | public AuthServer /* for mgr, osd, mds */ { | |
273 | static constexpr auto dout_subsys = ceph_subsys_monc; | |
274 | public: | |
275 | // Error, Newest, Oldest | |
276 | using VersionSig = void(boost::system::error_code, version_t, version_t); | |
277 | using VersionCompletion = ceph::async::Completion<VersionSig>; | |
278 | ||
279 | using CommandSig = void(boost::system::error_code, std::string, | |
280 | ceph::buffer::list); | |
281 | using CommandCompletion = ceph::async::Completion<CommandSig>; | |
282 | ||
283 | MonMap monmap; | |
284 | std::map<std::string,std::string> config_mgr; | |
285 | ||
286 | private: | |
287 | Messenger *messenger; | |
288 | ||
289 | std::unique_ptr<MonConnection> active_con; | |
290 | std::map<entity_addrvec_t, MonConnection> pending_cons; | |
291 | std::set<unsigned> tried; | |
292 | ||
293 | EntityName entity_name; | |
294 | ||
295 | mutable ceph::mutex monc_lock = ceph::make_mutex("MonClient::monc_lock"); | |
296 | SafeTimer timer; | |
297 | boost::asio::io_context& service; | |
298 | boost::asio::io_context::strand finish_strand{service}; | |
299 | ||
300 | bool initialized; | |
301 | bool stopping = false; | |
302 | ||
303 | LogClient *log_client; | |
304 | bool more_log_pending; | |
305 | ||
306 | void send_log(bool flush = false); | |
307 | ||
308 | bool ms_dispatch(Message *m) override; | |
309 | bool ms_handle_reset(Connection *con) override; | |
310 | void ms_handle_remote_reset(Connection *con) override {} | |
311 | bool ms_handle_refused(Connection *con) override { return false; } | |
312 | ||
313 | void handle_monmap(MMonMap *m); | |
314 | void handle_config(MConfig *m); | |
315 | ||
316 | void handle_auth(MAuthReply *m); | |
317 | ||
318 | // monitor session | |
319 | utime_t last_keepalive; | |
320 | utime_t last_send_log; | |
321 | ||
322 | void tick(); | |
323 | void schedule_tick(); | |
324 | ||
325 | // monclient | |
326 | bool want_monmap; | |
327 | ceph::condition_variable map_cond; | |
328 | bool passthrough_monmap = false; | |
329 | ||
330 | bool want_bootstrap_config = false; | |
331 | ceph::ref_t<MConfig> bootstrap_config; | |
332 | ||
333 | // authenticate | |
334 | std::unique_ptr<AuthClientHandler> auth; | |
335 | uint32_t want_keys = 0; | |
336 | uint64_t global_id = 0; | |
337 | ceph::condition_variable auth_cond; | |
338 | int authenticate_err = 0; | |
339 | bool authenticated = false; | |
340 | ||
341 | std::list<MessageRef> waiting_for_session; | |
342 | utime_t last_rotating_renew_sent; | |
343 | bool had_a_connection; | |
344 | double reopen_interval_multiplier; | |
345 | ||
346 | Dispatcher *handle_authentication_dispatcher = nullptr; | |
347 | bool _opened() const; | |
348 | bool _hunting() const; | |
349 | void _start_hunting(); | |
350 | void _finish_hunting(int auth_err); | |
351 | void _finish_auth(int auth_err); | |
352 | void _reopen_session(int rank = -1); | |
353 | void _add_conn(unsigned rank); | |
354 | void _add_conns(); | |
355 | void _un_backoff(); | |
356 | void _send_mon_message(MessageRef m); | |
357 | ||
358 | std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con( | |
359 | const ConnectionRef& con) { | |
360 | for (auto i = pending_cons.begin(); i != pending_cons.end(); ++i) { | |
361 | if (i->second.get_con() == con) { | |
362 | return i; | |
363 | } | |
364 | } | |
365 | return pending_cons.end(); | |
366 | } | |
367 | ||
368 | public: | |
369 | // AuthClient | |
370 | int get_auth_request( | |
371 | Connection *con, | |
372 | AuthConnectionMeta *auth_meta, | |
373 | uint32_t *method, | |
374 | std::vector<uint32_t> *preferred_modes, | |
375 | ceph::buffer::list *bl) override; | |
376 | int handle_auth_reply_more( | |
377 | Connection *con, | |
378 | AuthConnectionMeta *auth_meta, | |
379 | const ceph::buffer::list& bl, | |
380 | ceph::buffer::list *reply) override; | |
381 | int handle_auth_done( | |
382 | Connection *con, | |
383 | AuthConnectionMeta *auth_meta, | |
384 | uint64_t global_id, | |
385 | uint32_t con_mode, | |
386 | const ceph::buffer::list& bl, | |
387 | CryptoKey *session_key, | |
388 | std::string *connection_secret) override; | |
389 | int handle_auth_bad_method( | |
390 | Connection *con, | |
391 | AuthConnectionMeta *auth_meta, | |
392 | uint32_t old_auth_method, | |
393 | int result, | |
394 | const std::vector<uint32_t>& allowed_methods, | |
395 | const std::vector<uint32_t>& allowed_modes) override; | |
396 | // AuthServer | |
397 | int handle_auth_request( | |
398 | Connection *con, | |
399 | AuthConnectionMeta *auth_meta, | |
400 | bool more, | |
401 | uint32_t auth_method, | |
402 | const ceph::buffer::list& bl, | |
403 | ceph::buffer::list *reply) override; | |
404 | ||
405 | void set_entity_name(EntityName name) { entity_name = name; } | |
406 | void set_handle_authentication_dispatcher(Dispatcher *d) { | |
407 | handle_authentication_dispatcher = d; | |
408 | } | |
409 | int _check_auth_tickets(); | |
410 | int _check_auth_rotating(); | |
411 | int wait_auth_rotating(double timeout); | |
412 | ||
413 | int authenticate(double timeout=0.0); | |
414 | bool is_authenticated() const {return authenticated;} | |
415 | ||
416 | bool is_connected() const { return active_con != nullptr; } | |
417 | ||
418 | /** | |
419 | * Try to flush as many log messages as we can in a single | |
420 | * message. Use this before shutting down to transmit your | |
421 | * last message. | |
422 | */ | |
423 | void flush_log(); | |
424 | ||
425 | private: | |
426 | // mon subscriptions | |
427 | MonSub sub; | |
428 | void _renew_subs(); | |
429 | void handle_subscribe_ack(MMonSubscribeAck* m); | |
430 | ||
431 | public: | |
432 | void renew_subs() { | |
433 | std::lock_guard l(monc_lock); | |
434 | _renew_subs(); | |
435 | } | |
436 | bool sub_want(std::string what, version_t start, unsigned flags) { | |
437 | std::lock_guard l(monc_lock); | |
438 | return sub.want(what, start, flags); | |
439 | } | |
440 | void sub_got(std::string what, version_t have) { | |
441 | std::lock_guard l(monc_lock); | |
442 | sub.got(what, have); | |
443 | } | |
444 | void sub_unwant(std::string what) { | |
445 | std::lock_guard l(monc_lock); | |
446 | sub.unwant(what); | |
447 | } | |
448 | bool sub_want_increment(std::string what, version_t start, unsigned flags) { | |
449 | std::lock_guard l(monc_lock); | |
450 | return sub.inc_want(what, start, flags); | |
451 | } | |
452 | ||
453 | std::unique_ptr<KeyRing> keyring; | |
454 | std::unique_ptr<RotatingKeyRing> rotating_secrets; | |
455 | ||
456 | public: | |
457 | MonClient(CephContext *cct_, boost::asio::io_context& service); | |
458 | MonClient(const MonClient &) = delete; | |
459 | MonClient& operator=(const MonClient &) = delete; | |
460 | ~MonClient() override; | |
461 | ||
462 | int init(); | |
463 | void shutdown(); | |
464 | ||
465 | void set_log_client(LogClient *clog) { | |
466 | log_client = clog; | |
467 | } | |
468 | LogClient *get_log_client() { | |
469 | return log_client; | |
470 | } | |
471 | ||
472 | int build_initial_monmap(); | |
473 | int get_monmap(); | |
474 | int get_monmap_and_config(); | |
475 | /** | |
476 | * If you want to see MonMap messages, set this and | |
477 | * the MonClient will tell the Messenger it hasn't | |
478 | * dealt with it. | |
479 | * Note that if you do this, *you* are of course responsible for | |
480 | * putting the message reference! | |
481 | */ | |
482 | void set_passthrough_monmap() { | |
483 | std::lock_guard l(monc_lock); | |
484 | passthrough_monmap = true; | |
485 | } | |
486 | void unset_passthrough_monmap() { | |
487 | std::lock_guard l(monc_lock); | |
488 | passthrough_monmap = false; | |
489 | } | |
490 | /** | |
491 | * Ping monitor with ID @p mon_id and record the resulting | |
492 | * reply in @p result_reply. | |
493 | * | |
494 | * @param[in] mon_id Target monitor's ID | |
495 | * @param[out] result_reply reply from mon.ID, if param != NULL | |
496 | * @returns 0 in case of success; < 0 in case of error, | |
497 | * -ETIMEDOUT if monitor didn't reply before timeout | |
498 | * expired (default: conf->client_mount_timeout). | |
499 | */ | |
500 | int ping_monitor(const std::string &mon_id, std::string *result_reply); | |
501 | ||
502 | void send_mon_message(Message *m) { | |
503 | send_mon_message(MessageRef{m, false}); | |
504 | } | |
505 | void send_mon_message(MessageRef m); | |
506 | ||
507 | void reopen_session() { | |
508 | std::lock_guard l(monc_lock); | |
509 | _reopen_session(); | |
510 | } | |
511 | ||
512 | const uuid_d& get_fsid() const { | |
513 | return monmap.fsid; | |
514 | } | |
515 | ||
516 | entity_addrvec_t get_mon_addrs(unsigned i) const { | |
517 | std::lock_guard l(monc_lock); | |
518 | if (i < monmap.size()) | |
519 | return monmap.get_addrs(i); | |
520 | return entity_addrvec_t(); | |
521 | } | |
522 | int get_num_mon() const { | |
523 | std::lock_guard l(monc_lock); | |
524 | return monmap.size(); | |
525 | } | |
526 | ||
527 | uint64_t get_global_id() const { | |
528 | std::lock_guard l(monc_lock); | |
529 | return global_id; | |
530 | } | |
531 | ||
532 | void set_messenger(Messenger *m) { messenger = m; } | |
533 | entity_addrvec_t get_myaddrs() const { return messenger->get_myaddrs(); } | |
534 | AuthAuthorizer* build_authorizer(int service_id) const; | |
535 | ||
536 | void set_want_keys(uint32_t want) { | |
537 | want_keys = want; | |
538 | } | |
539 | ||
540 | // admin commands | |
541 | private: | |
542 | uint64_t last_mon_command_tid; | |
543 | ||
544 | struct MonCommand { | |
545 | // for tell only | |
546 | std::string target_name; | |
547 | int target_rank = -1; | |
548 | ConnectionRef target_con; | |
549 | std::unique_ptr<MonConnection> target_session; | |
550 | unsigned send_attempts = 0; ///< attempt count for legacy mons | |
551 | utime_t last_send_attempt; | |
552 | uint64_t tid; | |
553 | std::vector<std::string> cmd; | |
554 | ceph::buffer::list inbl; | |
555 | std::unique_ptr<CommandCompletion> onfinish; | |
556 | std::optional<boost::asio::steady_timer> cancel_timer; | |
557 | ||
558 | MonCommand(MonClient& monc, uint64_t t, std::unique_ptr<CommandCompletion> onfinish) | |
559 | : tid(t), onfinish(std::move(onfinish)) { | |
560 | auto timeout = | |
561 | monc.cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout"); | |
562 | if (timeout.count() > 0) { | |
563 | cancel_timer.emplace(monc.service, timeout); | |
564 | cancel_timer->async_wait( | |
565 | [this, &monc](boost::system::error_code ec) { | |
566 | if (ec) | |
567 | return; | |
568 | std::scoped_lock l(monc.monc_lock); | |
569 | monc._cancel_mon_command(tid); | |
570 | }); | |
571 | } | |
572 | } | |
573 | ||
574 | bool is_tell() const { | |
575 | return target_name.size() || target_rank >= 0; | |
576 | } | |
577 | }; | |
578 | friend MonCommand; | |
579 | std::map<uint64_t,MonCommand*> mon_commands; | |
580 | ||
581 | void _send_command(MonCommand *r); | |
582 | void _check_tell_commands(); | |
583 | void _resend_mon_commands(); | |
584 | int _cancel_mon_command(uint64_t tid); | |
585 | void _finish_command(MonCommand *r, boost::system::error_code ret, std::string_view rs, | |
586 | bufferlist&& bl); | |
587 | void _finish_auth(); | |
588 | void handle_mon_command_ack(MMonCommandAck *ack); | |
589 | void handle_command_reply(MCommandReply *reply); | |
590 | ||
591 | public: | |
592 | template<typename CompletionToken> | |
593 | auto start_mon_command(const std::vector<std::string>& cmd, | |
594 | const ceph::buffer::list& inbl, | |
595 | CompletionToken&& token) { | |
596 | ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; | |
597 | boost::asio::async_completion<CompletionToken, CommandSig> init(token); | |
598 | { | |
599 | std::scoped_lock l(monc_lock); | |
600 | auto h = CommandCompletion::create(service.get_executor(), | |
601 | std::move(init.completion_handler)); | |
602 | if (!initialized || stopping) { | |
603 | ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, | |
604 | bufferlist{}); | |
605 | } else { | |
606 | auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); | |
607 | r->cmd = cmd; | |
608 | r->inbl = inbl; | |
609 | mon_commands.emplace(r->tid, r); | |
610 | _send_command(r); | |
611 | } | |
612 | } | |
613 | return init.result.get(); | |
614 | } | |
615 | ||
616 | template<typename CompletionToken> | |
617 | auto start_mon_command(int mon_rank, const std::vector<std::string>& cmd, | |
618 | const ceph::buffer::list& inbl, CompletionToken&& token) { | |
619 | ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; | |
620 | boost::asio::async_completion<CompletionToken, CommandSig> init(token); | |
621 | { | |
622 | std::scoped_lock l(monc_lock); | |
623 | auto h = CommandCompletion::create(service.get_executor(), | |
624 | std::move(init.completion_handler)); | |
625 | if (!initialized || stopping) { | |
626 | ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, | |
627 | bufferlist{}); | |
628 | } else { | |
629 | auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); | |
630 | r->target_rank = mon_rank; | |
631 | r->cmd = cmd; | |
632 | r->inbl = inbl; | |
633 | mon_commands.emplace(r->tid, r); | |
634 | _send_command(r); | |
635 | } | |
636 | } | |
637 | return init.result.get(); | |
638 | } | |
639 | ||
640 | template<typename CompletionToken> | |
641 | auto start_mon_command(const std::string& mon_name, | |
642 | const std::vector<std::string>& cmd, | |
643 | const ceph::buffer::list& inbl, | |
644 | CompletionToken&& token) { | |
645 | ldout(cct,10) << __func__ << " cmd=" << cmd << dendl; | |
646 | boost::asio::async_completion<CompletionToken, CommandSig> init(token); | |
647 | { | |
648 | std::scoped_lock l(monc_lock); | |
649 | auto h = CommandCompletion::create(service.get_executor(), | |
650 | std::move(init.completion_handler)); | |
651 | if (!initialized || stopping) { | |
652 | ceph::async::post(std::move(h), monc_errc::shutting_down, std::string{}, | |
653 | bufferlist{}); | |
654 | } else { | |
655 | auto r = new MonCommand(*this, ++last_mon_command_tid, std::move(h)); | |
656 | // detect/tolerate mon *rank* passed as a string | |
657 | std::string err; | |
658 | int rank = strict_strtoll(mon_name.c_str(), 10, &err); | |
659 | if (err.size() == 0 && rank >= 0) { | |
660 | ldout(cct,10) << __func__ << " interpreting name '" << mon_name | |
661 | << "' as rank " << rank << dendl; | |
662 | r->target_rank = rank; | |
663 | } else { | |
664 | r->target_name = mon_name; | |
665 | } | |
666 | r->cmd = cmd; | |
667 | r->inbl = inbl; | |
668 | mon_commands.emplace(r->tid, r); | |
669 | _send_command(r); | |
670 | } | |
671 | } | |
672 | return init.result.get(); | |
673 | } | |
674 | ||
675 | class ContextVerter { | |
676 | std::string* outs; | |
677 | ceph::bufferlist* outbl; | |
678 | Context* onfinish; | |
679 | ||
680 | public: | |
681 | ContextVerter(std::string* outs, ceph::bufferlist* outbl, Context* onfinish) | |
682 | : outs(outs), outbl(outbl), onfinish(onfinish) {} | |
683 | ~ContextVerter() = default; | |
684 | ContextVerter(const ContextVerter&) = default; | |
685 | ContextVerter& operator =(const ContextVerter&) = default; | |
686 | ContextVerter(ContextVerter&&) = default; | |
687 | ContextVerter& operator =(ContextVerter&&) = default; | |
688 | ||
689 | void operator()(boost::system::error_code e, | |
690 | std::string s, | |
691 | ceph::bufferlist bl) { | |
692 | if (outs) | |
693 | *outs = std::move(s); | |
694 | if (outbl) | |
695 | *outbl = std::move(bl); | |
696 | if (onfinish) | |
697 | onfinish->complete(ceph::from_error_code(e)); | |
698 | } | |
699 | }; | |
700 | ||
701 | void start_mon_command(const vector<string>& cmd, const bufferlist& inbl, | |
702 | bufferlist *outbl, string *outs, | |
703 | Context *onfinish) { | |
704 | start_mon_command(cmd, inbl, ContextVerter(outs, outbl, onfinish)); | |
705 | } | |
706 | void start_mon_command(int mon_rank, | |
707 | const vector<string>& cmd, const bufferlist& inbl, | |
708 | bufferlist *outbl, string *outs, | |
709 | Context *onfinish) { | |
710 | start_mon_command(mon_rank, cmd, inbl, ContextVerter(outs, outbl, onfinish)); | |
711 | } | |
712 | void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix | |
713 | const vector<string>& cmd, const bufferlist& inbl, | |
714 | bufferlist *outbl, string *outs, | |
715 | Context *onfinish) { | |
716 | start_mon_command(mon_name, cmd, inbl, ContextVerter(outs, outbl, onfinish)); | |
717 | } | |
718 | ||
719 | ||
720 | // version requests | |
721 | public: | |
722 | /** | |
723 | * get latest known version(s) of cluster map | |
724 | * | |
725 | * @param map string name of map (e.g., 'osdmap') | |
726 | * @param token context that will be triggered on completion | |
727 | * @return (via Completion) {} on success, | |
728 | * boost::system::errc::resource_unavailable_try_again if we need to | |
729 | * resubmit our request | |
730 | */ | |
731 | template<typename CompletionToken> | |
732 | auto get_version(std::string&& map, CompletionToken&& token) { | |
733 | boost::asio::async_completion<CompletionToken, VersionSig> init(token); | |
734 | { | |
735 | std::scoped_lock l(monc_lock); | |
736 | auto m = ceph::make_message<MMonGetVersion>(); | |
737 | m->what = std::move(map); | |
738 | m->handle = ++version_req_id; | |
739 | version_requests.emplace(m->handle, | |
740 | VersionCompletion::create( | |
741 | service.get_executor(), | |
742 | std::move(init.completion_handler))); | |
743 | _send_mon_message(m); | |
744 | } | |
745 | return init.result.get(); | |
746 | } | |
747 | ||
748 | /** | |
749 | * Run a callback within our lock, with a reference | |
750 | * to the MonMap | |
751 | */ | |
752 | template<typename Callback, typename...Args> | |
753 | auto with_monmap(Callback&& cb, Args&&...args) const -> | |
754 | decltype(cb(monmap, std::forward<Args>(args)...)) { | |
755 | std::lock_guard l(monc_lock); | |
756 | return std::forward<Callback>(cb)(monmap, std::forward<Args>(args)...); | |
757 | } | |
758 | ||
759 | void register_config_callback(md_config_t::config_callback fn); | |
760 | void register_config_notify_callback(std::function<void(void)> f) { | |
761 | config_notify_cb = f; | |
762 | } | |
763 | md_config_t::config_callback get_config_callback(); | |
764 | ||
765 | private: | |
766 | ||
767 | std::map<ceph_tid_t, std::unique_ptr<VersionCompletion>> version_requests; | |
768 | ceph_tid_t version_req_id; | |
769 | void handle_get_version_reply(MMonGetVersionReply* m); | |
770 | md_config_t::config_callback config_cb; | |
771 | std::function<void(void)> config_notify_cb; | |
772 | }; | |
773 | ||
774 | #endif |