]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/scope_guard.h"
25 #include "include/stringify.h"
26
27 #include "messages/MMonGetMap.h"
28 #include "messages/MMonGetVersion.h"
29 #include "messages/MMonGetVersionReply.h"
30 #include "messages/MMonMap.h"
31 #include "messages/MConfig.h"
32 #include "messages/MGetConfig.h"
33 #include "messages/MAuth.h"
34 #include "messages/MLogAck.h"
35 #include "messages/MAuthReply.h"
36 #include "messages/MMonCommand.h"
37 #include "messages/MMonCommandAck.h"
38 #include "messages/MCommand.h"
39 #include "messages/MCommandReply.h"
40 #include "messages/MPing.h"
41
42 #include "messages/MMonSubscribe.h"
43 #include "messages/MMonSubscribeAck.h"
44 #include "common/errno.h"
45 #include "common/hostname.h"
46 #include "common/LogClient.h"
47
48 #include "MonClient.h"
49 #include "MonMap.h"
50
51 #include "auth/Auth.h"
52 #include "auth/KeyRing.h"
53 #include "auth/AuthClientHandler.h"
54 #include "auth/AuthRegistry.h"
55 #include "auth/RotatingKeyRing.h"
56
57 #define dout_subsys ceph_subsys_monc
58 #undef dout_prefix
59 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
60
61 using std::string;
62
63 MonClient::MonClient(CephContext *cct_) :
64 Dispatcher(cct_),
65 AuthServer(cct_),
66 messenger(NULL),
67 timer(cct_, monc_lock),
68 finisher(cct_),
69 initialized(false),
70 log_client(NULL),
71 more_log_pending(false),
72 want_monmap(true),
73 had_a_connection(false),
74 reopen_interval_multiplier(
75 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
76 last_mon_command_tid(0),
77 version_req_id(0)
78 {}
79
80 MonClient::~MonClient()
81 {
82 }
83
84 int MonClient::build_initial_monmap()
85 {
86 ldout(cct, 10) << __func__ << dendl;
87 int r = monmap.build_initial(cct, false, std::cerr);
88 ldout(cct,10) << "monmap:\n";
89 monmap.print(*_dout);
90 *_dout << dendl;
91 return r;
92 }
93
94 int MonClient::get_monmap()
95 {
96 ldout(cct, 10) << __func__ << dendl;
97 std::unique_lock l(monc_lock);
98
99 sub.want("monmap", 0, 0);
100 if (!_opened())
101 _reopen_session();
102 map_cond.wait(l, [this] { return !want_monmap; });
103 ldout(cct, 10) << __func__ << " done" << dendl;
104 return 0;
105 }
106
107 int MonClient::get_monmap_and_config()
108 {
109 ldout(cct, 10) << __func__ << dendl;
110 ceph_assert(!messenger);
111
112 int tries = 10;
113
114 cct->init_crypto();
115 auto shutdown_crypto = make_scope_guard([this] {
116 cct->shutdown_crypto();
117 });
118
119 int r = build_initial_monmap();
120 if (r < 0) {
121 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
122 return r;
123 }
124
125 messenger = Messenger::create_client_messenger(
126 cct, "temp_mon_client");
127 ceph_assert(messenger);
128 messenger->add_dispatcher_head(this);
129 messenger->start();
130 auto shutdown_msgr = make_scope_guard([this] {
131 messenger->shutdown();
132 messenger->wait();
133 delete messenger;
134 messenger = nullptr;
135 if (!monmap.fsid.is_zero()) {
136 cct->_conf.set_val("fsid", stringify(monmap.fsid));
137 }
138 });
139
140 while (tries-- > 0) {
141 r = init();
142 if (r < 0) {
143 return r;
144 }
145 r = authenticate(cct->_conf->client_mount_timeout);
146 if (r == -ETIMEDOUT) {
147 shutdown();
148 continue;
149 }
150 if (r < 0) {
151 break;
152 }
153 {
154 std::unique_lock l(monc_lock);
155 if (monmap.get_epoch() &&
156 !monmap.persistent_features.contains_all(
157 ceph::features::mon::FEATURE_MIMIC)) {
158 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
159 << dendl;
160 r = 0;
161 break;
162 }
163 while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
164 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
165 map_cond.wait_for(l, ceph::make_timespan(
166 cct->_conf->mon_client_hunt_interval));
167 }
168 if (got_config) {
169 ldout(cct,10) << __func__ << " success" << dendl;
170 r = 0;
171 break;
172 }
173 }
174 lderr(cct) << __func__ << " failed to get config" << dendl;
175 shutdown();
176 continue;
177 }
178
179 shutdown();
180 return r;
181 }
182
183
184 /**
185 * Ping the monitor with id @p mon_id and set the resulting reply in
186 * the provided @p result_reply, if this last parameter is not NULL.
187 *
188 * So that we don't rely on the MonClient's default messenger, set up
189 * during connect(), we create our own messenger to comunicate with the
190 * specified monitor. This is advantageous in the following ways:
191 *
192 * - Isolate the ping procedure from the rest of the MonClient's operations,
193 * allowing us to not acquire or manage the big monc_lock, thus not
194 * having to block waiting for some other operation to finish before we
195 * can proceed.
196 * * for instance, we can ping mon.FOO even if we are currently hunting
197 * or blocked waiting for auth to complete with mon.BAR.
198 *
199 * - Ping a monitor prior to establishing a connection (using connect())
200 * and properly establish the MonClient's messenger. This frees us
201 * from dealing with the complex foo that happens in connect().
202 *
203 * We also don't rely on MonClient as a dispatcher for this messenger,
204 * unlike what happens with the MonClient's default messenger. This allows
205 * us to sandbox the whole ping, having it much as a separate entity in
206 * the MonClient class, considerably simplifying the handling and dispatching
207 * of messages without needing to consider monc_lock.
208 *
209 * Current drawback is that we will establish a messenger for each ping
210 * we want to issue, instead of keeping a single messenger instance that
211 * would be used for all pings.
212 */
213 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
214 {
215 ldout(cct, 10) << __func__ << dendl;
216
217 string new_mon_id;
218 if (monmap.contains("noname-"+mon_id)) {
219 new_mon_id = "noname-"+mon_id;
220 } else {
221 new_mon_id = mon_id;
222 }
223
224 if (new_mon_id.empty()) {
225 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
226 return -EINVAL;
227 } else if (!monmap.contains(new_mon_id)) {
228 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
229 << dendl;
230 return -ENOENT;
231 }
232
233 // N.B. monc isn't initialized
234
235 auth_registry.refresh_config();
236
237 KeyRing keyring;
238 keyring.from_ceph_context(cct);
239 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
240
241 MonClientPinger *pinger = new MonClientPinger(cct,
242 &rkeyring,
243 result_reply);
244
245 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
246 smsgr->add_dispatcher_head(pinger);
247 smsgr->set_auth_client(pinger);
248 smsgr->start();
249
250 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
251 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
252 << " " << con->get_peer_addr() << dendl;
253
254 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
255 pinger->mc->start(monmap.get_epoch(), entity_name);
256 con->send_message(new MPing);
257
258 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
259 if (ret == 0) {
260 ldout(cct,10) << __func__ << " got ping reply" << dendl;
261 } else {
262 ret = -ret;
263 }
264
265 con->mark_down();
266 pinger->mc.reset();
267 smsgr->shutdown();
268 smsgr->wait();
269 delete smsgr;
270 delete pinger;
271 return ret;
272 }
273
274 bool MonClient::ms_dispatch(Message *m)
275 {
276 // we only care about these message types
277 switch (m->get_type()) {
278 case CEPH_MSG_MON_MAP:
279 case CEPH_MSG_AUTH_REPLY:
280 case CEPH_MSG_MON_SUBSCRIBE_ACK:
281 case CEPH_MSG_MON_GET_VERSION_REPLY:
282 case MSG_MON_COMMAND_ACK:
283 case MSG_COMMAND_REPLY:
284 case MSG_LOGACK:
285 case MSG_CONFIG:
286 break;
287 case CEPH_MSG_PING:
288 m->put();
289 return true;
290 default:
291 return false;
292 }
293
294 std::lock_guard lock(monc_lock);
295
296 if (!m->get_connection()->is_anon() &&
297 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
298 if (_hunting()) {
299 auto p = _find_pending_con(m->get_connection());
300 if (p == pending_cons.end()) {
301 // ignore any messages outside hunting sessions
302 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
303 m->put();
304 return true;
305 }
306 } else if (!active_con || active_con->get_con() != m->get_connection()) {
307 // ignore any messages outside our session(s)
308 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
309 m->put();
310 return true;
311 }
312 }
313
314 switch (m->get_type()) {
315 case CEPH_MSG_MON_MAP:
316 handle_monmap(static_cast<MMonMap*>(m));
317 if (passthrough_monmap) {
318 return false;
319 } else {
320 m->put();
321 }
322 break;
323 case CEPH_MSG_AUTH_REPLY:
324 handle_auth(static_cast<MAuthReply*>(m));
325 break;
326 case CEPH_MSG_MON_SUBSCRIBE_ACK:
327 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
328 break;
329 case CEPH_MSG_MON_GET_VERSION_REPLY:
330 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
331 break;
332 case MSG_MON_COMMAND_ACK:
333 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
334 break;
335 case MSG_COMMAND_REPLY:
336 if (m->get_connection()->is_anon() &&
337 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
338 // this connection is from 'tell'... ignore everything except our command
339 // reply. (we'll get misc other message because we authenticated, but we
340 // don't need them.)
341 handle_command_reply(static_cast<MCommandReply*>(m));
342 return true;
343 }
344 // leave the message for another dispatch handler (e.g., Objecter)
345 return false;
346 case MSG_LOGACK:
347 if (log_client) {
348 log_client->handle_log_ack(static_cast<MLogAck*>(m));
349 m->put();
350 if (more_log_pending) {
351 send_log();
352 }
353 } else {
354 m->put();
355 }
356 break;
357 case MSG_CONFIG:
358 handle_config(static_cast<MConfig*>(m));
359 break;
360 }
361 return true;
362 }
363
364 void MonClient::send_log(bool flush)
365 {
366 if (log_client) {
367 auto lm = log_client->get_mon_log_message(flush);
368 if (lm)
369 _send_mon_message(std::move(lm));
370 more_log_pending = log_client->are_pending();
371 }
372 }
373
374 void MonClient::flush_log()
375 {
376 std::lock_guard l(monc_lock);
377 send_log();
378 }
379
380 /* Unlike all the other message-handling functions, we don't put away a reference
381 * because we want to support MMonMap passthrough to other Dispatchers. */
382 void MonClient::handle_monmap(MMonMap *m)
383 {
384 ldout(cct, 10) << __func__ << " " << *m << dendl;
385 auto con_addrs = m->get_source_addrs();
386 string old_name = monmap.get_name(con_addrs);
387 const auto old_epoch = monmap.get_epoch();
388
389 auto p = m->monmapbl.cbegin();
390 decode(monmap, p);
391
392 ldout(cct, 10) << " got monmap " << monmap.epoch
393 << " from mon." << old_name
394 << " (according to old e" << monmap.get_epoch() << ")"
395 << dendl;
396 ldout(cct, 10) << "dump:\n";
397 monmap.print(*_dout);
398 *_dout << dendl;
399
400 if (old_epoch != monmap.get_epoch()) {
401 tried.clear();
402 }
403 if (old_name.size() == 0) {
404 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
405 _reopen_session();
406 } else {
407 auto new_name = monmap.get_name(con_addrs);
408 if (new_name.empty()) {
409 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
410 << " went away" << dendl;
411 // can't find the mon we were talking to (above)
412 _reopen_session();
413 } else if (messenger->should_use_msgr2() &&
414 monmap.get_addrs(new_name).has_msgr2() &&
415 !con_addrs.has_msgr2()) {
416 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
417 << monmap.get_addrs(new_name) << " but i'm connected to "
418 << con_addrs << ", reconnecting" << dendl;
419 _reopen_session();
420 }
421 }
422
423 sub.got("monmap", monmap.get_epoch());
424 map_cond.notify_all();
425 want_monmap = false;
426
427 if (authenticate_err == 1) {
428 _finish_auth(0);
429 }
430 }
431
432 void MonClient::handle_config(MConfig *m)
433 {
434 ldout(cct,10) << __func__ << " " << *m << dendl;
435 finisher.queue(new LambdaContext([this, m](int r) {
436 cct->_conf.set_mon_vals(cct, m->config, config_cb);
437 if (config_notify_cb) {
438 config_notify_cb();
439 }
440 m->put();
441 }));
442 got_config = true;
443 map_cond.notify_all();
444 }
445
446 // ----------------------
447
448 int MonClient::init()
449 {
450 ldout(cct, 10) << __func__ << dendl;
451
452 entity_name = cct->_conf->name;
453
454 auth_registry.refresh_config();
455
456 std::lock_guard l(monc_lock);
457 keyring.reset(new KeyRing);
458 if (auth_registry.is_supported_method(messenger->get_mytype(),
459 CEPH_AUTH_CEPHX)) {
460 // this should succeed, because auth_registry just checked!
461 int r = keyring->from_ceph_context(cct);
462 if (r != 0) {
463 // but be somewhat graceful in case there was a race condition
464 lderr(cct) << "keyring not found" << dendl;
465 return r;
466 }
467 }
468 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
469 return -ENOENT;
470 }
471
472 rotating_secrets.reset(
473 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
474
475 initialized = true;
476
477 messenger->set_auth_client(this);
478 messenger->add_dispatcher_head(this);
479
480 timer.init();
481 finisher.start();
482 schedule_tick();
483
484 return 0;
485 }
486
487 void MonClient::shutdown()
488 {
489 ldout(cct, 10) << __func__ << dendl;
490 monc_lock.lock();
491 stopping = true;
492 while (!version_requests.empty()) {
493 version_requests.begin()->second->context->complete(-ECANCELED);
494 ldout(cct, 20) << __func__ << " canceling and discarding version request "
495 << version_requests.begin()->second << dendl;
496 delete version_requests.begin()->second;
497 version_requests.erase(version_requests.begin());
498 }
499 while (!mon_commands.empty()) {
500 auto tid = mon_commands.begin()->first;
501 _cancel_mon_command(tid);
502 }
503 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
504 << " pending message(s)" << dendl;
505 waiting_for_session.clear();
506
507 active_con.reset();
508 pending_cons.clear();
509 auth.reset();
510
511 monc_lock.unlock();
512
513 if (initialized) {
514 finisher.wait_for_empty();
515 finisher.stop();
516 initialized = false;
517 }
518 monc_lock.lock();
519 timer.shutdown();
520 stopping = false;
521 monc_lock.unlock();
522 }
523
524 int MonClient::authenticate(double timeout)
525 {
526 std::unique_lock lock{monc_lock};
527
528 if (active_con) {
529 ldout(cct, 5) << "already authenticated" << dendl;
530 return 0;
531 }
532 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
533 sub.want("config", 0, 0);
534 if (!_opened())
535 _reopen_session();
536
537 auto until = ceph::real_clock::now();
538 until += ceph::make_timespan(timeout);
539 if (timeout > 0.0)
540 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
541 authenticate_err = 1; // == in progress
542 while (!active_con && authenticate_err >= 0) {
543 if (timeout > 0.0) {
544 auto r = auth_cond.wait_until(lock, until);
545 if (r == cv_status::timeout && !active_con) {
546 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
547 authenticate_err = -ETIMEDOUT;
548 }
549 } else {
550 auth_cond.wait(lock);
551 }
552 }
553
554 if (active_con) {
555 ldout(cct, 5) << __func__ << " success, global_id "
556 << active_con->get_global_id() << dendl;
557 // active_con should not have been set if there was an error
558 ceph_assert(authenticate_err >= 0);
559 authenticated = true;
560 }
561
562 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
563 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
564 }
565
566 return authenticate_err;
567 }
568
569 void MonClient::handle_auth(MAuthReply *m)
570 {
571 ceph_assert(ceph_mutex_is_locked(monc_lock));
572
573 if (m->get_connection()->is_anon()) {
574 // anon connection, used for mon tell commands
575 for (auto& p : mon_commands) {
576 if (p.second->target_con == m->get_connection()) {
577 auto& mc = p.second->target_session;
578 int ret = mc->handle_auth(m, entity_name,
579 CEPH_ENTITY_TYPE_MON,
580 rotating_secrets.get());
581 (void)ret; // we don't care
582 break;
583 }
584 }
585 m->put();
586 return;
587 }
588
589 if (!_hunting()) {
590 std::swap(active_con->get_auth(), auth);
591 int ret = active_con->authenticate(m);
592 m->put();
593 std::swap(auth, active_con->get_auth());
594 if (global_id != active_con->get_global_id()) {
595 lderr(cct) << __func__ << " peer assigned me a different global_id: "
596 << active_con->get_global_id() << dendl;
597 }
598 if (ret != -EAGAIN) {
599 _finish_auth(ret);
600 }
601 return;
602 }
603
604 // hunting
605 auto found = _find_pending_con(m->get_connection());
606 ceph_assert(found != pending_cons.end());
607 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
608 rotating_secrets.get());
609 m->put();
610 if (auth_err == -EAGAIN) {
611 return;
612 }
613 if (auth_err) {
614 pending_cons.erase(found);
615 if (!pending_cons.empty()) {
616 // keep trying with pending connections
617 return;
618 }
619 // the last try just failed, give up.
620 } else {
621 auto& mc = found->second;
622 ceph_assert(mc.have_session());
623 active_con.reset(new MonConnection(std::move(mc)));
624 pending_cons.clear();
625 }
626
627 _finish_hunting(auth_err);
628 _finish_auth(auth_err);
629 }
630
631 void MonClient::_finish_auth(int auth_err)
632 {
633 ldout(cct,10) << __func__ << " " << auth_err << dendl;
634 authenticate_err = auth_err;
635 // _resend_mon_commands() could _reopen_session() if the connected mon is not
636 // the one the MonCommand is targeting.
637 if (!auth_err && active_con) {
638 ceph_assert(auth);
639 _check_auth_tickets();
640 }
641 auth_cond.notify_all();
642
643 if (!auth_err) {
644 Context *cb = nullptr;
645 if (session_established_context) {
646 cb = session_established_context.release();
647 }
648 if (cb) {
649 monc_lock.unlock();
650 cb->complete(0);
651 monc_lock.lock();
652 }
653 }
654 }
655
656 // ---------
657
658 void MonClient::send_mon_message(MessageRef m)
659 {
660 std::lock_guard l{monc_lock};
661 _send_mon_message(std::move(m));
662 }
663
664 void MonClient::_send_mon_message(MessageRef m)
665 {
666 ceph_assert(ceph_mutex_is_locked(monc_lock));
667 if (active_con) {
668 auto cur_con = active_con->get_con();
669 ldout(cct, 10) << "_send_mon_message to mon."
670 << monmap.get_name(cur_con->get_peer_addr())
671 << " at " << cur_con->get_peer_addr() << dendl;
672 cur_con->send_message2(std::move(m));
673 } else {
674 waiting_for_session.push_back(std::move(m));
675 }
676 }
677
678 void MonClient::_reopen_session(int rank)
679 {
680 ceph_assert(ceph_mutex_is_locked(monc_lock));
681 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
682
683 active_con.reset();
684 pending_cons.clear();
685
686 _start_hunting();
687
688 if (rank >= 0) {
689 _add_conn(rank, global_id);
690 } else {
691 _add_conns(global_id);
692 }
693
694 // throw out old queued messages
695 waiting_for_session.clear();
696
697 // throw out version check requests
698 while (!version_requests.empty()) {
699 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
700 delete version_requests.begin()->second;
701 version_requests.erase(version_requests.begin());
702 }
703
704 for (auto& c : pending_cons) {
705 c.second.start(monmap.get_epoch(), entity_name);
706 }
707
708 if (sub.reload()) {
709 _renew_subs();
710 }
711 }
712
713 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
714 {
715 auto peer = monmap.get_addrs(rank);
716 auto conn = messenger->connect_to_mon(peer);
717 MonConnection mc(cct, conn, global_id, &auth_registry);
718 auto inserted = pending_cons.insert(std::make_pair(peer, std::move(mc)));
719 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
720 << " con " << conn
721 << " addr " << peer
722 << dendl;
723 return inserted.first->second;
724 }
725
726 void MonClient::_add_conns(uint64_t global_id)
727 {
728 // collect the next batch of candidates who are listed right next to the ones
729 // already tried
730 auto get_next_batch = [this]() -> std::vector<unsigned> {
731 std::multimap<uint16_t, unsigned> ranks_by_priority;
732 boost::copy(
733 monmap.mon_info | boost::adaptors::filtered(
734 [this](auto& info) {
735 auto rank = monmap.get_rank(info.first);
736 return tried.count(rank) == 0;
737 }) | boost::adaptors::transformed(
738 [this](auto& info) {
739 auto rank = monmap.get_rank(info.first);
740 return std::make_pair(info.second.priority, rank);
741 }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
742 if (ranks_by_priority.empty()) {
743 return {};
744 }
745 // only choose the monitors with lowest priority
746 auto cands = boost::make_iterator_range(
747 ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
748 std::vector<unsigned> ranks;
749 boost::range::copy(cands | boost::adaptors::map_values,
750 std::back_inserter(ranks));
751 return ranks;
752 };
753 auto ranks = get_next_batch();
754 if (ranks.empty()) {
755 tried.clear(); // start over
756 ranks = get_next_batch();
757 }
758 ceph_assert(!ranks.empty());
759 if (ranks.size() > 1) {
760 std::vector<uint16_t> weights;
761 for (auto i : ranks) {
762 auto rank_name = monmap.get_name(i);
763 weights.push_back(monmap.get_weight(rank_name));
764 }
765 std::random_device rd;
766 if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
767 std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
768 } else {
769 weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
770 std::mt19937{rd()});
771 }
772 }
773 ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
774 unsigned n = cct->_conf->mon_client_hunt_parallel;
775 if (n == 0 || n > ranks.size()) {
776 n = ranks.size();
777 }
778 for (unsigned i = 0; i < n; i++) {
779 _add_conn(ranks[i], global_id);
780 tried.insert(ranks[i]);
781 }
782 }
783
784 bool MonClient::ms_handle_reset(Connection *con)
785 {
786 std::lock_guard lock(monc_lock);
787
788 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
789 return false;
790
791 if (con->is_anon()) {
792 auto p = mon_commands.begin();
793 while (p != mon_commands.end()) {
794 auto cmd = p->second;
795 ++p;
796 if (cmd->target_con == con) {
797 _send_command(cmd); // may retry or fail
798 break;
799 }
800 }
801 return true;
802 }
803
804 if (_hunting()) {
805 if (pending_cons.count(con->get_peer_addrs())) {
806 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
807 << dendl;
808 } else {
809 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
810 << dendl;
811 }
812 return true;
813 } else {
814 if (active_con && con == active_con->get_con()) {
815 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
816 << dendl;
817 _reopen_session();
818 return false;
819 } else {
820 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
821 << dendl;
822 return true;
823 }
824 }
825 }
826
827 bool MonClient::_opened() const
828 {
829 ceph_assert(ceph_mutex_is_locked(monc_lock));
830 return active_con || _hunting();
831 }
832
833 bool MonClient::_hunting() const
834 {
835 return !pending_cons.empty();
836 }
837
838 void MonClient::_start_hunting()
839 {
840 ceph_assert(!_hunting());
841 // adjust timeouts if necessary
842 if (!had_a_connection)
843 return;
844 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
845 if (reopen_interval_multiplier >
846 cct->_conf->mon_client_hunt_interval_max_multiple) {
847 reopen_interval_multiplier =
848 cct->_conf->mon_client_hunt_interval_max_multiple;
849 }
850 }
851
852 void MonClient::_finish_hunting(int auth_err)
853 {
854 ldout(cct,10) << __func__ << " " << auth_err << dendl;
855 ceph_assert(ceph_mutex_is_locked(monc_lock));
856 // the pending conns have been cleaned.
857 ceph_assert(!_hunting());
858 if (active_con) {
859 auto con = active_con->get_con();
860 ldout(cct, 1) << "found mon."
861 << monmap.get_name(con->get_peer_addr())
862 << dendl;
863 } else {
864 ldout(cct, 1) << "no mon sessions established" << dendl;
865 }
866
867 had_a_connection = true;
868 _un_backoff();
869
870 if (!auth_err) {
871 last_rotating_renew_sent = utime_t();
872 while (!waiting_for_session.empty()) {
873 _send_mon_message(std::move(waiting_for_session.front()));
874 waiting_for_session.pop_front();
875 }
876 _resend_mon_commands();
877 send_log(true);
878 if (active_con) {
879 std::swap(auth, active_con->get_auth());
880 if (global_id && global_id != active_con->get_global_id()) {
881 lderr(cct) << __func__ << " global_id changed from " << global_id
882 << " to " << active_con->get_global_id() << dendl;
883 }
884 global_id = active_con->get_global_id();
885 }
886 }
887 }
888
889 void MonClient::tick()
890 {
891 ldout(cct, 10) << __func__ << dendl;
892
893 utime_t now = ceph_clock_now();
894
895 auto reschedule_tick = make_scope_guard([this] {
896 schedule_tick();
897 });
898
899 _check_auth_tickets();
900 _check_tell_commands();
901
902 if (_hunting()) {
903 ldout(cct, 1) << "continuing hunt" << dendl;
904 return _reopen_session();
905 } else if (active_con) {
906 // just renew as needed
907 auto cur_con = active_con->get_con();
908 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
909 const bool maybe_renew = sub.need_renew();
910 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
911 << dendl;
912 if (maybe_renew) {
913 _renew_subs();
914 }
915 }
916
917 if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
918 cur_con->send_keepalive();
919 last_keepalive = now;
920
921 if (cct->_conf->mon_client_ping_timeout > 0 &&
922 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
923 utime_t lk = cur_con->get_last_keepalive_ack();
924 utime_t interval = now - lk;
925 if (interval > cct->_conf->mon_client_ping_timeout) {
926 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
927 << " seconds), reconnecting" << dendl;
928 return _reopen_session();
929 }
930 }
931
932 _un_backoff();
933 }
934
935 if (now > last_send_log + cct->_conf->mon_client_log_interval) {
936 send_log();
937 last_send_log = now;
938 }
939 }
940 }
941
942 void MonClient::_un_backoff()
943 {
944 // un-backoff our reconnect interval
945 reopen_interval_multiplier = std::max(
946 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
947 reopen_interval_multiplier /
948 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
949 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
950 << reopen_interval_multiplier << dendl;
951 }
952
953 void MonClient::schedule_tick()
954 {
955 auto do_tick = make_lambda_context([this](int) { tick(); });
956 if (_hunting()) {
957 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
958 reopen_interval_multiplier);
959 timer.add_event_after(hunt_interval, do_tick);
960 } else {
961 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
962 cct->_conf->mon_client_log_interval),
963 do_tick);
964 }
965 }
966
967 // ---------
968
969 void MonClient::_renew_subs()
970 {
971 ceph_assert(ceph_mutex_is_locked(monc_lock));
972 if (!sub.have_new()) {
973 ldout(cct, 10) << __func__ << " - empty" << dendl;
974 return;
975 }
976
977 ldout(cct, 10) << __func__ << dendl;
978 if (!_opened())
979 _reopen_session();
980 else {
981 auto m = ceph::make_message<MMonSubscribe>();
982 m->what = sub.get_subs();
983 m->hostname = ceph_get_short_hostname();
984 _send_mon_message(std::move(m));
985 sub.renewed();
986 }
987 }
988
989 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
990 {
991 sub.acked(m->interval);
992 m->put();
993 }
994
995 int MonClient::_check_auth_tickets()
996 {
997 ceph_assert(ceph_mutex_is_locked(monc_lock));
998 if (active_con && auth) {
999 if (auth->need_tickets()) {
1000 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1001 auto m = ceph::make_message<MAuth>();
1002 m->protocol = auth->get_protocol();
1003 auth->prepare_build_request();
1004 auth->build_request(m->auth_payload);
1005 _send_mon_message(m);
1006 }
1007
1008 _check_auth_rotating();
1009 }
1010 return 0;
1011 }
1012
1013 int MonClient::_check_auth_rotating()
1014 {
1015 ceph_assert(ceph_mutex_is_locked(monc_lock));
1016 if (!rotating_secrets ||
1017 !auth_principal_needs_rotating_keys(entity_name)) {
1018 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
1019 return 0;
1020 }
1021
1022 if (!active_con || !auth) {
1023 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
1024 return 0;
1025 }
1026
1027 utime_t now = ceph_clock_now();
1028 utime_t cutoff = now;
1029 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1030 utime_t issued_at_lower_bound = now;
1031 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
1032 if (!rotating_secrets->need_new_secrets(cutoff)) {
1033 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
1034 rotating_secrets->dump_rotating();
1035 return 0;
1036 }
1037
1038 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
1039 if (!rotating_secrets->need_new_secrets() &&
1040 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
1041 // the key has expired before it has been issued?
1042 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
1043 << " (before " << issued_at_lower_bound << ")" << dendl;
1044 }
1045 if ((now > last_rotating_renew_sent) &&
1046 double(now - last_rotating_renew_sent) < 1) {
1047 ldout(cct, 10) << __func__ << " called too often (last: "
1048 << last_rotating_renew_sent << "), skipping refresh" << dendl;
1049 return 0;
1050 }
1051 auto m = ceph::make_message<MAuth>();
1052 m->protocol = auth->get_protocol();
1053 if (auth->build_rotating_request(m->auth_payload)) {
1054 last_rotating_renew_sent = now;
1055 _send_mon_message(std::move(m));
1056 }
1057 return 0;
1058 }
1059
1060 int MonClient::wait_auth_rotating(double timeout)
1061 {
1062 std::unique_lock l(monc_lock);
1063
1064 // Must be initialized
1065 ceph_assert(auth != nullptr);
1066
1067 if (auth->get_protocol() == CEPH_AUTH_NONE)
1068 return 0;
1069
1070 if (!rotating_secrets)
1071 return 0;
1072
1073 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1074 utime_t now = ceph_clock_now();
1075 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
1076 return (!auth_principal_needs_rotating_keys(entity_name) ||
1077 !rotating_secrets->need_new_secrets(now));
1078 })) {
1079 ldout(cct, 10) << __func__ << " done" << dendl;
1080 return 0;
1081 } else {
1082 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1083 return -ETIMEDOUT;
1084 }
1085 }
1086
1087 // ---------
1088
1089 void MonClient::_send_command(MonCommand *r)
1090 {
1091 if (r->is_tell()) {
1092 ++r->send_attempts;
1093 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1094 _finish_command(r, -ENXIO, "mon unavailable");
1095 return;
1096 }
1097
1098 // tell-style command
1099 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1100 if (r->target_con) {
1101 r->target_con->mark_down();
1102 }
1103 if (r->target_rank >= 0) {
1104 if (r->target_rank >= (int)monmap.size()) {
1105 ldout(cct, 10) << " target " << r->target_rank
1106 << " >= max mon " << monmap.size() << dendl;
1107 _finish_command(r, -ENOENT, "mon rank dne");
1108 return;
1109 }
1110 r->target_con = messenger->connect_to_mon(
1111 monmap.get_addrs(r->target_rank), true /* anon */);
1112 } else {
1113 if (!monmap.contains(r->target_name)) {
1114 ldout(cct, 10) << " target " << r->target_name
1115 << " not present in monmap" << dendl;
1116 _finish_command(r, -ENOENT, "mon dne");
1117 return;
1118 }
1119 r->target_con = messenger->connect_to_mon(
1120 monmap.get_addrs(r->target_name), true /* anon */);
1121 }
1122
1123 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1124 &auth_registry));
1125 r->target_session->start(monmap.get_epoch(), entity_name);
1126 r->last_send_attempt = ceph_clock_now();
1127
1128 MCommand *m = new MCommand(monmap.fsid);
1129 m->set_tid(r->tid);
1130 m->cmd = r->cmd;
1131 m->set_data(r->inbl);
1132 r->target_session->queue_command(m);
1133 return;
1134 }
1135
1136 // ugly legacy handling of pre-octopus mons
1137 entity_addr_t peer;
1138 if (active_con) {
1139 peer = active_con->get_con()->get_peer_addr();
1140 }
1141
1142 if (r->target_rank >= 0 &&
1143 r->target_rank != monmap.get_rank(peer)) {
1144 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1145 << " wants rank " << r->target_rank
1146 << ", reopening session"
1147 << dendl;
1148 if (r->target_rank >= (int)monmap.size()) {
1149 ldout(cct, 10) << " target " << r->target_rank
1150 << " >= max mon " << monmap.size() << dendl;
1151 _finish_command(r, -ENOENT, "mon rank dne");
1152 return;
1153 }
1154 _reopen_session(r->target_rank);
1155 return;
1156 }
1157 if (r->target_name.length() &&
1158 r->target_name != monmap.get_name(peer)) {
1159 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1160 << " wants mon " << r->target_name
1161 << ", reopening session"
1162 << dendl;
1163 if (!monmap.contains(r->target_name)) {
1164 ldout(cct, 10) << " target " << r->target_name
1165 << " not present in monmap" << dendl;
1166 _finish_command(r, -ENOENT, "mon dne");
1167 return;
1168 }
1169 _reopen_session(monmap.get_rank(r->target_name));
1170 return;
1171 }
1172 // fall-thru to send 'normal' CLI command
1173 }
1174
1175 // normal CLI command
1176 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1177 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
1178 m->set_tid(r->tid);
1179 m->cmd = r->cmd;
1180 m->set_data(r->inbl);
1181 _send_mon_message(std::move(m));
1182 return;
1183 }
1184
1185 void MonClient::_check_tell_commands()
1186 {
1187 // resend any requests
1188 auto now = ceph_clock_now();
1189 auto p = mon_commands.begin();
1190 while (p != mon_commands.end()) {
1191 auto cmd = p->second;
1192 ++p;
1193 if (cmd->is_tell() &&
1194 cmd->last_send_attempt != utime_t() &&
1195 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1196 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1197 _send_command(cmd); // might remove cmd from mon_commands
1198 }
1199 }
1200 }
1201
1202 void MonClient::_resend_mon_commands()
1203 {
1204 // resend any requests
1205 auto p = mon_commands.begin();
1206 while (p != mon_commands.end()) {
1207 auto cmd = p->second;
1208 ++p;
1209 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1210 // starting with octopus, tell commands use their own connetion and need no
1211 // special resend when we finish hunting.
1212 } else {
1213 _send_command(cmd); // might remove cmd from mon_commands
1214 }
1215 }
1216 }
1217
1218 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1219 {
1220 MonCommand *r = NULL;
1221 uint64_t tid = ack->get_tid();
1222
1223 if (tid == 0 && !mon_commands.empty()) {
1224 r = mon_commands.begin()->second;
1225 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1226 } else {
1227 auto p = mon_commands.find(tid);
1228 if (p == mon_commands.end()) {
1229 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1230 ack->put();
1231 return;
1232 }
1233 r = p->second;
1234 }
1235
1236 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1237 if (r->poutbl)
1238 r->poutbl->claim(ack->get_data());
1239 _finish_command(r, ack->r, ack->rs);
1240 ack->put();
1241 }
1242
1243 void MonClient::handle_command_reply(MCommandReply *reply)
1244 {
1245 MonCommand *r = NULL;
1246 uint64_t tid = reply->get_tid();
1247
1248 if (tid == 0 && !mon_commands.empty()) {
1249 r = mon_commands.begin()->second;
1250 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1251 << dendl;
1252 } else {
1253 auto p = mon_commands.find(tid);
1254 if (p == mon_commands.end()) {
1255 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1256 << dendl;
1257 reply->put();
1258 return;
1259 }
1260 r = p->second;
1261 }
1262
1263 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1264 if (r->poutbl)
1265 r->poutbl->claim(reply->get_data());
1266 _finish_command(r, reply->r, reply->rs);
1267 reply->put();
1268 }
1269
1270 int MonClient::_cancel_mon_command(uint64_t tid)
1271 {
1272 ceph_assert(ceph_mutex_is_locked(monc_lock));
1273
1274 auto it = mon_commands.find(tid);
1275 if (it == mon_commands.end()) {
1276 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1277 return -ENOENT;
1278 }
1279
1280 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1281
1282 MonCommand *cmd = it->second;
1283 _finish_command(cmd, -ETIMEDOUT, "");
1284 return 0;
1285 }
1286
1287 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1288 {
1289 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1290 if (r->prval)
1291 *(r->prval) = ret;
1292 if (r->prs)
1293 *(r->prs) = rs;
1294 if (r->onfinish)
1295 finisher.queue(r->onfinish, ret);
1296 if (r->target_con) {
1297 r->target_con->mark_down();
1298 }
1299 mon_commands.erase(r->tid);
1300 delete r;
1301 }
1302
1303 void MonClient::start_mon_command(const std::vector<string>& cmd,
1304 const ceph::buffer::list& inbl,
1305 ceph::buffer::list *outbl, string *outs,
1306 Context *onfinish)
1307 {
1308 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
1309 std::lock_guard l(monc_lock);
1310 if (!initialized || stopping) {
1311 if (onfinish) {
1312 onfinish->complete(-ECANCELED);
1313 }
1314 return;
1315 }
1316 MonCommand *r = new MonCommand(++last_mon_command_tid);
1317 r->cmd = cmd;
1318 r->inbl = inbl;
1319 r->poutbl = outbl;
1320 r->prs = outs;
1321 r->onfinish = onfinish;
1322 if (cct->_conf->rados_mon_op_timeout > 0) {
1323 class C_CancelMonCommand : public Context
1324 {
1325 uint64_t tid;
1326 MonClient *monc;
1327 public:
1328 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1329 void finish(int r) override {
1330 monc->_cancel_mon_command(tid);
1331 }
1332 };
1333 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1334 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1335 }
1336 mon_commands[r->tid] = r;
1337 _send_command(r);
1338 }
1339
1340 void MonClient::start_mon_command(const string &mon_name,
1341 const std::vector<string>& cmd,
1342 const ceph::buffer::list& inbl,
1343 ceph::buffer::list *outbl, string *outs,
1344 Context *onfinish)
1345 {
1346 ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
1347 std::lock_guard l(monc_lock);
1348 if (!initialized || stopping) {
1349 if (onfinish) {
1350 onfinish->complete(-ECANCELED);
1351 }
1352 return;
1353 }
1354 MonCommand *r = new MonCommand(++last_mon_command_tid);
1355
1356 // detect/tolerate mon *rank* passed as a string
1357 string err;
1358 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
1359 if (err.size() == 0 && rank >= 0) {
1360 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
1361 << "' as rank " << rank << dendl;
1362 r->target_rank = rank;
1363 } else {
1364 r->target_name = mon_name;
1365 }
1366 r->cmd = cmd;
1367 r->inbl = inbl;
1368 r->poutbl = outbl;
1369 r->prs = outs;
1370 r->onfinish = onfinish;
1371 mon_commands[r->tid] = r;
1372 _send_command(r);
1373 }
1374
1375 void MonClient::start_mon_command(int rank,
1376 const std::vector<string>& cmd,
1377 const ceph::buffer::list& inbl,
1378 ceph::buffer::list *outbl, string *outs,
1379 Context *onfinish)
1380 {
1381 ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
1382 std::lock_guard l(monc_lock);
1383 if (!initialized || stopping) {
1384 if (onfinish) {
1385 onfinish->complete(-ECANCELED);
1386 }
1387 return;
1388 }
1389 MonCommand *r = new MonCommand(++last_mon_command_tid);
1390 r->target_rank = rank;
1391 r->cmd = cmd;
1392 r->inbl = inbl;
1393 r->poutbl = outbl;
1394 r->prs = outs;
1395 r->onfinish = onfinish;
1396 mon_commands[r->tid] = r;
1397 _send_command(r);
1398 }
1399
1400 // ---------
1401
1402 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1403 {
1404 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1405 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1406 std::lock_guard l(monc_lock);
1407 auto m = ceph::make_message<MMonGetVersion>();
1408 m->what = map;
1409 m->handle = ++version_req_id;
1410 version_requests[m->handle] = req;
1411 _send_mon_message(std::move(m));
1412 }
1413
1414 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1415 {
1416 ceph_assert(ceph_mutex_is_locked(monc_lock));
1417 auto iter = version_requests.find(m->handle);
1418 if (iter == version_requests.end()) {
1419 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1420 << " not found" << dendl;
1421 } else {
1422 version_req_d *req = iter->second;
1423 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1424 version_requests.erase(iter);
1425 if (req->newest)
1426 *req->newest = m->version;
1427 if (req->oldest)
1428 *req->oldest = m->oldest_version;
1429 finisher.queue(req->context, 0);
1430 delete req;
1431 }
1432 m->put();
1433 }
1434
1435 int MonClient::get_auth_request(
1436 Connection *con,
1437 AuthConnectionMeta *auth_meta,
1438 uint32_t *auth_method,
1439 std::vector<uint32_t> *preferred_modes,
1440 ceph::buffer::list *bl)
1441 {
1442 std::lock_guard l(monc_lock);
1443 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1444 << dendl;
1445
1446 // connection to mon?
1447 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1448 ceph_assert(!auth_meta->authorizer);
1449 if (con->is_anon()) {
1450 for (auto& i : mon_commands) {
1451 if (i.second->target_con == con) {
1452 return i.second->target_session->get_auth_request(
1453 auth_method, preferred_modes, bl,
1454 entity_name, want_keys, rotating_secrets.get());
1455 }
1456 }
1457 }
1458 for (auto& i : pending_cons) {
1459 if (i.second.is_con(con)) {
1460 return i.second.get_auth_request(
1461 auth_method, preferred_modes, bl,
1462 entity_name, want_keys, rotating_secrets.get());
1463 }
1464 }
1465 return -ENOENT;
1466 }
1467
1468 // generate authorizer
1469 if (!auth) {
1470 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1471 return -EACCES;
1472 }
1473 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1474 if (!auth_meta->authorizer) {
1475 lderr(cct) << __func__ << " failed to build_authorizer for type "
1476 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1477 return -EACCES;
1478 }
1479 auth_meta->auth_method = auth_meta->authorizer->protocol;
1480 auth_registry.get_supported_modes(con->get_peer_type(),
1481 auth_meta->auth_method,
1482 preferred_modes);
1483 *bl = auth_meta->authorizer->bl;
1484 return 0;
1485 }
1486
1487 int MonClient::handle_auth_reply_more(
1488 Connection *con,
1489 AuthConnectionMeta *auth_meta,
1490 const ceph::buffer::list& bl,
1491 ceph::buffer::list *reply)
1492 {
1493 std::lock_guard l(monc_lock);
1494
1495 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1496 if (con->is_anon()) {
1497 for (auto& i : mon_commands) {
1498 if (i.second->target_con == con) {
1499 return i.second->target_session->handle_auth_reply_more(
1500 auth_meta, bl, reply);
1501 }
1502 }
1503 }
1504 for (auto& i : pending_cons) {
1505 if (i.second.is_con(con)) {
1506 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1507 }
1508 }
1509 return -ENOENT;
1510 }
1511
1512 // authorizer challenges
1513 if (!auth || !auth_meta->authorizer) {
1514 lderr(cct) << __func__ << " no authorizer?" << dendl;
1515 return -1;
1516 }
1517 auth_meta->authorizer->add_challenge(cct, bl);
1518 *reply = auth_meta->authorizer->bl;
1519 return 0;
1520 }
1521
1522 int MonClient::handle_auth_done(
1523 Connection *con,
1524 AuthConnectionMeta *auth_meta,
1525 uint64_t global_id,
1526 uint32_t con_mode,
1527 const ceph::buffer::list& bl,
1528 CryptoKey *session_key,
1529 std::string *connection_secret)
1530 {
1531 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1532 std::lock_guard l(monc_lock);
1533 if (con->is_anon()) {
1534 for (auto& i : mon_commands) {
1535 if (i.second->target_con == con) {
1536 return i.second->target_session->handle_auth_done(
1537 auth_meta, global_id, bl,
1538 session_key, connection_secret);
1539 }
1540 }
1541 }
1542 for (auto& i : pending_cons) {
1543 if (i.second.is_con(con)) {
1544 int r = i.second.handle_auth_done(
1545 auth_meta, global_id, bl,
1546 session_key, connection_secret);
1547 if (r) {
1548 pending_cons.erase(i.first);
1549 if (!pending_cons.empty()) {
1550 return r;
1551 }
1552 } else {
1553 active_con.reset(new MonConnection(std::move(i.second)));
1554 pending_cons.clear();
1555 ceph_assert(active_con->have_session());
1556 }
1557
1558 _finish_hunting(r);
1559 if (r || monmap.get_epoch() > 0) {
1560 _finish_auth(r);
1561 }
1562 return r;
1563 }
1564 }
1565 return -ENOENT;
1566 } else {
1567 // verify authorizer reply
1568 auto p = bl.begin();
1569 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1570 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1571 << dendl;
1572 return -EACCES;
1573 }
1574 auth_meta->session_key = auth_meta->authorizer->session_key;
1575 return 0;
1576 }
1577 }
1578
1579 int MonClient::handle_auth_bad_method(
1580 Connection *con,
1581 AuthConnectionMeta *auth_meta,
1582 uint32_t old_auth_method,
1583 int result,
1584 const std::vector<uint32_t>& allowed_methods,
1585 const std::vector<uint32_t>& allowed_modes)
1586 {
1587 auth_meta->allowed_methods = allowed_methods;
1588
1589 std::lock_guard l(monc_lock);
1590 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1591 if (con->is_anon()) {
1592 for (auto& i : mon_commands) {
1593 if (i.second->target_con == con) {
1594 int r = i.second->target_session->handle_auth_bad_method(
1595 old_auth_method,
1596 result,
1597 allowed_methods,
1598 allowed_modes);
1599 if (r < 0) {
1600 _finish_command(i.second, r, "auth failed");
1601 }
1602 return r;
1603 }
1604 }
1605 }
1606 for (auto& i : pending_cons) {
1607 if (i.second.is_con(con)) {
1608 int r = i.second.handle_auth_bad_method(old_auth_method,
1609 result,
1610 allowed_methods,
1611 allowed_modes);
1612 if (r == 0) {
1613 return r; // try another method on this con
1614 }
1615 pending_cons.erase(i.first);
1616 if (!pending_cons.empty()) {
1617 return r; // fail this con, maybe another con will succeed
1618 }
1619 // fail hunt
1620 _finish_hunting(r);
1621 _finish_auth(r);
1622 return r;
1623 }
1624 }
1625 return -ENOENT;
1626 } else {
1627 // huh...
1628 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1629 << " result " << cpp_strerror(result)
1630 << " and auth is " << (auth ? auth->get_protocol() : 0)
1631 << dendl;
1632 return -EACCES;
1633 }
1634 }
1635
1636 int MonClient::handle_auth_request(
1637 Connection *con,
1638 AuthConnectionMeta *auth_meta,
1639 bool more,
1640 uint32_t auth_method,
1641 const ceph::buffer::list& payload,
1642 ceph::buffer::list *reply)
1643 {
1644 if (payload.length() == 0) {
1645 // for some channels prior to nautilus (osd heartbeat), we
1646 // tolerate the lack of an authorizer.
1647 if (!con->get_messenger()->require_authorizer) {
1648 handle_authentication_dispatcher->ms_handle_authentication(con);
1649 return 1;
1650 }
1651 return -EACCES;
1652 }
1653 auth_meta->auth_mode = payload[0];
1654 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1655 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1656 return -EACCES;
1657 }
1658 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1659 auth_method);
1660 if (!ah) {
1661 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1662 << auth_method << dendl;
1663 return -EOPNOTSUPP;
1664 }
1665
1666 auto ac = &auth_meta->authorizer_challenge;
1667 if (!HAVE_FEATURE(con->get_features(), CEPHX_V2)) {
1668 if (cct->_conf->cephx_service_require_version >= 2) {
1669 ldout(cct,10) << __func__ << " client missing CEPHX_V2 ("
1670 << "cephx_service_requre_version = "
1671 << cct->_conf->cephx_service_require_version << ")" << dendl;
1672 return -EACCES;
1673 }
1674 ac = nullptr;
1675 }
1676
1677 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1678 bool isvalid = ah->verify_authorizer(
1679 cct,
1680 *rotating_secrets,
1681 payload,
1682 auth_meta->get_connection_secret_length(),
1683 reply,
1684 &con->peer_name,
1685 &con->peer_global_id,
1686 &con->peer_caps_info,
1687 &auth_meta->session_key,
1688 &auth_meta->connection_secret,
1689 ac);
1690 if (isvalid) {
1691 handle_authentication_dispatcher->ms_handle_authentication(con);
1692 return 1;
1693 }
1694 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1695 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1696 return 0;
1697 }
1698 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1699 // discard old challenge
1700 auth_meta->authorizer_challenge.reset();
1701 return -EACCES;
1702 }
1703
1704 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1705 std::lock_guard l(monc_lock);
1706 if (auth) {
1707 return auth->build_authorizer(service_id);
1708 } else {
1709 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1710 << ", but no auth is available now" << dendl;
1711 return nullptr;
1712 }
1713 }
1714
// Logging setup for the MonConnection methods that follow: the prefix is
// built from MonConnection::have_session().
#undef dout_subsys
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1718
1719 MonConnection::MonConnection(
1720 CephContext *cct, ConnectionRef con, uint64_t global_id,
1721 AuthRegistry *ar)
1722 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1723 {}
1724
1725 MonConnection::~MonConnection()
1726 {
1727 if (con) {
1728 con->mark_down();
1729 con.reset();
1730 }
1731 }
1732
1733 bool MonConnection::have_session() const
1734 {
1735 return state == State::HAVE_SESSION;
1736 }
1737
1738 void MonConnection::start(epoch_t epoch,
1739 const EntityName& entity_name)
1740 {
1741 using ceph::encode;
1742 auth_start = ceph_clock_now();
1743
1744 if (con->get_peer_addr().is_msgr2()) {
1745 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1746 state = State::AUTHENTICATING;
1747 con->send_message(new MMonGetMap());
1748 return;
1749 }
1750
1751 // restart authentication handshake
1752 state = State::NEGOTIATING;
1753
1754 // send an initial keepalive to ensure our timestamp is valid by the
1755 // time we are in an OPENED state (by sequencing this before
1756 // authentication).
1757 con->send_keepalive();
1758
1759 auto m = new MAuth;
1760 m->protocol = CEPH_AUTH_UNKNOWN;
1761 m->monmap_epoch = epoch;
1762 __u8 struct_v = 1;
1763 encode(struct_v, m->auth_payload);
1764 std::vector<uint32_t> auth_supported;
1765 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1766 encode(auth_supported, m->auth_payload);
1767 encode(entity_name, m->auth_payload);
1768 encode(global_id, m->auth_payload);
1769 con->send_message(m);
1770 }
1771
1772 int MonConnection::get_auth_request(
1773 uint32_t *method,
1774 std::vector<uint32_t> *preferred_modes,
1775 ceph::buffer::list *bl,
1776 const EntityName& entity_name,
1777 uint32_t want_keys,
1778 RotatingKeyRing* keyring)
1779 {
1780 using ceph::encode;
1781 // choose method
1782 if (auth_method < 0) {
1783 std::vector<uint32_t> as;
1784 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1785 if (as.empty()) {
1786 return -EACCES;
1787 }
1788 auth_method = as.front();
1789 }
1790 *method = auth_method;
1791 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1792 preferred_modes);
1793 ldout(cct,10) << __func__ << " method " << *method
1794 << " preferred_modes " << *preferred_modes << dendl;
1795 if (preferred_modes->empty()) {
1796 return -EACCES;
1797 }
1798
1799 if (auth) {
1800 auth.reset();
1801 }
1802 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1803 ceph_assert(r == 0);
1804
1805 // initial requset includes some boilerplate...
1806 encode((char)AUTH_MODE_MON, *bl);
1807 encode(entity_name, *bl);
1808 encode(global_id, *bl);
1809
1810 // and (maybe) some method-specific initial payload
1811 auth->build_initial_request(bl);
1812
1813 return 0;
1814 }
1815
1816 int MonConnection::handle_auth_reply_more(
1817 AuthConnectionMeta *auth_meta,
1818 const ceph::buffer::list& bl,
1819 ceph::buffer::list *reply)
1820 {
1821 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1822 ldout(cct, 30) << __func__ << " got\n";
1823 bl.hexdump(*_dout);
1824 *_dout << dendl;
1825
1826 auto p = bl.cbegin();
1827 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1828 int r = auth->handle_response(0, p, &auth_meta->session_key,
1829 &auth_meta->connection_secret);
1830 if (r == -EAGAIN) {
1831 auth->prepare_build_request();
1832 auth->build_request(*reply);
1833 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1834 << " bytes" << dendl;
1835 r = 0;
1836 } else if (r < 0) {
1837 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1838 } else {
1839 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1840 // FIXME
1841 ceph_abort(cct, "write me");
1842 }
1843 return r;
1844 }
1845
1846 int MonConnection::handle_auth_done(
1847 AuthConnectionMeta *auth_meta,
1848 uint64_t new_global_id,
1849 const ceph::buffer::list& bl,
1850 CryptoKey *session_key,
1851 std::string *connection_secret)
1852 {
1853 ldout(cct,10) << __func__ << " global_id " << new_global_id
1854 << " payload " << bl.length()
1855 << dendl;
1856 global_id = new_global_id;
1857 auth->set_global_id(global_id);
1858 auto p = bl.begin();
1859 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1860 &auth_meta->connection_secret);
1861 if (auth_err >= 0) {
1862 state = State::HAVE_SESSION;
1863 }
1864 con->set_last_keepalive_ack(auth_start);
1865
1866 if (pending_tell_command) {
1867 con->send_message2(std::move(pending_tell_command));
1868 }
1869 return auth_err;
1870 }
1871
1872 int MonConnection::handle_auth_bad_method(
1873 uint32_t old_auth_method,
1874 int result,
1875 const std::vector<uint32_t>& allowed_methods,
1876 const std::vector<uint32_t>& allowed_modes)
1877 {
1878 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1879 << " result " << cpp_strerror(result)
1880 << " allowed_methods " << allowed_methods << dendl;
1881 std::vector<uint32_t> auth_supported;
1882 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1883 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1884 old_auth_method);
1885 assert(p != auth_supported.end());
1886 p = std::find_first_of(std::next(p), auth_supported.end(),
1887 allowed_methods.begin(), allowed_methods.end());
1888 if (p == auth_supported.end()) {
1889 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1890 << " but i only support " << auth_supported << dendl;
1891 return -EACCES;
1892 }
1893 auth_method = *p;
1894 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1895 return 0;
1896 }
1897
1898 int MonConnection::handle_auth(MAuthReply* m,
1899 const EntityName& entity_name,
1900 uint32_t want_keys,
1901 RotatingKeyRing* keyring)
1902 {
1903 if (state == State::NEGOTIATING) {
1904 int r = _negotiate(m, entity_name, want_keys, keyring);
1905 if (r) {
1906 return r;
1907 }
1908 state = State::AUTHENTICATING;
1909 }
1910 int r = authenticate(m);
1911 if (!r) {
1912 state = State::HAVE_SESSION;
1913 }
1914 return r;
1915 }
1916
1917 int MonConnection::_negotiate(MAuthReply *m,
1918 const EntityName& entity_name,
1919 uint32_t want_keys,
1920 RotatingKeyRing* keyring)
1921 {
1922 if (auth && (int)m->protocol == auth->get_protocol()) {
1923 // good, negotiation completed
1924 auth->reset();
1925 return 0;
1926 }
1927
1928 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1929 if (r == -ENOTSUP) {
1930 if (m->result == -ENOTSUP) {
1931 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1932 << dendl;
1933 }
1934 return m->result;
1935 }
1936 return r;
1937 }
1938
1939 int MonConnection::_init_auth(
1940 uint32_t method,
1941 const EntityName& entity_name,
1942 uint32_t want_keys,
1943 RotatingKeyRing* keyring,
1944 bool msgr2)
1945 {
1946 ldout(cct,10) << __func__ << " method " << method << dendl;
1947 auth.reset(
1948 AuthClientHandler::create(cct, method, keyring));
1949 if (!auth) {
1950 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1951 return -ENOTSUP;
1952 }
1953
1954 // do not request MGR key unless the mon has the SERVER_KRAKEN
1955 // feature. otherwise it will give us an auth error. note that
1956 // we have to use the FEATUREMASK because pre-jewel the kraken
1957 // feature bit was used for something else.
1958 if (!msgr2 &&
1959 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1960 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1961 ldout(cct, 1) << __func__
1962 << " not requesting MGR keys from pre-kraken monitor"
1963 << dendl;
1964 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1965 }
1966 auth->set_want_keys(want_keys);
1967 auth->init(entity_name);
1968 auth->set_global_id(global_id);
1969 return 0;
1970 }
1971
1972 int MonConnection::authenticate(MAuthReply *m)
1973 {
1974 ceph_assert(auth);
1975 if (!m->global_id) {
1976 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1977 }
1978 if (m->global_id != global_id) {
1979 // it's a new session
1980 auth->reset();
1981 global_id = m->global_id;
1982 auth->set_global_id(global_id);
1983 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1984 }
1985 auto p = m->result_bl.cbegin();
1986 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1987 if (ret == -EAGAIN) {
1988 auto ma = new MAuth;
1989 ma->protocol = auth->get_protocol();
1990 auth->prepare_build_request();
1991 auth->build_request(ma->auth_payload);
1992 con->send_message(ma);
1993 }
1994 if (ret == 0 && pending_tell_command) {
1995 con->send_message2(std::move(pending_tell_command));
1996 }
1997
1998 return ret;
1999 }
2000
2001 void MonClient::register_config_callback(md_config_t::config_callback fn) {
2002 ceph_assert(!config_cb);
2003 config_cb = fn;
2004 }
2005
2006 md_config_t::config_callback MonClient::get_config_callback() {
2007 return config_cb;
2008 }