]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonClient.cc
import ceph 14.2.5
[ceph.git] / ceph / src / mon / MonClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2
TL
15#include <algorithm>
16#include <iterator>
7c673cae
FG
17#include <random>
18
c07f9fc5 19#include "include/scope_guard.h"
11fdf7f2 20#include "include/stringify.h"
c07f9fc5 21
7c673cae
FG
22#include "messages/MMonGetMap.h"
23#include "messages/MMonGetVersion.h"
24#include "messages/MMonGetVersionReply.h"
25#include "messages/MMonMap.h"
11fdf7f2
TL
26#include "messages/MConfig.h"
27#include "messages/MGetConfig.h"
7c673cae
FG
28#include "messages/MAuth.h"
29#include "messages/MLogAck.h"
30#include "messages/MAuthReply.h"
31#include "messages/MMonCommand.h"
32#include "messages/MMonCommandAck.h"
33#include "messages/MPing.h"
34
35#include "messages/MMonSubscribe.h"
36#include "messages/MMonSubscribeAck.h"
37#include "common/errno.h"
11fdf7f2 38#include "common/hostname.h"
7c673cae
FG
39#include "common/LogClient.h"
40
41#include "MonClient.h"
42#include "MonMap.h"
43
44#include "auth/Auth.h"
45#include "auth/KeyRing.h"
46#include "auth/AuthClientHandler.h"
47#include "auth/AuthMethodList.h"
11fdf7f2 48#include "auth/AuthRegistry.h"
7c673cae
FG
49#include "auth/RotatingKeyRing.h"
50
7c673cae
FG
51#define dout_subsys ceph_subsys_monc
52#undef dout_prefix
53#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
54
55MonClient::MonClient(CephContext *cct_) :
56 Dispatcher(cct_),
11fdf7f2 57 AuthServer(cct_),
7c673cae
FG
58 messenger(NULL),
59 monc_lock("MonClient::monc_lock"),
31f18b77
FG
60 timer(cct_, monc_lock),
61 finisher(cct_),
7c673cae 62 initialized(false),
7c673cae
FG
63 log_client(NULL),
64 more_log_pending(false),
65 want_monmap(true),
66 had_a_connection(false),
c07f9fc5 67 reopen_interval_multiplier(
11fdf7f2 68 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
7c673cae
FG
69 last_mon_command_tid(0),
70 version_req_id(0)
11fdf7f2 71{}
7c673cae
FG
72
73MonClient::~MonClient()
74{
75}
76
77int MonClient::build_initial_monmap()
78{
79 ldout(cct, 10) << __func__ << dendl;
11fdf7f2
TL
80 int r = monmap.build_initial(cct, false, cerr);
81 ldout(cct,10) << "monmap:\n";
82 monmap.print(*_dout);
83 *_dout << dendl;
84 return r;
7c673cae
FG
85}
86
87int MonClient::get_monmap()
88{
89 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 90 std::lock_guard l(monc_lock);
7c673cae 91
11fdf7f2 92 sub.want("monmap", 0, 0);
7c673cae
FG
93 if (!_opened())
94 _reopen_session();
95
96 while (want_monmap)
97 map_cond.Wait(monc_lock);
98
99 ldout(cct, 10) << __func__ << " done" << dendl;
100 return 0;
101}
102
11fdf7f2 103int MonClient::get_monmap_and_config()
7c673cae
FG
104{
105 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 106 ceph_assert(!messenger);
7c673cae 107
11fdf7f2 108 int tries = 10;
7c673cae 109
11fdf7f2
TL
110 utime_t interval;
111 interval.set_from_double(cct->_conf->mon_client_hunt_interval);
7c673cae 112
11fdf7f2
TL
113 cct->init_crypto();
114 auto shutdown_crypto = make_scope_guard([this] {
115 cct->shutdown_crypto();
116 });
7c673cae 117
11fdf7f2
TL
118 int r = build_initial_monmap();
119 if (r < 0) {
120 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
121 return r;
7c673cae
FG
122 }
123
11fdf7f2
TL
124 messenger = Messenger::create_client_messenger(
125 cct, "temp_mon_client");
126 ceph_assert(messenger);
127 messenger->add_dispatcher_head(this);
128 messenger->start();
129 auto shutdown_msgr = make_scope_guard([this] {
7c673cae 130 messenger->shutdown();
11fdf7f2 131 messenger->wait();
7c673cae 132 delete messenger;
11fdf7f2
TL
133 messenger = nullptr;
134 if (!monmap.fsid.is_zero()) {
135 cct->_conf.set_val("fsid", stringify(monmap.fsid));
136 }
137 });
7c673cae 138
11fdf7f2
TL
139 while (tries-- > 0) {
140 r = init();
141 if (r < 0) {
142 return r;
143 }
144 r = authenticate(cct->_conf->client_mount_timeout);
145 if (r == -ETIMEDOUT) {
146 shutdown();
147 continue;
148 }
149 if (r < 0) {
150 break;
151 }
152 {
153 std::lock_guard l(monc_lock);
154 if (monmap.get_epoch() &&
155 !monmap.persistent_features.contains_all(
156 ceph::features::mon::FEATURE_MIMIC)) {
157 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
158 << dendl;
159 r = 0;
160 break;
161 }
162 while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
163 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
164 r = map_cond.WaitInterval(monc_lock, interval);
165 }
166 if (got_config) {
167 ldout(cct,10) << __func__ << " success" << dendl;
168 r = 0;
169 break;
170 }
171 }
172 lderr(cct) << __func__ << " failed to get config" << dendl;
173 shutdown();
174 continue;
175 }
7c673cae 176
11fdf7f2
TL
177 shutdown();
178 return r;
7c673cae
FG
179}
180
181
182/**
183 * Ping the monitor with id @p mon_id and set the resulting reply in
184 * the provided @p result_reply, if this last parameter is not NULL.
185 *
186 * So that we don't rely on the MonClient's default messenger, set up
187 * during connect(), we create our own messenger to comunicate with the
188 * specified monitor. This is advantageous in the following ways:
189 *
190 * - Isolate the ping procedure from the rest of the MonClient's operations,
191 * allowing us to not acquire or manage the big monc_lock, thus not
192 * having to block waiting for some other operation to finish before we
193 * can proceed.
194 * * for instance, we can ping mon.FOO even if we are currently hunting
195 * or blocked waiting for auth to complete with mon.BAR.
196 *
197 * - Ping a monitor prior to establishing a connection (using connect())
198 * and properly establish the MonClient's messenger. This frees us
199 * from dealing with the complex foo that happens in connect().
200 *
201 * We also don't rely on MonClient as a dispatcher for this messenger,
202 * unlike what happens with the MonClient's default messenger. This allows
203 * us to sandbox the whole ping, having it much as a separate entity in
204 * the MonClient class, considerably simplifying the handling and dispatching
205 * of messages without needing to consider monc_lock.
206 *
207 * Current drawback is that we will establish a messenger for each ping
208 * we want to issue, instead of keeping a single messenger instance that
209 * would be used for all pings.
210 */
211int MonClient::ping_monitor(const string &mon_id, string *result_reply)
212{
213 ldout(cct, 10) << __func__ << dendl;
214
215 string new_mon_id;
216 if (monmap.contains("noname-"+mon_id)) {
217 new_mon_id = "noname-"+mon_id;
218 } else {
219 new_mon_id = mon_id;
220 }
221
222 if (new_mon_id.empty()) {
223 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
224 return -EINVAL;
225 } else if (!monmap.contains(new_mon_id)) {
226 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
227 << dendl;
228 return -ENOENT;
229 }
230
11fdf7f2
TL
231 // N.B. monc isn't initialized
232
233 auth_registry.refresh_config();
234
235 KeyRing keyring;
236 keyring.from_ceph_context(cct);
237 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
238
239 MonClientPinger *pinger = new MonClientPinger(cct,
240 &rkeyring,
241 result_reply);
7c673cae
FG
242
243 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
244 smsgr->add_dispatcher_head(pinger);
11fdf7f2 245 smsgr->set_auth_client(pinger);
7c673cae
FG
246 smsgr->start();
247
11fdf7f2 248 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
7c673cae
FG
249 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
250 << " " << con->get_peer_addr() << dendl;
11fdf7f2
TL
251
252 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
253 pinger->mc->start(monmap.get_epoch(), entity_name);
7c673cae
FG
254 con->send_message(new MPing);
255
256 pinger->lock.Lock();
11fdf7f2 257 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
7c673cae
FG
258 if (ret == 0) {
259 ldout(cct,10) << __func__ << " got ping reply" << dendl;
260 } else {
261 ret = -ret;
262 }
263 pinger->lock.Unlock();
264
265 con->mark_down();
11fdf7f2 266 pinger->mc.reset();
7c673cae
FG
267 smsgr->shutdown();
268 smsgr->wait();
269 delete smsgr;
270 delete pinger;
271 return ret;
272}
273
274bool MonClient::ms_dispatch(Message *m)
275{
7c673cae
FG
276 // we only care about these message types
277 switch (m->get_type()) {
278 case CEPH_MSG_MON_MAP:
279 case CEPH_MSG_AUTH_REPLY:
280 case CEPH_MSG_MON_SUBSCRIBE_ACK:
281 case CEPH_MSG_MON_GET_VERSION_REPLY:
282 case MSG_MON_COMMAND_ACK:
283 case MSG_LOGACK:
11fdf7f2 284 case MSG_CONFIG:
7c673cae 285 break;
11fdf7f2
TL
286 case CEPH_MSG_PING:
287 m->put();
288 return true;
7c673cae
FG
289 default:
290 return false;
291 }
292
11fdf7f2 293 std::lock_guard lock(monc_lock);
7c673cae
FG
294
295 if (_hunting()) {
11fdf7f2
TL
296 auto p = _find_pending_con(m->get_connection());
297 if (p == pending_cons.end()) {
7c673cae
FG
298 // ignore any messages outside hunting sessions
299 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
300 m->put();
301 return true;
302 }
303 } else if (!active_con || active_con->get_con() != m->get_connection()) {
304 // ignore any messages outside our session(s)
305 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
306 m->put();
307 return true;
308 }
309
310 switch (m->get_type()) {
311 case CEPH_MSG_MON_MAP:
312 handle_monmap(static_cast<MMonMap*>(m));
31f18b77
FG
313 if (passthrough_monmap) {
314 return false;
315 } else {
316 m->put();
317 }
7c673cae
FG
318 break;
319 case CEPH_MSG_AUTH_REPLY:
320 handle_auth(static_cast<MAuthReply*>(m));
321 break;
322 case CEPH_MSG_MON_SUBSCRIBE_ACK:
323 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
324 break;
325 case CEPH_MSG_MON_GET_VERSION_REPLY:
326 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
327 break;
328 case MSG_MON_COMMAND_ACK:
329 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
330 break;
331 case MSG_LOGACK:
332 if (log_client) {
333 log_client->handle_log_ack(static_cast<MLogAck*>(m));
334 m->put();
335 if (more_log_pending) {
336 send_log();
337 }
338 } else {
339 m->put();
340 }
341 break;
11fdf7f2
TL
342 case MSG_CONFIG:
343 handle_config(static_cast<MConfig*>(m));
344 break;
7c673cae
FG
345 }
346 return true;
347}
348
349void MonClient::send_log(bool flush)
350{
351 if (log_client) {
352 Message *lm = log_client->get_mon_log_message(flush);
353 if (lm)
354 _send_mon_message(lm);
355 more_log_pending = log_client->are_pending();
356 }
357}
358
359void MonClient::flush_log()
360{
11fdf7f2 361 std::lock_guard l(monc_lock);
7c673cae
FG
362 send_log();
363}
364
31f18b77
FG
365/* Unlike all the other message-handling functions, we don't put away a reference
366* because we want to support MMonMap passthrough to other Dispatchers. */
7c673cae
FG
367void MonClient::handle_monmap(MMonMap *m)
368{
369 ldout(cct, 10) << __func__ << " " << *m << dendl;
11fdf7f2
TL
370 auto con_addrs = m->get_source_addrs();
371 string old_name = monmap.get_name(con_addrs);
7c673cae 372
11fdf7f2
TL
373 // NOTE: we're not paying attention to the epoch, here.
374 auto p = m->monmapbl.cbegin();
375 decode(monmap, p);
7c673cae
FG
376
377 ldout(cct, 10) << " got monmap " << monmap.epoch
11fdf7f2
TL
378 << " from mon." << old_name
379 << " (according to old e" << monmap.get_epoch() << ")"
380 << dendl;
7c673cae
FG
381 ldout(cct, 10) << "dump:\n";
382 monmap.print(*_dout);
383 *_dout << dendl;
384
11fdf7f2
TL
385 if (old_name.size() == 0) {
386 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
7c673cae 387 _reopen_session();
11fdf7f2
TL
388 } else {
389 int new_rank = monmap.get_rank(m->get_source_addr());
390 if (new_rank < 0) {
391 ldout(cct, 10) << "mon." << new_rank << " at " << m->get_source_addrs()
392 << " went away" << dendl;
393 // can't find the mon we were talking to (above)
394 _reopen_session();
395 } else if (messenger->should_use_msgr2() &&
396 monmap.get_addrs(new_rank).has_msgr2() &&
397 !con_addrs.has_msgr2()) {
398 ldout(cct,1) << " mon." << new_rank << " has (v2) addrs "
399 << monmap.get_addrs(new_rank) << " but i'm connected to "
400 << con_addrs << ", reconnecting" << dendl;
401 _reopen_session();
402 }
7c673cae
FG
403 }
404
11fdf7f2 405 sub.got("monmap", monmap.get_epoch());
7c673cae
FG
406 map_cond.Signal();
407 want_monmap = false;
11fdf7f2
TL
408
409 if (authenticate_err == 1) {
410 _finish_auth(0);
411 }
412}
413
414void MonClient::handle_config(MConfig *m)
415{
416 ldout(cct,10) << __func__ << " " << *m << dendl;
417 finisher.queue(new FunctionContext([this, m](int r) {
418 cct->_conf.set_mon_vals(cct, m->config, config_cb);
419 if (config_notify_cb) {
420 config_notify_cb();
421 }
422 m->put();
423 }));
424 got_config = true;
425 map_cond.Signal();
7c673cae
FG
426}
427
428// ----------------------
429
430int MonClient::init()
431{
432 ldout(cct, 10) << __func__ << dendl;
433
7c673cae
FG
434 entity_name = cct->_conf->name;
435
11fdf7f2
TL
436 auth_registry.refresh_config();
437
438 std::lock_guard l(monc_lock);
439 keyring.reset(new KeyRing);
440 if (auth_registry.is_supported_method(messenger->get_mytype(),
441 CEPH_AUTH_CEPHX)) {
442 // this should succeed, because auth_registry just checked!
443 int r = keyring->from_ceph_context(cct);
444 if (r != 0) {
445 // but be somewhat graceful in case there was a race condition
446 lderr(cct) << "keyring not found" << dendl;
447 return r;
7c673cae
FG
448 }
449 }
11fdf7f2
TL
450 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
451 return -ENOENT;
7c673cae
FG
452 }
453
454 rotating_secrets.reset(
455 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
456
457 initialized = true;
458
11fdf7f2
TL
459 messenger->set_auth_client(this);
460 messenger->add_dispatcher_head(this);
461
7c673cae
FG
462 timer.init();
463 finisher.start();
464 schedule_tick();
465
466 return 0;
467}
468
469void MonClient::shutdown()
470{
471 ldout(cct, 10) << __func__ << dendl;
472 monc_lock.Lock();
11fdf7f2 473 stopping = true;
7c673cae
FG
474 while (!version_requests.empty()) {
475 version_requests.begin()->second->context->complete(-ECANCELED);
476 ldout(cct, 20) << __func__ << " canceling and discarding version request "
477 << version_requests.begin()->second << dendl;
478 delete version_requests.begin()->second;
479 version_requests.erase(version_requests.begin());
480 }
31f18b77
FG
481 while (!mon_commands.empty()) {
482 auto tid = mon_commands.begin()->first;
483 _cancel_mon_command(tid);
484 }
7c673cae
FG
485 while (!waiting_for_session.empty()) {
486 ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
487 waiting_for_session.front()->put();
488 waiting_for_session.pop_front();
489 }
490
491 active_con.reset();
492 pending_cons.clear();
493 auth.reset();
494
495 monc_lock.Unlock();
496
497 if (initialized) {
498 finisher.wait_for_empty();
499 finisher.stop();
11fdf7f2 500 initialized = false;
7c673cae
FG
501 }
502 monc_lock.Lock();
503 timer.shutdown();
11fdf7f2 504 stopping = false;
7c673cae
FG
505 monc_lock.Unlock();
506}
507
508int MonClient::authenticate(double timeout)
509{
11fdf7f2 510 std::lock_guard lock(monc_lock);
7c673cae
FG
511
512 if (active_con) {
513 ldout(cct, 5) << "already authenticated" << dendl;
514 return 0;
515 }
11fdf7f2
TL
516 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
517 sub.want("config", 0, 0);
7c673cae
FG
518 if (!_opened())
519 _reopen_session();
520
521 utime_t until = ceph_clock_now();
522 until += timeout;
523 if (timeout > 0.0)
524 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
11fdf7f2
TL
525 authenticate_err = 1; // == in progress
526 while (!active_con && authenticate_err >= 0) {
7c673cae
FG
527 if (timeout > 0.0) {
528 int r = auth_cond.WaitUntil(monc_lock, until);
11fdf7f2 529 if (r == ETIMEDOUT && !active_con) {
7c673cae
FG
530 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
531 authenticate_err = -r;
532 }
533 } else {
534 auth_cond.Wait(monc_lock);
535 }
536 }
537
538 if (active_con) {
539 ldout(cct, 5) << __func__ << " success, global_id "
540 << active_con->get_global_id() << dendl;
541 // active_con should not have been set if there was an error
11fdf7f2 542 ceph_assert(authenticate_err >= 0);
7c673cae
FG
543 authenticated = true;
544 }
545
11fdf7f2 546 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
7c673cae
FG
547 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
548 }
549
550 return authenticate_err;
551}
552
553void MonClient::handle_auth(MAuthReply *m)
554{
11fdf7f2 555 ceph_assert(monc_lock.is_locked());
7c673cae
FG
556 if (!_hunting()) {
557 std::swap(active_con->get_auth(), auth);
558 int ret = active_con->authenticate(m);
559 m->put();
560 std::swap(auth, active_con->get_auth());
561 if (global_id != active_con->get_global_id()) {
562 lderr(cct) << __func__ << " peer assigned me a different global_id: "
563 << active_con->get_global_id() << dendl;
564 }
565 if (ret != -EAGAIN) {
566 _finish_auth(ret);
567 }
568 return;
569 }
570
571 // hunting
11fdf7f2
TL
572 auto found = _find_pending_con(m->get_connection());
573 ceph_assert(found != pending_cons.end());
7c673cae
FG
574 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
575 rotating_secrets.get());
576 m->put();
577 if (auth_err == -EAGAIN) {
578 return;
579 }
580 if (auth_err) {
581 pending_cons.erase(found);
582 if (!pending_cons.empty()) {
583 // keep trying with pending connections
584 return;
585 }
586 // the last try just failed, give up.
587 } else {
588 auto& mc = found->second;
11fdf7f2 589 ceph_assert(mc.have_session());
7c673cae
FG
590 active_con.reset(new MonConnection(std::move(mc)));
591 pending_cons.clear();
592 }
593
11fdf7f2
TL
594 _finish_hunting(auth_err);
595 _finish_auth(auth_err);
596}
7c673cae 597
11fdf7f2
TL
598void MonClient::_finish_auth(int auth_err)
599{
600 ldout(cct,10) << __func__ << " " << auth_err << dendl;
601 authenticate_err = auth_err;
602 // _resend_mon_commands() could _reopen_session() if the connected mon is not
603 // the one the MonCommand is targeting.
604 if (!auth_err && active_con) {
605 ceph_assert(auth);
606 _check_auth_tickets();
7c673cae 607 }
11fdf7f2
TL
608 auth_cond.SignalAll();
609
7c673cae
FG
610 if (!auth_err) {
611 Context *cb = nullptr;
612 if (session_established_context) {
613 cb = session_established_context.release();
614 }
615 if (cb) {
616 monc_lock.Unlock();
617 cb->complete(0);
618 monc_lock.Lock();
619 }
620 }
621}
622
7c673cae
FG
623// ---------
624
625void MonClient::_send_mon_message(Message *m)
626{
11fdf7f2 627 ceph_assert(monc_lock.is_locked());
7c673cae
FG
628 if (active_con) {
629 auto cur_con = active_con->get_con();
630 ldout(cct, 10) << "_send_mon_message to mon."
631 << monmap.get_name(cur_con->get_peer_addr())
632 << " at " << cur_con->get_peer_addr() << dendl;
633 cur_con->send_message(m);
634 } else {
635 waiting_for_session.push_back(m);
636 }
637}
638
639void MonClient::_reopen_session(int rank)
640{
11fdf7f2 641 ceph_assert(monc_lock.is_locked());
7c673cae
FG
642 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
643
644 active_con.reset();
645 pending_cons.clear();
646
647 _start_hunting();
648
649 if (rank >= 0) {
650 _add_conn(rank, global_id);
651 } else {
652 _add_conns(global_id);
653 }
654
655 // throw out old queued messages
656 while (!waiting_for_session.empty()) {
657 waiting_for_session.front()->put();
658 waiting_for_session.pop_front();
659 }
660
661 // throw out version check requests
662 while (!version_requests.empty()) {
663 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
664 delete version_requests.begin()->second;
665 version_requests.erase(version_requests.begin());
666 }
667
668 for (auto& c : pending_cons) {
11fdf7f2 669 c.second.start(monmap.get_epoch(), entity_name);
7c673cae
FG
670 }
671
11fdf7f2 672 if (sub.reload()) {
7c673cae 673 _renew_subs();
11fdf7f2 674 }
7c673cae
FG
675}
676
677MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
678{
11fdf7f2
TL
679 auto peer = monmap.get_addrs(rank);
680 auto conn = messenger->connect_to_mon(peer);
681 MonConnection mc(cct, conn, global_id, &auth_registry);
7c673cae
FG
682 auto inserted = pending_cons.insert(make_pair(peer, move(mc)));
683 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
684 << " con " << conn
11fdf7f2 685 << " addr " << peer
7c673cae
FG
686 << dendl;
687 return inserted.first->second;
688}
689
690void MonClient::_add_conns(uint64_t global_id)
691{
224ce89b
WB
692 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
693 for (const auto& m : monmap.mon_info) {
694 if (m.second.priority < min_priority) {
695 min_priority = m.second.priority;
696 }
697 }
698 vector<unsigned> ranks;
699 for (const auto& m : monmap.mon_info) {
700 if (m.second.priority == min_priority) {
701 ranks.push_back(monmap.get_rank(m.first));
702 }
7c673cae
FG
703 }
704 std::random_device rd;
705 std::mt19937 rng(rd());
706 std::shuffle(ranks.begin(), ranks.end(), rng);
7c673cae 707 unsigned n = cct->_conf->mon_client_hunt_parallel;
224ce89b
WB
708 if (n == 0 || n > ranks.size()) {
709 n = ranks.size();
7c673cae
FG
710 }
711 for (unsigned i = 0; i < n; i++) {
712 _add_conn(ranks[i], global_id);
713 }
714}
715
716bool MonClient::ms_handle_reset(Connection *con)
717{
11fdf7f2 718 std::lock_guard lock(monc_lock);
7c673cae
FG
719
720 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
721 return false;
722
723 if (_hunting()) {
11fdf7f2
TL
724 if (pending_cons.count(con->get_peer_addrs())) {
725 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
726 << dendl;
7c673cae 727 } else {
11fdf7f2
TL
728 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
729 << dendl;
7c673cae
FG
730 }
731 return true;
732 } else {
733 if (active_con && con == active_con->get_con()) {
11fdf7f2
TL
734 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
735 << dendl;
7c673cae
FG
736 _reopen_session();
737 return false;
738 } else {
11fdf7f2
TL
739 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
740 << dendl;
7c673cae
FG
741 return true;
742 }
743 }
744}
745
746bool MonClient::_opened() const
747{
11fdf7f2 748 ceph_assert(monc_lock.is_locked());
7c673cae
FG
749 return active_con || _hunting();
750}
751
752bool MonClient::_hunting() const
753{
754 return !pending_cons.empty();
755}
756
757void MonClient::_start_hunting()
758{
11fdf7f2 759 ceph_assert(!_hunting());
7c673cae
FG
760 // adjust timeouts if necessary
761 if (!had_a_connection)
762 return;
763 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
764 if (reopen_interval_multiplier >
765 cct->_conf->mon_client_hunt_interval_max_multiple) {
766 reopen_interval_multiplier =
767 cct->_conf->mon_client_hunt_interval_max_multiple;
768 }
769}
770
11fdf7f2 771void MonClient::_finish_hunting(int auth_err)
7c673cae 772{
11fdf7f2
TL
773 ldout(cct,10) << __func__ << " " << auth_err << dendl;
774 ceph_assert(monc_lock.is_locked());
7c673cae 775 // the pending conns have been cleaned.
11fdf7f2 776 ceph_assert(!_hunting());
7c673cae
FG
777 if (active_con) {
778 auto con = active_con->get_con();
779 ldout(cct, 1) << "found mon."
780 << monmap.get_name(con->get_peer_addr())
781 << dendl;
782 } else {
783 ldout(cct, 1) << "no mon sessions established" << dendl;
784 }
785
786 had_a_connection = true;
c07f9fc5 787 _un_backoff();
11fdf7f2
TL
788
789 if (!auth_err) {
790 last_rotating_renew_sent = utime_t();
791 while (!waiting_for_session.empty()) {
792 _send_mon_message(waiting_for_session.front());
793 waiting_for_session.pop_front();
794 }
795 _resend_mon_commands();
796 send_log(true);
797 if (active_con) {
798 std::swap(auth, active_con->get_auth());
799 if (global_id && global_id != active_con->get_global_id()) {
800 lderr(cct) << __func__ << " global_id changed from " << global_id
801 << " to " << active_con->get_global_id() << dendl;
802 }
803 global_id = active_con->get_global_id();
804 }
805 }
7c673cae
FG
806}
807
808void MonClient::tick()
809{
810 ldout(cct, 10) << __func__ << dendl;
811
c07f9fc5
FG
812 auto reschedule_tick = make_scope_guard([this] {
813 schedule_tick();
814 });
815
7c673cae
FG
816 _check_auth_tickets();
817
818 if (_hunting()) {
819 ldout(cct, 1) << "continuing hunt" << dendl;
c07f9fc5 820 return _reopen_session();
7c673cae
FG
821 } else if (active_con) {
822 // just renew as needed
823 utime_t now = ceph_clock_now();
824 auto cur_con = active_con->get_con();
825 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
11fdf7f2
TL
826 const bool maybe_renew = sub.need_renew();
827 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
7c673cae 828 << dendl;
11fdf7f2 829 if (maybe_renew) {
7c673cae 830 _renew_subs();
11fdf7f2 831 }
7c673cae
FG
832 }
833
834 cur_con->send_keepalive();
835
836 if (cct->_conf->mon_client_ping_timeout > 0 &&
837 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
838 utime_t lk = cur_con->get_last_keepalive_ack();
839 utime_t interval = now - lk;
840 if (interval > cct->_conf->mon_client_ping_timeout) {
841 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
842 << " seconds), reconnecting" << dendl;
c07f9fc5 843 return _reopen_session();
7c673cae 844 }
7c673cae
FG
845 send_log();
846 }
c07f9fc5
FG
847
848 _un_backoff();
7c673cae 849 }
c07f9fc5 850}
7c673cae 851
c07f9fc5
FG
852void MonClient::_un_backoff()
853{
854 // un-backoff our reconnect interval
855 reopen_interval_multiplier = std::max(
11fdf7f2 856 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
c07f9fc5 857 reopen_interval_multiplier /
11fdf7f2 858 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
c07f9fc5
FG
859 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
860 << reopen_interval_multiplier << dendl;
7c673cae
FG
861}
862
863void MonClient::schedule_tick()
864{
11fdf7f2 865 auto do_tick = make_lambda_context([this]() { tick(); });
7c673cae 866 if (_hunting()) {
11fdf7f2
TL
867 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
868 reopen_interval_multiplier);
869 timer.add_event_after(hunt_interval, do_tick);
870 } else {
871 timer.add_event_after(cct->_conf->mon_client_ping_interval, do_tick);
872 }
7c673cae
FG
873}
874
875// ---------
876
877void MonClient::_renew_subs()
878{
11fdf7f2
TL
879 ceph_assert(monc_lock.is_locked());
880 if (!sub.have_new()) {
7c673cae
FG
881 ldout(cct, 10) << __func__ << " - empty" << dendl;
882 return;
883 }
884
885 ldout(cct, 10) << __func__ << dendl;
886 if (!_opened())
887 _reopen_session();
888 else {
7c673cae 889 MMonSubscribe *m = new MMonSubscribe;
11fdf7f2
TL
890 m->what = sub.get_subs();
891 m->hostname = ceph_get_short_hostname();
7c673cae 892 _send_mon_message(m);
11fdf7f2 893 sub.renewed();
7c673cae
FG
894 }
895}
896
897void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
898{
11fdf7f2 899 sub.acked(m->interval);
7c673cae
FG
900 m->put();
901}
902
903int MonClient::_check_auth_tickets()
904{
11fdf7f2 905 ceph_assert(monc_lock.is_locked());
7c673cae
FG
906 if (active_con && auth) {
907 if (auth->need_tickets()) {
908 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
909 MAuth *m = new MAuth;
910 m->protocol = auth->get_protocol();
911 auth->prepare_build_request();
912 auth->build_request(m->auth_payload);
913 _send_mon_message(m);
914 }
915
916 _check_auth_rotating();
917 }
918 return 0;
919}
920
921int MonClient::_check_auth_rotating()
922{
11fdf7f2 923 ceph_assert(monc_lock.is_locked());
7c673cae
FG
924 if (!rotating_secrets ||
925 !auth_principal_needs_rotating_keys(entity_name)) {
926 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
927 return 0;
928 }
929
930 if (!active_con || !auth) {
931 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
932 return 0;
933 }
934
935 utime_t now = ceph_clock_now();
936 utime_t cutoff = now;
11fdf7f2 937 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
7c673cae
FG
938 utime_t issued_at_lower_bound = now;
939 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
940 if (!rotating_secrets->need_new_secrets(cutoff)) {
941 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
942 rotating_secrets->dump_rotating();
943 return 0;
944 }
945
946 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
947 if (!rotating_secrets->need_new_secrets() &&
948 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
949 // the key has expired before it has been issued?
950 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
951 << " (before " << issued_at_lower_bound << ")" << dendl;
952 }
953 if ((now > last_rotating_renew_sent) &&
954 double(now - last_rotating_renew_sent) < 1) {
955 ldout(cct, 10) << __func__ << " called too often (last: "
956 << last_rotating_renew_sent << "), skipping refresh" << dendl;
957 return 0;
958 }
959 MAuth *m = new MAuth;
960 m->protocol = auth->get_protocol();
961 if (auth->build_rotating_request(m->auth_payload)) {
962 last_rotating_renew_sent = now;
963 _send_mon_message(m);
964 } else {
965 m->put();
966 }
967 return 0;
968}
969
970int MonClient::wait_auth_rotating(double timeout)
971{
11fdf7f2 972 std::lock_guard l(monc_lock);
7c673cae
FG
973 utime_t now = ceph_clock_now();
974 utime_t until = now;
975 until += timeout;
976
977 // Must be initialized
11fdf7f2 978 ceph_assert(auth != nullptr);
7c673cae
FG
979
980 if (auth->get_protocol() == CEPH_AUTH_NONE)
981 return 0;
982
983 if (!rotating_secrets)
984 return 0;
985
986 while (auth_principal_needs_rotating_keys(entity_name) &&
987 rotating_secrets->need_new_secrets(now)) {
988 if (now >= until) {
989 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
990 return -ETIMEDOUT;
991 }
992 ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
993 auth_cond.WaitUntil(monc_lock, until);
994 now = ceph_clock_now();
995 }
996 ldout(cct, 10) << __func__ << " done" << dendl;
997 return 0;
998}
999
1000// ---------
1001
1002void MonClient::_send_command(MonCommand *r)
1003{
eafe8130
TL
1004 ++r->send_attempts;
1005
7c673cae
FG
1006 entity_addr_t peer;
1007 if (active_con) {
1008 peer = active_con->get_con()->get_peer_addr();
1009 }
1010
1011 if (r->target_rank >= 0 &&
1012 r->target_rank != monmap.get_rank(peer)) {
eafe8130
TL
1013 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1014 _finish_command(r, -ENXIO, "mon unavailable");
1015 return;
1016 }
7c673cae
FG
1017 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1018 << " wants rank " << r->target_rank
1019 << ", reopening session"
1020 << dendl;
1021 if (r->target_rank >= (int)monmap.size()) {
1022 ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
1023 _finish_command(r, -ENOENT, "mon rank dne");
1024 return;
1025 }
1026 _reopen_session(r->target_rank);
1027 return;
1028 }
1029
1030 if (r->target_name.length() &&
1031 r->target_name != monmap.get_name(peer)) {
eafe8130
TL
1032 if (r->send_attempts > cct->_conf->mon_client_directed_command_retry) {
1033 _finish_command(r, -ENXIO, "mon unavailable");
1034 return;
1035 }
7c673cae
FG
1036 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1037 << " wants mon " << r->target_name
1038 << ", reopening session"
1039 << dendl;
1040 if (!monmap.contains(r->target_name)) {
1041 ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
1042 _finish_command(r, -ENOENT, "mon dne");
1043 return;
1044 }
1045 _reopen_session(monmap.get_rank(r->target_name));
1046 return;
1047 }
1048
1049 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1050 MMonCommand *m = new MMonCommand(monmap.fsid);
1051 m->set_tid(r->tid);
1052 m->cmd = r->cmd;
1053 m->set_data(r->inbl);
1054 _send_mon_message(m);
1055 return;
1056}
1057
1058void MonClient::_resend_mon_commands()
1059{
1060 // resend any requests
eafe8130
TL
1061 map<uint64_t,MonCommand*>::iterator p = mon_commands.begin();
1062 while (p != mon_commands.end()) {
1063 auto cmd = p->second;
1064 ++p;
1065 _send_command(cmd); // might remove cmd from mon_commands
7c673cae
FG
1066 }
1067}
1068
1069void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1070{
1071 MonCommand *r = NULL;
1072 uint64_t tid = ack->get_tid();
1073
1074 if (tid == 0 && !mon_commands.empty()) {
1075 r = mon_commands.begin()->second;
1076 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1077 } else {
1078 map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid);
1079 if (p == mon_commands.end()) {
1080 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1081 ack->put();
1082 return;
1083 }
1084 r = p->second;
1085 }
1086
1087 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1088 if (r->poutbl)
1089 r->poutbl->claim(ack->get_data());
1090 _finish_command(r, ack->r, ack->rs);
1091 ack->put();
1092}
1093
31f18b77 1094int MonClient::_cancel_mon_command(uint64_t tid)
7c673cae 1095{
11fdf7f2 1096 ceph_assert(monc_lock.is_locked());
7c673cae
FG
1097
1098 map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid);
1099 if (it == mon_commands.end()) {
1100 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1101 return -ENOENT;
1102 }
1103
1104 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1105
1106 MonCommand *cmd = it->second;
1107 _finish_command(cmd, -ETIMEDOUT, "");
1108 return 0;
1109}
1110
1111void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1112{
1113 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1114 if (r->prval)
1115 *(r->prval) = ret;
1116 if (r->prs)
1117 *(r->prs) = rs;
1118 if (r->onfinish)
1119 finisher.queue(r->onfinish, ret);
1120 mon_commands.erase(r->tid);
1121 delete r;
1122}
1123
1124void MonClient::start_mon_command(const vector<string>& cmd,
1125 const bufferlist& inbl,
1126 bufferlist *outbl, string *outs,
1127 Context *onfinish)
1128{
eafe8130 1129 ldout(cct,10) << __func__ << " cmd=" << cmd << dendl;
11fdf7f2
TL
1130 std::lock_guard l(monc_lock);
1131 if (!initialized || stopping) {
eafe8130
TL
1132 if (onfinish) {
1133 onfinish->complete(-ECANCELED);
1134 }
11fdf7f2
TL
1135 return;
1136 }
7c673cae
FG
1137 MonCommand *r = new MonCommand(++last_mon_command_tid);
1138 r->cmd = cmd;
1139 r->inbl = inbl;
1140 r->poutbl = outbl;
1141 r->prs = outs;
1142 r->onfinish = onfinish;
1143 if (cct->_conf->rados_mon_op_timeout > 0) {
1144 class C_CancelMonCommand : public Context
1145 {
1146 uint64_t tid;
1147 MonClient *monc;
1148 public:
1149 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1150 void finish(int r) override {
31f18b77 1151 monc->_cancel_mon_command(tid);
7c673cae
FG
1152 }
1153 };
1154 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1155 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1156 }
1157 mon_commands[r->tid] = r;
1158 _send_command(r);
1159}
1160
1161void MonClient::start_mon_command(const string &mon_name,
1162 const vector<string>& cmd,
1163 const bufferlist& inbl,
1164 bufferlist *outbl, string *outs,
1165 Context *onfinish)
1166{
eafe8130 1167 ldout(cct,10) << __func__ << " mon." << mon_name << " cmd=" << cmd << dendl;
11fdf7f2
TL
1168 std::lock_guard l(monc_lock);
1169 if (!initialized || stopping) {
eafe8130
TL
1170 if (onfinish) {
1171 onfinish->complete(-ECANCELED);
1172 }
11fdf7f2
TL
1173 return;
1174 }
7c673cae
FG
1175 MonCommand *r = new MonCommand(++last_mon_command_tid);
1176 r->target_name = mon_name;
1177 r->cmd = cmd;
1178 r->inbl = inbl;
1179 r->poutbl = outbl;
1180 r->prs = outs;
1181 r->onfinish = onfinish;
1182 mon_commands[r->tid] = r;
1183 _send_command(r);
1184}
1185
1186void MonClient::start_mon_command(int rank,
1187 const vector<string>& cmd,
1188 const bufferlist& inbl,
1189 bufferlist *outbl, string *outs,
1190 Context *onfinish)
1191{
eafe8130 1192 ldout(cct,10) << __func__ << " rank " << rank << " cmd=" << cmd << dendl;
11fdf7f2
TL
1193 std::lock_guard l(monc_lock);
1194 if (!initialized || stopping) {
eafe8130
TL
1195 if (onfinish) {
1196 onfinish->complete(-ECANCELED);
1197 }
11fdf7f2
TL
1198 return;
1199 }
7c673cae
FG
1200 MonCommand *r = new MonCommand(++last_mon_command_tid);
1201 r->target_rank = rank;
1202 r->cmd = cmd;
1203 r->inbl = inbl;
1204 r->poutbl = outbl;
1205 r->prs = outs;
1206 r->onfinish = onfinish;
1207 mon_commands[r->tid] = r;
1208 _send_command(r);
1209}
1210
1211// ---------
1212
1213void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1214{
1215 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1216 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
11fdf7f2 1217 std::lock_guard l(monc_lock);
7c673cae
FG
1218 MMonGetVersion *m = new MMonGetVersion();
1219 m->what = map;
1220 m->handle = ++version_req_id;
1221 version_requests[m->handle] = req;
1222 _send_mon_message(m);
1223}
1224
1225void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1226{
11fdf7f2 1227 ceph_assert(monc_lock.is_locked());
7c673cae
FG
1228 map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
1229 if (iter == version_requests.end()) {
1230 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1231 << " not found" << dendl;
1232 } else {
1233 version_req_d *req = iter->second;
1234 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1235 version_requests.erase(iter);
1236 if (req->newest)
1237 *req->newest = m->version;
1238 if (req->oldest)
1239 *req->oldest = m->oldest_version;
1240 finisher.queue(req->context, 0);
1241 delete req;
1242 }
1243 m->put();
1244}
1245
11fdf7f2
TL
1246int MonClient::get_auth_request(
1247 Connection *con,
1248 AuthConnectionMeta *auth_meta,
1249 uint32_t *auth_method,
1250 std::vector<uint32_t> *preferred_modes,
1251 bufferlist *bl)
1252{
1253 std::lock_guard l(monc_lock);
1254 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1255 << dendl;
1256
1257 // connection to mon?
1258 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1259 ceph_assert(!auth_meta->authorizer);
1260 for (auto& i : pending_cons) {
1261 if (i.second.is_con(con)) {
1262 return i.second.get_auth_request(
1263 auth_method, preferred_modes, bl,
1264 entity_name, want_keys, rotating_secrets.get());
1265 }
1266 }
1267 return -ENOENT;
1268 }
1269
1270 // generate authorizer
1271 if (!auth) {
1272 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1273 return -EACCES;
1274 }
1275 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1276 if (!auth_meta->authorizer) {
1277 lderr(cct) << __func__ << " failed to build_authorizer for type "
1278 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1279 return -EACCES;
1280 }
1281 auth_meta->auth_method = auth_meta->authorizer->protocol;
1282 auth_registry.get_supported_modes(con->get_peer_type(),
1283 auth_meta->auth_method,
1284 preferred_modes);
1285 *bl = auth_meta->authorizer->bl;
1286 return 0;
1287}
1288
1289int MonClient::handle_auth_reply_more(
1290 Connection *con,
1291 AuthConnectionMeta *auth_meta,
1292 const bufferlist& bl,
1293 bufferlist *reply)
1294{
1295 std::lock_guard l(monc_lock);
1296
1297 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1298 for (auto& i : pending_cons) {
1299 if (i.second.is_con(con)) {
1300 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1301 }
1302 }
1303 return -ENOENT;
1304 }
1305
1306 // authorizer challenges
1307 if (!auth || !auth_meta->authorizer) {
1308 lderr(cct) << __func__ << " no authorizer?" << dendl;
1309 return -1;
1310 }
1311 auth_meta->authorizer->add_challenge(cct, bl);
1312 *reply = auth_meta->authorizer->bl;
1313 return 0;
1314}
1315
1316int MonClient::handle_auth_done(
1317 Connection *con,
1318 AuthConnectionMeta *auth_meta,
1319 uint64_t global_id,
1320 uint32_t con_mode,
1321 const bufferlist& bl,
1322 CryptoKey *session_key,
1323 std::string *connection_secret)
1324{
1325 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1326 std::lock_guard l(monc_lock);
1327 for (auto& i : pending_cons) {
1328 if (i.second.is_con(con)) {
1329 int r = i.second.handle_auth_done(
1330 auth_meta, global_id, bl,
1331 session_key, connection_secret);
1332 if (r) {
1333 pending_cons.erase(i.first);
1334 if (!pending_cons.empty()) {
1335 return r;
1336 }
1337 } else {
1338 active_con.reset(new MonConnection(std::move(i.second)));
1339 pending_cons.clear();
1340 ceph_assert(active_con->have_session());
1341 }
1342
1343 _finish_hunting(r);
1344 if (r || monmap.get_epoch() > 0) {
1345 _finish_auth(r);
1346 }
1347 return r;
1348 }
1349 }
1350 return -ENOENT;
1351 } else {
1352 // verify authorizer reply
1353 auto p = bl.begin();
1354 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1355 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1356 << dendl;
1357 return -EACCES;
1358 }
1359 auth_meta->session_key = auth_meta->authorizer->session_key;
1360 return 0;
1361 }
1362}
1363
1364int MonClient::handle_auth_bad_method(
1365 Connection *con,
1366 AuthConnectionMeta *auth_meta,
1367 uint32_t old_auth_method,
1368 int result,
1369 const std::vector<uint32_t>& allowed_methods,
1370 const std::vector<uint32_t>& allowed_modes)
1371{
1372 auth_meta->allowed_methods = allowed_methods;
1373
1374 std::lock_guard l(monc_lock);
1375 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1376 for (auto& i : pending_cons) {
1377 if (i.second.is_con(con)) {
1378 int r = i.second.handle_auth_bad_method(old_auth_method,
1379 result,
1380 allowed_methods,
1381 allowed_modes);
1382 if (r == 0) {
1383 return r; // try another method on this con
1384 }
1385 pending_cons.erase(i.first);
1386 if (!pending_cons.empty()) {
1387 return r; // fail this con, maybe another con will succeed
1388 }
1389 // fail hunt
1390 _finish_hunting(r);
1391 _finish_auth(r);
1392 return r;
1393 }
1394 }
1395 return -ENOENT;
1396 } else {
1397 // huh...
1398 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1399 << " result " << cpp_strerror(result)
1400 << " and auth is " << (auth ? auth->get_protocol() : 0)
1401 << dendl;
1402 return -EACCES;
1403 }
1404}
1405
1406int MonClient::handle_auth_request(
1407 Connection *con,
1408 AuthConnectionMeta *auth_meta,
1409 bool more,
1410 uint32_t auth_method,
1411 const bufferlist& payload,
1412 bufferlist *reply)
1413{
1414 auth_meta->auth_mode = payload[0];
1415 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1416 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1417 return -EACCES;
1418 }
1419 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1420 auth_method);
1421 if (!ah) {
1422 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1423 << auth_method << dendl;
1424 return -EOPNOTSUPP;
1425 }
eafe8130
TL
1426
1427 auto ac = &auth_meta->authorizer_challenge;
1428 if (!HAVE_FEATURE(con->get_features(), CEPHX_V2)) {
1429 if (cct->_conf->cephx_service_require_version >= 2) {
1430 ldout(cct,10) << __func__ << " client missing CEPHX_V2 ("
1431 << "cephx_service_requre_version = "
1432 << cct->_conf->cephx_service_require_version << ")" << dendl;
1433 return -EACCES;
1434 }
1435 ac = nullptr;
1436 }
1437
11fdf7f2
TL
1438 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1439 bool isvalid = ah->verify_authorizer(
1440 cct,
1441 rotating_secrets.get(),
1442 payload,
1443 auth_meta->get_connection_secret_length(),
1444 reply,
1445 &con->peer_name,
1446 &con->peer_global_id,
1447 &con->peer_caps_info,
1448 &auth_meta->session_key,
1449 &auth_meta->connection_secret,
eafe8130 1450 ac);
11fdf7f2
TL
1451 if (isvalid) {
1452 handle_authentication_dispatcher->ms_handle_authentication(con);
1453 return 1;
1454 }
1455 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1456 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1457 return 0;
1458 }
1459 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1460 return -EACCES;
1461}
1462
7c673cae 1463AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
11fdf7f2 1464 std::lock_guard l(monc_lock);
7c673cae
FG
1465 if (auth) {
1466 return auth->build_authorizer(service_id);
1467 } else {
1468 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1469 << ", but no auth is available now" << dendl;
1470 return nullptr;
1471 }
1472}
1473
// Logging for MonConnection: same subsystem, but the prefix reflects the
// connection's own session state via have_session().
#undef dout_subsys
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1477
11fdf7f2
TL
1478MonConnection::MonConnection(
1479 CephContext *cct, ConnectionRef con, uint64_t global_id,
1480 AuthRegistry *ar)
1481 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
7c673cae
FG
1482{}
1483
1484MonConnection::~MonConnection()
1485{
1486 if (con) {
1487 con->mark_down();
1488 con.reset();
1489 }
1490}
1491
1492bool MonConnection::have_session() const
1493{
1494 return state == State::HAVE_SESSION;
1495}
1496
1497void MonConnection::start(epoch_t epoch,
11fdf7f2 1498 const EntityName& entity_name)
7c673cae 1499{
11fdf7f2
TL
1500 auth_start = ceph_clock_now();
1501
1502 if (con->get_peer_addr().is_msgr2()) {
1503 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1504 state = State::AUTHENTICATING;
1505 con->send_message(new MMonGetMap());
1506 return;
1507 }
1508
7c673cae
FG
1509 // restart authentication handshake
1510 state = State::NEGOTIATING;
1511
1512 // send an initial keepalive to ensure our timestamp is valid by the
1513 // time we are in an OPENED state (by sequencing this before
1514 // authentication).
1515 con->send_keepalive();
1516
1517 auto m = new MAuth;
11fdf7f2 1518 m->protocol = CEPH_AUTH_UNKNOWN;
7c673cae
FG
1519 m->monmap_epoch = epoch;
1520 __u8 struct_v = 1;
11fdf7f2
TL
1521 encode(struct_v, m->auth_payload);
1522 vector<uint32_t> auth_supported;
1523 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1524 encode(auth_supported, m->auth_payload);
1525 encode(entity_name, m->auth_payload);
1526 encode(global_id, m->auth_payload);
7c673cae
FG
1527 con->send_message(m);
1528}
1529
11fdf7f2
TL
1530int MonConnection::get_auth_request(
1531 uint32_t *method,
1532 std::vector<uint32_t> *preferred_modes,
1533 bufferlist *bl,
1534 const EntityName& entity_name,
1535 uint32_t want_keys,
1536 RotatingKeyRing* keyring)
1537{
1538 // choose method
1539 if (auth_method < 0) {
1540 vector<uint32_t> as;
1541 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1542 if (as.empty()) {
1543 return -EACCES;
1544 }
1545 auth_method = as.front();
1546 }
1547 *method = auth_method;
1548 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1549 preferred_modes);
1550 ldout(cct,10) << __func__ << " method " << *method
1551 << " preferred_modes " << *preferred_modes << dendl;
1552 if (preferred_modes->empty()) {
1553 return -EACCES;
1554 }
1555
1556 if (auth) {
1557 auth.reset();
1558 }
1559 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1560 ceph_assert(r == 0);
1561
1562 // initial requset includes some boilerplate...
1563 encode((char)AUTH_MODE_MON, *bl);
1564 encode(entity_name, *bl);
1565 encode(global_id, *bl);
1566
1567 // and (maybe) some method-specific initial payload
1568 auth->build_initial_request(bl);
1569
1570 return 0;
1571}
1572
1573int MonConnection::handle_auth_reply_more(
1574 AuthConnectionMeta *auth_meta,
1575 const bufferlist& bl,
1576 bufferlist *reply)
1577{
1578 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1579 ldout(cct, 30) << __func__ << " got\n";
1580 bl.hexdump(*_dout);
1581 *_dout << dendl;
1582
1583 auto p = bl.cbegin();
1584 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1585 int r = auth->handle_response(0, p, &auth_meta->session_key,
1586 &auth_meta->connection_secret);
1587 if (r == -EAGAIN) {
1588 auth->prepare_build_request();
1589 auth->build_request(*reply);
1590 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1591 << " bytes" << dendl;
1592 r = 0;
1593 } else if (r < 0) {
1594 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1595 } else {
1596 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1597 // FIXME
1598 ceph_abort(cct, "write me");
1599 }
1600 return r;
1601}
1602
1603int MonConnection::handle_auth_done(
1604 AuthConnectionMeta *auth_meta,
1605 uint64_t new_global_id,
1606 const bufferlist& bl,
1607 CryptoKey *session_key,
1608 std::string *connection_secret)
1609{
1610 ldout(cct,10) << __func__ << " global_id " << new_global_id
1611 << " payload " << bl.length()
1612 << dendl;
1613 global_id = new_global_id;
1614 auth->set_global_id(global_id);
1615 auto p = bl.begin();
1616 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1617 &auth_meta->connection_secret);
1618 if (auth_err >= 0) {
1619 state = State::HAVE_SESSION;
1620 }
1621 con->set_last_keepalive_ack(auth_start);
1622 return auth_err;
1623}
1624
1625int MonConnection::handle_auth_bad_method(
1626 uint32_t old_auth_method,
1627 int result,
1628 const std::vector<uint32_t>& allowed_methods,
1629 const std::vector<uint32_t>& allowed_modes)
1630{
1631 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1632 << " result " << cpp_strerror(result)
1633 << " allowed_methods " << allowed_methods << dendl;
1634 vector<uint32_t> auth_supported;
1635 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1636 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1637 old_auth_method);
1638 assert(p != auth_supported.end());
1639 p = std::find_first_of(std::next(p), auth_supported.end(),
1640 allowed_methods.begin(), allowed_methods.end());
1641 if (p == auth_supported.end()) {
1642 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1643 << " but i only support " << auth_supported << dendl;
1644 return -EACCES;
1645 }
1646 auth_method = *p;
1647 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1648 return 0;
1649}
1650
7c673cae
FG
1651int MonConnection::handle_auth(MAuthReply* m,
1652 const EntityName& entity_name,
1653 uint32_t want_keys,
1654 RotatingKeyRing* keyring)
1655{
1656 if (state == State::NEGOTIATING) {
1657 int r = _negotiate(m, entity_name, want_keys, keyring);
1658 if (r) {
1659 return r;
1660 }
1661 state = State::AUTHENTICATING;
1662 }
1663 int r = authenticate(m);
1664 if (!r) {
1665 state = State::HAVE_SESSION;
1666 }
1667 return r;
1668}
1669
1670int MonConnection::_negotiate(MAuthReply *m,
1671 const EntityName& entity_name,
1672 uint32_t want_keys,
1673 RotatingKeyRing* keyring)
1674{
1675 if (auth && (int)m->protocol == auth->get_protocol()) {
1676 // good, negotiation completed
1677 auth->reset();
1678 return 0;
1679 }
1680
11fdf7f2
TL
1681 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1682 if (r == -ENOTSUP) {
7c673cae
FG
1683 if (m->result == -ENOTSUP) {
1684 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1685 << dendl;
1686 }
1687 return m->result;
1688 }
11fdf7f2
TL
1689 return r;
1690}
1691
1692int MonConnection::_init_auth(
1693 uint32_t method,
1694 const EntityName& entity_name,
1695 uint32_t want_keys,
1696 RotatingKeyRing* keyring,
1697 bool msgr2)
1698{
1699 ldout(cct,10) << __func__ << " method " << method << dendl;
1700 auth.reset(
1701 AuthClientHandler::create(cct, method, keyring));
1702 if (!auth) {
1703 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1704 return -ENOTSUP;
1705 }
7c673cae
FG
1706
1707 // do not request MGR key unless the mon has the SERVER_KRAKEN
1708 // feature. otherwise it will give us an auth error. note that
1709 // we have to use the FEATUREMASK because pre-jewel the kraken
1710 // feature bit was used for something else.
11fdf7f2
TL
1711 if (!msgr2 &&
1712 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1713 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
7c673cae
FG
1714 ldout(cct, 1) << __func__
1715 << " not requesting MGR keys from pre-kraken monitor"
1716 << dendl;
1717 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1718 }
1719 auth->set_want_keys(want_keys);
1720 auth->init(entity_name);
1721 auth->set_global_id(global_id);
1722 return 0;
1723}
1724
1725int MonConnection::authenticate(MAuthReply *m)
1726{
11fdf7f2 1727 ceph_assert(auth);
7c673cae
FG
1728 if (!m->global_id) {
1729 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1730 }
1731 if (m->global_id != global_id) {
1732 // it's a new session
1733 auth->reset();
1734 global_id = m->global_id;
1735 auth->set_global_id(global_id);
1736 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1737 }
11fdf7f2
TL
1738 auto p = m->result_bl.cbegin();
1739 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
7c673cae
FG
1740 if (ret == -EAGAIN) {
1741 auto ma = new MAuth;
1742 ma->protocol = auth->get_protocol();
1743 auth->prepare_build_request();
1744 auth->build_request(ma->auth_payload);
1745 con->send_message(ma);
1746 }
1747 return ret;
1748}
11fdf7f2
TL
1749
1750void MonClient::register_config_callback(md_config_t::config_callback fn) {
1751 ceph_assert(!config_cb);
1752 config_cb = fn;
1753}
1754
1755md_config_t::config_callback MonClient::get_config_callback() {
1756 return config_cb;
1757}