]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/LogMonitor.cc
update sources to 12.2.7
[ceph.git] / ceph / src / mon / LogMonitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <boost/algorithm/string/predicate.hpp>
16
17 #include <sstream>
18 #include <syslog.h>
19
20 #include "LogMonitor.h"
21 #include "Monitor.h"
22 #include "MonitorDBStore.h"
23
24 #include "messages/MMonCommand.h"
25 #include "messages/MLog.h"
26 #include "messages/MLogAck.h"
27 #include "common/Graylog.h"
28 #include "common/errno.h"
29 #include "common/strtol.h"
30 #include "include/assert.h"
31 #include "include/str_list.h"
32 #include "include/str_map.h"
33 #include "include/compat.h"
34
35 #define dout_subsys ceph_subsys_mon
36 #undef dout_prefix
37 #define dout_prefix _prefix(_dout, mon, get_last_committed())
38 static ostream& _prefix(std::ostream *_dout, Monitor *mon, version_t v) {
39 return *_dout << "mon." << mon->name << "@" << mon->rank
40 << "(" << mon->get_state_name()
41 << ").log v" << v << " ";
42 }
43
44 ostream& operator<<(ostream &out, const LogMonitor &pm)
45 {
46 return out << "log";
47 }
48
49 /*
50 Tick function to update the map based on performance every N seconds
51 */
52
53 void LogMonitor::tick()
54 {
55 if (!is_active()) return;
56
57 dout(10) << *this << dendl;
58
59 }
60
61 void LogMonitor::create_initial()
62 {
63 dout(10) << "create_initial -- creating initial map" << dendl;
64 LogEntry e;
65 memset(&e.who, 0, sizeof(e.who));
66 e.name = g_conf->name;
67 e.stamp = ceph_clock_now();
68 e.prio = CLOG_INFO;
69 std::stringstream ss;
70 ss << "mkfs " << mon->monmap->get_fsid();
71 e.msg = ss.str();
72 e.seq = 0;
73 pending_log.insert(pair<utime_t,LogEntry>(e.stamp, e));
74 }
75
76 void LogMonitor::update_from_paxos(bool *need_bootstrap)
77 {
78 dout(10) << __func__ << dendl;
79 version_t version = get_last_committed();
80 dout(10) << __func__ << " version " << version
81 << " summary v " << summary.version << dendl;
82 if (version == summary.version)
83 return;
84 assert(version >= summary.version);
85
86 map<string,bufferlist> channel_blog;
87
88 version_t latest_full = get_version_latest_full();
89 dout(10) << __func__ << " latest full " << latest_full << dendl;
90 if ((latest_full > 0) && (latest_full > summary.version)) {
91 bufferlist latest_bl;
92 get_version_full(latest_full, latest_bl);
93 assert(latest_bl.length() != 0);
94 dout(7) << __func__ << " loading summary e" << latest_full << dendl;
95 bufferlist::iterator p = latest_bl.begin();
96 ::decode(summary, p);
97 dout(7) << __func__ << " loaded summary e" << summary.version << dendl;
98 }
99
100 // walk through incrementals
101 while (version > summary.version) {
102 bufferlist bl;
103 int err = get_version(summary.version+1, bl);
104 assert(err == 0);
105 assert(bl.length());
106
107 bufferlist::iterator p = bl.begin();
108 __u8 v;
109 ::decode(v, p);
110 while (!p.end()) {
111 LogEntry le;
112 le.decode(p);
113 dout(7) << "update_from_paxos applying incremental log " << summary.version+1 << " " << le << dendl;
114
115 string channel = le.channel;
116 if (channel.empty()) // keep retrocompatibility
117 channel = CLOG_CHANNEL_CLUSTER;
118
119 if (g_conf->get_val<bool>("mon_cluster_log_to_stderr")) {
120 cerr << channel << " " << le << std::endl;
121 }
122
123 if (channels.do_log_to_syslog(channel)) {
124 string level = channels.get_level(channel);
125 string facility = channels.get_facility(channel);
126 if (level.empty() || facility.empty()) {
127 derr << __func__ << " unable to log to syslog -- level or facility"
128 << " not defined (level: " << level << ", facility: "
129 << facility << ")" << dendl;
130 continue;
131 }
132 le.log_to_syslog(channels.get_level(channel),
133 channels.get_facility(channel));
134 }
135
136 if (channels.do_log_to_graylog(channel)) {
137 ceph::logging::Graylog::Ref graylog = channels.get_graylog(channel);
138 if (graylog) {
139 graylog->log_log_entry(&le);
140 }
141 dout(7) << "graylog: " << channel << " " << graylog
142 << " host:" << channels.log_to_graylog_host << dendl;
143 }
144
145 string log_file = channels.get_log_file(channel);
146 dout(20) << __func__ << " logging for channel '" << channel
147 << "' to file '" << log_file << "'" << dendl;
148
149 if (!log_file.empty()) {
150 string log_file_level = channels.get_log_file_level(channel);
151 if (log_file_level.empty()) {
152 dout(1) << __func__ << " warning: log file level not defined for"
153 << " channel '" << channel << "' yet a log file is --"
154 << " will assume lowest level possible" << dendl;
155 }
156
157 int min = string_to_syslog_level(log_file_level);
158 int l = clog_type_to_syslog_level(le.prio);
159 if (l <= min) {
160 stringstream ss;
161 ss << le << "\n";
162 // init entry if DNE
163 bufferlist &blog = channel_blog[channel];
164 blog.append(ss.str());
165 }
166 }
167
168 summary.add(le);
169 }
170
171 summary.version++;
172 summary.prune(g_conf->mon_log_max_summary);
173 }
174
175 dout(15) << __func__ << " logging for "
176 << channel_blog.size() << " channels" << dendl;
177 for(map<string,bufferlist>::iterator p = channel_blog.begin();
178 p != channel_blog.end(); ++p) {
179 if (!p->second.length()) {
180 dout(15) << __func__ << " channel '" << p->first
181 << "': nothing to log" << dendl;
182 continue;
183 }
184
185 dout(15) << __func__ << " channel '" << p->first
186 << "' logging " << p->second.length() << " bytes" << dendl;
187 string log_file = channels.get_log_file(p->first);
188
189 int fd = ::open(log_file.c_str(), O_WRONLY|O_APPEND|O_CREAT, 0600);
190 if (fd < 0) {
191 int err = -errno;
192 dout(1) << "unable to write to '" << log_file << "' for channel '"
193 << p->first << "': " << cpp_strerror(err) << dendl;
194 } else {
195 int err = p->second.write_fd(fd);
196 if (err < 0) {
197 dout(1) << "error writing to '" << log_file << "' for channel '"
198 << p->first << ": " << cpp_strerror(err) << dendl;
199 }
200 VOID_TEMP_FAILURE_RETRY(::close(fd));
201 }
202 }
203
204 check_subs();
205 }
206
207 void LogMonitor::create_pending()
208 {
209 pending_log.clear();
210 pending_summary = summary;
211 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
212 }
213
214 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
215 {
216 version_t version = get_last_committed() + 1;
217 bufferlist bl;
218 dout(10) << __func__ << " v" << version << dendl;
219 __u8 v = 1;
220 ::encode(v, bl);
221 multimap<utime_t,LogEntry>::iterator p;
222 for (p = pending_log.begin(); p != pending_log.end(); ++p)
223 p->second.encode(bl, mon->get_quorum_con_features());
224
225 put_version(t, version, bl);
226 put_last_committed(t, version);
227 }
228
229 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
230 {
231 dout(10) << __func__ << " log v " << summary.version << dendl;
232 assert(get_last_committed() == summary.version);
233
234 bufferlist summary_bl;
235 ::encode(summary, summary_bl, mon->get_quorum_con_features());
236
237 put_version_full(t, summary.version, summary_bl);
238 put_version_latest_full(t, summary.version);
239 }
240
241 version_t LogMonitor::get_trim_to()
242 {
243 if (!mon->is_leader())
244 return 0;
245
246 unsigned max = g_conf->mon_max_log_epochs;
247 version_t version = get_last_committed();
248 if (version > max)
249 return version - max;
250 return 0;
251 }
252
253 bool LogMonitor::preprocess_query(MonOpRequestRef op)
254 {
255 op->mark_logmon_event("preprocess_query");
256 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
257 dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
258 switch (m->get_type()) {
259 case MSG_MON_COMMAND:
260 return preprocess_command(op);
261
262 case MSG_LOG:
263 return preprocess_log(op);
264
265 default:
266 ceph_abort();
267 return true;
268 }
269 }
270
271 bool LogMonitor::prepare_update(MonOpRequestRef op)
272 {
273 op->mark_logmon_event("prepare_update");
274 PaxosServiceMessage *m = static_cast<PaxosServiceMessage*>(op->get_req());
275 dout(10) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;
276 switch (m->get_type()) {
277 case MSG_MON_COMMAND:
278 return prepare_command(op);
279 case MSG_LOG:
280 return prepare_log(op);
281 default:
282 ceph_abort();
283 return false;
284 }
285 }
286
287 bool LogMonitor::preprocess_log(MonOpRequestRef op)
288 {
289 op->mark_logmon_event("preprocess_log");
290 MLog *m = static_cast<MLog*>(op->get_req());
291 dout(10) << "preprocess_log " << *m << " from " << m->get_orig_source() << dendl;
292 int num_new = 0;
293
294 MonSession *session = m->get_session();
295 if (!session)
296 goto done;
297 if (!session->is_capable("log", MON_CAP_W)) {
298 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
299 << session->caps << dendl;
300 goto done;
301 }
302
303 for (deque<LogEntry>::iterator p = m->entries.begin();
304 p != m->entries.end();
305 ++p) {
306 if (!pending_summary.contains(p->key()))
307 num_new++;
308 }
309 if (!num_new) {
310 dout(10) << " nothing new" << dendl;
311 goto done;
312 }
313
314 return false;
315
316 done:
317 mon->no_reply(op);
318 return true;
319 }
320
321 struct LogMonitor::C_Log : public C_MonOp {
322 LogMonitor *logmon;
323 C_Log(LogMonitor *p, MonOpRequestRef o) :
324 C_MonOp(o), logmon(p) {}
325 void _finish(int r) override {
326 if (r == -ECANCELED) {
327 return;
328 }
329 logmon->_updated_log(op);
330 }
331 };
332
333 bool LogMonitor::prepare_log(MonOpRequestRef op)
334 {
335 op->mark_logmon_event("prepare_log");
336 MLog *m = static_cast<MLog*>(op->get_req());
337 dout(10) << "prepare_log " << *m << " from " << m->get_orig_source() << dendl;
338
339 if (m->fsid != mon->monmap->fsid) {
340 dout(0) << "handle_log on fsid " << m->fsid << " != " << mon->monmap->fsid
341 << dendl;
342 return false;
343 }
344
345 for (deque<LogEntry>::iterator p = m->entries.begin();
346 p != m->entries.end();
347 ++p) {
348 dout(10) << " logging " << *p << dendl;
349 if (!pending_summary.contains(p->key())) {
350 pending_summary.add(*p);
351 pending_log.insert(pair<utime_t,LogEntry>(p->stamp, *p));
352 }
353 }
354 pending_summary.prune(g_conf->mon_log_max_summary);
355 wait_for_finished_proposal(op, new C_Log(this, op));
356 return true;
357 }
358
359 void LogMonitor::_updated_log(MonOpRequestRef op)
360 {
361 MLog *m = static_cast<MLog*>(op->get_req());
362 dout(7) << "_updated_log for " << m->get_orig_source_inst() << dendl;
363 mon->send_reply(op, new MLogAck(m->fsid, m->entries.rbegin()->seq));
364 }
365
366 bool LogMonitor::should_propose(double& delay)
367 {
368 // commit now if we have a lot of pending events
369 if (g_conf->mon_max_log_entries_per_event > 0 &&
370 pending_log.size() >= (unsigned)g_conf->mon_max_log_entries_per_event)
371 return true;
372
373 // otherwise fall back to generic policy
374 return PaxosService::should_propose(delay);
375 }
376
377
378 bool LogMonitor::preprocess_command(MonOpRequestRef op)
379 {
380 op->mark_logmon_event("preprocess_command");
381 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
382 int r = -EINVAL;
383 bufferlist rdata;
384 stringstream ss;
385
386 map<string, cmd_vartype> cmdmap;
387 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
388 string rs = ss.str();
389 mon->reply_command(op, -EINVAL, rs, get_last_committed());
390 return true;
391 }
392 MonSession *session = m->get_session();
393 if (!session) {
394 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
395 return true;
396 }
397
398 string prefix;
399 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
400
401 string format;
402 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
403 boost::scoped_ptr<Formatter> f(Formatter::create(format));
404
405 if (prefix == "log last") {
406 int64_t num = 20;
407 cmd_getval(g_ceph_context, cmdmap, "num", num);
408 if (f) {
409 f->open_array_section("tail");
410 }
411
412 std::string level_str;
413 clog_type level;
414 if (cmd_getval(g_ceph_context, cmdmap, "level", level_str)) {
415 level = LogEntry::str_to_level(level_str);
416 if (level == CLOG_UNKNOWN) {
417 ss << "Invalid severity '" << level_str << "'";
418 mon->reply_command(op, -EINVAL, ss.str(), get_last_committed());
419 return true;
420 }
421 } else {
422 level = CLOG_INFO;
423 }
424
425 std::string channel;
426 if (!cmd_getval(g_ceph_context, cmdmap, "channel", channel)) {
427 channel = CLOG_CHANNEL_DEFAULT;
428 }
429
430 // We'll apply this twice, once while counting out lines
431 // and once while outputting them.
432 auto match = [level, channel](const LogEntry &entry) {
433 return entry.prio >= level && (entry.channel == channel || channel == "*");
434 };
435
436 auto rp = summary.tail.rbegin();
437 for (; num > 0 && rp != summary.tail.rend(); ++rp) {
438 if (match(*rp)) {
439 num--;
440 }
441 }
442 if (rp == summary.tail.rend()) {
443 --rp;
444 }
445 ostringstream ss;
446 for (; rp != summary.tail.rbegin(); --rp) {
447 if (!match(*rp)) {
448 continue;
449 }
450
451 if (f) {
452 f->dump_object("entry", *rp);
453 } else {
454 ss << *rp << "\n";
455 }
456 }
457 if (f) {
458 f->close_section();
459 f->flush(rdata);
460 } else {
461 rdata.append(ss.str());
462 }
463 r = 0;
464 } else {
465 return false;
466 }
467
468 string rs;
469 getline(ss, rs);
470 mon->reply_command(op, r, rs, rdata, get_last_committed());
471 return true;
472 }
473
474
475 bool LogMonitor::prepare_command(MonOpRequestRef op)
476 {
477 op->mark_logmon_event("prepare_command");
478 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
479 stringstream ss;
480 string rs;
481 int err = -EINVAL;
482
483 map<string, cmd_vartype> cmdmap;
484 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
485 // ss has reason for failure
486 string rs = ss.str();
487 mon->reply_command(op, -EINVAL, rs, get_last_committed());
488 return true;
489 }
490
491 string prefix;
492 cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
493
494 MonSession *session = m->get_session();
495 if (!session) {
496 mon->reply_command(op, -EACCES, "access denied", get_last_committed());
497 return true;
498 }
499
500 if (prefix == "log") {
501 vector<string> logtext;
502 cmd_getval(g_ceph_context, cmdmap, "logtext", logtext);
503 LogEntry le;
504 le.who = m->get_orig_source_inst();
505 le.name = session->entity_name;
506 le.stamp = m->get_recv_stamp();
507 le.seq = 0;
508 le.prio = CLOG_INFO;
509 le.channel = CLOG_CHANNEL_DEFAULT;
510 le.msg = str_join(logtext, " ");
511 pending_summary.add(le);
512 pending_summary.prune(g_conf->mon_log_max_summary);
513 pending_log.insert(pair<utime_t,LogEntry>(le.stamp, le));
514 wait_for_finished_proposal(op, new Monitor::C_Command(
515 mon, op, 0, string(), get_last_committed() + 1));
516 return true;
517 }
518
519 getline(ss, rs);
520 mon->reply_command(op, err, rs, get_last_committed());
521 return false;
522 }
523
524
525 int LogMonitor::sub_name_to_id(const string& n)
526 {
527 if (n.substr(0, 4) == "log-" && n.size() > 4) {
528 return LogEntry::str_to_level(n.substr(4));
529 } else {
530 return CLOG_UNKNOWN;
531 }
532 }
533
534 void LogMonitor::check_subs()
535 {
536 dout(10) << __func__ << dendl;
537 for (map<string, xlist<Subscription*>*>::iterator i = mon->session_map.subs.begin();
538 i != mon->session_map.subs.end();
539 ++i) {
540 for (xlist<Subscription*>::iterator j = i->second->begin(); !j.end(); ++j) {
541 if (sub_name_to_id((*j)->type) >= 0)
542 check_sub(*j);
543 }
544 }
545 }
546
547 void LogMonitor::check_sub(Subscription *s)
548 {
549 dout(10) << __func__ << " client wants " << s->type << " ver " << s->next << dendl;
550
551 int sub_level = sub_name_to_id(s->type);
552 assert(sub_level >= 0);
553
554 version_t summary_version = summary.version;
555 if (s->next > summary_version) {
556 dout(10) << __func__ << " client " << s->session->inst
557 << " requested version (" << s->next << ") is greater than ours ("
558 << summary_version << "), which means we already sent him"
559 << " everything we have." << dendl;
560 return;
561 }
562
563 MLog *mlog = new MLog(mon->monmap->fsid);
564
565 if (s->next == 0) {
566 /* First timer, heh? */
567 bool ret = _create_sub_summary(mlog, sub_level);
568 if (!ret) {
569 dout(1) << __func__ << " ret = " << ret << dendl;
570 mlog->put();
571 return;
572 }
573 } else {
574 /* let us send you an incremental log... */
575 _create_sub_incremental(mlog, sub_level, s->next);
576 }
577
578 dout(1) << __func__ << " sending message to " << s->session->inst
579 << " with " << mlog->entries.size() << " entries"
580 << " (version " << mlog->version << ")" << dendl;
581
582 if (!mlog->entries.empty()) {
583 s->session->con->send_message(mlog);
584 } else {
585 mlog->put();
586 }
587 if (s->onetime)
588 mon->session_map.remove_sub(s);
589 else
590 s->next = summary_version+1;
591 }
592
593 /**
594 * Create a log message containing only the last message in the summary.
595 *
596 * @param mlog Log message we'll send to the client.
597 * @param level Maximum log level the client is interested in.
598 * @return 'true' if we consider we successfully populated @mlog;
599 * 'false' otherwise.
600 */
601 bool LogMonitor::_create_sub_summary(MLog *mlog, int level)
602 {
603 dout(10) << __func__ << dendl;
604
605 assert(mlog != NULL);
606
607 if (!summary.tail.size())
608 return false;
609
610 list<LogEntry>::reverse_iterator it = summary.tail.rbegin();
611 for (; it != summary.tail.rend(); ++it) {
612 LogEntry e = *it;
613 if (e.prio < level)
614 continue;
615
616 mlog->entries.push_back(e);
617 mlog->version = summary.version;
618 break;
619 }
620
621 return true;
622 }
623
624 /**
625 * Create an incremental log message from version \p sv to \p summary.version
626 *
627 * @param mlog Log message we'll send to the client with the messages received
628 * since version \p sv, inclusive.
629 * @param level The max log level of the messages the client is interested in.
630 * @param sv The version the client is looking for.
631 */
632 void LogMonitor::_create_sub_incremental(MLog *mlog, int level, version_t sv)
633 {
634 dout(10) << __func__ << " level " << level << " ver " << sv
635 << " cur summary ver " << summary.version << dendl;
636
637 if (sv < get_first_committed()) {
638 dout(10) << __func__ << " skipped from " << sv
639 << " to first_committed " << get_first_committed() << dendl;
640 LogEntry le;
641 le.stamp = ceph_clock_now();
642 le.prio = CLOG_WARN;
643 ostringstream ss;
644 ss << "skipped log messages from " << sv << " to " << get_first_committed();
645 le.msg = ss.str();
646 mlog->entries.push_back(le);
647 sv = get_first_committed();
648 }
649
650 version_t summary_ver = summary.version;
651 while (sv && sv <= summary_ver) {
652 bufferlist bl;
653 int err = get_version(sv, bl);
654 assert(err == 0);
655 assert(bl.length());
656 bufferlist::iterator p = bl.begin();
657 __u8 v;
658 ::decode(v,p);
659 while (!p.end()) {
660 LogEntry le;
661 le.decode(p);
662
663 if (le.prio < level) {
664 dout(20) << __func__ << " requested " << level
665 << " entry " << le.prio << dendl;
666 continue;
667 }
668
669 mlog->entries.push_back(le);
670 }
671 mlog->version = sv++;
672 }
673
674 dout(10) << __func__ << " incremental message ready ("
675 << mlog->entries.size() << " entries)" << dendl;
676 }
677
678 void LogMonitor::update_log_channels()
679 {
680 ostringstream oss;
681
682 channels.clear();
683
684 int r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog,
685 oss, &channels.log_to_syslog,
686 CLOG_CONFIG_DEFAULT_KEY);
687 if (r < 0) {
688 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog'" << dendl;
689 return;
690 }
691
692 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_level,
693 oss, &channels.syslog_level,
694 CLOG_CONFIG_DEFAULT_KEY);
695 if (r < 0) {
696 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_level'"
697 << dendl;
698 return;
699 }
700
701 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_syslog_facility,
702 oss, &channels.syslog_facility,
703 CLOG_CONFIG_DEFAULT_KEY);
704 if (r < 0) {
705 derr << __func__ << " error parsing 'mon_cluster_log_to_syslog_facility'"
706 << dendl;
707 return;
708 }
709
710 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file, oss,
711 &channels.log_file,
712 CLOG_CONFIG_DEFAULT_KEY);
713 if (r < 0) {
714 derr << __func__ << " error parsing 'mon_cluster_log_file'" << dendl;
715 return;
716 }
717
718 r = get_conf_str_map_helper(g_conf->mon_cluster_log_file_level, oss,
719 &channels.log_file_level,
720 CLOG_CONFIG_DEFAULT_KEY);
721 if (r < 0) {
722 derr << __func__ << " error parsing 'mon_cluster_log_file_level'"
723 << dendl;
724 return;
725 }
726
727 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog, oss,
728 &channels.log_to_graylog,
729 CLOG_CONFIG_DEFAULT_KEY);
730 if (r < 0) {
731 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog'"
732 << dendl;
733 return;
734 }
735
736 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_host, oss,
737 &channels.log_to_graylog_host,
738 CLOG_CONFIG_DEFAULT_KEY);
739 if (r < 0) {
740 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_host'"
741 << dendl;
742 return;
743 }
744
745 r = get_conf_str_map_helper(g_conf->mon_cluster_log_to_graylog_port, oss,
746 &channels.log_to_graylog_port,
747 CLOG_CONFIG_DEFAULT_KEY);
748 if (r < 0) {
749 derr << __func__ << " error parsing 'mon_cluster_log_to_graylog_port'"
750 << dendl;
751 return;
752 }
753
754 channels.expand_channel_meta();
755 }
756
757 void LogMonitor::log_channel_info::expand_channel_meta(map<string,string> &m)
758 {
759 generic_dout(20) << __func__ << " expand map: " << m << dendl;
760 for (map<string,string>::iterator p = m.begin(); p != m.end(); ++p) {
761 m[p->first] = expand_channel_meta(p->second, p->first);
762 }
763 generic_dout(20) << __func__ << " expanded map: " << m << dendl;
764 }
765
766 string LogMonitor::log_channel_info::expand_channel_meta(
767 const string &input,
768 const string &change_to)
769 {
770 size_t pos = string::npos;
771 string s(input);
772 while ((pos = s.find(LOG_META_CHANNEL)) != string::npos) {
773 string tmp = s.substr(0, pos) + change_to;
774 if (pos+LOG_META_CHANNEL.length() < s.length())
775 tmp += s.substr(pos+LOG_META_CHANNEL.length());
776 s = tmp;
777 }
778 generic_dout(20) << __func__ << " from '" << input
779 << "' to '" << s << "'" << dendl;
780
781 return s;
782 }
783
784 bool LogMonitor::log_channel_info::do_log_to_syslog(const string &channel) {
785 string v = get_str_map_key(log_to_syslog, channel,
786 &CLOG_CONFIG_DEFAULT_KEY);
787 // We expect booleans, but they are in k/v pairs, kept
788 // as strings, in 'log_to_syslog'. We must ensure
789 // compatibility with existing boolean handling, and so
790 // we are here using a modified version of how
791 // md_config_t::set_val_raw() handles booleans. We will
792 // accept both 'true' and 'false', but will also check for
793 // '1' and '0'. The main distiction between this and the
794 // original code is that we will assume everything not '1',
795 // '0', 'true' or 'false' to be 'false'.
796 bool ret = false;
797
798 if (boost::iequals(v, "false")) {
799 ret = false;
800 } else if (boost::iequals(v, "true")) {
801 ret = true;
802 } else {
803 std::string err;
804 int b = strict_strtol(v.c_str(), 10, &err);
805 ret = (err.empty() && b == 1);
806 }
807
808 return ret;
809 }
810
811 ceph::logging::Graylog::Ref LogMonitor::log_channel_info::get_graylog(
812 const string &channel)
813 {
814 generic_dout(25) << __func__ << " for channel '"
815 << channel << "'" << dendl;
816
817 if (graylogs.count(channel) == 0) {
818 auto graylog(std::make_shared<ceph::logging::Graylog>("mon"));
819
820 graylog->set_fsid(g_conf->get_val<uuid_d>("fsid"));
821 graylog->set_hostname(g_conf->host);
822 graylog->set_destination(get_str_map_key(log_to_graylog_host, channel,
823 &CLOG_CONFIG_DEFAULT_KEY),
824 atoi(get_str_map_key(log_to_graylog_port, channel,
825 &CLOG_CONFIG_DEFAULT_KEY).c_str()));
826
827 graylogs[channel] = graylog;
828 generic_dout(20) << __func__ << " for channel '"
829 << channel << "' to graylog host '"
830 << log_to_graylog_host[channel] << ":"
831 << log_to_graylog_port[channel]
832 << "'" << dendl;
833 }
834 return graylogs[channel];
835 }
836
837 void LogMonitor::handle_conf_change(const struct md_config_t *conf,
838 const std::set<std::string> &changed)
839 {
840 if (changed.count("mon_cluster_log_to_syslog") ||
841 changed.count("mon_cluster_log_to_syslog_level") ||
842 changed.count("mon_cluster_log_to_syslog_facility") ||
843 changed.count("mon_cluster_log_file") ||
844 changed.count("mon_cluster_log_file_level") ||
845 changed.count("mon_cluster_log_to_graylog") ||
846 changed.count("mon_cluster_log_to_graylog_host") ||
847 changed.count("mon_cluster_log_to_graylog_port")) {
848 update_log_channels();
849 }
850 }