]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/random.h"
25 #include "include/scope_guard.h"
26 #include "include/stringify.h"
27
28 #include "messages/MMonGetMap.h"
29 #include "messages/MMonGetVersion.h"
30 #include "messages/MMonGetMap.h"
31 #include "messages/MMonGetVersionReply.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MConfig.h"
34 #include "messages/MAuth.h"
35 #include "messages/MLogAck.h"
36 #include "messages/MAuthReply.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MCommand.h"
40 #include "messages/MCommandReply.h"
41 #include "messages/MPing.h"
42
43 #include "messages/MMonSubscribe.h"
44 #include "messages/MMonSubscribeAck.h"
45 #include "common/errno.h"
46 #include "common/hostname.h"
47 #include "common/LogClient.h"
48
49 #include "MonClient.h"
50 #include "error_code.h"
51 #include "MonMap.h"
52
53 #include "auth/Auth.h"
54 #include "auth/KeyRing.h"
55 #include "auth/AuthClientHandler.h"
56 #include "auth/AuthRegistry.h"
57 #include "auth/RotatingKeyRing.h"
58
59 #define dout_subsys ceph_subsys_monc
60 #undef dout_prefix
61 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
62
63 namespace bs = boost::system;
64 using std::string;
65 using namespace std::literals;
66
67 MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
68 Dispatcher(cct_),
69 AuthServer(cct_),
70 messenger(NULL),
71 timer(cct_, monc_lock),
72 service(service),
73 initialized(false),
74 log_client(NULL),
75 more_log_pending(false),
76 want_monmap(true),
77 had_a_connection(false),
78 reopen_interval_multiplier(
79 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
80 last_mon_command_tid(0),
81 version_req_id(0)
82 {}
83
84 MonClient::~MonClient()
85 {
86 }
87
88 int MonClient::build_initial_monmap()
89 {
90 ldout(cct, 10) << __func__ << dendl;
91 int r = monmap.build_initial(cct, false, std::cerr);
92 ldout(cct,10) << "monmap:\n";
93 monmap.print(*_dout);
94 *_dout << dendl;
95 return r;
96 }
97
98 int MonClient::get_monmap()
99 {
100 ldout(cct, 10) << __func__ << dendl;
101 std::unique_lock l(monc_lock);
102
103 sub.want("monmap", 0, 0);
104 if (!_opened())
105 _reopen_session();
106 map_cond.wait(l, [this] { return !want_monmap; });
107 ldout(cct, 10) << __func__ << " done" << dendl;
108 return 0;
109 }
110
111 int MonClient::get_monmap_and_config()
112 {
113 ldout(cct, 10) << __func__ << dendl;
114 ceph_assert(!messenger);
115
116 int tries = 10;
117
118 cct->init_crypto();
119 auto shutdown_crypto = make_scope_guard([this] {
120 cct->shutdown_crypto();
121 });
122
123 int r = build_initial_monmap();
124 if (r < 0) {
125 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
126 return r;
127 }
128
129 messenger = Messenger::create_client_messenger(
130 cct, "temp_mon_client");
131 ceph_assert(messenger);
132 messenger->add_dispatcher_head(this);
133 messenger->start();
134 auto shutdown_msgr = make_scope_guard([this] {
135 messenger->shutdown();
136 messenger->wait();
137 delete messenger;
138 messenger = nullptr;
139 if (!monmap.fsid.is_zero()) {
140 cct->_conf.set_val("fsid", stringify(monmap.fsid));
141 }
142 });
143
144 want_bootstrap_config = true;
145 auto shutdown_config = make_scope_guard([this] {
146 std::unique_lock l(monc_lock);
147 want_bootstrap_config = false;
148 bootstrap_config.reset();
149 });
150
151 ceph::ref_t<MConfig> config;
152 while (tries-- > 0) {
153 r = init();
154 if (r < 0) {
155 return r;
156 }
157 r = authenticate(
158 cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout").count());
159 if (r < 0) {
160 break;
161 }
162 {
163 std::unique_lock l(monc_lock);
164 if (monmap.get_epoch() &&
165 !monmap.persistent_features.contains_all(
166 ceph::features::mon::FEATURE_MIMIC)) {
167 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
168 << dendl;
169 r = 0;
170 break;
171 }
172 while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
173 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
174 auto status = map_cond.wait_for(l, ceph::make_timespan(
175 cct->_conf->mon_client_hunt_interval));
176 if (status == std::cv_status::timeout) {
177 r = -ETIMEDOUT;
178 }
179 }
180
181 if (bootstrap_config) {
182 ldout(cct,10) << __func__ << " success" << dendl;
183 config = std::move(bootstrap_config);
184 r = 0;
185 break;
186 }
187 }
188 lderr(cct) << __func__ << " failed to get config" << dendl;
189 shutdown();
190 continue;
191 }
192
193 if (config) {
194 // apply the bootstrap config to ensure its applied prior to completing
195 // the bootstrap
196 cct->_conf.set_mon_vals(cct, config->config, config_cb);
197 }
198
199 shutdown();
200 return r;
201 }
202
203
204 /**
205 * Ping the monitor with id @p mon_id and set the resulting reply in
206 * the provided @p result_reply, if this last parameter is not NULL.
207 *
208 * So that we don't rely on the MonClient's default messenger, set up
209 * during connect(), we create our own messenger to comunicate with the
210 * specified monitor. This is advantageous in the following ways:
211 *
212 * - Isolate the ping procedure from the rest of the MonClient's operations,
213 * allowing us to not acquire or manage the big monc_lock, thus not
214 * having to block waiting for some other operation to finish before we
215 * can proceed.
216 * * for instance, we can ping mon.FOO even if we are currently hunting
217 * or blocked waiting for auth to complete with mon.BAR.
218 *
219 * - Ping a monitor prior to establishing a connection (using connect())
220 * and properly establish the MonClient's messenger. This frees us
221 * from dealing with the complex foo that happens in connect().
222 *
223 * We also don't rely on MonClient as a dispatcher for this messenger,
224 * unlike what happens with the MonClient's default messenger. This allows
225 * us to sandbox the whole ping, having it much as a separate entity in
226 * the MonClient class, considerably simplifying the handling and dispatching
227 * of messages without needing to consider monc_lock.
228 *
229 * Current drawback is that we will establish a messenger for each ping
230 * we want to issue, instead of keeping a single messenger instance that
231 * would be used for all pings.
232 */
233 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
234 {
235 ldout(cct, 10) << __func__ << dendl;
236
237 string new_mon_id;
238 if (monmap.contains("noname-"+mon_id)) {
239 new_mon_id = "noname-"+mon_id;
240 } else {
241 new_mon_id = mon_id;
242 }
243
244 if (new_mon_id.empty()) {
245 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
246 return -EINVAL;
247 } else if (!monmap.contains(new_mon_id)) {
248 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
249 << dendl;
250 return -ENOENT;
251 }
252
253 // N.B. monc isn't initialized
254
255 auth_registry.refresh_config();
256
257 KeyRing keyring;
258 keyring.from_ceph_context(cct);
259 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
260
261 MonClientPinger *pinger = new MonClientPinger(cct,
262 &rkeyring,
263 result_reply);
264
265 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
266 smsgr->add_dispatcher_head(pinger);
267 smsgr->set_auth_client(pinger);
268 smsgr->start();
269
270 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
271 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
272 << " " << con->get_peer_addr() << dendl;
273
274 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
275 pinger->mc->start(monmap.get_epoch(), entity_name);
276 con->send_message(new MPing);
277
278 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
279 if (ret == 0) {
280 ldout(cct,10) << __func__ << " got ping reply" << dendl;
281 } else {
282 ret = -ret;
283 }
284
285 con->mark_down();
286 pinger->mc.reset();
287 smsgr->shutdown();
288 smsgr->wait();
289 delete smsgr;
290 delete pinger;
291 return ret;
292 }
293
294 bool MonClient::ms_dispatch(Message *m)
295 {
296 // we only care about these message types
297 switch (m->get_type()) {
298 case CEPH_MSG_MON_MAP:
299 case CEPH_MSG_AUTH_REPLY:
300 case CEPH_MSG_MON_SUBSCRIBE_ACK:
301 case CEPH_MSG_MON_GET_VERSION_REPLY:
302 case MSG_MON_COMMAND_ACK:
303 case MSG_COMMAND_REPLY:
304 case MSG_LOGACK:
305 case MSG_CONFIG:
306 break;
307 case CEPH_MSG_PING:
308 m->put();
309 return true;
310 default:
311 return false;
312 }
313
314 std::lock_guard lock(monc_lock);
315
316 if (!m->get_connection()->is_anon() &&
317 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
318 if (_hunting()) {
319 auto p = _find_pending_con(m->get_connection());
320 if (p == pending_cons.end()) {
321 // ignore any messages outside hunting sessions
322 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
323 m->put();
324 return true;
325 }
326 } else if (!active_con || active_con->get_con() != m->get_connection()) {
327 // ignore any messages outside our session(s)
328 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
329 m->put();
330 return true;
331 }
332 }
333
334 switch (m->get_type()) {
335 case CEPH_MSG_MON_MAP:
336 handle_monmap(static_cast<MMonMap*>(m));
337 if (passthrough_monmap) {
338 return false;
339 } else {
340 m->put();
341 }
342 break;
343 case CEPH_MSG_AUTH_REPLY:
344 handle_auth(static_cast<MAuthReply*>(m));
345 break;
346 case CEPH_MSG_MON_SUBSCRIBE_ACK:
347 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
348 break;
349 case CEPH_MSG_MON_GET_VERSION_REPLY:
350 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
351 break;
352 case MSG_MON_COMMAND_ACK:
353 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
354 break;
355 case MSG_COMMAND_REPLY:
356 if (m->get_connection()->is_anon() &&
357 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
358 // this connection is from 'tell'... ignore everything except our command
359 // reply. (we'll get misc other message because we authenticated, but we
360 // don't need them.)
361 handle_command_reply(static_cast<MCommandReply*>(m));
362 return true;
363 }
364 // leave the message for another dispatch handler (e.g., Objecter)
365 return false;
366 case MSG_LOGACK:
367 if (log_client) {
368 log_client->handle_log_ack(static_cast<MLogAck*>(m));
369 m->put();
370 if (more_log_pending) {
371 send_log();
372 }
373 } else {
374 m->put();
375 }
376 break;
377 case MSG_CONFIG:
378 handle_config(static_cast<MConfig*>(m));
379 break;
380 }
381 return true;
382 }
383
384 void MonClient::send_log(bool flush)
385 {
386 if (log_client) {
387 auto lm = log_client->get_mon_log_message(flush);
388 if (lm)
389 _send_mon_message(std::move(lm));
390 more_log_pending = log_client->are_pending();
391 }
392 }
393
394 void MonClient::flush_log()
395 {
396 std::lock_guard l(monc_lock);
397 send_log();
398 }
399
400 /* Unlike all the other message-handling functions, we don't put away a reference
401 * because we want to support MMonMap passthrough to other Dispatchers. */
402 void MonClient::handle_monmap(MMonMap *m)
403 {
404 ldout(cct, 10) << __func__ << " " << *m << dendl;
405 auto con_addrs = m->get_source_addrs();
406 string old_name = monmap.get_name(con_addrs);
407 const auto old_epoch = monmap.get_epoch();
408
409 auto p = m->monmapbl.cbegin();
410 decode(monmap, p);
411
412 ldout(cct, 10) << " got monmap " << monmap.epoch
413 << " from mon." << old_name
414 << " (according to old e" << monmap.get_epoch() << ")"
415 << dendl;
416 ldout(cct, 10) << "dump:\n";
417 monmap.print(*_dout);
418 *_dout << dendl;
419
420 if (old_epoch != monmap.get_epoch()) {
421 tried.clear();
422 }
423 if (old_name.size() == 0) {
424 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
425 _reopen_session();
426 } else {
427 auto new_name = monmap.get_name(con_addrs);
428 if (new_name.empty()) {
429 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
430 << " went away" << dendl;
431 // can't find the mon we were talking to (above)
432 _reopen_session();
433 } else if (messenger->should_use_msgr2() &&
434 monmap.get_addrs(new_name).has_msgr2() &&
435 !con_addrs.has_msgr2()) {
436 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
437 << monmap.get_addrs(new_name) << " but i'm connected to "
438 << con_addrs << ", reconnecting" << dendl;
439 _reopen_session();
440 }
441 }
442
443 cct->set_mon_addrs(monmap);
444
445 sub.got("monmap", monmap.get_epoch());
446 map_cond.notify_all();
447 want_monmap = false;
448
449 if (authenticate_err == 1) {
450 _finish_auth(0);
451 }
452 }
453
454 void MonClient::handle_config(MConfig *m)
455 {
456 ldout(cct,10) << __func__ << " " << *m << dendl;
457
458 if (want_bootstrap_config) {
459 // get_monmap_and_config is waiting for config which it will apply
460 // synchronously
461 bootstrap_config = ceph::ref_t<MConfig>(m, false);
462 map_cond.notify_all();
463 return;
464 }
465
466 // Take the sledgehammer approach to ensuring we don't depend on
467 // anything in MonClient.
468 boost::asio::post(finish_strand,
469 [m, cct = boost::intrusive_ptr<CephContext>(cct),
470 config_notify_cb = config_notify_cb,
471 config_cb = config_cb]() {
472 cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
473 if (config_notify_cb) {
474 config_notify_cb();
475 }
476 m->put();
477 });
478 }
479
480 // ----------------------
481
482 int MonClient::init()
483 {
484 ldout(cct, 10) << __func__ << dendl;
485
486 entity_name = cct->_conf->name;
487
488 auth_registry.refresh_config();
489
490 std::lock_guard l(monc_lock);
491 keyring.reset(new KeyRing);
492 if (auth_registry.is_supported_method(messenger->get_mytype(),
493 CEPH_AUTH_CEPHX)) {
494 // this should succeed, because auth_registry just checked!
495 int r = keyring->from_ceph_context(cct);
496 if (r != 0) {
497 // but be somewhat graceful in case there was a race condition
498 lderr(cct) << "keyring not found" << dendl;
499 return r;
500 }
501 }
502 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
503 return -ENOENT;
504 }
505
506 rotating_secrets.reset(
507 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
508
509 initialized = true;
510
511 messenger->set_auth_client(this);
512 messenger->add_dispatcher_head(this);
513
514 timer.init();
515 schedule_tick();
516
517 cct->get_admin_socket()->register_command(
518 "rotate-key",
519 this,
520 "rotate live authentication key");
521
522 return 0;
523 }
524
525 void MonClient::shutdown()
526 {
527 ldout(cct, 10) << __func__ << dendl;
528
529 cct->get_admin_socket()->unregister_commands(this);
530
531 monc_lock.lock();
532 stopping = true;
533 while (!version_requests.empty()) {
534 ceph::async::post(std::move(version_requests.begin()->second),
535 monc_errc::shutting_down, 0, 0);
536 ldout(cct, 20) << __func__ << " canceling and discarding version request "
537 << version_requests.begin()->first << dendl;
538 version_requests.erase(version_requests.begin());
539 }
540 while (!mon_commands.empty()) {
541 auto tid = mon_commands.begin()->first;
542 _cancel_mon_command(tid);
543 }
544 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
545 << " pending message(s)" << dendl;
546 waiting_for_session.clear();
547
548 active_con.reset();
549 pending_cons.clear();
550
551 auth.reset();
552 global_id = 0;
553 authenticate_err = 0;
554 authenticated = false;
555
556 monc_lock.unlock();
557
558 if (initialized) {
559 initialized = false;
560 }
561 monc_lock.lock();
562 timer.shutdown();
563 stopping = false;
564 monc_lock.unlock();
565 }
566
567 int MonClient::authenticate(double timeout)
568 {
569 std::unique_lock lock{monc_lock};
570
571 if (active_con) {
572 ldout(cct, 5) << "already authenticated" << dendl;
573 return 0;
574 }
575 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
576 sub.want("config", 0, 0);
577 if (!_opened())
578 _reopen_session();
579
580 auto until = ceph::mono_clock::now();
581 until += ceph::make_timespan(timeout);
582 if (timeout > 0.0)
583 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
584 while (!active_con && authenticate_err >= 0) {
585 if (timeout > 0.0) {
586 auto r = auth_cond.wait_until(lock, until);
587 if (r == std::cv_status::timeout && !active_con) {
588 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
589 authenticate_err = -ETIMEDOUT;
590 }
591 } else {
592 auth_cond.wait(lock);
593 }
594 }
595
596 if (active_con) {
597 ldout(cct, 5) << __func__ << " success, global_id "
598 << active_con->get_global_id() << dendl;
599 // active_con should not have been set if there was an error
600 ceph_assert(authenticate_err >= 0);
601 authenticated = true;
602 }
603
604 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
605 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
606 }
607
608 return authenticate_err;
609 }
610
611 int MonClient::call(
612 std::string_view command,
613 const cmdmap_t& cmdmap,
614 const ceph::buffer::list &inbl,
615 ceph::Formatter *f,
616 std::ostream& errss,
617 ceph::buffer::list& out)
618 {
619 if (command == "rotate-key") {
620 CryptoKey key;
621 try {
622 key.decode_base64(inbl.to_str());
623 } catch (buffer::error& e) {
624 errss << "error decoding key: " << e.what();
625 return -EINVAL;
626 }
627 if (keyring) {
628 ldout(cct, 1) << "rotate live key for " << entity_name << dendl;
629 keyring->add(entity_name, key);
630 } else {
631 errss << "cephx not enabled; no key to rotate";
632 return -EINVAL;
633 }
634 }
635 return 0;
636 }
637
638 void MonClient::handle_auth(MAuthReply *m)
639 {
640 ceph_assert(ceph_mutex_is_locked(monc_lock));
641
642 if (m->get_connection()->is_anon()) {
643 // anon connection, used for mon tell commands
644 for (auto& p : mon_commands) {
645 if (p.second->target_con == m->get_connection()) {
646 auto& mc = p.second->target_session;
647 int ret = mc->handle_auth(m, entity_name,
648 CEPH_ENTITY_TYPE_MON,
649 rotating_secrets.get());
650 (void)ret; // we don't care
651 break;
652 }
653 }
654 m->put();
655 return;
656 }
657
658 if (!_hunting()) {
659 std::swap(active_con->get_auth(), auth);
660 int ret = active_con->authenticate(m);
661 m->put();
662 std::swap(auth, active_con->get_auth());
663 if (global_id != active_con->get_global_id()) {
664 lderr(cct) << __func__ << " peer assigned me a different global_id: "
665 << active_con->get_global_id() << dendl;
666 }
667 if (ret != -EAGAIN) {
668 _finish_auth(ret);
669 }
670 return;
671 }
672
673 // hunting
674 auto found = _find_pending_con(m->get_connection());
675 ceph_assert(found != pending_cons.end());
676 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
677 rotating_secrets.get());
678 m->put();
679 if (auth_err == -EAGAIN) {
680 return;
681 }
682 if (auth_err) {
683 pending_cons.erase(found);
684 if (!pending_cons.empty()) {
685 // keep trying with pending connections
686 return;
687 }
688 // the last try just failed, give up.
689 } else {
690 auto& mc = found->second;
691 ceph_assert(mc.have_session());
692 active_con.reset(new MonConnection(std::move(mc)));
693 pending_cons.clear();
694 }
695
696 _finish_hunting(auth_err);
697 _finish_auth(auth_err);
698 }
699
700 void MonClient::_finish_auth(int auth_err)
701 {
702 ldout(cct,10) << __func__ << " " << auth_err << dendl;
703 authenticate_err = auth_err;
704 // _resend_mon_commands() could _reopen_session() if the connected mon is not
705 // the one the MonCommand is targeting.
706 if (!auth_err && active_con) {
707 ceph_assert(auth);
708 _check_auth_tickets();
709 } else if (auth_err == -EAGAIN && !active_con) {
710 ldout(cct,10) << __func__
711 << " auth returned EAGAIN, reopening the session to try again"
712 << dendl;
713 _reopen_session();
714 }
715 auth_cond.notify_all();
716 }
717
718 // ---------
719
720 void MonClient::send_mon_message(MessageRef m)
721 {
722 std::lock_guard l{monc_lock};
723 _send_mon_message(std::move(m));
724 }
725
726 void MonClient::_send_mon_message(MessageRef m)
727 {
728 ceph_assert(ceph_mutex_is_locked(monc_lock));
729 if (active_con) {
730 auto cur_con = active_con->get_con();
731 ldout(cct, 10) << "_send_mon_message to mon."
732 << monmap.get_name(cur_con->get_peer_addr())
733 << " at " << cur_con->get_peer_addr() << dendl;
734 cur_con->send_message2(std::move(m));
735 } else {
736 waiting_for_session.push_back(std::move(m));
737 }
738 }
739
740 void MonClient::_reopen_session(int rank)
741 {
742 ceph_assert(ceph_mutex_is_locked(monc_lock));
743 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
744
745 active_con.reset();
746 pending_cons.clear();
747
748 authenticate_err = 1; // == in progress
749
750 _start_hunting();
751
752 if (rank >= 0) {
753 _add_conn(rank);
754 } else {
755 _add_conns();
756 }
757
758 // throw out old queued messages
759 waiting_for_session.clear();
760
761 // throw out version check requests
762 while (!version_requests.empty()) {
763 ceph::async::post(std::move(version_requests.begin()->second),
764 monc_errc::session_reset, 0, 0);
765 version_requests.erase(version_requests.begin());
766 }
767
768 for (auto& c : pending_cons) {
769 c.second.start(monmap.get_epoch(), entity_name);
770 }
771
772 if (sub.reload()) {
773 _renew_subs();
774 }
775 }
776
777 void MonClient::_add_conn(unsigned rank)
778 {
779 auto peer = monmap.get_addrs(rank);
780 auto conn = messenger->connect_to_mon(peer);
781 MonConnection mc(cct, conn, global_id, &auth_registry);
782 if (auth) {
783 mc.get_auth().reset(auth->clone());
784 }
785 pending_cons.insert(std::make_pair(peer, std::move(mc)));
786 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
787 << " con " << conn
788 << " addr " << peer
789 << dendl;
790 }
791
792 void MonClient::_add_conns()
793 {
794 // collect the next batch of candidates who are listed right next to the ones
795 // already tried
796 auto get_next_batch = [this]() -> std::vector<unsigned> {
797 std::multimap<uint16_t, unsigned> ranks_by_priority;
798 boost::copy(
799 monmap.mon_info | boost::adaptors::filtered(
800 [this](auto& info) {
801 auto rank = monmap.get_rank(info.first);
802 return tried.count(rank) == 0;
803 }) | boost::adaptors::transformed(
804 [this](auto& info) {
805 auto rank = monmap.get_rank(info.first);
806 return std::make_pair(info.second.priority, rank);
807 }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
808 if (ranks_by_priority.empty()) {
809 return {};
810 }
811 // only choose the monitors with lowest priority
812 auto cands = boost::make_iterator_range(
813 ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
814 std::vector<unsigned> ranks;
815 boost::range::copy(cands | boost::adaptors::map_values,
816 std::back_inserter(ranks));
817 return ranks;
818 };
819 auto ranks = get_next_batch();
820 if (ranks.empty()) {
821 tried.clear(); // start over
822 ranks = get_next_batch();
823 }
824 ceph_assert(!ranks.empty());
825 if (ranks.size() > 1) {
826 std::vector<uint16_t> weights;
827 for (auto i : ranks) {
828 auto rank_name = monmap.get_name(i);
829 weights.push_back(monmap.get_weight(rank_name));
830 }
831 random_device_t rd;
832 if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
833 std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
834 } else {
835 weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
836 std::mt19937{rd()});
837 }
838 }
839 ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
840 unsigned n = cct->_conf->mon_client_hunt_parallel;
841 if (n == 0 || n > ranks.size()) {
842 n = ranks.size();
843 }
844 for (unsigned i = 0; i < n; i++) {
845 _add_conn(ranks[i]);
846 tried.insert(ranks[i]);
847 }
848 }
849
850 bool MonClient::ms_handle_reset(Connection *con)
851 {
852 std::lock_guard lock(monc_lock);
853
854 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
855 return false;
856
857 if (con->is_anon()) {
858 auto p = mon_commands.begin();
859 while (p != mon_commands.end()) {
860 auto cmd = p->second;
861 ++p;
862 if (cmd->target_con == con) {
863 _send_command(cmd); // may retry or fail
864 break;
865 }
866 }
867 return true;
868 }
869
870 if (_hunting()) {
871 if (pending_cons.count(con->get_peer_addrs())) {
872 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
873 << dendl;
874 } else {
875 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
876 << dendl;
877 }
878 return true;
879 } else {
880 if (active_con && con == active_con->get_con()) {
881 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
882 << dendl;
883 _reopen_session();
884 return false;
885 } else {
886 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
887 << dendl;
888 return true;
889 }
890 }
891 }
892
893 bool MonClient::_opened() const
894 {
895 ceph_assert(ceph_mutex_is_locked(monc_lock));
896 return active_con || _hunting();
897 }
898
899 bool MonClient::_hunting() const
900 {
901 return !pending_cons.empty();
902 }
903
904 void MonClient::_start_hunting()
905 {
906 ceph_assert(!_hunting());
907 // adjust timeouts if necessary
908 if (!had_a_connection)
909 return;
910 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
911 if (reopen_interval_multiplier >
912 cct->_conf->mon_client_hunt_interval_max_multiple) {
913 reopen_interval_multiplier =
914 cct->_conf->mon_client_hunt_interval_max_multiple;
915 }
916 }
917
918 void MonClient::_finish_hunting(int auth_err)
919 {
920 ldout(cct,10) << __func__ << " " << auth_err << dendl;
921 ceph_assert(ceph_mutex_is_locked(monc_lock));
922 // the pending conns have been cleaned.
923 ceph_assert(!_hunting());
924 if (active_con) {
925 auto con = active_con->get_con();
926 ldout(cct, 1) << "found mon."
927 << monmap.get_name(con->get_peer_addr())
928 << dendl;
929 } else {
930 ldout(cct, 1) << "no mon sessions established" << dendl;
931 }
932
933 had_a_connection = true;
934 _un_backoff();
935
936 if (!auth_err) {
937 last_rotating_renew_sent = utime_t();
938 while (!waiting_for_session.empty()) {
939 _send_mon_message(std::move(waiting_for_session.front()));
940 waiting_for_session.pop_front();
941 }
942 _resend_mon_commands();
943 send_log(true);
944 if (active_con) {
945 auth = std::move(active_con->get_auth());
946 if (global_id && global_id != active_con->get_global_id()) {
947 lderr(cct) << __func__ << " global_id changed from " << global_id
948 << " to " << active_con->get_global_id() << dendl;
949 }
950 global_id = active_con->get_global_id();
951 }
952 }
953 }
954
955 void MonClient::tick()
956 {
957 ldout(cct, 10) << __func__ << dendl;
958
959 utime_t now = ceph_clock_now();
960
961 auto reschedule_tick = make_scope_guard([this] {
962 schedule_tick();
963 });
964
965 _check_auth_tickets();
966 _check_tell_commands();
967
968 if (_hunting()) {
969 ldout(cct, 1) << "continuing hunt" << dendl;
970 return _reopen_session();
971 } else if (active_con) {
972 // just renew as needed
973 auto cur_con = active_con->get_con();
974 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
975 const bool maybe_renew = sub.need_renew();
976 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
977 << dendl;
978 if (maybe_renew) {
979 _renew_subs();
980 }
981 }
982
983 if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
984 cur_con->send_keepalive();
985 last_keepalive = now;
986
987 if (cct->_conf->mon_client_ping_timeout > 0 &&
988 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
989 utime_t lk = cur_con->get_last_keepalive_ack();
990 utime_t interval = now - lk;
991 if (interval > cct->_conf->mon_client_ping_timeout) {
992 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
993 << " seconds), reconnecting" << dendl;
994 return _reopen_session();
995 }
996 }
997
998 _un_backoff();
999 }
1000
1001 if (now > last_send_log + cct->_conf->mon_client_log_interval) {
1002 send_log();
1003 last_send_log = now;
1004 }
1005 }
1006 }
1007
1008 void MonClient::_un_backoff()
1009 {
1010 // un-backoff our reconnect interval
1011 reopen_interval_multiplier = std::max(
1012 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
1013 reopen_interval_multiplier /
1014 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
1015 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
1016 << reopen_interval_multiplier << dendl;
1017 }
1018
1019 void MonClient::schedule_tick()
1020 {
1021 auto do_tick = make_lambda_context([this](int) { tick(); });
1022 if (!is_connected()) {
1023 // start another round of hunting
1024 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
1025 reopen_interval_multiplier);
1026 timer.add_event_after(hunt_interval, do_tick);
1027 } else {
1028 // keep in touch
1029 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
1030 cct->_conf->mon_client_log_interval),
1031 do_tick);
1032 }
1033 }
1034
1035 // ---------
1036
1037 void MonClient::_renew_subs()
1038 {
1039 ceph_assert(ceph_mutex_is_locked(monc_lock));
1040 if (!sub.have_new()) {
1041 ldout(cct, 10) << __func__ << " - empty" << dendl;
1042 return;
1043 }
1044
1045 ldout(cct, 10) << __func__ << dendl;
1046 if (!_opened())
1047 _reopen_session();
1048 else {
1049 auto m = ceph::make_message<MMonSubscribe>();
1050 m->what = sub.get_subs();
1051 m->hostname = ceph_get_short_hostname();
1052 _send_mon_message(std::move(m));
1053 sub.renewed();
1054 }
1055 }
1056
1057 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1058 {
1059 sub.acked(m->interval);
1060 m->put();
1061 }
1062
1063 int MonClient::_check_auth_tickets()
1064 {
1065 ldout(cct, 10) << __func__ << dendl;
1066 ceph_assert(ceph_mutex_is_locked(monc_lock));
1067 if (active_con && auth) {
1068 if (auth->need_tickets()) {
1069 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1070 auto m = ceph::make_message<MAuth>();
1071 m->protocol = auth->get_protocol();
1072 auth->prepare_build_request();
1073 auth->build_request(m->auth_payload);
1074 _send_mon_message(m);
1075 }
1076
1077 _check_auth_rotating();
1078 }
1079 return 0;
1080 }
1081
1082 int MonClient::_check_auth_rotating()
1083 {
1084 ceph_assert(ceph_mutex_is_locked(monc_lock));
1085 if (!rotating_secrets ||
1086 !auth_principal_needs_rotating_keys(entity_name)) {
1087 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
1088 return 0;
1089 }
1090
1091 if (!active_con || !auth) {
1092 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
1093 return 0;
1094 }
1095
1096 utime_t now = ceph_clock_now();
1097 utime_t cutoff = now;
1098 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1099 utime_t issued_at_lower_bound = now;
1100 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
1101 if (!rotating_secrets->need_new_secrets(cutoff)) {
1102 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
1103 rotating_secrets->dump_rotating();
1104 return 0;
1105 }
1106
1107 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
1108 if (!rotating_secrets->need_new_secrets() &&
1109 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
1110 // the key has expired before it has been issued?
1111 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
1112 << " (before " << issued_at_lower_bound << ")" << dendl;
1113 }
1114 if ((now > last_rotating_renew_sent) &&
1115 double(now - last_rotating_renew_sent) < 1) {
1116 ldout(cct, 10) << __func__ << " called too often (last: "
1117 << last_rotating_renew_sent << "), skipping refresh" << dendl;
1118 return 0;
1119 }
1120 auto m = ceph::make_message<MAuth>();
1121 m->protocol = auth->get_protocol();
1122 if (auth->build_rotating_request(m->auth_payload)) {
1123 last_rotating_renew_sent = now;
1124 _send_mon_message(std::move(m));
1125 }
1126 return 0;
1127 }
1128
1129 int MonClient::wait_auth_rotating(double timeout)
1130 {
1131 std::unique_lock l(monc_lock);
1132
1133 // Must be initialized
1134 ceph_assert(auth != nullptr);
1135
1136 if (auth->get_protocol() == CEPH_AUTH_NONE)
1137 return 0;
1138
1139 if (!rotating_secrets)
1140 return 0;
1141
1142 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1143 utime_t cutoff = ceph_clock_now();
1144 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1145 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [this, cutoff] {
1146 return (!auth_principal_needs_rotating_keys(entity_name) ||
1147 !rotating_secrets->need_new_secrets(cutoff));
1148 })) {
1149 ldout(cct, 10) << __func__ << " done" << dendl;
1150 return 0;
1151 } else {
1152 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1153 return -ETIMEDOUT;
1154 }
1155 }
1156
1157 // ---------
1158
1159 void MonClient::_send_command(MonCommand *r)
1160 {
1161 if (r->is_tell()) {
1162 ++r->send_attempts;
1163 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1164 _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
1165 return;
1166 }
1167 // tell-style command
1168 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1169 if (r->target_con) {
1170 r->target_con->mark_down();
1171 }
1172 if (r->target_rank >= 0) {
1173 if (r->target_rank >= (int)monmap.size()) {
1174 ldout(cct, 10) << " target " << r->target_rank
1175 << " >= max mon " << monmap.size() << dendl;
1176 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1177 return;
1178 }
1179 r->target_con = messenger->connect_to_mon(
1180 monmap.get_addrs(r->target_rank), true /* anon */);
1181 } else {
1182 if (!monmap.contains(r->target_name)) {
1183 ldout(cct, 10) << " target " << r->target_name
1184 << " not present in monmap" << dendl;
1185 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1186 return;
1187 }
1188 r->target_con = messenger->connect_to_mon(
1189 monmap.get_addrs(r->target_name), true /* anon */);
1190 }
1191
1192 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1193 &auth_registry));
1194 r->target_session->start(monmap.get_epoch(), entity_name);
1195 r->last_send_attempt = ceph_clock_now();
1196
1197 MCommand *m = new MCommand(monmap.fsid);
1198 m->set_tid(r->tid);
1199 m->cmd = r->cmd;
1200 m->set_data(r->inbl);
1201 r->target_session->queue_command(m);
1202 return;
1203 }
1204
1205 // ugly legacy handling of pre-octopus mons
1206 entity_addr_t peer;
1207 if (active_con) {
1208 peer = active_con->get_con()->get_peer_addr();
1209 }
1210
1211 if (r->target_rank >= 0 &&
1212 r->target_rank != monmap.get_rank(peer)) {
1213 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1214 << " wants rank " << r->target_rank
1215 << ", reopening session"
1216 << dendl;
1217 if (r->target_rank >= (int)monmap.size()) {
1218 ldout(cct, 10) << " target " << r->target_rank
1219 << " >= max mon " << monmap.size() << dendl;
1220 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1221 return;
1222 }
1223 _reopen_session(r->target_rank);
1224 return;
1225 }
1226 if (r->target_name.length() &&
1227 r->target_name != monmap.get_name(peer)) {
1228 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1229 << " wants mon " << r->target_name
1230 << ", reopening session"
1231 << dendl;
1232 if (!monmap.contains(r->target_name)) {
1233 ldout(cct, 10) << " target " << r->target_name
1234 << " not present in monmap" << dendl;
1235 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1236 return;
1237 }
1238 _reopen_session(monmap.get_rank(r->target_name));
1239 return;
1240 }
1241 // fall-thru to send 'normal' CLI command
1242 }
1243
1244 // normal CLI command
1245 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1246 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
1247 m->set_tid(r->tid);
1248 m->cmd = r->cmd;
1249 m->set_data(r->inbl);
1250 _send_mon_message(std::move(m));
1251 return;
1252 }
1253
1254 void MonClient::_check_tell_commands()
1255 {
1256 // resend any requests
1257 auto now = ceph_clock_now();
1258 auto p = mon_commands.begin();
1259 while (p != mon_commands.end()) {
1260 auto cmd = p->second;
1261 ++p;
1262 if (cmd->is_tell() &&
1263 cmd->last_send_attempt != utime_t() &&
1264 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1265 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1266 _send_command(cmd); // might remove cmd from mon_commands
1267 }
1268 }
1269 }
1270
1271 void MonClient::_resend_mon_commands()
1272 {
1273 // resend any requests
1274 auto p = mon_commands.begin();
1275 while (p != mon_commands.end()) {
1276 auto cmd = p->second;
1277 ++p;
1278 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1279 // starting with octopus, tell commands use their own connetion and need no
1280 // special resend when we finish hunting.
1281 } else {
1282 _send_command(cmd); // might remove cmd from mon_commands
1283 }
1284 }
1285 }
1286
1287 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1288 {
1289 MonCommand *r = NULL;
1290 uint64_t tid = ack->get_tid();
1291
1292 if (tid == 0 && !mon_commands.empty()) {
1293 r = mon_commands.begin()->second;
1294 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1295 } else {
1296 auto p = mon_commands.find(tid);
1297 if (p == mon_commands.end()) {
1298 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1299 ack->put();
1300 return;
1301 }
1302 r = p->second;
1303 }
1304
1305 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1306 auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
1307 : bs::error_code();
1308 _finish_command(r, ec, ack->rs,
1309 std::move(ack->get_data()));
1310 ack->put();
1311 }
1312
1313 void MonClient::handle_command_reply(MCommandReply *reply)
1314 {
1315 MonCommand *r = NULL;
1316 uint64_t tid = reply->get_tid();
1317
1318 if (tid == 0 && !mon_commands.empty()) {
1319 r = mon_commands.begin()->second;
1320 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1321 << dendl;
1322 } else {
1323 auto p = mon_commands.find(tid);
1324 if (p == mon_commands.end()) {
1325 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1326 << dendl;
1327 reply->put();
1328 return;
1329 }
1330 r = p->second;
1331 }
1332
1333 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1334 auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
1335 : bs::error_code();
1336 _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
1337 reply->put();
1338 }
1339
1340 int MonClient::_cancel_mon_command(uint64_t tid)
1341 {
1342 ceph_assert(ceph_mutex_is_locked(monc_lock));
1343
1344 auto it = mon_commands.find(tid);
1345 if (it == mon_commands.end()) {
1346 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1347 return -ENOENT;
1348 }
1349
1350 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1351
1352 MonCommand *cmd = it->second;
1353 _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
1354 return 0;
1355 }
1356
1357 void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
1358 std::string_view rs, ceph::buffer::list&& bl)
1359 {
1360 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
1361 << dendl;
1362 ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1363 std::move(bl));
1364 if (r->target_con) {
1365 r->target_con->mark_down();
1366 }
1367 mon_commands.erase(r->tid);
1368 delete r;
1369 }
1370
1371 // ---------
1372
1373 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1374 {
1375 ceph_assert(ceph_mutex_is_locked(monc_lock));
1376 auto iter = version_requests.find(m->handle);
1377 if (iter == version_requests.end()) {
1378 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1379 << " not found" << dendl;
1380 } else {
1381 auto req = std::move(iter->second);
1382 ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
1383 << m->version << dendl;
1384 version_requests.erase(iter);
1385 ceph::async::post(std::move(req), bs::error_code(),
1386 m->version, m->oldest_version);
1387 }
1388 m->put();
1389 }
1390
1391 int MonClient::get_auth_request(
1392 Connection *con,
1393 AuthConnectionMeta *auth_meta,
1394 uint32_t *auth_method,
1395 std::vector<uint32_t> *preferred_modes,
1396 ceph::buffer::list *bl)
1397 {
1398 std::lock_guard l(monc_lock);
1399 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1400 << dendl;
1401
1402 // connection to mon?
1403 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1404 ceph_assert(!auth_meta->authorizer);
1405 if (con->is_anon()) {
1406 for (auto& i : mon_commands) {
1407 if (i.second->target_con == con) {
1408 return i.second->target_session->get_auth_request(
1409 auth_method, preferred_modes, bl,
1410 entity_name, want_keys, rotating_secrets.get());
1411 }
1412 }
1413 }
1414 for (auto& i : pending_cons) {
1415 if (i.second.is_con(con)) {
1416 return i.second.get_auth_request(
1417 auth_method, preferred_modes, bl,
1418 entity_name, want_keys, rotating_secrets.get());
1419 }
1420 }
1421 return -ENOENT;
1422 }
1423
1424 // generate authorizer
1425 if (!auth) {
1426 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1427 return -EACCES;
1428 }
1429 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1430 if (!auth_meta->authorizer) {
1431 lderr(cct) << __func__ << " failed to build_authorizer for type "
1432 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1433 return -EACCES;
1434 }
1435 auth_meta->auth_method = auth_meta->authorizer->protocol;
1436 auth_registry.get_supported_modes(con->get_peer_type(),
1437 auth_meta->auth_method,
1438 preferred_modes);
1439 *bl = auth_meta->authorizer->bl;
1440 return 0;
1441 }
1442
1443 int MonClient::handle_auth_reply_more(
1444 Connection *con,
1445 AuthConnectionMeta *auth_meta,
1446 const ceph::buffer::list& bl,
1447 ceph::buffer::list *reply)
1448 {
1449 std::lock_guard l(monc_lock);
1450
1451 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1452 if (con->is_anon()) {
1453 for (auto& i : mon_commands) {
1454 if (i.second->target_con == con) {
1455 return i.second->target_session->handle_auth_reply_more(
1456 auth_meta, bl, reply);
1457 }
1458 }
1459 }
1460 for (auto& i : pending_cons) {
1461 if (i.second.is_con(con)) {
1462 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1463 }
1464 }
1465 return -ENOENT;
1466 }
1467
1468 // authorizer challenges
1469 if (!auth || !auth_meta->authorizer) {
1470 lderr(cct) << __func__ << " no authorizer?" << dendl;
1471 return -1;
1472 }
1473 auth_meta->authorizer->add_challenge(cct, bl);
1474 *reply = auth_meta->authorizer->bl;
1475 return 0;
1476 }
1477
1478 int MonClient::handle_auth_done(
1479 Connection *con,
1480 AuthConnectionMeta *auth_meta,
1481 uint64_t global_id,
1482 uint32_t con_mode,
1483 const ceph::buffer::list& bl,
1484 CryptoKey *session_key,
1485 std::string *connection_secret)
1486 {
1487 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1488 std::lock_guard l(monc_lock);
1489 if (con->is_anon()) {
1490 for (auto& i : mon_commands) {
1491 if (i.second->target_con == con) {
1492 return i.second->target_session->handle_auth_done(
1493 auth_meta, global_id, bl,
1494 session_key, connection_secret);
1495 }
1496 }
1497 }
1498 for (auto& i : pending_cons) {
1499 if (i.second.is_con(con)) {
1500 int r = i.second.handle_auth_done(
1501 auth_meta, global_id, bl,
1502 session_key, connection_secret);
1503 if (r) {
1504 pending_cons.erase(i.first);
1505 if (!pending_cons.empty()) {
1506 return r;
1507 }
1508 } else {
1509 active_con.reset(new MonConnection(std::move(i.second)));
1510 pending_cons.clear();
1511 ceph_assert(active_con->have_session());
1512 }
1513
1514 _finish_hunting(r);
1515 if (r || monmap.get_epoch() > 0) {
1516 _finish_auth(r);
1517 }
1518 return r;
1519 }
1520 }
1521 return -ENOENT;
1522 } else {
1523 // verify authorizer reply
1524 auto p = bl.begin();
1525 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1526 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1527 << dendl;
1528 return -EACCES;
1529 }
1530 auth_meta->session_key = auth_meta->authorizer->session_key;
1531 return 0;
1532 }
1533 }
1534
1535 int MonClient::handle_auth_bad_method(
1536 Connection *con,
1537 AuthConnectionMeta *auth_meta,
1538 uint32_t old_auth_method,
1539 int result,
1540 const std::vector<uint32_t>& allowed_methods,
1541 const std::vector<uint32_t>& allowed_modes)
1542 {
1543 auth_meta->allowed_methods = allowed_methods;
1544
1545 std::lock_guard l(monc_lock);
1546 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1547 if (con->is_anon()) {
1548 for (auto& i : mon_commands) {
1549 if (i.second->target_con == con) {
1550 int r = i.second->target_session->handle_auth_bad_method(
1551 old_auth_method,
1552 result,
1553 allowed_methods,
1554 allowed_modes);
1555 if (r < 0) {
1556 auto ec = bs::error_code(-r, mon_category());
1557 _finish_command(i.second, ec, "auth failed"sv, {});
1558 }
1559 return r;
1560 }
1561 }
1562 }
1563 for (auto& i : pending_cons) {
1564 if (i.second.is_con(con)) {
1565 int r = i.second.handle_auth_bad_method(old_auth_method,
1566 result,
1567 allowed_methods,
1568 allowed_modes);
1569 if (r == 0) {
1570 return r; // try another method on this con
1571 }
1572 pending_cons.erase(i.first);
1573 if (!pending_cons.empty()) {
1574 return r; // fail this con, maybe another con will succeed
1575 }
1576 // fail hunt
1577 _finish_hunting(r);
1578 _finish_auth(r);
1579 return r;
1580 }
1581 }
1582 return -ENOENT;
1583 } else {
1584 // huh...
1585 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1586 << " result " << cpp_strerror(result)
1587 << " and auth is " << (auth ? auth->get_protocol() : 0)
1588 << dendl;
1589 return -EACCES;
1590 }
1591 }
1592
1593 int MonClient::handle_auth_request(
1594 Connection *con,
1595 AuthConnectionMeta *auth_meta,
1596 bool more,
1597 uint32_t auth_method,
1598 const ceph::buffer::list& payload,
1599 ceph::buffer::list *reply)
1600 {
1601 if (payload.length() == 0) {
1602 // for some channels prior to nautilus (osd heartbeat), we
1603 // tolerate the lack of an authorizer.
1604 if (!con->get_messenger()->require_authorizer) {
1605 handle_authentication_dispatcher->ms_handle_fast_authentication(con);
1606 return 1;
1607 }
1608 return -EACCES;
1609 }
1610 auth_meta->auth_mode = payload[0];
1611 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1612 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1613 return -EACCES;
1614 }
1615 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1616 auth_method);
1617 if (!ah) {
1618 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1619 << auth_method << dendl;
1620 return -EOPNOTSUPP;
1621 }
1622
1623 auto ac = &auth_meta->authorizer_challenge;
1624 if (auth_meta->skip_authorizer_challenge) {
1625 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
1626 ac = nullptr;
1627 }
1628
1629 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1630 bool isvalid = ah->verify_authorizer(
1631 cct,
1632 *rotating_secrets,
1633 payload,
1634 auth_meta->get_connection_secret_length(),
1635 reply,
1636 &con->peer_name,
1637 &con->peer_global_id,
1638 &con->peer_caps_info,
1639 &auth_meta->session_key,
1640 &auth_meta->connection_secret,
1641 ac);
1642 if (isvalid) {
1643 handle_authentication_dispatcher->ms_handle_fast_authentication(con);
1644 return 1;
1645 }
1646 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1647 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1648 return 0;
1649 }
1650 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1651 // discard old challenge
1652 auth_meta->authorizer_challenge.reset();
1653 return -EACCES;
1654 }
1655
1656 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1657 std::lock_guard l(monc_lock);
1658 if (auth) {
1659 return auth->build_authorizer(service_id);
1660 } else {
1661 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1662 << ", but no auth is available now" << dendl;
1663 return nullptr;
1664 }
1665 }
1666
// Logging setup for the MonConnection methods that follow: same subsystem,
// but the prefix now depends on MonConnection::have_session().
#undef dout_subsys
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1670
1671 MonConnection::MonConnection(
1672 CephContext *cct, ConnectionRef con, uint64_t global_id,
1673 AuthRegistry *ar)
1674 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1675 {}
1676
1677 MonConnection::~MonConnection()
1678 {
1679 if (con) {
1680 con->mark_down();
1681 con.reset();
1682 }
1683 }
1684
1685 bool MonConnection::have_session() const
1686 {
1687 return state == State::HAVE_SESSION;
1688 }
1689
1690 void MonConnection::start(epoch_t epoch,
1691 const EntityName& entity_name)
1692 {
1693 using ceph::encode;
1694 auth_start = ceph_clock_now();
1695
1696 if (con->get_peer_addr().is_msgr2()) {
1697 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1698 state = State::AUTHENTICATING;
1699 con->send_message(new MMonGetMap());
1700 return;
1701 }
1702
1703 // restart authentication handshake
1704 state = State::NEGOTIATING;
1705
1706 // send an initial keepalive to ensure our timestamp is valid by the
1707 // time we are in an OPENED state (by sequencing this before
1708 // authentication).
1709 con->send_keepalive();
1710
1711 auto m = new MAuth;
1712 m->protocol = CEPH_AUTH_UNKNOWN;
1713 m->monmap_epoch = epoch;
1714 __u8 struct_v = 1;
1715 encode(struct_v, m->auth_payload);
1716 std::vector<uint32_t> auth_supported;
1717 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1718 encode(auth_supported, m->auth_payload);
1719 encode(entity_name, m->auth_payload);
1720 encode(global_id, m->auth_payload);
1721 con->send_message(m);
1722 }
1723
1724 int MonConnection::get_auth_request(
1725 uint32_t *method,
1726 std::vector<uint32_t> *preferred_modes,
1727 ceph::buffer::list *bl,
1728 const EntityName& entity_name,
1729 uint32_t want_keys,
1730 RotatingKeyRing* keyring)
1731 {
1732 using ceph::encode;
1733 // choose method
1734 if (auth_method < 0) {
1735 std::vector<uint32_t> as;
1736 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1737 if (as.empty()) {
1738 return -EACCES;
1739 }
1740 auth_method = as.front();
1741 }
1742 *method = auth_method;
1743 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1744 preferred_modes);
1745 ldout(cct,10) << __func__ << " method " << *method
1746 << " preferred_modes " << *preferred_modes << dendl;
1747 if (preferred_modes->empty()) {
1748 return -EACCES;
1749 }
1750
1751 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1752 ceph_assert(r == 0);
1753
1754 // initial requset includes some boilerplate...
1755 encode((char)AUTH_MODE_MON, *bl);
1756 encode(entity_name, *bl);
1757 encode(global_id, *bl);
1758
1759 // and (maybe) some method-specific initial payload
1760 auth->build_initial_request(bl);
1761
1762 return 0;
1763 }
1764
1765 int MonConnection::handle_auth_reply_more(
1766 AuthConnectionMeta *auth_meta,
1767 const ceph::buffer::list& bl,
1768 ceph::buffer::list *reply)
1769 {
1770 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1771 ldout(cct, 30) << __func__ << " got\n";
1772 bl.hexdump(*_dout);
1773 *_dout << dendl;
1774
1775 auto p = bl.cbegin();
1776 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1777 int r = auth->handle_response(0, p, &auth_meta->session_key,
1778 &auth_meta->connection_secret);
1779 if (r == -EAGAIN) {
1780 auth->prepare_build_request();
1781 auth->build_request(*reply);
1782 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1783 << " bytes" << dendl;
1784 r = 0;
1785 } else if (r < 0) {
1786 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1787 } else {
1788 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1789 // FIXME
1790 ceph_abort(cct, "write me");
1791 }
1792 return r;
1793 }
1794
1795 int MonConnection::handle_auth_done(
1796 AuthConnectionMeta *auth_meta,
1797 uint64_t new_global_id,
1798 const ceph::buffer::list& bl,
1799 CryptoKey *session_key,
1800 std::string *connection_secret)
1801 {
1802 ldout(cct,10) << __func__ << " global_id " << new_global_id
1803 << " payload " << bl.length()
1804 << dendl;
1805 global_id = new_global_id;
1806 auth->set_global_id(global_id);
1807 auto p = bl.begin();
1808 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1809 &auth_meta->connection_secret);
1810 if (auth_err >= 0) {
1811 state = State::HAVE_SESSION;
1812 }
1813 con->set_last_keepalive_ack(auth_start);
1814
1815 if (pending_tell_command) {
1816 con->send_message2(std::move(pending_tell_command));
1817 }
1818 return auth_err;
1819 }
1820
1821 int MonConnection::handle_auth_bad_method(
1822 uint32_t old_auth_method,
1823 int result,
1824 const std::vector<uint32_t>& allowed_methods,
1825 const std::vector<uint32_t>& allowed_modes)
1826 {
1827 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1828 << " result " << cpp_strerror(result)
1829 << " allowed_methods " << allowed_methods << dendl;
1830 std::vector<uint32_t> auth_supported;
1831 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1832 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1833 old_auth_method);
1834 assert(p != auth_supported.end());
1835 p = std::find_first_of(std::next(p), auth_supported.end(),
1836 allowed_methods.begin(), allowed_methods.end());
1837 if (p == auth_supported.end()) {
1838 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1839 << " but i only support " << auth_supported << dendl;
1840 return -EACCES;
1841 }
1842 auth_method = *p;
1843 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1844 return 0;
1845 }
1846
1847 int MonConnection::handle_auth(MAuthReply* m,
1848 const EntityName& entity_name,
1849 uint32_t want_keys,
1850 RotatingKeyRing* keyring)
1851 {
1852 if (state == State::NEGOTIATING) {
1853 int r = _negotiate(m, entity_name, want_keys, keyring);
1854 if (r) {
1855 return r;
1856 }
1857 state = State::AUTHENTICATING;
1858 }
1859 int r = authenticate(m);
1860 if (!r) {
1861 state = State::HAVE_SESSION;
1862 }
1863 return r;
1864 }
1865
1866 int MonConnection::_negotiate(MAuthReply *m,
1867 const EntityName& entity_name,
1868 uint32_t want_keys,
1869 RotatingKeyRing* keyring)
1870 {
1871 ldout(cct, 10) << __func__ << dendl;
1872 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1873 if (r == -ENOTSUP) {
1874 if (m->result == -ENOTSUP) {
1875 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1876 << dendl;
1877 }
1878 return m->result;
1879 }
1880 return r;
1881 }
1882
1883 int MonConnection::_init_auth(
1884 uint32_t method,
1885 const EntityName& entity_name,
1886 uint32_t want_keys,
1887 RotatingKeyRing* keyring,
1888 bool msgr2)
1889 {
1890 ldout(cct, 10) << __func__ << " method " << method << dendl;
1891 if (auth && auth->get_protocol() == (int)method) {
1892 ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
1893 auth->reset();
1894 return 0;
1895 }
1896
1897 ldout(cct, 10) << __func__ << " creating new auth" << dendl;
1898 auth.reset(AuthClientHandler::create(cct, method, keyring));
1899 if (!auth) {
1900 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1901 return -ENOTSUP;
1902 }
1903
1904 // do not request MGR key unless the mon has the SERVER_KRAKEN
1905 // feature. otherwise it will give us an auth error. note that
1906 // we have to use the FEATUREMASK because pre-jewel the kraken
1907 // feature bit was used for something else.
1908 if (!msgr2 &&
1909 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1910 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1911 ldout(cct, 1) << __func__
1912 << " not requesting MGR keys from pre-kraken monitor"
1913 << dendl;
1914 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1915 }
1916 auth->set_want_keys(want_keys);
1917 auth->init(entity_name);
1918 auth->set_global_id(global_id);
1919 return 0;
1920 }
1921
1922 int MonConnection::authenticate(MAuthReply *m)
1923 {
1924 ceph_assert(auth);
1925 if (!m->global_id) {
1926 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1927 }
1928 if (m->global_id != global_id) {
1929 // it's a new session
1930 auth->reset();
1931 global_id = m->global_id;
1932 auth->set_global_id(global_id);
1933 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1934 }
1935 auto p = m->result_bl.cbegin();
1936 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1937 if (ret == -EAGAIN) {
1938 auto ma = new MAuth;
1939 ma->protocol = auth->get_protocol();
1940 auth->prepare_build_request();
1941 auth->build_request(ma->auth_payload);
1942 con->send_message(ma);
1943 }
1944 if (ret == 0 && pending_tell_command) {
1945 con->send_message2(std::move(pending_tell_command));
1946 }
1947
1948 return ret;
1949 }
1950
1951 void MonClient::register_config_callback(md_config_t::config_callback fn) {
1952 ceph_assert(!config_cb);
1953 config_cb = fn;
1954 }
1955
1956 md_config_t::config_callback MonClient::get_config_callback() {
1957 return config_cb;
1958 }
1959
1960 #pragma GCC diagnostic push
1961 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
1962 #pragma clang diagnostic push
1963 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1964 class monc_error_category : public ceph::converting_category {
1965 public:
1966 monc_error_category(){}
1967 const char* name() const noexcept override;
1968 const char* message(int ev, char*, std::size_t) const noexcept override;
1969 std::string message(int ev) const override;
1970 bs::error_condition default_error_condition(int ev) const noexcept
1971 override;
1972 bool equivalent(int ev, const bs::error_condition& c) const
1973 noexcept override;
1974 using ceph::converting_category::equivalent;
1975 int from_code(int ev) const noexcept override;
1976 };
1977 #pragma GCC diagnostic pop
1978 #pragma clang diagnostic pop
1979
1980 const char* monc_error_category::name() const noexcept {
1981 return "monc";
1982 }
1983
1984 const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
1985 if (ev == 0)
1986 return "No error";
1987
1988 switch (static_cast<monc_errc>(ev)) {
1989 case monc_errc::shutting_down: // Command failed due to MonClient shutting down
1990 return "Command failed due to MonClient shutting down";
1991 case monc_errc::session_reset:
1992 return "Monitor session was reset";
1993 case monc_errc::rank_dne:
1994 return "Requested monitor rank does not exist";
1995 case monc_errc::mon_dne:
1996 return "Requested monitor does not exist";
1997 case monc_errc::timed_out:
1998 return "Monitor operation timed out";
1999 case monc_errc::mon_unavailable:
2000 return "Monitor unavailable";
2001 }
2002
2003 return "Unknown error";
2004 }
2005
2006 std::string monc_error_category::message(int ev) const {
2007 return message(ev, nullptr, 0);
2008 }
2009
2010 bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
2011 switch (static_cast<monc_errc>(ev)) {
2012 case monc_errc::shutting_down:
2013 return bs::errc::operation_canceled;
2014 case monc_errc::session_reset:
2015 return bs::errc::resource_unavailable_try_again;
2016 case monc_errc::rank_dne:
2017 [[fallthrough]];
2018 case monc_errc::mon_dne:
2019 return ceph::errc::not_in_map;
2020 case monc_errc::timed_out:
2021 return bs::errc::timed_out;
2022 case monc_errc::mon_unavailable:
2023 return bs::errc::no_such_device;
2024 }
2025 return { ev, *this };
2026 }
2027
2028 bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
2029 switch (static_cast<monc_errc>(ev)) {
2030 case monc_errc::rank_dne:
2031 [[fallthrough]];
2032 case monc_errc::mon_dne:
2033 return c == bs::errc::no_such_file_or_directory;
2034 default:
2035 return default_error_condition(ev) == c;
2036 }
2037 }
2038
2039 int monc_error_category::from_code(int ev) const noexcept {
2040 if (ev == 0)
2041 return 0;
2042
2043 switch (static_cast<monc_errc>(ev)) {
2044 case monc_errc::shutting_down:
2045 return -ECANCELED;
2046 case monc_errc::session_reset:
2047 return -EAGAIN;
2048 case monc_errc::rank_dne:
2049 [[fallthrough]];
2050 case monc_errc::mon_dne:
2051 return -ENOENT;
2052 case monc_errc::timed_out:
2053 return -ETIMEDOUT;
2054 case monc_errc::mon_unavailable:
2055 return -ENXIO;
2056 }
2057 return -EDOM;
2058 }
2059
2060 const bs::error_category& monc_category() noexcept {
2061 static const monc_error_category c;
2062 return c;
2063 }