]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/LogMonitor.cc
update sources to v12.1.1
[ceph.git] / ceph / src / mon / LogMonitor.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <boost/algorithm/string/predicate.hpp>
16
17#include <sstream>
18#include <syslog.h>
19
20#include "LogMonitor.h"
21#include "Monitor.h"
22#include "MonitorDBStore.h"
23
24#include "messages/MMonCommand.h"
25#include "messages/MLog.h"
26#include "messages/MLogAck.h"
27#include "common/Graylog.h"
28#include "common/errno.h"
29#include "common/strtol.h"
30#include "include/assert.h"
31#include "include/str_list.h"
32#include "include/str_map.h"
33#include "include/compat.h"
34
35#define dout_subsys ceph_subsys_mon
36#undef dout_prefix
37#define dout_prefix _prefix(_dout, mon, get_last_committed())
38static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) {
39 return *_dout << "mon." << mon->name << "@" << mon->rank
40 << "(" << mon->get_state_name()
41 << ").log v" << v << " ";
42}
43
44ostream& operator<<(ostream &out, const LogMonitor &pm)
45{
46 return out << "log";
47}
48
49/*
50 Tick function to update the map based on performance every N seconds
51*/
52
53void LogMonitor::tick()
54{
55 if (!is_active()) return;
56
57 dout(10) << *this << dendl;
58
59}
60
61void LogMonitor::create_initial()
62{
63 dout(10) << "create_initial -- creating initial map" << dendl;
64 LogEntry e;
65 memset(&e.who, 0, sizeof(e.who));
31f18b77 66 e.name = g_conf->name;
7c673cae
FG
67 e.stamp = ceph_clock_now();
68 e.prio = CLOG_INFO;
69 std::stringstream ss;
70 ss << "mkfs " << mon->monmap->get_fsid();
71 e.msg = ss.str();
72 e.seq = 0;
73 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
74}
75
76void LogMonitor::update_from_paxos(bool *need_bootstrap)
77{
78 dout(10) << __func__ << dendl;
79 version_t version = get_last_committed();
80 dout(10) << __func__ << " version " << version
81 << " summary v " << summary.version << dendl;
82 if (version == summary.version)
83 return;
84 assert(version >= summary.version);
85
86 map<string,bufferlist> channel_blog;
87
88 version_t latest_full = get_version_latest_full();
89 dout(10) << __func__ << " latest full " << latest_full << dendl;
90 if ((latest_full > 0) && (latest_full > summary.version)) {
91 bufferlist latest_bl;
92 get_version_full(latest_full, latest_bl);
93 assert(latest_bl.length() != 0);
94 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
95 bufferlist::iterator p = latest_bl.begin();
96 ::decode(summary, p);
97 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
98 }
99
100 // walk through incrementals
101 while (version > summary.version) {
102 bufferlist bl;
103 int err = get_version(summary.version+1, bl);
104 assert(err == 0);
105 assert(bl.length());
106
107 bufferlist::iterator p = bl.begin();
108 __u8 v;
109 ::decode(v, p);
110 while (!p.end()) {
111 LogEntry le;
112 le.decode(p);
113 dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
114
115 string channel = le.channel;
116 if (channel.empty()) // keep retrocompatibility
117 channel = CLOG_CHANNEL_CLUSTER;
118
119 if (channels.do_log_to_syslog(channel)) {
120 string level = channels.get_level(channel);
121 string facility = channels.get_facility(channel);
122 if (level.empty() || facility.empty()) {
123 derr << __func__ << " unable to log to syslog -- level or facility"
124 << " not defined (level: " << level << ", facility: "
125 << facility << ")" << dendl;
126 continue;
127 }
128 le.log_to_syslog(channels.get_level(channel),
129 channels.get_facility(channel));
130 }
131
132 if (channels.do_log_to_graylog(channel)) {
133 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
134 if (graylog) {
135 graylog->log_log_entry(&le);
136 }
137 dout(7) << "graylog: " << channel << " " << graylog
138 << " host:" << channels.log_to_graylog_host << dendl;
139 }
140
141 string log_file = channels.get_log_file(channel);
142 dout(20) << __func__ << " logging for channel '" << channel
143 << "' to file '" << log_file << "'" << dendl;
144
145 if (!log_file.empty()) {
146 string log_file_level = channels.get_log_file_level(channel);
147 if (log_file_level.empty()) {
148 dout(1) << __func__ << " warning: log file level not defined for"
149 << " channel '" << channel << "' yet a log file is --"
150 << " will assume lowest level possible" << dendl;
151 }
152
153 int min = string_to_syslog_level(log_file_level);
154 int l = clog_type_to_syslog_level(le.prio);
155 if (l <= min) {
156 stringstream ss;
157 ss << le << "\n";
158 // init entry if DNE
159 bufferlist &blog = channel_blog[channel];
160 blog.append(ss.str());
161 }
162 }
163
164 summary.add(le);
165 }
166
167 summary.version++;
31f18b77 168 summary.prune(g_conf->mon_log_max_summary);
7c673cae
FG
169 }
170
171 dout(15) << __func__ << " logging for "
172 << channel_blog.size() << " channels" << dendl;
173 for(map<string,bufferlist>::iterator p = channel_blog.begin();
174 p != channel_blog.end(); ++p) {
175 if (!p->second.length()) {
176 dout(15) << __func__ << " channel '" << p->first
177 << "': nothing to log" << dendl;
178 continue;
179 }
180
181 dout(15) << __func__ << " channel '" << p->first
182 << "' logging " << p->second.length() << " bytes" << dendl;
183 string log_file = channels.get_log_file(p->first);
184
185 int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600);
186 if (fd < 0) {
187 int err = -errno;
188 dout(1) << "unable to write to '" << log_file << "' for channel '"
189 << p->first << "': " << cpp_strerror(err) << dendl;
190 } else {
191 int err = p->second.write_fd(fd);
192 if (err < 0) {
193 dout(1) << "error writing to '" << log_file << "' for channel '"
194 << p->first << ": " << cpp_strerror(err) << dendl;
195 }
196 VOID_TEMP_FAILURE_RETRY(::close(fd));
197 }
198 }
199
200 check_subs();
201}
202
203void LogMonitor::create_pending()
204{
205 pending_log.clear();
206 pending_summary = summary;
207 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
208}
209
210void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
211{
212 version_t version = get_last_committed() + 1;
213 bufferlist bl;
214 dout(10) << __func__ << " v" << version << dendl;
215 __u8 v = 1;
216 ::encode(v, bl);
217 multimap<utime_t,LogEntry>::iterator p;
218 for (p = pending_log.begin(); p != pending_log.end(); ++p)
219 p->second.encode(bl, mon->get_quorum_con_features());
220
221 put_version(t, version, bl);
222 put_last_committed(t, version);
223}
224
225void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
226{
227 dout(10) << __func__ << " log v " << summary.version << dendl;
228 assert(get_last_committed() == summary.version);
229
230 bufferlist summary_bl;
231 ::encode(summary, summary_bl, mon->get_quorum_con_features());
232
233 put_version_full(t, summary.version, summary_bl);
234 put_version_latest_full(t, summary.version);
235}
236
237version_t LogMonitor::get_trim_to()
238{
239 if (!mon->is_leader())
240 return 0;
241
242 unsigned max = g_conf->mon_max_log_epochs;
243 version_t version = get_last_committed();
244 if (version > max)
245 return version - max;
246 return 0;
247}
248
249bool LogMonitor::preprocess_query(MonOpRequestRef op)
250{
251 op->mark_logmon_event("preprocess_query");
252 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
253 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
254 switch (m->get_type()) {
255 case MSG_MON_COMMAND:
256 return preprocess_command(op);
257
258 case MSG_LOG:
259 return preprocess_log(op);
260
261 default:
262 ceph_abort();
263 return true;
264 }
265}
266
267bool LogMonitor::prepare_update(MonOpRequestRef op)
268{
269 op->mark_logmon_event("prepare_update");
270 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
271 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
272 switch (m->get_type()) {
273 case MSG_MON_COMMAND:
274 return prepare_command(op);
275 case MSG_LOG:
276 return prepare_log(op);
277 default:
278 ceph_abort();
279 return false;
280 }
281}
282
283bool LogMonitor::preprocess_log(MonOpRequestRef op)
284{
285 op->mark_logmon_event("preprocess_log");
286 MLog *m = static_cast<MLog*>(op->get_req());
287 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
288 int num_new = 0;
289
290 MonSession *session = m->get_session();
291 if (!session)
292 goto done;
293 if (!session->is_capable("log", MON_CAP_W)) {
294 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
295 << session->caps << dendl;
296 goto done;
297 }
298
299 for (deque<LogEntry>::iterator p = m->entries.begin();
300 p != m->entries.end();
301 ++p) {
302 if (!pending_summary.contains(p->key()))
303 num_new++;
304 }
305 if (!num_new) {
306 dout(10) << " nothing new" << dendl;
307 goto done;
308 }
309
310 return false;
311
312 done:
313 return true;
314}
315
316struct LogMonitor::C_Log : public C_MonOp {
317 LogMonitor *logmon;
318 C_Log(LogMonitor *p, MonOpRequestRef o) :
319 C_MonOp(o), logmon(p) {}
320 void _finish(int r) override {
321 if (r == -ECANCELED) {
322 return;
323 }
324 logmon->_updated_log(op);
325 }
326};
327
328bool LogMonitor::prepare_log(MonOpRequestRef op)
329{
330 op->mark_logmon_event("prepare_log");
331 MLog *m = static_cast<MLog*>(op->get_req());
332 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
333
334 if (m->fsid != mon->monmap->fsid) {
335 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid
336 << dendl;
337 return false;
338 }
339
340 for (deque<LogEntry>::iterator p = m->entries.begin();
341 p != m->entries.end();
342 ++p) {
343 dout(10) << " logging " << *p << dendl;
344 if (!pending_summary.contains(p->key())) {
345 pending_summary.add(*p);
346 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
347 }
348 }
31f18b77 349 pending_summary.prune(g_conf->mon_log_max_summary);
7c673cae
FG
350 wait_for_finished_proposal(op, new C_Log(this, op));
351 return true;
352}
353
354void LogMonitor::_updated_log(MonOpRequestRef op)
355{
356 MLog *m = static_cast<MLog*>(op->get_req());
357 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
358 mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
359}
360
361bool LogMonitor::should_propose(double& delay)
362{
363 // commit now if we have a lot of pending events
364 if (g_conf->mon_max_log_entries_per_event > 0 &&
365 pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
366 return true;
367
368 // otherwise fall back to generic policy
369 return PaxosService::should_propose(delay);
370}
371
372
373bool LogMonitor::preprocess_command(MonOpRequestRef op)
374{
375 op->mark_logmon_event("preprocess_command");
31f18b77
FG
376 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
377 int r = -EINVAL;
7c673cae
FG
378 bufferlist rdata;
379 stringstream ss;
380
31f18b77
FG
381 map<string, cmd_vartype> cmdmap;
382 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
383 string rs = ss.str();
384 mon->reply_command(op, -EINVAL, rs, get_last_committed());
385 return true;
386 }
387 MonSession *session = m->get_session();
388 if (!session) {
389 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
7c673cae 390 return true;
31f18b77
FG
391 }
392
393 string prefix;
394 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
395
396 string format;
397 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
398 boost::scoped_ptr<Formatter> f(Formatter::create(format));
399
400 if (prefix == "log last") {
401 int64_t num = 20;
402 cmd_getval(g_ceph_context, cmdmap, "num", num);
403 if (f) {
404 f->open_array_section("tail");
405 }
224ce89b
WB
406
407 std::string level_str;
408 clog_type level;
409 if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
410 level = LogEntry::str_to_level(level_str);
411 if (level == CLOG_UNKNOWN) {
412 ss << "Invalid severity '" << level_str << "'";
413 mon->reply_command(op, -EINVAL, ss.str(), get_last_committed());
414 return true;
415 }
416 } else {
417 level = CLOG_INFO;
418 }
419
420 std::string channel;
421 if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
422 channel = CLOG_CHANNEL_DEFAULT;
423 }
424
425 // We'll apply this twice, once while counting out lines
426 // and once while outputting them.
427 auto match = [level, channel](const LogEntry &entry) {
428 return entry.prio >= level && (entry.channel == channel || channel == "*");
429 };
430
31f18b77
FG
431 auto p = summary.tail.end();
432 while (num > 0 && p != summary.tail.begin()) {
224ce89b
WB
433 if (match(*p)) {
434 num--;
435 }
31f18b77
FG
436 --p;
437 }
438 ostringstream ss;
439 for ( ; p != summary.tail.end(); ++p) {
224ce89b
WB
440 if (!match(*p)) {
441 continue;
442 }
443
31f18b77
FG
444 if (f) {
445 f->dump_object("entry", *p);
446 } else {
447 ss << *p << "\n";
448 }
449 }
450 if (f) {
451 f->close_section();
452 f->flush(rdata);
453 } else {
454 rdata.append(ss.str());
455 }
456 r = 0;
457 } else {
7c673cae 458 return false;
31f18b77
FG
459 }
460
461 string rs;
462 getline(ss, rs);
463 mon->reply_command(op, r, rs, rdata, get_last_committed());
464 return true;
7c673cae
FG
465}
466
467
468bool LogMonitor::prepare_command(MonOpRequestRef op)
469{
470 op->mark_logmon_event("prepare_command");
471 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
472 stringstream ss;
473 string rs;
474 int err = -EINVAL;
475
476 map<string, cmd_vartype> cmdmap;
477 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
478 // ss has reason for failure
479 string rs = ss.str();
480 mon->reply_command(op, -EINVAL, rs, get_last_committed());
481 return true;
482 }
483
484 string prefix;
485 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
486
487 MonSession *session = m->get_session();
488 if (!session) {
489 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
490 return true;
491 }
492
493 if (prefix == "log") {
494 vector<string> logtext;
495 cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
496 LogEntry le;
497 le.who = m->get_orig_source_inst();
31f18b77 498 le.name = session->entity_name;
7c673cae
FG
499 le.stamp = m->get_recv_stamp();
500 le.seq = 0;
501 le.prio = CLOG_INFO;
224ce89b 502 le.channel = CLOG_CHANNEL_DEFAULT;
7c673cae
FG
503 le.msg = str_join(logtext, " ");
504 pending_summary.add(le);
31f18b77 505 pending_summary.prune(g_conf->mon_log_max_summary);
7c673cae
FG
506 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
507 wait_for_finished_proposal(op, new Monitor::C_Command(
508 mon, op, 0, string(), get_last_committed() + 1));
509 return true;
510 }
511
512 getline(ss, rs);
513 mon->reply_command(op, err, rs, get_last_committed());
514 return false;
515}
516
517
518int LogMonitor::sub_name_to_id(const string& n)
519{
224ce89b
WB
520 if (n.substr(0, 4) == "log-" && n.size() > 4) {
521 return LogEntry::str_to_level(n.substr(4));
522 } else {
523 return CLOG_UNKNOWN;
524 }
7c673cae
FG
525}
526
527void LogMonitor::check_subs()
528{
529 dout(10) << __func__ << dendl;
530 for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin();
531 i != mon->session_map.subs.end();
532 ++i) {
533 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
534 if (sub_name_to_id((*j)->type) >= 0)
535 check_sub(*j);
536 }
537 }
538}
539
540void LogMonitor::check_sub(Subscription *s)
541{
542 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
543
544 int sub_level = sub_name_to_id(s->type);
545 assert(sub_level >= 0);
546
547 version_t summary_version = summary.version;
548 if (s->next > summary_version) {
549 dout(10) << __func__ << " client " << s->session->inst
550 << " requested version (" << s->next << ") is greater than ours ("
551 << summary_version << "), which means we already sent him"
552 << " everything we have." << dendl;
553 return;
554 }
555
556 MLog *mlog = new MLog(mon->monmap->fsid);
557
558 if (s->next == 0) {
559 /* First timer, heh? */
560 bool ret = _create_sub_summary(mlog, sub_level);
561 if (!ret) {
562 dout(1) << __func__ << " ret = " << ret << dendl;
563 mlog->put();
564 return;
565 }
566 } else {
567 /* let us send you an incremental log... */
568 _create_sub_incremental(mlog, sub_level, s->next);
569 }
570
571 dout(1) << __func__ << " sending message to " << s->session->inst
572 << " with " << mlog->entries.size() << " entries"
573 << " (version " << mlog->version << ")" << dendl;
574
575 if (!mlog->entries.empty()) {
576 s->session->con->send_message(mlog);
577 } else {
578 mlog->put();
579 }
580 if (s->onetime)
581 mon->session_map.remove_sub(s);
582 else
583 s->next = summary_version+1;
584}
585
586/**
587 * Create a log message containing only the last message in the summary.
588 *
589 * @param mlog Log message we'll send to the client.
590 * @param level Maximum log level the client is interested in.
591 * @return 'true' if we consider we successfully populated @mlog;
592 * 'false' otherwise.
593 */
594bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
595{
596 dout(10) << __func__ << dendl;
597
598 assert(mlog != NULL);
599
600 if (!summary.tail.size())
601 return false;
602
603 list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
604 for (; it != summary.tail.rend(); ++it) {
605 LogEntry e = *it;
606 if (e.prio < level)
607 continue;
608
609 mlog->entries.push_back(e);
610 mlog->version = summary.version;
611 break;
612 }
613
614 return true;
615}
616
617/**
618 * Create an incremental log message from version \p sv to \p summary.version
619 *
620 * @param mlog Log message we'll send to the client with the messages received
621 * since version \p sv, inclusive.
622 * @param level The max log level of the messages the client is interested in.
623 * @param sv The version the client is looking for.
624 */
625void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
626{
627 dout(10) << __func__ << " level " << level << " ver " << sv
628 << " cur summary ver " << summary.version << dendl;
629
630 if (sv < get_first_committed()) {
631 dout(10) << __func__ << " skipped from " << sv
632 << " to first_committed " << get_first_committed() << dendl;
633 LogEntry le;
634 le.stamp = ceph_clock_now();
635 le.prio = CLOG_WARN;
636 ostringstream ss;
637 ss << "skipped log messages from " << sv << " to " << get_first_committed();
638 le.msg = ss.str();
639 mlog->entries.push_back(le);
640 sv = get_first_committed();
641 }
642
643 version_t summary_ver = summary.version;
644 while (sv <= summary_ver) {
645 bufferlist bl;
646 int err = get_version(sv, bl);
647 assert(err == 0);
648 assert(bl.length());
649 bufferlist::iterator p = bl.begin();
650 __u8 v;
651 ::decode(v,p);
652 while (!p.end()) {
653 LogEntry le;
654 le.decode(p);
655
656 if (le.prio < level) {
657 dout(20) << __func__ << " requested " << level
658 << " entry " << le.prio << dendl;
659 continue;
660 }
661
662 mlog->entries.push_back(le);
663 }
664 mlog->version = sv++;
665 }
666
667 dout(10) << __func__ << " incremental message ready ("
668 << mlog->entries.size() << " entries)" << dendl;
669}
670
671void LogMonitor::update_log_channels()
672{
673 ostringstream oss;
674
675 channels.clear();
676
677 int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
678 oss, &channels.log_to_syslog,
679 CLOG_CONFIG_DEFAULT_KEY);
680 if (r < 0) {
681 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
682 return;
683 }
684
685 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
686 oss, &channels.syslog_level,
687 CLOG_CONFIG_DEFAULT_KEY);
688 if (r < 0) {
689 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
690 << dendl;
691 return;
692 }
693
694 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
695 oss, &channels.syslog_facility,
696 CLOG_CONFIG_DEFAULT_KEY);
697 if (r < 0) {
698 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
699 << dendl;
700 return;
701 }
702
703 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
704 &channels.log_file,
705 CLOG_CONFIG_DEFAULT_KEY);
706 if (r < 0) {
707 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
708 return;
709 }
710
711 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
712 &channels.log_file_level,
713 CLOG_CONFIG_DEFAULT_KEY);
714 if (r < 0) {
715 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
716 << dendl;
717 return;
718 }
719
720 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
721 &channels.log_to_graylog,
722 CLOG_CONFIG_DEFAULT_KEY);
723 if (r < 0) {
724 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
725 << dendl;
726 return;
727 }
728
729 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
730 &channels.log_to_graylog_host,
731 CLOG_CONFIG_DEFAULT_KEY);
732 if (r < 0) {
733 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
734 << dendl;
735 return;
736 }
737
738 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
739 &channels.log_to_graylog_port,
740 CLOG_CONFIG_DEFAULT_KEY);
741 if (r < 0) {
742 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
743 << dendl;
744 return;
745 }
746
747 channels.expand_channel_meta();
748}
749
750void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
751{
752 generic_dout(20) << __func__ << " expand map: " << m << dendl;
753 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
754 m[p->first] = expand_channel_meta(p->second, p->first);
755 }
756 generic_dout(20) << __func__ << " expanded map: " << m << dendl;
757}
758
759string LogMonitor::log_channel_info::expand_channel_meta(
760 const string &input,
761 const string &change_to)
762{
763 size_t pos = string::npos;
764 string s(input);
765 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
766 string tmp = s.substr(0, pos) + change_to;
767 if (pos+LOG_META_CHANNEL.length() < s.length())
768 tmp += s.substr(pos+LOG_META_CHANNEL.length());
769 s = tmp;
770 }
771 generic_dout(20) << __func__ << " from '" << input
772 << "' to '" << s << "'" << dendl;
773
774 return s;
775}
776
777bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
778 string v = get_str_map_key(log_to_syslog, channel,
779 &CLOG_CONFIG_DEFAULT_KEY);
780 // We expect booleans, but they are in k/v pairs, kept
781 // as strings, in 'log_to_syslog'. We must ensure
782 // compatibility with existing boolean handling, and so
783 // we are here using a modified version of how
784 // md_config_t::set_val_raw() handles booleans. We will
785 // accept both 'true' and 'false', but will also check for
786 // '1' and '0'. The main distiction between this and the
787 // original code is that we will assume everything not '1',
788 // '0', 'true' or 'false' to be 'false'.
789 bool ret = false;
790
791 if (boost::iequals(v, "false")) {
792 ret = false;
793 } else if (boost::iequals(v, "true")) {
794 ret = true;
795 } else {
796 std::string err;
797 int b = strict_strtol(v.c_str(), 10, &err);
798 ret = (err.empty() && b == 1);
799 }
800
801 return ret;
802}
803
804ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
805 const string &channel)
806{
807 generic_dout(25) << __func__ << " for channel '"
808 << channel << "'" << dendl;
809
810 if (graylogs.count(channel) == 0) {
811 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
812
813 graylog->set_fsid(g_conf->fsid);
814 graylog->set_hostname(g_conf->host);
815 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
816 &CLOG_CONFIG_DEFAULT_KEY),
817 atoi(get_str_map_key(log_to_graylog_port, channel,
818 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
819
820 graylogs[channel] = graylog;
821 generic_dout(20) << __func__ << " for channel '"
822 << channel << "' to graylog host '"
823 << log_to_graylog_host[channel] << ":"
824 << log_to_graylog_port[channel]
825 << "'" << dendl;
826 }
827 return graylogs[channel];
828}
829
830void LogMonitor::handle_conf_change(const struct md_config_t *conf,
831 const std::set<std::string> &changed)
832{
833 if (changed.count("mon_cluster_log_to_syslog") ||
834 changed.count("mon_cluster_log_to_syslog_level") ||
835 changed.count("mon_cluster_log_to_syslog_facility") ||
836 changed.count("mon_cluster_log_file") ||
837 changed.count("mon_cluster_log_file_level") ||
838 changed.count("mon_cluster_log_to_graylog") ||
839 changed.count("mon_cluster_log_to_graylog_host") ||
840 changed.count("mon_cluster_log_to_graylog_port")) {
841 update_log_channels();
842 }
843}