]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/ceph_context.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / ceph_context.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2011 New Dream Network
7 * Copyright (C) 2017 OVH
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "common/ceph_context.h"
17
18 #include <mutex>
19 #include <iostream>
20
21 #include <pthread.h>
22
23 #include <boost/algorithm/string.hpp>
24
25 #include "include/common_fwd.h"
26 #include "include/mempool.h"
27 #include "include/stringify.h"
28 #include "common/admin_socket.h"
29 #include "common/code_environment.h"
30 #include "common/ceph_mutex.h"
31 #include "common/debug.h"
32 #include "common/config.h"
33 #include "common/ceph_crypto.h"
34 #include "common/lockdep.h"
35 #include "common/HeartbeatMap.h"
36 #include "common/errno.h"
37 #include "common/Graylog.h"
38
39 #include "log/Log.h"
40
41 #include "auth/Crypto.h"
42 #include "include/str_list.h"
43 #include "common/config.h"
44 #include "common/config_obs.h"
45 #include "common/PluginRegistry.h"
46 #include "common/valgrind.h"
47 #include "include/spinlock.h"
48 #if !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN))
49 #include "mon/MonMap.h"
50 #endif
51
52 // for CINIT_FLAGS
53 #include "common/common_init.h"
54
55 #include <iostream>
56 #include <pthread.h>
57
58 using namespace std::literals;
59
60 using ceph::bufferlist;
61 using ceph::HeartbeatMap;
62
63
64 #if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
65 namespace crimson::common {
66 CephContext::CephContext()
67 : _conf{crimson::common::local_conf()},
68 _perf_counters_collection{crimson::common::local_perf_coll()},
69 _crypto_random{std::make_unique<CryptoRandom>()}
70 {}
71
72 // define the dtor in .cc as CryptoRandom is an incomplete type in the header
73 CephContext::~CephContext()
74 {}
75
76 uint32_t CephContext::get_module_type() const
77 {
78 return CEPH_ENTITY_TYPE_OSD;
79 }
80
81 CryptoRandom* CephContext::random() const
82 {
83 return _crypto_random.get();
84 }
85
86 CephContext* CephContext::get()
87 {
88 ++nref;
89 return this;
90 }
91
92 void CephContext::put()
93 {
94 if (--nref == 0) {
95 delete this;
96 }
97 }
98
99 PerfCountersCollectionImpl* CephContext::get_perfcounters_collection()
100 {
101 return _perf_counters_collection.get_perf_collection();
102 }
103
104 }
105 #else // WITH_SEASTAR
106 namespace {
107
108 class LockdepObs : public md_config_obs_t {
109 public:
110 explicit LockdepObs(CephContext *cct)
111 : m_cct(cct), m_registered(false), lock(ceph::make_mutex("lock_dep_obs")) {
112 }
113 ~LockdepObs() override {
114 if (m_registered) {
115 lockdep_unregister_ceph_context(m_cct);
116 }
117 }
118
119 const char** get_tracked_conf_keys() const override {
120 static const char *KEYS[] = {"lockdep", NULL};
121 return KEYS;
122 }
123
124 void handle_conf_change(const ConfigProxy& conf,
125 const std::set <std::string> &changed) override {
126 std::unique_lock locker(lock);
127 if (conf->lockdep && !m_registered) {
128 lockdep_register_ceph_context(m_cct);
129 m_registered = true;
130 } else if (!conf->lockdep && m_registered) {
131 lockdep_unregister_ceph_context(m_cct);
132 m_registered = false;
133 }
134 }
135 private:
136 CephContext *m_cct;
137 bool m_registered;
138 ceph::mutex lock;
139 };
140
141 class MempoolObs : public md_config_obs_t,
142 public AdminSocketHook {
143 CephContext *cct;
144 ceph::mutex lock;
145
146 public:
147 explicit MempoolObs(CephContext *cct)
148 : cct(cct), lock(ceph::make_mutex("mem_pool_obs")) {
149 cct->_conf.add_observer(this);
150 int r = cct->get_admin_socket()->register_command(
151 "dump_mempools",
152 this,
153 "get mempool stats");
154 ceph_assert(r == 0);
155 }
156 ~MempoolObs() override {
157 cct->_conf.remove_observer(this);
158 cct->get_admin_socket()->unregister_commands(this);
159 }
160
161 // md_config_obs_t
162 const char** get_tracked_conf_keys() const override {
163 static const char *KEYS[] = {
164 "mempool_debug",
165 NULL
166 };
167 return KEYS;
168 }
169
170 void handle_conf_change(const ConfigProxy& conf,
171 const std::set <std::string> &changed) override {
172 std::unique_lock locker(lock);
173 if (changed.count("mempool_debug")) {
174 mempool::set_debug_mode(cct->_conf->mempool_debug);
175 }
176 }
177
178 // AdminSocketHook
179 int call(std::string_view command, const cmdmap_t& cmdmap,
180 ceph::Formatter *f,
181 std::ostream& errss,
182 bufferlist& out) override {
183 if (command == "dump_mempools") {
184 f->open_object_section("mempools");
185 mempool::dump(f);
186 f->close_section();
187 return 0;
188 }
189 return -ENOSYS;
190 }
191 };
192
193 } // anonymous namespace
194
195 namespace ceph::common {
196 class CephContextServiceThread : public Thread
197 {
198 public:
199 explicit CephContextServiceThread(CephContext *cct)
200 : _reopen_logs(false), _exit_thread(false), _cct(cct)
201 {
202 }
203
204 ~CephContextServiceThread() override {}
205
206 void *entry() override
207 {
208 while (1) {
209 std::unique_lock l(_lock);
210 if (_exit_thread) {
211 break;
212 }
213
214 if (_cct->_conf->heartbeat_interval) {
215 auto interval = ceph::make_timespan(_cct->_conf->heartbeat_interval);
216 _cond.wait_for(l, interval);
217 } else
218 _cond.wait(l);
219
220 if (_exit_thread) {
221 break;
222 }
223
224 if (_reopen_logs) {
225 _cct->_log->reopen_log_file();
226 _reopen_logs = false;
227 }
228 _cct->_heartbeat_map->check_touch_file();
229
230 // refresh the perf coutners
231 _cct->_refresh_perf_values();
232 }
233 return NULL;
234 }
235
236 void reopen_logs()
237 {
238 std::lock_guard l(_lock);
239 _reopen_logs = true;
240 _cond.notify_all();
241 }
242
243 void exit_thread()
244 {
245 std::lock_guard l(_lock);
246 _exit_thread = true;
247 _cond.notify_all();
248 }
249
250 private:
251 ceph::mutex _lock = ceph::make_mutex("CephContextServiceThread::_lock");
252 ceph::condition_variable _cond;
253 bool _reopen_logs;
254 bool _exit_thread;
255 CephContext *_cct;
256 };
257 }
258
259 /**
260 * observe logging config changes
261 *
262 * The logging subsystem sits below most of the ceph code, including
263 * the config subsystem, to keep it simple and self-contained. Feed
264 * logging-related config changes to the log.
265 */
266 class LogObs : public md_config_obs_t {
267 ceph::logging::Log *log;
268 ceph::mutex lock;
269
270 public:
271 explicit LogObs(ceph::logging::Log *l)
272 : log(l), lock(ceph::make_mutex("log_obs")) {
273 }
274
275 const char** get_tracked_conf_keys() const override {
276 static const char *KEYS[] = {
277 "log_file",
278 "log_max_new",
279 "log_max_recent",
280 "log_to_file",
281 "log_to_syslog",
282 "err_to_syslog",
283 "log_stderr_prefix",
284 "log_to_stderr",
285 "err_to_stderr",
286 "log_to_graylog",
287 "err_to_graylog",
288 "log_graylog_host",
289 "log_graylog_port",
290 "log_coarse_timestamps",
291 "fsid",
292 "host",
293 NULL
294 };
295 return KEYS;
296 }
297
298 void handle_conf_change(const ConfigProxy& conf,
299 const std::set <std::string> &changed) override {
300 std::unique_lock locker(lock);
301 // stderr
302 if (changed.count("log_to_stderr") || changed.count("err_to_stderr")) {
303 int l = conf->log_to_stderr ? 99 : (conf->err_to_stderr ? -1 : -2);
304 log->set_stderr_level(l, l);
305 }
306
307 // syslog
308 if (changed.count("log_to_syslog")) {
309 int l = conf->log_to_syslog ? 99 : (conf->err_to_syslog ? -1 : -2);
310 log->set_syslog_level(l, l);
311 }
312
313 // file
314 if (changed.count("log_file") ||
315 changed.count("log_to_file")) {
316 if (conf->log_to_file) {
317 log->set_log_file(conf->log_file);
318 } else {
319 log->set_log_file({});
320 }
321 log->reopen_log_file();
322 }
323
324 if (changed.count("log_stderr_prefix")) {
325 log->set_log_stderr_prefix(conf.get_val<std::string>("log_stderr_prefix"));
326 }
327
328 if (changed.count("log_max_new")) {
329
330 log->set_max_new(conf->log_max_new);
331 }
332
333 if (changed.count("log_max_recent")) {
334 log->set_max_recent(conf->log_max_recent);
335 }
336
337 // graylog
338 if (changed.count("log_to_graylog") || changed.count("err_to_graylog")) {
339 int l = conf->log_to_graylog ? 99 : (conf->err_to_graylog ? -1 : -2);
340 log->set_graylog_level(l, l);
341
342 if (conf->log_to_graylog || conf->err_to_graylog) {
343 log->start_graylog();
344 } else if (! (conf->log_to_graylog && conf->err_to_graylog)) {
345 log->stop_graylog();
346 }
347 }
348
349 if (log->graylog() && (changed.count("log_graylog_host") || changed.count("log_graylog_port"))) {
350 log->graylog()->set_destination(conf->log_graylog_host, conf->log_graylog_port);
351 }
352
353 if (changed.find("log_coarse_timestamps") != changed.end()) {
354 log->set_coarse_timestamps(conf.get_val<bool>("log_coarse_timestamps"));
355 }
356
357 // metadata
358 if (log->graylog() && changed.count("host")) {
359 log->graylog()->set_hostname(conf->host);
360 }
361
362 if (log->graylog() && changed.count("fsid")) {
363 log->graylog()->set_fsid(conf.get_val<uuid_d>("fsid"));
364 }
365 }
366 };
367
368
369 namespace ceph::common {
370 // cct config watcher
371 class CephContextObs : public md_config_obs_t {
372 CephContext *cct;
373
374 public:
375 explicit CephContextObs(CephContext *cct) : cct(cct) {}
376
377 const char** get_tracked_conf_keys() const override {
378 static const char *KEYS[] = {
379 "enable_experimental_unrecoverable_data_corrupting_features",
380 "crush_location",
381 "container_image", // just so we don't hear complaints about it!
382 NULL
383 };
384 return KEYS;
385 }
386
387 void handle_conf_change(const ConfigProxy& conf,
388 const std::set <std::string> &changed) override {
389 if (changed.count(
390 "enable_experimental_unrecoverable_data_corrupting_features")) {
391 std::lock_guard lg(cct->_feature_lock);
392
393 cct->_experimental_features.clear();
394 auto add_experimental_feature = [this] (auto feature) {
395 cct->_experimental_features.emplace(std::string{feature});
396 };
397 for_each_substr(conf->enable_experimental_unrecoverable_data_corrupting_features,
398 ";,= \t", add_experimental_feature);
399
400 if (getenv("CEPH_DEV") == NULL) {
401 if (!cct->_experimental_features.empty()) {
402 if (cct->_experimental_features.count("*")) {
403 lderr(cct) << "WARNING: all dangerous and experimental features are enabled." << dendl;
404 } else {
405 lderr(cct) << "WARNING: the following dangerous and experimental features are enabled: "
406 << cct->_experimental_features << dendl;
407 }
408 }
409 }
410
411 }
412 if (changed.count("crush_location")) {
413 cct->crush_location.update_from_conf();
414 }
415 }
416 };
417 // perfcounter hooks
418
419 class CephContextHook : public AdminSocketHook {
420 CephContext *m_cct;
421
422 public:
423 explicit CephContextHook(CephContext *cct) : m_cct(cct) {}
424
425 int call(std::string_view command, const cmdmap_t& cmdmap,
426 Formatter *f,
427 std::ostream& errss,
428 bufferlist& out) override {
429 try {
430 return m_cct->do_command(command, cmdmap, f, errss, &out);
431 } catch (const bad_cmd_get& e) {
432 return -EINVAL;
433 }
434 }
435 };
436
437
438 bool CephContext::check_experimental_feature_enabled(const std::string& feat)
439 {
440 std::stringstream message;
441 bool enabled = check_experimental_feature_enabled(feat, &message);
442 lderr(this) << message.str() << dendl;
443 return enabled;
444 }
445
446 bool CephContext::check_experimental_feature_enabled(const std::string& feat,
447 std::ostream *message)
448 {
449 std::unique_lock<ceph::spinlock> lg(_feature_lock);
450
451 bool enabled = (_experimental_features.count(feat) ||
452 _experimental_features.count("*"));
453
454 if (enabled) {
455 (*message) << "WARNING: experimental feature '" << feat << "' is enabled\n";
456 (*message) << "Please be aware that this feature is experimental, untested,\n";
457 (*message) << "unsupported, and may result in data corruption, data loss,\n";
458 (*message) << "and/or irreparable damage to your cluster. Do not use\n";
459 (*message) << "feature with important data.\n";
460 } else {
461 (*message) << "*** experimental feature '" << feat << "' is not enabled ***\n";
462 (*message) << "This feature is marked as experimental, which means it\n";
463 (*message) << " - is untested\n";
464 (*message) << " - is unsupported\n";
465 (*message) << " - may corrupt your data\n";
466 (*message) << " - may break your cluster is an unrecoverable fashion\n";
467 (*message) << "To enable this feature, add this to your ceph.conf:\n";
468 (*message) << " enable experimental unrecoverable data corrupting features = " << feat << "\n";
469 }
470 return enabled;
471 }
472
473 int CephContext::do_command(std::string_view command, const cmdmap_t& cmdmap,
474 Formatter *f,
475 std::ostream& ss,
476 bufferlist *out)
477 {
478 try {
479 return _do_command(command, cmdmap, f, ss, out);
480 } catch (const bad_cmd_get& e) {
481 ss << e.what();
482 return -EINVAL;
483 }
484 }
485
486 int CephContext::_do_command(
487 std::string_view command, const cmdmap_t& cmdmap,
488 Formatter *f,
489 std::ostream& ss,
490 bufferlist *out)
491 {
492 int r = 0;
493 lgeneric_dout(this, 1) << "do_command '" << command << "' '" << cmdmap << "'"
494 << dendl;
495 ceph_assert_always(!(command == "assert" && _conf->debug_asok_assert_abort));
496 if (command == "abort") {
497 if (_conf->debug_asok_assert_abort) {
498 ceph_abort();
499 } else {
500 return -EPERM;
501 }
502 }
503 if (command == "leak_some_memory") {
504 char *foo = new char[1234];
505 (void)foo;
506 }
507 else if (command == "perfcounters_dump" || command == "1" ||
508 command == "perf dump") {
509 std::string logger;
510 std::string counter;
511 cmd_getval(cmdmap, "logger", logger);
512 cmd_getval(cmdmap, "counter", counter);
513 _perf_counters_collection->dump_formatted(f, false, logger, counter);
514 }
515 else if (command == "perfcounters_schema" || command == "2" ||
516 command == "perf schema") {
517 _perf_counters_collection->dump_formatted(f, true);
518 }
519 else if (command == "perf histogram dump") {
520 std::string logger;
521 std::string counter;
522 cmd_getval(cmdmap, "logger", logger);
523 cmd_getval(cmdmap, "counter", counter);
524 _perf_counters_collection->dump_formatted_histograms(f, false, logger,
525 counter);
526 }
527 else if (command == "perf histogram schema") {
528 _perf_counters_collection->dump_formatted_histograms(f, true);
529 }
530 else if (command == "perf reset") {
531 std::string var;
532 std::string section(command);
533 f->open_object_section(section.c_str());
534 if (!cmd_getval(cmdmap, "var", var)) {
535 f->dump_string("error", "syntax error: 'perf reset <var>'");
536 } else {
537 if(!_perf_counters_collection->reset(var))
538 f->dump_stream("error") << "Not find: " << var;
539 else
540 f->dump_string("success", std::string(command) + ' ' + var);
541 }
542 f->close_section();
543 }
544 else {
545 std::string section(command);
546 boost::replace_all(section, " ", "_");
547 f->open_object_section(section.c_str());
548 if (command == "config show") {
549 _conf.show_config(f);
550 }
551 else if (command == "config unset") {
552 std::string var;
553 if (!(cmd_getval(cmdmap, "var", var))) {
554 r = -EINVAL;
555 } else {
556 r = _conf.rm_val(var.c_str());
557 if (r < 0 && r != -ENOENT) {
558 ss << "error unsetting '" << var << "': "
559 << cpp_strerror(r);
560 } else {
561 _conf.apply_changes(&ss);
562 r = 0;
563 }
564 }
565
566 }
567 else if (command == "config set") {
568 std::string var;
569 std::vector<std::string> val;
570
571 if (!(cmd_getval(cmdmap, "var", var)) ||
572 !(cmd_getval(cmdmap, "val", val))) {
573 r = -EINVAL;
574 } else {
575 // val may be multiple words
576 auto valstr = str_join(val, " ");
577 r = _conf.set_val(var.c_str(), valstr.c_str());
578 if (r < 0) {
579 ss << "error setting '" << var << "' to '" << valstr << "': "
580 << cpp_strerror(r);
581 } else {
582 std::stringstream ss;
583 _conf.apply_changes(&ss);
584 f->dump_string("success", ss.str());
585 }
586 }
587 } else if (command == "config get") {
588 std::string var;
589 if (!cmd_getval(cmdmap, "var", var)) {
590 r = -EINVAL;
591 } else {
592 char buf[4096];
593 // FIPS zeroization audit 20191115: this memset is not security related.
594 memset(buf, 0, sizeof(buf));
595 char *tmp = buf;
596 r = _conf.get_val(var.c_str(), &tmp, sizeof(buf));
597 if (r < 0) {
598 ss << "error getting '" << var << "': " << cpp_strerror(r);
599 } else {
600 f->dump_string(var.c_str(), buf);
601 }
602 }
603 } else if (command == "config help") {
604 std::string var;
605 if (cmd_getval(cmdmap, "var", var)) {
606 // Output a single one
607 std::string key = ConfFile::normalize_key_name(var);
608 auto schema = _conf.get_schema(key);
609 if (!schema) {
610 ss << "Setting not found: '" << key << "'";
611 r = -ENOENT;
612 } else {
613 f->dump_object("option", *schema);
614 }
615 } else {
616 // Output all
617 f->open_array_section("options");
618 for (const auto &option : ceph_options) {
619 f->dump_object("option", option);
620 }
621 f->close_section();
622 }
623 } else if (command == "config diff") {
624 f->open_object_section("diff");
625 _conf.diff(f);
626 f->close_section(); // unknown
627 } else if (command == "config diff get") {
628 std::string setting;
629 f->open_object_section("diff");
630 _conf.diff(f, setting);
631 f->close_section(); // unknown
632 }
633 else if (command == "injectargs") {
634 std::vector<std::string> argsvec;
635 cmd_getval(cmdmap, "injected_args", argsvec);
636 if (!argsvec.empty()) {
637 auto args = joinify<std::string>(argsvec.begin(), argsvec.end(), " ");
638 r = _conf.injectargs(args, &ss);
639 }
640 }
641 else if (command == "log flush") {
642 _log->flush();
643 }
644 else if (command == "log dump") {
645 _log->dump_recent();
646 }
647 else if (command == "log reopen") {
648 _log->reopen_log_file();
649 }
650 else {
651 ceph_abort_msg("registered under wrong command?");
652 }
653 f->close_section();
654 }
655 lgeneric_dout(this, 1) << "do_command '" << command << "' '" << cmdmap
656 << "' result is " << out->length() << " bytes" << dendl;
657 return r;
658 }
659
660 CephContext::CephContext(uint32_t module_type_,
661 enum code_environment_t code_env,
662 int init_flags_)
663 : nref(1),
664 _conf{code_env == CODE_ENVIRONMENT_DAEMON},
665 _log(NULL),
666 _module_type(module_type_),
667 _init_flags(init_flags_),
668 _set_uid(0),
669 _set_gid(0),
670 _set_uid_string(),
671 _set_gid_string(),
672 _crypto_inited(0),
673 _service_thread(NULL),
674 _log_obs(NULL),
675 _admin_socket(NULL),
676 _perf_counters_collection(NULL),
677 _perf_counters_conf_obs(NULL),
678 _heartbeat_map(NULL),
679 _crypto_none(NULL),
680 _crypto_aes(NULL),
681 _plugin_registry(NULL),
682 _lockdep_obs(NULL),
683 crush_location(this)
684 {
685 _log = new ceph::logging::Log(&_conf->subsys);
686
687 _log_obs = new LogObs(_log);
688 _conf.add_observer(_log_obs);
689
690 _cct_obs = new CephContextObs(this);
691 _conf.add_observer(_cct_obs);
692
693 _lockdep_obs = new LockdepObs(this);
694 _conf.add_observer(_lockdep_obs);
695
696 _perf_counters_collection = new PerfCountersCollection(this);
697
698 _admin_socket = new AdminSocket(this);
699 _heartbeat_map = new HeartbeatMap(this);
700
701 _plugin_registry = new PluginRegistry(this);
702
703 _admin_hook = new CephContextHook(this);
704 _admin_socket->register_command("assert", _admin_hook, "");
705 _admin_socket->register_command("abort", _admin_hook, "");
706 _admin_socket->register_command("leak_some_memory", _admin_hook, "");
707 _admin_socket->register_command("perfcounters_dump", _admin_hook, "");
708 _admin_socket->register_command("1", _admin_hook, "");
709 _admin_socket->register_command("perf dump name=logger,type=CephString,req=false name=counter,type=CephString,req=false", _admin_hook, "dump perfcounters value");
710 _admin_socket->register_command("perfcounters_schema", _admin_hook, "");
711 _admin_socket->register_command("perf histogram dump name=logger,type=CephString,req=false name=counter,type=CephString,req=false", _admin_hook, "dump perf histogram values");
712 _admin_socket->register_command("2", _admin_hook, "");
713 _admin_socket->register_command("perf schema", _admin_hook, "dump perfcounters schema");
714 _admin_socket->register_command("perf histogram schema", _admin_hook, "dump perf histogram schema");
715 _admin_socket->register_command("perf reset name=var,type=CephString", _admin_hook, "perf reset <name>: perf reset all or one perfcounter name");
716 _admin_socket->register_command("config show", _admin_hook, "dump current config settings");
717 _admin_socket->register_command("config help name=var,type=CephString,req=false", _admin_hook, "get config setting schema and descriptions");
718 _admin_socket->register_command("config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <field> <val> [<val> ...]: set a config variable");
719 _admin_socket->register_command("config unset name=var,type=CephString", _admin_hook, "config unset <field>: unset a config variable");
720 _admin_socket->register_command("config get name=var,type=CephString", _admin_hook, "config get <field>: get the config value");
721 _admin_socket->register_command(
722 "config diff", _admin_hook,
723 "dump diff of current config and default config");
724 _admin_socket->register_command(
725 "config diff get name=var,type=CephString", _admin_hook,
726 "dump diff get <field>: dump diff of current and default config setting <field>");
727 _admin_socket->register_command("injectargs name=injected_args,type=CephString,n=N", _admin_hook, "inject configuration arguments into running daemon"),
728 _admin_socket->register_command("log flush", _admin_hook, "flush log entries to log file");
729 _admin_socket->register_command("log dump", _admin_hook, "dump recent log entries to log file");
730 _admin_socket->register_command("log reopen", _admin_hook, "reopen log file");
731
732 _crypto_none = CryptoHandler::create(CEPH_CRYPTO_NONE);
733 _crypto_aes = CryptoHandler::create(CEPH_CRYPTO_AES);
734 _crypto_random.reset(new CryptoRandom());
735
736 lookup_or_create_singleton_object<MempoolObs>("mempool_obs", false, this);
737 }
738
739 CephContext::~CephContext()
740 {
741 associated_objs.clear();
742 join_service_thread();
743
744 if (_cct_perf) {
745 _perf_counters_collection->remove(_cct_perf);
746 delete _cct_perf;
747 _cct_perf = NULL;
748 }
749
750 delete _plugin_registry;
751
752 _admin_socket->unregister_commands(_admin_hook);
753 delete _admin_hook;
754 delete _admin_socket;
755
756 delete _heartbeat_map;
757
758 delete _perf_counters_collection;
759 _perf_counters_collection = NULL;
760
761 delete _perf_counters_conf_obs;
762 _perf_counters_conf_obs = NULL;
763
764 _conf.remove_observer(_log_obs);
765 delete _log_obs;
766 _log_obs = NULL;
767
768 _conf.remove_observer(_cct_obs);
769 delete _cct_obs;
770 _cct_obs = NULL;
771
772 _conf.remove_observer(_lockdep_obs);
773 delete _lockdep_obs;
774 _lockdep_obs = NULL;
775
776 _log->stop();
777 delete _log;
778 _log = NULL;
779
780 delete _crypto_none;
781 delete _crypto_aes;
782 if (_crypto_inited > 0) {
783 ceph_assert(_crypto_inited == 1); // or else someone explicitly did
784 // init but not shutdown
785 shutdown_crypto();
786 }
787 }
788
789 void CephContext::put() {
790 if (--nref == 0) {
791 ANNOTATE_HAPPENS_AFTER(&nref);
792 ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&nref);
793 if (g_ceph_context == this)
794 g_ceph_context = nullptr;
795 delete this;
796 } else {
797 ANNOTATE_HAPPENS_BEFORE(&nref);
798 }
799 }
800
801 void CephContext::init_crypto()
802 {
803 if (_crypto_inited++ == 0) {
804 TOPNSPC::crypto::init();
805 }
806 }
807
808 void CephContext::shutdown_crypto()
809 {
810 if (--_crypto_inited == 0) {
811 TOPNSPC::crypto::shutdown(g_code_env == CODE_ENVIRONMENT_LIBRARY);
812 }
813 }
814
815 void CephContext::start_service_thread()
816 {
817 {
818 std::lock_guard lg(_service_thread_lock);
819 if (_service_thread) {
820 return;
821 }
822 _service_thread = new CephContextServiceThread(this);
823 _service_thread->create("service");
824 }
825
826 if (!(get_init_flags() & CINIT_FLAG_NO_CCT_PERF_COUNTERS))
827 _enable_perf_counter();
828
829 // make logs flush on_exit()
830 if (_conf->log_flush_on_exit)
831 _log->set_flush_on_exit();
832
833 // Trigger callbacks on any config observers that were waiting for
834 // it to become safe to start threads.
835 _conf.set_safe_to_start_threads();
836 _conf.call_all_observers();
837
838 // start admin socket
839 if (_conf->admin_socket.length())
840 _admin_socket->init(_conf->admin_socket);
841 }
842
843 void CephContext::reopen_logs()
844 {
845 std::lock_guard lg(_service_thread_lock);
846 if (_service_thread)
847 _service_thread->reopen_logs();
848 }
849
850 void CephContext::join_service_thread()
851 {
852 std::unique_lock<ceph::spinlock> lg(_service_thread_lock);
853
854 CephContextServiceThread *thread = _service_thread;
855 if (!thread) {
856 return;
857 }
858 _service_thread = NULL;
859
860 lg.unlock();
861
862 thread->exit_thread();
863 thread->join();
864 delete thread;
865
866 if (!(get_init_flags() & CINIT_FLAG_NO_CCT_PERF_COUNTERS))
867 _disable_perf_counter();
868 }
869
870 uint32_t CephContext::get_module_type() const
871 {
872 return _module_type;
873 }
874
875 void CephContext::set_init_flags(int flags)
876 {
877 _init_flags = flags;
878 }
879
880 int CephContext::get_init_flags() const
881 {
882 return _init_flags;
883 }
884
885 PerfCountersCollection *CephContext::get_perfcounters_collection()
886 {
887 return _perf_counters_collection;
888 }
889
890 void CephContext::_enable_perf_counter()
891 {
892 assert(!_cct_perf);
893 PerfCountersBuilder plb(this, "cct", l_cct_first, l_cct_last);
894 plb.add_u64(l_cct_total_workers, "total_workers", "Total workers");
895 plb.add_u64(l_cct_unhealthy_workers, "unhealthy_workers", "Unhealthy workers");
896 _cct_perf = plb.create_perf_counters();
897 _perf_counters_collection->add(_cct_perf);
898
899 assert(_mempool_perf_names.empty());
900 assert(_mempool_perf_descriptions.empty());
901 _mempool_perf_names.reserve(mempool::num_pools * 2);
902 _mempool_perf_descriptions.reserve(mempool::num_pools * 2);
903 for (unsigned i = 0; i < mempool::num_pools; ++i) {
904 std::string n = mempool::get_pool_name(mempool::pool_index_t(i));
905 _mempool_perf_names.push_back(n + "_bytes"s);
906 _mempool_perf_descriptions.push_back(
907 "mempool "s + n + " total bytes");
908 _mempool_perf_names.push_back(n + "_items"s);
909 _mempool_perf_descriptions.push_back(
910 "mempool "s + n + " total items"s);
911 }
912
913 PerfCountersBuilder plb2(this, "mempool", l_mempool_first,
914 l_mempool_first + 1 + 2*mempool::num_pools);
915 unsigned l = l_mempool_first + 1;
916 for (unsigned i = 0; i < mempool::num_pools; ++i) {
917 plb2.add_u64(l++, _mempool_perf_names[i*2].c_str(),
918 _mempool_perf_descriptions[i*2].c_str());
919 plb2.add_u64(l++, _mempool_perf_names[i*2+1].c_str(),
920 _mempool_perf_descriptions[i*2+1].c_str());
921 }
922 _mempool_perf = plb2.create_perf_counters();
923 _perf_counters_collection->add(_mempool_perf);
924 }
925
926 void CephContext::_disable_perf_counter()
927 {
928 if (!_cct_perf) {
929 return;
930 }
931 _perf_counters_collection->remove(_cct_perf);
932 delete _cct_perf;
933 _cct_perf = nullptr;
934
935 _perf_counters_collection->remove(_mempool_perf);
936 delete _mempool_perf;
937 _mempool_perf = nullptr;
938 _mempool_perf_names.clear();
939 _mempool_perf_descriptions.clear();
940 }
941
942 void CephContext::_refresh_perf_values()
943 {
944 if (_cct_perf) {
945 _cct_perf->set(l_cct_total_workers, _heartbeat_map->get_total_workers());
946 _cct_perf->set(l_cct_unhealthy_workers, _heartbeat_map->get_unhealthy_workers());
947 }
948 unsigned l = l_mempool_first + 1;
949 for (unsigned i = 0; i < mempool::num_pools; ++i) {
950 mempool::pool_t& p = mempool::get_pool(mempool::pool_index_t(i));
951 _mempool_perf->set(l++, p.allocated_bytes());
952 _mempool_perf->set(l++, p.allocated_items());
953 }
954 }
955
956 AdminSocket *CephContext::get_admin_socket()
957 {
958 return _admin_socket;
959 }
960
961 CryptoHandler *CephContext::get_crypto_handler(int type)
962 {
963 switch (type) {
964 case CEPH_CRYPTO_NONE:
965 return _crypto_none;
966 case CEPH_CRYPTO_AES:
967 return _crypto_aes;
968 default:
969 return NULL;
970 }
971 }
972
973 void CephContext::notify_pre_fork()
974 {
975 {
976 std::lock_guard lg(_fork_watchers_lock);
977 for (auto &&t : _fork_watchers) {
978 t->handle_pre_fork();
979 }
980 }
981 {
982 // note: we don't hold a lock here, but we assume we are idle at
983 // fork time, which happens during process init and startup.
984 auto i = associated_objs.begin();
985 while (i != associated_objs.end()) {
986 if (associated_objs_drop_on_fork.count(i->first.first)) {
987 i = associated_objs.erase(i);
988 } else {
989 ++i;
990 }
991 }
992 associated_objs_drop_on_fork.clear();
993 }
994 }
995
996 void CephContext::notify_post_fork()
997 {
998 ceph::spin_unlock(&_fork_watchers_lock);
999 for (auto &&t : _fork_watchers)
1000 t->handle_post_fork();
1001 }
1002
1003 void CephContext::set_mon_addrs(const MonMap& mm) {
1004 std::vector<entity_addrvec_t> mon_addrs;
1005 for (auto& i : mm.mon_info) {
1006 mon_addrs.push_back(i.second.public_addrs);
1007 }
1008
1009 set_mon_addrs(mon_addrs);
1010 }
1011 }
1012 #endif // WITH_SEASTAR