]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/LogMonitor.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / mon / LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/algorithm/string/predicate.hpp>
16
17 #include <sstream>
18 #include <syslog.h>
19
20 #include "LogMonitor.h"
21 #include "Monitor.h"
22 #include "MonitorDBStore.h"
23
24 #include "messages/MMonCommand.h"
25 #include "messages/MLog.h"
26 #include "messages/MLogAck.h"
27 #include "common/Graylog.h"
28 #include "common/errno.h"
29 #include "common/strtol.h"
30 #include "include/ceph_assert.h"
31 #include "include/str_list.h"
32 #include "include/str_map.h"
33 #include "include/compat.h"
34
35 #define dout_subsys ceph_subsys_mon
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout, mon, get_last_committed())
38 static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) {
39 return *_dout << "mon." << mon->name << "@" << mon->rank
40 << "(" << mon->get_state_name()
41 << ").log v" << v << " ";
42 }
43
44 ostream& operator<<(ostream &out, const LogMonitor &pm)
45 {
46 return out << "log";
47 }
48
49 /*
50 Tick function to update the map based on performance every N seconds
51 */
52
53 void LogMonitor::tick()
54 {
55 if (!is_active()) return;
56
57 dout(10) << *this << dendl;
58
59 }
60
61 void LogMonitor::create_initial()
62 {
63 dout(10) << "create_initial -- creating initial map" << dendl;
64 LogEntry e;
65 e.name = g_conf()->name;
66 e.rank = entity_name_t::MON(mon->rank);
67 e.addrs = mon->messenger->get_myaddrs();
68 e.stamp = ceph_clock_now();
69 e.prio = CLOG_INFO;
70 std::stringstream ss;
71 ss << "mkfs " << mon->monmap->get_fsid();
72 e.msg = ss.str();
73 e.seq = 0;
74 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
75 }
76
77 void LogMonitor::update_from_paxos(bool *need_bootstrap)
78 {
79 dout(10) << __func__ << dendl;
80 version_t version = get_last_committed();
81 dout(10) << __func__ << " version " << version
82 << " summary v " << summary.version << dendl;
83 if (version == summary.version)
84 return;
85 ceph_assert(version >= summary.version);
86
87 map<string,bufferlist> channel_blog;
88
89 version_t latest_full = get_version_latest_full();
90 dout(10) << __func__ << " latest full " << latest_full << dendl;
91 if ((latest_full > 0) && (latest_full > summary.version)) {
92 bufferlist latest_bl;
93 get_version_full(latest_full, latest_bl);
94 ceph_assert(latest_bl.length() != 0);
95 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
96 auto p = latest_bl.cbegin();
97 decode(summary, p);
98 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
99 }
100
101 // walk through incrementals
102 while (version > summary.version) {
103 bufferlist bl;
104 int err = get_version(summary.version+1, bl);
105 ceph_assert(err == 0);
106 ceph_assert(bl.length());
107
108 auto p = bl.cbegin();
109 __u8 v;
110 decode(v, p);
111 while (!p.end()) {
112 LogEntry le;
113 le.decode(p);
114 dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
115
116 string channel = le.channel;
117 if (channel.empty()) // keep retrocompatibility
118 channel = CLOG_CHANNEL_CLUSTER;
119
120 if (g_conf().get_val<bool>("mon_cluster_log_to_stderr")) {
121 cerr << channel << " " << le << std::endl;
122 }
123
124 if (channels.do_log_to_syslog(channel)) {
125 string level = channels.get_level(channel);
126 string facility = channels.get_facility(channel);
127 if (level.empty() || facility.empty()) {
128 derr << __func__ << " unable to log to syslog -- level or facility"
129 << " not defined (level: " << level << ", facility: "
130 << facility << ")" << dendl;
131 continue;
132 }
133 le.log_to_syslog(channels.get_level(channel),
134 channels.get_facility(channel));
135 }
136
137 if (channels.do_log_to_graylog(channel)) {
138 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
139 if (graylog) {
140 graylog->log_log_entry(&le);
141 }
142 dout(7) << "graylog: " << channel << " " << graylog
143 << " host:" << channels.log_to_graylog_host << dendl;
144 }
145
146 if (g_conf()->mon_cluster_log_to_file) {
147 string log_file = channels.get_log_file(channel);
148 dout(20) << __func__ << " logging for channel '" << channel
149 << "' to file '" << log_file << "'" << dendl;
150
151 if (!log_file.empty()) {
152 string log_file_level = channels.get_log_file_level(channel);
153 if (log_file_level.empty()) {
154 dout(1) << __func__ << " warning: log file level not defined for"
155 << " channel '" << channel << "' yet a log file is --"
156 << " will assume lowest level possible" << dendl;
157 }
158
159 int min = string_to_syslog_level(log_file_level);
160 int l = clog_type_to_syslog_level(le.prio);
161 if (l <= min) {
162 stringstream ss;
163 ss << le << "\n";
164 // init entry if DNE
165 bufferlist &blog = channel_blog[channel];
166 blog.append(ss.str());
167 }
168 }
169 }
170
171 summary.add(le);
172 }
173
174 summary.version++;
175 summary.prune(g_conf()->mon_log_max_summary);
176 }
177
178 dout(15) << __func__ << " logging for "
179 << channel_blog.size() << " channels" << dendl;
180 for(map<string,bufferlist>::iterator p = channel_blog.begin();
181 p != channel_blog.end(); ++p) {
182 if (!p->second.length()) {
183 dout(15) << __func__ << " channel '" << p->first
184 << "': nothing to log" << dendl;
185 continue;
186 }
187
188 dout(15) << __func__ << " channel '" << p->first
189 << "' logging " << p->second.length() << " bytes" << dendl;
190 string log_file = channels.get_log_file(p->first);
191
192 int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT|O_CLOEXEC, 0600);
193 if (fd < 0) {
194 int err = -errno;
195 dout(1) << "unable to write to '" << log_file << "' for channel '"
196 << p->first << "': " << cpp_strerror(err) << dendl;
197 } else {
198 int err = p->second.write_fd(fd);
199 if (err < 0) {
200 dout(1) << "error writing to '" << log_file << "' for channel '"
201 << p->first << ": " << cpp_strerror(err) << dendl;
202 }
203 VOID_TEMP_FAILURE_RETRY(::close(fd));
204 }
205 }
206
207 check_subs();
208 }
209
210 void LogMonitor::create_pending()
211 {
212 pending_log.clear();
213 pending_summary = summary;
214 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
215 }
216
217 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
218 {
219 version_t version = get_last_committed() + 1;
220 bufferlist bl;
221 dout(10) << __func__ << " v" << version << dendl;
222 __u8 v = 1;
223 encode(v, bl);
224 multimap<utime_t,LogEntry>::iterator p;
225 for (p = pending_log.begin(); p != pending_log.end(); ++p)
226 p->second.encode(bl, mon->get_quorum_con_features());
227
228 put_version(t, version, bl);
229 put_last_committed(t, version);
230 }
231
232 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
233 {
234 dout(10) << __func__ << " log v " << summary.version << dendl;
235 ceph_assert(get_last_committed() == summary.version);
236
237 bufferlist summary_bl;
238 encode(summary, summary_bl, mon->get_quorum_con_features());
239
240 put_version_full(t, summary.version, summary_bl);
241 put_version_latest_full(t, summary.version);
242 }
243
244 version_t LogMonitor::get_trim_to() const
245 {
246 if (!mon->is_leader())
247 return 0;
248
249 unsigned max = g_conf()->mon_max_log_epochs;
250 version_t version = get_last_committed();
251 if (version > max)
252 return version - max;
253 return 0;
254 }
255
256 bool LogMonitor::preprocess_query(MonOpRequestRef op)
257 {
258 op->mark_logmon_event("preprocess_query");
259 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
260 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
261 switch (m->get_type()) {
262 case MSG_MON_COMMAND:
263 try {
264 return preprocess_command(op);
265 } catch (const bad_cmd_get& e) {
266 bufferlist bl;
267 mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
268 return true;
269 }
270
271 case MSG_LOG:
272 return preprocess_log(op);
273
274 default:
275 ceph_abort();
276 return true;
277 }
278 }
279
280 bool LogMonitor::prepare_update(MonOpRequestRef op)
281 {
282 op->mark_logmon_event("prepare_update");
283 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
284 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
285 switch (m->get_type()) {
286 case MSG_MON_COMMAND:
287 try {
288 return prepare_command(op);
289 } catch (const bad_cmd_get& e) {
290 bufferlist bl;
291 mon->reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
292 return true;
293 }
294 case MSG_LOG:
295 return prepare_log(op);
296 default:
297 ceph_abort();
298 return false;
299 }
300 }
301
302 bool LogMonitor::preprocess_log(MonOpRequestRef op)
303 {
304 op->mark_logmon_event("preprocess_log");
305 MLog *m = static_cast<MLog*>(op->get_req());
306 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
307 int num_new = 0;
308
309 MonSession *session = op->get_session();
310 if (!session)
311 goto done;
312 if (!session->is_capable("log", MON_CAP_W)) {
313 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
314 << session->caps << dendl;
315 goto done;
316 }
317
318 for (deque<LogEntry>::iterator p = m->entries.begin();
319 p != m->entries.end();
320 ++p) {
321 if (!pending_summary.contains(p->key()))
322 num_new++;
323 }
324 if (!num_new) {
325 dout(10) << " nothing new" << dendl;
326 goto done;
327 }
328
329 return false;
330
331 done:
332 mon->no_reply(op);
333 return true;
334 }
335
336 struct LogMonitor::C_Log : public C_MonOp {
337 LogMonitor *logmon;
338 C_Log(LogMonitor *p, MonOpRequestRef o) :
339 C_MonOp(o), logmon(p) {}
340 void _finish(int r) override {
341 if (r == -ECANCELED) {
342 return;
343 }
344 logmon->_updated_log(op);
345 }
346 };
347
348 bool LogMonitor::prepare_log(MonOpRequestRef op)
349 {
350 op->mark_logmon_event("prepare_log");
351 MLog *m = static_cast<MLog*>(op->get_req());
352 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
353
354 if (m->fsid != mon->monmap->fsid) {
355 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid
356 << dendl;
357 return false;
358 }
359
360 for (deque<LogEntry>::iterator p = m->entries.begin();
361 p != m->entries.end();
362 ++p) {
363 dout(10) << " logging " << *p << dendl;
364 if (!pending_summary.contains(p->key())) {
365 pending_summary.add(*p);
366 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
367 }
368 }
369 pending_summary.prune(g_conf()->mon_log_max_summary);
370 wait_for_finished_proposal(op, new C_Log(this, op));
371 return true;
372 }
373
374 void LogMonitor::_updated_log(MonOpRequestRef op)
375 {
376 MLog *m = static_cast<MLog*>(op->get_req());
377 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
378 mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
379 }
380
381 bool LogMonitor::should_propose(double& delay)
382 {
383 // commit now if we have a lot of pending events
384 if (g_conf()->mon_max_log_entries_per_event > 0 &&
385 pending_log.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event)
386 return true;
387
388 // otherwise fall back to generic policy
389 return PaxosService::should_propose(delay);
390 }
391
392
393 bool LogMonitor::preprocess_command(MonOpRequestRef op)
394 {
395 op->mark_logmon_event("preprocess_command");
396 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
397 int r = -EINVAL;
398 bufferlist rdata;
399 stringstream ss;
400
401 cmdmap_t cmdmap;
402 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
403 string rs = ss.str();
404 mon->reply_command(op, -EINVAL, rs, get_last_committed());
405 return true;
406 }
407 MonSession *session = op->get_session();
408 if (!session) {
409 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
410 return true;
411 }
412
413 string prefix;
414 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
415
416 string format;
417 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
418 boost::scoped_ptr<Formatter> f(Formatter::create(format));
419
420 if (prefix == "log last") {
421 int64_t num = 20;
422 cmd_getval(g_ceph_context, cmdmap, "num", num);
423 if (f) {
424 f->open_array_section("tail");
425 }
426
427 std::string level_str;
428 clog_type level;
429 if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
430 level = LogEntry::str_to_level(level_str);
431 if (level == CLOG_UNKNOWN) {
432 ss << "Invalid severity '" << level_str << "'";
433 mon->reply_command(op, -EINVAL, ss.str(), get_last_committed());
434 return true;
435 }
436 } else {
437 level = CLOG_INFO;
438 }
439
440 std::string channel;
441 if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
442 channel = CLOG_CHANNEL_DEFAULT;
443 }
444
445 // We'll apply this twice, once while counting out lines
446 // and once while outputting them.
447 auto match = [level](const LogEntry &entry) {
448 return entry.prio >= level;
449 };
450
451 // Decrement operation that sets to container end when hitting rbegin
452 ostringstream ss;
453 if (channel == "*") {
454 list<LogEntry> full_tail;
455 summary.build_ordered_tail(&full_tail);
456 derr << "full " << full_tail << dendl;
457 auto rp = full_tail.rbegin();
458 for (; num > 0 && rp != full_tail.rend(); ++rp) {
459 if (match(*rp)) {
460 num--;
461 }
462 }
463 if (rp == full_tail.rend()) {
464 --rp;
465 }
466
467 // Decrement a reverse iterator such that going past rbegin()
468 // sets it to rend(). This is for writing a for() loop that
469 // goes up to (and including) rbegin()
470 auto dec = [&rp, &full_tail] () {
471 if (rp == full_tail.rbegin()) {
472 rp = full_tail.rend();
473 } else {
474 --rp;
475 }
476 };
477
478 // Move forward to the end of the container (decrement the reverse
479 // iterator).
480 for (; rp != full_tail.rend(); dec()) {
481 if (!match(*rp)) {
482 continue;
483 }
484 if (f) {
485 f->dump_object("entry", *rp);
486 } else {
487 ss << *rp << "\n";
488 }
489 }
490 } else {
491 auto p = summary.tail_by_channel.find(channel);
492 if (p != summary.tail_by_channel.end()) {
493 auto rp = p->second.rbegin();
494 for (; num > 0 && rp != p->second.rend(); ++rp) {
495 if (match(rp->second)) {
496 num--;
497 }
498 }
499 if (rp == p->second.rend()) {
500 --rp;
501 }
502
503 // Decrement a reverse iterator such that going past rbegin()
504 // sets it to rend(). This is for writing a for() loop that
505 // goes up to (and including) rbegin()
506 auto dec = [&rp, &p] () {
507 if (rp == p->second.rbegin()) {
508 rp = p->second.rend();
509 } else {
510 --rp;
511 }
512 };
513
514 // Move forward to the end of the container (decrement the reverse
515 // iterator).
516 for (; rp != p->second.rend(); dec()) {
517 if (!match(rp->second)) {
518 continue;
519 }
520 if (f) {
521 f->dump_object("entry", rp->second);
522 } else {
523 ss << rp->second << "\n";
524 }
525 }
526 }
527 }
528 if (f) {
529 f->close_section();
530 f->flush(rdata);
531 } else {
532 rdata.append(ss.str());
533 }
534 r = 0;
535 } else {
536 return false;
537 }
538
539 string rs;
540 getline(ss, rs);
541 mon->reply_command(op, r, rs, rdata, get_last_committed());
542 return true;
543 }
544
545
546 bool LogMonitor::prepare_command(MonOpRequestRef op)
547 {
548 op->mark_logmon_event("prepare_command");
549 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
550 stringstream ss;
551 string rs;
552 int err = -EINVAL;
553
554 cmdmap_t cmdmap;
555 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
556 // ss has reason for failure
557 string rs = ss.str();
558 mon->reply_command(op, -EINVAL, rs, get_last_committed());
559 return true;
560 }
561
562 string prefix;
563 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
564
565 MonSession *session = op->get_session();
566 if (!session) {
567 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
568 return true;
569 }
570
571 if (prefix == "log") {
572 vector<string> logtext;
573 cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
574 LogEntry le;
575 le.rank = m->get_orig_source();
576 le.addrs.v.push_back(m->get_orig_source_addr());
577 le.name = session->entity_name;
578 le.stamp = m->get_recv_stamp();
579 le.seq = 0;
580 le.prio = CLOG_INFO;
581 le.channel = CLOG_CHANNEL_DEFAULT;
582 le.msg = str_join(logtext, " ");
583 pending_summary.add(le);
584 pending_summary.prune(g_conf()->mon_log_max_summary);
585 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
586 wait_for_finished_proposal(op, new Monitor::C_Command(
587 mon, op, 0, string(), get_last_committed() + 1));
588 return true;
589 }
590
591 getline(ss, rs);
592 mon->reply_command(op, err, rs, get_last_committed());
593 return false;
594 }
595
596
597 int LogMonitor::sub_name_to_id(const string& n)
598 {
599 if (n.substr(0, 4) == "log-" && n.size() > 4) {
600 return LogEntry::str_to_level(n.substr(4));
601 } else {
602 return CLOG_UNKNOWN;
603 }
604 }
605
606 void LogMonitor::check_subs()
607 {
608 dout(10) << __func__ << dendl;
609 for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin();
610 i != mon->session_map.subs.end();
611 ++i) {
612 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
613 if (sub_name_to_id((*j)->type) >= 0)
614 check_sub(*j);
615 }
616 }
617 }
618
619 void LogMonitor::check_sub(Subscription *s)
620 {
621 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
622
623 int sub_level = sub_name_to_id(s->type);
624 ceph_assert(sub_level >= 0);
625
626 version_t summary_version = summary.version;
627 if (s->next > summary_version) {
628 dout(10) << __func__ << " client " << s->session->name
629 << " requested version (" << s->next << ") is greater than ours ("
630 << summary_version << "), which means we already sent him"
631 << " everything we have." << dendl;
632 return;
633 }
634
635 MLog *mlog = new MLog(mon->monmap->fsid);
636
637 if (s->next == 0) {
638 /* First timer, heh? */
639 _create_sub_incremental(mlog, sub_level, get_last_committed());
640 } else {
641 /* let us send you an incremental log... */
642 _create_sub_incremental(mlog, sub_level, s->next);
643 }
644
645 dout(10) << __func__ << " sending message to " << s->session->name
646 << " with " << mlog->entries.size() << " entries"
647 << " (version " << mlog->version << ")" << dendl;
648
649 if (!mlog->entries.empty()) {
650 s->session->con->send_message(mlog);
651 } else {
652 mlog->put();
653 }
654 if (s->onetime)
655 mon->session_map.remove_sub(s);
656 else
657 s->next = summary_version+1;
658 }
659
660 /**
661 * Create an incremental log message from version \p sv to \p summary.version
662 *
663 * @param mlog Log message we'll send to the client with the messages received
664 * since version \p sv, inclusive.
665 * @param level The max log level of the messages the client is interested in.
666 * @param sv The version the client is looking for.
667 */
668 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
669 {
670 dout(10) << __func__ << " level " << level << " ver " << sv
671 << " cur summary ver " << summary.version << dendl;
672
673 if (sv < get_first_committed()) {
674 dout(10) << __func__ << " skipped from " << sv
675 << " to first_committed " << get_first_committed() << dendl;
676 LogEntry le;
677 le.stamp = ceph_clock_now();
678 le.prio = CLOG_WARN;
679 ostringstream ss;
680 ss << "skipped log messages from " << sv << " to " << get_first_committed();
681 le.msg = ss.str();
682 mlog->entries.push_back(le);
683 sv = get_first_committed();
684 }
685
686 version_t summary_ver = summary.version;
687 while (sv && sv <= summary_ver) {
688 bufferlist bl;
689 int err = get_version(sv, bl);
690 ceph_assert(err == 0);
691 ceph_assert(bl.length());
692 auto p = bl.cbegin();
693 __u8 v;
694 decode(v,p);
695 while (!p.end()) {
696 LogEntry le;
697 le.decode(p);
698
699 if (le.prio < level) {
700 dout(20) << __func__ << " requested " << level
701 << " entry " << le.prio << dendl;
702 continue;
703 }
704
705 mlog->entries.push_back(le);
706 }
707 mlog->version = sv++;
708 }
709
710 dout(10) << __func__ << " incremental message ready ("
711 << mlog->entries.size() << " entries)" << dendl;
712 }
713
714 void LogMonitor::update_log_channels()
715 {
716 ostringstream oss;
717
718 channels.clear();
719
720 int r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog,
721 oss, &channels.log_to_syslog,
722 CLOG_CONFIG_DEFAULT_KEY);
723 if (r < 0) {
724 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
725 return;
726 }
727
728 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_level,
729 oss, &channels.syslog_level,
730 CLOG_CONFIG_DEFAULT_KEY);
731 if (r < 0) {
732 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
733 << dendl;
734 return;
735 }
736
737 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_syslog_facility,
738 oss, &channels.syslog_facility,
739 CLOG_CONFIG_DEFAULT_KEY);
740 if (r < 0) {
741 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
742 << dendl;
743 return;
744 }
745
746 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file, oss,
747 &channels.log_file,
748 CLOG_CONFIG_DEFAULT_KEY);
749 if (r < 0) {
750 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
751 return;
752 }
753
754 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_file_level, oss,
755 &channels.log_file_level,
756 CLOG_CONFIG_DEFAULT_KEY);
757 if (r < 0) {
758 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
759 << dendl;
760 return;
761 }
762
763 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog, oss,
764 &channels.log_to_graylog,
765 CLOG_CONFIG_DEFAULT_KEY);
766 if (r < 0) {
767 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
768 << dendl;
769 return;
770 }
771
772 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_host, oss,
773 &channels.log_to_graylog_host,
774 CLOG_CONFIG_DEFAULT_KEY);
775 if (r < 0) {
776 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
777 << dendl;
778 return;
779 }
780
781 r = get_conf_str_map_helper(g_conf()->mon_cluster_log_to_graylog_port, oss,
782 &channels.log_to_graylog_port,
783 CLOG_CONFIG_DEFAULT_KEY);
784 if (r < 0) {
785 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
786 << dendl;
787 return;
788 }
789
790 channels.expand_channel_meta();
791 }
792
793 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
794 {
795 generic_dout(20) << __func__ << " expand map: " << m << dendl;
796 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
797 m[p->first] = expand_channel_meta(p->second, p->first);
798 }
799 generic_dout(20) << __func__ << " expanded map: " << m << dendl;
800 }
801
802 string LogMonitor::log_channel_info::expand_channel_meta(
803 const string &input,
804 const string &change_to)
805 {
806 size_t pos = string::npos;
807 string s(input);
808 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
809 string tmp = s.substr(0, pos) + change_to;
810 if (pos+LOG_META_CHANNEL.length() < s.length())
811 tmp += s.substr(pos+LOG_META_CHANNEL.length());
812 s = tmp;
813 }
814 generic_dout(20) << __func__ << " from '" << input
815 << "' to '" << s << "'" << dendl;
816
817 return s;
818 }
819
820 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
821 string v = get_str_map_key(log_to_syslog, channel,
822 &CLOG_CONFIG_DEFAULT_KEY);
823 // We expect booleans, but they are in k/v pairs, kept
824 // as strings, in 'log_to_syslog'. We must ensure
825 // compatibility with existing boolean handling, and so
826 // we are here using a modified version of how
827 // md_config_t::set_val_raw() handles booleans. We will
828 // accept both 'true' and 'false', but will also check for
829 // '1' and '0'. The main distiction between this and the
830 // original code is that we will assume everything not '1',
831 // '0', 'true' or 'false' to be 'false'.
832 bool ret = false;
833
834 if (boost::iequals(v, "false")) {
835 ret = false;
836 } else if (boost::iequals(v, "true")) {
837 ret = true;
838 } else {
839 std::string err;
840 int b = strict_strtol(v.c_str(), 10, &err);
841 ret = (err.empty() && b == 1);
842 }
843
844 return ret;
845 }
846
847 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
848 const string &channel)
849 {
850 generic_dout(25) << __func__ << " for channel '"
851 << channel << "'" << dendl;
852
853 if (graylogs.count(channel) == 0) {
854 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
855
856 graylog->set_fsid(g_conf().get_val<uuid_d>("fsid"));
857 graylog->set_hostname(g_conf()->host);
858 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
859 &CLOG_CONFIG_DEFAULT_KEY),
860 atoi(get_str_map_key(log_to_graylog_port, channel,
861 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
862
863 graylogs[channel] = graylog;
864 generic_dout(20) << __func__ << " for channel '"
865 << channel << "' to graylog host '"
866 << log_to_graylog_host[channel] << ":"
867 << log_to_graylog_port[channel]
868 << "'" << dendl;
869 }
870 return graylogs[channel];
871 }
872
873 void LogMonitor::handle_conf_change(const ConfigProxy& conf,
874 const std::set<std::string> &changed)
875 {
876 if (changed.count("mon_cluster_log_to_syslog") ||
877 changed.count("mon_cluster_log_to_syslog_level") ||
878 changed.count("mon_cluster_log_to_syslog_facility") ||
879 changed.count("mon_cluster_log_file") ||
880 changed.count("mon_cluster_log_file_level") ||
881 changed.count("mon_cluster_log_to_graylog") ||
882 changed.count("mon_cluster_log_to_graylog_host") ||
883 changed.count("mon_cluster_log_to_graylog_port")) {
884 update_log_channels();
885 }
886 }