]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonClient.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mon / MonClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2
TL
15#include <algorithm>
16#include <iterator>
7c673cae 17#include <random>
9f95a23c
TL
18#include <boost/range/adaptor/map.hpp>
19#include <boost/range/adaptor/filtered.hpp>
20#include <boost/range/algorithm/copy.hpp>
21#include <boost/range/algorithm_ext/copy_n.hpp>
22#include "common/weighted_shuffle.h"
7c673cae 23
f67539c2 24#include "include/random.h"
c07f9fc5 25#include "include/scope_guard.h"
11fdf7f2 26#include "include/stringify.h"
c07f9fc5 27
7c673cae
FG
28#include "messages/MMonGetMap.h"
29#include "messages/MMonGetVersion.h"
f67539c2 30#include "messages/MMonGetMap.h"
7c673cae
FG
31#include "messages/MMonGetVersionReply.h"
32#include "messages/MMonMap.h"
11fdf7f2
TL
33#include "messages/MConfig.h"
34#include "messages/MGetConfig.h"
7c673cae
FG
35#include "messages/MAuth.h"
36#include "messages/MLogAck.h"
37#include "messages/MAuthReply.h"
38#include "messages/MMonCommand.h"
39#include "messages/MMonCommandAck.h"
9f95a23c
TL
40#include "messages/MCommand.h"
41#include "messages/MCommandReply.h"
7c673cae
FG
42#include "messages/MPing.h"
43
44#include "messages/MMonSubscribe.h"
45#include "messages/MMonSubscribeAck.h"
46#include "common/errno.h"
11fdf7f2 47#include "common/hostname.h"
7c673cae
FG
48#include "common/LogClient.h"
49
50#include "MonClient.h"
f67539c2 51#include "error_code.h"
7c673cae
FG
52#include "MonMap.h"
53
54#include "auth/Auth.h"
55#include "auth/KeyRing.h"
56#include "auth/AuthClientHandler.h"
11fdf7f2 57#include "auth/AuthRegistry.h"
7c673cae
FG
58#include "auth/RotatingKeyRing.h"
59
7c673cae
FG
60#define dout_subsys ceph_subsys_monc
61#undef dout_prefix
62#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
63
f67539c2 64namespace bs = boost::system;
9f95a23c 65using std::string;
f67539c2 66using namespace std::literals;
9f95a23c 67
f67539c2 68MonClient::MonClient(CephContext *cct_, boost::asio::io_context& service) :
7c673cae 69 Dispatcher(cct_),
11fdf7f2 70 AuthServer(cct_),
7c673cae 71 messenger(NULL),
31f18b77 72 timer(cct_, monc_lock),
f67539c2 73 service(service),
7c673cae 74 initialized(false),
7c673cae
FG
75 log_client(NULL),
76 more_log_pending(false),
77 want_monmap(true),
78 had_a_connection(false),
c07f9fc5 79 reopen_interval_multiplier(
11fdf7f2 80 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
7c673cae
FG
81 last_mon_command_tid(0),
82 version_req_id(0)
11fdf7f2 83{}
7c673cae
FG
84
85MonClient::~MonClient()
86{
87}
88
89int MonClient::build_initial_monmap()
90{
91 ldout(cct, 10) << __func__ << dendl;
9f95a23c 92 int r = monmap.build_initial(cct, false, std::cerr);
11fdf7f2
TL
93 ldout(cct,10) << "monmap:\n";
94 monmap.print(*_dout);
95 *_dout << dendl;
96 return r;
7c673cae
FG
97}
98
99int MonClient::get_monmap()
100{
101 ldout(cct, 10) << __func__ << dendl;
9f95a23c 102 std::unique_lock l(monc_lock);
f67539c2 103
11fdf7f2 104 sub.want("monmap", 0, 0);
7c673cae
FG
105 if (!_opened())
106 _reopen_session();
9f95a23c 107 map_cond.wait(l, [this] { return !want_monmap; });
7c673cae
FG
108 ldout(cct, 10) << __func__ << " done" << dendl;
109 return 0;
110}
111
11fdf7f2 112int MonClient::get_monmap_and_config()
7c673cae
FG
113{
114 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 115 ceph_assert(!messenger);
7c673cae 116
11fdf7f2 117 int tries = 10;
7c673cae 118
11fdf7f2
TL
119 cct->init_crypto();
120 auto shutdown_crypto = make_scope_guard([this] {
121 cct->shutdown_crypto();
122 });
7c673cae 123
11fdf7f2
TL
124 int r = build_initial_monmap();
125 if (r < 0) {
126 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
127 return r;
7c673cae
FG
128 }
129
11fdf7f2
TL
130 messenger = Messenger::create_client_messenger(
131 cct, "temp_mon_client");
132 ceph_assert(messenger);
133 messenger->add_dispatcher_head(this);
134 messenger->start();
135 auto shutdown_msgr = make_scope_guard([this] {
7c673cae 136 messenger->shutdown();
11fdf7f2 137 messenger->wait();
7c673cae 138 delete messenger;
11fdf7f2
TL
139 messenger = nullptr;
140 if (!monmap.fsid.is_zero()) {
141 cct->_conf.set_val("fsid", stringify(monmap.fsid));
142 }
143 });
7c673cae 144
f67539c2
TL
145 want_bootstrap_config = true;
146 auto shutdown_config = make_scope_guard([this] {
147 std::unique_lock l(monc_lock);
148 want_bootstrap_config = false;
149 bootstrap_config.reset();
150 });
151
152 ceph::ref_t<MConfig> config;
11fdf7f2
TL
153 while (tries-- > 0) {
154 r = init();
155 if (r < 0) {
156 return r;
157 }
158 r = authenticate(cct->_conf->client_mount_timeout);
159 if (r == -ETIMEDOUT) {
160 shutdown();
161 continue;
162 }
163 if (r < 0) {
164 break;
165 }
166 {
9f95a23c 167 std::unique_lock l(monc_lock);
11fdf7f2
TL
168 if (monmap.get_epoch() &&
169 !monmap.persistent_features.contains_all(
170 ceph::features::mon::FEATURE_MIMIC)) {
171 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
172 << dendl;
173 r = 0;
174 break;
175 }
f67539c2 176 while ((!bootstrap_config || monmap.get_epoch() == 0) && r == 0) {
11fdf7f2 177 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
c5c27e9a
TL
178 auto status = map_cond.wait_for(l, ceph::make_timespan(
179 cct->_conf->mon_client_hunt_interval));
180 if (status == std::cv_status::timeout) {
181 r = -ETIMEDOUT;
182 }
11fdf7f2 183 }
f67539c2
TL
184
185 if (bootstrap_config) {
11fdf7f2 186 ldout(cct,10) << __func__ << " success" << dendl;
f67539c2 187 config = std::move(bootstrap_config);
11fdf7f2
TL
188 r = 0;
189 break;
190 }
191 }
192 lderr(cct) << __func__ << " failed to get config" << dendl;
193 shutdown();
194 continue;
195 }
7c673cae 196
f67539c2
TL
197 if (config) {
198 // apply the bootstrap config to ensure its applied prior to completing
199 // the bootstrap
200 cct->_conf.set_mon_vals(cct, config->config, config_cb);
201 }
202
11fdf7f2
TL
203 shutdown();
204 return r;
7c673cae
FG
205}
206
207
208/**
209 * Ping the monitor with id @p mon_id and set the resulting reply in
210 * the provided @p result_reply, if this last parameter is not NULL.
211 *
212 * So that we don't rely on the MonClient's default messenger, set up
213 * during connect(), we create our own messenger to comunicate with the
214 * specified monitor. This is advantageous in the following ways:
215 *
216 * - Isolate the ping procedure from the rest of the MonClient's operations,
217 * allowing us to not acquire or manage the big monc_lock, thus not
218 * having to block waiting for some other operation to finish before we
219 * can proceed.
220 * * for instance, we can ping mon.FOO even if we are currently hunting
221 * or blocked waiting for auth to complete with mon.BAR.
222 *
223 * - Ping a monitor prior to establishing a connection (using connect())
224 * and properly establish the MonClient's messenger. This frees us
225 * from dealing with the complex foo that happens in connect().
226 *
227 * We also don't rely on MonClient as a dispatcher for this messenger,
228 * unlike what happens with the MonClient's default messenger. This allows
229 * us to sandbox the whole ping, having it much as a separate entity in
230 * the MonClient class, considerably simplifying the handling and dispatching
231 * of messages without needing to consider monc_lock.
232 *
233 * Current drawback is that we will establish a messenger for each ping
234 * we want to issue, instead of keeping a single messenger instance that
235 * would be used for all pings.
236 */
237int MonClient::ping_monitor(const string &mon_id, string *result_reply)
238{
239 ldout(cct, 10) << __func__ << dendl;
240
241 string new_mon_id;
242 if (monmap.contains("noname-"+mon_id)) {
243 new_mon_id = "noname-"+mon_id;
244 } else {
245 new_mon_id = mon_id;
246 }
247
248 if (new_mon_id.empty()) {
249 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
250 return -EINVAL;
251 } else if (!monmap.contains(new_mon_id)) {
252 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
253 << dendl;
254 return -ENOENT;
255 }
256
11fdf7f2
TL
257 // N.B. monc isn't initialized
258
259 auth_registry.refresh_config();
260
261 KeyRing keyring;
262 keyring.from_ceph_context(cct);
263 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
264
265 MonClientPinger *pinger = new MonClientPinger(cct,
266 &rkeyring,
267 result_reply);
7c673cae
FG
268
269 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
270 smsgr->add_dispatcher_head(pinger);
11fdf7f2 271 smsgr->set_auth_client(pinger);
7c673cae
FG
272 smsgr->start();
273
11fdf7f2 274 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
7c673cae
FG
275 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
276 << " " << con->get_peer_addr() << dendl;
11fdf7f2
TL
277
278 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
279 pinger->mc->start(monmap.get_epoch(), entity_name);
7c673cae
FG
280 con->send_message(new MPing);
281
11fdf7f2 282 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
7c673cae
FG
283 if (ret == 0) {
284 ldout(cct,10) << __func__ << " got ping reply" << dendl;
285 } else {
286 ret = -ret;
287 }
7c673cae
FG
288
289 con->mark_down();
11fdf7f2 290 pinger->mc.reset();
7c673cae
FG
291 smsgr->shutdown();
292 smsgr->wait();
293 delete smsgr;
294 delete pinger;
295 return ret;
296}
297
298bool MonClient::ms_dispatch(Message *m)
299{
7c673cae
FG
300 // we only care about these message types
301 switch (m->get_type()) {
302 case CEPH_MSG_MON_MAP:
303 case CEPH_MSG_AUTH_REPLY:
304 case CEPH_MSG_MON_SUBSCRIBE_ACK:
305 case CEPH_MSG_MON_GET_VERSION_REPLY:
306 case MSG_MON_COMMAND_ACK:
9f95a23c 307 case MSG_COMMAND_REPLY:
7c673cae 308 case MSG_LOGACK:
11fdf7f2 309 case MSG_CONFIG:
7c673cae 310 break;
11fdf7f2
TL
311 case CEPH_MSG_PING:
312 m->put();
313 return true;
7c673cae
FG
314 default:
315 return false;
316 }
317
11fdf7f2 318 std::lock_guard lock(monc_lock);
7c673cae 319
9f95a23c
TL
320 if (!m->get_connection()->is_anon() &&
321 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
322 if (_hunting()) {
323 auto p = _find_pending_con(m->get_connection());
324 if (p == pending_cons.end()) {
325 // ignore any messages outside hunting sessions
326 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
327 m->put();
328 return true;
329 }
330 } else if (!active_con || active_con->get_con() != m->get_connection()) {
331 // ignore any messages outside our session(s)
7c673cae
FG
332 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
333 m->put();
334 return true;
335 }
7c673cae
FG
336 }
337
338 switch (m->get_type()) {
339 case CEPH_MSG_MON_MAP:
340 handle_monmap(static_cast<MMonMap*>(m));
31f18b77
FG
341 if (passthrough_monmap) {
342 return false;
343 } else {
344 m->put();
345 }
7c673cae
FG
346 break;
347 case CEPH_MSG_AUTH_REPLY:
348 handle_auth(static_cast<MAuthReply*>(m));
349 break;
350 case CEPH_MSG_MON_SUBSCRIBE_ACK:
351 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
352 break;
353 case CEPH_MSG_MON_GET_VERSION_REPLY:
354 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
355 break;
356 case MSG_MON_COMMAND_ACK:
357 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
358 break;
9f95a23c
TL
359 case MSG_COMMAND_REPLY:
360 if (m->get_connection()->is_anon() &&
361 m->get_source().type() == CEPH_ENTITY_TYPE_MON) {
362 // this connection is from 'tell'... ignore everything except our command
363 // reply. (we'll get misc other message because we authenticated, but we
364 // don't need them.)
365 handle_command_reply(static_cast<MCommandReply*>(m));
366 return true;
367 }
368 // leave the message for another dispatch handler (e.g., Objecter)
369 return false;
7c673cae
FG
370 case MSG_LOGACK:
371 if (log_client) {
372 log_client->handle_log_ack(static_cast<MLogAck*>(m));
373 m->put();
374 if (more_log_pending) {
375 send_log();
376 }
377 } else {
378 m->put();
379 }
380 break;
11fdf7f2
TL
381 case MSG_CONFIG:
382 handle_config(static_cast<MConfig*>(m));
383 break;
7c673cae
FG
384 }
385 return true;
386}
387
388void MonClient::send_log(bool flush)
389{
390 if (log_client) {
9f95a23c 391 auto lm = log_client->get_mon_log_message(flush);
7c673cae 392 if (lm)
9f95a23c 393 _send_mon_message(std::move(lm));
7c673cae
FG
394 more_log_pending = log_client->are_pending();
395 }
396}
397
398void MonClient::flush_log()
399{
11fdf7f2 400 std::lock_guard l(monc_lock);
7c673cae
FG
401 send_log();
402}
403
31f18b77
FG
404/* Unlike all the other message-handling functions, we don't put away a reference
405* because we want to support MMonMap passthrough to other Dispatchers. */
7c673cae
FG
406void MonClient::handle_monmap(MMonMap *m)
407{
408 ldout(cct, 10) << __func__ << " " << *m << dendl;
11fdf7f2
TL
409 auto con_addrs = m->get_source_addrs();
410 string old_name = monmap.get_name(con_addrs);
9f95a23c 411 const auto old_epoch = monmap.get_epoch();
7c673cae 412
11fdf7f2
TL
413 auto p = m->monmapbl.cbegin();
414 decode(monmap, p);
7c673cae
FG
415
416 ldout(cct, 10) << " got monmap " << monmap.epoch
11fdf7f2
TL
417 << " from mon." << old_name
418 << " (according to old e" << monmap.get_epoch() << ")"
419 << dendl;
7c673cae
FG
420 ldout(cct, 10) << "dump:\n";
421 monmap.print(*_dout);
422 *_dout << dendl;
423
9f95a23c
TL
424 if (old_epoch != monmap.get_epoch()) {
425 tried.clear();
426 }
11fdf7f2
TL
427 if (old_name.size() == 0) {
428 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
7c673cae 429 _reopen_session();
11fdf7f2 430 } else {
9f95a23c
TL
431 auto new_name = monmap.get_name(con_addrs);
432 if (new_name.empty()) {
433 ldout(cct, 10) << "mon." << old_name << " at " << con_addrs
11fdf7f2
TL
434 << " went away" << dendl;
435 // can't find the mon we were talking to (above)
436 _reopen_session();
437 } else if (messenger->should_use_msgr2() &&
9f95a23c 438 monmap.get_addrs(new_name).has_msgr2() &&
11fdf7f2 439 !con_addrs.has_msgr2()) {
9f95a23c
TL
440 ldout(cct,1) << " mon." << new_name << " has (v2) addrs "
441 << monmap.get_addrs(new_name) << " but i'm connected to "
11fdf7f2
TL
442 << con_addrs << ", reconnecting" << dendl;
443 _reopen_session();
444 }
7c673cae
FG
445 }
446
f91f0fd5
TL
447 cct->set_mon_addrs(monmap);
448
11fdf7f2 449 sub.got("monmap", monmap.get_epoch());
9f95a23c 450 map_cond.notify_all();
7c673cae 451 want_monmap = false;
11fdf7f2
TL
452
453 if (authenticate_err == 1) {
454 _finish_auth(0);
455 }
456}
457
458void MonClient::handle_config(MConfig *m)
459{
460 ldout(cct,10) << __func__ << " " << *m << dendl;
f67539c2
TL
461
462 if (want_bootstrap_config) {
463 // get_monmap_and_config is waiting for config which it will apply
464 // synchronously
465 bootstrap_config = ceph::ref_t<MConfig>(m, false);
466 map_cond.notify_all();
467 return;
468 }
469
470 // Take the sledgehammer approach to ensuring we don't depend on
471 // anything in MonClient.
472 boost::asio::post(finish_strand,
473 [m, cct = boost::intrusive_ptr<CephContext>(cct),
474 config_notify_cb = config_notify_cb,
475 config_cb = config_cb]() {
476 cct->_conf.set_mon_vals(cct.get(), m->config, config_cb);
477 if (config_notify_cb) {
478 config_notify_cb();
479 }
480 m->put();
481 });
7c673cae
FG
482}
483
484// ----------------------
485
486int MonClient::init()
487{
488 ldout(cct, 10) << __func__ << dendl;
489
7c673cae
FG
490 entity_name = cct->_conf->name;
491
11fdf7f2
TL
492 auth_registry.refresh_config();
493
494 std::lock_guard l(monc_lock);
495 keyring.reset(new KeyRing);
496 if (auth_registry.is_supported_method(messenger->get_mytype(),
497 CEPH_AUTH_CEPHX)) {
498 // this should succeed, because auth_registry just checked!
499 int r = keyring->from_ceph_context(cct);
500 if (r != 0) {
501 // but be somewhat graceful in case there was a race condition
502 lderr(cct) << "keyring not found" << dendl;
503 return r;
7c673cae
FG
504 }
505 }
11fdf7f2
TL
506 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
507 return -ENOENT;
7c673cae
FG
508 }
509
510 rotating_secrets.reset(
511 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
512
513 initialized = true;
514
11fdf7f2
TL
515 messenger->set_auth_client(this);
516 messenger->add_dispatcher_head(this);
517
7c673cae 518 timer.init();
7c673cae
FG
519 schedule_tick();
520
521 return 0;
522}
523
524void MonClient::shutdown()
525{
526 ldout(cct, 10) << __func__ << dendl;
9f95a23c 527 monc_lock.lock();
11fdf7f2 528 stopping = true;
7c673cae 529 while (!version_requests.empty()) {
f67539c2
TL
530 ceph::async::post(std::move(version_requests.begin()->second),
531 monc_errc::shutting_down, 0, 0);
7c673cae 532 ldout(cct, 20) << __func__ << " canceling and discarding version request "
f67539c2 533 << version_requests.begin()->first << dendl;
7c673cae
FG
534 version_requests.erase(version_requests.begin());
535 }
31f18b77
FG
536 while (!mon_commands.empty()) {
537 auto tid = mon_commands.begin()->first;
538 _cancel_mon_command(tid);
539 }
9f95a23c
TL
540 ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
541 << " pending message(s)" << dendl;
542 waiting_for_session.clear();
7c673cae
FG
543
544 active_con.reset();
545 pending_cons.clear();
c5c27e9a 546
7c673cae 547 auth.reset();
c5c27e9a
TL
548 global_id = 0;
549 authenticate_err = 0;
550 authenticated = false;
7c673cae 551
9f95a23c 552 monc_lock.unlock();
7c673cae
FG
553
554 if (initialized) {
11fdf7f2 555 initialized = false;
7c673cae 556 }
9f95a23c 557 monc_lock.lock();
7c673cae 558 timer.shutdown();
11fdf7f2 559 stopping = false;
9f95a23c 560 monc_lock.unlock();
7c673cae
FG
561}
562
563int MonClient::authenticate(double timeout)
564{
9f95a23c 565 std::unique_lock lock{monc_lock};
7c673cae
FG
566
567 if (active_con) {
568 ldout(cct, 5) << "already authenticated" << dendl;
569 return 0;
570 }
11fdf7f2
TL
571 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
572 sub.want("config", 0, 0);
7c673cae
FG
573 if (!_opened())
574 _reopen_session();
575
9f95a23c
TL
576 auto until = ceph::real_clock::now();
577 until += ceph::make_timespan(timeout);
7c673cae
FG
578 if (timeout > 0.0)
579 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
11fdf7f2 580 while (!active_con && authenticate_err >= 0) {
7c673cae 581 if (timeout > 0.0) {
9f95a23c 582 auto r = auth_cond.wait_until(lock, until);
f67539c2 583 if (r == std::cv_status::timeout && !active_con) {
7c673cae 584 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
9f95a23c 585 authenticate_err = -ETIMEDOUT;
7c673cae
FG
586 }
587 } else {
9f95a23c 588 auth_cond.wait(lock);
7c673cae
FG
589 }
590 }
591
592 if (active_con) {
593 ldout(cct, 5) << __func__ << " success, global_id "
594 << active_con->get_global_id() << dendl;
595 // active_con should not have been set if there was an error
11fdf7f2 596 ceph_assert(authenticate_err >= 0);
7c673cae
FG
597 authenticated = true;
598 }
599
11fdf7f2 600 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
7c673cae
FG
601 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
602 }
603
604 return authenticate_err;
605}
606
607void MonClient::handle_auth(MAuthReply *m)
608{
9f95a23c
TL
609 ceph_assert(ceph_mutex_is_locked(monc_lock));
610
611 if (m->get_connection()->is_anon()) {
612 // anon connection, used for mon tell commands
613 for (auto& p : mon_commands) {
614 if (p.second->target_con == m->get_connection()) {
615 auto& mc = p.second->target_session;
616 int ret = mc->handle_auth(m, entity_name,
617 CEPH_ENTITY_TYPE_MON,
618 rotating_secrets.get());
619 (void)ret; // we don't care
620 break;
621 }
622 }
623 m->put();
624 return;
625 }
626
7c673cae
FG
627 if (!_hunting()) {
628 std::swap(active_con->get_auth(), auth);
629 int ret = active_con->authenticate(m);
630 m->put();
631 std::swap(auth, active_con->get_auth());
632 if (global_id != active_con->get_global_id()) {
633 lderr(cct) << __func__ << " peer assigned me a different global_id: "
634 << active_con->get_global_id() << dendl;
635 }
636 if (ret != -EAGAIN) {
637 _finish_auth(ret);
638 }
639 return;
640 }
641
642 // hunting
11fdf7f2
TL
643 auto found = _find_pending_con(m->get_connection());
644 ceph_assert(found != pending_cons.end());
7c673cae
FG
645 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
646 rotating_secrets.get());
647 m->put();
648 if (auth_err == -EAGAIN) {
649 return;
650 }
651 if (auth_err) {
652 pending_cons.erase(found);
653 if (!pending_cons.empty()) {
654 // keep trying with pending connections
655 return;
656 }
657 // the last try just failed, give up.
658 } else {
659 auto& mc = found->second;
11fdf7f2 660 ceph_assert(mc.have_session());
7c673cae
FG
661 active_con.reset(new MonConnection(std::move(mc)));
662 pending_cons.clear();
663 }
664
11fdf7f2
TL
665 _finish_hunting(auth_err);
666 _finish_auth(auth_err);
667}
7c673cae 668
11fdf7f2
TL
669void MonClient::_finish_auth(int auth_err)
670{
671 ldout(cct,10) << __func__ << " " << auth_err << dendl;
672 authenticate_err = auth_err;
673 // _resend_mon_commands() could _reopen_session() if the connected mon is not
674 // the one the MonCommand is targeting.
675 if (!auth_err && active_con) {
676 ceph_assert(auth);
677 _check_auth_tickets();
7c673cae 678 }
9f95a23c 679 auth_cond.notify_all();
7c673cae
FG
680}
681
7c673cae
FG
682// ---------
683
9f95a23c
TL
684void MonClient::send_mon_message(MessageRef m)
685{
686 std::lock_guard l{monc_lock};
687 _send_mon_message(std::move(m));
688}
689
690void MonClient::_send_mon_message(MessageRef m)
7c673cae 691{
9f95a23c 692 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae
FG
693 if (active_con) {
694 auto cur_con = active_con->get_con();
695 ldout(cct, 10) << "_send_mon_message to mon."
696 << monmap.get_name(cur_con->get_peer_addr())
697 << " at " << cur_con->get_peer_addr() << dendl;
9f95a23c 698 cur_con->send_message2(std::move(m));
7c673cae 699 } else {
9f95a23c 700 waiting_for_session.push_back(std::move(m));
7c673cae
FG
701 }
702}
703
704void MonClient::_reopen_session(int rank)
705{
9f95a23c 706 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae
FG
707 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
708
709 active_con.reset();
710 pending_cons.clear();
711
b3b6e05e
TL
712 authenticate_err = 1; // == in progress
713
7c673cae
FG
714 _start_hunting();
715
716 if (rank >= 0) {
c5c27e9a 717 _add_conn(rank);
7c673cae 718 } else {
c5c27e9a 719 _add_conns();
7c673cae
FG
720 }
721
722 // throw out old queued messages
9f95a23c 723 waiting_for_session.clear();
7c673cae
FG
724
725 // throw out version check requests
726 while (!version_requests.empty()) {
f67539c2
TL
727 ceph::async::post(std::move(version_requests.begin()->second),
728 monc_errc::session_reset, 0, 0);
7c673cae
FG
729 version_requests.erase(version_requests.begin());
730 }
731
732 for (auto& c : pending_cons) {
11fdf7f2 733 c.second.start(monmap.get_epoch(), entity_name);
7c673cae
FG
734 }
735
11fdf7f2 736 if (sub.reload()) {
7c673cae 737 _renew_subs();
11fdf7f2 738 }
7c673cae
FG
739}
740
f67539c2 741void MonClient::_add_conn(unsigned rank)
7c673cae 742{
11fdf7f2
TL
743 auto peer = monmap.get_addrs(rank);
744 auto conn = messenger->connect_to_mon(peer);
745 MonConnection mc(cct, conn, global_id, &auth_registry);
c5c27e9a
TL
746 if (auth) {
747 mc.get_auth().reset(auth->clone());
748 }
f67539c2 749 pending_cons.insert(std::make_pair(peer, std::move(mc)));
7c673cae
FG
750 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
751 << " con " << conn
11fdf7f2 752 << " addr " << peer
7c673cae 753 << dendl;
7c673cae
FG
754}
755
c5c27e9a 756void MonClient::_add_conns()
7c673cae 757{
9f95a23c
TL
758 // collect the next batch of candidates who are listed right next to the ones
759 // already tried
760 auto get_next_batch = [this]() -> std::vector<unsigned> {
761 std::multimap<uint16_t, unsigned> ranks_by_priority;
762 boost::copy(
763 monmap.mon_info | boost::adaptors::filtered(
764 [this](auto& info) {
765 auto rank = monmap.get_rank(info.first);
766 return tried.count(rank) == 0;
767 }) | boost::adaptors::transformed(
768 [this](auto& info) {
769 auto rank = monmap.get_rank(info.first);
770 return std::make_pair(info.second.priority, rank);
771 }), std::inserter(ranks_by_priority, end(ranks_by_priority)));
772 if (ranks_by_priority.empty()) {
773 return {};
224ce89b 774 }
9f95a23c
TL
775 // only choose the monitors with lowest priority
776 auto cands = boost::make_iterator_range(
777 ranks_by_priority.equal_range(ranks_by_priority.begin()->first));
778 std::vector<unsigned> ranks;
779 boost::range::copy(cands | boost::adaptors::map_values,
780 std::back_inserter(ranks));
781 return ranks;
782 };
783 auto ranks = get_next_batch();
784 if (ranks.empty()) {
785 tried.clear(); // start over
786 ranks = get_next_batch();
787 }
788 ceph_assert(!ranks.empty());
789 if (ranks.size() > 1) {
790 std::vector<uint16_t> weights;
791 for (auto i : ranks) {
792 auto rank_name = monmap.get_name(i);
793 weights.push_back(monmap.get_weight(rank_name));
794 }
f67539c2 795 random_device_t rd;
9f95a23c
TL
796 if (std::accumulate(begin(weights), end(weights), 0u) == 0) {
797 std::shuffle(begin(ranks), end(ranks), std::mt19937{rd()});
798 } else {
799 weighted_shuffle(begin(ranks), end(ranks), begin(weights), end(weights),
800 std::mt19937{rd()});
224ce89b 801 }
7c673cae 802 }
9f95a23c 803 ldout(cct, 10) << __func__ << " ranks=" << ranks << dendl;
7c673cae 804 unsigned n = cct->_conf->mon_client_hunt_parallel;
224ce89b
WB
805 if (n == 0 || n > ranks.size()) {
806 n = ranks.size();
7c673cae
FG
807 }
808 for (unsigned i = 0; i < n; i++) {
c5c27e9a 809 _add_conn(ranks[i]);
9f95a23c 810 tried.insert(ranks[i]);
7c673cae
FG
811 }
812}
813
814bool MonClient::ms_handle_reset(Connection *con)
815{
11fdf7f2 816 std::lock_guard lock(monc_lock);
7c673cae
FG
817
818 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
819 return false;
820
9f95a23c
TL
821 if (con->is_anon()) {
822 auto p = mon_commands.begin();
823 while (p != mon_commands.end()) {
824 auto cmd = p->second;
825 ++p;
826 if (cmd->target_con == con) {
827 _send_command(cmd); // may retry or fail
828 break;
829 }
830 }
831 return true;
832 }
833
7c673cae 834 if (_hunting()) {
11fdf7f2
TL
835 if (pending_cons.count(con->get_peer_addrs())) {
836 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
837 << dendl;
7c673cae 838 } else {
11fdf7f2
TL
839 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
840 << dendl;
7c673cae
FG
841 }
842 return true;
843 } else {
844 if (active_con && con == active_con->get_con()) {
11fdf7f2
TL
845 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
846 << dendl;
7c673cae
FG
847 _reopen_session();
848 return false;
849 } else {
11fdf7f2
TL
850 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
851 << dendl;
7c673cae
FG
852 return true;
853 }
854 }
855}
856
857bool MonClient::_opened() const
858{
9f95a23c 859 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae
FG
860 return active_con || _hunting();
861}
862
863bool MonClient::_hunting() const
864{
865 return !pending_cons.empty();
866}
867
868void MonClient::_start_hunting()
869{
11fdf7f2 870 ceph_assert(!_hunting());
7c673cae
FG
871 // adjust timeouts if necessary
872 if (!had_a_connection)
873 return;
874 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
875 if (reopen_interval_multiplier >
876 cct->_conf->mon_client_hunt_interval_max_multiple) {
877 reopen_interval_multiplier =
878 cct->_conf->mon_client_hunt_interval_max_multiple;
879 }
880}
881
11fdf7f2 882void MonClient::_finish_hunting(int auth_err)
7c673cae 883{
11fdf7f2 884 ldout(cct,10) << __func__ << " " << auth_err << dendl;
9f95a23c 885 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae 886 // the pending conns have been cleaned.
11fdf7f2 887 ceph_assert(!_hunting());
7c673cae
FG
888 if (active_con) {
889 auto con = active_con->get_con();
890 ldout(cct, 1) << "found mon."
891 << monmap.get_name(con->get_peer_addr())
892 << dendl;
893 } else {
894 ldout(cct, 1) << "no mon sessions established" << dendl;
895 }
896
897 had_a_connection = true;
c07f9fc5 898 _un_backoff();
11fdf7f2
TL
899
900 if (!auth_err) {
901 last_rotating_renew_sent = utime_t();
902 while (!waiting_for_session.empty()) {
9f95a23c 903 _send_mon_message(std::move(waiting_for_session.front()));
11fdf7f2
TL
904 waiting_for_session.pop_front();
905 }
906 _resend_mon_commands();
907 send_log(true);
908 if (active_con) {
c5c27e9a 909 auth = std::move(active_con->get_auth());
11fdf7f2
TL
910 if (global_id && global_id != active_con->get_global_id()) {
911 lderr(cct) << __func__ << " global_id changed from " << global_id
912 << " to " << active_con->get_global_id() << dendl;
913 }
914 global_id = active_con->get_global_id();
915 }
916 }
7c673cae
FG
917}
918
919void MonClient::tick()
920{
921 ldout(cct, 10) << __func__ << dendl;
922
9f95a23c
TL
923 utime_t now = ceph_clock_now();
924
c07f9fc5
FG
925 auto reschedule_tick = make_scope_guard([this] {
926 schedule_tick();
927 });
928
7c673cae 929 _check_auth_tickets();
9f95a23c 930 _check_tell_commands();
7c673cae
FG
931
932 if (_hunting()) {
933 ldout(cct, 1) << "continuing hunt" << dendl;
c07f9fc5 934 return _reopen_session();
7c673cae
FG
935 } else if (active_con) {
936 // just renew as needed
7c673cae
FG
937 auto cur_con = active_con->get_con();
938 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
11fdf7f2
TL
939 const bool maybe_renew = sub.need_renew();
940 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
7c673cae 941 << dendl;
11fdf7f2 942 if (maybe_renew) {
7c673cae 943 _renew_subs();
11fdf7f2 944 }
7c673cae
FG
945 }
946
9f95a23c
TL
947 if (now > last_keepalive + cct->_conf->mon_client_ping_interval) {
948 cur_con->send_keepalive();
949 last_keepalive = now;
950
951 if (cct->_conf->mon_client_ping_timeout > 0 &&
952 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
953 utime_t lk = cur_con->get_last_keepalive_ack();
954 utime_t interval = now - lk;
955 if (interval > cct->_conf->mon_client_ping_timeout) {
956 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
957 << " seconds), reconnecting" << dendl;
958 return _reopen_session();
959 }
7c673cae 960 }
9f95a23c
TL
961
962 _un_backoff();
7c673cae 963 }
c07f9fc5 964
9f95a23c
TL
965 if (now > last_send_log + cct->_conf->mon_client_log_interval) {
966 send_log();
967 last_send_log = now;
968 }
7c673cae 969 }
c07f9fc5 970}
7c673cae 971
c07f9fc5
FG
972void MonClient::_un_backoff()
973{
974 // un-backoff our reconnect interval
975 reopen_interval_multiplier = std::max(
11fdf7f2 976 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
c07f9fc5 977 reopen_interval_multiplier /
11fdf7f2 978 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
c07f9fc5
FG
979 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
980 << reopen_interval_multiplier << dendl;
7c673cae
FG
981}
982
983void MonClient::schedule_tick()
984{
9f95a23c 985 auto do_tick = make_lambda_context([this](int) { tick(); });
f6b5b4d7
TL
986 if (!is_connected()) {
987 // start another round of hunting
11fdf7f2
TL
988 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
989 reopen_interval_multiplier);
990 timer.add_event_after(hunt_interval, do_tick);
991 } else {
f6b5b4d7 992 // keep in touch
9f95a23c
TL
993 timer.add_event_after(std::min(cct->_conf->mon_client_ping_interval,
994 cct->_conf->mon_client_log_interval),
995 do_tick);
11fdf7f2 996 }
7c673cae
FG
997}
998
999// ---------
1000
1001void MonClient::_renew_subs()
1002{
9f95a23c 1003 ceph_assert(ceph_mutex_is_locked(monc_lock));
11fdf7f2 1004 if (!sub.have_new()) {
7c673cae
FG
1005 ldout(cct, 10) << __func__ << " - empty" << dendl;
1006 return;
1007 }
1008
1009 ldout(cct, 10) << __func__ << dendl;
1010 if (!_opened())
1011 _reopen_session();
1012 else {
9f95a23c 1013 auto m = ceph::make_message<MMonSubscribe>();
11fdf7f2
TL
1014 m->what = sub.get_subs();
1015 m->hostname = ceph_get_short_hostname();
9f95a23c 1016 _send_mon_message(std::move(m));
11fdf7f2 1017 sub.renewed();
7c673cae
FG
1018 }
1019}
1020
1021void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
1022{
11fdf7f2 1023 sub.acked(m->interval);
7c673cae
FG
1024 m->put();
1025}
1026
1027int MonClient::_check_auth_tickets()
1028{
9f95a23c 1029 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae
FG
1030 if (active_con && auth) {
1031 if (auth->need_tickets()) {
1032 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
9f95a23c 1033 auto m = ceph::make_message<MAuth>();
7c673cae
FG
1034 m->protocol = auth->get_protocol();
1035 auth->prepare_build_request();
1036 auth->build_request(m->auth_payload);
1037 _send_mon_message(m);
1038 }
1039
1040 _check_auth_rotating();
1041 }
1042 return 0;
1043}
1044
1045int MonClient::_check_auth_rotating()
1046{
9f95a23c 1047 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae
FG
1048 if (!rotating_secrets ||
1049 !auth_principal_needs_rotating_keys(entity_name)) {
1050 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
1051 return 0;
1052 }
1053
1054 if (!active_con || !auth) {
1055 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
1056 return 0;
1057 }
1058
1059 utime_t now = ceph_clock_now();
1060 utime_t cutoff = now;
11fdf7f2 1061 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
7c673cae
FG
1062 utime_t issued_at_lower_bound = now;
1063 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
1064 if (!rotating_secrets->need_new_secrets(cutoff)) {
1065 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
1066 rotating_secrets->dump_rotating();
1067 return 0;
1068 }
1069
1070 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
1071 if (!rotating_secrets->need_new_secrets() &&
1072 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
1073 // the key has expired before it has been issued?
1074 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
1075 << " (before " << issued_at_lower_bound << ")" << dendl;
1076 }
1077 if ((now > last_rotating_renew_sent) &&
1078 double(now - last_rotating_renew_sent) < 1) {
1079 ldout(cct, 10) << __func__ << " called too often (last: "
1080 << last_rotating_renew_sent << "), skipping refresh" << dendl;
1081 return 0;
1082 }
9f95a23c 1083 auto m = ceph::make_message<MAuth>();
7c673cae
FG
1084 m->protocol = auth->get_protocol();
1085 if (auth->build_rotating_request(m->auth_payload)) {
1086 last_rotating_renew_sent = now;
9f95a23c 1087 _send_mon_message(std::move(m));
7c673cae
FG
1088 }
1089 return 0;
1090}
1091
1092int MonClient::wait_auth_rotating(double timeout)
1093{
9f95a23c 1094 std::unique_lock l(monc_lock);
7c673cae
FG
1095
1096 // Must be initialized
11fdf7f2 1097 ceph_assert(auth != nullptr);
7c673cae
FG
1098
1099 if (auth->get_protocol() == CEPH_AUTH_NONE)
1100 return 0;
1101
1102 if (!rotating_secrets)
1103 return 0;
1104
9f95a23c 1105 ldout(cct, 10) << __func__ << " waiting for " << timeout << dendl;
b3b6e05e
TL
1106 utime_t cutoff = ceph_clock_now();
1107 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
1108 if (auth_cond.wait_for(l, ceph::make_timespan(timeout), [this, cutoff] {
9f95a23c 1109 return (!auth_principal_needs_rotating_keys(entity_name) ||
b3b6e05e 1110 !rotating_secrets->need_new_secrets(cutoff));
9f95a23c
TL
1111 })) {
1112 ldout(cct, 10) << __func__ << " done" << dendl;
1113 return 0;
1114 } else {
1115 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
1116 return -ETIMEDOUT;
7c673cae 1117 }
7c673cae
FG
1118}
1119
1120// ---------
1121
1122void MonClient::_send_command(MonCommand *r)
1123{
9f95a23c
TL
1124 if (r->is_tell()) {
1125 ++r->send_attempts;
eafe8130 1126 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
f67539c2 1127 _finish_command(r, monc_errc::mon_unavailable, "mon unavailable", {});
eafe8130
TL
1128 return;
1129 }
9f95a23c
TL
1130 // tell-style command
1131 if (monmap.min_mon_release >= ceph_release_t::octopus) {
1132 if (r->target_con) {
1133 r->target_con->mark_down();
1134 }
1135 if (r->target_rank >= 0) {
1136 if (r->target_rank >= (int)monmap.size()) {
1137 ldout(cct, 10) << " target " << r->target_rank
1138 << " >= max mon " << monmap.size() << dendl;
f67539c2 1139 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
9f95a23c
TL
1140 return;
1141 }
1142 r->target_con = messenger->connect_to_mon(
1143 monmap.get_addrs(r->target_rank), true /* anon */);
1144 } else {
1145 if (!monmap.contains(r->target_name)) {
1146 ldout(cct, 10) << " target " << r->target_name
1147 << " not present in monmap" << dendl;
f67539c2 1148 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
9f95a23c
TL
1149 return;
1150 }
1151 r->target_con = messenger->connect_to_mon(
1152 monmap.get_addrs(r->target_name), true /* anon */);
1153 }
1154
1155 r->target_session.reset(new MonConnection(cct, r->target_con, 0,
1156 &auth_registry));
1157 r->target_session->start(monmap.get_epoch(), entity_name);
1158 r->last_send_attempt = ceph_clock_now();
1159
1160 MCommand *m = new MCommand(monmap.fsid);
1161 m->set_tid(r->tid);
1162 m->cmd = r->cmd;
1163 m->set_data(r->inbl);
1164 r->target_session->queue_command(m);
7c673cae
FG
1165 return;
1166 }
7c673cae 1167
9f95a23c
TL
1168 // ugly legacy handling of pre-octopus mons
1169 entity_addr_t peer;
1170 if (active_con) {
1171 peer = active_con->get_con()->get_peer_addr();
1172 }
1173
1174 if (r->target_rank >= 0 &&
1175 r->target_rank != monmap.get_rank(peer)) {
1176 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1177 << " wants rank " << r->target_rank
1178 << ", reopening session"
1179 << dendl;
1180 if (r->target_rank >= (int)monmap.size()) {
1181 ldout(cct, 10) << " target " << r->target_rank
1182 << " >= max mon " << monmap.size() << dendl;
f67539c2 1183 _finish_command(r, monc_errc::rank_dne, "mon rank dne"sv, {});
9f95a23c
TL
1184 return;
1185 }
1186 _reopen_session(r->target_rank);
eafe8130
TL
1187 return;
1188 }
9f95a23c
TL
1189 if (r->target_name.length() &&
1190 r->target_name != monmap.get_name(peer)) {
1191 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1192 << " wants mon " << r->target_name
1193 << ", reopening session"
1194 << dendl;
1195 if (!monmap.contains(r->target_name)) {
1196 ldout(cct, 10) << " target " << r->target_name
1197 << " not present in monmap" << dendl;
f67539c2 1198 _finish_command(r, monc_errc::mon_dne, "mon dne"sv, {});
9f95a23c
TL
1199 return;
1200 }
1201 _reopen_session(monmap.get_rank(r->target_name));
7c673cae
FG
1202 return;
1203 }
9f95a23c 1204 // fall-thru to send 'normal' CLI command
7c673cae
FG
1205 }
1206
9f95a23c 1207 // normal CLI command
7c673cae 1208 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
9f95a23c 1209 auto m = ceph::make_message<MMonCommand>(monmap.fsid);
7c673cae
FG
1210 m->set_tid(r->tid);
1211 m->cmd = r->cmd;
1212 m->set_data(r->inbl);
9f95a23c 1213 _send_mon_message(std::move(m));
7c673cae
FG
1214 return;
1215}
1216
9f95a23c
TL
1217void MonClient::_check_tell_commands()
1218{
1219 // resend any requests
1220 auto now = ceph_clock_now();
1221 auto p = mon_commands.begin();
1222 while (p != mon_commands.end()) {
1223 auto cmd = p->second;
1224 ++p;
1225 if (cmd->is_tell() &&
1226 cmd->last_send_attempt != utime_t() &&
1227 now - cmd->last_send_attempt > cct->_conf->mon_client_hunt_interval) {
1228 ldout(cct,5) << __func__ << " timeout tell command " << cmd->tid << dendl;
1229 _send_command(cmd); // might remove cmd from mon_commands
1230 }
1231 }
1232}
1233
7c673cae
FG
1234void MonClient::_resend_mon_commands()
1235{
1236 // resend any requests
9f95a23c 1237 auto p = mon_commands.begin();
eafe8130
TL
1238 while (p != mon_commands.end()) {
1239 auto cmd = p->second;
1240 ++p;
9f95a23c
TL
1241 if (cmd->is_tell() && monmap.min_mon_release >= ceph_release_t::octopus) {
1242 // starting with octopus, tell commands use their own connetion and need no
1243 // special resend when we finish hunting.
1244 } else {
1245 _send_command(cmd); // might remove cmd from mon_commands
1246 }
7c673cae
FG
1247 }
1248}
1249
1250void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1251{
1252 MonCommand *r = NULL;
1253 uint64_t tid = ack->get_tid();
1254
1255 if (tid == 0 && !mon_commands.empty()) {
1256 r = mon_commands.begin()->second;
1257 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1258 } else {
9f95a23c 1259 auto p = mon_commands.find(tid);
7c673cae
FG
1260 if (p == mon_commands.end()) {
1261 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1262 ack->put();
1263 return;
1264 }
1265 r = p->second;
1266 }
1267
1268 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
f67539c2
TL
1269 auto ec = ack->r < 0 ? bs::error_code(-ack->r, mon_category())
1270 : bs::error_code();
1271 _finish_command(r, ec, ack->rs,
1272 std::move(ack->get_data()));
7c673cae
FG
1273 ack->put();
1274}
1275
9f95a23c
TL
1276void MonClient::handle_command_reply(MCommandReply *reply)
1277{
1278 MonCommand *r = NULL;
1279 uint64_t tid = reply->get_tid();
1280
1281 if (tid == 0 && !mon_commands.empty()) {
1282 r = mon_commands.begin()->second;
1283 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid
1284 << dendl;
1285 } else {
1286 auto p = mon_commands.find(tid);
1287 if (p == mon_commands.end()) {
1288 ldout(cct, 10) << __func__ << " " << reply->get_tid() << " not found"
1289 << dendl;
1290 reply->put();
1291 return;
1292 }
1293 r = p->second;
1294 }
1295
1296 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
f67539c2
TL
1297 auto ec = reply->r < 0 ? bs::error_code(-reply->r, mon_category())
1298 : bs::error_code();
1299 _finish_command(r, ec, reply->rs, std::move(reply->get_data()));
9f95a23c
TL
1300 reply->put();
1301}
1302
31f18b77 1303int MonClient::_cancel_mon_command(uint64_t tid)
7c673cae 1304{
9f95a23c 1305 ceph_assert(ceph_mutex_is_locked(monc_lock));
7c673cae 1306
9f95a23c 1307 auto it = mon_commands.find(tid);
7c673cae
FG
1308 if (it == mon_commands.end()) {
1309 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1310 return -ENOENT;
1311 }
1312
1313 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1314
1315 MonCommand *cmd = it->second;
f67539c2 1316 _finish_command(cmd, monc_errc::timed_out, "timed out"sv, {});
7c673cae
FG
1317 return 0;
1318}
1319
f67539c2
TL
1320void MonClient::_finish_command(MonCommand *r, bs::error_code ret,
1321 std::string_view rs, ceph::buffer::list&& bl)
7c673cae 1322{
f67539c2
TL
1323 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs
1324 << dendl;
1325 ceph::async::post(std::move(r->onfinish), ret, std::string(rs),
1326 std::move(bl));
9f95a23c
TL
1327 if (r->target_con) {
1328 r->target_con->mark_down();
1329 }
7c673cae
FG
1330 mon_commands.erase(r->tid);
1331 delete r;
1332}
1333
7c673cae
FG
1334// ---------
1335
7c673cae
FG
1336void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1337{
9f95a23c
TL
1338 ceph_assert(ceph_mutex_is_locked(monc_lock));
1339 auto iter = version_requests.find(m->handle);
7c673cae
FG
1340 if (iter == version_requests.end()) {
1341 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1342 << " not found" << dendl;
1343 } else {
f67539c2
TL
1344 auto req = std::move(iter->second);
1345 ldout(cct, 10) << __func__ << " finishing " << iter->first << " version "
1346 << m->version << dendl;
7c673cae 1347 version_requests.erase(iter);
f67539c2
TL
1348 ceph::async::post(std::move(req), bs::error_code(),
1349 m->version, m->oldest_version);
7c673cae
FG
1350 }
1351 m->put();
1352}
1353
11fdf7f2
TL
1354int MonClient::get_auth_request(
1355 Connection *con,
1356 AuthConnectionMeta *auth_meta,
1357 uint32_t *auth_method,
1358 std::vector<uint32_t> *preferred_modes,
9f95a23c 1359 ceph::buffer::list *bl)
11fdf7f2
TL
1360{
1361 std::lock_guard l(monc_lock);
1362 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1363 << dendl;
1364
1365 // connection to mon?
1366 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1367 ceph_assert(!auth_meta->authorizer);
9f95a23c
TL
1368 if (con->is_anon()) {
1369 for (auto& i : mon_commands) {
1370 if (i.second->target_con == con) {
1371 return i.second->target_session->get_auth_request(
1372 auth_method, preferred_modes, bl,
1373 entity_name, want_keys, rotating_secrets.get());
1374 }
1375 }
1376 }
11fdf7f2
TL
1377 for (auto& i : pending_cons) {
1378 if (i.second.is_con(con)) {
1379 return i.second.get_auth_request(
1380 auth_method, preferred_modes, bl,
1381 entity_name, want_keys, rotating_secrets.get());
1382 }
1383 }
1384 return -ENOENT;
1385 }
1386
1387 // generate authorizer
1388 if (!auth) {
1389 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1390 return -EACCES;
1391 }
1392 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1393 if (!auth_meta->authorizer) {
1394 lderr(cct) << __func__ << " failed to build_authorizer for type "
1395 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1396 return -EACCES;
1397 }
1398 auth_meta->auth_method = auth_meta->authorizer->protocol;
1399 auth_registry.get_supported_modes(con->get_peer_type(),
1400 auth_meta->auth_method,
1401 preferred_modes);
1402 *bl = auth_meta->authorizer->bl;
1403 return 0;
1404}
1405
1406int MonClient::handle_auth_reply_more(
1407 Connection *con,
1408 AuthConnectionMeta *auth_meta,
9f95a23c
TL
1409 const ceph::buffer::list& bl,
1410 ceph::buffer::list *reply)
11fdf7f2
TL
1411{
1412 std::lock_guard l(monc_lock);
1413
1414 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c
TL
1415 if (con->is_anon()) {
1416 for (auto& i : mon_commands) {
1417 if (i.second->target_con == con) {
1418 return i.second->target_session->handle_auth_reply_more(
1419 auth_meta, bl, reply);
1420 }
1421 }
1422 }
11fdf7f2
TL
1423 for (auto& i : pending_cons) {
1424 if (i.second.is_con(con)) {
1425 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1426 }
1427 }
1428 return -ENOENT;
1429 }
1430
1431 // authorizer challenges
1432 if (!auth || !auth_meta->authorizer) {
1433 lderr(cct) << __func__ << " no authorizer?" << dendl;
1434 return -1;
1435 }
1436 auth_meta->authorizer->add_challenge(cct, bl);
1437 *reply = auth_meta->authorizer->bl;
1438 return 0;
1439}
1440
1441int MonClient::handle_auth_done(
1442 Connection *con,
1443 AuthConnectionMeta *auth_meta,
1444 uint64_t global_id,
1445 uint32_t con_mode,
9f95a23c 1446 const ceph::buffer::list& bl,
11fdf7f2
TL
1447 CryptoKey *session_key,
1448 std::string *connection_secret)
1449{
1450 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1451 std::lock_guard l(monc_lock);
9f95a23c
TL
1452 if (con->is_anon()) {
1453 for (auto& i : mon_commands) {
1454 if (i.second->target_con == con) {
1455 return i.second->target_session->handle_auth_done(
1456 auth_meta, global_id, bl,
1457 session_key, connection_secret);
1458 }
1459 }
1460 }
11fdf7f2
TL
1461 for (auto& i : pending_cons) {
1462 if (i.second.is_con(con)) {
1463 int r = i.second.handle_auth_done(
1464 auth_meta, global_id, bl,
1465 session_key, connection_secret);
1466 if (r) {
1467 pending_cons.erase(i.first);
1468 if (!pending_cons.empty()) {
1469 return r;
1470 }
1471 } else {
1472 active_con.reset(new MonConnection(std::move(i.second)));
1473 pending_cons.clear();
1474 ceph_assert(active_con->have_session());
1475 }
1476
1477 _finish_hunting(r);
1478 if (r || monmap.get_epoch() > 0) {
1479 _finish_auth(r);
1480 }
1481 return r;
1482 }
1483 }
1484 return -ENOENT;
1485 } else {
1486 // verify authorizer reply
1487 auto p = bl.begin();
1488 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1489 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1490 << dendl;
1491 return -EACCES;
1492 }
1493 auth_meta->session_key = auth_meta->authorizer->session_key;
1494 return 0;
1495 }
1496}
1497
1498int MonClient::handle_auth_bad_method(
1499 Connection *con,
1500 AuthConnectionMeta *auth_meta,
1501 uint32_t old_auth_method,
1502 int result,
1503 const std::vector<uint32_t>& allowed_methods,
1504 const std::vector<uint32_t>& allowed_modes)
1505{
1506 auth_meta->allowed_methods = allowed_methods;
1507
1508 std::lock_guard l(monc_lock);
1509 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
9f95a23c
TL
1510 if (con->is_anon()) {
1511 for (auto& i : mon_commands) {
1512 if (i.second->target_con == con) {
1513 int r = i.second->target_session->handle_auth_bad_method(
1514 old_auth_method,
1515 result,
1516 allowed_methods,
1517 allowed_modes);
1518 if (r < 0) {
f67539c2
TL
1519 auto ec = bs::error_code(-r, mon_category());
1520 _finish_command(i.second, ec, "auth failed"sv, {});
9f95a23c
TL
1521 }
1522 return r;
1523 }
1524 }
1525 }
11fdf7f2
TL
1526 for (auto& i : pending_cons) {
1527 if (i.second.is_con(con)) {
1528 int r = i.second.handle_auth_bad_method(old_auth_method,
1529 result,
1530 allowed_methods,
1531 allowed_modes);
1532 if (r == 0) {
1533 return r; // try another method on this con
1534 }
1535 pending_cons.erase(i.first);
1536 if (!pending_cons.empty()) {
1537 return r; // fail this con, maybe another con will succeed
1538 }
1539 // fail hunt
1540 _finish_hunting(r);
1541 _finish_auth(r);
1542 return r;
1543 }
1544 }
1545 return -ENOENT;
1546 } else {
1547 // huh...
1548 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1549 << " result " << cpp_strerror(result)
1550 << " and auth is " << (auth ? auth->get_protocol() : 0)
1551 << dendl;
1552 return -EACCES;
1553 }
1554}
1555
1556int MonClient::handle_auth_request(
1557 Connection *con,
1558 AuthConnectionMeta *auth_meta,
1559 bool more,
1560 uint32_t auth_method,
9f95a23c
TL
1561 const ceph::buffer::list& payload,
1562 ceph::buffer::list *reply)
11fdf7f2 1563{
9f95a23c
TL
1564 if (payload.length() == 0) {
1565 // for some channels prior to nautilus (osd heartbeat), we
1566 // tolerate the lack of an authorizer.
1567 if (!con->get_messenger()->require_authorizer) {
1568 handle_authentication_dispatcher->ms_handle_authentication(con);
1569 return 1;
1570 }
1571 return -EACCES;
1572 }
11fdf7f2
TL
1573 auth_meta->auth_mode = payload[0];
1574 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1575 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1576 return -EACCES;
1577 }
1578 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1579 auth_method);
1580 if (!ah) {
1581 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1582 << auth_method << dendl;
1583 return -EOPNOTSUPP;
1584 }
eafe8130
TL
1585
1586 auto ac = &auth_meta->authorizer_challenge;
be3bc32f
TL
1587 if (auth_meta->skip_authorizer_challenge) {
1588 ldout(cct, 10) << __func__ << " skipping challenge on " << con << dendl;
eafe8130
TL
1589 ac = nullptr;
1590 }
1591
11fdf7f2
TL
1592 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1593 bool isvalid = ah->verify_authorizer(
1594 cct,
9f95a23c 1595 *rotating_secrets,
11fdf7f2
TL
1596 payload,
1597 auth_meta->get_connection_secret_length(),
1598 reply,
1599 &con->peer_name,
1600 &con->peer_global_id,
1601 &con->peer_caps_info,
1602 &auth_meta->session_key,
1603 &auth_meta->connection_secret,
eafe8130 1604 ac);
11fdf7f2
TL
1605 if (isvalid) {
1606 handle_authentication_dispatcher->ms_handle_authentication(con);
1607 return 1;
1608 }
1609 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1610 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1611 return 0;
1612 }
1613 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
9f95a23c
TL
1614 // discard old challenge
1615 auth_meta->authorizer_challenge.reset();
11fdf7f2
TL
1616 return -EACCES;
1617}
1618
7c673cae 1619AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
11fdf7f2 1620 std::lock_guard l(monc_lock);
7c673cae
FG
1621 if (auth) {
1622 return auth->build_authorizer(service_id);
1623 } else {
1624 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1625 << ", but no auth is available now" << dendl;
1626 return nullptr;
1627 }
1628}
1629
1630#define dout_subsys ceph_subsys_monc
1631#undef dout_prefix
1632#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1633
11fdf7f2
TL
1634MonConnection::MonConnection(
1635 CephContext *cct, ConnectionRef con, uint64_t global_id,
1636 AuthRegistry *ar)
1637 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
7c673cae
FG
1638{}
1639
1640MonConnection::~MonConnection()
1641{
1642 if (con) {
1643 con->mark_down();
1644 con.reset();
1645 }
1646}
1647
1648bool MonConnection::have_session() const
1649{
1650 return state == State::HAVE_SESSION;
1651}
1652
1653void MonConnection::start(epoch_t epoch,
11fdf7f2 1654 const EntityName& entity_name)
7c673cae 1655{
9f95a23c 1656 using ceph::encode;
11fdf7f2
TL
1657 auth_start = ceph_clock_now();
1658
1659 if (con->get_peer_addr().is_msgr2()) {
1660 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1661 state = State::AUTHENTICATING;
1662 con->send_message(new MMonGetMap());
1663 return;
1664 }
1665
7c673cae
FG
1666 // restart authentication handshake
1667 state = State::NEGOTIATING;
1668
1669 // send an initial keepalive to ensure our timestamp is valid by the
1670 // time we are in an OPENED state (by sequencing this before
1671 // authentication).
1672 con->send_keepalive();
1673
1674 auto m = new MAuth;
11fdf7f2 1675 m->protocol = CEPH_AUTH_UNKNOWN;
7c673cae
FG
1676 m->monmap_epoch = epoch;
1677 __u8 struct_v = 1;
11fdf7f2 1678 encode(struct_v, m->auth_payload);
9f95a23c 1679 std::vector<uint32_t> auth_supported;
11fdf7f2
TL
1680 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1681 encode(auth_supported, m->auth_payload);
1682 encode(entity_name, m->auth_payload);
1683 encode(global_id, m->auth_payload);
7c673cae
FG
1684 con->send_message(m);
1685}
1686
11fdf7f2
TL
1687int MonConnection::get_auth_request(
1688 uint32_t *method,
1689 std::vector<uint32_t> *preferred_modes,
9f95a23c 1690 ceph::buffer::list *bl,
11fdf7f2
TL
1691 const EntityName& entity_name,
1692 uint32_t want_keys,
1693 RotatingKeyRing* keyring)
1694{
9f95a23c 1695 using ceph::encode;
11fdf7f2
TL
1696 // choose method
1697 if (auth_method < 0) {
9f95a23c 1698 std::vector<uint32_t> as;
11fdf7f2
TL
1699 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1700 if (as.empty()) {
1701 return -EACCES;
1702 }
1703 auth_method = as.front();
1704 }
1705 *method = auth_method;
1706 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1707 preferred_modes);
1708 ldout(cct,10) << __func__ << " method " << *method
1709 << " preferred_modes " << *preferred_modes << dendl;
1710 if (preferred_modes->empty()) {
1711 return -EACCES;
1712 }
1713
11fdf7f2
TL
1714 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1715 ceph_assert(r == 0);
1716
1717 // initial requset includes some boilerplate...
1718 encode((char)AUTH_MODE_MON, *bl);
1719 encode(entity_name, *bl);
1720 encode(global_id, *bl);
1721
1722 // and (maybe) some method-specific initial payload
1723 auth->build_initial_request(bl);
1724
1725 return 0;
1726}
1727
1728int MonConnection::handle_auth_reply_more(
1729 AuthConnectionMeta *auth_meta,
9f95a23c
TL
1730 const ceph::buffer::list& bl,
1731 ceph::buffer::list *reply)
11fdf7f2
TL
1732{
1733 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1734 ldout(cct, 30) << __func__ << " got\n";
1735 bl.hexdump(*_dout);
1736 *_dout << dendl;
1737
1738 auto p = bl.cbegin();
1739 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1740 int r = auth->handle_response(0, p, &auth_meta->session_key,
1741 &auth_meta->connection_secret);
1742 if (r == -EAGAIN) {
1743 auth->prepare_build_request();
1744 auth->build_request(*reply);
1745 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1746 << " bytes" << dendl;
1747 r = 0;
1748 } else if (r < 0) {
1749 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1750 } else {
1751 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1752 // FIXME
1753 ceph_abort(cct, "write me");
1754 }
1755 return r;
1756}
1757
1758int MonConnection::handle_auth_done(
1759 AuthConnectionMeta *auth_meta,
1760 uint64_t new_global_id,
9f95a23c 1761 const ceph::buffer::list& bl,
11fdf7f2
TL
1762 CryptoKey *session_key,
1763 std::string *connection_secret)
1764{
1765 ldout(cct,10) << __func__ << " global_id " << new_global_id
1766 << " payload " << bl.length()
1767 << dendl;
1768 global_id = new_global_id;
1769 auth->set_global_id(global_id);
1770 auto p = bl.begin();
1771 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1772 &auth_meta->connection_secret);
1773 if (auth_err >= 0) {
1774 state = State::HAVE_SESSION;
1775 }
1776 con->set_last_keepalive_ack(auth_start);
9f95a23c
TL
1777
1778 if (pending_tell_command) {
1779 con->send_message2(std::move(pending_tell_command));
1780 }
11fdf7f2
TL
1781 return auth_err;
1782}
1783
1784int MonConnection::handle_auth_bad_method(
1785 uint32_t old_auth_method,
1786 int result,
1787 const std::vector<uint32_t>& allowed_methods,
1788 const std::vector<uint32_t>& allowed_modes)
1789{
1790 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1791 << " result " << cpp_strerror(result)
1792 << " allowed_methods " << allowed_methods << dendl;
9f95a23c 1793 std::vector<uint32_t> auth_supported;
11fdf7f2
TL
1794 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1795 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1796 old_auth_method);
1797 assert(p != auth_supported.end());
1798 p = std::find_first_of(std::next(p), auth_supported.end(),
1799 allowed_methods.begin(), allowed_methods.end());
1800 if (p == auth_supported.end()) {
1801 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1802 << " but i only support " << auth_supported << dendl;
1803 return -EACCES;
1804 }
1805 auth_method = *p;
1806 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1807 return 0;
1808}
1809
7c673cae
FG
1810int MonConnection::handle_auth(MAuthReply* m,
1811 const EntityName& entity_name,
1812 uint32_t want_keys,
1813 RotatingKeyRing* keyring)
1814{
1815 if (state == State::NEGOTIATING) {
1816 int r = _negotiate(m, entity_name, want_keys, keyring);
1817 if (r) {
1818 return r;
1819 }
1820 state = State::AUTHENTICATING;
1821 }
1822 int r = authenticate(m);
1823 if (!r) {
1824 state = State::HAVE_SESSION;
1825 }
1826 return r;
1827}
1828
1829int MonConnection::_negotiate(MAuthReply *m,
1830 const EntityName& entity_name,
1831 uint32_t want_keys,
1832 RotatingKeyRing* keyring)
1833{
11fdf7f2
TL
1834 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1835 if (r == -ENOTSUP) {
7c673cae
FG
1836 if (m->result == -ENOTSUP) {
1837 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1838 << dendl;
1839 }
1840 return m->result;
1841 }
11fdf7f2
TL
1842 return r;
1843}
1844
1845int MonConnection::_init_auth(
1846 uint32_t method,
1847 const EntityName& entity_name,
1848 uint32_t want_keys,
1849 RotatingKeyRing* keyring,
1850 bool msgr2)
1851{
c5c27e9a
TL
1852 ldout(cct, 10) << __func__ << " method " << method << dendl;
1853 if (auth && auth->get_protocol() == (int)method) {
1854 ldout(cct, 10) << __func__ << " already have auth, reseting" << dendl;
1855 auth->reset();
1856 return 0;
1857 }
1858
1859 ldout(cct, 10) << __func__ << " creating new auth" << dendl;
1860 auth.reset(AuthClientHandler::create(cct, method, keyring));
11fdf7f2
TL
1861 if (!auth) {
1862 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1863 return -ENOTSUP;
1864 }
7c673cae
FG
1865
1866 // do not request MGR key unless the mon has the SERVER_KRAKEN
1867 // feature. otherwise it will give us an auth error. note that
1868 // we have to use the FEATUREMASK because pre-jewel the kraken
1869 // feature bit was used for something else.
11fdf7f2
TL
1870 if (!msgr2 &&
1871 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1872 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
7c673cae
FG
1873 ldout(cct, 1) << __func__
1874 << " not requesting MGR keys from pre-kraken monitor"
1875 << dendl;
1876 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1877 }
1878 auth->set_want_keys(want_keys);
1879 auth->init(entity_name);
1880 auth->set_global_id(global_id);
1881 return 0;
1882}
1883
1884int MonConnection::authenticate(MAuthReply *m)
1885{
11fdf7f2 1886 ceph_assert(auth);
7c673cae
FG
1887 if (!m->global_id) {
1888 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1889 }
1890 if (m->global_id != global_id) {
1891 // it's a new session
1892 auth->reset();
1893 global_id = m->global_id;
1894 auth->set_global_id(global_id);
1895 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1896 }
11fdf7f2
TL
1897 auto p = m->result_bl.cbegin();
1898 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
7c673cae
FG
1899 if (ret == -EAGAIN) {
1900 auto ma = new MAuth;
1901 ma->protocol = auth->get_protocol();
1902 auth->prepare_build_request();
1903 auth->build_request(ma->auth_payload);
1904 con->send_message(ma);
1905 }
9f95a23c
TL
1906 if (ret == 0 && pending_tell_command) {
1907 con->send_message2(std::move(pending_tell_command));
1908 }
1909
7c673cae
FG
1910 return ret;
1911}
11fdf7f2
TL
1912
1913void MonClient::register_config_callback(md_config_t::config_callback fn) {
1914 ceph_assert(!config_cb);
1915 config_cb = fn;
1916}
1917
1918md_config_t::config_callback MonClient::get_config_callback() {
1919 return config_cb;
1920}
f67539c2
TL
1921
1922#pragma GCC diagnostic push
1923#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
1924#pragma clang diagnostic push
1925#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1926class monc_error_category : public ceph::converting_category {
1927public:
1928 monc_error_category(){}
1929 const char* name() const noexcept override;
1930 const char* message(int ev, char*, std::size_t) const noexcept override;
1931 std::string message(int ev) const override;
1932 bs::error_condition default_error_condition(int ev) const noexcept
1933 override;
1934 bool equivalent(int ev, const bs::error_condition& c) const
1935 noexcept override;
1936 using ceph::converting_category::equivalent;
1937 int from_code(int ev) const noexcept override;
1938};
1939#pragma GCC diagnostic pop
1940#pragma clang diagnostic pop
1941
1942const char* monc_error_category::name() const noexcept {
1943 return "monc";
1944}
1945
1946const char* monc_error_category::message(int ev, char*, std::size_t) const noexcept {
1947 if (ev == 0)
1948 return "No error";
1949
1950 switch (static_cast<monc_errc>(ev)) {
1951 case monc_errc::shutting_down: // Command failed due to MonClient shutting down
1952 return "Command failed due to MonClient shutting down";
1953 case monc_errc::session_reset:
1954 return "Monitor session was reset";
1955 case monc_errc::rank_dne:
1956 return "Requested monitor rank does not exist";
1957 case monc_errc::mon_dne:
1958 return "Requested monitor does not exist";
1959 case monc_errc::timed_out:
1960 return "Monitor operation timed out";
1961 case monc_errc::mon_unavailable:
1962 return "Monitor unavailable";
1963 }
1964
1965 return "Unknown error";
1966}
1967
1968std::string monc_error_category::message(int ev) const {
1969 return message(ev, nullptr, 0);
1970}
1971
1972bs::error_condition monc_error_category::default_error_condition(int ev) const noexcept {
1973 switch (static_cast<monc_errc>(ev)) {
1974 case monc_errc::shutting_down:
1975 return bs::errc::operation_canceled;
1976 case monc_errc::session_reset:
1977 return bs::errc::resource_unavailable_try_again;
1978 case monc_errc::rank_dne:
1979 [[fallthrough]];
1980 case monc_errc::mon_dne:
1981 return ceph::errc::not_in_map;
1982 case monc_errc::timed_out:
1983 return bs::errc::timed_out;
1984 case monc_errc::mon_unavailable:
1985 return bs::errc::no_such_device;
1986 }
1987 return { ev, *this };
1988}
1989
1990bool monc_error_category::equivalent(int ev, const bs::error_condition& c) const noexcept {
1991 switch (static_cast<monc_errc>(ev)) {
1992 case monc_errc::rank_dne:
1993 [[fallthrough]];
1994 case monc_errc::mon_dne:
1995 return c == bs::errc::no_such_file_or_directory;
1996 default:
1997 return default_error_condition(ev) == c;
1998 }
1999}
2000
2001int monc_error_category::from_code(int ev) const noexcept {
2002 if (ev == 0)
2003 return 0;
2004
2005 switch (static_cast<monc_errc>(ev)) {
2006 case monc_errc::shutting_down:
2007 return -ECANCELED;
2008 case monc_errc::session_reset:
2009 return -EAGAIN;
2010 case monc_errc::rank_dne:
2011 [[fallthrough]];
2012 case monc_errc::mon_dne:
2013 return -ENOENT;
2014 case monc_errc::timed_out:
2015 return -ETIMEDOUT;
2016 case monc_errc::mon_unavailable:
2017 return -ENXIO;
2018 }
2019 return -EDOM;
2020}
2021
2022const bs::error_category& monc_category() noexcept {
2023 static const monc_error_category c;
2024 return c;
2025}