1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "MonmapMonitor.h"
17 #include "messages/MMonCommand.h"
18 #include "messages/MMonJoin.h"
20 #include "common/ceph_argparse.h"
21 #include "common/errno.h"
23 #include "common/config.h"
24 #include "common/cmdparse.h"
26 #include "include/ceph_assert.h"
27 #include "include/stringify.h"
29 #define dout_subsys ceph_subsys_mon
31 #define dout_prefix _prefix(_dout, mon)
32 using namespace TOPNSPC::common
;
33 static ostream
& _prefix(std::ostream
*_dout
, Monitor
*mon
) {
34 return *_dout
<< "mon." << mon
->name
<< "@" << mon
->rank
35 << "(" << mon
->get_state_name()
36 << ").monmap v" << mon
->monmap
->epoch
<< " ";
39 void MonmapMonitor::create_initial()
41 dout(10) << __func__
<< " using current monmap" << dendl
;
42 pending_map
= *mon
->monmap
;
43 pending_map
.epoch
= 1;
45 if (g_conf()->mon_debug_no_initial_persistent_features
) {
46 derr
<< __func__
<< " mon_debug_no_initial_persistent_features=true"
49 // initialize with default persistent features for new clusters
50 pending_map
.persistent_features
= ceph::features::mon::get_persistent();
51 pending_map
.min_mon_release
= ceph_release();
55 void MonmapMonitor::update_from_paxos(bool *need_bootstrap
)
57 version_t version
= get_last_committed();
58 if (version
<= mon
->monmap
->get_epoch())
61 dout(10) << __func__
<< " version " << version
62 << ", my v " << mon
->monmap
->epoch
<< dendl
;
64 if (need_bootstrap
&& version
!= mon
->monmap
->get_epoch()) {
65 dout(10) << " signaling that we need a bootstrap" << dendl
;
66 *need_bootstrap
= true;
71 int ret
= get_version(version
, monmap_bl
);
72 ceph_assert(ret
== 0);
73 ceph_assert(monmap_bl
.length());
75 dout(10) << __func__
<< " got " << version
<< dendl
;
76 mon
->monmap
->decode(monmap_bl
);
78 if (mon
->store
->exists("mkfs", "monmap")) {
79 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
80 t
->erase("mkfs", "monmap");
81 mon
->store
->apply_transaction(t
);
86 // make sure we've recorded min_mon_release
88 if (mon
->store
->read_meta("min_mon_release", &val
) < 0 ||
90 atoi(val
.c_str()) != (int)ceph_release()) {
91 dout(10) << __func__
<< " updating min_mon_release meta" << dendl
;
92 mon
->store
->write_meta("min_mon_release",
93 stringify(ceph_release()));
97 void MonmapMonitor::create_pending()
99 pending_map
= *mon
->monmap
;
101 pending_map
.last_changed
= ceph_clock_now();
102 dout(10) << __func__
<< " monmap epoch " << pending_map
.epoch
<< dendl
;
105 void MonmapMonitor::encode_pending(MonitorDBStore::TransactionRef t
)
107 dout(10) << __func__
<< " epoch " << pending_map
.epoch
<< dendl
;
109 ceph_assert(mon
->monmap
->epoch
+ 1 == pending_map
.epoch
||
110 pending_map
.epoch
== 1); // special case mkfs!
112 pending_map
.encode(bl
, mon
->get_quorum_con_features());
114 put_version(t
, pending_map
.epoch
, bl
);
115 put_last_committed(t
, pending_map
.epoch
);
117 // generate a cluster fingerprint, too?
118 if (pending_map
.epoch
== 1) {
119 mon
->prepare_new_fingerprint(t
);
// Completion context that retries MonmapMonitor::apply_mon_features() with
// the feature set and minimum release captured at construction time.
// apply_mon_features() queues one of these through wait_for_writeable_ctx()
// when the service is not yet writeable.
//
// NOTE(review): this extract is missing several listing lines of the class
// (the declaration of `svc`, any access specifier, the condition of the
// first branch in finish(), and the closing braces).  Restore them from the
// upstream file before compiling.
123 class C_ApplyFeatures
: public Context
{
// values captured for the deferred call
125 mon_feature_t features
;
126 ceph_release_t min_mon_release
;
// s: service to call back; f: features to apply; mmr: minimum mon release
128 C_ApplyFeatures(MonmapMonitor
*s
, const mon_feature_t
& f
, ceph_release_t mmr
) :
129 svc(s
), features(f
), min_mon_release(mmr
) { }
// r: completion code.  The branch that re-applies the features has a
// condition that is not visible here (presumably "success" -- confirm);
// -EAGAIN / -ECANCELED drop the request; the abort below presumably covers
// every other value.
130 void finish(int r
) override
{
132 svc
->apply_mon_features(features
, min_mon_release
);
133 } else if (r
== -EAGAIN
|| r
== -ECANCELED
) {
134 // discard features if we're no longer on the quorum that
135 // established them in the first place.
138 ceph_abort_msg("bad C_ApplyFeatures return value");
// Fold the quorum's monitor features and minimum release into pending_map.
//
// features:        feature set to apply; asserted to contain everything
//                  already persistent in pending_map and to be contained in
//                  this build's supported set
// min_mon_release: release to raise pending_map.min_mon_release to when it
//                  is higher than the stored value
//
// NOTE(review): this extract is missing listing lines inside the function
// (the statements that leave the early-exit branches, parts of two log
// statements, closing braces and whatever follows the last assignment).
// Restore them from the upstream file before compiling.
143 void MonmapMonitor::apply_mon_features(const mon_feature_t
& features
,
144 ceph_release_t min_mon_release
)
// not writeable yet: queue a C_ApplyFeatures so the call is retried later
146 if (!is_writeable()) {
147 dout(5) << __func__
<< " wait for service to be writeable" << dendl
;
148 wait_for_writeable_ctx(new C_ApplyFeatures(this, features
, min_mon_release
));
152 // do nothing here unless we have a full quorum
153 if (mon
->get_quorum().size() < mon
->monmap
->size()) {
157 ceph_assert(is_writeable());
158 ceph_assert(features
.contains_all(pending_map
.persistent_features
));
159 // we should never hit this because `features` should be the result
160 // of the quorum's supported features. But if it happens, die.
161 ceph_assert(ceph::features::mon::get_supported().contains_all(features
));
// new_features = persistent-capable bits of `features` that differ from
// what pending_map already records (XOR of the two sets)
163 mon_feature_t new_features
=
164 (pending_map
.persistent_features
^
165 (features
& ceph::features::mon::get_persistent()));
// nothing new and the release already matches: only log
167 if (new_features
.empty() &&
168 pending_map
.min_mon_release
== min_mon_release
) {
169 dout(10) << __func__
<< " min_mon_release (" << (int)min_mon_release
170 << ") and features (" << features
<< ") match" << dendl
;
// record any newly available persistent features
174 if (!new_features
.empty()) {
175 dout(1) << __func__
<< " applying new features "
176 << new_features
<< ", had " << pending_map
.persistent_features
178 << (new_features
| pending_map
.persistent_features
)
180 pending_map
.persistent_features
|= new_features
;
// min_mon_release is only ever raised here, never lowered
182 if (min_mon_release
> pending_map
.min_mon_release
) {
183 dout(1) << __func__
<< " increasing min_mon_release to "
184 << ceph::to_integer
<int>(min_mon_release
) << " (" << min_mon_release
186 pending_map
.min_mon_release
= min_mon_release
;
192 void MonmapMonitor::on_active()
194 if (get_last_committed() >= 1 && !mon
->has_ever_joined
) {
195 // make note of the fact that i was, once, part of the quorum.
196 dout(10) << "noting that i was, once, part of an active quorum." << dendl
;
198 /* This is some form of nasty in-breeding we have between the MonmapMonitor
199 and the Monitor itself. We should find a way to get rid of it given our
200 new architecture. Until then, stick with it since we are a
201 single-threaded process and, truth be told, no one else relies on this
204 auto t(std::make_shared
<MonitorDBStore::Transaction
>());
205 t
->put(Monitor::MONITOR_NAME
, "joined", 1);
206 mon
->store
->apply_transaction(t
);
207 mon
->has_ever_joined
= true;
210 if (mon
->is_leader()) {
211 mon
->clog
->debug() << "monmap " << *mon
->monmap
;
214 apply_mon_features(mon
->get_quorum_mon_features(),
215 mon
->quorum_min_mon_release
);
// Read-side dispatch on the incoming message type.
//
// MSG_MON_COMMAND is handed to preprocess_command(); a bad_cmd_get raised
// while handling it is answered with -EINVAL and the exception text.  A
// second message type is handed to preprocess_join().
//
// NOTE(review): this extract is missing listing lines here: the `try`
// opener, the declaration of `bl`, the return after the error reply, the
// case label in front of preprocess_join() (presumably the MMonJoin message
// type -- confirm), any default handling and the closing braces.
218 bool MonmapMonitor::preprocess_query(MonOpRequestRef op
)
220 auto m
= op
->get_req
<PaxosServiceMessage
>();
221 switch (m
->get_type()) {
223 case MSG_MON_COMMAND
:
225 return preprocess_command(op
);
// malformed command arguments: report to the client instead of propagating
227 catch (const bad_cmd_get
& e
) {
229 mon
->reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
233 return preprocess_join(op
);
240 void MonmapMonitor::dump_info(Formatter
*f
)
242 f
->dump_unsigned("monmap_first_committed", get_first_committed());
243 f
->dump_unsigned("monmap_last_committed", get_last_committed());
244 f
->open_object_section("monmap");
245 mon
->monmap
->dump(f
);
247 f
->open_array_section("quorum");
248 for (set
<int>::iterator q
= mon
->get_quorum().begin(); q
!= mon
->get_quorum().end(); ++q
)
249 f
->dump_int("mon", *q
);
// Read-only handling of "mon ..." commands.
//
// Parses the JSON command into cmdmap (replying -EINVAL when that fails),
// replies -EACCES "access denied" on a session problem, creates a Formatter
// for the requested "format" (default "plain") and then dispatches on the
// command prefix: "mon stat", "mon getmap" / "mon dump", "mon feature ls".
// The common exit replies with (r, rs, rdata).
//
// NOTE(review): this extract is missing many listing lines of the function
// (local declarations such as cmdmap/ss/rdata/prefix/format/r/bl/epoch, the
// condition guarding the -EACCES reply, error-code assignments, several
// conditions, returns, labels and closing braces).  The comments below
// describe only what is visible; restore the code from the upstream file
// before compiling.
253 bool MonmapMonitor::preprocess_command(MonOpRequestRef op
)
255 auto m
= op
->get_req
<MMonCommand
>();
// malformed JSON: reply with the parser's message
261 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
262 string rs
= ss
.str();
263 mon
->reply_command(op
, -EINVAL
, rs
, rdata
, get_last_committed());
268 cmd_getval(cmdmap
, "prefix", prefix
);
270 MonSession
*session
= op
->get_session();
272 mon
->reply_command(op
, -EACCES
, "access denied", get_last_committed());
277 cmd_getval(cmdmap
, "format", format
, string("plain"));
278 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
// "mon stat": monmap summary plus election epoch, leader and quorum
280 if (prefix
== "mon stat") {
281 mon
->monmap
->print_summary(ss
);
282 ss
<< ", election epoch " << mon
->get_epoch() << ", leader "
283 << mon
->get_leader() << " " << mon
->get_leader_name()
284 << ", quorum " << mon
->get_quorum() << " " << mon
->get_quorum_names();
// "mon getmap" / "mon dump": optional "epoch" argument, default 0; p starts
// out as the current monmap and a stored version may be loaded instead
289 } else if (prefix
== "mon getmap" ||
290 prefix
== "mon dump") {
294 cmd_getval(cmdmap
, "epoch", epochnum
, (int64_t)0);
297 MonMap
*p
= mon
->monmap
;
300 r
= get_version(epoch
, bl
);
302 ss
<< "there is no map for epoch " << epoch
;
306 ceph_assert(bl
.length() > 0);
// getmap: binary map encoded for the requesting connection's features
313 if (prefix
== "mon getmap") {
314 p
->encode(rdata
, m
->get_connection()->get_features());
316 ss
<< "got monmap epoch " << p
->get_epoch();
// dump: formatted output, including the current quorum as an array
317 } else if (prefix
== "mon dump") {
320 f
->open_object_section("monmap");
322 f
->open_array_section("quorum");
323 for (set
<int>::iterator q
= mon
->get_quorum().begin();
324 q
!= mon
->get_quorum().end(); ++q
) {
325 f
->dump_int("mon", *q
);
336 ss
<< "dumped monmap epoch " << p
->get_epoch();
// p differs from the live monmap only when another epoch was loaded above
338 if (p
!= mon
->monmap
) {
// "mon feature ls": list supported/persistent features of this build and
// the persistent/optional/required features of the current monmap
343 } else if (prefix
== "mon feature ls") {
345 bool list_with_value
= false;
347 if (cmd_getval(cmdmap
, "with_value", with_value
) &&
348 with_value
== "--with-value") {
349 list_with_value
= true;
352 MonMap
*p
= mon
->monmap
;
355 mon_feature_t supported
= ceph::features::mon::get_supported();
356 mon_feature_t persistent
= ceph::features::mon::get_persistent();
357 mon_feature_t required
= p
->get_required_features();
// helper: writes one feature set either through the formatter (dump*) or to
// the text stream `ds` (print*); the conditions choosing between the four
// calls are not visible in this extract
360 auto print_feature
= [&](mon_feature_t
& m_features
, const char* m_str
) {
363 m_features
.dump_with_value(f
.get(), m_str
);
365 m_features
.dump(f
.get(), m_str
);
368 m_features
.print_with_value(ds
);
370 m_features
.print(ds
);
// structured output: features { all {...}, monmap {...} }
375 f
->open_object_section("features");
377 f
->open_object_section("all");
378 print_feature(supported
, "supported");
379 print_feature(persistent
, "persistent");
380 f
->close_section(); // all
382 f
->open_object_section("monmap");
383 print_feature(p
->persistent_features
, "persistent");
384 print_feature(p
->optional_features
, "optional");
385 print_feature(required
, "required");
386 f
->close_section(); // monmap
388 f
->close_section(); // features
// plain-text output
392 ds
<< "all features" << std::endl
394 print_feature(supported
, nullptr);
397 print_feature(persistent
, nullptr);
401 ds
<< "on current monmap (epoch "
402 << p
->get_epoch() << ")" << std::endl
404 print_feature(p
->persistent_features
, nullptr);
406 // omit optional features in plain-text
407 // makes it easier to read, and they're, currently, empty.
409 print_feature(required
, nullptr);
// common exit: send result code, message and payload back to the client
421 mon
->reply_command(op
, r
, rs
, rdata
, get_last_committed());
// Write-side dispatch on the incoming message type.
//
// MSG_MON_COMMAND is handed to prepare_command(); a bad_cmd_get raised while
// handling it is answered with -EINVAL and the exception text.  A second
// message type is handed to prepare_join().
//
// NOTE(review): this extract is missing listing lines here: the `try`
// opener, the declaration of `bl`, the return after the error reply, the
// case label in front of prepare_join() (presumably the MMonJoin message
// type -- confirm), any default handling, the final return and the closing
// braces.
428 bool MonmapMonitor::prepare_update(MonOpRequestRef op
)
430 auto m
= op
->get_req
<PaxosServiceMessage
>();
431 dout(7) << __func__
<< " " << *m
<< " from " << m
->get_orig_source_inst() << dendl
;
433 switch (m
->get_type()) {
434 case MSG_MON_COMMAND
:
436 return prepare_command(op
);
// malformed command arguments: report to the client instead of propagating
437 } catch (const bad_cmd_get
& e
) {
439 mon
->reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
443 return prepare_join(op
);
// Handling of "mon ..." commands that modify the monmap.
//
// Parses the JSON command into cmdmap (replying -EINVAL when that fails),
// replies -EACCES "access denied" on a session problem, then dispatches on
// the prefix: "mon add", "mon remove"/"mon rm", "mon feature set",
// "mon set-rank", "mon set-addrs", "mon set-weight", "mon enable-msgr2".
// Changes are made to pending_map; `propose` records whether a proposal is
// wanted.  The final reply_command() is the path that answers the user
// without proposing.
//
// NOTE(review): this extract is missing many listing lines of the function
// (local declarations such as cmdmap/ss/rs/err/prefix/name/addr/rank/av,
// error-code assignments, jumps to the reply path, several conditions,
// `propose = true` assignments, returns, comment terminators and closing
// braces).  The comments below describe only what is visible; restore the
// code from the upstream file before compiling.
451 bool MonmapMonitor::prepare_command(MonOpRequestRef op
)
453 auto m
= op
->get_req
<MMonCommand
>();
// malformed JSON: reply with the parser's message
459 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
460 string rs
= ss
.str();
461 mon
->reply_command(op
, -EINVAL
, rs
, get_last_committed());
466 cmd_getval(cmdmap
, "prefix", prefix
);
468 MonSession
*session
= op
->get_session();
470 mon
->reply_command(op
, -EACCES
, "access denied", get_last_committed());
474 /* We should follow the following rules:
476 * - 'monmap' is the current, consistent version of the monmap
477 * - 'pending_map' is the uncommitted version of the monmap
479 * All checks for the current state must be made against 'monmap'.
480 * All changes are made against 'pending_map'.
482 * If there are concurrent operations modifying 'pending_map', please
483 * follow the following rules.
485 * - if pending_map has already been changed, the second operation must
486 * wait for the proposal to finish and be run again; This is the easiest
487 * path to guarantee correctness but may impact performance (i.e., it
488 * will take longer for the user to get a reply).
490 * - if the result of the second operation can be guaranteed to be
491 * idempotent, the operation may reply to the user once the proposal
492 * finishes; still needs to wait for the proposal to finish.
494 * - An operation _NEVER_ returns to the user based on pending state.
496 * If an operation does not modify current stable monmap, it may be
497 * serialized before current pending map, regardless of any change that
498 * has been made to the pending map -- remember, pending is uncommitted
499 * state, thus we are not bound by it.
// `monmap` below is the committed map used for the state checks
502 ceph_assert(mon
->monmap
);
503 MonMap
&monmap
= *mon
->monmap
;
508 * Adding or removing monitors may lead to loss of quorum.
510 * Because quorum may be lost, it's important to reply something
511 * to the user, lest she end up waiting forever for a reply. And
512 * no reply will ever be sent until quorum is formed again.
514 * On the other hand, this means we're leaking uncommitted state
515 * to the user. As such, please be mindful of the reply message.
517 * e.g., 'adding monitor mon.foo' is okay ('adding' is an on-going
518 * operation and conveys its not-yet-permanent nature); whereas
519 * 'added monitor mon.foo' presumes the action has successfully
520 * completed and state has been committed, which may not be true.
524 bool propose
= false;
// ---- "mon add <name> <addr>" ----
525 if (prefix
== "mon add") {
527 cmd_getval(cmdmap
, "name", name
);
529 cmd_getval(cmdmap
, "addr", addrstr
);
// NOTE(review): the message below has no space before "does not parse", so
// the address and the text run together in the reply.
533 if (!addr
.parse(addrstr
.c_str())) {
535 ss
<< "addr " << addrstr
<< "does not parse";
// Build the address vector.  With the NAUTILUS persistent feature an address
// with a port is added as given (the IANA port is typed msgr2 first), and the
// port-less branch adds a msgr2 entry on the IANA port plus a legacy entry on
// the legacy port.  Without the feature a single legacy address is added,
// defaulting the port to the legacy one.
539 entity_addrvec_t addrs
;
540 if (monmap
.persistent_features
.contains_all(
541 ceph::features::mon::FEATURE_NAUTILUS
)) {
542 if (addr
.get_port() == CEPH_MON_PORT_IANA
) {
543 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
545 if (addr
.get_port() == CEPH_MON_PORT_LEGACY
) {
546 // if they specified the *old* default they probably don't care
549 if (addr
.get_port()) {
550 addrs
.v
.push_back(addr
);
552 addr
.set_type(entity_addr_t::TYPE_MSGR2
);
553 addr
.set_port(CEPH_MON_PORT_IANA
);
554 addrs
.v
.push_back(addr
);
555 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
556 addr
.set_port(CEPH_MON_PORT_LEGACY
);
557 addrs
.v
.push_back(addr
);
560 if (addr
.get_port() == 0) {
561 addr
.set_port(CEPH_MON_PORT_LEGACY
);
563 addr
.set_type(entity_addr_t::TYPE_LEGACY
);
564 addrs
.v
.push_back(addr
);
566 dout(20) << __func__
<< " addr " << addr
<< " -> addrs " << addrs
<< dendl
;
569 * If we have a monitor with the same name and different addr, then EEXIST
570 * If we have a monitor with the same addr and different name, then EEXIST
571 * If we have a monitor with the same addr and same name, then wait for
572 * the proposal to finish and return success.
573 * If we don't have the monitor, add it.
577 if (!ss
.str().empty())
// existence checks are made against the committed map
581 if (monmap
.contains(name
)) {
582 if (monmap
.get_addrs(name
) == addrs
) {
583 // stable map contains monitor with the same name at the same address.
584 // serialize before current pending map.
585 err
= 0; // for clarity; this has already been set above.
586 ss
<< "mon." << name
<< " at " << addrs
<< " already exists";
590 << " already exists at address " << monmap
.get_addrs(name
);
// NOTE(review): the lookup below uses `addrs`, but the message prints the
// single `addr` (which the code above may have rewritten) -- confirm intent.
592 } else if (monmap
.contains(addrs
)) {
593 // we established on the previous branch that name is different
594 ss
<< "mon." << monmap
.get_name(addrs
)
595 << " already exists at address " << addr
;
604 /* Given there's no delay between proposals on the MonmapMonitor (see
605 * MonmapMonitor::should_propose()), there is no point in checking for
606 * a mismatch between name and addr on pending_map.
608 * Once we established the monitor does not exist in the committed state,
609 * we can simply go ahead and add the monitor.
612 pending_map
.add(name
, addrs
);
613 pending_map
.last_changed
= ceph_clock_now();
614 ss
<< "adding mon." << name
<< " at " << addrs
;
616 dout(0) << __func__
<< " proposing new mon." << name
<< dendl
;
// ---- "mon remove" / "mon rm" <name> ----
618 } else if (prefix
== "mon remove" ||
619 prefix
== "mon rm") {
621 cmd_getval(cmdmap
, "name", name
);
622 if (!monmap
.contains(name
)) {
624 ss
<< "mon." << name
<< " does not exist or has already been removed";
628 if (monmap
.size() == 1) {
630 ss
<< "error: refusing removal of last monitor " << name
;
634 /* At the time of writing, there is no risk of races when multiple clients
635 * attempt to use the same name. The reason is simple but may not be
638 * In a nutshell, we do not collate proposals on the MonmapMonitor. As
639 * soon as we return 'true' below, PaxosService::dispatch() will check if
640 * the service should propose, and - if so - the service will be marked as
641 * 'proposing' and a proposal will be triggered. The PaxosService class
642 * guarantees that once a service is marked 'proposing' no further writes
645 * The decision on whether the service should propose or not is, in this
646 * case, made by MonmapMonitor::should_propose(), which always considers
647 * the proposal delay being 0.0 seconds. This is key for PaxosService to
648 * trigger the proposal immediately.
649 * 0.0 seconds of delay.
651 * From the above, there's no point in performing further checks on the
652 * pending_map, as we don't ever have multiple proposals in-flight in
653 * this service. As we've established the committed state contains the
654 * monitor, we can simply go ahead and remove it.
656 * Please note that the code hinges on all of the above to be true. It
657 * has been true since time immemorial and we don't see a good reason
658 * to make it sturdier at this time - mainly because we don't think it's
659 * going to change any time soon, lest for any bug that may be unwillingly
663 entity_addrvec_t addrs
= pending_map
.get_addrs(name
);
664 pending_map
.remove(name
);
665 pending_map
.last_changed
= ceph_clock_now();
666 ss
<< "removing mon." << name
<< " at " << addrs
667 << ", there will be " << pending_map
.size() << " monitors" ;
// ---- "mon feature set <feature_name>" ----
671 } else if (prefix
== "mon feature set") {
675 * We currently only support setting/unsetting persistent features.
676 * This is by design, given at the moment we still don't have optional
677 * features, and, as such, there is no point introducing an interface
678 * to manipulate them. This allows us to provide a cleaner, more
679 * intuitive interface to the user, modifying solely persistent
682 * In the future we should consider adding another interface to handle
683 * optional features/flags; e.g., 'mon feature flag set/unset', or
684 * 'mon flag set/unset'.
687 if (!cmd_getval(cmdmap
, "feature_name", feature_name
)) {
688 ss
<< "missing required feature name";
693 mon_feature_t feature
;
694 feature
= ceph::features::mon::get_feature_by_name(feature_name
);
695 if (feature
== ceph::features::mon::FEATURE_NONE
) {
696 ss
<< "unknown feature '" << feature_name
<< "'";
// explicit confirmation flag read from the command
702 cmd_getval(cmdmap
, "yes_i_really_mean_it", sure
);
704 ss
<< "please specify '--yes-i-really-mean-it' if you "
705 << "really, **really** want to set feature '"
706 << feature
<< "' in the monmap.";
// the whole quorum must support the feature
711 if (!mon
->get_quorum_mon_features().contains_all(feature
)) {
712 ss
<< "current quorum does not support feature '" << feature
713 << "'; supported features: "
714 << mon
->get_quorum_mon_features();
719 ss
<< "setting feature '" << feature
<< "'";
// already set in the committed map: nothing to change
722 if (monmap
.persistent_features
.contains_all(feature
)) {
723 dout(10) << __func__
<< " feature '" << feature
724 << "' already set on monmap; no-op." << dendl
;
728 pending_map
.persistent_features
.set_feature(feature
);
729 pending_map
.last_changed
= ceph_clock_now();
732 dout(1) << __func__
<< " " << ss
.str() << "; new features will be: "
733 << "persistent = " << pending_map
.persistent_features
734 // output optional nevertheless, for auditing purposes.
735 << ", optional = " << pending_map
.optional_features
<< dendl
;
// ---- "mon set-rank <name> <rank>" ----
// NOTE(review): this branch and the set-addrs / set-weight branches check
// existence against pending_map, whereas the rules comment at the top of the
// function says state checks go against the committed map -- confirm.
737 } else if (prefix
== "mon set-rank") {
740 if (!cmd_getval(cmdmap
, "name", name
) ||
741 !cmd_getval(cmdmap
, "rank", rank
)) {
745 int oldrank
= pending_map
.get_rank(name
);
747 ss
<< "mon." << name
<< " does not exist in monmap";
752 pending_map
.set_rank(name
, rank
);
753 pending_map
.last_changed
= ceph_clock_now();
// ---- "mon set-addrs <name> <addrs>" ----
755 } else if (prefix
== "mon set-addrs") {
758 if (!cmd_getval(cmdmap
, "name", name
) ||
759 !cmd_getval(cmdmap
, "addrs", addrs
)) {
763 if (!pending_map
.contains(name
)) {
764 ss
<< "mon." << name
<< " does not exist";
769 if (!av
.parse(addrs
.c_str(), nullptr)) {
770 ss
<< "failed to parse addrs '" << addrs
<< "'";
// each parsed address is checked; the message shows the rejected one
774 for (auto& a
: av
.v
) {
777 ss
<< "monitor must bind to a non-zero port, not " << a
;
783 pending_map
.set_addrvec(name
, av
);
784 pending_map
.last_changed
= ceph_clock_now();
// ---- "mon set-weight <name> <weight>" ----
786 } else if (prefix
== "mon set-weight") {
789 if (!cmd_getval(cmdmap
, "name", name
) ||
790 !cmd_getval(cmdmap
, "weight", weight
)) {
794 if (!pending_map
.contains(name
)) {
795 ss
<< "mon." << name
<< " does not exist";
800 pending_map
.set_weight(name
, weight
);
801 pending_map
.last_changed
= ceph_clock_now();
// ---- "mon enable-msgr2" ----
// Requires NAUTILUS in the committed map's required features.  Every monitor
// whose only address is a legacy one on the legacy port gets a msgr2 address
// on the IANA port in addition to its existing address.
803 } else if (prefix
== "mon enable-msgr2") {
804 if (!monmap
.get_required_features().contains_all(
805 ceph::features::mon::FEATURE_NAUTILUS
)) {
807 ss
<< "all monitors must be running nautilus to enable v2";
810 for (auto& i
: pending_map
.mon_info
) {
811 if (i
.second
.public_addrs
.v
.size() == 1 &&
812 i
.second
.public_addrs
.front().is_legacy() &&
813 i
.second
.public_addrs
.front().get_port() == CEPH_MON_PORT_LEGACY
) {
815 entity_addr_t a
= i
.second
.public_addrs
.front();
816 a
.set_type(entity_addr_t::TYPE_MSGR2
);
817 a
.set_port(CEPH_MON_PORT_IANA
);
819 av
.v
.push_back(i
.second
.public_addrs
.front());
820 dout(10) << " setting mon." << i
.first
821 << " addrs " << i
.second
.public_addrs
822 << " -> " << av
<< dendl
;
823 pending_map
.set_addrvec(i
.first
, av
);
825 pending_map
.last_changed
= ceph_clock_now();
// ---- anything else ----
830 ss
<< "unknown command " << prefix
;
// reply path: answer the user with (err, rs) instead of proposing
836 mon
->reply_command(op
, err
, rs
, get_last_committed());
837 // we are returning to the user; do not propose.
// Read-side filter for MMonJoin requests.
//
// Logs the joining monitor's name and addresses, then checks three things:
// the session's "mon" capability (write | execute), whether pending_map
// already has that name with a non-blank first address, and whether
// pending_map already maps those addresses to the same name.
//
// NOTE(review): this extract is missing listing lines here: the start of
// the capability condition (only its second half is visible), the return
// statement of every branch, the final return and the closing braces.  The
// boolean returned in each case is therefore not visible here.
841 bool MonmapMonitor::preprocess_join(MonOpRequestRef op
)
843 auto join
= op
->get_req
<MMonJoin
>();
844 dout(10) << __func__
<< " " << join
->name
<< " at " << join
->addrs
<< dendl
;
846 MonSession
*session
= op
->get_session();
// capability check (second half of the condition)
848 !session
->is_capable("mon", MON_CAP_W
| MON_CAP_X
)) {
849 dout(10) << " insufficient caps" << dendl
;
// name already present with a real (non-blank) address
853 if (pending_map
.contains(join
->name
) &&
854 !pending_map
.get_addrs(join
->name
).front().is_blank_ip()) {
855 dout(10) << " already have " << join
->name
<< dendl
;
// addresses already present under the same name
858 if (pending_map
.contains(join
->addrs
) &&
859 pending_map
.get_name(join
->addrs
) == join
->name
) {
860 dout(10) << " already have " << join
->addrs
<< dendl
;
865 bool MonmapMonitor::prepare_join(MonOpRequestRef op
)
867 auto join
= op
->get_req
<MMonJoin
>();
868 dout(0) << "adding/updating " << join
->name
869 << " at " << join
->addrs
<< " to monitor cluster" << dendl
;
870 if (pending_map
.contains(join
->name
))
871 pending_map
.remove(join
->name
);
872 if (pending_map
.contains(join
->addrs
))
873 pending_map
.remove(pending_map
.get_name(join
->addrs
));
874 pending_map
.add(join
->name
, join
->addrs
);
875 pending_map
.last_changed
= ceph_clock_now();
879 bool MonmapMonitor::should_propose(double& delay
)
// Fetch the encoded monmap of the last committed version into `bl`.
//
// bl: receives the encoded map through get_version()
// Returns an int status; the concrete values are not visible in this
// extract.
//
// NOTE(review): this extract is missing listing lines here: the statement
// executed when the version is absent from the store, the condition guarding
// the error log, the return statements and the closing braces.
885 int MonmapMonitor::get_monmap(bufferlist
&bl
)
887 version_t latest_ver
= get_last_committed();
888 dout(10) << __func__
<< " ver " << latest_ver
<< dendl
;
// the version must exist in the store under this service's name
890 if (!mon
->store
->exists(get_service_name(), stringify(latest_ver
)))
893 int err
= get_version(latest_ver
, bl
);
// failure is logged with the textual errno
895 dout(1) << __func__
<< " error obtaining monmap: "
896 << cpp_strerror(err
) << dendl
;
// Walk every "monmap" subscription held in the monitor's session map.
//
// The work happens inside mon->with_session_map(): the "monmap" entry of
// session_map.subs is looked up and, when present, iterated.
//
// NOTE(review): this extract is missing listing lines here: the statement
// taken when no "monmap" entry exists, the body of the loop (presumably a
// check_sub() call per subscription -- confirm) and the closing braces.
902 void MonmapMonitor::check_subs()
904 const string type
= "monmap";
905 mon
->with_session_map([this, &type
](const MonSessionMap
& session_map
) {
906 auto subs
= session_map
.subs
.find(type
);
// no subscriptions of this type
907 if (subs
== session_map
.subs
.end())
909 for (auto sub
: *subs
->second
) {
// Serve one monmap subscription.
//
// When the subscription's `next` epoch is not beyond the current monmap
// epoch, the latest monmap is sent on the subscriber's connection.  After
// that the subscription is either removed from the session map or advanced
// to epoch + 1.
//
// sub: subscription to check; its session's connection is used for sending
//
// NOTE(review): this extract is missing listing lines here: the start of the
// log statement, the condition choosing between removal and advancing
// (presumably a one-shot flag on the subscription -- confirm), the `else`
// and the closing braces.
915 void MonmapMonitor::check_sub(Subscription
*sub
)
917 const auto epoch
= mon
->monmap
->get_epoch();
919 << " monmap next " << sub
->next
920 << " have " << epoch
<< dendl
;
// subscriber is due an update
921 if (sub
->next
<= epoch
) {
922 mon
->send_latest_monmap(sub
->session
->con
.get());
// drop the subscription ...
924 mon
->with_session_map([sub
](MonSessionMap
& session_map
) {
925 session_map
.remove_sub(sub
);
// ... or wait for the next epoch
928 sub
->next
= epoch
+ 1;
933 void MonmapMonitor::tick()
940 if (mon
->monmap
->created
.is_zero()) {
941 dout(10) << __func__
<< " detected empty created stamp" << dendl
;
943 for (version_t v
= 1; v
<= get_last_committed(); v
++) {
945 int r
= get_version(v
, bl
);
950 auto p
= bl
.cbegin();
952 if (!m
.last_changed
.is_zero()) {
953 dout(10) << __func__
<< " first monmap with last_changed is "
954 << v
<< " with " << m
.last_changed
<< dendl
;
955 ctime
= m
.last_changed
;
959 if (ctime
.is_zero()) {
960 ctime
= ceph_clock_now();
962 dout(10) << __func__
<< " updating created stamp to " << ctime
<< dendl
;
963 pending_map
.created
= ctime
;