]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/LogMonitor.cc
update sources to 12.2.8
[ceph.git] / ceph / src / mon / LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/algorithm/string/predicate.hpp>
16
17 #include <sstream>
18 #include <syslog.h>
19
20 #include "LogMonitor.h"
21 #include "Monitor.h"
22 #include "MonitorDBStore.h"
23
24 #include "messages/MMonCommand.h"
25 #include "messages/MLog.h"
26 #include "messages/MLogAck.h"
27 #include "common/Graylog.h"
28 #include "common/errno.h"
29 #include "common/strtol.h"
30 #include "include/assert.h"
31 #include "include/str_list.h"
32 #include "include/str_map.h"
33 #include "include/compat.h"
34
35 #define dout_subsys ceph_subsys_mon
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout, mon, get_last_committed())
38 static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) {
39 return *_dout << "mon." << mon->name << "@" << mon->rank
40 << "(" << mon->get_state_name()
41 << ").log v" << v << " ";
42 }
43
44 ostream& operator<<(ostream &out, const LogMonitor &pm)
45 {
46 return out << "log";
47 }
48
49 /*
50 Tick function to update the map based on performance every N seconds
51 */
52
53 void LogMonitor::tick()
54 {
55 if (!is_active()) return;
56
57 dout(10) << *this << dendl;
58
59 }
60
61 void LogMonitor::create_initial()
62 {
63 dout(10) << "create_initial -- creating initial map" << dendl;
64 LogEntry e;
65 e.name = g_conf->name;
66 e.stamp = ceph_clock_now();
67 e.prio = CLOG_INFO;
68 std::stringstream ss;
69 ss << "mkfs " << mon->monmap->get_fsid();
70 e.msg = ss.str();
71 e.seq = 0;
72 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
73 }
74
75 void LogMonitor::update_from_paxos(bool *need_bootstrap)
76 {
77 dout(10) << __func__ << dendl;
78 version_t version = get_last_committed();
79 dout(10) << __func__ << " version " << version
80 << " summary v " << summary.version << dendl;
81 if (version == summary.version)
82 return;
83 assert(version >= summary.version);
84
85 map<string,bufferlist> channel_blog;
86
87 version_t latest_full = get_version_latest_full();
88 dout(10) << __func__ << " latest full " << latest_full << dendl;
89 if ((latest_full > 0) && (latest_full > summary.version)) {
90 bufferlist latest_bl;
91 get_version_full(latest_full, latest_bl);
92 assert(latest_bl.length() != 0);
93 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
94 bufferlist::iterator p = latest_bl.begin();
95 ::decode(summary, p);
96 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
97 }
98
99 // walk through incrementals
100 while (version > summary.version) {
101 bufferlist bl;
102 int err = get_version(summary.version+1, bl);
103 assert(err == 0);
104 assert(bl.length());
105
106 bufferlist::iterator p = bl.begin();
107 __u8 v;
108 ::decode(v, p);
109 while (!p.end()) {
110 LogEntry le;
111 le.decode(p);
112 dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
113
114 string channel = le.channel;
115 if (channel.empty()) // keep retrocompatibility
116 channel = CLOG_CHANNEL_CLUSTER;
117
118 if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) {
119 cerr << channel << " " << le << std::endl;
120 }
121
122 if (channels.do_log_to_syslog(channel)) {
123 string level = channels.get_level(channel);
124 string facility = channels.get_facility(channel);
125 if (level.empty() || facility.empty()) {
126 derr << __func__ << " unable to log to syslog -- level or facility"
127 << " not defined (level: " << level << ", facility: "
128 << facility << ")" << dendl;
129 continue;
130 }
131 le.log_to_syslog(channels.get_level(channel),
132 channels.get_facility(channel));
133 }
134
135 if (channels.do_log_to_graylog(channel)) {
136 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
137 if (graylog) {
138 graylog->log_log_entry(&le);
139 }
140 dout(7) << "graylog: " << channel << " " << graylog
141 << " host:" << channels.log_to_graylog_host << dendl;
142 }
143
144 string log_file = channels.get_log_file(channel);
145 dout(20) << __func__ << " logging for channel '" << channel
146 << "' to file '" << log_file << "'" << dendl;
147
148 if (!log_file.empty()) {
149 string log_file_level = channels.get_log_file_level(channel);
150 if (log_file_level.empty()) {
151 dout(1) << __func__ << " warning: log file level not defined for"
152 << " channel '" << channel << "' yet a log file is --"
153 << " will assume lowest level possible" << dendl;
154 }
155
156 int min = string_to_syslog_level(log_file_level);
157 int l = clog_type_to_syslog_level(le.prio);
158 if (l <= min) {
159 stringstream ss;
160 ss << le << "\n";
161 // init entry if DNE
162 bufferlist &blog = channel_blog[channel];
163 blog.append(ss.str());
164 }
165 }
166
167 summary.add(le);
168 }
169
170 summary.version++;
171 summary.prune(g_conf->mon_log_max_summary);
172 }
173
174 dout(15) << __func__ << " logging for "
175 << channel_blog.size() << " channels" << dendl;
176 for(map<string,bufferlist>::iterator p = channel_blog.begin();
177 p != channel_blog.end(); ++p) {
178 if (!p->second.length()) {
179 dout(15) << __func__ << " channel '" << p->first
180 << "': nothing to log" << dendl;
181 continue;
182 }
183
184 dout(15) << __func__ << " channel '" << p->first
185 << "' logging " << p->second.length() << " bytes" << dendl;
186 string log_file = channels.get_log_file(p->first);
187
188 int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600);
189 if (fd < 0) {
190 int err = -errno;
191 dout(1) << "unable to write to '" << log_file << "' for channel '"
192 << p->first << "': " << cpp_strerror(err) << dendl;
193 } else {
194 int err = p->second.write_fd(fd);
195 if (err < 0) {
196 dout(1) << "error writing to '" << log_file << "' for channel '"
197 << p->first << ": " << cpp_strerror(err) << dendl;
198 }
199 VOID_TEMP_FAILURE_RETRY(::close(fd));
200 }
201 }
202
203 check_subs();
204 }
205
206 void LogMonitor::create_pending()
207 {
208 pending_log.clear();
209 pending_summary = summary;
210 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
211 }
212
213 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
214 {
215 version_t version = get_last_committed() + 1;
216 bufferlist bl;
217 dout(10) << __func__ << " v" << version << dendl;
218 __u8 v = 1;
219 ::encode(v, bl);
220 multimap<utime_t,LogEntry>::iterator p;
221 for (p = pending_log.begin(); p != pending_log.end(); ++p)
222 p->second.encode(bl, mon->get_quorum_con_features());
223
224 put_version(t, version, bl);
225 put_last_committed(t, version);
226 }
227
228 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
229 {
230 dout(10) << __func__ << " log v " << summary.version << dendl;
231 assert(get_last_committed() == summary.version);
232
233 bufferlist summary_bl;
234 ::encode(summary, summary_bl, mon->get_quorum_con_features());
235
236 put_version_full(t, summary.version, summary_bl);
237 put_version_latest_full(t, summary.version);
238 }
239
240 version_t LogMonitor::get_trim_to()
241 {
242 if (!mon->is_leader())
243 return 0;
244
245 unsigned max = g_conf->mon_max_log_epochs;
246 version_t version = get_last_committed();
247 if (version > max)
248 return version - max;
249 return 0;
250 }
251
252 bool LogMonitor::preprocess_query(MonOpRequestRef op)
253 {
254 op->mark_logmon_event("preprocess_query");
255 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
256 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
257 switch (m->get_type()) {
258 case MSG_MON_COMMAND:
259 return preprocess_command(op);
260
261 case MSG_LOG:
262 return preprocess_log(op);
263
264 default:
265 ceph_abort();
266 return true;
267 }
268 }
269
270 bool LogMonitor::prepare_update(MonOpRequestRef op)
271 {
272 op->mark_logmon_event("prepare_update");
273 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
274 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
275 switch (m->get_type()) {
276 case MSG_MON_COMMAND:
277 return prepare_command(op);
278 case MSG_LOG:
279 return prepare_log(op);
280 default:
281 ceph_abort();
282 return false;
283 }
284 }
285
286 bool LogMonitor::preprocess_log(MonOpRequestRef op)
287 {
288 op->mark_logmon_event("preprocess_log");
289 MLog *m = static_cast<MLog*>(op->get_req());
290 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
291 int num_new = 0;
292
293 MonSession *session = m->get_session();
294 if (!session)
295 goto done;
296 if (!session->is_capable("log", MON_CAP_W)) {
297 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
298 << session->caps << dendl;
299 goto done;
300 }
301
302 for (deque<LogEntry>::iterator p = m->entries.begin();
303 p != m->entries.end();
304 ++p) {
305 if (!pending_summary.contains(p->key()))
306 num_new++;
307 }
308 if (!num_new) {
309 dout(10) << " nothing new" << dendl;
310 goto done;
311 }
312
313 return false;
314
315 done:
316 mon->no_reply(op);
317 return true;
318 }
319
320 struct LogMonitor::C_Log : public C_MonOp {
321 LogMonitor *logmon;
322 C_Log(LogMonitor *p, MonOpRequestRef o) :
323 C_MonOp(o), logmon(p) {}
324 void _finish(int r) override {
325 if (r == -ECANCELED) {
326 return;
327 }
328 logmon->_updated_log(op);
329 }
330 };
331
332 bool LogMonitor::prepare_log(MonOpRequestRef op)
333 {
334 op->mark_logmon_event("prepare_log");
335 MLog *m = static_cast<MLog*>(op->get_req());
336 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
337
338 if (m->fsid != mon->monmap->fsid) {
339 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid
340 << dendl;
341 return false;
342 }
343
344 for (deque<LogEntry>::iterator p = m->entries.begin();
345 p != m->entries.end();
346 ++p) {
347 dout(10) << " logging " << *p << dendl;
348 if (!pending_summary.contains(p->key())) {
349 pending_summary.add(*p);
350 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
351 }
352 }
353 pending_summary.prune(g_conf->mon_log_max_summary);
354 wait_for_finished_proposal(op, new C_Log(this, op));
355 return true;
356 }
357
358 void LogMonitor::_updated_log(MonOpRequestRef op)
359 {
360 MLog *m = static_cast<MLog*>(op->get_req());
361 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
362 mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
363 }
364
365 bool LogMonitor::should_propose(double& delay)
366 {
367 // commit now if we have a lot of pending events
368 if (g_conf->mon_max_log_entries_per_event > 0 &&
369 pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
370 return true;
371
372 // otherwise fall back to generic policy
373 return PaxosService::should_propose(delay);
374 }
375
376
377 bool LogMonitor::preprocess_command(MonOpRequestRef op)
378 {
379 op->mark_logmon_event("preprocess_command");
380 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
381 int r = -EINVAL;
382 bufferlist rdata;
383 stringstream ss;
384
385 map<string, cmd_vartype> cmdmap;
386 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
387 string rs = ss.str();
388 mon->reply_command(op, -EINVAL, rs, get_last_committed());
389 return true;
390 }
391 MonSession *session = m->get_session();
392 if (!session) {
393 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
394 return true;
395 }
396
397 string prefix;
398 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
399
400 string format;
401 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
402 boost::scoped_ptr<Formatter> f(Formatter::create(format));
403
404 if (prefix == "log last") {
405 int64_t num = 20;
406 cmd_getval(g_ceph_context, cmdmap, "num", num);
407 if (f) {
408 f->open_array_section("tail");
409 }
410
411 std::string level_str;
412 clog_type level;
413 if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
414 level = LogEntry::str_to_level(level_str);
415 if (level == CLOG_UNKNOWN) {
416 ss << "Invalid severity '" << level_str << "'";
417 mon->reply_command(op, -EINVAL, ss.str(), get_last_committed());
418 return true;
419 }
420 } else {
421 level = CLOG_INFO;
422 }
423
424 std::string channel;
425 if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
426 channel = CLOG_CHANNEL_DEFAULT;
427 }
428
429 // We'll apply this twice, once while counting out lines
430 // and once while outputting them.
431 auto match = [level, channel](const LogEntry &entry) {
432 return entry.prio >= level && (entry.channel == channel || channel == "*");
433 };
434
435 auto rp = summary.tail.rbegin();
436 for (; num > 0 && rp != summary.tail.rend(); ++rp) {
437 if (match(*rp)) {
438 num--;
439 }
440 }
441 if (rp == summary.tail.rend()) {
442 --rp;
443 }
444 ostringstream ss;
445 for (; rp != summary.tail.rbegin(); --rp) {
446 if (!match(*rp)) {
447 continue;
448 }
449
450 if (f) {
451 f->dump_object("entry", *rp);
452 } else {
453 ss << *rp << "\n";
454 }
455 }
456 if (f) {
457 f->close_section();
458 f->flush(rdata);
459 } else {
460 rdata.append(ss.str());
461 }
462 r = 0;
463 } else {
464 return false;
465 }
466
467 string rs;
468 getline(ss, rs);
469 mon->reply_command(op, r, rs, rdata, get_last_committed());
470 return true;
471 }
472
473
474 bool LogMonitor::prepare_command(MonOpRequestRef op)
475 {
476 op->mark_logmon_event("prepare_command");
477 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
478 stringstream ss;
479 string rs;
480 int err = -EINVAL;
481
482 map<string, cmd_vartype> cmdmap;
483 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
484 // ss has reason for failure
485 string rs = ss.str();
486 mon->reply_command(op, -EINVAL, rs, get_last_committed());
487 return true;
488 }
489
490 string prefix;
491 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
492
493 MonSession *session = m->get_session();
494 if (!session) {
495 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
496 return true;
497 }
498
499 if (prefix == "log") {
500 vector<string> logtext;
501 cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
502 LogEntry le;
503 le.who = m->get_orig_source_inst();
504 le.name = session->entity_name;
505 le.stamp = m->get_recv_stamp();
506 le.seq = 0;
507 le.prio = CLOG_INFO;
508 le.channel = CLOG_CHANNEL_DEFAULT;
509 le.msg = str_join(logtext, " ");
510 pending_summary.add(le);
511 pending_summary.prune(g_conf->mon_log_max_summary);
512 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
513 wait_for_finished_proposal(op, new Monitor::C_Command(
514 mon, op, 0, string(), get_last_committed() + 1));
515 return true;
516 }
517
518 getline(ss, rs);
519 mon->reply_command(op, err, rs, get_last_committed());
520 return false;
521 }
522
523
524 int LogMonitor::sub_name_to_id(const string& n)
525 {
526 if (n.substr(0, 4) == "log-" && n.size() > 4) {
527 return LogEntry::str_to_level(n.substr(4));
528 } else {
529 return CLOG_UNKNOWN;
530 }
531 }
532
533 void LogMonitor::check_subs()
534 {
535 dout(10) << __func__ << dendl;
536 for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin();
537 i != mon->session_map.subs.end();
538 ++i) {
539 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
540 if (sub_name_to_id((*j)->type) >= 0)
541 check_sub(*j);
542 }
543 }
544 }
545
546 void LogMonitor::check_sub(Subscription *s)
547 {
548 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
549
550 int sub_level = sub_name_to_id(s->type);
551 assert(sub_level >= 0);
552
553 version_t summary_version = summary.version;
554 if (s->next > summary_version) {
555 dout(10) << __func__ << " client " << s->session->inst
556 << " requested version (" << s->next << ") is greater than ours ("
557 << summary_version << "), which means we already sent him"
558 << " everything we have." << dendl;
559 return;
560 }
561
562 MLog *mlog = new MLog(mon->monmap->fsid);
563
564 if (s->next == 0) {
565 /* First timer, heh? */
566 bool ret = _create_sub_summary(mlog, sub_level);
567 if (!ret) {
568 dout(1) << __func__ << " ret = " << ret << dendl;
569 mlog->put();
570 return;
571 }
572 } else {
573 /* let us send you an incremental log... */
574 _create_sub_incremental(mlog, sub_level, s->next);
575 }
576
577 dout(1) << __func__ << " sending message to " << s->session->inst
578 << " with " << mlog->entries.size() << " entries"
579 << " (version " << mlog->version << ")" << dendl;
580
581 if (!mlog->entries.empty()) {
582 s->session->con->send_message(mlog);
583 } else {
584 mlog->put();
585 }
586 if (s->onetime)
587 mon->session_map.remove_sub(s);
588 else
589 s->next = summary_version+1;
590 }
591
592 /**
593 * Create a log message containing only the last message in the summary.
594 *
595 * @param mlog Log message we'll send to the client.
596 * @param level Maximum log level the client is interested in.
597 * @return 'true' if we consider we successfully populated @mlog;
598 * 'false' otherwise.
599 */
600 bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
601 {
602 dout(10) << __func__ << dendl;
603
604 assert(mlog != NULL);
605
606 if (!summary.tail.size())
607 return false;
608
609 list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
610 for (; it != summary.tail.rend(); ++it) {
611 LogEntry e = *it;
612 if (e.prio < level)
613 continue;
614
615 mlog->entries.push_back(e);
616 mlog->version = summary.version;
617 break;
618 }
619
620 return true;
621 }
622
623 /**
624 * Create an incremental log message from version \p sv to \p summary.version
625 *
626 * @param mlog Log message we'll send to the client with the messages received
627 * since version \p sv, inclusive.
628 * @param level The max log level of the messages the client is interested in.
629 * @param sv The version the client is looking for.
630 */
631 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
632 {
633 dout(10) << __func__ << " level " << level << " ver " << sv
634 << " cur summary ver " << summary.version << dendl;
635
636 if (sv < get_first_committed()) {
637 dout(10) << __func__ << " skipped from " << sv
638 << " to first_committed " << get_first_committed() << dendl;
639 LogEntry le;
640 le.stamp = ceph_clock_now();
641 le.prio = CLOG_WARN;
642 ostringstream ss;
643 ss << "skipped log messages from " << sv << " to " << get_first_committed();
644 le.msg = ss.str();
645 mlog->entries.push_back(le);
646 sv = get_first_committed();
647 }
648
649 version_t summary_ver = summary.version;
650 while (sv && sv <= summary_ver) {
651 bufferlist bl;
652 int err = get_version(sv, bl);
653 assert(err == 0);
654 assert(bl.length());
655 bufferlist::iterator p = bl.begin();
656 __u8 v;
657 ::decode(v,p);
658 while (!p.end()) {
659 LogEntry le;
660 le.decode(p);
661
662 if (le.prio < level) {
663 dout(20) << __func__ << " requested " << level
664 << " entry " << le.prio << dendl;
665 continue;
666 }
667
668 mlog->entries.push_back(le);
669 }
670 mlog->version = sv++;
671 }
672
673 dout(10) << __func__ << " incremental message ready ("
674 << mlog->entries.size() << " entries)" << dendl;
675 }
676
677 void LogMonitor::update_log_channels()
678 {
679 ostringstream oss;
680
681 channels.clear();
682
683 int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
684 oss, &channels.log_to_syslog,
685 CLOG_CONFIG_DEFAULT_KEY);
686 if (r < 0) {
687 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
688 return;
689 }
690
691 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
692 oss, &channels.syslog_level,
693 CLOG_CONFIG_DEFAULT_KEY);
694 if (r < 0) {
695 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
696 << dendl;
697 return;
698 }
699
700 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
701 oss, &channels.syslog_facility,
702 CLOG_CONFIG_DEFAULT_KEY);
703 if (r < 0) {
704 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
705 << dendl;
706 return;
707 }
708
709 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
710 &channels.log_file,
711 CLOG_CONFIG_DEFAULT_KEY);
712 if (r < 0) {
713 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
714 return;
715 }
716
717 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
718 &channels.log_file_level,
719 CLOG_CONFIG_DEFAULT_KEY);
720 if (r < 0) {
721 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
722 << dendl;
723 return;
724 }
725
726 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
727 &channels.log_to_graylog,
728 CLOG_CONFIG_DEFAULT_KEY);
729 if (r < 0) {
730 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
731 << dendl;
732 return;
733 }
734
735 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
736 &channels.log_to_graylog_host,
737 CLOG_CONFIG_DEFAULT_KEY);
738 if (r < 0) {
739 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
740 << dendl;
741 return;
742 }
743
744 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
745 &channels.log_to_graylog_port,
746 CLOG_CONFIG_DEFAULT_KEY);
747 if (r < 0) {
748 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
749 << dendl;
750 return;
751 }
752
753 channels.expand_channel_meta();
754 }
755
756 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
757 {
758 generic_dout(20) << __func__ << " expand map: " << m << dendl;
759 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
760 m[p->first] = expand_channel_meta(p->second, p->first);
761 }
762 generic_dout(20) << __func__ << " expanded map: " << m << dendl;
763 }
764
765 string LogMonitor::log_channel_info::expand_channel_meta(
766 const string &input,
767 const string &change_to)
768 {
769 size_t pos = string::npos;
770 string s(input);
771 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
772 string tmp = s.substr(0, pos) + change_to;
773 if (pos+LOG_META_CHANNEL.length() < s.length())
774 tmp += s.substr(pos+LOG_META_CHANNEL.length());
775 s = tmp;
776 }
777 generic_dout(20) << __func__ << " from '" << input
778 << "' to '" << s << "'" << dendl;
779
780 return s;
781 }
782
783 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
784 string v = get_str_map_key(log_to_syslog, channel,
785 &CLOG_CONFIG_DEFAULT_KEY);
786 // We expect booleans, but they are in k/v pairs, kept
787 // as strings, in 'log_to_syslog'. We must ensure
788 // compatibility with existing boolean handling, and so
789 // we are here using a modified version of how
790 // md_config_t::set_val_raw() handles booleans. We will
791 // accept both 'true' and 'false', but will also check for
792 // '1' and '0'. The main distiction between this and the
793 // original code is that we will assume everything not '1',
794 // '0', 'true' or 'false' to be 'false'.
795 bool ret = false;
796
797 if (boost::iequals(v, "false")) {
798 ret = false;
799 } else if (boost::iequals(v, "true")) {
800 ret = true;
801 } else {
802 std::string err;
803 int b = strict_strtol(v.c_str(), 10, &err);
804 ret = (err.empty() && b == 1);
805 }
806
807 return ret;
808 }
809
810 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
811 const string &channel)
812 {
813 generic_dout(25) << __func__ << " for channel '"
814 << channel << "'" << dendl;
815
816 if (graylogs.count(channel) == 0) {
817 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
818
819 graylog->set_fsid(g_conf->get_val<uuid_d>("fsid"));
820 graylog->set_hostname(g_conf->host);
821 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
822 &CLOG_CONFIG_DEFAULT_KEY),
823 atoi(get_str_map_key(log_to_graylog_port, channel,
824 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
825
826 graylogs[channel] = graylog;
827 generic_dout(20) << __func__ << " for channel '"
828 << channel << "' to graylog host '"
829 << log_to_graylog_host[channel] << ":"
830 << log_to_graylog_port[channel]
831 << "'" << dendl;
832 }
833 return graylogs[channel];
834 }
835
836 void LogMonitor::handle_conf_change(const struct md_config_t *conf,
837 const std::set<std::string> &changed)
838 {
839 if (changed.count("mon_cluster_log_to_syslog") ||
840 changed.count("mon_cluster_log_to_syslog_level") ||
841 changed.count("mon_cluster_log_to_syslog_facility") ||
842 changed.count("mon_cluster_log_file") ||
843 changed.count("mon_cluster_log_file_level") ||
844 changed.count("mon_cluster_log_to_graylog") ||
845 changed.count("mon_cluster_log_to_graylog_host") ||
846 changed.count("mon_cluster_log_to_graylog_port")) {
847 update_log_channels();
848 }
849 }