1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
22 - LogSummary contains last N entries for every channel
23 - LogSummary (as "full") written on every commit
24 - LogSummary contains "keys" which LogEntryKey hash_set for the
25 same set of entries (for deduping)
29 - LogSummary contains, for each channel,
31 - end seq (last written seq + 1)
32 - LogSummary contains an LRUSet for tracking dups
33 - LogSummary written every N commits
34 - each LogEntry written in a separate key
35 - "%s/%08x" % (channel, seq) -> LogEntry
36 - per-commit record includes channel -> begin (trim bounds)
37 - 'external_log_to' meta records version to which we have logged externally
43 #include <boost/algorithm/string/predicate.hpp>
48 #include "LogMonitor.h"
50 #include "MonitorDBStore.h"
52 #include "messages/MMonCommand.h"
53 #include "messages/MLog.h"
54 #include "messages/MLogAck.h"
55 #include "common/Graylog.h"
56 #include "common/Journald.h"
57 #include "common/errno.h"
58 #include "common/strtol.h"
59 #include "include/ceph_assert.h"
60 #include "include/str_list.h"
61 #include "include/str_map.h"
62 #include "include/compat.h"
64 #define dout_subsys ceph_subsys_mon
66 using namespace TOPNSPC::common
;
77 using std::ostringstream
;
82 using std::stringstream
;
85 using std::unique_ptr
;
87 using ceph::bufferlist
;
90 using ceph::Formatter
;
91 using ceph::JSONFormatter
;
92 using ceph::make_message
;
93 using ceph::mono_clock
;
94 using ceph::mono_time
;
95 using ceph::timespan_str
;
97 string
LogMonitor::log_channel_info::get_log_file(const string
&channel
)
99 dout(25) << __func__
<< " for channel '"
100 << channel
<< "'" << dendl
;
102 if (expanded_log_file
.count(channel
) == 0) {
103 string fname
= expand_channel_meta(
104 get_str_map_key(log_file
, channel
, &CLOG_CONFIG_DEFAULT_KEY
),
106 expanded_log_file
[channel
] = fname
;
108 dout(20) << __func__
<< " for channel '"
109 << channel
<< "' expanded to '"
110 << fname
<< "'" << dendl
;
112 return expanded_log_file
[channel
];
116 void LogMonitor::log_channel_info::expand_channel_meta(map
<string
,string
> &m
)
118 dout(20) << __func__
<< " expand map: " << m
<< dendl
;
119 for (map
<string
,string
>::iterator p
= m
.begin(); p
!= m
.end(); ++p
) {
120 m
[p
->first
] = expand_channel_meta(p
->second
, p
->first
);
122 dout(20) << __func__
<< " expanded map: " << m
<< dendl
;
125 string
LogMonitor::log_channel_info::expand_channel_meta(
127 const string
&change_to
)
129 size_t pos
= string::npos
;
131 while ((pos
= s
.find(LOG_META_CHANNEL
)) != string::npos
) {
132 string tmp
= s
.substr(0, pos
) + change_to
;
133 if (pos
+LOG_META_CHANNEL
.length() < s
.length())
134 tmp
+= s
.substr(pos
+LOG_META_CHANNEL
.length());
137 dout(20) << __func__
<< " from '" << input
138 << "' to '" << s
<< "'" << dendl
;
143 bool LogMonitor::log_channel_info::do_log_to_syslog(const string
&channel
) {
144 string v
= get_str_map_key(log_to_syslog
, channel
,
145 &CLOG_CONFIG_DEFAULT_KEY
);
146 // We expect booleans, but they are in k/v pairs, kept
147 // as strings, in 'log_to_syslog'. We must ensure
148 // compatibility with existing boolean handling, and so
149 // we are here using a modified version of how
150 // md_config_t::set_val_raw() handles booleans. We will
151 // accept both 'true' and 'false', but will also check for
152 // '1' and '0'. The main distiction between this and the
153 // original code is that we will assume everything not '1',
154 // '0', 'true' or 'false' to be 'false'.
157 if (boost::iequals(v
, "false")) {
159 } else if (boost::iequals(v
, "true")) {
163 int b
= strict_strtol(v
.c_str(), 10, &err
);
164 ret
= (err
.empty() && b
== 1);
170 ceph::logging::Graylog::Ref
LogMonitor::log_channel_info::get_graylog(
171 const string
&channel
)
173 dout(25) << __func__
<< " for channel '"
174 << channel
<< "'" << dendl
;
176 if (graylogs
.count(channel
) == 0) {
177 auto graylog(std::make_shared
<ceph::logging::Graylog
>("mon"));
179 graylog
->set_fsid(g_conf().get_val
<uuid_d
>("fsid"));
180 graylog
->set_hostname(g_conf()->host
);
181 graylog
->set_destination(get_str_map_key(log_to_graylog_host
, channel
,
182 &CLOG_CONFIG_DEFAULT_KEY
),
183 atoi(get_str_map_key(log_to_graylog_port
, channel
,
184 &CLOG_CONFIG_DEFAULT_KEY
).c_str()));
186 graylogs
[channel
] = graylog
;
187 dout(20) << __func__
<< " for channel '"
188 << channel
<< "' to graylog host '"
189 << log_to_graylog_host
[channel
] << ":"
190 << log_to_graylog_port
[channel
]
193 return graylogs
[channel
];
196 ceph::logging::JournaldClusterLogger
&LogMonitor::log_channel_info::get_journald()
198 dout(25) << __func__
<< dendl
;
201 journald
= std::make_unique
<ceph::logging::JournaldClusterLogger
>();
206 void LogMonitor::log_channel_info::clear()
208 log_to_syslog
.clear();
209 syslog_level
.clear();
210 syslog_facility
.clear();
212 expanded_log_file
.clear();
213 log_file_level
.clear();
214 log_to_graylog
.clear();
215 log_to_graylog_host
.clear();
216 log_to_graylog_port
.clear();
217 log_to_journald
.clear();
222 LogMonitor::log_channel_info::log_channel_info() = default;
223 LogMonitor::log_channel_info::~log_channel_info() = default;
227 #define dout_prefix _prefix(_dout, mon, get_last_committed())
228 static ostream
& _prefix(std::ostream
*_dout
, Monitor
&mon
, version_t v
) {
229 return *_dout
<< "mon." << mon
.name
<< "@" << mon
.rank
230 << "(" << mon
.get_state_name()
231 << ").log v" << v
<< " ";
234 ostream
& operator<<(ostream
&out
, const LogMonitor
&pm
)
240 Tick function to update the map based on performance every N seconds
243 void LogMonitor::tick()
245 if (!is_active()) return;
247 dout(10) << *this << dendl
;
251 void LogMonitor::create_initial()
253 dout(10) << "create_initial -- creating initial map" << dendl
;
255 e
.name
= g_conf()->name
;
256 e
.rank
= entity_name_t::MON(mon
.rank
);
257 e
.addrs
= mon
.messenger
->get_myaddrs();
258 e
.stamp
= ceph_clock_now();
260 e
.channel
= CLOG_CHANNEL_CLUSTER
;
261 std::stringstream ss
;
262 ss
<< "mkfs " << mon
.monmap
->get_fsid();
265 pending_log
.insert(pair
<utime_t
,LogEntry
>(e
.stamp
, e
));
268 void LogMonitor::update_from_paxos(bool *need_bootstrap
)
270 dout(10) << __func__
<< dendl
;
271 version_t version
= get_last_committed();
272 dout(10) << __func__
<< " version " << version
273 << " summary v " << summary
.version
<< dendl
;
275 log_external_backlog();
277 if (version
== summary
.version
)
279 ceph_assert(version
>= summary
.version
);
281 version_t latest_full
= get_version_latest_full();
282 dout(10) << __func__
<< " latest full " << latest_full
<< dendl
;
283 if ((latest_full
> 0) && (latest_full
> summary
.version
)) {
284 bufferlist latest_bl
;
285 get_version_full(latest_full
, latest_bl
);
286 ceph_assert(latest_bl
.length() != 0);
287 dout(7) << __func__
<< " loading summary e" << latest_full
<< dendl
;
288 auto p
= latest_bl
.cbegin();
290 dout(7) << __func__
<< " loaded summary e" << summary
.version
<< dendl
;
293 // walk through incrementals
294 while (version
> summary
.version
) {
296 int err
= get_version(summary
.version
+1, bl
);
297 ceph_assert(err
== 0);
298 ceph_assert(bl
.length());
300 auto p
= bl
.cbegin();
304 // legacy pre-quincy commits
308 dout(7) << "update_from_paxos applying incremental log "
309 << summary
.version
+1 << " " << le
<< dendl
;
310 summary
.add_legacy(le
);
318 dout(7) << "update_from_paxos applying incremental log "
319 << summary
.version
+1 << " " << le
<< dendl
;
320 summary
.recent_keys
.insert(le
.key());
321 summary
.channel_info
[le
.channel
].second
++;
322 // we may have logged past the (persisted) summary in a prior quorum
323 if (version
> external_log_to
) {
327 map
<string
,version_t
> prune_channels_to
;
328 decode(prune_channels_to
, p
);
329 for (auto& [channel
, prune_to
] : prune_channels_to
) {
330 dout(20) << __func__
<< " channel " << channel
331 << " pruned to " << prune_to
<< dendl
;
332 summary
.channel_info
[channel
].first
= prune_to
;
334 // zero out pre-quincy fields (encode_pending needs this to reliably detect
336 summary
.tail_by_channel
.clear();
337 summary
.keys
.clear();
341 summary
.prune(g_conf()->mon_log_max_summary
);
343 dout(10) << " summary.channel_info " << summary
.channel_info
<< dendl
;
344 external_log_to
= version
;
345 mon
.store
->write_meta("external_log_to", stringify(external_log_to
));
350 void LogMonitor::log_external(const LogEntry
& le
)
352 string channel
= le
.channel
;
353 if (channel
.empty()) { // keep retrocompatibility
354 channel
= CLOG_CHANNEL_CLUSTER
;
357 if (g_conf().get_val
<bool>("mon_cluster_log_to_stderr")) {
358 cerr
<< channel
<< " " << le
<< std::endl
;
361 if (channels
.do_log_to_syslog(channel
)) {
362 string level
= channels
.get_level(channel
);
363 string facility
= channels
.get_facility(channel
);
364 if (level
.empty() || facility
.empty()) {
365 derr
<< __func__
<< " unable to log to syslog -- level or facility"
366 << " not defined (level: " << level
<< ", facility: "
367 << facility
<< ")" << dendl
;
369 le
.log_to_syslog(channels
.get_level(channel
),
370 channels
.get_facility(channel
));
374 if (channels
.do_log_to_graylog(channel
)) {
375 ceph::logging::Graylog::Ref graylog
= channels
.get_graylog(channel
);
377 graylog
->log_log_entry(&le
);
379 dout(7) << "graylog: " << channel
<< " " << graylog
380 << " host:" << channels
.log_to_graylog_host
<< dendl
;
383 if (channels
.do_log_to_journald(channel
)) {
384 auto &journald
= channels
.get_journald();
385 journald
.log_log_entry(le
);
386 dout(7) << "journald: " << channel
<< dendl
;
389 if (g_conf()->mon_cluster_log_to_file
) {
390 if (this->log_rotated
.exchange(false)) {
391 this->log_external_close_fds();
394 auto p
= channel_fds
.find(channel
);
396 if (p
== channel_fds
.end()) {
397 string log_file
= channels
.get_log_file(channel
);
398 dout(20) << __func__
<< " logging for channel '" << channel
399 << "' to file '" << log_file
<< "'" << dendl
;
400 if (log_file
.empty()) {
401 // do not log this channel
404 fd
= ::open(log_file
.c_str(), O_WRONLY
|O_APPEND
|O_CREAT
|O_CLOEXEC
, 0600);
407 dout(1) << "unable to write to '" << log_file
<< "' for channel '"
408 << channel
<< "': " << cpp_strerror(err
) << dendl
;
410 channel_fds
[channel
] = fd
;
418 fmt::format_to(std::back_inserter(file_log_buffer
), "{}\n", le
);
419 int err
= safe_write(fd
, file_log_buffer
.data(), file_log_buffer
.size());
420 file_log_buffer
.clear();
422 dout(1) << "error writing to '" << channels
.get_log_file(channel
)
423 << "' for channel '" << channel
424 << ": " << cpp_strerror(err
) << dendl
;
426 channel_fds
.erase(channel
);
432 void LogMonitor::log_external_close_fds()
434 for (auto& [channel
, fd
] : channel_fds
) {
436 dout(10) << __func__
<< " closing " << channel
<< " (" << fd
<< ")" << dendl
;
443 /// catch external logs up to summary.version
444 void LogMonitor::log_external_backlog()
446 if (!external_log_to
) {
448 int r
= mon
.store
->read_meta("external_log_to", &cur_str
);
450 external_log_to
= std::stoull(cur_str
);
451 dout(10) << __func__
<< " initialized external_log_to = " << external_log_to
452 << " (recorded log_to position)" << dendl
;
454 // pre-quincy, we assumed that anything through summary.version was
455 // logged externally.
456 assert(r
== -ENOENT
);
457 external_log_to
= summary
.version
;
458 dout(10) << __func__
<< " initialized external_log_to = " << external_log_to
459 << " (summary v " << summary
.version
<< ")" << dendl
;
462 // we may have logged ahead of summary.version, but never ahead of paxos
463 if (external_log_to
> get_last_committed()) {
464 derr
<< __func__
<< " rewinding external_log_to from " << external_log_to
465 << " -> " << get_last_committed() << " (sync_force? mon rebuild?)" << dendl
;
466 external_log_to
= get_last_committed();
468 if (external_log_to
>= summary
.version
) {
471 if (auto first
= get_first_committed(); external_log_to
< first
) {
472 derr
<< __func__
<< " local logs at " << external_log_to
473 << ", skipping to " << first
<< dendl
;
474 external_log_to
= first
;
475 // FIXME: write marker in each channel log file?
477 for (; external_log_to
< summary
.version
; ++external_log_to
) {
479 int err
= get_version(external_log_to
+1, bl
);
480 ceph_assert(err
== 0);
481 ceph_assert(bl
.length());
482 auto p
= bl
.cbegin();
489 while ((num
== -2 && !p
.end()) || (num
>= 0 && num
--)) {
495 mon
.store
->write_meta("external_log_to", stringify(external_log_to
));
498 void LogMonitor::create_pending()
501 pending_keys
.clear();
502 dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl
;
505 void LogMonitor::generate_logentry_key(
506 const std::string
& channel
,
510 out
->append(channel
);
513 snprintf(vs
, sizeof(vs
), "%08llx", (unsigned long long)v
);
517 void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t
)
519 version_t version
= get_last_committed() + 1;
521 dout(10) << __func__
<< " v" << version
<< dendl
;
523 if (mon
.monmap
->min_mon_release
< ceph_release_t::quincy
) {
524 // legacy encoding for pre-quincy quorum
526 encode(struct_v
, bl
);
527 for (auto& p
: pending_log
) {
528 p
.second
.encode(bl
, mon
.get_quorum_con_features());
530 put_version(t
, version
, bl
);
531 put_last_committed(t
, version
);
536 encode(struct_v
, bl
);
538 // first commit after upgrading to quincy?
539 if (!summary
.tail_by_channel
.empty()) {
540 // include past log entries
541 for (auto& p
: summary
.tail_by_channel
) {
542 for (auto& q
: p
.second
) {
543 pending_log
.emplace(make_pair(q
.second
.stamp
, q
.second
));
548 // record new entries
549 auto pending_channel_info
= summary
.channel_info
;
550 uint32_t num
= pending_log
.size();
552 dout(20) << __func__
<< " writing " << num
<< " entries" << dendl
;
553 for (auto& p
: pending_log
) {
555 p
.second
.encode(ebl
, mon
.get_quorum_con_features());
557 auto& bounds
= pending_channel_info
[p
.second
.channel
];
558 version_t v
= bounds
.second
++;
560 generate_logentry_key(p
.second
.channel
, v
, &key
);
561 t
->put(get_service_name(), key
, ebl
);
563 bl
.claim_append(ebl
);
566 // prune log entries?
567 map
<string
,version_t
> prune_channels_to
;
568 for (auto& [channel
, info
] : summary
.channel_info
) {
569 if (info
.second
- info
.first
> g_conf()->mon_log_max
) {
570 const version_t from
= info
.first
;
571 const version_t to
= info
.second
- g_conf()->mon_log_max
;
572 dout(10) << __func__
<< " pruning channel " << channel
573 << " " << from
<< " -> " << to
<< dendl
;
574 prune_channels_to
[channel
] = to
;
575 pending_channel_info
[channel
].first
= to
;
576 for (version_t v
= from
; v
< to
; ++v
) {
578 generate_logentry_key(channel
, v
, &key
);
579 t
->erase(get_service_name(), key
);
583 dout(20) << __func__
<< " prune_channels_to " << prune_channels_to
<< dendl
;
584 encode(prune_channels_to
, bl
);
586 put_version(t
, version
, bl
);
587 put_last_committed(t
, version
);
590 bool LogMonitor::should_stash_full()
592 if (mon
.monmap
->min_mon_release
< ceph_release_t::quincy
) {
593 // commit a LogSummary on every commit
597 // store periodic summary
598 auto period
= std::min
<uint64_t>(
599 g_conf()->mon_log_full_interval
,
600 g_conf()->mon_max_log_epochs
602 return (get_last_committed() - get_version_latest_full() > period
);
606 void LogMonitor::encode_full(MonitorDBStore::TransactionRef t
)
608 dout(10) << __func__
<< " log v " << summary
.version
<< dendl
;
609 ceph_assert(get_last_committed() == summary
.version
);
611 bufferlist summary_bl
;
612 encode(summary
, summary_bl
, mon
.get_quorum_con_features());
614 put_version_full(t
, summary
.version
, summary_bl
);
615 put_version_latest_full(t
, summary
.version
);
618 version_t
LogMonitor::get_trim_to() const
620 if (!mon
.is_leader())
623 unsigned max
= g_conf()->mon_max_log_epochs
;
624 version_t version
= get_last_committed();
626 return version
- max
;
630 bool LogMonitor::preprocess_query(MonOpRequestRef op
)
632 op
->mark_logmon_event("preprocess_query");
633 auto m
= op
->get_req
<PaxosServiceMessage
>();
634 dout(10) << "preprocess_query " << *m
<< " from " << m
->get_orig_source_inst() << dendl
;
635 switch (m
->get_type()) {
636 case MSG_MON_COMMAND
:
638 return preprocess_command(op
);
639 } catch (const bad_cmd_get
& e
) {
641 mon
.reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
646 return preprocess_log(op
);
654 bool LogMonitor::prepare_update(MonOpRequestRef op
)
656 op
->mark_logmon_event("prepare_update");
657 auto m
= op
->get_req
<PaxosServiceMessage
>();
658 dout(10) << "prepare_update " << *m
<< " from " << m
->get_orig_source_inst() << dendl
;
659 switch (m
->get_type()) {
660 case MSG_MON_COMMAND
:
662 return prepare_command(op
);
663 } catch (const bad_cmd_get
& e
) {
665 mon
.reply_command(op
, -EINVAL
, e
.what(), bl
, get_last_committed());
669 return prepare_log(op
);
676 bool LogMonitor::preprocess_log(MonOpRequestRef op
)
678 op
->mark_logmon_event("preprocess_log");
679 auto m
= op
->get_req
<MLog
>();
680 dout(10) << "preprocess_log " << *m
<< " from " << m
->get_orig_source() << dendl
;
683 MonSession
*session
= op
->get_session();
686 if (!session
->is_capable("log", MON_CAP_W
)) {
687 dout(0) << "preprocess_log got MLog from entity with insufficient privileges "
688 << session
->caps
<< dendl
;
692 for (auto p
= m
->entries
.begin();
693 p
!= m
->entries
.end();
695 if (!summary
.contains(p
->key()))
699 dout(10) << " nothing new" << dendl
;
710 struct LogMonitor::C_Log
: public C_MonOp
{
712 C_Log(LogMonitor
*p
, MonOpRequestRef o
) :
713 C_MonOp(o
), logmon(p
) {}
714 void _finish(int r
) override
{
715 if (r
== -ECANCELED
) {
718 logmon
->_updated_log(op
);
722 bool LogMonitor::prepare_log(MonOpRequestRef op
)
724 op
->mark_logmon_event("prepare_log");
725 auto m
= op
->get_req
<MLog
>();
726 dout(10) << "prepare_log " << *m
<< " from " << m
->get_orig_source() << dendl
;
728 if (m
->fsid
!= mon
.monmap
->fsid
) {
729 dout(0) << "handle_log on fsid " << m
->fsid
<< " != " << mon
.monmap
->fsid
734 for (auto p
= m
->entries
.begin();
735 p
!= m
->entries
.end();
737 dout(10) << " logging " << *p
<< dendl
;
738 if (!summary
.contains(p
->key()) &&
739 !pending_keys
.count(p
->key())) {
740 pending_keys
.insert(p
->key());
741 pending_log
.insert(pair
<utime_t
,LogEntry
>(p
->stamp
, *p
));
744 wait_for_finished_proposal(op
, new C_Log(this, op
));
748 void LogMonitor::_updated_log(MonOpRequestRef op
)
750 auto m
= op
->get_req
<MLog
>();
751 dout(7) << "_updated_log for " << m
->get_orig_source_inst() << dendl
;
752 mon
.send_reply(op
, new MLogAck(m
->fsid
, m
->entries
.rbegin()->seq
));
755 bool LogMonitor::should_propose(double& delay
)
757 // commit now if we have a lot of pending events
758 if (g_conf()->mon_max_log_entries_per_event
> 0 &&
759 pending_log
.size() >= (unsigned)g_conf()->mon_max_log_entries_per_event
)
762 // otherwise fall back to generic policy
763 return PaxosService::should_propose(delay
);
767 bool LogMonitor::preprocess_command(MonOpRequestRef op
)
769 op
->mark_logmon_event("preprocess_command");
770 auto m
= op
->get_req
<MMonCommand
>();
776 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
777 string rs
= ss
.str();
778 mon
.reply_command(op
, -EINVAL
, rs
, get_last_committed());
781 MonSession
*session
= op
->get_session();
783 mon
.reply_command(op
, -EACCES
, "access denied", get_last_committed());
788 cmd_getval(cmdmap
, "prefix", prefix
);
790 string format
= cmd_getval_or
<string
>(cmdmap
, "format", "plain");
791 boost::scoped_ptr
<Formatter
> f(Formatter::create(format
));
793 if (prefix
== "log last") {
795 cmd_getval(cmdmap
, "num", num
);
797 f
->open_array_section("tail");
800 std::string level_str
;
802 if (cmd_getval(cmdmap
, "level", level_str
)) {
803 level
= LogEntry::str_to_level(level_str
);
804 if (level
== CLOG_UNKNOWN
) {
805 ss
<< "Invalid severity '" << level_str
<< "'";
806 mon
.reply_command(op
, -EINVAL
, ss
.str(), get_last_committed());
814 if (!cmd_getval(cmdmap
, "channel", channel
)) {
815 channel
= CLOG_CHANNEL_DEFAULT
;
818 // We'll apply this twice, once while counting out lines
819 // and once while outputting them.
820 auto match
= [level
](const LogEntry
&entry
) {
821 return entry
.prio
>= level
;
825 if (!summary
.tail_by_channel
.empty()) {
827 // Decrement operation that sets to container end when hitting rbegin
828 if (channel
== "*") {
829 list
<LogEntry
> full_tail
;
830 summary
.build_ordered_tail_legacy(&full_tail
);
831 auto rp
= full_tail
.rbegin();
832 for (; num
> 0 && rp
!= full_tail
.rend(); ++rp
) {
837 if (rp
== full_tail
.rend()) {
841 // Decrement a reverse iterator such that going past rbegin()
842 // sets it to rend(). This is for writing a for() loop that
843 // goes up to (and including) rbegin()
844 auto dec
= [&rp
, &full_tail
] () {
845 if (rp
== full_tail
.rbegin()) {
846 rp
= full_tail
.rend();
852 // Move forward to the end of the container (decrement the reverse
854 for (; rp
!= full_tail
.rend(); dec()) {
859 f
->dump_object("entry", *rp
);
865 auto p
= summary
.tail_by_channel
.find(channel
);
866 if (p
!= summary
.tail_by_channel
.end()) {
867 auto rp
= p
->second
.rbegin();
868 for (; num
> 0 && rp
!= p
->second
.rend(); ++rp
) {
869 if (match(rp
->second
)) {
873 if (rp
== p
->second
.rend()) {
877 // Decrement a reverse iterator such that going past rbegin()
878 // sets it to rend(). This is for writing a for() loop that
879 // goes up to (and including) rbegin()
880 auto dec
= [&rp
, &p
] () {
881 if (rp
== p
->second
.rbegin()) {
882 rp
= p
->second
.rend();
888 // Move forward to the end of the container (decrement the reverse
890 for (; rp
!= p
->second
.rend(); dec()) {
891 if (!match(rp
->second
)) {
895 f
->dump_object("entry", rp
->second
);
897 ss
<< rp
->second
<< "\n";
904 if (channel
== "*") {
905 // tail all channels; we need to mix by timestamp
906 multimap
<utime_t
,LogEntry
> entries
; // merge+sort all channels by timestamp
907 for (auto& p
: summary
.channel_info
) {
908 version_t from
= p
.second
.first
;
909 version_t to
= p
.second
.second
;
911 if (to
> (version_t
)num
) {
912 start
= std::max(to
- num
, from
);
916 dout(10) << __func__
<< " channnel " << p
.first
917 << " from " << from
<< " to " << to
<< dendl
;
918 for (version_t v
= start
; v
< to
; ++v
) {
921 generate_logentry_key(p
.first
, v
, &key
);
922 int r
= mon
.store
->get(get_service_name(), key
, ebl
);
924 derr
<< __func__
<< " missing key " << key
<< dendl
;
928 auto p
= ebl
.cbegin();
930 entries
.insert(make_pair(le
.stamp
, le
));
933 while ((int)entries
.size() > num
) {
934 entries
.erase(entries
.begin());
936 for (auto& p
: entries
) {
938 f
->dump_object("entry", p
.second
);
940 ss
<< p
.second
<< "\n";
945 auto p
= summary
.channel_info
.find(channel
);
946 if (p
!= summary
.channel_info
.end()) {
947 version_t from
= p
->second
.first
;
948 version_t to
= p
->second
.second
;
950 if (to
> (version_t
)num
) {
951 start
= std::max(to
- num
, from
);
955 dout(10) << __func__
<< " from " << from
<< " to " << to
<< dendl
;
956 for (version_t v
= start
; v
< to
; ++v
) {
959 generate_logentry_key(channel
, v
, &key
);
960 int r
= mon
.store
->get(get_service_name(), key
, ebl
);
962 derr
<< __func__
<< " missing key " << key
<< dendl
;
966 auto p
= ebl
.cbegin();
969 f
->dump_object("entry", le
);
981 rdata
.append(ss
.str());
990 mon
.reply_command(op
, r
, rs
, rdata
, get_last_committed());
995 bool LogMonitor::prepare_command(MonOpRequestRef op
)
997 op
->mark_logmon_event("prepare_command");
998 auto m
= op
->get_req
<MMonCommand
>();
1004 if (!cmdmap_from_json(m
->cmd
, &cmdmap
, ss
)) {
1005 // ss has reason for failure
1006 string rs
= ss
.str();
1007 mon
.reply_command(op
, -EINVAL
, rs
, get_last_committed());
1012 cmd_getval(cmdmap
, "prefix", prefix
);
1014 MonSession
*session
= op
->get_session();
1016 mon
.reply_command(op
, -EACCES
, "access denied", get_last_committed());
1020 if (prefix
== "log") {
1021 vector
<string
> logtext
;
1022 cmd_getval(cmdmap
, "logtext", logtext
);
1024 le
.rank
= m
->get_orig_source();
1025 le
.addrs
.v
.push_back(m
->get_orig_source_addr());
1026 le
.name
= session
->entity_name
;
1027 le
.stamp
= m
->get_recv_stamp();
1029 string level_str
= cmd_getval_or
<string
>(cmdmap
, "level", "info");
1030 le
.prio
= LogEntry::str_to_level(level_str
);
1031 le
.channel
= CLOG_CHANNEL_DEFAULT
;
1032 le
.msg
= str_join(logtext
, " ");
1033 pending_keys
.insert(le
.key());
1034 pending_log
.insert(pair
<utime_t
,LogEntry
>(le
.stamp
, le
));
1035 wait_for_finished_proposal(op
, new Monitor::C_Command(
1036 mon
, op
, 0, string(), get_last_committed() + 1));
1041 mon
.reply_command(op
, err
, rs
, get_last_committed());
1046 int LogMonitor::sub_name_to_id(const string
& n
)
1048 if (n
.substr(0, 4) == "log-" && n
.size() > 4) {
1049 return LogEntry::str_to_level(n
.substr(4));
1051 return CLOG_UNKNOWN
;
1055 void LogMonitor::check_subs()
1057 dout(10) << __func__
<< dendl
;
1058 for (map
<string
, xlist
<Subscription
*>*>::iterator i
= mon
.session_map
.subs
.begin();
1059 i
!= mon
.session_map
.subs
.end();
1061 for (xlist
<Subscription
*>::iterator j
= i
->second
->begin(); !j
.end(); ++j
) {
1062 if (sub_name_to_id((*j
)->type
) >= 0)
1068 void LogMonitor::check_sub(Subscription
*s
)
1070 dout(10) << __func__
<< " client wants " << s
->type
<< " ver " << s
->next
<< dendl
;
1072 int sub_level
= sub_name_to_id(s
->type
);
1073 ceph_assert(sub_level
>= 0);
1075 version_t summary_version
= summary
.version
;
1076 if (s
->next
> summary_version
) {
1077 dout(10) << __func__
<< " client " << s
->session
->name
1078 << " requested version (" << s
->next
<< ") is greater than ours ("
1079 << summary_version
<< "), which means we already sent him"
1080 << " everything we have." << dendl
;
1084 MLog
*mlog
= new MLog(mon
.monmap
->fsid
);
1087 /* First timer, heh? */
1088 _create_sub_incremental(mlog
, sub_level
, get_last_committed());
1090 /* let us send you an incremental log... */
1091 _create_sub_incremental(mlog
, sub_level
, s
->next
);
1094 dout(10) << __func__
<< " sending message to " << s
->session
->name
1095 << " with " << mlog
->entries
.size() << " entries"
1096 << " (version " << mlog
->version
<< ")" << dendl
;
1098 if (!mlog
->entries
.empty()) {
1099 s
->session
->con
->send_message(mlog
);
1104 mon
.session_map
.remove_sub(s
);
1106 s
->next
= summary_version
+1;
1110 * Create an incremental log message from version \p sv to \p summary.version
1112 * @param mlog Log message we'll send to the client with the messages received
1113 * since version \p sv, inclusive.
1114 * @param level The max log level of the messages the client is interested in.
1115 * @param sv The version the client is looking for.
1117 void LogMonitor::_create_sub_incremental(MLog
*mlog
, int level
, version_t sv
)
1119 dout(10) << __func__
<< " level " << level
<< " ver " << sv
1120 << " cur summary ver " << summary
.version
<< dendl
;
1122 if (sv
< get_first_committed()) {
1123 dout(10) << __func__
<< " skipped from " << sv
1124 << " to first_committed " << get_first_committed() << dendl
;
1126 le
.stamp
= ceph_clock_now();
1127 le
.prio
= CLOG_WARN
;
1129 ss
<< "skipped log messages from " << sv
<< " to " << get_first_committed();
1131 mlog
->entries
.push_back(le
);
1132 sv
= get_first_committed();
1135 version_t summary_ver
= summary
.version
;
1136 while (sv
&& sv
<= summary_ver
) {
1138 int err
= get_version(sv
, bl
);
1139 ceph_assert(err
== 0);
1140 ceph_assert(bl
.length());
1141 auto p
= bl
.cbegin();
1147 dout(20) << __func__
<< " sv " << sv
<< " has " << num
<< " entries" << dendl
;
1149 while ((num
== -2 && !p
.end()) || (num
>= 0 && num
--)) {
1152 if (le
.prio
< level
) {
1153 dout(20) << __func__
<< " requested " << level
1154 << ", skipping " << le
<< dendl
;
1157 mlog
->entries
.push_back(le
);
1159 mlog
->version
= sv
++;
1162 dout(10) << __func__
<< " incremental message ready ("
1163 << mlog
->entries
.size() << " entries)" << dendl
;
1166 void LogMonitor::update_log_channels()
1172 int r
= get_conf_str_map_helper(
1173 g_conf().get_val
<string
>("mon_cluster_log_to_syslog"),
1174 oss
, &channels
.log_to_syslog
,
1175 CLOG_CONFIG_DEFAULT_KEY
);
1177 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog'" << dendl
;
1181 r
= get_conf_str_map_helper(
1182 g_conf().get_val
<string
>("mon_cluster_log_to_syslog_level"),
1183 oss
, &channels
.syslog_level
,
1184 CLOG_CONFIG_DEFAULT_KEY
);
1186 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog_level'"
1191 r
= get_conf_str_map_helper(
1192 g_conf().get_val
<string
>("mon_cluster_log_to_syslog_facility"),
1193 oss
, &channels
.syslog_facility
,
1194 CLOG_CONFIG_DEFAULT_KEY
);
1196 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_syslog_facility'"
1201 r
= get_conf_str_map_helper(
1202 g_conf().get_val
<string
>("mon_cluster_log_file"), oss
,
1204 CLOG_CONFIG_DEFAULT_KEY
);
1206 derr
<< __func__
<< " error parsing 'mon_cluster_log_file'" << dendl
;
1210 r
= get_conf_str_map_helper(
1211 g_conf().get_val
<string
>("mon_cluster_log_file_level"), oss
,
1212 &channels
.log_file_level
,
1213 CLOG_CONFIG_DEFAULT_KEY
);
1215 derr
<< __func__
<< " error parsing 'mon_cluster_log_file_level'"
1220 r
= get_conf_str_map_helper(
1221 g_conf().get_val
<string
>("mon_cluster_log_to_graylog"), oss
,
1222 &channels
.log_to_graylog
,
1223 CLOG_CONFIG_DEFAULT_KEY
);
1225 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog'"
1230 r
= get_conf_str_map_helper(
1231 g_conf().get_val
<string
>("mon_cluster_log_to_graylog_host"), oss
,
1232 &channels
.log_to_graylog_host
,
1233 CLOG_CONFIG_DEFAULT_KEY
);
1235 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog_host'"
1240 r
= get_conf_str_map_helper(
1241 g_conf().get_val
<string
>("mon_cluster_log_to_graylog_port"), oss
,
1242 &channels
.log_to_graylog_port
,
1243 CLOG_CONFIG_DEFAULT_KEY
);
1245 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_graylog_port'"
1250 r
= get_conf_str_map_helper(
1251 g_conf().get_val
<string
>("mon_cluster_log_to_journald"), oss
,
1252 &channels
.log_to_journald
,
1253 CLOG_CONFIG_DEFAULT_KEY
);
1255 derr
<< __func__
<< " error parsing 'mon_cluster_log_to_journald'"
1260 channels
.expand_channel_meta();
1261 log_external_close_fds();
1265 void LogMonitor::handle_conf_change(const ConfigProxy
& conf
,
1266 const std::set
<std::string
> &changed
)
1268 if (changed
.count("mon_cluster_log_to_syslog") ||
1269 changed
.count("mon_cluster_log_to_syslog_level") ||
1270 changed
.count("mon_cluster_log_to_syslog_facility") ||
1271 changed
.count("mon_cluster_log_file") ||
1272 changed
.count("mon_cluster_log_file_level") ||
1273 changed
.count("mon_cluster_log_to_graylog") ||
1274 changed
.count("mon_cluster_log_to_graylog_host") ||
1275 changed
.count("mon_cluster_log_to_graylog_port") ||
1276 changed
.count("mon_cluster_log_to_journald") ||
1277 changed
.count("mon_cluster_log_to_file")) {
1278 update_log_channels();