]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
38ad26f61bdcf346db20b357a3f05b30a4b34580
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/random.h"
25 #include "include/scope_guard.h"
26 #include "include/stringify.h"
27
28 #include "messages/MMonGetMap.h"
29 #include "messages/MMonGetVersion.h"
30 #include "messages/MMonGetMap.h"
31 #include "messages/MMonGetVersionReply.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MConfig.h"
34 #include "messages/MAuth.h"
35 #include "messages/MLogAck.h"
36 #include "messages/MAuthReply.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MCommand.h"
40 #include "messages/MCommandReply.h"
41 #include "messages/MPing.h"
42
43 #include "messages/MMonSubscribe.h"
44 #include "messages/MMonSubscribeAck.h"
45 #include "common/errno.h"
46 #include "common/hostname.h"
47 #include "common/LogClient.h"
48
49 #include "MonClient.h"
50 #include "error_code.h"
51 #include "MonMap.h"
52
53 #include "auth/Auth.h"
54 #include "auth/KeyRing.h"
55 #include "auth/AuthClientHandler.h"
56 #include "auth/AuthRegistry.h"
57 #include "auth/RotatingKeyRing.h"
58
59 #define dout_subsys ceph_subsys_monc
60 #undef dout_prefix
61 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
62
63 namespace bs = boost::system;
64 using std::string;
65 using namespace std::literals;
66
67 MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
68 Dispatcher(cct_),
69 AuthServer(cct_),
70 messenger(NULL),
71 timer(cct_, monc_lock),
72 service(service),
73 initialized(false),
74 log_client(NULL),
75 more_log_pending(false),
76 want_monmap(true),
77 had_a_connection(false),
78 reopen_interval_multiplier(
79 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
80 last_mon_command_tid(0),
81 version_req_id(0)
82 {}
83
84 MonClient::~MonClient()
85 {
86 }
87
/**
 * Populate `monmap` from local configuration via MonMap::build_initial()
 * and dump the resulting map at debug level 10.
 *
 * @return whatever MonMap::build_initial() returned; get_monmap_and_config()
 *         treats a negative value as "cannot identify monitors to contact".
 */
int MonClient::build_initial_monmap()
{
  ldout(cct, 10) << __func__ << dendl;
  // NOTE(review): std::cerr is handed to build_initial(), presumably as its
  // diagnostics stream — confirm against MonMap.
  int r = monmap.build_initial(cct, false, std::cerr);
  // The ldout statement below stays open until the matching dendl, so the
  // monmap.print() output lands in the same log entry.
  ldout(cct,10) << "monmap:\n";
  monmap.print(*_dout);
  *_dout << dendl;
  return r;
}
97
98 int MonClient::get_monmap()
99 {
100 ldout(cct, 10) << __func__ << dendl;
101 std::unique_lock l(monc_lock);
102
103 sub.want("monmap", 0, 0);
104 if (!_opened())
105 _reopen_session();
106 map_cond.wait(l, [this] { return !want_monmap; });
107 ldout(cct, 10) << __func__ << " done" << dendl;
108 return 0;
109 }
110
/**
 * Bootstrap helper: obtain a monmap and the monitors' centralized config
 * before the caller has a messenger of its own.
 *
 * Steps:
 *  1. build an initial monmap from local configuration;
 *  2. create and start a temporary "temp_mon_client" messenger;
 *  3. up to 10 times: init() + authenticate() (timeout from
 *     client_mount_timeout), then wait until both a monmap with a non-zero
 *     epoch and an MConfig (captured by handle_config() while
 *     want_bootstrap_config is set) have arrived. If the monmap lacks
 *     FEATURE_MIMIC in its persistent features, the wait is skipped;
 *  4. apply the received config with set_mon_vals().
 *
 * Scope guards undo the setup on every exit path, in reverse order:
 *  - clear the bootstrap-config state (under monc_lock);
 *  - shut down and delete the temporary messenger, and record the monmap
 *    fsid in the "fsid" option if it is known;
 *  - shut down crypto.
 *
 * @return 0 on success (including the pre-mimic case);
 *         the negative result of build_initial_monmap(), init() or
 *         authenticate();
 *         or -ETIMEDOUT if the final attempt timed out.
 */
int MonClient::get_monmap_and_config()
{
  ldout(cct, 10) << __func__ << dendl;
  // must be called before any messenger has been attached
  ceph_assert(!messenger);

  int tries = 10;

  cct->init_crypto();
  auto shutdown_crypto = make_scope_guard([this] {
    cct->shutdown_crypto();
  });

  int r = build_initial_monmap();
  if (r < 0) {
    lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
    return r;
  }

  messenger = Messenger::create_client_messenger(
    cct, "temp_mon_client");
  ceph_assert(messenger);
  messenger->add_dispatcher_head(this);
  messenger->start();
  auto shutdown_msgr = make_scope_guard([this] {
    messenger->shutdown();
    messenger->wait();
    delete messenger;
    messenger = nullptr;
    // keep the fsid we learned from the monitors
    if (!monmap.fsid.is_zero()) {
      cct->_conf.set_val("fsid", stringify(monmap.fsid));
    }
  });

  // makes handle_config() stash the MConfig in bootstrap_config instead of
  // applying it asynchronously
  want_bootstrap_config = true;
  auto shutdown_config = make_scope_guard([this] {
    std::unique_lock l(monc_lock);
    want_bootstrap_config = false;
    bootstrap_config.reset();
  });

  ceph::ref_t<MConfig> config;
  while (tries-- > 0) {
    r = init();
    if (r < 0) {
      return r;
    }
    r = authenticate(std::chrono::duration<double>(cct->_conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
    if (r == -ETIMEDOUT) {
      // reset all session state before the next attempt
      shutdown();
      continue;
    }
    if (r < 0) {
      break;
    }
    {
      std::unique_lock l(monc_lock);
      if (monmap.get_epoch() &&
          !monmap.persistent_features.contains_all(
            ceph::features::mon::FEATURE_MIMIC)) {
        ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
                      << dendl;
        r = 0;
        break;
      }
      // wait for both a real monmap and the config; a single
      // mon_client_hunt_interval timeout ends the wait with -ETIMEDOUT
      while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
        ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
        auto status = map_cond.wait_for(l, ceph::make_timespan(
            cct->_conf->mon_client_hunt_interval));
        if (status == std::cv_status::timeout) {
          r = -ETIMEDOUT;
        }
      }

      // a config that arrived counts as success even if the monmap epoch
      // is still 0
      if (bootstrap_config) {
        ldout(cct,10) << __func__ << " success" << dendl;
        config = std::move(bootstrap_config);
        r = 0;
        break;
      }
    }
    lderr(cct) << __func__ << " failed to get config" << dendl;
    shutdown();
    continue;
  }

  if (config) {
    // apply the bootstrap config to ensure its applied prior to completing
    // the bootstrap
    cct->_conf.set_mon_vals(cct, config->config, config_cb);
  }

  // NOTE: when every attempt failed, shutdown() already ran at the bottom of
  // the loop and runs again here.
  shutdown();
  return r;
}
205
206
/**
 * Ping the monitor with id @p mon_id and store the resulting reply in
 * @p result_reply, if that parameter is not NULL.
 *
 * Instead of relying on the MonClient's default messenger (set up during
 * connect()), we create our own messenger to communicate with the
 * specified monitor. This has two advantages:
 *
 *  - It isolates the ping from the rest of the MonClient's operations. We
 *    never acquire the big monc_lock, so we never block waiting for some
 *    other operation to finish before we can proceed.
 *      * For instance, we can ping mon.FOO even while we are hunting or
 *        blocked waiting for auth to complete with mon.BAR.
 *
 *  - A monitor can be pinged before connect() has established the
 *    MonClient's messenger, which avoids the complex connection setup that
 *    connect() performs.
 *
 * The MonClient is not a dispatcher for this messenger either (unlike the
 * default messenger); a MonClientPinger plays that role. This sandboxes the
 * whole ping as a nearly separate entity and keeps message handling free of
 * any monc_lock considerations.
 *
 * Current drawback: a new messenger is created for every ping instead of
 * keeping a single messenger instance for all pings.
 *
 * The id "noname-<mon_id>" is preferred when the monmap contains it.
 *
 * @return 0 on reply; -EINVAL for an empty id; -ENOENT if the monitor is not
 *         in the monmap; otherwise the negated wait_for_reply() result.
 */
int MonClient::ping_monitor(const string &mon_id, string *result_reply)
{
  ldout(cct, 10) << __func__ << dendl;

  string new_mon_id;
  if (monmap.contains("noname-"+mon_id)) {
    new_mon_id = "noname-"+mon_id;
  } else {
    new_mon_id = mon_id;
  }

  if (new_mon_id.empty()) {
    ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
    return -EINVAL;
  } else if (!monmap.contains(new_mon_id)) {
    ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
                   << dendl;
    return -ENOENT;
  }

  // N.B. monc isn't initialized

  auth_registry.refresh_config();

  // local keyrings: this path does not touch the MonClient's own keyring
  KeyRing keyring;
  keyring.from_ceph_context(cct);
  RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);

  MonClientPinger *pinger = new MonClientPinger(cct,
                                                &rkeyring,
                                                result_reply);

  Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
  smsgr->add_dispatcher_head(pinger);
  smsgr->set_auth_client(pinger);
  smsgr->start();

  ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
  ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
                 << " " << con->get_peer_addr() << dendl;

  pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
  pinger->mc->start(monmap.get_epoch(), entity_name);
  con->send_message(new MPing);

  int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
  if (ret == 0) {
    ldout(cct,10) << __func__ << " got ping reply" << dendl;
  } else {
    // NOTE(review): negated on the assumption that wait_for_reply() returns
    // a positive errno — confirm in MonClientPinger.
    ret = -ret;
  }

  // teardown order: connection, session, messenger, then the dispatcher it used
  con->mark_down();
  pinger->mc.reset();
  smsgr->shutdown();
  smsgr->wait();
  delete smsgr;
  delete pinger;
  return ret;
}
296
/**
 * Dispatcher hook: consume the monitor-related messages MonClient handles.
 *
 * Unrelated message types are left to other dispatchers (return false).
 * Pings are dropped. For everything else, monc_lock is taken and:
 *  - messages from a non-anonymous mon connection that belongs neither to
 *    the current hunt (pending_cons) nor to active_con are discarded as
 *    stray;
 *  - the rest are routed to the matching handle_*() function.
 *
 * @return true if the message was consumed here. Returns false for an
 *         MMonMap when passthrough_monmap is set and for non-'tell'
 *         MCommandReply, so that other dispatchers also see them.
 */
bool MonClient::ms_dispatch(Message *m)
{
  // we only care about these message types
  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
  case CEPH_MSG_AUTH_REPLY:
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
  case CEPH_MSG_MON_GET_VERSION_REPLY:
  case MSG_MON_COMMAND_ACK:
  case MSG_COMMAND_REPLY:
  case MSG_LOGACK:
  case MSG_CONFIG:
    break;
  case CEPH_MSG_PING:
    m->put();
    return true;
  default:
    return false;
  }

  std::lock_guard lock(monc_lock);

  // filter out messages from mon sessions we no longer care about
  if (!m->get_connection()->is_anon() &&
      m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
    if (_hunting()) {
      auto p = _find_pending_con(m->get_connection());
      if (p == pending_cons.end()) {
        // ignore any messages outside hunting sessions
        ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
        m->put();
        return true;
      }
    } else if (!active_con || active_con->get_con() != m->get_connection()) {
      // ignore any messages outside our session(s)
      ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
      m->put();
      return true;
    }
  }

  // NOTE(review): the handle_*() callees are expected to drop the message
  // reference themselves (the ones defined in this section do) — confirm for
  // handle_get_version_reply / handle_mon_command_ack / handle_command_reply.
  switch (m->get_type()) {
  case CEPH_MSG_MON_MAP:
    // handle_monmap() does not drop the reference, to allow passthrough
    handle_monmap(static_cast<MMonMap*>(m));
    if (passthrough_monmap) {
      return false;
    } else {
      m->put();
    }
    break;
  case CEPH_MSG_AUTH_REPLY:
    handle_auth(static_cast<MAuthReply*>(m));
    break;
  case CEPH_MSG_MON_SUBSCRIBE_ACK:
    handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
    break;
  case CEPH_MSG_MON_GET_VERSION_REPLY:
    handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
    break;
  case MSG_MON_COMMAND_ACK:
    handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
    break;
  case MSG_COMMAND_REPLY:
    if (m->get_connection()->is_anon() &&
        m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
      // this connection is from 'tell'... ignore everything except our command
      // reply. (we'll get misc other message because we authenticated, but we
      // don't need them.)
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    }
    // leave the message for another dispatch handler (e.g., Objecter)
    return false;
  case MSG_LOGACK:
    if (log_client) {
      log_client->handle_log_ack(static_cast<MLogAck*>(m));
      m->put();
      if (more_log_pending) {
        send_log();
      }
    } else {
      m->put();
    }
    break;
  case MSG_CONFIG:
    handle_config(static_cast<MConfig*>(m));
    break;
  }
  return true;
}
386
387 void MonClient::send_log(bool flush)
388 {
389 if (log_client) {
390 auto lm = log_client->get_mon_log_message(flush);
391 if (lm)
392 _send_mon_message(std::move(lm));
393 more_log_pending = log_client->are_pending();
394 }
395 }
396
397 void MonClient::flush_log()
398 {
399 std::lock_guard l(monc_lock);
400 send_log();
401 }
402
403 /* Unlike all the other message-handling functions, we don't put away a reference
404 * because we want to support MMonMap passthrough to other Dispatchers. */
405 void MonClient::handle_monmap(MMonMap *m)
406 {
407 ldout(cct, 10) << __func__ << " " << *m << dendl;
408 auto con_addrs = m->get_source_addrs();
409 string old_name = monmap.get_name(con_addrs);
410 const auto old_epoch = monmap.get_epoch();
411
412 auto p = m->monmapbl.cbegin();
413 decode(monmap, p);
414
415 ldout(cct, 10) << " got monmap " << monmap.epoch
416 << " from mon." << old_name
417 << " (according to old e" << monmap.get_epoch() << ")"
418 << dendl;
419 ldout(cct, 10) << "dump:\n";
420 monmap.print(*_dout);
421 *_dout << dendl;
422
423 if (old_epoch != monmap.get_epoch()) {
424 tried.clear();
425 }
426 if (old_name.size() == 0) {
427 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
428 _reopen_session();
429 } else {
430 auto new_name = monmap.get_name(con_addrs);
431 if (new_name.empty()) {
432 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
433 << " went away" << dendl;
434 // can't find the mon we were talking to (above)
435 _reopen_session();
436 } else if (messenger->should_use_msgr2() &&
437 monmap.get_addrs(new_name).has_msgr2() &&
438 !con_addrs.has_msgr2()) {
439 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
440 << monmap.get_addrs(new_name) << " but i'm connected to "
441 << con_addrs << ", reconnecting" << dendl;
442 _reopen_session();
443 }
444 }
445
446 cct->set_mon_addrs(monmap);
447
448 sub.got("monmap", monmap.get_epoch());
449 map_cond.notify_all();
450 want_monmap = false;
451
452 if (authenticate_err == 1) {
453 _finish_auth(0);
454 }
455 }
456
457 void MonClient::handle_config(MConfig *m)
458 {
459 ldout(cct,10) << __func__ << " " << *m << dendl;
460
461 if (want_bootstrap_config) {
462 // get_monmap_and_config is waiting for config which it will apply
463 // synchronously
464 bootstrap_config = ceph::ref_t<MConfig>(m, false);
465 map_cond.notify_all();
466 return;
467 }
468
469 // Take the sledgehammer approach to ensuring we don't depend on
470 // anything in MonClient.
471 boost::asio::post(finish_strand,
472 [m, cct = boost::intrusive_ptr<CephContext>(cct),
473 config_notify_cb = config_notify_cb,
474 config_cb = config_cb]() {
475 cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
476 if (config_notify_cb) {
477 config_notify_cb();
478 }
479 m->put();
480 });
481 }
482
483 // ----------------------
484
/**
 * Prepare the client for use with the already-assigned `messenger`.
 *
 * Steps:
 *  - read the entity name and refresh the auth registry;
 *  - under monc_lock, load a keyring from the CephContext when cephx is a
 *    supported method for this messenger's type, and create the
 *    rotating-key ring;
 *  - register this object as the messenger's auth client and head
 *    dispatcher;
 *  - start the tick timer.
 *
 * @return 0 on success;
 *         the keyring loader's error if cephx is supported but loading fails;
 *         -ENOENT if no auth method is supported at all.
 */
int MonClient::init()
{
  ldout(cct, 10) << __func__ << dendl;

  entity_name = cct->_conf->name;

  auth_registry.refresh_config();

  std::lock_guard l(monc_lock);
  keyring.reset(new KeyRing);
  if (auth_registry.is_supported_method(messenger->get_mytype(),
                                        CEPH_AUTH_CEPHX)) {
    // this should succeed, because auth_registry just checked!
    int r = keyring->from_ceph_context(cct);
    if (r != 0) {
      // but be somewhat graceful in case there was a race condition
      lderr(cct) << "keyring not found" << dendl;
      return r;
    }
  }
  if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
    return -ENOENT;
  }

  rotating_secrets.reset(
    new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));

  initialized = true;

  // NOTE(review): get_monmap_and_config() also calls add_dispatcher_head(this)
  // and then calls init() on each retry; confirm the messenger tolerates the
  // same dispatcher being registered more than once.
  messenger->set_auth_client(this);
  messenger->add_dispatcher_head(this);

  timer.init();
  schedule_tick();

  return 0;
}
522
523 void MonClient::shutdown()
524 {
525 ldout(cct, 10) << __func__ << dendl;
526 monc_lock.lock();
527 stopping = true;
528 while (!version_requests.empty()) {
529 ceph::async::post(std::move(version_requests.begin()->second),
530 monc_errc::shutting_down, 0, 0);
531 ldout(cct, 20) << __func__ << " canceling and discarding version request "
532 << version_requests.begin()->first << dendl;
533 version_requests.erase(version_requests.begin());
534 }
535 while (!mon_commands.empty()) {
536 auto tid = mon_commands.begin()->first;
537 _cancel_mon_command(tid);
538 }
539 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
540 << " pending message(s)" << dendl;
541 waiting_for_session.clear();
542
543 active_con.reset();
544 pending_cons.clear();
545
546 auth.reset();
547 global_id = 0;
548 authenticate_err = 0;
549 authenticated = false;
550
551 monc_lock.unlock();
552
553 if (initialized) {
554 initialized = false;
555 }
556 monc_lock.lock();
557 timer.shutdown();
558 stopping = false;
559 monc_lock.unlock();
560 }
561
/**
 * Establish an authenticated monitor session, blocking until done.
 *
 * Subscribes to the next monmap epoch and to "config", and opens a session
 * if none is open or being hunted. It then waits on auth_cond until either
 * active_con is set or authenticate_err becomes negative.
 *
 * @param timeout seconds to wait; a value <= 0 waits without a deadline.
 * @return 0 if already authenticated. Otherwise authenticate_err: -ETIMEDOUT
 *         on timeout, or whatever _finish_auth() recorded.
 */
int MonClient::authenticate(double timeout)
{
  std::unique_lock lock{monc_lock};

  if (active_con) {
    ldout(cct, 5) << "already authenticated" << dendl;
    return 0;
  }
  sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
  sub.want("config", 0, 0);
  if (!_opened())
    _reopen_session();

  auto until = ceph::mono_clock::now();
  until += ceph::make_timespan(timeout);
  if (timeout > 0.0)
    ldout(cct, 10) << "authenticate will time out at " << until << dendl;
  // _reopen_session() sets authenticate_err to 1 ("in progress"), so this
  // loop runs until a session exists or an error is recorded
  while (!active_con && authenticate_err >= 0) {
    if (timeout > 0.0) {
      auto r = auth_cond.wait_until(lock, until);
      if (r == std::cv_status::timeout && !active_con) {
        ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
        authenticate_err = -ETIMEDOUT;
      }
    } else {
      auth_cond.wait(lock);
    }
  }

  if (active_con) {
    ldout(cct, 5) << __func__ << " success, global_id "
                  << active_con->get_global_id() << dendl;
    // active_con should not have been set if there was an error
    ceph_assert(authenticate_err >= 0);
    authenticated = true;
  }

  if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
    lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
  }

  return authenticate_err;
}
605
/**
 * Process an MAuthReply (caller holds monc_lock); always drops the message
 * reference. There are three cases:
 *
 *  - Anonymous connection (mon 'tell' commands): forward the reply to the
 *    matching command's target session and ignore the result.
 *  - Not hunting: re-authenticate the active session using our shared
 *    `auth` handler (swapped in for the call, then swapped back). The
 *    result is reported via _finish_auth() unless it is -EAGAIN.
 *  - Hunting: feed the reply to the matching pending connection.
 *      * On -EAGAIN, wait for more replies.
 *      * On any other error, drop that candidate; if others remain, keep
 *        hunting.
 *      * On success, promote the candidate to active_con and drop the rest.
 *    When the hunt is over, _finish_hunting() and _finish_auth() run.
 */
void MonClient::handle_auth(MAuthReply *m)
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));

  if (m->get_connection()->is_anon()) {
    // anon connection, used for mon tell commands
    for (auto& p : mon_commands) {
      if (p.second->target_con == m->get_connection()) {
        auto& mc = p.second->target_session;
        int ret = mc->handle_auth(m, entity_name,
                                  CEPH_ENTITY_TYPE_MON,
                                  rotating_secrets.get());
        (void)ret; // we don't care
        break;
      }
    }
    m->put();
    return;
  }

  if (!_hunting()) {
    // NOTE(review): relies on ms_dispatch() having discarded replies when
    // active_con is null; active_con is dereferenced unchecked here.
    std::swap(active_con->get_auth(), auth);
    int ret = active_con->authenticate(m);
    m->put();
    std::swap(auth, active_con->get_auth());
    if (global_id != active_con->get_global_id()) {
      lderr(cct) << __func__ << " peer assigned me a different global_id: "
                 << active_con->get_global_id() << dendl;
    }
    if (ret != -EAGAIN) {
      _finish_auth(ret);
    }
    return;
  }

  // hunting
  auto found = _find_pending_con(m->get_connection());
  ceph_assert(found != pending_cons.end());
  int auth_err = found->second.handle_auth(m, entity_name, want_keys,
                                           rotating_secrets.get());
  m->put();
  if (auth_err == -EAGAIN) {
    return;
  }
  if (auth_err) {
    pending_cons.erase(found);
    if (!pending_cons.empty()) {
      // keep trying with pending connections
      return;
    }
    // the last try just failed, give up.
  } else {
    auto& mc = found->second;
    ceph_assert(mc.have_session());
    active_con.reset(new MonConnection(std::move(mc)));
    pending_cons.clear();
  }

  _finish_hunting(auth_err);
  _finish_auth(auth_err);
}
667
668 void MonClient::_finish_auth(int auth_err)
669 {
670 ldout(cct,10) << __func__ << " " << auth_err << dendl;
671 authenticate_err = auth_err;
672 // _resend_mon_commands() could _reopen_session() if the connected mon is not
673 // the one the MonCommand is targeting.
674 if (!auth_err && active_con) {
675 ceph_assert(auth);
676 _check_auth_tickets();
677 }
678 auth_cond.notify_all();
679 }
680
681 // ---------
682
683 void MonClient::send_mon_message(MessageRef m)
684 {
685 std::lock_guard l{monc_lock};
686 _send_mon_message(std::move(m));
687 }
688
689 void MonClient::_send_mon_message(MessageRef m)
690 {
691 ceph_assert(ceph_mutex_is_locked(monc_lock));
692 if (active_con) {
693 auto cur_con = active_con->get_con();
694 ldout(cct, 10) << "_send_mon_message to mon."
695 << monmap.get_name(cur_con->get_peer_addr())
696 << " at " << cur_con->get_peer_addr() << dendl;
697 cur_con->send_message2(std::move(m));
698 } else {
699 waiting_for_session.push_back(std::move(m));
700 }
701 }
702
/**
 * Drop the current session and start hunting for a new one. Caller holds
 * monc_lock.
 *
 * @param rank if >= 0, try only that monmap rank; otherwise pick a batch of
 *        candidates via _add_conns().
 *
 * Side effects:
 *  - marks authentication as in progress (authenticate_err = 1);
 *  - discards queued messages;
 *  - fails pending version requests with session_reset;
 *  - starts every candidate connection;
 *  - re-sends subscriptions if sub.reload() reports any.
 */
void MonClient::_reopen_session(int rank)
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  ldout(cct, 10) << __func__ << " rank " << rank << dendl;

  active_con.reset();
  pending_cons.clear();

  authenticate_err = 1; // == in progress

  // must run while pending_cons is empty (it asserts !_hunting())
  _start_hunting();

  if (rank >= 0) {
    _add_conn(rank);
  } else {
    _add_conns();
  }

  // throw out old queued messages
  waiting_for_session.clear();

  // throw out version check requests
  while (!version_requests.empty()) {
    ceph::async::post(std::move(version_requests.begin()->second),
                      monc_errc::session_reset, 0, 0);
    version_requests.erase(version_requests.begin());
  }

  for (auto& c : pending_cons) {
    c.second.start(monmap.get_epoch(), entity_name);
  }

  if (sub.reload()) {
    _renew_subs();
  }
}
739
740 void MonClient::_add_conn(unsigned rank)
741 {
742 auto peer = monmap.get_addrs(rank);
743 auto conn = messenger->connect_to_mon(peer);
744 MonConnection mc(cct, conn, global_id, &auth_registry);
745 if (auth) {
746 mc.get_auth().reset(auth->clone());
747 }
748 pending_cons.insert(std::make_pair(peer, std::move(mc)));
749 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
750 << " con " << conn
751 << " addr " << peer
752 << dendl;
753 }
754
/**
 * Pick the next batch of monitors to hunt and open connections to them.
 *
 * Candidates are the monitors not yet in `tried` that share the numerically
 * lowest `priority` value. If none remain, `tried` is cleared and the
 * selection is repeated.
 *
 * When there are several candidates, they are shuffled: uniformly if all
 * weights are zero, otherwise weighted by each monitor's weight. The first
 * mon_client_hunt_parallel candidates are then connected (all of them if the
 * option is 0 or exceeds the batch size) and recorded in `tried`.
 *
 * NOTE(review): std::accumulate needs <numeric>, and
 * boost::adaptors::transformed needs <boost/range/adaptor/transformed.hpp>.
 * Neither is included directly by this file; both currently arrive
 * transitively.
 */
void MonClient::_add_conns()
{
  // collect the untried monitors that share the lowest priority value
  auto get_next_batch = [this]() -> std::vector<unsigned> {
    std::multimap<uint16_t, unsigned> ranks_by_priority;
    boost::copy(
      monmap.mon_info | boost::adaptors::filtered(
        [this](auto& info) {
          auto rank = monmap.get_rank(info.first);
          return tried.count(rank) == 0;
        }) | boost::adaptors::transformed(
          [this](auto& info) {
            auto rank = monmap.get_rank(info.first);
            return std::make_pair(info.second.priority, rank);
          }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
    if (ranks_by_priority.empty()) {
      return {};
    }
    // only choose the monitors with lowest priority
    auto cands = boost::make_iterator_range(
      ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
    std::vector<unsigned> ranks;
    boost::range::copy(cands | boost::adaptors::map_values,
                       std::back_inserter(ranks));
    return ranks;
  };
  auto ranks = get_next_batch();
  if (ranks.empty()) {
    tried.clear();  // start over
    ranks = get_next_batch();
  }
  ceph_assert(!ranks.empty());
  if (ranks.size() > 1) {
    std::vector<uint16_t> weights;
    for (auto i : ranks) {
      auto rank_name = monmap.get_name(i);
      weights.push_back(monmap.get_weight(rank_name));
    }
    random_device_t rd;
    // weighted_shuffle() is only used when at least one weight is non-zero
    if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
      std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
    } else {
      weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
                       std::mt19937{rd()});
    }
  }
  ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
  unsigned n = cct->_conf->mon_client_hunt_parallel;
  if (n == 0 || n > ranks.size()) {
    n = ranks.size();
  }
  for (unsigned i = 0; i < n; i++) {
    _add_conn(ranks[i]);
    tried.insert(ranks[i]);
  }
}
812
/**
 * Dispatcher hook: a connection was reset by the messenger.
 *
 * Non-mon peers are ignored (return false). The other cases are:
 *  - Anonymous 'tell' connection: the owning command is re-sent (which may
 *    retry or fail it).
 *  - While hunting: only logged.
 *  - Reset of the active monitor connection: reopens the session and
 *    returns false.
 *  - Stray connection: only logged.
 *
 * @return true if the reset was fully handled here.
 */
bool MonClient::ms_handle_reset(Connection *con)
{
  std::lock_guard lock(monc_lock);

  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
    return false;

  if (con->is_anon()) {
    auto p = mon_commands.begin();
    while (p != mon_commands.end()) {
      auto cmd = p->second;
      // advance before _send_command(), which may remove cmd from the map
      ++p;
      if (cmd->target_con == con) {
        _send_command(cmd); // may retry or fail
        break;
      }
    }
    return true;
  }

  if (_hunting()) {
    if (pending_cons.count(con->get_peer_addrs())) {
      ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
                     << dendl;
    } else {
      ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
                     << dendl;
    }
    return true;
  } else {
    if (active_con && con == active_con->get_con()) {
      ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
                     << dendl;
      _reopen_session();
      return false;
    } else {
      ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
                     << dendl;
      return true;
    }
  }
}
855
856 bool MonClient::_opened() const
857 {
858 ceph_assert(ceph_mutex_is_locked(monc_lock));
859 return active_con || _hunting();
860 }
861
862 bool MonClient::_hunting() const
863 {
864 return !pending_cons.empty();
865 }
866
867 void MonClient::_start_hunting()
868 {
869 ceph_assert(!_hunting());
870 // adjust timeouts if necessary
871 if (!had_a_connection)
872 return;
873 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
874 if (reopen_interval_multiplier >
875 cct->_conf->mon_client_hunt_interval_max_multiple) {
876 reopen_interval_multiplier =
877 cct->_conf->mon_client_hunt_interval_max_multiple;
878 }
879 }
880
/**
 * Wrap up a hunt once all pending connections have been resolved.
 * Caller holds monc_lock.
 *
 * @param auth_err 0 if a session was established. In that case this
 *        function:
 *          - flushes messages queued while no session existed;
 *          - re-sends outstanding mon commands;
 *          - flushes the log;
 *          - adopts the session's auth handler and global_id.
 *
 * In all cases it records that a connection happened and relaxes the
 * hunting back-off.
 */
void MonClient::_finish_hunting(int auth_err)
{
  ldout(cct,10) << __func__ << " " << auth_err << dendl;
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  // the pending conns have been cleaned.
  ceph_assert(!_hunting());
  if (active_con) {
    auto con = active_con->get_con();
    ldout(cct, 1) << "found mon."
                  << monmap.get_name(con->get_peer_addr())
                  << dendl;
  } else {
    ldout(cct, 1) << "no mon sessions established" << dendl;
  }

  had_a_connection = true;
  _un_backoff();

  if (!auth_err) {
    // allow an immediate rotating-key request on the new session
    last_rotating_renew_sent = utime_t();
    while (!waiting_for_session.empty()) {
      _send_mon_message(std::move(waiting_for_session.front()));
      waiting_for_session.pop_front();
    }
    _resend_mon_commands();
    send_log(true);
    if (active_con) {
      auth = std::move(active_con->get_auth());
      if (global_id && global_id != active_con->get_global_id()) {
        lderr(cct) << __func__ << " global_id changed from " << global_id
                   << " to " << active_con->get_global_id() << dendl;
      }
      global_id = active_con->get_global_id();
    }
  }
}
917
/**
 * Periodic timer callback; always reschedules itself via a scope guard.
 *
 * On each run:
 *  - checks auth tickets and 'tell' commands;
 *  - if still hunting, starts another hunt round;
 *  - with an active session:
 *      * renews subscriptions when the mon lacks stateful subscriptions;
 *      * sends a keepalive every mon_client_ping_interval, and reconnects
 *        if no keepalive ack arrived within mon_client_ping_timeout;
 *      * flushes the log every mon_client_log_interval.
 *
 * Must run with monc_lock held: the _-prefixed callees assert it. The timer
 * was constructed with monc_lock.
 */
void MonClient::tick()
{
  ldout(cct, 10) << __func__ << dendl;

  utime_t now = ceph_clock_now();

  auto reschedule_tick = make_scope_guard([this] {
    schedule_tick();
  });

  _check_auth_tickets();
  _check_tell_commands();

  if (_hunting()) {
    ldout(cct, 1) << "continuing hunt" << dendl;
    return _reopen_session();
  } else if (active_con) {
    // just renew as needed
    auto cur_con = active_con->get_con();
    if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
      const bool maybe_renew = sub.need_renew();
      ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
                     << dendl;
      if (maybe_renew) {
        _renew_subs();
      }
    }

    if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
      cur_con->send_keepalive();
      last_keepalive = now;

      // only peers with KEEPALIVE2 report ack timestamps we can check
      if (cct->_conf->mon_client_ping_timeout > 0 &&
          cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
        utime_t lk = cur_con->get_last_keepalive_ack();
        utime_t interval = now - lk;
        if (interval > cct->_conf->mon_client_ping_timeout) {
          ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
                        << " seconds), reconnecting" << dendl;
          return _reopen_session();
        }
      }

      _un_backoff();
    }

    if (now > last_send_log + cct->_conf->mon_client_log_interval) {
      send_log();
      last_send_log = now;
    }
  }
}
970
971 void MonClient::_un_backoff()
972 {
973 // un-backoff our reconnect interval
974 reopen_interval_multiplier = std::max(
975 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
976 reopen_interval_multiplier /
977 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
978 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
979 << reopen_interval_multiplier << dendl;
980 }
981
982 void MonClient::schedule_tick()
983 {
984 auto do_tick = make_lambda_context([this](int) { tick(); });
985 if (!is_connected()) {
986 // start another round of hunting
987 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
988 reopen_interval_multiplier);
989 timer.add_event_after(hunt_interval, do_tick);
990 } else {
991 // keep in touch
992 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
993 cct->_conf->mon_client_log_interval),
994 do_tick);
995 }
996 }
997
998 // ---------
999
1000 void MonClient::_renew_subs()
1001 {
1002 ceph_assert(ceph_mutex_is_locked(monc_lock));
1003 if (!sub.have_new()) {
1004 ldout(cct, 10) << __func__ << " - empty" << dendl;
1005 return;
1006 }
1007
1008 ldout(cct, 10) << __func__ << dendl;
1009 if (!_opened())
1010 _reopen_session();
1011 else {
1012 auto m = ceph::make_message<MMonSubscribe>();
1013 m->what = sub.get_subs();
1014 m->hostname = ceph_get_short_hostname();
1015 _send_mon_message(std::move(m));
1016 sub.renewed();
1017 }
1018 }
1019
1020 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1021 {
1022 sub.acked(m->interval);
1023 m->put();
1024 }
1025
1026 int MonClient::_check_auth_tickets()
1027 {
1028 ceph_assert(ceph_mutex_is_locked(monc_lock));
1029 if (active_con && auth) {
1030 if (auth->need_tickets()) {
1031 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1032 auto m = ceph::make_message<MAuth>();
1033 m->protocol = auth->get_protocol();
1034 auth->prepare_build_request();
1035 auth->build_request(m->auth_payload);
1036 _send_mon_message(m);
1037 }
1038
1039 _check_auth_rotating();
1040 }
1041 return 0;
1042 }
1043
/**
 * Request fresh rotating service keys when ours are close to expiry.
 * Caller must hold monc_lock.
 *
 * Nothing is requested in any of these cases:
 *  - the principal does not use rotating keys, or there is no key ring;
 *  - there is no authenticated session;
 *  - the keys remain valid past the cutoff: now minus
 *    min(30s, auth_service_ticket_ttl / 4);
 *  - a request was sent less than one second ago.
 *
 * A likely clock skew (keys expired "before they were issued") is logged.
 *
 * @return always 0.
 */
int MonClient::_check_auth_rotating()
{
  ceph_assert(ceph_mutex_is_locked(monc_lock));
  if (!rotating_secrets ||
      !auth_principal_needs_rotating_keys(entity_name)) {
    ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
    return 0;
  }

  if (!active_con || !auth) {
    ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
    return 0;
  }

  utime_t now = ceph_clock_now();
  utime_t cutoff = now;
  cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
  utime_t issued_at_lower_bound = now;
  issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
  if (!rotating_secrets->need_new_secrets(cutoff)) {
    ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
    rotating_secrets->dump_rotating();
    return 0;
  }

  ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
  if (!rotating_secrets->need_new_secrets() &&
      rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
    // the key has expired before it has been issued?
    lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
               << " (before " << issued_at_lower_bound << ")" << dendl;
  }
  // rate-limit: at most one rotating-key request per second
  if ((now > last_rotating_renew_sent) &&
      double(now - last_rotating_renew_sent) < 1) {
    ldout(cct, 10) << __func__ << " called too often (last: "
                   << last_rotating_renew_sent << "), skipping refresh" << dendl;
    return 0;
  }
  auto m = ceph::make_message<MAuth>();
  m->protocol = auth->get_protocol();
  if (auth->build_rotating_request(m->auth_payload)) {
    last_rotating_renew_sent = now;
    _send_mon_message(std::move(m));
  }
  return 0;
}
1090
1091 int MonClient::wait_auth_rotating(double timeout)
1092 {
1093 std::unique_lock l(monc_lock);
1094
1095 // Must be initialized
1096 ceph_assert(auth != nullptr);
1097
1098 if (auth->get_protocol() == CEPH_AUTH_NONE)
1099 return 0;
1100
1101 if (!rotating_secrets)
1102 return 0;
1103
1104 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1105 utime_t cutoff = ceph_clock_now();
1106 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1107 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [this, cutoff] {
1108 return (!auth_principal_needs_rotating_keys(entity_name) ||
1109 !rotating_secrets->need_new_secrets(cutoff));
1110 })) {
1111 ldout(cct, 10) << __func__ << " done" << dendl;
1112 return 0;
1113 } else {
1114 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1115 return -ETIMEDOUT;
1116 }
1117 }
1118
1119 // ---------
1120
1121 void MonClient::_send_command(MonCommand *r)
1122 {
1123 if (r->is_tell()) {
1124 ++r->send_attempts;
1125 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1126 _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
1127 return;
1128 }
1129 // tell-style command
1130 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1131 if (r->target_con) {
1132 r->target_con->mark_down();
1133 }
1134 if (r->target_rank >= 0) {
1135 if (r->target_rank >= (int)monmap.size()) {
1136 ldout(cct, 10) << " target " << r->target_rank
1137 << " >= max mon " << monmap.size() << dendl;
1138 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1139 return;
1140 }
1141 r->target_con = messenger->connect_to_mon(
1142 monmap.get_addrs(r->target_rank), true /* anon */);
1143 } else {
1144 if (!monmap.contains(r->target_name)) {
1145 ldout(cct, 10) << " target " << r->target_name
1146 << " not present in monmap" << dendl;
1147 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1148 return;
1149 }
1150 r->target_con = messenger->connect_to_mon(
1151 monmap.get_addrs(r->target_name), true /* anon */);
1152 }
1153
1154 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1155 &auth_registry));
1156 r->target_session->start(monmap.get_epoch(), entity_name);
1157 r->last_send_attempt = ceph_clock_now();
1158
1159 MCommand *m = new MCommand(monmap.fsid);
1160 m->set_tid(r->tid);
1161 m->cmd = r->cmd;
1162 m->set_data(r->inbl);
1163 r->target_session->queue_command(m);
1164 return;
1165 }
1166
1167 // ugly legacy handling of pre-octopus mons
1168 entity_addr_t peer;
1169 if (active_con) {
1170 peer = active_con->get_con()->get_peer_addr();
1171 }
1172
1173 if (r->target_rank >= 0 &&
1174 r->target_rank != monmap.get_rank(peer)) {
1175 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1176 << " wants rank " << r->target_rank
1177 << ", reopening session"
1178 << dendl;
1179 if (r->target_rank >= (int)monmap.size()) {
1180 ldout(cct, 10) << " target " << r->target_rank
1181 << " >= max mon " << monmap.size() << dendl;
1182 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
1183 return;
1184 }
1185 _reopen_session(r->target_rank);
1186 return;
1187 }
1188 if (r->target_name.length() &&
1189 r->target_name != monmap.get_name(peer)) {
1190 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1191 << " wants mon " << r->target_name
1192 << ", reopening session"
1193 << dendl;
1194 if (!monmap.contains(r->target_name)) {
1195 ldout(cct, 10) << " target " << r->target_name
1196 << " not present in monmap" << dendl;
1197 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
1198 return;
1199 }
1200 _reopen_session(monmap.get_rank(r->target_name));
1201 return;
1202 }
1203 // fall-thru to send 'normal' CLI command
1204 }
1205
1206 // normal CLI command
1207 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1208 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
1209 m->set_tid(r->tid);
1210 m->cmd = r->cmd;
1211 m->set_data(r->inbl);
1212 _send_mon_message(std::move(m));
1213 return;
1214 }
1215
1216 void MonClient::_check_tell_commands()
1217 {
1218 // resend any requests
1219 auto now = ceph_clock_now();
1220 auto p = mon_commands.begin();
1221 while (p != mon_commands.end()) {
1222 auto cmd = p->second;
1223 ++p;
1224 if (cmd->is_tell() &&
1225 cmd->last_send_attempt != utime_t() &&
1226 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1227 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1228 _send_command(cmd); // might remove cmd from mon_commands
1229 }
1230 }
1231 }
1232
1233 void MonClient::_resend_mon_commands()
1234 {
1235 // resend any requests
1236 auto p = mon_commands.begin();
1237 while (p != mon_commands.end()) {
1238 auto cmd = p->second;
1239 ++p;
1240 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1241 // starting with octopus, tell commands use their own connetion and need no
1242 // special resend when we finish hunting.
1243 } else {
1244 _send_command(cmd); // might remove cmd from mon_commands
1245 }
1246 }
1247 }
1248
1249 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1250 {
1251 MonCommand *r = NULL;
1252 uint64_t tid = ack->get_tid();
1253
1254 if (tid == 0 && !mon_commands.empty()) {
1255 r = mon_commands.begin()->second;
1256 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1257 } else {
1258 auto p = mon_commands.find(tid);
1259 if (p == mon_commands.end()) {
1260 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1261 ack->put();
1262 return;
1263 }
1264 r = p->second;
1265 }
1266
1267 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1268 auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
1269 : bs::error_code();
1270 _finish_command(r, ec, ack->rs,
1271 std::move(ack->get_data()));
1272 ack->put();
1273 }
1274
1275 void MonClient::handle_command_reply(MCommandReply *reply)
1276 {
1277 MonCommand *r = NULL;
1278 uint64_t tid = reply->get_tid();
1279
1280 if (tid == 0 && !mon_commands.empty()) {
1281 r = mon_commands.begin()->second;
1282 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1283 << dendl;
1284 } else {
1285 auto p = mon_commands.find(tid);
1286 if (p == mon_commands.end()) {
1287 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1288 << dendl;
1289 reply->put();
1290 return;
1291 }
1292 r = p->second;
1293 }
1294
1295 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1296 auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
1297 : bs::error_code();
1298 _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
1299 reply->put();
1300 }
1301
1302 int MonClient::_cancel_mon_command(uint64_t tid)
1303 {
1304 ceph_assert(ceph_mutex_is_locked(monc_lock));
1305
1306 auto it = mon_commands.find(tid);
1307 if (it == mon_commands.end()) {
1308 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1309 return -ENOENT;
1310 }
1311
1312 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1313
1314 MonCommand *cmd = it->second;
1315 _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
1316 return 0;
1317 }
1318
1319 void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
1320 std::string_view rs, ceph::buffer::list&& bl)
1321 {
1322 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
1323 << dendl;
1324 ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1325 std::move(bl));
1326 if (r->target_con) {
1327 r->target_con->mark_down();
1328 }
1329 mon_commands.erase(r->tid);
1330 delete r;
1331 }
1332
1333 // ---------
1334
1335 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1336 {
1337 ceph_assert(ceph_mutex_is_locked(monc_lock));
1338 auto iter = version_requests.find(m->handle);
1339 if (iter == version_requests.end()) {
1340 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1341 << " not found" << dendl;
1342 } else {
1343 auto req = std::move(iter->second);
1344 ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
1345 << m->version << dendl;
1346 version_requests.erase(iter);
1347 ceph::async::post(std::move(req), bs::error_code(),
1348 m->version, m->oldest_version);
1349 }
1350 m->put();
1351 }
1352
1353 int MonClient::get_auth_request(
1354 Connection *con,
1355 AuthConnectionMeta *auth_meta,
1356 uint32_t *auth_method,
1357 std::vector<uint32_t> *preferred_modes,
1358 ceph::buffer::list *bl)
1359 {
1360 std::lock_guard l(monc_lock);
1361 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1362 << dendl;
1363
1364 // connection to mon?
1365 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1366 ceph_assert(!auth_meta->authorizer);
1367 if (con->is_anon()) {
1368 for (auto& i : mon_commands) {
1369 if (i.second->target_con == con) {
1370 return i.second->target_session->get_auth_request(
1371 auth_method, preferred_modes, bl,
1372 entity_name, want_keys, rotating_secrets.get());
1373 }
1374 }
1375 }
1376 for (auto& i : pending_cons) {
1377 if (i.second.is_con(con)) {
1378 return i.second.get_auth_request(
1379 auth_method, preferred_modes, bl,
1380 entity_name, want_keys, rotating_secrets.get());
1381 }
1382 }
1383 return -ENOENT;
1384 }
1385
1386 // generate authorizer
1387 if (!auth) {
1388 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1389 return -EACCES;
1390 }
1391 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1392 if (!auth_meta->authorizer) {
1393 lderr(cct) << __func__ << " failed to build_authorizer for type "
1394 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1395 return -EACCES;
1396 }
1397 auth_meta->auth_method = auth_meta->authorizer->protocol;
1398 auth_registry.get_supported_modes(con->get_peer_type(),
1399 auth_meta->auth_method,
1400 preferred_modes);
1401 *bl = auth_meta->authorizer->bl;
1402 return 0;
1403 }
1404
1405 int MonClient::handle_auth_reply_more(
1406 Connection *con,
1407 AuthConnectionMeta *auth_meta,
1408 const ceph::buffer::list& bl,
1409 ceph::buffer::list *reply)
1410 {
1411 std::lock_guard l(monc_lock);
1412
1413 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1414 if (con->is_anon()) {
1415 for (auto& i : mon_commands) {
1416 if (i.second->target_con == con) {
1417 return i.second->target_session->handle_auth_reply_more(
1418 auth_meta, bl, reply);
1419 }
1420 }
1421 }
1422 for (auto& i : pending_cons) {
1423 if (i.second.is_con(con)) {
1424 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1425 }
1426 }
1427 return -ENOENT;
1428 }
1429
1430 // authorizer challenges
1431 if (!auth || !auth_meta->authorizer) {
1432 lderr(cct) << __func__ << " no authorizer?" << dendl;
1433 return -1;
1434 }
1435 auth_meta->authorizer->add_challenge(cct, bl);
1436 *reply = auth_meta->authorizer->bl;
1437 return 0;
1438 }
1439
1440 int MonClient::handle_auth_done(
1441 Connection *con,
1442 AuthConnectionMeta *auth_meta,
1443 uint64_t global_id,
1444 uint32_t con_mode,
1445 const ceph::buffer::list& bl,
1446 CryptoKey *session_key,
1447 std::string *connection_secret)
1448 {
1449 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1450 std::lock_guard l(monc_lock);
1451 if (con->is_anon()) {
1452 for (auto& i : mon_commands) {
1453 if (i.second->target_con == con) {
1454 return i.second->target_session->handle_auth_done(
1455 auth_meta, global_id, bl,
1456 session_key, connection_secret);
1457 }
1458 }
1459 }
1460 for (auto& i : pending_cons) {
1461 if (i.second.is_con(con)) {
1462 int r = i.second.handle_auth_done(
1463 auth_meta, global_id, bl,
1464 session_key, connection_secret);
1465 if (r) {
1466 pending_cons.erase(i.first);
1467 if (!pending_cons.empty()) {
1468 return r;
1469 }
1470 } else {
1471 active_con.reset(new MonConnection(std::move(i.second)));
1472 pending_cons.clear();
1473 ceph_assert(active_con->have_session());
1474 }
1475
1476 _finish_hunting(r);
1477 if (r || monmap.get_epoch() > 0) {
1478 _finish_auth(r);
1479 }
1480 return r;
1481 }
1482 }
1483 return -ENOENT;
1484 } else {
1485 // verify authorizer reply
1486 auto p = bl.begin();
1487 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1488 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1489 << dendl;
1490 return -EACCES;
1491 }
1492 auth_meta->session_key = auth_meta->authorizer->session_key;
1493 return 0;
1494 }
1495 }
1496
1497 int MonClient::handle_auth_bad_method(
1498 Connection *con,
1499 AuthConnectionMeta *auth_meta,
1500 uint32_t old_auth_method,
1501 int result,
1502 const std::vector<uint32_t>& allowed_methods,
1503 const std::vector<uint32_t>& allowed_modes)
1504 {
1505 auth_meta->allowed_methods = allowed_methods;
1506
1507 std::lock_guard l(monc_lock);
1508 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1509 if (con->is_anon()) {
1510 for (auto& i : mon_commands) {
1511 if (i.second->target_con == con) {
1512 int r = i.second->target_session->handle_auth_bad_method(
1513 old_auth_method,
1514 result,
1515 allowed_methods,
1516 allowed_modes);
1517 if (r < 0) {
1518 auto ec = bs::error_code(-r, mon_category());
1519 _finish_command(i.second, ec, "auth failed"sv, {});
1520 }
1521 return r;
1522 }
1523 }
1524 }
1525 for (auto& i : pending_cons) {
1526 if (i.second.is_con(con)) {
1527 int r = i.second.handle_auth_bad_method(old_auth_method,
1528 result,
1529 allowed_methods,
1530 allowed_modes);
1531 if (r == 0) {
1532 return r; // try another method on this con
1533 }
1534 pending_cons.erase(i.first);
1535 if (!pending_cons.empty()) {
1536 return r; // fail this con, maybe another con will succeed
1537 }
1538 // fail hunt
1539 _finish_hunting(r);
1540 _finish_auth(r);
1541 return r;
1542 }
1543 }
1544 return -ENOENT;
1545 } else {
1546 // huh...
1547 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1548 << " result " << cpp_strerror(result)
1549 << " and auth is " << (auth ? auth->get_protocol() : 0)
1550 << dendl;
1551 return -EACCES;
1552 }
1553 }
1554
1555 int MonClient::handle_auth_request(
1556 Connection *con,
1557 AuthConnectionMeta *auth_meta,
1558 bool more,
1559 uint32_t auth_method,
1560 const ceph::buffer::list& payload,
1561 ceph::buffer::list *reply)
1562 {
1563 if (payload.length() == 0) {
1564 // for some channels prior to nautilus (osd heartbeat), we
1565 // tolerate the lack of an authorizer.
1566 if (!con->get_messenger()->require_authorizer) {
1567 handle_authentication_dispatcher->ms_handle_authentication(con);
1568 return 1;
1569 }
1570 return -EACCES;
1571 }
1572 auth_meta->auth_mode = payload[0];
1573 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1574 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1575 return -EACCES;
1576 }
1577 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1578 auth_method);
1579 if (!ah) {
1580 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1581 << auth_method << dendl;
1582 return -EOPNOTSUPP;
1583 }
1584
1585 auto ac = &auth_meta->authorizer_challenge;
1586 if (auth_meta->skip_authorizer_challenge) {
1587 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
1588 ac = nullptr;
1589 }
1590
1591 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1592 bool isvalid = ah->verify_authorizer(
1593 cct,
1594 *rotating_secrets,
1595 payload,
1596 auth_meta->get_connection_secret_length(),
1597 reply,
1598 &con->peer_name,
1599 &con->peer_global_id,
1600 &con->peer_caps_info,
1601 &auth_meta->session_key,
1602 &auth_meta->connection_secret,
1603 ac);
1604 if (isvalid) {
1605 handle_authentication_dispatcher->ms_handle_authentication(con);
1606 return 1;
1607 }
1608 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1609 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1610 return 0;
1611 }
1612 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1613 // discard old challenge
1614 auth_meta->authorizer_challenge.reset();
1615 return -EACCES;
1616 }
1617
1618 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1619 std::lock_guard l(monc_lock);
1620 if (auth) {
1621 return auth->build_authorizer(service_id);
1622 } else {
1623 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1624 << ", but no auth is available now" << dendl;
1625 return nullptr;
1626 }
1627 }
1628
// Logging setup for the MonConnection methods that follow.  The prefix
// uses MonConnection::have_session() rather than the MonClient-level
// hunting check.
#undef dout_subsys
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1632
1633 MonConnection::MonConnection(
1634 CephContext *cct, ConnectionRef con, uint64_t global_id,
1635 AuthRegistry *ar)
1636 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1637 {}
1638
1639 MonConnection::~MonConnection()
1640 {
1641 if (con) {
1642 con->mark_down();
1643 con.reset();
1644 }
1645 }
1646
1647 bool MonConnection::have_session() const
1648 {
1649 return state == State::HAVE_SESSION;
1650 }
1651
1652 void MonConnection::start(epoch_t epoch,
1653 const EntityName& entity_name)
1654 {
1655 using ceph::encode;
1656 auth_start = ceph_clock_now();
1657
1658 if (con->get_peer_addr().is_msgr2()) {
1659 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1660 state = State::AUTHENTICATING;
1661 con->send_message(new MMonGetMap());
1662 return;
1663 }
1664
1665 // restart authentication handshake
1666 state = State::NEGOTIATING;
1667
1668 // send an initial keepalive to ensure our timestamp is valid by the
1669 // time we are in an OPENED state (by sequencing this before
1670 // authentication).
1671 con->send_keepalive();
1672
1673 auto m = new MAuth;
1674 m->protocol = CEPH_AUTH_UNKNOWN;
1675 m->monmap_epoch = epoch;
1676 __u8 struct_v = 1;
1677 encode(struct_v, m->auth_payload);
1678 std::vector<uint32_t> auth_supported;
1679 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1680 encode(auth_supported, m->auth_payload);
1681 encode(entity_name, m->auth_payload);
1682 encode(global_id, m->auth_payload);
1683 con->send_message(m);
1684 }
1685
1686 int MonConnection::get_auth_request(
1687 uint32_t *method,
1688 std::vector<uint32_t> *preferred_modes,
1689 ceph::buffer::list *bl,
1690 const EntityName& entity_name,
1691 uint32_t want_keys,
1692 RotatingKeyRing* keyring)
1693 {
1694 using ceph::encode;
1695 // choose method
1696 if (auth_method < 0) {
1697 std::vector<uint32_t> as;
1698 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1699 if (as.empty()) {
1700 return -EACCES;
1701 }
1702 auth_method = as.front();
1703 }
1704 *method = auth_method;
1705 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1706 preferred_modes);
1707 ldout(cct,10) << __func__ << " method " << *method
1708 << " preferred_modes " << *preferred_modes << dendl;
1709 if (preferred_modes->empty()) {
1710 return -EACCES;
1711 }
1712
1713 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1714 ceph_assert(r == 0);
1715
1716 // initial requset includes some boilerplate...
1717 encode((char)AUTH_MODE_MON, *bl);
1718 encode(entity_name, *bl);
1719 encode(global_id, *bl);
1720
1721 // and (maybe) some method-specific initial payload
1722 auth->build_initial_request(bl);
1723
1724 return 0;
1725 }
1726
1727 int MonConnection::handle_auth_reply_more(
1728 AuthConnectionMeta *auth_meta,
1729 const ceph::buffer::list& bl,
1730 ceph::buffer::list *reply)
1731 {
1732 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1733 ldout(cct, 30) << __func__ << " got\n";
1734 bl.hexdump(*_dout);
1735 *_dout << dendl;
1736
1737 auto p = bl.cbegin();
1738 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1739 int r = auth->handle_response(0, p, &auth_meta->session_key,
1740 &auth_meta->connection_secret);
1741 if (r == -EAGAIN) {
1742 auth->prepare_build_request();
1743 auth->build_request(*reply);
1744 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1745 << " bytes" << dendl;
1746 r = 0;
1747 } else if (r < 0) {
1748 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1749 } else {
1750 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1751 // FIXME
1752 ceph_abort(cct, "write me");
1753 }
1754 return r;
1755 }
1756
1757 int MonConnection::handle_auth_done(
1758 AuthConnectionMeta *auth_meta,
1759 uint64_t new_global_id,
1760 const ceph::buffer::list& bl,
1761 CryptoKey *session_key,
1762 std::string *connection_secret)
1763 {
1764 ldout(cct,10) << __func__ << " global_id " << new_global_id
1765 << " payload " << bl.length()
1766 << dendl;
1767 global_id = new_global_id;
1768 auth->set_global_id(global_id);
1769 auto p = bl.begin();
1770 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1771 &auth_meta->connection_secret);
1772 if (auth_err >= 0) {
1773 state = State::HAVE_SESSION;
1774 }
1775 con->set_last_keepalive_ack(auth_start);
1776
1777 if (pending_tell_command) {
1778 con->send_message2(std::move(pending_tell_command));
1779 }
1780 return auth_err;
1781 }
1782
1783 int MonConnection::handle_auth_bad_method(
1784 uint32_t old_auth_method,
1785 int result,
1786 const std::vector<uint32_t>& allowed_methods,
1787 const std::vector<uint32_t>& allowed_modes)
1788 {
1789 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1790 << " result " << cpp_strerror(result)
1791 << " allowed_methods " << allowed_methods << dendl;
1792 std::vector<uint32_t> auth_supported;
1793 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1794 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1795 old_auth_method);
1796 assert(p != auth_supported.end());
1797 p = std::find_first_of(std::next(p), auth_supported.end(),
1798 allowed_methods.begin(), allowed_methods.end());
1799 if (p == auth_supported.end()) {
1800 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1801 << " but i only support " << auth_supported << dendl;
1802 return -EACCES;
1803 }
1804 auth_method = *p;
1805 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1806 return 0;
1807 }
1808
1809 int MonConnection::handle_auth(MAuthReply* m,
1810 const EntityName& entity_name,
1811 uint32_t want_keys,
1812 RotatingKeyRing* keyring)
1813 {
1814 if (state == State::NEGOTIATING) {
1815 int r = _negotiate(m, entity_name, want_keys, keyring);
1816 if (r) {
1817 return r;
1818 }
1819 state = State::AUTHENTICATING;
1820 }
1821 int r = authenticate(m);
1822 if (!r) {
1823 state = State::HAVE_SESSION;
1824 }
1825 return r;
1826 }
1827
1828 int MonConnection::_negotiate(MAuthReply *m,
1829 const EntityName& entity_name,
1830 uint32_t want_keys,
1831 RotatingKeyRing* keyring)
1832 {
1833 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1834 if (r == -ENOTSUP) {
1835 if (m->result == -ENOTSUP) {
1836 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1837 << dendl;
1838 }
1839 return m->result;
1840 }
1841 return r;
1842 }
1843
1844 int MonConnection::_init_auth(
1845 uint32_t method,
1846 const EntityName& entity_name,
1847 uint32_t want_keys,
1848 RotatingKeyRing* keyring,
1849 bool msgr2)
1850 {
1851 ldout(cct, 10) << __func__ << " method " << method << dendl;
1852 if (auth && auth->get_protocol() == (int)method) {
1853 ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
1854 auth->reset();
1855 return 0;
1856 }
1857
1858 ldout(cct, 10) << __func__ << " creating new auth" << dendl;
1859 auth.reset(AuthClientHandler::create(cct, method, keyring));
1860 if (!auth) {
1861 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1862 return -ENOTSUP;
1863 }
1864
1865 // do not request MGR key unless the mon has the SERVER_KRAKEN
1866 // feature. otherwise it will give us an auth error. note that
1867 // we have to use the FEATUREMASK because pre-jewel the kraken
1868 // feature bit was used for something else.
1869 if (!msgr2 &&
1870 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1871 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1872 ldout(cct, 1) << __func__
1873 << " not requesting MGR keys from pre-kraken monitor"
1874 << dendl;
1875 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1876 }
1877 auth->set_want_keys(want_keys);
1878 auth->init(entity_name);
1879 auth->set_global_id(global_id);
1880 return 0;
1881 }
1882
1883 int MonConnection::authenticate(MAuthReply *m)
1884 {
1885 ceph_assert(auth);
1886 if (!m->global_id) {
1887 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1888 }
1889 if (m->global_id != global_id) {
1890 // it's a new session
1891 auth->reset();
1892 global_id = m->global_id;
1893 auth->set_global_id(global_id);
1894 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1895 }
1896 auto p = m->result_bl.cbegin();
1897 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1898 if (ret == -EAGAIN) {
1899 auto ma = new MAuth;
1900 ma->protocol = auth->get_protocol();
1901 auth->prepare_build_request();
1902 auth->build_request(ma->auth_payload);
1903 con->send_message(ma);
1904 }
1905 if (ret == 0 && pending_tell_command) {
1906 con->send_message2(std::move(pending_tell_command));
1907 }
1908
1909 return ret;
1910 }
1911
1912 void MonClient::register_config_callback(md_config_t::config_callback fn) {
1913 ceph_assert(!config_cb);
1914 config_cb = fn;
1915 }
1916
1917 md_config_t::config_callback MonClient::get_config_callback() {
1918 return config_cb;
1919 }
1920
1921 #pragma GCC diagnostic push
1922 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
1923 #pragma clang diagnostic push
1924 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1925 class monc_error_category : public ceph::converting_category {
1926 public:
1927 monc_error_category(){}
1928 const char* name() const noexcept override;
1929 const char* message(int ev, char*, std::size_t) const noexcept override;
1930 std::string message(int ev) const override;
1931 bs::error_condition default_error_condition(int ev) const noexcept
1932 override;
1933 bool equivalent(int ev, const bs::error_condition& c) const
1934 noexcept override;
1935 using ceph::converting_category::equivalent;
1936 int from_code(int ev) const noexcept override;
1937 };
1938 #pragma GCC diagnostic pop
1939 #pragma clang diagnostic pop
1940
1941 const char* monc_error_category::name() const noexcept {
1942 return "monc";
1943 }
1944
1945 const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
1946 if (ev == 0)
1947 return "No error";
1948
1949 switch (static_cast<monc_errc>(ev)) {
1950 case monc_errc::shutting_down: // Command failed due to MonClient shutting down
1951 return "Command failed due to MonClient shutting down";
1952 case monc_errc::session_reset:
1953 return "Monitor session was reset";
1954 case monc_errc::rank_dne:
1955 return "Requested monitor rank does not exist";
1956 case monc_errc::mon_dne:
1957 return "Requested monitor does not exist";
1958 case monc_errc::timed_out:
1959 return "Monitor operation timed out";
1960 case monc_errc::mon_unavailable:
1961 return "Monitor unavailable";
1962 }
1963
1964 return "Unknown error";
1965 }
1966
1967 std::string monc_error_category::message(int ev) const {
1968 return message(ev, nullptr, 0);
1969 }
1970
1971 bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
1972 switch (static_cast<monc_errc>(ev)) {
1973 case monc_errc::shutting_down:
1974 return bs::errc::operation_canceled;
1975 case monc_errc::session_reset:
1976 return bs::errc::resource_unavailable_try_again;
1977 case monc_errc::rank_dne:
1978 [[fallthrough]];
1979 case monc_errc::mon_dne:
1980 return ceph::errc::not_in_map;
1981 case monc_errc::timed_out:
1982 return bs::errc::timed_out;
1983 case monc_errc::mon_unavailable:
1984 return bs::errc::no_such_device;
1985 }
1986 return { ev, *this };
1987 }
1988
1989 bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
1990 switch (static_cast<monc_errc>(ev)) {
1991 case monc_errc::rank_dne:
1992 [[fallthrough]];
1993 case monc_errc::mon_dne:
1994 return c == bs::errc::no_such_file_or_directory;
1995 default:
1996 return default_error_condition(ev) == c;
1997 }
1998 }
1999
2000 int monc_error_category::from_code(int ev) const noexcept {
2001 if (ev == 0)
2002 return 0;
2003
2004 switch (static_cast<monc_errc>(ev)) {
2005 case monc_errc::shutting_down:
2006 return -ECANCELED;
2007 case monc_errc::session_reset:
2008 return -EAGAIN;
2009 case monc_errc::rank_dne:
2010 [[fallthrough]];
2011 case monc_errc::mon_dne:
2012 return -ENOENT;
2013 case monc_errc::timed_out:
2014 return -ETIMEDOUT;
2015 case monc_errc::mon_unavailable:
2016 return -ENXIO;
2017 }
2018 return -EDOM;
2019 }
2020
2021 const bs::error_category& monc_category() noexcept {
2022 static const monc_error_category c;
2023 return c;
2024 }