]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/random.h"
25 #include "include/scope_guard.h"
26 #include "include/stringify.h"
27
28 #include "messages/MMonGetMap.h"
29 #include "messages/MMonGetVersion.h"
30 #include "messages/MMonGetMap.h"
31 #include "messages/MMonGetVersionReply.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MConfig.h"
34 #include "messages/MGetConfig.h"
35 #include "messages/MAuth.h"
36 #include "messages/MLogAck.h"
37 #include "messages/MAuthReply.h"
38 #include "messages/MMonCommand.h"
39 #include "messages/MMonCommandAck.h"
40 #include "messages/MCommand.h"
41 #include "messages/MCommandReply.h"
42 #include "messages/MPing.h"
43
44 #include "messages/MMonSubscribe.h"
45 #include "messages/MMonSubscribeAck.h"
46 #include "common/errno.h"
47 #include "common/hostname.h"
48 #include "common/LogClient.h"
49
50 #include "MonClient.h"
51 #include "error_code.h"
52 #include "MonMap.h"
53
54 #include "auth/Auth.h"
55 #include "auth/KeyRing.h"
56 #include "auth/AuthClientHandler.h"
57 #include "auth/AuthRegistry.h"
58 #include "auth/RotatingKeyRing.h"
59
60 #define dout_subsys ceph_subsys_monc
61 #undef dout_prefix
62 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
63
64 namespace bs = boost::system;
65 using std::string;
66 using namespace std::literals;
67
68 MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
69 Dispatcher(cct_),
70 AuthServer(cct_),
71 messenger(NULL),
72 timer(cct_, monc_lock),
73 service(service),
74 initialized(false),
75 log_client(NULL),
76 more_log_pending(false),
77 want_monmap(true),
78 had_a_connection(false),
79 reopen_interval_multiplier(
80 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
81 last_mon_command_tid(0),
82 version_req_id(0)
83 {}
84
85 MonClient::~MonClient()
86 {
87 }
88
89 int MonClient::build_initial_monmap()
90 {
91 ldout(cct, 10) << __func__ << dendl;
92 int r = monmap.build_initial(cct, false, std::cerr);
93 ldout(cct,10) << "monmap:\n";
94 monmap.print(*_dout);
95 *_dout << dendl;
96 return r;
97 }
98
99 int MonClient::get_monmap()
100 {
101 ldout(cct, 10) << __func__ << dendl;
102 std::unique_lock l(monc_lock);
103
104 sub.want("monmap", 0, 0);
105 if (!_opened())
106 _reopen_session();
107 map_cond.wait(l, [this] { return !want_monmap; });
108 ldout(cct, 10) << __func__ << " done" << dendl;
109 return 0;
110 }
111
112 int MonClient::get_monmap_and_config()
113 {
114 ldout(cct, 10) << __func__ << dendl;
115 ceph_assert(!messenger);
116
117 int tries = 10;
118
119 cct->init_crypto();
120 auto shutdown_crypto = make_scope_guard([this] {
121 cct->shutdown_crypto();
122 });
123
124 int r = build_initial_monmap();
125 if (r < 0) {
126 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
127 return r;
128 }
129
130 messenger = Messenger::create_client_messenger(
131 cct, "temp_mon_client");
132 ceph_assert(messenger);
133 messenger->add_dispatcher_head(this);
134 messenger->start();
135 auto shutdown_msgr = make_scope_guard([this] {
136 messenger->shutdown();
137 messenger->wait();
138 delete messenger;
139 messenger = nullptr;
140 if (!monmap.fsid.is_zero()) {
141 cct->_conf.set_val("fsid", stringify(monmap.fsid));
142 }
143 });
144
145 want_bootstrap_config = true;
146 auto shutdown_config = make_scope_guard([this] {
147 std::unique_lock l(monc_lock);
148 want_bootstrap_config = false;
149 bootstrap_config.reset();
150 });
151
152 ceph::ref_t<MConfig> config;
153 while (tries-- > 0) {
154 r = init();
155 if (r < 0) {
156 return r;
157 }
158 r = authenticate(cct->_conf->client_mount_timeout);
159 if (r == -ETIMEDOUT) {
160 shutdown();
161 continue;
162 }
163 if (r < 0) {
164 break;
165 }
166 {
167 std::unique_lock l(monc_lock);
168 if (monmap.get_epoch() &&
169 !monmap.persistent_features.contains_all(
170 ceph::features::mon::FEATURE_MIMIC)) {
171 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
172 << dendl;
173 r = 0;
174 break;
175 }
176 while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
177 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
178 auto status = map_cond.wait_for(l, ceph::make_timespan(
179 cct->_conf->mon_client_hunt_interval));
180 if (status == std::cv_status::timeout) {
181 r = -ETIMEDOUT;
182 }
183 }
184
185 if (bootstrap_config) {
186 ldout(cct,10) << __func__ << " success" << dendl;
187 config = std::move(bootstrap_config);
188 r = 0;
189 break;
190 }
191 }
192 lderr(cct) << __func__ << " failed to get config" << dendl;
193 shutdown();
194 continue;
195 }
196
197 if (config) {
198 // apply the bootstrap config to ensure its applied prior to completing
199 // the bootstrap
200 cct->_conf.set_mon_vals(cct, config->config, config_cb);
201 }
202
203 shutdown();
204 return r;
205 }
206
207
208 /**
209 * Ping the monitor with id @p mon_id and set the resulting reply in
210 * the provided @p result_reply, if this last parameter is not NULL.
211 *
212 * So that we don't rely on the MonClient's default messenger, set up
213 * during connect(), we create our own messenger to comunicate with the
214 * specified monitor. This is advantageous in the following ways:
215 *
216 * - Isolate the ping procedure from the rest of the MonClient's operations,
217 * allowing us to not acquire or manage the big monc_lock, thus not
218 * having to block waiting for some other operation to finish before we
219 * can proceed.
220 * * for instance, we can ping mon.FOO even if we are currently hunting
221 * or blocked waiting for auth to complete with mon.BAR.
222 *
223 * - Ping a monitor prior to establishing a connection (using connect())
224 * and properly establish the MonClient's messenger. This frees us
225 * from dealing with the complex foo that happens in connect().
226 *
227 * We also don't rely on MonClient as a dispatcher for this messenger,
228 * unlike what happens with the MonClient's default messenger. This allows
229 * us to sandbox the whole ping, having it much as a separate entity in
230 * the MonClient class, considerably simplifying the handling and dispatching
231 * of messages without needing to consider monc_lock.
232 *
233 * Current drawback is that we will establish a messenger for each ping
234 * we want to issue, instead of keeping a single messenger instance that
235 * would be used for all pings.
236 */
237 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
238 {
239 ldout(cct, 10) << __func__ << dendl;
240
241 string new_mon_id;
242 if (monmap.contains("noname-"+mon_id)) {
243 new_mon_id = "noname-"+mon_id;
244 } else {
245 new_mon_id = mon_id;
246 }
247
248 if (new_mon_id.empty()) {
249 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
250 return -EINVAL;
251 } else if (!monmap.contains(new_mon_id)) {
252 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
253 << dendl;
254 return -ENOENT;
255 }
256
257 // N.B. monc isn't initialized
258
259 auth_registry.refresh_config();
260
261 KeyRing keyring;
262 keyring.from_ceph_context(cct);
263 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
264
265 MonClientPinger *pinger = new MonClientPinger(cct,
266 &rkeyring,
267 result_reply);
268
269 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
270 smsgr->add_dispatcher_head(pinger);
271 smsgr->set_auth_client(pinger);
272 smsgr->start();
273
274 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
275 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
276 << " " << con->get_peer_addr() << dendl;
277
278 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
279 pinger->mc->start(monmap.get_epoch(), entity_name);
280 con->send_message(new MPing);
281
282 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
283 if (ret == 0) {
284 ldout(cct,10) << __func__ << " got ping reply" << dendl;
285 } else {
286 ret = -ret;
287 }
288
289 con->mark_down();
290 pinger->mc.reset();
291 smsgr->shutdown();
292 smsgr->wait();
293 delete smsgr;
294 delete pinger;
295 return ret;
296 }
297
298 bool MonClient::ms_dispatch(Message *m)
299 {
300 // we only care about these message types
301 switch (m->get_type()) {
302 case CEPH_MSG_MON_MAP:
303 case CEPH_MSG_AUTH_REPLY:
304 case CEPH_MSG_MON_SUBSCRIBE_ACK:
305 case CEPH_MSG_MON_GET_VERSION_REPLY:
306 case MSG_MON_COMMAND_ACK:
307 case MSG_COMMAND_REPLY:
308 case MSG_LOGACK:
309 case MSG_CONFIG:
310 break;
311 case CEPH_MSG_PING:
312 m->put();
313 return true;
314 default:
315 return false;
316 }
317
318 std::lock_guard lock(monc_lock);
319
320 if (!m->get_connection()->is_anon() &&
321 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
322 if (_hunting()) {
323 auto p = _find_pending_con(m->get_connection());
324 if (p == pending_cons.end()) {
325 // ignore any messages outside hunting sessions
326 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
327 m->put();
328 return true;
329 }
330 } else if (!active_con || active_con->get_con() != m->get_connection()) {
331 // ignore any messages outside our session(s)
332 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
333 m->put();
334 return true;
335 }
336 }
337
338 switch (m->get_type()) {
339 case CEPH_MSG_MON_MAP:
340 handle_monmap(static_cast<MMonMap*>(m));
341 if (passthrough_monmap) {
342 return false;
343 } else {
344 m->put();
345 }
346 break;
347 case CEPH_MSG_AUTH_REPLY:
348 handle_auth(static_cast<MAuthReply*>(m));
349 break;
350 case CEPH_MSG_MON_SUBSCRIBE_ACK:
351 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
352 break;
353 case CEPH_MSG_MON_GET_VERSION_REPLY:
354 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
355 break;
356 case MSG_MON_COMMAND_ACK:
357 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
358 break;
359 case MSG_COMMAND_REPLY:
360 if (m->get_connection()->is_anon() &&
361 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
362 // this connection is from 'tell'... ignore everything except our command
363 // reply. (we'll get misc other message because we authenticated, but we
364 // don't need them.)
365 handle_command_reply(static_cast<MCommandReply*>(m));
366 return true;
367 }
368 // leave the message for another dispatch handler (e.g., Objecter)
369 return false;
370 case MSG_LOGACK:
371 if (log_client) {
372 log_client->handle_log_ack(static_cast<MLogAck*>(m));
373 m->put();
374 if (more_log_pending) {
375 send_log();
376 }
377 } else {
378 m->put();
379 }
380 break;
381 case MSG_CONFIG:
382 handle_config(static_cast<MConfig*>(m));
383 break;
384 }
385 return true;
386 }
387
388 void MonClient::send_log(bool flush)
389 {
390 if (log_client) {
391 auto lm = log_client->get_mon_log_message(flush);
392 if (lm)
393 _send_mon_message(std::move(lm));
394 more_log_pending = log_client->are_pending();
395 }
396 }
397
398 void MonClient::flush_log()
399 {
400 std::lock_guard l(monc_lock);
401 send_log();
402 }
403
404 /* Unlike all the other message-handling functions, we don't put away a reference
405 * because we want to support MMonMap passthrough to other Dispatchers. */
406 void MonClient::handle_monmap(MMonMap *m)
407 {
408 ldout(cct, 10) << __func__ << " " << *m << dendl;
409 auto con_addrs = m->get_source_addrs();
410 string old_name = monmap.get_name(con_addrs);
411 const auto old_epoch = monmap.get_epoch();
412
413 auto p = m->monmapbl.cbegin();
414 decode(monmap, p);
415
416 ldout(cct, 10) << " got monmap " << monmap.epoch
417 << " from mon." << old_name
418 << " (according to old e" << monmap.get_epoch() << ")"
419 << dendl;
420 ldout(cct, 10) << "dump:\n";
421 monmap.print(*_dout);
422 *_dout << dendl;
423
424 if (old_epoch != monmap.get_epoch()) {
425 tried.clear();
426 }
427 if (old_name.size() == 0) {
428 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
429 _reopen_session();
430 } else {
431 auto new_name = monmap.get_name(con_addrs);
432 if (new_name.empty()) {
433 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
434 << " went away" << dendl;
435 // can't find the mon we were talking to (above)
436 _reopen_session();
437 } else if (messenger->should_use_msgr2() &&
438 monmap.get_addrs(new_name).has_msgr2() &&
439 !con_addrs.has_msgr2()) {
440 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
441 << monmap.get_addrs(new_name) << " but i'm connected to "
442 << con_addrs << ", reconnecting" << dendl;
443 _reopen_session();
444 }
445 }
446
447 cct->set_mon_addrs(monmap);
448
449 sub.got("monmap", monmap.get_epoch());
450 map_cond.notify_all();
451 want_monmap = false;
452
453 if (authenticate_err == 1) {
454 _finish_auth(0);
455 }
456 }
457
458 void MonClient::handle_config(MConfig *m)
459 {
460 ldout(cct,10) << __func__ << " " << *m << dendl;
461
462 if (want_bootstrap_config) {
463 // get_monmap_and_config is waiting for config which it will apply
464 // synchronously
465 bootstrap_config = ceph::ref_t<MConfig>(m, false);
466 map_cond.notify_all();
467 return;
468 }
469
470 // Take the sledgehammer approach to ensuring we don't depend on
471 // anything in MonClient.
472 boost::asio::post(finish_strand,
473 [m, cct = boost::intrusive_ptr<CephContext>(cct),
474 config_notify_cb = config_notify_cb,
475 config_cb = config_cb]() {
476 cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
477 if (config_notify_cb) {
478 config_notify_cb();
479 }
480 m->put();
481 });
482 }
483
484 // ----------------------
485
486 int MonClient::init()
487 {
488 ldout(cct, 10) << __func__ << dendl;
489
490 entity_name = cct->_conf->name;
491
492 auth_registry.refresh_config();
493
494 std::lock_guard l(monc_lock);
495 keyring.reset(new KeyRing);
496 if (auth_registry.is_supported_method(messenger->get_mytype(),
497 CEPH_AUTH_CEPHX)) {
498 // this should succeed, because auth_registry just checked!
499 int r = keyring->from_ceph_context(cct);
500 if (r != 0) {
501 // but be somewhat graceful in case there was a race condition
502 lderr(cct) << "keyring not found" << dendl;
503 return r;
504 }
505 }
506 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
507 return -ENOENT;
508 }
509
510 rotating_secrets.reset(
511 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
512
513 initialized = true;
514
515 messenger->set_auth_client(this);
516 messenger->add_dispatcher_head(this);
517
518 timer.init();
519 schedule_tick();
520
521 return 0;
522 }
523
524 void MonClient::shutdown()
525 {
526 ldout(cct, 10) << __func__ << dendl;
527 monc_lock.lock();
528 stopping = true;
529 while (!version_requests.empty()) {
530 ceph::async::post(std::move(version_requests.begin()->second),
531 monc_errc::shutting_down, 0, 0);
532 ldout(cct, 20) << __func__ << " canceling and discarding version request "
533 << version_requests.begin()->first << dendl;
534 version_requests.erase(version_requests.begin());
535 }
536 while (!mon_commands.empty()) {
537 auto tid = mon_commands.begin()->first;
538 _cancel_mon_command(tid);
539 }
540 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
541 << " pending message(s)" << dendl;
542 waiting_for_session.clear();
543
544 active_con.reset();
545 pending_cons.clear();
546
547 auth.reset();
548 global_id = 0;
549 authenticate_err = 0;
550 authenticated = false;
551
552 monc_lock.unlock();
553
554 if (initialized) {
555 initialized = false;
556 }
557 monc_lock.lock();
558 timer.shutdown();
559 stopping = false;
560 monc_lock.unlock();
561 }
562
563 int MonClient::authenticate(double timeout)
564 {
565 std::unique_lock lock{monc_lock};
566
567 if (active_con) {
568 ldout(cct, 5) << "already authenticated" << dendl;
569 return 0;
570 }
571 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
572 sub.want("config", 0, 0);
573 if (!_opened())
574 _reopen_session();
575
576 auto until = ceph::real_clock::now();
577 until += ceph::make_timespan(timeout);
578 if (timeout > 0.0)
579 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
580 authenticate_err = 1; // == in progress
581 while (!active_con && authenticate_err >= 0) {
582 if (timeout > 0.0) {
583 auto r = auth_cond.wait_until(lock, until);
584 if (r == std::cv_status::timeout && !active_con) {
585 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
586 authenticate_err = -ETIMEDOUT;
587 }
588 } else {
589 auth_cond.wait(lock);
590 }
591 }
592
593 if (active_con) {
594 ldout(cct, 5) << __func__ << " success, global_id "
595 << active_con->get_global_id() << dendl;
596 // active_con should not have been set if there was an error
597 ceph_assert(authenticate_err >= 0);
598 authenticated = true;
599 }
600
601 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
602 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
603 }
604
605 return authenticate_err;
606 }
607
608 void MonClient::handle_auth(MAuthReply *m)
609 {
610 ceph_assert(ceph_mutex_is_locked(monc_lock));
611
612 if (m->get_connection()->is_anon()) {
613 // anon connection, used for mon tell commands
614 for (auto& p : mon_commands) {
615 if (p.second->target_con == m->get_connection()) {
616 auto& mc = p.second->target_session;
617 int ret = mc->handle_auth(m, entity_name,
618 CEPH_ENTITY_TYPE_MON,
619 rotating_secrets.get());
620 (void)ret; // we don't care
621 break;
622 }
623 }
624 m->put();
625 return;
626 }
627
628 if (!_hunting()) {
629 std::swap(active_con->get_auth(), auth);
630 int ret = active_con->authenticate(m);
631 m->put();
632 std::swap(auth, active_con->get_auth());
633 if (global_id != active_con->get_global_id()) {
634 lderr(cct) << __func__ << " peer assigned me a different global_id: "
635 << active_con->get_global_id() << dendl;
636 }
637 if (ret != -EAGAIN) {
638 _finish_auth(ret);
639 }
640 return;
641 }
642
643 // hunting
644 auto found = _find_pending_con(m->get_connection());
645 ceph_assert(found != pending_cons.end());
646 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
647 rotating_secrets.get());
648 m->put();
649 if (auth_err == -EAGAIN) {
650 return;
651 }
652 if (auth_err) {
653 pending_cons.erase(found);
654 if (!pending_cons.empty()) {
655 // keep trying with pending connections
656 return;
657 }
658 // the last try just failed, give up.
659 } else {
660 auto& mc = found->second;
661 ceph_assert(mc.have_session());
662 active_con.reset(new MonConnection(std::move(mc)));
663 pending_cons.clear();
664 }
665
666 _finish_hunting(auth_err);
667 _finish_auth(auth_err);
668 }
669
670 void MonClient::_finish_auth(int auth_err)
671 {
672 ldout(cct,10) << __func__ << " " << auth_err << dendl;
673 authenticate_err = auth_err;
674 // _resend_mon_commands() could _reopen_session() if the connected mon is not
675 // the one the MonCommand is targeting.
676 if (!auth_err && active_con) {
677 ceph_assert(auth);
678 _check_auth_tickets();
679 }
680 auth_cond.notify_all();
681
682 if (!auth_err) {
683 Context *cb = nullptr;
684 if (session_established_context) {
685 cb = session_established_context.release();
686 }
687 if (cb) {
688 monc_lock.unlock();
689 cb->complete(0);
690 monc_lock.lock();
691 }
692 }
693 }
694
695 // ---------
696
697 void MonClient::send_mon_message(MessageRef m)
698 {
699 std::lock_guard l{monc_lock};
700 _send_mon_message(std::move(m));
701 }
702
703 void MonClient::_send_mon_message(MessageRef m)
704 {
705 ceph_assert(ceph_mutex_is_locked(monc_lock));
706 if (active_con) {
707 auto cur_con = active_con->get_con();
708 ldout(cct, 10) << "_send_mon_message to mon."
709 << monmap.get_name(cur_con->get_peer_addr())
710 << " at " << cur_con->get_peer_addr() << dendl;
711 cur_con->send_message2(std::move(m));
712 } else {
713 waiting_for_session.push_back(std::move(m));
714 }
715 }
716
717 void MonClient::_reopen_session(int rank)
718 {
719 ceph_assert(ceph_mutex_is_locked(monc_lock));
720 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
721
722 active_con.reset();
723 pending_cons.clear();
724
725 _start_hunting();
726
727 if (rank >= 0) {
728 _add_conn(rank);
729 } else {
730 _add_conns();
731 }
732
733 // throw out old queued messages
734 waiting_for_session.clear();
735
736 // throw out version check requests
737 while (!version_requests.empty()) {
738 ceph::async::post(std::move(version_requests.begin()->second),
739 monc_errc::session_reset, 0, 0);
740 version_requests.erase(version_requests.begin());
741 }
742
743 for (auto& c : pending_cons) {
744 c.second.start(monmap.get_epoch(), entity_name);
745 }
746
747 if (sub.reload()) {
748 _renew_subs();
749 }
750 }
751
752 void MonClient::_add_conn(unsigned rank)
753 {
754 auto peer = monmap.get_addrs(rank);
755 auto conn = messenger->connect_to_mon(peer);
756 MonConnection mc(cct, conn, global_id, &auth_registry);
757 if (auth) {
758 mc.get_auth().reset(auth->clone());
759 }
760 pending_cons.insert(std::make_pair(peer, std::move(mc)));
761 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
762 << " con " << conn
763 << " addr " << peer
764 << dendl;
765 }
766
767 void MonClient::_add_conns()
768 {
769 // collect the next batch of candidates who are listed right next to the ones
770 // already tried
771 auto get_next_batch = [this]() -> std::vector<unsigned> {
772 std::multimap<uint16_t, unsigned> ranks_by_priority;
773 boost::copy(
774 monmap.mon_info | boost::adaptors::filtered(
775 [this](auto& info) {
776 auto rank = monmap.get_rank(info.first);
777 return tried.count(rank) == 0;
778 }) | boost::adaptors::transformed(
779 [this](auto& info) {
780 auto rank = monmap.get_rank(info.first);
781 return std::make_pair(info.second.priority, rank);
782 }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
783 if (ranks_by_priority.empty()) {
784 return {};
785 }
786 // only choose the monitors with lowest priority
787 auto cands = boost::make_iterator_range(
788 ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
789 std::vector<unsigned> ranks;
790 boost::range::copy(cands | boost::adaptors::map_values,
791 std::back_inserter(ranks));
792 return ranks;
793 };
794 auto ranks = get_next_batch();
795 if (ranks.empty()) {
796 tried.clear(); // start over
797 ranks = get_next_batch();
798 }
799 ceph_assert(!ranks.empty());
800 if (ranks.size() > 1) {
801 std::vector<uint16_t> weights;
802 for (auto i : ranks) {
803 auto rank_name = monmap.get_name(i);
804 weights.push_back(monmap.get_weight(rank_name));
805 }
806 random_device_t rd;
807 if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
808 std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
809 } else {
810 weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
811 std::mt19937{rd()});
812 }
813 }
814 ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
815 unsigned n = cct->_conf->mon_client_hunt_parallel;
816 if (n == 0 || n > ranks.size()) {
817 n = ranks.size();
818 }
819 for (unsigned i = 0; i < n; i++) {
820 _add_conn(ranks[i]);
821 tried.insert(ranks[i]);
822 }
823 }
824
825 bool MonClient::ms_handle_reset(Connection *con)
826 {
827 std::lock_guard lock(monc_lock);
828
829 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
830 return false;
831
832 if (con->is_anon()) {
833 auto p = mon_commands.begin();
834 while (p != mon_commands.end()) {
835 auto cmd = p->second;
836 ++p;
837 if (cmd->target_con == con) {
838 _send_command(cmd); // may retry or fail
839 break;
840 }
841 }
842 return true;
843 }
844
845 if (_hunting()) {
846 if (pending_cons.count(con->get_peer_addrs())) {
847 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
848 << dendl;
849 } else {
850 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
851 << dendl;
852 }
853 return true;
854 } else {
855 if (active_con && con == active_con->get_con()) {
856 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
857 << dendl;
858 _reopen_session();
859 return false;
860 } else {
861 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
862 << dendl;
863 return true;
864 }
865 }
866 }
867
868 bool MonClient::_opened() const
869 {
870 ceph_assert(ceph_mutex_is_locked(monc_lock));
871 return active_con || _hunting();
872 }
873
874 bool MonClient::_hunting() const
875 {
876 return !pending_cons.empty();
877 }
878
879 void MonClient::_start_hunting()
880 {
881 ceph_assert(!_hunting());
882 // adjust timeouts if necessary
883 if (!had_a_connection)
884 return;
885 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
886 if (reopen_interval_multiplier >
887 cct->_conf->mon_client_hunt_interval_max_multiple) {
888 reopen_interval_multiplier =
889 cct->_conf->mon_client_hunt_interval_max_multiple;
890 }
891 }
892
893 void MonClient::_finish_hunting(int auth_err)
894 {
895 ldout(cct,10) << __func__ << " " << auth_err << dendl;
896 ceph_assert(ceph_mutex_is_locked(monc_lock));
897 // the pending conns have been cleaned.
898 ceph_assert(!_hunting());
899 if (active_con) {
900 auto con = active_con->get_con();
901 ldout(cct, 1) << "found mon."
902 << monmap.get_name(con->get_peer_addr())
903 << dendl;
904 } else {
905 ldout(cct, 1) << "no mon sessions established" << dendl;
906 }
907
908 had_a_connection = true;
909 _un_backoff();
910
911 if (!auth_err) {
912 last_rotating_renew_sent = utime_t();
913 while (!waiting_for_session.empty()) {
914 _send_mon_message(std::move(waiting_for_session.front()));
915 waiting_for_session.pop_front();
916 }
917 _resend_mon_commands();
918 send_log(true);
919 if (active_con) {
920 auth = std::move(active_con->get_auth());
921 if (global_id && global_id != active_con->get_global_id()) {
922 lderr(cct) << __func__ << " global_id changed from " << global_id
923 << " to " << active_con->get_global_id() << dendl;
924 }
925 global_id = active_con->get_global_id();
926 }
927 }
928 }
929
930 void MonClient::tick()
931 {
932 ldout(cct, 10) << __func__ << dendl;
933
934 utime_t now = ceph_clock_now();
935
936 auto reschedule_tick = make_scope_guard([this] {
937 schedule_tick();
938 });
939
940 _check_auth_tickets();
941 _check_tell_commands();
942
943 if (_hunting()) {
944 ldout(cct, 1) << "continuing hunt" << dendl;
945 return _reopen_session();
946 } else if (active_con) {
947 // just renew as needed
948 auto cur_con = active_con->get_con();
949 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
950 const bool maybe_renew = sub.need_renew();
951 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
952 << dendl;
953 if (maybe_renew) {
954 _renew_subs();
955 }
956 }
957
958 if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
959 cur_con->send_keepalive();
960 last_keepalive = now;
961
962 if (cct->_conf->mon_client_ping_timeout > 0 &&
963 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
964 utime_t lk = cur_con->get_last_keepalive_ack();
965 utime_t interval = now - lk;
966 if (interval > cct->_conf->mon_client_ping_timeout) {
967 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
968 << " seconds), reconnecting" << dendl;
969 return _reopen_session();
970 }
971 }
972
973 _un_backoff();
974 }
975
976 if (now > last_send_log + cct->_conf->mon_client_log_interval) {
977 send_log();
978 last_send_log = now;
979 }
980 }
981 }
982
983 void MonClient::_un_backoff()
984 {
985 // un-backoff our reconnect interval
986 reopen_interval_multiplier = std::max(
987 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
988 reopen_interval_multiplier /
989 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
990 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
991 << reopen_interval_multiplier << dendl;
992 }
993
994 void MonClient::schedule_tick()
995 {
996 auto do_tick = make_lambda_context([this](int) { tick(); });
997 if (!is_connected()) {
998 // start another round of hunting
999 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
1000 reopen_interval_multiplier);
1001 timer.add_event_after(hunt_interval, do_tick);
1002 } else {
1003 // keep in touch
1004 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
1005 cct->_conf->mon_client_log_interval),
1006 do_tick);
1007 }
1008 }
1009
1010 // ---------
1011
1012 void MonClient::_renew_subs()
1013 {
1014 ceph_assert(ceph_mutex_is_locked(monc_lock));
1015 if (!sub.have_new()) {
1016 ldout(cct, 10) << __func__ << " - empty" << dendl;
1017 return;
1018 }
1019
1020 ldout(cct, 10) << __func__ << dendl;
1021 if (!_opened())
1022 _reopen_session();
1023 else {
1024 auto m = ceph::make_message<MMonSubscribe>();
1025 m->what = sub.get_subs();
1026 m->hostname = ceph_get_short_hostname();
1027 _send_mon_message(std::move(m));
1028 sub.renewed();
1029 }
1030 }
1031
1032 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1033 {
1034 sub.acked(m->interval);
1035 m->put();
1036 }
1037
1038 int MonClient::_check_auth_tickets()
1039 {
1040 ceph_assert(ceph_mutex_is_locked(monc_lock));
1041 if (active_con && auth) {
1042 if (auth->need_tickets()) {
1043 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1044 auto m = ceph::make_message<MAuth>();
1045 m->protocol = auth->get_protocol();
1046 auth->prepare_build_request();
1047 auth->build_request(m->auth_payload);
1048 _send_mon_message(m);
1049 }
1050
1051 _check_auth_rotating();
1052 }
1053 return 0;
1054 }
1055
1056 int MonClient::_check_auth_rotating()
1057 {
1058 ceph_assert(ceph_mutex_is_locked(monc_lock));
1059 if (!rotating_secrets ||
1060 !auth_principal_needs_rotating_keys(entity_name)) {
1061 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
1062 return 0;
1063 }
1064
1065 if (!active_con || !auth) {
1066 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
1067 return 0;
1068 }
1069
1070 utime_t now = ceph_clock_now();
1071 utime_t cutoff = now;
1072 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1073 utime_t issued_at_lower_bound = now;
1074 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
1075 if (!rotating_secrets->need_new_secrets(cutoff)) {
1076 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
1077 rotating_secrets->dump_rotating();
1078 return 0;
1079 }
1080
1081 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
1082 if (!rotating_secrets->need_new_secrets() &&
1083 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
1084 // the key has expired before it has been issued?
1085 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
1086 << " (before " << issued_at_lower_bound << ")" << dendl;
1087 }
1088 if ((now > last_rotating_renew_sent) &&
1089 double(now - last_rotating_renew_sent) < 1) {
1090 ldout(cct, 10) << __func__ << " called too often (last: "
1091 << last_rotating_renew_sent << "), skipping refresh" << dendl;
1092 return 0;
1093 }
1094 auto m = ceph::make_message<MAuth>();
1095 m->protocol = auth->get_protocol();
1096 if (auth->build_rotating_request(m->auth_payload)) {
1097 last_rotating_renew_sent = now;
1098 _send_mon_message(std::move(m));
1099 }
1100 return 0;
1101 }
1102
1103 int MonClient::wait_auth_rotating(double timeout)
1104 {
1105 std::unique_lock l(monc_lock);
1106
1107 // Must be initialized
1108 ceph_assert(auth != nullptr);
1109
1110 if (auth->get_protocol() == CEPH_AUTH_NONE)
1111 return 0;
1112
1113 if (!rotating_secrets)
1114 return 0;
1115
1116 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1117 utime_t now = ceph_clock_now();
1118 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
1119 return (!auth_principal_needs_rotating_keys(entity_name) ||
1120 !rotating_secrets->need_new_secrets(now));
1121 })) {
1122 ldout(cct, 10) << __func__ << " done" << dendl;
1123 return 0;
1124 } else {
1125 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1126 return -ETIMEDOUT;
1127 }
1128 }
1129
1130 // ---------
1131
1132 void MonClient::_send_command(MonCommand *r)
1133 {
1134 if (r->is_tell()) {
1135 ++r->send_attempts;
1136 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1137 _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
1138 return;
1139 }
1140 // tell-style command
1141 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1142 if (r->target_con) {
1143 r->target_con->mark_down();
1144 }
1145 if (r->target_rank >= 0) {
1146 if (r->target_rank >= (int)monmap.size()) {
1147 ldout(cct, 10) << " target " << r->target_rank
1148 << " >= max mon " << monmap.size() << dendl;
1149 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1150 return;
1151 }
1152 r->target_con = messenger->connect_to_mon(
1153 monmap.get_addrs(r->target_rank), true /* anon */);
1154 } else {
1155 if (!monmap.contains(r->target_name)) {
1156 ldout(cct, 10) << " target " << r->target_name
1157 << " not present in monmap" << dendl;
1158 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1159 return;
1160 }
1161 r->target_con = messenger->connect_to_mon(
1162 monmap.get_addrs(r->target_name), true /* anon */);
1163 }
1164
1165 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1166 &auth_registry));
1167 r->target_session->start(monmap.get_epoch(), entity_name);
1168 r->last_send_attempt = ceph_clock_now();
1169
1170 MCommand *m = new MCommand(monmap.fsid);
1171 m->set_tid(r->tid);
1172 m->cmd = r->cmd;
1173 m->set_data(r->inbl);
1174 r->target_session->queue_command(m);
1175 return;
1176 }
1177
1178 // ugly legacy handling of pre-octopus mons
1179 entity_addr_t peer;
1180 if (active_con) {
1181 peer = active_con->get_con()->get_peer_addr();
1182 }
1183
1184 if (r->target_rank >= 0 &&
1185 r->target_rank != monmap.get_rank(peer)) {
1186 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1187 << " wants rank " << r->target_rank
1188 << ", reopening session"
1189 << dendl;
1190 if (r->target_rank >= (int)monmap.size()) {
1191 ldout(cct, 10) << " target " << r->target_rank
1192 << " >= max mon " << monmap.size() << dendl;
1193 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1194 return;
1195 }
1196 _reopen_session(r->target_rank);
1197 return;
1198 }
1199 if (r->target_name.length() &&
1200 r->target_name != monmap.get_name(peer)) {
1201 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1202 << " wants mon " << r->target_name
1203 << ", reopening session"
1204 << dendl;
1205 if (!monmap.contains(r->target_name)) {
1206 ldout(cct, 10) << " target " << r->target_name
1207 << " not present in monmap" << dendl;
1208 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1209 return;
1210 }
1211 _reopen_session(monmap.get_rank(r->target_name));
1212 return;
1213 }
1214 // fall-thru to send 'normal' CLI command
1215 }
1216
1217 // normal CLI command
1218 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1219 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
1220 m->set_tid(r->tid);
1221 m->cmd = r->cmd;
1222 m->set_data(r->inbl);
1223 _send_mon_message(std::move(m));
1224 return;
1225 }
1226
1227 void MonClient::_check_tell_commands()
1228 {
1229 // resend any requests
1230 auto now = ceph_clock_now();
1231 auto p = mon_commands.begin();
1232 while (p != mon_commands.end()) {
1233 auto cmd = p->second;
1234 ++p;
1235 if (cmd->is_tell() &&
1236 cmd->last_send_attempt != utime_t() &&
1237 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1238 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1239 _send_command(cmd); // might remove cmd from mon_commands
1240 }
1241 }
1242 }
1243
1244 void MonClient::_resend_mon_commands()
1245 {
1246 // resend any requests
1247 auto p = mon_commands.begin();
1248 while (p != mon_commands.end()) {
1249 auto cmd = p->second;
1250 ++p;
1251 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1252 // starting with octopus, tell commands use their own connetion and need no
1253 // special resend when we finish hunting.
1254 } else {
1255 _send_command(cmd); // might remove cmd from mon_commands
1256 }
1257 }
1258 }
1259
1260 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1261 {
1262 MonCommand *r = NULL;
1263 uint64_t tid = ack->get_tid();
1264
1265 if (tid == 0 && !mon_commands.empty()) {
1266 r = mon_commands.begin()->second;
1267 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1268 } else {
1269 auto p = mon_commands.find(tid);
1270 if (p == mon_commands.end()) {
1271 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1272 ack->put();
1273 return;
1274 }
1275 r = p->second;
1276 }
1277
1278 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1279 auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
1280 : bs::error_code();
1281 _finish_command(r, ec, ack->rs,
1282 std::move(ack->get_data()));
1283 ack->put();
1284 }
1285
1286 void MonClient::handle_command_reply(MCommandReply *reply)
1287 {
1288 MonCommand *r = NULL;
1289 uint64_t tid = reply->get_tid();
1290
1291 if (tid == 0 && !mon_commands.empty()) {
1292 r = mon_commands.begin()->second;
1293 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1294 << dendl;
1295 } else {
1296 auto p = mon_commands.find(tid);
1297 if (p == mon_commands.end()) {
1298 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1299 << dendl;
1300 reply->put();
1301 return;
1302 }
1303 r = p->second;
1304 }
1305
1306 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1307 auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
1308 : bs::error_code();
1309 _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
1310 reply->put();
1311 }
1312
1313 int MonClient::_cancel_mon_command(uint64_t tid)
1314 {
1315 ceph_assert(ceph_mutex_is_locked(monc_lock));
1316
1317 auto it = mon_commands.find(tid);
1318 if (it == mon_commands.end()) {
1319 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1320 return -ENOENT;
1321 }
1322
1323 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1324
1325 MonCommand *cmd = it->second;
1326 _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
1327 return 0;
1328 }
1329
1330 void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
1331 std::string_view rs, ceph::buffer::list&& bl)
1332 {
1333 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
1334 << dendl;
1335 ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1336 std::move(bl));
1337 if (r->target_con) {
1338 r->target_con->mark_down();
1339 }
1340 mon_commands.erase(r->tid);
1341 delete r;
1342 }
1343
1344 // ---------
1345
1346 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1347 {
1348 ceph_assert(ceph_mutex_is_locked(monc_lock));
1349 auto iter = version_requests.find(m->handle);
1350 if (iter == version_requests.end()) {
1351 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1352 << " not found" << dendl;
1353 } else {
1354 auto req = std::move(iter->second);
1355 ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
1356 << m->version << dendl;
1357 version_requests.erase(iter);
1358 ceph::async::post(std::move(req), bs::error_code(),
1359 m->version, m->oldest_version);
1360 }
1361 m->put();
1362 }
1363
1364 int MonClient::get_auth_request(
1365 Connection *con,
1366 AuthConnectionMeta *auth_meta,
1367 uint32_t *auth_method,
1368 std::vector<uint32_t> *preferred_modes,
1369 ceph::buffer::list *bl)
1370 {
1371 std::lock_guard l(monc_lock);
1372 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1373 << dendl;
1374
1375 // connection to mon?
1376 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1377 ceph_assert(!auth_meta->authorizer);
1378 if (con->is_anon()) {
1379 for (auto& i : mon_commands) {
1380 if (i.second->target_con == con) {
1381 return i.second->target_session->get_auth_request(
1382 auth_method, preferred_modes, bl,
1383 entity_name, want_keys, rotating_secrets.get());
1384 }
1385 }
1386 }
1387 for (auto& i : pending_cons) {
1388 if (i.second.is_con(con)) {
1389 return i.second.get_auth_request(
1390 auth_method, preferred_modes, bl,
1391 entity_name, want_keys, rotating_secrets.get());
1392 }
1393 }
1394 return -ENOENT;
1395 }
1396
1397 // generate authorizer
1398 if (!auth) {
1399 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1400 return -EACCES;
1401 }
1402 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1403 if (!auth_meta->authorizer) {
1404 lderr(cct) << __func__ << " failed to build_authorizer for type "
1405 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1406 return -EACCES;
1407 }
1408 auth_meta->auth_method = auth_meta->authorizer->protocol;
1409 auth_registry.get_supported_modes(con->get_peer_type(),
1410 auth_meta->auth_method,
1411 preferred_modes);
1412 *bl = auth_meta->authorizer->bl;
1413 return 0;
1414 }
1415
1416 int MonClient::handle_auth_reply_more(
1417 Connection *con,
1418 AuthConnectionMeta *auth_meta,
1419 const ceph::buffer::list& bl,
1420 ceph::buffer::list *reply)
1421 {
1422 std::lock_guard l(monc_lock);
1423
1424 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1425 if (con->is_anon()) {
1426 for (auto& i : mon_commands) {
1427 if (i.second->target_con == con) {
1428 return i.second->target_session->handle_auth_reply_more(
1429 auth_meta, bl, reply);
1430 }
1431 }
1432 }
1433 for (auto& i : pending_cons) {
1434 if (i.second.is_con(con)) {
1435 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1436 }
1437 }
1438 return -ENOENT;
1439 }
1440
1441 // authorizer challenges
1442 if (!auth || !auth_meta->authorizer) {
1443 lderr(cct) << __func__ << " no authorizer?" << dendl;
1444 return -1;
1445 }
1446 auth_meta->authorizer->add_challenge(cct, bl);
1447 *reply = auth_meta->authorizer->bl;
1448 return 0;
1449 }
1450
1451 int MonClient::handle_auth_done(
1452 Connection *con,
1453 AuthConnectionMeta *auth_meta,
1454 uint64_t global_id,
1455 uint32_t con_mode,
1456 const ceph::buffer::list& bl,
1457 CryptoKey *session_key,
1458 std::string *connection_secret)
1459 {
1460 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1461 std::lock_guard l(monc_lock);
1462 if (con->is_anon()) {
1463 for (auto& i : mon_commands) {
1464 if (i.second->target_con == con) {
1465 return i.second->target_session->handle_auth_done(
1466 auth_meta, global_id, bl,
1467 session_key, connection_secret);
1468 }
1469 }
1470 }
1471 for (auto& i : pending_cons) {
1472 if (i.second.is_con(con)) {
1473 int r = i.second.handle_auth_done(
1474 auth_meta, global_id, bl,
1475 session_key, connection_secret);
1476 if (r) {
1477 pending_cons.erase(i.first);
1478 if (!pending_cons.empty()) {
1479 return r;
1480 }
1481 } else {
1482 active_con.reset(new MonConnection(std::move(i.second)));
1483 pending_cons.clear();
1484 ceph_assert(active_con->have_session());
1485 }
1486
1487 _finish_hunting(r);
1488 if (r || monmap.get_epoch() > 0) {
1489 _finish_auth(r);
1490 }
1491 return r;
1492 }
1493 }
1494 return -ENOENT;
1495 } else {
1496 // verify authorizer reply
1497 auto p = bl.begin();
1498 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1499 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1500 << dendl;
1501 return -EACCES;
1502 }
1503 auth_meta->session_key = auth_meta->authorizer->session_key;
1504 return 0;
1505 }
1506 }
1507
1508 int MonClient::handle_auth_bad_method(
1509 Connection *con,
1510 AuthConnectionMeta *auth_meta,
1511 uint32_t old_auth_method,
1512 int result,
1513 const std::vector<uint32_t>& allowed_methods,
1514 const std::vector<uint32_t>& allowed_modes)
1515 {
1516 auth_meta->allowed_methods = allowed_methods;
1517
1518 std::lock_guard l(monc_lock);
1519 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1520 if (con->is_anon()) {
1521 for (auto& i : mon_commands) {
1522 if (i.second->target_con == con) {
1523 int r = i.second->target_session->handle_auth_bad_method(
1524 old_auth_method,
1525 result,
1526 allowed_methods,
1527 allowed_modes);
1528 if (r < 0) {
1529 auto ec = bs::error_code(-r, mon_category());
1530 _finish_command(i.second, ec, "auth failed"sv, {});
1531 }
1532 return r;
1533 }
1534 }
1535 }
1536 for (auto& i : pending_cons) {
1537 if (i.second.is_con(con)) {
1538 int r = i.second.handle_auth_bad_method(old_auth_method,
1539 result,
1540 allowed_methods,
1541 allowed_modes);
1542 if (r == 0) {
1543 return r; // try another method on this con
1544 }
1545 pending_cons.erase(i.first);
1546 if (!pending_cons.empty()) {
1547 return r; // fail this con, maybe another con will succeed
1548 }
1549 // fail hunt
1550 _finish_hunting(r);
1551 _finish_auth(r);
1552 return r;
1553 }
1554 }
1555 return -ENOENT;
1556 } else {
1557 // huh...
1558 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1559 << " result " << cpp_strerror(result)
1560 << " and auth is " << (auth ? auth->get_protocol() : 0)
1561 << dendl;
1562 return -EACCES;
1563 }
1564 }
1565
1566 int MonClient::handle_auth_request(
1567 Connection *con,
1568 AuthConnectionMeta *auth_meta,
1569 bool more,
1570 uint32_t auth_method,
1571 const ceph::buffer::list& payload,
1572 ceph::buffer::list *reply)
1573 {
1574 if (payload.length() == 0) {
1575 // for some channels prior to nautilus (osd heartbeat), we
1576 // tolerate the lack of an authorizer.
1577 if (!con->get_messenger()->require_authorizer) {
1578 handle_authentication_dispatcher->ms_handle_authentication(con);
1579 return 1;
1580 }
1581 return -EACCES;
1582 }
1583 auth_meta->auth_mode = payload[0];
1584 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1585 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1586 return -EACCES;
1587 }
1588 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1589 auth_method);
1590 if (!ah) {
1591 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1592 << auth_method << dendl;
1593 return -EOPNOTSUPP;
1594 }
1595
1596 auto ac = &auth_meta->authorizer_challenge;
1597 if (auth_meta->skip_authorizer_challenge) {
1598 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
1599 ac = nullptr;
1600 }
1601
1602 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1603 bool isvalid = ah->verify_authorizer(
1604 cct,
1605 *rotating_secrets,
1606 payload,
1607 auth_meta->get_connection_secret_length(),
1608 reply,
1609 &con->peer_name,
1610 &con->peer_global_id,
1611 &con->peer_caps_info,
1612 &auth_meta->session_key,
1613 &auth_meta->connection_secret,
1614 ac);
1615 if (isvalid) {
1616 handle_authentication_dispatcher->ms_handle_authentication(con);
1617 return 1;
1618 }
1619 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1620 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1621 return 0;
1622 }
1623 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1624 // discard old challenge
1625 auth_meta->authorizer_challenge.reset();
1626 return -EACCES;
1627 }
1628
1629 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1630 std::lock_guard l(monc_lock);
1631 if (auth) {
1632 return auth->build_authorizer(service_id);
1633 } else {
1634 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1635 << ", but no auth is available now" << dendl;
1636 return nullptr;
1637 }
1638 }
1639
1640 #define dout_subsys ceph_subsys_monc
1641 #undef dout_prefix
1642 #define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1643
1644 MonConnection::MonConnection(
1645 CephContext *cct, ConnectionRef con, uint64_t global_id,
1646 AuthRegistry *ar)
1647 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1648 {}
1649
1650 MonConnection::~MonConnection()
1651 {
1652 if (con) {
1653 con->mark_down();
1654 con.reset();
1655 }
1656 }
1657
1658 bool MonConnection::have_session() const
1659 {
1660 return state == State::HAVE_SESSION;
1661 }
1662
1663 void MonConnection::start(epoch_t epoch,
1664 const EntityName& entity_name)
1665 {
1666 using ceph::encode;
1667 auth_start = ceph_clock_now();
1668
1669 if (con->get_peer_addr().is_msgr2()) {
1670 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1671 state = State::AUTHENTICATING;
1672 con->send_message(new MMonGetMap());
1673 return;
1674 }
1675
1676 // restart authentication handshake
1677 state = State::NEGOTIATING;
1678
1679 // send an initial keepalive to ensure our timestamp is valid by the
1680 // time we are in an OPENED state (by sequencing this before
1681 // authentication).
1682 con->send_keepalive();
1683
1684 auto m = new MAuth;
1685 m->protocol = CEPH_AUTH_UNKNOWN;
1686 m->monmap_epoch = epoch;
1687 __u8 struct_v = 1;
1688 encode(struct_v, m->auth_payload);
1689 std::vector<uint32_t> auth_supported;
1690 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1691 encode(auth_supported, m->auth_payload);
1692 encode(entity_name, m->auth_payload);
1693 encode(global_id, m->auth_payload);
1694 con->send_message(m);
1695 }
1696
1697 int MonConnection::get_auth_request(
1698 uint32_t *method,
1699 std::vector<uint32_t> *preferred_modes,
1700 ceph::buffer::list *bl,
1701 const EntityName& entity_name,
1702 uint32_t want_keys,
1703 RotatingKeyRing* keyring)
1704 {
1705 using ceph::encode;
1706 // choose method
1707 if (auth_method < 0) {
1708 std::vector<uint32_t> as;
1709 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1710 if (as.empty()) {
1711 return -EACCES;
1712 }
1713 auth_method = as.front();
1714 }
1715 *method = auth_method;
1716 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1717 preferred_modes);
1718 ldout(cct,10) << __func__ << " method " << *method
1719 << " preferred_modes " << *preferred_modes << dendl;
1720 if (preferred_modes->empty()) {
1721 return -EACCES;
1722 }
1723
1724 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1725 ceph_assert(r == 0);
1726
1727 // initial requset includes some boilerplate...
1728 encode((char)AUTH_MODE_MON, *bl);
1729 encode(entity_name, *bl);
1730 encode(global_id, *bl);
1731
1732 // and (maybe) some method-specific initial payload
1733 auth->build_initial_request(bl);
1734
1735 return 0;
1736 }
1737
1738 int MonConnection::handle_auth_reply_more(
1739 AuthConnectionMeta *auth_meta,
1740 const ceph::buffer::list& bl,
1741 ceph::buffer::list *reply)
1742 {
1743 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1744 ldout(cct, 30) << __func__ << " got\n";
1745 bl.hexdump(*_dout);
1746 *_dout << dendl;
1747
1748 auto p = bl.cbegin();
1749 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1750 int r = auth->handle_response(0, p, &auth_meta->session_key,
1751 &auth_meta->connection_secret);
1752 if (r == -EAGAIN) {
1753 auth->prepare_build_request();
1754 auth->build_request(*reply);
1755 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1756 << " bytes" << dendl;
1757 r = 0;
1758 } else if (r < 0) {
1759 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1760 } else {
1761 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1762 // FIXME
1763 ceph_abort(cct, "write me");
1764 }
1765 return r;
1766 }
1767
1768 int MonConnection::handle_auth_done(
1769 AuthConnectionMeta *auth_meta,
1770 uint64_t new_global_id,
1771 const ceph::buffer::list& bl,
1772 CryptoKey *session_key,
1773 std::string *connection_secret)
1774 {
1775 ldout(cct,10) << __func__ << " global_id " << new_global_id
1776 << " payload " << bl.length()
1777 << dendl;
1778 global_id = new_global_id;
1779 auth->set_global_id(global_id);
1780 auto p = bl.begin();
1781 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1782 &auth_meta->connection_secret);
1783 if (auth_err >= 0) {
1784 state = State::HAVE_SESSION;
1785 }
1786 con->set_last_keepalive_ack(auth_start);
1787
1788 if (pending_tell_command) {
1789 con->send_message2(std::move(pending_tell_command));
1790 }
1791 return auth_err;
1792 }
1793
1794 int MonConnection::handle_auth_bad_method(
1795 uint32_t old_auth_method,
1796 int result,
1797 const std::vector<uint32_t>& allowed_methods,
1798 const std::vector<uint32_t>& allowed_modes)
1799 {
1800 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1801 << " result " << cpp_strerror(result)
1802 << " allowed_methods " << allowed_methods << dendl;
1803 std::vector<uint32_t> auth_supported;
1804 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1805 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1806 old_auth_method);
1807 assert(p != auth_supported.end());
1808 p = std::find_first_of(std::next(p), auth_supported.end(),
1809 allowed_methods.begin(), allowed_methods.end());
1810 if (p == auth_supported.end()) {
1811 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1812 << " but i only support " << auth_supported << dendl;
1813 return -EACCES;
1814 }
1815 auth_method = *p;
1816 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1817 return 0;
1818 }
1819
1820 int MonConnection::handle_auth(MAuthReply* m,
1821 const EntityName& entity_name,
1822 uint32_t want_keys,
1823 RotatingKeyRing* keyring)
1824 {
1825 if (state == State::NEGOTIATING) {
1826 int r = _negotiate(m, entity_name, want_keys, keyring);
1827 if (r) {
1828 return r;
1829 }
1830 state = State::AUTHENTICATING;
1831 }
1832 int r = authenticate(m);
1833 if (!r) {
1834 state = State::HAVE_SESSION;
1835 }
1836 return r;
1837 }
1838
1839 int MonConnection::_negotiate(MAuthReply *m,
1840 const EntityName& entity_name,
1841 uint32_t want_keys,
1842 RotatingKeyRing* keyring)
1843 {
1844 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1845 if (r == -ENOTSUP) {
1846 if (m->result == -ENOTSUP) {
1847 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1848 << dendl;
1849 }
1850 return m->result;
1851 }
1852 return r;
1853 }
1854
1855 int MonConnection::_init_auth(
1856 uint32_t method,
1857 const EntityName& entity_name,
1858 uint32_t want_keys,
1859 RotatingKeyRing* keyring,
1860 bool msgr2)
1861 {
1862 ldout(cct, 10) << __func__ << " method " << method << dendl;
1863 if (auth && auth->get_protocol() == (int)method) {
1864 ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
1865 auth->reset();
1866 return 0;
1867 }
1868
1869 ldout(cct, 10) << __func__ << " creating new auth" << dendl;
1870 auth.reset(AuthClientHandler::create(cct, method, keyring));
1871 if (!auth) {
1872 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1873 return -ENOTSUP;
1874 }
1875
1876 // do not request MGR key unless the mon has the SERVER_KRAKEN
1877 // feature. otherwise it will give us an auth error. note that
1878 // we have to use the FEATUREMASK because pre-jewel the kraken
1879 // feature bit was used for something else.
1880 if (!msgr2 &&
1881 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1882 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1883 ldout(cct, 1) << __func__
1884 << " not requesting MGR keys from pre-kraken monitor"
1885 << dendl;
1886 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1887 }
1888 auth->set_want_keys(want_keys);
1889 auth->init(entity_name);
1890 auth->set_global_id(global_id);
1891 return 0;
1892 }
1893
1894 int MonConnection::authenticate(MAuthReply *m)
1895 {
1896 ceph_assert(auth);
1897 if (!m->global_id) {
1898 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1899 }
1900 if (m->global_id != global_id) {
1901 // it's a new session
1902 auth->reset();
1903 global_id = m->global_id;
1904 auth->set_global_id(global_id);
1905 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1906 }
1907 auto p = m->result_bl.cbegin();
1908 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1909 if (ret == -EAGAIN) {
1910 auto ma = new MAuth;
1911 ma->protocol = auth->get_protocol();
1912 auth->prepare_build_request();
1913 auth->build_request(ma->auth_payload);
1914 con->send_message(ma);
1915 }
1916 if (ret == 0 && pending_tell_command) {
1917 con->send_message2(std::move(pending_tell_command));
1918 }
1919
1920 return ret;
1921 }
1922
1923 void MonClient::register_config_callback(md_config_t::config_callback fn) {
1924 ceph_assert(!config_cb);
1925 config_cb = fn;
1926 }
1927
1928 md_config_t::config_callback MonClient::get_config_callback() {
1929 return config_cb;
1930 }
1931
1932 #pragma GCC diagnostic push
1933 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
1934 #pragma clang diagnostic push
1935 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1936 class monc_error_category : public ceph::converting_category {
1937 public:
1938 monc_error_category(){}
1939 const char* name() const noexcept override;
1940 const char* message(int ev, char*, std::size_t) const noexcept override;
1941 std::string message(int ev) const override;
1942 bs::error_condition default_error_condition(int ev) const noexcept
1943 override;
1944 bool equivalent(int ev, const bs::error_condition& c) const
1945 noexcept override;
1946 using ceph::converting_category::equivalent;
1947 int from_code(int ev) const noexcept override;
1948 };
1949 #pragma GCC diagnostic pop
1950 #pragma clang diagnostic pop
1951
1952 const char* monc_error_category::name() const noexcept {
1953 return "monc";
1954 }
1955
1956 const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
1957 if (ev == 0)
1958 return "No error";
1959
1960 switch (static_cast<monc_errc>(ev)) {
1961 case monc_errc::shutting_down: // Command failed due to MonClient shutting down
1962 return "Command failed due to MonClient shutting down";
1963 case monc_errc::session_reset:
1964 return "Monitor session was reset";
1965 case monc_errc::rank_dne:
1966 return "Requested monitor rank does not exist";
1967 case monc_errc::mon_dne:
1968 return "Requested monitor does not exist";
1969 case monc_errc::timed_out:
1970 return "Monitor operation timed out";
1971 case monc_errc::mon_unavailable:
1972 return "Monitor unavailable";
1973 }
1974
1975 return "Unknown error";
1976 }
1977
1978 std::string monc_error_category::message(int ev) const {
1979 return message(ev, nullptr, 0);
1980 }
1981
1982 bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
1983 switch (static_cast<monc_errc>(ev)) {
1984 case monc_errc::shutting_down:
1985 return bs::errc::operation_canceled;
1986 case monc_errc::session_reset:
1987 return bs::errc::resource_unavailable_try_again;
1988 case monc_errc::rank_dne:
1989 [[fallthrough]];
1990 case monc_errc::mon_dne:
1991 return ceph::errc::not_in_map;
1992 case monc_errc::timed_out:
1993 return bs::errc::timed_out;
1994 case monc_errc::mon_unavailable:
1995 return bs::errc::no_such_device;
1996 }
1997 return { ev, *this };
1998 }
1999
2000 bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
2001 switch (static_cast<monc_errc>(ev)) {
2002 case monc_errc::rank_dne:
2003 [[fallthrough]];
2004 case monc_errc::mon_dne:
2005 return c == bs::errc::no_such_file_or_directory;
2006 default:
2007 return default_error_condition(ev) == c;
2008 }
2009 }
2010
2011 int monc_error_category::from_code(int ev) const noexcept {
2012 if (ev == 0)
2013 return 0;
2014
2015 switch (static_cast<monc_errc>(ev)) {
2016 case monc_errc::shutting_down:
2017 return -ECANCELED;
2018 case monc_errc::session_reset:
2019 return -EAGAIN;
2020 case monc_errc::rank_dne:
2021 [[fallthrough]];
2022 case monc_errc::mon_dne:
2023 return -ENOENT;
2024 case monc_errc::timed_out:
2025 return -ETIMEDOUT;
2026 case monc_errc::mon_unavailable:
2027 return -ENXIO;
2028 }
2029 return -EDOM;
2030 }
2031
2032 const bs::error_category& monc_category() noexcept {
2033 static const monc_error_category c;
2034 return c;
2035 }