]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
39fbf44883c4777325c37e8f98ffde856e9f3f6e
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/random.h"
25 #include "include/scope_guard.h"
26 #include "include/stringify.h"
27
28 #include "messages/MMonGetMap.h"
29 #include "messages/MMonGetVersion.h"
30 #include "messages/MMonGetMap.h"
31 #include "messages/MMonGetVersionReply.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MConfig.h"
34 #include "messages/MAuth.h"
35 #include "messages/MLogAck.h"
36 #include "messages/MAuthReply.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MCommand.h"
40 #include "messages/MCommandReply.h"
41 #include "messages/MPing.h"
42
43 #include "messages/MMonSubscribe.h"
44 #include "messages/MMonSubscribeAck.h"
45 #include "common/errno.h"
46 #include "common/hostname.h"
47 #include "common/LogClient.h"
48
49 #include "MonClient.h"
50 #include "error_code.h"
51 #include "MonMap.h"
52
53 #include "auth/Auth.h"
54 #include "auth/KeyRing.h"
55 #include "auth/AuthClientHandler.h"
56 #include "auth/AuthRegistry.h"
57 #include "auth/RotatingKeyRing.h"
58
59 #define dout_subsys ceph_subsys_monc
60 #undef dout_prefix
61 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
62
63 namespace bs = boost::system;
64 using std::string;
65 using namespace std::literals;
66
67 MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
68 Dispatcher(cct_),
69 AuthServer(cct_),
70 messenger(NULL),
71 timer(cct_, monc_lock),
72 service(service),
73 initialized(false),
74 log_client(NULL),
75 more_log_pending(false),
76 want_monmap(true),
77 had_a_connection(false),
78 reopen_interval_multiplier(
79 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
80 last_mon_command_tid(0),
81 version_req_id(0)
82 {}
83
84 MonClient::~MonClient()
85 {
86 }
87
88 int MonClient::build_initial_monmap()
89 {
90 ldout(cct, 10) << __func__ << dendl;
91 int r = monmap.build_initial(cct, false, std::cerr);
92 ldout(cct,10) << "monmap:\n";
93 monmap.print(*_dout);
94 *_dout << dendl;
95 return r;
96 }
97
98 int MonClient::get_monmap()
99 {
100 ldout(cct, 10) << __func__ << dendl;
101 std::unique_lock l(monc_lock);
102
103 sub.want("monmap", 0, 0);
104 if (!_opened())
105 _reopen_session();
106 map_cond.wait(l, [this] { return !want_monmap; });
107 ldout(cct, 10) << __func__ << " done" << dendl;
108 return 0;
109 }
110
/**
 * Bootstrap helper: obtain the monmap and the centralized config from the
 * monitors before the regular MonClient is brought up.
 *
 * Must be called with no messenger attached (asserted). Steps:
 *  1. Build the initial monmap from configuration.
 *  2. Create a temporary client messenger ("temp_mon_client").
 *  3. Make up to 10 attempts, each consisting of init(), authenticate()
 *     (bounded by client_mount_timeout), and a wait for both an MConfig and
 *     a monmap with a non-zero epoch. Each wait step is bounded by
 *     mon_client_hunt_interval. A failed attempt calls shutdown() and
 *     retries.
 *
 * If the monmap's persistent features lack FEATURE_MIMIC, there is no config
 * to fetch and the call succeeds without one. A received config is applied
 * synchronously via set_mon_vals() before returning.
 *
 * The scope guards run in reverse declaration order on every exit path:
 *  - stop collecting bootstrap config;
 *  - tear down the temporary messenger, and store the monmap fsid into the
 *    "fsid" option if it is non-zero;
 *  - shut down crypto.
 *
 * @return 0 on success. Otherwise one of:
 *  - the error from build_initial_monmap(), init() or authenticate();
 *  - -ETIMEDOUT if no attempt produced a config.
 * Note that an init() failure returns without calling shutdown().
 */
int MonClient::get_monmap_and_config()
{
  ldout(cct, 10) << __func__ << dendl;
  // this helper creates and owns a temporary messenger; none may exist yet
  ceph_assert(!messenger);

  // maximum number of init() + authenticate() + wait attempts
  int tries = 10;

  cct->init_crypto();
  // declared first, so it runs last (after the messenger guard below)
  auto shutdown_crypto = make_scope_guard([this] {
    cct->shutdown_crypto();
  });

  int r = build_initial_monmap();
  if (r < 0) {
    lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
    return r;
  }

  messenger = Messenger::create_client_messenger(
    cct, "temp_mon_client");
  ceph_assert(messenger);
  messenger->add_dispatcher_head(this);
  messenger->start();
  // on exit: destroy the temporary messenger and publish the fsid learned
  // from the monmap (if any) into the "fsid" config option
  auto shutdown_msgr = make_scope_guard([this] {
    messenger->shutdown();
    messenger->wait();
    delete messenger;
    messenger = nullptr;
    if (!monmap.fsid.is_zero()) {
      cct->_conf.set_val("fsid", stringify(monmap.fsid));
    }
  });

  // while set, handle_config() parks the MConfig in bootstrap_config and
  // signals map_cond instead of applying it asynchronously
  want_bootstrap_config = true;
  auto shutdown_config = make_scope_guard([this] {
    std::unique_lock l(monc_lock);
    want_bootstrap_config = false;
    bootstrap_config.reset();
  });

  ceph::ref_t<MConfig> config;
  while (tries-- > 0) {
    r = init();
    if (r < 0) {
      return r;
    }
    r = authenticate(std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
    if (r == -ETIMEDOUT) {
      // no monitor answered in time: tear down and try again
      shutdown();
      continue;
    }
    if (r < 0) {
      // any other authentication failure is final
      break;
    }
    {
      std::unique_lock l(monc_lock);
      if (monmap.get_epoch() &&
          !monmap.persistent_features.contains_all(
            ceph::features::mon::FEATURE_MIMIC)) {
        ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
                      << dendl;
        r = 0;
        break;  // leaves the retry loop
      }
      // Wait for both an MConfig and a real (non-zero epoch) monmap. A single
      // timeout ends the wait; a config that arrived before it is still
      // accepted below even if the monmap epoch is still 0.
      while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
        ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
        auto status = map_cond.wait_for(l, ceph::make_timespan(
          cct->_conf->mon_client_hunt_interval));
        if (status == std::cv_status::timeout) {
          r = -ETIMEDOUT;
        }
      }

      if (bootstrap_config) {
        ldout(cct,10) << __func__ << " success" << dendl;
        config = std::move(bootstrap_config);
        r = 0;
        break;  // leaves the retry loop
      }
    }
    lderr(cct) << __func__ << " failed to get config" << dendl;
    shutdown();
    continue;
  }

  if (config) {
    // apply the bootstrap config here so that it is applied before the
    // bootstrap completes
    cct->_conf.set_mon_vals(cct, config->config, config_cb);
  }

  shutdown();
  return r;
}
205
206
207 /**
208 * Ping the monitor with id @p mon_id and set the resulting reply in
209 * the provided @p result_reply, if this last parameter is not NULL.
210 *
211 * So that we don't rely on the MonClient's default messenger, set up
212 * during connect(), we create our own messenger to comunicate with the
213 * specified monitor. This is advantageous in the following ways:
214 *
215 * - Isolate the ping procedure from the rest of the MonClient's operations,
216 * allowing us to not acquire or manage the big monc_lock, thus not
217 * having to block waiting for some other operation to finish before we
218 * can proceed.
219 * * for instance, we can ping mon.FOO even if we are currently hunting
220 * or blocked waiting for auth to complete with mon.BAR.
221 *
222 * - Ping a monitor prior to establishing a connection (using connect())
223 * and properly establish the MonClient's messenger. This frees us
224 * from dealing with the complex foo that happens in connect().
225 *
226 * We also don't rely on MonClient as a dispatcher for this messenger,
227 * unlike what happens with the MonClient's default messenger. This allows
228 * us to sandbox the whole ping, having it much as a separate entity in
229 * the MonClient class, considerably simplifying the handling and dispatching
230 * of messages without needing to consider monc_lock.
231 *
232 * Current drawback is that we will establish a messenger for each ping
233 * we want to issue, instead of keeping a single messenger instance that
234 * would be used for all pings.
235 */
236 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
237 {
238 ldout(cct, 10) << __func__ << dendl;
239
240 string new_mon_id;
241 if (monmap.contains("noname-"+mon_id)) {
242 new_mon_id = "noname-"+mon_id;
243 } else {
244 new_mon_id = mon_id;
245 }
246
247 if (new_mon_id.empty()) {
248 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
249 return -EINVAL;
250 } else if (!monmap.contains(new_mon_id)) {
251 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
252 << dendl;
253 return -ENOENT;
254 }
255
256 // N.B. monc isn't initialized
257
258 auth_registry.refresh_config();
259
260 KeyRing keyring;
261 keyring.from_ceph_context(cct);
262 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
263
264 MonClientPinger *pinger = new MonClientPinger(cct,
265 &rkeyring,
266 result_reply);
267
268 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
269 smsgr->add_dispatcher_head(pinger);
270 smsgr->set_auth_client(pinger);
271 smsgr->start();
272
273 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
274 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
275 << " " << con->get_peer_addr() << dendl;
276
277 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
278 pinger->mc->start(monmap.get_epoch(), entity_name);
279 con->send_message(new MPing);
280
281 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
282 if (ret == 0) {
283 ldout(cct,10) << __func__ << " got ping reply" << dendl;
284 } else {
285 ret = -ret;
286 }
287
288 con->mark_down();
289 pinger->mc.reset();
290 smsgr->shutdown();
291 smsgr->wait();
292 delete smsgr;
293 delete pinger;
294 return ret;
295 }
296
/**
 * Dispatcher entry point for messages arriving on the MonClient's messenger.
 *
 * Only monitor-related message types are handled; anything else returns
 * false so another dispatcher can take it. CEPH_MSG_PING is consumed and
 * dropped. Non-anonymous messages from monitors are discarded unless they
 * come from one of the pending connections (while hunting) or from the
 * active session. Everything else is routed to its handler with monc_lock
 * held.
 *
 * Reference ownership: returning true means the message was consumed. Its
 * reference was either dropped here or handed to a handler; for the handlers
 * not visible in this chunk, releasing it is presumably their job. Returning
 * false leaves the message for another dispatcher. That happens for:
 *  - unhandled message types;
 *  - MMonMap when passthrough_monmap is set;
 *  - MCommandReply not received from a monitor over an anonymous connection.
 */
bool MonClient::ms_dispatch(Message *m)
{
  // we only care about these message types
  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
  case CEPH_MSG_AUTH_REPLY:
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
  case CEPH_MSG_MON_GET_VERSION_REPLY:
  case MSG_MON_COMMAND_ACK:
  case MSG_COMMAND_REPLY:
  case MSG_LOGACK:
  case MSG_CONFIG:
    break;
  case CEPH_MSG_PING:
    // nothing to do for a ping: consume and drop it
    m->put();
    return true;
  default:
    // not ours: leave it for the next dispatcher
    return false;
  }

  std::lock_guard lock(monc_lock);

  // Session filter: authenticated (non-anonymous) monitor traffic must come
  // from a connection we are currently using. Anonymous connections (used
  // for 'tell' commands) are not filtered here.
  if (!m->get_connection()->is_anon() &&
      m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
    if (_hunting()) {
      auto p = _find_pending_con(m->get_connection());
      if (p == pending_cons.end()) {
        // ignore any messages outside hunting sessions
        ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
        m->put();
        return true;
      }
    } else if (!active_con || active_con->get_con() != m->get_connection()) {
      // ignore any messages outside our session(s)
      ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
      m->put();
      return true;
    }
  }

  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
    // handle_monmap() does not drop the reference, so the map can be passed
    // through to other dispatchers when passthrough_monmap is set
    handle_monmap(static_cast<MMonMap*>(m));
    if (passthrough_monmap) {
      return false;
    } else {
      m->put();
    }
    break;
  case CEPH_MSG_AUTH_REPLY:
    handle_auth(static_cast<MAuthReply*>(m));
    break;
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
    handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
    break;
  case CEPH_MSG_MON_GET_VERSION_REPLY:
    handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
    break;
  case MSG_MON_COMMAND_ACK:
    handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
    break;
  case MSG_COMMAND_REPLY:
    if (m->get_connection()->is_anon() &&
        m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
      // this connection is from 'tell'... ignore everything except our command
      // reply. (we'll get misc other message because we authenticated, but we
      // don't need them.)
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    }
    // leave the message for another dispatch handler (e.g., Objecter)
    return false;
  case MSG_LOGACK:
    if (log_client) {
      log_client->handle_log_ack(static_cast<MLogAck*>(m));
      m->put();
      // the ack may have freed room for entries that were held back
      if (more_log_pending) {
        send_log();
      }
    } else {
      m->put();
    }
    break;
  case MSG_CONFIG:
    // handle_config() takes over the reference
    handle_config(static_cast<MConfig*>(m));
    break;
  }
  return true;
}
386
387 void MonClient::send_log(bool flush)
388 {
389 if (log_client) {
390 auto lm = log_client->get_mon_log_message(flush);
391 if (lm)
392 _send_mon_message(std::move(lm));
393 more_log_pending = log_client->are_pending();
394 }
395 }
396
397 void MonClient::flush_log()
398 {
399 std::lock_guard l(monc_lock);
400 send_log();
401 }
402
403 /* Unlike all the other message-handling functions, we don't put away a reference
404 * because we want to support MMonMap passthrough to other Dispatchers. */
405 void MonClient::handle_monmap(MMonMap *m)
406 {
407 ldout(cct, 10) << __func__ << " " << *m << dendl;
408 auto con_addrs = m->get_source_addrs();
409 string old_name = monmap.get_name(con_addrs);
410 const auto old_epoch = monmap.get_epoch();
411
412 auto p = m->monmapbl.cbegin();
413 decode(monmap, p);
414
415 ldout(cct, 10) << " got monmap " << monmap.epoch
416 << " from mon." << old_name
417 << " (according to old e" << monmap.get_epoch() << ")"
418 << dendl;
419 ldout(cct, 10) << "dump:\n";
420 monmap.print(*_dout);
421 *_dout << dendl;
422
423 if (old_epoch != monmap.get_epoch()) {
424 tried.clear();
425 }
426 if (old_name.size() == 0) {
427 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
428 _reopen_session();
429 } else {
430 auto new_name = monmap.get_name(con_addrs);
431 if (new_name.empty()) {
432 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
433 << " went away" << dendl;
434 // can't find the mon we were talking to (above)
435 _reopen_session();
436 } else if (messenger->should_use_msgr2() &&
437 monmap.get_addrs(new_name).has_msgr2() &&
438 !con_addrs.has_msgr2()) {
439 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
440 << monmap.get_addrs(new_name) << " but i'm connected to "
441 << con_addrs << ", reconnecting" << dendl;
442 _reopen_session();
443 }
444 }
445
446 cct->set_mon_addrs(monmap);
447
448 sub.got("monmap", monmap.get_epoch());
449 map_cond.notify_all();
450 want_monmap = false;
451
452 if (authenticate_err == 1) {
453 _finish_auth(0);
454 }
455 }
456
457 void MonClient::handle_config(MConfig *m)
458 {
459 ldout(cct,10) << __func__ << " " << *m << dendl;
460
461 if (want_bootstrap_config) {
462 // get_monmap_and_config is waiting for config which it will apply
463 // synchronously
464 bootstrap_config = ceph::ref_t<MConfig>(m, false);
465 map_cond.notify_all();
466 return;
467 }
468
469 // Take the sledgehammer approach to ensuring we don't depend on
470 // anything in MonClient.
471 boost::asio::post(finish_strand,
472 [m, cct = boost::intrusive_ptr<CephContext>(cct),
473 config_notify_cb = config_notify_cb,
474 config_cb = config_cb]() {
475 cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
476 if (config_notify_cb) {
477 config_notify_cb();
478 }
479 m->put();
480 });
481 }
482
483 // ----------------------
484
485 int MonClient::init()
486 {
487 ldout(cct, 10) << __func__ << dendl;
488
489 entity_name = cct->_conf->name;
490
491 auth_registry.refresh_config();
492
493 std::lock_guard l(monc_lock);
494 keyring.reset(new KeyRing);
495 if (auth_registry.is_supported_method(messenger->get_mytype(),
496 CEPH_AUTH_CEPHX)) {
497 // this should succeed, because auth_registry just checked!
498 int r = keyring->from_ceph_context(cct);
499 if (r != 0) {
500 // but be somewhat graceful in case there was a race condition
501 lderr(cct) << "keyring not found" << dendl;
502 return r;
503 }
504 }
505 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
506 return -ENOENT;
507 }
508
509 rotating_secrets.reset(
510 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
511
512 initialized = true;
513
514 messenger->set_auth_client(this);
515 messenger->add_dispatcher_head(this);
516
517 timer.init();
518 schedule_tick();
519
520 cct->get_admin_socket()->register_command(
521 "rotate-key",
522 this,
523 "rotate live authentication key");
524
525 return 0;
526 }
527
528 void MonClient::shutdown()
529 {
530 ldout(cct, 10) << __func__ << dendl;
531
532 cct->get_admin_socket()->unregister_commands(this);
533
534 monc_lock.lock();
535 stopping = true;
536 while (!version_requests.empty()) {
537 ceph::async::post(std::move(version_requests.begin()->second),
538 monc_errc::shutting_down, 0, 0);
539 ldout(cct, 20) << __func__ << " canceling and discarding version request "
540 << version_requests.begin()->first << dendl;
541 version_requests.erase(version_requests.begin());
542 }
543 while (!mon_commands.empty()) {
544 auto tid = mon_commands.begin()->first;
545 _cancel_mon_command(tid);
546 }
547 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
548 << " pending message(s)" << dendl;
549 waiting_for_session.clear();
550
551 active_con.reset();
552 pending_cons.clear();
553
554 auth.reset();
555 global_id = 0;
556 authenticate_err = 0;
557 authenticated = false;
558
559 monc_lock.unlock();
560
561 if (initialized) {
562 initialized = false;
563 }
564 monc_lock.lock();
565 timer.shutdown();
566 stopping = false;
567 monc_lock.unlock();
568 }
569
570 int MonClient::authenticate(double timeout)
571 {
572 std::unique_lock lock{monc_lock};
573
574 if (active_con) {
575 ldout(cct, 5) << "already authenticated" << dendl;
576 return 0;
577 }
578 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
579 sub.want("config", 0, 0);
580 if (!_opened())
581 _reopen_session();
582
583 auto until = ceph::mono_clock::now();
584 until += ceph::make_timespan(timeout);
585 if (timeout > 0.0)
586 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
587 while (!active_con && authenticate_err >= 0) {
588 if (timeout > 0.0) {
589 auto r = auth_cond.wait_until(lock, until);
590 if (r == std::cv_status::timeout && !active_con) {
591 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
592 authenticate_err = -ETIMEDOUT;
593 }
594 } else {
595 auth_cond.wait(lock);
596 }
597 }
598
599 if (active_con) {
600 ldout(cct, 5) << __func__ << " success, global_id "
601 << active_con->get_global_id() << dendl;
602 // active_con should not have been set if there was an error
603 ceph_assert(authenticate_err >= 0);
604 authenticated = true;
605 }
606
607 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
608 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
609 }
610
611 return authenticate_err;
612 }
613
614 int MonClient::call(
615 std::string_view command,
616 const cmdmap_t& cmdmap,
617 const ceph::buffer::list &inbl,
618 ceph::Formatter *f,
619 std::ostream& errss,
620 ceph::buffer::list& out)
621 {
622 if (command == "rotate-key") {
623 CryptoKey key;
624 try {
625 key.decode_base64(inbl.to_str());
626 } catch (buffer::error& e) {
627 errss << "error decoding key: " << e.what();
628 return -EINVAL;
629 }
630 if (keyring) {
631 ldout(cct, 1) << "rotate live key for " << entity_name << dendl;
632 keyring->add(entity_name, key);
633 } else {
634 errss << "cephx not enabled; no key to rotate";
635 return -EINVAL;
636 }
637 }
638 return 0;
639 }
640
/**
 * Process an MAuthReply. Requires monc_lock (asserted); always drops the
 * reference on @p m.
 *
 * Three cases:
 *  - Anonymous connection (mon 'tell' commands): forward the reply to the
 *    auth session of the MonCommand bound to that connection; the result is
 *    ignored.
 *  - Not hunting: continue the auth exchange on the active session. Our
 *    auth handler is swapped into the connection for the call and swapped
 *    back afterwards. Authentication finishes unless another round is
 *    needed (-EAGAIN).
 *  - Hunting: advance the matching pending connection. On success it becomes
 *    active_con and the other candidates are dropped. On failure it is
 *    removed, and the hunt is only concluded (with that error) once no
 *    candidates remain.
 */
void MonClient::handle_auth(MAuthReply *m)
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));

  if (m->get_connection()->is_anon()) {
    // anon connection, used for mon tell commands
    for (auto& p : mon_commands) {
      if (p.second->target_con == m->get_connection()) {
        auto& mc = p.second->target_session;
        int ret = mc->handle_auth(m, entity_name,
                                  CEPH_ENTITY_TYPE_MON,
                                  rotating_secrets.get());
        (void)ret; // we don't care
        break;
      }
    }
    m->put();
    return;
  }

  if (!_hunting()) {
    // lend our auth handler to the active connection for this round
    std::swap(active_con->get_auth(), auth);
    int ret = active_con->authenticate(m);
    m->put();
    std::swap(auth, active_con->get_auth());
    if (global_id != active_con->get_global_id()) {
      lderr(cct) << __func__ << " peer assigned me a different global_id: "
                 << active_con->get_global_id() << dendl;
    }
    // -EAGAIN: the exchange needs more rounds
    if (ret != -EAGAIN) {
      _finish_auth(ret);
    }
    return;
  }

  // hunting
  auto found = _find_pending_con(m->get_connection());
  ceph_assert(found != pending_cons.end());
  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
                                           rotating_secrets.get());
  m->put();
  if (auth_err == -EAGAIN) {
    return;
  }
  if (auth_err) {
    pending_cons.erase(found);
    if (!pending_cons.empty()) {
      // keep trying with pending connections
      return;
    }
    // the last try just failed, give up.
  } else {
    // promote the winning candidate; the remaining hunts are abandoned
    auto& mc = found->second;
    ceph_assert(mc.have_session());
    active_con.reset(new MonConnection(std::move(mc)));
    pending_cons.clear();
  }

  _finish_hunting(auth_err);
  _finish_auth(auth_err);
}
702
703 void MonClient::_finish_auth(int auth_err)
704 {
705 ldout(cct,10) << __func__ << " " << auth_err << dendl;
706 authenticate_err = auth_err;
707 // _resend_mon_commands() could _reopen_session() if the connected mon is not
708 // the one the MonCommand is targeting.
709 if (!auth_err && active_con) {
710 ceph_assert(auth);
711 _check_auth_tickets();
712 }
713 auth_cond.notify_all();
714 }
715
716 // ---------
717
718 void MonClient::send_mon_message(MessageRef m)
719 {
720 std::lock_guard l{monc_lock};
721 _send_mon_message(std::move(m));
722 }
723
724 void MonClient::_send_mon_message(MessageRef m)
725 {
726 ceph_assert(ceph_mutex_is_locked(monc_lock));
727 if (active_con) {
728 auto cur_con = active_con->get_con();
729 ldout(cct, 10) << "_send_mon_message to mon."
730 << monmap.get_name(cur_con->get_peer_addr())
731 << " at " << cur_con->get_peer_addr() << dendl;
732 cur_con->send_message2(std::move(m));
733 } else {
734 waiting_for_session.push_back(std::move(m));
735 }
736 }
737
738 void MonClient::_reopen_session(int rank)
739 {
740 ceph_assert(ceph_mutex_is_locked(monc_lock));
741 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
742
743 active_con.reset();
744 pending_cons.clear();
745
746 authenticate_err = 1; // == in progress
747
748 _start_hunting();
749
750 if (rank >= 0) {
751 _add_conn(rank);
752 } else {
753 _add_conns();
754 }
755
756 // throw out old queued messages
757 waiting_for_session.clear();
758
759 // throw out version check requests
760 while (!version_requests.empty()) {
761 ceph::async::post(std::move(version_requests.begin()->second),
762 monc_errc::session_reset, 0, 0);
763 version_requests.erase(version_requests.begin());
764 }
765
766 for (auto& c : pending_cons) {
767 c.second.start(monmap.get_epoch(), entity_name);
768 }
769
770 if (sub.reload()) {
771 _renew_subs();
772 }
773 }
774
775 void MonClient::_add_conn(unsigned rank)
776 {
777 auto peer = monmap.get_addrs(rank);
778 auto conn = messenger->connect_to_mon(peer);
779 MonConnection mc(cct, conn, global_id, &auth_registry);
780 if (auth) {
781 mc.get_auth().reset(auth->clone());
782 }
783 pending_cons.insert(std::make_pair(peer, std::move(mc)));
784 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
785 << " con " << conn
786 << " addr " << peer
787 << dendl;
788 }
789
/**
 * Choose the next batch of monitors to hunt and open connections to them.
 *
 * Candidates are ranks not yet in `tried`. Of those, only the ranks sharing
 * the lowest priority value form the batch. If every rank has been tried,
 * `tried` is cleared and selection starts over.
 *
 * A batch with more than one rank is shuffled: uniformly when all weights are
 * zero, otherwise with weighted_shuffle() using the monmap weights. The first
 * mon_client_hunt_parallel ranks (the whole batch if that option is 0 or
 * exceeds the batch size) are connected via _add_conn() and added to `tried`.
 */
void MonClient::_add_conns()
{
  // Collect the untried ranks keyed by priority; the batch is the group with
  // the smallest priority value (the multimap's first key).
  auto get_next_batch = [this]() -> std::vector<unsigned> {
    std::multimap<uint16_t, unsigned> ranks_by_priority;
    boost::copy(
      monmap.mon_info | boost::adaptors::filtered(
        [this](auto& info) {
          auto rank = monmap.get_rank(info.first);
          return tried.count(rank) == 0;
        }) | boost::adaptors::transformed(
          [this](auto& info) {
            auto rank = monmap.get_rank(info.first);
            return std::make_pair(info.second.priority, rank);
          }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
    if (ranks_by_priority.empty()) {
      return {};
    }
    // only choose the monitors with lowest priority
    auto cands = boost::make_iterator_range(
      ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
    std::vector<unsigned> ranks;
    boost::range::copy(cands | boost::adaptors::map_values,
                       std::back_inserter(ranks));
    return ranks;
  };
  auto ranks = get_next_batch();
  if (ranks.empty()) {
    tried.clear(); // start over
    ranks = get_next_batch();
  }
  ceph_assert(!ranks.empty());
  if (ranks.size() > 1) {
    std::vector<uint16_t> weights;
    for (auto i : ranks) {
      auto rank_name = monmap.get_name(i);
      weights.push_back(monmap.get_weight(rank_name));
    }
    random_device_t rd;
    // all-zero weights carry no preference: fall back to a uniform shuffle
    if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
      std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
    } else {
      weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
                       std::mt19937{rd()});
    }
  }
  ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
  // 0 (or more than available) means: hunt all candidates of the batch
  unsigned n = cct->_conf->mon_client_hunt_parallel;
  if (n == 0 || n > ranks.size()) {
    n = ranks.size();
  }
  for (unsigned i = 0; i < n; i++) {
    _add_conn(ranks[i]);
    tried.insert(ranks[i]);
  }
}
847
848 bool MonClient::ms_handle_reset(Connection *con)
849 {
850 std::lock_guard lock(monc_lock);
851
852 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
853 return false;
854
855 if (con->is_anon()) {
856 auto p = mon_commands.begin();
857 while (p != mon_commands.end()) {
858 auto cmd = p->second;
859 ++p;
860 if (cmd->target_con == con) {
861 _send_command(cmd); // may retry or fail
862 break;
863 }
864 }
865 return true;
866 }
867
868 if (_hunting()) {
869 if (pending_cons.count(con->get_peer_addrs())) {
870 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
871 << dendl;
872 } else {
873 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
874 << dendl;
875 }
876 return true;
877 } else {
878 if (active_con && con == active_con->get_con()) {
879 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
880 << dendl;
881 _reopen_session();
882 return false;
883 } else {
884 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
885 << dendl;
886 return true;
887 }
888 }
889 }
890
891 bool MonClient::_opened() const
892 {
893 ceph_assert(ceph_mutex_is_locked(monc_lock));
894 return active_con || _hunting();
895 }
896
897 bool MonClient::_hunting() const
898 {
899 return !pending_cons.empty();
900 }
901
902 void MonClient::_start_hunting()
903 {
904 ceph_assert(!_hunting());
905 // adjust timeouts if necessary
906 if (!had_a_connection)
907 return;
908 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
909 if (reopen_interval_multiplier >
910 cct->_conf->mon_client_hunt_interval_max_multiple) {
911 reopen_interval_multiplier =
912 cct->_conf->mon_client_hunt_interval_max_multiple;
913 }
914 }
915
916 void MonClient::_finish_hunting(int auth_err)
917 {
918 ldout(cct,10) << __func__ << " " << auth_err << dendl;
919 ceph_assert(ceph_mutex_is_locked(monc_lock));
920 // the pending conns have been cleaned.
921 ceph_assert(!_hunting());
922 if (active_con) {
923 auto con = active_con->get_con();
924 ldout(cct, 1) << "found mon."
925 << monmap.get_name(con->get_peer_addr())
926 << dendl;
927 } else {
928 ldout(cct, 1) << "no mon sessions established" << dendl;
929 }
930
931 had_a_connection = true;
932 _un_backoff();
933
934 if (!auth_err) {
935 last_rotating_renew_sent = utime_t();
936 while (!waiting_for_session.empty()) {
937 _send_mon_message(std::move(waiting_for_session.front()));
938 waiting_for_session.pop_front();
939 }
940 _resend_mon_commands();
941 send_log(true);
942 if (active_con) {
943 auth = std::move(active_con->get_auth());
944 if (global_id && global_id != active_con->get_global_id()) {
945 lderr(cct) << __func__ << " global_id changed from " << global_id
946 << " to " << active_con->get_global_id() << dendl;
947 }
948 global_id = active_con->get_global_id();
949 }
950 }
951 }
952
/**
 * Periodic timer callback. A scope guard re-arms the next tick via
 * schedule_tick() on every exit path, including the early returns.
 *
 * Presumably runs with monc_lock held, since `timer` was constructed with
 * that lock -- TODO confirm against SafeTimer.
 *
 * Every tick checks auth tickets and tell-command state. While hunting, the
 * hunt is restarted via _reopen_session(). With an active session, the tick:
 *  - renews subscriptions if the peer lacks CEPH_FEATURE_MON_STATEFUL_SUB;
 *  - sends a keepalive every mon_client_ping_interval, reopening the session
 *    if a KEEPALIVE2-capable peer has not acked within
 *    mon_client_ping_timeout;
 *  - flushes the cluster log every mon_client_log_interval.
 */
void MonClient::tick()
{
  ldout(cct, 10) << __func__ << dendl;

  utime_t now = ceph_clock_now();

  auto reschedule_tick = make_scope_guard([this] {
    schedule_tick();
  });

  _check_auth_tickets();
  _check_tell_commands();

  if (_hunting()) {
    ldout(cct, 1) << "continuing hunt" << dendl;
    return _reopen_session();
  } else if (active_con) {
    // just renew as needed
    auto cur_con = active_con->get_con();
    // peers with stateful subscriptions don't need periodic renewal
    if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
      const bool maybe_renew = sub.need_renew();
      ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
                     << dendl;
      if (maybe_renew) {
        _renew_subs();
      }
    }

    if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
      cur_con->send_keepalive();
      last_keepalive = now;

      // only KEEPALIVE2 peers report ack times we can check
      if (cct->_conf->mon_client_ping_timeout > 0 &&
          cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
        utime_t lk = cur_con->get_last_keepalive_ack();
        utime_t interval = now - lk;
        if (interval > cct->_conf->mon_client_ping_timeout) {
          ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
                        << " seconds), reconnecting" << dendl;
          return _reopen_session();
        }
      }

      _un_backoff();
    }

    if (now > last_send_log + cct->_conf->mon_client_log_interval) {
      send_log();
      last_send_log = now;
    }
  }
}
1005
1006 void MonClient::_un_backoff()
1007 {
1008 // un-backoff our reconnect interval
1009 reopen_interval_multiplier = std::max(
1010 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
1011 reopen_interval_multiplier /
1012 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
1013 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
1014 << reopen_interval_multiplier << dendl;
1015 }
1016
1017 void MonClient::schedule_tick()
1018 {
1019 auto do_tick = make_lambda_context([this](int) { tick(); });
1020 if (!is_connected()) {
1021 // start another round of hunting
1022 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
1023 reopen_interval_multiplier);
1024 timer.add_event_after(hunt_interval, do_tick);
1025 } else {
1026 // keep in touch
1027 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
1028 cct->_conf->mon_client_log_interval),
1029 do_tick);
1030 }
1031 }
1032
1033 // ---------
1034
1035 void MonClient::_renew_subs()
1036 {
1037 ceph_assert(ceph_mutex_is_locked(monc_lock));
1038 if (!sub.have_new()) {
1039 ldout(cct, 10) << __func__ << " - empty" << dendl;
1040 return;
1041 }
1042
1043 ldout(cct, 10) << __func__ << dendl;
1044 if (!_opened())
1045 _reopen_session();
1046 else {
1047 auto m = ceph::make_message<MMonSubscribe>();
1048 m->what = sub.get_subs();
1049 m->hostname = ceph_get_short_hostname();
1050 _send_mon_message(std::move(m));
1051 sub.renewed();
1052 }
1053 }
1054
1055 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1056 {
1057 sub.acked(m->interval);
1058 m->put();
1059 }
1060
1061 int MonClient::_check_auth_tickets()
1062 {
1063 ldout(cct, 10) << __func__ << dendl;
1064 ceph_assert(ceph_mutex_is_locked(monc_lock));
1065 if (active_con && auth) {
1066 if (auth->need_tickets()) {
1067 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1068 auto m = ceph::make_message<MAuth>();
1069 m->protocol = auth->get_protocol();
1070 auth->prepare_build_request();
1071 auth->build_request(m->auth_payload);
1072 _send_mon_message(m);
1073 }
1074
1075 _check_auth_rotating();
1076 }
1077 return 0;
1078 }
1079
// Called with monc_lock held.  If our principal uses rotating keys and we have
// an authenticated mon session, check the rotating keyring and, when
// need_new_secrets(cutoff) says so, send an MAuth rotating-key request built
// by the auth handler.  Requests are throttled to at most about one per
// second.  Every path returns 0; the outcome is only visible in the log.
int MonClient::_check_auth_rotating()
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  // nothing to do without a rotating keyring, or if our entity does not use
  // rotating keys
  if (!rotating_secrets ||
      !auth_principal_needs_rotating_keys(entity_name)) {
    ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
    return 0;
  }

  // we can only ask for keys over an established auth session
  if (!active_con || !auth) {
    ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
    return 0;
  }

  utime_t now = ceph_clock_now();
  // cutoff = now - min(30s, auth_service_ticket_ttl / 4); wait_auth_rotating()
  // uses the same formula
  utime_t cutoff = now;
  cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
  // now - auth_service_ticket_ttl; only used for the clock-skew warning below
  utime_t issued_at_lower_bound = now;
  issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
  if (!rotating_secrets->need_new_secrets(cutoff)) {
    ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
    rotating_secrets->dump_rotating();
    return 0;
  }

  ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
  // Warn when need_new_secrets() (no argument) is false but
  // need_new_secrets(issued_at_lower_bound) is true.  The original comment
  // reads this as keys that expired before they were issued, i.e. a likely
  // clock skew.
  if (!rotating_secrets->need_new_secrets() &&
      rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
    // the key has expired before it has been issued?
    lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
	       << " (before " << issued_at_lower_bound << ")" << dendl;
  }
  // Throttle: skip if the previous request went out less than 1s ago.  If
  // the clock went backwards (now <= last send), the request is not blocked.
  if ((now > last_rotating_renew_sent) &&
      double(now - last_rotating_renew_sent) < 1) {
    ldout(cct, 10) << __func__ << " called too often (last: "
		   << last_rotating_renew_sent << "), skipping refresh" << dendl;
    return 0;
  }
  auto m = ceph::make_message<MAuth>();
  m->protocol = auth->get_protocol();
  // only stamp/send when the handler actually produced a request
  if (auth->build_rotating_request(m->auth_payload)) {
    last_rotating_renew_sent = now;
    _send_mon_message(std::move(m));
  }
  return 0;
}
1126
1127 int MonClient::wait_auth_rotating(double timeout)
1128 {
1129 std::unique_lock l(monc_lock);
1130
1131 // Must be initialized
1132 ceph_assert(auth != nullptr);
1133
1134 if (auth->get_protocol() == CEPH_AUTH_NONE)
1135 return 0;
1136
1137 if (!rotating_secrets)
1138 return 0;
1139
1140 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1141 utime_t cutoff = ceph_clock_now();
1142 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1143 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [this, cutoff] {
1144 return (!auth_principal_needs_rotating_keys(entity_name) ||
1145 !rotating_secrets->need_new_secrets(cutoff));
1146 })) {
1147 ldout(cct, 10) << __func__ << " done" << dendl;
1148 return 0;
1149 } else {
1150 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1151 return -ETIMEDOUT;
1152 }
1153 }
1154
1155 // ---------
1156
// Send (or re-send) the command `r`.
//
// Tell-style commands (r->is_tell()) target one monitor, either by rank
// (target_rank >= 0) or by name (target_name):
//  - every call counts as a send attempt.  Once send_attempts exceeds
//    mon_client_directed_command_retry, the command is failed with
//    monc_errc::mon_unavailable.
//  - if the monmap's min_mon_release is octopus or newer, the command gets a
//    dedicated anonymous connection and MonConnection session to the target,
//    and an MCommand is queued on that session.
//  - for older mons, the command goes over the regular mon session as an
//    MMonCommand.  If that session is to a different monitor, it is reopened
//    towards the target first, and nothing is sent on this call.
// All other commands are sent as an MMonCommand on the current mon session.
// An unknown target rank/name fails the command (rank_dne / mon_dne), and
// _finish_command() then erases it from mon_commands and deletes `r`.
void MonClient::_send_command(MonCommand *r)
{
  if (r->is_tell()) {
    ++r->send_attempts;
    if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
      _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
      return;
    }
    // tell-style command
    if (monmap.min_mon_release >= ceph_release_t::octopus) {
      // drop the connection left over from a previous attempt, if any
      if (r->target_con) {
	r->target_con->mark_down();
      }
      if (r->target_rank >= 0) {
	if (r->target_rank >= (int)monmap.size()) {
	  ldout(cct, 10) << " target " << r->target_rank
			 << " >= max mon " << monmap.size() << dendl;
	  _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
	  return;
	}
	r->target_con = messenger->connect_to_mon(
	  monmap.get_addrs(r->target_rank), true /* anon */);
      } else {
	if (!monmap.contains(r->target_name)) {
	  ldout(cct, 10) << " target " << r->target_name
			 << " not present in monmap" << dendl;
	  _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
	  return;
	}
	r->target_con = messenger->connect_to_mon(
	  monmap.get_addrs(r->target_name), true /* anon */);
      }

      // A fresh session authenticates the new connection.  The MCommand is
      // handed to it with queue_command().  Presumably it is held as
      // pending_tell_command until auth completes (see
      // MonConnection::handle_auth_done()/authenticate()); confirm in
      // queue_command().
      r->target_session.reset(new MonConnection(cct, r->target_con, 0,
						&auth_registry));
      r->target_session->start(monmap.get_epoch(), entity_name);
      // _check_tell_commands() compares this against mon_client_hunt_interval
      // to decide when to retry
      r->last_send_attempt = ceph_clock_now();

      MCommand *m = new MCommand(monmap.fsid);
      m->set_tid(r->tid);
      m->cmd = r->cmd;
      m->set_data(r->inbl);
      r->target_session->queue_command(m);
      return;
    }

    // ugly legacy handling of pre-octopus mons
    entity_addr_t peer;
    if (active_con) {
      peer = active_con->get_con()->get_peer_addr();
    }

    // Wrong monitor: reopen the session towards the target and return.  The
    // command stays in mon_commands; presumably it is re-sent once the new
    // session is up (cf. _resend_mon_commands()).
    if (r->target_rank >= 0 &&
	r->target_rank != monmap.get_rank(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
		     << " wants rank " << r->target_rank
		     << ", reopening session"
		     << dendl;
      if (r->target_rank >= (int)monmap.size()) {
	ldout(cct, 10) << " target " << r->target_rank
		       << " >= max mon " << monmap.size() << dendl;
	_finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
	return;
      }
      _reopen_session(r->target_rank);
      return;
    }
    if (r->target_name.length() &&
	r->target_name != monmap.get_name(peer)) {
      ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
		     << " wants mon " << r->target_name
		     << ", reopening session"
		     << dendl;
      if (!monmap.contains(r->target_name)) {
	ldout(cct, 10) << " target " << r->target_name
		       << " not present in monmap" << dendl;
	_finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
	return;
      }
      _reopen_session(monmap.get_rank(r->target_name));
      return;
    }
    // fall-thru to send 'normal' CLI command
  }

  // normal CLI command
  ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
  auto m = ceph::make_message<MMonCommand>(monmap.fsid);
  m->set_tid(r->tid);
  m->cmd = r->cmd;
  m->set_data(r->inbl);
  _send_mon_message(std::move(m));
  return;
}
1251
1252 void MonClient::_check_tell_commands()
1253 {
1254 // resend any requests
1255 auto now = ceph_clock_now();
1256 auto p = mon_commands.begin();
1257 while (p != mon_commands.end()) {
1258 auto cmd = p->second;
1259 ++p;
1260 if (cmd->is_tell() &&
1261 cmd->last_send_attempt != utime_t() &&
1262 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1263 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1264 _send_command(cmd); // might remove cmd from mon_commands
1265 }
1266 }
1267 }
1268
1269 void MonClient::_resend_mon_commands()
1270 {
1271 // resend any requests
1272 auto p = mon_commands.begin();
1273 while (p != mon_commands.end()) {
1274 auto cmd = p->second;
1275 ++p;
1276 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1277 // starting with octopus, tell commands use their own connetion and need no
1278 // special resend when we finish hunting.
1279 } else {
1280 _send_command(cmd); // might remove cmd from mon_commands
1281 }
1282 }
1283 }
1284
1285 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1286 {
1287 MonCommand *r = NULL;
1288 uint64_t tid = ack->get_tid();
1289
1290 if (tid == 0 && !mon_commands.empty()) {
1291 r = mon_commands.begin()->second;
1292 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1293 } else {
1294 auto p = mon_commands.find(tid);
1295 if (p == mon_commands.end()) {
1296 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1297 ack->put();
1298 return;
1299 }
1300 r = p->second;
1301 }
1302
1303 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1304 auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
1305 : bs::error_code();
1306 _finish_command(r, ec, ack->rs,
1307 std::move(ack->get_data()));
1308 ack->put();
1309 }
1310
1311 void MonClient::handle_command_reply(MCommandReply *reply)
1312 {
1313 MonCommand *r = NULL;
1314 uint64_t tid = reply->get_tid();
1315
1316 if (tid == 0 && !mon_commands.empty()) {
1317 r = mon_commands.begin()->second;
1318 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1319 << dendl;
1320 } else {
1321 auto p = mon_commands.find(tid);
1322 if (p == mon_commands.end()) {
1323 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1324 << dendl;
1325 reply->put();
1326 return;
1327 }
1328 r = p->second;
1329 }
1330
1331 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1332 auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
1333 : bs::error_code();
1334 _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
1335 reply->put();
1336 }
1337
1338 int MonClient::_cancel_mon_command(uint64_t tid)
1339 {
1340 ceph_assert(ceph_mutex_is_locked(monc_lock));
1341
1342 auto it = mon_commands.find(tid);
1343 if (it == mon_commands.end()) {
1344 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1345 return -ENOENT;
1346 }
1347
1348 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1349
1350 MonCommand *cmd = it->second;
1351 _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
1352 return 0;
1353 }
1354
1355 void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
1356 std::string_view rs, ceph::buffer::list&& bl)
1357 {
1358 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
1359 << dendl;
1360 ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1361 std::move(bl));
1362 if (r->target_con) {
1363 r->target_con->mark_down();
1364 }
1365 mon_commands.erase(r->tid);
1366 delete r;
1367 }
1368
1369 // ---------
1370
1371 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1372 {
1373 ceph_assert(ceph_mutex_is_locked(monc_lock));
1374 auto iter = version_requests.find(m->handle);
1375 if (iter == version_requests.end()) {
1376 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1377 << " not found" << dendl;
1378 } else {
1379 auto req = std::move(iter->second);
1380 ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
1381 << m->version << dendl;
1382 version_requests.erase(iter);
1383 ceph::async::post(std::move(req), bs::error_code(),
1384 m->version, m->oldest_version);
1385 }
1386 m->put();
1387 }
1388
// Auth-client hook: produce the initial auth request for outgoing connection
// `con`.  Takes monc_lock.
//
// Monitor peers: delegate to the MonConnection that owns `con`.  That is
// either the dedicated session of a tell command (anonymous connections) or
// one of pending_cons.  Returns -ENOENT if no session owns `con`.
// Other peers: build an authorizer with our auth handler, store it in
// auth_meta, report its modes in *preferred_modes and its payload in *bl.
// Returns -EACCES if there is no handler or it cannot build an authorizer.
int MonClient::get_auth_request(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint32_t *auth_method,
  std::vector<uint32_t> *preferred_modes,
  ceph::buffer::list *bl)
{
  std::lock_guard l(monc_lock);
  ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
		<< dendl;

  // connection to mon?
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    ceph_assert(!auth_meta->authorizer);
    // tell commands talk to their target mon over an anonymous connection
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  return i.second->target_session->get_auth_request(
	    auth_method, preferred_modes, bl,
	    entity_name, want_keys, rotating_secrets.get());
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	return i.second.get_auth_request(
	  auth_method, preferred_modes, bl,
	  entity_name, want_keys, rotating_secrets.get());
      }
    }
    return -ENOENT;
  }

  // generate authorizer
  if (!auth) {
    lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
    return -EACCES;
  }
  auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
  if (!auth_meta->authorizer) {
    lderr(cct) << __func__ << " failed to build_authorizer for type "
	       << ceph_entity_type_name(con->get_peer_type()) << dendl;
    return -EACCES;
  }
  // NOTE(review): this path sets auth_meta->auth_method but never writes
  // *auth_method.  Presumably the caller passes &auth_meta->auth_method as
  // auth_method; confirm.
  auth_meta->auth_method = auth_meta->authorizer->protocol;
  auth_registry.get_supported_modes(con->get_peer_type(),
				    auth_meta->auth_method,
				    preferred_modes);
  *bl = auth_meta->authorizer->bl;
  return 0;
}
1440
// Auth-client hook: the peer answered our auth request and expects another
// round.  Takes monc_lock.
//
// Monitor peers: delegate to the owning MonConnection (a tell command's
// session for anonymous connections, otherwise one of pending_cons).
// Returns -ENOENT if none owns `con`.
// Other peers: this is an authorizer challenge.  Feed it to the authorizer
// built in get_auth_request() and return the updated authorizer payload in
// *reply.  Returns -1 if there is no auth handler or no authorizer.
int MonClient::handle_auth_reply_more(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  const ceph::buffer::list& bl,
  ceph::buffer::list *reply)
{
  std::lock_guard l(monc_lock);

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  return i.second->target_session->handle_auth_reply_more(
	    auth_meta, bl, reply);
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	return i.second.handle_auth_reply_more(auth_meta, bl, reply);
      }
    }
    return -ENOENT;
  }

  // authorizer challenges
  if (!auth || !auth_meta->authorizer) {
    lderr(cct) << __func__ << " no authorizer?" << dendl;
    return -1;
  }
  auth_meta->authorizer->add_challenge(cct, bl);
  *reply = auth_meta->authorizer->bl;
  return 0;
}
1475
// Auth-client hook: authentication with the peer has finished.
//
// Monitor peers (under monc_lock):
//  - a tell command's anonymous connection: delegate to its session.
//  - one of pending_cons: delegate to that candidate.
//      * on failure, drop that candidate.  If other candidates remain, just
//        report the failure for this connection.
//      * on success, promote the candidate to active_con and drop all other
//        candidates.
//    In both the success and the last-failure cases, _finish_hunting(r) runs.
//    _finish_auth(r) also runs when auth failed or we already hold a monmap
//    (epoch > 0).
//  Returns -ENOENT if no session owns `con`.
// Other peers: verify the authorizer reply and adopt the authorizer's session
// key (monc_lock is not taken on this path).  Returns -EACCES if
// verification fails.
int MonClient::handle_auth_done(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint64_t global_id,
  uint32_t con_mode,
  const ceph::buffer::list& bl,
  CryptoKey *session_key,
  std::string *connection_secret)
{
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    std::lock_guard l(monc_lock);
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  return i.second->target_session->handle_auth_done(
	    auth_meta, global_id, bl,
	    session_key, connection_secret);
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	int r = i.second.handle_auth_done(
	  auth_meta, global_id, bl,
	  session_key, connection_secret);
	// Both branches below invalidate `i` (erase / clear).  That is safe
	// only because every path after them returns without touching the
	// loop again.  NOTE(review): erase(i.first) passes a key that refers
	// into the element being erased; confirm the std::map implementation
	// locates the range before destroying the node.
	if (r) {
	  pending_cons.erase(i.first);
	  if (!pending_cons.empty()) {
	    return r;
	  }
	} else {
	  active_con.reset(new MonConnection(std::move(i.second)));
	  pending_cons.clear();
	  ceph_assert(active_con->have_session());
	}

	_finish_hunting(r);
	if (r || monmap.get_epoch() > 0) {
	  _finish_auth(r);
	}
	return r;
      }
    }
    return -ENOENT;
  } else {
    // verify authorizer reply
    auto p = bl.begin();
    if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
      ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
		    << dendl;
      return -EACCES;
    }
    auth_meta->session_key = auth_meta->authorizer->session_key;
    return 0;
  }
}
1532
// Auth-client hook: the peer rejected `old_auth_method` (with `result`) and
// listed the methods/modes it would accept.  The allowed methods are recorded
// in auth_meta before monc_lock is taken.
//
// Monitor peers, delegated to the owning MonConnection:
//  - a tell command's session: if no other method can be tried (r < 0), the
//    command is failed with "auth failed".  That erases and deletes it, so
//    `i` is not used afterwards.
//  - one of pending_cons: r == 0 means retry this connection with another
//    method.  Otherwise drop the candidate.  If it was the last one, finish
//    hunting and authentication with r.
//  Returns -ENOENT if no session owns `con`.
// Other peers: nothing to retry; log and return -EACCES.
int MonClient::handle_auth_bad_method(
  Connection *con,
  AuthConnectionMeta *auth_meta,
  uint32_t old_auth_method,
  int result,
  const std::vector<uint32_t>& allowed_methods,
  const std::vector<uint32_t>& allowed_modes)
{
  auth_meta->allowed_methods = allowed_methods;

  std::lock_guard l(monc_lock);
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
    if (con->is_anon()) {
      for (auto& i : mon_commands) {
	if (i.second->target_con == con) {
	  int r = i.second->target_session->handle_auth_bad_method(
	    old_auth_method,
	    result,
	    allowed_methods,
	    allowed_modes);
	  if (r < 0) {
	    auto ec = bs::error_code(-r, mon_category());
	    _finish_command(i.second, ec, "auth failed"sv, {});
	  }
	  return r;
	}
      }
    }
    for (auto& i : pending_cons) {
      if (i.second.is_con(con)) {
	int r = i.second.handle_auth_bad_method(old_auth_method,
						result,
						allowed_methods,
						allowed_modes);
	if (r == 0) {
	  return r; // try another method on this con
	}
	pending_cons.erase(i.first);
	if (!pending_cons.empty()) {
	  return r; // fail this con, maybe another con will succeed
	}
	// fail hunt
	_finish_hunting(r);
	_finish_auth(r);
	return r;
      }
    }
    return -ENOENT;
  } else {
    // huh...
    ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
		  << " result " << cpp_strerror(result)
		  << " and auth is " << (auth ? auth->get_protocol() : 0)
		  << dendl;
    return -EACCES;
  }
}
1590
1591 int MonClient::handle_auth_request(
1592 Connection *con,
1593 AuthConnectionMeta *auth_meta,
1594 bool more,
1595 uint32_t auth_method,
1596 const ceph::buffer::list& payload,
1597 ceph::buffer::list *reply)
1598 {
1599 if (payload.length() == 0) {
1600 // for some channels prior to nautilus (osd heartbeat), we
1601 // tolerate the lack of an authorizer.
1602 if (!con->get_messenger()->require_authorizer) {
1603 handle_authentication_dispatcher->ms_handle_authentication(con);
1604 return 1;
1605 }
1606 return -EACCES;
1607 }
1608 auth_meta->auth_mode = payload[0];
1609 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1610 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1611 return -EACCES;
1612 }
1613 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1614 auth_method);
1615 if (!ah) {
1616 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1617 << auth_method << dendl;
1618 return -EOPNOTSUPP;
1619 }
1620
1621 auto ac = &auth_meta->authorizer_challenge;
1622 if (auth_meta->skip_authorizer_challenge) {
1623 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
1624 ac = nullptr;
1625 }
1626
1627 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1628 bool isvalid = ah->verify_authorizer(
1629 cct,
1630 *rotating_secrets,
1631 payload,
1632 auth_meta->get_connection_secret_length(),
1633 reply,
1634 &con->peer_name,
1635 &con->peer_global_id,
1636 &con->peer_caps_info,
1637 &auth_meta->session_key,
1638 &auth_meta->connection_secret,
1639 ac);
1640 if (isvalid) {
1641 handle_authentication_dispatcher->ms_handle_authentication(con);
1642 return 1;
1643 }
1644 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1645 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1646 return 0;
1647 }
1648 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1649 // discard old challenge
1650 auth_meta->authorizer_challenge.reset();
1651 return -EACCES;
1652 }
1653
1654 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1655 std::lock_guard l(monc_lock);
1656 if (auth) {
1657 return auth->build_authorizer(service_id);
1658 } else {
1659 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1660 << ", but no auth is available now" << dendl;
1661 return nullptr;
1662 }
1663 }
1664
// The code below implements MonConnection.  Switch the log prefix to one that
// calls have_session() unqualified, so from here on ldout/lderr can only be
// used where have_session() resolves (MonConnection members).
// dout_subsys is redefined identically to the definition at the top of the file.
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1668
1669 MonConnection::MonConnection(
1670 CephContext *cct, ConnectionRef con, uint64_t global_id,
1671 AuthRegistry *ar)
1672 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1673 {}
1674
1675 MonConnection::~MonConnection()
1676 {
1677 if (con) {
1678 con->mark_down();
1679 con.reset();
1680 }
1681 }
1682
1683 bool MonConnection::have_session() const
1684 {
1685 return state == State::HAVE_SESSION;
1686 }
1687
// Begin authenticating on this connection and record the start time
// (auth_start; handle_auth_done() later uses it as the last keepalive-ack
// time).
//  - msgr2 peers: enter AUTHENTICATING and only request the monmap.  The auth
//    exchange itself is presumably driven by the messenger through
//    get_auth_request()/handle_auth_*().
//  - legacy peers: enter NEGOTIATING, send a keepalive, then an MAuth with
//    protocol CEPH_AUTH_UNKNOWN listing the auth methods we support.
//    handle_auth() processes the monitor's reply.
void MonConnection::start(epoch_t epoch,
			  const EntityName& entity_name)
{
  using ceph::encode;
  auth_start = ceph_clock_now();

  if (con->get_peer_addr().is_msgr2()) {
    ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
    state = State::AUTHENTICATING;
    con->send_message(new MMonGetMap());
    return;
  }

  // restart authentication handshake
  state = State::NEGOTIATING;

  // send an initial keepalive to ensure our timestamp is valid by the
  // time we are in an OPENED state (by sequencing this before
  // authentication).
  con->send_keepalive();

  // payload: struct_v, supported methods, entity name, global_id (in order)
  auto m = new MAuth;
  m->protocol = CEPH_AUTH_UNKNOWN;
  m->monmap_epoch = epoch;
  __u8 struct_v = 1;
  encode(struct_v, m->auth_payload);
  std::vector<uint32_t> auth_supported;
  auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
  encode(auth_supported, m->auth_payload);
  encode(entity_name, m->auth_payload);
  encode(global_id, m->auth_payload);
  con->send_message(m);
}
1721
// msgr2 hook: build the initial auth request for this mon connection.
// Uses auth_method if one was already chosen (handle_auth_bad_method() sets
// it), otherwise the first method auth_registry supports.  Reports the method
// in *method and the connection modes we prefer in *preferred_modes.  It then
// (re)initializes the auth handler and encodes AUTH_MODE_MON, our entity
// name, our global_id and any method-specific payload into *bl.
// Returns -EACCES if no method or no mode is available, 0 otherwise.
int MonConnection::get_auth_request(
  uint32_t *method,
  std::vector<uint32_t> *preferred_modes,
  ceph::buffer::list *bl,
  const EntityName& entity_name,
  uint32_t want_keys,
  RotatingKeyRing* keyring)
{
  using ceph::encode;
  // choose method
  if (auth_method < 0) {
    std::vector<uint32_t> as;
    auth_registry->get_supported_methods(con->get_peer_type(), &as);
    if (as.empty()) {
      return -EACCES;
    }
    auth_method = as.front();
  }
  *method = auth_method;
  auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
				     preferred_modes);
  ldout(cct,10) << __func__ << " method " << *method
		<< " preferred_modes " << *preferred_modes << dendl;
  if (preferred_modes->empty()) {
    return -EACCES;
  }

  // NOTE(review): _init_auth() returns -ENOTSUP when no handler exists for
  // the method, which would trip this assert instead of failing the request.
  int r = _init_auth(*method, entity_name, want_keys, keyring, true);
  ceph_assert(r == 0);

  // initial request includes some boilerplate...
  encode((char)AUTH_MODE_MON, *bl);
  encode(entity_name, *bl);
  encode(global_id, *bl);

  // and (maybe) some method-specific initial payload
  auth->build_initial_request(bl);

  return 0;
}
1762
// msgr2 hook: the monitor sent another round of auth data.  Feed it to the
// auth handler (which may fill in auth_meta's session key and connection
// secret).  On -EAGAIN, build the next request into *reply and return 0.
// Other negative results are logged and returned.  Completing authentication
// at this step is not implemented and aborts (see FIXME).
int MonConnection::handle_auth_reply_more(
  AuthConnectionMeta *auth_meta,
  const ceph::buffer::list& bl,
  ceph::buffer::list *reply)
{
  ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
  ldout(cct, 30) << __func__ << " got\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  auto p = bl.cbegin();
  ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
  int r = auth->handle_response(0, p, &auth_meta->session_key,
				&auth_meta->connection_secret);
  if (r == -EAGAIN) {
    auth->prepare_build_request();
    auth->build_request(*reply);
    ldout(cct, 10) << __func__ << " responding with " << reply->length()
		   << " bytes" << dendl;
    r = 0;
  } else if (r < 0) {
    lderr(cct) << __func__ << " handle_response returned " << r << dendl;
  } else {
    ldout(cct, 10) << __func__ << " authenticated!" << dendl;
    // FIXME
    ceph_abort(cct, "write me");
  }
  return r;
}
1792
1793 int MonConnection::handle_auth_done(
1794 AuthConnectionMeta *auth_meta,
1795 uint64_t new_global_id,
1796 const ceph::buffer::list& bl,
1797 CryptoKey *session_key,
1798 std::string *connection_secret)
1799 {
1800 ldout(cct,10) << __func__ << " global_id " << new_global_id
1801 << " payload " << bl.length()
1802 << dendl;
1803 global_id = new_global_id;
1804 auth->set_global_id(global_id);
1805 auto p = bl.begin();
1806 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1807 &auth_meta->connection_secret);
1808 if (auth_err >= 0) {
1809 state = State::HAVE_SESSION;
1810 }
1811 con->set_last_keepalive_ack(auth_start);
1812
1813 if (pending_tell_command) {
1814 con->send_message2(std::move(pending_tell_command));
1815 }
1816 return auth_err;
1817 }
1818
1819 int MonConnection::handle_auth_bad_method(
1820 uint32_t old_auth_method,
1821 int result,
1822 const std::vector<uint32_t>& allowed_methods,
1823 const std::vector<uint32_t>& allowed_modes)
1824 {
1825 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1826 << " result " << cpp_strerror(result)
1827 << " allowed_methods " << allowed_methods << dendl;
1828 std::vector<uint32_t> auth_supported;
1829 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1830 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1831 old_auth_method);
1832 assert(p != auth_supported.end());
1833 p = std::find_first_of(std::next(p), auth_supported.end(),
1834 allowed_methods.begin(), allowed_methods.end());
1835 if (p == auth_supported.end()) {
1836 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1837 << " but i only support " << auth_supported << dendl;
1838 return -EACCES;
1839 }
1840 auth_method = *p;
1841 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1842 return 0;
1843 }
1844
1845 int MonConnection::handle_auth(MAuthReply* m,
1846 const EntityName& entity_name,
1847 uint32_t want_keys,
1848 RotatingKeyRing* keyring)
1849 {
1850 if (state == State::NEGOTIATING) {
1851 int r = _negotiate(m, entity_name, want_keys, keyring);
1852 if (r) {
1853 return r;
1854 }
1855 state = State::AUTHENTICATING;
1856 }
1857 int r = authenticate(m);
1858 if (!r) {
1859 state = State::HAVE_SESSION;
1860 }
1861 return r;
1862 }
1863
1864 int MonConnection::_negotiate(MAuthReply *m,
1865 const EntityName& entity_name,
1866 uint32_t want_keys,
1867 RotatingKeyRing* keyring)
1868 {
1869 ldout(cct, 10) << __func__ << dendl;
1870 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1871 if (r == -ENOTSUP) {
1872 if (m->result == -ENOTSUP) {
1873 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1874 << dendl;
1875 }
1876 return m->result;
1877 }
1878 return r;
1879 }
1880
// Make `auth` a handler for `method`.  If the current handler already speaks
// that protocol, it is just reset.  Otherwise a new one is created (-ENOTSUP
// if none exists) and configured with want_keys, entity_name and global_id.
// `msgr2` is false on the legacy MAuth path, where MGR keys are not requested
// from a monitor lacking the SERVER_KRAKEN feature.  Returns 0 on success.
//
// NOTE(review): the reuse path returns before set_want_keys()/init()/
// set_global_id(), so a changed want_keys or global_id is not re-applied to
// a reused handler.  Confirm this is intended.
int MonConnection::_init_auth(
  uint32_t method,
  const EntityName& entity_name,
  uint32_t want_keys,
  RotatingKeyRing* keyring,
  bool msgr2)
{
  ldout(cct, 10) << __func__ << " method " << method << dendl;
  if (auth && auth->get_protocol() == (int)method) {
    ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
    auth->reset();
    return 0;
  }

  ldout(cct, 10) << __func__ << " creating new auth" << dendl;
  auth.reset(AuthClientHandler::create(cct, method, keyring));
  if (!auth) {
    ldout(cct, 10) << " no handler for protocol " << method << dendl;
    return -ENOTSUP;
  }

  // do not request MGR key unless the mon has the SERVER_KRAKEN
  // feature.  otherwise it will give us an auth error.  note that
  // we have to use the FEATUREMASK because pre-jewel the kraken
  // feature bit was used for something else.
  if (!msgr2 &&
      (want_keys & CEPH_ENTITY_TYPE_MGR) &&
      !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
    ldout(cct, 1) << __func__
		  << " not requesting MGR keys from pre-kraken monitor"
		  << dendl;
    want_keys &= ~CEPH_ENTITY_TYPE_MGR;
  }
  auth->set_want_keys(want_keys);
  auth->init(entity_name);
  auth->set_global_id(global_id);
  return 0;
}
1919
// Run one round of the MAuth-message based exchange with reply `m`.
// If the reply carries a different global_id, treat it as a new session:
// reset the handler and adopt the id.  A zero global_id is only logged.
// The handler then processes the reply.  On -EAGAIN another MAuth request
// is sent.  On 0, any tell command queued while authenticating is sent.
// Returns the handler's result.
int MonConnection::authenticate(MAuthReply *m)
{
  ceph_assert(auth);
  if (!m->global_id) {
    ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
  }
  if (m->global_id != global_id) {
    // it's a new session
    auth->reset();
    global_id = m->global_id;
    auth->set_global_id(global_id);
    ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
  }
  auto p = m->result_bl.cbegin();
  int ret = auth->handle_response(m->result, p, nullptr, nullptr);
  if (ret == -EAGAIN) {
    // the handler wants another round trip
    auto ma = new MAuth;
    ma->protocol = auth->get_protocol();
    auth->prepare_build_request();
    auth->build_request(ma->auth_payload);
    con->send_message(ma);
  }
  if (ret == 0 && pending_tell_command) {
    con->send_message2(std::move(pending_tell_command));
  }

  return ret;
}
1948
1949 void MonClient::register_config_callback(md_config_t::config_callback fn) {
1950 ceph_assert(!config_cb);
1951 config_cb = fn;
1952 }
1953
1954 md_config_t::config_callback MonClient::get_config_callback() {
1955 return config_cb;
1956 }
1957
// Error category for monc_errc values.  The pragmas silence
// -Wnon-virtual-dtor for this declaration.  Presumably the destructor
// inherited from the boost::system::error_category base is non-virtual;
// confirm.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
class monc_error_category : public ceph::converting_category {
public:
  // User-provided (not `= default`).  Presumably kept so that the
  // `static const monc_error_category` in monc_category() is a valid const
  // default-initialization; confirm before changing.
  monc_error_category(){}
  const char* name() const noexcept override;
  const char* message(int ev, char*, std::size_t) const noexcept override;
  std::string message(int ev) const override;
  bs::error_condition default_error_condition(int ev) const noexcept
    override;
  bool equivalent(int ev, const bs::error_condition& c) const
    noexcept override;
  // un-hide the base class's other equivalent() overload(s)
  using ceph::converting_category::equivalent;
  // converts a monc_errc value to a negative errno
  int from_code(int ev) const noexcept override;
};
#pragma GCC diagnostic pop
#pragma clang diagnostic pop
1977
1978 const char* monc_error_category::name() const noexcept {
1979 return "monc";
1980 }
1981
1982 const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
1983 if (ev == 0)
1984 return "No error";
1985
1986 switch (static_cast<monc_errc>(ev)) {
1987 case monc_errc::shutting_down: // Command failed due to MonClient shutting down
1988 return "Command failed due to MonClient shutting down";
1989 case monc_errc::session_reset:
1990 return "Monitor session was reset";
1991 case monc_errc::rank_dne:
1992 return "Requested monitor rank does not exist";
1993 case monc_errc::mon_dne:
1994 return "Requested monitor does not exist";
1995 case monc_errc::timed_out:
1996 return "Monitor operation timed out";
1997 case monc_errc::mon_unavailable:
1998 return "Monitor unavailable";
1999 }
2000
2001 return "Unknown error";
2002 }
2003
2004 std::string monc_error_category::message(int ev) const {
2005 return message(ev, nullptr, 0);
2006 }
2007
2008 bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
2009 switch (static_cast<monc_errc>(ev)) {
2010 case monc_errc::shutting_down:
2011 return bs::errc::operation_canceled;
2012 case monc_errc::session_reset:
2013 return bs::errc::resource_unavailable_try_again;
2014 case monc_errc::rank_dne:
2015 [[fallthrough]];
2016 case monc_errc::mon_dne:
2017 return ceph::errc::not_in_map;
2018 case monc_errc::timed_out:
2019 return bs::errc::timed_out;
2020 case monc_errc::mon_unavailable:
2021 return bs::errc::no_such_device;
2022 }
2023 return { ev, *this };
2024 }
2025
2026 bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
2027 switch (static_cast<monc_errc>(ev)) {
2028 case monc_errc::rank_dne:
2029 [[fallthrough]];
2030 case monc_errc::mon_dne:
2031 return c == bs::errc::no_such_file_or_directory;
2032 default:
2033 return default_error_condition(ev) == c;
2034 }
2035 }
2036
2037 int monc_error_category::from_code(int ev) const noexcept {
2038 if (ev == 0)
2039 return 0;
2040
2041 switch (static_cast<monc_errc>(ev)) {
2042 case monc_errc::shutting_down:
2043 return -ECANCELED;
2044 case monc_errc::session_reset:
2045 return -EAGAIN;
2046 case monc_errc::rank_dne:
2047 [[fallthrough]];
2048 case monc_errc::mon_dne:
2049 return -ENOENT;
2050 case monc_errc::timed_out:
2051 return -ETIMEDOUT;
2052 case monc_errc::mon_unavailable:
2053 return -ENXIO;
2054 }
2055 return -EDOM;
2056 }
2057
2058 const bs::error_category& monc_category() noexcept {
2059 static const monc_error_category c;
2060 return c;
2061 }