]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonClient.cc
update sources to v12.2.3
[ceph.git] / ceph / src / mon / MonClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <random>
16
17 #include "include/scope_guard.h"
18
19 #include "messages/MMonGetMap.h"
20 #include "messages/MMonGetVersion.h"
21 #include "messages/MMonGetVersionReply.h"
22 #include "messages/MMonMap.h"
23 #include "messages/MAuth.h"
24 #include "messages/MLogAck.h"
25 #include "messages/MAuthReply.h"
26 #include "messages/MMonCommand.h"
27 #include "messages/MMonCommandAck.h"
28 #include "messages/MPing.h"
29
30 #include "messages/MMonSubscribe.h"
31 #include "messages/MMonSubscribeAck.h"
32 #include "common/errno.h"
33 #include "common/LogClient.h"
34
35 #include "MonClient.h"
36 #include "MonMap.h"
37
38 #include "auth/Auth.h"
39 #include "auth/KeyRing.h"
40 #include "auth/AuthClientHandler.h"
41 #include "auth/AuthMethodList.h"
42 #include "auth/RotatingKeyRing.h"
43
44 #define dout_subsys ceph_subsys_monc
45 #undef dout_prefix
46 #define dout_prefix *_dout << "monclient" << (_hunting() ? "(hunting)":"") << ": "
47
48 MonClient::MonClient(CephContext *cct_) :
49 Dispatcher(cct_),
50 messenger(NULL),
51 monc_lock("MonClient::monc_lock"),
52 timer(cct_, monc_lock),
53 finisher(cct_),
54 initialized(false),
55 no_keyring_disabled_cephx(false),
56 log_client(NULL),
57 more_log_pending(false),
58 want_monmap(true),
59 had_a_connection(false),
60 reopen_interval_multiplier(
61 cct_->_conf->get_val<double>("mon_client_hunt_interval_min_multiple")),
62 last_mon_command_tid(0),
63 version_req_id(0)
64 {
65 }
66
67 MonClient::~MonClient()
68 {
69 }
70
71 int MonClient::build_initial_monmap()
72 {
73 ldout(cct, 10) << __func__ << dendl;
74 return monmap.build_initial(cct, cerr);
75 }
76
77 int MonClient::get_monmap()
78 {
79 ldout(cct, 10) << __func__ << dendl;
80 Mutex::Locker l(monc_lock);
81
82 _sub_want("monmap", 0, 0);
83 if (!_opened())
84 _reopen_session();
85
86 while (want_monmap)
87 map_cond.Wait(monc_lock);
88
89 ldout(cct, 10) << __func__ << " done" << dendl;
90 return 0;
91 }
92
93 int MonClient::get_monmap_privately()
94 {
95 ldout(cct, 10) << __func__ << dendl;
96 Mutex::Locker l(monc_lock);
97
98 bool temp_msgr = false;
99 Messenger* smessenger = NULL;
100 if (!messenger) {
101 messenger = smessenger = Messenger::create_client_messenger(cct, "temp_mon_client");
102 if (NULL == messenger) {
103 return -1;
104 }
105 messenger->add_dispatcher_head(this);
106 smessenger->start();
107 temp_msgr = true;
108 }
109
110 int attempt = 10;
111
112 ldout(cct, 10) << "have " << monmap.epoch << " fsid " << monmap.fsid << dendl;
113
114 std::random_device rd;
115 std::mt19937 rng(rd());
116 assert(monmap.size() > 0);
117 std::uniform_int_distribution<unsigned> ranks(0, monmap.size() - 1);
118 while (monmap.fsid.is_zero()) {
119 auto rank = ranks(rng);
120 auto& pending_con = _add_conn(rank, 0);
121 auto con = pending_con.get_con();
122 ldout(cct, 10) << "querying mon." << monmap.get_name(rank) << " "
123 << con->get_peer_addr() << dendl;
124 con->send_message(new MMonGetMap);
125
126 if (--attempt == 0)
127 break;
128
129 utime_t interval;
130 interval.set_from_double(cct->_conf->mon_client_hunt_interval);
131 map_cond.WaitInterval(monc_lock, interval);
132
133 if (monmap.fsid.is_zero() && con) {
134 con->mark_down(); // nope, clean that connection up
135 }
136 }
137
138 if (temp_msgr) {
139 pending_cons.clear();
140 monc_lock.Unlock();
141 messenger->shutdown();
142 if (smessenger)
143 smessenger->wait();
144 delete messenger;
145 messenger = 0;
146 monc_lock.Lock();
147 }
148
149 pending_cons.clear();
150
151 if (!monmap.fsid.is_zero())
152 return 0;
153 return -1;
154 }
155
156
157 /**
158 * Ping the monitor with id @p mon_id and set the resulting reply in
159 * the provided @p result_reply, if this last parameter is not NULL.
160 *
161 * So that we don't rely on the MonClient's default messenger, set up
162 * during connect(), we create our own messenger to comunicate with the
163 * specified monitor. This is advantageous in the following ways:
164 *
165 * - Isolate the ping procedure from the rest of the MonClient's operations,
166 * allowing us to not acquire or manage the big monc_lock, thus not
167 * having to block waiting for some other operation to finish before we
168 * can proceed.
169 * * for instance, we can ping mon.FOO even if we are currently hunting
170 * or blocked waiting for auth to complete with mon.BAR.
171 *
172 * - Ping a monitor prior to establishing a connection (using connect())
173 * and properly establish the MonClient's messenger. This frees us
174 * from dealing with the complex foo that happens in connect().
175 *
176 * We also don't rely on MonClient as a dispatcher for this messenger,
177 * unlike what happens with the MonClient's default messenger. This allows
178 * us to sandbox the whole ping, having it much as a separate entity in
179 * the MonClient class, considerably simplifying the handling and dispatching
180 * of messages without needing to consider monc_lock.
181 *
182 * Current drawback is that we will establish a messenger for each ping
183 * we want to issue, instead of keeping a single messenger instance that
184 * would be used for all pings.
185 */
186 int MonClient::ping_monitor(const string &mon_id, string *result_reply)
187 {
188 ldout(cct, 10) << __func__ << dendl;
189
190 string new_mon_id;
191 if (monmap.contains("noname-"+mon_id)) {
192 new_mon_id = "noname-"+mon_id;
193 } else {
194 new_mon_id = mon_id;
195 }
196
197 if (new_mon_id.empty()) {
198 ldout(cct, 10) << __func__ << " specified mon id is empty!" << dendl;
199 return -EINVAL;
200 } else if (!monmap.contains(new_mon_id)) {
201 ldout(cct, 10) << __func__ << " no such monitor 'mon." << new_mon_id << "'"
202 << dendl;
203 return -ENOENT;
204 }
205
206 MonClientPinger *pinger = new MonClientPinger(cct, result_reply);
207
208 Messenger *smsgr = Messenger::create_client_messenger(cct, "temp_ping_client");
209 smsgr->add_dispatcher_head(pinger);
210 smsgr->start();
211
212 ConnectionRef con = smsgr->get_connection(monmap.get_inst(new_mon_id));
213 ldout(cct, 10) << __func__ << " ping mon." << new_mon_id
214 << " " << con->get_peer_addr() << dendl;
215 con->send_message(new MPing);
216
217 pinger->lock.Lock();
218 int ret = pinger->wait_for_reply(cct->_conf->client_mount_timeout);
219 if (ret == 0) {
220 ldout(cct,10) << __func__ << " got ping reply" << dendl;
221 } else {
222 ret = -ret;
223 }
224 pinger->lock.Unlock();
225
226 con->mark_down();
227 smsgr->shutdown();
228 smsgr->wait();
229 delete smsgr;
230 delete pinger;
231 return ret;
232 }
233
234 bool MonClient::ms_dispatch(Message *m)
235 {
236 if (my_addr == entity_addr_t())
237 my_addr = messenger->get_myaddr();
238
239 // we only care about these message types
240 switch (m->get_type()) {
241 case CEPH_MSG_MON_MAP:
242 case CEPH_MSG_AUTH_REPLY:
243 case CEPH_MSG_MON_SUBSCRIBE_ACK:
244 case CEPH_MSG_MON_GET_VERSION_REPLY:
245 case MSG_MON_COMMAND_ACK:
246 case MSG_LOGACK:
247 break;
248 default:
249 return false;
250 }
251
252 Mutex::Locker lock(monc_lock);
253
254 if (_hunting()) {
255 auto pending_con = pending_cons.find(m->get_source_addr());
256 if (pending_con == pending_cons.end() ||
257 pending_con->second.get_con() != m->get_connection()) {
258 // ignore any messages outside hunting sessions
259 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
260 m->put();
261 return true;
262 }
263 } else if (!active_con || active_con->get_con() != m->get_connection()) {
264 // ignore any messages outside our session(s)
265 ldout(cct, 10) << "discarding stray monitor message " << *m << dendl;
266 m->put();
267 return true;
268 }
269
270 switch (m->get_type()) {
271 case CEPH_MSG_MON_MAP:
272 handle_monmap(static_cast<MMonMap*>(m));
273 if (passthrough_monmap) {
274 return false;
275 } else {
276 m->put();
277 }
278 break;
279 case CEPH_MSG_AUTH_REPLY:
280 handle_auth(static_cast<MAuthReply*>(m));
281 break;
282 case CEPH_MSG_MON_SUBSCRIBE_ACK:
283 handle_subscribe_ack(static_cast<MMonSubscribeAck*>(m));
284 break;
285 case CEPH_MSG_MON_GET_VERSION_REPLY:
286 handle_get_version_reply(static_cast<MMonGetVersionReply*>(m));
287 break;
288 case MSG_MON_COMMAND_ACK:
289 handle_mon_command_ack(static_cast<MMonCommandAck*>(m));
290 break;
291 case MSG_LOGACK:
292 if (log_client) {
293 log_client->handle_log_ack(static_cast<MLogAck*>(m));
294 m->put();
295 if (more_log_pending) {
296 send_log();
297 }
298 } else {
299 m->put();
300 }
301 break;
302 }
303 return true;
304 }
305
306 void MonClient::send_log(bool flush)
307 {
308 if (log_client) {
309 Message *lm = log_client->get_mon_log_message(flush);
310 if (lm)
311 _send_mon_message(lm);
312 more_log_pending = log_client->are_pending();
313 }
314 }
315
316 void MonClient::flush_log()
317 {
318 Mutex::Locker l(monc_lock);
319 send_log();
320 }
321
322 /* Unlike all the other message-handling functions, we don't put away a reference
323 * because we want to support MMonMap passthrough to other Dispatchers. */
324 void MonClient::handle_monmap(MMonMap *m)
325 {
326 ldout(cct, 10) << __func__ << " " << *m << dendl;
327 auto peer = m->get_source_addr();
328 string cur_mon = monmap.get_name(peer);
329
330 bufferlist::iterator p = m->monmapbl.begin();
331 ::decode(monmap, p);
332
333 ldout(cct, 10) << " got monmap " << monmap.epoch
334 << ", mon." << cur_mon << " is now rank " << monmap.get_rank(cur_mon)
335 << dendl;
336 ldout(cct, 10) << "dump:\n";
337 monmap.print(*_dout);
338 *_dout << dendl;
339
340 _sub_got("monmap", monmap.get_epoch());
341
342 if (!monmap.get_addr_name(peer, cur_mon)) {
343 ldout(cct, 10) << "mon." << cur_mon << " went away" << dendl;
344 // can't find the mon we were talking to (above)
345 _reopen_session();
346 }
347
348 map_cond.Signal();
349 want_monmap = false;
350 }
351
352 // ----------------------
353
354 int MonClient::init()
355 {
356 ldout(cct, 10) << __func__ << dendl;
357
358 messenger->add_dispatcher_head(this);
359
360 entity_name = cct->_conf->name;
361
362 Mutex::Locker l(monc_lock);
363
364 string method;
365 if (!cct->_conf->auth_supported.empty())
366 method = cct->_conf->auth_supported;
367 else if (entity_name.get_type() == CEPH_ENTITY_TYPE_OSD ||
368 entity_name.get_type() == CEPH_ENTITY_TYPE_MDS ||
369 entity_name.get_type() == CEPH_ENTITY_TYPE_MON ||
370 entity_name.get_type() == CEPH_ENTITY_TYPE_MGR)
371 method = cct->_conf->auth_cluster_required;
372 else
373 method = cct->_conf->auth_client_required;
374 auth_supported.reset(new AuthMethodList(cct, method));
375 ldout(cct, 10) << "auth_supported " << auth_supported->get_supported_set() << " method " << method << dendl;
376
377 int r = 0;
378 keyring.reset(new KeyRing); // initializing keyring anyway
379
380 if (auth_supported->is_supported_auth(CEPH_AUTH_CEPHX)) {
381 r = keyring->from_ceph_context(cct);
382 if (r == -ENOENT) {
383 auth_supported->remove_supported_auth(CEPH_AUTH_CEPHX);
384 if (!auth_supported->get_supported_set().empty()) {
385 r = 0;
386 no_keyring_disabled_cephx = true;
387 } else {
388 lderr(cct) << "ERROR: missing keyring, cannot use cephx for authentication" << dendl;
389 }
390 }
391 }
392
393 if (r < 0) {
394 return r;
395 }
396
397 rotating_secrets.reset(
398 new RotatingKeyRing(cct, cct->get_module_type(), keyring.get()));
399
400 initialized = true;
401
402 timer.init();
403 finisher.start();
404 schedule_tick();
405
406 return 0;
407 }
408
409 void MonClient::shutdown()
410 {
411 ldout(cct, 10) << __func__ << dendl;
412 monc_lock.Lock();
413 while (!version_requests.empty()) {
414 version_requests.begin()->second->context->complete(-ECANCELED);
415 ldout(cct, 20) << __func__ << " canceling and discarding version request "
416 << version_requests.begin()->second << dendl;
417 delete version_requests.begin()->second;
418 version_requests.erase(version_requests.begin());
419 }
420 while (!mon_commands.empty()) {
421 auto tid = mon_commands.begin()->first;
422 _cancel_mon_command(tid);
423 }
424 while (!waiting_for_session.empty()) {
425 ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
426 waiting_for_session.front()->put();
427 waiting_for_session.pop_front();
428 }
429
430 active_con.reset();
431 pending_cons.clear();
432 auth.reset();
433
434 monc_lock.Unlock();
435
436 if (initialized) {
437 finisher.wait_for_empty();
438 finisher.stop();
439 }
440 monc_lock.Lock();
441 timer.shutdown();
442
443 monc_lock.Unlock();
444 }
445
446 int MonClient::authenticate(double timeout)
447 {
448 Mutex::Locker lock(monc_lock);
449
450 if (active_con) {
451 ldout(cct, 5) << "already authenticated" << dendl;
452 return 0;
453 }
454
455 _sub_want("monmap", monmap.get_epoch() ? monmap.get_epoch() + 1 : 0, 0);
456 if (!_opened())
457 _reopen_session();
458
459 utime_t until = ceph_clock_now();
460 until += timeout;
461 if (timeout > 0.0)
462 ldout(cct, 10) << "authenticate will time out at " << until << dendl;
463 while (!active_con && !authenticate_err) {
464 if (timeout > 0.0) {
465 int r = auth_cond.WaitUntil(monc_lock, until);
466 if (r == ETIMEDOUT) {
467 ldout(cct, 0) << "authenticate timed out after " << timeout << dendl;
468 authenticate_err = -r;
469 }
470 } else {
471 auth_cond.Wait(monc_lock);
472 }
473 }
474
475 if (active_con) {
476 ldout(cct, 5) << __func__ << " success, global_id "
477 << active_con->get_global_id() << dendl;
478 // active_con should not have been set if there was an error
479 assert(authenticate_err == 0);
480 authenticated = true;
481 }
482
483 if (authenticate_err < 0 && no_keyring_disabled_cephx) {
484 lderr(cct) << __func__ << " NOTE: no keyring found; disabled cephx authentication" << dendl;
485 }
486
487 return authenticate_err;
488 }
489
490 void MonClient::handle_auth(MAuthReply *m)
491 {
492 assert(monc_lock.is_locked());
493 if (!_hunting()) {
494 std::swap(active_con->get_auth(), auth);
495 int ret = active_con->authenticate(m);
496 m->put();
497 std::swap(auth, active_con->get_auth());
498 if (global_id != active_con->get_global_id()) {
499 lderr(cct) << __func__ << " peer assigned me a different global_id: "
500 << active_con->get_global_id() << dendl;
501 }
502 if (ret != -EAGAIN) {
503 _finish_auth(ret);
504 }
505 return;
506 }
507
508 // hunting
509 auto found = pending_cons.find(m->get_source_addr());
510 assert(found != pending_cons.end());
511 int auth_err = found->second.handle_auth(m, entity_name, want_keys,
512 rotating_secrets.get());
513 m->put();
514 if (auth_err == -EAGAIN) {
515 return;
516 }
517 if (auth_err) {
518 pending_cons.erase(found);
519 if (!pending_cons.empty()) {
520 // keep trying with pending connections
521 return;
522 }
523 // the last try just failed, give up.
524 } else {
525 auto& mc = found->second;
526 assert(mc.have_session());
527 active_con.reset(new MonConnection(std::move(mc)));
528 pending_cons.clear();
529 }
530
531 _finish_hunting();
532
533 if (!auth_err) {
534 last_rotating_renew_sent = utime_t();
535 while (!waiting_for_session.empty()) {
536 _send_mon_message(waiting_for_session.front());
537 waiting_for_session.pop_front();
538 }
539 _resend_mon_commands();
540 send_log(true);
541 if (active_con) {
542 std::swap(auth, active_con->get_auth());
543 global_id = active_con->get_global_id();
544 }
545 }
546 _finish_auth(auth_err);
547 if (!auth_err) {
548 Context *cb = nullptr;
549 if (session_established_context) {
550 cb = session_established_context.release();
551 }
552 if (cb) {
553 monc_lock.Unlock();
554 cb->complete(0);
555 monc_lock.Lock();
556 }
557 }
558 }
559
560 void MonClient::_finish_auth(int auth_err)
561 {
562 authenticate_err = auth_err;
563 // _resend_mon_commands() could _reopen_session() if the connected mon is not
564 // the one the MonCommand is targeting.
565 if (!auth_err && active_con) {
566 assert(auth);
567 _check_auth_tickets();
568 }
569 auth_cond.SignalAll();
570 }
571
572 // ---------
573
574 void MonClient::_send_mon_message(Message *m)
575 {
576 assert(monc_lock.is_locked());
577 if (active_con) {
578 auto cur_con = active_con->get_con();
579 ldout(cct, 10) << "_send_mon_message to mon."
580 << monmap.get_name(cur_con->get_peer_addr())
581 << " at " << cur_con->get_peer_addr() << dendl;
582 cur_con->send_message(m);
583 } else {
584 waiting_for_session.push_back(m);
585 }
586 }
587
588 void MonClient::_reopen_session(int rank)
589 {
590 assert(monc_lock.is_locked());
591 ldout(cct, 10) << __func__ << " rank " << rank << dendl;
592
593 active_con.reset();
594 pending_cons.clear();
595
596 _start_hunting();
597
598 if (rank >= 0) {
599 _add_conn(rank, global_id);
600 } else {
601 _add_conns(global_id);
602 }
603
604 // throw out old queued messages
605 while (!waiting_for_session.empty()) {
606 waiting_for_session.front()->put();
607 waiting_for_session.pop_front();
608 }
609
610 // throw out version check requests
611 while (!version_requests.empty()) {
612 finisher.queue(version_requests.begin()->second->context, -EAGAIN);
613 delete version_requests.begin()->second;
614 version_requests.erase(version_requests.begin());
615 }
616
617 for (auto& c : pending_cons) {
618 c.second.start(monmap.get_epoch(), entity_name, *auth_supported);
619 }
620
621 for (map<string,ceph_mon_subscribe_item>::iterator p = sub_sent.begin();
622 p != sub_sent.end();
623 ++p) {
624 if (sub_new.count(p->first) == 0)
625 sub_new[p->first] = p->second;
626 }
627 if (!sub_new.empty())
628 _renew_subs();
629 }
630
631 MonConnection& MonClient::_add_conn(unsigned rank, uint64_t global_id)
632 {
633 auto peer = monmap.get_addr(rank);
634 auto conn = messenger->get_connection(monmap.get_inst(rank));
635 MonConnection mc(cct, conn, global_id);
636 auto inserted = pending_cons.insert(make_pair(peer, move(mc)));
637 ldout(cct, 10) << "picked mon." << monmap.get_name(rank)
638 << " con " << conn
639 << " addr " << conn->get_peer_addr()
640 << dendl;
641 return inserted.first->second;
642 }
643
644 void MonClient::_add_conns(uint64_t global_id)
645 {
646 uint16_t min_priority = std::numeric_limits<uint16_t>::max();
647 for (const auto& m : monmap.mon_info) {
648 if (m.second.priority < min_priority) {
649 min_priority = m.second.priority;
650 }
651 }
652 vector<unsigned> ranks;
653 for (const auto& m : monmap.mon_info) {
654 if (m.second.priority == min_priority) {
655 ranks.push_back(monmap.get_rank(m.first));
656 }
657 }
658 std::random_device rd;
659 std::mt19937 rng(rd());
660 std::shuffle(ranks.begin(), ranks.end(), rng);
661 unsigned n = cct->_conf->mon_client_hunt_parallel;
662 if (n == 0 || n > ranks.size()) {
663 n = ranks.size();
664 }
665 for (unsigned i = 0; i < n; i++) {
666 _add_conn(ranks[i], global_id);
667 }
668 }
669
670 bool MonClient::ms_handle_reset(Connection *con)
671 {
672 Mutex::Locker lock(monc_lock);
673
674 if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON)
675 return false;
676
677 if (_hunting()) {
678 if (pending_cons.count(con->get_peer_addr())) {
679 ldout(cct, 10) << __func__ << " hunted mon " << con->get_peer_addr() << dendl;
680 } else {
681 ldout(cct, 10) << __func__ << " stray mon " << con->get_peer_addr() << dendl;
682 }
683 return true;
684 } else {
685 if (active_con && con == active_con->get_con()) {
686 ldout(cct, 10) << __func__ << " current mon " << con->get_peer_addr() << dendl;
687 _reopen_session();
688 return false;
689 } else {
690 ldout(cct, 10) << "ms_handle_reset stray mon " << con->get_peer_addr() << dendl;
691 return true;
692 }
693 }
694 }
695
696 bool MonClient::_opened() const
697 {
698 assert(monc_lock.is_locked());
699 return active_con || _hunting();
700 }
701
702 bool MonClient::_hunting() const
703 {
704 return !pending_cons.empty();
705 }
706
707 void MonClient::_start_hunting()
708 {
709 assert(!_hunting());
710 // adjust timeouts if necessary
711 if (!had_a_connection)
712 return;
713 reopen_interval_multiplier *= cct->_conf->mon_client_hunt_interval_backoff;
714 if (reopen_interval_multiplier >
715 cct->_conf->mon_client_hunt_interval_max_multiple) {
716 reopen_interval_multiplier =
717 cct->_conf->mon_client_hunt_interval_max_multiple;
718 }
719 }
720
721 void MonClient::_finish_hunting()
722 {
723 assert(monc_lock.is_locked());
724 // the pending conns have been cleaned.
725 assert(!_hunting());
726 if (active_con) {
727 auto con = active_con->get_con();
728 ldout(cct, 1) << "found mon."
729 << monmap.get_name(con->get_peer_addr())
730 << dendl;
731 } else {
732 ldout(cct, 1) << "no mon sessions established" << dendl;
733 }
734
735 had_a_connection = true;
736 _un_backoff();
737 }
738
739 void MonClient::tick()
740 {
741 ldout(cct, 10) << __func__ << dendl;
742
743 auto reschedule_tick = make_scope_guard([this] {
744 schedule_tick();
745 });
746
747 _check_auth_tickets();
748
749 if (_hunting()) {
750 ldout(cct, 1) << "continuing hunt" << dendl;
751 return _reopen_session();
752 } else if (active_con) {
753 // just renew as needed
754 utime_t now = ceph_clock_now();
755 auto cur_con = active_con->get_con();
756 if (!cur_con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) {
757 ldout(cct, 10) << "renew subs? (now: " << now
758 << "; renew after: " << sub_renew_after << ") -- "
759 << (now > sub_renew_after ? "yes" : "no")
760 << dendl;
761 if (now > sub_renew_after)
762 _renew_subs();
763 }
764
765 cur_con->send_keepalive();
766
767 if (cct->_conf->mon_client_ping_timeout > 0 &&
768 cur_con->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
769 utime_t lk = cur_con->get_last_keepalive_ack();
770 utime_t interval = now - lk;
771 if (interval > cct->_conf->mon_client_ping_timeout) {
772 ldout(cct, 1) << "no keepalive since " << lk << " (" << interval
773 << " seconds), reconnecting" << dendl;
774 return _reopen_session();
775 }
776 send_log();
777 }
778
779 _un_backoff();
780 }
781 }
782
783 void MonClient::_un_backoff()
784 {
785 // un-backoff our reconnect interval
786 reopen_interval_multiplier = std::max(
787 cct->_conf->get_val<double>("mon_client_hunt_interval_min_multiple"),
788 reopen_interval_multiplier /
789 cct->_conf->get_val<double>("mon_client_hunt_interval_backoff"));
790 ldout(cct, 20) << __func__ << " reopen_interval_multipler now "
791 << reopen_interval_multiplier << dendl;
792 }
793
794 void MonClient::schedule_tick()
795 {
796 struct C_Tick : public Context {
797 MonClient *monc;
798 explicit C_Tick(MonClient *m) : monc(m) {}
799 void finish(int r) override {
800 monc->tick();
801 }
802 };
803
804 if (_hunting()) {
805 timer.add_event_after(cct->_conf->mon_client_hunt_interval
806 * reopen_interval_multiplier,
807 new C_Tick(this));
808 } else
809 timer.add_event_after(cct->_conf->mon_client_ping_interval, new C_Tick(this));
810 }
811
812 // ---------
813
814 void MonClient::_renew_subs()
815 {
816 assert(monc_lock.is_locked());
817 if (sub_new.empty()) {
818 ldout(cct, 10) << __func__ << " - empty" << dendl;
819 return;
820 }
821
822 ldout(cct, 10) << __func__ << dendl;
823 if (!_opened())
824 _reopen_session();
825 else {
826 if (sub_renew_sent == utime_t())
827 sub_renew_sent = ceph_clock_now();
828
829 MMonSubscribe *m = new MMonSubscribe;
830 m->what = sub_new;
831 _send_mon_message(m);
832
833 // update sub_sent with sub_new
834 sub_new.insert(sub_sent.begin(), sub_sent.end());
835 std::swap(sub_new, sub_sent);
836 sub_new.clear();
837 }
838 }
839
840 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
841 {
842 if (sub_renew_sent != utime_t()) {
843 // NOTE: this is only needed for legacy (infernalis or older)
844 // mons; see tick().
845 sub_renew_after = sub_renew_sent;
846 sub_renew_after += m->interval / 2.0;
847 ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
848 sub_renew_sent = utime_t();
849 } else {
850 ldout(cct, 10) << __func__ << " sent " << sub_renew_sent << ", ignoring" << dendl;
851 }
852
853 m->put();
854 }
855
856 int MonClient::_check_auth_tickets()
857 {
858 assert(monc_lock.is_locked());
859 if (active_con && auth) {
860 if (auth->need_tickets()) {
861 ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
862 MAuth *m = new MAuth;
863 m->protocol = auth->get_protocol();
864 auth->prepare_build_request();
865 auth->build_request(m->auth_payload);
866 _send_mon_message(m);
867 }
868
869 _check_auth_rotating();
870 }
871 return 0;
872 }
873
874 int MonClient::_check_auth_rotating()
875 {
876 assert(monc_lock.is_locked());
877 if (!rotating_secrets ||
878 !auth_principal_needs_rotating_keys(entity_name)) {
879 ldout(cct, 20) << "_check_auth_rotating not needed by " << entity_name << dendl;
880 return 0;
881 }
882
883 if (!active_con || !auth) {
884 ldout(cct, 10) << "_check_auth_rotating waiting for auth session" << dendl;
885 return 0;
886 }
887
888 utime_t now = ceph_clock_now();
889 utime_t cutoff = now;
890 cutoff -= MIN(30.0, cct->_conf->auth_service_ticket_ttl / 4.0);
891 utime_t issued_at_lower_bound = now;
892 issued_at_lower_bound -= cct->_conf->auth_service_ticket_ttl;
893 if (!rotating_secrets->need_new_secrets(cutoff)) {
894 ldout(cct, 10) << "_check_auth_rotating have uptodate secrets (they expire after " << cutoff << ")" << dendl;
895 rotating_secrets->dump_rotating();
896 return 0;
897 }
898
899 ldout(cct, 10) << "_check_auth_rotating renewing rotating keys (they expired before " << cutoff << ")" << dendl;
900 if (!rotating_secrets->need_new_secrets() &&
901 rotating_secrets->need_new_secrets(issued_at_lower_bound)) {
902 // the key has expired before it has been issued?
903 lderr(cct) << __func__ << " possible clock skew, rotating keys expired way too early"
904 << " (before " << issued_at_lower_bound << ")" << dendl;
905 }
906 if ((now > last_rotating_renew_sent) &&
907 double(now - last_rotating_renew_sent) < 1) {
908 ldout(cct, 10) << __func__ << " called too often (last: "
909 << last_rotating_renew_sent << "), skipping refresh" << dendl;
910 return 0;
911 }
912 MAuth *m = new MAuth;
913 m->protocol = auth->get_protocol();
914 if (auth->build_rotating_request(m->auth_payload)) {
915 last_rotating_renew_sent = now;
916 _send_mon_message(m);
917 } else {
918 m->put();
919 }
920 return 0;
921 }
922
923 int MonClient::wait_auth_rotating(double timeout)
924 {
925 Mutex::Locker l(monc_lock);
926 utime_t now = ceph_clock_now();
927 utime_t until = now;
928 until += timeout;
929
930 // Must be initialized
931 assert(auth != nullptr);
932
933 if (auth->get_protocol() == CEPH_AUTH_NONE)
934 return 0;
935
936 if (!rotating_secrets)
937 return 0;
938
939 while (auth_principal_needs_rotating_keys(entity_name) &&
940 rotating_secrets->need_new_secrets(now)) {
941 if (now >= until) {
942 ldout(cct, 0) << __func__ << " timed out after " << timeout << dendl;
943 return -ETIMEDOUT;
944 }
945 ldout(cct, 10) << __func__ << " waiting (until " << until << ")" << dendl;
946 auth_cond.WaitUntil(monc_lock, until);
947 now = ceph_clock_now();
948 }
949 ldout(cct, 10) << __func__ << " done" << dendl;
950 return 0;
951 }
952
953 // ---------
954
955 void MonClient::_send_command(MonCommand *r)
956 {
957 entity_addr_t peer;
958 if (active_con) {
959 peer = active_con->get_con()->get_peer_addr();
960 }
961
962 if (r->target_rank >= 0 &&
963 r->target_rank != monmap.get_rank(peer)) {
964 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
965 << " wants rank " << r->target_rank
966 << ", reopening session"
967 << dendl;
968 if (r->target_rank >= (int)monmap.size()) {
969 ldout(cct, 10) << " target " << r->target_rank << " >= max mon " << monmap.size() << dendl;
970 _finish_command(r, -ENOENT, "mon rank dne");
971 return;
972 }
973 _reopen_session(r->target_rank);
974 return;
975 }
976
977 if (r->target_name.length() &&
978 r->target_name != monmap.get_name(peer)) {
979 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd
980 << " wants mon " << r->target_name
981 << ", reopening session"
982 << dendl;
983 if (!monmap.contains(r->target_name)) {
984 ldout(cct, 10) << " target " << r->target_name << " not present in monmap" << dendl;
985 _finish_command(r, -ENOENT, "mon dne");
986 return;
987 }
988 _reopen_session(monmap.get_rank(r->target_name));
989 return;
990 }
991
992 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
993 MMonCommand *m = new MMonCommand(monmap.fsid);
994 m->set_tid(r->tid);
995 m->cmd = r->cmd;
996 m->set_data(r->inbl);
997 _send_mon_message(m);
998 return;
999 }
1000
1001 void MonClient::_resend_mon_commands()
1002 {
1003 // resend any requests
1004 for (map<uint64_t,MonCommand*>::iterator p = mon_commands.begin();
1005 p != mon_commands.end();
1006 ++p) {
1007 _send_command(p->second);
1008 }
1009 }
1010
1011 void MonClient::handle_mon_command_ack(MMonCommandAck *ack)
1012 {
1013 MonCommand *r = NULL;
1014 uint64_t tid = ack->get_tid();
1015
1016 if (tid == 0 && !mon_commands.empty()) {
1017 r = mon_commands.begin()->second;
1018 ldout(cct, 10) << __func__ << " has tid 0, assuming it is " << r->tid << dendl;
1019 } else {
1020 map<uint64_t,MonCommand*>::iterator p = mon_commands.find(tid);
1021 if (p == mon_commands.end()) {
1022 ldout(cct, 10) << __func__ << " " << ack->get_tid() << " not found" << dendl;
1023 ack->put();
1024 return;
1025 }
1026 r = p->second;
1027 }
1028
1029 ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
1030 if (r->poutbl)
1031 r->poutbl->claim(ack->get_data());
1032 _finish_command(r, ack->r, ack->rs);
1033 ack->put();
1034 }
1035
1036 int MonClient::_cancel_mon_command(uint64_t tid)
1037 {
1038 assert(monc_lock.is_locked());
1039
1040 map<ceph_tid_t, MonCommand*>::iterator it = mon_commands.find(tid);
1041 if (it == mon_commands.end()) {
1042 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
1043 return -ENOENT;
1044 }
1045
1046 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
1047
1048 MonCommand *cmd = it->second;
1049 _finish_command(cmd, -ETIMEDOUT, "");
1050 return 0;
1051 }
1052
1053 void MonClient::_finish_command(MonCommand *r, int ret, string rs)
1054 {
1055 ldout(cct, 10) << __func__ << " " << r->tid << " = " << ret << " " << rs << dendl;
1056 if (r->prval)
1057 *(r->prval) = ret;
1058 if (r->prs)
1059 *(r->prs) = rs;
1060 if (r->onfinish)
1061 finisher.queue(r->onfinish, ret);
1062 mon_commands.erase(r->tid);
1063 delete r;
1064 }
1065
1066 void MonClient::start_mon_command(const vector<string>& cmd,
1067 const bufferlist& inbl,
1068 bufferlist *outbl, string *outs,
1069 Context *onfinish)
1070 {
1071 Mutex::Locker l(monc_lock);
1072 MonCommand *r = new MonCommand(++last_mon_command_tid);
1073 r->cmd = cmd;
1074 r->inbl = inbl;
1075 r->poutbl = outbl;
1076 r->prs = outs;
1077 r->onfinish = onfinish;
1078 if (cct->_conf->rados_mon_op_timeout > 0) {
1079 class C_CancelMonCommand : public Context
1080 {
1081 uint64_t tid;
1082 MonClient *monc;
1083 public:
1084 C_CancelMonCommand(uint64_t tid, MonClient *monc) : tid(tid), monc(monc) {}
1085 void finish(int r) override {
1086 monc->_cancel_mon_command(tid);
1087 }
1088 };
1089 r->ontimeout = new C_CancelMonCommand(r->tid, this);
1090 timer.add_event_after(cct->_conf->rados_mon_op_timeout, r->ontimeout);
1091 }
1092 mon_commands[r->tid] = r;
1093 _send_command(r);
1094 }
1095
1096 void MonClient::start_mon_command(const string &mon_name,
1097 const vector<string>& cmd,
1098 const bufferlist& inbl,
1099 bufferlist *outbl, string *outs,
1100 Context *onfinish)
1101 {
1102 Mutex::Locker l(monc_lock);
1103 MonCommand *r = new MonCommand(++last_mon_command_tid);
1104 r->target_name = mon_name;
1105 r->cmd = cmd;
1106 r->inbl = inbl;
1107 r->poutbl = outbl;
1108 r->prs = outs;
1109 r->onfinish = onfinish;
1110 mon_commands[r->tid] = r;
1111 _send_command(r);
1112 }
1113
1114 void MonClient::start_mon_command(int rank,
1115 const vector<string>& cmd,
1116 const bufferlist& inbl,
1117 bufferlist *outbl, string *outs,
1118 Context *onfinish)
1119 {
1120 Mutex::Locker l(monc_lock);
1121 MonCommand *r = new MonCommand(++last_mon_command_tid);
1122 r->target_rank = rank;
1123 r->cmd = cmd;
1124 r->inbl = inbl;
1125 r->poutbl = outbl;
1126 r->prs = outs;
1127 r->onfinish = onfinish;
1128 mon_commands[r->tid] = r;
1129 _send_command(r);
1130 }
1131
1132 // ---------
1133
1134 void MonClient::get_version(string map, version_t *newest, version_t *oldest, Context *onfinish)
1135 {
1136 version_req_d *req = new version_req_d(onfinish, newest, oldest);
1137 ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
1138 Mutex::Locker l(monc_lock);
1139 MMonGetVersion *m = new MMonGetVersion();
1140 m->what = map;
1141 m->handle = ++version_req_id;
1142 version_requests[m->handle] = req;
1143 _send_mon_message(m);
1144 }
1145
1146 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
1147 {
1148 assert(monc_lock.is_locked());
1149 map<ceph_tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
1150 if (iter == version_requests.end()) {
1151 ldout(cct, 0) << __func__ << " version request with handle " << m->handle
1152 << " not found" << dendl;
1153 } else {
1154 version_req_d *req = iter->second;
1155 ldout(cct, 10) << __func__ << " finishing " << req << " version " << m->version << dendl;
1156 version_requests.erase(iter);
1157 if (req->newest)
1158 *req->newest = m->version;
1159 if (req->oldest)
1160 *req->oldest = m->oldest_version;
1161 finisher.queue(req->context, 0);
1162 delete req;
1163 }
1164 m->put();
1165 }
1166
1167 AuthAuthorizer* MonClient::build_authorizer(int service_id) const {
1168 Mutex::Locker l(monc_lock);
1169 if (auth) {
1170 return auth->build_authorizer(service_id);
1171 } else {
1172 ldout(cct, 0) << __func__ << " for " << ceph_entity_type_name(service_id)
1173 << ", but no auth is available now" << dendl;
1174 return nullptr;
1175 }
1176 }
1177
// Log prefix for the MonConnection methods that follow: it is built from
// MonConnection::have_session() and prints "(hunting)" while the
// connection has no session yet.
#define dout_subsys ceph_subsys_monc
#undef dout_prefix
#define dout_prefix *_dout << "monclient" << (have_session() ? ": " : "(hunting): ")
1181
1182 MonConnection::MonConnection(CephContext *cct, ConnectionRef con, uint64_t global_id)
1183 : cct(cct), con(con), global_id(global_id)
1184 {}
1185
1186 MonConnection::~MonConnection()
1187 {
1188 if (con) {
1189 con->mark_down();
1190 con.reset();
1191 }
1192 }
1193
1194 bool MonConnection::have_session() const
1195 {
1196 return state == State::HAVE_SESSION;
1197 }
1198
1199 void MonConnection::start(epoch_t epoch,
1200 const EntityName& entity_name,
1201 const AuthMethodList& auth_supported)
1202 {
1203 // restart authentication handshake
1204 state = State::NEGOTIATING;
1205
1206 // send an initial keepalive to ensure our timestamp is valid by the
1207 // time we are in an OPENED state (by sequencing this before
1208 // authentication).
1209 con->send_keepalive();
1210
1211 auto m = new MAuth;
1212 m->protocol = 0;
1213 m->monmap_epoch = epoch;
1214 __u8 struct_v = 1;
1215 ::encode(struct_v, m->auth_payload);
1216 ::encode(auth_supported.get_supported_set(), m->auth_payload);
1217 ::encode(entity_name, m->auth_payload);
1218 ::encode(global_id, m->auth_payload);
1219 con->send_message(m);
1220 }
1221
1222 int MonConnection::handle_auth(MAuthReply* m,
1223 const EntityName& entity_name,
1224 uint32_t want_keys,
1225 RotatingKeyRing* keyring)
1226 {
1227 if (state == State::NEGOTIATING) {
1228 int r = _negotiate(m, entity_name, want_keys, keyring);
1229 if (r) {
1230 return r;
1231 }
1232 state = State::AUTHENTICATING;
1233 }
1234 int r = authenticate(m);
1235 if (!r) {
1236 state = State::HAVE_SESSION;
1237 }
1238 return r;
1239 }
1240
1241 int MonConnection::_negotiate(MAuthReply *m,
1242 const EntityName& entity_name,
1243 uint32_t want_keys,
1244 RotatingKeyRing* keyring)
1245 {
1246 if (auth && (int)m->protocol == auth->get_protocol()) {
1247 // good, negotiation completed
1248 auth->reset();
1249 return 0;
1250 }
1251
1252 auth.reset(get_auth_client_handler(cct, m->protocol, keyring));
1253 if (!auth) {
1254 ldout(cct, 10) << "no handler for protocol " << m->protocol << dendl;
1255 if (m->result == -ENOTSUP) {
1256 ldout(cct, 10) << "none of our auth protocols are supported by the server"
1257 << dendl;
1258 }
1259 return m->result;
1260 }
1261
1262 // do not request MGR key unless the mon has the SERVER_KRAKEN
1263 // feature. otherwise it will give us an auth error. note that
1264 // we have to use the FEATUREMASK because pre-jewel the kraken
1265 // feature bit was used for something else.
1266 if ((want_keys & CEPH_ENTITY_TYPE_MGR) &&
1267 !(m->get_connection()->has_features(CEPH_FEATUREMASK_SERVER_KRAKEN))) {
1268 ldout(cct, 1) << __func__
1269 << " not requesting MGR keys from pre-kraken monitor"
1270 << dendl;
1271 want_keys &= ~CEPH_ENTITY_TYPE_MGR;
1272 }
1273 auth->set_want_keys(want_keys);
1274 auth->init(entity_name);
1275 auth->set_global_id(global_id);
1276 return 0;
1277 }
1278
1279 int MonConnection::authenticate(MAuthReply *m)
1280 {
1281 assert(auth);
1282 if (!m->global_id) {
1283 ldout(cct, 1) << "peer sent an invalid global_id" << dendl;
1284 }
1285 if (m->global_id != global_id) {
1286 // it's a new session
1287 auth->reset();
1288 global_id = m->global_id;
1289 auth->set_global_id(global_id);
1290 ldout(cct, 10) << "my global_id is " << m->global_id << dendl;
1291 }
1292 auto p = m->result_bl.begin();
1293 int ret = auth->handle_response(m->result, p);
1294 if (ret == -EAGAIN) {
1295 auto ma = new MAuth;
1296 ma->protocol = auth->get_protocol();
1297 auth->prepare_build_request();
1298 auth->build_request(ma->auth_payload);
1299 con->send_message(ma);
1300 }
1301 return ret;
1302 }