]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph_mon.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / ceph_mon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21
22 #include "common/config.h"
23 #include "include/ceph_features.h"
24
25 #include "mon/MonMap.h"
26 #include "mon/Monitor.h"
27 #include "mon/MonitorDBStore.h"
28 #include "mon/MonClient.h"
29
30 #include "msg/Messenger.h"
31
32 #include "include/CompatSet.h"
33
34 #include "common/ceph_argparse.h"
35 #include "common/pick_address.h"
36 #include "common/Throttle.h"
37 #include "common/Timer.h"
38 #include "common/errno.h"
39 #include "common/Preforker.h"
40
41 #include "global/global_init.h"
42 #include "global/signal_handler.h"
43
44 #include "perfglue/heap_profiler.h"
45
46 #include "include/ceph_assert.h"
47
48 #define dout_subsys ceph_subsys_mon
49
50 using std::cerr;
51 using std::cout;
52 using std::list;
53 using std::map;
54 using std::ostringstream;
55 using std::string;
56 using std::vector;
57
58 using ceph::bufferlist;
59 using ceph::decode;
60 using ceph::encode;
61 using ceph::JSONFormatter;
62
63 Monitor *mon = NULL;
64
65
66 void handle_mon_signal(int signum)
67 {
68 if (mon)
69 mon->handle_signal(signum);
70 }
71
72
73 int obtain_monmap(MonitorDBStore &store, bufferlist &bl)
74 {
75 dout(10) << __func__ << dendl;
76 /*
77 * the monmap may be in one of three places:
78 * 'mon_sync:temp_newer_monmap' - stashed newer map for bootstrap
79 * 'monmap:<latest_version_no>' - the monmap we'd really like to have
80 * 'mon_sync:latest_monmap' - last monmap backed up for the last sync
81 * 'mkfs:monmap' - a monmap resulting from mkfs
82 */
83
84 if (store.exists("monmap", "last_committed")) {
85 version_t latest_ver = store.get("monmap", "last_committed");
86 if (store.exists("monmap", latest_ver)) {
87 int err = store.get("monmap", latest_ver, bl);
88 ceph_assert(err == 0);
89 ceph_assert(bl.length() > 0);
90 dout(10) << __func__ << " read last committed monmap ver "
91 << latest_ver << dendl;
92
93 // see if there is stashed newer map (see bootstrap())
94 if (store.exists("mon_sync", "temp_newer_monmap")) {
95 bufferlist bl2;
96 int err = store.get("mon_sync", "temp_newer_monmap", bl2);
97 ceph_assert(err == 0);
98 ceph_assert(bl2.length() > 0);
99 MonMap b;
100 b.decode(bl2);
101 if (b.get_epoch() > latest_ver) {
102 dout(10) << __func__ << " using stashed monmap " << b.get_epoch()
103 << " instead" << dendl;
104 bl = std::move(bl2);
105 } else {
106 dout(10) << __func__ << " ignoring stashed monmap " << b.get_epoch()
107 << dendl;
108 }
109 }
110 return 0;
111 }
112 }
113
114 if (store.exists("mon_sync", "in_sync")
115 || store.exists("mon_sync", "force_sync")) {
116 dout(10) << __func__ << " detected aborted sync" << dendl;
117 if (store.exists("mon_sync", "latest_monmap")) {
118 int err = store.get("mon_sync", "latest_monmap", bl);
119 ceph_assert(err == 0);
120 ceph_assert(bl.length() > 0);
121 dout(10) << __func__ << " read backup monmap" << dendl;
122 return 0;
123 }
124 }
125
126 if (store.exists("mon_sync", "temp_newer_monmap")) {
127 dout(10) << __func__ << " found temp_newer_monmap" << dendl;
128 int err = store.get("mon_sync", "temp_newer_monmap", bl);
129 ceph_assert(err == 0);
130 ceph_assert(bl.length() > 0);
131 return 0;
132 }
133
134 if (store.exists("mkfs", "monmap")) {
135 dout(10) << __func__ << " found mkfs monmap" << dendl;
136 int err = store.get("mkfs", "monmap", bl);
137 ceph_assert(err == 0);
138 ceph_assert(bl.length() > 0);
139 return 0;
140 }
141
142 derr << __func__ << " unable to find a monmap" << dendl;
143 return -ENOENT;
144 }
145
146 int check_mon_data_exists()
147 {
148 string mon_data = g_conf()->mon_data;
149 struct stat buf;
150 if (::stat(mon_data.c_str(), &buf)) {
151 if (errno != ENOENT) {
152 derr << "stat(" << mon_data << ") " << cpp_strerror(errno) << dendl;
153 }
154 return -errno;
155 }
156 return 0;
157 }
158
159 /** Check whether **mon data** is empty.
160 *
161 * Being empty means mkfs has not been run and there's no monitor setup
162 * at **g_conf()->mon_data**.
163 *
164 * If the directory g_conf()->mon_data is not empty we will return -ENOTEMPTY.
165 * Otherwise we will return 0. Any other negative returns will represent
166 * a failure to be handled by the caller.
167 *
168 * @return **0** on success, -ENOTEMPTY if not empty or **-errno** otherwise.
169 */
170 int check_mon_data_empty()
171 {
172 string mon_data = g_conf()->mon_data;
173
174 DIR *dir = ::opendir(mon_data.c_str());
175 if (!dir) {
176 derr << "opendir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
177 return -errno;
178 }
179 int code = 0;
180 struct dirent *de = nullptr;
181 errno = 0;
182 while ((de = ::readdir(dir))) {
183 if (string(".") != de->d_name &&
184 string("..") != de->d_name &&
185 string("kv_backend") != de->d_name) {
186 code = -ENOTEMPTY;
187 break;
188 }
189 }
190 if (!de && errno) {
191 derr << "readdir(" << mon_data << ") " << cpp_strerror(errno) << dendl;
192 code = -errno;
193 }
194
195 ::closedir(dir);
196
197 return code;
198 }
199
200 static void usage()
201 {
202 cout << "usage: ceph-mon -i <ID> [flags]\n"
203 << " --debug_mon n\n"
204 << " debug monitor level (e.g. 10)\n"
205 << " --mkfs\n"
206 << " build fresh monitor fs\n"
207 << " --force-sync\n"
208 << " force a sync from another mon by wiping local data (BE CAREFUL)\n"
209 << " --yes-i-really-mean-it\n"
210 << " mandatory safeguard for --force-sync\n"
211 << " --compact\n"
212 << " compact the monitor store\n"
213 << " --osdmap <filename>\n"
214 << " only used when --mkfs is provided: load the osdmap from <filename>\n"
215 << " --inject-monmap <filename>\n"
216 << " write the <filename> monmap to the local monitor store and exit\n"
217 << " --extract-monmap <filename>\n"
218 << " extract the monmap from the local monitor store and exit\n"
219 << " --mon-data <directory>\n"
220 << " where the mon store and keyring are located\n"
221 << " --set-crush-location <bucket>=<foo>"
222 << " sets monitor's crush bucket location (only for stretch mode)"
223 << std::endl;
224 generic_server_usage();
225 }
226
227 entity_addrvec_t make_mon_addrs(entity_addr_t a)
228 {
229 entity_addrvec_t addrs;
230 if (a.get_port() == 0) {
231 a.set_type(entity_addr_t::TYPE_MSGR2);
232 a.set_port(CEPH_MON_PORT_IANA);
233 addrs.v.push_back(a);
234 a.set_type(entity_addr_t::TYPE_LEGACY);
235 a.set_port(CEPH_MON_PORT_LEGACY);
236 addrs.v.push_back(a);
237 } else if (a.get_port() == CEPH_MON_PORT_LEGACY) {
238 a.set_type(entity_addr_t::TYPE_LEGACY);
239 addrs.v.push_back(a);
240 } else if (a.get_type() == entity_addr_t::TYPE_ANY) {
241 a.set_type(entity_addr_t::TYPE_MSGR2);
242 addrs.v.push_back(a);
243 } else {
244 addrs.v.push_back(a);
245 }
246 return addrs;
247 }
248
249 int main(int argc, const char **argv)
250 {
251 // reset our process name, in case we did a respawn, so that it's not
252 // left as "exe".
253 ceph_pthread_setname(pthread_self(), "ceph-mon");
254
255 int err;
256
257 bool mkfs = false;
258 bool compact = false;
259 bool force_sync = false;
260 bool yes_really = false;
261 std::string osdmapfn, inject_monmap, extract_monmap, crush_loc;
262
263 vector<const char*> args;
264 argv_to_vec(argc, argv, args);
265 if (args.empty()) {
266 cerr << argv[0] << ": -h or --help for usage" << std::endl;
267 exit(1);
268 }
269 if (ceph_argparse_need_usage(args)) {
270 usage();
271 exit(0);
272 }
273
274 // We need to specify some default values that may be overridden by the
275 // user, that are specific to the monitor. The options we are overriding
276 // are also used on the OSD (or in any other component that uses leveldb),
277 // so changing the global defaults is not an option.
278 // This is not the prettiest way of doing this, especially since it has us
279 // having a different place defining default values, but it's not horribly
280 // wrong enough to prevent us from doing it :)
281 //
282 // NOTE: user-defined options will take precedence over ours.
283 //
284 // leveldb_write_buffer_size = 32*1024*1024 = 33554432 // 32MB
285 // leveldb_cache_size = 512*1024*1204 = 536870912 // 512MB
286 // leveldb_block_size = 64*1024 = 65536 // 64KB
287 // leveldb_compression = false
288 // leveldb_log = ""
289 map<string,string> defaults = {
290 { "leveldb_write_buffer_size", "33554432" },
291 { "leveldb_cache_size", "536870912" },
292 { "leveldb_block_size", "65536" },
293 { "leveldb_compression", "false"},
294 { "leveldb_log", "" },
295 { "keyring", "$mon_data/keyring" },
296 };
297
298 int flags = 0;
299 {
300 vector<const char*> args_copy = args;
301 std::string val;
302 for (std::vector<const char*>::iterator i = args_copy.begin();
303 i != args_copy.end(); ) {
304 if (ceph_argparse_double_dash(args_copy, i)) {
305 break;
306 } else if (ceph_argparse_flag(args_copy, i, "--mkfs", (char*)NULL)) {
307 flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
308 } else if (ceph_argparse_witharg(args_copy, i, &val, "--inject_monmap", (char*)NULL)) {
309 flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
310 } else if (ceph_argparse_witharg(args_copy, i, &val, "--extract-monmap", (char*)NULL)) {
311 flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
312 } else {
313 ++i;
314 }
315 }
316 }
317
318 // don't try to get config from mon cluster during startup
319 flags |= CINIT_FLAG_NO_MON_CONFIG;
320
321 auto cct = global_init(&defaults, args,
322 CEPH_ENTITY_TYPE_MON, CODE_ENVIRONMENT_DAEMON,
323 flags);
324 ceph_heap_profiler_init();
325
326 std::string val;
327 for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
328 if (ceph_argparse_double_dash(args, i)) {
329 break;
330 } else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) {
331 mkfs = true;
332 } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
333 compact = true;
334 } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {
335 force_sync = true;
336 } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {
337 yes_really = true;
338 } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
339 osdmapfn = val;
340 } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
341 inject_monmap = val;
342 } else if (ceph_argparse_witharg(args, i, &val, "--extract-monmap", (char*)NULL)) {
343 extract_monmap = val;
344 } else if (ceph_argparse_witharg(args, i, &val, "--set-crush-location", (char*)NULL)) {
345 crush_loc = val;
346 } else {
347 ++i;
348 }
349 }
350 if (!args.empty()) {
351 cerr << "too many arguments: " << args << std::endl;
352 exit(1);
353 }
354
355 if (force_sync && !yes_really) {
356 cerr << "are you SURE you want to force a sync? this will erase local data and may\n"
357 << "break your mon cluster. pass --yes-i-really-mean-it if you do." << std::endl;
358 exit(1);
359 }
360
361 if (g_conf()->mon_data.empty()) {
362 cerr << "must specify '--mon-data=foo' data path" << std::endl;
363 exit(1);
364 }
365
366 if (g_conf()->name.get_id().empty()) {
367 cerr << "must specify id (--id <id> or --name mon.<id>)" << std::endl;
368 exit(1);
369 }
370
371 // -- mkfs --
372 if (mkfs) {
373
374 int err = check_mon_data_exists();
375 if (err == -ENOENT) {
376 if (::mkdir(g_conf()->mon_data.c_str(), 0755)) {
377 derr << "mkdir(" << g_conf()->mon_data << ") : "
378 << cpp_strerror(errno) << dendl;
379 exit(1);
380 }
381 } else if (err < 0) {
382 derr << "error opening '" << g_conf()->mon_data << "': "
383 << cpp_strerror(-err) << dendl;
384 exit(-err);
385 }
386
387 err = check_mon_data_empty();
388 if (err == -ENOTEMPTY) {
389 // Mon may exist. Let the user know and exit gracefully.
390 derr << "'" << g_conf()->mon_data << "' already exists and is not empty"
391 << ": monitor may already exist" << dendl;
392 exit(0);
393 } else if (err < 0) {
394 derr << "error checking if '" << g_conf()->mon_data << "' is empty: "
395 << cpp_strerror(-err) << dendl;
396 exit(-err);
397 }
398
399 // resolve public_network -> public_addr
400 pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
401
402 dout(10) << "public_network " << g_conf()->public_network << dendl;
403 dout(10) << "public_addr " << g_conf()->public_addr << dendl;
404 dout(10) << "public_addrv " << g_conf()->public_addrv << dendl;
405
406 common_init_finish(g_ceph_context);
407
408 bufferlist monmapbl, osdmapbl;
409 std::string error;
410 MonMap monmap;
411
412 // load or generate monmap
413 const auto monmap_fn = g_conf().get_val<string>("monmap");
414 if (monmap_fn.length()) {
415 int err = monmapbl.read_file(monmap_fn.c_str(), &error);
416 if (err < 0) {
417 derr << argv[0] << ": error reading " << monmap_fn << ": " << error << dendl;
418 exit(1);
419 }
420 try {
421 monmap.decode(monmapbl);
422
423 // always mark seed/mkfs monmap as epoch 0
424 monmap.set_epoch(0);
425 } catch (const ceph::buffer::error& e) {
426 derr << argv[0] << ": error decoding monmap " << monmap_fn << ": " << e.what() << dendl;
427 exit(1);
428 }
429
430 dout(1) << "imported monmap:\n";
431 monmap.print(*_dout);
432 *_dout << dendl;
433
434 } else {
435 ostringstream oss;
436 int err = monmap.build_initial(g_ceph_context, true, oss);
437 if (oss.tellp())
438 derr << oss.str() << dendl;
439 if (err < 0) {
440 derr << argv[0] << ": warning: no initial monitors; must use admin socket to feed hints" << dendl;
441 }
442
443 dout(1) << "initial generated monmap:\n";
444 monmap.print(*_dout);
445 *_dout << dendl;
446
447 // am i part of the initial quorum?
448 if (monmap.contains(g_conf()->name.get_id())) {
449 // hmm, make sure the ip listed exists on the current host?
450 // maybe later.
451 } else if (!g_conf()->public_addrv.empty()) {
452 entity_addrvec_t av = g_conf()->public_addrv;
453 string name;
454 if (monmap.contains(av, &name)) {
455 monmap.rename(name, g_conf()->name.get_id());
456 dout(0) << argv[0] << ": renaming mon." << name << " " << av
457 << " to mon." << g_conf()->name.get_id() << dendl;
458 }
459 } else if (!g_conf()->public_addr.is_blank_ip()) {
460 entity_addrvec_t av = make_mon_addrs(g_conf()->public_addr);
461 string name;
462 if (monmap.contains(av, &name)) {
463 monmap.rename(name, g_conf()->name.get_id());
464 dout(0) << argv[0] << ": renaming mon." << name << " " << av
465 << " to mon." << g_conf()->name.get_id() << dendl;
466 }
467 } else {
468 // is a local address listed without a name? if so, name myself.
469 list<entity_addr_t> ls;
470 monmap.list_addrs(ls);
471 dout(0) << " monmap addrs are " << ls << ", checking if any are local"
472 << dendl;
473
474 entity_addr_t local;
475 if (have_local_addr(g_ceph_context, ls, &local)) {
476 dout(0) << " have local addr " << local << dendl;
477 string name;
478 local.set_type(entity_addr_t::TYPE_MSGR2);
479 if (!monmap.get_addr_name(local, name)) {
480 local.set_type(entity_addr_t::TYPE_LEGACY);
481 if (!monmap.get_addr_name(local, name)) {
482 dout(0) << "no local addresses appear in bootstrap monmap"
483 << dendl;
484 }
485 }
486 if (name.compare(0, 7, "noname-") == 0) {
487 dout(0) << argv[0] << ": mon." << name << " " << local
488 << " is local, renaming to mon." << g_conf()->name.get_id()
489 << dendl;
490 monmap.rename(name, g_conf()->name.get_id());
491 } else if (name.size()) {
492 dout(0) << argv[0] << ": mon." << name << " " << local
493 << " is local, but not 'noname-' + something; "
494 << "not assuming it's me" << dendl;
495 }
496 } else {
497 dout(0) << " no local addrs match monmap" << dendl;
498 }
499 }
500 }
501
502 const auto fsid = g_conf().get_val<uuid_d>("fsid");
503 if (!fsid.is_zero()) {
504 monmap.fsid = fsid;
505 dout(0) << argv[0] << ": set fsid to " << fsid << dendl;
506 }
507
508 if (monmap.fsid.is_zero()) {
509 derr << argv[0] << ": generated monmap has no fsid; use '--fsid <uuid>'" << dendl;
510 exit(10);
511 }
512
513 //monmap.print(cout);
514
515 // osdmap
516 if (osdmapfn.length()) {
517 err = osdmapbl.read_file(osdmapfn.c_str(), &error);
518 if (err < 0) {
519 derr << argv[0] << ": error reading " << osdmapfn << ": "
520 << error << dendl;
521 exit(1);
522 }
523 }
524
525 // go
526 MonitorDBStore store(g_conf()->mon_data);
527 ostringstream oss;
528 int r = store.create_and_open(oss);
529 if (oss.tellp())
530 derr << oss.str() << dendl;
531 if (r < 0) {
532 derr << argv[0] << ": error opening mon data directory at '"
533 << g_conf()->mon_data << "': " << cpp_strerror(r) << dendl;
534 exit(1);
535 }
536 ceph_assert(r == 0);
537
538 Monitor mon(g_ceph_context, g_conf()->name.get_id(), &store, 0, 0, &monmap);
539 r = mon.mkfs(osdmapbl);
540 if (r < 0) {
541 derr << argv[0] << ": error creating monfs: " << cpp_strerror(r) << dendl;
542 exit(1);
543 }
544 store.close();
545 dout(0) << argv[0] << ": created monfs at " << g_conf()->mon_data
546 << " for " << g_conf()->name << dendl;
547 return 0;
548 }
549
550 err = check_mon_data_exists();
551 if (err < 0 && err == -ENOENT) {
552 derr << "monitor data directory at '" << g_conf()->mon_data << "'"
553 << " does not exist: have you run 'mkfs'?" << dendl;
554 exit(1);
555 } else if (err < 0) {
556 derr << "error accessing monitor data directory at '"
557 << g_conf()->mon_data << "': " << cpp_strerror(-err) << dendl;
558 exit(1);
559 }
560
561 err = check_mon_data_empty();
562 if (err == 0) {
563 derr << "monitor data directory at '" << g_conf()->mon_data
564 << "' is empty: have you run 'mkfs'?" << dendl;
565 exit(1);
566 } else if (err < 0 && err != -ENOTEMPTY) {
567 // we don't want an empty data dir by now
568 derr << "error accessing '" << g_conf()->mon_data << "': "
569 << cpp_strerror(-err) << dendl;
570 exit(1);
571 }
572
573 {
574 // check fs stats. don't start if it's critically close to full.
575 ceph_data_stats_t stats;
576 int err = get_fs_stats(stats, g_conf()->mon_data.c_str());
577 if (err < 0) {
578 derr << "error checking monitor data's fs stats: " << cpp_strerror(err)
579 << dendl;
580 exit(-err);
581 }
582 if (stats.avail_percent <= g_conf()->mon_data_avail_crit) {
583 derr << "error: monitor data filesystem reached concerning levels of"
584 << " available storage space (available: "
585 << stats.avail_percent << "% " << byte_u_t(stats.byte_avail)
586 << ")\nyou may adjust 'mon data avail crit' to a lower value"
587 << " to make this go away (default: " << g_conf()->mon_data_avail_crit
588 << "%)\n" << dendl;
589 exit(ENOSPC);
590 }
591 }
592
593 // we fork early to prevent leveldb's environment static state from
594 // screwing us over
595 Preforker prefork;
596 if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS)) {
597 if (global_init_prefork(g_ceph_context) >= 0) {
598 string err_msg;
599 err = prefork.prefork(err_msg);
600 if (err < 0) {
601 derr << err_msg << dendl;
602 prefork.exit(err);
603 }
604 if (prefork.is_parent()) {
605 err = prefork.parent_wait(err_msg);
606 if (err < 0)
607 derr << err_msg << dendl;
608 prefork.exit(err);
609 }
610 setsid();
611 global_init_postfork_start(g_ceph_context);
612 }
613 common_init_finish(g_ceph_context);
614 global_init_chdir(g_ceph_context);
615 if (global_init_preload_erasure_code(g_ceph_context) < 0)
616 prefork.exit(1);
617 }
618
619 // set up signal handlers, now that we've daemonized/forked.
620 init_async_signal_handler();
621 register_async_signal_handler(SIGHUP, sighup_handler);
622
623 MonitorDBStore *store = new MonitorDBStore(g_conf()->mon_data);
624
625 // make sure we aren't upgrading too fast
626 {
627 string val;
628 int r = store->read_meta("min_mon_release", &val);
629 if (r >= 0 && val.size()) {
630 ceph_release_t from_release = ceph_release_from_name(val);
631 ostringstream err;
632 if (!can_upgrade_from(from_release, "min_mon_release", err)) {
633 derr << err.str() << dendl;
634 prefork.exit(1);
635 }
636 }
637 }
638
639 {
640 ostringstream oss;
641 err = store->open(oss);
642 if (oss.tellp())
643 derr << oss.str() << dendl;
644 if (err < 0) {
645 derr << "error opening mon data directory at '"
646 << g_conf()->mon_data << "': " << cpp_strerror(err) << dendl;
647 prefork.exit(1);
648 }
649 }
650
651 bufferlist magicbl;
652 err = store->get(Monitor::MONITOR_NAME, "magic", magicbl);
653 if (err || !magicbl.length()) {
654 derr << "unable to read magic from mon data" << dendl;
655 prefork.exit(1);
656 }
657 string magic(magicbl.c_str(), magicbl.length()-1); // ignore trailing \n
658 if (strcmp(magic.c_str(), CEPH_MON_ONDISK_MAGIC)) {
659 derr << "mon fs magic '" << magic << "' != current '" << CEPH_MON_ONDISK_MAGIC << "'" << dendl;
660 prefork.exit(1);
661 }
662
663 err = Monitor::check_features(store);
664 if (err < 0) {
665 derr << "error checking features: " << cpp_strerror(err) << dendl;
666 prefork.exit(1);
667 }
668
669 // inject new monmap?
670 if (!inject_monmap.empty()) {
671 bufferlist bl;
672 std::string error;
673 int r = bl.read_file(inject_monmap.c_str(), &error);
674 if (r) {
675 derr << "unable to read monmap from " << inject_monmap << ": "
676 << error << dendl;
677 prefork.exit(1);
678 }
679
680 // get next version
681 version_t v = store->get("monmap", "last_committed");
682 dout(0) << "last committed monmap epoch is " << v << ", injected map will be " << (v+1)
683 << dendl;
684 v++;
685
686 // set the version
687 MonMap tmp;
688 tmp.decode(bl);
689 if (tmp.get_epoch() != v) {
690 dout(0) << "changing monmap epoch from " << tmp.get_epoch()
691 << " to " << v << dendl;
692 tmp.set_epoch(v);
693 }
694 bufferlist mapbl;
695 tmp.encode(mapbl, CEPH_FEATURES_ALL);
696 bufferlist final;
697 encode(v, final);
698 encode(mapbl, final);
699
700 auto t(std::make_shared<MonitorDBStore::Transaction>());
701 // save it
702 t->put("monmap", v, mapbl);
703 t->put("monmap", "latest", final);
704 t->put("monmap", "last_committed", v);
705 store->apply_transaction(t);
706
707 dout(0) << "done." << dendl;
708 prefork.exit(0);
709 }
710
711 // monmap?
712 MonMap monmap;
713 {
714 // note that even if we don't find a viable monmap, we should go ahead
715 // and try to build it up in the next if-else block.
716 bufferlist mapbl;
717 int err = obtain_monmap(*store, mapbl);
718 if (err >= 0) {
719 try {
720 monmap.decode(mapbl);
721 } catch (const ceph::buffer::error& e) {
722 derr << "can't decode monmap: " << e.what() << dendl;
723 }
724 } else {
725 derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
726 }
727
728 dout(10) << __func__ << " monmap:\n";
729 JSONFormatter jf(true);
730 jf.dump_object("monmap", monmap);
731 jf.flush(*_dout);
732 *_dout << dendl;
733
734 if (!extract_monmap.empty()) {
735 int r = mapbl.write_file(extract_monmap.c_str());
736 if (r < 0) {
737 r = -errno;
738 derr << "error writing monmap to " << extract_monmap << ": " << cpp_strerror(r) << dendl;
739 prefork.exit(1);
740 }
741 derr << "wrote monmap to " << extract_monmap << dendl;
742 prefork.exit(0);
743 }
744 }
745
746 // this is what i will bind to
747 entity_addrvec_t ipaddrs;
748
749 if (monmap.contains(g_conf()->name.get_id())) {
750 ipaddrs = monmap.get_addrs(g_conf()->name.get_id());
751
752 // print helpful warning if the conf file doesn't match
753 std::vector<std::string> my_sections = g_conf().get_my_sections();
754 std::string mon_addr_str;
755 if (g_conf().get_val_from_conf_file(my_sections, "mon addr",
756 mon_addr_str, true) == 0) {
757 entity_addr_t conf_addr;
758 if (conf_addr.parse(mon_addr_str.c_str())) {
759 entity_addrvec_t conf_addrs = make_mon_addrs(conf_addr);
760 if (ipaddrs != conf_addrs) {
761 derr << "WARNING: 'mon addr' config option " << conf_addrs
762 << " does not match monmap file" << std::endl
763 << " continuing with monmap configuration" << dendl;
764 }
765 } else
766 derr << "WARNING: invalid 'mon addr' config option" << std::endl
767 << " continuing with monmap configuration" << dendl;
768 }
769 } else {
770 dout(0) << g_conf()->name << " does not exist in monmap, will attempt to join an existing cluster" << dendl;
771
772 pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
773 if (!g_conf()->public_addrv.empty()) {
774 ipaddrs = g_conf()->public_addrv;
775 dout(0) << "using public_addrv " << ipaddrs << dendl;
776 } else if (!g_conf()->public_addr.is_blank_ip()) {
777 ipaddrs = make_mon_addrs(g_conf()->public_addr);
778 dout(0) << "using public_addr " << g_conf()->public_addr << " -> "
779 << ipaddrs << dendl;
780 } else {
781 MonMap tmpmap;
782 ostringstream oss;
783 int err = tmpmap.build_initial(g_ceph_context, true, oss);
784 if (oss.tellp())
785 derr << oss.str() << dendl;
786 if (err < 0) {
787 derr << argv[0] << ": error generating initial monmap: "
788 << cpp_strerror(err) << dendl;
789 prefork.exit(1);
790 }
791 if (tmpmap.contains(g_conf()->name.get_id())) {
792 ipaddrs = tmpmap.get_addrs(g_conf()->name.get_id());
793 } else {
794 derr << "no public_addr or public_network specified, and "
795 << g_conf()->name << " not present in monmap or ceph.conf" << dendl;
796 prefork.exit(1);
797 }
798 }
799 }
800
801 // bind
802 int rank = monmap.get_rank(g_conf()->name.get_id());
803 std::string public_msgr_type = g_conf()->ms_public_type.empty() ? g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
804 Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
805 entity_name_t::MON(rank), "mon", 0);
806 if (!msgr)
807 exit(1);
808 msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
809 msgr->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
810
811 msgr->set_default_policy(Messenger::Policy::stateless_server(0));
812 msgr->set_policy(entity_name_t::TYPE_MON,
813 Messenger::Policy::lossless_peer_reuse(
814 CEPH_FEATURE_SERVER_LUMINOUS));
815 msgr->set_policy(entity_name_t::TYPE_OSD,
816 Messenger::Policy::stateless_server(
817 CEPH_FEATURE_SERVER_LUMINOUS));
818 msgr->set_policy(entity_name_t::TYPE_CLIENT,
819 Messenger::Policy::stateless_server(0));
820 msgr->set_policy(entity_name_t::TYPE_MDS,
821 Messenger::Policy::stateless_server(0));
822
823 // throttle client traffic
824 Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
825 g_conf()->mon_client_bytes);
826 msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
827 client_throttler, NULL);
828
829 // throttle daemon traffic
830 // NOTE: actual usage on the leader may multiply by the number of
831 // monitors if they forward large update messages from daemons.
832 Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
833 g_conf()->mon_daemon_bytes);
834 msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler,
835 NULL);
836 msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler,
837 NULL);
838
839 entity_addrvec_t bind_addrs = ipaddrs;
840 entity_addrvec_t public_addrs = ipaddrs;
841
842 // check if the public_bind_addr option is set
843 if (!g_conf()->public_bind_addr.is_blank_ip()) {
844 bind_addrs = make_mon_addrs(g_conf()->public_bind_addr);
845 }
846
847 dout(0) << "starting " << g_conf()->name << " rank " << rank
848 << " at public addrs " << public_addrs
849 << " at bind addrs " << bind_addrs
850 << " mon_data " << g_conf()->mon_data
851 << " fsid " << monmap.get_fsid()
852 << dendl;
853
854 Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
855 entity_name_t::MON(rank), "mon-mgrc",
856 Messenger::get_pid_nonce());
857 if (!mgr_msgr) {
858 derr << "unable to create mgr_msgr" << dendl;
859 prefork.exit(1);
860 }
861
862 mon = new Monitor(g_ceph_context, g_conf()->name.get_id(), store,
863 msgr, mgr_msgr, &monmap);
864
865 mon->orig_argc = argc;
866 mon->orig_argv = argv;
867
868 if (force_sync) {
869 derr << "flagging a forced sync ..." << dendl;
870 ostringstream oss;
871 JSONFormatter jf(true);
872 mon->sync_force(&jf);
873 derr << "out:\n";
874 jf.flush(*_dout);
875 *_dout << dendl;
876 }
877
878 err = mon->preinit();
879 if (err < 0) {
880 derr << "failed to initialize" << dendl;
881 prefork.exit(1);
882 }
883
884 if (compact || g_conf()->mon_compact_on_start) {
885 derr << "compacting monitor store ..." << dendl;
886 mon->store->compact();
887 derr << "done compacting" << dendl;
888 }
889
890 // bind
891 err = msgr->bindv(bind_addrs);
892 if (err < 0) {
893 derr << "unable to bind monitor to " << bind_addrs << dendl;
894 prefork.exit(1);
895 }
896
897 // if the public and bind addr are different set the msgr addr
898 // to the public one, now that the bind is complete.
899 if (public_addrs != bind_addrs) {
900 msgr->set_addrs(public_addrs);
901 }
902
903 if (g_conf()->daemonize) {
904 global_init_postfork_finish(g_ceph_context);
905 prefork.daemonize();
906 }
907
908 msgr->start();
909 mgr_msgr->start();
910
911 mon->set_mon_crush_location(crush_loc);
912 mon->init();
913
914 register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);
915 register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);
916
917 if (g_conf()->inject_early_sigterm)
918 kill(getpid(), SIGTERM);
919
920 msgr->wait();
921 mgr_msgr->wait();
922
923 store->close();
924
925 unregister_async_signal_handler(SIGHUP, sighup_handler);
926 unregister_async_signal_handler(SIGINT, handle_mon_signal);
927 unregister_async_signal_handler(SIGTERM, handle_mon_signal);
928 shutdown_async_signal_handler();
929
930 delete mon;
931 delete store;
932 delete msgr;
933 delete mgr_msgr;
934 delete client_throttler;
935 delete daemon_throttler;
936
937 // cd on exit, so that gmon.out (if any) goes into a separate directory for each node.
938 char s[20];
939 snprintf(s, sizeof(s), "gmon/%d", getpid());
940 if ((mkdir(s, 0755) == 0) && (chdir(s) == 0)) {
941 dout(0) << "ceph-mon: gmon.out should be in " << s << dendl;
942 }
943
944 prefork.signal_exit(0);
945 return 0;
946 }