]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
import ceph 15.2.11
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <algorithm>
16 #include <iterator>
17 #include <random>
18 #include <boost/range/adaptor/map.hpp>
19 #include <boost/range/adaptor/filtered.hpp>
20 #include <boost/range/algorithm/copy.hpp>
21 #include <boost/range/algorithm_ext/copy_n.hpp>
22 #include "common/weighted_shuffle.h"
23
24 #include "include/scope_guard.h"
25 #include "include/stringify.h"
26
27 #include "messages/MMonGetMap.h"
28 #include "messages/MMonGetVersion.h"
29 #include "messages/MMonGetVersionReply.h"
30 #include "messages/MMonMap.h"
31 #include "messages/MConfig.h"
32 #include "messages/MGetConfig.h"
33 #include "messages/MAuth.h"
34 #include "messages/MLogAck.h"
35 #include "messages/MAuthReply.h"
36 #include "messages/MMonCommand.h"
37 #include "messages/MMonCommandAck.h"
38 #include "messages/MCommand.h"
39 #include "messages/MCommandReply.h"
40 #include "messages/MPing.h"
41
42 #include "messages/MMonSubscribe.h"
43 #include "messages/MMonSubscribeAck.h"
44 #include "common/errno.h"
45 #include "common/hostname.h"
46 #include "common/LogClient.h"
47
48 #include "MonClient.h"
49 #include "MonMap.h"
50
51 #include "auth/Auth.h"
52 #include "auth/KeyRing.h"
53 #include "auth/AuthClientHandler.h"
54 #include "auth/AuthRegistry.h"
55 #include "auth/RotatingKeyRing.h"
56
57 #define dout_subsys ceph_subsys_monc
58 #undef dout_prefix
59 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
60
61 using std::string;
62
63 MonClient::MonClient(CephContext *cct_) :
64 Dispatcher(cct_),
65 AuthServer(cct_),
66 messenger(NULL),
67 timer(cct_, monc_lock),
68 finisher(cct_),
69 initialized(false),
70 log_client(NULL),
71 more_log_pending(false),
72 want_monmap(true),
73 had_a_connection(false),
74 reopen_interval_multiplier(
75 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
76 last_mon_command_tid(0),
77 version_req_id(0)
78 {}
79
80 MonClient::~MonClient()
81 {
82 }
83
84 int MonClient::build_initial_monmap()
85 {
86 ldout(cct, 10) << __func__ << dendl;
87 int r = monmap.build_initial(cct, false, std::cerr);
88 ldout(cct,10) << "monmap:\n";
89 monmap.print(*_dout);
90 *_dout << dendl;
91 return r;
92 }
93
94 int MonClient::get_monmap()
95 {
96 ldout(cct, 10) << __func__ << dendl;
97 std::unique_lock l(monc_lock);
98
99 sub.want("monmap", 0, 0);
100 if (!_opened())
101 _reopen_session();
102 map_cond.wait(l, [this] { return !want_monmap; });
103 ldout(cct, 10) << __func__ << " done" << dendl;
104 return 0;
105 }
106
107 int MonClient::get_monmap_and_config()
108 {
109 ldout(cct, 10) << __func__ << dendl;
110 ceph_assert(!messenger);
111
112 int tries = 10;
113
114 cct->init_crypto();
115 auto shutdown_crypto = make_scope_guard([this] {
116 cct->shutdown_crypto();
117 });
118
119 int r = build_initial_monmap();
120 if (r < 0) {
121 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
122 return r;
123 }
124
125 messenger = Messenger::create_client_messenger(
126 cct, "temp_mon_client");
127 ceph_assert(messenger);
128 messenger->add_dispatcher_head(this);
129 messenger->start();
130 auto shutdown_msgr = make_scope_guard([this] {
131 messenger->shutdown();
132 messenger->wait();
133 delete messenger;
134 messenger = nullptr;
135 if (!monmap.fsid.is_zero()) {
136 cct->_conf.set_val("fsid", stringify(monmap.fsid));
137 }
138 });
139
140 while (tries-- > 0) {
141 r = init();
142 if (r < 0) {
143 return r;
144 }
145 r = authenticate(cct->_conf->client_mount_timeout);
146 if (r == -ETIMEDOUT) {
147 shutdown();
148 continue;
149 }
150 if (r < 0) {
151 break;
152 }
153 {
154 std::unique_lock l(monc_lock);
155 if (monmap.get_epoch() &&
156 !monmap.persistent_features.contains_all(
157 ceph::features::mon::FEATURE_MIMIC)) {
158 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
159 << dendl;
160 r = 0;
161 break;
162 }
163 while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
164 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
165 auto status = map_cond.wait_for(l, ceph::make_timespan(
166 cct->_conf->mon_client_hunt_interval));
167 if (status == std::cv_status::timeout) {
168 r = -ETIMEDOUT;
169 }
170 }
171 if (got_config) {
172 ldout(cct,10) << __func__ << " success" << dendl;
173 r = 0;
174 break;
175 }
176 }
177 lderr(cct) << __func__ << " failed to get config" << dendl;
178 shutdown();
179 continue;
180 }
181
182 shutdown();
183 return r;
184 }
185
186
187 /**
188 * Ping the monitor with id @p mon_id and set the resulting reply in
189 * the provided @p result_reply, if this last parameter is not NULL.
190 *
191 * So that we don't rely on the MonClient's default messenger, set up
192 * during connect(), we create our own messenger to comunicate with the
193 * specified monitor. This is advantageous in the following ways:
194 *
195 * - Isolate the ping procedure from the rest of the MonClient's operations,
196 * allowing us to not acquire or manage the big monc_lock, thus not
197 * having to block waiting for some other operation to finish before we
198 * can proceed.
199 * * for instance, we can ping mon.FOO even if we are currently hunting
200 * or blocked waiting for auth to complete with mon.BAR.
201 *
202 * - Ping a monitor prior to establishing a connection (using connect())
203 * and properly establish the MonClient's messenger. This frees us
204 * from dealing with the complex foo that happens in connect().
205 *
206 * We also don't rely on MonClient as a dispatcher for this messenger,
207 * unlike what happens with the MonClient's default messenger. This allows
208 * us to sandbox the whole ping, having it much as a separate entity in
209 * the MonClient class, considerably simplifying the handling and dispatching
210 * of messages without needing to consider monc_lock.
211 *
212 * Current drawback is that we will establish a messenger for each ping
213 * we want to issue, instead of keeping a single messenger instance that
214 * would be used for all pings.
215 */
216 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
217 {
218 ldout(cct, 10) << __func__ << dendl;
219
220 string new_mon_id;
221 if (monmap.contains("noname-"+mon_id)) {
222 new_mon_id = "noname-"+mon_id;
223 } else {
224 new_mon_id = mon_id;
225 }
226
227 if (new_mon_id.empty()) {
228 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
229 return -EINVAL;
230 } else if (!monmap.contains(new_mon_id)) {
231 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
232 << dendl;
233 return -ENOENT;
234 }
235
236 // N.B. monc isn't initialized
237
238 auth_registry.refresh_config();
239
240 KeyRing keyring;
241 keyring.from_ceph_context(cct);
242 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
243
244 MonClientPinger *pinger = new MonClientPinger(cct,
245 &rkeyring,
246 result_reply);
247
248 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
249 smsgr->add_dispatcher_head(pinger);
250 smsgr->set_auth_client(pinger);
251 smsgr->start();
252
253 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
254 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
255 << " " << con->get_peer_addr() << dendl;
256
257 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
258 pinger->mc->start(monmap.get_epoch(), entity_name);
259 con->send_message(new MPing);
260
261 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
262 if (ret == 0) {
263 ldout(cct,10) << __func__ << " got ping reply" << dendl;
264 } else {
265 ret = -ret;
266 }
267
268 con->mark_down();
269 pinger->mc.reset();
270 smsgr->shutdown();
271 smsgr->wait();
272 delete smsgr;
273 delete pinger;
274 return ret;
275 }
276
277 bool MonClient::ms_dispatch(Message *m)
278 {
279 // we only care about these message types
280 switch (m->get_type()) {
281 case CEPH_MSG_MON_MAP:
282 case CEPH_MSG_AUTH_REPLY:
283 case CEPH_MSG_MON_SUBSCRIBE_ACK:
284 case CEPH_MSG_MON_GET_VERSION_REPLY:
285 case MSG_MON_COMMAND_ACK:
286 case MSG_COMMAND_REPLY:
287 case MSG_LOGACK:
288 case MSG_CONFIG:
289 break;
290 case CEPH_MSG_PING:
291 m->put();
292 return true;
293 default:
294 return false;
295 }
296
297 std::lock_guard lock(monc_lock);
298
299 if (!m->get_connection()->is_anon() &&
300 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
301 if (_hunting()) {
302 auto p = _find_pending_con(m->get_connection());
303 if (p == pending_cons.end()) {
304 // ignore any messages outside hunting sessions
305 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
306 m->put();
307 return true;
308 }
309 } else if (!active_con || active_con->get_con() != m->get_connection()) {
310 // ignore any messages outside our session(s)
311 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
312 m->put();
313 return true;
314 }
315 }
316
317 switch (m->get_type()) {
318 case CEPH_MSG_MON_MAP:
319 handle_monmap(static_cast<MMonMap*>(m));
320 if (passthrough_monmap) {
321 return false;
322 } else {
323 m->put();
324 }
325 break;
326 case CEPH_MSG_AUTH_REPLY:
327 handle_auth(static_cast<MAuthReply*>(m));
328 break;
329 case CEPH_MSG_MON_SUBSCRIBE_ACK:
330 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
331 break;
332 case CEPH_MSG_MON_GET_VERSION_REPLY:
333 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
334 break;
335 case MSG_MON_COMMAND_ACK:
336 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
337 break;
338 case MSG_COMMAND_REPLY:
339 if (m->get_connection()->is_anon() &&
340 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
341 // this connection is from 'tell'... ignore everything except our command
342 // reply. (we'll get misc other message because we authenticated, but we
343 // don't need them.)
344 handle_command_reply(static_cast<MCommandReply*>(m));
345 return true;
346 }
347 // leave the message for another dispatch handler (e.g., Objecter)
348 return false;
349 case MSG_LOGACK:
350 if (log_client) {
351 log_client->handle_log_ack(static_cast<MLogAck*>(m));
352 m->put();
353 if (more_log_pending) {
354 send_log();
355 }
356 } else {
357 m->put();
358 }
359 break;
360 case MSG_CONFIG:
361 handle_config(static_cast<MConfig*>(m));
362 break;
363 }
364 return true;
365 }
366
367 void MonClient::send_log(bool flush)
368 {
369 if (log_client) {
370 auto lm = log_client->get_mon_log_message(flush);
371 if (lm)
372 _send_mon_message(std::move(lm));
373 more_log_pending = log_client->are_pending();
374 }
375 }
376
377 void MonClient::flush_log()
378 {
379 std::lock_guard l(monc_lock);
380 send_log();
381 }
382
383 /* Unlike all the other message-handling functions, we don't put away a reference
384 * because we want to support MMonMap passthrough to other Dispatchers. */
385 void MonClient::handle_monmap(MMonMap *m)
386 {
387 ldout(cct, 10) << __func__ << " " << *m << dendl;
388 auto con_addrs = m->get_source_addrs();
389 string old_name = monmap.get_name(con_addrs);
390 const auto old_epoch = monmap.get_epoch();
391
392 auto p = m->monmapbl.cbegin();
393 decode(monmap, p);
394
395 ldout(cct, 10) << " got monmap " << monmap.epoch
396 << " from mon." << old_name
397 << " (according to old e" << monmap.get_epoch() << ")"
398 << dendl;
399 ldout(cct, 10) << "dump:\n";
400 monmap.print(*_dout);
401 *_dout << dendl;
402
403 if (old_epoch != monmap.get_epoch()) {
404 tried.clear();
405 }
406 if (old_name.size() == 0) {
407 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
408 _reopen_session();
409 } else {
410 auto new_name = monmap.get_name(con_addrs);
411 if (new_name.empty()) {
412 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
413 << " went away" << dendl;
414 // can't find the mon we were talking to (above)
415 _reopen_session();
416 } else if (messenger->should_use_msgr2() &&
417 monmap.get_addrs(new_name).has_msgr2() &&
418 !con_addrs.has_msgr2()) {
419 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
420 << monmap.get_addrs(new_name) << " but i'm connected to "
421 << con_addrs << ", reconnecting" << dendl;
422 _reopen_session();
423 }
424 }
425
426 cct->set_mon_addrs(monmap);
427
428 sub.got("monmap", monmap.get_epoch());
429 map_cond.notify_all();
430 want_monmap = false;
431
432 if (authenticate_err == 1) {
433 _finish_auth(0);
434 }
435 }
436
437 void MonClient::handle_config(MConfig *m)
438 {
439 ldout(cct,10) << __func__ << " " << *m << dendl;
440 finisher.queue(new LambdaContext([this, m](int r) {
441 cct->_conf.set_mon_vals(cct, m->config, config_cb);
442 if (config_notify_cb) {
443 config_notify_cb();
444 }
445 m->put();
446 }));
447 got_config = true;
448 map_cond.notify_all();
449 }
450
451 // ----------------------
452
453 int MonClient::init()
454 {
455 ldout(cct, 10) << __func__ << dendl;
456
457 entity_name = cct->_conf->name;
458
459 auth_registry.refresh_config();
460
461 std::lock_guard l(monc_lock);
462 keyring.reset(new KeyRing);
463 if (auth_registry.is_supported_method(messenger->get_mytype(),
464 CEPH_AUTH_CEPHX)) {
465 // this should succeed, because auth_registry just checked!
466 int r = keyring->from_ceph_context(cct);
467 if (r != 0) {
468 // but be somewhat graceful in case there was a race condition
469 lderr(cct) << "keyring not found" << dendl;
470 return r;
471 }
472 }
473 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
474 return -ENOENT;
475 }
476
477 rotating_secrets.reset(
478 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
479
480 initialized = true;
481
482 messenger->set_auth_client(this);
483 messenger->add_dispatcher_head(this);
484
485 timer.init();
486 finisher.start();
487 schedule_tick();
488
489 return 0;
490 }
491
492 void MonClient::shutdown()
493 {
494 ldout(cct, 10) << __func__ << dendl;
495 monc_lock.lock();
496 stopping = true;
497 while (!version_requests.empty()) {
498 version_requests.begin()->second->context->complete(-ECANCELED);
499 ldout(cct, 20) << __func__ << " canceling and discarding version request "
500 << version_requests.begin()->second << dendl;
501 delete version_requests.begin()->second;
502 version_requests.erase(version_requests.begin());
503 }
504 while (!mon_commands.empty()) {
505 auto tid = mon_commands.begin()->first;
506 _cancel_mon_command(tid);
507 }
508 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
509 << " pending message(s)" << dendl;
510 waiting_for_session.clear();
511
512 active_con.reset();
513 pending_cons.clear();
514
515 auth.reset();
516 global_id = 0;
517 authenticate_err = 0;
518 authenticated = false;
519
520 monc_lock.unlock();
521
522 if (initialized) {
523 finisher.wait_for_empty();
524 finisher.stop();
525 initialized = false;
526 }
527 monc_lock.lock();
528 timer.shutdown();
529 stopping = false;
530 monc_lock.unlock();
531 }
532
533 int MonClient::authenticate(double timeout)
534 {
535 std::unique_lock lock{monc_lock};
536
537 if (active_con) {
538 ldout(cct, 5) << "already authenticated" << dendl;
539 return 0;
540 }
541 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
542 sub.want("config", 0, 0);
543 if (!_opened())
544 _reopen_session();
545
546 auto until = ceph::real_clock::now();
547 until += ceph::make_timespan(timeout);
548 if (timeout > 0.0)
549 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
550 authenticate_err = 1; // == in progress
551 while (!active_con && authenticate_err >= 0) {
552 if (timeout > 0.0) {
553 auto r = auth_cond.wait_until(lock, until);
554 if (r == cv_status::timeout && !active_con) {
555 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
556 authenticate_err = -ETIMEDOUT;
557 }
558 } else {
559 auth_cond.wait(lock);
560 }
561 }
562
563 if (active_con) {
564 ldout(cct, 5) << __func__ << " success, global_id "
565 << active_con->get_global_id() << dendl;
566 // active_con should not have been set if there was an error
567 ceph_assert(authenticate_err >= 0);
568 authenticated = true;
569 }
570
571 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
572 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
573 }
574
575 return authenticate_err;
576 }
577
578 void MonClient::handle_auth(MAuthReply *m)
579 {
580 ceph_assert(ceph_mutex_is_locked(monc_lock));
581
582 if (m->get_connection()->is_anon()) {
583 // anon connection, used for mon tell commands
584 for (auto& p : mon_commands) {
585 if (p.second->target_con == m->get_connection()) {
586 auto& mc = p.second->target_session;
587 int ret = mc->handle_auth(m, entity_name,
588 CEPH_ENTITY_TYPE_MON,
589 rotating_secrets.get());
590 (void)ret; // we don't care
591 break;
592 }
593 }
594 m->put();
595 return;
596 }
597
598 if (!_hunting()) {
599 std::swap(active_con->get_auth(), auth);
600 int ret = active_con->authenticate(m);
601 m->put();
602 std::swap(auth, active_con->get_auth());
603 if (global_id != active_con->get_global_id()) {
604 lderr(cct) << __func__ << " peer assigned me a different global_id: "
605 << active_con->get_global_id() << dendl;
606 }
607 if (ret != -EAGAIN) {
608 _finish_auth(ret);
609 }
610 return;
611 }
612
613 // hunting
614 auto found = _find_pending_con(m->get_connection());
615 ceph_assert(found != pending_cons.end());
616 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
617 rotating_secrets.get());
618 m->put();
619 if (auth_err == -EAGAIN) {
620 return;
621 }
622 if (auth_err) {
623 pending_cons.erase(found);
624 if (!pending_cons.empty()) {
625 // keep trying with pending connections
626 return;
627 }
628 // the last try just failed, give up.
629 } else {
630 auto& mc = found->second;
631 ceph_assert(mc.have_session());
632 active_con.reset(new MonConnection(std::move(mc)));
633 pending_cons.clear();
634 }
635
636 _finish_hunting(auth_err);
637 _finish_auth(auth_err);
638 }
639
640 void MonClient::_finish_auth(int auth_err)
641 {
642 ldout(cct,10) << __func__ << " " << auth_err << dendl;
643 authenticate_err = auth_err;
644 // _resend_mon_commands() could _reopen_session() if the connected mon is not
645 // the one the MonCommand is targeting.
646 if (!auth_err && active_con) {
647 ceph_assert(auth);
648 _check_auth_tickets();
649 }
650 auth_cond.notify_all();
651
652 if (!auth_err) {
653 Context *cb = nullptr;
654 if (session_established_context) {
655 cb = session_established_context.release();
656 }
657 if (cb) {
658 monc_lock.unlock();
659 cb->complete(0);
660 monc_lock.lock();
661 }
662 }
663 }
664
665 // ---------
666
667 void MonClient::send_mon_message(MessageRef m)
668 {
669 std::lock_guard l{monc_lock};
670 _send_mon_message(std::move(m));
671 }
672
673 void MonClient::_send_mon_message(MessageRef m)
674 {
675 ceph_assert(ceph_mutex_is_locked(monc_lock));
676 if (active_con) {
677 auto cur_con = active_con->get_con();
678 ldout(cct, 10) << "_send_mon_message to mon."
679 << monmap.get_name(cur_con->get_peer_addr())
680 << " at " << cur_con->get_peer_addr() << dendl;
681 cur_con->send_message2(std::move(m));
682 } else {
683 waiting_for_session.push_back(std::move(m));
684 }
685 }
686
687 void MonClient::_reopen_session(int rank)
688 {
689 ceph_assert(ceph_mutex_is_locked(monc_lock));
690 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
691
692 active_con.reset();
693 pending_cons.clear();
694
695 _start_hunting();
696
697 if (rank >= 0) {
698 _add_conn(rank);
699 } else {
700 _add_conns();
701 }
702
703 // throw out old queued messages
704 waiting_for_session.clear();
705
706 // throw out version check requests
707 while (!version_requests.empty()) {
708 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
709 delete version_requests.begin()->second;
710 version_requests.erase(version_requests.begin());
711 }
712
713 for (auto& c : pending_cons) {
714 c.second.start(monmap.get_epoch(), entity_name);
715 }
716
717 if (sub.reload()) {
718 _renew_subs();
719 }
720 }
721
722 MonConnection& MonClient::_add_conn(unsigned rank)
723 {
724 auto peer = monmap.get_addrs(rank);
725 auto conn = messenger->connect_to_mon(peer);
726 MonConnection mc(cct, conn, global_id, &auth_registry);
727 if (auth) {
728 mc.get_auth().reset(auth->clone());
729 }
730 auto inserted = pending_cons.insert(std::make_pair(peer, std::move(mc)));
731 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
732 << " con " << conn
733 << " addr " << peer
734 << dendl;
735 return inserted.first->second;
736 }
737
738 void MonClient::_add_conns()
739 {
740 // collect the next batch of candidates who are listed right next to the ones
741 // already tried
742 auto get_next_batch = [this]() -> std::vector<unsigned> {
743 std::multimap<uint16_t, unsigned> ranks_by_priority;
744 boost::copy(
745 monmap.mon_info | boost::adaptors::filtered(
746 [this](auto& info) {
747 auto rank = monmap.get_rank(info.first);
748 return tried.count(rank) == 0;
749 }) | boost::adaptors::transformed(
750 [this](auto& info) {
751 auto rank = monmap.get_rank(info.first);
752 return std::make_pair(info.second.priority, rank);
753 }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
754 if (ranks_by_priority.empty()) {
755 return {};
756 }
757 // only choose the monitors with lowest priority
758 auto cands = boost::make_iterator_range(
759 ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
760 std::vector<unsigned> ranks;
761 boost::range::copy(cands | boost::adaptors::map_values,
762 std::back_inserter(ranks));
763 return ranks;
764 };
765 auto ranks = get_next_batch();
766 if (ranks.empty()) {
767 tried.clear(); // start over
768 ranks = get_next_batch();
769 }
770 ceph_assert(!ranks.empty());
771 if (ranks.size() > 1) {
772 std::vector<uint16_t> weights;
773 for (auto i : ranks) {
774 auto rank_name = monmap.get_name(i);
775 weights.push_back(monmap.get_weight(rank_name));
776 }
777 std::random_device rd;
778 if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
779 std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
780 } else {
781 weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
782 std::mt19937{rd()});
783 }
784 }
785 ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
786 unsigned n = cct->_conf->mon_client_hunt_parallel;
787 if (n == 0 || n > ranks.size()) {
788 n = ranks.size();
789 }
790 for (unsigned i = 0; i < n; i++) {
791 _add_conn(ranks[i]);
792 tried.insert(ranks[i]);
793 }
794 }
795
796 bool MonClient::ms_handle_reset(Connection *con)
797 {
798 std::lock_guard lock(monc_lock);
799
800 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
801 return false;
802
803 if (con->is_anon()) {
804 auto p = mon_commands.begin();
805 while (p != mon_commands.end()) {
806 auto cmd = p->second;
807 ++p;
808 if (cmd->target_con == con) {
809 _send_command(cmd); // may retry or fail
810 break;
811 }
812 }
813 return true;
814 }
815
816 if (_hunting()) {
817 if (pending_cons.count(con->get_peer_addrs())) {
818 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
819 << dendl;
820 } else {
821 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
822 << dendl;
823 }
824 return true;
825 } else {
826 if (active_con && con == active_con->get_con()) {
827 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
828 << dendl;
829 _reopen_session();
830 return false;
831 } else {
832 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
833 << dendl;
834 return true;
835 }
836 }
837 }
838
839 bool MonClient::_opened() const
840 {
841 ceph_assert(ceph_mutex_is_locked(monc_lock));
842 return active_con || _hunting();
843 }
844
845 bool MonClient::_hunting() const
846 {
847 return !pending_cons.empty();
848 }
849
850 void MonClient::_start_hunting()
851 {
852 ceph_assert(!_hunting());
853 // adjust timeouts if necessary
854 if (!had_a_connection)
855 return;
856 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
857 if (reopen_interval_multiplier >
858 cct->_conf->mon_client_hunt_interval_max_multiple) {
859 reopen_interval_multiplier =
860 cct->_conf->mon_client_hunt_interval_max_multiple;
861 }
862 }
863
864 void MonClient::_finish_hunting(int auth_err)
865 {
866 ldout(cct,10) << __func__ << " " << auth_err << dendl;
867 ceph_assert(ceph_mutex_is_locked(monc_lock));
868 // the pending conns have been cleaned.
869 ceph_assert(!_hunting());
870 if (active_con) {
871 auto con = active_con->get_con();
872 ldout(cct, 1) << "found mon."
873 << monmap.get_name(con->get_peer_addr())
874 << dendl;
875 } else {
876 ldout(cct, 1) << "no mon sessions established" << dendl;
877 }
878
879 had_a_connection = true;
880 _un_backoff();
881
882 if (!auth_err) {
883 last_rotating_renew_sent = utime_t();
884 while (!waiting_for_session.empty()) {
885 _send_mon_message(std::move(waiting_for_session.front()));
886 waiting_for_session.pop_front();
887 }
888 _resend_mon_commands();
889 send_log(true);
890 if (active_con) {
891 auth = std::move(active_con->get_auth());
892 if (global_id && global_id != active_con->get_global_id()) {
893 lderr(cct) << __func__ << " global_id changed from " << global_id
894 << " to " << active_con->get_global_id() << dendl;
895 }
896 global_id = active_con->get_global_id();
897 }
898 }
899 }
900
901 void MonClient::tick()
902 {
903 ldout(cct, 10) << __func__ << dendl;
904
905 utime_t now = ceph_clock_now();
906
907 auto reschedule_tick = make_scope_guard([this] {
908 schedule_tick();
909 });
910
911 _check_auth_tickets();
912 _check_tell_commands();
913
914 if (_hunting()) {
915 ldout(cct, 1) << "continuing hunt" << dendl;
916 return _reopen_session();
917 } else if (active_con) {
918 // just renew as needed
919 auto cur_con = active_con->get_con();
920 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
921 const bool maybe_renew = sub.need_renew();
922 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
923 << dendl;
924 if (maybe_renew) {
925 _renew_subs();
926 }
927 }
928
929 if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
930 cur_con->send_keepalive();
931 last_keepalive = now;
932
933 if (cct->_conf->mon_client_ping_timeout > 0 &&
934 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
935 utime_t lk = cur_con->get_last_keepalive_ack();
936 utime_t interval = now - lk;
937 if (interval > cct->_conf->mon_client_ping_timeout) {
938 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
939 << " seconds), reconnecting" << dendl;
940 return _reopen_session();
941 }
942 }
943
944 _un_backoff();
945 }
946
947 if (now > last_send_log + cct->_conf->mon_client_log_interval) {
948 send_log();
949 last_send_log = now;
950 }
951 }
952 }
953
954 void MonClient::_un_backoff()
955 {
956 // un-backoff our reconnect interval
957 reopen_interval_multiplier = std::max(
958 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
959 reopen_interval_multiplier /
960 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
961 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
962 << reopen_interval_multiplier << dendl;
963 }
964
965 void MonClient::schedule_tick()
966 {
967 auto do_tick = make_lambda_context([this](int) { tick(); });
968 if (!is_connected()) {
969 // start another round of hunting
970 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
971 reopen_interval_multiplier);
972 timer.add_event_after(hunt_interval, do_tick);
973 } else {
974 // keep in touch
975 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
976 cct->_conf->mon_client_log_interval),
977 do_tick);
978 }
979 }
980
981 // ---------
982
983 void MonClient::_renew_subs()
984 {
985 ceph_assert(ceph_mutex_is_locked(monc_lock));
986 if (!sub.have_new()) {
987 ldout(cct, 10) << __func__ << " - empty" << dendl;
988 return;
989 }
990
991 ldout(cct, 10) << __func__ << dendl;
992 if (!_opened())
993 _reopen_session();
994 else {
995 auto m = ceph::make_message<MMonSubscribe>();
996 m->what = sub.get_subs();
997 m->hostname = ceph_get_short_hostname();
998 _send_mon_message(std::move(m));
999 sub.renewed();
1000 }
1001 }
1002
1003 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1004 {
1005 sub.acked(m->interval);
1006 m->put();
1007 }
1008
1009 int MonClient::_check_auth_tickets()
1010 {
1011 ceph_assert(ceph_mutex_is_locked(monc_lock));
1012 if (active_con && auth) {
1013 if (auth->need_tickets()) {
1014 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
1015 auto m = ceph::make_message<MAuth>();
1016 m->protocol = auth->get_protocol();
1017 auth->prepare_build_request();
1018 auth->build_request(m->auth_payload);
1019 _send_mon_message(m);
1020 }
1021
1022 _check_auth_rotating();
1023 }
1024 return 0;
1025 }
1026
1027 int MonClient::_check_auth_rotating()
1028 {
1029 ceph_assert(ceph_mutex_is_locked(monc_lock));
1030 if (!rotating_secrets ||
1031 !auth_principal_needs_rotating_keys(entity_name)) {
1032 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
1033 return 0;
1034 }
1035
1036 if (!active_con || !auth) {
1037 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
1038 return 0;
1039 }
1040
1041 utime_t now = ceph_clock_now();
1042 utime_t cutoff = now;
1043 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1044 utime_t issued_at_lower_bound = now;
1045 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
1046 if (!rotating_secrets->need_new_secrets(cutoff)) {
1047 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
1048 rotating_secrets->dump_rotating();
1049 return 0;
1050 }
1051
1052 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
1053 if (!rotating_secrets->need_new_secrets() &&
1054 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
1055 // the key has expired before it has been issued?
1056 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
1057 << " (before " << issued_at_lower_bound << ")" << dendl;
1058 }
1059 if ((now > last_rotating_renew_sent) &&
1060 double(now - last_rotating_renew_sent) < 1) {
1061 ldout(cct, 10) << __func__ << " called too often (last: "
1062 << last_rotating_renew_sent << "), skipping refresh" << dendl;
1063 return 0;
1064 }
1065 auto m = ceph::make_message<MAuth>();
1066 m->protocol = auth->get_protocol();
1067 if (auth->build_rotating_request(m->auth_payload)) {
1068 last_rotating_renew_sent = now;
1069 _send_mon_message(std::move(m));
1070 }
1071 return 0;
1072 }
1073
1074 int MonClient::wait_auth_rotating(double timeout)
1075 {
1076 std::unique_lock l(monc_lock);
1077
1078 // Must be initialized
1079 ceph_assert(auth != nullptr);
1080
1081 if (auth->get_protocol() == CEPH_AUTH_NONE)
1082 return 0;
1083
1084 if (!rotating_secrets)
1085 return 0;
1086
1087 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
1088 utime_t now = ceph_clock_now();
1089 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [now, this] {
1090 return (!auth_principal_needs_rotating_keys(entity_name) ||
1091 !rotating_secrets->need_new_secrets(now));
1092 })) {
1093 ldout(cct, 10) << __func__ << " done" << dendl;
1094 return 0;
1095 } else {
1096 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1097 return -ETIMEDOUT;
1098 }
1099 }
1100
1101 // ---------
1102
1103 void MonClient::_send_command(MonCommand *r)
1104 {
1105 if (r->is_tell()) {
1106 ++r->send_attempts;
1107 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1108 _finish_command(r, -ENXIO, "mon unavailable");
1109 return;
1110 }
1111
1112 // tell-style command
1113 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1114 if (r->target_con) {
1115 r->target_con->mark_down();
1116 }
1117 if (r->target_rank >= 0) {
1118 if (r->target_rank >= (int)monmap.size()) {
1119 ldout(cct, 10) << " target " << r->target_rank
1120 << " >= max mon " << monmap.size() << dendl;
1121 _finish_command(r, -ENOENT, "mon rank dne");
1122 return;
1123 }
1124 r->target_con = messenger->connect_to_mon(
1125 monmap.get_addrs(r->target_rank), true /* anon */);
1126 } else {
1127 if (!monmap.contains(r->target_name)) {
1128 ldout(cct, 10) << " target " << r->target_name
1129 << " not present in monmap" << dendl;
1130 _finish_command(r, -ENOENT, "mon dne");
1131 return;
1132 }
1133 r->target_con = messenger->connect_to_mon(
1134 monmap.get_addrs(r->target_name), true /* anon */);
1135 }
1136
1137 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1138 &auth_registry));
1139 r->target_session->start(monmap.get_epoch(), entity_name);
1140 r->last_send_attempt = ceph_clock_now();
1141
1142 MCommand *m = new MCommand(monmap.fsid);
1143 m->set_tid(r->tid);
1144 m->cmd = r->cmd;
1145 m->set_data(r->inbl);
1146 r->target_session->queue_command(m);
1147 return;
1148 }
1149
1150 // ugly legacy handling of pre-octopus mons
1151 entity_addr_t peer;
1152 if (active_con) {
1153 peer = active_con->get_con()->get_peer_addr();
1154 }
1155
1156 if (r->target_rank >= 0 &&
1157 r->target_rank != monmap.get_rank(peer)) {
1158 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1159 << " wants rank " << r->target_rank
1160 << ", reopening session"
1161 << dendl;
1162 if (r->target_rank >= (int)monmap.size()) {
1163 ldout(cct, 10) << " target " << r->target_rank
1164 << " >= max mon " << monmap.size() << dendl;
1165 _finish_command(r, -ENOENT, "mon rank dne");
1166 return;
1167 }
1168 _reopen_session(r->target_rank);
1169 return;
1170 }
1171 if (r->target_name.length() &&
1172 r->target_name != monmap.get_name(peer)) {
1173 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1174 << " wants mon " << r->target_name
1175 << ", reopening session"
1176 << dendl;
1177 if (!monmap.contains(r->target_name)) {
1178 ldout(cct, 10) << " target " << r->target_name
1179 << " not present in monmap" << dendl;
1180 _finish_command(r, -ENOENT, "mon dne");
1181 return;
1182 }
1183 _reopen_session(monmap.get_rank(r->target_name));
1184 return;
1185 }
1186 // fall-thru to send 'normal' CLI command
1187 }
1188
1189 // normal CLI command
1190 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1191 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
1192 m->set_tid(r->tid);
1193 m->cmd = r->cmd;
1194 m->set_data(r->inbl);
1195 _send_mon_message(std::move(m));
1196 return;
1197 }
1198
1199 void MonClient::_check_tell_commands()
1200 {
1201 // resend any requests
1202 auto now = ceph_clock_now();
1203 auto p = mon_commands.begin();
1204 while (p != mon_commands.end()) {
1205 auto cmd = p->second;
1206 ++p;
1207 if (cmd->is_tell() &&
1208 cmd->last_send_attempt != utime_t() &&
1209 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1210 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1211 _send_command(cmd); // might remove cmd from mon_commands
1212 }
1213 }
1214 }
1215
1216 void MonClient::_resend_mon_commands()
1217 {
1218 // resend any requests
1219 auto p = mon_commands.begin();
1220 while (p != mon_commands.end()) {
1221 auto cmd = p->second;
1222 ++p;
1223 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1224 // starting with octopus, tell commands use their own connetion and need no
1225 // special resend when we finish hunting.
1226 } else {
1227 _send_command(cmd); // might remove cmd from mon_commands
1228 }
1229 }
1230 }
1231
1232 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1233 {
1234 MonCommand *r = NULL;
1235 uint64_t tid = ack->get_tid();
1236
1237 if (tid == 0 && !mon_commands.empty()) {
1238 r = mon_commands.begin()->second;
1239 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1240 } else {
1241 auto p = mon_commands.find(tid);
1242 if (p == mon_commands.end()) {
1243 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1244 ack->put();
1245 return;
1246 }
1247 r = p->second;
1248 }
1249
1250 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1251 if (r->poutbl)
1252 r->poutbl->claim(ack->get_data());
1253 _finish_command(r, ack->r, ack->rs);
1254 ack->put();
1255 }
1256
1257 void MonClient::handle_command_reply(MCommandReply *reply)
1258 {
1259 MonCommand *r = NULL;
1260 uint64_t tid = reply->get_tid();
1261
1262 if (tid == 0 && !mon_commands.empty()) {
1263 r = mon_commands.begin()->second;
1264 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1265 << dendl;
1266 } else {
1267 auto p = mon_commands.find(tid);
1268 if (p == mon_commands.end()) {
1269 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1270 << dendl;
1271 reply->put();
1272 return;
1273 }
1274 r = p->second;
1275 }
1276
1277 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1278 if (r->poutbl)
1279 r->poutbl->claim(reply->get_data());
1280 _finish_command(r, reply->r, reply->rs);
1281 reply->put();
1282 }
1283
1284 int MonClient::_cancel_mon_command(uint64_t tid)
1285 {
1286 ceph_assert(ceph_mutex_is_locked(monc_lock));
1287
1288 auto it = mon_commands.find(tid);
1289 if (it == mon_commands.end()) {
1290 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1291 return -ENOENT;
1292 }
1293
1294 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1295
1296 MonCommand *cmd = it->second;
1297 _finish_command(cmd, -ETIMEDOUT, "");
1298 return 0;
1299 }
1300
1301 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1302 {
1303 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1304 if (r->prval)
1305 *(r->prval) = ret;
1306 if (r->prs)
1307 *(r->prs) = rs;
1308 if (r->onfinish)
1309 finisher.queue(r->onfinish, ret);
1310 if (r->target_con) {
1311 r->target_con->mark_down();
1312 }
1313 mon_commands.erase(r->tid);
1314 delete r;
1315 }
1316
1317 void MonClient::start_mon_command(const std::vector<string>& cmd,
1318 const ceph::buffer::list& inbl,
1319 ceph::buffer::list *outbl, string *outs,
1320 Context *onfinish)
1321 {
1322 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
1323 std::lock_guard l(monc_lock);
1324 if (!initialized || stopping) {
1325 if (onfinish) {
1326 onfinish->complete(-ECANCELED);
1327 }
1328 return;
1329 }
1330 MonCommand *r = new MonCommand(++last_mon_command_tid);
1331 r->cmd = cmd;
1332 r->inbl = inbl;
1333 r->poutbl = outbl;
1334 r->prs = outs;
1335 r->onfinish = onfinish;
1336 auto timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
1337 if (timeout.count() > 0) {
1338 class C_CancelMonCommand : public Context
1339 {
1340 uint64_t tid;
1341 MonClient *monc;
1342 public:
1343 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1344 void finish(int r) override {
1345 monc->_cancel_mon_command(tid);
1346 }
1347 };
1348 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1349 timer.add_event_after(static_cast<double>(timeout.count()), r->ontimeout);
1350 }
1351 mon_commands[r->tid] = r;
1352 _send_command(r);
1353 }
1354
1355 void MonClient::start_mon_command(const string &mon_name,
1356 const std::vector<string>& cmd,
1357 const ceph::buffer::list& inbl,
1358 ceph::buffer::list *outbl, string *outs,
1359 Context *onfinish)
1360 {
1361 ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
1362 std::lock_guard l(monc_lock);
1363 if (!initialized || stopping) {
1364 if (onfinish) {
1365 onfinish->complete(-ECANCELED);
1366 }
1367 return;
1368 }
1369 MonCommand *r = new MonCommand(++last_mon_command_tid);
1370
1371 // detect/tolerate mon *rank* passed as a string
1372 string err;
1373 int rank = strict_strtoll(mon_name.c_str(), 10, &err);
1374 if (err.size() == 0 && rank >= 0) {
1375 ldout(cct,10) << __func__ << " interpreting name '" << mon_name
1376 << "' as rank " << rank << dendl;
1377 r->target_rank = rank;
1378 } else {
1379 r->target_name = mon_name;
1380 }
1381 r->cmd = cmd;
1382 r->inbl = inbl;
1383 r->poutbl = outbl;
1384 r->prs = outs;
1385 r->onfinish = onfinish;
1386 mon_commands[r->tid] = r;
1387 _send_command(r);
1388 }
1389
1390 void MonClient::start_mon_command(int rank,
1391 const std::vector<string>& cmd,
1392 const ceph::buffer::list& inbl,
1393 ceph::buffer::list *outbl, string *outs,
1394 Context *onfinish)
1395 {
1396 ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
1397 std::lock_guard l(monc_lock);
1398 if (!initialized || stopping) {
1399 if (onfinish) {
1400 onfinish->complete(-ECANCELED);
1401 }
1402 return;
1403 }
1404 MonCommand *r = new MonCommand(++last_mon_command_tid);
1405 r->target_rank = rank;
1406 r->cmd = cmd;
1407 r->inbl = inbl;
1408 r->poutbl = outbl;
1409 r->prs = outs;
1410 r->onfinish = onfinish;
1411 mon_commands[r->tid] = r;
1412 _send_command(r);
1413 }
1414
1415 // ---------
1416
1417 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1418 {
1419 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1420 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1421 std::lock_guard l(monc_lock);
1422 auto m = ceph::make_message<MMonGetVersion>();
1423 m->what = map;
1424 m->handle = ++version_req_id;
1425 version_requests[m->handle] = req;
1426 _send_mon_message(std::move(m));
1427 }
1428
1429 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1430 {
1431 ceph_assert(ceph_mutex_is_locked(monc_lock));
1432 auto iter = version_requests.find(m->handle);
1433 if (iter == version_requests.end()) {
1434 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1435 << " not found" << dendl;
1436 } else {
1437 version_req_d *req = iter->second;
1438 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1439 version_requests.erase(iter);
1440 if (req->newest)
1441 *req->newest = m->version;
1442 if (req->oldest)
1443 *req->oldest = m->oldest_version;
1444 finisher.queue(req->context, 0);
1445 delete req;
1446 }
1447 m->put();
1448 }
1449
1450 int MonClient::get_auth_request(
1451 Connection *con,
1452 AuthConnectionMeta *auth_meta,
1453 uint32_t *auth_method,
1454 std::vector<uint32_t> *preferred_modes,
1455 ceph::buffer::list *bl)
1456 {
1457 std::lock_guard l(monc_lock);
1458 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1459 << dendl;
1460
1461 // connection to mon?
1462 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1463 ceph_assert(!auth_meta->authorizer);
1464 if (con->is_anon()) {
1465 for (auto& i : mon_commands) {
1466 if (i.second->target_con == con) {
1467 return i.second->target_session->get_auth_request(
1468 auth_method, preferred_modes, bl,
1469 entity_name, want_keys, rotating_secrets.get());
1470 }
1471 }
1472 }
1473 for (auto& i : pending_cons) {
1474 if (i.second.is_con(con)) {
1475 return i.second.get_auth_request(
1476 auth_method, preferred_modes, bl,
1477 entity_name, want_keys, rotating_secrets.get());
1478 }
1479 }
1480 return -ENOENT;
1481 }
1482
1483 // generate authorizer
1484 if (!auth) {
1485 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1486 return -EACCES;
1487 }
1488 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1489 if (!auth_meta->authorizer) {
1490 lderr(cct) << __func__ << " failed to build_authorizer for type "
1491 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1492 return -EACCES;
1493 }
1494 auth_meta->auth_method = auth_meta->authorizer->protocol;
1495 auth_registry.get_supported_modes(con->get_peer_type(),
1496 auth_meta->auth_method,
1497 preferred_modes);
1498 *bl = auth_meta->authorizer->bl;
1499 return 0;
1500 }
1501
1502 int MonClient::handle_auth_reply_more(
1503 Connection *con,
1504 AuthConnectionMeta *auth_meta,
1505 const ceph::buffer::list& bl,
1506 ceph::buffer::list *reply)
1507 {
1508 std::lock_guard l(monc_lock);
1509
1510 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1511 if (con->is_anon()) {
1512 for (auto& i : mon_commands) {
1513 if (i.second->target_con == con) {
1514 return i.second->target_session->handle_auth_reply_more(
1515 auth_meta, bl, reply);
1516 }
1517 }
1518 }
1519 for (auto& i : pending_cons) {
1520 if (i.second.is_con(con)) {
1521 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1522 }
1523 }
1524 return -ENOENT;
1525 }
1526
1527 // authorizer challenges
1528 if (!auth || !auth_meta->authorizer) {
1529 lderr(cct) << __func__ << " no authorizer?" << dendl;
1530 return -1;
1531 }
1532 auth_meta->authorizer->add_challenge(cct, bl);
1533 *reply = auth_meta->authorizer->bl;
1534 return 0;
1535 }
1536
1537 int MonClient::handle_auth_done(
1538 Connection *con,
1539 AuthConnectionMeta *auth_meta,
1540 uint64_t global_id,
1541 uint32_t con_mode,
1542 const ceph::buffer::list& bl,
1543 CryptoKey *session_key,
1544 std::string *connection_secret)
1545 {
1546 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1547 std::lock_guard l(monc_lock);
1548 if (con->is_anon()) {
1549 for (auto& i : mon_commands) {
1550 if (i.second->target_con == con) {
1551 return i.second->target_session->handle_auth_done(
1552 auth_meta, global_id, bl,
1553 session_key, connection_secret);
1554 }
1555 }
1556 }
1557 for (auto& i : pending_cons) {
1558 if (i.second.is_con(con)) {
1559 int r = i.second.handle_auth_done(
1560 auth_meta, global_id, bl,
1561 session_key, connection_secret);
1562 if (r) {
1563 pending_cons.erase(i.first);
1564 if (!pending_cons.empty()) {
1565 return r;
1566 }
1567 } else {
1568 active_con.reset(new MonConnection(std::move(i.second)));
1569 pending_cons.clear();
1570 ceph_assert(active_con->have_session());
1571 }
1572
1573 _finish_hunting(r);
1574 if (r || monmap.get_epoch() > 0) {
1575 _finish_auth(r);
1576 }
1577 return r;
1578 }
1579 }
1580 return -ENOENT;
1581 } else {
1582 // verify authorizer reply
1583 auto p = bl.begin();
1584 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1585 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1586 << dendl;
1587 return -EACCES;
1588 }
1589 auth_meta->session_key = auth_meta->authorizer->session_key;
1590 return 0;
1591 }
1592 }
1593
1594 int MonClient::handle_auth_bad_method(
1595 Connection *con,
1596 AuthConnectionMeta *auth_meta,
1597 uint32_t old_auth_method,
1598 int result,
1599 const std::vector<uint32_t>& allowed_methods,
1600 const std::vector<uint32_t>& allowed_modes)
1601 {
1602 auth_meta->allowed_methods = allowed_methods;
1603
1604 std::lock_guard l(monc_lock);
1605 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1606 if (con->is_anon()) {
1607 for (auto& i : mon_commands) {
1608 if (i.second->target_con == con) {
1609 int r = i.second->target_session->handle_auth_bad_method(
1610 old_auth_method,
1611 result,
1612 allowed_methods,
1613 allowed_modes);
1614 if (r < 0) {
1615 _finish_command(i.second, r, "auth failed");
1616 }
1617 return r;
1618 }
1619 }
1620 }
1621 for (auto& i : pending_cons) {
1622 if (i.second.is_con(con)) {
1623 int r = i.second.handle_auth_bad_method(old_auth_method,
1624 result,
1625 allowed_methods,
1626 allowed_modes);
1627 if (r == 0) {
1628 return r; // try another method on this con
1629 }
1630 pending_cons.erase(i.first);
1631 if (!pending_cons.empty()) {
1632 return r; // fail this con, maybe another con will succeed
1633 }
1634 // fail hunt
1635 _finish_hunting(r);
1636 _finish_auth(r);
1637 return r;
1638 }
1639 }
1640 return -ENOENT;
1641 } else {
1642 // huh...
1643 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1644 << " result " << cpp_strerror(result)
1645 << " and auth is " << (auth ? auth->get_protocol() : 0)
1646 << dendl;
1647 return -EACCES;
1648 }
1649 }
1650
1651 int MonClient::handle_auth_request(
1652 Connection *con,
1653 AuthConnectionMeta *auth_meta,
1654 bool more,
1655 uint32_t auth_method,
1656 const ceph::buffer::list& payload,
1657 ceph::buffer::list *reply)
1658 {
1659 if (payload.length() == 0) {
1660 // for some channels prior to nautilus (osd heartbeat), we
1661 // tolerate the lack of an authorizer.
1662 if (!con->get_messenger()->require_authorizer) {
1663 handle_authentication_dispatcher->ms_handle_authentication(con);
1664 return 1;
1665 }
1666 return -EACCES;
1667 }
1668 auth_meta->auth_mode = payload[0];
1669 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1670 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1671 return -EACCES;
1672 }
1673 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1674 auth_method);
1675 if (!ah) {
1676 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1677 << auth_method << dendl;
1678 return -EOPNOTSUPP;
1679 }
1680
1681 auto ac = &auth_meta->authorizer_challenge;
1682 if (auth_meta->skip_authorizer_challenge) {
1683 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
1684 ac = nullptr;
1685 }
1686
1687 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1688 bool isvalid = ah->verify_authorizer(
1689 cct,
1690 *rotating_secrets,
1691 payload,
1692 auth_meta->get_connection_secret_length(),
1693 reply,
1694 &con->peer_name,
1695 &con->peer_global_id,
1696 &con->peer_caps_info,
1697 &auth_meta->session_key,
1698 &auth_meta->connection_secret,
1699 ac);
1700 if (isvalid) {
1701 handle_authentication_dispatcher->ms_handle_authentication(con);
1702 return 1;
1703 }
1704 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1705 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1706 return 0;
1707 }
1708 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1709 // discard old challenge
1710 auth_meta->authorizer_challenge.reset();
1711 return -EACCES;
1712 }
1713
1714 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1715 std::lock_guard l(monc_lock);
1716 if (auth) {
1717 return auth->build_authorizer(service_id);
1718 } else {
1719 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1720 << ", but no auth is available now" << dendl;
1721 return nullptr;
1722 }
1723 }
1724
// Log prefix for the code below this point: it calls have_session() where
// the prefix defined at the top of the file calls _hunting().
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1728
1729 MonConnection::MonConnection(
1730 CephContext *cct, ConnectionRef con, uint64_t global_id,
1731 AuthRegistry *ar)
1732 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
1733 {}
1734
1735 MonConnection::~MonConnection()
1736 {
1737 if (con) {
1738 con->mark_down();
1739 con.reset();
1740 }
1741 }
1742
1743 bool MonConnection::have_session() const
1744 {
1745 return state == State::HAVE_SESSION;
1746 }
1747
1748 void MonConnection::start(epoch_t epoch,
1749 const EntityName& entity_name)
1750 {
1751 using ceph::encode;
1752 auth_start = ceph_clock_now();
1753
1754 if (con->get_peer_addr().is_msgr2()) {
1755 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1756 state = State::AUTHENTICATING;
1757 con->send_message(new MMonGetMap());
1758 return;
1759 }
1760
1761 // restart authentication handshake
1762 state = State::NEGOTIATING;
1763
1764 // send an initial keepalive to ensure our timestamp is valid by the
1765 // time we are in an OPENED state (by sequencing this before
1766 // authentication).
1767 con->send_keepalive();
1768
1769 auto m = new MAuth;
1770 m->protocol = CEPH_AUTH_UNKNOWN;
1771 m->monmap_epoch = epoch;
1772 __u8 struct_v = 1;
1773 encode(struct_v, m->auth_payload);
1774 std::vector<uint32_t> auth_supported;
1775 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1776 encode(auth_supported, m->auth_payload);
1777 encode(entity_name, m->auth_payload);
1778 encode(global_id, m->auth_payload);
1779 con->send_message(m);
1780 }
1781
1782 int MonConnection::get_auth_request(
1783 uint32_t *method,
1784 std::vector<uint32_t> *preferred_modes,
1785 ceph::buffer::list *bl,
1786 const EntityName& entity_name,
1787 uint32_t want_keys,
1788 RotatingKeyRing* keyring)
1789 {
1790 using ceph::encode;
1791 // choose method
1792 if (auth_method < 0) {
1793 std::vector<uint32_t> as;
1794 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1795 if (as.empty()) {
1796 return -EACCES;
1797 }
1798 auth_method = as.front();
1799 }
1800 *method = auth_method;
1801 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1802 preferred_modes);
1803 ldout(cct,10) << __func__ << " method " << *method
1804 << " preferred_modes " << *preferred_modes << dendl;
1805 if (preferred_modes->empty()) {
1806 return -EACCES;
1807 }
1808
1809 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1810 ceph_assert(r == 0);
1811
1812 // initial requset includes some boilerplate...
1813 encode((char)AUTH_MODE_MON, *bl);
1814 encode(entity_name, *bl);
1815 encode(global_id, *bl);
1816
1817 // and (maybe) some method-specific initial payload
1818 auth->build_initial_request(bl);
1819
1820 return 0;
1821 }
1822
1823 int MonConnection::handle_auth_reply_more(
1824 AuthConnectionMeta *auth_meta,
1825 const ceph::buffer::list& bl,
1826 ceph::buffer::list *reply)
1827 {
1828 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1829 ldout(cct, 30) << __func__ << " got\n";
1830 bl.hexdump(*_dout);
1831 *_dout << dendl;
1832
1833 auto p = bl.cbegin();
1834 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1835 int r = auth->handle_response(0, p, &auth_meta->session_key,
1836 &auth_meta->connection_secret);
1837 if (r == -EAGAIN) {
1838 auth->prepare_build_request();
1839 auth->build_request(*reply);
1840 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1841 << " bytes" << dendl;
1842 r = 0;
1843 } else if (r < 0) {
1844 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1845 } else {
1846 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1847 // FIXME
1848 ceph_abort(cct, "write me");
1849 }
1850 return r;
1851 }
1852
1853 int MonConnection::handle_auth_done(
1854 AuthConnectionMeta *auth_meta,
1855 uint64_t new_global_id,
1856 const ceph::buffer::list& bl,
1857 CryptoKey *session_key,
1858 std::string *connection_secret)
1859 {
1860 ldout(cct,10) << __func__ << " global_id " << new_global_id
1861 << " payload " << bl.length()
1862 << dendl;
1863 global_id = new_global_id;
1864 auth->set_global_id(global_id);
1865 auto p = bl.begin();
1866 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1867 &auth_meta->connection_secret);
1868 if (auth_err >= 0) {
1869 state = State::HAVE_SESSION;
1870 }
1871 con->set_last_keepalive_ack(auth_start);
1872
1873 if (pending_tell_command) {
1874 con->send_message2(std::move(pending_tell_command));
1875 }
1876 return auth_err;
1877 }
1878
1879 int MonConnection::handle_auth_bad_method(
1880 uint32_t old_auth_method,
1881 int result,
1882 const std::vector<uint32_t>& allowed_methods,
1883 const std::vector<uint32_t>& allowed_modes)
1884 {
1885 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1886 << " result " << cpp_strerror(result)
1887 << " allowed_methods " << allowed_methods << dendl;
1888 std::vector<uint32_t> auth_supported;
1889 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1890 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1891 old_auth_method);
1892 assert(p != auth_supported.end());
1893 p = std::find_first_of(std::next(p), auth_supported.end(),
1894 allowed_methods.begin(), allowed_methods.end());
1895 if (p == auth_supported.end()) {
1896 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1897 << " but i only support " << auth_supported << dendl;
1898 return -EACCES;
1899 }
1900 auth_method = *p;
1901 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1902 return 0;
1903 }
1904
1905 int MonConnection::handle_auth(MAuthReply* m,
1906 const EntityName& entity_name,
1907 uint32_t want_keys,
1908 RotatingKeyRing* keyring)
1909 {
1910 if (state == State::NEGOTIATING) {
1911 int r = _negotiate(m, entity_name, want_keys, keyring);
1912 if (r) {
1913 return r;
1914 }
1915 state = State::AUTHENTICATING;
1916 }
1917 int r = authenticate(m);
1918 if (!r) {
1919 state = State::HAVE_SESSION;
1920 }
1921 return r;
1922 }
1923
1924 int MonConnection::_negotiate(MAuthReply *m,
1925 const EntityName& entity_name,
1926 uint32_t want_keys,
1927 RotatingKeyRing* keyring)
1928 {
1929 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1930 if (r == -ENOTSUP) {
1931 if (m->result == -ENOTSUP) {
1932 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1933 << dendl;
1934 }
1935 return m->result;
1936 }
1937 return r;
1938 }
1939
1940 int MonConnection::_init_auth(
1941 uint32_t method,
1942 const EntityName& entity_name,
1943 uint32_t want_keys,
1944 RotatingKeyRing* keyring,
1945 bool msgr2)
1946 {
1947 ldout(cct, 10) << __func__ << " method " << method << dendl;
1948 if (auth && auth->get_protocol() == (int)method) {
1949 ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
1950 auth->reset();
1951 return 0;
1952 }
1953
1954 ldout(cct, 10) << __func__ << " creating new auth" << dendl;
1955 auth.reset(AuthClientHandler::create(cct, method, keyring));
1956 if (!auth) {
1957 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1958 return -ENOTSUP;
1959 }
1960
1961 // do not request MGR key unless the mon has the SERVER_KRAKEN
1962 // feature. otherwise it will give us an auth error. note that
1963 // we have to use the FEATUREMASK because pre-jewel the kraken
1964 // feature bit was used for something else.
1965 if (!msgr2 &&
1966 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1967 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1968 ldout(cct, 1) << __func__
1969 << " not requesting MGR keys from pre-kraken monitor"
1970 << dendl;
1971 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1972 }
1973 auth->set_want_keys(want_keys);
1974 auth->init(entity_name);
1975 auth->set_global_id(global_id);
1976 return 0;
1977 }
1978
1979 int MonConnection::authenticate(MAuthReply *m)
1980 {
1981 ceph_assert(auth);
1982 if (!m->global_id) {
1983 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1984 }
1985 if (m->global_id != global_id) {
1986 // it's a new session
1987 auth->reset();
1988 global_id = m->global_id;
1989 auth->set_global_id(global_id);
1990 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1991 }
1992 auto p = m->result_bl.cbegin();
1993 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
1994 if (ret == -EAGAIN) {
1995 auto ma = new MAuth;
1996 ma->protocol = auth->get_protocol();
1997 auth->prepare_build_request();
1998 auth->build_request(ma->auth_payload);
1999 con->send_message(ma);
2000 }
2001 if (ret == 0 && pending_tell_command) {
2002 con->send_message2(std::move(pending_tell_command));
2003 }
2004
2005 return ret;
2006 }
2007
2008 void MonClient::register_config_callback(md_config_t::config_callback fn) {
2009 ceph_assert(!config_cb);
2010 config_cb = fn;
2011 }
2012
2013 md_config_t::config_callback MonClient::get_config_callback() {
2014 return config_cb;
2015 }