]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonmapMonitor.cc
6c5a9ce5736a7fa1e12593d3450d1f517fd7573f
[ceph.git] / ceph / src / mon / MonmapMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MonmapMonitor.h"
16 #include "Monitor.h"
17 #include "OSDMonitor.h"
18 #include "messages/MMonCommand.h"
19 #include "messages/MMonJoin.h"
20
21 #include "common/ceph_argparse.h"
22 #include "common/errno.h"
23 #include <sstream>
24 #include "common/config.h"
25 #include "common/cmdparse.h"
26
27 #include "include/ceph_assert.h"
28 #include "include/stringify.h"
29
30 #define dout_subsys ceph_subsys_mon
31 #undef dout_prefix
32 #define dout_prefix _prefix(_dout, mon)
33 using namespace TOPNSPC::common;
34
35 using std::cout;
36 using std::dec;
37 using std::hex;
38 using std::list;
39 using std::map;
40 using std::make_pair;
41 using std::ostream;
42 using std::ostringstream;
43 using std::pair;
44 using std::set;
45 using std::setfill;
46 using std::string;
47 using std::stringstream;
48 using std::to_string;
49 using std::vector;
50 using std::unique_ptr;
51
52 using ceph::bufferlist;
53 using ceph::decode;
54 using ceph::encode;
55 using ceph::Formatter;
56 using ceph::JSONFormatter;
57 using ceph::make_message;
58 using ceph::mono_clock;
59 using ceph::mono_time;
60 using ceph::timespan_str;
61 static ostream& _prefix(std::ostream *_dout, Monitor &mon) {
62 return *_dout << "mon." << mon.name << "@" << mon.rank
63 << "(" << mon.get_state_name()
64 << ").monmap v" << mon.monmap->epoch << " ";
65 }
66
67 void MonmapMonitor::create_initial()
68 {
69 dout(10) << __func__ << " using current monmap" << dendl;
70 pending_map = *mon.monmap;
71 pending_map.epoch = 1;
72
73 if (g_conf()->mon_debug_no_initial_persistent_features) {
74 derr << __func__ << " mon_debug_no_initial_persistent_features=true"
75 << dendl;
76 } else {
77 // initialize with default persistent features for new clusters
78 pending_map.persistent_features = ceph::features::mon::get_persistent();
79 pending_map.min_mon_release = ceph_release();
80 }
81 }
82
83 void MonmapMonitor::update_from_paxos(bool *need_bootstrap)
84 {
85 version_t version = get_last_committed();
86 if (version <= mon.monmap->get_epoch())
87 return;
88
89 dout(10) << __func__ << " version " << version
90 << ", my v " << mon.monmap->epoch << dendl;
91
92 if (need_bootstrap && version != mon.monmap->get_epoch()) {
93 dout(10) << " signaling that we need a bootstrap" << dendl;
94 *need_bootstrap = true;
95 }
96
97 // read and decode
98 monmap_bl.clear();
99 int ret = get_version(version, monmap_bl);
100 ceph_assert(ret == 0);
101 ceph_assert(monmap_bl.length());
102
103 dout(10) << __func__ << " got " << version << dendl;
104 mon.monmap->decode(monmap_bl);
105
106 if (mon.store->exists("mkfs", "monmap")) {
107 auto t(std::make_shared<MonitorDBStore::Transaction>());
108 t->erase("mkfs", "monmap");
109 mon.store->apply_transaction(t);
110 }
111
112 check_subs();
113
114 // make sure we've recorded min_mon_release
115 string val;
116 if (mon.store->read_meta("min_mon_release", &val) < 0 ||
117 val.size() == 0 ||
118 atoi(val.c_str()) != (int)ceph_release()) {
119 dout(10) << __func__ << " updating min_mon_release meta" << dendl;
120 mon.store->write_meta("min_mon_release",
121 stringify(ceph_release()));
122 }
123
124 mon.notify_new_monmap(true);
125 }
126
127 void MonmapMonitor::create_pending()
128 {
129 pending_map = *mon.monmap;
130 pending_map.epoch++;
131 pending_map.last_changed = ceph_clock_now();
132 dout(10) << __func__ << " monmap epoch " << pending_map.epoch << dendl;
133 }
134
135 void MonmapMonitor::encode_pending(MonitorDBStore::TransactionRef t)
136 {
137 dout(10) << __func__ << " epoch " << pending_map.epoch << dendl;
138
139 ceph_assert(mon.monmap->epoch + 1 == pending_map.epoch ||
140 pending_map.epoch == 1); // special case mkfs!
141 bufferlist bl;
142 pending_map.encode(bl, mon.get_quorum_con_features());
143
144 put_version(t, pending_map.epoch, bl);
145 put_last_committed(t, pending_map.epoch);
146
147 // generate a cluster fingerprint, too?
148 if (pending_map.epoch == 1) {
149 mon.prepare_new_fingerprint(t);
150 }
151
152 //health
153 health_check_map_t next;
154 pending_map.check_health(&next);
155 encode_health(next, t);
156 }
157
158 class C_ApplyFeatures : public Context {
159 MonmapMonitor *svc;
160 mon_feature_t features;
161 ceph_release_t min_mon_release;
162 public:
163 C_ApplyFeatures(MonmapMonitor *s, const mon_feature_t& f, ceph_release_t mmr) :
164 svc(s), features(f), min_mon_release(mmr) { }
165 void finish(int r) override {
166 if (r >= 0) {
167 svc->apply_mon_features(features, min_mon_release);
168 } else if (r == -EAGAIN || r == -ECANCELED) {
169 // discard features if we're no longer on the quorum that
170 // established them in the first place.
171 return;
172 } else {
173 ceph_abort_msg("bad C_ApplyFeatures return value");
174 }
175 }
176 };
177
178 void MonmapMonitor::apply_mon_features(const mon_feature_t& features,
179 ceph_release_t min_mon_release)
180 {
181 if (!is_writeable()) {
182 dout(5) << __func__ << " wait for service to be writeable" << dendl;
183 wait_for_writeable_ctx(new C_ApplyFeatures(this, features, min_mon_release));
184 return;
185 }
186
187 // do nothing here unless we have a full quorum
188 if (mon.get_quorum().size() < mon.monmap->size()) {
189 return;
190 }
191
192 ceph_assert(is_writeable());
193 ceph_assert(features.contains_all(pending_map.persistent_features));
194 // we should never hit this because `features` should be the result
195 // of the quorum's supported features. But if it happens, die.
196 ceph_assert(ceph::features::mon::get_supported().contains_all(features));
197
198 mon_feature_t new_features =
199 (pending_map.persistent_features ^
200 (features & ceph::features::mon::get_persistent()));
201
202 if (new_features.empty() &&
203 pending_map.min_mon_release == min_mon_release) {
204 dout(10) << __func__ << " min_mon_release (" << (int)min_mon_release
205 << ") and features (" << features << ") match" << dendl;
206 return;
207 }
208
209 if (!new_features.empty()) {
210 dout(1) << __func__ << " applying new features "
211 << new_features << ", had " << pending_map.persistent_features
212 << ", will have "
213 << (new_features | pending_map.persistent_features)
214 << dendl;
215 pending_map.persistent_features |= new_features;
216 }
217 if (min_mon_release > pending_map.min_mon_release) {
218 dout(1) << __func__ << " increasing min_mon_release to "
219 << to_integer<int>(min_mon_release) << " (" << min_mon_release
220 << ")" << dendl;
221 pending_map.min_mon_release = min_mon_release;
222 }
223
224 propose_pending();
225 }
226
227 void MonmapMonitor::on_active()
228 {
229 if (get_last_committed() >= 1 && !mon.has_ever_joined) {
230 // make note of the fact that i was, once, part of the quorum.
231 dout(10) << "noting that i was, once, part of an active quorum." << dendl;
232
233 /* This is some form of nasty in-breeding we have between the MonmapMonitor
234 and the Monitor itself. We should find a way to get rid of it given our
235 new architecture. Until then, stick with it since we are a
236 single-threaded process and, truth be told, no one else relies on this
237 thing besides us.
238 */
239 auto t(std::make_shared<MonitorDBStore::Transaction>());
240 t->put(Monitor::MONITOR_NAME, "joined", 1);
241 mon.store->apply_transaction(t);
242 mon.has_ever_joined = true;
243 }
244
245 if (mon.is_leader()) {
246 mon.clog->debug() << "monmap " << *mon.monmap;
247 }
248
249 apply_mon_features(mon.get_quorum_mon_features(),
250 mon.quorum_min_mon_release);
251
252 mon.update_pending_metadata();
253 }
254
255 bool MonmapMonitor::preprocess_query(MonOpRequestRef op)
256 {
257 auto m = op->get_req<PaxosServiceMessage>();
258 switch (m->get_type()) {
259 // READs
260 case MSG_MON_COMMAND:
261 try {
262 return preprocess_command(op);
263 }
264 catch (const bad_cmd_get& e) {
265 bufferlist bl;
266 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
267 return true;
268 }
269 case MSG_MON_JOIN:
270 return preprocess_join(op);
271 default:
272 ceph_abort();
273 return true;
274 }
275 }
276
277 void MonmapMonitor::dump_info(Formatter *f)
278 {
279 f->dump_unsigned("monmap_first_committed", get_first_committed());
280 f->dump_unsigned("monmap_last_committed", get_last_committed());
281 f->open_object_section("monmap");
282 mon.monmap->dump(f);
283 f->close_section();
284 f->open_array_section("quorum");
285 for (set<int>::iterator q = mon.get_quorum().begin(); q != mon.get_quorum().end(); ++q)
286 f->dump_int("mon", *q);
287 f->close_section();
288 }
289
290 bool MonmapMonitor::preprocess_command(MonOpRequestRef op)
291 {
292 auto m = op->get_req<MMonCommand>();
293 int r = -1;
294 bufferlist rdata;
295 stringstream ss;
296
297 cmdmap_t cmdmap;
298 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
299 string rs = ss.str();
300 mon.reply_command(op, -EINVAL, rs, rdata, get_last_committed());
301 return true;
302 }
303
304 string prefix;
305 cmd_getval(cmdmap, "prefix", prefix);
306
307 MonSession *session = op->get_session();
308 if (!session) {
309 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
310 return true;
311 }
312
313 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
314 boost::scoped_ptr<Formatter> f(Formatter::create(format));
315
316 if (prefix == "mon stat") {
317 if (f) {
318 f->open_object_section("monmap");
319 mon.monmap->dump_summary(f.get());
320 f->dump_string("leader", mon.get_leader_name());
321 f->open_array_section("quorum");
322 for (auto rank: mon.get_quorum()) {
323 std::string name = mon.monmap->get_name(rank);
324 f->open_object_section("mon");
325 f->dump_int("rank", rank);
326 f->dump_string("name", name);
327 f->close_section(); // mon
328 }
329 f->close_section(); // quorum
330 f->close_section(); // monmap
331 f->flush(ss);
332 } else {
333 mon.monmap->print_summary(ss);
334 ss << ", election epoch " << mon.get_epoch() << ", leader "
335 << mon.get_leader() << " " << mon.get_leader_name()
336 << ", quorum " << mon.get_quorum()
337 << " " << mon.get_quorum_names();
338 }
339
340 rdata.append(ss);
341 ss.str("");
342 r = 0;
343
344 } else if (prefix == "mon getmap" ||
345 prefix == "mon dump") {
346
347 epoch_t epoch;
348 int64_t epochnum = cmd_getval_or<int64_t>(cmdmap, "epoch", 0);
349 epoch = epochnum;
350
351 MonMap *p = mon.monmap;
352 if (epoch) {
353 bufferlist bl;
354 r = get_version(epoch, bl);
355 if (r == -ENOENT) {
356 ss << "there is no map for epoch " << epoch;
357 goto reply;
358 }
359 ceph_assert(r == 0);
360 ceph_assert(bl.length() > 0);
361 p = new MonMap;
362 p->decode(bl);
363 }
364
365 ceph_assert(p);
366
367 if (prefix == "mon getmap") {
368 p->encode(rdata, m->get_connection()->get_features());
369 r = 0;
370 ss << "got monmap epoch " << p->get_epoch();
371 } else if (prefix == "mon dump") {
372 stringstream ds;
373 if (f) {
374 f->open_object_section("monmap");
375 p->dump(f.get());
376 f->open_array_section("quorum");
377 for (set<int>::iterator q = mon.get_quorum().begin();
378 q != mon.get_quorum().end(); ++q) {
379 f->dump_int("mon", *q);
380 }
381 f->close_section();
382 f->close_section();
383 f->flush(ds);
384 r = 0;
385 } else {
386 p->print(ds);
387 r = 0;
388 }
389 rdata.append(ds);
390 ss << "dumped monmap epoch " << p->get_epoch();
391 }
392 if (p != mon.monmap) {
393 delete p;
394 p = nullptr;
395 }
396
397 } else if (prefix == "mon feature ls") {
398
399 bool list_with_value = false;
400 cmd_getval_compat_cephbool(cmdmap, "with_value", list_with_value);
401
402 MonMap *p = mon.monmap;
403
404 // list features
405 mon_feature_t supported = ceph::features::mon::get_supported();
406 mon_feature_t persistent = ceph::features::mon::get_persistent();
407 mon_feature_t required = p->get_required_features();
408
409 stringstream ds;
410 auto print_feature = [&](mon_feature_t& m_features, const char* m_str) {
411 if (f) {
412 if (list_with_value)
413 m_features.dump_with_value(f.get(), m_str);
414 else
415 m_features.dump(f.get(), m_str);
416 } else {
417 if (list_with_value)
418 m_features.print_with_value(ds);
419 else
420 m_features.print(ds);
421 }
422 };
423
424 if (f) {
425 f->open_object_section("features");
426
427 f->open_object_section("all");
428 print_feature(supported, "supported");
429 print_feature(persistent, "persistent");
430 f->close_section(); // all
431
432 f->open_object_section("monmap");
433 print_feature(p->persistent_features, "persistent");
434 print_feature(p->optional_features, "optional");
435 print_feature(required, "required");
436 f->close_section(); // monmap
437
438 f->close_section(); // features
439 f->flush(ds);
440
441 } else {
442 ds << "all features" << std::endl
443 << "\tsupported: ";
444 print_feature(supported, nullptr);
445 ds << std::endl
446 << "\tpersistent: ";
447 print_feature(persistent, nullptr);
448 ds << std::endl
449 << std::endl;
450
451 ds << "on current monmap (epoch "
452 << p->get_epoch() << ")" << std::endl
453 << "\tpersistent: ";
454 print_feature(p->persistent_features, nullptr);
455 ds << std::endl
456 // omit optional features in plain-text
457 // makes it easier to read, and they're, currently, empty.
458 << "\trequired: ";
459 print_feature(required, nullptr);
460 ds << std::endl;
461 }
462 rdata.append(ds);
463 r = 0;
464 }
465
466 reply:
467 if (r != -1) {
468 string rs;
469 getline(ss, rs);
470
471 mon.reply_command(op, r, rs, rdata, get_last_committed());
472 return true;
473 } else
474 return false;
475 }
476
477
478 bool MonmapMonitor::prepare_update(MonOpRequestRef op)
479 {
480 auto m = op->get_req<PaxosServiceMessage>();
481 dout(7) << __func__ << " " << *m << " from " << m->get_orig_source_inst() << dendl;
482
483 switch (m->get_type()) {
484 case MSG_MON_COMMAND:
485 try {
486 return prepare_command(op);
487 } catch (const bad_cmd_get& e) {
488 bufferlist bl;
489 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
490 return true;
491 }
492 case MSG_MON_JOIN:
493 return prepare_join(op);
494 default:
495 ceph_abort();
496 }
497
498 return false;
499 }
500
501 bool MonmapMonitor::prepare_command(MonOpRequestRef op)
502 {
503 auto m = op->get_req<MMonCommand>();
504 stringstream ss;
505 string rs;
506 int err = -EINVAL;
507
508 cmdmap_t cmdmap;
509 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
510 string rs = ss.str();
511 mon.reply_command(op, -EINVAL, rs, get_last_committed());
512 return true;
513 }
514
515 string prefix;
516 cmd_getval(cmdmap, "prefix", prefix);
517
518 MonSession *session = op->get_session();
519 if (!session) {
520 mon.reply_command(op, -EACCES, "access denied", get_last_committed());
521 return true;
522 }
523
524 /* We should follow the following rules:
525 *
526 * - 'monmap' is the current, consistent version of the monmap
527 * - 'pending_map' is the uncommitted version of the monmap
528 *
529 * All checks for the current state must be made against 'monmap'.
530 * All changes are made against 'pending_map'.
531 *
532 * If there are concurrent operations modifying 'pending_map', please
533 * follow the following rules.
534 *
535 * - if pending_map has already been changed, the second operation must
536 * wait for the proposal to finish and be run again; This is the easiest
537 * path to guarantee correctness but may impact performance (i.e., it
538 * will take longer for the user to get a reply).
539 *
540 * - if the result of the second operation can be guaranteed to be
541 * idempotent, the operation may reply to the user once the proposal
542 * finishes; still needs to wait for the proposal to finish.
543 *
544 * - An operation _NEVER_ returns to the user based on pending state.
545 *
546 * If an operation does not modify current stable monmap, it may be
547 * serialized before current pending map, regardless of any change that
548 * has been made to the pending map -- remember, pending is uncommitted
549 * state, thus we are not bound by it.
550 */
551
552 ceph_assert(mon.monmap);
553 MonMap &monmap = *mon.monmap;
554
555
556 /* Please note:
557 *
558 * Adding or removing monitors may lead to loss of quorum.
559 *
560 * Because quorum may be lost, it's important to reply something
561 * to the user, lest she end up waiting forever for a reply. And
562 * no reply will ever be sent until quorum is formed again.
563 *
564 * On the other hand, this means we're leaking uncommitted state
565 * to the user. As such, please be mindful of the reply message.
566 *
567 * e.g., 'adding monitor mon.foo' is okay ('adding' is an on-going
568 * operation and conveys its not-yet-permanent nature); whereas
569 * 'added monitor mon.foo' presumes the action has successfully
570 * completed and state has been committed, which may not be true.
571 */
572
573
574 bool propose = false;
575 if (prefix == "mon add") {
576 string name;
577 cmd_getval(cmdmap, "name", name);
578 string addrstr;
579 cmd_getval(cmdmap, "addr", addrstr);
580 entity_addr_t addr;
581 bufferlist rdata;
582
583 if (!addr.parse(addrstr)) {
584 err = -EINVAL;
585 ss << "addr " << addrstr << "does not parse";
586 goto reply;
587 }
588
589 vector<string> locationvec;
590 map<string, string> loc;
591 cmd_getval(cmdmap, "location", locationvec);
592 CrushWrapper::parse_loc_map(locationvec, &loc);
593 if (locationvec.size() &&
594 !mon.get_quorum_mon_features().contains_all(
595 ceph::features::mon::FEATURE_PINGING)) {
596 err = -ENOTSUP;
597 ss << "Not all monitors support adding monitors with a location; please upgrade first!";
598 goto reply;
599 }
600 if (locationvec.size() && !loc.size()) {
601 ss << "We could not parse your input location to anything real; " << locationvec
602 << " turned into an empty map!";
603 err = -EINVAL;
604 goto reply;
605 }
606
607 dout(10) << "mon add setting location for " << name << " to " << loc << dendl;
608
609 // TODO: validate location in crush map
610 if (monmap.stretch_mode_enabled && !loc.size()) {
611 ss << "We are in stretch mode and new monitors must have a location, but "
612 << "could not parse your input location to anything real; " << locationvec
613 << " turned into an empty map!";
614 err = -EINVAL;
615 goto reply;
616 }
617 // TODO: validate location against any existing stretch config
618
619 entity_addrvec_t addrs;
620 if (monmap.persistent_features.contains_all(
621 ceph::features::mon::FEATURE_NAUTILUS)) {
622 if (addr.get_port() == CEPH_MON_PORT_IANA) {
623 addr.set_type(entity_addr_t::TYPE_MSGR2);
624 }
625 if (addr.get_port() == CEPH_MON_PORT_LEGACY) {
626 // if they specified the *old* default they probably don't care
627 addr.set_port(0);
628 }
629 if (addr.get_port()) {
630 addrs.v.push_back(addr);
631 } else {
632 addr.set_type(entity_addr_t::TYPE_MSGR2);
633 addr.set_port(CEPH_MON_PORT_IANA);
634 addrs.v.push_back(addr);
635 addr.set_type(entity_addr_t::TYPE_LEGACY);
636 addr.set_port(CEPH_MON_PORT_LEGACY);
637 addrs.v.push_back(addr);
638 }
639 } else {
640 if (addr.get_port() == 0) {
641 addr.set_port(CEPH_MON_PORT_LEGACY);
642 }
643 addr.set_type(entity_addr_t::TYPE_LEGACY);
644 addrs.v.push_back(addr);
645 }
646 dout(20) << __func__ << " addr " << addr << " -> addrs " << addrs << dendl;
647
648 /**
649 * If we have a monitor with the same name and different addr, then EEXIST
650 * If we have a monitor with the same addr and different name, then EEXIST
651 * If we have a monitor with the same addr and same name, then wait for
652 * the proposal to finish and return success.
653 * If we don't have the monitor, add it.
654 */
655
656 err = 0;
657 if (!ss.str().empty())
658 ss << "; ";
659
660 do {
661 if (monmap.contains(name)) {
662 if (monmap.get_addrs(name) == addrs) {
663 // stable map contains monitor with the same name at the same address.
664 // serialize before current pending map.
665 err = 0; // for clarity; this has already been set above.
666 ss << "mon." << name << " at " << addrs << " already exists";
667 goto reply;
668 } else {
669 ss << "mon." << name
670 << " already exists at address " << monmap.get_addrs(name);
671 }
672 } else if (monmap.contains(addrs)) {
673 // we established on the previous branch that name is different
674 ss << "mon." << monmap.get_name(addrs)
675 << " already exists at address " << addr;
676 } else {
677 // go ahead and add
678 break;
679 }
680 err = -EEXIST;
681 goto reply;
682 } while (false);
683
684 if (pending_map.stretch_mode_enabled) {
685
686 }
687
688 /* Given there's no delay between proposals on the MonmapMonitor (see
689 * MonmapMonitor::should_propose()), there is no point in checking for
690 * a mismatch between name and addr on pending_map.
691 *
692 * Once we established the monitor does not exist in the committed state,
693 * we can simply go ahead and add the monitor.
694 */
695
696 pending_map.add(name, addrs);
697 pending_map.mon_info[name].crush_loc = loc;
698 pending_map.last_changed = ceph_clock_now();
699 ss << "adding mon." << name << " at " << addrs;
700 propose = true;
701 dout(0) << __func__ << " proposing new mon." << name << dendl;
702
703 } else if (prefix == "mon remove" ||
704 prefix == "mon rm") {
705 string name;
706 cmd_getval(cmdmap, "name", name);
707 if (!monmap.contains(name)) {
708 err = 0;
709 ss << "mon." << name << " does not exist or has already been removed";
710 goto reply;
711 }
712
713 if (monmap.size() == 1) {
714 err = -EINVAL;
715 ss << "error: refusing removal of last monitor " << name;
716 goto reply;
717 }
718
719 if (pending_map.stretch_mode_enabled &&
720 name == pending_map.tiebreaker_mon) {
721 err = -EINVAL;
722 ss << "you cannot remove stretch mode's tiebreaker monitor";
723 goto reply;
724 }
725 /* At the time of writing, there is no risk of races when multiple clients
726 * attempt to use the same name. The reason is simple but may not be
727 * obvious.
728 *
729 * In a nutshell, we do not collate proposals on the MonmapMonitor. As
730 * soon as we return 'true' below, PaxosService::dispatch() will check if
731 * the service should propose, and - if so - the service will be marked as
732 * 'proposing' and a proposal will be triggered. The PaxosService class
733 * guarantees that once a service is marked 'proposing' no further writes
734 * will be handled.
735 *
736 * The decision on whether the service should propose or not is, in this
737 * case, made by MonmapMonitor::should_propose(), which always considers
738 * the proposal delay being 0.0 seconds. This is key for PaxosService to
739 * trigger the proposal immediately.
740 * 0.0 seconds of delay.
741 *
742 * From the above, there's no point in performing further checks on the
743 * pending_map, as we don't ever have multiple proposals in-flight in
744 * this service. As we've established the committed state contains the
745 * monitor, we can simply go ahead and remove it.
746 *
747 * Please note that the code hinges on all of the above to be true. It
748 * has been true since time immemorial and we don't see a good reason
749 * to make it sturdier at this time - mainly because we don't think it's
750 * going to change any time soon, lest for any bug that may be unwillingly
751 * introduced.
752 */
753
754 entity_addrvec_t addrs = pending_map.get_addrs(name);
755 pending_map.remove(name);
756 pending_map.disallowed_leaders.erase(name);
757 pending_map.last_changed = ceph_clock_now();
758 ss << "removing mon." << name << " at " << addrs
759 << ", there will be " << pending_map.size() << " monitors" ;
760 propose = true;
761 err = 0;
762
763 } else if (prefix == "mon feature set") {
764
765 /* PLEASE NOTE:
766 *
767 * We currently only support setting/unsetting persistent features.
768 * This is by design, given at the moment we still don't have optional
769 * features, and, as such, there is no point introducing an interface
770 * to manipulate them. This allows us to provide a cleaner, more
771 * intuitive interface to the user, modifying solely persistent
772 * features.
773 *
774 * In the future we should consider adding another interface to handle
775 * optional features/flags; e.g., 'mon feature flag set/unset', or
776 * 'mon flag set/unset'.
777 */
778 string feature_name;
779 if (!cmd_getval(cmdmap, "feature_name", feature_name)) {
780 ss << "missing required feature name";
781 err = -EINVAL;
782 goto reply;
783 }
784
785 mon_feature_t feature;
786 feature = ceph::features::mon::get_feature_by_name(feature_name);
787 if (feature == ceph::features::mon::FEATURE_NONE) {
788 ss << "unknown feature '" << feature_name << "'";
789 err = -ENOENT;
790 goto reply;
791 }
792
793 bool sure = false;
794 cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
795 if (!sure) {
796 ss << "please specify '--yes-i-really-mean-it' if you "
797 << "really, **really** want to set feature '"
798 << feature << "' in the monmap.";
799 err = -EPERM;
800 goto reply;
801 }
802
803 if (!mon.get_quorum_mon_features().contains_all(feature)) {
804 ss << "current quorum does not support feature '" << feature
805 << "'; supported features: "
806 << mon.get_quorum_mon_features();
807 err = -EINVAL;
808 goto reply;
809 }
810
811 ss << "setting feature '" << feature << "'";
812
813 err = 0;
814 if (monmap.persistent_features.contains_all(feature)) {
815 dout(10) << __func__ << " feature '" << feature
816 << "' already set on monmap; no-op." << dendl;
817 goto reply;
818 }
819
820 pending_map.persistent_features.set_feature(feature);
821 pending_map.last_changed = ceph_clock_now();
822 propose = true;
823
824 dout(1) << __func__ << " " << ss.str() << "; new features will be: "
825 << "persistent = " << pending_map.persistent_features
826 // output optional nevertheless, for auditing purposes.
827 << ", optional = " << pending_map.optional_features << dendl;
828
829 } else if (prefix == "mon set-rank") {
830 string name;
831 int64_t rank;
832 if (!cmd_getval(cmdmap, "name", name) ||
833 !cmd_getval(cmdmap, "rank", rank)) {
834 err = -EINVAL;
835 goto reply;
836 }
837 int oldrank = pending_map.get_rank(name);
838 if (oldrank < 0) {
839 ss << "mon." << name << " does not exist in monmap";
840 err = -ENOENT;
841 goto reply;
842 }
843 err = 0;
844 pending_map.set_rank(name, rank);
845 pending_map.last_changed = ceph_clock_now();
846 propose = true;
847 } else if (prefix == "mon set-addrs") {
848 string name;
849 string addrs;
850 if (!cmd_getval(cmdmap, "name", name) ||
851 !cmd_getval(cmdmap, "addrs", addrs)) {
852 err = -EINVAL;
853 goto reply;
854 }
855 if (!pending_map.contains(name)) {
856 ss << "mon." << name << " does not exist";
857 err = -ENOENT;
858 goto reply;
859 }
860 entity_addrvec_t av;
861 if (!av.parse(addrs.c_str(), nullptr)) {
862 ss << "failed to parse addrs '" << addrs << "'";
863 err = -EINVAL;
864 goto reply;
865 }
866 for (auto& a : av.v) {
867 a.set_nonce(0);
868 if (!a.get_port()) {
869 ss << "monitor must bind to a non-zero port, not " << a;
870 err = -EINVAL;
871 goto reply;
872 }
873 }
874 err = 0;
875 pending_map.set_addrvec(name, av);
876 pending_map.last_changed = ceph_clock_now();
877 propose = true;
878 } else if (prefix == "mon set-weight") {
879 string name;
880 int64_t weight;
881 if (!cmd_getval(cmdmap, "name", name) ||
882 !cmd_getval(cmdmap, "weight", weight)) {
883 err = -EINVAL;
884 goto reply;
885 }
886 if (!pending_map.contains(name)) {
887 ss << "mon." << name << " does not exist";
888 err = -ENOENT;
889 goto reply;
890 }
891 err = 0;
892 pending_map.set_weight(name, weight);
893 pending_map.last_changed = ceph_clock_now();
894 propose = true;
895 } else if (prefix == "mon enable-msgr2") {
896 if (!monmap.get_required_features().contains_all(
897 ceph::features::mon::FEATURE_NAUTILUS)) {
898 err = -EACCES;
899 ss << "all monitors must be running nautilus to enable v2";
900 goto reply;
901 }
902 for (auto& i : pending_map.mon_info) {
903 if (i.second.public_addrs.v.size() == 1 &&
904 i.second.public_addrs.front().is_legacy() &&
905 i.second.public_addrs.front().get_port() == CEPH_MON_PORT_LEGACY) {
906 entity_addrvec_t av;
907 entity_addr_t a = i.second.public_addrs.front();
908 a.set_type(entity_addr_t::TYPE_MSGR2);
909 a.set_port(CEPH_MON_PORT_IANA);
910 av.v.push_back(a);
911 av.v.push_back(i.second.public_addrs.front());
912 dout(10) << " setting mon." << i.first
913 << " addrs " << i.second.public_addrs
914 << " -> " << av << dendl;
915 pending_map.set_addrvec(i.first, av);
916 propose = true;
917 pending_map.last_changed = ceph_clock_now();
918 }
919 }
920 err = 0;
921 } else if (prefix == "mon set election_strategy") {
922 if (!mon.get_quorum_mon_features().contains_all(
923 ceph::features::mon::FEATURE_PINGING)) {
924 err = -ENOTSUP;
925 ss << "Not all monitors support changing election strategies; please upgrade first!";
926 goto reply;
927 }
928 string strat;
929 MonMap::election_strategy strategy;
930 if (!cmd_getval(cmdmap, "strategy", strat)) {
931 err = -EINVAL;
932 goto reply;
933 }
934 if (strat == "classic") {
935 strategy = MonMap::CLASSIC;
936 } else if (strat == "disallow") {
937 strategy = MonMap::DISALLOW;
938 } else if (strat == "connectivity") {
939 strategy = MonMap::CONNECTIVITY;
940 } else {
941 err = -EINVAL;
942 goto reply;
943 }
944 err = 0;
945 pending_map.strategy = strategy;
946 pending_map.last_changed = ceph_clock_now();
947 propose = true;
948 } else if (prefix == "mon add disallowed_leader") {
949 if (!mon.get_quorum_mon_features().contains_all(
950 ceph::features::mon::FEATURE_PINGING)) {
951 err = -ENOTSUP;
952 ss << "Not all monitors support changing election strategies; please upgrade first!";
953 goto reply;
954 }
955 string name;
956 if (!cmd_getval(cmdmap, "name", name)) {
957 err = -EINVAL;
958 goto reply;
959 }
960 if (pending_map.strategy != MonMap::DISALLOW &&
961 pending_map.strategy != MonMap::CONNECTIVITY) {
962 ss << "You cannot disallow monitors in your current election mode";
963 err = -EINVAL;
964 goto reply;
965 }
966 if (!pending_map.contains(name)) {
967 ss << "mon." << name << " does not exist";
968 err = -ENOENT;
969 goto reply;
970 }
971 if (pending_map.disallowed_leaders.count(name)) {
972 ss << "mon." << name << " is already disallowed";
973 err = 0;
974 goto reply;
975 }
976 if (pending_map.disallowed_leaders.size() == pending_map.size() - 1) {
977 ss << "mon." << name << " is the only remaining allowed leader!";
978 err = -EINVAL;
979 goto reply;
980 }
981 pending_map.disallowed_leaders.insert(name);
982 pending_map.last_changed = ceph_clock_now();
983 err = 0;
984 propose = true;
985 } else if (prefix == "mon rm disallowed_leader") {
986 if (!mon.get_quorum_mon_features().contains_all(
987 ceph::features::mon::FEATURE_PINGING)) {
988 err = -ENOTSUP;
989 ss << "Not all monitors support changing election strategies; please upgrade first!";
990 goto reply;
991 }
992 string name;
993 if (!cmd_getval(cmdmap, "name", name)) {
994 err = -EINVAL;
995 goto reply;
996 }
997 if (pending_map.strategy != MonMap::DISALLOW &&
998 pending_map.strategy != MonMap::CONNECTIVITY) {
999 ss << "You cannot disallow monitors in your current election mode";
1000 err = -EINVAL;
1001 goto reply;
1002 }
1003 if (!pending_map.contains(name)) {
1004 ss << "mon." << name << " does not exist";
1005 err = -ENOENT;
1006 goto reply;
1007 }
1008 if (!pending_map.disallowed_leaders.count(name)) {
1009 ss << "mon." << name << " is already allowed";
1010 err = 0;
1011 goto reply;
1012 }
1013 pending_map.disallowed_leaders.erase(name);
1014 pending_map.last_changed = ceph_clock_now();
1015 err = 0;
1016 propose = true;
1017 } else if (prefix == "mon set_location") {
1018 if (!mon.get_quorum_mon_features().contains_all(
1019 ceph::features::mon::FEATURE_PINGING)) {
1020 err = -ENOTSUP;
1021 ss << "Not all monitors support monitor locations; please upgrade first!";
1022 goto reply;
1023 }
1024 string name;
1025 if (!cmd_getval(cmdmap, "name", name)) {
1026 err = -EINVAL;
1027 goto reply;
1028 }
1029 if (!pending_map.contains(name)) {
1030 ss << "mon." << name << " does not exist";
1031 err = -ENOENT;
1032 goto reply;
1033 }
1034
1035 vector<string> argvec;
1036 map<string, string> loc;
1037 cmd_getval(cmdmap, "args", argvec);
1038 CrushWrapper::parse_loc_map(argvec, &loc);
1039
1040 dout(10) << "mon set_location for " << name << " to " << loc << dendl;
1041
1042 // TODO: validate location in crush map
1043 if (!loc.size()) {
1044 ss << "We could not parse your input location to anything real; " << argvec
1045 << " turned into an empty map!";
1046 err = -EINVAL;
1047 goto reply;
1048 }
1049 // TODO: validate location against any existing stretch config
1050 pending_map.mon_info[name].crush_loc = loc;
1051 pending_map.last_changed = ceph_clock_now();
1052 err = 0;
1053 propose = true;
1054 } else if (prefix == "mon set_new_tiebreaker") {
1055 if (!pending_map.stretch_mode_enabled) {
1056 err = -EINVAL;
1057 ss << "Stretch mode is not enabled, so there is no tiebreaker";
1058 goto reply;
1059 }
1060 string name;
1061 if (!cmd_getval(cmdmap, "name", name)) {
1062 err = -EINVAL;
1063 goto reply;
1064 }
1065 bool sure = false;
1066 cmd_getval(cmdmap, "yes_i_really_mean_it", sure);
1067
1068 const auto &existing_tiebreaker_info_i = pending_map.mon_info.find(pending_map.tiebreaker_mon);
1069 const auto &new_tiebreaker_info_i = pending_map.mon_info.find(name);
1070 if (new_tiebreaker_info_i == pending_map.mon_info.end()) {
1071 ss << "mon." << name << " does not exist";
1072 err = -ENOENT;
1073 goto reply;
1074 }
1075 const auto& new_info = new_tiebreaker_info_i->second;
1076 if (new_info.crush_loc.empty()) {
1077 ss << "mon." << name << " does not have a location specified";
1078 err = -EINVAL;
1079 goto reply;
1080 }
1081
1082 if (!mon.osdmon()->is_readable()) {
1083 dout(10) << __func__
1084 << ": waiting for osdmon readable to inspect crush barrier"
1085 << dendl;
1086 mon.osdmon()->wait_for_readable(op, new Monitor::C_RetryMessage(&mon, op));
1087 return false;
1088 }
1089 int32_t stretch_divider_id = mon.osdmon()->osdmap.stretch_mode_bucket;
1090 string stretch_bucket_divider = mon.osdmon()->osdmap.crush->
1091 get_type_name(stretch_divider_id);
1092
1093 const auto& new_loc_i = new_info.crush_loc.find(stretch_bucket_divider);
1094 if (new_loc_i == new_info.crush_loc.end()) {
1095 ss << "mon." << name << " has a specificed location, but not a "
1096 << stretch_bucket_divider << ", which is the stretch divider";
1097 err = -EINVAL;
1098 goto reply;
1099 }
1100 const string& new_loc = new_loc_i->second;
1101 set<string> matching_mons;
1102 for (const auto& mii : pending_map.mon_info) {
1103 const auto& other_loc_i = mii.second.crush_loc.find(stretch_bucket_divider);
1104 if (mii.first == name) {
1105 continue;
1106 }
1107 if (other_loc_i == mii.second.crush_loc.end()) { // huh
1108 continue;
1109 }
1110 const string& other_loc = other_loc_i->second;
1111 if (other_loc == new_loc &&
1112 mii.first != existing_tiebreaker_info_i->first) {
1113 matching_mons.insert(mii.first);
1114 }
1115 }
1116 if (!matching_mons.empty()) {
1117 ss << "mon." << name << " has location " << new_loc_i->second
1118 << ", which matches mons " << matching_mons << " on the "
1119 << stretch_bucket_divider << " dividing bucket for stretch mode. "
1120 "Pass --yes-i-really-mean-it if you're sure you want to do this."
1121 "(You really don't.)";
1122 err = -EINVAL;
1123 goto reply;
1124 }
1125 pending_map.tiebreaker_mon = name;
1126 pending_map.disallowed_leaders.insert(name);
1127 pending_map.last_changed = ceph_clock_now();
1128 err = 0;
1129 propose = true;
1130 } else if (prefix == "mon enable_stretch_mode") {
1131 if (!mon.osdmon()->is_writeable()) {
1132 dout(10) << __func__
1133 << ": waiting for osdmon writeable for stretch mode" << dendl;
1134 mon.osdmon()->wait_for_writeable(op, new Monitor::C_RetryMessage(&mon, op));
1135 return false;
1136 }
1137 {
1138 if (monmap.stretch_mode_enabled) {
1139 ss << "stretch mode is already engaged";
1140 err = -EINVAL;
1141 goto reply;
1142 }
1143 if (pending_map.stretch_mode_enabled) {
1144 ss << "stretch mode currently committing";
1145 err = 0;
1146 goto reply;
1147 }
1148 string tiebreaker_mon;
1149 if (!cmd_getval(cmdmap, "tiebreaker_mon", tiebreaker_mon)) {
1150 ss << "must specify a tiebreaker monitor";
1151 err = -EINVAL;
1152 goto reply;
1153 }
1154 string new_crush_rule;
1155 if (!cmd_getval(cmdmap, "new_crush_rule", new_crush_rule)) {
1156 ss << "must specify a new crush rule that spreads out copies over multiple sites";
1157 err = -EINVAL;
1158 goto reply;
1159 }
1160 string dividing_bucket;
1161 if (!cmd_getval(cmdmap, "dividing_bucket", dividing_bucket)) {
1162 ss << "must specify a dividing bucket";
1163 err = -EINVAL;
1164 goto reply;
1165 }
1166 //okay, initial arguments make sense, check pools and cluster state
1167 err = mon.osdmon()->check_cluster_features(CEPH_FEATUREMASK_STRETCH_MODE, ss);
1168 if (err)
1169 goto reply;
1170 struct Plugger {
1171 Paxos &p;
1172 Plugger(Paxos &p) : p(p) { p.plug(); }
1173 ~Plugger() { p.unplug(); }
1174 } plugger(paxos);
1175
1176 set<pg_pool_t*> pools;
1177 bool okay = false;
1178 int errcode = 0;
1179
1180 mon.osdmon()->try_enable_stretch_mode_pools(ss, &okay, &errcode,
1181 &pools, new_crush_rule);
1182 if (!okay) {
1183 err = errcode;
1184 goto reply;
1185 }
1186 try_enable_stretch_mode(ss, &okay, &errcode, false,
1187 tiebreaker_mon, dividing_bucket);
1188 if (!okay) {
1189 err = errcode;
1190 goto reply;
1191 }
1192 mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, false,
1193 dividing_bucket, 2, pools, new_crush_rule);
1194 if (!okay) {
1195 err = errcode;
1196 goto reply;
1197 }
1198 // everything looks good, actually commit the changes!
1199 try_enable_stretch_mode(ss, &okay, &errcode, true,
1200 tiebreaker_mon, dividing_bucket);
1201 mon.osdmon()->try_enable_stretch_mode(ss, &okay, &errcode, true,
1202 dividing_bucket,
1203 2, // right now we only support 2 sites
1204 pools, new_crush_rule);
1205 ceph_assert(okay == true);
1206 }
1207 request_proposal(mon.osdmon());
1208 err = 0;
1209 propose = true;
1210 } else {
1211 ss << "unknown command " << prefix;
1212 err = -EINVAL;
1213 }
1214
1215 reply:
1216 getline(ss, rs);
1217 mon.reply_command(op, err, rs, get_last_committed());
1218 // we are returning to the user; do not propose.
1219 return propose;
1220 }
1221
1222 void MonmapMonitor::try_enable_stretch_mode(stringstream& ss, bool *okay,
1223 int *errcode, bool commit,
1224 const string& tiebreaker_mon,
1225 const string& dividing_bucket)
1226 {
1227 dout(20) << __func__ << dendl;
1228 *okay = false;
1229 if (pending_map.strategy != MonMap::CONNECTIVITY) {
1230 ss << "Monitors must use the connectivity strategy to enable stretch mode";
1231 *errcode = -EINVAL;
1232 ceph_assert(!commit);
1233 return;
1234 }
1235 if (!pending_map.contains(tiebreaker_mon)) {
1236 ss << "mon " << tiebreaker_mon << "does not seem to exist";
1237 *errcode = -ENOENT;
1238 ceph_assert(!commit);
1239 return;
1240 }
1241 map<string,string> buckets;
1242 for (const auto&mii : mon.monmap->mon_info) {
1243 const auto& mi = mii.second;
1244 const auto& bi = mi.crush_loc.find(dividing_bucket);
1245 if (bi == mi.crush_loc.end()) {
1246 ss << "Could not find location entry for " << dividing_bucket
1247 << " on monitor " << mi.name;
1248 *errcode = -EINVAL;
1249 ceph_assert(!commit);
1250 return;
1251 }
1252 buckets[mii.first] = bi->second;
1253 }
1254 string bucket1, bucket2, tiebreaker_bucket;
1255 for (auto& i : buckets) {
1256 if (i.first == tiebreaker_mon) {
1257 tiebreaker_bucket = i.second;
1258 continue;
1259 }
1260 if (bucket1.empty()) {
1261 bucket1 = i.second;
1262 }
1263 if (bucket1 != i.second &&
1264 bucket2.empty()) {
1265 bucket2 = i.second;
1266 }
1267 if (bucket1 != i.second &&
1268 bucket2 != i.second) {
1269 ss << "There are too many monitor buckets for stretch mode, found "
1270 << bucket1 << "," << bucket2 << "," << i.second;
1271 *errcode = -EINVAL;
1272 ceph_assert(!commit);
1273 return;
1274 }
1275 }
1276 if (bucket1.empty() || bucket2.empty()) {
1277 ss << "There are not enough monitor buckets for stretch mode;"
1278 << " must have at least 2 plus the tiebreaker but only found "
1279 << (bucket1.empty() ? bucket1 : bucket2);
1280 *errcode = -EINVAL;
1281 ceph_assert(!commit);
1282 return;
1283 }
1284 if (tiebreaker_bucket == bucket1 ||
1285 tiebreaker_bucket == bucket2) {
1286 ss << "The named tiebreaker monitor " << tiebreaker_mon
1287 << " is in the same CRUSH bucket " << tiebreaker_bucket
1288 << " as other monitors";
1289 *errcode = -EINVAL;
1290 ceph_assert(!commit);
1291 return;
1292 }
1293 if (commit) {
1294 pending_map.disallowed_leaders.insert(tiebreaker_mon);
1295 pending_map.tiebreaker_mon = tiebreaker_mon;
1296 pending_map.stretch_mode_enabled = true;
1297 }
1298 *okay = true;
1299 }
1300
1301 void MonmapMonitor::trigger_degraded_stretch_mode(const set<string>& dead_mons)
1302 {
1303 dout(20) << __func__ << dendl;
1304 pending_map.stretch_marked_down_mons.insert(dead_mons.begin(), dead_mons.end());
1305 propose_pending();
1306 }
1307
1308 void MonmapMonitor::trigger_healthy_stretch_mode()
1309 {
1310 dout(20) << __func__ << dendl;
1311 pending_map.stretch_marked_down_mons.clear();
1312 propose_pending();
1313 }
1314
1315 bool MonmapMonitor::preprocess_join(MonOpRequestRef op)
1316 {
1317 auto join = op->get_req<MMonJoin>();
1318 dout(10) << __func__ << " " << join->name << " at " << join->addrs << dendl;
1319
1320 MonSession *session = op->get_session();
1321 if (!session ||
1322 !session->is_capable("mon", MON_CAP_W | MON_CAP_X)) {
1323 dout(10) << " insufficient caps" << dendl;
1324 return true;
1325 }
1326
1327 const auto name_info_i = pending_map.mon_info.find(join->name);
1328 if (name_info_i != pending_map.mon_info.end() &&
1329 !name_info_i->second.public_addrs.front().is_blank_ip() &&
1330 (!join->force_loc || join->crush_loc == name_info_i->second.crush_loc)) {
1331 dout(10) << " already have " << join->name << dendl;
1332 return true;
1333 }
1334 string addr_name;
1335 if (pending_map.contains(join->addrs)) {
1336 addr_name = pending_map.get_name(join->addrs);
1337 }
1338 if (!addr_name.empty() &&
1339 addr_name == join->name &&
1340 (!join->force_loc || join->crush_loc.empty() ||
1341 pending_map.mon_info[addr_name].crush_loc == join->crush_loc)) {
1342 dout(10) << " already have " << join->addrs << dendl;
1343 return true;
1344 }
1345 if (pending_map.stretch_mode_enabled &&
1346 join->crush_loc.empty() &&
1347 (addr_name.empty() ||
1348 pending_map.mon_info[addr_name].crush_loc.empty())) {
1349 dout(10) << "stretch mode engaged but no source of crush_loc" << dendl;
1350 mon.clog->info() << join->name << " attempted to join from " << join->name
1351 << ' ' << join->addrs
1352 << "; but lacks a crush_location for stretch mode";
1353 return true;
1354 }
1355 return false;
1356 }
1357
1358 bool MonmapMonitor::prepare_join(MonOpRequestRef op)
1359 {
1360 auto join = op->get_req<MMonJoin>();
1361 dout(0) << "adding/updating " << join->name
1362 << " at " << join->addrs << " to monitor cluster" << dendl;
1363 map<string,string> existing_loc;
1364 if (pending_map.contains(join->addrs)) {
1365 string name = pending_map.get_name(join->addrs);
1366 existing_loc = pending_map.mon_info[name].crush_loc;
1367 pending_map.remove(name);
1368 }
1369 if (pending_map.contains(join->name))
1370 pending_map.remove(join->name);
1371 pending_map.add(join->name, join->addrs);
1372 pending_map.mon_info[join->name].crush_loc =
1373 ((join->force_loc || existing_loc.empty()) ?
1374 join->crush_loc : existing_loc);
1375 pending_map.last_changed = ceph_clock_now();
1376 return true;
1377 }
1378
1379 bool MonmapMonitor::should_propose(double& delay)
1380 {
1381 delay = 0.0;
1382 return true;
1383 }
1384
1385 int MonmapMonitor::get_monmap(bufferlist &bl)
1386 {
1387 version_t latest_ver = get_last_committed();
1388 dout(10) << __func__ << " ver " << latest_ver << dendl;
1389
1390 if (!mon.store->exists(get_service_name(), stringify(latest_ver)))
1391 return -ENOENT;
1392
1393 int err = get_version(latest_ver, bl);
1394 if (err < 0) {
1395 dout(1) << __func__ << " error obtaining monmap: "
1396 << cpp_strerror(err) << dendl;
1397 return err;
1398 }
1399 return 0;
1400 }
1401
1402 void MonmapMonitor::check_subs()
1403 {
1404 const string type = "monmap";
1405 mon.with_session_map([this, &type](const MonSessionMap& session_map) {
1406 auto subs = session_map.subs.find(type);
1407 if (subs == session_map.subs.end())
1408 return;
1409 for (auto sub : *subs->second) {
1410 check_sub(sub);
1411 }
1412 });
1413 }
1414
1415 void MonmapMonitor::check_sub(Subscription *sub)
1416 {
1417 const auto epoch = mon.monmap->get_epoch();
1418 dout(10) << __func__
1419 << " monmap next " << sub->next
1420 << " have " << epoch << dendl;
1421 if (sub->next <= epoch) {
1422 mon.send_latest_monmap(sub->session->con.get());
1423 if (sub->onetime) {
1424 mon.with_session_map([sub](MonSessionMap& session_map) {
1425 session_map.remove_sub(sub);
1426 });
1427 } else {
1428 sub->next = epoch + 1;
1429 }
1430 }
1431 }
1432
1433 void MonmapMonitor::tick()
1434 {
1435 if (!is_active() ||
1436 !mon.is_leader()) {
1437 return;
1438 }
1439
1440 if (mon.monmap->created.is_zero()) {
1441 dout(10) << __func__ << " detected empty created stamp" << dendl;
1442 utime_t ctime;
1443 for (version_t v = 1; v <= get_last_committed(); v++) {
1444 bufferlist bl;
1445 int r = get_version(v, bl);
1446 if (r < 0) {
1447 continue;
1448 }
1449 MonMap m;
1450 auto p = bl.cbegin();
1451 decode(m, p);
1452 if (!m.last_changed.is_zero()) {
1453 dout(10) << __func__ << " first monmap with last_changed is "
1454 << v << " with " << m.last_changed << dendl;
1455 ctime = m.last_changed;
1456 break;
1457 }
1458 }
1459 if (ctime.is_zero()) {
1460 ctime = ceph_clock_now();
1461 }
1462 dout(10) << __func__ << " updating created stamp to " << ctime << dendl;
1463 pending_map.created = ctime;
1464 propose_pending();
1465 }
1466 }