]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonClient.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mon / MonClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
11fdf7f2
TL
15#include <algorithm>
16#include <iterator>
7c673cae
FG
17#include <random>
18
c07f9fc5 19#include "include/scope_guard.h"
11fdf7f2 20#include "include/stringify.h"
c07f9fc5 21
7c673cae
FG
22#include "messages/MMonGetMap.h"
23#include "messages/MMonGetVersion.h"
24#include "messages/MMonGetVersionReply.h"
25#include "messages/MMonMap.h"
11fdf7f2
TL
26#include "messages/MConfig.h"
27#include "messages/MGetConfig.h"
7c673cae
FG
28#include "messages/MAuth.h"
29#include "messages/MLogAck.h"
30#include "messages/MAuthReply.h"
31#include "messages/MMonCommand.h"
32#include "messages/MMonCommandAck.h"
33#include "messages/MPing.h"
34
35#include "messages/MMonSubscribe.h"
36#include "messages/MMonSubscribeAck.h"
37#include "common/errno.h"
11fdf7f2 38#include "common/hostname.h"
7c673cae
FG
39#include "common/LogClient.h"
40
41#include "MonClient.h"
42#include "MonMap.h"
43
44#include "auth/Auth.h"
45#include "auth/KeyRing.h"
46#include "auth/AuthClientHandler.h"
47#include "auth/AuthMethodList.h"
11fdf7f2 48#include "auth/AuthRegistry.h"
7c673cae
FG
49#include "auth/RotatingKeyRing.h"
50
7c673cae
FG
51#define dout_subsys ceph_subsys_monc
52#undef dout_prefix
53#define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
54
55MonClient::MonClient(CephContext *cct_) :
56 Dispatcher(cct_),
11fdf7f2 57 AuthServer(cct_),
7c673cae
FG
58 messenger(NULL),
59 monc_lock("MonClient::monc_lock"),
31f18b77
FG
60 timer(cct_, monc_lock),
61 finisher(cct_),
7c673cae 62 initialized(false),
7c673cae
FG
63 log_client(NULL),
64 more_log_pending(false),
65 want_monmap(true),
66 had_a_connection(false),
c07f9fc5 67 reopen_interval_multiplier(
11fdf7f2 68 cct_->_conf.get_val<double>("mon_client_hunt_interval_min_multiple")),
7c673cae
FG
69 last_mon_command_tid(0),
70 version_req_id(0)
11fdf7f2 71{}
7c673cae
FG
72
73MonClient::~MonClient()
74{
75}
76
77int MonClient::build_initial_monmap()
78{
79 ldout(cct, 10) << __func__ << dendl;
11fdf7f2
TL
80 int r = monmap.build_initial(cct, false, cerr);
81 ldout(cct,10) << "monmap:\n";
82 monmap.print(*_dout);
83 *_dout << dendl;
84 return r;
7c673cae
FG
85}
86
87int MonClient::get_monmap()
88{
89 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 90 std::lock_guard l(monc_lock);
7c673cae 91
11fdf7f2 92 sub.want("monmap", 0, 0);
7c673cae
FG
93 if (!_opened())
94 _reopen_session();
95
96 while (want_monmap)
97 map_cond.Wait(monc_lock);
98
99 ldout(cct, 10) << __func__ << " done" << dendl;
100 return 0;
101}
102
11fdf7f2 103int MonClient::get_monmap_and_config()
7c673cae
FG
104{
105 ldout(cct, 10) << __func__ << dendl;
11fdf7f2 106 ceph_assert(!messenger);
7c673cae 107
11fdf7f2 108 int tries = 10;
7c673cae 109
11fdf7f2
TL
110 utime_t interval;
111 interval.set_from_double(cct->_conf->mon_client_hunt_interval);
7c673cae 112
11fdf7f2
TL
113 cct->init_crypto();
114 auto shutdown_crypto = make_scope_guard([this] {
115 cct->shutdown_crypto();
116 });
7c673cae 117
11fdf7f2
TL
118 int r = build_initial_monmap();
119 if (r < 0) {
120 lderr(cct) << __func__ << " cannot identify monitors to contact" << dendl;
121 return r;
7c673cae
FG
122 }
123
11fdf7f2
TL
124 messenger = Messenger::create_client_messenger(
125 cct, "temp_mon_client");
126 ceph_assert(messenger);
127 messenger->add_dispatcher_head(this);
128 messenger->start();
129 auto shutdown_msgr = make_scope_guard([this] {
7c673cae 130 messenger->shutdown();
11fdf7f2 131 messenger->wait();
7c673cae 132 delete messenger;
11fdf7f2
TL
133 messenger = nullptr;
134 if (!monmap.fsid.is_zero()) {
135 cct->_conf.set_val("fsid", stringify(monmap.fsid));
136 }
137 });
7c673cae 138
11fdf7f2
TL
139 while (tries-- > 0) {
140 r = init();
141 if (r < 0) {
142 return r;
143 }
144 r = authenticate(cct->_conf->client_mount_timeout);
145 if (r == -ETIMEDOUT) {
146 shutdown();
147 continue;
148 }
149 if (r < 0) {
150 break;
151 }
152 {
153 std::lock_guard l(monc_lock);
154 if (monmap.get_epoch() &&
155 !monmap.persistent_features.contains_all(
156 ceph::features::mon::FEATURE_MIMIC)) {
157 ldout(cct,10) << __func__ << " pre-mimic monitor, no config to fetch"
158 << dendl;
159 r = 0;
160 break;
161 }
162 while ((!got_config || monmap.get_epoch() == 0) && r == 0) {
163 ldout(cct,20) << __func__ << " waiting for monmap|config" << dendl;
164 r = map_cond.WaitInterval(monc_lock, interval);
165 }
166 if (got_config) {
167 ldout(cct,10) << __func__ << " success" << dendl;
168 r = 0;
169 break;
170 }
171 }
172 lderr(cct) << __func__ << " failed to get config" << dendl;
173 shutdown();
174 continue;
175 }
7c673cae 176
11fdf7f2
TL
177 shutdown();
178 return r;
7c673cae
FG
179}
180
181
182/**
183 * Ping the monitor with id @p mon_id and set the resulting reply in
184 * the provided @p result_reply, if this last parameter is not NULL.
185 *
186 * So that we don't rely on the MonClient's default messenger, set up
187 * during connect(), we create our own messenger to comunicate with the
188 * specified monitor. This is advantageous in the following ways:
189 *
190 * - Isolate the ping procedure from the rest of the MonClient's operations,
191 * allowing us to not acquire or manage the big monc_lock, thus not
192 * having to block waiting for some other operation to finish before we
193 * can proceed.
194 * * for instance, we can ping mon.FOO even if we are currently hunting
195 * or blocked waiting for auth to complete with mon.BAR.
196 *
197 * - Ping a monitor prior to establishing a connection (using connect())
198 * and properly establish the MonClient's messenger. This frees us
199 * from dealing with the complex foo that happens in connect().
200 *
201 * We also don't rely on MonClient as a dispatcher for this messenger,
202 * unlike what happens with the MonClient's default messenger. This allows
203 * us to sandbox the whole ping, having it much as a separate entity in
204 * the MonClient class, considerably simplifying the handling and dispatching
205 * of messages without needing to consider monc_lock.
206 *
207 * Current drawback is that we will establish a messenger for each ping
208 * we want to issue, instead of keeping a single messenger instance that
209 * would be used for all pings.
210 */
211int MonClient::ping_monitor(const string &mon_id, string *result_reply)
212{
213 ldout(cct, 10) << __func__ << dendl;
214
215 string new_mon_id;
216 if (monmap.contains("noname-"+mon_id)) {
217 new_mon_id = "noname-"+mon_id;
218 } else {
219 new_mon_id = mon_id;
220 }
221
222 if (new_mon_id.empty()) {
223 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
224 return -EINVAL;
225 } else if (!monmap.contains(new_mon_id)) {
226 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
227 << dendl;
228 return -ENOENT;
229 }
230
11fdf7f2
TL
231 // N.B. monc isn't initialized
232
233 auth_registry.refresh_config();
234
235 KeyRing keyring;
236 keyring.from_ceph_context(cct);
237 RotatingKeyRing rkeyring(cct, cct->get_module_type(), &keyring);
238
239 MonClientPinger *pinger = new MonClientPinger(cct,
240 &rkeyring,
241 result_reply);
7c673cae
FG
242
243 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
244 smsgr->add_dispatcher_head(pinger);
11fdf7f2 245 smsgr->set_auth_client(pinger);
7c673cae
FG
246 smsgr->start();
247
11fdf7f2 248 ConnectionRef con = smsgr->connect_to_mon(monmap.get_addrs(new_mon_id));
7c673cae
FG
249 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
250 << " " << con->get_peer_addr() << dendl;
11fdf7f2
TL
251
252 pinger->mc.reset(new MonConnection(cct, con, 0, &auth_registry));
253 pinger->mc->start(monmap.get_epoch(), entity_name);
7c673cae
FG
254 con->send_message(new MPing);
255
256 pinger->lock.Lock();
11fdf7f2 257 int ret = pinger->wait_for_reply(cct->_conf->mon_client_ping_timeout);
7c673cae
FG
258 if (ret == 0) {
259 ldout(cct,10) << __func__ << " got ping reply" << dendl;
260 } else {
261 ret = -ret;
262 }
263 pinger->lock.Unlock();
264
265 con->mark_down();
11fdf7f2 266 pinger->mc.reset();
7c673cae
FG
267 smsgr->shutdown();
268 smsgr->wait();
269 delete smsgr;
270 delete pinger;
271 return ret;
272}
273
274bool MonClient::ms_dispatch(Message *m)
275{
7c673cae
FG
276 // we only care about these message types
277 switch (m->get_type()) {
278 case CEPH_MSG_MON_MAP:
279 case CEPH_MSG_AUTH_REPLY:
280 case CEPH_MSG_MON_SUBSCRIBE_ACK:
281 case CEPH_MSG_MON_GET_VERSION_REPLY:
282 case MSG_MON_COMMAND_ACK:
283 case MSG_LOGACK:
11fdf7f2 284 case MSG_CONFIG:
7c673cae 285 break;
11fdf7f2
TL
286 case CEPH_MSG_PING:
287 m->put();
288 return true;
7c673cae
FG
289 default:
290 return false;
291 }
292
11fdf7f2 293 std::lock_guard lock(monc_lock);
7c673cae
FG
294
295 if (_hunting()) {
11fdf7f2
TL
296 auto p = _find_pending_con(m->get_connection());
297 if (p == pending_cons.end()) {
7c673cae
FG
298 // ignore any messages outside hunting sessions
299 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
300 m->put();
301 return true;
302 }
303 } else if (!active_con || active_con->get_con() != m->get_connection()) {
304 // ignore any messages outside our session(s)
305 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
306 m->put();
307 return true;
308 }
309
310 switch (m->get_type()) {
311 case CEPH_MSG_MON_MAP:
312 handle_monmap(static_cast<MMonMap*>(m));
31f18b77
FG
313 if (passthrough_monmap) {
314 return false;
315 } else {
316 m->put();
317 }
7c673cae
FG
318 break;
319 case CEPH_MSG_AUTH_REPLY:
320 handle_auth(static_cast<MAuthReply*>(m));
321 break;
322 case CEPH_MSG_MON_SUBSCRIBE_ACK:
323 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
324 break;
325 case CEPH_MSG_MON_GET_VERSION_REPLY:
326 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
327 break;
328 case MSG_MON_COMMAND_ACK:
329 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
330 break;
331 case MSG_LOGACK:
332 if (log_client) {
333 log_client->handle_log_ack(static_cast<MLogAck*>(m));
334 m->put();
335 if (more_log_pending) {
336 send_log();
337 }
338 } else {
339 m->put();
340 }
341 break;
11fdf7f2
TL
342 case MSG_CONFIG:
343 handle_config(static_cast<MConfig*>(m));
344 break;
7c673cae
FG
345 }
346 return true;
347}
348
349void MonClient::send_log(bool flush)
350{
351 if (log_client) {
352 Message *lm = log_client->get_mon_log_message(flush);
353 if (lm)
354 _send_mon_message(lm);
355 more_log_pending = log_client->are_pending();
356 }
357}
358
359void MonClient::flush_log()
360{
11fdf7f2 361 std::lock_guard l(monc_lock);
7c673cae
FG
362 send_log();
363}
364
31f18b77
FG
365/* Unlike all the other message-handling functions, we don't put away a reference
366* because we want to support MMonMap passthrough to other Dispatchers. */
7c673cae
FG
367void MonClient::handle_monmap(MMonMap *m)
368{
369 ldout(cct, 10) << __func__ << " " << *m << dendl;
11fdf7f2
TL
370 auto con_addrs = m->get_source_addrs();
371 string old_name = monmap.get_name(con_addrs);
7c673cae 372
11fdf7f2
TL
373 // NOTE: we're not paying attention to the epoch, here.
374 auto p = m->monmapbl.cbegin();
375 decode(monmap, p);
7c673cae
FG
376
377 ldout(cct, 10) << " got monmap " << monmap.epoch
11fdf7f2
TL
378 << " from mon." << old_name
379 << " (according to old e" << monmap.get_epoch() << ")"
380 << dendl;
7c673cae
FG
381 ldout(cct, 10) << "dump:\n";
382 monmap.print(*_dout);
383 *_dout << dendl;
384
11fdf7f2
TL
385 if (old_name.size() == 0) {
386 ldout(cct,10) << " can't identify which mon we were connected to" << dendl;
7c673cae 387 _reopen_session();
11fdf7f2
TL
388 } else {
389 int new_rank = monmap.get_rank(m->get_source_addr());
390 if (new_rank < 0) {
391 ldout(cct, 10) << "mon." << new_rank << " at " << m->get_source_addrs()
392 << " went away" << dendl;
393 // can't find the mon we were talking to (above)
394 _reopen_session();
395 } else if (messenger->should_use_msgr2() &&
396 monmap.get_addrs(new_rank).has_msgr2() &&
397 !con_addrs.has_msgr2()) {
398 ldout(cct,1) << " mon." << new_rank << " has (v2) addrs "
399 << monmap.get_addrs(new_rank) << " but i'm connected to "
400 << con_addrs << ", reconnecting" << dendl;
401 _reopen_session();
402 }
7c673cae
FG
403 }
404
11fdf7f2 405 sub.got("monmap", monmap.get_epoch());
7c673cae
FG
406 map_cond.Signal();
407 want_monmap = false;
11fdf7f2
TL
408
409 if (authenticate_err == 1) {
410 _finish_auth(0);
411 }
412}
413
414void MonClient::handle_config(MConfig *m)
415{
416 ldout(cct,10) << __func__ << " " << *m << dendl;
417 finisher.queue(new FunctionContext([this, m](int r) {
418 cct->_conf.set_mon_vals(cct, m->config, config_cb);
419 if (config_notify_cb) {
420 config_notify_cb();
421 }
422 m->put();
423 }));
424 got_config = true;
425 map_cond.Signal();
7c673cae
FG
426}
427
428// ----------------------
429
430int MonClient::init()
431{
432 ldout(cct, 10) << __func__ << dendl;
433
7c673cae
FG
434 entity_name = cct->_conf->name;
435
11fdf7f2
TL
436 auth_registry.refresh_config();
437
438 std::lock_guard l(monc_lock);
439 keyring.reset(new KeyRing);
440 if (auth_registry.is_supported_method(messenger->get_mytype(),
441 CEPH_AUTH_CEPHX)) {
442 // this should succeed, because auth_registry just checked!
443 int r = keyring->from_ceph_context(cct);
444 if (r != 0) {
445 // but be somewhat graceful in case there was a race condition
446 lderr(cct) << "keyring not found" << dendl;
447 return r;
7c673cae
FG
448 }
449 }
11fdf7f2
TL
450 if (!auth_registry.any_supported_methods(messenger->get_mytype())) {
451 return -ENOENT;
7c673cae
FG
452 }
453
454 rotating_secrets.reset(
455 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
456
457 initialized = true;
458
11fdf7f2
TL
459 messenger->set_auth_client(this);
460 messenger->add_dispatcher_head(this);
461
7c673cae
FG
462 timer.init();
463 finisher.start();
464 schedule_tick();
465
466 return 0;
467}
468
469void MonClient::shutdown()
470{
471 ldout(cct, 10) << __func__ << dendl;
472 monc_lock.Lock();
11fdf7f2 473 stopping = true;
7c673cae
FG
474 while (!version_requests.empty()) {
475 version_requests.begin()->second->context->complete(-ECANCELED);
476 ldout(cct, 20) << __func__ << " canceling and discarding version request "
477 << version_requests.begin()->second << dendl;
478 delete version_requests.begin()->second;
479 version_requests.erase(version_requests.begin());
480 }
31f18b77
FG
481 while (!mon_commands.empty()) {
482 auto tid = mon_commands.begin()->first;
483 _cancel_mon_command(tid);
484 }
7c673cae
FG
485 while (!waiting_for_session.empty()) {
486 ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
487 waiting_for_session.front()->put();
488 waiting_for_session.pop_front();
489 }
490
491 active_con.reset();
492 pending_cons.clear();
493 auth.reset();
494
495 monc_lock.Unlock();
496
497 if (initialized) {
498 finisher.wait_for_empty();
499 finisher.stop();
11fdf7f2 500 initialized = false;
7c673cae
FG
501 }
502 monc_lock.Lock();
503 timer.shutdown();
11fdf7f2 504 stopping = false;
7c673cae
FG
505 monc_lock.Unlock();
506}
507
508int MonClient::authenticate(double timeout)
509{
11fdf7f2 510 std::lock_guard lock(monc_lock);
7c673cae
FG
511
512 if (active_con) {
513 ldout(cct, 5) << "already authenticated" << dendl;
514 return 0;
515 }
11fdf7f2
TL
516 sub.want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
517 sub.want("config", 0, 0);
7c673cae
FG
518 if (!_opened())
519 _reopen_session();
520
521 utime_t until = ceph_clock_now();
522 until += timeout;
523 if (timeout > 0.0)
524 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
11fdf7f2
TL
525 authenticate_err = 1; // == in progress
526 while (!active_con && authenticate_err >= 0) {
7c673cae
FG
527 if (timeout > 0.0) {
528 int r = auth_cond.WaitUntil(monc_lock, until);
11fdf7f2 529 if (r == ETIMEDOUT && !active_con) {
7c673cae
FG
530 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
531 authenticate_err = -r;
532 }
533 } else {
534 auth_cond.Wait(monc_lock);
535 }
536 }
537
538 if (active_con) {
539 ldout(cct, 5) << __func__ << " success, global_id "
540 << active_con->get_global_id() << dendl;
541 // active_con should not have been set if there was an error
11fdf7f2 542 ceph_assert(authenticate_err >= 0);
7c673cae
FG
543 authenticated = true;
544 }
545
11fdf7f2 546 if (authenticate_err < 0 && auth_registry.no_keyring_disabled_cephx()) {
7c673cae
FG
547 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
548 }
549
550 return authenticate_err;
551}
552
553void MonClient::handle_auth(MAuthReply *m)
554{
11fdf7f2 555 ceph_assert(monc_lock.is_locked());
7c673cae
FG
556 if (!_hunting()) {
557 std::swap(active_con->get_auth(), auth);
558 int ret = active_con->authenticate(m);
559 m->put();
560 std::swap(auth, active_con->get_auth());
561 if (global_id != active_con->get_global_id()) {
562 lderr(cct) << __func__ << " peer assigned me a different global_id: "
563 << active_con->get_global_id() << dendl;
564 }
565 if (ret != -EAGAIN) {
566 _finish_auth(ret);
567 }
568 return;
569 }
570
571 // hunting
11fdf7f2
TL
572 auto found = _find_pending_con(m->get_connection());
573 ceph_assert(found != pending_cons.end());
7c673cae
FG
574 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
575 rotating_secrets.get());
576 m->put();
577 if (auth_err == -EAGAIN) {
578 return;
579 }
580 if (auth_err) {
581 pending_cons.erase(found);
582 if (!pending_cons.empty()) {
583 // keep trying with pending connections
584 return;
585 }
586 // the last try just failed, give up.
587 } else {
588 auto& mc = found->second;
11fdf7f2 589 ceph_assert(mc.have_session());
7c673cae
FG
590 active_con.reset(new MonConnection(std::move(mc)));
591 pending_cons.clear();
592 }
593
11fdf7f2
TL
594 _finish_hunting(auth_err);
595 _finish_auth(auth_err);
596}
7c673cae 597
11fdf7f2
TL
598void MonClient::_finish_auth(int auth_err)
599{
600 ldout(cct,10) << __func__ << " " << auth_err << dendl;
601 authenticate_err = auth_err;
602 // _resend_mon_commands() could _reopen_session() if the connected mon is not
603 // the one the MonCommand is targeting.
604 if (!auth_err && active_con) {
605 ceph_assert(auth);
606 _check_auth_tickets();
7c673cae 607 }
11fdf7f2
TL
608 auth_cond.SignalAll();
609
7c673cae
FG
610 if (!auth_err) {
611 Context *cb = nullptr;
612 if (session_established_context) {
613 cb = session_established_context.release();
614 }
615 if (cb) {
616 monc_lock.Unlock();
617 cb->complete(0);
618 monc_lock.Lock();
619 }
620 }
621}
622
7c673cae
FG
623// ---------
624
625void MonClient::_send_mon_message(Message *m)
626{
11fdf7f2 627 ceph_assert(monc_lock.is_locked());
7c673cae
FG
628 if (active_con) {
629 auto cur_con = active_con->get_con();
630 ldout(cct, 10) << "_send_mon_message to mon."
631 << monmap.get_name(cur_con->get_peer_addr())
632 << " at " << cur_con->get_peer_addr() << dendl;
633 cur_con->send_message(m);
634 } else {
635 waiting_for_session.push_back(m);
636 }
637}
638
639void MonClient::_reopen_session(int rank)
640{
11fdf7f2 641 ceph_assert(monc_lock.is_locked());
7c673cae
FG
642 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
643
644 active_con.reset();
645 pending_cons.clear();
646
647 _start_hunting();
648
649 if (rank >= 0) {
650 _add_conn(rank, global_id);
651 } else {
652 _add_conns(global_id);
653 }
654
655 // throw out old queued messages
656 while (!waiting_for_session.empty()) {
657 waiting_for_session.front()->put();
658 waiting_for_session.pop_front();
659 }
660
661 // throw out version check requests
662 while (!version_requests.empty()) {
663 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
664 delete version_requests.begin()->second;
665 version_requests.erase(version_requests.begin());
666 }
667
668 for (auto& c : pending_cons) {
11fdf7f2 669 c.second.start(monmap.get_epoch(), entity_name);
7c673cae
FG
670 }
671
11fdf7f2 672 if (sub.reload()) {
7c673cae 673 _renew_subs();
11fdf7f2 674 }
7c673cae
FG
675}
676
677MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
678{
11fdf7f2
TL
679 auto peer = monmap.get_addrs(rank);
680 auto conn = messenger->connect_to_mon(peer);
681 MonConnection mc(cct, conn, global_id, &auth_registry);
7c673cae
FG
682 auto inserted = pending_cons.insert(make_pair(peer, move(mc)));
683 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
684 << " con " << conn
11fdf7f2 685 << " addr " << peer
7c673cae
FG
686 << dendl;
687 return inserted.first->second;
688}
689
690void MonClient::_add_conns(uint64_t global_id)
691{
224ce89b
WB
692 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
693 for (const auto& m : monmap.mon_info) {
694 if (m.second.priority < min_priority) {
695 min_priority = m.second.priority;
696 }
697 }
698 vector<unsigned> ranks;
699 for (const auto& m : monmap.mon_info) {
700 if (m.second.priority == min_priority) {
701 ranks.push_back(monmap.get_rank(m.first));
702 }
7c673cae
FG
703 }
704 std::random_device rd;
705 std::mt19937 rng(rd());
706 std::shuffle(ranks.begin(), ranks.end(), rng);
7c673cae 707 unsigned n = cct->_conf->mon_client_hunt_parallel;
224ce89b
WB
708 if (n == 0 || n > ranks.size()) {
709 n = ranks.size();
7c673cae
FG
710 }
711 for (unsigned i = 0; i < n; i++) {
712 _add_conn(ranks[i], global_id);
713 }
714}
715
716bool MonClient::ms_handle_reset(Connection *con)
717{
11fdf7f2 718 std::lock_guard lock(monc_lock);
7c673cae
FG
719
720 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
721 return false;
722
723 if (_hunting()) {
11fdf7f2
TL
724 if (pending_cons.count(con->get_peer_addrs())) {
725 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addrs()
726 << dendl;
7c673cae 727 } else {
11fdf7f2
TL
728 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addrs()
729 << dendl;
7c673cae
FG
730 }
731 return true;
732 } else {
733 if (active_con && con == active_con->get_con()) {
11fdf7f2
TL
734 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addrs()
735 << dendl;
7c673cae
FG
736 _reopen_session();
737 return false;
738 } else {
11fdf7f2
TL
739 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addrs()
740 << dendl;
7c673cae
FG
741 return true;
742 }
743 }
744}
745
746bool MonClient::_opened() const
747{
11fdf7f2 748 ceph_assert(monc_lock.is_locked());
7c673cae
FG
749 return active_con || _hunting();
750}
751
752bool MonClient::_hunting() const
753{
754 return !pending_cons.empty();
755}
756
757void MonClient::_start_hunting()
758{
11fdf7f2 759 ceph_assert(!_hunting());
7c673cae
FG
760 // adjust timeouts if necessary
761 if (!had_a_connection)
762 return;
763 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
764 if (reopen_interval_multiplier >
765 cct->_conf->mon_client_hunt_interval_max_multiple) {
766 reopen_interval_multiplier =
767 cct->_conf->mon_client_hunt_interval_max_multiple;
768 }
769}
770
11fdf7f2 771void MonClient::_finish_hunting(int auth_err)
7c673cae 772{
11fdf7f2
TL
773 ldout(cct,10) << __func__ << " " << auth_err << dendl;
774 ceph_assert(monc_lock.is_locked());
7c673cae 775 // the pending conns have been cleaned.
11fdf7f2 776 ceph_assert(!_hunting());
7c673cae
FG
777 if (active_con) {
778 auto con = active_con->get_con();
779 ldout(cct, 1) << "found mon."
780 << monmap.get_name(con->get_peer_addr())
781 << dendl;
782 } else {
783 ldout(cct, 1) << "no mon sessions established" << dendl;
784 }
785
786 had_a_connection = true;
c07f9fc5 787 _un_backoff();
11fdf7f2
TL
788
789 if (!auth_err) {
790 last_rotating_renew_sent = utime_t();
791 while (!waiting_for_session.empty()) {
792 _send_mon_message(waiting_for_session.front());
793 waiting_for_session.pop_front();
794 }
795 _resend_mon_commands();
796 send_log(true);
797 if (active_con) {
798 std::swap(auth, active_con->get_auth());
799 if (global_id && global_id != active_con->get_global_id()) {
800 lderr(cct) << __func__ << " global_id changed from " << global_id
801 << " to " << active_con->get_global_id() << dendl;
802 }
803 global_id = active_con->get_global_id();
804 }
805 }
7c673cae
FG
806}
807
808void MonClient::tick()
809{
810 ldout(cct, 10) << __func__ << dendl;
811
c07f9fc5
FG
812 auto reschedule_tick = make_scope_guard([this] {
813 schedule_tick();
814 });
815
7c673cae
FG
816 _check_auth_tickets();
817
818 if (_hunting()) {
819 ldout(cct, 1) << "continuing hunt" << dendl;
c07f9fc5 820 return _reopen_session();
7c673cae
FG
821 } else if (active_con) {
822 // just renew as needed
823 utime_t now = ceph_clock_now();
824 auto cur_con = active_con->get_con();
825 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
11fdf7f2
TL
826 const bool maybe_renew = sub.need_renew();
827 ldout(cct, 10) << "renew subs? -- " << (maybe_renew ? "yes" : "no")
7c673cae 828 << dendl;
11fdf7f2 829 if (maybe_renew) {
7c673cae 830 _renew_subs();
11fdf7f2 831 }
7c673cae
FG
832 }
833
834 cur_con->send_keepalive();
835
836 if (cct->_conf->mon_client_ping_timeout > 0 &&
837 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
838 utime_t lk = cur_con->get_last_keepalive_ack();
839 utime_t interval = now - lk;
840 if (interval > cct->_conf->mon_client_ping_timeout) {
841 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
842 << " seconds), reconnecting" << dendl;
c07f9fc5 843 return _reopen_session();
7c673cae 844 }
7c673cae
FG
845 send_log();
846 }
c07f9fc5
FG
847
848 _un_backoff();
7c673cae 849 }
c07f9fc5 850}
7c673cae 851
c07f9fc5
FG
852void MonClient::_un_backoff()
853{
854 // un-backoff our reconnect interval
855 reopen_interval_multiplier = std::max(
11fdf7f2 856 cct->_conf.get_val<double>("mon_client_hunt_interval_min_multiple"),
c07f9fc5 857 reopen_interval_multiplier /
11fdf7f2 858 cct->_conf.get_val<double>("mon_client_hunt_interval_backoff"));
c07f9fc5
FG
859 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
860 << reopen_interval_multiplier << dendl;
7c673cae
FG
861}
862
863void MonClient::schedule_tick()
864{
11fdf7f2 865 auto do_tick = make_lambda_context([this]() { tick(); });
7c673cae 866 if (_hunting()) {
11fdf7f2
TL
867 const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
868 reopen_interval_multiplier);
869 timer.add_event_after(hunt_interval, do_tick);
870 } else {
871 timer.add_event_after(cct->_conf->mon_client_ping_interval, do_tick);
872 }
7c673cae
FG
873}
874
875// ---------
876
877void MonClient::_renew_subs()
878{
11fdf7f2
TL
879 ceph_assert(monc_lock.is_locked());
880 if (!sub.have_new()) {
7c673cae
FG
881 ldout(cct, 10) << __func__ << " - empty" << dendl;
882 return;
883 }
884
885 ldout(cct, 10) << __func__ << dendl;
886 if (!_opened())
887 _reopen_session();
888 else {
7c673cae 889 MMonSubscribe *m = new MMonSubscribe;
11fdf7f2
TL
890 m->what = sub.get_subs();
891 m->hostname = ceph_get_short_hostname();
7c673cae 892 _send_mon_message(m);
11fdf7f2 893 sub.renewed();
7c673cae
FG
894 }
895}
896
897void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
898{
11fdf7f2 899 sub.acked(m->interval);
7c673cae
FG
900 m->put();
901}
902
903int MonClient::_check_auth_tickets()
904{
11fdf7f2 905 ceph_assert(monc_lock.is_locked());
7c673cae
FG
906 if (active_con && auth) {
907 if (auth->need_tickets()) {
908 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
909 MAuth *m = new MAuth;
910 m->protocol = auth->get_protocol();
911 auth->prepare_build_request();
912 auth->build_request(m->auth_payload);
913 _send_mon_message(m);
914 }
915
916 _check_auth_rotating();
917 }
918 return 0;
919}
920
921int MonClient::_check_auth_rotating()
922{
11fdf7f2 923 ceph_assert(monc_lock.is_locked());
7c673cae
FG
924 if (!rotating_secrets ||
925 !auth_principal_needs_rotating_keys(entity_name)) {
926 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
927 return 0;
928 }
929
930 if (!active_con || !auth) {
931 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
932 return 0;
933 }
934
935 utime_t now = ceph_clock_now();
936 utime_t cutoff = now;
11fdf7f2 937 cutoff -= std::min(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
7c673cae
FG
938 utime_t issued_at_lower_bound = now;
939 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
940 if (!rotating_secrets->need_new_secrets(cutoff)) {
941 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
942 rotating_secrets->dump_rotating();
943 return 0;
944 }
945
946 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
947 if (!rotating_secrets->need_new_secrets() &&
948 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
949 // the key has expired before it has been issued?
950 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
951 << " (before " << issued_at_lower_bound << ")" << dendl;
952 }
953 if ((now > last_rotating_renew_sent) &&
954 double(now - last_rotating_renew_sent) < 1) {
955 ldout(cct, 10) << __func__ << " called too often (last: "
956 << last_rotating_renew_sent << "), skipping refresh" << dendl;
957 return 0;
958 }
959 MAuth *m = new MAuth;
960 m->protocol = auth->get_protocol();
961 if (auth->build_rotating_request(m->auth_payload)) {
962 last_rotating_renew_sent = now;
963 _send_mon_message(m);
964 } else {
965 m->put();
966 }
967 return 0;
968}
969
970int MonClient::wait_auth_rotating(double timeout)
971{
11fdf7f2 972 std::lock_guard l(monc_lock);
7c673cae
FG
973 utime_t now = ceph_clock_now();
974 utime_t until = now;
975 until += timeout;
976
977 // Must be initialized
11fdf7f2 978 ceph_assert(auth != nullptr);
7c673cae
FG
979
980 if (auth->get_protocol() == CEPH_AUTH_NONE)
981 return 0;
982
983 if (!rotating_secrets)
984 return 0;
985
986 while (auth_principal_needs_rotating_keys(entity_name) &&
987 rotating_secrets->need_new_secrets(now)) {
988 if (now >= until) {
989 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
990 return -ETIMEDOUT;
991 }
992 ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
993 auth_cond.WaitUntil(monc_lock, until);
994 now = ceph_clock_now();
995 }
996 ldout(cct, 10) << __func__ << " done" << dendl;
997 return 0;
998}
999
1000// ---------
1001
1002void MonClient::_send_command(MonCommand *r)
1003{
1004 entity_addr_t peer;
1005 if (active_con) {
1006 peer = active_con->get_con()->get_peer_addr();
1007 }
1008
1009 if (r->target_rank >= 0 &&
1010 r->target_rank != monmap.get_rank(peer)) {
1011 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1012 << " wants rank " << r->target_rank
1013 << ", reopening session"
1014 << dendl;
1015 if (r->target_rank >= (int)monmap.size()) {
1016 ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
1017 _finish_command(r, -ENOENT, "mon rank dne");
1018 return;
1019 }
1020 _reopen_session(r->target_rank);
1021 return;
1022 }
1023
1024 if (r->target_name.length() &&
1025 r->target_name != monmap.get_name(peer)) {
1026 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
1027 << " wants mon " << r->target_name
1028 << ", reopening session"
1029 << dendl;
1030 if (!monmap.contains(r->target_name)) {
1031 ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
1032 _finish_command(r, -ENOENT, "mon dne");
1033 return;
1034 }
1035 _reopen_session(monmap.get_rank(r->target_name));
1036 return;
1037 }
1038
1039 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1040 MMonCommand *m = new MMonCommand(monmap.fsid);
1041 m->set_tid(r->tid);
1042 m->cmd = r->cmd;
1043 m->set_data(r->inbl);
1044 _send_mon_message(m);
1045 return;
1046}
1047
1048void MonClient::_resend_mon_commands()
1049{
1050 // resend any requests
1051 for (map<uint64_t,MonCommand*>::iterator p = mon_commands.begin();
1052 p != mon_commands.end();
1053 ++p) {
1054 _send_command(p->second);
1055 }
1056}
1057
1058void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1059{
1060 MonCommand *r = NULL;
1061 uint64_t tid = ack->get_tid();
1062
1063 if (tid == 0 && !mon_commands.empty()) {
1064 r = mon_commands.begin()->second;
1065 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1066 } else {
1067 map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid);
1068 if (p == mon_commands.end()) {
1069 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1070 ack->put();
1071 return;
1072 }
1073 r = p->second;
1074 }
1075
1076 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1077 if (r->poutbl)
1078 r->poutbl->claim(ack->get_data());
1079 _finish_command(r, ack->r, ack->rs);
1080 ack->put();
1081}
1082
31f18b77 1083int MonClient::_cancel_mon_command(uint64_t tid)
7c673cae 1084{
11fdf7f2 1085 ceph_assert(monc_lock.is_locked());
7c673cae
FG
1086
1087 map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid);
1088 if (it == mon_commands.end()) {
1089 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1090 return -ENOENT;
1091 }
1092
1093 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1094
1095 MonCommand *cmd = it->second;
1096 _finish_command(cmd, -ETIMEDOUT, "");
1097 return 0;
1098}
1099
1100void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1101{
1102 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1103 if (r->prval)
1104 *(r->prval) = ret;
1105 if (r->prs)
1106 *(r->prs) = rs;
1107 if (r->onfinish)
1108 finisher.queue(r->onfinish, ret);
1109 mon_commands.erase(r->tid);
1110 delete r;
1111}
1112
1113void MonClient::start_mon_command(const vector<string>& cmd,
1114 const bufferlist& inbl,
1115 bufferlist *outbl, string *outs,
1116 Context *onfinish)
1117{
11fdf7f2
TL
1118 std::lock_guard l(monc_lock);
1119 if (!initialized || stopping) {
1120 onfinish->complete(-ECANCELED);
1121 return;
1122 }
7c673cae
FG
1123 MonCommand *r = new MonCommand(++last_mon_command_tid);
1124 r->cmd = cmd;
1125 r->inbl = inbl;
1126 r->poutbl = outbl;
1127 r->prs = outs;
1128 r->onfinish = onfinish;
1129 if (cct->_conf->rados_mon_op_timeout > 0) {
1130 class C_CancelMonCommand : public Context
1131 {
1132 uint64_t tid;
1133 MonClient *monc;
1134 public:
1135 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1136 void finish(int r) override {
31f18b77 1137 monc->_cancel_mon_command(tid);
7c673cae
FG
1138 }
1139 };
1140 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1141 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1142 }
1143 mon_commands[r->tid] = r;
1144 _send_command(r);
1145}
1146
1147void MonClient::start_mon_command(const string &mon_name,
1148 const vector<string>& cmd,
1149 const bufferlist& inbl,
1150 bufferlist *outbl, string *outs,
1151 Context *onfinish)
1152{
11fdf7f2
TL
1153 std::lock_guard l(monc_lock);
1154 if (!initialized || stopping) {
1155 onfinish->complete(-ECANCELED);
1156 return;
1157 }
7c673cae
FG
1158 MonCommand *r = new MonCommand(++last_mon_command_tid);
1159 r->target_name = mon_name;
1160 r->cmd = cmd;
1161 r->inbl = inbl;
1162 r->poutbl = outbl;
1163 r->prs = outs;
1164 r->onfinish = onfinish;
1165 mon_commands[r->tid] = r;
1166 _send_command(r);
1167}
1168
1169void MonClient::start_mon_command(int rank,
1170 const vector<string>& cmd,
1171 const bufferlist& inbl,
1172 bufferlist *outbl, string *outs,
1173 Context *onfinish)
1174{
11fdf7f2
TL
1175 std::lock_guard l(monc_lock);
1176 if (!initialized || stopping) {
1177 onfinish->complete(-ECANCELED);
1178 return;
1179 }
7c673cae
FG
1180 MonCommand *r = new MonCommand(++last_mon_command_tid);
1181 r->target_rank = rank;
1182 r->cmd = cmd;
1183 r->inbl = inbl;
1184 r->poutbl = outbl;
1185 r->prs = outs;
1186 r->onfinish = onfinish;
1187 mon_commands[r->tid] = r;
1188 _send_command(r);
1189}
1190
1191// ---------
1192
1193void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1194{
1195 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1196 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
11fdf7f2 1197 std::lock_guard l(monc_lock);
7c673cae
FG
1198 MMonGetVersion *m = new MMonGetVersion();
1199 m->what = map;
1200 m->handle = ++version_req_id;
1201 version_requests[m->handle] = req;
1202 _send_mon_message(m);
1203}
1204
1205void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1206{
11fdf7f2 1207 ceph_assert(monc_lock.is_locked());
7c673cae
FG
1208 map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
1209 if (iter == version_requests.end()) {
1210 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1211 << " not found" << dendl;
1212 } else {
1213 version_req_d *req = iter->second;
1214 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1215 version_requests.erase(iter);
1216 if (req->newest)
1217 *req->newest = m->version;
1218 if (req->oldest)
1219 *req->oldest = m->oldest_version;
1220 finisher.queue(req->context, 0);
1221 delete req;
1222 }
1223 m->put();
1224}
1225
11fdf7f2
TL
1226int MonClient::get_auth_request(
1227 Connection *con,
1228 AuthConnectionMeta *auth_meta,
1229 uint32_t *auth_method,
1230 std::vector<uint32_t> *preferred_modes,
1231 bufferlist *bl)
1232{
1233 std::lock_guard l(monc_lock);
1234 ldout(cct,10) << __func__ << " con " << con << " auth_method " << *auth_method
1235 << dendl;
1236
1237 // connection to mon?
1238 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1239 ceph_assert(!auth_meta->authorizer);
1240 for (auto& i : pending_cons) {
1241 if (i.second.is_con(con)) {
1242 return i.second.get_auth_request(
1243 auth_method, preferred_modes, bl,
1244 entity_name, want_keys, rotating_secrets.get());
1245 }
1246 }
1247 return -ENOENT;
1248 }
1249
1250 // generate authorizer
1251 if (!auth) {
1252 lderr(cct) << __func__ << " but no auth handler is set up" << dendl;
1253 return -EACCES;
1254 }
1255 auth_meta->authorizer.reset(auth->build_authorizer(con->get_peer_type()));
1256 if (!auth_meta->authorizer) {
1257 lderr(cct) << __func__ << " failed to build_authorizer for type "
1258 << ceph_entity_type_name(con->get_peer_type()) << dendl;
1259 return -EACCES;
1260 }
1261 auth_meta->auth_method = auth_meta->authorizer->protocol;
1262 auth_registry.get_supported_modes(con->get_peer_type(),
1263 auth_meta->auth_method,
1264 preferred_modes);
1265 *bl = auth_meta->authorizer->bl;
1266 return 0;
1267}
1268
1269int MonClient::handle_auth_reply_more(
1270 Connection *con,
1271 AuthConnectionMeta *auth_meta,
1272 const bufferlist& bl,
1273 bufferlist *reply)
1274{
1275 std::lock_guard l(monc_lock);
1276
1277 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1278 for (auto& i : pending_cons) {
1279 if (i.second.is_con(con)) {
1280 return i.second.handle_auth_reply_more(auth_meta, bl, reply);
1281 }
1282 }
1283 return -ENOENT;
1284 }
1285
1286 // authorizer challenges
1287 if (!auth || !auth_meta->authorizer) {
1288 lderr(cct) << __func__ << " no authorizer?" << dendl;
1289 return -1;
1290 }
1291 auth_meta->authorizer->add_challenge(cct, bl);
1292 *reply = auth_meta->authorizer->bl;
1293 return 0;
1294}
1295
1296int MonClient::handle_auth_done(
1297 Connection *con,
1298 AuthConnectionMeta *auth_meta,
1299 uint64_t global_id,
1300 uint32_t con_mode,
1301 const bufferlist& bl,
1302 CryptoKey *session_key,
1303 std::string *connection_secret)
1304{
1305 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1306 std::lock_guard l(monc_lock);
1307 for (auto& i : pending_cons) {
1308 if (i.second.is_con(con)) {
1309 int r = i.second.handle_auth_done(
1310 auth_meta, global_id, bl,
1311 session_key, connection_secret);
1312 if (r) {
1313 pending_cons.erase(i.first);
1314 if (!pending_cons.empty()) {
1315 return r;
1316 }
1317 } else {
1318 active_con.reset(new MonConnection(std::move(i.second)));
1319 pending_cons.clear();
1320 ceph_assert(active_con->have_session());
1321 }
1322
1323 _finish_hunting(r);
1324 if (r || monmap.get_epoch() > 0) {
1325 _finish_auth(r);
1326 }
1327 return r;
1328 }
1329 }
1330 return -ENOENT;
1331 } else {
1332 // verify authorizer reply
1333 auto p = bl.begin();
1334 if (!auth_meta->authorizer->verify_reply(p, &auth_meta->connection_secret)) {
1335 ldout(cct, 0) << __func__ << " failed verifying authorizer reply"
1336 << dendl;
1337 return -EACCES;
1338 }
1339 auth_meta->session_key = auth_meta->authorizer->session_key;
1340 return 0;
1341 }
1342}
1343
1344int MonClient::handle_auth_bad_method(
1345 Connection *con,
1346 AuthConnectionMeta *auth_meta,
1347 uint32_t old_auth_method,
1348 int result,
1349 const std::vector<uint32_t>& allowed_methods,
1350 const std::vector<uint32_t>& allowed_modes)
1351{
1352 auth_meta->allowed_methods = allowed_methods;
1353
1354 std::lock_guard l(monc_lock);
1355 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
1356 for (auto& i : pending_cons) {
1357 if (i.second.is_con(con)) {
1358 int r = i.second.handle_auth_bad_method(old_auth_method,
1359 result,
1360 allowed_methods,
1361 allowed_modes);
1362 if (r == 0) {
1363 return r; // try another method on this con
1364 }
1365 pending_cons.erase(i.first);
1366 if (!pending_cons.empty()) {
1367 return r; // fail this con, maybe another con will succeed
1368 }
1369 // fail hunt
1370 _finish_hunting(r);
1371 _finish_auth(r);
1372 return r;
1373 }
1374 }
1375 return -ENOENT;
1376 } else {
1377 // huh...
1378 ldout(cct,10) << __func__ << " hmm, they didn't like " << old_auth_method
1379 << " result " << cpp_strerror(result)
1380 << " and auth is " << (auth ? auth->get_protocol() : 0)
1381 << dendl;
1382 return -EACCES;
1383 }
1384}
1385
1386int MonClient::handle_auth_request(
1387 Connection *con,
1388 AuthConnectionMeta *auth_meta,
1389 bool more,
1390 uint32_t auth_method,
1391 const bufferlist& payload,
1392 bufferlist *reply)
1393{
1394 auth_meta->auth_mode = payload[0];
1395 if (auth_meta->auth_mode < AUTH_MODE_AUTHORIZER ||
1396 auth_meta->auth_mode > AUTH_MODE_AUTHORIZER_MAX) {
1397 return -EACCES;
1398 }
1399 AuthAuthorizeHandler *ah = get_auth_authorize_handler(con->get_peer_type(),
1400 auth_method);
1401 if (!ah) {
1402 lderr(cct) << __func__ << " no AuthAuthorizeHandler found for auth method "
1403 << auth_method << dendl;
1404 return -EOPNOTSUPP;
1405 }
1406 bool was_challenge = (bool)auth_meta->authorizer_challenge;
1407 bool isvalid = ah->verify_authorizer(
1408 cct,
1409 rotating_secrets.get(),
1410 payload,
1411 auth_meta->get_connection_secret_length(),
1412 reply,
1413 &con->peer_name,
1414 &con->peer_global_id,
1415 &con->peer_caps_info,
1416 &auth_meta->session_key,
1417 &auth_meta->connection_secret,
1418 &auth_meta->authorizer_challenge);
1419 if (isvalid) {
1420 handle_authentication_dispatcher->ms_handle_authentication(con);
1421 return 1;
1422 }
1423 if (!more && !was_challenge && auth_meta->authorizer_challenge) {
1424 ldout(cct,10) << __func__ << " added challenge on " << con << dendl;
1425 return 0;
1426 }
1427 ldout(cct,10) << __func__ << " bad authorizer on " << con << dendl;
1428 return -EACCES;
1429}
1430
7c673cae 1431AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
11fdf7f2 1432 std::lock_guard l(monc_lock);
7c673cae
FG
1433 if (auth) {
1434 return auth->build_authorizer(service_id);
1435 } else {
1436 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1437 << ", but no auth is available now" << dendl;
1438 return nullptr;
1439 }
1440}
1441
// Logging setup for the MonConnection methods that follow: the prefix is
// tagged "(hunting)" for as long as the connection has no session.
#undef dout_subsys
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1445
11fdf7f2
TL
1446MonConnection::MonConnection(
1447 CephContext *cct, ConnectionRef con, uint64_t global_id,
1448 AuthRegistry *ar)
1449 : cct(cct), con(con), global_id(global_id), auth_registry(ar)
7c673cae
FG
1450{}
1451
1452MonConnection::~MonConnection()
1453{
1454 if (con) {
1455 con->mark_down();
1456 con.reset();
1457 }
1458}
1459
1460bool MonConnection::have_session() const
1461{
1462 return state == State::HAVE_SESSION;
1463}
1464
1465void MonConnection::start(epoch_t epoch,
11fdf7f2 1466 const EntityName& entity_name)
7c673cae 1467{
11fdf7f2
TL
1468 auth_start = ceph_clock_now();
1469
1470 if (con->get_peer_addr().is_msgr2()) {
1471 ldout(cct, 10) << __func__ << " opening mon connection" << dendl;
1472 state = State::AUTHENTICATING;
1473 con->send_message(new MMonGetMap());
1474 return;
1475 }
1476
7c673cae
FG
1477 // restart authentication handshake
1478 state = State::NEGOTIATING;
1479
1480 // send an initial keepalive to ensure our timestamp is valid by the
1481 // time we are in an OPENED state (by sequencing this before
1482 // authentication).
1483 con->send_keepalive();
1484
1485 auto m = new MAuth;
11fdf7f2 1486 m->protocol = CEPH_AUTH_UNKNOWN;
7c673cae
FG
1487 m->monmap_epoch = epoch;
1488 __u8 struct_v = 1;
11fdf7f2
TL
1489 encode(struct_v, m->auth_payload);
1490 vector<uint32_t> auth_supported;
1491 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1492 encode(auth_supported, m->auth_payload);
1493 encode(entity_name, m->auth_payload);
1494 encode(global_id, m->auth_payload);
7c673cae
FG
1495 con->send_message(m);
1496}
1497
11fdf7f2
TL
1498int MonConnection::get_auth_request(
1499 uint32_t *method,
1500 std::vector<uint32_t> *preferred_modes,
1501 bufferlist *bl,
1502 const EntityName& entity_name,
1503 uint32_t want_keys,
1504 RotatingKeyRing* keyring)
1505{
1506 // choose method
1507 if (auth_method < 0) {
1508 vector<uint32_t> as;
1509 auth_registry->get_supported_methods(con->get_peer_type(), &as);
1510 if (as.empty()) {
1511 return -EACCES;
1512 }
1513 auth_method = as.front();
1514 }
1515 *method = auth_method;
1516 auth_registry->get_supported_modes(con->get_peer_type(), auth_method,
1517 preferred_modes);
1518 ldout(cct,10) << __func__ << " method " << *method
1519 << " preferred_modes " << *preferred_modes << dendl;
1520 if (preferred_modes->empty()) {
1521 return -EACCES;
1522 }
1523
1524 if (auth) {
1525 auth.reset();
1526 }
1527 int r = _init_auth(*method, entity_name, want_keys, keyring, true);
1528 ceph_assert(r == 0);
1529
1530 // initial requset includes some boilerplate...
1531 encode((char)AUTH_MODE_MON, *bl);
1532 encode(entity_name, *bl);
1533 encode(global_id, *bl);
1534
1535 // and (maybe) some method-specific initial payload
1536 auth->build_initial_request(bl);
1537
1538 return 0;
1539}
1540
1541int MonConnection::handle_auth_reply_more(
1542 AuthConnectionMeta *auth_meta,
1543 const bufferlist& bl,
1544 bufferlist *reply)
1545{
1546 ldout(cct, 10) << __func__ << " payload " << bl.length() << dendl;
1547 ldout(cct, 30) << __func__ << " got\n";
1548 bl.hexdump(*_dout);
1549 *_dout << dendl;
1550
1551 auto p = bl.cbegin();
1552 ldout(cct, 10) << __func__ << " payload_len " << bl.length() << dendl;
1553 int r = auth->handle_response(0, p, &auth_meta->session_key,
1554 &auth_meta->connection_secret);
1555 if (r == -EAGAIN) {
1556 auth->prepare_build_request();
1557 auth->build_request(*reply);
1558 ldout(cct, 10) << __func__ << " responding with " << reply->length()
1559 << " bytes" << dendl;
1560 r = 0;
1561 } else if (r < 0) {
1562 lderr(cct) << __func__ << " handle_response returned " << r << dendl;
1563 } else {
1564 ldout(cct, 10) << __func__ << " authenticated!" << dendl;
1565 // FIXME
1566 ceph_abort(cct, "write me");
1567 }
1568 return r;
1569}
1570
1571int MonConnection::handle_auth_done(
1572 AuthConnectionMeta *auth_meta,
1573 uint64_t new_global_id,
1574 const bufferlist& bl,
1575 CryptoKey *session_key,
1576 std::string *connection_secret)
1577{
1578 ldout(cct,10) << __func__ << " global_id " << new_global_id
1579 << " payload " << bl.length()
1580 << dendl;
1581 global_id = new_global_id;
1582 auth->set_global_id(global_id);
1583 auto p = bl.begin();
1584 int auth_err = auth->handle_response(0, p, &auth_meta->session_key,
1585 &auth_meta->connection_secret);
1586 if (auth_err >= 0) {
1587 state = State::HAVE_SESSION;
1588 }
1589 con->set_last_keepalive_ack(auth_start);
1590 return auth_err;
1591}
1592
1593int MonConnection::handle_auth_bad_method(
1594 uint32_t old_auth_method,
1595 int result,
1596 const std::vector<uint32_t>& allowed_methods,
1597 const std::vector<uint32_t>& allowed_modes)
1598{
1599 ldout(cct,10) << __func__ << " old_auth_method " << old_auth_method
1600 << " result " << cpp_strerror(result)
1601 << " allowed_methods " << allowed_methods << dendl;
1602 vector<uint32_t> auth_supported;
1603 auth_registry->get_supported_methods(con->get_peer_type(), &auth_supported);
1604 auto p = std::find(auth_supported.begin(), auth_supported.end(),
1605 old_auth_method);
1606 assert(p != auth_supported.end());
1607 p = std::find_first_of(std::next(p), auth_supported.end(),
1608 allowed_methods.begin(), allowed_methods.end());
1609 if (p == auth_supported.end()) {
1610 lderr(cct) << __func__ << " server allowed_methods " << allowed_methods
1611 << " but i only support " << auth_supported << dendl;
1612 return -EACCES;
1613 }
1614 auth_method = *p;
1615 ldout(cct,10) << __func__ << " will try " << auth_method << " next" << dendl;
1616 return 0;
1617}
1618
7c673cae
FG
1619int MonConnection::handle_auth(MAuthReply* m,
1620 const EntityName& entity_name,
1621 uint32_t want_keys,
1622 RotatingKeyRing* keyring)
1623{
1624 if (state == State::NEGOTIATING) {
1625 int r = _negotiate(m, entity_name, want_keys, keyring);
1626 if (r) {
1627 return r;
1628 }
1629 state = State::AUTHENTICATING;
1630 }
1631 int r = authenticate(m);
1632 if (!r) {
1633 state = State::HAVE_SESSION;
1634 }
1635 return r;
1636}
1637
1638int MonConnection::_negotiate(MAuthReply *m,
1639 const EntityName& entity_name,
1640 uint32_t want_keys,
1641 RotatingKeyRing* keyring)
1642{
1643 if (auth && (int)m->protocol == auth->get_protocol()) {
1644 // good, negotiation completed
1645 auth->reset();
1646 return 0;
1647 }
1648
11fdf7f2
TL
1649 int r = _init_auth(m->protocol, entity_name, want_keys, keyring, false);
1650 if (r == -ENOTSUP) {
7c673cae
FG
1651 if (m->result == -ENOTSUP) {
1652 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1653 << dendl;
1654 }
1655 return m->result;
1656 }
11fdf7f2
TL
1657 return r;
1658}
1659
1660int MonConnection::_init_auth(
1661 uint32_t method,
1662 const EntityName& entity_name,
1663 uint32_t want_keys,
1664 RotatingKeyRing* keyring,
1665 bool msgr2)
1666{
1667 ldout(cct,10) << __func__ << " method " << method << dendl;
1668 auth.reset(
1669 AuthClientHandler::create(cct, method, keyring));
1670 if (!auth) {
1671 ldout(cct, 10) << " no handler for protocol " << method << dendl;
1672 return -ENOTSUP;
1673 }
7c673cae
FG
1674
1675 // do not request MGR key unless the mon has the SERVER_KRAKEN
1676 // feature. otherwise it will give us an auth error. note that
1677 // we have to use the FEATUREMASK because pre-jewel the kraken
1678 // feature bit was used for something else.
11fdf7f2
TL
1679 if (!msgr2 &&
1680 (want_keys & CEPH_ENTITY_TYPE_MGR) &&
1681 !(con->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
7c673cae
FG
1682 ldout(cct, 1) << __func__
1683 << " not requesting MGR keys from pre-kraken monitor"
1684 << dendl;
1685 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1686 }
1687 auth->set_want_keys(want_keys);
1688 auth->init(entity_name);
1689 auth->set_global_id(global_id);
1690 return 0;
1691}
1692
1693int MonConnection::authenticate(MAuthReply *m)
1694{
11fdf7f2 1695 ceph_assert(auth);
7c673cae
FG
1696 if (!m->global_id) {
1697 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1698 }
1699 if (m->global_id != global_id) {
1700 // it's a new session
1701 auth->reset();
1702 global_id = m->global_id;
1703 auth->set_global_id(global_id);
1704 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1705 }
11fdf7f2
TL
1706 auto p = m->result_bl.cbegin();
1707 int ret = auth->handle_response(m->result, p, nullptr, nullptr);
7c673cae
FG
1708 if (ret == -EAGAIN) {
1709 auto ma = new MAuth;
1710 ma->protocol = auth->get_protocol();
1711 auth->prepare_build_request();
1712 auth->build_request(ma->auth_payload);
1713 con->send_message(ma);
1714 }
1715 return ret;
1716}
11fdf7f2
TL
1717
1718void MonClient::register_config_callback(md_config_t::config_callback fn) {
1719 ceph_assert(!config_cb);
1720 config_cb = fn;
1721}
1722
1723md_config_t::config_callback MonClient::get_config_callback() {
1724 return config_cb;
1725}